diff --git a/C2.md b/C2.md index ad492ec57a..8f36f6c7d8 100644 --- a/C2.md +++ b/C2.md @@ -26,10 +26,23 @@ options defined are located in minifi.properties. - [Description](#description) - [Configuration](#configuration) - [Base Options](#base-options) + - [Flow Id and URL](#flow-id-and-url) + - [Agent Identifier Fallback](#agent-identifier-fallback) - [Metrics](#metrics) - [UpdatePolicies](#updatepolicies) - [Triggers](#triggers) - [C2 File triggers](#c2-file-triggers) + - [C2 Response Nodes](#c2-response-nodes) + - [AgentInformation](#agentinformation) + - [AgentStatus](#agentstatus) + - [AssetInformation](#assetinformation) + - [BuildInformation](#buildinformation) + - [ConfigurationChecksums](#configurationchecksums) + - [DeviceInfoNode](#deviceinfonode) + - [FlowInformation](#flowinformation) + - [QueueMetrics](#queuemetrics) + - [RepositoryMetrics](#repositorymetrics) + - [Processor Metric Response Nodes](#processor-metric-response-nodes) ## Description @@ -271,3 +284,341 @@ in minifi.properties to activate the file update trigger specify # specifying a trigger nifi.c2.agent.trigger.classes=FileUpdateTrigger nifi.c2.file.watch= + +## C2 Response Nodes + +The following is a list of nodes that can be defined in the minifi.properties file for the C2 heartbeat response as part of the C2 root nodes defined in the `nifi.c2.root.classes` property or in the metrics nodes defined in the tree under `nifi.c2.root.class.definitions` as stated above. + +### AgentInformation + +Contains information about the agent's build, extensions, supported C2 operations and status of its components. + +``` +"agentInfo": { + "agentManifest": { + "buildInfo": { + "compiler": "/usr/lib/ccache/g++", + "flags": " -std=c++20;-Wall;-Wextra;-Werror;-Wno-error=restrict;SODIUM_STATIC=1", + "revision": "cc9aaac37a9a6b7efeb3c4394a97522a600a1758", + "timestamp": 1734001238, + "version": "0.99.1" + }, + "bundles": [ + { + "componentManifest": { + "processors": [ + ... + ] + }, + "artifact": "minifi-civet-extensions", + "group": "org.apache.nifi.minifi", + "version": "0.99.1" + } + ], + "schedulingDefaults": { + "defaultMaxConcurrentTasks": 1, + "defaultRunDurationNanos": 0, + "defaultSchedulingPeriodMillis": 1000, + "defaultSchedulingStrategy": "TIMER_DRIVEN", + "penalizationPeriodMillis": 30000, + "yieldDurationMillis": 1000 + }, + "supportedOperations": [ + { + "type": "acknowledge" + } + ... + ], + "agentType": "cpp", + "identifier": "bH77vXakM0Lkgt8VcDOGZVW3" + }, + "status": { + "repositories": { + "content_repo": { + "entryCount": 0, + "full": false, + "maxSize": 0, + "running": true, + "size": 0 + }, + "flow_file_repo": { + "entryCount": 0, + "full": false, + "maxSize": 0, + "running": true, + "size": 0 + }, + "org::apache::nifi::minifi::core::repository::VolatileContentRepository": { + "entryCount": 4, + "full": false, + "maxSize": 7864320, + "running": true, + "size": 40 + } + }, + "components": { + "LogAttribute": { + "running": true, + "uuid": "5128e3c8-015a-1000-79ca-83af40ec1990" + }, + "GenerateFlowFile": { + "running": true, + "uuid": "4fe2d51d-076a-49b0-88de-5cf5adf52b8f" + }, + "FlowController": { + "running": true, + "uuid": "2438e3c8-015a-1000-79ca-83af40ec1990" + } + }, + "resourceConsumption": { + "cpuUtilization": 0.05, + "memoryUsage": 97955840 + }, + "uptime": 1025 + }, + "agentClass": "test", + "agentManifestHash": "9FFC8326121A816E5B2FD674CE9A34321F89CC690AD0D1FD79DFB5969B3B523D6570520382E82C68CFA347FBD9897FC027E518E98CFA229C18617B062E1C9E77", + "identifier": "9628acfe-b9fe-11ef-a0c0-10f60a596f64" +} +``` + +### AgentStatus + +Contains information about the agent's status, including the status of its components, repositories, and resource consumption. + +``` +"AgentStatus": { + "repositories": { + "repo_name": { + "entryCount": 0, + "full": false, + "maxSize": 0, + "running": true, + "size": 0 + }, + "ff": { + "entryCount": 0, + "full": false, + "maxSize": 0, + "running": true, + "size": 0 + }, + "org::apache::nifi::minifi::core::repository::VolatileContentRepository": { + "entryCount": 4, + "full": false, + "maxSize": 7864320, + "running": true, + "size": 40 + } + }, + "components": { + "LogAttribute": { + "running": true, + "uuid": "5128e3c8-015a-1000-79ca-83af40ec1990" + }, + "GenerateFlowFile": { + "running": true, + "uuid": "4fe2d51d-076a-49b0-88de-5cf5adf52b8f" + }, + "FlowController": { + "running": true, + "uuid": "2438e3c8-015a-1000-79ca-83af40ec1990" + } + }, + "resourceConsumption": { + "cpuUtilization": 0.0028846153846153849, + "memoryUsage": 97955840 + }, + "uptime": 995 +} +``` + +### AssetInformation + +Contains the calculated hash of the assets. + +``` +"resourceInfo": { + "hash": "null" +} +``` + +### BuildInformation + +Contains information about the agent's build. + +``` +"BuildInformation": { + "compiler": { + "compiler_command": "/usr/lib/ccache/g++", + "compiler_flags": " -std=c++20;-Wall;-Wextra;-Werror;-Wno-error=restrict;SODIUM_STATIC=1", + "compiler_version": "11.4.0" + }, + "build_date": "1734001238", + "build_rev": "cc9aaac37a9a6b7efeb3c4394a97522a600a1758", + "build_version": "0.99.1", + "device_id": "bH77vXakM0Lkgt8VcDOGZVW3" +} +``` + +### ConfigurationChecksums + +Metric node that defines checksums of configuration files in the C2 protocol. + +``` +"configurationChecksums": { + "SHA256": { + "TestC2Metrics.yml": "9af6589bf7729bb88857aafe98cea4f41df049725401b5f0ded0a7b949d9b90c", + "minifi.properties": "06fb9f4730e3db7d0a0a1ee606a7de3fee5813edf42eab140616e8a2995072df" + } +}, +``` + +### DeviceInfoNode + +Contains information about the device the agent is running on. + +``` +"deviceInfo": { + "systemInfo": { + "cpuLoadAverage": 1.271484375, + "cpuUtilization": 0.06179499754781756, + "machineArch": "x86_64", + "memoryUsage": 12681670656, + "operatingSystem": "Linux", + "physicalMem": 67081129984, + "vCores": 20 + }, + "networkInfo": { + "hostname": "ggyimesi-5570-ubuntu", + "ipAddress": "10.255.0.1" + }, + "identifier": "16475557466943148337" +} +``` + +### FlowInformation + +Contains information about the flow the agent is running, including the versioned flow snapshot URI, queues, components, and processor statuses. + +``` +"flowInfo": { + "versionedFlowSnapshotURI": { + "bucketId": "default", + "flowId": "96273342-b9fe-11ef-a0ad-10f60a596f64" + }, + "queues": { + "8368e3c8-015a-1003-52ca-83af40ec1332": { + "dataSize": 40, + "dataSizeMax": 1048576, + "name": "GenerateFlowFile/success/LogAttribute", + "size": 4, + "sizeMax": 0, + "uuid": "8368e3c8-015a-1003-52ca-83af40ec1332" + } + }, + "processorStatuses": [ + { + "id": "5128e3c8-015a-1000-79ca-83af40ec1990", + "groupId": "2438e3c8-015a-1000-79ca-83af40ec1990", + "bytesRead": 0, + "bytesWritten": 0, + "flowFilesIn": 0, + "flowFilesOut": 0, + "bytesIn": 0, + "bytesOut": 0, + "invocations": 0, + "processingNanos": 0, + "activeThreadCount": -1, + "terminatedThreadCount": -1, + "running": true + }, + { + "id": "4fe2d51d-076a-49b0-88de-5cf5adf52b8f", + "groupId": "2438e3c8-015a-1000-79ca-83af40ec1990", + "bytesRead": 0, + "bytesWritten": 40, + "flowFilesIn": 0, + "flowFilesOut": 4, + "bytesIn": 0, + "bytesOut": 40, + "invocations": 4, + "processingNanos": 2119148, + "activeThreadCount": -1, + "terminatedThreadCount": -1, + "running": true + } + ], + "flowId": "96273342-b9fe-11ef-a0ad-10f60a596f64", + "running": true +} +``` + +### QueueMetrics + +Contains information about the queues in the flow, including the contained data and number of flow files. + +``` +"QueueMetrics": { + "GenerateFlowFile/success/LogAttribute": { + "datasize": "40", + "datasizemax": "1048576", + "queued": "4", + "queuedmax": "0" + } +} +``` + +### RepositoryMetrics + +Contains information about the repositories in the agent, including the number of entries, size, and whether the repository is full. + + +``` +"RepositoryMetrics": { + "repo_name": { + "entryCount": 0, + "full": false, + "maxSize": 0, + "running": true, + "size": 0 + }, + "ff": { + "entryCount": 0, + "full": false, + "maxSize": 0, + "running": true, + "size": 0 + }, + "org::apache::nifi::minifi::core::repository::VolatileContentRepository": { + "entryCount": 4, + "full": false, + "maxSize": 7864320, + "running": true, + "size": 40 + } +} +``` + +### Processor Metric Response Nodes + +Each processor can have its own metrics. These metric nodes can be configured in the minifi.properties by requesting metrics in the \Metric format, for example GetTCPMetrics to request metrics for the GetTCP processors. Besides configuring processor metrics directly, they can also be configured using regular expressions with the `processorMetrics/` prefix. For example `processorMetrics/Get.*Metrics` will match all processor metrics that start with Get. + +``` +"GetTCPMetrics": { + "2438e3c8-015a-1000-79ca-83af40ec1991": { + "AverageOnTriggerRunTime": 0, + "AverageSessionCommitRunTime": 0, + "BytesRead": 0, + "BytesWritten": 0, + "IncomingBytes": 0, + "IncomingFlowFiles": 0, + "LastOnTriggerRunTime": 0, + "LastSessionCommitRunTime": 0, + "OnTriggerInvocations": 11, + "ProcessingNanos": 729328, + "TransferredBytes": 0, + "TransferredFlowFiles": 0 + } +} +``` diff --git a/METRICS.md b/METRICS.md index 7aa53fda7f..e28f77a400 100644 --- a/METRICS.md +++ b/METRICS.md @@ -162,8 +162,8 @@ RepositoryMetrics is a system level metric that reports metrics for the register | rocksdb_table_readers_size_bytes | repository_name | RocksDB's estimated memory used for reading SST tables (only present if repository uses RocksDB) | | rocksdb_all_memory_tables_size_bytes | repository_name | RocksDB's approximate size of active and unflushed immutable memtables (only present if repository uses RocksDB) | -| Label | Description | -|--------------------------|---------------------------------------------------------------------------------------------------------------------------------------| +| Label | Description | +|--------------------------|----------------------------------------------------------------------------------------------------------------------------------------| | repository_name | Name of the reported repository. There are three repositories present with the following names: `flowfile`, `content` and `provenance` | ### DeviceInfoNode @@ -181,13 +181,22 @@ DeviceInfoNode is a system level metric that reports metrics about the system re FlowInformation is a system level metric that reports component and queue related metrics. -| Metric name | Labels | Description | -|----------------------|----------------------------------|--------------------------------------------| -| queue_data_size | connection_uuid, connection_name | Current queue data size | -| queue_data_size_max | connection_uuid, connection_name | Max queue data size to apply back pressure | -| queue_size | connection_uuid, connection_name | Current queue size | -| queue_size_max | connection_uuid, connection_name | Max queue size to apply back pressure | -| is_running | component_uuid, component_name | Check if the component is running (1 or 0) | +| Metric name | Labels | Description | +|----------------------|----------------------------------|----------------------------------------------------------------------------| +| queue_data_size | connection_uuid, connection_name | Current queue data size | +| queue_data_size_max | connection_uuid, connection_name | Max queue data size to apply back pressure | +| queue_size | connection_uuid, connection_name | Current queue size | +| queue_size_max | connection_uuid, connection_name | Max queue size to apply back pressure | +| is_running | component_uuid, component_name | Check if the component is running (1 or 0) | +| bytes_read | processor_uuid, processor_name | Number of bytes read by the processor | +| bytes_written | processor_uuid, processor_name | Number of bytes written by the processor | +| flow_files_in | processor_uuid, processor_name | Number of flow files from the incoming queue processed by the processor | +| flow_files_out | processor_uuid, processor_name | Number of flow files transferred to outgoing relationship by the processor | +| bytes_in | processor_uuid, processor_name | Sum of data from the incoming queue processed by the processor | +| bytes_out | processor_uuid, processor_name | Sum of data transferred to outgoing relationship by the processor | +| invocations | processor_uuid, processor_name | Number of times the processor was triggered | +| processing_nanos | processor_uuid, processor_name | Sum of the runtime spent in the processor in nanoseconds | + | Label | Description | |-----------------|--------------------------------------------------------------| @@ -195,6 +204,8 @@ FlowInformation is a system level metric that reports component and queue relate | connection_name | Name of the connection defined in the flow configuration | | component_uuid | UUID of the component | | component_name | Name of the component | +| processor_uuid | UUID of the processor | +| processor_name | Name of the processor | ### AgentStatus @@ -251,6 +262,11 @@ There are general metrics that are available for all processors. Besides these m | transferred_flow_files | metric_class, processor_name, processor_uuid | Number of flow files transferred to a relationship | | transferred_bytes | metric_class, processor_name, processor_uuid | Number of bytes transferred to a relationship | | transferred_to_\ | metric_class, processor_name, processor_uuid | Number of flow files transferred to a specific relationship | +| incoming_flow_files | metric_class, processor_name, processor_uuid | Number of flow files from the incoming queue processed by the processor | +| incoming_bytes | metric_class, processor_name, processor_uuid | Sum of data from the incoming queue processed by the processor | +| bytes_read | metric_class, processor_name, processor_uuid | Number of bytes read by the processor | +| bytes_written | metric_class, processor_name, processor_uuid | Number of bytes written by the processor | +| processing_nanos | metric_class, processor_name, processor_uuid | Sum of the runtime spent in the processor in nanoseconds | | Label | Description | |----------------|------------------------------------------------------------------------| diff --git a/docker/test/integration/cluster/checkers/PrometheusChecker.py b/docker/test/integration/cluster/checkers/PrometheusChecker.py index 8c12bc25fd..2c97965a8f 100644 --- a/docker/test/integration/cluster/checkers/PrometheusChecker.py +++ b/docker/test/integration/cluster/checkers/PrometheusChecker.py @@ -60,8 +60,12 @@ def verify_queue_metrics(self): def verify_general_processor_metrics(self, metric_class, processor_name): labels = {'processor_name': processor_name} return self.verify_metrics_exist(['minifi_average_onTrigger_runtime_milliseconds', 'minifi_last_onTrigger_runtime_milliseconds', - 'minifi_average_session_commit_runtime_milliseconds', 'minifi_last_session_commit_runtime_milliseconds'], metric_class, labels) and \ - self.verify_metrics_larger_than_zero(['minifi_onTrigger_invocations', 'minifi_transferred_flow_files', 'minifi_transferred_to_success', 'minifi_transferred_bytes'], metric_class, labels) + 'minifi_average_session_commit_runtime_milliseconds', 'minifi_last_session_commit_runtime_milliseconds', + 'minifi_incoming_flow_files', 'minifi_incoming_bytes', 'minifi_bytes_read', 'minifi_bytes_written', + 'minifi_processing_nanos'], metric_class, labels) and \ + self.verify_metrics_larger_than_zero(['minifi_onTrigger_invocations', 'minifi_transferred_flow_files', 'minifi_transferred_to_success', + 'minifi_transferred_bytes', 'minifi_processing_nanos'], + metric_class, labels) def verify_getfile_metrics(self, metric_class, processor_name): labels = {'processor_name': processor_name} @@ -69,7 +73,9 @@ def verify_getfile_metrics(self, metric_class, processor_name): self.verify_metrics_exist(['minifi_input_bytes', 'minifi_accepted_files'], metric_class, labels) def verify_flow_information_metrics(self): - return self.verify_metrics_exist(['minifi_queue_data_size', 'minifi_queue_data_size_max', 'minifi_queue_size', 'minifi_queue_size_max'], 'FlowInformation') and \ + return self.verify_metrics_exist(['minifi_queue_data_size', 'minifi_queue_data_size_max', 'minifi_queue_size', 'minifi_queue_size_max', + 'minifi_bytes_read', 'minifi_bytes_written', 'minifi_flow_files_in', 'minifi_flow_files_out', 'minifi_bytes_in', 'minifi_bytes_out', + 'minifi_invocations', 'minifi_processing_nanos'], 'FlowInformation') and \ self.verify_metric_exists('minifi_is_running', 'FlowInformation', {'component_name': 'FlowController'}) def verify_device_info_node_metrics(self): diff --git a/libminifi/include/core/Processor.h b/libminifi/include/core/Processor.h index 6f25c176c3..8dfc3db778 100644 --- a/libminifi/include/core/Processor.h +++ b/libminifi/include/core/Processor.h @@ -165,6 +165,14 @@ class Processor : public Connectable, public ConfigurableComponent, public state active_tasks_ = 0; } + std::string getProcessGroupUUIDStr() const { + return process_group_uuid_; + } + + void setProcessGroupUUIDStr(const std::string &uuid) { + process_group_uuid_ = uuid; + } + void yield() override; void yield(std::chrono::steady_clock::duration delta_time); @@ -256,6 +264,8 @@ class Processor : public Connectable, public ConfigurableComponent, public state // an outgoing connection allows us to reach these nodes std::unordered_map> reachable_processors_; + + std::string process_group_uuid_; }; } // namespace core diff --git a/libminifi/include/core/ProcessorMetrics.h b/libminifi/include/core/ProcessorMetrics.h index de5c3d157e..231f2efb4c 100644 --- a/libminifi/include/core/ProcessorMetrics.h +++ b/libminifi/include/core/ProcessorMetrics.h @@ -52,10 +52,16 @@ class ProcessorMetrics : public state::response::ResponseNode { std::chrono::milliseconds getAverageSessionCommitRuntime() const; std::chrono::milliseconds getLastSessionCommitRuntime() const; void addLastSessionCommitRuntime(std::chrono::milliseconds runtime); + std::optional getTransferredFlowFilesToRelationshipCount(const std::string& relationship) const; - std::atomic iterations{0}; + std::atomic invocations{0}; + std::atomic incoming_flow_files{0}; std::atomic transferred_flow_files{0}; + std::atomic incoming_bytes{0}; std::atomic transferred_bytes{0}; + std::atomic bytes_read{0}; + std::atomic bytes_written{0}; + std::atomic processing_nanos{0}; protected: template @@ -80,7 +86,7 @@ class ProcessorMetrics : public state::response::ResponseNode { [[nodiscard]] std::unordered_map getCommonLabels() const; static constexpr uint8_t STORED_ON_TRIGGER_RUNTIME_COUNT = 10; - std::mutex transferred_relationships_mutex_; + mutable std::mutex transferred_relationships_mutex_; std::unordered_map transferred_relationships_; const Processor& source_processor_; Averager on_trigger_runtime_averager_; diff --git a/libminifi/include/core/state/nodes/FlowInformation.h b/libminifi/include/core/state/nodes/FlowInformation.h index a605bd8b25..5bfc257e14 100644 --- a/libminifi/include/core/state/nodes/FlowInformation.h +++ b/libminifi/include/core/state/nodes/FlowInformation.h @@ -26,6 +26,7 @@ #include "core/state/nodes/StateMonitor.h" #include "Connection.h" #include "core/state/ConnectionStore.h" +#include "core/Processor.h" namespace org::apache::nifi::minifi::state::response { @@ -92,16 +93,22 @@ class FlowVersion : public DeviceInformation { std::shared_ptr identifier; }; -class FlowMonitor : public StateMonitorNode { +class FlowInformation : public StateMonitorNode { public: - FlowMonitor(std::string_view name, const utils::Identifier &uuid) + FlowInformation(std::string_view name, const utils::Identifier &uuid) : StateMonitorNode(name, uuid) { } - explicit FlowMonitor(std::string_view name) + explicit FlowInformation(std::string_view name) : StateMonitorNode(name) { } + MINIFIAPI static constexpr const char* Description = "Metric node that defines the flow ID and flow URL deployed to this agent"; + + std::string getName() const override { + return "flowInfo"; + } + void setFlowVersion(std::shared_ptr flow_version) { flow_version_ = std::move(flow_version); } @@ -110,32 +117,17 @@ class FlowMonitor : public StateMonitorNode { connection_store_.updateConnection(connection); } - protected: - std::shared_ptr flow_version_; - ConnectionStore connection_store_; -}; - -/** - * Justification and Purpose: Provides flow version Information - */ -class FlowInformation : public FlowMonitor { - public: - FlowInformation(std::string_view name, const utils::Identifier &uuid) - : FlowMonitor(name, uuid) { - } - - explicit FlowInformation(std::string_view name) - : FlowMonitor(name) { - } - - MINIFIAPI static constexpr const char* Description = "Metric node that defines the flow ID and flow URL deployed to this agent"; - - std::string getName() const override { - return "flowInfo"; + void setProcessors(std::vector processors) { + processors_ = std::move(processors); } std::vector serialize() override; std::vector calculateMetrics() override; + + private: + std::shared_ptr flow_version_; + ConnectionStore connection_store_; + std::vector processors_; }; } // namespace org::apache::nifi::minifi::state::response diff --git a/libminifi/include/core/state/nodes/ResponseNodeLoader.h b/libminifi/include/core/state/nodes/ResponseNodeLoader.h index 9eb55f413e..284496c0a3 100644 --- a/libminifi/include/core/state/nodes/ResponseNodeLoader.h +++ b/libminifi/include/core/state/nodes/ResponseNodeLoader.h @@ -62,7 +62,7 @@ class ResponseNodeLoader { void initializeAgentNode(const SharedResponseNode& response_node) const; void initializeAgentStatus(const SharedResponseNode& response_node) const; void initializeConfigurationChecksums(const SharedResponseNode& response_node) const; - void initializeFlowMonitor(const SharedResponseNode& response_node) const; + void initializeFlowInformation(const SharedResponseNode& response_node) const; void initializeAssetInformation(const SharedResponseNode& response_node) const; std::vector getMatchingComponentMetricsNodes(const std::string& regex_str) const; diff --git a/libminifi/include/io/StreamCallback.h b/libminifi/include/io/StreamCallback.h index 6d4e6c00a6..8467c0f966 100644 --- a/libminifi/include/io/StreamCallback.h +++ b/libminifi/include/io/StreamCallback.h @@ -18,16 +18,22 @@ #include #include +#include namespace org::apache::nifi::minifi::io { class InputStream; class OutputStream; +struct ReadWriteResult { + int64_t bytes_written = 0; + int64_t bytes_read = 0; +}; + // FlowFile IO Callback functions for input and output // throw exception for error using InputStreamCallback = std::function& input_stream)>; using OutputStreamCallback = std::function& output_stream)>; -using InputOutputStreamCallback = std::function& input_stream, const std::shared_ptr& output_stream)>; +using InputOutputStreamCallback = std::function(const std::shared_ptr& input_stream, const std::shared_ptr& output_stream)>; } // namespace org::apache::nifi::minifi::io diff --git a/libminifi/include/utils/LineByLineInputOutputStreamCallback.h b/libminifi/include/utils/LineByLineInputOutputStreamCallback.h index 0d68fe5680..b52821d8ef 100644 --- a/libminifi/include/utils/LineByLineInputOutputStreamCallback.h +++ b/libminifi/include/utils/LineByLineInputOutputStreamCallback.h @@ -33,7 +33,7 @@ class LineByLineInputOutputStreamCallback { public: using CallbackType = std::function; explicit LineByLineInputOutputStreamCallback(CallbackType callback); - int64_t operator()(const std::shared_ptr& input, const std::shared_ptr& output); + std::optional operator()(const std::shared_ptr& input, const std::shared_ptr& output); private: int64_t readInput(io::InputStream& stream); diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp index 7fdcf175d9..fcf916ceb1 100644 --- a/libminifi/src/core/ProcessGroup.cpp +++ b/libminifi/src/core/ProcessGroup.cpp @@ -86,6 +86,7 @@ std::tuple ProcessGroup::addProcessor(std::unique_ptrgetName(); std::lock_guard lock(mutex_); + processor->setProcessGroupUUIDStr(getUUIDStr()); const auto [iter, inserted] = processors_.insert(std::move(processor)); if (inserted) { logger_->log_debug("Add processor {} into process group {}", name, name_); diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp index 79c02ff63e..15941f0b77 100644 --- a/libminifi/src/core/ProcessSession.cpp +++ b/libminifi/src/core/ProcessSession.cpp @@ -268,6 +268,9 @@ void ProcessSession::write(core::FlowFile &flow, const io::OutputStreamCallback& std::string details = process_context_->getProcessorNode()->getName() + " modify flow record content " + flow.getUUIDStr(); auto duration = std::chrono::duration_cast(std::chrono::steady_clock::now() - start_time); provenance_report_->modifyContent(flow, details, duration); + if (metrics_) { + metrics_->bytes_written += stream->size(); + } } catch (const std::exception& exception) { logger_->log_debug("Caught Exception during process session write, type: {}, what: {}", typeid(exception).name(), exception.what()); throw; @@ -280,6 +283,7 @@ void ProcessSession::write(core::FlowFile &flow, const io::OutputStreamCallback& void ProcessSession::writeBuffer(const std::shared_ptr& flow_file, std::span buffer) { writeBuffer(flow_file, as_bytes(buffer)); } + void ProcessSession::writeBuffer(const std::shared_ptr& flow_file, std::span buffer) { write(flow_file, [buffer](const std::shared_ptr& output_stream) { const auto write_status = output_stream->write(buffer); @@ -316,6 +320,9 @@ void ProcessSession::append(const std::shared_ptr &flow, const i throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile content"); } flow->setSize(flow_file_size + (stream->size() - stream_size_before_callback)); + if (metrics_) { + metrics_->bytes_written += stream->size() - stream_size_before_callback; + } std::stringstream details; details << process_context_->getProcessorNode()->getName() << " modify flow record content " << flow->getUUIDStr(); @@ -373,6 +380,9 @@ int64_t ProcessSession::read(const core::FlowFile& flow_file, const io::InputStr if (ret < 0) { throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile content"); } + if (metrics_) { + metrics_->bytes_read += ret; + } return ret; } catch (const std::exception& exception) { logger_->log_debug("Caught Exception {}", exception.what()); @@ -383,7 +393,6 @@ int64_t ProcessSession::read(const core::FlowFile& flow_file, const io::InputStr } } - int64_t ProcessSession::readWrite(const std::shared_ptr &flow, const io::InputOutputStreamCallback& callback) { gsl_Expects(callback); @@ -409,19 +418,23 @@ int64_t ProcessSession::readWrite(const std::shared_ptr &flow, c throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open flowfile content for write"); } - int64_t bytes_written = callback(input_stream, output_stream); - if (bytes_written < 0) { + auto read_write_result = callback(input_stream, output_stream); + if (!read_write_result) { throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile content"); } input_stream->close(); output_stream->close(); - flow->setSize(gsl::narrow(bytes_written)); + flow->setSize(gsl::narrow(read_write_result->bytes_written)); flow->setOffset(0); flow->setResourceClaim(output_claim); + if (metrics_) { + metrics_->bytes_written += read_write_result->bytes_written; + metrics_->bytes_read += read_write_result->bytes_read; + } - return bytes_written; + return read_write_result->bytes_written; } catch (const std::exception& exception) { logger_->log_debug("Caught exception during process session readWrite, type: {}, what: {}", typeid(exception).name(), exception.what()); throw; @@ -486,6 +499,9 @@ void ProcessSession::importFrom(io::InputStream &stream, const std::shared_ptrgetOffset(), flow->getSize(), flow->getResourceClaim()->getContentFullPath(), flow->getUUIDStr()); content_stream->close(); + if (metrics_) { + metrics_->bytes_written += content_stream->size(); + } std::stringstream details; details << process_context_->getProcessorNode()->getName() << " modify flow record content " << flow->getUUIDStr(); auto duration = std::chrono::duration_cast(std::chrono::steady_clock::now() - start_time); @@ -547,6 +563,9 @@ void ProcessSession::import(const std::string& source, const std::shared_ptrgetUUIDStr()); stream->close(); + if (metrics_) { + metrics_->bytes_written += stream->size(); + } input.close(); if (!keepSource) { (void)std::remove(source.c_str()); @@ -649,6 +668,9 @@ void ProcessSession::import(const std::string& source, std::vectorlog_debug("Import offset {} length {} into content {}, FlowFile UUID {}", flowFile->getOffset(), flowFile->getSize(), flowFile->getResourceClaim()->getContentFullPath(), flowFile->getUUIDStr()); stream->close(); + if (metrics_) { + metrics_->bytes_written += stream->size(); + } std::string details = process_context_->getProcessorNode()->getName() + " modify flow record content " + flowFile->getUUIDStr(); auto duration = std::chrono::duration_cast(std::chrono::steady_clock::now() - start_time); provenance_report_->modifyContent(*flowFile, details, duration); @@ -819,7 +841,7 @@ void ProcessSession::commit() { const auto commit_start_time = std::chrono::steady_clock::now(); try { std::unordered_map transfers; - auto increaseTransferMetrics = [&](const FlowFile& record, const Relationship& relationship) { + auto increaseTransferMetrics = [&](const FlowFile& record, const Relationship& relationship) { ++transfers[relationship.getName()].transfer_count; transfers[relationship.getName()].transfer_size += record.getSize(); }; @@ -930,8 +952,11 @@ void ProcessSession::commit() { // persistent the provenance report this->provenance_report_->commit(); logger_->log_debug("ProcessSession committed for {}", process_context_->getProcessorNode()->getName()); - if (metrics_) - metrics_->addLastSessionCommitRuntime(std::chrono::duration_cast(std::chrono::steady_clock::now() - commit_start_time)); + if (metrics_) { + auto time_delta = std::chrono::steady_clock::now() - commit_start_time; + metrics_->addLastSessionCommitRuntime(std::chrono::duration_cast(time_delta)); + metrics_->processing_nanos += std::chrono::duration_cast(time_delta).count(); + } } catch (const std::exception& exception) { logger_->log_debug("Caught Exception during process session commit, type: {}, what: {}", typeid(exception).name(), exception.what()); throw; @@ -1141,6 +1166,10 @@ std::shared_ptr ProcessSession::get() { if (flow_version != nullptr) { ret->setAttribute(SpecialFlowAttribute::FLOW_ID, flow_version->getFlowId()); } + if (metrics_) { + metrics_->incoming_bytes += ret->getSize(); + ++metrics_->incoming_flow_files; + } return ret; } current = dynamic_cast(process_context_->getProcessorNode()->pickIncomingConnection()); diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp index b5cf616e23..77b8d3fa1d 100644 --- a/libminifi/src/core/Processor.cpp +++ b/libminifi/src/core/Processor.cpp @@ -198,7 +198,7 @@ void Processor::triggerAndCommit(const std::shared_ptr& context, } void Processor::trigger(const std::shared_ptr& context, const std::shared_ptr& process_session) { - ++metrics_->iterations; + ++metrics_->invocations; const auto start = std::chrono::steady_clock::now(); onTrigger(*context, *process_session); metrics_->addLastOnTriggerRuntime(std::chrono::duration_cast(std::chrono::steady_clock::now() - start)); diff --git a/libminifi/src/core/ProcessorMetrics.cpp b/libminifi/src/core/ProcessorMetrics.cpp index 8cd0548883..95e00c4872 100644 --- a/libminifi/src/core/ProcessorMetrics.cpp +++ b/libminifi/src/core/ProcessorMetrics.cpp @@ -44,13 +44,18 @@ std::vector ProcessorMetrics::serialize state::response::SerializedResponseNode root_node { .name = source_processor_.getUUIDStr(), .children = { - {.name = "OnTriggerInvocations", .value = static_cast(iterations.load())}, + {.name = "OnTriggerInvocations", .value = static_cast(invocations.load())}, {.name = "AverageOnTriggerRunTime", .value = static_cast(getAverageOnTriggerRuntime().count())}, {.name = "LastOnTriggerRunTime", .value = static_cast(getLastOnTriggerRuntime().count())}, {.name = "AverageSessionCommitRunTime", .value = static_cast(getAverageSessionCommitRuntime().count())}, {.name = "LastSessionCommitRunTime", .value = static_cast(getLastSessionCommitRuntime().count())}, {.name = "TransferredFlowFiles", .value = static_cast(transferred_flow_files.load())}, - {.name = "TransferredBytes", .value = transferred_bytes.load()} + {.name = "TransferredBytes", .value = static_cast(transferred_bytes.load())}, + {.name = "IncomingFlowFiles", .value = static_cast(incoming_flow_files.load())}, + {.name = "IncomingBytes", .value = static_cast(incoming_bytes.load())}, + {.name = "BytesRead", .value = static_cast(bytes_read.load())}, + {.name = "BytesWritten", .value = static_cast(bytes_written.load())}, + {.name = "ProcessingNanos", .value = static_cast(processing_nanos.load())} } }; @@ -59,7 +64,7 @@ std::vector ProcessorMetrics::serialize for (const auto& [relationship, count] : transferred_relationships_) { gsl_Expects(!relationship.empty()); state::response::SerializedResponseNode transferred_to_relationship_node; - transferred_to_relationship_node.name = std::string("TransferredTo").append(1, toupper(relationship[0])).append(relationship.substr(1)); + transferred_to_relationship_node.name = std::string("TransferredTo").append(1, gsl::narrow(toupper(relationship[0]))).append(relationship.substr(1)); transferred_to_relationship_node.value = static_cast(count); root_node.children.push_back(transferred_to_relationship_node); @@ -73,13 +78,18 @@ std::vector ProcessorMetrics::serialize std::vector ProcessorMetrics::calculateMetrics() { std::vector metrics = { - {"onTrigger_invocations", static_cast(iterations.load()), getCommonLabels()}, + {"onTrigger_invocations", static_cast(invocations.load()), getCommonLabels()}, {"average_onTrigger_runtime_milliseconds", static_cast(getAverageOnTriggerRuntime().count()), getCommonLabels()}, {"last_onTrigger_runtime_milliseconds", static_cast(getLastOnTriggerRuntime().count()), getCommonLabels()}, {"average_session_commit_runtime_milliseconds", static_cast(getAverageSessionCommitRuntime().count()), getCommonLabels()}, {"last_session_commit_runtime_milliseconds", static_cast(getLastSessionCommitRuntime().count()), getCommonLabels()}, {"transferred_flow_files", static_cast(transferred_flow_files.load()), getCommonLabels()}, - {"transferred_bytes", static_cast(transferred_bytes.load()), getCommonLabels()} + {"transferred_bytes", static_cast(transferred_bytes.load()), getCommonLabels()}, + {"incoming_flow_files", static_cast(incoming_flow_files.load()), getCommonLabels()}, + {"incoming_bytes", static_cast(incoming_bytes.load()), getCommonLabels()}, + {"bytes_read", static_cast(bytes_read.load()), getCommonLabels()}, + {"bytes_written", static_cast(bytes_written.load()), getCommonLabels()}, + {"processing_nanos", static_cast(processing_nanos.load()), getCommonLabels()} }; { @@ -122,6 +132,15 @@ std::chrono::milliseconds ProcessorMetrics::getLastSessionCommitRuntime() const return session_commit_runtime_averager_.getLastValue(); } +std::optional ProcessorMetrics::getTransferredFlowFilesToRelationshipCount(const std::string& relationship) const { + std::lock_guard lock(transferred_relationships_mutex_); + const auto relationship_it = transferred_relationships_.find(relationship); + if (relationship_it != std::end(transferred_relationships_)) { + return relationship_it->second; + } + return {}; +} + template requires Summable && DividableByInteger ValueType ProcessorMetrics::Averager::getAverage() const { diff --git a/libminifi/src/core/state/nodes/FlowInformation.cpp b/libminifi/src/core/state/nodes/FlowInformation.cpp index 5756072acf..b8066b2a86 100644 --- a/libminifi/src/core/state/nodes/FlowInformation.cpp +++ b/libminifi/src/core/state/nodes/FlowInformation.cpp @@ -34,6 +34,12 @@ std::vector FlowInformation::serialize() { {.name = "flowId", .value = flow_version_->getFlowId()} }; + if (nullptr != monitor_) { + monitor_->executeOnComponent("FlowController", [&serialized](StateController& component) { + serialized.push_back({.name = "running", .value = component.isRunning()}); + }); + } + SerializedResponseNode uri; uri.name = "versionedFlowSnapshotURI"; for (auto &entry : flow_version_->serialize()) { @@ -61,19 +67,35 @@ std::vector FlowInformation::serialize() { serialized.push_back(queues); } - if (nullptr != monitor_) { - SerializedResponseNode componentsNode{.name = "components", .collapsible = false}; - monitor_->executeOnAllComponents([&componentsNode](StateController& component){ - componentsNode.children.push_back({ - .name = component.getComponentName(), + if (!processors_.empty()) { + SerializedResponseNode processorsStatusesNode{.name = "processorStatuses", .array = true, .collapsible = false}; + for (const auto processor : processors_) { + if (!processor) { + continue; + } + + auto metrics = processor->getMetrics(); + processorsStatusesNode.children.push_back({ + .name = processor->getName(), .collapsible = false, .children = { - {.name = "running", .value = component.isRunning()}, - {.name = "uuid", .value = std::string{component.getComponentUUID().to_string()}} + {.name = "id", .value = std::string{processor->getUUIDStr()}}, + {.name = "groupId", .value = processor->getProcessGroupUUIDStr()}, + {.name = "bytesRead", .value = metrics->bytes_read.load()}, + {.name = "bytesWritten", .value = metrics->bytes_written.load()}, + {.name = "flowFilesIn", .value = metrics->incoming_flow_files.load()}, + {.name = "flowFilesOut", .value = metrics->transferred_flow_files.load()}, + {.name = "bytesIn", .value = metrics->incoming_bytes.load()}, + {.name = "bytesOut", .value = metrics->transferred_bytes.load()}, + {.name = "invocations", .value = metrics->invocations.load()}, + {.name = "processingNanos", .value = metrics->processing_nanos.load()}, + {.name = "activeThreadCount", .value = -1}, + {.name = "terminatedThreadCount", .value = -1}, + {.name = "running", .value = processor->isRunning()} } }); - }); - serialized.push_back(componentsNode); + } + serialized.push_back(processorsStatusesNode); } return serialized; @@ -81,13 +103,38 @@ std::vector FlowInformation::serialize() { std::vector FlowInformation::calculateMetrics() { std::vector metrics = connection_store_.calculateConnectionMetrics("FlowInformation"); - if (nullptr != monitor_) { - monitor_->executeOnAllComponents([&metrics](StateController& component){ + monitor_->executeOnComponent("FlowController", [&metrics](StateController& component) { metrics.push_back({"is_running", (component.isRunning() ? 1.0 : 0.0), {{"component_uuid", component.getComponentUUID().to_string()}, {"component_name", component.getComponentName()}, {"metric_class", "FlowInformation"}}}); }); } + + for (const auto& processor : processors_) { + if (!processor) { + continue; + } + auto processor_metrics = processor->getMetrics(); + metrics.push_back({"bytes_read", gsl::narrow(processor_metrics->bytes_read.load()), + {{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}}); + metrics.push_back({"bytes_written", gsl::narrow(processor_metrics->bytes_written.load()), + {{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}}); + metrics.push_back({"flow_files_in", gsl::narrow(processor_metrics->incoming_flow_files.load()), + {{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}}); + metrics.push_back({"flow_files_out", gsl::narrow(processor_metrics->transferred_flow_files.load()), + {{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}}); + metrics.push_back({"bytes_in", gsl::narrow(processor_metrics->incoming_bytes.load()), + {{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}}); + metrics.push_back({"bytes_out", gsl::narrow(processor_metrics->transferred_bytes.load()), + {{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}}); + metrics.push_back({"invocations", gsl::narrow(processor_metrics->invocations.load()), + {{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}}); + metrics.push_back({"processing_nanos", gsl::narrow(processor_metrics->processing_nanos.load()), + {{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}}); + metrics.push_back({"is_running", (processor->isRunning() ? 1.0 : 0.0), + {{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}}); + } + return metrics; } diff --git a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp index 68b74da48d..fa50f4e73a 100644 --- a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp +++ b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp @@ -209,9 +209,9 @@ void ResponseNodeLoader::initializeAssetInformation(const SharedResponseNode& re } } -void ResponseNodeLoader::initializeFlowMonitor(const SharedResponseNode& response_node) const { - auto flowMonitor = dynamic_cast(response_node.get()); - if (flowMonitor == nullptr) { +void ResponseNodeLoader::initializeFlowInformation(const SharedResponseNode& response_node) const { + auto flow_information = dynamic_cast(response_node.get()); + if (flow_information == nullptr) { return; } @@ -222,11 +222,17 @@ void ResponseNodeLoader::initializeFlowMonitor(const SharedResponseNode& respons } for (auto &con : connections) { - flowMonitor->updateConnection(con.second); + flow_information->updateConnection(con.second); } - flowMonitor->setStateMonitor(update_sink_); + flow_information->setStateMonitor(update_sink_); if (flow_configuration_) { - flowMonitor->setFlowVersion(flow_configuration_->getFlowVersion()); + flow_information->setFlowVersion(flow_configuration_->getFlowVersion()); + } + + if (root_) { + std::vector processors; + root_->getAllProcessors(processors); + flow_information->setProcessors(processors); } } @@ -245,7 +251,7 @@ std::vector ResponseNodeLoader::loadResponseNodes(const std: initializeAgentNode(response_node); initializeAgentStatus(response_node); initializeConfigurationChecksums(response_node); - initializeFlowMonitor(response_node); + initializeFlowInformation(response_node); initializeAssetInformation(response_node); } return response_nodes; diff --git a/libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp b/libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp index af2307e5c8..6be8b7fd3a 100644 --- a/libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp +++ b/libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp @@ -26,27 +26,37 @@ LineByLineInputOutputStreamCallback::LineByLineInputOutputStreamCallback(Callbac : callback_(std::move(callback)) { } -int64_t LineByLineInputOutputStreamCallback::operator()(const std::shared_ptr& input, const std::shared_ptr& output) { +std::optional LineByLineInputOutputStreamCallback::operator()(const std::shared_ptr& input, const std::shared_ptr& output) { gsl_Expects(input); gsl_Expects(output); + io::ReadWriteResult result; + if (int64_t status = readInput(*input); status <= 0) { - return status; + if (status < 0) { + return std::nullopt; + } + return result; } - std::size_t total_bytes_written_ = 0; + result.bytes_read = gsl::narrow(input_.size()); + + std::size_t total_bytes_written = 0; bool is_first_line = true; readLine(); do { readLine(); std::string output_line = callback_(*current_line_, is_first_line, isLastLine()); const auto bytes_written = output->write(reinterpret_cast(output_line.data()), output_line.size()); - if (io::isError(bytes_written)) { return -1; } - total_bytes_written_ += bytes_written; + if (io::isError(bytes_written)) { + return std::nullopt; + } + total_bytes_written += bytes_written; is_first_line = false; } while (!isLastLine()); - return gsl::narrow(total_bytes_written_); + result.bytes_written = gsl::narrow(total_bytes_written); + return result; } int64_t LineByLineInputOutputStreamCallback::readInput(io::InputStream& stream) { diff --git a/libminifi/test/integration/C2MetricsTest.cpp b/libminifi/test/integration/C2MetricsTest.cpp index 39a0a19c75..6398446fbd 100644 --- a/libminifi/test/integration/C2MetricsTest.cpp +++ b/libminifi/test/integration/C2MetricsTest.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include "unit/TestBase.h" #include "integration/HTTPIntegrationBase.h" @@ -92,7 +93,10 @@ class MetricsHandler: public HeartbeatHandler { VERIFY_UPDATED_METRICS }; - static constexpr const char* GETTCP1_UUID = "2438e3c8-015a-1000-79ca-83af40ec1991"; + static constexpr const char* GETTCP_UUID = "2438e3c8-015a-1000-79ca-83af40ec1991"; + static constexpr const char* LOGATTRIBUTE1_UUID = "2438e3c8-015a-1000-79ca-83af40ec1992"; + static constexpr const char* LOGATTRIBUTE2_UUID = "5128e3c8-015a-1000-79ca-83af40ec1990"; + static constexpr const char* GENERATE_FLOWFILE_UUID = "4fe2d51d-076a-49b0-88de-5cf5adf52b8f"; static void sendEmptyHeartbeatResponse(struct mg_connection* conn) { mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n"); @@ -126,28 +130,62 @@ class MetricsHandler: public HeartbeatHandler { } } - static bool verifyRuntimeMetrics(const rapidjson::Value& runtime_metrics) { + static bool processorMetricsAreValid(const auto& processor) { + return processor["bytesRead"].GetInt() >= 0 && + processor["bytesWritten"].GetInt() >= 0 && + processor["flowFilesIn"].GetInt() >= 0 && + processor["flowFilesOut"].GetInt() >= 0 && + processor["bytesIn"].GetInt() >= 0 && + processor["bytesOut"].GetInt() >= 0 && + processor["invocations"].GetInt() >= 0 && + processor["processingNanos"].GetInt() >= 0 && + processor["activeThreadCount"].GetInt() == -1 && + processor["terminatedThreadCount"].GetInt() == -1 && + processor["running"].GetBool(); + } + + static bool verifyCommonRuntimeMetricNodes(const rapidjson::Value& runtime_metrics, const std::string& queue_id) { return runtime_metrics.HasMember("deviceInfo") && runtime_metrics.HasMember("flowInfo") && + runtime_metrics["flowInfo"].HasMember("flowId") && + runtime_metrics["flowInfo"].HasMember("running") && runtime_metrics["flowInfo"].HasMember("versionedFlowSnapshotURI") && runtime_metrics["flowInfo"].HasMember("queues") && - runtime_metrics["flowInfo"].HasMember("components") && - runtime_metrics["flowInfo"]["queues"].HasMember("2438e3c8-015a-1000-79ca-83af40ec1997") && - runtime_metrics["flowInfo"]["components"].HasMember("FlowController") && - runtime_metrics["flowInfo"]["components"].HasMember("GetTCP") && - runtime_metrics["flowInfo"]["components"].HasMember("LogAttribute"); + runtime_metrics["flowInfo"]["queues"].HasMember(queue_id) && + runtime_metrics["flowInfo"].HasMember("processorStatuses"); + } + + static bool verifyRuntimeMetrics(const rapidjson::Value& runtime_metrics) { + return verifyCommonRuntimeMetricNodes(runtime_metrics, "2438e3c8-015a-1000-79ca-83af40ec1997") && + [&]() { + const auto processor_statuses = runtime_metrics["flowInfo"]["processorStatuses"].GetArray(); + if (processor_statuses.Size() != 2) { + return false; + } + return std::all_of(processor_statuses.begin(), processor_statuses.end(), [&](const auto& processor) { + if (processor["id"].GetString() != std::string(GETTCP_UUID) && processor["id"].GetString() != std::string(LOGATTRIBUTE1_UUID)) { + throw std::runtime_error(std::string("Unexpected processor id in processorStatuses: ") + processor["id"].GetString()); + } + return processorMetricsAreValid(processor); + }); + }(); } static bool verifyUpdatedRuntimeMetrics(const rapidjson::Value& runtime_metrics) { - return runtime_metrics.HasMember("deviceInfo") && - runtime_metrics.HasMember("flowInfo") && - runtime_metrics["flowInfo"].HasMember("versionedFlowSnapshotURI") && - runtime_metrics["flowInfo"].HasMember("queues") && - runtime_metrics["flowInfo"].HasMember("components") && - runtime_metrics["flowInfo"]["queues"].HasMember("8368e3c8-015a-1003-52ca-83af40ec1332") && - runtime_metrics["flowInfo"]["components"].HasMember("FlowController") && - runtime_metrics["flowInfo"]["components"].HasMember("GenerateFlowFile") && - runtime_metrics["flowInfo"]["components"].HasMember("LogAttribute"); + return verifyCommonRuntimeMetricNodes(runtime_metrics, "8368e3c8-015a-1003-52ca-83af40ec1332") && + runtime_metrics["flowInfo"].HasMember("processorStatuses") && + [&]() { + const auto processor_statuses = runtime_metrics["flowInfo"]["processorStatuses"].GetArray(); + if (processor_statuses.Size() != 2) { + return false; + } + return std::all_of(processor_statuses.begin(), processor_statuses.end(), [&](const auto& processor) { + if (processor["id"].GetString() != std::string(GENERATE_FLOWFILE_UUID) && processor["id"].GetString() != std::string(LOGATTRIBUTE2_UUID)) { + throw std::runtime_error(std::string("Unexpected processor id in processorStatuses: ") + processor["id"].GetString()); + } + return processorMetricsAreValid(processor); + }); + }(); } static bool verifyLoadMetrics(const rapidjson::Value& load_metrics) { @@ -169,13 +207,18 @@ class MetricsHandler: public HeartbeatHandler { static bool verifyProcessorMetrics(const rapidjson::Value& processor_metrics) { return processor_metrics.HasMember("GetTCPMetrics") && - processor_metrics["GetTCPMetrics"].HasMember(GETTCP1_UUID) && - processor_metrics["GetTCPMetrics"][GETTCP1_UUID].HasMember("OnTriggerInvocations") && - processor_metrics["GetTCPMetrics"][GETTCP1_UUID]["OnTriggerInvocations"].GetUint() > 0 && - processor_metrics["GetTCPMetrics"][GETTCP1_UUID].HasMember("TransferredFlowFiles") && - processor_metrics["GetTCPMetrics"][GETTCP1_UUID].HasMember("AverageOnTriggerRunTime") && - processor_metrics["GetTCPMetrics"][GETTCP1_UUID].HasMember("LastOnTriggerRunTime") && - processor_metrics["GetTCPMetrics"][GETTCP1_UUID].HasMember("TransferredBytes"); + processor_metrics["GetTCPMetrics"].HasMember(GETTCP_UUID) && + processor_metrics["GetTCPMetrics"][GETTCP_UUID].HasMember("OnTriggerInvocations") && + processor_metrics["GetTCPMetrics"][GETTCP_UUID]["OnTriggerInvocations"].GetUint() > 0 && + processor_metrics["GetTCPMetrics"][GETTCP_UUID].HasMember("TransferredFlowFiles") && + processor_metrics["GetTCPMetrics"][GETTCP_UUID].HasMember("AverageOnTriggerRunTime") && + processor_metrics["GetTCPMetrics"][GETTCP_UUID].HasMember("LastOnTriggerRunTime") && + processor_metrics["GetTCPMetrics"][GETTCP_UUID].HasMember("TransferredBytes") && + processor_metrics["GetTCPMetrics"][GETTCP_UUID].HasMember("IncomingFlowFiles") && + processor_metrics["GetTCPMetrics"][GETTCP_UUID].HasMember("IncomingBytes") && + processor_metrics["GetTCPMetrics"][GETTCP_UUID].HasMember("BytesRead") && + processor_metrics["GetTCPMetrics"][GETTCP_UUID].HasMember("BytesWritten") && + processor_metrics["GetTCPMetrics"][GETTCP_UUID].HasMember("ProcessingNanos"); } std::atomic_bool& metrics_updated_successfully_; diff --git a/libminifi/test/libtest/unit/SingleProcessorTestController.h b/libminifi/test/libtest/unit/SingleProcessorTestController.h index 0245e3314c..5928c6bc51 100644 --- a/libminifi/test/libtest/unit/SingleProcessorTestController.h +++ b/libminifi/test/libtest/unit/SingleProcessorTestController.h @@ -39,7 +39,7 @@ class SingleProcessorTestController : public TestController { public: explicit SingleProcessorTestController(std::unique_ptr processor) { auto name = processor->getName(); - processor_ = plan->addProcessor(std::move(processor), name); + processor_ = plan->addProcessor(std::move(processor), name, {}); input_ = plan->addConnection(nullptr, core::Relationship{"success", "success"}, processor_); outgoing_connections_ = [this] { std::unordered_map result; diff --git a/libminifi/test/libtest/unit/TestBase.cpp b/libminifi/test/libtest/unit/TestBase.cpp index ee337986e1..19f5de7980 100644 --- a/libminifi/test/libtest/unit/TestBase.cpp +++ b/libminifi/test/libtest/unit/TestBase.cpp @@ -539,11 +539,13 @@ bool TestPlan::runProcessor(size_t target_location, const PreTriggerVerifier& ve if (verify) { auto current_session = std::make_shared(context); + current_session->setMetrics(processor->getMetrics()); process_sessions_.push_back(current_session); verify(context, current_session); current_session->commit(); } else { auto session_factory = std::make_shared(context, [&] (auto current_session) { + current_session->setMetrics(processor->getMetrics()); process_sessions_.push_back(current_session); }); logger_->log_info("Running {}", processor->getName()); diff --git a/libminifi/test/unit/MetricsTests.cpp b/libminifi/test/unit/MetricsTests.cpp index 16e732c185..a8c4ad5ee7 100644 --- a/libminifi/test/unit/MetricsTests.cpp +++ b/libminifi/test/unit/MetricsTests.cpp @@ -27,6 +27,7 @@ #include "unit/ProvenanceTestHelper.h" #include "unit/DummyProcessor.h" #include "range/v3/algorithm/find_if.hpp" +#include "unit/SingleProcessorTestController.h" using namespace std::literals::chrono_literals; @@ -285,4 +286,76 @@ TEST_CASE("Test commit runtime processor metrics", "[ProcessorMetrics]") { REQUIRE(metrics.getAverageSessionCommitRuntime() == 37ms); } +class DuplicateContentProcessor : public minifi::core::Processor { + using minifi::core::Processor::Processor; + + public: + DuplicateContentProcessor(std::string_view name, const minifi::utils::Identifier& uuid) : Processor(name, uuid) {} + explicit DuplicateContentProcessor(std::string_view name) : Processor(name) {} + static constexpr const char* Description = "A processor that creates two more of the same flow file."; + static constexpr auto Properties = std::array{}; + static constexpr auto Success = core::RelationshipDefinition{"success", "Newly created FlowFiles"}; + static constexpr auto Original = core::RelationshipDefinition{"original", "Original FlowFile"}; + static constexpr auto Relationships = std::array{Success, Original}; + static constexpr bool SupportsDynamicProperties = false; + static constexpr bool SupportsDynamicRelationships = false; + static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED; + static constexpr bool IsSingleThreaded = false; + void initialize() override { + setSupportedRelationships(Relationships); + } + void onTrigger(core::ProcessContext& /*context*/, core::ProcessSession& session) override { + auto flow_file = session.get(); + if (!flow_file) { + return; + } + + auto flow_file_copy = session.create(); + std::vector buffer; + session.read(flow_file, [&](const std::shared_ptr& stream) -> int64_t { + buffer.resize(stream->size()); + return gsl::narrow(stream->read(buffer)); + }); + session.write(flow_file_copy, [&](const std::shared_ptr& stream) -> int64_t { + return gsl::narrow(stream->write(buffer)); + }); + session.append(flow_file_copy, [&](const std::shared_ptr& stream) -> int64_t { + return gsl::narrow(stream->write(buffer)); + }); + session.transfer(flow_file_copy, Success); + session.transfer(flow_file, Original); + } + ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS +}; + +TEST_CASE("Test processor metrics change after trigger", "[ProcessorMetrics]") { + minifi::test::SingleProcessorTestController test_controller(std::make_unique("DuplicateContentProcessor")); + test_controller.trigger({minifi::test::InputFlowFileData{"log line 1", {}}}); + auto metrics = test_controller.getProcessor()->getMetrics(); + CHECK(metrics->invocations == 1); + CHECK(metrics->incoming_flow_files == 1); + CHECK(metrics->transferred_flow_files == 2); + CHECK(metrics->getTransferredFlowFilesToRelationshipCount("success") == 1); + CHECK(metrics->getTransferredFlowFilesToRelationshipCount("original") == 1); + CHECK(metrics->incoming_bytes == 10); + CHECK(metrics->transferred_bytes == 30); + CHECK(metrics->bytes_read == 10); + CHECK(metrics->bytes_written == 20); + auto old_nanos = metrics->processing_nanos.load(); + CHECK(metrics->processing_nanos > 0); + + test_controller.trigger({minifi::test::InputFlowFileData{"new log line 2", {}}}); + CHECK(metrics->invocations == 2); + CHECK(metrics->incoming_flow_files == 2); + CHECK(metrics->transferred_flow_files == 4); + CHECK(metrics->getTransferredFlowFilesToRelationshipCount("success") == 2); + CHECK(metrics->getTransferredFlowFilesToRelationshipCount("original") == 2); + CHECK(metrics->incoming_bytes == 24); + CHECK(metrics->transferred_bytes == 72); + CHECK(metrics->bytes_read == 24); + CHECK(metrics->bytes_written == 48); + CHECK(metrics->processing_nanos > old_nanos); +} + + } // namespace org::apache::nifi::minifi::test