Skip to content

Commit

Permalink
refactor(NODE-6187): refactor to use TimeoutContext abstraction (#4131)
Browse files Browse the repository at this point in the history
  • Loading branch information
W-A-James committed Jul 8, 2024
1 parent 9e1d550 commit 2207e49
Show file tree
Hide file tree
Showing 45 changed files with 796 additions and 202 deletions.
4 changes: 4 additions & 0 deletions src/bulk/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import { makeUpdateStatement, UpdateOperation, type UpdateStatement } from '../o
import type { Server } from '../sdam/server';
import type { Topology } from '../sdam/topology';
import type { ClientSession } from '../sessions';
import { type TimeoutContext } from '../timeout';
import { maybeAddIdToDocuments } from '../utils';
import {
applyRetryableWrites,
Expand Down Expand Up @@ -874,6 +875,9 @@ export interface BulkWriteOptions extends CommandOperationOptions {
forceServerObjectId?: boolean;
/** Map of parameter names and values that can be accessed using $$var (requires MongoDB 5.0). */
let?: Document;

/** @internal */
timeoutContext?: TimeoutContext;
}

const executeCommandsAsync = promisify(executeCommands);
Expand Down
4 changes: 2 additions & 2 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import { type CancellationToken, TypedEventEmitter } from '../mongo_types';
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
import { ServerType } from '../sdam/common';
import { applySession, type ClientSession, updateSessionFromResponse } from '../sessions';
import { type Timeout } from '../timeout';
import { type TimeoutContext } from '../timeout';
import {
BufferPool,
calculateDurationInMs,
Expand Down Expand Up @@ -96,7 +96,7 @@ export interface CommandOptions extends BSONSerializeOptions {
directConnection?: boolean;

/** @internal */
timeout?: Timeout;
timeoutContext?: TimeoutContext;
}

/** @public */
Expand Down
39 changes: 7 additions & 32 deletions src/cmap/connection_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import {
} from '../error';
import { CancellationToken, TypedEventEmitter } from '../mongo_types';
import type { Server } from '../sdam/server';
import { Timeout, TimeoutError } from '../timeout';
import { type Callback, csotMin, List, makeCounter, promiseWithResolvers } from '../utils';
import { type TimeoutContext, TimeoutError } from '../timeout';
import { type Callback, List, makeCounter, promiseWithResolvers } from '../utils';
import { connect } from './connect';
import { Connection, type ConnectionEvents, type ConnectionOptions } from './connection';
import {
Expand Down Expand Up @@ -354,41 +354,15 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
* will be held by the pool. This means that if a connection is checked out it MUST be checked back in or
* explicitly destroyed by the new owner.
*/
async checkOut(options?: { timeout?: Timeout }): Promise<Connection> {
async checkOut(options: { timeoutContext: TimeoutContext }): Promise<Connection> {
this.emitAndLog(
ConnectionPool.CONNECTION_CHECK_OUT_STARTED,
new ConnectionCheckOutStartedEvent(this)
);

const waitQueueTimeoutMS = this.options.waitQueueTimeoutMS;
const serverSelectionTimeoutMS = this[kServer].topology.s.serverSelectionTimeoutMS;

const { promise, resolve, reject } = promiseWithResolvers<Connection>();

let timeout: Timeout | null = null;
if (options?.timeout) {
// CSOT enabled
// Determine if we're using the timeout passed in or a new timeout
if (options.timeout.duration > 0 || serverSelectionTimeoutMS > 0) {
// This check determines whether or not Topology.selectServer used the configured
// `timeoutMS` or `serverSelectionTimeoutMS` value for its timeout
if (
options.timeout.duration === serverSelectionTimeoutMS ||
csotMin(options.timeout.duration, serverSelectionTimeoutMS) < serverSelectionTimeoutMS
) {
// server selection used `timeoutMS`, so we should use the existing timeout as the timeout
// here
timeout = options.timeout;
} else {
// server selection used `serverSelectionTimeoutMS`, so we construct a new timeout with
// the time remaining to ensure that Topology.selectServer and ConnectionPool.checkOut
// cumulatively don't spend more than `serverSelectionTimeoutMS` blocking
timeout = Timeout.expires(serverSelectionTimeoutMS - options.timeout.timeElapsed);
}
}
} else {
timeout = Timeout.expires(waitQueueTimeoutMS);
}
const timeout = options.timeoutContext.connectionCheckoutTimeout;

const waitQueueMember: WaitQueueMember = {
resolve,
Expand All @@ -403,6 +377,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
return await (timeout ? Promise.race([promise, timeout]) : promise);
} catch (error) {
if (TimeoutError.is(error)) {
timeout?.clear();
waitQueueMember[kCancelled] = true;

this.emitAndLog(
Expand All @@ -415,7 +390,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
: 'Timed out while checking out a connection from connection pool',
this.address
);
if (options?.timeout) {
if (options.timeoutContext.csotEnabled()) {
throw new MongoOperationTimeoutError('Timed out during connection checkout', {
cause: timeoutError
});
Expand All @@ -424,7 +399,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
}
throw error;
} finally {
if (timeout !== options?.timeout) timeout?.clear();
if (options.timeoutContext.clearConnectionCheckoutTimeout) timeout?.clear();
}
}

Expand Down
18 changes: 16 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,13 @@ export type {
RTTSampler,
ServerMonitoringMode
} from './sdam/monitor';
export type { Server, ServerEvents, ServerOptions, ServerPrivate } from './sdam/server';
export type {
Server,
ServerCommandOptions,
ServerEvents,
ServerOptions,
ServerPrivate
} from './sdam/server';
export type {
ServerDescription,
ServerDescriptionOptions,
Expand Down Expand Up @@ -565,7 +571,15 @@ export type {
WithTransactionCallback
} from './sessions';
export type { Sort, SortDirection, SortDirectionForCmd, SortForCmd } from './sort';
export type { Timeout } from './timeout';
export type {
CSOTTimeoutContext,
CSOTTimeoutContextOptions,
LegacyTimeoutContext,
LegacyTimeoutContextOptions,
Timeout,
TimeoutContext,
TimeoutContextOptions
} from './timeout';
export type { Transaction, TransactionOptions, TxnState } from './transactions';
export type {
BufferPool,
Expand Down
5 changes: 4 additions & 1 deletion src/operations/aggregate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { CursorResponse, ExplainedCursorResponse } from '../cmap/wire_protocol/r
import { MongoInvalidArgumentError } from '../error';
import type { Server } from '../sdam/server';
import type { ClientSession } from '../sessions';
import { type TimeoutContext } from '../timeout';
import { maxWireVersion, type MongoDBNamespace } from '../utils';
import { WriteConcern } from '../write_concern';
import { type CollationOptions, CommandOperation, type CommandOperationOptions } from './command';
Expand Down Expand Up @@ -96,7 +97,8 @@ export class AggregateOperation extends CommandOperation<CursorResponse> {

override async execute(
server: Server,
session: ClientSession | undefined
session: ClientSession | undefined,
timeoutContext: TimeoutContext
): Promise<CursorResponse> {
const options: AggregateOptions = this.options;
const serverWireVersion = maxWireVersion(server);
Expand Down Expand Up @@ -141,6 +143,7 @@ export class AggregateOperation extends CommandOperation<CursorResponse> {
server,
session,
command,
timeoutContext,
this.explain ? ExplainedCursorResponse : CursorResponse
);
}
Expand Down
11 changes: 9 additions & 2 deletions src/operations/bulk_write.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import type {
import type { Collection } from '../collection';
import type { Server } from '../sdam/server';
import type { ClientSession } from '../sessions';
import { type TimeoutContext } from '../timeout';
import { AbstractOperation, Aspect, defineAspects } from './operation';

/** @internal */
Expand All @@ -32,11 +33,17 @@ export class BulkWriteOperation extends AbstractOperation<BulkWriteResult> {

override async execute(
server: Server,
session: ClientSession | undefined
session: ClientSession | undefined,
timeoutContext: TimeoutContext
): Promise<BulkWriteResult> {
const coll = this.collection;
const operations = this.operations;
const options = { ...this.options, ...this.bsonOptions, readPreference: this.readPreference };
const options = {
...this.options,
...this.bsonOptions,
readPreference: this.readPreference,
timeoutContext
};

// Create the bulk operation
const bulk: BulkOperationBase =
Expand Down
8 changes: 6 additions & 2 deletions src/operations/command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import type { ReadPreference } from '../read_preference';
import type { Server } from '../sdam/server';
import { MIN_SECONDARY_WRITE_WIRE_VERSION } from '../sdam/server_selection';
import type { ClientSession } from '../sessions';
import { type TimeoutContext } from '../timeout';
import {
commandSupportsReadConcern,
decorateWithExplain,
Expand Down Expand Up @@ -112,27 +113,30 @@ export abstract class CommandOperation<T> extends AbstractOperation<T> {
server: Server,
session: ClientSession | undefined,
cmd: Document,
timeoutContext: TimeoutContext,
responseType: T | undefined
): Promise<typeof responseType extends undefined ? Document : InstanceType<T>>;

public async executeCommand(
server: Server,
session: ClientSession | undefined,
cmd: Document
cmd: Document,
timeoutContext: TimeoutContext
): Promise<Document>;

async executeCommand(
server: Server,
session: ClientSession | undefined,
cmd: Document,
timeoutContext: TimeoutContext,
responseType?: MongoDBResponseConstructor
): Promise<Document> {
this.server = server;

const options = {
...this.options,
...this.bsonOptions,
timeout: this.timeout,
timeoutContext,
readPreference: this.readPreference,
session
};
Expand Down
9 changes: 7 additions & 2 deletions src/operations/count.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { Document } from '../bson';
import type { Collection } from '../collection';
import type { Server } from '../sdam/server';
import type { ClientSession } from '../sessions';
import { type TimeoutContext } from '../timeout';
import type { MongoDBNamespace } from '../utils';
import { CommandOperation, type CommandOperationOptions } from './command';
import { Aspect, defineAspects } from './operation';
Expand Down Expand Up @@ -36,7 +37,11 @@ export class CountOperation extends CommandOperation<number> {
return 'count' as const;
}

override async execute(server: Server, session: ClientSession | undefined): Promise<number> {
override async execute(
server: Server,
session: ClientSession | undefined,
timeoutContext: TimeoutContext
): Promise<number> {
const options = this.options;
const cmd: Document = {
count: this.collectionName,
Expand All @@ -59,7 +64,7 @@ export class CountOperation extends CommandOperation<number> {
cmd.maxTimeMS = options.maxTimeMS;
}

const result = await super.executeCommand(server, session, cmd);
const result = await super.executeCommand(server, session, cmd, timeoutContext);
return result ? result.n : 0;
}
}
Expand Down
18 changes: 12 additions & 6 deletions src/operations/create_collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { MongoCompatibilityError } from '../error';
import type { PkFactory } from '../mongo_client';
import type { Server } from '../sdam/server';
import type { ClientSession } from '../sessions';
import { type TimeoutContext } from '../timeout';
import { CommandOperation, type CommandOperationOptions } from './command';
import { CreateIndexesOperation } from './indexes';
import { Aspect, defineAspects } from './operation';
Expand Down Expand Up @@ -124,7 +125,11 @@ export class CreateCollectionOperation extends CommandOperation<Collection> {
return 'create' as const;
}

override async execute(server: Server, session: ClientSession | undefined): Promise<Collection> {
override async execute(
server: Server,
session: ClientSession | undefined,
timeoutContext: TimeoutContext
): Promise<Collection> {
const db = this.db;
const name = this.name;
const options = this.options;
Expand Down Expand Up @@ -155,15 +160,15 @@ export class CreateCollectionOperation extends CommandOperation<Collection> {
unique: true
}
});
await createOp.executeWithoutEncryptedFieldsCheck(server, session);
await createOp.executeWithoutEncryptedFieldsCheck(server, session, timeoutContext);
}

if (!options.encryptedFields) {
this.options = { ...this.options, encryptedFields };
}
}

const coll = await this.executeWithoutEncryptedFieldsCheck(server, session);
const coll = await this.executeWithoutEncryptedFieldsCheck(server, session, timeoutContext);

if (encryptedFields) {
// Create the required index for queryable encryption support.
Expand All @@ -173,15 +178,16 @@ export class CreateCollectionOperation extends CommandOperation<Collection> {
{ __safeContent__: 1 },
{}
);
await createIndexOp.execute(server, session);
await createIndexOp.execute(server, session, timeoutContext);
}

return coll;
}

private async executeWithoutEncryptedFieldsCheck(
server: Server,
session: ClientSession | undefined
session: ClientSession | undefined,
timeoutContext: TimeoutContext
): Promise<Collection> {
const db = this.db;
const name = this.name;
Expand All @@ -198,7 +204,7 @@ export class CreateCollectionOperation extends CommandOperation<Collection> {
}
}
// otherwise just execute the command
await super.executeCommand(server, session, cmd);
await super.executeCommand(server, session, cmd, timeoutContext);
return new Collection(db, name, options);
}
}
Expand Down
21 changes: 15 additions & 6 deletions src/operations/delete.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { MongoCompatibilityError, MongoServerError } from '../error';
import { type TODO_NODE_3286 } from '../mongo_types';
import type { Server } from '../sdam/server';
import type { ClientSession } from '../sessions';
import { type TimeoutContext } from '../timeout';
import { type MongoDBNamespace } from '../utils';
import { type WriteConcernOptions } from '../write_concern';
import { type CollationOptions, CommandOperation, type CommandOperationOptions } from './command';
Expand Down Expand Up @@ -67,7 +68,8 @@ export class DeleteOperation extends CommandOperation<DeleteResult> {

override async execute(
server: Server,
session: ClientSession | undefined
session: ClientSession | undefined,
timeoutContext: TimeoutContext
): Promise<DeleteResult> {
const options = this.options ?? {};
const ordered = typeof options.ordered === 'boolean' ? options.ordered : true;
Expand Down Expand Up @@ -95,7 +97,12 @@ export class DeleteOperation extends CommandOperation<DeleteResult> {
}
}

const res: TODO_NODE_3286 = await super.executeCommand(server, session, command);
const res: TODO_NODE_3286 = await super.executeCommand(
server,
session,
command,
timeoutContext
);
return res;
}
}
Expand All @@ -107,9 +114,10 @@ export class DeleteOneOperation extends DeleteOperation {

override async execute(
server: Server,
session: ClientSession | undefined
session: ClientSession | undefined,
timeoutContext: TimeoutContext
): Promise<DeleteResult> {
const res: TODO_NODE_3286 = await super.execute(server, session);
const res: TODO_NODE_3286 = await super.execute(server, session, timeoutContext);
if (this.explain) return res;
if (res.code) throw new MongoServerError(res);
if (res.writeErrors) throw new MongoServerError(res.writeErrors[0]);
Expand All @@ -127,9 +135,10 @@ export class DeleteManyOperation extends DeleteOperation {

override async execute(
server: Server,
session: ClientSession | undefined
session: ClientSession | undefined,
timeoutContext: TimeoutContext
): Promise<DeleteResult> {
const res: TODO_NODE_3286 = await super.execute(server, session);
const res: TODO_NODE_3286 = await super.execute(server, session, timeoutContext);
if (this.explain) return res;
if (res.code) throw new MongoServerError(res);
if (res.writeErrors) throw new MongoServerError(res.writeErrors[0]);
Expand Down
Loading

0 comments on commit 2207e49

Please sign in to comment.