2021-09-07 05:15:45 +00:00

# Pipelines

A pipeline is a set of operations that takes a list of entities as input,
performs various operations and transformations on them, and then returns a
resulting list of entities, which may or may not be the original ones. For
pipelines that read files from the disk, the input may be empty. The output of
the final pipelines may never be used.

A simple pipeline may:

1. Load all the Markdown files from disk.
2. Convert the files into HTML.
3. Write them out to a different location.

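The three steps above can be sketched without any Nitride types. Everything in this example is an assumption made for illustration: `FileEntity` and `SimplePipelineSketch` are hypothetical names, and the HTML step is a placeholder that only escapes the text, where a real pipeline would call a Markdown library.

```c#
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;

// Hypothetical stand-in for an entity: a relative path plus its text content.
public sealed record FileEntity(string RelativePath, string Content);

public static class SimplePipelineSketch
{
    // Step 1: load all Markdown files found under the input directory.
    public static List<FileEntity> Load(string inputDir) =>
        Directory.EnumerateFiles(inputDir, "*.md", SearchOption.AllDirectories)
            .Select(file => new FileEntity(
                Path.GetRelativePath(inputDir, file),
                File.ReadAllText(file)))
            .ToList();

    // Step 2: placeholder conversion that escapes the text and wraps it in
    // <pre>; this is NOT a real Markdown renderer.
    public static FileEntity ToHtml(FileEntity entity) =>
        new FileEntity(
            Path.ChangeExtension(entity.RelativePath, ".html"),
            "<pre>" + WebUtility.HtmlEncode(entity.Content) + "</pre>");

    // Step 3: write each converted entity to a different location.
    public static void Write(IEnumerable<FileEntity> entities, string outputDir)
    {
        foreach (var entity in entities)
        {
            var target = Path.Combine(outputDir, entity.RelativePath);
            Directory.CreateDirectory(Path.GetDirectoryName(target) ?? outputDir);
            File.WriteAllText(target, entity.Content);
        }
    }

    // Chains the three steps in order.
    public static void Run(string inputDir, string outputDir) =>
        Write(Load(inputDir).Select(ToHtml), outputDir);
}
```

Calling `SimplePipelineSketch.Run("content", "output")` would mirror the directory layout of `content` under `output`, with each `.md` file replaced by an `.html` file.
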
A pipeline implements the `IPipeline` interface or extends the convenience
class `Pipeline`.

```c#
using Nitride.Pipelines;

public class LoadFilesPipeline : Pipeline {}
```

## Dependencies

A pipeline can be chained to one or more other pipelines. The chaining may be
purely organizational, or the output of one pipeline may be fed into several
others. A pipeline may depend on any number of other pipelines, including none.

```c#
// Dependencies can be added when a pipeline is created...
var loadPipeline = new Pipeline();
var htmlPipeline = new Pipeline().AddDependency(loadPipeline);
var geminiPipeline = new GeminiPipeline(loadPipeline);

// ...or inside the constructor of a derived pipeline.
public class GeminiPipeline : Pipeline
{
    public GeminiPipeline(Pipeline loadPipeline)
    {
        this.AddDependency(loadPipeline);
    }
}
```

Since Nitride is built around dependency injection, the constructor is the place
where the dependencies are typically set up.

```c#
// Assumes the Autofac container, as implied by ContainerBuilder and IContainer.
using Autofac;

var builder = new ContainerBuilder();

// Register each pipeline type once so every dependent sees the same instance.
builder.RegisterType<LoadPipeline>().AsSelf().SingleInstance();
builder.RegisterType<GeminiPipeline>().AsSelf().SingleInstance();

IContainer container = builder.Build();

// The container constructs LoadPipeline and passes it to GeminiPipeline.
var loadPipeline = container.Resolve<LoadPipeline>();
var geminiPipeline = container.Resolve<GeminiPipeline>();

public class GeminiPipeline : Pipeline
{
    public GeminiPipeline(LoadPipeline loadPipeline)
    {
        this.AddDependency(loadPipeline);
    }
}
```

With these dependencies, the pipelines are assumed to form a directed acyclic
graph (DAG). This means that there are no circular dependencies, and the code
can determine the "entry" pipelines that feed into the others until all
processing is complete.

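One way to picture how entry pipelines and a run order fall out of a DAG is a topological sort. The sketch below uses Kahn's algorithm over pipeline names. It is an illustration only, not Nitride's actual scheduler, and `PipelineGraphSketch` and the `deps` map are hypothetical.

```c#
using System;
using System.Collections.Generic;
using System.Linq;

public static class PipelineGraphSketch
{
    // deps maps each pipeline name to the names of the pipelines it depends
    // on. Every name used as a dependency is assumed to also appear as a key.
    public static List<string> RunOrder(IReadOnlyDictionary<string, string[]> deps)
    {
        // Number of dependencies each pipeline is still waiting on.
        var remaining = deps.ToDictionary(p => p.Key, p => p.Value.Length);

        // The "entry" pipelines are the ones with no dependencies at all.
        var ready = new Queue<string>(deps
            .Where(p => p.Value.Length == 0)
            .Select(p => p.Key)
            .OrderBy(name => name, StringComparer.Ordinal));

        var order = new List<string>();

        while (ready.Count > 0)
        {
            var current = ready.Dequeue();
            order.Add(current);

            // Release every pipeline whose last outstanding dependency was current.
            foreach (var (name, needs) in deps.OrderBy(p => p.Key, StringComparer.Ordinal))
            {
                if (needs.Contains(current) && --remaining[name] == 0)
                {
                    ready.Enqueue(name);
                }
            }
        }

        // Anything left unscheduled is part of a cycle, so this is not a DAG.
        if (order.Count != deps.Count)
        {
            throw new InvalidOperationException("Circular dependency detected.");
        }

        return order;
    }
}
```

For a diamond in which `Pipeline2` and `Pipeline3` depend on `Pipeline1`, and `Pipeline4` depends on both, `Pipeline1` is the only entry pipeline and `Pipeline4` is scheduled last.
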
The DAG is also used to determine what is rebuilt during development. Only the
pipelines affected by a change (including dependencies) are re-run, which keeps
the process as fast as possible.

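As a rough illustration of that rebuild rule, the sketch below walks the graph from a changed pipeline to everything downstream of it. It assumes that "affected" means the changed pipeline plus every pipeline that depends on it, directly or indirectly; that reading, and the `RebuildSketch` name, are assumptions rather than Nitride's documented behavior.

```c#
using System.Collections.Generic;
using System.Linq;

public static class RebuildSketch
{
    // deps maps each pipeline name to the names of the pipelines it depends on.
    // Returns the changed pipeline plus everything that transitively depends on it.
    public static HashSet<string> Affected(
        IReadOnlyDictionary<string, string[]> deps,
        string changed)
    {
        var affected = new HashSet<string> { changed };
        var pending = new Stack<string>();
        pending.Push(changed);

        while (pending.Count > 0)
        {
            var current = pending.Pop();

            // Any pipeline that lists current as a dependency must re-run too.
            foreach (var pair in deps)
            {
                if (pair.Value.Contains(current) && affected.Add(pair.Key))
                {
                    pending.Push(pair.Key);
                }
            }
        }

        return affected;
    }
}
```

With the diamond from the previous sections, a change to `Pipeline2` would re-run `Pipeline2` and `Pipeline4` and leave `Pipeline1` and `Pipeline3` alone.
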
### Processing

The primary purpose of a pipeline is to process entities. The method that does
so is rather simple:

```c#
public async Task<IEnumerable<Entity>> RunAsync(
    IEnumerable<Entity> entities)
{
    return entities;
}
```

(For the purposes of this document, `async` is added to all the methods, even
where an example would be better written without it, using `Task.FromResult()`
instead.)

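For comparison, the pass-through method could be written without `async` roughly as follows. `Entity` here is an empty hypothetical stand-in so the sketch compiles on its own, and `PassThroughSketch` is likewise an invented name.

```c#
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for the real entity type.
public sealed class Entity { }

public static class PassThroughSketch
{
    // No async keyword: nothing is awaited, so the result is wrapped in an
    // already-completed task instead.
    public static Task<IEnumerable<Entity>> RunAsync(IEnumerable<Entity> entities)
    {
        return Task.FromResult(entities);
    }
}
```

Callers can still `await` the returned task, but the compiler does not have to generate an async state machine for a method that never awaits.
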
### Identifying Source Pipeline

Because of how entities are passed around, a pipeline that depends on two other
pipelines may receive the same entity in both streams. Without any alteration,
this results in duplicate entities with no way to determine which entity came
from which pipeline.

To mark an entity as having been processed by a pipeline, a simple component can
be added as needed. The `AddComponents` extension method on the collection adds
the pipeline itself or some other indicator.

```c#
public class ExamplePipeline : Pipeline
{
    public async Task<IEnumerable<Entity>> RunAsync(IEnumerable<Entity> entities)
    {
        return entities
            .AddComponents(this);
    }
}
```

Once an entity carries its pipeline as a component, later pipelines can apply
special processing based on that component. In addition, `MergeEntities` can be
used to pull entities with the same identifier together.

```c#
public class Pipeline1 : Pipeline
{
    public async Task<IEnumerable<Entity>> RunAsync(IEnumerable<Entity> entities)
    {
        return LoadFileEntities();
    }
}

public class Pipeline2 : Pipeline
{
    public Pipeline2(Pipeline1 pipeline)
    {
        this.AddDependency(pipeline);
    }

    public async Task<IEnumerable<Entity>> RunAsync(IEnumerable<Entity> entities)
    {
        return entities
            .AddComponents(this);
    }
}

public class Pipeline3 : Pipeline
{
    public Pipeline3(Pipeline1 pipeline)
    {
        this.AddDependency(pipeline);
    }

    public async Task<IEnumerable<Entity>> RunAsync(IEnumerable<Entity> entities)
    {
        return entities
            .AddComponents(this);
    }
}

public class Pipeline4 : Pipeline
{
    public Pipeline4(Pipeline2 pipeline2, Pipeline3 pipeline3)
    {
        this.AddDependency(pipeline2);
        this.AddDependency(pipeline3);
    }

    public async Task<IEnumerable<Entity>> RunAsync(IEnumerable<Entity> entities)
    {
        return entities
            .ForComponents<Pipeline2>((entity, pipeline2) => ...)
            .ForComponents<Pipeline3>((entity, pipeline3) => ...)
            .MergeComponents<Pipeline2, Pipeline3>(
                (entity1, entity2) => entity1.Merge(entity2));
    }
}
```

## Concurrency

Pipelines run on multiple threads, but each waits on its dependencies. This
means two pipelines with separate inputs will run at the same time, but a
pipeline that depends on one or more other pipelines will wait until its
dependencies have finished running before starting.

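The waiting behavior can be sketched with plain tasks: each pipeline awaits the tasks of its dependencies and only then does its own work. This is a minimal illustration under stated assumptions, not Nitride's scheduler. `ConcurrencySketch` is a hypothetical name, the graph is assumed to be acyclic, and the "work" is reduced to recording the pipeline's name.

```c#
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class ConcurrencySketch
{
    // deps maps each pipeline name to the names it depends on (assumed acyclic).
    // Returns the names in the order in which the pipelines finished.
    public static async Task<List<string>> RunAllAsync(
        IReadOnlyDictionary<string, string[]> deps)
    {
        var finished = new List<string>();
        var started = new Dictionary<string, Task>();

        // Start each pipeline at most once and hand back its task.
        Task Start(string name)
        {
            if (!started.TryGetValue(name, out var task))
            {
                task = RunOneAsync(name);
                started[name] = task;
            }

            return task;
        }

        async Task RunOneAsync(string name)
        {
            // Wait for every dependency before doing this pipeline's work.
            await Task.WhenAll(deps[name].Select(Start));

            // Placeholder work on the thread pool: just record completion.
            await Task.Run(() =>
            {
                lock (finished)
                {
                    finished.Add(name);
                }
            });
        }

        await Task.WhenAll(deps.Keys.Select(Start));
        return finished;
    }
}
```

Independent pipelines may finish in either order, but a pipeline never appears in the result before the pipelines it depends on.
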
Inside a pipeline, the code determines which operations are done concurrently.
PLINQ or the various `await` methods can be used for this. However, even if a
pipeline is synchronous, it is always treated as async and will be awaited.

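As an example of the PLINQ option, the sketch below transforms items in parallel inside a single run method. Strings stand in for entities, and `PlinqSketch` and the uppercase transformation are placeholders chosen only to keep the example self-contained.

```c#
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class PlinqSketch
{
    public static Task<IEnumerable<string>> RunAsync(IEnumerable<string> entities)
    {
        // AsParallel spreads the work across threads; AsOrdered keeps the
        // results in the same order as the input.
        IEnumerable<string> result = entities
            .AsParallel()
            .AsOrdered()
            .Select(text => text.ToUpperInvariant())
            .ToList();

        // The work above is synchronous, but the caller still gets a task.
        return Task.FromResult(result);
    }
}
```

Dropping `AsOrdered()` can be cheaper when the order of the resulting entities does not matter.
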
## Inspiration

The concept of a pipeline was inspired by both Statiq and earlier work on
CobblestoneJS (the gather, prepare, and process approach).