Skip to content

Commit

Permalink
ARTEMIS-5093 support configurable onMessage timeout w/closing consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
jbertram committed Oct 10, 2024
1 parent 6186805 commit 2b6fbd1
Show file tree
Hide file tree
Showing 12 changed files with 101 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class ServerLocatorConfig {
public int initialConnectAttempts = ActiveMQClient.INITIAL_CONNECT_ATTEMPTS;
public int failoverAttempts = ActiveMQClient.DEFAULT_FAILOVER_ATTEMPTS;
public int initialMessagePacketSize = ActiveMQClient.DEFAULT_INITIAL_MESSAGE_PACKET_SIZE;
public int onMessageCloseTimeout = ActiveMQClient.DEFAULT_ONMESSAGE_CLOSE_TIMEOUT;
public boolean cacheLargeMessagesClient = ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
public int compressionLevel = ActiveMQClient.DEFAULT_COMPRESSION_LEVEL;
public boolean compressLargeMessage = ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ public final class ActiveMQClient {

public static final int DEFAULT_INITIAL_MESSAGE_PACKET_SIZE = 1500;

public static final int DEFAULT_ONMESSAGE_CLOSE_TIMEOUT = 10_000;

public static final boolean DEFAULT_XA = false;

public static final boolean DEFAULT_HA = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -744,6 +744,25 @@ ClientSessionFactory createSessionFactory(TransportConfiguration transportConfig
*/
ServerLocator setInitialMessagePacketSize(int size);

/**
* Returns the timeout for onMessage completion when closing ClientConsumers created through this factory.
* <p>
* Value is in milliseconds, default value is {@link ActiveMQClient#DEFAULT_ONMESSAGE_CLOSE_TIMEOUT}.
*
* @return the timeout for onMessage completion when closing ClientConsumers created through this factory
*/
int getOnMessageCloseTimeout();

/**
* Sets the timeout for onMessage completion when closing ClientConsumers created through this factory.
* <p>
* Value must be greater than 0.
*
* @param onMessageCloseTimeout how long to wait for the ClientConsumer's MessageHandler's onMessage method to finish before closing or stopping the ClientConsumer.
* @return this ServerLocator
*/
ServerLocator setOnMessageCloseTimeout(int onMessageCloseTimeout);

/**
* Adds an interceptor which will be executed <em>after packets are received from the server</em>.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ public interface ActiveMQClientLogger {
@LogMessage(id = 212001, value = "Error on clearing messages", level = LogMessage.Level.WARN)
void errorClearingMessages(Throwable e);

@LogMessage(id = 212002, value = "Timed out waiting for handler to complete processing", level = LogMessage.Level.WARN)
void timeOutWaitingForProcessing();
@LogMessage(id = 212002, value = "Timed out after waiting {}ms for handler to complete processing", level = LogMessage.Level.WARN)
void timeOutWaitingForProcessing(long duration);

@LogMessage(id = 212003, value = "Unable to close session", level = LogMessage.Level.WARN)
void unableToCloseSession(Exception e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
private final ClassLoader contextClassLoader;
private volatile boolean manualFlowManagement;

private final int onMessageCloseTimeout;

public ClientConsumerImpl(final ClientSessionInternal session,
final ConsumerContext consumerContext,
final SimpleString queueName,
Expand All @@ -150,7 +152,8 @@ public ClientConsumerImpl(final ClientSessionInternal session,
final Executor flowControlExecutor,
final SessionContext sessionContext,
final ClientSession.QueueQuery queueInfo,
final ClassLoader contextClassLoader) {
final ClassLoader contextClassLoader,
final int onMessageCloseTimeout) {
this.consumerContext = consumerContext;

this.queueName = queueName;
Expand Down Expand Up @@ -181,6 +184,8 @@ public ClientConsumerImpl(final ClientSessionInternal session,

this.flowControlExecutor = flowControlExecutor;

this.onMessageCloseTimeout = onMessageCloseTimeout;

if (logger.isTraceEnabled()) {
logger.trace("{}:: being created at", this, new Exception("trace"));
}
Expand Down Expand Up @@ -911,10 +916,10 @@ private void waitForOnMessageToComplete(boolean waitForOnMessage) {

sessionExecutor.execute(future);

boolean ok = future.await(ClientConsumerImpl.CLOSE_TIMEOUT_MILLISECONDS);
boolean ok = future.await(onMessageCloseTimeout);

if (!ok) {
ActiveMQClientLogger.LOGGER.timeOutWaitingForProcessing();
ActiveMQClientLogger.LOGGER.timeOutWaitingForProcessing(onMessageCloseTimeout);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -841,7 +841,7 @@ private ClientSession createSessionInternal(final String rawUsername,

SessionContext context = createSessionChannel(name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, clientID);

ClientSessionInternal session = new ClientSessionImpl(this, name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, serverLocator.isBlockOnAcknowledge(), serverLocator.isAutoGroup(), ackBatchSize, serverLocator.getConsumerWindowSize(), serverLocator.getConsumerMaxRate(), serverLocator.getConfirmationWindowSize(), serverLocator.getProducerWindowSize(), serverLocator.getProducerMaxRate(), serverLocator.isBlockOnNonDurableSend(), serverLocator.isBlockOnDurableSend(), serverLocator.isCacheLargeMessagesClient(), serverLocator.getMinLargeMessageSize(), serverLocator.isCompressLargeMessage(), serverLocator.getCompressionLevel(), serverLocator.getInitialMessagePacketSize(), serverLocator.getGroupID(), context, orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor(), flowControlExecutor, orderedExecutorFactory.getExecutor());
ClientSessionInternal session = new ClientSessionImpl(this, name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, serverLocator.isBlockOnAcknowledge(), serverLocator.isAutoGroup(), ackBatchSize, serverLocator.getConsumerWindowSize(), serverLocator.getConsumerMaxRate(), serverLocator.getConfirmationWindowSize(), serverLocator.getProducerWindowSize(), serverLocator.getProducerMaxRate(), serverLocator.isBlockOnNonDurableSend(), serverLocator.isBlockOnDurableSend(), serverLocator.isCacheLargeMessagesClient(), serverLocator.getMinLargeMessageSize(), serverLocator.isCompressLargeMessage(), serverLocator.getCompressionLevel(), serverLocator.getInitialMessagePacketSize(), serverLocator.getGroupID(), serverLocator.getOnMessageCloseTimeout(), context, orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor(), flowControlExecutor, orderedExecutorFactory.getExecutor());

synchronized (sessions) {
if (closed || !clientProtocolManager.isAlive()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi

private final String groupID;

private volatile int onMessageCloseTimeout;

private volatile boolean inClose;

private volatile boolean mayAttemptToFailover = true;
Expand Down Expand Up @@ -189,6 +191,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
final int compressionLevel,
final int initialMessagePacketSize,
final String groupID,
final int onMessageCloseTimeout,
final SessionContext sessionContext,
final Executor executor,
final Executor confirmationExecutor,
Expand Down Expand Up @@ -246,6 +249,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi

this.groupID = groupID;

this.onMessageCloseTimeout = onMessageCloseTimeout;

producerCreditManager = new ClientProducerCreditManagerImpl(this, producerWindowSize);

this.sessionContext = sessionContext;
Expand Down Expand Up @@ -2012,7 +2017,7 @@ private ClientConsumer internalCreateConsumer(final SimpleString queueName,
final boolean browseOnly) throws ActiveMQException {
checkClosed();

ClientConsumerInternal consumer = sessionContext.createConsumer(queueName, filterString, priority, windowSize, maxRate, ackBatchSize, browseOnly, executor, flowControlExecutor);
ClientConsumerInternal consumer = sessionContext.createConsumer(queueName, filterString, priority, windowSize, maxRate, ackBatchSize, browseOnly, executor, flowControlExecutor, onMessageCloseTimeout);

addConsumer(consumer);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1298,6 +1298,18 @@ public ServerLocatorImpl setInitialMessagePacketSize(final int size) {
return this;
}

@Override
public int getOnMessageCloseTimeout() {
return config.onMessageCloseTimeout;
}

@Override
public ServerLocator setOnMessageCloseTimeout(int onMessageCloseTimeout) {
checkWrite();
config.onMessageCloseTimeout = onMessageCloseTimeout;
return this;
}

@Override
public ServerLocatorImpl setGroupID(final String groupID) {
checkWrite();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,8 @@ public ClientConsumerInternal createConsumer(SimpleString queueName,
int ackBatchSize,
boolean browseOnly,
Executor executor,
Executor flowControlExecutor) throws ActiveMQException {
Executor flowControlExecutor,
int onMessageCloseTimeout) throws ActiveMQException {
long consumerID = idGenerator.generateID();

ActiveMQConsumerContext consumerContext = new ActiveMQConsumerContext(consumerID);
Expand All @@ -420,7 +421,7 @@ public ClientConsumerInternal createConsumer(SimpleString queueName,
// The value we send is just a hint
final int consumerWindowSize = windowSize == ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE ? this.getDefaultConsumerWindowSize(queueInfo) : windowSize;

return new ClientConsumerImpl(session, consumerContext, queueName, filterString, priority, browseOnly, consumerWindowSize, calcWindowSize(consumerWindowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL());
return new ClientConsumerImpl(session, consumerContext, queueName, filterString, priority, browseOnly, consumerWindowSize, calcWindowSize(consumerWindowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL(), onMessageCloseTimeout);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,8 @@ public abstract ClientConsumerInternal createConsumer(SimpleString queueName,
int ackBatchSize,
boolean browseOnly,
Executor executor,
Executor flowControlExecutor) throws ActiveMQException;
Executor flowControlExecutor,
int onMessageCloseTimeout) throws ActiveMQException;

/**
* Performs a round trip to the server requesting what is the current tx timeout on the session
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ public ClientConsumerInternal createConsumer(SimpleString queueName,
int ackBatchSize,
boolean browseOnly,
Executor executor,
Executor flowControlExecutor) throws ActiveMQException {
Executor flowControlExecutor,
int onMessageCloseTimeout) throws ActiveMQException {
long consumerID = idGenerator.generateID();

ActiveMQConsumerContext consumerContext = new ActiveMQConsumerContext(consumerID);
Expand All @@ -96,7 +97,7 @@ public ClientConsumerInternal createConsumer(SimpleString queueName,
// could be overridden on the queue settings
// The value we send is just a hint

return new ClientConsumerImpl(session, consumerContext, queueName, filterString, priority, browseOnly, windowSize, calcWindowSize(windowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL());
return new ClientConsumerImpl(session, consumerContext, queueName, filterString, priority, browseOnly, windowSize, calcWindowSize(windowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL(), onMessageCloseTimeout);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,12 @@
*/
package org.apache.activemq.artemis.tests.integration.client;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
Expand All @@ -38,6 +34,11 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class MessageHandlerTest extends ActiveMQTestBase {

private ActiveMQServer server;
Expand Down Expand Up @@ -117,6 +118,42 @@ public void onMessage(final ClientMessage message) {
session.close();
}

@Test
public void testMessageHandlerCloseTimeout() throws Exception {
// create Netty acceptor so client can use new onMessageCloseTimeout URL parameter
server.getRemotingService().createAcceptor("netty", "tcp://127.0.0.1:61616").start();
final int timeout = 1000;
locator = ActiveMQClient.createServerLocator("tcp://127.0.0.1:61616?onMessageCloseTimeout=" + timeout);
sf = createSessionFactory(locator);
ClientSession session = addClientSession(sf.createSession(false, true, true));
session.createQueue(QueueConfiguration.of(QUEUE).setDurable(false));
ClientProducer producer = session.createProducer(QUEUE);
producer.send(createTextMessage(session, "m"));

ClientConsumer consumer = session.createConsumer(QUEUE, null, false);

session.start();

CountDownLatch latch = new CountDownLatch(1);
consumer.setMessageHandler(message -> {
latch.countDown();
// don't just Thread.sleep() here because it will be interrupted on ClientConsumer.close()
long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start < 2000) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// ignore
}
}
});
latch.await();
long start = System.currentTimeMillis();
consumer.close();
long end = System.currentTimeMillis();
assertTrue( (end - start >= timeout) && (end - start <= 2000), "Closing consumer took " + (end - start) + "ms");
}

@Test
public void testSetResetMessageHandler() throws Exception {
final ClientSession session = sf.createSession(false, true, true);
Expand Down

0 comments on commit 2b6fbd1

Please sign in to comment.