feat: switching pipelines to return IAsyncEnumerable<Entity> instead of Task<IEnumerable<Entity>>

This commit is contained in:
Dylan R. E. Moonfire 2022-07-09 17:21:38 -05:00
parent efb255aef2
commit 1e6a8eb5d8
5 changed files with 10 additions and 5 deletions

View file

@ -1,4 +1,5 @@
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks; using System.Threading.Tasks;
using Gallium; using Gallium;
@ -44,7 +45,7 @@ public class CopyFilesPipeline : PipelineBase
} }
/// <inheritdoc /> /// <inheritdoc />
public override Task<IEnumerable<Entity>> RunAsync(IEnumerable<Entity> _) public override IAsyncEnumerable<Entity> RunAsync(IEnumerable<Entity> _)
{ {
// We don't care about the incoming entities which means we can // We don't care about the incoming entities which means we can
// ignore them and use the entities from the ReadFiles operation // ignore them and use the entities from the ReadFiles operation
@ -103,6 +104,6 @@ public class CopyFilesPipeline : PipelineBase
// entities. Otherwise, we can just return an empty list. The // entities. Otherwise, we can just return an empty list. The
// pipeline is async, so it is wrapped in a task, but most // pipeline is async, so it is wrapped in a task, but most
// operations are not (or are both). // operations are not (or are both).
return Task.FromResult(entities); return entities.ToAsyncEnumerable();
} }
} }

View file

@ -29,6 +29,7 @@
<PackageReference Include="Serilog.Sinks.Console" Version="4.0.1" /> <PackageReference Include="Serilog.Sinks.Console" Version="4.0.1" />
<PackageReference Include="SerilogAnalyzer" Version="0.15.0" /> <PackageReference Include="SerilogAnalyzer" Version="0.15.0" />
<PackageReference Include="System.CommandLine" Version="[2.0.0-beta3.22114.1,)" /> <PackageReference Include="System.CommandLine" Version="[2.0.0-beta3.22114.1,)" />
<PackageReference Include="System.Linq.Async" Version="6.0.1" />
<PackageReference Include="Zio" Version="0.15.0" /> <PackageReference Include="Zio" Version="0.15.0" />
</ItemGroup> </ItemGroup>

View file

@ -24,5 +24,5 @@ public interface IPipeline
/// </summary> /// </summary>
/// <param name="entities">The entities to process.</param> /// <param name="entities">The entities to process.</param>
/// <returns>The resulting entities after the process runs.</returns> /// <returns>The resulting entities after the process runs.</returns>
Task<IEnumerable<Entity>> RunAsync(IEnumerable<Entity> entities); IAsyncEnumerable<Entity> RunAsync(IEnumerable<Entity> entities);
} }

View file

@ -31,7 +31,7 @@ public abstract class PipelineBase : IPipeline
} }
/// <inheritdoc /> /// <inheritdoc />
public abstract Task<IEnumerable<Entity>> RunAsync(IEnumerable<Entity> entities); public abstract IAsyncEnumerable<Entity> RunAsync(IEnumerable<Entity> entities);
/// <inheritdoc /> /// <inheritdoc />
public override string ToString() public override string ToString()

View file

@ -268,7 +268,10 @@ public class PipelineRunner
private async Task RunPipeline(List<Entity> input) private async Task RunPipeline(List<Entity> input)
{ {
IEnumerable<Entity> output = await this.Pipeline.RunAsync(input); // Get the sequence of data, but this doesn't drain the enumeration.
List<Entity> output = await this.Pipeline
.RunAsync(input)
.ToListAsync();
// Gather all the output. // Gather all the output.
this.logger.Debug("{Pipeline:l}: Gathering output", this.Pipeline); this.logger.Debug("{Pipeline:l}: Gathering output", this.Pipeline);