Skip to content

Commit

Permalink
[Feature][Mongodb-CDC] support multi-table read
Browse files Browse the repository at this point in the history
  • Loading branch information
hawk9821 committed Nov 15, 2024
1 parent 0764304 commit c650eb8
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,16 +111,6 @@ public static List<CatalogTable> getCatalogTables(
return Collections.singletonList(catalogTable);
}

List<Map<String, Object>> schemaMaps =
readonlyConfig.get(TableSchemaOptions.TABLES_CONFIGS);
if (schemaMaps != null) {
if (schemaMaps.isEmpty()) {
throw new SeaTunnelException("tables_configs can not be empty");
}
return schemaMaps.stream()
.map(map -> buildWithConfig(factoryId, ReadonlyConfig.fromMap(map)))
.collect(Collectors.toList());
}
Optional<Catalog> optionalCatalog =
FactoryUtil.createOptionalCatalog(
factoryId, readonlyConfig, classLoader, factoryId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.seatunnel.connectors.seatunnel.cdc.mongodb;

import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceSplit;
Expand All @@ -28,6 +30,7 @@
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions;
Expand All @@ -36,7 +39,10 @@
import com.google.auto.service.AutoService;

import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT;
Expand Down Expand Up @@ -84,9 +90,7 @@ public Class<? extends SeaTunnelSource> getSourceClass() {
public <T, SplitT extends SourceSplit, StateT extends Serializable>
TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
return () -> {
List<CatalogTable> catalogTables =
CatalogTableUtil.getCatalogTables(
context.getOptions(), context.getClassLoader());
List<CatalogTable> catalogTables = buildWithConfig(context.getOptions());
List<String> collections = context.getOptions().get(MongodbSourceOptions.COLLECTION);
if (collections.size() != catalogTables.size()) {
throw new MongodbConnectorException(
Expand Down Expand Up @@ -115,4 +119,29 @@ TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
new MongodbIncrementalSource<>(context.getOptions(), catalogTables);
};
}

private List<CatalogTable> buildWithConfig(ReadonlyConfig config) {
String factoryId = config.get(CommonOptions.PLUGIN_NAME).replace("-CDC", "");
Map<String, Object> schemaMap = config.get(TableSchemaOptions.SCHEMA);
if (schemaMap != null) {
if (schemaMap.isEmpty()) {
throw new SeaTunnelException("Schema config can not be empty");
}
CatalogTable catalogTable = CatalogTableUtil.buildWithConfig(factoryId, config);
return Collections.singletonList(catalogTable);
}
List<Map<String, Object>> schemaMaps = config.get(TableSchemaOptions.TABLES_CONFIGS);
if (schemaMaps != null) {
if (schemaMaps.isEmpty()) {
throw new SeaTunnelException("tables_configs can not be empty");
}
return schemaMaps.stream()
.map(
map ->
CatalogTableUtil.buildWithConfig(
factoryId, ReadonlyConfig.fromMap(map)))
.collect(Collectors.toList());
}
return Collections.emptyList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ private void truncateMysqlTable(String tableName) {

try (Connection connection = getJdbcConnection();
PreparedStatement checkStmt = connection.prepareStatement(checkTableExistsSql)) {
checkStmt.setString(1, MYSQL_DATABASE); // 或者你可以直接提供数据库名
checkStmt.setString(1, MYSQL_DATABASE);
checkStmt.setString(2, tableName);
try (ResultSet rs = checkStmt.executeQuery()) {
if (rs.next() && rs.getInt(1) > 0) {
Expand Down
6 changes: 0 additions & 6 deletions seatunnel-examples/seatunnel-engine-examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,5 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-mongodb</artifactId>
<version>${project.version}</version>
</dependency>

</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class SeaTunnelEngineLocalExample {

public static void main(String[] args)
throws FileNotFoundException, URISyntaxException, CommandException {
String configurePath = args.length > 0 ? args[0] : "/examples/mongodb_source.conf";
String configurePath = args.length > 0 ? args[0] : "/examples/fake_to_console.conf";
String configFile = getTestConfigFile(configurePath);
ClientCommandArgs clientCommandArgs = new ClientCommandArgs();
clientCommandArgs.setConfigFile(configFile);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
# The minimum amount of time, in seconds, that must elapse before the file configuration is checked for changes.
monitorInterval = 60

rootLogger.level = INFO
rootLogger.level = DEBUG

rootLogger.appenderRef.consoleStdout.ref = consoleStdoutAppender
rootLogger.appenderRef.consoleStderr.ref = consoleStderrAppender
Expand Down

0 comments on commit c650eb8

Please sign in to comment.