Skip to content

Commit

Permalink
Multiple Filter Subjects Review (#984)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored Sep 29, 2023
1 parent f60a689 commit e3e5bad
Show file tree
Hide file tree
Showing 18 changed files with 360 additions and 298 deletions.
97 changes: 49 additions & 48 deletions README.md

Large diffs are not rendered by default.

30 changes: 21 additions & 9 deletions src/main/java/io/nats/client/SubscribeOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,25 @@ protected SubscribeOptions(Builder builder, boolean isPull,

stream = validateStreamName(builder.stream, bind); // required when bind mode

// read the names and do basic validation
String temp = validateConsumerName(
validateMustMatchIfBothSupplied(builder.name, builder.cc == null ? null : builder.cc.getName(), JsSoNameMismatch),
false);
String durable = validateDurable(
validateMustMatchIfBothSupplied(builder.durable, builder.cc == null ? null : builder.cc.getDurable(), JsSoDurableMismatch),
false);
// read the consumer names and do basic validation
// A1. validate name input
String temp = validateMustMatchIfBothSupplied(
builder.name,
builder.cc == null ? null : builder.cc.getName(),
JsSoNameMismatch);
// B1. Must be a valid consumer name if supplied
temp = validateConsumerName(temp, false);

// A2. validate durable input
String durable = validateMustMatchIfBothSupplied(
builder.durable,
builder.cc == null ? null : builder.cc.getDurable(),
JsSoDurableMismatch);

// B2. Must be a valid consumer name if supplied
durable = validateDurable(durable, false);

// C. name must match durable if both supplied
name = validateMustMatchIfBothSupplied(temp, durable, JsConsumerNameDurableMismatch);

if (bind && name == null) {
Expand Down Expand Up @@ -105,7 +117,7 @@ protected SubscribeOptions(Builder builder, boolean isPull,
.ackPolicy(AckPolicy.None)
.maxDeliver(1)
.ackWait(Duration.ofHours(22))
.name(temp)
.name(name)
.memStorage(true)
.numReplicas(1);

Expand All @@ -119,7 +131,7 @@ protected SubscribeOptions(Builder builder, boolean isPull,
.durable(durable)
.deliverSubject(deliverSubject)
.deliverGroup(deliverGroup)
.name(temp)
.name(name)
.build();
}
}
Expand Down
107 changes: 50 additions & 57 deletions src/main/java/io/nats/client/api/ConsumerConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ public class ConsumerConfiguration implements JsonSerializable {
protected final String name;
protected final String deliverSubject;
protected final String deliverGroup;
protected final List<String> filterSubjects;
protected final String sampleFrequency;
protected final ZonedDateTime startTime;
protected final Duration ackWait;
Expand All @@ -86,6 +85,7 @@ public class ConsumerConfiguration implements JsonSerializable {
protected final Boolean memStorage;
protected final List<Duration> backoff;
protected final Map<String, String> metadata;
protected final List<String> filterSubjects;

protected ConsumerConfiguration(ConsumerConfiguration cc) {
this.deliverPolicy = cc.deliverPolicy;
Expand All @@ -96,7 +96,6 @@ protected ConsumerConfiguration(ConsumerConfiguration cc) {
this.name = cc.name;
this.deliverSubject = cc.deliverSubject;
this.deliverGroup = cc.deliverGroup;
this.filterSubjects = cc.filterSubjects;
this.sampleFrequency = cc.sampleFrequency;
this.startTime = cc.startTime;
this.ackWait = cc.ackWait;
Expand All @@ -116,6 +115,7 @@ protected ConsumerConfiguration(ConsumerConfiguration cc) {
this.memStorage = cc.memStorage;
this.backoff = cc.backoff == null ? null : new ArrayList<>(cc.backoff);
this.metadata = cc.metadata == null ? null : new HashMap<>(cc.metadata);
this.filterSubjects = cc.filterSubjects == null ? null : new ArrayList<>(cc.filterSubjects);
}

ConsumerConfiguration(JsonValue v) {
Expand All @@ -128,15 +128,7 @@ protected ConsumerConfiguration(ConsumerConfiguration cc) {
name = readString(v, NAME);
deliverSubject = readString(v, DELIVER_SUBJECT);
deliverGroup = readString(v, DELIVER_GROUP);
String tempFs = readString(v, FILTER_SUBJECT);
if (tempFs != null) {
filterSubjects = Collections.singletonList(tempFs);
}
else {
filterSubjects = readStringList(v, FILTER_SUBJECTS);
}
sampleFrequency = readString(v, SAMPLE_FREQ);

startTime = readDate(v, OPT_START_TIME);
ackWait = readNanos(v, ACK_WAIT);
idleHeartbeat = readNanos(v, IDLE_HEARTBEAT);
Expand All @@ -158,6 +150,14 @@ protected ConsumerConfiguration(ConsumerConfiguration cc) {

backoff = readNanosList(v, BACKOFF, true);
metadata = readStringStringMap(v, METADATA);

String tempFs = emptyAsNull(readString(v, FILTER_SUBJECT));
if (tempFs == null) {
filterSubjects = readOptionalStringList(v, FILTER_SUBJECTS);
}
else {
filterSubjects = Collections.singletonList(tempFs);
}
}

// For the builder
Expand All @@ -172,7 +172,6 @@ protected ConsumerConfiguration(Builder b)
this.name = b.name;
this.startTime = b.startTime;
this.ackWait = b.ackWait;
this.filterSubjects = b.filterSubjects;
this.sampleFrequency = b.sampleFrequency;
this.deliverSubject = b.deliverSubject;
this.deliverGroup = b.deliverGroup;
Expand All @@ -195,6 +194,7 @@ protected ConsumerConfiguration(Builder b)

this.backoff = b.backoff;
this.metadata = b.metadata;
this.filterSubjects = b.filterSubjects;
}

/**
Expand All @@ -216,12 +216,6 @@ public String toJson() {
JsonUtils.addFieldAsNanos(sb, ACK_WAIT, ackWait);
JsonUtils.addFieldWhenGtZero(sb, MAX_DELIVER, maxDeliver);
JsonUtils.addField(sb, MAX_ACK_PENDING, maxAckPending);
if (filterSubjects.size() > 1) {
JsonUtils.addStrings(sb, FILTER_SUBJECTS, filterSubjects);
}
else if (filterSubjects.size() == 1) {
JsonUtils.addField(sb, FILTER_SUBJECT, filterSubjects.get(0));
}
JsonUtils.addField(sb, REPLAY_POLICY, GetOrDefault(replayPolicy).toString());
JsonUtils.addField(sb, SAMPLE_FREQ, sampleFrequency);
JsonUtils.addFieldWhenGtZero(sb, RATE_LIMIT_BPS, rateLimit);
Expand All @@ -237,6 +231,14 @@ else if (filterSubjects.size() == 1) {
JsonUtils.addField(sb, NUM_REPLICAS, numReplicas);
JsonUtils.addField(sb, MEM_STORAGE, memStorage);
JsonUtils.addField(sb, METADATA, metadata);
if (filterSubjects != null) {
if (filterSubjects.size() > 1) {
JsonUtils.addStrings(sb, FILTER_SUBJECTS, filterSubjects);
}
else if (filterSubjects.size() == 1) {
JsonUtils.addField(sb, FILTER_SUBJECT, filterSubjects.get(0));
}
}
return endJson(sb).toString();
}

Expand Down Expand Up @@ -329,21 +331,31 @@ public long getMaxDeliver() {
}

/**
* Gets the first filter subject of this consumer configuration. Will be null if there are none.
* Gets the filter subject of this consumer configuration.
* With the introduction of multiple filter subjects, this method will
* return null if there are not exactly one filter subjects
* @return the first filter subject.
*/
public String getFilterSubject() {
return filterSubjects.isEmpty() ? null : filterSubjects.get(0);
return filterSubjects == null || filterSubjects.size() != 1 ? null : filterSubjects.get(0);
}

/**
* Gets the filter subjects as a list. May be empty, but won't be null
* Gets the filter subjects as a list. May be null, otherwise won't be empty
* @return the list
*/
public List<String> getFilterSubjects() {
return filterSubjects;
}

/**
* Whether there are multiple filter subjects for this consumer configuration.
* @return true if there are multiple filter subjects
*/
public boolean hasMultipleFilterSubjects() {
return filterSubjects != null && filterSubjects.size() > 1;
}

/**
* Gets the replay policy of this consumer configuration.
* @return the replay policy.
Expand Down Expand Up @@ -635,7 +647,6 @@ public static class Builder {
private String name;
private String deliverSubject;
private String deliverGroup;
private List<String> filterSubjects = new ArrayList<>();
private String sampleFrequency;

private ZonedDateTime startTime;
Expand All @@ -659,6 +670,7 @@ public static class Builder {

private List<Duration> backoff;
private Map<String, String> metadata;
private List<String> filterSubjects;

public Builder() {}

Expand All @@ -673,7 +685,6 @@ public Builder(ConsumerConfiguration cc) {
this.name = cc.name;
this.deliverSubject = cc.deliverSubject;
this.deliverGroup = cc.deliverGroup;
this.filterSubjects = new ArrayList<>(cc.filterSubjects);
this.sampleFrequency = cc.sampleFrequency;

this.startTime = cc.startTime;
Expand Down Expand Up @@ -701,6 +712,9 @@ public Builder(ConsumerConfiguration cc) {
if (cc.metadata != null) {
this.metadata = new HashMap<>(cc.metadata);
}
if (cc.filterSubjects != null) {
this.filterSubjects = new ArrayList<>(cc.filterSubjects);
}
}
}

Expand Down Expand Up @@ -848,21 +862,25 @@ public Builder maxDeliver(long maxDeliver) {

/**
* Sets the filter subject of the ConsumerConfiguration.
* Replaces any other filter subjects set in the builder
* @param filterSubject the filter subject
* @return Builder
*/
public Builder filterSubject(String filterSubject) {
this.filterSubjects.clear();
if (!nullOrEmpty(filterSubject)) {
this.filterSubjects.add(filterSubject);
if (nullOrEmpty(filterSubject)) {
this.filterSubjects = null;
}
else {
this.filterSubjects = Collections.singletonList(filterSubject);
}
return this;
}


/**
* Sets the filter subjects of the ConsumerConfiguration.
* @param filterSubjects the array of filter subjects
* Replaces any other filter subjects set in the builder
* @param filterSubjects one or more filter subjects
* @return Builder
*/
public Builder filterSubjects(String... filterSubjects) {
Expand All @@ -871,18 +889,22 @@ public Builder filterSubjects(String... filterSubjects) {

/**
* Sets the filter subjects of the ConsumerConfiguration.
* Replaces any other filter subjects set in the builder
* @param filterSubjects the list of filter subjects
* @return Builder
*/
public Builder filterSubjects(List<String> filterSubjects) {
this.filterSubjects.clear();
this.filterSubjects = new ArrayList<>();
if (filterSubjects != null) {
for (String fs : filterSubjects) {
if (!nullOrEmpty(fs)) {
this.filterSubjects.add(fs);
}
}
}
if (this.filterSubjects.isEmpty()) {
this.filterSubjects = null;
}
return this;
}

Expand Down Expand Up @@ -1255,36 +1277,7 @@ public PullSubscribeOptions buildPullSubscribeOptions(String stream) {

@Override
public String toString() {
return "ConsumerConfiguration{" +
"description='" + description + '\'' +
", durable='" + durable + '\'' +
", name='" + name + '\'' +
", deliverPolicy=" + deliverPolicy +
", deliverSubject='" + deliverSubject + '\'' +
", deliverGroup='" + deliverGroup + '\'' +
", startSeq=" + startSeq +
", startTime=" + startTime +
", ackPolicy=" + ackPolicy +
", ackWait=" + ackWait +
", maxDeliver=" + maxDeliver +
", filterSubjects='" + String.join(",", filterSubjects) + '\'' +
", replayPolicy=" + replayPolicy +
", sampleFrequency='" + sampleFrequency + '\'' +
", rateLimit=" + rateLimit +
", maxAckPending=" + maxAckPending +
", idleHeartbeat=" + idleHeartbeat +
", flowControl=" + flowControl +
", maxPullWaiting=" + maxPullWaiting +
", maxBatch=" + maxBatch +
", maxBytes=" + maxBytes +
", maxExpires=" + maxExpires +
", numReplicas=" + numReplicas +
", headersOnly=" + headersOnly +
", memStorage=" + memStorage +
", inactiveThreshold=" + inactiveThreshold +
", backoff=" + backoff +
", metadata=" + metadata +
'}';
return "ConsumerConfiguration " + toJson();
}

protected static int getOrUnset(Integer val)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,13 +132,17 @@ public OrderedConsumerConfiguration headersOnly(Boolean headersOnly) {
}

public String getFilterSubject() {
return filterSubjects.get(0);
return filterSubjects == null || filterSubjects.size() != 1 ? null : filterSubjects.get(0);
}

public List<String> getFilterSubjects() {
return filterSubjects;
}

public boolean hasMultipleFilterSubjects() {
return filterSubjects != null && filterSubjects.size() > 1;
}

public DeliverPolicy getDeliverPolicy() {
return deliverPolicy;
}
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/nats/client/api/SourceBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import static io.nats.client.support.JsonUtils.beginJson;
import static io.nats.client.support.JsonUtils.endJson;
import static io.nats.client.support.JsonValueUtils.readValue;
import static io.nats.client.support.Validator.listsAreEquivalent;
import static io.nats.client.support.Validator.consumerFilterSubjectsAreEquivalent;

public abstract class SourceBase implements JsonSerializable {
private final String name;
Expand Down Expand Up @@ -194,7 +194,7 @@ public boolean equals(Object o) {
if (!Objects.equals(filterSubject, that.filterSubject))
return false;
if (!Objects.equals(external, that.external)) return false;
return listsAreEquivalent(subjectTransforms, that.subjectTransforms);
return consumerFilterSubjectsAreEquivalent(subjectTransforms, that.subjectTransforms);
}

@Override
Expand Down
Loading

0 comments on commit e3e5bad

Please sign in to comment.