Skip to content
This repository has been archived by the owner on May 5, 2022. It is now read-only.

propagate tracer for async code #44

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions appmetrics-zipkin.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ var PropertyReader = require('properties-reader');
var properties = PropertyReader(__dirname + '/appmetrics-zipkin.properties');
var tcpp = require('tcp-ping');

// create namespace
const { createNamespace } = require('./lib/request-context.js');
createNamespace('appmetrics-zipkin-ns');

const {
BatchRecorder
} = require('zipkin');
Expand Down
46 changes: 46 additions & 0 deletions lib/namespace.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
'use strict';

let semver = require('semver');
let asyncHooks;

if (semver.gte(process.version, '8.0.0')) {
asyncHooks = require('async_hooks');
}

class Namespace {

constructor() {
this.context = {};
}

run(fn) {
if (asyncHooks) {
const eid = asyncHooks.executionAsyncId();
this.context[eid] = {};
}
fn();
}

set(key, val) {
if (asyncHooks) {
const eid = asyncHooks.executionAsyncId();
this.context[eid][key] = val;
} else {
this.context[key] = val;
}
}

get(key) {
if (asyncHooks) {
const eid = asyncHooks.executionAsyncId();
if (this.context[eid])
return this.context[eid][key];
else
return undefined;
} else {
return this.context[key];
}
}
}

module.exports = Namespace;
54 changes: 54 additions & 0 deletions lib/request-context.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
'use strict';

let semver = require('semver');
let asyncHooks;

if (semver.gte(process.version, '8.0.0')) {
asyncHooks = require('async_hooks');
}
const Namespace = require('./namespace');

const namespaces = {};

function createHooks(namespace) {
function init(asyncId, type, triggerId, resource) {
if (namespace.context[triggerId]) {
namespace.context[asyncId] = namespace.context[triggerId];
}
}

function destroy(asyncId) {
delete namespace.context[asyncId];
}

const asyncHook = asyncHooks.createHook({
init,
destroy
});

asyncHook.enable();
}

function createNamespace(name) {
if (namespaces[name]) {
return namespace;
}

const namespace = new Namespace();
namespaces[name] = namespace;

if (semver.gte(process.version, '8.0.0')) {
createHooks(namespace);
}
return namespace;
}

function getNamespace(name) {
return namespaces[name];
}


module.exports = {
createNamespace,
getNamespace
};
7 changes: 7 additions & 0 deletions probes/http-outbound-probe-zipkin.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ var util = require('util');
var url = require('url');
var semver = require('semver');
const zipkin = require('zipkin');
const { getNamespace } = require('../lib/request-context.js');

var serviceName;

Expand Down Expand Up @@ -83,6 +84,12 @@ HttpOutboundProbeZipkin.prototype.attach = function(name, target) {
methodArgs[0] = Object.assign({}, parsedOptions);
}

// replace tracer from namespace
const namespace = getNamespace('appmetrics-zipkin-ns');
if (namespace) {
tracer.setId(namespace.get('tracer-id'));
}

if (!methodArgs[0].headers) methodArgs[0].headers = {};
let { headers } = Request.addZipkinHeaders(methodArgs[0], tracer.createChildId());
Object.assign(methodArgs[0].headers, { headers });
Expand Down
93 changes: 51 additions & 42 deletions probes/http-probe-zipkin.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ var aspect = require('../lib/aspect.js');
var util = require('util');
const zipkin = require('zipkin');

// get namespace
const { getNamespace } = require('../lib/request-context.js');
const namespace = getNamespace('appmetrics-zipkin-ns');

var serviceName;

