Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add missing adapters, as pipeline and adapter as code sections #202

Merged
merged 1 commit into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 154 additions & 0 deletions docs/03_use-programmatically-create-adapters.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
---
id: use-programmatically-create-adapters
title: Adapters as Code
sidebar_label: Adapters as Code
---

In Apache StreamPipes, adapters are typically created via the web interface.
However, it is possible to create adapters programmatically using a compact YAML format and the treamPipes REST API.
Programmatic creation of adapters can be a useful feature in cases where many similar adapters have to be created.
In this case, an externally managed YAMl file containing the full adapter configuration can be used and modified.

## Introduction

In StreamPipes, adapters are responsible for collecting data from various industrial data sources.
While the UI simplifies this process, you can also manage adapters programmatically for better automation, integration with CI/CD pipelines, or infrastructure-as-code practices.

This guide demonstrates how to define and create adapters using a YAML format and StreamPipes REST API, offering more flexibility to manage adapters programmatically.

## Adapter YAML Structure

An adapter in StreamPipes can be defined using a YAML file, which contains a compact description of the adapter configuration.
The basic structure of the YAML format for an adapter definition includes:

- **Name**: The name of the adapter.
- **ID**: A unique ID for the adapter
- **Description**: An additional description of the adapter
- **AppID**: The adapter type
- **Configuration**: Configuration details such as connection details, polling intervals, data format, and more
- **Schema**: Schema refinements, e.g., additional label, description, semantic type and property scope
- **Enrich**: Enrichment rules. Currently, only timestamp enrichment rules are supported in the YAML specification
- **Transform**: Transformation rules. Currently, rename and measurement unit transforms are supported.
- **CreateOptions**: Additional operations that are executed upon adapter generation.

Here’s a sample structure to define an OPC-UA adapter:

```yaml
name: My OPC Adapter
id: testadapter
description: Test
appId: org.apache.streampipes.connect.iiot.adapters.opcua
configuration:
- opc_host_or_url: OPC_URL
opc_server_url: opc.tcp://localhost:62541/milo
- adapter_type: PULL_MODE
pulling_interval: 1000
- access_mode: UNAUTHENTICATED
- available_nodes:
- "ns=2;s=Leakage Test Station/temperature"
- "ns=2;s=Leakage Test Station/pressure"

schema:
temperature:
propertyScope: measurement
label: Temp
description: Temperature value
semanticType: http://schema.org/temperature

enrich:
timestamp: timestamp

transform:
rename:
pressure: pressure3

createOptions:
persist: true
start: true
```

## Programmatically creating adapters

Here is a walkthrough to create an adapter programmatically over the API:

### Background: Internal adapter generation process

The YAML definition file is a more compact notation of StreamPipes' internal data format to represent an adapter.
When creating an adapter over the user interface, StreamPipes requires some basic, adapter-specific settings.
Afterwards, a `Guess Schema` step is executed. In this step, StreamPipes connects to the underlying data sources, receives some samples of live data, determines the exact schema of the data stream and provides user with the source schema.
This schema can be further refined using `Transformation Rules`.

When an adapter is created using the more compact YAML notation, the same process is applied. Based on the provided configuration, the API connects to the given data source and determines the schema.
The transformation rules provided in the YAMl definition are then applied on the original schema.
Therefore, it is important that the provided schema refinement and transformation steps fit the original schema.

### Getting a valid definition file

The easiest way to create a valid YAML file is the user interface. Within the StreamPipes Connect view, it is possible to export the YAML definition for all existing adapters.
In addition, the adapter generation wizard also offers the option to view the adapter configuration before creating the adapter by clicking the `Code` checkbox:

<img className="docs-image" src="/img/03_use-programmatically-create-adapters/01_adapter-generation-code.png" alt="StreamPipes Adapter Code View"/>

Another option is to open the adapter details view:

<img className="docs-image" src="/img/03_use-programmatically-create-adapters/02_adapter-details-view-code.png" alt="StreamPipes Adapter Details Code View"/>

You can copy this definition and modify it according to your needs.

#### Configuration

