feat: refactored how the pipeline runners are structured

This commit is contained in:
D. Moonfire 2023-08-02 03:41:14 -05:00
parent 7ec38c160d
commit f32eca146e
26 changed files with 541 additions and 213 deletions

View file

@ -87,6 +87,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MfGames.Serilog.SpectreExpr
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NitrideCopyFiles", "examples\NitrideCopyFiles\NitrideCopyFiles.csproj", "{1843ECA6-18FD-4CE3-BCD5-6B478C4F893D}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NitridePipelines", "examples\NitridePipelines\NitridePipelines.csproj", "{B044CB47-0024-4338-A56B-DCC049E06DED}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@ -568,6 +570,18 @@ Global
{1843ECA6-18FD-4CE3-BCD5-6B478C4F893D}.Release|x64.Build.0 = Release|Any CPU
{1843ECA6-18FD-4CE3-BCD5-6B478C4F893D}.Release|x86.ActiveCfg = Release|Any CPU
{1843ECA6-18FD-4CE3-BCD5-6B478C4F893D}.Release|x86.Build.0 = Release|Any CPU
{B044CB47-0024-4338-A56B-DCC049E06DED}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{B044CB47-0024-4338-A56B-DCC049E06DED}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B044CB47-0024-4338-A56B-DCC049E06DED}.Debug|x64.ActiveCfg = Debug|Any CPU
{B044CB47-0024-4338-A56B-DCC049E06DED}.Debug|x64.Build.0 = Debug|Any CPU
{B044CB47-0024-4338-A56B-DCC049E06DED}.Debug|x86.ActiveCfg = Debug|Any CPU
{B044CB47-0024-4338-A56B-DCC049E06DED}.Debug|x86.Build.0 = Debug|Any CPU
{B044CB47-0024-4338-A56B-DCC049E06DED}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B044CB47-0024-4338-A56B-DCC049E06DED}.Release|Any CPU.Build.0 = Release|Any CPU
{B044CB47-0024-4338-A56B-DCC049E06DED}.Release|x64.ActiveCfg = Release|Any CPU
{B044CB47-0024-4338-A56B-DCC049E06DED}.Release|x64.Build.0 = Release|Any CPU
{B044CB47-0024-4338-A56B-DCC049E06DED}.Release|x86.ActiveCfg = Release|Any CPU
{B044CB47-0024-4338-A56B-DCC049E06DED}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{5253E2A6-9565-45AF-92EA-1BFD3A63AC23} = {9C845D9A-B359-43B3-AE9E-B84CE945AF21}
@ -609,5 +623,6 @@ Global
{D58365E6-E98B-4A04-8447-4B9417850D85} = {F79B6838-B175-43A3-8C52-69A414CC1386}
{25457946-9CD0-498E-8B46-03C420CCF103} = {9C845D9A-B359-43B3-AE9E-B84CE945AF21}
{1843ECA6-18FD-4CE3-BCD5-6B478C4F893D} = {F79B6838-B175-43A3-8C52-69A414CC1386}
{B044CB47-0024-4338-A56B-DCC049E06DED} = {F79B6838-B175-43A3-8C52-69A414CC1386}
EndGlobalSection
EndGlobal

View file

@ -9,6 +9,8 @@ using MfGames.Nitride.IO.Directories;
using MfGames.Nitride.IO.Paths;
using MfGames.Nitride.Pipelines;
using Serilog;
namespace CopyFiles;
/// <summary>

1
examples/NitridePipelines/.gitignore vendored Normal file
View file

@ -0,0 +1 @@
output/

View file

@ -0,0 +1,44 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using MfGames.Gallium;
using MfGames.Nitride.Pipelines;
using Serilog;
using Zio;
namespace NitridePipelines;
public class DelayPipeline1 : PipelineBase
{
private readonly ILogger logger;
public DelayPipeline1(
ILogger logger,
InputPipeline1 input1)
{
this.logger = logger.ForContext<DelayPipeline1>();
this.AddDependency(input1);
}
/// <inheritdoc />
public override IAsyncEnumerable<Entity> RunAsync(
IEnumerable<Entity> entities,
CancellationToken cancellationToken = default)
{
entities = entities
.Select(
entity =>
{
Thread.Sleep(1000);
this.logger.Information(
"Delayed {Value}",
entity.Get<UPath>());
return entity;
});
return entities.ToAsyncEnumerable();
}
}

