Skip to content

Commit

Permalink
[BUG][Connector-V2][Mongo-cdc] Incremental data kind error in snapshot phase
Browse files Browse the repository at this point in the history
  • Loading branch information
zhouyao committed Jul 31, 2023
1 parent e4f666f commit 292efc9
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.ENCODE_VALUE_FIELD;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.FULL_DOCUMENT;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.ID_FIELD;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.extractBsonDocument;
import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;

public class MongoDBConnectorDeserializationSchema
Expand Down Expand Up @@ -154,17 +155,6 @@ private SeaTunnelRow extractRowData(BsonDocument document) {
return (SeaTunnelRow) physicalConverter.convert(document);
}

    /**
     * Parses the JSON string stored in {@code fieldName} of the given Kafka Connect struct into a
     * {@link BsonDocument}.
     *
     * @param value the Connect struct carrying the serialized document
     * @param valueSchema schema of {@code value}; consulted to check that the field exists
     * @param fieldName name of the struct field holding the JSON-encoded document
     * @return the parsed document, or {@code null} when the field is absent from the schema or its
     *     value is {@code null}
     */
    private BsonDocument extractBsonDocument(
            Struct value, @Nonnull Schema valueSchema, String fieldName) {
        // Guard on the schema first: Struct.getString throws if the field is not declared.
        if (valueSchema.field(fieldName) != null) {
            String docString = value.getString(fieldName);
            if (docString != null) {
                return BsonDocument.parse(docString);
            }
        }
        return null;
    }

// -------------------------------------------------------------------------------------
// Runtime Converters
// -------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.offset.ChangeStreamOffset;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import org.bson.BsonDocument;
import org.bson.BsonString;
import org.bson.BsonType;
import org.bson.BsonValue;

Expand All @@ -50,12 +52,16 @@
import java.util.stream.Collectors;

import static org.apache.seatunnel.common.exception.CommonErrorCode.ILLEGAL_ARGUMENT;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.FULL_DOCUMENT;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.OPERATION_TYPE;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.OPERATION_TYPE_INSERT;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.SNAPSHOT_FIELD;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.SNAPSHOT_TRUE;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.SOURCE_FIELD;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.TS_MS_FIELD;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.BsonUtils.compareBsonValue;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.buildSourceRecord;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.extractBsonDocument;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.getDocumentKey;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.getResumeToken;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbUtils.createMongoClient;
Expand Down Expand Up @@ -172,9 +178,24 @@ public void rewriteOutputBuffer(

switch (OperationType.fromString(operationType)) {
case INSERT:
outputBuffer.put(key, changeRecord);
break;
case UPDATE:
case REPLACE:
outputBuffer.put(key, changeRecord);
Schema valueSchema = changeRecord.valueSchema();
BsonDocument fullDocument =
extractBsonDocument(value, valueSchema, FULL_DOCUMENT);
fullDocument.put(OPERATION_TYPE, new BsonString(OPERATION_TYPE_INSERT));
SourceRecord record =
buildSourceRecord(
changeRecord.sourcePartition(),
changeRecord.sourceOffset(),
changeRecord.topic(),
changeRecord.kafkaPartition(),
changeRecord.keySchema(),
changeRecord.key(),
fullDocument);
outputBuffer.put(key, record);
break;
case DELETE:
outputBuffer.remove(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
Expand Down Expand Up @@ -66,7 +67,18 @@ public static BsonDocument getResumeToken(SourceRecord sourceRecord) {

public static BsonDocument getDocumentKey(@Nonnull SourceRecord sourceRecord) {
Struct value = (Struct) sourceRecord.value();
return BsonDocument.parse(value.getString(DOCUMENT_KEY));
return extractBsonDocument(value, sourceRecord.valueSchema(), DOCUMENT_KEY);
}

public static BsonDocument extractBsonDocument(
Struct value, @Nonnull Schema valueSchema, String fieldName) {
if (valueSchema.field(fieldName) != null) {
String docString = value.getString(fieldName);
if (docString != null) {
return BsonDocument.parse(docString);
}
}
return null;
}

public static String getOffsetValue(@Nonnull SourceRecord sourceRecord, String key) {
Expand Down Expand Up @@ -139,6 +151,30 @@ public static String getOffsetValue(@Nonnull SourceRecord sourceRecord, String k
valueSchemaAndValue.value());
}

public static @Nonnull SourceRecord buildSourceRecord(
Map<String, ?> sourcePartition,
Map<String, ?> sourceOffset,
String topicName,
Integer partition,
Schema keySchema,
Object key,
BsonDocument valueDocument) {
BsonValueToSchemaAndValue schemaAndValue =
new BsonValueToSchemaAndValue(new DefaultJson().getJsonWriterSettings());
SchemaAndValue valueSchemaAndValue =
schemaAndValue.toSchemaAndValue(fromJson(OUTPUT_SCHEMA), valueDocument);

return new SourceRecord(
sourcePartition,
sourceOffset,
topicName,
partition,
keySchema,
key,
valueSchemaAndValue.schema(),
valueSchemaAndValue.value());
}

public static @Nonnull Map<String, String> createSourceOffsetMap(
@Nonnull BsonDocument idDocument, boolean isSnapshotRecord) {
Map<String, String> sourceOffset = new HashMap<>();
Expand Down

0 comments on commit 292efc9

Please sign in to comment.