using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MfGames.Gallium;
using Humanizer;
using Serilog;
namespace MfGames.Nitride.Pipelines;
///
/// A wrapper class to handle a pipeline along with various methods for
/// tracking the operation of the pipeline and thread coordination.
///
///
/// This is intended to only be used within a PipelineManager and there
/// should be little reason to use or extend this class.
///
public class PipelineRunner
{
///
/// The manual reset event used to coordinate thread operations.
///
private readonly ManualResetEventSlim blockDependencies;
///
/// A manual reset event to tell the thread when consumers are done.
///
private readonly ManualResetEventSlim consumersDone;
private readonly ILogger logger;
private DateTime changed;
private bool signaledDoneWithInputs;
private DateTime started;
///
/// Contains the number of consumers we're currently waiting to finish
/// processing.
///
private int waitingOnConsumers;
public PipelineRunner(
ILogger logger,
IPipeline pipeline)
{
this.Pipeline = pipeline ?? throw new ArgumentNullException(nameof(pipeline));
this.Incoming = new List();
this.Outgoing = new List();
this.Outputs = new List();
this.logger = logger.ForContext();
this.blockDependencies = new ManualResetEventSlim(false);
this.consumersDone = new ManualResetEventSlim(false);
this.started = DateTime.Now;
this.changed = DateTime.Now;
}
///
/// Public factory method for creating a new pipeline manager entry.
///
public delegate PipelineRunner Factory(IPipeline pipeline);
public TimeSpan ElapsedFromInitialized => DateTime.Now - this.started;
public TimeSpan ElapsedFromState => DateTime.Now - this.changed;
///
/// A collection of incoming entries that produce data for the pipeline.
///
public ICollection Incoming { get; }
///
/// Gets a value indicating whether the output of this entry is not
/// used by any other pipeline.
///
public bool IsFinal => this.Outgoing.Count == 0;
///
/// Gets a value indicating whether this pipeline is done running.
///
public bool IsFinished => this.State is PipelineRunnerState.Finalized or PipelineRunnerState.Errored;
///
/// Gets a value indicating whether this entry is a starting one
/// that consumes no data.
///
public bool IsStarting => this.Incoming.Count == 0;
///
/// The collection of outgoing entries that consume the results of
/// the pipeline in this entry.
///
public ICollection Outgoing { get; }
///
/// Contains the list of all the outputs from this pipeline. This is
/// only ensured to be valid after the pipeline is in the `Providing`
/// state.
///
public List Outputs { get; }
///
/// The pipeline associated with the entry.
///
public IPipeline Pipeline { get; }
///
/// Gets the current state of the pipeline.
///
public PipelineRunnerState State { get; private set; }
///
/// A method that tells the pipeline that one of the dependencies has
/// completed consuming the input.
///
public void ConsumerDoneWithOutputs()
{
int current = Interlocked.Decrement(ref this.waitingOnConsumers);
this.logger.Verbose("{Pipeline:l}: Consumer signalled, waiting for {Count:n0}", this.Pipeline, current);
if (current == 0)
{
this.consumersDone.Set();
}
}
///
/// Initializes the runner after all external properties have been
/// set and configured.
///
public void Initialize()
{
this.ChangeState(PipelineRunnerState.Initialized);
}
///
/// Executes the pipeline, including waiting for any or all
/// dependencies.
///
public async Task RunAsync()
{
try
{
// Make sure we have a valid state.
switch (this.State)
{
case PipelineRunnerState.Initialized:
case PipelineRunnerState.Finalized:
break;
default:
this.logger.Error(
"{Pipeline:l}: Pipeline cannot be started in a {State}"
+ " state (not Initialized or Finalized)",
this.Pipeline,
this.State);
break;
}
// Prepare ourselves for running. We have a start/stop state because
// this may be non-zero time.
this.started = DateTime.Now;
this.changed = DateTime.Now;
this.ChangeState(PipelineRunnerState.Preparing);
this.signaledDoneWithInputs = false;
this.ChangeState(PipelineRunnerState.Prepared);
// Go through the incoming and wait for each of the manual resets
// on the dependency pipelines.
if (this.WaitForDependencies())
{
this.SignalDoneWithInputs();
return;
}
// Grab the outputs from the incoming. They will be populated
// because we have waited for the reset events.
this.ChangeState(PipelineRunnerState.Started);
List input = this.GatherDependencyOutputs();
// Run the pipeline. This may not be resolved until we gather
// the output below.
await this.RunPipeline(input);
// At this point, we are completely done with our inputs, so signal
// to them in case they have to clean up any of their structures.
this.SignalDoneWithInputs();
// If we have outgoing runners, provide them data until they are
// done.
this.SendToDependants();
// Finalize ourselves.
this.ChangeState(PipelineRunnerState.Finalized);
}
catch (Exception exception)
{
// Report the exception.
this.logger.Error(exception, "{Pipeline:l}: There was an exception running pipeline", this.Pipeline);
// Change our state and then release any pipeline waiting for us
// so they can pick up the error and fail themselves.
this.ChangeState(PipelineRunnerState.Errored);
this.blockDependencies.Set();
this.SignalDoneWithInputs();
}
}
///
/// A method to block the call until this runner is done processing and
/// is ready to provide output.
///
public void WaitUntilProviding()
{
this.blockDependencies.Wait();
}
///
/// Changes the state of the pipeline into a new state.
///
/// The state to change the pipeline into.
private void ChangeState(PipelineRunnerState newState)
{
this.logger.Verbose(
"{Pipeline:l}: Switching from state {Old} to {New} (elapsed {Elapsed}, duration {Duration})",
this.Pipeline,
this.State,
newState,
this.ElapsedFromInitialized,
this.ElapsedFromState);
this.changed = DateTime.Now;
this.State = newState;
}
private List GatherDependencyOutputs()
{
if (this.Incoming.Count <= 0)
{
return new List();
}
// Report that we are gathering our outputs.
this.logger.Verbose(
"{Pipeline:l}: Gathering outputs from {Count:n0} dependencies",
this.Pipeline,
this.Incoming.Count);
var input = this.Incoming.SelectMany(x => x.Outputs)
.ToList();
this.logger.Debug(
"{Pipeline:l}: Got {Count:l} from dependencies",
this.Pipeline,
"entity".ToQuantity(input.Count, "N0"));
return input;
}
private async Task RunPipeline(List input)
{
// Get the sequence of data, but this doesn't drain the enumeration.
List output = await this.Pipeline
.RunAsync(input)
.ToListAsync();
// Gather all the output.
this.logger.Debug("{Pipeline:l}: Gathering output", this.Pipeline);
this.Outputs.Clear();
this.Outputs.AddRange(output);
}
private void SendToDependants()
{
if (this.Outgoing.Count <= 0)
{
return;
}
// Make sure our internal wait for the consumers it set.
this.logger.Verbose("{Pipeline:l}: Setting up internal thread controls", this.Pipeline);
this.waitingOnConsumers = this.Outgoing.Count;
this.consumersDone.Reset();
// Report how many files we're sending out and then use manual
// reset and the semaphore to control the threads.
this.logger.Debug(
"{Pipeline:l}: Output {Count:l} from pipeline",
this.Pipeline,
"entity".ToQuantity(this.Outputs.Count, "N0"));
// Release our manual reset to allow operations to continue.
this.ChangeState(PipelineRunnerState.Providing);
this.logger.Verbose("{Pipeline:l}: Release manual reset for consumers", this.Pipeline);
this.blockDependencies.Set();
// Wait until all consumers have finished processing.
this.consumersDone.Wait();
}
private void SignalDoneWithInputs()
{
if (this.Incoming.Count <= 0 || this.signaledDoneWithInputs)
{
return;
}
this.signaledDoneWithInputs = true;
this.logger.Debug("{Pipeline:l}: Signaling {Count:n0} dependencies done", this.Pipeline, this.Incoming.Count);
foreach (PipelineRunner? dependency in this.Incoming)
{
dependency.ConsumerDoneWithOutputs();
}
}
private bool WaitForDependencies()
{
if (this.Incoming.Count <= 0)
{
return false;
}
// Wait for the dependencies to run first.
this.ChangeState(PipelineRunnerState.Waiting);
this.logger.Verbose(
"{Pipeline:l}: Waiting for {Count:l} to complete",
this.Pipeline,
"dependency".ToQuantity(this.Incoming.Count));
foreach (PipelineRunner? dependency in this.Incoming)
{
dependency.WaitUntilProviding();
}
// Check for any error state in the dependency, if we have one,
// then we need to stop ourselves.
bool hasError = this.Incoming.Any(x => x.State == PipelineRunnerState.Errored);
if (!hasError)
{
return false;
}
this.logger.Error("{Pipeline:l}: There was an exception in an dependency", this.Pipeline);
this.ChangeState(PipelineRunnerState.Errored);
this.blockDependencies.Set();
return true;
}
}