Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature][Connector-V2][cdc] Support flink running cdc job #4918

Merged
merged 4 commits into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.source.SupportCoordinate;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
Expand Down Expand Up @@ -76,7 +77,7 @@

@NoArgsConstructor
public abstract class IncrementalSource<T, C extends SourceConfig>
implements SeaTunnelSource<T, SourceSplitBase, PendingSplitsState> {
implements SeaTunnelSource<T, SourceSplitBase, PendingSplitsState>, SupportCoordinate {

protected ReadonlyConfig readonlyConfig;
protected SourceConfig.Factory<C> configFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,19 +313,22 @@ private void setCheckpoint() {
}
}

public void registerResultTable(Config config, DataStream<Row> dataStream) {
if (config.hasPath(RESULT_TABLE_NAME)) {
String name = config.getString(RESULT_TABLE_NAME);
StreamTableEnvironment tableEnvironment = this.getStreamTableEnvironment();
if (!TableUtil.tableExists(tableEnvironment, name)) {
public void registerResultTable(
Config config, DataStream<Row> dataStream, String name, Boolean isAppend) {
StreamTableEnvironment tableEnvironment = this.getStreamTableEnvironment();
if (!TableUtil.tableExists(tableEnvironment, name)) {
if (isAppend) {
if (config.hasPath("field_name")) {
String fieldName = config.getString("field_name");
tableEnvironment.registerDataStream(name, dataStream, fieldName);
} else {
tableEnvironment.registerDataStream(name, dataStream);
return;
}
tableEnvironment.registerDataStream(name, dataStream);
return;
}
}
tableEnvironment.createTemporaryView(
name, tableEnvironment.fromChangelogStream(dataStream));
}

public static FlinkRuntimeEnvironment getInstance(Config config) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,19 @@

import java.net.URL;
import java.net.URLClassLoader;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.function.BiConsumer;

import static org.apache.seatunnel.api.common.CommonOptions.RESULT_TABLE_NAME;

public abstract class FlinkAbstractPluginExecuteProcessor<T>
implements PluginExecuteProcessor<DataStream<Row>, FlinkRuntimeEnvironment> {
protected static final String ENGINE_TYPE = "seatunnel";
protected static final String PLUGIN_NAME = "plugin_name";
protected static final String SOURCE_TABLE_NAME = "source_table_name";
protected static HashMap<String, Boolean> isAppendMap = new HashMap<>();

protected static final BiConsumer<ClassLoader, URL> ADD_URL_TO_CLASSLOADER =
(classLoader, url) -> {
Expand Down Expand Up @@ -76,14 +80,41 @@ protected Optional<DataStream<Row>> fromSourceTable(Config pluginConfig) {
if (pluginConfig.hasPath(SOURCE_TABLE_NAME)) {
StreamTableEnvironment tableEnvironment =
flinkRuntimeEnvironment.getStreamTableEnvironment();
Table table = tableEnvironment.from(pluginConfig.getString(SOURCE_TABLE_NAME));
return Optional.ofNullable(TableUtil.tableToDataStream(tableEnvironment, table, true));
String tableName = pluginConfig.getString(SOURCE_TABLE_NAME);
Table table = tableEnvironment.from(tableName);
return Optional.ofNullable(
TableUtil.tableToDataStream(
tableEnvironment, table, isAppendMap.getOrDefault(tableName, true)));
}
return Optional.empty();
}

protected void registerResultTable(Config pluginConfig, DataStream<Row> dataStream) {
flinkRuntimeEnvironment.registerResultTable(pluginConfig, dataStream);
if (pluginConfig.hasPath(RESULT_TABLE_NAME.key())) {
String resultTable = pluginConfig.getString(RESULT_TABLE_NAME.key());
if (pluginConfig.hasPath(SOURCE_TABLE_NAME)) {
String sourceTable = pluginConfig.getString(SOURCE_TABLE_NAME);
flinkRuntimeEnvironment.registerResultTable(
pluginConfig,
dataStream,
resultTable,
isAppendMap.getOrDefault(sourceTable, true));
registerAppendStream(pluginConfig);
return;
}
flinkRuntimeEnvironment.registerResultTable(
pluginConfig,
dataStream,
resultTable,
isAppendMap.getOrDefault(resultTable, true));
Comment on lines +95 to +109
Copy link
Member

@Hisoka-X Hisoka-X Sep 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @Carl-Zhou-CN, could you explain to me why we should register the table as the source table name when both RESULT_TABLE_NAME and SOURCE_TABLE_NAME are present? Thanks.

I got it. Please ignore it.

}
}

/**
 * Records the plugin's result table, when one is configured, in {@code isAppendMap} with the
 * value {@code false}.
 *
 * <p>Does nothing if the plugin config has no result-table-name entry.
 *
 * @param pluginConfig configuration of the plugin whose result table should be flagged
 */
protected void registerAppendStream(Config pluginConfig) {
    // Guard clause: without a configured result table there is nothing to record.
    if (!pluginConfig.hasPath(RESULT_TABLE_NAME.key())) {
        return;
    }
    final String resultTableName = pluginConfig.getString(RESULT_TABLE_NAME.key());
    isAppendMap.put(resultTableName, false);
}

protected abstract List<T> initializePlugins(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,19 +313,22 @@ private void setCheckpoint() {
}
}

public void registerResultTable(Config config, DataStream<Row> dataStream) {
if (config.hasPath(RESULT_TABLE_NAME)) {
String name = config.getString(RESULT_TABLE_NAME);
StreamTableEnvironment tableEnvironment = this.getStreamTableEnvironment();
if (!TableUtil.tableExists(tableEnvironment, name)) {
public void registerResultTable(
Config config, DataStream<Row> dataStream, String name, Boolean isAppend) {
StreamTableEnvironment tableEnvironment = this.getStreamTableEnvironment();
if (!TableUtil.tableExists(tableEnvironment, name)) {
if (isAppend) {
if (config.hasPath("field_name")) {
String fieldName = config.getString("field_name");
tableEnvironment.registerDataStream(name, dataStream, fieldName);
} else {
tableEnvironment.registerDataStream(name, dataStream);
return;
}
tableEnvironment.registerDataStream(name, dataStream);
return;
}
}
tableEnvironment.createTemporaryView(
name, tableEnvironment.fromChangelogStream(dataStream));
}

public static FlinkRuntimeEnvironment getInstance(Config config) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,11 @@ public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams)
List<DataStream<Row>> sources = new ArrayList<>();
for (int i = 0; i < plugins.size(); i++) {
SeaTunnelSource internalSource = plugins.get(i);
Config pluginConfig = pluginConfigs.get(i);
BaseSeaTunnelSourceFunction sourceFunction;
if (internalSource instanceof SupportCoordinate) {
sourceFunction = new SeaTunnelCoordinatedSource(internalSource);
registerAppendStream(pluginConfig);
} else {
sourceFunction = new SeaTunnelParallelSource(internalSource);
}
Expand All @@ -80,7 +82,6 @@ public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams)
sourceFunction,
"SeaTunnel " + internalSource.getClass().getSimpleName(),
bounded);
Config pluginConfig = pluginConfigs.get(i);
if (pluginConfig.hasPath(CommonOptions.PARALLELISM.key())) {
int parallelism = pluginConfig.getInt(CommonOptions.PARALLELISM.key());
sourceStream.setParallelism(parallelism);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,9 @@ public static DataStream<Row> tableToDataStream(
if (isAppend) {
return tableEnvironment.toAppendStream(table, typeInfo);
}
return tableEnvironment
.toRetractStream(table, typeInfo)
.filter(row -> row.f0)
.map(row -> row.f1)
.returns(typeInfo);
DataStream<Row> dataStream = tableEnvironment.toChangelogStream(table);
dataStream.getTransformation().setOutputType(typeInfo);
return dataStream;
}

public static boolean tableExists(TableEnvironment tableEnvironment, String name) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
@Slf4j
@DisabledOnContainer(
value = {},
type = {EngineType.SPARK, EngineType.FLINK},
type = {EngineType.SPARK},
disabledReason = "Currently SPARK and FLINK do not support cdc")
public class MysqlCDCIT extends TestSuiteBase implements TestResource {

Expand Down Expand Up @@ -88,6 +88,9 @@ public class MysqlCDCIT extends TestSuiteBase implements TestResource {
+ " f_enum, cast(f_mediumblob as char) as f_mediumblob, f_long_varchar, f_real, f_time, f_tinyint, f_tinyint_unsigned,"
+ " f_json, cast(f_year as year) from mysql_cdc_e2e_sink_table";

private static final String CLEAN_SOURCE = "truncate table mysql_cdc_e2e_source_table";
private static final String CLEAN_SINK = "truncate table mysql_cdc_e2e_sink_table";

private static MySqlContainer createMySqlContainer(MySqlVersion version) {
MySqlContainer mySqlContainer =
new MySqlContainer(version)
Expand Down Expand Up @@ -134,6 +137,9 @@ public void startUp() throws ClassNotFoundException, InterruptedException {
@TestTemplate
public void testMysqlCdcCheckDataE2e(TestContainer container)
throws IOException, InterruptedException {
// Clear related content to ensure that multiple operations are not affected
executeSql(CLEAN_SOURCE);
executeSql(CLEAN_SINK);

CompletableFuture<Void> executeJobFuture =
CompletableFuture.supplyAsync(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
@Slf4j
@DisabledOnContainer(
value = {},
type = {EngineType.SPARK, EngineType.FLINK},
type = {EngineType.SPARK},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

disabledReason = "Currently SPARK and FLINK do not support cdc")
public class SqlServerCDCIT extends TestSuiteBase implements TestResource {

Expand Down
Loading