diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java b/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java index d17a2a5470c..12af3ee9a1e 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java @@ -23,6 +23,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.AbstractBrokerRunnable; import org.apache.rocketmq.common.constant.LoggerName; @@ -41,6 +43,8 @@ public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListen private ConcurrentHashMap> consumerChannelMap = new ConcurrentHashMap<>(cacheSize); + private final ConcurrentHashMap activeGroupNotifyMap = new ConcurrentHashMap<>(); + public DefaultConsumerIdsChangeListener(BrokerController brokerController) { this.brokerController = brokerController; @@ -70,9 +74,25 @@ public void handle(ConsumerGroupEvent event, String group, Object... args) { List channels = (List) args[0]; if (channels != null && brokerController.getBrokerConfig().isNotifyConsumerIdsChangedEnable()) { if (this.brokerController.getBrokerConfig().isRealTimeNotifyConsumerChange()) { - for (Channel chl : channels) { + NotifyTaskControl currentNotifyTaskControl = new NotifyTaskControl(channels); + activeGroupNotifyMap.compute(group, (k, oldVal) -> { + if (null != oldVal) { + oldVal.interrupt(); + } + return currentNotifyTaskControl; + }); + + boolean isNormalCompletion = true; + for (Channel chl : currentNotifyTaskControl.getChannels()) { + if (currentNotifyTaskControl.isInterrupted()) { + isNormalCompletion = false; + break; + } this.brokerController.getBroker2Client().notifyConsumerIdsChanged(chl, group); } + if (isNormalCompletion) { + activeGroupNotifyMap.remove(group); + } } else { consumerChannelMap.put(group, channels); } @@ -125,4 +145,27 @@ private void notifyConsumerChange() { public void shutdown() { this.scheduledExecutorService.shutdown(); } + + private static class NotifyTaskControl { + + private final AtomicBoolean interrupted = new AtomicBoolean(false); + + private final List channels; + + public NotifyTaskControl(List channels) { + this.channels = channels; + } + + public boolean isInterrupted() { + return interrupted.get(); + } + + public void interrupt() { + interrupted.set(true); + } + + public List getChannels() { + return channels; + } + } }