diff --git a/kv/src/internal_mod.ts b/kv/src/internal_mod.ts index eca34b36..76ab353a 100644 --- a/kv/src/internal_mod.ts +++ b/kv/src/internal_mod.ts @@ -8,6 +8,7 @@ export type { KvOptions, KvPutOptions, KvStatus, + KvWatchEntry, KvWatchOptions, RoKV, } from "./types.ts"; diff --git a/kv/src/kv.ts b/kv/src/kv.ts index 68794092..9f3d94b1 100644 --- a/kv/src/kv.ts +++ b/kv/src/kv.ts @@ -15,7 +15,6 @@ import { compare, - deferred, Empty, ErrorCode, Feature, @@ -81,6 +80,7 @@ import type { KvPutOptions, KvRemove, KvStatus, + KvWatchEntry, KvWatchOptions, } from "./types.ts"; @@ -555,9 +555,9 @@ export class Bucket implements KV, KvRemove { return new KvStoredEntryImpl(this.bucket, this.prefixLen, sm); } - jmToEntry(jm: JsMsg): KvEntry { + jmToWatchEntry(jm: JsMsg, isUpdate: boolean): KvWatchEntry { const key = this.decodeKey(jm.subject.substring(this.prefixLen)); - return new KvJsMsgEntryImpl(this.bucket, key, jm); + return new KvJsMsgEntryImpl(this.bucket, key, jm, isUpdate); } async create(k: string, data: Payload): Promise { @@ -667,22 +667,17 @@ export class Bucket implements KV, KvRemove { async purgeDeletes( olderMillis: number = 30 * 60 * 1000, ): Promise { - const done = deferred(); const buf: KvEntry[] = []; - const i = await this.watch({ + const i = await this.history({ key: ">", - initializedFn: () => { - done.resolve(); - }, }); - (async () => { + await (async () => { for await (const e of i) { if (e.operation === "DEL" || e.operation === "PURGE") { buf.push(e); } } })().then(); - await done; i.stop(); const min = Date.now() - olderMillis; const proms = buf.map((e) => { @@ -783,12 +778,12 @@ export class Bucket implements KV, KvRemove { async history( opts: { key?: string | string[]; headers_only?: boolean } = {}, - ): Promise> { + ): Promise> { const k = opts.key ?? ">"; const co = {} as ConsumerConfig; co.headers_only = opts.headers_only || false; - const qi = new QueuedIteratorImpl(); + const qi = new QueuedIteratorImpl(); const fn = () => { qi.stop(); }; @@ -797,7 +792,6 @@ export class Bucket implements KV, KvRemove { const oc = await this.js.consumers.getPushConsumer(this.stream, cc); qi._data = oc; const info = await oc.info(true); - if (info.num_pending === 0) { qi.push(fn); return qi; @@ -805,7 +799,7 @@ export class Bucket implements KV, KvRemove { const iter = await oc.consume({ callback: (m) => { - const e = this.jmToEntry(m); + const e = this.jmToWatchEntry(m, false); qi.push(e); qi.received++; if (m.info.pending === 0) { @@ -837,9 +831,9 @@ export class Bucket implements KV, KvRemove { async watch( opts: KvWatchOptions = {}, - ): Promise> { + ): Promise> { const k = opts.key ?? ">"; - const qi = new QueuedIteratorImpl(); + const qi = new QueuedIteratorImpl(); const co = {} as Partial; co.headers_only = opts.headers_only || false; @@ -851,8 +845,6 @@ export class Bucket implements KV, KvRemove { } const ignoreDeletes = opts.ignoreDeletes === true; - let fn = opts.initializedFn; - const cc = this._buildCC(k, content, co); cc.name = `KV_WATCHER_${nuid.next()}`; if (opts.resumeFromRevision && opts.resumeFromRevision > 0) { @@ -863,34 +855,20 @@ export class Bucket implements KV, KvRemove { const oc = await this.js.consumers.getPushConsumer(this.stream, cc); const info = await oc.info(true); const count = info.num_pending; - if (count === 0 && fn) { - try { - fn(); - } catch (_err) { - // ignoring - } finally { - fn = undefined; - } - } + let isUpdate = content === KvWatchInclude.UpdatesOnly || count === 0; qi._data = oc; const iter = await oc.consume({ callback: (m) => { - const e = this.jmToEntry(m); + if (!isUpdate) { + isUpdate = qi.received >= count; + } + const e = this.jmToWatchEntry(m, isUpdate); if (ignoreDeletes && e.operation === "DEL") { return; } qi.push(e); qi.received++; - - // count could have changed or has already been received - if ( - fn && (count > 0 && qi.received >= count || m.info.pending === 0) - ) { - //@ts-ignore: we are injecting an unexpected type - qi.push(fn); - fn = undefined; - } }, }); @@ -1106,15 +1084,17 @@ class KvStoredEntryImpl implements KvEntry { } } -class KvJsMsgEntryImpl implements KvEntry { +class KvJsMsgEntryImpl implements KvEntry, KvWatchEntry { bucket: string; key: string; sm: JsMsg; + update: boolean; - constructor(bucket: string, key: string, sm: JsMsg) { + constructor(bucket: string, key: string, sm: JsMsg, isUpdate: boolean) { this.bucket = bucket; this.key = key; this.sm = sm; + this.update = isUpdate; } get value(): Uint8Array { @@ -1145,6 +1125,10 @@ class KvJsMsgEntryImpl implements KvEntry { return this.sm.data.length; } + get isUpdate(): boolean { + return this.update; + } + json(): T { return this.sm.json(); } diff --git a/kv/src/mod.ts b/kv/src/mod.ts index ec1e81f9..15f07525 100644 --- a/kv/src/mod.ts +++ b/kv/src/mod.ts @@ -8,6 +8,7 @@ export type { KvOptions, KvPutOptions, KvStatus, + KvWatchEntry, KvWatchOptions, RoKV, } from "./internal_mod.ts"; diff --git a/kv/src/types.ts b/kv/src/types.ts index 107ec915..d963c6f6 100644 --- a/kv/src/types.ts +++ b/kv/src/types.ts @@ -44,6 +44,10 @@ export interface KvEntry { string(): string; } +export interface KvWatchEntry extends KvEntry { + isUpdate: boolean; +} + /** * An interface for encoding and decoding values * before they are stored or returned to the client. @@ -232,11 +236,6 @@ export type KvWatchOptions = { * Notification should only include entry headers */ headers_only?: boolean; - /** - * A callback that notifies when the watch has yielded all the initial values. - * Subsequent notifications are updates since the initial watch was established. - */ - initializedFn?: () => void; /** * Skips notifying deletes. * @default: false @@ -270,7 +269,9 @@ export interface RoKV { * Note you can specify multiple keys if running on server 2.10.x or better. * @param opts */ - history(opts?: { key?: string | string[] }): Promise>; + history( + opts?: { key?: string | string[] }, + ): Promise>; /** * Returns an iterator that will yield KvEntry updates as they happen. @@ -278,7 +279,7 @@ export interface RoKV { */ watch( opts?: KvWatchOptions, - ): Promise>; + ): Promise>; /** * @deprecated - this api is removed. diff --git a/kv/tests/kv_test.ts b/kv/tests/kv_test.ts index 7fd6a16f..ac429fec 100644 --- a/kv/tests/kv_test.ts +++ b/kv/tests/kv_test.ts @@ -52,7 +52,7 @@ import { assertThrows, } from "jsr:@std/assert"; -import type { KV, KvEntry, KvOptions } from "../src/types.ts"; +import type { KV, KvEntry, KvOptions, KvWatchEntry } from "../src/types.ts"; import type { Bucket } from "../src/mod.ts"; @@ -382,30 +382,20 @@ Deno.test("kv - history and watch cleanup", async () => { await bucket.put("b", Empty); await bucket.put("c", Empty); - let h = await bucket.history(); + const h = await bucket.history(); for await (const _e of h) { // aborted break; } - h = await bucket.history(); - for await (const _e of h) { - // nothing - } - - let w = await bucket.watch({}); - for await (const _ of w) { - // aborted - break; - } - - w = await bucket.watch({ - initializedFn: () => { - w.stop(); - }, - }); - for await (const _ of w) { - // nothing + const w = await bucket.watch({}); + setTimeout(() => { + bucket.put("hello", "world"); + }, 250); + for await (const e of w) { + if (e.isUpdate) { + break; + } } // need to give some time for promises to be resolved @@ -1176,38 +1166,17 @@ Deno.test("kv - initialized watch empty", async () => { const js = jetstream(nc); const b = await new Kvm(js).create("a") as Bucket; - const d = deferred(); - await b.watch({ - initializedFn: () => { - d.resolve(); - }, - }); - - await d; - await cleanup(ns, nc); -}); - -Deno.test("kv - initialized watch with messages", async () => { - const { ns, nc } = await _setup(connect, jetstreamServerConf({})); - const js = jetstream(nc); - - const b = await new Kvm(js).create("a") as Bucket; - await b.put("A", Empty); - await b.put("B", Empty); - await b.put("C", Empty); - const d = deferred(); - const iter = await b.watch({ - initializedFn: () => { - d.resolve(); - }, - }); - - (async () => { + const iter = await b.watch(); + const done = (async () => { for await (const _e of iter) { - // ignore + // nothing } - })().then(); - await d; + })(); + + await delay(250); + assertEquals(0, iter.getReceived()); + iter.stop(); + await done; await cleanup(ns, nc); }); @@ -1220,28 +1189,28 @@ Deno.test("kv - initialized watch with modifications", async () => { await b.put("A", Empty); await b.put("B", Empty); await b.put("C", Empty); - const d = deferred(); + setTimeout(async () => { for (let i = 0; i < 100; i++) { await b.put(i.toString(), Empty); } }); - const iter = await b.watch({ - initializedFn: () => { - d.resolve(iter.getProcessed()); - }, - }); + const iter = await b.watch(); + + let history = 0; // we are expecting 103 const lock = Lock(103); (async () => { - for await (const _e of iter) { + for await (const e of iter) { + if (!e.isUpdate) { + history++; + } lock.unlock(); } })().then(); - const when = await d; // we don't really know when this happened - assert(103 > when); + assert(103 > history); await lock; //@ts-ignore: testing @@ -1253,35 +1222,6 @@ Deno.test("kv - initialized watch with modifications", async () => { await cleanup(ns, nc); }); -Deno.test("kv - watch init callback exceptions terminate the iterator", async () => { - const { ns, nc } = await _setup(connect, jetstreamServerConf({})); - const js = jetstream(nc); - - const b = await new Kvm(js).create("a") as Bucket; - for (let i = 0; i < 10; i++) { - await b.put(i.toString(), Empty); - } - const iter = await b.watch({ - initializedFn: () => { - throw new Error("crash"); - }, - }); - - const d = deferred(); - try { - await (async () => { - for await (const _e of iter) { - // awaiting the iterator - } - })(); - } catch (err) { - d.resolve(err as Error); - } - const err = await d; - assertEquals(err.message, "crash"); - await cleanup(ns, nc); -}); - Deno.test("kv - get revision", async () => { const { ns, nc } = await _setup(connect, jetstreamServerConf({})); const js = jetstream(nc); @@ -1746,26 +1686,25 @@ Deno.test("kv - watch updates only", async () => { await kv.put("a", "a"); await kv.put("b", "b"); - const d = deferred(); const iter = await kv.watch({ include: KvWatchInclude.UpdatesOnly, - initializedFn: () => { - d.resolve(); - }, }); - const notifications: string[] = []; - (async () => { + const notifications: KvWatchEntry[] = []; + const done = (async () => { for await (const e of iter) { - notifications.push(e.key); + notifications.push(e); + if (e.isUpdate) { + break; + } } - })().then(); - await d; + })(); await kv.put("c", "c"); - await delay(1000); + await done; assertEquals(notifications.length, 1); - assertEquals(notifications[0], "c"); + assertEquals(notifications[0].isUpdate, true); + assertEquals(notifications[0].key, "c"); await cleanup(ns, nc); }); @@ -1780,21 +1719,19 @@ Deno.test("kv - watch multiple keys", async () => { await kv.put("b", "b"); await kv.put("c", "c"); - const d = deferred(); const iter = await kv.watch({ key: ["a", "c"], - initializedFn: () => { - d.resolve(); - }, }); const notifications: string[] = []; - (async () => { + await (async () => { for await (const e of iter) { notifications.push(e.key); + if (e.delta === 0) { + break; + } } - })().then(); - await d; + })(); assertEquals(notifications.length, 2); assertArrayIncludes(notifications, ["a", "c"]); diff --git a/migration.md b/migration.md index 813e46e4..404ff81e 100644 --- a/migration.md +++ b/migration.md @@ -123,6 +123,17 @@ await kvm.create("mykv"); await kvm.open("mykv"); ``` +### KvWatchOption.initializedFn has been removed + +Previous versions of `Kv.watch()` allowed the client to specify a function that +was called when the watch was done providing history values. In this version, +you can find out if a watch is yielding an update by examining into +`KvEntry.isUpdate`. Note that an empty Kv will not yield any watch information. +You can test for this initial condition, by getting the status of the KV, and +inspecting the `values` property, which will state the number of entries in the +Kv. Also note that watches with the option to do updates only, cannot notify +until there's an update. + ## Changes to ObjectStore > [!CAUTION]