Skip to content

Commit

Permalink
direct batch/last for tests
Browse files Browse the repository at this point in the history
Signed-off-by: Alberto Ricart <[email protected]>
  • Loading branch information
aricart committed Nov 7, 2024
1 parent 54c8d78 commit 1f92f34
Show file tree
Hide file tree
Showing 9 changed files with 492 additions and 233 deletions.
4 changes: 2 additions & 2 deletions jetstream/examples/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
*/

import { createConsumer, fill, initStream } from "../tests/jstest_util.ts";
import type { NatsConnection } from "jsr:@nats-io/nats-core@3.0.0-31";
import { nuid } from "jsr:@nats-io/nats-core@3.0.0-31";
import type { NatsConnection } from "@nats-io/nats-core";
import { nuid } from "@nats-io/nats-core";

export async function setupStreamAndConsumer(
nc: NatsConnection,
Expand Down
72 changes: 36 additions & 36 deletions jetstream/src/jsapi_types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -479,33 +479,50 @@ export type StreamInfoRequestOptions = {
*/
"subjects_filter": string;
} & ApiPagedRequest;

export type MsgRequest = SeqMsgRequest | LastForMsgRequest;

/**
* Request the next stream message by sequence for the specified subject.
* Request the first message matching specified subject found on or after the specified optional sequence.
*/
export type NextMsgRequest = {
/**
* The seq to start looking. If the message under the specified sequence
* matches, it will be returned.
*/
seq: number;
/**
* The subject to look for
*/
next_by_subj: string;
};
export type NextMsgRequest = { seq?: number; next_by_subj: string };

/**
* Retrieves the last message with the given subject
*/
export type LastForMsgRequest = { "last_by_subj": string };

/**
* Stream sequence number of the message to retrieve
*/
export type SeqMsgRequest = { seq: number };

/**
* Start time for the message
*/
export type StartTimeMsgRequest = { start_time: Date | string };

export type DirectMsgRequest =
| SeqMsgRequest
| LastForMsgRequest
| NextMsgRequest;
| NextMsgRequest
| StartTimeMsgRequest;

// FIXME: new options on the server
export type DirectBatchOptions = {
export type StartSeq = { seq?: number };
export type StartTime = { start_time?: Date | string };
export type DirectBatchLimits = {
batch?: number;
max_bytes?: number;
};
export type DirectBatchStartSeq = StartSeq & DirectBatchLimits;
export type DirectBatchStartTime = StartTime & DirectBatchLimits;
export type DirectBatchOptions = DirectBatchStartSeq | DirectBatchStartTime;

export type DirectLastFor = {
multi_last: string[];
up_to_seq?: number;
up_to_time?: Date | string;
};
up_to_seq?: number;
} & DirectBatchLimits;

export interface StreamState {
/**
Expand Down Expand Up @@ -789,30 +806,13 @@ export interface Success {

export type SuccessResponse = ApiResponse & Success;

export interface LastForMsgRequest {
/**
* Retrieves the last message for the given subject
*/
"last_by_subj": string;
}

export interface SeqMsgRequest {
/**
* Stream sequence number of the message to retrieve
*/
seq: number;
}

// FIXME: remove number as it is deprecated
export type MsgRequest = SeqMsgRequest | LastForMsgRequest | number;

export interface MsgDeleteRequest extends SeqMsgRequest {
export type MsgDeleteRequest = SeqMsgRequest & {
/**
* Default will securely remove a message and rewrite the data with random data,
* set this to true to only remove the message
*/
"no_erase"?: boolean;
}
};

export interface AccountLimits {
/**
Expand Down
2 changes: 1 addition & 1 deletion jetstream/src/jsclient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ import type {
ApiResponse,
JetStreamAccountStats,
} from "./jsapi_types.ts";
import { DirectStreamAPIImpl } from "./jsm.ts";
import { JetStreamError, JetStreamNotEnabled } from "./jserrors.ts";
import { DirectStreamAPIImpl } from "./jsm_direct.ts";

export function toJetStreamClient(
nc: NatsConnection | JetStreamClient,
Expand Down
4 changes: 4 additions & 0 deletions jetstream/src/jserrors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ export class JetStreamStatus {
isMessageNotFound(): boolean {
return this.code === 404 && this.description === "message not found";
}

isEndOfBatch(): boolean {
return this.code === 204 && this.description === "eob";
}
}

export enum JetStreamApiCodes {
Expand Down
121 changes: 65 additions & 56 deletions jetstream/src/jsm.ts → jetstream/src/jsm_direct.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ import type {
ReviverFn,
} from "@nats-io/nats-core";
import {
createInbox,
Empty,
errors,
QueuedIteratorImpl,
RequestStrategy,
TD,
} from "@nats-io/nats-core/internal";
import type {
DirectBatchOptions,
DirectLastFor,
DirectMsgRequest,
LastForMsgRequest,
} from "./jsapi_types.ts";
Expand Down Expand Up @@ -101,71 +101,75 @@ export class DirectStreamAPIImpl extends BaseApiClientImpl
return Promise.resolve(dm);
}

async getBatch(
getBatch(
stream: string,
opts: DirectBatchOptions,
): Promise<QueuedIterator<StoredMsg>> {
opts.batch = opts.batch || 1024;
return this.get(stream, opts);
}

getLastMessagesFor(
stream: string,
opts: DirectLastFor,
): Promise<QueuedIterator<StoredMsg>> {
return this.get(stream, opts);
}

get(
stream: string,
opts: Record<string, unknown>,
): Promise<QueuedIterator<StoredMsg>> {
validateStreamName(stream);
const pre = this.opts.apiPrefix || "$JS.API";
const subj = `${pre}.DIRECT.GET.${stream}`;
if (!Array.isArray(opts.multi_last) || opts.multi_last.length === 0) {
return Promise.reject(
errors.InvalidArgumentError.format("multi_last", "is required"),
);
}
const payload = JSON.stringify(opts, (key, value) => {
if (key === "up_to_time" && value instanceof Date) {
return value.toISOString();
}
return value;
});

const iter = new QueuedIteratorImpl<StoredMsg>();
const inbox = createInbox(this.nc.options.inboxPrefix);
let batchSupported = false;
const sub = this.nc.subscribe(inbox, {
timeout: 10_000,
callback: (err, msg) => {
if (err) {
iter.push(() => {
iter.stop(err);
});
sub.unsubscribe();
return;
}
const status = JetStreamStatus.maybeParseStatus(msg);
if (status) {
iter.push(() => {
status.isEndOfBatch() ? iter.stop() : iter.stop(status.toError());
});
return;
}
if (!batchSupported) {
if (typeof msg.headers?.get("Nats-Num-Pending") !== "string") {
// no batch/max_bytes option was provided, so single response
sub.unsubscribe();
iter.push(() => {
iter.stop();
});
} else {
batchSupported = true;
}
}

const raw = await this.nc.requestMany(
subj,
payload,
{
strategy: RequestStrategy.SentinelMsg,
iter.push(new DirectMsgImpl(msg));
},
);
});

(async () => {
let gotFirst = false;
let badServer = false;
let status: JetStreamStatus | null = null;
for await (const m of raw) {
if (!gotFirst) {
gotFirst = true;
status = JetStreamStatus.maybeParseStatus(m);
if (status) {
break;
}
// inspect the message and make sure that we have a supported server
const v = m.headers?.get("Nats-Num-Pending");
if (v === "") {
badServer = true;
break;
}
}
if (m.data.length === 0) {
break;
}
iter.push(new DirectMsgImpl(m));
const pre = this.opts.apiPrefix || "$JS.API";
const subj = `${pre}.DIRECT.GET.${stream}`;

const payload = JSON.stringify(opts, (key, value) => {
if (
(key === "up_to_time" || key === "start_time") && value instanceof Date
) {
return value.toISOString();
}
//@ts-ignore: term function
iter.push((): void => {
if (badServer) {
throw new Error("batch direct get not supported by the server");
}
if (status) {
throw status.toError();
}
iter.stop();
});
})().catch((err) => {
iter.stop(err);
return value;
});
this.nc.publish(subj, payload, { reply: inbox });

return Promise.resolve(iter);
}
Expand Down Expand Up @@ -205,6 +209,11 @@ export class DirectMsgImpl implements DirectMsg {
return this.header.last(DirectMsgHeaders.Stream);
}

get lastSequence(): number {
const v = this.header.last(DirectMsgHeaders.LastSequence);
return typeof v === "string" ? parseInt(v) : 0;
}

json<T = unknown>(reviver?: ReviverFn): T {
return JSON.parse(new TextDecoder().decode(this.data), reviver);
}
Expand Down
26 changes: 25 additions & 1 deletion jetstream/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ import type {
ReviverFn,
} from "@nats-io/nats-core/internal";

import type { DeliverPolicy, ReplayPolicy } from "./jsapi_types.ts";
import type {
DeliverPolicy,
DirectLastFor,
ReplayPolicy,
} from "./jsapi_types.ts";

import type {
ConsumerConfig,
Expand Down Expand Up @@ -800,6 +804,20 @@ export interface DirectStreamAPI {
stream: string,
opts: DirectBatchOptions,
): Promise<QueuedIterator<StoredMsg>>;

/**
* Retrieves the last message for each subject in the filter.
* If no filter is specified, a maximum of 1024 subjects are returned.
* Care should be given on the specified filters to ensure that
* the results match what the client is expecting and to avoid missing
* expected data.
* @param stream
* @param opts
*/
getLastMessagesFor(
stream: string,
opts: DirectLastFor,
): Promise<QueuedIterator<StoredMsg>>;
}

/**
Expand Down Expand Up @@ -851,6 +869,11 @@ export interface DirectMsg extends StoredMsg {
* The name of the Stream storing message
*/
stream: string;

/**
* Previous sequence delivered to the client
*/
lastSequence: number;
}

/**
Expand Down Expand Up @@ -966,6 +989,7 @@ export enum DirectMsgHeaders {
Sequence = "Nats-Sequence",
TimeStamp = "Nats-Time-Stamp",
Subject = "Nats-Subject",
LastSequence = "Nats-Last-Sequence",
}

export enum RepublishHeaders {
Expand Down
Loading

0 comments on commit 1f92f34

Please sign in to comment.