Skip to content

Commit

Permalink
Command pipe fixes:
Browse files Browse the repository at this point in the history
- Add config entry to specify named pipe path
- Clean pipe upon exit
- Attempt to recreate pipe upon connection error
- Minor refactoring
  • Loading branch information
StefanoColli committed Dec 2, 2024
1 parent 40a7273 commit 7e0923e
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 16 deletions.
6 changes: 6 additions & 0 deletions examples/config.sample
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@
# Default: 0 (de-duplication disabled)
#DeDuplicationPeriod = 0

# Command pipe path. A named pipe will be created at the provided path. It
# is possible to send create and delete commands over such channel to dynamically
# manage UDP endpoints.
# Default: "" (command disabled)
#CommandPipePath = /tmp/mavlink_router_pipe

## TCP Server Endpoints

# Listen for TCP connections on this port. Set to 0 to disable.
Expand Down
11 changes: 6 additions & 5 deletions src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -436,11 +436,12 @@ static int parse_confs(ConfFile &conffile, Configuration &config)
struct ConfFile::section_iter iter;
// clang-format off
static const ConfFile::OptionsTable global_option_table[] = {
{"TcpServerPort", false, ConfFile::parse_ul, OPTIONS_TABLE_STRUCT_FIELD(Configuration, tcp_port)},
{"ReportStats", false, ConfFile::parse_bool, OPTIONS_TABLE_STRUCT_FIELD(Configuration, report_msg_statistics)},
{"DebugLogLevel", false, parse_log_level, OPTIONS_TABLE_STRUCT_FIELD(Configuration, debug_log_level)},
{"DeduplicationPeriod", false, ConfFile::parse_ul, OPTIONS_TABLE_STRUCT_FIELD(Configuration, dedup_period_ms)},
{"SnifferSysid", false, ConfFile::parse_ul, OPTIONS_TABLE_STRUCT_FIELD(Configuration, sniffer_sysid)},
{"TcpServerPort", false, ConfFile::parse_ul, OPTIONS_TABLE_STRUCT_FIELD(Configuration, tcp_port)},
{"ReportStats", false, ConfFile::parse_bool, OPTIONS_TABLE_STRUCT_FIELD(Configuration, report_msg_statistics)},
{"DebugLogLevel", false, parse_log_level, OPTIONS_TABLE_STRUCT_FIELD(Configuration, debug_log_level)},
{"DeduplicationPeriod", false, ConfFile::parse_ul, OPTIONS_TABLE_STRUCT_FIELD(Configuration, dedup_period_ms)},
{"SnifferSysid", false, ConfFile::parse_ul, OPTIONS_TABLE_STRUCT_FIELD(Configuration, sniffer_sysid)},
{"CommandPipePath", false, ConfFile::parse_stdstring, OPTIONS_TABLE_STRUCT_FIELD(Configuration, command_pipe_path)},
{}
};
// clang-format on
Expand Down
33 changes: 25 additions & 8 deletions src/mainloop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ void Mainloop::handle_tcp_connection()
delete tcp;
}

void Mainloop::handle_us_command()
void Mainloop::handle_command_pipe()
{
char buf[1024];
auto bytes = read(g_commands_fd, buf, sizeof(buf));
Expand Down Expand Up @@ -392,7 +392,13 @@ int Mainloop::loop()
}

if(events[i].data.ptr == &g_commands_fd){
handle_us_command();
if (events[i].events & (EPOLLERR | EPOLLHUP)) {
// Reopen the command pipe if there was an error in reading it
clean_command_pipe();
open_command_pipe(command_pipe_path);
}

handle_command_pipe();
continue;
}

Expand Down Expand Up @@ -438,6 +444,7 @@ int Mainloop::loop()
}

clear_endpoints();
clean_command_pipe();

// free all remaning Timeouts
while (_timeouts != nullptr) {
Expand Down Expand Up @@ -547,8 +554,10 @@ bool Mainloop::add_endpoints(const Configuration &config)
}

// Create command server endpoint
// TODO add config (ie address of the socket and enable/disable option)
g_commands_fd = command_us_open("/tmp/mavlink_router_pipe");
if (!config.command_pipe_path.empty()) {
command_pipe_path = config.command_pipe_path;
g_commands_fd = open_command_pipe(command_pipe_path);
}

// Create Log endpoint
auto conf = config.log_config;
Expand Down Expand Up @@ -633,21 +642,29 @@ int Mainloop::tcp_open(unsigned long tcp_port)
return fd;
}

int Mainloop::command_us_open(const std::string& address)
int Mainloop::open_command_pipe(const std::string& address)
{
mkfifo(address.c_str(), 0600);

int fd = ::open(address.c_str(), O_RDWR);
int fd = ::open(address.c_str(), O_RDWR | O_CLOEXEC | O_NONBLOCK);
if (fd < 0) {
log_error("Failed to open Commands Server: %s", address.c_str());
log_error("Failed to open Command Server: %s", address.c_str());
} else {
add_fd(fd, &g_commands_fd, EPOLLIN);
log_info("Opened Commands Server [%d] at %s", fd, address.c_str());
log_info("Opened Command Server [%d] at %s", fd, address.c_str());
}

return fd;
}

void Mainloop::clean_command_pipe() {
// clean command pipe after use
if (g_commands_fd != -1) {
::close(g_commands_fd);
g_commands_fd = -1;
}
::remove(command_pipe_path.c_str());
}

Timeout *Mainloop::add_timeout(uint32_t timeout_msec, std::function<bool(void *)> cb,
const void *data)
Expand Down
9 changes: 6 additions & 3 deletions src/mainloop.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ struct Configuration {
Log::Level debug_log_level{Log::Level::INFO}; ///< conf "DebugLogLevel" or CLI "debug-log-level"
Log::Backend log_backend{Log::Backend::STDERR}; ///< CLI "syslog"
unsigned long dedup_period_ms; ///< conf "DeduplicationPeriod"
std::string command_pipe_path{""};

LogOptions log_config; ///< logging is in General config section, but internally an endpoint
std::vector<UartEndpointConfig> uart_configs;
Expand All @@ -60,7 +61,7 @@ class Mainloop {
int loop();
void route_msg(struct buffer *buf);
void handle_tcp_connection();
void handle_us_command();
void handle_command_pipe();
int write_msg(const std::shared_ptr<Endpoint> &e, const struct buffer *buf) const;
void process_tcp_hangups();
Timeout *add_timeout(uint32_t timeout_msec, std::function<bool(void *)> cb, const void *data);
Expand Down Expand Up @@ -113,7 +114,8 @@ class Mainloop {

std::vector<std::shared_ptr<Endpoint>> g_endpoints{};
int g_tcp_fd = -1; ///< for TCP server
int g_commands_fd = -1; ///< for the Unix socket commands endpoint
int g_commands_fd = -1; ///< for the named pipe commands endpoint
std::string command_pipe_path = "";
std::shared_ptr<LogEndpoint> _log_endpoint{nullptr};

Timeout *_timeouts = nullptr;
Expand All @@ -127,7 +129,8 @@ class Mainloop {
int _retcode;

int tcp_open(unsigned long tcp_port);
int command_us_open(const std::string& address);
int open_command_pipe(const std::string& address);
void clean_command_pipe();
void _del_timeouts();
bool _retry_timeout_cb(void *data);
bool _log_aggregate_timeout(void *data);
Expand Down

0 comments on commit 7e0923e

Please sign in to comment.