-
Notifications
You must be signed in to change notification settings - Fork 21
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
193 changed files
with
3,776 additions
and
16 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,154 @@ | ||
--- | ||
id: use-programmatically-create-adapters | ||
title: Adapters as Code | ||
sidebar_label: Adapters as Code | ||
--- | ||
|
||
In Apache StreamPipes, adapters are typically created via the web interface. | ||
However, it is possible to create adapters programmatically using a compact YAML format and the treamPipes REST API. | ||
Programmatic creation of adapters can be a useful feature in cases where many similar adapters have to be created. | ||
In this case, an externally managed YAMl file containing the full adapter configuration can be used and modified. | ||
|
||
## Introduction | ||
|
||
In StreamPipes, adapters are responsible for collecting data from various industrial data sources. | ||
While the UI simplifies this process, you can also manage adapters programmatically for better automation, integration with CI/CD pipelines, or infrastructure-as-code practices. | ||
|
||
This guide demonstrates how to define and create adapters using a YAML format and StreamPipes REST API, offering more flexibility to manage adapters programmatically. | ||
|
||
## Adapter YAML Structure | ||
|
||
An adapter in StreamPipes can be defined using a YAML file, which contains a compact description of the adapter configuration. | ||
The basic structure of the YAML format for an adapter definition includes: | ||
|
||
- **Name**: The name of the adapter. | ||
- **ID**: A unique ID for the adapter | ||
- **Description**: An additional description of the adapter | ||
- **AppID**: The adapter type | ||
- **Configuration**: Configuration details such as connection details, polling intervals, data format, and more | ||
- **Schema**: Schema refinements, e.g., additional label, description, semantic type and property scope | ||
- **Enrich**: Enrichment rules. Currently, only timestamp enrichment rules are supported in the YAML specification | ||
- **Transform**: Transformation rules. Currently, rename and measurement unit transforms are supported. | ||
- **CreateOptions**: Additional operations that are executed upon adapter generation. | ||
|
||
Here’s a sample structure to define an OPC-UA adapter: | ||
|
||
```yaml | ||
name: My OPC Adapter | ||
id: testadapter | ||
description: Test | ||
appId: org.apache.streampipes.connect.iiot.adapters.opcua | ||
configuration: | ||
- opc_host_or_url: OPC_URL | ||
opc_server_url: opc.tcp://localhost:62541/milo | ||
- adapter_type: PULL_MODE | ||
pulling_interval: 1000 | ||
- access_mode: UNAUTHENTICATED | ||
- available_nodes: | ||
- "ns=2;s=Leakage Test Station/temperature" | ||
- "ns=2;s=Leakage Test Station/pressure" | ||
|
||
schema: | ||
temperature: | ||
propertyScope: measurement | ||
label: Temp | ||
description: Temperature value | ||
semanticType: http://schema.org/temperature | ||
|
||
enrich: | ||
timestamp: timestamp | ||
|
||
transform: | ||
rename: | ||
pressure: pressure3 | ||
|
||
createOptions: | ||
persist: true | ||
start: true | ||
``` | ||
## Programmatically creating adapters | ||
Here is a walkthrough to create an adapter programmatically over the API: | ||
### Background: Internal adapter generation process | ||
The YAML definition file is a more compact notation of StreamPipes' internal data format to represent an adapter. | ||
When creating an adapter over the user interface, StreamPipes requires some basic, adapter-specific settings. | ||
Afterwards, a `Guess Schema` step is executed. In this step, StreamPipes connects to the underlying data sources, receives some samples of live data, determines the exact schema of the data stream and provides user with the source schema. | ||
This schema can be further refined using `Transformation Rules`. | ||
|
||
When an adapter is created using the more compact YAML notation, the same process is applied. Based on the provided configuration, the API connects to the given data source and determines the schema. | ||
The transformation rules provided in the YAMl definition are then applied on the original schema. | ||
Therefore, it is important that the provided schema refinement and transformation steps fit the original schema. | ||
|
||
### Getting a valid definition file | ||
|
||
The easiest way to create a valid YAML file is the user interface. Within the StreamPipes Connect view, it is possible to export the YAML definition for all existing adapters. | ||
In addition, the adapter generation wizard also offers the option to view the adapter configuration before creating the adapter by clicking the `Code` checkbox: | ||
|
||
<img className="docs-image" src="/img/03_use-programmatically-create-adapters/01_adapter-generation-code.png" alt="StreamPipes Adapter Code View"/> | ||
|
||
Another option is to open the adapter details view: | ||
|
||
<img className="docs-image" src="/img/03_use-programmatically-create-adapters/02_adapter-details-view-code.png" alt="StreamPipes Adapter Details Code View"/> | ||
|
||
You can copy this definition and modify it according to your needs. | ||
|
||
#### Configuration | ||
|
||
For each configuration option in the user interface, there is a mapping to the YAMl definition. | ||
|
||
A configuration value is a key/value pair, where the key corresponds to the internal name of the configuration and the value depends on the configuration type. | ||
|
||
#### Schema | ||
|
||
Schema definitions enhance the metadata of each field from the input stream. | ||
The following configurations are supported: | ||
|
||
* `label` to add an additional (human-readable) label to the field | ||
* `description` for an additional description | ||
* `propertyScope` to determine the type of the field (HEADER_PROPERTY, DIMENSION_PROPERTY or MEASUREMENT_PROPERTY) | ||
* `semanticType` to provide the semantic type of the field (e.g., `https://schema.org/temperature`) | ||
|
||
#### Enrich | ||
|
||
* `timestamp` defines that an additional field named timestamp is added to each incoming event containing the ingestion time as a UNIX timestamp. | ||
|
||
#### Transform | ||
|
||
Currently, the following transforms are supported: | ||
|
||
* `rename` defines renaming of individual fields from the input stream. A valid configuration consists of a key/value pair, where the key indicates the original field name and the value the target field name. | ||
* `measurementUnit` defines a value transformation between measurement units. A valid configuration consists of a key/value pair, where the key indicates the field name and the value the target measurement unit. | ||
|
||
#### CreateOptions | ||
|
||
Currently, two settings can be provided in the `CreateOptions` section: | ||
|
||
* `persist` indicates whether the data stream produced by the adapter should also be stored. In this case, a `Persist Pipeline` is automatically created. | ||
* `start` indicates whether the adapter should be immediately started after creation. | ||
|
||
### API | ||
|
||
To create a new adapter, call the StreamPipes API as follows: | ||
|
||
``` | ||
POST /streampipes-backend/api/v2/compact-adapters | ||
Content-type: application/yml | ||
Accept: application/yml | ||
``` | ||
|
||
You must provide valid credentials by either adding a Bearer token or an API key: | ||
|
||
``` | ||
X-API-USER: your username | ||
X-API-KEY: your api key | ||
``` | ||
|
||
The body of the request should contain the YAML definition. | ||
|
||
:::info | ||
It is also possible to provide the adapter specification as a JSON document. In this case, change the `Content-type` to `application/json`. | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,164 @@ | ||
--- | ||
id: use-programmatically-create-pipelines | ||
title: Pipelines as Code | ||
sidebar_label: Pipelines as Code | ||
--- | ||
|
||
In Apache StreamPipes, pipelines represent the flow of data from sources (streams), through processors (filters, transformations, etc.), and finally to sinks (third-party-systems, storage, notifications). | ||
Traditionally, pipelines are created through the web-based user interface. | ||
However, they can also be defined programmatically as code, offering the flexibility to manage pipelines using Infrastructure as Code (IaC) practices. | ||
|
||
This guide explains how to define and create pipelines programmatically using a YAML structure. | ||
|
||
## Introduction | ||
|
||
Defining pipelines as code allows you to automate the creation, management, and deployment of StreamPipes pipelines. | ||
This is especially useful for managing multiple StreamPipes instances across environments. | ||
Pipelines are written in a YAML format (or alternatively as JSON) and can be deployed programmatically via the StreamPipes REST API. | ||
|
||
This guide provides an overview of how to structure pipeline definitions in YAML and deploy them using the API. | ||
|
||
## Pipeline YAML Structure | ||
|
||
A pipeline in YAML consists of several key sections: | ||
|
||
- **ID**: A unique identifier for the pipeline. | ||
- **Name and Description**: Optional fields to describe the pipeline. | ||
- **Pipeline Elements**: The components that make up the pipeline, including streams (data sources), processors (data transformations), and sinks (output destinations). | ||
- **Create Options**: Specifies how and when to start the pipeline (e.g., `start: false` means the pipeline won't start automatically). | ||
|
||
Here’s a high-level breakdown of the structure: | ||
|
||
```yaml | ||
id: my-pipeline | ||
name: "" | ||
description: "" | ||
pipelineElements: # Define pipeline components here | ||
- type: stream # Data source | ||
ref: <reference> # Unique reference ID | ||
id: <data-stream-id> # ID of the stream | ||
|
||
- type: processor # Data transformation | ||
ref: <reference> # Unique reference ID | ||
id: <processor-id> # ID of the processor | ||
connectedTo: # Previous pipeline element reference(s) | ||
- <reference> | ||
configuration: # Processor-specific configurations | ||
- <configuration-option> | ||
|
||
- type: sink # Data sink (output) | ||
ref: <reference> # Unique reference ID | ||
id: <sink-id> # ID of the sink | ||
connectedTo: # Previous pipeline element reference(s) | ||
- <reference> | ||
configuration: # Sink-specific configurations | ||
- <configuration-option> | ||
|
||
createOptions: | ||
start: <true|false> # Whether to start the pipeline immediately | ||
``` | ||
## Pipeline Elements | ||
### Building blocks | ||
The key building blocks of a pipeline include: | ||
#### Stream | ||
A stream represents a data source in the pipeline, such as a sensor feed, API, or message queue. It is referenced by a unique ID that identifies the data stream. | ||
#### Processor | ||
A processor transforms, filters, or enriches the data coming from a stream or another processor. Each processor has configuration parameters that control its behavior, such as filtering criteria or mapping options. | ||
#### Sink | ||
A sink sends the processed data to a final destination, such as a database, file storage, or another service. Sinks may also have configuration options that specify where and how the data should be sent. | ||
A pipeline element is selected by providing its ID. For processors and sinks, the ID refers to the `appId` of the pipeline element, e.g., `org.apache.streampipes.processors.filters.jvm.numericalfilter`. | ||
For data streams, the ID refers to the `elementId` of the data stream. | ||
|
||
To define connections between pipeline elements, the `ref` and `connectedTo` fields can be used. | ||
`ref` can be a short string (e.g., `stream01` or `processor01`) which will be used as an internal identifier of the pipeline element. | ||
Within the `connectedTo` list, connections to other pipeline elements can be defined. | ||
Each item of the list should relate to an existing `ref`. | ||
|
||
### Configuration | ||
|
||
In the `configuration` section, which only applies for data processors and sinks, the pipeline element configuration can be applied. | ||
The configuration options depend on the pipeline element and have the same structure as the adapter configuration (see [Adapters as Code](use-programmatically-create-adapters)) | ||
The easiest way to determine a valid configuration is the web interface. | ||
|
||
After creating a pipeline in the web interface and clicking on `Save pipeline`, the option `Show pipeline configuration as code` shows the current pipeline configuration in YAML or JSON format: | ||
|
||
<img className="docs-image" src="/img/03_use-programmatically-create-pipelines/01_pipeline-editor-pipeline-as-code.png" alt="StreamPipes Pipeline Editor Code View"/> | ||
|
||
Another option is to view the pipeline details for an existing pipeline. Here, the YAMl definition of the pipeline can be viewed by clicking the `View pipeline as code` button: | ||
|
||
<img className="docs-image" src="/img/03_use-programmatically-create-pipelines/02_pipeline-details-pipeline-as-code.png" alt="StreamPipes Pipeline Editor Code View"/> | ||
|
||
|
||
## Example pipeline as Code | ||
|
||
Here's an example of a pipeline written in YAML format: | ||
|
||
```yaml | ||
id: my-pipeline | ||
name: "Density Filter Pipeline" | ||
description: "A pipeline that filters data based on the density and stores it in a data lake." | ||
pipelineElements: | ||
- type: stream | ||
ref: stream01 | ||
id: sp:spdatastream:GWWzMD | ||
- type: processor | ||
ref: processor01 | ||
id: org.apache.streampipes.processors.filters.jvm.numericalfilter | ||
connectedTo: | ||
- stream01 | ||
configuration: | ||
- number-mapping: s0::density | ||
- operation: < | ||
- value: "12" | ||
- type: sink | ||
ref: sink01 | ||
id: org.apache.streampipes.sinks.internal.jvm.datalake | ||
connectedTo: | ||
- processor01 | ||
configuration: | ||
- timestamp_mapping: s0::timestamp | ||
- db_measurement: my-measurement | ||
- schema_update: Update schema | ||
- dimensions_selection: | ||
- sensorId | ||
- ignore_duplicates: false | ||
createOptions: | ||
start: false | ||
``` | ||
|
||
Stream: The pipeline begins with a data stream (sp:spdatastream:GWWzMD) referenced by stream01. This is the source of the data. | ||
|
||
Processor: The data is passed through a numerical filter processor (org.apache.streampipes.processors.filters.jvm.numericalfilter) which checks if the field s0::density is less than 12. The filter is connected to the stream via reference stream01. | ||
|
||
Sink: The filtered data is then sent to a data lake (org.apache.streampipes.sinks.internal.jvm.datalake). The sink is configured with several parameters including the mapping of the timestamp (s0::timestamp) and schema update options. The sink is connected to the processor via reference processor01. | ||
|
||
Create Options: The pipeline is set to not start automatically (start: false). | ||
|
||
## API | ||
|
||
To create a new pipeline, call the StreamPipes API as follows: | ||
|
||
``` | ||
POST /streampipes-backend/api/v2/compact-pipelines | ||
Content-type: application/yml | ||
Accept: application/yml | ||
``` | ||
|
||
You must provide valid credentials by either adding a Bearer token or an API key: | ||
|
||
``` | ||
X-API-USER: your username | ||
X-API-KEY: your api key | ||
``` | ||
|
||
The body of the request should contain the YAML definition. | ||
|
||
:::info | ||
It is also possible to provide the pipeline specification as a JSON document. In this case, change the `Content-type` to `application/json`. |
Oops, something went wrong.