diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ServerLocatorConfig.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ServerLocatorConfig.java index dc3180f7a9b..cd57f984490 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ServerLocatorConfig.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ServerLocatorConfig.java @@ -47,6 +47,7 @@ public class ServerLocatorConfig { public int initialConnectAttempts = ActiveMQClient.INITIAL_CONNECT_ATTEMPTS; public int failoverAttempts = ActiveMQClient.DEFAULT_FAILOVER_ATTEMPTS; public int initialMessagePacketSize = ActiveMQClient.DEFAULT_INITIAL_MESSAGE_PACKET_SIZE; + public int onMessageCloseTimeout = ActiveMQClient.DEFAULT_ONMESSAGE_CLOSE_TIMEOUT; public boolean cacheLargeMessagesClient = ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT; public int compressionLevel = ActiveMQClient.DEFAULT_COMPRESSION_LEVEL; public boolean compressLargeMessage = ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java index 94feade8e7d..e5b5229f1a8 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java @@ -138,6 +138,8 @@ public final class ActiveMQClient { public static final int DEFAULT_INITIAL_MESSAGE_PACKET_SIZE = 1500; + public static final int DEFAULT_ONMESSAGE_CLOSE_TIMEOUT = 10_000; + public static final boolean DEFAULT_XA = false; public static final boolean DEFAULT_HA = false; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java index f94bf22577e..5da6fc7929a 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java @@ -744,6 +744,25 @@ ClientSessionFactory createSessionFactory(TransportConfiguration transportConfig */ ServerLocator setInitialMessagePacketSize(int size); + /** + * Returns the timeout for onMessage completion when closing ClientConsumers created through this factory. + *

+ * Value is in milliseconds, default value is {@link ActiveMQClient#DEFAULT_ONMESSAGE_CLOSE_TIMEOUT}. + * + * @return the timeout for onMessage completion when closing ClientConsumers created through this factory + */ + int getOnMessageCloseTimeout(); + + /** + * Sets the timeout for onMessage completion when closing ClientConsumers created through this factory. + *

+ * Value must be greater than 0. + * + * @param onMessageCloseTimeout how long to wait for the ClientConsumer's MessageHandler's onMessage method to finish before closing or stopping the ClientConsumer. + * @return this ServerLocator + */ + ServerLocator setOnMessageCloseTimeout(int onMessageCloseTimeout); + /** * Adds an interceptor which will be executed after packets are received from the server. * diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java index 027543cdf78..e617e552e76 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java @@ -36,8 +36,8 @@ public interface ActiveMQClientLogger { @LogMessage(id = 212001, value = "Error on clearing messages", level = LogMessage.Level.WARN) void errorClearingMessages(Throwable e); - @LogMessage(id = 212002, value = "Timed out waiting for handler to complete processing", level = LogMessage.Level.WARN) - void timeOutWaitingForProcessing(); + @LogMessage(id = 212002, value = "Timed out after waiting {}ms for handler to complete processing", level = LogMessage.Level.WARN) + void timeOutWaitingForProcessing(long duration); @LogMessage(id = 212003, value = "Unable to close session", level = LogMessage.Level.WARN) void unableToCloseSession(Exception e); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java index de5a183f631..52bae79d0ce 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java @@ -136,6 +136,8 @@ public final class ClientConsumerImpl implements ClientConsumerInternal { private final ClassLoader contextClassLoader; private volatile boolean manualFlowManagement; + private final int onMessageCloseTimeout; + public ClientConsumerImpl(final ClientSessionInternal session, final ConsumerContext consumerContext, final SimpleString queueName, @@ -150,7 +152,8 @@ public ClientConsumerImpl(final ClientSessionInternal session, final Executor flowControlExecutor, final SessionContext sessionContext, final ClientSession.QueueQuery queueInfo, - final ClassLoader contextClassLoader) { + final ClassLoader contextClassLoader, + final int onMessageCloseTimeout) { this.consumerContext = consumerContext; this.queueName = queueName; @@ -181,6 +184,8 @@ public ClientConsumerImpl(final ClientSessionInternal session, this.flowControlExecutor = flowControlExecutor; + this.onMessageCloseTimeout = onMessageCloseTimeout; + if (logger.isTraceEnabled()) { logger.trace("{}:: being created at", this, new Exception("trace")); } @@ -911,10 +916,10 @@ private void waitForOnMessageToComplete(boolean waitForOnMessage) { sessionExecutor.execute(future); - boolean ok = future.await(ClientConsumerImpl.CLOSE_TIMEOUT_MILLISECONDS); + boolean ok = future.await(onMessageCloseTimeout); if (!ok) { - ActiveMQClientLogger.LOGGER.timeOutWaitingForProcessing(); + ActiveMQClientLogger.LOGGER.timeOutWaitingForProcessing(onMessageCloseTimeout); } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java index 7298074a1de..1d45bc10809 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java @@ -841,7 +841,7 @@ private ClientSession createSessionInternal(final String rawUsername, SessionContext context = createSessionChannel(name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, clientID); - ClientSessionInternal session = new ClientSessionImpl(this, name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, serverLocator.isBlockOnAcknowledge(), serverLocator.isAutoGroup(), ackBatchSize, serverLocator.getConsumerWindowSize(), serverLocator.getConsumerMaxRate(), serverLocator.getConfirmationWindowSize(), serverLocator.getProducerWindowSize(), serverLocator.getProducerMaxRate(), serverLocator.isBlockOnNonDurableSend(), serverLocator.isBlockOnDurableSend(), serverLocator.isCacheLargeMessagesClient(), serverLocator.getMinLargeMessageSize(), serverLocator.isCompressLargeMessage(), serverLocator.getCompressionLevel(), serverLocator.getInitialMessagePacketSize(), serverLocator.getGroupID(), context, orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor(), flowControlExecutor, orderedExecutorFactory.getExecutor()); + ClientSessionInternal session = new ClientSessionImpl(this, name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, serverLocator.isBlockOnAcknowledge(), serverLocator.isAutoGroup(), ackBatchSize, serverLocator.getConsumerWindowSize(), serverLocator.getConsumerMaxRate(), serverLocator.getConfirmationWindowSize(), serverLocator.getProducerWindowSize(), serverLocator.getProducerMaxRate(), serverLocator.isBlockOnNonDurableSend(), serverLocator.isBlockOnDurableSend(), serverLocator.isCacheLargeMessagesClient(), serverLocator.getMinLargeMessageSize(), serverLocator.isCompressLargeMessage(), serverLocator.getCompressionLevel(), serverLocator.getInitialMessagePacketSize(), serverLocator.getGroupID(), serverLocator.getOnMessageCloseTimeout(), context, orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor(), flowControlExecutor, orderedExecutorFactory.getExecutor()); synchronized (sessions) { if (closed || !clientProtocolManager.isAlive()) { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java index 537fafd0af8..f140001f1c1 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java @@ -144,6 +144,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi private final String groupID; + private volatile int onMessageCloseTimeout; + private volatile boolean inClose; private volatile boolean mayAttemptToFailover = true; @@ -189,6 +191,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi final int compressionLevel, final int initialMessagePacketSize, final String groupID, + final int onMessageCloseTimeout, final SessionContext sessionContext, final Executor executor, final Executor confirmationExecutor, @@ -246,6 +249,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi this.groupID = groupID; + this.onMessageCloseTimeout = onMessageCloseTimeout; + producerCreditManager = new ClientProducerCreditManagerImpl(this, producerWindowSize); this.sessionContext = sessionContext; @@ -2012,7 +2017,7 @@ private ClientConsumer internalCreateConsumer(final SimpleString queueName, final boolean browseOnly) throws ActiveMQException { checkClosed(); - ClientConsumerInternal consumer = sessionContext.createConsumer(queueName, filterString, priority, windowSize, maxRate, ackBatchSize, browseOnly, executor, flowControlExecutor); + ClientConsumerInternal consumer = sessionContext.createConsumer(queueName, filterString, priority, windowSize, maxRate, ackBatchSize, browseOnly, executor, flowControlExecutor, onMessageCloseTimeout); addConsumer(consumer); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java index e235821bc2d..7e8418ca654 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java @@ -1298,6 +1298,18 @@ public ServerLocatorImpl setInitialMessagePacketSize(final int size) { return this; } + @Override + public int getOnMessageCloseTimeout() { + return config.onMessageCloseTimeout; + } + + @Override + public ServerLocator setOnMessageCloseTimeout(int onMessageCloseTimeout) { + checkWrite(); + config.onMessageCloseTimeout = onMessageCloseTimeout; + return this; + } + @Override public ServerLocatorImpl setGroupID(final String groupID) { checkWrite(); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java index 4170c2e7a9c..0ea23600cbf 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java @@ -400,7 +400,8 @@ public ClientConsumerInternal createConsumer(SimpleString queueName, int ackBatchSize, boolean browseOnly, Executor executor, - Executor flowControlExecutor) throws ActiveMQException { + Executor flowControlExecutor, + int onMessageCloseTimeout) throws ActiveMQException { long consumerID = idGenerator.generateID(); ActiveMQConsumerContext consumerContext = new ActiveMQConsumerContext(consumerID); @@ -420,7 +421,7 @@ public ClientConsumerInternal createConsumer(SimpleString queueName, // The value we send is just a hint final int consumerWindowSize = windowSize == ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE ? this.getDefaultConsumerWindowSize(queueInfo) : windowSize; - return new ClientConsumerImpl(session, consumerContext, queueName, filterString, priority, browseOnly, consumerWindowSize, calcWindowSize(consumerWindowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL()); + return new ClientConsumerImpl(session, consumerContext, queueName, filterString, priority, browseOnly, consumerWindowSize, calcWindowSize(consumerWindowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL(), onMessageCloseTimeout); } @Override diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java index 7c33fff2901..550ac75075a 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java @@ -340,7 +340,8 @@ public abstract ClientConsumerInternal createConsumer(SimpleString queueName, int ackBatchSize, boolean browseOnly, Executor executor, - Executor flowControlExecutor) throws ActiveMQException; + Executor flowControlExecutor, + int onMessageCloseTimeout) throws ActiveMQException; /** * Performs a round trip to the server requesting what is the current tx timeout on the session diff --git a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java index a0389be865f..eafa4345c2b 100644 --- a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java +++ b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java @@ -83,7 +83,8 @@ public ClientConsumerInternal createConsumer(SimpleString queueName, int ackBatchSize, boolean browseOnly, Executor executor, - Executor flowControlExecutor) throws ActiveMQException { + Executor flowControlExecutor, + int onMessageCloseTimeout) throws ActiveMQException { long consumerID = idGenerator.generateID(); ActiveMQConsumerContext consumerContext = new ActiveMQConsumerContext(consumerID); @@ -96,7 +97,7 @@ public ClientConsumerInternal createConsumer(SimpleString queueName, // could be overridden on the queue settings // The value we send is just a hint - return new ClientConsumerImpl(session, consumerContext, queueName, filterString, priority, browseOnly, windowSize, calcWindowSize(windowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL()); + return new ClientConsumerImpl(session, consumerContext, queueName, filterString, priority, browseOnly, windowSize, calcWindowSize(windowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL(), onMessageCloseTimeout); } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MessageHandlerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MessageHandlerTest.java index 614a6abb913..570fc9d7012 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MessageHandlerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MessageHandlerTest.java @@ -16,16 +16,12 @@ */ package org.apache.activemq.artemis.tests.integration.client; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertTrue; - import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientProducer; @@ -38,6 +34,11 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + public class MessageHandlerTest extends ActiveMQTestBase { private ActiveMQServer server; @@ -117,6 +118,42 @@ public void onMessage(final ClientMessage message) { session.close(); } + @Test + public void testMessageHandlerCloseTimeout() throws Exception { + // create Netty acceptor so client can use new onMessageCloseTimeout URL parameter + server.getRemotingService().createAcceptor("netty", "tcp://127.0.0.1:61616").start(); + final int timeout = 1000; + locator = ActiveMQClient.createServerLocator("tcp://127.0.0.1:61616?onMessageCloseTimeout=" + timeout); + sf = createSessionFactory(locator); + ClientSession session = addClientSession(sf.createSession(false, true, true)); + session.createQueue(QueueConfiguration.of(QUEUE).setDurable(false)); + ClientProducer producer = session.createProducer(QUEUE); + producer.send(createTextMessage(session, "m")); + + ClientConsumer consumer = session.createConsumer(QUEUE, null, false); + + session.start(); + + CountDownLatch latch = new CountDownLatch(1); + consumer.setMessageHandler(message -> { + latch.countDown(); + // don't just Thread.sleep() here because it will be interrupted on ClientConsumer.close() + long start = System.currentTimeMillis(); + while (System.currentTimeMillis() - start < 2000) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + // ignore + } + } + }); + latch.await(); + long start = System.currentTimeMillis(); + consumer.close(); + long end = System.currentTimeMillis(); + assertTrue( (end - start >= timeout) && (end - start <= 2000), "Closing consumer took " + (end - start) + "ms"); + } + @Test public void testSetResetMessageHandler() throws Exception { final ClientSession session = sf.createSession(false, true, true);