diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
index 24f7bdb99a5..d90835034aa 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
@@ -95,6 +95,7 @@ public class MessageConst {
     public static final String PROPERTY_TIMER_ROLL_TIMES = "TIMER_ROLL_TIMES";
     public static final String PROPERTY_TIMER_OUT_MS = "TIMER_OUT_MS";
     public static final String PROPERTY_TIMER_DEL_UNIQKEY = "TIMER_DEL_UNIQKEY";
+    public static final String PROPERTY_TIMER_DEL_FLAG = "TIMER_DEL_FLAG";
     public static final String PROPERTY_TIMER_DELAY_LEVEL = "TIMER_DELAY_LEVEL";
     public static final String PROPERTY_TIMER_DELAY_MS = "TIMER_DELAY_MS";
     public static final String PROPERTY_CRC32 = "__CRC32#";
@@ -151,6 +152,7 @@ public class MessageConst {
         STRING_HASH_SET.add(PROPERTY_TIMER_ROLL_TIMES);
         STRING_HASH_SET.add(PROPERTY_TIMER_OUT_MS);
         STRING_HASH_SET.add(PROPERTY_TIMER_DEL_UNIQKEY);
+        STRING_HASH_SET.add(PROPERTY_TIMER_DEL_FLAG);
         STRING_HASH_SET.add(PROPERTY_TIMER_DELAY_LEVEL);
         STRING_HASH_SET.add(PROPERTY_BORN_HOST);
         STRING_HASH_SET.add(PROPERTY_BORN_TIMESTAMP);
diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 0ea58415487..4063b1d0544 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -472,6 +472,17 @@ public void setRocksdbCompressionType(String compressionType) {
      **/
     private boolean useABSLock = false;
 
+    /**
+     * Upper limit on the number of messages read in one pass.
+     * -1 : no limit, read all messages
+     */
+    private int readCountTimerOnRocksDB = -1;
+
+    /**
+     * Switch that makes scheduled (timer) messages use RocksDB; off by default.
+     */
+    private boolean enableTimerMessageOnRocksDB = false;
+
     public boolean isRocksdbCQDoubleWriteEnable() {
         return rocksdbCQDoubleWriteEnable;
     }
@@ -1638,6 +1649,27 @@ public void setTimerPrecisionMs(int timerPrecisionMs) {
         this.timerPrecisionMs = candidates[candidates.length - 1];
     }
 
+    /**
+     * Set the timer precision in milliseconds (visible for test).
+     * With timer messages on RocksDB the value is kept as is; otherwise it is snapped down to one of
+     * 100/200/500/1000 ms (values below 100 become 100).
+     */
+    public void setTimerPrecision(int timerPrecisionMs) {
+        if (enableTimerMessageOnRocksDB) {
+            this.timerPrecisionMs = timerPrecisionMs;
+            return;
+        }
+        final int[] steps = {100, 200, 500, 1000};
+        int resolved = steps[steps.length - 1];
+        for (int idx = 1; idx < steps.length; idx++) {
+            if (timerPrecisionMs < steps[idx]) {
+                resolved = steps[idx - 1];
+                break;
+            }
+        }
+        this.timerPrecisionMs = resolved;
+    }
+
     public int getTimerRollWindowSlot() {
         return timerRollWindowSlot;
     }
@@ -1714,6 +1746,10 @@ public boolean isTimerStopDequeue() {
         return timerStopDequeue;
     }
 
+    public void setTimerStopDequeue(boolean timerStopDequeue) {
+        this.timerStopDequeue = timerStopDequeue;
+    }
+
     public int getTimerMetricSmallThreshold() {
         return timerMetricSmallThreshold;
     }
@@ -1950,4 +1986,20 @@ public void setUseABSLock(boolean useABSLock) {
     public boolean getUseABSLock() {
         return useABSLock;
     }
+
+    public void setReadCountTimerOnRocksDB(int readCountTimerOnRocksDB) {
+        this.readCountTimerOnRocksDB = readCountTimerOnRocksDB;
+    }
+
+    public int getReadCountTimerOnRocksDB() {
+        return readCountTimerOnRocksDB;
+    }
+
+    public void setEnableTimerMessageOnRocksDB(boolean enableTimerMessageOnRocksDB) {
+        this.enableTimerMessageOnRocksDB = enableTimerMessageOnRocksDB;
+    }
+
+    public boolean getEnableTimerMessageOnRocksDB() {
+        return enableTimerMessageOnRocksDB;
+    }
 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java b/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
index 5687d6a222d..09355fcc57f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
@@ -182,6 +182,90 @@ public static ColumnFamilyOptions createPopCFOptions() {
             .setOptimizeFiltersForHits(true);
     }
 
+    /**
+     * Create the column family options shared by the timer message column families.
+     * Each call allocates its own 1 GB LRU block cache.
+     *
+     * @return a new ColumnFamilyOptions instance
+     */
+    public static ColumnFamilyOptions createTimerCFOptions() {
+        return createTimerFamilyOptions(128 * SizeUnit.KB, 1024 * SizeUnit.MB,
+            6, 128 * SizeUnit.MB, 256 * SizeUnit.MB);
+    }
+
+    /**
+     * Create the column family options for the timer metric column family: the same tuning as
+     * {@link #createTimerCFOptions()} with smaller blocks, block cache, write buffers and SST files.
+     *
+     * @return a new ColumnFamilyOptions instance
+     */
+    public static ColumnFamilyOptions createTimerMetricCFOptions() {
+        return createTimerFamilyOptions(4 * SizeUnit.KB, 64 * SizeUnit.MB,
+            4, 64 * SizeUnit.MB, 128 * SizeUnit.MB);
+    }
+
+    /**
+     * Build the universal-compaction, uncompressed options both timer factories share; only the sizes differ.
+     *
+     * @param blockSize data block size in bytes
+     * @param blockCacheSize capacity in bytes of the LRU block cache created for these options
+     * @param maxWriteBufferNumber value passed to setMaxWriteBufferNumber
+     * @param writeBufferSize value in bytes passed to setWriteBufferSize
+     * @param targetFileSizeBase value in bytes passed to setTargetFileSizeBase
+     * @return a new ColumnFamilyOptions instance
+     */
+    private static ColumnFamilyOptions createTimerFamilyOptions(long blockSize, long blockCacheSize,
+        int maxWriteBufferNumber, long writeBufferSize, long targetFileSizeBase) {
+        BlockBasedTableConfig blockBasedTableConfig = new BlockBasedTableConfig()
+            .setFormatVersion(5)
+            .setIndexType(IndexType.kBinarySearch)
+            .setDataBlockIndexType(DataBlockIndexType.kDataBlockBinaryAndHash)
+            .setDataBlockHashTableUtilRatio(0.75)
+            .setBlockSize(blockSize)
+            .setMetadataBlockSize(4 * SizeUnit.KB)
+            .setFilterPolicy(new BloomFilter(16, false))
+            .setCacheIndexAndFilterBlocks(false)
+            .setCacheIndexAndFilterBlocksWithHighPriority(true)
+            .setPinL0FilterAndIndexBlocksInCache(false)
+            .setPinTopLevelIndexAndFilter(true)
+            .setBlockCache(new LRUCache(blockCacheSize, 8, false))
+            .setWholeKeyFiltering(true);
+
+        CompactionOptionsUniversal compactionOption = new CompactionOptionsUniversal()
+            .setSizeRatio(100)
+            .setMaxSizeAmplificationPercent(25)
+            .setAllowTrivialMove(true)
+            .setMinMergeWidth(2)
+            .setMaxMergeWidth(Integer.MAX_VALUE)
+            .setStopStyle(CompactionStopStyle.CompactionStopStyleTotalSize)
+            .setCompressionSizePercent(-1);
+
+        //noinspection resource
+        return new ColumnFamilyOptions()
+            .setMaxWriteBufferNumber(maxWriteBufferNumber)
+            .setWriteBufferSize(writeBufferSize)
+            .setMinWriteBufferNumberToMerge(1)
+            .setTableFormatConfig(blockBasedTableConfig)
+            .setMemTableConfig(new SkipListMemTableConfig())
+            .setCompressionType(CompressionType.NO_COMPRESSION)
+            .setBottommostCompressionType(CompressionType.NO_COMPRESSION)
+            .setNumLevels(7)
+            .setCompactionPriority(CompactionPriority.MinOverlappingRatio)
+            .setCompactionStyle(CompactionStyle.UNIVERSAL)
+            .setCompactionOptionsUniversal(compactionOption)
+            .setMaxCompactionBytes(100 * SizeUnit.GB)
+            .setSoftPendingCompactionBytesLimit(100 * SizeUnit.GB)
+            .setHardPendingCompactionBytesLimit(256 * SizeUnit.GB)
+            .setLevel0FileNumCompactionTrigger(2)
+            .setLevel0SlowdownWritesTrigger(8)
+            .setLevel0StopWritesTrigger(10)
+            .setTargetFileSizeBase(targetFileSizeBase)
+            .setTargetFileSizeMultiplier(2)
+            .setMergeOperator(new StringAppendOperator())
+            .setReportBgIoStats(true)
+            .setOptimizeFiltersForHits(true);
+    }
+
     /**
      * Create a rocksdb db options, the user must take care to close it after closing db.
      * @return
diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
index 4287ce78ab0..1ce5a8ba9d6 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
@@ -71,6 +71,7 @@
 import org.apache.rocketmq.store.queue.CqUnit;
 import org.apache.rocketmq.store.queue.ReferredIterator;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
+import org.apache.rocketmq.store.timer.rocksdb.TimerMessageRocksDBStore;
 import org.apache.rocketmq.store.util.PerfCounter;
 
 public class TimerMessageStore {
@@ -158,6 +159,7 @@ public class TimerMessageStore {
     protected volatile boolean shouldRunningDequeue;
     private final BrokerStatsManager brokerStatsManager;
     private Function escapeBridgeHook;
+    private TimerMessageRocksDBStore timerRocksDBStore;
 
     public TimerMessageStore(final MessageStore messageStore, final MessageStoreConfig storeConfig,
         TimerCheckpoint timerCheckpoint, TimerMetrics timerMetrics,
@@ -212,6 +214,7 @@ protected ByteBuffer initialValue() {
             dequeuePutQueue = new LinkedBlockingDeque<>(DEFAULT_CAPACITY);
         }
         this.brokerStatsManager = brokerStatsManager;
+        this.timerRocksDBStore = new TimerMessageRocksDBStore(messageStore, storeConfig, timerMetrics, brokerStatsManager);
     }
 
     public void initService() {
@@ -238,6 +241,7 @@ public boolean load() {
         this.initService();
         boolean load = timerLog.load();
         load = load && this.timerMetrics.load();
+        load = load && timerRocksDBStore.load();
         recover();
         calcTimerDistribution();
         return load;
@@ -464,6 +468,7 @@ public static boolean isMagicOK(int magic) {
     }
 
     public void start() {
+        timerRocksDBStore.start();
         this.shouldStartTime = storeConfig.getDisappearTimeAfterStart() + System.currentTimeMillis();
         maybeMoveWriteTime();
         enqueueGetService.start();
@@ -552,6 +557,7 @@ public void shutdown() {
         timerWheel.shutdown(false);
 
         this.scheduler.shutdown();
+        this.timerRocksDBStore.shutdown();
         UtilAll.cleanBuffer(this.bufferLocal.get());
         this.bufferLocal.remove();
     }
@@ -653,7 +659,7 @@ public void holdMomentForUnknownError() {
     }
 
     public boolean enqueue(int queueId) {
-        if (storeConfig.isTimerStopEnqueue()) {
+        if (storeConfig.isTimerStopEnqueue() || storeConfig.getEnableTimerMessageOnRocksDB()) {
             return false;
         }
         if (!isRunningEnqueue()) {
@@ -1727,14 +1733,23 @@ public void run() {
     }
 
     public long getAllCongestNum() {
+        if (storeConfig.getEnableTimerMessageOnRocksDB()) {
+            return timerRocksDBStore.getAllCongestNum();
+        }
         return timerWheel.getAllNum(currReadTimeMs);
     }
 
     public long getCongestNum(long deliverTimeMs) {
+        if (storeConfig.getEnableTimerMessageOnRocksDB()) {
+            return timerRocksDBStore.getCongestNum(deliverTimeMs);
+        }
         return timerWheel.getNum(deliverTimeMs);
     }
 
     public boolean isReject(long deliverTimeMs) {
+        if (storeConfig.getEnableTimerMessageOnRocksDB()) {
+            return false;
+        }
         long congestNum = timerWheel.getNum(deliverTimeMs);
         if (congestNum <= storeConfig.getTimerCongestNumEachSlot()) {
             return false;
@@ -1749,6 +1764,9 @@ public boolean isReject(long deliverTimeMs) {
     }
 
     public long getEnqueueBehindMessages() {
+        if (storeConfig.getEnableTimerMessageOnRocksDB()) {
+            return timerRocksDBStore.getEnqueueBehindMessages();
+        }
         long tmpQueueOffset = currQueueOffset;
         ConsumeQueueInterface cq = messageStore.getConsumeQueue(TIMER_TOPIC, 0);
         long maxOffsetInQueue = cq == null ? 0 : cq.getMaxOffsetInQueue();
@@ -1756,6 +1774,9 @@ public long getEnqueueBehindMessages() {
     }
 
     public long getEnqueueBehindMillis() {
+        if (storeConfig.getEnableTimerMessageOnRocksDB()) {
+            return timerRocksDBStore.getEnqueueBehindMillis();
+        }
         if (System.currentTimeMillis() - lastEnqueueButExpiredTime < 2000) {
             return System.currentTimeMillis() - lastEnqueueButExpiredStoreTime;
         }
@@ -1767,10 +1788,13 @@ public long getEnqueueBehind() {
     }
 
     public long getDequeueBehindMessages() {
-        return timerWheel.getAllNum(currReadTimeMs);
+        return getAllCongestNum();
     }
 
     public long getDequeueBehindMillis() {
+        if (storeConfig.getEnableTimerMessageOnRocksDB()) {
+            return timerRocksDBStore.getDequeueBehindMillis();
+        }
         return System.currentTimeMillis() - currReadTimeMs;
     }
 
@@ -1915,4 +1939,20 @@ public TimerCheckpoint getTimerCheckpoint() {
     public static String buildDeleteKey(String realTopic, String uniqueKey) {
         return realTopic + "+" + uniqueKey;
     }
+
+    /**
+     * Extract the unique key from a delete key built by {@link #buildDeleteKey(String, String)}.
+     * The key is split at its first '+', so this only round-trips when the real topic contains no '+'.
+     *
+     * @param deleteKey the delete key, realTopic + "+" + uniqueKey
+     * @return the text after the first '+'
+     * @throws IllegalArgumentException if deleteKey is null or contains no '+'
+     */
+    public static String extractUniqueKey(String deleteKey) {
+        int separatorIndex = deleteKey == null ? -1 : deleteKey.indexOf('+');
+        if (separatorIndex == -1) {
+            throw new IllegalArgumentException("Invalid deleteKey format: " + deleteKey);
+        }
+        return deleteKey.substring(separatorIndex + 1);
+    }
 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java
index 7f8fedd8a5b..bd81e92a9ce 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java
@@ -80,6 +80,12 @@ public long updateDistPair(int period, int value) {
         return distPair.getCount().addAndGet(value);
     }
 
+    /** Overwrite (rather than increment) the counter of the given distribution period. */
+    public void resetDistPair(int period, int value) {
+        Metric distPair = getDistPair(period);
+        distPair.getCount().set(value);
+    }
+
     public long addAndGet(MessageExt msg, int value) {
         String topic = msg.getProperty(MessageConst.PROPERTY_REAL_TOPIC);
         Metric pair = getTopicPair(topic);
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/timer/rocksdb/TimerMessageKVStore.java b/store/src/main/java/org/apache/rocketmq/store/timer/rocksdb/TimerMessageKVStore.java
new file mode 100644
index 00000000000..a850ab7067d
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/rocksdb/TimerMessageKVStore.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.timer.rocksdb;
+
+import java.util.List;
+
+public interface TimerMessageKVStore {
+
+    /**
+     * Start the timer message kv store.
+     */
+    boolean start();
+
+    /**
+     * Shutdown the timer message kv store.
+     */
+    boolean shutdown();
+
+    /**
+     * Get the file path of the timer message kv store.
+     */
+    String getFilePath();
+
+    /**
+     * Write the timer message records to the timer message kv store.
+     * @param columnFamily the column family of the timer message kv store.
+     * @param consumerRecordList the list of timer message records to be written.
+     * @param offset the cq offset to commit together with the records; -1 leaves the commit offset untouched.
+     */
+    void writeAssignRecords(byte[] columnFamily, List<TimerMessageRecord> consumerRecordList, long offset);
+
+    /**
+     * Delete the timer message records from the timer message kv store.
+     * @param columnFamily the column family of the timer message kv store.
+     * @param consumerRecordList the list of timer message records to be deleted.
+     * @param offset the checkpoint to store for the column family; -1 leaves the checkpoint untouched.
+     */
+    void deleteAssignRecords(byte[] columnFamily, List<TimerMessageRecord> consumerRecordList, long offset);
+
+    /**
+     * Scan the timer message records from the timer message kv store.
+     * @param columnFamily the column family of the timer message kv store.
+     * @param lowerTime the lower time of the timer message records to be scanned.
+     * @param upperTime the upper time of the timer message records to be scanned.
+     * @return the list of timer message records.
+     */
+    List<TimerMessageRecord> scanRecords(byte[] columnFamily, long lowerTime, long upperTime);
+
+    /**
+     * Get the commit offset of the timer message kv store from cq.
+     * @return the commit offset of the timer message kv store.
+     */
+    long getCommitOffset();
+
+    /**
+     * Get the metric size of the timer message kv store.
+     * @param lowerTime the lower time of the metric entries to be summed.
+     * @param upperTime the upper time of the metric entries to be summed.
+     * @return sum of the metric values in the range.
+     */
+    int getMetricSize(long lowerTime, long upperTime);
+
+    /**
+     * Get the checkpoint of the timer message kv store.
+     * Key : columnFamily byte[]; Value : checkpoint long.
+     * @param columnFamily the column family of the timer message kv store.
+     * @return the checkpoint of the timer message kv store.
+     */
+    long getCheckpoint(byte[] columnFamily);
+
+    /**
+     * Sync the metric of the timer message kv store.
+     * @param key the key of the metric.
+     * @param update the delta added to the stored value of the metric.
+     */
+    void syncMetric(long key, int update);
+}
diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/rocksdb/TimerMessageRecord.java b/store/src/main/java/org/apache/rocketmq/store/timer/rocksdb/TimerMessageRecord.java
new file mode 100644
index 00000000000..8ceeb5e2709
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/rocksdb/TimerMessageRecord.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.timer.rocksdb;
+
+import org.apache.rocketmq.common.message.MessageExt;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * A timer message entry of the RocksDB timer store: the key is delayTime + uniqueKey, the value sizeReal + offsetPY.
+ */
+public class TimerMessageRecord {
+    enum Flag {
+        DEFAULT,
+        TRANSACTION,
+        POP
+    }
+
+    // key: delayTime + uniqueKey
+    private long delayTime;
+    private String uniqueKey;
+    private MessageExt messageExt;
+    private boolean roll = false;
+    // value: sizeReal + offsetPY
+    private int sizeReal;
+    private long offsetPY;
+    private long readOffset;
+    private static final int VALUE_LENGTH = Integer.BYTES + Long.BYTES;
+
+    public TimerMessageRecord() {
+    }
+
+    public TimerMessageRecord(long delayTime, String uniqueKey, long offsetPY, int sizeReal) {
+        this.delayTime = delayTime;
+        this.uniqueKey = uniqueKey;
+        this.offsetPY = offsetPY;
+        this.sizeReal = sizeReal;
+    }
+
+    public byte[] getKeyBytes() {
+        byte[] uniqueKeyBytes = uniqueKey.getBytes(StandardCharsets.UTF_8);
+        // 8-byte big-endian delayTime first, then the UTF-8 bytes of uniqueKey
+        return ByteBuffer.allocate(Long.BYTES + uniqueKeyBytes.length).putLong(delayTime).put(uniqueKeyBytes).array();
+    }
+
+    public byte[] getValueBytes() {
+        return ByteBuffer.allocate(VALUE_LENGTH).putInt(this.getSizeReal()).putLong(this.getOffsetPY()).array();
+    }
+
+    /** Decode a value written by {@link #getValueBytes()}; the key fields (delayTime, uniqueKey) are left unset. */
+    public static TimerMessageRecord decode(byte[] body) {
+        if (body == null || body.length < VALUE_LENGTH) {
+            throw new IllegalArgumentException("Timer record value must be at least " + VALUE_LENGTH + " bytes");
+        }
+        TimerMessageRecord timerMessageRecord = new TimerMessageRecord();
+        ByteBuffer buffer = ByteBuffer.wrap(body);
+        timerMessageRecord.setSizeReal(buffer.getInt());
+        timerMessageRecord.setOffsetPY(buffer.getLong());
+        return timerMessageRecord;
+    }
+
+    public void setDelayTime(long delayTime) {
+        this.delayTime = delayTime;
+    }
+
+    public void setOffsetPY(long offsetPY) {
+        this.offsetPY = offsetPY;
+    }
+
+    public void setSizeReal(int sizeReal) {
+        this.sizeReal = sizeReal;
+    }
+
+    public int getSizeReal() {
+        return sizeReal;
+    }
+
+    public long getDelayTime() {
+        return delayTime;
+    }
+
+    public long getOffsetPY() {
+        return offsetPY;
+    }
+
+    public boolean isRoll() {
+        return roll;
+    }
+
+    public void setRoll(boolean roll) {
+        this.roll = roll;
+    }
+
+    public MessageExt getMessageExt() {
+        return messageExt;
+    }
+
+    public void setMessageExt(MessageExt messageExt) {
+        this.messageExt = messageExt;
+    }
+
+    public String getUniqueKey() {
+        return uniqueKey;
+    }
+
+    public void setUniqueKey(String uniqueKey) {
+        this.uniqueKey = uniqueKey;
+    }
+
+    public void setReadOffset(long readOffset) {
+        this.readOffset = readOffset;
+    }
+
+    public long getReadOffset() {
+        return readOffset;
+    }
+
+    @Override
+    public String toString() {
+        return "TimerMessageRecord{" +
+            "delayTime=" + delayTime +
+            ", offsetPY=" + offsetPY +
+            ", sizeReal=" + sizeReal +
+            ", readOffset=" + readOffset +
+            ", messageExt=" + messageExt +
+            ", roll=" + roll +
+            ", uniqueKey='" + uniqueKey + '\'' +
+            '}';
+    }
+}
diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/rocksdb/TimerMessageRocksDBStorage.java b/store/src/main/java/org/apache/rocketmq/store/timer/rocksdb/TimerMessageRocksDBStorage.java
new file mode 100644
index 00000000000..3bf33be24d0
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/rocksdb/TimerMessageRocksDBStorage.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.timer.rocksdb;
+
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.config.AbstractRocksDBStorage;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.store.rocksdb.RocksDBOptionsFactory;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.rocksdb.RocksDB;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.WriteOptions;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.CompactRangeOptions;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.Slice;
+import org.rocksdb.RocksIterator;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/** RocksDB implementation of {@link TimerMessageKVStore} with default, pop, transaction and metric column families. */
+public class TimerMessageRocksDBStorage extends AbstractRocksDBStorage implements TimerMessageKVStore {
+    private final static Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+
+    public final static byte[] TRANSACTION_COLUMN_FAMILY = "transaction".getBytes(StandardCharsets.UTF_8);
+    public final static byte[] POP_COLUMN_FAMILY = "pop".getBytes(StandardCharsets.UTF_8);
+    // It is used to mark the CQ file scanning site
+    public final static byte[] TIMER_WRITE_OFFSET_KEY = "write_timer_offset".getBytes(StandardCharsets.UTF_8);
+    // key : 100 * n (delay time); value : long (msg number)
+    public final static byte[] METRIC_COLUMN_FAMILY = "metric".getBytes(StandardCharsets.UTF_8);
+
+    private ColumnFamilyHandle popColumnFamilyHandle;
+    private ColumnFamilyHandle transactionColumnFamilyHandle;
+
+    // Supports 100ms level statistics
+    private ColumnFamilyHandle metricColumnFamilyHandle;
+
+    private WriteOptions writeOptions;
+    private WriteOptions deleteOptions;
+
+    public TimerMessageRocksDBStorage(String filePath) {
+        super(filePath);
+    }
+
+    @Override
+    protected void initOptions() {
+        this.options = RocksDBOptionsFactory.createDBOptions();
+
+        this.deleteOptions = new WriteOptions();
+        this.deleteOptions.setSync(false);
+        this.deleteOptions.setDisableWAL(true);
+        this.deleteOptions.setNoSlowdown(false);
+
+        this.writeOptions = new WriteOptions();
+        this.writeOptions.setSync(false);
+        this.writeOptions.setDisableWAL(true);
+        this.writeOptions.setNoSlowdown(false);
+
+        this.compactRangeOptions = new CompactRangeOptions();
+        this.compactRangeOptions.setBottommostLevelCompaction(
+            CompactRangeOptions.BottommostLevelCompaction.kForce);
+        this.compactRangeOptions.setAllowWriteStall(true);
+        this.compactRangeOptions.setExclusiveManualCompaction(false);
+        this.compactRangeOptions.setChangeLevel(true);
+        this.compactRangeOptions.setTargetLevel(-1);
+        this.compactRangeOptions.setMaxSubcompactions(4);
+    }
+
+    @Override
+    protected boolean postLoad() {
+        try {
+            UtilAll.ensureDirOK(this.dbPath);
+            initOptions();
+
+            // init column family here
+            ColumnFamilyOptions defaultOptions = RocksDBOptionsFactory.createTimerCFOptions();
+            ColumnFamilyOptions popOptions = RocksDBOptionsFactory.createTimerCFOptions();
+            ColumnFamilyOptions transactionOptions = RocksDBOptionsFactory.createTimerCFOptions();
+            ColumnFamilyOptions metricOptions = RocksDBOptionsFactory.createTimerMetricCFOptions();
+
+            this.cfOptions.add(defaultOptions);
+            this.cfOptions.add(popOptions);
+            this.cfOptions.add(transactionOptions);
+            this.cfOptions.add(metricOptions);
+
+            List<ColumnFamilyDescriptor> cfDescriptors = new ArrayList<>();
+            cfDescriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, defaultOptions));
+            cfDescriptors.add(new ColumnFamilyDescriptor(POP_COLUMN_FAMILY, popOptions));
+            cfDescriptors.add(new ColumnFamilyDescriptor(TRANSACTION_COLUMN_FAMILY, transactionOptions));
+            cfDescriptors.add(new ColumnFamilyDescriptor(METRIC_COLUMN_FAMILY, metricOptions));
+
+            this.open(cfDescriptors);
+            this.defaultCFHandle = cfHandles.get(0);
+            this.popColumnFamilyHandle = cfHandles.get(1);
+            this.transactionColumnFamilyHandle = cfHandles.get(2);
+            this.metricColumnFamilyHandle = cfHandles.get(3);
+
+            log.debug("Timer message on rocksdb init, filePath={}", this.dbPath);
+        } catch (final Exception e) {
+            log.error("Timer message on rocksdb init error, filePath={}", this.dbPath, e);
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    protected void preShutdown() {
+        if (this.writeOptions != null) {
+            this.writeOptions.close();
+        }
+        if (this.deleteOptions != null) {
+            this.deleteOptions.close();
+        }
+        if (this.defaultCFHandle != null) {
+            this.defaultCFHandle.close();
+        }
+        if (this.transactionColumnFamilyHandle != null) {
+            this.transactionColumnFamilyHandle.close();
+        }
+        if (this.popColumnFamilyHandle != null) {
+            this.popColumnFamilyHandle.close();
+        }
+        if (this.metricColumnFamilyHandle != null) {
+            this.metricColumnFamilyHandle.close();
+        }
+    }
+
+    @Override
+    public String getFilePath() {
+        return this.dbPath;
+    }
+
+    @Override
+    public void writeAssignRecords(byte[] columnFamily, List<TimerMessageRecord> consumerRecordList, long offset) {
+        ColumnFamilyHandle cfHandle = getColumnFamily(columnFamily);
+
+        if (cfHandle != null && !consumerRecordList.isEmpty()) {
+            try (WriteBatch writeBatch = new WriteBatch()) {
+                for (TimerMessageRecord record : consumerRecordList) {
+                    writeBatch.put(cfHandle, record.getKeyBytes(), record.getValueBytes());
+                }
+                if (offset != -1) {
+                    // -1 means not sync
+                    syncCommitOffset(offset, writeBatch);
+                }
+                this.db.write(writeOptions, writeBatch);
+            } catch (RocksDBException e) {
+                throw new RuntimeException("Write record on RocksDB error", e);
+            }
+        }
+    }
+
+    @Override
+    public void deleteAssignRecords(byte[] columnFamily, List<TimerMessageRecord> consumerRecordList, long offset) {
+        ColumnFamilyHandle deleteCfHandle = getColumnFamily(columnFamily);
+
+        if (deleteCfHandle != null && !consumerRecordList.isEmpty()) {
+            try (WriteBatch writeBatch = new WriteBatch()) {
+                for (TimerMessageRecord record : consumerRecordList) {
+                    writeBatch.delete(deleteCfHandle, record.getKeyBytes());
+                }
+                if (offset != -1) {
+                    // -1 means delete type messages instead of thread scans
+                    syncCheckpoint(columnFamily, offset, writeBatch);
+                }
+                this.db.write(deleteOptions, writeBatch);
+            } catch (RocksDBException e) {
+                throw new RuntimeException("Delete record on RocksDB error", e);
+            }
+        }
+    }
+
+    private ColumnFamilyHandle getColumnFamily(byte[] columnFamily) {
+        // compare names by content: '==' only matched when the caller passed the very same array instance
+        if (Arrays.equals(columnFamily, POP_COLUMN_FAMILY)) {
+            return popColumnFamilyHandle;
+        } else if (Arrays.equals(columnFamily, TRANSACTION_COLUMN_FAMILY)) {
+            return transactionColumnFamilyHandle;
+        } else if (Arrays.equals(columnFamily, RocksDB.DEFAULT_COLUMN_FAMILY)) {
+            return defaultCFHandle;
+        }
+        throw new RuntimeException("Unknown column family");
+    }
+
+    // Encode a long as 8 big-endian bytes, the layout used for time bounds, metric keys and stored offsets.
+    private static byte[] longBytes(long value) {
+        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
+    }
+
+    @Override
+    public List<TimerMessageRecord> scanRecords(byte[] columnFamily, long lowerTime, long upperTime) {
+        List<TimerMessageRecord> records = new ArrayList<>();
+        ColumnFamilyHandle cfHandle = getColumnFamily(columnFamily);
+        byte[] lowerKey = longBytes(lowerTime);
+        // Slice holds native memory: declared first so it is closed last, after the iterator and options
+        try (Slice lower = new Slice(lowerKey); Slice upper = new Slice(longBytes(upperTime));
+             ReadOptions readOptions = new ReadOptions().setIterateLowerBound(lower).setIterateUpperBound(upper);
+             RocksIterator iterator = db.newIterator(cfHandle, readOptions)) {
+            for (iterator.seek(lowerKey); iterator.isValid(); iterator.next()) {
+                records.add(TimerMessageRecord.decode(iterator.value()));
+            }
+        }
+        return records;
+    }
+
+    @Override
+    public long getCommitOffset() {
+        try {
+            byte[] offsetBytes = db.get(TIMER_WRITE_OFFSET_KEY);
+            return offsetBytes == null ? 0 : ByteBuffer.wrap(offsetBytes).getLong();
+        } catch (RocksDBException e) {
+            throw new RuntimeException("Get commit offset from RocksDB error", e);
+        }
+    }
+
+    @Override
+    public int getMetricSize(long lowerTime, long upperTime) {
+        int metricSize = 0;
+        byte[] lowerKey = longBytes(lowerTime);
+        try (Slice lower = new Slice(lowerKey); Slice upper = new Slice(longBytes(upperTime));
+             ReadOptions readOptions = new ReadOptions().setIterateLowerBound(lower).setIterateUpperBound(upper);
+             RocksIterator iterator = db.newIterator(metricColumnFamilyHandle, readOptions)) {
+            for (iterator.seek(lowerKey); iterator.isValid(); iterator.next()) {
+                metricSize += ByteBuffer.wrap(iterator.value()).getInt();
+            }
+        }
+        return metricSize;
+    }
+
+    @Override
+    public long getCheckpoint(byte[] columnFamily) {
+        try {
+            byte[] checkpointBytes = db.get(columnFamily);
+            return checkpointBytes == null ? System.currentTimeMillis() : ByteBuffer.wrap(checkpointBytes).getLong();
+        } catch (RocksDBException e) {
+            throw new RuntimeException("Get checkpoint to RocksDB error", e);
+        }
+    }
+
+    @Override
+    public void syncMetric(long key, int update) {
+        try {
+            byte[] storedValue = db.get(metricColumnFamilyHandle, longBytes(key));
+            if (storedValue != null) {
+                ByteBuffer oldValue = ByteBuffer.wrap(storedValue);
+                update = oldValue.getInt() + update;
+            }
+            db.put(metricColumnFamilyHandle, longBytes(key), ByteBuffer.allocate(4).putInt(update).array());
+        } catch (RocksDBException e) {
+            throw new RuntimeException("Sync metric to RocksDB error", e);
+        }
+    }
+
+    private void syncCheckpoint(byte[] columnFamily, long checkpoint, WriteBatch writeBatch) {
+        try {
+            writeBatch.put(columnFamily, longBytes(checkpoint));
+        } catch (RocksDBException e) {
+            throw new RuntimeException("Sync checkpoint to RocksDB error", e);
+        }
+    }
+
+    private void syncCommitOffset(long offset, WriteBatch writeBatch) {
+        try {
+            writeBatch.put(TIMER_WRITE_OFFSET_KEY, longBytes(offset));
+        } catch (RocksDBException e) {
+            throw new RuntimeException("Sync commit offset error", e);
+        }
+    }
+}
diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/rocksdb/TimerMessageRocksDBStore.java b/store/src/main/java/org/apache/rocketmq/store/timer/rocksdb/TimerMessageRocksDBStore.java
new file mode 100644
index 00000000000..46ff237ba54
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/rocksdb/TimerMessageRocksDBStore.java
@@ -0,0 +1,818 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.store.timer.rocksdb; + +import com.conversantmedia.util.concurrent.DisruptorBlockingQueue; +import io.opentelemetry.api.common.Attributes; +import org.apache.commons.lang3.math.NumberUtils; +import org.apache.rocketmq.common.ServiceThread; +import org.apache.rocketmq.common.TopicFilterType; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageExtBrokerInner; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageAccessor; +import org.apache.rocketmq.common.message.MessageClientIDSetter; +import org.apache.rocketmq.common.topic.TopicValidator; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +import org.apache.rocketmq.store.DefaultMessageStore; +import org.apache.rocketmq.store.MessageStore; +import org.apache.rocketmq.store.PutMessageResult; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant; +import org.apache.rocketmq.store.metrics.DefaultStoreMetricsManager; +import org.apache.rocketmq.store.queue.ConsumeQueueInterface; +import org.apache.rocketmq.store.queue.CqUnit; +import 
org.apache.rocketmq.store.queue.ReferredIterator; +import org.apache.rocketmq.store.stats.BrokerStatsManager; +import org.apache.rocketmq.store.timer.TimerMessageStore; +import org.apache.rocketmq.store.timer.TimerMetrics; +import org.rocksdb.RocksDB; + +import java.nio.ByteBuffer; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.rocketmq.store.timer.rocksdb.TimerMessageRocksDBStorage.POP_COLUMN_FAMILY; +import static org.apache.rocketmq.store.timer.rocksdb.TimerMessageRocksDBStorage.TRANSACTION_COLUMN_FAMILY; + +public class TimerMessageRocksDBStore { + private final static Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); + private static final String ROCKSDB_DIRECTORY = "kvStore"; + + public static final int TIMER_WHEEL_TTL_DAY = 30; + public static final int DAY_SECS = 24 * 3600; + public static final int DEFAULT_CAPACITY = 1024; + public static final int INITIAL = 0, RUNNING = 1, HAULT = 2, SHUTDOWN = 3; + public static final int PUT_OK = 0, PUT_NEED_RETRY = 1, PUT_NO_RETRY = 2; + public static final String TIMER_TOPIC = TopicValidator.SYSTEM_TOPIC_PREFIX + "wheel_timer"; + public static final String TIMER_OUT_MS = MessageConst.PROPERTY_TIMER_OUT_MS; + public static final String TIMER_ROLL_TIMES = MessageConst.PROPERTY_TIMER_ROLL_TIMES; + public static final String TIMER_DEQUEUE_MS = MessageConst.PROPERTY_TIMER_DEQUEUE_MS; + public static final String TIMER_ENQUEUE_MS = MessageConst.PROPERTY_TIMER_ENQUEUE_MS; + + private final TimerMessageKVStore timerMessageKVStore; + private final MessageStore messageStore; + private final BrokerStatsManager brokerStatsManager; 
+ private final MessageStoreConfig storeConfig; + private final TimerMetrics timerMetrics; + + private final int slotSize; + private final int precisionMs; + private final long metricsIntervalMs; + private volatile int state = INITIAL; + private long lastEnqueueButExpiredTime; + private long lastEnqueueButExpiredStoreTime; + + private TimerEnqueueGetService timerEnqueueGetService; + private TimerEnqueuePutService timerEnqueuePutService; + private List timerGetMessageServices; + private List timerWarmServices; + private TimerDequeueGetService[] timerDequeueGetServices; + private TimerDequeuePutService[] timerDequeuePutServices; + + private BlockingQueue enqueuePutQueue; + private BlockingQueue> dequeueGetQueue; + private BlockingQueue> dequeuePutQueue; + + ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + private final AtomicLong commitOffset = new AtomicLong(0); + private final AtomicLong readOffset = new AtomicLong(0); + + public TimerMessageRocksDBStore(final MessageStore messageStore, final MessageStoreConfig storeConfig, + TimerMetrics timerMetrics, final BrokerStatsManager brokerStatsManager) { + this.storeConfig = storeConfig; + this.messageStore = messageStore; + this.timerMetrics = timerMetrics; + this.brokerStatsManager = brokerStatsManager; + + this.precisionMs = storeConfig.getTimerPrecisionMs(); + this.slotSize = 1000 * TIMER_WHEEL_TTL_DAY / precisionMs * DAY_SECS; + this.metricsIntervalMs = 1000L * TIMER_WHEEL_TTL_DAY * DAY_SECS; + this.timerMessageKVStore = new TimerMessageRocksDBStorage(Paths.get( + storeConfig.getStorePathRootDir(), ROCKSDB_DIRECTORY).toString()); + + this.timerGetMessageServices = new ArrayList<>(); + this.timerWarmServices = new ArrayList<>(); + } + + public boolean load() { + initService(); + boolean result = timerMessageKVStore.start(); + result &= this.timerMetrics.load(); + calcTimerDistribution(); + return result; + } + + public void start() { + if (state == RUNNING) { + 
return; + } + this.commitOffset.set(timerMessageKVStore.getCommitOffset()); + this.readOffset.set(commitOffset.get()); + this.timerEnqueueGetService.start(); + this.timerEnqueuePutService.start(); + + for (TimerDequeueGetService timerDequeueGetService : timerDequeueGetServices) { + timerDequeueGetService.start(); + } + for (TimerWarmService timerWarmService : timerWarmServices) { + timerWarmService.start(); + } + for (TimerGetMessageService timerGetMessageService : timerGetMessageServices) { + timerGetMessageService.start(); + } + for (TimerDequeuePutService timerDequeuePutService : timerDequeuePutServices) { + timerDequeuePutService.start(); + } + scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + timerMetrics.persist(); + } + }, storeConfig.getTimerFlushIntervalMs(), storeConfig.getTimerFlushIntervalMs(), TimeUnit.MILLISECONDS); + state = RUNNING; + } + + public void shutdown() { + if (state != RUNNING || state == SHUTDOWN) { + return; + } + state = SHUTDOWN; + + this.timerEnqueueGetService.shutdown(); + this.timerEnqueuePutService.shutdown(); + + for (TimerDequeueGetService timerDequeueGetService : timerDequeueGetServices) { + timerDequeueGetService.shutdown(); + } + for (TimerWarmService timerWarmService : timerWarmServices) { + timerWarmService.shutdown(); + } + for (TimerGetMessageService timerGetMessageService : timerGetMessageServices) { + timerGetMessageService.shutdown(); + } + for (TimerDequeuePutService timerDequeuePutService : timerDequeuePutServices) { + timerDequeuePutService.shutdown(); + } + + this.dequeueGetQueue.clear(); + this.enqueuePutQueue.clear(); + this.dequeuePutQueue.clear(); + this.scheduledExecutorService.shutdown(); + this.timerMessageKVStore.shutdown(); + } + + public void createTimer(byte[] columnFamily) { + this.timerGetMessageServices.add(new TimerGetMessageService(columnFamily)); + this.timerWarmServices.add(new TimerWarmService(columnFamily)); + } + + // 
---------------------------------------------------------------------------------------------------------------- + private void initService() { + createTimer(TRANSACTION_COLUMN_FAMILY); + createTimer(POP_COLUMN_FAMILY); + createTimer(RocksDB.DEFAULT_COLUMN_FAMILY); + this.timerEnqueueGetService = new TimerEnqueueGetService(); + this.timerEnqueuePutService = new TimerEnqueuePutService(); + + int getThreadNum = Math.max(storeConfig.getTimerGetMessageThreadNum(), 1); + int putThreadNum = Math.max(storeConfig.getTimerPutMessageThreadNum(), 1); + this.timerDequeuePutServices = new TimerDequeuePutService[putThreadNum]; + this.timerDequeueGetServices = new TimerDequeueGetService[getThreadNum]; + for (int i = 0; i < timerDequeuePutServices.length; i++) { + timerDequeuePutServices[i] = new TimerDequeuePutService(); + } + for (int i = 0; i < timerDequeueGetServices.length; i++) { + timerDequeueGetServices[i] = new TimerDequeueGetService(); + } + + if (storeConfig.isTimerEnableDisruptor()) { + this.enqueuePutQueue = new DisruptorBlockingQueue<>(DEFAULT_CAPACITY); + this.dequeuePutQueue = new DisruptorBlockingQueue<>(DEFAULT_CAPACITY); + this.dequeueGetQueue = new DisruptorBlockingQueue<>(DEFAULT_CAPACITY); + } else { + this.enqueuePutQueue = new LinkedBlockingDeque<>(DEFAULT_CAPACITY); + this.dequeuePutQueue = new LinkedBlockingDeque<>(DEFAULT_CAPACITY); + this.dequeueGetQueue = new LinkedBlockingDeque<>(DEFAULT_CAPACITY); + } + } + + private void calcTimerDistribution() { + int slotNumber = precisionMs; + int rocksdbNumber = 0; + for (int i = 0; i < this.slotSize; i++) { + timerMetrics.updateDistPair(i, timerMessageKVStore.getMetricSize(rocksdbNumber, rocksdbNumber + slotNumber)); + rocksdbNumber += slotNumber; + } + } + + private String getServiceThreadName() { + String brokerIdentifier = ""; + if (TimerMessageRocksDBStore.this.messageStore instanceof DefaultMessageStore) { + DefaultMessageStore messageStore = (DefaultMessageStore) 
TimerMessageRocksDBStore.this.messageStore; + if (messageStore.getBrokerConfig().isInBrokerContainer()) { + brokerIdentifier = messageStore.getBrokerConfig().getIdentifier(); + } + } + return brokerIdentifier; + } + + private byte[] getColumnFamily(int flag) { + TimerMessageRecord.Flag tag; + switch (flag) { + case 1: + tag = TimerMessageRecord.Flag.TRANSACTION; + break; + case 2: + tag = TimerMessageRecord.Flag.POP; + break; + default: + tag = TimerMessageRecord.Flag.DEFAULT; + } + if (TimerMessageRecord.Flag.TRANSACTION == tag) { + return TRANSACTION_COLUMN_FAMILY; + } else if (tag == TimerMessageRecord.Flag.POP) { + return POP_COLUMN_FAMILY; + } else { + return RocksDB.DEFAULT_COLUMN_FAMILY; + } + } + + private class TimerEnqueueGetService extends ServiceThread { + @Override + public String getServiceName() { + return getServiceThreadName() + this.getClass().getSimpleName(); + } + + @Override + public void run() { + TimerMessageRocksDBStore.log.info(this.getServiceName() + " service start"); + while (!this.isStopped()) { + try { + if (!enqueue(0)) { + waitForRunning(100L * precisionMs / 1000); + } + } catch (Throwable e) { + TimerMessageRocksDBStore.log.error("Error occurred in " + getServiceName(), e); + } + } + TimerMessageRocksDBStore.log.info(this.getServiceName() + " service end"); + } + } + + private class TimerEnqueuePutService extends ServiceThread { + @Override + public String getServiceName() { + return getServiceThreadName() + this.getClass().getSimpleName(); + } + + @Override + public void run() { + TimerMessageRocksDBStore.log.info(this.getServiceName() + " service start"); + while (!this.isStopped() || !enqueuePutQueue.isEmpty()) { + try { + fetchAndPutTimerRequest(); + } catch (Throwable e) { + TimerMessageRocksDBStore.log.error("Unknown error", e); + } + } + TimerMessageRocksDBStore.log.info(this.getServiceName() + " service end"); + } + + private List fetchTimerMessageRecord() throws InterruptedException { + List trs = null; + TimerMessageRecord 
firstReq = enqueuePutQueue.poll(10, TimeUnit.MILLISECONDS); + if (null != firstReq) { + trs = new ArrayList<>(16); + trs.add(firstReq); + while (true) { + TimerMessageRecord tmpReq = enqueuePutQueue.poll(3, TimeUnit.MILLISECONDS); + if (null == tmpReq) { + break; + } + trs.add(tmpReq); + if (trs.size() > 100) { + break; + } + } + } + return trs; + } + + private void fetchAndPutTimerRequest() throws InterruptedException { + List trs = fetchTimerMessageRecord(); + + if (null == trs) { + return; + } + Map> increaseMetric = new HashMap<>(); + Map> deleteMetric = new HashMap<>(); + Map> increase = new HashMap<>(); + Map> delete = new HashMap<>(); + List expired = new ArrayList<>(); + + for (TimerMessageRecord tr : trs) { + long delayTime = tr.getDelayTime(); + int flag = tr.getMessageExt().getProperty(MessageConst.PROPERTY_TIMER_DEL_FLAG) == null ? + 0 : Integer.parseInt(tr.getMessageExt().getProperty(MessageConst.PROPERTY_TIMER_DEL_FLAG)); + if (tr.getMessageExt().getProperty(MessageConst.PROPERTY_TIMER_DEL_UNIQKEY) != null) { + // Construct original message + tr.setDelayTime(delayTime / precisionMs % slotSize); + tr.setUniqueKey(TimerMessageStore.extractUniqueKey(tr.getMessageExt(). 
+ getProperty(MessageConst.PROPERTY_TIMER_DEL_UNIQKEY))); + deleteMetric.computeIfAbsent(delayTime % metricsIntervalMs, k -> new ArrayList<>()).add(tr); + delete.computeIfAbsent(flag, k -> new ArrayList<>()).add(tr); + } else if (delayTime <= System.currentTimeMillis()) { + expired.add(tr); + addMetric(tr.getMessageExt(), 1); + addMetric((int) (Long.parseLong(tr.getMessageExt().getProperty(TIMER_OUT_MS)) / precisionMs % slotSize), 1); + } else { + tr.setDelayTime(delayTime / precisionMs % slotSize); + increaseMetric.computeIfAbsent(delayTime % metricsIntervalMs, k -> new ArrayList<>()).add(tr); + increase.computeIfAbsent(flag, k -> new ArrayList<>()).add(tr); + } + } + + while (!expired.isEmpty() && !dequeueGetQueue.offer(expired, 100, TimeUnit.MILLISECONDS)) { + } + for (Map.Entry> entry : increase.entrySet()) { + timerMessageKVStore.writeAssignRecords(getColumnFamily(entry.getKey()), entry.getValue(), -1); + } + for (Map.Entry> entry : delete.entrySet()) { + timerMessageKVStore.deleteAssignRecords(getColumnFamily(entry.getKey()), entry.getValue(), -1); + } + // sync cq read offset + timerMessageKVStore.writeAssignRecords(getColumnFamily(0), new ArrayList<>(), commitOffset.addAndGet(trs.size())); + + for (Map.Entry> entry : deleteMetric.entrySet()) { + long delayTime = entry.getKey(); + for (TimerMessageRecord record : entry.getValue()) { + addMetric(record.getMessageExt(), -1); + addMetric((int) (Long.parseLong(record.getMessageExt().getProperty(TIMER_OUT_MS)) / precisionMs % slotSize), -1); + } + timerMessageKVStore.syncMetric(delayTime, -entry.getValue().size()); + } + for (Map.Entry> entry : increaseMetric.entrySet()) { + long delayTime = entry.getKey(); + for (TimerMessageRecord record : entry.getValue()) { + addMetric(record.getMessageExt(), 1); + addMetric((int) (Long.parseLong(record.getMessageExt().getProperty(TIMER_OUT_MS)) / precisionMs % slotSize), 1); + } + timerMessageKVStore.syncMetric(delayTime, entry.getValue().size()); + } + } + } + + private 
class TimerDequeueGetService extends ServiceThread { + @Override + public String getServiceName() { + return getServiceThreadName() + this.getClass().getSimpleName(); + } + + @Override + public void run() { + TimerMessageRocksDBStore.log.info(this.getServiceName() + " service start"); + while (!this.isStopped() || !dequeueGetQueue.isEmpty()) { + try { + List timerMessageRecord = dequeueGetQueue.poll(100L * precisionMs / 1000, TimeUnit.MILLISECONDS); + if (null == timerMessageRecord || timerMessageRecord.isEmpty()) { + continue; + } + for (TimerMessageRecord record : timerMessageRecord) { + MessageExt messageExt = getMessageByCommitOffset(record.getOffsetPY(), record.getSizeReal()); + long delayedTime = Long.parseLong(messageExt.getProperty(TIMER_OUT_MS)); + record.setMessageExt(messageExt); + record.setDelayTime(delayedTime); + record.setUniqueKey(MessageClientIDSetter.getUniqID(messageExt)); + record.setRoll(delayedTime >= System.currentTimeMillis() + precisionMs * 3L); + } + + while (!dequeuePutQueue.offer(timerMessageRecord, 3, TimeUnit.SECONDS)) { + } + } catch (InterruptedException e) { + TimerMessageRocksDBStore.log.error("Error occurred in " + getServiceName(), e); + } + } + TimerMessageRocksDBStore.log.info(this.getServiceName() + " service end"); + } + } + + private class TimerDequeuePutService extends ServiceThread { + @Override + public String getServiceName() { + return getServiceThreadName() + this.getClass().getSimpleName(); + } + + @Override + public void run() { + TimerMessageRocksDBStore.log.info(this.getServiceName() + " service start"); + while (!this.isStopped() || !dequeuePutQueue.isEmpty()) { + try { + List timerMessageRecord = dequeuePutQueue.poll(100L * precisionMs / 1000, TimeUnit.MILLISECONDS); + int flag = 0; + long delayTime = -1; + if (null == timerMessageRecord || timerMessageRecord.isEmpty()) { + continue; + } + for (TimerMessageRecord record : timerMessageRecord) { + MessageExt msg = record.getMessageExt(); + MessageExtBrokerInner 
messageExtBrokerInner = convert(msg, record.isRoll()); + if (delayTime == -1) { + delayTime = Long.parseLong(record.getMessageExt().getProperty(TIMER_OUT_MS)); + } + flag = record.getMessageExt().getProperty(MessageConst.PROPERTY_TIMER_DEL_FLAG) == null ? + 0 : Integer.parseInt(record.getMessageExt().getProperty(MessageConst.PROPERTY_TIMER_DEL_FLAG)); + boolean processed = false; + int retryCount = 0; + + while (!processed && !isStopped()) { + int result = doPut(messageExtBrokerInner, record.isRoll()); + + if (result == PUT_OK) { + processed = true; + } else if (result == PUT_NO_RETRY) { + TimerMessageRocksDBStore.log.warn("Skipping message due to unrecoverable error. Msg: {}", msg); + processed = true; + } else { + retryCount++; + // Without enabling TimerEnableRetryUntilSuccess, messages will retry up to 3 times before being discarded + if (!storeConfig.isTimerEnableRetryUntilSuccess() && retryCount >= 3) { + TimerMessageRocksDBStore.log.error("Message processing failed after {} retries. Msg: {}", retryCount, msg); + processed = true; + } else { + Thread.sleep(500L * precisionMs / 1000); + TimerMessageRocksDBStore.log.warn("Retrying to process message. 
Retry count: {}, Msg: {}", retryCount, msg); + } + } + } + addMetric(msg, -1); + addMetric((int) (Long.parseLong(msg.getProperty(TIMER_OUT_MS)) / precisionMs % slotSize), -1); + } + timerMessageKVStore.syncMetric(delayTime % metricsIntervalMs, -timerMessageRecord.size()); + timerMessageKVStore.deleteAssignRecords(getColumnFamily(flag), timerMessageRecord, timerMessageRecord.get(0).getReadOffset()); + } catch (InterruptedException e) { + TimerMessageRocksDBStore.log.error("Error occurred in " + getServiceName(), e); + } + } + TimerMessageRocksDBStore.log.info(this.getServiceName() + " service end"); + } + } + + private class TimerGetMessageService extends ServiceThread { + private final byte[] columnFamily; + private long checkpoint; + + public TimerGetMessageService(byte[] columnFamily) { + this.columnFamily = columnFamily; + } + + @Override + public String getServiceName() { + return getServiceThreadName() + this.getClass().getSimpleName(); + } + + @Override + public void run() { + this.checkpoint = timerMessageKVStore.getCheckpoint(columnFamily); + TimerMessageRocksDBStore.log.info(this.getServiceName() + " service start"); + while (!this.isStopped()) { + try { + if (-1 == dequeue(checkpoint, columnFamily)) { + waitForRunning(100L * precisionMs / 1000); + } else { + checkpoint += precisionMs; + } + } catch (Throwable e) { + TimerMessageRocksDBStore.log.error("Error occurred in " + getServiceName(), e); + throw new RuntimeException(e); + } + } + TimerMessageRocksDBStore.log.info(this.getServiceName() + " service end"); + } + } + + private class TimerWarmService extends ServiceThread { + private final byte[] columnFamily; + private long checkpoint; + + public TimerWarmService(byte[] columnFamily) { + this.columnFamily = columnFamily; + } + + @Override + public String getServiceName() { + return getServiceThreadName() + this.getClass().getSimpleName(); + } + + @Override + public void run() { + TimerMessageRocksDBStore.log.info(this.getServiceName() + " service 
start"); + this.checkpoint = timerMessageKVStore.getCheckpoint(columnFamily) + precisionMs; + while (!this.isStopped()) { + try { + int checkpoint = warm(this.checkpoint, columnFamily); + if (-1 == checkpoint) { + waitForRunning(100L * precisionMs / 1000); + } else { + this.checkpoint += precisionMs; + } + } catch (Throwable e) { + TimerMessageRocksDBStore.log.error("Error occurred in " + getServiceName(), e); + } + } + TimerMessageRocksDBStore.log.info(this.getServiceName() + " service end"); + } + } + + // ----------------------------------------------------------------------------------------------------------------- + public boolean enqueue(int queueId) { + if (!storeConfig.getEnableTimerMessageOnRocksDB() || storeConfig.isTimerStopEnqueue()) { + return false; + } + + ConsumeQueueInterface cq = this.messageStore.getConsumeQueue(TIMER_TOPIC, queueId); + if (null == cq) { + return false; + } + if (readOffset.get() < cq.getMinOffsetInQueue()) { + log.warn("Timer currQueueOffset:{} is smaller than minOffsetInQueue:{}", + readOffset, cq.getMinOffsetInQueue()); + readOffset.set(cq.getMinOffsetInQueue()); + } + + ReferredIterator iterator = null; + try { + iterator = cq.iterateFrom(readOffset.get()); + if (null == iterator) { + return false; + } + + int i = 0; + while (iterator.hasNext()) { + i++; + try { + CqUnit cqUnit = iterator.next(); + long offsetPy = cqUnit.getPos(); + int sizePy = cqUnit.getSize(); + MessageExt msgExt = getMessageByCommitOffset(offsetPy, sizePy); + + if (null != msgExt) { + lastEnqueueButExpiredTime = System.currentTimeMillis(); + lastEnqueueButExpiredStoreTime = msgExt.getStoreTimestamp(); + long delayedTime = Long.parseLong(msgExt.getProperty(TIMER_OUT_MS)); + TimerMessageRecord timerRequest = new TimerMessageRecord(delayedTime, + MessageClientIDSetter.getUniqID(msgExt), offsetPy, sizePy); + timerRequest.setMessageExt(msgExt); + + while (!enqueuePutQueue.offer(timerRequest, 3, TimeUnit.SECONDS)) { + } + Attributes attributes = 
DefaultStoreMetricsManager.newAttributesBuilder() + .put(DefaultStoreMetricsConstant.LABEL_TOPIC, msgExt.getProperty(MessageConst.PROPERTY_REAL_TOPIC)).build(); + DefaultStoreMetricsManager.timerMessageSetLatency.record((delayedTime - msgExt.getBornTimestamp()) / 1000, attributes); + } + } catch (Exception e) { + // here may cause the message loss + log.warn("Unknown error in skipped in enqueuing", e); + throw e; + } + readOffset.incrementAndGet(); + } + return i > 0; + } catch (Exception e) { + log.error("Unknown exception in enqueuing", e); + } finally { + if (iterator != null) { + iterator.release(); + } + } + return false; + } + + private MessageExt getMessageByCommitOffset(long offsetPy, int sizePy) { + ThreadLocal bufferLocal = ThreadLocal.withInitial(() -> ByteBuffer.allocate(sizePy)); + for (int i = 0; i < 3; i++) { + MessageExt msgExt = null; + bufferLocal.get().position(0); + bufferLocal.get().limit(sizePy); + boolean res = messageStore.getData(offsetPy, sizePy, bufferLocal.get()); + if (res) { + bufferLocal.get().flip(); + msgExt = MessageDecoder.decode(bufferLocal.get(), true, false, false); + } + if (null == msgExt) { + log.warn("Fail to read msg from commitLog offsetPy:{} sizePy:{}", offsetPy, sizePy); + } else { + return msgExt; + } + } + return null; + } + + private int dequeue(long checkpoint, byte[] columnFamily) throws InterruptedException { + if (storeConfig.isTimerStopDequeue()) { + return -1; + } + if (checkpoint > System.currentTimeMillis()) { + return -1; + } + int slot = (int) (checkpoint / precisionMs % slotSize); + + List timerMessageRecords = timerMessageKVStore.scanRecords(columnFamily, slot, slot + 1); + if (timerMessageRecords == null || timerMessageRecords.isEmpty()) { + return 0; + } + + for (TimerMessageRecord timerMessageRecord : timerMessageRecords) { + timerMessageRecord.setReadOffset(checkpoint); + } + while (!dequeueGetQueue.offer(timerMessageRecords, 3, TimeUnit.SECONDS)) { + } + return 0; + } + + private int warm(long 
checkpoint, byte[] columnFamily) { + if (!storeConfig.isTimerWarmEnable()) { + return -1; + } + if (checkpoint < System.currentTimeMillis() + precisionMs) { + checkpoint = System.currentTimeMillis() + precisionMs; + } + if (checkpoint >= System.currentTimeMillis() + 3L * precisionMs) { + return -1; + } + + int slot = (int) (checkpoint / precisionMs % slotSize); + timerMessageKVStore.scanRecords(columnFamily, slot, slot + 1); + return 0; + } + + private MessageExtBrokerInner convert(MessageExt messageExt, boolean needRoll) { + if (needRoll) { + if (messageExt.getProperty(TIMER_ROLL_TIMES) != null) { + MessageAccessor.putProperty(messageExt, TIMER_ROLL_TIMES, Integer.parseInt(messageExt.getProperty(TIMER_ROLL_TIMES)) + 1 + ""); + } else { + MessageAccessor.putProperty(messageExt, TIMER_ROLL_TIMES, 1 + ""); + } + } + MessageAccessor.putProperty(messageExt, TIMER_DEQUEUE_MS, System.currentTimeMillis() + ""); + return convertMessage(messageExt, needRoll); + } + + private MessageExtBrokerInner convertMessage(MessageExt msgExt, boolean needRoll) { + MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); + msgInner.setBody(msgExt.getBody()); + msgInner.setFlag(msgExt.getFlag()); + MessageAccessor.setProperties(msgInner, MessageAccessor.deepCopyProperties(msgExt.getProperties())); + TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag()); + long tagsCodeValue = MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags()); + msgInner.setTagsCode(tagsCodeValue); + msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties())); + msgInner.setSysFlag(msgExt.getSysFlag()); + msgInner.setBornTimestamp(msgExt.getBornTimestamp()); + msgInner.setBornHost(msgExt.getBornHost()); + msgInner.setStoreHost(msgExt.getStoreHost()); + msgInner.setReconsumeTimes(msgExt.getReconsumeTimes()); + msgInner.setWaitStoreMsgOK(false); + + if (needRoll) { + msgInner.setTopic(msgExt.getTopic()); + 
msgInner.setQueueId(msgExt.getQueueId()); + } else { + msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC)); + msgInner.setQueueId(Integer.parseInt(msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID))); + MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC); + MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID); + } + return msgInner; + } + + private int doPut(MessageExtBrokerInner message, boolean roll) { + if (!roll && null != message.getProperty(MessageConst.PROPERTY_TIMER_DEL_UNIQKEY)) { + log.warn("Trying do put delete timer msg:[{}] roll:[{}]", message, roll); + return PUT_NO_RETRY; + } + + PutMessageResult putMessageResult = messageStore.putMessage(message); + if (putMessageResult != null && putMessageResult.getPutMessageStatus() != null) { + switch (putMessageResult.getPutMessageStatus()) { + case PUT_OK: + if (brokerStatsManager != null) { + brokerStatsManager.incTopicPutNums(message.getTopic(), 1, 1); + if (putMessageResult.getAppendMessageResult() != null) { + brokerStatsManager.incTopicPutSize(message.getTopic(), putMessageResult.getAppendMessageResult().getWroteBytes()); + } + brokerStatsManager.incBrokerPutNums(message.getTopic(), 1); + } + return PUT_OK; + + case MESSAGE_ILLEGAL: + case PROPERTIES_SIZE_EXCEEDED: + case WHEEL_TIMER_NOT_ENABLE: + case WHEEL_TIMER_MSG_ILLEGAL: + return PUT_NO_RETRY; + + case SERVICE_NOT_AVAILABLE: + case FLUSH_DISK_TIMEOUT: + case FLUSH_SLAVE_TIMEOUT: + case OS_PAGE_CACHE_BUSY: + case CREATE_MAPPED_FILE_FAILED: + case SLAVE_NOT_AVAILABLE: + return PUT_NEED_RETRY; + + case UNKNOWN_ERROR: + default: + if (storeConfig.isTimerSkipUnknownError()) { + log.warn("Skipping message due to unknown error, msg: {}", message); + return PUT_NO_RETRY; + } else { + return PUT_NEED_RETRY; + } + } + } + return PUT_NEED_RETRY; + } + + private void addMetric(MessageExt msg, int value) { + if (null == msg || null == msg.getProperty(MessageConst.PROPERTY_REAL_TOPIC)) { + 
return; + } + if (msg.getProperty(TIMER_ENQUEUE_MS) != null + && NumberUtils.toLong(msg.getProperty(TIMER_ENQUEUE_MS)) == Long.MAX_VALUE) { + return; + } + timerMetrics.addAndGet(msg, value); + } + + private void addMetric(int delayTime, int value) { + timerMetrics.updateDistPair(delayTime, value); + } + + public TimerMetrics getTimerMetrics() { + return this.timerMetrics; + } + + public long getCommitOffset() { + return commitOffset.get(); + } + + public long getAllCongestNum() { + return timerMessageKVStore.getMetricSize(0, metricsIntervalMs); + } + + public long getCongestNum(long deliverTimeMs) { + long slot = deliverTimeMs / precisionMs % slotSize; + return timerMessageKVStore.getMetricSize(slot * precisionMs, (slot + 1) * precisionMs); + } + + public long getEnqueueBehindMessages() { + long tmpQueueOffset = readOffset.get(); + ConsumeQueueInterface cq = messageStore.getConsumeQueue(TIMER_TOPIC, 0); + long maxOffsetInQueue = cq == null ? 0 : cq.getMaxOffsetInQueue(); + return maxOffsetInQueue - tmpQueueOffset; + } + + public long getEnqueueBehindMillis() { + if (System.currentTimeMillis() - lastEnqueueButExpiredTime < 2000) { + return System.currentTimeMillis() - lastEnqueueButExpiredStoreTime; + } + return 0; + } + + public long getDequeueBehindMillis() { + return System.currentTimeMillis() - timerGetMessageServices.get(0).checkpoint; + } + + public TimerMessageKVStore getTimerMessageKVStore() { + return timerMessageKVStore; + } + + public long getMetricsIntervalMs() { + return metricsIntervalMs; + } +} \ No newline at end of file diff --git a/store/src/test/java/org/apache/rocketmq/store/timer/rocksdb/TimerMessageRocksDBStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/timer/rocksdb/TimerMessageRocksDBStoreTest.java new file mode 100644 index 00000000000..767433f1c55 --- /dev/null +++ b/store/src/test/java/org/apache/rocketmq/store/timer/rocksdb/TimerMessageRocksDBStoreTest.java @@ -0,0 +1,378 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.store.timer.rocksdb; + +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.TopicFilterType; +import org.apache.rocketmq.common.message.MessageExtBrokerInner; +import org.apache.rocketmq.common.message.MessageAccessor; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageClientIDSetter; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.store.MessageStore; +import org.apache.rocketmq.store.GetMessageResult; +import org.apache.rocketmq.store.PutMessageResult; +import org.apache.rocketmq.store.PutMessageStatus; +import org.apache.rocketmq.store.GetMessageStatus; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.apache.rocketmq.store.DefaultMessageStore; +import org.apache.rocketmq.store.MessageArrivingListener; +import org.apache.rocketmq.store.stats.BrokerStatsManager; +import org.apache.rocketmq.store.timer.StoreTestUtils; +import org.apache.rocketmq.store.timer.TimerMessageStore; +import org.apache.rocketmq.store.timer.TimerMetrics; +import 
org.junit.Assert;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;

