Skip to content

Commit

Permalink
Enabled seatunnel e2e and add doc
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Jul 29, 2023
1 parent a9e46e9 commit 62b31d5
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 47 deletions.
48 changes: 48 additions & 0 deletions docs/en/connector-v2/formats/kafka-compatible-kafkaconnect-json.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Kafka source compatible kafka-connect-json

Seatunnel connector kafka supports parsing data extracted through kafka connect source, especially data extracted from kafka connect jdbc and kafka connect debezium

# How to use

## Kafka output to mysql

```bash
env {
execution.parallelism = 1
job.mode = "BATCH"
}

source {
Kafka {
bootstrap.servers = "localhost:9092"
topic = "jdbc_source_record"
result_table_name = "kafka_table"
start_mode = earliest
schema = {
fields {
id = "int"
name = "string"
description = "string"
weight = "string"
}
},
format = COMPATIBLE_KAFKA_CONNECT_JSON
from_debezium_record = false
}
}


sink {
Jdbc {
driver = com.mysql.cj.jdbc.Driver
url = "jdbc:mysql://localhost:3306/seatunnel"
user = st_user
password = seatunnel
generate_sink_sql = true
database = seatunnel
table = jdbc_sink
primary_keys = ["id"]
}
}
```

5 changes: 5 additions & 0 deletions docs/en/connector-v2/source/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ Source connector for Apache Kafka.
| start_mode.offsets | | no | |
| start_mode.timestamp | Long | no | |
| partition-discovery.interval-millis | long | no | -1 |
| from_debezium_record | boolean | no | false |

### topic [string]

Expand Down Expand Up @@ -111,6 +112,10 @@ start_mode.offsets = {
}
```

## from_debezium_record

Convert data captured by Debezium connect

## Example

### Simple
Expand Down
21 changes: 0 additions & 21 deletions seatunnel-connectors-v2/connector-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -68,27 +68,6 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-embedded</artifactId>
<version>${debezium.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-log4j-appender</artifactId>
</exclusion>
<exclusion>
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mongodb</artifactId>
<version>${debezium.version}</version>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,4 @@ public enum MessageFormat {
DEBEZIUM_JSON,
COMPATIBLE_DEBEZIUM_JSON,
COMPATIBLE_KAFKA_CONNECT_JSON

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.utility.DockerLoggerFactory;

import com.google.common.collect.Lists;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -57,21 +58,19 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.awaitility.Awaitility.given;

@Slf4j
@DisabledOnContainer(
value = {},
type = {EngineType.SPARK, EngineType.SEATUNNEL})
type = {EngineType.SPARK})
public class KafkaConnectToKafkaIT extends TestSuiteBase implements TestResource {

private static final Logger LOG = LoggerFactory.getLogger(KafkaConnectToKafkaIT.class);
Expand Down Expand Up @@ -186,14 +185,14 @@ public void testJdbcRecordKafkaToMysql(TestContainer container)
Container.ExecResult execResult =
container.executeJob("/kafkasource_jdbc_record_to_mysql.conf");
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
Set<List<Object>> actual = new HashSet<>();
List<Object> actual = new ArrayList<>();
try (Connection connection =
DriverManager.getConnection(
MYSQL_CONTAINER.getJdbcUrl(),
MYSQL_CONTAINER.getUsername(),
MYSQL_CONTAINER.getPassword())) {
try (Statement statement = connection.createStatement()) {
ResultSet resultSet = statement.executeQuery("select * from jdbc_sink");
ResultSet resultSet = statement.executeQuery("select * from jdbc_sink order by id");
while (resultSet.next()) {
List<Object> row =
Arrays.asList(
Expand All @@ -205,12 +204,12 @@ public void testJdbcRecordKafkaToMysql(TestContainer container)
}
}
}
Set<List<Object>> expected =
Stream.<List<Object>>of(
Arrays.asList(15, "test", "test", "20"),
Arrays.asList(16, "test-001", "test", "30"),
Arrays.asList(18, "sdc", "sdc", "sdc"))
.collect(Collectors.toSet());

List<Object> expected =
Lists.newArrayList(
Arrays.asList(15, "test", "test", "20"),
Arrays.asList(16, "test-001", "test", "30"),
Arrays.asList(18, "sdc", "sdc", "sdc"));
Assertions.assertIterableEquals(expected, actual);
}

Expand All @@ -220,14 +219,15 @@ public void testDebeziumRecordKafkaToMysql(TestContainer container)
Container.ExecResult execResult =
container.executeJob("/kafkasource_debezium_record_to_mysql.conf");
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
Set<List<Object>> actual = new HashSet<>();
List<Object> actual = new ArrayList<>();
try (Connection connection =
DriverManager.getConnection(
MYSQL_CONTAINER.getJdbcUrl(),
MYSQL_CONTAINER.getUsername(),
MYSQL_CONTAINER.getPassword())) {
try (Statement statement = connection.createStatement()) {
ResultSet resultSet = statement.executeQuery("select * from debezium_sink");
ResultSet resultSet =
statement.executeQuery("select * from debezium_sink order by id");
while (resultSet.next()) {
List<Object> row =
Arrays.asList(
Expand All @@ -239,12 +239,11 @@ public void testDebeziumRecordKafkaToMysql(TestContainer container)
}
}
}
Set<List<Object>> expected =
Stream.<List<Object>>of(
Arrays.asList(15, "test", "test", "20"),
Arrays.asList(16, "test-001", "test", "30"),
Arrays.asList(18, "sdc", "sdc", "sdc"))
.collect(Collectors.toSet());
List<Object> expected =
Lists.newArrayList(
Arrays.asList(15, "test", "test", "20"),
Arrays.asList(16, "test-001", "test", "30"),
Arrays.asList(18, "sdc", "sdc", "sdc"));
Assertions.assertIterableEquals(expected, actual);
}

Expand Down
22 changes: 17 additions & 5 deletions seatunnel-formats/seatunnel-format-compatible-connect-json/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,18 @@
<groupId>io.debezium</groupId>
<artifactId>debezium-embedded</artifactId>
<version>${debezium.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-log4j-appender</artifactId>
<artifactId>connect-file</artifactId>
</exclusion>
<exclusion>
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>*</artifactId>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-runtime</artifactId>
</exclusion>
</exclusions>
</dependency>
Expand All @@ -64,7 +67,16 @@
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mongodb</artifactId>
<version>${debezium.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-sync</artifactId>
</exclusion>
</exclusions>
</dependency>

</dependencies>
Expand Down

0 comments on commit 62b31d5

Please sign in to comment.