Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(NODE-6171): RTT set to zero when serverMonitoringMode=stream #4110

Merged
merged 12 commits into from
May 15, 2024
16 changes: 7 additions & 9 deletions src/sdam/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,8 @@ export class Monitor extends TypedEventEmitter<MonitorEvents> {
return this.rttSampler.min();
}

get latestRtt(): number {
return this.rttSampler.last ?? 0; // FIXME: Check if this is acceptable
get latestRtt(): number | null {
return this.rttSampler.last;
}

addRttSample(rtt: number) {
Expand Down Expand Up @@ -304,7 +304,8 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
}

// NOTE: here we use the latestRtt as this measurement corresponds with the value
// obtained for this successful heartbeat
// obtained for this successful heartbeat, if there is no latestRtt, then we calculate the
// duration
const duration =
isAwaitable && monitor.rttPinger
? monitor.rttPinger.latestRtt ?? calculateDurationInMs(start)
Expand Down Expand Up @@ -498,7 +499,7 @@ export class RTTPinger {
this[kCancellationToken] = monitor[kCancellationToken];
this.closed = false;
this.monitor = monitor;
this.latestRtt = monitor.latestRtt;
this.latestRtt = monitor.latestRtt ?? undefined;

const heartbeatFrequencyMS = monitor.options.heartbeatFrequencyMS;
this[kMonitorId] = setTimeout(() => this.measureRoundTripTime(), heartbeatFrequencyMS);
Expand All @@ -520,10 +521,7 @@ export class RTTPinger {
this.connection = undefined;
}

private measureAndReschedule(start?: number, conn?: Connection) {
if (start == null) {
start = now();
}
private measureAndReschedule(start: number, conn?: Connection) {
if (this.closed) {
conn?.destroy();
return;
Expand Down Expand Up @@ -565,7 +563,7 @@ export class RTTPinger {
connection.serverApi?.version || connection.helloOk ? 'hello' : LEGACY_HELLO_COMMAND;
// eslint-disable-next-line github/no-then
connection.command(ns('admin.$cmd'), { [commandName]: 1 }, undefined).then(
() => this.measureAndReschedule(),
() => this.measureAndReschedule(start),
() => {
this.connection?.destroy();
this.connection = undefined;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
import { EventEmitter, once } from 'node:events';
import { setTimeout } from 'node:timers/promises';

import { expect } from 'chai';
import * as sinon from 'sinon';

import { Connection, type MongoClient, type ServerHeartbeatSucceededEvent } from '../../mongodb';
import { loadSpecTests } from '../../spec';
import { runUnifiedSuite } from '../../tools/unified-spec-runner/runner';

Expand All @@ -8,3 +15,112 @@ describe('SDAM Unified Tests (Node Driver)', function () {
);
runUnifiedSuite(clonedAndAlteredSpecTests);
});

describe('Monitoring rtt tests', function () {
let client: MongoClient;
let heartbeatDurations: Record<string, number[]>;
const HEARTBEATS_TO_COLLECT = 200; // Wait for 200 total heartbeats. This is high enough to work for standalone, sharded and our typical 3-node replica set topology tests
const IGNORE_SIZE = 20;
const DELAY_MS = 10;
let count: number;
const ee = new EventEmitter();

const listener = (ev: ServerHeartbeatSucceededEvent) => {
if (!client.topology.s.servers.has(ev.connectionId)) return;
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
count++;
if (count < IGNORE_SIZE) {
return;
}
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved

heartbeatDurations[ev.connectionId].push(ev.duration);

nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
// We ignore the first few heartbeats since the problem reported in NODE-6172 showed that the
// first few heartbeats were recorded properly
if (count === IGNORE_SIZE) {
return;
}

if (count >= HEARTBEATS_TO_COLLECT + IGNORE_SIZE) {
client.off('serverHeartbeatSucceeded', listener);
ee.emit('done');
}
};

beforeEach(function () {
count = 0;
heartbeatDurations = {};
});

afterEach(async function () {
if (client) {
await client.close();
}
sinon.restore();
});

for (const serverMonitoringMode of ['poll', 'stream']) {
context(`when serverMonitoringMode is set to '${serverMonitoringMode}'`, function () {
context('after collecting a number of heartbeats', function () {
beforeEach(async function () {
client = this.configuration.newClient({
heartbeatFrequencyMS: 100,
serverMonitoringMode
});

// make send command delay for DELAY_MS ms to ensure that the actual time between sending
// a heartbeat and receiving a response don't drop below 1ms. This is done since our
// testing is colocated with its mongo deployment so network latency is very low
const stub = sinon
// @ts-expect-error accessing private method
.stub(Connection.prototype, 'sendCommand')
.callsFake(async function* (...args) {
await setTimeout(DELAY_MS);
yield* stub.wrappedMethod.call(this, ...args);
});
await client.connect();

client.on('serverHeartbeatSucceeded', listener);

for (const k of client.topology.s.servers.keys()) {
heartbeatDurations[k] = [];
}

await once(ee, 'done');
});

it(
'heartbeat duration is not incorrectly reported as zero on ServerHeartbeatSucceededEvents',
{
metadata: {
requires: { topology: '!load-balanced' }
},
test: async function () {
for (const server in heartbeatDurations) {
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
const averageDuration =
heartbeatDurations[server].reduce((acc, x) => acc + x) /
heartbeatDurations[server].length;
expect(averageDuration).to.be.gt(DELAY_MS);
}
}
}
);

it('ServerDescription.roundTripTime is not incorrectly reported as zero', {
metadata: {
requires: { topology: '!load-balanced' }
},
test: async function () {
for (const server in heartbeatDurations) {
const averageDuration =
heartbeatDurations[server].reduce((acc, x) => acc + x) /
heartbeatDurations[server].length;
expect(
client.topology.description.servers.get(server).roundTripTime
).to.be.approximately(averageDuration, 1);
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
}
}
});
});
});
}
});