From edfa0710ea974ff009fde2fb95cba617615dd3a9 Mon Sep 17 00:00:00 2001 From: Warren James Date: Fri, 21 Jun 2024 12:06:30 -0400 Subject: [PATCH] refactor(NODE-6187): refactor to use TimeoutContext abstraction (#4131) --- src/bulk/common.ts | 4 + src/cmap/connection.ts | 4 +- src/cmap/connection_pool.ts | 39 +--- src/index.ts | 18 +- src/operations/aggregate.ts | 5 +- src/operations/bulk_write.ts | 11 +- src/operations/command.ts | 8 +- src/operations/count.ts | 9 +- src/operations/create_collection.ts | 18 +- src/operations/delete.ts | 21 +- src/operations/distinct.ts | 9 +- src/operations/drop.ts | 24 ++- src/operations/estimated_document_count.ts | 9 +- src/operations/execute_operation.ts | 16 +- src/operations/find.ts | 6 +- src/operations/find_and_modify.ts | 9 +- src/operations/get_more.ts | 5 +- src/operations/indexes.ts | 22 +- src/operations/insert.ts | 19 +- src/operations/kill_cursors.ts | 12 +- src/operations/list_collections.ts | 5 +- src/operations/list_databases.ts | 11 +- src/operations/operation.ts | 10 +- src/operations/profiling_level.ts | 9 +- src/operations/remove_user.ts | 9 +- src/operations/rename.ts | 9 +- src/operations/run_command.ts | 17 +- src/operations/search_indexes/create.ts | 12 +- src/operations/search_indexes/drop.ts | 9 +- src/operations/search_indexes/update.ts | 9 +- src/operations/set_profiling_level.ts | 6 +- src/operations/stats.ts | 9 +- src/operations/update.ts | 24 ++- src/operations/validate_collection.ts | 9 +- src/sdam/server.ts | 12 +- src/sdam/topology.ts | 55 +++-- src/timeout.ts | 166 +++++++++++++- ...lient_side_operations_timeout.unit.test.ts | 12 +- .../node_csot.test.ts | 2 +- test/tools/cmap_spec_runner.ts | 12 +- test/unit/cmap/connection_pool.test.js | 22 +- test/unit/error.test.ts | 19 +- test/unit/operations/get_more.test.ts | 2 +- test/unit/sdam/topology.test.ts | 76 +++++-- test/unit/timeout.test.ts | 204 +++++++++++++++++- 45 files changed, 796 insertions(+), 202 deletions(-) diff --git a/src/bulk/common.ts b/src/bulk/common.ts index a62d62a4a5c..dc0bcfb513f 100644 --- a/src/bulk/common.ts +++ b/src/bulk/common.ts @@ -19,6 +19,7 @@ import { makeUpdateStatement, UpdateOperation, type UpdateStatement } from '../o import type { Server } from '../sdam/server'; import type { Topology } from '../sdam/topology'; import type { ClientSession } from '../sessions'; +import { type TimeoutContext } from '../timeout'; import { applyRetryableWrites, getTopology, @@ -842,6 +843,9 @@ export interface BulkWriteOptions extends CommandOperationOptions { forceServerObjectId?: boolean; /** Map of parameter names and values that can be accessed using $$var (requires MongoDB 5.0). */ let?: Document; + + /** @internal */ + timeoutContext?: TimeoutContext; } /** diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index 381aeb3f64e..b954ae508e6 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -35,7 +35,7 @@ import { type CancellationToken, TypedEventEmitter } from '../mongo_types'; import { ReadPreference, type ReadPreferenceLike } from '../read_preference'; import { ServerType } from '../sdam/common'; import { applySession, type ClientSession, updateSessionFromResponse } from '../sessions'; -import { type Timeout } from '../timeout'; +import { type TimeoutContext } from '../timeout'; import { BufferPool, calculateDurationInMs, @@ -102,7 +102,7 @@ export interface CommandOptions extends BSONSerializeOptions { directConnection?: boolean; /** @internal */ - timeout?: Timeout; + timeoutContext?: TimeoutContext; } /** @public */ diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index 79440db1e06..5369cc155aa 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -27,8 +27,8 @@ import { } from '../error'; import { CancellationToken, TypedEventEmitter } from '../mongo_types'; import type { Server } from '../sdam/server'; -import { Timeout, TimeoutError } from '../timeout'; -import { type Callback, csotMin, List, makeCounter, promiseWithResolvers } from '../utils'; +import { type TimeoutContext, TimeoutError } from '../timeout'; +import { type Callback, List, makeCounter, promiseWithResolvers } from '../utils'; import { connect } from './connect'; import { Connection, type ConnectionEvents, type ConnectionOptions } from './connection'; import { @@ -355,41 +355,15 @@ export class ConnectionPool extends TypedEventEmitter { * will be held by the pool. This means that if a connection is checked out it MUST be checked back in or * explicitly destroyed by the new owner. */ - async checkOut(options?: { timeout?: Timeout }): Promise { + async checkOut(options: { timeoutContext: TimeoutContext }): Promise { this.emitAndLog( ConnectionPool.CONNECTION_CHECK_OUT_STARTED, new ConnectionCheckOutStartedEvent(this) ); - const waitQueueTimeoutMS = this.options.waitQueueTimeoutMS; - const serverSelectionTimeoutMS = this[kServer].topology.s.serverSelectionTimeoutMS; - const { promise, resolve, reject } = promiseWithResolvers(); - let timeout: Timeout | null = null; - if (options?.timeout) { - // CSOT enabled - // Determine if we're using the timeout passed in or a new timeout - if (options.timeout.duration > 0 || serverSelectionTimeoutMS > 0) { - // This check determines whether or not Topology.selectServer used the configured - // `timeoutMS` or `serverSelectionTimeoutMS` value for its timeout - if ( - options.timeout.duration === serverSelectionTimeoutMS || - csotMin(options.timeout.duration, serverSelectionTimeoutMS) < serverSelectionTimeoutMS - ) { - // server selection used `timeoutMS`, so we should use the existing timeout as the timeout - // here - timeout = options.timeout; - } else { - // server selection used `serverSelectionTimeoutMS`, so we construct a new timeout with - // the time remaining to ensure that Topology.selectServer and ConnectionPool.checkOut - // cumulatively don't spend more than `serverSelectionTimeoutMS` blocking - timeout = Timeout.expires(serverSelectionTimeoutMS - options.timeout.timeElapsed); - } - } - } else { - timeout = Timeout.expires(waitQueueTimeoutMS); - } + const timeout = options.timeoutContext.connectionCheckoutTimeout; const waitQueueMember: WaitQueueMember = { resolve, @@ -404,6 +378,7 @@ export class ConnectionPool extends TypedEventEmitter { return await (timeout ? Promise.race([promise, timeout]) : promise); } catch (error) { if (TimeoutError.is(error)) { + timeout?.clear(); waitQueueMember[kCancelled] = true; this.emitAndLog( @@ -416,7 +391,7 @@ export class ConnectionPool extends TypedEventEmitter { : 'Timed out while checking out a connection from connection pool', this.address ); - if (options?.timeout) { + if (options.timeoutContext.csotEnabled()) { throw new MongoOperationTimeoutError('Timed out during connection checkout', { cause: timeoutError }); @@ -425,7 +400,7 @@ export class ConnectionPool extends TypedEventEmitter { } throw error; } finally { - if (timeout !== options?.timeout) timeout?.clear(); + if (options.timeoutContext.clearConnectionCheckoutTimeout) timeout?.clear(); } } diff --git a/src/index.ts b/src/index.ts index be3756110de..c335f0fe846 100644 --- a/src/index.ts +++ b/src/index.ts @@ -567,7 +567,13 @@ export type { RTTSampler, ServerMonitoringMode } from './sdam/monitor'; -export type { Server, ServerEvents, ServerOptions, ServerPrivate } from './sdam/server'; +export type { + Server, + ServerCommandOptions, + ServerEvents, + ServerOptions, + ServerPrivate +} from './sdam/server'; export type { ServerDescription, ServerDescriptionOptions, @@ -598,7 +604,15 @@ export type { WithTransactionCallback } from './sessions'; export type { Sort, SortDirection, SortDirectionForCmd, SortForCmd } from './sort'; -export type { Timeout } from './timeout'; +export type { + CSOTTimeoutContext, + CSOTTimeoutContextOptions, + LegacyTimeoutContext, + LegacyTimeoutContextOptions, + Timeout, + TimeoutContext, + TimeoutContextOptions +} from './timeout'; export type { Transaction, TransactionOptions, TxnState } from './transactions'; export type { BufferPool, diff --git a/src/operations/aggregate.ts b/src/operations/aggregate.ts index 7b67fd0422d..f1721ba41cd 100644 --- a/src/operations/aggregate.ts +++ b/src/operations/aggregate.ts @@ -4,6 +4,7 @@ import { MongoInvalidArgumentError } from '../error'; import { type ExplainOptions } from '../explain'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; +import { type TimeoutContext } from '../timeout'; import { maxWireVersion, type MongoDBNamespace } from '../utils'; import { WriteConcern } from '../write_concern'; import { type CollationOptions, CommandOperation, type CommandOperationOptions } from './command'; @@ -105,7 +106,8 @@ export class AggregateOperation extends CommandOperation { override async execute( server: Server, - session: ClientSession | undefined + session: ClientSession | undefined, + timeoutContext: TimeoutContext ): Promise { const options: AggregateOptions = this.options; const serverWireVersion = maxWireVersion(server); @@ -150,6 +152,7 @@ export class AggregateOperation extends CommandOperation { server, session, command, + timeoutContext, this.explain ? ExplainedCursorResponse : CursorResponse ); } diff --git a/src/operations/bulk_write.ts b/src/operations/bulk_write.ts index 0a855644f06..55b61ef73b0 100644 --- a/src/operations/bulk_write.ts +++ b/src/operations/bulk_write.ts @@ -7,6 +7,7 @@ import type { import type { Collection } from '../collection'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; +import { type TimeoutContext } from '../timeout'; import { AbstractOperation, Aspect, defineAspects } from './operation'; /** @internal */ @@ -32,11 +33,17 @@ export class BulkWriteOperation extends AbstractOperation { override async execute( server: Server, - session: ClientSession | undefined + session: ClientSession | undefined, + timeoutContext: TimeoutContext ): Promise { const coll = this.collection; const operations = this.operations; - const options = { ...this.options, ...this.bsonOptions, readPreference: this.readPreference }; + const options = { + ...this.options, + ...this.bsonOptions, + readPreference: this.readPreference, + timeoutContext + }; // Create the bulk operation const bulk: BulkOperationBase = diff --git a/src/operations/command.ts b/src/operations/command.ts index c64b4ae963a..5bd80f796d1 100644 --- a/src/operations/command.ts +++ b/src/operations/command.ts @@ -7,6 +7,7 @@ import type { ReadPreference } from '../read_preference'; import type { Server } from '../sdam/server'; import { MIN_SECONDARY_WRITE_WIRE_VERSION } from '../sdam/server_selection'; import type { ClientSession } from '../sessions'; +import { type TimeoutContext } from '../timeout'; import { commandSupportsReadConcern, decorateWithExplain, @@ -112,19 +113,22 @@ export abstract class CommandOperation extends AbstractOperation { server: Server, session: ClientSession | undefined, cmd: Document, + timeoutContext: TimeoutContext, responseType: T | undefined ): Promise>; public async executeCommand( server: Server, session: ClientSession | undefined, - cmd: Document + cmd: Document, + timeoutContext: TimeoutContext ): Promise; async executeCommand( server: Server, session: ClientSession | undefined, cmd: Document, + timeoutContext: TimeoutContext, responseType?: MongoDBResponseConstructor ): Promise { this.server = server; @@ -132,7 +136,7 @@ export abstract class CommandOperation extends AbstractOperation { const options = { ...this.options, ...this.bsonOptions, - timeout: this.timeout, + timeoutContext, readPreference: this.readPreference, session }; diff --git a/src/operations/count.ts b/src/operations/count.ts index 00aae501728..82330a11e76 100644 --- a/src/operations/count.ts +++ b/src/operations/count.ts @@ -2,6 +2,7 @@ import type { Document } from '../bson'; import type { Collection } from '../collection'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; +import { type TimeoutContext } from '../timeout'; import type { MongoDBNamespace } from '../utils'; import { CommandOperation, type CommandOperationOptions } from './command'; import { Aspect, defineAspects } from './operation'; @@ -36,7 +37,11 @@ export class CountOperation extends CommandOperation { return 'count' as const; } - override async execute(server: Server, session: ClientSession | undefined): Promise { + override async execute( + server: Server, + session: ClientSession | undefined, + timeoutContext: TimeoutContext + ): Promise { const options = this.options; const cmd: Document = { count: this.collectionName, @@ -59,7 +64,7 @@ export class CountOperation extends CommandOperation { cmd.maxTimeMS = options.maxTimeMS; } - const result = await super.executeCommand(server, session, cmd); + const result = await super.executeCommand(server, session, cmd, timeoutContext); return result ? result.n : 0; } } diff --git a/src/operations/create_collection.ts b/src/operations/create_collection.ts index 8edc7e9a1c4..afb2680b9a0 100644 --- a/src/operations/create_collection.ts +++ b/src/operations/create_collection.ts @@ -9,6 +9,7 @@ import { MongoCompatibilityError } from '../error'; import type { PkFactory } from '../mongo_client'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; +import { type TimeoutContext } from '../timeout'; import { CommandOperation, type CommandOperationOptions } from './command'; import { CreateIndexesOperation } from './indexes'; import { Aspect, defineAspects } from './operation'; @@ -124,7 +125,11 @@ export class CreateCollectionOperation extends CommandOperation { return 'create' as const; } - override async execute(server: Server, session: ClientSession | undefined): Promise { + override async execute( + server: Server, + session: ClientSession | undefined, + timeoutContext: TimeoutContext + ): Promise { const db = this.db; const name = this.name; const options = this.options; @@ -155,7 +160,7 @@ export class CreateCollectionOperation extends CommandOperation { unique: true } }); - await createOp.executeWithoutEncryptedFieldsCheck(server, session); + await createOp.executeWithoutEncryptedFieldsCheck(server, session, timeoutContext); } if (!options.encryptedFields) { @@ -163,7 +168,7 @@ export class CreateCollectionOperation extends CommandOperation { } } - const coll = await this.executeWithoutEncryptedFieldsCheck(server, session); + const coll = await this.executeWithoutEncryptedFieldsCheck(server, session, timeoutContext); if (encryptedFields) { // Create the required index for queryable encryption support. @@ -173,7 +178,7 @@ export class CreateCollectionOperation extends CommandOperation { { __safeContent__: 1 }, {} ); - await createIndexOp.execute(server, session); + await createIndexOp.execute(server, session, timeoutContext); } return coll; @@ -181,7 +186,8 @@ export class CreateCollectionOperation extends CommandOperation { private async executeWithoutEncryptedFieldsCheck( server: Server, - session: ClientSession | undefined + session: ClientSession | undefined, + timeoutContext: TimeoutContext ): Promise { const db = this.db; const name = this.name; @@ -198,7 +204,7 @@ export class CreateCollectionOperation extends CommandOperation { } } // otherwise just execute the command - await super.executeCommand(server, session, cmd); + await super.executeCommand(server, session, cmd, timeoutContext); return new Collection(db, name, options); } } diff --git a/src/operations/delete.ts b/src/operations/delete.ts index f0ef61cb7b1..0e93ead36a2 100644 --- a/src/operations/delete.ts +++ b/src/operations/delete.ts @@ -4,6 +4,7 @@ import { MongoCompatibilityError, MongoServerError } from '../error'; import { type TODO_NODE_3286 } from '../mongo_types'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; +import { type TimeoutContext } from '../timeout'; import { type MongoDBNamespace } from '../utils'; import { type WriteConcernOptions } from '../write_concern'; import { type CollationOptions, CommandOperation, type CommandOperationOptions } from './command'; @@ -67,7 +68,8 @@ export class DeleteOperation extends CommandOperation { override async execute( server: Server, - session: ClientSession | undefined + session: ClientSession | undefined, + timeoutContext: TimeoutContext ): Promise { const options = this.options ?? {}; const ordered = typeof options.ordered === 'boolean' ? options.ordered : true; @@ -95,7 +97,12 @@ export class DeleteOperation extends CommandOperation { } } - const res: TODO_NODE_3286 = await super.executeCommand(server, session, command); + const res: TODO_NODE_3286 = await super.executeCommand( + server, + session, + command, + timeoutContext + ); return res; } } @@ -107,9 +114,10 @@ export class DeleteOneOperation extends DeleteOperation { override async execute( server: Server, - session: ClientSession | undefined + session: ClientSession | undefined, + timeoutContext: TimeoutContext ): Promise { - const res: TODO_NODE_3286 = await super.execute(server, session); + const res: TODO_NODE_3286 = await super.execute(server, session, timeoutContext); if (this.explain) return res; if (res.code) throw new MongoServerError(res); if (res.writeErrors) throw new MongoServerError(res.writeErrors[0]); @@ -127,9 +135,10 @@ export class DeleteManyOperation extends DeleteOperation { override async execute( server: Server, - session: ClientSession | undefined + session: ClientSession | undefined, + timeoutContext: TimeoutContext ): Promise { - const res: TODO_NODE_3286 = await super.execute(server, session); + const res: TODO_NODE_3286 = await super.execute(server, session, timeoutContext); if (this.explain) return res; if (res.code) throw new MongoServerError(res); if (res.writeErrors) throw new MongoServerError(res.writeErrors[0]); diff --git a/src/operations/distinct.ts b/src/operations/distinct.ts index 4fda285d880..51f2a362d8c 100644 --- a/src/operations/distinct.ts +++ b/src/operations/distinct.ts @@ -2,6 +2,7 @@ import type { Document } from '../bson'; import type { Collection } from '../collection'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; +import { type TimeoutContext } from '../timeout'; import { decorateWithCollation, decorateWithReadConcern } from '../utils'; import { CommandOperation, type CommandOperationOptions } from './command'; import { Aspect, defineAspects } from './operation'; @@ -42,7 +43,11 @@ export class DistinctOperation extends CommandOperation { return 'distinct' as const; } - override async execute(server: Server, session: ClientSession | undefined): Promise { + override async execute( + server: Server, + session: ClientSession | undefined, + timeoutContext: TimeoutContext + ): Promise { const coll = this.collection; const key = this.key; const query = this.query; @@ -72,7 +77,7 @@ export class DistinctOperation extends CommandOperation { // Have we specified collation decorateWithCollation(cmd, coll, options); - const result = await super.executeCommand(server, session, cmd); + const result = await super.executeCommand(server, session, cmd, timeoutContext); return this.explain ? result : result.values; } diff --git a/src/operations/drop.ts b/src/operations/drop.ts index 15624d4c07b..787bb6e7d0f 100644 --- a/src/operations/drop.ts +++ b/src/operations/drop.ts @@ -3,6 +3,7 @@ import type { Db } from '../db'; import { MONGODB_ERROR_CODES, MongoServerError } from '../error'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; +import { type TimeoutContext } from '../timeout'; import { CommandOperation, type CommandOperationOptions } from './command'; import { Aspect, defineAspects } from './operation'; @@ -29,7 +30,11 @@ export class DropCollectionOperation extends CommandOperation { return 'drop' as const; } - override async execute(server: Server, session: ClientSession | undefined): Promise { + override async execute( + server: Server, + session: ClientSession | undefined, + timeoutContext: TimeoutContext + ): Promise { const db = this.db; const options = this.options; const name = this.name; @@ -57,7 +62,7 @@ export class DropCollectionOperation extends CommandOperation { // Drop auxilliary collections, ignoring potential NamespaceNotFound errors. const dropOp = new DropCollectionOperation(db, collectionName); try { - await dropOp.executeWithoutEncryptedFieldsCheck(server, session); + await dropOp.executeWithoutEncryptedFieldsCheck(server, session, timeoutContext); } catch (err) { if ( !(err instanceof MongoServerError) || @@ -69,14 +74,15 @@ export class DropCollectionOperation extends CommandOperation { } } - return await this.executeWithoutEncryptedFieldsCheck(server, session); + return await this.executeWithoutEncryptedFieldsCheck(server, session, timeoutContext); } private async executeWithoutEncryptedFieldsCheck( server: Server, - session: ClientSession | undefined + session: ClientSession | undefined, + timeoutContext: TimeoutContext ): Promise { - await super.executeCommand(server, session, { drop: this.name }); + await super.executeCommand(server, session, { drop: this.name }, timeoutContext); return true; } } @@ -96,8 +102,12 @@ export class DropDatabaseOperation extends CommandOperation { return 'dropDatabase' as const; } - override async execute(server: Server, session: ClientSession | undefined): Promise { - await super.executeCommand(server, session, { dropDatabase: 1 }); + override async execute( + server: Server, + session: ClientSession | undefined, + timeoutContext: TimeoutContext + ): Promise { + await super.executeCommand(server, session, { dropDatabase: 1 }, timeoutContext); return true; } } diff --git a/src/operations/estimated_document_count.ts b/src/operations/estimated_document_count.ts index c1d6c381998..5ab5aa4c305 100644 --- a/src/operations/estimated_document_count.ts +++ b/src/operations/estimated_document_count.ts @@ -2,6 +2,7 @@ import type { Document } from '../bson'; import type { Collection } from '../collection'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; +import { type TimeoutContext } from '../timeout'; import { CommandOperation, type CommandOperationOptions } from './command'; import { Aspect, defineAspects } from './operation'; @@ -30,7 +31,11 @@ export class EstimatedDocumentCountOperation extends CommandOperation { return 'count' as const; } - override async execute(server: Server, session: ClientSession | undefined): Promise { + override async execute( + server: Server, + session: ClientSession | undefined, + timeoutContext: TimeoutContext + ): Promise { const cmd: Document = { count: this.collectionName }; if (typeof this.options.maxTimeMS === 'number') { @@ -43,7 +48,7 @@ export class EstimatedDocumentCountOperation extends CommandOperation { cmd.comment = this.options.comment; } - const response = await super.executeCommand(server, session, cmd); + const response = await super.executeCommand(server, session, cmd, timeoutContext); return response?.n || 0; } diff --git a/src/operations/execute_operation.ts b/src/operations/execute_operation.ts index ec7c233eeca..0cffa0c35f7 100644 --- a/src/operations/execute_operation.ts +++ b/src/operations/execute_operation.ts @@ -24,7 +24,8 @@ import { } from '../sdam/server_selection'; import type { Topology } from '../sdam/topology'; import type { ClientSession } from '../sessions'; -import { supportsRetryableWrites } from '../utils'; +import { TimeoutContext } from '../timeout'; +import { squashError, supportsRetryableWrites } from '../utils'; import { AbstractOperation, Aspect } from './operation'; const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES.IllegalOperation; @@ -57,7 +58,7 @@ type ResultTypeFromOperation = export async function executeOperation< T extends AbstractOperation, TResult = ResultTypeFromOperation ->(client: MongoClient, operation: T): Promise { +>(client: MongoClient, operation: T, timeoutContext?: TimeoutContext): Promise { if (!(operation instanceof AbstractOperation)) { // TODO(NODE-3483): Extend MongoRuntimeError throw new MongoRuntimeError('This method requires a valid operation instance'); @@ -86,6 +87,12 @@ export async function executeOperation< ); } + timeoutContext ??= TimeoutContext.create({ + serverSelectionTimeoutMS: client.s.options.serverSelectionTimeoutMS, + waitQueueTimeoutMS: client.s.options.waitQueueTimeoutMS, + timeoutMS: operation.options.timeoutMS + }); + const readPreference = operation.readPreference ?? ReadPreference.primary; const inTransaction = !!session?.inTransaction(); @@ -109,7 +116,8 @@ export async function executeOperation< return await tryOperation(operation, { topology, session, - readPreference + readPreference, + timeoutContext }); } finally { if (session?.owner != null && session.owner === owner) { @@ -268,7 +276,7 @@ async function tryOperation< if (tries > 0 && operation.hasAspect(Aspect.COMMAND_BATCHING)) { operation.resetBatch(); } - return await operation.execute(server, session); + return await operation.execute(server, session, timeoutContext); } catch (operationError) { if (!(operationError instanceof MongoError)) throw operationError; diff --git a/src/operations/find.ts b/src/operations/find.ts index d34d99cc745..a2ea2ad25e8 100644 --- a/src/operations/find.ts +++ b/src/operations/find.ts @@ -6,6 +6,7 @@ import { ReadConcern } from '../read_concern'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; import { formatSort, type Sort } from '../sort'; +import { type TimeoutContext } from '../timeout'; import { decorateWithExplain, type MongoDBNamespace, normalizeHintField } from '../utils'; import { type CollationOptions, CommandOperation, type CommandOperationOptions } from './command'; import { Aspect, defineAspects, type Hint } from './operation'; @@ -105,7 +106,8 @@ export class FindOperation extends CommandOperation { override async execute( server: Server, - session: ClientSession | undefined + session: ClientSession | undefined, + timeoutContext: TimeoutContext ): Promise { this.server = server; @@ -124,7 +126,7 @@ export class FindOperation extends CommandOperation { ...this.bsonOptions, documentsReturnedIn: 'firstBatch', session, - timeout: this.timeout + timeoutContext }, this.explain ? ExplainedCursorResponse : CursorResponse ); diff --git a/src/operations/find_and_modify.ts b/src/operations/find_and_modify.ts index 92b17e93b3b..651bcccb626 100644 --- a/src/operations/find_and_modify.ts +++ b/src/operations/find_and_modify.ts @@ -5,6 +5,7 @@ import { ReadPreference } from '../read_preference'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; import { formatSort, type Sort, type SortForCmd } from '../sort'; +import { type TimeoutContext } from '../timeout'; import { decorateWithCollation, hasAtomicOperators, maxWireVersion } from '../utils'; import { type WriteConcern, type WriteConcernSettings } from '../write_concern'; import { CommandOperation, type CommandOperationOptions } from './command'; @@ -180,7 +181,11 @@ export class FindAndModifyOperation extends CommandOperation { return 'findAndModify' as const; } - override async execute(server: Server, session: ClientSession | undefined): Promise { + override async execute( + server: Server, + session: ClientSession | undefined, + timeoutContext: TimeoutContext + ): Promise { const coll = this.collection; const query = this.query; const options = { ...this.options, ...this.bsonOptions }; @@ -208,7 +213,7 @@ export class FindAndModifyOperation extends CommandOperation { } // Execute the command - const result = await super.executeCommand(server, session, cmd); + const result = await super.executeCommand(server, session, cmd, timeoutContext); return options.includeResultMetadata ? result : (result.value ?? null); } } diff --git a/src/operations/get_more.ts b/src/operations/get_more.ts index aa550721b6f..34317d533b5 100644 --- a/src/operations/get_more.ts +++ b/src/operations/get_more.ts @@ -3,6 +3,7 @@ import { CursorResponse } from '../cmap/wire_protocol/responses'; import { MongoRuntimeError } from '../error'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; +import { type TimeoutContext } from '../timeout'; import { maxWireVersion, type MongoDBNamespace } from '../utils'; import { AbstractOperation, Aspect, defineAspects, type OperationOptions } from './operation'; @@ -58,7 +59,8 @@ export class GetMoreOperation extends AbstractOperation { */ override async execute( server: Server, - _session: ClientSession | undefined + _session: ClientSession | undefined, + timeoutContext: TimeoutContext ): Promise { if (server !== this.server) { throw new MongoRuntimeError('Getmore must run on the same server operation began on'); @@ -97,6 +99,7 @@ export class GetMoreOperation extends AbstractOperation { const commandOptions = { returnFieldSelector: null, documentsReturnedIn: 'nextBatch', + timeoutContext, ...this.options }; diff --git a/src/operations/indexes.ts b/src/operations/indexes.ts index fda3fa80dd6..c96a5d73453 100644 --- a/src/operations/indexes.ts +++ b/src/operations/indexes.ts @@ -6,6 +6,7 @@ import { MongoCompatibilityError } from '../error'; import { type OneOrMore } from '../mongo_types'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; +import { type TimeoutContext } from '../timeout'; import { isObject, maxWireVersion, type MongoDBNamespace } from '../utils'; import { type CollationOptions, @@ -296,7 +297,11 @@ export class CreateIndexesOperation extends CommandOperation { return 'createIndexes'; } - override async execute(server: Server, session: ClientSession | undefined): Promise { + override async execute( + server: Server, + session: ClientSession | undefined, + timeoutContext: TimeoutContext + ): Promise { const options = this.options; const indexes = this.indexes; @@ -316,7 +321,7 @@ export class CreateIndexesOperation extends CommandOperation { // collation is set on each index, it should not be defined at the root this.options.collation = undefined; - await super.executeCommand(server, session, cmd); + await super.executeCommand(server, session, cmd, timeoutContext); const indexNames = indexes.map(index => index.name || ''); return indexNames; @@ -344,9 +349,13 @@ export class DropIndexOperation extends CommandOperation { return 'dropIndexes' as const; } - override async execute(server: Server, session: ClientSession | undefined): Promise { + override async execute( + server: Server, + session: ClientSession | undefined, + timeoutContext: TimeoutContext + ): Promise { const cmd = { dropIndexes: this.collection.collectionName, index: this.indexName }; - return await super.executeCommand(server, session, cmd); + return await super.executeCommand(server, session, cmd, timeoutContext); } } @@ -379,7 +388,8 @@ export class ListIndexesOperation extends CommandOperation { override async execute( server: Server, - session: ClientSession | undefined + session: ClientSession | undefined, + timeoutContext: TimeoutContext ): Promise { const serverWireVersion = maxWireVersion(server); @@ -393,7 +403,7 @@ export class ListIndexesOperation extends CommandOperation { command.comment = this.options.comment; } - return await super.executeCommand(server, session, command, CursorResponse); + return await super.executeCommand(server, session, command, timeoutContext, CursorResponse); } } diff --git a/src/operations/insert.ts b/src/operations/insert.ts index 35a050ed1ca..1a40763e313 100644 --- a/src/operations/insert.ts +++ b/src/operations/insert.ts @@ -5,6 +5,7 @@ import { MongoInvalidArgumentError, MongoServerError } from '../error'; import type { InferIdType } from '../mongo_types'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; +import { type TimeoutContext } from '../timeout'; import { maybeAddIdToDocuments, type MongoDBNamespace } from '../utils'; import { WriteConcern } from '../write_concern'; import { BulkWriteOperation } from './bulk_write'; @@ -27,7 +28,11 @@ export class InsertOperation extends CommandOperation { return 'insert' as const; } - override async execute(server: Server, session: ClientSession | undefined): Promise { + override async execute( + server: Server, + session: ClientSession | undefined, + timeoutContext: TimeoutContext + ): Promise { const options = this.options ?? {}; const ordered = typeof options.ordered === 'boolean' ? options.ordered : true; const command: Document = { @@ -46,7 +51,7 @@ export class InsertOperation extends CommandOperation { command.comment = options.comment; } - return await super.executeCommand(server, session, command); + return await super.executeCommand(server, session, command, timeoutContext); } } @@ -73,9 +78,10 @@ export class InsertOneOperation extends InsertOperation { override async execute( server: Server, - session: ClientSession | undefined + session: ClientSession | undefined, + timeoutContext: TimeoutContext ): Promise { - const res = await super.execute(server, session); + const res = await super.execute(server, session, timeoutContext); if (res.code) throw new MongoServerError(res); if (res.writeErrors) { // This should be a WriteError but we can't change it now because of error hierarchy @@ -123,7 +129,8 @@ export class InsertManyOperation extends AbstractOperation { override async execute( server: Server, - session: ClientSession | undefined + session: ClientSession | undefined, + timeoutContext: TimeoutContext ): Promise { const coll = this.collection; const options = { ...this.options, ...this.bsonOptions, readPreference: this.readPreference }; @@ -137,7 +144,7 @@ export class InsertManyOperation extends AbstractOperation { ); try { - const res = await bulkWriteOperation.execute(server, session); + const res = await bulkWriteOperation.execute(server, session, timeoutContext); return { acknowledged: writeConcern?.w !== 0, insertedCount: res.insertedCount, diff --git a/src/operations/kill_cursors.ts b/src/operations/kill_cursors.ts index 356230e9c7a..72c6a04b276 100644 --- a/src/operations/kill_cursors.ts +++ b/src/operations/kill_cursors.ts @@ -2,6 +2,7 @@ import type { Long } from '../bson'; import { MongoRuntimeError } from '../error'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; +import { type TimeoutContext } from '../timeout'; import { type MongoDBNamespace, squashError } from '../utils'; import { AbstractOperation, Aspect, defineAspects, type OperationOptions } from './operation'; @@ -29,7 +30,11 @@ export class KillCursorsOperation extends AbstractOperation { return 'killCursors' as const; } - override async execute(server: Server, session: ClientSession | undefined): Promise { + override async execute( + server: Server, + session: ClientSession | undefined, + timeoutContext: TimeoutContext + ): Promise { if (server !== this.server) { throw new MongoRuntimeError('Killcursor must run on the same server operation began on'); } @@ -46,7 +51,10 @@ export class KillCursorsOperation extends AbstractOperation { cursors: [this.cursorId] }; try { - await server.command(this.ns, killCursorsCommand, { session }); + await server.command(this.ns, killCursorsCommand, { + session, + timeoutContext + }); } catch (error) { // The driver should never emit errors from killCursors, this is spec-ed behavior squashError(error); diff --git a/src/operations/list_collections.ts b/src/operations/list_collections.ts index e94300f1205..702db0fe3f2 100644 --- a/src/operations/list_collections.ts +++ b/src/operations/list_collections.ts @@ -3,6 +3,7 @@ import { CursorResponse } from '../cmap/wire_protocol/responses'; import type { Db } from '../db'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; +import { type TimeoutContext } from '../timeout'; import { maxWireVersion } from '../utils'; import { CommandOperation, type CommandOperationOptions } from './command'; import { Aspect, defineAspects } from './operation'; @@ -54,12 +55,14 @@ export class ListCollectionsOperation extends CommandOperation { override async execute( server: Server, - session: ClientSession | undefined + session: ClientSession | undefined, + timeoutContext: TimeoutContext ): Promise { return await super.executeCommand( server, session, this.generateCommand(maxWireVersion(server)), + timeoutContext, CursorResponse ); } diff --git a/src/operations/list_databases.ts b/src/operations/list_databases.ts index 5ad9142a1a7..bd740d50c68 100644 --- a/src/operations/list_databases.ts +++ b/src/operations/list_databases.ts @@ -3,6 +3,7 @@ import type { Db } from '../db'; import { type TODO_NODE_3286 } from '../mongo_types'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; +import { type TimeoutContext } from '../timeout'; import { maxWireVersion, MongoDBNamespace } from '../utils'; import { CommandOperation, type CommandOperationOptions } from './command'; import { Aspect, defineAspects } from './operation'; @@ -41,7 +42,8 @@ export class ListDatabasesOperation extends CommandOperation { const cmd: Document = { listDatabases: 1 }; @@ -63,7 +65,12 @@ export class ListDatabasesOperation extends CommandOperation); + return await (super.executeCommand( + server, + session, + cmd, + timeoutContext + ) as Promise); } } diff --git a/src/operations/operation.ts b/src/operations/operation.ts index e08d25bfec0..8558af7a4e5 100644 --- a/src/operations/operation.ts +++ b/src/operations/operation.ts @@ -2,7 +2,7 @@ import { type BSONSerializeOptions, type Document, resolveBSONOptions } from '.. import { ReadPreference, type ReadPreferenceLike } from '../read_preference'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; -import { type Timeout } from '../timeout'; +import { type Timeout, type TimeoutContext } from '../timeout'; import type { MongoDBNamespace } from '../utils'; export const Aspect = { @@ -80,15 +80,17 @@ export abstract class AbstractOperation { this.options = options; this.bypassPinningCheck = !!options.bypassPinningCheck; this.trySecondaryWrite = false; - - this.timeoutMS = options.timeoutMS; } /** Must match the first key of the command object sent to the server. Command name should be stateless (should not use 'this' keyword) */ abstract get commandName(): string; - abstract execute(server: Server, session: ClientSession | undefined): Promise; + abstract execute( + server: Server, + session: ClientSession | undefined, + timeoutContext: TimeoutContext + ): Promise; hasAspect(aspect: symbol): boolean { const ctor = this.constructor as { aspects?: Set }; diff --git a/src/operations/profiling_level.ts b/src/operations/profiling_level.ts index 383062c2a40..7c860a244b7 100644 --- a/src/operations/profiling_level.ts +++ b/src/operations/profiling_level.ts @@ -2,6 +2,7 @@ import type { Db } from '../db'; import { MongoUnexpectedServerResponseError } from '../error'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; +import { type TimeoutContext } from '../timeout'; import { CommandOperation, type CommandOperationOptions } from './command'; /** @public */ @@ -20,8 +21,12 @@ export class ProfilingLevelOperation extends CommandOperation { return 'profile' as const; } - override async execute(server: Server, session: ClientSession | undefined): Promise { - const doc = await super.executeCommand(server, session, { profile: -1 }); + override async execute( + server: Server, + session: ClientSession | undefined, + timeoutContext: TimeoutContext + ): Promise { + const doc = await super.executeCommand(server, session, { profile: -1 }, timeoutContext); if (doc.ok === 1) { const was = doc.was; if (was === 0) return 'off'; diff --git a/src/operations/remove_user.ts b/src/operations/remove_user.ts index ced8e4e1cab..7f484ba89a3 100644 --- a/src/operations/remove_user.ts +++ b/src/operations/remove_user.ts @@ -1,6 +1,7 @@ import type { Db } from '../db'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; +import { type TimeoutContext } from '../timeout'; import { CommandOperation, type CommandOperationOptions } from './command'; import { Aspect, defineAspects } from './operation'; @@ -22,8 +23,12 @@ export class RemoveUserOperation extends CommandOperation { return 'dropUser' as const; } - override async execute(server: Server, session: ClientSession | undefined): Promise { - await super.executeCommand(server, session, { dropUser: this.username }); + override async execute( + server: Server, + session: ClientSession | undefined, + timeoutContext: TimeoutContext + ): Promise { + await super.executeCommand(server, session, { dropUser: this.username }, timeoutContext); return true; } } diff --git a/src/operations/rename.ts b/src/operations/rename.ts index a27d4afe45a..883be282b64 100644 --- a/src/operations/rename.ts +++ b/src/operations/rename.ts @@ -2,6 +2,7 @@ import type { Document } from '../bson'; import { Collection } from '../collection'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; +import { type TimeoutContext } from '../timeout'; import { MongoDBNamespace } from '../utils'; import { CommandOperation, type CommandOperationOptions } from './command'; import { Aspect, defineAspects } from './operation'; @@ -29,7 +30,11 @@ export class RenameOperation extends CommandOperation { return 'renameCollection' as const; } - override async execute(server: Server, session: ClientSession | undefined): Promise { + override async execute( + server: Server, + session: ClientSession | undefined, + timeoutContext: TimeoutContext + ): Promise { // Build the command const renameCollection = this.collection.namespace; const toCollection = this.collection.s.namespace.withCollection(this.newName).toString(); @@ -42,7 +47,7 @@ export class RenameOperation extends CommandOperation { dropTarget: dropTarget }; - await super.executeCommand(server, session, command); + await super.executeCommand(server, session, command, timeoutContext); return new Collection(this.collection.s.db, this.newName, this.collection.s.options); } } diff --git a/src/operations/run_command.ts b/src/operations/run_command.ts index 56462fa8843..b91e2d0344e 100644 --- a/src/operations/run_command.ts +++ b/src/operations/run_command.ts @@ -5,6 +5,7 @@ import { type TODO_NODE_3286 } from '../mongo_types'; import type { ReadPreferenceLike } from '../read_preference'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; +import { type TimeoutContext } from '../timeout'; import { MongoDBNamespace } from '../utils'; import { AbstractOperation } from './operation'; @@ -33,7 +34,11 @@ export class RunCommandOperation extends AbstractOperation { return 'runCommand' as const; } - override async execute(server: Server, session: ClientSession | undefined): Promise { + override async execute( + server: Server, + session: ClientSession | undefined, + timeoutContext: TimeoutContext + ): Promise { this.server = server; const res: TODO_NODE_3286 = await server.command( this.ns, @@ -42,7 +47,7 @@ export class RunCommandOperation extends AbstractOperation { ...this.options, readPreference: this.readPreference, session, - timeout: this.timeout + timeoutContext }, this.options.responseType ); @@ -67,13 +72,17 @@ export class RunAdminCommandOperation extends AbstractOperation return 'runCommand' as const; } - override async execute(server: Server, session: ClientSession | undefined): Promise { + override async execute( + server: Server, + session: ClientSession | undefined, + timeoutContext: TimeoutContext + ): Promise { this.server = server; const res: TODO_NODE_3286 = await server.command(this.ns, this.command, { ...this.options, readPreference: this.readPreference, session, - timeout: this.timeout + timeoutContext }); return res; } diff --git a/src/operations/search_indexes/create.ts b/src/operations/search_indexes/create.ts index 2ce66f4707e..2870868bc91 100644 --- a/src/operations/search_indexes/create.ts +++ b/src/operations/search_indexes/create.ts @@ -2,6 +2,7 @@ import type { Document } from '../../bson'; import type { Collection } from '../../collection'; import type { Server } from '../../sdam/server'; import type { ClientSession } from '../../sessions'; +import { type TimeoutContext } from '../../timeout'; import { AbstractOperation } from '../operation'; /** @@ -31,14 +32,21 @@ export class CreateSearchIndexesOperation extends AbstractOperation { return 'createSearchIndexes' as const; } - override async execute(server: Server, session: ClientSession | undefined): Promise { + override async execute( + server: Server, + session: ClientSession | undefined, + timeoutContext: TimeoutContext + ): Promise { const namespace = this.collection.fullNamespace; const command = { createSearchIndexes: namespace.collection, indexes: this.descriptions }; - const res = await server.command(namespace, command, { session }); + const res = await server.command(namespace, command, { + session, + timeoutContext + }); const indexesCreated: Array<{ name: string }> = res?.indexesCreated ?? []; return indexesCreated.map(({ name }) => name); diff --git a/src/operations/search_indexes/drop.ts b/src/operations/search_indexes/drop.ts index ee9acdf850e..28870d3220e 100644 --- a/src/operations/search_indexes/drop.ts +++ b/src/operations/search_indexes/drop.ts @@ -3,6 +3,7 @@ import type { Collection } from '../../collection'; import { MONGODB_ERROR_CODES, MongoServerError } from '../../error'; import type { Server } from '../../sdam/server'; import type { ClientSession } from '../../sessions'; +import { type TimeoutContext } from '../../timeout'; import { AbstractOperation } from '../operation'; /** @internal */ @@ -18,7 +19,11 @@ export class DropSearchIndexOperation extends AbstractOperation { return 'dropSearchIndex' as const; } - override async execute(server: Server, session: ClientSession | undefined): Promise { + override async execute( + server: Server, + session: ClientSession | undefined, + timeoutContext: TimeoutContext + ): Promise { const namespace = this.collection.fullNamespace; const command: Document = { @@ -30,7 +35,7 @@ export class DropSearchIndexOperation extends AbstractOperation { } try { - await server.command(namespace, command, { session }); + await server.command(namespace, command, { session, timeoutContext }); } catch (error) { const isNamespaceNotFoundError = error instanceof MongoServerError && error.code === MONGODB_ERROR_CODES.NamespaceNotFound; diff --git a/src/operations/search_indexes/update.ts b/src/operations/search_indexes/update.ts index b6986da9410..e8701d2802e 100644 --- a/src/operations/search_indexes/update.ts +++ b/src/operations/search_indexes/update.ts @@ -2,6 +2,7 @@ import type { Document } from '../../bson'; import type { Collection } from '../../collection'; import type { Server } from '../../sdam/server'; import type { ClientSession } from '../../sessions'; +import { type TimeoutContext } from '../../timeout'; import { AbstractOperation } from '../operation'; /** @internal */ @@ -18,7 +19,11 @@ export class UpdateSearchIndexOperation extends AbstractOperation { return 'updateSearchIndex' as const; } - override async execute(server: Server, session: ClientSession | undefined): Promise { + override async execute( + server: Server, + session: ClientSession | undefined, + timeoutContext: TimeoutContext + ): Promise { const namespace = this.collection.fullNamespace; const command = { updateSearchIndex: namespace.collection, @@ -26,7 +31,7 @@ export class UpdateSearchIndexOperation extends AbstractOperation { definition: this.definition }; - await server.command(namespace, command, { session }); + await server.command(namespace, command, { session, timeoutContext }); return; } } diff --git a/src/operations/set_profiling_level.ts b/src/operations/set_profiling_level.ts index 9969b2ea3c2..d76473f2632 100644 --- a/src/operations/set_profiling_level.ts +++ b/src/operations/set_profiling_level.ts @@ -2,6 +2,7 @@ import type { Db } from '../db'; import { MongoInvalidArgumentError } from '../error'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; +import { type TimeoutContext } from '../timeout'; import { enumToString } from '../utils'; import { CommandOperation, type CommandOperationOptions } from './command'; @@ -53,7 +54,8 @@ export class SetProfilingLevelOperation extends CommandOperation override async execute( server: Server, - session: ClientSession | undefined + session: ClientSession | undefined, + timeoutContext: TimeoutContext ): Promise { const level = this.level; @@ -64,7 +66,7 @@ export class SetProfilingLevelOperation extends CommandOperation } // TODO(NODE-3483): Determine error to put here - await super.executeCommand(server, session, { profile: this.profile }); + await super.executeCommand(server, session, { profile: this.profile }, timeoutContext); return level; } } diff --git a/src/operations/stats.ts b/src/operations/stats.ts index 41c9faf6e24..aafd3bf1bac 100644 --- a/src/operations/stats.ts +++ b/src/operations/stats.ts @@ -2,6 +2,7 @@ import type { Document } from '../bson'; import type { Db } from '../db'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; +import { type TimeoutContext } from '../timeout'; import { CommandOperation, type CommandOperationOptions } from './command'; import { Aspect, defineAspects } from './operation'; @@ -24,13 +25,17 @@ export class DbStatsOperation extends CommandOperation { return 'dbStats' as const; } - override async execute(server: Server, session: ClientSession | undefined): Promise { + override async execute( + server: Server, + session: ClientSession | undefined, + timeoutContext: TimeoutContext + ): Promise { const command: Document = { dbStats: true }; if (this.options.scale != null) { command.scale = this.options.scale; } - return await super.executeCommand(server, session, command); + return await super.executeCommand(server, session, command, timeoutContext); } } diff --git a/src/operations/update.ts b/src/operations/update.ts index ba0ad6d95ff..5b6f396afec 100644 --- a/src/operations/update.ts +++ b/src/operations/update.ts @@ -4,6 +4,7 @@ import { MongoCompatibilityError, MongoInvalidArgumentError, MongoServerError } import type { InferIdType, TODO_NODE_3286 } from '../mongo_types'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; +import { type TimeoutContext } from '../timeout'; import { hasAtomicOperators, type MongoDBNamespace } from '../utils'; import { type CollationOptions, CommandOperation, type CommandOperationOptions } from './command'; import { Aspect, defineAspects, type Hint } from './operation'; @@ -91,7 +92,11 @@ export class UpdateOperation extends CommandOperation { return this.statements.every(op => op.multi == null || op.multi === false); } - override async execute(server: Server, session: ClientSession | undefined): Promise { + override async execute( + server: Server, + session: ClientSession | undefined, + timeoutContext: TimeoutContext + ): Promise { const options = this.options ?? {}; const ordered = typeof options.ordered === 'boolean' ? options.ordered : true; const command: Document = { @@ -122,7 +127,7 @@ export class UpdateOperation extends CommandOperation { } } - const res = await super.executeCommand(server, session, command); + const res = await super.executeCommand(server, session, command, timeoutContext); return res; } } @@ -143,9 +148,10 @@ export class UpdateOneOperation extends UpdateOperation { override async execute( server: Server, - session: ClientSession | undefined + session: ClientSession | undefined, + timeoutContext: TimeoutContext ): Promise { - const res: TODO_NODE_3286 = await super.execute(server, session); + const res: TODO_NODE_3286 = await super.execute(server, session, timeoutContext); if (this.explain != null) return res; if (res.code) throw new MongoServerError(res); if (res.writeErrors) throw new MongoServerError(res.writeErrors[0]); @@ -177,9 +183,10 @@ export class UpdateManyOperation extends UpdateOperation { override async execute( server: Server, - session: ClientSession | undefined + session: ClientSession | undefined, + timeoutContext: TimeoutContext ): Promise { - const res: TODO_NODE_3286 = await super.execute(server, session); + const res: TODO_NODE_3286 = await super.execute(server, session, timeoutContext); if (this.explain != null) return res; if (res.code) throw new MongoServerError(res); if (res.writeErrors) throw new MongoServerError(res.writeErrors[0]); @@ -230,9 +237,10 @@ export class ReplaceOneOperation extends UpdateOperation { override async execute( server: Server, - session: ClientSession | undefined + session: ClientSession | undefined, + timeoutContext: TimeoutContext ): Promise { - const res: TODO_NODE_3286 = await super.execute(server, session); + const res: TODO_NODE_3286 = await super.execute(server, session, timeoutContext); if (this.explain != null) return res; if (res.code) throw new MongoServerError(res); if (res.writeErrors) throw new MongoServerError(res.writeErrors[0]); diff --git a/src/operations/validate_collection.ts b/src/operations/validate_collection.ts index 4880a703a7a..16ae4cad9e0 100644 --- a/src/operations/validate_collection.ts +++ b/src/operations/validate_collection.ts @@ -3,6 +3,7 @@ import type { Document } from '../bson'; import { MongoUnexpectedServerResponseError } from '../error'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; +import { type TimeoutContext } from '../timeout'; import { CommandOperation, type CommandOperationOptions } from './command'; /** @public */ @@ -37,10 +38,14 @@ export class ValidateCollectionOperation extends CommandOperation { return 'validate' as const; } - override async execute(server: Server, session: ClientSession | undefined): Promise { + override async execute( + server: Server, + session: ClientSession | undefined, + timeoutContext: TimeoutContext + ): Promise { const collectionName = this.collectionName; - const doc = await super.executeCommand(server, session, this.command); + const doc = await super.executeCommand(server, session, this.command, timeoutContext); if (doc.result != null && typeof doc.result !== 'string') throw new MongoUnexpectedServerResponseError('Error with validation data'); if (doc.result != null && doc.result.match(/exception|corrupt/) != null) diff --git a/src/sdam/server.ts b/src/sdam/server.ts index 3d2a3ca1a31..08325086d53 100644 --- a/src/sdam/server.ts +++ b/src/sdam/server.ts @@ -40,6 +40,7 @@ import type { ServerApi } from '../mongo_client'; import { TypedEventEmitter } from '../mongo_types'; import type { GetMoreOptions } from '../operations/get_more'; import type { ClientSession } from '../sessions'; +import { type TimeoutContext } from '../timeout'; import { isTransactionCommand } from '../transactions'; import { type EventEmitterWithState, @@ -104,6 +105,11 @@ export type ServerEvents = { } & ConnectionPoolEvents & EventEmitterWithState; +/** @internal */ +export type ServerCommandOptions = Omit & { + timeoutContext: TimeoutContext; +}; + /** @internal */ export class Server extends TypedEventEmitter { /** @internal */ @@ -267,20 +273,20 @@ export class Server extends TypedEventEmitter { public async command( ns: MongoDBNamespace, command: Document, - options: CommandOptions | undefined, + options: ServerCommandOptions, responseType: T | undefined ): Promise>; public async command( ns: MongoDBNamespace, command: Document, - options?: CommandOptions + options: ServerCommandOptions ): Promise; public async command( ns: MongoDBNamespace, cmd: Document, - options: CommandOptions, + options: ServerCommandOptions, responseType?: MongoDBResponseConstructor ): Promise { if (ns.db == null || typeof ns === 'string') { diff --git a/src/sdam/topology.ts b/src/sdam/topology.ts index 4c9d71d807d..6117b5317cd 100644 --- a/src/sdam/topology.ts +++ b/src/sdam/topology.ts @@ -34,11 +34,10 @@ import { MongoLoggableComponent, type MongoLogger, SeverityLevel } from '../mong import { TypedEventEmitter } from '../mongo_types'; import { ReadPreference, type ReadPreferenceLike } from '../read_preference'; import type { ClientSession } from '../sessions'; -import { Timeout, TimeoutError } from '../timeout'; +import { Timeout, TimeoutContext, TimeoutError } from '../timeout'; import type { Transaction } from '../transactions'; import { type Callback, - csotMin, type EventEmitterWithState, HostAddress, List, @@ -179,8 +178,11 @@ export interface SelectServerOptions { session?: ClientSession; operationName: string; previousServer?: ServerDescription; - /** @internal*/ - timeout?: Timeout; + /** + * @internal + * TODO(NODE-5685): Make this required + * */ + timeoutContext?: TimeoutContext; } /** @public */ @@ -458,13 +460,20 @@ export class Topology extends TypedEventEmitter { } } - const timeoutMS = this.client.options.timeoutMS; - const timeout = timeoutMS != null ? Timeout.expires(timeoutMS) : undefined; + const timeoutMS = this.client.s.options.timeoutMS; + const serverSelectionTimeoutMS = this.client.s.options.serverSelectionTimeoutMS; const readPreference = options.readPreference ?? ReadPreference.primary; + + const timeoutContext = TimeoutContext.create({ + timeoutMS, + serverSelectionTimeoutMS, + waitQueueTimeoutMS: this.client.s.options.waitQueueTimeoutMS + }); + const selectServerOptions = { operationName: 'ping', - timeout, - ...options + ...options, + timeoutContext }; try { const server = await this.selectServer( @@ -474,7 +483,7 @@ export class Topology extends TypedEventEmitter { const skipPingOnConnect = this.s.options[Symbol.for('@@mdb.skipPingOnConnect')] === true; if (!skipPingOnConnect && server && this.s.credentials) { - await server.command(ns('admin.$cmd'), { ping: 1 }, { timeout }); + await server.command(ns('admin.$cmd'), { ping: 1 }, { timeoutContext }); stateTransition(this, STATE_CONNECTED); this.emit(Topology.OPEN, this); this.emit(Topology.CONNECT, this); @@ -563,24 +572,10 @@ export class Topology extends TypedEventEmitter { new ServerSelectionStartedEvent(selector, this.description, options.operationName) ); } - const serverSelectionTimeoutMS = options.serverSelectionTimeoutMS ?? 0; - let timeout: Timeout | null; - if (options.timeout) { - // CSOT Enabled - if (options.timeout.duration > 0 || serverSelectionTimeoutMS > 0) { - if ( - options.timeout.duration === serverSelectionTimeoutMS || - csotMin(options.timeout.duration, serverSelectionTimeoutMS) < serverSelectionTimeoutMS - ) { - timeout = options.timeout; - } else { - timeout = Timeout.expires(serverSelectionTimeoutMS); - } - } else { - timeout = null; - } - } else { - timeout = Timeout.expires(serverSelectionTimeoutMS); + let timeout; + if (options.timeoutContext) timeout = options.timeoutContext.serverSelectionTimeout; + else { + timeout = Timeout.expires(options.serverSelectionTimeoutMS ?? 0); } const isSharded = this.description.type === TopologyType.Sharded; @@ -604,7 +599,7 @@ export class Topology extends TypedEventEmitter { ) ); } - if (timeout !== options.timeout) timeout?.clear(); + if (options.timeoutContext?.clearServerSelectionTimeout) timeout?.clear(); return transaction.server; } @@ -654,7 +649,7 @@ export class Topology extends TypedEventEmitter { ); } - if (options.timeout) { + if (options.timeoutContext?.csotEnabled()) { throw new MongoOperationTimeoutError('Timed out during server selection', { cause: timeoutError }); @@ -664,7 +659,7 @@ export class Topology extends TypedEventEmitter { // Other server selection error throw error; } finally { - if (timeout !== options.timeout) timeout?.clear(); + if (options.timeoutContext?.clearServerSelectionTimeout) timeout?.clear(); } } /** diff --git a/src/timeout.ts b/src/timeout.ts index 7af1a23f261..3d65992a02b 100644 --- a/src/timeout.ts +++ b/src/timeout.ts @@ -1,7 +1,7 @@ import { clearTimeout, setTimeout } from 'timers'; -import { MongoInvalidArgumentError } from './error'; -import { noop } from './utils'; +import { MongoInvalidArgumentError, MongoRuntimeError } from './error'; +import { csotMin, noop } from './utils'; /** @internal */ export class TimeoutError extends Error { @@ -107,3 +107,165 @@ export class Timeout extends Promise { ); } } + +/** @internal */ +export type TimeoutContextOptions = LegacyTimeoutContextOptions | CSOTTimeoutContextOptions; + +/** @internal */ +export type LegacyTimeoutContextOptions = { + serverSelectionTimeoutMS: number; + waitQueueTimeoutMS: number; + socketTimeoutMS?: number; +}; + +/** @internal */ +export type CSOTTimeoutContextOptions = { + timeoutMS: number; + serverSelectionTimeoutMS: number; + socketTimeoutMS?: number; +}; + +function isLegacyTimeoutContextOptions(v: unknown): v is LegacyTimeoutContextOptions { + return ( + v != null && + typeof v === 'object' && + 'serverSelectionTimeoutMS' in v && + typeof v.serverSelectionTimeoutMS === 'number' && + 'waitQueueTimeoutMS' in v && + typeof v.waitQueueTimeoutMS === 'number' + ); +} + +function isCSOTTimeoutContextOptions(v: unknown): v is CSOTTimeoutContextOptions { + return ( + v != null && + typeof v === 'object' && + 'serverSelectionTimeoutMS' in v && + typeof v.serverSelectionTimeoutMS === 'number' && + 'timeoutMS' in v && + typeof v.timeoutMS === 'number' + ); +} + +/** @internal */ +export abstract class TimeoutContext { + static create(options: TimeoutContextOptions): TimeoutContext { + if (isCSOTTimeoutContextOptions(options)) return new CSOTTimeoutContext(options); + else if (isLegacyTimeoutContextOptions(options)) return new LegacyTimeoutContext(options); + else throw new MongoRuntimeError('Unrecognized options'); + } + + abstract get serverSelectionTimeout(): Timeout | null; + + abstract get connectionCheckoutTimeout(): Timeout | null; + + abstract get clearServerSelectionTimeout(): boolean; + + abstract get clearConnectionCheckoutTimeout(): boolean; + + abstract csotEnabled(): this is CSOTTimeoutContext; +} + +/** @internal */ +export class CSOTTimeoutContext extends TimeoutContext { + timeoutMS: number; + serverSelectionTimeoutMS: number; + socketTimeoutMS?: number; + + clearConnectionCheckoutTimeout: boolean; + clearServerSelectionTimeout: boolean; + + private _maxTimeMS?: number; + + private _serverSelectionTimeout?: Timeout | null; + private _connectionCheckoutTimeout?: Timeout | null; + + constructor(options: CSOTTimeoutContextOptions) { + super(); + this.timeoutMS = options.timeoutMS; + + this.serverSelectionTimeoutMS = options.serverSelectionTimeoutMS; + + this.socketTimeoutMS = options.socketTimeoutMS; + + this.clearServerSelectionTimeout = false; + this.clearConnectionCheckoutTimeout = true; + } + + get maxTimeMS(): number { + return this._maxTimeMS ?? -1; + } + + set maxTimeMS(v: number) { + this._maxTimeMS = v; + } + + csotEnabled(): this is CSOTTimeoutContext { + return true; + } + + get serverSelectionTimeout(): Timeout | null { + // check for undefined + if (typeof this._serverSelectionTimeout !== 'object') { + const usingServerSelectionTimeoutMS = + this.serverSelectionTimeoutMS !== 0 && + csotMin(this.timeoutMS, this.serverSelectionTimeoutMS) === this.serverSelectionTimeoutMS; + + if (usingServerSelectionTimeoutMS) { + this._serverSelectionTimeout = Timeout.expires(this.serverSelectionTimeoutMS); + } else { + if (this.timeoutMS > 0) { + this._serverSelectionTimeout = Timeout.expires(this.timeoutMS); + } else { + this._serverSelectionTimeout = null; + } + } + } + + return this._serverSelectionTimeout; + } + + get connectionCheckoutTimeout(): Timeout | null { + if (typeof this._connectionCheckoutTimeout !== 'object') { + if (typeof this._serverSelectionTimeout === 'object') { + // null or Timeout + this._connectionCheckoutTimeout = this._serverSelectionTimeout; + } else { + throw new MongoRuntimeError( + 'Unreachable. If you are seeing this error, please file a ticket on the NODE driver project on Jira' + ); + } + } + return this._connectionCheckoutTimeout; + } +} + +/** @internal */ +export class LegacyTimeoutContext extends TimeoutContext { + options: LegacyTimeoutContextOptions; + clearServerSelectionTimeout: boolean; + clearConnectionCheckoutTimeout: boolean; + + constructor(options: LegacyTimeoutContextOptions) { + super(); + this.options = options; + this.clearServerSelectionTimeout = true; + this.clearConnectionCheckoutTimeout = true; + } + + csotEnabled(): this is CSOTTimeoutContext { + return false; + } + + get serverSelectionTimeout(): Timeout | null { + if (this.options.serverSelectionTimeoutMS != null && this.options.serverSelectionTimeoutMS > 0) + return Timeout.expires(this.options.serverSelectionTimeoutMS); + return null; + } + + get connectionCheckoutTimeout(): Timeout | null { + if (this.options.waitQueueTimeoutMS != null && this.options.waitQueueTimeoutMS > 0) + return Timeout.expires(this.options.waitQueueTimeoutMS); + return null; + } +} diff --git a/test/integration/client-side-operations-timeout/client_side_operations_timeout.unit.test.ts b/test/integration/client-side-operations-timeout/client_side_operations_timeout.unit.test.ts index c1426d8db1d..c4989f58d7f 100644 --- a/test/integration/client-side-operations-timeout/client_side_operations_timeout.unit.test.ts +++ b/test/integration/client-side-operations-timeout/client_side_operations_timeout.unit.test.ts @@ -33,16 +33,20 @@ describe('CSOT spec unit tests', function () { client = this.configuration.newClient({ timeoutMS: 1000 }); // Spy on connection checkout and pull options argument const checkoutSpy = sinon.spy(ConnectionPool.prototype, 'checkOut'); - const selectServerSpy = sinon.spy(Topology.prototype, 'selectServer'); const expiresSpy = sinon.spy(Timeout, 'expires'); await client.db('db').collection('collection').insertOne({ x: 1 }); expect(checkoutSpy).to.have.been.calledOnce; - expect(checkoutSpy.firstCall.args[0].timeout).to.exist; + const timeoutContext = checkoutSpy.lastCall.args[0].timeoutContext; + expect(timeoutContext).to.exist; // Check that we passed through the timeout - expect(checkoutSpy.firstCall.args[0].timeout).to.equal( - selectServerSpy.lastCall.lastArg.timeout + // @ts-expect-error accessing private properties + expect(timeoutContext._serverSelectionTimeout).to.be.instanceOf(Timeout); + // @ts-expect-error accessing private properties + expect(timeoutContext._serverSelectionTimeout).to.equal( + // @ts-expect-error accessing private properties + timeoutContext._connectionCheckoutTimeout ); // Check that no more Timeouts are constructed after we enter checkout diff --git a/test/integration/client-side-operations-timeout/node_csot.test.ts b/test/integration/client-side-operations-timeout/node_csot.test.ts index 5636eb00db7..17d85ba5b23 100644 --- a/test/integration/client-side-operations-timeout/node_csot.test.ts +++ b/test/integration/client-side-operations-timeout/node_csot.test.ts @@ -143,7 +143,7 @@ describe('CSOT driver tests', () => { }); it('throws a MongoOperationTimeoutError', { - metadata: { requires: { mongodb: '>=4.4' } }, + metadata: { requires: { mongodb: '>=4.4', topology: '!load-balanced' } }, test: async function () { const commandsStarted = []; client = this.configuration.newClient(undefined, { timeoutMS: 1, monitorCommands: true }); diff --git a/test/tools/cmap_spec_runner.ts b/test/tools/cmap_spec_runner.ts index 56fd5ba92c6..69866981b0c 100644 --- a/test/tools/cmap_spec_runner.ts +++ b/test/tools/cmap_spec_runner.ts @@ -12,7 +12,8 @@ import { makeClientMetadata, type MongoClient, type Server, - shuffle + shuffle, + TimeoutContext } from '../mongodb'; import { isAnyRequirementSatisfied } from './unified-spec-runner/unified-utils'; import { type FailPoint, sleep } from './utils'; @@ -191,7 +192,14 @@ const compareInputToSpec = (input, expected, message) => { const getTestOpDefinitions = (threadContext: ThreadContext) => ({ checkOut: async function (op) { - const connection: Connection = await ConnectionPool.prototype.checkOut.call(threadContext.pool); + const timeoutContext = TimeoutContext.create({ + serverSelectionTimeoutMS: 0, + waitQueueTimeoutMS: threadContext.pool.options.waitQueueTimeoutMS + }); + const connection: Connection = await ConnectionPool.prototype.checkOut.call( + threadContext.pool, + { timeoutContext } + ); if (op.label != null) { threadContext.connections.set(op.label, connection); } else { diff --git a/test/unit/cmap/connection_pool.test.js b/test/unit/cmap/connection_pool.test.js index 18048befab4..1604cd82d86 100644 --- a/test/unit/cmap/connection_pool.test.js +++ b/test/unit/cmap/connection_pool.test.js @@ -10,8 +10,10 @@ const { ns, isHello } = require('../../mongodb'); const { createTimerSandbox } = require('../timer_sandbox'); const { topologyWithPlaceholderClient } = require('../../tools/utils'); const { MongoClientAuthProviders } = require('../../mongodb'); +const { TimeoutContext } = require('../../mongodb'); describe('Connection Pool', function () { + let timeoutContext; let mockMongod; const stubServer = { topology: { @@ -44,6 +46,10 @@ describe('Connection Pool', function () { }) ); + beforeEach(() => { + timeoutContext = TimeoutContext.create({ waitQueueTimeoutMS: 0, serverSelectionTimeoutMS: 0 }); + }); + it('should destroy connections which have been closed', async function () { mockMongod.setMessageHandler(request => { const doc = request.document; @@ -64,8 +70,10 @@ describe('Connection Pool', function () { const events = []; pool.on('connectionClosed', event => events.push(event)); - const conn = await pool.checkOut(); - const error = await conn.command(ns('admin.$cmd'), { ping: 1 }, {}).catch(error => error); + const conn = await pool.checkOut({ timeoutContext }); + const error = await conn + .command(ns('admin.$cmd'), { ping: 1 }, { timeoutContext }) + .catch(error => error); expect(error).to.be.instanceOf(Error); pool.checkIn(conn); @@ -93,7 +101,7 @@ describe('Connection Pool', function () { pool.ready(); - const conn = await pool.checkOut(); + const conn = await pool.checkOut({ timeoutContext }); const maybeError = await conn.command(ns('admin.$cmd'), { ping: 1 }, undefined).catch(e => e); expect(maybeError).to.be.instanceOf(MongoError); expect(maybeError).to.match(/timed out/); @@ -114,11 +122,15 @@ describe('Connection Pool', function () { waitQueueTimeoutMS: 200, hostAddress: mockMongod.hostAddress() }); + const timeoutContext = TimeoutContext.create({ + waitQueueTimeoutMS: 200, + serverSelectionTimeoutMS: 0 + }); pool.ready(); - const conn = await pool.checkOut(); - const err = await pool.checkOut().catch(e => e); + const conn = await pool.checkOut({ timeoutContext }); + const err = await pool.checkOut({ timeoutContext }).catch(e => e); expect(err).to.exist.and.be.instanceOf(WaitQueueTimeoutError); sinon.stub(pool, 'availableConnectionCount').get(() => 0); pool.checkIn(conn); diff --git a/test/unit/error.test.ts b/test/unit/error.test.ts index 6bab40d0318..bdc049cbc4f 100644 --- a/test/unit/error.test.ts +++ b/test/unit/error.test.ts @@ -28,6 +28,7 @@ import { ns, PoolClosedError as MongoPoolClosedError, setDifference, + TimeoutContext, type TopologyDescription, type TopologyOptions, WaitQueueTimeoutError as MongoWaitQueueTimeoutError @@ -376,11 +377,17 @@ describe('MongoErrors', () => { { replicaSet: 'rs' } as TopologyOptions ); + const timeoutContext = TimeoutContext.create({ + serverSelectionTimeoutMS: 0, + waitQueueTimeoutMS: 0 + }); return replSet .connect() - .then(topology => topology.selectServer('primary', {})) + .then(topology => topology.selectServer('primary', { timeoutContext })) .then(server => - server.command(ns('db1'), Object.assign({}, RAW_USER_WRITE_CONCERN_CMD), {}) + server.command(ns('db1'), Object.assign({}, RAW_USER_WRITE_CONCERN_CMD), { + timeoutContext + }) ) .then( () => expect.fail('expected command to fail'), @@ -419,10 +426,14 @@ describe('MongoErrors', () => { if (err) { return cleanup(err); } + const timeoutContext = TimeoutContext.create({ + serverSelectionTimeoutMS: 0, + waitQueueTimeoutMS: 0 + }); - topology.selectServer('primary', {}).then(server => { + topology.selectServer('primary', { timeoutContext }).then(server => { server - .command(ns('db1'), Object.assign({}, RAW_USER_WRITE_CONCERN_CMD), {}) + .command(ns('db1'), Object.assign({}, RAW_USER_WRITE_CONCERN_CMD), { timeoutContext }) .then(expect.fail, err => { let _err; try { diff --git a/test/unit/operations/get_more.test.ts b/test/unit/operations/get_more.test.ts index f79da44e22f..17bc20f6fa7 100644 --- a/test/unit/operations/get_more.test.ts +++ b/test/unit/operations/get_more.test.ts @@ -69,7 +69,7 @@ describe('GetMoreOperation', function () { const call = stub.getCall(0); expect(call.args[0]).to.equal(namespace); expect(call.args[1]).to.deep.equal(expectedGetMoreCommand); - expect(call.args[2]).to.deep.equal(opts); + expect(call.args[2]).to.containSubset(opts); }); }); diff --git a/test/unit/sdam/topology.test.ts b/test/unit/sdam/topology.test.ts index e4a34417d50..5264b5d9c45 100644 --- a/test/unit/sdam/topology.test.ts +++ b/test/unit/sdam/topology.test.ts @@ -17,6 +17,7 @@ import { Server, SrvPoller, SrvPollingEvent, + TimeoutContext, Topology, TopologyDescription, TopologyDescriptionChangedEvent, @@ -108,17 +109,28 @@ describe('Topology (unit)', function () { const topology = topologyWithPlaceholderClient(mockServer.hostAddress(), {}); topology.connect().then(() => { - topology.selectServer('primary', {}).then(server => { - server.command(ns('admin.$cmd'), { ping: 1 }, { socketTimeoutMS: 250 }).then( - () => expect.fail('expected command to fail'), - err => { - expect(err).to.exist; - expect(err).to.match(/timed out/); - topology.close(); - done(); - } - ); - }, expect.fail); + const ctx = TimeoutContext.create({ + waitQueueTimeoutMS: 0, + serverSelectionTimeoutMS: 0, + socketTimeoutMS: 250 + }); + topology + .selectServer('primary', { + timeoutContext: ctx + }) + .then(server => { + server + .command(ns('admin.$cmd'), { ping: 1 }, { socketTimeoutMS: 250, timeoutContext: ctx }) + .then( + () => expect.fail('expected command to fail'), + err => { + expect(err).to.exist; + expect(err).to.match(/timed out/); + topology.close(); + done(); + } + ); + }, expect.fail); }, expect.fail); }); }); @@ -217,10 +229,16 @@ describe('Topology (unit)', function () { let poolCleared = false; topology.on('connectionPoolCleared', () => (poolCleared = true)); - const err = await server.command(ns('test.test'), { insert: { a: 42 } }, {}).then( - () => null, - e => e - ); + const timeoutContext = TimeoutContext.create({ + serverSelectionTimeoutMS: 0, + waitQueueTimeoutMS: 0 + }); + const err = await server + .command(ns('test.test'), { insert: { a: 42 } }, { timeoutContext }) + .then( + () => null, + e => e + ); expect(err).to.eql(serverDescription.error); expect(poolCleared).to.be.true; }); @@ -245,11 +263,17 @@ describe('Topology (unit)', function () { let poolCleared = false; topology.on('connectionPoolCleared', () => (poolCleared = true)); + const timeoutContext = TimeoutContext.create({ + serverSelectionTimeoutMS: 0, + waitQueueTimeoutMS: 0 + }); - const err = await server.command(ns('test.test'), { insert: { a: 42 } }, {}).then( - () => null, - e => e - ); + const err = await server + .command(ns('test.test'), { insert: { a: 42 } }, { timeoutContext }) + .then( + () => null, + e => e + ); expect(err).to.eql(serverDescription.error); expect(poolCleared).to.be.false; topology.close(); @@ -269,14 +293,20 @@ describe('Topology (unit)', function () { topology = topologyWithPlaceholderClient(mockServer.hostAddress(), {}); await topology.connect(); + const timeoutContext = TimeoutContext.create({ + waitQueueTimeoutMS: 0, + serverSelectionTimeoutMS: 0 + }); const server = await topology.selectServer('primary', {}); let serverDescription; server.on('descriptionReceived', sd => (serverDescription = sd)); - const err = await server.command(ns('test.test'), { insert: { a: 42 } }, {}).then( - () => null, - e => e - ); + const err = await server + .command(ns('test.test'), { insert: { a: 42 } }, { timeoutContext }) + .then( + () => null, + e => e + ); expect(err).to.eql(serverDescription.error); expect(server.description.type).to.equal('Unknown'); }); diff --git a/test/unit/timeout.test.ts b/test/unit/timeout.test.ts index 3fafc21b35f..119d0516a9c 100644 --- a/test/unit/timeout.test.ts +++ b/test/unit/timeout.test.ts @@ -1,6 +1,14 @@ import { expect } from 'chai'; -import { MongoInvalidArgumentError, Timeout, TimeoutError } from '../mongodb'; +import { + CSOTTimeoutContext, + LegacyTimeoutContext, + MongoInvalidArgumentError, + MongoRuntimeError, + Timeout, + TimeoutContext, + TimeoutError +} from '../mongodb'; describe('Timeout', function () { let timeout: Timeout; @@ -115,3 +123,197 @@ describe('Timeout', function () { }); }); }); + +describe('TimeoutContext', function () { + describe('TimeoutContext.create', function () { + context('when timeoutMS is a number', function () { + it('returns a CSOTTimeoutContext instance', function () { + const ctx = TimeoutContext.create({ + timeoutMS: 0, + serverSelectionTimeoutMS: 0, + waitQueueTimeoutMS: 0 + }); + + expect(ctx).to.be.instanceOf(CSOTTimeoutContext); + }); + }); + + context('when timeoutMS is undefined', function () { + it('returns a LegacyTimeoutContext instance', function () { + const ctx = TimeoutContext.create({ + serverSelectionTimeoutMS: 0, + waitQueueTimeoutMS: 0 + }); + + expect(ctx).to.be.instanceOf(LegacyTimeoutContext); + }); + }); + }); + + describe('CSOTTimeoutContext', function () { + let ctx: CSOTTimeoutContext; + + describe('get serverSelectionTimeout()', function () { + let timeout: Timeout | null; + + afterEach(() => { + timeout?.clear(); + }); + + context('when timeoutMS is 0 and serverSelectionTimeoutMS is 0', function () { + it('returns null', function () { + ctx = new CSOTTimeoutContext({ + timeoutMS: 0, + serverSelectionTimeoutMS: 0 + }); + + expect(ctx.serverSelectionTimeout).to.be.null; + }); + }); + + context('when timeoutMS is 0 and serverSelectionTimeoutMS is >0', function () { + it('returns a Timeout instance with duration set to serverSelectionTimeoutMS', function () { + ctx = new CSOTTimeoutContext({ + timeoutMS: 0, + serverSelectionTimeoutMS: 10 + }); + + timeout = ctx.serverSelectionTimeout; + expect(timeout).to.be.instanceOf(Timeout); + + expect(timeout.duration).to.equal(ctx.serverSelectionTimeoutMS); + }); + }); + + context( + 'when timeoutMS is >0 serverSelectionTimeoutMS is >0 and timeoutMS > serverSelectionTimeoutMS', + function () { + it('returns a Timeout instance with duration set to serverSelectionTimeoutMS', function () { + ctx = new CSOTTimeoutContext({ + timeoutMS: 15, + serverSelectionTimeoutMS: 10 + }); + + timeout = ctx.serverSelectionTimeout; + expect(timeout).to.exist; + expect(timeout).to.be.instanceOf(Timeout); + expect(timeout.duration).to.equal(ctx.serverSelectionTimeoutMS); + }); + } + ); + + context( + 'when timeoutMS is >0, serverSelectionTimeoutMS is >0 and timeoutMS < serverSelectionTimeoutMS', + function () { + it('returns a Timeout instance with duration set to timeoutMS', function () { + ctx = new CSOTTimeoutContext({ + timeoutMS: 10, + serverSelectionTimeoutMS: 15 + }); + + timeout = ctx.serverSelectionTimeout; + expect(timeout).to.exist; + expect(timeout).to.be.instanceOf(Timeout); + expect(timeout.duration).to.equal(ctx.timeoutMS); + }); + } + ); + }); + + describe('get connectionCheckoutTimeout()', function () { + context('when called before get serverSelectionTimeout()', function () { + it('throws a MongoRuntimeError', function () { + ctx = new CSOTTimeoutContext({ + timeoutMS: 100, + serverSelectionTimeoutMS: 15 + }); + + expect(() => ctx.connectionCheckoutTimeout).to.throw(MongoRuntimeError); + }); + }); + + context('when called after get serverSelectionTimeout()', function () { + let serverSelectionTimeout: Timeout; + let connectionCheckoutTimeout: Timeout; + + afterEach(() => { + serverSelectionTimeout.clear(); + connectionCheckoutTimeout.clear(); + }); + + it('returns same timeout as serverSelectionTimeout', function () { + ctx = new CSOTTimeoutContext({ + timeoutMS: 100, + serverSelectionTimeoutMS: 86 + }); + serverSelectionTimeout = ctx.serverSelectionTimeout; + connectionCheckoutTimeout = ctx.connectionCheckoutTimeout; + + expect(connectionCheckoutTimeout).to.exist; + expect(connectionCheckoutTimeout).to.equal(serverSelectionTimeout); + }); + }); + }); + }); + + describe('LegacyTimeoutContext', function () { + let timeout: Timeout | null; + + afterEach(() => { + timeout?.clear(); + }); + + describe('get serverSelectionTimeout()', function () { + context('when serverSelectionTimeoutMS > 0', function () { + it('returns a Timeout instance with duration set to serverSelectionTimeoutMS', function () { + const ctx = new LegacyTimeoutContext({ + serverSelectionTimeoutMS: 100, + waitQueueTimeoutMS: 10 + }); + + timeout = ctx.serverSelectionTimeout; + expect(timeout).to.be.instanceOf(Timeout); + expect(timeout.duration).to.equal(ctx.options.serverSelectionTimeoutMS); + }); + }); + + context('when serverSelectionTimeoutMS = 0', function () { + it('returns null', function () { + const ctx = new LegacyTimeoutContext({ + serverSelectionTimeoutMS: 0, + waitQueueTimeoutMS: 10 + }); + + timeout = ctx.serverSelectionTimeout; + expect(timeout).to.be.null; + }); + }); + }); + + describe('get connectionCheckoutTimeout()', function () { + context('when waitQueueTimeoutMS > 0', function () { + it('returns a Timeout instance with duration set to waitQueueTimeoutMS', function () { + const ctx = new LegacyTimeoutContext({ + serverSelectionTimeoutMS: 10, + waitQueueTimeoutMS: 20 + }); + timeout = ctx.connectionCheckoutTimeout; + + expect(timeout).to.be.instanceOf(Timeout); + expect(timeout.duration).to.equal(ctx.options.waitQueueTimeoutMS); + }); + }); + + context('when waitQueueTimeoutMS = 0', function () { + it('returns null', function () { + const ctx = new LegacyTimeoutContext({ + serverSelectionTimeoutMS: 10, + waitQueueTimeoutMS: 0 + }); + + expect(ctx.connectionCheckoutTimeout).to.be.null; + }); + }); + }); + }); +});