Skip to content
This repository has been archived by the owner on Jun 17, 2021. It is now read-only.

Commit

Permalink
modif code check
Browse files Browse the repository at this point in the history
  • Loading branch information
QuanjieDeng committed Oct 20, 2020
1 parent f3e2d93 commit 9bfe84a
Showing 1 changed file with 53 additions and 48 deletions.
101 changes: 53 additions & 48 deletions modules/nodejs-agent/lib/plugins/amqp/amqp.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ const componentDefine = require("skyapm-nodejs/lib/trace/component-define");
* @author Quanjie.Deng
*/
module.exports = function(amqpModule, instrumentation, contextManager) {
console.log("amqp hook");
instrumentation.enhanceMethod(amqpModule, "createConnection", wrapCreateConnection);
return amqpModule;

Expand All @@ -39,7 +38,6 @@ module.exports = function(amqpModule, instrumentation, contextManager) {
* @return {*}
*/
function wrapCreateConnection(original) {
console.log("amqp createConnection 拦截触发");
return function() {
let Connection = original.apply(this, arguments);
enhanceConnectionsMethod(Connection, instrumentation, contextManager);
Expand All @@ -58,61 +56,76 @@ module.exports = function(amqpModule, instrumentation, contextManager) {
function enhanceConnectionsMethod(obj, instrumentation, contextManager) {
let connection = obj;
instrumentation.enhanceMethod(obj, "exchange", wrapCreateExchange);
// instrumentation.enhanceMethod(obj, "queue", wrapCreateQueue);
instrumentation.enhanceMethod(obj, "queue", wrapCreateQueue);
return obj;
/**
* filterParams
* @param {original} original
* @return {*}
*/
function wrapCreateExchange(original) {
console.log("amqp exchange 拦截触发");
return function() {
let exchange = original.apply(this, arguments);
enhanceExchangeMethod(connection, exchange, instrumentation, contextManager);
return exchange;
};
}

// function wrapCreateQueue(original){
// console.log("amqp Queue 拦截触发");
// return function(){
// let queue = original.apply(this, arguments);
// enhanceQueueMethod(connection,queue, instrumentation, contextManager);
// return queue;
// }
// }
/**
* filterParams
* @param {original} original
* @return {*}
*/
function wrapCreateQueue(original) {
return function() {
let queue = original.apply(this, arguments);
enhanceQueueMethod(queue, instrumentation, contextManager);
return queue;
};
}
}

// function enhanceQueueMethod(connection,obj, instrumentation, contextManager){
// let connections = connection;
// let queue = obj;
// instrumentation.enhanceMethod(obj, "subscribe", wrapQueueSubscribe);
// return obj;
/**
* filterParams
* @param {obj} obj
* @param {instrumentation} instrumentation
* @param {contextManager} contextManager
* @return {*}
*/
function enhanceQueueMethod(obj, instrumentation, contextManager) {
instrumentation.enhanceMethod(obj, "subscribe", wrapQueueSubscribe);
return obj;

/**
* filterParams
* @param {original} original
* @return {*}
*/
function wrapQueueSubscribe(original) {
return function(options, messageListener) {
let optionsNew = function(message) {
let contextCarrier = new ContextCarrier();
contextCarrier.fetchBy(function(key) {
if (message.headers.hasOwnProperty(key)) {
return message.headers[key];
}
return undefined;
});

let span = contextManager.createEntrySpan(obj.name, contextCarrier);
span.component(componentDefine.Components.AMQP);
span.spanLayer(layerDefine.Layers.MQ);

// function wrapQueueSubscribe(original){
// console.log("amqp Queue Subscribe 拦截触发");
// return function(options, messageListener){
// console.log(`subscribe----options:${options} `);
// console.log(`subscribe----messageListener:${messageListener} `);
// // let span = contextManager.createExitSpan(routingKey, connections.options.host+":"+connections.options.port);
let res = options.apply(this, arguments);
contextManager.finishSpan(span);
return res;
};

// // let contextCarrier = new ContextCarrier();
// // let span = contextManager.createExitSpan(options.path, (options.hostname || options.host) + ":" + options.port, contextCarrier);
// // contextCarrier.pushBy(function(key, value) {
// // if (!options.hasOwnProperty("headers") || !options.headers) {
// // options.headers = {};
// // }
// // options.headers[key] = value;
// // });
// // span.component(componentDefine.Components.HTTP);
// // span.spanLayer(layerDefine.Layers.HTTP);
// let result = original.apply(this, arguments);
// // contextManager.finishSpan(span);
// return result;
// }
// };
// }
let result = original.apply(this, [optionsNew, messageListener]);
return result;
};
};
}

/**
* filterParams
Expand All @@ -132,32 +145,24 @@ function enhanceExchangeMethod( connection, obj, instrumentation, contextManager
* @return {*}
*/
function wrapExchangePulish(original) {
console.log("amqp exchange-publish 拦截触发");
return function(routingKey, data, options, callback) {
console.log("amqp wrapRequest function 参数1:"+routingKey);
console.log("amqp wrapRequest function 参数2:"+JSON.stringify(data));
console.log("amqp wrapRequest function connections:"+ connections.options.host+":"+connections.options.port);
let enhanceCallback = callback;
let hasCallback = false;
let contextCarrier = new ContextCarrier();
let span = contextManager.createExitSpan(routingKey, connections.options.host+":"+connections.options.port);
let span = contextManager.createExitSpan(routingKey, connections.options.host+":"+connections.options.port, contextCarrier);
contextCarrier.pushBy(function(key, value) {
if (!data.hasOwnProperty("headers")) {
data.headers = {};
}
data.headers[key] = value;
console.log("添加 ContextCarrier k-v:"+key+":"+value);
});
console.log("amqp wrapRequest function 参数2-2:"+JSON.stringify(data));
span.component(componentDefine.Components.AMQP);
span.spanLayer(layerDefine.Layers.MQ);


if (typeof callback === "function") {
console.log("amqp publish call_back is function");
enhanceCallback = instrumentation.enhanceCallback(span.traceContext(),
contextManager, function() {
console.log(" exchange-publish call_back 触发");
contextManager.finishSpan(span);
return callback.apply(this, arguments);
});
Expand Down

0 comments on commit 9bfe84a

Please sign in to comment.