Skip to content

Commit

Permalink
Add filtering to KV keys methods (#1173)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored Jul 6, 2024
1 parent b6e62f4 commit 197c04f
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 18 deletions.
44 changes: 43 additions & 1 deletion src/main/java/io/nats/client/KeyValue.java
Original file line number Diff line number Diff line change
Expand Up @@ -257,13 +257,55 @@ public interface KeyValue {
*/
List<String> keys() throws IOException, JetStreamApiException, InterruptedException;

/**
* Get a list of the keys in a bucket filtered by a
* subject-like string, for instance "key" or "key.foo.*" or "key.&gt;"
* @param filter the subject like key filter
* @return List of keys
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
* @throws InterruptedException if the thread is interrupted
*/
List<String> keys(String filter) throws IOException, JetStreamApiException, InterruptedException;

/**
* Get a list of the keys in a bucket filtered by
* subject-like strings, for instance "aaa.*", "bbb.*;"
* @param filters the subject like key filters
* @return List of keys
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
* @throws InterruptedException if the thread is interrupted
*/
List<String> keys(List<String> filters) throws IOException, JetStreamApiException, InterruptedException;

/**
* Get a list of keys in the bucket through a LinkedBlockingQueue.
* A KeyResult with isDone being true or an exception signifies there are no
* A KeyResult with isDone being true or an exception signifies there are no more keys
* @return the LinkedBlockingQueue from which to poll
*/
LinkedBlockingQueue<KeyResult> consumeKeys();

/**
* Get a list of keys in the bucket through a LinkedBlockingQueue filtered by a
* subject-like string, for instance "key" or "key.foo.*" or "key.&gt;"
* A KeyResult with isDone being true or an exception signifies there are no more keys
* @param filter the subject like key filter
* @return the LinkedBlockingQueue from which to poll
*/
LinkedBlockingQueue<KeyResult> consumeKeys(String filter);

/**
* Get a list of keys in the bucket through a LinkedBlockingQueue filtered by
* subject-like strings, for instance "aaa.*", "bbb.*;"
* A KeyResult with isDone being true or an exception signifies there are no more keys
* @param filters the subject like key filters
* @return the LinkedBlockingQueue from which to poll
*/
LinkedBlockingQueue<KeyResult> consumeKeys(List<String> filters);

/**
* Get the history (list of KeyValueEntry) for a key
* @param key the key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,11 @@
import static io.nats.client.support.JsonUtils.beginJson;
import static io.nats.client.support.JsonUtils.endJson;
import static io.nats.client.support.JsonValueUtils.*;
import static io.nats.client.support.NatsJetStreamConstants.DEFAULT_FILTER_SUBJECT;
import static io.nats.client.support.Validator.emptyAsNull;

public class OrderedConsumerConfiguration implements JsonSerializable {

public static String DEFAULT_FILTER_SUBJECT = ">";

private final List<String> filterSubjects;
private DeliverPolicy deliverPolicy;
private Long startSequence;
Expand Down
27 changes: 20 additions & 7 deletions src/main/java/io/nats/client/impl/NatsFeatureBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;

import static io.nats.client.support.NatsJetStreamConstants.JS_NO_MESSAGE_FOUND_ERR;

Expand Down Expand Up @@ -70,19 +72,30 @@ protected MessageInfo _getBySeq(long seq) throws IOException, JetStreamApiExcept
}

protected void visitSubject(String subject, DeliverPolicy deliverPolicy, boolean headersOnly, boolean ordered, MessageHandler handler) throws IOException, JetStreamApiException, InterruptedException {
visitSubject(Collections.singletonList(subject), deliverPolicy, headersOnly, ordered, handler);
}

protected void visitSubject(List<String> subjects, DeliverPolicy deliverPolicy, boolean headersOnly, boolean ordered, MessageHandler handler) throws IOException, JetStreamApiException, InterruptedException {
ConsumerConfiguration.Builder ccb = ConsumerConfiguration.builder()
.ackPolicy(AckPolicy.None)
.deliverPolicy(deliverPolicy)
.headersOnly(headersOnly);

if (subjects.size() == 1) {
ccb.filterSubject(subjects.get(0));
}
else {
ccb.filterSubjects(subjects);
}

PushSubscribeOptions pso = PushSubscribeOptions.builder()
.stream(streamName)
.ordered(ordered)
.configuration(
ConsumerConfiguration.builder()
.ackPolicy(AckPolicy.None)
.deliverPolicy(deliverPolicy)
.headersOnly(headersOnly)
.build())
.configuration(ccb.build())
.build();

Duration timeout = js.jso.getRequestTimeout();
JetStreamSubscription sub = js.subscribe(subject, pso);
JetStreamSubscription sub = js.subscribe(null, pso);
try {
boolean lastWasNull = false;
long pending = sub.getConsumerInfo().getCalculatedPending();
Expand Down
43 changes: 39 additions & 4 deletions src/main/java/io/nats/client/impl/NatsKeyValue.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@
import java.util.concurrent.LinkedBlockingQueue;

import static io.nats.client.support.NatsConstants.DOT;
import static io.nats.client.support.NatsJetStreamConstants.EXPECTED_LAST_SUB_SEQ_HDR;
import static io.nats.client.support.NatsJetStreamConstants.JS_WRONG_LAST_SEQUENCE;
import static io.nats.client.support.NatsJetStreamConstants.*;
import static io.nats.client.support.NatsKeyValueUtil.*;
import static io.nats.client.support.Validator.*;

Expand Down Expand Up @@ -280,8 +279,26 @@ public NatsKeyValueWatchSubscription watchAll(KeyValueWatcher watcher, long from
*/
@Override
public List<String> keys() throws IOException, JetStreamApiException, InterruptedException {
return _keys(Collections.singletonList(readSubject(DEFAULT_FILTER_SUBJECT)));
}

@Override
public List<String> keys(String filter) throws IOException, JetStreamApiException, InterruptedException {
return _keys(Collections.singletonList(readSubject(filter)));
}

@Override
public List<String> keys(List<String> filters) throws IOException, JetStreamApiException, InterruptedException {
List<String> readSubjectFilters = new ArrayList<>(filters.size());
for (String f : filters) {
readSubjectFilters.add(readSubject(f));
}
return _keys(readSubjectFilters);
}

private List<String> _keys(List<String> readSubjectFilters) throws IOException, JetStreamApiException, InterruptedException {
List<String> list = new ArrayList<>();
visitSubject(readSubject(">"), DeliverPolicy.LastPerSubject, true, false, m -> {
visitSubject(readSubjectFilters, DeliverPolicy.LastPerSubject, true, false, m -> {
KeyValueOperation op = getOperation(m.getHeaders());
if (op == KeyValueOperation.PUT) {
list.add(new BucketAndKey(m).key);
Expand All @@ -295,9 +312,27 @@ public List<String> keys() throws IOException, JetStreamApiException, Interrupte
*/
@Override
public LinkedBlockingQueue<KeyResult> consumeKeys() {
return _consumeKeys(Collections.singletonList(readSubject(DEFAULT_FILTER_SUBJECT)));
}

@Override
public LinkedBlockingQueue<KeyResult> consumeKeys(String filter) {
return _consumeKeys(Collections.singletonList(readSubject(filter)));
}

@Override
public LinkedBlockingQueue<KeyResult> consumeKeys(List<String> filters) {
List<String> readSubjectFilters = new ArrayList<>(filters.size());
for (String f : filters) {
readSubjectFilters.add(readSubject(f));
}
return _consumeKeys(readSubjectFilters);
}

private LinkedBlockingQueue<KeyResult> _consumeKeys(List<String> readSubjectFilters) {
LinkedBlockingQueue<KeyResult> q = new LinkedBlockingQueue<>();
try {
visitSubject(readSubject(">"), DeliverPolicy.LastPerSubject, true, false, m -> {
visitSubject(readSubjectFilters, DeliverPolicy.LastPerSubject, true, false, m -> {
KeyValueOperation op = getOperation(m.getHeaders());
if (op == KeyValueOperation.PUT) {
q.offer(new KeyResult(new BucketAndKey(m).key));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,4 +108,6 @@ public interface NatsJetStreamConstants {
int JS_CONSUMER_NOT_FOUND_ERR = 10014;
int JS_NO_MESSAGE_FOUND_ERR = 10037;
int JS_WRONG_LAST_SEQUENCE = 10071;

String DEFAULT_FILTER_SUBJECT = ">";
}
36 changes: 32 additions & 4 deletions src/test/java/io/nats/client/impl/KeyValueTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ public class KeyValueTests extends JetStreamTestBase {
public void testWorkflow() throws Exception {
long now = ZonedDateTime.now().toEpochSecond();

String byteKey = "byteKey" + variant();
String stringKey = "stringKey" + variant();
String longKey = "longKey" + variant();
String byteKey = "key.byte" + variant();
String stringKey = "key.string" + variant();
String longKey = "key.long" + variant();
String notFoundKey = "notFound" + variant();
String byteValue1 = "Byte Value 1";
String byteValue2 = "Byte Value 2";
Expand Down Expand Up @@ -202,9 +202,15 @@ public void testWorkflow() throws Exception {
status = kvm.getStatus(bucket);
assertState(status, 8, 9);

// should have exactly these 3 keys
assertKeys(kv.keys(), byteKey, stringKey, longKey);
assertKeys(kv.keys("key.>"), byteKey, stringKey, longKey);
assertKeys(kv.keys(byteKey), byteKey);
assertKeys(kv.keys(Arrays.asList(longKey, stringKey)), longKey, stringKey);

assertKeys(getKeysFromQueue(kv.consumeKeys()), byteKey, stringKey, longKey);
assertKeys(getKeysFromQueue(kv.consumeKeys("key.>")), byteKey, stringKey, longKey);
assertKeys(getKeysFromQueue(kv.consumeKeys(byteKey)), byteKey);
assertKeys(getKeysFromQueue(kv.consumeKeys(Arrays.asList(longKey, stringKey))), longKey, stringKey);

// purge
kv.purge(longKey);
Expand Down Expand Up @@ -1645,4 +1651,26 @@ public void testKeyValueTransform() throws Exception {
// assertNull(kv2.get(key2));
});
}


@Test
public void testSubjectFiltersAgainst209OptOut() throws Exception {
jsServer.run(nc -> {
KeyValueManagement kvm = nc.keyValueManagement();

String bucket = bucket();
kvm.create(KeyValueConfiguration.builder()
.name(bucket)
.storageType(StorageType.Memory)
.build());

JetStreamOptions jso = JetStreamOptions.builder().optOut290ConsumerCreate(true).build();
KeyValueOptions kvo = KeyValueOptions.builder().jetStreamOptions(jso).build();
KeyValue kv = nc.keyValue(bucket, kvo);
kv.put("one", 1);
kv.put("two", 2);
assertKeys(kv.keys(Arrays.asList("one", "two")), "one", "two");
});
}
}

0 comments on commit 197c04f

Please sign in to comment.