Skip to content

Commit

Permalink
[Doc] update iotdb document, remove duplicated config
Browse files Browse the repository at this point in the history
  • Loading branch information
liunaijie committed Sep 1, 2023
1 parent 3c13275 commit 8cb40f7
Show file tree
Hide file tree
Showing 7 changed files with 147 additions and 197 deletions.
110 changes: 75 additions & 35 deletions docs/en/connector-v2/sink/IoTDB.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ There is a conflict of thrift version between IoTDB and Spark.Therefore, you nee

| Name | Type | Required | Default | Description |
|-----------------------------|---------|----------|--------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| node_urls | Array | Yes | - | `IoTDB` cluster address, the format is `["host:port", ...]` |
| node_urls | String | Yes | - | `IoTDB` cluster address, the format is `"host1:port"` or `"host1:port,host2:port"` |
| username | String | Yes | - | `IoTDB` user username |
| password | String | Yes | - | `IoTDB` user password |
| key_device | String | No | - | Specify field name of the `IoTDB` deviceId in SeaTunnelRow |
| key_device | String | Yes | - | Specify field name of the `IoTDB` deviceId in SeaTunnelRow |
| key_timestamp | String | No | processing time | Specify field-name of the `IoTDB` timestamp in SeaTunnelRow. If not specified, use processing-time as timestamp |
| key_measurement_fields | Array | No | exclude `device` & `timestamp` | Specify field-name of the `IoTDB` measurement list in SeaTunnelRow. If not specified, include all fields but exclude `device` & `timestamp` |
| storage_group | Array | No | - | Specify device storage group(path prefix) <br/> example: deviceId = ${storage_group} + "." + ${key_device} |
Expand All @@ -68,78 +68,118 @@ There is a conflict of thrift version between IoTDB and Spark.Therefore, you nee
| connection_timeout_in_ms | Integer | No | - | The maximum time (in ms) to wait when connecting to `IoTDB` |
| common-options | | no | - | Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details |

## Task Example
## Examples

```hocon
env {
execution.parallelism = 2
job.mode = "BATCH"
}
source {
FakeSource {
row.num = 16
bigint.template = [1664035200001]
schema = {
fields {
device_name = "string"
temperature = "float"
moisture = "int"
event_ts = "bigint"
field_1 = "string"
field_2 = "string"
}
}
}
}
...
```

Upstream SeaTunnelRow data format is the following:

| device_name | temperature | moisture | event_ts | field_1 | field_2 |
|--------------------------|-------------|----------|---------------|---------|---------|
| root.test_group.device_a | 36.1 | 100 | 1664035200001 | abc1 | def1 |
| root.test_group.device_b | 36.2 | 101 | 1664035200001 | abc2 | def2 |
| root.test_group.device_c | 36.3 | 102 | 1664035200001 | abc3 | def3 |

### Case1

Common options:
only fill required config, use current processing time as timestamp. and include all fields but exclude `device` & `timestamp` as measurement fields

```hocon
sink {
IoTDB {
node_urls = ["localhost:6667"]
node_urls = "localhost:6667"
username = "root"
password = "root"
batch_size = 1024
key_device = "device_name"
}
}
```

When you assign `key_device` is `device_name`, for example:
Output to `IoTDB` data format is the following:

```shell
IoTDB> SELECT * FROM root.test_group.* align by device;
+------------------------+------------------------+--------------+-----------+---------------+---------+---------+
| Time| Device| temperature| moisture| event_ts | field_1 | field_2 |
+------------------------+------------------------+--------------+-----------+---------------+---------+---------+
|2023-09-01T00:00:00.001Z|root.test_group.device_a| 36.1| 100| 1664035200001 | abc1 | def1 |
|2023-09-01T00:00:00.001Z|root.test_group.device_b| 36.2| 101| 1664035200001 | abc2 | def2 |
|2023-09-01T00:00:00.001Z|root.test_group.device_c| 36.3| 102| 1664035200001 | abc3 | def3 |
+------------------------+------------------------+--------------+-----------+---------------+---------+---------+
```

### Case2

use source event's time

