diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java index c10ab3e0613..ed04fb0f5d7 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java @@ -27,6 +27,7 @@ import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.source.SourceReader; import org.apache.seatunnel.api.source.SourceSplitEnumerator; +import org.apache.seatunnel.api.source.SupportCoordinate; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig; @@ -76,7 +77,7 @@ @NoArgsConstructor public abstract class IncrementalSource - implements SeaTunnelSource { + implements SeaTunnelSource, SupportCoordinate { protected ReadonlyConfig readonlyConfig; protected SourceConfig.Factory configFactory; diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java index 7fb75064a4c..41ab0aef755 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java @@ -313,19 +313,22 @@ private void setCheckpoint() { } } - public void registerResultTable(Config config, DataStream dataStream) { - if (config.hasPath(RESULT_TABLE_NAME)) { - String name = config.getString(RESULT_TABLE_NAME); - StreamTableEnvironment tableEnvironment = this.getStreamTableEnvironment(); - if (!TableUtil.tableExists(tableEnvironment, name)) { + public void registerResultTable( + Config config, DataStream dataStream, String name, Boolean isAppend) { + StreamTableEnvironment tableEnvironment = this.getStreamTableEnvironment(); + if (!TableUtil.tableExists(tableEnvironment, name)) { + if (isAppend) { if (config.hasPath("field_name")) { String fieldName = config.getString("field_name"); tableEnvironment.registerDataStream(name, dataStream, fieldName); - } else { - tableEnvironment.registerDataStream(name, dataStream); + return; } + tableEnvironment.registerDataStream(name, dataStream); + return; } } + tableEnvironment.createTemporaryView( + name, tableEnvironment.fromChangelogStream(dataStream)); } public static FlinkRuntimeEnvironment getInstance(Config config) { diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java index e9d36ba068e..6c61f61b957 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java @@ -31,15 +31,19 @@ import java.net.URL; import java.net.URLClassLoader; +import java.util.HashMap; import java.util.List; import java.util.Optional; import java.util.function.BiConsumer; +import static org.apache.seatunnel.api.common.CommonOptions.RESULT_TABLE_NAME; + public abstract class FlinkAbstractPluginExecuteProcessor implements PluginExecuteProcessor, FlinkRuntimeEnvironment> { protected static final String ENGINE_TYPE = "seatunnel"; protected static final String PLUGIN_NAME = "plugin_name"; protected static final String SOURCE_TABLE_NAME = "source_table_name"; + protected static HashMap isAppendMap = new HashMap<>(); protected static final BiConsumer ADD_URL_TO_CLASSLOADER = (classLoader, url) -> { @@ -76,14 +80,41 @@ protected Optional> fromSourceTable(Config pluginConfig) { if (pluginConfig.hasPath(SOURCE_TABLE_NAME)) { StreamTableEnvironment tableEnvironment = flinkRuntimeEnvironment.getStreamTableEnvironment(); - Table table = tableEnvironment.from(pluginConfig.getString(SOURCE_TABLE_NAME)); - return Optional.ofNullable(TableUtil.tableToDataStream(tableEnvironment, table, true)); + String tableName = pluginConfig.getString(SOURCE_TABLE_NAME); + Table table = tableEnvironment.from(tableName); + return Optional.ofNullable( + TableUtil.tableToDataStream( + tableEnvironment, table, isAppendMap.getOrDefault(tableName, true))); } return Optional.empty(); } protected void registerResultTable(Config pluginConfig, DataStream dataStream) { - flinkRuntimeEnvironment.registerResultTable(pluginConfig, dataStream); + if (pluginConfig.hasPath(RESULT_TABLE_NAME.key())) { + String resultTable = pluginConfig.getString(RESULT_TABLE_NAME.key()); + if (pluginConfig.hasPath(SOURCE_TABLE_NAME)) { + String sourceTable = pluginConfig.getString(SOURCE_TABLE_NAME); + flinkRuntimeEnvironment.registerResultTable( + pluginConfig, + dataStream, + resultTable, + isAppendMap.getOrDefault(sourceTable, true)); + registerAppendStream(pluginConfig); + return; + } + flinkRuntimeEnvironment.registerResultTable( + pluginConfig, + dataStream, + resultTable, + isAppendMap.getOrDefault(resultTable, true)); + } + } + + protected void registerAppendStream(Config pluginConfig) { + if (pluginConfig.hasPath(RESULT_TABLE_NAME.key())) { + String tableName = pluginConfig.getString(RESULT_TABLE_NAME.key()); + isAppendMap.put(tableName, false); + } } protected abstract List initializePlugins( diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java index 4b5bef07cb0..b813e71cfb7 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java @@ -313,19 +313,22 @@ private void setCheckpoint() { } } - public void registerResultTable(Config config, DataStream dataStream) { - if (config.hasPath(RESULT_TABLE_NAME)) { - String name = config.getString(RESULT_TABLE_NAME); - StreamTableEnvironment tableEnvironment = this.getStreamTableEnvironment(); - if (!TableUtil.tableExists(tableEnvironment, name)) { + public void registerResultTable( + Config config, DataStream dataStream, String name, Boolean isAppend) { + StreamTableEnvironment tableEnvironment = this.getStreamTableEnvironment(); + if (!TableUtil.tableExists(tableEnvironment, name)) { + if (isAppend) { if (config.hasPath("field_name")) { String fieldName = config.getString("field_name"); tableEnvironment.registerDataStream(name, dataStream, fieldName); - } else { - tableEnvironment.registerDataStream(name, dataStream); + return; } + tableEnvironment.registerDataStream(name, dataStream); + return; } } + tableEnvironment.createTemporaryView( + name, tableEnvironment.fromChangelogStream(dataStream)); } public static FlinkRuntimeEnvironment getInstance(Config config) { diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java index 6bcc5fe8939..f3ebdd04378 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java @@ -65,9 +65,11 @@ public List> execute(List> upstreamDataStreams) List> sources = new ArrayList<>(); for (int i = 0; i < plugins.size(); i++) { SeaTunnelSource internalSource = plugins.get(i); + Config pluginConfig = pluginConfigs.get(i); BaseSeaTunnelSourceFunction sourceFunction; if (internalSource instanceof SupportCoordinate) { sourceFunction = new SeaTunnelCoordinatedSource(internalSource); + registerAppendStream(pluginConfig); } else { sourceFunction = new SeaTunnelParallelSource(internalSource); } @@ -80,7 +82,6 @@ public List> execute(List> upstreamDataStreams) sourceFunction, "SeaTunnel " + internalSource.getClass().getSimpleName(), bounded); - Config pluginConfig = pluginConfigs.get(i); if (pluginConfig.hasPath(CommonOptions.PARALLELISM.key())) { int parallelism = pluginConfig.getInt(CommonOptions.PARALLELISM.key()); sourceStream.setParallelism(parallelism); diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/utils/TableUtil.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/utils/TableUtil.java index ca1603cdf99..aad97518f4b 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/utils/TableUtil.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/utils/TableUtil.java @@ -37,11 +37,9 @@ public static DataStream tableToDataStream( if (isAppend) { return tableEnvironment.toAppendStream(table, typeInfo); } - return tableEnvironment - .toRetractStream(table, typeInfo) - .filter(row -> row.f0) - .map(row -> row.f1) - .returns(typeInfo); + DataStream dataStream = tableEnvironment.toChangelogStream(table); + dataStream.getTransformation().setOutputType(typeInfo); + return dataStream; } public static boolean tableExists(TableEnvironment tableEnvironment, String name) { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java index 1d0d90853fc..b648febd7d9 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java @@ -55,7 +55,7 @@ @Slf4j @DisabledOnContainer( value = {}, - type = {EngineType.SPARK, EngineType.FLINK}, + type = {EngineType.SPARK}, disabledReason = "Currently SPARK and FLINK do not support cdc") public class MysqlCDCIT extends TestSuiteBase implements TestResource { @@ -88,6 +88,9 @@ public class MysqlCDCIT extends TestSuiteBase implements TestResource { + " f_enum, cast(f_mediumblob as char) as f_mediumblob, f_long_varchar, f_real, f_time, f_tinyint, f_tinyint_unsigned," + " f_json, cast(f_year as year) from mysql_cdc_e2e_sink_table"; + private static final String CLEAN_SOURCE = "truncate table mysql_cdc_e2e_source_table"; + private static final String CLEAN_SINK = "truncate table mysql_cdc_e2e_sink_table"; + private static MySqlContainer createMySqlContainer(MySqlVersion version) { MySqlContainer mySqlContainer = new MySqlContainer(version) @@ -134,6 +137,9 @@ public void startUp() throws ClassNotFoundException, InterruptedException { @TestTemplate public void testMysqlCdcCheckDataE2e(TestContainer container) throws IOException, InterruptedException { + // Clear related content to ensure that multiple operations are not affected + executeSql(CLEAN_SOURCE); + executeSql(CLEAN_SINK); CompletableFuture executeJobFuture = CompletableFuture.supplyAsync( diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java index 8bca3e3b036..bfe2a358889 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java @@ -65,7 +65,7 @@ @Slf4j @DisabledOnContainer( value = {}, - type = {EngineType.SPARK, EngineType.FLINK}, + type = {EngineType.SPARK}, disabledReason = "Currently SPARK and FLINK do not support cdc") public class SqlServerCDCIT extends TestSuiteBase implements TestResource {