using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Humanizer;
using Serilog;
namespace Nitride.Pipelines;
/// <summary>
/// A manager class for all of the pipelines. This class is responsible for
/// hooking everything up and handling the ordering of pipelines as they
/// are run.
/// </summary>
public class PipelineManager
{
    /// <summary>
    /// How long to wait for the running pipelines before logging a progress
    /// report about the ones that have not finished yet.
    /// </summary>
    private static readonly TimeSpan ReportInterval = TimeSpan.FromSeconds(15);

    private readonly PipelineRunner.Factory createEntry;

    private readonly ILogger logger;

    // One runner per registered pipeline; populated by Setup().
    private List<PipelineRunner> entries = new();

    private bool isSetup;

    private ICollection<IPipeline> pipelines;

    /// <summary>
    /// Creates a manager for the given pipelines.
    /// </summary>
    /// <param name="logger">The logger to write progress and errors to.</param>
    /// <param name="pipelines">
    /// The pipelines to manage. They are copied into a set, so duplicate
    /// entries are collapsed.
    /// </param>
    /// <param name="createEntry">
    /// The factory used to wrap each pipeline in a <see cref="PipelineRunner"/>.
    /// </param>
    public PipelineManager(
        ILogger logger,
        IEnumerable<IPipeline> pipelines,
        PipelineRunner.Factory createEntry)
    {
        this.createEntry = createEntry;
        this.logger = logger.ForContext<PipelineManager>();
        this.pipelines = new HashSet<IPipeline>(pipelines);
    }

    /// <summary>
    /// Gets or sets the pipelines that will be run.
    /// </summary>
    /// <remarks>
    /// The runners are built only once, on the first call to
    /// <see cref="RunAsync"/> that finds at least one pipeline. Changes made
    /// to this collection after that point are not picked up.
    /// </remarks>
    /// <exception cref="ArgumentNullException">
    /// Thrown when the property is set to null.
    /// </exception>
    public ICollection<IPipeline> Pipelines
    {
        get => this.pipelines;
        set => this.pipelines = value ?? throw new ArgumentNullException(nameof(value));
    }

    /// <summary>
    /// Runs all of the pipelines in the appropriate order while running
    /// across multiple threads.
    /// </summary>
    /// <remarks>
    /// This method blocks the calling thread until every pipeline task has
    /// ended, logging a progress report every <see cref="ReportInterval"/>.
    /// The returned task is therefore already completed.
    /// </remarks>
    /// <returns>
    /// A task with zero for success or otherwise an error code: 1 when no
    /// pipelines are registered, 2 when at least one pipeline ended in the
    /// <see cref="PipelineRunnerState.Errored"/> state.
    /// </returns>
    public Task<int> RunAsync()
    {
        // Use a monotonic clock so that wall-clock adjustments made while
        // the pipelines run cannot distort the reported duration.
        var stopwatch = System.Diagnostics.Stopwatch.StartNew();

        // Make sure everything is set up.
        if (!this.Setup())
        {
            return Task.FromResult(1);
        }

        // Go through all the entries and start each one. We gather the
        // resulting tasks and then wait for all of them to end.
        this.logger.Verbose("Starting {Count:l}", "pipeline".ToQuantity(this.pipelines.Count));

        Task[] tasks = this.entries
            .Select(x => Task.Run(async () => await x.RunAsync()))
            .ToArray();

        // WaitAll returns false when the interval elapses before every task
        // has ended, which is our cue to report what we are waiting on.
        while (!Task.WaitAll(tasks, ReportInterval))
        {
            this.ReportWaiting();
        }

        // Figure out our return code.
        bool hasErrors = this.entries.Any(x => x.State == PipelineRunnerState.Errored);

        this.logger.Information("Completed in {Elapsed}", stopwatch.Elapsed);

        return Task.FromResult(hasErrors ? 2 : 0);
    }

    /// <summary>
    /// Logs which pipelines have not finished yet, grouped by their current
    /// state and ordered by the state's numeric value.
    /// </summary>
    private void ReportWaiting()
    {
        List<PipelineRunner> waiting = this.entries
            .Where(x => !x.IsFinished)
            .ToList();

        this.logger.Debug("Waiting for {Count:l} to finish running", "pipeline".ToQuantity(waiting.Count));

        IOrderedEnumerable<IGrouping<PipelineRunnerState, PipelineRunner>> states = waiting
            .GroupBy(x => x.State, x => x)
            .OrderBy(x => (int)x.Key);

        foreach (IGrouping<PipelineRunnerState, PipelineRunner> state in states)
        {
            List<PipelineRunner> statePipelines = state
                .OrderBy(x => x.Pipeline.ToString())
                .ToList();

            // Started pipelines also show how long they have been in that
            // state; every other state only lists the pipeline itself.
            IEnumerable<string?> names = state.Key == PipelineRunnerState.Started
                ? statePipelines.Select(x => $"{x.Pipeline} ({x.ElapsedFromState})")
                : statePipelines.Select(x => x.Pipeline.ToString());

            this.logger.Verbose(
                "Waiting for {Count:l} in {State}: {List:l}",
                "pipeline".ToQuantity(statePipelines.Count),
                state.Key,
                string.Join(", ", names));
        }
    }

    /// <summary>
    /// Performs the final initialization and preparation for the pipelines
    /// and get them ready for deploying.
    /// </summary>
    /// <returns>
    /// True when the runners are ready (or were already set up by an earlier
    /// call), false when there are no pipelines to run.
    /// </returns>
    /// <exception cref="InvalidOperationException">
    /// Thrown when a pipeline depends on a pipeline that has not been
    /// registered with this manager.
    /// </exception>
    private bool Setup()
    {
        // If we've already set up ourselves, then we do nothing.
        if (this.isSetup)
        {
            return true;
        }

        // If we don't have any pipelines, then we can't process.
        if (this.pipelines.Count == 0)
        {
            this.logger.Error(
                "There are no registered pipelines run, use ConfigureContainer to include IPipeline instances");

            return false;
        }

        this.logger.Verbose("Setting up {Count:l}", "pipeline".ToQuantity(this.pipelines.Count));

        // Wrap all the pipelines into entries. We do this before the next
        // step so we can have the entries depend on the entries.
        this.entries = this.pipelines
            .Select(x => this.createEntry(x))
            .ToList();

        // Go through and connect the pipelines together.
        foreach (PipelineRunner entry in this.entries)
        {
            List<IPipeline> dependencies = entry.Pipeline.GetDependencies()
                .ToList();

            foreach (IPipeline dependency in dependencies)
            {
                // Get the entry for the dependency. A dependency that was
                // never registered has no runner, so fail with a message
                // that names both pipelines instead of a bare LINQ error.
                PipelineRunner? dependencyRunner =
                    this.entries.SingleOrDefault(x => x.Pipeline == dependency);

                if (dependencyRunner is null)
                {
                    throw new InvalidOperationException(
                        $"Pipeline {entry.Pipeline} depends on {dependency}, "
                        + "which is not registered with the pipeline manager");
                }

                // Set up the bi-directional connection.
                entry.Incoming.Add(dependencyRunner);
                dependencyRunner.Outgoing.Add(entry);
            }
        }

        // Loop through all the entries and tell them we are done providing
        // connections so they can finish their own internal setup.
        foreach (PipelineRunner entry in this.entries)
        {
            entry.Initialize();
        }

        // We have run successfully.
        this.isSetup = true;

        return true;
    }
}