Skip to content

Commit

Permalink
Serializable Wrappers (#1156)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored Jun 10, 2024
1 parent b4c91ef commit affe6be
Show file tree
Hide file tree
Showing 14 changed files with 593 additions and 134 deletions.
68 changes: 60 additions & 8 deletions src/main/java/io/nats/client/BaseConsumeOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,22 @@
package io.nats.client;

import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.support.JsonParseException;
import io.nats.client.support.JsonParser;
import io.nats.client.support.JsonSerializable;
import io.nats.client.support.JsonValue;

import static io.nats.client.support.ApiConstants.*;
import static io.nats.client.support.JsonUtils.*;
import static io.nats.client.support.JsonValueUtils.readBoolean;
import static io.nats.client.support.JsonValueUtils.readInteger;
import static io.nats.client.support.JsonValueUtils.readLong;

/**
* Base Consume Options are provided to customize the way the consume and
* fetch operate. It is the base class for ConsumeOptions and FetchConsumeOptions.
*/
public class BaseConsumeOptions {
public class BaseConsumeOptions implements JsonSerializable {
public static final int DEFAULT_MESSAGE_COUNT = 500;
public static final int DEFAULT_MESSAGE_COUNT_WHEN_BYTES = 1_000_000;
public static final int DEFAULT_THRESHOLD_PERCENT = 25;
Expand Down Expand Up @@ -47,13 +57,34 @@ protected BaseConsumeOptions(Builder b) {

// validation handled in builder
thresholdPercent = b.thresholdPercent;
expiresIn = b.expiresIn;
noWait = b.noWait;

// if it's not noWait, it must have an expiresIn
// we can't check this in the builder because we can't guarantee order
// so we always default to LONG_UNSET in the builder and check it here.
if (b.expiresIn == ConsumerConfiguration.LONG_UNSET && !noWait) {
expiresIn = DEFAULT_EXPIRES_IN_MILLIS;
}
else {
expiresIn = b.expiresIn;
}

// calculated
idleHeartbeat = Math.min(MAX_HEARTBEAT_MILLIS, expiresIn * MAX_IDLE_HEARTBEAT_PERCENT / 100);
}

@Override
public String toJson() {
StringBuilder sb = beginJson();
addField(sb, MESSAGES, messages);
addField(sb, BYTES, bytes);
addField(sb, EXPIRES_IN, expiresIn);
addField(sb, IDLE_HEARTBEAT, idleHeartbeat);
addField(sb, THRESHOLD_PERCENT, thresholdPercent);
addFldWhenTrue(sb, NO_WAIT, noWait);
return endJson(sb).toString();
}

public long getExpiresInMillis() {
return expiresIn;
}
Expand All @@ -66,6 +97,10 @@ public int getThresholdPercent() {
return thresholdPercent;
}

public boolean isNoWait() {
return noWait;
}

protected static abstract class Builder<B, CO> {
protected int messages = -1;
protected long bytes = 0;
Expand All @@ -75,6 +110,28 @@ protected static abstract class Builder<B, CO> {

protected abstract B getThis();

protected B noWait() {
return getThis();
}

public B json(String json) throws JsonParseException {
return jsonValue(JsonParser.parse(json));
}

/**
* Construct the builder and initialize values from the JsonValue object.
*/
public B jsonValue(JsonValue v) {
messages(readInteger(v, MESSAGES, -1));
bytes(readLong(v, BYTES, -1));
expiresIn(readLong(v, EXPIRES_IN, MIN_EXPIRES_MILLS));
thresholdPercent(readInteger(v, THRESHOLD_PERCENT, -1));
if (readBoolean(v, NO_WAIT, false)) {
noWait();
}
return getThis();
}

protected B messages(int messages) {
this.messages = messages < 1 ? -1 : messages;
return getThis();
Expand All @@ -96,12 +153,7 @@ protected B bytes(long bytes) {
*/
public B expiresIn(long expiresInMillis) {
if (expiresInMillis < 1) { // this is way to clear or reset, just a code guard really
if (noWait) {
expiresIn = ConsumerConfiguration.LONG_UNSET;
}
else {
expiresIn = DEFAULT_EXPIRES_IN_MILLIS;
}
expiresIn = ConsumerConfiguration.LONG_UNSET;
}
else if (expiresInMillis < MIN_EXPIRES_MILLS) {
throw new IllegalArgumentException("Expires must be greater than or equal to " + MIN_EXPIRES_MILLS);
Expand Down
3 changes: 1 addition & 2 deletions src/main/java/io/nats/client/FetchConsumeOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ public long getMaxBytes() {
return bytes;
}

public boolean isNoWait() { return noWait; }

public static Builder builder() {
return new Builder();
}
Expand Down Expand Up @@ -100,6 +98,7 @@ public Builder max(int maxBytes, int maxMessages) {
* Set no wait to true
* @return the builder
*/
@Override
public Builder noWait() {
this.noWait = true;
expiresIn = ConsumerConfiguration.LONG_UNSET;
Expand Down
132 changes: 83 additions & 49 deletions src/main/java/io/nats/client/api/ConsumerConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@

import io.nats.client.PullSubscribeOptions;
import io.nats.client.PushSubscribeOptions;
import io.nats.client.support.ApiConstants;
import io.nats.client.support.JsonSerializable;
import io.nats.client.support.JsonUtils;
import io.nats.client.support.JsonValue;
import io.nats.client.support.*;

import java.time.Duration;
import java.time.ZonedDateTime;
Expand Down Expand Up @@ -120,49 +117,6 @@ protected ConsumerConfiguration(ConsumerConfiguration cc) {
this.filterSubjects = cc.filterSubjects == null ? null : new ArrayList<>(cc.filterSubjects);
}

ConsumerConfiguration(JsonValue v) {
deliverPolicy = DeliverPolicy.get(readString(v, DELIVER_POLICY));
ackPolicy = AckPolicy.get(readString(v, ACK_POLICY));
replayPolicy = ReplayPolicy.get(readString(v, REPLAY_POLICY));

description = readString(v, DESCRIPTION);
durable = readString(v, DURABLE_NAME);
name = readString(v, NAME);
deliverSubject = readString(v, DELIVER_SUBJECT);
deliverGroup = readString(v, DELIVER_GROUP);
sampleFrequency = readString(v, SAMPLE_FREQ);
startTime = readDate(v, OPT_START_TIME);
ackWait = readNanos(v, ACK_WAIT);
idleHeartbeat = readNanos(v, IDLE_HEARTBEAT);
maxExpires = readNanos(v, MAX_EXPIRES);
inactiveThreshold = readNanos(v, INACTIVE_THRESHOLD);

startSeq = readLong(v, OPT_START_SEQ);
maxDeliver = readInteger(v, MAX_DELIVER);
rateLimit = readLong(v, RATE_LIMIT_BPS);
maxAckPending = readInteger(v, MAX_ACK_PENDING);
maxPullWaiting = readInteger(v, MAX_WAITING);
maxBatch = readInteger(v, MAX_BATCH);
maxBytes = readInteger(v, MAX_BYTES);
numReplicas = readInteger(v, NUM_REPLICAS);
pauseUntil = readDate(v, PAUSE_UNTIL);

flowControl = readBoolean(v, FLOW_CONTROL, null);
headersOnly = readBoolean(v, HEADERS_ONLY, null);
memStorage = readBoolean(v, MEM_STORAGE, null);

backoff = readNanosList(v, BACKOFF, true);
metadata = readStringStringMap(v, METADATA);

String tempFs = emptyAsNull(readString(v, FILTER_SUBJECT));
if (tempFs == null) {
filterSubjects = readOptionalStringList(v, FILTER_SUBJECTS);
}
else {
filterSubjects = Collections.singletonList(tempFs);
}
}

// For the builder
protected ConsumerConfiguration(Builder b)
{
Expand Down Expand Up @@ -203,7 +157,6 @@ protected ConsumerConfiguration(Builder b)

/**
* Returns a JSON representation of this consumer configuration.
*
* @return json consumer configuration json string
*/
public String toJson() {
Expand Down Expand Up @@ -686,8 +639,14 @@ public static class Builder {
private Map<String, String> metadata;
private List<String> filterSubjects;

/**
* Construct the builder
*/
public Builder() {}

/**
* Construct the builder and initialize values with the existing ConsumerConfiguration
*/
public Builder(ConsumerConfiguration cc) {
if (cc != null) {
this.deliverPolicy = cc.deliverPolicy;
Expand Down Expand Up @@ -732,6 +691,81 @@ public Builder(ConsumerConfiguration cc) {
}
}

/**
* Construct the builder and initialize values from the json string.
*/
public Builder json(String json) throws JsonParseException {
return jsonValue(JsonParser.parse(json));
}

/**
* Construct the builder and initialize values from the JsonValue object.
*/
public Builder jsonValue(JsonValue v) {
deliverPolicy(DeliverPolicy.get(readString(v, DELIVER_POLICY)));
ackPolicy(AckPolicy.get(readString(v, ACK_POLICY)));
replayPolicy(ReplayPolicy.get(readString(v, REPLAY_POLICY)));

description(readString(v, DESCRIPTION));
durable(readString(v, DURABLE_NAME));
name(readString(v, NAME));
deliverSubject(readString(v, DELIVER_SUBJECT));
deliverGroup(readString(v, DELIVER_GROUP));
sampleFrequency(readString(v, SAMPLE_FREQ));
startTime(readDate(v, OPT_START_TIME));
ackWait(readNanos(v, ACK_WAIT));
maxExpires(readNanos(v, MAX_EXPIRES));
inactiveThreshold(readNanos(v, INACTIVE_THRESHOLD));

startSequence(readLong(v, OPT_START_SEQ));
maxDeliver(readLong(v, MAX_DELIVER, INTEGER_UNSET));
rateLimit(readLong(v, RATE_LIMIT_BPS));
maxAckPending(readLong(v, MAX_ACK_PENDING));
maxPullWaiting(readLong(v, MAX_WAITING));
maxBatch(readLong(v, MAX_BATCH));
maxBytes(readLong(v, MAX_BYTES));

Integer r = readInteger(v, NUM_REPLICAS);
if (r != null) {
if (r == 0) {
numReplicas = 0;
}
else {
numReplicas(r);
}
}

pauseUntil(readDate(v, PAUSE_UNTIL));

Duration idleHeartbeat = readNanos(v, IDLE_HEARTBEAT);
if (idleHeartbeat != null) {
if (readBoolean(v, FLOW_CONTROL, false)) {
flowControl(idleHeartbeat);
}
else {
idleHeartbeat(idleHeartbeat);
}
}

headersOnly(readBoolean(v, HEADERS_ONLY, null));
memStorage(readBoolean(v, MEM_STORAGE, null));

//noinspection DataFlowIssue readNanosList with false ensures not null;
backoff(readNanosList(v, BACKOFF, false).toArray(new Duration[0]));

metadata(readStringStringMap(v, METADATA));

String fs = emptyAsNull(readString(v, FILTER_SUBJECT));
if (fs == null) {
filterSubjects(readOptionalStringList(v, FILTER_SUBJECTS));
}
else {
filterSubject(fs);
}

return this;
}

/**
* Sets the description
* @param description the description
Expand Down Expand Up @@ -1245,7 +1279,7 @@ public Builder backoff(long... backoffsMillis) {
* @return Builder
*/
public Builder metadata(Map<String, String> metadata) {
this.metadata = metadata == null || metadata.size() == 0 ? null : new HashMap<>(metadata);
this.metadata = metadata == null || metadata.isEmpty() ? null : new HashMap<>(metadata);
return this;
}

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/nats/client/api/ConsumerInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public ConsumerInfo(Message msg) {

public ConsumerInfo(JsonValue vConsumerInfo) {
super(vConsumerInfo);
this.configuration = new ConsumerConfiguration(readObject(jv, CONFIG));
this.configuration = ConsumerConfiguration.builder().jsonValue(readObject(jv, CONFIG)).build();

stream = readString(jv, STREAM_NAME);
name = readString(jv, NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,21 @@

package io.nats.client.api;

import io.nats.client.support.*;

import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import static io.nats.client.support.ApiConstants.*;
import static io.nats.client.support.JsonUtils.beginJson;
import static io.nats.client.support.JsonUtils.endJson;
import static io.nats.client.support.JsonValueUtils.*;
import static io.nats.client.support.Validator.emptyAsNull;

public class OrderedConsumerConfiguration {
public class OrderedConsumerConfiguration implements JsonSerializable {

public static String DEFAULT_FILTER_SUBJECT = ">";

Expand All @@ -43,6 +49,44 @@ public OrderedConsumerConfiguration() {
filterSubjects.add(DEFAULT_FILTER_SUBJECT);
}

public OrderedConsumerConfiguration(String json) throws JsonParseException {
this(JsonParser.parse(json));
}

public OrderedConsumerConfiguration(JsonValue v) throws JsonParseException {
this();
List<String> list = readOptionalStringList(v, FILTER_SUBJECTS);
if (list != null) {
filterSubjects(list);
}
deliverPolicy(DeliverPolicy.get(readString(v, DELIVER_POLICY)));
startSequence(readLong(v, OPT_START_SEQ, ConsumerConfiguration.LONG_UNSET));
startTime(readDate(v, OPT_START_TIME));
replayPolicy(ReplayPolicy.get(readString(v, REPLAY_POLICY)));
headersOnly(readBoolean(v, HEADERS_ONLY, null));
}

/**
* Returns a JSON representation of this ordered consumer configuration.
* @return json ordered consumer configuration json string
*/
public String toJson() {
StringBuilder sb = beginJson();
if (filterSubjects != null && !filterSubjects.isEmpty()) {
JsonUtils.addStrings(sb, FILTER_SUBJECTS, filterSubjects);
}
if (deliverPolicy != null) {
JsonUtils.addField(sb, DELIVER_POLICY, deliverPolicy.toString());
}
JsonUtils.addFieldWhenGtZero(sb, OPT_START_SEQ, startSequence);
JsonUtils.addField(sb, OPT_START_TIME, startTime);
if (replayPolicy != null) {
JsonUtils.addField(sb, REPLAY_POLICY, replayPolicy.toString());
}
JsonUtils.addFldWhenTrue(sb, HEADERS_ONLY, headersOnly);
return endJson(sb).toString();
}

/**
* Sets the filter subject of the OrderedConsumerConfiguration.
* @param filterSubject the filter subject
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/io/nats/client/support/ApiConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public interface ApiConstants {
String ERROR = "error";
String ERRORS = "errors";
String EXPIRES = "expires";
String EXPIRES_IN = "expires_in";
String EXTERNAL = "external";
String FILTER = "filter";
String FILTER_SUBJECT = "filter_subject";
Expand Down Expand Up @@ -191,6 +192,7 @@ public interface ApiConstants {
String SUCCESS = "success";
String TAGS = "tags";
String TEMPLATE_OWNER = "template_owner";
String THRESHOLD_PERCENT = "threshold_percent";
String TIERS = "tiers";
String TIME = "time";
String TIMESTAMP = "ts";
Expand Down
Loading

0 comments on commit affe6be

Please sign in to comment.