const {
Expand Down Expand Up @@ -84,50 +88,55 @@ HttpProbeZipkin.prototype.attach = function(name, target) {
if (obj.__zipkinhttpProbe__) return;
obj.__zipkinhttpProbe__ = true;
aspect.aroundCallback(args, probeData, function(obj, args, probeData) {
var httpReq = args[0];
var res = args[1];
// Filter out urls where filter.to is ''
var traceUrl = parse(httpReq.url);
// console.log(util.inspect(httpReq));
if (traceUrl !== '') {
const method = httpReq.method;
if (hasZipkinHeader(httpReq)) {
const headers = httpReq.headers;
var spanId = headers[(Header.SpanId).toLowerCase()];
if (spanId !== undefined) {
const traceId = new Some(headers[(Header.TraceId).toLowerCase()]);
const parentSpanId = new Some(headers[(Header.ParentSpanId).toLowerCase()]);
const sampled = new Some(headers[(Header.Sampled).toLowerCase()]);
const flags = (new Some(headers[(Header.Flags).toLowerCase()])).flatMap(stringToIntOption).getOrElse(0);
var id = new TraceId({
traceId: traceId,
parentId: parentSpanId,
spanId: spanId,
sampled: sampled.map(stringToBoolean),
flags
});
tracer.setId(id);
namespace.run(() => {
var httpReq = args[0];
var res = args[1];
// Filter out urls where filter.to is ''
var traceUrl = parse(httpReq.url);
// console.log(util.inspect(httpReq));
if (traceUrl !== '') {
const method = httpReq.method;
if (hasZipkinHeader(httpReq)) {
const headers = httpReq.headers;
var spanId = headers[(Header.SpanId).toLowerCase()];
if (spanId !== undefined) {
const traceId = new Some(headers[(Header.TraceId).toLowerCase()]);
const parentSpanId = new Some(headers[(Header.ParentSpanId).toLowerCase()]);
const sampled = new Some(headers[(Header.Sampled).toLowerCase()]);
const flags = (new Some(
headers[(Header.Flags).toLowerCase()])).flatMap(stringToIntOption).getOrElse(0);
var id = new TraceId({
traceId: traceId,
parentId: parentSpanId,
spanId: spanId,
sampled: sampled.map(stringToBoolean),
flags
});
tracer.setId(id);
probeData.traceId = tracer.id;
};
} else {
tracer.setId(tracer.createRootId());
probeData.traceId = tracer.id;
};
} else {
tracer.setId(tracer.createRootId());
probeData.traceId = tracer.id;
// Must assign new options back to args[0]
const { headers } = Request.addZipkinHeaders(args[0], tracer.id);
Object.assign(args[0].headers, headers);
// store tracer in namespace
namespace.set('tracer-id', tracer.id);
// Must assign new options back to args[0]
const { headers } = Request.addZipkinHeaders(args[0], tracer.id);
Object.assign(args[0].headers, headers);
}

tracer.recordServiceName(serviceName);
tracer.recordRpc(method.toUpperCase());
tracer.recordBinary('http.url', httpReq.headers.host + traceUrl);
tracer.recordAnnotation(new Annotation.ServerRecv());
tracer.recordAnnotation(new Annotation.LocalAddr(0));

aspect.after(res, 'end', probeData, function(obj, methodName, args, probeData, ret) {
tracer.recordBinary('http.status_code', res.statusCode.toString());
tracer.recordAnnotation(new Annotation.ServerSend());
});
}

tracer.recordServiceName(serviceName);
tracer.recordRpc(method.toUpperCase());
tracer.recordBinary('http.url', httpReq.headers.host + traceUrl);
tracer.recordAnnotation(new Annotation.ServerRecv());
tracer.recordAnnotation(new Annotation.LocalAddr(0));

aspect.after(res, 'end', probeData, function(obj, methodName, args, probeData, ret) {
tracer.recordBinary('http.status_code', res.statusCode.toString());
tracer.recordAnnotation(new Annotation.ServerSend());
});
}
});
});
});
}
Expand Down
8 changes: 8 additions & 0 deletions probes/https-outbound-probe-zipkin.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ var util = require('util');
var url = require('url');
var semver = require('semver');
const zipkin = require('zipkin');
const { getNamespace } = require('../lib/request-context.js');

var serviceName;

Expand Down Expand Up @@ -78,6 +79,13 @@ HttpsOutboundProbeZipkin.prototype.attach = function(name, target) {
requestMethod = parsedOptions.method;
}
}

// replace tracer from namespace
const namespace = getNamespace('appmetrics-zipkin-ns');
if (namespace) {
tracer.setId(namespace.get('tracer-id'));
}

// Must assign new options back to methodArgs[0]
methodArgs[0] = Request.addZipkinHeaders(methodArgs[0], tracer.createChildId());
tracer.recordServiceName(serviceName);
Expand Down
93 changes: 51 additions & 42 deletions probes/https-probe-zipkin.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ var aspect = require('../lib/aspect.js');
var util = require('util');
const zipkin = require('zipkin');

