Skip to content

Commit

Permalink
Assign offset in offsetTable even if the subscription key not exist.
Browse files Browse the repository at this point in the history
  • Loading branch information
dingshuangxi888 committed Jan 21, 2025
1 parent 90136a7 commit 8d92df4
Showing 1 changed file with 2 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -418,14 +418,14 @@ public void assignResetOffset(String topic, String group, int queueId, long offs
}

String key = topic + TOPIC_GROUP_SEPARATOR + group;
resetOffsetTable.computeIfAbsent(key, k-> Maps.newConcurrentMap()).put(queueId, offset);
resetOffsetTable.computeIfAbsent(key, k -> Maps.newConcurrentMap()).put(queueId, offset);
LOG.debug("Reset offset OK. Topic={}, group={}, queueId={}, resetOffset={}", topic, group, queueId, offset);

// Two things are important here:
// 1, currentOffsetMap might be null if there is no previous records;
// 2, Our overriding here may get overridden by the client instantly in concurrent cases; But it still makes
// sense in cases like clients are offline.
offsetTable.computeIfAbsent(key, k-> Maps.newConcurrentMap()).put(queueId, offset);
offsetTable.computeIfAbsent(key, k -> Maps.newConcurrentMap()).put(queueId, offset);
}

public boolean hasOffsetReset(String topic, String group, int queueId) {
Expand Down

0 comments on commit 8d92df4

Please sign in to comment.