fix: code clean up and honoring cancellation

This commit is contained in:
D. Moonfire 2023-08-02 15:46:19 -05:00
parent 6ae43e5097
commit 3ddb2fd236

View file

@ -35,6 +35,8 @@ public class PipelineRunner
private readonly RunnerStateMachine state;
private CancellationToken cancellation;
private DateTime changed;
private List<Entity> outputs;
@ -163,11 +165,6 @@ public class PipelineRunner
/// </summary>
public ICollection<PipelineRunner> Incoming { get; }
/// <summary>
/// Gets a value indicating whether the pipeline failed.
/// </summary>
public bool IsErrored => this.state.State == PipelineRunnerState.Errored;
/// <summary>
/// Gets a value indicating whether the output of this entry is not
/// used by any other pipeline.
@ -208,6 +205,11 @@ public class PipelineRunner
/// </summary>
public PipelineRunnerState State => this.state.State;
/// <summary>
/// Gets a value indicating whether the pipeline failed.
/// </summary>
private bool IsErrored => this.state.State == PipelineRunnerState.Errored;
/// <summary>
/// Resets the internal state for running again. This also goes through
/// </summary>
@ -223,6 +225,7 @@ public class PipelineRunner
/// </summary>
public async Task RunAsync(CancellationToken cancellationToken = default)
{
this.cancellation = cancellationToken;
await this.state.FireAsync(PipelineRunnerTrigger.StartPipeline);
}
@ -297,7 +300,7 @@ public class PipelineRunner
this.outgoingBlock.Set();
// Wait until all consumers have finished processing.
this.outgoingDone.Wait();
this.outgoingDone.Wait(this.cancellation);
this.state.Fire(PipelineRunnerTrigger.ProvidingDone);
}
@ -306,7 +309,7 @@ public class PipelineRunner
try
{
List<Entity> input = this.GetInputFromIncoming();
await this.RunPipeline(input, default);
await this.RunPipeline(input);
await this.state.FireAsync(PipelineRunnerTrigger.FinishPipeline);
}
catch (Exception exception)
@ -378,39 +381,7 @@ public class PipelineRunner
this.changed = DateTime.Now;
}
private void OnUnlockOutgoing()
{
this.outgoingBlock.Set();
}
private void OnWaiting()
{
this.observers.OnWaitForIncomingStarted(this);
foreach (PipelineRunner dependency in this.Incoming)
{
dependency.WaitUntilIncomingReady();
}
// Check for any error state in the dependency, if we have one,
// then we need to stop ourselves and any dependency that is waiting
// on us.
bool hasError = this.Incoming
.Any(x => x.state.State == PipelineRunnerState.Errored);
if (!hasError)
{
this.state.Fire(PipelineRunnerTrigger.UnlockOutputs);
return;
}
this.logger.Error("There was an exception in a dependency");
this.state.Fire(PipelineRunnerTrigger.PipelineFailed);
}
private async Task RunPipeline(
List<Entity> input,
CancellationToken cancellationToken)
private async Task RunPipeline(List<Entity> input)
{
// Let the observers be aware of the input starting. Even thought we
// are using `Select` here, it doesn't collect/resolve the list until
@ -427,8 +398,8 @@ public class PipelineRunner
});
List<Entity> output = await this.Pipeline
.RunAsync(observedInput, cancellationToken)
.ToListAsync(cancellationToken);
.RunAsync(observedInput, this.cancellation)
.ToListAsync(this.cancellation);
this.observers.OnRunFinished(this);
this.outputs.Clear();
@ -441,6 +412,6 @@ public class PipelineRunner
/// </summary>
private void WaitUntilIncomingReady()
{
this.outgoingBlock.Wait();
this.outgoingBlock.Wait(this.cancellation);
}
}