diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index 159dd1946a..9d8ff6921d 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -29,7 +29,7 @@ import { type CancellationToken, TypedEventEmitter } from '../mongo_types'; import { ReadPreference, type ReadPreferenceLike } from '../read_preference'; import { ServerType } from '../sdam/common'; import { applySession, type ClientSession, updateSessionFromResponse } from '../sessions'; -import { type Timeout } from '../timeout'; +import { type TimeoutContext } from '../timeout'; import { BufferPool, calculateDurationInMs, @@ -96,7 +96,7 @@ export interface CommandOptions extends BSONSerializeOptions { directConnection?: boolean; /** @internal */ - timeout?: Timeout; + timeoutContext?: TimeoutContext; } /** @public */ diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index 482bf571a5..25591ca1d9 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -27,8 +27,8 @@ import { } from '../error'; import { CancellationToken, TypedEventEmitter } from '../mongo_types'; import type { Server } from '../sdam/server'; -import { Timeout, TimeoutError } from '../timeout'; -import { type Callback, csotMin, List, makeCounter, promiseWithResolvers } from '../utils'; +import { type TimeoutContext, TimeoutError } from '../timeout'; +import { type Callback, List, makeCounter, promiseWithResolvers } from '../utils'; import { connect } from './connect'; import { Connection, type ConnectionEvents, type ConnectionOptions } from './connection'; import { @@ -354,41 +354,15 @@ export class ConnectionPool extends TypedEventEmitter { * will be held by the pool. This means that if a connection is checked out it MUST be checked back in or * explicitly destroyed by the new owner. */ - async checkOut(options?: { timeout?: Timeout }): Promise { + async checkOut(options: { timeoutContext: TimeoutContext }): Promise { this.emitAndLog( ConnectionPool.CONNECTION_CHECK_OUT_STARTED, new ConnectionCheckOutStartedEvent(this) ); - const waitQueueTimeoutMS = this.options.waitQueueTimeoutMS; - const serverSelectionTimeoutMS = this[kServer].topology.s.serverSelectionTimeoutMS; - const { promise, resolve, reject } = promiseWithResolvers(); - let timeout: Timeout | null = null; - if (options?.timeout) { - // CSOT enabled - // Determine if we're using the timeout passed in or a new timeout - if (options.timeout.duration > 0 || serverSelectionTimeoutMS > 0) { - // This check determines whether or not Topology.selectServer used the configured - // `timeoutMS` or `serverSelectionTimeoutMS` value for its timeout - if ( - options.timeout.duration === serverSelectionTimeoutMS || - csotMin(options.timeout.duration, serverSelectionTimeoutMS) < serverSelectionTimeoutMS - ) { - // server selection used `timeoutMS`, so we should use the existing timeout as the timeout - // here - timeout = options.timeout; - } else { - // server selection used `serverSelectionTimeoutMS`, so we construct a new timeout with - // the time remaining to ensure that Topology.selectServer and ConnectionPool.checkOut - // cumulatively don't spend more than `serverSelectionTimeoutMS` blocking - timeout = Timeout.expires(serverSelectionTimeoutMS - options.timeout.timeElapsed); - } - } - } else { - timeout = Timeout.expires(waitQueueTimeoutMS); - } + const timeout = options.timeoutContext.connectionCheckoutTimeout; const waitQueueMember: WaitQueueMember = { resolve, @@ -416,7 +390,7 @@ export class ConnectionPool extends TypedEventEmitter { : 'Timed out while checking out a connection from connection pool', this.address ); - if (options?.timeout) { + if (options.timeoutContext.csotEnabled()) { throw new MongoOperationTimeoutError('Timed out during connection checkout', { cause: timeoutError }); @@ -425,7 +399,7 @@ export class ConnectionPool extends TypedEventEmitter { } throw error; } finally { - if (timeout !== options?.timeout) timeout?.clear(); + if (options.timeoutContext.clearConnectionCheckoutTimeout) timeout?.clear(); } } diff --git a/src/operations/command.ts b/src/operations/command.ts index 27d1c04ff8..5bd80f796d 100644 --- a/src/operations/command.ts +++ b/src/operations/command.ts @@ -136,7 +136,7 @@ export abstract class CommandOperation extends AbstractOperation { const options = { ...this.options, ...this.bsonOptions, - timeout: this.timeout, + timeoutContext, readPreference: this.readPreference, session }; diff --git a/src/operations/execute_operation.ts b/src/operations/execute_operation.ts index 44c086b890..aae73b2d28 100644 --- a/src/operations/execute_operation.ts +++ b/src/operations/execute_operation.ts @@ -24,7 +24,7 @@ import { } from '../sdam/server_selection'; import type { Topology } from '../sdam/topology'; import type { ClientSession } from '../sessions'; -import { Timeout } from '../timeout'; +import { TimeoutContext } from '../timeout'; import { squashError, supportsRetryableWrites } from '../utils'; import { AbstractOperation, Aspect } from './operation'; @@ -88,6 +88,12 @@ export async function executeOperation< ); } + timeoutContext ??= TimeoutContext.create({ + serverSelectionTimeoutMS: client.s.options.serverSelectionTimeoutMS, + waitQueueTimeoutMS: client.s.options.waitQueueTimeoutMS, + timeoutMS: operation.options.timeoutMS + }); + const readPreference = operation.readPreference ?? ReadPreference.primary; const inTransaction = !!session?.inTransaction(); @@ -200,18 +206,31 @@ async function tryOperation< selector = readPreference; } - const timeout = operation.timeoutMS != null ? Timeout.expires(operation.timeoutMS) : undefined; - operation.timeout = timeout; - const server = await topology.selectServer(selector, { session, operationName: operation.commandName, - timeout + timeoutContext }); - const hasReadAspect = operation.hasAspect(Aspect.READ_OPERATION); - const hasWriteAspect = operation.hasAspect(Aspect.WRITE_OPERATION); - const inTransaction = session?.inTransaction() ?? false; + if (session == null) { + // No session also means it is not retryable, early exit + return await operation.execute(server, undefined, timeoutContext); + } + + if (!operation.hasAspect(Aspect.RETRYABLE)) { + // non-retryable operation, early exit + try { + return await operation.execute(server, session, timeoutContext); + } finally { + if (session?.owner != null && session.owner === owner) { + try { + await session.endSession(); + } catch (error) { + squashError(error); + } + } + } + } const willRetryRead = topology.s.options.retryReads && !inTransaction && operation.canRetryRead; @@ -231,42 +250,16 @@ async function tryOperation< session.incrementTransactionNumber(); } - // TODO(NODE-6231): implement infinite retry within CSOT timeout here - const maxTries = willRetry ? 2 : 1; - let previousOperationError: MongoError | undefined; - let previousServer: ServerDescription | undefined; - - // TODO(NODE-6231): implement infinite retry within CSOT timeout here - for (let tries = 0; tries < maxTries; tries++) { - if (previousOperationError) { - if (hasWriteAspect && previousOperationError.code === MMAPv1_RETRY_WRITES_ERROR_CODE) { - throw new MongoServerError({ - message: MMAPv1_RETRY_WRITES_ERROR_MESSAGE, - errmsg: MMAPv1_RETRY_WRITES_ERROR_MESSAGE, - originalError: previousOperationError - }); - } - - if (hasWriteAspect && !isRetryableWriteError(previousOperationError)) - throw previousOperationError; - - if (hasReadAspect && !isRetryableReadError(previousOperationError)) - throw previousOperationError; - - if ( - previousOperationError instanceof MongoNetworkError && - operation.hasAspect(Aspect.CURSOR_CREATING) && - session != null && - session.isPinned && - !session.inTransaction() - ) { - session.unpin({ force: true, forceClear: true }); - } - - server = await topology.selectServer(selector, { + try { + return await operation.execute(server, session, timeoutContext); + } catch (operationError) { + if (willRetry && operationError instanceof MongoError) { + return await retryOperation(operation, operationError, { session, - operationName: operation.commandName, - previousServer + topology, + selector, + previousServer: server.description, + timeoutContext }); if (hasWriteAspect && !supportsRetryableWrites(server)) { @@ -276,10 +269,14 @@ async function tryOperation< } } - try { - return await operation.execute(server, session, timeoutContext); - } catch (operationError) { - if (!(operationError instanceof MongoError)) throw operationError; +/** @internal */ +type RetryOptions = { + session: ClientSession; + topology: Topology; + selector: ReadPreference | ServerSelector; + previousServer: ServerDescription; + timeoutContext: TimeoutContext; +}; async function retryOperation< T extends AbstractOperation, @@ -287,7 +284,7 @@ async function retryOperation< >( operation: T, originalError: MongoError, - { session, topology, selector, previousServer }: RetryOptions + { session, topology, selector, previousServer, timeoutContext }: RetryOptions ): Promise { const isWriteOperation = operation.hasAspect(Aspect.WRITE_OPERATION); const isReadOperation = operation.hasAspect(Aspect.READ_OPERATION); @@ -323,9 +320,9 @@ async function retryOperation< // select a new server, and attempt to retry the operation const server = await topology.selectServer(selector, { session, - timeout: operation.timeout, operationName: operation.commandName, - previousServer + previousServer, + timeoutContext }); if (isWriteOperation && !supportsRetryableWrites(server)) { @@ -335,7 +332,7 @@ async function retryOperation< } try { - return await operation.execute(server, session); + return await operation.execute(server, session, timeoutContext); } catch (retryError) { if ( retryError instanceof MongoError && diff --git a/src/operations/find.ts b/src/operations/find.ts index 1e90d199b1..5f359324d5 100644 --- a/src/operations/find.ts +++ b/src/operations/find.ts @@ -119,7 +119,7 @@ export class FindOperation extends CommandOperation { ...this.bsonOptions, documentsReturnedIn: 'firstBatch', session, - timeout: this.timeout + timeoutContext }, this.explain ? ExplainedCursorResponse : CursorResponse ); diff --git a/src/operations/operation.ts b/src/operations/operation.ts index 081f9fac39..e56b2173d6 100644 --- a/src/operations/operation.ts +++ b/src/operations/operation.ts @@ -2,7 +2,7 @@ import { type BSONSerializeOptions, type Document, resolveBSONOptions } from '.. import { ReadPreference, type ReadPreferenceLike } from '../read_preference'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; -import { type Timeout } from '../timeout'; +import { type Timeout, type TimeoutContext } from '../timeout'; import type { MongoDBNamespace } from '../utils'; export const Aspect = { @@ -82,8 +82,6 @@ export abstract class AbstractOperation { this.options = options; this.bypassPinningCheck = !!options.bypassPinningCheck; this.trySecondaryWrite = false; - - this.timeoutMS = options.timeoutMS; } /** Must match the first key of the command object sent to the server. diff --git a/src/operations/run_command.ts b/src/operations/run_command.ts index 1cc4d684ee..b91e2d0344 100644 --- a/src/operations/run_command.ts +++ b/src/operations/run_command.ts @@ -47,7 +47,7 @@ export class RunCommandOperation extends AbstractOperation { ...this.options, readPreference: this.readPreference, session, - timeout: this.timeout + timeoutContext }, this.options.responseType ); @@ -82,7 +82,7 @@ export class RunAdminCommandOperation extends AbstractOperation ...this.options, readPreference: this.readPreference, session, - timeout: this.timeout + timeoutContext }); return res; } diff --git a/src/sdam/topology.ts b/src/sdam/topology.ts index 942aa5528c..6117b5317c 100644 --- a/src/sdam/topology.ts +++ b/src/sdam/topology.ts @@ -38,7 +38,6 @@ import { Timeout, TimeoutContext, TimeoutError } from '../timeout'; import type { Transaction } from '../transactions'; import { type Callback, - csotMin, type EventEmitterWithState, HostAddress, List, @@ -461,13 +460,20 @@ export class Topology extends TypedEventEmitter { } } - const timeoutMS = this.client.options.timeoutMS; - const timeout = timeoutMS != null ? Timeout.expires(timeoutMS) : undefined; + const timeoutMS = this.client.s.options.timeoutMS; + const serverSelectionTimeoutMS = this.client.s.options.serverSelectionTimeoutMS; const readPreference = options.readPreference ?? ReadPreference.primary; + + const timeoutContext = TimeoutContext.create({ + timeoutMS, + serverSelectionTimeoutMS, + waitQueueTimeoutMS: this.client.s.options.waitQueueTimeoutMS + }); + const selectServerOptions = { operationName: 'ping', - timeout, - ...options + ...options, + timeoutContext }; try { const server = await this.selectServer( @@ -477,7 +483,7 @@ export class Topology extends TypedEventEmitter { const skipPingOnConnect = this.s.options[Symbol.for('@@mdb.skipPingOnConnect')] === true; if (!skipPingOnConnect && server && this.s.credentials) { - await server.command(ns('admin.$cmd'), { ping: 1 }, { timeout }); + await server.command(ns('admin.$cmd'), { ping: 1 }, { timeoutContext }); stateTransition(this, STATE_CONNECTED); this.emit(Topology.OPEN, this); this.emit(Topology.CONNECT, this); @@ -566,24 +572,10 @@ export class Topology extends TypedEventEmitter { new ServerSelectionStartedEvent(selector, this.description, options.operationName) ); } - const serverSelectionTimeoutMS = options.serverSelectionTimeoutMS ?? 0; - let timeout: Timeout | null; - if (options.timeout) { - // CSOT Enabled - if (options.timeout.duration > 0 || serverSelectionTimeoutMS > 0) { - if ( - options.timeout.duration === serverSelectionTimeoutMS || - csotMin(options.timeout.duration, serverSelectionTimeoutMS) < serverSelectionTimeoutMS - ) { - timeout = options.timeout; - } else { - timeout = Timeout.expires(serverSelectionTimeoutMS); - } - } else { - timeout = null; - } - } else { - timeout = Timeout.expires(serverSelectionTimeoutMS); + let timeout; + if (options.timeoutContext) timeout = options.timeoutContext.serverSelectionTimeout; + else { + timeout = Timeout.expires(options.serverSelectionTimeoutMS ?? 0); } const isSharded = this.description.type === TopologyType.Sharded; @@ -607,7 +599,7 @@ export class Topology extends TypedEventEmitter { ) ); } - if (timeout !== options.timeout) timeout?.clear(); + if (options.timeoutContext?.clearServerSelectionTimeout) timeout?.clear(); return transaction.server; } @@ -657,7 +649,7 @@ export class Topology extends TypedEventEmitter { ); } - if (options.timeout) { + if (options.timeoutContext?.csotEnabled()) { throw new MongoOperationTimeoutError('Timed out during server selection', { cause: timeoutError }); @@ -667,7 +659,7 @@ export class Topology extends TypedEventEmitter { // Other server selection error throw error; } finally { - if (timeout !== options.timeout) timeout?.clear(); + if (options.timeoutContext?.clearServerSelectionTimeout) timeout?.clear(); } } /** diff --git a/test/integration/client-side-operations-timeout/client_side_operations_timeout.unit.test.ts b/test/integration/client-side-operations-timeout/client_side_operations_timeout.unit.test.ts index c1426d8db1..c4989f58d7 100644 --- a/test/integration/client-side-operations-timeout/client_side_operations_timeout.unit.test.ts +++ b/test/integration/client-side-operations-timeout/client_side_operations_timeout.unit.test.ts @@ -33,16 +33,20 @@ describe('CSOT spec unit tests', function () { client = this.configuration.newClient({ timeoutMS: 1000 }); // Spy on connection checkout and pull options argument const checkoutSpy = sinon.spy(ConnectionPool.prototype, 'checkOut'); - const selectServerSpy = sinon.spy(Topology.prototype, 'selectServer'); const expiresSpy = sinon.spy(Timeout, 'expires'); await client.db('db').collection('collection').insertOne({ x: 1 }); expect(checkoutSpy).to.have.been.calledOnce; - expect(checkoutSpy.firstCall.args[0].timeout).to.exist; + const timeoutContext = checkoutSpy.lastCall.args[0].timeoutContext; + expect(timeoutContext).to.exist; // Check that we passed through the timeout - expect(checkoutSpy.firstCall.args[0].timeout).to.equal( - selectServerSpy.lastCall.lastArg.timeout + // @ts-expect-error accessing private properties + expect(timeoutContext._serverSelectionTimeout).to.be.instanceOf(Timeout); + // @ts-expect-error accessing private properties + expect(timeoutContext._serverSelectionTimeout).to.equal( + // @ts-expect-error accessing private properties + timeoutContext._connectionCheckoutTimeout ); // Check that no more Timeouts are constructed after we enter checkout diff --git a/test/integration/client-side-operations-timeout/node_csot.test.ts b/test/integration/client-side-operations-timeout/node_csot.test.ts index 5636eb00db..17d85ba5b2 100644 --- a/test/integration/client-side-operations-timeout/node_csot.test.ts +++ b/test/integration/client-side-operations-timeout/node_csot.test.ts @@ -143,7 +143,7 @@ describe('CSOT driver tests', () => { }); it('throws a MongoOperationTimeoutError', { - metadata: { requires: { mongodb: '>=4.4' } }, + metadata: { requires: { mongodb: '>=4.4', topology: '!load-balanced' } }, test: async function () { const commandsStarted = []; client = this.configuration.newClient(undefined, { timeoutMS: 1, monitorCommands: true }); diff --git a/test/unit/cmap/connection_pool.test.js b/test/unit/cmap/connection_pool.test.js index 70cee74c2b..1604cd82d8 100644 --- a/test/unit/cmap/connection_pool.test.js +++ b/test/unit/cmap/connection_pool.test.js @@ -129,8 +129,8 @@ describe('Connection Pool', function () { pool.ready(); - const conn = await pool.checkOut(); - const err = await pool.checkOut().catch(e => e); + const conn = await pool.checkOut({ timeoutContext }); + const err = await pool.checkOut({ timeoutContext }).catch(e => e); expect(err).to.exist.and.be.instanceOf(WaitQueueTimeoutError); sinon.stub(pool, 'availableConnectionCount').get(() => 0); pool.checkIn(conn);