From 163451ed30c3f3cd3029d7b6c8b77d65d163dcee Mon Sep 17 00:00:00 2001 From: rongtong Date: Mon, 18 Nov 2024 14:30:01 +0800 Subject: [PATCH] Support for Persisting LMQ Consumer Offsets in Config V1 Using RocksDB (#8939) --- .../rocketmq/broker/BrokerController.java | 3 +- .../v1/RocksDBLmqConsumerOffsetManager.java | 103 ------------------ .../RocksDBLmqConsumerOffsetManagerTest.java | 55 +++------- 3 files changed, 19 insertions(+), 142 deletions(-) delete mode 100644 broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBLmqConsumerOffsetManager.java diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index b6c903929d7..143922e456f 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -77,7 +77,6 @@ import org.apache.rocketmq.broker.offset.ConsumerOrderInfoManager; import org.apache.rocketmq.broker.offset.LmqConsumerOffsetManager; import org.apache.rocketmq.broker.config.v1.RocksDBConsumerOffsetManager; -import org.apache.rocketmq.broker.config.v1.RocksDBLmqConsumerOffsetManager; import org.apache.rocketmq.broker.out.BrokerOuterAPI; import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin; import org.apache.rocketmq.broker.processor.AckMessageProcessor; @@ -352,7 +351,7 @@ public BrokerController( } else if (this.messageStoreConfig.isEnableRocksDBStore()) { this.topicConfigManager = messageStoreConfig.isEnableLmq() ? new RocksDBLmqTopicConfigManager(this) : new RocksDBTopicConfigManager(this); this.subscriptionGroupManager = messageStoreConfig.isEnableLmq() ? new RocksDBLmqSubscriptionGroupManager(this) : new RocksDBSubscriptionGroupManager(this); - this.consumerOffsetManager = messageStoreConfig.isEnableLmq() ? new RocksDBLmqConsumerOffsetManager(this) : new RocksDBConsumerOffsetManager(this); + this.consumerOffsetManager = new RocksDBConsumerOffsetManager(this); } else { this.topicConfigManager = messageStoreConfig.isEnableLmq() ? new LmqTopicConfigManager(this) : new TopicConfigManager(this); this.subscriptionGroupManager = messageStoreConfig.isEnableLmq() ? new LmqSubscriptionGroupManager(this) : new SubscriptionGroupManager(this); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBLmqConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBLmqConsumerOffsetManager.java deleted file mode 100644 index e961c6c635a..00000000000 --- a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBLmqConsumerOffsetManager.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.rocketmq.broker.config.v1; - -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.rocketmq.broker.BrokerController; -import org.apache.rocketmq.common.MixAll; -import org.apache.rocketmq.remoting.protocol.RemotingSerializable; - -public class RocksDBLmqConsumerOffsetManager extends RocksDBConsumerOffsetManager { - private ConcurrentHashMap lmqOffsetTable = new ConcurrentHashMap<>(512); - - public RocksDBLmqConsumerOffsetManager(BrokerController brokerController) { - super(brokerController); - } - - @Override - public long queryOffset(final String group, final String topic, final int queueId) { - if (!MixAll.isLmq(group)) { - return super.queryOffset(group, topic, queueId); - } - // topic@group - String key = topic + TOPIC_GROUP_SEPARATOR + group; - Long offset = lmqOffsetTable.get(key); - if (offset != null) { - return offset; - } - return -1; - } - - @Override - public Map queryOffset(final String group, final String topic) { - if (!MixAll.isLmq(group)) { - return super.queryOffset(group, topic); - } - Map map = new HashMap<>(); - // topic@group - String key = topic + TOPIC_GROUP_SEPARATOR + group; - Long offset = lmqOffsetTable.get(key); - if (offset != null) { - map.put(0, offset); - } - return map; - } - - @Override - public void commitOffset(final String clientHost, final String group, final String topic, final int queueId, - final long offset) { - if (!MixAll.isLmq(group)) { - super.commitOffset(clientHost, group, topic, queueId, offset); - return; - } - // topic@group - String key = topic + TOPIC_GROUP_SEPARATOR + group; - lmqOffsetTable.put(key, offset); - } - - @Override - public String encode() { - return this.encode(false); - } - - @Override - public void decode(String jsonString) { - if (jsonString != null) { - RocksDBLmqConsumerOffsetManager obj = RemotingSerializable.fromJson(jsonString, RocksDBLmqConsumerOffsetManager.class); - if (obj != null) { - super.setOffsetTable(obj.getOffsetTable()); - this.lmqOffsetTable = obj.lmqOffsetTable; - } - } - } - - @Override - public String encode(final boolean prettyFormat) { - return RemotingSerializable.toJson(this, prettyFormat); - } - - public ConcurrentHashMap getLmqOffsetTable() { - return lmqOffsetTable; - } - - public void setLmqOffsetTable(ConcurrentHashMap lmqOffsetTable) { - this.lmqOffsetTable = lmqOffsetTable; - } -} diff --git a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBLmqConsumerOffsetManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBLmqConsumerOffsetManagerTest.java index 1b9916d6ac1..aa5003fc103 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBLmqConsumerOffsetManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBLmqConsumerOffsetManagerTest.java @@ -18,7 +18,7 @@ package org.apache.rocketmq.broker.offset; import org.apache.rocketmq.broker.BrokerController; -import org.apache.rocketmq.broker.config.v1.RocksDBLmqConsumerOffsetManager; +import org.apache.rocketmq.broker.config.v1.RocksDBConsumerOffsetManager; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.store.config.MessageStoreConfig; @@ -28,45 +28,37 @@ import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertNull; import static org.mockito.Mockito.when; public class RocksDBLmqConsumerOffsetManagerTest { private static final String LMQ_GROUP = MixAll.LMQ_PREFIX + "FooBarGroup"; private static final String NON_LMQ_GROUP = "nonLmqGroup"; - private static final String TOPIC = "FooBarTopic"; + + private static final String LMQ_TOPIC = MixAll.LMQ_PREFIX + "FooBarTopic"; + private static final String NON_LMQ_TOPIC = "FooBarTopic"; private static final int QUEUE_ID = 0; private static final long OFFSET = 12345; private BrokerController brokerController; - private RocksDBLmqConsumerOffsetManager offsetManager; + private RocksDBConsumerOffsetManager offsetManager; @Before public void setUp() { brokerController = Mockito.mock(BrokerController.class); when(brokerController.getMessageStoreConfig()).thenReturn(Mockito.mock(MessageStoreConfig.class)); - when(brokerController.getBrokerConfig()).thenReturn(Mockito.mock(BrokerConfig.class)); - offsetManager = new RocksDBLmqConsumerOffsetManager(brokerController); + when(brokerController.getBrokerConfig()).thenReturn(new BrokerConfig()); + offsetManager = new RocksDBConsumerOffsetManager(brokerController); } - @Test - public void testQueryOffsetForLmq() { - // Setup - offsetManager.getLmqOffsetTable().put(getKey(), OFFSET); - // Execute - long actualOffset = offsetManager.queryOffset(LMQ_GROUP, TOPIC, QUEUE_ID); - // Verify - assertEquals("Offset should match the expected value.", OFFSET, actualOffset); - } @Test public void testQueryOffsetForNonLmq() { - long actualOffset = offsetManager.queryOffset(NON_LMQ_GROUP, TOPIC, QUEUE_ID); + long actualOffset = offsetManager.queryOffset(NON_LMQ_GROUP, NON_LMQ_TOPIC, QUEUE_ID); // Verify assertEquals("Offset should not be null.", -1, actualOffset); } @@ -74,10 +66,10 @@ public void testQueryOffsetForNonLmq() { @Test public void testQueryOffsetForLmqGroupWithExistingOffset() { - offsetManager.getLmqOffsetTable().put(getKey(), OFFSET); + offsetManager.commitOffset("127.0.0.1",LMQ_GROUP, LMQ_TOPIC, QUEUE_ID, OFFSET); // Act - Map actualOffsets = offsetManager.queryOffset(LMQ_GROUP, TOPIC); + Map actualOffsets = offsetManager.queryOffset(LMQ_GROUP, LMQ_TOPIC); // Assert assertNotNull(actualOffsets); @@ -89,23 +81,20 @@ public void testQueryOffsetForLmqGroupWithExistingOffset() { public void testQueryOffsetForLmqGroupWithoutExistingOffset() { // Act Map actualOffsets = offsetManager.queryOffset(LMQ_GROUP, "nonExistingTopic"); - // Assert - assertNotNull(actualOffsets); - assertTrue("The map should be empty for non-existing offsets", actualOffsets.isEmpty()); + assertNull(actualOffsets); } @Test public void testQueryOffsetForNonLmqGroup() { - when(brokerController.getBrokerConfig().getConsumerOffsetUpdateVersionStep()).thenReturn(1L); // Arrange Map mockOffsets = new HashMap<>(); mockOffsets.put(QUEUE_ID, OFFSET); - offsetManager.commitOffset("clientHost", NON_LMQ_GROUP, TOPIC, QUEUE_ID, OFFSET); + offsetManager.commitOffset("clientHost", NON_LMQ_GROUP, NON_LMQ_TOPIC, QUEUE_ID, OFFSET); // Act - Map actualOffsets = offsetManager.queryOffset(NON_LMQ_GROUP, TOPIC); + Map actualOffsets = offsetManager.queryOffset(NON_LMQ_GROUP, NON_LMQ_TOPIC); // Assert assertNotNull(actualOffsets); @@ -115,21 +104,13 @@ public void testQueryOffsetForNonLmqGroup() { @Test public void testCommitOffsetForLmq() { // Execute - offsetManager.commitOffset("clientHost", LMQ_GROUP, TOPIC, QUEUE_ID, OFFSET); + offsetManager.commitOffset("clientHost", LMQ_GROUP, LMQ_TOPIC, QUEUE_ID, OFFSET); // Verify - Long expectedOffset = offsetManager.getLmqOffsetTable().get(getKey()); + Long expectedOffset = offsetManager.getOffsetTable().get(getLMQKey()).get(QUEUE_ID); assertEquals("Offset should be updated correctly.", OFFSET, expectedOffset.longValue()); } - @Test - public void testEncode() { - offsetManager.setLmqOffsetTable(new ConcurrentHashMap<>(512)); - offsetManager.getLmqOffsetTable().put(getKey(), OFFSET); - String encodedData = offsetManager.encode(); - assertTrue(encodedData.contains(String.valueOf(OFFSET))); - } - - private String getKey() { - return TOPIC + "@" + LMQ_GROUP; + private String getLMQKey() { + return LMQ_TOPIC + "@" + LMQ_GROUP; } }