Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(NODE-6187): refactor to use TimeoutContext abstraction #4131

Merged
merged 23 commits into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import { type CancellationToken, TypedEventEmitter } from '../mongo_types';
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
import { ServerType } from '../sdam/common';
import { applySession, type ClientSession, updateSessionFromResponse } from '../sessions';
import { type Timeout } from '../timeout';
import { type TimeoutContext } from '../timeout';
import {
BufferPool,
calculateDurationInMs,
Expand Down Expand Up @@ -95,7 +95,7 @@ export interface CommandOptions extends BSONSerializeOptions {
directConnection?: boolean;

/** @internal */
timeout?: Timeout;
timeoutContext?: TimeoutContext;
}

/** @public */
Expand Down
41 changes: 9 additions & 32 deletions src/cmap/connection_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import {
} from '../error';
import { CancellationToken, TypedEventEmitter } from '../mongo_types';
import type { Server } from '../sdam/server';
import { Timeout, TimeoutError } from '../timeout';
import { type Callback, csotMin, List, makeCounter, promiseWithResolvers } from '../utils';
import { Timeout, type TimeoutContext, TimeoutError } from '../timeout';
import { type Callback, List, makeCounter, promiseWithResolvers } from '../utils';
import { connect } from './connect';
import { Connection, type ConnectionEvents, type ConnectionOptions } from './connection';
import {
Expand Down Expand Up @@ -354,41 +354,17 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
* will be held by the pool. This means that if a connection is checked out it MUST be checked back in or
* explicitly destroyed by the new owner.
*/
async checkOut(options?: { timeout?: Timeout }): Promise<Connection> {
async checkOut(options?: { timeoutContext?: TimeoutContext }): Promise<Connection> {
this.emitAndLog(
baileympearson marked this conversation as resolved.
Show resolved Hide resolved
ConnectionPool.CONNECTION_CHECK_OUT_STARTED,
new ConnectionCheckOutStartedEvent(this)
);

const waitQueueTimeoutMS = this.options.waitQueueTimeoutMS;
const serverSelectionTimeoutMS = this[kServer].topology.s.serverSelectionTimeoutMS;

const { promise, resolve, reject } = promiseWithResolvers<Connection>();

let timeout: Timeout | null = null;
if (options?.timeout) {
// CSOT enabled
// Determine if we're using the timeout passed in or a new timeout
if (options.timeout.duration > 0 || serverSelectionTimeoutMS > 0) {
// This check determines whether or not Topology.selectServer used the configured
// `timeoutMS` or `serverSelectionTimeoutMS` value for its timeout
if (
options.timeout.duration === serverSelectionTimeoutMS ||
csotMin(options.timeout.duration, serverSelectionTimeoutMS) < serverSelectionTimeoutMS
) {
// server selection used `timeoutMS`, so we should use the existing timeout as the timeout
// here
timeout = options.timeout;
} else {
// server selection used `serverSelectionTimeoutMS`, so we construct a new timeout with
// the time remaining to ensure that Topology.selectServer and ConnectionPool.checkOut
// cumulatively don't spend more than `serverSelectionTimeoutMS` blocking
timeout = Timeout.expires(serverSelectionTimeoutMS - options.timeout.timeElapsed);
}
}
} else {
timeout = Timeout.expires(waitQueueTimeoutMS);
}
let timeout;
if (options?.timeoutContext) timeout = options.timeoutContext.connectionCheckoutTimeout;
else timeout = Timeout.expires(this.options.waitQueueTimeoutMS);

const waitQueueMember: WaitQueueMember = {
resolve,
Expand All @@ -403,6 +379,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
return await (timeout ? Promise.race([promise, timeout]) : promise);
} catch (error) {
if (TimeoutError.is(error)) {
timeout?.clear();
waitQueueMember[kCancelled] = true;

this.emitAndLog(
Expand All @@ -415,7 +392,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
: 'Timed out while checking out a connection from connection pool',
this.address
);
if (options?.timeout) {
if (options?.timeoutContext?.csotEnabled()) {
baileympearson marked this conversation as resolved.
Show resolved Hide resolved
throw new MongoOperationTimeoutError('Timed out during connection checkout', {
cause: timeoutError
});
Expand All @@ -424,7 +401,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
}
throw error;
} finally {
if (timeout !== options?.timeout) timeout?.clear();
if (options?.timeoutContext?.clearConnectionCheckoutTimeout) timeout?.clear();
baileympearson marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down
8 changes: 7 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,13 @@ export type {
WithTransactionCallback
} from './sessions';
export type { Sort, SortDirection, SortDirectionForCmd, SortForCmd } from './sort';
export type { Timeout } from './timeout';
export type {
CSOTTimeoutContext,
LegacyTimeoutContext,
Timeout,
TimeoutContext,
TimeoutContextOptions
} from './timeout';
export type { Transaction, TransactionOptions, TxnState } from './transactions';
export type {
BufferPool,
Expand Down
2 changes: 1 addition & 1 deletion src/operations/command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ export abstract class CommandOperation<T> extends AbstractOperation<T> {
const options = {
...this.options,
...this.bsonOptions,
timeout: this.timeout,
timeoutContext: this.timeoutContext,
readPreference: this.readPreference,
session
};
Expand Down
19 changes: 12 additions & 7 deletions src/operations/execute_operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import {
} from '../sdam/server_selection';
import type { Topology } from '../sdam/topology';
import type { ClientSession } from '../sessions';
import { Timeout } from '../timeout';
import { TimeoutContext } from '../timeout';
import { squashError, supportsRetryableWrites } from '../utils';
import { AbstractOperation, Aspect } from './operation';

Expand Down Expand Up @@ -118,6 +118,14 @@ export async function executeOperation<
);
}

const timeoutContext = TimeoutContext.create({
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just out of curiosity - why are we instantiating a timeout here and not in resolveOptions?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My intention was that we construct exactly one TimeoutContext per operation with a lifetime equal to that of the operation. The first convergent place we have access to both the client and operation options would be in executeOperation, but the absolute first place is in each of our operation helper methods (insertOne, updateOne, etc).

I am open to updating our AbstractOperation constructor to accept the client options we'd need to construct the TimeoutContext and updating these methods since that would be cleaner and make clear that each operation "owns" its TimeoutContext.

Wouldn't constructing the TimeoutContext in resolveOptions lead to multiple TimeoutContext objects being constructed that likely wouldn't be used.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am open to updating our AbstractOperation constructor to accept the client options we'd need to construct the TimeoutContext and updating these methods since that would be cleaner and make clear that each operation "owns" its TimeoutContext.

Yeah, you're right that resolveOptions would lead to instantiation of timeouts we don't need, specifically when we construct dbs and collections. I'm not sure an operation owning a timeout context is the right choice either - I think that cursors / change streams will need to "own" their timeout context and they'll perform multiple operations inside the same context. Maybe we can leave it as-is for now and reconsider once we've implemented CSOT for cursors.

serverSelectionTimeoutMS: client.options.serverSelectionTimeoutMS,
waitQueueTimeoutMS: client.options.waitQueueTimeoutMS,
timeoutMS: operation.options.timeoutMS
});

operation.timeoutContext = timeoutContext;

const readPreference = operation.readPreference ?? ReadPreference.primary;
const inTransaction = !!session?.inTransaction();

Expand Down Expand Up @@ -153,13 +161,10 @@ export async function executeOperation<
selector = readPreference;
}

const timeout = operation.timeoutMS != null ? Timeout.expires(operation.timeoutMS) : undefined;
operation.timeout = timeout;

const server = await topology.selectServer(selector, {
session,
operationName: operation.commandName,
timeout
timeoutContext: operation.timeoutContext
});

if (session == null) {
Expand Down Expand Up @@ -270,9 +275,9 @@ async function retryOperation<
// select a new server, and attempt to retry the operation
const server = await topology.selectServer(selector, {
session,
timeout: operation.timeout,
operationName: operation.commandName,
previousServer
previousServer,
timeoutContext: operation.timeoutContext
});

if (isWriteOperation && !supportsRetryableWrites(server)) {
Expand Down
2 changes: 1 addition & 1 deletion src/operations/find.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ export class FindOperation extends CommandOperation<Document> {
...this.bsonOptions,
documentsReturnedIn: 'firstBatch',
session,
timeout: this.timeout
timeoutContext: this.timeoutContext
},
undefined
);
Expand Down
1 change: 1 addition & 0 deletions src/operations/get_more.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ export class GetMoreOperation extends AbstractOperation {
const commandOptions = {
returnFieldSelector: null,
documentsReturnedIn: 'nextBatch',
timeoutContext: this.timeoutContext,
...this.options
};

Expand Down
5 changes: 4 additions & 1 deletion src/operations/kill_cursors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ export class KillCursorsOperation extends AbstractOperation {
cursors: [this.cursorId]
};
try {
await server.command(this.ns, killCursorsCommand, { session });
await server.command(this.ns, killCursorsCommand, {
session,
timeoutContext: this.timeoutContext
});
} catch (error) {
// The driver should never emit errors from killCursors, this is spec-ed behavior
squashError(error);
Expand Down
7 changes: 4 additions & 3 deletions src/operations/operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { type BSONSerializeOptions, type Document, resolveBSONOptions } from '..
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
import type { Server } from '../sdam/server';
import type { ClientSession } from '../sessions';
import { type Timeout } from '../timeout';
import { type Timeout, type TimeoutContext } from '../timeout';
import type { MongoDBNamespace } from '../utils';

export const Aspect = {
Expand Down Expand Up @@ -67,6 +67,9 @@ export abstract class AbstractOperation<TResult = any> {
/** @internal */
timeoutMS?: number;

/** @internal */
timeoutContext!: TimeoutContext;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NOTE: This is marked with a non-null assertion because the field will be set during execute_operation and only referenced within that function in the following places at which point, if it is not defined, this would indicate a bug in the driver. I can add an inline comment explaining this

  1. Topology.selectServer
  2. Server.command
  3. ConnectionPool.checkOut
  4. Connection.command

[kSession]: ClientSession | undefined;

constructor(options: OperationOptions = {}) {
Expand All @@ -82,8 +85,6 @@ export abstract class AbstractOperation<TResult = any> {
this.options = options;
this.bypassPinningCheck = !!options.bypassPinningCheck;
this.trySecondaryWrite = false;

this.timeoutMS = options.timeoutMS;
}

/** Must match the first key of the command object sent to the server.
Expand Down
4 changes: 2 additions & 2 deletions src/operations/run_command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ export class RunCommandOperation<T = Document> extends AbstractOperation<T> {
const res: TODO_NODE_3286 = await server.command(this.ns, this.command, {
...this.options,
readPreference: this.readPreference,
timeout: this.timeout,
timeoutContext: this.timeoutContext,
session
});
return res;
Expand Down Expand Up @@ -62,7 +62,7 @@ export class RunAdminCommandOperation<T = Document> extends AbstractOperation<T>
...this.options,
readPreference: this.readPreference,
session,
timeout: this.timeout
timeoutContext: this.timeoutContext
});
return res;
}
Expand Down
5 changes: 4 additions & 1 deletion src/operations/search_indexes/create.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ export class CreateSearchIndexesOperation extends AbstractOperation<string[]> {
indexes: this.descriptions
};

const res = await server.command(namespace, command, { session });
const res = await server.command(namespace, command, {
session,
timeoutContext: this.timeoutContext
});

const indexesCreated: Array<{ name: string }> = res?.indexesCreated ?? [];
return indexesCreated.map(({ name }) => name);
Expand Down
2 changes: 1 addition & 1 deletion src/operations/search_indexes/drop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export class DropSearchIndexOperation extends AbstractOperation<void> {
}

try {
await server.command(namespace, command, { session });
await server.command(namespace, command, { session, timeoutContext: this.timeoutContext });
} catch (error) {
const isNamespaceNotFoundError =
error instanceof MongoServerError && error.code === MONGODB_ERROR_CODES.NamespaceNotFound;
Expand Down
2 changes: 1 addition & 1 deletion src/operations/search_indexes/update.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ export class UpdateSearchIndexOperation extends AbstractOperation<void> {
definition: this.definition
};

await server.command(namespace, command, { session });
await server.command(namespace, command, { session, timeoutContext: this.timeoutContext });
return;
}
}
2 changes: 1 addition & 1 deletion src/sdam/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
public async command(
ns: MongoDBNamespace,
command: Document,
options?: CommandOptions
options: CommandOptions
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there are no instances in the driver where we don't pass options into this method

): Promise<Document>;

public async command(
Expand Down
Loading