-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[feature][Connector-V2][cdc] Support flink running cdc job #4918
Conversation
good |
good pr |
niubility! |
waitting for ci check |
@hailin0 Please help to start the client,the mysql cdc driver is abnormal |
@evan766 Can you share your json configuration? |
|
@evan766 Can I add my wechat zy330600 |
Please merge dev branch into this pr |
@hailin0 ok |
@@ -65,7 +65,7 @@ | |||
@Slf4j | |||
@DisabledOnContainer( | |||
value = {}, | |||
type = {EngineType.SPARK, EngineType.FLINK}, | |||
type = {EngineType.SPARK}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
update testcase
https://github.com/apache/seatunnel/blob/dev/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java#L68
https://github.com/apache/seatunnel/blob/dev/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java#L58
check ci error |
Temporarily blocks mongo cdc effects #5251 |
08049cf
to
f621076
Compare
@ic4y @hailin0 @EricJoy2048 @Hisoka-X @TyrantLucifer Please help review, thank you |
LGTM |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thank you for your contribution!
if (pluginConfig.hasPath(SOURCE_TABLE_NAME)) { | ||
String sourceTable = pluginConfig.getString(SOURCE_TABLE_NAME); | ||
flinkRuntimeEnvironment.registerResultTable( | ||
pluginConfig, | ||
dataStream, | ||
resultTable, | ||
isAppendMap.getOrDefault(sourceTable, true)); | ||
registerAppendStream(pluginConfig); | ||
return; | ||
} | ||
flinkRuntimeEnvironment.registerResultTable( | ||
pluginConfig, | ||
dataStream, | ||
resultTable, | ||
isAppendMap.getOrDefault(resultTable, true)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @Carl-Zhou-CN , could you explain for me why we should register table as source table name when both have RESULT_TABLE_NAME
and SOURCE_TABLE_NAME
? Thanks.
I got it. Please ignore it.
* [feature][Connector-V2][cdc] Support flink running cdc job * [feature][Connector-V2][cdc] Support flink running cdc job * [feature][Connector-V2][cdc] mongo e2e delete --------- Co-authored-by: zhouyao <[email protected]>
Purpose of this pull request
close #4605
Check list
New License Guide
release-note
.