```hocon
sink {
IoTDB {
...
node_urls = "localhost:6667"
username = "root"
password = "root"
key_device = "device_name"
key_timestamp = "event_ts"
key_measurement_fields = ["temperature", "moisture"]
}
}
```

Upstream SeaTunnelRow data format is the following:

| device_name | field_1 | field_2 |
|--------------------------|---------|---------|
| root.test_group.device_a | 1001 | 1002 |
| root.test_group.device_b | 2001 | 2002 |
| root.test_group.device_c | 3001 | 3002 |

Output to `IoTDB` data format is the following:

```shell
IoTDB> SELECT * FROM root.test_group.* align by device;
+------------------------+------------------------+-----------+----------+
| Time| Device| field_1| field_2|
+------------------------+------------------------+----------+-----------+
|2022-09-26T17:50:01.201Z|root.test_group.device_a| 1001| 1002|
|2022-09-26T17:50:01.202Z|root.test_group.device_b| 2001| 2002|
|2022-09-26T17:50:01.203Z|root.test_group.device_c| 3001| 3002|
+------------------------+------------------------+----------+-----------+
+------------------------+------------------------+--------------+-----------+
| Time| Device| temperature| moisture|
+------------------------+------------------------+--------------+-----------+
|2022-09-25T00:00:00.001Z|root.test_group.device_a| 36.1| 100|
|2022-09-25T00:00:00.001Z|root.test_group.device_b| 36.2| 101|
|2022-09-25T00:00:00.001Z|root.test_group.device_c| 36.3| 102|
+------------------------+------------------------+--------------+-----------+
```

### Case2
### Case3

When you assign `key_device``key_timestamp``key_measurement_fields`, for example:
use source event's time and limit measurement fields

```hocon
sink {
IoTDB {
...
node_urls = "localhost:6667"
username = "root"
password = "root"
key_device = "device_name"
key_timestamp = "ts"
key_timestamp = "event_ts"
key_measurement_fields = ["temperature", "moisture"]
}
}
```

Upstream SeaTunnelRow data format is the following:

| ts | device_name | field_1 | field_2 | temperature | moisture |
|---------------|--------------------------|---------|---------|-------------|----------|
| 1664035200001 | root.test_group.device_a | 1001 | 1002 | 36.1 | 100 |
| 1664035200001 | root.test_group.device_b | 2001 | 2002 | 36.2 | 101 |
| 1664035200001 | root.test_group.device_c | 3001 | 3002 | 36.3 | 102 |

Output to `IoTDB` data format is the following:

