Skip to content

Commit

Permalink
unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf committed Jun 3, 2024
1 parent 091dd8a commit 0c61de1
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 41 deletions.
84 changes: 47 additions & 37 deletions src/main/java/io/nats/client/api/ConsumerConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -703,45 +703,55 @@ public Builder json(String json) throws JsonParseException {
* Construct the builder and initialize values from the JsonValue object.
*/
public Builder jsonValue(JsonValue v) {
deliverPolicy = DeliverPolicy.get(readString(v, DELIVER_POLICY));
ackPolicy = AckPolicy.get(readString(v, ACK_POLICY));
replayPolicy = ReplayPolicy.get(readString(v, REPLAY_POLICY));

description = readString(v, DESCRIPTION);
durable = readString(v, DURABLE_NAME);
name = readString(v, NAME);
deliverSubject = readString(v, DELIVER_SUBJECT);
deliverGroup = readString(v, DELIVER_GROUP);
sampleFrequency = readString(v, SAMPLE_FREQ);
startTime = readDate(v, OPT_START_TIME);
ackWait = readNanos(v, ACK_WAIT);
idleHeartbeat = readNanos(v, IDLE_HEARTBEAT);
maxExpires = readNanos(v, MAX_EXPIRES);
inactiveThreshold = readNanos(v, INACTIVE_THRESHOLD);

startSeq = readLong(v, OPT_START_SEQ);
maxDeliver = readInteger(v, MAX_DELIVER);
rateLimit = readLong(v, RATE_LIMIT_BPS);
maxAckPending = readInteger(v, MAX_ACK_PENDING);
maxPullWaiting = readInteger(v, MAX_WAITING);
maxBatch = readInteger(v, MAX_BATCH);
maxBytes = readInteger(v, MAX_BYTES);
numReplicas = readInteger(v, NUM_REPLICAS);
pauseUntil = readDate(v, PAUSE_UNTIL);

flowControl = readBoolean(v, FLOW_CONTROL, null);
headersOnly = readBoolean(v, HEADERS_ONLY, null);
memStorage = readBoolean(v, MEM_STORAGE, null);

backoff = readNanosList(v, BACKOFF, true);
metadata = readStringStringMap(v, METADATA);

String tempFs = emptyAsNull(readString(v, FILTER_SUBJECT));
if (tempFs == null) {
filterSubjects = readOptionalStringList(v, FILTER_SUBJECTS);
deliverPolicy(DeliverPolicy.get(readString(v, DELIVER_POLICY)));
ackPolicy(AckPolicy.get(readString(v, ACK_POLICY)));
replayPolicy(ReplayPolicy.get(readString(v, REPLAY_POLICY)));

description(readString(v, DESCRIPTION));
durable(readString(v, DURABLE_NAME));
name(readString(v, NAME));
deliverSubject(readString(v, DELIVER_SUBJECT));
deliverGroup(readString(v, DELIVER_GROUP));
sampleFrequency(readString(v, SAMPLE_FREQ));
startTime(readDate(v, OPT_START_TIME));
ackWait(readNanos(v, ACK_WAIT));
maxExpires(readNanos(v, MAX_EXPIRES));
inactiveThreshold(readNanos(v, INACTIVE_THRESHOLD));

startSequence(readLong(v, OPT_START_SEQ));
maxDeliver(readLong(v, MAX_DELIVER, INTEGER_UNSET));
rateLimit(readLong(v, RATE_LIMIT_BPS));
maxAckPending(readLong(v, MAX_ACK_PENDING));
maxPullWaiting(readLong(v, MAX_WAITING));
maxBatch(readLong(v, MAX_BATCH));
maxBytes(readLong(v, MAX_BYTES));
numReplicas(readInteger(v, NUM_REPLICAS));
pauseUntil(readDate(v, PAUSE_UNTIL));

Duration idleHeartbeat = readNanos(v, IDLE_HEARTBEAT);
if (idleHeartbeat != null) {
if (readBoolean(v, FLOW_CONTROL, false)) {
flowControl(idleHeartbeat);
}
else {
idleHeartbeat(idleHeartbeat);
}
}

headersOnly(readBoolean(v, HEADERS_ONLY, null));
memStorage(readBoolean(v, MEM_STORAGE, null));

//noinspection DataFlowIssue readNanosList with false ensures not null;
backoff(readNanosList(v, BACKOFF, false).toArray(new Duration[0]));

metadata(readStringStringMap(v, METADATA));

String fs = emptyAsNull(readString(v, FILTER_SUBJECT));
if (fs == null) {
filterSubjects(readOptionalStringList(v, FILTER_SUBJECTS));
}
else {
filterSubjects = Collections.singletonList(tempFs);
filterSubject(fs);
}

return this;
Expand Down
18 changes: 14 additions & 4 deletions src/test/java/io/nats/client/api/ConsumerConfigurationTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,7 @@

package io.nats.client.api;

import io.nats.client.support.DateTimeUtils;
import io.nats.client.support.JsonParser;
import io.nats.client.support.JsonValue;
import io.nats.client.support.SerializableConsumerConfiguration;
import io.nats.client.support.*;
import io.nats.client.utils.TestBase;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -421,5 +418,18 @@ public void testDowngradeFromLongToInt() {
assertEquals(Integer.MAX_VALUE, maxBatch);
assertEquals(Integer.MAX_VALUE, maxBytes);
}

@Test
public void testFlowControlIdleHeartbeatFromJson() throws JsonParseException {
String fc = "{\"deliver_policy\":\"all\",\"ack_policy\":\"explicit\",\"replay_policy\":\"instant\",\"idle_heartbeat\":5678000000,\"flow_control\":true}";
ConsumerConfiguration cc = ConsumerConfiguration.builder().json(fc).build();
assertTrue(cc.isFlowControl());
assertEquals(Duration.ofMillis(5678), cc.getIdleHeartbeat());

String hbOnly = "{\"deliver_policy\":\"all\",\"ack_policy\":\"explicit\",\"replay_policy\":\"instant\",\"idle_heartbeat\":5678000000}";
cc = ConsumerConfiguration.builder().json(hbOnly).build();
assertFalse(cc.isFlowControl());
assertEquals(Duration.ofMillis(5678), cc.getIdleHeartbeat());
}
}

0 comments on commit 0c61de1

Please sign in to comment.