From 02bfdad38e9a168af9eadf80afeff046779bebbc Mon Sep 17 00:00:00 2001 From: Julian Kent Date: Tue, 2 Feb 2021 16:22:19 +0100 Subject: [PATCH 1/8] Add a deduplication utility for raw buffers --- Makefile.am | 1 + src/mavlink-router/dedup.cpp | 89 ++++++++++++++++++++++++++++++++++++ src/mavlink-router/dedup.h | 42 +++++++++++++++++ 3 files changed, 132 insertions(+) create mode 100644 src/mavlink-router/dedup.cpp create mode 100644 src/mavlink-router/dedup.h diff --git a/Makefile.am b/Makefile.am index 47fdd92b..9a478945 100644 --- a/Makefile.am +++ b/Makefile.am @@ -139,6 +139,7 @@ mavlink_routerd_SOURCES = \ src/common/conf_file.cpp \ src/common/conf_file.h \ src/common/dbg.h \ + src/mavlink-router/dedup.cpp \ src/common/mavlink.h \ src/mavlink-router/endpoint.cpp \ src/mavlink-router/endpoint.h \ diff --git a/src/mavlink-router/dedup.cpp b/src/mavlink-router/dedup.cpp new file mode 100644 index 00000000..4823d9f8 --- /dev/null +++ b/src/mavlink-router/dedup.cpp @@ -0,0 +1,89 @@ +/* + * This file is part of the MAVLink Router project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include "dedup.h" + +#include +#include +#include +#include + +class DedupImpl { + using hash_t = uint64_t; + using time_t = uint64_t; + +public: + + bool add_check_packet(const uint8_t* buffer, uint32_t size, uint32_t dedup_period_ms) + { + bool new_packet_hash = true; + using namespace std::chrono; + time_t timestamp = duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); + // pop data from front queue, delete corresponding data from multiset + while (_time_hash_queue.size() > 0 && _time_hash_queue.front().first > timestamp + dedup_period_ms) { + hash_t hash_to_delete = _time_hash_queue.front().second; + _multiset.erase(_multiset.find(hash_to_delete)); // NOTE: don't call erase on key, it will delete all + _time_hash_queue.pop(); + } + + // hash buffer + // TODO: with C++17 use a string_view instead, or use a custom hash function + _hash_buffer.assign((const char*)buffer, (uint64_t)size); + hash_t hash = std::hash{}(_hash_buffer); + + if (_multiset.find(hash) != _multiset.end()) { + new_packet_hash = false; + } + + // add hash and timestamp to back of queue, and add another copy of hash to multiset + _multiset.insert(hash); + _time_hash_queue.emplace(timestamp, hash); + + return new_packet_hash; + } + +private: + std::queue> _time_hash_queue; + std::unordered_multiset _multiset; + std::string _hash_buffer; +}; + +Dedup::Dedup(uint32_t dedup_period_ms) : _dedup_period_ms(dedup_period_ms), _impl(new DedupImpl()) +{ +} + +Dedup::~Dedup() +{ +} + +void Dedup::set_dedup_period(uint32_t dedup_period_ms) +{ + _dedup_period_ms = dedup_period_ms; +} + +Dedup::PacketStatus Dedup::add_check_packet(const uint8_t* buffer, uint32_t size) +{ + // short circuit if disabled + if (_dedup_period_ms == 0) { + return PacketStatus::NEW_PACKET_OR_TIMED_OUT; + } + + if (_impl->add_check_packet(buffer, size, _dedup_period_ms)) { + return PacketStatus::NEW_PACKET_OR_TIMED_OUT; + } + return PacketStatus::ALREADY_EXISTS_IN_BUFFER; +} diff --git a/src/mavlink-router/dedup.h b/src/mavlink-router/dedup.h new file mode 100644 index 00000000..692073f5 --- /dev/null +++ b/src/mavlink-router/dedup.h @@ -0,0 +1,42 @@ +/* + * This file is part of the MAVLink Router project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + + +#pragma once + +#include + +class DedupImpl; +class Dedup { +public: + + enum class PacketStatus { + NEW_PACKET_OR_TIMED_OUT, + ALREADY_EXISTS_IN_BUFFER + }; + + Dedup(uint32_t dedup_period_ms = 0); + ~Dedup(); + + void set_dedup_period(uint32_t dedup_period_ms); + + PacketStatus add_check_packet(const uint8_t* buffer, uint32_t size); +private: + + uint32_t _dedup_period_ms; + std::unique_ptr _impl; +}; From 85b8a980d6715301595584660cb8e1ebdae2d250 Mon Sep 17 00:00:00 2001 From: Julian Kent Date: Wed, 3 Feb 2021 15:35:56 +0100 Subject: [PATCH 2/8] Insert deduplication into endpoint handling --- Makefile.am | 2 ++ src/mavlink-router/endpoint.cpp | 9 ++++++++- src/mavlink-router/endpoint.h | 3 +++ src/mavlink-router/mainloop.cpp | 10 ++++++++++ src/mavlink-router/mainloop.h | 5 +++++ 5 files changed, 28 insertions(+), 1 deletion(-) diff --git a/Makefile.am b/Makefile.am index 9a478945..99a02876 100644 --- a/Makefile.am +++ b/Makefile.am @@ -214,6 +214,8 @@ mainloop_test_SOURCES = \ src/mavlink-router/binlog.cpp \ src/mavlink-router/binlog.h \ src/mavlink-router/comm.h \ + src/mavlink-router/dedup.h \ + src/mavlink-router/dedup.cpp \ src/mavlink-router/endpoint.h \ src/mavlink-router/endpoint.cpp \ src/mavlink-router/endpoint.h \ diff --git a/src/mavlink-router/endpoint.cpp b/src/mavlink-router/endpoint.cpp index f2845f4d..62ddf8d0 100644 --- a/src/mavlink-router/endpoint.cpp +++ b/src/mavlink-router/endpoint.cpp @@ -81,7 +81,7 @@ int Endpoint::handle_read() while ((r = read_msg(&buf, &target_sysid, &target_compid, &src_sysid, &src_compid, &msg_id)) > 0) { - if (allowed_by_filter(msg_id) && allowed_by_dropout()) + if (allowed_by_filter(msg_id) && allowed_by_dropout() && allowed_by_dedup(&buf)) Mainloop::get_instance().route_msg(&buf, target_sysid, target_compid, src_sysid, src_compid, msg_id); } @@ -374,6 +374,13 @@ bool Endpoint::allowed_by_dropout() return true; } +bool Endpoint::allowed_by_dedup(const buffer* buf) +{ + if (Mainloop::get_instance().add_check_dedup(buf)) { + return false; + } + return true; +} void Endpoint::postprocess_msg(int target_sysid, int target_compid, uint8_t src_sysid, uint8_t src_compid, uint32_t msg_id) diff --git a/src/mavlink-router/endpoint.h b/src/mavlink-router/endpoint.h index 496f9713..7c2dcc81 100644 --- a/src/mavlink-router/endpoint.h +++ b/src/mavlink-router/endpoint.h @@ -103,9 +103,12 @@ class Endpoint : public Pollable { bool allowed_by_filter(uint32_t msg_id); void add_message_to_filter(uint32_t msg_id) { _message_filter.push_back(msg_id); } void add_message_to_nodelay(uint32_t msg_id) { _message_nodelay.push_back(msg_id); } + bool allowed_by_dropout(); void set_dropout_percentage (uint32_t dropout_percentage) { _dropout_percentage = dropout_percentage; } + bool allowed_by_dedup(const buffer* buf); + void start_expire_timer(); void reset_expire_timer(); diff --git a/src/mavlink-router/mainloop.cpp b/src/mavlink-router/mainloop.cpp index d3aeb37c..65c1e9bf 100644 --- a/src/mavlink-router/mainloop.cpp +++ b/src/mavlink-router/mainloop.cpp @@ -394,6 +394,16 @@ static bool _print_statistics_timeout_cb(void *data) return true; } +bool Mainloop::add_check_dedup(const buffer* buf) +{ + Dedup::PacketStatus status = _dedup.add_check_packet(buf->data, buf->len); + if (status == Dedup::PacketStatus::NEW_PACKET_OR_TIMED_OUT) { + return true; + } + return false; +} + + bool Mainloop::remove_dynamic_endpoint(Endpoint *endpoint) { if (!endpoint) { diff --git a/src/mavlink-router/mainloop.h b/src/mavlink-router/mainloop.h index 308eb501..c742899d 100644 --- a/src/mavlink-router/mainloop.h +++ b/src/mavlink-router/mainloop.h @@ -25,6 +25,7 @@ #include "binlog.h" #include "comm.h" +#include "dedup.h" #include "endpoint.h" #include "timeout.h" #include "ulog.h" @@ -86,6 +87,8 @@ class Mainloop { bool remove_dynamic_endpoint(const dynamic_command& command); bool remove_dynamic_endpoint(Endpoint *endpoint); + bool add_check_dedup(const buffer* buf); + void print_statistics(); int epollfd = -1; @@ -135,6 +138,8 @@ class Mainloop { Timeout *_timeouts = nullptr; + Dedup _dedup; + std::atomic _should_exit {false}; struct { From 23e81ad71bcf3ad15cd26eb77e5b4d5778e6823c Mon Sep 17 00:00:00 2001 From: Julian Kent Date: Thu, 4 Feb 2021 09:38:40 +0100 Subject: [PATCH 3/8] Small improvements on dedup logic --- src/mavlink-router/dedup.cpp | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/src/mavlink-router/dedup.cpp b/src/mavlink-router/dedup.cpp index 4823d9f8..63ae7fd6 100644 --- a/src/mavlink-router/dedup.cpp +++ b/src/mavlink-router/dedup.cpp @@ -24,19 +24,18 @@ class DedupImpl { using hash_t = uint64_t; - using time_t = uint64_t; + using time_t = uint32_t; public: bool add_check_packet(const uint8_t* buffer, uint32_t size, uint32_t dedup_period_ms) { - bool new_packet_hash = true; using namespace std::chrono; - time_t timestamp = duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); + time_t timestamp = duration_cast(std::chrono::system_clock::now() - _start_time).count(); // pop data from front queue, delete corresponding data from multiset while (_time_hash_queue.size() > 0 && _time_hash_queue.front().first > timestamp + dedup_period_ms) { hash_t hash_to_delete = _time_hash_queue.front().second; - _multiset.erase(_multiset.find(hash_to_delete)); // NOTE: don't call erase on key, it will delete all + _packet_hash_set.erase(_packet_hash_set.find(hash_to_delete)); _time_hash_queue.pop(); } @@ -45,20 +44,24 @@ class DedupImpl { _hash_buffer.assign((const char*)buffer, (uint64_t)size); hash_t hash = std::hash{}(_hash_buffer); - if (_multiset.find(hash) != _multiset.end()) { + bool new_packet_hash = true; + if (_packet_hash_set.find(hash) == _packet_hash_set.end()) { + // add hash and timestamp to back of queue, and add hash to multiset + _packet_hash_set.insert(hash); + _time_hash_queue.emplace(timestamp, hash); + } else { new_packet_hash = false; } - // add hash and timestamp to back of queue, and add another copy of hash to multiset - _multiset.insert(hash); - _time_hash_queue.emplace(timestamp, hash); return new_packet_hash; } private: + const std::chrono::time_point _start_time; + std::queue> _time_hash_queue; - std::unordered_multiset _multiset; + std::unordered_set _packet_hash_set; std::string _hash_buffer; }; From e57b024aae9130fcb222b784dab861d2b19debf5 Mon Sep 17 00:00:00 2001 From: Julian Kent Date: Thu, 11 Feb 2021 17:48:07 +0100 Subject: [PATCH 4/8] Fix dedup bool logic --- src/mavlink-router/endpoint.cpp | 5 +---- src/mavlink-router/mainloop.cpp | 6 +----- 2 files changed, 2 insertions(+), 9 deletions(-) diff --git a/src/mavlink-router/endpoint.cpp b/src/mavlink-router/endpoint.cpp index 62ddf8d0..fabe8787 100644 --- a/src/mavlink-router/endpoint.cpp +++ b/src/mavlink-router/endpoint.cpp @@ -376,10 +376,7 @@ bool Endpoint::allowed_by_dropout() bool Endpoint::allowed_by_dedup(const buffer* buf) { - if (Mainloop::get_instance().add_check_dedup(buf)) { - return false; - } - return true; + return Mainloop::get_instance().add_check_dedup(buf); } void Endpoint::postprocess_msg(int target_sysid, int target_compid, uint8_t src_sysid, diff --git a/src/mavlink-router/mainloop.cpp b/src/mavlink-router/mainloop.cpp index 65c1e9bf..7e30bb99 100644 --- a/src/mavlink-router/mainloop.cpp +++ b/src/mavlink-router/mainloop.cpp @@ -396,11 +396,7 @@ static bool _print_statistics_timeout_cb(void *data) bool Mainloop::add_check_dedup(const buffer* buf) { - Dedup::PacketStatus status = _dedup.add_check_packet(buf->data, buf->len); - if (status == Dedup::PacketStatus::NEW_PACKET_OR_TIMED_OUT) { - return true; - } - return false; + return _dedup.add_check_packet(buf->data, buf->len) == Dedup::PacketStatus::NEW_PACKET_OR_TIMED_OUT; } From bf9ae79a2cf99d990ab434972448686d9cb86714 Mon Sep 17 00:00:00 2001 From: Julian Kent Date: Wed, 17 Mar 2021 08:13:49 +0100 Subject: [PATCH 5/8] Fix dedup queue pop condition --- src/mavlink-router/dedup.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mavlink-router/dedup.cpp b/src/mavlink-router/dedup.cpp index 63ae7fd6..64d33086 100644 --- a/src/mavlink-router/dedup.cpp +++ b/src/mavlink-router/dedup.cpp @@ -33,7 +33,7 @@ class DedupImpl { using namespace std::chrono; time_t timestamp = duration_cast(std::chrono::system_clock::now() - _start_time).count(); // pop data from front queue, delete corresponding data from multiset - while (_time_hash_queue.size() > 0 && _time_hash_queue.front().first > timestamp + dedup_period_ms) { + while (_time_hash_queue.size() > 0 && timestamp > _time_hash_queue.front().first + dedup_period_ms) { hash_t hash_to_delete = _time_hash_queue.front().second; _packet_hash_set.erase(_packet_hash_set.find(hash_to_delete)); _time_hash_queue.pop(); From 5a20c3a7e3c7c1e79f2e3ae6c26335ff2276d8b3 Mon Sep 17 00:00:00 2001 From: Julian Kent Date: Wed, 17 Mar 2021 08:17:03 +0100 Subject: [PATCH 6/8] Correctly initialize _start_time --- src/mavlink-router/dedup.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/mavlink-router/dedup.cpp b/src/mavlink-router/dedup.cpp index 64d33086..f64bffeb 100644 --- a/src/mavlink-router/dedup.cpp +++ b/src/mavlink-router/dedup.cpp @@ -25,7 +25,9 @@ class DedupImpl { using hash_t = uint64_t; using time_t = uint32_t; - + DedupImpl() : _start_time(std::chrono::system_clock::now()) + { + } public: bool add_check_packet(const uint8_t* buffer, uint32_t size, uint32_t dedup_period_ms) From 66526643bbbb96043ca965fa21f82ddfb452b275 Mon Sep 17 00:00:00 2001 From: Julian Kent Date: Wed, 17 Mar 2021 08:57:09 +0100 Subject: [PATCH 7/8] Only track source sys/comp ids *after* filtering/dedup --- src/mavlink-router/dedup.cpp | 2 +- src/mavlink-router/endpoint.cpp | 7 +++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/src/mavlink-router/dedup.cpp b/src/mavlink-router/dedup.cpp index f64bffeb..031f9bbf 100644 --- a/src/mavlink-router/dedup.cpp +++ b/src/mavlink-router/dedup.cpp @@ -25,10 +25,10 @@ class DedupImpl { using hash_t = uint64_t; using time_t = uint32_t; +public: DedupImpl() : _start_time(std::chrono::system_clock::now()) { } -public: bool add_check_packet(const uint8_t* buffer, uint32_t size, uint32_t dedup_period_ms) { diff --git a/src/mavlink-router/endpoint.cpp b/src/mavlink-router/endpoint.cpp index fabe8787..df4cab64 100644 --- a/src/mavlink-router/endpoint.cpp +++ b/src/mavlink-router/endpoint.cpp @@ -81,9 +81,13 @@ int Endpoint::handle_read() while ((r = read_msg(&buf, &target_sysid, &target_compid, &src_sysid, &src_compid, &msg_id)) > 0) { - if (allowed_by_filter(msg_id) && allowed_by_dropout() && allowed_by_dedup(&buf)) + if (allowed_by_filter(msg_id) && allowed_by_dropout() && allowed_by_dedup(&buf)) { + if (r == ReadOk) { + _add_sys_comp_id(((uint16_t)src_sysid << 8) | src_compid); + } Mainloop::get_instance().route_msg(&buf, target_sysid, target_compid, src_sysid, src_compid, msg_id); + } } return r; @@ -234,7 +238,6 @@ int Endpoint::read_msg(struct buffer *pbuf, int *target_sysid, int *target_compi _stat.read.crc_error_bytes += expected_size; return 0; } - _add_sys_comp_id(((uint16_t)*src_sysid << 8) | *src_compid); } _stat.read.handled++; From 437bb983d280bcd13146eb750284dc5b469c5add Mon Sep 17 00:00:00 2001 From: Julian Kent Date: Wed, 17 Mar 2021 09:01:39 +0100 Subject: [PATCH 8/8] If there is a CRC error still process the remaining messages --- src/mavlink-router/endpoint.cpp | 4 ++-- src/mavlink-router/endpoint.h | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/mavlink-router/endpoint.cpp b/src/mavlink-router/endpoint.cpp index df4cab64..e6d3d2d9 100644 --- a/src/mavlink-router/endpoint.cpp +++ b/src/mavlink-router/endpoint.cpp @@ -81,7 +81,7 @@ int Endpoint::handle_read() while ((r = read_msg(&buf, &target_sysid, &target_compid, &src_sysid, &src_compid, &msg_id)) > 0) { - if (allowed_by_filter(msg_id) && allowed_by_dropout() && allowed_by_dedup(&buf)) { + if (r != CrcError && allowed_by_filter(msg_id) && allowed_by_dropout() && allowed_by_dedup(&buf)) { if (r == ReadOk) { _add_sys_comp_id(((uint16_t)src_sysid << 8) | src_compid); } @@ -236,7 +236,7 @@ int Endpoint::read_msg(struct buffer *pbuf, int *target_sysid, int *target_compi if (!_check_crc(msg_entry)) { _stat.read.crc_error++; _stat.read.crc_error_bytes += expected_size; - return 0; + return CrcError; } } diff --git a/src/mavlink-router/endpoint.h b/src/mavlink-router/endpoint.h index 7c2dcc81..710f3287 100644 --- a/src/mavlink-router/endpoint.h +++ b/src/mavlink-router/endpoint.h @@ -74,6 +74,7 @@ class Endpoint : public Pollable { enum read_msg_result { ReadOk = 1, ReadUnkownMsg, + CrcError }; Endpoint(const std::string& name);