diff --git a/src/main/java/com/mongodb/kafka/connect/sink/cdc/debezium/mongodb/ChangeStreamHandler.java b/src/main/java/com/mongodb/kafka/connect/sink/cdc/debezium/mongodb/ChangeStreamHandler.java
index b4b5e5b9..35fa7a1f 100644
--- a/src/main/java/com/mongodb/kafka/connect/sink/cdc/debezium/mongodb/ChangeStreamHandler.java
+++ b/src/main/java/com/mongodb/kafka/connect/sink/cdc/debezium/mongodb/ChangeStreamHandler.java
@@ -31,7 +31,9 @@ public class ChangeStreamHandler extends MongoDbHandler {
         {
           put(OperationType.CREATE, new MongoDbInsert());
           put(OperationType.READ, new MongoDbInsert());
-          put(OperationType.UPDATE, new MongoDbUpdate(MongoDbUpdate.EventFormat.ChangeStream));
+          put(
+              OperationType.UPDATE,
+              new MongoDbUpdate(MongoDbUpdate.EventFormat.ChangeStream, false));
           put(OperationType.DELETE, new MongoDbDelete());
         }
       };
diff --git a/src/main/java/com/mongodb/kafka/connect/sink/cdc/debezium/mongodb/MongoDbHandler.java b/src/main/java/com/mongodb/kafka/connect/sink/cdc/debezium/mongodb/MongoDbHandler.java
index a2881356..385a2c59 100644
--- a/src/main/java/com/mongodb/kafka/connect/sink/cdc/debezium/mongodb/MongoDbHandler.java
+++ b/src/main/java/com/mongodb/kafka/connect/sink/cdc/debezium/mongodb/MongoDbHandler.java
@@ -44,7 +44,7 @@ public class MongoDbHandler extends DebeziumCdcHandler {
         {
           put(OperationType.CREATE, new MongoDbInsert());
           put(OperationType.READ, new MongoDbInsert());
-          put(OperationType.UPDATE, new MongoDbUpdate(MongoDbUpdate.EventFormat.Oplog));
+          put(OperationType.UPDATE, new MongoDbUpdate(MongoDbUpdate.EventFormat.Oplog, false));
           put(OperationType.DELETE, new MongoDbDelete());
         }
       };
diff --git a/src/main/java/com/mongodb/kafka/connect/sink/cdc/debezium/mongodb/MongoDbUniqueFieldHandler.java b/src/main/java/com/mongodb/kafka/connect/sink/cdc/debezium/mongodb/MongoDbUniqueFieldHandler.java
new file mode 100644
index 00000000..e6ed92ab
--- /dev/null
+++ b/src/main/java/com/mongodb/kafka/connect/sink/cdc/debezium/mongodb/MongoDbUniqueFieldHandler.java
@@ -0,0 +1,24 @@
+package com.mongodb.kafka.connect.sink.cdc.debezium.mongodb;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.mongodb.kafka.connect.sink.MongoSinkTopicConfig;
+import com.mongodb.kafka.connect.sink.cdc.CdcOperation;
+import com.mongodb.kafka.connect.sink.cdc.debezium.OperationType;
+
+public class MongoDbUniqueFieldHandler extends MongoDbHandler {
+  private static final Map<OperationType, CdcOperation> DEFAULT_OPERATIONS =
+      new HashMap<OperationType, CdcOperation>() {
+        {
+          put(OperationType.CREATE, new MongoDbInsert());
+          put(OperationType.READ, new MongoDbInsert());
+          put(OperationType.UPDATE, new MongoDbUpdate(MongoDbUpdate.EventFormat.Oplog, true));
+          put(OperationType.DELETE, new MongoDbDelete());
+        }
+      };
+
+  public MongoDbUniqueFieldHandler(final MongoSinkTopicConfig config) {
+    super(config, DEFAULT_OPERATIONS);
+  }
+}
diff --git a/src/main/java/com/mongodb/kafka/connect/sink/cdc/debezium/mongodb/MongoDbUpdate.java b/src/main/java/com/mongodb/kafka/connect/sink/cdc/debezium/mongodb/MongoDbUpdate.java
index 86592e6e..8609c13c 100644
--- a/src/main/java/com/mongodb/kafka/connect/sink/cdc/debezium/mongodb/MongoDbUpdate.java
+++ b/src/main/java/com/mongodb/kafka/connect/sink/cdc/debezium/mongodb/MongoDbUpdate.java
@@ -45,11 +45,14 @@ public enum EventFormat {
 
   private static final String JSON_DOC_FIELD_AFTER = "after";
   public static final String INTERNAL_OPLOG_FIELD_V = "$v";
+  public static final String JSON_DOC_FIELD_FILTER = "filter";
 
   private final EventFormat eventFormat;
+  private final boolean isUseFilterInValueDoc;
 
-  public MongoDbUpdate(final EventFormat eventFormat) {
+  public MongoDbUpdate(final EventFormat eventFormat, final boolean isUseFilterInValueDoc) {
     this.eventFormat = eventFormat;
+    this.isUseFilterInValueDoc = isUseFilterInValueDoc;
   }
 
   @Override
@@ -91,8 +94,11 @@ private WriteModel<BsonDocument> handleOplogEvent(final SinkDocument doc) {
       return new ReplaceOneModel<>(filterDoc, updateDoc, REPLACE_OPTIONS);
     }
 
+    BsonDocument filterDoc =
+        !isUseFilterInValueDoc ? getFilterDocByKeyId(doc) : getFilterDocByValue(doc);
+
     // patch contains idempotent change only to update original document with
-    return new UpdateOneModel<>(getFilterDocByKeyId(doc), updateDoc);
+    return new UpdateOneModel<>(filterDoc, updateDoc);
   }
 
   private WriteModel<BsonDocument> handleChangeStreamEvent(final SinkDocument doc) {
@@ -119,6 +125,23 @@ private BsonDocument getFilterDocByKeyId(final SinkDocument doc) {
         format("{%s: %s}", ID_FIELD, keyDoc.getString(JSON_ID_FIELD).getValue()));
   }
 
+  private BsonDocument getFilterDocByValue(final SinkDocument doc) {
+    BsonDocument valueDoc = getDocumentValue(doc);
+
+    if (!valueDoc.containsKey(JSON_DOC_FIELD_FILTER)) {
+      throw new DataException(format("Update document missing `%s` field.", JSON_DOC_FIELD_FILTER));
+    }
+
+    BsonDocument filterDoc =
+        BsonDocument.parse(valueDoc.getString(JSON_DOC_FIELD_FILTER).getValue());
+
+    if (!filterDoc.containsKey(ID_FIELD)) {
+      throw new DataException(format("Filter document missing `%s` field.", ID_FIELD));
+    }
+
+    return filterDoc;
+  }
+
   private BsonDocument getDocumentKey(final SinkDocument doc) {
     return doc.getKeyDoc()
         .orElseThrow(
diff --git a/src/test/java/com/mongodb/kafka/connect/sink/cdc/debezium/mongodb/MongoDbUniqueFieldHandlerTest.java b/src/test/java/com/mongodb/kafka/connect/sink/cdc/debezium/mongodb/MongoDbUniqueFieldHandlerTest.java
new file mode 100644
index 00000000..9911d7ed
--- /dev/null
+++ b/src/test/java/com/mongodb/kafka/connect/sink/cdc/debezium/mongodb/MongoDbUniqueFieldHandlerTest.java
@@ -0,0 +1,205 @@
+package com.mongodb.kafka.connect.sink.cdc.debezium.mongodb;
+
+import static com.mongodb.kafka.connect.sink.SinkTestHelper.createTopicConfig;
+import static java.util.Collections.emptyMap;
+import static org.junit.jupiter.api.Assertions.*;
+import static org.junit.jupiter.api.DynamicTest.dynamicTest;
+
+import java.util.Optional;
+import java.util.stream.Stream;
+
+import org.apache.kafka.connect.errors.DataException;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.DynamicTest;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestFactory;
+
+import org.bson.BsonDocument;
+import org.bson.BsonInt32;
+import org.bson.BsonNull;
+import org.bson.BsonString;
+
+import com.mongodb.client.model.DeleteOneModel;
+import com.mongodb.client.model.ReplaceOneModel;
+import com.mongodb.client.model.UpdateOneModel;
+import com.mongodb.client.model.WriteModel;
+
+import com.mongodb.kafka.connect.sink.cdc.debezium.OperationType;
+import com.mongodb.kafka.connect.sink.converter.SinkDocument;
+
+class MongoDbUniqueFieldHandlerTest {
+
+  private static final MongoDbUniqueFieldHandler HANDLER_DEFAULT_MAPPING =
+      new MongoDbUniqueFieldHandler(createTopicConfig());
+
+  @Test
+  @DisplayName("verify existing default config from base class")
+  void testExistingDefaultConfig() {
+    assertAll(
+        () ->
+            assertNotNull(
+                HANDLER_DEFAULT_MAPPING.getConfig(), "default config for handler must not be null"),
+        () ->
+            assertNotNull(
+                new MongoDbHandler(createTopicConfig(), emptyMap()).getConfig(),
+                "default config for handler must not be null"));
null")); + } + + @Test + @DisplayName("when key document missing then DataException") + void testMissingKeyDocument() { + assertThrows( + DataException.class, () -> HANDLER_DEFAULT_MAPPING.handle(new SinkDocument(null, null))); + } + + @Test + @DisplayName("when key doc contains 'id' field but value is empty then null due to tombstone") + void testTombstoneEvent() { + assertEquals( + Optional.empty(), + HANDLER_DEFAULT_MAPPING.handle( + new SinkDocument(new BsonDocument("id", new BsonInt32(1234)), new BsonDocument())), + "tombstone event must result in Optional.empty()"); + } + + @Test + @DisplayName("when value doc contains unknown operation type then DataException") + void testUnknownCdcOperationType() { + SinkDocument cdcEvent = + new SinkDocument( + new BsonDocument("id", new BsonInt32(1234)), + new BsonDocument("op", new BsonString("x"))); + assertThrows(DataException.class, () -> HANDLER_DEFAULT_MAPPING.handle(cdcEvent)); + } + + @Test + @DisplayName("when value doc contains unmapped operation type then DataException") + void testUnmappedCdcOperationType() { + SinkDocument cdcEvent = + new SinkDocument( + new BsonDocument("_id", new BsonInt32(1234)), + new BsonDocument("op", new BsonString("z")) + .append("after", new BsonString("{_id:1234,foo:\"blah\"}"))); + + assertThrows(DataException.class, () -> HANDLER_DEFAULT_MAPPING.handle(cdcEvent)); + } + + @Test + @DisplayName("when value doc contains operation type other than string then DataException") + void testInvalidCdcOperationType() { + SinkDocument cdcEvent = + new SinkDocument( + new BsonDocument("id", new BsonInt32(1234)), + new BsonDocument("op", new BsonInt32('c'))); + + assertThrows(DataException.class, () -> HANDLER_DEFAULT_MAPPING.handle(cdcEvent)); + } + + @Test + @DisplayName("when value doc is missing operation type then DataException") + void testMissingCdcOperationType() { + SinkDocument cdcEvent = + new SinkDocument( + new BsonDocument("id", new BsonInt32(1234)), new BsonDocument("po", BsonNull.VALUE)); + + assertThrows(DataException.class, () -> HANDLER_DEFAULT_MAPPING.handle(cdcEvent)); + } + + @TestFactory + @DisplayName("when valid CDC event then correct WriteModel") + Stream testValidCdcDocument() { + + return Stream.of( + dynamicTest( + "test operation " + OperationType.CREATE, + () -> { + Optional> result = + HANDLER_DEFAULT_MAPPING.handle( + new SinkDocument( + new BsonDocument("_id", new BsonString("1234")), + new BsonDocument("op", new BsonString("c")) + .append("after", new BsonString("{_id:1234,foo:\"blah\"}")))); + assertTrue(result.isPresent()); + assertTrue( + result.get() instanceof ReplaceOneModel, + "result expected to be of type ReplaceOneModel"); + }), + dynamicTest( + "test operation " + OperationType.READ, + () -> { + Optional> result = + HANDLER_DEFAULT_MAPPING.handle( + new SinkDocument( + new BsonDocument("_id", new BsonString("1234")), + new BsonDocument("op", new BsonString("r")) + .append("after", new BsonString("{_id:1234,foo:\"blah\"}")))); + assertTrue(result.isPresent()); + assertTrue( + result.get() instanceof ReplaceOneModel, + "result expected to be of type ReplaceOneModel"); + }), + dynamicTest( + "test operation " + OperationType.UPDATE, + () -> { + Optional> result = + HANDLER_DEFAULT_MAPPING.handle( + new SinkDocument( + new BsonDocument("id", new BsonString("1234")), + new BsonDocument("op", new BsonString("u")) + .append("patch", new BsonString("{\"$set\":{foo:\"blah\"}}")) + .append("filter", new BsonString("{_id:1234,email:\"blah\"}")))); + assertTrue(result.isPresent()); + 
+              assertTrue(
+                  result.get() instanceof UpdateOneModel,
+                  "result expected to be of type UpdateOneModel");
+            }),
+        dynamicTest(
+            "test operation " + OperationType.DELETE,
+            () -> {
+              Optional<WriteModel<BsonDocument>> result =
+                  HANDLER_DEFAULT_MAPPING.handle(
+                      new SinkDocument(
+                          new BsonDocument("id", new BsonString("1234")),
+                          new BsonDocument("op", new BsonString("d"))));
+              assertTrue(result.isPresent(), "write model result must be present");
+              assertTrue(
+                  result.get() instanceof DeleteOneModel,
+                  "result expected to be of type DeleteOneModel");
+            }));
+  }
+
+  @TestFactory
+  @DisplayName("when valid cdc operation type then correct MongoDB CdcOperation")
+  Stream<DynamicTest> testValidCdcOperationTypes() {
+
+    return Stream.of(
+        dynamicTest(
+            "test operation " + OperationType.CREATE,
+            () ->
+                assertTrue(
+                    HANDLER_DEFAULT_MAPPING.getCdcOperation(
+                            new BsonDocument("op", new BsonString("c")))
+                        instanceof MongoDbInsert)),
+        dynamicTest(
+            "test operation " + OperationType.READ,
+            () ->
+                assertTrue(
+                    HANDLER_DEFAULT_MAPPING.getCdcOperation(
+                            new BsonDocument("op", new BsonString("r")))
+                        instanceof MongoDbInsert)),
+        dynamicTest(
+            "test operation " + OperationType.UPDATE,
+            () ->
+                assertTrue(
+                    HANDLER_DEFAULT_MAPPING.getCdcOperation(
+                            new BsonDocument("op", new BsonString("u")))
+                        instanceof MongoDbUpdate)),
+        dynamicTest(
+            "test operation " + OperationType.DELETE,
+            () ->
+                assertTrue(
+                    HANDLER_DEFAULT_MAPPING.getCdcOperation(
+                            new BsonDocument("op", new BsonString("d")))
+                        instanceof MongoDbDelete)));
+  }
+}
diff --git a/src/test/java/com/mongodb/kafka/connect/sink/cdc/debezium/mongodb/MongoDbUpdateTest.java b/src/test/java/com/mongodb/kafka/connect/sink/cdc/debezium/mongodb/MongoDbUpdateTest.java
index c74b447b..4e474d80 100644
--- a/src/test/java/com/mongodb/kafka/connect/sink/cdc/debezium/mongodb/MongoDbUpdateTest.java
+++ b/src/test/java/com/mongodb/kafka/connect/sink/cdc/debezium/mongodb/MongoDbUpdateTest.java
@@ -38,11 +38,17 @@ class MongoDbUpdateTest {
 
   private static final MongoDbUpdate OPLOG_UPDATE =
-      new MongoDbUpdate(MongoDbUpdate.EventFormat.Oplog);
+      new MongoDbUpdate(MongoDbUpdate.EventFormat.Oplog, false);
+  private static final MongoDbUpdate OPLOG_UPDATE_WITH_UNIQUE_FILTER =
+      new MongoDbUpdate(MongoDbUpdate.EventFormat.Oplog, true);
   private static final MongoDbUpdate CHANGE_STREAM_UPDATE =
-      new MongoDbUpdate(MongoDbUpdate.EventFormat.ChangeStream);
+      new MongoDbUpdate(MongoDbUpdate.EventFormat.ChangeStream, false);
 
   private static final BsonDocument FILTER_DOC = BsonDocument.parse("{_id: 1234}");
+  private static final BsonDocument FILTER_DOC_WITH_ID_AND_UNIQUE_FIELD =
+      BsonDocument.parse("{_id: 1234, username: 'unique'}");
+  private static final BsonDocument FILTER_DOC_WITH_UNIQUE_FIELD_ONLY =
+      BsonDocument.parse("{username: 'unique'}");
   private static final BsonDocument REPLACEMENT_DOC =
       BsonDocument.parse("{_id: 1234, first_name: 'Grace', last_name: 'Hopper'}");
   private static final BsonDocument UPDATE_DOC =
@@ -119,6 +125,55 @@ public void testValidSinkDocumentWithInternalOploagFieldForUpdate() {
     assertEquals(FILTER_DOC, writeModel.getFilter());
   }
 
+  @Test
+  @DisplayName(
+      "when valid doc change cdc event containing the filter field then correct UpdateOneModel")
+  public void testValidSinkDocumentWithFilterFieldForUpdate() {
+    BsonDocument keyDoc = BsonDocument.parse("{id: '1234'}");
+    BsonDocument valueDoc =
+        new BsonDocument("op", new BsonString("u"))
+            .append("patch", new BsonString(UPDATE_DOC.toJson()))
+            .append("filter", new BsonString(FILTER_DOC_WITH_ID_AND_UNIQUE_FIELD.toJson()));
+
+    WriteModel<BsonDocument> result =
+        OPLOG_UPDATE_WITH_UNIQUE_FILTER.perform(new SinkDocument(keyDoc, valueDoc));
+    assertTrue(result instanceof UpdateOneModel, "result expected to be of type UpdateOneModel");
+
+    UpdateOneModel<BsonDocument> writeModel = (UpdateOneModel<BsonDocument>) result;
+    assertEquals(UPDATE_DOC, writeModel.getUpdate(), "update doc not matching what is expected");
+    assertTrue(
+        writeModel.getFilter() instanceof BsonDocument,
+        "filter expected to be of type BsonDocument");
+    assertEquals(FILTER_DOC_WITH_ID_AND_UNIQUE_FIELD, writeModel.getFilter());
+  }
+
+  @Test
+  @DisplayName("when missing filter document then DataException")
+  public void testMissingFilterDocument() {
+    BsonDocument keyDoc = BsonDocument.parse("{id: '1234'}");
+    BsonDocument valueDoc =
+        new BsonDocument("op", new BsonString("u"))
+            .append("patch", new BsonString(UPDATE_DOC.toJson()));
+
+    assertThrows(
+        DataException.class,
+        () -> OPLOG_UPDATE_WITH_UNIQUE_FILTER.perform(new SinkDocument(keyDoc, valueDoc)));
+  }
+
+  @Test
+  @DisplayName("when missing id doc in the filter document then DataException")
+  public void testMissingIdInFilterDocument() {
+    BsonDocument keyDoc = BsonDocument.parse("{id: '1234'}");
+    BsonDocument valueDoc =
+        new BsonDocument("op", new BsonString("u"))
+            .append("patch", new BsonString(UPDATE_DOC.toJson()))
+            .append("filter", new BsonString(FILTER_DOC_WITH_UNIQUE_FIELD_ONLY.toJson()));
+
+    assertThrows(
+        DataException.class,
+        () -> OPLOG_UPDATE_WITH_UNIQUE_FILTER.perform(new SinkDocument(keyDoc, valueDoc)));
+  }
+
   @Test
   @DisplayName("when valid doc replace change stream cdc event then correct ReplaceOneModel")
   void testValidChangeStreamSinkDocumentForReplacement() {
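
Reviewer note: a minimal usage sketch, not part of the patch, showing the kind of Debezium oplog-style update event the new MongoDbUniqueFieldHandler consumes and what it produces. It assumes test scope (SinkTestHelper.createTopicConfig()) and uses illustrative field values taken from the tests above.

package com.mongodb.kafka.connect.sink.cdc.debezium.mongodb;

import static com.mongodb.kafka.connect.sink.SinkTestHelper.createTopicConfig;

import java.util.Optional;

import org.bson.BsonDocument;
import org.bson.BsonString;

import com.mongodb.client.model.WriteModel;

import com.mongodb.kafka.connect.sink.converter.SinkDocument;

class UniqueFieldHandlerUsageSketch {
  static Optional<WriteModel<BsonDocument>> example() {
    // UPDATE events are routed to MongoDbUpdate(EventFormat.Oplog, true) by this handler.
    MongoDbUniqueFieldHandler handler = new MongoDbUniqueFieldHandler(createTopicConfig());

    BsonDocument key = BsonDocument.parse("{id: '1234'}");
    BsonDocument value =
        new BsonDocument("op", new BsonString("u"))
            .append("patch", new BsonString("{\"$set\": {\"email\": \"blah\"}}"))
            // the "filter" field is a JSON string that must contain _id and may carry
            // additional unique fields so the update matches on more than the key's id
            .append("filter", new BsonString("{_id: 1234, username: 'unique'}"));

    // yields an UpdateOneModel whose filter is the parsed "filter" document
    return handler.handle(new SinkDocument(key, value));
  }
}

Presumably a sink connector would opt into this behaviour per topic via the existing change.data.capture.handler property, pointing it at com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbUniqueFieldHandler, the same way MongoDbHandler is selected today.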