using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

using Humanizer;

using MfGames.Gallium;

using Serilog;

namespace MfGames.Nitride.Pipelines;

/// <summary>
/// A wrapper class to handle a pipeline along with various methods for
/// tracking the operation of the pipeline and thread coordination.
/// </summary>
/// <remarks>
/// This is intended to only be used within a PipelineManager and there
/// should be little reason to use or extend this class.
/// </remarks>
public class PipelineRunner
{
    /// <summary>
    /// The manual reset event used to coordinate thread operations. It is
    /// set once this runner is providing output or has errored; consumers
    /// block on it through <see cref="WaitUntilProviding" />.
    /// </summary>
    /// <remarks>
    /// Nothing in this class resets this event, so it stays signaled after
    /// the first run. NOTE(review): confirm that re-runs from the Finalized
    /// state are coordinated by the caller.
    /// </remarks>
    private readonly ManualResetEventSlim blockDependencies;

    /// <summary>
    /// A manual reset event to tell the thread when consumers are done.
    /// </summary>
    private readonly ManualResetEventSlim consumersDone;

    private readonly ILogger logger;

    /// <summary>
    /// The UTC timestamp of the last state change.
    /// </summary>
    private DateTime changed;

    /// <summary>
    /// Guards against signaling the incoming runners more than once per
    /// run.
    /// </summary>
    private bool signaledDoneWithInputs;

    /// <summary>
    /// The UTC timestamp of construction or of the start of the most recent
    /// run.
    /// </summary>
    private DateTime started;

    /// <summary>
    /// Contains the number of consumers we're currently waiting to finish
    /// processing.
    /// </summary>
    private int waitingOnConsumers;

    /// <summary>
    /// Creates a runner wrapping the given pipeline.
    /// </summary>
    /// <param name="logger">The logger to derive a contextual logger from.</param>
    /// <param name="pipeline">The pipeline this runner executes.</param>
    /// <exception cref="ArgumentNullException">
    /// If <paramref name="pipeline" /> or <paramref name="logger" /> is null.
    /// </exception>
    public PipelineRunner(
        ILogger logger,
        IPipeline pipeline)
    {
        this.Pipeline = pipeline
            ?? throw new ArgumentNullException(nameof(pipeline));
        this.Incoming = new List<PipelineRunner>();
        this.Outgoing = new List<PipelineRunner>();
        this.Outputs = new List<Entity>();
        this.logger =
            (logger ?? throw new ArgumentNullException(nameof(logger)))
            .ForContext<PipelineRunner>();
        this.blockDependencies = new ManualResetEventSlim(false);
        this.consumersDone = new ManualResetEventSlim(false);

        // UTC is used for all timestamps so elapsed times are not skewed by
        // daylight-saving or time zone changes while a pipeline is running.
        this.started = DateTime.UtcNow;
        this.changed = DateTime.UtcNow;
    }

    /// <summary>
    /// Public factory method for creating a new pipeline manager entry.
    /// </summary>
    public delegate PipelineRunner Factory(IPipeline pipeline);

    /// <summary>
    /// Gets the time elapsed since the runner was constructed or, once
    /// <see cref="RunAsync" /> has been called, since the most recent run
    /// started.
    /// </summary>
    public TimeSpan ElapsedFromInitialized => DateTime.UtcNow - this.started;

    /// <summary>
    /// Gets the time elapsed since the last state change.
    /// </summary>
    public TimeSpan ElapsedFromState => DateTime.UtcNow - this.changed;

    /// <summary>
    /// A collection of incoming entries that produce data for the pipeline.
    /// </summary>
    public ICollection<PipelineRunner> Incoming { get; }

    /// <summary>
    /// Gets a value indicating whether the output of this entry is not
    /// used by any other pipeline.
    /// </summary>
    public bool IsFinal => this.Outgoing.Count == 0;

    /// <summary>
    /// Gets a value indicating whether this pipeline is done running.
    /// </summary>
    public bool IsFinished => this.State
        is PipelineRunnerState.Finalized
        or PipelineRunnerState.Errored;

    /// <summary>
    /// Gets a value indicating whether this entry is a starting one
    /// that consumes no data.
    /// </summary>
    public bool IsStarting => this.Incoming.Count == 0;

    /// <summary>
    /// The collection of outgoing entries that consume the results of
    /// the pipeline in this entry.
    /// </summary>
    public ICollection<PipelineRunner> Outgoing { get; }

    /// <summary>
    /// Contains the list of all the outputs from this pipeline. This is
    /// only ensured to be valid after the pipeline is in the `Providing`
    /// state.
    /// </summary>
    public List<Entity> Outputs { get; }