View file

@ -0,0 +1,30 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using MfGames.Gallium;
using MfGames.Nitride.IO.Contents;
using MfGames.Nitride.Pipelines;
namespace NitridePipelines;
public class InputPipeline1 : PipelineBase
{
private readonly ReadFiles readFiles;
public InputPipeline1(ReadFiles readFiles)
{
this.readFiles = readFiles
.WithPattern("/input/input1/*.txt");
}
/// <inheritdoc />
public override IAsyncEnumerable<Entity> RunAsync(
IEnumerable<Entity> _,
CancellationToken cancellationToken = default)
{
IEnumerable<Entity> entities = this.readFiles.Run();
return entities.ToAsyncEnumerable();
}
}

View file

@ -0,0 +1,30 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using MfGames.Gallium;
using MfGames.Nitride.IO.Contents;
using MfGames.Nitride.Pipelines;
namespace NitridePipelines;
public class InputPipeline2 : PipelineBase
{
private readonly ReadFiles readFiles;
public InputPipeline2(ReadFiles readFiles)
{
this.readFiles = readFiles
.WithPattern("/input/input2/*.txt");
}
/// <inheritdoc />
public override IAsyncEnumerable<Entity> RunAsync(
IEnumerable<Entity> _,
CancellationToken cancellationToken = default)
{
IEnumerable<Entity> entities = this.readFiles.Run();
return entities.ToAsyncEnumerable();
}
}

View file

@ -0,0 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\MfGames.Nitride.IO\MfGames.Nitride.IO.csproj" />
<ProjectReference Include="..\..\src\MfGames.Nitride\MfGames.Nitride.csproj" />
</ItemGroup>
</Project>

View file

@ -0,0 +1,19 @@
using Autofac;
namespace NitridePipelines;
public class NitridePipelinesModule : Module
{
/// <inheritdoc />
protected override void Load(ContainerBuilder builder)
{
// This just registers all the non-static classes as singletons
// within the system. We use lifetimes in other components depending
// on how they are used, but in this case, we don't need it.
builder
.RegisterAssemblyTypes(this.GetType().Assembly)
.AsSelf()
.AsImplementedInterfaces()
.SingleInstance();
}
}

View file

@ -0,0 +1,31 @@
using System.IO;
using System.Threading.Tasks;
using Autofac;
using MfGames.IO.Extensions;
using MfGames.Nitride;
using MfGames.Nitride.IO;
namespace NitridePipelines;
/// <summary>
/// Main entry point into the CopyFiles sample generator.
/// </summary>
public static class NitridePipelinesProgram
{
public static async Task<int> Main(string[] args)
{
DirectoryInfo rootDir = typeof(NitridePipelinesProgram)
.GetDirectory()!
.FindGitRoot()!
.GetDirectory("examples/NitridePipelines");
return await new NitrideBuilder(args)
.UseIO()
.WithRootDirectory(rootDir)
.ConfigureContainer(
x => x.RegisterModule<NitridePipelinesModule>())
.RunAsync();
}
}

View file

@ -0,0 +1,47 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using MfGames.Gallium;
using MfGames.Nitride.Pipelines;
using Serilog;
using Zio;
namespace NitridePipelines;
public class OutputPipeline1 : PipelineBase
{
private readonly ILogger logger;
public OutputPipeline1(
ILogger logger,
DelayPipeline1 delay1,
InputPipeline2 input2)
{
this.logger = logger.ForContext<OutputPipeline1>();
this.AddDependency(delay1, input2);
}
/// <inheritdoc />
public override IAsyncEnumerable<Entity> RunAsync(
IEnumerable<Entity> entities,
CancellationToken cancellationToken = default)
{
entities = entities
.Select(
entity =>
{
Thread.Sleep(1000);
this.logger.Information(
"Pretended to write {Value}",
entity.Get<UPath>());
return entity;
});
return entities.ToAsyncEnumerable();
}
}

View file

