From d31e9478f760f947df137558264dfa7520681834 Mon Sep 17 00:00:00 2001 From: Carl-Zhou-CN <67902676+Carl-Zhou-CN@users.noreply.github.com> Date: Wed, 18 Oct 2023 11:09:10 +0800 Subject: [PATCH] [Feature][Jdbc] Supporting more ways to configure connection parameters. (#5388) --- docs/en/connector-v2/sink/DB2.md | 45 ++-- docs/en/connector-v2/sink/Jdbc.md | 5 + docs/en/connector-v2/sink/Mysql.md | 45 ++-- docs/en/connector-v2/sink/OceanBase.md | 41 ++-- docs/en/connector-v2/sink/Oracle.md | 45 ++-- docs/en/connector-v2/sink/PostgreSql.md | 45 ++-- docs/en/connector-v2/sink/Snowflake.md | 163 +++++++------ docs/en/connector-v2/sink/Vertica.md | 43 ++-- docs/en/connector-v2/source/DB2.md | 1 + docs/en/connector-v2/source/Jdbc.md | 5 + docs/en/connector-v2/source/Mysql.md | 7 + docs/en/connector-v2/source/OceanBase.md | 1 + docs/en/connector-v2/source/Oracle.md | 4 + docs/en/connector-v2/source/PostgreSQL.md | 1 + docs/en/connector-v2/source/Snowflake.md | 1 + docs/en/connector-v2/source/Vertica.md | 1 + .../jdbc/config/JdbcConnectionConfig.java | 17 ++ .../seatunnel/jdbc/config/JdbcOptions.java | 7 + .../internal/connection/DataSourceUtils.java | 2 +- .../SimpleJdbcConnectionProvider.java | 1 + .../jdbc/internal/dialect/JdbcDialect.java | 16 ++ .../internal/dialect/mysql/MysqlDialect.java | 9 + .../seatunnel/jdbc/sink/JdbcSink.java | 4 + .../seatunnel/jdbc/sink/JdbcSinkFactory.java | 4 + .../seatunnel/jdbc/source/JdbcSource.java | 10 +- .../jdbc/source/JdbcSourceFactory.java | 8 +- .../command/SparkTaskExecuteCommand.java | 3 +- .../seatunnel/jdbc/JdbcMysqlIT.java | 222 ++++++++++++++++++ .../resources/jdbc_mysql_source_and_sink.conf | 12 +- .../jdbc_mysql_source_and_sink_xa.conf | 8 + .../resources/jdbc_oracle_source_to_sink.conf | 6 + 31 files changed, 562 insertions(+), 220 deletions(-) diff --git a/docs/en/connector-v2/sink/DB2.md b/docs/en/connector-v2/sink/DB2.md index fc0aaca0943..e91912790a9 100644 --- a/docs/en/connector-v2/sink/DB2.md +++ 
b/docs/en/connector-v2/sink/DB2.md @@ -35,7 +35,7 @@ semantics (using XA transaction guarantee). ## Data Type Mapping | DB2 Data type | SeaTunnel Data type | -|------------------------------------------------------------------------------------------------------|---------------------|---| +|------------------------------------------------------------------------------------------------------|---------------------| | BOOLEAN | BOOLEAN | | SMALLINT | SHORT | | INT
INTEGER
| INTEGER | @@ -52,27 +52,28 @@ semantics (using XA transaction guarantee). ## Sink Options -| Name | Type | Required | Default | Description | -|-------------------------------------------|---------|----------|---------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| url | String | Yes | - | The URL of the JDBC connection. Refer to a case: jdbc:db2://127.0.0.1:50000/dbname | -| driver | String | Yes | - | The jdbc class name used to connect to the remote data source,
if you use DB2 the value is `com.ibm.db2.jdbc.app.DB2Driver`. | -| user | String | No | - | Connection instance user name | -| password | String | No | - | Connection instance password | -| query | String | No | - | Use this sql write upstream input datas to database. e.g `INSERT ...`,`query` have the higher priority | -| database | String | No | - | Use this `database` and `table-name` auto-generate sql and receive upstream input datas write to database.
This option is mutually exclusive with `query` and has a higher priority. | -| table | String | No | - | Use database and this table-name auto-generate sql and receive upstream input datas write to database.
This option is mutually exclusive with `query` and has a higher priority. | -| primary_keys | Array | No | - | This option is used to support operations such as `insert`, `delete`, and `update` when automatically generate sql. | -| support_upsert_by_query_primary_key_exist | Boolean | No | false | Choose to use INSERT sql, UPDATE sql to process update events(INSERT, UPDATE_AFTER) based on query primary key exists. This configuration is only used when database unsupport upsert syntax. **Note**: that this method has low performance | -| connection_check_timeout_sec | Int | No | 30 | The time in seconds to wait for the database operation used to validate the connection to complete. | -| max_retries | Int | No | 0 | The number of retries to submit failed (executeBatch) | -| batch_size | Int | No | 1000 | For batch writing, when the number of buffered records reaches the number of `batch_size` or the time reaches `checkpoint.interval`
, the data will be flushed into the database | -| is_exactly_once | Boolean | No | false | Whether to enable exactly-once semantics, which will use Xa transactions. If on, you need to
set `xa_data_source_class_name`. | -| generate_sink_sql | Boolean | No | false | Generate sql statements based on the database table you want to write to | -| xa_data_source_class_name | String | No | - | The xa data source class name of the database Driver, for example, DB2 is `com.db2.cj.jdbc.Db2XADataSource`, and
please refer to appendix for other data sources | -| max_commit_attempts | Int | No | 3 | The number of retries for transaction commit failures | -| transaction_timeout_sec | Int | No | -1 | The timeout after the transaction is opened, the default is -1 (never timeout). Note that setting the timeout may affect
exactly-once semantics | -| auto_commit | Boolean | No | true | Automatic transaction commit is enabled by default | -| common-options | | no | - | Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details | +| Name | Type | Required | Default | Description | +|-------------------------------------------|---------|----------|---------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| url | String | Yes | - | The URL of the JDBC connection. Refer to a case: jdbc:db2://127.0.0.1:50000/dbname | +| driver | String | Yes | - | The jdbc class name used to connect to the remote data source,
if you use DB2 the value is `com.ibm.db2.jdbc.app.DB2Driver`. | +| user | String | No | - | Connection instance user name | +| password | String | No | - | Connection instance password | +| query | String | No | - | Use this sql write upstream input datas to database. e.g `INSERT ...`,`query` have the higher priority | +| database | String | No | - | Use this `database` and `table-name` auto-generate sql and receive upstream input datas write to database.
This option is mutually exclusive with `query` and has a higher priority. | +| table | String | No | - | Use database and this table-name auto-generate sql and receive upstream input datas write to database.
This option is mutually exclusive with `query` and has a higher priority. | +| primary_keys | Array | No | - | This option is used to support operations such as `insert`, `delete`, and `update` when automatically generate sql. | +| support_upsert_by_query_primary_key_exist | Boolean | No | false | Choose to use INSERT sql, UPDATE sql to process update events(INSERT, UPDATE_AFTER) based on query primary key exists. This configuration is only used when database unsupport upsert syntax. **Note**: that this method has low performance | +| connection_check_timeout_sec | Int | No | 30 | The time in seconds to wait for the database operation used to validate the connection to complete. | +| max_retries | Int | No | 0 | The number of retries to submit failed (executeBatch) | +| batch_size | Int | No | 1000 | For batch writing, when the number of buffered records reaches the number of `batch_size` or the time reaches `checkpoint.interval`
, the data will be flushed into the database | +| is_exactly_once | Boolean | No | false | Whether to enable exactly-once semantics, which will use Xa transactions. If on, you need to
set `xa_data_source_class_name`. | +| generate_sink_sql | Boolean | No | false | Generate sql statements based on the database table you want to write to | +| xa_data_source_class_name | String | No | - | The xa data source class name of the database Driver, for example, DB2 is `com.db2.cj.jdbc.Db2XADataSource`, and
please refer to appendix for other data sources | +| max_commit_attempts | Int | No | 3 | The number of retries for transaction commit failures | +| transaction_timeout_sec | Int | No | -1 | The timeout after the transaction is opened, the default is -1 (never timeout). Note that setting the timeout may affect
exactly-once semantics | +| auto_commit | Boolean | No | true | Automatic transaction commit is enabled by default | +| properties | Map | No | - | Additional connection configuration parameters, when properties and URL have the same parameters, the priority is determined by the
specific implementation of the driver. For example, in MySQL, properties take precedence over the URL. | +| common-options | | no | - | Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details | ### Tips diff --git a/docs/en/connector-v2/sink/Jdbc.md b/docs/en/connector-v2/sink/Jdbc.md index 394fadde801..3b1ae9c0eb8 100644 --- a/docs/en/connector-v2/sink/Jdbc.md +++ b/docs/en/connector-v2/sink/Jdbc.md @@ -48,6 +48,7 @@ support `Xa transactions`. You can set `is_exactly_once=true` to enable it. | transaction_timeout_sec | Int | No | -1 | | auto_commit | Boolean | No | true | | field_ide | String | No | - | +| properties | Map | No | - | | common-options | | no | - | ### driver [string] @@ -143,6 +144,10 @@ The field "field_ide" is used to identify whether the field needs to be converte synchronizing from the source to the sink. "ORIGINAL" indicates no conversion is needed, "UPPERCASE" indicates conversion to uppercase, and "LOWERCASE" indicates conversion to lowercase. +### properties + +Additional connection configuration parameters, when properties and URL have the same parameters, the priority is determined by the
specific implementation of the driver. For example, in MySQL, properties take precedence over the URL. + ### common options Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details diff --git a/docs/en/connector-v2/sink/Mysql.md b/docs/en/connector-v2/sink/Mysql.md index 860f071df0e..108b657fdd4 100644 --- a/docs/en/connector-v2/sink/Mysql.md +++ b/docs/en/connector-v2/sink/Mysql.md @@ -58,28 +58,29 @@ semantics (using XA transaction guarantee). ## Sink Options -| Name | Type | Required | Default | Description | -|-------------------------------------------|---------|----------|---------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| url | String | Yes | - | The URL of the JDBC connection. Refer to a case: jdbc:mysql://localhost:3306:3306/test | -| driver | String | Yes | - | The jdbc class name used to connect to the remote data source,
if you use MySQL the value is `com.mysql.cj.jdbc.Driver`. | -| user | String | No | - | Connection instance user name | -| password | String | No | - | Connection instance password | -| query | String | No | - | Use this sql write upstream input datas to database. e.g `INSERT ...`,`query` have the higher priority | -| database | String | No | - | Use this `database` and `table-name` auto-generate sql and receive upstream input datas write to database.
This option is mutually exclusive with `query` and has a higher priority. | -| table | String | No | - | Use database and this table-name auto-generate sql and receive upstream input datas write to database.
This option is mutually exclusive with `query` and has a higher priority. | -| primary_keys | Array | No | - | This option is used to support operations such as `insert`, `delete`, and `update` when automatically generate sql. | -| support_upsert_by_query_primary_key_exist | Boolean | No | false | Choose to use INSERT sql, UPDATE sql to process update events(INSERT, UPDATE_AFTER) based on query primary key exists. This configuration is only used when database unsupport upsert syntax. **Note**: that this method has low performance | -| connection_check_timeout_sec | Int | No | 30 | The time in seconds to wait for the database operation used to validate the connection to complete. | -| max_retries | Int | No | 0 | The number of retries to submit failed (executeBatch) | -| batch_size | Int | No | 1000 | For batch writing, when the number of buffered records reaches the number of `batch_size` or the time reaches `checkpoint.interval`
, the data will be flushed into the database | -| is_exactly_once | Boolean | No | false | Whether to enable exactly-once semantics, which will use Xa transactions. If on, you need to
set `xa_data_source_class_name`. | -| generate_sink_sql | Boolean | No | false | Generate sql statements based on the database table you want to write to | -| xa_data_source_class_name | String | No | - | The xa data source class name of the database Driver, for example, mysql is `com.mysql.cj.jdbc.MysqlXADataSource`, and
please refer to appendix for other data sources | -| max_commit_attempts | Int | No | 3 | The number of retries for transaction commit failures | -| transaction_timeout_sec | Int | No | -1 | The timeout after the transaction is opened, the default is -1 (never timeout). Note that setting the timeout may affect
exactly-once semantics | -| auto_commit | Boolean | No | true | Automatic transaction commit is enabled by default | -| field_ide | String | No | - | Identify whether the field needs to be converted when synchronizing from the source to the sink. `ORIGINAL` indicates no conversion is needed;`UPPERCASE` indicates conversion to uppercase;`LOWERCASE` indicates conversion to lowercase. | -| common-options | | no | - | Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details | +| Name | Type | Required | Default | Description | +|-------------------------------------------|---------|----------|---------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| url | String | Yes | - | The URL of the JDBC connection. Refer to a case: jdbc:mysql://localhost:3306:3306/test | +| driver | String | Yes | - | The jdbc class name used to connect to the remote data source,
if you use MySQL the value is `com.mysql.cj.jdbc.Driver`. | +| user | String | No | - | Connection instance user name | +| password | String | No | - | Connection instance password | +| query | String | No | - | Use this sql write upstream input datas to database. e.g `INSERT ...`,`query` have the higher priority | +| database | String | No | - | Use this `database` and `table-name` auto-generate sql and receive upstream input datas write to database.
This option is mutually exclusive with `query` and has a higher priority. | +| table | String | No | - | Use database and this table-name auto-generate sql and receive upstream input datas write to database.
This option is mutually exclusive with `query` and has a higher priority. | +| primary_keys | Array | No | - | This option is used to support operations such as `insert`, `delete`, and `update` when automatically generate sql. | +| support_upsert_by_query_primary_key_exist | Boolean | No | false | Choose to use INSERT sql, UPDATE sql to process update events(INSERT, UPDATE_AFTER) based on query primary key exists. This configuration is only used when database unsupport upsert syntax. **Note**: that this method has low performance | +| connection_check_timeout_sec | Int | No | 30 | The time in seconds to wait for the database operation used to validate the connection to complete. | +| max_retries | Int | No | 0 | The number of retries to submit failed (executeBatch) | +| batch_size | Int | No | 1000 | For batch writing, when the number of buffered records reaches the number of `batch_size` or the time reaches `checkpoint.interval`
, the data will be flushed into the database | +| is_exactly_once | Boolean | No | false | Whether to enable exactly-once semantics, which will use Xa transactions. If on, you need to
set `xa_data_source_class_name`. | +| generate_sink_sql | Boolean | No | false | Generate sql statements based on the database table you want to write to | +| xa_data_source_class_name | String | No | - | The xa data source class name of the database Driver, for example, mysql is `com.mysql.cj.jdbc.MysqlXADataSource`, and
please refer to appendix for other data sources | +| max_commit_attempts | Int | No | 3 | The number of retries for transaction commit failures | +| transaction_timeout_sec | Int | No | -1 | The timeout after the transaction is opened, the default is -1 (never timeout). Note that setting the timeout may affect
exactly-once semantics | +| auto_commit | Boolean | No | true | Automatic transaction commit is enabled by default | +| field_ide | String | No | - | Identify whether the field needs to be converted when synchronizing from the source to the sink. `ORIGINAL` indicates no conversion is needed;`UPPERCASE` indicates conversion to uppercase;`LOWERCASE` indicates conversion to lowercase. | +| properties | Map | No | - | Additional connection configuration parameters, when properties and URL have the same parameters, the priority is determined by the
specific implementation of the driver. For example, in MySQL, properties take precedence over the URL. | +| common-options | | no | - | Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details | ### Tips diff --git a/docs/en/connector-v2/sink/OceanBase.md b/docs/en/connector-v2/sink/OceanBase.md index 3cea0b5e6e6..d5888dcc3b7 100644 --- a/docs/en/connector-v2/sink/OceanBase.md +++ b/docs/en/connector-v2/sink/OceanBase.md @@ -67,26 +67,27 @@ Write data through jdbc. Support Batch mode and Streaming mode, support concurre ## Sink Options -| Name | Type | Required | Default | Description | -|-------------------------------------------|---------|----------|---------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| url | String | Yes | - | The URL of the JDBC connection. Refer to a case: jdbc:oceanbase://localhost:2883/test | -| driver | String | Yes | - | The jdbc class name used to connect to the remote data source, should be `com.oceanbase.jdbc.Driver`. | -| user | String | No | - | Connection instance user name | -| password | String | No | - | Connection instance password | -| query | String | No | - | Use this sql write upstream input datas to database. e.g `INSERT ...`,`query` have the higher priority | -| compatible_mode | String | Yes | - | The compatible mode of OceanBase, can be 'mysql' or 'oracle'. | -| database | String | No | - | Use this `database` and `table-name` auto-generate sql and receive upstream input datas write to database.
This option is mutually exclusive with `query` and has a higher priority. | -| table | String | No | - | Use database and this table-name auto-generate sql and receive upstream input datas write to database.
This option is mutually exclusive with `query` and has a higher priority. | -| primary_keys | Array | No | - | This option is used to support operations such as `insert`, `delete`, and `update` when automatically generate sql. | -| support_upsert_by_query_primary_key_exist | Boolean | No | false | Choose to use INSERT sql, UPDATE sql to process update events(INSERT, UPDATE_AFTER) based on query primary key exists. This configuration is only used when database unsupport upsert syntax. **Note**: that this method has low performance | -| connection_check_timeout_sec | Int | No | 30 | The time in seconds to wait for the database operation used to validate the connection to complete. | -| max_retries | Int | No | 0 | The number of retries to submit failed (executeBatch) | -| batch_size | Int | No | 1000 | For batch writing, when the number of buffered records reaches the number of `batch_size` or the time reaches `checkpoint.interval`
, the data will be flushed into the database | -| generate_sink_sql | Boolean | No | false | Generate sql statements based on the database table you want to write to | -| max_commit_attempts | Int | No | 3 | The number of retries for transaction commit failures | -| transaction_timeout_sec | Int | No | -1 | The timeout after the transaction is opened, the default is -1 (never timeout). Note that setting the timeout may affect
exactly-once semantics | -| auto_commit | Boolean | No | true | Automatic transaction commit is enabled by default | -| common-options | | no | - | Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details | +| Name | Type | Required | Default | Description | +|-------------------------------------------|---------|----------|---------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| url | String | Yes | - | The URL of the JDBC connection. Refer to a case: jdbc:oceanbase://localhost:2883/test | +| driver | String | Yes | - | The jdbc class name used to connect to the remote data source, should be `com.oceanbase.jdbc.Driver`. | +| user | String | No | - | Connection instance user name | +| password | String | No | - | Connection instance password | +| query | String | No | - | Use this sql write upstream input datas to database. e.g `INSERT ...`,`query` have the higher priority | +| compatible_mode | String | Yes | - | The compatible mode of OceanBase, can be 'mysql' or 'oracle'. | +| database | String | No | - | Use this `database` and `table-name` auto-generate sql and receive upstream input datas write to database.
This option is mutually exclusive with `query` and has a higher priority. | +| table | String | No | - | Use database and this table-name auto-generate sql and receive upstream input datas write to database.
This option is mutually exclusive with `query` and has a higher priority. | +| primary_keys | Array | No | - | This option is used to support operations such as `insert`, `delete`, and `update` when automatically generate sql. | +| support_upsert_by_query_primary_key_exist | Boolean | No | false | Choose to use INSERT sql, UPDATE sql to process update events(INSERT, UPDATE_AFTER) based on query primary key exists. This configuration is only used when database unsupport upsert syntax. **Note**: that this method has low performance | +| connection_check_timeout_sec | Int | No | 30 | The time in seconds to wait for the database operation used to validate the connection to complete. | +| max_retries | Int | No | 0 | The number of retries to submit failed (executeBatch) | +| batch_size | Int | No | 1000 | For batch writing, when the number of buffered records reaches the number of `batch_size` or the time reaches `checkpoint.interval`
, the data will be flushed into the database | +| generate_sink_sql | Boolean | No | false | Generate sql statements based on the database table you want to write to | +| max_commit_attempts | Int | No | 3 | The number of retries for transaction commit failures | +| transaction_timeout_sec | Int | No | -1 | The timeout after the transaction is opened, the default is -1 (never timeout). Note that setting the timeout may affect
exactly-once semantics | +| auto_commit | Boolean | No | true | Automatic transaction commit is enabled by default | +| properties | Map | No | - | Additional connection configuration parameters, when properties and URL have the same parameters, the priority is determined by the
specific implementation of the driver. For example, in MySQL, properties take precedence over the URL. | +| common-options | | no | - | Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details | ### Tips diff --git a/docs/en/connector-v2/sink/Oracle.md b/docs/en/connector-v2/sink/Oracle.md index 151243f318f..753891653ac 100644 --- a/docs/en/connector-v2/sink/Oracle.md +++ b/docs/en/connector-v2/sink/Oracle.md @@ -52,28 +52,29 @@ semantics (using XA transaction guarantee). ## Options -| Name | Type | Required | Default | Description | -|-------------------------------------------|---------|----------|---------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| url | String | Yes | - | The URL of the JDBC connection. Refer to a case: jdbc:oracle:thin:@datasource01:1523:xe | -| driver | String | Yes | - | The jdbc class name used to connect to the remote data source,
if you use Oracle the value is `oracle.jdbc.OracleDriver`. | -| user | String | No | - | Connection instance user name | -| password | String | No | - | Connection instance password | -| query | String | No | - | Use this sql write upstream input datas to database. e.g `INSERT ...`,`query` have the higher priority | -| database | String | No | - | Use this `database` and `table-name` auto-generate sql and receive upstream input datas write to database.
This option is mutually exclusive with `query` and has a higher priority. | -| table | String | No | - | Use database and this table-name auto-generate sql and receive upstream input datas write to database.
This option is mutually exclusive with `query` and has a higher priority. | -| primary_keys | Array | No | - | This option is used to support operations such as `insert`, `delete`, and `update` when automatically generate sql. | -| support_upsert_by_query_primary_key_exist | Boolean | No | false | Choose to use INSERT sql, UPDATE sql to process update events(INSERT, UPDATE_AFTER) based on query primary key exists. This configuration is only used when database unsupport upsert syntax. **Note**: that this method has low performance | -| connection_check_timeout_sec | Int | No | 30 | The time in seconds to wait for the database operation used to validate the connection to complete. | -| max_retries | Int | No | 0 | The number of retries to submit failed (executeBatch) | -| batch_size | Int | No | 1000 | For batch writing, when the number of buffered records reaches the number of `batch_size` or the time reaches `batch_interval_ms`
, the data will be flushed into the database | -| batch_interval_ms | Int | No | 1000 | For batch writing, when the number of buffers reaches the number of `batch_size` or the time reaches `batch_interval_ms`, the data will be flushed into the database | -| is_exactly_once | Boolean | No | false | Whether to enable exactly-once semantics, which will use Xa transactions. If on, you need to
set `xa_data_source_class_name`. | -| generate_sink_sql | Boolean | No | false | Generate sql statements based on the database table you want to write to. | -| xa_data_source_class_name | String | No | - | The xa data source class name of the database Driver, for example, Oracle is `oracle.jdbc.xa.client.OracleXADataSource`, and
please refer to appendix for other data sources | -| max_commit_attempts | Int | No | 3 | The number of retries for transaction commit failures | -| transaction_timeout_sec | Int | No | -1 | The timeout after the transaction is opened, the default is -1 (never timeout). Note that setting the timeout may affect
exactly-once semantics | -| auto_commit | Boolean | No | true | Automatic transaction commit is enabled by default | -| common-options | | no | - | Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details | +| Name | Type | Required | Default | Description | +|-------------------------------------------|---------|----------|---------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| url | String | Yes | - | The URL of the JDBC connection. Refer to a case: jdbc:oracle:thin:@datasource01:1523:xe | +| driver | String | Yes | - | The jdbc class name used to connect to the remote data source,
if you use Oracle the value is `oracle.jdbc.OracleDriver`. | +| user | String | No | - | Connection instance user name | +| password | String | No | - | Connection instance password | +| query | String | No | - | Use this sql write upstream input datas to database. e.g `INSERT ...`,`query` have the higher priority | +| database | String | No | - | Use this `database` and `table-name` auto-generate sql and receive upstream input datas write to database.
This option is mutually exclusive with `query` and has a higher priority. | +| table | String | No | - | Use database and this table-name auto-generate sql and receive upstream input datas write to database.
This option is mutually exclusive with `query` and has a higher priority. | +| primary_keys | Array | No | - | This option is used to support operations such as `insert`, `delete`, and `update` when automatically generate sql. | +| support_upsert_by_query_primary_key_exist | Boolean | No | false | Choose to use INSERT sql, UPDATE sql to process update events(INSERT, UPDATE_AFTER) based on query primary key exists. This configuration is only used when database unsupport upsert syntax. **Note**: that this method has low performance | +| connection_check_timeout_sec | Int | No | 30 | The time in seconds to wait for the database operation used to validate the connection to complete. | +| max_retries | Int | No | 0 | The number of retries to submit failed (executeBatch) | +| batch_size | Int | No | 1000 | For batch writing, when the number of buffered records reaches the number of `batch_size` or the time reaches `batch_interval_ms`
, the data will be flushed into the database | +| batch_interval_ms | Int | No | 1000 | For batch writing, when the number of buffers reaches the number of `batch_size` or the time reaches `batch_interval_ms`, the data will be flushed into the database | +| is_exactly_once | Boolean | No | false | Whether to enable exactly-once semantics, which will use Xa transactions. If on, you need to
set `xa_data_source_class_name`. | +| generate_sink_sql | Boolean | No | false | Generate sql statements based on the database table you want to write to. | +| xa_data_source_class_name | String | No | - | The xa data source class name of the database Driver, for example, Oracle is `oracle.jdbc.xa.client.OracleXADataSource`, and
please refer to appendix for other data sources | +| max_commit_attempts | Int | No | 3 | The number of retries for transaction commit failures | +| transaction_timeout_sec | Int | No | -1 | The timeout after the transaction is opened, the default is -1 (never timeout). Note that setting the timeout may affect
exactly-once semantics | +| auto_commit | Boolean | No | true | Automatic transaction commit is enabled by default | +| properties | Map | No | - | Additional connection configuration parameters, when properties and URL have the same parameters, the priority is determined by the
specific implementation of the driver. For example, in MySQL, properties take precedence over the URL. | +| common-options | | no | - | Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details | ### Tips diff --git a/docs/en/connector-v2/sink/PostgreSql.md b/docs/en/connector-v2/sink/PostgreSql.md index bcc5616f5ea..79f595da329 100644 --- a/docs/en/connector-v2/sink/PostgreSql.md +++ b/docs/en/connector-v2/sink/PostgreSql.md @@ -61,28 +61,29 @@ semantics (using XA transaction guarantee). ## Options -| Name | Type | Required | Default | Description | -|-------------------------------------------|---------|----------|---------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| url | String | Yes | - | The URL of the JDBC connection. Refer to a case: jdbc:postgresql://localhost:5432/test
if you would use json or jsonb type insert please add jdbc url stringtype=unspecified option | -| driver | String | Yes | - | The jdbc class name used to connect to the remote data source,
if you use PostgreSQL the value is `org.postgresql.Driver`. | -| user | String | No | - | Connection instance user name | -| password | String | No | - | Connection instance password | -| query | String | No | - | Use this sql write upstream input datas to database. e.g `INSERT ...`,`query` have the higher priority | -| database | String | No | - | Use this `database` and `table-name` auto-generate sql and receive upstream input datas write to database.
This option is mutually exclusive with `query` and has a higher priority. | -| table | String | No | - | Use database and this table-name auto-generate sql and receive upstream input datas write to database.
This option is mutually exclusive with `query` and has a higher priority. | -| primary_keys | Array | No | - | This option is used to support operations such as `insert`, `delete`, and `update` when automatically generate sql. | -| support_upsert_by_query_primary_key_exist | Boolean | No | false | Choose to use INSERT sql, UPDATE sql to process update events(INSERT, UPDATE_AFTER) based on query primary key exists. This configuration is only used when database unsupport upsert syntax. **Note**: that this method has low performance | -| connection_check_timeout_sec | Int | No | 30 | The time in seconds to wait for the database operation used to validate the connection to complete. | -| max_retries | Int | No | 0 | The number of retries to submit failed (executeBatch) | -| batch_size | Int | No | 1000 | For batch writing, when the number of buffered records reaches the number of `batch_size` or the time reaches `checkpoint.interval`
, the data will be flushed into the database | -| is_exactly_once | Boolean | No | false | Whether to enable exactly-once semantics, which will use Xa transactions. If on, you need to
set `xa_data_source_class_name`. | -| generate_sink_sql | Boolean | No | false | Generate sql statements based on the database table you want to write to. | -| xa_data_source_class_name | String | No | - | The xa data source class name of the database Driver, for example, PostgreSQL is `org.postgresql.xa.PGXADataSource`, and
please refer to appendix for other data sources | -| max_commit_attempts | Int | No | 3 | The number of retries for transaction commit failures | -| transaction_timeout_sec | Int | No | -1 | The timeout after the transaction is opened, the default is -1 (never timeout). Note that setting the timeout may affect
exactly-once semantics | -| auto_commit | Boolean | No | true | Automatic transaction commit is enabled by default | -| field_ide | String | No | - | Identify whether the field needs to be converted when synchronizing from the source to the sink. `ORIGINAL` indicates no conversion is needed;`UPPERCASE` indicates conversion to uppercase;`LOWERCASE` indicates conversion to lowercase. | -| common-options | | no | - | Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details | +| Name | Type | Required | Default | Description | +|-------------------------------------------|---------|----------|---------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| url | String | Yes | - | The URL of the JDBC connection. Refer to a case: jdbc:postgresql://localhost:5432/test
if you would use json or jsonb type insert please add jdbc url stringtype=unspecified option | +| driver | String | Yes | - | The jdbc class name used to connect to the remote data source,
if you use PostgreSQL the value is `org.postgresql.Driver`. | +| user | String | No | - | Connection instance user name | +| password | String | No | - | Connection instance password | +| query | String | No | - | Use this sql write upstream input datas to database. e.g `INSERT ...`,`query` have the higher priority | +| database | String | No | - | Use this `database` and `table-name` auto-generate sql and receive upstream input datas write to database.
This option is mutually exclusive with `query` and has a higher priority. | +| table | String | No | - | Use database and this table-name auto-generate sql and receive upstream input datas write to database.
This option is mutually exclusive with `query` and has a higher priority. | +| primary_keys | Array | No | - | This option is used to support operations such as `insert`, `delete`, and `update` when automatically generate sql. | +| support_upsert_by_query_primary_key_exist | Boolean | No | false | Choose to use INSERT sql, UPDATE sql to process update events(INSERT, UPDATE_AFTER) based on query primary key exists. This configuration is only used when database unsupport upsert syntax. **Note**: that this method has low performance | +| connection_check_timeout_sec | Int | No | 30 | The time in seconds to wait for the database operation used to validate the connection to complete. | +| max_retries | Int | No | 0 | The number of retries to submit failed (executeBatch) | +| batch_size | Int | No | 1000 | For batch writing, when the number of buffered records reaches the number of `batch_size` or the time reaches `checkpoint.interval`
, the data will be flushed into the database | +| is_exactly_once | Boolean | No | false | Whether to enable exactly-once semantics, which will use Xa transactions. If on, you need to
set `xa_data_source_class_name`. | +| generate_sink_sql | Boolean | No | false | Generate sql statements based on the database table you want to write to. | +| xa_data_source_class_name | String | No | - | The xa data source class name of the database Driver, for example, PostgreSQL is `org.postgresql.xa.PGXADataSource`, and
please refer to appendix for other data sources | +| max_commit_attempts | Int | No | 3 | The number of retries for transaction commit failures | +| transaction_timeout_sec | Int | No | -1 | The timeout after the transaction is opened, the default is -1 (never timeout). Note that setting the timeout may affect
exactly-once semantics | +| auto_commit | Boolean | No | true | Automatic transaction commit is enabled by default | +| field_ide | String | No | - | Identify whether the field needs to be converted when synchronizing from the source to the sink. `ORIGINAL` indicates no conversion is needed;`UPPERCASE` indicates conversion to uppercase;`LOWERCASE` indicates conversion to lowercase. | +| properties | Map | No | - | Additional connection configuration parameters, when properties and URL have the same parameters, the priority is determined by the
specific implementation of the driver. For example, in MySQL, properties take precedence over the URL. | +| common-options | | no | - | Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details | ### Tips diff --git a/docs/en/connector-v2/sink/Snowflake.md b/docs/en/connector-v2/sink/Snowflake.md index 1dfff5e09c7..2ae6b0fac78 100644 --- a/docs/en/connector-v2/sink/Snowflake.md +++ b/docs/en/connector-v2/sink/Snowflake.md @@ -1,14 +1,14 @@ # Snowflake > JDBC Snowflake Sink Connector -> -> ## Support those engines -> + +## Support Those Engines + > Spark
> Flink
> SeaTunnel Zeta
-> - ## Key features + +## Key features - [ ] [exactly-once](../../concept/connector-v2-features.md) - [x] [cdc](../../concept/connector-v2-features.md) @@ -27,8 +27,8 @@ Write data through jdbc. Support Batch mode and Streaming mode, support concurre > Please download the support list corresponding to 'Maven' and copy it to the '$SEATNUNNEL_HOME/plugins/jdbc/lib/' working directory
> For example Snowflake datasource: cp snowflake-connector-java-xxx.jar $SEATNUNNEL_HOME/plugins/jdbc/lib/ -> - ## Data Type Mapping + +## Data Type Mapping | Snowflake Data type | SeaTunnel Data type | |-----------------------------------------------------------------------------|---------------------| @@ -48,24 +48,25 @@ Write data through jdbc. Support Batch mode and Streaming mode, support concurre ## Options -| name | type | required | default value | description | -|-------------------------------------------|---------|----------|---------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| url | String | Yes | - | The URL of the JDBC connection. Refer to a case: jdbc:snowflake://.snowflakecomputing.com | -| driver | String | Yes | - | The jdbc class name used to connect to the remote data source,
if you use Snowflake the value is `net.snowflake.client.jdbc.SnowflakeDriver`. | -| user | String | No | - | Connection instance user name | -| password | String | No | - | Connection instance password | -| query | String | No | - | Use this sql write upstream input datas to database. e.g `INSERT ...`,`query` have the higher priority | -| database | String | No | - | Use this `database` and `table-name` auto-generate sql and receive upstream input datas write to database.
This option is mutually exclusive with `query` and has a higher priority. | -| table | String | No | - | Use database and this table-name auto-generate sql and receive upstream input datas write to database.
This option is mutually exclusive with `query` and has a higher priority. | -| primary_keys | Array | No | - | This option is used to support operations such as `insert`, `delete`, and `update` when automatically generate sql. | -| support_upsert_by_query_primary_key_exist | Boolean | No | false | Choose to use INSERT sql, UPDATE sql to process update events(INSERT, UPDATE_AFTER) based on query primary key exists. This configuration is only used when database unsupport upsert syntax. **Note**: that this method has low performance | -| connection_check_timeout_sec | Int | No | 30 | The time in seconds to wait for the database operation used to validate the connection to complete. | -| max_retries | Int | No | 0 | The number of retries to submit failed (executeBatch) | -| batch_size | Int | No | 1000 | For batch writing, when the number of buffered records reaches the number of `batch_size` or the time reaches `checkpoint.interval`
, the data will be flushed into the database | -| max_commit_attempts | Int | No | 3 | The number of retries for transaction commit failures | -| transaction_timeout_sec | Int | No | -1 | The timeout after the transaction is opened, the default is -1 (never timeout). Note that setting the timeout may affect
exactly-once semantics | -| auto_commit | Boolean | No | true | Automatic transaction commit is enabled by default | -| common-options | | no | - | Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details | +| name | type | required | default value | description | +|-------------------------------------------|---------|----------|---------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| url | String | Yes | - | The URL of the JDBC connection. Refer to a case: jdbc:snowflake://.snowflakecomputing.com | +| driver | String | Yes | - | The jdbc class name used to connect to the remote data source,
if you use Snowflake the value is `net.snowflake.client.jdbc.SnowflakeDriver`. | +| user | String | No | - | Connection instance user name | +| password | String | No | - | Connection instance password | +| query | String | No | - | Use this sql write upstream input datas to database. e.g `INSERT ...`,`query` have the higher priority | +| database | String | No | - | Use this `database` and `table-name` auto-generate sql and receive upstream input datas write to database.
This option is mutually exclusive with `query` and has a higher priority. | +| table | String | No | - | Use database and this table-name auto-generate sql and receive upstream input datas write to database.
This option is mutually exclusive with `query` and has a higher priority. | +| primary_keys | Array | No | - | This option is used to support operations such as `insert`, `delete`, and `update` when automatically generate sql. | +| support_upsert_by_query_primary_key_exist | Boolean | No | false | Choose to use INSERT sql, UPDATE sql to process update events(INSERT, UPDATE_AFTER) based on query primary key exists. This configuration is only used when database unsupport upsert syntax. **Note**: that this method has low performance | +| connection_check_timeout_sec | Int | No | 30 | The time in seconds to wait for the database operation used to validate the connection to complete. | +| max_retries | Int | No | 0 | The number of retries to submit failed (executeBatch) | +| batch_size | Int | No | 1000 | For batch writing, when the number of buffered records reaches the number of `batch_size` or the time reaches `checkpoint.interval`
, the data will be flushed into the database | +| max_commit_attempts | Int | No | 3 | The number of retries for transaction commit failures | +| transaction_timeout_sec | Int | No | -1 | The timeout after the transaction is opened, the default is -1 (never timeout). Note that setting the timeout may affect
exactly-once semantics | +| auto_commit | Boolean | No | true | Automatic transaction commit is enabled by default | +| properties | Map | No | - | Additional connection configuration parameters, when properties and URL have the same parameters, the priority is determined by the
specific implementation of the driver. For example, in MySQL, properties take precedence over the URL. | +| common-options | | no | - | Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details | ## tips @@ -76,68 +77,66 @@ Write data through jdbc. Support Batch mode and Streaming mode, support concurre ### simple: > This example defines a SeaTunnel synchronization task that automatically generates data through FakeSource and sends it to JDBC Sink. FakeSource generates a total of 16 rows of data (row.num=16), with each row having two fields, name (string type) and age (int type). The final target table is test_table will also be 16 rows of data in the table. Before run this job, you need create database test and table test_table in your snowflake database. And if you have not yet installed and deployed SeaTunnel, you need to follow the instructions in [Install SeaTunnel](../../start-v2/locally/deployment.md) to install and deploy SeaTunnel. And then follow the instructions in [Quick Start With SeaTunnel Engine](../../start-v2/locally/quick-start-seatunnel-engine.md) to run this job. 
-> -> ``` -> # Defining the runtime environment -> env { -> # You can set flink configuration here -> execution.parallelism = 1 -> job.mode = "BATCH" -> } -> source { -> # This is a example source plugin **only for test and demonstrate the feature source plugin** -> FakeSource { -> parallelism = 1 -> result_table_name = "fake" -> row.num = 16 -> schema = { -> fields { -> name = "string" -> age = "int" -> } -> } -> } -> # If you would like to get more information about how to configure seatunnel and see full list of source plugins, -> # please go to https://seatunnel.apache.org/docs/category/source-v2 -> } -> transform { -> # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, -> # please go to https://seatunnel.apache.org/docs/category/transform-v2 -> } -> sink { -> jdbc { -> url = "jdbc:snowflake://.snowflakecomputing.com" -> driver = "net.snowflake.client.jdbc.SnowflakeDriver" -> user = "root" -> password = "123456" -> query = "insert into test_table(name,age) values(?,?)" -> } -> # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, -> # please go to https://seatunnel.apache.org/docs/category/sink-v2 -> } -> ``` + +``` +# Defining the runtime environment +env { +# You can set flink configuration here +execution.parallelism = 1 +job.mode = "BATCH" +} +source { +# This is a example source plugin **only for test and demonstrate the feature source plugin** +FakeSource { +parallelism = 1 +result_table_name = "fake" +row.num = 16 +schema = { +fields { +name = "string" +age = "int" +} +} +} +# If you would like to get more information about how to configure seatunnel and see full list of source plugins, +# please go to https://seatunnel.apache.org/docs/category/source-v2 +} +transform { +# If you would like to get more information about how to configure seatunnel and see full list of transform plugins, +# please go to 
https://seatunnel.apache.org/docs/category/transform-v2 +} +sink { +jdbc { +url = "jdbc:snowflake://.snowflakecomputing.com" +driver = "net.snowflake.client.jdbc.SnowflakeDriver" +user = "root" +password = "123456" +query = "insert into test_table(name,age) values(?,?)" +} +# If you would like to get more information about how to configure seatunnel and see full list of sink plugins, +# please go to https://seatunnel.apache.org/docs/category/sink-v2 +} +``` ### CDC(Change data capture) event > CDC change data is also supported by us In this case, you need config database, table and primary_keys. -> -> ``` -> jdbc { -> url = "jdbc:snowflake://.snowflakecomputing.com" -> driver = "net.snowflake.client.jdbc.SnowflakeDriver" -> user = "root" -> password = "123456" -> -> ``` - - # You need to configure both database and table - database = test - table = sink_table - primary_keys = ["id","name"] - -} ``` - +sink { + jdbc { + url = "jdbc:snowflake://.snowflakecomputing.com" + driver = "net.snowflake.client.jdbc.SnowflakeDriver" + user = "root" + password = "123456" + generate_sink_sql = true + + + # You need to configure both database and table + database = test + table = sink_table + primary_keys = ["id","name"] + } +} ``` diff --git a/docs/en/connector-v2/sink/Vertica.md b/docs/en/connector-v2/sink/Vertica.md index 9a624407682..c05ac8f6ee5 100644 --- a/docs/en/connector-v2/sink/Vertica.md +++ b/docs/en/connector-v2/sink/Vertica.md @@ -54,27 +54,28 @@ semantics (using XA transaction guarantee). ## Sink Options -| Name | Type | Required | Default | Description | -|-------------------------------------------|---------|----------|---------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| url | String | Yes | - | The URL of the JDBC connection. 
Refer to a case: jdbc:vertica://localhost:5433/vertica | -| driver | String | Yes | - | The jdbc class name used to connect to the remote data source,
if you use Vertical the value is `com.vertica.jdbc.Driver`. | -| user | String | No | - | Connection instance user name | -| password | String | No | - | Connection instance password | -| query | String | No | - | Use this sql write upstream input datas to database. e.g `INSERT ...`,`query` have the higher priority | -| database | String | No | - | Use this `database` and `table-name` auto-generate sql and receive upstream input datas write to database.
This option is mutually exclusive with `query` and has a higher priority. | -| table | String | No | - | Use database and this table-name auto-generate sql and receive upstream input datas write to database.
This option is mutually exclusive with `query` and has a higher priority. | -| primary_keys | Array | No | - | This option is used to support operations such as `insert`, `delete`, and `update` when automatically generate sql. | -| support_upsert_by_query_primary_key_exist | Boolean | No | false | Choose to use INSERT sql, UPDATE sql to process update events(INSERT, UPDATE_AFTER) based on query primary key exists. This configuration is only used when database unsupport upsert syntax. **Note**: that this method has low performance | -| connection_check_timeout_sec | Int | No | 30 | The time in seconds to wait for the database operation used to validate the connection to complete. | -| max_retries | Int | No | 0 | The number of retries to submit failed (executeBatch) | -| batch_size | Int | No | 1000 | For batch writing, when the number of buffered records reaches the number of `batch_size` or the time reaches `checkpoint.interval`
, the data will be flushed into the database | -| is_exactly_once | Boolean | No | false | Whether to enable exactly-once semantics, which will use Xa transactions. If on, you need to
set `xa_data_source_class_name`. | -| generate_sink_sql | Boolean | No | false | Generate sql statements based on the database table you want to write to | -| xa_data_source_class_name | String | No | - | The xa data source class name of the database Driver, for example, vertical is `com.vertical.cj.jdbc.VerticalXADataSource`, and
please refer to appendix for other data sources | -| max_commit_attempts | Int | No | 3 | The number of retries for transaction commit failures | -| transaction_timeout_sec | Int | No | -1 | The timeout after the transaction is opened, the default is -1 (never timeout). Note that setting the timeout may affect
exactly-once semantics | -| auto_commit | Boolean | No | true | Automatic transaction commit is enabled by default | -| common-options | | no | - | Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details | +| Name | Type | Required | Default | Description | +|-------------------------------------------|---------|----------|---------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| url | String | Yes | - | The URL of the JDBC connection. Refer to a case: jdbc:vertica://localhost:5433/vertica | +| driver | String | Yes | - | The jdbc class name used to connect to the remote data source,
if you use Vertical the value is `com.vertica.jdbc.Driver`. | +| user | String | No | - | Connection instance user name | +| password | String | No | - | Connection instance password | +| query | String | No | - | Use this sql write upstream input datas to database. e.g `INSERT ...`,`query` have the higher priority | +| database | String | No | - | Use this `database` and `table-name` auto-generate sql and receive upstream input datas write to database.
This option is mutually exclusive with `query` and has a higher priority. | +| table | String | No | - | Use database and this table-name auto-generate sql and receive upstream input datas write to database.
This option is mutually exclusive with `query` and has a higher priority. | +| primary_keys | Array | No | - | This option is used to support operations such as `insert`, `delete`, and `update` when automatically generate sql. | +| support_upsert_by_query_primary_key_exist | Boolean | No | false | Choose to use INSERT sql, UPDATE sql to process update events(INSERT, UPDATE_AFTER) based on query primary key exists. This configuration is only used when database unsupport upsert syntax. **Note**: that this method has low performance | +| connection_check_timeout_sec | Int | No | 30 | The time in seconds to wait for the database operation used to validate the connection to complete. | +| max_retries | Int | No | 0 | The number of retries to submit failed (executeBatch) | +| batch_size | Int | No | 1000 | For batch writing, when the number of buffered records reaches the number of `batch_size` or the time reaches `checkpoint.interval`
, the data will be flushed into the database | +| is_exactly_once | Boolean | No | false | Whether to enable exactly-once semantics, which will use Xa transactions. If on, you need to
set `xa_data_source_class_name`. | +| generate_sink_sql | Boolean | No | false | Generate sql statements based on the database table you want to write to | +| xa_data_source_class_name | String | No | - | The xa data source class name of the database Driver, for example, vertical is `com.vertical.cj.jdbc.VerticalXADataSource`, and
please refer to appendix for other data sources | +| max_commit_attempts | Int | No | 3 | The number of retries for transaction commit failures | +| transaction_timeout_sec | Int | No | -1 | The timeout after the transaction is opened, the default is -1 (never timeout). Note that setting the timeout may affect
exactly-once semantics | +| auto_commit | Boolean | No | true | Automatic transaction commit is enabled by default | +| properties | Map | No | - | Additional connection configuration parameters, when properties and URL have the same parameters, the priority is determined by the
specific implementation of the driver. For example, in MySQL, properties take precedence over the URL. | +| common-options | | no | - | Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details | ### Tips diff --git a/docs/en/connector-v2/source/DB2.md b/docs/en/connector-v2/source/DB2.md index c9eb6a578b6..c2660535760 100644 --- a/docs/en/connector-v2/source/DB2.md +++ b/docs/en/connector-v2/source/DB2.md @@ -67,6 +67,7 @@ Read external data source data through JDBC. | partition_upper_bound | BigDecimal | No | - | The partition_column max value for scan, if not set SeaTunnel will query database get max value. | | partition_num | Int | No | job parallelism | The number of partition count, only support positive integer. default value is job parallelism | | fetch_size | Int | No | 0 | For queries that return a large number of objects,you can configure
the row fetch size used in the query toimprove performance by
reducing the number database hits required to satisfy the selection criteria.
Zero means use jdbc default value. | +| properties | Map | No | - | Additional connection configuration parameters, when properties and URL have the same parameters, the priority is determined by the
specific implementation of the driver. For example, in MySQL, properties take precedence over the URL. | | common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details | ### Tips diff --git a/docs/en/connector-v2/source/Jdbc.md b/docs/en/connector-v2/source/Jdbc.md index b86a7b33854..442684351d3 100644 --- a/docs/en/connector-v2/source/Jdbc.md +++ b/docs/en/connector-v2/source/Jdbc.md @@ -42,6 +42,7 @@ supports query SQL and can achieve projection effect. | partition_lower_bound | Long | No | - | | partition_num | Int | No | job parallelism | | fetch_size | Int | No | 0 | +| properties | Map | No | - | | common-options | | No | - | ### driver [string] @@ -93,6 +94,10 @@ The number of partition count, only support positive integer. default value is j For queries that return a large number of objects, you can configure the row fetch size used in the query to improve performance by reducing the number database hits required to satisfy the selection criteria. Zero means use jdbc default value. +### properties + +Additional connection configuration parameters,when properties and URL have the same parameters, the priority is determined by the
specific implementation of the driver. For example, in MySQL, properties take precedence over the URL. + ### common options Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details. diff --git a/docs/en/connector-v2/source/Mysql.md b/docs/en/connector-v2/source/Mysql.md index bdac5c0aec6..fc2295e7890 100644 --- a/docs/en/connector-v2/source/Mysql.md +++ b/docs/en/connector-v2/source/Mysql.md @@ -73,6 +73,7 @@ Read external data source data through JDBC. | partition_upper_bound | BigDecimal | No | - | The partition_column max value for scan, if not set SeaTunnel will query database get max value. | | partition_num | Int | No | job parallelism | The number of partition count, only support positive integer. default value is job parallelism | | fetch_size | Int | No | 0 | For queries that return a large number of objects,you can configure
the row fetch size used in the query toimprove performance by
reducing the number database hits required to satisfy the selection criteria.
Zero means use jdbc default value. | +| properties | Map | No | - | Additional connection configuration parameters, when properties and URL have the same parameters, the priority is determined by the
specific implementation of the driver. For example, in MySQL, properties take precedence over the URL. | | common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details | ### Tips @@ -135,6 +136,9 @@ source { partition_column = "id" # Number of fragments partition_num = 10 + properties { + useSSL=false + } } } sink { @@ -162,6 +166,9 @@ source { # Read end boundary partition_upper_bound = 500 partition_num = 10 + properties { + useSSL=false + } } } ``` diff --git a/docs/en/connector-v2/source/OceanBase.md b/docs/en/connector-v2/source/OceanBase.md index 434e25284dd..5dbb633c109 100644 --- a/docs/en/connector-v2/source/OceanBase.md +++ b/docs/en/connector-v2/source/OceanBase.md @@ -85,6 +85,7 @@ Read external data source data through JDBC. | partition_upper_bound | BigDecimal | No | - | The partition_column max value for scan, if not set SeaTunnel will query database get max value. | | partition_num | Int | No | job parallelism | The number of partition count, only support positive integer. Default value is job parallelism. | | fetch_size | Int | No | 0 | For queries that return a large number of objects, you can configure
the row fetch size used in the query to improve performance by
reducing the number database hits required to satisfy the selection criteria.
Zero means use jdbc default value. | +| properties | Map | No | - | Additional connection configuration parameters, when properties and URL have the same parameters, the priority is determined by the
specific implementation of the driver. For example, in MySQL, properties take precedence over the URL. | | common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details | ### Tips diff --git a/docs/en/connector-v2/source/Oracle.md b/docs/en/connector-v2/source/Oracle.md index f191cda9d99..aefd8adfbba 100644 --- a/docs/en/connector-v2/source/Oracle.md +++ b/docs/en/connector-v2/source/Oracle.md @@ -67,6 +67,7 @@ Read external data source data through JDBC. | partition_upper_bound | BigDecimal | No | - | The partition_column max value for scan, if not set SeaTunnel will query database get max value. | | partition_num | Int | No | job parallelism | The number of partition count, only support positive integer. default value is job parallelism | | fetch_size | Int | No | 0 | For queries that return a large number of objects,you can configure
the row fetch size used in the query toimprove performance by
reducing the number database hits required to satisfy the selection criteria.
Zero means use jdbc default value. | +| properties | Map | No | - | Additional connection configuration parameters, when properties and URL have the same parameters, the priority is determined by the
specific implementation of the driver. For example, in MySQL, properties take precedence over the URL. | | common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details | ### Tips @@ -128,6 +129,9 @@ source { partition_column = "ID" # Number of fragments partition_num = 10 + properties { + database.oracle.jdbc.timezoneAsRegion = "false" + } } } sink { diff --git a/docs/en/connector-v2/source/PostgreSQL.md b/docs/en/connector-v2/source/PostgreSQL.md index 63ddbc25ecf..cb7de77e6c0 100644 --- a/docs/en/connector-v2/source/PostgreSQL.md +++ b/docs/en/connector-v2/source/PostgreSQL.md @@ -76,6 +76,7 @@ Read external data source data through JDBC. | partition_upper_bound | BigDecimal | No | - | The partition_column max value for scan, if not set SeaTunnel will query database get max value. | | partition_num | Int | No | job parallelism | The number of partition count, only support positive integer. default value is job parallelism | | fetch_size | Int | No | 0 | For queries that return a large number of objects,you can configure
the row fetch size used in the query toimprove performance by
reducing the number database hits required to satisfy the selection criteria.
Zero means use jdbc default value. | +| properties | Map | No | - | Additional connection configuration parameters, when properties and URL have the same parameters, the priority is determined by the
specific implementation of the driver. For example, in MySQL, properties take precedence over the URL. | | common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details | ### Tips diff --git a/docs/en/connector-v2/source/Snowflake.md b/docs/en/connector-v2/source/Snowflake.md index a7835013d58..2ea4c764352 100644 --- a/docs/en/connector-v2/source/Snowflake.md +++ b/docs/en/connector-v2/source/Snowflake.md @@ -69,6 +69,7 @@ Read external data source data through JDBC. | partition_upper_bound | BigDecimal | No | - | The partition_column max value for scan, if not set SeaTunnel will query database get max value. | | partition_num | Int | No | job parallelism | The number of partition count, only support positive integer. default value is job parallelism | | fetch_size | Int | No | 0 | For queries that return a large number of objects,you can configure
the row fetch size used in the query toimprove performance by
reducing the number database hits required to satisfy the selection criteria.
Zero means use jdbc default value. | +| properties | Map | No | - | Additional connection configuration parameters, when properties and URL have the same parameters, the priority is determined by the
specific implementation of the driver. For example, in MySQL, properties take precedence over the URL. | | common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details | ## tips diff --git a/docs/en/connector-v2/source/Vertica.md b/docs/en/connector-v2/source/Vertica.md index df387ac30bf..b4c5d4e05b4 100644 --- a/docs/en/connector-v2/source/Vertica.md +++ b/docs/en/connector-v2/source/Vertica.md @@ -69,6 +69,7 @@ Read external data source data through JDBC. | partition_upper_bound | BigDecimal | No | - | The partition_column max value for scan, if not set SeaTunnel will query database get max value. | | partition_num | Int | No | job parallelism | The number of partition count, only support positive integer. default value is job parallelism | | fetch_size | Int | No | 0 | For queries that return a large number of objects,you can configure
the row fetch size used in the query toimprove performance by
reducing the number database hits required to satisfy the selection criteria.
Zero means use jdbc default value. | +| properties | Map | No | - | Additional connection configuration parameters, when properties and URL have the same parameters, the priority is determined by the
specific implementation of the driver. For example, in MySQL, properties take precedence over the URL. | | common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details | ### Tips diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java index 555963af2cf..eeeff227b2f 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java @@ -20,6 +20,8 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig; import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; import java.util.Optional; public class JdbcConnectionConfig implements Serializable { @@ -45,6 +47,8 @@ public class JdbcConnectionConfig implements Serializable { public int transactionTimeoutSec = JdbcOptions.TRANSACTION_TIMEOUT_SEC.defaultValue(); + private Map properties; + public static JdbcConnectionConfig of(ReadonlyConfig config) { JdbcConnectionConfig.Builder builder = JdbcConnectionConfig.builder(); builder.url(config.get(JdbcOptions.URL)); @@ -63,6 +67,7 @@ public static JdbcConnectionConfig of(ReadonlyConfig config) { config.getOptional(JdbcOptions.USER).ifPresent(builder::username); config.getOptional(JdbcOptions.PASSWORD).ifPresent(builder::password); + config.getOptional(JdbcOptions.PROPERTIES).ifPresent(builder::properties); return builder.build(); } @@ -114,6 +119,10 @@ public Optional getTransactionTimeoutSec() { return transactionTimeoutSec < 0 ? 
Optional.empty() : Optional.of(transactionTimeoutSec); } + public Map getProperties() { + return properties; + } + public static JdbcConnectionConfig.Builder builder() { return new JdbcConnectionConfig.Builder(); } @@ -133,6 +142,7 @@ public static final class Builder { private String xaDataSourceClassName; private int maxCommitAttempts = JdbcOptions.MAX_COMMIT_ATTEMPTS.defaultValue(); private int transactionTimeoutSec = JdbcOptions.TRANSACTION_TIMEOUT_SEC.defaultValue(); + private Map properties; private Builder() {} @@ -201,6 +211,11 @@ public Builder transactionTimeoutSec(int transactionTimeoutSec) { return this; } + public Builder properties(Map properties) { + this.properties = properties; + return this; + } + public JdbcConnectionConfig build() { JdbcConnectionConfig jdbcConnectionConfig = new JdbcConnectionConfig(); jdbcConnectionConfig.batchSize = this.batchSize; @@ -215,6 +230,8 @@ public JdbcConnectionConfig build() { jdbcConnectionConfig.transactionTimeoutSec = this.transactionTimeoutSec; jdbcConnectionConfig.maxCommitAttempts = this.maxCommitAttempts; jdbcConnectionConfig.xaDataSourceClassName = this.xaDataSourceClassName; + jdbcConnectionConfig.properties = + this.properties == null ? 
new HashMap<>() : this.properties; return jdbcConnectionConfig; } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java index b01fc872f31..91ba5e2030f 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java @@ -23,6 +23,7 @@ import java.math.BigDecimal; import java.util.List; +import java.util.Map; public interface JdbcOptions { @@ -161,4 +162,10 @@ public interface JdbcOptions { .enumType(FieldIdeEnum.class) .noDefaultValue() .withDescription("Whether case conversion is required"); + + Option> PROPERTIES = + Options.key("properties") + .mapType() + .noDefaultValue() + .withDescription("additional connection configuration parameters"); } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/DataSourceUtils.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/DataSourceUtils.java index 50f7d55cfaf..92b07537d99 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/DataSourceUtils.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/DataSourceUtils.java @@ -59,7 +59,7 @@ private static Map buildDatabaseAccessConfig( if (jdbcConnectionConfig.getPassword().isPresent()) { accessConfig.put("password", jdbcConnectionConfig.getPassword().get()); } - + accessConfig.putAll(jdbcConnectionConfig.getProperties()); return accessConfig; } diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/SimpleJdbcConnectionProvider.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/SimpleJdbcConnectionProvider.java index 815d51a3f08..dfc3c161a80 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/SimpleJdbcConnectionProvider.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/SimpleJdbcConnectionProvider.java @@ -108,6 +108,7 @@ public Connection getOrEstablishConnection() throws SQLException, ClassNotFoundE if (jdbcConfig.getPassword().isPresent()) { info.setProperty("password", jdbcConfig.getPassword().get()); } + info.putAll(jdbcConfig.getProperties()); connection = driver.connect(jdbcConfig.getUrl(), info); if (connection == null) { // Throw same exception as DriverManager.getConnection when no driver found to match diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java index e0cf5252a60..f76cd0e5641 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java @@ -31,6 +31,8 @@ import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; @@ -240,4 +242,18 @@ default String getFieldIde(String identifier, String fieldIde) { 
return identifier; } } + + default Map defaultParameter() { + return new HashMap<>(); + } + + default void connectionUrlParse( + String url, Map info, Map defaultParameter) { + defaultParameter.forEach( + (key, value) -> { + if (!url.contains(key) && !info.containsKey(key)) { + info.put(key, value); + } + }); + } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java index 1ae69a6131f..fb3dd79003b 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java @@ -28,6 +28,8 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; @@ -98,4 +100,11 @@ public PreparedStatement creatPreparedStatement( public String extractTableName(TablePath tablePath) { return tablePath.getTableName(); } + + @Override + public Map defaultParameter() { + HashMap map = new HashMap<>(); + map.put("rewriteBatchedStatements", "true"); + return map; + } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java index bbb776e486a..e2eb971105e 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java +++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java @@ -114,6 +114,10 @@ public void prepare(Config pluginConfig) throws PrepareFailException { config.get(JdbcOptions.FIELD_IDE) == null ? null : config.get(JdbcOptions.FIELD_IDE).getValue()); + this.dialect.connectionUrlParse( + jdbcSinkConfig.getJdbcConnectionConfig().getUrl(), + jdbcSinkConfig.getJdbcConnectionConfig().getProperties(), + this.dialect.defaultParameter()); this.dataSaveMode = DataSaveMode.KEEP_SCHEMA_AND_DATA; } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java index 086a199bfeb..6c633705924 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java @@ -151,6 +151,10 @@ public TableSink createSink(TableSinkFactoryContext context) { sinkConfig.getJdbcConnectionConfig().getUrl(), sinkConfig.getJdbcConnectionConfig().getCompatibleMode(), fieldIdeEnum == null ? 
null : fieldIdeEnum.getValue()); + dialect.connectionUrlParse( + sinkConfig.getJdbcConnectionConfig().getUrl(), + sinkConfig.getJdbcConnectionConfig().getProperties(), + dialect.defaultParameter()); CatalogTable finalCatalogTable = catalogTable; return () -> new JdbcSink( diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java index 1bf1b332fa9..ba11937f937 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java @@ -97,13 +97,17 @@ public void prepare(Config pluginConfig) throws PrepareFailException { ReadonlyConfig config = ReadonlyConfig.fromConfig(pluginConfig); ConfigValidator.of(config).validate(new JdbcSourceFactory().optionRule()); this.jdbcSourceConfig = JdbcSourceConfig.of(config); - this.jdbcConnectionProvider = - new SimpleJdbcConnectionProvider(jdbcSourceConfig.getJdbcConnectionConfig()); - this.query = jdbcSourceConfig.getQuery(); this.jdbcDialect = JdbcDialectLoader.load( jdbcSourceConfig.getJdbcConnectionConfig().getUrl(), jdbcSourceConfig.getJdbcConnectionConfig().getCompatibleMode()); + this.jdbcDialect.connectionUrlParse( + jdbcSourceConfig.getJdbcConnectionConfig().getUrl(), + jdbcSourceConfig.getJdbcConnectionConfig().getProperties(), + this.jdbcDialect.defaultParameter()); + this.jdbcConnectionProvider = + new SimpleJdbcConnectionProvider(jdbcSourceConfig.getJdbcConnectionConfig()); + this.query = jdbcSourceConfig.getQuery(); try (Connection connection = jdbcConnectionProvider.getOrEstablishConnection()) { this.typeInfo = initTableField(connection); this.partitionParameter = diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java index 264df5eafa0..4fc5f6b1893 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java @@ -67,6 +67,7 @@ import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.PARTITION_NUM; import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.PARTITION_UPPER_BOUND; import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.PASSWORD; +import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.PROPERTIES; import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.QUERY; import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.URL; import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.USER; @@ -88,6 +89,10 @@ TableSource createSource(TableSourceFactoryContext context) { JdbcDialectLoader.load( config.getJdbcConnectionConfig().getUrl(), config.getJdbcConnectionConfig().getCompatibleMode()); + dialect.connectionUrlParse( + config.getJdbcConnectionConfig().getUrl(), + config.getJdbcConnectionConfig().getProperties(), + dialect.defaultParameter()); JdbcConnectionProvider connectionProvider = new SimpleJdbcConnectionProvider(config.getJdbcConnectionConfig()); @@ -302,7 +307,8 @@ public OptionRule optionRule() { PARTITION_UPPER_BOUND, PARTITION_LOWER_BOUND, PARTITION_NUM, - COMPATIBLE_MODE) + COMPATIBLE_MODE, + PROPERTIES) .build(); } diff --git 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkTaskExecuteCommand.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkTaskExecuteCommand.java index 463c1dc6581..1f5f4242c57 100644 --- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkTaskExecuteCommand.java +++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkTaskExecuteCommand.java @@ -59,8 +59,7 @@ public void execute() throws CommandExecuteException { SparkExecution seaTunnelTaskExecution = new SparkExecution(config); seaTunnelTaskExecution.execute(); } catch (Exception e) { - log.error("Run SeaTunnel on spark failed.", e); - throw new CommandExecuteException(e.getMessage()); + throw new CommandExecuteException("Run SeaTunnel on spark failed", e); } } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java index b10aa0c2225..d8fcda85122 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java @@ -18,12 +18,34 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.CatalogTable; 
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext; +import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.utils.JdbcUrlUtil; +import org.apache.seatunnel.common.utils.ReflectionUtils; import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MySqlCatalog; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider; +import org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcSink; +import org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcSinkFactory; +import org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcSinkWriter; +import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSource; +import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceFactory; +import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceSplit; +import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSourceState; import org.apache.commons.lang3.tuple.Pair; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.MySQLContainer; import org.testcontainers.containers.output.Slf4jLogConsumer; @@ -32,9 +54,12 @@ import org.testcontainers.utility.DockerLoggerFactory; import com.google.common.collect.Lists; +import com.mysql.cj.jdbc.ConnectionImpl; +import java.io.IOException; import java.math.BigDecimal; import java.sql.Date; +import 
java.sql.SQLException; import java.sql.Timestamp; import java.time.LocalDate; import java.time.LocalDateTime; @@ -42,6 +67,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; public class JdbcMysqlIT extends AbstractJdbcIT { @@ -56,6 +82,9 @@ public class JdbcMysqlIT extends AbstractJdbcIT { private static final String MYSQL_PASSWORD = "Abc!@#135_seatunnel"; private static final int MYSQL_PORT = 3306; private static final String MYSQL_URL = "jdbc:mysql://" + HOST + ":%s/%s?useSSL=false"; + private static final String URL = "jdbc:mysql://" + HOST + ":3306/seatunnel"; + + private static final String SQL = "select * from seatunnel.source"; private static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver"; @@ -299,4 +328,197 @@ protected void initCatalog() { jdbcCase.getJdbcUrl().replace(HOST, dbServer.getHost()))); catalog.open(); } + + private String getUrl() { + return URL.replace("HOST", dbServer.getHost()); + } + + @Test + public void parametersTest() throws SQLException, IOException, ClassNotFoundException { + defaultSinkParametersTest(); + defaultSourceParametersTest(); + } + + void defaultSinkParametersTest() throws IOException, SQLException, ClassNotFoundException { + TableSchema tableSchema = + TableSchema.builder() + .column( + PhysicalColumn.of( + "c_bigint", + BasicType.LONG_TYPE, + 22, + false, + null, + "c_bigint")) + .build(); + CatalogTable catalogTable = + CatalogTable.of( + TableIdentifier.of("test_catalog", "seatunnel", "source"), + tableSchema, + new HashMap<>(), + new ArrayList<>(), + "User table"); + + // case1 url not contains parameters and properties not contains parameters + Map map1 = getDefaultConfigMap(); + map1.put("url", getUrl()); + ReadonlyConfig config1 = ReadonlyConfig.fromMap(map1); + TableSinkFactoryContext context1 = + new TableSinkFactoryContext( + catalogTable, config1, Thread.currentThread().getContextClassLoader()); + JdbcSink jdbcSink1 = (JdbcSink) new 
JdbcSinkFactory().createSink(context1).createSink(); + Properties connectionProperties1 = getSinkProperties(jdbcSink1); + Assertions.assertEquals(connectionProperties1.get("rewriteBatchedStatements"), "true"); + + // case2 url contains parameters and properties not contains parameters + Map map2 = getDefaultConfigMap(); + map2.put("url", getUrl() + "?rewriteBatchedStatements=false"); + ReadonlyConfig config2 = ReadonlyConfig.fromMap(map2); + TableSinkFactoryContext context2 = + new TableSinkFactoryContext( + catalogTable, config2, Thread.currentThread().getContextClassLoader()); + JdbcSink jdbcSink2 = (JdbcSink) new JdbcSinkFactory().createSink(context2).createSink(); + Properties connectionProperties2 = getSinkProperties(jdbcSink2); + Assertions.assertEquals(connectionProperties2.get("rewriteBatchedStatements"), "false"); + + // case3 url not contains parameters and properties not contains parameters + Map map3 = getDefaultConfigMap(); + Map properties3 = new HashMap<>(); + properties3.put("rewriteBatchedStatements", "false"); + map3.put("properties", properties3); + map3.put("url", getUrl()); + ReadonlyConfig config3 = ReadonlyConfig.fromMap(map3); + TableSinkFactoryContext context3 = + new TableSinkFactoryContext( + catalogTable, config3, Thread.currentThread().getContextClassLoader()); + JdbcSink jdbcSink3 = (JdbcSink) new JdbcSinkFactory().createSink(context3).createSink(); + Properties connectionProperties3 = getSinkProperties(jdbcSink3); + Assertions.assertEquals(connectionProperties3.get("rewriteBatchedStatements"), "false"); + + // case4 url contains parameters and properties contains parameters + Map map4 = getDefaultConfigMap(); + Map properties4 = new HashMap<>(); + properties4.put("useSSL", "true"); + properties4.put("rewriteBatchedStatements", "false"); + map4.put("properties", properties4); + map4.put("url", getUrl() + "?useSSL=false&rewriteBatchedStatements=true"); + ReadonlyConfig config4 = ReadonlyConfig.fromMap(map4); + TableSinkFactoryContext 
context4 = + new TableSinkFactoryContext( + catalogTable, config4, Thread.currentThread().getContextClassLoader()); + JdbcSink jdbcSink4 = (JdbcSink) new JdbcSinkFactory().createSink(context4).createSink(); + Properties connectionProperties4 = getSinkProperties(jdbcSink4); + Assertions.assertEquals(connectionProperties4.get("useSSL"), "true"); + Assertions.assertEquals(connectionProperties4.get("rewriteBatchedStatements"), "false"); + } + + void defaultSourceParametersTest() throws IOException, SQLException, ClassNotFoundException { + // case1 url not contains parameters and properties not contains parameters + Map map1 = getDefaultConfigMap(); + map1.put("url", getUrl()); + map1.put("query", SQL); + ReadonlyConfig config1 = ReadonlyConfig.fromMap(map1); + TableSourceFactoryContext context1 = + new TableSourceFactoryContext( + config1, Thread.currentThread().getContextClassLoader()); + JdbcSource jdbcSource1 = + (JdbcSource) + new JdbcSourceFactory() + .createSource( + context1) + .createSource(); + Properties connectionProperties1 = getSourceProperties(jdbcSource1); + Assertions.assertEquals(connectionProperties1.get("rewriteBatchedStatements"), "true"); + + // case2 url contains parameters and properties not contains parameters + Map map2 = getDefaultConfigMap(); + map2.put("url", getUrl() + "?rewriteBatchedStatements=false"); + map2.put("query", SQL); + ReadonlyConfig config2 = ReadonlyConfig.fromMap(map2); + TableSourceFactoryContext context2 = + new TableSourceFactoryContext( + config2, Thread.currentThread().getContextClassLoader()); + JdbcSource jdbcSource2 = + (JdbcSource) + new JdbcSourceFactory() + .createSource( + context2) + .createSource(); + Properties connectionProperties2 = getSourceProperties(jdbcSource2); + Assertions.assertEquals(connectionProperties2.get("rewriteBatchedStatements"), "false"); + + // case3 url not contains parameters and properties not contains parameters + Map map3 = getDefaultConfigMap(); + Map properties3 = new HashMap<>(); + 
properties3.put("rewriteBatchedStatements", "false"); + map3.put("properties", properties3); + map3.put("url", getUrl()); + map3.put("query", SQL); + ReadonlyConfig config3 = ReadonlyConfig.fromMap(map3); + TableSourceFactoryContext context3 = + new TableSourceFactoryContext( + config3, Thread.currentThread().getContextClassLoader()); + JdbcSource jdbcSource3 = + (JdbcSource) + new JdbcSourceFactory() + .createSource( + context3) + .createSource(); + Properties connectionProperties3 = getSourceProperties(jdbcSource3); + Assertions.assertEquals(connectionProperties3.get("rewriteBatchedStatements"), "false"); + + // case4 url contains parameters and properties contains parameters + Map map4 = getDefaultConfigMap(); + Map properties4 = new HashMap<>(); + properties4.put("useSSL", "true"); + properties4.put("rewriteBatchedStatements", "false"); + map4.put("properties", properties4); + map4.put("url", getUrl() + "?useSSL=false&rewriteBatchedStatements=true"); + map4.put("query", SQL); + ReadonlyConfig config4 = ReadonlyConfig.fromMap(map4); + TableSourceFactoryContext context4 = + new TableSourceFactoryContext( + config4, Thread.currentThread().getContextClassLoader()); + JdbcSource jdbcSource4 = + (JdbcSource) + new JdbcSourceFactory() + .createSource( + context4) + .createSource(); + Properties connectionProperties4 = getSourceProperties(jdbcSource4); + Assertions.assertEquals(connectionProperties4.get("useSSL"), "true"); + Assertions.assertEquals(connectionProperties4.get("rewriteBatchedStatements"), "false"); + } + + @NotNull private Map getDefaultConfigMap() { + Map map = new HashMap<>(); + map.put("driver", "com.mysql.cj.jdbc.Driver"); + map.put("user", MYSQL_USERNAME); + map.put("password", MYSQL_PASSWORD); + return map; + } + + private Properties getSinkProperties(JdbcSink jdbcSink) + throws IOException, SQLException, ClassNotFoundException { + jdbcSink.setTypeInfo( + new SeaTunnelRowType( + new String[] {"id"}, new SeaTunnelDataType[] {BasicType.INT_TYPE})); + 
JdbcSinkWriter jdbcSinkWriter = (JdbcSinkWriter) jdbcSink.createWriter(null); + JdbcConnectionProvider connectionProvider = + (JdbcConnectionProvider) + ReflectionUtils.getField(jdbcSinkWriter, "connectionProvider").get(); + ConnectionImpl connection = (ConnectionImpl) connectionProvider.getOrEstablishConnection(); + Properties connectionProperties = connection.getProperties(); + return connectionProperties; + } + + private Properties getSourceProperties(JdbcSource jdbcSource) + throws IOException, SQLException, ClassNotFoundException { + JdbcConnectionProvider connectionProvider = + (JdbcConnectionProvider) + ReflectionUtils.getField(jdbcSource, "jdbcConnectionProvider").get(); + ConnectionImpl connection = (ConnectionImpl) connectionProvider.getOrEstablishConnection(); + Properties connectionProperties = connection.getProperties(); + return connectionProperties; + } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink.conf index b91ee9d3177..89309310bdf 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink.conf @@ -22,13 +22,17 @@ env { source { jdbc { - url = "jdbc:mysql://mysql-e2e:3306/seatunnel?useSSL=false" + url = "jdbc:mysql://mysql-e2e:3306/seatunnel" driver = "com.mysql.cj.jdbc.Driver" connection_check_timeout_sec = 100 user = "root" password = "Abc!@#135_seatunnel" query = "select * from source;" + properties { + useSSL=false + rewriteBatchedStatements=true + } } } @@ -37,7 +41,7 @@ transform { sink { jdbc { - url = "jdbc:mysql://mysql-e2e:3306/seatunnel?useSSL=false" + url = 
"jdbc:mysql://mysql-e2e:3306/seatunnel" driver = "com.mysql.cj.jdbc.Driver" user = "root" password = "Abc!@#135_seatunnel" @@ -49,5 +53,9 @@ sink { c_datetime, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary, c_binary, c_year, c_int_unsigned, c_integer_unsigned,c_bigint_30,c_decimal_unsigned_30,c_decimal_30) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);""" + properties { + useSSL=false + rewriteBatchedStatements=true + } } } \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_xa.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_xa.conf index bf0d3afab15..810f6c5076d 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_xa.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_xa.conf @@ -28,6 +28,10 @@ source { user = "root" password = "Abc!@#135_seatunnel" query = "select * from source" + properties { + useSSL=false + rewriteBatchedStatements=true + } } } @@ -56,5 +60,9 @@ sink { xa_data_source_class_name = "com.mysql.cj.jdbc.MysqlXADataSource" max_commit_attempts = 3 transaction_timeout_sec = 86400 + properties { + useSSL=false + rewriteBatchedStatements=true + } } } \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink.conf index a8b07cfcb99..10eb26f74d2 100644 --- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink.conf @@ -34,6 +34,9 @@ source { user = testUser password = testPassword query = "SELECT VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_3_SF_2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ FROM E2E_TABLE_SOURCE" + properties { + database.oracle.jdbc.timezoneAsRegion = "false" + } } # If you would like to get more information about how to configure seatunnel and see full list of source plugins, @@ -47,6 +50,9 @@ sink { user = testUser password = testPassword query = "INSERT INTO E2E_TABLE_SINK (VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_3_SF_2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ) VALUES(?,?,?,?,?,?,?,?,?,?,?,?)" + properties { + database.oracle.jdbc.timezoneAsRegion = "false" + } } # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,