    /// <summary>
    /// The pipeline associated with the entry.
    /// </summary>
    public IPipeline Pipeline { get; }

    /// <summary>
    /// Gets the current state of the pipeline.
    /// </summary>
    public PipelineRunnerState State { get; private set; }

    /// <summary>
    /// A method that tells the pipeline that one of the dependencies has
    /// completed consuming the input.
    /// </summary>
    public void ConsumerDoneWithOutputs()
    {
        int current = Interlocked.Decrement(ref this.waitingOnConsumers);

        this.logger.Verbose(
            "{Pipeline:l}: Consumer signalled, waiting for {Count:n0}",
            this.Pipeline,
            current);

        // The last consumer to report in releases SendToDependants().
        if (current == 0)
        {
            this.consumersDone.Set();
        }
    }

    /// <summary>
    /// Initializes the runner after all external properties have been
    /// set and configured.
    /// </summary>
    public void Initialize()
    {
        this.ChangeState(PipelineRunnerState.Initialized);
    }

    /// <summary>
    /// Executes the pipeline, including waiting for any or all
    /// dependencies.
    /// </summary>
    /// <remarks>
    /// Exceptions are logged and moved into the Errored state instead of
    /// being propagated to the caller.
    /// </remarks>
    /// <returns>A task that completes when the run has finished or failed.</returns>
    public async Task RunAsync()
    {
        try
        {
            // Make sure we have a valid state.
            // NOTE(review): an unexpected state is only reported; the run
            // still proceeds afterwards. Confirm whether that is intended
            // before turning this into an early exit.
            if (this.State
                is not (PipelineRunnerState.Initialized
                    or PipelineRunnerState.Finalized))
            {
                this.logger.Error(
                    "{Pipeline:l}: Pipeline cannot be started in a {State}"
                    + " state (not Initialized or Finalized)",
                    this.Pipeline,
                    this.State);
            }

            // Prepare ourselves for running. We have a start/stop state because
            // this may be non-zero time.
            this.started = DateTime.UtcNow;
            this.changed = DateTime.UtcNow;
            this.ChangeState(PipelineRunnerState.Preparing);
            this.signaledDoneWithInputs = false;
            this.ChangeState(PipelineRunnerState.Prepared);

            // Go through the incoming and wait for each of the manual resets
            // on the dependency pipelines. A true result means a dependency
            // errored and we have already marked ourselves as errored.
            if (this.WaitForDependencies())
            {
                this.SignalDoneWithInputs();
                return;
            }

            // Grab the outputs from the incoming. They will be populated
            // because we have waited for the reset events.
            this.ChangeState(PipelineRunnerState.Started);

            List<Entity> input = this.GatherDependencyOutputs();

            // Run the pipeline and collect everything it produces.
            await this.RunPipeline(input);

            // At this point, we are completely done with our inputs, so signal
            // to them in case they have to clean up any of their structures.
            this.SignalDoneWithInputs();

            // If we have outgoing runners, provide them data until they are
            // done.
            this.SendToDependants();

            // Finalize ourselves.
            this.ChangeState(PipelineRunnerState.Finalized);
        }
        catch (Exception exception)
        {
            // Report the exception.
            this.logger.Error(
                exception,
                "{Pipeline:l}: There was an exception running pipeline",
                this.Pipeline);

            // Change our state and then release any pipeline waiting for us
            // so they can pick up the error and fail themselves.
            this.ChangeState(PipelineRunnerState.Errored);
            this.blockDependencies.Set();
            this.SignalDoneWithInputs();
        }
    }

    /// <summary>
    /// A method to block the call until this runner is done processing and
    /// is ready to provide output.
    /// </summary>
    /// <remarks>
    /// This also returns when the runner has errored, so callers need to
    /// check <see cref="State" /> afterwards.
    /// </remarks>
    public void WaitUntilProviding()
    {
        this.blockDependencies.Wait();
    }

    /// <summary>
    /// Changes the state of the pipeline into a new state.
    /// </summary>
    /// <param name="newState">The state to change the pipeline into.</param>
    private void ChangeState(PipelineRunnerState newState)
    {
        // Log before updating so the message carries the old state and how
        // long the runner stayed in it.
        this.logger.Verbose(
            "{Pipeline:l}: Switching from state {Old} to {New} (elapsed {Elapsed}, duration {Duration})",
            this.Pipeline,
            this.State,
            newState,
            this.ElapsedFromInitialized,
            this.ElapsedFromState);

        this.changed = DateTime.UtcNow;
        this.State = newState;
    }

