From d5310acd73a4e75f3531b2d59dc21bae3e081ac1 Mon Sep 17 00:00:00 2001 From: Reinhard Steiner Date: Thu, 10 Jun 2021 23:10:53 +0200 Subject: [PATCH] Merge master registries with cluster workers (#183) --- README.md | 2 +- lib/cluster.js | 76 +++++++++++++++++++++++++++++++++++--------------- 2 files changed, 55 insertions(+), 23 deletions(-) diff --git a/README.md b/README.md index 67d3f0ad..aba57a8e 100644 --- a/README.md +++ b/README.md @@ -30,7 +30,7 @@ label. (See `example/server.js` for an example using Metrics are aggregated from the global registry by default. To use a different registry, call `client.AggregatorRegistry.setRegistries(registryOrArrayOfRegistries)` from the -worker processes. +master or worker processes. ## API diff --git a/lib/cluster.js b/lib/cluster.js index 822214ca..161b75fa 100644 --- a/lib/cluster.js +++ b/lib/cluster.js @@ -2,7 +2,7 @@ /** * Extends the Registry class with a `clusterMetrics` method that returns - * aggregated metrics for all workers. + * aggregated metrics for master and all workers. * * In cluster workers, listens for and responds to requests for metrics by the * cluster master. @@ -34,8 +34,8 @@ class AggregatorRegistry extends Registry { } /** - * Gets aggregated metrics for all workers. The optional callback and - * returned Promise resolve with the same value; either may be used. + * Gets aggregated metrics for master and all workers. The optional callback + * and returned Promise resolve with the same value; either may be used. * @return {Promise} Promise that resolves with the aggregated * metrics. */ @@ -62,6 +62,27 @@ class AggregatorRegistry extends Registry { }; requests.set(requestId, request); + // Get metrics from master + if (registries && registries.length > 0) { + Promise.all(registries.map(r => r.getMetricsAsJSON())) + .then(metrics => { + processMetricsResponse({ + type: GET_METRICS_RES, + requestId, + metrics, + }); + }) + .catch(error => { + processMetricsResponse({ + type: GET_METRICS_RES, + requestId, + error: error.message, + }); + }); + request.pending++; + } + + // Get metrics from workers const message = { type: GET_METRICS_REQ, requestId, @@ -78,6 +99,7 @@ class AggregatorRegistry extends Registry { if (request.pending === 0) { // No workers were up + requests.delete(requestId); clearTimeout(request.errorTimeout); process.nextTick(() => done(null, '')); } @@ -145,6 +167,34 @@ class AggregatorRegistry extends Registry { } } +/** + * Adds metrics from master and worker to request and finalizes request when + * all metrics are collected. + * @param {object} message - GET_METRICS_RES message object containing metrics + * @return {void} + */ +function processMetricsResponse(message) { + const request = requests.get(message.requestId); + + if (message.error) { + request.done(new Error(message.error)); + return; + } + + message.metrics.forEach(registry => request.responses.push(registry)); + request.pending--; + + if (request.pending === 0) { + // finalize + requests.delete(message.requestId); + clearTimeout(request.errorTimeout); + + const registry = AggregatorRegistry.aggregate(request.responses); + const promString = registry.metrics().then(metrics => metrics.trim()); + request.done(null, promString); + } +} + /** * Adds event listeners for cluster aggregation. Idempotent (safe to call more * than once). @@ -158,25 +208,7 @@ function addListeners() { // Listen for worker responses to requests for local metrics cluster().on('message', (worker, message) => { if (message.type === GET_METRICS_RES) { - const request = requests.get(message.requestId); - - if (message.error) { - request.done(new Error(message.error)); - return; - } - - message.metrics.forEach(registry => request.responses.push(registry)); - request.pending--; - - if (request.pending === 0) { - // finalize - requests.delete(message.requestId); - clearTimeout(request.errorTimeout); - - const registry = AggregatorRegistry.aggregate(request.responses); - const promString = registry.metrics(); - request.done(null, promString); - } + processMetricsResponse(message); } }); }