377 lines
13 KiB
C#
377 lines
13 KiB
C#
|
using System;
|
||
|
using System.Collections.Generic;
|
||
|
using System.Linq;
|
||
|
using System.Threading;
|
||
|
using System.Threading.Tasks;
|
||
|
using Gallium;
|
||
|
using Humanizer;
|
||
|
using Serilog;
|
||
|
|
||
|
namespace Nitride.Pipelines
{
    /// <summary>
    /// A wrapper class to handle a pipeline along with various methods for
    /// tracking the operation of the pipeline and thread coordination.
    /// </summary>
    /// <remarks>
    /// This is intended to only be used within a PipelineManager and there
    /// should be little reason to use or extend this class.
    /// </remarks>
    public class PipelineRunner
    {
        /// <summary>
        /// The manual reset event used to coordinate thread operations.
        /// Consumers block on it in <see cref="WaitUntilProviding" />; it is
        /// set once this runner starts providing output or has errored.
        /// </summary>
        private readonly ManualResetEventSlim blockDependencies;

        /// <summary>
        /// A manual reset event to tell the thread when consumers are done.
        /// </summary>
        private readonly ManualResetEventSlim consumersDone;

        /// <summary>
        /// The logger, scoped to <see cref="PipelineRunner" />.
        /// </summary>
        private readonly ILogger logger;

        /// <summary>
        /// The UTC timestamp of the most recent state change (or of
        /// construction / run start when no change has happened since).
        /// </summary>
        private DateTime changed;

        /// <summary>
        /// Whether the current run has already signaled its incoming
        /// runners; prevents signaling them more than once per run.
        /// </summary>
        private bool signaledDoneWithInputs;

        /// <summary>
        /// The UTC timestamp of construction or of the start of the most
        /// recent call to <see cref="RunAsync" />.
        /// </summary>
        private DateTime started;

        /// <summary>
        /// Contains the number of consumers we're currently waiting to finish
        /// processing.
        /// </summary>
        private int waitingOnConsumers;

        /// <summary>
        /// Creates a runner that wraps the given pipeline.
        /// </summary>
        /// <param name="logger">The logger to derive a contextual logger from.</param>
        /// <param name="pipeline">The pipeline this runner executes.</param>
        /// <exception cref="ArgumentNullException">
        /// If <paramref name="pipeline" /> or <paramref name="logger" /> is null.
        /// </exception>
        public PipelineRunner(
            ILogger logger,
            IPipeline pipeline)
        {
            this.Pipeline = pipeline
                ?? throw new ArgumentNullException(nameof(pipeline));

            // Fail with a descriptive exception instead of a
            // NullReferenceException when the logger is missing.
            if (logger is null)
            {
                throw new ArgumentNullException(nameof(logger));
            }

            this.Incoming = new List<PipelineRunner>();
            this.Outgoing = new List<PipelineRunner>();
            this.Outputs = new List<Entity>();
            this.logger = logger.ForContext<PipelineRunner>();
            this.blockDependencies = new ManualResetEventSlim(false);
            this.consumersDone = new ManualResetEventSlim(false);

            // UTC is used for every timestamp so that elapsed durations are
            // not distorted by daylight-saving or time-zone changes.
            DateTime now = DateTime.UtcNow;

            this.started = now;
            this.changed = now;
        }

        /// <summary>
        /// Public factory method for creating a new pipeline manager entry.
        /// </summary>
        public delegate PipelineRunner Factory(IPipeline pipeline);

        /// <summary>
        /// Gets the time elapsed since the runner was constructed or, once
        /// <see cref="RunAsync" /> has been called, since that run began.
        /// </summary>
        public TimeSpan ElapsedFromInitialized => DateTime.UtcNow - this.started;

        /// <summary>
        /// Gets the time elapsed since the last recorded state change.
        /// </summary>
        public TimeSpan ElapsedFromState => DateTime.UtcNow - this.changed;

        /// <summary>
        /// A collection of incoming entries that produce data for the pipeline.
        /// </summary>
        public ICollection<PipelineRunner> Incoming { get; }

        /// <summary>
        /// Gets a value indicating whether the output of this entry is not
        /// used by any other pipeline.
        /// </summary>
        public bool IsFinal => this.Outgoing.Count == 0;

        /// <summary>
        /// Gets a value indicating whether this pipeline is done running.
        /// </summary>
        public bool IsFinished => this.State
            is PipelineRunnerState.Finalized
            or PipelineRunnerState.Errored;

        /// <summary>
        /// Gets a value indicating whether this entry is a starting one
        /// that consumes no data.
        /// </summary>
        public bool IsStarting => this.Incoming.Count == 0;

        /// <summary>
        /// The collection of outgoing entries that consume the results of
        /// the pipeline in this entry.
        /// </summary>
        public ICollection<PipelineRunner> Outgoing { get; }

