Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tracing to workers #149

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .env.sample
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,11 @@ HAWK_CATCHER_TOKEN=

## If true, Grouper worker will send messages about new events to Notifier worker
IS_NOTIFIER_WORKER_ENABLED=false

# Tracing

## Is tracing enabled (only `true` to enable)
TRACING_ENABLED=false

## Jaeger-collector endpoint
JAEGER_ENDPOINT=http://jaeger-collector:14268/api/traces
24 changes: 12 additions & 12 deletions .eslintrc.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
module.exports = {
env: {
es6: true,
node: true,
jest: true
},
extends: ['codex'],
rules: {
"@typescript-eslint/no-unused-expressions": "error",
"no-unused-expressions": "off"
},
env: {
es6: true,
node: true,
jest: true,
},
extends: ['codex', 'prettier', 'prettier/@typescript-eslint'],
rules: {
'@typescript-eslint/no-unused-expressions': 'error',
'no-unused-expressions': 'off',
},
globals: {
NodeJS: true
}
NodeJS: true,
},
};
6 changes: 6 additions & 0 deletions .prettierrc.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
module.exports = {
trailingComma: 'es5',
tabWidth: 2,
semi: true,
singleQuote: true,
}
37 changes: 30 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@

Workers are services for processing hawk's background tasks



## Requirements

- [Registry](https://github.com/codex-team/hawk.registry)
Expand All @@ -29,8 +27,7 @@ For simplicity, Hawk workers can be used as part of the [Mono repository](https:
- Set `LOG_LEVEL` to `verbose` if you want message logs

> Also you can use `worker.logger` which is [`winston.Logger`](https://github.com/winstonjs/winston) to log something
>
>

## How to run workers

1. Make sure you are in Workers root directory
Expand Down Expand Up @@ -73,21 +70,24 @@ SIMULTANEOUS_TASKS=1 yarn worker hawk-worker-sourcemaps

## Running workers with Docker

Basic configuration is in `docker-compose.dev.yml`.
Basic configuration is in `docker-compose.dev.yml`.
Pull image from https://hub.docker.com/r/codexteamuser/hawk-workers

```
docker-compose -f docker-compose.dev.yml pull
```

If you run mongodb and rabbitmq with `hawk.mono` repository, by default your docker network will be named `hawkmono_default`.
If you run mongodb and rabbitmq with `hawk.mono` repository, by default your docker network will be named `hawkmono_default`.
This network name is written as external for workers.

Run chosen worker (say hawk-worker-javascript)

```
docker-compose -f docker-compose.dev.yml up hawk-worker-javascript
```

### Adding new workers

Make sure that your `.env` configurations exists.

Add new section to the `docker-compose.{dev,prod}.yml` files.
Expand Down Expand Up @@ -145,7 +145,7 @@ You can tweak it (add schemas, etc) and use it in your workers to handle databas
### Example

```javascript
const db = require("lib/db/mongoose-controller");
const db = require('lib/db/mongoose-controller');

await db.connect(); // Requires `MONGO_URL`

Expand Down Expand Up @@ -201,3 +201,26 @@ yarn migrate
Refactor mongo-migrate commands to have an opportunity to create or rollback

[More details](https://www.npmjs.com/package/migrate-mongo)

## Tracing

Worker supports Opentracing with Jaeger exporter

To test add `JAEGER_ENDPOINT` in `.env`, set `TRACING_ENABLED=true`, and run Jaeger:

```shell
docker run -d --name jaeger \
-e COLLECTOR_ZIPKIN_HTTP_PORT=9411 \
-p 5775:5775/udp \
-p 6831:6831/udp \
-p 6832:6832/udp \
-p 5778:5778 \
-p 16686:16686 \
-p 14268:14268 \
-p 9411:9411 \
jaegertracing/all-in-one:latest
```

Start worker, send events

Navigate to `http://localhost:16686`
38 changes: 38 additions & 0 deletions lib/tracer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import { NodeTracerProvider } from '@opentelemetry/node';
import {
BatchSpanProcessor,
SimpleSpanProcessor,
ConsoleSpanExporter,
} from '@opentelemetry/tracing';
import { JaegerExporter } from '@opentelemetry/exporter-jaeger';

let provider: NodeTracerProvider;
let exporter: JaegerExporter;

export const setupTracing = (name: string): void => {
provider = new NodeTracerProvider();

if (process.env.TRACING_ENABLED === 'true') {
exporter = new JaegerExporter({
serviceName: 'worker',
tags: [{ key: 'type', value: name }],
endpoint: process.env.JAEGER_ENDPOINT,
});

provider.addSpanProcessor(new SimpleSpanProcessor(exporter));
console.log('Jaeger set up');
}

// Print spans to console if not in prod
if (process.env.NODE_ENV !== 'PRODUCTION') {
provider.addSpanProcessor(
new SimpleSpanProcessor(new ConsoleSpanExporter())
);
}

provider.register();
};

export const stopTracing = (): void => {
if (exporter) exporter.shutdown();
};
56 changes: 44 additions & 12 deletions lib/worker.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import * as amqp from 'amqplib';
import * as client from 'prom-client';
import opentelemetry, { Tracer, Span } from '@opentelemetry/api';
import { createLogger, format, transports, Logger } from 'winston';
import { WorkerTask } from './types/worker-task';
import { CriticalError, NonCriticalError, ParsingError } from './workerErrors';
Expand Down Expand Up @@ -53,12 +54,19 @@ export abstract class Worker {
format.timestamp(),
format.colorize(),
format.simple(),
format.printf((msg) => `${msg.timestamp} - ${msg.level}: ${msg.message}`)
format.printf(
(msg) => `${msg.timestamp} - ${msg.level}: ${msg.message}`
)
),
}),
],
});

