From 5a357ef128bd65efd7419eea8b9035b2405b4f99 Mon Sep 17 00:00:00 2001 From: Alberto Ricart Date: Tue, 15 Oct 2024 18:06:10 -0500 Subject: [PATCH] Refactor object and key-value store watch methods Update the `watch` methods in ObjectStore and KeyValue classes to use `ObjectWatchInfo` and `KvWatchEntry` types. Removed use of null signals, replaced with property `isUpdate` to indicate new entries. Added relevant tests to validate the changes. Signed-off-by: Alberto Ricart --- TODO.md | 2 + kv/src/kv.ts | 4 +- kv/tests/kv_test.ts | 31 ++++++++++++++ migration.md | 10 +++++ obj/src/objectstore.ts | 76 +++++++++++++++++++---------------- obj/src/types.ts | 6 ++- obj/tests/objectstore_test.ts | 42 +++++++++++++++++++ 7 files changed, 134 insertions(+), 37 deletions(-) diff --git a/TODO.md b/TODO.md index 99031196..554f1b93 100644 --- a/TODO.md +++ b/TODO.md @@ -1,3 +1,5 @@ # TODO ## BUGS + +- ObjectStore should report the number of entries in it. diff --git a/kv/src/kv.ts b/kv/src/kv.ts index 9f3d94b1..4f1839d2 100644 --- a/kv/src/kv.ts +++ b/kv/src/kv.ts @@ -858,10 +858,12 @@ export class Bucket implements KV, KvRemove { let isUpdate = content === KvWatchInclude.UpdatesOnly || count === 0; qi._data = oc; + let i = 0; const iter = await oc.consume({ callback: (m) => { if (!isUpdate) { - isUpdate = qi.received >= count; + i++; + isUpdate = i >= count; } const e = this.jmToWatchEntry(m, isUpdate); if (ignoreDeletes && e.operation === "DEL") { diff --git a/kv/tests/kv_test.ts b/kv/tests/kv_test.ts index 8d0625d3..4e2a92d4 100644 --- a/kv/tests/kv_test.ts +++ b/kv/tests/kv_test.ts @@ -2161,3 +2161,34 @@ Deno.test("kv - sourced", async () => { await cleanup(ns, nc); }); + +Deno.test("kv - watch isUpdate", async () => { + const { ns, nc } = await _setup( + connect, + jetstreamServerConf({}), + ); + if (await notCompatible(ns, nc, "2.6.3")) { + return; + } + + const js = jetstream(nc); + const kvm = await new Kvm(js); + const kv = await kvm.create("A"); + await kv.put("a", "hello"); + await kv.delete("a"); + + const iter = await kv.watch({ ignoreDeletes: true }); + const done = (async () => { + for await (const e of iter) { + if (e.key === "b") { + assertEquals(e.isUpdate, true); + break; + } + } + })(); + await kv.put("b", "hello"); + + await done; + + await cleanup(ns, nc); +}); diff --git a/migration.md b/migration.md index 404ff81e..9c3dcb74 100644 --- a/migration.md +++ b/migration.md @@ -168,3 +168,13 @@ const service = await svc.add({ // other manipulation as per service api... ``` + +### Watch + +Object.watch() now returns an `ObjectWatchInfo` which is an `ObjectInfo` but adding the property +`isUpdate` this property is now true when the watch is notifying of a new entry. Note that previously +the iterator would yield `ObjectInfo | null`, the `null` signal has been removed. This means that +when doing a watch on an empty ObjectStore you won't get an update notification until an actual value +arrives. + + diff --git a/obj/src/objectstore.ts b/obj/src/objectstore.ts index bfaaf749..f3d97729 100644 --- a/obj/src/objectstore.ts +++ b/obj/src/objectstore.ts @@ -13,6 +13,12 @@ * limitations under the License. */ +import type { + MsgHdrs, + NatsConnection, + NatsError, + QueuedIterator, +} from "@nats-io/nats-core/internal"; import { Base64UrlPaddedCodec, DataBuffer, @@ -26,23 +32,6 @@ import { SHA256, } from "@nats-io/nats-core/internal"; -import type { - MsgHdrs, - NatsConnection, - NatsError, - QueuedIterator, -} from "@nats-io/nats-core/internal"; - -import { - DeliverPolicy, - DiscardPolicy, - JsHeaders, - ListerImpl, - PubHeaders, - StoreCompression, - toJetStreamClient, -} from "@nats-io/jetstream/internal"; - import type { ConsumerConfig, JetStreamClient, @@ -59,6 +48,15 @@ import type { StreamInfoRequestOptions, StreamListResponse, } from "@nats-io/jetstream/internal"; +import { + DeliverPolicy, + DiscardPolicy, + JsHeaders, + ListerImpl, + PubHeaders, + StoreCompression, + toJetStreamClient, +} from "@nats-io/jetstream/internal"; import type { ObjectInfo, @@ -69,6 +67,7 @@ import type { ObjectStoreOptions, ObjectStorePutOpts, ObjectStoreStatus, + ObjectWatchInfo, } from "./types.ts"; export const osPrefix = "OBJ_"; @@ -340,13 +339,12 @@ export class ObjectStoreImpl implements ObjectStore { const iter = await this.watch({ ignoreDeletes: true, includeHistory: true, + //@ts-ignore: hidden + historyOnly: true, }); + + // historyOnly will stop the iterator for await (const info of iter) { - // watch will give a null when it has initialized - // for us that is the hint we are done - if (info === null) { - break; - } buf.push(info); } return Promise.resolve(buf); @@ -803,19 +801,17 @@ export class ObjectStoreImpl implements ObjectStore { ignoreDeletes?: boolean; includeHistory?: boolean; } - > = {}): Promise> { + > = {}): Promise> { opts.includeHistory = opts.includeHistory ?? false; opts.ignoreDeletes = opts.ignoreDeletes ?? false; - let initialized = false; - const qi = new QueuedIteratorImpl(); + // @ts-ignore: not exposed + const historyOnly = opts.historyOnly ?? false; + const qi = new QueuedIteratorImpl(); const subj = this._metaSubjectAll(); try { await this.jsm.streams.getMessage(this.stream, { last_by_subj: subj }); } catch (err) { - if ((err as NatsError).code === "404") { - qi.push(null); - initialized = true; - } else { + if ((err as NatsError).code !== "404") { qi.stop(err as Error); } } @@ -827,28 +823,38 @@ export class ObjectStoreImpl implements ObjectStore { } else { // FIXME: Go's implementation doesn't seem correct - if history is not desired // the watch should only be giving notifications on new entries - initialized = true; cc.deliver_policy = DeliverPolicy.New; } const oc = await this.js.consumers.getPushConsumer(this.stream, cc); + const info = await oc.info(true); + const count = info.num_pending; + let isUpdate = cc.deliver_policy === DeliverPolicy.New || count === 0; qi._data = oc; - + let i = 0; const iter = await oc.consume({ callback: (jm: JsMsg) => { - const oi = jm.json(); + if (!isUpdate) { + i++; + isUpdate = i >= count; + } + const oi = jm.json(); + oi.isUpdate = isUpdate; if (oi.deleted && opts.ignoreDeletes === true) { // do nothing } else { qi.push(oi); } - if (jm.info?.pending === 0 && !initialized) { - initialized = true; - qi.push(null); + if (historyOnly && i === count) { + iter.stop(); } }, }); + if (historyOnly && count === 0) { + iter.stop(); + } + iter.closed().then(() => { qi.push(() => { qi.stop(); diff --git a/obj/src/types.ts b/obj/src/types.ts index 4002d8c3..440e99f6 100644 --- a/obj/src/types.ts +++ b/obj/src/types.ts @@ -56,6 +56,10 @@ export type ObjectStoreMeta = { metadata?: Record; }; +export interface ObjectWatchInfo extends ObjectInfo { + isUpdate: boolean; +} + export interface ObjectInfo extends ObjectStoreMeta { /** * The name of the bucket where the object is stored. @@ -316,7 +320,7 @@ export interface ObjectStore { includeHistory?: boolean; } >, - ): Promise>; + ): Promise>; /** * Seals the object store preventing any further modifications. diff --git a/obj/tests/objectstore_test.ts b/obj/tests/objectstore_test.ts index d89980e2..bd00750f 100644 --- a/obj/tests/objectstore_test.ts +++ b/obj/tests/objectstore_test.ts @@ -414,6 +414,48 @@ Deno.test("objectstore - list", async () => { await cleanup(ns, nc); }); +Deno.test("objectstore - list no updates", async () => { + const { ns, nc } = await _setup(connect, jetstreamServerConf({})); + if (await notCompatible(ns, nc, "2.6.3")) { + return; + } + const objm = new Objm(nc); + const os = await objm.create("test"); + + let infos = await os.list(); + assertEquals(infos.length, 0); + + await os.put({ name: "a" }, readableStreamFrom(new Uint8Array(0))); + infos = await os.list(); + assertEquals(infos.length, 1); + + await cleanup(ns, nc); +}); + +Deno.test("objectstore - watch isUpdate", async () => { + const { ns, nc } = await _setup(connect, jetstreamServerConf({})); + if (await notCompatible(ns, nc, "2.6.3")) { + return; + } + const objm = new Objm(nc); + const os = await objm.create("test"); + await os.put({ name: "a" }, readableStreamFrom(new Uint8Array(0))); + + const watches = await os.watch(); + await os.put({ name: "b" }, readableStreamFrom(new Uint8Array(0))); + + for await (const e of watches) { + if (e.name === "b") { + assertEquals(e.isUpdate, true); + break; + } else { + assertEquals(e.isUpdate, false); + } + } + + await cleanup(ns, nc); +}); + Deno.test("objectstore - watch initially empty", async () => { const { ns, nc } = await _setup(connect, jetstreamServerConf({})); if (await notCompatible(ns, nc, "2.6.3")) {