@ -0,0 +1,46 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using MfGames.Gallium;
using MfGames.Nitride.Pipelines;
using Serilog;
using Zio;
namespace NitridePipelines;
public class OutputPipeline2 : PipelineBase
{
private readonly ILogger logger;
public OutputPipeline2(
ILogger logger,
InputPipeline2 input2)
{
this.logger = logger.ForContext<OutputPipeline2>();
this.AddDependency(input2);
}
/// <inheritdoc />
public override IAsyncEnumerable<Entity> RunAsync(
IEnumerable<Entity> entities,
CancellationToken cancellationToken = default)
{
entities = entities
.Select(
entity =>
{
Thread.Sleep(1000);
this.logger.Information(
"Pretended to write {Value}",
entity.Get<UPath>());
return entity;
});
return entities.ToAsyncEnumerable();
}
}

View file

@ -0,0 +1,6 @@
# MfGames.Nitride - Copy Files
This is probably the most basic generator possible. It simply copies files from
the input and places them into the output. However, it also demonstrates a basic
setup including creating a pipeline, wiring everything up with modules, and
configuring everything.

View file

@ -28,7 +28,7 @@ public partial class WriteFiles : FileSystemOperationBase, IOperation
IFileSystem fileSystem)
: base(fileSystem)
{
this.Logger = logger;
this.Logger = logger.ForContext<WriteFiles>();
this.validator = validator;
this.TextEncoding = Encoding.UTF8;

View file

@ -26,7 +26,7 @@ public partial class ClearDirectory : FileSystemOperationBase, IOperation
ILogger logger)
: base(fileSystem)
{
this.Logger = logger;
this.Logger = logger.ForContext<ClearDirectory>();
this.validator = validator;
}

View file

@ -21,24 +21,24 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Autofac" Version="7.0.1"/>
<PackageReference Include="Autofac.Extensions.DependencyInjection" Version="8.0.0"/>
<PackageReference Include="FluentValidation" Version="11.6.0"/>
<PackageReference Include="Autofac" Version="7.0.1" />
<PackageReference Include="Autofac.Extensions.DependencyInjection" Version="8.0.0" />
<PackageReference Include="FluentValidation" Version="11.6.0" />
<PackageReference Include="GitVersion.MSBuild" Version="5.12.0">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
<PackageReference Include="Humanizer.Core" Version="2.14.1"/>
<PackageReference Include="Microsoft.Extensions.Hosting" Version="7.0.1"/>
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="7.0.0"/>
<PackageReference Include="Serilog" Version="3.0.1"/>
<PackageReference Include="Serilog.Extensions.Autofac.DependencyInjection" Version="5.0.0"/>
<PackageReference Include="Serilog.Extensions.Hosting" Version="7.0.0"/>
<PackageReference Include="Serilog.Sinks.Console" Version="4.1.0"/>
<PackageReference Include="SerilogAnalyzer" Version="0.15.0"/>
<PackageReference Include="System.CommandLine" Version="2.0.0-beta4.22272.1"/>
<PackageReference Include="System.Linq.Async" Version="6.0.1"/>
<PackageReference Include="Zio" Version="0.16.2"/>
<PackageReference Include="Humanizer.Core" Version="2.14.1" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="7.0.1" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="7.0.0" />
<PackageReference Include="Serilog" Version="3.0.1" />
<PackageReference Include="Serilog.Extensions.Autofac.DependencyInjection" Version="5.0.0" />
<PackageReference Include="Serilog.Extensions.Hosting" Version="7.0.0" />
<PackageReference Include="Serilog.Sinks.Console" Version="4.1.0" />
<PackageReference Include="SerilogAnalyzer" Version="0.15.0" />
<PackageReference Include="System.CommandLine" Version="2.0.0-beta4.22272.1" />
<PackageReference Include="System.Linq.Async" Version="6.0.1" />
<PackageReference Include="Zio" Version="0.16.2" />
</ItemGroup>
<!-- Include the source generator -->
@ -47,12 +47,12 @@
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\MfGames.Gallium\MfGames.Gallium.csproj"/>
<ProjectReference Include="..\MfGames.Gallium\MfGames.Gallium.csproj" />
<ProjectReference Include="..\MfGames.Nitride.Generators\MfGames.Nitride.Generators.csproj">
<OutputItemType>Analyzer</OutputItemType>
<ReferenceOutputAssembly>False</ReferenceOutputAssembly>
</ProjectReference>
<ProjectReference Include="..\MfGames.ToolBuilder\MfGames.ToolBuilder.csproj"/>
<ProjectReference Include="..\MfGames.ToolBuilder\MfGames.ToolBuilder.csproj" />
</ItemGroup>
</Project>

