diff --git a/client.js b/client.js deleted file mode 100644 index d6675d4..0000000 --- a/client.js +++ /dev/null @@ -1,81 +0,0 @@ -const crypto = require('crypto') -const ReadyResource = require('ready-resource') -const safetyCatch = require('safety-catch') -const RPC = require('protomux-rpc') -const { MetricsReplyEnc } = require('dht-prom-client/lib/encodings') -const b4a = require('b4a') - -const PROTOCOL_NAME = 'prometheus-metrics' - -class ScraperClient extends ReadyResource { - constructor (swarm, promClientPubKey) { - super() - - this.swarm = swarm - this.targetKey = promClientPubKey - - this.rpc = null - this.socket = null - this._currentConnUid = null - - this._boundConnectionHandler = this._connectionHandler.bind(this) - this.swarm.on('connection', this._boundConnectionHandler) - - // Handles reconnects/suspends - this.swarm.joinPeer(this.targetKey) - } - - _open () { } - - _close () { - this.swarm.off('connection', this._boundConnectionHandler) - this.swarm.leavePeer(this.targetKey) - - if (this.rpc) this.rpc.destroy() - if (this.socket) this.socket.destroy() - } - - _connectionHandler (socket, peerInfo) { - if (!b4a.equals(peerInfo.publicKey, this.targetKey)) { - // Not our connection - return - } - - const connUid = crypto.randomUUID() // TODO: check if actually needed - this._currentConnUid = connUid - - const rpc = new RPC(socket, { protocol: PROTOCOL_NAME }) - - socket.on('error', safetyCatch) - socket.on('close', () => { - if (connUid === this._currentConnUid) { - // No other connection arrived in the the mean time - this.socket = null - this.rpc = null - this._currentConnUid = null - } - - rpc.destroy() - }) - - this.socket = socket - this.rpc = rpc - } - - async lookup () { - if (!this.rpc) throw new Error('Not connected') - - if (this.rpc && !this.rpc.opened) await this.rpc.fullyOpened() - - // Note: can throw (for example if rpc closed in the mean time) - const res = await this.rpc.request( - 'metrics', - null, - { responseEncoding: MetricsReplyEnc } - ) - - return res - } -} - -module.exports = ScraperClient diff --git a/index.js b/index.js index c2411ab..f37cdcd 100644 --- a/index.js +++ b/index.js @@ -1,15 +1,24 @@ -const ScraperClient = require('./client') const ReadyResource = require('ready-resource') const idEnc = require('hypercore-id-encoding') const b4a = require('b4a') const safetyCatch = require('safety-catch') const Hyperswarm = require('hyperswarm') +const HyperDht = require('hyperdht') +const AliasRpcServer = require('./lib/alias-rpc') + +const ScraperClient = require('dht-prom-client/scraper') class PrometheusDhtBridge extends ReadyResource { - constructor (dht, server) { + constructor (dht, server, sharedSecret, { _forceFlushOnClientReady = false } = {}) { super() - this.swarm = new Hyperswarm({ dht }) + const keyPair = HyperDht.keyPair() + this.swarm = new Hyperswarm({ + dht, + keyPair + }) + + this.secret = sharedSecret // Shared with clients this.server = server this.server.get( @@ -18,7 +27,12 @@ class PrometheusDhtBridge extends ReadyResource { this._handleGet.bind(this) ) + this.aliasRpcServer = new AliasRpcServer(this.swarm, this.secret, this.putAlias.bind(this)) + this.aliases = new Map() // alias->scrapeClient + + // for tests, to ensure we're connected to the scraper on first scrape + this._forceFlushOnCLientReady = _forceFlushOnClientReady } get dht () { @@ -29,7 +43,13 @@ class PrometheusDhtBridge extends ReadyResource { return this.swarm.keyPair.publicKey } + async _open () { + await this.aliasRpcServer.ready() + } + async _close () { + await this.aliasRpcServer.close() + await Promise.all([ [...this.aliases.values()].map(a => a.close()) ]) @@ -43,7 +63,8 @@ class PrometheusDhtBridge extends ReadyResource { if (current) { if (b4a.equals(current.targetKey, targetPubKey)) { - return // Idempotent + const updated = false // Idempotent + return updated } current.close().catch(safetyCatch) @@ -51,6 +72,9 @@ class PrometheusDhtBridge extends ReadyResource { const scrapeClient = new ScraperClient(this.swarm, targetPubKey) this.aliases.set(alias, scrapeClient) + + const updated = true + return updated } async _handleGet (req, reply) { @@ -64,7 +88,10 @@ class PrometheusDhtBridge extends ReadyResource { return } - if (!scrapeClient.opened) await scrapeClient.ready() + if (!scrapeClient.opened) { + await scrapeClient.ready() + if (this._forceFlushOnCLientReady) await scrapeClient.swarm.flush() + } let res try { diff --git a/lib/alias-rpc.js b/lib/alias-rpc.js new file mode 100644 index 0000000..1bd99fd --- /dev/null +++ b/lib/alias-rpc.js @@ -0,0 +1,75 @@ +const ReadyResource = require('ready-resource') +const RPC = require('protomux-rpc') +const crypto = require('crypto') +const b4a = require('b4a') + +// TODO: cleaner dep management (don't want to store the encodings +// twice, but there's a ciruclar dep of sorts) +const { AliasReqEnc, AliasRespEnc } = require('dht-prom-client/lib/encodings') + +const PROTOCOL_NAME = 'register-alias' + +class AliasRpcServer extends ReadyResource { + constructor (swarm, secret, putAliasCb) { + super() + + this.swarm = swarm + this._putAlias = putAliasCb + this.secret = secret + + this.swarm.on('connection', this._onconnection.bind(this)) + } + + get publicKey () { + return this.swarm.keyPair.pubicKey + } + + async _open () { + await this.swarm.listen() + } + + async _close () { + await this.swarm.destroy() + } + + _onconnection (socket) { + const uid = crypto.randomUUID() + const remotePublicKey = socket.remotePublicKey + + socket.on('error', (error) => { + this.emit('socket-error', { error, uid, remotePublicKey }) + }) + + const rpc = new RPC(socket, { protocol: PROTOCOL_NAME }) + rpc.respond( + 'alias', + { responseEncoding: AliasRespEnc, requestEncoding: AliasReqEnc }, + async (req) => { + if (!b4a.equals(req.secret, this.secret)) { + return { success: false, errorMessage: 'unauthorised' } + } + + const targetPublicKey = req.targetPublicKey + const alias = req.alias + + this.emit('alias-request', { uid, remotePublicKey, targetPublicKey, alias }) + try { + const updated = await this._putAlias(alias, targetPublicKey) + this.emit('register-success', { uid, alias, targetPublicKey, updated }) + return { + success: true, + updated + } + } catch (error) { + this.emit('register-error', { error, uid }) + return { + success: false, + errorMessage: `Failed to register alias (uid ${uid})` + } + } + } + ) + } +} + +module.exports = AliasRpcServer diff --git a/package.json b/package.json index 37a92be..512e993 100644 --- a/package.json +++ b/package.json @@ -33,7 +33,7 @@ "dependencies": { "b4a": "^1.6.6", "debounceify": "^1.1.0", - "dht-prom-client": "^0.0.1-alpha.1", + "dht-prom-client": "^0.0.1-alpha.3", "fastify": "^4.28.0", "hypercore-id-encoding": "^1.3.0", "hyperswarm": "^4.7.15", diff --git a/test.js b/test.js index 3dfcd07..64c5944 100644 --- a/test.js +++ b/test.js @@ -6,6 +6,7 @@ const createTestnet = require('hyperdht/testnet') const HyperDHT = require('hyperdht') const fastify = require('fastify') const axios = require('axios') +const hypCrypto = require('hypercore-crypto') test('put alias + lookup happy flow', async t => { const { bridge, dhtPromClient } = await setup(t) @@ -126,6 +127,39 @@ test('No new alias if adding same key', async t => { t.is(clientA.closing != null, true, 'lifecycle ok') }) +test('A client which registers itself can get scraped', async t => { + t.plan(4) + + const { bridge, dhtPromClient } = await setup(t) + + await bridge.ready() + + bridge.aliasRpcServer.on('alias-request', ({ uid, remotePublicKey, alias, targetPublicKey }) => { + t.is(alias, 'dummy', 'correct alias') + t.alike(targetPublicKey, dhtPromClient.publicKey, 'correct target key got registered') + }) + bridge.aliasRpcServer.on('register-error', ({ error, uid }) => { + console.error(error) + t.fail('unexpected error') + }) + + const baseUrl = await bridge.server.listen({ host: '127.0.0.1', port: 0 }) + + await bridge.swarm.flush() // To avoid race conditions + await dhtPromClient.ready() + + const res = await axios.get( + `${baseUrl}/scrape/dummy/metrics`, + { validateStatus: null } + ) + t.is(res.status, 200, 'correct status') + t.is( + res.data.includes('process_cpu_user_seconds_total'), + true, + 'Successfully scraped metrics' + ) +}) + async function setup (t) { promClient.collectDefaultMetrics() // So we have something to scrape t.teardown(() => promClient.register.clear()) @@ -133,13 +167,17 @@ async function setup (t) { const testnet = await createTestnet() const bootstrap = testnet.bootstrap + const sharedSecret = hypCrypto.randomBytes(32) + const dht = new HyperDHT({ bootstrap }) const server = fastify({ logger: false }) - const bridge = new PrometheusDhtBridge(dht, server, { address: '127.0.0.1', port: 30000 }) + const bridge = new PrometheusDhtBridge(dht, server, sharedSecret, + { _forceFlushOnClientReady: true } // to avoid race conditions + ) const scraperPubKey = bridge.publicKey const dhtClient = new HyperDHT({ bootstrap }) - const dhtPromClient = new DhtPromClient(dhtClient, promClient, scraperPubKey) + const dhtPromClient = new DhtPromClient(dhtClient, promClient, scraperPubKey, 'dummy', sharedSecret, { bootstrap }) t.teardown(async () => { await server.close() @@ -149,5 +187,6 @@ async function setup (t) { await testnet.destroy() }) - return { dhtPromClient, bridge, bootstrap } + const ownPublicKey = dhtPromClient.dht.defaultKeyPair.publicKey + return { dhtPromClient, bridge, bootstrap, ownPublicKey } }