feat!: added cancellation token support to pipelines and operations
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/tag/woodpecker Pipeline was successful

This commit is contained in:
D. Moonfire 2023-01-17 19:24:09 -06:00
parent 08aafb144c
commit 2892ec3445
62 changed files with 386 additions and 255 deletions

View file

@ -1,5 +1,6 @@
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Threading;
using MfGames.Gallium; using MfGames.Gallium;
using MfGames.Nitride; using MfGames.Nitride;
@ -43,7 +44,9 @@ public class CopyFilesPipeline : PipelineBase
} }
/// <inheritdoc /> /// <inheritdoc />
public override IAsyncEnumerable<Entity> RunAsync(IEnumerable<Entity> _) public override IAsyncEnumerable<Entity> RunAsync(
IEnumerable<Entity> _,
CancellationToken cancellationToken = default)
{ {
// We don't care about the incoming entities which means we can // We don't care about the incoming entities which means we can
// ignore them and use the entities from the ReadFiles operation // ignore them and use the entities from the ReadFiles operation
@ -73,8 +76,9 @@ public class CopyFilesPipeline : PipelineBase
// read. Coming out of this, we will have one entity that fulfills: // read. Coming out of this, we will have one entity that fulfills:
// //
// entity.Get<UPath> == "/output/a.txt" // entity.Get<UPath> == "/output/a.txt"
entities = entities.Run(this.removePathPrefix) entities = entities
.Run(this.addPathPrefix); .Run(this.removePathPrefix, cancellationToken)
.Run(this.addPathPrefix, cancellationToken);
// Then we write out the files to the output. First we make sure we // Then we write out the files to the output. First we make sure we
// clear out the output. This operation performs an action when it // clear out the output. This operation performs an action when it
@ -95,8 +99,9 @@ public class CopyFilesPipeline : PipelineBase
// The third way is to use an extension on entities which lets us // The third way is to use an extension on entities which lets us
// chain calls, ala Gulp's pipelines. The below code does this along // chain calls, ala Gulp's pipelines. The below code does this along
// with writing the files to the output. // with writing the files to the output.
entities = entities.Run(this.clearDirectory) entities = entities
.Run(this.writeFiles); .Run(this.clearDirectory, cancellationToken)
.Run(this.writeFiles, cancellationToken);
// If we are chaining this pipeline into another, we return the // If we are chaining this pipeline into another, we return the
// entities. Otherwise, we can just return an empty list. The // entities. Otherwise, we can just return an empty list. The

View file

@ -27,8 +27,9 @@ public class CopyFilesTest : NitrideTestBase
public async Task Run() public async Task Run()
{ {
// Figure out the paths for this test. // Figure out the paths for this test.
DirectoryInfo rootDir = DirectoryInfo rootDir = typeof(CopyFilesProgram)
typeof(CopyFilesProgram).GetDirectory()!.FindGitRoot()! .GetDirectory()
!.FindGitRoot()!
.GetDirectory("examples/CopyFiles"); .GetDirectory("examples/CopyFiles");
DirectoryInfo outputDir = rootDir.GetDirectory("output"); DirectoryInfo outputDir = rootDir.GetDirectory("output");

View file

@ -1,6 +1,7 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Threading;
using FluentValidation; using FluentValidation;
@ -57,7 +58,9 @@ public partial class CreateCalender : OperationBase
public UPath? Path { get; set; } public UPath? Path { get; set; }
/// <inheritdoc /> /// <inheritdoc />
public override IEnumerable<Entity> Run(IEnumerable<Entity> input) public override IEnumerable<Entity> Run(
IEnumerable<Entity> input,
CancellationToken cancellationToken = default)
{ {
this.validator.ValidateAndThrow(this); this.validator.ValidateAndThrow(this);

View file

@ -1,6 +1,7 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Threading;
using FluentValidation; using FluentValidation;
@ -93,7 +94,9 @@ public partial class CreateAtomFeed : OperationBase
public Func<Entity, Uri>? GetUrl { get; set; } public Func<Entity, Uri>? GetUrl { get; set; }
/// <inheritdoc /> /// <inheritdoc />
public override IEnumerable<Entity> Run(IEnumerable<Entity> input) public override IEnumerable<Entity> Run(
IEnumerable<Entity> input,
CancellationToken cancellationToken = default)
{ {
this.validator.ValidateAndThrow(this); this.validator.ValidateAndThrow(this);

View file

@ -43,7 +43,8 @@ public class SingletonComponentSourceGenerator
// Create the namespace. // Create the namespace.
SyntaxToken cls = cds.Identifier; SyntaxToken cls = cds.Identifier;
buffer.AppendLine(string.Join( buffer.AppendLine(
string.Join(
"\n", "\n",
$"using MfGames.Gallium;", $"using MfGames.Gallium;",
$"", $"",

View file

@ -1,5 +1,6 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Threading;
using FluentValidation; using FluentValidation;
@ -46,7 +47,9 @@ public partial class ApplyStyleTemplate : OperationBase
public IHandlebars? Handlebars { get; set; } public IHandlebars? Handlebars { get; set; }
/// <inheritdoc /> /// <inheritdoc />
public override IEnumerable<Entity> Run(IEnumerable<Entity> input) public override IEnumerable<Entity> Run(
IEnumerable<Entity> input,
CancellationToken cancellationToken = default)
{ {
// Make sure we have sane data. // Make sure we have sane data.
this.validator.ValidateAndThrow(this); this.validator.ValidateAndThrow(this);

View file

@ -1,6 +1,7 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Threading;
using FluentValidation; using FluentValidation;
@ -27,7 +28,9 @@ public partial class IdentifyHandlebarsFromComponent : IOperation
public Func<Entity, bool> HasHandlebarsTest { get; set; } = null!; public Func<Entity, bool> HasHandlebarsTest { get; set; } = null!;
/// <inheritdoc /> /// <inheritdoc />
public IEnumerable<Entity> Run(IEnumerable<Entity> input) public IEnumerable<Entity> Run(
IEnumerable<Entity> input,
CancellationToken cancellationToken = default)
{ {
this.validator.ValidateAndThrow(this); this.validator.ValidateAndThrow(this);

View file

@ -1,4 +1,5 @@
using System.Collections.Generic; using System.Collections.Generic;
using System.Threading;
using MfGames.Gallium; using MfGames.Gallium;
using MfGames.Nitride.Contents; using MfGames.Nitride.Contents;
@ -12,7 +13,9 @@ namespace MfGames.Nitride.Handlebars;
public class IdentifyHandlebarsFromContent : IOperation public class IdentifyHandlebarsFromContent : IOperation
{ {
/// <inheritdoc /> /// <inheritdoc />
public IEnumerable<Entity> Run(IEnumerable<Entity> input) public IEnumerable<Entity> Run(
IEnumerable<Entity> input,
CancellationToken cancellationToken = default)
{ {
return input.SelectEntity<ITextContent>(this.ScanContent); return input.SelectEntity<ITextContent>(this.ScanContent);
} }

View file

@ -1,5 +1,6 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Threading;
using FluentValidation; using FluentValidation;
@ -39,7 +40,9 @@ public partial class RenderContentTemplate : OperationBase
public Func<Entity, object>? CreateModelCallback { get; set; } public Func<Entity, object>? CreateModelCallback { get; set; }
/// <inheritdoc /> /// <inheritdoc />
public override IEnumerable<Entity> Run(IEnumerable<Entity> input) public override IEnumerable<Entity> Run(
IEnumerable<Entity> input,
CancellationToken cancellationToken = default)
{ {
this.validator.ValidateAndThrow(this); this.validator.ValidateAndThrow(this);

View file

@ -1,5 +1,6 @@
using System.Collections.Generic; using System.Collections.Generic;
using System.Net; using System.Net;
using System.Threading;
using MfGames.Gallium; using MfGames.Gallium;
using MfGames.Nitride.Contents; using MfGames.Nitride.Contents;
@ -13,7 +14,9 @@ namespace MfGames.Nitride.Html;
public class ConvertHtmlEntitiesToUnicode : OperationBase public class ConvertHtmlEntitiesToUnicode : OperationBase
{ {
/// <inheritdoc /> /// <inheritdoc />
public override IEnumerable<Entity> Run(IEnumerable<Entity> input) public override IEnumerable<Entity> Run(
IEnumerable<Entity> input,
CancellationToken cancellationToken = default)
{ {
return input.SelectEntity<ITextContent>(this.ResolveHtmlEntities); return input.SelectEntity<ITextContent>(this.ResolveHtmlEntities);
} }

View file

@ -2,6 +2,7 @@ using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.IO; using System.IO;
using System.Linq; using System.Linq;
using System.Threading;
using DotNet.Globbing; using DotNet.Globbing;
@ -43,9 +44,10 @@ public partial class ReadFiles : FileSystemOperationBase
/// minimatch pattern (as defined by DotNet.Blob). /// minimatch pattern (as defined by DotNet.Blob).
/// </summary> /// </summary>
/// <returns>A populated collection of entities.</returns> /// <returns>A populated collection of entities.</returns>
public IEnumerable<Entity> Run() public IEnumerable<Entity> Run(
CancellationToken cancellationToken = default)
{ {
return this.Run(Array.Empty<Entity>()); return this.Run(Array.Empty<Entity>(), cancellationToken);
} }
/// <summary> /// <summary>
@ -53,7 +55,9 @@ public partial class ReadFiles : FileSystemOperationBase
/// minimatch pattern (as defined by DotNet.Blob). /// minimatch pattern (as defined by DotNet.Blob).
/// </summary> /// </summary>
/// <returns>A populated collection of entities.</returns> /// <returns>A populated collection of entities.</returns>
public override IEnumerable<Entity> Run(IEnumerable<Entity> input) public override IEnumerable<Entity> Run(
IEnumerable<Entity> input,
CancellationToken cancellationToken = default)
{ {
this.validator.ValidateAndThrow(this); this.validator.ValidateAndThrow(this);

View file

@ -3,6 +3,7 @@ using System.Collections.Generic;
using System.IO; using System.IO;
using System.Linq; using System.Linq;
using System.Text; using System.Text;
using System.Threading;
using FluentValidation; using FluentValidation;
@ -63,8 +64,11 @@ public partial class WriteFiles : FileSystemOperationBase, IOperation
/// a path and a registered writer will be written. /// a path and a registered writer will be written.
/// </summary> /// </summary>
/// <param name="entities">The entities to parse.</param> /// <param name="entities">The entities to parse.</param>
/// <param name="cancellationToken"></param>
/// <returns>The same list of entities without changes.</returns> /// <returns>The same list of entities without changes.</returns>
public override IEnumerable<Entity> Run(IEnumerable<Entity> entities) public override IEnumerable<Entity> Run(
IEnumerable<Entity> entities,
CancellationToken cancellationToken = default)
{ {
this.validator.ValidateAndThrow(this); this.validator.ValidateAndThrow(this);

View file

@ -1,5 +1,6 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Threading;
using FluentValidation; using FluentValidation;
@ -40,13 +41,16 @@ public partial class ClearDirectory : FileSystemOperationBase, IOperation
/// </summary> /// </summary>
public UPath? Path { get; set; } public UPath? Path { get; set; }
public IEnumerable<Entity> Run() public IEnumerable<Entity> Run(
CancellationToken cancellationToken = default)
{ {
return this.Run(new List<Entity>()); return this.Run(new List<Entity>(), cancellationToken);
} }
/// <inheritdoc /> /// <inheritdoc />
public override IEnumerable<Entity> Run(IEnumerable<Entity> input) public override IEnumerable<Entity> Run(
IEnumerable<Entity> input,
CancellationToken cancellationToken = default)
{ {
this.validator.ValidateAndThrow(this); this.validator.ValidateAndThrow(this);

View file

@ -1,4 +1,5 @@
using System.Collections.Generic; using System.Collections.Generic;
using System.Threading;
using MfGames.Gallium; using MfGames.Gallium;
@ -16,5 +17,7 @@ public abstract class FileSystemOperationBase : IOperation
public IFileSystem FileSystem { get; set; } public IFileSystem FileSystem { get; set; }
/// <inheritdoc /> /// <inheritdoc />
public abstract IEnumerable<Entity> Run(IEnumerable<Entity> input); public abstract IEnumerable<Entity> Run(
IEnumerable<Entity> input,
CancellationToken cancellationToken = default);
} }

View file

@ -1,4 +1,5 @@
using System.Collections.Generic; using System.Collections.Generic;
using System.Threading;
using FluentValidation; using FluentValidation;
@ -29,7 +30,9 @@ public partial class AddPathPrefix : OperationBase
/// </summary> /// </summary>
public UPath? PathPrefix { get; set; } public UPath? PathPrefix { get; set; }
public override IEnumerable<Entity> Run(IEnumerable<Entity> input) public override IEnumerable<Entity> Run(
IEnumerable<Entity> input,
CancellationToken cancellationToken = default)
{ {
this.validator.ValidateAndThrow(this); this.validator.ValidateAndThrow(this);

View file

@ -1,4 +1,5 @@
using System.Collections.Generic; using System.Collections.Generic;
using System.Threading;
using FluentValidation; using FluentValidation;
@ -32,7 +33,9 @@ public partial class ChangePathExtension : IOperation
/// </summary> /// </summary>
public string? Extension { get; set; } public string? Extension { get; set; }
public IEnumerable<Entity> Run(IEnumerable<Entity> input) public IEnumerable<Entity> Run(
IEnumerable<Entity> input,
CancellationToken cancellationToken = default)
{ {
this.validator.ValidateAndThrow(this); this.validator.ValidateAndThrow(this);

View file

@ -1,5 +1,6 @@
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Threading;
using FluentValidation; using FluentValidation;
@ -29,7 +30,9 @@ public partial class LinkDirectChildren : CreateOrUpdateIndex
} }
/// <inheritdoc /> /// <inheritdoc />
public override IEnumerable<Entity> Run(IEnumerable<Entity> input) public override IEnumerable<Entity> Run(
IEnumerable<Entity> input,
CancellationToken cancellationToken = default)
{ {
if (this.Scanner != null!) if (this.Scanner != null!)
{ {

View file

@ -1,5 +1,6 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Threading;
using FluentValidation; using FluentValidation;
@ -53,7 +54,9 @@ public partial class MoveToIndexPath : OperationBase
}; };
} }
public override IEnumerable<Entity> Run(IEnumerable<Entity> input) public override IEnumerable<Entity> Run(
IEnumerable<Entity> input,
CancellationToken cancellationToken = default)
{ {
this.validator.ValidateAndThrow(this); this.validator.ValidateAndThrow(this);

View file

@ -1,4 +1,5 @@
using System.Collections.Generic; using System.Collections.Generic;
using System.Threading;
using FluentValidation; using FluentValidation;
@ -32,7 +33,9 @@ public partial class RemovePathPrefix : IOperation
/// </summary> /// </summary>
public UPath PathPrefix { get; set; } public UPath PathPrefix { get; set; }
public IEnumerable<Entity> Run(IEnumerable<Entity> input) public IEnumerable<Entity> Run(
IEnumerable<Entity> input,
CancellationToken cancellationToken = default)
{ {
this.validator.ValidateAndThrow(this); this.validator.ValidateAndThrow(this);

View file

@ -1,5 +1,6 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Threading;
using FluentValidation; using FluentValidation;
@ -36,8 +37,11 @@ public partial class ReplacePath : IOperation
/// will be updated, the others will be passed on as-is. /// will be updated, the others will be passed on as-is.
/// </summary> /// </summary>
/// <param name="input">The list of input entities.</param> /// <param name="input">The list of input entities.</param>
/// <param name="cancellationToken"></param>
/// <returns>The output entities.</returns> /// <returns>The output entities.</returns>
public IEnumerable<Entity> Run(IEnumerable<Entity> input) public IEnumerable<Entity> Run(
IEnumerable<Entity> input,
CancellationToken cancellationToken = default)
{ {
this.validator.ValidateAndThrow(this); this.validator.ValidateAndThrow(this);

View file

@ -1,5 +1,6 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Threading;
using FluentValidation; using FluentValidation;
@ -28,7 +29,9 @@ public abstract partial class ConvertMarkdownToBase : IOperation
public Action<MarkdownPipelineBuilder>? ConfigureMarkdown { get; set; } public Action<MarkdownPipelineBuilder>? ConfigureMarkdown { get; set; }
/// <inheritdoc /> /// <inheritdoc />
public IEnumerable<Entity> Run(IEnumerable<Entity> input) public IEnumerable<Entity> Run(
IEnumerable<Entity> input,
CancellationToken cancellationToken = default)
{ {
// Validate the inputs. // Validate the inputs.
this.validator.ValidateAndThrow(this); this.validator.ValidateAndThrow(this);

View file

@ -1,5 +1,6 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Threading;
using FluentValidation; using FluentValidation;
@ -29,7 +30,9 @@ public partial class IdentifyMarkdown : IOperation
public Func<Entity, UPath, bool> IsMarkdownTest { get; set; } = null!; public Func<Entity, UPath, bool> IsMarkdownTest { get; set; } = null!;
/// <inheritdoc /> /// <inheritdoc />
public IEnumerable<Entity> Run(IEnumerable<Entity> input) public IEnumerable<Entity> Run(
IEnumerable<Entity> input,
CancellationToken cancellationToken = default)
{ {
this.validator.ValidateAndThrow(this); this.validator.ValidateAndThrow(this);

View file

@ -1,6 +1,7 @@
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Text.RegularExpressions; using System.Text.RegularExpressions;
using System.Threading;
using MfGames.Gallium; using MfGames.Gallium;
using MfGames.Nitride.Contents; using MfGames.Nitride.Contents;
@ -23,7 +24,9 @@ public class MakeSingleLinkListItems : IOperation
} }
/// <inheritdoc /> /// <inheritdoc />
public IEnumerable<Entity> Run(IEnumerable<Entity> input) public IEnumerable<Entity> Run(
IEnumerable<Entity> input,
CancellationToken cancellationToken = default)
{ {
return input return input
.SelectManyEntity<IsMarkdown>( .SelectManyEntity<IsMarkdown>(

View file

@ -1,6 +1,7 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Threading;
using FluentValidation; using FluentValidation;
@ -39,7 +40,9 @@ public partial class ApplySchedules : OperationBase
public Timekeeper Timekeeper { get; set; } public Timekeeper Timekeeper { get; set; }
/// <inheritdoc /> /// <inheritdoc />
public override IEnumerable<Entity> Run(IEnumerable<Entity> input) public override IEnumerable<Entity> Run(
IEnumerable<Entity> input,
CancellationToken cancellationToken = default)
{ {
this.validator.ValidateAndThrow(this); this.validator.ValidateAndThrow(this);

View file

@ -1,6 +1,7 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Threading;
using FluentValidation; using FluentValidation;
@ -53,7 +54,9 @@ public partial class CreateDateIndexes : OperationBase, IResolvingOperation
public Timekeeper Timekeeper { get; } public Timekeeper Timekeeper { get; }
/// <inheritdoc /> /// <inheritdoc />
public override IEnumerable<Entity> Run(IEnumerable<Entity> input) public override IEnumerable<Entity> Run(
IEnumerable<Entity> input,
CancellationToken cancellationToken = default)
{ {
// Validate our input. // Validate our input.
this.validator.ValidateAndThrow(this); this.validator.ValidateAndThrow(this);

View file

@ -1,4 +1,5 @@
using System.Collections.Generic; using System.Collections.Generic;
using System.Threading;
using FluentValidation; using FluentValidation;
@ -28,7 +29,9 @@ public partial class FilterOutExpiredInstant : OperationBase
public Timekeeper Timekeeper { get; set; } public Timekeeper Timekeeper { get; set; }
/// <inheritdoc /> /// <inheritdoc />
public override IEnumerable<Entity> Run(IEnumerable<Entity> input) public override IEnumerable<Entity> Run(
IEnumerable<Entity> input,
CancellationToken cancellationToken = default)
{ {
this.validator.ValidateAndThrow(this); this.validator.ValidateAndThrow(this);

View file

@ -1,4 +1,5 @@
using System.Collections.Generic; using System.Collections.Generic;
using System.Threading;
using MfGames.Gallium; using MfGames.Gallium;
using MfGames.Nitride.Generators; using MfGames.Nitride.Generators;
@ -22,7 +23,9 @@ public partial class FilterOutFutureInstant : OperationBase
public Timekeeper Timekeeper { get; set; } public Timekeeper Timekeeper { get; set; }
/// <inheritdoc /> /// <inheritdoc />
public override IEnumerable<Entity> Run(IEnumerable<Entity> input) public override IEnumerable<Entity> Run(
IEnumerable<Entity> input,
CancellationToken cancellationToken = default)
{ {
Instant now = this.Timekeeper.Clock.GetCurrentInstant(); Instant now = this.Timekeeper.Clock.GetCurrentInstant();

View file

@ -1,5 +1,6 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Threading;
using FluentValidation; using FluentValidation;
@ -39,7 +40,9 @@ public class SetInstantFromComponent<TComponent> : OperationBase
} }
/// <inheritdoc /> /// <inheritdoc />
public override IEnumerable<Entity> Run(IEnumerable<Entity> input) public override IEnumerable<Entity> Run(
IEnumerable<Entity> input,
CancellationToken cancellationToken = default)
{ {
this.validator.ValidateAndThrow(this); this.validator.ValidateAndThrow(this);

View file

@ -1,6 +1,7 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Text.RegularExpressions; using System.Text.RegularExpressions;
using System.Threading;
using FluentValidation; using FluentValidation;
@ -43,7 +44,9 @@ public partial class SetInstantFromPath : OperationBase
public Regex? PathRegex { get; set; } public Regex? PathRegex { get; set; }
/// <inheritdoc /> /// <inheritdoc />
public override IEnumerable<Entity> Run(IEnumerable<Entity> input) public override IEnumerable<Entity> Run(
IEnumerable<Entity> input,
CancellationToken cancellationToken = default)
{ {
this.validator.ValidateAndThrow(this); this.validator.ValidateAndThrow(this);

View file

@ -2,6 +2,7 @@ using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.IO; using System.IO;
using System.Text; using System.Text;
using System.Threading;
using MfGames.Gallium; using MfGames.Gallium;
using MfGames.Nitride.Contents; using MfGames.Nitride.Contents;
@ -44,7 +45,9 @@ public class ParseYamlHeader<TModel> : OperationBase
private bool RemoveHeader { get; set; } private bool RemoveHeader { get; set; }
/// <inheritdoc /> /// <inheritdoc />
public override IEnumerable<Entity> Run(IEnumerable<Entity> input) public override IEnumerable<Entity> Run(
IEnumerable<Entity> input,
CancellationToken cancellationToken = default)
{ {
// Set up the YAML parsing. // Set up the YAML parsing.
DeserializerBuilder builder = DeserializerBuilder builder =

View file

@ -1,6 +1,7 @@
using System.Collections.Generic; using System.Collections.Generic;
using System.CommandLine; using System.CommandLine;
using System.CommandLine.Invocation; using System.CommandLine.Invocation;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using MfGames.Nitride.Pipelines; using MfGames.Nitride.Pipelines;
@ -48,6 +49,9 @@ public class BuildCommand : Command, ICommandHandler
/// <inheritdoc /> /// <inheritdoc />
public async Task<int> InvokeAsync(InvocationContext context) public async Task<int> InvokeAsync(InvocationContext context)
{ {
// Get the cancellation token so we can be interrupted.
CancellationToken cancellationToken = context.GetCancellationToken();
// Process any injected options. // Process any injected options.
this.logger.Debug( this.logger.Debug(
"Processing {Count:N0} pipeline options", "Processing {Count:N0} pipeline options",
@ -63,7 +67,7 @@ public class BuildCommand : Command, ICommandHandler
// all the pipelines once and then quits when it finishes. // all the pipelines once and then quits when it finishes.
this.logger.Information("Running pipelines"); this.logger.Information("Running pipelines");
int pipelinesResults = await this.pipelines.RunAsync(); int pipelinesResults = await this.pipelines.RunAsync(cancellationToken);
return pipelinesResults; return pipelinesResults;
} }

View file

@ -1,6 +1,7 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Threading;
using FluentValidation; using FluentValidation;
@ -67,7 +68,9 @@ public partial class CreateOrUpdateIndex : OperationBase
} = null!; } = null!;
/// <inheritdoc /> /// <inheritdoc />
public override IEnumerable<Entity> Run(IEnumerable<Entity> input) public override IEnumerable<Entity> Run(
IEnumerable<Entity> input,
CancellationToken cancellationToken = default)
{ {
// Make sure we have sane data. // Make sure we have sane data.
this.validator.ValidateAndThrow(this); this.validator.ValidateAndThrow(this);

View file

@ -3,6 +3,7 @@ using System.Collections.Concurrent;
using System.Collections.Generic; using System.Collections.Generic;
using System.Collections.Immutable; using System.Collections.Immutable;
using System.Linq; using System.Linq;
using System.Threading;
using FluentValidation; using FluentValidation;
@ -92,7 +93,9 @@ public partial class EntityScanner : OperationBase
} }
/// <inheritdoc /> /// <inheritdoc />
public override IEnumerable<Entity> Run(IEnumerable<Entity> input) public override IEnumerable<Entity> Run(
IEnumerable<Entity> input,
CancellationToken cancellationToken = default)
{ {
// Make sure we have sane data. // Make sure we have sane data.
this.validator.ValidateAndThrow(this); this.validator.ValidateAndThrow(this);

View file

@ -1,6 +1,7 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Threading;
using FluentValidation; using FluentValidation;
@ -46,7 +47,9 @@ public partial class LinkEntitySequence : OperationBase, IResolvingOperation
} }
/// <inheritdoc /> /// <inheritdoc />
public override IEnumerable<Entity> Run(IEnumerable<Entity> input) public override IEnumerable<Entity> Run(
IEnumerable<Entity> input,
CancellationToken cancellationToken = default)
{ {
// Make sure everything is good. // Make sure everything is good.
this.validator.ValidateAndThrow(this); this.validator.ValidateAndThrow(this);

View file

@ -1,4 +1,5 @@
using System.Collections.Generic; using System.Collections.Generic;
using System.Threading;
using MfGames.Gallium; using MfGames.Gallium;
@ -10,6 +11,9 @@ public interface IOperation
/// Runs the input entities through the operation and returns the results. /// Runs the input entities through the operation and returns the results.
/// </summary> /// </summary>
/// <param name="input"></param> /// <param name="input"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns> /// <returns></returns>
IEnumerable<Entity> Run(IEnumerable<Entity> input); IEnumerable<Entity> Run(
IEnumerable<Entity> input,
CancellationToken cancellationToken = default);
} }

View file

@ -1,4 +1,5 @@
using System.Collections.Generic; using System.Collections.Generic;
using System.Threading;
using MfGames.Gallium; using MfGames.Gallium;
@ -15,11 +16,13 @@ public static class NitrideOperationExtensions
/// </summary> /// </summary>
/// <param name="input">The entities to perform the operation against.</param> /// <param name="input">The entities to perform the operation against.</param>
/// <param name="operation">The operation to run.</param> /// <param name="operation">The operation to run.</param>
/// <param name="cancellationToken">The cancellation token of the request.</param>
/// <returns>The results of the operation.</returns> /// <returns>The results of the operation.</returns>
public static IEnumerable<Entity> Run( public static IEnumerable<Entity> Run(
this IEnumerable<Entity> input, this IEnumerable<Entity> input,
IOperation operation) IOperation operation,
CancellationToken cancellationToken = default)
{ {
return operation.Run(input); return operation.Run(input, cancellationToken);
} }
} }

View file

@ -1,4 +1,5 @@
using System.Collections.Generic; using System.Collections.Generic;
using System.Threading;
using MfGames.Gallium; using MfGames.Gallium;
@ -10,5 +11,7 @@ namespace MfGames.Nitride;
public abstract class OperationBase : IOperation public abstract class OperationBase : IOperation
{ {
/// <inheritdoc /> /// <inheritdoc />
public abstract IEnumerable<Entity> Run(IEnumerable<Entity> input); public abstract IEnumerable<Entity> Run(
IEnumerable<Entity> input,
CancellationToken cancellationToken = default);
} }

View file

@ -1,4 +1,5 @@
using System.Collections.Generic; using System.Collections.Generic;
using System.Threading;
using MfGames.Gallium; using MfGames.Gallium;
@ -22,6 +23,9 @@ public interface IPipeline
/// entities. /// entities.
/// </summary> /// </summary>
/// <param name="entities">The entities to process.</param> /// <param name="entities">The entities to process.</param>
/// <param name="cancellationToken">The token for cancelling processing.</param>
/// <returns>The resulting entities after the process runs.</returns> /// <returns>The resulting entities after the process runs.</returns>
IAsyncEnumerable<Entity> RunAsync(IEnumerable<Entity> entities); IAsyncEnumerable<Entity> RunAsync(
IEnumerable<Entity> entities,
CancellationToken cancellationToken = default);
} }

View file

@ -1,4 +1,5 @@
using System.Collections.Generic; using System.Collections.Generic;
using System.Threading;
using MfGames.Gallium; using MfGames.Gallium;
@ -41,12 +42,12 @@ public abstract class PipelineBase : IPipeline
/// <inheritdoc /> /// <inheritdoc />
public abstract IAsyncEnumerable<Entity> RunAsync( public abstract IAsyncEnumerable<Entity> RunAsync(
IEnumerable<Entity> entities); IEnumerable<Entity> entities,
CancellationToken cancellationToken = default);
/// <inheritdoc /> /// <inheritdoc />
public override string ToString() public override string ToString()
{ {
return this.GetType() return this.GetType().Name;
.Name;
} }
} }

View file

@ -1,6 +1,7 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Humanizer; using Humanizer;
@ -48,8 +49,9 @@ public class PipelineManager
/// Runs all of the pipelines in the appropriate order while running /// Runs all of the pipelines in the appropriate order while running
/// across multiple threads. /// across multiple threads.
/// </summary> /// </summary>
/// <param name="cancellationToken">The token for cancelling processing.</param>
/// <returns>A task with zero for success or otherwise an error code.</returns> /// <returns>A task with zero for success or otherwise an error code.</returns>
public Task<int> RunAsync() public Task<int> RunAsync(CancellationToken cancellationToken)
{ {
// Make sure everything is setup. // Make sure everything is setup.
DateTime started = DateTime.UtcNow; DateTime started = DateTime.UtcNow;
@ -66,14 +68,18 @@ public class PipelineManager
"pipeline".ToQuantity(this.pipelines.Count)); "pipeline".ToQuantity(this.pipelines.Count));
Task[] tasks = this.entries Task[] tasks = this.entries
.Select(x => Task.Run(async () => await x.RunAsync())) .Select(
x => Task.Run(
async () => await x.RunAsync(cancellationToken),
cancellationToken))
.ToArray(); .ToArray();
var report = TimeSpan.FromSeconds(15); var report = TimeSpan.FromSeconds(15);
while (!Task.WaitAll(tasks, report)) while (!Task.WaitAll(tasks, report))
{ {
var waiting = this.entries.Where(x => !x.IsFinished) var waiting = this.entries
.Where(x => !x.IsFinished)
.ToList(); .ToList();
this.logger.Debug( this.logger.Debug(
@ -81,14 +87,15 @@ public class PipelineManager
"pipeline".ToQuantity(waiting.Count)); "pipeline".ToQuantity(waiting.Count));
IOrderedEnumerable<IGrouping<PipelineRunnerState, PipelineRunner>> IOrderedEnumerable<IGrouping<PipelineRunnerState, PipelineRunner>>
states = states = waiting
waiting.GroupBy(x => x.State, x => x) .GroupBy(x => x.State, x => x)
.OrderBy(x => (int)x.Key); .OrderBy(x => (int)x.Key);
foreach (IGrouping<PipelineRunnerState, PipelineRunner>? state in foreach (IGrouping<PipelineRunnerState, PipelineRunner>? state in
states) states)
{ {
var statePipelines = state.OrderBy(x => x.Pipeline.ToString()) var statePipelines = state
.OrderBy(x => x.Pipeline.ToString())
.ToList(); .ToList();
this.logger.Verbose( this.logger.Verbose(
@ -106,8 +113,8 @@ public class PipelineManager
} }
// Figure out our return code. // Figure out our return code.
bool hasErrors = bool hasErrors = this.entries
this.entries.Any(x => x.State == PipelineRunnerState.Errored); .Any(x => x.State == PipelineRunnerState.Errored);
this.logger.Information( this.logger.Information(
"Completed in {Elapsed}", "Completed in {Elapsed}",
@ -144,20 +151,22 @@ public class PipelineManager
// Wrap all the pipelines into entries. We do this before the next // Wrap all the pipelines into entries. We do this before the next
// step so we can have the entries depend on the entries. // step so we can have the entries depend on the entries.
this.entries = this.pipelines.Select(x => this.createEntry(x)) this.entries = this.pipelines
.Select(x => this.createEntry(x))
.ToList(); .ToList();
// Go through and connect the pipelines together. // Go through and connect the pipelines together.
foreach (PipelineRunner? entry in this.entries) foreach (PipelineRunner? entry in this.entries)
{ {
var dependencies = entry.Pipeline.GetDependencies() var dependencies = entry.Pipeline
.GetDependencies()
.ToList(); .ToList();
foreach (IPipeline? dependency in dependencies) foreach (IPipeline? dependency in dependencies)
{ {
// Get the entry for the dependency. // Get the entry for the dependency.
PipelineRunner dependencyPipeline = PipelineRunner dependencyPipeline = this.entries
this.entries.Single(x => x.Pipeline == dependency); .Single(x => x.Pipeline == dependency);
// Set up the bi-directional connection. // Set up the bi-directional connection.
entry.Incoming.Add(dependencyPipeline); entry.Incoming.Add(dependencyPipeline);

View file

@ -149,7 +149,7 @@ public class PipelineRunner
/// Executes the pipeline, including waiting for any or all /// Executes the pipeline, including waiting for any or all
/// dependencies. /// dependencies.
/// </summary> /// </summary>
public async Task RunAsync() public async Task RunAsync(CancellationToken cancellationToken = default)
{ {
try try
{ {
@ -194,7 +194,7 @@ public class PipelineRunner
// Run the pipeline. This may not be resolved until we gather // Run the pipeline. This may not be resolved until we gather
// the output below. // the output below.
await this.RunPipeline(input); await this.RunPipeline(input, cancellationToken);
// At this point, we are completely done with our inputs, so signal // At this point, we are completely done with our inputs, so signal
// to them in case they have to clean up any of their structures. // to them in case they have to clean up any of their structures.
@ -274,12 +274,14 @@ public class PipelineRunner
return input; return input;
} }
private async Task RunPipeline(List<Entity> input) private async Task RunPipeline(
List<Entity> input,
CancellationToken cancellationToken)
{ {
// Get the sequence of data, but this doesn't drain the enumeration. // Get the sequence of data, but this doesn't drain the enumeration.
List<Entity> output = await this.Pipeline List<Entity> output = await this.Pipeline
.RunAsync(input) .RunAsync(input, cancellationToken)
.ToListAsync(); .ToListAsync(cancellationToken);
// Gather all the output. // Gather all the output.
this.logger.Verbose("{Pipeline:l}: Gathering output", this.Pipeline); this.logger.Verbose("{Pipeline:l}: Gathering output", this.Pipeline);