import java.io.File;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.rocketmq.store.timer.rocksdb.TimerMessageRocksDBStore.TIMER_TOPIC;
import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;

/**
 * Tests for {@link TimerMessageRocksDBStore} that run against a {@link DefaultMessageStore}
 * created in a fresh temporary directory for every test method.
 */
public class TimerMessageRocksDBStoreTest {
    MessageStore messageStore;
    MessageStoreConfig storeConfig;
    int precisionMs = 1;
    AtomicInteger counter = new AtomicInteger(0);
    private SocketAddress bornHost;
    private SocketAddress storeHost;
    // Every directory created by a test; all of them are deleted in destroy().
    Set<String> baseDirs = new HashSet<>();

    /**
     * Creates, loads and starts a message store with timer-on-RocksDB enabled and a
     * requested timer precision of 1 ms.
     */
    @Before
    public void setUp() throws Exception {
        String baseDir = StoreTestUtils.createBaseDir();
        this.baseDirs.add(baseDir);
        storeConfig = new MessageStoreConfig();
        storeConfig.setEnableTimerMessageOnRocksDB(true);
        storeConfig.setStorePathRootDir(baseDir);
        storeConfig.setStorePathCommitLog(baseDir + File.separator + "commitlog");
        storeConfig.setTimerPrecision(1);
        // Read the precision back so the tests use whatever value the config actually stored.
        precisionMs = storeConfig.getTimerPrecisionMs();
        messageStore = new DefaultMessageStore(storeConfig, new BrokerStatsManager("TimerTest",
            false), new MyMessageArrivingListener(), new BrokerConfig(), new ConcurrentHashMap<>());

        storeHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123);
        bornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0);
        assertTrue(messageStore.load());
        messageStore.start();
    }

    /**
     * Creates a timer store bound to the shared {@link #messageStore}.
     *
     * @param rootDir directory used for the timer metrics file; when {@code null} a new base
     *                directory is created and registered for cleanup
     * @return a timer store that has been neither loaded nor started
     */
    private TimerMessageRocksDBStore createTimerMessageRocksDBStore(String rootDir) {
        if (null == rootDir) {
            rootDir = StoreTestUtils.createBaseDir();
            baseDirs.add(rootDir);
        }
        TimerMetrics timerMetrics = new TimerMetrics(rootDir + File.separator + "config" + File.separator + "timermetrics");
        return new TimerMessageRocksDBStore(messageStore, storeConfig, timerMetrics, null);
    }

    /**
     * Builds a timer message with a 100-byte body on queue 0 of {@code topic}.
     *
     * @param delayedMs when {@code relative} is true, a delay in milliseconds that is stored as
     *                  whole seconds in {@code PROPERTY_TIMER_DELAY_SEC}; otherwise the value is
     *                  stored unchanged in {@code PROPERTY_TIMER_DELIVER_MS}
     * @param topic     topic set on the message
     * @param relative  selects which of the two timer properties is written
     * @return the message with its properties string already encoded
     */
    public MessageExtBrokerInner buildMessage(long delayedMs, String topic, boolean relative) {
        MessageExtBrokerInner msg = new MessageExtBrokerInner();
        msg.setTopic(topic);
        msg.setQueueId(0);
        // The running counter gives every message a distinct tag.
        msg.setTags(counter.incrementAndGet() + "");
        if (relative) {
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TIMER_DELAY_SEC, delayedMs / 1000 + "");
        } else {
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TIMER_DELIVER_MS, delayedMs + "");
        }
        msg.setBody(new byte[100]);
        msg.setKeys(String.valueOf(System.currentTimeMillis()));
        msg.setBornTimestamp(System.currentTimeMillis());
        msg.setBornHost(bornHost);
        msg.setStoreHost(storeHost);
        MessageClientIDSetter.setUniqID(msg);
        TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msg.getSysFlag());
        long tagsCodeValue =
            MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msg.getTags());
        msg.setTagsCode(tagsCodeValue);
        msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
        return msg;
    }

    /**
     * Polls the message store every 100 ms for a single message.
     *
     * @param topic   topic to read from
     * @param queue   queue id to read from
     * @param offset  queue offset of the wanted message
     * @param timeout upper bound on the polling time in milliseconds; {@code timeout / 100}
     *                attempts are made
     * @return the buffer of the first message found, or {@code null} if none was found in time
     */
    public ByteBuffer getOneMessage(String topic, int queue, long offset, int timeout) throws Exception {
        int retry = timeout / 100;
        while (retry-- > 0) {
            GetMessageResult getMessageResult = messageStore.getMessage("TimerGroup", topic, queue, offset, 1, null);
            if (null != getMessageResult && GetMessageStatus.FOUND == getMessageResult.getStatus()) {
                return getMessageResult.getMessageBufferList().get(0);
            }
            Thread.sleep(100);
        }
        return null;
    }

    /**
     * Rewrites {@code msg} in place so that it is stored under {@code TIMER_TOPIC}.
     * <p>
     * For a delivery time in the future the time is rounded down to the configured precision
     * and written to {@code PROPERTY_TIMER_OUT_MS}, the original topic and queue id are saved
     * in {@code PROPERTY_REAL_TOPIC} / {@code PROPERTY_REAL_QUEUE_ID}, and the message is
     * redirected to queue 0 of {@code TIMER_TOPIC}. A message whose delivery time is not in the
     * future is left untouched unless it carries a delete key.
     *
     * @param timerMessageStore not used by this helper
     * @param msg               message to transform
     * @return {@code null} when the message is accepted; a result with status
     *         {@code WHEEL_TIMER_MSG_ILLEGAL} when the timer properties cannot be parsed, the
     *         delay exceeds the configured maximum, or an already expired message carries
     *         {@code PROPERTY_TIMER_DEL_UNIQKEY}
     */
    private PutMessageResult transformTimerMessage(TimerMessageRocksDBStore timerMessageStore,
        MessageExtBrokerInner msg) {
        int delayLevel = msg.getDelayTimeLevel();
        long deliverMs;

        try {
            if (msg.getProperty(MessageConst.PROPERTY_TIMER_DELAY_SEC) != null) {
                deliverMs = System.currentTimeMillis() + Integer.parseInt(msg.getProperty(MessageConst.PROPERTY_TIMER_DELAY_SEC)) * 1000L;
            } else if (msg.getProperty(MessageConst.PROPERTY_TIMER_DELAY_MS) != null) {
                deliverMs = System.currentTimeMillis() + Integer.parseInt(msg.getProperty(MessageConst.PROPERTY_TIMER_DELAY_MS));
            } else {
                deliverMs = Long.parseLong(msg.getProperty(MessageConst.PROPERTY_TIMER_DELIVER_MS));
            }
        } catch (Exception e) {
            // A missing or non-numeric timer property makes the message illegal.
            return new PutMessageResult(PutMessageStatus.WHEEL_TIMER_MSG_ILLEGAL, null);
        }
        if (deliverMs > System.currentTimeMillis()) {
            if (delayLevel <= 0 && deliverMs - System.currentTimeMillis() > storeConfig.getTimerMaxDelaySec() * 1000L) {
                return new PutMessageResult(PutMessageStatus.WHEEL_TIMER_MSG_ILLEGAL, null);
            }

            // Round the delivery time down to a multiple of the timer precision.
            int timerPrecisionMs = storeConfig.getTimerPrecisionMs();
            if (deliverMs % timerPrecisionMs != 0) {
                deliverMs = deliverMs / timerPrecisionMs * timerPrecisionMs;
            }

            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TIMER_OUT_MS, deliverMs + "");
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
            msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
            msg.setTopic(TIMER_TOPIC);
            msg.setQueueId(0);
        } else if (null != msg.getProperty(MessageConst.PROPERTY_TIMER_DEL_UNIQKEY)) {
            return new PutMessageResult(PutMessageStatus.WHEEL_TIMER_MSG_ILLEGAL, null);
        }
        return null;
    }

    /**
     * Tells whether the metric range that belongs to {@code deliverMs} currently holds exactly
     * {@code expected} entries.
     */
    private boolean hasMetricSize(TimerMessageRocksDBStore store, long deliverMs, int expected) {
        long lower = deliverMs / precisionMs * precisionMs % store.getMetricsIntervalMs();
        return store.getTimerMessageKVStore().getMetricSize(lower, lower + precisionMs) == expected;
    }

    @Test
    public void testDoNormalTimer() throws Exception {
        // Skipped when MixAll reports a Unix or a Windows host.
        Assume.assumeFalse(MixAll.isUnix());
        Assume.assumeFalse(MixAll.isWindows());
        String topic = "TimerRocksdbTest_testPutTimerMessage";

        final TimerMessageRocksDBStore timerMessageStore = createTimerMessageRocksDBStore(null);
        timerMessageStore.load();
        timerMessageStore.start();
        try {
            final long commitOffset = timerMessageStore.getCommitOffset();
            long curr = System.currentTimeMillis() / precisionMs * precisionMs;
            long delayMs = curr + 3000;
            storeConfig.setTimerStopDequeue(true);

            // Even topics use a relative delay, odd topics an absolute delivery time.
            for (int i = 0; i < 10; i++) {
                for (int j = 0; j < 5; j++) {
                    MessageExtBrokerInner inner = buildMessage((i % 2 == 0) ? 3000 : delayMs, topic + i, i % 2 == 0);
                    transformTimerMessage(timerMessageStore, inner);
                    PutMessageResult putMessageResult = messageStore.putMessage(inner);
                    assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus());
                }
            }

            // Wait until the messages have been written to the timer store but the slot (delayMs) hasn't expired.
            await().atMost(2000, TimeUnit.MILLISECONDS).until(new Callable<Boolean>() {
                @Override
                public Boolean call() {
                    return timerMessageStore.getCommitOffset() - commitOffset == 10 * 5;
                }
            });

            for (int i = 0; i < 10; i++) {
                Assert.assertEquals(5, timerMessageStore.getTimerMetrics().getTimingCount(topic + i));
            }
            storeConfig.setTimerStopDequeue(false);
            for (int i = 0; i < 10; i++) {
                for (int j = 0; j < 5; j++) {
                    ByteBuffer msgBuff = getOneMessage(topic + i, 0, j, 4000);
                    assertNotNull(msgBuff);
                    MessageExt msgExt = MessageDecoder.decode(msgBuff);
                    assertNotNull(msgExt);
                    assertEquals(topic + i, msgExt.getTopic());
                }
            }
            for (int i = 0; i < 10; i++) {
                Assert.assertEquals(0, timerMessageStore.getTimerMetrics().getTimingCount(topic + i));
            }
        } finally {
            // Stop the timer store even when an assertion fails, before destroy() tears down the message store.
            timerMessageStore.shutdown();
        }
    }

    @Test
    public void testPutExpiredTimerMessage() throws Exception {
        // Skip on Mac and Windows to make CI pass
        Assume.assumeFalse(MixAll.isMac());
        Assume.assumeFalse(MixAll.isWindows());

        String topic = "TimerRocksdbTest_testPutExpiredTimerMessage";

        TimerMessageRocksDBStore timerMessageStore = createTimerMessageRocksDBStore(null);
        timerMessageStore.load();
        timerMessageStore.start();
        try {
            long delayMs = System.currentTimeMillis() - 2L * precisionMs;
            for (int i = 0; i < 10; i++) {
                MessageExtBrokerInner inner = buildMessage(delayMs, topic, false);
                transformTimerMessage(timerMessageStore, inner);
                PutMessageResult putMessageResult = messageStore.putMessage(inner);
                assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus());
            }

            // Already expired messages must be readable from the real topic almost immediately.
            long curr = System.currentTimeMillis();
            for (int i = 0; i < 10; i++) {
                ByteBuffer msgBuff = getOneMessage(topic, 0, i, 1000);
                assertNotNull(msgBuff);
                assertTrue(System.currentTimeMillis() - curr < 200);
            }
        } finally {
            timerMessageStore.shutdown();
        }
    }

    @Test
    public void testDeleteTimerMessage() throws Exception {
        String topic = "TimerRocksdbTest_testDeleteTimerMessage";

        TimerMessageRocksDBStore timerMessageStore = createTimerMessageRocksDBStore(null);
        timerMessageStore.load();
        timerMessageStore.start();
        try {
            long delayMs = System.currentTimeMillis() + 3000;
            String uniqKey = null;
            for (int i = 0; i < 5; i++) {
                MessageExtBrokerInner inner = buildMessage(delayMs, topic, false);
                transformTimerMessage(timerMessageStore, inner);
                // Remember the unique id of the first message; it is the one deleted below.
                if (null == uniqKey) {
                    uniqKey = MessageClientIDSetter.getUniqID(inner);
                }
                assertEquals(PutMessageStatus.PUT_OK, messageStore.putMessage(inner).getPutMessageStatus());
            }

            MessageExtBrokerInner delMsg = buildMessage(delayMs, topic, false);
            transformTimerMessage(timerMessageStore, delMsg);
            MessageAccessor.putProperty(delMsg, MessageConst.PROPERTY_TIMER_DEL_UNIQKEY, TimerMessageStore.buildDeleteKey(topic, uniqKey));
            delMsg.setPropertiesString(MessageDecoder.messageProperties2String(delMsg.getProperties()));
            assertEquals(PutMessageStatus.PUT_OK, messageStore.putMessage(delMsg).getPutMessageStatus());

            // The first one should have been deleted, so only four messages are delivered.
            for (int i = 0; i < 4; i++) {
                ByteBuffer msgBuff1 = getOneMessage(topic, 0, i, 4000);
                assertNotNull(msgBuff1);
                MessageExt msgExt = MessageDecoder.decode(msgBuff1);
                assertNotNull(msgExt);
            }
            // The last one should be null.
            assertNull(getOneMessage(topic, 0, 4, 500));
        } finally {
            timerMessageStore.shutdown();
        }
    }

    @Test
    public void testPutDeleteTimerMessage() throws Exception {
        String topic = "TimerRocksdbTest_testPutDeleteTimerMessage";

        final TimerMessageRocksDBStore timerMessageStore = createTimerMessageRocksDBStore(null);
        timerMessageStore.load();
        timerMessageStore.start();
        try {
            long curr = System.currentTimeMillis() / precisionMs * precisionMs;
            final long delayMs = curr + 1000;
            for (int i = 0; i < 5; i++) {
                MessageExtBrokerInner inner = buildMessage(delayMs, topic, false);
                transformTimerMessage(timerMessageStore, inner);
                assertEquals(PutMessageStatus.PUT_OK, messageStore.putMessage(inner).getPutMessageStatus());
            }

            // A delete message whose key matches nothing must not remove any of the five messages.
            MessageExtBrokerInner delMsg = buildMessage(delayMs, topic, false);
            transformTimerMessage(timerMessageStore, delMsg);
            MessageAccessor.putProperty(delMsg, TimerMessageStore.TIMER_DELETE_UNIQUE_KEY, "XXX+1");
            delMsg.setPropertiesString(MessageDecoder.messageProperties2String(delMsg.getProperties()));
            assertEquals(PutMessageStatus.PUT_OK, messageStore.putMessage(delMsg).getPutMessageStatus());

            for (int i = 0; i < 5; i++) {
                ByteBuffer msgBuff = getOneMessage(topic, 0, i, 2000);
                assertNotNull(msgBuff);
            }

            // Test put expired delete msg.
            MessageExtBrokerInner expiredInner = buildMessage(System.currentTimeMillis() - 100, topic, false);
            MessageAccessor.putProperty(expiredInner, TimerMessageStore.TIMER_DELETE_UNIQUE_KEY, "XXX");
            PutMessageResult putMessageResult = transformTimerMessage(timerMessageStore, expiredInner);
            // transformTimerMessage returns null for accepted messages; fail clearly instead of with an NPE.
            assertNotNull(putMessageResult);
            assertEquals(PutMessageStatus.WHEEL_TIMER_MSG_ILLEGAL, putMessageResult.getPutMessageStatus());
        } finally {
            timerMessageStore.shutdown();
        }
    }

    @Test
    public void testExtractUniqueKey() {
        String deleteKey = TimerMessageStore.buildDeleteKey("topic", "123456");
        assertEquals("123456", TimerMessageStore.extractUniqueKey(deleteKey));
    }

    @Test
    public void testGetTimerMetrics() {
        String topic = "TimerRocksdbTest_testGetTimerMetrics";
        TimerMessageRocksDBStore timerMessageStore = createTimerMessageRocksDBStore(null);
        timerMessageStore.load();
        timerMessageStore.start();
        try {
            storeConfig.setTimerStopDequeue(true);
            long delayMs = System.currentTimeMillis() + 3000;

            for (int i = 0; i < 10; i++) {
                MessageExtBrokerInner inner = buildMessage(delayMs, topic, false);
                transformTimerMessage(timerMessageStore, inner);
                assertEquals(PutMessageStatus.PUT_OK, messageStore.putMessage(inner).getPutMessageStatus());
            }
            // While dequeue is stopped all ten messages are counted in the metric range.
            await().atMost(3000, TimeUnit.MILLISECONDS).until(() -> hasMetricSize(timerMessageStore, delayMs, 10));

            // Once dequeue resumes the range is expected to drain to zero.
            storeConfig.setTimerStopDequeue(false);
            await().atMost(3000, TimeUnit.MILLISECONDS).until(() -> hasMetricSize(timerMessageStore, delayMs, 0));
        } finally {
            timerMessageStore.shutdown();
        }
    }

    /**
     * No-op listener; the message store under test only needs a listener instance to be constructed.
     */
    private static class MyMessageArrivingListener implements MessageArrivingListener {
        @Override
        public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime,
            byte[] filterBitMap, Map<String, String> properties) {
        }
    }

    /**
     * Shuts down and destroys the message store, then deletes every directory created by the test.
     */
    @After
    public void destroy() {
        if (null != messageStore) {
            messageStore.shutdown();
            messageStore.destroy();
        }
        for (String baseDir : baseDirs) {
            StoreTestUtils.deleteFile(baseDir);
        }
    }
}