Skip to content
This repository has been archived by the owner on Jun 17, 2021. It is now read-only.

Commit

Permalink
add plugin amqp
Browse files Browse the repository at this point in the history
  • Loading branch information
QuanjieDeng committed Oct 13, 2020
1 parent c0e3fa0 commit f3e2d93
Show file tree
Hide file tree
Showing 4 changed files with 213 additions and 1 deletion.
174 changes: 174 additions & 0 deletions modules/nodejs-agent/lib/plugins/amqp/amqp.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
/*
* Licensed to the SkyAPM under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
"use strict";

const ContextCarrier = require("skyapm-nodejs/lib/trace/context-carrier");
const layerDefine = require("skyapm-nodejs/lib/trace/span-layer");
const componentDefine = require("skyapm-nodejs/lib/trace/component-define");

/**
*
* @param {amqpModule} amqpModule
* @param {instrumentation} instrumentation
* @param {contextManager} contextManager
* @return {*}
* @author Quanjie.Deng
*/
module.exports = function(amqpModule, instrumentation, contextManager) {
console.log("amqp hook");
instrumentation.enhanceMethod(amqpModule, "createConnection", wrapCreateConnection);
return amqpModule;

/**
* filterParams
* @param {original} original
* @return {*}
*/
function wrapCreateConnection(original) {
console.log("amqp createConnection 拦截触发");
return function() {
let Connection = original.apply(this, arguments);
enhanceConnectionsMethod(Connection, instrumentation, contextManager);
return Connection;
};
}
};

/**
* filterParams
* @param {obj} obj
* @param {instrumentation} instrumentation
* @param {contextManager} contextManager
* @return {*}
*/
function enhanceConnectionsMethod(obj, instrumentation, contextManager) {
let connection = obj;
instrumentation.enhanceMethod(obj, "exchange", wrapCreateExchange);
// instrumentation.enhanceMethod(obj, "queue", wrapCreateQueue);
return obj;
/**
* filterParams
* @param {original} original
* @return {*}
*/
function wrapCreateExchange(original) {
console.log("amqp exchange 拦截触发");
return function() {
let exchange = original.apply(this, arguments);
enhanceExchangeMethod(connection, exchange, instrumentation, contextManager);
return exchange;
};
}

// function wrapCreateQueue(original){
// console.log("amqp Queue 拦截触发");
// return function(){
// let queue = original.apply(this, arguments);
// enhanceQueueMethod(connection,queue, instrumentation, contextManager);
// return queue;
// }
// }
}

// function enhanceQueueMethod(connection,obj, instrumentation, contextManager){
// let connections = connection;
// let queue = obj;
// instrumentation.enhanceMethod(obj, "subscribe", wrapQueueSubscribe);
// return obj;

// function wrapQueueSubscribe(original){
// console.log("amqp Queue Subscribe 拦截触发");
// return function(options, messageListener){
// console.log(`subscribe----options:${options} `);
// console.log(`subscribe----messageListener:${messageListener} `);
// // let span = contextManager.createExitSpan(routingKey, connections.options.host+":"+connections.options.port);

// // let contextCarrier = new ContextCarrier();
// // let span = contextManager.createExitSpan(options.path, (options.hostname || options.host) + ":" + options.port, contextCarrier);
// // contextCarrier.pushBy(function(key, value) {
// // if (!options.hasOwnProperty("headers") || !options.headers) {
// // options.headers = {};
// // }
// // options.headers[key] = value;
// // });
// // span.component(componentDefine.Components.HTTP);
// // span.spanLayer(layerDefine.Layers.HTTP);
// let result = original.apply(this, arguments);
// // contextManager.finishSpan(span);
// return result;
// }
// };
// }

/**
* filterParams
* @param {endpointName} connection
* @param {obj} obj
* @param {instrumentation} instrumentation
* @param {contextManager} contextManager
* @return {*}
*/
function enhanceExchangeMethod( connection, obj, instrumentation, contextManager) {
let connections = connection;
instrumentation.enhanceMethod( obj, "publish", wrapExchangePulish);
return obj;
/**
* filterParams
* @param {original} original
* @return {*}
*/
function wrapExchangePulish(original) {
console.log("amqp exchange-publish 拦截触发");
return function(routingKey, data, options, callback) {
console.log("amqp wrapRequest function 参数1:"+routingKey);
console.log("amqp wrapRequest function 参数2:"+JSON.stringify(data));
console.log("amqp wrapRequest function connections:"+ connections.options.host+":"+connections.options.port);
let enhanceCallback = callback;
let hasCallback = false;
let contextCarrier = new ContextCarrier();
let span = contextManager.createExitSpan(routingKey, connections.options.host+":"+connections.options.port);
contextCarrier.pushBy(function(key, value) {
if (!data.hasOwnProperty("headers")) {
data.headers = {};
}
data.headers[key] = value;
console.log("添加 ContextCarrier k-v:"+key+":"+value);
});
console.log("amqp wrapRequest function 参数2-2:"+JSON.stringify(data));
span.component(componentDefine.Components.AMQP);
span.spanLayer(layerDefine.Layers.MQ);


if (typeof callback === "function") {
console.log("amqp publish call_back is function");
enhanceCallback = instrumentation.enhanceCallback(span.traceContext(),
contextManager, function() {
console.log(" exchange-publish call_back 触发");
contextManager.finishSpan(span);
return callback.apply(this, arguments);
});
hasCallback = true;
}

let result = original.apply(this, [routingKey, data, options, enhanceCallback]);
if (result && !hasCallback) {
contextManager.finishSpan(span);
}
return result;
};
};
}
37 changes: 37 additions & 0 deletions modules/nodejs-agent/lib/plugins/amqp/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the SkyAPM under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

"use strict";

const Plugin = require("skyapm-nodejs/lib/plugins/plugin");

module.exports = new Plugin("amqp-plugin", "amqp", [{
_name: "amqp",
_description: "Enhance all version of amqp module",
_enhanceModules: ["amqp"],
canEnhance: function(version, enhanceFile) {
console.log("==============amqp canEnhance enhanceFile:"+enhanceFile+" version:"+version);
if (this._enhanceModules.indexOf(enhanceFile) > -1) {
return true;
}
return false;
},
getInterceptor: function(enhanceFile) {
console.log("==============amqp getInterceptor enhanceFile:"+enhanceFile);
return require("./" + enhanceFile);
},
}]);
2 changes: 1 addition & 1 deletion modules/nodejs-agent/lib/plugins/plugin-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

module.exports = PluginManager;
const logger = require("../logger");
const OFFICER_SUPPORTED_MODULE = ["mysql", "http", "egg-core", "egg"];
const OFFICER_SUPPORTED_MODULE = ["mysql", "http", "egg-core", "egg", "amqp"];

/**
*
Expand Down
1 change: 1 addition & 0 deletions modules/nodejs-agent/lib/trace/component-define.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ let Components = function() {
this.HTTP = new OfficeComponent(2, "HTTP");
this.MYSQL = new OfficeComponent(5, "MYSQL");
this.EGG = new OfficeComponent(4003, "Egg");
this.AMQP = new OfficeComponent(4004, "AMQP");
};

Components.instance = null;
Expand Down

0 comments on commit f3e2d93

Please sign in to comment.