Skip to content

Commit

Permalink
Refactor object and key-value store watch methods
Browse files Browse the repository at this point in the history
Update the `watch` methods in ObjectStore and KeyValue classes to use `ObjectWatchInfo` and `KvWatchEntry` types. Removed use of null signals, replaced with property `isUpdate` to indicate new entries. Added relevant tests to validate the changes.

Signed-off-by: Alberto Ricart <[email protected]>
  • Loading branch information
aricart committed Oct 16, 2024
1 parent ddbc6bf commit 5a357ef
Show file tree
Hide file tree
Showing 7 changed files with 134 additions and 37 deletions.
2 changes: 2 additions & 0 deletions TODO.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# TODO

## BUGS

- ObjectStore should report the number of entries in it.
4 changes: 3 additions & 1 deletion kv/src/kv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -858,10 +858,12 @@ export class Bucket implements KV, KvRemove {
let isUpdate = content === KvWatchInclude.UpdatesOnly || count === 0;

qi._data = oc;
let i = 0;
const iter = await oc.consume({
callback: (m) => {
if (!isUpdate) {
isUpdate = qi.received >= count;
i++;
isUpdate = i >= count;
}
const e = this.jmToWatchEntry(m, isUpdate);
if (ignoreDeletes && e.operation === "DEL") {
Expand Down
31 changes: 31 additions & 0 deletions kv/tests/kv_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2161,3 +2161,34 @@ Deno.test("kv - sourced", async () => {

await cleanup(ns, nc);
});

Deno.test("kv - watch isUpdate", async () => {
const { ns, nc } = await _setup(
connect,
jetstreamServerConf({}),
);
if (await notCompatible(ns, nc, "2.6.3")) {
return;
}

const js = jetstream(nc);
const kvm = await new Kvm(js);
const kv = await kvm.create("A");
await kv.put("a", "hello");
await kv.delete("a");

const iter = await kv.watch({ ignoreDeletes: true });
const done = (async () => {
for await (const e of iter) {
if (e.key === "b") {
assertEquals(e.isUpdate, true);
break;
}
}
})();
await kv.put("b", "hello");

await done;

await cleanup(ns, nc);
});
10 changes: 10 additions & 0 deletions migration.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,13 @@ const service = await svc.add({
// other manipulation as per service api...
```
### Watch

Object.watch() now returns an `ObjectWatchInfo` which is an `ObjectInfo` but adding the property
`isUpdate` this property is now true when the watch is notifying of a new entry. Note that previously
the iterator would yield `ObjectInfo | null`, the `null` signal has been removed. This means that
when doing a watch on an empty ObjectStore you won't get an update notification until an actual value
arrives.


76 changes: 41 additions & 35 deletions obj/src/objectstore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@
* limitations under the License.
*/

import type {
MsgHdrs,
NatsConnection,
NatsError,
QueuedIterator,
} from "@nats-io/nats-core/internal";
import {
Base64UrlPaddedCodec,
DataBuffer,
Expand All @@ -26,23 +32,6 @@ import {
SHA256,
} from "@nats-io/nats-core/internal";

import type {
MsgHdrs,
NatsConnection,
NatsError,
QueuedIterator,
} from "@nats-io/nats-core/internal";

import {
DeliverPolicy,
DiscardPolicy,
JsHeaders,
ListerImpl,
PubHeaders,
StoreCompression,
toJetStreamClient,
} from "@nats-io/jetstream/internal";

import type {
ConsumerConfig,
JetStreamClient,
Expand All @@ -59,6 +48,15 @@ import type {
StreamInfoRequestOptions,
StreamListResponse,
} from "@nats-io/jetstream/internal";
import {
DeliverPolicy,
DiscardPolicy,
JsHeaders,
ListerImpl,
PubHeaders,
StoreCompression,
toJetStreamClient,
} from "@nats-io/jetstream/internal";

import type {
ObjectInfo,
Expand All @@ -69,6 +67,7 @@ import type {
ObjectStoreOptions,
ObjectStorePutOpts,
ObjectStoreStatus,
ObjectWatchInfo,
} from "./types.ts";

export const osPrefix = "OBJ_";
Expand Down Expand Up @@ -340,13 +339,12 @@ export class ObjectStoreImpl implements ObjectStore {
const iter = await this.watch({
ignoreDeletes: true,
includeHistory: true,
//@ts-ignore: hidden
historyOnly: true,
});

// historyOnly will stop the iterator
for await (const info of iter) {
// watch will give a null when it has initialized
// for us that is the hint we are done
if (info === null) {
break;
}
buf.push(info);
}
return Promise.resolve(buf);
Expand Down Expand Up @@ -803,19 +801,17 @@ export class ObjectStoreImpl implements ObjectStore {
ignoreDeletes?: boolean;
includeHistory?: boolean;
}
> = {}): Promise<QueuedIterator<ObjectInfo | null>> {
> = {}): Promise<QueuedIterator<ObjectWatchInfo>> {
opts.includeHistory = opts.includeHistory ?? false;
opts.ignoreDeletes = opts.ignoreDeletes ?? false;
let initialized = false;
const qi = new QueuedIteratorImpl<ObjectInfo | null>();
// @ts-ignore: not exposed
const historyOnly = opts.historyOnly ?? false;
const qi = new QueuedIteratorImpl<ObjectWatchInfo>();
const subj = this._metaSubjectAll();
try {
await this.jsm.streams.getMessage(this.stream, { last_by_subj: subj });
} catch (err) {
if ((err as NatsError).code === "404") {
qi.push(null);
initialized = true;
} else {
if ((err as NatsError).code !== "404") {
qi.stop(err as Error);
}
}
Expand All @@ -827,28 +823,38 @@ export class ObjectStoreImpl implements ObjectStore {
} else {
// FIXME: Go's implementation doesn't seem correct - if history is not desired
// the watch should only be giving notifications on new entries
initialized = true;
cc.deliver_policy = DeliverPolicy.New;
}

const oc = await this.js.consumers.getPushConsumer(this.stream, cc);
const info = await oc.info(true);
const count = info.num_pending;
let isUpdate = cc.deliver_policy === DeliverPolicy.New || count === 0;
qi._data = oc;

let i = 0;
const iter = await oc.consume({
callback: (jm: JsMsg) => {
const oi = jm.json<ObjectInfo>();
if (!isUpdate) {
i++;
isUpdate = i >= count;
}
const oi = jm.json<ObjectWatchInfo>();
oi.isUpdate = isUpdate;
if (oi.deleted && opts.ignoreDeletes === true) {
// do nothing
} else {
qi.push(oi);
}
if (jm.info?.pending === 0 && !initialized) {
initialized = true;
qi.push(null);
if (historyOnly && i === count) {
iter.stop();
}
},
});

if (historyOnly && count === 0) {
iter.stop();
}

iter.closed().then(() => {
qi.push(() => {
qi.stop();
Expand Down
6 changes: 5 additions & 1 deletion obj/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ export type ObjectStoreMeta = {
metadata?: Record<string, string>;
};

export interface ObjectWatchInfo extends ObjectInfo {
isUpdate: boolean;
}

export interface ObjectInfo extends ObjectStoreMeta {
/**
* The name of the bucket where the object is stored.
Expand Down Expand Up @@ -316,7 +320,7 @@ export interface ObjectStore {
includeHistory?: boolean;
}
>,
): Promise<QueuedIterator<ObjectInfo | null>>;
): Promise<QueuedIterator<ObjectWatchInfo>>;

/**
* Seals the object store preventing any further modifications.
Expand Down
42 changes: 42 additions & 0 deletions obj/tests/objectstore_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,48 @@ Deno.test("objectstore - list", async () => {
await cleanup(ns, nc);
});

Deno.test("objectstore - list no updates", async () => {
const { ns, nc } = await _setup(connect, jetstreamServerConf({}));
if (await notCompatible(ns, nc, "2.6.3")) {
return;
}
const objm = new Objm(nc);
const os = await objm.create("test");

let infos = await os.list();
assertEquals(infos.length, 0);

await os.put({ name: "a" }, readableStreamFrom(new Uint8Array(0)));
infos = await os.list();
assertEquals(infos.length, 1);

await cleanup(ns, nc);
});

Deno.test("objectstore - watch isUpdate", async () => {
const { ns, nc } = await _setup(connect, jetstreamServerConf({}));
if (await notCompatible(ns, nc, "2.6.3")) {
return;
}
const objm = new Objm(nc);
const os = await objm.create("test");
await os.put({ name: "a" }, readableStreamFrom(new Uint8Array(0)));

const watches = await os.watch();
await os.put({ name: "b" }, readableStreamFrom(new Uint8Array(0)));

for await (const e of watches) {
if (e.name === "b") {
assertEquals(e.isUpdate, true);
break;
} else {
assertEquals(e.isUpdate, false);
}
}

await cleanup(ns, nc);
});

Deno.test("objectstore - watch initially empty", async () => {
const { ns, nc } = await _setup(connect, jetstreamServerConf({}));
if (await notCompatible(ns, nc, "2.6.3")) {
Expand Down

0 comments on commit 5a357ef

Please sign in to comment.