Skip to content

Commit

Permalink
refactor(NODE-6187): refactor to use TimeoutContext abstraction (#4131)
Browse files Browse the repository at this point in the history
  • Loading branch information
W-A-James authored and nbbeeken committed Jul 22, 2024
1 parent 3f4313e commit 4aa6575
Show file tree
Hide file tree
Showing 11 changed files with 88 additions and 123 deletions.
4 changes: 2 additions & 2 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import { type CancellationToken, TypedEventEmitter } from '../mongo_types';
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
import { ServerType } from '../sdam/common';
import { applySession, type ClientSession, updateSessionFromResponse } from '../sessions';
import { type Timeout } from '../timeout';
import { type TimeoutContext } from '../timeout';
import {
BufferPool,
calculateDurationInMs,
Expand Down Expand Up @@ -96,7 +96,7 @@ export interface CommandOptions extends BSONSerializeOptions {
directConnection?: boolean;

/** @internal */
timeout?: Timeout;
timeoutContext?: TimeoutContext;
}

/** @public */
Expand Down
32 changes: 3 additions & 29 deletions src/cmap/connection_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -362,35 +362,9 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
new ConnectionCheckOutStartedEvent(this)
);

const waitQueueTimeoutMS = this.options.waitQueueTimeoutMS;
const serverSelectionTimeoutMS = this[kServer].topology.s.serverSelectionTimeoutMS;

const { promise, resolve, reject } = promiseWithResolvers<Connection>();

let timeout: Timeout | null = null;
if (options?.timeout) {
// CSOT enabled
// Determine if we're using the timeout passed in or a new timeout
if (options.timeout.duration > 0 || serverSelectionTimeoutMS > 0) {
// This check determines whether or not Topology.selectServer used the configured
// `timeoutMS` or `serverSelectionTimeoutMS` value for its timeout
if (
options.timeout.duration === serverSelectionTimeoutMS ||
csotMin(options.timeout.duration, serverSelectionTimeoutMS) < serverSelectionTimeoutMS
) {
// server selection used `timeoutMS`, so we should use the existing timeout as the timeout
// here
timeout = options.timeout;
} else {
// server selection used `serverSelectionTimeoutMS`, so we construct a new timeout with
// the time remaining to ensure that Topology.selectServer and ConnectionPool.checkOut
// cumulatively don't spend more than `serverSelectionTimeoutMS` blocking
timeout = Timeout.expires(serverSelectionTimeoutMS - options.timeout.timeElapsed);
}
}
} else {
timeout = Timeout.expires(waitQueueTimeoutMS);
}
const timeout = options.timeoutContext.connectionCheckoutTimeout;

const waitQueueMember: WaitQueueMember = {
resolve,
Expand Down Expand Up @@ -419,7 +393,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
: 'Timed out while checking out a connection from connection pool',
this.address
);
if (options?.timeout) {
if (options.timeoutContext.csotEnabled()) {
throw new MongoOperationTimeoutError('Timed out during connection checkout', {
cause: timeoutError
});
Expand All @@ -428,7 +402,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
}
throw error;
} finally {
if (timeout !== options?.timeout) timeout?.clear();
if (options.timeoutContext.clearConnectionCheckoutTimeout) timeout?.clear();
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/operations/command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ export abstract class CommandOperation<T> extends AbstractOperation<T> {
const options = {
...this.options,
...this.bsonOptions,
timeout: this.timeout,
timeoutContext,
readPreference: this.readPreference,
session
};
Expand Down
99 changes: 48 additions & 51 deletions src/operations/execute_operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import {
} from '../sdam/server_selection';
import type { Topology } from '../sdam/topology';
import type { ClientSession } from '../sessions';
import { Timeout } from '../timeout';
import { TimeoutContext } from '../timeout';
import { squashError, supportsRetryableWrites } from '../utils';
import { AbstractOperation, Aspect } from './operation';

Expand Down Expand Up @@ -88,6 +88,12 @@ export async function executeOperation<
);
}

timeoutContext ??= TimeoutContext.create({
serverSelectionTimeoutMS: client.s.options.serverSelectionTimeoutMS,
waitQueueTimeoutMS: client.s.options.waitQueueTimeoutMS,
timeoutMS: operation.options.timeoutMS
});

const readPreference = operation.readPreference ?? ReadPreference.primary;
const inTransaction = !!session?.inTransaction();

Expand Down Expand Up @@ -200,18 +206,31 @@ async function tryOperation<
selector = readPreference;
}

const timeout = operation.timeoutMS != null ? Timeout.expires(operation.timeoutMS) : undefined;
operation.timeout = timeout;