// get namespace
const { getNamespace } = require('../lib/request-context.js');
const namespace = getNamespace('appmetrics-zipkin-ns');

var serviceName;

const {
Expand Down Expand Up @@ -84,50 +88,55 @@ HttpsProbeZipkin.prototype.attach = function(name, target) {
if (obj.__zipkinhttpsProbe__) return;
obj.__zipkinhttpsProbe__ = true;
aspect.aroundCallback(args, probeData, function(obj, args, probeData) {
var httpsReq = args[0];
var res = args[1];
// Filter out urls where filter.to is ''
var traceUrl = parse(httpsReq.url);
if (traceUrl !== '') {
const method = httpsReq.method;

if (hasZipkinHeader(httpsReq)) {
const headers = httpsReq.headers;
var spanId = headers[(Header.SpanId).toLowerCase()];
if (spanId !== undefined) {
const traceId = new Some(headers[(Header.TraceId).toLowerCase()]);
const parentSpanId = new Some(headers[(Header.ParentSpanId).toLowerCase()]);
const sampled = new Some(headers[(Header.Sampled).toLowerCase()]);
const flags = (new Some(headers[(Header.Flags).toLowerCase()])).flatMap(stringToIntOption).getOrElse(0);
var id = new TraceId({
traceId: traceId,
parentId: parentSpanId,
spanId: spanId,
sampled: sampled.map(stringToBoolean),
flags
});
tracer.setId(id);
namespace.run(() => {
var httpsReq = args[0];
var res = args[1];
// Filter out urls where filter.to is ''
var traceUrl = parse(httpsReq.url);
if (traceUrl !== '') {
const method = httpsReq.method;

if (hasZipkinHeader(httpsReq)) {
const headers = httpsReq.headers;
var spanId = headers[(Header.SpanId).toLowerCase()];
if (spanId !== undefined) {
const traceId = new Some(headers[(Header.TraceId).toLowerCase()]);
const parentSpanId = new Some(headers[(Header.ParentSpanId).toLowerCase()]);
const sampled = new Some(headers[(Header.Sampled).toLowerCase()]);
const flags = (new Some(
headers[(Header.Flags).toLowerCase()])).flatMap(stringToIntOption).getOrElse(0);
var id = new TraceId({
traceId: traceId,
parentId: parentSpanId,
spanId: spanId,
sampled: sampled.map(stringToBoolean),
flags
});
tracer.setId(id);
probeData.traceId = tracer.id;
};
} else {
tracer.setId(tracer.createRootId());
probeData.traceId = tracer.id;
};
} else {
tracer.setId(tracer.createRootId());
probeData.traceId = tracer.id;
// Must assign new options back to args[0]
args[0] = Request.addZipkinHeaders(args[0], tracer.id);
// store tracer in namespace
namespace.set('tracer-id', tracer.id);
// Must assign new options back to args[0]
args[0] = Request.addZipkinHeaders(args[0], tracer.id);
}

tracer.recordServiceName(serviceName);
tracer.recordRpc(method.toUpperCase());
tracer.recordBinary('http.url', httpsReq.headers.host + traceUrl);
tracer.recordAnnotation(new Annotation.ServerRecv());
tracer.recordAnnotation(new Annotation.LocalAddr(0));


aspect.after(res, 'end', probeData, function(obj, methodName, args, probeData, ret) {
tracer.recordBinary('http.status_code', res.statusCode.toString());
tracer.recordAnnotation(new Annotation.ServerSend());
});
}

tracer.recordServiceName(serviceName);
tracer.recordRpc(method.toUpperCase());
tracer.recordBinary('http.url', httpsReq.headers.host + traceUrl);
tracer.recordAnnotation(new Annotation.ServerRecv());
tracer.recordAnnotation(new Annotation.LocalAddr(0));


aspect.after(res, 'end', probeData, function(obj, methodName, args, probeData, ret) {
tracer.recordBinary('http.status_code', res.statusCode.toString());
tracer.recordAnnotation(new Annotation.ServerSend());
});
}
});
});
});
}
Expand Down