From 29d09b5059217b6b626c71dc2d89797ab9892aba Mon Sep 17 00:00:00 2001 From: Alberto Ricart Date: Wed, 6 Nov 2024 21:18:44 -0600 Subject: [PATCH] direct batch/last for tests Signed-off-by: Alberto Ricart --- jetstream/examples/util.ts | 4 +- jetstream/src/jsapi_types.ts | 72 ++--- jetstream/src/jsclient.ts | 2 +- jetstream/src/jserrors.ts | 4 + jetstream/src/{jsm.ts => jsm_direct.ts} | 121 ++++---- jetstream/src/types.ts | 26 +- jetstream/tests/jsm_direct_test.ts | 358 ++++++++++++++++++++++++ jetstream/tests/jsm_test.ts | 137 --------- migration.md | 1 + 9 files changed, 492 insertions(+), 233 deletions(-) rename jetstream/src/{jsm.ts => jsm_direct.ts} (72%) create mode 100644 jetstream/tests/jsm_direct_test.ts diff --git a/jetstream/examples/util.ts b/jetstream/examples/util.ts index c0820f99..a1ad1418 100644 --- a/jetstream/examples/util.ts +++ b/jetstream/examples/util.ts @@ -14,8 +14,8 @@ */ import { createConsumer, fill, initStream } from "../tests/jstest_util.ts"; -import type { NatsConnection } from "jsr:@nats-io/nats-core@3.0.0-31"; -import { nuid } from "jsr:@nats-io/nats-core@3.0.0-31"; +import type { NatsConnection } from "@nats-io/nats-core"; +import { nuid } from "@nats-io/nats-core"; export async function setupStreamAndConsumer( nc: NatsConnection, diff --git a/jetstream/src/jsapi_types.ts b/jetstream/src/jsapi_types.ts index 0f8097be..4d35db41 100644 --- a/jetstream/src/jsapi_types.ts +++ b/jetstream/src/jsapi_types.ts @@ -479,33 +479,50 @@ export type StreamInfoRequestOptions = { */ "subjects_filter": string; } & ApiPagedRequest; + +export type MsgRequest = SeqMsgRequest | LastForMsgRequest; + /** - * Request the next stream message by sequence for the specified subject. + * Request the first message matching specified subject found on or after the specified optional sequence. */ -export type NextMsgRequest = { - /** - * The seq to start looking. If the message under the specified sequence - * matches, it will be returned. - */ - seq: number; - /** - * The subject to look for - */ - next_by_subj: string; -}; +export type NextMsgRequest = { seq?: number; next_by_subj: string }; + +/** + * Retrieves the last message with the given subject + */ +export type LastForMsgRequest = { "last_by_subj": string }; + +/** + * Stream sequence number of the message to retrieve + */ +export type SeqMsgRequest = { seq: number }; + +/** + * Start time for the message + */ +export type StartTimeMsgRequest = { start_time: Date | string }; + export type DirectMsgRequest = | SeqMsgRequest | LastForMsgRequest - | NextMsgRequest; + | NextMsgRequest + | StartTimeMsgRequest; -// FIXME: new options on the server -export type DirectBatchOptions = { +export type StartSeq = { seq?: number }; +export type StartTime = { start_time?: Date | string }; +export type DirectBatchLimits = { batch?: number; max_bytes?: number; +}; +export type DirectBatchStartSeq = StartSeq & DirectBatchLimits; +export type DirectBatchStartTime = StartTime & DirectBatchLimits; +export type DirectBatchOptions = DirectBatchStartSeq | DirectBatchStartTime; + +export type DirectLastFor = { multi_last: string[]; - up_to_seq?: number; up_to_time?: Date | string; -}; + up_to_seq?: number; +} & DirectBatchLimits; export interface StreamState { /** @@ -789,30 +806,13 @@ export interface Success { export type SuccessResponse = ApiResponse & Success; -export interface LastForMsgRequest { - /** - * Retrieves the last message for the given subject - */ - "last_by_subj": string; -} - -export interface SeqMsgRequest { - /** - * Stream sequence number of the message to retrieve - */ - seq: number; -} - -// FIXME: remove number as it is deprecated -export type MsgRequest = SeqMsgRequest | LastForMsgRequest | number; - -export interface MsgDeleteRequest extends SeqMsgRequest { +export type MsgDeleteRequest = SeqMsgRequest & { /** * Default will securely remove a message and rewrite the data with random data, * set this to true to only remove the message */ "no_erase"?: boolean; -} +}; export interface AccountLimits { /** diff --git a/jetstream/src/jsclient.ts b/jetstream/src/jsclient.ts index 4f6c0ab4..611813e8 100644 --- a/jetstream/src/jsclient.ts +++ b/jetstream/src/jsclient.ts @@ -53,8 +53,8 @@ import type { ApiResponse, JetStreamAccountStats, } from "./jsapi_types.ts"; -import { DirectStreamAPIImpl } from "./jsm.ts"; import { JetStreamError, JetStreamNotEnabled } from "./jserrors.ts"; +import { DirectStreamAPIImpl } from "./jsm_direct.ts"; export function toJetStreamClient( nc: NatsConnection | JetStreamClient, diff --git a/jetstream/src/jserrors.ts b/jetstream/src/jserrors.ts index f7da88f5..343376a2 100644 --- a/jetstream/src/jserrors.ts +++ b/jetstream/src/jserrors.ts @@ -172,6 +172,10 @@ export class JetStreamStatus { isMessageNotFound(): boolean { return this.code === 404 && this.description === "message not found"; } + + isEndOfBatch(): boolean { + return this.code === 204 && this.description === "eob"; + } } export enum JetStreamApiCodes { diff --git a/jetstream/src/jsm.ts b/jetstream/src/jsm_direct.ts similarity index 72% rename from jetstream/src/jsm.ts rename to jetstream/src/jsm_direct.ts index 6d9531d2..4613a1d4 100644 --- a/jetstream/src/jsm.ts +++ b/jetstream/src/jsm_direct.ts @@ -30,14 +30,14 @@ import type { ReviverFn, } from "@nats-io/nats-core"; import { + createInbox, Empty, - errors, QueuedIteratorImpl, - RequestStrategy, TD, } from "@nats-io/nats-core/internal"; import type { DirectBatchOptions, + DirectLastFor, DirectMsgRequest, LastForMsgRequest, } from "./jsapi_types.ts"; @@ -101,71 +101,75 @@ export class DirectStreamAPIImpl extends BaseApiClientImpl return Promise.resolve(dm); } - async getBatch( + getBatch( stream: string, opts: DirectBatchOptions, + ): Promise> { + opts.batch = opts.batch || 1024; + return this.get(stream, opts); + } + + getLastMessagesFor( + stream: string, + opts: DirectLastFor, + ): Promise> { + return this.get(stream, opts); + } + + get( + stream: string, + opts: Record, ): Promise> { validateStreamName(stream); - const pre = this.opts.apiPrefix || "$JS.API"; - const subj = `${pre}.DIRECT.GET.${stream}`; - if (!Array.isArray(opts.multi_last) || opts.multi_last.length === 0) { - return Promise.reject( - errors.InvalidArgumentError.format("multi_last", "is required"), - ); - } - const payload = JSON.stringify(opts, (key, value) => { - if (key === "up_to_time" && value instanceof Date) { - return value.toISOString(); - } - return value; - }); const iter = new QueuedIteratorImpl(); + const inbox = createInbox(this.nc.options.inboxPrefix); + let batchSupported = false; + const sub = this.nc.subscribe(inbox, { + timeout: 10_000, + callback: (err, msg) => { + if (err) { + iter.push(() => { + iter.stop(err); + }); + sub.unsubscribe(); + return; + } + const status = JetStreamStatus.maybeParseStatus(msg); + if (status) { + iter.push(() => { + status.isEndOfBatch() ? iter.stop() : iter.stop(status.toError()); + }); + return; + } + if (!batchSupported) { + if (typeof msg.headers?.get("Nats-Num-Pending") !== "string") { + // no batch/max_bytes option was provided, so single response + sub.unsubscribe(); + iter.push(() => { + iter.stop(); + }); + } else { + batchSupported = true; + } + } - const raw = await this.nc.requestMany( - subj, - payload, - { - strategy: RequestStrategy.SentinelMsg, + iter.push(new DirectMsgImpl(msg)); }, - ); + }); - (async () => { - let gotFirst = false; - let badServer = false; - let status: JetStreamStatus | null = null; - for await (const m of raw) { - if (!gotFirst) { - gotFirst = true; - status = JetStreamStatus.maybeParseStatus(m); - if (status) { - break; - } - // inspect the message and make sure that we have a supported server - const v = m.headers?.get("Nats-Num-Pending"); - if (v === "") { - badServer = true; - break; - } - } - if (m.data.length === 0) { - break; - } - iter.push(new DirectMsgImpl(m)); + const pre = this.opts.apiPrefix || "$JS.API"; + const subj = `${pre}.DIRECT.GET.${stream}`; + + const payload = JSON.stringify(opts, (key, value) => { + if ( + (key === "up_to_time" || key === "start_time") && value instanceof Date + ) { + return value.toISOString(); } - //@ts-ignore: term function - iter.push((): void => { - if (badServer) { - throw new Error("batch direct get not supported by the server"); - } - if (status) { - throw status.toError(); - } - iter.stop(); - }); - })().catch((err) => { - iter.stop(err); + return value; }); + this.nc.publish(subj, payload, { reply: inbox }); return Promise.resolve(iter); } @@ -205,6 +209,11 @@ export class DirectMsgImpl implements DirectMsg { return this.header.last(DirectMsgHeaders.Stream); } + get lastSequence(): number { + const v = this.header.last(DirectMsgHeaders.LastSequence); + return typeof v === "string" ? parseInt(v) : 0; + } + json(reviver?: ReviverFn): T { return JSON.parse(new TextDecoder().decode(this.data), reviver); } diff --git a/jetstream/src/types.ts b/jetstream/src/types.ts index 957cf574..95e15ed7 100644 --- a/jetstream/src/types.ts +++ b/jetstream/src/types.ts @@ -21,7 +21,11 @@ import type { ReviverFn, } from "@nats-io/nats-core/internal"; -import type { DeliverPolicy, ReplayPolicy } from "./jsapi_types.ts"; +import type { + DeliverPolicy, + DirectLastFor, + ReplayPolicy, +} from "./jsapi_types.ts"; import type { ConsumerConfig, @@ -800,6 +804,20 @@ export interface DirectStreamAPI { stream: string, opts: DirectBatchOptions, ): Promise>; + + /** + * Retrieves the last message for each subject in the filter. + * If no filter is specified, a maximum of 1024 subjects are returned. + * Care should be given on the specified filters to ensure that + * the results match what the client is expecting and to avoid missing + * expected data. + * @param stream + * @param opts + */ + getLastMessagesFor( + stream: string, + opts: DirectLastFor, + ): Promise>; } /** @@ -851,6 +869,11 @@ export interface DirectMsg extends StoredMsg { * The name of the Stream storing message */ stream: string; + + /** + * Previous sequence delivered to the client + */ + lastSequence: number; } /** @@ -966,6 +989,7 @@ export enum DirectMsgHeaders { Sequence = "Nats-Sequence", TimeStamp = "Nats-Time-Stamp", Subject = "Nats-Subject", + LastSequence = "Nats-Last-Sequence", } export enum RepublishHeaders { diff --git a/jetstream/tests/jsm_direct_test.ts b/jetstream/tests/jsm_direct_test.ts new file mode 100644 index 00000000..f6eda460 --- /dev/null +++ b/jetstream/tests/jsm_direct_test.ts @@ -0,0 +1,358 @@ +/* + * Copyright 2024 Synadia Communications, Inc + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { + assertArrayIncludes, + assertEquals, + assertRejects, +} from "jsr:@std/assert"; + +import { deferred, delay } from "@nats-io/nats-core"; + +import { + jetstream, + JetStreamError, + jetstreamManager, + StorageType, +} from "../src/mod.ts"; +import { + cleanup, + jetstreamServerConf, + notCompatible, + setup, +} from "test_helpers"; + +import type { JetStreamManagerImpl } from "../src/jsclient.ts"; +import type { DirectBatchOptions, DirectLastFor } from "../src/jsapi_types.ts"; +import type { NatsConnectionImpl } from "@nats-io/nats-core/internal"; + +Deno.test("direct - decoder", async (t) => { + const { ns, nc } = await setup(jetstreamServerConf({})); + if (await notCompatible(ns, nc, "2.9.0")) { + return; + } + const jsm = await jetstreamManager(nc) as JetStreamManagerImpl; + await jsm.streams.add({ + name: "A", + subjects: ["a", "b"], + storage: StorageType.Memory, + allow_direct: true, + }); + + const js = jetstream(nc); + await js.publish("a", "hello world"); + await js.publish("b", JSON.stringify({ hello: "world" })); + + await t.step("string", async () => { + const m = await jsm.direct.getMessage("A", { seq: 1 }); + assertEquals(m.string(), "hello world"); + }); + + await t.step("json", async () => { + const m = await jsm.direct.getMessage("A", { seq: 2 }); + assertEquals(m.json(), { hello: "world" }); + }); + + await cleanup(ns, nc); +}); + +Deno.test("direct - get", async (t) => { + const { ns, nc } = await setup(jetstreamServerConf({})); + if (await notCompatible(ns, nc, "2.9.0")) { + return; + } + const jsm = await jetstreamManager(nc) as JetStreamManagerImpl; + await jsm.streams.add({ + name: "A", + subjects: ["a.>", "b.>", "z.a"], + storage: StorageType.Memory, + allow_direct: true, + }); + + const js = jetstream(nc); + const d = deferred(); + let i = 0; + const timer = setInterval(async () => { + i++; + await js.publish(`a.${i}`, ""); + await delay(50); + await js.publish(`b.${i}`, ""); + await delay(50); + await js.publish(`z.a`, new Uint8Array(15)); + if (i === 8) { + clearInterval(timer); + d.resolve(); + } + }, 250); + await d; + + await assertRejects( + () => { + return jsm.direct.getMessage("A", { seq: 0 }); + }, + JetStreamError, + "empty request", + ); + + await t.step("seq", async () => { + const m = await jsm.direct.getMessage("A", { seq: 1 }); + assertEquals(m.seq, 1); + assertEquals(m.subject, "a.1"); + }); + + await t.step("first with subject", async () => { + const m = await jsm.direct.getMessage("A", { next_by_subj: "z.a" }); + assertEquals(m.seq, 3); + }); + + await t.step("next with subject from sequence", async () => { + const m = await jsm.direct.getMessage("A", { seq: 4, next_by_subj: "z.a" }); + assertEquals(m.seq, 6); + }); + + await t.step("second with subject", async () => { + const m = await jsm.direct.getMessage("A", { seq: 4, next_by_subj: "z.a" }); + assertEquals(m.seq, 6); + assertEquals(m.subject, "z.a"); + }); + + await t.step("start_time", async () => { + const start_time = (await jsm.direct.getMessage("A", { seq: 3 })).time; + const m = await jsm.direct.getMessage("A", { start_time }); + assertEquals(m.seq, 3); + assertEquals(m.subject, "z.a"); + }); + + await t.step("last_by_subject", async () => { + const m = await jsm.direct.getMessage("A", { last_by_subj: "z.a" }); + assertEquals(m.seq, 24); + assertEquals(m.subject, "z.a"); + }); + + await cleanup(ns, nc); +}); + +Deno.test("direct - batch", async (t) => { + const { ns, nc } = await setup(jetstreamServerConf()); + if (await notCompatible(ns, nc, "2.11.0")) { + return; + } + const nci = nc as NatsConnectionImpl; + const jsm = await jetstreamManager(nci) as JetStreamManagerImpl; + await jsm.streams.add({ + name: "A", + subjects: ["a.>"], + storage: StorageType.Memory, + allow_direct: true, + }); + + const js = jetstream(nc); + const d = deferred(); + let i = 0; + const timer = setInterval(async () => { + i++; + await js.publish(`a.a`, new Uint8Array(i)); + if (i === 8) { + clearInterval(timer); + d.resolve(); + } + }, 250); + await d; + + type tt = { + opts: DirectBatchOptions; + expect: number[]; + }; + + async function assertBatch(tc: tt, debug = false): Promise { + if (debug) { + nci.options.debug = true; + } + const iter = await jsm.direct.getBatch("A", tc.opts); + const buf: number[] = []; + for await (const m of iter) { + buf.push(m.seq); + } + if (debug) { + nci.options.debug = false; + } + assertArrayIncludes(buf, tc.expect); + assertEquals(buf.length, tc.expect.length); + } + + async function getDateFor(seq: number): Promise { + const m = await jsm.direct.getMessage("A", { seq: seq }); + assertEquals(m.seq, seq); + return m.time; + } + + await t.step("fails without any option in addition to batch", async () => { + await assertRejects( + () => { + return assertBatch({ + opts: { + batch: 3, + }, + expect: [], + }); + }, + JetStreamError, + "empty request", + ); + }); + + await t.step("start sequence", () => { + return assertBatch({ + //@ts-ignore: test + opts: { + batch: 3, + seq: 3, + }, + expect: [3, 4, 5], + }); + }); + + await t.step("start sequence are mutually exclusive start_time", async () => { + const start_time = await getDateFor(3); + await assertRejects( + () => { + return assertBatch({ + //@ts-ignore: test + opts: { + seq: 100, + start_time, + }, + expect: [3, 4, 5], + }); + }, + JetStreamError, + "bad request", + ); + }); + + await t.step("start_time", async () => { + const start_time = await getDateFor(3); + await assertBatch({ + //@ts-ignore: test + opts: { + start_time, + batch: 10, + }, + expect: [3, 4, 5, 6, 7, 8], + }); + }); + + await t.step("max_bytes", async () => { + await assertBatch({ + //@ts-ignore: test + opts: { + seq: 1, + max_bytes: 4, + }, + expect: [1], + }); + }); + await cleanup(ns, nc); +}); + +Deno.test("direct - last message for", async (t) => { + const { ns, nc } = await setup(jetstreamServerConf()); + if (await notCompatible(ns, nc, "2.11.0")) { + return; + } + const nci = nc as NatsConnectionImpl; + const jsm = await jetstreamManager(nci) as JetStreamManagerImpl; + await jsm.streams.add({ + name: "A", + subjects: ["a", "b", "z"], + storage: StorageType.Memory, + allow_direct: true, + }); + + const js = jetstream(nc); + await Promise.all([ + js.publish("a", "1"), + js.publish("a", "2"), + js.publish("a", "last a"), + js.publish("b", "1"), + js.publish("b", "last b"), + js.publish("z", "last z"), + ]); + + type tt = { + opts: DirectLastFor; + expect: number[]; + }; + + async function assertBatch(tc: tt, debug = false): Promise { + if (debug) { + nci.options.debug = true; + } + const iter = await jsm.direct.getLastMessagesFor("A", tc.opts); + const buf: number[] = []; + for await (const m of iter) { + buf.push(m.seq); + } + if (debug) { + nci.options.debug = false; + } + assertArrayIncludes(buf, tc.expect); + assertEquals(buf.length, tc.expect.length); + } + + async function getDateFor(seq: number): Promise { + const m = await jsm.direct.getMessage("A", { seq: seq }); + assertEquals(m.seq, seq); + return m.time; + } + + await t.step("not matched filter", async () => { + await assertRejects( + async () => { + await assertBatch({ opts: { multi_last: ["c"] }, expect: [] }); + }, + JetStreamError, + "no results", + ); + }); + + await t.step("single filter", async () => { + await assertBatch({ opts: { multi_last: ["a"] }, expect: [3] }); + }); + + await t.step("multiple filter", async () => { + await assertBatch({ + opts: { multi_last: ["a", "b", "z"] }, + expect: [3, 5, 6], + }); + }); + + // FIXME: expected [1, 2, 3] + await t.step("up_to_time", async () => { + const up_to_time = await getDateFor(4); + await assertBatch( + { opts: { up_to_time, multi_last: ["a", "b", "z"] }, expect: [3, 5, 6] }, + ); + }); + + // FIXME: unexpected - [1, 2] + await t.step("up_to_seq", async () => { + await assertBatch( + { opts: { up_to_seq: 2, multi_last: ["a", "b", "z"] }, expect: [2] }, + ); + }); + + await cleanup(ns, nc); +}); diff --git a/jetstream/tests/jsm_test.ts b/jetstream/tests/jsm_test.ts index 923178d6..15b56915 100644 --- a/jetstream/tests/jsm_test.ts +++ b/jetstream/tests/jsm_test.ts @@ -1172,50 +1172,6 @@ Deno.test("jsm - stream update preserves other value", async () => { await cleanup(ns, nc); }); -Deno.test("jsm - direct getMessage", async () => { - const { ns, nc } = await setup(jetstreamServerConf({})); - - if (await notCompatible(ns, nc, "2.9.0")) { - return; - } - - const jsm = await jetstreamManager(nc) as JetStreamManagerImpl; - await jsm.streams.add({ - name: "A", - subjects: ["foo", "bar"], - allow_direct: true, - }); - - const js = jetstream(nc); - await js.publish("foo", "a", { expect: { lastSequence: 0 } }); - await js.publish("foo", "b", { expect: { lastSequence: 1 } }); - await js.publish("foo", "c", { expect: { lastSequence: 2 } }); - await js.publish("bar", "d", { expect: { lastSequence: 3 } }); - await js.publish("foo", "e", { expect: { lastSequence: 4 } }); - - let m = await jsm.direct.getMessage("A", { seq: 0, next_by_subj: "bar" }); - assertExists(m); - assertEquals(m.seq, 4); - - m = await jsm.direct.getMessage("A", { last_by_subj: "foo" }); - assertExists(m); - assertEquals(m.seq, 5); - - m = await jsm.direct.getMessage("A", { seq: 0, next_by_subj: "foo" }); - assertExists(m); - assertEquals(m.seq, 1); - - m = await jsm.direct.getMessage("A", { seq: 4, next_by_subj: "foo" }); - assertExists(m); - assertEquals(m.seq, 5); - - m = await jsm.direct.getMessage("A", { seq: 2, next_by_subj: "foo" }); - assertExists(m); - assertEquals(m.seq, 2); - - await cleanup(ns, nc); -}); - Deno.test("jsm - consumer name", async () => { const { ns, nc } = await setup(jetstreamServerConf({})); @@ -1931,30 +1887,6 @@ Deno.test("jsm - update from filter_subject to filter_subjects", async () => { await cleanup(ns, nc); }); -Deno.test("jsm - direct msg decode", async () => { - const { ns, nc } = await setup(jetstreamServerConf()); - const name = nuid.next(); - const jsm = await jetstreamManager(nc) as JetStreamManagerImpl; - const js = jetstream(nc); - await jsm.streams.add({ name, subjects: [`a.>`], allow_direct: true }); - - await js.publish("a.a", "hello"); - await js.publish("a.a", JSON.stringify({ one: "two", a: [1, 2, 3] })); - - let m = await jsm.direct.getMessage(name, { seq: 1 }); - assertExists(m); - assertEquals(m.string(), "hello"); - - m = await jsm.direct.getMessage(name, { seq: 2 }); - assertExists(m); - assertEquals(m.json(), { - one: "two", - a: [1, 2, 3], - }); - - await cleanup(ns, nc); -}); - Deno.test("jsm - stored msg decode", async () => { const { ns, nc } = await setup(jetstreamServerConf()); const name = nuid.next(); @@ -2659,75 +2591,6 @@ Deno.test("jsm - pause/unpause", async () => { await cleanup(ns, nc); }); -Deno.test("jsm - batch direct get multi_last", async () => { - const { ns, nc } = await setup(jetstreamServerConf()); - if (await notCompatible(ns, nc, "2.11.0")) { - return; - } - const jsm = await jetstreamManager(nc) as JetStreamManagerImpl; - await jsm.streams.add({ - name: "A", - subjects: ["a.>"], - storage: StorageType.Memory, - allow_direct: true, - }); - - const js = jetstream(nc); - await Promise.all([ - js.publish("a.foo", "foo"), - js.publish("a.bar", "bar"), - js.publish("a.baz", "baz"), - ]); - - const iter = await jsm.direct.getBatch("A", { - multi_last: ["a.foo", "a.baz"], - }); - - const keys = []; - for await (const m of iter) { - keys.push(m.subject); - } - assertEquals(keys.length, 2); - assertArrayIncludes(keys, ["a.foo", "a.baz"]); - - await cleanup(ns, nc); -}); - -Deno.test("jsm - batch direct get batch", async () => { - const { ns, nc } = await setup(jetstreamServerConf()); - if (await notCompatible(ns, nc, "2.11.0")) { - return; - } - const jsm = await jetstreamManager(nc) as JetStreamManagerImpl; - await jsm.streams.add({ - name: "A", - subjects: ["a.>"], - storage: StorageType.Memory, - allow_direct: true, - }); - - const js = jetstream(nc); - await Promise.all([ - js.publish("a.foo", "foo"), - js.publish("a.bar", "bar"), - js.publish("a.baz", "baz"), - js.publish("a.foobar", "foobar"), - ]); - - const iter = await jsm.direct.getBatch("A", { - batch: 3, - multi_last: [">"], - }); - - const buf = []; - for await (const m of iter) { - buf.push(m); - } - assertEquals(buf.length, 3); - - await cleanup(ns, nc); -}); - Deno.test("jsm - storage", async () => { const { ns, nc } = await setup(jetstreamServerConf()); diff --git a/migration.md b/migration.md index 2493446f..5d0bb451 100644 --- a/migration.md +++ b/migration.md @@ -120,6 +120,7 @@ To use JetStream, you must install and import `@nats/jetstream`. `StreamNotFound`, `JetStreamNotEnabled` are subtypes of the above. For API calls where the server could return an error, these are `JetStreamAPIError` and contain all the information returned by the server. +- MsgRequest for Stream#getMessage() removed deprecated number argument. ## Changes to KV