View file

@ -20,7 +20,8 @@ public class NitrideModule : Module
protected override void Load(ContainerBuilder builder)
{
// Pipelines
builder.RegisterType<PipelineRunner>()
builder
.RegisterType<PipelineRunner>()
.AsSelf();
builder.RegisterType<PipelineManager>()
@ -39,35 +40,36 @@ public class NitrideModule : Module
// MfGames.ToolBuilder requires the RootCommand to be registered. This is because
// of various things, mostly coordinating between different systems.
builder.Register(
c =>
{
// Create the new root command.
var root = new RootCommand();
if (!string.IsNullOrWhiteSpace(this.ApplicationName))
{
root.Name = this.ApplicationName;
}
if (!string.IsNullOrWhiteSpace(this.Description))
{
root.Description = this.Description;
}
;
// Add in the commands.
IEnumerable<Command> commands =
c.Resolve<IEnumerable<Command>>();
foreach (Command command in commands)
{
root.AddCommand(command);
}
return root;
})
builder
.Register(this.CreateRootCommand)
.AsSelf();
}
private RootCommand CreateRootCommand(IComponentContext c)
{
// Create the new root command.
var root = new RootCommand();
if (!string.IsNullOrWhiteSpace(this.ApplicationName))
{
root.Name = this.ApplicationName;
}
if (!string.IsNullOrWhiteSpace(this.Description))
{
root.Description = this.Description;
}
;
// Add in the commands.
IEnumerable<Command> commands = c.Resolve<IEnumerable<Command>>();
foreach (Command command in commands)
{
root.AddCommand(command);
}
return root;
}
}

View file

@ -1,5 +1,7 @@
using MfGames.Gallium;
using Serilog;
namespace MfGames.Nitride.Pipelines;
/// <summary>

View file