const server = await topology.selectServer(selector, {
session,
operationName: operation.commandName,
timeout
timeoutContext
});

const hasReadAspect = operation.hasAspect(Aspect.READ_OPERATION);
const hasWriteAspect = operation.hasAspect(Aspect.WRITE_OPERATION);
const inTransaction = session?.inTransaction() ?? false;
if (session == null) {
// No session also means it is not retryable, early exit
return await operation.execute(server, undefined, timeoutContext);
}

if (!operation.hasAspect(Aspect.RETRYABLE)) {
// non-retryable operation, early exit
try {
return await operation.execute(server, session, timeoutContext);
} finally {
if (session?.owner != null && session.owner === owner) {
try {
await session.endSession();
} catch (error) {
squashError(error);
}
}
}
}

const willRetryRead = topology.s.options.retryReads && !inTransaction && operation.canRetryRead;

Expand All @@ -231,42 +250,16 @@ async function tryOperation<
session.incrementTransactionNumber();
}

// TODO(NODE-6231): implement infinite retry within CSOT timeout here
const maxTries = willRetry ? 2 : 1;
let previousOperationError: MongoError | undefined;
let previousServer: ServerDescription | undefined;

// TODO(NODE-6231): implement infinite retry within CSOT timeout here
for (let tries = 0; tries < maxTries; tries++) {
if (previousOperationError) {
if (hasWriteAspect && previousOperationError.code === MMAPv1_RETRY_WRITES_ERROR_CODE) {
throw new MongoServerError({
message: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
errmsg: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
originalError: previousOperationError
});
}

if (hasWriteAspect && !isRetryableWriteError(previousOperationError))
throw previousOperationError;

if (hasReadAspect && !isRetryableReadError(previousOperationError))
throw previousOperationError;

if (
previousOperationError instanceof MongoNetworkError &&
operation.hasAspect(Aspect.CURSOR_CREATING) &&
session != null &&
session.isPinned &&
!session.inTransaction()
) {
session.unpin({ force: true, forceClear: true });
}

server = await topology.selectServer(selector, {
try {
return await operation.execute(server, session, timeoutContext);
} catch (operationError) {
if (willRetry && operationError instanceof MongoError) {
return await retryOperation(operation, operationError, {
session,
operationName: operation.commandName,
previousServer
topology,
selector,
previousServer: server.description,
timeoutContext
});

if (hasWriteAspect && !supportsRetryableWrites(server)) {
Expand All @@ -276,18 +269,22 @@ async function tryOperation<
}
}

try {
return await operation.execute(server, session, timeoutContext);
} catch (operationError) {
if (!(operationError instanceof MongoError)) throw operationError;
/** @internal */
type RetryOptions = {
session: ClientSession;
topology: Topology;
selector: ReadPreference | ServerSelector;
previousServer: ServerDescription;
timeoutContext: TimeoutContext;
};

async function retryOperation<
T extends AbstractOperation<TResult>,
TResult = ResultTypeFromOperation<T>
>(
operation: T,
originalError: MongoError,
{ session, topology, selector, previousServer }: RetryOptions
{ session, topology, selector, previousServer, timeoutContext }: RetryOptions
): Promise<TResult> {
const isWriteOperation = operation.hasAspect(Aspect.WRITE_OPERATION);
const isReadOperation = operation.hasAspect(Aspect.READ_OPERATION);
Expand Down Expand Up @@ -323,9 +320,9 @@ async function retryOperation<
// select a new server, and attempt to retry the operation
const server = await topology.selectServer(selector, {
session,
timeout: operation.timeout,
operationName: operation.commandName,
previousServer
previousServer,
timeoutContext
});

if (isWriteOperation && !supportsRetryableWrites(server)) {
Expand All @@ -335,7 +332,7 @@ async function retryOperation<
}

try {
return await operation.execute(server, session);
return await operation.execute(server, session, timeoutContext);
} catch (retryError) {
if (
retryError instanceof MongoError &&
Expand Down
2 changes: 1 addition & 1 deletion src/operations/find.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ export class FindOperation extends CommandOperation<CursorResponse> {
...this.bsonOptions,
documentsReturnedIn: 'firstBatch',
session,
timeout: this.timeout
timeoutContext
},
this.explain ? ExplainedCursorResponse : CursorResponse
);
Expand Down
4 changes: 1 addition & 3 deletions src/operations/operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { type BSONSerializeOptions, type Document, resolveBSONOptions } from '..
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
import type { Server } from '../sdam/server';
import type { ClientSession } from '../sessions';
import { type Timeout } from '../timeout';
import { type Timeout, type TimeoutContext } from '../timeout';
import type { MongoDBNamespace } from '../utils';

export const Aspect = {
Expand Down Expand Up @@ -82,8 +82,6 @@ export abstract class AbstractOperation<TResult = any> {
this.options = options;
this.bypassPinningCheck = !!options.bypassPinningCheck;
this.trySecondaryWrite = false;

this.timeoutMS = options.timeoutMS;
}

/** Must match the first key of the command object sent to the server.
Expand Down
4 changes: 2 additions & 2 deletions src/operations/run_command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ export class RunCommandOperation<T = Document> extends AbstractOperation<T> {
...this.options,
readPreference: this.readPreference,
session,
timeout: this.timeout
timeoutContext
},
this.options.responseType
);
Expand Down Expand Up @@ -82,7 +82,7 @@ export class RunAdminCommandOperation<T = Document> extends AbstractOperation<T>
...this.options,
readPreference: this.readPreference,
session,
timeout: this.timeout
timeoutContext
});
return res;
}
Expand Down
46 changes: 19 additions & 27 deletions src/sdam/topology.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import { Timeout, TimeoutContext, TimeoutError } from '../timeout';
import type { Transaction } from '../transactions';
import {
type Callback,
csotMin,
type EventEmitterWithState,
HostAddress,
List,
Expand Down Expand Up @@ -461,13 +460,20 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
}
}

const timeoutMS = this.client.options.timeoutMS;
const timeout = timeoutMS != null ? Timeout.expires(timeoutMS) : undefined;
const timeoutMS = this.client.s.options.timeoutMS;
const serverSelectionTimeoutMS = this.client.s.options.serverSelectionTimeoutMS;
const readPreference = options.readPreference ?? ReadPreference.primary;

const timeoutContext = TimeoutContext.create({
timeoutMS,
serverSelectionTimeoutMS,
waitQueueTimeoutMS: this.client.s.options.waitQueueTimeoutMS
});

const selectServerOptions = {
operationName: 'ping',
timeout,
...options
...options,
timeoutContext
};
try {
const server = await this.selectServer(
Expand All @@ -477,7 +483,7 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {

const skipPingOnConnect = this.s.options[Symbol.for('@@mdb.skipPingOnConnect')] === true;
if (!skipPingOnConnect && server && this.s.credentials) {
await server.command(ns('admin.$cmd'), { ping: 1 }, { timeout });
await server.command(ns('admin.$cmd'), { ping: 1 }, { timeoutContext });
stateTransition(this, STATE_CONNECTED);
this.emit(Topology.OPEN, this);
this.emit(Topology.CONNECT, this);
Expand Down Expand Up @@ -566,24 +572,10 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
new ServerSelectionStartedEvent(selector, this.description, options.operationName)
);
}
const serverSelectionTimeoutMS = options.serverSelectionTimeoutMS ?? 0;
let timeout: Timeout | null;
if (options.timeout) {
// CSOT Enabled
if (options.timeout.duration > 0 || serverSelectionTimeoutMS > 0) {
if (
options.timeout.duration === serverSelectionTimeoutMS ||
csotMin(options.timeout.duration, serverSelectionTimeoutMS) < serverSelectionTimeoutMS
) {
timeout = options.timeout;
} else {
timeout = Timeout.expires(serverSelectionTimeoutMS);
}
} else {
timeout = null;
}
} else {
timeout = Timeout.expires(serverSelectionTimeoutMS);
let timeout;
if (options.timeoutContext) timeout = options.timeoutContext.serverSelectionTimeout;
else {
timeout = Timeout.expires(options.serverSelectionTimeoutMS ?? 0);
}

const isSharded = this.description.type === TopologyType.Sharded;
Expand All @@ -607,7 +599,7 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
)
);
}
if (timeout !== options.timeout) timeout?.clear();
if (options.timeoutContext?.clearServerSelectionTimeout) timeout?.clear();
return transaction.server;
}

Expand Down Expand Up @@ -657,7 +649,7 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
);
}

if (options.timeout) {
if (options.timeoutContext?.csotEnabled()) {
throw new MongoOperationTimeoutError('Timed out during server selection', {
cause: timeoutError
});
Expand All @@ -667,7 +659,7 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
// Other server selection error
throw error;
} finally {
if (timeout !== options.timeout) timeout?.clear();
if (options.timeoutContext?.clearServerSelectionTimeout) timeout?.clear();
}
}
/**
Expand Down
Loading

0 comments on commit 4aa6575

Please sign in to comment.