-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #1 from HDegroote/happy-path
Basic happy path
- Loading branch information
Showing
4 changed files
with
208 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
const { once } = require('events') | ||
const ReadyResource = require('ready-resource') | ||
const safetyCatch = require('safety-catch') | ||
const RPC = require('protomux-rpc') | ||
const { MetricsReplyEnc } = require('dht-prom-client/lib/encodings') | ||
|
||
class ScraperClient extends ReadyResource { | ||
constructor (dht, promClientPublicKey) { | ||
super() | ||
|
||
this.dht = dht | ||
this.key = promClientPublicKey | ||
this.rpc = null | ||
this.socket = null | ||
} | ||
|
||
async _open () { | ||
// TODO: auto reconnect | ||
// TODO: retry on failure | ||
// TODO: define a keepAlive | ||
// TODO: handle error paths (peer not available etc) | ||
this.socket = this.dht.connect(this.key) | ||
this.socket.on('error', safetyCatch) | ||
|
||
await this.socket.opened | ||
|
||
if (!this.socket.connected) { | ||
throw new Error('Could not open socket') | ||
} | ||
|
||
this.rpc = new RPC(this.socket, { protocol: 'prometheus-metrics' }) | ||
await once(this.rpc, 'open') | ||
} | ||
|
||
async _close () { | ||
this.rpc?.destroy() | ||
this.socket?.destroy() | ||
} | ||
|
||
async lookup () { | ||
if (!this.opened) await this.ready() | ||
|
||
const res = await this.rpc.request( | ||
'metrics', | ||
null, | ||
{ responseEncoding: MetricsReplyEnc } | ||
) | ||
|
||
return res | ||
} | ||
} | ||
|
||
module.exports = ScraperClient |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
const ScraperClient = require('./client') | ||
const ReadyResource = require('ready-resource') | ||
const idEnc = require('hypercore-id-encoding') | ||
const b4a = require('b4a') | ||
const safetyCatch = require('safety-catch') | ||
|
||
class PrometheusDhtBridge extends ReadyResource { | ||
constructor (dht, server) { | ||
super() | ||
|
||
this.dht = dht | ||
|
||
this.server = server | ||
this.server.get( | ||
'/scrape/:alias/metrics', | ||
{ logLevel: 'info' }, | ||
this._handleGet.bind(this) | ||
) | ||
|
||
this.aliases = new Map() // alias->scrapeClient | ||
} | ||
|
||
get publicKey () { | ||
return this.dht.defaultKeyPair.publicKey | ||
} | ||
|
||
async _close () { | ||
await Promise.all([ | ||
[...this.aliases.values()].map(a => a.close()) | ||
]) | ||
} | ||
|
||
putAlias (alias, targetPubKey) { | ||
targetPubKey = idEnc.decode(idEnc.normalize(targetPubKey)) | ||
const current = this.aliases.get(alias) | ||
|
||
if (current) { | ||
if (b4a.equals(current.key, targetPubKey)) { | ||
return // Idempotent | ||
} | ||
|
||
current.close().catch(safetyCatch) | ||
} | ||
|
||
const scrapeClient = new ScraperClient(this.dht, targetPubKey) | ||
this.aliases.set(alias, scrapeClient) | ||
} | ||
|
||
async _handleGet (req, reply) { | ||
const alias = req.params.alias | ||
|
||
const scrapeClient = this.aliases.get(alias) | ||
|
||
if (!scrapeClient) { | ||
// TODO: 404 code | ||
throw new Error('Unkown alias') | ||
} | ||
|
||
if (!scrapeClient.opened) await scrapeClient.ready() | ||
|
||
const res = await scrapeClient.lookup() | ||
if (res.success) { | ||
reply.send(res.metrics) | ||
} else { | ||
// TODO: | ||
} | ||
} | ||
} | ||
|
||
module.exports = PrometheusDhtBridge |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
const test = require('brittle') | ||
const PrometheusDhtBridge = require('./index') | ||
const promClient = require('prom-client') | ||
const DhtPromClient = require('dht-prom-client') | ||
const createTestnet = require('hyperdht/testnet') | ||
const HyperDHT = require('hyperdht') | ||
const fastify = require('fastify') | ||
const axios = require('axios') | ||
|
||
test('put alias + lookup happy flow', async t => { | ||
const { bridge, dhtPromClient } = await setup(t) | ||
|
||
await dhtPromClient.ready() | ||
await bridge.ready() | ||
|
||
const baseUrl = await bridge.server.listen({ host: '127.0.0.1', port: 0 }) | ||
|
||
bridge.putAlias('dummy', dhtPromClient.publicKey) | ||
|
||
const res = await axios.get(`${baseUrl}/scrape/dummy/metrics`) | ||
t.is(res.status, 200, 'correct status') | ||
t.is( | ||
res.data.includes('process_cpu_user_seconds_total'), | ||
true, | ||
'Successfully scraped metrics' | ||
) | ||
}) | ||
|
||
test('No new alias if adding same key', async t => { | ||
const { bridge } = await setup(t) | ||
const key = 'a'.repeat(64) | ||
const key2 = 'b'.repeat(64) | ||
|
||
await bridge.ready() | ||
bridge.putAlias('dummy', key) | ||
const clientA = bridge.aliases.get('dummy') | ||
|
||
t.is(clientA != null, true, 'sanity check') | ||
bridge.putAlias('dummy', key) | ||
t.is(clientA, bridge.aliases.get('dummy'), 'no new client') | ||
|
||
t.is(clientA.closing == null, true, 'sanity check') | ||
bridge.putAlias('dummy', key2) | ||
t.not(clientA, bridge.aliases.get('dummy'), 'sanity check') | ||
t.is(clientA.closing != null, true, 'lifecycle ok') | ||
}) | ||
|
||
async function setup (t) { | ||
promClient.collectDefaultMetrics() // So we have something to scrape | ||
t.teardown(() => promClient.register.clear()) | ||
|
||
const testnet = await createTestnet() | ||
const bootstrap = testnet.bootstrap | ||
|
||
const dht = new HyperDHT({ bootstrap }) | ||
const server = fastify({ logger: false }) | ||
const bridge = new PrometheusDhtBridge(dht, server, { address: '127.0.0.1', port: 30000 }) | ||
const scraperPubKey = bridge.publicKey | ||
|
||
const dhtClient = new HyperDHT({ bootstrap }) | ||
const dhtPromClient = new DhtPromClient(dhtClient, promClient, scraperPubKey) | ||
|
||
t.teardown(async () => { | ||
await server.close() | ||
await bridge.close() | ||
await dhtPromClient.close() | ||
await dht.destroy() | ||
await testnet.destroy() | ||
}) | ||
|
||
return { dhtPromClient, bridge, bootstrap } | ||
} |