Skip to content

Commit

Permalink
Move FlowController running field to flowInfo level
Browse files Browse the repository at this point in the history
  • Loading branch information
lordgamez committed Jan 16, 2025
1 parent 24c9332 commit 36f3262
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 18 deletions.
3 changes: 2 additions & 1 deletion C2.md
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,8 @@ Contains information about the flow the agent is running, including the versione
"running": true
}
],
"flowId": "96273342-b9fe-11ef-a0ad-10f60a596f64"
"flowId": "96273342-b9fe-11ef-a0ad-10f60a596f64",
"running": true
}
```

Expand Down
4 changes: 3 additions & 1 deletion METRICS.md
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ FlowInformation is a system level metric that reports component and queue relate
| queue_data_size_max | connection_uuid, connection_name | Max queue data size to apply back pressure |
| queue_size | connection_uuid, connection_name | Current queue size |
| queue_size_max | connection_uuid, connection_name | Max queue size to apply back pressure |
| is_running | processor_uuid, processor_name | Check if the component is running (1 or 0) |
| is_running | component_uuid, component_name | Check if the component is running (1 or 0) |
| bytes_read | processor_uuid, processor_name | Number of bytes read by the processor |
| bytes_written | processor_uuid, processor_name | Number of bytes written by the processor |
| flow_files_in | processor_uuid, processor_name | Number of flow files from the incoming queue processed by the processor |
Expand All @@ -202,6 +202,8 @@ FlowInformation is a system level metric that reports component and queue relate
|-----------------|--------------------------------------------------------------|
| connection_uuid | UUID of the connection defined in the flow configuration |
| connection_name | Name of the connection defined in the flow configuration |
| component_uuid | UUID of the component |
| component_name | Name of the component |
| processor_uuid | UUID of the processor |
| processor_name | Name of the processor |

Expand Down
6 changes: 3 additions & 3 deletions libminifi/include/core/state/nodes/FlowInformation.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,14 @@ class FlowVersion : public DeviceInformation {
std::shared_ptr<FlowIdentifier> identifier;
};

class FlowInformation : public ResponseNode {
class FlowInformation : public StateMonitorNode {
public:
FlowInformation(std::string_view name, const utils::Identifier &uuid)
: ResponseNode(name, uuid) {
: StateMonitorNode(name, uuid) {
}

explicit FlowInformation(std::string_view name)
: ResponseNode(name) {
: StateMonitorNode(name) {
}

MINIFIAPI static constexpr const char* Description = "Metric node that defines the flow ID and flow URL deployed to this agent";
Expand Down
12 changes: 12 additions & 0 deletions libminifi/src/core/state/nodes/FlowInformation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ std::vector<SerializedResponseNode> FlowInformation::serialize() {
{.name = "flowId", .value = flow_version_->getFlowId()}
};

if (nullptr != monitor_) {
monitor_->executeOnComponent("FlowController", [&serialized](StateController& component) {
serialized.push_back({.name = "running", .value = component.isRunning()});
});
}

SerializedResponseNode uri;
uri.name = "versionedFlowSnapshotURI";
for (auto &entry : flow_version_->serialize()) {
Expand Down Expand Up @@ -97,6 +103,12 @@ std::vector<SerializedResponseNode> FlowInformation::serialize() {

std::vector<PublishedMetric> FlowInformation::calculateMetrics() {
std::vector<PublishedMetric> metrics = connection_store_.calculateConnectionMetrics("FlowInformation");
if (nullptr != monitor_) {
monitor_->executeOnComponent("FlowController", [&metrics](StateController& component) {
metrics.push_back({"is_running", (component.isRunning() ? 1.0 : 0.0),
{{"component_uuid", component.getComponentUUID().to_string()}, {"component_name", component.getComponentName()}, {"metric_class", "FlowInformation"}}});
});
}

for (const auto& processor : processors_) {
if (!processor) {
Expand Down
2 changes: 1 addition & 1 deletion libminifi/src/core/state/nodes/ResponseNodeLoader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ void ResponseNodeLoader::initializeFlowInformation(const SharedResponseNode& res
for (auto &con : connections) {
flow_information->updateConnection(con.second);
}

flow_information->setStateMonitor(update_sink_);
if (flow_configuration_) {
flow_information->setFlowVersion(flow_configuration_->getFlowVersion());
}
Expand Down
26 changes: 14 additions & 12 deletions libminifi/test/integration/C2MetricsTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,18 +144,24 @@ class MetricsHandler: public HeartbeatHandler {
processor["running"].GetBool();
}

static bool verifyRuntimeMetrics(const rapidjson::Value& runtime_metrics) {
static bool verifyCommonRuntimeMetricNodes(const rapidjson::Value& runtime_metrics, const std::string& queue_id) {
return runtime_metrics.HasMember("deviceInfo") &&
runtime_metrics.HasMember("flowInfo") &&
runtime_metrics["flowInfo"].HasMember("flowId") &&
runtime_metrics["flowInfo"].HasMember("running") &&
runtime_metrics["flowInfo"].HasMember("versionedFlowSnapshotURI") &&
runtime_metrics["flowInfo"].HasMember("queues") &&
runtime_metrics["flowInfo"]["queues"].HasMember("2438e3c8-015a-1000-79ca-83af40ec1997") &&
runtime_metrics["flowInfo"].HasMember("processorStatuses") &&
runtime_metrics["flowInfo"]["queues"].HasMember(queue_id) &&
runtime_metrics["flowInfo"].HasMember("processorStatuses");
}

static bool verifyRuntimeMetrics(const rapidjson::Value& runtime_metrics) {
return verifyCommonRuntimeMetricNodes(runtime_metrics, "2438e3c8-015a-1000-79ca-83af40ec1997") &&
[&]() {
if (runtime_metrics["flowInfo"]["processorStatuses"].GetArray().Size() != 2) {
const auto processor_statuses = runtime_metrics["flowInfo"]["processorStatuses"].GetArray();
if (processor_statuses.Size() != 2) {
return false;
}
const auto processor_statuses = runtime_metrics["flowInfo"]["processorStatuses"].GetArray();
return std::all_of(processor_statuses.begin(), processor_statuses.end(), [&](const auto& processor) {
if (processor["id"].GetString() != std::string(GETTCP_UUID) && processor["id"].GetString() != std::string(LOGATTRIBUTE1_UUID)) {
throw std::runtime_error(std::string("Unexpected processor id in processorStatuses: ") + processor["id"].GetString());
Expand All @@ -166,17 +172,13 @@ class MetricsHandler: public HeartbeatHandler {
}

static bool verifyUpdatedRuntimeMetrics(const rapidjson::Value& runtime_metrics) {
return runtime_metrics.HasMember("deviceInfo") &&
runtime_metrics.HasMember("flowInfo") &&
runtime_metrics["flowInfo"].HasMember("versionedFlowSnapshotURI") &&
runtime_metrics["flowInfo"].HasMember("queues") &&
runtime_metrics["flowInfo"]["queues"].HasMember("8368e3c8-015a-1003-52ca-83af40ec1332") &&
return verifyCommonRuntimeMetricNodes(runtime_metrics, "8368e3c8-015a-1003-52ca-83af40ec1332") &&
runtime_metrics["flowInfo"].HasMember("processorStatuses") &&
[&]() {
if (runtime_metrics["flowInfo"]["processorStatuses"].GetArray().Size() != 2) {
const auto processor_statuses = runtime_metrics["flowInfo"]["processorStatuses"].GetArray();
if (processor_statuses.Size() != 2) {
return false;
}
const auto processor_statuses = runtime_metrics["flowInfo"]["processorStatuses"].GetArray();
return std::all_of(processor_statuses.begin(), processor_statuses.end(), [&](const auto& processor) {
if (processor["id"].GetString() != std::string(GENERATE_FLOWFILE_UUID) && processor["id"].GetString() != std::string(LOGATTRIBUTE2_UUID)) {
throw std::runtime_error(std::string("Unexpected processor id in processorStatuses: ") + processor["id"].GetString());
Expand Down

0 comments on commit 36f3262

Please sign in to comment.