From 1246a33211e6e5b88c8a44156d74b1fb831ece1c Mon Sep 17 00:00:00 2001 From: Qiujiao Date: Fri, 24 Feb 2023 15:22:23 +0800 Subject: [PATCH] Add configuration to enable/disable cascading feature and set cascading logs to debug (#1311) --- doc/design/quic-cascading.md | 26 +++- source/agent/conference/agent.toml | 3 + source/agent/conference/conference.js | 151 +++++++++++--------- source/cluster_manager/clusterManager.js | 7 +- source/cluster_manager/cluster_manager.toml | 3 +- source/cluster_manager/index.js | 9 +- source/management_api/api.js | 105 +++++++------- source/management_api/management_api.toml | 5 +- 8 files changed, 182 insertions(+), 127 deletions(-) diff --git a/doc/design/quic-cascading.md b/doc/design/quic-cascading.md index 45c4cbc60..ed22caa37 100644 --- a/doc/design/quic-cascading.md +++ b/doc/design/quic-cascading.md @@ -214,13 +214,37 @@ node . This is just a sample to show how cascading clusters control work, please customize your own scheduling and secured deployment in real practice. +### ManagementAPI configuration + +management_api module will report restful server info to cluster manager and then report to cloud control, so that cloud control could communicate with cluster management_api for restful API, please configure following items in cascading item in ```management_api/management_api.toml```: + +```` +[cascading] +enabled: set true to enable cascading feature, disable cascading by default +servicename: service name used to register to cascading cloud for accessing to communicate with this cluster +```` + ### ClusterManager configuration -The OWT cluster will report cluster info to cloud control service through cluster_manager module in OWT, you need to configure following items in cascading item in cluster_manager/cluster_manager.toml file before starting it: +The OWT cluster will report cluster info to cloud control service through cluster_manager module in OWT, you need to configure following items in cascading item in ```cluster_manager/cluster_manager.toml``` file before starting it: +```` +[cascading] +enabled: set true to enable cascading feature, disable cascading by default url: specify the cloud control url, so that cluster manager module can connect to the url and send cluster info region: specify the region this OWT cluster locate, this will be used by cloud control service to schedule incoming clients by region clusterID: specify a unique cluster ID for this cluster in the cloud. +```` + +### Conference configuration + +conference events will be sent to cascaded clusters through ```conference_agent``` and eventbridge, configure following item in cascading item in ```conference_agent/agent.toml``` to enable event cascading: + +```` +[cascading] +enabled: set true to enable cascading feature, disable cascading by default +```` + ### Media/event bridge diff --git a/source/agent/conference/agent.toml b/source/agent/conference/agent.toml index fc9beaebe..027346291 100644 --- a/source/agent/conference/agent.toml +++ b/source/agent/conference/agent.toml @@ -33,3 +33,6 @@ dataBaseURL = "localhost/owtdb" #default: "localhost/owtdb" [internal] # tcp/sctp available, tcp is default protocol = "tcp" + +[cascading] +enabled = false # disable cascading feature by default diff --git a/source/agent/conference/conference.js b/source/agent/conference/conference.js index 97b8a2a53..017d8ddc8 100644 --- a/source/agent/conference/conference.js +++ b/source/agent/conference/conference.js @@ -219,6 +219,9 @@ var Conference = function (rpcClient, selfRpcId) { */ var clusterID; + var cascadingConfig = global.config.cascading || {}; + var enableCascading = cascadingConfig.enabled || false; + /* * {TrackId: string(StreamId)} @@ -508,7 +511,6 @@ var Conference = function (rpcClient, selfRpcId) { }).then(() => { rpcReq.getClusterID(cluster, room_id, roomToken) .then((id) => { - log.info('Get cluster id:', id); clusterID = id; }).catch((e) => { log.info('Failed to get cluster ID'); @@ -526,7 +528,9 @@ var Conference = function (rpcClient, selfRpcId) { var destroyRoom = function() { const doClean = () => { - sendMsgTo('cascading', 'destroyRoom', {type: 'destroyRoom', data: {rpcId: selfRpcId, room: room_id}}); + if (enableCascading) { + sendMsgTo('cascading', 'destroyRoom', {type: 'destroyRoom', data: {rpcId: selfRpcId, room: room_id}}); + } accessController && accessController.destroy(); accessController = undefined; rtcController && rtcController.destroy(); @@ -538,8 +542,10 @@ var Conference = function (rpcClient, selfRpcId) { participants = {}; selfCleanTimer && clearTimeout(selfCleanTimer); selfCleanTimer = null; - var cluster = global.config.cluster.name || 'owt-cluster'; - rpcReq.leaveConference(cluster, room_id); + if (enableCascading) { + var cluster = global.config.cluster.name || 'owt-cluster'; + rpcReq.leaveConference(cluster, room_id); + } room_id = undefined; }; @@ -569,11 +575,9 @@ var Conference = function (rpcClient, selfRpcId) { const sendMsgTo = function(to, msg, data) { if (to !== 'admin') { if (to === 'cascading') { - log.info("cascading bridges are:", cascadingEventBridges); cascadingEventBridges.forEach((eventBridge) => { - log.info("send message to eventBridge:", eventBridge); if (eventBridge) { - log.info("send message to eventBridge:", eventBridge); + log.debug("send message to eventBridge:", eventBridge); rpcReq.sendMsg(eventBridge, selfRpcId, msg, data); } }); @@ -632,7 +636,9 @@ var Conference = function (rpcClient, selfRpcId) { }, rpcReq); if (room_config.notifying.participantActivities) { sendMsg(participantInfo.id, 'others', 'participant', {action: 'join', data: {id: participantInfo.id, user: participantInfo.user, role: participantInfo.role}}); - !participantInfo.cascading && sendMsgTo('cascading', {rpcId: selfRpcId}, {type: "addParticipant", data: participantInfo}); + if (enableCascading) { + !participantInfo.cascading && sendMsgTo('cascading', {rpcId: selfRpcId}, {type: "addParticipant", data: participantInfo}); + } } return Promise.resolve('ok'); }; @@ -655,7 +661,9 @@ var Conference = function (rpcClient, selfRpcId) { var left_user = participant.getInfo(); if (room_config.notifying.participantActivities) { sendMsg('room', 'all', 'participant', {action: 'leave', data: left_user.id}); - !participants[participantId].cascading && sendMsgTo('cascading', {rpcId: selfRpcId}, {type: "removeParticipant", data: participantId}); + if (enableCascading) { + !participants[participantId].cascading && sendMsgTo('cascading', {rpcId: selfRpcId}, {type: "removeParticipant", data: participantId}); + } } delete participants[participantId]; } @@ -720,7 +728,7 @@ var Conference = function (rpcClient, selfRpcId) { } var fwdStream = null; - if (casStreams[id]) { + if (enableCascading && casStreams[id]) { fwdStream = new CascadedStream(id, media, data, info, locality, casStreams[id].cluster, true, casStreams[id].bridge); } else { fwdStream = new ForwardStream(id, media, data, info, locality); @@ -759,10 +767,12 @@ var Conference = function (rpcClient, selfRpcId) { setTimeout(() => { if (room_config.notifying.streamChange) { sendMsg('room', 'all', 'stream', {id: id, status: 'add', data: fwdStream.toPortalFormat()}); - if (casStreams[id]) { - delete casStreams[id]; - } else { - sendMsgTo('cascading', {rpcId: selfRpcId}, {type: "addStream", data: {cluster:clusterID, id: id, media: media, data: data, info: info}}); + if (enableCascading) { + if (casStreams[id]) { + delete casStreams[id]; + } else { + sendMsgTo('cascading', {rpcId: selfRpcId}, {type: "addStream", data: {cluster:clusterID, id: id, media: media, data: data, info: info}}); + } } } }, 10); @@ -788,10 +798,12 @@ var Conference = function (rpcClient, selfRpcId) { setTimeout(() => { if (room_config.notifying.streamChange) { sendMsg('room', 'all', 'stream', {id: id, status: 'add', data: fwdStream.toPortalFormat()}); - if (casStreams[id]) { - delete casStreams[id]; - } else { - sendMsgTo('cascading', {rpcId: selfRpcId}, {type: "addStream", data: {cluster:clusterID, id: id, media: media, data: data, info: info}}); + if (enableCascading) { + if (casStreams[id]) { + delete casStreams[id]; + } else { + sendMsgTo('cascading', {rpcId: selfRpcId}, {type: "addStream", data: {cluster:clusterID, id: id, media: media, data: data, info: info}}); + } } } }, 10); @@ -808,11 +820,9 @@ var Conference = function (rpcClient, selfRpcId) { }; const updateStreamInfo = (streamId, info) => { - if (casStreams[streamId]) { - log.info("update casStreams info"); + if (enableCascading && casStreams[streamId]) { if (casStreams[streamId].update(info)) { if (room_config.notifying.streamChange) { - log.info("Notify stream change to local participant"); sendMsg('room', 'all', 'stream', { id: streamId, status: 'update', @@ -821,7 +831,6 @@ var Conference = function (rpcClient, selfRpcId) { } } } else { - log.info("update local streams info"); if (!streams[streamId].isInConnecting && streams[streamId].update(info)) { if (room_config.notifying.streamChange) { sendMsg('room', 'all', 'stream', { @@ -830,14 +839,16 @@ var Conference = function (rpcClient, selfRpcId) { data: {field: '.', value: streams[streamId].toPortalFormat()} }); - var cascading = streams[streamId].cascading; - !cascading && sendMsgTo('cascading', {rpcId: selfRpcId}, { - type: 'updateStreamInfo', - data: { - id: streamId, - info: info - } - }); + if (enableCascading) { + var cascading = streams[streamId].cascading; + !cascading && sendMsgTo('cascading', {rpcId: selfRpcId}, { + type: 'updateStreamInfo', + data: { + id: streamId, + info: info + } + }); + } } } } @@ -847,7 +858,7 @@ var Conference = function (rpcClient, selfRpcId) { return new Promise((resolve, reject) => { if (streams[streamId]) { for (var sub_id in subscriptions) { - if (subscriptions[sub_id].info.mediabridge) { + if (enableCascading && subscriptions[sub_id].info.mediabridge) { delete subscriptions[sub_id]; } else { let subTrack = subscriptions[sub_id].media.tracks @@ -871,12 +882,14 @@ var Conference = function (rpcClient, selfRpcId) { setTimeout(() => { if (room_config.notifying.streamChange) { sendMsg('room', 'all', 'stream', {id: streamId, status: 'remove'}); - !cascading && sendMsgTo('cascading', {rpcId: selfRpcId}, {type: 'removeStream', data: streamId}); + if (enableCascading) { + !cascading && sendMsgTo('cascading', {rpcId: selfRpcId}, {type: 'removeStream', data: streamId}); + } } }, 10); } - if (casStreams[streamId]) { + if (enableCascading && casStreams[streamId]) { delete casStreams[streamId]; sendMsg('room', 'all', 'stream', {id: streamId, status: 'remove'}); } @@ -1148,9 +1161,11 @@ var Conference = function (rpcClient, selfRpcId) { } } - for (var stream_id in casStreams) { - if (!casStreams[stream_id].isInConnecting) { - current_streams.push(casStreams[stream_id].toPortalFormat()); + if (enableCascading) { + for (var stream_id in casStreams) { + if (!casStreams[stream_id].isInConnecting) { + current_streams.push(casStreams[stream_id].toPortalFormat()); + } } } @@ -1512,14 +1527,14 @@ var Conference = function (rpcClient, selfRpcId) { }; const initiateMediaCascading = (participantId, publicationId, pubDesc, streamId) => { - log.info("initiateMediaCascading participantId:", participantId, " publicationId", publicationId, " pubDesc", pubDesc, " streamId:", streamId); + log.debug("initiateMediaCascading participantId:", participantId, " publicationId", publicationId, " pubDesc", pubDesc, " streamId:", streamId); var subDesc = pubDesc; subDesc.originType = subDesc.type; subDesc.type = 'mediabridge'; subDesc.room = room_id; subDesc.pubArgs = []; casStreams[streamId].media.tracks.forEach(track => { - log.info('casstream info:', track); + log.debug('casstream info:', track); var pubArg = { id: track.id, type: track.type, @@ -1528,7 +1543,7 @@ var Conference = function (rpcClient, selfRpcId) { subDesc.pubArgs.push(pubArg); }); - log.info("tracks:", subDesc.pubArgs); + log.debug("tracks:", subDesc.pubArgs); var format_preference; subDesc.cluster = casStreams[streamId].cluster; return accessController.initiate(participantId, publicationId, 'in', participants[participantId].getOrigin(), subDesc, format_preference) @@ -1541,7 +1556,7 @@ var Conference = function (rpcClient, selfRpcId) { streams[streamId] = casStreams[streamId]; streams[streamId].cascading = true; delete casStreams[streamId]; - log.info("accessController initiate result is:", result); + log.debug("accessController initiate result is:", result); streams[streamId].locality = result; return Promise.resolve('ok'); @@ -1572,7 +1587,7 @@ var Conference = function (rpcClient, selfRpcId) { } const startSubscribe = (participantId, subscriptionId, subDesc, streamId, callback) => { - log.info("startSubscribe with type:", subDesc.type) + log.debug("startSubscribe with type:", subDesc.type) if (subDesc.type === 'sip') { return addSubscription(subscriptionId, subDesc.locality, subDesc.media, subDesc.data, {owner: participantId, type: 'sip'}, subDesc.transport) .then((result) => { @@ -1600,13 +1615,11 @@ var Conference = function (rpcClient, selfRpcId) { source.optional && source.optional.format && (formatPreference.optional = formatPreference.optional.concat(source.optional.format)); } track.formatPreference = formatPreference; - log.info("startSubscribe with formatPreference:",formatPreference) + log.debug("startSubscribe with formatPreference:",formatPreference) }); } - log.info("initiateSubscription"); initiateSubscription(subscriptionId, subDesc, {owner: participantId, type: subDesc.type}); - log.info("controller.initiate"); return controller.initiate(participantId, subscriptionId, 'out', participants[participantId].getOrigin(), rtcSubInfo) .then((result) => { const releasedSource = rtcSubInfo.tracks ? rtcSubInfo.tracks.find(track => { @@ -1741,15 +1754,14 @@ var Conference = function (rpcClient, selfRpcId) { streamId = subDesc.data.from; } - log.info('subscribe, streamid is:', streamId, 'streams are:', streams, 'casStreams:', casStreams); + log.debug('subscribe, streamid is:', streamId, 'streams are:', streams, 'casStreams:', casStreams); if (!streams[streamId] && casStreams[streamId]) { return initiateMediaCascading(participantId, streamId, subDesc, streamId, callback) .then((result) => { subDesc.type = subDesc.originType; - log.info("startSubscribe participantId:", participantId, " subscriptionId:", subscriptionId, " subDesc:", subDesc, " streamId:", streamId); + log.debug("startSubscribe participantId:", participantId, " subscriptionId:", subscriptionId, " subDesc:", subDesc, " streamId:", streamId); return startSubscribe(participantId, subscriptionId, subDesc, streamId, callback) .then((result) => { - log.info("startSubscribe succeed with result:", result); callback('callback', result); }) }) @@ -1776,7 +1788,7 @@ var Conference = function (rpcClient, selfRpcId) { }); } - log.info("mediabridge addSubscription with media:", subDesc.media); + log.debug("mediabridge addSubscription with media:", subDesc.media); return addSubscription(subscriptionId, subDesc.locality, subDesc.media, subDesc.data, {owner: participantId, type: subDesc.originType, mediabridge: true}, subDesc.transport) .then((result) => { callback('callback', result); @@ -2006,7 +2018,9 @@ var Conference = function (rpcClient, selfRpcId) { sendMsg('room', 'all', 'stream', {status: 'update', id: streamId, data: {field: fieldData, value: status}}); }); - !streams[streamId].cascading && sendMsgTo('cascading', {rpcId: selfRpcId}, {type: 'setStreamMute', data: {id: streamId, track: track, muted: muted}}); + if (enableCascading) { + !streams[streamId].cascading && sendMsgTo('cascading', {rpcId: selfRpcId}, {type: 'setStreamMute', data: {id: streamId, track: track, muted: muted}}); + } } return 'ok'; }, function(reason) { @@ -2401,11 +2415,13 @@ var Conference = function (rpcClient, selfRpcId) { data: {field: 'activeInput', value: {id: input, volume: target.volume}} }); - !streams[streamId].cascading && sendMsgTo('cascading', {rpcId: selfRpcId}, - { - type: 'onAudioActiveness', - data: {activeInputStream: activeInputStream, target: target} - }); + if (enableCascading) { + !streams[streamId].cascading && sendMsgTo('cascading', {rpcId: selfRpcId}, + { + type: 'onAudioActiveness', + data: {activeInputStream: activeInputStream, target: target} + }); + } } } } @@ -2425,10 +2441,6 @@ var Conference = function (rpcClient, selfRpcId) { (participant_id !== 'admin') && result.push(participants[participant_id].getDetail()); } -/* for (var participant_id in casParticipants) { - result.push(casParticipants[participant_id].getDetail()); - }*/ - callback('callback', result); }; @@ -2503,9 +2515,11 @@ var Conference = function (rpcClient, selfRpcId) { } } - for (var stream_id in casStreams) { - if (!casStreams[stream_id].isInConnecting) { - result.push(casStreams[stream_id].toPortalFormat()); + if (enableCascading) { + for (var stream_id in casStreams) { + if (!casStreams[stream_id].isInConnecting) { + result.push(casStreams[stream_id].toPortalFormat()); + } } } @@ -3208,13 +3222,12 @@ var Conference = function (rpcClient, selfRpcId) { }; that.destroy = function(callback) { - log.info('Destroy room:', room_id); destroyRoom(); callback('callback', 'Success'); }; var disconnectCascadingByBridge = function(bridgeId) { - log.info("Disconnect cascading by bridge id:", bridgeId); + log.debug("Disconnect cascading by bridge id:", bridgeId); for (var participant_id in participants) { if (participants[participant_id].getPortal() === bridgeId) { removeParticipant(participant_id); @@ -3257,12 +3270,12 @@ var Conference = function (rpcClient, selfRpcId) { var addCascadingParticipants = function(bridgeId, targetCluster, participantInfo) { participantInfo.cascading = targetCluster; participantInfo.portal = bridgeId; - log.info("Add cascading participant:", participantInfo); + log.debug("Add cascading participant:", participantInfo); addParticipant(participantInfo, participantInfo.permission); } var addCascadingStreams = function(bridgeId, msg) { - log.info("Add cascading stream id:", msg.id, " data:", msg.data, " info:", msg.info, " media:", msg.media); + log.debug("Add cascading stream id:", msg.id, " data:", msg.data, " info:", msg.info, " media:", msg.media); const fwdStream = new CascadedStream(msg.id, msg.media, msg.data, msg.info, null, msg.cluster, false, bridgeId); const errMsg = fwdStream.checkMediaError(); if (errMsg) { @@ -3276,14 +3289,14 @@ var Conference = function (rpcClient, selfRpcId) { var initializeCascading = function(bridgeId, targetCluster, data) { if(data.participants) { for (var pid in data.participants) { - log.info("initialize participant:", data.participants[pid]); + log.debug("initialize participant:", data.participants[pid]); addCascadingParticipants(bridgeId, targetCluster, data.participants[pid]); } } if(data.streams) { for (var sid in data.streams) { - log.info("initialize stream:", data.streams[sid]); + log.debug("initialize stream:", data.streams[sid]); addCascadingStreams(bridgeId, data.streams[sid]); } } @@ -3310,7 +3323,7 @@ var Conference = function (rpcClient, selfRpcId) { } that.handleCascadingEvents = function(bridgeId, targetCluster, events, callback) { - log.info('Handle cascading event: ', events); + log.debug('Handle cascading event: ', events); var result = 'ok'; switch (events.type) { case 'addParticipant': @@ -3359,7 +3372,7 @@ var Conference = function (rpcClient, selfRpcId) { data.participants[pid] = participants[pid].getDetail(); data.participants[pid].origin = participants[pid].getOrigin(); data.participants[pid].portal = participants[pid].getPortal(); - log.info("participant id:", pid, " info:", data.participants[pid]); + log.debug("participant id:", pid, " info:", data.participants[pid]); } } diff --git a/source/cluster_manager/clusterManager.js b/source/cluster_manager/clusterManager.js index f0c676ec1..6ece38f5f 100644 --- a/source/cluster_manager/clusterManager.js +++ b/source/cluster_manager/clusterManager.js @@ -35,8 +35,11 @@ var ClusterManager = function (clusterName, selfId, spec) { function validateUrl(url) { try { - new Url.URL(url); - return true; + if (spec.enableCascading) { + new Url.URL(url); + return true; + } + return false; } catch { return false; } diff --git a/source/cluster_manager/cluster_manager.toml b/source/cluster_manager/cluster_manager.toml index c688cadef..ce7576603 100644 --- a/source/cluster_manager/cluster_manager.toml +++ b/source/cluster_manager/cluster_manager.toml @@ -35,7 +35,8 @@ analytics = "least-used" eventbridge = "last-used" mediabridge = "last-used" -[cloud] +[cascading] +enabled = false #disable cascading feature by default url = "none" #default none:not connect to cascading cloud, specify cloud service url for the cluster to register to cloud region = "BJ" clusterID = "" #A unique cluster ID reporting to cascading cloud diff --git a/source/cluster_manager/index.js b/source/cluster_manager/index.js index 51fe89d05..a211deb3a 100644 --- a/source/cluster_manager/index.js +++ b/source/cluster_manager/index.js @@ -42,9 +42,10 @@ config.rabbit = config.rabbit || {}; config.rabbit.host = config.rabbit.host || 'localhost'; config.rabbit.port = config.rabbit.port || 5672; -config.cloud.url = config.cloud.url; -config.cloud.region = config.cloud.region; -config.cloud.clusterID = config.cloud.clusterID; +config.cascading.enabled = config.cascading.enabled || false; +config.cascading.url = config.cascading.url; +config.cascading.region = config.cascading.region; +config.cascading.clusterID = config.cascading.clusterID; function startup () { var id = Math.floor(Math.random() * 1000000000); @@ -54,7 +55,7 @@ function startup () { checkAliveCount: config.manager.check_alive_count, scheduleKeepTime: config.manager.schedule_reserve_time, strategy: config.strategy, - url: config.cloud.url, region: config.cloud.region, clusterID: config.cloud.clusterID + enableCascading: config.cascading.enabled, url: config.cascading.url, region: config.cascading.region, clusterID: config.cascading.clusterID }; if (config.manager.enable_grpc) { diff --git a/source/management_api/api.js b/source/management_api/api.js index 3cfcf1636..00908c666 100644 --- a/source/management_api/api.js +++ b/source/management_api/api.js @@ -119,7 +119,11 @@ var serverConfig = global.config.server || {}; var cluster = require("cluster"); var serverPort = serverConfig.port || 3000; var numCPUs = serverConfig.numberOfProcess || 1; -var servicename = serverConfig.servicename || 'sampleService'; + + +var cascadingConfig = global.config.cascading || {}; +var enableCascading = cascadingConfig.enabled || false; +var servicename = cascadingConfig.servicename || 'sampleService'; var ip_address; (function getPublicIP() { @@ -179,62 +183,65 @@ if (cluster.isMaster) { var dataAccess = require('./data_access'); dataAccess.token.genKey(); - var registerInfo = undefined; - amqper.connect(global.config.rabbit, function () { - amqper.asRpcClient(function(rpcCli) { - var keepTrying = true; - var trySendInfo = function(attempts) { - if (attempts <= 0) { - log.info("Send register info to cluster manager timeout"); - return - } - log.info("Send restful info to cluster manager:", registerInfo); - rpcCli.remoteCall(cluster_name, 'registerInfo', [registerInfo], { callback : function(result) { - if (result === 'timeout') { - if (keepTrying) { - log.info('Faild to register restful server info, keep trying.'); - setTimeout(function() { trySendInfo(attempts - (result === 'timeout' ? 4 : 1)); }, 1000); - } - } else { - log.info('Send register info to cluster manager succeed'); - keepTrying = false; + if (enableCascading) { + var registerInfo = undefined; + + amqper.connect(global.config.rabbit, function () { + amqper.asRpcClient(function(rpcCli) { + var keepTrying = true; + var trySendInfo = function(attempts) { + if (attempts <= 0) { + log.info("Send register info to cluster manager timeout"); + return } - } }, 1000); - } + log.info("Send restful info to cluster manager:", registerInfo); + rpcCli.remoteCall(cluster_name, 'registerInfo', [registerInfo], { callback : function(result) { + if (result === 'timeout') { + if (keepTrying) { + log.info('Faild to register restful server info, keep trying.'); + setTimeout(function() { trySendInfo(attempts - (result === 'timeout' ? 4 : 1)); }, 1000); + } + } else { + log.info('Send register info to cluster manager succeed'); + keepTrying = false; + } + } }, 1000); + } - if (registerInfo != undefined) { - trySendInfo(5); - } else { - setTimeout(function() { trySendInfo(5); }, 1000); - } + if (registerInfo != undefined) { + trySendInfo(5); + } else { + setTimeout(function() { trySendInfo(5); }, 1000); + } + }, function(reason) { + log.error('Initializing as rpc client failed, reason:', reason); + process.exit(); + }); }, function(reason) { - log.error('Initializing as rpc client failed, reason:', reason); + log.error('Connect to rabbitMQ server failed, reason:', reason); process.exit(); - }); - }, function(reason) { - log.error('Connect to rabbitMQ server failed, reason:', reason); - process.exit(); - }); + }); - dataAccess.service.list(function (err, sers) { - if (err) { - log.warn('Failed to get service:', err.message); - } else { - var serviceToCloud = sers.filter((t) => {return t.name === servicename;}); - log.info('Representing service ', serviceToCloud); - var key = serviceToCloud[0].key; - if (serviceToCloud[0].encrypted === true) { - key = cipher.decrypt(cipher.k, key); - } + dataAccess.service.list(function (err, sers) { + if (err) { + log.warn('Failed to get service:', err.message); + } else { + var serviceToCloud = sers.filter((t) => {return t.name === servicename;}); + log.info('Representing service ', serviceToCloud); + var key = serviceToCloud[0].key; + if (serviceToCloud[0].encrypted === true) { + key = cipher.decrypt(cipher.k, key); + } - registerInfo = { - resturl: url, - servicekey: key, - serviceid: serviceToCloud[0]._id + registerInfo = { + resturl: url, + servicekey: key, + serviceid: serviceToCloud[0]._id + } } - } - }); + }); + } for (var i = 0; i < numCPUs; i++) { cluster.fork(); diff --git a/source/management_api/management_api.toml b/source/management_api/management_api.toml index a33f1b9ad..5d225ead5 100644 --- a/source/management_api/management_api.toml +++ b/source/management_api/management_api.toml @@ -7,7 +7,6 @@ numberOfProcess = 4 #default: 1 enableWebTransport = false #default: false. hostname = "" #default: "" ip_address = "" #default: "" -servicename = "sampleService" #service name used to register to cascading cloud for accessing to communicate with this cluster #enable_grpc = true [cluster] @@ -21,3 +20,7 @@ port = 5672 #default: 5672 [mongo] dataBaseURL = "localhost/owtdb" #default: "localhost/owtdb" + +[cascading] +enabled = false # disable cascading feature by default +servicename = "sampleService" #service name used to register to cascading cloud for accessing to communicate with this cluster