Skip to content

Commit

Permalink
refactor(NODE-5742): add streaming responses support (#3944)
Browse files Browse the repository at this point in the history
Co-authored-by: Durran Jordan <[email protected]>
  • Loading branch information
nbbeeken and durran authored Dec 13, 2023
1 parent b13144a commit 70a2ef9
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 165 deletions.
221 changes: 114 additions & 107 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,6 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
lastHelloMS?: number;
serverApi?: ServerApi;
helloOk?: boolean;
commandAsync: (
ns: MongoDBNamespace,
cmd: Document,
options: CommandOptions | undefined
) => Promise<Document>;
/** @internal */
authContext?: AuthContext;

Expand Down Expand Up @@ -217,15 +212,6 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
constructor(stream: Stream, options: ConnectionOptions) {
super();

this.commandAsync = promisify(
(
ns: MongoDBNamespace,
cmd: Document,
options: CommandOptions | undefined,
callback: Callback
) => this.command(ns, cmd, options, callback as any)
);

this.id = options.id;
this.address = streamIdentifier(stream, options);
this.socketTimeoutMS = options.socketTimeoutMS ?? 0;
Expand Down Expand Up @@ -262,6 +248,12 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
this[kMessageStream].pipe(this[kStream]);
}

// This whole class is temporary,
// Need to move this to be defined on the prototype for spying.
async commandAsync(ns: MongoDBNamespace, cmd: Document, opt?: CommandOptions) {
return promisify(this.command.bind(this))(ns, cmd, opt);
}

get description(): StreamDescription {
return this[kDescription];
}
Expand Down Expand Up @@ -791,7 +783,6 @@ export class ModernConnection extends TypedEventEmitter<ConnectionEvents> {
lastHelloMS?: number;
serverApi?: ServerApi;
helloOk?: boolean;
commandAsync: ModernConnection['command'];
/** @internal */
authContext?: AuthContext;

Expand Down Expand Up @@ -831,8 +822,6 @@ export class ModernConnection extends TypedEventEmitter<ConnectionEvents> {
constructor(stream: Stream, options: ConnectionOptions) {
super();

this.commandAsync = this.command.bind(this);

this.id = options.id;
this.address = streamIdentifier(stream, options);
this.socketTimeoutMS = options.socketTimeoutMS ?? 0;
Expand All @@ -852,6 +841,10 @@ export class ModernConnection extends TypedEventEmitter<ConnectionEvents> {
this.socket.on('timeout', this.onTimeout.bind(this));
}

async commandAsync(...args: Parameters<typeof this.command>) {
return this.command(...args);
}

/** Indicates that the connection (including underlying TCP socket) has been closed. */
get closed(): boolean {
return this.controller.signal.aborted;
Expand Down Expand Up @@ -1036,62 +1029,68 @@ export class ModernConnection extends TypedEventEmitter<ConnectionEvents> {
return message;
}

private async sendCommand(
message: WriteProtocolMessageType,
options: CommandOptions
): Promise<Document> {
const { signal } = this.controller;

signal.throwIfAborted();
private async *sendWire(message: WriteProtocolMessageType, options: CommandOptions) {
this.controller.signal.throwIfAborted();

if (typeof options.socketTimeoutMS === 'number') {
this.socket.setTimeout(options.socketTimeoutMS);
} else if (this.socketTimeoutMS !== 0) {
this.socket.setTimeout(this.socketTimeoutMS);
}

let response;
try {
await writeCommand(this, message, {
agreedCompressor: this.description.compressor ?? 'none',
zlibCompressionLevel: this.description.zlibCompressionLevel,
signal
signal: this.controller.signal
});

if (options.noResponse) return { ok: 1 };
// TODO(NODE-5770): Replace controller to avoid boundless 'abort' listeners
this.controller = new AbortController();

signal.throwIfAborted();
if (options.noResponse) {
yield { ok: 1 };
return;
}

response = await read(this, { signal });
} finally {
// TODO(NODE-5770): Replace controller to avoid boundless 'abort' listeners
if (!signal.aborted) this.controller = new AbortController();
}
this.controller.signal.throwIfAborted();

response.parse(options);
for await (const response of readMany(this, { signal: this.controller.signal })) {
this.socket.setTimeout(0);
response.parse(options);

const [document] = response.documents;
const [document] = response.documents;

if (!Buffer.isBuffer(document)) {
const { session } = options;
if (session) {
updateSessionFromResponse(session, document);
}
if (!Buffer.isBuffer(document)) {
const { session } = options;
if (session) {
updateSessionFromResponse(session, document);
}

if (document.$clusterTime) {
this[kClusterTime] = document.$clusterTime;
this.emit(Connection.CLUSTER_TIME_RECEIVED, document.$clusterTime);
if (document.$clusterTime) {
this[kClusterTime] = document.$clusterTime;
this.emit(Connection.CLUSTER_TIME_RECEIVED, document.$clusterTime);
}
}

// TODO(NODE-5770): Replace controller to avoid boundless 'abort' listeners
this.controller = new AbortController();

yield document;
this.controller.signal.throwIfAborted();

if (typeof options.socketTimeoutMS === 'number') {
this.socket.setTimeout(options.socketTimeoutMS);
} else if (this.socketTimeoutMS !== 0) {
this.socket.setTimeout(this.socketTimeoutMS);
}
}
} finally {
this.socket.setTimeout(0);
}

return document;
}

async command(
ns: MongoDBNamespace,
command: Document,
options: CommandOptions = {}
): Promise<Document> {
async *sendCommand(ns: MongoDBNamespace, command: Document, options: CommandOptions = {}) {
const message = this.prepareCommand(ns.db, command, options);

let started = 0;
Expand All @@ -1103,76 +1102,84 @@ export class ModernConnection extends TypedEventEmitter<ConnectionEvents> {
);
}

let document = null;
let document;
try {
document = await this.sendCommand(message, options);
} catch (ioError) {
if (this.monitorCommands) {
this.emit(
ModernConnection.COMMAND_FAILED,
new CommandFailedEvent(this as unknown as Connection, message, ioError, started)
);
}
throw ioError;
}
this.controller.signal.throwIfAborted();
for await (document of this.sendWire(message, options)) {
if (!Buffer.isBuffer(document) && document.writeConcernError) {
throw new MongoWriteConcernError(document.writeConcernError, document);
}

if (document == null) {
const unexpected = new MongoUnexpectedServerResponseError(
'sendCommand did not throw and did not return a document'
);
if (this.monitorCommands) {
this.emit(
ModernConnection.COMMAND_FAILED,
new CommandFailedEvent(this as unknown as Connection, message, unexpected, started)
);
}
throw unexpected;
}
if (
!Buffer.isBuffer(document) &&
(document.ok === 0 || document.$err || document.errmsg || document.code)
) {
throw new MongoServerError(document);
}

if (document.writeConcernError) {
const writeConcernError = new MongoWriteConcernError(document.writeConcernError, document);
if (this.monitorCommands) {
this.emit(
ModernConnection.COMMAND_SUCCEEDED,
new CommandSucceededEvent(this as unknown as Connection, message, document, started)
);
}
throw writeConcernError;
}
if (this.monitorCommands) {
this.emit(
ModernConnection.COMMAND_SUCCEEDED,
new CommandSucceededEvent(
this as unknown as Connection,
message,
options.noResponse ? undefined : document,
started
)
);
}

if (document.ok === 0 || document.$err || document.errmsg || document.code) {
const serverError = new MongoServerError(document);
yield document;
this.controller.signal.throwIfAborted();
}
} catch (error) {
if (this.monitorCommands) {
this.emit(
ModernConnection.COMMAND_FAILED,
new CommandFailedEvent(this as unknown as Connection, message, serverError, started)
);
error.name === 'MongoWriteConcernError'
? this.emit(
ModernConnection.COMMAND_SUCCEEDED,
new CommandSucceededEvent(
this as unknown as Connection,
message,
options.noResponse ? undefined : document,
started
)
)
: this.emit(
ModernConnection.COMMAND_FAILED,
new CommandFailedEvent(this as unknown as Connection, message, error, started)
);
}
throw serverError;
throw error;
}
}

if (this.monitorCommands) {
this.emit(
ModernConnection.COMMAND_SUCCEEDED,
new CommandSucceededEvent(
this as unknown as Connection,
message,
options.noResponse ? undefined : document,
started
)
);
async command(
ns: MongoDBNamespace,
command: Document,
options: CommandOptions = {}
): Promise<Document> {
this.controller.signal.throwIfAborted();
for await (const document of this.sendCommand(ns, command, options)) {
return document;
}

return document;
throw new MongoUnexpectedServerResponseError('Unable to get response from server');
}

exhaustCommand(
_ns: MongoDBNamespace,
_command: Document,
_options: CommandOptions,
_replyListener: Callback
ns: MongoDBNamespace,
command: Document,
options: CommandOptions,
replyListener: Callback
) {
throw new Error('NODE-5742: not implemented.');
const exhaustLoop = async () => {
this.controller.signal.throwIfAborted();
for await (const reply of this.sendCommand(ns, command, options)) {
replyListener(undefined, reply);
this.controller.signal.throwIfAborted();
}
throw new MongoUnexpectedServerResponseError('Server ended moreToCome unexpectedly');
};
exhaustLoop().catch(replyListener);
}
}

Expand Down
6 changes: 2 additions & 4 deletions src/sdam/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { clearTimeout, setTimeout } from 'timers';

import { type Document, Long } from '../bson';
import { connect } from '../cmap/connect';
import { Connection, type ConnectionOptions } from '../cmap/connection';
import type { Connection, ConnectionOptions } from '../cmap/connection';
import { getFAASEnv } from '../cmap/handshake/client_metadata';
import { LEGACY_HELLO_COMMAND } from '../constants';
import { MongoError, MongoErrorLabel, MongoNetworkTimeoutError } from '../error';
Expand Down Expand Up @@ -132,9 +132,7 @@ export class Monitor extends TypedEventEmitter<MonitorEvents> {
useBigInt64: false,
promoteLongs: true,
promoteValues: true,
promoteBuffers: true,
// TODO(NODE-5741): override monitors to use old connection
connectionType: Connection
promoteBuffers: true
};

// ensure no authentication is used for monitoring
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ describe('class RTTPinger', () => {
const rttPingers = await getRTTPingers(client);

for (const rtt of rttPingers) {
sinon.stub(rtt.connection, 'command').yieldsRight(new Error('any error'));
sinon.stub(rtt.connection, 'commandAsync').rejects(new Error('any error'));
}
const spies = rttPingers.map(rtt => sinon.spy(rtt.connection, 'destroy'));

Expand Down
37 changes: 22 additions & 15 deletions test/integration/mongodb-handshake/mongodb-handshake.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import * as sinon from 'sinon';
import {
Connection,
LEGACY_HELLO_COMMAND,
ModernConnection,
MongoServerError,
MongoServerSelectionError,
OpMsgRequest,
Expand All @@ -19,20 +20,24 @@ describe('MongoDB Handshake', () => {

context('when hello is too large', () => {
before(() => {
sinon.stub(Connection.prototype, 'command').callsFake(function (ns, cmd, options, callback) {
// @ts-expect-error: sinon will place wrappedMethod there
const command = Connection.prototype.command.wrappedMethod.bind(this);

if (cmd.hello || cmd[LEGACY_HELLO_COMMAND]) {
return command(
ns,
{ ...cmd, client: { driver: { name: 'a'.repeat(1000) } } },
options,
callback
);
}
return command(ns, cmd, options, callback);
});
const connectionType =
process.env.MONGODB_NEW_CONNECTION === 'true' ? ModernConnection : Connection;

sinon
.stub(connectionType.prototype, 'commandAsync')
.callsFake(async function (ns, cmd, options) {
// @ts-expect-error: sinon will place wrappedMethod there
const commandAsync = connectionType.prototype.commandAsync.wrappedMethod.bind(this);

if (cmd.hello || cmd[LEGACY_HELLO_COMMAND]) {
return commandAsync(
ns,
{ ...cmd, client: { driver: { name: 'a'.repeat(1000) } } },
options
);
}
return commandAsync(ns, cmd, options);
});
});

after(() => sinon.restore());
Expand All @@ -53,7 +58,9 @@ describe('MongoDB Handshake', () => {
let spy: Sinon.SinonSpy;

before(() => {
spy = sinon.spy(Connection.prototype, 'command');
const connectionType =
process.env.MONGODB_NEW_CONNECTION === 'true' ? ModernConnection : Connection;
spy = sinon.spy(connectionType.prototype, 'commandAsync');
});

after(() => sinon.restore());
Expand Down
Loading

0 comments on commit 70a2ef9

Please sign in to comment.