Skip to content

Commit

Permalink
Simplification state, better thread safety
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf committed Aug 14, 2023
1 parent 4882a62 commit a7358de
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 18 deletions.
7 changes: 3 additions & 4 deletions src/main/java/io/nats/client/impl/NatsConsumerContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,11 @@ public NatsJetStreamPullSubscription subscribe(MessageHandler messageHandler, Di
private void checkState() throws IOException {
if (lastConsumer != null) {
if (ordered) {
if (!lastConsumer.finished) {
if (!lastConsumer.finished.get()) {
throw new IOException("The ordered consumer is already receiving messages. Ordered Consumer does not allow multiple instances at time.");
}
}
if (lastConsumer.finished && !lastConsumer.stopped) {
if (lastConsumer.finished.get() && !lastConsumer.stopped.get()) {
lastConsumer.lenientClose(); // finished, might as well make sure the sub is closed.
}
}
Expand Down Expand Up @@ -182,7 +182,6 @@ public Message next(long maxWaitMillis) throws IOException, InterruptedException
throw new IllegalArgumentException("Max wait must be at least " + MIN_EXPIRES_MILLS + " milliseconds.");
}

//noinspection resource I close it manually down below
con = new NatsMessageConsumerBase(cachedConsumerInfo);
con.initSub(subscribe(null, null));
con.sub._pull(PullRequestOptions.builder(1)
Expand All @@ -197,7 +196,7 @@ public Message next(long maxWaitMillis) throws IOException, InterruptedException
}
finally {
try {
con.finished = true;
con.finished.set(true);
con.close();
}
catch (Exception e) {
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/nats/client/impl/NatsFetchConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class NatsFetchConsumer extends NatsMessageConsumerBase implements FetchConsumer
@Override
public Message nextMessage() throws InterruptedException, JetStreamStatusCheckedException {
try {
if (finished) {
if (finished.get()) {
return null;
}

Expand All @@ -55,7 +55,7 @@ public Message nextMessage() throws InterruptedException, JetStreamStatusChecked
if (m == null) {
// if there are no messages in the internal cache AND there are no more pending,
// they all have been read and we can go ahead and close the subscription.
finished = true;
finished.set(true);
lenientClose();
}
return m;
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/nats/client/impl/NatsIterableConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ class NatsIterableConsumer extends NatsMessageConsumer implements IterableConsum
public Message nextMessage(Duration timeout) throws InterruptedException, JetStreamStatusCheckedException {
try {
Message msg = sub.nextMessage(timeout);
if (msg != null && stopped && pmm.noMorePending()) {
finished = true;
if (msg != null && stopped.get() && pmm.noMorePending()) {
finished.set(true);
}
return msg;
}
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/io/nats/client/impl/NatsMessageConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ class NatsMessageConsumer extends NatsMessageConsumerBase implements PullManager

MessageHandler mh = userMessageHandler == null ? null : msg -> {
userMessageHandler.onMessage(msg);
if (stopped && pmm.noMorePending()) {
finished = true;
if (stopped.get() && pmm.noMorePending()) {
finished.set(true);
}
};
initSub(subscriptionMaker.subscribe(mh, userDispatcher));
Expand All @@ -62,7 +62,7 @@ class NatsMessageConsumer extends NatsMessageConsumerBase implements PullManager

@Override
public void pendingUpdated() {
if (!stopped && (pmm.pendingMessages <= thresholdMessages || (pmm.trackingBytes && pmm.pendingBytes <= thresholdBytes)))
if (!stopped.get() && (pmm.pendingMessages <= thresholdMessages || (pmm.trackingBytes && pmm.pendingBytes <= thresholdBytes)))
{
sub._pull(rePullPro, false, this);
}
Expand Down
17 changes: 10 additions & 7 deletions src/main/java/io/nats/client/impl/NatsMessageConsumerBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,19 @@
import io.nats.client.api.ConsumerInfo;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

class NatsMessageConsumerBase implements MessageConsumer {
protected NatsJetStreamPullSubscription sub;
protected PullMessageManager pmm;
protected boolean stopped;
protected boolean finished;
protected final AtomicBoolean stopped;
protected final AtomicBoolean finished;
protected ConsumerInfo cachedConsumerInfo;

NatsMessageConsumerBase(ConsumerInfo cachedConsumerInfo) {
this.cachedConsumerInfo = cachedConsumerInfo;
this.stopped = new AtomicBoolean(false);
this.finished = new AtomicBoolean(false);
}

void initSub(NatsJetStreamPullSubscription sub) {
Expand All @@ -39,14 +42,14 @@ void initSub(NatsJetStreamPullSubscription sub) {
* {@inheritDoc}
*/
public boolean isStopped() {
return stopped;
return stopped.get();
}

/**
* {@inheritDoc}
*/
public boolean isFinished() {
return finished;
return finished.get();
}

/**
Expand Down Expand Up @@ -74,7 +77,7 @@ public ConsumerInfo getCachedConsumerInfo() {
*/
@Override
public void stop() {
stopped = true;
stopped.set(true);
}

@Override
Expand All @@ -84,8 +87,8 @@ public void close() throws Exception {

protected void lenientClose() {
try {
if (!stopped || sub.isActive()) {
stopped = true;
if (!stopped.get() || sub.isActive()) {
stopped.set(true);
if (sub.getNatsDispatcher() != null) {
sub.getDispatcher().unsubscribe(sub);
}
Expand Down

0 comments on commit a7358de

Please sign in to comment.