Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINIFICPP-2210 Add C2 debug command to MiNiFi Controller #1669

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions controller/Controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "Controller.h"

#include <utility>
#include <fstream>

#include "io/BufferStream.h"
#include "c2/C2Payload.h"
Expand All @@ -26,6 +27,7 @@
#include "asio/connect.hpp"
#include "core/logging/Logger.h"
#include "utils/net/AsioSocketUtils.h"
#include "utils/file/FileUtils.h"

namespace org::apache::nifi::minifi::controller {

Expand Down Expand Up @@ -225,4 +227,43 @@ bool getJstacks(const utils::net::SocketData& socket_data, std::ostream &out) {
return true;
}

nonstd::expected<void, std::string> getDebugBundle(const utils::net::SocketData& socket_data, const std::filesystem::path& target_dir) {
std::unique_ptr<io::BaseStream> connection_stream = std::make_unique<utils::net::AsioSocketConnection>(socket_data);
szaszm marked this conversation as resolved.
Show resolved Hide resolved
if (connection_stream->initialize() < 0) {
return nonstd::make_unexpected("Could not connect to remote host " + socket_data.host + ":" + std::to_string(socket_data.port));
}
io::BufferStream buffer;
auto op = static_cast<uint8_t>(c2::Operation::transfer);
buffer.write(&op, 1);
buffer.write("debug");
if (io::isError(connection_stream->write(buffer.getBuffer()))) {
return nonstd::make_unexpected("Could not write to connection " + socket_data.host + ":" + std::to_string(socket_data.port));
}
connection_stream->read(op);
size_t bundle_size = 0;
connection_stream->read(bundle_size);
if (bundle_size == 0) {
return nonstd::make_unexpected("Failed to retrieve debug bundle");
}

if (std::filesystem::exists(target_dir) && !std::filesystem::is_directory(target_dir)) {
return nonstd::make_unexpected("Object specified as the target directory already exists and it is not a directory");
}

if (!std::filesystem::exists(target_dir) && utils::file::create_dir(target_dir) != 0) {
return nonstd::make_unexpected("Failed to create target directory: " + target_dir.string());
}

std::ofstream out_file(target_dir / "debug.tar.gz");
const size_t BUFFER_SIZE = 4096;
std::array<char, BUFFER_SIZE> out_buffer{};
while (bundle_size > 0) {
const auto next_read_size = (std::min)(bundle_size, BUFFER_SIZE);
const auto size_read = connection_stream->read(std::as_writable_bytes(std::span(out_buffer).subspan(0, next_read_size)));
bundle_size -= size_read;
out_file.write(out_buffer.data(), gsl::narrow<std::streamsize>(size_read));
}
return {};
}

} // namespace org::apache::nifi::minifi::controller
2 changes: 2 additions & 0 deletions controller/Controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <string>

#include "utils/net/AsioSocketUtils.h"
#include "utils/expected.h"

