diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml index bd842ade2817..bdf29edb9beb 100644 --- a/.github/workflows/backend.yml +++ b/.github/workflows/backend.yml @@ -22,8 +22,6 @@ on: branches: - dev paths-ignore: - - 'docs/**' - - '**/*.md' - 'seatunnel-ui/**' concurrency: diff --git a/config/seatunnel.yaml b/config/seatunnel.yaml index 7c689a328d3d..5961c839238b 100644 --- a/config/seatunnel.yaml +++ b/config/seatunnel.yaml @@ -27,8 +27,6 @@ seatunnel: checkpoint: interval: 10000 timeout: 60000 - max-concurrent: 1 - tolerable-failure: 2 storage: type: hdfs max-retained: 3 diff --git a/docs/en/connector-v2/formats/kafka-compatible-kafkaconnect-json.md b/docs/en/connector-v2/formats/kafka-compatible-kafkaconnect-json.md new file mode 100644 index 000000000000..7de8a9e838b2 --- /dev/null +++ b/docs/en/connector-v2/formats/kafka-compatible-kafkaconnect-json.md @@ -0,0 +1,47 @@ +# Kafka source compatible kafka-connect-json + +Seatunnel connector kafka supports parsing data extracted through kafka connect source, especially data extracted from kafka connect jdbc and kafka connect debezium + +# How to use + +## Kafka output to mysql + +```bash +env { + execution.parallelism = 1 + job.mode = "BATCH" +} + +source { + Kafka { + bootstrap.servers = "localhost:9092" + topic = "jdbc_source_record" + result_table_name = "kafka_table" + start_mode = earliest + schema = { + fields { + id = "int" + name = "string" + description = "string" + weight = "string" + } + }, + format = COMPATIBLE_KAFKA_CONNECT_JSON + } +} + + +sink { + Jdbc { + driver = com.mysql.cj.jdbc.Driver + url = "jdbc:mysql://localhost:3306/seatunnel" + user = st_user + password = seatunnel + generate_sink_sql = true + database = seatunnel + table = jdbc_sink + primary_keys = ["id"] + } +} +``` + diff --git a/docs/en/connector-v2/sink/Redis.md b/docs/en/connector-v2/sink/Redis.md index fcface7da22a..7d2ef237e1ce 100644 --- a/docs/en/connector-v2/sink/Redis.md +++ b/docs/en/connector-v2/sink/Redis.md @@ -23,6 +23,7 @@ Used to write data to Redis. | mode | string | no | single | | nodes | list | yes when mode=cluster | - | | format | string | no | json | +| expire | long | no | -1 | | common-options | | no | - | ### host [string] @@ -120,6 +121,10 @@ Connector will generate data as the following and write it to redis: ``` +### expire [long] + +Set redis expiration time, the unit is second. The default value is -1, keys do not automatically expire by default. + ### common options Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details diff --git a/docs/en/connector-v2/sink/S3File.md b/docs/en/connector-v2/sink/S3File.md index 7841afdf04e3..4bb670ae38c8 100644 --- a/docs/en/connector-v2/sink/S3File.md +++ b/docs/en/connector-v2/sink/S3File.md @@ -1,24 +1,17 @@ # S3File -> S3 file sink connector +> S3 File Sink Connector -## Description - -Output data to aws s3 file system. - -:::tip +## Support Those Engines -If you use spark/flink, In order to use this connector, You must ensure your spark/flink cluster already integrated hadoop. The tested hadoop version is 2.x. +> Spark
+> Flink
+> SeaTunnel Zeta
-If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you download and install SeaTunnel Engine. You can check the jar package under ${SEATUNNEL_HOME}/lib to confirm this. - -To use this connector you need put hadoop-aws-3.1.4.jar and aws-java-sdk-bundle-1.11.271.jar in ${SEATUNNEL_HOME}/lib dir. - -::: - -## Key features +## Key Features - [x] [exactly-once](../../concept/connector-v2-features.md) +- [ ] [cdc](../../concept/connector-v2-features.md) By default, we use 2PC commit to ensure `exactly-once` @@ -30,59 +23,100 @@ By default, we use 2PC commit to ensure `exactly-once` - [x] json - [x] excel -## Options - -| name | type | required | default value | remarks | -|----------------------------------|---------|----------|-------------------------------------------------------|--------------------------------------------------------------------------------------------------------| -| path | string | yes | - | | -| bucket | string | yes | - | | -| fs.s3a.endpoint | string | yes | - | | -| fs.s3a.aws.credentials.provider | string | yes | com.amazonaws.auth.InstanceProfileCredentialsProvider | | -| access_key | string | no | - | Only used when fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider | -| access_secret | string | no | - | Only used when fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider | -| custom_filename | boolean | no | false | Whether you need custom the filename | -| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true | -| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true | -| file_format_type | string | no | "csv" | | -| field_delimiter | string | no | '\001' | Only used when file_format_type is text | -| row_delimiter | string | no | "\n" | Only used when file_format_type is text | -| have_partition | boolean | no | false | Whether you need processing partitions. | -| partition_by | array | no | - | Only used then have_partition is true | -| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is true | -| is_partition_field_write_in_file | boolean | no | false | Only used then have_partition is true | -| sink_columns | array | no | | When this parameter is empty, all fields are sink columns | -| is_enable_transaction | boolean | no | true | | -| batch_size | int | no | 1000000 | | -| compress_codec | string | no | none | | -| common-options | object | no | - | | -| max_rows_in_memory | int | no | - | Only used when file_format_type is excel. | -| sheet_name | string | no | Sheet${Random number} | Only used when file_format_type is excel. | - -### path [string] - -The target dir path is required. - -### bucket [string] - -The bucket address of s3 file system, for example: `s3n://seatunnel-test`, if you use `s3a` protocol, this parameter should be `s3a://seatunnel-test`. - -### fs.s3a.endpoint [string] - -fs s3a endpoint - -### fs.s3a.aws.credentials.provider [string] - -The way to authenticate s3a. We only support `org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider` and `com.amazonaws.auth.InstanceProfileCredentialsProvider` now. - -More information about the credential provider you can see [Hadoop AWS Document](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#Simple_name.2Fsecret_credentials_with_SimpleAWSCredentialsProvider.2A) - -### access_key [string] - -The access key of s3 file system. 
If this parameter is not set, please confirm that the credential provider chain can be authenticated correctly, you could check this [hadoop-aws](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html) +## Description -### access_secret [string] +Output data to aws s3 file system. -The access secret of s3 file system. If this parameter is not set, please confirm that the credential provider chain can be authenticated correctly, you could check this [hadoop-aws](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html) +## Supported DataSource Info + +| Datasource | Supported Versions | +|------------|--------------------| +| S3 | current | + +## Database Dependency + +> If you use spark/flink, In order to use this connector, You must ensure your spark/flink cluster already integrated hadoop. The tested hadoop version is 2.x. +> +> If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you download and install SeaTunnel Engine. You can check the jar package under `${SEATUNNEL_HOME}/lib` to confirm this. +> To use this connector you need put `hadoop-aws-3.1.4.jar` and `aws-java-sdk-bundle-1.11.271.jar` in `${SEATUNNEL_HOME}/lib` dir. + +## Data Type Mapping + +If write to `csv`, `text` file type, All column will be string. + +### Orc File Type + +| SeaTunnel Data type | Orc Data type | +|----------------------|-----------------------| +| STRING | STRING | +| BOOLEAN | BOOLEAN | +| TINYINT | BYTE | +| SMALLINT | SHORT | +| INT | INT | +| BIGINT | LONG | +| FLOAT | FLOAT | +| FLOAT | FLOAT | +| DOUBLE | DOUBLE | +| DECIMAL | DECIMAL | +| BYTES | BINARY | +| DATE | DATE | +| TIME
TIMESTAMP            | TIMESTAMP             |
+| ROW                  | STRUCT                |
+| NULL                 | UNSUPPORTED DATA TYPE |
+| ARRAY                | LIST                  |
+| Map                  | Map                   |
+
+### Parquet File Type
+
+| SeaTunnel Data type  | Parquet Data type     |
+|----------------------|-----------------------|
+| STRING               | STRING                |
+| BOOLEAN              | BOOLEAN               |
+| TINYINT              | INT_8                 |
+| SMALLINT             | INT_16                |
+| INT                  | INT32                 |
+| BIGINT               | INT64                 |
+| FLOAT                | FLOAT                 |
+| DOUBLE               | DOUBLE                |
+| DECIMAL              | DECIMAL               |
+| BYTES                | BINARY                |
+| DATE                 | DATE                  |
+| TIME
TIMESTAMP | TIMESTAMP_MILLIS | +| ROW | GroupType | +| NULL | UNSUPPORTED DATA TYPE | +| ARRAY | LIST | +| Map | Map | + +## Sink Options + +| name | type | required | default value | Description | +|----------------------------------|---------|----------|-------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| path | string | yes | - | | +| bucket | string | yes | - | | +| fs.s3a.endpoint | string | yes | - | | +| fs.s3a.aws.credentials.provider | string | yes | com.amazonaws.auth.InstanceProfileCredentialsProvider | The way to authenticate s3a. We only support `org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider` and `com.amazonaws.auth.InstanceProfileCredentialsProvider` now. | +| access_key | string | no | - | Only used when fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider | +| access_secret | string | no | - | Only used when fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider | +| custom_filename | boolean | no | false | Whether you need custom the filename | +| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true | +| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true | +| file_format_type | string | no | "csv" | | +| field_delimiter | string | no | '\001' | Only used when file_format is text | +| row_delimiter | string | no | "\n" | Only used when file_format is text | +| have_partition | boolean | no | false | Whether you need processing partitions. | +| partition_by | array | no | - | Only used when have_partition is true | +| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used when have_partition is true | +| is_partition_field_write_in_file | boolean | no | false | Only used when have_partition is true | +| sink_columns | array | no | | When this parameter is empty, all fields are sink columns | +| is_enable_transaction | boolean | no | true | | +| batch_size | int | no | 1000000 | | +| compress_codec | string | no | none | | +| common-options | object | no | - | | +| max_rows_in_memory | int | no | - | Only used when file_format is excel. | +| sheet_name | string | no | Sheet${Random number} | Only used when file_format is excel. | +| hadoop_s3_properties | map | no | | If you need to add a other option, you could add it here and refer to this [link](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html) | +| | ### hadoop_s3_properties [map] @@ -208,6 +242,83 @@ Writer the sheet of the workbook ## Example +### Simple: + +> This example defines a SeaTunnel synchronization task that automatically generates data through FakeSource and sends it to S3File Sink. FakeSource generates a total of 16 rows of data (row.num=16), with each row having two fields, name (string type) and age (int type). The final target s3 dir will also create a file and all of the data in write in it. +> Before run this job, you need create s3 path: /seatunnel/text. And if you have not yet installed and deployed SeaTunnel, you need to follow the instructions in [Install SeaTunnel](../../start-v2/locally/deployment.md) to install and deploy SeaTunnel. And then follow the instructions in [Quick Start With SeaTunnel Engine](../../start-v2/locally/quick-start-seatunnel-engine.md) to run this job. 
+ +``` +# Defining the runtime environment +env { + # You can set flink configuration here + execution.parallelism = 1 + job.mode = "BATCH" +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + FakeSource { + parallelism = 1 + result_table_name = "fake" + row.num = 16 + schema = { + fields { + c_map = "map>" + c_array = "array" + name = string + c_boolean = boolean + age = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_decimal = "decimal(16, 1)" + c_null = "null" + c_bytes = bytes + c_date = date + c_timestamp = timestamp + } + } + } + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/category/source-v2 +} + +transform { + # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, + # please go to https://seatunnel.apache.org/docs/category/transform-v2 +} + +sink { + S3File { + bucket = "s3a://seatunnel-test" + tmp_path = "/tmp/seatunnel" + path="/seatunnel/text" + fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn" + fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider" + file_format_type = "text" + field_delimiter = "\t" + row_delimiter = "\n" + have_partition = true + partition_by = ["age"] + partition_dir_expression = "${k0}=${v0}" + is_partition_field_write_in_file = true + custom_filename = true + file_name_expression = "${transactionId}_${now}" + filename_time_format = "yyyy.MM.dd" + sink_columns = ["name","age"] + is_enable_transaction=true + hadoop_s3_properties { + "fs.s3a.buffer.dir" = "/data/st_test/s3a" + "fs.s3a.fast.upload.buffer" = "disk" + } + } + # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, + # please go to https://seatunnel.apache.org/docs/category/sink-v2 +} +``` + For text file format with `have_partition` and `custom_filename` and `sink_columns` and `com.amazonaws.auth.InstanceProfileCredentialsProvider` ```hocon diff --git a/docs/en/connector-v2/source/MyHours.md b/docs/en/connector-v2/source/MyHours.md index ec3a93553364..f90d42ab1cba 100644 --- a/docs/en/connector-v2/source/MyHours.md +++ b/docs/en/connector-v2/source/MyHours.md @@ -2,11 +2,13 @@ > My Hours source connector -## Description +## Support Those Engines -Used to read data from My Hours. +> Spark
+> Flink
+> SeaTunnel Zeta
-## Key features +## Key Features - [x] [batch](../../concept/connector-v2-features.md) - [ ] [stream](../../concept/connector-v2-features.md) @@ -15,71 +17,103 @@ Used to read data from My Hours. - [ ] [parallelism](../../concept/connector-v2-features.md) - [ ] [support user-defined split](../../concept/connector-v2-features.md) -## Options - -| name | type | required | default value | -|-----------------------------|---------|----------|---------------| -| url | String | Yes | - | -| email | String | Yes | - | -| password | String | Yes | - | -| method | String | No | get | -| schema | Config | No | - | -| schema.fields | Config | No | - | -| format | String | No | json | -| params | Map | No | - | -| body | String | No | - | -| json_field | Config | No | - | -| content_json | String | No | - | -| poll_interval_ms | int | No | - | -| retry | int | No | - | -| retry_backoff_multiplier_ms | int | No | 100 | -| retry_backoff_max_ms | int | No | 10000 | -| enable_multi_lines | boolean | No | false | -| common-options | config | No | - | - -### url [String] - -http request url - -### email [String] - -email for login - -### password [String] - -password for login - -### method [String] - -http request method, only supports GET, POST method - -### params [Map] - -http params - -### body [String] - -http body - -### poll_interval_ms [int] +## Description -request http api interval(millis) in stream mode +Used to read data from My Hours. -### retry [int] +## Key features -The max retry times if request http return to `IOException` +- [x] [batch](../../concept/connector-v2-features.md) +- [ ] [stream](../../concept/connector-v2-features.md) +- [ ] [exactly-once](../../concept/connector-v2-features.md) +- [ ] [column projection](../../concept/connector-v2-features.md) +- [ ] [parallelism](../../concept/connector-v2-features.md) +- [ ] [support user-defined split](../../concept/connector-v2-features.md) -### retry_backoff_multiplier_ms [int] +## Supported DataSource Info + +In order to use the My Hours connector, the following dependencies are required. +They can be downloaded via install-plugin.sh or from the Maven central repository. + +| Datasource | Supported Versions | Dependency | +|------------|--------------------|---------------------------------------------------------------------------------------------| +| My Hours | universal | [Download](https://mvnrepository.com/artifact/org.apache.seatunnel/seatunnel-connectors-v2) | + +## Source Options + +| Name | Type | Required | Default | Description | +|-----------------------------|---------|----------|---------|--------------------------------------------------------------------------------------------------------------------------------------| +| url | String | Yes | - | Http request url. | +| email | String | Yes | - | My hours login email address. | +| password | String | Yes | - | My hours login password. | +| schema | Config | No | - | Http and seatunnel data structure mapping | +| schema.fields | Config | No | - | The schema fields of upstream data | +| json_field | Config | No | - | This parameter helps you configure the schema,so this parameter must be used with schema. | +| content_json | String | No | - | This parameter can get some json data.If you only need the data in the 'book' section, configure `content_field = "$.store.book.*"`. | +| format | String | No | json | The format of upstream data, now only support `json` `text`, default `json`. | +| method | String | No | get | Http request method, only supports GET, POST method. 
| +| headers | Map | No | - | Http headers. | +| params | Map | No | - | Http params. | +| body | String | No | - | Http body. | +| poll_interval_ms | Int | No | - | Request http api interval(millis) in stream mode. | +| retry | Int | No | - | The max retry times if request http return to `IOException`. | +| retry_backoff_multiplier_ms | Int | No | 100 | The retry-backoff times(millis) multiplier if request http failed. | +| retry_backoff_max_ms | Int | No | 10000 | The maximum retry-backoff times(millis) if request http failed | +| enable_multi_lines | Boolean | No | false | | +| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details | + +## How to Create a My Hours Data Synchronization Jobs -The retry-backoff times(millis) multiplier if request http failed +```hocon +env { + execution.parallelism = 1 + job.mode = "BATCH" +} -### retry_backoff_max_ms [int] +MyHours{ + url = "https://api2.myhours.com/api/Projects/getAll" + email = "seatunnel@test.com" + password = "seatunnel" + schema { + fields { + name = string + archived = boolean + dateArchived = string + dateCreated = string + clientName = string + budgetAlertPercent = string + budgetType = int + totalTimeLogged = double + budgetValue = double + totalAmount = double + totalExpense = double + laborCost = double + totalCost = double + billableTimeLogged = double + totalBillableAmount = double + billable = boolean + roundType = int + roundInterval = int + budgetSpentPercentage = double + budgetTarget = int + budgetPeriodType = string + budgetSpent = string + id = string + } + } +} -The maximum retry-backoff times(millis) if request http failed +# Console printing of the read data +sink { + Console { + parallelism = 1 + } +} +``` -### format [String] +## Parameter Interpretation -the format of upstream data, now only support `json` `text`, default `json`. +### format when you assign format is `json`, you should also assign schema option, for example: @@ -98,11 +132,11 @@ you should assign schema as the following: ```hocon schema { - fields { - code = int - data = string - success = boolean - } + fields { + code = int + data = string + success = boolean + } } ``` @@ -131,13 +165,7 @@ connector will generate data as the following: |----------------------------------------------------------| | {"code": 200, "data": "get success", "success": true} | -### schema [Config] - -#### fields [Config] - -the schema fields of upstream data - -### content_json [String] +### content_json This parameter can get some json data.If you only need the data in the 'book' section, configure `content_field = "$.store.book.*"`. @@ -212,14 +240,14 @@ Here is an example: - Test data can be found at this link [mockserver-config.json](../../../../seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json) - See this link for task configuration [http_contentjson_to_assert.conf](../../../../seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_contentjson_to_assert.conf). -### json_field [Config] +### json_field This parameter helps you configure the schema,so this parameter must be used with schema. 
If your data looks something like this: ```json -{ +{ "store": { "book": [ { @@ -273,47 +301,6 @@ source { - Test data can be found at this link [mockserver-config.json](../../../../seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json) - See this link for task configuration [http_jsonpath_to_assert.conf](../../../../seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_jsonpath_to_assert.conf). -### common options - -Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details - -## Example - -```hocon -MyHours{ - url = "https://api2.myhours.com/api/Projects/getAll" - email = "seatunnel@test.com" - password = "seatunnel" - schema { - fields { - name = string - archived = boolean - dateArchived = string - dateCreated = string - clientName = string - budgetAlertPercent = string - budgetType = int - totalTimeLogged = double - budgetValue = double - totalAmount = double - totalExpense = double - laborCost = double - totalCost = double - billableTimeLogged = double - totalBillableAmount = double - billable = boolean - roundType = int - roundInterval = int - budgetSpentPercentage = double - budgetTarget = int - budgetPeriodType = string - budgetSpent = string - id = string - } - } -} -``` - ## Changelog ### next version diff --git a/docs/en/connector-v2/source/S3File.md b/docs/en/connector-v2/source/S3File.md index f7ad1cc8bd0f..54124a370382 100644 --- a/docs/en/connector-v2/source/S3File.md +++ b/docs/en/connector-v2/source/S3File.md @@ -1,22 +1,14 @@ # S3File -> S3 file source connector +> S3 File Source Connector -## Description - -Read data from aws s3 file system. - -:::tip - -If you use spark/flink, In order to use this connector, You must ensure your spark/flink cluster already integrated hadoop. The tested hadoop version is 2.x. - -If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you download and install SeaTunnel Engine. You can check the jar package under ${SEATUNNEL_HOME}/lib to confirm this. +## Support Those Engines -To use this connector you need put hadoop-aws-3.1.4.jar and aws-java-sdk-bundle-1.11.271.jar in ${SEATUNNEL_HOME}/lib dir. +> Spark
+> Flink
+> SeaTunnel Zeta
-::: - -## Key features +## Key Features - [x] [batch](../../concept/connector-v2-features.md) - [ ] [stream](../../concept/connector-v2-features.md) @@ -35,104 +27,31 @@ Read all the data in a split in a pollNext call. What splits are read will be sa - [x] json - [x] excel -## Options - -| name | type | required | default value | -|---------------------------------|---------|----------|-------------------------------------------------------| -| path | string | yes | - | -| file_format_type | string | yes | - | -| bucket | string | yes | - | -| fs.s3a.endpoint | string | yes | - | -| fs.s3a.aws.credentials.provider | string | yes | com.amazonaws.auth.InstanceProfileCredentialsProvider | -| read_columns | list | no | - | -| access_key | string | no | - | -| access_secret | string | no | - | -| hadoop_s3_properties | map | no | - | -| delimiter | string | no | \001 | -| parse_partition_from_path | boolean | no | true | -| date_format | string | no | yyyy-MM-dd | -| datetime_format | string | no | yyyy-MM-dd HH:mm:ss | -| time_format | string | no | HH:mm:ss | -| skip_header_row_number | long | no | 0 | -| schema | config | no | - | -| common-options | | no | - | -| sheet_name | string | no | - | -| file_filter_pattern | string | no | - | - -### path [string] - -The source file path. - -### fs.s3a.endpoint [string] - -fs s3a endpoint - -### fs.s3a.aws.credentials.provider [string] - -The way to authenticate s3a. We only support `org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider` and `com.amazonaws.auth.InstanceProfileCredentialsProvider` now. - -More information about the credential provider you can see [Hadoop AWS Document](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#Simple_name.2Fsecret_credentials_with_SimpleAWSCredentialsProvider.2A) - -### delimiter [string] - -Field delimiter, used to tell connector how to slice and dice fields when reading text files - -default `\001`, the same as hive's default delimiter - -### parse_partition_from_path [boolean] - -Control whether parse the partition keys and values from file path - -For example if you read a file from path `s3n://hadoop-cluster/tmp/seatunnel/parquet/name=tyrantlucifer/age=26` - -Every record data from file will be added these two fields: - -| name | age | -|---------------|-----| -| tyrantlucifer | 26 | - -Tips: **Do not define partition fields in schema option** - -### date_format [string] - -Date type format, used to tell connector how to convert string to date, supported as the following formats: - -`yyyy-MM-dd` `yyyy.MM.dd` `yyyy/MM/dd` - -default `yyyy-MM-dd` - -### datetime_format [string] - -Datetime type format, used to tell connector how to convert string to datetime, supported as the following formats: - -`yyyy-MM-dd HH:mm:ss` `yyyy.MM.dd HH:mm:ss` `yyyy/MM/dd HH:mm:ss` `yyyyMMddHHmmss` - -default `yyyy-MM-dd HH:mm:ss` - -### time_format [string] - -Time type format, used to tell connector how to convert string to time, supported as the following formats: - -`HH:mm:ss` `HH:mm:ss.SSS` - -default `HH:mm:ss` +## Description -### skip_header_row_number [long] +Read data from aws s3 file system. -Skip the first few lines, but only for the txt and csv. 
+## Supported DataSource Info -For example, set like following: +| Datasource | Supported versions | +|------------|--------------------| +| S3 | current | -`skip_header_row_number = 2` +## Dependency -then SeaTunnel will skip the first 2 lines from source files +> If you use spark/flink, In order to use this connector, You must ensure your spark/flink cluster already integrated hadoop. The tested hadoop version is 2.x.
+>
+> If you use SeaTunnel Zeta, it automatically integrates the hadoop jar when you download and install SeaTunnel Zeta. You can check the jar package under ${SEATUNNEL_HOME}/lib to confirm this.<br/>
+> To use this connector you need put hadoop-aws-3.1.4.jar and aws-java-sdk-bundle-1.11.271.jar in ${SEATUNNEL_HOME}/lib dir. -### file_format_type [string] +## Data Type Mapping -File type, supported as the following file types: +Data type mapping is related to the type of file being read, We supported as the following file types: `text` `csv` `parquet` `orc` `json` `excel` +### JSON File Type + If you assign file type to `json`, you should also assign schema option to tell connector how to parse data to the row you want. For example: @@ -174,7 +93,7 @@ connector will generate data as the following: |------|-------------|---------| | 200 | get success | true | -If you assign file type to `parquet` `orc`, schema option not required, connector can find the schema of upstream data automatically. +### Text Or CSV File Type If you assign file type to `text` `csv`, you can choose to specify the schema information or not. @@ -215,61 +134,102 @@ connector will generate data as the following: |---------------|-----|--------| | tyrantlucifer | 26 | male | -### bucket [string] - -The bucket address of s3 file system, for example: `s3n://seatunnel-test`, if you use `s3a` protocol, this parameter should be `s3a://seatunnel-test`. - -### access_key [string] - -The access key of s3 file system. If this parameter is not set, please confirm that the credential provider chain can be authenticated correctly, you could check this [hadoop-aws](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html) - -### access_secret [string] - -The access secret of s3 file system. If this parameter is not set, please confirm that the credential provider chain can be authenticated correctly, you could check this [hadoop-aws](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html) +### Orc File Type -### hadoop_s3_properties [map] - -If you need to add a other option, you could add it here and refer to this [hadoop-aws](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html) - -``` -hadoop_s3_properties { - "xxx" = "xxx" - } -``` - -### schema [config] - -#### fields [Config] - -The schema of upstream data. - -### read_columns [list] - -The read column list of the data source, user can use it to implement field projection. - -The file type supported column projection as the following shown: - -- text -- json -- csv -- orc -- parquet -- excel +If you assign file type to `parquet` `orc`, schema option not required, connector can find the schema of upstream data automatically. -**Tips: If the user wants to use this feature when reading `text` `json` `csv` files, the schema option must be configured** +| Orc Data type | SeaTunnel Data type | +|----------------------------------|----------------------------------------------------------------| +| BOOLEAN | BOOLEAN | +| INT | INT | +| BYTE | BYTE | +| SHORT | SHORT | +| LONG | LONG | +| FLOAT | FLOAT | +| DOUBLE | DOUBLE | +| BINARY | BINARY | +| STRING
VARCHAR
CHAR
| STRING | +| DATE | LOCAL_DATE_TYPE | +| TIMESTAMP | LOCAL_DATE_TIME_TYPE | +| DECIMAL | DECIMAL | +| LIST(STRING) | STRING_ARRAY_TYPE | +| LIST(BOOLEAN) | BOOLEAN_ARRAY_TYPE | +| LIST(TINYINT) | BYTE_ARRAY_TYPE | +| LIST(SMALLINT) | SHORT_ARRAY_TYPE | +| LIST(INT) | INT_ARRAY_TYPE | +| LIST(BIGINT) | LONG_ARRAY_TYPE | +| LIST(FLOAT) | FLOAT_ARRAY_TYPE | +| LIST(DOUBLE) | DOUBLE_ARRAY_TYPE | +| Map | MapType, This type of K and V will transform to SeaTunnel type | +| STRUCT | SeaTunnelRowType | + +### Parquet File Type -### common options +If you assign file type to `parquet` `orc`, schema option not required, connector can find the schema of upstream data automatically. -Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details. +| Orc Data type | SeaTunnel Data type | +|----------------------|----------------------------------------------------------------| +| INT_8 | BYTE | +| INT_16 | SHORT | +| DATE | DATE | +| TIMESTAMP_MILLIS | TIMESTAMP | +| INT64 | LONG | +| INT96 | TIMESTAMP | +| BINARY | BYTES | +| FLOAT | FLOAT | +| DOUBLE | DOUBLE | +| BOOLEAN | BOOLEAN | +| FIXED_LEN_BYTE_ARRAY | TIMESTAMP
DECIMAL | +| DECIMAL | DECIMAL | +| LIST(STRING) | STRING_ARRAY_TYPE | +| LIST(BOOLEAN) | BOOLEAN_ARRAY_TYPE | +| LIST(TINYINT) | BYTE_ARRAY_TYPE | +| LIST(SMALLINT) | SHORT_ARRAY_TYPE | +| LIST(INT) | INT_ARRAY_TYPE | +| LIST(BIGINT) | LONG_ARRAY_TYPE | +| LIST(FLOAT) | FLOAT_ARRAY_TYPE | +| LIST(DOUBLE) | DOUBLE_ARRAY_TYPE | +| Map | MapType, This type of K and V will transform to SeaTunnel type | +| STRUCT | SeaTunnelRowType | -### sheet_name [string] +## Options -Reader the sheet of the workbook,Only used when file_format_type is excel. +| name | type | required | default value | Description | +|---------------------------------|---------|----------|-------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| path | string | yes | - | The s3 path that needs to be read can have sub paths, but the sub paths need to meet certain format requirements. Specific requirements can be referred to "parse_partition_from_path" option | +| file_format_type | string | yes | - | File type, supported as the following file types: `text` `csv` `parquet` `orc` `json` `excel` | +| bucket | string | yes | - | The bucket address of s3 file system, for example: `s3n://seatunnel-test`, if you use `s3a` protocol, this parameter should be `s3a://seatunnel-test`. | +| fs.s3a.endpoint | string | yes | - | fs s3a endpoint | +| fs.s3a.aws.credentials.provider | string | yes | com.amazonaws.auth.InstanceProfileCredentialsProvider | The way to authenticate s3a. We only support `org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider` and `com.amazonaws.auth.InstanceProfileCredentialsProvider` now. More information about the credential provider you can see [Hadoop AWS Document](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#Simple_name.2Fsecret_credentials_with_SimpleAWSCredentialsProvider.2A) | +| read_columns | list | no | - | The read column list of the data source, user can use it to implement field projection. The file type supported column projection as the following shown: `text` `csv` `parquet` `orc` `json` `excel` . If the user wants to use this feature when reading `text` `json` `csv` files, the "schema" option must be configured. | +| access_key | string | no | - | Only used when `fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider ` | +| access_secret | string | no | - | Only used when `fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider ` | +| hadoop_s3_properties | map | no | - | If you need to add other option, you could add it here and refer to this [link](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html) | +| delimiter | string | no | \001 | Field delimiter, used to tell connector how to slice and dice fields when reading text files. Default `\001`, the same as hive's default delimiter. | +| parse_partition_from_path | boolean | no | true | Control whether parse the partition keys and values from file path. For example if you read a file from path `s3n://hadoop-cluster/tmp/seatunnel/parquet/name=tyrantlucifer/age=26`. 
Every record data from file will be added these two fields: name="tyrantlucifer", age=16 | +| date_format | string | no | yyyy-MM-dd | Date type format, used to tell connector how to convert string to date, supported as the following formats:`yyyy-MM-dd` `yyyy.MM.dd` `yyyy/MM/dd`. default `yyyy-MM-dd` | +| datetime_format | string | no | yyyy-MM-dd HH:mm:ss | Datetime type format, used to tell connector how to convert string to datetime, supported as the following formats:`yyyy-MM-dd HH:mm:ss` `yyyy.MM.dd HH:mm:ss` `yyyy/MM/dd HH:mm:ss` `yyyyMMddHHmmss` | +| time_format | string | no | HH:mm:ss | Time type format, used to tell connector how to convert string to time, supported as the following formats:`HH:mm:ss` `HH:mm:ss.SSS` | +| skip_header_row_number | long | no | 0 | Skip the first few lines, but only for the txt and csv. For example, set like following:`skip_header_row_number = 2`. Then SeaTunnel will skip the first 2 lines from source files | +| schema | config | no | - | The schema of upstream data. | +| common-options | | no | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details. | +| sheet_name | string | no | - | Reader the sheet of the workbook,Only used when file_format is excel. | ## Example -```hocon +1. In this example, We read data from s3 path `s3a://seatunnel-test/seatunnel/text` and the file type is orc in this path. + We use `org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider` to authentication so `access_key` and `secret_key` is required. + All columns in the file will be read and send to sink. + +``` +# Defining the runtime environment +env { + # You can set flink configuration here + execution.parallelism = 1 + job.mode = "BATCH" +} +source { S3File { path = "/seatunnel/text" fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn" @@ -279,9 +239,21 @@ Reader the sheet of the workbook,Only used when file_format_type is excel. bucket = "s3a://seatunnel-test" file_format_type = "orc" } +} + +transform { + # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, + # please go to https://seatunnel.apache.org/docs/category/transform-v2 +} +sink { + Console {} +} ``` +2. Use `InstanceProfileCredentialsProvider` to authentication + The file type in S3 is json, so need config schema option. + ```hocon S3File { @@ -300,9 +272,47 @@ Reader the sheet of the workbook,Only used when file_format_type is excel. ``` -### file_filter_pattern [string] +3. Use `InstanceProfileCredentialsProvider` to authentication + The file type in S3 is json and has five fields (`id`, `name`, `age`, `sex`, `type`), so need config schema option. + In this job, we only need send `id` and `name` column to mysql. -Filter pattern, which used for filtering files. 
+``` +# Defining the runtime environment +env { + # You can set flink configuration here + execution.parallelism = 1 + job.mode = "BATCH" +} + +source { + S3File { + path = "/seatunnel/json" + bucket = "s3a://seatunnel-test" + fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn" + fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider" + file_format_type = "json" + read_columns = ["id", "name"] + schema { + fields { + id = int + name = string + age = int + sex = int + type = string + } + } + } +} + +transform { + # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, + # please go to https://seatunnel.apache.org/docs/category/transform-v2 +} + +sink { + Console {} +} +``` ## Changelog diff --git a/docs/en/seatunnel-engine/checkpoint-storage.md b/docs/en/seatunnel-engine/checkpoint-storage.md index a88f301439e4..f2a6487f28d2 100644 --- a/docs/en/seatunnel-engine/checkpoint-storage.md +++ b/docs/en/seatunnel-engine/checkpoint-storage.md @@ -59,8 +59,6 @@ seatunnel: checkpoint: interval: 6000 timeout: 7000 - max-concurrent: 1 - tolerable-failure: 2 storage: type: hdfs max-retained: 3 @@ -94,8 +92,6 @@ seatunnel: checkpoint: interval: 6000 timeout: 7000 - max-concurrent: 1 - tolerable-failure: 2 storage: type: hdfs max-retained: 3 @@ -119,8 +115,6 @@ seatunnel: checkpoint: interval: 6000 timeout: 7000 - max-concurrent: 1 - tolerable-failure: 2 storage: type: hdfs max-retained: 3 @@ -152,6 +146,28 @@ seatunnel: kerberosKeytab: your-kerberos-keytab ``` +if HDFS is in HA mode , you can config like this: + +```yaml +seatunnel: + engine: + checkpoint: + storage: + type: hdfs + max-retained: 3 + plugin-config: + storage.type: hdfs + fs.defaultFS: hdfs://usdp-bing + seatunnel.hadoop.dfs.nameservices: usdp-bing + seatunnel.hadoop.dfs.ha.namenodes.usdp-bing: nn1,nn2 + seatunnel.hadoop.dfs.namenode.rpc-address.usdp-bing.nn1: usdp-bing-nn1:8020 + seatunnel.hadoop.dfs.namenode.rpc-address.usdp-bing.nn2: usdp-bing-nn2:8020 + seatunnel.hadoop.dfs.client.failover.proxy.provider.usdp-bing: org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" + +``` + +if HDFS has some other configs in `hdfs-site.xml` or `core-site.xml` , just set HDFS config by using `seatunnel.hadoop.` prefix. + #### LocalFile ```yaml @@ -160,8 +176,6 @@ seatunnel: checkpoint: interval: 6000 timeout: 7000 - max-concurrent: 1 - tolerable-failure: 2 storage: type: hdfs max-retained: 3 diff --git a/docs/en/seatunnel-engine/deployment.md b/docs/en/seatunnel-engine/deployment.md index 1f8692530cdd..18c1a587a2a3 100644 --- a/docs/en/seatunnel-engine/deployment.md +++ b/docs/en/seatunnel-engine/deployment.md @@ -75,14 +75,6 @@ The interval between two checkpoints, unit is milliseconds. If the `checkpoint.i The timeout of a checkpoint. If a checkpoint cannot be completed within the timeout period, a checkpoint failure will be triggered. Therefore, Job will be restored. -**max-concurrent** - -How many checkpoints can be performed simultaneously at most. - -**tolerable-failure** - -Maximum number of retries after checkpoint failure. 
- Example ``` @@ -95,8 +87,6 @@ seatunnel: checkpoint: interval: 300000 timeout: 10000 - max-concurrent: 1 - tolerable-failure: 2 ``` **checkpoint storage** diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DefaultSerializer.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DefaultSerializer.java index 2100b9529cdc..5fabe2a284a9 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DefaultSerializer.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DefaultSerializer.java @@ -35,6 +35,9 @@ public byte[] serialize(T obj) throws IOException { @Override public T deserialize(byte[] serialized) throws IOException { + if (serialized == null) { + return null; + } return SerializationUtils.deserialize(serialized); } } diff --git a/seatunnel-connectors-v2/connector-kafka/pom.xml b/seatunnel-connectors-v2/connector-kafka/pom.xml index 0ce4bba6b171..7955ab3f5467 100644 --- a/seatunnel-connectors-v2/connector-kafka/pom.xml +++ b/seatunnel-connectors-v2/connector-kafka/pom.xml @@ -31,6 +31,7 @@ 3.2.0 + 1.6.4.Final @@ -61,6 +62,17 @@ seatunnel-format-compatible-debezium-json ${project.version} + + org.apache.seatunnel + seatunnel-format-compatible-connect-json + ${project.version} + + + org.apache.kafka + connect-json + ${kafka.client.version} + + diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java index 1ef29f6322a3..07f9a38ddffe 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java @@ -22,5 +22,6 @@ public enum MessageFormat { TEXT, CANAL_JSON, DEBEZIUM_JSON, - COMPATIBLE_DEBEZIUM_JSON + COMPATIBLE_DEBEZIUM_JSON, + COMPATIBLE_KAFKA_CONNECT_JSON } diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java index 30878e82a2c4..802d7986a94c 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java @@ -45,6 +45,7 @@ import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode; import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException; import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState; +import org.apache.seatunnel.format.compatible.kafka.connect.json.CompatibleKafkaConnectDeserializationSchema; import org.apache.seatunnel.format.json.JsonDeserializationSchema; import org.apache.seatunnel.format.json.canal.CanalJsonDeserializationSchema; import org.apache.seatunnel.format.json.debezium.DebeziumJsonDeserializationSchema; @@ -268,6 +269,11 @@ private void setDeserialization(Config config) { .setIgnoreParseErrors(true) .build(); break; + case COMPATIBLE_KAFKA_CONNECT_JSON: + deserializationSchema = + new CompatibleKafkaConnectDeserializationSchema( + typeInfo, config, false, false); + break; 
case DEBEZIUM_JSON: boolean includeSchema = DEBEZIUM_RECORD_INCLUDE_SCHEMA.defaultValue(); if (config.hasPath(DEBEZIUM_RECORD_INCLUDE_SCHEMA.key())) { diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java index 226fded2409b..a2d3bae2b4d3 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java @@ -25,6 +25,7 @@ import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormatErrorHandleWay; import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorErrorCode; import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException; +import org.apache.seatunnel.format.compatible.kafka.connect.json.CompatibleKafkaConnectDeserializationSchema; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -150,9 +151,18 @@ public void pollNext(Collector output) throws Exception { recordList) { try { - deserializationSchema.deserialize( - record.value(), output); - } catch (Exception e) { + if (deserializationSchema + instanceof + CompatibleKafkaConnectDeserializationSchema) { + ((CompatibleKafkaConnectDeserializationSchema) + deserializationSchema) + .deserialize( + record, output); + } else { + deserializationSchema.deserialize( + record.value(), output); + } + } catch (IOException e) { if (this.messageFormatErrorHandleWay == MessageFormatErrorHandleWay .SKIP) { diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java index c777d2378273..511cbe4aa993 100644 --- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java @@ -102,6 +102,12 @@ public enum HashKeyParseMode { .withDescription( "hash key parse mode, support all or kv, default value is all"); + public static final Option EXPIRE = + Options.key("expire") + .longType() + .defaultValue(-1L) + .withDescription("Set redis expiration time."); + public enum Format { JSON, // TEXT will be supported later diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisDataType.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisDataType.java index 64772b5381d3..a315e0cdae0c 100644 --- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisDataType.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisDataType.java @@ -30,8 +30,9 @@ public enum RedisDataType { KEY { @Override - public void set(Jedis jedis, String key, String value) { + public void set(Jedis jedis, String key, String value, long expire) { jedis.set(key, value); + 
expire(jedis, key, expire); } @Override @@ -41,9 +42,10 @@ public List get(Jedis jedis, String key) { }, HASH { @Override - public void set(Jedis jedis, String key, String value) { + public void set(Jedis jedis, String key, String value, long expire) { Map fieldsMap = JsonUtils.toMap(value); jedis.hset(key, fieldsMap); + expire(jedis, key, expire); } @Override @@ -54,8 +56,9 @@ public List get(Jedis jedis, String key) { }, LIST { @Override - public void set(Jedis jedis, String key, String value) { + public void set(Jedis jedis, String key, String value, long expire) { jedis.lpush(key, value); + expire(jedis, key, expire); } @Override @@ -65,8 +68,9 @@ public List get(Jedis jedis, String key) { }, SET { @Override - public void set(Jedis jedis, String key, String value) { + public void set(Jedis jedis, String key, String value, long expire) { jedis.sadd(key, value); + expire(jedis, key, expire); } @Override @@ -77,8 +81,9 @@ public List get(Jedis jedis, String key) { }, ZSET { @Override - public void set(Jedis jedis, String key, String value) { + public void set(Jedis jedis, String key, String value, long expire) { jedis.zadd(key, 1, value); + expire(jedis, key, expire); } @Override @@ -91,7 +96,13 @@ public List get(Jedis jedis, String key) { return Collections.emptyList(); } - public void set(Jedis jedis, String key, String value) { + private static void expire(Jedis jedis, String key, long expire) { + if (expire > 0) { + jedis.expire(key, expire); + } + } + + public void set(Jedis jedis, String key, String value, long expire) { // do nothing } } diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java index c8bb879d0f5b..8954b4da2a1f 100644 --- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java @@ -47,6 +47,7 @@ public class RedisParameters implements Serializable { private RedisConfig.RedisMode mode; private RedisConfig.HashKeyParseMode hashKeyParseMode; private List redisNodes = Collections.emptyList(); + private long expire = RedisConfig.EXPIRE.defaultValue(); public void buildWithConfig(Config config) { // set host @@ -89,6 +90,9 @@ public void buildWithConfig(Config config) { if (config.hasPath(RedisConfig.KEY_PATTERN.key())) { this.keysPattern = config.getString(RedisConfig.KEY_PATTERN.key()); } + if (config.hasPath(RedisConfig.EXPIRE.key())) { + this.expire = config.getLong(RedisConfig.EXPIRE.key()); + } // set redis data type try { String dataType = config.getString(RedisConfig.DATA_TYPE.key()); diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java index e68a893f79c3..22ae1568740e 100644 --- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java @@ -41,7 +41,8 @@ public OptionRule optionRule() { RedisConfig.AUTH, RedisConfig.USER, 
RedisConfig.KEY_PATTERN, - RedisConfig.FORMAT) + RedisConfig.FORMAT, + RedisConfig.EXPIRE) .conditional(RedisConfig.MODE, RedisConfig.RedisMode.CLUSTER, RedisConfig.NODES) .build(); } diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java index 657e3aaa5658..80b1449b9d6d 100644 --- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java @@ -59,7 +59,8 @@ public void write(SeaTunnelRow element) throws IOException { } else { key = keyField; } - redisDataType.set(jedis, key, data); + long expire = redisParameters.getExpire(); + redisDataType.set(jedis, key, data, expire); } @Override diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksJsonSerializer.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksJsonSerializer.java index 0e6de2f60127..5bf15c533a15 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksJsonSerializer.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksJsonSerializer.java @@ -19,9 +19,10 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.api.table.type.SqlType; import org.apache.seatunnel.common.utils.JsonUtils; -import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; public class StarRocksJsonSerializer extends StarRocksBaseSerializer @@ -38,10 +39,22 @@ public StarRocksJsonSerializer(SeaTunnelRowType seaTunnelRowType, boolean enable @Override public String serialize(SeaTunnelRow row) { - Map rowMap = new HashMap<>(row.getFields().length); + Map rowMap = new LinkedHashMap<>(row.getFields().length); for (int i = 0; i < row.getFields().length; i++) { - Object value = convert(seaTunnelRowType.getFieldType(i), row.getField(i)); + SqlType sqlType = seaTunnelRowType.getFieldType(i).getSqlType(); + Object value; + if (sqlType == SqlType.ARRAY + || sqlType == SqlType.MAP + || sqlType == SqlType.ROW + || sqlType == SqlType.MULTIPLE_ROW) { + // If the field type is complex type, we should keep the origin value. + // It will be transformed to json string in the next step + // JsonUtils.toJsonString(rowMap). 
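+                // For example, a String[] field {"tag1", "tag2"} ends up as the JSON array
+                // ["tag1", "tag2"] instead of the array's toString() representation.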
+ value = row.getField(i); + } else { + value = convert(seaTunnelRowType.getFieldType(i), row.getField(i)); + } rowMap.put(seaTunnelRowType.getFieldName(i), value); } if (enableUpsertDelete) { diff --git a/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksJsonSerializerTest.java b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksJsonSerializerTest.java new file mode 100644 index 000000000000..6e0d9476441d --- /dev/null +++ b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksJsonSerializerTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.starrocks.serialize; + +import org.apache.seatunnel.api.table.type.ArrayType; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.MapType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Collections; + +public class StarRocksJsonSerializerTest { + + @Test + public void serialize() { + String[] filedNames = {"id", "name", "array", "map"}; + SeaTunnelDataType[] filedTypes = { + BasicType.LONG_TYPE, + BasicType.STRING_TYPE, + ArrayType.STRING_ARRAY_TYPE, + new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE) + }; + + SeaTunnelRowType seaTunnelRowType = new SeaTunnelRowType(filedNames, filedTypes); + StarRocksJsonSerializer starRocksJsonSerializer = + new StarRocksJsonSerializer(seaTunnelRowType, false); + Object[] fields = { + 1, "Tom", new String[] {"tag1", "tag2"}, Collections.singletonMap("key1", "value1") + }; + SeaTunnelRow seaTunnelRow = new SeaTunnelRow(fields); + String jsonString = starRocksJsonSerializer.serialize(seaTunnelRow); + Assertions.assertEquals( + "{\"id\":1,\"name\":\"Tom\",\"array\":[\"tag1\",\"tag2\"],\"map\":{\"key1\":\"value1\"}}", + jsonString); + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/pom.xml index 81cbb7856984..fa2e1930cce4 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/pom.xml @@ -92,6 +92,11 @@ postgresql test + + mysql + mysql-connector-java + test + org.testcontainers mysql diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaConnectToKafkaIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaConnectToKafkaIT.java new file mode 100644 index 000000000000..591049917f8f --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaConnectToKafkaIT.java @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.e2e.connector.kafka; + +import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode; +import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper; + +import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer; +import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion; +import org.apache.seatunnel.e2e.common.TestResource; +import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory; +import org.apache.seatunnel.e2e.common.container.EngineType; +import org.apache.seatunnel.e2e.common.container.TestContainer; +import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; +import org.apache.seatunnel.e2e.common.junit.TestContainerExtension; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.ByteArraySerializer; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.TestTemplate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.DockerLoggerFactory; + +import com.google.common.collect.Lists; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +import static org.awaitility.Awaitility.given; + +@Slf4j +@DisabledOnContainer( + value = {}, + type = {EngineType.SPARK}) +public class KafkaConnectToKafkaIT 
extends TestSuiteBase implements TestResource { + private static final Logger LOG = LoggerFactory.getLogger(KafkaConnectToKafkaIT.class); + private final ObjectMapper objectMapper = new ObjectMapper(); + // kafka + private static final String KAFKA_IMAGE_NAME = "confluentinc/cp-kafka:latest"; + + private static final String KAFKA_JDBC_TOPIC = "jdbc_source_record"; + + private static final String KAFKA_HOST = "kafka_connect_source_record"; + + private static KafkaContainer KAFKA_CONTAINER; + + private KafkaProducer kafkaProducer; + + // -----------------------------------mysql----------------------------------------- + private static MySqlContainer MYSQL_CONTAINER; + private static final String MYSQL_DATABASE = "seatunnel"; + private static final String MYSQL_HOST = "kafka_to_mysql_e2e"; + private static final int MYSQL_PORT = 3306; + private static final String MYSQL_DRIVER_JAR = + "https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.32/mysql-connector-j-8.0.32.jar"; + + @TestContainerExtension + private final ContainerExtendedFactory extendedFactory = + container -> { + Container.ExecResult extraCommands = + container.execInContainer( + "bash", + "-c", + "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O " + + MYSQL_DRIVER_JAR); + Assertions.assertEquals(0, extraCommands.getExitCode()); + }; + + private static MySqlContainer createMySqlContainer(MySqlVersion version) { + MySqlContainer mySqlContainer = + new MySqlContainer(version) + .withConfigurationOverride("docker/server-gtids/my.cnf") + .withSetupSQL("docker/setup.sql") + .withNetwork(NETWORK) + .withNetworkAliases(MYSQL_HOST) + .withDatabaseName("seatunnel") + .withUsername("st_user") + .withPassword("seatunnel") + .withLogConsumer(new Slf4jLogConsumer(LOG)); + mySqlContainer.setPortBindings( + com.google.common.collect.Lists.newArrayList( + String.format("%s:%s", MYSQL_PORT, MYSQL_PORT))); + return mySqlContainer; + } + + private void createKafkaContainer() { + KAFKA_CONTAINER = + new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE_NAME)) + .withNetwork(NETWORK) + .withNetworkAliases(KAFKA_HOST) + .withLogConsumer( + new Slf4jLogConsumer( + DockerLoggerFactory.getLogger(KAFKA_IMAGE_NAME))); + } + + @BeforeAll + @Override + public void startUp() { + + LOG.info("The first stage: Starting Kafka containers..."); + createKafkaContainer(); + Startables.deepStart(Stream.of(KAFKA_CONTAINER)).join(); + LOG.info("Kafka Containers are started"); + + given().ignoreExceptions() + .atLeast(100, TimeUnit.MILLISECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .atMost(2, TimeUnit.MINUTES) + .untilAsserted(this::initKafkaProducer); + + LOG.info("The second stage: Starting Mysql containers..."); + MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V8_0); + Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join(); + LOG.info("Mysql Containers are started"); + + given().ignoreExceptions() + .await() + .atLeast(100, TimeUnit.MILLISECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .atMost(2, TimeUnit.MINUTES) + .untilAsserted(this::initializeDatabase); + + given().ignoreExceptions() + .await() + .atLeast(100, TimeUnit.MILLISECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .atMost(2, TimeUnit.MINUTES) + .untilAsserted(this::initializeJdbcTable); + + log.info("Write 3 records to topic " + KAFKA_JDBC_TOPIC); + generateConnectJdbcRecord(); + } + + @TestTemplate + public void testJdbcRecordKafkaToMysql(TestContainer container) + throws IOException, InterruptedException, SQLException { + 
Container.ExecResult execResult = + container.executeJob("/kafkasource_jdbc_record_to_mysql.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + List actual = new ArrayList<>(); + try (Connection connection = + DriverManager.getConnection( + MYSQL_CONTAINER.getJdbcUrl(), + MYSQL_CONTAINER.getUsername(), + MYSQL_CONTAINER.getPassword())) { + try (Statement statement = connection.createStatement()) { + ResultSet resultSet = + statement.executeQuery("select * from seatunnel.jdbc_sink order by id"); + while (resultSet.next()) { + List row = + Arrays.asList( + resultSet.getInt("id"), + resultSet.getString("name"), + resultSet.getString("description"), + resultSet.getString("weight")); + actual.add(row); + } + } + } + List expected = + Lists.newArrayList( + Arrays.asList(15, "test", "test", "20"), + Arrays.asList(16, "test-001", "test", "30"), + Arrays.asList(18, "sdc", "sdc", "sdc")); + Assertions.assertIterableEquals(expected, actual); + + try (Connection connection = + DriverManager.getConnection( + MYSQL_CONTAINER.getJdbcUrl(), + MYSQL_CONTAINER.getUsername(), + MYSQL_CONTAINER.getPassword())) { + try (Statement statement = connection.createStatement()) { + statement.execute("truncate table seatunnel.jdbc_sink"); + LOG.info("testJdbcRecordKafkaToMysql truncate table sink"); + } + } + } + + @SneakyThrows + public void generateConnectJdbcRecord() { + String[] jdbcSourceRecords = { + "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"int64\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"},{\"type\":\"string\",\"optional\":true,\"field\":\"description\"},{\"type\":\"string\",\"optional\":true,\"field\":\"weight\"}],\"optional\":false,\"name\":\"test_database_001.seatunnel_test_cdc\"},\"payload\":{\"id\":15,\"name\":\"test\",\"description\":\"test\",\"weight\":\"20\"}}", + "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"int64\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"},{\"type\":\"string\",\"optional\":true,\"field\":\"description\"},{\"type\":\"string\",\"optional\":true,\"field\":\"weight\"}],\"optional\":false,\"name\":\"test_database_001.seatunnel_test_cdc\"},\"payload\":{\"id\":16,\"name\":\"test-001\",\"description\":\"test\",\"weight\":\"30\"}}", + "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"int64\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"},{\"type\":\"string\",\"optional\":true,\"field\":\"description\"},{\"type\":\"string\",\"optional\":true,\"field\":\"weight\"}],\"optional\":false,\"name\":\"test_database_001.seatunnel_test_cdc\"},\"payload\":{\"id\":18,\"name\":\"sdc\",\"description\":\"sdc\",\"weight\":\"sdc\"}}" + }; + for (String value : jdbcSourceRecords) { + JsonNode jsonNode = objectMapper.readTree(value); + byte[] bytes = objectMapper.writeValueAsBytes(jsonNode); + ProducerRecord producerRecord = + new ProducerRecord<>(KAFKA_JDBC_TOPIC, null, bytes); + kafkaProducer.send(producerRecord).get(); + } + } + + private void initKafkaProducer() { + Properties props = new Properties(); + String bootstrapServers = KAFKA_CONTAINER.getBootstrapServers(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); + kafkaProducer = new KafkaProducer<>(props); + } + + @Override + public void 
tearDown() { + MYSQL_CONTAINER.close(); + KAFKA_CONTAINER.close(); + } + + protected void initializeDatabase() { + try (Connection connection = + DriverManager.getConnection( + MYSQL_CONTAINER.getJdbcUrl(), + MYSQL_CONTAINER.getUsername(), + MYSQL_CONTAINER.getPassword())) { + Statement statement = connection.createStatement(); + String sql = "CREATE DATABASE IF NOT EXISTS " + MYSQL_DATABASE; + statement.execute(sql); + } catch (SQLException e) { + throw new RuntimeException("Initializing Mysql database failed!", e); + } + } + + private void initializeJdbcTable() { + try (Connection connection = + DriverManager.getConnection( + MYSQL_CONTAINER.getJdbcUrl(), + MYSQL_CONTAINER.getUsername(), + MYSQL_CONTAINER.getPassword())) { + Statement statement = connection.createStatement(); + String jdbcSink = + "CREATE TABLE IF NOT EXISTS seatunnel.jdbc_sink(\n" + + "id INT NOT NULL PRIMARY KEY,\n" + + "name varchar(255),\n" + + "description varchar(255),\n" + + "weight varchar(255)" + + ")"; + statement.execute(jdbcSink); + } catch (SQLException e) { + throw new RuntimeException("Initializing Mysql table failed!", e); + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_jdbc_record_to_mysql.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_jdbc_record_to_mysql.conf new file mode 100644 index 000000000000..36ae276e0349 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_jdbc_record_to_mysql.conf @@ -0,0 +1,63 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
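The records produced by generateConnectJdbcRecord above use the Kafka Connect JSON envelope: a `schema` section describing the field types and a `payload` section carrying the row values, and the COMPATIBLE_KAFKA_CONNECT_JSON format reads the row out of `payload`. A rough illustration of extracting those values with Jackson; the real parsing is done by the deserialization schema added later in this patch, so this snippet is only a simplified stand-in:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ConnectEnvelopeSketch {
    public static void main(String[] args) throws Exception {
        String record =
                "{\"schema\":{\"type\":\"struct\",\"fields\":["
                        + "{\"type\":\"int64\",\"optional\":false,\"field\":\"id\"}],"
                        + "\"optional\":false,\"name\":\"seatunnel_test_cdc\"},"
                        + "\"payload\":{\"id\":15,\"name\":\"test\",\"description\":\"test\",\"weight\":\"20\"}}";
        JsonNode envelope = new ObjectMapper().readTree(record);
        // The row values live under "payload"; "schema" only describes the types.
        JsonNode payload = envelope.get("payload");
        System.out.println(payload.get("id").asLong() + ", " + payload.get("name").asText());
    }
}
```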
+# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + execution.parallelism = 1 + job.mode = "BATCH" + + #spark config + spark.app.name = "SeaTunnel" + spark.executor.instances = 1 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + Kafka { + bootstrap.servers = "kafka_connect_source_record:9092" + topic = "jdbc_source_record" + result_table_name = "kafka_table" + start_mode = earliest + schema = { + fields { + id = "int" + name = "string" + description = "string" + weight = "string" + } + }, + format = COMPATIBLE_KAFKA_CONNECT_JSON + } +} + + +sink { + Jdbc { + driver = com.mysql.cj.jdbc.Driver + url = "jdbc:mysql://kafka_to_mysql_e2e:3306/seatunnel" + user = st_user + password = seatunnel + generate_sink_sql = true + database = seatunnel + table = jdbc_sink + primary_keys = ["id"] + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java index 808f6860337e..bd4a9063ba13 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java @@ -192,4 +192,15 @@ public void testRedis(TestContainer container) throws IOException, InterruptedEx jedis.del("key_list"); Assertions.assertEquals(0, jedis.llen("key_list")); } + + @TestTemplate + public void testRedisWithExpire(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult execResult = container.executeJob("/redis-to-redis-expire.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + Assertions.assertEquals(100, jedis.llen("key_list")); + // Clear data to prevent data duplication in the next TestContainer + Thread.sleep(60 * 1000); + Assertions.assertEquals(0, jedis.llen("key_list")); + } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-expire.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-expire.conf new file mode 100644 index 000000000000..4a42bd3a46af --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-expire.conf @@ -0,0 +1,50 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
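The testRedisWithExpire case above exercises the new expire option: the config that follows writes the list with expire = 30, and after sleeping 60 seconds the test expects the key to be gone, so the value acts as a TTL in seconds. A small sketch of what the writer does per record; this is hedged, since the actual TTL handling lives in RedisDataType#set, which is not shown in this diff, and the Jedis calls are only assumed to match the client version in use:

```java
import redis.clients.jedis.Jedis;

public class RedisExpireSketch {
    // Hypothetical helper mirroring redisDataType.set(jedis, key, data, expire)
    static void setWithTtl(Jedis jedis, String key, String value, int expireSeconds) {
        jedis.set(key, value);
        if (expireSeconds > 0) {
            // Attach a TTL in seconds; a non-positive value leaves the key without expiry.
            jedis.expire(key, expireSeconds);
        }
    }

    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            setWithTtl(jedis, "key_test", "value", 30);
            System.out.println("ttl = " + jedis.ttl("key_test"));
        }
    }
}
```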
+# + +env { + execution.parallelism = 1 + job.mode = "BATCH" + shade.identifier = "base64" + + #spark config + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + Redis { + host = "redis-e2e" + port = 6379 + auth = "U2VhVHVubmVs" + keys = "key_test*" + data_type = key + } +} + +sink { + Redis { + host = "redis-e2e" + port = 6379 + auth = "U2VhVHVubmVs" + key = "key_list" + data_type = list + expire = 30 + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java index 501e763e3c47..cba498e99922 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java @@ -19,22 +19,18 @@ import org.apache.seatunnel.common.config.Common; import org.apache.seatunnel.common.config.DeployMode; -import org.apache.seatunnel.common.utils.RetryUtils; import org.apache.seatunnel.engine.client.SeaTunnelClient; import org.apache.seatunnel.engine.client.job.ClientJobProxy; import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment; -import org.apache.seatunnel.engine.common.Constant; import org.apache.seatunnel.engine.common.config.ConfigProvider; import org.apache.seatunnel.engine.common.config.JobConfig; -import org.apache.seatunnel.engine.common.utils.ExceptionUtil; -import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture; import org.apache.seatunnel.engine.core.job.JobResult; import org.apache.seatunnel.engine.core.job.JobStatus; import org.apache.seatunnel.engine.server.SeaTunnelServerStarter; -import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import com.hazelcast.client.config.ClientConfig; @@ -52,8 +48,8 @@ public class JobExecutionIT { private static HazelcastInstanceImpl hazelcastInstance; - @BeforeAll - public static void beforeClass() throws Exception { + @BeforeEach + public void beforeClass() { hazelcastInstance = SeaTunnelServerStarter.createHazelcastInstance( TestUtils.getClusterName("JobExecutionIT")); @@ -86,10 +82,7 @@ public void testExecuteJob() throws Exception { final ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); CompletableFuture objectCompletableFuture = - CompletableFuture.supplyAsync( - () -> { - return clientJobProxy.waitForJobComplete(); - }); + CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete); await().atMost(600000, TimeUnit.MILLISECONDS) .untilAsserted( @@ -116,12 +109,8 @@ public void cancelJobTest() throws Exception { final ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); JobStatus jobStatus1 = clientJobProxy.getJobStatus(); Assertions.assertFalse(jobStatus1.isEndState()); - ClientJobProxy finalClientJobProxy = clientJobProxy; CompletableFuture objectCompletableFuture = - CompletableFuture.supplyAsync( - () -> { - return finalClientJobProxy.waitForJobComplete(); - }); + CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete); Thread.sleep(1000); 
clientJobProxy.cancelJob(); @@ -146,38 +135,12 @@ public void testGetErrorInfo() throws ExecutionException, InterruptedException { JobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(filePath, jobConfig); final ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); - JobStatus jobStatus = clientJobProxy.getJobStatus(); - while (jobStatus == JobStatus.RUNNING) { - Thread.sleep(1000L); - jobStatus = clientJobProxy.getJobStatus(); - } - - CompletableFuture completableFuture = - CompletableFuture.supplyAsync( - () -> { - try { - return RetryUtils.retryWithException( - () -> { - PassiveCompletableFuture jobFuture = - clientJobProxy.doWaitForJobComplete(); - return jobFuture.get(); - }, - new RetryUtils.RetryMaterial( - 100000, - true, - exception -> - ExceptionUtil.isOperationNeedRetryException( - exception), - Constant.OPERATION_RETRY_SLEEP)); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); - + CompletableFuture completableFuture = + CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete); await().atMost(600000, TimeUnit.MILLISECONDS) .untilAsserted(() -> Assertions.assertTrue(completableFuture.isDone())); - JobResult result = completableFuture.get(); + JobResult result = clientJobProxy.getJobResultCache(); Assertions.assertEquals(result.getStatus(), JobStatus.FAILED); Assertions.assertTrue(result.getError().startsWith("java.lang.NumberFormatException")); } @@ -197,18 +160,16 @@ public void testExpiredJobWasDeleted() throws Exception { final ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); - JobResult result = clientJobProxy.doWaitForJobComplete().get(); - Assertions.assertEquals(result.getStatus(), JobStatus.FINISHED); + Assertions.assertEquals(clientJobProxy.waitForJobComplete(), JobStatus.FINISHED); await().atMost(65, TimeUnit.SECONDS) .untilAsserted( () -> Assertions.assertThrowsExactly( - NullPointerException.class, - () -> clientJobProxy.getJobStatus())); + NullPointerException.class, clientJobProxy::getJobStatus)); } - @AfterAll - static void afterClass() { + @AfterEach + void afterClass() { if (hazelcastInstance != null) { hazelcastInstance.shutdown(); } diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml index 3897ae95031b..4276fc87916f 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml @@ -26,8 +26,6 @@ seatunnel: checkpoint: interval: 300000 timeout: 10000 - max-concurrent: 1 - tolerable-failure: 2 storage: type: localfile max-retained: 3 diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/resources/seatunnel.yaml b/seatunnel-engine/seatunnel-engine-client/src/test/resources/seatunnel.yaml index ea5b5ac23070..4678cfed3d5a 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/test/resources/seatunnel.yaml +++ b/seatunnel-engine/seatunnel-engine-client/src/test/resources/seatunnel.yaml @@ -24,8 +24,6 @@ seatunnel: checkpoint: interval: 6000 timeout: 7000 - max-concurrent: 1 - tolerable-failure: 2 storage: type: hdfs max-retained: 3 diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java 
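The reworked JobExecutionIT tests above drop the hand-rolled retry loops and instead wrap waitForJobComplete in a CompletableFuture that is polled with Awaitility. A compact sketch of that wait pattern, with a placeholder supplier standing in for clientJobProxy::waitForJobComplete:

```java
import org.awaitility.Awaitility;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class AwaitJobCompletionSketch {
    public static void main(String[] args) {
        // Placeholder for clientJobProxy::waitForJobComplete
        CompletableFuture<String> jobResult = CompletableFuture.supplyAsync(() -> "FINISHED");

        // Poll until the future completes, failing after the timeout.
        Awaitility.await()
                .atMost(600_000, TimeUnit.MILLISECONDS)
                .untilAsserted(() -> {
                    if (!jobResult.isDone()) {
                        throw new AssertionError("job has not finished yet");
                    }
                });
        System.out.println("job status: " + jobResult.join());
    }
}
```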
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java index 5b8bbf697635..2010d1f4155f 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java @@ -161,16 +161,6 @@ private CheckpointConfig parseCheckpointConfig(Node checkpointNode) { getIntegerValue( ServerConfigOptions.SCHEMA_CHANGE_CHECKPOINT_TIMEOUT.key(), getTextContent(node))); - } else if (ServerConfigOptions.CHECKPOINT_MAX_CONCURRENT.key().equals(name)) { - checkpointConfig.setMaxConcurrentCheckpoints( - getIntegerValue( - ServerConfigOptions.CHECKPOINT_MAX_CONCURRENT.key(), - getTextContent(node))); - } else if (ServerConfigOptions.CHECKPOINT_TOLERABLE_FAILURE.key().equals(name)) { - checkpointConfig.setTolerableFailureCheckpoints( - getIntegerValue( - ServerConfigOptions.CHECKPOINT_TOLERABLE_FAILURE.key(), - getTextContent(node))); } else if (ServerConfigOptions.CHECKPOINT_STORAGE.key().equals(name)) { checkpointConfig.setStorage(parseCheckpointStorageConfig(node)); } else { diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java index 7038a65b422d..8d521f2b8b84 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java @@ -33,10 +33,6 @@ public class CheckpointConfig implements Serializable { private long checkpointTimeout = ServerConfigOptions.CHECKPOINT_TIMEOUT.defaultValue(); private long schemaChangeCheckpointTimeout = ServerConfigOptions.SCHEMA_CHANGE_CHECKPOINT_TIMEOUT.defaultValue(); - private int maxConcurrentCheckpoints = - ServerConfigOptions.CHECKPOINT_MAX_CONCURRENT.defaultValue(); - private int tolerableFailureCheckpoints = - ServerConfigOptions.CHECKPOINT_TOLERABLE_FAILURE.defaultValue(); private CheckpointStorageConfig storage = ServerConfigOptions.CHECKPOINT_STORAGE.defaultValue(); @@ -60,18 +56,4 @@ public void setSchemaChangeCheckpointTimeout(long checkpointTimeout) { "The minimum checkpoint timeout is 10 ms."); this.schemaChangeCheckpointTimeout = checkpointTimeout; } - - public void setMaxConcurrentCheckpoints(int maxConcurrentCheckpoints) { - checkArgument( - maxConcurrentCheckpoints >= 1, - "The minimum number of concurrent checkpoints is 1."); - this.maxConcurrentCheckpoints = maxConcurrentCheckpoints; - } - - public void setTolerableFailureCheckpoints(int tolerableFailureCheckpoints) { - checkArgument( - maxConcurrentCheckpoints >= 0, - "The number of tolerance failed checkpoints must be a natural number."); - this.tolerableFailureCheckpoints = tolerableFailureCheckpoints; - } } diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java index 2409e59ca2dc..486f11878e5e 100644 --- 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java @@ -92,18 +92,6 @@ public class ServerConfigOptions { .withDescription( "The timeout (in milliseconds) for a schema change checkpoint."); - public static final Option CHECKPOINT_MAX_CONCURRENT = - Options.key("max-concurrent") - .intType() - .defaultValue(1) - .withDescription("The maximum number of concurrent checkpoints."); - - public static final Option CHECKPOINT_TOLERABLE_FAILURE = - Options.key("tolerable-failure") - .intType() - .defaultValue(0) - .withDescription("The tolerable failure number of a checkpoint."); - public static final Option CHECKPOINT_STORAGE_TYPE = Options.key("type") .stringType() diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/resources/seatunnel.yaml b/seatunnel-engine/seatunnel-engine-common/src/main/resources/seatunnel.yaml index e5d92281da7b..cc14d81eafa3 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/resources/seatunnel.yaml +++ b/seatunnel-engine/seatunnel-engine-common/src/main/resources/seatunnel.yaml @@ -25,8 +25,6 @@ seatunnel: checkpoint: interval: 300000 timeout: 10000 - max-concurrent: 1 - tolerable-failure: 2 storage: type: hdfs max-retained: 3 diff --git a/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigParserTest.java b/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigParserTest.java index 4c199b352ef0..ed6853e39b4f 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigParserTest.java +++ b/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigParserTest.java @@ -53,12 +53,6 @@ public void testSeaTunnelConfig() { Assertions.assertEquals( 7000, config.getEngineConfig().getCheckpointConfig().getCheckpointTimeout()); - Assertions.assertEquals( - 1, config.getEngineConfig().getCheckpointConfig().getMaxConcurrentCheckpoints()); - - Assertions.assertEquals( - 2, config.getEngineConfig().getCheckpointConfig().getTolerableFailureCheckpoints()); - Assertions.assertEquals( "hdfs", config.getEngineConfig().getCheckpointConfig().getStorage().getStorage()); diff --git a/seatunnel-engine/seatunnel-engine-common/src/test/resources/seatunnel.yaml b/seatunnel-engine/seatunnel-engine-common/src/test/resources/seatunnel.yaml index 4f6ce5f4ef1b..8453bdeecaaf 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/test/resources/seatunnel.yaml +++ b/seatunnel-engine/seatunnel-engine-common/src/test/resources/seatunnel.yaml @@ -25,8 +25,6 @@ seatunnel: checkpoint: interval: 6000 timeout: 7000 - max-concurrent: 1 - tolerable-failure: 2 storage: type: hdfs max-retained: 3 diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java index 1b6bc6b68719..222f60a5cb50 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java +++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java @@ -117,7 +117,6 @@ public class CheckpointCoordinator { private final CheckpointConfig coordinatorConfig; - private int tolerableFailureCheckpoints; private transient ScheduledExecutorService scheduler; private final AtomicLong latestTriggerTimestamp = new AtomicLong(0); @@ -165,7 +164,6 @@ public CheckpointCoordinator( this.runningJobStateIMap = runningJobStateIMap; this.plan = plan; this.coordinatorConfig = checkpointConfig; - this.tolerableFailureCheckpoints = coordinatorConfig.getTolerableFailureCheckpoints(); this.pendingCheckpoints = new ConcurrentHashMap<>(); this.completedCheckpoints = new ArrayDeque<>(coordinatorConfig.getStorage().getMaxRetainedCheckpoints() + 1); @@ -392,7 +390,6 @@ protected void tryTriggerPendingCheckpoint(CheckpointType checkpointType) { if (checkpointType.notFinalCheckpoint() && checkpointType.notSchemaChangeCheckpoint()) { if (currentTimestamp - latestTriggerTimestamp.get() < coordinatorConfig.getCheckpointInterval() - || pendingCounter.get() >= coordinatorConfig.getMaxConcurrentCheckpoints() || !isAllTaskReady) { return; } @@ -531,16 +528,9 @@ private void startTriggerPendingCheckpoint( if (pendingCheckpoints.get(pendingCheckpoint.getCheckpointId()) != null && !pendingCheckpoint.isFullyAcknowledged()) { - if (tolerableFailureCheckpoints-- <= 0 - || pendingCheckpoint - .getCheckpointType() - .isSchemaChangeCheckpoint()) { - LOG.info( - "timeout checkpoint: " - + pendingCheckpoint.getInfo()); - handleCoordinatorError( - CheckpointCloseReason.CHECKPOINT_EXPIRED, null); - } + LOG.info("timeout checkpoint: " + pendingCheckpoint.getInfo()); + handleCoordinatorError( + CheckpointCloseReason.CHECKPOINT_EXPIRED, null); } }, checkpointTimeout, @@ -746,12 +736,6 @@ public synchronized void completePendingCheckpoint(CompletedCheckpoint completed notifyCompleted(completedCheckpoint); pendingCheckpoints.remove(checkpointId); pendingCounter.decrementAndGet(); - if (pendingCheckpoints.size() + 1 == coordinatorConfig.getMaxConcurrentCheckpoints()) { - // latest checkpoint completed time > checkpoint interval - if (completedCheckpoint.getCheckpointType().notFinalCheckpoint()) { - scheduleTriggerPendingCheckpoint(0L); - } - } if (isCompleted()) { cleanPendingCheckpoint(CheckpointCloseReason.CHECKPOINT_COORDINATOR_COMPLETED); if (latestCompletedCheckpoint.getCheckpointType().isSavepoint()) { diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java index 69d72d7130a9..a238ae134c94 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java @@ -552,7 +552,10 @@ private List getSourceTask( .getJobId(), taskLocation, finalParallelismIndex, - f); + (PhysicalExecutionFlow< + SourceAction, + SourceConfig>) + f); } else { return new TransformSeaTunnelTask( jobImmutableInformation diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java index 
404956a7e71f..5137f23b7bad 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java @@ -264,10 +264,6 @@ private CheckpointConfig createJobCheckpointConfig( CheckpointConfig jobCheckpointConfig = new CheckpointConfig(); jobCheckpointConfig.setCheckpointTimeout(defaultCheckpointConfig.getCheckpointTimeout()); jobCheckpointConfig.setCheckpointInterval(defaultCheckpointConfig.getCheckpointInterval()); - jobCheckpointConfig.setMaxConcurrentCheckpoints( - defaultCheckpointConfig.getMaxConcurrentCheckpoints()); - jobCheckpointConfig.setTolerableFailureCheckpoints( - defaultCheckpointConfig.getTolerableFailureCheckpoints()); CheckpointStorageConfig jobCheckpointStorageConfig = new CheckpointStorageConfig(); jobCheckpointStorageConfig.setStorage(defaultCheckpointConfig.getStorage().getStorage()); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java index 1a8ecf29c809..a904146a1d82 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java @@ -34,6 +34,7 @@ import org.apache.commons.collections4.CollectionUtils; import com.hazelcast.cluster.Address; +import lombok.Getter; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; @@ -45,6 +46,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -75,6 +77,8 @@ public class SinkAggregatedCommitterTask private final SinkAggregatedCommitter aggregatedCommitter; private transient Serializer aggregatedCommitInfoSerializer; + @Getter private transient Serializer commitInfoSerializer; + private Map writerAddressMap; private ConcurrentMap> commitInfoCache; @@ -107,6 +111,7 @@ public void init() throws Exception { this.writerAddressMap = new ConcurrentHashMap<>(); this.checkpointCommitInfoMap = new ConcurrentHashMap<>(); this.completableFuture = new CompletableFuture<>(); + this.commitInfoSerializer = sink.getSink().getCommitInfoSerializer().get(); this.aggregatedCommitInfoSerializer = sink.getSink().getAggregatedCommitInfoSerializer().get(); log.debug( @@ -250,6 +255,7 @@ public void restoreState(List actionStateList) throws Except actionStateList.stream() .map(ActionSubtaskState::getState) .flatMap(Collection::stream) + .filter(Objects::nonNull) .map( bytes -> sneaky( diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java index 842cf8a6022d..8650dc7f2a68 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java @@ -18,10 +18,11 @@ package org.apache.seatunnel.engine.server.task; import 
org.apache.seatunnel.api.common.metrics.MetricsContext; +import org.apache.seatunnel.api.serialization.Serializer; import org.apache.seatunnel.api.source.SourceSplit; import org.apache.seatunnel.engine.core.dag.actions.SourceAction; import org.apache.seatunnel.engine.server.dag.physical.config.SourceConfig; -import org.apache.seatunnel.engine.server.dag.physical.flow.Flow; +import org.apache.seatunnel.engine.server.dag.physical.flow.PhysicalExecutionFlow; import org.apache.seatunnel.engine.server.execution.ProgressState; import org.apache.seatunnel.engine.server.execution.TaskLocation; import org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle; @@ -29,6 +30,7 @@ import com.hazelcast.logging.ILogger; import com.hazelcast.logging.Logger; +import lombok.Getter; import lombok.NonNull; import java.util.List; @@ -41,15 +43,24 @@ public class SourceSeaTunnelTask extends SeaTunne private transient SeaTunnelSourceCollector collector; private transient Object checkpointLock; + @Getter private transient Serializer splitSerializer; + private final PhysicalExecutionFlow sourceFlow; - public SourceSeaTunnelTask(long jobID, TaskLocation taskID, int indexID, Flow executionFlow) { + public SourceSeaTunnelTask( + long jobID, + TaskLocation taskID, + int indexID, + PhysicalExecutionFlow executionFlow) { super(jobID, taskID, indexID, executionFlow); + this.sourceFlow = executionFlow; } @Override public void init() throws Exception { super.init(); this.checkpointLock = new Object(); + this.splitSerializer = sourceFlow.getAction().getSource().getSplitSerializer(); + LOGGER.info("starting seatunnel source task, index " + indexID); if (!(startFlowLifeCycle instanceof SourceFlowLifeCycle)) { throw new TaskRuntimeException( diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java index da5fa8aeb3a0..25fdbc9638ce 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java @@ -37,6 +37,7 @@ import com.hazelcast.cluster.Address; import com.hazelcast.spi.impl.operationservice.Operation; import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture; +import lombok.Getter; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; @@ -77,6 +78,7 @@ public class SourceSplitEnumeratorTask extends Coord private SeaTunnelSplitEnumeratorContext enumeratorContext; private Serializer enumeratorStateSerializer; + @Getter private Serializer splitSerializer; private int maxReaderSize; private Set unfinishedReaders; @@ -102,6 +104,7 @@ public void init() throws Exception { new SeaTunnelSplitEnumeratorContext<>( this.source.getParallelism(), this, getMetricsContext()); enumeratorStateSerializer = this.source.getSource().getEnumeratorStateSerializer(); + splitSerializer = this.source.getSource().getSplitSerializer(); taskMemberMapping = new ConcurrentHashMap<>(); taskIDToTaskLocationMapping = new ConcurrentHashMap<>(); taskIndexToTaskLocationMapping = new ConcurrentHashMap<>(); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java index c3cce03d3bd3..110562e49440 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java @@ -21,7 +21,6 @@ import org.apache.seatunnel.api.source.SourceEvent; import org.apache.seatunnel.api.source.SourceSplit; import org.apache.seatunnel.api.source.SourceSplitEnumerator; -import org.apache.seatunnel.common.utils.SerializationUtils; import org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask; import org.apache.seatunnel.engine.server.task.operation.source.AssignSplitOperation; @@ -31,6 +30,9 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneaky; @Slf4j public class SeaTunnelSplitEnumeratorContext @@ -67,22 +69,26 @@ public void assignSplit(int subtaskIndex, List splits) { log.warn("No reader is obtained, skip this assign!"); return; } + + List splitBytes = + splits.stream() + .map(split -> sneaky(() -> task.getSplitSerializer().serialize(split))) + .collect(Collectors.toList()); task.getExecutionContext() .sendToMember( new AssignSplitOperation<>( - task.getTaskMemberLocationByIndex(subtaskIndex), - SerializationUtils.serialize(splits.toArray())), + task.getTaskMemberLocationByIndex(subtaskIndex), splitBytes), task.getTaskMemberAddressByIndex(subtaskIndex)) .join(); } @Override public void signalNoMoreSplits(int subtaskIndex) { + List emptySplits = Collections.emptyList(); task.getExecutionContext() .sendToMember( new AssignSplitOperation<>( - task.getTaskMemberLocationByIndex(subtaskIndex), - SerializationUtils.serialize(Collections.emptyList().toArray())), + task.getTaskMemberLocationByIndex(subtaskIndex), emptySplits), task.getTaskMemberAddressByIndex(subtaskIndex)) .join(); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java index 7e6d73c54986..c51e3483c09a 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java @@ -25,7 +25,6 @@ import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.table.event.SchemaChangeEvent; import org.apache.seatunnel.api.table.type.Record; -import org.apache.seatunnel.common.utils.SerializationUtils; import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener; import org.apache.seatunnel.engine.core.dag.actions.SinkAction; import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey; @@ -48,6 +47,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -66,6 +66,7 @@ public class SinkFlowLifeCycle sinkAction; private SinkWriter writer; + private transient Optional> commitInfoSerializer; private 
transient Optional> writerStateSerializer; private final int indexID; @@ -110,6 +111,7 @@ public SinkFlowLifeCycle( @Override public void init() throws Exception { + this.commitInfoSerializer = sinkAction.getSink().getCommitInfoSerializer(); this.writerStateSerializer = sinkAction.getSink().getWriterStateSerializer(); this.committer = sinkAction.getSink().createCommitter(); this.lastCommitInfo = Optional.empty(); @@ -184,10 +186,14 @@ public void received(Record record) { runningTask .getExecutionContext() .sendToMember( - new SinkPrepareCommitOperation( + new SinkPrepareCommitOperation( barrier, committerTaskLocation, - SerializationUtils.serialize(commitInfoT)), + commitInfoSerializer.isPresent() + ? commitInfoSerializer + .get() + .serialize(commitInfoT) + : null), committerTaskAddress) .join(); } @@ -247,9 +253,9 @@ public void restoreState(List actionStateList) throws Except if (writerStateSerializer.isPresent()) { states = actionStateList.stream() - .filter(state -> writerStateSerializer.isPresent()) .map(ActionSubtaskState::getState) .flatMap(Collection::stream) + .filter(Objects::nonNull) .map( bytes -> sneaky( diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java index b883bd8ffdd9..572836fe5177 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java @@ -23,7 +23,6 @@ import org.apache.seatunnel.api.source.SourceReader; import org.apache.seatunnel.api.source.SourceSplit; import org.apache.seatunnel.api.table.type.Record; -import org.apache.seatunnel.common.utils.SerializationUtils; import org.apache.seatunnel.engine.core.checkpoint.CheckpointType; import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener; import org.apache.seatunnel.engine.core.dag.actions.SourceAction; @@ -59,7 +58,6 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; -import static org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneaky; import static org.apache.seatunnel.engine.server.task.AbstractTask.serializeStates; @Slf4j @@ -338,21 +336,17 @@ public void restoreState(List actionStateList) throws Except if (actionStateList.isEmpty()) { return; } - List splits = + List splits = actionStateList.stream() .map(ActionSubtaskState::getState) .flatMap(Collection::stream) .filter(Objects::nonNull) - .map(bytes -> sneaky(() -> splitSerializer.deserialize(bytes))) .collect(Collectors.toList()); try { runningTask .getExecutionContext() .sendToMember( - new RestoredSplitOperation( - enumeratorTaskLocation, - SerializationUtils.serialize(splits.toArray()), - indexID), + new RestoredSplitOperation(enumeratorTaskLocation, splits, indexID), enumeratorTaskAddress) .get(); } catch (InterruptedException | ExecutionException e) { diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java index 06945a61b254..5ed6f81a7aa2 100644 --- 
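The lifecycle and operation changes around this point stop using SerializationUtils (plain Java object serialization) and instead ship splits and commit info as byte[] produced by the connector's own Serializer, deserializing them under the task's classloader so connector-private classes resolve correctly. A generic sketch of that swap-deserialize-restore pattern, using a hypothetical Serializer interface in place of org.apache.seatunnel.api.serialization.Serializer and a caller-supplied classloader in place of the one fetched from TaskExecutionService:

```java
import java.util.ArrayList;
import java.util.List;

public class ClassLoaderSwapSketch {
    // Hypothetical stand-in for the connector-provided Serializer<T>
    interface Serializer<T> {
        byte[] serialize(T obj) throws Exception;
        T deserialize(byte[] bytes) throws Exception;
    }

    static <T> List<T> deserializeAll(
            List<byte[]> payloads, Serializer<T> serializer, ClassLoader taskClassLoader)
            throws Exception {
        ClassLoader mainClassLoader = Thread.currentThread().getContextClassLoader();
        List<T> result = new ArrayList<>(payloads.size());
        try {
            // Deserialize with the connector's classloader so its classes can be resolved.
            Thread.currentThread().setContextClassLoader(taskClassLoader);
            for (byte[] payload : payloads) {
                result.add(serializer.deserialize(payload));
            }
        } finally {
            // Always restore the caller's classloader, even if deserialization throws.
            Thread.currentThread().setContextClassLoader(mainClassLoader);
        }
        return result;
    }
}
```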
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java @@ -17,7 +17,6 @@ package org.apache.seatunnel.engine.server.task.operation.sink; -import org.apache.seatunnel.common.utils.SerializationUtils; import org.apache.seatunnel.engine.server.SeaTunnelServer; import org.apache.seatunnel.engine.server.TaskExecutionService; import org.apache.seatunnel.engine.server.execution.TaskLocation; @@ -33,7 +32,7 @@ import java.io.IOException; @NoArgsConstructor -public class SinkPrepareCommitOperation extends BarrierFlowOperation { +public class SinkPrepareCommitOperation extends BarrierFlowOperation { private byte[] commitInfos; public SinkPrepareCommitOperation( @@ -73,15 +72,24 @@ public int getClassId() { public void run() throws Exception { TaskExecutionService taskExecutionService = ((SeaTunnelServer) getService()).getTaskExecutionService(); - SinkAggregatedCommitterTask committerTask = + SinkAggregatedCommitterTask committerTask = taskExecutionService.getTask(taskLocation); - ClassLoader classLoader = + ClassLoader taskClassLoader = taskExecutionService .getExecutionContext(taskLocation.getTaskGroupLocation()) .getClassLoader(); + ClassLoader mainClassLoader = Thread.currentThread().getContextClassLoader(); + if (commitInfos != null) { - committerTask.receivedWriterCommitInfo( - barrier.getId(), SerializationUtils.deserialize(commitInfos, classLoader)); + CommitInfoT deserializeCommitInfo = null; + try { + Thread.currentThread().setContextClassLoader(taskClassLoader); + deserializeCommitInfo = + committerTask.getCommitInfoSerializer().deserialize(commitInfos); + } finally { + Thread.currentThread().setContextClassLoader(mainClassLoader); + } + committerTask.receivedWriterCommitInfo(barrier.getId(), deserializeCommitInfo); } committerTask.triggerBarrier(barrier); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java index 637a48e8ab43..b21111e18fd4 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java @@ -19,7 +19,6 @@ import org.apache.seatunnel.api.source.SourceSplit; import org.apache.seatunnel.common.utils.RetryUtils; -import org.apache.seatunnel.common.utils.SerializationUtils; import org.apache.seatunnel.engine.common.Constant; import org.apache.seatunnel.engine.server.SeaTunnelServer; import org.apache.seatunnel.engine.server.exception.TaskGroupContextNotFoundException; @@ -33,18 +32,18 @@ import com.hazelcast.spi.impl.operationservice.Operation; import java.io.IOException; -import java.util.Arrays; -import java.util.stream.Collectors; +import java.util.ArrayList; +import java.util.List; public class AssignSplitOperation extends Operation implements IdentifiedDataSerializable { - private byte[] splits; + private List splits; private TaskLocation taskID; public AssignSplitOperation() {} - public AssignSplitOperation(TaskLocation taskID, byte[] splits) { + public 
AssignSplitOperation(TaskLocation taskID, List splits) { this.taskID = taskID; this.splits = splits; } @@ -56,13 +55,22 @@ public void run() throws Exception { () -> { SourceSeaTunnelTask task = server.getTaskExecutionService().getTask(taskID); - ClassLoader classLoader = + ClassLoader taskClassLoader = server.getTaskExecutionService() .getExecutionContext(taskID.getTaskGroupLocation()) .getClassLoader(); - Object[] o = SerializationUtils.deserialize(splits, classLoader); - task.receivedSourceSplit( - Arrays.stream(o).map(i -> (SplitT) i).collect(Collectors.toList())); + ClassLoader mainClassLoader = Thread.currentThread().getContextClassLoader(); + List deserializeSplits = new ArrayList<>(); + try { + Thread.currentThread().setContextClassLoader(taskClassLoader); + for (byte[] split : this.splits) { + deserializeSplits.add(task.getSplitSerializer().deserialize(split)); + } + } finally { + Thread.currentThread().setContextClassLoader(mainClassLoader); + } + + task.receivedSourceSplit(deserializeSplits); return null; }, new RetryUtils.RetryMaterial( @@ -76,13 +84,20 @@ public void run() throws Exception { @Override protected void writeInternal(ObjectDataOutput out) throws IOException { - out.writeByteArray(splits); + out.writeInt(splits.size()); + for (byte[] split : splits) { + out.writeByteArray(split); + } out.writeObject(taskID); } @Override protected void readInternal(ObjectDataInput in) throws IOException { - splits = in.readByteArray(); + int splitCount = in.readInt(); + splits = new ArrayList<>(splitCount); + for (int i = 0; i < splitCount; i++) { + splits.add(in.readByteArray()); + } taskID = in.readObject(); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java index 0c9c3d95c902..05fbf6537e00 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java @@ -19,7 +19,6 @@ import org.apache.seatunnel.api.source.SourceSplit; import org.apache.seatunnel.common.utils.RetryUtils; -import org.apache.seatunnel.common.utils.SerializationUtils; import org.apache.seatunnel.engine.common.Constant; import org.apache.seatunnel.engine.server.SeaTunnelServer; import org.apache.seatunnel.engine.server.TaskExecutionService; @@ -34,19 +33,18 @@ import com.hazelcast.nio.ObjectDataOutput; import java.io.IOException; -import java.util.Arrays; +import java.util.ArrayList; import java.util.List; -import java.util.stream.Collectors; public class RestoredSplitOperation extends TaskOperation { - private byte[] splits; + private List splits; private Integer subtaskIndex; public RestoredSplitOperation() {} public RestoredSplitOperation( - TaskLocation enumeratorLocation, byte[] splits, int subtaskIndex) { + TaskLocation enumeratorLocation, List splits, int subtaskIndex) { super(enumeratorLocation); this.splits = splits; this.subtaskIndex = subtaskIndex; @@ -55,14 +53,21 @@ public RestoredSplitOperation( @Override protected void writeInternal(ObjectDataOutput out) throws IOException { super.writeInternal(out); - out.writeByteArray(splits); + out.writeInt(splits.size()); + for (byte[] split : splits) { + out.writeByteArray(split); + } 
out.writeInt(subtaskIndex); } @Override protected void readInternal(ObjectDataInput in) throws IOException { super.readInternal(in); - splits = in.readByteArray(); + int splitCount = in.readInt(); + splits = new ArrayList<>(splitCount); + for (int i = 0; i < splitCount; i++) { + splits.add(in.readByteArray()); + } subtaskIndex = in.readInt(); } @@ -82,27 +87,31 @@ public void run() throws Exception { TaskExecutionService taskExecutionService = server.getTaskExecutionService(); RetryUtils.retryWithException( () -> { - ClassLoader classLoader = + SourceSplitEnumeratorTask task = + taskExecutionService.getTask(taskLocation); + ClassLoader taskClassLoader = taskExecutionService .getExecutionContext(taskLocation.getTaskGroupLocation()) .getClassLoader(); + ClassLoader mainClassLoader = Thread.currentThread().getContextClassLoader(); + + List deserializeSplits = new ArrayList<>(); + try { + Thread.currentThread().setContextClassLoader(taskClassLoader); + for (byte[] split : splits) { + deserializeSplits.add(task.getSplitSerializer().deserialize(split)); + } + } finally { + Thread.currentThread().setContextClassLoader(mainClassLoader); + } - List deserialize = - Arrays.stream( - (Object[]) - SerializationUtils.deserialize( - splits, classLoader)) - .map(o -> (SourceSplit) o) - .collect(Collectors.toList()); - SourceSplitEnumeratorTask task = - taskExecutionService.getTask(taskLocation); task.getExecutionContext() .getTaskExecutionService() .asyncExecuteFunction( taskLocation.getTaskGroupLocation(), () -> { try { - task.addSplitsBack(deserialize, subtaskIndex); + task.addSplitsBack(deserializeSplits, subtaskIndex); } catch (Exception e) { task.getExecutionContext() .sendToMaster( diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/resources/seatunnel.yaml b/seatunnel-engine/seatunnel-engine-server/src/test/resources/seatunnel.yaml index 8f22b0613cad..f8739cc48301 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/resources/seatunnel.yaml +++ b/seatunnel-engine/seatunnel-engine-server/src/test/resources/seatunnel.yaml @@ -25,8 +25,6 @@ seatunnel: checkpoint: interval: 6000 timeout: 7000 - max-concurrent: 1 - tolerable-failure: 2 storage: type: hdfs max-retained: 3 diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/HdfsConfiguration.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/HdfsConfiguration.java index 8d41ae848d86..953da3027bd0 100644 --- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/HdfsConfiguration.java +++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/HdfsConfiguration.java @@ -49,6 +49,8 @@ public class HdfsConfiguration extends AbstractConfiguration { private static final String HDFS_IMPL_KEY = "fs.hdfs.impl"; + private static final String SEATUNNEL_HADOOP_PREFIX = "seatunnel.hadoop."; + @Override public Configuration buildConfiguration(Map config) throws CheckpointStorageException { @@ -69,7 +71,15 @@ public Configuration buildConfiguration(Map config) authenticateKerberos(kerberosPrincipal, kerberosKeytabFilePath, hadoopConf); } } - // todo support other hdfs optional config 
keys + // support other hdfs optional config keys + config.entrySet().stream() + .filter(entry -> entry.getKey().startsWith(SEATUNNEL_HADOOP_PREFIX)) + .forEach( + entry -> { + String key = entry.getKey().replace(SEATUNNEL_HADOOP_PREFIX, ""); + String value = entry.getValue(); + hadoopConf.set(key, value); + }); return hadoopConf; } diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HDFSFileCheckpointTest.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HDFSFileCheckpointTest.java new file mode 100644 index 000000000000..23a41a2782ba --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HDFSFileCheckpointTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
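For readers less familiar with this pattern, the prefix handling added to `HdfsConfiguration` above can be sketched in isolation. The following standalone snippet is only an illustration (the `dfs.replication` key and its value are made-up examples, and a plain map stands in for Hadoop's `Configuration`); it shows how keys starting with `seatunnel.hadoop.` are forwarded with the prefix stripped while other options are left alone:

```java
import java.util.HashMap;
import java.util.Map;

public class SeaTunnelHadoopPrefixSketch {
    private static final String SEATUNNEL_HADOOP_PREFIX = "seatunnel.hadoop.";

    public static void main(String[] args) {
        // Checkpoint storage options as a user might supply them (illustrative values only).
        Map<String, String> config = new HashMap<>();
        config.put("storage.type", "hdfs");
        config.put("seatunnel.hadoop.dfs.replication", "2");

        // Keys carrying the prefix are copied over with the prefix stripped;
        // everything else is ignored by this pass.
        Map<String, String> hadoopConf = new HashMap<>();
        config.entrySet().stream()
                .filter(entry -> entry.getKey().startsWith(SEATUNNEL_HADOOP_PREFIX))
                .forEach(
                        entry ->
                                hadoopConf.put(
                                        entry.getKey().replace(SEATUNNEL_HADOOP_PREFIX, ""),
                                        entry.getValue()));

        System.out.println(hadoopConf); // prints {dfs.replication=2}
    }
}
```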
+ * + */ + +package org.apache.seatunnel.engine.checkpoint.storage.hdfs; + +import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; + +import java.util.HashMap; +import java.util.Map; + +@Disabled( + "HDFS is not available in CI, if you want to run this test, please set up your own HDFS environment") +public class HDFSFileCheckpointTest extends AbstractFileCheckPointTest { + + @BeforeAll + public static void setup() throws CheckpointStorageException { + Map config = new HashMap<>(); + config.put("storage.type", "hdfs"); + config.put("fs.defaultFS", "hdfs://usdp-bing"); + config.put("seatunnel.hadoop.dfs.nameservices", "usdp-bing"); + config.put("seatunnel.hadoop.dfs.ha.namenodes.usdp-bing", "nn1,nn2"); + config.put("seatunnel.hadoop.dfs.namenode.rpc-address.usdp-bing.nn1", "usdp-bing-nn1:8020"); + config.put("seatunnel.hadoop.dfs.namenode.rpc-address.usdp-bing.nn2", "usdp-bing-nn2:8020"); + config.put( + "seatunnel.hadoop.dfs.client.failover.proxy.provider.usdp-bing", + "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"); + STORAGE = new HdfsStorage(config); + initStorageData(); + } +} diff --git a/seatunnel-formats/pom.xml b/seatunnel-formats/pom.xml index 983a8629ce89..7fc09b356a03 100644 --- a/seatunnel-formats/pom.xml +++ b/seatunnel-formats/pom.xml @@ -30,6 +30,7 @@ seatunnel-format-json seatunnel-format-text seatunnel-format-compatible-debezium-json + seatunnel-format-compatible-connect-json diff --git a/seatunnel-formats/seatunnel-format-compatible-connect-json/pom.xml b/seatunnel-formats/seatunnel-format-compatible-connect-json/pom.xml new file mode 100644 index 000000000000..d3d554574281 --- /dev/null +++ b/seatunnel-formats/seatunnel-format-compatible-connect-json/pom.xml @@ -0,0 +1,62 @@ + + + + 4.0.0 + + org.apache.seatunnel + seatunnel-formats + ${revision} + + + seatunnel-format-compatible-connect-json + SeaTunnel : Formats : Compatible Kafka Connect Json + + 1.6.4.Final + + + + + org.apache.seatunnel + seatunnel-api + ${project.version} + provided + + + + org.apache.seatunnel + seatunnel-format-json + ${project.version} + provided + + + + org.apache.kafka + kafka-clients + 3.2.0 + provided + + + + org.apache.kafka + connect-json + 3.2.0 + provided + + + + + diff --git a/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/CompatibleKafkaConnectDeserializationSchema.java b/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/CompatibleKafkaConnectDeserializationSchema.java new file mode 100644 index 000000000000..b2e6ac97e977 --- /dev/null +++ b/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/CompatibleKafkaConnectDeserializationSchema.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.format.compatible.kafka.connect.json; + +import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.serialization.DeserializationSchema; +import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.table.type.RowKind; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.common.utils.ReflectionUtils; +import org.apache.seatunnel.format.json.JsonToRowConverters; +import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.json.JsonConverter; +import org.apache.kafka.connect.json.JsonConverterConfig; +import org.apache.kafka.connect.sink.SinkRecord; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Collections; +import java.util.Map; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** Compatible kafka connect deserialization schema */ +@RequiredArgsConstructor +public class CompatibleKafkaConnectDeserializationSchema + implements DeserializationSchema { + + private static final String INCLUDE_SCHEMA_METHOD = "convertToJsonWithEnvelope"; + private static final String EXCLUDE_SCHEMA_METHOD = "convertToJsonWithoutEnvelope"; + private static final String KAFKA_CONNECT_SINK_RECORD_PAYLOAD = "payload"; + private transient JsonConverter keyConverter; + private transient JsonConverter valueConverter; + private transient Method keyConverterMethod; + private transient Method valueConverterMethod; + private final SeaTunnelRowType seaTunnelRowType; + private final JsonToRowConverters.JsonToRowConverter runtimeConverter; + private final boolean keySchemaEnable; + private final boolean valueSchemaEnable; + /** Object mapper for parsing the JSON. 
*/ + private final ObjectMapper objectMapper = new ObjectMapper(); + + public CompatibleKafkaConnectDeserializationSchema( + @NonNull SeaTunnelRowType seaTunnelRowType, + @NonNull Config config, + boolean failOnMissingField, + boolean ignoreParseErrors) { + + Map configMap = ReadonlyConfig.fromConfig(config).toMap(); + this.seaTunnelRowType = seaTunnelRowType; + this.keySchemaEnable = + KafkaConnectJsonFormatOptions.getKeyConverterSchemaEnabled(configMap); + this.valueSchemaEnable = + KafkaConnectJsonFormatOptions.getValueConverterSchemaEnabled(configMap); + + // Runtime converter + this.runtimeConverter = + new JsonToRowConverters(failOnMissingField, ignoreParseErrors) + .createConverter(checkNotNull(seaTunnelRowType)); + } + + @Override + public SeaTunnelRow deserialize(byte[] message) throws IOException { + throw new UnsupportedEncodingException(); + } + + /** + * Deserialize kafka consumer record + * + * @param msg + * @param out + * @throws Exception + */ + public void deserialize(ConsumerRecord msg, Collector out) + throws InvocationTargetException, IllegalAccessException { + tryInitConverter(); + SinkRecord record = convertToSinkRecord(msg); + RowKind rowKind = RowKind.INSERT; + JsonNode jsonNode = + (JsonNode) + valueConverterMethod.invoke( + valueConverter, record.valueSchema(), record.value()); + JsonNode payload = jsonNode.get(KAFKA_CONNECT_SINK_RECORD_PAYLOAD); + if (payload.isArray()) { + ArrayNode arrayNode = (ArrayNode) payload; + for (int i = 0; i < arrayNode.size(); i++) { + SeaTunnelRow row = convertJsonNode(arrayNode.get(i)); + row.setRowKind(rowKind); + out.collect(row); + } + } else { + SeaTunnelRow row = convertJsonNode(payload); + row.setRowKind(rowKind); + out.collect(row); + } + } + + private SeaTunnelRow convertJsonNode(JsonNode jsonNode) { + if (jsonNode.isNull()) { + return null; + } + try { + org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode jsonData = + objectMapper.readTree(jsonNode.toString()); + return (SeaTunnelRow) runtimeConverter.convert(jsonData); + } catch (Throwable t) { + throw new SeaTunnelJsonFormatException( + CommonErrorCode.JSON_OPERATION_FAILED, + String.format("Failed to deserialize JSON '%s'.", jsonNode), + t); + } + } + + private SinkRecord convertToSinkRecord(ConsumerRecord msg) { + SchemaAndValue keyAndSchema = + (msg.key() == null) + ? SchemaAndValue.NULL + : keyConverter.toConnectData(msg.topic(), msg.headers(), msg.key()); + SchemaAndValue valueAndSchema = + valueConverter.toConnectData(msg.topic(), msg.headers(), msg.value()); + return new SinkRecord( + msg.topic(), + msg.partition(), + keyAndSchema.schema(), + keyAndSchema.value(), + valueAndSchema.schema(), + valueAndSchema.value(), + msg.offset(), + msg.timestamp(), + msg.timestampType(), + null); + } + + @Override + public SeaTunnelDataType getProducedType() { + return seaTunnelRowType; + } + + private void tryInitConverter() { + if (keyConverter == null) { + synchronized (this) { + if (keyConverter == null) { + keyConverter = new JsonConverter(); + keyConverter.configure( + Collections.singletonMap( + JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, keySchemaEnable), + true); + keyConverterMethod = + ReflectionUtils.getDeclaredMethod( + JsonConverter.class, + keySchemaEnable + ? 
INCLUDE_SCHEMA_METHOD + : EXCLUDE_SCHEMA_METHOD, + Schema.class, + Object.class) + .get(); + } + } + } + if (valueConverter == null) { + synchronized (this) { + if (valueConverter == null) { + valueConverter = new JsonConverter(); + valueConverter.configure( + Collections.singletonMap( + JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, valueSchemaEnable), + false); + valueConverterMethod = + ReflectionUtils.getDeclaredMethod( + JsonConverter.class, + valueSchemaEnable + ? INCLUDE_SCHEMA_METHOD + : EXCLUDE_SCHEMA_METHOD, + Schema.class, + Object.class) + .get(); + } + } + } + } +} diff --git a/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/KafkaConnectJsonFormatOptions.java b/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/KafkaConnectJsonFormatOptions.java new file mode 100644 index 000000000000..05e16e0abb79 --- /dev/null +++ b/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/KafkaConnectJsonFormatOptions.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
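A side note on the `SCHEMAS_ENABLE_CONFIG` flag that the schema above sets on its key and value converters: the standalone sketch below (topic name and payload are invented for the example) shows Kafka's own `JsonConverter` parsing a bare JSON payload when schemas are disabled; with the flag left at its default of `true`, the converter would instead expect the full `{"schema": ..., "payload": ...}` envelope produced by Kafka Connect:

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;

import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonConverterConfig;

public class SchemasEnableSketch {
    public static void main(String[] args) {
        JsonConverter valueConverter = new JsonConverter();
        // second argument false = value converter (same flag as in tryInitConverter above);
        // schemas disabled, so the converter accepts the bare payload without the envelope.
        valueConverter.configure(
                Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, false), false);

        byte[] value = "{\"id\": 1, \"name\": \"st\"}".getBytes(StandardCharsets.UTF_8);
        SchemaAndValue parsed = valueConverter.toConnectData("some_topic", value);
        System.out.println(parsed.value()); // the parsed payload, with a null schema
    }
}
```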
+ */
+
+package org.apache.seatunnel.format.compatible.kafka.connect.json;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import java.util.Map;
+
+public class KafkaConnectJsonFormatOptions {
+
+ public static final Option<Boolean> KEY_CONVERTER_SCHEMA_ENABLED =
+ Options.key("key_converter_schema_enabled")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription("kafka connect key converter schema enabled.");
+
+ public static final Option<Boolean> VALUE_CONVERTER_SCHEMA_ENABLED =
+ Options.key("value_converter_schema_enabled")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription("kafka connect value converter schema enabled.");
+
+ public static boolean getKeyConverterSchemaEnabled(Map<String, String> options) {
+ return Boolean.parseBoolean(
+ options.getOrDefault(KEY_CONVERTER_SCHEMA_ENABLED.key(), "true"));
+ }
+
+ public static boolean getValueConverterSchemaEnabled(Map<String, String> options) {
+ return Boolean.parseBoolean(
+ options.getOrDefault(VALUE_CONVERTER_SCHEMA_ENABLED.key(), "true"));
+ }
+}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLEngine.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLEngine.java
index b1e734c31ef4..6dfaddca00a9 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLEngine.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLEngine.java
@@ -23,7 +23,11 @@ import java.util.List; public interface SQLEngine {
- void init(String inputTableName, SeaTunnelRowType inputRowType, String sql);
+ void init(
+ String inputTableName,
+ String catalogTableName,
+ SeaTunnelRowType inputRowType,
+ String sql);
SeaTunnelRowType typeMapping(List<String> inputColumnsMapping);
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java
index 20a07dcee02e..9b21c4b6f5c4 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java
@@ -115,7 +115,11 @@ protected void setConfig(Config pluginConfig) { @Override public void open() { sqlEngine = SQLEngineFactory.getSQLEngine(engineType);
- sqlEngine.init(inputTableName, inputRowType, query);
+ sqlEngine.init(
+ inputTableName,
+ inputCatalogTable != null ? 
inputCatalogTable.getTableId().getTableName() : null, + inputRowType, + query); } private void tryOpen() { diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java index 55fbe04cf13c..2f01fe3af98d 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java @@ -37,6 +37,8 @@ import net.sf.jsqlparser.statement.select.SelectExpressionItem; import net.sf.jsqlparser.statement.select.SelectItem; +import javax.annotation.Nullable; + import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -45,6 +47,7 @@ public class ZetaSQLEngine implements SQLEngine { private String inputTableName; + @Nullable private String catalogTableName; private SeaTunnelRowType inputRowType; private String sql; @@ -59,8 +62,13 @@ public class ZetaSQLEngine implements SQLEngine { public ZetaSQLEngine() {} @Override - public void init(String inputTableName, SeaTunnelRowType inputRowType, String sql) { + public void init( + String inputTableName, + String catalogTableName, + SeaTunnelRowType inputRowType, + String sql) { this.inputTableName = inputTableName; + this.catalogTableName = catalogTableName; this.inputRowType = inputRowType; this.sql = sql; @@ -109,7 +117,8 @@ private void validateSQL(Statement statement) { throw new IllegalArgumentException("Unsupported table alias name syntax"); } String tableName = table.getName(); - if (!inputTableName.equalsIgnoreCase(tableName)) { + if (!inputTableName.equalsIgnoreCase(tableName) + && !tableName.equalsIgnoreCase(catalogTableName)) { throw new IllegalArgumentException( String.format("Table name: %s not found", tableName)); } diff --git a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngineTest.java b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngineTest.java new file mode 100644 index 000000000000..94e1060af859 --- /dev/null +++ b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngineTest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.transform.sql.zeta; + +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.transform.exception.TransformException; +import org.apache.seatunnel.transform.sql.SQLEngine; +import org.apache.seatunnel.transform.sql.SQLEngineFactory; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class ZetaSQLEngineTest { + + @Test + public void testCatalogNameAndSourceTableNameBothSupport() { + + SQLEngine sqlEngine = SQLEngineFactory.getSQLEngine(SQLEngineFactory.EngineType.ZETA); + + SeaTunnelRowType rowType = + new SeaTunnelRowType( + new String[] {"id", "name", "age"}, + new SeaTunnelDataType[] { + BasicType.INT_TYPE, BasicType.STRING_TYPE, BasicType.INT_TYPE + }); + sqlEngine.init("test", null, rowType, "select * from test"); + sqlEngine.init("test", "nameFromCatalog", rowType, "select * from test"); + sqlEngine.init("test", "nameFromCatalog", rowType, "select * from nameFromCatalog"); + + Assertions.assertThrows( + TransformException.class, + () -> sqlEngine.init("test", "nameFromCatalog", rowType, "select * from unknown")); + Assertions.assertThrows( + TransformException.class, + () -> sqlEngine.init("test", null, rowType, "select * from unknown")); + } +}
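One property of the new table-name check that the test above exercises only implicitly is case-insensitivity: both the source table name and the catalog table name are compared with `equalsIgnoreCase`. The extra test below is a sketch in the same style, not part of this change set (the class and method names are invented), illustrating that a query may reference the catalog table name in any letter case:

```java
package org.apache.seatunnel.transform.sql.zeta;

import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.transform.sql.SQLEngine;
import org.apache.seatunnel.transform.sql.SQLEngineFactory;

import org.junit.jupiter.api.Test;

public class ZetaSQLEngineCaseInsensitiveSketchTest {

    @Test
    public void testCatalogNameMatchIsCaseInsensitive() {
        SQLEngine sqlEngine = SQLEngineFactory.getSQLEngine(SQLEngineFactory.EngineType.ZETA);

        SeaTunnelRowType rowType =
                new SeaTunnelRowType(
                        new String[] {"id", "name", "age"},
                        new SeaTunnelDataType<?>[] {
                            BasicType.INT_TYPE, BasicType.STRING_TYPE, BasicType.INT_TYPE
                        });

        // validateSQL compares both names with equalsIgnoreCase, so the catalog table
        // name may appear in the FROM clause in any letter case without failing validation.
        sqlEngine.init("test", "nameFromCatalog", rowType, "select * from NAMEFROMCATALOG");
    }
}
```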