diff --git a/src/bulk/common.ts b/src/bulk/common.ts index 892952c3cf..62ef274dbd 100644 --- a/src/bulk/common.ts +++ b/src/bulk/common.ts @@ -20,6 +20,7 @@ import { makeUpdateStatement, UpdateOperation, type UpdateStatement } from '../o import type { Server } from '../sdam/server'; import type { Topology } from '../sdam/topology'; import type { ClientSession } from '../sessions'; +import { type TimeoutContext } from '../timeout'; import { maybeAddIdToDocuments } from '../utils'; import { applyRetryableWrites, @@ -874,6 +875,9 @@ export interface BulkWriteOptions extends CommandOperationOptions { forceServerObjectId?: boolean; /** Map of parameter names and values that can be accessed using $$var (requires MongoDB 5.0). */ let?: Document; + + /** @internal */ + timeoutContext?: TimeoutContext; } const executeCommandsAsync = promisify(executeCommands); diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index 115da60db2..05e1fce44d 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -29,7 +29,7 @@ import { type CancellationToken, TypedEventEmitter } from '../mongo_types'; import { ReadPreference, type ReadPreferenceLike } from '../read_preference'; import { ServerType } from '../sdam/common'; import { applySession, type ClientSession, updateSessionFromResponse } from '../sessions'; -import { type Timeout } from '../timeout'; +import { type TimeoutContext } from '../timeout'; import { BufferPool, calculateDurationInMs, @@ -95,7 +95,7 @@ export interface CommandOptions extends BSONSerializeOptions { directConnection?: boolean; /** @internal */ - timeout?: Timeout; + timeoutContext?: TimeoutContext; } /** @public */ diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index 83d36adaf6..25591ca1d9 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -27,8 +27,8 @@ import { } from '../error'; import { CancellationToken, TypedEventEmitter } from '../mongo_types'; import type { Server } from '../sdam/server'; -import { Timeout, TimeoutError } from '../timeout'; -import { type Callback, csotMin, List, makeCounter, promiseWithResolvers } from '../utils'; +import { type TimeoutContext, TimeoutError } from '../timeout'; +import { type Callback, List, makeCounter, promiseWithResolvers } from '../utils'; import { connect } from './connect'; import { Connection, type ConnectionEvents, type ConnectionOptions } from './connection'; import { @@ -354,41 +354,15 @@ export class ConnectionPool extends TypedEventEmitter { * will be held by the pool. This means that if a connection is checked out it MUST be checked back in or * explicitly destroyed by the new owner. */ - async checkOut(options?: { timeout?: Timeout }): Promise { + async checkOut(options: { timeoutContext: TimeoutContext }): Promise { this.emitAndLog( ConnectionPool.CONNECTION_CHECK_OUT_STARTED, new ConnectionCheckOutStartedEvent(this) ); - const waitQueueTimeoutMS = this.options.waitQueueTimeoutMS; - const serverSelectionTimeoutMS = this[kServer].topology.s.serverSelectionTimeoutMS; - const { promise, resolve, reject } = promiseWithResolvers(); - let timeout: Timeout | null = null; - if (options?.timeout) { - // CSOT enabled - // Determine if we're using the timeout passed in or a new timeout - if (options.timeout.duration > 0 || serverSelectionTimeoutMS > 0) { - // This check determines whether or not Topology.selectServer used the configured - // `timeoutMS` or `serverSelectionTimeoutMS` value for its timeout - if ( - options.timeout.duration === serverSelectionTimeoutMS || - csotMin(options.timeout.duration, serverSelectionTimeoutMS) < serverSelectionTimeoutMS - ) { - // server selection used `timeoutMS`, so we should use the existing timeout as the timeout - // here - timeout = options.timeout; - } else { - // server selection used `serverSelectionTimeoutMS`, so we construct a new timeout with - // the time remaining to ensure that Topology.selectServer and ConnectionPool.checkOut - // cumulatively don't spend more than `serverSelectionTimeoutMS` blocking - timeout = Timeout.expires(serverSelectionTimeoutMS - options.timeout.timeElapsed); - } - } - } else { - timeout = Timeout.expires(waitQueueTimeoutMS); - } + const timeout = options.timeoutContext.connectionCheckoutTimeout; const waitQueueMember: WaitQueueMember = { resolve, @@ -403,6 +377,7 @@ export class ConnectionPool extends TypedEventEmitter { return await (timeout ? Promise.race([promise, timeout]) : promise); } catch (error) { if (TimeoutError.is(error)) { + timeout?.clear(); waitQueueMember[kCancelled] = true; this.emitAndLog( @@ -415,7 +390,7 @@ export class ConnectionPool extends TypedEventEmitter { : 'Timed out while checking out a connection from connection pool', this.address ); - if (options?.timeout) { + if (options.timeoutContext.csotEnabled()) { throw new MongoOperationTimeoutError('Timed out during connection checkout', { cause: timeoutError }); @@ -424,7 +399,7 @@ export class ConnectionPool extends TypedEventEmitter { } throw error; } finally { - if (timeout !== options?.timeout) timeout?.clear(); + if (options.timeoutContext.clearConnectionCheckoutTimeout) timeout?.clear(); } } diff --git a/src/index.ts b/src/index.ts index 5c27aacd73..33c9905f6a 100644 --- a/src/index.ts +++ b/src/index.ts @@ -526,7 +526,13 @@ export type { RTTSampler, ServerMonitoringMode } from './sdam/monitor'; -export type { Server, ServerEvents, ServerOptions, ServerPrivate } from './sdam/server'; +export type { + Server, + ServerCommandOptions, + ServerEvents, + ServerOptions, + ServerPrivate +} from './sdam/server'; export type { ServerDescription, ServerDescriptionOptions, @@ -557,7 +563,15 @@ export type { WithTransactionCallback } from './sessions'; export type { Sort, SortDirection, SortDirectionForCmd, SortForCmd } from './sort'; -export type { Timeout } from './timeout'; +export type { + CSOTTimeoutContext, + CSOTTimeoutContextOptions, + LegacyTimeoutContext, + LegacyTimeoutContextOptions, + Timeout, + TimeoutContext, + TimeoutContextOptions +} from './timeout'; export type { Transaction, TransactionOptions, TxnState } from './transactions'; export type { BufferPool, diff --git a/src/operations/aggregate.ts b/src/operations/aggregate.ts index a3a9349c27..92a1394c15 100644 --- a/src/operations/aggregate.ts +++ b/src/operations/aggregate.ts @@ -3,6 +3,7 @@ import { MongoInvalidArgumentError } from '../error'; import { type TODO_NODE_3286 } from '../mongo_types'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; +import { type TimeoutContext } from '../timeout'; import { maxWireVersion, type MongoDBNamespace } from '../utils'; import { WriteConcern } from '../write_concern'; import { type CollationOptions, CommandOperation, type CommandOperationOptions } from './command'; @@ -94,7 +95,11 @@ export class AggregateOperation extends CommandOperation { this.pipeline.push(stage); } - override async execute(server: Server, session: ClientSession | undefined): Promise { + override async execute( + server: Server, + session: ClientSession | undefined, + timeoutContext: TimeoutContext + ): Promise { const options: AggregateOptions = this.options; const serverWireVersion = maxWireVersion(server); const command: Document = { aggregate: this.target, pipeline: this.pipeline }; @@ -134,7 +139,12 @@ export class AggregateOperation extends CommandOperation { command.cursor.batchSize = options.batchSize; } - const res: TODO_NODE_3286 = await super.executeCommand(server, session, command); + const res: TODO_NODE_3286 = await super.executeCommand( + server, + session, + command, + timeoutContext + ); return res; } } diff --git a/src/operations/bulk_write.ts b/src/operations/bulk_write.ts index 58a143f208..bb5ee83f1a 100644 --- a/src/operations/bulk_write.ts +++ b/src/operations/bulk_write.ts @@ -7,6 +7,7 @@ import type { import type { Collection } from '../collection'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; +import { type TimeoutContext } from '../timeout'; import { AbstractOperation, Aspect, defineAspects } from './operation'; /** @internal */ @@ -32,11 +33,17 @@ export class BulkWriteOperation extends AbstractOperation { override async execute( server: Server, - session: ClientSession | undefined + session: ClientSession | undefined, + timeoutContext: TimeoutContext ): Promise { const coll = this.collection; const operations = this.operations; - const options = { ...this.options, ...this.bsonOptions, readPreference: this.readPreference }; + const options = { + ...this.options, + ...this.bsonOptions, + readPreference: this.readPreference, + timeoutContext + }; // Create the bulk operation const bulk: BulkOperationBase = diff --git a/src/operations/command.ts b/src/operations/command.ts index 156e002476..545456f601 100644 --- a/src/operations/command.ts +++ b/src/operations/command.ts @@ -6,6 +6,7 @@ import type { ReadPreference } from '../read_preference'; import type { Server } from '../sdam/server'; import { MIN_SECONDARY_WRITE_WIRE_VERSION } from '../sdam/server_selection'; import type { ClientSession } from '../sessions'; +import { type TimeoutContext } from '../timeout'; import { commandSupportsReadConcern, decorateWithExplain, @@ -110,7 +111,8 @@ export abstract class CommandOperation extends AbstractOperation { async executeCommand( server: Server, session: ClientSession | undefined, - cmd: Document + cmd: Document, + timeoutContext: TimeoutContext ): Promise { // TODO: consider making this a non-enumerable property this.server = server; @@ -118,7 +120,7 @@ export abstract class CommandOperation extends AbstractOperation { const options = { ...this.options, ...this.bsonOptions, - timeout: this.timeout, + timeoutContext, readPreference: this.readPreference, session }; diff --git a/src/operations/count.ts b/src/operations/count.ts index 00aae50172..82330a11e7 100644 --- a/src/operations/count.ts +++ b/src/operations/count.ts @@ -2,6 +2,7 @@ import type { Document } from '../bson'; import type { Collection } from '../collection'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; +import { type TimeoutContext } from '../timeout'; import type { MongoDBNamespace } from '../utils'; import { CommandOperation, type CommandOperationOptions } from './command'; import { Aspect, defineAspects } from './operation'; @@ -36,7 +37,11 @@ export class CountOperation extends CommandOperation { return 'count' as const; } - override async execute(server: Server, session: ClientSession | undefined): Promise { + override async execute( + server: Server, + session: ClientSession | undefined, + timeoutContext: TimeoutContext + ): Promise { const options = this.options; const cmd: Document = { count: this.collectionName, @@ -59,7 +64,7 @@ export class CountOperation extends CommandOperation { cmd.maxTimeMS = options.maxTimeMS; } - const result = await super.executeCommand(server, session, cmd); + const result = await super.executeCommand(server, session, cmd, timeoutContext); return result ? result.n : 0; } } diff --git a/src/operations/count_documents.ts b/src/operations/count_documents.ts index 11fb0c3db6..1ce496d001 100644 --- a/src/operations/count_documents.ts +++ b/src/operations/count_documents.ts @@ -2,6 +2,7 @@ import type { Document } from '../bson'; import type { Collection } from '../collection'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; +import { type TimeoutContext } from '../timeout'; import { AggregateOperation, type AggregateOptions } from './aggregate'; /** @public */ @@ -31,8 +32,12 @@ export class CountDocumentsOperation extends AggregateOperation { super(collection.s.namespace, pipeline, options); } - override async execute(server: Server, session: ClientSession | undefined): Promise { - const result = await super.execute(server, session); + override async execute( + server: Server, + session: ClientSession | undefined, + timeoutContext: TimeoutContext + ): Promise { + const result = await super.execute(server, session, timeoutContext); // NOTE: We're avoiding creating a cursor here to reduce the callstack. const response = result as unknown as Document; diff --git a/src/operations/create_collection.ts b/src/operations/create_collection.ts index 8edc7e9a1c..afb2680b9a 100644 --- a/src/operations/create_collection.ts +++ b/src/operations/create_collection.ts @@ -9,6 +9,7 @@ import { MongoCompatibilityError } from '../error'; import type { PkFactory } from '../mongo_client'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; +import { type TimeoutContext } from '../timeout'; import { CommandOperation, type CommandOperationOptions } from './command'; import { CreateIndexesOperation } from './indexes'; import { Aspect, defineAspects } from './operation'; @@ -124,7 +125,11 @@ export class CreateCollectionOperation extends CommandOperation { return 'create' as const; } - override async execute(server: Server, session: ClientSession | undefined): Promise { + override async execute( + server: Server, + session: ClientSession | undefined, + timeoutContext: TimeoutContext + ): Promise { const db = this.db; const name = this.name; const options = this.options; @@ -155,7 +160,7 @@ export class CreateCollectionOperation extends CommandOperation { unique: true } }); - await createOp.executeWithoutEncryptedFieldsCheck(server, session); + await createOp.executeWithoutEncryptedFieldsCheck(server, session, timeoutContext); } if (!options.encryptedFields) { @@ -163,7 +168,7 @@ export class CreateCollectionOperation extends CommandOperation { } } - const coll = await this.executeWithoutEncryptedFieldsCheck(server, session); + const coll = await this.executeWithoutEncryptedFieldsCheck(server, session, timeoutContext); if (encryptedFields) { // Create the required index for queryable encryption support. @@ -173,7 +178,7 @@ export class CreateCollectionOperation extends CommandOperation { { __safeContent__: 1 }, {} ); - await createIndexOp.execute(server, session); + await createIndexOp.execute(server, session, timeoutContext); } return coll; @@ -181,7 +186,8 @@ export class CreateCollectionOperation extends CommandOperation { private async executeWithoutEncryptedFieldsCheck( server: Server, - session: ClientSession | undefined + session: ClientSession | undefined, + timeoutContext: TimeoutContext ): Promise { const db = this.db; const name = this.name; @@ -198,7 +204,7 @@ export class CreateCollectionOperation extends CommandOperation { } } // otherwise just execute the command - await super.executeCommand(server, session, cmd); + await super.executeCommand(server, session, cmd, timeoutContext); return new Collection(db, name, options); } } diff --git a/src/operations/delete.ts b/src/operations/delete.ts index e952554ff7..fa3cf089e7 100644 --- a/src/operations/delete.ts +++ b/src/operations/delete.ts @@ -4,6 +4,7 @@ import { MongoCompatibilityError, MongoServerError } from '../error'; import { type TODO_NODE_3286 } from '../mongo_types'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; +import { type TimeoutContext } from '../timeout'; import type { MongoDBNamespace } from '../utils'; import type { WriteConcernOptions } from '../write_concern'; import { type CollationOptions, CommandOperation, type CommandOperationOptions } from './command'; @@ -67,7 +68,8 @@ export class DeleteOperation extends CommandOperation { override async execute( server: Server, - session: ClientSession | undefined + session: ClientSession | undefined, + timeoutContext: TimeoutContext ): Promise { const options = this.options ?? {}; const ordered = typeof options.ordered === 'boolean' ? options.ordered : true; @@ -95,7 +97,12 @@ export class DeleteOperation extends CommandOperation { } } - const res: TODO_NODE_3286 = await super.executeCommand(server, session, command); + const res: TODO_NODE_3286 = await super.executeCommand( + server, + session, + command, + timeoutContext + ); return res; } } @@ -107,9 +114,10 @@ export class DeleteOneOperation extends DeleteOperation { override async execute( server: Server, - session: ClientSession | undefined + session: ClientSession | undefined, + timeoutContext: TimeoutContext ): Promise { - const res: TODO_NODE_3286 = await super.execute(server, session); + const res: TODO_NODE_3286 = await super.execute(server, session, timeoutContext); if (this.explain) return res; if (res.code) throw new MongoServerError(res); if (res.writeErrors) throw new MongoServerError(res.writeErrors[0]); @@ -127,9 +135,10 @@ export class DeleteManyOperation extends DeleteOperation { override async execute( server: Server, - session: ClientSession | undefined + session: ClientSession | undefined, + timeoutContext: TimeoutContext ): Promise { - const res: TODO_NODE_3286 = await super.execute(server, session); + const res: TODO_NODE_3286 = await super.execute(server, session, timeoutContext); if (this.explain) return res; if (res.code) throw new MongoServerError(res); if (res.writeErrors) throw new MongoServerError(res.writeErrors[0]); diff --git a/src/operations/distinct.ts b/src/operations/distinct.ts index 4fda285d88..51f2a362d8 100644 --- a/src/operations/distinct.ts +++ b/src/operations/distinct.ts @@ -2,6 +2,7 @@ import type { Document } from '../bson'; import type { Collection } from '../collection'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; +import { type TimeoutContext } from '../timeout'; import { decorateWithCollation, decorateWithReadConcern } from '../utils'; import { CommandOperation, type CommandOperationOptions } from './command'; import { Aspect, defineAspects } from './operation'; @@ -42,7 +43,11 @@ export class DistinctOperation extends CommandOperation { return 'distinct' as const; } - override async execute(server: Server, session: ClientSession | undefined): Promise { + override async execute( + server: Server, + session: ClientSession | undefined, + timeoutContext: TimeoutContext + ): Promise { const coll = this.collection; const key = this.key; const query = this.query; @@ -72,7 +77,7 @@ export class DistinctOperation extends CommandOperation { // Have we specified collation decorateWithCollation(cmd, coll, options); - const result = await super.executeCommand(server, session, cmd); + const result = await super.executeCommand(server, session, cmd, timeoutContext); return this.explain ? result : result.values; } diff --git a/src/operations/drop.ts b/src/operations/drop.ts index 15624d4c07..787bb6e7d0 100644 --- a/src/operations/drop.ts +++ b/src/operations/drop.ts @@ -3,6 +3,7 @@ import type { Db } from '../db'; import { MONGODB_ERROR_CODES, MongoServerError } from '../error'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; +import { type TimeoutContext } from '../timeout'; import { CommandOperation, type CommandOperationOptions } from './command'; import { Aspect, defineAspects } from './operation'; @@ -29,7 +30,11 @@ export class DropCollectionOperation extends CommandOperation { return 'drop' as const; } - override async execute(server: Server, session: ClientSession | undefined): Promise { + override async execute( + server: Server, + session: ClientSession | undefined, + timeoutContext: TimeoutContext + ): Promise { const db = this.db; const options = this.options; const name = this.name; @@ -57,7 +62,7 @@ export class DropCollectionOperation extends CommandOperation { // Drop auxilliary collections, ignoring potential NamespaceNotFound errors. const dropOp = new DropCollectionOperation(db, collectionName); try { - await dropOp.executeWithoutEncryptedFieldsCheck(server, session); + await dropOp.executeWithoutEncryptedFieldsCheck(server, session, timeoutContext); } catch (err) { if ( !(err instanceof MongoServerError) || @@ -69,14 +74,15 @@ export class DropCollectionOperation extends CommandOperation { } } - return await this.executeWithoutEncryptedFieldsCheck(server, session); + return await this.executeWithoutEncryptedFieldsCheck(server, session, timeoutContext); } private async executeWithoutEncryptedFieldsCheck( server: Server, - session: ClientSession | undefined + session: ClientSession | undefined, + timeoutContext: TimeoutContext ): Promise { - await super.executeCommand(server, session, { drop: this.name }); + await super.executeCommand(server, session, { drop: this.name }, timeoutContext); return true; } } @@ -96,8 +102,12 @@ export class DropDatabaseOperation extends CommandOperation { return 'dropDatabase' as const; } - override async execute(server: Server, session: ClientSession | undefined): Promise { - await super.executeCommand(server, session, { dropDatabase: 1 }); + override async execute( + server: Server, + session: ClientSession | undefined, + timeoutContext: TimeoutContext + ): Promise { + await super.executeCommand(server, session, { dropDatabase: 1 }, timeoutContext); return true; } } diff --git a/src/operations/estimated_document_count.ts b/src/operations/estimated_document_count.ts index c1d6c38199..5ab5aa4c30 100644 --- a/src/operations/estimated_document_count.ts +++ b/src/operations/estimated_document_count.ts @@ -2,6 +2,7 @@ import type { Document } from '../bson'; import type { Collection } from '../collection'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; +import { type TimeoutContext } from '../timeout'; import { CommandOperation, type CommandOperationOptions } from './command'; import { Aspect, defineAspects } from './operation'; @@ -30,7 +31,11 @@ export class EstimatedDocumentCountOperation extends CommandOperation { return 'count' as const; } - override async execute(server: Server, session: ClientSession | undefined): Promise { + override async execute( + server: Server, + session: ClientSession | undefined, + timeoutContext: TimeoutContext + ): Promise { const cmd: Document = { count: this.collectionName }; if (typeof this.options.maxTimeMS === 'number') { @@ -43,7 +48,7 @@ export class EstimatedDocumentCountOperation extends CommandOperation { cmd.comment = this.options.comment; } - const response = await super.executeCommand(server, session, cmd); + const response = await super.executeCommand(server, session, cmd, timeoutContext); return response?.n || 0; } diff --git a/src/operations/execute_operation.ts b/src/operations/execute_operation.ts index 46a94e56ec..bf2db7b214 100644 --- a/src/operations/execute_operation.ts +++ b/src/operations/execute_operation.ts @@ -27,7 +27,7 @@ import { } from '../sdam/server_selection'; import type { Topology } from '../sdam/topology'; import type { ClientSession } from '../sessions'; -import { Timeout } from '../timeout'; +import { TimeoutContext } from '../timeout'; import { squashError, supportsRetryableWrites } from '../utils'; import { AbstractOperation, Aspect } from './operation'; @@ -73,7 +73,7 @@ export interface ExecutionResult { export async function executeOperation< T extends AbstractOperation, TResult = ResultTypeFromOperation ->(client: MongoClient, operation: T): Promise { +>(client: MongoClient, operation: T, timeoutContext?: TimeoutContext): Promise { if (!(operation instanceof AbstractOperation)) { // TODO(NODE-3483): Extend MongoRuntimeError throw new MongoRuntimeError('This method requires a valid operation instance'); @@ -118,6 +118,12 @@ export async function executeOperation< ); } + timeoutContext ??= TimeoutContext.create({ + serverSelectionTimeoutMS: client.s.options.serverSelectionTimeoutMS, + waitQueueTimeoutMS: client.s.options.waitQueueTimeoutMS, + timeoutMS: operation.options.timeoutMS + }); + const readPreference = operation.readPreference ?? ReadPreference.primary; const inTransaction = !!session?.inTransaction(); @@ -153,24 +159,21 @@ export async function executeOperation< selector = readPreference; } - const timeout = operation.timeoutMS != null ? Timeout.expires(operation.timeoutMS) : undefined; - operation.timeout = timeout; - const server = await topology.selectServer(selector, { session, operationName: operation.commandName, - timeout + timeoutContext }); if (session == null) { // No session also means it is not retryable, early exit - return await operation.execute(server, undefined); + return await operation.execute(server, undefined, timeoutContext); } if (!operation.hasAspect(Aspect.RETRYABLE)) { // non-retryable operation, early exit try { - return await operation.execute(server, session); + return await operation.execute(server, session, timeoutContext); } finally { if (session?.owner != null && session.owner === owner) { try { @@ -198,14 +201,15 @@ export async function executeOperation< } try { - return await operation.execute(server, session); + return await operation.execute(server, session, timeoutContext); } catch (operationError) { if (willRetry && operationError instanceof MongoError) { return await retryOperation(operation, operationError, { session, topology, selector, - previousServer: server.description + previousServer: server.description, + timeoutContext }); } throw operationError; @@ -226,6 +230,7 @@ type RetryOptions = { topology: Topology; selector: ReadPreference | ServerSelector; previousServer: ServerDescription; + timeoutContext: TimeoutContext; }; async function retryOperation< @@ -234,7 +239,7 @@ async function retryOperation< >( operation: T, originalError: MongoError, - { session, topology, selector, previousServer }: RetryOptions + { session, topology, selector, previousServer, timeoutContext }: RetryOptions ): Promise { const isWriteOperation = operation.hasAspect(Aspect.WRITE_OPERATION); const isReadOperation = operation.hasAspect(Aspect.READ_OPERATION); @@ -270,9 +275,9 @@ async function retryOperation< // select a new server, and attempt to retry the operation const server = await topology.selectServer(selector, { session, - timeout: operation.timeout, operationName: operation.commandName, - previousServer + previousServer, + timeoutContext }); if (isWriteOperation && !supportsRetryableWrites(server)) { @@ -282,7 +287,7 @@ async function retryOperation< } try { - return await operation.execute(server, session); + return await operation.execute(server, session, timeoutContext); } catch (retryError) { if ( retryError instanceof MongoError && diff --git a/src/operations/find.ts b/src/operations/find.ts index c5cf0da140..173b27e67d 100644 --- a/src/operations/find.ts +++ b/src/operations/find.ts @@ -4,6 +4,7 @@ import { ReadConcern } from '../read_concern'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; import { formatSort, type Sort } from '../sort'; +import { type TimeoutContext } from '../timeout'; import { decorateWithExplain, type MongoDBNamespace, normalizeHintField } from '../utils'; import { type CollationOptions, CommandOperation, type CommandOperationOptions } from './command'; import { Aspect, defineAspects, type Hint } from './operation'; @@ -95,7 +96,11 @@ export class FindOperation extends CommandOperation { return 'find' as const; } - override async execute(server: Server, session: ClientSession | undefined): Promise { + override async execute( + server: Server, + session: ClientSession | undefined, + timeoutContext: TimeoutContext + ): Promise { this.server = server; const options = this.options; @@ -113,7 +118,7 @@ export class FindOperation extends CommandOperation { ...this.bsonOptions, documentsReturnedIn: 'firstBatch', session, - timeout: this.timeout + timeoutContext }, undefined ); diff --git a/src/operations/find_and_modify.ts b/src/operations/find_and_modify.ts index 68abe51a7e..7d7c38076e 100644 --- a/src/operations/find_and_modify.ts +++ b/src/operations/find_and_modify.ts @@ -5,6 +5,7 @@ import { ReadPreference } from '../read_preference'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; import { formatSort, type Sort, type SortForCmd } from '../sort'; +import { type TimeoutContext } from '../timeout'; import { decorateWithCollation, hasAtomicOperators, maxWireVersion } from '../utils'; import type { WriteConcern, WriteConcernSettings } from '../write_concern'; import { CommandOperation, type CommandOperationOptions } from './command'; @@ -180,7 +181,11 @@ export class FindAndModifyOperation extends CommandOperation { return 'findAndModify' as const; } - override async execute(server: Server, session: ClientSession | undefined): Promise { + override async execute( + server: Server, + session: ClientSession | undefined, + timeoutContext: TimeoutContext + ): Promise { const coll = this.collection; const query = this.query; const options = { ...this.options, ...this.bsonOptions }; @@ -213,7 +218,7 @@ export class FindAndModifyOperation extends CommandOperation { } // Execute the command - const result = await super.executeCommand(server, session, cmd); + const result = await super.executeCommand(server, session, cmd, timeoutContext); return options.includeResultMetadata ? result : result.value ?? null; } } diff --git a/src/operations/get_more.ts b/src/operations/get_more.ts index 05f54b0b57..d95f873ccc 100644 --- a/src/operations/get_more.ts +++ b/src/operations/get_more.ts @@ -3,6 +3,7 @@ import { CursorResponse } from '../cmap/wire_protocol/responses'; import { MongoRuntimeError } from '../error'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; +import { type TimeoutContext } from '../timeout'; import { maxWireVersion, type MongoDBNamespace } from '../utils'; import { AbstractOperation, Aspect, defineAspects, type OperationOptions } from './operation'; @@ -58,7 +59,11 @@ export class GetMoreOperation extends AbstractOperation { * Although there is a server already associated with the get more operation, the signature * for execute passes a server so we will just use that one. */ - override async execute(server: Server, _session: ClientSession | undefined): Promise { + override async execute( + server: Server, + _session: ClientSession | undefined, + timeoutContext: TimeoutContext + ): Promise { if (server !== this.server) { throw new MongoRuntimeError('Getmore must run on the same server operation began on'); } @@ -96,6 +101,7 @@ export class GetMoreOperation extends AbstractOperation { const commandOptions = { returnFieldSelector: null, documentsReturnedIn: 'nextBatch', + timeoutContext, ...this.options }; diff --git a/src/operations/indexes.ts b/src/operations/indexes.ts index 8bf390f28b..88fcce48dc 100644 --- a/src/operations/indexes.ts +++ b/src/operations/indexes.ts @@ -5,6 +5,7 @@ import { MongoCompatibilityError } from '../error'; import { type OneOrMore } from '../mongo_types'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; +import { type TimeoutContext } from '../timeout'; import { isObject, maxWireVersion, type MongoDBNamespace } from '../utils'; import { type CollationOptions, @@ -295,7 +296,11 @@ export class CreateIndexesOperation extends CommandOperation { return 'createIndexes'; } - override async execute(server: Server, session: ClientSession | undefined): Promise { + override async execute( + server: Server, + session: ClientSession | undefined, + timeoutContext: TimeoutContext + ): Promise { const options = this.options; const indexes = this.indexes; @@ -315,7 +320,7 @@ export class CreateIndexesOperation extends CommandOperation { // collation is set on each index, it should not be defined at the root this.options.collation = undefined; - await super.executeCommand(server, session, cmd); + await super.executeCommand(server, session, cmd, timeoutContext); const indexNames = indexes.map(index => index.name || ''); return indexNames; @@ -343,9 +348,13 @@ export class DropIndexOperation extends CommandOperation { return 'dropIndexes' as const; } - override async execute(server: Server, session: ClientSession | undefined): Promise { + override async execute( + server: Server, + session: ClientSession | undefined, + timeoutContext: TimeoutContext + ): Promise { const cmd = { dropIndexes: this.collection.collectionName, index: this.indexName }; - return await super.executeCommand(server, session, cmd); + return await super.executeCommand(server, session, cmd, timeoutContext); } } @@ -376,7 +385,11 @@ export class ListIndexesOperation extends CommandOperation { return 'listIndexes' as const; } - override async execute(server: Server, session: ClientSession | undefined): Promise { + override async execute( + server: Server, + session: ClientSession | undefined, + timeoutContext: TimeoutContext + ): Promise { const serverWireVersion = maxWireVersion(server); const cursor = this.options.batchSize ? { batchSize: this.options.batchSize } : {}; @@ -389,7 +402,7 @@ export class ListIndexesOperation extends CommandOperation { command.comment = this.options.comment; } - return await super.executeCommand(server, session, command); + return await super.executeCommand(server, session, command, timeoutContext); } } diff --git a/src/operations/insert.ts b/src/operations/insert.ts index 6621f6c0e4..45b3478e4a 100644 --- a/src/operations/insert.ts +++ b/src/operations/insert.ts @@ -5,6 +5,7 @@ import { MongoInvalidArgumentError, MongoServerError } from '../error'; import type { InferIdType } from '../mongo_types'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; +import { type TimeoutContext } from '../timeout'; import type { MongoDBNamespace } from '../utils'; import { maybeAddIdToDocuments } from '../utils'; import { WriteConcern } from '../write_concern'; @@ -28,7 +29,11 @@ export class InsertOperation extends CommandOperation { return 'insert' as const; } - override async execute(server: Server, session: ClientSession | undefined): Promise { + override async execute( + server: Server, + session: ClientSession | undefined, + timeoutContext: TimeoutContext + ): Promise { const options = this.options ?? {}; const ordered = typeof options.ordered === 'boolean' ? options.ordered : true; const command: Document = { @@ -47,7 +52,7 @@ export class InsertOperation extends CommandOperation { command.comment = options.comment; } - return await super.executeCommand(server, session, command); + return await super.executeCommand(server, session, command, timeoutContext); } } @@ -74,9 +79,10 @@ export class InsertOneOperation extends InsertOperation { override async execute( server: Server, - session: ClientSession | undefined + session: ClientSession | undefined, + timeoutContext: TimeoutContext ): Promise { - const res = await super.execute(server, session); + const res = await super.execute(server, session, timeoutContext); if (res.code) throw new MongoServerError(res); if (res.writeErrors) { // This should be a WriteError but we can't change it now because of error hierarchy @@ -124,7 +130,8 @@ export class InsertManyOperation extends AbstractOperation { override async execute( server: Server, - session: ClientSession | undefined + session: ClientSession | undefined, + timeoutContext: TimeoutContext ): Promise { const coll = this.collection; const options = { ...this.options, ...this.bsonOptions, readPreference: this.readPreference }; @@ -138,7 +145,7 @@ export class InsertManyOperation extends AbstractOperation { ); try { - const res = await bulkWriteOperation.execute(server, session); + const res = await bulkWriteOperation.execute(server, session, timeoutContext); return { acknowledged: writeConcern?.w !== 0, insertedCount: res.insertedCount, diff --git a/src/operations/kill_cursors.ts b/src/operations/kill_cursors.ts index 356230e9c7..72c6a04b27 100644 --- a/src/operations/kill_cursors.ts +++ b/src/operations/kill_cursors.ts @@ -2,6 +2,7 @@ import type { Long } from '../bson'; import { MongoRuntimeError } from '../error'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; +import { type TimeoutContext } from '../timeout'; import { type MongoDBNamespace, squashError } from '../utils'; import { AbstractOperation, Aspect, defineAspects, type OperationOptions } from './operation'; @@ -29,7 +30,11 @@ export class KillCursorsOperation extends AbstractOperation { return 'killCursors' as const; } - override async execute(server: Server, session: ClientSession | undefined): Promise { + override async execute( + server: Server, + session: ClientSession | undefined, + timeoutContext: TimeoutContext + ): Promise { if (server !== this.server) { throw new MongoRuntimeError('Killcursor must run on the same server operation began on'); } @@ -46,7 +51,10 @@ export class KillCursorsOperation extends AbstractOperation { cursors: [this.cursorId] }; try { - await server.command(this.ns, killCursorsCommand, { session }); + await server.command(this.ns, killCursorsCommand, { + session, + timeoutContext + }); } catch (error) { // The driver should never emit errors from killCursors, this is spec-ed behavior squashError(error); diff --git a/src/operations/list_collections.ts b/src/operations/list_collections.ts index 2d3819cac9..f3fd01b894 100644 --- a/src/operations/list_collections.ts +++ b/src/operations/list_collections.ts @@ -2,6 +2,7 @@ import type { Binary, Document } from '../bson'; import type { Db } from '../db'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; +import { type TimeoutContext } from '../timeout'; import { maxWireVersion } from '../utils'; import { CommandOperation, type CommandOperationOptions } from './command'; import { Aspect, defineAspects } from './operation'; @@ -51,11 +52,16 @@ export class ListCollectionsOperation extends CommandOperation { return 'listCollections' as const; } - override async execute(server: Server, session: ClientSession | undefined): Promise { + override async execute( + server: Server, + session: ClientSession | undefined, + timeoutContext: TimeoutContext + ): Promise { return await super.executeCommand( server, session, - this.generateCommand(maxWireVersion(server)) + this.generateCommand(maxWireVersion(server)), + timeoutContext ); } diff --git a/src/operations/list_databases.ts b/src/operations/list_databases.ts index 5ad9142a1a..bd740d50c6 100644 --- a/src/operations/list_databases.ts +++ b/src/operations/list_databases.ts @@ -3,6 +3,7 @@ import type { Db } from '../db'; import { type TODO_NODE_3286 } from '../mongo_types'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; +import { type TimeoutContext } from '../timeout'; import { maxWireVersion, MongoDBNamespace } from '../utils'; import { CommandOperation, type CommandOperationOptions } from './command'; import { Aspect, defineAspects } from './operation'; @@ -41,7 +42,8 @@ export class ListDatabasesOperation extends CommandOperation { const cmd: Document = { listDatabases: 1 }; @@ -63,7 +65,12 @@ export class ListDatabasesOperation extends CommandOperation); + return await (super.executeCommand( + server, + session, + cmd, + timeoutContext + ) as Promise); } } diff --git a/src/operations/operation.ts b/src/operations/operation.ts index 114980893a..3e7c5fe7a3 100644 --- a/src/operations/operation.ts +++ b/src/operations/operation.ts @@ -2,7 +2,7 @@ import { type BSONSerializeOptions, type Document, resolveBSONOptions } from '.. import { ReadPreference, type ReadPreferenceLike } from '../read_preference'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; -import { type Timeout } from '../timeout'; +import { type Timeout, type TimeoutContext } from '../timeout'; import type { MongoDBNamespace } from '../utils'; export const Aspect = { @@ -82,15 +82,17 @@ export abstract class AbstractOperation { this.options = options; this.bypassPinningCheck = !!options.bypassPinningCheck; this.trySecondaryWrite = false; - - this.timeoutMS = options.timeoutMS; } /** Must match the first key of the command object sent to the server. Command name should be stateless (should not use 'this' keyword) */ abstract get commandName(): string; - abstract execute(server: Server, session: ClientSession | undefined): Promise; + abstract execute( + server: Server, + session: ClientSession | undefined, + timeoutContext: TimeoutContext + ): Promise; hasAspect(aspect: symbol): boolean { const ctor = this.constructor as OperationConstructor; diff --git a/src/operations/profiling_level.ts b/src/operations/profiling_level.ts index 383062c2a4..7c860a244b 100644 --- a/src/operations/profiling_level.ts +++ b/src/operations/profiling_level.ts @@ -2,6 +2,7 @@ import type { Db } from '../db'; import { MongoUnexpectedServerResponseError } from '../error'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; +import { type TimeoutContext } from '../timeout'; import { CommandOperation, type CommandOperationOptions } from './command'; /** @public */ @@ -20,8 +21,12 @@ export class ProfilingLevelOperation extends CommandOperation { return 'profile' as const; } - override async execute(server: Server, session: ClientSession | undefined): Promise { - const doc = await super.executeCommand(server, session, { profile: -1 }); + override async execute( + server: Server, + session: ClientSession | undefined, + timeoutContext: TimeoutContext + ): Promise { + const doc = await super.executeCommand(server, session, { profile: -1 }, timeoutContext); if (doc.ok === 1) { const was = doc.was; if (was === 0) return 'off'; diff --git a/src/operations/remove_user.ts b/src/operations/remove_user.ts index ced8e4e1ca..7f484ba89a 100644 --- a/src/operations/remove_user.ts +++ b/src/operations/remove_user.ts @@ -1,6 +1,7 @@ import type { Db } from '../db'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; +import { type TimeoutContext } from '../timeout'; import { CommandOperation, type CommandOperationOptions } from './command'; import { Aspect, defineAspects } from './operation'; @@ -22,8 +23,12 @@ export class RemoveUserOperation extends CommandOperation { return 'dropUser' as const; } - override async execute(server: Server, session: ClientSession | undefined): Promise { - await super.executeCommand(server, session, { dropUser: this.username }); + override async execute( + server: Server, + session: ClientSession | undefined, + timeoutContext: TimeoutContext + ): Promise { + await super.executeCommand(server, session, { dropUser: this.username }, timeoutContext); return true; } } diff --git a/src/operations/rename.ts b/src/operations/rename.ts index a27d4afe45..883be282b6 100644 --- a/src/operations/rename.ts +++ b/src/operations/rename.ts @@ -2,6 +2,7 @@ import type { Document } from '../bson'; import { Collection } from '../collection'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; +import { type TimeoutContext } from '../timeout'; import { MongoDBNamespace } from '../utils'; import { CommandOperation, type CommandOperationOptions } from './command'; import { Aspect, defineAspects } from './operation'; @@ -29,7 +30,11 @@ export class RenameOperation extends CommandOperation { return 'renameCollection' as const; } - override async execute(server: Server, session: ClientSession | undefined): Promise { + override async execute( + server: Server, + session: ClientSession | undefined, + timeoutContext: TimeoutContext + ): Promise { // Build the command const renameCollection = this.collection.namespace; const toCollection = this.collection.s.namespace.withCollection(this.newName).toString(); @@ -42,7 +47,7 @@ export class RenameOperation extends CommandOperation { dropTarget: dropTarget }; - await super.executeCommand(server, session, command); + await super.executeCommand(server, session, command, timeoutContext); return new Collection(this.collection.s.db, this.newName, this.collection.s.options); } } diff --git a/src/operations/run_command.ts b/src/operations/run_command.ts index 79198d9be7..bd1a9b38c8 100644 --- a/src/operations/run_command.ts +++ b/src/operations/run_command.ts @@ -4,6 +4,7 @@ import { type TODO_NODE_3286 } from '../mongo_types'; import type { ReadPreferenceLike } from '../read_preference'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; +import { type TimeoutContext } from '../timeout'; import { MongoDBNamespace } from '../utils'; import { AbstractOperation } from './operation'; @@ -28,12 +29,16 @@ export class RunCommandOperation extends AbstractOperation { return 'runCommand' as const; } - override async execute(server: Server, session: ClientSession | undefined): Promise { + override async execute( + server: Server, + session: ClientSession | undefined, + timeoutContext: TimeoutContext + ): Promise { this.server = server; const res: TODO_NODE_3286 = await server.command(this.ns, this.command, { ...this.options, readPreference: this.readPreference, - timeout: this.timeout, + timeoutContext, session }); return res; @@ -56,13 +61,17 @@ export class RunAdminCommandOperation extends AbstractOperation return 'runCommand' as const; } - override async execute(server: Server, session: ClientSession | undefined): Promise { + override async execute( + server: Server, + session: ClientSession | undefined, + timeoutContext: TimeoutContext + ): Promise { this.server = server; const res: TODO_NODE_3286 = await server.command(this.ns, this.command, { ...this.options, readPreference: this.readPreference, session, - timeout: this.timeout + timeoutContext }); return res; } diff --git a/src/operations/search_indexes/create.ts b/src/operations/search_indexes/create.ts index 7e5e55d18d..9661026e3e 100644 --- a/src/operations/search_indexes/create.ts +++ b/src/operations/search_indexes/create.ts @@ -3,6 +3,7 @@ import type { Document } from 'bson'; import type { Collection } from '../../collection'; import type { Server } from '../../sdam/server'; import type { ClientSession } from '../../sessions'; +import { type TimeoutContext } from '../../timeout'; import { AbstractOperation } from '../operation'; /** @@ -32,14 +33,21 @@ export class CreateSearchIndexesOperation extends AbstractOperation { return 'createSearchIndexes' as const; } - override async execute(server: Server, session: ClientSession | undefined): Promise { + override async execute( + server: Server, + session: ClientSession | undefined, + timeoutContext: TimeoutContext + ): Promise { const namespace = this.collection.fullNamespace; const command = { createSearchIndexes: namespace.collection, indexes: this.descriptions }; - const res = await server.command(namespace, command, { session }); + const res = await server.command(namespace, command, { + session, + timeoutContext + }); const indexesCreated: Array<{ name: string }> = res?.indexesCreated ?? []; return indexesCreated.map(({ name }) => name); diff --git a/src/operations/search_indexes/drop.ts b/src/operations/search_indexes/drop.ts index 05a56f1c47..fa17e6135d 100644 --- a/src/operations/search_indexes/drop.ts +++ b/src/operations/search_indexes/drop.ts @@ -4,6 +4,7 @@ import type { Collection } from '../../collection'; import { MONGODB_ERROR_CODES, MongoServerError } from '../../error'; import type { Server } from '../../sdam/server'; import type { ClientSession } from '../../sessions'; +import { type TimeoutContext } from '../../timeout'; import { AbstractOperation } from '../operation'; /** @internal */ @@ -16,7 +17,11 @@ export class DropSearchIndexOperation extends AbstractOperation { return 'dropSearchIndex' as const; } - override async execute(server: Server, session: ClientSession | undefined): Promise { + override async execute( + server: Server, + session: ClientSession | undefined, + timeoutContext: TimeoutContext + ): Promise { const namespace = this.collection.fullNamespace; const command: Document = { @@ -28,7 +33,7 @@ export class DropSearchIndexOperation extends AbstractOperation { } try { - await server.command(namespace, command, { session }); + await server.command(namespace, command, { session, timeoutContext }); } catch (error) { const isNamespaceNotFoundError = error instanceof MongoServerError && error.code === MONGODB_ERROR_CODES.NamespaceNotFound; diff --git a/src/operations/search_indexes/update.ts b/src/operations/search_indexes/update.ts index aad7f93536..e88e777d67 100644 --- a/src/operations/search_indexes/update.ts +++ b/src/operations/search_indexes/update.ts @@ -3,6 +3,7 @@ import type { Document } from 'bson'; import type { Collection } from '../../collection'; import type { Server } from '../../sdam/server'; import type { ClientSession } from '../../sessions'; +import { type TimeoutContext } from '../../timeout'; import { AbstractOperation } from '../operation'; /** @internal */ @@ -19,7 +20,11 @@ export class UpdateSearchIndexOperation extends AbstractOperation { return 'updateSearchIndex' as const; } - override async execute(server: Server, session: ClientSession | undefined): Promise { + override async execute( + server: Server, + session: ClientSession | undefined, + timeoutContext: TimeoutContext + ): Promise { const namespace = this.collection.fullNamespace; const command = { updateSearchIndex: namespace.collection, @@ -27,7 +32,7 @@ export class UpdateSearchIndexOperation extends AbstractOperation { definition: this.definition }; - await server.command(namespace, command, { session }); + await server.command(namespace, command, { session, timeoutContext }); return; } } diff --git a/src/operations/set_profiling_level.ts b/src/operations/set_profiling_level.ts index 9969b2ea3c..d76473f263 100644 --- a/src/operations/set_profiling_level.ts +++ b/src/operations/set_profiling_level.ts @@ -2,6 +2,7 @@ import type { Db } from '../db'; import { MongoInvalidArgumentError } from '../error'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; +import { type TimeoutContext } from '../timeout'; import { enumToString } from '../utils'; import { CommandOperation, type CommandOperationOptions } from './command'; @@ -53,7 +54,8 @@ export class SetProfilingLevelOperation extends CommandOperation override async execute( server: Server, - session: ClientSession | undefined + session: ClientSession | undefined, + timeoutContext: TimeoutContext ): Promise { const level = this.level; @@ -64,7 +66,7 @@ export class SetProfilingLevelOperation extends CommandOperation } // TODO(NODE-3483): Determine error to put here - await super.executeCommand(server, session, { profile: this.profile }); + await super.executeCommand(server, session, { profile: this.profile }, timeoutContext); return level; } } diff --git a/src/operations/stats.ts b/src/operations/stats.ts index 41c9faf6e2..aafd3bf1ba 100644 --- a/src/operations/stats.ts +++ b/src/operations/stats.ts @@ -2,6 +2,7 @@ import type { Document } from '../bson'; import type { Db } from '../db'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; +import { type TimeoutContext } from '../timeout'; import { CommandOperation, type CommandOperationOptions } from './command'; import { Aspect, defineAspects } from './operation'; @@ -24,13 +25,17 @@ export class DbStatsOperation extends CommandOperation { return 'dbStats' as const; } - override async execute(server: Server, session: ClientSession | undefined): Promise { + override async execute( + server: Server, + session: ClientSession | undefined, + timeoutContext: TimeoutContext + ): Promise { const command: Document = { dbStats: true }; if (this.options.scale != null) { command.scale = this.options.scale; } - return await super.executeCommand(server, session, command); + return await super.executeCommand(server, session, command, timeoutContext); } } diff --git a/src/operations/update.ts b/src/operations/update.ts index a863afdc68..2cff52ad76 100644 --- a/src/operations/update.ts +++ b/src/operations/update.ts @@ -4,6 +4,7 @@ import { MongoCompatibilityError, MongoInvalidArgumentError, MongoServerError } import type { InferIdType, TODO_NODE_3286 } from '../mongo_types'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; +import { type TimeoutContext } from '../timeout'; import { hasAtomicOperators, type MongoDBNamespace } from '../utils'; import { type CollationOptions, CommandOperation, type CommandOperationOptions } from './command'; import { Aspect, defineAspects, type Hint } from './operation'; @@ -91,7 +92,11 @@ export class UpdateOperation extends CommandOperation { return this.statements.every(op => op.multi == null || op.multi === false); } - override async execute(server: Server, session: ClientSession | undefined): Promise { + override async execute( + server: Server, + session: ClientSession | undefined, + timeoutContext: TimeoutContext + ): Promise { const options = this.options ?? {}; const ordered = typeof options.ordered === 'boolean' ? options.ordered : true; const command: Document = { @@ -122,7 +127,7 @@ export class UpdateOperation extends CommandOperation { } } - return await super.executeCommand(server, session, command); + return await super.executeCommand(server, session, command, timeoutContext); } } @@ -142,9 +147,10 @@ export class UpdateOneOperation extends UpdateOperation { override async execute( server: Server, - session: ClientSession | undefined + session: ClientSession | undefined, + timeoutContext: TimeoutContext ): Promise { - const res: TODO_NODE_3286 = await super.execute(server, session); + const res: TODO_NODE_3286 = await super.execute(server, session, timeoutContext); if (this.explain != null) return res; if (res.code) throw new MongoServerError(res); if (res.writeErrors) throw new MongoServerError(res.writeErrors[0]); @@ -176,9 +182,10 @@ export class UpdateManyOperation extends UpdateOperation { override async execute( server: Server, - session: ClientSession | undefined + session: ClientSession | undefined, + timeoutContext: TimeoutContext ): Promise { - const res: TODO_NODE_3286 = await super.execute(server, session); + const res: TODO_NODE_3286 = await super.execute(server, session, timeoutContext); if (this.explain != null) return res; if (res.code) throw new MongoServerError(res); if (res.writeErrors) throw new MongoServerError(res.writeErrors[0]); @@ -229,9 +236,10 @@ export class ReplaceOneOperation extends UpdateOperation { override async execute( server: Server, - session: ClientSession | undefined + session: ClientSession | undefined, + timeoutContext: TimeoutContext ): Promise { - const res: TODO_NODE_3286 = await super.execute(server, session); + const res: TODO_NODE_3286 = await super.execute(server, session, timeoutContext); if (this.explain != null) return res; if (res.code) throw new MongoServerError(res); if (res.writeErrors) throw new MongoServerError(res.writeErrors[0]); diff --git a/src/operations/validate_collection.ts b/src/operations/validate_collection.ts index 4880a703a7..16ae4cad9e 100644 --- a/src/operations/validate_collection.ts +++ b/src/operations/validate_collection.ts @@ -3,6 +3,7 @@ import type { Document } from '../bson'; import { MongoUnexpectedServerResponseError } from '../error'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; +import { type TimeoutContext } from '../timeout'; import { CommandOperation, type CommandOperationOptions } from './command'; /** @public */ @@ -37,10 +38,14 @@ export class ValidateCollectionOperation extends CommandOperation { return 'validate' as const; } - override async execute(server: Server, session: ClientSession | undefined): Promise { + override async execute( + server: Server, + session: ClientSession | undefined, + timeoutContext: TimeoutContext + ): Promise { const collectionName = this.collectionName; - const doc = await super.executeCommand(server, session, this.command); + const doc = await super.executeCommand(server, session, this.command, timeoutContext); if (doc.result != null && typeof doc.result !== 'string') throw new MongoUnexpectedServerResponseError('Error with validation data'); if (doc.result != null && doc.result.match(/exception|corrupt/) != null) diff --git a/src/sdam/server.ts b/src/sdam/server.ts index f7e40910d1..2dbe92039e 100644 --- a/src/sdam/server.ts +++ b/src/sdam/server.ts @@ -40,6 +40,7 @@ import type { ServerApi } from '../mongo_client'; import { TypedEventEmitter } from '../mongo_types'; import type { GetMoreOptions } from '../operations/get_more'; import type { ClientSession } from '../sessions'; +import { type TimeoutContext } from '../timeout'; import { isTransactionCommand } from '../transactions'; import { type EventEmitterWithState, @@ -103,6 +104,11 @@ export type ServerEvents = { } & ConnectionPoolEvents & EventEmitterWithState; +/** @internal */ +export type ServerCommandOptions = Omit & { + timeoutContext: TimeoutContext; +}; + /** @internal */ export class Server extends TypedEventEmitter { /** @internal */ @@ -266,20 +272,20 @@ export class Server extends TypedEventEmitter { public async command( ns: MongoDBNamespace, command: Document, - options: CommandOptions | undefined, + options: ServerCommandOptions, responseType: T | undefined ): Promise>; public async command( ns: MongoDBNamespace, command: Document, - options?: CommandOptions + options: ServerCommandOptions ): Promise; public async command( ns: MongoDBNamespace, cmd: Document, - options: CommandOptions, + options: ServerCommandOptions, responseType?: MongoDBResponseConstructor ): Promise { if (ns.db == null || typeof ns === 'string') { diff --git a/src/sdam/topology.ts b/src/sdam/topology.ts index 4c9d71d807..6117b5317c 100644 --- a/src/sdam/topology.ts +++ b/src/sdam/topology.ts @@ -34,11 +34,10 @@ import { MongoLoggableComponent, type MongoLogger, SeverityLevel } from '../mong import { TypedEventEmitter } from '../mongo_types'; import { ReadPreference, type ReadPreferenceLike } from '../read_preference'; import type { ClientSession } from '../sessions'; -import { Timeout, TimeoutError } from '../timeout'; +import { Timeout, TimeoutContext, TimeoutError } from '../timeout'; import type { Transaction } from '../transactions'; import { type Callback, - csotMin, type EventEmitterWithState, HostAddress, List, @@ -179,8 +178,11 @@ export interface SelectServerOptions { session?: ClientSession; operationName: string; previousServer?: ServerDescription; - /** @internal*/ - timeout?: Timeout; + /** + * @internal + * TODO(NODE-5685): Make this required + * */ + timeoutContext?: TimeoutContext; } /** @public */ @@ -458,13 +460,20 @@ export class Topology extends TypedEventEmitter { } } - const timeoutMS = this.client.options.timeoutMS; - const timeout = timeoutMS != null ? Timeout.expires(timeoutMS) : undefined; + const timeoutMS = this.client.s.options.timeoutMS; + const serverSelectionTimeoutMS = this.client.s.options.serverSelectionTimeoutMS; const readPreference = options.readPreference ?? ReadPreference.primary; + + const timeoutContext = TimeoutContext.create({ + timeoutMS, + serverSelectionTimeoutMS, + waitQueueTimeoutMS: this.client.s.options.waitQueueTimeoutMS + }); + const selectServerOptions = { operationName: 'ping', - timeout, - ...options + ...options, + timeoutContext }; try { const server = await this.selectServer( @@ -474,7 +483,7 @@ export class Topology extends TypedEventEmitter { const skipPingOnConnect = this.s.options[Symbol.for('@@mdb.skipPingOnConnect')] === true; if (!skipPingOnConnect && server && this.s.credentials) { - await server.command(ns('admin.$cmd'), { ping: 1 }, { timeout }); + await server.command(ns('admin.$cmd'), { ping: 1 }, { timeoutContext }); stateTransition(this, STATE_CONNECTED); this.emit(Topology.OPEN, this); this.emit(Topology.CONNECT, this); @@ -563,24 +572,10 @@ export class Topology extends TypedEventEmitter { new ServerSelectionStartedEvent(selector, this.description, options.operationName) ); } - const serverSelectionTimeoutMS = options.serverSelectionTimeoutMS ?? 0; - let timeout: Timeout | null; - if (options.timeout) { - // CSOT Enabled - if (options.timeout.duration > 0 || serverSelectionTimeoutMS > 0) { - if ( - options.timeout.duration === serverSelectionTimeoutMS || - csotMin(options.timeout.duration, serverSelectionTimeoutMS) < serverSelectionTimeoutMS - ) { - timeout = options.timeout; - } else { - timeout = Timeout.expires(serverSelectionTimeoutMS); - } - } else { - timeout = null; - } - } else { - timeout = Timeout.expires(serverSelectionTimeoutMS); + let timeout; + if (options.timeoutContext) timeout = options.timeoutContext.serverSelectionTimeout; + else { + timeout = Timeout.expires(options.serverSelectionTimeoutMS ?? 0); } const isSharded = this.description.type === TopologyType.Sharded; @@ -604,7 +599,7 @@ export class Topology extends TypedEventEmitter { ) ); } - if (timeout !== options.timeout) timeout?.clear(); + if (options.timeoutContext?.clearServerSelectionTimeout) timeout?.clear(); return transaction.server; } @@ -654,7 +649,7 @@ export class Topology extends TypedEventEmitter { ); } - if (options.timeout) { + if (options.timeoutContext?.csotEnabled()) { throw new MongoOperationTimeoutError('Timed out during server selection', { cause: timeoutError }); @@ -664,7 +659,7 @@ export class Topology extends TypedEventEmitter { // Other server selection error throw error; } finally { - if (timeout !== options.timeout) timeout?.clear(); + if (options.timeoutContext?.clearServerSelectionTimeout) timeout?.clear(); } } /** diff --git a/src/timeout.ts b/src/timeout.ts index 02b87394e7..0fe333983f 100644 --- a/src/timeout.ts +++ b/src/timeout.ts @@ -1,7 +1,7 @@ import { clearTimeout, setTimeout } from 'timers'; -import { MongoInvalidArgumentError } from './error'; -import { noop } from './utils'; +import { MongoInvalidArgumentError, MongoRuntimeError } from './error'; +import { csotMin, noop } from './utils'; /** @internal */ export class TimeoutError extends Error { @@ -108,3 +108,165 @@ export class Timeout extends Promise { ); } } + +/** @internal */ +export type TimeoutContextOptions = LegacyTimeoutContextOptions | CSOTTimeoutContextOptions; + +/** @internal */ +export type LegacyTimeoutContextOptions = { + serverSelectionTimeoutMS: number; + waitQueueTimeoutMS: number; + socketTimeoutMS?: number; +}; + +/** @internal */ +export type CSOTTimeoutContextOptions = { + timeoutMS: number; + serverSelectionTimeoutMS: number; + socketTimeoutMS?: number; +}; + +function isLegacyTimeoutContextOptions(v: unknown): v is LegacyTimeoutContextOptions { + return ( + v != null && + typeof v === 'object' && + 'serverSelectionTimeoutMS' in v && + typeof v.serverSelectionTimeoutMS === 'number' && + 'waitQueueTimeoutMS' in v && + typeof v.waitQueueTimeoutMS === 'number' + ); +} + +function isCSOTTimeoutContextOptions(v: unknown): v is CSOTTimeoutContextOptions { + return ( + v != null && + typeof v === 'object' && + 'serverSelectionTimeoutMS' in v && + typeof v.serverSelectionTimeoutMS === 'number' && + 'timeoutMS' in v && + typeof v.timeoutMS === 'number' + ); +} + +/** @internal */ +export abstract class TimeoutContext { + static create(options: TimeoutContextOptions): TimeoutContext { + if (isCSOTTimeoutContextOptions(options)) return new CSOTTimeoutContext(options); + else if (isLegacyTimeoutContextOptions(options)) return new LegacyTimeoutContext(options); + else throw new MongoRuntimeError('Unrecognized options'); + } + + abstract get serverSelectionTimeout(): Timeout | null; + + abstract get connectionCheckoutTimeout(): Timeout | null; + + abstract get clearServerSelectionTimeout(): boolean; + + abstract get clearConnectionCheckoutTimeout(): boolean; + + abstract csotEnabled(): this is CSOTTimeoutContext; +} + +/** @internal */ +export class CSOTTimeoutContext extends TimeoutContext { + timeoutMS: number; + serverSelectionTimeoutMS: number; + socketTimeoutMS?: number; + + clearConnectionCheckoutTimeout: boolean; + clearServerSelectionTimeout: boolean; + + private _maxTimeMS?: number; + + private _serverSelectionTimeout?: Timeout | null; + private _connectionCheckoutTimeout?: Timeout | null; + + constructor(options: CSOTTimeoutContextOptions) { + super(); + this.timeoutMS = options.timeoutMS; + + this.serverSelectionTimeoutMS = options.serverSelectionTimeoutMS; + + this.socketTimeoutMS = options.socketTimeoutMS; + + this.clearServerSelectionTimeout = false; + this.clearConnectionCheckoutTimeout = true; + } + + get maxTimeMS(): number { + return this._maxTimeMS ?? -1; + } + + set maxTimeMS(v: number) { + this._maxTimeMS = v; + } + + csotEnabled(): this is CSOTTimeoutContext { + return true; + } + + get serverSelectionTimeout(): Timeout | null { + // check for undefined + if (typeof this._serverSelectionTimeout !== 'object') { + const usingServerSelectionTimeoutMS = + this.serverSelectionTimeoutMS !== 0 && + csotMin(this.timeoutMS, this.serverSelectionTimeoutMS) === this.serverSelectionTimeoutMS; + + if (usingServerSelectionTimeoutMS) { + this._serverSelectionTimeout = Timeout.expires(this.serverSelectionTimeoutMS); + } else { + if (this.timeoutMS > 0) { + this._serverSelectionTimeout = Timeout.expires(this.timeoutMS); + } else { + this._serverSelectionTimeout = null; + } + } + } + + return this._serverSelectionTimeout; + } + + get connectionCheckoutTimeout(): Timeout | null { + if (typeof this._connectionCheckoutTimeout !== 'object') { + if (typeof this._serverSelectionTimeout === 'object') { + // null or Timeout + this._connectionCheckoutTimeout = this._serverSelectionTimeout; + } else { + throw new MongoRuntimeError( + 'Unreachable. If you are seeing this error, please file a ticket on the NODE driver project on Jira' + ); + } + } + return this._connectionCheckoutTimeout; + } +} + +/** @internal */ +export class LegacyTimeoutContext extends TimeoutContext { + options: LegacyTimeoutContextOptions; + clearServerSelectionTimeout: boolean; + clearConnectionCheckoutTimeout: boolean; + + constructor(options: LegacyTimeoutContextOptions) { + super(); + this.options = options; + this.clearServerSelectionTimeout = true; + this.clearConnectionCheckoutTimeout = true; + } + + csotEnabled(): this is CSOTTimeoutContext { + return false; + } + + get serverSelectionTimeout(): Timeout | null { + if (this.options.serverSelectionTimeoutMS != null && this.options.serverSelectionTimeoutMS > 0) + return Timeout.expires(this.options.serverSelectionTimeoutMS); + return null; + } + + get connectionCheckoutTimeout(): Timeout | null { + if (this.options.waitQueueTimeoutMS != null && this.options.waitQueueTimeoutMS > 0) + return Timeout.expires(this.options.waitQueueTimeoutMS); + return null; + } +} diff --git a/test/integration/client-side-operations-timeout/client_side_operations_timeout.unit.test.ts b/test/integration/client-side-operations-timeout/client_side_operations_timeout.unit.test.ts index c1426d8db1..c4989f58d7 100644 --- a/test/integration/client-side-operations-timeout/client_side_operations_timeout.unit.test.ts +++ b/test/integration/client-side-operations-timeout/client_side_operations_timeout.unit.test.ts @@ -33,16 +33,20 @@ describe('CSOT spec unit tests', function () { client = this.configuration.newClient({ timeoutMS: 1000 }); // Spy on connection checkout and pull options argument const checkoutSpy = sinon.spy(ConnectionPool.prototype, 'checkOut'); - const selectServerSpy = sinon.spy(Topology.prototype, 'selectServer'); const expiresSpy = sinon.spy(Timeout, 'expires'); await client.db('db').collection('collection').insertOne({ x: 1 }); expect(checkoutSpy).to.have.been.calledOnce; - expect(checkoutSpy.firstCall.args[0].timeout).to.exist; + const timeoutContext = checkoutSpy.lastCall.args[0].timeoutContext; + expect(timeoutContext).to.exist; // Check that we passed through the timeout - expect(checkoutSpy.firstCall.args[0].timeout).to.equal( - selectServerSpy.lastCall.lastArg.timeout + // @ts-expect-error accessing private properties + expect(timeoutContext._serverSelectionTimeout).to.be.instanceOf(Timeout); + // @ts-expect-error accessing private properties + expect(timeoutContext._serverSelectionTimeout).to.equal( + // @ts-expect-error accessing private properties + timeoutContext._connectionCheckoutTimeout ); // Check that no more Timeouts are constructed after we enter checkout diff --git a/test/integration/client-side-operations-timeout/node_csot.test.ts b/test/integration/client-side-operations-timeout/node_csot.test.ts index 5636eb00db..17d85ba5b2 100644 --- a/test/integration/client-side-operations-timeout/node_csot.test.ts +++ b/test/integration/client-side-operations-timeout/node_csot.test.ts @@ -143,7 +143,7 @@ describe('CSOT driver tests', () => { }); it('throws a MongoOperationTimeoutError', { - metadata: { requires: { mongodb: '>=4.4' } }, + metadata: { requires: { mongodb: '>=4.4', topology: '!load-balanced' } }, test: async function () { const commandsStarted = []; client = this.configuration.newClient(undefined, { timeoutMS: 1, monitorCommands: true }); diff --git a/test/tools/cmap_spec_runner.ts b/test/tools/cmap_spec_runner.ts index f6d7e68bed..9bb2abdb87 100644 --- a/test/tools/cmap_spec_runner.ts +++ b/test/tools/cmap_spec_runner.ts @@ -12,7 +12,8 @@ import { makeClientMetadata, type MongoClient, type Server, - shuffle + shuffle, + TimeoutContext } from '../mongodb'; import { isAnyRequirementSatisfied } from './unified-spec-runner/unified-utils'; import { type FailPoint, sleep } from './utils'; @@ -185,7 +186,14 @@ const compareInputToSpec = (input, expected, message) => { const getTestOpDefinitions = (threadContext: ThreadContext) => ({ checkOut: async function (op) { - const connection: Connection = await ConnectionPool.prototype.checkOut.call(threadContext.pool); + const timeoutContext = TimeoutContext.create({ + serverSelectionTimeoutMS: 0, + waitQueueTimeoutMS: threadContext.pool.options.waitQueueTimeoutMS + }); + const connection: Connection = await ConnectionPool.prototype.checkOut.call( + threadContext.pool, + { timeoutContext } + ); if (op.label != null) { threadContext.connections.set(op.label, connection); } else { diff --git a/test/unit/cmap/connection_pool.test.js b/test/unit/cmap/connection_pool.test.js index 18048befab..1604cd82d8 100644 --- a/test/unit/cmap/connection_pool.test.js +++ b/test/unit/cmap/connection_pool.test.js @@ -10,8 +10,10 @@ const { ns, isHello } = require('../../mongodb'); const { createTimerSandbox } = require('../timer_sandbox'); const { topologyWithPlaceholderClient } = require('../../tools/utils'); const { MongoClientAuthProviders } = require('../../mongodb'); +const { TimeoutContext } = require('../../mongodb'); describe('Connection Pool', function () { + let timeoutContext; let mockMongod; const stubServer = { topology: { @@ -44,6 +46,10 @@ describe('Connection Pool', function () { }) ); + beforeEach(() => { + timeoutContext = TimeoutContext.create({ waitQueueTimeoutMS: 0, serverSelectionTimeoutMS: 0 }); + }); + it('should destroy connections which have been closed', async function () { mockMongod.setMessageHandler(request => { const doc = request.document; @@ -64,8 +70,10 @@ describe('Connection Pool', function () { const events = []; pool.on('connectionClosed', event => events.push(event)); - const conn = await pool.checkOut(); - const error = await conn.command(ns('admin.$cmd'), { ping: 1 }, {}).catch(error => error); + const conn = await pool.checkOut({ timeoutContext }); + const error = await conn + .command(ns('admin.$cmd'), { ping: 1 }, { timeoutContext }) + .catch(error => error); expect(error).to.be.instanceOf(Error); pool.checkIn(conn); @@ -93,7 +101,7 @@ describe('Connection Pool', function () { pool.ready(); - const conn = await pool.checkOut(); + const conn = await pool.checkOut({ timeoutContext }); const maybeError = await conn.command(ns('admin.$cmd'), { ping: 1 }, undefined).catch(e => e); expect(maybeError).to.be.instanceOf(MongoError); expect(maybeError).to.match(/timed out/); @@ -114,11 +122,15 @@ describe('Connection Pool', function () { waitQueueTimeoutMS: 200, hostAddress: mockMongod.hostAddress() }); + const timeoutContext = TimeoutContext.create({ + waitQueueTimeoutMS: 200, + serverSelectionTimeoutMS: 0 + }); pool.ready(); - const conn = await pool.checkOut(); - const err = await pool.checkOut().catch(e => e); + const conn = await pool.checkOut({ timeoutContext }); + const err = await pool.checkOut({ timeoutContext }).catch(e => e); expect(err).to.exist.and.be.instanceOf(WaitQueueTimeoutError); sinon.stub(pool, 'availableConnectionCount').get(() => 0); pool.checkIn(conn); diff --git a/test/unit/error.test.ts b/test/unit/error.test.ts index 298e1f2191..d96590f841 100644 --- a/test/unit/error.test.ts +++ b/test/unit/error.test.ts @@ -28,6 +28,7 @@ import { ns, PoolClosedError as MongoPoolClosedError, setDifference, + TimeoutContext, type TopologyDescription, type TopologyOptions, WaitQueueTimeoutError as MongoWaitQueueTimeoutError @@ -381,11 +382,17 @@ describe('MongoErrors', () => { { replicaSet: 'rs' } as TopologyOptions ); + const timeoutContext = TimeoutContext.create({ + serverSelectionTimeoutMS: 0, + waitQueueTimeoutMS: 0 + }); return replSet .connect() - .then(topology => topology.selectServer('primary', {})) + .then(topology => topology.selectServer('primary', { timeoutContext })) .then(server => - server.command(ns('db1'), Object.assign({}, RAW_USER_WRITE_CONCERN_CMD), {}) + server.command(ns('db1'), Object.assign({}, RAW_USER_WRITE_CONCERN_CMD), { + timeoutContext + }) ) .then( () => expect.fail('expected command to fail'), @@ -424,10 +431,14 @@ describe('MongoErrors', () => { if (err) { return cleanup(err); } + const timeoutContext = TimeoutContext.create({ + serverSelectionTimeoutMS: 0, + waitQueueTimeoutMS: 0 + }); - topology.selectServer('primary', {}).then(server => { + topology.selectServer('primary', { timeoutContext }).then(server => { server - .command(ns('db1'), Object.assign({}, RAW_USER_WRITE_CONCERN_CMD), {}) + .command(ns('db1'), Object.assign({}, RAW_USER_WRITE_CONCERN_CMD), { timeoutContext }) .then(expect.fail, err => { let _err; try { diff --git a/test/unit/operations/get_more.test.ts b/test/unit/operations/get_more.test.ts index f79da44e22..17bc20f6fa 100644 --- a/test/unit/operations/get_more.test.ts +++ b/test/unit/operations/get_more.test.ts @@ -69,7 +69,7 @@ describe('GetMoreOperation', function () { const call = stub.getCall(0); expect(call.args[0]).to.equal(namespace); expect(call.args[1]).to.deep.equal(expectedGetMoreCommand); - expect(call.args[2]).to.deep.equal(opts); + expect(call.args[2]).to.containSubset(opts); }); }); diff --git a/test/unit/sdam/topology.test.ts b/test/unit/sdam/topology.test.ts index e4a34417d5..5264b5d9c4 100644 --- a/test/unit/sdam/topology.test.ts +++ b/test/unit/sdam/topology.test.ts @@ -17,6 +17,7 @@ import { Server, SrvPoller, SrvPollingEvent, + TimeoutContext, Topology, TopologyDescription, TopologyDescriptionChangedEvent, @@ -108,17 +109,28 @@ describe('Topology (unit)', function () { const topology = topologyWithPlaceholderClient(mockServer.hostAddress(), {}); topology.connect().then(() => { - topology.selectServer('primary', {}).then(server => { - server.command(ns('admin.$cmd'), { ping: 1 }, { socketTimeoutMS: 250 }).then( - () => expect.fail('expected command to fail'), - err => { - expect(err).to.exist; - expect(err).to.match(/timed out/); - topology.close(); - done(); - } - ); - }, expect.fail); + const ctx = TimeoutContext.create({ + waitQueueTimeoutMS: 0, + serverSelectionTimeoutMS: 0, + socketTimeoutMS: 250 + }); + topology + .selectServer('primary', { + timeoutContext: ctx + }) + .then(server => { + server + .command(ns('admin.$cmd'), { ping: 1 }, { socketTimeoutMS: 250, timeoutContext: ctx }) + .then( + () => expect.fail('expected command to fail'), + err => { + expect(err).to.exist; + expect(err).to.match(/timed out/); + topology.close(); + done(); + } + ); + }, expect.fail); }, expect.fail); }); }); @@ -217,10 +229,16 @@ describe('Topology (unit)', function () { let poolCleared = false; topology.on('connectionPoolCleared', () => (poolCleared = true)); - const err = await server.command(ns('test.test'), { insert: { a: 42 } }, {}).then( - () => null, - e => e - ); + const timeoutContext = TimeoutContext.create({ + serverSelectionTimeoutMS: 0, + waitQueueTimeoutMS: 0 + }); + const err = await server + .command(ns('test.test'), { insert: { a: 42 } }, { timeoutContext }) + .then( + () => null, + e => e + ); expect(err).to.eql(serverDescription.error); expect(poolCleared).to.be.true; }); @@ -245,11 +263,17 @@ describe('Topology (unit)', function () { let poolCleared = false; topology.on('connectionPoolCleared', () => (poolCleared = true)); + const timeoutContext = TimeoutContext.create({ + serverSelectionTimeoutMS: 0, + waitQueueTimeoutMS: 0 + }); - const err = await server.command(ns('test.test'), { insert: { a: 42 } }, {}).then( - () => null, - e => e - ); + const err = await server + .command(ns('test.test'), { insert: { a: 42 } }, { timeoutContext }) + .then( + () => null, + e => e + ); expect(err).to.eql(serverDescription.error); expect(poolCleared).to.be.false; topology.close(); @@ -269,14 +293,20 @@ describe('Topology (unit)', function () { topology = topologyWithPlaceholderClient(mockServer.hostAddress(), {}); await topology.connect(); + const timeoutContext = TimeoutContext.create({ + waitQueueTimeoutMS: 0, + serverSelectionTimeoutMS: 0 + }); const server = await topology.selectServer('primary', {}); let serverDescription; server.on('descriptionReceived', sd => (serverDescription = sd)); - const err = await server.command(ns('test.test'), { insert: { a: 42 } }, {}).then( - () => null, - e => e - ); + const err = await server + .command(ns('test.test'), { insert: { a: 42 } }, { timeoutContext }) + .then( + () => null, + e => e + ); expect(err).to.eql(serverDescription.error); expect(server.description.type).to.equal('Unknown'); }); diff --git a/test/unit/timeout.test.ts b/test/unit/timeout.test.ts index 3fafc21b35..119d0516a9 100644 --- a/test/unit/timeout.test.ts +++ b/test/unit/timeout.test.ts @@ -1,6 +1,14 @@ import { expect } from 'chai'; -import { MongoInvalidArgumentError, Timeout, TimeoutError } from '../mongodb'; +import { + CSOTTimeoutContext, + LegacyTimeoutContext, + MongoInvalidArgumentError, + MongoRuntimeError, + Timeout, + TimeoutContext, + TimeoutError +} from '../mongodb'; describe('Timeout', function () { let timeout: Timeout; @@ -115,3 +123,197 @@ describe('Timeout', function () { }); }); }); + +describe('TimeoutContext', function () { + describe('TimeoutContext.create', function () { + context('when timeoutMS is a number', function () { + it('returns a CSOTTimeoutContext instance', function () { + const ctx = TimeoutContext.create({ + timeoutMS: 0, + serverSelectionTimeoutMS: 0, + waitQueueTimeoutMS: 0 + }); + + expect(ctx).to.be.instanceOf(CSOTTimeoutContext); + }); + }); + + context('when timeoutMS is undefined', function () { + it('returns a LegacyTimeoutContext instance', function () { + const ctx = TimeoutContext.create({ + serverSelectionTimeoutMS: 0, + waitQueueTimeoutMS: 0 + }); + + expect(ctx).to.be.instanceOf(LegacyTimeoutContext); + }); + }); + }); + + describe('CSOTTimeoutContext', function () { + let ctx: CSOTTimeoutContext; + + describe('get serverSelectionTimeout()', function () { + let timeout: Timeout | null; + + afterEach(() => { + timeout?.clear(); + }); + + context('when timeoutMS is 0 and serverSelectionTimeoutMS is 0', function () { + it('returns null', function () { + ctx = new CSOTTimeoutContext({ + timeoutMS: 0, + serverSelectionTimeoutMS: 0 + }); + + expect(ctx.serverSelectionTimeout).to.be.null; + }); + }); + + context('when timeoutMS is 0 and serverSelectionTimeoutMS is >0', function () { + it('returns a Timeout instance with duration set to serverSelectionTimeoutMS', function () { + ctx = new CSOTTimeoutContext({ + timeoutMS: 0, + serverSelectionTimeoutMS: 10 + }); + + timeout = ctx.serverSelectionTimeout; + expect(timeout).to.be.instanceOf(Timeout); + + expect(timeout.duration).to.equal(ctx.serverSelectionTimeoutMS); + }); + }); + + context( + 'when timeoutMS is >0 serverSelectionTimeoutMS is >0 and timeoutMS > serverSelectionTimeoutMS', + function () { + it('returns a Timeout instance with duration set to serverSelectionTimeoutMS', function () { + ctx = new CSOTTimeoutContext({ + timeoutMS: 15, + serverSelectionTimeoutMS: 10 + }); + + timeout = ctx.serverSelectionTimeout; + expect(timeout).to.exist; + expect(timeout).to.be.instanceOf(Timeout); + expect(timeout.duration).to.equal(ctx.serverSelectionTimeoutMS); + }); + } + ); + + context( + 'when timeoutMS is >0, serverSelectionTimeoutMS is >0 and timeoutMS < serverSelectionTimeoutMS', + function () { + it('returns a Timeout instance with duration set to timeoutMS', function () { + ctx = new CSOTTimeoutContext({ + timeoutMS: 10, + serverSelectionTimeoutMS: 15 + }); + + timeout = ctx.serverSelectionTimeout; + expect(timeout).to.exist; + expect(timeout).to.be.instanceOf(Timeout); + expect(timeout.duration).to.equal(ctx.timeoutMS); + }); + } + ); + }); + + describe('get connectionCheckoutTimeout()', function () { + context('when called before get serverSelectionTimeout()', function () { + it('throws a MongoRuntimeError', function () { + ctx = new CSOTTimeoutContext({ + timeoutMS: 100, + serverSelectionTimeoutMS: 15 + }); + + expect(() => ctx.connectionCheckoutTimeout).to.throw(MongoRuntimeError); + }); + }); + + context('when called after get serverSelectionTimeout()', function () { + let serverSelectionTimeout: Timeout; + let connectionCheckoutTimeout: Timeout; + + afterEach(() => { + serverSelectionTimeout.clear(); + connectionCheckoutTimeout.clear(); + }); + + it('returns same timeout as serverSelectionTimeout', function () { + ctx = new CSOTTimeoutContext({ + timeoutMS: 100, + serverSelectionTimeoutMS: 86 + }); + serverSelectionTimeout = ctx.serverSelectionTimeout; + connectionCheckoutTimeout = ctx.connectionCheckoutTimeout; + + expect(connectionCheckoutTimeout).to.exist; + expect(connectionCheckoutTimeout).to.equal(serverSelectionTimeout); + }); + }); + }); + }); + + describe('LegacyTimeoutContext', function () { + let timeout: Timeout | null; + + afterEach(() => { + timeout?.clear(); + }); + + describe('get serverSelectionTimeout()', function () { + context('when serverSelectionTimeoutMS > 0', function () { + it('returns a Timeout instance with duration set to serverSelectionTimeoutMS', function () { + const ctx = new LegacyTimeoutContext({ + serverSelectionTimeoutMS: 100, + waitQueueTimeoutMS: 10 + }); + + timeout = ctx.serverSelectionTimeout; + expect(timeout).to.be.instanceOf(Timeout); + expect(timeout.duration).to.equal(ctx.options.serverSelectionTimeoutMS); + }); + }); + + context('when serverSelectionTimeoutMS = 0', function () { + it('returns null', function () { + const ctx = new LegacyTimeoutContext({ + serverSelectionTimeoutMS: 0, + waitQueueTimeoutMS: 10 + }); + + timeout = ctx.serverSelectionTimeout; + expect(timeout).to.be.null; + }); + }); + }); + + describe('get connectionCheckoutTimeout()', function () { + context('when waitQueueTimeoutMS > 0', function () { + it('returns a Timeout instance with duration set to waitQueueTimeoutMS', function () { + const ctx = new LegacyTimeoutContext({ + serverSelectionTimeoutMS: 10, + waitQueueTimeoutMS: 20 + }); + timeout = ctx.connectionCheckoutTimeout; + + expect(timeout).to.be.instanceOf(Timeout); + expect(timeout.duration).to.equal(ctx.options.waitQueueTimeoutMS); + }); + }); + + context('when waitQueueTimeoutMS = 0', function () { + it('returns null', function () { + const ctx = new LegacyTimeoutContext({ + serverSelectionTimeoutMS: 10, + waitQueueTimeoutMS: 0 + }); + + expect(ctx.connectionCheckoutTimeout).to.be.null; + }); + }); + }); + }); +});