Skip to content

Commit

Permalink
[Hotfix][Mongodb cdc] Solve startup resume token is negative (#5143)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: chenzy15 <[email protected]>
  • Loading branch information
MonsterChenzhuo and chenzy15 authored Jul 25, 2023
1 parent 1e18a8c commit e964c03
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import com.mongodb.client.MongoClient;
import io.debezium.relational.TableId;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.Nonnull;

Expand All @@ -52,6 +53,7 @@
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbUtils.getCurrentClusterTime;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbUtils.getLatestResumeToken;

@Slf4j
public class MongodbDialect implements DataSourceDialect<MongodbSourceConfig> {

private final Map<MongodbSourceConfig, CollectionDiscoveryUtils.CollectionDiscoveryInfo> cache =
Expand Down Expand Up @@ -137,6 +139,11 @@ public ChangeStreamOffset displayCurrentOffset(MongodbSourceConfig sourceConfig)
ChangeStreamOffset changeStreamOffset;
if (startupResumeToken != null) {
changeStreamOffset = new ChangeStreamOffset(startupResumeToken);
log.info(
"startup resume token={},change stream offset={}",
startupResumeToken,
changeStreamOffset);

} else {
changeStreamOffset = new ChangeStreamOffset(getCurrentClusterTime(mongoClient));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils;

import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.exception.MongodbConnectorException;

import org.bson.BsonDocument;
import org.bson.BsonTimestamp;
import org.bson.BsonValue;
Expand All @@ -29,41 +27,33 @@
import java.nio.ByteOrder;
import java.util.Objects;

import static org.apache.seatunnel.common.exception.CommonErrorCode.ILLEGAL_ARGUMENT;

public class ResumeToken {

private static final int K_TIMESTAMP = 130;

public static @Nonnull BsonTimestamp decodeTimestamp(BsonDocument resumeToken) {
Objects.requireNonNull(resumeToken, "Missing ResumeToken.");
BsonValue bsonValue = resumeToken.get("_data");
byte[] keyStringBytes = extractKeyStringBytes(bsonValue);
validateKeyType(keyStringBytes);

ByteBuffer buffer = ByteBuffer.wrap(keyStringBytes).order(ByteOrder.BIG_ENDIAN);
int t = buffer.getInt();
int i = buffer.getInt();
return new BsonTimestamp(t, i);
}

private static byte[] extractKeyStringBytes(@Nonnull BsonValue bsonValue) {
if (bsonValue.isBinary()) {
return bsonValue.asBinary().getData();
} else if (bsonValue.isString()) {
return hexToUint8Array(bsonValue.asString().getValue());
public static BsonTimestamp decodeTimestamp(BsonDocument resumeToken) {
BsonValue bsonValue =
Objects.requireNonNull(resumeToken, "Missing ResumeToken.").get("_data");
final byte[] keyStringBytes;
// Resume Tokens format: https://www.mongodb.com/docs/manual/changeStreams/#resume-tokens
if (bsonValue.isBinary()) { // BinData
keyStringBytes = bsonValue.asBinary().getData();
} else if (bsonValue.isString()) { // Hex-encoded string (v0 or v1)
keyStringBytes = hexToUint8Array(bsonValue.asString().getValue());
} else {
throw new MongodbConnectorException(
ILLEGAL_ARGUMENT, "Unknown resume token format: " + bsonValue);
throw new IllegalArgumentException(
"Unknown resume token format: " + resumeToken.toJson());
}
}

private static void validateKeyType(byte[] keyStringBytes) {
int kType = keyStringBytes[0] & 0xff;
ByteBuffer buffer = ByteBuffer.wrap(keyStringBytes).order(ByteOrder.BIG_ENDIAN);
int kType = buffer.get() & 0xff;
if (kType != K_TIMESTAMP) {
throw new MongodbConnectorException(
ILLEGAL_ARGUMENT, "Unknown keyType of timestamp: " + kType);
throw new IllegalArgumentException("Unknown keyType of timestamp: " + kType);
}

int t = buffer.getInt();
int i = buffer.getInt();
return new BsonTimestamp(t, i);
}

private static byte[] hexToUint8Array(@Nonnull String str) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public MongoDBContainer(Network network, ShardingClusterRole clusterRole) {
withExposedPorts(MONGODB_PORT);
withCommand(ShardingClusterRole.startupCommand(clusterRole));
waitingFor(clusterRole.waitStrategy);
withEnv("TZ", "Asia/Shanghai");
}

public void executeCommand(String command) {
Expand Down

0 comments on commit e964c03

Please sign in to comment.