From 2ac7ea9da06c9c3f3f418086b75bb1a2e2065359 Mon Sep 17 00:00:00 2001 From: Alberto Ricart Date: Thu, 24 Oct 2024 12:24:51 -0500 Subject: [PATCH] wip - removing NatsError in favor of easier more generic APIs and testing with instanceof. Signed-off-by: Alberto Ricart --- TODO.md | 2 + core/README.md | 2 +- core/src/authenticator.ts | 8 +- core/src/bench.ts | 8 +- core/src/core.ts | 43 +-- core/src/errors.ts | 195 ++++++++++++++ core/src/headers.ts | 13 +- core/src/internal_mod.ts | 15 ++ core/src/mod.ts | 12 + core/src/msg.ts | 11 - core/src/muxsubscription.ts | 50 ++-- core/src/nats.ts | 99 ++++--- core/src/options.ts | 33 +-- core/src/protocol.ts | 217 +++++++-------- core/src/queued_iterator.ts | 8 +- core/src/request.ts | 21 +- core/src/util.ts | 6 +- core/src/ws_transport.ts | 13 +- core/tests/auth_test.ts | 288 ++++++++++---------- core/tests/authenticator_test.ts | 12 +- core/tests/autounsub_test.ts | 50 +++- core/tests/basics_test.ts | 308 +++++++++++----------- core/tests/drain_test.ts | 130 ++++----- core/tests/headers_test.ts | 22 +- core/tests/iterators_test.ts | 34 +-- core/tests/json_test.ts | 4 +- core/tests/mrequest_test.ts | 49 ++-- core/tests/noresponders_test.ts | 68 ----- core/tests/protocol_test.ts | 21 +- core/tests/reconnect_test.ts | 42 +-- core/tests/tls_test.ts | 52 ++-- core/tests/token_test.ts | 34 +-- core/tests/ws_test.ts | 12 +- jetstream/examples/util.ts | 4 +- jetstream/src/jsapi_types.ts | 15 ++ jetstream/src/jsbaseclient_api.ts | 11 +- jetstream/src/jsclient.ts | 27 +- jetstream/src/jserrors.ts | 111 ++++++++ jetstream/src/jsmsg.ts | 4 +- jetstream/tests/consume_test.ts | 5 +- jetstream/tests/consumers_ordered_test.ts | 4 +- jetstream/tests/jetstream_test.ts | 18 +- jetstream/tests/jsm_test.ts | 57 ++-- jetstream/tests/jsmsg_test.ts | 21 +- jetstream/tests/util.ts | 36 ++- services/src/service.ts | 2 +- services/tests/service_test.ts | 28 +- test_helpers/asserts.ts | 46 +--- test_helpers/mod.ts | 3 - transport-deno/src/deno_transport.ts | 11 +- transport-node/tests/basics_test.js | 7 +- transport-node/tests/reconnect_test.js | 5 +- 52 files changed, 1254 insertions(+), 1043 deletions(-) create mode 100644 core/src/errors.ts delete mode 100644 core/tests/noresponders_test.ts create mode 100644 jetstream/src/jserrors.ts diff --git a/TODO.md b/TODO.md index 6c6f0f4c..d7a9b673 100644 --- a/TODO.md +++ b/TODO.md @@ -9,3 +9,5 @@ - headers_only is needed in Consumer - add a test for next/fetch/consume where message size smaller than availablle + +- doc diff --git a/core/README.md b/core/README.md index 7061e021..d6068611 100644 --- a/core/README.md +++ b/core/README.md @@ -701,7 +701,7 @@ const sub = nc.subscribe("hello", { timeout: 1000 }); // handle the messages } })().catch((err) => { - if (err.code === ErrorCode.Timeout) { + if (err instanceof TimeoutError) { console.log(`sub timed out!`); } else { console.log(`sub iterator got an error!`); diff --git a/core/src/authenticator.ts b/core/src/authenticator.ts index 9df7ee41..955d755c 100644 --- a/core/src/authenticator.ts +++ b/core/src/authenticator.ts @@ -23,7 +23,6 @@ import type { TokenAuth, UserPass, } from "./core.ts"; -import { ErrorCode, NatsError } from "./core.ts"; export function multiAuthenticator(authenticators: Authenticator[]) { return (nonce?: string): Auth => { @@ -134,16 +133,13 @@ export function credsAuthenticator( // get the JWT let m = CREDS.exec(s); if (!m) { - throw NatsError.errorForCode(ErrorCode.BadCreds); + throw new Error("unable to parse credentials"); } const jwt = m[1].trim(); // get the nkey m = CREDS.exec(s); if (!m) { - throw NatsError.errorForCode(ErrorCode.BadCreds); - } - if (!m) { - throw NatsError.errorForCode(ErrorCode.BadCreds); + throw new Error("unable to parse credentials"); } const seed = TE.encode(m[1].trim()); diff --git a/core/src/bench.ts b/core/src/bench.ts index de15d34c..2cbe5a96 100644 --- a/core/src/bench.ts +++ b/core/src/bench.ts @@ -17,7 +17,6 @@ import { Empty } from "./types.ts"; import { nuid } from "./nuid.ts"; import { deferred, Perf } from "./util.ts"; import type { NatsConnectionImpl } from "./nats.ts"; -import { ErrorCode, NatsError } from "./core.ts"; import type { NatsConnection } from "./core.ts"; export class Metric { @@ -132,10 +131,9 @@ export class Bench { this.nc.closed() .then((err) => { if (err) { - throw new NatsError( - `bench closed with an error: ${err.message}`, - ErrorCode.Unknown, - err, + throw new Error( + `bench closed with an error`, + { cause: err }, ); } }); diff --git a/core/src/core.ts b/core/src/core.ts index 4878511b..1a1ae41e 100644 --- a/core/src/core.ts +++ b/core/src/core.ts @@ -14,6 +14,7 @@ */ import { nuid } from "./nuid.ts"; +import { InvalidOptionError } from "./errors.ts"; /** * Events reported by the {@link NatsConnection#status} iterator. @@ -45,7 +46,7 @@ export enum DebugEvents { export enum ErrorCode { // emitted by the client - ApiError = "BAD API", + // ApiError = "BAD API", BadAuthentication = "BAD_AUTHENTICATION", BadCreds = "BAD_CREDS", BadHeader = "BAD_HEADER", @@ -95,21 +96,6 @@ export function isNatsError(err: NatsError | Error): err is NatsError { return typeof (err as NatsError).code === "string"; } -export interface ApiError { - /** - * HTTP like error code in the 300 to 500 range - */ - code: number; - /** - * A human friendly description of the error - */ - description: string; - /** - * The NATS error code unique to each kind of error - */ - err_code?: number; -} - export class Messages { messages: Map; @@ -138,6 +124,21 @@ export class Messages { // safari doesn't support static class members const messages: Messages = new Messages(); +export type ApiError = { + /** + * Status code + */ + code: number; + /** + * JetStream Error Code + */ + err_code: number; + /** + * Error description + */ + description: string; +}; + export class NatsError extends Error { // TODO: on major version this should change to a number/enum code: string; @@ -193,7 +194,7 @@ export class NatsError extends Error { } } -export type MsgCallback = (err: NatsError | null, msg: T) => void; +export type MsgCallback = (err: Error | null, msg: T) => void; /** * Subscription Options @@ -232,6 +233,7 @@ export interface DnsResolveFn { export interface Status { type: Events | DebugEvents; data: string | ServersChanged | number; + error?: Error; permissionContext?: { operation: string; subject: string }; } @@ -811,7 +813,10 @@ export function createInbox(prefix = ""): string { prefix.split(".") .forEach((v) => { if (v === "*" || v === ">") { - throw new Error(`inbox prefixes cannot have wildcards '${prefix}'`); + throw InvalidOptionError.illegalArgument( + "prefix", + `cannot have wildcards ('${prefix}')`, + ); } }); return `${prefix}.${nuid.next()}`; @@ -824,7 +829,7 @@ export interface Request { resolver(err: Error | null, msg: Msg): void; - cancel(err?: NatsError): void; + cancel(err?: Error): void; } /** diff --git a/core/src/errors.ts b/core/src/errors.ts new file mode 100644 index 00000000..be019e36 --- /dev/null +++ b/core/src/errors.ts @@ -0,0 +1,195 @@ +/* + * Copyright 2024 Synadia Communications, Inc + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +export class InvalidSubjectError extends Error { + constructor(subject: string, options?: ErrorOptions) { + super(`illegal subject: '${subject}'`, options); + this.name = "InvalidSubjectError"; + } +} + +export class InvalidHeaderError extends Error { + constructor(message: string, options?: ErrorOptions) { + super(`invalid header: ${message}`, options); + this.name = "InvalidHeaderError"; + } +} + +export class InvalidOptionError extends Error { + constructor(message: string, options?: ErrorOptions) { + super(message, options); + this.name = "InvalidOptionError"; + } + + static exclusiveOptions(opts: string[]): InvalidOptionError { + const names = opts.map((o) => `'${o}'`).join(","); + return new InvalidOptionError(`options ${names} are mutually exclusive.`); + } + + static illegalArgument(name: string, message: string): InvalidOptionError { + return new InvalidOptionError(`argument '${name}' ${message}`); + } + + static illegalOption(prop: string, message: string): InvalidOptionError { + return new InvalidOptionError(`option '${prop}' ${message}`); + } +} + +export class InvalidOperationError extends Error { + constructor(message: string, options?: ErrorOptions) { + super(message, options); + this.name = "InvalidOperationError"; + } +} + +export class UserAuthenticationExpiredError extends Error { + constructor(message: string, options?: ErrorOptions) { + super(message, options); + this.name = "UserAuthenticationExpiredError"; + } + + static parse(s: string): UserAuthenticationExpiredError | null { + const ss = s.toLowerCase(); + if (ss.indexOf("user authentication expired") !== -1) { + return new UserAuthenticationExpiredError(s); + } + return null; + } +} + +export class AuthorizationError extends Error { + constructor(message: string, options?: ErrorOptions) { + super(message, options); + this.name = "AuthorizationError"; + } + + static parse(s: string): AuthorizationError | null { + const messages = [ + "authorization violation", + "account authentication expired", + "authentication timeout", + ]; + + const ss = s.toLowerCase(); + + for (let i = 0; i < messages.length; i++) { + if (ss.indexOf(messages[i]) !== -1) { + return new AuthorizationError(s); + } + } + + return null; + } +} + +export class ClosedConnectionError extends Error { + constructor() { + super("closed connection"); + this.name = "ClosedConnectionError"; + } +} + +export class ConnectionDrainingError extends Error { + constructor() { + super("connection draining"); + this.name = "DrainingConnectionError"; + } +} + +export class ConnectionError extends Error { + constructor(message: string, options?: ErrorOptions) { + super(message, options); + this.name = "ConnectionError"; + } +} + +export class ProtocolError extends Error { + constructor(message: string, options?: ErrorOptions) { + super(message, options); + this.name = "ProtocolError"; + } +} + +export class RequestError extends Error { + constructor(message = "", options?: ErrorOptions) { + super(message, options); + this.name = "RequestError"; + } +} + +export class TimeoutError extends Error { + constructor(options?: ErrorOptions) { + super("timeout", options); + this.name = "TimeoutError"; + } +} + +export class NoRespondersError extends Error { + subject: string; + constructor(subject: string, options?: ErrorOptions) { + super(`no responders: '${subject}'`, options); + this.subject = subject; + this.name = "NoResponders"; + } +} + +export class ServerError extends Error { + constructor(message: string, options?: ErrorOptions) { + super(message, options); + this.name = "ServerError"; + } +} + +export class PermissionViolationError extends Error { + operation: "publish" | "subscription"; + subject: string; + queue?: string; + + constructor( + message: string, + operation: "publish" | "subscription", + subject: string, + queue?: string, + options?: ErrorOptions, + ) { + super(message, options); + this.name = "PermissionViolationError"; + this.operation = operation; + this.subject = subject; + this.queue = queue; + } + + static parse(s: string): PermissionViolationError | null { + const t = s ? s.toLowerCase() : ""; + if (t.indexOf("permissions violation") === -1) { + return null; + } + let operation: "publish" | "subscription" = "publish"; + let subject = ""; + let queue: string | undefined = undefined; + const m = s.match(/(Publish|Subscription) to "(\S+)"/); + if (m) { + operation = m[1].toLowerCase() as "publish" | "subscription"; + subject = m[2]; + if (operation === "subscription") { + const qm = s.match(/using queue "(\S+)"/); + if (qm) { + queue = qm[1]; + } + } + } + return new PermissionViolationError(s, operation, subject, queue); + } +} diff --git a/core/src/headers.ts b/core/src/headers.ts index 07fddbc3..ccb76666 100644 --- a/core/src/headers.ts +++ b/core/src/headers.ts @@ -17,7 +17,8 @@ import { TD, TE } from "./encoders.ts"; import type { MsgHdrs } from "./core.ts"; -import { ErrorCode, Match, NatsError } from "./core.ts"; +import { Match } from "./core.ts"; +import { InvalidHeaderError } from "./errors.ts"; // https://www.ietf.org/rfc/rfc822.txt // 3.1.2. STRUCTURE OF HEADER FIELDS @@ -47,9 +48,8 @@ export function canonicalMIMEHeaderKey(k: string): string { for (let i = 0; i < k.length; i++) { let c = k.charCodeAt(i); if (c === colon || c < start || c > end) { - throw new NatsError( - `'${k[i]}' is not a valid character for a header key`, - ErrorCode.BadHeader, + throw new InvalidHeaderError( + `'${k[i]}' is not a valid character in a header name`, ); } if (upper && a <= c && c <= z) { @@ -170,9 +170,8 @@ export class MsgHdrsImpl implements MsgHdrs { static validHeaderValue(k: string): string { const inv = /[\r\n]/; if (inv.test(k)) { - throw new NatsError( - "invalid header value - \\r and \\n are not allowed.", - ErrorCode.BadHeader, + throw new InvalidHeaderError( + "values cannot contain \\r or \\n", ); } return k.trim(); diff --git a/core/src/internal_mod.ts b/core/src/internal_mod.ts index 8d801642..db8b4ba8 100644 --- a/core/src/internal_mod.ts +++ b/core/src/internal_mod.ts @@ -142,3 +142,18 @@ export { Base64Codec, Base64UrlCodec, Base64UrlPaddedCodec } from "./base64.ts"; export { SHA256 } from "./sha256.ts"; export { wsconnect, wsUrlParseFn } from "./ws_transport.ts"; + +export { + AuthorizationError, + ClosedConnectionError, + ConnectionDrainingError, + ConnectionError, + InvalidOptionError, + NoRespondersError, + PermissionViolationError, + ProtocolError, + RequestError, + ServerError, + TimeoutError, + UserAuthenticationExpiredError, +} from "./errors.ts"; diff --git a/core/src/mod.ts b/core/src/mod.ts index 70986628..6229ea92 100644 --- a/core/src/mod.ts +++ b/core/src/mod.ts @@ -14,10 +14,14 @@ */ export { + AuthorizationError, backoff, Bench, buildAuthenticator, canonicalMIMEHeaderKey, + ClosedConnectionError, + ConnectionDrainingError, + ConnectionError, createInbox, credsAuthenticator, deadline, @@ -28,6 +32,7 @@ export { ErrorCode, Events, headers, + InvalidOptionError, jwtAuthenticator, Match, Metric, @@ -37,11 +42,18 @@ export { NatsError, nkeyAuthenticator, nkeys, + NoRespondersError, Nuid, nuid, + PermissionViolationError, + ProtocolError, + RequestError, RequestStrategy, + ServerError, syncIterator, + TimeoutError, tokenAuthenticator, + UserAuthenticationExpiredError, usernamePasswordAuthenticator, wsconnect, } from "./internal_mod.ts"; diff --git a/core/src/msg.ts b/core/src/msg.ts index 9d5aef65..8689e075 100644 --- a/core/src/msg.ts +++ b/core/src/msg.ts @@ -23,17 +23,6 @@ import type { RequestInfo, ReviverFn, } from "./core.ts"; -import { ErrorCode, NatsError } from "./core.ts"; - -export function isRequestError(msg: Msg): NatsError | null { - // NATS core only considers errors 503s on messages that have no payload - // everything else simply forwarded as part of the message and is considered - // application level information - if (msg && msg.data.length === 0 && msg.headers?.code === 503) { - return NatsError.errorForCode(ErrorCode.NoResponders); - } - return null; -} export class MsgImpl implements Msg { _headers?: MsgHdrs; diff --git a/core/src/muxsubscription.ts b/core/src/muxsubscription.ts index f49bef73..7469f899 100644 --- a/core/src/muxsubscription.ts +++ b/core/src/muxsubscription.ts @@ -12,9 +12,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import { isRequestError } from "./msg.ts"; import type { Msg, MsgCallback, Request } from "./core.ts"; -import { createInbox, ErrorCode, NatsError } from "./core.ts"; +import { createInbox } from "./core.ts"; +import { NoRespondersError, RequestError } from "./errors.ts"; + +import type { PermissionViolationError } from "./errors.ts"; export class MuxSubscription { baseInbox!: string; @@ -60,37 +62,39 @@ export class MuxSubscription { return Array.from(this.reqs.values()); } - handleError(isMuxPermissionError: boolean, err?: NatsError): boolean { - if (err && err.permissionContext) { - if (isMuxPermissionError) { - // one or more requests queued but mux cannot process them - this.all().forEach((r) => { - r.resolver(err, {} as Msg); - }); + handleError( + isMuxPermissionError: boolean, + err: PermissionViolationError, + ): boolean { + if (isMuxPermissionError) { + // one or more requests queued but mux cannot process them + this.all().forEach((r) => { + r.resolver(err, {} as Msg); + }); + return true; + } + if (err.operation === "publish") { + const req = this.all().find((s) => { + return s.requestSubject === err.subject; + }); + if (req) { + req.resolver(err, {} as Msg); return true; } - const ctx = err.permissionContext; - if (ctx.operation === "publish") { - const req = this.all().find((s) => { - return s.requestSubject === ctx.subject; - }); - if (req) { - req.resolver(err, {} as Msg); - return true; - } - } } return false; } dispatcher(): MsgCallback { - return (err: NatsError | null, m: Msg) => { + return (err: Error | null, m: Msg) => { const token = this.getToken(m); if (token) { const r = this.get(token); if (r) { - if (err === null && m.headers) { - err = isRequestError(m); + if (err === null) { + err = (m?.data?.length === 0 && m.headers?.code === 503) + ? new NoRespondersError(r.requestSubject) + : null; } r.resolver(err, m); } @@ -99,7 +103,7 @@ export class MuxSubscription { } close() { - const err = NatsError.errorForCode(ErrorCode.Timeout); + const err = new RequestError("connection closed"); this.reqs.forEach((req) => { req.resolver(err, {} as Msg); }); diff --git a/core/src/nats.ts b/core/src/nats.ts index 23b52333..4af18e25 100644 --- a/core/src/nats.ts +++ b/core/src/nats.ts @@ -16,7 +16,6 @@ import { deferred } from "./util.ts"; import { ProtocolHandler, SubscriptionImpl } from "./protocol.ts"; import { Empty } from "./encoders.ts"; -import { NatsError } from "./types.ts"; import type { Features, SemVer } from "./semver.ts"; import { parseSemVer } from "./semver.ts"; @@ -27,8 +26,7 @@ import { RequestMany, RequestOne } from "./request.ts"; import type { RequestManyOptionsInternal } from "./request.ts"; -import { isRequestError } from "./msg.ts"; -import { createInbox, ErrorCode, RequestStrategy } from "./core.ts"; +import { createInbox, RequestStrategy } from "./core.ts"; import type { Dispatcher } from "./core.ts"; import type { @@ -47,6 +45,14 @@ import type { Subscription, SubscriptionOptions, } from "./core.ts"; +import { + ClosedConnectionError, + ConnectionDrainingError, + InvalidOptionError, + InvalidSubjectError, + NoRespondersError, + RequestError, +} from "./errors.ts"; export class NatsConnectionImpl implements NatsConnection { options: ConnectionOptions; @@ -91,17 +97,17 @@ export class NatsConnectionImpl implements NatsConnection { _check(subject: string, sub: boolean, pub: boolean) { if (this.isClosed()) { - throw NatsError.errorForCode(ErrorCode.ConnectionClosed); + throw new ClosedConnectionError(); } if (sub && this.isDraining()) { - throw NatsError.errorForCode(ErrorCode.ConnectionDraining); + throw new ConnectionDrainingError(); } if (pub && this.protocol.noMorePublishing) { - throw NatsError.errorForCode(ErrorCode.ConnectionDraining); + throw new ConnectionDrainingError(); } subject = subject || ""; if (subject.length === 0) { - throw NatsError.errorForCode(ErrorCode.BadSubject); + throw new InvalidSubjectError(subject); } } @@ -185,7 +191,9 @@ export class NatsConnectionImpl implements NatsConnection { opts.strategy = opts.strategy || RequestStrategy.Timer; opts.maxWait = opts.maxWait || 1000; if (opts.maxWait < 1) { - return Promise.reject(new NatsError("timeout", ErrorCode.InvalidOption)); + return Promise.reject( + InvalidOptionError.illegalOption("timeout", "should be greater than 0"), + ); } // the iterator for user results @@ -218,9 +226,9 @@ export class NatsConnectionImpl implements NatsConnection { // we only expect runtime errors or a no responders if ( msg?.data?.length === 0 && - msg?.headers?.status === ErrorCode.NoResponders + msg?.headers?.status === "503" ) { - err = NatsError.errorForCode(ErrorCode.NoResponders); + err = new NoRespondersError(subject); } // augment any error with the current stack to provide context // for the error on the suer code @@ -292,7 +300,7 @@ export class NatsConnectionImpl implements NatsConnection { try { this.publish(subject, data, { reply: sub.getSubject() }); } catch (err) { - cancel(err as NatsError); + cancel(err as Error); } let timer = setTimeout(() => { @@ -328,7 +336,7 @@ export class NatsConnectionImpl implements NatsConnection { }, ); } catch (err) { - r.cancel(err as NatsError); + r.cancel(err as Error); } } @@ -348,14 +356,13 @@ export class NatsConnectionImpl implements NatsConnection { const asyncTraces = !(this.protocol.options.noAsyncTraces || false); opts.timeout = opts.timeout || 1000; if (opts.timeout < 1) { - return Promise.reject(new NatsError("timeout", ErrorCode.InvalidOption)); + return Promise.reject( + InvalidOptionError.illegalOption("timeout", `must be greater than 0`), + ); } if (!opts.noMux && opts.reply) { return Promise.reject( - new NatsError( - "reply can only be used with noMux", - ErrorCode.InvalidOption, - ), + InvalidOptionError.exclusiveOptions(["reply", "noMux"]), ); } @@ -364,31 +371,30 @@ export class NatsConnectionImpl implements NatsConnection { ? opts.reply : createInbox(this.options.inboxPrefix); const d = deferred(); - const errCtx = asyncTraces ? new Error() : null; + const errCtx = asyncTraces ? new RequestError("") : null; const sub = this.subscribe( inbox, { max: 1, timeout: opts.timeout, callback: (err, msg) => { + // check for no responders + if (msg && msg.data?.length === 0 && msg.headers?.code === 503) { + err = new NoRespondersError(subject); + } if (err) { - // timeouts from `timeout()` will have the proper stack - if (errCtx && err.code !== ErrorCode.Timeout) { - err.stack += `\n\n${errCtx.stack}`; + // if we have a context, use that as the wrapper + if (errCtx) { + errCtx.message = err.message; + errCtx.cause = err; + err = errCtx; + } else { + err = new RequestError(err.message, { cause: err }); } - sub.unsubscribe(); d.reject(err); + sub.unsubscribe(); } else { - err = isRequestError(msg); - if (err) { - // if we failed here, help the developer by showing what failed - if (errCtx) { - err.stack += `\n\n${errCtx.stack}`; - } - d.reject(err); - } else { - d.resolve(msg); - } + d.resolve(msg); } }, }, @@ -418,7 +424,7 @@ export class NatsConnectionImpl implements NatsConnection { }, ); } catch (err) { - r.cancel(err as NatsError); + r.cancel(err as Error); } const p = Promise.race([r.timer, r.deferred]); @@ -435,23 +441,17 @@ export class NatsConnectionImpl implements NatsConnection { */ flush(): Promise { if (this.isClosed()) { - return Promise.reject( - NatsError.errorForCode(ErrorCode.ConnectionClosed), - ); + return Promise.reject(new ClosedConnectionError()); } return this.protocol.flush(); } drain(): Promise { if (this.isClosed()) { - return Promise.reject( - NatsError.errorForCode(ErrorCode.ConnectionClosed), - ); + return Promise.reject(new ClosedConnectionError()); } if (this.isDraining()) { - return Promise.reject( - NatsError.errorForCode(ErrorCode.ConnectionDraining), - ); + return Promise.reject(new ConnectionDrainingError()); } this.draining = true; return this.protocol.drain(); @@ -509,8 +509,11 @@ export class NatsConnectionImpl implements NatsConnection { } async rtt(): Promise { - if (!this.protocol._closed && !this.protocol.connected) { - throw NatsError.errorForCode(ErrorCode.Disconnect); + if (this.isClosed()) { + throw new ClosedConnectionError(); + } + if (!this.protocol.connected) { + throw new RequestError("connection disconnected"); } const start = Date.now(); await this.flush(); @@ -523,14 +526,10 @@ export class NatsConnectionImpl implements NatsConnection { reconnect(): Promise { if (this.isClosed()) { - return Promise.reject( - NatsError.errorForCode(ErrorCode.ConnectionClosed), - ); + return Promise.reject(new ClosedConnectionError()); } if (this.isDraining()) { - return Promise.reject( - NatsError.errorForCode(ErrorCode.ConnectionDraining), - ); + return Promise.reject(new ConnectionDrainingError()); } return this.protocol.reconnect(); } diff --git a/core/src/options.ts b/core/src/options.ts index 62388f03..92f51253 100644 --- a/core/src/options.ts +++ b/core/src/options.ts @@ -16,13 +16,14 @@ import { extend } from "./util.ts"; import { defaultPort, getResolveFn } from "./transport.ts"; import type { Authenticator, ConnectionOptions, ServerInfo } from "./core.ts"; -import { createInbox, DEFAULT_HOST, ErrorCode, NatsError } from "./core.ts"; +import { createInbox, DEFAULT_HOST } from "./core.ts"; import { multiAuthenticator, noAuthFn, tokenAuthenticator, usernamePasswordAuthenticator, } from "./authenticator.ts"; +import { ConnectionError, InvalidOptionError } from "./errors.ts"; export const DEFAULT_MAX_RECONNECT_ATTEMPTS = 10; export const DEFAULT_JITTER = 100; @@ -83,10 +84,7 @@ export function parseOptions(opts?: ConnectionOptions): ConnectionOptions { } if (opts.servers.length > 0 && opts.port) { - throw new NatsError( - "port and servers options are mutually exclusive", - ErrorCode.InvalidOption, - ); + throw InvalidOptionError.exclusiveOptions(["servers", "port"]); } if (opts.servers.length === 0 && opts.port) { @@ -101,10 +99,7 @@ export function parseOptions(opts?: ConnectionOptions): ConnectionOptions { ["reconnectDelayHandler", "authenticator"].forEach((n) => { if (options[n] && typeof options[n] !== "function") { - throw new NatsError( - `${n} option should be a function`, - ErrorCode.NotFunction, - ); + throw InvalidOptionError.illegalOption(n, "must be a function"); } }); @@ -122,11 +117,7 @@ export function parseOptions(opts?: ConnectionOptions): ConnectionOptions { } if (options.inboxPrefix) { - try { - createInbox(options.inboxPrefix); - } catch (err) { - throw new NatsError((err as Error).message, ErrorCode.ApiError); - } + createInbox(options.inboxPrefix); } // if not set - we set it @@ -137,9 +128,9 @@ export function parseOptions(opts?: ConnectionOptions): ConnectionOptions { if (options.resolve) { if (typeof getResolveFn() !== "function") { - throw new NatsError( - `'resolve' is not supported on this client`, - ErrorCode.InvalidOption, + throw InvalidOptionError.illegalOption( + "resolve", + "not supported by this client implementation", ); } } @@ -151,16 +142,16 @@ export function checkOptions(info: ServerInfo, options: ConnectionOptions) { const { proto, tls_required: tlsRequired, tls_available: tlsAvailable } = info; if ((proto === undefined || proto < 1) && options.noEcho) { - throw new NatsError("noEcho", ErrorCode.ServerOptionNotAvailable); + throw new ConnectionError(`option 'noEcho' is not available`); } const tls = tlsRequired || tlsAvailable || false; if (options.tls && !tls) { - throw new NatsError("tls", ErrorCode.ServerOptionNotAvailable); + throw new ConnectionError(`option 'tls' is not available`); } } -export function checkUnsupportedOption(prop: string, v?: string) { +export function checkUnsupportedOption(prop: string, v?: unknown) { if (v) { - throw new NatsError(prop, ErrorCode.InvalidOption); + throw InvalidOptionError.illegalOption(prop, "is not supported"); } } diff --git a/core/src/protocol.ts b/core/src/protocol.ts index 8d96ffff..96ac7fb1 100644 --- a/core/src/protocol.ts +++ b/core/src/protocol.ts @@ -13,24 +13,22 @@ * limitations under the License. */ import { decode, Empty, encode, TE } from "./encoders.ts"; -import { CR_LF, CRLF, getResolveFn, newTransport } from "./transport.ts"; import type { Transport } from "./transport.ts"; -import { deferred, delay, extend, timeout } from "./util.ts"; +import { CR_LF, CRLF, getResolveFn, newTransport } from "./transport.ts"; import type { Deferred, Timeout } from "./util.ts"; +import { deferred, delay, extend, timeout } from "./util.ts"; import { DataBuffer } from "./databuffer.ts"; -import { Servers } from "./servers.ts"; import type { ServerImpl } from "./servers.ts"; +import { Servers } from "./servers.ts"; import { QueuedIteratorImpl } from "./queued_iterator.ts"; import type { MsgHdrsImpl } from "./headers.ts"; import { MuxSubscription } from "./muxsubscription.ts"; -import { Heartbeat } from "./heartbeats.ts"; import type { PH } from "./heartbeats.ts"; +import { Heartbeat } from "./heartbeats.ts"; import type { MsgArg, ParserEvent } from "./parser.ts"; import { Kind, Parser } from "./parser.ts"; import { MsgImpl } from "./msg.ts"; import { Features, parseSemVer } from "./semver.ts"; -import { DebugEvents, ErrorCode, Events, NatsError } from "./core.ts"; - import type { ConnectionOptions, Dispatcher, @@ -45,12 +43,25 @@ import type { Subscription, SubscriptionOptions, } from "./core.ts"; +import { DebugEvents, Events } from "./core.ts"; import { DEFAULT_MAX_PING_OUT, DEFAULT_PING_INTERVAL, DEFAULT_RECONNECT_TIME_WAIT, } from "./options.ts"; +import { + AuthorizationError, + ClosedConnectionError, + ConnectionError, + InvalidOperationError, + InvalidOptionError, + PermissionViolationError, + ProtocolError, + RequestError, + ServerError, + UserAuthenticationExpiredError, +} from "./errors.ts"; const FLUSH_THRESHOLD = 1024 * 32; @@ -154,7 +165,7 @@ export class SubscriptionImpl extends QueuedIteratorImpl } } - callback(err: NatsError | null, msg: Msg) { + callback(err: Error | null, msg: Msg) { this.cancelTimeout(); err ? this.stop(err) : this.push(msg); } @@ -195,10 +206,12 @@ export class SubscriptionImpl extends QueuedIteratorImpl drain(): Promise { if (this.protocol.isClosed()) { - return Promise.reject(NatsError.errorForCode(ErrorCode.ConnectionClosed)); + return Promise.reject(new ClosedConnectionError()); } if (this.isClosed()) { - return Promise.reject(NatsError.errorForCode(ErrorCode.SubClosed)); + return Promise.reject( + new InvalidOperationError("subscription is already closed"), + ); } if (!this.drained) { this.draining = true; @@ -289,28 +302,26 @@ export class Subscriptions { } } - handleError(err?: NatsError): boolean { - if (err && err.permissionContext) { - const ctx = err.permissionContext; - const subs = this.all(); - let sub; - if (ctx.operation === "subscription") { - sub = subs.find((s) => { - return s.subject === ctx.subject && s.queue === ctx.queue; - }); - } else if (ctx.operation === "publish") { - // we have a no mux subscription - sub = subs.find((s) => { - return s.requestSubject === ctx.subject; - }); - } - if (sub) { - sub.callback(err, {} as Msg); - sub.close(); - this.subs.delete(sub.sid); - return sub !== this.mux; - } + handleError(err: PermissionViolationError): boolean { + const subs = this.all(); + let sub; + if (err.operation === "subscription") { + sub = subs.find((s) => { + return s.subject === err.subject && s.queue === err.queue; + }); + } else if (err.operation === "publish") { + // we have a no mux subscription + sub = subs.find((s) => { + return s.requestSubject === err.subject; + }); } + if (sub) { + sub.callback(err, {} as Msg); + sub.close(); + this.subs.delete(sub.sid); + return sub !== this.mux; + } + return false; } @@ -345,7 +356,7 @@ export class ProtocolHandler implements Dispatcher { outBytes: number; inBytes: number; pendingLimit: number; - lastError?: NatsError; + lastError?: Error; abortReconnect: boolean; whyClosed: string; @@ -402,7 +413,7 @@ export class ProtocolHandler implements Dispatcher { this.pongs = []; // reject the pongs - the disconnect from here shouldn't have a trace // because that confuses API consumers - const err = NatsError.errorForCode(ErrorCode.Disconnect); + const err = new RequestError("connection disconnected"); err.stack = ""; pongs.forEach((p) => { p.reject(err); @@ -492,7 +503,7 @@ export class ProtocolHandler implements Dispatcher { // two of these, and the default for the client will be to // close, rather than attempt again - possibly they have an // authenticator that dynamically updates - if (this.lastError?.code === ErrorCode.AuthenticationExpired) { + if (this.lastError instanceof UserAuthenticationExpiredError) { this.lastError = undefined; } }) @@ -607,7 +618,7 @@ export class ProtocolHandler implements Dispatcher { } else if (this.lastError) { throw this.lastError; } else { - throw NatsError.errorForCode(ErrorCode.ConnectionRefused); + throw new ConnectionError("connection refused"); } } const now = Date.now(); @@ -646,40 +657,20 @@ export class ProtocolHandler implements Dispatcher { return h; } - static toError(s: string): NatsError { - const t = s ? s.toLowerCase() : ""; - if (t.indexOf("permissions violation") !== -1) { - const err = new NatsError(s, ErrorCode.PermissionsViolation); - const m = s.match(/(Publish|Subscription) to "(\S+)"/); - if (m) { - const operation = m[1].toLowerCase(); - const subject = m[2]; - let queue = undefined; - - if (operation === "subscription") { - const qm = s.match(/using queue "(\S+)"/); - if (qm) { - queue = qm[1]; - } - } - err.permissionContext = { - operation, - subject, - queue, - }; - } + static toError(s: string): Error { + let err: Error | null = PermissionViolationError.parse(s); + if (err) { + return err; + } + err = UserAuthenticationExpiredError.parse(s); + if (err) { return err; - } else if (t.indexOf("authorization violation") !== -1) { - return new NatsError(s, ErrorCode.AuthorizationViolation); - } else if (t.indexOf("user authentication expired") !== -1) { - return new NatsError(s, ErrorCode.AuthenticationExpired); - } else if (t.indexOf("account authentication expired") != -1) { - return new NatsError(s, ErrorCode.AccountExpired); - } else if (t.indexOf("authentication timeout") !== -1) { - return new NatsError(s, ErrorCode.AuthenticationTimeout); - } else { - return new NatsError(s, ErrorCode.ProtocolError); } + err = AuthorizationError.parse(s); + if (err) { + return err; + } + return new ProtocolError(s); } processMsg(msg: MsgArg, data: Uint8Array) { @@ -705,44 +696,48 @@ export class ProtocolHandler implements Dispatcher { } processError(m: Uint8Array) { - const s = decode(m); + let s = decode(m); + if (s.startsWith("'") && s.endsWith("'")) { + s = s.slice(1, s.length - 1); + } const err = ProtocolHandler.toError(s); - const status: Status = { type: Events.Error, data: err.code }; - if (err.isPermissionError()) { - let isMuxPermissionError = false; - if (err.permissionContext) { - status.permissionContext = err.permissionContext; + + switch (err.constructor) { + case PermissionViolationError: { + const pe = err as PermissionViolationError; const mux = this.subscriptions.getMux(); - isMuxPermissionError = mux?.subject === err.permissionContext.subject; - } - this.subscriptions.handleError(err); - this.muxSubscriptions.handleError(isMuxPermissionError, err); - if (isMuxPermissionError) { - // remove the permission - enable it to be recreated - this.subscriptions.setMux(null); + const isMuxPermission = mux ? pe.subject === mux.subject : false; + this.subscriptions.handleError(pe); + this.muxSubscriptions.handleError(isMuxPermission, pe); + if (isMuxPermission) { + // remove the permission - enable it to be recreated + this.subscriptions.setMux(null); + } + break; } } - this.dispatchStatus(status); + + this.dispatchStatus({ type: Events.Error, error: err, data: err.message }); this.handleError(err); } - handleError(err: NatsError) { - if (err.isAuthError()) { + handleError(err: Error) { + if ( + err instanceof UserAuthenticationExpiredError || + err instanceof AuthorizationError + ) { this.handleAuthError(err); - } else if (err.isProtocolError()) { - this.lastError = err; - } else if (err.isAuthTimeout()) { - this.lastError = err; } - // fallthrough here - if (!err.isPermissionError()) { + + if (!(err instanceof PermissionViolationError)) { this.lastError = err; } } - handleAuthError(err: NatsError) { + handleAuthError(err: UserAuthenticationExpiredError | AuthorizationError) { if ( - (this.lastError && err.code === this.lastError.code) && + (this.lastError instanceof UserAuthenticationExpiredError || + this.lastError instanceof AuthorizationError) && this.options.ignoreAuthErrorAbort === false ) { this.abortReconnect = true; @@ -862,14 +857,17 @@ export class ProtocolHandler implements Dispatcher { subject: string, payload: Payload = Empty, options?: PublishOptions, - ) { + ): void { let data; if (payload instanceof Uint8Array) { data = payload; } else if (typeof payload === "string") { data = TE.encode(payload); } else { - throw NatsError.errorForCode(ErrorCode.BadPayload); + throw InvalidOptionError.illegalArgument( + "payload", + "supported payloads types are strings or Uint8Array", + ); } let len = data.length; @@ -880,7 +878,10 @@ export class ProtocolHandler implements Dispatcher { let hlen = 0; if (options.headers) { if (this.info && !this.info.headers) { - throw new NatsError("headers", ErrorCode.ServerOptionNotAvailable); + throw InvalidOptionError.illegalOption( + "headers", + "are not supported by the current server", + ); } const hdrs = options.headers as MsgHdrsImpl; headers = hdrs.encode(); @@ -889,7 +890,7 @@ export class ProtocolHandler implements Dispatcher { } if (this.info && len > this.info.max_payload) { - throw NatsError.errorForCode(ErrorCode.MaxPayloadExceeded); + throw new ServerError(`server 'max_payload' exceeded`); } this.outBytes += len; this.outMsgs++; @@ -940,14 +941,14 @@ export class ProtocolHandler implements Dispatcher { return s; } - unsubscribe(s: SubscriptionImpl, max?: number) { + unsubscribe(s: SubscriptionImpl, max?: number): void { this.unsub(s, max); if (s.max === undefined || s.received >= s.max) { this.subscriptions.cancel(s); } } - unsub(s: SubscriptionImpl, max?: number) { + unsub(s: SubscriptionImpl, max?: number): void { if (!s || this.isClosed()) { return; } @@ -959,7 +960,7 @@ export class ProtocolHandler implements Dispatcher { s.max = max; } - resub(s: SubscriptionImpl, subject: string) { + resub(s: SubscriptionImpl, subject: string): void { if (!s || this.isClosed()) { return; } @@ -981,7 +982,7 @@ export class ProtocolHandler implements Dispatcher { return p; } - sendSubscriptions() { + sendSubscriptions(): void { const cmds: string[] = []; this.subscriptions.all().forEach((s) => { const sub = s as SubscriptionImpl; @@ -1024,21 +1025,21 @@ export class ProtocolHandler implements Dispatcher { return this._closed; } - drain(): Promise { + async drain(): Promise { const subs = this.subscriptions.all(); const promises: Promise[] = []; subs.forEach((sub: Subscription) => { promises.push(sub.drain()); }); - return Promise.all(promises) - .then(async () => { - this.noMorePublishing = true; - await this.flush(); - return this.close(); - }) - .catch(() => { - // cannot happen - }); + try { + await Promise.allSettled(promises); + } catch { + // nothing we can do here + } finally { + this.noMorePublishing = true; + await this.flush(); + } + return this.close(); } private flushPending() { diff --git a/core/src/queued_iterator.ts b/core/src/queued_iterator.ts index e5d224a2..f7c45b5c 100644 --- a/core/src/queued_iterator.ts +++ b/core/src/queued_iterator.ts @@ -15,8 +15,8 @@ import type { Deferred } from "./util.ts"; import { deferred } from "./util.ts"; import type { QueuedIterator } from "./core.ts"; -import { ErrorCode, NatsError } from "./core.ts"; import type { CallbackFn, Dispatcher } from "./core.ts"; +import { InvalidOperationError } from "./errors.ts"; export class QueuedIteratorImpl implements QueuedIterator, Dispatcher { inflight: number; @@ -83,10 +83,12 @@ export class QueuedIteratorImpl implements QueuedIterator, Dispatcher { async *iterate(): AsyncIterableIterator { if (this.noIterator) { - throw new NatsError("unsupported iterator", ErrorCode.ApiError); + throw new InvalidOperationError( + "iterator cannot be used when a callback is registered", + ); } if (this.yielding) { - throw new NatsError("already yielding", ErrorCode.ApiError); + throw new InvalidOperationError("iterator is already yielding"); } this.yielding = true; try { diff --git a/core/src/request.ts b/core/src/request.ts index 97579224..7ae3c933 100644 --- a/core/src/request.ts +++ b/core/src/request.ts @@ -22,12 +22,13 @@ import type { RequestManyOptions, RequestOptions, } from "./core.ts"; -import { ErrorCode, NatsError, RequestStrategy } from "./core.ts"; +import { RequestStrategy } from "./core.ts"; +import { RequestError } from "./errors.ts"; export class BaseRequest { token: string; received: number; - ctx?: Error; + ctx?: RequestError; requestSubject: string; mux: MuxSubscription; @@ -41,7 +42,7 @@ export class BaseRequest { this.received = 0; this.token = nuid.next(); if (asyncTraces) { - this.ctx = new Error(); + this.ctx = new RequestError(); } } } @@ -87,7 +88,7 @@ export class RequestMany extends BaseRequest implements Request { }, opts.maxWait); } - cancel(err?: NatsError): void { + cancel(err?: Error): void { if (err) { this.callback(err, null); } @@ -101,7 +102,7 @@ export class RequestMany extends BaseRequest implements Request { if (this.ctx) { err.stack += `\n\n${this.ctx.stack}`; } - this.cancel(err as NatsError); + this.cancel(err as Error); } else { this.callback(null, msg); if (this.opts.strategy === RequestStrategy.Count) { @@ -150,7 +151,11 @@ export class RequestOne extends BaseRequest implements Request { } if (err) { if (this.ctx) { - err.stack += `\n\n${this.ctx.stack}`; + this.ctx.message = err.message; + this.ctx.cause = err; + err = this.ctx; + } else { + err = new RequestError(err.message, { cause: err }); } this.deferred.reject(err); } else { @@ -159,13 +164,13 @@ export class RequestOne extends BaseRequest implements Request { this.cancel(); } - cancel(err?: NatsError): void { + cancel(err?: Error): void { if (this.timer) { this.timer.cancel(); } this.mux.cancel(this); this.deferred.reject( - err ? err : NatsError.errorForCode(ErrorCode.Cancelled), + err ? err : new RequestError("cancelled"), ); } } diff --git a/core/src/util.ts b/core/src/util.ts index 37f66153..d512e85e 100644 --- a/core/src/util.ts +++ b/core/src/util.ts @@ -15,7 +15,7 @@ // deno-lint-ignore-file no-explicit-any import { TD } from "./encoders.ts"; import type { Nanos } from "./core.ts"; -import { ErrorCode, NatsError } from "./core.ts"; +import { TimeoutError } from "./errors.ts"; export type ValueResult = { isError: false; @@ -67,7 +67,7 @@ export interface Timeout extends Promise { export function timeout(ms: number, asyncTraces = true): Timeout { // by generating the stack here to help identify what timed out - const err = asyncTraces ? NatsError.errorForCode(ErrorCode.Timeout) : null; + const err = asyncTraces ? new TimeoutError() : null; let methods; let timer: number; const p = new Promise((_resolve, reject) => { @@ -80,7 +80,7 @@ export function timeout(ms: number, asyncTraces = true): Timeout { // @ts-ignore: node is not a number timer = setTimeout(() => { if (err === null) { - reject(NatsError.errorForCode(ErrorCode.Timeout)); + reject(new TimeoutError()); } else { reject(err); } diff --git a/core/src/ws_transport.ts b/core/src/ws_transport.ts index ce1647c8..852d82bc 100644 --- a/core/src/ws_transport.ts +++ b/core/src/ws_transport.ts @@ -19,7 +19,6 @@ import type { Server, ServerInfo, } from "./core.ts"; -import { ErrorCode, NatsError } from "./core.ts"; import type { Deferred } from "./util.ts"; import { deferred, delay, render } from "./util.ts"; import type { Transport, TransportFactory } from "./transport.ts"; @@ -29,6 +28,7 @@ import { DataBuffer } from "./databuffer.ts"; import { INFO } from "./protocol.ts"; import { NatsConnectionImpl } from "./nats.ts"; import { version } from "./version.ts"; +import { ConnectionError, InvalidOptionError } from "./errors.ts"; const VERSION = version; const LANG = "nats.ws"; @@ -155,11 +155,7 @@ export class WsTransport implements Transport { return; } const evt = e as ErrorEvent; - const err = new NatsError( - evt.message, - ErrorCode.Unknown, - new Error(evt.error), - ); + const err = new ConnectionError(evt.message); if (!connected) { ok.reject(err); } else { @@ -336,7 +332,10 @@ export function wsconnect( urlParseFn: wsUrlParseFn, factory: (): Transport => { if (opts.tls) { - throw new NatsError("tls", ErrorCode.InvalidOption); + throw InvalidOptionError.illegalOption( + "tls", + "is not configurable on w3c websocket connections", + ); } return new WsTransport(); }, diff --git a/core/tests/auth_test.ts b/core/tests/auth_test.ts index 89c1dd1d..92f3e656 100644 --- a/core/tests/auth_test.ts +++ b/core/tests/auth_test.ts @@ -12,7 +12,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import { _setup, assertErrorCode, cleanup, NatsServer } from "test_helpers"; +import { _setup, cleanup, NatsServer } from "test_helpers"; import { assert, assertArrayIncludes, @@ -32,7 +32,6 @@ import type { MsgImpl, NatsConnection, NatsConnectionImpl, - NatsError, NKeyAuth, Status, UserPass, @@ -43,14 +42,19 @@ import { DEFAULT_MAX_RECONNECT_ATTEMPTS, deferred, Empty, - ErrorCode, Events, jwtAuthenticator, nkeyAuthenticator, nkeys, + RequestError, tokenAuthenticator, usernamePasswordAuthenticator, } from "../src/internal_mod.ts"; +import { + AuthorizationError, + PermissionViolationError, + UserAuthenticationExpiredError, +} from "../src/errors.ts"; const conf = { authorization: { @@ -67,29 +71,33 @@ const conf = { Deno.test("auth - none", async () => { const ns = await NatsServer.start(conf); - try { - const nc = await connect( - { port: ns.port }, - ); - await nc.close(); - fail("shouldnt have been able to connect"); - } catch (ex) { - assertErrorCode(ex as NatsError, ErrorCode.AuthorizationViolation); - } + + await assertRejects( + async () => { + const nc = await connect( + { port: ns.port }, + ); + await nc.close(); + fail("shouldnt have been able to connect"); + }, + AuthorizationError, + ); + await ns.stop(); }); Deno.test("auth - bad", async () => { const ns = await NatsServer.start(conf); - try { - const nc = await connect( - { port: ns.port, user: "me", pass: "hello" }, - ); - await nc.close(); - fail("shouldnt have been able to connect"); - } catch (ex) { - assertErrorCode(ex as NatsError, ErrorCode.AuthorizationViolation); - } + await assertRejects( + async () => { + const nc = await connect( + { port: ns.port, user: "me", pass: "hello" }, + ); + await nc.close(); + fail("shouldnt have been able to connect"); + }, + AuthorizationError, + ); await ns.stop(); }); @@ -159,11 +167,12 @@ Deno.test("auth - sub no permissions keeps connection", async () => { }); const v = await Promise.all([errStatus, cbErr, sub.closed]); - assertEquals(v[0].data, ErrorCode.PermissionsViolation); + assertEquals(v[0].data, `Permissions Violation for Subscription to "bar"`); assertEquals( v[1]?.message, - "'Permissions Violation for Subscription to \"bar\"'", + `Permissions Violation for Subscription to "bar"`, ); + assertEquals(nc.isClosed(), false); await cleanup(ns, nc); @@ -200,10 +209,13 @@ Deno.test("auth - sub iterator no permissions keeps connection", async () => { await nc.flush(); const v = await Promise.all([errStatus, iterErr, sub.closed]); - assertEquals(v[0].data, ErrorCode.PermissionsViolation); + assertEquals( + v[0].data, + `Permissions Violation for Subscription to "bar"`, + ); assertEquals( v[1]?.message, - "'Permissions Violation for Subscription to \"bar\"'", + `Permissions Violation for Subscription to "bar"`, ); assertEquals(sub.isClosed(), true); assertEquals(nc.isClosed(), false); @@ -232,7 +244,7 @@ Deno.test("auth - pub permissions keep connection", async () => { nc.publish("bar"); const v = await errStatus; - assertEquals(v.data, ErrorCode.PermissionsViolation); + assertEquals(v.data, `Permissions Violation for Publish to "bar"`); assertEquals(nc.isClosed(), false); await cleanup(ns, nc); @@ -256,15 +268,16 @@ Deno.test("auth - req permissions keep connection", async () => { } })().then(); - const err = await assertRejects( + await assertRejects( async () => { await nc.request("bar"); }, - ) as NatsError; - assertEquals(err.code, ErrorCode.PermissionsViolation); + RequestError, + `Permissions Violation for Publish to "bar"`, + ); const v = await errStatus; - assertEquals(v.data, ErrorCode.PermissionsViolation); + assertEquals(v.data, `Permissions Violation for Publish to "bar"`); assertEquals(nc.isClosed(), false); await cleanup(ns, nc); @@ -436,18 +449,20 @@ Deno.test("auth - custom error", async () => { }); Deno.test("basics - bad auth", async () => { - try { - await connect( - { - servers: "connect.ngs.global", - waitOnFirstConnect: true, - user: "me", - pass: "you", - }, - ); - } catch (err) { - assertErrorCode(err as NatsError, ErrorCode.AuthorizationViolation); - } + await assertRejects( + () => { + return connect( + { + servers: "connect.ngs.global", + reconnect: false, + user: "me", + pass: "you", + }, + ); + }, + AuthorizationError, + "Authorization Violation", + ); }); Deno.test("auth - nkey authentication", async () => { @@ -549,21 +564,19 @@ Deno.test("auth - expiration is notified", async () => { const U = nkeys.createUser(); const ujwt = await encodeUser("U", U, A, { bearer_token: true }, { - exp: Math.round(Date.now() / 1000) + 3, + exp: Math.round(Date.now() / 1000) + 5, }); const nc = await connect({ port: ns.port, - maxReconnectAttempts: -1, + reconnect: false, authenticator: jwtAuthenticator(ujwt), }); let authErrors = 0; (async () => { for await (const s of nc.status()) { - if ( - s.type === Events.Error && s.data === ErrorCode.AuthenticationExpired - ) { + if (s.error instanceof UserAuthenticationExpiredError) { authErrors++; } } @@ -571,7 +584,8 @@ Deno.test("auth - expiration is notified", async () => { const err = await nc.closed(); assert(authErrors >= 1); - assertErrorCode(err!, ErrorCode.AuthenticationExpired); + assertExists(err); + assert(err instanceof UserAuthenticationExpiredError, err?.message); await cleanup(ns); }); @@ -628,7 +642,7 @@ Deno.test("auth - expiration is notified and recovered", async () => { } break; case Events.Error: - if (s.data === ErrorCode.AuthenticationExpired) { + if (s.error instanceof UserAuthenticationExpiredError) { authErrors++; } break; @@ -664,7 +678,7 @@ Deno.test("auth - bad auth is notified", async () => { (async () => { for await (const s of nc.status()) { if ( - s.type === Events.Error && s.data === ErrorCode.AuthorizationViolation + s.type === Events.Error && s.error instanceof AuthorizationError ) { badAuths++; } @@ -676,7 +690,7 @@ Deno.test("auth - bad auth is notified", async () => { const err = await nc.closed(); assert(badAuths > 1); - assertErrorCode(err!, ErrorCode.AuthorizationViolation); + assert(err instanceof AuthorizationError); await ns.stop(); }); @@ -721,32 +735,20 @@ Deno.test("auth - perm request error", async () => { const status = deferred(); (async () => { for await (const s of nc.status()) { - if ( - s.permissionContext?.operation === "publish" && - s.permissionContext?.subject === "q" - ) { - status.resolve(s); + if (s.error instanceof PermissionViolationError) { + const pe = s.error as PermissionViolationError; + if (pe.operation === "publish" && pe.subject === "q") { + status.resolve(s); + } } } })().then(); - const response = deferred(); - nc.request("q") - .catch((err) => { - response.resolve(err); - }); - - const [r, s] = await Promise.all([response, status]); - assertErrorCode(r, ErrorCode.PermissionsViolation); - const ne = r as NatsError; - assertEquals(ne.permissionContext?.operation, "publish"); - assertEquals(ne.permissionContext?.subject, "q"); - - assertEquals(s.type, Events.Error); - assertEquals(s.data, ErrorCode.PermissionsViolation); - assertEquals(s.permissionContext?.operation, "publish"); - assertEquals(s.permissionContext?.subject, "q"); + assertRejects(() => { + return nc.request("q"); + }, RequestError); + await status; await cleanup(ns, nc, sc); }); @@ -790,31 +792,22 @@ Deno.test("auth - perm request error no mux", async () => { const status = deferred(); (async () => { for await (const s of nc.status()) { - if ( - s.permissionContext?.operation === "publish" && - s.permissionContext?.subject === "q" - ) { - status.resolve(s); + if (s.error instanceof PermissionViolationError) { + const pe = s.error as PermissionViolationError; + if (pe.operation === "publish" && pe.subject === "q") { + status.resolve(s); + } } } })().then(); - const response = deferred(); - nc.request("q", Empty, { noMux: true, timeout: 1000 }) - .catch((err) => { - response.resolve(err); - }); - - const [r, s] = await Promise.all([response, status]); - assertErrorCode(r, ErrorCode.PermissionsViolation); - const ne = r as NatsError; - assertEquals(ne.permissionContext?.operation, "publish"); - assertEquals(ne.permissionContext?.subject, "q"); - - assertEquals(s.type, Events.Error); - assertEquals(s.data, ErrorCode.PermissionsViolation); - assertEquals(s.permissionContext?.operation, "publish"); - assertEquals(s.permissionContext?.subject, "q"); + await assertRejects( + () => { + return nc.request("q", Empty, { noMux: true, timeout: 1000 }); + }, + RequestError, + "q", + ); await cleanup(ns, nc, sc); }); @@ -859,11 +852,11 @@ Deno.test("auth - perm request error deliver to sub", async () => { const status = deferred(); (async () => { for await (const s of nc.status()) { - if ( - s.permissionContext?.operation === "publish" && - s.permissionContext?.subject === "q" - ) { - status.resolve(s); + if (s.error instanceof PermissionViolationError) { + const pe = s.error as PermissionViolationError; + if (pe.subject === "q" && pe.operation === "publish") { + status.resolve(); + } } } })().then(); @@ -874,22 +867,17 @@ Deno.test("auth - perm request error deliver to sub", async () => { }, }); - const response = deferred(); - nc.request("q", Empty, { noMux: true, reply: inbox, timeout: 1000 }) - .catch((err) => { - response.resolve(err); - }); - - const [r, s] = await Promise.all([response, status]); - assertErrorCode(r, ErrorCode.PermissionsViolation); - const ne = r as NatsError; - assertEquals(ne.permissionContext?.operation, "publish"); - assertEquals(ne.permissionContext?.subject, "q"); - - assertEquals(s.type, Events.Error); - assertEquals(s.data, ErrorCode.PermissionsViolation); - assertEquals(s.permissionContext?.operation, "publish"); - assertEquals(s.permissionContext?.subject, "q"); + await assertRejects( + () => { + return nc.request("q", Empty, { + noMux: true, + reply: inbox, + timeout: 1000, + }); + }, + RequestError, + `Permissions Violation for Publish to "q"`, + ); assertEquals(sub.isClosed(), false); @@ -932,13 +920,14 @@ Deno.test("auth - mux sub ok", async () => { }); await sc.flush(); - const response = deferred(); - nc.request("q") - .catch((err) => { - response.resolve(err); - }); - const ne = await response as NatsError; - assertEquals(ne.permissionContext?.operation, "subscription"); + await assertRejects( + () => { + return nc.request("q"); + }, + RequestError, + "Permissions Violation for Subscription", + ); + //@ts-ignore: test assertEquals(nc.protocol.subscriptions.getMux(), null); @@ -1001,34 +990,25 @@ Deno.test("auth - perm sub iterator error", async () => { const status = deferred(); (async () => { for await (const s of nc.status()) { - if ( - s.permissionContext?.operation === "subscription" && - s.permissionContext?.subject === "q" - ) { - status.resolve(s); + if (s.error instanceof PermissionViolationError) { + const pe = s.error as PermissionViolationError; + if (pe.subject === "q" && pe.operation === "publish") { + status.resolve(s); + } } } })().then(); const sub = nc.subscribe("q"); - const iterReject = deferred(); - (async () => { - for await (const _m of sub) { - // ignored - } - })().catch((err) => { - iterReject.resolve(err as NatsError); - }); - - const [s, i] = await Promise.all([status, iterReject]); - assertEquals(s.type, Events.Error); - assertEquals(s.data, ErrorCode.PermissionsViolation); - assertEquals(s.permissionContext?.operation, "subscription"); - assertEquals(s.permissionContext?.subject, "q"); - - assertEquals(i.code, ErrorCode.PermissionsViolation); - assertEquals(i.permissionContext?.operation, "subscription"); - assertEquals(i.permissionContext?.subject, "q"); + await assertRejects( + async () => { + for await (const _m of sub) { + // ignored + } + }, + PermissionViolationError, + `Permissions Violation for Subscription to "q"`, + ); await cleanup(ns, nc); }); @@ -1051,7 +1031,7 @@ Deno.test("auth - perm error is not in lastError", async () => { const nci = nc as NatsConnectionImpl; assertEquals(nci.protocol.lastError, undefined); - const d = deferred(); + const d = deferred(); nc.subscribe("q", { callback: (err) => { d.resolve(err); @@ -1060,7 +1040,7 @@ Deno.test("auth - perm error is not in lastError", async () => { const err = await d; assert(err !== null); - assertEquals(err?.isPermissionError(), true); + assert(err instanceof PermissionViolationError); assert(nci.protocol.lastError === undefined); await cleanup(ns, nc); @@ -1090,7 +1070,7 @@ Deno.test("auth - ignore auth error abort", async () => { let count = 0; (async () => { for await (const s of nc.status()) { - if (s.type === "error" && s.data === "AUTHORIZATION_VIOLATION") { + if (s.error instanceof AuthorizationError) { count++; } } @@ -1247,7 +1227,6 @@ Deno.test("auth - request context", async () => { }); const a = await connect({ user: "a", pass: "a", port: ns.port }); - console.log(await (a as NatsConnectionImpl).context()); await a.request("q.hello"); await cleanup(ns, nc, a); @@ -1276,7 +1255,7 @@ Deno.test("auth - sub queue permission", async () => { }, }); - const qBad = deferred(); + const qBad = deferred(); nc.subscribe("q", { queue: "bad", callback: (err, _msg) => { @@ -1292,7 +1271,7 @@ Deno.test("auth - sub queue permission", async () => { await qA; - assertEquals(err.code, ErrorCode.PermissionsViolation); + assert(err instanceof PermissionViolationError); assertStringIncludes(err.message, 'using queue "bad"'); await cleanup(ns, nc); }); @@ -1319,7 +1298,6 @@ Deno.test("auth - account expired", async () => { const ujwt = await encodeUser("U", U, A, { bearer_token: true }); const { ns, nc } = await _setup(connect, conf, { - debug: true, reconnect: false, authenticator: jwtAuthenticator(ujwt), }); @@ -1327,7 +1305,10 @@ Deno.test("auth - account expired", async () => { const d = deferred(); (async () => { for await (const s of nc.status()) { - if (s.type === Events.Error && s.data === ErrorCode.AccountExpired) { + if ( + s.error instanceof AuthorizationError && + s.data === "Account Authentication Expired" + ) { d.resolve(); break; } @@ -1336,7 +1317,8 @@ Deno.test("auth - account expired", async () => { const w = await nc.closed(); assertExists(w); - assertEquals((w as NatsError).code, ErrorCode.AccountExpired); + assert(w instanceof AuthorizationError); + assertEquals(w.message, "Account Authentication Expired"); await cleanup(ns, nc); }); diff --git a/core/tests/authenticator_test.ts b/core/tests/authenticator_test.ts index 93895080..41e03ab4 100644 --- a/core/tests/authenticator_test.ts +++ b/core/tests/authenticator_test.ts @@ -35,7 +35,7 @@ import type { NatsConnectionImpl, } from "../src/internal_mod.ts"; -import { assertEquals } from "jsr:@std/assert"; +import { assertEquals, assertThrows } from "jsr:@std/assert"; import { encodeAccount, encodeOperator, @@ -253,3 +253,13 @@ Deno.test("authenticator - creds fn", async () => { await testAuthenticatorFn(authenticator, conf); }); + +Deno.test("authenticator - bad creds", () => { + assertThrows( + () => { + credsAuthenticator(new TextEncoder().encode("hello"))(); + }, + Error, + "unable to parse credentials", + ); +}); diff --git a/core/tests/autounsub_test.ts b/core/tests/autounsub_test.ts index 353fe1b9..069da3ec 100644 --- a/core/tests/autounsub_test.ts +++ b/core/tests/autounsub_test.ts @@ -12,12 +12,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import { assert, assertEquals } from "jsr:@std/assert"; +import { assertEquals, assertRejects } from "jsr:@std/assert"; -import { createInbox, Empty, ErrorCode } from "../src/internal_mod.ts"; +import { createInbox, Empty, RequestError } from "../src/internal_mod.ts"; import type { NatsConnectionImpl, Subscription } from "../src/internal_mod.ts"; import { _setup, cleanup, Lock } from "test_helpers"; import { connect } from "./connect.ts"; +import { TimeoutError } from "../src/errors.ts"; Deno.test("autounsub - max option", async () => { const { ns, nc } = await _setup(connect); @@ -198,16 +199,47 @@ Deno.test("autounsub - check cancelled request leaks", async () => { assertEquals(nci.protocol.subscriptions.size(), 1); assertEquals(nci.protocol.muxSubscriptions.size(), 1); + await assertRejects( + () => { + return rp; + }, + RequestError, + subj, + ); // the rejection should be timeout - const lock = Lock(); - rp.catch((rej) => { - assert( - rej?.code === ErrorCode.NoResponders || rej?.code === ErrorCode.Timeout, - ); - lock.unlock(); + + // mux subs should have pruned + assertEquals(nci.protocol.muxSubscriptions.size(), 0); + await cleanup(ns, nc); +}); + +Deno.test("autounsub - timeout cancelled request leaks", async () => { + const { ns, nc } = await _setup(connect); + const nci = nc as NatsConnectionImpl; + const subj = createInbox(); + + // should have no subscriptions + assertEquals(nci.protocol.subscriptions.size(), 0); + + nci.subscribe(subj, { + callback: () => { + // ignored so it times out + }, }); - await lock; + const rp = nc.request(subj, Empty, { timeout: 250 }); + + assertEquals(nci.protocol.subscriptions.size(), 2); + assertEquals(nci.protocol.muxSubscriptions.size(), 1); + + // the rejection should be timeout + await assertRejects( + () => { + return rp; + }, + TimeoutError, + ); + // mux subs should have pruned assertEquals(nci.protocol.muxSubscriptions.size(), 0); await cleanup(ns, nc); diff --git a/core/tests/basics_test.ts b/core/tests/basics_test.ts index 38c16f78..4a49eb26 100644 --- a/core/tests/basics_test.ts +++ b/core/tests/basics_test.ts @@ -17,20 +17,18 @@ import { assertArrayIncludes, assertEquals, assertExists, + assertInstanceOf, assertRejects, assertThrows, fail, } from "jsr:@std/assert"; -import { assertThrowsAsyncErrorCode } from "../../test_helpers/asserts.ts"; - import { collect, createInbox, deferred, delay, Empty, - ErrorCode, Feature, headers, isIP, @@ -42,20 +40,23 @@ import type { Msg, MsgHdrs, NatsConnectionImpl, - NatsError, Payload, Publisher, PublishOptions, SubscriptionImpl, } from "../src/internal_mod.ts"; -import { - _setup, - assertErrorCode, - cleanup, - Lock, - NatsServer, -} from "test_helpers"; +import { _setup, cleanup, Lock, NatsServer } from "test_helpers"; import { connect } from "./connect.ts"; +import { + ClosedConnectionError, + ConnectionError, + InvalidOptionError, + InvalidSubjectError, + ProtocolError, + RequestError, + ServerError, + TimeoutError, +} from "../src/errors.ts"; Deno.test("basics - connect port", async () => { const ns = await NatsServer.start(); @@ -86,13 +87,13 @@ Deno.test("basics - connect servers", async () => { }); Deno.test("basics - fail connect", async () => { - await connect({ servers: `127.0.0.1:32001` }) - .then(() => { - fail(); - }) - .catch((err) => { - assertErrorCode(err, ErrorCode.ConnectionRefused); - }); + await assertRejects( + () => { + return connect({ servers: `127.0.0.1:32001` }); + }, + ConnectionError, + "connection refused", + ); }); Deno.test("basics - publish", async () => { @@ -104,14 +105,14 @@ Deno.test("basics - publish", async () => { Deno.test("basics - no publish without subject", async () => { const { ns, nc } = await _setup(connect); - try { - nc.publish(""); - fail("should not be able to publish without a subject"); - } catch (err) { - assertEquals((err as NatsError).code, ErrorCode.BadSubject); - } finally { - await cleanup(ns, nc); - } + assertThrows( + () => { + nc.publish(""); + }, + InvalidSubjectError, + "illegal subject: ''", + ); + await cleanup(ns, nc); }); Deno.test("basics - pubsub", async () => { @@ -370,10 +371,13 @@ Deno.test("basics - request", async () => { Deno.test("basics - request no responders", async () => { const { ns, nc } = await _setup(connect); - const s = createInbox(); - await assertThrowsAsyncErrorCode(async () => { - await nc.request(s, Empty, { timeout: 100 }); - }, ErrorCode.NoResponders); + await assertRejects( + () => { + return nc.request("q", Empty, { timeout: 100 }); + }, + RequestError, + "no responders: 'q'", + ); await cleanup(ns, nc); }); @@ -381,9 +385,10 @@ Deno.test("basics - request timeout", async () => { const { ns, nc } = await _setup(connect); const s = createInbox(); nc.subscribe(s, { callback: () => {} }); - await assertThrowsAsyncErrorCode(async () => { - await nc.request(s, Empty, { timeout: 100 }); - }, ErrorCode.Timeout); + await assertRejects(() => { + return nc.request(s, Empty, { timeout: 100 }); + }, TimeoutError); + await cleanup(ns, nc); }); @@ -391,21 +396,20 @@ Deno.test("basics - request cancel rejects", async () => { const { ns, nc } = await _setup(connect); const nci = nc as NatsConnectionImpl; const s = createInbox(); - const lock = Lock(); - nc.request(s, Empty, { timeout: 1000 }) - .then(() => { - fail(); - }) - .catch((err) => { - assertEquals(err.code, ErrorCode.Cancelled); - lock.unlock(); - }); + const check = assertRejects( + () => { + return nc.request(s, Empty, { timeout: 1000 }); + }, + RequestError, + "cancelled", + ); nci.protocol.muxSubscriptions.reqs.forEach((v) => { v.cancel(); }); - await lock; + + await check; await cleanup(ns, nc); }); @@ -428,7 +432,7 @@ Deno.test("basics - old style requests", async () => { await cleanup(ns, nc); }); -Deno.test("basics - request with custom subject", async () => { +Deno.test("basics - reply can only be used with noMux", async () => { const { ns, nc } = await _setup(connect); nc.subscribe("q", { callback: (_err, msg) => { @@ -436,18 +440,14 @@ Deno.test("basics - request with custom subject", async () => { }, }); - try { - await nc.request( - "q", - Empty, - { reply: "bar", timeout: 1000 }, - ); + await assertRejects( + () => { + return nc.request("q", Empty, { reply: "bar", timeout: 1000 }); + }, + InvalidOptionError, + "options 'reply','noMux' are mutually exclusive.", + ); - fail("should have failed"); - } catch (err) { - const nerr = err as NatsError; - assertEquals(ErrorCode.InvalidOption, nerr.code); - } await cleanup(ns, nc); }); @@ -496,13 +496,12 @@ Deno.test("basics - request with headers and custom subject", async () => { Deno.test("basics - request requires a subject", async () => { const { ns, nc } = await _setup(connect); await assertRejects( - async () => { - //@ts-ignore: subject missing on purpose - await nc.request(); + () => { + //@ts-ignore: testing + return nc.request(); }, - Error, - "BAD_SUBJECT", - undefined, + InvalidSubjectError, + "illegal subject: ''", ); await cleanup(ns, nc); }); @@ -511,28 +510,24 @@ Deno.test("basics - closed returns error", async () => { const { ns, nc } = await _setup(connect, {}, { reconnect: false }); setTimeout(() => { (nc as NatsConnectionImpl).protocol.sendCommand("Y\r\n"); - }, 1000); - await nc.closed() - .then((v) => { - assertEquals((v as NatsError).code, ErrorCode.ProtocolError); - }); - + }, 100); + const done = await nc.closed(); + assertInstanceOf(done, ProtocolError); await cleanup(ns, nc); }); Deno.test("basics - subscription with timeout", async () => { const { ns, nc } = await _setup(connect); - const lock = Lock(1); const sub = nc.subscribe(createInbox(), { max: 1, timeout: 250 }); - (async () => { - for await (const _m of sub) { - // ignored - } - })().catch((err) => { - assertErrorCode(err, ErrorCode.Timeout); - lock.unlock(); - }); - await lock; + await assertRejects( + async () => { + for await (const _m of sub) { + // ignored + } + }, + TimeoutError, + "timeout", + ); await cleanup(ns, nc); }); @@ -592,20 +587,16 @@ Deno.test("basics - no mux requests create normal subs", async () => { Deno.test("basics - no mux requests timeout", async () => { const { ns, nc } = await _setup(connect); - const lock = Lock(); const subj = createInbox(); nc.subscribe(subj, { callback: () => {} }); + await assertRejects( + () => { + return nc.request(subj, Empty, { timeout: 500, noMux: true }); + }, + RequestError, + "timeout", + ); - await nc.request( - subj, - Empty, - { timeout: 1000, noMux: true }, - ) - .catch((err) => { - assertErrorCode(err, ErrorCode.Timeout); - lock.unlock(); - }); - await lock; await cleanup(ns, nc); }); @@ -633,11 +624,11 @@ Deno.test("basics - no mux request timeout doesn't leak subs", async () => { assertEquals(nci.protocol.subscriptions.size(), 1); await assertRejects( - async () => { - await nc.request("q", Empty, { noMux: true, timeout: 1000 }); + () => { + return nc.request("q", Empty, { noMux: true, timeout: 1000 }); }, - Error, - "TIMEOUT", + RequestError, + "timeout", ); assertEquals(nci.protocol.subscriptions.size(), 1); @@ -649,14 +640,9 @@ Deno.test("basics - no mux request no responders doesn't leak subs", async () => const nci = nc as NatsConnectionImpl; assertEquals(nci.protocol.subscriptions.size(), 0); - - await assertRejects( - async () => { - await nc.request("q", Empty, { noMux: true, timeout: 1000 }); - }, - Error, - "503", - ); + await assertRejects(() => { + return nc.request("q", Empty, { noMux: true, timeout: 500 }); + }); assertEquals(nci.protocol.subscriptions.size(), 0); await cleanup(ns, nc); @@ -704,42 +690,73 @@ Deno.test("basics - no mux request no perms doesn't leak subs", async () => { await cleanup(ns, nc); }); -Deno.test("basics - no max_payload messages", async () => { +Deno.test("basics - max_payload errors", async () => { const { ns, nc } = await _setup(connect, { max_payload: 2048 }); const nci = nc as NatsConnectionImpl; assert(nci.protocol.info); const big = new Uint8Array(nci.protocol.info.max_payload + 1); - const subj = createInbox(); - try { - nc.publish(subj, big); - fail(); - } catch (err) { - assertErrorCode(err as NatsError, ErrorCode.MaxPayloadExceeded); - } + assertThrows( + () => { + nc.publish("foo", big); + }, + ServerError, + `'max_payload' exceeded`, + ); - try { - await nc.request(subj, big).then(); - fail(); - } catch (err) { - assertErrorCode(err as NatsError, ErrorCode.MaxPayloadExceeded); - } + assertRejects( + () => { + return nc.request("foo", big); + }, + ServerError, + `'max_payload' exceeded`, + ); - const sub = nc.subscribe(subj); - (async () => { - for await (const m of sub) { - m.respond(big); - fail(); - } - })().catch((err) => { - assertErrorCode(err, ErrorCode.MaxPayloadExceeded); + const d = deferred(); + setTimeout(() => { + nc.request("foo").catch((err) => { + d.reject(err); + }); }); - await nc.request(subj).then(() => { - fail(); - }).catch((err) => { - assertErrorCode(err, ErrorCode.Timeout); - }); + const sub = nc.subscribe("foo"); + + for await (const m of sub) { + assertThrows( + () => { + m.respond(big); + }, + ServerError, + `'max_payload' exceeded`, + ); + break; + } + + await assertRejects( + () => { + return d; + }, + TimeoutError, + "timeout", + ); + + await cleanup(ns, nc); +}); + +Deno.test("basics - close cancels requests", async () => { + const { ns, nc } = await _setup(connect); + nc.subscribe("q", { callback: () => {} }); + + const done = assertRejects( + () => { + return nc.request("q"); + }, + RequestError, + "connection closed", + ); + + await nc.close(); + await done; await cleanup(ns, nc); }); @@ -961,38 +978,34 @@ Deno.test("basics - port and server are mutually exclusive", async () => { await connect({ servers: "localhost", port: 4222 }); }, Error, - "port and servers options are mutually exclusive", + "options 'servers','port' are mutually exclusive.", undefined, ); }); Deno.test("basics - rtt", async () => { const { ns, nc } = await _setup(connect, {}, { - maxReconnectAttempts: 5, - reconnectTimeWait: 250, + maxReconnectAttempts: 1, + reconnectTimeWait: 750, }); const rtt = await nc.rtt(); assert(rtt >= 0); await ns.stop(); - await delay(500); + await assertRejects( - async () => { - await nc.rtt(); + () => { + return nc.rtt(); }, - Error, - ErrorCode.Disconnect, + RequestError, + "disconnected", ); await nc.closed(); - await assertRejects( - async () => { - await nc.rtt(); - }, - Error, - ErrorCode.ConnectionClosed, - ); + await assertRejects(() => { + return nc.rtt(); + }, ClosedConnectionError); }); Deno.test("basics - request many count", async () => { @@ -1224,7 +1237,8 @@ Deno.test("basics - initial connect error", async () => { // in deno we get the connection reset - but if running in CI this may turn out to be // a connection refused assertArrayIncludes(["ECONNRESET", "CONNECTION_REFUSED"], [ - (err as NatsError).code, + //@ts-ignore: exception has code + err.code, ]); } listener.close(); @@ -1244,16 +1258,16 @@ Deno.test("basics - inbox prefixes cannot have wildcards", async () => { async () => { await connect({ inboxPrefix: "_inbox.foo.>" }); }, - Error, - "inbox prefixes cannot have wildcards", + InvalidOptionError, + "argument 'prefix' cannot have wildcards", ); assertThrows( () => { createInbox("_inbox.foo.*"); }, - Error, - "inbox prefixes cannot have wildcards", + InvalidOptionError, + "argument 'prefix' cannot have wildcards", ); }); diff --git a/core/tests/drain_test.ts b/core/tests/drain_test.ts index fe692945..384dfb9b 100644 --- a/core/tests/drain_test.ts +++ b/core/tests/drain_test.ts @@ -12,16 +12,21 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import { assert, assertEquals, fail } from "jsr:@std/assert"; -import { createInbox, ErrorCode } from "../src/internal_mod.ts"; -import type { Msg, NatsError } from "../src/internal_mod.ts"; import { - assertThrowsAsyncErrorCode, - assertThrowsErrorCode, - Lock, -} from "test_helpers"; + assert, + assertEquals, + assertRejects, + assertThrows, +} from "jsr:@std/assert"; +import { createInbox } from "../src/internal_mod.ts"; +import { Lock } from "test_helpers"; import { _setup, cleanup } from "test_helpers"; import { connect } from "./connect.ts"; +import { + ClosedConnectionError, + ConnectionDrainingError, + InvalidOperationError, +} from "../src/errors.ts"; Deno.test("drain - connection drains when no subs", async () => { const { ns, nc } = await _setup(connect); @@ -118,77 +123,43 @@ Deno.test("drain - publish after drain fails", async () => { nc.subscribe(subj); await nc.drain(); - assertThrowsErrorCode( - () => { - nc.publish(subj); - }, - ErrorCode.ConnectionClosed, - ErrorCode.ConnectionDraining, - ); + try { + nc.publish(subj); + } catch (err) { + assert( + err instanceof ClosedConnectionError || + err instanceof ConnectionDrainingError, + ); + } + await ns.stop(); }); Deno.test("drain - reject reqrep during connection drain", async () => { const { ns, nc } = await _setup(connect); - const nc2 = await connect({ port: ns.port }); - const lock = Lock(); - const subj = createInbox(); - // start a service for replies - await nc.subscribe(subj, { - callback: (_, msg: Msg) => { - if (msg.reply) { - msg.respond("ok"); - } - }, - }); - await nc.flush(); - - let first = true; - const done = Lock(); - await nc2.subscribe(subj, { - callback: async () => { - if (first) { - first = false; - nc2.drain() - .then(() => { - done.unlock(); - }); - try { - // should fail - await nc2.request(subj + "a"); - fail("shouldn't have been able to request"); - lock.unlock(); - } catch (err) { - assertEquals((err as NatsError).code, ErrorCode.ConnectionDraining); - lock.unlock(); - } - } - }, - }); - // publish a trigger for the drain and requests - nc2.publish(subj); - await nc2.flush(); - await lock; - await nc.close(); + const done = nc.drain(); + await assertRejects(() => { + return nc.request("foo"); + }, ConnectionDrainingError); await done; - await ns.stop(); + await cleanup(ns, nc); }); Deno.test("drain - reject drain on closed", async () => { const { ns, nc } = await _setup(connect); await nc.close(); - await assertThrowsAsyncErrorCode(async () => { - await nc.drain(); - }, ErrorCode.ConnectionClosed); + await assertRejects(() => { + return nc.drain(); + }, ClosedConnectionError); await ns.stop(); }); Deno.test("drain - reject drain on draining", async () => { const { ns, nc } = await _setup(connect); const done = nc.drain(); - await assertThrowsAsyncErrorCode(() => { + await assertRejects(() => { return nc.drain(); - }, ErrorCode.ConnectionDraining); + }, ConnectionDrainingError); await done; await ns.stop(); }); @@ -196,9 +167,10 @@ Deno.test("drain - reject drain on draining", async () => { Deno.test("drain - reject subscribe on draining", async () => { const { ns, nc } = await _setup(connect); const done = nc.drain(); - assertThrowsErrorCode(() => { + assertThrows(() => { return nc.subscribe("foo"); - }, ErrorCode.ConnectionDraining); + }, ConnectionDrainingError); + await done; await ns.stop(); }); @@ -207,9 +179,13 @@ Deno.test("drain - reject subscription drain on closed sub callback", async () = const { ns, nc } = await _setup(connect); const sub = nc.subscribe("foo", { callback: () => {} }); sub.unsubscribe(); - await assertThrowsAsyncErrorCode(() => { - return sub.drain(); - }, ErrorCode.SubClosed); + await assertRejects( + () => { + return sub.drain(); + }, + InvalidOperationError, + "subscription is already closed", + ); await nc.close(); await ns.stop(); }); @@ -217,13 +193,21 @@ Deno.test("drain - reject subscription drain on closed sub callback", async () = Deno.test("drain - reject subscription drain on closed sub iter", async () => { const { ns, nc } = await _setup(connect); const sub = nc.subscribe("foo"); + const d = (async () => { + for await (const _ of sub) { + // nothing + } + })().then(); + sub.unsubscribe(); - for await (const _m of sub) { - // nothing to do here - } - await assertThrowsAsyncErrorCode(() => { - return sub.drain(); - }, ErrorCode.SubClosed); + await d; + await assertRejects( + () => { + return sub.drain(); + }, + InvalidOperationError, + "subscription is already closed", + ); await nc.close(); await ns.stop(); }); @@ -240,9 +224,9 @@ Deno.test("drain - reject subscription drain on closed", async () => { const { ns, nc } = await _setup(connect); const sub = nc.subscribe("foo"); await nc.close(); - await assertThrowsAsyncErrorCode(() => { + await assertRejects(() => { return sub.drain(); - }, ErrorCode.ConnectionClosed); + }, ClosedConnectionError); await ns.stop(); }); diff --git a/core/tests/headers_test.ts b/core/tests/headers_test.ts index f104476d..2e6c5965 100644 --- a/core/tests/headers_test.ts +++ b/core/tests/headers_test.ts @@ -21,7 +21,6 @@ import { Match, MsgHdrsImpl, MsgImpl, - NatsError, Parser, } from "../src/internal_mod.ts"; import type { @@ -33,19 +32,28 @@ import { NatsServer } from "../../test_helpers/launcher.ts"; import { assert, assertEquals, assertThrows } from "jsr:@std/assert"; import { TestDispatcher } from "./parser_test.ts"; import { _setup, cleanup } from "test_helpers"; +import { InvalidHeaderError } from "../src/errors.ts"; Deno.test("headers - illegal key", () => { const h = headers(); ["bad:", "bad ", String.fromCharCode(127)].forEach((v) => { - assertThrows(() => { - h.set(v, "aaa"); - }, NatsError); + assertThrows( + () => { + h.set(v, "aaa"); + }, + InvalidHeaderError, + "is not a valid character in a header name", + ); }); ["\r", "\n"].forEach((v) => { - assertThrows(() => { - h.set("a", v); - }, NatsError); + assertThrows( + () => { + h.set("a", v); + }, + InvalidHeaderError, + "invalid header: values cannot contain \\r or \\n", + ); }); }); diff --git a/core/tests/iterators_test.ts b/core/tests/iterators_test.ts index da22bee9..4c5b2b42 100644 --- a/core/tests/iterators_test.ts +++ b/core/tests/iterators_test.ts @@ -13,18 +13,18 @@ * limitations under the License. */ import { connect } from "./connect.ts"; -import { assert, assertEquals, assertRejects } from "jsr:@std/assert"; -import { assertErrorCode, Lock, NatsServer } from "test_helpers"; +import { assertEquals, assertRejects } from "jsr:@std/assert"; +import { Lock, NatsServer } from "test_helpers"; import { createInbox, delay, - ErrorCode, nuid, QueuedIteratorImpl, syncIterator, } from "../src/internal_mod.ts"; import type { NatsConnectionImpl } from "../src/internal_mod.ts"; import { _setup, cleanup } from "test_helpers"; +import { InvalidOperationError } from "../src/errors.ts"; Deno.test("iterators - unsubscribe breaks and closes", async () => { const { ns, nc } = await _setup(connect); @@ -132,27 +132,21 @@ Deno.test("iterators - connection close closes", async () => { Deno.test("iterators - cb subs fail iterator", async () => { const { ns, nc } = await _setup(connect); const subj = createInbox(); - const lock = Lock(2); - const sub = nc.subscribe(subj, { - callback: (err, msg) => { - assert(err === null); - assert(msg); - lock.unlock(); + const sub = nc.subscribe(subj, { callback: () => {} }); + + await assertRejects( + async () => { + for await (const _ of sub) { + // nothing + } }, - }); + InvalidOperationError, + "iterator cannot be used when a callback is registered", + ); - (async () => { - for await (const _m of sub) { - lock.unlock(); - } - })().catch((err) => { - assertErrorCode(err, ErrorCode.ApiError); - lock.unlock(); - }); nc.publish(subj); await nc.flush(); await cleanup(ns, nc); - await lock; }); Deno.test("iterators - cb message counts", async () => { @@ -259,7 +253,7 @@ Deno.test("iterators - sync iterator", async () => { } }, Error, - "unsupported iterator", + "iterator cannot be used when a callback is registered", ); await cleanup(ns, nc); diff --git a/core/tests/json_test.ts b/core/tests/json_test.ts index 86126948..61e792b5 100644 --- a/core/tests/json_test.ts +++ b/core/tests/json_test.ts @@ -15,7 +15,7 @@ import { connect } from "./connect.ts"; import { assertEquals } from "jsr:@std/assert"; import { createInbox } from "../src/internal_mod.ts"; -import type { Msg, NatsError } from "../src/internal_mod.ts"; +import type { Msg } from "../src/internal_mod.ts"; import { Lock } from "test_helpers"; import { _setup, cleanup } from "test_helpers"; @@ -25,7 +25,7 @@ function macro(input: unknown) { const lock = Lock(); const subj = createInbox(); nc.subscribe(subj, { - callback: (err: NatsError | null, msg: Msg) => { + callback: (err: Error | null, msg: Msg) => { assertEquals(null, err); // in JSON undefined is translated to null if (input === undefined) { diff --git a/core/tests/mrequest_test.ts b/core/tests/mrequest_test.ts index 01f24603..630ad099 100644 --- a/core/tests/mrequest_test.ts +++ b/core/tests/mrequest_test.ts @@ -20,11 +20,11 @@ import { deferred, delay, Empty, - Events, RequestStrategy, } from "../src/internal_mod.ts"; import { assert, assertEquals, assertRejects, fail } from "jsr:@std/assert"; +import { NoRespondersError, PermissionViolationError } from "../src/errors.ts"; async function requestManyCount(noMux = false): Promise { const { ns, nc } = await _setup(connect, {}); @@ -257,8 +257,8 @@ async function requestManyStopsOnError(noMux = false): Promise { // do nothing } }, - Error, - "503", + NoRespondersError, + subj, ); await cleanup(ns, nc); } @@ -285,8 +285,11 @@ Deno.test("mreq - pub permission error", async () => { const d = deferred(); (async () => { for await (const s of nc.status()) { - if (s.type === Events.Error && s.permissionContext?.subject === "q") { - d.resolve(); + if (s.error instanceof PermissionViolationError) { + const pe = s.error as PermissionViolationError; + if (pe.subject === "q" && pe.operation === "publish") { + d.resolve(); + } } } })().then(); @@ -330,11 +333,13 @@ Deno.test("mreq - sub permission error", async () => { const d = deferred(); (async () => { for await (const s of nc.status()) { - if ( - s.type === Events.Error && - s.permissionContext?.operation === "subscription" - ) { - d.resolve(); + if (s.error instanceof PermissionViolationError) { + const pe = s.error as PermissionViolationError; + if ( + pe.operation === "subscription" && pe.subject.startsWith("_INBOX.") + ) { + d.resolve(); + } } } })().then(); @@ -351,7 +356,7 @@ Deno.test("mreq - sub permission error", async () => { // nothing; } }, - Error, + PermissionViolationError, "Permissions Violation for Subscription", ); await d; @@ -390,11 +395,13 @@ Deno.test("mreq - lost sub permission", async () => { const d = deferred(); (async () => { for await (const s of nc.status()) { - if ( - s.type === Events.Error && - s.permissionContext?.operation === "subscription" - ) { - d.resolve(); + if (s.error instanceof PermissionViolationError) { + const pe = s.error as PermissionViolationError; + if ( + pe.operation === "subscription" && pe.subject.startsWith("_INBOX.") + ) { + d.resolve(); + } } } })().then(); @@ -404,14 +411,14 @@ Deno.test("mreq - lost sub permission", async () => { const iter = await nc.requestMany("q", Empty, { strategy: RequestStrategy.Count, maxMessages: 3, - maxWait: 5000, + maxWait: 2000, noMux: true, }); for await (const _m of iter) { - // nothing + // nothing; } }, - Error, + PermissionViolationError, "Permissions Violation for Subscription", ); await d; @@ -456,8 +463,8 @@ Deno.test("mreq - no responder doesn't leak subs", async () => { // nothing } }, - Error, - "503", + NoRespondersError, + "no responders: 'q'", ); // the mux subscription diff --git a/core/tests/noresponders_test.ts b/core/tests/noresponders_test.ts deleted file mode 100644 index 4d37bfa0..00000000 --- a/core/tests/noresponders_test.ts +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright 2020-2023 The NATS Authors - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import { connect } from "./connect.ts"; -import { createInbox, Empty, ErrorCode, headers } from "@nats-io/nats-core"; -import { assertErrorCode, Lock, NatsServer } from "test_helpers"; -import { assert, assertEquals, fail } from "jsr:@std/assert"; - -Deno.test("noresponders - option", async () => { - const srv = await NatsServer.start(); - const nc = await connect( - { - servers: `127.0.0.1:${srv.port}`, - }, - ); - - const lock = Lock(); - await nc.request(createInbox()) - .then(() => { - fail("should have not resolved"); - }) - .catch((err) => { - assertErrorCode(err, ErrorCode.NoResponders); - lock.unlock(); - }); - - await lock; - await nc.close(); - await srv.stop(); -}); - -Deno.test("noresponders - list", async () => { - const srv = await NatsServer.start(); - const nc = await connect( - { - servers: `nats://127.0.0.1:${srv.port}`, - }, - ); - - const subj = createInbox(); - const sub = nc.subscribe(subj); - (async () => { - for await (const m of sub) { - const h = headers(); - h.append("a", "b"); - m.respond(Empty, { headers: h }); - } - })().then(); - await nc.flush(); - - const msg = await nc.request(subj); - assert(msg.headers); - assertEquals(msg.headers.get("a"), "b"); - await nc.close(); - await srv.stop(); -}); diff --git a/core/tests/protocol_test.ts b/core/tests/protocol_test.ts index c8f2ece0..26bfe21b 100644 --- a/core/tests/protocol_test.ts +++ b/core/tests/protocol_test.ts @@ -14,7 +14,6 @@ */ import { Empty, - ErrorCode, extractProtocolMessage, MuxSubscription, protoLen, @@ -23,10 +22,10 @@ import { Subscriptions, } from "../src/internal_mod.ts"; import type { Msg, ProtocolHandler } from "../src/internal_mod.ts"; -import { assertErrorCode } from "test_helpers"; -import { assertEquals, equal } from "jsr:@std/assert"; +import { assertEquals, assertRejects, equal } from "jsr:@std/assert"; +import { RequestError } from "../src/errors.ts"; -Deno.test("protocol - mux subscription unknown return null", async () => { +Deno.test("protocol - mux subscription cancel", async () => { const mux = new MuxSubscription(); mux.init(); @@ -37,13 +36,17 @@ Deno.test("protocol - mux subscription unknown return null", async () => { assertEquals(mux.get("alberto"), r); assertEquals(mux.getToken({ subject: "" } as Msg), null); - const p = Promise.race([r.deferred, r.timer]) - .catch((err) => { - assertErrorCode(err, ErrorCode.Cancelled); - }); + const check = assertRejects( + () => { + return Promise.race([r.deferred, r.timer]); + }, + RequestError, + "cancelled", + ); r.cancel(); - await p; + + await check; assertEquals(mux.size(), 0); }); diff --git a/core/tests/reconnect_test.ts b/core/tests/reconnect_test.ts index 3a96cdca..6073bf3d 100644 --- a/core/tests/reconnect_test.ts +++ b/core/tests/reconnect_test.ts @@ -12,23 +12,23 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import { assert, assertEquals, fail } from "jsr:@std/assert"; +import { assert, assertEquals, assertInstanceOf, fail } from "jsr:@std/assert"; import { connect } from "./connect.ts"; -import { assertErrorCode, Lock, NatsServer } from "test_helpers"; +import { Lock, NatsServer } from "test_helpers"; import { createInbox, DataBuffer, DebugEvents, deferred, delay, - ErrorCode, Events, tokenAuthenticator, } from "../src/internal_mod.ts"; -import type { NatsConnectionImpl, NatsError } from "../src/internal_mod.ts"; +import type { NatsConnectionImpl } from "../src/internal_mod.ts"; import { _setup, cleanup } from "test_helpers"; import { deadline } from "jsr:@std/async"; +import { ConnectionError } from "../src/errors.ts"; Deno.test("reconnect - should receive when some servers are invalid", async () => { const lock = Lock(1); @@ -77,11 +77,8 @@ Deno.test("reconnect - events", async () => { } })().then(); await srv.stop(); - try { - await nc.closed(); - } catch (err) { - assertErrorCode(err as NatsError, ErrorCode.ConnectionRefused); - } + const err = await nc.closed(); + assertInstanceOf(err, ConnectionError, "connection closed"); assertEquals(disconnects, 1); assertEquals(reconnecting, 10); }); @@ -276,27 +273,8 @@ Deno.test("reconnect - wait on first connect", async () => { // stop the server await srv.stop(); // no reconnect, will quit the client - const what = await nc.closed() as NatsError; - assertEquals(what.code, ErrorCode.ConnectionRefused); -}); - -Deno.test("reconnect - wait on first connect off", async () => { - const srv = await NatsServer.start({}); - const port = srv.port; - await delay(500); - await srv.stop(); - await delay(1000); - const pnc = connect({ - port: port, - }); - - try { - // should fail - await pnc; - } catch (err) { - const nerr = err as NatsError; - assertEquals(nerr.code, ErrorCode.ConnectionRefused); - } + const err = await nc.closed(); + assertInstanceOf(err, ConnectionError, "connection refused"); }); Deno.test("reconnect - close stops reconnects", async () => { @@ -467,7 +445,7 @@ Deno.test("reconnect - authentication timeout reconnects", async () => { }, }); - let counter = 4; + let counter = 3; const authenticator = tokenAuthenticator(() => { if (counter-- <= 0) { return "hello"; @@ -485,7 +463,7 @@ Deno.test("reconnect - authentication timeout reconnects", async () => { port: ns.port, token: "hello", waitOnFirstConnect: true, - timeout: 2000, + ignoreAuthErrorAbort: true, authenticator, }); diff --git a/core/tests/tls_test.ts b/core/tests/tls_test.ts index 202892df..eaa25e36 100644 --- a/core/tests/tls_test.ts +++ b/core/tests/tls_test.ts @@ -12,29 +12,21 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import { - assertEquals, - assertRejects, - assertStringIncludes, - fail, -} from "jsr:@std/assert"; +import { assertEquals, assertRejects } from "jsr:@std/assert"; import { connect } from "./connect.ts"; -import { ErrorCode } from "../src/internal_mod.ts"; +import { ConnectionError } from "../src/internal_mod.ts"; import type { NatsConnectionImpl } from "../src/internal_mod.ts"; -import { assertErrorCode, cleanup, Lock, NatsServer } from "test_helpers"; +import { cleanup, NatsServer } from "test_helpers"; Deno.test("tls - fail if server doesn't support TLS", async () => { const ns = await NatsServer.start(); - const lock = Lock(); - await connect({ port: ns.port, tls: {} }) - .then(() => { - fail("shouldn't have connected"); - }) - .catch((err) => { - assertErrorCode(err, ErrorCode.ServerOptionNotAvailable); - lock.unlock(); - }); - await lock; + await assertRejects( + () => { + return connect({ port: ns.port, tls: {}, reconnect: false }); + }, + ConnectionError, + "option 'tls' is not available", + ); await ns.stop(); }); @@ -53,23 +45,13 @@ Deno.test("tls - custom ca fails without root", async () => { }; const ns = await NatsServer.start(config); - const lock = Lock(); - await connect({ servers: `localhost:${ns.port}` }) - .then(() => { - fail("shouldn't have connected without client ca"); - }) - .catch((err) => { - // this is a bogus error name - but at least we know we are rejected - assertEquals(err.name, "InvalidData"); - assertStringIncludes( - err.message, - "invalid peer certificate", - ); - assertStringIncludes(err.message, "UnknownIssuer"); - lock.unlock(); - }); - - await lock; + await assertRejects( + () => { + return connect({ servers: `localhost:${ns.port}`, reconnect: false }); + }, + ConnectionError, + "invalid peer certificate: UnknownIssuer", + ); await ns.stop(); await Deno.remove(tlsConfig.certsDir, { recursive: true }); }); diff --git a/core/tests/token_test.ts b/core/tests/token_test.ts index 633eb1df..02f0e883 100644 --- a/core/tests/token_test.ts +++ b/core/tests/token_test.ts @@ -12,41 +12,29 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import { fail } from "jsr:@std/assert"; -import { ErrorCode } from "../src/internal_mod.ts"; -import { assertErrorCode, NatsServer } from "test_helpers"; +import { assertRejects } from "jsr:@std/assert"; +import { NatsServer } from "test_helpers"; import { connect } from "./connect.ts"; +import { AuthorizationError } from "../src/errors.ts"; const conf = { authorization: { token: "tokenxxxx" } }; Deno.test("token - empty", async () => { const ns = await NatsServer.start(conf); - try { - const nc = await connect( - { port: ns.port, reconnect: false }, - ); - nc.closed().then((err) => { - console.table(err); - }); - await nc.close(); - fail("should not have connected"); - } catch (err) { - assertErrorCode(err as Error, ErrorCode.AuthorizationViolation); - } + await assertRejects(() => { + return connect({ port: ns.port, reconnect: false, debug: true }); + }, AuthorizationError); + await ns.stop(); }); Deno.test("token - bad", async () => { const ns = await NatsServer.start(conf); - try { - const nc = await connect( - { port: ns.port, token: "bad" }, + await assertRejects(() => { + return connect( + { port: ns.port, token: "bad", reconnect: false }, ); - await nc.close(); - fail("should not have connected"); - } catch (err) { - assertErrorCode(err as Error, ErrorCode.AuthorizationViolation); - } + }, AuthorizationError); await ns.stop(); }); diff --git a/core/tests/ws_test.ts b/core/tests/ws_test.ts index 1130a3af..4efdb85a 100644 --- a/core/tests/ws_test.ts +++ b/core/tests/ws_test.ts @@ -23,12 +23,12 @@ import { import { createInbox, DebugEvents, - ErrorCode, Events, + InvalidOptionError, wsconnect, wsUrlParseFn, } from "../src/internal_mod.ts"; -import type { NatsConnectionImpl, NatsError } from "../src/internal_mod.ts"; +import type { NatsConnectionImpl } from "../src/internal_mod.ts"; import { assertBetween, cleanup, @@ -105,15 +105,13 @@ Deno.test( ); Deno.test("ws - tls options are not supported", async () => { - const err = await assertRejects( + await assertRejects( () => { return wsconnect({ servers: "wss://demo.nats.io:8443", tls: {} }); }, - Error, - "tls", + InvalidOptionError, + "option 'tls' is not configurable on w3c websocket connections", ); - - assertEquals((err as NatsError).code, ErrorCode.InvalidOption); }); Deno.test( diff --git a/jetstream/examples/util.ts b/jetstream/examples/util.ts index a3296971..f767f5b7 100644 --- a/jetstream/examples/util.ts +++ b/jetstream/examples/util.ts @@ -14,8 +14,8 @@ */ import { createConsumer, fill, initStream } from "../tests/jstest_util.ts"; -import type { NatsConnection } from "jsr:@nats-io/nats-core@3.0.0-27"; -import { nuid } from "jsr:@nats-io/nats-core@3.0.0-27"; +import type { NatsConnection } from "jsr:@nats-io/nats-core@3.0.0-30"; +import { nuid } from "jsr:@nats-io/nats-core@3.0.0-30"; export async function setupStreamAndConsumer( nc: NatsConnection, diff --git a/jetstream/src/jsapi_types.ts b/jetstream/src/jsapi_types.ts index 4bec38b1..51bbf836 100644 --- a/jetstream/src/jsapi_types.ts +++ b/jetstream/src/jsapi_types.ts @@ -31,6 +31,21 @@ export interface ApiResponse { error?: ApiError; } +export interface JsApiError { + /** + * HTTP like error code in the 300 to 500 range + */ + code: number; + /** + * A human friendly description of the error + */ + description: string; + /** + * The NATS error code unique to each kind of error + */ + err_code?: number; +} + /** * An alternate location to read mirrored data */ diff --git a/jetstream/src/jsbaseclient_api.ts b/jetstream/src/jsbaseclient_api.ts index 4badfa40..76ad2d53 100644 --- a/jetstream/src/jsbaseclient_api.ts +++ b/jetstream/src/jsbaseclient_api.ts @@ -27,9 +27,9 @@ import type { NatsError, RequestOptions, } from "@nats-io/nats-core/internal"; -import { checkJsErrorCode } from "./jsutil.ts"; import type { ApiResponse } from "./jsapi_types.ts"; import type { JetStreamOptions } from "./types.ts"; +import { JetStreamApiError } from "./jserrors.ts"; const defaultPrefix = "$JS.API"; const defaultTimeout = 5000; @@ -142,14 +142,7 @@ export class BaseApiClientImpl { const v = JSON.parse(new TextDecoder().decode(m.data)); const r = v as ApiResponse; if (r.error) { - const err = checkJsErrorCode(r.error.code, r.error.description); - if (err !== null) { - err.api_error = r.error; - if (r.error.description !== "") { - err.message = r.error.description; - } - throw err; - } + throw new JetStreamApiError(r.error); } return v; } diff --git a/jetstream/src/jsclient.ts b/jetstream/src/jsclient.ts index 8f3dfe61..c40b0567 100644 --- a/jetstream/src/jsclient.ts +++ b/jetstream/src/jsclient.ts @@ -15,12 +15,7 @@ import { BaseApiClientImpl } from "./jsbaseclient_api.ts"; import { ConsumerAPIImpl } from "./jsmconsumer_api.ts"; -import { - delay, - Empty, - NatsError, - QueuedIteratorImpl, -} from "@nats-io/nats-core/internal"; +import { delay, Empty, QueuedIteratorImpl } from "@nats-io/nats-core/internal"; import { ConsumersImpl, StreamAPIImpl, StreamsImpl } from "./jsmstream_api.ts"; @@ -39,7 +34,7 @@ import type { StreamAPI, Streams, } from "./types.ts"; -import { ErrorCode, headers } from "@nats-io/nats-core/internal"; +import { headers } from "@nats-io/nats-core/internal"; import type { Msg, @@ -54,6 +49,8 @@ import type { JetStreamAccountStats, } from "./jsapi_types.ts"; import { DirectStreamAPIImpl } from "./jsm.ts"; +import { RequestError } from "../../core/src/mod.ts"; +import { JetStreamError } from "./jserrors.ts"; export function toJetStreamClient( nc: NatsConnection | JetStreamClient, @@ -91,11 +88,12 @@ export async function jetstreamManager( try { await adm.getAccountInfo(); } catch (err) { - const ne = err as NatsError; - if (ne.code === ErrorCode.NoResponders) { - ne.code = ErrorCode.JetStreamNotEnabled; + if ( + err instanceof RequestError && err.message.includes("no responders") + ) { + throw new JetStreamError("jetstream is not enabled", err); } - throw ne; + throw new JetStreamError((err as Error).message, err as Error); } } return adm; @@ -227,8 +225,9 @@ export class JetStreamClientImpl extends BaseApiClientImpl // if here we succeeded break; } catch (err) { - const ne = err as NatsError; - if (ne.code === "503" && i + 1 < retries) { + if ( + err instanceof RequestError && err.message.includes("no responders") + ) { await delay(retry_delay); } else { throw err; @@ -237,7 +236,7 @@ export class JetStreamClientImpl extends BaseApiClientImpl } const pa = this.parseJsResponse(r!) as PubAck; if (pa.stream === "") { - throw NatsError.errorForCode(ErrorCode.JetStreamInvalidAck); + throw new JetStreamError("invalid ack response"); } pa.duplicate = pa.duplicate ? pa.duplicate : false; return pa; diff --git a/jetstream/src/jserrors.ts b/jetstream/src/jserrors.ts new file mode 100644 index 00000000..13666cab --- /dev/null +++ b/jetstream/src/jserrors.ts @@ -0,0 +1,111 @@ +/* + * Copyright 2024 Synadia Communications, Inc + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import type { Msg } from "@nats-io/nats-core"; + +export type ApiError = { + /** + * Status code + */ + code: number; + /** + * JetStream Error Code + */ + err_code: number; + /** + * Error description + */ + description: string; +}; + +export class JetStreamStatus { + msg: Msg; + _description: string; + + constructor(msg: Msg) { + this.msg = msg; + this._description = ""; + } + + get status(): number { + return this.msg.headers?.code || 0; + } + + get description(): string { + if (this._description === "") { + this._description = this.msg.headers?.description?.toLowerCase() || ""; + } + return this._description; + } + + isBadRequest() { + return this.status === 400; + } + + isConsumerDeleted() { + return this.status === 409 && this.description === "consumer deleted"; + } + + isStreamDeleted(): boolean { + return this.status === 409 && this.description === "stream deleted"; + } + + isIdleHeartbeatMissed(): boolean { + return this.status === 409 && this.description === "idle heartbeats missed"; + } + + isMaxWaitingExceeded(): boolean { + return this.status === 409 && this.description === "exceeded maxwaiting"; + } + + isConsumerIsPushBased(): boolean { + return this.status === 409 && this.description === "consumer is push based"; + } +} + +export class JetStreamError extends Error { + constructor(message: string, opts?: ErrorOptions) { + super(message, opts); + this.name = "JetStreamError"; + } +} + +export enum JetStreamApiCodes { + ConsumerNotFound = 10014, + StreamNotFound = 10059, + JetStreamNotEnabledForAccount = 10039, +} + +export class JetStreamApiError extends Error { + #apiError: ApiError; + + constructor(jsErr: ApiError, opts?: ErrorOptions) { + super(jsErr.description, opts); + this.#apiError = jsErr; + this.name = "JetStreamApiError"; + } + + get code(): number { + return this.#apiError.err_code; + } + + get status(): number { + return this.#apiError.code; + } + + apiError(): ApiError { + return Object.assign({}, this.#apiError); + } +} diff --git a/jetstream/src/jsmsg.ts b/jetstream/src/jsmsg.ts index c16d06fc..64cd4b3d 100644 --- a/jetstream/src/jsmsg.ts +++ b/jetstream/src/jsmsg.ts @@ -77,7 +77,7 @@ export interface JsMsg { /** * Indicate to the JetStream server that processing of the message - * failed, and that it should be resent after the spefied number of + * failed, and that it should be resent after the specified number of * milliseconds. * @param millis */ @@ -243,7 +243,7 @@ export class JsMsgImpl implements JsMsg { const proto = mi.publisher as unknown as ProtocolHandler; const trace = !(proto.options?.noAsyncTraces || false); const r = new RequestOne(proto.muxSubscriptions, this.msg.reply, { - timeout: this.timeout, + timeout: opts.timeout, }, trace); proto.request(r); try { diff --git a/jetstream/tests/consume_test.ts b/jetstream/tests/consume_test.ts index 3104108f..0c3c4101 100644 --- a/jetstream/tests/consume_test.ts +++ b/jetstream/tests/consume_test.ts @@ -45,6 +45,7 @@ import { } from "../src/mod.ts"; import type { ConsumerStatus } from "../src/mod.ts"; +import { InvalidOperationError } from "../../core/src/errors.ts"; Deno.test("consumers - consume", async () => { const { ns, nc } = await _setup(connect, jetstreamServerConf()); @@ -93,8 +94,8 @@ Deno.test("consumers - consume callback rejects iter", async () => { // should fail } }, - Error, - "unsupported iterator", + InvalidOperationError, + "iterator cannot be used when a callback is registered", ); iter.stop(); diff --git a/jetstream/tests/consumers_ordered_test.ts b/jetstream/tests/consumers_ordered_test.ts index 025828af..352281ad 100644 --- a/jetstream/tests/consumers_ordered_test.ts +++ b/jetstream/tests/consumers_ordered_test.ts @@ -43,6 +43,7 @@ import type { import { StreamImpl } from "../src/jsmstream_api.ts"; import { delayUntilAssetNotFound } from "./util.ts"; import { flakyTest } from "../../test_helpers/mod.ts"; +import { JetStreamApiError } from "../src/jserrors.ts"; Deno.test("ordered consumers - get", async () => { const { ns, nc } = await _setup(connect, jetstreamServerConf()); @@ -1123,13 +1124,14 @@ Deno.test( // continue until the server says the consumer doesn't exist await delayUntilAssetNotFound(c); + await nc.flush(); // so should get that error once await assertRejects( () => { return c.next({ expires: 1000 }); }, - Error, + JetStreamApiError, "consumer not found", ); diff --git a/jetstream/tests/jetstream_test.ts b/jetstream/tests/jetstream_test.ts index 9a8c1f86..0a5579e3 100644 --- a/jetstream/tests/jetstream_test.ts +++ b/jetstream/tests/jetstream_test.ts @@ -26,7 +26,6 @@ import { } from "../src/mod.ts"; import type { Advisory } from "../src/mod.ts"; -import type { NatsError } from "@nats-io/nats-core/internal"; import { deferred, delay, @@ -53,6 +52,7 @@ import { notCompatible, } from "test_helpers"; import { PubHeaders } from "../src/jsapi_types.ts"; +import { JetStreamApiError } from "../src/jserrors.ts"; Deno.test("jetstream - default options", () => { const opts = defaultJsOptions(); @@ -747,19 +747,11 @@ Deno.test("jetstream - detailed errors", async () => { num_replicas: 3, subjects: ["foo"], }); - }) as NatsError; + }, JetStreamApiError); - assert(ne.api_error); - assertEquals( - ne.message, - "replicas > 1 not supported in non-clustered mode", - ); - assertEquals( - ne.api_error.description, - "replicas > 1 not supported in non-clustered mode", - ); - assertEquals(ne.api_error.code, 500); - assertEquals(ne.api_error.err_code, 10074); + assertEquals(ne.message, "replicas > 1 not supported in non-clustered mode"); + assertEquals(ne.code, 10074); + assertEquals(ne.status, 500); await cleanup(ns, nc); }); diff --git a/jetstream/tests/jsm_test.ts b/jetstream/tests/jsm_test.ts index a2ff776e..44b06f91 100644 --- a/jetstream/tests/jsm_test.ts +++ b/jetstream/tests/jsm_test.ts @@ -24,11 +24,10 @@ import { import type { NatsConnectionImpl } from "@nats-io/nats-core/internal"; import { Feature } from "@nats-io/nats-core/internal"; -import type { NatsConnection, NatsError } from "@nats-io/nats-core"; +import type { NatsConnection } from "@nats-io/nats-core"; import { deferred, Empty, - ErrorCode, headers, jwtAuthenticator, nanos, @@ -55,7 +54,6 @@ import { import { initStream } from "./jstest_util.ts"; import { _setup, - assertThrowsAsyncErrorCode, cleanup, connect, flakyTest, @@ -75,6 +73,7 @@ import type { ConsumerAPIImpl } from "../src/jsmconsumer_api.ts"; import { ConsumerApiAction, StoreCompression } from "../src/jsapi_types.ts"; import type { JetStreamManagerImpl } from "../src/jsclient.ts"; import { stripNatsMetadata } from "./util.ts"; +import { JetStreamApiError, JetStreamError } from "../src/jserrors.ts"; const StreamNameRequired = "stream name required"; const ConsumerNameRequired = "durable name required"; @@ -82,9 +81,13 @@ const ConsumerNameRequired = "durable name required"; Deno.test("jsm - jetstream not enabled", async () => { // start a regular server - no js conf const { ns, nc } = await _setup(connect); - await assertThrowsAsyncErrorCode(async () => { - await jetstreamManager(nc); - }, ErrorCode.JetStreamNotEnabled); + await assertRejects( + () => { + return jetstreamManager(nc); + }, + JetStreamError, + "jetstream is not enabled", + ); await cleanup(ns, nc); }); @@ -110,9 +113,13 @@ Deno.test("jsm - account not enabled", async () => { }, }; const { ns, nc } = await _setup(connect, jetstreamServerConf(conf)); - await assertThrowsAsyncErrorCode(async () => { - await jetstreamManager(nc); - }, ErrorCode.JetStreamNotEnabled); + await assertRejects( + () => { + return jetstreamManager(nc); + }, + JetStreamError, + "not enabled for account", + ); const a = await connect( { port: ns.port, user: "a", pass: "a" }, @@ -1016,25 +1023,19 @@ Deno.test( Deno.test("jsm - jetstream error info", async () => { const { ns, nc } = await _setup(connect, jetstreamServerConf({})); const jsm = await jetstreamManager(nc); - try { - await jsm.streams.add({ - name: "a", - num_replicas: 3, - subjects: ["a.>"], - }); - fail("should have failed"); - } catch (err) { - const ne = err as NatsError; - assert(ne.isJetStreamError()); - const jerr = ne.jsError(); - assert(jerr); - assertEquals(jerr.code, 500); - assertEquals(jerr.err_code, 10074); - assertEquals( - jerr.description, - "replicas > 1 not supported in non-clustered mode", - ); - } + await assertRejects( + () => { + return jsm.streams.add( + { + name: "a", + num_replicas: 3, + subjects: ["a.>"], + }, + ); + }, + JetStreamApiError, + "replicas > 1 not supported in non-clustered mode", + ); await cleanup(ns, nc); }); diff --git a/jetstream/tests/jsmsg_test.ts b/jetstream/tests/jsmsg_test.ts index ab7c7090..1544c2f8 100644 --- a/jetstream/tests/jsmsg_test.ts +++ b/jetstream/tests/jsmsg_test.ts @@ -21,8 +21,7 @@ import { } from "../src/mod.ts"; import { createInbox, Empty, nanos } from "@nats-io/nats-core"; -import type { Msg } from "@nats-io/nats-core"; -import type { MsgImpl } from "@nats-io/nats-core/internal"; +import type { Msg, MsgImpl } from "@nats-io/nats-core/internal"; import type { JsMsgImpl } from "../src/jsmsg.ts"; import { parseInfo, toJsMsg } from "../src/jsmsg.ts"; @@ -34,6 +33,7 @@ import { jetstreamServerConf, } from "test_helpers"; import type { JetStreamManagerImpl } from "../src/jsclient.ts"; +import { RequestError, TimeoutError } from "../../core/src/mod.ts"; Deno.test("jsmsg - parse", () => { // "$JS.ACK....." @@ -163,8 +163,8 @@ Deno.test("jsmsg - no ack consumer is ackAck 503", async () => { (): Promise => { return jm!.ackAck(); }, - Error, - "503", + RequestError, + "no responders", ); await cleanup(ns, nc); @@ -217,12 +217,11 @@ Deno.test("jsmsg - explicit consumer ackAck timeout", async () => { const start = Date.now(); await assertRejects( (): Promise => { - return jm!.ackAck({ timeout: 1500 }); + return jm!.ackAck({ timeout: 1000 }); }, - Error, - "TIMEOUT", + TimeoutError, ); - assertBetween(Date.now() - start, 1300, 1700); + assertBetween(Date.now() - start, 1000, 1500); await cleanup(ns, nc); }); @@ -252,8 +251,7 @@ Deno.test("jsmsg - ackAck js options timeout", async () => { (): Promise => { return jm!.ackAck(); }, - Error, - "TIMEOUT", + TimeoutError, ); assertBetween(Date.now() - start, 1300, 1700); @@ -285,8 +283,7 @@ Deno.test("jsmsg - ackAck legacy timeout", async () => { (): Promise => { return jm!.ackAck(); }, - Error, - "TIMEOUT", + TimeoutError, ); assertBetween(Date.now() - start, 1300, 1700); diff --git a/jetstream/tests/util.ts b/jetstream/tests/util.ts index e1c7a241..c6351c4c 100644 --- a/jetstream/tests/util.ts +++ b/jetstream/tests/util.ts @@ -1,7 +1,23 @@ +/* + * Copyright 2024 Synadia Communications, Inc + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + import { delay } from "@nats-io/nats-core"; import { fail } from "node:assert"; import type { Consumer, Stream } from "../src/types.ts"; import { StreamImpl } from "../src/jsmstream_api.ts"; +import { JetStreamApiCodes, JetStreamApiError } from "../src/jserrors.ts"; export function stripNatsMetadata(md?: Record) { if (md) { @@ -17,17 +33,27 @@ export async function delayUntilAssetNotFound( a: Consumer | Stream, ): Promise { const expected = a instanceof StreamImpl ? "stream" : "consumer"; - const m = `${expected} not found`; while (true) { try { await a.info(); await delay(20); } catch (err) { - if ((err as Error).message === m) { - break; - } else { - fail((err as Error).message); + if (err instanceof JetStreamApiError) { + const jserr = err as JetStreamApiError; + if ( + jserr.code === JetStreamApiCodes.ConsumerNotFound && + expected === "consumer" + ) { + break; + } + if ( + jserr.code === JetStreamApiCodes.StreamNotFound && + expected === "stream" + ) { + break; + } } + fail((err as Error).message); } } } diff --git a/services/src/service.ts b/services/src/service.ts index 3ddd1814..58251812 100644 --- a/services/src/service.ts +++ b/services/src/service.ts @@ -373,7 +373,7 @@ export class ServiceImpl implements Service { sv.queue = queue; const callback = handler - ? (err: NatsError | null, msg: Msg) => { + ? (err: Error | null, msg: Msg) => { if (err) { this.close(err); return; diff --git a/services/tests/service_test.ts b/services/tests/service_test.ts index 6b490a1a..ca151276 100644 --- a/services/tests/service_test.ts +++ b/services/tests/service_test.ts @@ -19,6 +19,7 @@ import { assertArrayIncludes, assertEquals, assertExists, + assertInstanceOf, assertRejects, assertThrows, fail, @@ -28,14 +29,14 @@ import { collect, createInbox, delay, - ErrorCode, + NoRespondersError, nuid, + PermissionViolationError, } from "@nats-io/nats-core/internal"; import type { Msg, NatsConnection, NatsConnectionImpl, - NatsError, QueuedIterator, SubscriptionImpl, } from "@nats-io/nats-core/internal"; @@ -230,8 +231,7 @@ Deno.test("service - basics", async () => { async () => { await collect(await m.ping("test", "c")); }, - Error, - ErrorCode.NoResponders, + NoRespondersError, ); assertEquals(await count(m.info()), 2); @@ -241,8 +241,7 @@ Deno.test("service - basics", async () => { async () => { await collect(await m.info("test", "c")); }, - Error, - ErrorCode.NoResponders, + NoRespondersError, ); assertEquals(await count(m.stats()), 2); @@ -252,8 +251,7 @@ Deno.test("service - basics", async () => { async () => { await collect(await m.stats("test", "c")); }, - Error, - ErrorCode.NoResponders, + NoRespondersError, ); await srvA.stop(); @@ -291,11 +289,8 @@ Deno.test("service - stop error", async () => { fail("shouldn't have subscribed"); }); - const err = await service.stopped as NatsError; - assertEquals( - err.code, - ErrorCode.PermissionsViolation, - ); + const err = await service.stopped as Error; + assertInstanceOf(err, PermissionViolationError); await cleanup(ns, nc); }); @@ -325,11 +320,8 @@ Deno.test("service - start error", async () => { msg?.respond(); }); - const err = await service.stopped as NatsError; - assertEquals( - err.code, - ErrorCode.PermissionsViolation, - ); + const err = await service.stopped as Error; + assertInstanceOf(err, PermissionViolationError); await cleanup(ns, nc); }); diff --git a/test_helpers/asserts.ts b/test_helpers/asserts.ts index d1e5e3fa..df6d2206 100644 --- a/test_helpers/asserts.ts +++ b/test_helpers/asserts.ts @@ -13,49 +13,9 @@ * limitations under the License. */ -import { assert, assertThrows, fail } from "jsr:@std/assert"; -import type { NatsError } from "../core/src/mod.ts"; -import { isNatsError } from "../core/src/internal_mod.ts"; - -export function assertErrorCode(err?: Error, ...codes: string[]) { - if (!err) { - fail(`expected an error to be thrown`); - } - if (isNatsError(err)) { - const { code } = err as NatsError; - assert(code); - const ok = codes.find((c) => { - return code.indexOf(c) !== -1; - }); - if (ok === "") { - fail(`got ${code} - expected any of [${codes.join(", ")}]`); - } - } else { - fail(`didn't get a nats error - got: ${err.message}`); - } -} - -export function assertThrowsErrorCode( - fn: () => T, - ...codes: string[] -) { - const err = assertThrows(fn); - assertErrorCode(err as Error, ...codes); -} - -export async function assertThrowsAsyncErrorCode( - fn: () => Promise, - ...codes: string[] -) { - try { - await fn(); - fail("expected to throw"); - } catch (err) { - assertErrorCode(err as Error, ...codes); - } -} +import { assertGreaterOrEqual, assertLessOrEqual } from "jsr:@std/assert"; export function assertBetween(n: number, low: number, high: number) { - console.assert(n >= low, `${n} >= ${low}`); - console.assert(n <= high, `${n} <= ${low}`); + assertGreaterOrEqual(n, low, `${n} >= ${low}`) + assertLessOrEqual(n, high, `${n} <= ${high}`) } diff --git a/test_helpers/mod.ts b/test_helpers/mod.ts index 142148d0..2b73040f 100644 --- a/test_helpers/mod.ts +++ b/test_helpers/mod.ts @@ -29,9 +29,6 @@ export { Lock } from "./lock.ts"; export { Connection, TestServer } from "./test_server.ts"; export { assertBetween, - assertErrorCode, - assertThrowsAsyncErrorCode, - assertThrowsErrorCode, } from "./asserts.ts"; export { NatsServer, ServerSignals } from "./launcher.ts"; diff --git a/transport-deno/src/deno_transport.ts b/transport-deno/src/deno_transport.ts index 9330699d..83874e4b 100644 --- a/transport-deno/src/deno_transport.ts +++ b/transport-deno/src/deno_transport.ts @@ -19,10 +19,8 @@ import { DataBuffer, deferred, Empty, - ErrorCode, extractProtocolMessage, INFO, - NatsError, render, } from "@nats-io/nats-core/internal"; @@ -36,6 +34,7 @@ import type { import { writeAll } from "@std/io"; import { version } from "./version.ts"; +import { ConnectionError } from "../../core/src/errors.ts"; const VERSION = version; const LANG = "nats.deno"; @@ -99,10 +98,10 @@ export class DenoTransport implements Transport { this.conn.close(); } } catch (err) { - this.conn?.close(); - throw (err as NatsError)?.name === "ConnectionRefused" - ? NatsError.errorForCode(ErrorCode.ConnectionRefused) - : err; + const m = (err as Error).name === "ConnectionRefused" + ? "connection refused" + : (err as Error).message; + throw new ConnectionError(m, { cause: err }); } } diff --git a/transport-node/tests/basics_test.js b/transport-node/tests/basics_test.js index f1b751a8..d2a7c02e 100644 --- a/transport-node/tests/basics_test.js +++ b/transport-node/tests/basics_test.js @@ -16,6 +16,7 @@ const { describe, it } = require("node:test"); const assert = require("node:assert").strict; const { connect, + ConnectionError, ErrorCode, createInbox, } = require( @@ -79,7 +80,8 @@ describe( assert.fail("should have not connected"); }) .catch((err) => { - assert.equal(err.code, ErrorCode.ConnectionRefused); + assert(err instanceof ConnectionError); + assert.equal(err.message, "connection refused"); }); }); @@ -137,7 +139,8 @@ describe( const closed = nc.closed(); await ns.stop(); const err = await closed; - assert.equal(err?.code, ErrorCode.ConnectionRefused); + assert(err instanceof ClosedConnectionError); + assert.equal(err.message, "connection refused"); }); it("basics - server error", async () => { diff --git a/transport-node/tests/reconnect_test.js b/transport-node/tests/reconnect_test.js index 816d63f7..d4ecce4e 100644 --- a/transport-node/tests/reconnect_test.js +++ b/transport-node/tests/reconnect_test.js @@ -21,7 +21,6 @@ const { NatsServer } = require("./helpers/launcher"); const { createInbox, Events, - ErrorCode, deferred, DebugEvents, } = require("@nats-io/nats-core/internal"); @@ -81,7 +80,7 @@ describe( try { await nc.closed(); } catch (err) { - assert.equal(err.code, ErrorCode.ConnectionRefused); + assert.equal(err.message, "connection refused"); } assert.equal(disconnects, 1, "disconnects"); assert.equal(reconnecting, 10, "reconnecting"); @@ -124,7 +123,7 @@ describe( nc.protocol.servers.getCurrentServer().lastConnect; const dt = deferred(); - const _ = (async () => { + (async () => { for await (const e of nc.status()) { switch (e.type) { case DebugEvents.Reconnecting: