Skip to content

Commit

Permalink
use proper label in queue populator metrics
Browse files Browse the repository at this point in the history
Queue Populator can be used for a single or multiple
extensions at once. Currently the 'origin' label in the
metrics is always set to "replication" which is not always
true,  we change this to make it hold the list of loaded
extensions in the current queue populator instance.

Issue: BB-527
  • Loading branch information
Kerkesni committed Jan 16, 2025
1 parent d561e49 commit 34de938
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 14 deletions.
4 changes: 3 additions & 1 deletion lib/queuePopulator/BucketFileLogReader.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ const LogReader = require('./LogReader');
class BucketFileLogReader extends LogReader {
constructor(params) {
const { zkClient, kafkaConfig, dmdConfig, logger,
extensions, metricsProducer, metricsHandler } = params;
extensions, extensionNames, metricsProducer, metricsHandler } = params;
super({ zkClient, kafkaConfig, logConsumer: null,
logId: `bucketFile_${dmdConfig.logName}`, logger, extensions,
metricsProducer, metricsHandler });

this._dmdConfig = dmdConfig;
this._log = logger;
this._extensionNames = extensionNames;
this._log.info('initializing bucketfile log reader', {
method: 'BucketFileLogReader.constructor',
dmdConfig,
Expand Down Expand Up @@ -45,6 +46,7 @@ class BucketFileLogReader extends LogReader {

getMetricLabels() {
return {
origin: this._extensionNames,
logName: 'bucket-file',
logId: this._dmdConfig.logName,
};
Expand Down
4 changes: 3 additions & 1 deletion lib/queuePopulator/KafkaLogReader.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class KafkaLogReader extends LogReader {
*/
constructor(params) {
const { zkClient, kafkaConfig, zkConfig, qpKafkaConfig,
logger, extensions, metricsProducer, metricsHandler } = params;
logger, extensions, extensionNames, metricsProducer, metricsHandler } = params;
// conf contains global kafka and queuePoplator kafka configs
const conf = {
hosts: kafkaConfig.hosts,
Expand All @@ -33,6 +33,7 @@ class KafkaLogReader extends LogReader {
logId: `kafka_${qpKafkaConfig.logName}`, logger, extensions,
metricsProducer, metricsHandler });
this._kafkaConfig = conf;
this._extensionNames = extensionNames;
}

/**
Expand Down Expand Up @@ -67,6 +68,7 @@ class KafkaLogReader extends LogReader {
*/
getMetricLabels() {
return {
origin: this._extensionNames,
logName: 'kafka-log',
logId: this._kafkaConfig.logName,
};
Expand Down
4 changes: 3 additions & 1 deletion lib/queuePopulator/MongoLogReader.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ const LogReader = require('./LogReader');
class MongoLogReader extends LogReader {
constructor(params) {
const { zkClient, kafkaConfig, zkConfig, mongoConfig,
logger, extensions, metricsProducer, metricsHandler } = params;
logger, extensions, extensionNames, metricsProducer, metricsHandler } = params;
logger.info('initializing mongo log reader',
{ method: 'MongoLogReader.constructor',
mongoConfig });
Expand All @@ -14,6 +14,7 @@ class MongoLogReader extends LogReader {
logId: `mongo_${mongoConfig.logName}`, logger, extensions,
metricsProducer, metricsHandler });
this._mongoConfig = mongoConfig;
this._extensionNames = extensionNames;
}

/**
Expand Down Expand Up @@ -44,6 +45,7 @@ class MongoLogReader extends LogReader {

getMetricLabels() {
return {
origin: this._extensionNames,
logName: 'mongo-log',
logId: this._mongoConfig.logName,
};
Expand Down
21 changes: 11 additions & 10 deletions lib/queuePopulator/QueuePopulator.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,6 @@ const byteMetrics = ZenkoMetrics.createCounter({
labelNames: metricLabels,
});

const defaultLabels = {
origin: 'replication',
};

const notificationEvent = ZenkoMetrics.createCounter({
name: 's3_notification_queue_populator_events_total',
help: 'Total number of oplog events processed by notification extension',
Expand All @@ -90,12 +86,12 @@ const notificationEvent = ZenkoMetrics.createCounter({
* @property {GaugeSet} logSize - Set the log size metric
*/
const metricsHandler = {
messages: wrapCounterInc(messageMetrics, defaultLabels),
objects: wrapCounterInc(objectMetrics, defaultLabels),
bytes: wrapCounterInc(byteMetrics, defaultLabels),
logReadOffset: wrapGaugeSet(logReadOffsetMetric, defaultLabels),
logSize: wrapGaugeSet(logSizeMetric, defaultLabels),
logTimestamp: wrapGaugeSet(logTimestamp, defaultLabels),
messages: wrapCounterInc(messageMetrics, {}),
objects: wrapCounterInc(objectMetrics, {}),
bytes: wrapCounterInc(byteMetrics, {}),
logReadOffset: wrapGaugeSet(logReadOffsetMetric, {}),
logSize: wrapGaugeSet(logSizeMetric, {}),
logTimestamp: wrapGaugeSet(logTimestamp, {}),
notifEvent: wrapCounterInc(notificationEvent, {}),
};

Expand Down Expand Up @@ -301,6 +297,7 @@ class QueuePopulator {
}

_setupLogSources() {
const extensionNames = this._loadedExtensions.join(',');

Check warning on line 300 in lib/queuePopulator/QueuePopulator.js

View check run for this annotation

Codecov / codecov/patch/Backbeat

lib/queuePopulator/QueuePopulator.js#L300

Added line #L300 was not covered by tests
switch (this.qpConfig.logSource) {
case 'bucketd':
// initialization of log source is deferred until the
Expand All @@ -317,6 +314,7 @@ class QueuePopulator {
mongoConfig: this.qpConfig.mongo,
logger: this.log,
extensions: this._extensions,
extensionNames,
metricsProducer: this._mProducer,
metricsHandler,
}),
Expand All @@ -334,6 +332,7 @@ class QueuePopulator {
),
logger: this.log,
extensions: this._extensions,
extensionNames,
metricsProducer: this._mProducer,
metricsHandler,
}),
Expand All @@ -347,6 +346,7 @@ class QueuePopulator {
dmdConfig: this.qpConfig.dmd,
logger: this.log,
extensions: this._extensions,
extensionNames,
metricsProducer: this._mProducer,
metricsHandler,
}),
Expand Down Expand Up @@ -385,6 +385,7 @@ class QueuePopulator {
raftId: token,
logger: this.log,
extensions: this._extensions,
extensionNames: this._loadedExtensions.join(','),
metricsProducer: this._mProducer,
metricsHandler,
}));
Expand Down
4 changes: 3 additions & 1 deletion lib/queuePopulator/RaftLogReader.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const LogReader = require('./LogReader');
class RaftLogReader extends LogReader {
constructor(params) {
const { zkClient, kafkaConfig, bucketdConfig, httpsConfig,
raftId, logger, extensions, metricsProducer, metricsHandler } = params;
raftId, logger, extensions, extensionNames, metricsProducer, metricsHandler } = params;
const { host, port } = bucketdConfig;
logger.info('initializing raft log reader',
{ method: 'RaftLogReader.constructor',
Expand All @@ -27,6 +27,7 @@ class RaftLogReader extends LogReader {
super({ zkClient, kafkaConfig, logConsumer, logId: `raft_${raftId}`,
logger, extensions, metricsProducer, metricsHandler });
this.raftId = raftId;
this._extensionNames = extensionNames;
}

getLogInfo() {
Expand All @@ -35,6 +36,7 @@ class RaftLogReader extends LogReader {

getMetricLabels() {
return {
origin: this._extensionNames,
logName: 'raft-log',
logId: this.raftId,
};
Expand Down
67 changes: 67 additions & 0 deletions tests/unit/lib/queuePopulator/LogReader.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ const { errors } = require('arsenal');
const { Logger } = require('werelogs');

const LogReader = require('../../../../lib/queuePopulator/LogReader');
const KafkaLogReader = require('../../../../lib/queuePopulator/KafkaLogReader');
const BucketFileLogReader = require('../../../../lib/queuePopulator/BucketFileLogReader');
const RaftLogReader = require('../../../../lib/queuePopulator/RaftLogReader');
const MongoLogReader = require('../../../../lib/queuePopulator/MongoLogReader');


class MockLogConsumer {
Expand Down Expand Up @@ -505,4 +509,67 @@ describe('LogReader', () => {
});
});
});

describe('getMetricLabels', () => {
[{
name: 'KafkaLogReader',
Reader: KafkaLogReader,
config: {
kafkaConfig: {
hosts: 'localhost:9092',
},
qpKafkaConfig: {
logName: 'test-log',
},
},
logName: 'kafka-log',
}, {
name: 'BucketFileLogReader',
Reader: BucketFileLogReader,
config: {
dmdConfig: {
logName: 'test-log',
host: 'localhost',
port: 8000,
},
},
logName: 'bucket-file',
}, {
name: 'RaftLogReader',
Reader: RaftLogReader,
config: {
raftId: 'test-log',
bucketdConfig: {
host: 'localhost',
port: 8000,
},
},
logName: 'raft-log',
}, {
name: 'RaftLogReader',
Reader: MongoLogReader,
config: {
mongoConfig: {
logName: 'test-log',
host: 'localhost',
port: 8000,
},
},
logName: 'mongo-log',
}].forEach(params => {
it(`should return proper ${params.name} metrics labels`, () => {
const reader = new params.Reader({
...params.config,
logger: new Logger('test:LogReader'),
extensionNames: 'replication,lifecycle,notification',
});
const expectedLabels = {
logId: 'test-log',
logName: params.logName,
origin: 'replication,lifecycle,notification',
};
assert.deepStrictEqual(reader.getMetricLabels(), expectedLabels);
});
});
});
});

0 comments on commit 34de938

Please sign in to comment.