From 839f2d153dd5772e43610966be7801d93c96cf2f Mon Sep 17 00:00:00 2001 From: HDegroote <75906619+HDegroote@users.noreply.github.com> Date: Tue, 25 Jun 2024 00:09:43 +0200 Subject: [PATCH 1/5] Basic happy path --- client.js | 53 ++++++++++++++++++++++++++++++++++++++++++++++++++++ index.js | 51 ++++++++++++++++++++++++++++++++++++++++++++++++++ package.json | 11 +++++++++++ test.js | 52 +++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 167 insertions(+) create mode 100644 client.js create mode 100644 test.js diff --git a/client.js b/client.js new file mode 100644 index 0000000..b589c74 --- /dev/null +++ b/client.js @@ -0,0 +1,53 @@ +const { once } = require('events') +const ReadyResource = require('ready-resource') +const safetyCatch = require('safety-catch') +const RPC = require('protomux-rpc') +const { MetricsReplyEnc } = require('dht-prom-client/lib/encodings') + +class ScraperClient extends ReadyResource { + constructor (dht, promClientPublicKey) { + super() + + this.dht = dht + this.key = promClientPublicKey + this.rpc = null + this.socket = null + } + + async _open () { + // TODO: auto reconnect + // TODO: retry on failure + // TODO: define a keepAlive + // TODO: handle error paths (peer not available etc) + this.socket = this.dht.connect(this.key) + this.socket.on('error', safetyCatch) + + await this.socket.opened + + if (!this.socket.connected) { + throw new Error('Could not open socket') + } + + this.rpc = new RPC(this.socket, { protocol: 'prometheus-metrics' }) + await once(this.rpc, 'open') + } + + async _close () { + this.rpc.destroy() + this.socket.destroy() + } + + async lookup () { + if (!this.opened) await this.ready() + + const res = await this.rpc.request( + 'metrics', + null, + { responseEncoding: MetricsReplyEnc } + ) + + return res + } +} + +module.exports = ScraperClient diff --git a/index.js b/index.js index e69de29..b87e07c 100644 --- a/index.js +++ b/index.js @@ -0,0 +1,51 @@ +const ScraperClient = require('./client') +const ReadyResource = require('ready-resource') + +class PrometheusDhtBridge extends ReadyResource { + constructor (dht, server) { + super() + + this.dht = dht + + this.server = server + this.server.get( + '/scrape/:alias/metrics', + { logLevel: 'info' }, + this._handleGet.bind(this) + ) + + this.aliases = new Map() // alias->scrapeClient + } + + get publicKey () { + return this.dht.defaultKeyPair.publicKey + } + + putAlias (alias, targetPubKey) { + // TODO: only reset if new or not the same key + const scrapeClient = new ScraperClient(this.dht, targetPubKey) + this.aliases.set(alias, scrapeClient) + } + + async _handleGet (req, reply) { + const alias = req.params.alias + + const scrapeClient = this.aliases.get(alias) + + if (!scrapeClient) { + // TODO: 404 code + throw new Error('Unkown alias') + } + + if (!scrapeClient.opened) await scrapeClient.ready() + + const res = await scrapeClient.lookup() + if (res.success) { + reply.send(res.metrics) + } else { + // TODO: + } + } +} + +module.exports = PrometheusDhtBridge diff --git a/package.json b/package.json index 2e21181..c123a20 100644 --- a/package.json +++ b/package.json @@ -24,6 +24,17 @@ }, "homepage": "https://github.com/HDegroote/dht-prometheus#readme", "devDependencies": { + "axios": "^1.7.2", + "brittle": "^3.5.2", + "hyperdht": "^6.15.1", + "prom-client": "^15.1.2", "standard": "^17.1.0" + }, + "dependencies": { + "dht-prom-client": "^0.0.1-alpha.1", + "fastify": "^4.28.0", + "protomux-rpc": "^1.5.2", + "ready-resource": "^1.1.1", + "safety-catch": "^1.0.2" } } diff --git a/test.js b/test.js new file mode 100644 index 0000000..89a7d8b --- /dev/null +++ b/test.js @@ -0,0 +1,52 @@ +const test = require('brittle') +const PrometheusDhtBridge = require('./index') +const promClient = require('prom-client') +const DhtPromClient = require('dht-prom-client') +const createTestnet = require('hyperdht/testnet') +const HyperDHT = require('hyperdht') +const fastify = require('fastify') +const axios = require('axios') + +test('put alias + lookup happy flow', async t => { + const { bridge, dhtPromClient } = await setup(t) + + await dhtPromClient.ready() + await bridge.ready() + + const baseUrl = await bridge.server.listen() + + bridge.putAlias('dummy', dhtPromClient.publicKey) + + const res = await axios.get(`${baseUrl}/scrape/dummy/metrics`) + t.is(res.status, 200, 'correct status') + t.is( + res.data.includes('process_cpu_user_seconds_total'), + true, + 'Successfully scraped metrics' + ) +}) + +async function setup (t) { + promClient.collectDefaultMetrics() // So we have something to scrape + t.teardown(() => promClient.register.clear()) + + const testnet = await createTestnet() + const bootstrap = testnet.bootstrap + + const dht = new HyperDHT({ bootstrap }) + const server = fastify() // TODO: no logs + const bridge = new PrometheusDhtBridge(dht, server, { address: '127.0.0.1', port: 30000 }) + const scraperPubKey = bridge.publicKey + + const dhtClient = new HyperDHT({ bootstrap }) + const dhtPromClient = new DhtPromClient(dhtClient, promClient, scraperPubKey) + + t.teardown(async () => { + await server.close() + await dhtPromClient.close() + await dht.destroy() + await testnet.destroy() + }) + + return { dhtPromClient, bridge, bootstrap } +} From 641a0e1f6ae1d93791482c3386e9d303f750b46e Mon Sep 17 00:00:00 2001 From: HDegroote <75906619+HDegroote@users.noreply.github.com> Date: Tue, 25 Jun 2024 00:17:57 +0200 Subject: [PATCH 2/5] Run tests --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index c123a20..45ea965 100644 --- a/package.json +++ b/package.json @@ -4,7 +4,7 @@ "description": "Bridge to scrape Prometheus metrics exposed over hyperdht", "main": "index.js", "scripts": { - "test": "standard" + "test": "standard && brittle test.js" }, "repository": { "type": "git", From fc534c42d784e8375cd5b40ff1353ebd17df84be Mon Sep 17 00:00:00 2001 From: HDegroote <75906619+HDegroote@users.noreply.github.com> Date: Tue, 25 Jun 2024 00:25:09 +0200 Subject: [PATCH 3/5] Explicit address --- test.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test.js b/test.js index 89a7d8b..b6f5593 100644 --- a/test.js +++ b/test.js @@ -13,7 +13,7 @@ test('put alias + lookup happy flow', async t => { await dhtPromClient.ready() await bridge.ready() - const baseUrl = await bridge.server.listen() + const baseUrl = await bridge.server.listen({ address: '127.0.0.1', port: 0 }) bridge.putAlias('dummy', dhtPromClient.publicKey) From 9b042bc91daa7f7fa26de1193db4e41fd14b7b60 Mon Sep 17 00:00:00 2001 From: HDegroote <75906619+HDegroote@users.noreply.github.com> Date: Tue, 25 Jun 2024 00:26:42 +0200 Subject: [PATCH 4/5] Correct fastify opt --- test.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test.js b/test.js index b6f5593..e0ee677 100644 --- a/test.js +++ b/test.js @@ -13,7 +13,7 @@ test('put alias + lookup happy flow', async t => { await dhtPromClient.ready() await bridge.ready() - const baseUrl = await bridge.server.listen({ address: '127.0.0.1', port: 0 }) + const baseUrl = await bridge.server.listen({ host: '127.0.0.1', port: 0 }) bridge.putAlias('dummy', dhtPromClient.publicKey) From 14d405eff3895327de202472bd1abf22dd3101aa Mon Sep 17 00:00:00 2001 From: HDegroote <75906619+HDegroote@users.noreply.github.com> Date: Tue, 25 Jun 2024 22:54:21 +0200 Subject: [PATCH 5/5] Add Lifecycle --- client.js | 4 ++-- index.js | 21 ++++++++++++++++++++- package.json | 1 + test.js | 22 +++++++++++++++++++++- 4 files changed, 44 insertions(+), 4 deletions(-) diff --git a/client.js b/client.js index b589c74..98cc06b 100644 --- a/client.js +++ b/client.js @@ -33,8 +33,8 @@ class ScraperClient extends ReadyResource { } async _close () { - this.rpc.destroy() - this.socket.destroy() + this.rpc?.destroy() + this.socket?.destroy() } async lookup () { diff --git a/index.js b/index.js index b87e07c..3c63d7b 100644 --- a/index.js +++ b/index.js @@ -1,5 +1,8 @@ const ScraperClient = require('./client') const ReadyResource = require('ready-resource') +const idEnc = require('hypercore-id-encoding') +const b4a = require('b4a') +const safetyCatch = require('safety-catch') class PrometheusDhtBridge extends ReadyResource { constructor (dht, server) { @@ -21,8 +24,24 @@ class PrometheusDhtBridge extends ReadyResource { return this.dht.defaultKeyPair.publicKey } + async _close () { + await Promise.all([ + [...this.aliases.values()].map(a => a.close()) + ]) + } + putAlias (alias, targetPubKey) { - // TODO: only reset if new or not the same key + targetPubKey = idEnc.decode(idEnc.normalize(targetPubKey)) + const current = this.aliases.get(alias) + + if (current) { + if (b4a.equals(current.key, targetPubKey)) { + return // Idempotent + } + + current.close().catch(safetyCatch) + } + const scrapeClient = new ScraperClient(this.dht, targetPubKey) this.aliases.set(alias, scrapeClient) } diff --git a/package.json b/package.json index 45ea965..5de00a2 100644 --- a/package.json +++ b/package.json @@ -33,6 +33,7 @@ "dependencies": { "dht-prom-client": "^0.0.1-alpha.1", "fastify": "^4.28.0", + "hypercore-id-encoding": "^1.3.0", "protomux-rpc": "^1.5.2", "ready-resource": "^1.1.1", "safety-catch": "^1.0.2" diff --git a/test.js b/test.js index e0ee677..daffbf7 100644 --- a/test.js +++ b/test.js @@ -26,6 +26,25 @@ test('put alias + lookup happy flow', async t => { ) }) +test('No new alias if adding same key', async t => { + const { bridge } = await setup(t) + const key = 'a'.repeat(64) + const key2 = 'b'.repeat(64) + + await bridge.ready() + bridge.putAlias('dummy', key) + const clientA = bridge.aliases.get('dummy') + + t.is(clientA != null, true, 'sanity check') + bridge.putAlias('dummy', key) + t.is(clientA, bridge.aliases.get('dummy'), 'no new client') + + t.is(clientA.closing == null, true, 'sanity check') + bridge.putAlias('dummy', key2) + t.not(clientA, bridge.aliases.get('dummy'), 'sanity check') + t.is(clientA.closing != null, true, 'lifecycle ok') +}) + async function setup (t) { promClient.collectDefaultMetrics() // So we have something to scrape t.teardown(() => promClient.register.clear()) @@ -34,7 +53,7 @@ async function setup (t) { const bootstrap = testnet.bootstrap const dht = new HyperDHT({ bootstrap }) - const server = fastify() // TODO: no logs + const server = fastify({ logger: false }) const bridge = new PrometheusDhtBridge(dht, server, { address: '127.0.0.1', port: 30000 }) const scraperPubKey = bridge.publicKey @@ -43,6 +62,7 @@ async function setup (t) { t.teardown(async () => { await server.close() + await bridge.close() await dhtPromClient.close() await dht.destroy() await testnet.destroy()