    /// <summary>
    /// Collects the outputs of every incoming runner into a single list.
    /// </summary>
    /// <returns>
    /// The combined outputs, or an empty list when there are no incoming
    /// runners.
    /// </returns>
    private List<Entity> GatherDependencyOutputs()
    {
        if (this.Incoming.Count <= 0)
        {
            return new List<Entity>();
        }

        // Report that we are gathering our outputs.
        this.logger.Verbose(
            "{Pipeline:l}: Gathering outputs from {Count:n0} dependencies",
            this.Pipeline,
            this.Incoming.Count);

        List<Entity> input = this.Incoming
            .SelectMany(x => x.Outputs)
            .ToList();

        this.logger.Debug(
            "{Pipeline:l}: Got {Count:l} from dependencies",
            this.Pipeline,
            "entity".ToQuantity(input.Count, "N0"));

        return input;
    }

    /// <summary>
    /// Runs the wrapped pipeline against the given input and replaces the
    /// contents of <see cref="Outputs" /> with the results.
    /// </summary>
    /// <param name="input">The entities to feed into the pipeline.</param>
    private async Task RunPipeline(List<Entity> input)
    {
        // Materialize the pipeline's results into a list before touching
        // Outputs, so a failure leaves the previous contents in place.
        List<Entity> output = await this.Pipeline
            .RunAsync(input)
            .ToListAsync();

        // Gather all the output.
        this.logger.Debug("{Pipeline:l}: Gathering output", this.Pipeline);
        this.Outputs.Clear();
        this.Outputs.AddRange(output);
    }

    /// <summary>
    /// Releases the outgoing runners so they can read <see cref="Outputs" />
    /// and blocks until every one of them has reported being done.
    /// </summary>
    private void SendToDependants()
    {
        if (this.Outgoing.Count <= 0)
        {
            return;
        }

        // Make sure our internal wait for the consumers is set.
        this.logger.Verbose(
            "{Pipeline:l}: Setting up internal thread controls",
            this.Pipeline);

        this.waitingOnConsumers = this.Outgoing.Count;
        this.consumersDone.Reset();

        // Report how many files we're sending out and then use manual
        // reset and the semaphore to control the threads.
        this.logger.Debug(
            "{Pipeline:l}: Output {Count:l} from pipeline",
            this.Pipeline,
            "entity".ToQuantity(this.Outputs.Count, "N0"));

        // Release our manual reset to allow operations to continue.
        this.ChangeState(PipelineRunnerState.Providing);

        this.logger.Verbose(
            "{Pipeline:l}: Release manual reset for consumers",
            this.Pipeline);

        this.blockDependencies.Set();

        // Wait until all consumers have finished processing.
        this.consumersDone.Wait();
    }

    /// <summary>
    /// Tells every incoming runner that this runner is finished with its
    /// outputs. Does nothing when there are no incoming runners or when the
    /// signal was already sent during this run.
    /// </summary>
    private void SignalDoneWithInputs()
    {
        if (this.Incoming.Count <= 0 || this.signaledDoneWithInputs)
        {
            return;
        }

        this.signaledDoneWithInputs = true;

        this.logger.Debug(
            "{Pipeline:l}: Signaling {Count:n0} dependencies done",
            this.Pipeline,
            this.Incoming.Count);

        foreach (PipelineRunner dependency in this.Incoming)
        {
            dependency.ConsumerDoneWithOutputs();
        }
    }

    /// <summary>
    /// Blocks until every incoming runner has released its consumers and
    /// then checks whether any of them errored.
    /// </summary>
    /// <returns>
    /// True if a dependency errored, in which case this runner has been put
    /// into the Errored state and its own consumers released; otherwise
    /// false.
    /// </returns>
    private bool WaitForDependencies()
    {
        if (this.Incoming.Count <= 0)
        {
            return false;
        }

        // Wait for the dependencies to run first.
        this.ChangeState(PipelineRunnerState.Waiting);

        this.logger.Verbose(
            "{Pipeline:l}: Waiting for {Count:l} to complete",
            this.Pipeline,
            "dependency".ToQuantity(this.Incoming.Count));

        foreach (PipelineRunner dependency in this.Incoming)
        {
            dependency.WaitUntilProviding();
        }

        // Check for any error state in the dependency, if we have one,
        // then we need to stop ourselves.
        bool hasError = this.Incoming
            .Any(x => x.State == PipelineRunnerState.Errored);

        if (!hasError)
        {
            return false;
        }

        this.logger.Error(
            "{Pipeline:l}: There was an exception in an dependency",
            this.Pipeline);

        // Propagate the failure: mark ourselves errored and release anything
        // waiting on us so it can fail as well.
        this.ChangeState(PipelineRunnerState.Errored);
        this.blockDependencies.Set();

        return true;
    }
}