Skip to content

Commit

Permalink
feat(NODE-6090): Implement CSOT logic for connection checkout and ser…
Browse files Browse the repository at this point in the history
…ver selection
  • Loading branch information
W-A-James committed Jun 5, 2024
1 parent f790cc1 commit 61564cd
Show file tree
Hide file tree
Showing 21 changed files with 575 additions and 176 deletions.
3 changes: 2 additions & 1 deletion src/admin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ export class Admin {
new RunAdminCommandOperation(command, {
...resolveBSONOptions(options),
session: options?.session,
readPreference: options?.readPreference
readPreference: options?.readPreference,
timeoutMS: options?.timeoutMS ?? this.s.db.timeoutMS
})
);
}
Expand Down
4 changes: 4 additions & 0 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import { type CancellationToken, TypedEventEmitter } from '../mongo_types';
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
import { ServerType } from '../sdam/common';
import { applySession, type ClientSession, updateSessionFromResponse } from '../sessions';
import { type Timeout } from '../timeout';
import {
BufferPool,
calculateDurationInMs,
Expand Down Expand Up @@ -92,6 +93,9 @@ export interface CommandOptions extends BSONSerializeOptions {
writeConcern?: WriteConcern;

directConnection?: boolean;

/** @internal */
timeout?: Timeout;
}

/** @public */
Expand Down
51 changes: 38 additions & 13 deletions src/cmap/connection_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ import {
MongoInvalidArgumentError,
MongoMissingCredentialsError,
MongoNetworkError,
MongoOperationTimeoutError,
MongoRuntimeError,
MongoServerError
} from '../error';
import { CancellationToken, TypedEventEmitter } from '../mongo_types';
import type { Server } from '../sdam/server';
import { Timeout, TimeoutError } from '../timeout';
import { type Callback, List, makeCounter, promiseWithResolvers } from '../utils';
import { type Callback, csotMin, List, makeCounter, promiseWithResolvers } from '../utils';
import { connect } from './connect';
import { Connection, type ConnectionEvents, type ConnectionOptions } from './connection';
import {
Expand Down Expand Up @@ -102,7 +103,6 @@ export interface ConnectionPoolOptions extends Omit<ConnectionOptions, 'id' | 'g
export interface WaitQueueMember {
resolve: (conn: Connection) => void;
reject: (err: AnyError) => void;
timeout: Timeout;
[kCancelled]?: boolean;
}

Expand Down Expand Up @@ -354,35 +354,57 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
* will be held by the pool. This means that if a connection is checked out it MUST be checked back in or
* explicitly destroyed by the new owner.
*/
async checkOut(): Promise<Connection> {
async checkOut(options?: { timeout?: Timeout }): Promise<Connection> {
this.emitAndLog(
ConnectionPool.CONNECTION_CHECK_OUT_STARTED,
new ConnectionCheckOutStartedEvent(this)
);

const waitQueueTimeoutMS = this.options.waitQueueTimeoutMS;
const serverSelectionTimeoutMS = this[kServer].topology.s.serverSelectionTimeoutMS;

const { promise, resolve, reject } = promiseWithResolvers<Connection>();

const timeout = Timeout.expires(waitQueueTimeoutMS);
let timeout: Timeout | null = null;
if (options?.timeout) {
// CSOT enabled
// Determine if we're using the timeout passed in or a new timeout
if (options.timeout.duration > 0 || serverSelectionTimeoutMS > 0) {
// This check determines whether or not Topology.selectServer used the configured
// `timeoutMS` or `serverSelectionTimeoutMS` value for its timeout
if (
options.timeout.duration === serverSelectionTimeoutMS ||
csotMin(options.timeout.duration, serverSelectionTimeoutMS) < serverSelectionTimeoutMS
) {
// server selection used `timeoutMS`, so we should use the existing timeout as the timeout
// here
timeout = options.timeout;
} else {
// server selection used `serverSelectionTimeoutMS`, so we construct a new timeout with
// the time remaining to ensure that Topology.selectServer and ConnectionPool.checkOut
// cumulatively don't spend more than `serverSelectionTimeoutMS` blocking
timeout = Timeout.expires(serverSelectionTimeoutMS - options.timeout.timeElapsed);
}
}
} else {
timeout = Timeout.expires(waitQueueTimeoutMS);
}

const waitQueueMember: WaitQueueMember = {
resolve,
reject,
timeout
reject
};

this[kWaitQueue].push(waitQueueMember);
process.nextTick(() => this.processWaitQueue());

try {
return await Promise.race([promise, waitQueueMember.timeout]);
timeout?.throwIfExpired();
return await (timeout ? Promise.race([promise, timeout]) : promise);
} catch (error) {
if (TimeoutError.is(error)) {
waitQueueMember[kCancelled] = true;

waitQueueMember.timeout.clear();

this.emitAndLog(
ConnectionPool.CONNECTION_CHECK_OUT_FAILED,
new ConnectionCheckOutFailedEvent(this, 'timeout')
Expand All @@ -393,9 +415,16 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
: 'Timed out while checking out a connection from connection pool',
this.address
);
if (options?.timeout) {
throw new MongoOperationTimeoutError('Timed out during connection checkout', {
cause: timeoutError
});
}
throw timeoutError;
}
throw error;
} finally {
if (timeout !== options?.timeout) timeout?.clear();
}
}

Expand Down Expand Up @@ -761,7 +790,6 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
ConnectionPool.CONNECTION_CHECK_OUT_FAILED,
new ConnectionCheckOutFailedEvent(this, reason, error)
);
waitQueueMember.timeout.clear();
this[kWaitQueue].shift();
waitQueueMember.reject(error);
continue;
Expand All @@ -782,7 +810,6 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
ConnectionPool.CONNECTION_CHECKED_OUT,
new ConnectionCheckedOutEvent(this, connection)
);
waitQueueMember.timeout.clear();

this[kWaitQueue].shift();
waitQueueMember.resolve(connection);
Expand Down Expand Up @@ -820,8 +847,6 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
);
waitQueueMember.resolve(connection);
}