namespace org::apache::nifi::minifi::controller {

Expand All @@ -35,5 +36,6 @@ bool listComponents(const utils::net::SocketData& socket_data, std::ostream &out
bool listConnections(const utils::net::SocketData& socket_data, std::ostream &out, bool show_header = true);
bool printManifest(const utils::net::SocketData& socket_data, std::ostream &out);
bool getJstacks(const utils::net::SocketData& socket_data, std::ostream &out);
nonstd::expected<void, std::string> getDebugBundle(const utils::net::SocketData& socket_data, const std::filesystem::path& target_dir);

} // namespace org::apache::nifi::minifi::controller
20 changes: 19 additions & 1 deletion controller/MiNiFiController.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <iostream>
#include <string>
#include <string_view>
#include <filesystem>

#include "MainHelper.h"
#include "properties/Configure.h"
Expand Down Expand Up @@ -158,8 +159,16 @@ int main(int argc, char **argv) {
addFlagOption("--manifest", "Generates a manifest for the current binary");
addFlagOption("--noheaders", "Removes headers from output streams");

argument_parser.add_argument("-d", "--debug").metavar("BUNDLE_OUT_DIR")
.help("Get debug bundle");

bool show_headers = true;

if (argc <= 1) {
std::cerr << argument_parser;
std::exit(1);
}

try {
argument_parser.parse_args(argc, argv);
} catch (const std::runtime_error& err) {
Expand Down Expand Up @@ -254,11 +263,20 @@ int main(int argc, char **argv) {
if (!minifi::controller::getJstacks(socket_data, std::cout))
std::cout << "Could not connect to remote host " << socket_data.host << ":" << socket_data.port << std::endl;
}

if (const auto& debug_path = argument_parser.present("--debug")) {
auto debug_res = minifi::controller::getDebugBundle(socket_data, std::filesystem::path(*debug_path));
if (!debug_res)
std::cout << debug_res.error() << std::endl;
else
std::cout << "Debug bundle written to " << std::filesystem::path(*debug_path) / "debug.tar.gz";
}
} catch (const std::exception &exc) {
// catch anything thrown within try block that derives from std::exception
std::cerr << exc.what() << std::endl;
std::exit(1);
} catch (...) {
std::cerr << argument_parser;
std::cerr << "Caught unknown exception" << std::endl;
std::exit(1);
}
return 0;
Expand Down
55 changes: 55 additions & 0 deletions controller/tests/ControllerTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <utility>
#include <string>
#include <filesystem>
#include <fstream>
#include "range/v3/algorithm/find.hpp"

#include "TestBase.h"
Expand Down Expand Up @@ -518,4 +519,58 @@ TEST_CASE_METHOD(ControllerTestFixture, "Test jstack getter", "[controllerTests]
REQUIRE(jstack_stream.str() == expected_trace);
}

TEST_CASE_METHOD(ControllerTestFixture, "Test debug bundle getter", "[controllerTests]") {
SECTION("With SSL from service provider") {
setConnectionType(ControllerTestFixture::ConnectionType::SSL_FROM_SERVICE_PROVIDER);
}

SECTION("With SSL from properties") {
setConnectionType(ControllerTestFixture::ConnectionType::SSL_FROM_CONFIGURATION);
}

SECTION("Without SSL") {
setConnectionType(ControllerTestFixture::ConnectionType::UNSECURE);
}

auto reporter = std::make_shared<minifi::c2::ControllerSocketMetricsPublisher>("ControllerSocketMetricsPublisher");
auto response_node_loader = std::make_shared<minifi::state::response::ResponseNodeLoader>(configuration_, std::vector<std::shared_ptr<core::RepositoryMetricsSource>>{}, nullptr);
reporter->initialize(configuration_, response_node_loader);
initalizeControllerSocket(reporter);

TestController test_controller;
auto output_dir = test_controller.createTempDirectory();
REQUIRE(minifi::controller::getDebugBundle(controller_socket_data_, output_dir));
REQUIRE(std::filesystem::exists(output_dir / "debug.tar.gz"));
}

TEST_CASE_METHOD(ControllerTestFixture, "Test debug bundle is created to non-existent folder", "[controllerTests]") {
setConnectionType(ControllerTestFixture::ConnectionType::UNSECURE);

auto reporter = std::make_shared<minifi::c2::ControllerSocketMetricsPublisher>("ControllerSocketMetricsPublisher");
auto response_node_loader = std::make_shared<minifi::state::response::ResponseNodeLoader>(configuration_, std::vector<std::shared_ptr<core::RepositoryMetricsSource>>{}, nullptr);
reporter->initialize(configuration_, response_node_loader);
initalizeControllerSocket(reporter);

TestController test_controller;
auto output_dir = test_controller.createTempDirectory() / "subfolder";
REQUIRE(minifi::controller::getDebugBundle(controller_socket_data_, output_dir));
REQUIRE(std::filesystem::exists(output_dir / "debug.tar.gz"));
}

TEST_CASE_METHOD(ControllerTestFixture, "Debug bundle retrieval fails if target path is an existing file", "[controllerTests]") {
setConnectionType(ControllerTestFixture::ConnectionType::UNSECURE);

auto reporter = std::make_shared<minifi::c2::ControllerSocketMetricsPublisher>("ControllerSocketMetricsPublisher");
auto response_node_loader = std::make_shared<minifi::state::response::ResponseNodeLoader>(configuration_, std::vector<std::shared_ptr<core::RepositoryMetricsSource>>{}, nullptr);
reporter->initialize(configuration_, response_node_loader);
initalizeControllerSocket(reporter);

TestController test_controller;
auto invalid_path = test_controller.createTempDirectory() / "test.log";
std::ofstream file(invalid_path);
auto result = minifi::controller::getDebugBundle(controller_socket_data_, invalid_path);
REQUIRE(!result);
REQUIRE(result.error() == "Object specified as the target directory already exists and it is not a directory");
}

} // namespace org::apache::nifi::minifi::test
17 changes: 17 additions & 0 deletions docker/test/integration/cluster/DockerCommunicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import tarfile
import os
import io
import uuid


class DockerCommunicator:
Expand Down Expand Up @@ -66,3 +67,19 @@ def write_content_to_container(self, content, container_name, dst_path):
tar.addfile(info, io.BytesIO(content.encode('utf-8')))
with open(os.path.join(td, 'content.tar'), 'rb') as data:
return self.__put_archive(container_name, os.path.dirname(dst_path), data.read())

def copy_file_from_container(self, container_name, src_path_in_container, dest_dir_on_host) -> bool:
try:
container = self.client.containers.get(container_name)
(bits, _) = container.get_archive(src_path_in_container)
tmp_tar_path = os.path.join(dest_dir_on_host, "retrieved_file_" + str(uuid.uuid4()) + ".tar")
with open(tmp_tar_path, 'wb') as out_file:
for chunk in bits:
out_file.write(chunk)
with tarfile.open(tmp_tar_path, 'r') as tar:
tar.extractall(dest_dir_on_host)
os.remove(tmp_tar_path)
return True
except Exception as ex:
logging.error('Exception occurred while copying file from container: %s', str(ex))
return False
41 changes: 40 additions & 1 deletion docker/test/integration/cluster/DockerTestCluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@
import logging
import time
import re
import tarfile
import tempfile
import os
import gzip
import shutil

from .LogSource import LogSource
from .ContainerStore import ContainerStore
Expand Down Expand Up @@ -340,7 +345,41 @@ def check_connections_full_through_controller(self, connection_count: int, conta
def check_connection_size_through_controller(self, connection: str, size: int, max_size: int, container_name: str) -> bool:
return self.minifi_controller_executor.get_connection_size(connection, container_name) == (size, max_size)

@retry_check(10, 1000)
@retry_check(10, 1)
def manifest_can_be_retrieved_through_minifi_controller(self, container_name: str) -> bool:
manifest = self.minifi_controller_executor.get_manifest(container_name)
return '"agentManifest": {' in manifest and '"componentManifest": {' in manifest and '"agentType": "cpp"' in manifest

@retry_check(10, 1)
def debug_bundle_can_be_retrieved_through_minifi_controller(self, container_name: str) -> bool:
with tempfile.TemporaryDirectory() as td:
result = self.minifi_controller_executor.get_debug_bundle(container_name, td)
if not result:
logging.error("Failed to get debug bundle")
return False

with tarfile.open(os.path.join(td, "debug.tar.gz")) as file:
file.extractall(td)

if not os.path.exists(os.path.join(td, "config.yml")):
logging.error("config.yml file was not found in debug bundle")
return False

if not os.path.exists(os.path.join(td, "minifi.properties")):
logging.error("minifi.properties file was not found in debug bundle")
return False

if not os.path.exists(os.path.join(td, "minifi.log.gz")):
logging.error("minifi.log.gz file was not found in debug bundle")
return False

with gzip.open(os.path.join(td, "minifi.log.gz"), 'rb') as f_in:
with open(os.path.join(td, "minifi.log"), 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)

with open(os.path.join(td, "minifi.log")) as f:
if 'MiNiFi started' not in f.read():
logging.error("'MiNiFi started' log entry was not found in minifi.log file")
return False

return True
9 changes: 9 additions & 0 deletions docker/test/integration/cluster/MinifiControllerExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from .DockerCommunicator import DockerCommunicator


Expand Down Expand Up @@ -66,3 +67,11 @@ def get_manifest(self, container_name: str) -> str:
if not line.startswith('['):
manifest += line
return manifest

def get_debug_bundle(self, container_name: str, dest: str) -> bool:
(code, _) = self.container_communicator.execute_command(container_name, ["/opt/minifi/minifi-current/bin/minificontroller", "--debug", "/opt/minifi/minifi-current/"])
if code != 0:
logging.error("Minifi controller debug command failed with code: %d", code)
return False

return self.container_communicator.copy_file_from_container(container_name, "/opt/minifi/minifi-current/debug.tar.gz", dest)
Original file line number Diff line number Diff line change
Expand Up @@ -448,3 +448,6 @@ def manifest_can_be_retrieved_through_minifi_controller(self, container_name: st

def enable_log_metrics_publisher_in_minifi(self):
self.cluster.enable_log_metrics_publisher_in_minifi()

def debug_bundle_can_be_retrieved_through_minifi_controller(self, container_name: str):
assert self.cluster.debug_bundle_can_be_retrieved_through_minifi_controller(container_name) or self.cluster.log_app_output()
7 changes: 7 additions & 0 deletions docker/test/integration/features/minifi_controller.feature
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,10 @@ Feature: MiNiFi Controller functionalities
And controller socket properties are set up
When all instances start up
Then manifest can be retrieved through MiNiFi controller

Scenario: Debug bundle can be retrieved
Given a GenerateFlowFile processor
And a file with the content "test" is present in "/tmp/input"
And controller socket properties are set up
When all instances start up
Then debug bundle can be retrieved through MiNiFi controller
10 changes: 10 additions & 0 deletions docker/test/integration/features/steps/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -1204,3 +1204,13 @@ def step_impl(context, minifi_container_name: str):
@then(u'manifest can be retrieved through MiNiFi controller')
def step_impl(context):
context.execute_steps(f"then manifest can be retrieved through MiNiFi controller in the \"minifi-cpp-flow-{context.feature_id}\" flow")


@then(u'debug bundle can be retrieved through MiNiFi controller in the \"{minifi_container_name}\" flow')
def step_impl(context, minifi_container_name: str):
context.test.debug_bundle_can_be_retrieved_through_minifi_controller(minifi_container_name)


@then(u'debug bundle can be retrieved through MiNiFi controller')
def step_impl(context):
context.execute_steps(f"then debug bundle can be retrieved through MiNiFi controller in the \"minifi-cpp-flow-{context.feature_id}\" flow")
5 changes: 5 additions & 0 deletions libminifi/include/c2/C2Utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@

#include <string>
#include <memory>
#include <map>

#include "properties/Configure.h"
#include "utils/StringUtils.h"
#include "io/ArchiveStream.h"
#include "utils/expected.h"
#include "io/BufferStream.h"

namespace org::apache::nifi::minifi::c2 {

Expand All @@ -32,5 +36,6 @@ inline constexpr const char* CONTROLLER_SOCKET_METRICS_PUBLISHER = "ControllerSo

bool isC2Enabled(const std::shared_ptr<Configure>& configuration);
bool isControllerSocketEnabled(const std::shared_ptr<Configure>& configuration);
nonstd::expected<std::shared_ptr<io::BufferStream>, std::string> createDebugBundleArchive(const std::map<std::string, std::unique_ptr<io::InputStream>>& files);

} // namespace org::apache::nifi::minifi::c2
2 changes: 2 additions & 0 deletions libminifi/include/c2/ControllerSocketProtocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,14 @@ class ControllerSocketProtocol {
void handleStop(io::BaseStream &stream);
void handleClear(io::BaseStream &stream);
void handleUpdate(io::BaseStream &stream);
void handleTransfer(io::BaseStream &stream);
void writeQueueSizesResponse(io::BaseStream &stream);
void writeComponentsResponse(io::BaseStream &stream);
void writeConnectionsResponse(io::BaseStream &stream);
void writeGetFullResponse(io::BaseStream &stream);
void writeManifestResponse(io::BaseStream &stream);
void writeJstackResponse(io::BaseStream &stream);
void writeDebugBundleResponse(io::BaseStream &stream);
void handleDescribe(io::BaseStream &stream);
asio::awaitable<void> handleCommand(std::unique_ptr<io::BaseStream> stream);
asio::awaitable<void> handshakeAndHandleCommand(asio::ip::tcp::socket&& socket, std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service);
Expand Down
Loading
Loading