diff --git a/lib/queuePopulator/BucketFileLogReader.js b/lib/queuePopulator/BucketFileLogReader.js
index dcf5b7dad..af98abe51 100644
--- a/lib/queuePopulator/BucketFileLogReader.js
+++ b/lib/queuePopulator/BucketFileLogReader.js
@@ -6,13 +6,14 @@ const LogReader = require('./LogReader');
 class BucketFileLogReader extends LogReader {
     constructor(params) {
         const { zkClient, kafkaConfig, dmdConfig, logger,
-            extensions, metricsProducer, metricsHandler } = params;
+            extensions, extensionNames, metricsProducer, metricsHandler } = params;
         super({ zkClient, kafkaConfig, logConsumer: null,
             logId: `bucketFile_${dmdConfig.logName}`, logger,
             extensions, metricsProducer, metricsHandler });
 
         this._dmdConfig = dmdConfig;
         this._log = logger;
+        this._extensionNames = extensionNames;
         this._log.info('initializing bucketfile log reader', {
             method: 'BucketFileLogReader.constructor',
             dmdConfig,
@@ -45,6 +46,7 @@ class BucketFileLogReader extends LogReader {
 
     getMetricLabels() {
         return {
+            origin: this._extensionNames,
             logName: 'bucket-file',
             logId: this._dmdConfig.logName,
         };
diff --git a/lib/queuePopulator/KafkaLogReader.js b/lib/queuePopulator/KafkaLogReader.js
index a5be6cae4..0d7637321 100644
--- a/lib/queuePopulator/KafkaLogReader.js
+++ b/lib/queuePopulator/KafkaLogReader.js
@@ -19,7 +19,7 @@
      */
     constructor(params) {
         const { zkClient, kafkaConfig, zkConfig, qpKafkaConfig,
-            logger, extensions, metricsProducer, metricsHandler } = params;
+            logger, extensions, extensionNames, metricsProducer, metricsHandler } = params;
         // conf contains global kafka and queuePoplator kafka configs
         const conf = {
             hosts: kafkaConfig.hosts,
@@ -33,6 +33,7 @@
             logId: `kafka_${qpKafkaConfig.logName}`, logger,
             extensions, metricsProducer, metricsHandler });
         this._kafkaConfig = conf;
+        this._extensionNames = extensionNames;
     }
 
     /**
@@ -67,6 +68,7 @@
      */
     getMetricLabels() {
         return {
+            origin: this._extensionNames,
             logName: 'kafka-log',
             logId: this._kafkaConfig.logName,
         };
diff --git a/lib/queuePopulator/MongoLogReader.js b/lib/queuePopulator/MongoLogReader.js
index ac9a93568..e215214f2 100644
--- a/lib/queuePopulator/MongoLogReader.js
+++ b/lib/queuePopulator/MongoLogReader.js
@@ -5,7 +5,7 @@ const LogReader = require('./LogReader');
 class MongoLogReader extends LogReader {
     constructor(params) {
         const { zkClient, kafkaConfig, zkConfig, mongoConfig,
-            logger, extensions, metricsProducer, metricsHandler } = params;
+            logger, extensions, extensionNames, metricsProducer, metricsHandler } = params;
         logger.info('initializing mongo log reader', {
             method: 'MongoLogReader.constructor',
             mongoConfig });
@@ -14,6 +14,7 @@
             logId: `mongo_${mongoConfig.logName}`, logger,
             extensions, metricsProducer, metricsHandler });
         this._mongoConfig = mongoConfig;
+        this._extensionNames = extensionNames;
     }
 
     /**
@@ -44,6 +45,7 @@
 
     getMetricLabels() {
         return {
+            origin: this._extensionNames,
             logName: 'mongo-log',
             logId: this._mongoConfig.logName,
         };
diff --git a/lib/queuePopulator/QueuePopulator.js b/lib/queuePopulator/QueuePopulator.js
index 45a38bfdc..85b9b596a 100644
--- a/lib/queuePopulator/QueuePopulator.js
+++ b/lib/queuePopulator/QueuePopulator.js
@@ -71,10 +71,6 @@ const byteMetrics = ZenkoMetrics.createCounter({
     labelNames: metricLabels,
 });
 
-const defaultLabels = {
-    origin: 'replication',
-};
-
 const notificationEvent = ZenkoMetrics.createCounter({
     name: 's3_notification_queue_populator_events_total',
     help: 'Total number of oplog events processed by notification extension',
@@ -90,12 +86,12 @@ const notificationEvent = ZenkoMetrics.createCounter({
  * @property {GaugeSet} logSize - Set the log size metric
  */
 const metricsHandler = {
-    messages: wrapCounterInc(messageMetrics, defaultLabels),
-    objects: wrapCounterInc(objectMetrics, defaultLabels),
-    bytes: wrapCounterInc(byteMetrics, defaultLabels),
-    logReadOffset: wrapGaugeSet(logReadOffsetMetric, defaultLabels),
-    logSize: wrapGaugeSet(logSizeMetric, defaultLabels),
-    logTimestamp: wrapGaugeSet(logTimestamp, defaultLabels),
+    messages: wrapCounterInc(messageMetrics, {}),
+    objects: wrapCounterInc(objectMetrics, {}),
+    bytes: wrapCounterInc(byteMetrics, {}),
+    logReadOffset: wrapGaugeSet(logReadOffsetMetric, {}),
+    logSize: wrapGaugeSet(logSizeMetric, {}),
+    logTimestamp: wrapGaugeSet(logTimestamp, {}),
     notifEvent: wrapCounterInc(notificationEvent, {}),
 };
 
@@ -301,6 +297,7 @@
     }
 
     _setupLogSources() {
+        const extensionNames = this._loadedExtensions.join(',');
         switch (this.qpConfig.logSource) {
         case 'bucketd':
             // initialization of log source is deferred until the
@@ -317,6 +314,7 @@
                     mongoConfig: this.qpConfig.mongo,
                     logger: this.log,
                     extensions: this._extensions,
+                    extensionNames,
                     metricsProducer: this._mProducer,
                     metricsHandler,
                 }),
@@ -334,6 +332,7 @@
                     ),
                     logger: this.log,
                     extensions: this._extensions,
