Skip to content

Commit

Permalink
MINIFICPP-2210 Add C2 debug command to MiNiFi Controller
Browse files Browse the repository at this point in the history
Closes #1669

Signed-off-by: Marton Szasz <[email protected]>
  • Loading branch information
lordgamez authored and szaszm committed Oct 26, 2023
1 parent 0276dab commit d9a0415
Show file tree
Hide file tree
Showing 15 changed files with 311 additions and 49 deletions.
41 changes: 41 additions & 0 deletions controller/Controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "Controller.h"

#include <utility>
#include <fstream>

#include "io/BufferStream.h"
#include "c2/C2Payload.h"
Expand All @@ -26,6 +27,7 @@
#include "asio/connect.hpp"
#include "core/logging/Logger.h"
#include "utils/net/AsioSocketUtils.h"
#include "utils/file/FileUtils.h"

namespace org::apache::nifi::minifi::controller {

Expand Down Expand Up @@ -225,4 +227,43 @@ bool getJstacks(const utils::net::SocketData& socket_data, std::ostream &out) {
return true;
}

nonstd::expected<void, std::string> getDebugBundle(const utils::net::SocketData& socket_data, const std::filesystem::path& target_dir) {
std::unique_ptr<io::BaseStream> connection_stream = std::make_unique<utils::net::AsioSocketConnection>(socket_data);
if (connection_stream->initialize() < 0) {
return nonstd::make_unexpected("Could not connect to remote host " + socket_data.host + ":" + std::to_string(socket_data.port));
}
io::BufferStream buffer;
auto op = static_cast<uint8_t>(c2::Operation::transfer);
buffer.write(&op, 1);
buffer.write("debug");
if (io::isError(connection_stream->write(buffer.getBuffer()))) {
return nonstd::make_unexpected("Could not write to connection " + socket_data.host + ":" + std::to_string(socket_data.port));
}
connection_stream->read(op);
size_t bundle_size = 0;
connection_stream->read(bundle_size);
if (bundle_size == 0) {
return nonstd::make_unexpected("Failed to retrieve debug bundle");
}

if (std::filesystem::exists(target_dir) && !std::filesystem::is_directory(target_dir)) {
return nonstd::make_unexpected("Object specified as the target directory already exists and it is not a directory");
}

if (!std::filesystem::exists(target_dir) && utils::file::create_dir(target_dir) != 0) {
return nonstd::make_unexpected("Failed to create target directory: " + target_dir.string());
}

std::ofstream out_file(target_dir / "debug.tar.gz");
const size_t BUFFER_SIZE = 4096;
std::array<char, BUFFER_SIZE> out_buffer{};
while (bundle_size > 0) {
const auto next_read_size = (std::min)(bundle_size, BUFFER_SIZE);
const auto size_read = connection_stream->read(std::as_writable_bytes(std::span(out_buffer).subspan(0, next_read_size)));
bundle_size -= size_read;
out_file.write(out_buffer.data(), gsl::narrow<std::streamsize>(size_read));
}
return {};
}

} // namespace org::apache::nifi::minifi::controller
2 changes: 2 additions & 0 deletions controller/Controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <string>

#include "utils/net/AsioSocketUtils.h"
#include "utils/expected.h"