@ -11,32 +11,21 @@ namespace MfGames.Nitride.Pipelines;
/// </summary>
public class PipelineManager
{
private readonly PipelineRunner.Factory createEntry;
private readonly ILogger logger;
private List<PipelineRunner> entries;
private List<PipelineRunner> runners;
private bool isSetup;
private ICollection<IPipeline> pipelines;
public PipelineManager(
ILogger logger,
IEnumerable<IPipeline> pipelines,
PipelineRunner.Factory createEntry)
PipelineRunner.Factory runnerFactory)
{
this.createEntry = createEntry;
this.logger = logger.ForContext<PipelineManager>();
this.pipelines = new HashSet<IPipeline>(pipelines);
this.entries = null!;
}
public ICollection<IPipeline> Pipelines
{
get => this.pipelines;
set => this.pipelines =
value ?? throw new ArgumentNullException(nameof(value));
this.runners = pipelines
.Select(pipeline => runnerFactory(pipeline))
.ToList();
}
/// <summary>
@ -59,9 +48,9 @@ public class PipelineManager
// resulting tasks and then wait for all of them to end.
this.logger.Verbose(
"Starting {Count:l}",
"pipeline".ToQuantity(this.pipelines.Count));
"pipeline".ToQuantity(this.runners.Count));
Task[] tasks = this.entries
Task[] tasks = this.runners
.Select(
x => Task.Run(
async () => await x.RunAsync(cancellationToken),
@ -72,7 +61,7 @@ public class PipelineManager
while (!Task.WaitAll(tasks, report))
{
var waiting = this.entries
var waiting = this.runners
.Where(x => !x.IsFinished)
.ToList();
@ -107,7 +96,7 @@ public class PipelineManager
}
// Figure out our return code.
bool hasErrors = this.entries
bool hasErrors = this.runners
.Any(x => x.State == PipelineRunnerState.Errored);
this.logger.Information(
@ -130,7 +119,7 @@ public class PipelineManager
}
// If we don't have any pipelines, then we can't process.
if (this.pipelines.Count == 0)
if (this.runners.Count == 0)
{
this.logger.Error(
"There are no registered pipelines run, use"
@ -141,16 +130,12 @@ public class PipelineManager
this.logger.Verbose(
"Setting up {Count:l}",
"pipeline".ToQuantity(this.pipelines.Count));
"pipeline".ToQuantity(this.runners.Count));
// Wrap all the pipelines into entries. We do this before the next
// step so we can have the entries depend on the entries.
this.entries = this.pipelines
.Select(x => this.createEntry(x))
.ToList();
// Go through and connect the pipelines together.
foreach (PipelineRunner? entry in this.entries)
// Go through and connect the pipelines together using the dependencies
// that were built through the constructors of the pipelines and then
// registered with `AddDependency`.
foreach (PipelineRunner? entry in this.runners)
{
var dependencies = entry.Pipeline
.GetDependencies()
@ -159,7 +144,7 @@ public class PipelineManager
foreach (IPipeline? dependency in dependencies)
{
// Get the entry for the dependency.
PipelineRunner dependencyPipeline = this.entries
PipelineRunner dependencyPipeline = this.runners
.Single(x => x.Pipeline == dependency);
// Set up the bi-directional connection.
@ -170,9 +155,9 @@ public class PipelineManager
// Loop through all the entries and tell them we are done providing
// and they can set up internal threads other structures.
foreach (PipelineRunner? entry in this.entries)
foreach (PipelineRunner runner in this.runners)
{
entry.Initialize();
runner.Setup();
}
// We have run successfully.

View file

@ -16,20 +16,22 @@ namespace MfGames.Nitride.Pipelines;
/// </remarks>
public class PipelineRunner
{
private readonly ILogger logger;
/// <summary>
/// The manual reset event used to coordinate thread operations.
/// </summary>
private readonly ManualResetEventSlim blockDependencies;
private readonly ManualResetEventSlim outgoingBlock;
/// <summary>
/// A manual reset event to tell the thread when consumers are done.
/// </summary>
private readonly ManualResetEventSlim consumersDone;
private readonly ILogger logger;
private readonly ManualResetEventSlim outgoingDone;
private DateTime changed;
private List<Entity> outputs;
private bool signaledDoneWithInputs;
private DateTime started;
@ -44,15 +46,15 @@ public class PipelineRunner
ILogger logger,
IPipeline pipeline)
{
this.Pipeline =
pipeline ?? throw new ArgumentNullException(nameof(pipeline));
this.Pipeline = pipeline
?? throw new ArgumentNullException(nameof(pipeline));
this.Incoming = new List<PipelineRunner>();
this.Outgoing = new List<PipelineRunner>();
this.Outputs = new List<Entity>();
this.logger = logger.ForContext<PipelineRunner>();
this.blockDependencies = new ManualResetEventSlim(false);
this.consumersDone = new ManualResetEventSlim(false);
this.started = DateTime.Now;
this.outputs = new List<Entity>();
this.logger = logger
.ForContext(this.Pipeline.GetType());
this.outgoingBlock = new ManualResetEventSlim(false);
this.outgoingDone = new ManualResetEventSlim(false);
this.changed = DateTime.Now;
}
@ -79,12 +81,13 @@ public class PipelineRunner
/// <summary>
/// Gets a value indicating whether this pipeline is done running.
/// </summary>
public bool IsFinished => this.State is PipelineRunnerState.Finalized
public bool IsFinished => this.State
is PipelineRunnerState.Finalized
or PipelineRunnerState.Errored;
/// <summary>
/// Gets a value indicating whether this entry is a starting one
/// that consumes no data.
/// Gets a value indicating whether this entry is one that has no
/// dependencies and therefore could be considered a starting pipeline.
/// </summary>
public bool IsStarting => this.Incoming.Count == 0;
@ -95,14 +98,7 @@ public class PipelineRunner
public ICollection<PipelineRunner> Outgoing { get; }
/// <summary>
/// Contains the list of all the outputs from this pipeline. This is
/// only ensured to be valid after the pipeline is in the `Providing`
/// state.
/// </summary>
public List<Entity> Outputs { get; }
/// <summary>
/// The pipeline associated with the entry.
/// The pipeline associated with the runner.
/// </summary>
public IPipeline Pipeline { get; }
@ -112,30 +108,46 @@ public class PipelineRunner
public PipelineRunnerState State { get; private set; }
/// <summary>
/// A method that tells the pipeline that one of the dependencies has
/// completed consuming the input.
/// A method that tells the pipeline one of the outgoing pipelines has
/// completed consuming the output from this runner.
/// </summary>
public void ConsumerDoneWithOutputs()
public void ConsumerDone(PipelineRunner runner)
{
int current = Interlocked.Decrement(ref this.waitingOnConsumers);
this.logger.Verbose(
"{Pipeline:l}: Consumer signalled, waiting for {Count:n0}",
this.Pipeline,
"{Runner} signalled, waiting for {Count:n0} more",
runner,
current);
if (current == 0)
{
this.consumersDone.Set();
this.outgoingDone.Set();
}
}
/// <summary>
/// Initializes the runner after all external properties have been
/// set and configured.
/// Contains the list of all the outputs from this pipeline. This is
/// only ensured to be valid after the pipeline is in the `Providing`
/// state.
/// </summary>
public void Initialize()
public List<Entity> GetOutputs()
{
return !this.IsValidState(
PipelineRunnerState.Providing,
PipelineRunnerState.Started,
PipelineRunnerState.Restarted)
? new List<Entity>()
: this.outputs;
}
/// <summary>
/// Resets the internal state for running again. This also goes through
/// </summary>
public void Reset()
{
this.started = DateTime.Now;
this.outputs = new List<Entity>();
this.ChangeState(PipelineRunnerState.Initialized);
}
@ -148,55 +160,46 @@ public class PipelineRunner
try
{
// Make sure we have a valid state.
switch (this.State)
if (!this.IsValidState(
PipelineRunnerState.Initialized,
PipelineRunnerState.Restarted,
PipelineRunnerState.Finalized))
{
case PipelineRunnerState.Initialized:
case PipelineRunnerState.Finalized:
break;
default:
this.logger.Error(
"{Pipeline:l}: Pipeline cannot be started in a {State}"
+ " state (not Initialized or Finalized)",
this.Pipeline,
this.State);
break;
return;
}
// Prepare ourselves for running. We have a start/stop state because
// this may be non-zero time.
this.started = DateTime.Now;
this.changed = DateTime.Now;
this.ChangeState(PipelineRunnerState.Preparing);
this.signaledDoneWithInputs = false;
this.ChangeState(PipelineRunnerState.Prepared);
// Go through the incoming and wait for each of the manual resets
// on the dependency pipelines.
if (this.WaitForDependencies())
{
this.SignalDoneWithInputs();
// on the dependency pipelines. If there is an error, then we will
// indicate to our dependencies that we're done processing but
// nothing will happen because our error state will propagate out.
this.WaitForIncoming();
if (this.State == PipelineRunnerState.Errored)
{
this.SendDoneToIncoming();
return;
}
// Grab the outputs from the incoming. They will be populated
// because we have waited for the reset events.
this.ChangeState(PipelineRunnerState.Started);
List<Entity> input = this.GatherDependencyOutputs();
List<Entity> input = this.GetInputFromIncoming();
// Run the pipeline. This may not be resolved until we gather
// the output below.
await this.RunPipeline(input, cancellationToken);
// At this point, we are completely done with our inputs, so signal
// to them in case they have to clean up any of their structures.
this.SignalDoneWithInputs();
// If we have outgoing runners, provide them data until they are
// done.
this.SendToDependants();
// If we have outgoing runners, provide them with the entities we've
// produced and start providing those values to to them. This will
// block until the dependencies are done consuming.
this.UnlockOutgoing();
// Finalize ourselves.
this.ChangeState(PipelineRunnerState.Finalized);
@ -206,24 +209,30 @@ public class PipelineRunner
// Report the exception.
this.logger.Error(
exception,
"{Pipeline:l}: There was an exception running pipeline",
this.Pipeline);
"There was an exception running pipeline");
// Change our state and then release any pipeline waiting for us
// so they can pick up the error and fail themselves.
this.ChangeState(PipelineRunnerState.Errored);
this.blockDependencies.Set();
this.SignalDoneWithInputs();
this.UnlockOutgoingAsErrored();
}
}
/// <summary>
/// A method to block the call until this runner is done processing and
/// is ready to provide output.
/// Initializes the runner after all external properties have been
/// set and configured.
/// </summary>
public void WaitUntilProviding()
public void Setup()
{
this.blockDependencies.Wait();
this.started = DateTime.Now;
this.outputs = new List<Entity>();
this.ChangeState(PipelineRunnerState.Initialized);
}
/// <inheritdoc />
public override string ToString()
{
return $"PipelineRunner<{this.Pipeline}>";
}
/// <summary>
@ -233,8 +242,7 @@ public class PipelineRunner
private void ChangeState(PipelineRunnerState newState)
{
this.logger.Verbose(
"{Pipeline:l}: Switching from state {Old} to {New} (elapsed {Elapsed}, duration {Duration})",
this.Pipeline,
"Switching from state {Old} to {New} (elapsed {Elapsed}, duration {Duration})",
this.State,
newState,
this.ElapsedFromInitialized,
@ -244,8 +252,9 @@ public class PipelineRunner
this.State = newState;
}
private List<Entity> GatherDependencyOutputs()
private List<Entity> GetInputFromIncoming()
{
// If we have no incoming dependencies, then there is nothing to gather.
if (this.Incoming.Count <= 0)
{
return new List<Entity>();
@ -253,21 +262,50 @@ public class PipelineRunner
// Report that we are gathering our outputs.
this.logger.Verbose(
"{Pipeline:l}: Gathering outputs from {Count:n0} dependencies",
this.Pipeline,
"Gathering outputs from {Count:n0} dependencies",
this.Incoming.Count);
var input = this.Incoming.SelectMany(x => x.Outputs)
// Gather all the entities from the dependencies into a single list.
var input = this.Incoming
.SelectMany(x => x.GetOutputs())
.ToList();
this.logger.Debug(
"{Pipeline:l}: Got {Count:l} from dependencies",
this.Pipeline,
"Got {Count:l} from dependencies",
"entity".ToQuantity(input.Count, "N0"));
// Since we gathered all the inputs, we can have this thread do its
// signalling while not waiting for the pipeline to finish.
this.SendDoneToIncoming();
return input;
}
/// <summary>
/// Ensures that the pipeline runner is in the correct state.
/// </summary>
/// <param name="states">The states that the pipeline runner is considered valid.</param>
private bool IsValidState(params PipelineRunnerState[] states)
{
// If we are in any of the given states, then we're good and nothing
// will happen.
if (states.Any(a => a == this.State))
{
return true;
}
// Otherwise, we are in an invalid state.
this.logger.Error(
"Pipeline is in an invalid state of {State}"
+ " (not {ValidStates})",
this.State,
states);
this.State = PipelineRunnerState.Errored;
return false;
}
private async Task RunPipeline(
List<Entity> input,
CancellationToken cancellationToken)
@ -277,45 +315,13 @@ public class PipelineRunner
.RunAsync(input, cancellationToken)
.ToListAsync(cancellationToken);
// Gather all the output.
this.logger.Verbose("{Pipeline:l}: Gathering output", this.Pipeline);
this.Outputs.Clear();
this.Outputs.AddRange(output);
// Gather all the output and drain the inputs.
this.logger.Verbose("Gathering output from incoming pipelines");
this.outputs.Clear();
this.outputs.AddRange(output);
}
private void SendToDependants()
{
if (this.Outgoing.Count <= 0)
{
return;
}
// Make sure our internal wait for the consumers it set.
this.logger.Verbose(
"{Pipeline:l}: Setting up internal thread controls",
this.Pipeline);
this.waitingOnConsumers = this.Outgoing.Count;
this.consumersDone.Reset();
// Report how many files we're sending out and then use manual
// reset and the semaphore to control the threads.
this.logger.Debug(
"{Pipeline:l}: Output {Count:l} from pipeline",
this.Pipeline,
"entity".ToQuantity(this.Outputs.Count, "N0"));
// Release our manual reset to allow operations to continue.
this.ChangeState(PipelineRunnerState.Providing);
this.logger.Verbose(
"{Pipeline:l}: Release manual reset for consumers",
this.Pipeline);
this.blockDependencies.Set();
// Wait until all consumers have finished processing.
this.consumersDone.Wait();
}
private void SignalDoneWithInputs()
private void SendDoneToIncoming()
{
if (this.Incoming.Count <= 0 || this.signaledDoneWithInputs)
{
@ -325,52 +331,95 @@ public class PipelineRunner
this.signaledDoneWithInputs = true;
this.logger.Verbose(
"{Pipeline:l}: Signaling {Count:n0} dependencies done",
this.Pipeline,
"Signaling {Count:n0} dependencies done",
this.Incoming.Count);
foreach (PipelineRunner? dependency in this.Incoming)
foreach (PipelineRunner dependency in this.Incoming)
{
dependency.ConsumerDoneWithOutputs();
dependency.ConsumerDone(this);
}
}
private bool WaitForDependencies()
private void UnlockOutgoing()
{
// If we don't have any outgoing pipelines, then there is nothing to
// do and we can finish running.
if (this.Outgoing.Count <= 0)
{
return;
}
// Make sure our internal wait for the consumers it set.
this.logger.Verbose("Setting up internal thread controls");
this.waitingOnConsumers = this.Outgoing.Count;
this.outgoingDone.Reset();
// Report how many files we're sending out and then use manual
// reset and the semaphore to control the threads.
this.logger.Debug(
"Output {Count:l} from pipeline",
"entity".ToQuantity(this.GetOutputs().Count, "N0"));
// Release our manual reset to allow operations to continue.
this.ChangeState(PipelineRunnerState.Providing);
this.logger.Verbose("Release manual reset for consumers");
this.outgoingBlock.Set();
// Wait until all consumers have finished processing.
this.outgoingDone.Wait();
}
private void UnlockOutgoingAsErrored()
{
this.ChangeState(PipelineRunnerState.Errored);
this.outgoingBlock.Set();
}
/// <summary>
/// Waits for all the incoming pipelines to be completed and ready to provide
/// us input before returning.
/// </summary>
private void WaitForIncoming()
{
// If we have no incoming pipelines, then there is nothing to wait for.
if (this.Incoming.Count <= 0)
{
return false;
return;
}
// Wait for the dependencies to run first.
this.ChangeState(PipelineRunnerState.Waiting);
this.logger.Verbose(
"{Pipeline:l}: Waiting for {Count:l} to complete",
this.Pipeline,
"Waiting for {Count:l} to complete",
"dependency".ToQuantity(this.Incoming.Count));
foreach (PipelineRunner? dependency in this.Incoming)
foreach (PipelineRunner dependency in this.Incoming)
{
dependency.WaitUntilProviding();
dependency.WaitUntilIncomingReady();
}
// Check for any error state in the dependency, if we have one,
// then we need to stop ourselves.
bool hasError =
this.Incoming.Any(x => x.State == PipelineRunnerState.Errored);
// then we need to stop ourselves and any dependency that is waiting
// on us.
bool hasError = this.Incoming
.Any(x => x.State == PipelineRunnerState.Errored);
if (!hasError)
{
return false;
return;
}
this.logger.Error(
"{Pipeline:l}: There was an exception in an dependency",
this.Pipeline);
this.ChangeState(PipelineRunnerState.Errored);
this.blockDependencies.Set();
this.logger.Error("There was an exception in a dependency");
this.UnlockOutgoingAsErrored();
}
return true;
/// <summary>
/// A method to block the call until this runner is done processing and
/// is ready to provide output.
/// </summary>
private void WaitUntilIncomingReady()
{
this.outgoingBlock.Wait();
}
}

View file

@ -17,6 +17,11 @@ public enum PipelineRunnerState
/// </summary>
Initialized,
/// <summary>
/// Indicates that the runner has been reset, usually by the "watch" command.
/// </summary>
Restarted,
/// <summary>
/// Indicates that the pipeline is prepare for a new run. This is done
/// when the system determines it needs to run.