Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Main for server v2.11 #1239

Open
wants to merge 39 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
62a73b3
Direct Batch EOD and loop changes
scottf Oct 10, 2024
c83faff
Direct Batch EOD and loop changes
scottf Oct 10, 2024
aaecbff
Direct Batch EOD and loop changes
scottf Oct 10, 2024
1ee791b
Consumer Priority Group Overflow (#1233)
scottf Oct 10, 2024
ad60f23
Merge branch 'main' into main-2-11
scottf Oct 10, 2024
cfa5cbc
Merge branch 'main' into main-2-11
scottf Oct 10, 2024
3dbc5fb
Merge branch 'main' into main-2-11
scottf Oct 11, 2024
d4667b4
Merge branch 'main' into main-2-11
scottf Oct 15, 2024
2b21962
Merge branch 'main' into main-2-11
scottf Oct 18, 2024
93510f2
Refactoring based on testing
scottf Oct 18, 2024
6b8d3df
Removed debug
scottf Oct 21, 2024
ad6694b
Removed debug
scottf Oct 21, 2024
f32791d
Support setting [min] sequence when needed
scottf Oct 21, 2024
6b82bca
In progress waiting for server PR's
scottf Oct 28, 2024
aa97394
Merge branch 'main' into main-2-11
scottf Oct 31, 2024
9cf1e16
Merge branch 'main' into main-2-11
scottf Nov 1, 2024
788c160
Server main branch has all 2.11 features
scottf Nov 1, 2024
7a9e0a5
Merge branch 'main' into main-2-11
scottf Nov 1, 2024
e0df8c0
Fixing removal
scottf Nov 1, 2024
ed87573
MessageBatchGetRequest reform based on adr
scottf Nov 5, 2024
b7379bb
more testing
scottf Nov 5, 2024
e242724
multi last for with batch
scottf Nov 7, 2024
3bfa65c
Merge branch 'refs/heads/main' into main-2-11
scottf Nov 14, 2024
6e9c067
Merge branch 'main' into main-2-11
scottf Nov 14, 2024
c97c535
Added MessageInfo getMessage(String streamName, MessageGetRequest mes…
scottf Nov 15, 2024
151b309
Merge branch 'main' into main-2-11
scottf Nov 15, 2024
050f106
Merge branch 'main' into main-2-11
scottf Nov 15, 2024
9c13f6a
Merge branch 'main' into main-2-11
scottf Nov 22, 2024
6a405ba
Update Doc
scottf Nov 24, 2024
0318b12
tuned request message batch
scottf Nov 25, 2024
592f596
Merge branch 'main' into main-2-11
scottf Nov 26, 2024
96271a1
tuned request message batch
scottf Nov 26, 2024
0aeb173
Merge branch 'main' into main-2-11
scottf Dec 3, 2024
e37d659
Merge branch 'main' into main-2-11
scottf Dec 4, 2024
e34817a
Merge branch 'main' into main-2-11
scottf Dec 4, 2024
800a684
Merge branch 'main' into main-2-11
scottf Dec 18, 2024
c1d1b58
Merge branch 'main' into main-2-11
scottf Jan 6, 2025
6884f7e
Merge branch 'main' into main-2-11
scottf Jan 9, 2025
5108a14
Merge branch 'main' into main-2-11
scottf Jan 16, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 60 additions & 2 deletions src/main/java/io/nats/client/BaseConsumeOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static io.nats.client.support.JsonValueUtils.readBoolean;
import static io.nats.client.support.JsonValueUtils.readInteger;
import static io.nats.client.support.JsonValueUtils.readLong;
import static io.nats.client.support.JsonValueUtils.*;

/**
* Base Consume Options are provided to customize the way the consume and
Expand All @@ -44,9 +45,11 @@ public class BaseConsumeOptions implements JsonSerializable {
protected final long idleHeartbeat;
protected final int thresholdPercent;
protected final boolean noWait;
protected final String group;
protected final long minPending;
protected final long minAckPending;

@SuppressWarnings("rawtypes") // Don't need the type of the builder to get its vars
protected BaseConsumeOptions(Builder b) {
protected BaseConsumeOptions(Builder<?, ?> b) {
bytes = b.bytes;
if (bytes > 0) {
messages = b.messages < 0 ? DEFAULT_MESSAGE_COUNT_WHEN_BYTES : b.messages;
Expand All @@ -55,6 +58,10 @@ protected BaseConsumeOptions(Builder b) {
messages = b.messages < 0 ? DEFAULT_MESSAGE_COUNT : b.messages;
}

this.group = b.group;
this.minPending = b.minPending;
this.minAckPending = b.minAckPending;

// validation handled in builder
thresholdPercent = b.thresholdPercent;
noWait = b.noWait;
Expand Down Expand Up @@ -82,6 +89,9 @@ public String toJson() {
addField(sb, IDLE_HEARTBEAT, idleHeartbeat);
addField(sb, THRESHOLD_PERCENT, thresholdPercent);
addFldWhenTrue(sb, NO_WAIT, noWait);
addField(sb, GROUP, group);
addField(sb, MIN_PENDING, minPending);
addField(sb, MIN_ACK_PENDING, minAckPending);
return endJson(sb).toString();
}

Expand All @@ -101,12 +111,27 @@ public boolean isNoWait() {
return noWait;
}

public String getGroup() {
return group;
}

public long getMinPending() {
return minPending;
}

public long getMinAckPending() {
return minAckPending;
}

protected static abstract class Builder<B, CO> {
protected int messages = -1;
protected long bytes = 0;
protected int thresholdPercent = DEFAULT_THRESHOLD_PERCENT;
protected long expiresIn = DEFAULT_EXPIRES_IN_MILLIS;
protected boolean noWait = false;
protected String group;
protected long minPending = -1;
protected long minAckPending = -1;

protected abstract B getThis();

Expand Down Expand Up @@ -137,6 +162,9 @@ public B jsonValue(JsonValue jsonValue) {
if (readBoolean(jsonValue, NO_WAIT, false)) {
noWait();
}
group(readStringEmptyAsNull(jsonValue, GROUP));
minPending(readLong(jsonValue, MIN_PENDING, -1));
minAckPending(readLong(jsonValue, MIN_ACK_PENDING, -1));
return getThis();
}

Expand Down Expand Up @@ -190,6 +218,36 @@ public B thresholdPercent(int thresholdPercent) {
return getThis();
}

/**
* Sets the group
* @param group the priority group for this pull
* @return Builder
*/
public B group(String group) {
this.group = group;
return getThis();
}

/**
* When specified, the consumer will only receive messages when the consumer has at least this many pending messages.
* @param minPending the min pending
* @return the builder
*/
public B minPending(long minPending) {
this.minPending = minPending < 1 ? -1 : minPending;
return getThis();
}

/**
* When specified, the consumer will only receive messages when the consumer has at least this many ack pending messages.
* @param minAckPending the min ack pending
* @return the builder
*/
public B minAckPending(long minAckPending) {
this.minAckPending = minAckPending < 1 ? -1 : minAckPending;
return getThis();
}

/**
* Build the options.
* @return the built options
Expand Down
79 changes: 79 additions & 0 deletions src/main/java/io/nats/client/JetStreamManagement.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.io.IOException;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

/**
* JetStream Management context for creation and access to streams and consumers in NATS.
Expand Down Expand Up @@ -260,6 +261,17 @@ public interface JetStreamManagement {
*/
MessageInfo getMessage(String streamName, long seq) throws IOException, JetStreamApiException;

/**
* Get MessageInfo for the message matching the {@link MessageGetRequest}.
* @param streamName the name of the stream.
* @param messageGetRequest the {@link MessageGetRequest} to get a message
* @return The MessageInfo
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
MessageInfo getMessage(String streamName, MessageGetRequest messageGetRequest) throws IOException, JetStreamApiException;

/**
* Get MessageInfo for the last message of the subject.
* @param streamName the name of the stream.
Expand All @@ -282,6 +294,33 @@ public interface JetStreamManagement {
*/
MessageInfo getFirstMessage(String streamName, String subject) throws IOException, JetStreamApiException;

/**
* Get MessageInfo for the first message created at or after the start time.
* <p>
* This API 1) is currently EXPERIMENTAL and is subject to change. 2) Works on Server 2.11 or later
* @param streamName the name of the stream.
* @param startTime the start time to get the first message for.
* @return The MessageInfo
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
MessageInfo getFirstMessage(String streamName, ZonedDateTime startTime) throws IOException, JetStreamApiException;

/**
* Get MessageInfo for the first message created at or after the start time matching the subject.
* <p>
* This API 1) is currently EXPERIMENTAL and is subject to change. 2) Works on Server 2.11 or later
* @param streamName the name of the stream.
* @param startTime the start time to get the first message for.
* @param subject the subject to get the first message for.
* @return The MessageInfo
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
MessageInfo getFirstMessage(String streamName, ZonedDateTime startTime, String subject) throws IOException, JetStreamApiException;

/**
* Get MessageInfo for the message of the message sequence
* is equal to or greater the requested sequence for the subject.
Expand All @@ -295,6 +334,46 @@ public interface JetStreamManagement {
*/
MessageInfo getNextMessage(String streamName, long seq, String subject) throws IOException, JetStreamApiException;

/**
* Request a batch of messages using a {@link MessageBatchGetRequest}.
* <p>
* This API 1) is currently EXPERIMENTAL and is subject to change. 2) Works on Server 2.11 or later
* @param streamName the name of the stream
* @param messageBatchGetRequest the request details
* @return a list containing {@link MessageInfo}
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
List<MessageInfo> fetchMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException;

/**
* Request a batch of messages using a {@link MessageBatchGetRequest}.
* <p>
* This API 1) is currently EXPERIMENTAL and is subject to change. 2) Works on Server 2.11 or later
* @param streamName the name of the stream
* @param messageBatchGetRequest the request details
* @return a queue used to asynchronously receive {@link MessageInfo}
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
LinkedBlockingQueue<MessageInfo> queueMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException;

/**
* Request a batch of messages using a {@link MessageBatchGetRequest}.
* <p>
* This API 1) is currently EXPERIMENTAL and is subject to change. 2) Works on Server 2.11 or later
* @param streamName the name of the stream
* @param messageBatchGetRequest the request details
* @param handler the handler used for receiving {@link MessageInfo}
* @return true if all messages were received and properly terminated with a server EOB
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
boolean requestMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, MessageInfoHandler handler) throws IOException, JetStreamApiException;

/**
* Deletes a message, overwriting the message data with garbage
* This can be considered an expensive (time-consuming) operation, but is more secure.
Expand Down
3 changes: 1 addition & 2 deletions src/main/java/io/nats/client/MessageInfoHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ public interface MessageInfoHandler {
* Called to deliver a {@link MessageInfo} to the handler.
*
* @param messageInfo the received {@link MessageInfo}
* @throws InterruptedException if the thread for this handler is interrupted
*/
void onMessageInfo(MessageInfo messageInfo) throws InterruptedException;
void onMessageInfo(MessageInfo messageInfo);
}
56 changes: 56 additions & 0 deletions src/main/java/io/nats/client/PullRequestOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,19 @@ public class PullRequestOptions implements JsonSerializable {
private final boolean noWait;
private final Duration expiresIn;
private final Duration idleHeartbeat;
private final String group;
private final long minPending;
private final long minAckPending;

public PullRequestOptions(Builder b) {
this.batchSize = b.batchSize;
this.maxBytes = b.maxBytes;
this.noWait = b.noWait;
this.expiresIn = b.expiresIn;
this.idleHeartbeat = b.idleHeartbeat;
this.group = b.group;
this.minPending = b.minPending < 0 ? -1 : b.minPending;
this.minAckPending = b.minAckPending < 0 ? -1 : b.minAckPending;
}

@Override
Expand All @@ -48,6 +54,10 @@ public String toJson() {
JsonUtils.addFldWhenTrue(sb, NO_WAIT, noWait);
JsonUtils.addFieldAsNanos(sb, EXPIRES, expiresIn);
JsonUtils.addFieldAsNanos(sb, IDLE_HEARTBEAT, idleHeartbeat);

JsonUtils.addField(sb, GROUP, group);
JsonUtils.addField(sb, MIN_PENDING, minPending);
JsonUtils.addField(sb, MIN_ACK_PENDING, minAckPending);
return JsonUtils.endJson(sb).toString();
}

Expand Down Expand Up @@ -91,6 +101,18 @@ public Duration getIdleHeartbeat() {
return idleHeartbeat;
}

public String getGroup() {
return group;
}

public long getMinPending() {
return minPending;
}

public long getMinAckPending() {
return minAckPending;
}

/**
* Creates a builder for the pull options, with batch size since it's always required
* @param batchSize the size of the batch. Must be greater than 0
Expand All @@ -115,6 +137,9 @@ public static class Builder {
private boolean noWait;
private Duration expiresIn;
private Duration idleHeartbeat;
private String group;
private long minPending = -1;
private long minAckPending = -1;

/**
* Set the batch size for the pull
Expand Down Expand Up @@ -195,6 +220,37 @@ public Builder idleHeartbeat(Duration idleHeartbeat) {
return this;
}

/**
* Sets the group
* Replaces any other groups set in the builder
* @param group the priority group for this pull
* @return Builder
*/
public Builder group(String group) {
this.group = group;
return this;
}

/**
* When specified, the pull request will only receive messages when the consumer has at least this many pending messages.
* @param minPending the min pending
* @return the builder
*/
public Builder minPending(long minPending) {
this.minPending = minPending < 1 ? -1 : minPending;
return this;
}

/**
* When specified, this Pull request will only receive messages when the consumer has at least this many ack pending messages.
* @param minAckPending the min ack pending
* @return the builder
*/
public Builder minAckPending(long minAckPending) {
this.minAckPending = minAckPending < 1 ? -1 : minAckPending;
return this;
}

/**
* Build the PullRequestOptions.
* <p>Validates that the batch size is greater than 0</p>
Expand Down
Loading
Loading