Skip to content

Commit

Permalink
Add packets coalescing on UPD endpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanoColli committed Nov 21, 2024
1 parent c20337b commit 4f2cfd9
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 16 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,6 @@ tags
# Jetbrains IDEs
.idea/
compile_commands.json

# VSCode IDE
.vscode/
99 changes: 84 additions & 15 deletions src/endpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <sys/timerfd.h>

#include <common/log.h>
#include <common/util.h>
Expand Down Expand Up @@ -95,6 +96,9 @@ const ConfFile::OptionsTable UdpEndpoint::option_table[] = {
{"AllowSrcSysIn", false, ConfFile::parse_uint8_vector, OPTIONS_TABLE_STRUCT_FIELD(UdpEndpointConfig, allow_src_sys_in)},
{"BlockSrcSysIn", false, ConfFile::parse_uint8_vector, OPTIONS_TABLE_STRUCT_FIELD(UdpEndpointConfig, block_src_sys_in)},
{"group", false, ConfFile::parse_stdstring, OPTIONS_TABLE_STRUCT_FIELD(UdpEndpointConfig, group)},
{"CoalesceBytes", false, ConfFile::parse_ul, OPTIONS_TABLE_STRUCT_FIELD(UdpEndpointConfig, coalesce_bytes)},
{"CoalesceMs", false, ConfFile::parse_ul, OPTIONS_TABLE_STRUCT_FIELD(UdpEndpointConfig, coalesce_ms)},
{"CoalesceNoDelay", false, ConfFile::parse_uint32_vector, OPTIONS_TABLE_STRUCT_FIELD(UdpEndpointConfig, coalesce_nodelay)},
{}
};

Expand Down Expand Up @@ -1067,13 +1071,25 @@ UdpEndpoint::UdpEndpoint(std::string name)
{
bzero(&sockaddr, sizeof(sockaddr));
bzero(&sockaddr6, sizeof(sockaddr6));
_write_schedule_timer = Mainloop::get_instance().add_timeout(
_coalesce_ms, [this](void*)
{
flush_pending_msgs();
return true;
},
this);
}

UdpEndpoint::~UdpEndpoint()
{
if (nomessage_timeout) {
Mainloop::get_instance().del_timeout(nomessage_timeout);
}

if (_write_schedule_timer) {
Mainloop::get_instance().del_timeout(_write_schedule_timer);
_write_schedule_timer = nullptr;
}
}

bool UdpEndpoint::setup(UdpEndpointConfig conf)
Expand Down Expand Up @@ -1127,6 +1143,13 @@ bool UdpEndpoint::setup(UdpEndpointConfig conf)

this->_group_name = conf.group;

// Coalescing configs
this->_coalesce_bytes = conf.coalesce_bytes;
this->_coalesce_ms = conf.coalesce_ms;
for (auto msg_id : conf.coalesce_nodelay) {
this->add_no_coalesce_msg_id(msg_id);
}

return true;
}

Expand Down Expand Up @@ -1325,6 +1348,62 @@ ssize_t UdpEndpoint::_read_msg(uint8_t *buf, size_t len)

int UdpEndpoint::write_msg(const struct buffer *pbuf)
{
// We cannot add the new message to the coalescence or to the tx buffer -> send immediately what is scheduled
if (tx_buf.len > 0 && tx_buf.len + pbuf->len > std::min(_coalesce_bytes, TX_BUF_MAX_SIZE)) {
log_trace("New message would overflow the coalescence or the transmission buffer, sending the pending ones before");
flush_pending_msgs();
}

// Append new data in the tx buffer
memcpy(&tx_buf.data[tx_buf.len], pbuf->data, pbuf->len);
tx_buf.len += pbuf->len;

int ret = pbuf->len;

if (_coalesce_bytes == 0 ||
_coalesce_ms == 0 ||
_coalesce_nodelay.find(pbuf->curr.msg_id) != _coalesce_nodelay.end() ||
pbuf->len > _coalesce_bytes) {
// Coalescing disabled, or high priority message, or new message larger than the coalescence size
ret = flush_pending_msgs();
}
else {
// Coalescing enabled
// Start coalescing timer (if not already running)
_schedule_write();

struct itimerspec ts;
timerfd_gettime(_write_schedule_timer->fd, &ts);
double timer_seconds = (double) _coalesce_ms/1e3d - (ts.it_value.tv_sec + ts.it_value.tv_nsec/1e9d);

log_trace("Coalescence state: size=%u/%u, timeout=%.2f/%.2f, is high prio message=%d",
tx_buf.len,
std::min(_coalesce_bytes, TX_BUF_MAX_SIZE),
timer_seconds,
_coalesce_ms/1e3d,
_coalesce_nodelay.find(pbuf->curr.msg_id) != _coalesce_nodelay.end());
}
return ret;
}

