diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java index 12157ce162f..0f879c823de 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java @@ -111,16 +111,6 @@ public static List getCatalogTables( return Collections.singletonList(catalogTable); } - List> schemaMaps = - readonlyConfig.get(TableSchemaOptions.TABLES_CONFIGS); - if (schemaMaps != null) { - if (schemaMaps.isEmpty()) { - throw new SeaTunnelException("tables_configs can not be empty"); - } - return schemaMaps.stream() - .map(map -> buildWithConfig(factoryId, ReadonlyConfig.fromMap(map))) - .collect(Collectors.toList()); - } Optional optionalCatalog = FactoryUtil.createOptionalCatalog( factoryId, readonlyConfig, classLoader, factoryId); diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSourceFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSourceFactory.java index 726b7bc1c48..fbb3da4c164 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSourceFactory.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSourceFactory.java @@ -17,6 +17,8 @@ package org.apache.seatunnel.connectors.seatunnel.cdc.mongodb; +import org.apache.seatunnel.api.common.CommonOptions; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.configuration.util.OptionRule; import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.source.SourceSplit; @@ -28,6 +30,7 @@ import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableSourceFactory; import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext; +import org.apache.seatunnel.common.utils.SeaTunnelException; import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions; import org.apache.seatunnel.connectors.cdc.base.option.StartupMode; import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions; @@ -36,7 +39,10 @@ import com.google.auto.service.AutoService; import java.io.Serializable; +import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; import java.util.stream.IntStream; import static org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT; @@ -84,9 +90,7 @@ public Class getSourceClass() { public TableSource createSource(TableSourceFactoryContext context) { return () -> { - List catalogTables = - CatalogTableUtil.getCatalogTables( - context.getOptions(), context.getClassLoader()); + List catalogTables = buildWithConfig(context.getOptions()); List collections = context.getOptions().get(MongodbSourceOptions.COLLECTION); if (collections.size() != catalogTables.size()) { throw new MongodbConnectorException( @@ -115,4 +119,29 @@ TableSource createSource(TableSourceFactoryContext context) { new MongodbIncrementalSource<>(context.getOptions(), catalogTables); }; } + + private List buildWithConfig(ReadonlyConfig config) { + String factoryId = config.get(CommonOptions.PLUGIN_NAME).replace("-CDC", ""); + Map schemaMap = config.get(TableSchemaOptions.SCHEMA); + if (schemaMap != null) { + if (schemaMap.isEmpty()) { + throw new SeaTunnelException("Schema config can not be empty"); + } + CatalogTable catalogTable = CatalogTableUtil.buildWithConfig(factoryId, config); + return Collections.singletonList(catalogTable); + } + List> schemaMaps = config.get(TableSchemaOptions.TABLES_CONFIGS); + if (schemaMaps != null) { + if (schemaMaps.isEmpty()) { + throw new SeaTunnelException("tables_configs can not be empty"); + } + return schemaMaps.stream() + .map( + map -> + CatalogTableUtil.buildWithConfig( + factoryId, ReadonlyConfig.fromMap(map))) + .collect(Collectors.toList()); + } + return Collections.emptyList(); + } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java index 59b9710a097..3fee4352148 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java @@ -319,7 +319,7 @@ private void truncateMysqlTable(String tableName) { try (Connection connection = getJdbcConnection(); PreparedStatement checkStmt = connection.prepareStatement(checkTableExistsSql)) { - checkStmt.setString(1, MYSQL_DATABASE); // 或者你可以直接提供数据库名 + checkStmt.setString(1, MYSQL_DATABASE); checkStmt.setString(2, tableName); try (ResultSet rs = checkStmt.executeQuery()) { if (rs.next() && rs.getInt(1) > 0) { diff --git a/seatunnel-examples/seatunnel-engine-examples/pom.xml b/seatunnel-examples/seatunnel-engine-examples/pom.xml index 2d43147ccd6..5ebe8ab7305 100644 --- a/seatunnel-examples/seatunnel-engine-examples/pom.xml +++ b/seatunnel-examples/seatunnel-engine-examples/pom.xml @@ -67,11 +67,5 @@ ${project.version} - - org.apache.seatunnel - connector-mongodb - ${project.version} - - diff --git a/seatunnel-examples/seatunnel-engine-examples/src/main/java/org/apache/seatunnel/example/engine/SeaTunnelEngineLocalExample.java b/seatunnel-examples/seatunnel-engine-examples/src/main/java/org/apache/seatunnel/example/engine/SeaTunnelEngineLocalExample.java index a1fcea2837c..3c23400214b 100644 --- a/seatunnel-examples/seatunnel-engine-examples/src/main/java/org/apache/seatunnel/example/engine/SeaTunnelEngineLocalExample.java +++ b/seatunnel-examples/seatunnel-engine-examples/src/main/java/org/apache/seatunnel/example/engine/SeaTunnelEngineLocalExample.java @@ -36,7 +36,7 @@ public class SeaTunnelEngineLocalExample { public static void main(String[] args) throws FileNotFoundException, URISyntaxException, CommandException { - String configurePath = args.length > 0 ? args[0] : "/examples/mongodb_source.conf"; + String configurePath = args.length > 0 ? args[0] : "/examples/fake_to_console.conf"; String configFile = getTestConfigFile(configurePath); ClientCommandArgs clientCommandArgs = new ClientCommandArgs(); clientCommandArgs.setConfigFile(configFile); diff --git a/seatunnel-examples/seatunnel-engine-examples/src/main/resources/log4j2.properties b/seatunnel-examples/seatunnel-engine-examples/src/main/resources/log4j2.properties index 35ef1327dba..77985ea23a9 100644 --- a/seatunnel-examples/seatunnel-engine-examples/src/main/resources/log4j2.properties +++ b/seatunnel-examples/seatunnel-engine-examples/src/main/resources/log4j2.properties @@ -19,7 +19,7 @@ # The minimum amount of time, in seconds, that must elapse before the file configuration is checked for changes. monitorInterval = 60 -rootLogger.level = INFO +rootLogger.level = DEBUG rootLogger.appenderRef.consoleStdout.ref = consoleStdoutAppender rootLogger.appenderRef.consoleStderr.ref = consoleStderrAppender