diff --git a/docs/en/connector-v2/sink/Paimon.md b/docs/en/connector-v2/sink/Paimon.md
index 707a0dc0dbd..e0ce1350bb9 100644
--- a/docs/en/connector-v2/sink/Paimon.md
+++ b/docs/en/connector-v2/sink/Paimon.md
@@ -17,12 +17,14 @@ Sink connector for Apache Paimon. It can support cdc mode 、auto create table.
 | warehouse | String | Yes | - | Paimon warehouse path |
 | database | String | Yes | - | The database you want to access |
 | table | String | Yes | - | The table you want to access |
-| hdfs_site_path | String | No | - | |
+| hdfs_site_path | String | No | - | The file path of hdfs-site.xml |
 | schema_save_mode | Enum | No | CREATE_SCHEMA_WHEN_NOT_EXIST | The schema save mode |
 | data_save_mode | Enum | No | APPEND_DATA | The data save mode |
 | paimon.table.primary-keys | String | No | - | Default comma-separated list of columns (primary key) that identify a row in tables.(Notice: The partition field needs to be included in the primary key fields) |
 | paimon.table.partition-keys | String | No | - | Default comma-separated list of partition fields to use when creating tables. |
 | paimon.table.write-props | Map | No | - | Properties passed through to paimon table initialization, [reference](https://paimon.apache.org/docs/0.6/maintenance/configurations/#coreoptions). |
+| paimon.hadoop.conf | Map | No | - | Properties to set on the Hadoop configuration |
+| paimon.hadoop.conf-path | String | No | - | The directory from which the 'core-site.xml', 'hdfs-site.xml' and 'hive-site.xml' files are loaded |
 
 ## Examples
 
@@ -57,6 +59,48 @@ sink {
 }
 ```
 
+### Single table (with Hadoop HA and Kerberos configuration)
+
+```hocon
+env {
+  parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+}
+
+source {
+  Mysql-CDC {
+    base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
+    username = "root"
+    password = "******"
+    table-names = ["seatunnel.role"]
+  }
+}
+
+transform {
+}
+
+sink {
+  Paimon {
+    catalog_name="seatunnel_test"
+    warehouse="hdfs:///tmp/seatunnel/paimon/hadoop-sink/"
+    database="seatunnel"
+    table="role"
+    paimon.hadoop.conf = {
+      fs.defaultFS = "hdfs://nameservice1"
+      dfs.nameservices = "nameservice1"
+      dfs.ha.namenodes.nameservice1 = "nn1,nn2"
+      dfs.namenode.rpc-address.nameservice1.nn1 = "hadoop03:8020"
+      dfs.namenode.rpc-address.nameservice1.nn2 = "hadoop04:8020"
+      dfs.client.failover.proxy.provider.nameservice1 = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
+      dfs.client.use.datanode.hostname = "true"
+      security.kerberos.login.principal = "your-kerberos-principal"
+      security.kerberos.login.keytab = "your-kerberos-keytab-path"
+    }
+  }
+}
+```
+
 ### Single table with write props of paimon
 
 ```hocon
diff --git a/docs/zh/connector-v2/sink/Paimon.md b/docs/zh/connector-v2/sink/Paimon.md
index 306bc12b562..ef22add620d 100644
--- a/docs/zh/connector-v2/sink/Paimon.md
+++ b/docs/zh/connector-v2/sink/Paimon.md
@@ -17,12 +17,14 @@ Apache Paimon数据连接器。支持cdc写以及自动建表。
 | warehouse | 字符串 | 是 | - | Paimon warehouse路径 |
 | database | 字符串 | 是 | - | 数据库名称 |
 | table | 字符串 | 是 | - | 表名 |
-| hdfs_site_path | 字符串 | 否 | - | |
+| hdfs_site_path | 字符串 | 否 | - | hdfs-site.xml文件路径 |
 | schema_save_mode | 枚举 | 否 | CREATE_SCHEMA_WHEN_NOT_EXIST | Schema保存模式 |
 | data_save_mode | 枚举 | 否 | APPEND_DATA | 数据保存模式 |
 | paimon.table.primary-keys | 字符串 | 否 | - | 主键字段列表,联合主键使用逗号分隔(注意:分区字段需要包含在主键字段中) |
 | paimon.table.partition-keys | 字符串 | 否 | - | 分区字段列表,多字段使用逗号分隔 |
 | paimon.table.write-props | Map | 否 | - | Paimon表初始化指定的属性, [参考](https://paimon.apache.org/docs/0.6/maintenance/configurations/#coreoptions) |
+| paimon.hadoop.conf | Map | 否 | - | Hadoop配置文件属性信息 |
+| paimon.hadoop.conf-path | 字符串 | 否 | - | Hadoop配置文件目录,用于加载'core-site.xml', 'hdfs-site.xml', 'hive-site.xml'文件配置 |
 
 ## 示例
 
@@ -57,6 +59,48 @@ sink {
 }
 ```
 
+### 单表(指定hadoop HA配置和kerberos配置)
+
+```hocon
+env {
+  parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+}
+
+source {
+  Mysql-CDC {
+    base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
+    username = "root"
+    password = "******"
+    table-names = ["seatunnel.role"]
+  }
+}
+
+transform {
+}
+
+sink {
+  Paimon {
+    catalog_name="seatunnel_test"
+    warehouse="hdfs:///tmp/seatunnel/paimon/hadoop-sink/"
+    database="seatunnel"
+    table="role"
+    paimon.hadoop.conf = {
+      fs.defaultFS = "hdfs://nameservice1"
+      dfs.nameservices = "nameservice1"
+      dfs.ha.namenodes.nameservice1 = "nn1,nn2"
+      dfs.namenode.rpc-address.nameservice1.nn1 = "hadoop03:8020"
+      dfs.namenode.rpc-address.nameservice1.nn2 = "hadoop04:8020"
+      dfs.client.failover.proxy.provider.nameservice1 = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
+      dfs.client.use.datanode.hostname = "true"
+      security.kerberos.login.principal = "your-kerberos-principal"
+      security.kerberos.login.keytab = "your-kerberos-keytab-path"
+    }
+  }
+}
+```
+
 ### 指定paimon的写属性的单表
 
 ```hocon
diff --git a/seatunnel-connectors-v2/connector-paimon/pom.xml b/seatunnel-connectors-v2/connector-paimon/pom.xml
index 499165ea6fb..bc0756ecdc4 100644
--- a/seatunnel-connectors-v2/connector-paimon/pom.xml
+++ b/seatunnel-connectors-v2/connector-paimon/pom.xml
@@ -30,7 +30,7 @@ SeaTunnel : Connectors V2 : Paimon
 
-        <paimon.version>0.6.0-incubating</paimon.version>
+        <paimon.version>0.7.0-incubating</paimon.version>
 
@@ -46,6 +46,13 @@ ${paimon.version}
 
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-guava</artifactId>
+            <version>${project.version}</version>
+            <classifier>optional</classifier>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>seatunnel-hadoop3-3.1.4-uber</artifactId>
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogFactory.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogFactory.java
index b8c8eb10880..9994df1d86f 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogFactory.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogFactory.java
@@ -51,7 +51,9 @@ public OptionRule optionRule() {
                 PaimonSinkConfig.DATA_SAVE_MODE,
                 PaimonSinkConfig.PRIMARY_KEYS,
                 PaimonSinkConfig.PARTITION_KEYS,
-                PaimonSinkConfig.WRITE_PROPS)
+                PaimonSinkConfig.WRITE_PROPS,
+                PaimonSinkConfig.HADOOP_CONF,
+                PaimonSinkConfig.HADOOP_CONF_PATH)
                 .build();
     }
 }
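For reference, a minimal sketch of how the two new options (defined in `PaimonConfig` further down in this patch) read back from a `ReadonlyConfig`, built the same way the `PaimonSinkHdfsIT` test below builds its sink properties; the class name is hypothetical:

```java
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig;

import java.util.HashMap;
import java.util.Map;

public class HadoopConfOptionsSketch {
    public static void main(String[] args) {
        // Assemble the same option map a HOCON job file would produce.
        Map<String, Object> options = new HashMap<>();
        options.put("warehouse", "hdfs:///tmp/paimon");
        options.put("database", "seatunnel_namespace1");
        options.put("table", "st_test");
        Map<String, String> hadoopConf = new HashMap<>();
        hadoopConf.put("fs.defaultFS", "hdfs://nameservice1");
        options.put("paimon.hadoop.conf", hadoopConf);
        options.put("paimon.hadoop.conf-path", "/etc/hadoop/conf");

        ReadonlyConfig config = ReadonlyConfig.fromMap(options);
        // HADOOP_CONF defaults to an empty map, HADOOP_CONF_PATH has no default.
        System.out.println(config.get(PaimonConfig.HADOOP_CONF));
        System.out.println(config.get(PaimonConfig.HADOOP_CONF_PATH));
    }
}
```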
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogLoader.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogLoader.java
index bec66dbe3f2..1c00e6dc3b8 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogLoader.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogLoader.java
@@ -17,11 +17,14 @@
 
 package org.apache.seatunnel.connectors.seatunnel.paimon.catalog;
 
+import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonHadoopConfiguration;
 import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
+import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.paimon.security.PaimonSecurityContext;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogFactory;
@@ -31,31 +34,66 @@
 import java.io.Serializable;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 
 import static org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig.WAREHOUSE;
 
 @Slf4j
 public class PaimonCatalogLoader implements Serializable {
-    private PaimonSinkConfig config;
+    /** hdfs uri is required */
+    private static final String HDFS_DEF_FS_NAME = "fs.defaultFS";
 
+    private static final String HDFS_PREFIX = "hdfs://";
+    /** ********* Hdfs constants ************* */
+    private static final String HDFS_IMPL = "org.apache.hadoop.hdfs.DistributedFileSystem";
+
+    private static final String HDFS_IMPL_KEY = "fs.hdfs.impl";
+
+    private String warehouse;
+
+    private PaimonHadoopConfiguration paimonHadoopConfiguration;
+
+    public PaimonCatalogLoader(PaimonSinkConfig paimonSinkConfig) {
+        this.warehouse = paimonSinkConfig.getWarehouse();
+        this.paimonHadoopConfiguration = PaimonSecurityContext.loadHadoopConfig(paimonSinkConfig);
     }
 
     public Catalog loadCatalog() {
         // When using the seatunel engine, set the current class loader to prevent loading failures
         Thread.currentThread().setContextClassLoader(PaimonCatalogLoader.class.getClassLoader());
-        final String warehouse = config.getWarehouse();
-        final Map<String, String> optionsMap = new HashMap<>();
+        final Map<String, String> optionsMap = new HashMap<>(1);
         optionsMap.put(WAREHOUSE.key(), warehouse);
         final Options options = Options.fromMap(optionsMap);
-        final Configuration hadoopConf = new Configuration();
-        String hdfsSitePathOptional = config.getHdfsSitePath();
-        if (StringUtils.isNotBlank(hdfsSitePathOptional)) {
-            hadoopConf.addResource(new Path(hdfsSitePathOptional));
+        if (warehouse.startsWith(HDFS_PREFIX)) {
+            checkConfiguration(paimonHadoopConfiguration, HDFS_DEF_FS_NAME);
+            paimonHadoopConfiguration.set(HDFS_IMPL_KEY, HDFS_IMPL);
+        }
+        PaimonSecurityContext.shouldEnableKerberos(paimonHadoopConfiguration);
+        final CatalogContext catalogContext =
+                CatalogContext.create(options, paimonHadoopConfiguration);
+        try {
+            return PaimonSecurityContext.runSecured(
+                    () -> CatalogFactory.createCatalog(catalogContext));
+        } catch (Exception e) {
+            throw new PaimonConnectorException(
+                    PaimonConnectorErrorCode.LOAD_CATALOG,
+                    "Failed to perform SecurityContext.runSecured",
+                    e);
+        }
+    }
+
+    void checkConfiguration(Configuration configuration, String key) {
+        Iterator<Map.Entry<String, String>> entryIterator = configuration.iterator();
+        while (entryIterator.hasNext()) {
+            Map.Entry<String, String> entry = entryIterator.next();
+            if (entry.getKey().equals(key)) {
+                if (StringUtils.isBlank(entry.getValue())) {
+                    throw new IllegalArgumentException("The value of " + key + " is required");
+                }
+                return;
+            }
         }
-        final CatalogContext catalogContext = CatalogContext.create(options, hadoopConf);
-        return CatalogFactory.createCatalog(catalogContext);
+        throw new IllegalArgumentException(key + " is required");
     }
 }
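A minimal sketch of the new loading path, mirroring how `PaimonSinkHdfsIT` later in this patch drives it (class name and option values are illustrative, and an actual HDFS cluster must be reachable):

```java
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalogLoader;
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;

import org.apache.paimon.catalog.Catalog;

import java.util.HashMap;
import java.util.Map;

public class CatalogLoaderSketch {
    public static void main(String[] args) {
        Map<String, Object> options = new HashMap<>();
        options.put("warehouse", "hdfs:///tmp/paimon");
        options.put("database", "seatunnel_namespace1");
        options.put("table", "st_test");
        Map<String, String> hadoopConf = new HashMap<>();
        hadoopConf.put("fs.defaultFS", "hdfs://nameservice1");
        options.put("paimon.hadoop.conf", hadoopConf);

        PaimonSinkConfig sinkConfig = new PaimonSinkConfig(ReadonlyConfig.fromMap(options));
        // For an hdfs:// warehouse, loadCatalog() validates that fs.defaultFS is set
        // and runs the CatalogFactory call inside PaimonSecurityContext.runSecured.
        Catalog catalog = new PaimonCatalogLoader(sinkConfig).loadCatalog();
        System.out.println(catalog.listDatabases());
    }
}
```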
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java
index 0396e6223af..01207570837 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java
@@ -17,18 +17,31 @@
 
 package org.apache.seatunnel.connectors.seatunnel.paimon.config;
 
+import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
+import org.apache.seatunnel.shade.com.google.common.collect.ImmutableList;
+
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 
+import lombok.Getter;
+
 import java.io.Serializable;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
 
 /**
  * Utility class to store configuration options, used by {@link SeaTunnelSource} and {@link
  * SeaTunnelSink}.
  */
+@Getter
 public class PaimonConfig implements Serializable {
 
     public static final Option<String> WAREHOUSE =
@@ -61,9 +74,54 @@ public class PaimonConfig implements Serializable {
                     .noDefaultValue()
                     .withDescription("The read columns of the flink table store");
 
+    @Deprecated
     public static final Option<String> HDFS_SITE_PATH =
             Options.key("hdfs_site_path")
                     .stringType()
                     .noDefaultValue()
                     .withDescription("The file path of hdfs-site.xml");
+
+    public static final Option<Map<String, String>> HADOOP_CONF =
+            Options.key("paimon.hadoop.conf")
+                    .mapType()
+                    .defaultValue(new HashMap<>())
+                    .withDescription("Properties in hadoop conf");
+
+    public static final Option<String> HADOOP_CONF_PATH =
+            Options.key("paimon.hadoop.conf-path")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The specified loading path for the 'core-site.xml', 'hdfs-site.xml', 'hive-site.xml' files");
+
+    protected String catalogName;
+    protected String warehouse;
+    protected String namespace;
+    protected String table;
+    protected String hdfsSitePath;
+    protected Map<String, String> hadoopConfProps;
+    protected String hadoopConfPath;
+
+    public PaimonConfig(ReadonlyConfig readonlyConfig) {
+        this.catalogName = checkArgumentNotNull(readonlyConfig.get(CATALOG_NAME));
+        this.warehouse = checkArgumentNotNull(readonlyConfig.get(WAREHOUSE));
+        this.namespace = checkArgumentNotNull(readonlyConfig.get(DATABASE));
+        this.table = checkArgumentNotNull(readonlyConfig.get(TABLE));
+        this.hdfsSitePath = readonlyConfig.get(HDFS_SITE_PATH);
+        this.hadoopConfProps = readonlyConfig.get(HADOOP_CONF);
+        this.hadoopConfPath = readonlyConfig.get(HADOOP_CONF_PATH);
+    }
+
+    protected <T> T checkArgumentNotNull(T argument) {
+        checkNotNull(argument);
+        return argument;
+    }
+
+    @VisibleForTesting
+    public static List<String> stringToList(String value, String regex) {
+        if (value == null || value.isEmpty()) {
+            return ImmutableList.of();
+        }
+        return Arrays.stream(value.split(regex)).map(String::trim).collect(toList());
+    }
 }
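The `stringToList` helper hoisted up from `PaimonSinkConfig` keeps its trimming behavior for the comma-separated key options; a quick sketch (hypothetical class name):

```java
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig;

import java.util.List;

public class StringToListSketch {
    public static void main(String[] args) {
        // Whitespace around each element is trimmed, mirroring how the
        // paimon.table.primary-keys / partition-keys values are parsed.
        List<String> keys = PaimonConfig.stringToList("pk_id, dt ,name", ",");
        System.out.println(keys); // [pk_id, dt, name]
        // Null or empty input yields an empty immutable list.
        System.out.println(PaimonConfig.stringToList(null, ",")); // []
    }
}
```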
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonHadoopConfiguration.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonHadoopConfiguration.java
new file mode 100644
index 00000000000..71d30d0d21c
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonHadoopConfiguration.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.config;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.Serializable;
+
+/** A Hadoop {@link Configuration} that is also {@link Serializable}. */
+public class PaimonHadoopConfiguration extends Configuration implements Serializable {}
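`PaimonHadoopConfiguration` behaves exactly like a plain Hadoop `Configuration` — including entry iteration, which `PaimonCatalogLoader#checkConfiguration` above relies on — while also being marked `Serializable` so it can travel as a field of the sink. A small sketch (hypothetical class name):

```java
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonHadoopConfiguration;

import java.util.Map;

public class HadoopConfSketch {
    public static void main(String[] args) {
        PaimonHadoopConfiguration conf = new PaimonHadoopConfiguration();
        conf.set("fs.defaultFS", "hdfs://nameservice1");
        // Configuration implements Iterable<Map.Entry<String, String>>, which is
        // exactly what checkConfiguration iterates to find a required key.
        for (Map.Entry<String, String> entry : conf) {
            if ("fs.defaultFS".equals(entry.getKey())) {
                System.out.println(entry.getKey() + " = " + entry.getValue());
            }
        }
    }
}
```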
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
index d369c74bca9..b706e622f74 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
@@ -17,26 +17,19 @@
 
 package org.apache.seatunnel.connectors.seatunnel.paimon.config;
 
-import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
-import org.apache.seatunnel.shade.com.google.common.collect.ImmutableList;
-
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.sink.DataSaveMode;
 import org.apache.seatunnel.api.sink.SchemaSaveMode;
 
-import lombok.Data;
+import lombok.Getter;
 
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static java.util.stream.Collectors.toList;
-import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
-
-@Data
+@Getter
 public class PaimonSinkConfig extends PaimonConfig {
     public static final Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
             Options.key("schema_save_mode")
@@ -71,41 +64,18 @@ public class PaimonSinkConfig extends PaimonConfig {
                     .withDescription(
                             "Properties passed through to paimon table initialization, such as 'file.format', 'bucket'(org.apache.paimon.CoreOptions)");
 
-    private String catalogName;
-    private String warehouse;
-    private String namespace;
-    private String table;
-    private String hdfsSitePath;
     private SchemaSaveMode schemaSaveMode;
     private DataSaveMode dataSaveMode;
-    private Integer bucket;
     private List<String> primaryKeys;
     private List<String> partitionKeys;
     private Map<String, String> writeProps;
 
     public PaimonSinkConfig(ReadonlyConfig readonlyConfig) {
-        this.catalogName = checkArgumentNotNull(readonlyConfig.get(CATALOG_NAME));
-        this.warehouse = checkArgumentNotNull(readonlyConfig.get(WAREHOUSE));
-        this.namespace = checkArgumentNotNull(readonlyConfig.get(DATABASE));
-        this.table = checkArgumentNotNull(readonlyConfig.get(TABLE));
-        this.hdfsSitePath = readonlyConfig.get(HDFS_SITE_PATH);
+        super(readonlyConfig);
         this.schemaSaveMode = readonlyConfig.get(SCHEMA_SAVE_MODE);
        this.dataSaveMode = readonlyConfig.get(DATA_SAVE_MODE);
         this.primaryKeys = stringToList(readonlyConfig.get(PRIMARY_KEYS), ",");
         this.partitionKeys = stringToList(readonlyConfig.get(PARTITION_KEYS), ",");
         this.writeProps = readonlyConfig.get(WRITE_PROPS);
     }
-
-    protected <T> T checkArgumentNotNull(T argument) {
-        checkNotNull(argument);
-        return argument;
-    }
-
-    @VisibleForTesting
-    public static List<String> stringToList(String value, String regex) {
-        if (value == null || value.isEmpty()) {
-            return ImmutableList.of();
-        }
-        return Arrays.stream(value.split(regex)).map(String::trim).collect(toList());
-    }
 }
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/exception/PaimonConnectorErrorCode.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/exception/PaimonConnectorErrorCode.java
index 573f2db2de8..ef37e52c01e 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/exception/PaimonConnectorErrorCode.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/exception/PaimonConnectorErrorCode.java
@@ -24,7 +24,9 @@ public enum PaimonConnectorErrorCode implements SeaTunnelErrorCode {
     TABLE_WRITE_COMMIT_FAILED("PAIMON-01", "Paimon write commit failed"),
     TABLE_WRITE_RECORD_FAILED("PAIMON-02", "Write record to paimon failed"),
     TABLE_PRE_COMMIT_FAILED("PAIMON-03", "Paimon pre commit failed"),
-    GET_TABLE_FAILED("PAIMON-04", "Get table from database failed");
+    GET_TABLE_FAILED("PAIMON-04", "Get table from database failed"),
+    AUTHENTICATE_KERBEROS_FAILED("PAIMON-05", "Authenticate kerberos failed"),
+    LOAD_CATALOG("PAIMON-06", "Load catalog failed");
 
     private final String code;
     private final String description;
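The two new error codes surface through `PaimonConnectorException` like the existing ones; a sketch of what a caller would see (hypothetical class name, and the exact message format is approximate):

```java
import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;

public class ErrorCodeSketch {
    public static void main(String[] args) {
        try {
            throw new PaimonConnectorException(
                    PaimonConnectorErrorCode.LOAD_CATALOG,
                    "Failed to perform SecurityContext.runSecured");
        } catch (PaimonConnectorException e) {
            // Prints something like:
            // ErrorCode:[PAIMON-06], ErrorDescription:[Load catalog failed] - ...
            System.out.println(e.getMessage());
        }
    }
}
```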
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/security/PaimonSecurityContext.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/security/PaimonSecurityContext.java
new file mode 100644
index 00000000000..88cad4a7f60
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/security/PaimonSecurityContext.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.security;
+
+import org.apache.seatunnel.shade.com.google.common.collect.ImmutableList;
+
+import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig;
+import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonHadoopConfiguration;
+import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.authentication.util.KerberosName;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.security.SecurityConfiguration;
+import org.apache.paimon.security.SecurityContext;
+
+import lombok.extern.slf4j.Slf4j;
+import sun.security.krb5.Config;
+import sun.security.krb5.KrbException;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.List;
+
+@Slf4j
+public class PaimonSecurityContext extends SecurityContext {
+    private static final String KRB5_CONF_KEY = "java.security.krb5.conf";
+    private static final String FS_DISABLE_CACHE = "fs.hdfs.impl.disable.cache";
+    private static final List<String> HADOOP_CONF_FILES =
+            ImmutableList.of("core-site.xml", "hdfs-site.xml", "hive-site.xml");
+
+    public static void shouldEnableKerberos(Configuration configuration) {
+        String kerberosPrincipal =
+                configuration.get(SecurityConfiguration.KERBEROS_LOGIN_PRINCIPAL.key());
+        String kerberosKeytabFilePath =
+                configuration.get(SecurityConfiguration.KERBEROS_LOGIN_KEYTAB.key());
+        if (StringUtils.isNotBlank(kerberosPrincipal)
+                && StringUtils.isNotBlank(kerberosKeytabFilePath)) {
+            configuration.set("hadoop.security.authentication", "kerberos");
+            PaimonSecurityContext.verifyKerberosAuthentication(configuration);
+        }
+    }
+
+    /**
+     * Load the Hadoop configuration from the directory given by paimon.hadoop.conf-path and from
+     * the properties set by paimon.hadoop.conf.
+     *
+     * @return the populated PaimonHadoopConfiguration
+     */
+    public static PaimonHadoopConfiguration loadHadoopConfig(PaimonConfig paimonConfig) {
+        PaimonHadoopConfiguration configuration = new PaimonHadoopConfiguration();
+        String hdfsSitePath = paimonConfig.getHdfsSitePath();
+        if (StringUtils.isNotBlank(hdfsSitePath)) {
+            configuration.addResource(new Path(hdfsSitePath));
+        }
+        String hadoopConfPath = paimonConfig.getHadoopConfPath();
+        if (StringUtils.isNotBlank(hadoopConfPath)) {
+            HADOOP_CONF_FILES.forEach(
+                    confFile -> {
+                        java.nio.file.Path path = Paths.get(hadoopConfPath, confFile);
+                        if (Files.exists(path)) {
+                            try {
+                                configuration.addResource(path.toUri().toURL());
+                            } catch (IOException e) {
+                                log.warn(
+                                        "Error adding Hadoop resource {}, resource was not added",
+                                        path,
+                                        e);
+                            }
+                        }
+                    });
+        }
+        paimonConfig.getHadoopConfProps().forEach((k, v) -> configuration.set(k, v));
+        // This configuration is enabled to avoid affecting other hadoop filesystem jobs
+        // refer:
+        // org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy.createConfiguration
+        configuration.setBoolean(FS_DISABLE_CACHE, true);
+        log.info("Hadoop config initialized: {}", configuration.getClass().getName());
+        return configuration;
+    }
+
+    /**
+     * Perform kerberos authentication with the principal and keytab from the configuration.
+     *
+     * @param configuration the Hadoop configuration carrying the kerberos settings
+     */
+    public static void verifyKerberosAuthentication(Configuration configuration) {
+        String principalKey = SecurityConfiguration.KERBEROS_LOGIN_PRINCIPAL.key();
+        String keytabKey = SecurityConfiguration.KERBEROS_LOGIN_KEYTAB.key();
+        String kerberosPrincipal = configuration.get(principalKey);
+        String kerberosKeytabFilePath = configuration.get(keytabKey);
+        String krb5Conf = configuration.get(KRB5_CONF_KEY);
+        Options options = new Options();
+        options.set(principalKey, kerberosPrincipal);
+        options.set(keytabKey, kerberosKeytabFilePath);
+        String ticketCacheKey = SecurityConfiguration.KERBEROS_LOGIN_USETICKETCACHE.key();
+        boolean ticketCache =
+                configuration.getBoolean(
+                        ticketCacheKey,
+                        SecurityConfiguration.KERBEROS_LOGIN_USETICKETCACHE.defaultValue());
+        options.set(ticketCacheKey, String.valueOf(ticketCache));
+        try {
+            CatalogContext catalogContext = CatalogContext.create(options, configuration);
+            if (StringUtils.isNotBlank(krb5Conf)) {
+                reloadKrb5conf(krb5Conf);
+            }
+            // refer: https://paimon.apache.org/docs/master/filesystems/hdfs/#kerberos.
+            // If the principal or keytab is blank, or the keytab file does not exist, install()
+            // skips kerberos authentication without throwing any exception.
+            install(catalogContext);
+        } catch (Exception e) {
+            throw new PaimonConnectorException(
+                    PaimonConnectorErrorCode.AUTHENTICATE_KERBEROS_FAILED,
+                    "Failed to login user from keytab : "
+                            + kerberosKeytabFilePath
+                            + " and kerberos principal : "
+                            + kerberosPrincipal,
+                    e);
+        }
+    }
+
+    private static void reloadKrb5conf(String krb5conf) {
+        System.setProperty(KRB5_CONF_KEY, krb5conf);
+        try {
+            Config.refresh();
+            KerberosName.resetDefaultRealm();
+        } catch (KrbException e) {
+            log.warn(
+                    "resetting default realm failed, current default realm will still be used.", e);
+        }
+    }
+}
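Putting the pieces together, a sketch of how `loadHadoopConfig` and `shouldEnableKerberos` cooperate when both Kerberos properties are present (hypothetical class name; the principal and keytab values are placeholders, and the login itself needs a reachable KDC and a valid keytab):

```java
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig;
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonHadoopConfiguration;
import org.apache.seatunnel.connectors.seatunnel.paimon.security.PaimonSecurityContext;

import java.util.HashMap;
import java.util.Map;

public class KerberosSketch {
    public static void main(String[] args) {
        Map<String, Object> options = new HashMap<>();
        options.put("catalog_name", "seatunnel_test");
        options.put("warehouse", "hdfs:///tmp/paimon");
        options.put("database", "seatunnel");
        options.put("table", "role");
        Map<String, String> hadoopConf = new HashMap<>();
        hadoopConf.put("fs.defaultFS", "hdfs://nameservice1");
        hadoopConf.put("security.kerberos.login.principal", "your-kerberos-principal");
        hadoopConf.put("security.kerberos.login.keytab", "your-kerberos-keytab-path");
        options.put("paimon.hadoop.conf", hadoopConf);

        PaimonConfig paimonConfig = new PaimonConfig(ReadonlyConfig.fromMap(options));
        PaimonHadoopConfiguration conf = PaimonSecurityContext.loadHadoopConfig(paimonConfig);
        // Both principal and keytab are set, so this flips
        // hadoop.security.authentication to "kerberos" and attempts the login.
        PaimonSecurityContext.shouldEnableKerberos(conf);
    }
}
```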
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
index cdec4b0c760..bc96fdcd78e 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
@@ -32,9 +32,11 @@
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonHadoopConfiguration;
 import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
 import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.paimon.handler.PaimonSaveModeHandler;
+import org.apache.seatunnel.connectors.seatunnel.paimon.security.PaimonSecurityContext;
 import org.apache.seatunnel.connectors.seatunnel.paimon.sink.commit.PaimonAggregatedCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.paimon.sink.commit.PaimonAggregatedCommitter;
 import org.apache.seatunnel.connectors.seatunnel.paimon.sink.commit.PaimonCommitInfo;
@@ -74,11 +76,14 @@ public class PaimonSink
 
     private CatalogTable catalogTable;
 
+    private PaimonHadoopConfiguration paimonHadoopConfiguration;
+
     public PaimonSink(ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
         this.readonlyConfig = readonlyConfig;
         this.paimonSinkConfig = new PaimonSinkConfig(readonlyConfig);
         this.catalogTable = catalogTable;
         this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
+        this.paimonHadoopConfiguration = PaimonSecurityContext.loadHadoopConfig(paimonSinkConfig);
     }
 
     @Override
@@ -89,19 +94,22 @@ public String getPluginName() {
 
     @Override
     public SinkWriter<SeaTunnelRow, PaimonCommitInfo, PaimonSinkState> createWriter(
             SinkWriter.Context context) throws IOException {
-        return new PaimonSinkWriter(context, table, seaTunnelRowType, jobContext);
+        return new PaimonSinkWriter(
+                context, table, seaTunnelRowType, jobContext, paimonHadoopConfiguration);
     }
 
     @Override
     public Optional<SinkAggregatedCommitter<PaimonCommitInfo, PaimonAggregatedCommitInfo>>
             createAggregatedCommitter() throws IOException {
-        return Optional.of(new PaimonAggregatedCommitter(table, jobContext));
+        return Optional.of(
+                new PaimonAggregatedCommitter(table, jobContext, paimonHadoopConfiguration));
     }
 
     @Override
     public SinkWriter<SeaTunnelRow, PaimonCommitInfo, PaimonSinkState> restoreWriter(
             SinkWriter.Context context, List<PaimonSinkState> states) throws IOException {
-        return new PaimonSinkWriter(context, table, seaTunnelRowType, states, jobContext);
+        return new PaimonSinkWriter(
+                context, table, seaTunnelRowType, states, jobContext, paimonHadoopConfiguration);
     }
 
     @Override
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java
index 2f5b316dd56..dd656cd8ceb 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java
@@ -56,7 +56,9 @@ public OptionRule optionRule() {
                 PaimonSinkConfig.DATA_SAVE_MODE,
                 PaimonSinkConfig.PRIMARY_KEYS,
                 PaimonSinkConfig.PARTITION_KEYS,
-                PaimonSinkConfig.WRITE_PROPS)
+                PaimonSinkConfig.WRITE_PROPS,
+                PaimonConfig.HADOOP_CONF,
+                PaimonConfig.HADOOP_CONF_PATH)
                 .build();
     }
 
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
index a858c3ee7f1..88b3c1fa17d 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
@@ -22,8 +22,10 @@
 import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonHadoopConfiguration;
 import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.paimon.security.PaimonSecurityContext;
 import org.apache.seatunnel.connectors.seatunnel.paimon.sink.commit.PaimonCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.paimon.sink.state.PaimonSinkState;
 import org.apache.seatunnel.connectors.seatunnel.paimon.utils.JobContextUtil;
@@ -82,7 +84,8 @@ public PaimonSinkWriter(
             Context context,
             Table table,
             SeaTunnelRowType seaTunnelRowType,
-            JobContext jobContext) {
+            JobContext jobContext,
+            PaimonHadoopConfiguration paimonHadoopConfiguration) {
         this.table = table;
         this.tableWriteBuilder =
                 JobContextUtil.isBatchJob(jobContext)
@@ -93,6 +96,7 @@ public PaimonSinkWriter(
         this.context = context;
         this.jobContext = jobContext;
         this.tableSchema = ((FileStoreTable) table).schema();
+        PaimonSecurityContext.shouldEnableKerberos(paimonHadoopConfiguration);
     }
 
     public PaimonSinkWriter(
@@ -100,8 +104,9 @@ public PaimonSinkWriter(
             Context context,
             Table table,
             SeaTunnelRowType seaTunnelRowType,
             List<PaimonSinkState> states,
-            JobContext jobContext) {
-        this(context, table, seaTunnelRowType, jobContext);
+            JobContext jobContext,
+            PaimonHadoopConfiguration paimonHadoopConfiguration) {
+        this(context, table, seaTunnelRowType, jobContext, paimonHadoopConfiguration);
         if (Objects.isNull(states) || states.isEmpty()) {
             return;
         }
@@ -131,7 +136,11 @@ public PaimonSinkWriter(
     public void write(SeaTunnelRow element) throws IOException {
         InternalRow rowData = RowConverter.reconvert(element, seaTunnelRowType, tableSchema);
         try {
-            tableWrite.write(rowData);
+            PaimonSecurityContext.runSecured(
+                    () -> {
+                        tableWrite.write(rowData);
+                        return null;
+                    });
         } catch (Exception e) {
             throw new PaimonConnectorException(
                     PaimonConnectorErrorCode.TABLE_WRITE_RECORD_FAILED,
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java
index 2c0be5d4241..2135a328b05 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java
@@ -20,8 +20,10 @@
 import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
 import org.apache.seatunnel.api.sink.SupportMultiTableSinkAggregatedCommitter;
+import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonHadoopConfiguration;
 import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.paimon.security.PaimonSecurityContext;
 import org.apache.seatunnel.connectors.seatunnel.paimon.utils.JobContextUtil;
 
 import org.apache.paimon.operation.Lock;
@@ -55,12 +57,16 @@ public class PaimonAggregatedCommitter
 
     private final JobContext jobContext;
 
-    public PaimonAggregatedCommitter(Table table, JobContext jobContext) {
+    public PaimonAggregatedCommitter(
+            Table table,
+            JobContext jobContext,
+            PaimonHadoopConfiguration paimonHadoopConfiguration) {
         this.jobContext = jobContext;
         this.tableWriteBuilder =
                 JobContextUtil.isBatchJob(jobContext)
                         ?
table.newBatchWriteBuilder() : table.newStreamWriteBuilder(); + PaimonSecurityContext.shouldEnableKerberos(paimonHadoopConfiguration); } @Override @@ -73,14 +79,18 @@ public List commit( .flatMap(List::stream) .flatMap(List::stream) .collect(Collectors.toList()); - if (JobContextUtil.isBatchJob(jobContext)) { - log.debug("Trying to commit states batch mode"); - ((BatchTableCommit) tableCommit).commit(fileCommittables); - } else { - log.debug("Trying to commit states streaming mode"); - ((StreamTableCommit) tableCommit) - .commit(Objects.hash(fileCommittables), fileCommittables); - } + PaimonSecurityContext.runSecured( + () -> { + if (JobContextUtil.isBatchJob(jobContext)) { + log.debug("Trying to commit states batch mode"); + ((BatchTableCommit) tableCommit).commit(fileCommittables); + } else { + log.debug("Trying to commit states streaming mode"); + ((StreamTableCommit) tableCommit) + .commit(Objects.hash(fileCommittables), fileCommittables); + } + return null; + }); } catch (Exception e) { throw new PaimonConnectorException( PaimonConnectorErrorCode.TABLE_WRITE_COMMIT_FAILED, diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java index fe1c24da80a..cb45edac739 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java @@ -47,11 +47,11 @@ import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; import org.apache.paimon.types.TimestampType; +import org.apache.paimon.utils.DateTimeUtils; import java.math.BigDecimal; import java.time.LocalDate; import java.time.LocalDateTime; -import java.time.LocalTime; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -283,7 +283,8 @@ public static SeaTunnelRow convert(InternalRow rowData, SeaTunnelRowType seaTunn objects[i] = rowData.getBinary(i); break; case DATE: - objects[i] = rowData.getTimestamp(i, 3).toLocalDateTime().toLocalDate(); + int dateInt = rowData.getInt(i); + objects[i] = DateTimeUtils.toLocalDate(dateInt); break; case TIMESTAMP: // Now SeaTunnel not supported assigned the timezone for timestamp, @@ -391,9 +392,8 @@ public static InternalRow reconvert( break; case DATE: LocalDate date = (LocalDate) seaTunnelRow.getField(i); - LocalTime time = LocalTime.of(0, 0, 0); - binaryWriter.writeTimestamp( - i, Timestamp.fromLocalDateTime(date.atTime(time)), 3); + BinaryWriter.createValueSetter(DataTypes.DATE()) + .setValue(binaryWriter, i, DateTimeUtils.toInternal(date)); break; case TIMESTAMP: String fieldName = seaTunnelRowType.getFieldName(i); diff --git a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverterTest.java b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverterTest.java index eec61aea6df..fd0bf333816 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverterTest.java +++ b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverterTest.java @@ -42,6 +42,7 @@ import 
org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.DateTimeUtils; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -50,7 +51,6 @@ import java.math.BigDecimal; import java.time.LocalDate; import java.time.LocalDateTime; -import java.time.LocalTime; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -184,8 +184,7 @@ public void before() { binaryRowWriter.writeString(7, BinaryString.fromString(string)); binaryRowWriter.writeBinary(8, bytes); binaryRowWriter.writeBoolean(9, booleanValue); - binaryRowWriter.writeTimestamp( - 10, Timestamp.fromLocalDateTime(LocalDateTime.of(date, LocalTime.of(0, 0, 0))), 3); + binaryRowWriter.writeInt(10, DateTimeUtils.toInternal(date)); binaryRowWriter.writeTimestamp(11, Timestamp.fromLocalDateTime(timestamp), 6); BinaryArray binaryArray = new BinaryArray(); BinaryArrayWriter binaryArrayWriter = diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonRecord.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonRecord.java new file mode 100644 index 00000000000..13dcd3d675e --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonRecord.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +package org.apache.seatunnel.e2e.connector.paimon; + +import org.apache.paimon.data.Timestamp; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class PaimonRecord { + public Long pkId; + public String name; + public String dt; + public Timestamp oneTime; + public Timestamp twoTime; + public Timestamp threeTime; + public Timestamp fourTime; + public Integer oneDate; + + public PaimonRecord(Long pkId, String name) { + this.pkId = pkId; + this.name = name; + } + + public PaimonRecord(Long pkId, String name, String dt) { + this(pkId, name); + this.dt = dt; + } + + public PaimonRecord(Long pkId, String name, Integer oneDate) { + this(pkId, name); + this.oneDate = oneDate; + } + + public PaimonRecord( + Long pkId, + String name, + Timestamp oneTime, + Timestamp twoTime, + Timestamp threeTime, + Timestamp fourTime) { + this(pkId, name); + this.oneTime = oneTime; + this.twoTime = twoTime; + this.threeTime = threeTime; + this.fourTime = fourTime; + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java index d2d88c1dbcc..5bec4cd41cd 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java @@ -32,7 +32,6 @@ import org.apache.paimon.catalog.CatalogFactory; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.data.InternalRow; -import org.apache.paimon.data.Timestamp; import org.apache.paimon.options.Options; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.table.FileStoreTable; @@ -41,7 +40,9 @@ import org.apache.paimon.table.source.TableRead; import org.apache.paimon.table.source.TableScan; import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DateType; import org.apache.paimon.types.TimestampType; +import org.apache.paimon.utils.DateTimeUtils; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; @@ -49,12 +50,10 @@ import org.junit.jupiter.api.TestTemplate; import org.testcontainers.containers.Container; -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; import java.io.IOException; +import java.time.LocalDate; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; @@ -373,6 +372,57 @@ public void testFakeCDCSinkPaimonWithTimestampN(TestContainer container) throws }); } + @TestTemplate + public void testFakeSinkPaimonWithDate(TestContainer container) throws Exception { + Container.ExecResult execResult = container.executeJob("/fake_cdc_sink_paimon_case8.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + + given().ignoreExceptions() + .await() + .atLeast(100L, TimeUnit.MILLISECONDS) + .atMost(30L, TimeUnit.SECONDS) + .untilAsserted( + () -> { + // copy paimon to local + container.executeExtraCommands(containerExtendedFactory); + FileStoreTable table = + (FileStoreTable) getTable("seatunnel_namespace8", TARGET_TABLE); + List fields = table.schema().fields(); + for (DataField field : fields) { + if 
(field.name().equalsIgnoreCase("one_date")) { + Assertions.assertTrue(field.type() instanceof DateType); + } + } + ReadBuilder readBuilder = table.newReadBuilder(); + TableScan.Plan plan = readBuilder.newScan().plan(); + TableRead tableRead = readBuilder.newRead(); + List result = new ArrayList<>(); + try (RecordReader reader = tableRead.createReader(plan)) { + reader.forEachRemaining( + row -> + result.add( + new PaimonRecord( + row.getLong(0), + row.getString(1).toString(), + row.getInt(2)))); + } + Assertions.assertEquals(3, result.size()); + for (PaimonRecord paimonRecord : result) { + if (paimonRecord.getPkId() == 1) { + Assertions.assertEquals( + paimonRecord.oneDate, + DateTimeUtils.toInternal( + LocalDate.parse("2024-03-20"))); + } else { + Assertions.assertEquals( + paimonRecord.oneDate, + DateTimeUtils.toInternal( + LocalDate.parse("2024-03-10"))); + } + } + }); + } + protected final ContainerExtendedFactory containerExtendedFactory = container -> { FileUtils.deleteFile(CATALOG_ROOT_DIR + NAMESPACE_TAR); @@ -456,41 +506,4 @@ private Catalog getCatalog() { Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(options)); return catalog; } - - @Data - @NoArgsConstructor - @AllArgsConstructor - public class PaimonRecord { - private Long pkId; - private String name; - private String dt; - private Timestamp oneTime; - private Timestamp twoTime; - private Timestamp threeTime; - private Timestamp fourTime; - - public PaimonRecord(Long pkId, String name) { - this.pkId = pkId; - this.name = name; - } - - public PaimonRecord(Long pkId, String name, String dt) { - this(pkId, name); - this.dt = dt; - } - - public PaimonRecord( - Long pkId, - String name, - Timestamp oneTime, - Timestamp twoTime, - Timestamp threeTime, - Timestamp fourTime) { - this(pkId, name); - this.oneTime = oneTime; - this.twoTime = twoTime; - this.threeTime = threeTime; - this.fourTime = fourTime; - } - } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkHdfsIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkHdfsIT.java new file mode 100644 index 00000000000..72301bcf9d7 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkHdfsIT.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +package org.apache.seatunnel.e2e.connector.paimon; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalogLoader; +import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig; +import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.EngineType; +import org.apache.seatunnel.e2e.common.container.TestContainer; +import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.table.source.TableRead; +import org.apache.paimon.table.source.TableScan; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.TestTemplate; +import org.testcontainers.containers.Container; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.awaitility.Awaitility.given; + +@DisabledOnContainer( + value = {}, + type = {EngineType.SPARK, EngineType.FLINK}) +@Disabled( + "HDFS is not available in CI, if you want to run this test, please set up your own HDFS environment in the test case file and the below setup") +public class PaimonSinkHdfsIT extends TestSuiteBase { + private Map PAIMON_SINK_PROPERTIES; + + @BeforeAll + public void setup() { + Map map = new HashMap<>(); + map.put("warehouse", "hdfs:///tmp/paimon"); + map.put("database", "seatunnel_namespace1"); + map.put("table", "st_test"); + Map paimonHadoopConf = new HashMap<>(); + paimonHadoopConf.put("fs.defaultFS", "hdfs://nameservice1"); + paimonHadoopConf.put("dfs.nameservices", "nameservice1"); + paimonHadoopConf.put("dfs.ha.namenodes.nameservice1", "nn1,nn2"); + paimonHadoopConf.put("dfs.namenode.rpc-address.nameservice1.nn1", "hadoop03:8020"); + paimonHadoopConf.put("dfs.namenode.rpc-address.nameservice1.nn2", "hadoop04:8020"); + paimonHadoopConf.put( + "dfs.client.failover.proxy.provider.nameservice1", + "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"); + paimonHadoopConf.put("dfs.client.use.datanode.hostname", "true"); + map.put("paimon.hadoop.conf", paimonHadoopConf); + this.PAIMON_SINK_PROPERTIES = map; + } + + @TestTemplate + public void testFakeCDCSinkPaimon(TestContainer container) throws Exception { + Container.ExecResult execResult = + container.executeJob("/fake_cdc_sink_paimon_with_hdfs_ha.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + + given().ignoreExceptions() + .await() + .atLeast(200L, TimeUnit.MILLISECONDS) + .atMost(40L, TimeUnit.SECONDS) + .untilAsserted( + () -> { + PaimonSinkConfig paimonSinkConfig = + new PaimonSinkConfig( + ReadonlyConfig.fromMap(PAIMON_SINK_PROPERTIES)); + PaimonCatalogLoader paimonCatalogLoader = + new PaimonCatalogLoader(paimonSinkConfig); + Catalog catalog = paimonCatalogLoader.loadCatalog(); + Table table = + catalog.getTable( + Identifier.create("seatunnel_namespace1", "st_test")); + ReadBuilder readBuilder = table.newReadBuilder(); + TableScan.Plan plan = readBuilder.newScan().plan(); + TableRead tableRead = readBuilder.newRead(); + List paimonRecords = new ArrayList<>(); + try (RecordReader reader = 
tableRead.createReader(plan)) { + reader.forEachRemaining( + row -> + paimonRecords.add( + new PaimonRecord( + row.getLong(0), + row.getString(1).toString()))); + } + Assertions.assertEquals(2, paimonRecords.size()); + paimonRecords.forEach( + paimonRecord -> { + if (paimonRecord.getPkId() == 1) { + Assertions.assertEquals("A_1", paimonRecord.getName()); + } + if (paimonRecord.getPkId() == 3) { + Assertions.assertEquals("C", paimonRecord.getName()); + } + }); + }); + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case8.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case8.conf new file mode 100644 index 00000000000..2fc4910e98a --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case8.conf @@ -0,0 +1,93 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + FakeSource { + schema = { + columns = [ + { + name = pk_id + type = bigint + nullable = false + comment = "primary key id" + }, + { + name = name + type = "string" + nullable = true + comment = "name" + }, + { + name = one_date + type = date + nullable = false + comment = "one date" + } + ] + primaryKey { + name = "pk_id" + columnNames = [pk_id] + } + } + rows = [ + { + kind = INSERT + fields = [1, "A", "2024-03-10"] + }, + { + kind = INSERT + fields = [2, "B", "2024-03-10"] + }, + { + kind = INSERT + fields = [3, "C", "2024-03-10"] + }, + { + kind = INSERT + fields = [3, "C", "2024-03-10"] + }, + { + kind = UPDATE_BEFORE + fields = [1, "A", "2024-03-10"] + }, + { + kind = UPDATE_AFTER + fields = [1, "A_1", "2024-03-20"] + } + ] + } +} + +transform { + +} + +sink { + Paimon { + warehouse = "file:///tmp/paimon" + database = "seatunnel_namespace8" + table = "st_test" + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_with_hdfs_ha.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_with_hdfs_ha.conf new file mode 100644 index 00000000000..7e5fd6da948 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_with_hdfs_ha.conf @@ -0,0 +1,97 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + FakeSource { + schema = { + fields { + pk_id = bigint + name = string + score = int + } + primaryKey { + name = "pk_id" + columnNames = [pk_id] + } + } + rows = [ + { + kind = INSERT + fields = [1, "A", 100] + }, + { + kind = INSERT + fields = [2, "B", 100] + }, + { + kind = INSERT + fields = [3, "C", 100] + }, + { + kind = INSERT + fields = [3, "C", 100] + }, + { + kind = INSERT + fields = [3, "C", 100] + }, + { + kind = INSERT + fields = [3, "C", 100] + } + { + kind = UPDATE_BEFORE + fields = [1, "A", 100] + }, + { + kind = UPDATE_AFTER + fields = [1, "A_1", 100] + }, + { + kind = DELETE + fields = [2, "B", 100] + } + ] + } +} + +sink { + Paimon { + schema_save_mode = "RECREATE_SCHEMA" + catalog_name="seatunnel_test" + warehouse="hdfs:///tmp/paimon" + database="seatunnel_namespace1" + table="st_test" + paimon.hadoop.conf = { + fs.defaultFS = "hdfs://nameservice1" + dfs.nameservices = "nameservice1" + dfs.ha.namenodes.nameservice1 = "nn1,nn2" + dfs.namenode.rpc-address.nameservice1.nn1 = "hadoop03:8020" + dfs.namenode.rpc-address.nameservice1.nn2 = "hadoop04:8020" + dfs.client.failover.proxy.provider.nameservice1 = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" + dfs.client.use.datanode.hostname = "true" + } + } +}
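One more note on the `RowConverter` change earlier in this patch: DATE columns are now written as Paimon's internal int representation (days since epoch) instead of a midnight timestamp, which is what the new `fake_cdc_sink_paimon_case8.conf` e2e case asserts. A round-trip sketch using the same Paimon utility the converter and tests call (hypothetical class name):

```java
import org.apache.paimon.utils.DateTimeUtils;

import java.time.LocalDate;

public class PaimonDateSketch {
    public static void main(String[] args) {
        // Paimon stores DATE values as an int: days since epoch (1970-01-01).
        int internal = DateTimeUtils.toInternal(LocalDate.parse("2024-03-20"));
        System.out.println(internal); // 19802
        // Reading converts the int back to a LocalDate, as RowConverter#convert now does.
        System.out.println(DateTimeUtils.toLocalDate(internal)); // 2024-03-20
    }
}
```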