waitQueueMember.timeout.clear();
}
process.nextTick(() => this.processWaitQueue());
});
Expand Down
5 changes: 5 additions & 0 deletions src/collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,11 @@ export class Collection<TSchema extends Document = Document> {
this.s.collectionHint = normalizeHintField(v);
}

/** @internal */
get timeoutMS(): number | undefined {
return this.s.options.timeoutMS;
}

/**
* Inserts a single document into MongoDB. If documents passed in do not contain the **_id** field,
* one will be added to each of the documents missing it by the driver, mutating the document. This behavior
Expand Down
6 changes: 6 additions & 0 deletions src/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,11 @@ export class Db {
return this.s.namespace.toString();
}

/** @internal */
get timeoutMS(): number | undefined {
return this.s.options?.timeoutMS;
}

/**
* Create a new collection on a server with the specified options. Use this to create capped collections.
* More information about command options available at https://www.mongodb.com/docs/manual/reference/command/create/
Expand Down Expand Up @@ -272,6 +277,7 @@ export class Db {
this.client,
new RunCommandOperation(this, command, {
...resolveBSONOptions(options),
timeoutMS: options?.timeoutMS,
session: options?.session,
readPreference: options?.readPreference
})
Expand Down
9 changes: 9 additions & 0 deletions src/error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -759,6 +759,15 @@ export class MongoUnexpectedServerResponseError extends MongoRuntimeError {
}
}

/**
* @internal
*/
export class MongoOperationTimeoutError extends MongoRuntimeError {
override get name(): string {
return 'MongoOperationTimeoutError';
}
}

/**
* An error thrown when the user attempts to add options to a cursor that has already been
* initialized
Expand Down
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ export {
MongoNetworkTimeoutError,
MongoNotConnectedError,
MongoOIDCError,
MongoOperationTimeoutError,
MongoParseError,
MongoRuntimeError,
MongoServerClosedError,
Expand Down
2 changes: 2 additions & 0 deletions src/operations/command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ export interface OperationParent {
writeConcern?: WriteConcern;
readPreference?: ReadPreference;
bsonOptions?: BSONSerializeOptions;
timeoutMS?: number;
}

/** @internal */
Expand Down Expand Up @@ -117,6 +118,7 @@ export abstract class CommandOperation<T> extends AbstractOperation<T> {
const options = {
...this.options,
...this.bsonOptions,
timeout: this.timeout,
readPreference: this.readPreference,
session
};
Expand Down
8 changes: 7 additions & 1 deletion src/operations/execute_operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import {
} from '../sdam/server_selection';
import type { Topology } from '../sdam/topology';
import type { ClientSession } from '../sessions';
import { Timeout } from '../timeout';
import { squashError, supportsRetryableWrites } from '../utils';
import { AbstractOperation, Aspect } from './operation';

Expand Down Expand Up @@ -152,9 +153,13 @@ export async function executeOperation<
selector = readPreference;
}

