Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEAT] KvWatchEntry now reports if the entry is an update or not. #69

Merged
merged 4 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions kv/src/internal_mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ export type {
KvOptions,
KvPutOptions,
KvStatus,
KvWatchEntry,
KvWatchOptions,
RoKV,
} from "./types.ts";
Expand Down
62 changes: 23 additions & 39 deletions kv/src/kv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import {
compare,
deferred,
Empty,
ErrorCode,
Feature,
Expand Down Expand Up @@ -81,6 +80,7 @@ import type {
KvPutOptions,
KvRemove,
KvStatus,
KvWatchEntry,
KvWatchOptions,
} from "./types.ts";

Expand Down Expand Up @@ -555,9 +555,9 @@ export class Bucket implements KV, KvRemove {
return new KvStoredEntryImpl(this.bucket, this.prefixLen, sm);
}

jmToEntry(jm: JsMsg): KvEntry {
jmToWatchEntry(jm: JsMsg, isUpdate: boolean): KvWatchEntry {
const key = this.decodeKey(jm.subject.substring(this.prefixLen));
return new KvJsMsgEntryImpl(this.bucket, key, jm);
return new KvJsMsgEntryImpl(this.bucket, key, jm, isUpdate);
}

async create(k: string, data: Payload): Promise<number> {
Expand Down Expand Up @@ -667,22 +667,17 @@ export class Bucket implements KV, KvRemove {
async purgeDeletes(
olderMillis: number = 30 * 60 * 1000,
): Promise<PurgeResponse> {
const done = deferred();
const buf: KvEntry[] = [];
const i = await this.watch({
const i = await this.history({
key: ">",
initializedFn: () => {
done.resolve();
},
});
(async () => {
await (async () => {
for await (const e of i) {
if (e.operation === "DEL" || e.operation === "PURGE") {
buf.push(e);
}
}
})().then();
await done;
i.stop();
const min = Date.now() - olderMillis;
const proms = buf.map((e) => {
Expand Down Expand Up @@ -783,12 +778,12 @@ export class Bucket implements KV, KvRemove {

async history(
opts: { key?: string | string[]; headers_only?: boolean } = {},
): Promise<QueuedIterator<KvEntry>> {
): Promise<QueuedIterator<KvWatchEntry>> {
const k = opts.key ?? ">";
const co = {} as ConsumerConfig;
co.headers_only = opts.headers_only || false;

const qi = new QueuedIteratorImpl<KvEntry>();
const qi = new QueuedIteratorImpl<KvWatchEntry>();
const fn = () => {
qi.stop();
};
Expand All @@ -797,15 +792,14 @@ export class Bucket implements KV, KvRemove {
const oc = await this.js.consumers.getPushConsumer(this.stream, cc);
qi._data = oc;
const info = await oc.info(true);

if (info.num_pending === 0) {
qi.push(fn);
return qi;
}

const iter = await oc.consume({
callback: (m) => {
const e = this.jmToEntry(m);
const e = this.jmToWatchEntry(m, false);
qi.push(e);
qi.received++;
if (m.info.pending === 0) {
Expand Down Expand Up @@ -837,9 +831,9 @@ export class Bucket implements KV, KvRemove {

async watch(
opts: KvWatchOptions = {},
): Promise<QueuedIterator<KvEntry>> {
): Promise<QueuedIterator<KvWatchEntry>> {
const k = opts.key ?? ">";
const qi = new QueuedIteratorImpl<KvEntry>();
const qi = new QueuedIteratorImpl<KvWatchEntry>();
const co = {} as Partial<ConsumerConfig>;
co.headers_only = opts.headers_only || false;

Expand All @@ -851,8 +845,6 @@ export class Bucket implements KV, KvRemove {
}
const ignoreDeletes = opts.ignoreDeletes === true;

let fn = opts.initializedFn;

const cc = this._buildCC(k, content, co);
cc.name = `KV_WATCHER_${nuid.next()}`;
if (opts.resumeFromRevision && opts.resumeFromRevision > 0) {
Expand All @@ -863,34 +855,20 @@ export class Bucket implements KV, KvRemove {
const oc = await this.js.consumers.getPushConsumer(this.stream, cc);
const info = await oc.info(true);
const count = info.num_pending;
if (count === 0 && fn) {
try {
fn();
} catch (_err) {
// ignoring
} finally {
fn = undefined;
}
}
let isUpdate = content === KvWatchInclude.UpdatesOnly || count === 0;

qi._data = oc;
const iter = await oc.consume({
callback: (m) => {
const e = this.jmToEntry(m);
if (!isUpdate) {
isUpdate = qi.received >= count;
}
const e = this.jmToWatchEntry(m, isUpdate);
if (ignoreDeletes && e.operation === "DEL") {
return;
}
qi.push(e);
qi.received++;

// count could have changed or has already been received
if (
fn && (count > 0 && qi.received >= count || m.info.pending === 0)
) {
//@ts-ignore: we are injecting an unexpected type
qi.push(fn);
fn = undefined;
}
},
});

Expand Down Expand Up @@ -1106,15 +1084,17 @@ class KvStoredEntryImpl implements KvEntry {
}
}

class KvJsMsgEntryImpl implements KvEntry {
class KvJsMsgEntryImpl implements KvEntry, KvWatchEntry {
bucket: string;
key: string;
sm: JsMsg;
update: boolean;

constructor(bucket: string, key: string, sm: JsMsg) {
constructor(bucket: string, key: string, sm: JsMsg, isUpdate: boolean) {
this.bucket = bucket;
this.key = key;
this.sm = sm;
this.update = isUpdate;
}

get value(): Uint8Array {
Expand Down Expand Up @@ -1145,6 +1125,10 @@ class KvJsMsgEntryImpl implements KvEntry {
return this.sm.data.length;
}

get isUpdate(): boolean {
return this.update;
}

json<T>(): T {
return this.sm.json();
}
Expand Down
1 change: 1 addition & 0 deletions kv/src/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ export type {
KvOptions,
KvPutOptions,
KvStatus,
KvWatchEntry,
KvWatchOptions,
RoKV,
} from "./internal_mod.ts";
Expand Down
15 changes: 8 additions & 7 deletions kv/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ export interface KvEntry {
string(): string;
}

export interface KvWatchEntry extends KvEntry {
isUpdate: boolean;
}

/**
* An interface for encoding and decoding values
* before they are stored or returned to the client.
Expand Down Expand Up @@ -232,11 +236,6 @@ export type KvWatchOptions = {
* Notification should only include entry headers
*/
headers_only?: boolean;
/**
* A callback that notifies when the watch has yielded all the initial values.
* Subsequent notifications are updates since the initial watch was established.
*/
initializedFn?: () => void;
/**
* Skips notifying deletes.
* @default: false
Expand Down Expand Up @@ -270,15 +269,17 @@ export interface RoKV {
* Note you can specify multiple keys if running on server 2.10.x or better.
* @param opts
*/
history(opts?: { key?: string | string[] }): Promise<QueuedIterator<KvEntry>>;
history(
opts?: { key?: string | string[] },
): Promise<QueuedIterator<KvWatchEntry>>;

/**
* Returns an iterator that will yield KvEntry updates as they happen.
* @param opts
*/
watch(
opts?: KvWatchOptions,
): Promise<QueuedIterator<KvEntry>>;
): Promise<QueuedIterator<KvWatchEntry>>;

/**
* @deprecated - this api is removed.
Expand Down
Loading