From 62b31d59c981d9ca86a468fe8713d08b2c7571ab Mon Sep 17 00:00:00 2001 From: sunxiaojian Date: Sat, 6 May 2023 10:43:59 +0800 Subject: [PATCH] Enabled seatunnel e2e and add doc --- .../kafka-compatible-kafkaconnect-json.md | 48 +++++++++++++++++++ docs/en/connector-v2/source/kafka.md | 5 ++ .../connector-kafka/pom.xml | 21 -------- .../seatunnel/kafka/config/MessageFormat.java | 1 - .../kafka/KafkaConnectToKafkaIT.java | 39 ++++++++------- .../pom.xml | 22 +++++++-- 6 files changed, 89 insertions(+), 47 deletions(-) create mode 100644 docs/en/connector-v2/formats/kafka-compatible-kafkaconnect-json.md diff --git a/docs/en/connector-v2/formats/kafka-compatible-kafkaconnect-json.md b/docs/en/connector-v2/formats/kafka-compatible-kafkaconnect-json.md new file mode 100644 index 000000000000..cfd211a062d3 --- /dev/null +++ b/docs/en/connector-v2/formats/kafka-compatible-kafkaconnect-json.md @@ -0,0 +1,48 @@ +# Kafka source compatible kafka-connect-json + +Seatunnel connector kafka supports parsing data extracted through kafka connect source, especially data extracted from kafka connect jdbc and kafka connect debezium + +# How to use + +## Kafka output to mysql + +```bash +env { + execution.parallelism = 1 + job.mode = "BATCH" +} + +source { + Kafka { + bootstrap.servers = "localhost:9092" + topic = "jdbc_source_record" + result_table_name = "kafka_table" + start_mode = earliest + schema = { + fields { + id = "int" + name = "string" + description = "string" + weight = "string" + } + }, + format = COMPATIBLE_KAFKA_CONNECT_JSON + from_debezium_record = false + } +} + + +sink { + Jdbc { + driver = com.mysql.cj.jdbc.Driver + url = "jdbc:mysql://localhost:3306/seatunnel" + user = st_user + password = seatunnel + generate_sink_sql = true + database = seatunnel + table = jdbc_sink + primary_keys = ["id"] + } +} +``` + diff --git a/docs/en/connector-v2/source/kafka.md b/docs/en/connector-v2/source/kafka.md index 2ed6ec6f12e4..85f00f0b72ee 100644 --- a/docs/en/connector-v2/source/kafka.md +++ b/docs/en/connector-v2/source/kafka.md @@ -34,6 +34,7 @@ Source connector for Apache Kafka. | start_mode.offsets | | no | | | start_mode.timestamp | Long | no | | | partition-discovery.interval-millis | long | no | -1 | +| from_debezium_record | boolean | no | false | ### topic [string] @@ -111,6 +112,10 @@ start_mode.offsets = { } ``` +## from_debezium_record + +Convert data captured by Debezium connect + ## Example ### Simple diff --git a/seatunnel-connectors-v2/connector-kafka/pom.xml b/seatunnel-connectors-v2/connector-kafka/pom.xml index 4bace2682efd..ce3f0bce4383 100644 --- a/seatunnel-connectors-v2/connector-kafka/pom.xml +++ b/seatunnel-connectors-v2/connector-kafka/pom.xml @@ -68,27 +68,6 @@ ${project.version} - - io.debezium - debezium-embedded - ${debezium.version} - - - org.apache.kafka - kafka-log4j-appender - - - org.glassfish.jersey.core - * - - - - - - io.debezium - debezium-connector-mongodb - ${debezium.version} - diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java index 81d13d53c04a..07f9a38ddffe 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java @@ -24,5 +24,4 @@ public enum MessageFormat { DEBEZIUM_JSON, COMPATIBLE_DEBEZIUM_JSON, COMPATIBLE_KAFKA_CONNECT_JSON - } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaConnectToKafkaIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaConnectToKafkaIT.java index b02583d9e66f..7a2551cfc2ba 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaConnectToKafkaIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaConnectToKafkaIT.java @@ -48,6 +48,7 @@ import org.testcontainers.utility.DockerImageName; import org.testcontainers.utility.DockerLoggerFactory; +import com.google.common.collect.Lists; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; @@ -57,13 +58,11 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.util.ArrayList; import java.util.Arrays; -import java.util.HashSet; import java.util.List; import java.util.Properties; -import java.util.Set; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import java.util.stream.Stream; import static org.awaitility.Awaitility.given; @@ -71,7 +70,7 @@ @Slf4j @DisabledOnContainer( value = {}, - type = {EngineType.SPARK, EngineType.SEATUNNEL}) + type = {EngineType.SPARK}) public class KafkaConnectToKafkaIT extends TestSuiteBase implements TestResource { private static final Logger LOG = LoggerFactory.getLogger(KafkaConnectToKafkaIT.class); @@ -186,14 +185,14 @@ public void testJdbcRecordKafkaToMysql(TestContainer container) Container.ExecResult execResult = container.executeJob("/kafkasource_jdbc_record_to_mysql.conf"); Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); - Set> actual = new HashSet<>(); + List actual = new ArrayList<>(); try (Connection connection = DriverManager.getConnection( MYSQL_CONTAINER.getJdbcUrl(), MYSQL_CONTAINER.getUsername(), MYSQL_CONTAINER.getPassword())) { try (Statement statement = connection.createStatement()) { - ResultSet resultSet = statement.executeQuery("select * from jdbc_sink"); + ResultSet resultSet = statement.executeQuery("select * from jdbc_sink order by id"); while (resultSet.next()) { List row = Arrays.asList( @@ -205,12 +204,12 @@ public void testJdbcRecordKafkaToMysql(TestContainer container) } } } - Set> expected = - Stream.>of( - Arrays.asList(15, "test", "test", "20"), - Arrays.asList(16, "test-001", "test", "30"), - Arrays.asList(18, "sdc", "sdc", "sdc")) - .collect(Collectors.toSet()); + + List expected = + Lists.newArrayList( + Arrays.asList(15, "test", "test", "20"), + Arrays.asList(16, "test-001", "test", "30"), + Arrays.asList(18, "sdc", "sdc", "sdc")); Assertions.assertIterableEquals(expected, actual); } @@ -220,14 +219,15 @@ public void testDebeziumRecordKafkaToMysql(TestContainer container) Container.ExecResult execResult = container.executeJob("/kafkasource_debezium_record_to_mysql.conf"); Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); - Set> actual = new HashSet<>(); + List actual = new ArrayList<>(); try (Connection connection = DriverManager.getConnection( MYSQL_CONTAINER.getJdbcUrl(), MYSQL_CONTAINER.getUsername(), MYSQL_CONTAINER.getPassword())) { try (Statement statement = connection.createStatement()) { - ResultSet resultSet = statement.executeQuery("select * from debezium_sink"); + ResultSet resultSet = + statement.executeQuery("select * from debezium_sink order by id"); while (resultSet.next()) { List row = Arrays.asList( @@ -239,12 +239,11 @@ public void testDebeziumRecordKafkaToMysql(TestContainer container) } } } - Set> expected = - Stream.>of( - Arrays.asList(15, "test", "test", "20"), - Arrays.asList(16, "test-001", "test", "30"), - Arrays.asList(18, "sdc", "sdc", "sdc")) - .collect(Collectors.toSet()); + List expected = + Lists.newArrayList( + Arrays.asList(15, "test", "test", "20"), + Arrays.asList(16, "test-001", "test", "30"), + Arrays.asList(18, "sdc", "sdc", "sdc")); Assertions.assertIterableEquals(expected, actual); } diff --git a/seatunnel-formats/seatunnel-format-compatible-connect-json/pom.xml b/seatunnel-formats/seatunnel-format-compatible-connect-json/pom.xml index 817cc70d37d0..f2357f6665b0 100644 --- a/seatunnel-formats/seatunnel-format-compatible-connect-json/pom.xml +++ b/seatunnel-formats/seatunnel-format-compatible-connect-json/pom.xml @@ -47,15 +47,18 @@ io.debezium debezium-embedded ${debezium.version} - provided + + org.slf4j + slf4j-api + org.apache.kafka - kafka-log4j-appender + connect-file - org.glassfish.jersey.core - * + org.apache.kafka + connect-runtime @@ -64,7 +67,16 @@ io.debezium debezium-connector-mongodb ${debezium.version} - provided + + + io.debezium + debezium-core + + + org.mongodb + mongodb-driver-sync + +