diff --git a/jetstream/src/jsapi_types.ts b/jetstream/src/jsapi_types.ts index 0f8097be..2dfc9db5 100644 --- a/jetstream/src/jsapi_types.ts +++ b/jetstream/src/jsapi_types.ts @@ -493,16 +493,25 @@ export type NextMsgRequest = { */ next_by_subj: string; }; + export type DirectMsgRequest = | SeqMsgRequest | LastForMsgRequest - | NextMsgRequest; + | NextMsgRequest + | StartTimeMsgRequest; // FIXME: new options on the server + +export interface DirectBatchFilter { + multi_last: string[]; +} + export type DirectBatchOptions = { + multi_last: string[]; batch?: number; + + seq?: number; max_bytes?: number; - multi_last: string[]; up_to_seq?: number; up_to_time?: Date | string; }; @@ -803,6 +812,13 @@ export interface SeqMsgRequest { seq: number; } +export interface StartTimeMsgRequest { + /** + * Start time for the message + */ + start_time: Date | string; +} + // FIXME: remove number as it is deprecated export type MsgRequest = SeqMsgRequest | LastForMsgRequest | number; diff --git a/jetstream/src/jsclient.ts b/jetstream/src/jsclient.ts index af1a2345..0c57ac58 100644 --- a/jetstream/src/jsclient.ts +++ b/jetstream/src/jsclient.ts @@ -48,7 +48,7 @@ import type { ApiResponse, JetStreamAccountStats, } from "./jsapi_types.ts"; -import { DirectStreamAPIImpl } from "./jsm.ts"; +import { DirectStreamAPIImpl } from "./jsm_direct.ts"; import { JetStreamError } from "./jserrors.ts"; export function toJetStreamClient( diff --git a/jetstream/src/jsm.ts b/jetstream/src/jsm_direct.ts similarity index 100% rename from jetstream/src/jsm.ts rename to jetstream/src/jsm_direct.ts diff --git a/jetstream/tests/jsm_direct_test.ts b/jetstream/tests/jsm_direct_test.ts new file mode 100644 index 00000000..9610c4e3 --- /dev/null +++ b/jetstream/tests/jsm_direct_test.ts @@ -0,0 +1,385 @@ +/* + * Copyright 2024 Synadia Communications, Inc + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { + assertArrayIncludes, + assertEquals, + assertExists, + assertRejects, +} from "jsr:@std/assert"; + +import { deferred, nuid } from "@nats-io/nats-core"; + +import { + jetstream, + JetStreamApiError, + jetstreamManager, + StorageType, +} from "../src/mod.ts"; +import { + cleanup, + jetstreamServerConf, + notCompatible, + setup, +} from "test_helpers"; + +import type { JetStreamManagerImpl } from "../src/jsclient.ts"; +import type { DirectBatchOptions } from "../src/jsapi_types.ts"; + +Deno.test("direct - msg decode", async () => { + const { ns, nc } = await setup(jetstreamServerConf()); + const name = nuid.next(); + const jsm = await jetstreamManager(nc) as JetStreamManagerImpl; + const js = jetstream(nc); + await jsm.streams.add({ name, subjects: [`a.>`], allow_direct: true }); + + await js.publish("a.a", "hello"); + await js.publish("a.a", JSON.stringify({ one: "two", a: [1, 2, 3] })); + + let m = await jsm.direct.getMessage(name, { seq: 1 }); + assertExists(m); + assertEquals(m.string(), "hello"); + + m = await jsm.direct.getMessage(name, { seq: 2 }); + assertExists(m); + assertEquals(m.json(), { + one: "two", + a: [1, 2, 3], + }); + + await cleanup(ns, nc); +}); + +Deno.test("direct - getMessage", async () => { + const { ns, nc } = await setup(jetstreamServerConf({})); + + if (await notCompatible(ns, nc, "2.9.0")) { + return; + } + + const jsm = await jetstreamManager(nc) as JetStreamManagerImpl; + await jsm.streams.add({ + name: "A", + subjects: ["foo", "bar"], + allow_direct: true, + }); + + const js = jetstream(nc); + await js.publish("foo", "a", { expect: { lastSequence: 0 } }); + await js.publish("foo", "b", { expect: { lastSequence: 1 } }); + await js.publish("foo", "c", { expect: { lastSequence: 2 } }); + await js.publish("bar", "d", { expect: { lastSequence: 3 } }); + await js.publish("foo", "e", { expect: { lastSequence: 4 } }); + + let m = await jsm.direct.getMessage("A", { seq: 0, next_by_subj: "bar" }); + assertExists(m); + assertEquals(m.seq, 4); + + m = await jsm.direct.getMessage("A", { last_by_subj: "foo" }); + assertExists(m); + assertEquals(m.seq, 5); + + m = await jsm.direct.getMessage("A", { seq: 0, next_by_subj: "foo" }); + assertExists(m); + assertEquals(m.seq, 1); + + m = await jsm.direct.getMessage("A", { seq: 4, next_by_subj: "foo" }); + assertExists(m); + assertEquals(m.seq, 5); + + m = await jsm.direct.getMessage("A", { seq: 2, next_by_subj: "foo" }); + assertExists(m); + assertEquals(m.seq, 2); + + await cleanup(ns, nc); +}); + +Deno.test("direct - getMessage at start sequence", async () => { + const { ns, nc } = await setup(jetstreamServerConf({})); + if (await notCompatible(ns, nc, "2.9.0")) { + return; + } + + const jsm = await jetstreamManager(nc) as JetStreamManagerImpl; + await jsm.streams.add({ + name: "A", + subjects: ["foo", "bar"], + allow_direct: true, + }); + + const js = jetstream(nc); + await Promise.all([ + js.publish("foo", "a", { expect: { lastSequence: 0 } }), + js.publish("foo", "b", { expect: { lastSequence: 1 } }), + ]); + + let m = await jsm.direct.getMessage("A", { seq: 0, next_by_subj: "foo" }); + assertEquals(m.seq, 1); + + m = await jsm.direct.getMessage("A", { seq: 1, next_by_subj: "foo" }); + assertEquals(m.seq, 1); + + await cleanup(ns, nc); +}); + +Deno.test("direct - getMessage at next sequence", async () => { + const { ns, nc } = await setup(jetstreamServerConf({})); + if (await notCompatible(ns, nc, "2.9.0")) { + return; + } + + const jsm = await jetstreamManager(nc) as JetStreamManagerImpl; + await jsm.streams.add({ + name: "A", + subjects: ["foo", "bar"], + allow_direct: true, + }); + + const js = jetstream(nc); + await Promise.all([ + js.publish("foo", "a"), + js.publish("bar", "x"), + js.publish("foo", "b"), + ]); + + const m = await jsm.direct.getMessage("A", { seq: 2, next_by_subj: "foo" }); + assertEquals(m.seq, 3); + + await assertRejects( + () => { + return jsm.direct.getMessage("A", { seq: 3, next_by_subj: "bar" }); + }, + JetStreamApiError, + "message not found", + ); + + await cleanup(ns, nc); +}); + +Deno.test("direct - getMessage starting time", async () => { + const { ns, nc } = await setup(jetstreamServerConf({})); + if (await notCompatible(ns, nc, "2.9.0")) { + return; + } + const jsm = await jetstreamManager(nc) as JetStreamManagerImpl; + await jsm.streams.add({ + name: "A", + subjects: ["a.>"], + storage: StorageType.Memory, + allow_direct: true, + }); + + const js = jetstream(nc); + const d = deferred(); + let i = 0; + const timer = setInterval(async () => { + await js.publish(`a.${i}`, new Uint8Array(128)); + i++; + if (i === 8) { + clearInterval(timer); + d.resolve(); + } + }, 250); + + await d; + + async function getDateFor(seq: number): Promise { + const m = await jsm.direct.getMessage("A", { seq: seq }); + return m.time; + } + + const start_time = new Date((await getDateFor(2)).getTime() + 10); + + const m = await jsm.direct.getMessage("A", { start_time }); + assertEquals(m.seq, 3); + + await cleanup(ns, nc); +}); + +Deno.test("direct - get batch", async () => { + const { ns, nc } = await setup(jetstreamServerConf()); + if (await notCompatible(ns, nc, "2.11.0")) { + return; + } + const jsm = await jetstreamManager(nc) as JetStreamManagerImpl; + await jsm.streams.add({ + name: "A", + subjects: ["a.>"], + storage: StorageType.Memory, + allow_direct: true, + }); + + const js = jetstream(nc); + await Promise.all([ + js.publish("a.foo", "foo"), + js.publish("a.bar", "bar"), + js.publish("a.baz", "baz"), + js.publish("a.foobar", "foobar"), + ]); + + const iter = await jsm.direct.getBatch("A", { + batch: 3, + multi_last: [">"], + }); + + const buf = []; + for await (const m of iter) { + buf.push(m); + } + assertEquals(buf.length, 3); + + await cleanup(ns, nc); +}); + +Deno.test("direct - batch get multi_last", async () => { + const { ns, nc } = await setup(jetstreamServerConf()); + if (await notCompatible(ns, nc, "2.11.0")) { + return; + } + const jsm = await jetstreamManager(nc) as JetStreamManagerImpl; + await jsm.streams.add({ + name: "A", + subjects: ["a.>"], + storage: StorageType.Memory, + allow_direct: true, + }); + + const js = jetstream(nc); + await Promise.all([ + js.publish("a.foo", "foo"), + js.publish("a.bar", "bar"), + js.publish("a.baz", "baz"), + ]); + + const iter = await jsm.direct.getBatch("A", { + multi_last: ["a.foo", "a.baz"], + }); + + const keys = []; + for await (const m of iter) { + keys.push(m.subject); + } + assertEquals(keys.length, 2); + assertArrayIncludes(keys, ["a.foo", "a.baz"]); + + await cleanup(ns, nc); +}); + +Deno.test("direct - batch options", async () => { + const { ns, nc } = await setup(jetstreamServerConf()); + if (await notCompatible(ns, nc, "2.11.0")) { + return; + } + const jsm = await jetstreamManager(nc) as JetStreamManagerImpl; + await jsm.streams.add({ + name: "A", + subjects: ["a.>"], + storage: StorageType.Memory, + allow_direct: true, + }); + + const js = jetstream(nc); + const d = deferred(); + let i = 0; + const timer = setInterval(async () => { + await js.publish(`a.${i}`, new Uint8Array(128)); + i++; + if (i === 8) { + clearInterval(timer); + d.resolve(); + } + }, 250); + + await d; + + type tt = { + opts: DirectBatchOptions; + expect: number[]; + }; + + async function assertBatch(tc: tt): Promise { + const iter = await jsm.direct.getBatch("A", tc.opts); + const buf: number[] = []; + for await (const m of iter) { + buf.push(m.seq); + } + assertArrayIncludes(buf, tc.expect); + assertEquals(buf.length, tc.expect.length); + } + + async function getDateFor(seq: number): Promise { + const m = await jsm.direct.getMessage("A", { seq: seq }); + return m.time; + } + + await assertBatch({ + opts: { + //@ts-ignore:/ test + start_time: await getDateFor(3), + batch: 10, + multi_last: [">"], + }, + expect: [1, 2, 3, 4, 5, 6, 7, 8], + }); + + await assertBatch({ + opts: { + up_to_time: await getDateFor(3), + batch: 10, + multi_last: [">"], + }, + expect: [1, 2], + }); + + // max sequence + await assertBatch({ + opts: { + up_to_seq: 3, + multi_last: [">"], + }, + expect: [1, 2, 3], + }); + + // max sequence filtered by size + await assertBatch({ + opts: { + up_to_seq: 3, + max_bytes: 128, + multi_last: [">"], + }, + expect: [1], + }); + + // start sequence + await assertBatch({ + opts: { + seq: 3, + up_to_seq: 5, + multi_last: [">"], + }, + expect: [3, 4, 5], + }); + + await assertBatch({ + opts: { + seq: 3, + up_to_seq: 5, + multi_last: [">"], + }, + expect: [3, 4, 5], + }); + + await cleanup(ns, nc); +}); diff --git a/jetstream/tests/jsm_test.ts b/jetstream/tests/jsm_test.ts index 923178d6..15b56915 100644 --- a/jetstream/tests/jsm_test.ts +++ b/jetstream/tests/jsm_test.ts @@ -1172,50 +1172,6 @@ Deno.test("jsm - stream update preserves other value", async () => { await cleanup(ns, nc); }); -Deno.test("jsm - direct getMessage", async () => { - const { ns, nc } = await setup(jetstreamServerConf({})); - - if (await notCompatible(ns, nc, "2.9.0")) { - return; - } - - const jsm = await jetstreamManager(nc) as JetStreamManagerImpl; - await jsm.streams.add({ - name: "A", - subjects: ["foo", "bar"], - allow_direct: true, - }); - - const js = jetstream(nc); - await js.publish("foo", "a", { expect: { lastSequence: 0 } }); - await js.publish("foo", "b", { expect: { lastSequence: 1 } }); - await js.publish("foo", "c", { expect: { lastSequence: 2 } }); - await js.publish("bar", "d", { expect: { lastSequence: 3 } }); - await js.publish("foo", "e", { expect: { lastSequence: 4 } }); - - let m = await jsm.direct.getMessage("A", { seq: 0, next_by_subj: "bar" }); - assertExists(m); - assertEquals(m.seq, 4); - - m = await jsm.direct.getMessage("A", { last_by_subj: "foo" }); - assertExists(m); - assertEquals(m.seq, 5); - - m = await jsm.direct.getMessage("A", { seq: 0, next_by_subj: "foo" }); - assertExists(m); - assertEquals(m.seq, 1); - - m = await jsm.direct.getMessage("A", { seq: 4, next_by_subj: "foo" }); - assertExists(m); - assertEquals(m.seq, 5); - - m = await jsm.direct.getMessage("A", { seq: 2, next_by_subj: "foo" }); - assertExists(m); - assertEquals(m.seq, 2); - - await cleanup(ns, nc); -}); - Deno.test("jsm - consumer name", async () => { const { ns, nc } = await setup(jetstreamServerConf({})); @@ -1931,30 +1887,6 @@ Deno.test("jsm - update from filter_subject to filter_subjects", async () => { await cleanup(ns, nc); }); -Deno.test("jsm - direct msg decode", async () => { - const { ns, nc } = await setup(jetstreamServerConf()); - const name = nuid.next(); - const jsm = await jetstreamManager(nc) as JetStreamManagerImpl; - const js = jetstream(nc); - await jsm.streams.add({ name, subjects: [`a.>`], allow_direct: true }); - - await js.publish("a.a", "hello"); - await js.publish("a.a", JSON.stringify({ one: "two", a: [1, 2, 3] })); - - let m = await jsm.direct.getMessage(name, { seq: 1 }); - assertExists(m); - assertEquals(m.string(), "hello"); - - m = await jsm.direct.getMessage(name, { seq: 2 }); - assertExists(m); - assertEquals(m.json(), { - one: "two", - a: [1, 2, 3], - }); - - await cleanup(ns, nc); -}); - Deno.test("jsm - stored msg decode", async () => { const { ns, nc } = await setup(jetstreamServerConf()); const name = nuid.next(); @@ -2659,75 +2591,6 @@ Deno.test("jsm - pause/unpause", async () => { await cleanup(ns, nc); }); -Deno.test("jsm - batch direct get multi_last", async () => { - const { ns, nc } = await setup(jetstreamServerConf()); - if (await notCompatible(ns, nc, "2.11.0")) { - return; - } - const jsm = await jetstreamManager(nc) as JetStreamManagerImpl; - await jsm.streams.add({ - name: "A", - subjects: ["a.>"], - storage: StorageType.Memory, - allow_direct: true, - }); - - const js = jetstream(nc); - await Promise.all([ - js.publish("a.foo", "foo"), - js.publish("a.bar", "bar"), - js.publish("a.baz", "baz"), - ]); - - const iter = await jsm.direct.getBatch("A", { - multi_last: ["a.foo", "a.baz"], - }); - - const keys = []; - for await (const m of iter) { - keys.push(m.subject); - } - assertEquals(keys.length, 2); - assertArrayIncludes(keys, ["a.foo", "a.baz"]); - - await cleanup(ns, nc); -}); - -Deno.test("jsm - batch direct get batch", async () => { - const { ns, nc } = await setup(jetstreamServerConf()); - if (await notCompatible(ns, nc, "2.11.0")) { - return; - } - const jsm = await jetstreamManager(nc) as JetStreamManagerImpl; - await jsm.streams.add({ - name: "A", - subjects: ["a.>"], - storage: StorageType.Memory, - allow_direct: true, - }); - - const js = jetstream(nc); - await Promise.all([ - js.publish("a.foo", "foo"), - js.publish("a.bar", "bar"), - js.publish("a.baz", "baz"), - js.publish("a.foobar", "foobar"), - ]); - - const iter = await jsm.direct.getBatch("A", { - batch: 3, - multi_last: [">"], - }); - - const buf = []; - for await (const m of iter) { - buf.push(m); - } - assertEquals(buf.length, 3); - - await cleanup(ns, nc); -}); - Deno.test("jsm - storage", async () => { const { ns, nc } = await setup(jetstreamServerConf());