From 5635f512173475f66c195198e693d5fe2efad708 Mon Sep 17 00:00:00 2001 From: scottf Date: Tue, 11 Jun 2024 15:12:00 -0400 Subject: [PATCH] Serialization changed fixed javadocs --- .../io/nats/client/BaseConsumeOptions.java | 22 ++++-- src/main/java/io/nats/client/KeyValue.java | 8 ++ .../client/api/ConsumerConfiguration.java | 78 ++++++++++--------- .../java/io/nats/client/api/KeyResult.java | 54 +++++++++++++ .../io/nats/client/impl/NatsKeyValue.java | 26 +++++++ .../io/nats/client/impl/KeyValueTests.java | 56 +++++++++++++ 6 files changed, 201 insertions(+), 43 deletions(-) create mode 100644 src/main/java/io/nats/client/api/KeyResult.java diff --git a/src/main/java/io/nats/client/BaseConsumeOptions.java b/src/main/java/io/nats/client/BaseConsumeOptions.java index 638cb8b7b..58a54000a 100644 --- a/src/main/java/io/nats/client/BaseConsumeOptions.java +++ b/src/main/java/io/nats/client/BaseConsumeOptions.java @@ -114,19 +114,27 @@ protected B noWait() { return getThis(); } + /** + * Initialize values from the json string. + * @param json the json string to parse + * @return the builder + * @throws JsonParseException if the json is invalid + */ public B json(String json) throws JsonParseException { return jsonValue(JsonParser.parse(json)); } /** - * Construct the builder and initialize values from the JsonValue object. + * Initialize values from the JsonValue object. + * @param jsonValue the json value object + * @return the builder */ - public B jsonValue(JsonValue v) { - messages(readInteger(v, MESSAGES, -1)); - bytes(readLong(v, BYTES, -1)); - expiresIn(readLong(v, EXPIRES_IN, MIN_EXPIRES_MILLS)); - thresholdPercent(readInteger(v, THRESHOLD_PERCENT, -1)); - if (readBoolean(v, NO_WAIT, false)) { + public B jsonValue(JsonValue jsonValue) { + messages(readInteger(jsonValue, MESSAGES, -1)); + bytes(readLong(jsonValue, BYTES, -1)); + expiresIn(readLong(jsonValue, EXPIRES_IN, MIN_EXPIRES_MILLS)); + thresholdPercent(readInteger(jsonValue, THRESHOLD_PERCENT, -1)); + if (readBoolean(jsonValue, NO_WAIT, false)) { noWait(); } return getThis(); diff --git a/src/main/java/io/nats/client/KeyValue.java b/src/main/java/io/nats/client/KeyValue.java index 28bd71aba..9ed12aae8 100644 --- a/src/main/java/io/nats/client/KeyValue.java +++ b/src/main/java/io/nats/client/KeyValue.java @@ -17,6 +17,7 @@ import java.io.IOException; import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; /** * Key Value Store Management context for creation and access to key value buckets. @@ -256,6 +257,13 @@ public interface KeyValue { */ List keys() throws IOException, JetStreamApiException, InterruptedException; + /** + * Get a list of keys in the bucket through a LinkedBlockingQueue. + * A KeyResult with isDone being true or an exception signifies there are no + * @return the LinkedBlockingQueue from which to poll + */ + LinkedBlockingQueue consumeKeys(); + /** * Get the history (list of KeyValueEntry) for a key * @param key the key diff --git a/src/main/java/io/nats/client/api/ConsumerConfiguration.java b/src/main/java/io/nats/client/api/ConsumerConfiguration.java index 03f4210fb..ab3c753a0 100644 --- a/src/main/java/io/nats/client/api/ConsumerConfiguration.java +++ b/src/main/java/io/nats/client/api/ConsumerConfiguration.java @@ -646,6 +646,7 @@ public Builder() {} /** * Construct the builder and initialize values with the existing ConsumerConfiguration + * @param cc the consumer configuration to clone */ public Builder(ConsumerConfiguration cc) { if (cc != null) { @@ -692,40 +693,45 @@ public Builder(ConsumerConfiguration cc) { } /** - * Construct the builder and initialize values from the json string. + * Initialize values from the json string. + * @param json the json string to parse + * @return the builder + * @throws JsonParseException if the json is invalid */ public Builder json(String json) throws JsonParseException { return jsonValue(JsonParser.parse(json)); } /** - * Construct the builder and initialize values from the JsonValue object. + * Initialize values from the JsonValue object. + * @param jsonValue the json value object + * @return the builder */ - public Builder jsonValue(JsonValue v) { - deliverPolicy(DeliverPolicy.get(readString(v, DELIVER_POLICY))); - ackPolicy(AckPolicy.get(readString(v, ACK_POLICY))); - replayPolicy(ReplayPolicy.get(readString(v, REPLAY_POLICY))); - - description(readString(v, DESCRIPTION)); - durable(readString(v, DURABLE_NAME)); - name(readString(v, NAME)); - deliverSubject(readString(v, DELIVER_SUBJECT)); - deliverGroup(readString(v, DELIVER_GROUP)); - sampleFrequency(readString(v, SAMPLE_FREQ)); - startTime(readDate(v, OPT_START_TIME)); - ackWait(readNanos(v, ACK_WAIT)); - maxExpires(readNanos(v, MAX_EXPIRES)); - inactiveThreshold(readNanos(v, INACTIVE_THRESHOLD)); - - startSequence(readLong(v, OPT_START_SEQ)); - maxDeliver(readLong(v, MAX_DELIVER, INTEGER_UNSET)); - rateLimit(readLong(v, RATE_LIMIT_BPS)); - maxAckPending(readLong(v, MAX_ACK_PENDING)); - maxPullWaiting(readLong(v, MAX_WAITING)); - maxBatch(readLong(v, MAX_BATCH)); - maxBytes(readLong(v, MAX_BYTES)); - - Integer r = readInteger(v, NUM_REPLICAS); + public Builder jsonValue(JsonValue jsonValue) { + deliverPolicy(DeliverPolicy.get(readString(jsonValue, DELIVER_POLICY))); + ackPolicy(AckPolicy.get(readString(jsonValue, ACK_POLICY))); + replayPolicy(ReplayPolicy.get(readString(jsonValue, REPLAY_POLICY))); + + description(readString(jsonValue, DESCRIPTION)); + durable(readString(jsonValue, DURABLE_NAME)); + name(readString(jsonValue, NAME)); + deliverSubject(readString(jsonValue, DELIVER_SUBJECT)); + deliverGroup(readString(jsonValue, DELIVER_GROUP)); + sampleFrequency(readString(jsonValue, SAMPLE_FREQ)); + startTime(readDate(jsonValue, OPT_START_TIME)); + ackWait(readNanos(jsonValue, ACK_WAIT)); + maxExpires(readNanos(jsonValue, MAX_EXPIRES)); + inactiveThreshold(readNanos(jsonValue, INACTIVE_THRESHOLD)); + + startSequence(readLong(jsonValue, OPT_START_SEQ)); + maxDeliver(readLong(jsonValue, MAX_DELIVER, INTEGER_UNSET)); + rateLimit(readLong(jsonValue, RATE_LIMIT_BPS)); + maxAckPending(readLong(jsonValue, MAX_ACK_PENDING)); + maxPullWaiting(readLong(jsonValue, MAX_WAITING)); + maxBatch(readLong(jsonValue, MAX_BATCH)); + maxBytes(readLong(jsonValue, MAX_BYTES)); + + Integer r = readInteger(jsonValue, NUM_REPLICAS); if (r != null) { if (r == 0) { numReplicas = 0; @@ -735,11 +741,11 @@ public Builder jsonValue(JsonValue v) { } } - pauseUntil(readDate(v, PAUSE_UNTIL)); + pauseUntil(readDate(jsonValue, PAUSE_UNTIL)); - Duration idleHeartbeat = readNanos(v, IDLE_HEARTBEAT); + Duration idleHeartbeat = readNanos(jsonValue, IDLE_HEARTBEAT); if (idleHeartbeat != null) { - if (readBoolean(v, FLOW_CONTROL, false)) { + if (readBoolean(jsonValue, FLOW_CONTROL, false)) { flowControl(idleHeartbeat); } else { @@ -747,17 +753,17 @@ public Builder jsonValue(JsonValue v) { } } - headersOnly(readBoolean(v, HEADERS_ONLY, null)); - memStorage(readBoolean(v, MEM_STORAGE, null)); + headersOnly(readBoolean(jsonValue, HEADERS_ONLY, null)); + memStorage(readBoolean(jsonValue, MEM_STORAGE, null)); //noinspection DataFlowIssue readNanosList with false ensures not null; - backoff(readNanosList(v, BACKOFF, false).toArray(new Duration[0])); + backoff(readNanosList(jsonValue, BACKOFF, false).toArray(new Duration[0])); - metadata(readStringStringMap(v, METADATA)); + metadata(readStringStringMap(jsonValue, METADATA)); - String fs = emptyAsNull(readString(v, FILTER_SUBJECT)); + String fs = emptyAsNull(readString(jsonValue, FILTER_SUBJECT)); if (fs == null) { - filterSubjects(readOptionalStringList(v, FILTER_SUBJECTS)); + filterSubjects(readOptionalStringList(jsonValue, FILTER_SUBJECTS)); } else { filterSubject(fs); diff --git a/src/main/java/io/nats/client/api/KeyResult.java b/src/main/java/io/nats/client/api/KeyResult.java new file mode 100644 index 000000000..2a3daf061 --- /dev/null +++ b/src/main/java/io/nats/client/api/KeyResult.java @@ -0,0 +1,54 @@ +// Copyright 2021 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package io.nats.client.api; + +public class KeyResult { + + private final String key; + private final Exception e; + + public KeyResult() { + this.key = null; + this.e = null; + } + + public KeyResult(String key) { + this.key = key; + this.e = null; + } + + public KeyResult(Exception e) { + this.key = null; + this.e = e; + } + + public String getKey() { + return key; + } + + public Exception getException() { + return e; + } + + public boolean isKey() { + return key != null; + } + + public boolean isException() { + return e != null; + } + + public boolean isDone() { + return key == null; + } +} diff --git a/src/main/java/io/nats/client/impl/NatsKeyValue.java b/src/main/java/io/nats/client/impl/NatsKeyValue.java index 763d13635..ed6d201bd 100644 --- a/src/main/java/io/nats/client/impl/NatsKeyValue.java +++ b/src/main/java/io/nats/client/impl/NatsKeyValue.java @@ -27,6 +27,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; import static io.nats.client.support.NatsConstants.DOT; import static io.nats.client.support.NatsJetStreamConstants.EXPECTED_LAST_SUB_SEQ_HDR; @@ -289,6 +290,31 @@ public List keys() throws IOException, JetStreamApiException, Interrupte return list; } + /** + * {@inheritDoc} + */ + @Override + public LinkedBlockingQueue consumeKeys() { + LinkedBlockingQueue q = new LinkedBlockingQueue<>(); + try { + visitSubject(readSubject(">"), DeliverPolicy.LastPerSubject, true, false, m -> { + KeyValueOperation op = getOperation(m.getHeaders()); + if (op == KeyValueOperation.PUT) { + q.offer(new KeyResult(new BucketAndKey(m).key)); + } + }); + q.offer(new KeyResult()); + } + catch (IOException | JetStreamApiException e) { + q.offer(new KeyResult(e)); + } + catch (InterruptedException e) { + q.offer(new KeyResult(e)); + Thread.currentThread().interrupt(); + } + return q; + } + /** * {@inheritDoc} */ diff --git a/src/test/java/io/nats/client/impl/KeyValueTests.java b/src/test/java/io/nats/client/impl/KeyValueTests.java index 51d8828f7..7107c38f0 100644 --- a/src/test/java/io/nats/client/impl/KeyValueTests.java +++ b/src/test/java/io/nats/client/impl/KeyValueTests.java @@ -23,6 +23,8 @@ import java.time.ZoneId; import java.time.ZonedDateTime; import java.util.*; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import static io.nats.client.JetStreamOptions.DEFAULT_JS_OPTIONS; import static io.nats.client.api.KeyValuePurgeOptions.DEFAULT_THRESHOLD_MILLIS; @@ -202,6 +204,7 @@ public void testWorkflow() throws Exception { // should have exactly these 3 keys assertKeys(kv.keys(), byteKey, stringKey, longKey); + assertKeys(getKeysFromQueue(kv.consumeKeys()), byteKey, stringKey, longKey); // purge kv.purge(longKey); @@ -215,6 +218,7 @@ public void testWorkflow() throws Exception { // only 2 keys now assertKeys(kv.keys(), byteKey, stringKey); + assertKeys(getKeysFromQueue(kv.consumeKeys()), byteKey, stringKey); kv.purge(byteKey); byteHistory.clear(); @@ -227,6 +231,7 @@ public void testWorkflow() throws Exception { // only 1 key now assertKeys(kv.keys(), stringKey); + assertKeys(getKeysFromQueue(kv.consumeKeys()), stringKey); kv.purge(stringKey); stringHistory.clear(); @@ -239,6 +244,7 @@ public void testWorkflow() throws Exception { // no more keys left assertKeys(kv.keys()); + assertKeys(getKeysFromQueue(kv.consumeKeys())); // clear things KeyValuePurgeOptions kvpo = KeyValuePurgeOptions.builder().deleteMarkersNoThreshold().build(); @@ -387,6 +393,15 @@ public void testKeys() throws Exception { List keys = kv.keys(); assertEquals(10, keys.size()); + for (int x = 1; x <= 10; x++) { + assertTrue(keys.contains("k" + x)); + } + + keys = getKeysFromQueue(kv.consumeKeys()); + assertEquals(10, keys.size()); + for (int x = 1; x <= 10; x++) { + assertTrue(keys.contains("k" + x)); + } kv.delete("k1"); kv.delete("k3"); @@ -396,6 +411,8 @@ public void testKeys() throws Exception { keys = kv.keys(); assertEquals(5, keys.size()); + keys = getKeysFromQueue(kv.consumeKeys()); + assertEquals(5, keys.size()); for (int x = 2; x <= 10; x += 2) { assertTrue(keys.contains("k" + x)); @@ -405,9 +422,48 @@ public void testKeys() throws Exception { kv.put(keyWithDot, "key has dot"); KeyValueEntry kve = kv.get(keyWithDot); assertEquals(keyWithDot, kve.getKey()); + + for (int x = 1; x <= 500; x++) { + kv.put("x" + x, x); + } + + keys = kv.keys(); + assertEquals(506, keys.size()); // 506 because there are left over keys from other part of test + for (int x = 1; x <= 500; x++) { + assertTrue(keys.contains("x" + x)); + } + + keys = getKeysFromQueue(kv.consumeKeys()); + assertEquals(506, keys.size()); + for (int x = 1; x <= 500; x++) { + assertTrue(keys.contains("x" + x)); + } }); } + private static List getKeysFromQueue(LinkedBlockingQueue q) { + List keys = new ArrayList<>(); + try { + boolean notDone = true; + do { + KeyResult r = q.poll(100, TimeUnit.SECONDS); + if (r != null) { + if (r.isDone()) { + notDone = false; + } + else { + keys.add(r.getKey()); + } + } + } + while (notDone); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return keys; + } + @Test public void testMaxHistoryPerKey() throws Exception { jsServer.run(nc -> {