For each configuration option in the user interface, there is a mapping to the YAMl definition.

A configuration value is a key/value pair, where the key corresponds to the internal name of the configuration and the value depends on the configuration type.

#### Schema

Schema definitions enhance the metadata of each field from the input stream.
The following configurations are supported:

* `label` to add an additional (human-readable) label to the field
* `description` for an additional description
* `propertyScope` to determine the type of the field (HEADER_PROPERTY, DIMENSION_PROPERTY or MEASUREMENT_PROPERTY)
* `semanticType` to provide the semantic type of the field (e.g., `https://schema.org/temperature`)

#### Enrich

* `timestamp` defines that an additional field named timestamp is added to each incoming event containing the ingestion time as a UNIX timestamp.

#### Transform

Currently, the following transforms are supported:

* `rename` defines renaming of individual fields from the input stream. A valid configuration consists of a key/value pair, where the key indicates the original field name and the value the target field name.
* `measurementUnit` defines a value transformation between measurement units. A valid configuration consists of a key/value pair, where the key indicates the field name and the value the target measurement unit.

#### CreateOptions

Currently, two settings can be provided in the `CreateOptions` section:

* `persist` indicates whether the data stream produced by the adapter should also be stored. In this case, a `Persist Pipeline` is automatically created.
* `start` indicates whether the adapter should be immediately started after creation.

### API

To create a new adapter, call the StreamPipes API as follows:

```
POST /streampipes-backend/api/v2/compact-adapters
Content-type: application/yml
Accept: application/yml
```

You must provide valid credentials by either adding a Bearer token or an API key:

```
X-API-USER: your username
X-API-KEY: your api key
```

The body of the request should contain the YAML definition.

:::info
It is also possible to provide the adapter specification as a JSON document. In this case, change the `Content-type` to `application/json`.


164 changes: 164 additions & 0 deletions docs/03_use-programmatically-create-pipelines.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
---
id: use-programmatically-create-pipelines
title: Pipelines as Code
sidebar_label: Pipelines as Code
---

In Apache StreamPipes, pipelines represent the flow of data from sources (streams), through processors (filters, transformations, etc.), and finally to sinks (third-party-systems, storage, notifications).
Traditionally, pipelines are created through the web-based user interface.
However, they can also be defined programmatically as code, offering the flexibility to manage pipelines using Infrastructure as Code (IaC) practices.

This guide explains how to define and create pipelines programmatically using a YAML structure.

## Introduction

Defining pipelines as code allows you to automate the creation, management, and deployment of StreamPipes pipelines.
This is especially useful for managing multiple StreamPipes instances across environments.
Pipelines are written in a YAML format (or alternatively as JSON) and can be deployed programmatically via the StreamPipes REST API.

This guide provides an overview of how to structure pipeline definitions in YAML and deploy them using the API.

## Pipeline YAML Structure

A pipeline in YAML consists of several key sections:

