diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/catalog/HiveJDBCCatalog.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/catalog/HiveJDBCCatalog.java
index 22886023c16..6644297e9e3 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/catalog/HiveJDBCCatalog.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/catalog/HiveJDBCCatalog.java
@@ -98,7 +98,9 @@ public void open() throws CatalogException {
     @Override
     public void close() throws CatalogException {
         try {
-            connection.close();
+            if (connection != null) {
+                connection.close();
+            }
         } catch (SQLException e) {
             throw new CatalogException(e);
         }
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
index 7988999e162..cdfe1b83cfb 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
@@ -245,7 +245,6 @@ public Optional<SaveModeHandler> getSaveModeHandler() {
             return Optional.empty();
         }
         HiveJDBCCatalog catalog = new HiveJDBCCatalog(readonlyConfig);
-        catalog.open();
         return Optional.of(
                 new DefaultSaveModeHandler(
                         readonlyConfig.get(HiveSinkOptions.SCHEMA_SAVE_MODE),
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkFactory.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkFactory.java
index 1e7ea8f1e04..6687eec7efc 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkFactory.java
@@ -36,6 +36,8 @@
 import com.google.auto.service.AutoService;
 
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 
 @AutoService(Factory.class)
 public class HiveSinkFactory
@@ -84,6 +86,12 @@ private CatalogTable generateCatalogTable(
         String databaseName = fullTableName.split("\\.")[0];
         String tableName = fullTableName.split("\\.")[1];
         TableIdentifier newTableId = TableIdentifier.of("Hive", databaseName, null, tableName);
+
         return CatalogTable.of(newTableId, catalogTable);
     }
+
+    @Override
+    public List<String> excludeTablePlaceholderReplaceKeys() {
+        return Collections.singletonList("save_mode_create_template");
+    }
 }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveSaveModeIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveSaveModeIT.java
index 66239e04673..e09dd89c790 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveSaveModeIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveSaveModeIT.java
@@ -17,11 +17,210 @@
 
 package org.apache.seatunnel.e2e.connector.hive;
 
-import org.junit.jupiter.api.Disabled;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import com.google.common.collect.Lists;
 
 import lombok.extern.slf4j.Slf4j;
 
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
 @Slf4j
-@Disabled(
-        "Need both start hive metastore and hive server2, you can start the docker-compose in your local env, and run the test within example module.")
-public class HiveSaveModeIT {}
+@DisabledOnContainer(
+        value = {},
+        type = {EngineType.FLINK, EngineType.SPARK},
+        disabledReason = "The Hive save mode e2e case currently only runs on the Zeta engine")
+public class HiveSaveModeIT extends TestSuiteBase implements TestResource {
+
+    private static final String HIVE_IMAGE = "apache/hive:3.1.3";
+    private static final int THRIFT_PORT = 9083;
+    private static final int JDBC_PORT = 10000;
+    private GenericContainer<?> metastore;
+    private GenericContainer<?> hive2;
+    private String jdbcUrl;
+    private String hmsUrl;
+
+    @TestContainerExtension
+    private final ContainerExtendedFactory extendedFactory =
+            container -> {
+                // The jar of hive-exec
+                Container.ExecResult extraCommands =
+                        container.execInContainer(
+                                "sh",
+                                "-c",
+                                "mkdir -p /tmp/seatunnel/plugins/Hive/lib && cd /tmp/seatunnel/plugins/Hive/lib "
+                                        + "&& wget https://repo1.maven.org/maven2/org/apache/hive/hive-exec/3.1.3/hive-exec-3.1.3.jar "
+                                        + "&& wget https://repo1.maven.org/maven2/org/apache/hive/hive-service/3.1.3/hive-service-3.1.3.jar "
+                                        + "&& wget https://repo1.maven.org/maven2/org/apache/hive/hive-jdbc/3.1.3/hive-jdbc-3.1.3.jar "
+                                        + "&& wget https://repo1.maven.org/maven2/org/apache/thrift/libfb303/0.9.3/libfb303-0.9.3.jar");
+                Assertions.assertEquals(0, extraCommands.getExitCode(), extraCommands.getStderr());
+            };
+
+    @BeforeAll
+    @Override
+    public void startUp() throws Exception {
+        metastore =
+                new GenericContainer<>(HIVE_IMAGE)
+                        .withExposedPorts(THRIFT_PORT)
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases("metastore")
+                        .withEnv("SERVICE_NAME", "metastore")
+                        .withLogConsumer(
+                                new Slf4jLogConsumer(DockerLoggerFactory.getLogger(HIVE_IMAGE)));
+        metastore.setPortBindings(
+                Lists.newArrayList(String.format("%s:%s", THRIFT_PORT, THRIFT_PORT)));
+        Startables.deepStart(Stream.of(metastore)).join();
+
+        hive2 =
+                new GenericContainer<>(HIVE_IMAGE)
+                        .withExposedPorts(JDBC_PORT)
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases("hiveserver2")
+                        .withEnv("SERVICE_NAME", "hiveserver2")
+                        .withEnv(
+                                "SERVICE_OPTS",
+                                "-Dhive.metastore.uris=thrift://"
+                                        + InetAddress.getLocalHost().getHostAddress()
+                                        + ":9083")
+                        .withEnv("IS_RESUME", "true")
+                        .withLogConsumer(
+                                new Slf4jLogConsumer(DockerLoggerFactory.getLogger(HIVE_IMAGE)))
+                        .waitingFor(
+                                Wait.forListeningPort()
+                                        .withStartupTimeout(Duration.ofSeconds(180)));
+        hive2.setPortBindings(Lists.newArrayList(String.format("%s:%s", JDBC_PORT, JDBC_PORT)));
+        Startables.deepStart(Stream.of(hive2)).join();
+        createTable(
+                InetAddress.getLocalHost().getHostAddress(),
+                "10000",
+                "default",
+                "hive_jdbc_example2");
+        changeConnectionURLConf("src/test/resources/savemode/fake_to_hive_create_table.conf");
+        changeConnectionURLConf("src/test/resources/savemode/fake_to_hive_re_create_table.conf");
+        changeConnectionURLConf("src/test/resources/savemode/fake_to_hive_table_not_exist.conf");
+        changeConnectionURLConf("src/test/resources/savemode/hive1_to_assert.conf");
+        changeConnectionURLConf("src/test/resources/savemode/hive2_to_assert.conf");
+    }
+
+    @AfterAll
+    @Override
+    public void tearDown() throws Exception {
+        if (metastore != null) {
+            metastore.close();
+        }
+        if (hive2 != null) {
+            hive2.close();
+        }
+    }
+
+    private void createTable(String host, String port, String db, String tableName)
+            throws SQLException {
+        String jdbcUrl = "jdbc:hive2://" + host + ":" + port + "/" + db;
+        String ddl =
+                "CREATE TABLE "
+                        + tableName
+                        + "("
+                        + " int_column INT,"
+                        + " integer_column INTEGER,"
+                        + " bigint_column BIGINT,"
+                        + " smallint_column SMALLINT,"
+                        + " tinyint_column TINYINT,"
+                        + " double_column DOUBLE,"
+                        + " double_PRECISION_column DOUBLE PRECISION,"
+                        + " float_column FLOAT,"
+                        + " string_column STRING,"
+                        + " char_column CHAR(10),"
+                        + " varchar_column VARCHAR(20),"
+                        + " boolean_column BOOLEAN,"
+                        + " date_column DATE,"
+                        + " timestamp_column TIMESTAMP,"
+                        + " decimal_column DECIMAL(10, 2),"
+                        + " numeric_column NUMERIC(10, 2)"
+                        + ")";
+        // Close the JDBC connection and statement once the DDL has been executed
+        try (Connection connection = DriverManager.getConnection(jdbcUrl);
+                Statement statement = connection.createStatement()) {
+            statement.execute(ddl);
+        }
+    }
+
+    private void changeConnectionURLConf(String resourceFilePath) throws UnknownHostException {
+        jdbcUrl = "jdbc:hive2://" + InetAddress.getLocalHost().getHostAddress() + ":10000/default";
+        hmsUrl = "thrift://" + InetAddress.getLocalHost().getHostAddress() + ":9083";
+        Path path = Paths.get(resourceFilePath);
+        try {
+            List<String> lines = Files.readAllLines(path);
+            List<String> newLines =
+                    lines.stream()
+                            .map(
+                                    line -> {
+                                        if (line.contains("hive_jdbc_url")) {
+                                            return "    hive_jdbc_url = " + "\"" + jdbcUrl + "\"";
+                                        }
+                                        if (line.contains("metastore_uri")) {
+                                            return "    metastore_uri = " + "\"" + hmsUrl + "\"";
+                                        }
+                                        return line;
+                                    })
+                            .collect(Collectors.toList());
+            Files.write(path, newLines);
+            log.info("Conf has been updated successfully.");
+        } catch (IOException e) {
+            throw new RuntimeException("Change conf error", e);
+        }
+    }
+
+    @TestTemplate
+    public void testCreateTable(TestContainer container) throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                container.executeJob("/savemode/fake_to_hive_create_table.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        Container.ExecResult checkJobRes = container.executeJob("/savemode/hive1_to_assert.conf");
+        Assertions.assertEquals(0, checkJobRes.getExitCode());
+    }
+
+    @TestTemplate
+    public void testReCreateTable(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                container.executeJob("/savemode/fake_to_hive_re_create_table.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        Container.ExecResult checkJobRes = container.executeJob("/savemode/hive2_to_assert.conf");
+        Assertions.assertEquals(0, checkJobRes.getExitCode());
+    }
+
+    @TestTemplate
+    public void testTableExist(TestContainer container) throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                container.executeJob("/savemode/fake_to_hive_table_not_exist.conf");
+        Assertions.assertNotEquals(0, execResult.getExitCode());
+        Assertions.assertTrue(execResult.getStderr().contains("The sink table not exist"));
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/savemode/fake_to_hive_create_table.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/savemode/fake_to_hive_create_table.conf
index b22794907ae..e4094b258e7 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/savemode/fake_to_hive_create_table.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/savemode/fake_to_hive_create_table.conf
@@ -42,8 +42,8 @@ sink {
   Hive {
     source_table_name = "fake"
     table_name = "default.hive_jdbc_example1"
-    hive_jdbc_url = "jdbc:hive2://127.0.0.1:10000/default"
-    metastore_uri = "thrift://127.0.0.1:9083"
+    hive_jdbc_url = "jdbc:hive2://192.168.31.63:10000/default"
+    metastore_uri = "thrift://192.168.31.63:9083"
     schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
     save_mode_create_template = """
        create table default.${table_name}(
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/savemode/fake_to_hive_re_create_table.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/savemode/fake_to_hive_re_create_table.conf
index 1f7fea43cef..4f8ff594676 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/savemode/fake_to_hive_re_create_table.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/savemode/fake_to_hive_re_create_table.conf
@@ -43,8 +43,8 @@ sink {
   Hive {
     source_table_name = "fake"
     table_name = "default.hive_jdbc_example2"
-    hive_jdbc_url = "jdbc:hive2://hive:10000/default"
-    metastore_uri = "thrift://hive:9083"
+    hive_jdbc_url = "jdbc:hive2://192.168.31.63:10000/default"
+    metastore_uri = "thrift://192.168.31.63:9083"
     schema_save_mode = "RECREATE_SCHEMA"
     save_mode_create_template = """
        create table default.${table_name}(
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/savemode/fake_to_hive_table_exist.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/savemode/fake_to_hive_table_not_exist.conf
similarity index 92%
rename from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/savemode/fake_to_hive_table_exist.conf
rename to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/savemode/fake_to_hive_table_not_exist.conf
index 8bf26c3b9de..4a28b736b24 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/savemode/fake_to_hive_table_exist.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/savemode/fake_to_hive_table_not_exist.conf
@@ -43,8 +43,8 @@ sink {
   Hive {
     source_table_name = "fake"
     table_name = "default.hive_not_exist_table"
-    hive_jdbc_url = "jdbc:hive2://hive:10000/default"
-    metastore_uri = "thrift://hive:9083"
+    hive_jdbc_url = "jdbc:hive2://192.168.31.63:10000/default"
+    metastore_uri = "thrift://192.168.31.63:9083"
     schema_save_mode = "ERROR_WHEN_SCHEMA_NOT_EXIST"
   }
 }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/savemode/hive1_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/savemode/hive1_to_assert.conf
index a668ac729c1..b5612ea493e 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/savemode/hive1_to_assert.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/savemode/hive1_to_assert.conf
@@ -23,7 +23,7 @@ env {
 source {
   Hive {
     table_name = "default.hive_jdbc_example1"
-    metastore_uri = "thrift://127.0.0.1:9083"
+    metastore_uri = "thrift://192.168.31.63:9083"
     result_table_name = hive_source
   }
 }
@@ -36,10 +36,6 @@ sink {
         {
           rule_type = MIN_ROW
          rule_value = 100
-        },
-        {
-          rule_type = MAX_ROW
-          rule_value = 100
         }
       ],
       field_rules = [
@@ -64,4 +60,4 @@ sink {
       ]
     }
   }
-}
\ No newline at end of file
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/savemode/hive2_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/savemode/hive2_to_assert.conf
index a668ac729c1..392f1807af2 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/savemode/hive2_to_assert.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/savemode/hive2_to_assert.conf
@@ -22,8 +22,8 @@ env {
 
 source {
   Hive {
-    table_name = "default.hive_jdbc_example1"
-    metastore_uri = "thrift://127.0.0.1:9083"
+    table_name = "default.hive_jdbc_example2"
+    metastore_uri = "thrift://192.168.31.63:9083"
     result_table_name = hive_source
   }
 }
@@ -36,10 +36,6 @@ sink {
         {
           rule_type = MIN_ROW
          rule_value = 100
-        },
-        {
-          rule_type = MAX_ROW
-          rule_value = 100
         }
       ],
       field_rules = [
@@ -64,4 +60,4 @@ sink {
      ]
     }
   }
-}
\ No newline at end of file
+}
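For reference, the pieces above combine into a sink configuration along these lines. This is a minimal sketch: the table name, host names, and column list are illustrative placeholders rather than values fixed by this change; only the option names and the schema_save_mode values are taken from the files in the diff.

sink {
  Hive {
    table_name = "default.my_table"
    hive_jdbc_url = "jdbc:hive2://<hiveserver2-host>:10000/default"
    metastore_uri = "thrift://<metastore-host>:9083"
    # One of CREATE_SCHEMA_WHEN_NOT_EXIST, RECREATE_SCHEMA, ERROR_WHEN_SCHEMA_NOT_EXIST
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
    # This key is returned by excludeTablePlaceholderReplaceKeys(), presumably so that
    # ${table_name} survives until the save mode handler renders and executes the DDL.
    save_mode_create_template = """
       create table default.${table_name}(
         id INT,
         name STRING
       )
    """
  }
}

The three e2e cases exercise exactly these modes: testCreateTable (CREATE_SCHEMA_WHEN_NOT_EXIST), testReCreateTable (RECREATE_SCHEMA against the pre-created hive_jdbc_example2 table), and testTableExist (ERROR_WHEN_SCHEMA_NOT_EXIST, expecting a non-zero exit code).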