From f3e2d9306ff73ed134a122eea5378f347c05af7f Mon Sep 17 00:00:00 2001 From: "Quanjie.Deng" Date: Tue, 13 Oct 2020 10:20:11 +0800 Subject: [PATCH 1/3] add plugin amqp --- modules/nodejs-agent/lib/plugins/amqp/amqp.js | 174 ++++++++++++++++++ .../nodejs-agent/lib/plugins/amqp/index.js | 37 ++++ .../lib/plugins/plugin-manager.js | 2 +- .../lib/trace/component-define.js | 1 + 4 files changed, 213 insertions(+), 1 deletion(-) create mode 100755 modules/nodejs-agent/lib/plugins/amqp/amqp.js create mode 100755 modules/nodejs-agent/lib/plugins/amqp/index.js diff --git a/modules/nodejs-agent/lib/plugins/amqp/amqp.js b/modules/nodejs-agent/lib/plugins/amqp/amqp.js new file mode 100755 index 0000000..07304e2 --- /dev/null +++ b/modules/nodejs-agent/lib/plugins/amqp/amqp.js @@ -0,0 +1,174 @@ +/* + * Licensed to the SkyAPM under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +"use strict"; + +const ContextCarrier = require("skyapm-nodejs/lib/trace/context-carrier"); +const layerDefine = require("skyapm-nodejs/lib/trace/span-layer"); +const componentDefine = require("skyapm-nodejs/lib/trace/component-define"); + +/** + * + * @param {amqpModule} amqpModule + * @param {instrumentation} instrumentation + * @param {contextManager} contextManager + * @return {*} + * @author Quanjie.Deng + */ +module.exports = function(amqpModule, instrumentation, contextManager) { + console.log("amqp hook"); + instrumentation.enhanceMethod(amqpModule, "createConnection", wrapCreateConnection); + return amqpModule; + + /** + * filterParams + * @param {original} original + * @return {*} + */ + function wrapCreateConnection(original) { + console.log("amqp createConnection 拦截触发"); + return function() { + let Connection = original.apply(this, arguments); + enhanceConnectionsMethod(Connection, instrumentation, contextManager); + return Connection; + }; + } +}; + +/** + * filterParams + * @param {obj} obj + * @param {instrumentation} instrumentation + * @param {contextManager} contextManager + * @return {*} + */ +function enhanceConnectionsMethod(obj, instrumentation, contextManager) { + let connection = obj; + instrumentation.enhanceMethod(obj, "exchange", wrapCreateExchange); + // instrumentation.enhanceMethod(obj, "queue", wrapCreateQueue); + return obj; + /** + * filterParams + * @param {original} original + * @return {*} + */ + function wrapCreateExchange(original) { + console.log("amqp exchange 拦截触发"); + return function() { + let exchange = original.apply(this, arguments); + enhanceExchangeMethod(connection, exchange, instrumentation, contextManager); + return exchange; + }; + } + + // function wrapCreateQueue(original){ + // console.log("amqp Queue 拦截触发"); + // return function(){ + // let queue = original.apply(this, arguments); + // enhanceQueueMethod(connection,queue, instrumentation, contextManager); + // return queue; + // } + // } +} + +// function enhanceQueueMethod(connection,obj, instrumentation, contextManager){ +// let connections = connection; +// let queue = obj; +// instrumentation.enhanceMethod(obj, "subscribe", wrapQueueSubscribe); +// return obj; + +// function wrapQueueSubscribe(original){ +// console.log("amqp Queue Subscribe 拦截触发"); +// return function(options, messageListener){ +// console.log(`subscribe----options:${options} `); +// console.log(`subscribe----messageListener:${messageListener} `); +// // let span = contextManager.createExitSpan(routingKey, connections.options.host+":"+connections.options.port); + +// // let contextCarrier = new ContextCarrier(); +// // let span = contextManager.createExitSpan(options.path, (options.hostname || options.host) + ":" + options.port, contextCarrier); +// // contextCarrier.pushBy(function(key, value) { +// // if (!options.hasOwnProperty("headers") || !options.headers) { +// // options.headers = {}; +// // } +// // options.headers[key] = value; +// // }); +// // span.component(componentDefine.Components.HTTP); +// // span.spanLayer(layerDefine.Layers.HTTP); +// let result = original.apply(this, arguments); +// // contextManager.finishSpan(span); +// return result; +// } +// }; +// } + +/** + * filterParams + * @param {endpointName} connection + * @param {obj} obj + * @param {instrumentation} instrumentation + * @param {contextManager} contextManager + * @return {*} + */ +function enhanceExchangeMethod( connection, obj, instrumentation, contextManager) { + let connections = connection; + instrumentation.enhanceMethod( obj, "publish", wrapExchangePulish); + return obj; + /** + * filterParams + * @param {original} original + * @return {*} + */ + function wrapExchangePulish(original) { + console.log("amqp exchange-publish 拦截触发"); + return function(routingKey, data, options, callback) { + console.log("amqp wrapRequest function 参数1:"+routingKey); + console.log("amqp wrapRequest function 参数2:"+JSON.stringify(data)); + console.log("amqp wrapRequest function connections:"+ connections.options.host+":"+connections.options.port); + let enhanceCallback = callback; + let hasCallback = false; + let contextCarrier = new ContextCarrier(); + let span = contextManager.createExitSpan(routingKey, connections.options.host+":"+connections.options.port); + contextCarrier.pushBy(function(key, value) { + if (!data.hasOwnProperty("headers")) { + data.headers = {}; + } + data.headers[key] = value; + console.log("添加 ContextCarrier k-v:"+key+":"+value); + }); + console.log("amqp wrapRequest function 参数2-2:"+JSON.stringify(data)); + span.component(componentDefine.Components.AMQP); + span.spanLayer(layerDefine.Layers.MQ); + + + if (typeof callback === "function") { + console.log("amqp publish call_back is function"); + enhanceCallback = instrumentation.enhanceCallback(span.traceContext(), + contextManager, function() { + console.log(" exchange-publish call_back 触发"); + contextManager.finishSpan(span); + return callback.apply(this, arguments); + }); + hasCallback = true; + } + + let result = original.apply(this, [routingKey, data, options, enhanceCallback]); + if (result && !hasCallback) { + contextManager.finishSpan(span); + } + return result; + }; + }; +} diff --git a/modules/nodejs-agent/lib/plugins/amqp/index.js b/modules/nodejs-agent/lib/plugins/amqp/index.js new file mode 100755 index 0000000..da02837 --- /dev/null +++ b/modules/nodejs-agent/lib/plugins/amqp/index.js @@ -0,0 +1,37 @@ +/* + * Licensed to the SkyAPM under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +"use strict"; + +const Plugin = require("skyapm-nodejs/lib/plugins/plugin"); + +module.exports = new Plugin("amqp-plugin", "amqp", [{ + _name: "amqp", + _description: "Enhance all version of amqp module", + _enhanceModules: ["amqp"], + canEnhance: function(version, enhanceFile) { + console.log("==============amqp canEnhance enhanceFile:"+enhanceFile+" version:"+version); + if (this._enhanceModules.indexOf(enhanceFile) > -1) { + return true; + } + return false; + }, + getInterceptor: function(enhanceFile) { + console.log("==============amqp getInterceptor enhanceFile:"+enhanceFile); + return require("./" + enhanceFile); + }, +}]); diff --git a/modules/nodejs-agent/lib/plugins/plugin-manager.js b/modules/nodejs-agent/lib/plugins/plugin-manager.js index 1e1e611..7b6e366 100644 --- a/modules/nodejs-agent/lib/plugins/plugin-manager.js +++ b/modules/nodejs-agent/lib/plugins/plugin-manager.js @@ -19,7 +19,7 @@ module.exports = PluginManager; const logger = require("../logger"); -const OFFICER_SUPPORTED_MODULE = ["mysql", "http", "egg-core", "egg"]; +const OFFICER_SUPPORTED_MODULE = ["mysql", "http", "egg-core", "egg", "amqp"]; /** * diff --git a/modules/nodejs-agent/lib/trace/component-define.js b/modules/nodejs-agent/lib/trace/component-define.js index 7da02da..d1f5540 100644 --- a/modules/nodejs-agent/lib/trace/component-define.js +++ b/modules/nodejs-agent/lib/trace/component-define.js @@ -39,6 +39,7 @@ let Components = function() { this.HTTP = new OfficeComponent(2, "HTTP"); this.MYSQL = new OfficeComponent(5, "MYSQL"); this.EGG = new OfficeComponent(4003, "Egg"); + this.AMQP = new OfficeComponent(4004, "AMQP"); }; Components.instance = null; From 9bfe84a649b24cf32dd47a6f58d1e9f055282af5 Mon Sep 17 00:00:00 2001 From: "Quanjie.Deng" Date: Tue, 20 Oct 2020 16:34:49 +0800 Subject: [PATCH 2/3] modif code check --- modules/nodejs-agent/lib/plugins/amqp/amqp.js | 101 +++++++++--------- 1 file changed, 53 insertions(+), 48 deletions(-) diff --git a/modules/nodejs-agent/lib/plugins/amqp/amqp.js b/modules/nodejs-agent/lib/plugins/amqp/amqp.js index 07304e2..3998cd5 100755 --- a/modules/nodejs-agent/lib/plugins/amqp/amqp.js +++ b/modules/nodejs-agent/lib/plugins/amqp/amqp.js @@ -29,7 +29,6 @@ const componentDefine = require("skyapm-nodejs/lib/trace/component-define"); * @author Quanjie.Deng */ module.exports = function(amqpModule, instrumentation, contextManager) { - console.log("amqp hook"); instrumentation.enhanceMethod(amqpModule, "createConnection", wrapCreateConnection); return amqpModule; @@ -39,7 +38,6 @@ module.exports = function(amqpModule, instrumentation, contextManager) { * @return {*} */ function wrapCreateConnection(original) { - console.log("amqp createConnection 拦截触发"); return function() { let Connection = original.apply(this, arguments); enhanceConnectionsMethod(Connection, instrumentation, contextManager); @@ -58,7 +56,7 @@ module.exports = function(amqpModule, instrumentation, contextManager) { function enhanceConnectionsMethod(obj, instrumentation, contextManager) { let connection = obj; instrumentation.enhanceMethod(obj, "exchange", wrapCreateExchange); - // instrumentation.enhanceMethod(obj, "queue", wrapCreateQueue); + instrumentation.enhanceMethod(obj, "queue", wrapCreateQueue); return obj; /** * filterParams @@ -66,7 +64,6 @@ function enhanceConnectionsMethod(obj, instrumentation, contextManager) { * @return {*} */ function wrapCreateExchange(original) { - console.log("amqp exchange 拦截触发"); return function() { let exchange = original.apply(this, arguments); enhanceExchangeMethod(connection, exchange, instrumentation, contextManager); @@ -74,45 +71,61 @@ function enhanceConnectionsMethod(obj, instrumentation, contextManager) { }; } - // function wrapCreateQueue(original){ - // console.log("amqp Queue 拦截触发"); - // return function(){ - // let queue = original.apply(this, arguments); - // enhanceQueueMethod(connection,queue, instrumentation, contextManager); - // return queue; - // } - // } + /** + * filterParams + * @param {original} original + * @return {*} + */ + function wrapCreateQueue(original) { + return function() { + let queue = original.apply(this, arguments); + enhanceQueueMethod(queue, instrumentation, contextManager); + return queue; + }; + } } -// function enhanceQueueMethod(connection,obj, instrumentation, contextManager){ -// let connections = connection; -// let queue = obj; -// instrumentation.enhanceMethod(obj, "subscribe", wrapQueueSubscribe); -// return obj; +/** + * filterParams + * @param {obj} obj + * @param {instrumentation} instrumentation + * @param {contextManager} contextManager + * @return {*} + */ +function enhanceQueueMethod(obj, instrumentation, contextManager) { + instrumentation.enhanceMethod(obj, "subscribe", wrapQueueSubscribe); + return obj; + + /** + * filterParams + * @param {original} original + * @return {*} + */ + function wrapQueueSubscribe(original) { + return function(options, messageListener) { + let optionsNew = function(message) { + let contextCarrier = new ContextCarrier(); + contextCarrier.fetchBy(function(key) { + if (message.headers.hasOwnProperty(key)) { + return message.headers[key]; + } + return undefined; + }); + + let span = contextManager.createEntrySpan(obj.name, contextCarrier); + span.component(componentDefine.Components.AMQP); + span.spanLayer(layerDefine.Layers.MQ); -// function wrapQueueSubscribe(original){ -// console.log("amqp Queue Subscribe 拦截触发"); -// return function(options, messageListener){ -// console.log(`subscribe----options:${options} `); -// console.log(`subscribe----messageListener:${messageListener} `); -// // let span = contextManager.createExitSpan(routingKey, connections.options.host+":"+connections.options.port); + let res = options.apply(this, arguments); + contextManager.finishSpan(span); + return res; + }; -// // let contextCarrier = new ContextCarrier(); -// // let span = contextManager.createExitSpan(options.path, (options.hostname || options.host) + ":" + options.port, contextCarrier); -// // contextCarrier.pushBy(function(key, value) { -// // if (!options.hasOwnProperty("headers") || !options.headers) { -// // options.headers = {}; -// // } -// // options.headers[key] = value; -// // }); -// // span.component(componentDefine.Components.HTTP); -// // span.spanLayer(layerDefine.Layers.HTTP); -// let result = original.apply(this, arguments); -// // contextManager.finishSpan(span); -// return result; -// } -// }; -// } + let result = original.apply(this, [optionsNew, messageListener]); + return result; + }; + }; +} /** * filterParams @@ -132,32 +145,24 @@ function enhanceExchangeMethod( connection, obj, instrumentation, contextManager * @return {*} */ function wrapExchangePulish(original) { - console.log("amqp exchange-publish 拦截触发"); return function(routingKey, data, options, callback) { - console.log("amqp wrapRequest function 参数1:"+routingKey); - console.log("amqp wrapRequest function 参数2:"+JSON.stringify(data)); - console.log("amqp wrapRequest function connections:"+ connections.options.host+":"+connections.options.port); let enhanceCallback = callback; let hasCallback = false; let contextCarrier = new ContextCarrier(); - let span = contextManager.createExitSpan(routingKey, connections.options.host+":"+connections.options.port); + let span = contextManager.createExitSpan(routingKey, connections.options.host+":"+connections.options.port, contextCarrier); contextCarrier.pushBy(function(key, value) { if (!data.hasOwnProperty("headers")) { data.headers = {}; } data.headers[key] = value; - console.log("添加 ContextCarrier k-v:"+key+":"+value); }); - console.log("amqp wrapRequest function 参数2-2:"+JSON.stringify(data)); span.component(componentDefine.Components.AMQP); span.spanLayer(layerDefine.Layers.MQ); if (typeof callback === "function") { - console.log("amqp publish call_back is function"); enhanceCallback = instrumentation.enhanceCallback(span.traceContext(), contextManager, function() { - console.log(" exchange-publish call_back 触发"); contextManager.finishSpan(span); return callback.apply(this, arguments); }); From 0eac07781b16dc98dee199a7d0c643b748deb60e Mon Sep 17 00:00:00 2001 From: "Quanjie.Deng" Date: Thu, 22 Oct 2020 10:33:57 +0800 Subject: [PATCH 3/3] del Comment --- modules/nodejs-agent/lib/plugins/amqp/index.js | 2 -- 1 file changed, 2 deletions(-) diff --git a/modules/nodejs-agent/lib/plugins/amqp/index.js b/modules/nodejs-agent/lib/plugins/amqp/index.js index da02837..37ec309 100755 --- a/modules/nodejs-agent/lib/plugins/amqp/index.js +++ b/modules/nodejs-agent/lib/plugins/amqp/index.js @@ -24,14 +24,12 @@ module.exports = new Plugin("amqp-plugin", "amqp", [{ _description: "Enhance all version of amqp module", _enhanceModules: ["amqp"], canEnhance: function(version, enhanceFile) { - console.log("==============amqp canEnhance enhanceFile:"+enhanceFile+" version:"+version); if (this._enhanceModules.indexOf(enhanceFile) > -1) { return true; } return false; }, getInterceptor: function(enhanceFile) { - console.log("==============amqp getInterceptor enhanceFile:"+enhanceFile); return require("./" + enhanceFile); }, }]);