Skip to content

Commit

Permalink
Expose a register-alias service (#3)
Browse files Browse the repository at this point in the history
* Partial implementation

* More experiments

* Use dht-prom-client scraper

* (WIP) put rpc-client in dht-prom-client

* Rm unused comments

* Use published dht-prom-client

* rm log
  • Loading branch information
HDegroote authored Jun 29, 2024
1 parent b97de40 commit 263187b
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 90 deletions.
81 changes: 0 additions & 81 deletions client.js

This file was deleted.

37 changes: 32 additions & 5 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,24 @@
const ScraperClient = require('./client')
const ReadyResource = require('ready-resource')
const idEnc = require('hypercore-id-encoding')
const b4a = require('b4a')
const safetyCatch = require('safety-catch')
const Hyperswarm = require('hyperswarm')
const HyperDht = require('hyperdht')
const AliasRpcServer = require('./lib/alias-rpc')

const ScraperClient = require('dht-prom-client/scraper')

class PrometheusDhtBridge extends ReadyResource {
constructor (dht, server) {
constructor (dht, server, sharedSecret, { _forceFlushOnClientReady = false } = {}) {
super()

this.swarm = new Hyperswarm({ dht })
const keyPair = HyperDht.keyPair()
this.swarm = new Hyperswarm({
dht,
keyPair
})

this.secret = sharedSecret // Shared with clients

this.server = server
this.server.get(
Expand All @@ -18,7 +27,12 @@ class PrometheusDhtBridge extends ReadyResource {
this._handleGet.bind(this)
)

this.aliasRpcServer = new AliasRpcServer(this.swarm, this.secret, this.putAlias.bind(this))

this.aliases = new Map() // alias->scrapeClient

// for tests, to ensure we're connected to the scraper on first scrape
this._forceFlushOnCLientReady = _forceFlushOnClientReady
}

get dht () {
Expand All @@ -29,7 +43,13 @@ class PrometheusDhtBridge extends ReadyResource {
return this.swarm.keyPair.publicKey
}

async _open () {
await this.aliasRpcServer.ready()
}

async _close () {
await this.aliasRpcServer.close()

await Promise.all([
[...this.aliases.values()].map(a => a.close())
])
Expand All @@ -43,14 +63,18 @@ class PrometheusDhtBridge extends ReadyResource {

if (current) {
if (b4a.equals(current.targetKey, targetPubKey)) {
return // Idempotent
const updated = false // Idempotent
return updated
}

current.close().catch(safetyCatch)
}

const scrapeClient = new ScraperClient(this.swarm, targetPubKey)
this.aliases.set(alias, scrapeClient)

const updated = true
return updated
}

async _handleGet (req, reply) {
Expand All @@ -64,7 +88,10 @@ class PrometheusDhtBridge extends ReadyResource {
return
}

if (!scrapeClient.opened) await scrapeClient.ready()
if (!scrapeClient.opened) {
await scrapeClient.ready()
if (this._forceFlushOnCLientReady) await scrapeClient.swarm.flush()
}

let res
try {
Expand Down
75 changes: 75 additions & 0 deletions lib/alias-rpc.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
const ReadyResource = require('ready-resource')
const RPC = require('protomux-rpc')
const crypto = require('crypto')
const b4a = require('b4a')

// TODO: cleaner dep management (don't want to store the encodings
// twice, but there's a ciruclar dep of sorts)
const { AliasReqEnc, AliasRespEnc } = require('dht-prom-client/lib/encodings')

const PROTOCOL_NAME = 'register-alias'

class AliasRpcServer extends ReadyResource {
constructor (swarm, secret, putAliasCb) {
super()

this.swarm = swarm
this._putAlias = putAliasCb
this.secret = secret

this.swarm.on('connection', this._onconnection.bind(this))
}

get publicKey () {
return this.swarm.keyPair.pubicKey
}

async _open () {
await this.swarm.listen()
}

async _close () {
await this.swarm.destroy()
}

_onconnection (socket) {
const uid = crypto.randomUUID()
const remotePublicKey = socket.remotePublicKey

socket.on('error', (error) => {
this.emit('socket-error', { error, uid, remotePublicKey })
})

const rpc = new RPC(socket, { protocol: PROTOCOL_NAME })
rpc.respond(
'alias',
{ responseEncoding: AliasRespEnc, requestEncoding: AliasReqEnc },
async (req) => {
if (!b4a.equals(req.secret, this.secret)) {
return { success: false, errorMessage: 'unauthorised' }
}

const targetPublicKey = req.targetPublicKey
const alias = req.alias

this.emit('alias-request', { uid, remotePublicKey, targetPublicKey, alias })
try {
const updated = await this._putAlias(alias, targetPublicKey)
this.emit('register-success', { uid, alias, targetPublicKey, updated })
return {
success: true,
updated
}
} catch (error) {
this.emit('register-error', { error, uid })
return {
success: false,
errorMessage: `Failed to register alias (uid ${uid})`
}
}
}
)
}
}

module.exports = AliasRpcServer
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
"dependencies": {
"b4a": "^1.6.6",
"debounceify": "^1.1.0",
"dht-prom-client": "^0.0.1-alpha.1",
"dht-prom-client": "^0.0.1-alpha.3",
"fastify": "^4.28.0",
"hypercore-id-encoding": "^1.3.0",
"hyperswarm": "^4.7.15",
Expand Down
45 changes: 42 additions & 3 deletions test.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const createTestnet = require('hyperdht/testnet')
const HyperDHT = require('hyperdht')
const fastify = require('fastify')
const axios = require('axios')
const hypCrypto = require('hypercore-crypto')

test('put alias + lookup happy flow', async t => {
const { bridge, dhtPromClient } = await setup(t)
Expand Down Expand Up @@ -126,20 +127,57 @@ test('No new alias if adding same key', async t => {
t.is(clientA.closing != null, true, 'lifecycle ok')
})

test('A client which registers itself can get scraped', async t => {
t.plan(4)

const { bridge, dhtPromClient } = await setup(t)

await bridge.ready()

bridge.aliasRpcServer.on('alias-request', ({ uid, remotePublicKey, alias, targetPublicKey }) => {
t.is(alias, 'dummy', 'correct alias')
t.alike(targetPublicKey, dhtPromClient.publicKey, 'correct target key got registered')
})
bridge.aliasRpcServer.on('register-error', ({ error, uid }) => {
console.error(error)
t.fail('unexpected error')
})

const baseUrl = await bridge.server.listen({ host: '127.0.0.1', port: 0 })

await bridge.swarm.flush() // To avoid race conditions
await dhtPromClient.ready()

const res = await axios.get(
`${baseUrl}/scrape/dummy/metrics`,
{ validateStatus: null }
)
t.is(res.status, 200, 'correct status')
t.is(
res.data.includes('process_cpu_user_seconds_total'),
true,
'Successfully scraped metrics'
)
})

async function setup (t) {
promClient.collectDefaultMetrics() // So we have something to scrape
t.teardown(() => promClient.register.clear())

const testnet = await createTestnet()
const bootstrap = testnet.bootstrap

const sharedSecret = hypCrypto.randomBytes(32)

const dht = new HyperDHT({ bootstrap })
const server = fastify({ logger: false })
const bridge = new PrometheusDhtBridge(dht, server, { address: '127.0.0.1', port: 30000 })
const bridge = new PrometheusDhtBridge(dht, server, sharedSecret,
{ _forceFlushOnClientReady: true } // to avoid race conditions
)
const scraperPubKey = bridge.publicKey

const dhtClient = new HyperDHT({ bootstrap })
const dhtPromClient = new DhtPromClient(dhtClient, promClient, scraperPubKey)
const dhtPromClient = new DhtPromClient(dhtClient, promClient, scraperPubKey, 'dummy', sharedSecret, { bootstrap })

t.teardown(async () => {
await server.close()
Expand All @@ -149,5 +187,6 @@ async function setup (t) {
await testnet.destroy()
})

return { dhtPromClient, bridge, bootstrap }
const ownPublicKey = dhtPromClient.dht.defaultKeyPair.publicKey
return { dhtPromClient, bridge, bootstrap, ownPublicKey }
}

0 comments on commit 263187b

Please sign in to comment.