Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Basic happy path #1

Merged
merged 5 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions client.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
const { once } = require('events')
const ReadyResource = require('ready-resource')
const safetyCatch = require('safety-catch')
const RPC = require('protomux-rpc')
const { MetricsReplyEnc } = require('dht-prom-client/lib/encodings')

class ScraperClient extends ReadyResource {
constructor (dht, promClientPublicKey) {
super()

this.dht = dht
this.key = promClientPublicKey
this.rpc = null
this.socket = null
}

async _open () {
// TODO: auto reconnect
// TODO: retry on failure
// TODO: define a keepAlive
// TODO: handle error paths (peer not available etc)
this.socket = this.dht.connect(this.key)
this.socket.on('error', safetyCatch)

await this.socket.opened

if (!this.socket.connected) {
throw new Error('Could not open socket')
}

this.rpc = new RPC(this.socket, { protocol: 'prometheus-metrics' })
await once(this.rpc, 'open')
}

async _close () {
this.rpc?.destroy()
this.socket?.destroy()
}

async lookup () {
if (!this.opened) await this.ready()

const res = await this.rpc.request(
'metrics',
null,
{ responseEncoding: MetricsReplyEnc }
)

return res
}
}

module.exports = ScraperClient
70 changes: 70 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
const ScraperClient = require('./client')
const ReadyResource = require('ready-resource')
const idEnc = require('hypercore-id-encoding')
const b4a = require('b4a')
const safetyCatch = require('safety-catch')

class PrometheusDhtBridge extends ReadyResource {
constructor (dht, server) {
super()

this.dht = dht

this.server = server
this.server.get(
'/scrape/:alias/metrics',
{ logLevel: 'info' },
this._handleGet.bind(this)
)

this.aliases = new Map() // alias->scrapeClient
}

get publicKey () {
return this.dht.defaultKeyPair.publicKey
}

async _close () {
await Promise.all([
[...this.aliases.values()].map(a => a.close())
])
}

putAlias (alias, targetPubKey) {
targetPubKey = idEnc.decode(idEnc.normalize(targetPubKey))
const current = this.aliases.get(alias)

if (current) {
if (b4a.equals(current.key, targetPubKey)) {
return // Idempotent
}

current.close().catch(safetyCatch)
}

const scrapeClient = new ScraperClient(this.dht, targetPubKey)
this.aliases.set(alias, scrapeClient)
}

async _handleGet (req, reply) {
const alias = req.params.alias

const scrapeClient = this.aliases.get(alias)

if (!scrapeClient) {
// TODO: 404 code
throw new Error('Unkown alias')
}

if (!scrapeClient.opened) await scrapeClient.ready()

const res = await scrapeClient.lookup()
if (res.success) {
reply.send(res.metrics)
} else {
// TODO:
}
}
}

module.exports = PrometheusDhtBridge
14 changes: 13 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"description": "Bridge to scrape Prometheus metrics exposed over hyperdht",
"main": "index.js",
"scripts": {
"test": "standard"
"test": "standard && brittle test.js"
},
"repository": {
"type": "git",
Expand All @@ -24,6 +24,18 @@
},
"homepage": "https://github.com/HDegroote/dht-prometheus#readme",
"devDependencies": {
"axios": "^1.7.2",
"brittle": "^3.5.2",
"hyperdht": "^6.15.1",
"prom-client": "^15.1.2",
"standard": "^17.1.0"
},
"dependencies": {
"dht-prom-client": "^0.0.1-alpha.1",
"fastify": "^4.28.0",
"hypercore-id-encoding": "^1.3.0",
"protomux-rpc": "^1.5.2",
"ready-resource": "^1.1.1",
"safety-catch": "^1.0.2"
}
}
72 changes: 72 additions & 0 deletions test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
const test = require('brittle')
const PrometheusDhtBridge = require('./index')
const promClient = require('prom-client')
const DhtPromClient = require('dht-prom-client')
const createTestnet = require('hyperdht/testnet')
const HyperDHT = require('hyperdht')
const fastify = require('fastify')
const axios = require('axios')

test('put alias + lookup happy flow', async t => {
const { bridge, dhtPromClient } = await setup(t)

await dhtPromClient.ready()
await bridge.ready()

const baseUrl = await bridge.server.listen({ host: '127.0.0.1', port: 0 })

bridge.putAlias('dummy', dhtPromClient.publicKey)

const res = await axios.get(`${baseUrl}/scrape/dummy/metrics`)
t.is(res.status, 200, 'correct status')
t.is(
res.data.includes('process_cpu_user_seconds_total'),
true,
'Successfully scraped metrics'
)
})

test('No new alias if adding same key', async t => {
const { bridge } = await setup(t)
const key = 'a'.repeat(64)
const key2 = 'b'.repeat(64)

await bridge.ready()
bridge.putAlias('dummy', key)
const clientA = bridge.aliases.get('dummy')

t.is(clientA != null, true, 'sanity check')
bridge.putAlias('dummy', key)
t.is(clientA, bridge.aliases.get('dummy'), 'no new client')

t.is(clientA.closing == null, true, 'sanity check')
bridge.putAlias('dummy', key2)
t.not(clientA, bridge.aliases.get('dummy'), 'sanity check')
t.is(clientA.closing != null, true, 'lifecycle ok')
})

async function setup (t) {
promClient.collectDefaultMetrics() // So we have something to scrape
t.teardown(() => promClient.register.clear())

const testnet = await createTestnet()
const bootstrap = testnet.bootstrap

const dht = new HyperDHT({ bootstrap })
const server = fastify({ logger: false })
const bridge = new PrometheusDhtBridge(dht, server, { address: '127.0.0.1', port: 30000 })
const scraperPubKey = bridge.publicKey

const dhtClient = new HyperDHT({ bootstrap })
const dhtPromClient = new DhtPromClient(dhtClient, promClient, scraperPubKey)

t.teardown(async () => {
await server.close()
await bridge.close()
await dhtPromClient.close()
await dht.destroy()
await testnet.destroy()
})

return { dhtPromClient, bridge, bootstrap }
}
Loading