Skip to content

Commit

Permalink
Assign offset in offsetTable even if the subscription key not exist.
Browse files Browse the repository at this point in the history
  • Loading branch information
dingshuangxi888 committed Jan 21, 2025
1 parent 14ca1ce commit 90136a7
Showing 1 changed file with 4 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.broker.offset;

import com.google.common.collect.Maps;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
Expand Down Expand Up @@ -417,27 +418,14 @@ public void assignResetOffset(String topic, String group, int queueId, long offs
}

String key = topic + TOPIC_GROUP_SEPARATOR + group;
ConcurrentMap<Integer, Long> map = resetOffsetTable.get(key);
if (null == map) {
map = new ConcurrentHashMap<Integer, Long>();
ConcurrentMap<Integer, Long> previous = resetOffsetTable.putIfAbsent(key, map);
if (null != previous) {
map = previous;
}
}

map.put(queueId, offset);
LOG.debug("Reset offset OK. Topic={}, group={}, queueId={}, resetOffset={}",
topic, group, queueId, offset);
resetOffsetTable.computeIfAbsent(key, k-> Maps.newConcurrentMap()).put(queueId, offset);
LOG.debug("Reset offset OK. Topic={}, group={}, queueId={}, resetOffset={}", topic, group, queueId, offset);

// Two things are important here:
// 1, currentOffsetMap might be null if there is no previous records;
// 2, Our overriding here may get overridden by the client instantly in concurrent cases; But it still makes
// sense in cases like clients are offline.
ConcurrentMap<Integer, Long> currentOffsetMap = offsetTable.get(key);
if (null != currentOffsetMap) {
currentOffsetMap.put(queueId, offset);
}
offsetTable.computeIfAbsent(key, k-> Maps.newConcurrentMap()).put(queueId, offset);
}

public boolean hasOffsetReset(String topic, String group, int queueId) {
Expand Down

0 comments on commit 90136a7

Please sign in to comment.