Getting started with a new Data Collector development

Step 1: Install Docker and Docker Compose

The recommended way to test and develop data collectors for the Open Data Hub writer API is to use Docker.

You need to install two packages: Docker and Docker Compose.

Step 2: Install the Open Data Hub Writer API

See the Running a local instance section below: clone the bdp-core project, copy .env.example to .env, and run docker compose up.

Step 3: Install the Helloworld data collector

  • Go to https://github.com/noi-techpark/bdp-commons
  • git clone that repository to your local machine
  • cd data-collectors/helloworld
  • Try it out, if you like:
    • cp .env.example .env
    • docker-compose up
  • Optionally, connect to the DB to see what came in, or have a look at the logs
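
For convenience, here are the steps above as a single shell sequence:

git clone https://github.com/noi-techpark/bdp-commons.git
cd bdp-commons/data-collectors/helloworld
cp .env.example .env
docker-compose up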

Step 4: Clone the Helloworld data collector to get started with your own development

  1. Read and follow our Getting Started guidelines
  2. Copy/paste the helloworld example into a new folder under data-collectors; name that folder after your data collector
  3. Find TODO comments and follow their instructions
  4. See and alter code inside SyncScheduler.java
  5. Start the writer API locally and test everything:
    • Writer API with Docker
    • Start your data collector
    • Check log outputs of the writer and the data collector to find issues
    • Connect to the DB and see what is there after some tests
  6. Create a pull request as described in the guidelines above

Generic API doc with curl examples

If you don't use Java, you can still push data to the Open Data Hub through its JSON API. Here are some generic curl examples showing how to do that:

Authentication

We use Keycloak to authenticate. That service provides an access_token that can be used to send POST requests to the writer. See the Open Data Hub Authentication / Quick Howto for further details.

curl -X POST -L "https://auth.opendatahub.testingmachine.eu/auth/realms/noi/protocol/openid-connect/token" \
    --header 'Content-Type: application/x-www-form-urlencoded' \
    --data-urlencode 'grant_type=client_credentials' \
    --data-urlencode 'client_id=odh-mobility-datacollector-development' \
    --data-urlencode 'client_secret=7bd46f8f-c296-416d-a13d-dc81e68d0830'

With this call you get an access_token that can then be used in all writer API methods. Here is just an example to get all stations:

curl -X GET "http://localhost:8999/json/stations" \
    --header 'Content-Type: application/json' \
    --header 'Authorization: bearer your-access-token'

You should get an empty JSON list as a result.
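
For scripting, here is a minimal sketch that stores the token in a shell variable; the use of jq is an assumption (any JSON parser works):

# Extract the access_token field from the Keycloak response (jq assumed)
TOKEN=$(curl -s -X POST "https://auth.opendatahub.testingmachine.eu/auth/realms/noi/protocol/openid-connect/token" \
    --header 'Content-Type: application/x-www-form-urlencoded' \
    --data-urlencode 'grant_type=client_credentials' \
    --data-urlencode 'client_id=odh-mobility-datacollector-development' \
    --data-urlencode 'client_secret=7bd46f8f-c296-416d-a13d-dc81e68d0830' \
    | jq -r '.access_token')

# Reuse the token in subsequent calls
curl -X GET "http://localhost:8999/json/stations" \
    --header "Authorization: bearer $TOKEN"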

Write an email to [email protected] if you want to get the client_secret and an Open Data Hub OAuth2 account for a non-development setup.

Set provenance

The provenance is the combination of the following values, so that later on we can tell which data collector and which version the data came from. The provenance needs to be set only once, on program startup.

  • uuid: is always null
  • dataCollector: name of the data collector itself like 'odh-mobility-dc-flighdata-skyalps'
  • dataCollectorVersion: latest git commit sha of deployed version to server or version number of the data collector like 'zh7shz' or '1.0.7'
  • lineage: name of the origin of the data like 'Skyalps'
curl -X POST "https://share.opendatahub.testingmachine.eu/json/provenance"
     --header 'Content-Type: application/json'
     --header 'Authorization: bearer your-access-token'
     -d '{"uuid":null,"dataCollector":"odh-docs-example","dataCollectorVersion":"docs-example-version","lineage":"docs-example"}'

You should get the provenance uuid as response, like "jxkjBCqd". Save that uuid because you'll need it to push the data.
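
A minimal sketch that saves the uuid in a shell variable, again assuming jq and the $TOKEN variable from the authentication step:

PROVENANCE=$(curl -s -X POST "https://share.opendatahub.testingmachine.eu/json/provenance" \
    --header 'Content-Type: application/json' \
    --header "Authorization: bearer $TOKEN" \
    -d '{"uuid":null,"dataCollector":"odh-docs-example","dataCollectorVersion":"docs-example-version","lineage":"docs-example"}' \
    | jq -r .)   # the response body is the uuid itself, e.g. "jxkjBCqd"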

Sync data types

Every data entry can have its own data type, and if it does not already exist in the Open Data Hub, you need to create a new data type. If no new data types appear after the application has started, they need to be set only once, on program startup. With the following call you can create multiple data types by sending a JSON list with the following values:

  • name: Unique name (ex., air-temperature)
  • description: (optional) Description of the data type (ex., Air temperature)
  • period: (optional) Interval at which a measurement with this data type gets collected (in seconds)
  • unit: (optional) Unit of the data type (ex., °C)
  • rtype: (optional) Metric of the data type (ex., mean or instantaneous)
  • metadata: (optional) Further detail information about given datatype as JSON
curl -X POST -L "https://share.opendatahub.testingmachine.eu/json/syncDataTypes" \
    --header 'Content-Type: application/json' \
    --header 'Authorization: bearer your-access-token' \
    -d '[
          {
            "name":"docs-example-type",
            "unit":"odh-docs-unit",
            "rtype":"docs-example-rtype",
            "description":"docs-example description",
            "period": 600,
            "metadata": 
              {
                "extra":"fields"
              }
          },
          {
            "name":"docs-example-type-2",
            "unit":"odh-docs-unit",
            "rtype":"docs-example-rtype",
            "description":"docs-example description",
            "period": 300,
            "metadata": 
              {
                "even-more":"fields"
              }
          }
        ]'

You should get a 201 response code if everything is ok.

Sync stations

To create new stations or synchronize their properties with the Open Data Hub you need to make the following request. The payload is a list of Station objects with the following fields:

  • id: A unique id for the station
  • name: A unique name for the station; it can also be the id
  • origin: The origin the data comes from
  • latitude: (optional) GPS latitude value like "46.454"
  • longitude: (optional) GPS longitude value like "11.254"
  • municipality: (optional) Name of the municipality the station is in
  • metaData: (optional) Some further information to the station as JSON object
curl -X POST -L "https://share.opendatahub.testingmachine.eu/json/syncStations/ExampleStation" \
    --header 'Content-Type: application/json' \
    --header 'Authorization: bearer your-access-token' \
    -d '[
          {
            "id": "example-station-id-1",
            "name": "example-station-name-1",
            "origin": "docs-example",
            "latitude": 46.333,
            "longitude": 11.356,
            "municipality": "Bolzano", 
            "metaData" : {
              "additional": "fields"
            }
          },
          {
            "id": "example-station-id-2",
            "name": "example-station-name-2",
            "origin": "docs-example",
            "latitude": 46.333,
            "longitude": 11.356,
            "municipality": "Bolzano", 
            "metaData" : {
              "additional": "fields"
            }
          }
      ]'

Push data

After having created the provenance, data types, and stations, you can finally push the data to the Open Data Hub. The root of the data JSON looks like this:

{
  "name": "(default)",
  "branch": {},
  "data": [],
  "provenance": ""
}

Every branch can contain multiple station branches, each of which can contain multiple data-type branches, whose data field is then filled with the data records:

{
  "name": "(default)",
  "branch": {
    "example-station-id-1": {
      "name": "(default)",
      "branch": {
        "docs-example-type": {
          "name": "(default)",
          "branch": {},
          "data": [
            {
              "timestamp": 1668522653150,
              "value": 1234,
              "period": 100,
              "_t": "it.bz.idm.bdp.dto.SimpleRecordDto"
            }
          ]
        }
      },
      "data": []
    }
  },
  "data": [],
  "provenance": "jxkjBCqd"
}

Here is a curl example sending six data records for two different stations and two different data types:

curl -X POST -L "https://share.opendatahub.testingmachine.eu/json/pushData" \
    --header 'Content-Type: application/json' \
    --header 'Authorization: bearer your-access-token' \
    -d '{
          "name": "(default)",
          "branch": {
            "example-station-id-1": {
              "name": "(default)",
              "branch": {
                "docs-example-type": {
                  "name": "(default)",
                  "branch": {},
                  "data": [
                    {
                      "timestamp": 1668522653150,
                      "value": "plapla",
                      "period": 100,
                      "_t": "it.bz.idm.bdp.dto.SimpleRecordDto"
                    },
                    {
                      "timestamp": 1668522653150,
                      "value": "plapla",
                      "period": 100,
                      "_t": "it.bz.idm.bdp.dto.SimpleRecordDto"
                    }
                  ]
                },
                "docs-example-type-2": {
                  "name": "(default)",
                  "branch": {},
                  "data": [
                    {
                      "timestamp": 1668522653150,
                      "value": "plapla",
                      "period": 100,
                      "_t": "it.bz.idm.bdp.dto.SimpleRecordDto"
                    },
                    {
                      "timestamp": 1668522653150,
                      "value": "plapla",
                      "period": 100,
                      "_t": "it.bz.idm.bdp.dto.SimpleRecordDto"
                    }
                  ]
                }
              },
              "data": []
            },
            "example-station-id-2": {
              "name": "(default)",
              "branch": {
                "docs-example-type": {
                  "name": "(default)",
                  "branch": {},
                  "data": [
                    {
                      "timestamp": 1668522653150,
                      "value": "plapla",
                      "period": 100,
                      "_t": "it.bz.idm.bdp.dto.SimpleRecordDto"
                    },
                    {
                      "timestamp": 1668522653150,
                      "value": "plapla",
                      "period": 100,
                      "_t": "it.bz.idm.bdp.dto.SimpleRecordDto"
                    }
                  ]
                }
              },
              "data": []
            }
          },
          "data": [],
          "provenance": "jxkjBCqd"
        }'

Support

For support, please contact [email protected].

Glossary

BDP (Big Data Platform)

Old name of the mobility/timeseries platform that today is called Open Data Hub. We now use BDP as a shorthand to refer to the timeseries writer API.

Data collector

Small program (microservice) that collects data from an external source and sends it to the Open Data Hub via the BDP writer API.

Elaboration

A data collector that instead of getting external data, transforms data from the Open Data Hub (e.g. some mathematical model, or a simple sum aggregation by time)

Data structures

Station:

Geographical point at which measurements can be taken. Stations are identified by a unique stationcode, a station type (just a string), and an origin; they often have associated metadata.

Station Metadata:

A freeform JSON object containing additional information; currently the structure is defined only by the data collector that creates it.

Origin:

ID of the data provider. Note that this is planned to be phased out in the near future and replaced with provider/source, so we know who provided the data both on a contractual and on a technical level.

Measurement:

Single value (double, string, JSON) at a timestamp. A measurement is always associated with a station, a type, a period, and a provenance. In the BDP context, measurements are often called 'records'.

Data Type (Measurement):

Data type with a description and some meta information (e.g. unit, whether it's a count or an average). Can also have metadata, analogous to stations.

Period:

Periodicity/validity of a measurement in seconds. In some ways, this can be considered part of the data type: some stations have measurements of the same type but different periods. The period usually coincides with the update interval of the measurement, e.g. a temperature sensor that updates its status every 5 minutes will have a period of 300. You may aggregate the average temperature over an hour with the same data type, but a period of 3600.

Provenance:

Identifies a version/configuration of a data collector. It's used to identify where exactly (which program version and provider) a measurement comes from.

Event:

The time series core also supports events: think of traffic events like road closures, but also events in the sense of a talk or a workshop.

Edge:

A special type of station that represents a connection between two (point) stations, e.g. a road between two traffic stations. The edge itself is also inserted as a "normal" station that can have measurements.

Data collector logic

Data collectors and elaborations typically follow this logic:

Push/Pull

Most data collectors run as periodic jobs on a cron schedule, but some are listeners that receive data via REST POST or MQTT.
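
As an illustration, a pull-style collector could be scheduled with a crontab entry like the following; the script and log paths are made up for this example:

# Run the collector every 10 minutes (hypothetical paths)
*/10 * * * * /opt/collectors/my-collector.sh >> /var/log/my-collector.log 2>&1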

Set Provenance

On startup, data collectors send their unique provenance record and get a provenance ID for it. They use that provenance ID in all subsequent calls.

Sync Data types

On startup, data collectors sync the data types they send. Sync means the types are created if they don't exist and updated if they do. Be careful: many data collectors may use the same type and could overwrite each other's information.

Sync Stations

A list of stations, identified by a common origin and station type, is synced. Again, stations are created or updated, and stations that are not in the sync list are deactivated. The deactivation behavior can be configured via flags; see the sketch below. Only one data collector per origin/station type should sync stations, or there will be conflicts.
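
As a sketch of those flags: the query parameter names below (syncState, onlyActivation) are an assumption and should be verified against the bdp-core writer source:

# Sync stations without deactivating missing ones; flag names are assumptions
curl -X POST -L "https://share.opendatahub.testingmachine.eu/json/syncStations/ExampleStation?syncState=false&onlyActivation=false" \
    --header 'Content-Type: application/json' \
    --header "Authorization: bearer $TOKEN" \
    -d '[{"id":"example-station-id-1","name":"example-station-name-1","origin":"docs-example"}]'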

Push Records

Once data types and stations are synced, the data collector pushes the actual measurements.

GetLastRecord

Sometimes data collectors want to know the newest record in the Open Data Hub, so that they can ignore older records; see the sketch below.
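
A sketch of such a call, assuming the writer exposes the getDateOfLastRecord endpoint used by the BDP Java client; path and parameter names should be verified against bdp-core:

# Endpoint path and parameters assumed from the BDP Java client
curl -X GET -L "https://share.opendatahub.testingmachine.eu/json/getDateOfLastRecord/ExampleStation?stationId=example-station-id-1&typeId=docs-example-type&period=100" \
    --header "Authorization: bearer $TOKEN"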

Idempotence

Data collectors should generally be idempotent, meaning they can be called multiple times and still lead to the same result. Syncing stations and data types is idempotent by default. When records are pushed, the BDP API silently (!!) discards any records that are older than what it already has in the Open Data Hub. This means that if you push the same record twice by mistake, only the first one is written; the second push still succeeds, but adds nothing. It also means that you cannot backfill old missing data if newer data already exists. This constraint works on a station/data type/period level, e.g. if the period is different you can still push older data.

Authentication

We use OAuth2 for authentication. On each API call, you have to pass your access token as an Authorization header.

Some documentation can be found here: https://github.com/noi-techpark/documentation/blob/main/README.md#i-want-to-get-an-access-token-from-our-oauth-20-keycloak-server

In practice, you will be provided with credentials by the Open Data Hub team.

Running a local instance

git clone https://github.com/noi-techpark/bdp-core.git
cd bdp-core
cp .env.example .env
docker compose up

This runs the API and a local empty PostgreSQL instance. You can now execute the example calls in calls.http.

When developing a data collector, authorization should also just work with these settings (for local development):

OAUTH_AUTH_URI=https://auth.opendatahub.testingmachine.eu/auth
OAUTH_TOKEN_URI=https://auth.opendatahub.testingmachine.eu/auth/realms/noi/protocol/openid-connect/token
OAUTH_BASE_URI=http://localhost:8999/json
OAUTH_CLIENT_ID=odh-mobility-datacollector-development
OAUTH_CLIENT_NAME=odh-mobility-datacollector-development
OAUTH_CLIENT_SECRET=7bd46f8f-c296-416d-a13d-dc81e68d0830

Example implementations

BDP client library (Java) https://github.com/noi-techpark/bdp-core/blob/main/client/src/main/java/com/opendatahub/timeseries/bdp/client/json/JSONPusher.java
