diff --git a/examples/CopyFiles/CopyFilesPipeline.cs b/examples/CopyFiles/CopyFilesPipeline.cs index 7ca4434..966e0fe 100644 --- a/examples/CopyFiles/CopyFilesPipeline.cs +++ b/examples/CopyFiles/CopyFilesPipeline.cs @@ -1,4 +1,5 @@ using System.Collections.Generic; +using System.Linq; using System.Threading.Tasks; using Gallium; @@ -44,7 +45,7 @@ public class CopyFilesPipeline : PipelineBase } /// - public override Task> RunAsync(IEnumerable _) + public override IAsyncEnumerable RunAsync(IEnumerable _) { // We don't care about the incoming entities which means we can // ignore them and use the entities from the ReadFiles operation @@ -103,6 +104,6 @@ public class CopyFilesPipeline : PipelineBase // entities. Otherwise, we can just return an empty list. The // pipeline is async, so it is wrapped in a task, but most // operations are not (or are both). - return Task.FromResult(entities); + return entities.ToAsyncEnumerable(); } } diff --git a/src/Nitride/Nitride.csproj b/src/Nitride/Nitride.csproj index 5ca6b1b..da06d39 100644 --- a/src/Nitride/Nitride.csproj +++ b/src/Nitride/Nitride.csproj @@ -29,6 +29,7 @@ + diff --git a/src/Nitride/Pipelines/IPipeline.cs b/src/Nitride/Pipelines/IPipeline.cs index 5255b86..e405dd2 100644 --- a/src/Nitride/Pipelines/IPipeline.cs +++ b/src/Nitride/Pipelines/IPipeline.cs @@ -24,5 +24,5 @@ public interface IPipeline /// /// The entities to process. /// The resulting entities after the process runs. - Task> RunAsync(IEnumerable entities); + IAsyncEnumerable RunAsync(IEnumerable entities); } diff --git a/src/Nitride/Pipelines/PipelineBase.cs b/src/Nitride/Pipelines/PipelineBase.cs index b3e6d28..18f7aa5 100644 --- a/src/Nitride/Pipelines/PipelineBase.cs +++ b/src/Nitride/Pipelines/PipelineBase.cs @@ -31,7 +31,7 @@ public abstract class PipelineBase : IPipeline } /// - public abstract Task> RunAsync(IEnumerable entities); + public abstract IAsyncEnumerable RunAsync(IEnumerable entities); /// public override string ToString() diff --git a/src/Nitride/Pipelines/PipelineRunner.cs b/src/Nitride/Pipelines/PipelineRunner.cs index 4c77abf..7abbbd8 100644 --- a/src/Nitride/Pipelines/PipelineRunner.cs +++ b/src/Nitride/Pipelines/PipelineRunner.cs @@ -268,7 +268,10 @@ public class PipelineRunner private async Task RunPipeline(List input) { - IEnumerable output = await this.Pipeline.RunAsync(input); + // Get the sequence of data, but this doesn't drain the enumeration. + List output = await this.Pipeline + .RunAsync(input) + .ToListAsync(); // Gather all the output. this.logger.Debug("{Pipeline:l}: Gathering output", this.Pipeline);