        /// <summary>
        /// Contains the list of all the outputs from this pipeline. This is
        /// only ensured to be valid after the pipeline is in the `Providing`
        /// state.
        /// </summary>
        public List<Entity> Outputs { get; }

        /// <summary>
        /// The pipeline associated with the entry.
        /// </summary>
        public IPipeline Pipeline { get; }

        /// <summary>
        /// Gets the current state of the pipeline.
        /// </summary>
        public PipelineRunnerState State { get; private set; }

        /// <summary>
        /// A method that tells the pipeline that one of the dependencies has
        /// completed consuming the input.
        /// </summary>
        /// <remarks>
        /// The outstanding consumer count is decremented atomically and the
        /// internal "consumers done" event is set only when the count lands
        /// exactly on zero.
        /// </remarks>
        public void ConsumerDoneWithOutputs()
        {
            int current = Interlocked.Decrement(ref this.waitingOnConsumers);

            this.logger.Verbose(
                "{Pipeline:l}: Consumer signalled, waiting for {Count:n0}",
                this.Pipeline,
                current);

            if (current == 0)
            {
                this.consumersDone.Set();
            }
        }

        /// <summary>
        /// Initializes the runner after all external properties have been
        /// set and configured.
        /// </summary>
        public void Initialize()
        {
            this.ChangeState(PipelineRunnerState.Initialized);
        }

        /// <summary>
        /// Executes the pipeline, including waiting for any or all
        /// dependencies.
        /// </summary>
        /// <remarks>
        /// Exceptions are not propagated to the caller: they are logged, the
        /// state becomes <see cref="PipelineRunnerState.Errored" />, waiting
        /// consumers are released and the incoming runners are signaled.
        /// </remarks>
        /// <returns>A task that completes when the run has finished or failed.</returns>
        public async Task RunAsync()
        {
            try
            {
                // Make sure we have a valid state.
                // NOTE(review): an invalid state is only reported here; the
                // run still proceeds afterwards, which matches the previous
                // behavior. Confirm whether it should abort instead.
                if (this.State is not (PipelineRunnerState.Initialized
                    or PipelineRunnerState.Finalized))
                {
                    this.logger.Error(
                        "{Pipeline:l}: Pipeline cannot be started in a {State}"
                        + " state (not Initialized or Finalized)",
                        this.Pipeline,
                        this.State);
                }

                // Prepare ourselves for running. We have a start/stop state because
                // this may be non-zero time.
                // NOTE(review): blockDependencies is not reset here, so on a
                // second run it may still be set from the previous one.
                DateTime now = DateTime.UtcNow;

                this.started = now;
                this.changed = now;
                this.ChangeState(PipelineRunnerState.Preparing);
                this.signaledDoneWithInputs = false;
                this.ChangeState(PipelineRunnerState.Prepared);

                // Go through the incoming and wait for each of the manual resets
                // on the dependency pipelines. A true result means one of
                // them errored and this runner has already been marked as
                // errored, so only the inputs need to be released.
                if (this.WaitForDependencies())
                {
                    this.SignalDoneWithInputs();
                    return;
                }

                // Grab the outputs from the incoming. They will be populated
                // because we have waited for the reset events.
                this.ChangeState(PipelineRunnerState.Started);
                List<Entity> input = this.GatherDependencyOutputs();

                // Run the pipeline and collect its output.
                await this.RunPipeline(input);

                // At this point, we are completely done with our inputs, so signal
                // to them in case they have to clean up any of their structures.
                this.SignalDoneWithInputs();

                // If we have outgoing runners, provide them data until they are
                // done.
                this.SendToDependants();

                // Finalize ourselves.
                this.ChangeState(PipelineRunnerState.Finalized);
            }
            catch (Exception exception)
            {
                // Report the exception.
                this.logger.Error(
                    exception,
                    "{Pipeline:l}: There was an exception running pipeline",
                    this.Pipeline);

                // Change our state and then release any pipeline waiting for us
                // so they can pick up the error and fail themselves.
                this.ChangeState(PipelineRunnerState.Errored);
                this.blockDependencies.Set();
                this.SignalDoneWithInputs();
            }
        }

        /// <summary>
        /// A method to block the call until this runner is done processing and
        /// is ready to provide output.
        /// </summary>
        /// <remarks>
        /// The call also returns when this runner has errored, so callers
        /// need to inspect <see cref="State" /> afterwards.
        /// </remarks>
        public void WaitUntilProviding()
        {
            this.blockDependencies.Wait();
        }

        /// <summary>
        /// Changes the state of the pipeline into a new state.
        /// </summary>
        /// <param name="newState">The state to change the pipeline into.</param>
        private void ChangeState(PipelineRunnerState newState)
        {
            // Log before updating so the reported duration is the time spent
            // in the state that is being left.
            this.logger.Verbose(
                "{Pipeline:l}: Switching from state {Old} to {New} (elapsed {Elapsed}, duration {Duration})",
                this.Pipeline,
                this.State,
                newState,
                this.ElapsedFromInitialized,
                this.ElapsedFromState);
            this.changed = DateTime.UtcNow;
            this.State = newState;
        }