+                    extensionNames,
                     metricsProducer: this._mProducer,
                     metricsHandler,
                 }),
@@ -347,6 +346,7 @@
                     dmdConfig: this.qpConfig.dmd,
                     logger: this.log,
                     extensions: this._extensions,
+                    extensionNames,
                     metricsProducer: this._mProducer,
                     metricsHandler,
                 }),
@@ -385,6 +385,7 @@
             raftId: token,
             logger: this.log,
             extensions: this._extensions,
+            extensionNames: this._loadedExtensions.join(','),
             metricsProducer: this._mProducer,
             metricsHandler,
         }));
diff --git a/lib/queuePopulator/RaftLogReader.js b/lib/queuePopulator/RaftLogReader.js
index 24061e037..a0622c9b9 100644
--- a/lib/queuePopulator/RaftLogReader.js
+++ b/lib/queuePopulator/RaftLogReader.js
@@ -7,7 +7,7 @@ const LogReader = require('./LogReader');
 class RaftLogReader extends LogReader {
     constructor(params) {
         const { zkClient, kafkaConfig, bucketdConfig, httpsConfig,
-            raftId, logger, extensions, metricsProducer, metricsHandler } = params;
+            raftId, logger, extensions, extensionNames, metricsProducer, metricsHandler } = params;
         const { host, port } = bucketdConfig;
         logger.info('initializing raft log reader', {
             method: 'RaftLogReader.constructor',
@@ -27,6 +27,7 @@
         super({ zkClient, kafkaConfig, logConsumer, logId: `raft_${raftId}`,
             logger, extensions, metricsProducer, metricsHandler });
         this.raftId = raftId;
+        this._extensionNames = extensionNames;
     }
 
     getLogInfo() {
@@ -35,6 +36,7 @@
 
     getMetricLabels() {
         return {
+            origin: this._extensionNames,
             logName: 'raft-log',
             logId: this.raftId,
         };
diff --git a/tests/unit/lib/queuePopulator/LogReader.spec.js b/tests/unit/lib/queuePopulator/LogReader.spec.js
index 87b45a905..d541b0b73 100644
--- a/tests/unit/lib/queuePopulator/LogReader.spec.js
+++ b/tests/unit/lib/queuePopulator/LogReader.spec.js
@@ -9,6 +9,10 @@ const { errors } = require('arsenal');
 const { Logger } = require('werelogs');
 
 const LogReader = require('../../../../lib/queuePopulator/LogReader');
+const KafkaLogReader = require('../../../../lib/queuePopulator/KafkaLogReader');
+const BucketFileLogReader = require('../../../../lib/queuePopulator/BucketFileLogReader');
+const RaftLogReader = require('../../../../lib/queuePopulator/RaftLogReader');
+const MongoLogReader = require('../../../../lib/queuePopulator/MongoLogReader');
 
 
 class MockLogConsumer {
@@ -505,4 +509,67 @@ describe('LogReader', () => {
             });
         });
     });
+
+    describe('getMetricLabels', () => {
+        [{
+            name: 'KafkaLogReader',
+            Reader: KafkaLogReader,
+            config: {
+                kafkaConfig: {
+                    hosts: 'localhost:9092',
+                },
+                qpKafkaConfig: {
+                    logName: 'test-log',
+                },
+            },
+            logName: 'kafka-log',
+        }, {
+            name: 'BucketFileLogReader',
+            Reader: BucketFileLogReader,
+            config: {
+                dmdConfig: {
+                    logName: 'test-log',
+                    host: 'localhost',
+                    port: 8000,
+                },
+            },
+            logName: 'bucket-file',
+        }, {
+            name: 'RaftLogReader',
+            Reader: RaftLogReader,
+            config: {
+                raftId: 'test-log',
+                bucketdConfig: {
+                    host: 'localhost',
+                    port: 8000,
+                },
+            },
+            logName: 'raft-log',
+        }, {
+            name: 'MongoLogReader',
+            Reader: MongoLogReader,
+            config: {
+                mongoConfig: {
+                    logName: 'test-log',
+                    host: 'localhost',
+                    port: 8000,
+                },
+            },
+            logName: 'mongo-log',
+        }].forEach(params => {
+            it(`should return proper ${params.name} metrics labels`, () => {
+                const reader = new params.Reader({
+                    ...params.config,
+                    logger: new Logger('test:LogReader'),
+                    extensionNames: 'replication,lifecycle,notification',
+                });
+                const expectedLabels = {
+                    logId: 'test-log',
+                    logName: params.logName,
+                    origin: 'replication,lifecycle,notification',
+                };
+                assert.deepStrictEqual(reader.getMetricLabels(), expectedLabels);
+            });
+        });
+    });
 });