Skip to content

Commit

Permalink
Add Receiver endpoint mode
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanoColli committed Dec 9, 2024
1 parent 213b2fb commit 82f7b1e
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 9 deletions.
26 changes: 22 additions & 4 deletions src/endpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1108,6 +1108,7 @@ bool UdpEndpoint::setup(UdpEndpointConfig conf)
return false;
}

this->_mode = conf.mode;
if (!this->open(conf.address.c_str(), conf.port, conf.mode)) {
log_error("Could not open %s:%ld", conf.address.c_str(), conf.port);
return false;
Expand Down Expand Up @@ -1184,7 +1185,7 @@ int UdpEndpoint::open_ipv6(const char *ip, unsigned long port, UdpEndpointConfig
sockaddr6.sin6_port = htons(port);

/* multicast address needs to listen to all, but "filter" incoming packets */
if (mode == UdpEndpointConfig::Mode::Server && ipv6_is_multicast(ip_str)) {
if ((mode == UdpEndpointConfig::Mode::Server || mode == UdpEndpointConfig::Mode::Receiver) && ipv6_is_multicast(ip_str)) {
sockaddr6.sin6_addr = in6addr_any;

struct ipv6_mreq group;
Expand All @@ -1204,7 +1205,7 @@ int UdpEndpoint::open_ipv6(const char *ip, unsigned long port, UdpEndpointConfig
sockaddr6.sin6_scope_id = ipv6_get_scope_id(ip_str);
}

if (mode == UdpEndpointConfig::Mode::Server) {
if (mode == UdpEndpointConfig::Mode::Server || mode == UdpEndpointConfig::Mode::Receiver) {
if (bind(fd, (struct sockaddr *)&sockaddr6, sizeof(sockaddr6)) < 0) {
log_error("Error binding IPv6 socket for [%s]:%lu (%m)", ip_str, port);
goto fail;
Expand Down Expand Up @@ -1236,7 +1237,7 @@ int UdpEndpoint::open_ipv4(const char *ip, unsigned long port, UdpEndpointConfig
sockaddr.sin_addr.s_addr = inet_addr(ip);
sockaddr.sin_port = htons(port);

if (mode == UdpEndpointConfig::Mode::Server) {
if (mode == UdpEndpointConfig::Mode::Server || mode == UdpEndpointConfig::Mode::Receiver) {
if (bind(fd, (struct sockaddr *)&sockaddr, sizeof(sockaddr)) < 0) {
log_error("Error binding IPv4 socket for %s:%lu (%m)", ip, port);
goto fail;
Expand Down Expand Up @@ -1292,6 +1293,8 @@ bool UdpEndpoint::open(const char *ip, unsigned long port, UdpEndpointConfig::Mo

if (mode == UdpEndpointConfig::Mode::Server) {
log_info("Opened UDP Server [%d]%s: %s:%lu", fd, _name.c_str(), ip, port);
} else if (mode == UdpEndpointConfig::Mode::Receiver) {
log_info("Opened UDP Receiver [%d]%s: %s:%lu", fd, _name.c_str(), ip, port);
} else {
log_info("Opened UDP Client [%d]%s: %s:%lu", fd, _name.c_str(), ip, port);
}
Expand Down Expand Up @@ -1484,6 +1487,8 @@ int UdpEndpoint::parse_udp_mode(const char *val, size_t val_len, void *storage,
*udp_mode = UdpEndpointConfig::Mode::Server;
} else if (memcaseeq(val, val_len, "server", sizeof("server") - 1)) {
*udp_mode = UdpEndpointConfig::Mode::Server;
} else if (memcaseeq(val, val_len, "receiver", sizeof("receiver") - 1)) {
*udp_mode = UdpEndpointConfig::Mode::Receiver;
} else {
log_error("Unknown 'mode' key: %.*s", (int)val_len, val);
return -EINVAL;
Expand Down Expand Up @@ -1514,13 +1519,26 @@ bool UdpEndpoint::validate_config(const UdpEndpointConfig &config)
}

if (config.mode != UdpEndpointConfig::Mode::Client
&& config.mode != UdpEndpointConfig::Mode::Server) {
&& config.mode != UdpEndpointConfig::Mode::Server
&& config.mode != UdpEndpointConfig::Mode::Receiver) {
return false;
}

return true;
}

Endpoint::AcceptState UdpEndpoint::accept_msg(const struct buffer *pbuf) const
{
// reject when UDP endpoint is in receiver mode
if (this->_mode == UdpEndpointConfig::Mode::Receiver) {
log_trace("Endpoint [%d]%s: in Receiver mode, not sending back any msg", fd, _name.c_str());
return Endpoint::AcceptState::Filtered;
}

// otherwise: refer to standard accept rules
return Endpoint::accept_msg(pbuf);
}

TcpEndpoint::TcpEndpoint(std::string name)
: Endpoint{ENDPOINT_TYPE_TCP, std::move(name)}
{
Expand Down
9 changes: 6 additions & 3 deletions src/endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ struct UartEndpointConfig {
};

struct UdpEndpointConfig {
enum class Mode { Undefined = 0, Server, Client };
enum class Mode { Undefined = 0, Server, Client, Receiver };

std::string name;
std::string address;
Expand Down Expand Up @@ -184,7 +184,7 @@ class Endpoint : public Pollable {
return has_sys_comp_id(sys_comp_id);
}

AcceptState accept_msg(const struct buffer *pbuf) const;
AcceptState virtual accept_msg(const struct buffer *pbuf) const;

void filter_add_allowed_out_msg_id(uint32_t msg_id)
{
Expand Down Expand Up @@ -346,6 +346,8 @@ class UdpEndpoint : public Endpoint {
static int parse_udp_mode(const char *val, size_t val_len, void *storage, size_t storage_len);
static bool validate_config(const UdpEndpointConfig &config);

Endpoint::AcceptState accept_msg(const struct buffer *pbuf) const override;

void add_no_coalesce_msg_id(uint32_t msg_id)
{
_coalesce_nodelay.insert(msg_id);
Expand Down Expand Up @@ -375,6 +377,7 @@ class UdpEndpoint : public Endpoint {
unsigned int _coalesce_bytes = 0UL; // max coalescence size
unsigned long _coalesce_ms = 0UL; // max time to hold data to try to coalesce packets together

UdpEndpointConfig::Mode _mode = UdpEndpointConfig::Mode::Undefined;
private:
bool is_ipv6;
struct sockaddr_in sockaddr;
Expand All @@ -393,7 +396,7 @@ class TcpEndpoint : public Endpoint {
bool is_valid() override { return _valid; };
bool is_critical() override { return false; };

Endpoint::AcceptState accept_msg(const struct buffer *pbuf) const;
Endpoint::AcceptState accept_msg(const struct buffer *pbuf) const override;

int accept(int listener_fd); ///< accept incoming connection
bool setup(TcpEndpointConfig conf); ///< open connection and apply config
Expand Down
26 changes: 24 additions & 2 deletions src/mainloop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -298,15 +298,37 @@ void Mainloop::handle_command_pipe()
log_trace("Malformed port in add command");
return;
}
UdpEndpointConfig::Mode mode = a[5] == "server" ? UdpEndpointConfig::Mode::Server : UdpEndpointConfig::Mode::Client;
auto to_create = std::find_if(g_endpoints.begin(), g_endpoints.end(),
[&a](const std::shared_ptr<Endpoint> e) {return e->get_name() == a[2];});
if (to_create != g_endpoints.end()) {
log_error("Endpoint named \"%s\" already exists, please choose another name", a[2].c_str());
return;
}

// Command to UDP endpoint configuration
UdpEndpointConfig conf{};
conf.mode = mode;

conf.name = a[2];
conf.address = a[3];
conf.port = port;

// UDP endpoint mode
if(a[5] == "server" || a[5] == "Server") {
conf.mode = UdpEndpointConfig::Mode::Server;
}
else if (a[5] == "eavesdropping" || a[5] == "Eavesdropping") {
conf.mode = UdpEndpointConfig::Mode::Server;
}
else if (a[5] == "receiver" || a[5] == "Receiver") {
conf.mode = UdpEndpointConfig::Mode::Receiver;
}
else if (a[5] == "client" || a[5] == "Client") {
conf.mode = UdpEndpointConfig::Mode::Client;
}
else {
conf.mode = UdpEndpointConfig::Mode::Undefined;
}

if (a.size() > 6) { // group name provided
conf.group = a[6] == "NULL" ? "" : a[6];
}
Expand Down

0 comments on commit 82f7b1e

Please sign in to comment.