diff --git a/docs/en/connector-v2/sink/Paimon.md b/docs/en/connector-v2/sink/Paimon.md index c9e4b3a9b61..68c0755cfd3 100644 --- a/docs/en/connector-v2/sink/Paimon.md +++ b/docs/en/connector-v2/sink/Paimon.md @@ -61,6 +61,13 @@ All `changelog-producer` modes are currently supported. The default is `none`. > note: > When you use a streaming mode to read paimon table,different mode will produce [different results](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/source/Paimon.md#changelog)。 +## Filesystems +The Paimon connector supports writing data to multiple file systems. Currently, the supported file systems are hdfs and s3. +If you use the s3 filesystem. You can configure the `fs.s3a.access-key`、`fs.s3a.secret-key`、`fs.s3a.endpoint`、`fs.s3a.path.style.access`、`fs.s3a.aws.credentials.provider` properties in the `paimon.hadoop.conf` option. +Besides, the warehouse should start with `s3a://`. + + + ## Examples ### Single table @@ -94,6 +101,53 @@ sink { } ``` +### Single table with s3 filesystem + +```hocon +env { + execution.parallelism = 1 + job.mode = "BATCH" +} + +source { + FakeSource { + schema = { + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(38, 18)" + c_timestamp = timestamp + } + } + } +} + +sink { + Paimon { + warehouse = "s3a://test/" + database = "seatunnel_namespace11" + table = "st_test" + paimon.hadoop.conf = { + fs.s3a.access-key=G52pnxg67819khOZ9ezX + fs.s3a.secret-key=SHJuAQqHsLrgZWikvMa3lJf5T0NfM5LMFliJh9HF + fs.s3a.endpoint="http://minio4:9000" + fs.s3a.path.style.access=true + fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider + } + } +} +``` + ### Single table(Specify hadoop HA config and kerberos config) ```hocon diff --git a/docs/en/connector-v2/source/Paimon.md b/docs/en/connector-v2/source/Paimon.md index e586a4fd9d8..cbe3b592f8b 100644 --- a/docs/en/connector-v2/source/Paimon.md +++ b/docs/en/connector-v2/source/Paimon.md @@ -82,6 +82,11 @@ Properties in hadoop conf The specified loading path for the 'core-site.xml', 'hdfs-site.xml', 'hive-site.xml' files +## Filesystems +The Paimon connector supports writing data to multiple file systems. Currently, the supported file systems are hdfs and s3. +If you use the s3 filesystem. You can configure the `fs.s3a.access-key`、`fs.s3a.secret-key`、`fs.s3a.endpoint`、`fs.s3a.path.style.access`、`fs.s3a.aws.credentials.provider` properties in the `paimon.hadoop.conf` option. +Besides, the warehouse should start with `s3a://`. + ## Examples ### Simple example @@ -109,6 +114,33 @@ source { } ``` +### S3 example +```hocon +env { + execution.parallelism = 1 + job.mode = "BATCH" +} + +source { + Paimon { + warehouse = "s3a://test/" + database = "seatunnel_namespace11" + table = "st_test" + paimon.hadoop.conf = { + fs.s3a.access-key=G52pnxg67819khOZ9ezX + fs.s3a.secret-key=SHJuAQqHsLrgZWikvMa3lJf5T0NfM5LMFliJh9HF + fs.s3a.endpoint="http://minio4:9000" + fs.s3a.path.style.access=true + fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider + } + } +} + +sink { + Console{} +} +``` + ### Hadoop conf example ```hocon diff --git a/docs/zh/connector-v2/sink/Paimon.md b/docs/zh/connector-v2/sink/Paimon.md index 375c8c90caf..09f4e63fbfc 100644 --- a/docs/zh/connector-v2/sink/Paimon.md +++ b/docs/zh/connector-v2/sink/Paimon.md @@ -58,7 +58,12 @@ Paimon表的changelog产生模式有[四种](https://paimon.apache.org/docs/mast * [`lookup`](https://paimon.apache.org/docs/master/primary-key-table/changelog-producer/#lookup) * [`full-compaction`](https://paimon.apache.org/docs/master/primary-key-table/changelog-producer/#full-compaction) > 注意: - > 当你使用流模式去读paimon表的数据时,不同模式将会产生[不同的结果](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/source/Paimon.md#changelog)。 +> 当你使用流模式去读paimon表的数据时,不同模式将会产生[不同的结果](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/source/Paimon.md#changelog)。 + +## 文件系统 +Paimon连接器支持向多文件系统写入数据。目前支持的文件系统有hdfs和s3。 +如果您使用s3文件系统。您可以配置`fs.s3a.access-key `, `fs.s3a.secret-key`, `fs.s3a.endpoint`, `fs.s3a.path.style.access`, `fs.s3a.aws.credentials`。在`paimon.hadoop.conf`选项中设置提供程序的属性。 +除此之外,warehouse应该以`s3a://`开头。 ## 示例 @@ -93,6 +98,53 @@ sink { } ``` +### 单表(基于S3文件系统) + +```hocon +env { + execution.parallelism = 1 + job.mode = "BATCH" +} + +source { + FakeSource { + schema = { + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(38, 18)" + c_timestamp = timestamp + } + } + } +} + +sink { + Paimon { + warehouse = "s3a://test/" + database = "seatunnel_namespace11" + table = "st_test" + paimon.hadoop.conf = { + fs.s3a.access-key=G52pnxg67819khOZ9ezX + fs.s3a.secret-key=SHJuAQqHsLrgZWikvMa3lJf5T0NfM5LMFliJh9HF + fs.s3a.endpoint="http://minio4:9000" + fs.s3a.path.style.access=true + fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider + } + } +} +``` + ### 单表(指定hadoop HA配置和kerberos配置) ```hocon diff --git a/seatunnel-connectors-v2/connector-paimon/pom.xml b/seatunnel-connectors-v2/connector-paimon/pom.xml index 80934e68a2b..0cd3f535d0b 100644 --- a/seatunnel-connectors-v2/connector-paimon/pom.xml +++ b/seatunnel-connectors-v2/connector-paimon/pom.xml @@ -32,6 +32,7 @@ 0.7.0-incubating 2.3.9 + connector.paimon @@ -47,6 +48,12 @@ ${paimon.version} + + org.apache.paimon + paimon-s3-impl + ${paimon.version} + + org.apache.seatunnel seatunnel-guava @@ -98,4 +105,31 @@ + + + + org.apache.maven.plugins + maven-shade-plugin + + + + shade + + package + + + + org.apache.paimon:paimon-s3-impl + + org/apache/hadoop/** + + + + + + + + + + diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogLoader.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogLoader.java index 774576c408f..ae1f6d675a4 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogLoader.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogLoader.java @@ -44,6 +44,7 @@ public class PaimonCatalogLoader implements Serializable { private static final String HDFS_DEF_FS_NAME = "fs.defaultFS"; private static final String HDFS_PREFIX = "hdfs://"; + private static final String S3A_PREFIX = "s3a://"; /** ********* Hdfs constants ************* */ private static final String HDFS_IMPL = "org.apache.hadoop.hdfs.DistributedFileSystem"; @@ -63,7 +64,7 @@ public PaimonCatalogLoader(PaimonConfig paimonConfig) { } public Catalog loadCatalog() { - // When using the seatunel engine, set the current class loader to prevent loading failures + // When using the seatunnel engine, set the current class loader to prevent loading failures Thread.currentThread().setContextClassLoader(PaimonCatalogLoader.class.getClassLoader()); final Map optionsMap = new HashMap<>(1); optionsMap.put(CatalogOptions.WAREHOUSE.key(), warehouse); @@ -71,12 +72,12 @@ public Catalog loadCatalog() { if (warehouse.startsWith(HDFS_PREFIX)) { checkConfiguration(paimonHadoopConfiguration, HDFS_DEF_FS_NAME); paimonHadoopConfiguration.set(HDFS_IMPL_KEY, HDFS_IMPL); + } else if (warehouse.startsWith(S3A_PREFIX)) { + optionsMap.putAll(paimonHadoopConfiguration.getPropsWithPrefix(StringUtils.EMPTY)); } if (PaimonCatalogEnum.HIVE.getType().equals(catalogType.getType())) { optionsMap.put(CatalogOptions.URI.key(), catalogUri); - paimonHadoopConfiguration - .getPropsWithPrefix(StringUtils.EMPTY) - .forEach((k, v) -> optionsMap.put(k, v)); + optionsMap.putAll(paimonHadoopConfiguration.getPropsWithPrefix(StringUtils.EMPTY)); } final Options options = Options.fromMap(optionsMap); PaimonSecurityContext.shouldEnableKerberos(paimonHadoopConfiguration); diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/filesystem/S3Loader.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/filesystem/S3Loader.java new file mode 100644 index 00000000000..915070c8eac --- /dev/null +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/filesystem/S3Loader.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.paimon.filesystem; + +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.FileIOLoader; +import org.apache.paimon.fs.Path; +import org.apache.paimon.s3.S3FileIO; + +import java.util.ArrayList; +import java.util.List; + +public class S3Loader implements FileIOLoader { + @Override + public String getScheme() { + return "s3a"; + } + + @Override + public List requiredOptions() { + List options = new ArrayList<>(); + options.add(new String[] {"fs.s3a.access-key", "fs.s3a.access.key"}); + options.add(new String[] {"fs.s3a.secret-key", "fs.s3a.secret.key"}); + options.add(new String[] {"fs.s3a.endpoint", "fs.s3a.endpoint"}); + return options; + } + + @Override + public FileIO load(Path path) { + return new S3FileIO(); + } +} diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/resources/META-INF/services/org.apache.paimon.fs.FileIOLoader b/seatunnel-connectors-v2/connector-paimon/src/main/resources/META-INF/services/org.apache.paimon.fs.FileIOLoader new file mode 100644 index 00000000000..0057f404259 --- /dev/null +++ b/seatunnel-connectors-v2/connector-paimon/src/main/resources/META-INF/services/org.apache.paimon.fs.FileIOLoader @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.seatunnel.connectors.seatunnel.paimon.filesystem.S3Loader diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/pom.xml index 69ea9a9f74f..71784966f81 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/pom.xml @@ -25,17 +25,32 @@ connector-paimon-e2e SeaTunnel : E2E : Connector V2 : Paimon + + 1.19.1 + 8.5.6 + + + - org.apache.seatunnel - connector-fake - ${project.version} + org.testcontainers + minio + ${testcontainer.version} test + + io.minio + minio + ${minio.version} + test + + org.apache.seatunnel - connector-paimon + connector-seatunnel-e2e-base ${project.version} + tests + test-jar test @@ -44,6 +59,18 @@ optional test + + org.apache.seatunnel + connector-fake + ${project.version} + test + + + org.apache.seatunnel + connector-paimon + ${project.version} + test + org.apache.seatunnel connector-assert diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonWithS3IT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonWithS3IT.java new file mode 100644 index 00000000000..2df1a5e49b2 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonWithS3IT.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.e2e.connector.paimon; + +import org.apache.seatunnel.e2e.common.util.ContainerUtil; +import org.apache.seatunnel.engine.e2e.SeaTunnelContainer; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.MinIOContainer; + +import io.minio.BucketExistsArgs; +import io.minio.MakeBucketArgs; +import io.minio.MinioClient; + +import java.nio.file.Paths; +import java.util.Map; + +public class PaimonWithS3IT extends SeaTunnelContainer { + + private static final String MINIO_DOCKER_IMAGE = "minio/minio:RELEASE.2024-06-13T22-53-53Z"; + private static final String HOST = "minio"; + private static final int MINIO_PORT = 9000; + private static final String MINIO_USER_NAME = "minio"; + private static final String MINIO_USER_PASSWORD = "miniominio"; + + private static final String BUCKET = "test"; + + private MinIOContainer container; + private MinioClient minioClient; + + private Map PAIMON_SINK_PROPERTIES; + + protected static final String AWS_SDK_DOWNLOAD = + "https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.271/aws-java-sdk-bundle-1.11.271.jar"; + protected static final String HADOOP_AWS_DOWNLOAD = + "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.1.4/hadoop-aws-3.1.4.jar"; + + @Override + @BeforeAll + public void startUp() throws Exception { + container = + new MinIOContainer(MINIO_DOCKER_IMAGE) + .withNetwork(NETWORK) + .withNetworkAliases(HOST) + .withUserName(MINIO_USER_NAME) + .withPassword(MINIO_USER_PASSWORD) + .withExposedPorts(MINIO_PORT); + container.start(); + + String s3URL = container.getS3URL(); + + // configuringClient + minioClient = + MinioClient.builder() + .endpoint(s3URL) + .credentials(container.getUserName(), container.getPassword()) + .build(); + + // create bucket + minioClient.makeBucket(MakeBucketArgs.builder().bucket(BUCKET).build()); + + BucketExistsArgs existsArgs = BucketExistsArgs.builder().bucket(BUCKET).build(); + Assertions.assertTrue(minioClient.bucketExists(existsArgs)); + super.startUp(); + } + + @Override + @AfterAll + public void tearDown() throws Exception { + super.tearDown(); + if (container != null) { + container.close(); + } + } + + @Override + protected String[] buildStartCommand() { + return new String[] { + "bash", + "-c", + "wget -P " + + SEATUNNEL_HOME + + "lib " + + AWS_SDK_DOWNLOAD + + " &&" + + "wget -P " + + SEATUNNEL_HOME + + "lib " + + HADOOP_AWS_DOWNLOAD + + " &&" + + ContainerUtil.adaptPathForWin( + Paths.get(SEATUNNEL_HOME, "bin", SERVER_SHELL).toString()) + }; + } + + @Override + protected boolean isIssueWeAlreadyKnow(String threadName) { + return super.isIssueWeAlreadyKnow(threadName) + // Paimon with s3 + || threadName.startsWith("s3a-transfer"); + } + + @Test + public void testFaceCDCSinkPaimonWithS3Filesystem() throws Exception { + Container.ExecResult execResult = executeSeaTunnelJob("/fake_to_paimon_with_s3.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + + Container.ExecResult readResult = executeSeaTunnelJob("/paimon_with_s3_to_assert.conf"); + Assertions.assertEquals(0, readResult.getExitCode()); + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_s3.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_s3.conf new file mode 100644 index 00000000000..a379a638ebf --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_s3.conf @@ -0,0 +1,95 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local + job.mode = "BATCH" +} + +source { + FakeSource { + schema = { + fields { + pk_id = bigint + name = string + score = int + } + primaryKey { + name = "pk_id" + columnNames = [pk_id] + } + } + rows = [ + { + kind = INSERT + fields = [1, "A", 100] + }, + { + kind = INSERT + fields = [2, "B", 100] + }, + { + kind = INSERT + fields = [3, "C", 100] + }, + { + kind = INSERT + fields = [3, "C", 100] + }, + { + kind = INSERT + fields = [3, "C", 100] + }, + { + kind = INSERT + fields = [3, "C", 100] + } + { + kind = UPDATE_BEFORE + fields = [1, "A", 100] + }, + { + kind = UPDATE_AFTER + fields = [1, "A_1", 100] + }, + { + kind = DELETE + fields = [2, "B", 100] + } + ] + } +} + +sink { + Paimon { + warehouse = "s3a://test/" + database = "seatunnel_namespace11" + table = "st_test" + paimon.hadoop.conf = { + fs.s3a.access-key=minio + fs.s3a.secret-key=miniominio + fs.s3a.endpoint="http://minio:9000" + fs.s3a.path.style.access=true + fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_with_s3_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_with_s3_to_assert.conf new file mode 100644 index 00000000000..6684b5fa954 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_with_s3_to_assert.conf @@ -0,0 +1,98 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + execution.parallelism = 1 + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local + job.mode = "BATCH" +} + +source { + Paimon { + warehouse = "s3a://test/" + database = "seatunnel_namespace11" + table = "st_test" + paimon.hadoop.conf = { + fs.s3a.access-key=minio + fs.s3a.secret-key=miniominio + fs.s3a.endpoint="http://minio:9000" + fs.s3a.path.style.access=true + fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider + } + } +} + +sink { + Assert { + rules { + row_rules = [ + { + rule_type = MIN_ROW + rule_value = 2 + } + ], + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 2 + } + ], + field_rules = [ + { + field_name = pk_id + field_type = bigint + field_value = [ + { + rule_type = NOT_NULL + }, + { + rule_type = MIN + rule_value = 1 + }, + { + rule_type = MAX + rule_value = 3 + } + ] + }, + { + field_name = name + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = score + field_type = int + field_value = [ + { + rule_type = NOT_NULL + equals_to = 100 + } + ] + } + ] + } + } +} diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java index 5fa5abb7ed4..b9ff54b6c34 100644 --- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java @@ -439,7 +439,7 @@ private Map getThreadClassLoader() throws IOException { } /** The thread should be recycled but not, we should fix it in the future. */ - private boolean isIssueWeAlreadyKnow(String threadName) { + protected boolean isIssueWeAlreadyKnow(String threadName) { // ClickHouse com.clickhouse.client.ClickHouseClientBuilder return threadName.startsWith("ClickHouseClientWorker") // InfluxDB okio.AsyncTimeout$Watchdog