feat: implemented watch mode

This commit is contained in:
D. Moonfire 2023-08-02 22:26:27 -05:00
parent 3ddb2fd236
commit 77ed31f12b
14 changed files with 240 additions and 46 deletions

View file

@ -1,11 +1,12 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MfGames.Gallium;
using MfGames.Nitride.IO;
using MfGames.Nitride.IO.Contents;
using MfGames.Nitride.Pipelines;
using Serilog;
@ -13,21 +14,23 @@ using Zio;
namespace NitridePipelines;
public class InputPipeline1 : PipelineBase
public class InputPipeline1 : FileSystemWatchablePipelineBase
{
private readonly ILogger logger;
private readonly ReadFiles readFiles;
public InputPipeline1(
ILogger logger,
IFileSystem fileSystem,
ReadFiles readFiles)
: base(logger, fileSystem)
{
this.logger = logger.ForContext<InputPipeline1>();
this.readFiles = readFiles
.WithPattern("/input/input1/*.txt");
}
/// <inheritdoc />
protected override UPath WatchPath => "/input/input1";
/// <inheritdoc />
public override IAsyncEnumerable<Entity> RunAsync(
IEnumerable<Entity> _,
@ -40,7 +43,7 @@ public class InputPipeline1 : PipelineBase
{
Task.Delay(1000, cancellationToken).Wait(cancellationToken);
this.logger.Information(
this.Logger.Information(
"Read {Value}",
entity.Get<UPath>());

View file

@ -4,8 +4,8 @@ using System.Threading;
using System.Threading.Tasks;
using MfGames.Gallium;
using MfGames.Nitride.IO;
using MfGames.Nitride.IO.Contents;
using MfGames.Nitride.Pipelines;
using Serilog;
@ -13,7 +13,7 @@ using Zio;
namespace NitridePipelines;
public class InputPipeline2 : PipelineBase
public class InputPipeline2 : FileSystemWatchablePipelineBase
{
private readonly ILogger logger;
@ -21,13 +21,18 @@ public class InputPipeline2 : PipelineBase
public InputPipeline2(
ILogger logger,
IFileSystem fileSystem,
ReadFiles readFiles)
: base(logger, fileSystem)
{
this.logger = logger.ForContext<InputPipeline2>();
this.readFiles = readFiles
.WithPattern("/input/input2/*.txt");
}
/// <inheritdoc />
protected override UPath WatchPath => "/input/input2";
/// <inheritdoc />
public override IAsyncEnumerable<Entity> RunAsync(
IEnumerable<Entity> _,

View file

@ -0,0 +1 @@
File 101

View file

@ -0,0 +1 @@
File 201

View file

@ -0,0 +1 @@
File 202

View file

@ -0,0 +1,86 @@
using MfGames.Nitride.Pipelines;
using Serilog;
using Zio;
namespace MfGames.Nitride.IO;
/// <summary>
/// A watchable pipeline base that has all the plumbing to handle watching
/// the virtual file system.
/// </summary>
public abstract class FileSystemWatchablePipelineBase
: WatchablePipelineBase, IDisposable
{
private Action? watchAction;
private IFileSystemWatcher? watcher;
protected FileSystemWatchablePipelineBase(
ILogger logger,
IFileSystem fileSystem)
{
this.Logger = logger.ForContext(this.GetType());
this.FileSystem = fileSystem;
}
/// <summary>
/// Gets the file system associated with the pipeline.
/// </summary>
protected IFileSystem FileSystem { get; }
/// <summary>
/// Gets the logger associated with the pipeline.
/// </summary>
protected ILogger Logger { get; }
/// <summary>
/// Gets the path to watch for changes.
/// </summary>
protected abstract UPath WatchPath { get; }
/// <inheritdoc />
public void Dispose()
{
this.Dispose(true);
GC.SuppressFinalize(this);
}
/// <inheritdoc />
public override void Watch(Action onChange)
{
// If we already have a watcher, then unwind that one.
if (this.watcher != null && this.watchAction != null)
{
this.watcher.Created -= this.OnFileSystemChange;
this.watcher.Changed -= this.OnFileSystemChange;
this.watcher.Deleted -= this.OnFileSystemChange;
this.watcher.Renamed -= this.OnFileSystemChange;
this.watcher.Dispose();
}
// Create a new watcher and wire up various events.
this.watchAction = onChange;
this.watcher = this.FileSystem.Watch(this.WatchPath);
this.watcher.IncludeSubdirectories = true;
this.watcher.EnableRaisingEvents = true;
this.watcher.Created += this.OnFileSystemChange;
this.watcher.Changed += this.OnFileSystemChange;
this.watcher.Deleted += this.OnFileSystemChange;
this.watcher.Renamed += this.OnFileSystemChange;
}
protected virtual void Dispose(bool disposing)
{
if (disposing)
{
this.watcher?.Dispose();
}
}
private void OnFileSystemChange(object? sender, FileChangedEventArgs e)
{
this.watchAction?.Invoke();
}
}

View file

@ -50,6 +50,8 @@ public class BuildCommand : Command, ICommandHandler
// Get the cancellation token so we can be interrupted.
CancellationToken cancellationToken = context.GetCancellationToken();
this.pipelines.CancellationToken = cancellationToken;
// Process any injected options.
this.logger.Debug(
"Processing {Count:N0} pipeline options",
@ -67,7 +69,7 @@ public class BuildCommand : Command, ICommandHandler
this.logger.Information("Running pipelines");
int pipelinesResults = await this.pipelines.RunAsync(cancellationToken);
int pipelinesResults = await this.pipelines.RunAsync();
this.logger.Information(
"Command took {Elapsed}",

View file

@ -51,22 +51,26 @@ public class WatchCommand : Command, ICommandHandler
// Get the cancellation token so we can be interrupted.
CancellationToken cancellationToken = context.GetCancellationToken();
this.pipelines.CancellationToken = cancellationToken;
// Process any injected options.
this.logger.Debug(
"Processing {Count:N0} pipeline options",
this.pipelineOptions.Count);
foreach (IPipelineCommandOption? option in this.pipelineOptions)
foreach (IPipelineCommandOption option in this.pipelineOptions)
{
this.logger.Verbose("Processing pipeline option: {Option}", option);
option.Handle(context);
}
// Go through and start watching on all the files.
this.pipelines.Watch();
// This duplicates the "build" command with the initial build.
if (await this.RunPipelinesAsync(
"Running initial pipelines",
"Initial build took {Elapsed}",
cancellationToken))
"Initial build took {Elapsed}"))
{
return 1;
}
@ -91,15 +95,7 @@ public class WatchCommand : Command, ICommandHandler
// We have at least one stale pipeline, wait for the restart delay
// to catch any files have changed, then kick it off.
await Task.Delay(restartDelay, cancellationToken);
// Kick off another build.
if (await this.RunPipelinesAsync(
"Restarting pipelines",
"Restarted build took {Elapsed}",
cancellationToken))
{
return 1;
}
await this.pipelines.RestartAsync();
}
return 0;
@ -111,13 +107,12 @@ public class WatchCommand : Command, ICommandHandler
Justification = "Intentional format string brought in")]
private async Task<bool> RunPipelinesAsync(
string startMessage,
string finishMessage,
CancellationToken cancellationToken)
string finishMessage)
{
this.logger.Information(startMessage);
DateTime start = DateTime.UtcNow;
int pipelinesResults = await this.pipelines.RunAsync(cancellationToken);
int pipelinesResults = await this.pipelines.RunAsync();
this.logger.Information(
finishMessage,

View file

@ -0,0 +1,14 @@
namespace MfGames.Nitride.Pipelines;
/// <summary>
/// Indicates a pipeline that can be watched for changes.
/// </summary>
public interface IWatchablePipeline : IPipeline
{
/// <summary>
/// A trigger that indicates that the pipeline should start watching its
/// resources.
/// </summary>
/// <param name="onChange">The action to call when the inputs change.</param>
void Watch(Action onChange);
}

View file

@ -13,10 +13,10 @@ public class PipelineManager
{
private readonly ILogger logger;
private bool isSetup;
private readonly List<PipelineRunner> runners;
private bool isSetup;
public PipelineManager(
ILogger logger,
IEnumerable<IPipeline> pipelines,
@ -28,18 +28,29 @@ public class PipelineManager
.ToList();
}
public CancellationToken CancellationToken { get; set; }
/// <summary>
/// Gets a value indicating if any pipeline is stale.
/// </summary>
public bool IsStale => this.runners.Any(a => a.IsStale);
public async Task RestartAsync()
{
Task[] resetTasks = this.runners
.Select(runner => runner.ResetAsync())
.ToArray();
await Task.WhenAll(resetTasks);
await this.RunAsync();
}
/// <summary>
/// Runs all of the pipelines in the appropriate order while running
/// across multiple threads.
/// </summary>
/// <param name="cancellationToken">The token for cancelling processing.</param>
/// <returns>A task with zero for success or otherwise an error code.</returns>
public Task<int> RunAsync(CancellationToken cancellationToken)
public Task<int> RunAsync()
{
// Make sure everything is setup.
DateTime started = DateTime.UtcNow;
@ -58,8 +69,8 @@ public class PipelineManager
Task[] tasks = this.runners
.Select(
x => Task.Run(
async () => await x.RunAsync(cancellationToken),
cancellationToken))
async () => await x.StartAsync(this.CancellationToken),
this.CancellationToken))
.ToArray();
var report = TimeSpan.FromSeconds(15);
@ -111,6 +122,21 @@ public class PipelineManager
return Task.FromResult(hasErrors ? 2 : 0);
}
/// <summary>
/// Indicates that the system should start watching for changes.
/// </summary>
public void Watch()
{
var watchableList = this.runners
.Where(a => a.Pipeline is IWatchablePipeline)
.ToList();
foreach (PipelineRunner pipeline in watchableList)
{
pipeline.Watch();
}
}
/// <summary>
/// Performs the final initialization and preparation for the pipelines
/// and get them ready for deploying.

View file

@ -70,8 +70,7 @@ public class PipelineRunner
// Configure the state machine.
this.state =
new RunnerStateMachine(
PipelineRunnerState.Initialized);
new RunnerStateMachine(PipelineRunnerState.Initialized);
this.state.OnTransitioned(this.OnTransitioned);
@ -146,9 +145,15 @@ public class PipelineRunner
this.state
.Configure(PipelineRunnerState.Finalized)
.Permit(
.PermitIf(
PipelineRunnerTrigger.ResetPipeline,
PipelineRunnerState.Initialized);
PipelineRunnerState.Initialized,
() => this.IsStale)
.Ignore(
PipelineRunnerTrigger.StartPipeline)
.IgnoreIf(
PipelineRunnerTrigger.ResetPipeline,
() => !this.IsStale);
}
/// <summary>
@ -213,17 +218,16 @@ public class PipelineRunner
/// <summary>
/// Resets the internal state for running again. This also goes through
/// </summary>
public void Reset()
public async Task ResetAsync()
{
this.state.Fire(PipelineRunnerTrigger.ResetPipeline);
this.state.Fire(PipelineRunnerTrigger.StartPipeline);
await this.state.FireAsync(PipelineRunnerTrigger.ResetPipeline);
}
/// <summary>
/// Executes the pipeline, including waiting for any or all
/// dependencies.
/// </summary>
public async Task RunAsync(CancellationToken cancellationToken = default)
public async Task StartAsync(CancellationToken cancellationToken = default)
{
this.cancellation = cancellationToken;
await this.state.FireAsync(PipelineRunnerTrigger.StartPipeline);
@ -235,6 +239,18 @@ public class PipelineRunner
return $"PipelineRunner<{this.Pipeline}>";
}
/// <summary>
/// Indicates that the runner should start watching for changes on the
/// pipeline.
/// </summary>
public void Watch()
{
if (this.Pipeline is IWatchablePipeline watchable)
{
watchable.Watch(() => this.SetStale(PipelineRunnerStale.Watch));
}
}
/// <summary>
/// A method that tells the pipeline one of the outgoing pipelines has
/// completed consuming the output from this runner.
@ -369,6 +385,9 @@ public class PipelineRunner
this.started = DateTime.Now;
this.outputs = new List<Entity>();
this.IsStale = false;
this.outgoingBlock.Reset();
this.outgoingDone.Set();
this.waitingOnConsumers = this.Outgoing.Count;
}
private void OnTransitioned(RunnerStateMachine.Transition transition)
@ -406,6 +425,22 @@ public class PipelineRunner
this.outputs.AddRange(output);
}
private void SetStale(PipelineRunnerStale type)
{
if (!this.IsStale)
{
this.logger.Verbose(
"The pipeline was marked stale because of {Type}",
type);
this.IsStale = true;
}
foreach (PipelineRunner runner in this.Outgoing)
{
runner.SetStale(PipelineRunnerStale.Dependency);
}
}
/// <summary>
/// A method to block the call until this runner is done processing and
/// is ready to provide output.

