From cf01a17cb6d0521e4121f27b4b1217f674dd37b8 Mon Sep 17 00:00:00 2001 From: kim-up Date: Fri, 14 Apr 2023 18:32:17 +0800 Subject: [PATCH 01/11] [Feature][Connector-V2] Add huawei cloud obs connector #4577 --- docs/en/connector-v2/sink/ObsFile.md | 260 ++++++++++++++++ docs/en/connector-v2/source/ObsFile.md | 290 ++++++++++++++++++ release-note.md | 1 + .../seatunnel/file/config/FileSystemType.java | 3 +- .../connector-file/connector-file-obs/pom.xml | 56 ++++ .../seatunnel/file/obs/config/ObsConf.java | 55 ++++ .../seatunnel/file/obs/config/ObsConfig.java | 39 +++ .../seatunnel/file/obs/sink/ObsFileSink.java | 63 ++++ .../file/obs/sink/ObsFileSinkFactory.java | 88 ++++++ .../file/obs/source/ObsFileSource.java | 119 +++++++ .../file/obs/source/ObsFileSourceFactory.java | 70 +++++ .../services/org.apache.hadoop.fs.FileSystem | 16 + .../file/obs/ObsFileFactoryTest.java | 33 ++ .../connector-file/pom.xml | 1 + 14 files changed, 1093 insertions(+), 1 deletion(-) create mode 100644 docs/en/connector-v2/sink/ObsFile.md create mode 100644 docs/en/connector-v2/source/ObsFile.md create mode 100644 seatunnel-connectors-v2/connector-file/connector-file-obs/pom.xml create mode 100644 seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/config/ObsConf.java create mode 100644 seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/config/ObsConfig.java create mode 100644 seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/sink/ObsFileSink.java create mode 100644 seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/sink/ObsFileSinkFactory.java create mode 100644 seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSource.java create mode 100644 seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSourceFactory.java create mode 100644 seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem create mode 100644 seatunnel-connectors-v2/connector-file/connector-file-obs/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/obs/ObsFileFactoryTest.java diff --git a/docs/en/connector-v2/sink/ObsFile.md b/docs/en/connector-v2/sink/ObsFile.md new file mode 100644 index 00000000000..3a9651a19f6 --- /dev/null +++ b/docs/en/connector-v2/sink/ObsFile.md @@ -0,0 +1,260 @@ +# ObsFile + +> Obs file sink connector + +## Description + +Output data to huawei cloud obs file system. + +:::tip + +If you use spark/flink, In order to use this connector, You must ensure your spark/flink cluster already integrated hadoop. The tested hadoop version is 2.x. + +If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you download and install SeaTunnel Engine. You can check the jar package under ${SEATUNNEL_HOME}/lib to confirm this. + +We made some trade-offs in order to support more file types, so we used the HDFS protocol for internal access to OBS and this connector need some hadoop dependencies. +It only supports hadoop version **2.9.X+**. 
+ +::: + +## Key features + +- [x] [exactly-once](../../concept/connector-v2-features.md) + +By default, we use 2PC commit to ensure `exactly-once` + +- [x] file format type + - [x] text + - [x] csv + - [x] parquet + - [x] orc + - [x] json + - [x] excel + +## Options + +| name | type | required | default value | remarks | +|----------------------------------|---------|----------|--------------------------------------------|-----------------------------------------------------------| +| path | string | yes | - | | +| bucket | string | yes | - | | +| access_key | string | yes | - | | +| access_secret | string | yes | - | | +| endpoint | string | yes | - | | +| custom_filename | boolean | no | false | Whether you need custom the filename | +| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true | +| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true | +| file_format_type | string | no | "csv" | | +| field_delimiter | string | no | '\001' | Only used when file_format is text | +| row_delimiter | string | no | "\n" | Only used when file_format is text | +| have_partition | boolean | no | false | Whether you need processing partitions. | +| partition_by | array | no | - | Only used then have_partition is true | +| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is true | +| is_partition_field_write_in_file | boolean | no | false | Only used then have_partition is true | +| sink_columns | array | no | | When this parameter is empty, all fields are sink columns | +| is_enable_transaction | boolean | no | true | | +| batch_size | int | no | 1000000 | | +| compress_codec | string | no | none | | +| common-options | object | no | - | | +| max_rows_in_memory | int | no | - | Only used when file_format is excel. | +| sheet_name | string | no | Sheet${Random number} | Only used when file_format is excel. | + +### path [string] + +The target dir path is required. + +### bucket [string] + +The bucket address of obs file system, for example: `obs://obs-bucket-name` + +### access_key [string] + +The access key of obs file system. + +### access_secret [string] + +The access secret of obs file system. + +### endpoint [string] + +The endpoint of obs file system. + +### custom_filename [boolean] + +Whether custom the filename + +### file_name_expression [string] + +Only used when `custom_filename` is `true` + +`file_name_expression` describes the file expression which will be created into the `path`. We can add the variable `${now}` or `${uuid}` in the `file_name_expression`, like `test_${uuid}_${now}`, +`${now}` represents the current time, and its format can be defined by specifying the option `filename_time_format`. + +Please note that, If `is_enable_transaction` is `true`, we will auto add `${transactionId}_` in the head of the file. + +### filename_time_format [string] + +Only used when `custom_filename` is `true` + +When the format in the `file_name_expression` parameter is `xxxx-${now}` , `filename_time_format` can specify the time format of the path, and the default value is `yyyy.MM.dd` . 
The commonly used time formats are listed as follows:

| Symbol | Description        |
|--------|--------------------|
| y      | Year               |
| M      | Month              |
| d      | Day of month       |
| H      | Hour in day (0-23) |
| m      | Minute in hour     |
| s      | Second in minute   |

### file_format_type [string]

The following file types are supported:

`text` `json` `csv` `orc` `parquet` `excel`

Please note that the final file name will end with the file format's suffix; for the `text` type the suffix is `txt`.

### field_delimiter [string]

The separator between columns in a row of data. Only needed by the `text` file format.

### row_delimiter [string]

The separator between rows in a file. Only needed by the `text` file format.

### have_partition [boolean]

Whether you need to process partitions.

### partition_by [array]

Only used when `have_partition` is `true`.

Partition data based on selected fields.

### partition_dir_expression [string]

Only used when `have_partition` is `true`.

If the `partition_by` is specified, we will generate the corresponding partition directory based on the partition information, and the final file will be placed in the partition directory.

Default `partition_dir_expression` is `${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/`. `k0` is the first partition field and `v0` is the value of the first partition field.

### is_partition_field_write_in_file [boolean]

Only used when `have_partition` is `true`.

If `is_partition_field_write_in_file` is `true`, the partition field and its value will be written into the data file.

For example, if you want to write a Hive data file, its value should be `false`.

### sink_columns [array]

Which columns need to be written to the file; the default value is all the columns obtained from the `Transform` or `Source`.
The order of the fields determines the order in which the file is actually written.

### is_enable_transaction [boolean]

If `is_enable_transaction` is true, we will ensure that data will not be lost or duplicated when it is written to the target directory.

Please note that if `is_enable_transaction` is `true`, we will automatically add a `${transactionId}_` prefix to the file name.

Only `true` is supported now.

### batch_size [int]

The maximum number of rows in a file. For SeaTunnel Engine, the number of lines in a file is determined jointly by `batch_size` and `checkpoint.interval`. If the value of `checkpoint.interval` is large enough, the sink writer will keep writing rows to a file until the number of rows in the file is larger than `batch_size`. If `checkpoint.interval` is small, the sink writer will create a new file when a new checkpoint triggers.

### compress_codec [string]

The compress codec of files. The supported codecs for each format are shown below:

- txt: `lzo` `none`
- json: `lzo` `none`
- csv: `lzo` `none`
- orc: `lzo` `snappy` `lz4` `zlib` `none`
- parquet: `lzo` `snappy` `lz4` `gzip` `brotli` `zstd` `none`

Tips: the excel type does not support any compression format.

### common options

Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details.

### max_rows_in_memory [int]

When the file format is Excel, the maximum number of data items that can be cached in memory.
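As a sketch of how the two excel-related options combine in a sink block (`sheet_name` is described in the next section; the bucket, keys, and endpoint below are placeholders):

```hocon

  ObsFile {
    path = "/seatunnel/excel"
    bucket = "obs://obs-bucket-name"
    access_key = "xxxxxxxxxxx"
    access_secret = "xxxxxxxxxxx"
    endpoint = "obs.xxxxxx.myhuaweicloud.com"
    file_format_type = "excel"
    max_rows_in_memory = 1024
    sheet_name = "Sheet1"
  }

```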
### sheet_name [string]

Specify the sheet name of the workbook.

## Example

For text file format with `have_partition` and `custom_filename` and `sink_columns`

```hocon

  ObsFile {
    path="/seatunnel/sink"
    bucket = "obs://obs-bucket-name"
    access_key = "xxxxxxxxxxx"
    access_secret = "xxxxxxxxxxx"
    endpoint = "obs.xxxxxx.myhuaweicloud.com"
    file_format_type = "text"
    field_delimiter = "\t"
    row_delimiter = "\n"
    have_partition = true
    partition_by = ["age"]
    partition_dir_expression = "${k0}=${v0}"
    is_partition_field_write_in_file = true
    custom_filename = true
    file_name_expression = "${transactionId}_${now}"
    filename_time_format = "yyyy.MM.dd"
    sink_columns = ["name","age"]
    is_enable_transaction = true
  }

```

For parquet file format with `have_partition` and `sink_columns`

```hocon

  ObsFile {
    path = "/seatunnel/sink"
    bucket = "obs://obs-bucket-name"
    access_key = "xxxxxxxxxxx"
    access_secret = "xxxxxxxxxxxxxxxxx"
    endpoint = "obs.xxxxxx.myhuaweicloud.com"
    have_partition = true
    partition_by = ["age"]
    partition_dir_expression = "${k0}=${v0}"
    is_partition_field_write_in_file = true
    file_format_type = "parquet"
    sink_columns = ["name","age"]
  }

```

For orc file format with simple config

```hocon

  ObsFile {
    path="/seatunnel/sink"
    bucket = "obs://obs-bucket-name"
    access_key = "xxxxxxxxxxx"
    access_secret = "xxxxxxxxxxx"
    endpoint = "obs.xxxxx.myhuaweicloud.com"
    file_format_type = "orc"
  }

```

## Changelog

### 2.3.1-SNAPSHOT 2023-04-14

- Add OBS Sink Connector

diff --git a/docs/en/connector-v2/source/ObsFile.md b/docs/en/connector-v2/source/ObsFile.md
new file mode 100644
index 00000000000..800189f8cf5
--- /dev/null
+++ b/docs/en/connector-v2/source/ObsFile.md
@@ -0,0 +1,290 @@
+# ObsFile
+
+> Obs file source connector
+
+## Description
+
+Read data from huawei cloud obs file system.
+
+:::tip
+
+If you use spark/flink, in order to use this connector, you must ensure your spark/flink cluster has already integrated hadoop. The tested hadoop version is 2.x.
+
+If you use SeaTunnel Engine, it automatically integrates the hadoop jar when you download and install SeaTunnel Engine. You can check the jar package under ${SEATUNNEL_HOME}/lib to confirm this.
+
+We made some trade-offs in order to support more file types, so we used the HDFS protocol for internal access to OBS and this connector needs some hadoop dependencies.
+It only supports hadoop version **2.9.X+**.
+
+:::
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [ ] [stream](../../concept/connector-v2-features.md)
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+
+All the data in a split is read in a single pollNext call. The splits that have been read are saved in the snapshot.
- [x] [column projection](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
- [x] file format type
  - [x] text
  - [x] csv
  - [x] parquet
  - [x] orc
  - [x] json
  - [x] excel

## Options

| name                      | type    | required | default value       |
|---------------------------|---------|----------|---------------------|
| path                      | string  | yes      | -                   |
| file_format_type          | string  | yes      | -                   |
| bucket                    | string  | yes      | -                   |
| access_key                | string  | yes      | -                   |
| access_secret             | string  | yes      | -                   |
| endpoint                  | string  | yes      | -                   |
| read_columns              | list    | no       | -                   |
| delimiter                 | string  | no       | \001                |
| parse_partition_from_path | boolean | no       | true                |
| skip_header_row_number    | long    | no       | 0                   |
| date_format               | string  | no       | yyyy-MM-dd          |
| datetime_format           | string  | no       | yyyy-MM-dd HH:mm:ss |
| time_format               | string  | no       | HH:mm:ss            |
| schema                    | config  | no       | -                   |
| common-options            |         | no       | -                   |
| sheet_name                | string  | no       | -                   |

### path [string]

The source file path.

### delimiter [string]

Field delimiter, used to tell the connector how to split fields when reading text files.

Default `\001`, the same as Hive's default delimiter.

### parse_partition_from_path [boolean]

Controls whether to parse the partition keys and values from the file path.

For example, if you read a file from path `obs://hadoop-cluster/tmp/seatunnel/parquet/name=tyrantlucifer/age=26`

These two fields will be added to every record read from the file:

| name          | age |
|---------------|-----|
| tyrantlucifer | 26  |

Tips: **Do not define partition fields in schema option**

### date_format [string]

Date type format, used to tell the connector how to convert a string to a date. The following formats are supported:

`yyyy-MM-dd` `yyyy.MM.dd` `yyyy/MM/dd`

Default `yyyy-MM-dd`.

### datetime_format [string]

Datetime type format, used to tell the connector how to convert a string to a datetime. The following formats are supported:

`yyyy-MM-dd HH:mm:ss` `yyyy.MM.dd HH:mm:ss` `yyyy/MM/dd HH:mm:ss` `yyyyMMddHHmmss`

Default `yyyy-MM-dd HH:mm:ss`.

### time_format [string]

Time type format, used to tell the connector how to convert a string to a time. The following formats are supported:

`HH:mm:ss` `HH:mm:ss.SSS`

Default `HH:mm:ss`.

### skip_header_row_number [long]

Skip the first few lines; only supported for the txt and csv file types.

For example, set it like the following:

`skip_header_row_number = 2`

Then SeaTunnel will skip the first 2 lines from the source files.

### file_format_type [string]

File type. The following file types are supported:

`text` `csv` `parquet` `orc` `json` `excel`

If you assign the file type to `json`, you should also assign the schema option to tell the connector how to parse the data into the rows you want.
For example:

Upstream data is the following:

```json

{"code": 200, "data": "get success", "success": true}

```

You can also save multiple pieces of data in one file and split them by one newline:

```json lines

{"code": 200, "data": "get success", "success": true}
{"code": 300, "data": "get failed", "success": false}

```

You should assign the schema as the following:

```hocon

schema {
    fields {
        code = int
        data = string
        success = boolean
    }
}

```

The connector will generate data as the following:

| code | data        | success |
|------|-------------|---------|
| 200  | get success | true    |

If you assign the file type to `parquet` or `orc`, the schema option is not required; the connector can find the schema of the upstream data automatically.

If you assign the file type to `text` or `csv`, you can choose whether to specify the schema information.

For example, upstream data is the following:

```text

tyrantlucifer#26#male

```

If you do not assign the data schema, the connector will treat the upstream data as the following:

| content               |
|-----------------------|
| tyrantlucifer#26#male |

If you assign the data schema, you should also assign the option `delimiter`, except for the CSV file type.

You should assign the schema and delimiter as the following:

```hocon

delimiter = "#"
schema {
    fields {
        name = string
        age = int
        gender = string
    }
}

```

The connector will generate data as the following:

| name          | age | gender |
|---------------|-----|--------|
| tyrantlucifer | 26  | male   |

### bucket [string]

The bucket address of obs file system, for example: `obs://obs-bucket-name`

### access_key [string]

The access key of obs file system.

### access_secret [string]

The access secret of obs file system.

### endpoint [string]

The endpoint of obs file system.

### schema [config]

#### fields [Config]

The schema of upstream data.

### read_columns [list]

The read column list of the data source; users can use it to implement field projection.

The following file types support column projection:

- text
- json
- csv
- orc
- parquet
- excel

**Tips: If the user wants to use this feature when reading `text` `json` `csv` files, the schema option must be configured**

### common options

Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details.

### sheet_name [string]

Read the specified sheet of the workbook. Only used when file_format is excel.
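Putting the options above together, here is a sketch of a source block that reads the `#`-delimited text file from the earlier example (the bucket, keys, and endpoint are placeholders):

```hocon

  ObsFile {
    path = "/seatunnel/text"
    bucket = "obs://obs-bucket-name"
    access_key = "xxxxxxxxxxx"
    access_secret = "xxxxxxxxxxx"
    endpoint = "obs.xxxxxx.myhuaweicloud.com"
    file_format_type = "text"
    delimiter = "#"
    schema {
      fields {
        name = string
        age = int
        gender = string
      }
    }
  }

```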
+ +## Example + +```hocon + + ObsFile { + path = "/seatunnel/orc" + bucket = "obs://obs-bucket-name" + access_key = "xxxxxxxxxxxxxxxxx" + access_secret = "xxxxxxxxxxxxxxxxxxxxxx" + endpoint = "obs.xxxxxx.myhuaweicloud.com" + file_format_type = "orc" + } + +``` + +```hocon + + ObsFile { + path = "/seatunnel/json" + bucket = "obs://obs-bucket-name" + access_key = "xxxxxxxxxxxxxxxxx" + access_secret = "xxxxxxxxxxxxxxxxxxxxxx" + endpoint = "obs.xxxxxx.myhuaweicloud.com" + file_format_type = "json" + schema { + fields { + id = int + name = string + } + } + } + +``` + +## Changelog + +### 2.3.1-SNAPSHOT 2023-04-14 + +- Add OBS File Source Connector + diff --git a/release-note.md b/release-note.md index 65ae1abab0f..d0b59300840 100644 --- a/release-note.md +++ b/release-note.md @@ -19,6 +19,7 @@ - [Hbase] Add hbase sink connector #4049 - [Github] Add Github source connector #4155 - [CDC] Support export debezium-json format to kafka #4339 +- [FILE-OBS] Add huawei cloud obs connector ### Formats - [Canal]Support read canal format message #3950 diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java index 8d50cee4697..db7d79a2727 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java @@ -26,7 +26,8 @@ public enum FileSystemType implements Serializable { OSS_JINDO("OssJindoFile"), FTP("FtpFile"), SFTP("SftpFile"), - S3("S3File"); + S3("S3File"), + OBS("ObsFile"); private final String fileSystemPluginName; diff --git a/seatunnel-connectors-v2/connector-file/connector-file-obs/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-obs/pom.xml new file mode 100644 index 00000000000..a820d7a3e74 --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-obs/pom.xml @@ -0,0 +1,56 @@ + + + 4.0.0 + + org.apache.seatunnel + connector-file + ${revision} + + + connector-file-obs + SeaTunnel : Connectors V2 : File : Obs + + + 3.1.1.29 + 3.19.7.3 + + + + + + org.apache.seatunnel + connector-file-base + ${project.version} + + + org.apache.flink + flink-shaded-hadoop-2 + provided + + + org.apache.avro + avro + + + + + org.apache.hadoop + hadoop-huaweicloud + ${hadoop-huaweicloud.version} + + + com.huawei.storage + esdk-obs-java + + + + + com.huawei.storage + esdk-obs-java + ${esdk.version} + + + + + diff --git a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/config/ObsConf.java b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/config/ObsConf.java new file mode 100644 index 00000000000..714e7ede178 --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/config/ObsConf.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.file.obs.config; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf; + +import org.apache.hadoop.fs.obs.Constants; + +import java.util.HashMap; + +public class ObsConf extends HadoopConf { + private static final String HDFS_IMPL = "org.apache.hadoop.fs.obs.OBSFileSystem"; + private static final String SCHEMA = "obs"; + + @Override + public String getFsHdfsImpl() { + return HDFS_IMPL; + } + + @Override + public String getSchema() { + return SCHEMA; + } + + public ObsConf(String hdfsNameKey) { + super(hdfsNameKey); + } + + public static HadoopConf buildWithConfig(Config config) { + HadoopConf hadoopConf = new ObsConf(config.getString(ObsConfig.BUCKET.key())); + HashMap ossOptions = new HashMap<>(); + ossOptions.put(Constants.ACCESS_KEY, config.getString(ObsConfig.ACCESS_KEY.key())); + ossOptions.put(Constants.SECRET_KEY, config.getString(ObsConfig.ACCESS_SECRET.key())); + ossOptions.put(Constants.ENDPOINT, config.getString(ObsConfig.ENDPOINT.key())); + hadoopConf.setExtraOptions(ossOptions); + return hadoopConf; + } +} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/config/ObsConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/config/ObsConfig.java new file mode 100644 index 00000000000..b42e83dfc61 --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/config/ObsConfig.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.obs.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
+
+public class ObsConfig extends BaseSourceConfig {
+    public static final Option<String> ACCESS_KEY =
+            Options.key("access_key")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("OBS bucket access key");
+    public static final Option<String> ACCESS_SECRET =
+            Options.key("access_secret")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("OBS bucket access secret");
+    public static final Option<String> ENDPOINT =
+            Options.key("endpoint").stringType().noDefaultValue().withDescription("OBS endpoint");
+    public static final Option<String> BUCKET =
+            Options.key("bucket").stringType().noDefaultValue().withDescription("OBS bucket");
+}

diff --git a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/sink/ObsFileSink.java b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/sink/ObsFileSink.java
new file mode 100644
index 00000000000..8f303b6a457
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/sink/ObsFileSink.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.obs.sink;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.file.obs.config.ObsConf;
+import org.apache.seatunnel.connectors.seatunnel.file.obs.config.ObsConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(SeaTunnelSink.class)
+public class ObsFileSink extends BaseFileSink {
+    @Override
+    public String getPluginName() {
+        return FileSystemType.OBS.getFileSystemPluginName();
+    }
+
+    @Override
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        super.prepare(pluginConfig);
+        // Validate that every required connection option is present before
+        // building the Hadoop configuration for OBS.
+        CheckResult result =
+                CheckConfigUtil.checkAllExists(
+                        pluginConfig,
+                        ObsConfig.FILE_PATH.key(),
+                        ObsConfig.BUCKET.key(),
+                        ObsConfig.ACCESS_KEY.key(),
+                        ObsConfig.ACCESS_SECRET.key(),
+                        ObsConfig.ENDPOINT.key());
+        if (!result.isSuccess()) {
+            throw new FileConnectorException(
+                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    String.format(
+                            "PluginName: %s, PluginType: %s, Message: %s",
+                            getPluginName(), PluginType.SINK, result.getMsg()));
+        }
+        hadoopConf = ObsConf.buildWithConfig(pluginConfig);
+    }
+}

diff --git a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/sink/ObsFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/sink/ObsFileSinkFactory.java
new file mode 100644
index 00000000000..8f1c221e076
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/sink/ObsFileSinkFactory.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.seatunnel.connectors.seatunnel.file.obs.sink; + +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.TableSinkFactory; +import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig; +import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat; +import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType; +import org.apache.seatunnel.connectors.seatunnel.file.obs.config.ObsConfig; + +import com.google.auto.service.AutoService; + +@AutoService(Factory.class) +public class ObsFileSinkFactory implements TableSinkFactory { + @Override + public String factoryIdentifier() { + return FileSystemType.OBS.getFileSystemPluginName(); + } + + @Override + public OptionRule optionRule() { + return OptionRule.builder() + .required(ObsConfig.FILE_PATH) + .required(ObsConfig.BUCKET) + .required(ObsConfig.ACCESS_KEY) + .required(ObsConfig.ACCESS_SECRET) + .required(ObsConfig.ENDPOINT) + .optional(BaseSinkConfig.FILE_FORMAT_TYPE) + .conditional( + BaseSinkConfig.FILE_FORMAT_TYPE, + FileFormat.TEXT, + BaseSinkConfig.ROW_DELIMITER, + BaseSinkConfig.FIELD_DELIMITER, + BaseSinkConfig.TXT_COMPRESS) + .conditional( + BaseSinkConfig.FILE_FORMAT_TYPE, + FileFormat.CSV, + BaseSinkConfig.TXT_COMPRESS) + .conditional( + BaseSinkConfig.FILE_FORMAT_TYPE, + FileFormat.JSON, + BaseSinkConfig.TXT_COMPRESS) + .conditional( + BaseSinkConfig.FILE_FORMAT_TYPE, + FileFormat.ORC, + BaseSinkConfig.ORC_COMPRESS) + .conditional( + BaseSinkConfig.FILE_FORMAT_TYPE, + FileFormat.PARQUET, + BaseSinkConfig.PARQUET_COMPRESS) + .optional(BaseSinkConfig.CUSTOM_FILENAME) + .conditional( + BaseSinkConfig.CUSTOM_FILENAME, + true, + BaseSinkConfig.FILE_NAME_EXPRESSION, + BaseSinkConfig.FILENAME_TIME_FORMAT) + .optional(BaseSinkConfig.HAVE_PARTITION) + .conditional( + BaseSinkConfig.HAVE_PARTITION, + true, + BaseSinkConfig.PARTITION_BY, + BaseSinkConfig.PARTITION_DIR_EXPRESSION, + BaseSinkConfig.IS_PARTITION_FIELD_WRITE_IN_FILE) + .optional(BaseSinkConfig.SINK_COLUMNS) + .optional(BaseSinkConfig.IS_ENABLE_TRANSACTION) + .optional(BaseSinkConfig.DATE_FORMAT) + .optional(BaseSinkConfig.DATETIME_FORMAT) + .optional(BaseSinkConfig.TIME_FORMAT) + .build(); + } +} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSource.java new file mode 100644 index 00000000000..9668c0c12aa --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSource.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.file.obs.source; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import org.apache.seatunnel.api.common.PrepareFailException; +import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; +import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.config.CheckConfigUtil; +import org.apache.seatunnel.common.config.CheckResult; +import org.apache.seatunnel.common.constants.PluginType; +import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat; +import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType; +import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode; +import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException; +import org.apache.seatunnel.connectors.seatunnel.file.obs.config.ObsConf; +import org.apache.seatunnel.connectors.seatunnel.file.obs.config.ObsConfig; +import org.apache.seatunnel.connectors.seatunnel.file.source.BaseFileSource; +import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategyFactory; + +import com.google.auto.service.AutoService; + +import java.io.IOException; + +@AutoService(SeaTunnelSource.class) +public class ObsFileSource extends BaseFileSource { + @Override + public String getPluginName() { + return FileSystemType.OBS.getFileSystemPluginName(); + } + + @Override + public void prepare(Config pluginConfig) throws PrepareFailException { + CheckResult result = + CheckConfigUtil.checkAllExists( + pluginConfig, + ObsConfig.FILE_PATH.key(), + ObsConfig.FILE_FORMAT_TYPE.key(), + ObsConfig.ENDPOINT.key(), + ObsConfig.ACCESS_KEY.key(), + ObsConfig.ACCESS_SECRET.key(), + ObsConfig.BUCKET.key()); + if (!result.isSuccess()) { + throw new FileConnectorException( + SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, + String.format( + "PluginName: %s, PluginType: %s, Message: %s", + getPluginName(), PluginType.SOURCE, result.getMsg())); + } + readStrategy = + ReadStrategyFactory.of(pluginConfig.getString(ObsConfig.FILE_FORMAT_TYPE.key())); + readStrategy.setPluginConfig(pluginConfig); + String path = pluginConfig.getString(ObsConfig.FILE_PATH.key()); + hadoopConf = ObsConf.buildWithConfig(pluginConfig); + try { + filePaths = readStrategy.getFileNamesByPath(hadoopConf, path); + } catch (IOException e) { + String errorMsg = String.format("Get file list from this path [%s] failed", path); + throw new FileConnectorException( + FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg, e); + } + // support user-defined schema + FileFormat fileFormat = + FileFormat.valueOf( + pluginConfig.getString(ObsConfig.FILE_FORMAT_TYPE.key()).toUpperCase()); + // only json text csv type support user-defined schema now + if (pluginConfig.hasPath(CatalogTableUtil.SCHEMA.key())) { + switch (fileFormat) { + case CSV: + case TEXT: + case JSON: + case EXCEL: + SeaTunnelRowType 
userDefinedSchema = + CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType(); + readStrategy.setSeaTunnelRowTypeInfo(userDefinedSchema); + rowType = readStrategy.getActualSeaTunnelRowTypeInfo(); + break; + case ORC: + case PARQUET: + throw new FileConnectorException( + CommonErrorCode.UNSUPPORTED_OPERATION, + "SeaTunnel does not support user-defined schema for [parquet, orc] files"); + default: + // never got in there + throw new FileConnectorException( + CommonErrorCode.ILLEGAL_ARGUMENT, + "SeaTunnel does not supported this file format"); + } + } else { + try { + rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, filePaths.get(0)); + } catch (FileConnectorException e) { + String errorMsg = + String.format("Get table schema from file [%s] failed", filePaths.get(0)); + throw new FileConnectorException( + CommonErrorCode.TABLE_SCHEMA_GET_FAILED, errorMsg, e); + } + } + } +} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSourceFactory.java new file mode 100644 index 00000000000..fe8ac7efd79 --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSourceFactory.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.file.obs.source; + +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; +import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.TableSourceFactory; +import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig; +import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat; +import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType; +import org.apache.seatunnel.connectors.seatunnel.file.obs.config.ObsConfig; + +import com.google.auto.service.AutoService; + +import java.util.Arrays; + +@AutoService(Factory.class) +public class ObsFileSourceFactory implements TableSourceFactory { + @Override + public String factoryIdentifier() { + return FileSystemType.OBS.getFileSystemPluginName(); + } + + @Override + public OptionRule optionRule() { + return OptionRule.builder() + .required(ObsConfig.FILE_PATH) + .required(ObsConfig.BUCKET) + .required(ObsConfig.ACCESS_KEY) + .required(ObsConfig.ACCESS_SECRET) + .required(ObsConfig.ENDPOINT) + .required(BaseSourceConfig.FILE_FORMAT_TYPE) + .conditional( + BaseSourceConfig.FILE_FORMAT_TYPE, + FileFormat.TEXT, + BaseSourceConfig.DELIMITER) + .conditional( + BaseSourceConfig.FILE_FORMAT_TYPE, + Arrays.asList( + FileFormat.TEXT, FileFormat.JSON, FileFormat.EXCEL, FileFormat.CSV), + CatalogTableUtil.SCHEMA) + .optional(BaseSourceConfig.PARSE_PARTITION_FROM_PATH) + .optional(BaseSourceConfig.DATE_FORMAT) + .optional(BaseSourceConfig.DATETIME_FORMAT) + .optional(BaseSourceConfig.TIME_FORMAT) + .build(); + } + + @Override + public Class getSourceClass() { + return ObsFileSource.class; + } +} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem new file mode 100644 index 00000000000..75590871402 --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +org.apache.hadoop.fs.obs.OBSFileSystem \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/obs/ObsFileFactoryTest.java b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/obs/ObsFileFactoryTest.java new file mode 100644 index 00000000000..dc7c34525a5 --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/obs/ObsFileFactoryTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.file.obs; + +import org.apache.seatunnel.connectors.seatunnel.file.obs.sink.ObsFileSinkFactory; +import org.apache.seatunnel.connectors.seatunnel.file.obs.source.ObsFileSourceFactory; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class ObsFileFactoryTest { + + @Test + void optionRule() { + Assertions.assertNotNull((new ObsFileSourceFactory()).optionRule()); + Assertions.assertNotNull((new ObsFileSinkFactory()).optionRule()); + } +} diff --git a/seatunnel-connectors-v2/connector-file/pom.xml b/seatunnel-connectors-v2/connector-file/pom.xml index 25e47c67c1e..84b3af3838b 100644 --- a/seatunnel-connectors-v2/connector-file/pom.xml +++ b/seatunnel-connectors-v2/connector-file/pom.xml @@ -39,6 +39,7 @@ connector-file-sftp connector-file-s3 connector-file-oss-jindo + connector-file-obs From 2c6746aeba4061e8f22f6b5dc0cf1159d184121a Mon Sep 17 00:00:00 2001 From: kim-up Date: Fri, 14 Apr 2023 18:48:23 +0800 Subject: [PATCH 02/11] [Fix][plugin-mapping] update plugin-mapping.properties --- plugin-mapping.properties | 2 ++ 1 file changed, 2 insertions(+) diff --git a/plugin-mapping.properties b/plugin-mapping.properties index 87debbe7ec2..7a0f71c5382 100644 --- a/plugin-mapping.properties +++ b/plugin-mapping.properties @@ -107,3 +107,5 @@ seatunnel.sink.Hbase = connector-hbase seatunnel.source.StarRocks = connector-starrocks seatunnel.source.Rocketmq = connector-rocketmq seatunnel.sink.Rocketmq = connector-rocketmq +seatunnel.source.ObsFile = connector-file-obs +seatunnel.sink.ObsFile = connector-file-obs From 66d6eca1cd82405fa5ddb2a7de50773139fdb0c0 Mon Sep 17 00:00:00 2001 From: kim-up Date: Sat, 15 Apr 2023 11:20:27 +0800 Subject: [PATCH 03/11] [Connector-V2][Obs] Add license for pom.xml --- .../connector-file/connector-file-obs/pom.xml | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-obs/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-obs/pom.xml index a820d7a3e74..b675bc667ba 100644 --- 
a/seatunnel-connectors-v2/connector-file/connector-file-obs/pom.xml +++ b/seatunnel-connectors-v2/connector-file/connector-file-obs/pom.xml @@ -1,4 +1,22 @@ + 4.0.0 From b19a4332d07dd4c671c0b66b75d2fa3ea61762df Mon Sep 17 00:00:00 2001 From: kim-up Date: Sun, 16 Apr 2023 10:18:29 +0800 Subject: [PATCH 04/11] [Connector-V2][Obs] Update dist pom.xml --- seatunnel-dist/pom.xml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml index e9fc30f84d1..484d437b2ac 100644 --- a/seatunnel-dist/pom.xml +++ b/seatunnel-dist/pom.xml @@ -487,6 +487,13 @@ provided + + org.apache.seatunnel + connector-file-obs + ${project.version} + provided + + com.aliyun.phoenix From f6174c0069df42f351041e1a9895d79584440fe3 Mon Sep 17 00:00:00 2001 From: kim-up Date: Mon, 17 Apr 2023 14:02:53 +0800 Subject: [PATCH 05/11] [Fix][dependency] add huawei cloud repository --- .../connector-file/connector-file-obs/pom.xml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-obs/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-obs/pom.xml index b675bc667ba..00676916fda 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-obs/pom.xml +++ b/seatunnel-connectors-v2/connector-file/connector-file-obs/pom.xml @@ -71,4 +71,10 @@ + + + huaweiCloud + https://repo.huaweicloud.com/repository/maven/huaweicloudsdk/ + + From 35a25c3215b7784869a7761abb06b2b68e6b18ef Mon Sep 17 00:00:00 2001 From: kim-up Date: Tue, 18 Apr 2023 11:40:11 +0800 Subject: [PATCH 06/11] [Doc][obs] Unified format --- docs/en/connector-v2/sink/ObsFile.md | 244 +++++++++++-------------- docs/en/connector-v2/source/ObsFile.md | 243 ++++++++++++------------ 2 files changed, 228 insertions(+), 259 deletions(-) diff --git a/docs/en/connector-v2/sink/ObsFile.md b/docs/en/connector-v2/sink/ObsFile.md index 3a9651a19f6..9a207f69c21 100644 --- a/docs/en/connector-v2/sink/ObsFile.md +++ b/docs/en/connector-v2/sink/ObsFile.md @@ -2,20 +2,13 @@ > Obs file sink connector -## Description - -Output data to huawei cloud obs file system. - -:::tip +## Support those engines -If you use spark/flink, In order to use this connector, You must ensure your spark/flink cluster already integrated hadoop. The tested hadoop version is 2.x. - -If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you download and install SeaTunnel Engine. You can check the jar package under ${SEATUNNEL_HOME}/lib to confirm this. - -We made some trade-offs in order to support more file types, so we used the HDFS protocol for internal access to OBS and this connector need some hadoop dependencies. -It only supports hadoop version **2.9.X+**. 
- -::: +> Spark +> +> Flink +> +> Seatunnel Zeta ## Key features @@ -31,71 +24,76 @@ By default, we use 2PC commit to ensure `exactly-once` - [x] json - [x] excel -## Options - -| name | type | required | default value | remarks | -|----------------------------------|---------|----------|--------------------------------------------|-----------------------------------------------------------| -| path | string | yes | - | | -| bucket | string | yes | - | | -| access_key | string | yes | - | | -| access_secret | string | yes | - | | -| endpoint | string | yes | - | | -| custom_filename | boolean | no | false | Whether you need custom the filename | -| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true | -| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true | -| file_format_type | string | no | "csv" | | -| field_delimiter | string | no | '\001' | Only used when file_format is text | -| row_delimiter | string | no | "\n" | Only used when file_format is text | -| have_partition | boolean | no | false | Whether you need processing partitions. | -| partition_by | array | no | - | Only used then have_partition is true | -| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is true | -| is_partition_field_write_in_file | boolean | no | false | Only used then have_partition is true | -| sink_columns | array | no | | When this parameter is empty, all fields are sink columns | -| is_enable_transaction | boolean | no | true | | -| batch_size | int | no | 1000000 | | -| compress_codec | string | no | none | | -| common-options | object | no | - | | -| max_rows_in_memory | int | no | - | Only used when file_format is excel. | -| sheet_name | string | no | Sheet${Random number} | Only used when file_format is excel. | - -### path [string] - -The target dir path is required. - -### bucket [string] - -The bucket address of obs file system, for example: `obs://obs-bucket-name` - -### access_key [string] - -The access key of obs file system. +## Description -### access_secret [string] +Output data to huawei cloud obs file system. -The access secret of obs file system. +If you use spark/flink, In order to use this connector, You must ensure your spark/flink cluster already integrated hadoop. The tested hadoop version is 2.x. -### endpoint [string] +If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you download and install SeaTunnel Engine. You can check the jar package under ${SEATUNNEL_HOME}/lib to confirm this. -The endpoint of obs file system. +We made some trade-offs in order to support more file types, so we used the HDFS protocol for internal access to OBS and this connector need some hadoop dependencies. +It only supports hadoop version **2.9.X+**. 
-### custom_filename [boolean] +## Required Jar List -Whether custom the filename +| jar | supported versions | maven | +|--------------------|-----------------------------|----------------------------------------------------------------------------------------------------------------| +| hadoop-huaweicloud | support version >= 3.1.1.29 | [Download](https://repo.huaweicloud.com/repository/maven/huaweicloudsdk/org/apache/hadoop/hadoop-huaweicloud/) | +| esdk-obs-java | support version >= 3.19.7.3 | [Download](https://repo.huaweicloud.com/repository/maven/huaweicloudsdk/com/huawei/storage/esdk-obs-java/) | +| okhttp | support version >= 3.11.0 | [Download](https://repo1.maven.org/maven2/com/squareup/okhttp3/okhttp/) | +| okio | support version >= 1.14.0 | [Download](https://repo1.maven.org/maven2/com/squareup/okio/okio/) | -### file_name_expression [string] +> Please download the support list corresponding to 'Maven' and copy them to the '$SEATNUNNEL_HOME/plugins/jdbc/lib/' working directory. +> +> And copy all jars to $SEATNUNNEL_HOME/lib/ -Only used when `custom_filename` is `true` +## Options -`file_name_expression` describes the file expression which will be created into the `path`. We can add the variable `${now}` or `${uuid}` in the `file_name_expression`, like `test_${uuid}_${now}`, -`${now}` represents the current time, and its format can be defined by specifying the option `filename_time_format`. +| name | type | required | default | description | +|----------------------------------|---------|----------|--------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------| +| path | string | yes | - | The target dir path. | +| bucket | string | yes | - | The bucket address of obs file system, for example: `obs://obs-bucket-name`. | +| access_key | string | yes | - | The access key of obs file system. | +| access_secret | string | yes | - | The access secret of obs file system. | +| endpoint | string | yes | - | The endpoint of obs file system. | +| custom_filename | boolean | no | false | Whether you need custom the filename. | +| file_name_expression | string | no | "${transactionId}" | Describes the file expression which will be created into the `path`. Only used when custom_filename is true. [Tips](#file_name_expression) | +| filename_time_format | string | no | "yyyy.MM.dd" | Specify the time format of the `path`. Only used when custom_filename is true. [Tips](#filename_time_format) | +| file_format_type | string | no | "csv" | Supported file types. [Tips](#file_format_type) | +| field_delimiter | string | no | '\001' | The separator between columns in a row of data.Only used when file_format is text. | +| row_delimiter | string | no | "\n" | The separator between rows in a file. Only needed by `text` file format. | +| have_partition | boolean | no | false | Whether you need processing partitions. | +| partition_by | array | no | - | Partition data based on selected fields. Only used then have_partition is true. 
| +| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is true.[Tips](#partition_dir_expression) | +| is_partition_field_write_in_file | boolean | no | false | Only used then have_partition is true.[Tips](#is_partition_field_write_in_file) | +| sink_columns | array | no | | When this parameter is empty, all fields are sink columns.[Tips](#sink_columns) | +| is_enable_transaction | boolean | no | true | [Tips](#is_enable_transaction) | +| batch_size | int | no | 1000000 | [Tips](#batch_size) | +| compress_codec | string | no | none | [Tips](#compress_codec) | +| common-options | object | no | - | [Tips](#common_options) | +| max_rows_in_memory | int | no | - | When File Format is Excel,The maximum number of data items that can be cached in the memory.Only used when file_format is excel. | +| sheet_name | string | no | Sheet${Random number} | Writer the sheet of the workbook. Only used when file_format is excel. | + +### Tips + +#### file_name_expression + +> Only used when `custom_filename` is `true` +> +> `file_name_expression` describes the file expression which will be created into the `path`. +> +> We can add the variable `${now}` or `${uuid}` in the `file_name_expression`, like `test_${uuid}_${now}`, +> +> `${now}` represents the current time, and its format can be defined by specifying the option `filename_time_format`. Please note that, If `is_enable_transaction` is `true`, we will auto add `${transactionId}_` in the head of the file. -### filename_time_format [string] +#### filename_time_format -Only used when `custom_filename` is `true` - -When the format in the `file_name_expression` parameter is `xxxx-${now}` , `filename_time_format` can specify the time format of the path, and the default value is `yyyy.MM.dd` . The commonly used time formats are listed as follows: +> Only used when `custom_filename` is `true` +> +> When the format in the `file_name_expression` parameter is `xxxx-${now}` , `filename_time_format` can specify the time format of the path, and the default value is `yyyy.MM.dd` . The commonly used time formats are listed as follows: | Symbol | Description | |--------|--------------------| @@ -106,92 +104,66 @@ When the format in the `file_name_expression` parameter is `xxxx-${now}` , `file | m | Minute in hour | | s | Second in minute | -### file_format_type [string] - -We supported as the following file types: +#### file_format_type -`text` `json` `csv` `orc` `parquet` `excel` +> We supported as the following file types: +> +> `text` `json` `csv` `orc` `parquet` `excel` Please note that, The final file name will end with the file_format's suffix, the suffix of the text file is `txt`. -### field_delimiter [string] - -The separator between columns in a row of data. Only needed by `text` file format. - -### row_delimiter [string] - -The separator between rows in a file. Only needed by `text` file format. - -### have_partition [boolean] - -Whether you need processing partitions. +#### partition_dir_expression -### partition_by [array] +> Only used when `have_partition` is `true`. +> +> If the `partition_by` is specified, we will generate the corresponding partition directory based on the partition information, and the final file will be placed in the partition directory. +> +> Default `partition_dir_expression` is `${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/`. `k0` is the first partition field and `v0` is the value of the first partition field. -Only used when `have_partition` is `true`. 
+#### is_partition_field_write_in_file

-### partition_dir_expression [string]
+> Only used when `have_partition` is `true`.
+>
+> If `is_partition_field_write_in_file` is `true`, the partition field and the value of it will be written into the data file.
+>
+> For example, if you want to write a Hive Data File, its value should be `false`.

-Only used when `have_partition` is `true`.
+#### sink_columns

-If the `partition_by` is specified, we will generate the corresponding partition directory based on the partition information, and the final file will be placed in the partition directory.
+> Which columns need to be written to the file; the default value is all the columns obtained from `Transform` or `Source`.
+> The order of the fields determines the order in which the file is actually written.

-Default `partition_dir_expression` is `${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/`. `k0` is the first partition field and `v0` is the value of the first partition field.
+#### is_enable_transaction

-### is_partition_field_write_in_file [boolean]
+> If `is_enable_transaction` is true, we will ensure that data will not be lost or duplicated when it is written to the target directory.
+>
+> Please note that, If `is_enable_transaction` is `true`, we will auto add `${transactionId}_` in the head of the file. Only support `true` now.

-Only used when `have_partition` is `true`.
+#### batch_size

-If `is_partition_field_write_in_file` is `true`, the partition field and the value of it will be write into data file.
+> The maximum number of rows in a file. For SeaTunnel Engine, the number of lines in the file is jointly determined by `batch_size` and `checkpoint.interval`. If the value of `checkpoint.interval` is large enough, the sink writer will write rows into a file until the rows in the file are larger than `batch_size`. If `checkpoint.interval` is small, the sink writer will create a new file when a new checkpoint triggers.

-For example, if you want to write a Hive Data File, Its value should be `false`.
+#### compress_codec

-### sink_columns [array]
-
-Which columns need be written to file, default value is all the columns get from `Transform` or `Source`.
-The order of the fields determines the order in which the file is actually written.
-
-### is_enable_transaction [boolean]
-
-If `is_enable_transaction` is true, we will ensure that data will not be lost or duplicated when it is written to the target directory.
-
-Please note that, If `is_enable_transaction` is `true`, we will auto add `${transactionId}_` in the head of the file.
+> The compress codec of files and the details that supported as the following shown:
+>
+> - txt: `lzo` `none`
+> - json: `lzo` `none`
+> - csv: `lzo` `none`
+> - orc: `lzo` `snappy` `lz4` `zlib` `none`
+> - parquet: `lzo` `snappy` `lz4` `gzip` `brotli` `zstd` `none`

-Only support `true` now.
+Please note that the excel type does not support any compression format.

-### batch_size [int]
+#### common options

-The maximum number of rows in a file. For SeaTunnel Engine, the number of lines in the file is determined by `batch_size` and `checkpoint.interval` jointly decide. If the value of `checkpoint.interval` is large enough, sink writer will write rows in a file until the rows in the file larger than `batch_size`. If `checkpoint.interval` is small, the sink writer will create a new file when a new checkpoint trigger.
+> Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details.
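+
+As a short illustration of the compression options above, this sketch (with placeholder credentials) writes zlib-compressed orc files, mirroring the orc e2e test configuration added in this patch series:
+
+```hocon
+ObsFile {
+    path = "/seatunnel/orc"
+    bucket = "obs://obs-bucket-name"
+    access_key = "xxxxxxxxxxx"
+    access_secret = "xxxxxxxxxxx"
+    endpoint = "obs.xxxxx.myhuaweicloud.com"
+    file_format_type = "orc"
+    compress_codec = "zlib"
+}
+```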
-### compress_codec [string]
+## Task Example

-The compress codec of files and the details that supported as the following shown:
+### text file

-- txt: `lzo` `none`
-- json: `lzo` `none`
-- csv: `lzo` `none`
-- orc: `lzo` `snappy` `lz4` `zlib` `none`
-- parquet: `lzo` `snappy` `lz4` `gzip` `brotli` `zstd` `none`
-
-Tips: excel type does not support any compression format
-
-### common options
-
-Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details.
-
-### max_rows_in_memory [int]
-
-When File Format is Excel,The maximum number of data items that can be cached in the memory.
-
-### sheet_name [string]
-
-Writer the sheet of the workbook
-
-## Example
-
-For text file format with `have_partition` and `custom_filename` and `sink_columns`
+> For text file format with `have_partition` and `custom_filename` and `sink_columns`

 ```hocon

@@ -217,7 +189,9 @@ For text file format with `have_partition` and `custom_filename` and `sink_colum

 ```

-For parquet file format with `have_partition` and `sink_columns`
+### parquet file
+
+> For parquet file format with `have_partition` and `sink_columns`

 ```hocon

@@ -237,9 +211,11 @@ For parquet file format with `have_partition` and `sink_columns`

 ```

-For orc file format simple config
+### orc file
+
+> For orc file format simple config

-```bash
+```hocon

   ObsFile {
@@ -254,7 +230,7 @@ For orc file format simple config

 ## Changelog

-### 2.3.1-SNAPSHOT 2023-04-14
+### next version

 - Add OBS Sink Connector

diff --git a/docs/en/connector-v2/source/ObsFile.md b/docs/en/connector-v2/source/ObsFile.md
index 800189f8cf5..0ddbf06564b 100644
--- a/docs/en/connector-v2/source/ObsFile.md
+++ b/docs/en/connector-v2/source/ObsFile.md
@@ -2,20 +2,13 @@

 > Obs file source connector

-## Description
-
-Read data from huawei cloud obs file system.
-
-:::tip
-
-If you use spark/flink, In order to use this connector, You must ensure your spark/flink cluster already integrated hadoop. The tested hadoop version is 2.x.
+## Support Those Engines

-If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you download and install SeaTunnel Engine. You can check the jar package under ${SEATUNNEL_HOME}/lib to confirm this.
-
-We made some trade-offs in order to support more file types, so we used the HDFS protocol for internal access to OBS and this connector need some hadoop dependencies.
-It only supports hadoop version **2.9.X+**.
-
-:::
+> Spark
+>
+> Flink
+>
+> SeaTunnel Zeta

 ## Key features

@@ -36,104 +29,120 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
 - [x] json
 - [x] excel

-## Options
-
-| name | type | required | default value |
-|---------------------------|---------|----------|---------------------|
-| path | string | yes | - |
-| file_format_type | string | yes | - |
-| bucket | string | yes | - |
-| access_key | string | yes | - |
-| access_secret | string | yes | - |
-| endpoint | string | yes | - |
-| read_columns | list | yes | - |
-| delimiter | string | no | \001 |
-| parse_partition_from_path | boolean | no | true |
-| skip_header_row_number | long | no | 0 |
-| date_format | string | no | yyyy-MM-dd |
-| datetime_format | string | no | yyyy-MM-dd HH:mm:ss |
-| time_format | string | no | HH:mm:ss |
-| schema | config | no | - |
-| common-options | | no | - |
-| sheet_name | string | no | - |
+## Description

-### path [string]
+Read data from huawei cloud obs file system.

-The source file path.
+If you use spark/flink, in order to use this connector, you must ensure your spark/flink cluster has already integrated hadoop. The tested hadoop version is 2.x.

-### delimiter [string]
+If you use SeaTunnel Engine, it automatically integrates the hadoop jar when you download and install SeaTunnel Engine. You can check the jar package under ${SEATUNNEL_HOME}/lib to confirm this.

-Field delimiter, used to tell connector how to slice and dice fields when reading text files
+We made some trade-offs in order to support more file types, so we used the HDFS protocol for internal access to OBS and this connector needs some hadoop dependencies.
+It only supports hadoop version **2.9.X+**.

-default `\001`, the same as hive's default delimiter
+## Required Jar List

-### parse_partition_from_path [boolean]
+| jar                | supported versions          | maven                                                                                                          |
+|--------------------|-----------------------------|----------------------------------------------------------------------------------------------------------------|
+| hadoop-huaweicloud | support version >= 3.1.1.29 | [Download](https://repo.huaweicloud.com/repository/maven/huaweicloudsdk/org/apache/hadoop/hadoop-huaweicloud/) |
+| esdk-obs-java      | support version >= 3.19.7.3 | [Download](https://repo.huaweicloud.com/repository/maven/huaweicloudsdk/com/huawei/storage/esdk-obs-java/)     |
+| okhttp             | support version >= 3.11.0   | [Download](https://repo1.maven.org/maven2/com/squareup/okhttp3/okhttp/)                                        |
+| okio               | support version >= 1.14.0   | [Download](https://repo1.maven.org/maven2/com/squareup/okio/okio/)                                             |

-Control whether parse the partition keys and values from file path
+> Please download the jars listed in the 'maven' column above and copy them to the '$SEATUNNEL_HOME/plugins/jdbc/lib/' working directory.
+>
+> And copy all jars to $SEATUNNEL_HOME/lib/

-For example if you read a file from path `obs://hadoop-cluster/tmp/seatunnel/parquet/name=tyrantlucifer/age=26`
+## Options

-Every record data from file will be added these two fields:
+| name | type | required | default | description |
+|---------------------------|---------|----------|---------------------|----------------------------------------------------------------------------------------------------------------|
+| path | string | yes | - | The source dir path |
+| file_format_type | string | yes | - | File type. [Tips](#file_format_type) |
+| bucket | string | yes | - | The bucket address of obs file system, for example: `obs://obs-bucket-name` |
+| access_key | string | yes | - | The access key of obs file system |
+| access_secret | string | yes | - | The access secret of obs file system |
+| endpoint | string | yes | - | The endpoint of obs file system |
+| read_columns | list | yes | - | The read column list of the data source, user can use it to implement field projection. [Tips](#read_columns) |
+| delimiter | string | no | \001 | Field delimiter, used to tell the connector how to slice and dice fields when reading text files |
+| parse_partition_from_path | boolean | no | true | Control whether to parse the partition keys and values from the file path. [Tips](#parse_partition_from_path) |
+| skip_header_row_number | long | no | 0 | Skip the first few lines, but only for the txt and csv. [Tips](#skip_header_row_number) |
+| date_format | string | no | yyyy-MM-dd | Date type format, used to tell the connector how to convert string to date. [Tips](#date_format) |
+| datetime_format | string | no | yyyy-MM-dd HH:mm:ss | Datetime type format, used to tell the connector how to convert string to datetime. [Tips](#datetime_format) |
+| time_format | string | no | HH:mm:ss | Time type format, used to tell the connector how to convert string to time. [Tips](#time_format) |
+| schema | config | no | - | [Tips](#schema) |
+| common-options | | no | - | [Tips](#common-options) |
+| sheet_name | string | no | - | Read the sheet of the workbook. Only used when file_format is excel. |
+
+### Tips
+
+#### parse_partition_from_path
+
+> Control whether to parse the partition keys and values from the file path
+>
+> For example if you read a file from path `obs://hadoop-cluster/tmp/seatunnel/parquet/name=tyrantlucifer/age=26`
+>
+> These two fields will be added to every record read from the file:

 | name          | age |
 |---------------|-----|
 | tyrantlucifer | 26  |

-Tips: **Do not define partition fields in schema option**
-
-### date_format [string]
-
-Date type format, used to tell the connector how to convert string to date, supported as the following formats:
-
-`yyyy-MM-dd` `yyyy.MM.dd` `yyyy/MM/dd`
-
-default `yyyy-MM-dd`
+> Do not define partition fields in schema option

-### datetime_format [string]
+#### date_format

-Datetime type format, used to tell the connector how to convert string to datetime, supported as the following formats:
+> Date type format, used to tell the connector how to convert string to date, supported as the following formats:
+>
+> `yyyy-MM-dd` `yyyy.MM.dd` `yyyy/MM/dd`
+>
+> default `yyyy-MM-dd`

-`yyyy-MM-dd HH:mm:ss` `yyyy.MM.dd HH:mm:ss` `yyyy/MM/dd HH:mm:ss` `yyyyMMddHHmmss`
+#### datetime_format

-default `yyyy-MM-dd HH:mm:ss`
+> Datetime type format, used to tell the connector how to convert string to datetime, supported as the following formats:
+>
+> `yyyy-MM-dd HH:mm:ss` `yyyy.MM.dd HH:mm:ss` `yyyy/MM/dd HH:mm:ss` `yyyyMMddHHmmss`
+>
+> default `yyyy-MM-dd HH:mm:ss`

-### time_format [string]
+#### time_format

-Time type format, used to tell the connector how to convert string to time, supported as the following formats:
+> Time type format, used to tell the connector how to convert string to time, supported as the following formats:
+>
+> `HH:mm:ss` `HH:mm:ss.SSS`
+>
+> default `HH:mm:ss`

-`HH:mm:ss` `HH:mm:ss.SSS`
+#### skip_header_row_number

-default `HH:mm:ss`
+> Skip the first few lines, but only for the txt and csv.
+>
+> For example, set like following:
+>
+> `skip_header_row_number = 2`
+>
+> Then SeaTunnel will skip the first 2 lines from source files

-### skip_header_row_number [long]
-
-Skip the first few lines, but only for the txt and csv.
-
-For example, set like following:
-
-`skip_header_row_number = 2`
-
-then Seatunnel will skip the first 2 lines from source files
-
-### file_format_type [string]
+#### file_format_type

-File type, supported as the following file types:
+> File type, supported as the following file types:

-`text` `csv` `parquet` `orc` `json` `excel`
+>
+> `text` `csv` `parquet` `orc` `json` `excel`

-If you assign file type to `json`, you should also assign schema option to tell the connector how to parse data to the row you want.
-
-For example:
-
-upstream data is the following:
-
-```json
+>
+> If you assign file type to `json`, you should also assign schema option to tell the connector how to parse data to the row you want.
+>
+> For example, upstream data is the following:
+
+```json
 {"code": 200, "data": "get success", "success": true}
 ```

-You can also save multiple pieces of data in one file and split them by one newline:
+> You can also save multiple pieces of data in one file and split them by one newline:

 ```json lines

@@ -142,7 +151,7 @@ You can also save multiple pieces of data in one file and split them by one newl

 ```

-you should assign schema as the following:
+> you should assign schema as the following:

 ```hocon

@@ -156,17 +165,17 @@ schema {

 ```

-connector will generate data as the following:
+> connector will generate data as the following:

 | code | data        | success |
 |------|-------------|---------|
 | 200  | get success | true    |

-If you assign file type to `parquet` `orc`, schema option not required, connector can find the schema of upstream data automatically.
-
-If you assign file type to `text` `csv`, you can choose to specify the schema information or not.
-
-For example, upstream data is the following:
+> If you assign file type to `parquet` `orc`, the schema option is not required; the connector can find the schema of upstream data automatically.
+>
+> If you assign file type to `text` `csv`, you can choose to specify the schema information or not.
+>
+> For example, upstream data is the following:

 ```text

@@ -174,15 +183,15 @@ tyrantlucifer#26#male

 ```

-If you do not assign data schema connector will treat the upstream data as the following:
+> If you do not assign data schema, the connector will treat the upstream data as the following:

 | content               |
 |-----------------------|
 | tyrantlucifer#26#male |

-If you assign data schema, you should also assign the option `delimiter` too except CSV file type
-
-you should assign schema and delimiter as the following:
+> If you assign data schema, you should also assign the option `delimiter`, except for the CSV file type
+>
+> you should assign schema and delimiter as the following:

 ```hocon

@@ -197,39 +206,23 @@ schema {

 ```

-connector will generate data as the following:
+> connector will generate data as the following:

 | name          | age | gender |
 |---------------|-----|--------|
 | tyrantlucifer | 26  | male   |

-### bucket [string]
-
-The bucket address of obs file system, for example: `obs://obs-bucket-name`
-
-### access_key [string]
-
-The access key of obs file system.
-
-### access_secret [string]
-
-The access secret of obs file system.
+#### schema

-### endpoint [string]
+##### fields

-The endpoint of obs file system.
+> The schema of upstream data.

-### schema [config]
+#### read_columns

-#### fields [Config]
-
-The schema of upstream data.
-
-### read_columns [list]
-
-The read column list of the data source, user can use it to implement field projection.
-
-The file type supported column projection as the following shown:
+> The read column list of the data source, user can use it to implement field projection.
+>
+> The file types that support column projection are shown as follows:

 - text
 - json
 - csv
 - orc
 - parquet
 - excel

-**Tips: If the user wants to use this feature when reading `text` `json` `csv` files, the schema option must be configured**
-
-### common options
+> If the user wants to use this feature when reading `text` `json` `csv` files, the schema option must be configured

-Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details.
+#### common options -### sheet_name [string] +> Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details. -Reader the sheet of the workbook,Only used when file_format is excel. +## Task Example -## Example +### orc file ```hocon @@ -263,6 +254,8 @@ Reader the sheet of the workbook,Only used when file_format is excel. ``` +### json file + ```hocon ObsFile { @@ -284,7 +277,7 @@ Reader the sheet of the workbook,Only used when file_format is excel. ## Changelog -### 2.3.1-SNAPSHOT 2023-04-14 +### next version - Add OBS File Source Connector From 993ea6a41c7d5ca22e8629b5ac83c288b5366a7f Mon Sep 17 00:00:00 2001 From: kim-up Date: Sun, 23 Apr 2023 16:50:31 +0800 Subject: [PATCH 07/11] [Connector-v2][obs] Add e2e --- docs/en/connector-v2/sink/ObsFile.md | 59 ++++++++- docs/en/connector-v2/source/ObsFile.md | 81 +++++++++++- .../connector-file-obs-e2e/pom.xml | 45 +++++++ .../e2e/connector/file/obs/ObsFileIT.java | 99 +++++++++++++++ .../test/resources/csv/fake_to_obs_csv.conf | 85 +++++++++++++ .../csv/obs_csv_projection_to_assert.conf | 102 +++++++++++++++ .../test/resources/csv/obs_csv_to_assert.conf | 120 ++++++++++++++++++ .../resources/excel/fake_to_obs_excel.conf | 85 +++++++++++++ .../excel/obs_excel_projection_to_assert.conf | 102 +++++++++++++++ .../resources/excel/obs_excel_to_assert.conf | 120 ++++++++++++++++++ .../resources/json/fake_to_obs_file_json.conf | 83 ++++++++++++ .../json/obs_file_json_to_assert.conf | 114 +++++++++++++++++ .../resources/orc/fake_to_obs_file_orc.conf | 84 ++++++++++++ .../obs_file_orc_projection_to_assert.conf | 81 ++++++++++++ .../resources/orc/obs_file_orc_to_assert.conf | 80 ++++++++++++ .../parquet/fake_to_obs_file_parquet.conf | 84 ++++++++++++ ...obs_file_parquet_projection_to_assert.conf | 81 ++++++++++++ .../parquet/obs_file_parquet_to_assert.conf | 80 ++++++++++++ .../resources/text/fake_to_obs_file_text.conf | 84 ++++++++++++ .../obs_file_text_projection_to_assert.conf | 115 +++++++++++++++++ .../text/obs_file_text_skip_headers.conf | 115 +++++++++++++++++ .../text/obs_file_text_to_assert.conf | 114 +++++++++++++++++ .../seatunnel-connector-v2-e2e/pom.xml | 1 + 23 files changed, 2003 insertions(+), 11 deletions(-) create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/pom.xml create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/obs/ObsFileIT.java create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/csv/fake_to_obs_csv.conf create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/csv/obs_csv_projection_to_assert.conf create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/csv/obs_csv_to_assert.conf create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/excel/fake_to_obs_excel.conf create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/excel/obs_excel_projection_to_assert.conf create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/excel/obs_excel_to_assert.conf create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/json/fake_to_obs_file_json.conf create mode 100644 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/json/obs_file_json_to_assert.conf
 create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/orc/fake_to_obs_file_orc.conf
 create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/orc/obs_file_orc_projection_to_assert.conf
 create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/orc/obs_file_orc_to_assert.conf
 create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/parquet/fake_to_obs_file_parquet.conf
 create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/parquet/obs_file_parquet_projection_to_assert.conf
 create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/parquet/obs_file_parquet_to_assert.conf
 create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/text/fake_to_obs_file_text.conf
 create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/text/obs_file_text_projection_to_assert.conf
 create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/text/obs_file_text_skip_headers.conf
 create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/text/obs_file_text_to_assert.conf

diff --git a/docs/en/connector-v2/sink/ObsFile.md b/docs/en/connector-v2/sink/ObsFile.md
index 9a207f69c21..cfb1ec8c55e 100644
--- a/docs/en/connector-v2/sink/ObsFile.md
+++ b/docs/en/connector-v2/sink/ObsFile.md
@@ -168,7 +168,7 @@ Please note that excel type does not support any compression format

 ```hocon

   ObsFile {
-    path="/seatunnel/sink"
+    path="/seatunnel/text"
     bucket = "obs://obs-bucket-name"
     access_key = "xxxxxxxxxxx"
     access_secret = "xxxxxxxxxxx"
@@ -196,7 +196,7 @@

   ObsFile {
-    path = "/seatunnel/sink"
+    path = "/seatunnel/parquet"
     bucket = "obs://obs-bucket-name"
     access_key = "xxxxxxxxxxx"
     access_secret = "xxxxxxxxxxxxxxxxx"
@@ -218,7 +218,7 @@

   ObsFile {
-    path="/seatunnel/sink"
+    path="/seatunnel/orc"
     bucket = "obs://obs-bucket-name"
     access_key = "xxxxxxxxxxx"
     access_secret = "xxxxxxxxxxx"
@@ -228,9 +228,60 @@

 ```

+### json file
+
+> For json file format simple config
+
+```hocon
+
+  ObsFile {
+    path = "/seatunnel/json"
+    bucket = "obs://obs-bucket-name"
+    access_key = "xxxxxxxxxxx"
+    access_secret = "xxxxxxxxxxx"
+    endpoint = "obs.xxxxx.myhuaweicloud.com"
+    file_format_type = "json"
+  }
+
+```
+
+### excel file
+
+> For excel file format simple config
+
+```hocon
+
+  ObsFile {
+    path = "/seatunnel/excel"
+    bucket = "obs://obs-bucket-name"
+    access_key = "xxxxxxxxxxx"
+    access_secret = "xxxxxxxxxxx"
+    endpoint = "obs.xxxxx.myhuaweicloud.com"
+    file_format_type = "excel"
+  }
+
+```
+
+### csv file
+
+> For csv file format simple config
+
+```hocon
+
+  ObsFile {
+    path = "/seatunnel/csv"
+    bucket = "obs://obs-bucket-name"
+    access_key = "xxxxxxxxxxx"
+    access_secret = "xxxxxxxxxxx"
+    endpoint = "obs.xxxxx.myhuaweicloud.com"
+    file_format_type = "csv"
+  }
+
+```
+
 ## Changelog

 ### next version

-- Add OBS Sink Connector
+- Add Obs Sink Connector

diff --git
a/docs/en/connector-v2/source/ObsFile.md b/docs/en/connector-v2/source/ObsFile.md
index 0ddbf06564b..b5363d77173 100644
--- a/docs/en/connector-v2/source/ObsFile.md
+++ b/docs/en/connector-v2/source/ObsFile.md
@@ -239,8 +239,44 @@ schema {

 ## Task Example

+### text file
+
+> For text file format simple config
+
+```hocon
+
+  ObsFile {
+    path = "/seatunnel/text"
+    bucket = "obs://obs-bucket-name"
+    access_key = "xxxxxxxxxxxxxxxxx"
+    access_secret = "xxxxxxxxxxxxxxxxxxxxxx"
+    endpoint = "obs.xxxxxx.myhuaweicloud.com"
+    file_format_type = "text"
+  }
+
+```
+
+### parquet file
+
+> For parquet file format simple config
+
+```hocon
+
+  ObsFile {
+    path = "/seatunnel/parquet"
+    bucket = "obs://obs-bucket-name"
+    access_key = "xxxxxxxxxxxxxxxxx"
+    access_secret = "xxxxxxxxxxxxxxxxxxxxxx"
+    endpoint = "obs.xxxxxx.myhuaweicloud.com"
+    file_format_type = "parquet"
+  }
+
+```
+
 ### orc file

+> For orc file format simple config
+
 ```hocon

   ObsFile {
@@ -256,6 +292,8 @@ schema {

 ### json file

+> For json file format simple config
+
 ```hocon

   ObsFile {
@@ -265,12 +303,41 @@ schema {
     access_secret = "xxxxxxxxxxxxxxxxxxxxxx"
     endpoint = "obs.xxxxxx.myhuaweicloud.com"
     file_format_type = "json"
-    schema {
-      fields {
-        id = int
-        name = string
-      }
-    }
+  }
+
+```
+
+### excel file
+
+> For excel file format simple config
+
+```hocon
+
+  ObsFile {
+    path = "/seatunnel/excel"
+    bucket = "obs://obs-bucket-name"
+    access_key = "xxxxxxxxxxxxxxxxx"
+    access_secret = "xxxxxxxxxxxxxxxxxxxxxx"
+    endpoint = "obs.xxxxxx.myhuaweicloud.com"
+    file_format_type = "excel"
+  }
+
+```
+
+### csv file
+
+> For csv file format simple config
+
+```hocon
+
+  ObsFile {
+    path = "/seatunnel/csv"
+    bucket = "obs://obs-bucket-name"
+    access_key = "xxxxxxxxxxxxxxxxx"
+    access_secret = "xxxxxxxxxxxxxxxxxxxxxx"
+    endpoint = "obs.xxxxxx.myhuaweicloud.com"
+    file_format_type = "csv"
+    delimiter = ","
   }

 ```
@@ -279,5 +346,5 @@ schema {

 ### next version

-- Add OBS File Source Connector
+- Add Obs File Source Connector

diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/pom.xml
new file mode 100644
index 00000000000..9ee7fd4f147
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/pom.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-connector-v2-e2e</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>connector-file-obs-e2e</artifactId>
+    <name>SeaTunnel : E2E : Connector V2 : File Obs</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-fake</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-file-obs</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-assert</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+
+</project>
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/obs/ObsFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/obs/ObsFileIT.java
new file mode 100644
index 00000000000..d37cd53e352
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/obs/ObsFileIT.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.file.obs;
+
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.container.flink.Flink13Container;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
+
+@Disabled("Requires OBS credentials to be filled into the test configs")
+public class ObsFileIT extends TestSuiteBase {
+
+    @TestTemplate
+    public void testObsFileReadAndWrite(TestContainer container)
+            throws IOException, InterruptedException {
+        if (container instanceof Flink13Container) {
+            return;
+        }
+        Container.ExecResult excelWriteResult =
+                container.executeJob("/excel/fake_to_obs_excel.conf");
+        Assertions.assertEquals(0, excelWriteResult.getExitCode(), excelWriteResult.getStderr());
+        Container.ExecResult excelReadResult =
+                container.executeJob("/excel/obs_excel_to_assert.conf");
+        Assertions.assertEquals(0, excelReadResult.getExitCode(), excelReadResult.getStderr());
+        Container.ExecResult excelProjectionReadResult =
+                container.executeJob("/excel/obs_excel_projection_to_assert.conf");
+        Assertions.assertEquals(
+                0, excelProjectionReadResult.getExitCode(), excelProjectionReadResult.getStderr());
+        // test write obs text file
+        Container.ExecResult textWriteResult =
+                container.executeJob("/text/fake_to_obs_file_text.conf");
+        Assertions.assertEquals(0, textWriteResult.getExitCode());
+        // test read skip header
+        Container.ExecResult textWriteAndSkipResult =
+                container.executeJob("/text/obs_file_text_skip_headers.conf");
+        Assertions.assertEquals(0, textWriteAndSkipResult.getExitCode());
+        // test read obs text file
+        Container.ExecResult textReadResult =
+                container.executeJob("/text/obs_file_text_to_assert.conf");
+        Assertions.assertEquals(0, textReadResult.getExitCode());
+        // test read obs text file with projection
+        Container.ExecResult textProjectionResult =
+                container.executeJob("/text/obs_file_text_projection_to_assert.conf");
+        Assertions.assertEquals(0, textProjectionResult.getExitCode());
+        // test write obs json file
+        Container.ExecResult jsonWriteResult =
+                container.executeJob("/json/fake_to_obs_file_json.conf");
+        Assertions.assertEquals(0, jsonWriteResult.getExitCode());
+        // test read obs json file
+        Container.ExecResult jsonReadResult =
+                container.executeJob("/json/obs_file_json_to_assert.conf");
+        Assertions.assertEquals(0, jsonReadResult.getExitCode());
+        // test write obs orc file
+        Container.ExecResult orcWriteResult =
+                container.executeJob("/orc/fake_to_obs_file_orc.conf");
+        Assertions.assertEquals(0, orcWriteResult.getExitCode());
+        // test read obs orc file
+        Container.ExecResult orcReadResult =
+                container.executeJob("/orc/obs_file_orc_to_assert.conf");
+        Assertions.assertEquals(0, orcReadResult.getExitCode());
+        // test read obs orc file with projection
+        Container.ExecResult
orcProjectionResult = + container.executeJob("/orc/obs_file_orc_projection_to_assert.conf"); + Assertions.assertEquals(0, orcProjectionResult.getExitCode()); + // test write obs parquet file + Container.ExecResult parquetWriteResult = + container.executeJob("/parquet/fake_to_obs_file_parquet.conf"); + Assertions.assertEquals(0, parquetWriteResult.getExitCode()); + // test read obs parquet file + Container.ExecResult parquetReadResult = + container.executeJob("/parquet/obs_file_parquet_to_assert.conf"); + Assertions.assertEquals(0, parquetReadResult.getExitCode()); + // test read obs parquet file with projection + Container.ExecResult parquetProjectionResult = + container.executeJob("/parquet/obs_file_parquet_projection_to_assert.conf"); + Assertions.assertEquals(0, parquetProjectionResult.getExitCode()); + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/csv/fake_to_obs_csv.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/csv/fake_to_obs_csv.conf new file mode 100644 index 00000000000..8ed1e64fce0 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/csv/fake_to_obs_csv.conf @@ -0,0 +1,85 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
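+#
+# Sink job for the csv e2e case: writes FakeSource rows to OBS as csv files under /seatunnel/csv.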
+# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + execution.parallelism = 1 + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local + job.mode = "BATCH" +} + +source { + FakeSource { + result_table_name = "fake" + schema = { + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(38, 18)" + c_timestamp = timestamp + c_row = { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(38, 18)" + c_timestamp = timestamp + } + } + } + } +} + +sink { + ObsFile { + path="/seatunnel/csv" + bucket = "obs://obs-bucket-name" + access_key = "" + access_secret = "" + endpoint = "obs.xxxxxx.myhuaweicloud.com" + partition_dir_expression="${k0}=${v0}" + is_partition_field_write_in_file=true + file_name_expression="${transactionId}_${now}" + file_format_type="csv" + filename_time_format="yyyy.MM.dd" + is_enable_transaction=true + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/csv/obs_csv_projection_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/csv/obs_csv_projection_to_assert.conf new file mode 100644 index 00000000000..da22e3e90a2 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/csv/obs_csv_projection_to_assert.conf @@ -0,0 +1,102 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
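+#
+# Reads the csv files back with a read_columns projection (c_string, c_boolean) and asserts that the projected fields are non-null.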
+# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +source { + ObsFile { + path="/seatunnel/csv" + bucket = "obs://obs-bucket-name" + access_key = "" + access_secret = "" + endpoint = "obs.xxxxxx.myhuaweicloud.com" + result_table_name = "fake" + file_format_type = csv + delimiter = "," + read_columns = [c_string, c_boolean] + skip_header_row_number = 1 + schema = { + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(38, 18)" + c_timestamp = timestamp + c_row = { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(38, 18)" + c_timestamp = timestamp + } + } + } + } +} + + +sink { + Assert { + rules { + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 5 + } + ], + field_rules = [ + { + field_name = c_string + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = c_boolean + field_type = boolean + field_value = [ + { + rule_type = NOT_NULL + } + ] + } + ] + } + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/csv/obs_csv_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/csv/obs_csv_to_assert.conf new file mode 100644 index 00000000000..52bbcf5ab95 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/csv/obs_csv_to_assert.conf @@ -0,0 +1,120 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
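+#
+# Reads the csv files back with the full schema and asserts the row count and non-null fields.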
+# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + execution.parallelism = 1 + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local + job.mode = "BATCH" +} + +source { + ObsFile { + path="/seatunnel/csv" + bucket = "obs://obs-bucket-name" + access_key = "" + access_secret = "" + endpoint = "obs.xxxxxx.myhuaweicloud.com" + result_table_name = "fake" + file_format_type = csv + delimiter = "," + skip_header_row_number = 1 + schema = { + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(38, 18)" + c_timestamp = timestamp + c_row = { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(38, 18)" + c_timestamp = timestamp + } + } + } + } +} + + +sink { + Assert { + rules { + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 5 + } + ], + field_rules = [ + { + field_name = c_string + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = c_boolean + field_type = boolean + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = c_double + field_type = double + field_value = [ + { + rule_type = NOT_NULL + } + ] + } + ] + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/excel/fake_to_obs_excel.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/excel/fake_to_obs_excel.conf new file mode 100644 index 00000000000..79ff16eb1af --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/excel/fake_to_obs_excel.conf @@ -0,0 +1,85 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
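+#
+# Sink job for the excel e2e case: writes FakeSource rows to OBS as excel files under /seatunnel/excel.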
+# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + execution.parallelism = 1 + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local + job.mode = "BATCH" +} + +source { + FakeSource { + result_table_name = "fake" + schema = { + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(38, 18)" + c_timestamp = timestamp + c_row = { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(38, 18)" + c_timestamp = timestamp + } + } + } + } +} + +sink { + ObsFile { + path="/seatunnel/excel" + bucket = "obs://obs-bucket-name" + access_key = "" + access_secret = "" + endpoint = "obs.xxxxxx.myhuaweicloud.com" + partition_dir_expression="${k0}=${v0}" + is_partition_field_write_in_file=true + file_name_expression="${transactionId}_${now}" + file_format_type="excel" + filename_time_format="yyyy.MM.dd" + is_enable_transaction=true + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/excel/obs_excel_projection_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/excel/obs_excel_projection_to_assert.conf new file mode 100644 index 00000000000..b2697e82b99 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/excel/obs_excel_projection_to_assert.conf @@ -0,0 +1,102 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
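+#
+# Reads the excel files back with a read_columns projection (c_string, c_boolean) and asserts that the projected fields are non-null.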
+# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +source { + ObsFile { + path="/seatunnel/excel" + bucket = "obs://obs-bucket-name" + access_key = "" + access_secret = "" + endpoint = "obs.xxxxxx.myhuaweicloud.com" + result_table_name = "fake" + file_format_type = excel + delimiter = ; + read_columns = [c_string, c_boolean] + skip_header_row_number = 1 + schema = { + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(38, 18)" + c_timestamp = timestamp + c_row = { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(38, 18)" + c_timestamp = timestamp + } + } + } + } +} + + +sink { + Assert { + rules { + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 5 + } + ], + field_rules = [ + { + field_name = c_string + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = c_boolean + field_type = boolean + field_value = [ + { + rule_type = NOT_NULL + } + ] + } + ] + } + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/excel/obs_excel_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/excel/obs_excel_to_assert.conf new file mode 100644 index 00000000000..45144959b0f --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/excel/obs_excel_to_assert.conf @@ -0,0 +1,120 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
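+#
+# Reads the excel files back with the full schema and asserts the row count and non-null fields.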
+# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + execution.parallelism = 1 + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local + job.mode = "BATCH" +} + +source { + ObsFile { + path="/seatunnel/excel" + bucket = "obs://obs-bucket-name" + access_key = "" + access_secret = "" + endpoint = "obs.xxxxxx.myhuaweicloud.com" + result_table_name = "fake" + file_format_type = excel + delimiter = ; + skip_header_row_number = 1 + schema = { + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(38, 18)" + c_timestamp = timestamp + c_row = { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(38, 18)" + c_timestamp = timestamp + } + } + } + } +} + + +sink { + Assert { + rules { + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 5 + } + ], + field_rules = [ + { + field_name = c_string + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = c_boolean + field_type = boolean + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = c_double + field_type = double + field_value = [ + { + rule_type = NOT_NULL + } + ] + } + ] + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/json/fake_to_obs_file_json.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/json/fake_to_obs_file_json.conf new file mode 100644 index 00000000000..1cd92373f3a --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/json/fake_to_obs_file_json.conf @@ -0,0 +1,83 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
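+#
+# Sink job for the json e2e case: writes FakeSource rows to OBS as json files under /seatunnel/json.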
+# + +env { + execution.parallelism = 1 + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local + job.mode = "BATCH" +} + +source { + FakeSource { + schema = { + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(38, 18)" + c_timestamp = timestamp + c_row = { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(38, 18)" + c_timestamp = timestamp + } + } + } + result_table_name = "fake" + } +} + +sink { + ObsFile { + path = "/seatunnel/json" + bucket = "obs://obs-bucket-name" + access_key = "" + access_secret = "" + endpoint = "obs.xxxxxx.myhuaweicloud.com" + row_delimiter = "\n" + partition_dir_expression = "${k0}=${v0}" + is_partition_field_write_in_file = true + file_name_expression = "${transactionId}_${now}" + file_format_type = "json" + filename_time_format = "yyyy.MM.dd" + is_enable_transaction = true + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/json/obs_file_json_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/json/obs_file_json_to_assert.conf new file mode 100644 index 00000000000..76f746bcb2b --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/json/obs_file_json_to_assert.conf @@ -0,0 +1,114 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
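+#
+# Reads the json files back with the full schema and asserts the row count and non-null fields.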
+# + +env { + execution.parallelism = 1 + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local + job.mode = "BATCH" +} + +source { + ObsFile { + path = "/seatunnel/json" + bucket = "obs://obs-bucket-name" + access_key = "" + access_secret = "" + endpoint = "obs.xxxxxx.myhuaweicloud.com" + file_format_type = "json" + schema = { + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(38, 18)" + c_timestamp = timestamp + c_row = { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(38, 18)" + c_timestamp = timestamp + } + } + } + result_table_name = "fake" + } +} + +sink { + Assert { + rules { + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 5 + } + ], + field_rules = [ + { + field_name = c_string + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = c_boolean + field_type = boolean + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = c_double + field_type = double + field_value = [ + { + rule_type = NOT_NULL + } + ] + } + ] + } + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/orc/fake_to_obs_file_orc.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/orc/fake_to_obs_file_orc.conf new file mode 100644 index 00000000000..bb531a3c13c --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/orc/fake_to_obs_file_orc.conf @@ -0,0 +1,84 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
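+#
+# Sink job for the orc e2e case: writes FakeSource rows to OBS as zlib-compressed orc files under /seatunnel/orc.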
+# + +env { + execution.parallelism = 1 + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local + job.mode = "BATCH" +} + +source { + FakeSource { + schema = { + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(38, 18)" + c_timestamp = timestamp + c_row = { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(38, 18)" + c_timestamp = timestamp + } + } + } + result_table_name = "fake" + } +} + +sink { + ObsFile { + path = "/seatunnel/orc" + bucket = "obs://obs-bucket-name" + access_key = "" + access_secret = "" + endpoint = "obs.xxxxxx.myhuaweicloud.com" + row_delimiter = "\n" + partition_dir_expression = "${k0}=${v0}" + is_partition_field_write_in_file = true + file_name_expression = "${transactionId}_${now}" + file_format_type = "orc" + filename_time_format = "yyyy.MM.dd" + is_enable_transaction = true + compress_codec = "zlib" + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/orc/obs_file_orc_projection_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/orc/obs_file_orc_projection_to_assert.conf new file mode 100644 index 00000000000..b89bed9a49d --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/orc/obs_file_orc_projection_to_assert.conf @@ -0,0 +1,81 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
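+#
+# Reads the orc files back with a read_columns projection (c_string, c_boolean, c_double) and asserts that the projected fields are non-null.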
+# + +env { + execution.parallelism = 1 + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local + job.mode = "BATCH" +} + +source { + ObsFile { + path = "/seatunnel/orc" + bucket = "obs://obs-bucket-name" + access_key = "" + access_secret = "" + endpoint = "obs.xxxxxx.myhuaweicloud.com" + file_format_type = "orc" + read_columns = [c_string, c_boolean, c_double] + result_table_name = "fake" + } +} + +sink { + Assert { + rules { + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 5 + } + ], + field_rules = [ + { + field_name = c_string + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = c_boolean + field_type = boolean + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = c_double + field_type = double + field_value = [ + { + rule_type = NOT_NULL + } + ] + } + ] + } + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/orc/obs_file_orc_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/orc/obs_file_orc_to_assert.conf new file mode 100644 index 00000000000..4d5ab63f5ec --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/orc/obs_file_orc_to_assert.conf @@ -0,0 +1,80 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
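+# E2E case: the ObsFile source reads the ORC files with the full schema inferred from the file and the Assert sink checks row count and nullability.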
+#
+
+env {
+  execution.parallelism = 1
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
+source {
+  ObsFile {
+    path = "/seatunnel/orc"
+    bucket = "obs://obs-bucket-name"
+    access_key = ""
+    access_secret = ""
+    endpoint = "obs.xxxxxx.myhuaweicloud.com"
+    file_format_type = "orc"
+    result_table_name = "fake"
+  }
+}
+
+sink {
+  Assert {
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 5
+        }
+      ],
+      field_rules = [
+        {
+          field_name = c_string
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_boolean
+          field_type = boolean
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_double
+          field_type = double
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/parquet/fake_to_obs_file_parquet.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/parquet/fake_to_obs_file_parquet.conf
new file mode 100644
index 00000000000..bf696c24946
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/parquet/fake_to_obs_file_parquet.conf
@@ -0,0 +1,84 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
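+# E2E case: FakeSource generates rows and the ObsFile sink writes them to /seatunnel/parquet as Parquet files with gzip compression.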
+#
+
+env {
+  execution.parallelism = 1
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_bytes = bytes
+        c_date = date
+        c_decimal = "decimal(38, 18)"
+        c_timestamp = timestamp
+        c_row = {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_bytes = bytes
+          c_date = date
+          c_decimal = "decimal(38, 18)"
+          c_timestamp = timestamp
+        }
+      }
+    }
+    result_table_name = "fake"
+  }
+}
+
+sink {
+  ObsFile {
+    path = "/seatunnel/parquet"
+    bucket = "obs://obs-bucket-name"
+    access_key = ""
+    access_secret = ""
+    endpoint = "obs.xxxxxx.myhuaweicloud.com"
+    row_delimiter = "\n"
+    partition_dir_expression = "${k0}=${v0}"
+    is_partition_field_write_in_file = true
+    file_name_expression = "${transactionId}_${now}"
+    file_format_type = "parquet"
+    filename_time_format = "yyyy.MM.dd"
+    is_enable_transaction = true
+    compress_codec = "gzip"
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/parquet/obs_file_parquet_projection_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/parquet/obs_file_parquet_projection_to_assert.conf
new file mode 100644
index 00000000000..3ca1c801222
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/parquet/obs_file_parquet_projection_to_assert.conf
@@ -0,0 +1,81 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
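+# E2E case: the ObsFile source reads the Parquet files back with a read_columns projection and the Assert sink checks the projected fields.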
+# + +env { + execution.parallelism = 1 + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local + job.mode = "BATCH" +} + +source { + ObsFile { + path = "/seatunnel/parquet" + bucket = "obs://obs-bucket-name" + access_key = "" + access_secret = "" + endpoint = "obs.xxxxxx.myhuaweicloud.com" + file_format_type = "parquet" + read_columns = [c_string, c_boolean, c_double] + result_table_name = "fake" + } +} + +sink { + Assert { + rules { + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 5 + } + ], + field_rules = [ + { + field_name = c_string + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = c_boolean + field_type = boolean + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = c_double + field_type = double + field_value = [ + { + rule_type = NOT_NULL + } + ] + } + ] + } + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/parquet/obs_file_parquet_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/parquet/obs_file_parquet_to_assert.conf new file mode 100644 index 00000000000..67b0146efb9 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/parquet/obs_file_parquet_to_assert.conf @@ -0,0 +1,80 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
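+# E2E case: the ObsFile source reads the Parquet files with the full schema inferred from the file and the Assert sink checks row count and nullability.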
+# + +env { + execution.parallelism = 1 + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local + job.mode = "BATCH" +} + +source { + ObsFile { + path = "/seatunnel/parquet" + bucket = "obs://obs-bucket-name" + access_key = "" + access_secret = "" + endpoint = "obs.xxxxxx.myhuaweicloud.com" + file_format_type = "parquet" + result_table_name = "fake" + } +} + +sink { + Assert { + rules { + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 5 + } + ], + field_rules = [ + { + field_name = c_string + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = c_boolean + field_type = boolean + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = c_double + field_type = double + field_value = [ + { + rule_type = NOT_NULL + } + ] + } + ] + } + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/text/fake_to_obs_file_text.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/text/fake_to_obs_file_text.conf new file mode 100644 index 00000000000..4b78f77d476 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/text/fake_to_obs_file_text.conf @@ -0,0 +1,84 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
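+# E2E case: FakeSource generates rows and the ObsFile sink writes them to /seatunnel/text as text files with lzo compression.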
+#
+
+env {
+  execution.parallelism = 1
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_bytes = bytes
+        c_date = date
+        c_decimal = "decimal(38, 18)"
+        c_timestamp = timestamp
+        c_row = {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_bytes = bytes
+          c_date = date
+          c_decimal = "decimal(38, 18)"
+          c_timestamp = timestamp
+        }
+      }
+    }
+    result_table_name = "fake"
+  }
+}
+
+sink {
+  ObsFile {
+    path = "/seatunnel/text"
+    bucket = "obs://obs-bucket-name"
+    access_key = ""
+    access_secret = ""
+    endpoint = "obs.xxxxxx.myhuaweicloud.com"
+    row_delimiter = "\n"
+    partition_dir_expression = "${k0}=${v0}"
+    is_partition_field_write_in_file = true
+    file_name_expression = "${transactionId}_${now}"
+    file_format_type = "text"
+    filename_time_format = "yyyy.MM.dd"
+    is_enable_transaction = true
+    compress_codec = "lzo"
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/text/obs_file_text_projection_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/text/obs_file_text_projection_to_assert.conf
new file mode 100644
index 00000000000..09853ce0672
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/text/obs_file_text_projection_to_assert.conf
@@ -0,0 +1,115 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
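+# E2E case: the ObsFile text source is given an explicit schema plus a read_columns projection and the Assert sink checks the projected fields.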
+#
+
+env {
+  execution.parallelism = 1
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
+source {
+  ObsFile {
+    path = "/seatunnel/text"
+    bucket = "obs://obs-bucket-name"
+    access_key = ""
+    access_secret = ""
+    endpoint = "obs.xxxxxx.myhuaweicloud.com"
+    file_format_type = "text"
+    read_columns = [c_string, c_boolean, c_double]
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_bytes = bytes
+        c_date = date
+        c_decimal = "decimal(38, 18)"
+        c_timestamp = timestamp
+        c_row = {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_bytes = bytes
+          c_date = date
+          c_decimal = "decimal(38, 18)"
+          c_timestamp = timestamp
+        }
+      }
+    }
+    result_table_name = "fake"
+  }
+}
+
+sink {
+  Assert {
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 5
+        }
+      ],
+      field_rules = [
+        {
+          field_name = c_string
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_boolean
+          field_type = boolean
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_double
+          field_type = double
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/text/obs_file_text_skip_headers.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/text/obs_file_text_skip_headers.conf
new file mode 100644
index 00000000000..452fb79fd8e
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/text/obs_file_text_skip_headers.conf
@@ -0,0 +1,115 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
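+# E2E case: the ObsFile text source skips one header row per file (skip_header_row_number = 1), so the Assert sink expects at most 4 rows instead of 5.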
+#
+
+env {
+  execution.parallelism = 1
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
+source {
+  ObsFile {
+    path = "/seatunnel/text"
+    bucket = "obs://obs-bucket-name"
+    access_key = ""
+    access_secret = ""
+    endpoint = "obs.xxxxxx.myhuaweicloud.com"
+    file_format_type = "text"
+    skip_header_row_number = 1
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_bytes = bytes
+        c_date = date
+        c_decimal = "decimal(38, 18)"
+        c_timestamp = timestamp
+        c_row = {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_bytes = bytes
+          c_date = date
+          c_decimal = "decimal(38, 18)"
+          c_timestamp = timestamp
+        }
+      }
+    }
+    result_table_name = "fake"
+  }
+}
+
+sink {
+  Assert {
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 4
+        }
+      ],
+      field_rules = [
+        {
+          field_name = c_string
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_boolean
+          field_type = boolean
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_double
+          field_type = double
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/text/obs_file_text_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/text/obs_file_text_to_assert.conf
new file mode 100644
index 00000000000..86742f67d42
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/text/obs_file_text_to_assert.conf
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
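+# E2E case: the ObsFile text source reads all columns with an explicit schema and the Assert sink checks row count and nullability.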
+#
+
+env {
+  execution.parallelism = 1
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
+source {
+  ObsFile {
+    path = "/seatunnel/text"
+    bucket = "obs://obs-bucket-name"
+    access_key = ""
+    access_secret = ""
+    endpoint = "obs.xxxxxx.myhuaweicloud.com"
+    file_format_type = "text"
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_bytes = bytes
+        c_date = date
+        c_decimal = "decimal(38, 18)"
+        c_timestamp = timestamp
+        c_row = {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_bytes = bytes
+          c_date = date
+          c_decimal = "decimal(38, 18)"
+          c_timestamp = timestamp
+        }
+      }
+    }
+    result_table_name = "fake"
+  }
+}
+
+sink {
+  Assert {
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 5
+        }
+      ],
+      field_rules = [
+        {
+          field_name = c_string
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_boolean
+          field_type = boolean
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_double
+          field_type = double
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index bce1027664d..de9934d497a 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -54,6 +54,7 @@
         <module>connector-hbase-e2e</module>
        <module>connector-maxcompute-e2e</module>
        <module>connector-rocketmq-e2e</module>
+        <module>connector-file-obs-e2e</module>

From c49aca99507d79b122339114b9d2e103879b2101 Mon Sep 17 00:00:00 2001
From: kim-up
Date: Sun, 23 Apr 2023 17:10:40 +0800
Subject: [PATCH 08/11] [Connector-v2][obs] Update e2e

---
 .../e2e/connector/file/obs/ObsFileIT.java          | 18 ++++++++++++++++--
 .../excel/obs_excel_projection_to_assert.conf      | 10 ++++++++++
 2 files changed, 26 insertions(+), 2 deletions(-)

diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/obs/ObsFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/obs/ObsFileIT.java
index d37cd53e352..c5a87959d60 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/obs/ObsFileIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/obs/ObsFileIT.java
@@ -28,7 +28,7 @@

 import java.io.IOException;

-@Disabled
+@Disabled("Please test it in your local environment with an OBS account configuration")
 public class ObsFileIT extends TestSuiteBase {

     @TestTemplate
@@ -37,16 +37,30 @@ public void testLocalFileReadAndWrite(TestContainer container)
         if (container instanceof Flink13Container) {
             return;
         }
+        // test write obs csv file
+        Container.ExecResult csvWriteResult = container.executeJob("/csv/fake_to_obs_csv.conf");
+        Assertions.assertEquals(0, csvWriteResult.getExitCode(), csvWriteResult.getStderr());
+        // test read obs csv file
+        Container.ExecResult csvReadResult =
+                container.executeJob("/csv/obs_csv_to_assert.conf");
+        Assertions.assertEquals(0, csvReadResult.getExitCode(), csvReadResult.getStderr());
+        // test read obs csv file with projection
+        Container.ExecResult csvProjectionReadResult =
+                container.executeJob("/csv/obs_csv_projection_to_assert.conf");
+        Assertions.assertEquals(
+                0, csvProjectionReadResult.getExitCode(), csvProjectionReadResult.getStderr());
+        // test write obs excel file
         Container.ExecResult excelWriteResult =
                 container.executeJob("/excel/fake_to_obs_excel.conf");
         Assertions.assertEquals(0, excelWriteResult.getExitCode(), excelWriteResult.getStderr());
+        // test read obs excel file
         Container.ExecResult excelReadResult =
                 container.executeJob("/excel/obs_excel_to_assert.conf");
         Assertions.assertEquals(0, excelReadResult.getExitCode(), excelReadResult.getStderr());
+        // test read obs excel file with projection
         Container.ExecResult excelProjectionReadResult =
                 container.executeJob("/excel/obs_excel_projection_to_assert.conf");
         Assertions.assertEquals(
-                0, excelReadResult.getExitCode(), excelProjectionReadResult.getStderr());
+                0, excelProjectionReadResult.getExitCode(), excelProjectionReadResult.getStderr());
         // test write obs text file
         Container.ExecResult textWriteResult =
                 container.executeJob("/text/fake_to_obs_file_text.conf");
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/excel/obs_excel_projection_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/excel/obs_excel_projection_to_assert.conf
index b2697e82b99..4ae33021fc5 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/excel/obs_excel_projection_to_assert.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/excel/obs_excel_projection_to_assert.conf
@@ -18,6 +18,16 @@
 ###### This config file is a demonstration of streaming processing in seatunnel config
 ######

+env {
+  execution.parallelism = 1
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
 source {
   ObsFile {
     path="/seatunnel/excel"

From 802ca97bfab2b279fdba2241f18d335bfca1489c Mon Sep 17 00:00:00 2001
From: kim-up
Date: Fri, 28 Jul 2023 15:45:28 +0800
Subject: [PATCH 09/11] [Connector-V2][file] fix pom

---
 seatunnel-connectors-v2/connector-file/pom.xml | 1 -
 1 file changed, 1 deletion(-)

diff --git a/seatunnel-connectors-v2/connector-file/pom.xml b/seatunnel-connectors-v2/connector-file/pom.xml
index bc222fec804..bacbb5a9bb8 100644
--- a/seatunnel-connectors-v2/connector-file/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/pom.xml
@@ -38,7 +38,6 @@
         <module>connector-file-base-hadoop</module>
        <module>connector-file-sftp</module>
        <module>connector-file-s3</module>
-        <module>connector-file-oss-jindo</module>
         <module>connector-file-obs</module>
        <module>connector-file-jindo-oss</module>

From 05aa579209b0cd3b51e06f1704f3d1ab15a9a637 Mon Sep 17 00:00:00 2001
From: kim-up
Date: Fri, 28 Jul 2023 15:46:46 +0800
Subject: [PATCH 10/11] [Connector-V2][file] fix pom

---
 seatunnel-dist/pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml
index 583f152e51c..7002b418a7a 100644
--- a/seatunnel-dist/pom.xml
+++ b/seatunnel-dist/pom.xml
@@ -514,7 +514,7 @@
             <version>${project.version}</version>
            <scope>provided</scope>
        </dependency>
-
+
         <dependency>
            <groupId>org.apache.seatunnel</groupId>
            <artifactId>connector-paimon</artifactId>

From f33c57b2d9dd5e6a54c48c2f0853c46c69c9657d Mon Sep 17 00:00:00 2001
From: hailin0
Date: Sat, 15 Jun 2024 14:06:20 +0800
Subject: [PATCH 11/11]
fix --- .../seatunnel/file/obs/config/ObsConfig.java | 4 ++-- .../file/obs/source/ObsFileSource.java | 23 ++++++++++++------- .../file/obs/source/ObsFileSourceFactory.java | 22 +++++++++--------- 3 files changed, 28 insertions(+), 21 deletions(-) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/config/ObsConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/config/ObsConfig.java index b42e83dfc61..a4893f6c153 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/config/ObsConfig.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/config/ObsConfig.java @@ -19,9 +19,9 @@ import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.Options; -import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig; +import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions; -public class ObsConfig extends BaseSourceConfig { +public class ObsConfig extends BaseSourceConfigOptions { public static final Option ACCESS_KEY = Options.key("access_key") .stringType() diff --git a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSource.java index 9668c0c12aa..cf3061a44a3 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSource.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSource.java @@ -23,11 +23,12 @@ import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; +import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.config.CheckConfigUtil; import org.apache.seatunnel.common.config.CheckResult; import org.apache.seatunnel.common.constants.PluginType; -import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat; import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType; import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode; @@ -69,10 +70,11 @@ public void prepare(Config pluginConfig) throws PrepareFailException { readStrategy = ReadStrategyFactory.of(pluginConfig.getString(ObsConfig.FILE_FORMAT_TYPE.key())); readStrategy.setPluginConfig(pluginConfig); - String path = pluginConfig.getString(ObsConfig.FILE_PATH.key()); hadoopConf = ObsConf.buildWithConfig(pluginConfig); + readStrategy.init(hadoopConf); + String path = pluginConfig.getString(ObsConfig.FILE_PATH.key()); try { - filePaths = readStrategy.getFileNamesByPath(hadoopConf, path); + filePaths = readStrategy.getFileNamesByPath(path); } catch (IOException e) { String errorMsg = 
String.format("Get file list from this path [%s] failed", path);
             throw new FileConnectorException(
@@ -83,7 +85,7 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
                 FileFormat.valueOf(
                         pluginConfig.getString(ObsConfig.FILE_FORMAT_TYPE.key()).toUpperCase());
         // only json text csv type support user-defined schema now
-        if (pluginConfig.hasPath(CatalogTableUtil.SCHEMA.key())) {
+        if (pluginConfig.hasPath(TableSchemaOptions.SCHEMA.key())) {
             switch (fileFormat) {
                 case CSV:
                 case TEXT:
@@ -97,22 +99,27 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
                 case ORC:
                 case PARQUET:
                     throw new FileConnectorException(
-                            CommonErrorCode.UNSUPPORTED_OPERATION,
+                            CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
                             "SeaTunnel does not support user-defined schema for [parquet, orc] files");
                 default:
                     // never got in there
                     throw new FileConnectorException(
-                            CommonErrorCode.ILLEGAL_ARGUMENT,
+                            CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
                             "SeaTunnel does not supported this file format");
             }
         } else {
+            if (filePaths.isEmpty()) {
+                // When the directory is empty, fall back to the default text schema
+                rowType = CatalogTableUtil.buildSimpleTextSchema();
+                return;
+            }
             try {
-                rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, filePaths.get(0));
+                rowType = readStrategy.getSeaTunnelRowTypeInfo(filePaths.get(0));
             } catch (FileConnectorException e) {
                 String errorMsg =
                         String.format("Get table schema from file [%s] failed", filePaths.get(0));
                 throw new FileConnectorException(
-                        CommonErrorCode.TABLE_SCHEMA_GET_FAILED, errorMsg, e);
+                        CommonErrorCodeDeprecated.TABLE_SCHEMA_GET_FAILED, errorMsg, e);
             }
         }
     }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSourceFactory.java
index fe8ac7efd79..e1cd0ee97ba 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSourceFactory.java
@@ -19,10 +19,10 @@

 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
-import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
 import org.apache.seatunnel.connectors.seatunnel.file.obs.config.ObsConfig;
@@ -46,20 +46,20 @@ public OptionRule optionRule() {
                 .required(ObsConfig.ACCESS_KEY)
                 .required(ObsConfig.ACCESS_SECRET)
                 .required(ObsConfig.ENDPOINT)
-                .required(BaseSourceConfig.FILE_FORMAT_TYPE)
+                .required(BaseSourceConfigOptions.FILE_FORMAT_TYPE)
                 .conditional(
-                        BaseSourceConfig.FILE_FORMAT_TYPE,
+                        BaseSourceConfigOptions.FILE_FORMAT_TYPE,
                         FileFormat.TEXT,
-                        BaseSourceConfig.DELIMITER)
+
BaseSourceConfigOptions.FIELD_DELIMITER) .conditional( - BaseSourceConfig.FILE_FORMAT_TYPE, + BaseSourceConfigOptions.FILE_FORMAT_TYPE, Arrays.asList( FileFormat.TEXT, FileFormat.JSON, FileFormat.EXCEL, FileFormat.CSV), - CatalogTableUtil.SCHEMA) - .optional(BaseSourceConfig.PARSE_PARTITION_FROM_PATH) - .optional(BaseSourceConfig.DATE_FORMAT) - .optional(BaseSourceConfig.DATETIME_FORMAT) - .optional(BaseSourceConfig.TIME_FORMAT) + TableSchemaOptions.SCHEMA) + .optional(BaseSourceConfigOptions.PARSE_PARTITION_FROM_PATH) + .optional(BaseSourceConfigOptions.DATE_FORMAT) + .optional(BaseSourceConfigOptions.DATETIME_FORMAT) + .optional(BaseSourceConfigOptions.TIME_FORMAT) .build(); }