From bc229c995b14f1634e5bf2a188c0b1bc70935d5a Mon Sep 17 00:00:00 2001 From: Alberto Ricart Date: Mon, 28 Oct 2024 15:08:06 -0500 Subject: [PATCH] wip - removing NatsError in favor of easier more generic APIs and testing with instanceof. Signed-off-by: Alberto Ricart --- core/deno.json | 2 +- core/package.json | 2 +- core/src/core.ts | 152 +-------------------------- core/src/errors.ts | 118 +++++++++++++++++++-- core/src/internal_mod.ts | 5 - core/src/mod.ts | 4 - core/src/protocol.ts | 3 +- core/src/types.ts | 3 +- core/src/version.ts | 17 ++- core/tests/basics_test.ts | 13 ++- jetstream/deno.json | 4 +- jetstream/examples/util.ts | 4 +- jetstream/import_map.json | 4 +- jetstream/package.json | 4 +- jetstream/src/consumer.ts | 78 +++++--------- jetstream/src/internal_mod.ts | 8 +- jetstream/src/jsapi_types.ts | 6 +- jetstream/src/jsbaseclient_api.ts | 11 +- jetstream/src/jsclient.ts | 3 +- jetstream/src/jserrors.ts | 127 ++++++++++++++++++---- jetstream/src/jsm.ts | 28 ++++- jetstream/src/jsmsg.ts | 5 +- jetstream/src/jsutil.ts | 144 ------------------------- jetstream/src/mod.ts | 11 +- jetstream/src/pushconsumer.ts | 34 ++++-- jetstream/src/types.ts | 9 +- jetstream/tests/jsm_test.ts | 17 ++- kv/deno.json | 6 +- kv/import_map.json | 8 +- kv/package.json | 6 +- kv/src/kv.ts | 26 +++-- kv/tests/kv_test.ts | 21 +++- migration.md | 9 ++ obj/deno.json | 6 +- obj/import_map.json | 8 +- obj/package.json | 6 +- obj/src/objectstore.ts | 17 ++- services/deno.json | 4 +- services/import_map.json | 4 +- services/package.json | 4 +- services/src/service.ts | 5 +- services/src/types.ts | 18 +++- transport-deno/deno.json | 4 +- transport-deno/src/version.ts | 17 ++- transport-node/package.json | 4 +- transport-node/src/node_transport.ts | 29 +++-- transport-node/src/version.ts | 17 ++- transport-node/tests/basics_test.js | 6 +- transport-node/tests/tls_test.js | 8 +- 49 files changed, 524 insertions(+), 525 deletions(-) diff --git a/core/deno.json b/core/deno.json index a7270a7d..d9ac8870 100644 --- a/core/deno.json +++ b/core/deno.json @@ -1,6 +1,6 @@ { "name": "@nats-io/nats-core", - "version": "3.0.0-30", + "version": "3.0.0-31", "exports": { ".": "./src/mod.ts", "./internal": "./src/internal_mod.ts" diff --git a/core/package.json b/core/package.json index 28eccbb1..6f1c77e0 100644 --- a/core/package.json +++ b/core/package.json @@ -1,6 +1,6 @@ { "name": "@nats-io/nats-core", - "version": "3.0.0-30", + "version": "3.0.0-31", "files": [ "lib/", "LICENSE", diff --git a/core/src/core.ts b/core/src/core.ts index 1a1ae41e..bbae012f 100644 --- a/core/src/core.ts +++ b/core/src/core.ts @@ -44,156 +44,6 @@ export enum DebugEvents { ClientInitiatedReconnect = "client initiated reconnect", } -export enum ErrorCode { - // emitted by the client - // ApiError = "BAD API", - BadAuthentication = "BAD_AUTHENTICATION", - BadCreds = "BAD_CREDS", - BadHeader = "BAD_HEADER", - BadJson = "BAD_JSON", - BadPayload = "BAD_PAYLOAD", - BadSubject = "BAD_SUBJECT", - Cancelled = "CANCELLED", - ConnectionClosed = "CONNECTION_CLOSED", - ConnectionDraining = "CONNECTION_DRAINING", - ConnectionRefused = "CONNECTION_REFUSED", - ConnectionTimeout = "CONNECTION_TIMEOUT", - Disconnect = "DISCONNECT", - InvalidOption = "INVALID_OPTION", - InvalidPayload = "INVALID_PAYLOAD", - MaxPayloadExceeded = "MAX_PAYLOAD_EXCEEDED", - NoResponders = "503", - NotFunction = "NOT_FUNC", - RequestError = "REQUEST_ERROR", - ServerOptionNotAvailable = "SERVER_OPT_NA", - SubClosed = "SUB_CLOSED", - SubDraining = "SUB_DRAINING", - Timeout = "TIMEOUT", - Tls = "TLS", - Unknown = "UNKNOWN_ERROR", - WssRequired = "WSS_REQUIRED", - - // jetstream - JetStreamInvalidAck = "JESTREAM_INVALID_ACK", - JetStream404NoMessages = "404", - JetStream408RequestTimeout = "408", - //@deprecated: use JetStream409 - JetStream409MaxAckPendingExceeded = "409", - JetStream409 = "409", - JetStreamNotEnabled = "503", - JetStreamIdleHeartBeat = "IDLE_HEARTBEAT", - - // emitted by the server - AuthorizationViolation = "AUTHORIZATION_VIOLATION", - AuthenticationExpired = "AUTHENTICATION_EXPIRED", - ProtocolError = "NATS_PROTOCOL_ERR", - PermissionsViolation = "PERMISSIONS_VIOLATION", - AuthenticationTimeout = "AUTHENTICATION_TIMEOUT", - AccountExpired = "ACCOUNT_EXPIRED", -} - -export function isNatsError(err: NatsError | Error): err is NatsError { - return typeof (err as NatsError).code === "string"; -} - -export class Messages { - messages: Map; - - constructor() { - this.messages = new Map(); - this.messages.set( - ErrorCode.InvalidPayload, - "Invalid payload type - payloads can be 'binary', 'string', or 'json'", - ); - this.messages.set(ErrorCode.BadJson, "Bad JSON"); - this.messages.set( - ErrorCode.WssRequired, - "TLS is required, therefore a secure websocket connection is also required", - ); - } - - static getMessage(s: string): string { - return messages.getMessage(s); - } - - getMessage(s: string): string { - return this.messages.get(s) || s; - } -} - -// safari doesn't support static class members -const messages: Messages = new Messages(); - -export type ApiError = { - /** - * Status code - */ - code: number; - /** - * JetStream Error Code - */ - err_code: number; - /** - * Error description - */ - description: string; -}; - -export class NatsError extends Error { - // TODO: on major version this should change to a number/enum - code: string; - permissionContext?: { operation: string; subject: string; queue?: string }; - chainedError?: Error; - // these are for supporting jetstream - api_error?: ApiError; - - /** - * @param {String} message - * @param {String} code - * @param {Error} [chainedError] - * - * @api private - */ - constructor(message: string, code: string, chainedError?: Error) { - super(message); - this.name = "NatsError"; - this.message = message; - this.code = code; - this.chainedError = chainedError; - } - - static errorForCode(code: string, chainedError?: Error): NatsError { - const m = Messages.getMessage(code); - return new NatsError(m, code, chainedError); - } - - isAuthError(): boolean { - return this.code === ErrorCode.AuthenticationExpired || - this.code === ErrorCode.AuthorizationViolation || - this.code === ErrorCode.AccountExpired; - } - - isAuthTimeout(): boolean { - return this.code === ErrorCode.AuthenticationTimeout; - } - - isPermissionError(): boolean { - return this.code === ErrorCode.PermissionsViolation; - } - - isProtocolError(): boolean { - return this.code === ErrorCode.ProtocolError; - } - - isJetStreamError(): boolean { - return this.api_error !== undefined; - } - - jsError(): ApiError | null { - return this.api_error ? this.api_error : null; - } -} - export type MsgCallback = (err: Error | null, msg: T) => void; /** @@ -488,6 +338,8 @@ export interface NatsConnection { * Publishes using the subject of the specified message, specifying the * data, headers and reply found in the message if any. * @param msg + * @throws InvalidSubjectError + * @throws InvalidOptionError, */ publishMessage(msg: Msg): void; diff --git a/core/src/errors.ts b/core/src/errors.ts index be019e36..85ee1578 100644 --- a/core/src/errors.ts +++ b/core/src/errors.ts @@ -13,6 +13,13 @@ * limitations under the License. */ +/** + * Represents an error that is thrown when an invalid subject is encountered. + * This class extends the built-in Error object. + * + * @class + * @extends Error + */ export class InvalidSubjectError extends Error { constructor(subject: string, options?: ErrorOptions) { super(`illegal subject: '${subject}'`, options); @@ -20,6 +27,15 @@ export class InvalidSubjectError extends Error { } } +/** + * Represents an error for invalid header scenarios. + * + * Instances of this error are thrown when a header is detected + * as having an invalid name or value. + * + * @param {string} message - A message providing details about the invalid header. + * @param {ErrorOptions} [options] - Optional error options. + */ export class InvalidHeaderError extends Error { constructor(message: string, options?: ErrorOptions) { super(`invalid header: ${message}`, options); @@ -27,6 +43,13 @@ export class InvalidHeaderError extends Error { } } +/** + * A custom error class to signify invalid options provided to an API function or + * to a ConnectionOption that doesn't match server settings. + * + * @param {string} message - A message providing details about the invalid option. + * @param {ErrorOptions} [options] - Optional error options. + */ export class InvalidOptionError extends Error { constructor(message: string, options?: ErrorOptions) { super(message, options); @@ -47,6 +70,18 @@ export class InvalidOptionError extends Error { } } +/** + * InvalidOperationError is a custom error class that extends the standard Error object. + * It represents an error that occurs when an invalid operation is attempted on one of + * objects returned by the API. For example, trying to iterate on an object that was + * configured with a callback. + * + * @class InvalidOperationError + * @extends {Error} + * + * @param {string} message - The error message that explains the reason for the error. + * @param {ErrorOptions} [options] - Optional parameter to provide additional error options. + */ export class InvalidOperationError extends Error { constructor(message: string, options?: ErrorOptions) { super(message, options); @@ -54,6 +89,11 @@ export class InvalidOperationError extends Error { } } +/** + * Represents an error indicating that user authentication has expired. + * This error is typically thrown when a user attempts to access a connection + * but their authentication credentials have expired. + */ export class UserAuthenticationExpiredError extends Error { constructor(message: string, options?: ErrorOptions) { super(message, options); @@ -69,6 +109,12 @@ export class UserAuthenticationExpiredError extends Error { } } +/** + * Represents an error related to authorization issues. + * Note that these could represent an authorization violation, + * or that the account authentication configuration has expired, + * or an authentication timeout. + */ export class AuthorizationError extends Error { constructor(message: string, options?: ErrorOptions) { super(message, options); @@ -94,6 +140,15 @@ export class AuthorizationError extends Error { } } +/** + * Class representing an error thrown when an operation is attempted on a closed connection. + * + * This error is intended to signal that a connection-related operation could not be completed + * because the connection is no longer open or has been terminated. + * + * @class + * @extends Error + */ export class ClosedConnectionError extends Error { constructor() { super("closed connection"); @@ -101,6 +156,16 @@ export class ClosedConnectionError extends Error { } } +/** + * The `ConnectionDrainingError` class represents a specific type of error + * that occurs when a connection is being drained. + * + * This error is typically used in scenarios where connections need to be + * gracefully closed or when they are transitioning to an inactive state. + * + * The error message is set to "connection draining" and the error name is + * overridden to "DrainingConnectionError". + */ export class ConnectionDrainingError extends Error { constructor() { super("connection draining"); @@ -108,6 +173,13 @@ export class ConnectionDrainingError extends Error { } } +/** + * Represents an error that occurs when a network connection fails. + * Extends the built-in Error class to provide additional context for connection-related issues. + * + * @param {string} message - A human-readable description of the error. + * @param {ErrorOptions} [options] - Optional settings for customizing the error behavior. + */ export class ConnectionError extends Error { constructor(message: string, options?: ErrorOptions) { super(message, options); @@ -115,6 +187,14 @@ export class ConnectionError extends Error { } } +/** + * Represents an error encountered during protocol operations. + * This class extends the built-in `Error` class, providing a specific + * error type called `ProtocolError`. + * + * @param {string} message - A descriptive message describing the error. + * @param {ErrorOptions} [options] - Optional parameters to include additional details. + */ export class ProtocolError extends Error { constructor(message: string, options?: ErrorOptions) { super(message, options); @@ -122,6 +202,12 @@ export class ProtocolError extends Error { } } +/** + * Class representing an error that occurs during an request operation + * (such as TimeoutError, or NoRespondersError, or some other error). + * + * @extends Error + */ export class RequestError extends Error { constructor(message = "", options?: ErrorOptions) { super(message, options); @@ -129,6 +215,14 @@ export class RequestError extends Error { } } +/** + * TimeoutError is a custom error class that extends the built-in Error class. + * It is used to represent an error that occurs when an operation exceeds a + * predefined time limit. + * + * @class + * @extends {Error} + */ export class TimeoutError extends Error { constructor(options?: ErrorOptions) { super("timeout", options); @@ -136,6 +230,16 @@ export class TimeoutError extends Error { } } +/** + * NoRespondersError is an error thrown when no responders (no service is + * subscribing to the subject) are found for a given subject. This error + * is typically found as the cause for a RequestError + * + * @extends Error + * + * @param {string} subject - The subject for which no responders were found. + * @param {ErrorOptions} [options] - Optional error options. + */ export class NoRespondersError extends Error { subject: string; constructor(subject: string, options?: ErrorOptions) { @@ -145,13 +249,13 @@ export class NoRespondersError extends Error { } } -export class ServerError extends Error { - constructor(message: string, options?: ErrorOptions) { - super(message, options); - this.name = "ServerError"; - } -} - +/** + * Class representing a Permission Violation Error. + * It provides information about the operation (either "publish" or "subscription") + * and the subject used for the operation and the optional queue (if a subscription). + * + * This error is terminal for a subscription. + */ export class PermissionViolationError extends Error { operation: "publish" | "subscription"; subject: string; diff --git a/core/src/internal_mod.ts b/core/src/internal_mod.ts index db8b4ba8..b536ba1d 100644 --- a/core/src/internal_mod.ts +++ b/core/src/internal_mod.ts @@ -81,7 +81,6 @@ export { Empty } from "./types.ts"; export { extractProtocolMessage, protoLen } from "./transport.ts"; export type { - ApiError, Auth, Authenticator, CallbackFn, @@ -119,11 +118,8 @@ export type { export { createInbox, DebugEvents, - ErrorCode, Events, - isNatsError, Match, - NatsError, RequestStrategy, syncIterator, } from "./core.ts"; @@ -153,7 +149,6 @@ export { PermissionViolationError, ProtocolError, RequestError, - ServerError, TimeoutError, UserAuthenticationExpiredError, } from "./errors.ts"; diff --git a/core/src/mod.ts b/core/src/mod.ts index 6229ea92..0313ec7e 100644 --- a/core/src/mod.ts +++ b/core/src/mod.ts @@ -29,7 +29,6 @@ export { deferred, delay, Empty, - ErrorCode, Events, headers, InvalidOptionError, @@ -39,7 +38,6 @@ export { millis, MsgHdrsImpl, nanos, - NatsError, nkeyAuthenticator, nkeys, NoRespondersError, @@ -49,7 +47,6 @@ export { ProtocolError, RequestError, RequestStrategy, - ServerError, syncIterator, TimeoutError, tokenAuthenticator, @@ -59,7 +56,6 @@ export { } from "./internal_mod.ts"; export type { - ApiError, Auth, Authenticator, Backoff, diff --git a/core/src/protocol.ts b/core/src/protocol.ts index 96ac7fb1..79bd6911 100644 --- a/core/src/protocol.ts +++ b/core/src/protocol.ts @@ -59,7 +59,6 @@ import { PermissionViolationError, ProtocolError, RequestError, - ServerError, UserAuthenticationExpiredError, } from "./errors.ts"; @@ -890,7 +889,7 @@ export class ProtocolHandler implements Dispatcher { } if (this.info && len > this.info.max_payload) { - throw new ServerError(`server 'max_payload' exceeded`); + throw new InvalidOptionError("payload size exceeded"); } this.outBytes += len; this.outMsgs++; diff --git a/core/src/types.ts b/core/src/types.ts index 632ab30c..e1925a52 100644 --- a/core/src/types.ts +++ b/core/src/types.ts @@ -12,7 +12,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -export type { ApiError, Dispatcher, MsgHdrs, QueuedIterator } from "./core.ts"; -export { NatsError } from "./core.ts"; +export type { Dispatcher, MsgHdrs, QueuedIterator } from "./core.ts"; export { Empty } from "./encoders.ts"; diff --git a/core/src/version.ts b/core/src/version.ts index 14911460..e916ad01 100644 --- a/core/src/version.ts +++ b/core/src/version.ts @@ -1,2 +1,17 @@ +/* + * Copyright 2024 Synadia Communications, Inc + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + // This file is generated - do not edit -export const version = "3.0.0-30"; +export const version = "3.0.0-31"; diff --git a/core/tests/basics_test.ts b/core/tests/basics_test.ts index 4a49eb26..26b65572 100644 --- a/core/tests/basics_test.ts +++ b/core/tests/basics_test.ts @@ -54,7 +54,6 @@ import { InvalidSubjectError, ProtocolError, RequestError, - ServerError, TimeoutError, } from "../src/errors.ts"; @@ -700,16 +699,16 @@ Deno.test("basics - max_payload errors", async () => { () => { nc.publish("foo", big); }, - ServerError, - `'max_payload' exceeded`, + InvalidOptionError, + `payload size exceeded`, ); assertRejects( () => { return nc.request("foo", big); }, - ServerError, - `'max_payload' exceeded`, + InvalidOptionError, + `payload size exceeded`, ); const d = deferred(); @@ -726,8 +725,8 @@ Deno.test("basics - max_payload errors", async () => { () => { m.respond(big); }, - ServerError, - `'max_payload' exceeded`, + InvalidOptionError, + `payload size exceeded`, ); break; } diff --git a/jetstream/deno.json b/jetstream/deno.json index 098cfe63..7cdb14e6 100644 --- a/jetstream/deno.json +++ b/jetstream/deno.json @@ -1,6 +1,6 @@ { "name": "@nats-io/jetstream", - "version": "3.0.0-15", + "version": "3.0.0-18", "exports": { ".": "./src/mod.ts", "./internal": "./src/internal_mod.ts" @@ -33,6 +33,6 @@ "test": "deno test -A --parallel --reload --trace-leaks --quiet tests/ --import-map=import_map.json" }, "imports": { - "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-30" + "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-31" } } diff --git a/jetstream/examples/util.ts b/jetstream/examples/util.ts index f767f5b7..c0820f99 100644 --- a/jetstream/examples/util.ts +++ b/jetstream/examples/util.ts @@ -14,8 +14,8 @@ */ import { createConsumer, fill, initStream } from "../tests/jstest_util.ts"; -import type { NatsConnection } from "jsr:@nats-io/nats-core@3.0.0-30"; -import { nuid } from "jsr:@nats-io/nats-core@3.0.0-30"; +import type { NatsConnection } from "jsr:@nats-io/nats-core@3.0.0-31"; +import { nuid } from "jsr:@nats-io/nats-core@3.0.0-31"; export async function setupStreamAndConsumer( nc: NatsConnection, diff --git a/jetstream/import_map.json b/jetstream/import_map.json index bafe819c..9f2a97bb 100644 --- a/jetstream/import_map.json +++ b/jetstream/import_map.json @@ -2,8 +2,8 @@ "imports": { "@nats-io/nkeys": "jsr:@nats-io/nkeys@1.2.0-4", "@nats-io/nuid": "jsr:@nats-io/nuid@2.0.1-2", - "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-30", - "@nats-io/nats-core/internal": "jsr:@nats-io/nats-core@~3.0.0-30/internal", + "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-31", + "@nats-io/nats-core/internal": "jsr:@nats-io/nats-core@~3.0.0-31/internal", "test_helpers": "../test_helpers/mod.ts", "@std/io": "jsr:@std/io@0.224.0" } diff --git a/jetstream/package.json b/jetstream/package.json index f75421ff..a339b52c 100644 --- a/jetstream/package.json +++ b/jetstream/package.json @@ -1,6 +1,6 @@ { "name": "@nats-io/jetstream", - "version": "3.0.0-15", + "version": "3.0.0-18", "files": [ "lib/", "LICENSE", @@ -34,7 +34,7 @@ }, "description": "jetstream library - this library implements all the base functionality for NATS JetStream for javascript clients", "dependencies": { - "@nats-io/nats-core": "~3.0.0-30" + "@nats-io/nats-core": "~3.0.0-31" }, "devDependencies": { "@types/node": "^22.7.6", diff --git a/jetstream/src/consumer.ts b/jetstream/src/consumer.ts index 9f98ddb6..2ae38d47 100644 --- a/jetstream/src/consumer.ts +++ b/jetstream/src/consumer.ts @@ -16,7 +16,6 @@ import type { CallbackFn, Delay, - MsgHdrs, MsgImpl, QueuedIterator, Status, @@ -31,13 +30,11 @@ import { Events, IdleHeartbeatMonitor, nanos, - NatsError, nuid, QueuedIteratorImpl, timeout, } from "@nats-io/nats-core/internal"; import type { ConsumerAPIImpl } from "./jsmconsumer_api.ts"; -import { isHeartbeatMsg } from "./jsutil.ts"; import type { JsMsg } from "./jsmsg.ts"; import { toJsMsg } from "./jsmsg.ts"; @@ -48,7 +45,6 @@ import type { PullOptions, } from "./jsapi_types.ts"; import { AckPolicy, DeliverPolicy } from "./jsapi_types.ts"; -import { ConsumerDebugEvents, ConsumerEvents, JsHeaders } from "./types.ts"; import type { ConsumeMessages, ConsumeOptions, @@ -63,6 +59,8 @@ import type { OrderedConsumerOptions, PullConsumerOptions, } from "./types.ts"; +import { ConsumerDebugEvents, ConsumerEvents } from "./types.ts"; +import { JetStreamStatus } from "./jserrors.ts"; enum PullConsumerType { Unset = -1, @@ -169,20 +167,19 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl const isProtocol = msg.subject === this.inbox; if (isProtocol) { - if (isHeartbeatMsg(msg)) { - const natsLastConsumer = msg.headers?.get("Nats-Last-Consumer"); - const natsLastStream = msg.headers?.get("Nats-Last-Stream"); - this.notify(ConsumerDebugEvents.Heartbeat, { - natsLastConsumer, - natsLastStream, - }); + const status = new JetStreamStatus(msg); + status.debug(); + + if (status.isIdleHeartbeat()) { + this.notify(ConsumerDebugEvents.Heartbeat, status.parseHeartbeat()); return; } - const code = msg.headers?.code; - const description = msg.headers?.description?.toLowerCase() || - "unknown"; - const { msgsLeft, bytesLeft } = this.parseDiscard(msg.headers); - if (msgsLeft > 0 || bytesLeft > 0) { + const code = status.code; + const description = status.description; + + const { msgsLeft, bytesLeft } = status.parseDiscard(); + console.log("pending", msgsLeft, bytesLeft); + if ((msgsLeft && msgsLeft > 0) || (bytesLeft && bytesLeft > 0)) { this.pending.msgs -= msgsLeft; this.pending.bytes -= bytesLeft; this.pending.requests--; @@ -200,10 +197,10 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl // we got a bad request - no progress here switch (code) { case 400: - this.stop(new NatsError(description, `${code}`)); + this.stop(status.toError()); return; case 409: { - const err = this.handle409(code, description); + const err = this.handle409(status); if (err) { this.stop(err); return; @@ -341,22 +338,20 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl /** * Handle the notification of 409 error and whether * it should reject the operation by returning an Error or null - * @param code - * @param description + * @param status */ - handle409(code: number, description: string): Error | null { - const e = description === "consumer deleted" - ? ConsumerEvents.ConsumerDeleted - : ConsumerEvents.ExceededLimit; - this.notify(e, { code, description }); + handle409(status: JetStreamStatus): Error | null { + const { code, description } = status; + if (status.isConsumerDeleted()) { + this.notify(ConsumerEvents.ConsumerDeleted, { code, description }); + } else if (status.isExceededLimit()) { + this.notify(ConsumerEvents.ExceededLimit, { code, description }); + } if (!this.isConsume) { - // terminate the fetch/next - return new NatsError(description, `${code}`); - } else if ( - e === ConsumerEvents.ConsumerDeleted && this.abortOnMissingResource - ) { - // terminate the consume if abortOnMissingResource - return new NatsError(description, `${code}`); + return status.toError(); + } + if (status.isConsumerDeleted() && this.abortOnMissingResource) { + return status.toError(); } return null; } @@ -568,25 +563,6 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl return { batch, max_bytes, idle_heartbeat, expires }; } - parseDiscard( - headers?: MsgHdrs, - ): { msgsLeft: number; bytesLeft: number } { - const discard = { - msgsLeft: 0, - bytesLeft: 0, - }; - const msgsLeft = headers?.get(JsHeaders.PendingMessagesHdr); - if (msgsLeft) { - discard.msgsLeft = parseInt(msgsLeft); - } - const bytesLeft = headers?.get(JsHeaders.PendingBytesHdr); - if (bytesLeft) { - discard.bytesLeft = parseInt(bytesLeft); - } - - return discard; - } - trackTimeout(t: Timeout) { this.timeout = t; } diff --git a/jetstream/src/internal_mod.ts b/jetstream/src/internal_mod.ts index 34ac7d9f..e169bb57 100644 --- a/jetstream/src/internal_mod.ts +++ b/jetstream/src/internal_mod.ts @@ -12,7 +12,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -export { checkJsError, isFlowControlMsg, isHeartbeatMsg } from "./jsutil.ts"; export { AdvisoryKind, @@ -138,3 +137,10 @@ export { export type { DeliveryInfo, StreamInfoRequestOptions } from "./jsapi_types.ts"; export { ListerImpl } from "./jslister.ts"; + +export { + isMessageNotFound, + JetStreamApiCodes, + JetStreamApiError, + JetStreamError, +} from "./jserrors.ts"; diff --git a/jetstream/src/jsapi_types.ts b/jetstream/src/jsapi_types.ts index 51bbf836..0f8097be 100644 --- a/jetstream/src/jsapi_types.ts +++ b/jetstream/src/jsapi_types.ts @@ -13,7 +13,7 @@ * limitations under the License. */ -import type { ApiError, Nanos } from "@nats-io/nats-core"; +import type { Nanos } from "@nats-io/nats-core"; import { nanos } from "@nats-io/nats-core"; export interface ApiPaged { @@ -31,7 +31,7 @@ export interface ApiResponse { error?: ApiError; } -export interface JsApiError { +export interface ApiError { /** * HTTP like error code in the 300 to 500 range */ @@ -43,7 +43,7 @@ export interface JsApiError { /** * The NATS error code unique to each kind of error */ - err_code?: number; + err_code: number; } /** diff --git a/jetstream/src/jsbaseclient_api.ts b/jetstream/src/jsbaseclient_api.ts index 76ad2d53..daa0c994 100644 --- a/jetstream/src/jsbaseclient_api.ts +++ b/jetstream/src/jsbaseclient_api.ts @@ -17,14 +17,15 @@ import { backoff, delay, Empty, - ErrorCode, extend, + NoRespondersError, + RequestError, + TimeoutError, } from "@nats-io/nats-core/internal"; import type { Msg, NatsConnection, NatsConnectionImpl, - NatsError, RequestOptions, } from "@nats-io/nats-core/internal"; import type { ApiResponse } from "./jsapi_types.ts"; @@ -111,9 +112,11 @@ export class BaseApiClientImpl { ); return this.parseJsResponse(m); } catch (err) { - const ne = err as NatsError; + const { cause } = err as Error; if ( - (ne.code === "503" || ne.code === ErrorCode.Timeout) && + err instanceof RequestError && + (cause instanceof TimeoutError || + cause instanceof NoRespondersError) && i + 1 < retries ) { await delay(bo.backoff(i)); diff --git a/jetstream/src/jsclient.ts b/jetstream/src/jsclient.ts index c40b0567..f094e8aa 100644 --- a/jetstream/src/jsclient.ts +++ b/jetstream/src/jsclient.ts @@ -34,7 +34,7 @@ import type { StreamAPI, Streams, } from "./types.ts"; -import { headers } from "@nats-io/nats-core/internal"; +import { headers, RequestError } from "@nats-io/nats-core/internal"; import type { Msg, @@ -49,7 +49,6 @@ import type { JetStreamAccountStats, } from "./jsapi_types.ts"; import { DirectStreamAPIImpl } from "./jsm.ts"; -import { RequestError } from "../../core/src/mod.ts"; import { JetStreamError } from "./jserrors.ts"; export function toJetStreamClient( diff --git a/jetstream/src/jserrors.ts b/jetstream/src/jserrors.ts index 13666cab..1d547f36 100644 --- a/jetstream/src/jserrors.ts +++ b/jetstream/src/jserrors.ts @@ -14,21 +14,17 @@ */ import type { Msg } from "@nats-io/nats-core"; +import { JsHeaders } from "./types.ts"; +import type { ApiError } from "./jsapi_types.ts"; -export type ApiError = { - /** - * Status code - */ +export class JetStreamStatusError extends Error { code: number; - /** - * JetStream Error Code - */ - err_code: number; - /** - * Error description - */ - description: string; -}; + constructor(message: string, code: number, opts?: ErrorOptions) { + super(message, opts); + this.code = code; + this.name = "JetStreamStatusError"; + } +} export class JetStreamStatus { msg: Msg; @@ -39,39 +35,121 @@ export class JetStreamStatus { this._description = ""; } - get status(): number { + toError(): JetStreamStatusError { + return new JetStreamStatusError(this.description, this.code); + } + + debug() { + console.log({ + message: this.description, + status: this.code, + headers: this.msg.headers, + }); + } + + get code(): number { return this.msg.headers?.code || 0; } get description(): string { if (this._description === "") { - this._description = this.msg.headers?.description?.toLowerCase() || ""; + this._description = this.msg.headers?.description?.toLowerCase() || + "unknown"; } return this._description; } + isIdleHeartbeat(): boolean { + return this.code === 100 && this.description === "idle heartbeat"; + } + + isFlowControlRequest(): boolean { + return this.code === 100 && this.description === "flowcontrol request"; + } + + parseHeartbeat(): + | { natsLastConsumer: number; natsLastStream: number } + | null { + if (this.isIdleHeartbeat()) { + return { + natsLastConsumer: parseInt( + this.msg.headers?.get("Nats-Last-Consumer") || "0", + ), + natsLastStream: parseInt( + this.msg.headers?.get("Nats-Last-Stream") || "0", + ), + }; + } + return null; + } + + isRequestTimeout(): boolean { + return this.code === 408 && this.description === "request timeout"; + } + + parseDiscard(): { msgsLeft: number; bytesLeft: number } { + const discard = { + msgsLeft: 0, + bytesLeft: 0, + }; + const msgsLeft = this.msg.headers?.get(JsHeaders.PendingMessagesHdr); + if (msgsLeft) { + discard.msgsLeft = parseInt(msgsLeft); + } + const bytesLeft = this.msg.headers?.get(JsHeaders.PendingBytesHdr); + if (bytesLeft) { + discard.bytesLeft = parseInt(bytesLeft); + } + + return discard; + } + isBadRequest() { - return this.status === 400; + return this.code === 400; } isConsumerDeleted() { - return this.status === 409 && this.description === "consumer deleted"; + return this.code === 409 && this.description === "consumer deleted"; } isStreamDeleted(): boolean { - return this.status === 409 && this.description === "stream deleted"; + return this.code === 409 && this.description === "stream deleted"; } isIdleHeartbeatMissed(): boolean { - return this.status === 409 && this.description === "idle heartbeats missed"; + return this.code === 409 && this.description === "idle heartbeats missed"; } isMaxWaitingExceeded(): boolean { - return this.status === 409 && this.description === "exceeded maxwaiting"; + return this.code === 409 && this.description === "exceeded maxwaiting"; } isConsumerIsPushBased(): boolean { - return this.status === 409 && this.description === "consumer is push based"; + return this.code === 409 && this.description === "consumer is push based"; + } + + isExceededMaxWaiting(): boolean { + return this.code === 409 && + this.description.includes("exceeded maxwaiting"); + } + + isExceededMaxRequestBatch(): boolean { + return this.code === 409 && + this.description.includes("exceeded maxrequestbatch"); + } + + isExceededMaxExpires(): boolean { + return this.code === 409 && + this.description.includes("exceeded maxrequestexpires"); + } + + isExceededLimit(): boolean { + return this.isExceededMaxExpires() || this.isExceededMaxWaiting() || + this.isExceededMaxRequestBatch(); + } + + isMessageNotFound(): boolean { + return this.code === 404 && this.description === "message not found"; } } @@ -86,6 +164,13 @@ export enum JetStreamApiCodes { ConsumerNotFound = 10014, StreamNotFound = 10059, JetStreamNotEnabledForAccount = 10039, + StreamWrongLastSequence = 10071, + NoMessageFound = 10037, +} + +export function isMessageNotFound(err: Error): boolean { + return err instanceof JetStreamApiError && + err.code === JetStreamApiCodes.NoMessageFound; } export class JetStreamApiError extends Error { diff --git a/jetstream/src/jsm.ts b/jetstream/src/jsm.ts index 9cf213c9..2354d473 100644 --- a/jetstream/src/jsm.ts +++ b/jetstream/src/jsm.ts @@ -40,7 +40,12 @@ import type { DirectMsgRequest, LastForMsgRequest, } from "./jsapi_types.ts"; -import { checkJsError, validateStreamName } from "./jsutil.ts"; +import { validateStreamName } from "./jsutil.ts"; +import { + JetStreamApiCodes, + JetStreamApiError, + JetStreamStatus, +} from "./jserrors.ts"; export class DirectStreamAPIImpl extends BaseApiClientImpl implements DirectStreamAPI { @@ -72,11 +77,24 @@ export class DirectStreamAPIImpl extends BaseApiClientImpl payload, ); - // response is not a JS.API response - const err = checkJsError(r); - if (err) { - return Promise.reject(err); + if (r.headers?.code !== 0) { + const status = new JetStreamStatus(r); + if (status.isMessageNotFound()) { + // this so to simplify things that handle a non-existing messages + // as null (such as KV). + return Promise.reject( + new JetStreamApiError( + { + code: status.code, + err_code: JetStreamApiCodes.NoMessageFound, + description: status.description, + }, + ), + ); + } + return Promise.reject(status.toError()); } + const dm = new DirectMsgImpl(r); return Promise.resolve(dm); } diff --git a/jetstream/src/jsmsg.ts b/jetstream/src/jsmsg.ts index 64cd4b3d..1b5bc70b 100644 --- a/jetstream/src/jsmsg.ts +++ b/jetstream/src/jsmsg.ts @@ -17,7 +17,6 @@ import type { Msg, MsgHdrs, MsgImpl, - NatsError, ProtocolHandler, RequestOptions, } from "@nats-io/nats-core/internal"; @@ -255,13 +254,13 @@ export class JsMsgImpl implements JsMsg { }, ); } catch (err) { - r.cancel(err as NatsError); + r.cancel(err as Error); } try { await Promise.race([r.timer, r.deferred]); d.resolve(true); } catch (err) { - r.cancel(err as NatsError); + r.cancel(err as Error); d.reject(err); } } else { diff --git a/jetstream/src/jsutil.ts b/jetstream/src/jsutil.ts index 830babbf..03d950de 100644 --- a/jetstream/src/jsutil.ts +++ b/jetstream/src/jsutil.ts @@ -12,20 +12,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import { - Empty, - ErrorCode, - headers, - MsgImpl, - NatsError, -} from "@nats-io/nats-core/internal"; - -import type { - Msg, - MsgArg, - MsgHdrsImpl, - Publisher, -} from "@nats-io/nats-core/internal"; export function validateDurableName(name?: string) { return minValidation("durable", name); @@ -91,133 +77,3 @@ export function validName(name = ""): string { } return ""; } - -/** - * Returns true if the message is a flow control message - * @param msg - */ -export function isFlowControlMsg(msg: Msg): boolean { - if (msg.data.length > 0) { - return false; - } - const h = msg.headers; - if (!h) { - return false; - } - return h.code >= 100 && h.code < 200; -} - -/** - * Returns true if the message is a heart beat message - * @param msg - */ -export function isHeartbeatMsg(msg: Msg): boolean { - return isFlowControlMsg(msg) && msg.headers?.description === "Idle Heartbeat"; -} - -export function newJsErrorMsg( - code: number, - description: string, - subject: string, -): Msg { - const h = headers(code, description) as MsgHdrsImpl; - - const arg = { hdr: 1, sid: 0, size: 0 } as MsgArg; - const msg = new MsgImpl(arg, Empty, {} as Publisher); - msg._headers = h; - msg._subject = subject; - - return msg; -} - -export function checkJsError(msg: Msg): NatsError | null { - // JS error only if no payload - otherwise assume it is application data - if (msg.data.length !== 0) { - return null; - } - const h = msg.headers; - if (!h) { - return null; - } - return checkJsErrorCode(h.code, h.description); -} - -export enum Js409Errors { - MaxBatchExceeded = "exceeded maxrequestbatch of", - MaxExpiresExceeded = "exceeded maxrequestexpires of", - MaxBytesExceeded = "exceeded maxrequestmaxbytes of", - MaxMessageSizeExceeded = "message size exceeds maxbytes", - PushConsumer = "consumer is push based", - MaxWaitingExceeded = "exceeded maxwaiting", // not terminal - IdleHeartbeatMissed = "idle heartbeats missed", - ConsumerDeleted = "consumer deleted", - // FIXME: consumer deleted - instead of no responder (terminal error) - // leadership changed - -} - -let MAX_WAITING_FAIL = false; -export function setMaxWaitingToFail(tf: boolean) { - MAX_WAITING_FAIL = tf; -} - -export function isTerminal409(err: NatsError): boolean { - if (err.code !== ErrorCode.JetStream409) { - return false; - } - const fatal = [ - Js409Errors.MaxBatchExceeded, - Js409Errors.MaxExpiresExceeded, - Js409Errors.MaxBytesExceeded, - Js409Errors.MaxMessageSizeExceeded, - Js409Errors.PushConsumer, - Js409Errors.IdleHeartbeatMissed, - Js409Errors.ConsumerDeleted, - ]; - if (MAX_WAITING_FAIL) { - fatal.push(Js409Errors.MaxWaitingExceeded); - } - - return fatal.find((s) => { - return err.message.indexOf(s) !== -1; - }) !== undefined; -} - -export function checkJsErrorCode( - code: number, - description = "", -): NatsError | null { - if (code < 300) { - return null; - } - description = description.toLowerCase(); - switch (code) { - case 404: - // 404 for jetstream will provide different messages ensure we - // keep whatever the server returned - return new NatsError(description, ErrorCode.JetStream404NoMessages); - case 408: - return new NatsError(description, ErrorCode.JetStream408RequestTimeout); - case 409: { - // the description can be exceeded max waiting or max ack pending, which are - // recoverable, but can also be terminal errors where the request exceeds - // some value in the consumer configuration - const ec = description.startsWith(Js409Errors.IdleHeartbeatMissed) - ? ErrorCode.JetStreamIdleHeartBeat - : ErrorCode.JetStream409; - return new NatsError( - description, - ec, - ); - } - case 503: - return NatsError.errorForCode( - ErrorCode.JetStreamNotEnabled, - new Error(description), - ); - default: - if (description === "") { - description = ErrorCode.Unknown; - } - return new NatsError(description, `${code}`); - } -} diff --git a/jetstream/src/mod.ts b/jetstream/src/mod.ts index 50ecb368..3bcb2d7e 100644 --- a/jetstream/src/mod.ts +++ b/jetstream/src/mod.ts @@ -12,13 +12,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -export { - checkJsError, - isFlowControlMsg, - isHeartbeatMsg, - jetstream, - jetstreamManager, -} from "./internal_mod.ts"; +export { jetstream, jetstreamManager } from "./internal_mod.ts"; export { AckPolicy, @@ -30,6 +24,9 @@ export { DiscardPolicy, isPullConsumer, isPushConsumer, + JetStreamApiCodes, + JetStreamApiError, + JetStreamError, JsHeaders, ReplayPolicy, RepublishHeaders, diff --git a/jetstream/src/pushconsumer.ts b/jetstream/src/pushconsumer.ts index 985cce0e..5d177c8e 100644 --- a/jetstream/src/pushconsumer.ts +++ b/jetstream/src/pushconsumer.ts @@ -1,3 +1,18 @@ +/* + * Copyright 2024 Synadia Communications, Inc + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + import { toJsMsg } from "./jsmsg.ts"; import type { JsMsg } from "./jsmsg.ts"; import { AckPolicy, DeliverPolicy } from "./jsapi_types.ts"; @@ -21,7 +36,6 @@ import { IdleHeartbeatMonitor, millis, nanos, - NatsError, nuid, QueuedIteratorImpl, } from "@nats-io/nats-core/internal"; @@ -32,7 +46,7 @@ import type { Status, Subscription, } from "@nats-io/nats-core/internal"; -import { isFlowControlMsg, isHeartbeatMsg } from "./mod.ts"; +import { JetStreamStatus } from "./jserrors.ts"; export class PushConsumerMessagesImpl extends QueuedIteratorImpl implements ConsumerMessages { @@ -268,7 +282,8 @@ export class PushConsumerMessagesImpl extends QueuedIteratorImpl const isProtocol = msg.subject === subject; if (isProtocol) { - if (isHeartbeatMsg(msg)) { + const status = new JetStreamStatus(msg); + if (status.isIdleHeartbeat()) { const natsLastConsumer = msg.headers?.get("Nats-Last-Consumer"); const natsLastStream = msg.headers?.get("Nats-Last-Stream"); this.notify(ConsumerDebugEvents.Heartbeat, { @@ -277,7 +292,8 @@ export class PushConsumerMessagesImpl extends QueuedIteratorImpl }); return; } - if (isFlowControlMsg(msg)) { + if (status.isFlowControlRequest()) { + status.debug(); this._push(() => { msg.respond(); this.notify(ConsumerDebugEvents.FlowControl, null); @@ -285,11 +301,10 @@ export class PushConsumerMessagesImpl extends QueuedIteratorImpl return; } - const code = msg.headers?.code; - const description = msg.headers?.description?.toLowerCase() || - "unknown"; + const code = status.code; + const description = status.description; - if (code === 409 && description === "consumer deleted") { + if (status.isConsumerDeleted()) { this.notify( ConsumerEvents.ConsumerDeleted, `${code} ${description}`, @@ -297,8 +312,7 @@ export class PushConsumerMessagesImpl extends QueuedIteratorImpl } if (this.abortOnMissingResource) { this._push(() => { - const error = new NatsError(description, `${code}`); - this.stop(error); + this.stop(status.toError()); }); return; } diff --git a/jetstream/src/types.ts b/jetstream/src/types.ts index 97521133..957cf574 100644 --- a/jetstream/src/types.ts +++ b/jetstream/src/types.ts @@ -544,7 +544,7 @@ export enum ConsumerDebugEvents { /** * Notifies that the client received a server-side heartbeat. The payload the data - * portion has the format `{natsLastConsumer: string, natsLastStream: string}`; + * portion has the format `{natsLastConsumer: number, natsLastStream: number}`; */ Heartbeat = "heartbeat", @@ -659,7 +659,7 @@ export function isPushConsumer(v: PushConsumer | Consumer): v is PushConsumer { export interface JetStreamClient { /** * Publishes a message to a stream. If not stream is configured to store the message, the - * request will fail with {@link ErrorCode.NoResponders} error. + * request will fail with RequestError error with a nested NoRespondersError. * * @param subj - the subject for the message * @param payload - the message's data @@ -786,7 +786,10 @@ export interface DirectStreamAPI { * @param stream * @param query */ - getMessage(stream: string, query: DirectMsgRequest): Promise; + getMessage( + stream: string, + query: DirectMsgRequest, + ): Promise; /** * Retrieves all last subject messages for the specified subjects diff --git a/jetstream/tests/jsm_test.ts b/jetstream/tests/jsm_test.ts index 44b06f91..f7a7b1e9 100644 --- a/jetstream/tests/jsm_test.ts +++ b/jetstream/tests/jsm_test.ts @@ -1205,18 +1205,23 @@ Deno.test("jsm - direct getMessage", async () => { await js.publish("foo", "e", { expect: { lastSequence: 4 } }); let m = await jsm.direct.getMessage("A", { seq: 0, next_by_subj: "bar" }); + assertExists(m); assertEquals(m.seq, 4); m = await jsm.direct.getMessage("A", { last_by_subj: "foo" }); + assertExists(m); assertEquals(m.seq, 5); m = await jsm.direct.getMessage("A", { seq: 0, next_by_subj: "foo" }); + assertExists(m); assertEquals(m.seq, 1); m = await jsm.direct.getMessage("A", { seq: 4, next_by_subj: "foo" }); + assertExists(m); assertEquals(m.seq, 5); m = await jsm.direct.getMessage("A", { seq: 2, next_by_subj: "foo" }); + assertExists(m); assertEquals(m.seq, 2); await cleanup(ns, nc); @@ -1951,11 +1956,13 @@ Deno.test("jsm - direct msg decode", async () => { await js.publish("a.a", "hello"); await js.publish("a.a", JSON.stringify({ one: "two", a: [1, 2, 3] })); - assertEquals( - (await jsm.direct.getMessage(name, { seq: 1 })).string(), - "hello", - ); - assertEquals((await jsm.direct.getMessage(name, { seq: 2 })).json(), { + let m = await jsm.direct.getMessage(name, { seq: 1 }); + assertExists(m); + assertEquals(m.string(), "hello"); + + m = await jsm.direct.getMessage(name, { seq: 2 }); + assertExists(m); + assertEquals(m.json(), { one: "two", a: [1, 2, 3], }); diff --git a/kv/deno.json b/kv/deno.json index dbbccd06..e25a8695 100644 --- a/kv/deno.json +++ b/kv/deno.json @@ -1,6 +1,6 @@ { "name": "@nats-io/kv", - "version": "3.0.0-12", + "version": "3.0.0-13", "exports": { ".": "./src/mod.ts", "./internal": "./src/internal_mod.ts" @@ -33,7 +33,7 @@ "test": "deno test -A --parallel --reload --quiet tests/ --import-map=import_map.json" }, "imports": { - "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-30", - "@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-15" + "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-31", + "@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-18" } } diff --git a/kv/import_map.json b/kv/import_map.json index b0d4b9ee..6156b24b 100644 --- a/kv/import_map.json +++ b/kv/import_map.json @@ -1,9 +1,9 @@ { "imports": { - "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-30", - "@nats-io/nats-core/internal": "jsr:@nats-io/nats-core@~3.0.0-30/internal", - "@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-15", - "@nats-io/jetstream/internal": "jsr:@nats-io/jetstream@~3.0.0-15/internal", + "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-31", + "@nats-io/nats-core/internal": "jsr:@nats-io/nats-core@~3.0.0-31/internal", + "@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-18", + "@nats-io/jetstream/internal": "jsr:@nats-io/jetstream@~3.0.0-18/internal", "test_helpers": "../test_helpers/mod.ts", "@nats-io/nkeys": "jsr:@nats-io/nkeys@1.2.0-4", "@nats-io/nuid": "jsr:@nats-io/nuid@2.0.1-2", diff --git a/kv/package.json b/kv/package.json index 2c9c691a..7c1a3457 100644 --- a/kv/package.json +++ b/kv/package.json @@ -1,6 +1,6 @@ { "name": "@nats-io/kv", - "version": "3.0.0-12", + "version": "3.0.0-13", "files": [ "lib/", "LICENSE", @@ -34,8 +34,8 @@ }, "description": "kv library - this library implements all the base functionality for NATS KV javascript clients", "dependencies": { - "@nats-io/jetstream": "~3.0.0-15", - "@nats-io/nats-core": "~3.0.0-30" + "@nats-io/jetstream": "~3.0.0-18", + "@nats-io/nats-core": "~3.0.0-31" }, "devDependencies": { "@types/node": "^22.7.6", diff --git a/kv/src/kv.ts b/kv/src/kv.ts index 4f1839d2..9878a96c 100644 --- a/kv/src/kv.ts +++ b/kv/src/kv.ts @@ -16,7 +16,6 @@ import { compare, Empty, - ErrorCode, Feature, headers, millis, @@ -30,7 +29,6 @@ import type { MsgHdrs, NatsConnection, NatsConnectionImpl, - NatsError, Payload, QueuedIterator, } from "@nats-io/nats-core/internal"; @@ -39,6 +37,8 @@ import { AckPolicy, DeliverPolicy, DiscardPolicy, + JetStreamApiCodes, + JetStreamApiError, JsHeaders, ListerImpl, PubHeaders, @@ -567,8 +567,11 @@ export class Bucket implements KV, KvRemove { return Promise.resolve(n); } catch (err) { firstErr = err; - if ((err as NatsError)?.api_error?.err_code !== 10071) { - return Promise.reject(err); + if (err instanceof JetStreamApiError) { + const jserr = err as JetStreamApiError; + if (jserr.code !== JetStreamApiCodes.StreamWrongLastSequence) { + return Promise.reject(err); + } } } let rev = 0; @@ -610,12 +613,6 @@ export class Bucket implements KV, KvRemove { const pa = await this.js.publish(this.subjectForKey(ek, true), data, o); return pa.seq; } catch (err) { - const ne = err as NatsError; - if (ne.isJetStreamError()) { - ne.message = ne.api_error?.description!; - ne.code = `${ne.api_error?.code!}`; - return Promise.reject(ne); - } return Promise.reject(err); } } @@ -647,10 +644,11 @@ export class Bucket implements KV, KvRemove { } return ke; } catch (err) { - if ( - (err as NatsError).code === ErrorCode.JetStream404NoMessages - ) { - return null; + if (err instanceof JetStreamApiError) { + const jserr = err as JetStreamApiError; + if (jserr.code === JetStreamApiCodes.NoMessageFound) { + return null; + } } throw err; } diff --git a/kv/tests/kv_test.ts b/kv/tests/kv_test.ts index c004636b..d84962f7 100644 --- a/kv/tests/kv_test.ts +++ b/kv/tests/kv_test.ts @@ -1635,8 +1635,8 @@ Deno.test("kv - create after delete", async () => { const kv = await new Kvm(js).create("K"); await kv.create("a", Empty); - await assertRejects(async () => { - await kv.create("a", Empty); + await assertRejects(() => { + return kv.create("a", Empty); }); await kv.delete("a"); await kv.create("a", Empty); @@ -1645,6 +1645,23 @@ Deno.test("kv - create after delete", async () => { await cleanup(ns, nc); }); +Deno.test("kv - get non-existing non-direct", async () => { + const { ns, nc } = await _setup(connect, jetstreamServerConf({})); + const js = jetstream(nc); + const kv = await new Kvm(js).create("K", { allow_direct: false }); + const v = await kv.get("hello"); + assertEquals(v, null); + await cleanup(ns, nc); +}); + +Deno.test("kv - get non-existing direct", async () => { + const { ns, nc } = await _setup(connect, jetstreamServerConf({})); + const js = jetstream(nc); + const kv = await new Kvm(js).create("K", { allow_direct: true }); + assertEquals(await kv.get("hello"), null); + await cleanup(ns, nc); +}); + Deno.test("kv - string payloads", async () => { const { ns, nc } = await _setup(connect, jetstreamServerConf({})); diff --git a/migration.md b/migration.md index 0dbefe9c..f678a9e8 100644 --- a/migration.md +++ b/migration.md @@ -84,6 +84,11 @@ these modules for cross-runtime consumption. string or JSON use `string()` and `json()` methods. For publishing JSON payloads, simply specify the output of `JSON.stringify()` to the publish operation. +- NatsError was removed in favor of more descriptive types. For example, if you + make a request, the request could fail with a RequestError. The RequestError + in turn will contain the cause `TimeoutError` or a `NoRespondersError`. This + also means that in typescript, the callback signature has been relaxed to just + `(Error, Msg)=>void`. For more information see the JsDoc for the client. ## Changes in JetStream @@ -109,6 +114,10 @@ To use JetStream, you must install and import `@nats/jetstream`. `isConsumerOptsBuilder()` have been removed. Along side of it `ConsumerOptsBuilder` which was used by `subscribe()` and `pullSubscribe()` type has been removed. +- JetStream errors are now expressed by the type `JetStreamError` and + `JetStreamAPIError`. For API calls where the server could return an error, + these are `JetStreamAPIError` and contain all the information returned by the + server. ## Changes to KV diff --git a/obj/deno.json b/obj/deno.json index 42d2e23f..3c2c04cf 100644 --- a/obj/deno.json +++ b/obj/deno.json @@ -1,6 +1,6 @@ { "name": "@nats-io/obj", - "version": "3.0.0-12", + "version": "3.0.0-13", "exports": { ".": "./src/mod.ts", "./internal": "./src/internal_mod.ts" @@ -33,7 +33,7 @@ "test": "deno test -A --parallel --reload --quiet tests/ --import-map=import_map.json" }, "imports": { - "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-30", - "@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-15" + "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-31", + "@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-18" } } diff --git a/obj/import_map.json b/obj/import_map.json index b0d4b9ee..6156b24b 100644 --- a/obj/import_map.json +++ b/obj/import_map.json @@ -1,9 +1,9 @@ { "imports": { - "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-30", - "@nats-io/nats-core/internal": "jsr:@nats-io/nats-core@~3.0.0-30/internal", - "@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-15", - "@nats-io/jetstream/internal": "jsr:@nats-io/jetstream@~3.0.0-15/internal", + "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-31", + "@nats-io/nats-core/internal": "jsr:@nats-io/nats-core@~3.0.0-31/internal", + "@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-18", + "@nats-io/jetstream/internal": "jsr:@nats-io/jetstream@~3.0.0-18/internal", "test_helpers": "../test_helpers/mod.ts", "@nats-io/nkeys": "jsr:@nats-io/nkeys@1.2.0-4", "@nats-io/nuid": "jsr:@nats-io/nuid@2.0.1-2", diff --git a/obj/package.json b/obj/package.json index 8d228e52..1c32b7dc 100644 --- a/obj/package.json +++ b/obj/package.json @@ -1,6 +1,6 @@ { "name": "@nats-io/obj", - "version": "3.0.0-12", + "version": "3.0.0-13", "files": [ "lib/", "LICENSE", @@ -34,8 +34,8 @@ }, "description": "obj library - this library implements all the base functionality for NATS objectstore for javascript clients", "dependencies": { - "@nats-io/jetstream": "~3.0.0-15", - "@nats-io/nats-core": "~3.0.0-30" + "@nats-io/jetstream": "~3.0.0-18", + "@nats-io/nats-core": "~3.0.0-31" }, "devDependencies": { "@types/node": "^22.7.6", diff --git a/obj/src/objectstore.ts b/obj/src/objectstore.ts index f6c37c3b..29b19194 100644 --- a/obj/src/objectstore.ts +++ b/obj/src/objectstore.ts @@ -16,7 +16,6 @@ import type { MsgHdrs, NatsConnection, - NatsError, QueuedIterator, } from "@nats-io/nats-core/internal"; import { @@ -50,6 +49,9 @@ import type { import { DeliverPolicy, DiscardPolicy, + isMessageNotFound, + JetStreamApiCodes, + JetStreamApiError, JsHeaders, ListerImpl, PubHeaders, @@ -364,7 +366,10 @@ export class ObjectStoreImpl implements ObjectStore { soi.revision = m.seq; return soi; } catch (err) { - if ((err as NatsError).code === "404") { + if ( + err instanceof JetStreamApiError && + err.code === JetStreamApiCodes.NoMessageFound + ) { return null; } return Promise.reject(err); @@ -377,8 +382,10 @@ export class ObjectStoreImpl implements ObjectStore { try { return await this.jsm.streams.info(this.stream, opts); } catch (err) { - const nerr = err as NatsError; - if (nerr.code === "404") { + if ( + err instanceof JetStreamApiError && + err.code === JetStreamApiCodes.StreamNotFound + ) { return null; } return Promise.reject(err); @@ -808,7 +815,7 @@ export class ObjectStoreImpl implements ObjectStore { try { await this.jsm.streams.getMessage(this.stream, { last_by_subj: subj }); } catch (err) { - if ((err as NatsError).code !== "404") { + if (!isMessageNotFound(err as Error)) { qi.stop(err as Error); } } diff --git a/services/deno.json b/services/deno.json index ee58151d..b6165eae 100644 --- a/services/deno.json +++ b/services/deno.json @@ -1,6 +1,6 @@ { "name": "@nats-io/services", - "version": "3.0.0-9", + "version": "3.0.0-10", "exports": { ".": "./src/mod.ts", "./internal": "./src/internal_mod.ts" @@ -33,6 +33,6 @@ "test": "deno test -A --parallel --reload --quiet tests/ --import-map=import_map.json" }, "imports": { - "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-30" + "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-31" } } diff --git a/services/import_map.json b/services/import_map.json index 0c5e9ca1..10398a3b 100644 --- a/services/import_map.json +++ b/services/import_map.json @@ -1,7 +1,7 @@ { "imports": { - "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-30", - "@nats-io/nats-core/internal": "jsr:@nats-io/nats-core@~3.0.0-30/internal", + "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-31", + "@nats-io/nats-core/internal": "jsr:@nats-io/nats-core@~3.0.0-31/internal", "test_helpers": "../test_helpers/mod.ts", "@nats-io/nkeys": "jsr:@nats-io/nkeys@1.2.0-4", "@nats-io/nuid": "jsr:@nats-io/nuid@2.0.1-2", diff --git a/services/package.json b/services/package.json index d27e0f39..49e51557 100644 --- a/services/package.json +++ b/services/package.json @@ -1,6 +1,6 @@ { "name": "@nats-io/services", - "version": "3.0.0-9", + "version": "3.0.0-10", "files": [ "lib/", "LICENSE", @@ -34,7 +34,7 @@ }, "description": "services library - this library implements all the base functionality for NATS services for javascript clients", "dependencies": { - "@nats-io/nats-core": "~3.0.0-30" + "@nats-io/nats-core": "~3.0.0-31" }, "devDependencies": { "@types/node": "^22.7.6", diff --git a/services/src/service.ts b/services/src/service.ts index 58251812..ea6e0fc5 100644 --- a/services/src/service.ts +++ b/services/src/service.ts @@ -27,7 +27,6 @@ import type { MsgHdrs, Nanos, NatsConnection, - NatsError, Payload, PublishOptions, QueuedIterator, @@ -458,7 +457,7 @@ export class ServiceImpl implements Service { addInternalHandler( verb: ServiceVerb, - handler: (err: NatsError | null, msg: Msg) => Promise, + handler: (err: Error | null, msg: Msg) => Promise, ) { const v = `${verb}`.toUpperCase(); this._doAddInternalHandler(`${v}-all`, verb, handler); @@ -475,7 +474,7 @@ export class ServiceImpl implements Service { _doAddInternalHandler( name: string, verb: ServiceVerb, - handler: (err: NatsError | null, msg: Msg) => Promise, + handler: (err: Error | null, msg: Msg) => Promise, kind = "", id = "", ) { diff --git a/services/src/types.ts b/services/src/types.ts index c3c777d3..ae9cbb7e 100644 --- a/services/src/types.ts +++ b/services/src/types.ts @@ -1,7 +1,21 @@ +/* + * Copyright 2024 Synadia Communications, Inc + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + import type { Msg, Nanos, - NatsError, Payload, PublishOptions, QueuedIterator, @@ -16,7 +30,7 @@ export interface ServiceMsg extends Msg { ): boolean; } -export type ServiceHandler = (err: NatsError | null, msg: ServiceMsg) => void; +export type ServiceHandler = (err: Error | null, msg: ServiceMsg) => void; /** * A service Endpoint */ diff --git a/transport-deno/deno.json b/transport-deno/deno.json index b7aaef40..8180ccdd 100644 --- a/transport-deno/deno.json +++ b/transport-deno/deno.json @@ -1,6 +1,6 @@ { "name": "@nats-io/transport-deno", - "version": "3.0.0-6", + "version": "3.0.0-7", "exports": { ".": "./src/mod.ts" }, @@ -20,7 +20,7 @@ }, "imports": { "@std/io": "jsr:@std/io@0.224.0", - "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-30", + "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-31", "@nats-io/nkeys": "jsr:@nats-io/nkeys@1.2.0-4", "@nats-io/nuid": "jsr:@nats-io/nuid@2.0.1-2" } diff --git a/transport-deno/src/version.ts b/transport-deno/src/version.ts index 9386be88..0ebfddcf 100644 --- a/transport-deno/src/version.ts +++ b/transport-deno/src/version.ts @@ -1,2 +1,17 @@ +/* + * Copyright 2024 Synadia Communications, Inc + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + // This file is generated - do not edit -export const version = "3.0.0-6"; +export const version = "3.0.0-7"; diff --git a/transport-node/package.json b/transport-node/package.json index 8bb3a98e..e3bc8163 100644 --- a/transport-node/package.json +++ b/transport-node/package.json @@ -1,6 +1,6 @@ { "name": "@nats-io/transport-node", - "version": "3.0.0-15", + "version": "3.0.0-16", "description": "Node.js client for NATS, a lightweight, high-performance cloud native messaging system", "keywords": [ "nats", @@ -54,7 +54,7 @@ "node": ">= 18.0.0" }, "dependencies": { - "@nats-io/nats-core": "~3.0.0-30", + "@nats-io/nats-core": "~3.0.0-31", "@nats-io/nkeys": "~1.2.0-7", "@nats-io/nuid": "^2.0.1-2" }, diff --git a/transport-node/src/node_transport.ts b/transport-node/src/node_transport.ts index 0b292563..1d8a8d14 100644 --- a/transport-node/src/node_transport.ts +++ b/transport-node/src/node_transport.ts @@ -15,14 +15,14 @@ import type { ConnectionOptions } from "./nats-base-client"; import { checkOptions, + ConnectionError, DataBuffer, Deferred, deferred, - ErrorCode, extend, extractProtocolMessage, INFO, - NatsError, + InvalidOptionError, render, ServerInfo, Transport, @@ -88,7 +88,10 @@ export class NodeTransport implements Transport { //@ts-ignore: this is possibly a TlsSocket if (tlsRequired && this.socket.encrypted !== true) { - throw new NatsError("tls", ErrorCode.ServerOptionNotAvailable); + throw InvalidOptionError.illegalOption( + "tls", + "is not available on this server", + ); } this.connected = true; @@ -101,14 +104,14 @@ export class NodeTransport implements Transport { // this seems to be possible in Kubernetes // where an error is thrown, but it is undefined // when something like istio-init is booting up - err = NatsError.errorForCode( - ErrorCode.ConnectionRefused, - new Error("node provided an undefined error!"), + err = new ConnectionError( + "error connecting - node provided an undefined error", ); } + // @ts-ignore: node error const { code } = err; const perr = code === "ECONNREFUSED" - ? NatsError.errorForCode(ErrorCode.ConnectionRefused, err) + ? new ConnectionError("connection refused", { cause: err }) : err; this.socket?.destroy(); throw perr; @@ -248,7 +251,7 @@ export class NodeTransport implements Transport { const certOpts = await this.loadClientCerts() || {}; tlsOpts = extend(tlsOpts, this.options.tls, certOpts); } catch (err) { - return Promise.reject(new NatsError(err.message, ErrorCode.Tls, err)); + return Promise.reject(new ConnectionError(err.message, { cause: err })); } } const d = deferred(); @@ -277,7 +280,7 @@ export class NodeTransport implements Transport { tlsSocket.setNoDelay(true); } catch (err) { // tls throws errors on bad certs see nats.js#310 - d.reject(NatsError.errorForCode(ErrorCode.Tls, err)); + d.reject(new ConnectionError(err.message, { cause: err })); } return d; } @@ -294,7 +297,11 @@ export class NodeTransport implements Transport { const certOpts = await this.loadClientCerts() || {}; tlsOpts = extend(tlsOpts, this.options.tls, certOpts); } catch (err) { - return Promise.reject(new NatsError(err.message, ErrorCode.Tls, err)); + return Promise.reject( + new ConnectionError((err as Error).message, { + cause: err, + }), + ); } } const d = deferred(); @@ -321,7 +328,7 @@ export class NodeTransport implements Transport { }); } catch (err) { // tls throws errors on bad certs see nats.js#310 - d.reject(NatsError.errorForCode(ErrorCode.Tls, err)); + d.reject(new ConnectionError(err.message, { cause: err })); } return d; } diff --git a/transport-node/src/version.ts b/transport-node/src/version.ts index 7e38d46f..a6f62f28 100644 --- a/transport-node/src/version.ts +++ b/transport-node/src/version.ts @@ -1,2 +1,17 @@ +/* + * Copyright 2024 Synadia Communications, Inc + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + // This file is generated - do not edit -export const version = "3.0.0-15"; +export const version = "3.0.0-16"; diff --git a/transport-node/tests/basics_test.js b/transport-node/tests/basics_test.js index d2a7c02e..38e8569a 100644 --- a/transport-node/tests/basics_test.js +++ b/transport-node/tests/basics_test.js @@ -17,7 +17,6 @@ const assert = require("node:assert").strict; const { connect, ConnectionError, - ErrorCode, createInbox, } = require( "../lib/mod", @@ -30,6 +29,7 @@ const { const { Lock } = require("./helpers/lock"); const { NatsServer } = require("./helpers/launcher"); const { jetstreamServerConf } = require("./helpers/jsutil.js"); +const { ProtocolError } = require("@nats-io/nats-core"); const u = "demo.nats.io:4222"; @@ -139,7 +139,7 @@ describe( const closed = nc.closed(); await ns.stop(); const err = await closed; - assert(err instanceof ClosedConnectionError); + assert(err instanceof ConnectionError); assert.equal(err.message, "connection refused"); }); @@ -150,7 +150,7 @@ describe( nc.protocol.sendCommand("X\r\n"); }); const err = await nc.closed(); - assert.equal(err?.code, ErrorCode.ProtocolError); + assert(err instanceof ProtocolError); await ns.stop(); }); diff --git a/transport-node/tests/tls_test.js b/transport-node/tests/tls_test.js index 5b0e0c5c..ff1a8b50 100644 --- a/transport-node/tests/tls_test.js +++ b/transport-node/tests/tls_test.js @@ -16,7 +16,6 @@ const { describe, it } = require("node:test"); const assert = require("node:assert").strict; const { connect, - ErrorCode, } = require( "../index", ); @@ -47,7 +46,7 @@ describe("tls", { timeout: 20_000, concurrency: true, forceExit: true }, () => { assert.fail("shouldn't have connected"); }) .catch((err) => { - assert.equal(err.code, ErrorCode.ServerOptionNotAvailable); + assert.equal(err.message, "option 'tls' is not available"); lock.unlock(); }); await lock; @@ -189,9 +188,7 @@ describe("tls", { timeout: 20_000, concurrency: true, forceExit: true }, () => { await assert.rejects(() => { return connect({ servers: `localhost:${ns.port}`, tls: conf }); }, (err) => { - assert.equal(err.code, ErrorCode.Tls); - assert.ok(err.chainedError); - assert.ok(re.exec(err.chainedError.message)); + assert.ok(re.exec(err.message)); return true; }); await ns.stop(); @@ -233,7 +230,6 @@ describe("tls", { timeout: 20_000, concurrency: true, forceExit: true }, () => { await connect({ servers: `localhost:${ns.port}`, tls: conf }); assert.fail("shouldn't have connected"); } catch (err) { - assert.equal(err.code, ErrorCode.Tls); const v = conf[arg]; assert.equal(err.message, `${v} doesn't exist`); }