Skip to content

Commit

Permalink
Move component running field to processorStatus
Browse files Browse the repository at this point in the history
  • Loading branch information
lordgamez committed Jan 15, 2025
1 parent efa4f0c commit 8a5bc25
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 38 deletions.
20 changes: 4 additions & 16 deletions C2.md
Original file line number Diff line number Diff line change
Expand Up @@ -517,20 +517,6 @@ Contains information about the flow the agent is running, including the versione
"uuid": "8368e3c8-015a-1003-52ca-83af40ec1332"
}
},
"components": {
"LogAttribute": {
"running": true,
"uuid": "5128e3c8-015a-1000-79ca-83af40ec1990"
},
"GenerateFlowFile": {
"running": true,
"uuid": "4fe2d51d-076a-49b0-88de-5cf5adf52b8f"
},
"FlowController": {
"running": true,
"uuid": "2438e3c8-015a-1000-79ca-83af40ec1990"
}
},
"processorStatuses": [
{
"id": "5128e3c8-015a-1000-79ca-83af40ec1990",
Expand All @@ -544,7 +530,8 @@ Contains information about the flow the agent is running, including the versione
"invocations": 0,
"processingNanos": 0,
"activeThreadCount": -1,
"terminatedThreadCount": -1
"terminatedThreadCount": -1,
"running": true
},
{
"id": "4fe2d51d-076a-49b0-88de-5cf5adf52b8f",
Expand All @@ -558,7 +545,8 @@ Contains information about the flow the agent is running, including the versione
"invocations": 4,
"processingNanos": 2119148,
"activeThreadCount": -1,
"terminatedThreadCount": -1
"terminatedThreadCount": -1,
"running": true
}
],
"flowId": "96273342-b9fe-11ef-a0ad-10f60a596f64"
Expand Down
17 changes: 5 additions & 12 deletions libminifi/src/core/state/nodes/FlowInformation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,19 +61,11 @@ std::vector<SerializedResponseNode> FlowInformation::serialize() {
serialized.push_back(queues);
}

std::unordered_map<std::string, bool> processors_running;
if (nullptr != monitor_) {
SerializedResponseNode componentsNode{.name = "components", .collapsible = false};
monitor_->executeOnAllComponents([&componentsNode](StateController& component){
componentsNode.children.push_back({
.name = component.getComponentName(),
.collapsible = false,
.children = {
{.name = "running", .value = component.isRunning()},
{.name = "uuid", .value = std::string{component.getComponentUUID().to_string()}}
}
});
monitor_->executeOnAllComponents([&processors_running](StateController& component){
processors_running[component.getComponentUUID().to_string()] = component.isRunning();
});
serialized.push_back(componentsNode);
}

if (!processors_.empty()) {
Expand All @@ -99,7 +91,8 @@ std::vector<SerializedResponseNode> FlowInformation::serialize() {
{.name = "invocations", .value = metrics->invocations.load()},
{.name = "processingNanos", .value = metrics->processing_nanos.load()},
{.name = "activeThreadCount", .value = -1},
{.name = "terminatedThreadCount", .value = -1}
{.name = "terminatedThreadCount", .value = -1},
{.name = "running", .value = (processors_running.contains(processor->getUUIDStr()) ? processors_running[processor->getUUIDStr()] : false)}
}
});
}
Expand Down
14 changes: 4 additions & 10 deletions libminifi/test/integration/C2MetricsTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,7 @@ class MetricsHandler: public HeartbeatHandler {
runtime_metrics.HasMember("flowInfo") &&
runtime_metrics["flowInfo"].HasMember("versionedFlowSnapshotURI") &&
runtime_metrics["flowInfo"].HasMember("queues") &&
runtime_metrics["flowInfo"].HasMember("components") &&
runtime_metrics["flowInfo"]["queues"].HasMember("2438e3c8-015a-1000-79ca-83af40ec1997") &&
runtime_metrics["flowInfo"]["components"].HasMember("FlowController") &&
runtime_metrics["flowInfo"]["components"].HasMember("GetTCP") &&
runtime_metrics["flowInfo"]["components"].HasMember("LogAttribute") &&
runtime_metrics["flowInfo"].HasMember("processorStatuses") &&
[&]() {
if (runtime_metrics["flowInfo"]["processorStatuses"].GetArray().Size() != 2) {
Expand All @@ -158,7 +154,8 @@ class MetricsHandler: public HeartbeatHandler {
processor["invocations"].GetInt() < 0 ||
processor["processingNanos"].GetInt() < 0 ||
processor["activeThreadCount"].GetInt() != -1 ||
processor["terminatedThreadCount"].GetInt() != -1) {
processor["terminatedThreadCount"].GetInt() != -1 ||
!processor["running"].GetBool()) {
return false;
}
}
Expand All @@ -171,11 +168,7 @@ class MetricsHandler: public HeartbeatHandler {
runtime_metrics.HasMember("flowInfo") &&
runtime_metrics["flowInfo"].HasMember("versionedFlowSnapshotURI") &&
runtime_metrics["flowInfo"].HasMember("queues") &&
runtime_metrics["flowInfo"].HasMember("components") &&
runtime_metrics["flowInfo"]["queues"].HasMember("8368e3c8-015a-1003-52ca-83af40ec1332") &&
runtime_metrics["flowInfo"]["components"].HasMember("FlowController") &&
runtime_metrics["flowInfo"]["components"].HasMember("GenerateFlowFile") &&
runtime_metrics["flowInfo"]["components"].HasMember("LogAttribute") &&
runtime_metrics["flowInfo"].HasMember("processorStatuses") &&
[&]() {
if (runtime_metrics["flowInfo"]["processorStatuses"].GetArray().Size() != 2) {
Expand All @@ -195,7 +188,8 @@ class MetricsHandler: public HeartbeatHandler {
processor["invocations"].GetInt() < 0 ||
processor["processingNanos"].GetInt() < 0 ||
processor["activeThreadCount"].GetInt() != -1 ||
processor["terminatedThreadCount"].GetInt() != -1) {
processor["terminatedThreadCount"].GetInt() != -1 ||
!processor["running"].GetBool()) {
return false;
}
}
Expand Down

0 comments on commit 8a5bc25

Please sign in to comment.