View file

@ -0,0 +1,14 @@
namespace MfGames.Nitride.Pipelines;
public enum PipelineRunnerStale
{
/// <summary>
/// Indicates that the stale was triggered by a watch.
/// </summary>
Watch,
/// <summary>
/// Indicates that the stale was triggered by a dependency.
/// </summary>
Dependency,
}

View file

@ -0,0 +1,10 @@
namespace MfGames.Nitride.Pipelines;
/// <summary>
/// A base class that allows for the registration for watching.
/// </summary>
public abstract class WatchablePipelineBase : PipelineBase, IWatchablePipeline
{
/// <inheritdoc />
public abstract void Watch(Action onChange);
}

View file

@ -17,6 +17,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Autofac" Version="7.0.1" />
<PackageReference Include="GitVersion.MSBuild" Version="5.12.0">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
@ -33,14 +34,14 @@
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="Serilog" Version="3.0.1"/>
<PackageReference Include="Serilog.Enrichers.Demystifier" Version="1.0.2"/>
<PackageReference Include="Serilog.Exceptions" Version="8.4.0"/>
<PackageReference Include="Serilog.Extensions.Autofac.DependencyInjection" Version="5.0.0"/>
<PackageReference Include="Serilog.Extensions.Hosting" Version="7.0.0"/>
<PackageReference Include="Serilog.Sinks.Xunit" Version="3.0.5"/>
<PackageReference Include="SerilogAnalyzer" Version="0.15.0"/>
<PackageReference Include="xunit" Version="2.5.0"/>
<PackageReference Include="Serilog" Version="3.0.1" />
<PackageReference Include="Serilog.Enrichers.Demystifier" Version="1.0.2" />
<PackageReference Include="Serilog.Exceptions" Version="8.4.0" />
<PackageReference Include="Serilog.Extensions.Autofac.DependencyInjection" Version="5.0.0" />
<PackageReference Include="Serilog.Extensions.Hosting" Version="7.0.0" />
<PackageReference Include="Serilog.Sinks.Xunit" Version="3.0.5" />
<PackageReference Include="SerilogAnalyzer" Version="0.15.0" />
<PackageReference Include="xunit" Version="2.5.0" />
</ItemGroup>
</Project>