void UdpEndpoint::_schedule_write()
{
if (!_write_scheduled) {
Mainloop::get_instance().mod_timeout(_write_schedule_timer, _coalesce_ms);
_write_scheduled = true;
}
}

int UdpEndpoint::flush_pending_msgs()
{
Mainloop::get_instance().mod_timeout(_write_schedule_timer, 0);
_write_scheduled = false;

if (tx_buf.len == 0) {
log_trace("No data in tx buffer, skipping write");
return 0;
}

struct sockaddr *sock;
socklen_t addrlen;

Expand All @@ -1333,11 +1412,6 @@ int UdpEndpoint::write_msg(const struct buffer *pbuf)
return -EINVAL;
}

/* TODO: send any pending data */
if (tx_buf.len > 0) {
;
}

bool sock_connected = false;
if (this->is_ipv6) {
addrlen = sizeof(sockaddr6);
Expand All @@ -1354,7 +1428,7 @@ int UdpEndpoint::write_msg(const struct buffer *pbuf)
return 0;
}

ssize_t r = ::sendto(fd, pbuf->data, pbuf->len, 0, sock, addrlen);
ssize_t r = ::sendto(fd, tx_buf.data, tx_buf.len, 0, sock, addrlen);
if (r == -1) {
if (errno != EAGAIN && errno != ECONNREFUSED && errno != ENETUNREACH) {
log_error("UDP %s: Error sending udp packet (%m)", _name.c_str());
Expand All @@ -1363,16 +1437,11 @@ int UdpEndpoint::write_msg(const struct buffer *pbuf)
};

_stat.write.total++;
_stat.write.bytes += pbuf->len;
_stat.write.bytes += r;

/* Incomplete packet, we warn and discard the rest */
if (r != (ssize_t)pbuf->len) {
_incomplete_msgs++;
log_debug("UDP %s: Discarding packet, incomplete write %zd but len=%u",
_name.c_str(),
r,
pbuf->len);
}
tx_buf.len = std::max(ssize_t(0), (ssize_t)tx_buf.len - r);
// memcpy isn't safe for overlapping regions
memmove(tx_buf.data, &tx_buf.data[r], tx_buf.len);

log_trace("UDP [%d]%s: Wrote %zd bytes", fd, _name.c_str(), r);

Expand Down
21 changes: 20 additions & 1 deletion src/endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <string>
#include <utility>
#include <vector>
#include <set>

#include "comm.h"
#include "pollable.h"
Expand Down Expand Up @@ -76,6 +77,9 @@ struct UdpEndpointConfig {
std::vector<uint8_t> allow_src_sys_in;
std::vector<uint8_t> block_src_sys_in;
std::string group;
unsigned long coalesce_bytes;
unsigned long coalesce_ms;
std::vector<uint32_t> coalesce_nodelay;
};

struct TcpEndpointConfig {
Expand Down Expand Up @@ -328,7 +332,7 @@ class UdpEndpoint : public Endpoint {
~UdpEndpoint() override;

int write_msg(const struct buffer *pbuf) override;
int flush_pending_msgs() override { return -ENOSYS; }
int flush_pending_msgs() override;

bool setup(UdpEndpointConfig config); ///< open socket and apply config

Expand All @@ -337,6 +341,11 @@ class UdpEndpoint : public Endpoint {
static int parse_udp_mode(const char *val, size_t val_len, void *storage, size_t storage_len);
static bool validate_config(const UdpEndpointConfig &config);

void add_no_coalesce_msg_id(uint32_t msg_id)
{
_coalesce_nodelay.insert(msg_id);
}

protected:
bool open(const char *ip, unsigned long port,
UdpEndpointConfig::Mode mode = UdpEndpointConfig::Mode::Client);
Expand All @@ -353,10 +362,20 @@ class UdpEndpoint : public Endpoint {
Timeout *nomessage_timeout = nullptr;
bool _nomessage_timeout_cb(void *data);

void _schedule_write();
bool _write_scheduled = false;

Timeout *_write_schedule_timer = nullptr;

unsigned int _coalesce_bytes = 0UL; // max coalescence size
unsigned long _coalesce_ms = 0UL; // max time to hold data to try to coalesce packets together

private:
bool is_ipv6;
struct sockaddr_in sockaddr;
struct sockaddr_in6 sockaddr6;
std::set<uint32_t> _coalesce_nodelay{}; // immediately send if a mavlink msg_id is in this set

};

class TcpEndpoint : public Endpoint {
Expand Down

0 comments on commit 4f2cfd9

Please sign in to comment.