Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARTEMIS-5093 support configurable onMessage timeout w/closing consumer #5291

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class ServerLocatorConfig {
public int initialConnectAttempts = ActiveMQClient.INITIAL_CONNECT_ATTEMPTS;
public int failoverAttempts = ActiveMQClient.DEFAULT_FAILOVER_ATTEMPTS;
public int initialMessagePacketSize = ActiveMQClient.DEFAULT_INITIAL_MESSAGE_PACKET_SIZE;
public int onMessageCloseTimeout = ActiveMQClient.DEFAULT_ONMESSAGE_CLOSE_TIMEOUT;
public boolean cacheLargeMessagesClient = ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
public int compressionLevel = ActiveMQClient.DEFAULT_COMPRESSION_LEVEL;
public boolean compressLargeMessage = ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ public final class ActiveMQClient {

public static final int DEFAULT_INITIAL_MESSAGE_PACKET_SIZE = 1500;

public static final int DEFAULT_ONMESSAGE_CLOSE_TIMEOUT = 10_000;

public static final boolean DEFAULT_XA = false;

public static final boolean DEFAULT_HA = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -744,6 +744,25 @@ ClientSessionFactory createSessionFactory(TransportConfiguration transportConfig
*/
ServerLocator setInitialMessagePacketSize(int size);

/**
* Returns the timeout for onMessage completion when closing ClientConsumers created through this factory.
* <p>
* Value is in milliseconds, default value is {@link ActiveMQClient#DEFAULT_ONMESSAGE_CLOSE_TIMEOUT}.
*
* @return the timeout for onMessage completion when closing ClientConsumers created through this factory
*/
int getOnMessageCloseTimeout();

/**
* Sets the timeout for onMessage completion when closing ClientConsumers created through this factory.
* <p>
* Value must be greater than 0.
*
* @param onMessageCloseTimeout how long to wait for the ClientConsumer's MessageHandler's onMessage method to finish before closing or stopping the ClientConsumer.
* @return this ServerLocator
*/
ServerLocator setOnMessageCloseTimeout(int onMessageCloseTimeout);

/**
* Adds an interceptor which will be executed <em>after packets are received from the server</em>.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ public interface ActiveMQClientLogger {
@LogMessage(id = 212001, value = "Error on clearing messages", level = LogMessage.Level.WARN)
void errorClearingMessages(Throwable e);

@LogMessage(id = 212002, value = "Timed out waiting for handler to complete processing", level = LogMessage.Level.WARN)
void timeOutWaitingForProcessing();
@LogMessage(id = 212002, value = "Timed out after waiting {}ms for handler to complete processing", level = LogMessage.Level.WARN)
void timeOutWaitingForProcessing(long duration);

@LogMessage(id = 212003, value = "Unable to close session", level = LogMessage.Level.WARN)
void unableToCloseSession(Exception e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
private final ClassLoader contextClassLoader;
private volatile boolean manualFlowManagement;

private final int onMessageCloseTimeout;

public ClientConsumerImpl(final ClientSessionInternal session,
final ConsumerContext consumerContext,
final SimpleString queueName,
Expand All @@ -150,7 +152,8 @@ public ClientConsumerImpl(final ClientSessionInternal session,
final Executor flowControlExecutor,
final SessionContext sessionContext,
final ClientSession.QueueQuery queueInfo,
final ClassLoader contextClassLoader) {
final ClassLoader contextClassLoader,
final int onMessageCloseTimeout) {
this.consumerContext = consumerContext;

this.queueName = queueName;
Expand Down Expand Up @@ -181,6 +184,8 @@ public ClientConsumerImpl(final ClientSessionInternal session,

this.flowControlExecutor = flowControlExecutor;

this.onMessageCloseTimeout = onMessageCloseTimeout;

if (logger.isTraceEnabled()) {
logger.trace("{}:: being created at", this, new Exception("trace"));
}
Expand Down Expand Up @@ -911,10 +916,10 @@ private void waitForOnMessageToComplete(boolean waitForOnMessage) {

sessionExecutor.execute(future);

boolean ok = future.await(ClientConsumerImpl.CLOSE_TIMEOUT_MILLISECONDS);
boolean ok = future.await(onMessageCloseTimeout);
Comment on lines -914 to +919
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the const CLOSE_TIMEOUT_MILLISECONDS still required?


if (!ok) {
ActiveMQClientLogger.LOGGER.timeOutWaitingForProcessing();
ActiveMQClientLogger.LOGGER.timeOutWaitingForProcessing(onMessageCloseTimeout);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -841,7 +841,7 @@ private ClientSession createSessionInternal(final String rawUsername,

SessionContext context = createSessionChannel(name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, clientID);

ClientSessionInternal session = new ClientSessionImpl(this, name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, serverLocator.isBlockOnAcknowledge(), serverLocator.isAutoGroup(), ackBatchSize, serverLocator.getConsumerWindowSize(), serverLocator.getConsumerMaxRate(), serverLocator.getConfirmationWindowSize(), serverLocator.getProducerWindowSize(), serverLocator.getProducerMaxRate(), serverLocator.isBlockOnNonDurableSend(), serverLocator.isBlockOnDurableSend(), serverLocator.isCacheLargeMessagesClient(), serverLocator.getMinLargeMessageSize(), serverLocator.isCompressLargeMessage(), serverLocator.getCompressionLevel(), serverLocator.getInitialMessagePacketSize(), serverLocator.getGroupID(), context, orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor(), flowControlExecutor, orderedExecutorFactory.getExecutor());
ClientSessionInternal session = new ClientSessionImpl(this, name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, serverLocator.isBlockOnAcknowledge(), serverLocator.isAutoGroup(), ackBatchSize, serverLocator.getConsumerWindowSize(), serverLocator.getConsumerMaxRate(), serverLocator.getConfirmationWindowSize(), serverLocator.getProducerWindowSize(), serverLocator.getProducerMaxRate(), serverLocator.isBlockOnNonDurableSend(), serverLocator.isBlockOnDurableSend(), serverLocator.isCacheLargeMessagesClient(), serverLocator.getMinLargeMessageSize(), serverLocator.isCompressLargeMessage(), serverLocator.getCompressionLevel(), serverLocator.getInitialMessagePacketSize(), serverLocator.getGroupID(), serverLocator.getOnMessageCloseTimeout(), context, orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor(), flowControlExecutor, orderedExecutorFactory.getExecutor());

synchronized (sessions) {
if (closed || !clientProtocolManager.isAlive()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi

private final String groupID;

private volatile int onMessageCloseTimeout;

private volatile boolean inClose;

private volatile boolean mayAttemptToFailover = true;
Expand Down Expand Up @@ -189,6 +191,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
final int compressionLevel,
final int initialMessagePacketSize,
final String groupID,
final int onMessageCloseTimeout,
final SessionContext sessionContext,
final Executor executor,
final Executor confirmationExecutor,
Expand Down Expand Up @@ -246,6 +249,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi

this.groupID = groupID;

this.onMessageCloseTimeout = onMessageCloseTimeout;

producerCreditManager = new ClientProducerCreditManagerImpl(this, producerWindowSize);

this.sessionContext = sessionContext;
Expand Down Expand Up @@ -2012,7 +2017,7 @@ private ClientConsumer internalCreateConsumer(final SimpleString queueName,
final boolean browseOnly) throws ActiveMQException {
checkClosed();

ClientConsumerInternal consumer = sessionContext.createConsumer(queueName, filterString, priority, windowSize, maxRate, ackBatchSize, browseOnly, executor, flowControlExecutor);
ClientConsumerInternal consumer = sessionContext.createConsumer(queueName, filterString, priority, windowSize, maxRate, ackBatchSize, browseOnly, executor, flowControlExecutor, onMessageCloseTimeout);

addConsumer(consumer);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1298,6 +1298,18 @@ public ServerLocatorImpl setInitialMessagePacketSize(final int size) {
return this;
}

@Override
public int getOnMessageCloseTimeout() {
return config.onMessageCloseTimeout;
}

@Override
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The API docs specify the value set must be greater than zero, but there isn't any validation of that, should there be?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm...That comment was left-over from a copy & paste. I went looking through the existing code to see how validation is done for other parameters, and I'm not actually seeing any. I'm continuing my investigation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did look into what it does with that configuration value and since it is passing into a CountdownLatch is seems like it will not trigger an exception but will return immediately so from the standpoint of being able to be set to zero or negative it works, although seems an absurd value to assign.

public ServerLocator setOnMessageCloseTimeout(int onMessageCloseTimeout) {
checkWrite();
config.onMessageCloseTimeout = onMessageCloseTimeout;
return this;
}

@Override
public ServerLocatorImpl setGroupID(final String groupID) {
checkWrite();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,8 @@ public ClientConsumerInternal createConsumer(SimpleString queueName,
int ackBatchSize,
boolean browseOnly,
Executor executor,
Executor flowControlExecutor) throws ActiveMQException {
Executor flowControlExecutor,
int onMessageCloseTimeout) throws ActiveMQException {
long consumerID = idGenerator.generateID();

ActiveMQConsumerContext consumerContext = new ActiveMQConsumerContext(consumerID);
Expand All @@ -420,7 +421,7 @@ public ClientConsumerInternal createConsumer(SimpleString queueName,
// The value we send is just a hint
final int consumerWindowSize = windowSize == ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE ? this.getDefaultConsumerWindowSize(queueInfo) : windowSize;

return new ClientConsumerImpl(session, consumerContext, queueName, filterString, priority, browseOnly, consumerWindowSize, calcWindowSize(consumerWindowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL());
return new ClientConsumerImpl(session, consumerContext, queueName, filterString, priority, browseOnly, consumerWindowSize, calcWindowSize(consumerWindowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL(), onMessageCloseTimeout);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,8 @@ public abstract ClientConsumerInternal createConsumer(SimpleString queueName,
int ackBatchSize,
boolean browseOnly,
Executor executor,
Executor flowControlExecutor) throws ActiveMQException;
Executor flowControlExecutor,
int onMessageCloseTimeout) throws ActiveMQException;

/**
* Performs a round trip to the server requesting what is the current tx timeout on the session
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ public ClientConsumerInternal createConsumer(SimpleString queueName,
int ackBatchSize,
boolean browseOnly,
Executor executor,
Executor flowControlExecutor) throws ActiveMQException {
Executor flowControlExecutor,
int onMessageCloseTimeout) throws ActiveMQException {
long consumerID = idGenerator.generateID();

ActiveMQConsumerContext consumerContext = new ActiveMQConsumerContext(consumerID);
Expand All @@ -96,7 +97,7 @@ public ClientConsumerInternal createConsumer(SimpleString queueName,
// could be overridden on the queue settings
// The value we send is just a hint

return new ClientConsumerImpl(session, consumerContext, queueName, filterString, priority, browseOnly, windowSize, calcWindowSize(windowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL());
return new ClientConsumerImpl(session, consumerContext, queueName, filterString, priority, browseOnly, windowSize, calcWindowSize(windowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL(), onMessageCloseTimeout);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,12 @@
*/
package org.apache.activemq.artemis.tests.integration.client;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
Expand All @@ -38,6 +34,11 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class MessageHandlerTest extends ActiveMQTestBase {

private ActiveMQServer server;
Expand Down Expand Up @@ -117,6 +118,42 @@ public void onMessage(final ClientMessage message) {
session.close();
}

@Test
public void testMessageHandlerCloseTimeout() throws Exception {
// create Netty acceptor so client can use new onMessageCloseTimeout URL parameter
server.getRemotingService().createAcceptor("netty", "tcp://127.0.0.1:61616").start();
final int timeout = 1000;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does the test work with a lower timeout to reduce the test duration?

locator = ActiveMQClient.createServerLocator("tcp://127.0.0.1:61616?onMessageCloseTimeout=" + timeout);
sf = createSessionFactory(locator);
ClientSession session = addClientSession(sf.createSession(false, true, true));
session.createQueue(QueueConfiguration.of(QUEUE).setDurable(false));
ClientProducer producer = session.createProducer(QUEUE);
producer.send(createTextMessage(session, "m"));

ClientConsumer consumer = session.createConsumer(QUEUE, null, false);

session.start();

CountDownLatch latch = new CountDownLatch(1);
consumer.setMessageHandler(message -> {
latch.countDown();
// don't just Thread.sleep() here because it will be interrupted on ClientConsumer.close()
long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start < 2000) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// ignore
}
}
});
latch.await();
long start = System.currentTimeMillis();
consumer.close();
long end = System.currentTimeMillis();
assertTrue( (end - start >= timeout) && (end - start <= 2000), "Closing consumer took " + (end - start) + "ms");
}

@Test
public void testSetResetMessageHandler() throws Exception {
final ClientSession session = sf.createSession(false, true, true);
Expand Down