Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #9141] [RIP-75] Supports timer message on RocksDB #9142

Open
wants to merge 62 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 55 commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
d692c56
init: Initialization preparations
Jan 13, 2025
382d846
init: Initialization preparations
Jan 13, 2025
c5905e4
init: Initialization preparations
Jan 13, 2025
69305b1
init: prepare for metric
Jan 13, 2025
cbfdfdc
init: Initialization preparations
Jan 14, 2025
0642c1d
init: Initialization preparations service
Jan 14, 2025
2fdbd7e
init: prepare for metric
Jan 14, 2025
efd3071
init: Initialization preparations service
Jan 15, 2025
1e41aa4
feat: Improve the message flow service
Jan 15, 2025
b0d6044
feat: Adaptation logic
Jan 15, 2025
8bb252c
feat: Adaptation logic
Jan 15, 2025
fc4c7a5
feat: Adaptation logic
Jan 15, 2025
6bc279a
init: prepare for metric
Jan 15, 2025
37bb349
init: prepare for metric
Jan 15, 2025
43df4ea
feat: Adaptation logic
Jan 16, 2025
ecff650
feat: Adaptation logic
Jan 16, 2025
d49f8cf
feat: Adaptation logic
Jan 16, 2025
7a858d7
feat: Adaptation logic
Jan 16, 2025
9a37312
feat: Adaptation logic
Jan 16, 2025
4b08bf9
feat: Adaptation logic
Jan 16, 2025
f29a325
feat: Adaptation logic
Jan 16, 2025
5588a3b
feat: Adaptation logic
Jan 17, 2025
4779614
feat: Adaptation logic
Jan 17, 2025
2d066ea
feat: Adaptation logic
Jan 17, 2025
20ce52a
feat: Adaptation logic
Jan 17, 2025
57bd15d
feat: Adaptation logic
Jan 17, 2025
29341ef
Merge branch 'apache:develop' into rocksdb_timer
3424672656 Jan 17, 2025
29e78c3
fix test
Jan 17, 2025
b2a5edb
fix test
Jan 17, 2025
4700b83
fix test
Jan 17, 2025
f458c23
fix test
Jan 17, 2025
92f210a
fix test
Jan 17, 2025
8f6efef
fix test
Jan 17, 2025
4823690
fix test
Jan 17, 2025
c40e210
fix test
Jan 17, 2025
af0f4af
Optimize some redundant code and logic
Jan 20, 2025
1b5c4b0
Move the new class to TimerMessageStore
Jan 20, 2025
42d432a
Move the new class to TimerMessageStore
Jan 20, 2025
3c4e922
Move the new class to TimerMessageStore
Jan 20, 2025
1a1252e
Move the new class to TimerMessageStore
Jan 20, 2025
27e0d32
Move the new class to TimerMessageStore
Jan 20, 2025
28acd4b
Move the new class to TimerMessageStore
Jan 20, 2025
0d445e8
fix fail test
Jan 20, 2025
4842454
fix fail test
Jan 20, 2025
e29c0c8
fix fail test
Jan 20, 2025
f795ba0
fix fail test
Jan 20, 2025
5a675f9
fix fail test
Jan 20, 2025
2741799
fix fail test
Jan 20, 2025
e424e1f
fix fail test
Jan 20, 2025
ca49bf6
fix fail test
Jan 20, 2025
07e720b
fix fail test
Jan 20, 2025
8bf70a4
fix fail test
Jan 21, 2025
d22fd7f
fix fail test
Jan 21, 2025
af7753b
fix fail test
Jan 21, 2025
4d1af12
fix fail test
Jan 21, 2025
4803ec7
Optimized site maintenance
Jan 22, 2025
5fe5a6c
init metric calculate
Jan 22, 2025
8e155e3
init metric calculate
Jan 22, 2025
593307f
fix test
Jan 22, 2025
64d3f7a
Implement millisecond level indicator statistics
Jan 23, 2025
27fece9
Adaptive correlation index
Jan 23, 2025
799153b
Add metrics to test accurately to the millisecond level
Jan 23, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public class MessageConst {
public static final String PROPERTY_TIMER_ROLL_TIMES = "TIMER_ROLL_TIMES";
public static final String PROPERTY_TIMER_OUT_MS = "TIMER_OUT_MS";
public static final String PROPERTY_TIMER_DEL_UNIQKEY = "TIMER_DEL_UNIQKEY";
public static final String PROPERTY_TIMER_DEL_FLAG = "TIMER_DEL_FLAG";
3424672656 marked this conversation as resolved.
Show resolved Hide resolved
public static final String PROPERTY_TIMER_DELAY_LEVEL = "TIMER_DELAY_LEVEL";
public static final String PROPERTY_TIMER_DELAY_MS = "TIMER_DELAY_MS";
public static final String PROPERTY_CRC32 = "__CRC32#";
Expand Down Expand Up @@ -151,6 +152,7 @@ public class MessageConst {
STRING_HASH_SET.add(PROPERTY_TIMER_ROLL_TIMES);
STRING_HASH_SET.add(PROPERTY_TIMER_OUT_MS);
STRING_HASH_SET.add(PROPERTY_TIMER_DEL_UNIQKEY);
STRING_HASH_SET.add(PROPERTY_TIMER_DEL_FLAG);
STRING_HASH_SET.add(PROPERTY_TIMER_DELAY_LEVEL);
STRING_HASH_SET.add(PROPERTY_BORN_HOST);
STRING_HASH_SET.add(PROPERTY_BORN_TIMESTAMP);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,17 @@ public void setRocksdbCompressionType(String compressionType) {
**/
private boolean useABSLock = false;

/**
* Maximum number of messages to be read each time
* -1 : read all messages
*/
private int readCountTimerOnRocksDB = -1;

/**
* When enabled, the scheduled message is using rocksdb
*/
private boolean enableTimerMessageOnRocksDB = false;
3424672656 marked this conversation as resolved.
Show resolved Hide resolved

public boolean isRocksdbCQDoubleWriteEnable() {
return rocksdbCQDoubleWriteEnable;
}
Expand Down Expand Up @@ -1714,6 +1725,10 @@ public boolean isTimerStopDequeue() {
return timerStopDequeue;
}

public void setTimerStopDequeue(boolean timerStopDequeue) {
this.timerStopDequeue = timerStopDequeue;
}

public int getTimerMetricSmallThreshold() {
return timerMetricSmallThreshold;
}
Expand Down Expand Up @@ -1950,4 +1965,20 @@ public void setUseABSLock(boolean useABSLock) {
public boolean getUseABSLock() {
return useABSLock;
}

public void setReadCountTimerOnRocksDB(int readCountTimerOnRocksDB) {
this.readCountTimerOnRocksDB = readCountTimerOnRocksDB;
}

public int getReadCountTimerOnRocksDB() {
return readCountTimerOnRocksDB;
}

public void setEnableTimerMessageOnRocksDB(boolean enableTimerMessageOnRocksDB) {
this.enableTimerMessageOnRocksDB = enableTimerMessageOnRocksDB;
}

public boolean getEnableTimerMessageOnRocksDB() {
return enableTimerMessageOnRocksDB;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,108 @@ public static ColumnFamilyOptions createPopCFOptions() {
.setOptimizeFiltersForHits(true);
}

public static ColumnFamilyOptions createTimerCFOptions() {
BlockBasedTableConfig blockBasedTableConfig = new BlockBasedTableConfig()
.setFormatVersion(5)
.setIndexType(IndexType.kBinarySearch)
.setDataBlockIndexType(DataBlockIndexType.kDataBlockBinaryAndHash)
.setDataBlockHashTableUtilRatio(0.75)
.setBlockSize(128 * SizeUnit.KB)
.setMetadataBlockSize(4 * SizeUnit.KB)
.setFilterPolicy(new BloomFilter(16, false))
.setCacheIndexAndFilterBlocks(false)
.setCacheIndexAndFilterBlocksWithHighPriority(true)
.setPinL0FilterAndIndexBlocksInCache(false)
.setPinTopLevelIndexAndFilter(true)
.setBlockCache(new LRUCache(1024 * SizeUnit.MB, 8, false))
.setWholeKeyFiltering(true);

CompactionOptionsUniversal compactionOption = new CompactionOptionsUniversal()
.setSizeRatio(100)
.setMaxSizeAmplificationPercent(25)
.setAllowTrivialMove(true)
.setMinMergeWidth(2)
.setMaxMergeWidth(Integer.MAX_VALUE)
.setStopStyle(CompactionStopStyle.CompactionStopStyleTotalSize)
.setCompressionSizePercent(-1);

//noinspection resource
return new ColumnFamilyOptions()
.setMaxWriteBufferNumber(6)
.setWriteBufferSize(128 * SizeUnit.MB)
.setMinWriteBufferNumberToMerge(1)
.setTableFormatConfig(blockBasedTableConfig)
.setMemTableConfig(new SkipListMemTableConfig())
.setCompressionType(CompressionType.NO_COMPRESSION)
.setBottommostCompressionType(CompressionType.NO_COMPRESSION)
.setNumLevels(7)
.setCompactionPriority(CompactionPriority.MinOverlappingRatio)
.setCompactionStyle(CompactionStyle.UNIVERSAL)
.setCompactionOptionsUniversal(compactionOption)
.setMaxCompactionBytes(100 * SizeUnit.GB)
.setSoftPendingCompactionBytesLimit(100 * SizeUnit.GB)
.setHardPendingCompactionBytesLimit(256 * SizeUnit.GB)
.setLevel0FileNumCompactionTrigger(2)
.setLevel0SlowdownWritesTrigger(8)
.setLevel0StopWritesTrigger(10)
.setTargetFileSizeBase(256 * SizeUnit.MB)
.setTargetFileSizeMultiplier(2)
.setMergeOperator(new StringAppendOperator())
.setReportBgIoStats(true)
.setOptimizeFiltersForHits(true);
}

public static ColumnFamilyOptions createTimerMetricCFOptions() {
BlockBasedTableConfig blockBasedTableConfig = new BlockBasedTableConfig()
.setFormatVersion(5)
.setIndexType(IndexType.kBinarySearch)
.setDataBlockIndexType(DataBlockIndexType.kDataBlockBinaryAndHash)
.setDataBlockHashTableUtilRatio(0.75)
.setBlockSize(4 * SizeUnit.KB)
.setMetadataBlockSize(4 * SizeUnit.KB)
.setFilterPolicy(new BloomFilter(16, false))
.setCacheIndexAndFilterBlocks(false)
.setCacheIndexAndFilterBlocksWithHighPriority(true)
.setPinL0FilterAndIndexBlocksInCache(false)
.setPinTopLevelIndexAndFilter(true)
.setBlockCache(new LRUCache(64 * SizeUnit.MB, 8, false))
.setWholeKeyFiltering(true);

CompactionOptionsUniversal compactionOption = new CompactionOptionsUniversal()
.setSizeRatio(100)
.setMaxSizeAmplificationPercent(25)
.setAllowTrivialMove(true)
.setMinMergeWidth(2)
.setMaxMergeWidth(Integer.MAX_VALUE)
.setStopStyle(CompactionStopStyle.CompactionStopStyleTotalSize)
.setCompressionSizePercent(-1);

//noinspection resource
return new ColumnFamilyOptions()
.setMaxWriteBufferNumber(4)
.setWriteBufferSize(64 * SizeUnit.MB)
.setMinWriteBufferNumberToMerge(1)
.setTableFormatConfig(blockBasedTableConfig)
.setMemTableConfig(new SkipListMemTableConfig())
.setCompressionType(CompressionType.NO_COMPRESSION)
.setBottommostCompressionType(CompressionType.NO_COMPRESSION)
.setNumLevels(7)
.setCompactionPriority(CompactionPriority.MinOverlappingRatio)
.setCompactionStyle(CompactionStyle.UNIVERSAL)
.setCompactionOptionsUniversal(compactionOption)
.setMaxCompactionBytes(100 * SizeUnit.GB)
.setSoftPendingCompactionBytesLimit(100 * SizeUnit.GB)
.setHardPendingCompactionBytesLimit(256 * SizeUnit.GB)
.setLevel0FileNumCompactionTrigger(2)
.setLevel0SlowdownWritesTrigger(8)
.setLevel0StopWritesTrigger(10)
.setTargetFileSizeBase(128 * SizeUnit.MB)
.setTargetFileSizeMultiplier(2)
.setMergeOperator(new StringAppendOperator())
.setReportBgIoStats(true)
.setOptimizeFiltersForHits(true);
}

/**
* Create a rocksdb db options, the user must take care to close it after closing db.
* @return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.apache.rocketmq.store.queue.CqUnit;
import org.apache.rocketmq.store.queue.ReferredIterator;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.apache.rocketmq.store.timer.rocksdb.TimerMessageRocksDBStore;
import org.apache.rocketmq.store.util.PerfCounter;

public class TimerMessageStore {
Expand Down Expand Up @@ -158,6 +159,7 @@ public class TimerMessageStore {
protected volatile boolean shouldRunningDequeue;
private final BrokerStatsManager brokerStatsManager;
private Function<MessageExtBrokerInner, PutMessageResult> escapeBridgeHook;
private TimerMessageRocksDBStore timerRocksDBStore;

public TimerMessageStore(final MessageStore messageStore, final MessageStoreConfig storeConfig,
TimerCheckpoint timerCheckpoint, TimerMetrics timerMetrics,
Expand Down Expand Up @@ -212,6 +214,7 @@ protected ByteBuffer initialValue() {
dequeuePutQueue = new LinkedBlockingDeque<>(DEFAULT_CAPACITY);
}
this.brokerStatsManager = brokerStatsManager;
this.timerRocksDBStore = new TimerMessageRocksDBStore(messageStore, storeConfig, timerMetrics, brokerStatsManager);
}

public void initService() {
Expand All @@ -238,6 +241,7 @@ public boolean load() {
this.initService();
boolean load = timerLog.load();
load = load && this.timerMetrics.load();
load = load && timerRocksDBStore.load();
recover();
calcTimerDistribution();
return load;
Expand Down Expand Up @@ -464,6 +468,7 @@ public static boolean isMagicOK(int magic) {
}

public void start() {
timerRocksDBStore.start();
this.shouldStartTime = storeConfig.getDisappearTimeAfterStart() + System.currentTimeMillis();
maybeMoveWriteTime();
enqueueGetService.start();
Expand Down Expand Up @@ -552,6 +557,7 @@ public void shutdown() {
timerWheel.shutdown(false);

this.scheduler.shutdown();
this.timerRocksDBStore.shutdown();
UtilAll.cleanBuffer(this.bufferLocal.get());
this.bufferLocal.remove();
}
Expand Down Expand Up @@ -653,7 +659,7 @@ public void holdMomentForUnknownError() {
}

public boolean enqueue(int queueId) {
if (storeConfig.isTimerStopEnqueue()) {
if (storeConfig.isTimerStopEnqueue() || storeConfig.getEnableTimerMessageOnRocksDB()) {
return false;
}
if (!isRunningEnqueue()) {
Expand Down Expand Up @@ -1735,6 +1741,9 @@ public long getCongestNum(long deliverTimeMs) {
}

public boolean isReject(long deliverTimeMs) {
if (storeConfig.getEnableTimerMessageOnRocksDB()) {
return false;
}
long congestNum = timerWheel.getNum(deliverTimeMs);
if (congestNum <= storeConfig.getTimerCongestNumEachSlot()) {
return false;
Expand Down Expand Up @@ -1915,4 +1924,12 @@ public TimerCheckpoint getTimerCheckpoint() {
public static String buildDeleteKey(String realTopic, String uniqueKey) {
return realTopic + "+" + uniqueKey;
}

public static String extractUniqueKey(String deleteKey) {
int separatorIndex = deleteKey.indexOf('+');
if (separatorIndex == -1) {
throw new IllegalArgumentException("Invalid deleteKey format");
}
return deleteKey.substring(separatorIndex + 1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ public long updateDistPair(int period, int value) {
return distPair.getCount().addAndGet(value);
}

public void resetDistPair(int period, int value) {
Metric distPair = getDistPair(period);
distPair.getCount().set(value);
}

public long addAndGet(MessageExt msg, int value) {
String topic = msg.getProperty(MessageConst.PROPERTY_REAL_TOPIC);
Metric pair = getTopicPair(topic);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.store.timer.rocksdb;

import java.util.List;

public interface TimerMessageKVStore {

/**
* Start the timer message kv store.
*/
boolean start();

/**
* Shutdown the timer message kv store.
*/
boolean shutdown();

/**
* Get the file path of the timer message kv store.
*/
String getFilePath();

/**
* Write the timer message records to the timer message kv store.
* @param columnFamily the column family of the timer message kv store.
* @param consumerRecordList the list of timer message records to be written.
* @param offset the cq offset of the timer message records to be written.
* @param timestamp the key of the timer message metric column family.
*/
void writeAssignRecords(byte[] columnFamily, List<TimerMessageRecord> consumerRecordList, long offset, int timestamp);

/**
* Delete the timer message records from the timer message kv store.
* @param columnFamily the column family of the timer message kv store.
* @param consumerRecordList the list of timer message records to be deleted.
* @param timestamp the key of the timer message metric column family.
*/
void deleteAssignRecords(byte[] columnFamily, List<TimerMessageRecord> consumerRecordList, int timestamp, long offset);

/**
* Scan the timer message records from the timer message kv store.
* @param columnFamily the column family of the timer message kv store.
* @param lowerTime the lower time of the timer message records to be scanned.
* @param upperTime the upper time of the timer message records to be scanned.
* @return the list of timer message records.
*/
3424672656 marked this conversation as resolved.
Show resolved Hide resolved
List<TimerMessageRecord> scanRecords(byte[] columnFamily, long lowerTime, long upperTime, long timestamp);

/**
* Get the commit offset of the timer message kv store from cq.
* @return the commit offset of the timer message kv store.
*/
long getCommitOffset();

/**
* Get the metric size of the timer message kv store.
* @param lowerTime the lower time of the timer message records to be scanned.
* @param upperTime the upper time of the timer message records to be scanned.
* @return sum.
*/
int getMetricSize(int lowerTime, int upperTime);

/**
* Get the checkpoint of the timer message kv store.
* @param columnFamily the column family of the timer message kv store.
* @return the checkpoint of the timer message kv store.
*/
long getCheckpoint(byte[] columnFamily);
}
Loading
Loading