diff --git a/client.js b/client.js new file mode 100644 index 0000000..98cc06b --- /dev/null +++ b/client.js @@ -0,0 +1,53 @@ +const { once } = require('events') +const ReadyResource = require('ready-resource') +const safetyCatch = require('safety-catch') +const RPC = require('protomux-rpc') +const { MetricsReplyEnc } = require('dht-prom-client/lib/encodings') + +class ScraperClient extends ReadyResource { + constructor (dht, promClientPublicKey) { + super() + + this.dht = dht + this.key = promClientPublicKey + this.rpc = null + this.socket = null + } + + async _open () { + // TODO: auto reconnect + // TODO: retry on failure + // TODO: define a keepAlive + // TODO: handle error paths (peer not available etc) + this.socket = this.dht.connect(this.key) + this.socket.on('error', safetyCatch) + + await this.socket.opened + + if (!this.socket.connected) { + throw new Error('Could not open socket') + } + + this.rpc = new RPC(this.socket, { protocol: 'prometheus-metrics' }) + await once(this.rpc, 'open') + } + + async _close () { + this.rpc?.destroy() + this.socket?.destroy() + } + + async lookup () { + if (!this.opened) await this.ready() + + const res = await this.rpc.request( + 'metrics', + null, + { responseEncoding: MetricsReplyEnc } + ) + + return res + } +} + +module.exports = ScraperClient diff --git a/index.js b/index.js index e69de29..3c63d7b 100644 --- a/index.js +++ b/index.js @@ -0,0 +1,70 @@ +const ScraperClient = require('./client') +const ReadyResource = require('ready-resource') +const idEnc = require('hypercore-id-encoding') +const b4a = require('b4a') +const safetyCatch = require('safety-catch') + +class PrometheusDhtBridge extends ReadyResource { + constructor (dht, server) { + super() + + this.dht = dht + + this.server = server + this.server.get( + '/scrape/:alias/metrics', + { logLevel: 'info' }, + this._handleGet.bind(this) + ) + + this.aliases = new Map() // alias->scrapeClient + } + + get publicKey () { + return this.dht.defaultKeyPair.publicKey + } + + async _close () { + await Promise.all([ + [...this.aliases.values()].map(a => a.close()) + ]) + } + + putAlias (alias, targetPubKey) { + targetPubKey = idEnc.decode(idEnc.normalize(targetPubKey)) + const current = this.aliases.get(alias) + + if (current) { + if (b4a.equals(current.key, targetPubKey)) { + return // Idempotent + } + + current.close().catch(safetyCatch) + } + + const scrapeClient = new ScraperClient(this.dht, targetPubKey) + this.aliases.set(alias, scrapeClient) + } + + async _handleGet (req, reply) { + const alias = req.params.alias + + const scrapeClient = this.aliases.get(alias) + + if (!scrapeClient) { + // TODO: 404 code + throw new Error('Unkown alias') + } + + if (!scrapeClient.opened) await scrapeClient.ready() + + const res = await scrapeClient.lookup() + if (res.success) { + reply.send(res.metrics) + } else { + // TODO: + } + } +} + +module.exports = PrometheusDhtBridge diff --git a/package.json b/package.json index 2e21181..5de00a2 100644 --- a/package.json +++ b/package.json @@ -4,7 +4,7 @@ "description": "Bridge to scrape Prometheus metrics exposed over hyperdht", "main": "index.js", "scripts": { - "test": "standard" + "test": "standard && brittle test.js" }, "repository": { "type": "git", @@ -24,6 +24,18 @@ }, "homepage": "https://github.com/HDegroote/dht-prometheus#readme", "devDependencies": { + "axios": "^1.7.2", + "brittle": "^3.5.2", + "hyperdht": "^6.15.1", + "prom-client": "^15.1.2", "standard": "^17.1.0" + }, + "dependencies": { + "dht-prom-client": "^0.0.1-alpha.1", + "fastify": "^4.28.0", + "hypercore-id-encoding": "^1.3.0", + "protomux-rpc": "^1.5.2", + "ready-resource": "^1.1.1", + "safety-catch": "^1.0.2" } } diff --git a/test.js b/test.js new file mode 100644 index 0000000..daffbf7 --- /dev/null +++ b/test.js @@ -0,0 +1,72 @@ +const test = require('brittle') +const PrometheusDhtBridge = require('./index') +const promClient = require('prom-client') +const DhtPromClient = require('dht-prom-client') +const createTestnet = require('hyperdht/testnet') +const HyperDHT = require('hyperdht') +const fastify = require('fastify') +const axios = require('axios') + +test('put alias + lookup happy flow', async t => { + const { bridge, dhtPromClient } = await setup(t) + + await dhtPromClient.ready() + await bridge.ready() + + const baseUrl = await bridge.server.listen({ host: '127.0.0.1', port: 0 }) + + bridge.putAlias('dummy', dhtPromClient.publicKey) + + const res = await axios.get(`${baseUrl}/scrape/dummy/metrics`) + t.is(res.status, 200, 'correct status') + t.is( + res.data.includes('process_cpu_user_seconds_total'), + true, + 'Successfully scraped metrics' + ) +}) + +test('No new alias if adding same key', async t => { + const { bridge } = await setup(t) + const key = 'a'.repeat(64) + const key2 = 'b'.repeat(64) + + await bridge.ready() + bridge.putAlias('dummy', key) + const clientA = bridge.aliases.get('dummy') + + t.is(clientA != null, true, 'sanity check') + bridge.putAlias('dummy', key) + t.is(clientA, bridge.aliases.get('dummy'), 'no new client') + + t.is(clientA.closing == null, true, 'sanity check') + bridge.putAlias('dummy', key2) + t.not(clientA, bridge.aliases.get('dummy'), 'sanity check') + t.is(clientA.closing != null, true, 'lifecycle ok') +}) + +async function setup (t) { + promClient.collectDefaultMetrics() // So we have something to scrape + t.teardown(() => promClient.register.clear()) + + const testnet = await createTestnet() + const bootstrap = testnet.bootstrap + + const dht = new HyperDHT({ bootstrap }) + const server = fastify({ logger: false }) + const bridge = new PrometheusDhtBridge(dht, server, { address: '127.0.0.1', port: 30000 }) + const scraperPubKey = bridge.publicKey + + const dhtClient = new HyperDHT({ bootstrap }) + const dhtPromClient = new DhtPromClient(dhtClient, promClient, scraperPubKey) + + t.teardown(async () => { + await server.close() + await bridge.close() + await dhtPromClient.close() + await dht.destroy() + await testnet.destroy() + }) + + return { dhtPromClient, bridge, bootstrap } +}