Skip to content

Commit

Permalink
Don't flush after the request from publishAsync calls (#1220)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored Oct 31, 2024
1 parent dabca74 commit 1ceea55
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 30 deletions.
36 changes: 33 additions & 3 deletions src/main/java/io/nats/client/Options.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,19 +56,19 @@ public class Options {
// NOTE TO DEVS!!! To add an option, you have to address:
// ----------------------------------------------------------------------------------------------------
// CONSTANTS * optionally add a default value constant
// ENVIRONMENT PROPERTIES * most of the time add an environment property, should always be in the form PFX +
// ENVIRONMENT PROPERTIES * always add an environment property. Constant always starts with PFX, but code accepts without
// PROTOCOL CONNECT OPTION CONSTANTS * not related to options, but here because Options code uses them
// CLASS VARIABLES * add a variable to the class
// BUILDER VARIABLES * add a variable in builder
// BUILDER COPY CONSTRUCTOR * update builder constructor to ensure new variables are set
// BUILD CONSTRUCTOR PROPS * update build props constructor to read new props
// BUILDER METHODS * add a chainable method in builder for new variable
// BUILD IMPL * update build() implementation if needed
// BUILDER COPY CONSTRUCTOR * update builder constructor to ensure new variables are set
// CONSTRUCTOR * update constructor to ensure new variables are set from builder
// GETTERS * update getter to be able to retrieve class variable value
// HELPER FUNCTIONS * just helpers
// ----------------------------------------------------------------------------------------------------
// README - if you add a property or change it's comment, add it to or update the readme
// README - if you add a property or change its comment, add it to or update the readme
// ----------------------------------------------------------------------------------------------------

// ----------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -492,6 +492,10 @@ public class Options {
* {@link Builder#useDispatcherWithExecutor()}.
*/
public static final String PROP_USE_DISPATCHER_WITH_EXECUTOR = PFX + "use.dispatcher.with.executor";
/**
* Property used to configure a builder from a Properties object. {@value}, see {@link Builder#forceFlushOnRequest() forceFlushOnRequest}.
*/
public static final String PROP_FORCE_FLUSH_ON_REQUEST = PFX + "force.flush.on.request";

// ----------------------------------------------------------------------------------------------------
// PROTOCOL CONNECT OPTION CONSTANTS
Expand Down Expand Up @@ -625,6 +629,7 @@ public class Options {
private final boolean tlsFirst;
private final boolean useTimeoutException;
private final boolean useDispatcherWithExecutor;
private final boolean forceFlushOnRequest;

private final AuthHandler authHandler;
private final ReconnectDelayHandler reconnectDelayHandler;
Expand Down Expand Up @@ -741,6 +746,7 @@ public static class Builder {
private boolean tlsFirst = false;
private boolean useTimeoutException = false;
private boolean useDispatcherWithExecutor = false;
private boolean forceFlushOnRequest = true; // true since it's the original b/w compatible way
private ServerPool serverPool = null;
private DispatcherFactory dispatcherFactory = null;

Expand Down Expand Up @@ -876,6 +882,7 @@ public Builder properties(Properties props) {
booleanProperty(props, PROP_TLS_FIRST, b -> this.tlsFirst = b);
booleanProperty(props, PROP_USE_TIMEOUT_EXCEPTION, b -> this.useTimeoutException = b);
booleanProperty(props, PROP_USE_DISPATCHER_WITH_EXECUTOR, b -> this.useDispatcherWithExecutor = b);
booleanProperty(props, PROP_FORCE_FLUSH_ON_REQUEST, b -> this.forceFlushOnRequest = b);

classnameProperty(props, PROP_SERVERS_POOL_IMPLEMENTATION_CLASS, o -> this.serverPool = (ServerPool) o);
classnameProperty(props, PROP_DISPATCHER_FACTORY_CLASS, o -> this.dispatcherFactory = (DispatcherFactory) o);
Expand Down Expand Up @@ -1658,6 +1665,15 @@ public Builder useDispatcherWithExecutor() {
return this;
}

/**
* Instruct requests to turn off flush on requests.
* @return the Builder for chaining
*/
public Builder dontForceFlushOnRequest() {
this.forceFlushOnRequest = false;
return this;
}

/**
* Set the ServerPool implementation for connections to use instead of the default implementation
* @param serverPool the implementation
Expand Down Expand Up @@ -1905,6 +1921,7 @@ public Builder(Options o) {
this.tlsFirst = o.tlsFirst;
this.useTimeoutException = o.useTimeoutException;
this.useDispatcherWithExecutor = o.useDispatcherWithExecutor;
this.forceFlushOnRequest = o.forceFlushOnRequest;

this.serverPool = o.serverPool;
this.dispatcherFactory = o.dispatcherFactory;
Expand Down Expand Up @@ -1969,6 +1986,7 @@ private Options(Builder b) {
this.tlsFirst = b.tlsFirst;
this.useTimeoutException = b.useTimeoutException;
this.useDispatcherWithExecutor = b.useDispatcherWithExecutor;
this.forceFlushOnRequest = b.forceFlushOnRequest;

this.serverPool = b.serverPool;
this.dispatcherFactory = b.dispatcherFactory;
Expand Down Expand Up @@ -2405,8 +2423,20 @@ public boolean useTimeoutException() {
return useTimeoutException;
}

/**
* Whether the dispatcher should use an executor to async messages to handlers
* @return the flag
*/
public boolean useDispatcherWithExecutor() { return useDispatcherWithExecutor; }

/**
* Whether to flush on any user request
* @return the flag
*/
public boolean forceFlushOnRequest() {
return forceFlushOnRequest;
}

/**
* Get the ServerPool implementation. If null, a default implementation is used.
* @return the ServerPool implementation
Expand Down
30 changes: 17 additions & 13 deletions src/main/java/io/nats/client/impl/NatsConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class NatsConnection implements Connection {
public static final double NANOS_PER_SECOND = 1_000_000_000.0;

private final Options options;
final boolean forceFlushOnRequest;

private final StatisticsCollector statistics;

Expand Down Expand Up @@ -112,6 +113,7 @@ class NatsConnection implements Connection {
timeTraceLogger.trace("creating connection object");

this.options = options;
forceFlushOnRequest = options.forceFlushOnRequest();

advancedTracking = options.isTrackAdvancedStats();
this.statistics = options.getStatisticsCollector() == null ? new NatsStatistics() : options.getStatisticsCollector();
Expand Down Expand Up @@ -1205,15 +1207,15 @@ else if (future.isDone()) {
*/
@Override
public Message request(String subject, byte[] body, Duration timeout) throws InterruptedException {
return requestInternal(subject, null, body, timeout, cancelAction, true);
return requestInternal(subject, null, body, timeout, cancelAction, true, forceFlushOnRequest);
}

/**
* {@inheritDoc}
*/
@Override
public Message request(String subject, Headers headers, byte[] body, Duration timeout) throws InterruptedException {
return requestInternal(subject, headers, body, timeout, cancelAction, true);
return requestInternal(subject, headers, body, timeout, cancelAction, true, forceFlushOnRequest);
}

/**
Expand All @@ -1222,11 +1224,12 @@ public Message request(String subject, Headers headers, byte[] body, Duration ti
@Override
public Message request(Message message, Duration timeout) throws InterruptedException {
validateNotNull(message, "Message");
return requestInternal(message.getSubject(), message.getHeaders(), message.getData(), timeout, cancelAction, false);
return requestInternal(message.getSubject(), message.getHeaders(), message.getData(), timeout, cancelAction, false, forceFlushOnRequest);
}

Message requestInternal(String subject, Headers headers, byte[] data, Duration timeout, CancelAction cancelAction, boolean validateSubjectAndReplyTo) throws InterruptedException {
CompletableFuture<Message> incoming = requestFutureInternal(subject, headers, data, timeout, cancelAction, validateSubjectAndReplyTo);
Message requestInternal(String subject, Headers headers, byte[] data, Duration timeout,
CancelAction cancelAction, boolean validateSubjectAndReplyTo, boolean flushImmediatelyAfterPublish) throws InterruptedException {
CompletableFuture<Message> incoming = requestFutureInternal(subject, headers, data, timeout, cancelAction, validateSubjectAndReplyTo, flushImmediatelyAfterPublish);
try {
return incoming.get(timeout.toNanos(), TimeUnit.NANOSECONDS);
} catch (TimeoutException | ExecutionException | CancellationException e) {
Expand All @@ -1239,31 +1242,31 @@ Message requestInternal(String subject, Headers headers, byte[] data, Duration t
*/
@Override
public CompletableFuture<Message> request(String subject, byte[] body) {
return requestFutureInternal(subject, null, body, null, cancelAction, true);
return requestFutureInternal(subject, null, body, null, cancelAction, true, forceFlushOnRequest);
}

/**
* {@inheritDoc}
*/
@Override
public CompletableFuture<Message> request(String subject, Headers headers, byte[] body) {
return requestFutureInternal(subject, headers, body, null, cancelAction, true);
return requestFutureInternal(subject, headers, body, null, cancelAction, true, forceFlushOnRequest);
}

/**
* {@inheritDoc}
*/
@Override
public CompletableFuture<Message> requestWithTimeout(String subject, byte[] body, Duration timeout) {
return requestFutureInternal(subject, null, body, timeout, cancelAction, true);
return requestFutureInternal(subject, null, body, timeout, cancelAction, true, forceFlushOnRequest);
}

/**
* {@inheritDoc}
*/
@Override
public CompletableFuture<Message> requestWithTimeout(String subject, Headers headers, byte[] body, Duration timeout) {
return requestFutureInternal(subject, headers, body, timeout, cancelAction, true);
return requestFutureInternal(subject, headers, body, timeout, cancelAction, true, forceFlushOnRequest);
}

/**
Expand All @@ -1272,7 +1275,7 @@ public CompletableFuture<Message> requestWithTimeout(String subject, Headers hea
@Override
public CompletableFuture<Message> requestWithTimeout(Message message, Duration timeout) {
validateNotNull(message, "Message");
return requestFutureInternal(message.getSubject(), message.getHeaders(), message.getData(), timeout, cancelAction, false);
return requestFutureInternal(message.getSubject(), message.getHeaders(), message.getData(), timeout, cancelAction, false, forceFlushOnRequest);
}

/**
Expand All @@ -1281,10 +1284,11 @@ public CompletableFuture<Message> requestWithTimeout(Message message, Duration t
@Override
public CompletableFuture<Message> request(Message message) {
validateNotNull(message, "Message");
return requestFutureInternal(message.getSubject(), message.getHeaders(), message.getData(), null, cancelAction, false);
return requestFutureInternal(message.getSubject(), message.getHeaders(), message.getData(), null, cancelAction, false, forceFlushOnRequest);
}

CompletableFuture<Message> requestFutureInternal(String subject, Headers headers, byte[] data, Duration futureTimeout, CancelAction cancelAction, boolean validateSubjectAndReplyTo) {
CompletableFuture<Message> requestFutureInternal(String subject, Headers headers, byte[] data, Duration futureTimeout,
CancelAction cancelAction, boolean validateSubjectAndReplyTo, boolean flushImmediatelyAfterPublish) {
checkPayloadSize(data);

if (isClosed()) {
Expand Down Expand Up @@ -1336,7 +1340,7 @@ CompletableFuture<Message> requestFutureInternal(String subject, Headers headers
responsesAwaiting.put(sub.getSID(), future);
}

publishInternal(subject, responseInbox, headers, data, validateSubjectAndReplyTo, true);
publishInternal(subject, responseInbox, headers, data, validateSubjectAndReplyTo, flushImmediatelyAfterPublish);
statistics.incrementRequestsSent();

return future;
Expand Down
19 changes: 9 additions & 10 deletions src/main/java/io/nats/client/impl/NatsJetStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import io.nats.client.support.Validator;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -97,31 +96,31 @@ public PublishAck publish(Message message, PublishOptions options) throws IOExce
*/
@Override
public CompletableFuture<PublishAck> publishAsync(String subject, byte[] body) {
return publishAsyncInternal(subject, null, body, null, null, true);
return publishAsyncInternal(subject, null, body, null, true);
}

/**
* {@inheritDoc}
*/
@Override
public CompletableFuture<PublishAck> publishAsync(String subject, Headers headers, byte[] body) {
return publishAsyncInternal(subject, headers, body, null, null, true);
return publishAsyncInternal(subject, headers, body, null, true);
}

/**
* {@inheritDoc}
*/
@Override
public CompletableFuture<PublishAck> publishAsync(String subject, byte[] body, PublishOptions options) {
return publishAsyncInternal(subject, null, body, options, null, true);
return publishAsyncInternal(subject, null, body, options, true);
}

/**
* {@inheritDoc}
*/
@Override
public CompletableFuture<PublishAck> publishAsync(String subject, Headers headers, byte[] body, PublishOptions options) {
return publishAsyncInternal(subject, headers, body, options, null, true);
return publishAsyncInternal(subject, headers, body, options, true);
}

/**
Expand All @@ -130,7 +129,7 @@ public CompletableFuture<PublishAck> publishAsync(String subject, Headers header
@Override
public CompletableFuture<PublishAck> publishAsync(Message message) {
validateNotNull(message, "Message");
return publishAsyncInternal(message.getSubject(), message.getHeaders(), message.getData(), null, null, false);
return publishAsyncInternal(message.getSubject(), message.getHeaders(), message.getData(), null, false);
}

/**
Expand All @@ -139,7 +138,7 @@ public CompletableFuture<PublishAck> publishAsync(Message message) {
@Override
public CompletableFuture<PublishAck> publishAsync(Message message, PublishOptions options) {
validateNotNull(message, "Message");
return publishAsyncInternal(message.getSubject(), message.getHeaders(), message.getData(), options, null, false);
return publishAsyncInternal(message.getSubject(), message.getHeaders(), message.getData(), options, false);
}

private PublishAck publishSyncInternal(String subject, Headers headers, byte[] data, PublishOptions options, boolean validateSubjectAndReplyTo) throws IOException, JetStreamApiException {
Expand All @@ -150,19 +149,19 @@ private PublishAck publishSyncInternal(String subject, Headers headers, byte[] d
return null;
}

Message resp = makeInternalRequestResponseRequired(subject, merged, data, getTimeout(), CancelAction.COMPLETE, validateSubjectAndReplyTo);
Message resp = makeInternalRequestResponseRequired(subject, merged, data, getTimeout(), CancelAction.COMPLETE, validateSubjectAndReplyTo, conn.forceFlushOnRequest);
return processPublishResponse(resp, options);
}

private CompletableFuture<PublishAck> publishAsyncInternal(String subject, Headers headers, byte[] data, PublishOptions options, Duration knownTimeout, boolean validateSubjectAndReplyTo) {
private CompletableFuture<PublishAck> publishAsyncInternal(String subject, Headers headers, byte[] data, PublishOptions options, boolean validateSubjectAndReplyTo) {
Headers merged = mergePublishOptions(headers, options);

if (jso.isPublishNoAck()) {
conn.publishInternal(subject, null, merged, data, validateSubjectAndReplyTo, false);
return null;
}

CompletableFuture<Message> future = conn.requestFutureInternal(subject, merged, data, knownTimeout, CancelAction.COMPLETE, validateSubjectAndReplyTo);
CompletableFuture<Message> future = conn.requestFutureInternal(subject, merged, data, null, CancelAction.COMPLETE, validateSubjectAndReplyTo, conn.forceFlushOnRequest);

return future.thenCompose(resp -> {
try {
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/io/nats/client/impl/NatsJetStreamImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public CachedStreamInfo(StreamInfo si) {
// ----------------------------------------------------------------------------------------------------
// Create / Init
// ----------------------------------------------------------------------------------------------------
NatsJetStreamImpl(NatsConnection connection, JetStreamOptions jsOptions) throws IOException {
NatsJetStreamImpl(NatsConnection connection, JetStreamOptions jsOptions) {
conn = connection;

// Get a working version of JetStream Options...
Expand Down Expand Up @@ -246,9 +246,9 @@ Message makeRequestResponseRequired(String subject, byte[] bytes, Duration timeo
}
}

Message makeInternalRequestResponseRequired(String subject, Headers headers, byte[] data, Duration timeout, CancelAction cancelAction, boolean validateSubjectAndReplyTo) throws IOException {
Message makeInternalRequestResponseRequired(String subject, Headers headers, byte[] data, Duration timeout, CancelAction cancelAction, boolean validateSubjectAndReplyTo, boolean flushImmediatelyAfterPublish) throws IOException {
try {
return responseRequired(conn.requestInternal(subject, headers, data, timeout, cancelAction, validateSubjectAndReplyTo));
return responseRequired(conn.requestInternal(subject, headers, data, timeout, cancelAction, validateSubjectAndReplyTo, flushImmediatelyAfterPublish));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ protected String _pull(PullRequestOptions pullRequestOptions, boolean raiseStatu
String publishSubject = js.prependPrefix(String.format(JSAPI_CONSUMER_MSG_NEXT, stream, consumerName));
String pullSubject = getSubject().replace("*", Long.toString(this.pullSubjectIdHolder.incrementAndGet()));
manager.startPullRequest(pullSubject, pullRequestOptions, raiseStatusWarnings, pullManagerObserver);
connection.publishInternal(publishSubject, pullSubject, null, pullRequestOptions.serialize(), true, true);
connection.publishInternal(publishSubject, pullSubject, null, pullRequestOptions.serialize(), true, connection.forceFlushOnRequest);
return pullSubject;
}

Expand Down
2 changes: 2 additions & 0 deletions src/test/java/io/nats/client/OptionsTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,7 @@ public void testPropertiesCoverageOptions() throws Exception {
props.setProperty(Options.PROP_CLIENT_SIDE_LIMIT_CHECKS, "true"); // deprecated
props.setProperty(Options.PROP_IGNORE_DISCOVERED_SERVERS, "true");
props.setProperty(Options.PROP_NO_RESOLVE_HOSTNAMES, "true");
props.setProperty(PROP_FORCE_FLUSH_ON_REQUEST, "false");

Options o = new Options.Builder(props).build();
_testPropertiesCoverageOptions(o);
Expand All @@ -553,6 +554,7 @@ private static void _testPropertiesCoverageOptions(Options o) {
assertTrue(o.clientSideLimitChecks());
assertTrue(o.isIgnoreDiscoveredServers());
assertTrue(o.isNoResolveHostnames());
assertFalse(o.forceFlushOnRequest());
}

@Test
Expand Down

0 comments on commit 1ceea55

Please sign in to comment.