Skip to content

Commit

Permalink
feat: remove MongoAbortError
Browse files Browse the repository at this point in the history
  • Loading branch information
nbbeeken committed Jan 10, 2025
1 parent f8c802e commit 5de94ee
Show file tree
Hide file tree
Showing 10 changed files with 210 additions and 202 deletions.
4 changes: 2 additions & 2 deletions src/client-side-encryption/state_machine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -428,9 +428,9 @@ export class StateMachine {
resolve
} = promiseWithResolvers<void>();

abortListener = addAbortListener(options?.signal, error => {
abortListener = addAbortListener(options?.signal, function () {
destroySockets();
rejectOnTlsSocketError(error);
rejectOnTlsSocketError(this.reason);
});

socket
Expand Down
4 changes: 2 additions & 2 deletions src/cmap/connection_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -342,9 +342,9 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
checkoutTime
};

const abortListener = addAbortListener(options.signal, error => {
const abortListener = addAbortListener(options.signal, function () {
waitQueueMember.cancelled = true;
reject(error);
reject(this.reason);
});

this.waitQueue.push(waitQueueMember);
Expand Down
4 changes: 3 additions & 1 deletion src/cmap/wire_protocol/on_data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,9 @@ export function onData(
// Adding event handlers
emitter.on('data', eventHandler);
emitter.on('error', errorHandler);
const abortListener = addAbortListener(signal, errorHandler);
const abortListener = addAbortListener(signal, function () {
errorHandler(this.reason);
});

const timeoutForSocketRead = timeoutContext?.timeoutForSocketRead;
timeoutForSocketRead?.throwIfExpired();
Expand Down
17 changes: 8 additions & 9 deletions src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ import {
type Disposable,
kDispose,
type MongoDBNamespace,
squashError,
throwIfAborted
squashError
} from '../utils';

/**
Expand Down Expand Up @@ -467,7 +466,7 @@ export abstract class AbstractCursor<
}

async *[Symbol.asyncIterator](): AsyncGenerator<TSchema, void, void> {
throwIfAborted(this.signal);
this.signal?.throwIfAborted();

if (this.closed) {
return;
Expand Down Expand Up @@ -495,7 +494,7 @@ export abstract class AbstractCursor<
}

yield document;
throwIfAborted(this.signal);
this.signal?.throwIfAborted();
}
} finally {
// Only close the cursor if it has not already been closed. This finally clause handles
Expand Down Expand Up @@ -541,7 +540,7 @@ export abstract class AbstractCursor<
}

async hasNext(): Promise<boolean> {
throwIfAborted(this.signal);
this.signal?.throwIfAborted();

if (this.cursorId === Long.ZERO) {
return false;
Expand All @@ -568,7 +567,7 @@ export abstract class AbstractCursor<

/** Get the next available document from the cursor, returns null if no more documents are available. */
async next(): Promise<TSchema | null> {
throwIfAborted(this.signal);
this.signal?.throwIfAborted();

if (this.cursorId === Long.ZERO) {
throw new MongoCursorExhaustedError();
Expand Down Expand Up @@ -600,7 +599,7 @@ export abstract class AbstractCursor<
* Try to get the next available document from the cursor or `null` if an empty batch is returned
*/
async tryNext(): Promise<TSchema | null> {
throwIfAborted(this.signal);
this.signal?.throwIfAborted();

if (this.cursorId === Long.ZERO) {
throw new MongoCursorExhaustedError();
Expand Down Expand Up @@ -641,7 +640,7 @@ export abstract class AbstractCursor<
* @deprecated - Will be removed in a future release. Use for await...of instead.
*/
async forEach(iterator: (doc: TSchema) => boolean | void): Promise<void> {
throwIfAborted(this.signal);
this.signal?.throwIfAborted();

if (typeof iterator !== 'function') {
throw new MongoInvalidArgumentError('Argument "iterator" must be a function');
Expand All @@ -668,7 +667,7 @@ export abstract class AbstractCursor<
* cursor.rewind() can be used to reset the cursor.
*/
async toArray(): Promise<TSchema[]> {
throwIfAborted(this.signal);
this.signal?.throwIfAborted();

const array: TSchema[] = [];
// at the end of the loop (since readBufferedDocuments is called) the buffer will be empty
Expand Down
28 changes: 0 additions & 28 deletions src/error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -196,34 +196,6 @@ export class MongoError extends Error {
}
}

/**
* An error thrown when a signal is aborted
*
* A MongoAbortError has the name "AbortError" to match the name
* given to a DOMException thrown from web APIs that support AbortSignals.
*
* @example
* ```js
* try {
* const res = await fetch('...', { signal });
* await collection.insertOne(await res.json(), { signal });
* catch (error) {
* if (error.name === 'AbortError') {
* // error is MongoAbortError or DOMException,
* // both represent the signal being aborted
* }
* }
* ```
*
* @public
* @category Error
*/
export class MongoAbortError extends MongoError {
override get name(): 'AbortError' {
return 'AbortError';
}
}

/**
* An error coming from the mongo server
*
Expand Down
1 change: 0 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ export {
export { ClientEncryption } from './client-side-encryption/client_encryption';
export { ChangeStreamCursor } from './cursor/change_stream_cursor';
export {
MongoAbortError,
MongoAPIError,
MongoAWSError,
MongoAzureError,
Expand Down
4 changes: 2 additions & 2 deletions src/sdam/topology.ts
Original file line number Diff line number Diff line change
Expand Up @@ -604,9 +604,9 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
previousServer: options.previousServer
};

const abortListener = addAbortListener(options.signal, error => {
const abortListener = addAbortListener(options.signal, function () {
waitQueueMember.cancelled = true;
reject(error);
reject(this.reason);
});

this.waitQueue.push(waitQueueMember);
Expand Down
22 changes: 6 additions & 16 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import type { FindCursor } from './cursor/find_cursor';
import type { Db } from './db';
import {
type AnyError,
MongoAbortError,
MongoAPIError,
MongoCompatibilityError,
MongoInvalidArgumentError,
Expand Down Expand Up @@ -1355,7 +1354,9 @@ export async function once<T>(ee: EventEmitter, name: string, options?: Abortabl
const { promise, resolve, reject } = promiseWithResolvers<T>();
const onEvent = (data: T) => resolve(data);
const onError = (error: Error) => reject(error);
const abortListener = addAbortListener(options?.signal, reject);
const abortListener = addAbortListener(options?.signal, function () {
reject(this.reason);
});

ee.once(name, onEvent).once('error', onError);

Expand Down Expand Up @@ -1479,20 +1480,9 @@ export interface Disposable {

export function addAbortListener(
signal: AbortSignal | undefined | null,
listener: (event: Error) => void
listener: (this: AbortSignal, event: Event) => void
): Disposable | undefined {
if (signal == null) return;

const convertReasonToError = () =>
listener(new MongoAbortError('Operation was aborted', { cause: signal.reason }));

signal.addEventListener('abort', convertReasonToError);

return { [kDispose]: () => signal.removeEventListener('abort', convertReasonToError) };
}

export function throwIfAborted(signal?: { aborted?: boolean; reason?: any }): void {
if (signal?.aborted) {
throw new MongoAbortError('Operation was aborted', { cause: signal.reason });
}
signal.addEventListener('abort', listener);
return { [kDispose]: () => signal.removeEventListener('abort', listener) };
}
Loading

0 comments on commit 5de94ee

Please sign in to comment.