/**
* Opentracing tracer
*/
protected tracer: Tracer;

/**
* Prometheus metrics
* metricProcessedMessages: prom-client.Counter – number of successfully processed messages
Expand All @@ -73,7 +81,8 @@ export abstract class Worker {
/**
* How many task Worker should do concurrently
*/
private readonly simultaneousTasks: number = +process.env.SIMULTANEOUS_TASKS || 1;
private readonly simultaneousTasks: number =
+process.env.SIMULTANEOUS_TASKS || 1;

/**
* Registry connection status true/false
Expand Down Expand Up @@ -122,7 +131,14 @@ export abstract class Worker {
* Get array of available prometheus metrics
*/
public getMetrics(): client.Counter<string>[] {
return [ this.metricSuccessfullyProcessedMessages ];
return [this.metricSuccessfullyProcessedMessages];
}

/**
* Initialize opentracing tracer
*/
public initTracing(): void {
this.tracer = opentelemetry.trace.getTracer(this.type);
}

/**
Expand All @@ -133,16 +149,21 @@ export abstract class Worker {
throw new Error('Worker type is not defined');
}

this.initTracing();

if (!this.registryConnected) {
await this.connect();
}

const { consumerTag } = await this.channelWithRegistry.consume(this.type, (msg: amqp.ConsumeMessage) => {
const promise = this.processMessage(msg) as Promise<void>;
const { consumerTag } = await this.channelWithRegistry.consume(
this.type,
(msg: amqp.ConsumeMessage) => {
const promise = this.processMessage(msg) as Promise<void>;

this.tasksMap.set(msg, promise);
promise.then(() => this.tasksMap.delete(msg));
});
this.tasksMap.set(msg, promise);
promise.then(() => this.tasksMap.delete(msg));
}
);

/**
* Remember consumer tag to cancel subscription in future
Expand Down Expand Up @@ -240,6 +261,8 @@ export abstract class Worker {
private async processMessage(msg: amqp.ConsumeMessage): Promise<void> {
let event: WorkerTask;

const span = this.tracer.startSpan('processMessage');

try {
const stringifiedEvent = msg.content.toString();

Expand All @@ -255,7 +278,11 @@ export abstract class Worker {
}

try {
await this.handle(event);
const handleSpan = this.tracer.startSpan('handle', { parent: span });

await this.handle(event, handleSpan);

handleSpan.end();

/**
* Let RabbitMQ know that we processed the message
Expand All @@ -270,8 +297,12 @@ export abstract class Worker {
HawkCatcher.send(e);
this.logger.error('Worker::processMessage: An error occurred:\n', e);

this.logger.debug('instanceof CriticalError? ' + (e instanceof CriticalError));
this.logger.debug('instanceof NonCriticalError? ' + (e instanceof NonCriticalError));
this.logger.debug(
'instanceof CriticalError? ' + (e instanceof CriticalError)
);
this.logger.debug(
'instanceof NonCriticalError? ' + (e instanceof NonCriticalError)
);

/**
* Send back message to registry since we failed to handle it
Expand All @@ -290,6 +321,7 @@ export abstract class Worker {
this.logger.error('Unknown error:\n', e);
}
}
span.end();
}

/**
Expand Down Expand Up @@ -334,5 +366,5 @@ export abstract class Worker {
*
* @param {WorkerTask} event - Event object from consume method
*/
protected abstract handle(event: WorkerTask): Promise<void>;
protected abstract handle(event: WorkerTask, span: Span): Promise<void>;
}
10 changes: 9 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,20 @@
},
"dependencies": {
"@hawk.so/nodejs": "^2.0.7",
"@opentelemetry/api": "^0.11.0",
"@opentelemetry/exporter-jaeger": "^0.11.0",
"@opentelemetry/node": "^0.11.0",
"@opentelemetry/plugin-mongodb": "^0.10.0",
"@opentelemetry/tracing": "^0.11.0",
"@types/amqplib": "^0.5.13",
"@types/jest": "^25.2.1",
"@types/mongodb": "^3.5.15",
"@types/node": "^13.13.4",
"amqplib": "^0.5.5",
"debug": "^4.1.1",
"dotenv": "^8.2.0",
"jaeger-client": "^3.18.1",
"jsonwebtoken": "^8.5.1",
"lodash.mergewith": "^4.6.2",
"migrate-mongo": "^7.2.1",
"mongodb": "^3.5.7",
Expand All @@ -68,6 +75,7 @@
"@shelf/jest-mongodb": "^1.1.5",
"eslint": "^6.8.0",
"eslint-config-codex": "^1.3.4",
"eslint-config-prettier": "^6.11.0",
"jest": "25.5.4",
"nodemon": "^2.0.3",
"random-words": "^1.1.1",
Expand All @@ -76,4 +84,4 @@
"webpack": "^4.43.0",
"yargs": "^15.3.1"
}
}
}
11 changes: 11 additions & 0 deletions runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import * as url from 'url';
import { Worker } from './lib/worker';
import HawkCatcher from '@hawk.so/nodejs';
import * as dotenv from 'dotenv';
import { setupTracing, stopTracing } from './lib/tracer';

dotenv.config();

Expand Down Expand Up @@ -56,6 +57,7 @@ class WorkerRunner {
.then(() => {
try {
this.startMetrics();
this.startTracing();
} catch (e) {
HawkCatcher.send(e);
console.error(`Metrics not started: ${e}`);
Expand Down Expand Up @@ -131,6 +133,14 @@ class WorkerRunner {
}, 1000);
}

/**
* Start tracing
*/
private startTracing(): void {
setupTracing(workerNames[0]); // If many workers running, bug here
console.log("Trasing initialized")
}

/**
* Dynamically loads workers through the yarn workspaces
*/
Expand Down Expand Up @@ -258,6 +268,7 @@ class WorkerRunner {
private async stopWorker(worker: Worker): Promise<void> {
try {
await worker.finish();
stopTracing();

console.log(
'\x1b[33m%s\x1b[0m',
Expand Down
Loading