Skip to content

Commit

Permalink
Serialization changed fixed javadocs (#1159)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored Jun 11, 2024
1 parent affe6be commit cb123c8
Show file tree
Hide file tree
Showing 6 changed files with 201 additions and 43 deletions.
22 changes: 15 additions & 7 deletions src/main/java/io/nats/client/BaseConsumeOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,19 +114,27 @@ protected B noWait() {
return getThis();
}

/**
* Initialize values from the json string.
* @param json the json string to parse
* @return the builder
* @throws JsonParseException if the json is invalid
*/
public B json(String json) throws JsonParseException {
return jsonValue(JsonParser.parse(json));
}

/**
* Construct the builder and initialize values from the JsonValue object.
* Initialize values from the JsonValue object.
* @param jsonValue the json value object
* @return the builder
*/
public B jsonValue(JsonValue v) {
messages(readInteger(v, MESSAGES, -1));
bytes(readLong(v, BYTES, -1));
expiresIn(readLong(v, EXPIRES_IN, MIN_EXPIRES_MILLS));
thresholdPercent(readInteger(v, THRESHOLD_PERCENT, -1));
if (readBoolean(v, NO_WAIT, false)) {
public B jsonValue(JsonValue jsonValue) {
messages(readInteger(jsonValue, MESSAGES, -1));
bytes(readLong(jsonValue, BYTES, -1));
expiresIn(readLong(jsonValue, EXPIRES_IN, MIN_EXPIRES_MILLS));
thresholdPercent(readInteger(jsonValue, THRESHOLD_PERCENT, -1));
if (readBoolean(jsonValue, NO_WAIT, false)) {
noWait();
}
return getThis();
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/io/nats/client/KeyValue.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.io.IOException;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

/**
* Key Value Store Management context for creation and access to key value buckets.
Expand Down Expand Up @@ -256,6 +257,13 @@ public interface KeyValue {
*/
List<String> keys() throws IOException, JetStreamApiException, InterruptedException;

/**
* Get a list of keys in the bucket through a LinkedBlockingQueue.
* A KeyResult with isDone being true or an exception signifies there are no
* @return the LinkedBlockingQueue from which to poll
*/
LinkedBlockingQueue<KeyResult> consumeKeys();

/**
* Get the history (list of KeyValueEntry) for a key
* @param key the key
Expand Down
78 changes: 42 additions & 36 deletions src/main/java/io/nats/client/api/ConsumerConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,7 @@ public Builder() {}

/**
* Construct the builder and initialize values with the existing ConsumerConfiguration
* @param cc the consumer configuration to clone
*/
public Builder(ConsumerConfiguration cc) {
if (cc != null) {
Expand Down Expand Up @@ -692,40 +693,45 @@ public Builder(ConsumerConfiguration cc) {
}

/**
* Construct the builder and initialize values from the json string.
* Initialize values from the json string.
* @param json the json string to parse
* @return the builder
* @throws JsonParseException if the json is invalid
*/
public Builder json(String json) throws JsonParseException {
return jsonValue(JsonParser.parse(json));
}

/**
* Construct the builder and initialize values from the JsonValue object.
* Initialize values from the JsonValue object.
* @param jsonValue the json value object
* @return the builder
*/
public Builder jsonValue(JsonValue v) {
deliverPolicy(DeliverPolicy.get(readString(v, DELIVER_POLICY)));
ackPolicy(AckPolicy.get(readString(v, ACK_POLICY)));
replayPolicy(ReplayPolicy.get(readString(v, REPLAY_POLICY)));

description(readString(v, DESCRIPTION));
durable(readString(v, DURABLE_NAME));
name(readString(v, NAME));
deliverSubject(readString(v, DELIVER_SUBJECT));
deliverGroup(readString(v, DELIVER_GROUP));
sampleFrequency(readString(v, SAMPLE_FREQ));
startTime(readDate(v, OPT_START_TIME));
ackWait(readNanos(v, ACK_WAIT));
maxExpires(readNanos(v, MAX_EXPIRES));
inactiveThreshold(readNanos(v, INACTIVE_THRESHOLD));

startSequence(readLong(v, OPT_START_SEQ));
maxDeliver(readLong(v, MAX_DELIVER, INTEGER_UNSET));
rateLimit(readLong(v, RATE_LIMIT_BPS));
maxAckPending(readLong(v, MAX_ACK_PENDING));
maxPullWaiting(readLong(v, MAX_WAITING));
maxBatch(readLong(v, MAX_BATCH));
maxBytes(readLong(v, MAX_BYTES));

Integer r = readInteger(v, NUM_REPLICAS);
public Builder jsonValue(JsonValue jsonValue) {
deliverPolicy(DeliverPolicy.get(readString(jsonValue, DELIVER_POLICY)));
ackPolicy(AckPolicy.get(readString(jsonValue, ACK_POLICY)));
replayPolicy(ReplayPolicy.get(readString(jsonValue, REPLAY_POLICY)));

description(readString(jsonValue, DESCRIPTION));
durable(readString(jsonValue, DURABLE_NAME));
name(readString(jsonValue, NAME));
deliverSubject(readString(jsonValue, DELIVER_SUBJECT));
deliverGroup(readString(jsonValue, DELIVER_GROUP));
sampleFrequency(readString(jsonValue, SAMPLE_FREQ));
startTime(readDate(jsonValue, OPT_START_TIME));
ackWait(readNanos(jsonValue, ACK_WAIT));
maxExpires(readNanos(jsonValue, MAX_EXPIRES));
inactiveThreshold(readNanos(jsonValue, INACTIVE_THRESHOLD));

startSequence(readLong(jsonValue, OPT_START_SEQ));
maxDeliver(readLong(jsonValue, MAX_DELIVER, INTEGER_UNSET));
rateLimit(readLong(jsonValue, RATE_LIMIT_BPS));
maxAckPending(readLong(jsonValue, MAX_ACK_PENDING));
maxPullWaiting(readLong(jsonValue, MAX_WAITING));
maxBatch(readLong(jsonValue, MAX_BATCH));
maxBytes(readLong(jsonValue, MAX_BYTES));

Integer r = readInteger(jsonValue, NUM_REPLICAS);
if (r != null) {
if (r == 0) {
numReplicas = 0;
Expand All @@ -735,29 +741,29 @@ public Builder jsonValue(JsonValue v) {
}
}

pauseUntil(readDate(v, PAUSE_UNTIL));
pauseUntil(readDate(jsonValue, PAUSE_UNTIL));

Duration idleHeartbeat = readNanos(v, IDLE_HEARTBEAT);
Duration idleHeartbeat = readNanos(jsonValue, IDLE_HEARTBEAT);
if (idleHeartbeat != null) {
if (readBoolean(v, FLOW_CONTROL, false)) {
if (readBoolean(jsonValue, FLOW_CONTROL, false)) {
flowControl(idleHeartbeat);
}
else {
idleHeartbeat(idleHeartbeat);
}
}

headersOnly(readBoolean(v, HEADERS_ONLY, null));
memStorage(readBoolean(v, MEM_STORAGE, null));
headersOnly(readBoolean(jsonValue, HEADERS_ONLY, null));
memStorage(readBoolean(jsonValue, MEM_STORAGE, null));

//noinspection DataFlowIssue readNanosList with false ensures not null;
backoff(readNanosList(v, BACKOFF, false).toArray(new Duration[0]));
backoff(readNanosList(jsonValue, BACKOFF, false).toArray(new Duration[0]));

metadata(readStringStringMap(v, METADATA));
metadata(readStringStringMap(jsonValue, METADATA));

String fs = emptyAsNull(readString(v, FILTER_SUBJECT));
String fs = emptyAsNull(readString(jsonValue, FILTER_SUBJECT));
if (fs == null) {
filterSubjects(readOptionalStringList(v, FILTER_SUBJECTS));
filterSubjects(readOptionalStringList(jsonValue, FILTER_SUBJECTS));
}
else {
filterSubject(fs);
Expand Down
54 changes: 54 additions & 0 deletions src/main/java/io/nats/client/api/KeyResult.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright 2021 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package io.nats.client.api;

public class KeyResult {

private final String key;
private final Exception e;

public KeyResult() {
this.key = null;
this.e = null;
}

public KeyResult(String key) {
this.key = key;
this.e = null;
}

public KeyResult(Exception e) {
this.key = null;
this.e = e;
}

public String getKey() {
return key;
}

public Exception getException() {
return e;
}

public boolean isKey() {
return key != null;
}

public boolean isException() {
return e != null;
}

public boolean isDone() {
return key == null;
}
}
26 changes: 26 additions & 0 deletions src/main/java/io/nats/client/impl/NatsKeyValue.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

import static io.nats.client.support.NatsConstants.DOT;
import static io.nats.client.support.NatsJetStreamConstants.EXPECTED_LAST_SUB_SEQ_HDR;
Expand Down Expand Up @@ -289,6 +290,31 @@ public List<String> keys() throws IOException, JetStreamApiException, Interrupte
return list;
}

/**
* {@inheritDoc}
*/
@Override
public LinkedBlockingQueue<KeyResult> consumeKeys() {
LinkedBlockingQueue<KeyResult> q = new LinkedBlockingQueue<>();
try {
visitSubject(readSubject(">"), DeliverPolicy.LastPerSubject, true, false, m -> {
KeyValueOperation op = getOperation(m.getHeaders());
if (op == KeyValueOperation.PUT) {
q.offer(new KeyResult(new BucketAndKey(m).key));
}
});
q.offer(new KeyResult());
}
catch (IOException | JetStreamApiException e) {
q.offer(new KeyResult(e));
}
catch (InterruptedException e) {
q.offer(new KeyResult(e));
Thread.currentThread().interrupt();
}
return q;
}

/**
* {@inheritDoc}
*/
Expand Down
56 changes: 56 additions & 0 deletions src/test/java/io/nats/client/impl/KeyValueTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.*;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import static io.nats.client.JetStreamOptions.DEFAULT_JS_OPTIONS;
import static io.nats.client.api.KeyValuePurgeOptions.DEFAULT_THRESHOLD_MILLIS;
Expand Down Expand Up @@ -202,6 +204,7 @@ public void testWorkflow() throws Exception {

// should have exactly these 3 keys
assertKeys(kv.keys(), byteKey, stringKey, longKey);
assertKeys(getKeysFromQueue(kv.consumeKeys()), byteKey, stringKey, longKey);

// purge
kv.purge(longKey);
Expand All @@ -215,6 +218,7 @@ public void testWorkflow() throws Exception {

// only 2 keys now
assertKeys(kv.keys(), byteKey, stringKey);
assertKeys(getKeysFromQueue(kv.consumeKeys()), byteKey, stringKey);

kv.purge(byteKey);
byteHistory.clear();
Expand All @@ -227,6 +231,7 @@ public void testWorkflow() throws Exception {

// only 1 key now
assertKeys(kv.keys(), stringKey);
assertKeys(getKeysFromQueue(kv.consumeKeys()), stringKey);

kv.purge(stringKey);
stringHistory.clear();
Expand All @@ -239,6 +244,7 @@ public void testWorkflow() throws Exception {

// no more keys left
assertKeys(kv.keys());
assertKeys(getKeysFromQueue(kv.consumeKeys()));

// clear things
KeyValuePurgeOptions kvpo = KeyValuePurgeOptions.builder().deleteMarkersNoThreshold().build();
Expand Down Expand Up @@ -387,6 +393,15 @@ public void testKeys() throws Exception {

List<String> keys = kv.keys();
assertEquals(10, keys.size());
for (int x = 1; x <= 10; x++) {
assertTrue(keys.contains("k" + x));
}

keys = getKeysFromQueue(kv.consumeKeys());
assertEquals(10, keys.size());
for (int x = 1; x <= 10; x++) {
assertTrue(keys.contains("k" + x));
}

kv.delete("k1");
kv.delete("k3");
Expand All @@ -396,6 +411,8 @@ public void testKeys() throws Exception {

keys = kv.keys();
assertEquals(5, keys.size());
keys = getKeysFromQueue(kv.consumeKeys());
assertEquals(5, keys.size());

for (int x = 2; x <= 10; x += 2) {
assertTrue(keys.contains("k" + x));
Expand All @@ -405,9 +422,48 @@ public void testKeys() throws Exception {
kv.put(keyWithDot, "key has dot");
KeyValueEntry kve = kv.get(keyWithDot);
assertEquals(keyWithDot, kve.getKey());

for (int x = 1; x <= 500; x++) {
kv.put("x" + x, x);
}

keys = kv.keys();
assertEquals(506, keys.size()); // 506 because there are left over keys from other part of test
for (int x = 1; x <= 500; x++) {
assertTrue(keys.contains("x" + x));
}

keys = getKeysFromQueue(kv.consumeKeys());
assertEquals(506, keys.size());
for (int x = 1; x <= 500; x++) {
assertTrue(keys.contains("x" + x));
}
});
}

private static List<String> getKeysFromQueue(LinkedBlockingQueue<KeyResult> q) {
List<String> keys = new ArrayList<>();
try {
boolean notDone = true;
do {
KeyResult r = q.poll(100, TimeUnit.SECONDS);
if (r != null) {
if (r.isDone()) {
notDone = false;
}
else {
keys.add(r.getKey());
}
}
}
while (notDone);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return keys;
}

@Test
public void testMaxHistoryPerKey() throws Exception {
jsServer.run(nc -> {
Expand Down

0 comments on commit cb123c8

Please sign in to comment.