Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(NODE-6171): RTT set to zero when serverMonitoringMode=stream #4110

Merged
merged 12 commits into from
May 15, 2024
16 changes: 7 additions & 9 deletions src/sdam/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,8 @@ export class Monitor extends TypedEventEmitter<MonitorEvents> {
return this.rttSampler.min();
}

get latestRtt(): number {
return this.rttSampler.last ?? 0; // FIXME: Check if this is acceptable
get latestRtt(): number | null {
return this.rttSampler.last;
}

addRttSample(rtt: number) {
Expand Down Expand Up @@ -304,7 +304,8 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
}

// NOTE: here we use the latestRtt as this measurement corresponds with the value
// obtained for this successful heartbeat
// obtained for this successful heartbeat, if there is no latestRtt, then we calculate the
// duration
const duration =
isAwaitable && monitor.rttPinger
? monitor.rttPinger.latestRtt ?? calculateDurationInMs(start)
Expand Down Expand Up @@ -498,7 +499,7 @@ export class RTTPinger {
this[kCancellationToken] = monitor[kCancellationToken];
this.closed = false;
this.monitor = monitor;
this.latestRtt = monitor.latestRtt;
this.latestRtt = monitor.latestRtt ?? undefined;

const heartbeatFrequencyMS = monitor.options.heartbeatFrequencyMS;
this[kMonitorId] = setTimeout(() => this.measureRoundTripTime(), heartbeatFrequencyMS);
Expand All @@ -520,10 +521,7 @@ export class RTTPinger {
this.connection = undefined;
}

private measureAndReschedule(start?: number, conn?: Connection) {
if (start == null) {
start = now();
}
private measureAndReschedule(start: number, conn?: Connection) {
if (this.closed) {
conn?.destroy();
return;
Expand Down Expand Up @@ -565,7 +563,7 @@ export class RTTPinger {
connection.serverApi?.version || connection.helloOk ? 'hello' : LEGACY_HELLO_COMMAND;
// eslint-disable-next-line github/no-then
connection.command(ns('admin.$cmd'), { [commandName]: 1 }, undefined).then(
() => this.measureAndReschedule(),
() => this.measureAndReschedule(start),
() => {
this.connection?.destroy();
this.connection = undefined;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
import { setTimeout } from 'node:timers/promises';

import { expect } from 'chai';
import * as sinon from 'sinon';

import {
Connection,
type MongoClient,
promiseWithResolvers,
type ServerHeartbeatSucceededEvent
} from '../../mongodb';
import { loadSpecTests } from '../../spec';
import { runUnifiedSuite } from '../../tools/unified-spec-runner/runner';

Expand All @@ -8,3 +19,105 @@ describe('SDAM Unified Tests (Node Driver)', function () {
);
runUnifiedSuite(clonedAndAlteredSpecTests);
});

describe('Monitoring rtt tests', function () {
let client: MongoClient;
let heartbeatDurations: Record<string, number[]>;
const HEARTBEATS_TO_COLLECT_PER_NODE = 65;
const IGNORE_SIZE = 5;
const DELAY_MS = 10;

beforeEach(function () {
heartbeatDurations = Object.create(null);
});

afterEach(async function () {
if (client) {
await client.close();
}
sinon.restore();
});

for (const serverMonitoringMode of ['poll', 'stream']) {
context(`when serverMonitoringMode is set to '${serverMonitoringMode}'`, function () {
context('after collecting a number of heartbeats', function () {
beforeEach(async function () {
client = this.configuration.newClient({
heartbeatFrequencyMS: 100,
serverMonitoringMode
});

// make sendCommand delay for DELAY_MS ms to ensure that the actual time between sending
// a heartbeat and receiving a response don't drop below 1ms. This is done since our
// testing is colocated with its mongo deployment so network latency is very low
const stub = sinon
// @ts-expect-error accessing private method
.stub(Connection.prototype, 'sendCommand')
.callsFake(async function* (...args) {
await setTimeout(DELAY_MS);
yield* stub.wrappedMethod.call(this, ...args);
});
await client.connect();

const { promise, resolve } = promiseWithResolvers<void>();
client.on('serverHeartbeatSucceeded', (ev: ServerHeartbeatSucceededEvent) => {
heartbeatDurations[ev.connectionId] ??= [];
if (
heartbeatDurations[ev.connectionId].length <
HEARTBEATS_TO_COLLECT_PER_NODE + IGNORE_SIZE
)
heartbeatDurations[ev.connectionId].push(ev.duration);

// We ignore the first few heartbeats since the problem reported in NODE-6172 showed that the
// first few heartbeats were recorded properly
if (
Object.keys(heartbeatDurations).length === client.topology.s.servers.size &&
Object.values(heartbeatDurations).every(
d => d.length === HEARTBEATS_TO_COLLECT_PER_NODE + IGNORE_SIZE
)
) {
client.removeAllListeners('serverHeartbeatSucceeded');
resolve();
}
});
await promise;
});

it(
'heartbeat duration is not incorrectly reported as zero on ServerHeartbeatSucceededEvents',
{
metadata: {
requires: { topology: '!load-balanced' }
},
test: async function () {
for (const durations of Object.values(heartbeatDurations)) {
const relevantDurations = durations.slice(IGNORE_SIZE);
expect(relevantDurations).to.have.length.gt(0);
const averageDuration =
relevantDurations.reduce((acc, x) => acc + x) / relevantDurations.length;
expect(averageDuration).to.be.gt(DELAY_MS);
}
}
}
);

it('ServerDescription.roundTripTime is not incorrectly reported as zero', {
metadata: {
requires: { topology: '!load-balanced' }
},
test: async function () {
for (const [server, durations] of Object.entries(heartbeatDurations)) {
const relevantDurations = durations.slice(IGNORE_SIZE);
expect(relevantDurations).to.have.length.gt(0);
const averageDuration =
relevantDurations.reduce((acc, x) => acc + x) / relevantDurations.length;
const rtt = client.topology.description.servers.get(server).roundTripTime;
expect(rtt).to.not.equal(0);
expect(rtt).to.be.approximately(averageDuration, 3);
}
}
});
});
});
}
});