Skip to content

Commit

Permalink
V1 (#15)
Browse files Browse the repository at this point in the history
* Use v1 deps

* rm todo

* Put logs in registerLogger function

* Clean up logs

* Use latest scrape-client API

* Use published dht-prom-client

* Update logs to v1 scrape client

* Simplify: no async life cycle for alias entries

* Complete readme
  • Loading branch information
HDegroote authored Sep 17, 2024
1 parent a96f885 commit 27c527e
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 163 deletions.
16 changes: 13 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
# DHT Prometheus

A bridge to scrape Prometheus metrics from self-registering services, all using direct, end-to-end encrypted peer-to-peer connections.
A bridge to scrape Prometheus metrics from self-registering services, all using direct, end-to-end encrypted peer-to-peer connections (not http).

Its main advantage is that it does not use http: service discovery is done with a decentralised hash table ([HyperDHT](https://github.com/holepunchto/hyperdht)). This means that both this service and the clients it scrapes can live behind a firewall and need no reverse proy nor DNS entries.
Service discovery is done with a decentralised hash table ([HyperDHT](https://github.com/holepunchto/hyperdht)). This means that both this service and the clients it scrapes can live behind a firewall and need no reverse proy nor DNS entries.

Another advantage is the small amount of configuration required. [Clients](https://gitlab.com/dcent-tech/dht-prom-client) register themselves with the DHT-Prometheus service, so no manual list of targets needs to be maintained. All a client needs to register itself, is the DHT-Prometheus service's public key, and a shared secret.
An advantage is the small amount of configuration required. [Clients](https://gitlab.com/dcent-tech/dht-prom-client) register themselves with the DHT-Prometheus service, so no manual list of targets needs to be maintained. All a client needs to register itself, is the DHT-Prometheus service's public key, and a shared secret.

## Deployment

Expand All @@ -16,6 +16,14 @@ The DHT-prometheus service fulfils two complementary roles:

### Run

Configuration is done through environment variables:

- `DHT_PROM_KEY_PAIR_SEED`: 32-byte seed passed to `HyperDHT.keyPair()`, set as hex or z32. Set this to have a consistent public key (otherwise random, which is only useful for tests).
- `DHT_PROM_SHARED_SECRET`: 32-byte secret key, set as hex or z32.
- `DHT_PROM_LOG_LEVEL`: defaults to info
- `DHT_PROM_HTTP_PORT`: port where the http server listens. Defaults to a random port.
- `DHT_PROM_HTTP_HOST`: host where the http server listens. Defaults to 127.0.0.1

#### Docker

```
Expand All @@ -26,6 +34,8 @@ The intent is for the prometheus service to read its config from a read-only bin

Note: `/etc/prometheus/config/prometheus-dht-targets` should be writable by the container's user.

Note: `--network=host` is optional, but HyperDHT holepunching can struggle using the default bridge network, particularly for LAN and localhost connections.

#### CLI

Install:
Expand Down
136 changes: 90 additions & 46 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,10 @@ class PrometheusDhtBridge extends ReadyResource {
}

async _open () {
await this._loadAliases()

// It is important that the aliases are first loaded
// otherwise the old aliases might get overwritten
await this.aliasRpcServer.ready()
await this._loadAliases()
await this.swarm.listen()

this._checkExpiredsInterval = setInterval(
() => this.cleanupExpireds(),
Expand All @@ -94,13 +93,9 @@ class PrometheusDhtBridge extends ReadyResource {
clearInterval(this._checkExpiredsInterval)
}

await this.aliasRpcServer.close()

await Promise.all([
[...this.aliases.values()].map(a => {
return a.close().catch(safetyCatch)
})]
)
for (const entry of this.aliases.values()) {
entry.close()
}

await this.swarm.destroy()

Expand All @@ -120,7 +115,7 @@ class PrometheusDhtBridge extends ReadyResource {
return updated
}

current.close().catch(safetyCatch)
current.close()
}

const entry = new AliasesEntry(
Expand All @@ -131,7 +126,6 @@ class PrometheusDhtBridge extends ReadyResource {
)

this.aliases.set(alias, entry)
// TODO: just emit entry?
this.emit('set-alias', { alias, entry })
const updated = true

Expand All @@ -142,25 +136,6 @@ class PrometheusDhtBridge extends ReadyResource {
return updated
}

// Should be kept sync (or think hard)
cleanupExpireds () {
const toRemove = []
for (const [alias, entry] of this.aliases) {
if (entry.isExpired) toRemove.push(alias)
}

for (const alias of toRemove) {
const entry = this.aliases.get(alias)
this.aliases.delete(alias)
entry.close().catch(safetyCatch)
this.emit('alias-expired', { publicKey: entry.targetKey, alias })
}

if (toRemove.length > 0) {
this._writeAliases().catch(safetyCatch)
}
}

async _handleGet (req, reply) {
const alias = req.params.alias

Expand All @@ -172,16 +147,16 @@ class PrometheusDhtBridge extends ReadyResource {
return
}

if (!entry.opened) {
await entry.ready()
if (this._forceFlushOnClientReady) await entry.scrapeClient.swarm.flush()
if (this._forceFlushOnClientReady && !entry.hasHandledGet) {
await entry.scrapeClient.swarm.flush()
}
entry.hasHandledGet = true

const scrapeClient = entry.scrapeClient

let res
try {
res = await scrapeClient.lookup()
res = await scrapeClient.requestMetrics()
} catch (e) {
this.emit('upstream-error', e)
reply.code(502)
Expand Down Expand Up @@ -217,19 +192,94 @@ class PrometheusDhtBridge extends ReadyResource {
this.putAlias(alias, z32PubKey, hostname, service, { write: false })
}
} catch (e) {
// An error is expected if the file does not yet exist
// (typically first run only)
this.emit('load-aliases-error', e)
}
}

// Should be kept sync (or think hard)
cleanupExpireds () {
const toRemove = []
for (const [alias, entry] of this.aliases) {
if (entry.isExpired) toRemove.push(alias)
}

for (const alias of toRemove) {
const entry = this.aliases.get(alias)
this.aliases.delete(alias)
entry.close()
this.emit('alias-expired', { publicKey: entry.targetKey, alias })
}

if (toRemove.length > 0) {
this._writeAliases().catch(safetyCatch)
}
}

registerLogger (logger) {
this.on('set-alias', ({ alias, entry }) => {
const scrapeClient = entry.scrapeClient
const publicKey = scrapeClient.targetKey
const { service, hostname } = entry

logger.info(`Registered alias: ${alias} -> ${idEnc.normalize(publicKey)} (${service} on host ${hostname})`)

scrapeClient.on('connection-open', ({ uid, remotePublicKey, remoteAddress }) => {
logger.info(`Scraper for ${alias}->${idEnc.normalize(remotePublicKey)} opened connection to ${remoteAddress} (uid: ${uid})`)
})
scrapeClient.on('connection-close', ({ uid, remotePublicKey, remoteAddress }) => {
logger.info(`Scraper for ${alias}->${idEnc.normalize(remotePublicKey)} closed connection to ${remoteAddress} (uid: ${uid})`)
})
scrapeClient.on('connection-error', ({ error, uid, remotePublicKey, remoteAddress }) => {
logger.info(`Scraper for ${alias}->${idEnc.normalize(remotePublicKey)} at ${remoteAddress} connection error (uid: ${uid}): ${error.stack}`)
})

if (logger.level === 'debug') {
scrapeClient.on('connection-ignore', ({ uid, remotePublicKey, remoteAddress }) => {
logger.debug(`Scraper for ${alias}->${idEnc.normalize(remotePublicKey)} at ${remoteAddress} ignored connection (uid: ${uid})`)
})
}
})

this.on('aliases-updated', (loc) => {
logger.info(`Updated the aliases file at ${loc}`)
})

this.on('alias-expired', ({ alias, publicKey }) => {
logger.info(`Alias entry expired: ${alias} -> ${idEnc.normalize(publicKey)}`)
})

this.on('load-aliases-error', e => {
// Expected first time the service starts (creates it then)
logger.error(`failed to load aliases file: ${e.stack}`)
})

this.on('upstream-error', e => {
logger.info(`upstream error: ${e.stack}`)
})

this.on('write-aliases-error', e => {
logger.error(`Failed to write aliases file ${e.stack}`)
})

this.aliasRpcServer.registerLogger(logger)
}
}

class AliasesEntry extends ReadyResource {
class AliasesEntry {
constructor (scrapeClient, hostname, service, expiry) {
super()

this.scrapeClient = scrapeClient
this.hostname = hostname
this.service = service
this.expiry = expiry
this.hasHandledGet = false

this.scrapeClient.ready()
}

get closed () {
return this.scrapeClient.closed
}

get targetKey () {
Expand All @@ -244,14 +294,8 @@ class AliasesEntry extends ReadyResource {
this.expiry = expiry
}

async _open () {
await this.scrapeClient.ready()
}

async _close () {
if (this.scrapeClient.opening) {
await this.scrapeClient.close()
}
close () {
this.scrapeClient.close()
}
}

Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@
"dependencies": {
"b4a": "^1.6.6",
"debounceify": "^1.1.0",
"dht-prom-alias-rpc": "^0.0.1-alpha.1",
"dht-prom-client": "^0.0.1-alpha.10",
"dht-prom-alias-rpc": "^1.0.0",
"dht-prom-client": "^1.0.1",
"fastify": "^4.28.0",
"graceful-goodbye": "^1.3.0",
"hypercore-id-encoding": "^1.3.0",
Expand Down
100 changes: 2 additions & 98 deletions run.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ function loadConfig () {
prometheusTargetsLoc: process.env.DHT_PROM_PROMETHEUS_TARGETS_LOC || './prometheus/targets.json',
logLevel: (process.env.DHT_PROM_LOG_LEVEL || 'info').toLowerCase(),
httpPort: process.env.DHT_PROM_HTTP_PORT || 0,
httpHost: '127.0.0.1',
httpHost: process.env.DHT_PROM_HTTP_HOST || '127.0.0.1',
_forceFlushOnClientReady: process.env._DHT_PROM_FORCE_FLUSH || 'false' // Tests only
}

Expand Down Expand Up @@ -75,7 +75,7 @@ async function main () {
serverLogLevel
})

setupLogging(bridge, logger)
bridge.registerLogger(logger)

goodbye(async () => {
logger.info('Shutting down')
Expand All @@ -90,100 +90,4 @@ async function main () {
logger.info(`DHT RPC ready at public key ${idEnc.normalize(bridge.publicKey)}`)
}

function setupLogging (bridge, logger) {
bridge.on('set-alias', ({ alias, entry }) => {
const scrapeClient = entry.scrapeClient
const publicKey = scrapeClient.targetKey
const { service, hostname } = entry

logger.info(`Registered alias: ${alias} -> ${idEnc.normalize(publicKey)} (${service} on host ${hostname})`)

scrapeClient.on('connection-open', ({ uid, targetKey, peerInfo }) => {
logger.info(`Scraper for ${alias}->${idEnc.normalize(targetKey)} opened connection from ${idEnc.normalize(peerInfo.publicKey)} (uid: ${uid})`)
})
scrapeClient.on('connection-close', ({ uid }) => {
logger.info(`Scraper for ${alias} closed connection (uid: ${uid})`)
})
scrapeClient.on('connection-error', ({ error, uid }) => {
logger.info(`Scraper for ${alias} connection error (uid: ${uid})`)
logger.info(error)
})

if (logger.level === 'debug') {
scrapeClient.on('connection-ignore', ({ uid }) => {
logger.debug(`Scraper for ${alias} ignored connection (uid: ${uid})`)
})
}
})

bridge.on('aliases-updated', (loc) => {
logger.info(`Updated the aliases file at ${loc}`)
})

bridge.on('alias-expired', ({ alias, publicKey }) => {
logger.info(`Alias entry expired: ${alias} -> ${idEnc.normalize(publicKey)}`)
})

bridge.on('load-aliases-error', e => { // TODO: test
// Expected first time the service starts (creates it then)
logger.error('failed to load aliases file')
logger.error(e)
})

bridge.on('upstream-error', e => { // TODO: test
logger.info('upstream error:')
logger.info(e)
})

bridge.on('write-aliases-error', e => {
logger.error('Failed to write aliases file')
logger.error(e)
})

bridge.aliasRpcServer.on(
'alias-request',
({ uid, remotePublicKey, targetPublicKey, alias }) => {
logger.info(`Alias request from ${idEnc.normalize(remotePublicKey)} to set ${alias}->${idEnc.normalize(targetPublicKey)} (uid ${uid})`)
}
)
bridge.aliasRpcServer.on(
'register-success', ({ uid, alias, targetPublicKey, updated }) => {
logger.info(`Alias success for ${alias}->${idEnc.normalize(targetPublicKey)}--updated: ${updated} (uid: ${uid})`)
}
)
// TODO: log IP address + rate limit
bridge.aliasRpcServer.on(
'alias-unauthorised', ({ uid, remotePublicKey, targetPublicKey, alias }) => {
logger.info(`Unauthorised alias request from ${idEnc.normalize(remotePublicKey)} to set alias ${alias}->${idEnc.normalize(targetPublicKey)} (uid: ${uid})`)
}
)
bridge.aliasRpcServer.on(
'register-error', ({ uid, error }) => {
logger.info(`Alias error: ${error} (${uid})`)
}
)

bridge.aliasRpcServer.on(
'connection-open',
({ uid, peerInfo }) => {
const remotePublicKey = idEnc.normalize(peerInfo.publicKey)
logger.info(`Alias server opened connection to ${idEnc.normalize(remotePublicKey)} (uid ${uid})`)
}
)
bridge.aliasRpcServer.on(
'connection-close',
({ uid, peerInfo }) => {
const remotePublicKey = idEnc.normalize(peerInfo.publicKey)
logger.info(`Alias server closed connection to ${idEnc.normalize(remotePublicKey)} (uid ${uid})`)
}
)
bridge.aliasRpcServer.on(
'connection-error',
({ uid, error, peerInfo }) => {
const remotePublicKey = idEnc.normalize(peerInfo.publicKey)
logger.info(`Alias server socket error: ${error.stack} on connection to ${idEnc.normalize(remotePublicKey)} (uid ${uid})`)
}
)
}

main()
Loading

0 comments on commit 27c527e

Please sign in to comment.