const timeout = operation.timeoutMS != null ? Timeout.expires(operation.timeoutMS) : undefined;
operation.timeout = timeout;

const server = await topology.selectServer(selector, {
session,
operationName: operation.commandName
operationName: operation.commandName,
timeout
});

if (session == null) {
Expand Down Expand Up @@ -265,6 +270,7 @@ async function retryOperation<
// select a new server, and attempt to retry the operation
const server = await topology.selectServer(selector, {
session,
timeout: operation.timeout,
operationName: operation.commandName,
previousServer
});
Expand Down
3 changes: 2 additions & 1 deletion src/operations/find.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ export class FindOperation extends CommandOperation<Document> {
...this.options,
...this.bsonOptions,
documentsReturnedIn: 'firstBatch',
session
session,
timeout: this.timeout
},
undefined
);
Expand Down
8 changes: 8 additions & 0 deletions src/operations/operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { type BSONSerializeOptions, type Document, resolveBSONOptions } from '..
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
import type { Server } from '../sdam/server';
import type { ClientSession } from '../sessions';
import { type Timeout } from '../timeout';
import type { MongoDBNamespace } from '../utils';

export const Aspect = {
Expand Down Expand Up @@ -61,6 +62,11 @@ export abstract class AbstractOperation<TResult = any> {

options: OperationOptions;

/** @internal */
timeout?: Timeout;
/** @internal */
timeoutMS?: number;

[kSession]: ClientSession | undefined;

constructor(options: OperationOptions = {}) {
Expand All @@ -76,6 +82,8 @@ export abstract class AbstractOperation<TResult = any> {
this.options = options;
this.bypassPinningCheck = !!options.bypassPinningCheck;
this.trySecondaryWrite = false;

this.timeoutMS = options.timeoutMS;
}

/** Must match the first key of the command object sent to the server.
Expand Down
6 changes: 5 additions & 1 deletion src/operations/run_command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ export type RunCommandOptions = {
session?: ClientSession;
/** The read preference */
readPreference?: ReadPreferenceLike;
/** @internal */
timeoutMS?: number;
} & BSONSerializeOptions;

/** @internal */
Expand All @@ -31,6 +33,7 @@ export class RunCommandOperation<T = Document> extends AbstractOperation<T> {
const res: TODO_NODE_3286 = await server.command(this.ns, this.command, {
...this.options,
readPreference: this.readPreference,
timeout: this.timeout,
session
});
return res;
Expand Down Expand Up @@ -58,7 +61,8 @@ export class RunAdminCommandOperation<T = Document> extends AbstractOperation<T>
const res: TODO_NODE_3286 = await server.command(this.ns, this.command, {
...this.options,
readPreference: this.readPreference,
session
session,
timeout: this.timeout
});
return res;
}
Expand Down
3 changes: 2 additions & 1 deletion src/sdam/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
this.incrementOperationCount();
if (conn == null) {
try {
conn = await this.pool.checkOut();
conn = await this.pool.checkOut(options);
if (this.loadBalanced && isPinnableCommand(cmd, session)) {
session?.pin(conn);
}
Expand All @@ -333,6 +333,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
operationError.code === MONGODB_ERROR_CODES.Reauthenticate
) {
await this.pool.reauthenticate(conn);
// TODO(NODE-5682): Implement CSOT support for socket read/write at the connection layer
try {
return await conn.command(ns, cmd, finalOptions, responseType);
} catch (commandError) {
Expand Down
Loading

0 comments on commit 61564cd

Please sign in to comment.