Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(NODE-6187): refactor to use TimeoutContext abstraction #4131

Merged
merged 23 commits into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/bulk/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import { makeUpdateStatement, UpdateOperation, type UpdateStatement } from '../o
import type { Server } from '../sdam/server';
import type { Topology } from '../sdam/topology';
import type { ClientSession } from '../sessions';
import { type TimeoutContext } from '../timeout';
import { maybeAddIdToDocuments } from '../utils';
import {
applyRetryableWrites,
Expand Down Expand Up @@ -874,6 +875,9 @@ export interface BulkWriteOptions extends CommandOperationOptions {
forceServerObjectId?: boolean;
/** Map of parameter names and values that can be accessed using $$var (requires MongoDB 5.0). */
let?: Document;

/** @internal */
timeoutContext?: TimeoutContext;
}

const executeCommandsAsync = promisify(executeCommands);
Expand Down
4 changes: 2 additions & 2 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import { type CancellationToken, TypedEventEmitter } from '../mongo_types';
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
import { ServerType } from '../sdam/common';
import { applySession, type ClientSession, updateSessionFromResponse } from '../sessions';
import { type Timeout } from '../timeout';
import { type TimeoutContext } from '../timeout';
import {
BufferPool,
calculateDurationInMs,
Expand Down Expand Up @@ -95,7 +95,7 @@ export interface CommandOptions extends BSONSerializeOptions {
directConnection?: boolean;

/** @internal */
timeout?: Timeout;
timeoutContext?: TimeoutContext;
}

/** @public */
Expand Down
39 changes: 7 additions & 32 deletions src/cmap/connection_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import {
} from '../error';
import { CancellationToken, TypedEventEmitter } from '../mongo_types';
import type { Server } from '../sdam/server';
import { Timeout, TimeoutError } from '../timeout';
import { type Callback, csotMin, List, makeCounter, promiseWithResolvers } from '../utils';
import { type TimeoutContext, TimeoutError } from '../timeout';
import { type Callback, List, makeCounter, promiseWithResolvers } from '../utils';
import { connect } from './connect';
import { Connection, type ConnectionEvents, type ConnectionOptions } from './connection';
import {
Expand Down Expand Up @@ -354,41 +354,15 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
* will be held by the pool. This means that if a connection is checked out it MUST be checked back in or
* explicitly destroyed by the new owner.
*/
async checkOut(options?: { timeout?: Timeout }): Promise<Connection> {
async checkOut(options: { timeoutContext: TimeoutContext }): Promise<Connection> {
this.emitAndLog(
baileympearson marked this conversation as resolved.
Show resolved Hide resolved
ConnectionPool.CONNECTION_CHECK_OUT_STARTED,
new ConnectionCheckOutStartedEvent(this)
);

const waitQueueTimeoutMS = this.options.waitQueueTimeoutMS;
const serverSelectionTimeoutMS = this[kServer].topology.s.serverSelectionTimeoutMS;

const { promise, resolve, reject } = promiseWithResolvers<Connection>();

let timeout: Timeout | null = null;
if (options?.timeout) {
// CSOT enabled
// Determine if we're using the timeout passed in or a new timeout
if (options.timeout.duration > 0 || serverSelectionTimeoutMS > 0) {
// This check determines whether or not Topology.selectServer used the configured
// `timeoutMS` or `serverSelectionTimeoutMS` value for its timeout
if (
options.timeout.duration === serverSelectionTimeoutMS ||
csotMin(options.timeout.duration, serverSelectionTimeoutMS) < serverSelectionTimeoutMS
) {
// server selection used `timeoutMS`, so we should use the existing timeout as the timeout
// here
timeout = options.timeout;
} else {
// server selection used `serverSelectionTimeoutMS`, so we construct a new timeout with
// the time remaining to ensure that Topology.selectServer and ConnectionPool.checkOut
// cumulatively don't spend more than `serverSelectionTimeoutMS` blocking
timeout = Timeout.expires(serverSelectionTimeoutMS - options.timeout.timeElapsed);
}
}
} else {
timeout = Timeout.expires(waitQueueTimeoutMS);
}
const timeout = options.timeoutContext.connectionCheckoutTimeout;

const waitQueueMember: WaitQueueMember = {
resolve,
Expand All @@ -403,6 +377,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
return await (timeout ? Promise.race([promise, timeout]) : promise);
} catch (error) {
if (TimeoutError.is(error)) {
timeout?.clear();
waitQueueMember[kCancelled] = true;

this.emitAndLog(
Expand All @@ -415,7 +390,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
: 'Timed out while checking out a connection from connection pool',
this.address
);
if (options?.timeout) {
if (options.timeoutContext.csotEnabled()) {
throw new MongoOperationTimeoutError('Timed out during connection checkout', {
cause: timeoutError
});
Expand All @@ -424,7 +399,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
}
throw error;
} finally {
if (timeout !== options?.timeout) timeout?.clear();
if (options.timeoutContext.clearConnectionCheckoutTimeout) timeout?.clear();
}
}

Expand Down
18 changes: 16 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,13 @@ export type {
RTTSampler,
ServerMonitoringMode
} from './sdam/monitor';
export type { Server, ServerEvents, ServerOptions, ServerPrivate } from './sdam/server';
export type {
Server,
ServerCommandOptions,
ServerEvents,
ServerOptions,
ServerPrivate
} from './sdam/server';
export type {
ServerDescription,
ServerDescriptionOptions,
Expand Down Expand Up @@ -557,7 +563,15 @@ export type {
WithTransactionCallback
} from './sessions';
export type { Sort, SortDirection, SortDirectionForCmd, SortForCmd } from './sort';
export type { Timeout } from './timeout';
export type {
CSOTTimeoutContext,
CSOTTimeoutContextOptions,
LegacyTimeoutContext,
LegacyTimeoutContextOptions,
Timeout,
TimeoutContext,
TimeoutContextOptions
} from './timeout';
export type { Transaction, TransactionOptions, TxnState } from './transactions';
export type {
BufferPool,
Expand Down
14 changes: 12 additions & 2 deletions src/operations/aggregate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { MongoInvalidArgumentError } from '../error';
import { type TODO_NODE_3286 } from '../mongo_types';
import type { Server } from '../sdam/server';
import type { ClientSession } from '../sessions';
import { type TimeoutContext } from '../timeout';
import { maxWireVersion, type MongoDBNamespace } from '../utils';
import { WriteConcern } from '../write_concern';
import { type CollationOptions, CommandOperation, type CommandOperationOptions } from './command';
Expand Down Expand Up @@ -94,7 +95,11 @@ export class AggregateOperation<T = Document> extends CommandOperation<T> {
this.pipeline.push(stage);
}

override async execute(server: Server, session: ClientSession | undefined): Promise<T> {
override async execute(
server: Server,
session: ClientSession | undefined,
timeoutContext: TimeoutContext
): Promise<T> {
const options: AggregateOptions = this.options;
const serverWireVersion = maxWireVersion(server);
const command: Document = { aggregate: this.target, pipeline: this.pipeline };
Expand Down Expand Up @@ -134,7 +139,12 @@ export class AggregateOperation<T = Document> extends CommandOperation<T> {
command.cursor.batchSize = options.batchSize;
}

const res: TODO_NODE_3286 = await super.executeCommand(server, session, command);
const res: TODO_NODE_3286 = await super.executeCommand(
server,
session,
command,
timeoutContext
);
return res;
}
}
Expand Down
11 changes: 9 additions & 2 deletions src/operations/bulk_write.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import type {
import type { Collection } from '../collection';
import type { Server } from '../sdam/server';
import type { ClientSession } from '../sessions';
import { type TimeoutContext } from '../timeout';
import { AbstractOperation, Aspect, defineAspects } from './operation';

/** @internal */
Expand All @@ -32,11 +33,17 @@ export class BulkWriteOperation extends AbstractOperation<BulkWriteResult> {

override async execute(
server: Server,
session: ClientSession | undefined
session: ClientSession | undefined,
timeoutContext: TimeoutContext
): Promise<BulkWriteResult> {
const coll = this.collection;
const operations = this.operations;
const options = { ...this.options, ...this.bsonOptions, readPreference: this.readPreference };
const options = {
...this.options,
...this.bsonOptions,
readPreference: this.readPreference,
timeoutContext
};

// Create the bulk operation
const bulk: BulkOperationBase =
Expand Down
6 changes: 4 additions & 2 deletions src/operations/command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import type { ReadPreference } from '../read_preference';
import type { Server } from '../sdam/server';
import { MIN_SECONDARY_WRITE_WIRE_VERSION } from '../sdam/server_selection';
import type { ClientSession } from '../sessions';
import { type TimeoutContext } from '../timeout';
import {
commandSupportsReadConcern,
decorateWithExplain,
Expand Down Expand Up @@ -110,15 +111,16 @@ export abstract class CommandOperation<T> extends AbstractOperation<T> {
async executeCommand(
server: Server,
session: ClientSession | undefined,
cmd: Document
cmd: Document,
timeoutContext: TimeoutContext
): Promise<Document> {
// TODO: consider making this a non-enumerable property
this.server = server;

const options = {
...this.options,
...this.bsonOptions,
timeout: this.timeout,
timeoutContext,
readPreference: this.readPreference,
session
};
Expand Down
9 changes: 7 additions & 2 deletions src/operations/count.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { Document } from '../bson';
import type { Collection } from '../collection';
import type { Server } from '../sdam/server';
import type { ClientSession } from '../sessions';
import { type TimeoutContext } from '../timeout';
import type { MongoDBNamespace } from '../utils';
import { CommandOperation, type CommandOperationOptions } from './command';
import { Aspect, defineAspects } from './operation';
Expand Down Expand Up @@ -36,7 +37,11 @@ export class CountOperation extends CommandOperation<number> {
return 'count' as const;
}

override async execute(server: Server, session: ClientSession | undefined): Promise<number> {
override async execute(
server: Server,
session: ClientSession | undefined,
timeoutContext: TimeoutContext
): Promise<number> {
const options = this.options;
const cmd: Document = {
count: this.collectionName,
Expand All @@ -59,7 +64,7 @@ export class CountOperation extends CommandOperation<number> {
cmd.maxTimeMS = options.maxTimeMS;
}

const result = await super.executeCommand(server, session, cmd);
const result = await super.executeCommand(server, session, cmd, timeoutContext);
return result ? result.n : 0;
}
}
Expand Down
9 changes: 7 additions & 2 deletions src/operations/count_documents.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { Document } from '../bson';
import type { Collection } from '../collection';
import type { Server } from '../sdam/server';
import type { ClientSession } from '../sessions';
import { type TimeoutContext } from '../timeout';
import { AggregateOperation, type AggregateOptions } from './aggregate';

/** @public */
Expand Down Expand Up @@ -31,8 +32,12 @@ export class CountDocumentsOperation extends AggregateOperation<number> {
super(collection.s.namespace, pipeline, options);
}

override async execute(server: Server, session: ClientSession | undefined): Promise<number> {
const result = await super.execute(server, session);
override async execute(
server: Server,
session: ClientSession | undefined,
timeoutContext: TimeoutContext
): Promise<number> {
const result = await super.execute(server, session, timeoutContext);

// NOTE: We're avoiding creating a cursor here to reduce the callstack.
const response = result as unknown as Document;
Expand Down
18 changes: 12 additions & 6 deletions src/operations/create_collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { MongoCompatibilityError } from '../error';
import type { PkFactory } from '../mongo_client';
import type { Server } from '../sdam/server';
import type { ClientSession } from '../sessions';
import { type TimeoutContext } from '../timeout';
import { CommandOperation, type CommandOperationOptions } from './command';
import { CreateIndexesOperation } from './indexes';
import { Aspect, defineAspects } from './operation';
Expand Down Expand Up @@ -124,7 +125,11 @@ export class CreateCollectionOperation extends CommandOperation<Collection> {
return 'create' as const;
}

override async execute(server: Server, session: ClientSession | undefined): Promise<Collection> {
override async execute(
server: Server,
session: ClientSession | undefined,
timeoutContext: TimeoutContext
): Promise<Collection> {
const db = this.db;
const name = this.name;
const options = this.options;
Expand Down Expand Up @@ -155,15 +160,15 @@ export class CreateCollectionOperation extends CommandOperation<Collection> {
unique: true
}
});
await createOp.executeWithoutEncryptedFieldsCheck(server, session);
await createOp.executeWithoutEncryptedFieldsCheck(server, session, timeoutContext);
}

if (!options.encryptedFields) {
this.options = { ...this.options, encryptedFields };
}
}

const coll = await this.executeWithoutEncryptedFieldsCheck(server, session);
const coll = await this.executeWithoutEncryptedFieldsCheck(server, session, timeoutContext);

if (encryptedFields) {
// Create the required index for queryable encryption support.
Expand All @@ -173,15 +178,16 @@ export class CreateCollectionOperation extends CommandOperation<Collection> {
{ __safeContent__: 1 },
{}
);
await createIndexOp.execute(server, session);
await createIndexOp.execute(server, session, timeoutContext);
}

return coll;
}

private async executeWithoutEncryptedFieldsCheck(
server: Server,
session: ClientSession | undefined
session: ClientSession | undefined,
timeoutContext: TimeoutContext
): Promise<Collection> {
const db = this.db;
const name = this.name;
Expand All @@ -198,7 +204,7 @@ export class CreateCollectionOperation extends CommandOperation<Collection> {
}
}
// otherwise just execute the command
await super.executeCommand(server, session, cmd);
await super.executeCommand(server, session, cmd, timeoutContext);
return new Collection(db, name, options);
}
}
Expand Down
Loading