fix: exceptions in pipeline no longer hang Nitride

This commit is contained in:
D. Moonfire 2024-03-07 22:16:08 -06:00
parent 76d52d9a54
commit d9fb34291e
5 changed files with 138 additions and 4 deletions

View file

@ -147,11 +147,11 @@ public class PipelineManager
// Go through and connect the pipelines together using the dependencies
// that were built through the constructors of the pipelines and then
// registered with `AddDependency`.
foreach (PipelineRunner? entry in this.runners)
foreach (PipelineRunner entry in this.runners)
{
var dependencies = entry.Pipeline.GetDependencies().ToList();
foreach (IPipeline? dependency in dependencies)
foreach (IPipeline dependency in dependencies)
{
// Get the entry for the dependency.
PipelineRunner dependencyPipeline = this.runners.Single(x =>

View file

@ -132,6 +132,11 @@ public class PipelineRunner
)
.Ignore(PipelineRunnerTrigger.StartPipeline)
.IgnoreIf(PipelineRunnerTrigger.ResetPipeline, () => !this.IsStale);
this.state.Configure(PipelineRunnerState.Errored)
.OnEntry(this.OnEnterErrored)
.Permit(PipelineRunnerTrigger.ResetPipeline, PipelineRunnerState.Initialized)
.Ignore(PipelineRunnerTrigger.StartPipeline);
}
/// <summary>
@ -258,7 +263,7 @@ public class PipelineRunner
this.observers.OnGetInputsStarted(this);
// Gather all the entities from the dependencies into a single list.
var input = this.Incoming.SelectMany(x => x.GetOutputs()).ToList();
List<Entity> input = this.Incoming.SelectMany(x => x.GetOutputs()).ToList();
this.observers.OnGetInputsFinished(this, input.Count);
@ -275,6 +280,14 @@ public class PipelineRunner
return this.outputs;
}
/// <summary>
/// Called when the pipeline errors out.
/// </summary>
private void OnEnterErrored()
{
this.outgoingBlock.Set();
}
private void OnEnterProvidingOutput()
{
// Make sure our internal wait for the consumers it set.

View file

@ -33,13 +33,20 @@ public abstract class TestBase<TContext>
protected ITestOutputHelper Output { get; }
protected virtual TContext CreateContext()
protected TContext CreateContext()
{
var context = new TContext();
context.SetLogger(this.Logger);
context.ConfigureContainer();
this.SetupContext(context);
return context;
}
/// <summary>
/// Perform any additional setup methods for the context.
/// </summary>
/// <param name="context">The context to configure.</param>
protected virtual void SetupContext(TContext context) { }
}

View file

@ -0,0 +1,113 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Autofac;
using MfGames.Gallium;
using MfGames.Nitride.Pipelines;
using MfGames.TestSetup;
using Xunit;
using Xunit.Abstractions;
namespace MfGames.Nitride.Tests.Entities;
public class ExceptionPipelineTests : TestBase<ExceptionPipelineTests.TestContext>, IDisposable
{
private readonly NitrideTestContext context;
public ExceptionPipelineTests(ITestOutputHelper output)
: base(output)
{
this.context = this.CreateContext();
}
/// <inheritdoc />
public void Dispose()
{
this.context.Dispose();
}
[Fact]
public async Task PipelineWithExceptionAsync()
{
var pipeline1 = this.context.Resolve<Pipeline1>();
var pipeline2 = this.context.Resolve<Pipeline2>();
var pipelineManager = this.context.Resolve<PipelineManager>();
pipeline1.TriggerException = true;
await pipelineManager.RunAsync();
Assert.False(pipeline2.Triggered);
}
[Fact]
public async Task PipelineWithoutExceptionAsync()
{
var pipeline1 = this.context.Resolve<Pipeline1>();
var pipeline2 = this.context.Resolve<Pipeline2>();
var pipelineManager = this.context.Resolve<PipelineManager>();
pipeline1.TriggerException = false;
await pipelineManager.RunAsync();
Assert.True(pipeline2.Triggered);
}
public class TestContext : NitrideTestContext
{
/// <inheritdoc />
protected override void ConfigureContainer(ContainerBuilder builder)
{
base.ConfigureContainer(builder);
builder.RegisterType<Pipeline1>().AsSelf().AsImplementedInterfaces().SingleInstance();
builder.RegisterType<Pipeline2>().AsSelf().AsImplementedInterfaces().SingleInstance();
}
}
private class Pipeline1 : PipelineBase
{
/// <summary>
/// Gets or sets a value indicating whether the pipeline will thrown
/// an exception.
/// </summary>
public bool TriggerException { get; set; }
/// <inheritdoc />
public override IAsyncEnumerable<Entity> RunAsync(
IEnumerable<Entity> entities,
CancellationToken cancellationToken = default
)
{
if (this.TriggerException)
{
throw new Exception("Pipeline failed");
}
return Array.Empty<Entity>().ToAsyncEnumerable();
}
}
private class Pipeline2 : PipelineBase
{
public Pipeline2(Pipeline1 dependency)
{
this.AddDependency(dependency);
}
public bool Triggered { get; set; }
/// <inheritdoc />
public override IAsyncEnumerable<Entity> RunAsync(
IEnumerable<Entity> entities,
CancellationToken cancellationToken = default
)
{
this.Triggered = true;
return Array.Empty<Entity>().ToAsyncEnumerable();
}
}
}

View file

@ -12,6 +12,7 @@ public class NitrideTestContext : TestContext
protected override void ConfigureContainer(ContainerBuilder builder)
{
base.ConfigureContainer(builder);
builder.RegisterModule<NitrideModule>();
}
}