Skip to content

Commit

Permalink
refactor(NODE-6187): refactor to use TimeoutContext abstraction (#4131)
Browse files Browse the repository at this point in the history
  • Loading branch information
W-A-James authored and nbbeeken committed Jul 11, 2024
1 parent 4d2a0c8 commit 6f7dc0c
Show file tree
Hide file tree
Showing 11 changed files with 91 additions and 126 deletions.
4 changes: 2 additions & 2 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import { type CancellationToken, TypedEventEmitter } from '../mongo_types';
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
import { ServerType } from '../sdam/common';
import { applySession, type ClientSession, updateSessionFromResponse } from '../sessions';
import { type Timeout } from '../timeout';
import { type TimeoutContext } from '../timeout';
import {
BufferPool,
calculateDurationInMs,
Expand Down Expand Up @@ -96,7 +96,7 @@ export interface CommandOptions extends BSONSerializeOptions {
directConnection?: boolean;

/** @internal */
timeout?: Timeout;
timeoutContext?: TimeoutContext;
}

/** @public */
Expand Down
38 changes: 6 additions & 32 deletions src/cmap/connection_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import {
} from '../error';
import { CancellationToken, TypedEventEmitter } from '../mongo_types';
import type { Server } from '../sdam/server';
import { Timeout, TimeoutError } from '../timeout';
import { type Callback, csotMin, List, makeCounter, promiseWithResolvers } from '../utils';
import { type TimeoutContext, TimeoutError } from '../timeout';
import { type Callback, List, makeCounter, promiseWithResolvers } from '../utils';
import { connect } from './connect';
import { Connection, type ConnectionEvents, type ConnectionOptions } from './connection';
import {
Expand Down Expand Up @@ -354,41 +354,15 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
* will be held by the pool. This means that if a connection is checked out it MUST be checked back in or
* explicitly destroyed by the new owner.
*/
async checkOut(options?: { timeout?: Timeout }): Promise<Connection> {
async checkOut(options: { timeoutContext: TimeoutContext }): Promise<Connection> {
this.emitAndLog(
ConnectionPool.CONNECTION_CHECK_OUT_STARTED,
new ConnectionCheckOutStartedEvent(this)
);

const waitQueueTimeoutMS = this.options.waitQueueTimeoutMS;
const serverSelectionTimeoutMS = this[kServer].topology.s.serverSelectionTimeoutMS;

const { promise, resolve, reject } = promiseWithResolvers<Connection>();

let timeout: Timeout | null = null;
if (options?.timeout) {
// CSOT enabled
// Determine if we're using the timeout passed in or a new timeout
if (options.timeout.duration > 0 || serverSelectionTimeoutMS > 0) {
// This check determines whether or not Topology.selectServer used the configured
// `timeoutMS` or `serverSelectionTimeoutMS` value for its timeout
if (
options.timeout.duration === serverSelectionTimeoutMS ||
csotMin(options.timeout.duration, serverSelectionTimeoutMS) < serverSelectionTimeoutMS
) {
// server selection used `timeoutMS`, so we should use the existing timeout as the timeout
// here
timeout = options.timeout;
} else {
// server selection used `serverSelectionTimeoutMS`, so we construct a new timeout with
// the time remaining to ensure that Topology.selectServer and ConnectionPool.checkOut
// cumulatively don't spend more than `serverSelectionTimeoutMS` blocking
timeout = Timeout.expires(serverSelectionTimeoutMS - options.timeout.timeElapsed);
}
}
} else {
timeout = Timeout.expires(waitQueueTimeoutMS);
}
const timeout = options.timeoutContext.connectionCheckoutTimeout;

const waitQueueMember: WaitQueueMember = {
resolve,
Expand Down Expand Up @@ -416,7 +390,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
: 'Timed out while checking out a connection from connection pool',
this.address
);
if (options?.timeout) {
if (options.timeoutContext.csotEnabled()) {
throw new MongoOperationTimeoutError('Timed out during connection checkout', {
cause: timeoutError
});
Expand All @@ -425,7 +399,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
}
throw error;
} finally {
if (timeout !== options?.timeout) timeout?.clear();
if (options.timeoutContext.clearConnectionCheckoutTimeout) timeout?.clear();
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/operations/command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ export abstract class CommandOperation<T> extends AbstractOperation<T> {
const options = {
...this.options,
...this.bsonOptions,
timeout: this.timeout,
timeoutContext,
readPreference: this.readPreference,
session
};
Expand Down
99 changes: 48 additions & 51 deletions src/operations/execute_operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import {
} from '../sdam/server_selection';
import type { Topology } from '../sdam/topology';
import type { ClientSession } from '../sessions';
import { Timeout } from '../timeout';
import { TimeoutContext } from '../timeout';
import { squashError, supportsRetryableWrites } from '../utils';
import { AbstractOperation, Aspect } from './operation';

Expand Down Expand Up @@ -88,6 +88,12 @@ export async function executeOperation<
);
}

timeoutContext ??= TimeoutContext.create({
serverSelectionTimeoutMS: client.s.options.serverSelectionTimeoutMS,
waitQueueTimeoutMS: client.s.options.waitQueueTimeoutMS,
timeoutMS: operation.options.timeoutMS
});

const readPreference = operation.readPreference ?? ReadPreference.primary;
const inTransaction = !!session?.inTransaction();

Expand Down Expand Up @@ -200,18 +206,31 @@ async function tryOperation<
selector = readPreference;
}

const timeout = operation.timeoutMS != null ? Timeout.expires(operation.timeoutMS) : undefined;
operation.timeout = timeout;

const server = await topology.selectServer(selector, {
session,
operationName: operation.commandName,
timeout
timeoutContext
});

const hasReadAspect = operation.hasAspect(Aspect.READ_OPERATION);
const hasWriteAspect = operation.hasAspect(Aspect.WRITE_OPERATION);
const inTransaction = session?.inTransaction() ?? false;
if (session == null) {
// No session also means it is not retryable, early exit
return await operation.execute(server, undefined, timeoutContext);
}

if (!operation.hasAspect(Aspect.RETRYABLE)) {
// non-retryable operation, early exit
try {
return await operation.execute(server, session, timeoutContext);
} finally {
if (session?.owner != null && session.owner === owner) {
try {
await session.endSession();
} catch (error) {
squashError(error);
}
}
}
}

const willRetryRead = topology.s.options.retryReads && !inTransaction && operation.canRetryRead;

Expand All @@ -231,42 +250,16 @@ async function tryOperation<
session.incrementTransactionNumber();
}

// TODO(NODE-6231): implement infinite retry within CSOT timeout here
const maxTries = willRetry ? 2 : 1;
let previousOperationError: MongoError | undefined;
let previousServer: ServerDescription | undefined;

// TODO(NODE-6231): implement infinite retry within CSOT timeout here
for (let tries = 0; tries < maxTries; tries++) {
if (previousOperationError) {
if (hasWriteAspect && previousOperationError.code === MMAPv1_RETRY_WRITES_ERROR_CODE) {
throw new MongoServerError({
message: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
errmsg: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
originalError: previousOperationError
});
}

if (hasWriteAspect && !isRetryableWriteError(previousOperationError))
throw previousOperationError;

if (hasReadAspect && !isRetryableReadError(previousOperationError))
throw previousOperationError;

if (
previousOperationError instanceof MongoNetworkError &&
operation.hasAspect(Aspect.CURSOR_CREATING) &&
session != null &&
session.isPinned &&
!session.inTransaction()
) {
session.unpin({ force: true, forceClear: true });
}

server = await topology.selectServer(selector, {
try {
return await operation.execute(server, session, timeoutContext);
} catch (operationError) {
if (willRetry && operationError instanceof MongoError) {
return await retryOperation(operation, operationError, {
session,
operationName: operation.commandName,
previousServer
topology,
selector,
previousServer: server.description,
timeoutContext
});

if (hasWriteAspect && !supportsRetryableWrites(server)) {
Expand All @@ -276,18 +269,22 @@ async function tryOperation<
}
}

try {
return await operation.execute(server, session, timeoutContext);
} catch (operationError) {
if (!(operationError instanceof MongoError)) throw operationError;
/** @internal */
type RetryOptions = {
session: ClientSession;
topology: Topology;
selector: ReadPreference | ServerSelector;
previousServer: ServerDescription;
timeoutContext: TimeoutContext;
};

async function retryOperation<
T extends AbstractOperation<TResult>,
TResult = ResultTypeFromOperation<T>
>(
operation: T,
originalError: MongoError,
{ session, topology, selector, previousServer }: RetryOptions
{ session, topology, selector, previousServer, timeoutContext }: RetryOptions
): Promise<TResult> {
const isWriteOperation = operation.hasAspect(Aspect.WRITE_OPERATION);
const isReadOperation = operation.hasAspect(Aspect.READ_OPERATION);
Expand Down Expand Up @@ -323,9 +320,9 @@ async function retryOperation<
// select a new server, and attempt to retry the operation
const server = await topology.selectServer(selector, {
session,
timeout: operation.timeout,
operationName: operation.commandName,
previousServer
previousServer,
timeoutContext
});

if (isWriteOperation && !supportsRetryableWrites(server)) {
Expand All @@ -335,7 +332,7 @@ async function retryOperation<
}

try {
return await operation.execute(server, session);
return await operation.execute(server, session, timeoutContext);
} catch (retryError) {
if (
retryError instanceof MongoError &&
Expand Down
2 changes: 1 addition & 1 deletion src/operations/find.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ export class FindOperation extends CommandOperation<CursorResponse> {
...this.bsonOptions,
documentsReturnedIn: 'firstBatch',
session,
timeout: this.timeout
timeoutContext
},
this.explain ? ExplainedCursorResponse : CursorResponse
);
Expand Down
4 changes: 1 addition & 3 deletions src/operations/operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { type BSONSerializeOptions, type Document, resolveBSONOptions } from '..
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
import type { Server } from '../sdam/server';
import type { ClientSession } from '../sessions';
import { type Timeout } from '../timeout';
import { type Timeout, type TimeoutContext } from '../timeout';
import type { MongoDBNamespace } from '../utils';

export const Aspect = {
Expand Down Expand Up @@ -82,8 +82,6 @@ export abstract class AbstractOperation<TResult = any> {
this.options = options;
this.bypassPinningCheck = !!options.bypassPinningCheck;
this.trySecondaryWrite = false;

this.timeoutMS = options.timeoutMS;
}

/** Must match the first key of the command object sent to the server.
Expand Down
4 changes: 2 additions & 2 deletions src/operations/run_command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ export class RunCommandOperation<T = Document> extends AbstractOperation<T> {
...this.options,
readPreference: this.readPreference,
session,
timeout: this.timeout
timeoutContext
},
this.options.responseType
);
Expand Down Expand Up @@ -82,7 +82,7 @@ export class RunAdminCommandOperation<T = Document> extends AbstractOperation<T>
...this.options,
readPreference: this.readPreference,
session,
timeout: this.timeout
timeoutContext
});
return res;
}
Expand Down
Loading

0 comments on commit 6f7dc0c

Please sign in to comment.