diff --git a/controller/Controller.cpp b/controller/Controller.cpp index 627d2558d9..411c58ac22 100644 --- a/controller/Controller.cpp +++ b/controller/Controller.cpp @@ -17,6 +17,7 @@ #include "Controller.h" #include +#include #include "io/BufferStream.h" #include "c2/C2Payload.h" @@ -26,6 +27,7 @@ #include "asio/connect.hpp" #include "core/logging/Logger.h" #include "utils/net/AsioSocketUtils.h" +#include "utils/file/FileUtils.h" namespace org::apache::nifi::minifi::controller { @@ -225,4 +227,43 @@ bool getJstacks(const utils::net::SocketData& socket_data, std::ostream &out) { return true; } +nonstd::expected getDebugBundle(const utils::net::SocketData& socket_data, const std::filesystem::path& target_dir) { + std::unique_ptr connection_stream = std::make_unique(socket_data); + if (connection_stream->initialize() < 0) { + return nonstd::make_unexpected("Could not connect to remote host " + socket_data.host + ":" + std::to_string(socket_data.port)); + } + io::BufferStream buffer; + auto op = static_cast(c2::Operation::transfer); + buffer.write(&op, 1); + buffer.write("debug"); + if (io::isError(connection_stream->write(buffer.getBuffer()))) { + return nonstd::make_unexpected("Could not write to connection " + socket_data.host + ":" + std::to_string(socket_data.port)); + } + connection_stream->read(op); + size_t bundle_size = 0; + connection_stream->read(bundle_size); + if (bundle_size == 0) { + return nonstd::make_unexpected("Failed to retrieve debug bundle"); + } + + if (std::filesystem::exists(target_dir) && !std::filesystem::is_directory(target_dir)) { + return nonstd::make_unexpected("Object specified as the target directory already exists and it is not a directory"); + } + + if (!std::filesystem::exists(target_dir) && utils::file::create_dir(target_dir) != 0) { + return nonstd::make_unexpected("Failed to create target directory: " + target_dir.string()); + } + + std::ofstream out_file(target_dir / "debug.tar.gz"); + const size_t BUFFER_SIZE = 4096; + std::array out_buffer{}; + while (bundle_size > 0) { + const auto next_read_size = (std::min)(bundle_size, BUFFER_SIZE); + const auto size_read = connection_stream->read(std::as_writable_bytes(std::span(out_buffer).subspan(0, next_read_size))); + bundle_size -= size_read; + out_file.write(out_buffer.data(), gsl::narrow(size_read)); + } + return {}; +} + } // namespace org::apache::nifi::minifi::controller diff --git a/controller/Controller.h b/controller/Controller.h index dfc1661b6b..aeb9e64c76 100644 --- a/controller/Controller.h +++ b/controller/Controller.h @@ -21,6 +21,7 @@ #include #include "utils/net/AsioSocketUtils.h" +#include "utils/expected.h" namespace org::apache::nifi::minifi::controller { @@ -35,5 +36,6 @@ bool listComponents(const utils::net::SocketData& socket_data, std::ostream &out bool listConnections(const utils::net::SocketData& socket_data, std::ostream &out, bool show_header = true); bool printManifest(const utils::net::SocketData& socket_data, std::ostream &out); bool getJstacks(const utils::net::SocketData& socket_data, std::ostream &out); +nonstd::expected getDebugBundle(const utils::net::SocketData& socket_data, const std::filesystem::path& target_dir); } // namespace org::apache::nifi::minifi::controller diff --git a/controller/MiNiFiController.cpp b/controller/MiNiFiController.cpp index 3975c41f16..f574c807cb 100644 --- a/controller/MiNiFiController.cpp +++ b/controller/MiNiFiController.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include "MainHelper.h" #include "properties/Configure.h" @@ -158,8 +159,16 @@ int main(int argc, char **argv) { addFlagOption("--manifest", "Generates a manifest for the current binary"); addFlagOption("--noheaders", "Removes headers from output streams"); + argument_parser.add_argument("-d", "--debug").metavar("BUNDLE_OUT_DIR") + .help("Get debug bundle"); + bool show_headers = true; + if (argc <= 1) { + std::cerr << argument_parser; + std::exit(1); + } + try { argument_parser.parse_args(argc, argv); } catch (const std::runtime_error& err) { @@ -254,11 +263,20 @@ int main(int argc, char **argv) { if (!minifi::controller::getJstacks(socket_data, std::cout)) std::cout << "Could not connect to remote host " << socket_data.host << ":" << socket_data.port << std::endl; } + + if (const auto& debug_path = argument_parser.present("--debug")) { + auto debug_res = minifi::controller::getDebugBundle(socket_data, std::filesystem::path(*debug_path)); + if (!debug_res) + std::cout << debug_res.error() << std::endl; + else + std::cout << "Debug bundle written to " << std::filesystem::path(*debug_path) / "debug.tar.gz"; + } } catch (const std::exception &exc) { // catch anything thrown within try block that derives from std::exception std::cerr << exc.what() << std::endl; + std::exit(1); } catch (...) { - std::cerr << argument_parser; + std::cerr << "Caught unknown exception" << std::endl; std::exit(1); } return 0; diff --git a/controller/tests/ControllerTests.cpp b/controller/tests/ControllerTests.cpp index fdda4fbc70..a4907f225c 100644 --- a/controller/tests/ControllerTests.cpp +++ b/controller/tests/ControllerTests.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include "range/v3/algorithm/find.hpp" #include "TestBase.h" @@ -518,4 +519,58 @@ TEST_CASE_METHOD(ControllerTestFixture, "Test jstack getter", "[controllerTests] REQUIRE(jstack_stream.str() == expected_trace); } +TEST_CASE_METHOD(ControllerTestFixture, "Test debug bundle getter", "[controllerTests]") { + SECTION("With SSL from service provider") { + setConnectionType(ControllerTestFixture::ConnectionType::SSL_FROM_SERVICE_PROVIDER); + } + + SECTION("With SSL from properties") { + setConnectionType(ControllerTestFixture::ConnectionType::SSL_FROM_CONFIGURATION); + } + + SECTION("Without SSL") { + setConnectionType(ControllerTestFixture::ConnectionType::UNSECURE); + } + + auto reporter = std::make_shared("ControllerSocketMetricsPublisher"); + auto response_node_loader = std::make_shared(configuration_, std::vector>{}, nullptr); + reporter->initialize(configuration_, response_node_loader); + initalizeControllerSocket(reporter); + + TestController test_controller; + auto output_dir = test_controller.createTempDirectory(); + REQUIRE(minifi::controller::getDebugBundle(controller_socket_data_, output_dir)); + REQUIRE(std::filesystem::exists(output_dir / "debug.tar.gz")); +} + +TEST_CASE_METHOD(ControllerTestFixture, "Test debug bundle is created to non-existent folder", "[controllerTests]") { + setConnectionType(ControllerTestFixture::ConnectionType::UNSECURE); + + auto reporter = std::make_shared("ControllerSocketMetricsPublisher"); + auto response_node_loader = std::make_shared(configuration_, std::vector>{}, nullptr); + reporter->initialize(configuration_, response_node_loader); + initalizeControllerSocket(reporter); + + TestController test_controller; + auto output_dir = test_controller.createTempDirectory() / "subfolder"; + REQUIRE(minifi::controller::getDebugBundle(controller_socket_data_, output_dir)); + REQUIRE(std::filesystem::exists(output_dir / "debug.tar.gz")); +} + +TEST_CASE_METHOD(ControllerTestFixture, "Debug bundle retrieval fails if target path is an existing file", "[controllerTests]") { + setConnectionType(ControllerTestFixture::ConnectionType::UNSECURE); + + auto reporter = std::make_shared("ControllerSocketMetricsPublisher"); + auto response_node_loader = std::make_shared(configuration_, std::vector>{}, nullptr); + reporter->initialize(configuration_, response_node_loader); + initalizeControllerSocket(reporter); + + TestController test_controller; + auto invalid_path = test_controller.createTempDirectory() / "test.log"; + std::ofstream file(invalid_path); + auto result = minifi::controller::getDebugBundle(controller_socket_data_, invalid_path); + REQUIRE(!result); + REQUIRE(result.error() == "Object specified as the target directory already exists and it is not a directory"); +} + } // namespace org::apache::nifi::minifi::test diff --git a/docker/test/integration/cluster/DockerCommunicator.py b/docker/test/integration/cluster/DockerCommunicator.py index 71acdd367e..7d448cf34b 100644 --- a/docker/test/integration/cluster/DockerCommunicator.py +++ b/docker/test/integration/cluster/DockerCommunicator.py @@ -19,6 +19,7 @@ import tarfile import os import io +import uuid class DockerCommunicator: @@ -66,3 +67,19 @@ def write_content_to_container(self, content, container_name, dst_path): tar.addfile(info, io.BytesIO(content.encode('utf-8'))) with open(os.path.join(td, 'content.tar'), 'rb') as data: return self.__put_archive(container_name, os.path.dirname(dst_path), data.read()) + + def copy_file_from_container(self, container_name, src_path_in_container, dest_dir_on_host) -> bool: + try: + container = self.client.containers.get(container_name) + (bits, _) = container.get_archive(src_path_in_container) + tmp_tar_path = os.path.join(dest_dir_on_host, "retrieved_file_" + str(uuid.uuid4()) + ".tar") + with open(tmp_tar_path, 'wb') as out_file: + for chunk in bits: + out_file.write(chunk) + with tarfile.open(tmp_tar_path, 'r') as tar: + tar.extractall(dest_dir_on_host) + os.remove(tmp_tar_path) + return True + except Exception as ex: + logging.error('Exception occurred while copying file from container: %s', str(ex)) + return False diff --git a/docker/test/integration/cluster/DockerTestCluster.py b/docker/test/integration/cluster/DockerTestCluster.py index e7146ed06f..e850cc4bc1 100644 --- a/docker/test/integration/cluster/DockerTestCluster.py +++ b/docker/test/integration/cluster/DockerTestCluster.py @@ -15,6 +15,11 @@ import logging import time import re +import tarfile +import tempfile +import os +import gzip +import shutil from .LogSource import LogSource from .ContainerStore import ContainerStore @@ -340,7 +345,41 @@ def check_connections_full_through_controller(self, connection_count: int, conta def check_connection_size_through_controller(self, connection: str, size: int, max_size: int, container_name: str) -> bool: return self.minifi_controller_executor.get_connection_size(connection, container_name) == (size, max_size) - @retry_check(10, 1000) + @retry_check(10, 1) def manifest_can_be_retrieved_through_minifi_controller(self, container_name: str) -> bool: manifest = self.minifi_controller_executor.get_manifest(container_name) return '"agentManifest": {' in manifest and '"componentManifest": {' in manifest and '"agentType": "cpp"' in manifest + + @retry_check(10, 1) + def debug_bundle_can_be_retrieved_through_minifi_controller(self, container_name: str) -> bool: + with tempfile.TemporaryDirectory() as td: + result = self.minifi_controller_executor.get_debug_bundle(container_name, td) + if not result: + logging.error("Failed to get debug bundle") + return False + + with tarfile.open(os.path.join(td, "debug.tar.gz")) as file: + file.extractall(td) + + if not os.path.exists(os.path.join(td, "config.yml")): + logging.error("config.yml file was not found in debug bundle") + return False + + if not os.path.exists(os.path.join(td, "minifi.properties")): + logging.error("minifi.properties file was not found in debug bundle") + return False + + if not os.path.exists(os.path.join(td, "minifi.log.gz")): + logging.error("minifi.log.gz file was not found in debug bundle") + return False + + with gzip.open(os.path.join(td, "minifi.log.gz"), 'rb') as f_in: + with open(os.path.join(td, "minifi.log"), 'wb') as f_out: + shutil.copyfileobj(f_in, f_out) + + with open(os.path.join(td, "minifi.log")) as f: + if 'MiNiFi started' not in f.read(): + logging.error("'MiNiFi started' log entry was not found in minifi.log file") + return False + + return True diff --git a/docker/test/integration/cluster/MinifiControllerExecutor.py b/docker/test/integration/cluster/MinifiControllerExecutor.py index d7290df89a..8da19835ce 100644 --- a/docker/test/integration/cluster/MinifiControllerExecutor.py +++ b/docker/test/integration/cluster/MinifiControllerExecutor.py @@ -12,6 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging from .DockerCommunicator import DockerCommunicator @@ -66,3 +67,11 @@ def get_manifest(self, container_name: str) -> str: if not line.startswith('['): manifest += line return manifest + + def get_debug_bundle(self, container_name: str, dest: str) -> bool: + (code, _) = self.container_communicator.execute_command(container_name, ["/opt/minifi/minifi-current/bin/minificontroller", "--debug", "/opt/minifi/minifi-current/"]) + if code != 0: + logging.error("Minifi controller debug command failed with code: %d", code) + return False + + return self.container_communicator.copy_file_from_container(container_name, "/opt/minifi/minifi-current/debug.tar.gz", dest) diff --git a/docker/test/integration/features/MiNiFi_integration_test_driver.py b/docker/test/integration/features/MiNiFi_integration_test_driver.py index 5dc82e64c1..10c4eac3d7 100644 --- a/docker/test/integration/features/MiNiFi_integration_test_driver.py +++ b/docker/test/integration/features/MiNiFi_integration_test_driver.py @@ -448,3 +448,6 @@ def manifest_can_be_retrieved_through_minifi_controller(self, container_name: st def enable_log_metrics_publisher_in_minifi(self): self.cluster.enable_log_metrics_publisher_in_minifi() + + def debug_bundle_can_be_retrieved_through_minifi_controller(self, container_name: str): + assert self.cluster.debug_bundle_can_be_retrieved_through_minifi_controller(container_name) or self.cluster.log_app_output() diff --git a/docker/test/integration/features/minifi_controller.feature b/docker/test/integration/features/minifi_controller.feature index 486cb1370d..c78f91ca45 100644 --- a/docker/test/integration/features/minifi_controller.feature +++ b/docker/test/integration/features/minifi_controller.feature @@ -74,3 +74,10 @@ Feature: MiNiFi Controller functionalities And controller socket properties are set up When all instances start up Then manifest can be retrieved through MiNiFi controller + + Scenario: Debug bundle can be retrieved + Given a GenerateFlowFile processor + And a file with the content "test" is present in "/tmp/input" + And controller socket properties are set up + When all instances start up + Then debug bundle can be retrieved through MiNiFi controller diff --git a/docker/test/integration/features/steps/steps.py b/docker/test/integration/features/steps/steps.py index 72bad48c8b..dba4c75f79 100644 --- a/docker/test/integration/features/steps/steps.py +++ b/docker/test/integration/features/steps/steps.py @@ -1204,3 +1204,13 @@ def step_impl(context, minifi_container_name: str): @then(u'manifest can be retrieved through MiNiFi controller') def step_impl(context): context.execute_steps(f"then manifest can be retrieved through MiNiFi controller in the \"minifi-cpp-flow-{context.feature_id}\" flow") + + +@then(u'debug bundle can be retrieved through MiNiFi controller in the \"{minifi_container_name}\" flow') +def step_impl(context, minifi_container_name: str): + context.test.debug_bundle_can_be_retrieved_through_minifi_controller(minifi_container_name) + + +@then(u'debug bundle can be retrieved through MiNiFi controller') +def step_impl(context): + context.execute_steps(f"then debug bundle can be retrieved through MiNiFi controller in the \"minifi-cpp-flow-{context.feature_id}\" flow") diff --git a/libminifi/include/c2/C2Utils.h b/libminifi/include/c2/C2Utils.h index ae879c0439..f49b83b6c3 100644 --- a/libminifi/include/c2/C2Utils.h +++ b/libminifi/include/c2/C2Utils.h @@ -20,9 +20,13 @@ #include #include +#include #include "properties/Configure.h" #include "utils/StringUtils.h" +#include "io/ArchiveStream.h" +#include "utils/expected.h" +#include "io/BufferStream.h" namespace org::apache::nifi::minifi::c2 { @@ -32,5 +36,6 @@ inline constexpr const char* CONTROLLER_SOCKET_METRICS_PUBLISHER = "ControllerSo bool isC2Enabled(const std::shared_ptr& configuration); bool isControllerSocketEnabled(const std::shared_ptr& configuration); +nonstd::expected, std::string> createDebugBundleArchive(const std::map>& files); } // namespace org::apache::nifi::minifi::c2 diff --git a/libminifi/include/c2/ControllerSocketProtocol.h b/libminifi/include/c2/ControllerSocketProtocol.h index f6045b4e26..93c279a2ba 100644 --- a/libminifi/include/c2/ControllerSocketProtocol.h +++ b/libminifi/include/c2/ControllerSocketProtocol.h @@ -51,12 +51,14 @@ class ControllerSocketProtocol { void handleStop(io::BaseStream &stream); void handleClear(io::BaseStream &stream); void handleUpdate(io::BaseStream &stream); + void handleTransfer(io::BaseStream &stream); void writeQueueSizesResponse(io::BaseStream &stream); void writeComponentsResponse(io::BaseStream &stream); void writeConnectionsResponse(io::BaseStream &stream); void writeGetFullResponse(io::BaseStream &stream); void writeManifestResponse(io::BaseStream &stream); void writeJstackResponse(io::BaseStream &stream); + void writeDebugBundleResponse(io::BaseStream &stream); void handleDescribe(io::BaseStream &stream); asio::awaitable handleCommand(std::unique_ptr stream); asio::awaitable handshakeAndHandleCommand(asio::ip::tcp::socket&& socket, std::shared_ptr ssl_context_service); diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp index eed786167c..3bd7bc0e5c 100644 --- a/libminifi/src/c2/C2Agent.cpp +++ b/libminifi/src/c2/C2Agent.cpp @@ -44,6 +44,7 @@ #include "io/ArchiveStream.h" #include "io/StreamPipe.h" #include "utils/Id.h" +#include "c2/C2Utils.h" using namespace std::literals::chrono_literals; @@ -356,7 +357,7 @@ void C2Agent::handle_c2_server_response(const C2ContentResponse &resp) { case Operation::start: case Operation::stop: { if (resp.name == "C2" || resp.name == "c2") { - raise(SIGTERM); + (void)raise(SIGTERM); } // stop all referenced components. @@ -431,7 +432,7 @@ C2Payload C2Agent::prepareConfigurationOptions(const C2ContentResponse &resp) co } void C2Agent::handle_clear(const C2ContentResponse &resp) { - ClearOperand operand; + ClearOperand operand = ClearOperand::connection; try { operand = utils::enumCast(resp.name, true); } catch(const std::runtime_error&) { @@ -486,7 +487,7 @@ void C2Agent::handle_clear(const C2ContentResponse &resp) { * to be put into the acknowledgement */ void C2Agent::handle_describe(const C2ContentResponse &resp) { - DescribeOperand operand; + DescribeOperand operand = DescribeOperand::metrics; try { operand = utils::enumCast(resp.name, true); } catch(const std::runtime_error&) { @@ -588,7 +589,7 @@ void C2Agent::handle_describe(const C2ContentResponse &resp) { } void C2Agent::handle_update(const C2ContentResponse &resp) { - UpdateOperand operand; + UpdateOperand operand = UpdateOperand::configuration; try { operand = utils::enumCast(resp.name, true); } catch(const std::runtime_error&) { @@ -665,56 +666,34 @@ C2Agent::UpdateResult C2Agent::update_property(const std::string &property_name, } C2Payload C2Agent::bundleDebugInfo(std::map>& files) { - C2Payload payload(Operation::transfer, false); - auto stream_provider = core::ClassLoader::getDefaultClassLoader().instantiate( - "ArchiveStreamProvider", "ArchiveStreamProvider"); - if (!stream_provider) { - throw C2DebugBundleError("Couldn't instantiate archiver provider"); - } - auto bundle = std::make_shared(); - auto archiver = stream_provider->createWriteStream(9, "gzip", bundle, logger_); - if (!archiver) { - throw C2DebugBundleError("Couldn't instantiate archiver"); - } - for (auto& [filename, stream] : files) { - size_t file_size = stream->size(); - if (!archiver->newEntry({filename, file_size})) { - throw C2DebugBundleError("Couldn't initialize archive entry for '" + filename + "'"); - } - if (gsl::narrow(file_size) != internal::pipe(*stream, *archiver)) { - // we have touched the input streams, they cannot be reused - throw C2DebugBundleError("Error while writing file '" + filename + "' into the debug bundle"); - } - } + static constexpr const char* MANIFEST_FILE_NAME = "manifest.json"; + auto manifest_stream = std::make_unique(); if (auto node_reporter = node_reporter_.lock()) { - static constexpr const char* MANIFEST_FILE_NAME = "manifest.json"; auto reported_manifest = node_reporter->getAgentManifest(); std::string manifest_str = state::response::SerializedResponseNode{ .name = std::move(reported_manifest.name), .array = reported_manifest.is_array, .children = std::move(reported_manifest.serialized_nodes) }.to_pretty_string(); - if (!archiver->newEntry({MANIFEST_FILE_NAME, manifest_str.size()})) { - throw C2DebugBundleError(fmt::format("Couldn't initialize archive entry for '{}'", MANIFEST_FILE_NAME)); - } - io::BufferStream manifest_stream; - manifest_stream.write(as_bytes(std::span(manifest_str))); - if (gsl::narrow(manifest_stream.size()) != internal::pipe(manifest_stream, *archiver)) { - throw C2DebugBundleError(fmt::format("Error while writing file '{}'", MANIFEST_FILE_NAME)); - } + manifest_stream->write(as_bytes(std::span(manifest_str))); } - if (!archiver->finish()) { - throw C2DebugBundleError("Failed to complete debug bundle archive"); + files[MANIFEST_FILE_NAME] = std::move(manifest_stream); + + auto bundle = createDebugBundleArchive(files); + if (!bundle) { + throw C2DebugBundleError(bundle.error()); } + C2Payload file(Operation::transfer, true); file.setLabel("debug.tar.gz"); - file.setRawData(bundle->moveBuffer()); + file.setRawData(bundle.value()->moveBuffer()); + C2Payload payload(Operation::transfer, false); payload.addPayload(std::move(file)); return payload; } void C2Agent::handle_transfer(const C2ContentResponse &resp) { - TransferOperand operand; + TransferOperand operand = TransferOperand::debug; try { operand = utils::enumCast(resp.name, true); } catch(const std::runtime_error&) { @@ -1030,7 +1009,7 @@ void C2Agent::handleAssetUpdate(const C2ContentResponse& resp) { { std::ofstream file{file_path, std::ofstream::binary}; - file.write(reinterpret_cast(raw_data.data()), raw_data.size()); + file.write(reinterpret_cast(raw_data.data()), gsl::narrow(raw_data.size())); } C2Payload response(Operation::acknowledge, state::UpdateState::FULLY_APPLIED, resp.ident, true); diff --git a/libminifi/src/c2/C2Utils.cpp b/libminifi/src/c2/C2Utils.cpp index 24b71fc8ff..ae22fb07ae 100644 --- a/libminifi/src/c2/C2Utils.cpp +++ b/libminifi/src/c2/C2Utils.cpp @@ -17,6 +17,9 @@ */ #include "c2/C2Utils.h" +#include "core/ClassLoader.h" +#include "io/StreamPipe.h" + namespace org::apache::nifi::minifi::c2 { bool isC2Enabled(const std::shared_ptr& configuration) { @@ -31,4 +34,31 @@ bool isControllerSocketEnabled(const std::shared_ptr& configuration) return utils::StringUtils::toBool(controller_socket_enable_str).value_or(false); } +nonstd::expected, std::string> createDebugBundleArchive(const std::map>& files) { + auto stream_provider = core::ClassLoader::getDefaultClassLoader().instantiate( + "ArchiveStreamProvider", "ArchiveStreamProvider"); + if (!stream_provider) { + return nonstd::make_unexpected("Couldn't instantiate archiver provider"); + } + auto bundle = std::make_shared(); + auto archiver = stream_provider->createWriteStream(9, "gzip", bundle, nullptr); + if (!archiver) { + return nonstd::make_unexpected("Couldn't instantiate archiver"); + } + for (const auto& [filename, stream] : files) { + size_t file_size = stream->size(); + if (!archiver->newEntry({filename, file_size})) { + return nonstd::make_unexpected("Couldn't initialize archive entry for '" + filename + "'"); + } + if (gsl::narrow(file_size) != minifi::internal::pipe(*stream, *archiver)) { + // we have touched the input streams, they cannot be reused + return nonstd::make_unexpected("Error while writing file '" + filename + "' into the debug bundle"); + } + } + if (!archiver->finish()) { + return nonstd::make_unexpected("Failed to complete debug bundle archive"); + } + return bundle; +} + } // namespace org::apache::nifi::minifi::c2 diff --git a/libminifi/src/c2/ControllerSocketProtocol.cpp b/libminifi/src/c2/ControllerSocketProtocol.cpp index e122da1d58..ea75df0e46 100644 --- a/libminifi/src/c2/ControllerSocketProtocol.cpp +++ b/libminifi/src/c2/ControllerSocketProtocol.cpp @@ -31,6 +31,7 @@ #include "asio/ssl/stream.hpp" #include "asio/detached.hpp" #include "utils/net/AsioSocketUtils.h" +#include "c2/C2Utils.h" namespace org::apache::nifi::minifi::c2 { @@ -185,7 +186,7 @@ void ControllerSocketProtocol::handleStart(io::BaseStream &stream) { }); } } else { - logger_->log_debug("Connection broke"); + logger_->log_error("Connection broke"); } } @@ -197,7 +198,7 @@ void ControllerSocketProtocol::handleStop(io::BaseStream &stream) { component.stop(); }); } else { - logger_->log_debug("Connection broke"); + logger_->log_error("Connection broke"); } } @@ -214,7 +215,7 @@ void ControllerSocketProtocol::handleUpdate(io::BaseStream &stream) { { const auto size = stream.read(what); if (io::isError(size)) { - logger_->log_debug("Connection broke"); + logger_->log_error("Connection broke"); return; } } @@ -223,7 +224,7 @@ void ControllerSocketProtocol::handleUpdate(io::BaseStream &stream) { { const auto size = stream.read(ff_loc); if (io::isError(size)) { - logger_->log_debug("Connection broke"); + logger_->log_error("Connection broke"); return; } } @@ -238,7 +239,7 @@ void ControllerSocketProtocol::writeQueueSizesResponse(io::BaseStream &stream) { std::string connection; const auto size_ = stream.read(connection); if (io::isError(size_)) { - logger_->log_debug("Connection broke"); + logger_->log_error("Connection broke"); return; } std::unordered_map sizes; @@ -351,7 +352,7 @@ void ControllerSocketProtocol::handleDescribe(io::BaseStream &stream) { std::string what; const auto size = stream.read(what); if (io::isError(size)) { - logger_->log_debug("Connection broke"); + logger_->log_error("Connection broke"); return; } if (what == "queue") { @@ -371,10 +372,51 @@ void ControllerSocketProtocol::handleDescribe(io::BaseStream &stream) { } } +void ControllerSocketProtocol::writeDebugBundleResponse(io::BaseStream &stream) { + auto files = update_sink_.getDebugInfo(); + auto bundle = createDebugBundleArchive(files); + io::BufferStream resp; + auto op = static_cast(Operation::transfer); + resp.write(&op, 1); + if (!bundle) { + logger_->log_error("Creating debug bundle failed: {}", bundle.error()); + resp.write(static_cast(0)); + stream.write(resp.getBuffer()); + return; + } + + size_t bundle_size = bundle.value()->size(); + resp.write(bundle_size); + const size_t BUFFER_SIZE = 4096; + std::array out_buffer{}; + while (bundle_size > 0) { + const auto next_write_size = (std::min)(bundle_size, BUFFER_SIZE); + const auto size_read = bundle.value()->read(std::as_writable_bytes(std::span(out_buffer).subspan(0, next_write_size))); + resp.write(reinterpret_cast(out_buffer.data()), size_read); + bundle_size -= size_read; + } + + stream.write(resp.getBuffer()); +} + +void ControllerSocketProtocol::handleTransfer(io::BaseStream &stream) { + std::string what; + const auto size = stream.read(what); + if (io::isError(size)) { + logger_->log_error("Connection broke"); + return; + } + if (what == "debug") { + writeDebugBundleResponse(stream); + } else { + logger_->log_error("Unknown C2 transfer parameter: {}", what); + } +} + asio::awaitable ControllerSocketProtocol::handleCommand(std::unique_ptr stream) { - uint8_t head; + uint8_t head = 0; if (stream->read(head) != 1) { - logger_->log_debug("Connection broke"); + logger_->log_error("Connection broke"); co_return; } @@ -400,6 +442,9 @@ asio::awaitable ControllerSocketProtocol::handleCommand(std::unique_ptrlog_error("Unhandled C2 operation: {}", head); }