Skip to content

Commit

Permalink
aroc naming
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf committed Aug 20, 2024
1 parent 9218640 commit f1a5db4
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 8 deletions.
6 changes: 3 additions & 3 deletions src/main/java/io/nats/client/impl/MessageQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ NatsMessage pop(Duration timeout) throws InterruptedException {
// Only works in single reader mode, because we want to maintain order.
// accumulate reads off the concurrent queue one at a time, so if multiple
// readers are present, you could get out of order message delivery.
NatsMessage accumulate(long maxSize, long maxMessagesToAccumulate, Duration timeout)
NatsMessage accumulate(long maxBytesToAccumulate, long maxMessagesToAccumulate, Duration timeout)
throws InterruptedException {

if (!this.singleReaderMode) {
Expand All @@ -278,7 +278,7 @@ NatsMessage accumulate(long maxSize, long maxMessagesToAccumulate, Duration time

long size = msg.getSizeInBytes();

if (maxMessagesToAccumulate <= 1 || size >= maxSize) {
if (maxMessagesToAccumulate <= 1 || size >= maxBytesToAccumulate) {
this.sizeInBytes.addAndGet(-size);
this.length.decrementAndGet();
return msg;
Expand All @@ -291,7 +291,7 @@ NatsMessage accumulate(long maxSize, long maxMessagesToAccumulate, Duration time
NatsMessage next = this.queue.peek();
if (next != null && !isPoison(next)) {
long s = next.getSizeInBytes();
if (maxSize < 0 || (size + s) < maxSize) { // keep going
if (maxBytesToAccumulate < 0 || (size + s) < maxBytesToAccumulate) { // keep going
size += s;
count++;

Expand Down
9 changes: 4 additions & 5 deletions src/main/java/io/nats/client/impl/NatsConnectionWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -186,21 +186,20 @@ void sendMessageBatch(NatsMessage msg, DataPort dataPort, StatisticsCollector st

@Override
public void run() {
Duration waitForMessage = Duration.ofMinutes(2); // This can be long since no one is sending
Duration reconnectWait = Duration.ofMillis(1); // This should be short, since we are trying to get the reconnect through
Duration outgoingTimeout = Duration.ofMinutes(2); // This can be long since no one is sending
Duration reconnectTimeout = Duration.ofMillis(1); // This should be short, since we are trying to get the reconnect through

try {
dataPort = this.dataPortFuture.get(); // Will wait for the future to complete
StatisticsCollector stats = this.connection.getNatsStatistics();
int maxAccumulate = Options.MAX_MESSAGES_IN_NETWORK_BUFFER;

while (this.running.get()) {
NatsMessage msg;
if (this.reconnectMode.get()) {
msg = this.reconnectOutgoing.accumulate(sendBufferLength.get(), maxAccumulate, reconnectWait);
msg = this.reconnectOutgoing.accumulate(sendBufferLength.get(), Options.MAX_MESSAGES_IN_NETWORK_BUFFER, reconnectTimeout);
}
else {
msg = this.outgoing.accumulate(sendBufferLength.get(), maxAccumulate, waitForMessage);
msg = this.outgoing.accumulate(sendBufferLength.get(), Options.MAX_MESSAGES_IN_NETWORK_BUFFER, outgoingTimeout);
}
if (msg != null) {
sendMessageBatch(msg, dataPort, stats);
Expand Down

0 comments on commit f1a5db4

Please sign in to comment.