Skip to content

Commit

Permalink
Issue 109 remove pull batch size limitation (#642)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored May 17, 2022
1 parent 95b9c81 commit b87bfff
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.util.Iterator;
import java.util.List;

import static io.nats.client.support.Validator.validatePullBatchSize;
import static io.nats.client.support.Validator.validateGtZero;

public class NatsJetStreamPullSubscription extends NatsJetStreamSubscription {

Expand Down Expand Up @@ -100,7 +100,7 @@ public void pullExpiresIn(int batchSize, long expiresInMillis) {
}

private void _pull(int batchSize, boolean noWait, Duration expiresIn) {
int batch = validatePullBatchSize(batchSize);
int batch = validateGtZero(batchSize, "Pull batch size");
String publishSubject = js.prependPrefix(String.format(JSAPI_CONSUMER_MSG_NEXT, stream, consumerName));
connection.publish(publishSubject, getSubject(), getPullJson(batch, noWait, expiresIn));
connection.lenientFlushBuffer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@

public interface NatsJetStreamConstants {
/**
* The maximum pull size
* The maximum pull size [NO LONGER ENFORCED]
*/
@Deprecated
int MAX_PULL_SIZE = 256;

/**
Expand Down
15 changes: 7 additions & 8 deletions src/main/java/io/nats/client/support/Validator.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import static io.nats.client.support.NatsConstants.DOT;
import static io.nats.client.support.NatsJetStreamConstants.MAX_HISTORY_PER_KEY;
import static io.nats.client.support.NatsJetStreamConstants.MAX_PULL_SIZE;

public abstract class Validator {
private Validator() {
Expand Down Expand Up @@ -174,13 +173,6 @@ public static String validateNonWildcardKvKey(String s, String label, boolean re
});
}

public static int validatePullBatchSize(int pullBatchSize) {
if (pullBatchSize < 1 || pullBatchSize > MAX_PULL_SIZE) {
throw new IllegalArgumentException("Pull Batch Size must be between 1 and " + MAX_PULL_SIZE + " inclusive [" + pullBatchSize + "]");
}
return pullBatchSize;
}

public static long validateMaxConsumers(long max) {
return validateGtZeroOrMinus1(max, "Max Consumers");
}
Expand Down Expand Up @@ -256,6 +248,13 @@ public static void validateNotNull(Object o, String fieldName) {
}
}

public static int validateGtZero(int i, String label) {
if (i < 1) {
throw new IllegalArgumentException(label + " must be greater than zero");
}
return i;
}

public static long validateGtZeroOrMinus1(long l, String label) {
if (zeroOrLtMinus1(l)) {
throw new IllegalArgumentException(label + " must be greater than zero or -1 for unlimited");
Expand Down
17 changes: 7 additions & 10 deletions src/test/java/io/nats/client/support/ValidatorTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.List;

import static io.nats.client.support.NatsConstants.EMPTY;
import static io.nats.client.support.NatsJetStreamConstants.MAX_PULL_SIZE;
import static io.nats.client.support.Validator.*;
import static io.nats.client.utils.ResourceUtils.dataAsLines;
import static io.nats.client.utils.TestBase.*;
Expand Down Expand Up @@ -79,15 +78,6 @@ public void testValidateDurable() {
allowedNotRequiredEmptyAsNull(Validator::validateDurable, Arrays.asList(null, EMPTY));
}

@Test
public void testValidatePullBatchSize() {
assertEquals(1, validatePullBatchSize(1));
assertEquals(MAX_PULL_SIZE, validatePullBatchSize(MAX_PULL_SIZE));
assertThrows(IllegalArgumentException.class, () -> validatePullBatchSize(0));
assertThrows(IllegalArgumentException.class, () -> validatePullBatchSize(-1));
assertThrows(IllegalArgumentException.class, () -> validatePullBatchSize(MAX_PULL_SIZE + 1));
}

@Test
public void testValidateMaxConsumers() {
assertEquals(1, validateMaxConsumers(1));
Expand Down Expand Up @@ -304,6 +294,13 @@ public void testZeroOrLtMinus1() {
assertFalse(zeroOrLtMinus1(-1));
}

@Test
public void testValidateGtZero() {
assertEquals(1, validateGtZero(1, "test"));
assertThrows(IllegalArgumentException.class, () -> validateGtZero(0, "test"));
assertThrows(IllegalArgumentException.class, () -> validateGtZero(-1, "test"));
}

@Test
public void testValidateGtZeroOrMinus1() {
assertEquals(1, validateGtZeroOrMinus1(1, "test"));
Expand Down

0 comments on commit b87bfff

Please sign in to comment.