From efa4f0c10fb04b6ac0e99a21ffc204018b23415f Mon Sep 17 00:00:00 2001 From: Gabor Gyimesi Date: Tue, 10 Dec 2024 15:38:25 +0100 Subject: [PATCH 1/6] MINIFICPP-2501 Add processorStatuses C2 metric node to FlowInformation --- C2.md | 362 ++++++++++++++++++ METRICS.md | 34 +- .../cluster/checkers/PrometheusChecker.py | 12 +- libminifi/include/core/Processor.h | 10 + libminifi/include/core/ProcessorMetrics.h | 10 +- .../core/state/nodes/FlowInformation.h | 42 +- .../core/state/nodes/ResponseNodeLoader.h | 2 +- libminifi/include/io/StreamCallback.h | 7 +- .../LineByLineInputOutputStreamCallback.h | 2 +- libminifi/src/core/ProcessGroup.cpp | 1 + libminifi/src/core/ProcessSession.cpp | 45 ++- libminifi/src/core/Processor.cpp | 2 +- libminifi/src/core/ProcessorMetrics.cpp | 28 +- .../src/core/state/nodes/FlowInformation.cpp | 54 +++ .../core/state/nodes/ResponseNodeLoader.cpp | 20 +- .../LineByLineInputOutputStreamCallback.cpp | 21 +- libminifi/test/integration/C2MetricsTest.cpp | 78 +++- .../unit/SingleProcessorTestController.h | 2 +- libminifi/test/libtest/unit/TestBase.cpp | 2 + libminifi/test/unit/MetricsTests.cpp | 73 ++++ 20 files changed, 727 insertions(+), 80 deletions(-) diff --git a/C2.md b/C2.md index ad492ec57a..37ca600cee 100644 --- a/C2.md +++ b/C2.md @@ -26,10 +26,23 @@ options defined are located in minifi.properties. - [Description](#description) - [Configuration](#configuration) - [Base Options](#base-options) + - [Flow Id and URL](#flow-id-and-url) + - [Agent Identifier Fallback](#agent-identifier-fallback) - [Metrics](#metrics) - [UpdatePolicies](#updatepolicies) - [Triggers](#triggers) - [C2 File triggers](#c2-file-triggers) + - [C2 Response Nodes](#c2-response-nodes) + - [AgentInformation](#agentinformation) + - [AgentStatus](#agentstatus) + - [AssetInformation](#assetinformation) + - [BuildInformation](#buildinformation) + - [ConfigurationChecksums](#configurationchecksums) + - [DeviceInfoNode](#deviceinfonode) + - [FlowInformation](#flowinformation) + - [QueueMetrics](#queuemetrics) + - [RepositoryMetrics](#repositorymetrics) + - [Processor Metric Response Nodes](#processor-metric-response-nodes) ## Description @@ -271,3 +284,352 @@ in minifi.properties to activate the file update trigger specify # specifying a trigger nifi.c2.agent.trigger.classes=FileUpdateTrigger nifi.c2.file.watch= + +## C2 Response Nodes + +The following is a list of nodes that can be defined in the minifi.properties file for the C2 heartbeat response as part of the C2 root nodes defined in the `nifi.c2.root.classes` property or in the metrics nodes defined in the tree under `nifi.c2.root.class.definitions` as stated above. + +### AgentInformation + +Contains information about the agent's build, extensions, supported C2 operations and status of its components. + +``` +"agentInfo": { + "agentManifest": { + "buildInfo": { + "compiler": "/usr/lib/ccache/g++", + "flags": " -std=c++20;-Wall;-Wextra;-Werror;-Wno-error=restrict;SODIUM_STATIC=1", + "revision": "cc9aaac37a9a6b7efeb3c4394a97522a600a1758", + "timestamp": 1734001238, + "version": "0.99.1" + }, + "bundles": [ + { + "componentManifest": { + "processors": [ + ... + ] + }, + "artifact": "minifi-civet-extensions", + "group": "org.apache.nifi.minifi", + "version": "0.99.1" + } + ], + "schedulingDefaults": { + "defaultMaxConcurrentTasks": 1, + "defaultRunDurationNanos": 0, + "defaultSchedulingPeriodMillis": 1000, + "defaultSchedulingStrategy": "TIMER_DRIVEN", + "penalizationPeriodMillis": 30000, + "yieldDurationMillis": 1000 + }, + "supportedOperations": [ + { + "type": "acknowledge" + } + ... + ], + "agentType": "cpp", + "identifier": "bH77vXakM0Lkgt8VcDOGZVW3" + }, + "status": { + "repositories": { + "content_repo": { + "entryCount": 0, + "full": false, + "maxSize": 0, + "running": true, + "size": 0 + }, + "flow_file_repo": { + "entryCount": 0, + "full": false, + "maxSize": 0, + "running": true, + "size": 0 + }, + "org::apache::nifi::minifi::core::repository::VolatileContentRepository": { + "entryCount": 4, + "full": false, + "maxSize": 7864320, + "running": true, + "size": 40 + } + }, + "components": { + "LogAttribute": { + "running": true, + "uuid": "5128e3c8-015a-1000-79ca-83af40ec1990" + }, + "GenerateFlowFile": { + "running": true, + "uuid": "4fe2d51d-076a-49b0-88de-5cf5adf52b8f" + }, + "FlowController": { + "running": true, + "uuid": "2438e3c8-015a-1000-79ca-83af40ec1990" + } + }, + "resourceConsumption": { + "cpuUtilization": 0.05, + "memoryUsage": 97955840 + }, + "uptime": 1025 + }, + "agentClass": "test", + "agentManifestHash": "9FFC8326121A816E5B2FD674CE9A34321F89CC690AD0D1FD79DFB5969B3B523D6570520382E82C68CFA347FBD9897FC027E518E98CFA229C18617B062E1C9E77", + "identifier": "9628acfe-b9fe-11ef-a0c0-10f60a596f64" +} +``` + +### AgentStatus + +Contains information about the agent's status, including the status of its components, repositories, and resource consumption. + +``` +"AgentStatus": { + "repositories": { + "repo_name": { + "entryCount": 0, + "full": false, + "maxSize": 0, + "running": true, + "size": 0 + }, + "ff": { + "entryCount": 0, + "full": false, + "maxSize": 0, + "running": true, + "size": 0 + }, + "org::apache::nifi::minifi::core::repository::VolatileContentRepository": { + "entryCount": 4, + "full": false, + "maxSize": 7864320, + "running": true, + "size": 40 + } + }, + "components": { + "LogAttribute": { + "running": true, + "uuid": "5128e3c8-015a-1000-79ca-83af40ec1990" + }, + "GenerateFlowFile": { + "running": true, + "uuid": "4fe2d51d-076a-49b0-88de-5cf5adf52b8f" + }, + "FlowController": { + "running": true, + "uuid": "2438e3c8-015a-1000-79ca-83af40ec1990" + } + }, + "resourceConsumption": { + "cpuUtilization": 0.0028846153846153849, + "memoryUsage": 97955840 + }, + "uptime": 995 +} +``` + +### AssetInformation + +Contains the calculated hash of the assets. + +``` +"resourceInfo": { + "hash": "null" +} +``` + +### BuildInformation + +Contains information about the agent's build. + +``` +"BuildInformation": { + "compiler": { + "compiler_command": "/usr/lib/ccache/g++", + "compiler_flags": " -std=c++20;-Wall;-Wextra;-Werror;-Wno-error=restrict;SODIUM_STATIC=1", + "compiler_version": "11.4.0" + }, + "build_date": "1734001238", + "build_rev": "cc9aaac37a9a6b7efeb3c4394a97522a600a1758", + "build_version": "0.99.1", + "device_id": "bH77vXakM0Lkgt8VcDOGZVW3" +} +``` + +### ConfigurationChecksums + +Metric node that defines checksums of configuration files in the C2 protocol. + +``` +"configurationChecksums": { + "SHA256": { + "TestC2Metrics.yml": "9af6589bf7729bb88857aafe98cea4f41df049725401b5f0ded0a7b949d9b90c", + "minifi.properties": "06fb9f4730e3db7d0a0a1ee606a7de3fee5813edf42eab140616e8a2995072df" + } +}, +``` + +### DeviceInfoNode + +Contains information about the device the agent is running on. + +``` +"deviceInfo": { + "systemInfo": { + "cpuLoadAverage": 1.271484375, + "cpuUtilization": 0.06179499754781756, + "machineArch": "x86_64", + "memoryUsage": 12681670656, + "operatingSystem": "Linux", + "physicalMem": 67081129984, + "vCores": 20 + }, + "networkInfo": { + "hostname": "ggyimesi-5570-ubuntu", + "ipAddress": "10.255.0.1" + }, + "identifier": "16475557466943148337" +} +``` + +### FlowInformation + +Contains information about the flow the agent is running, including the versioned flow snapshot URI, queues, components, and processor statuses. + +``` +"flowInfo": { + "versionedFlowSnapshotURI": { + "bucketId": "default", + "flowId": "96273342-b9fe-11ef-a0ad-10f60a596f64" + }, + "queues": { + "8368e3c8-015a-1003-52ca-83af40ec1332": { + "dataSize": 40, + "dataSizeMax": 1048576, + "name": "GenerateFlowFile/success/LogAttribute", + "size": 4, + "sizeMax": 0, + "uuid": "8368e3c8-015a-1003-52ca-83af40ec1332" + } + }, + "components": { + "LogAttribute": { + "running": true, + "uuid": "5128e3c8-015a-1000-79ca-83af40ec1990" + }, + "GenerateFlowFile": { + "running": true, + "uuid": "4fe2d51d-076a-49b0-88de-5cf5adf52b8f" + }, + "FlowController": { + "running": true, + "uuid": "2438e3c8-015a-1000-79ca-83af40ec1990" + } + }, + "processorStatuses": [ + { + "id": "5128e3c8-015a-1000-79ca-83af40ec1990", + "groupId": "2438e3c8-015a-1000-79ca-83af40ec1990", + "bytesRead": 0, + "bytesWritten": 0, + "flowFilesIn": 0, + "flowFilesOut": 0, + "bytesIn": 0, + "bytesOut": 0, + "invocations": 0, + "processingNanos": 0, + "activeThreadCount": -1, + "terminatedThreadCount": -1 + }, + { + "id": "4fe2d51d-076a-49b0-88de-5cf5adf52b8f", + "groupId": "2438e3c8-015a-1000-79ca-83af40ec1990", + "bytesRead": 0, + "bytesWritten": 40, + "flowFilesIn": 0, + "flowFilesOut": 4, + "bytesIn": 0, + "bytesOut": 40, + "invocations": 4, + "processingNanos": 2119148, + "activeThreadCount": -1, + "terminatedThreadCount": -1 + } + ], + "flowId": "96273342-b9fe-11ef-a0ad-10f60a596f64" +} +``` + +### QueueMetrics + +Contains information about the queues in the flow, including the contained data and number of flow files. + +``` +"QueueMetrics": { + "GenerateFlowFile/success/LogAttribute": { + "datasize": "40", + "datasizemax": "1048576", + "queued": "4", + "queuedmax": "0" + } +} +``` + +### RepositoryMetrics + +Contains information about the repositories in the agent, including the number of entries, size, and whether the repository is full. + + +``` +"RepositoryMetrics": { + "repo_name": { + "entryCount": 0, + "full": false, + "maxSize": 0, + "running": true, + "size": 0 + }, + "ff": { + "entryCount": 0, + "full": false, + "maxSize": 0, + "running": true, + "size": 0 + }, + "org::apache::nifi::minifi::core::repository::VolatileContentRepository": { + "entryCount": 4, + "full": false, + "maxSize": 7864320, + "running": true, + "size": 40 + } +} +``` + +### Processor Metric Response Nodes + +Each processor can have its own metrics. These metric nodes can be configured in the minifi.properties by requesting metrics in the Metric format, for example GetTCPMetrics to request metrics for the GetTCP processors. Besides configuring processor metrics directly, they can also be configured using regular expressions with the `processorMetrics/` prefix. For example `processorMetrics/Get.*Metrics` will match all processor metrics that start with Get. + +``` +"GetTCPMetrics": { + "2438e3c8-015a-1000-79ca-83af40ec1991": { + "AverageOnTriggerRunTime": 0, + "AverageSessionCommitRunTime": 0, + "BytesRead": 0, + "BytesWritten": 0, + "IncomingBytes": 0, + "IncomingFlowFiles": 0, + "LastOnTriggerRunTime": 0, + "LastSessionCommitRunTime": 0, + "OnTriggerInvocations": 11, + "ProcessingNanos": 729328, + "TransferredBytes": 0, + "TransferredFlowFiles": 0 + } +} +``` diff --git a/METRICS.md b/METRICS.md index 7aa53fda7f..e28f77a400 100644 --- a/METRICS.md +++ b/METRICS.md @@ -162,8 +162,8 @@ RepositoryMetrics is a system level metric that reports metrics for the register | rocksdb_table_readers_size_bytes | repository_name | RocksDB's estimated memory used for reading SST tables (only present if repository uses RocksDB) | | rocksdb_all_memory_tables_size_bytes | repository_name | RocksDB's approximate size of active and unflushed immutable memtables (only present if repository uses RocksDB) | -| Label | Description | -|--------------------------|---------------------------------------------------------------------------------------------------------------------------------------| +| Label | Description | +|--------------------------|----------------------------------------------------------------------------------------------------------------------------------------| | repository_name | Name of the reported repository. There are three repositories present with the following names: `flowfile`, `content` and `provenance` | ### DeviceInfoNode @@ -181,13 +181,22 @@ DeviceInfoNode is a system level metric that reports metrics about the system re FlowInformation is a system level metric that reports component and queue related metrics. -| Metric name | Labels | Description | -|----------------------|----------------------------------|--------------------------------------------| -| queue_data_size | connection_uuid, connection_name | Current queue data size | -| queue_data_size_max | connection_uuid, connection_name | Max queue data size to apply back pressure | -| queue_size | connection_uuid, connection_name | Current queue size | -| queue_size_max | connection_uuid, connection_name | Max queue size to apply back pressure | -| is_running | component_uuid, component_name | Check if the component is running (1 or 0) | +| Metric name | Labels | Description | +|----------------------|----------------------------------|----------------------------------------------------------------------------| +| queue_data_size | connection_uuid, connection_name | Current queue data size | +| queue_data_size_max | connection_uuid, connection_name | Max queue data size to apply back pressure | +| queue_size | connection_uuid, connection_name | Current queue size | +| queue_size_max | connection_uuid, connection_name | Max queue size to apply back pressure | +| is_running | component_uuid, component_name | Check if the component is running (1 or 0) | +| bytes_read | processor_uuid, processor_name | Number of bytes read by the processor | +| bytes_written | processor_uuid, processor_name | Number of bytes written by the processor | +| flow_files_in | processor_uuid, processor_name | Number of flow files from the incoming queue processed by the processor | +| flow_files_out | processor_uuid, processor_name | Number of flow files transferred to outgoing relationship by the processor | +| bytes_in | processor_uuid, processor_name | Sum of data from the incoming queue processed by the processor | +| bytes_out | processor_uuid, processor_name | Sum of data transferred to outgoing relationship by the processor | +| invocations | processor_uuid, processor_name | Number of times the processor was triggered | +| processing_nanos | processor_uuid, processor_name | Sum of the runtime spent in the processor in nanoseconds | + | Label | Description | |-----------------|--------------------------------------------------------------| @@ -195,6 +204,8 @@ FlowInformation is a system level metric that reports component and queue relate | connection_name | Name of the connection defined in the flow configuration | | component_uuid | UUID of the component | | component_name | Name of the component | +| processor_uuid | UUID of the processor | +| processor_name | Name of the processor | ### AgentStatus @@ -251,6 +262,11 @@ There are general metrics that are available for all processors. Besides these m | transferred_flow_files | metric_class, processor_name, processor_uuid | Number of flow files transferred to a relationship | | transferred_bytes | metric_class, processor_name, processor_uuid | Number of bytes transferred to a relationship | | transferred_to_\ | metric_class, processor_name, processor_uuid | Number of flow files transferred to a specific relationship | +| incoming_flow_files | metric_class, processor_name, processor_uuid | Number of flow files from the incoming queue processed by the processor | +| incoming_bytes | metric_class, processor_name, processor_uuid | Sum of data from the incoming queue processed by the processor | +| bytes_read | metric_class, processor_name, processor_uuid | Number of bytes read by the processor | +| bytes_written | metric_class, processor_name, processor_uuid | Number of bytes written by the processor | +| processing_nanos | metric_class, processor_name, processor_uuid | Sum of the runtime spent in the processor in nanoseconds | | Label | Description | |----------------|------------------------------------------------------------------------| diff --git a/docker/test/integration/cluster/checkers/PrometheusChecker.py b/docker/test/integration/cluster/checkers/PrometheusChecker.py index 8c12bc25fd..2c97965a8f 100644 --- a/docker/test/integration/cluster/checkers/PrometheusChecker.py +++ b/docker/test/integration/cluster/checkers/PrometheusChecker.py @@ -60,8 +60,12 @@ def verify_queue_metrics(self): def verify_general_processor_metrics(self, metric_class, processor_name): labels = {'processor_name': processor_name} return self.verify_metrics_exist(['minifi_average_onTrigger_runtime_milliseconds', 'minifi_last_onTrigger_runtime_milliseconds', - 'minifi_average_session_commit_runtime_milliseconds', 'minifi_last_session_commit_runtime_milliseconds'], metric_class, labels) and \ - self.verify_metrics_larger_than_zero(['minifi_onTrigger_invocations', 'minifi_transferred_flow_files', 'minifi_transferred_to_success', 'minifi_transferred_bytes'], metric_class, labels) + 'minifi_average_session_commit_runtime_milliseconds', 'minifi_last_session_commit_runtime_milliseconds', + 'minifi_incoming_flow_files', 'minifi_incoming_bytes', 'minifi_bytes_read', 'minifi_bytes_written', + 'minifi_processing_nanos'], metric_class, labels) and \ + self.verify_metrics_larger_than_zero(['minifi_onTrigger_invocations', 'minifi_transferred_flow_files', 'minifi_transferred_to_success', + 'minifi_transferred_bytes', 'minifi_processing_nanos'], + metric_class, labels) def verify_getfile_metrics(self, metric_class, processor_name): labels = {'processor_name': processor_name} @@ -69,7 +73,9 @@ def verify_getfile_metrics(self, metric_class, processor_name): self.verify_metrics_exist(['minifi_input_bytes', 'minifi_accepted_files'], metric_class, labels) def verify_flow_information_metrics(self): - return self.verify_metrics_exist(['minifi_queue_data_size', 'minifi_queue_data_size_max', 'minifi_queue_size', 'minifi_queue_size_max'], 'FlowInformation') and \ + return self.verify_metrics_exist(['minifi_queue_data_size', 'minifi_queue_data_size_max', 'minifi_queue_size', 'minifi_queue_size_max', + 'minifi_bytes_read', 'minifi_bytes_written', 'minifi_flow_files_in', 'minifi_flow_files_out', 'minifi_bytes_in', 'minifi_bytes_out', + 'minifi_invocations', 'minifi_processing_nanos'], 'FlowInformation') and \ self.verify_metric_exists('minifi_is_running', 'FlowInformation', {'component_name': 'FlowController'}) def verify_device_info_node_metrics(self): diff --git a/libminifi/include/core/Processor.h b/libminifi/include/core/Processor.h index 6f25c176c3..8dfc3db778 100644 --- a/libminifi/include/core/Processor.h +++ b/libminifi/include/core/Processor.h @@ -165,6 +165,14 @@ class Processor : public Connectable, public ConfigurableComponent, public state active_tasks_ = 0; } + std::string getProcessGroupUUIDStr() const { + return process_group_uuid_; + } + + void setProcessGroupUUIDStr(const std::string &uuid) { + process_group_uuid_ = uuid; + } + void yield() override; void yield(std::chrono::steady_clock::duration delta_time); @@ -256,6 +264,8 @@ class Processor : public Connectable, public ConfigurableComponent, public state // an outgoing connection allows us to reach these nodes std::unordered_map> reachable_processors_; + + std::string process_group_uuid_; }; } // namespace core diff --git a/libminifi/include/core/ProcessorMetrics.h b/libminifi/include/core/ProcessorMetrics.h index de5c3d157e..231f2efb4c 100644 --- a/libminifi/include/core/ProcessorMetrics.h +++ b/libminifi/include/core/ProcessorMetrics.h @@ -52,10 +52,16 @@ class ProcessorMetrics : public state::response::ResponseNode { std::chrono::milliseconds getAverageSessionCommitRuntime() const; std::chrono::milliseconds getLastSessionCommitRuntime() const; void addLastSessionCommitRuntime(std::chrono::milliseconds runtime); + std::optional getTransferredFlowFilesToRelationshipCount(const std::string& relationship) const; - std::atomic iterations{0}; + std::atomic invocations{0}; + std::atomic incoming_flow_files{0}; std::atomic transferred_flow_files{0}; + std::atomic incoming_bytes{0}; std::atomic transferred_bytes{0}; + std::atomic bytes_read{0}; + std::atomic bytes_written{0}; + std::atomic processing_nanos{0}; protected: template @@ -80,7 +86,7 @@ class ProcessorMetrics : public state::response::ResponseNode { [[nodiscard]] std::unordered_map getCommonLabels() const; static constexpr uint8_t STORED_ON_TRIGGER_RUNTIME_COUNT = 10; - std::mutex transferred_relationships_mutex_; + mutable std::mutex transferred_relationships_mutex_; std::unordered_map transferred_relationships_; const Processor& source_processor_; Averager on_trigger_runtime_averager_; diff --git a/libminifi/include/core/state/nodes/FlowInformation.h b/libminifi/include/core/state/nodes/FlowInformation.h index a605bd8b25..5bfc257e14 100644 --- a/libminifi/include/core/state/nodes/FlowInformation.h +++ b/libminifi/include/core/state/nodes/FlowInformation.h @@ -26,6 +26,7 @@ #include "core/state/nodes/StateMonitor.h" #include "Connection.h" #include "core/state/ConnectionStore.h" +#include "core/Processor.h" namespace org::apache::nifi::minifi::state::response { @@ -92,16 +93,22 @@ class FlowVersion : public DeviceInformation { std::shared_ptr identifier; }; -class FlowMonitor : public StateMonitorNode { +class FlowInformation : public StateMonitorNode { public: - FlowMonitor(std::string_view name, const utils::Identifier &uuid) + FlowInformation(std::string_view name, const utils::Identifier &uuid) : StateMonitorNode(name, uuid) { } - explicit FlowMonitor(std::string_view name) + explicit FlowInformation(std::string_view name) : StateMonitorNode(name) { } + MINIFIAPI static constexpr const char* Description = "Metric node that defines the flow ID and flow URL deployed to this agent"; + + std::string getName() const override { + return "flowInfo"; + } + void setFlowVersion(std::shared_ptr flow_version) { flow_version_ = std::move(flow_version); } @@ -110,32 +117,17 @@ class FlowMonitor : public StateMonitorNode { connection_store_.updateConnection(connection); } - protected: - std::shared_ptr flow_version_; - ConnectionStore connection_store_; -}; - -/** - * Justification and Purpose: Provides flow version Information - */ -class FlowInformation : public FlowMonitor { - public: - FlowInformation(std::string_view name, const utils::Identifier &uuid) - : FlowMonitor(name, uuid) { - } - - explicit FlowInformation(std::string_view name) - : FlowMonitor(name) { - } - - MINIFIAPI static constexpr const char* Description = "Metric node that defines the flow ID and flow URL deployed to this agent"; - - std::string getName() const override { - return "flowInfo"; + void setProcessors(std::vector processors) { + processors_ = std::move(processors); } std::vector serialize() override; std::vector calculateMetrics() override; + + private: + std::shared_ptr flow_version_; + ConnectionStore connection_store_; + std::vector processors_; }; } // namespace org::apache::nifi::minifi::state::response diff --git a/libminifi/include/core/state/nodes/ResponseNodeLoader.h b/libminifi/include/core/state/nodes/ResponseNodeLoader.h index 9eb55f413e..284496c0a3 100644 --- a/libminifi/include/core/state/nodes/ResponseNodeLoader.h +++ b/libminifi/include/core/state/nodes/ResponseNodeLoader.h @@ -62,7 +62,7 @@ class ResponseNodeLoader { void initializeAgentNode(const SharedResponseNode& response_node) const; void initializeAgentStatus(const SharedResponseNode& response_node) const; void initializeConfigurationChecksums(const SharedResponseNode& response_node) const; - void initializeFlowMonitor(const SharedResponseNode& response_node) const; + void initializeFlowInformation(const SharedResponseNode& response_node) const; void initializeAssetInformation(const SharedResponseNode& response_node) const; std::vector getMatchingComponentMetricsNodes(const std::string& regex_str) const; diff --git a/libminifi/include/io/StreamCallback.h b/libminifi/include/io/StreamCallback.h index 6d4e6c00a6..401fe472bc 100644 --- a/libminifi/include/io/StreamCallback.h +++ b/libminifi/include/io/StreamCallback.h @@ -24,10 +24,15 @@ namespace org::apache::nifi::minifi::io { class InputStream; class OutputStream; +struct ReadWriteResult { + int64_t bytes_written = 0; + int64_t bytes_read = 0; +}; + // FlowFile IO Callback functions for input and output // throw exception for error using InputStreamCallback = std::function& input_stream)>; using OutputStreamCallback = std::function& output_stream)>; -using InputOutputStreamCallback = std::function& input_stream, const std::shared_ptr& output_stream)>; +using InputOutputStreamCallback = std::function& input_stream, const std::shared_ptr& output_stream)>; } // namespace org::apache::nifi::minifi::io diff --git a/libminifi/include/utils/LineByLineInputOutputStreamCallback.h b/libminifi/include/utils/LineByLineInputOutputStreamCallback.h index 0d68fe5680..8fef08c31a 100644 --- a/libminifi/include/utils/LineByLineInputOutputStreamCallback.h +++ b/libminifi/include/utils/LineByLineInputOutputStreamCallback.h @@ -33,7 +33,7 @@ class LineByLineInputOutputStreamCallback { public: using CallbackType = std::function; explicit LineByLineInputOutputStreamCallback(CallbackType callback); - int64_t operator()(const std::shared_ptr& input, const std::shared_ptr& output); + io::ReadWriteResult operator()(const std::shared_ptr& input, const std::shared_ptr& output); private: int64_t readInput(io::InputStream& stream); diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp index 7fdcf175d9..fcf916ceb1 100644 --- a/libminifi/src/core/ProcessGroup.cpp +++ b/libminifi/src/core/ProcessGroup.cpp @@ -86,6 +86,7 @@ std::tuple ProcessGroup::addProcessor(std::unique_ptrgetName(); std::lock_guard lock(mutex_); + processor->setProcessGroupUUIDStr(getUUIDStr()); const auto [iter, inserted] = processors_.insert(std::move(processor)); if (inserted) { logger_->log_debug("Add processor {} into process group {}", name, name_); diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp index 79c02ff63e..7b80f3d936 100644 --- a/libminifi/src/core/ProcessSession.cpp +++ b/libminifi/src/core/ProcessSession.cpp @@ -268,6 +268,9 @@ void ProcessSession::write(core::FlowFile &flow, const io::OutputStreamCallback& std::string details = process_context_->getProcessorNode()->getName() + " modify flow record content " + flow.getUUIDStr(); auto duration = std::chrono::duration_cast(std::chrono::steady_clock::now() - start_time); provenance_report_->modifyContent(flow, details, duration); + if (metrics_) { + metrics_->bytes_written += stream->size(); + } } catch (const std::exception& exception) { logger_->log_debug("Caught Exception during process session write, type: {}, what: {}", typeid(exception).name(), exception.what()); throw; @@ -280,6 +283,7 @@ void ProcessSession::write(core::FlowFile &flow, const io::OutputStreamCallback& void ProcessSession::writeBuffer(const std::shared_ptr& flow_file, std::span buffer) { writeBuffer(flow_file, as_bytes(buffer)); } + void ProcessSession::writeBuffer(const std::shared_ptr& flow_file, std::span buffer) { write(flow_file, [buffer](const std::shared_ptr& output_stream) { const auto write_status = output_stream->write(buffer); @@ -316,6 +320,9 @@ void ProcessSession::append(const std::shared_ptr &flow, const i throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile content"); } flow->setSize(flow_file_size + (stream->size() - stream_size_before_callback)); + if (metrics_) { + metrics_->bytes_written += stream->size() - stream_size_before_callback; + } std::stringstream details; details << process_context_->getProcessorNode()->getName() << " modify flow record content " << flow->getUUIDStr(); @@ -373,6 +380,9 @@ int64_t ProcessSession::read(const core::FlowFile& flow_file, const io::InputStr if (ret < 0) { throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile content"); } + if (metrics_) { + metrics_->bytes_read += ret; + } return ret; } catch (const std::exception& exception) { logger_->log_debug("Caught Exception {}", exception.what()); @@ -383,7 +393,6 @@ int64_t ProcessSession::read(const core::FlowFile& flow_file, const io::InputStr } } - int64_t ProcessSession::readWrite(const std::shared_ptr &flow, const io::InputOutputStreamCallback& callback) { gsl_Expects(callback); @@ -409,19 +418,23 @@ int64_t ProcessSession::readWrite(const std::shared_ptr &flow, c throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open flowfile content for write"); } - int64_t bytes_written = callback(input_stream, output_stream); - if (bytes_written < 0) { + auto read_write_result = callback(input_stream, output_stream); + if (read_write_result.bytes_written < 0) { throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile content"); } input_stream->close(); output_stream->close(); - flow->setSize(gsl::narrow(bytes_written)); + flow->setSize(gsl::narrow(read_write_result.bytes_written)); flow->setOffset(0); flow->setResourceClaim(output_claim); + if (metrics_) { + metrics_->bytes_written += read_write_result.bytes_written; + metrics_->bytes_read += read_write_result.bytes_read; + } - return bytes_written; + return read_write_result.bytes_written; } catch (const std::exception& exception) { logger_->log_debug("Caught exception during process session readWrite, type: {}, what: {}", typeid(exception).name(), exception.what()); throw; @@ -486,6 +499,9 @@ void ProcessSession::importFrom(io::InputStream &stream, const std::shared_ptrgetOffset(), flow->getSize(), flow->getResourceClaim()->getContentFullPath(), flow->getUUIDStr()); content_stream->close(); + if (metrics_) { + metrics_->bytes_written += content_stream->size(); + } std::stringstream details; details << process_context_->getProcessorNode()->getName() << " modify flow record content " << flow->getUUIDStr(); auto duration = std::chrono::duration_cast(std::chrono::steady_clock::now() - start_time); @@ -547,6 +563,9 @@ void ProcessSession::import(const std::string& source, const std::shared_ptrgetUUIDStr()); stream->close(); + if (metrics_) { + metrics_->bytes_written += stream->size(); + } input.close(); if (!keepSource) { (void)std::remove(source.c_str()); @@ -649,6 +668,9 @@ void ProcessSession::import(const std::string& source, std::vectorlog_debug("Import offset {} length {} into content {}, FlowFile UUID {}", flowFile->getOffset(), flowFile->getSize(), flowFile->getResourceClaim()->getContentFullPath(), flowFile->getUUIDStr()); stream->close(); + if (metrics_) { + metrics_->bytes_written += stream->size(); + } std::string details = process_context_->getProcessorNode()->getName() + " modify flow record content " + flowFile->getUUIDStr(); auto duration = std::chrono::duration_cast(std::chrono::steady_clock::now() - start_time); provenance_report_->modifyContent(*flowFile, details, duration); @@ -819,7 +841,7 @@ void ProcessSession::commit() { const auto commit_start_time = std::chrono::steady_clock::now(); try { std::unordered_map transfers; - auto increaseTransferMetrics = [&](const FlowFile& record, const Relationship& relationship) { + auto increaseTransferMetrics = [&](const FlowFile& record, const Relationship& relationship) { ++transfers[relationship.getName()].transfer_count; transfers[relationship.getName()].transfer_size += record.getSize(); }; @@ -930,8 +952,11 @@ void ProcessSession::commit() { // persistent the provenance report this->provenance_report_->commit(); logger_->log_debug("ProcessSession committed for {}", process_context_->getProcessorNode()->getName()); - if (metrics_) - metrics_->addLastSessionCommitRuntime(std::chrono::duration_cast(std::chrono::steady_clock::now() - commit_start_time)); + if (metrics_) { + auto time_delta = std::chrono::steady_clock::now() - commit_start_time; + metrics_->addLastSessionCommitRuntime(std::chrono::duration_cast(time_delta)); + metrics_->processing_nanos += std::chrono::duration_cast(time_delta).count(); + } } catch (const std::exception& exception) { logger_->log_debug("Caught Exception during process session commit, type: {}, what: {}", typeid(exception).name(), exception.what()); throw; @@ -1141,6 +1166,10 @@ std::shared_ptr ProcessSession::get() { if (flow_version != nullptr) { ret->setAttribute(SpecialFlowAttribute::FLOW_ID, flow_version->getFlowId()); } + if (metrics_) { + metrics_->incoming_bytes += ret->getSize(); + ++metrics_->incoming_flow_files; + } return ret; } current = dynamic_cast(process_context_->getProcessorNode()->pickIncomingConnection()); diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp index b5cf616e23..77b8d3fa1d 100644 --- a/libminifi/src/core/Processor.cpp +++ b/libminifi/src/core/Processor.cpp @@ -198,7 +198,7 @@ void Processor::triggerAndCommit(const std::shared_ptr& context, } void Processor::trigger(const std::shared_ptr& context, const std::shared_ptr& process_session) { - ++metrics_->iterations; + ++metrics_->invocations; const auto start = std::chrono::steady_clock::now(); onTrigger(*context, *process_session); metrics_->addLastOnTriggerRuntime(std::chrono::duration_cast(std::chrono::steady_clock::now() - start)); diff --git a/libminifi/src/core/ProcessorMetrics.cpp b/libminifi/src/core/ProcessorMetrics.cpp index 8cd0548883..bf30910003 100644 --- a/libminifi/src/core/ProcessorMetrics.cpp +++ b/libminifi/src/core/ProcessorMetrics.cpp @@ -44,13 +44,18 @@ std::vector ProcessorMetrics::serialize state::response::SerializedResponseNode root_node { .name = source_processor_.getUUIDStr(), .children = { - {.name = "OnTriggerInvocations", .value = static_cast(iterations.load())}, + {.name = "OnTriggerInvocations", .value = static_cast(invocations.load())}, {.name = "AverageOnTriggerRunTime", .value = static_cast(getAverageOnTriggerRuntime().count())}, {.name = "LastOnTriggerRunTime", .value = static_cast(getLastOnTriggerRuntime().count())}, {.name = "AverageSessionCommitRunTime", .value = static_cast(getAverageSessionCommitRuntime().count())}, {.name = "LastSessionCommitRunTime", .value = static_cast(getLastSessionCommitRuntime().count())}, {.name = "TransferredFlowFiles", .value = static_cast(transferred_flow_files.load())}, - {.name = "TransferredBytes", .value = transferred_bytes.load()} + {.name = "TransferredBytes", .value = static_cast(transferred_bytes.load())}, + {.name = "IncomingFlowFiles", .value = static_cast(incoming_flow_files.load())}, + {.name = "IncomingBytes", .value = static_cast(incoming_bytes.load())}, + {.name = "BytesRead", .value = static_cast(bytes_read.load())}, + {.name = "BytesWritten", .value = static_cast(bytes_written.load())}, + {.name = "ProcessingNanos", .value = static_cast(processing_nanos.load())} } }; @@ -59,7 +64,7 @@ std::vector ProcessorMetrics::serialize for (const auto& [relationship, count] : transferred_relationships_) { gsl_Expects(!relationship.empty()); state::response::SerializedResponseNode transferred_to_relationship_node; - transferred_to_relationship_node.name = std::string("TransferredTo").append(1, toupper(relationship[0])).append(relationship.substr(1)); + transferred_to_relationship_node.name = std::string("TransferredTo").append(1, gsl::narrow(toupper(relationship[0]))).append(relationship.substr(1)); transferred_to_relationship_node.value = static_cast(count); root_node.children.push_back(transferred_to_relationship_node); @@ -73,13 +78,18 @@ std::vector ProcessorMetrics::serialize std::vector ProcessorMetrics::calculateMetrics() { std::vector metrics = { - {"onTrigger_invocations", static_cast(iterations.load()), getCommonLabels()}, + {"onTrigger_invocations", static_cast(invocations.load()), getCommonLabels()}, {"average_onTrigger_runtime_milliseconds", static_cast(getAverageOnTriggerRuntime().count()), getCommonLabels()}, {"last_onTrigger_runtime_milliseconds", static_cast(getLastOnTriggerRuntime().count()), getCommonLabels()}, {"average_session_commit_runtime_milliseconds", static_cast(getAverageSessionCommitRuntime().count()), getCommonLabels()}, {"last_session_commit_runtime_milliseconds", static_cast(getLastSessionCommitRuntime().count()), getCommonLabels()}, {"transferred_flow_files", static_cast(transferred_flow_files.load()), getCommonLabels()}, - {"transferred_bytes", static_cast(transferred_bytes.load()), getCommonLabels()} + {"transferred_bytes", static_cast(transferred_bytes.load()), getCommonLabels()}, + {"incoming_flow_files", static_cast(incoming_flow_files.load()), getCommonLabels()}, + {"incoming_bytes", static_cast(incoming_bytes.load()), getCommonLabels()}, + {"bytes_read", static_cast(bytes_read.load()), getCommonLabels()}, + {"bytes_written", static_cast(bytes_written.load()), getCommonLabels()}, + {"processing_nanos", static_cast(processing_nanos.load()), getCommonLabels()} }; { @@ -122,6 +132,14 @@ std::chrono::milliseconds ProcessorMetrics::getLastSessionCommitRuntime() const return session_commit_runtime_averager_.getLastValue(); } +std::optional ProcessorMetrics::getTransferredFlowFilesToRelationshipCount(const std::string& relationship) const { + std::lock_guard lock(transferred_relationships_mutex_); + if (transferred_relationships_.contains(relationship)) { + return transferred_relationships_.at(relationship); + } + return {}; +} + template requires Summable && DividableByInteger ValueType ProcessorMetrics::Averager::getAverage() const { diff --git a/libminifi/src/core/state/nodes/FlowInformation.cpp b/libminifi/src/core/state/nodes/FlowInformation.cpp index 5756072acf..0fa8e06686 100644 --- a/libminifi/src/core/state/nodes/FlowInformation.cpp +++ b/libminifi/src/core/state/nodes/FlowInformation.cpp @@ -76,6 +76,36 @@ std::vector FlowInformation::serialize() { serialized.push_back(componentsNode); } + if (!processors_.empty()) { + SerializedResponseNode processorsStatusesNode{.name = "processorStatuses", .array = true, .collapsible = false}; + for (const auto processor : processors_) { + if (!processor) { + continue; + } + + auto metrics = processor->getMetrics(); + processorsStatusesNode.children.push_back({ + .name = processor->getName(), + .collapsible = false, + .children = { + {.name = "id", .value = std::string{processor->getUUIDStr()}}, + {.name = "groupId", .value = processor->getProcessGroupUUIDStr()}, + {.name = "bytesRead", .value = metrics->bytes_read.load()}, + {.name = "bytesWritten", .value = metrics->bytes_written.load()}, + {.name = "flowFilesIn", .value = metrics->incoming_flow_files.load()}, + {.name = "flowFilesOut", .value = metrics->transferred_flow_files.load()}, + {.name = "bytesIn", .value = metrics->incoming_bytes.load()}, + {.name = "bytesOut", .value = metrics->transferred_bytes.load()}, + {.name = "invocations", .value = metrics->invocations.load()}, + {.name = "processingNanos", .value = metrics->processing_nanos.load()}, + {.name = "activeThreadCount", .value = -1}, + {.name = "terminatedThreadCount", .value = -1} + } + }); + } + serialized.push_back(processorsStatusesNode); + } + return serialized; } @@ -88,6 +118,30 @@ std::vector FlowInformation::calculateMetrics() { {{"component_uuid", component.getComponentUUID().to_string()}, {"component_name", component.getComponentName()}, {"metric_class", "FlowInformation"}}}); }); } + + for (const auto& processor : processors_) { + if (!processor) { + continue; + } + auto processor_metrics = processor->getMetrics(); + metrics.push_back({"bytes_read", gsl::narrow(processor_metrics->bytes_read.load()), + {{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}}); + metrics.push_back({"bytes_written", gsl::narrow(processor_metrics->bytes_written.load()), + {{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}}); + metrics.push_back({"flow_files_in", gsl::narrow(processor_metrics->incoming_flow_files.load()), + {{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}}); + metrics.push_back({"flow_files_out", gsl::narrow(processor_metrics->transferred_flow_files.load()), + {{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}}); + metrics.push_back({"bytes_in", gsl::narrow(processor_metrics->incoming_bytes.load()), + {{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}}); + metrics.push_back({"bytes_out", gsl::narrow(processor_metrics->transferred_bytes.load()), + {{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}}); + metrics.push_back({"invocations", gsl::narrow(processor_metrics->invocations.load()), + {{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}}); + metrics.push_back({"processing_nanos", gsl::narrow(processor_metrics->processing_nanos.load()), + {{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}}); + } + return metrics; } diff --git a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp index 68b74da48d..fa50f4e73a 100644 --- a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp +++ b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp @@ -209,9 +209,9 @@ void ResponseNodeLoader::initializeAssetInformation(const SharedResponseNode& re } } -void ResponseNodeLoader::initializeFlowMonitor(const SharedResponseNode& response_node) const { - auto flowMonitor = dynamic_cast(response_node.get()); - if (flowMonitor == nullptr) { +void ResponseNodeLoader::initializeFlowInformation(const SharedResponseNode& response_node) const { + auto flow_information = dynamic_cast(response_node.get()); + if (flow_information == nullptr) { return; } @@ -222,11 +222,17 @@ void ResponseNodeLoader::initializeFlowMonitor(const SharedResponseNode& respons } for (auto &con : connections) { - flowMonitor->updateConnection(con.second); + flow_information->updateConnection(con.second); } - flowMonitor->setStateMonitor(update_sink_); + flow_information->setStateMonitor(update_sink_); if (flow_configuration_) { - flowMonitor->setFlowVersion(flow_configuration_->getFlowVersion()); + flow_information->setFlowVersion(flow_configuration_->getFlowVersion()); + } + + if (root_) { + std::vector processors; + root_->getAllProcessors(processors); + flow_information->setProcessors(processors); } } @@ -245,7 +251,7 @@ std::vector ResponseNodeLoader::loadResponseNodes(const std: initializeAgentNode(response_node); initializeAgentStatus(response_node); initializeConfigurationChecksums(response_node); - initializeFlowMonitor(response_node); + initializeFlowInformation(response_node); initializeAssetInformation(response_node); } return response_nodes; diff --git a/libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp b/libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp index af2307e5c8..ea993f8c63 100644 --- a/libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp +++ b/libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp @@ -26,27 +26,36 @@ LineByLineInputOutputStreamCallback::LineByLineInputOutputStreamCallback(Callbac : callback_(std::move(callback)) { } -int64_t LineByLineInputOutputStreamCallback::operator()(const std::shared_ptr& input, const std::shared_ptr& output) { +io::ReadWriteResult LineByLineInputOutputStreamCallback::operator()(const std::shared_ptr& input, const std::shared_ptr& output) { gsl_Expects(input); gsl_Expects(output); + io::ReadWriteResult result; + if (int64_t status = readInput(*input); status <= 0) { - return status; + result.bytes_read = status; + return result; } - std::size_t total_bytes_written_ = 0; + result.bytes_read = gsl::narrow(input_.size()); + + std::size_t total_bytes_written = 0; bool is_first_line = true; readLine(); do { readLine(); std::string output_line = callback_(*current_line_, is_first_line, isLastLine()); const auto bytes_written = output->write(reinterpret_cast(output_line.data()), output_line.size()); - if (io::isError(bytes_written)) { return -1; } - total_bytes_written_ += bytes_written; + if (io::isError(bytes_written)) { + result.bytes_written = gsl::narrow(bytes_written); + return result; + } + total_bytes_written += bytes_written; is_first_line = false; } while (!isLastLine()); - return gsl::narrow(total_bytes_written_); + result.bytes_written = gsl::narrow(total_bytes_written); + return result; } int64_t LineByLineInputOutputStreamCallback::readInput(io::InputStream& stream) { diff --git a/libminifi/test/integration/C2MetricsTest.cpp b/libminifi/test/integration/C2MetricsTest.cpp index 39a0a19c75..63544174d6 100644 --- a/libminifi/test/integration/C2MetricsTest.cpp +++ b/libminifi/test/integration/C2MetricsTest.cpp @@ -92,7 +92,10 @@ class MetricsHandler: public HeartbeatHandler { VERIFY_UPDATED_METRICS }; - static constexpr const char* GETTCP1_UUID = "2438e3c8-015a-1000-79ca-83af40ec1991"; + static constexpr const char* GETTCP_UUID = "2438e3c8-015a-1000-79ca-83af40ec1991"; + static constexpr const char* LOGATTRIBUTE1_UUID = "2438e3c8-015a-1000-79ca-83af40ec1992"; + static constexpr const char* LOGATTRIBUTE2_UUID = "5128e3c8-015a-1000-79ca-83af40ec1990"; + static constexpr const char* GENERATE_FLOWFILE_UUID = "4fe2d51d-076a-49b0-88de-5cf5adf52b8f"; static void sendEmptyHeartbeatResponse(struct mg_connection* conn) { mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n"); @@ -135,7 +138,32 @@ class MetricsHandler: public HeartbeatHandler { runtime_metrics["flowInfo"]["queues"].HasMember("2438e3c8-015a-1000-79ca-83af40ec1997") && runtime_metrics["flowInfo"]["components"].HasMember("FlowController") && runtime_metrics["flowInfo"]["components"].HasMember("GetTCP") && - runtime_metrics["flowInfo"]["components"].HasMember("LogAttribute"); + runtime_metrics["flowInfo"]["components"].HasMember("LogAttribute") && + runtime_metrics["flowInfo"].HasMember("processorStatuses") && + [&]() { + if (runtime_metrics["flowInfo"]["processorStatuses"].GetArray().Size() != 2) { + return false; + } + for (const auto& processor : runtime_metrics["flowInfo"]["processorStatuses"].GetArray()) { // NOLINT(readability-use-anyofallof) + if (processor["id"].GetString() != std::string(GETTCP_UUID) && processor["id"].GetString() != std::string(LOGATTRIBUTE1_UUID)) { + throw std::runtime_error(std::string("Unexpected processor id in processorStatuses: ") + processor["id"].GetString()); + } + + if (processor["bytesRead"].GetInt() < 0 || + processor["bytesWritten"].GetInt() < 0 || + processor["flowFilesIn"].GetInt() < 0 || + processor["flowFilesOut"].GetInt() < 0 || + processor["bytesIn"].GetInt() < 0 || + processor["bytesOut"].GetInt() < 0 || + processor["invocations"].GetInt() < 0 || + processor["processingNanos"].GetInt() < 0 || + processor["activeThreadCount"].GetInt() != -1 || + processor["terminatedThreadCount"].GetInt() != -1) { + return false; + } + } + return true; + }(); } static bool verifyUpdatedRuntimeMetrics(const rapidjson::Value& runtime_metrics) { @@ -147,7 +175,32 @@ class MetricsHandler: public HeartbeatHandler { runtime_metrics["flowInfo"]["queues"].HasMember("8368e3c8-015a-1003-52ca-83af40ec1332") && runtime_metrics["flowInfo"]["components"].HasMember("FlowController") && runtime_metrics["flowInfo"]["components"].HasMember("GenerateFlowFile") && - runtime_metrics["flowInfo"]["components"].HasMember("LogAttribute"); + runtime_metrics["flowInfo"]["components"].HasMember("LogAttribute") && + runtime_metrics["flowInfo"].HasMember("processorStatuses") && + [&]() { + if (runtime_metrics["flowInfo"]["processorStatuses"].GetArray().Size() != 2) { + return false; + } + for (const auto& processor : runtime_metrics["flowInfo"]["processorStatuses"].GetArray()) { // NOLINT(readability-use-anyofallof) + if (processor["id"].GetString() != std::string(GENERATE_FLOWFILE_UUID) && processor["id"].GetString() != std::string(LOGATTRIBUTE2_UUID)) { + throw std::runtime_error(std::string("Unexpected processor id in processorStatuses: ") + processor["id"].GetString()); + } + + if (processor["bytesRead"].GetInt() < 0 || + processor["bytesWritten"].GetInt() < 0 || + processor["flowFilesIn"].GetInt() < 0 || + processor["flowFilesOut"].GetInt() < 0 || + processor["bytesIn"].GetInt() < 0 || + processor["bytesOut"].GetInt() < 0 || + processor["invocations"].GetInt() < 0 || + processor["processingNanos"].GetInt() < 0 || + processor["activeThreadCount"].GetInt() != -1 || + processor["terminatedThreadCount"].GetInt() != -1) { + return false; + } + } + return true; + }(); } static bool verifyLoadMetrics(const rapidjson::Value& load_metrics) { @@ -169,13 +222,18 @@ class MetricsHandler: public HeartbeatHandler { static bool verifyProcessorMetrics(const rapidjson::Value& processor_metrics) { return processor_metrics.HasMember("GetTCPMetrics") && - processor_metrics["GetTCPMetrics"].HasMember(GETTCP1_UUID) && - processor_metrics["GetTCPMetrics"][GETTCP1_UUID].HasMember("OnTriggerInvocations") && - processor_metrics["GetTCPMetrics"][GETTCP1_UUID]["OnTriggerInvocations"].GetUint() > 0 && - processor_metrics["GetTCPMetrics"][GETTCP1_UUID].HasMember("TransferredFlowFiles") && - processor_metrics["GetTCPMetrics"][GETTCP1_UUID].HasMember("AverageOnTriggerRunTime") && - processor_metrics["GetTCPMetrics"][GETTCP1_UUID].HasMember("LastOnTriggerRunTime") && - processor_metrics["GetTCPMetrics"][GETTCP1_UUID].HasMember("TransferredBytes"); + processor_metrics["GetTCPMetrics"].HasMember(GETTCP_UUID) && + processor_metrics["GetTCPMetrics"][GETTCP_UUID].HasMember("OnTriggerInvocations") && + processor_metrics["GetTCPMetrics"][GETTCP_UUID]["OnTriggerInvocations"].GetUint() > 0 && + processor_metrics["GetTCPMetrics"][GETTCP_UUID].HasMember("TransferredFlowFiles") && + processor_metrics["GetTCPMetrics"][GETTCP_UUID].HasMember("AverageOnTriggerRunTime") && + processor_metrics["GetTCPMetrics"][GETTCP_UUID].HasMember("LastOnTriggerRunTime") && + processor_metrics["GetTCPMetrics"][GETTCP_UUID].HasMember("TransferredBytes") && + processor_metrics["GetTCPMetrics"][GETTCP_UUID].HasMember("IncomingFlowFiles") && + processor_metrics["GetTCPMetrics"][GETTCP_UUID].HasMember("IncomingBytes") && + processor_metrics["GetTCPMetrics"][GETTCP_UUID].HasMember("BytesRead") && + processor_metrics["GetTCPMetrics"][GETTCP_UUID].HasMember("BytesWritten") && + processor_metrics["GetTCPMetrics"][GETTCP_UUID].HasMember("ProcessingNanos"); } std::atomic_bool& metrics_updated_successfully_; diff --git a/libminifi/test/libtest/unit/SingleProcessorTestController.h b/libminifi/test/libtest/unit/SingleProcessorTestController.h index 0245e3314c..5928c6bc51 100644 --- a/libminifi/test/libtest/unit/SingleProcessorTestController.h +++ b/libminifi/test/libtest/unit/SingleProcessorTestController.h @@ -39,7 +39,7 @@ class SingleProcessorTestController : public TestController { public: explicit SingleProcessorTestController(std::unique_ptr processor) { auto name = processor->getName(); - processor_ = plan->addProcessor(std::move(processor), name); + processor_ = plan->addProcessor(std::move(processor), name, {}); input_ = plan->addConnection(nullptr, core::Relationship{"success", "success"}, processor_); outgoing_connections_ = [this] { std::unordered_map result; diff --git a/libminifi/test/libtest/unit/TestBase.cpp b/libminifi/test/libtest/unit/TestBase.cpp index ee337986e1..19f5de7980 100644 --- a/libminifi/test/libtest/unit/TestBase.cpp +++ b/libminifi/test/libtest/unit/TestBase.cpp @@ -539,11 +539,13 @@ bool TestPlan::runProcessor(size_t target_location, const PreTriggerVerifier& ve if (verify) { auto current_session = std::make_shared(context); + current_session->setMetrics(processor->getMetrics()); process_sessions_.push_back(current_session); verify(context, current_session); current_session->commit(); } else { auto session_factory = std::make_shared(context, [&] (auto current_session) { + current_session->setMetrics(processor->getMetrics()); process_sessions_.push_back(current_session); }); logger_->log_info("Running {}", processor->getName()); diff --git a/libminifi/test/unit/MetricsTests.cpp b/libminifi/test/unit/MetricsTests.cpp index 16e732c185..a8c4ad5ee7 100644 --- a/libminifi/test/unit/MetricsTests.cpp +++ b/libminifi/test/unit/MetricsTests.cpp @@ -27,6 +27,7 @@ #include "unit/ProvenanceTestHelper.h" #include "unit/DummyProcessor.h" #include "range/v3/algorithm/find_if.hpp" +#include "unit/SingleProcessorTestController.h" using namespace std::literals::chrono_literals; @@ -285,4 +286,76 @@ TEST_CASE("Test commit runtime processor metrics", "[ProcessorMetrics]") { REQUIRE(metrics.getAverageSessionCommitRuntime() == 37ms); } +class DuplicateContentProcessor : public minifi::core::Processor { + using minifi::core::Processor::Processor; + + public: + DuplicateContentProcessor(std::string_view name, const minifi::utils::Identifier& uuid) : Processor(name, uuid) {} + explicit DuplicateContentProcessor(std::string_view name) : Processor(name) {} + static constexpr const char* Description = "A processor that creates two more of the same flow file."; + static constexpr auto Properties = std::array{}; + static constexpr auto Success = core::RelationshipDefinition{"success", "Newly created FlowFiles"}; + static constexpr auto Original = core::RelationshipDefinition{"original", "Original FlowFile"}; + static constexpr auto Relationships = std::array{Success, Original}; + static constexpr bool SupportsDynamicProperties = false; + static constexpr bool SupportsDynamicRelationships = false; + static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED; + static constexpr bool IsSingleThreaded = false; + void initialize() override { + setSupportedRelationships(Relationships); + } + void onTrigger(core::ProcessContext& /*context*/, core::ProcessSession& session) override { + auto flow_file = session.get(); + if (!flow_file) { + return; + } + + auto flow_file_copy = session.create(); + std::vector buffer; + session.read(flow_file, [&](const std::shared_ptr& stream) -> int64_t { + buffer.resize(stream->size()); + return gsl::narrow(stream->read(buffer)); + }); + session.write(flow_file_copy, [&](const std::shared_ptr& stream) -> int64_t { + return gsl::narrow(stream->write(buffer)); + }); + session.append(flow_file_copy, [&](const std::shared_ptr& stream) -> int64_t { + return gsl::narrow(stream->write(buffer)); + }); + session.transfer(flow_file_copy, Success); + session.transfer(flow_file, Original); + } + ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS +}; + +TEST_CASE("Test processor metrics change after trigger", "[ProcessorMetrics]") { + minifi::test::SingleProcessorTestController test_controller(std::make_unique("DuplicateContentProcessor")); + test_controller.trigger({minifi::test::InputFlowFileData{"log line 1", {}}}); + auto metrics = test_controller.getProcessor()->getMetrics(); + CHECK(metrics->invocations == 1); + CHECK(metrics->incoming_flow_files == 1); + CHECK(metrics->transferred_flow_files == 2); + CHECK(metrics->getTransferredFlowFilesToRelationshipCount("success") == 1); + CHECK(metrics->getTransferredFlowFilesToRelationshipCount("original") == 1); + CHECK(metrics->incoming_bytes == 10); + CHECK(metrics->transferred_bytes == 30); + CHECK(metrics->bytes_read == 10); + CHECK(metrics->bytes_written == 20); + auto old_nanos = metrics->processing_nanos.load(); + CHECK(metrics->processing_nanos > 0); + + test_controller.trigger({minifi::test::InputFlowFileData{"new log line 2", {}}}); + CHECK(metrics->invocations == 2); + CHECK(metrics->incoming_flow_files == 2); + CHECK(metrics->transferred_flow_files == 4); + CHECK(metrics->getTransferredFlowFilesToRelationshipCount("success") == 2); + CHECK(metrics->getTransferredFlowFilesToRelationshipCount("original") == 2); + CHECK(metrics->incoming_bytes == 24); + CHECK(metrics->transferred_bytes == 72); + CHECK(metrics->bytes_read == 24); + CHECK(metrics->bytes_written == 48); + CHECK(metrics->processing_nanos > old_nanos); +} + + } // namespace org::apache::nifi::minifi::test From 8a5bc2585241b1fa27feef78314e6bf8f104f611 Mon Sep 17 00:00:00 2001 From: Gabor Gyimesi Date: Wed, 15 Jan 2025 13:54:05 +0100 Subject: [PATCH 2/6] Move component running field to processorStatus --- C2.md | 20 ++++--------------- .../src/core/state/nodes/FlowInformation.cpp | 17 +++++----------- libminifi/test/integration/C2MetricsTest.cpp | 14 ++++--------- 3 files changed, 13 insertions(+), 38 deletions(-) diff --git a/C2.md b/C2.md index 37ca600cee..9697dae5ef 100644 --- a/C2.md +++ b/C2.md @@ -517,20 +517,6 @@ Contains information about the flow the agent is running, including the versione "uuid": "8368e3c8-015a-1003-52ca-83af40ec1332" } }, - "components": { - "LogAttribute": { - "running": true, - "uuid": "5128e3c8-015a-1000-79ca-83af40ec1990" - }, - "GenerateFlowFile": { - "running": true, - "uuid": "4fe2d51d-076a-49b0-88de-5cf5adf52b8f" - }, - "FlowController": { - "running": true, - "uuid": "2438e3c8-015a-1000-79ca-83af40ec1990" - } - }, "processorStatuses": [ { "id": "5128e3c8-015a-1000-79ca-83af40ec1990", @@ -544,7 +530,8 @@ Contains information about the flow the agent is running, including the versione "invocations": 0, "processingNanos": 0, "activeThreadCount": -1, - "terminatedThreadCount": -1 + "terminatedThreadCount": -1, + "running": true }, { "id": "4fe2d51d-076a-49b0-88de-5cf5adf52b8f", @@ -558,7 +545,8 @@ Contains information about the flow the agent is running, including the versione "invocations": 4, "processingNanos": 2119148, "activeThreadCount": -1, - "terminatedThreadCount": -1 + "terminatedThreadCount": -1, + "running": true } ], "flowId": "96273342-b9fe-11ef-a0ad-10f60a596f64" diff --git a/libminifi/src/core/state/nodes/FlowInformation.cpp b/libminifi/src/core/state/nodes/FlowInformation.cpp index 0fa8e06686..38da4d4005 100644 --- a/libminifi/src/core/state/nodes/FlowInformation.cpp +++ b/libminifi/src/core/state/nodes/FlowInformation.cpp @@ -61,19 +61,11 @@ std::vector FlowInformation::serialize() { serialized.push_back(queues); } + std::unordered_map processors_running; if (nullptr != monitor_) { - SerializedResponseNode componentsNode{.name = "components", .collapsible = false}; - monitor_->executeOnAllComponents([&componentsNode](StateController& component){ - componentsNode.children.push_back({ - .name = component.getComponentName(), - .collapsible = false, - .children = { - {.name = "running", .value = component.isRunning()}, - {.name = "uuid", .value = std::string{component.getComponentUUID().to_string()}} - } - }); + monitor_->executeOnAllComponents([&processors_running](StateController& component){ + processors_running[component.getComponentUUID().to_string()] = component.isRunning(); }); - serialized.push_back(componentsNode); } if (!processors_.empty()) { @@ -99,7 +91,8 @@ std::vector FlowInformation::serialize() { {.name = "invocations", .value = metrics->invocations.load()}, {.name = "processingNanos", .value = metrics->processing_nanos.load()}, {.name = "activeThreadCount", .value = -1}, - {.name = "terminatedThreadCount", .value = -1} + {.name = "terminatedThreadCount", .value = -1}, + {.name = "running", .value = (processors_running.contains(processor->getUUIDStr()) ? processors_running[processor->getUUIDStr()] : false)} } }); } diff --git a/libminifi/test/integration/C2MetricsTest.cpp b/libminifi/test/integration/C2MetricsTest.cpp index 63544174d6..46cadbbfe4 100644 --- a/libminifi/test/integration/C2MetricsTest.cpp +++ b/libminifi/test/integration/C2MetricsTest.cpp @@ -134,11 +134,7 @@ class MetricsHandler: public HeartbeatHandler { runtime_metrics.HasMember("flowInfo") && runtime_metrics["flowInfo"].HasMember("versionedFlowSnapshotURI") && runtime_metrics["flowInfo"].HasMember("queues") && - runtime_metrics["flowInfo"].HasMember("components") && runtime_metrics["flowInfo"]["queues"].HasMember("2438e3c8-015a-1000-79ca-83af40ec1997") && - runtime_metrics["flowInfo"]["components"].HasMember("FlowController") && - runtime_metrics["flowInfo"]["components"].HasMember("GetTCP") && - runtime_metrics["flowInfo"]["components"].HasMember("LogAttribute") && runtime_metrics["flowInfo"].HasMember("processorStatuses") && [&]() { if (runtime_metrics["flowInfo"]["processorStatuses"].GetArray().Size() != 2) { @@ -158,7 +154,8 @@ class MetricsHandler: public HeartbeatHandler { processor["invocations"].GetInt() < 0 || processor["processingNanos"].GetInt() < 0 || processor["activeThreadCount"].GetInt() != -1 || - processor["terminatedThreadCount"].GetInt() != -1) { + processor["terminatedThreadCount"].GetInt() != -1 || + !processor["running"].GetBool()) { return false; } } @@ -171,11 +168,7 @@ class MetricsHandler: public HeartbeatHandler { runtime_metrics.HasMember("flowInfo") && runtime_metrics["flowInfo"].HasMember("versionedFlowSnapshotURI") && runtime_metrics["flowInfo"].HasMember("queues") && - runtime_metrics["flowInfo"].HasMember("components") && runtime_metrics["flowInfo"]["queues"].HasMember("8368e3c8-015a-1003-52ca-83af40ec1332") && - runtime_metrics["flowInfo"]["components"].HasMember("FlowController") && - runtime_metrics["flowInfo"]["components"].HasMember("GenerateFlowFile") && - runtime_metrics["flowInfo"]["components"].HasMember("LogAttribute") && runtime_metrics["flowInfo"].HasMember("processorStatuses") && [&]() { if (runtime_metrics["flowInfo"]["processorStatuses"].GetArray().Size() != 2) { @@ -195,7 +188,8 @@ class MetricsHandler: public HeartbeatHandler { processor["invocations"].GetInt() < 0 || processor["processingNanos"].GetInt() < 0 || processor["activeThreadCount"].GetInt() != -1 || - processor["terminatedThreadCount"].GetInt() != -1) { + processor["terminatedThreadCount"].GetInt() != -1 || + !processor["running"].GetBool()) { return false; } } From fe98dcba1ed1a3f584766618625a5efd3e369189 Mon Sep 17 00:00:00 2001 From: Gabor Gyimesi Date: Wed, 15 Jan 2025 16:43:15 +0100 Subject: [PATCH 3/6] Review update --- METRICS.md | 4 +--- .../include/core/state/nodes/FlowInformation.h | 6 +++--- .../src/core/state/nodes/FlowInformation.cpp | 18 +++--------------- .../core/state/nodes/ResponseNodeLoader.cpp | 2 +- 4 files changed, 8 insertions(+), 22 deletions(-) diff --git a/METRICS.md b/METRICS.md index e28f77a400..3c891fcc4b 100644 --- a/METRICS.md +++ b/METRICS.md @@ -187,7 +187,7 @@ FlowInformation is a system level metric that reports component and queue relate | queue_data_size_max | connection_uuid, connection_name | Max queue data size to apply back pressure | | queue_size | connection_uuid, connection_name | Current queue size | | queue_size_max | connection_uuid, connection_name | Max queue size to apply back pressure | -| is_running | component_uuid, component_name | Check if the component is running (1 or 0) | +| is_running | processor_uuid, processor_name | Check if the component is running (1 or 0) | | bytes_read | processor_uuid, processor_name | Number of bytes read by the processor | | bytes_written | processor_uuid, processor_name | Number of bytes written by the processor | | flow_files_in | processor_uuid, processor_name | Number of flow files from the incoming queue processed by the processor | @@ -202,8 +202,6 @@ FlowInformation is a system level metric that reports component and queue relate |-----------------|--------------------------------------------------------------| | connection_uuid | UUID of the connection defined in the flow configuration | | connection_name | Name of the connection defined in the flow configuration | -| component_uuid | UUID of the component | -| component_name | Name of the component | | processor_uuid | UUID of the processor | | processor_name | Name of the processor | diff --git a/libminifi/include/core/state/nodes/FlowInformation.h b/libminifi/include/core/state/nodes/FlowInformation.h index 5bfc257e14..442e1007bc 100644 --- a/libminifi/include/core/state/nodes/FlowInformation.h +++ b/libminifi/include/core/state/nodes/FlowInformation.h @@ -93,14 +93,14 @@ class FlowVersion : public DeviceInformation { std::shared_ptr identifier; }; -class FlowInformation : public StateMonitorNode { +class FlowInformation : public ResponseNode { public: FlowInformation(std::string_view name, const utils::Identifier &uuid) - : StateMonitorNode(name, uuid) { + : ResponseNode(name, uuid) { } explicit FlowInformation(std::string_view name) - : StateMonitorNode(name) { + : ResponseNode(name) { } MINIFIAPI static constexpr const char* Description = "Metric node that defines the flow ID and flow URL deployed to this agent"; diff --git a/libminifi/src/core/state/nodes/FlowInformation.cpp b/libminifi/src/core/state/nodes/FlowInformation.cpp index 38da4d4005..1331b93bd1 100644 --- a/libminifi/src/core/state/nodes/FlowInformation.cpp +++ b/libminifi/src/core/state/nodes/FlowInformation.cpp @@ -61,13 +61,6 @@ std::vector FlowInformation::serialize() { serialized.push_back(queues); } - std::unordered_map processors_running; - if (nullptr != monitor_) { - monitor_->executeOnAllComponents([&processors_running](StateController& component){ - processors_running[component.getComponentUUID().to_string()] = component.isRunning(); - }); - } - if (!processors_.empty()) { SerializedResponseNode processorsStatusesNode{.name = "processorStatuses", .array = true, .collapsible = false}; for (const auto processor : processors_) { @@ -92,7 +85,7 @@ std::vector FlowInformation::serialize() { {.name = "processingNanos", .value = metrics->processing_nanos.load()}, {.name = "activeThreadCount", .value = -1}, {.name = "terminatedThreadCount", .value = -1}, - {.name = "running", .value = (processors_running.contains(processor->getUUIDStr()) ? processors_running[processor->getUUIDStr()] : false)} + {.name = "running", .value = processor->isRunning()} } }); } @@ -105,13 +98,6 @@ std::vector FlowInformation::serialize() { std::vector FlowInformation::calculateMetrics() { std::vector metrics = connection_store_.calculateConnectionMetrics("FlowInformation"); - if (nullptr != monitor_) { - monitor_->executeOnAllComponents([&metrics](StateController& component){ - metrics.push_back({"is_running", (component.isRunning() ? 1.0 : 0.0), - {{"component_uuid", component.getComponentUUID().to_string()}, {"component_name", component.getComponentName()}, {"metric_class", "FlowInformation"}}}); - }); - } - for (const auto& processor : processors_) { if (!processor) { continue; @@ -133,6 +119,8 @@ std::vector FlowInformation::calculateMetrics() { {{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}}); metrics.push_back({"processing_nanos", gsl::narrow(processor_metrics->processing_nanos.load()), {{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}}); + metrics.push_back({"is_running", (processor->isRunning() ? 1.0 : 0.0), + {{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}}); } return metrics; diff --git a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp index fa50f4e73a..bfb4d35ad7 100644 --- a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp +++ b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp @@ -224,7 +224,7 @@ void ResponseNodeLoader::initializeFlowInformation(const SharedResponseNode& res for (auto &con : connections) { flow_information->updateConnection(con.second); } - flow_information->setStateMonitor(update_sink_); + if (flow_configuration_) { flow_information->setFlowVersion(flow_configuration_->getFlowVersion()); } From 2723bace043147136941c68f86216fcd61d74a36 Mon Sep 17 00:00:00 2001 From: Gabor Gyimesi Date: Thu, 16 Jan 2025 10:51:36 +0100 Subject: [PATCH 4/6] Review update --- C2.md | 2 +- .../cluster/checkers/PrometheusChecker.py | 2 +- libminifi/src/core/ProcessorMetrics.cpp | 5 +- libminifi/test/integration/C2MetricsTest.cpp | 57 ++++++++----------- 4 files changed, 28 insertions(+), 38 deletions(-) diff --git a/C2.md b/C2.md index 9697dae5ef..b5d66c7275 100644 --- a/C2.md +++ b/C2.md @@ -601,7 +601,7 @@ Contains information about the repositories in the agent, including the number o ### Processor Metric Response Nodes -Each processor can have its own metrics. These metric nodes can be configured in the minifi.properties by requesting metrics in the Metric format, for example GetTCPMetrics to request metrics for the GetTCP processors. Besides configuring processor metrics directly, they can also be configured using regular expressions with the `processorMetrics/` prefix. For example `processorMetrics/Get.*Metrics` will match all processor metrics that start with Get. +Each processor can have its own metrics. These metric nodes can be configured in the minifi.properties by requesting metrics in the \Metric format, for example GetTCPMetrics to request metrics for the GetTCP processors. Besides configuring processor metrics directly, they can also be configured using regular expressions with the `processorMetrics/` prefix. For example `processorMetrics/Get.*Metrics` will match all processor metrics that start with Get. ``` "GetTCPMetrics": { diff --git a/docker/test/integration/cluster/checkers/PrometheusChecker.py b/docker/test/integration/cluster/checkers/PrometheusChecker.py index 2c97965a8f..04ab1ad645 100644 --- a/docker/test/integration/cluster/checkers/PrometheusChecker.py +++ b/docker/test/integration/cluster/checkers/PrometheusChecker.py @@ -76,7 +76,7 @@ def verify_flow_information_metrics(self): return self.verify_metrics_exist(['minifi_queue_data_size', 'minifi_queue_data_size_max', 'minifi_queue_size', 'minifi_queue_size_max', 'minifi_bytes_read', 'minifi_bytes_written', 'minifi_flow_files_in', 'minifi_flow_files_out', 'minifi_bytes_in', 'minifi_bytes_out', 'minifi_invocations', 'minifi_processing_nanos'], 'FlowInformation') and \ - self.verify_metric_exists('minifi_is_running', 'FlowInformation', {'component_name': 'FlowController'}) + self.verify_metric_exists('minifi_is_running', 'FlowInformation', {'processor_name': 'FlowController'}) def verify_device_info_node_metrics(self): return self.verify_metrics_exist(['minifi_physical_mem', 'minifi_memory_usage', 'minifi_cpu_utilization', 'minifi_cpu_load_average'], 'DeviceInfoNode') diff --git a/libminifi/src/core/ProcessorMetrics.cpp b/libminifi/src/core/ProcessorMetrics.cpp index bf30910003..95e00c4872 100644 --- a/libminifi/src/core/ProcessorMetrics.cpp +++ b/libminifi/src/core/ProcessorMetrics.cpp @@ -134,8 +134,9 @@ std::chrono::milliseconds ProcessorMetrics::getLastSessionCommitRuntime() const std::optional ProcessorMetrics::getTransferredFlowFilesToRelationshipCount(const std::string& relationship) const { std::lock_guard lock(transferred_relationships_mutex_); - if (transferred_relationships_.contains(relationship)) { - return transferred_relationships_.at(relationship); + const auto relationship_it = transferred_relationships_.find(relationship); + if (relationship_it != std::end(transferred_relationships_)) { + return relationship_it->second; } return {}; } diff --git a/libminifi/test/integration/C2MetricsTest.cpp b/libminifi/test/integration/C2MetricsTest.cpp index 46cadbbfe4..4447019df0 100644 --- a/libminifi/test/integration/C2MetricsTest.cpp +++ b/libminifi/test/integration/C2MetricsTest.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include "unit/TestBase.h" #include "integration/HTTPIntegrationBase.h" @@ -129,6 +130,20 @@ class MetricsHandler: public HeartbeatHandler { } } + static bool processorMetricsAreValid(const auto& processor) { + return processor["bytesRead"].GetInt() >= 0 && + processor["bytesWritten"].GetInt() >= 0 && + processor["flowFilesIn"].GetInt() >= 0 && + processor["flowFilesOut"].GetInt() >= 0 && + processor["bytesIn"].GetInt() >= 0 && + processor["bytesOut"].GetInt() >= 0 && + processor["invocations"].GetInt() >= 0 && + processor["processingNanos"].GetInt() >= 0 && + processor["activeThreadCount"].GetInt() == -1 && + processor["terminatedThreadCount"].GetInt() == -1 && + processor["running"].GetBool(); + } + static bool verifyRuntimeMetrics(const rapidjson::Value& runtime_metrics) { return runtime_metrics.HasMember("deviceInfo") && runtime_metrics.HasMember("flowInfo") && @@ -140,26 +155,13 @@ class MetricsHandler: public HeartbeatHandler { if (runtime_metrics["flowInfo"]["processorStatuses"].GetArray().Size() != 2) { return false; } - for (const auto& processor : runtime_metrics["flowInfo"]["processorStatuses"].GetArray()) { // NOLINT(readability-use-anyofallof) + const auto processor_statuses = runtime_metrics["flowInfo"]["processorStatuses"].GetArray(); + return std::all_of(processor_statuses.begin(), processor_statuses.end(), [&](const auto& processor) { if (processor["id"].GetString() != std::string(GETTCP_UUID) && processor["id"].GetString() != std::string(LOGATTRIBUTE1_UUID)) { throw std::runtime_error(std::string("Unexpected processor id in processorStatuses: ") + processor["id"].GetString()); } - - if (processor["bytesRead"].GetInt() < 0 || - processor["bytesWritten"].GetInt() < 0 || - processor["flowFilesIn"].GetInt() < 0 || - processor["flowFilesOut"].GetInt() < 0 || - processor["bytesIn"].GetInt() < 0 || - processor["bytesOut"].GetInt() < 0 || - processor["invocations"].GetInt() < 0 || - processor["processingNanos"].GetInt() < 0 || - processor["activeThreadCount"].GetInt() != -1 || - processor["terminatedThreadCount"].GetInt() != -1 || - !processor["running"].GetBool()) { - return false; - } - } - return true; + return processorMetricsAreValid(processor); + }); }(); } @@ -174,26 +176,13 @@ class MetricsHandler: public HeartbeatHandler { if (runtime_metrics["flowInfo"]["processorStatuses"].GetArray().Size() != 2) { return false; } - for (const auto& processor : runtime_metrics["flowInfo"]["processorStatuses"].GetArray()) { // NOLINT(readability-use-anyofallof) + const auto processor_statuses = runtime_metrics["flowInfo"]["processorStatuses"].GetArray(); + return std::all_of(processor_statuses.begin(), processor_statuses.end(), [&](const auto& processor) { if (processor["id"].GetString() != std::string(GENERATE_FLOWFILE_UUID) && processor["id"].GetString() != std::string(LOGATTRIBUTE2_UUID)) { throw std::runtime_error(std::string("Unexpected processor id in processorStatuses: ") + processor["id"].GetString()); } - - if (processor["bytesRead"].GetInt() < 0 || - processor["bytesWritten"].GetInt() < 0 || - processor["flowFilesIn"].GetInt() < 0 || - processor["flowFilesOut"].GetInt() < 0 || - processor["bytesIn"].GetInt() < 0 || - processor["bytesOut"].GetInt() < 0 || - processor["invocations"].GetInt() < 0 || - processor["processingNanos"].GetInt() < 0 || - processor["activeThreadCount"].GetInt() != -1 || - processor["terminatedThreadCount"].GetInt() != -1 || - !processor["running"].GetBool()) { - return false; - } - } - return true; + return processorMetricsAreValid(processor); + }); }(); } From 24c93329eec9e7a505d318804ba9b78589d5c9d3 Mon Sep 17 00:00:00 2001 From: Gabor Gyimesi Date: Thu, 16 Jan 2025 11:52:57 +0100 Subject: [PATCH 5/6] Review update --- C2.md | 2 +- .../integration/cluster/checkers/PrometheusChecker.py | 2 +- libminifi/include/io/StreamCallback.h | 3 ++- .../utils/LineByLineInputOutputStreamCallback.h | 2 +- libminifi/src/core/ProcessSession.cpp | 10 +++++----- .../src/utils/LineByLineInputOutputStreamCallback.cpp | 9 +++++---- 6 files changed, 15 insertions(+), 13 deletions(-) diff --git a/C2.md b/C2.md index b5d66c7275..227ded7c3c 100644 --- a/C2.md +++ b/C2.md @@ -601,7 +601,7 @@ Contains information about the repositories in the agent, including the number o ### Processor Metric Response Nodes -Each processor can have its own metrics. These metric nodes can be configured in the minifi.properties by requesting metrics in the \Metric format, for example GetTCPMetrics to request metrics for the GetTCP processors. Besides configuring processor metrics directly, they can also be configured using regular expressions with the `processorMetrics/` prefix. For example `processorMetrics/Get.*Metrics` will match all processor metrics that start with Get. +Each processor can have its own metrics. These metric nodes can be configured in the minifi.properties by requesting metrics in the \Metric format, for example GetTCPMetrics to request metrics for the GetTCP processors. Besides configuring processor metrics directly, they can also be configured using regular expressions with the `processorMetrics/` prefix. For example `processorMetrics/Get.*Metrics` will match all processor metrics that start with Get. ``` "GetTCPMetrics": { diff --git a/docker/test/integration/cluster/checkers/PrometheusChecker.py b/docker/test/integration/cluster/checkers/PrometheusChecker.py index 04ab1ad645..2c97965a8f 100644 --- a/docker/test/integration/cluster/checkers/PrometheusChecker.py +++ b/docker/test/integration/cluster/checkers/PrometheusChecker.py @@ -76,7 +76,7 @@ def verify_flow_information_metrics(self): return self.verify_metrics_exist(['minifi_queue_data_size', 'minifi_queue_data_size_max', 'minifi_queue_size', 'minifi_queue_size_max', 'minifi_bytes_read', 'minifi_bytes_written', 'minifi_flow_files_in', 'minifi_flow_files_out', 'minifi_bytes_in', 'minifi_bytes_out', 'minifi_invocations', 'minifi_processing_nanos'], 'FlowInformation') and \ - self.verify_metric_exists('minifi_is_running', 'FlowInformation', {'processor_name': 'FlowController'}) + self.verify_metric_exists('minifi_is_running', 'FlowInformation', {'component_name': 'FlowController'}) def verify_device_info_node_metrics(self): return self.verify_metrics_exist(['minifi_physical_mem', 'minifi_memory_usage', 'minifi_cpu_utilization', 'minifi_cpu_load_average'], 'DeviceInfoNode') diff --git a/libminifi/include/io/StreamCallback.h b/libminifi/include/io/StreamCallback.h index 401fe472bc..8467c0f966 100644 --- a/libminifi/include/io/StreamCallback.h +++ b/libminifi/include/io/StreamCallback.h @@ -18,6 +18,7 @@ #include #include +#include namespace org::apache::nifi::minifi::io { @@ -33,6 +34,6 @@ struct ReadWriteResult { // throw exception for error using InputStreamCallback = std::function& input_stream)>; using OutputStreamCallback = std::function& output_stream)>; -using InputOutputStreamCallback = std::function& input_stream, const std::shared_ptr& output_stream)>; +using InputOutputStreamCallback = std::function(const std::shared_ptr& input_stream, const std::shared_ptr& output_stream)>; } // namespace org::apache::nifi::minifi::io diff --git a/libminifi/include/utils/LineByLineInputOutputStreamCallback.h b/libminifi/include/utils/LineByLineInputOutputStreamCallback.h index 8fef08c31a..b52821d8ef 100644 --- a/libminifi/include/utils/LineByLineInputOutputStreamCallback.h +++ b/libminifi/include/utils/LineByLineInputOutputStreamCallback.h @@ -33,7 +33,7 @@ class LineByLineInputOutputStreamCallback { public: using CallbackType = std::function; explicit LineByLineInputOutputStreamCallback(CallbackType callback); - io::ReadWriteResult operator()(const std::shared_ptr& input, const std::shared_ptr& output); + std::optional operator()(const std::shared_ptr& input, const std::shared_ptr& output); private: int64_t readInput(io::InputStream& stream); diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp index 7b80f3d936..15941f0b77 100644 --- a/libminifi/src/core/ProcessSession.cpp +++ b/libminifi/src/core/ProcessSession.cpp @@ -419,22 +419,22 @@ int64_t ProcessSession::readWrite(const std::shared_ptr &flow, c } auto read_write_result = callback(input_stream, output_stream); - if (read_write_result.bytes_written < 0) { + if (!read_write_result) { throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile content"); } input_stream->close(); output_stream->close(); - flow->setSize(gsl::narrow(read_write_result.bytes_written)); + flow->setSize(gsl::narrow(read_write_result->bytes_written)); flow->setOffset(0); flow->setResourceClaim(output_claim); if (metrics_) { - metrics_->bytes_written += read_write_result.bytes_written; - metrics_->bytes_read += read_write_result.bytes_read; + metrics_->bytes_written += read_write_result->bytes_written; + metrics_->bytes_read += read_write_result->bytes_read; } - return read_write_result.bytes_written; + return read_write_result->bytes_written; } catch (const std::exception& exception) { logger_->log_debug("Caught exception during process session readWrite, type: {}, what: {}", typeid(exception).name(), exception.what()); throw; diff --git a/libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp b/libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp index ea993f8c63..6be8b7fd3a 100644 --- a/libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp +++ b/libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp @@ -26,14 +26,16 @@ LineByLineInputOutputStreamCallback::LineByLineInputOutputStreamCallback(Callbac : callback_(std::move(callback)) { } -io::ReadWriteResult LineByLineInputOutputStreamCallback::operator()(const std::shared_ptr& input, const std::shared_ptr& output) { +std::optional LineByLineInputOutputStreamCallback::operator()(const std::shared_ptr& input, const std::shared_ptr& output) { gsl_Expects(input); gsl_Expects(output); io::ReadWriteResult result; if (int64_t status = readInput(*input); status <= 0) { - result.bytes_read = status; + if (status < 0) { + return std::nullopt; + } return result; } @@ -47,8 +49,7 @@ io::ReadWriteResult LineByLineInputOutputStreamCallback::operator()(const std::s std::string output_line = callback_(*current_line_, is_first_line, isLastLine()); const auto bytes_written = output->write(reinterpret_cast(output_line.data()), output_line.size()); if (io::isError(bytes_written)) { - result.bytes_written = gsl::narrow(bytes_written); - return result; + return std::nullopt; } total_bytes_written += bytes_written; is_first_line = false; From 36f3262ba5901b9f59828db052cb9ac1a5bdef2e Mon Sep 17 00:00:00 2001 From: Gabor Gyimesi Date: Thu, 16 Jan 2025 13:37:02 +0100 Subject: [PATCH 6/6] Move FlowController running field to flowInfo level --- C2.md | 3 ++- METRICS.md | 4 ++- .../core/state/nodes/FlowInformation.h | 6 ++--- .../src/core/state/nodes/FlowInformation.cpp | 12 +++++++++ .../core/state/nodes/ResponseNodeLoader.cpp | 2 +- libminifi/test/integration/C2MetricsTest.cpp | 26 ++++++++++--------- 6 files changed, 35 insertions(+), 18 deletions(-) diff --git a/C2.md b/C2.md index 227ded7c3c..8f36f6c7d8 100644 --- a/C2.md +++ b/C2.md @@ -549,7 +549,8 @@ Contains information about the flow the agent is running, including the versione "running": true } ], - "flowId": "96273342-b9fe-11ef-a0ad-10f60a596f64" + "flowId": "96273342-b9fe-11ef-a0ad-10f60a596f64", + "running": true } ``` diff --git a/METRICS.md b/METRICS.md index 3c891fcc4b..e28f77a400 100644 --- a/METRICS.md +++ b/METRICS.md @@ -187,7 +187,7 @@ FlowInformation is a system level metric that reports component and queue relate | queue_data_size_max | connection_uuid, connection_name | Max queue data size to apply back pressure | | queue_size | connection_uuid, connection_name | Current queue size | | queue_size_max | connection_uuid, connection_name | Max queue size to apply back pressure | -| is_running | processor_uuid, processor_name | Check if the component is running (1 or 0) | +| is_running | component_uuid, component_name | Check if the component is running (1 or 0) | | bytes_read | processor_uuid, processor_name | Number of bytes read by the processor | | bytes_written | processor_uuid, processor_name | Number of bytes written by the processor | | flow_files_in | processor_uuid, processor_name | Number of flow files from the incoming queue processed by the processor | @@ -202,6 +202,8 @@ FlowInformation is a system level metric that reports component and queue relate |-----------------|--------------------------------------------------------------| | connection_uuid | UUID of the connection defined in the flow configuration | | connection_name | Name of the connection defined in the flow configuration | +| component_uuid | UUID of the component | +| component_name | Name of the component | | processor_uuid | UUID of the processor | | processor_name | Name of the processor | diff --git a/libminifi/include/core/state/nodes/FlowInformation.h b/libminifi/include/core/state/nodes/FlowInformation.h index 442e1007bc..5bfc257e14 100644 --- a/libminifi/include/core/state/nodes/FlowInformation.h +++ b/libminifi/include/core/state/nodes/FlowInformation.h @@ -93,14 +93,14 @@ class FlowVersion : public DeviceInformation { std::shared_ptr identifier; }; -class FlowInformation : public ResponseNode { +class FlowInformation : public StateMonitorNode { public: FlowInformation(std::string_view name, const utils::Identifier &uuid) - : ResponseNode(name, uuid) { + : StateMonitorNode(name, uuid) { } explicit FlowInformation(std::string_view name) - : ResponseNode(name) { + : StateMonitorNode(name) { } MINIFIAPI static constexpr const char* Description = "Metric node that defines the flow ID and flow URL deployed to this agent"; diff --git a/libminifi/src/core/state/nodes/FlowInformation.cpp b/libminifi/src/core/state/nodes/FlowInformation.cpp index 1331b93bd1..b8066b2a86 100644 --- a/libminifi/src/core/state/nodes/FlowInformation.cpp +++ b/libminifi/src/core/state/nodes/FlowInformation.cpp @@ -34,6 +34,12 @@ std::vector FlowInformation::serialize() { {.name = "flowId", .value = flow_version_->getFlowId()} }; + if (nullptr != monitor_) { + monitor_->executeOnComponent("FlowController", [&serialized](StateController& component) { + serialized.push_back({.name = "running", .value = component.isRunning()}); + }); + } + SerializedResponseNode uri; uri.name = "versionedFlowSnapshotURI"; for (auto &entry : flow_version_->serialize()) { @@ -97,6 +103,12 @@ std::vector FlowInformation::serialize() { std::vector FlowInformation::calculateMetrics() { std::vector metrics = connection_store_.calculateConnectionMetrics("FlowInformation"); + if (nullptr != monitor_) { + monitor_->executeOnComponent("FlowController", [&metrics](StateController& component) { + metrics.push_back({"is_running", (component.isRunning() ? 1.0 : 0.0), + {{"component_uuid", component.getComponentUUID().to_string()}, {"component_name", component.getComponentName()}, {"metric_class", "FlowInformation"}}}); + }); + } for (const auto& processor : processors_) { if (!processor) { diff --git a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp index bfb4d35ad7..fa50f4e73a 100644 --- a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp +++ b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp @@ -224,7 +224,7 @@ void ResponseNodeLoader::initializeFlowInformation(const SharedResponseNode& res for (auto &con : connections) { flow_information->updateConnection(con.second); } - + flow_information->setStateMonitor(update_sink_); if (flow_configuration_) { flow_information->setFlowVersion(flow_configuration_->getFlowVersion()); } diff --git a/libminifi/test/integration/C2MetricsTest.cpp b/libminifi/test/integration/C2MetricsTest.cpp index 4447019df0..6398446fbd 100644 --- a/libminifi/test/integration/C2MetricsTest.cpp +++ b/libminifi/test/integration/C2MetricsTest.cpp @@ -144,18 +144,24 @@ class MetricsHandler: public HeartbeatHandler { processor["running"].GetBool(); } - static bool verifyRuntimeMetrics(const rapidjson::Value& runtime_metrics) { + static bool verifyCommonRuntimeMetricNodes(const rapidjson::Value& runtime_metrics, const std::string& queue_id) { return runtime_metrics.HasMember("deviceInfo") && runtime_metrics.HasMember("flowInfo") && + runtime_metrics["flowInfo"].HasMember("flowId") && + runtime_metrics["flowInfo"].HasMember("running") && runtime_metrics["flowInfo"].HasMember("versionedFlowSnapshotURI") && runtime_metrics["flowInfo"].HasMember("queues") && - runtime_metrics["flowInfo"]["queues"].HasMember("2438e3c8-015a-1000-79ca-83af40ec1997") && - runtime_metrics["flowInfo"].HasMember("processorStatuses") && + runtime_metrics["flowInfo"]["queues"].HasMember(queue_id) && + runtime_metrics["flowInfo"].HasMember("processorStatuses"); + } + + static bool verifyRuntimeMetrics(const rapidjson::Value& runtime_metrics) { + return verifyCommonRuntimeMetricNodes(runtime_metrics, "2438e3c8-015a-1000-79ca-83af40ec1997") && [&]() { - if (runtime_metrics["flowInfo"]["processorStatuses"].GetArray().Size() != 2) { + const auto processor_statuses = runtime_metrics["flowInfo"]["processorStatuses"].GetArray(); + if (processor_statuses.Size() != 2) { return false; } - const auto processor_statuses = runtime_metrics["flowInfo"]["processorStatuses"].GetArray(); return std::all_of(processor_statuses.begin(), processor_statuses.end(), [&](const auto& processor) { if (processor["id"].GetString() != std::string(GETTCP_UUID) && processor["id"].GetString() != std::string(LOGATTRIBUTE1_UUID)) { throw std::runtime_error(std::string("Unexpected processor id in processorStatuses: ") + processor["id"].GetString()); @@ -166,17 +172,13 @@ class MetricsHandler: public HeartbeatHandler { } static bool verifyUpdatedRuntimeMetrics(const rapidjson::Value& runtime_metrics) { - return runtime_metrics.HasMember("deviceInfo") && - runtime_metrics.HasMember("flowInfo") && - runtime_metrics["flowInfo"].HasMember("versionedFlowSnapshotURI") && - runtime_metrics["flowInfo"].HasMember("queues") && - runtime_metrics["flowInfo"]["queues"].HasMember("8368e3c8-015a-1003-52ca-83af40ec1332") && + return verifyCommonRuntimeMetricNodes(runtime_metrics, "8368e3c8-015a-1003-52ca-83af40ec1332") && runtime_metrics["flowInfo"].HasMember("processorStatuses") && [&]() { - if (runtime_metrics["flowInfo"]["processorStatuses"].GetArray().Size() != 2) { + const auto processor_statuses = runtime_metrics["flowInfo"]["processorStatuses"].GetArray(); + if (processor_statuses.Size() != 2) { return false; } - const auto processor_statuses = runtime_metrics["flowInfo"]["processorStatuses"].GetArray(); return std::all_of(processor_statuses.begin(), processor_statuses.end(), [&](const auto& processor) { if (processor["id"].GetString() != std::string(GENERATE_FLOWFILE_UUID) && processor["id"].GetString() != std::string(LOGATTRIBUTE2_UUID)) { throw std::runtime_error(std::string("Unexpected processor id in processorStatuses: ") + processor["id"].GetString());