- **ID**: A unique identifier for the pipeline.
- **Name and Description**: Optional fields to describe the pipeline.
- **Pipeline Elements**: The components that make up the pipeline, including streams (data sources), processors (data transformations), and sinks (output destinations).
- **Create Options**: Specifies how and when to start the pipeline (e.g., `start: false` means the pipeline won't start automatically).

Here’s a high-level breakdown of the structure:

```yaml
id: my-pipeline
name: ""
description: ""
pipelineElements: # Define pipeline components here
- type: stream # Data source
ref: <reference> # Unique reference ID
id: <data-stream-id> # ID of the stream

- type: processor # Data transformation
ref: <reference> # Unique reference ID
id: <processor-id> # ID of the processor
connectedTo: # Previous pipeline element reference(s)
- <reference>
configuration: # Processor-specific configurations
- <configuration-option>

- type: sink # Data sink (output)
ref: <reference> # Unique reference ID
id: <sink-id> # ID of the sink
connectedTo: # Previous pipeline element reference(s)
- <reference>
configuration: # Sink-specific configurations
- <configuration-option>

createOptions:
start: <true|false> # Whether to start the pipeline immediately
```

## Pipeline Elements

### Building blocks

The key building blocks of a pipeline include:

#### Stream
A stream represents a data source in the pipeline, such as a sensor feed, API, or message queue. It is referenced by a unique ID that identifies the data stream.

#### Processor
A processor transforms, filters, or enriches the data coming from a stream or another processor. Each processor has configuration parameters that control its behavior, such as filtering criteria or mapping options.

#### Sink
A sink sends the processed data to a final destination, such as a database, file storage, or another service. Sinks may also have configuration options that specify where and how the data should be sent.

A pipeline element is selected by providing its ID. For processors and sinks, the ID refers to the `appId` of the pipeline element, e.g., `org.apache.streampipes.processors.filters.jvm.numericalfilter`.
For data streams, the ID refers to the `elementId` of the data stream.

To define connections between pipeline elements, the `ref` and `connectedTo` fields can be used.
`ref` can be a short string (e.g., `stream01` or `processor01`) which will be used as an internal identifier of the pipeline element.
Within the `connectedTo` list, connections to other pipeline elements can be defined.
Each item of the list should relate to an existing `ref`.

### Configuration

In the `configuration` section, which only applies for data processors and sinks, the pipeline element configuration can be applied.
The configuration options depend on the pipeline element and have the same structure as the adapter configuration (see [Adapters as Code](use-programmatically-create-adapters))
The easiest way to determine a valid configuration is the web interface.

After creating a pipeline in the web interface and clicking on `Save pipeline`, the option `Show pipeline configuration as code` shows the current pipeline configuration in YAML or JSON format:

<img className="docs-image" src="/img/03_use-programmatically-create-pipelines/01_pipeline-editor-pipeline-as-code.png" alt="StreamPipes Pipeline Editor Code View"/>

Another option is to view the pipeline details for an existing pipeline. Here, the YAMl definition of the pipeline can be viewed by clicking the `View pipeline as code` button:

<img className="docs-image" src="/img/03_use-programmatically-create-pipelines/02_pipeline-details-pipeline-as-code.png" alt="StreamPipes Pipeline Editor Code View"/>


## Example pipeline as Code

Here's an example of a pipeline written in YAML format:

```yaml
id: my-pipeline
name: "Density Filter Pipeline"
description: "A pipeline that filters data based on the density and stores it in a data lake."
pipelineElements:
- type: stream
ref: stream01
id: sp:spdatastream:GWWzMD
- type: processor
ref: processor01
id: org.apache.streampipes.processors.filters.jvm.numericalfilter
connectedTo:
- stream01
configuration:
- number-mapping: s0::density
- operation: <
- value: "12"
- type: sink
ref: sink01
id: org.apache.streampipes.sinks.internal.jvm.datalake
connectedTo:
- processor01
configuration:
- timestamp_mapping: s0::timestamp
- db_measurement: my-measurement
- schema_update: Update schema
- dimensions_selection:
- sensorId
- ignore_duplicates: false
createOptions:
start: false
```

Stream: The pipeline begins with a data stream (sp:spdatastream:GWWzMD) referenced by stream01. This is the source of the data.

Processor: The data is passed through a numerical filter processor (org.apache.streampipes.processors.filters.jvm.numericalfilter) which checks if the field s0::density is less than 12. The filter is connected to the stream via reference stream01.

Sink: The filtered data is then sent to a data lake (org.apache.streampipes.sinks.internal.jvm.datalake). The sink is configured with several parameters including the mapping of the timestamp (s0::timestamp) and schema update options. The sink is connected to the processor via reference processor01.

Create Options: The pipeline is set to not start automatically (start: false).

## API

To create a new pipeline, call the StreamPipes API as follows:

```
POST /streampipes-backend/api/v2/compact-pipelines
Content-type: application/yml
Accept: application/yml
```

You must provide valid credentials by either adding a Bearer token or an API key:

```
X-API-USER: your username
X-API-KEY: your api key
```

The body of the request should contain the YAML definition.

:::info
It is also possible to provide the pipeline specification as a JSON document. In this case, change the `Content-type` to `application/json`.
Loading
Loading