Skip to content

Commit

Permalink
[ISSUE apache#8979] Add configurable switch for timer message retry logic (apache#8980)
Browse files Browse the repository at this point in the history
  • Loading branch information
chi3316 authored Dec 5, 2024
1 parent 01a5123 commit d1fd7af
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public class MessageStoreConfig {
// When true, an unknown put error in the timer path causes the message to be skipped
// (with a warning) instead of being held and retried.
private boolean timerSkipUnknownError = false;
private boolean timerWarmEnable = false;
private boolean timerStopDequeue = false;
// When false (the default), the timer dequeue path gives up on a message after 3 retries.
// When true, it keeps retrying until the put stops asking for a retry or the service is stopped.
private boolean timerEnableRetryUntilSuccess = false;
private int timerCongestNumEachSlot = Integer.MAX_VALUE;

private int timerMetricSmallThreshold = 1000000;
Expand Down Expand Up @@ -1689,6 +1690,14 @@ public void setTimerSkipUnknownError(boolean timerSkipUnknownError) {
this.timerSkipUnknownError = timerSkipUnknownError;
}

/**
 * Reports whether timer messages are retried without an upper bound.
 *
 * <p>The timer dequeue path reads this flag: when it is {@code false}, a message whose
 * put keeps requesting a retry is abandoned after 3 retries; when it is {@code true},
 * the retry loop continues until the put succeeds, is rejected as non-retryable, or
 * the service is stopped.
 *
 * @return the current value of the {@code timerEnableRetryUntilSuccess} flag
 */
public boolean isTimerEnableRetryUntilSuccess() {
    return this.timerEnableRetryUntilSuccess;
}

/**
 * Sets whether timer messages are retried without an upper bound.
 *
 * @param timerEnableRetryUntilSuccess {@code true} to keep retrying a timer message put
 *        for as long as it requests a retry; {@code false} to give up after 3 retries
 */
public void setTimerEnableRetryUntilSuccess(boolean timerEnableRetryUntilSuccess) {
this.timerEnableRetryUntilSuccess = timerEnableRetryUntilSuccess;
}

/**
 * Returns the {@code timerWarmEnable} flag.
 *
 * @return the current value of the {@code timerWarmEnable} flag
 */
public boolean isTimerWarmEnable() {
    return this.timerWarmEnable;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1097,46 +1097,44 @@ public int doPut(MessageExtBrokerInner message, boolean roll) throws Exception {
putMessageResult = messageStore.putMessage(message);
}

int retryNum = 0;
while (retryNum < 3) {
if (null == putMessageResult || null == putMessageResult.getPutMessageStatus()) {
retryNum++;
} else {
switch (putMessageResult.getPutMessageStatus()) {
case PUT_OK:
if (brokerStatsManager != null) {
this.brokerStatsManager.incTopicPutNums(message.getTopic(), 1, 1);
if (putMessageResult.getAppendMessageResult() != null) {
this.brokerStatsManager.incTopicPutSize(message.getTopic(),
putMessageResult.getAppendMessageResult().getWroteBytes());
}
this.brokerStatsManager.incBrokerPutNums(message.getTopic(), 1);
if (putMessageResult != null && putMessageResult.getPutMessageStatus() != null) {
switch (putMessageResult.getPutMessageStatus()) {
case PUT_OK:
if (brokerStatsManager != null) {
brokerStatsManager.incTopicPutNums(message.getTopic(), 1, 1);
if (putMessageResult.getAppendMessageResult() != null) {
brokerStatsManager.incTopicPutSize(message.getTopic(), putMessageResult.getAppendMessageResult().getWroteBytes());
}
return PUT_OK;
case SERVICE_NOT_AVAILABLE:
return PUT_NEED_RETRY;
case MESSAGE_ILLEGAL:
case PROPERTIES_SIZE_EXCEEDED:
brokerStatsManager.incBrokerPutNums(message.getTopic(), 1);
}
return PUT_OK;

case MESSAGE_ILLEGAL:
case PROPERTIES_SIZE_EXCEEDED:
case WHEEL_TIMER_NOT_ENABLE:
case WHEEL_TIMER_MSG_ILLEGAL:
return PUT_NO_RETRY;

case SERVICE_NOT_AVAILABLE:
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case OS_PAGE_CACHE_BUSY:
case CREATE_MAPPED_FILE_FAILED:
case SLAVE_NOT_AVAILABLE:
return PUT_NEED_RETRY;

case UNKNOWN_ERROR:
default:
if (storeConfig.isTimerSkipUnknownError()) {
LOGGER.warn("Skipping message due to unknown error, msg: {}", message);
return PUT_NO_RETRY;
case CREATE_MAPPED_FILE_FAILED:
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case OS_PAGE_CACHE_BUSY:
case SLAVE_NOT_AVAILABLE:
case UNKNOWN_ERROR:
default:
retryNum++;
}
}
Thread.sleep(50);
if (escapeBridgeHook != null) {
putMessageResult = escapeBridgeHook.apply(message);
} else {
putMessageResult = messageStore.putMessage(message);
} else {
holdMomentForUnknownError();
return PUT_NEED_RETRY;
}
}
LOGGER.warn("Retrying to do put timer msg retryNum:{} putRes:{} msg:{}", retryNum, putMessageResult, message);
}
return PUT_NO_RETRY;
return PUT_NEED_RETRY;
}

public MessageExtBrokerInner convertMessage(MessageExt msgExt, boolean needRoll) {
Expand Down Expand Up @@ -1471,7 +1469,6 @@ protected boolean isState(int state) {
}

public class TimerDequeuePutMessageService extends AbstractStateService {

@Override
public String getServiceName() {
return getServiceThreadName() + this.getClass().getSimpleName();
Expand All @@ -1481,48 +1478,71 @@ public String getServiceName() {
public void run() {
setState(AbstractStateService.START);
TimerMessageStore.LOGGER.info(this.getServiceName() + " service start");

while (!this.isStopped() || dequeuePutQueue.size() != 0) {
try {
setState(AbstractStateService.WAITING);
TimerRequest tr = dequeuePutQueue.poll(10, TimeUnit.MILLISECONDS);
if (null == tr) {
continue;
}

setState(AbstractStateService.RUNNING);
boolean doRes = false;
boolean tmpDequeueChangeFlag = false;

try {
while (!isStopped() && !doRes) {
while (!isStopped()) {
if (!isRunningDequeue()) {
dequeueStatusChangeFlag = true;
tmpDequeueChangeFlag = true;
break;
}

try {
perfCounterTicks.startTick(DEQUEUE_PUT);

MessageExt msgExt = tr.getMsg();
DefaultStoreMetricsManager.incTimerDequeueCount(getRealTopic(msgExt));

if (tr.getEnqueueTime() == Long.MAX_VALUE) {
// never enqueue, mark it.
// Never enqueue, mark it.
MessageAccessor.putProperty(msgExt, TIMER_ENQUEUE_MS, String.valueOf(Long.MAX_VALUE));
}

addMetric(msgExt, -1);
MessageExtBrokerInner msg = convert(msgExt, tr.getEnqueueTime(), needRoll(tr.getMagic()));
doRes = PUT_NEED_RETRY != doPut(msg, needRoll(tr.getMagic()));
while (!doRes && !isStopped()) {
if (!isRunningDequeue()) {
dequeueStatusChangeFlag = true;
tmpDequeueChangeFlag = true;
break;

boolean processed = false;
int retryCount = 0;

while (!processed && !isStopped()) {
int result = doPut(msg, needRoll(tr.getMagic()));

if (result == PUT_OK) {
processed = true;
} else if (result == PUT_NO_RETRY) {
TimerMessageStore.LOGGER.warn("Skipping message due to unrecoverable error. Msg: {}", msg);
processed = true;
} else {
retryCount++;
// Without enabling TimerEnableRetryUntilSuccess, messages will retry up to 3 times before being discarded
if (!storeConfig.isTimerEnableRetryUntilSuccess() && retryCount >= 3) {
TimerMessageStore.LOGGER.error("Message processing failed after {} retries. Msg: {}", retryCount, msg);
processed = true;
} else {
Thread.sleep(500L * precisionMs / 1000);
TimerMessageStore.LOGGER.warn("Retrying to process message. Retry count: {}, Msg: {}", retryCount, msg);
}
}
doRes = PUT_NEED_RETRY != doPut(msg, needRoll(tr.getMagic()));
Thread.sleep(500L * precisionMs / 1000);
}

perfCounterTicks.endTick(DEQUEUE_PUT);
break;

} catch (Throwable t) {
LOGGER.info("Unknown error", t);
TimerMessageStore.LOGGER.info("Unknown error", t);
if (storeConfig.isTimerSkipUnknownError()) {
doRes = true;
break;
} else {
holdMomentForUnknownError();
}
Expand All @@ -1531,7 +1551,6 @@ public void run() {
} finally {
tr.idempotentRelease(!tmpDequeueChangeFlag);
}

} catch (Throwable e) {
TimerMessageStore.LOGGER.error("Error occurred in " + getServiceName(), e);
}
Expand Down
Loading

0 comments on commit d1fd7af

Please sign in to comment.