2021-09-07 05:15:45 +00:00
|
|
|
using System;
|
|
|
|
using System.Collections.Generic;
|
|
|
|
using System.Linq;
|
|
|
|
using System.Threading.Tasks;
|
2022-06-05 18:44:51 +00:00
|
|
|
|
2021-09-07 05:15:45 +00:00
|
|
|
using Humanizer;
|
2022-06-05 18:44:51 +00:00
|
|
|
|
2021-09-07 05:15:45 +00:00
|
|
|
using Serilog;
|
|
|
|
|
2022-06-05 18:44:51 +00:00
|
|
|
namespace Nitride.Pipelines;
|
|
|
|
|
|
|
|
/// <summary>
|
|
|
|
/// A manager class for all of the pipelines. This class is responsible for
|
|
|
|
/// hooking everything up and handling the ordering of pipelines as they
|
|
|
|
/// are run.
|
|
|
|
/// </summary>
|
|
|
|
public class PipelineManager
|
2021-09-07 05:15:45 +00:00
|
|
|
{
|
2022-06-05 18:44:51 +00:00
|
|
|
private readonly PipelineRunner.Factory createEntry;
|
2021-09-07 05:15:45 +00:00
|
|
|
|
2022-06-05 18:44:51 +00:00
|
|
|
private readonly ILogger logger;
|
2021-09-07 05:15:45 +00:00
|
|
|
|
2022-06-05 18:44:51 +00:00
|
|
|
private List<PipelineRunner> entries;
|
2021-09-07 05:15:45 +00:00
|
|
|
|
2022-06-05 18:44:51 +00:00
|
|
|
private bool isSetup;
|
2021-09-07 05:15:45 +00:00
|
|
|
|
2022-06-05 18:44:51 +00:00
|
|
|
private ICollection<IPipeline> pipelines;
|
2021-09-07 05:15:45 +00:00
|
|
|
|
2022-06-05 18:44:51 +00:00
|
|
|
public PipelineManager(ILogger logger, IEnumerable<IPipeline> pipelines, PipelineRunner.Factory createEntry)
|
|
|
|
{
|
|
|
|
this.createEntry = createEntry;
|
|
|
|
this.logger = logger.ForContext<PipelineManager>();
|
|
|
|
this.pipelines = new HashSet<IPipeline>(pipelines);
|
|
|
|
this.entries = null!;
|
|
|
|
}
|
|
|
|
|
|
|
|
public ICollection<IPipeline> Pipelines
|
|
|
|
{
|
|
|
|
get => this.pipelines;
|
|
|
|
set => this.pipelines = value ?? throw new ArgumentNullException(nameof(value));
|
|
|
|
}
|
|
|
|
|
|
|
|
/// <summary>
|
|
|
|
/// Runs all of the pipelines in the appropriate order while running
|
|
|
|
/// across multiple threads.
|
|
|
|
/// </summary>
|
|
|
|
/// <returns>A task with zero for success or otherwise an error code.</returns>
|
|
|
|
public Task<int> RunAsync()
|
|
|
|
{
|
|
|
|
// Make sure everything is setup.
|
|
|
|
DateTime started = DateTime.UtcNow;
|
2021-09-07 05:15:45 +00:00
|
|
|
|
2022-06-05 18:44:51 +00:00
|
|
|
if (!this.Setup())
|
2021-09-07 05:15:45 +00:00
|
|
|
{
|
2022-06-05 18:44:51 +00:00
|
|
|
return Task.FromResult(1);
|
2021-09-07 05:15:45 +00:00
|
|
|
}
|
|
|
|
|
2022-06-05 18:44:51 +00:00
|
|
|
// Go through all the entries and start each one. We gather the
|
|
|
|
// resulting tasks and then wait for all of them to end.
|
|
|
|
this.logger.Verbose("Starting {Count:l}", "pipeline".ToQuantity(this.pipelines.Count));
|
2021-09-07 05:15:45 +00:00
|
|
|
|
2022-06-05 18:44:51 +00:00
|
|
|
Task[] tasks = this.entries.Select(x => Task.Run(async () => await x.RunAsync())).ToArray();
|
|
|
|
var report = TimeSpan.FromSeconds(15);
|
|
|
|
|
|
|
|
while (!Task.WaitAll(tasks, report))
|
|
|
|
{
|
|
|
|
var waiting = this.entries.Where(x => !x.IsFinished).ToList();
|
2021-09-07 05:15:45 +00:00
|
|
|
|
2022-06-05 18:44:51 +00:00
|
|
|
this.logger.Debug("Waiting for {Count:l} to finish running", "pipeline".ToQuantity(waiting.Count));
|
2021-09-07 05:15:45 +00:00
|
|
|
|
2022-06-05 18:44:51 +00:00
|
|
|
IOrderedEnumerable<IGrouping<PipelineRunnerState, PipelineRunner>> states =
|
|
|
|
waiting.GroupBy(x => x.State, x => x).OrderBy(x => (int)x.Key);
|
2021-09-07 05:15:45 +00:00
|
|
|
|
2022-06-05 18:44:51 +00:00
|
|
|
foreach (IGrouping<PipelineRunnerState, PipelineRunner>? state in states)
|
2021-09-07 05:15:45 +00:00
|
|
|
{
|
2022-06-05 18:44:51 +00:00
|
|
|
var statePipelines = state.OrderBy(x => x.Pipeline.ToString()).ToList();
|
|
|
|
|
|
|
|
this.logger.Verbose(
|
|
|
|
"Waiting for {Count:l} in {State}: {List:l}",
|
|
|
|
"pipeline".ToQuantity(statePipelines.Count),
|
|
|
|
state.Key,
|
|
|
|
string.Join(
|
|
|
|
", ",
|
|
|
|
state.Key == PipelineRunnerState.Started
|
|
|
|
? statePipelines.Select(x => $"{x.Pipeline} ({x.ElapsedFromState})")
|
|
|
|
: statePipelines.Select(x => x.Pipeline.ToString())));
|
2021-09-07 05:15:45 +00:00
|
|
|
}
|
2022-06-05 18:44:51 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Figure out our return code.
|
|
|
|
bool hasErrors = this.entries.Any(x => x.State == PipelineRunnerState.Errored);
|
2021-09-07 05:15:45 +00:00
|
|
|
|
2022-06-05 18:44:51 +00:00
|
|
|
this.logger.Information("Completed in {Elapsed}", DateTime.UtcNow - started);
|
2021-09-07 05:15:45 +00:00
|
|
|
|
2022-06-05 18:44:51 +00:00
|
|
|
return Task.FromResult(hasErrors ? 2 : 0);
|
|
|
|
}
|
2021-09-07 05:15:45 +00:00
|
|
|
|
2022-06-05 18:44:51 +00:00
|
|
|
/// <summary>
|
|
|
|
/// Performs the final initialization and preparation for the pipelines
|
|
|
|
/// and get them ready for deploying.
|
|
|
|
/// </summary>
|
|
|
|
private bool Setup()
|
|
|
|
{
|
|
|
|
// If we've already set up ourselves, then we do nothing.
|
|
|
|
if (this.isSetup)
|
|
|
|
{
|
|
|
|
return true;
|
2021-09-07 05:15:45 +00:00
|
|
|
}
|
|
|
|
|
2022-06-05 18:44:51 +00:00
|
|
|
// If we don't have any pipelines, then we can't process.
|
|
|
|
if (this.pipelines.Count == 0)
|
2021-09-07 05:15:45 +00:00
|
|
|
{
|
2022-06-05 18:44:51 +00:00
|
|
|
this.logger.Error(
|
|
|
|
"There are no registered pipelines run, use" + " ConfigureContainer to include IPipeline instances");
|
2021-09-07 05:15:45 +00:00
|
|
|
|
2022-06-05 18:44:51 +00:00
|
|
|
return false;
|
|
|
|
}
|
2021-09-07 05:15:45 +00:00
|
|
|
|
2022-06-05 18:44:51 +00:00
|
|
|
this.logger.Verbose("Setting up {Count:l}", "pipeline".ToQuantity(this.pipelines.Count));
|
2021-09-07 05:15:45 +00:00
|
|
|
|
2022-06-05 18:44:51 +00:00
|
|
|
// Wrap all the pipelines into entries. We do this before the next
|
|
|
|
// step so we can have the entries depend on the entries.
|
|
|
|
this.entries = this.pipelines.Select(x => this.createEntry(x)).ToList();
|
2021-09-07 05:15:45 +00:00
|
|
|
|
2022-06-05 18:44:51 +00:00
|
|
|
// Go through and connect the pipelines together.
|
|
|
|
foreach (PipelineRunner? entry in this.entries)
|
|
|
|
{
|
|
|
|
var dependencies = entry.Pipeline.GetDependencies().ToList();
|
2021-09-07 05:15:45 +00:00
|
|
|
|
2022-06-05 18:44:51 +00:00
|
|
|
foreach (IPipeline? dependency in dependencies)
|
2021-09-07 05:15:45 +00:00
|
|
|
{
|
2022-06-05 18:44:51 +00:00
|
|
|
// Get the entry for the dependency.
|
|
|
|
PipelineRunner dependencyPipeline = this.entries.Single(x => x.Pipeline == dependency);
|
2021-09-07 05:15:45 +00:00
|
|
|
|
2022-06-05 18:44:51 +00:00
|
|
|
// Set up the bi-directional connection.
|
|
|
|
entry.Incoming.Add(dependencyPipeline);
|
|
|
|
dependencyPipeline.Outgoing.Add(entry);
|
2021-09-07 05:15:45 +00:00
|
|
|
}
|
2022-06-05 18:44:51 +00:00
|
|
|
}
|
2021-09-07 05:15:45 +00:00
|
|
|
|
2022-06-05 18:44:51 +00:00
|
|
|
// Loop through all the entries and tell them we are done providing
|
|
|
|
// and they can set up internal threads other structures.
|
|
|
|
foreach (PipelineRunner? entry in this.entries)
|
|
|
|
{
|
|
|
|
entry.Initialize();
|
2021-09-07 05:15:45 +00:00
|
|
|
}
|
2022-06-05 18:44:51 +00:00
|
|
|
|
|
|
|
// We have run successfully.
|
|
|
|
this.isSetup = true;
|
|
|
|
|
|
|
|
return true;
|
2021-09-07 05:15:45 +00:00
|
|
|
}
|
|
|
|
}
|