```shell
Expand Down
187 changes: 65 additions & 122 deletions docs/en/connector-v2/source/IoTDB.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@

> IoTDB source connector
## Description
## Support Those Engines

Read external data source data through IoTDB.
> Spark<br/>
> Flink<br/>
> SeaTunnel Zeta<br/>
## Key features

- [x] [batch](../../concept/connector-v2-features.md)
- [ ] [stream](../../concept/connector-v2-features.md)
- [x] [stream](../../concept/connector-v2-features.md)
- [x] [exactly-once](../../concept/connector-v2-features.md)
- [x] [column projection](../../concept/connector-v2-features.md)

Expand All @@ -18,106 +20,53 @@ supports query SQL and can achieve projection effect.
- [x] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)

## Options

| name | type | required | default value |
|----------------------------|---------|----------|---------------|
| host | string | no | - |
| port | int | no | - |
| node_urls | string | no | - |
| username | string | yes | - |
| password | string | yes | - |
| sql | string | yes | - |
| schema | config | yes | - |
| fetch_size | int | no | - |
| lower_bound | long | no | - |
| upper_bound | long | no | - |
| num_partitions | int | no | - |
| thrift_default_buffer_size | int | no | - |
| enable_cache_leader | boolean | no | - |
| version | string | no | - |
| common-options | | no | - |

### single node, you need to set host and port to connect to the remote data source.

**host** [string] the host of the IoTDB when you select host of the IoTDB

**port** [int] the port of the IoTDB when you select

### multi node, you need to set node_urls to connect to the remote data source.

**node_urls** [string] the node_urls of the IoTDB when you select

e.g.

```text
127.0.0.1:8080,127.0.0.2:8080
```

### other parameters

**sql** [string]
execute sql statement e.g.

```
select name,age from test
```

### schema [config]

#### fields [Config]

The schema of the IoTDB that you want to generate

e.g.

```
schema {
fields {
name = string
age = int
}
}
```

### option parameters

### fetch_size [int]

the fetch_size of the IoTDB when you select

### username [string]

the username of the IoTDB when you select

### password [string]

the password of the IoTDB when you select

### lower_bound [long]

the lower_bound of the IoTDB when you select

### upper_bound [long]

the upper_bound of the IoTDB when you select

### num_partitions [int]

the num_partitions of the IoTDB when you select

### thrift_default_buffer_size [int]

the thrift_default_buffer_size of the IoTDB when you select

### enable_cache_leader [boolean]

enable_cache_leader of the IoTDB when you select
## Description

### version [string]
Read external data source data through IoTDB.

Version represents the SQL semantic version used by the client, which is used to be compatible with the SQL semantics of
0.12 when upgrading 0.13. The possible values are: V_0_12, V_0_13.
:::tip

There is a conflict of thrift version between IoTDB and Spark.Therefore, you need to execute `rm -f $SPARK_HOME/jars/libthrift*` and `cp $IOTDB_HOME/lib/libthrift* $SPARK_HOME/jars/` to resolve it.

:::

## Supported DataSource Info

| Datasource | Supported Versions | Url |
|------------|--------------------|----------------|
| IoTDB | `>= 0.13.0` | localhost:6667 |

## Data Type Mapping

| IotDB Data type | SeaTunnel Data type |
|-----------------|---------------------|
| BOOLEAN | BOOLEAN |
| INT32 | TINYINT |
| INT32 | SMALLINT |
| INT32 | INT |
| INT64 | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| TEXT | STRING |

## Source Options

| name | type | required | default value | Description |
|----------------------------|---------|----------|---------------|------------------------------------------------------------------------------------|
| node_urls | string | yes | - | `IoTDB` cluster address, the format is `"host1:port"` or `"host1:port,host2:port"` |
| username | string | yes | - | `IoTDB` user username |
| password | string | yes | - | `IoTDB` user password |
| sql | string | yes | - | execute sql statement |
| schema | config | yes | - | the data schema |
| fetch_size | int | no | - | the fetch_size of the IoTDB when you select |
| lower_bound | long | no | - | the lower_bound of the IoTDB when you select |
| upper_bound | long | no | - | the upper_bound of the IoTDB when you select |
| num_partitions | int | no | - | the num_partitions of the IoTDB when you select |
| thrift_default_buffer_size | int | no | - | the thrift_default_buffer_size of the IoTDB when you select |
| thrift_max_frame_size | int | no | - | the thrift max frame size |
| enable_cache_leader | boolean | no | - | enable_cache_leader of the IoTDB when you select |
| version | string | no | - | SQL semantic version used by the client, The possible values are: V_0_12, V_0_13 |
| common-options | | no | - | |

### split partitions

Expand Down Expand Up @@ -157,37 +106,31 @@ Source plugin common parameters, please refer to [Source Common Options](common-

## Examples

### Case1

Common options:

```hocon
env {
execution.parallelism = 2
job.mode = "BATCH"
}
source {
IoTDB {
node_urls = "localhost:6667"
username = "root"
password = "root"
sql = "SELECT temperature, moisture FROM root.test_group.* WHERE time < 4102329600000 align by device"
schema {
fields {
ts = timestamp
device_name = string
temperature = float
moisture = bigint
}
}
}
}
```
When you assign `sql``fields``partition`, for example:

```hocon
sink {
IoTDB {
...
sql = "SELECT temperature, moisture FROM root.test_group.* WHERE time < 4102329600000 align by device"
lower_bound = 1
upper_bound = 4102329600000
num_partitions = 10
fields {
ts = bigint
device_name = string
temperature = float
moisture = bigint
}
Console {
}
}
```
Expand Down
Loading

0 comments on commit 8cb40f7

Please sign in to comment.