The pipeline pattern receives a data stream that is processed in several stages. Each stage processes the data produced by the previous stage and delivers its result to the next stage.
The interface to the pipeline pattern is provided by the function grppi::pipeline(). Like all functions in GrPPI, it takes an execution policy as its first argument.
grppi::pipeline(exec, other_arguments...);
Additionally, there is a variant where the execution policy may be omitted. This variant is meant to be used in pipeline nesting.
There are two variants:
- Standalone pipeline: A top-level pipeline. Invoking the algorithm runs the pipeline.
- Composable pipeline: Builds a pipeline object that can be later inserted into another pattern.
The key elements in a pipeline are the Generator producing data items and the Transformer stages.
A Generator is any C++ callable entity that takes zero arguments and produces data items of a given type. Thus, a Generator gen is any operation that, given an output type U, makes valid the following:
U res{gen()};
if (res) { /* ... */ } // If res holds a value
if (!res) { /* ... */ } // If res does not hold a value
auto value = *res; // Get value held by res
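As an illustration of these requirements, the following is a minimal sketch of a generator and of a loop that consumes it using only the operations listed above. It assumes std::optional<int> (C++17) as the output type U; make_counter and drain_and_sum are hypothetical names introduced for this example and are not part of GrPPI.

```cpp
#include <cassert>
#include <optional>

// Hypothetical generator factory (not part of GrPPI): the returned callable
// yields 0, 1, ..., n-1 and then an empty optional to signal end of stream.
inline auto make_counter(int n) {
  assert(n >= 0);  // precondition of this sketch
  return [n, next = 0]() mutable -> std::optional<int> {
    if (next < n) return next++;
    return {};
  };
}

// Drains a generator using only the operations required of a Generator
// and returns the sum of the values it produced.
template <typename Generator>
int drain_and_sum(Generator gen) {
  int sum = 0;
  for (;;) {
    std::optional<int> res{gen()};  // U res{gen()};
    if (!res) break;                // res does not hold a value
    sum += *res;                    // get the value held by res
  }
  return sum;
}
```

The counter is kept in the lambda capture rather than in a static variable, so each generator object has its own state.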
A Transformer is any C++ callable entity that takes a data item and transforms it. The input type and the output type may differ. Thus, a transformer op is any operation that, given an input x of type T and an output type U, makes valid the following:
U res {op(x)};
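The requirement can be checked with a small sketch. The two transformers below (half and describe) and the helper apply_stage are hypothetical names chosen for illustration; they only show that the input type T and the output type U may differ.

```cpp
#include <string>

// Hypothetical transformer with T = int and U = double.
inline double half(int x) { return x / 2.0; }

// Hypothetical transformer with T = double and U = std::string.
inline std::string describe(double x) { return "value=" + std::to_string(x); }

// Exercises the requirement from the text: U res{op(x)}; must be valid.
template <typename U, typename Op, typename T>
U apply_stage(Op op, T x) {
  U res{op(x)};
  return res;
}
```

For instance, apply_stage<double>(half, 3) initializes a double from the result of half(3), and apply_stage<std::string>(describe, 1.5) initializes a string from the result of describe(1.5).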
A standalone pipeline generates data from a source and passes each item to the first stage, which applies a transformation to it. The resulting items are passed to the next stage, and so on.
Consequently, a pipeline with a Generator gen and N stages (s1, s2, ..., sN) performs the following computation for every item x produced by gen:
sN(...s2(s1(x))...)
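A sequential sketch of this computation is given below. It is an assumption-laden illustration of the semantics only, not GrPPI's implementation: sequential_pipeline, run_stages, and squares_plus_one are hypothetical names, the generator is assumed to return a std::optional (C++17), and the last stage is assumed to be a consumer that returns nothing.

```cpp
#include <cassert>
#include <optional>
#include <utility>
#include <vector>

// Last stage: consume the item.
template <typename T, typename Sink>
void run_stages(T x, Sink sink) {
  sink(std::move(x));
}

// Apply the first stage, then pass its result to the remaining stages.
template <typename T, typename Stage, typename... Rest>
void run_stages(T x, Stage s, Rest... rest) {
  run_stages(s(std::move(x)), rest...);
}

// Pulls items from gen until it returns an empty value and pushes each
// item through all stages in order: sN(...s2(s1(x))...).
template <typename Generator, typename... Stages>
void sequential_pipeline(Generator gen, Stages... stages) {
  static_assert(sizeof...(Stages) >= 1, "at least one stage is required");
  for (;;) {
    auto item = gen();
    if (!item) break;
    run_stages(*item, stages...);
  }
}

// Usage: generate 1..n, square each value, add one, and collect the results.
inline std::vector<int> squares_plus_one(int n) {
  assert(n >= 0);  // precondition of this sketch
  std::vector<int> out;
  int next = 1;
  sequential_pipeline(
      [&]() -> std::optional<int> {
        if (next <= n) return next++;
        return {};
      },
      [](int x) { return x * x; },
      [](int x) { return x + 1; },
      [&](int x) { out.push_back(x); });
  return out;
}
```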
Note: Each stage may run concurrently with other stages. However, there are dependencies between stages, so that every item passes sequentially across stages.
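To illustrate the note, the following sketch runs two stages in separate threads connected by a blocking queue. The stages overlap in time, yet every item still goes through stage 1 before stage 2. The channel class and two_stage_demo are hypothetical names for this example and do not reflect how GrPPI's execution policies are implemented.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Minimal blocking queue connecting two stages. An empty optional marks
// the end of the stream.
template <typename T>
class channel {
public:
  void push(std::optional<T> item) {
    {
      std::lock_guard<std::mutex> lock{mutex_};
      items_.push(std::move(item));
    }
    ready_.notify_one();
  }

  std::optional<T> pop() {
    std::unique_lock<std::mutex> lock{mutex_};
    ready_.wait(lock, [this] { return !items_.empty(); });
    auto item = std::move(items_.front());
    items_.pop();
    return item;
  }

private:
  std::mutex mutex_;
  std::condition_variable ready_;
  std::queue<std::optional<T>> items_;
};

// Stage 1 squares the values 1..n; stage 2 adds one and stores the result.
inline std::vector<int> two_stage_demo(int n) {
  assert(n >= 0);  // precondition of this sketch
  channel<int> chan;
  std::vector<int> out;

  std::thread stage1{[&] {
    for (int i = 1; i <= n; ++i) chan.push(i * i);
    chan.push(std::nullopt);  // end of stream
  }};
  std::thread stage2{[&] {
    while (auto item = chan.pop()) out.push_back(*item + 1);
  }};

  stage1.join();
  stage2.join();
  return out;
}
```

Because the queue is first-in first-out with a single producer and a single consumer, the output order matches the generation order.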
Example: Generate a sequence of numbers, apply two consecutive transformations (squaring each value and then taking its reciprocal), and write the results to standard output.
int n = 100;
grppi::pipeline(exec,
  [n]() -> optional<double> {
    static int x = 0;
    if (x<n) return x++;
    else return {};
  },
  [](double x) { return x*x; },
  [](double x) { return 1/x; },
  [](double x) { cout << x << endl; }
);
A composable pipeline returns a representation of the pipeline that can be used to perform declarative composition of streaming patterns.
A composable pipeline does not take an execution policy as it inherits the execution policy from its enclosing pattern.
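The idea of building a pipeline object that runs only when an enclosing pattern invokes it can be sketched without GrPPI. In the example below, compose, run_over, and make_processor are hypothetical names: compose packs two stages into one callable without running anything, and run_over plays the role of the enclosing pattern that decides how the composed stage is executed.

```cpp
#include <string>
#include <utility>
#include <vector>

// Hypothetical helper: packs two stages into a single callable.
// Nothing is executed here.
template <typename F, typename G>
auto compose(F f, G g) {
  return [f = std::move(f), g = std::move(g)](auto x) {
    return g(f(std::move(x)));
  };
}

// Hypothetical enclosing pattern: it decides how and when the stage runs.
// In this sketch it simply applies the stage to each input in order.
template <typename Stage>
std::vector<std::string> run_over(const std::vector<int>& inputs, Stage stage) {
  std::vector<std::string> out;
  for (int x : inputs) out.push_back(stage(x));
  return out;
}

// Builds the composed stage: int -> double -> std::string.
inline auto make_processor() {
  return compose([](int x) { return x * 0.5; },
                 [](double d) { return std::to_string(d); });
}
```

The composed object carries no execution policy of its own; run_over could equally apply it from several threads, which mirrors how a composable pipeline inherits the policy of its enclosing pattern.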
Example: Use a farm to read integers from a file and write strings to another file. The transformation phase is a pipeline that performs the transformation in two stages.
grppi::farm(exec,
  [&input]() -> optional<int> {
    int n;
    input >> n;
    if (!input) return {};
    else return n;
  },
  grppi::pipeline(
    [](int x) -> double { return func1(x); },
    [](double x) -> string { return func2(x); }
  ),
  [&output](string s) {
    output << s << endl;
  }
);
This composable pipeline can also be used to build complex composed patterns in a non-declarative way.
Example: Piecewise construction of a farm of pipelines.
auto reader = [&input]() -> optional<int> {
  int n;
  input >> n;
  if (!input) return {};
  else return n;
};
auto processor = grppi::pipeline(
  [](int x) -> double { return func1(x); },
  [](double x) -> string { return func2(x); }
);
auto writer = [&output](string s) {
  output << s << "\n";
};
grppi::farm(ex1, reader, processor, writer);