        /// <summary>
        /// Collects the outputs of every incoming runner into a single list.
        /// </summary>
        /// <returns>
        /// A new list with the outputs of all incoming runners, or an empty
        /// list when there are no incoming runners.
        /// </returns>
        private List<Entity> GatherDependencyOutputs()
        {
            if (this.Incoming.Count <= 0)
            {
                return new List<Entity>();
            }

            // Report that we are gathering our outputs.
            this.logger.Verbose(
                "{Pipeline:l}: Gathering outputs from {Count:n0} dependencies",
                this.Pipeline,
                this.Incoming.Count);

            List<Entity> input = this.Incoming
                .SelectMany(x => x.Outputs)
                .ToList();

            this.logger.Debug(
                "{Pipeline:l}: Got {Count:l} from dependencies",
                this.Pipeline,
                "entity".ToQuantity(input.Count, "N0"));

            return input;
        }

        /// <summary>
        /// Runs the wrapped pipeline and replaces the contents of
        /// <see cref="Outputs" /> with its results.
        /// </summary>
        /// <param name="input">The entities to feed into the pipeline.</param>
        /// <returns>A task that completes once the outputs are stored.</returns>
        private async Task RunPipeline(List<Entity> input)
        {
            IEnumerable<Entity>
                output = await this.Pipeline.RunAsync(input);

            // Gather all the output.
            this.logger.Debug(
                "{Pipeline:l}: Gathering output",
                this.Pipeline);
            this.Outputs.Clear();
            this.Outputs.AddRange(output);
        }

        /// <summary>
        /// Releases the consumers waiting on this runner and blocks until
        /// every one of them has reported being done with the outputs. Does
        /// nothing when there are no outgoing runners.
        /// </summary>
        private void SendToDependants()
        {
            if (this.Outgoing.Count <= 0)
            {
                return;
            }

            // Make sure our internal wait for the consumers is set. The
            // counter and event are prepared before consumers are released
            // below.
            this.logger.Verbose(
                "{Pipeline:l}: Setting up internal thread controls",
                this.Pipeline);
            this.waitingOnConsumers = this.Outgoing.Count;
            this.consumersDone.Reset();

            // Report how many entities we're sending out.
            this.logger.Debug(
                "{Pipeline:l}: Output {Count:l} from pipeline",
                this.Pipeline,
                "entity".ToQuantity(this.Outputs.Count, "N0"));

            // Release our manual reset to allow operations to continue.
            this.ChangeState(PipelineRunnerState.Providing);
            this.logger.Verbose(
                "{Pipeline:l}: Release manual reset for consumers",
                this.Pipeline);
            this.blockDependencies.Set();

            // Wait until all consumers have finished processing.
            this.consumersDone.Wait();
        }

        /// <summary>
        /// Tells every incoming runner that this runner is done with its
        /// outputs. Does nothing when there are no incoming runners or when
        /// the signal has already been sent during the current run.
        /// </summary>
        private void SignalDoneWithInputs()
        {
            if (this.Incoming.Count <= 0 || this.signaledDoneWithInputs)
            {
                return;
            }

            this.signaledDoneWithInputs = true;

            this.logger.Debug(
                "{Pipeline:l}: Signaling {Count:n0} dependencies done",
                this.Pipeline,
                this.Incoming.Count);

            foreach (var dependency in this.Incoming)
            {
                dependency.ConsumerDoneWithOutputs();
            }
        }

        /// <summary>
        /// Blocks until every incoming runner has released its consumers and
        /// then checks whether any of them ended up in the errored state.
        /// </summary>
        /// <returns>
        /// True when at least one incoming runner errored, in which case
        /// this runner is marked as errored and its own consumers are
        /// released; otherwise false (including when there are no incoming
        /// runners).
        /// </returns>
        private bool WaitForDependencies()
        {
            if (this.Incoming.Count <= 0)
            {
                return false;
            }

            // Wait for the dependencies to run first.
            this.ChangeState(PipelineRunnerState.Waiting);
            this.logger.Verbose(
                "{Pipeline:l}: Waiting for {Count:l} to complete",
                this.Pipeline,
                "dependency".ToQuantity(this.Incoming.Count));

            foreach (var dependency in this.Incoming)
            {
                dependency.WaitUntilProviding();
            }

            // Check for any error state in the dependency, if we have one,
            // then we need to stop ourselves.
            bool hasError = this.Incoming
                .Any(x => x.State == PipelineRunnerState.Errored);

            if (!hasError)
            {
                return false;
            }

            this.logger.Error(
                "{Pipeline:l}: There was an exception in an dependency",
                this.Pipeline);
            this.ChangeState(PipelineRunnerState.Errored);
            this.blockDependencies.Set();

            return true;
        }
    }
}
|