namespace org::apache::nifi::minifi::controller {

Expand All @@ -35,5 +36,6 @@ bool listComponents(const utils::net::SocketData& socket_data, std::ostream &out
bool listConnections(const utils::net::SocketData& socket_data, std::ostream &out, bool show_header = true);
bool printManifest(const utils::net::SocketData& socket_data, std::ostream &out);
bool getJstacks(const utils::net::SocketData& socket_data, std::ostream &out);
nonstd::expected<void, std::string> getDebugBundle(const utils::net::SocketData& socket_data, const std::filesystem::path& target_dir);

} // namespace org::apache::nifi::minifi::controller
20 changes: 19 additions & 1 deletion controller/MiNiFiController.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <iostream>
#include <string>
#include <string_view>
#include <filesystem>

#include "MainHelper.h"
#include "properties/Configure.h"
Expand Down Expand Up @@ -158,8 +159,16 @@ int main(int argc, char **argv) {
addFlagOption("--manifest", "Generates a manifest for the current binary");
addFlagOption("--noheaders", "Removes headers from output streams");

argument_parser.add_argument("-d", "--debug").metavar("BUNDLE_OUT_DIR")
.help("Get debug bundle");

bool show_headers = true;

if (argc <= 1) {
std::cerr << argument_parser;
std::exit(1);
}

try {
argument_parser.parse_args(argc, argv);
} catch (const std::runtime_error& err) {
Expand Down Expand Up @@ -254,11 +263,20 @@ int main(int argc, char **argv) {
if (!minifi::controller::getJstacks(socket_data, std::cout))
std::cout << "Could not connect to remote host " << socket_data.host << ":" << socket_data.port << std::endl;
}

if (const auto& debug_path = argument_parser.present("--debug")) {
auto debug_res = minifi::controller::getDebugBundle(socket_data, std::filesystem::path(*debug_path));
if (!debug_res)
std::cout << debug_res.error() << std::endl;
else
std::cout << "Debug bundle written to " << std::filesystem::path(*debug_path) / "debug.tar.gz";
}
} catch (const std::exception &exc) {
// catch anything thrown within try block that derives from std::exception
std::cerr << exc.what() << std::endl;
std::exit(1);
} catch (...) {
std::cerr << argument_parser;
std::cerr << "Caught unknown exception" << std::endl;
std::exit(1);
}
return 0;
Expand Down
55 changes: 55 additions & 0 deletions controller/tests/ControllerTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <utility>
#include <string>
#include <filesystem>
#include <fstream>
#include "range/v3/algorithm/find.hpp"

#include "TestBase.h"
Expand Down Expand Up @@ -518,4 +519,58 @@ TEST_CASE_METHOD(ControllerTestFixture, "Test jstack getter", "[controllerTests]
REQUIRE(jstack_stream.str() == expected_trace);
}

TEST_CASE_METHOD(ControllerTestFixture, "Test debug bundle getter", "[controllerTests]") {
SECTION("With SSL from service provider") {
setConnectionType(ControllerTestFixture::ConnectionType::SSL_FROM_SERVICE_PROVIDER);
}

SECTION("With SSL from properties") {
setConnectionType(ControllerTestFixture::ConnectionType::SSL_FROM_CONFIGURATION);
}

SECTION("Without SSL") {
setConnectionType(ControllerTestFixture::ConnectionType::UNSECURE);
}

auto reporter = std::make_shared<minifi::c2::ControllerSocketMetricsPublisher>("ControllerSocketMetricsPublisher");
auto response_node_loader = std::make_shared<minifi::state::response::ResponseNodeLoader>(configuration_, std::vector<std::shared_ptr<core::RepositoryMetricsSource>>{}, nullptr);
reporter->initialize(configuration_, response_node_loader);
initalizeControllerSocket(reporter);

TestController test_controller;
auto output_dir = test_controller.createTempDirectory();
REQUIRE(minifi::controller::getDebugBundle(controller_socket_data_, output_dir));
REQUIRE(std::filesystem::exists(output_dir / "debug.tar.gz"));
}

TEST_CASE_METHOD(ControllerTestFixture, "Test debug bundle is created to non-existent folder", "[controllerTests]") {
setConnectionType(ControllerTestFixture::ConnectionType::UNSECURE);

auto reporter = std::make_shared<minifi::c2::ControllerSocketMetricsPublisher>("ControllerSocketMetricsPublisher");
auto response_node_loader = std::make_shared<minifi::state::response::ResponseNodeLoader>(configuration_, std::vector<std::shared_ptr<core::RepositoryMetricsSource>>{}, nullptr);
reporter->initialize(configuration_, response_node_loader);
initalizeControllerSocket(reporter);

TestController test_controller;
auto output_dir = test_controller.createTempDirectory() / "subfolder";
REQUIRE(minifi::controller::getDebugBundle(controller_socket_data_, output_dir));
REQUIRE(std::filesystem::exists(output_dir / "debug.tar.gz"));
}

TEST_CASE_METHOD(ControllerTestFixture, "Debug bundle retrieval fails if target path is an existing file", "[controllerTests]") {
setConnectionType(ControllerTestFixture::ConnectionType::UNSECURE);

auto reporter = std::make_shared<minifi::c2::ControllerSocketMetricsPublisher>("ControllerSocketMetricsPublisher");
auto response_node_loader = std::make_shared<minifi::state::response::ResponseNodeLoader>(configuration_, std::vector<std::shared_ptr<core::RepositoryMetricsSource>>{}, nullptr);
reporter->initialize(configuration_, response_node_loader);
initalizeControllerSocket(reporter);

TestController test_controller;
auto invalid_path = test_controller.createTempDirectory() / "test.log";
std::ofstream file(invalid_path);
auto result = minifi::controller::getDebugBundle(controller_socket_data_, invalid_path);
REQUIRE(!result);
REQUIRE(result.error() == "Object specified as the target directory already exists and it is not a directory");
}

} // namespace org::apache::nifi::minifi::test
17 changes: 17 additions & 0 deletions docker/test/integration/cluster/DockerCommunicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import tarfile
import os
import io
import uuid


class DockerCommunicator:
Expand Down Expand Up @@ -66,3 +67,19 @@ def write_content_to_container(self, content, container_name, dst_path):
tar.addfile(info, io.BytesIO(content.encode('utf-8')))
with open(os.path.join(td, 'content.tar'), 'rb') as data:
return self.__put_archive(container_name, os.path.dirname(dst_path), data.read())

def copy_file_from_container(self, container_name, src_path_in_container, dest_dir_on_host) -> bool:
try:
container = self.client.containers.get(container_name)
(bits, _) = container.get_archive(src_path_in_container)
tmp_tar_path = os.path.join(dest_dir_on_host, "retrieved_file_" + str(uuid.uuid4()) + ".tar")
with open(tmp_tar_path, 'wb') as out_file:
for chunk in bits:
out_file.write(chunk)
with tarfile.open(tmp_tar_path, 'r') as tar:
tar.extractall(dest_dir_on_host)
os.remove(tmp_tar_path)
return True
except Exception as ex:
logging.error('Exception occurred while copying file from container: %s', str(ex))
return False
41 changes: 40 additions & 1 deletion docker/test/integration/cluster/DockerTestCluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@
import logging
import time
import re
import tarfile
import tempfile
import os
import gzip
import shutil

from .LogSource import LogSource
from .ContainerStore import ContainerStore
Expand Down Expand Up @@ -340,7 +345,41 @@ def check_connections_full_through_controller(self, connection_count: int, conta
def check_connection_size_through_controller(self, connection: str, size: int, max_size: int, container_name: str) -> bool:
return self.minifi_controller_executor.get_connection_size(connection, container_name) == (size, max_size)

@retry_check(10, 1000)
@retry_check(10, 1)
def manifest_can_be_retrieved_through_minifi_controller(self, container_name: str) -> bool:
manifest = self.minifi_controller_executor.get_manifest(container_name)
return '"agentManifest": {' in manifest and '"componentManifest": {' in manifest and '"agentType": "cpp"' in manifest

@retry_check(10, 1)
def debug_bundle_can_be_retrieved_through_minifi_controller(self, container_name: str) -> bool:
with tempfile.TemporaryDirectory() as td:
result = self.minifi_controller_executor.get_debug_bundle(container_name, td)
if not result:
logging.error("Failed to get debug bundle")
return False

with tarfile.open(os.path.join(td, "debug.tar.gz")) as file:
file.extractall(td)

if not os.path.exists(os.path.join(td, "config.yml")):
logging.error("config.yml file was not found in debug bundle")
return False

if not os.path.exists(os.path.join(td, "minifi.properties")):
logging.error("minifi.properties file was not found in debug bundle")
return False

if not os.path.exists(os.path.join(td, "minifi.log.gz")):
logging.error("minifi.log.gz file was not found in debug bundle")
return False

with gzip.open(os.path.join(td, "minifi.log.gz"), 'rb') as f_in:
with open(os.path.join(td, "minifi.log"), 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)

with open(os.path.join(td, "minifi.log")) as f:
if 'MiNiFi started' not in f.read():
logging.error("'MiNiFi started' log entry was not found in minifi.log file")
return False

return True
9 changes: 9 additions & 0 deletions docker/test/integration/cluster/MinifiControllerExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from .DockerCommunicator import DockerCommunicator


Expand Down Expand Up @@ -66,3 +67,11 @@ def get_manifest(self, container_name: str) -> str:
if not line.startswith('['):
manifest += line
return manifest

def get_debug_bundle(self, container_name: str, dest: str) -> bool:
(code, _) = self.container_communicator.execute_command(container_name, ["/opt/minifi/minifi-current/bin/minificontroller", "--debug", "/opt/minifi/minifi-current/"])
if code != 0:
logging.error("Minifi controller debug command failed with code: %d", code)
return False

return self.container_communicator.copy_file_from_container(container_name, "/opt/minifi/minifi-current/debug.tar.gz", dest)
Original file line number Diff line number Diff line change
Expand Up @@ -448,3 +448,6 @@ def manifest_can_be_retrieved_through_minifi_controller(self, container_name: st

def enable_log_metrics_publisher_in_minifi(self):
self.cluster.enable_log_metrics_publisher_in_minifi()

def debug_bundle_can_be_retrieved_through_minifi_controller(self, container_name: str):
assert self.cluster.debug_bundle_can_be_retrieved_through_minifi_controller(container_name) or self.cluster.log_app_output()
7 changes: 7 additions & 0 deletions docker/test/integration/features/minifi_controller.feature
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,10 @@ Feature: MiNiFi Controller functionalities
And controller socket properties are set up
When all instances start up
Then manifest can be retrieved through MiNiFi controller

Scenario: Debug bundle can be retrieved
Given a GenerateFlowFile processor
And a file with the content "test" is present in "/tmp/input"
And controller socket properties are set up
When all instances start up
Then debug bundle can be retrieved through MiNiFi controller
10 changes: 10 additions & 0 deletions docker/test/integration/features/steps/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -1204,3 +1204,13 @@ def step_impl(context, minifi_container_name: str):
@then(u'manifest can be retrieved through MiNiFi controller')
def step_impl(context):
context.execute_steps(f"then manifest can be retrieved through MiNiFi controller in the \"minifi-cpp-flow-{context.feature_id}\" flow")


@then(u'debug bundle can be retrieved through MiNiFi controller in the \"{minifi_container_name}\" flow')
def step_impl(context, minifi_container_name: str):
context.test.debug_bundle_can_be_retrieved_through_minifi_controller(minifi_container_name)


@then(u'debug bundle can be retrieved through MiNiFi controller')
def step_impl(context):
context.execute_steps(f"then debug bundle can be retrieved through MiNiFi controller in the \"minifi-cpp-flow-{context.feature_id}\" flow")
5 changes: 5 additions & 0 deletions libminifi/include/c2/C2Utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@

#include <string>
#include <memory>
#include <map>

#include "properties/Configure.h"
#include "utils/StringUtils.h"
#include "io/ArchiveStream.h"
#include "utils/expected.h"
#include "io/BufferStream.h"

namespace org::apache::nifi::minifi::c2 {

Expand All @@ -32,5 +36,6 @@ inline constexpr const char* CONTROLLER_SOCKET_METRICS_PUBLISHER = "ControllerSo

bool isC2Enabled(const std::shared_ptr<Configure>& configuration);
bool isControllerSocketEnabled(const std::shared_ptr<Configure>& configuration);
nonstd::expected<std::shared_ptr<io::BufferStream>, std::string> createDebugBundleArchive(const std::map<std::string, std::unique_ptr<io::InputStream>>& files);

} // namespace org::apache::nifi::minifi::c2
2 changes: 2 additions & 0 deletions libminifi/include/c2/ControllerSocketProtocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,14 @@ class ControllerSocketProtocol {
void handleStop(io::BaseStream &stream);
void handleClear(io::BaseStream &stream);
void handleUpdate(io::BaseStream &stream);
void handleTransfer(io::BaseStream &stream);
void writeQueueSizesResponse(io::BaseStream &stream);
void writeComponentsResponse(io::BaseStream &stream);
void writeConnectionsResponse(io::BaseStream &stream);
void writeGetFullResponse(io::BaseStream &stream);
void writeManifestResponse(io::BaseStream &stream);
void writeJstackResponse(io::BaseStream &stream);
void writeDebugBundleResponse(io::BaseStream &stream);
void handleDescribe(io::BaseStream &stream);
asio::awaitable<void> handleCommand(std::unique_ptr<io::BaseStream> stream);
asio::awaitable<void> handshakeAndHandleCommand(asio::ip::tcp::socket&& socket, std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service);
Expand Down
Loading

0 comments on commit d9a0415

Please sign in to comment.