diff --git a/.clang-format b/.clang-format index 1008f84..2716f15 100644 --- a/.clang-format +++ b/.clang-format @@ -18,6 +18,7 @@ BreakBeforeBraces: Custom BraceWrapping: AfterCaseLabel: false AfterClass: true + AfterStruct: true AfterControlStatement: Always AfterEnum: true AfterFunction: true diff --git a/CMakeLists.txt b/CMakeLists.txt index 675b9b3..a84b34f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -20,11 +20,11 @@ if (NOT WAITFREEQUEUE_DISABLE_TESTS) add_executable( test_${PROJECT_NAME} - WaitFreeMPSCQueue.h - test/TestWaitFreeMPSCQueue.cpp - test/TestHelpers.h - WaitFreeSPSCQueue.h - test/TestWaitFreeSPSCQueue.cpp + mpmc_queue.h + spsc_queue.h + test/test_mpsc_queue.cpp + test/test_helpers.h + test/test_spsc_queue.cpp ) target_include_directories( @@ -42,6 +42,6 @@ if (NOT WAITFREEQUEUE_DISABLE_TESTS) endif () install(FILES - WaitFreeMPSCQueue.h - WaitFreeSPSCQueue.h + mpmc_queue.h + spsc_queue.h DESTINATION include/waitfreequeue) diff --git a/README.md b/README.md index d26de68..19f82e5 100644 --- a/README.md +++ b/README.md @@ -1,20 +1,28 @@ [![main](https://github.com/marcusspangenberg/waitfreequeue/actions/workflows/cmake-multi-platform.yml/badge.svg?branch=main)](https://github.com/marcusspangenberg/waitfreequeue/actions/workflows/cmake-multi-platform.yml) # waitfreequeue -Single header, wait-free, multiple producer, single consumer queue for C++. + +Single header, wait-free queues for C++. +* mpsc_queue.h - Multiple producer, single consumer queue +* spsc_queue.h - Single producer, single consumer queue with queryable size + +Both queue types are currently being used in production systems, handling low-latency network packet delivery. # Requirements * C++17 * Tested on Linux (clang, gcc) x64, OSX arm64 (clang) and Windows x64 (vs2022) # Usage + +## mpsc_queue + ``` -#include "waitfreequeue/WaitFreeMPSCQueue.h" +#include "waitfreequeue/mpsc_queue.h" // T is the type of the elements in the queue. 
// S is the maximum number of elements in the queue. S must be a power of 2. -WaitFreeMPSCQueue queue; +waitfree::mpsc_queue<T, S> queue; // Push an item to the queue. @@ -37,13 +45,13 @@ struct ElementType }; ElementType element; -WaitFreeMPSCQueue queue; +waitfree::mpsc_queue<ElementType, S> queue; queue.push(1, "some string"); // Pop an item from the queue. -// Returns false if the queue is empty, otherwise true. item is only valid if the function returns true. -// Not thread safe with regards to other pop operations, thread safe with regards to push operations. +// Returns false if the queue is empty, otherwise true. The item output parameter is only valid if the function returns +// true. Not thread safe with regard to other pop operations, thread safe with regard to push operations. // Element values will be moved if possible, otherwise copied. If elements are not trivially destructible, the element // destructor will be called after copy. @@ -57,13 +65,67 @@ if (queue.pop(element)) { // Checks if the queue is empty. // Returns true if the queue is empty, otherwise false. -// Not thread safe with regards to pop operations, thread safe with regards to push operations. Regarding -// thread safety empty() is considered a pop operation. +// Not thread safe with regard to pop operations, thread safe with regard to push operations. if (queue.empty()) { // queue is empty } ``` +## spsc_queue + +``` +#include "waitfreequeue/spsc_queue.h" + + +// T is the type of the elements in the queue. +// S is the maximum number of elements in the queue. S must be a power of 2. +waitfree::spsc_queue<T, S> queue; + + +// Push an item to the queue. +// Pushing to a full queue triggers an assert if asserts are enabled, +// otherwise the behaviour is undefined. The queue should be dimensioned so that this never happens. +// Not thread safe with regard to other push operations. Thread safe with regard to pop operations. 
+T element; +queue.push(element); + + +// Elements can be constructed in place as well +struct ElementType +{ + ElementType(int a, const std::string& b) + : a_(a), b_(b) + {} + + int a_; + std::string b_; +}; + +ElementType element; +waitfree::spsc_queue<ElementType, S> queue; +queue.push(1, "some string"); + + +// Pop an item from the queue. +// Returns false if the queue is empty, otherwise true. The item output parameter is only valid if the function returns +// true. Not thread safe with regard to other pop operations. Thread safe with regard to push operations. +// Element values will be moved if possible, otherwise copied. If elements are not trivially destructible, the element +// destructor will be called after copy. + +T element; +if (queue.pop(element)) { + // element is valid +} else { + // queue is empty +} + + +// Get the current size of the queue. +// Thread safe with regard to push and pop operations. + +const size_t size = queue.size(); +``` + # License MIT License diff --git a/WaitFreeMPSCQueue.h b/mpmc_queue.h similarity index 69% rename from WaitFreeMPSCQueue.h rename to mpmc_queue.h index 15add18..71b8b87 100644 --- a/WaitFreeMPSCQueue.h +++ b/mpmc_queue.h @@ -38,43 +38,46 @@ SOFTWARE. #include #include +namespace waitfree +{ + /** - * Single header, wait-free, multiple producer, single consumer queue. + * Wait-free, multiple producer, single consumer queue. * * T is the type of the elements in the queue. * S is the maximum number of elements in the queue. S must be a power of 2. 
*/ template -class WaitFreeMPSCQueue +class mpsc_queue { public: - WaitFreeMPSCQueue() + mpsc_queue() : head_(0), tail_(0) { - static_assert(isPowerOfTwo(S)); + static_assert(is_power_of_two(S)); constexpr auto alignment = std::max(alignof(T), sizeof(void*)); - constexpr auto adjustedSize = roundUpToMultipleOf(sizeof(element) * S, alignment); + constexpr auto adjusted_size = round_up_to_multiple_of(sizeof(element) * S, alignment); #ifdef _WIN32 - auto allocResult = _aligned_malloc(adjustedSize, alignment); + auto alloc_result = _aligned_malloc(adjusted_size, alignment); #else - auto allocResult = aligned_alloc(alignment, adjustedSize); + auto alloc_result = aligned_alloc(alignment, adjusted_size); #endif - if (!allocResult) + if (!alloc_result) { throw std::bad_alloc(); } - memset(allocResult, 0, adjustedSize); - elements_ = reinterpret_cast(allocResult); + memset(alloc_result, 0, adjusted_size); + elements_ = reinterpret_cast(alloc_result); } - ~WaitFreeMPSCQueue() + ~mpsc_queue() { if constexpr (!std::is_trivially_destructible_v) { for (size_t i = 0; i < S; ++i) { - if (elements_[i].isUsed_.load(std::memory_order_seq_cst) == 0) + if (elements_[i].is_used_.load(std::memory_order_seq_cst) == 0) { continue; } @@ -100,10 +103,10 @@ class WaitFreeMPSCQueue template void push(U&&... 
item) noexcept { - const auto tail = tail_.fetch_add(1, std::memory_order_relaxed) & modValue_; + const auto tail = tail_.fetch_add(1, std::memory_order_relaxed) & mod_value_; new (&elements_[tail].value_) T(std::forward(item)...); - assert(elements_[tail].isUsed_.load(std::memory_order_acquire) == 0); - elements_[tail].isUsed_.store(1, std::memory_order_release); + assert(elements_[tail].is_used_.load(std::memory_order_acquire) == 0); + elements_[tail].is_used_.store(1, std::memory_order_release); } /** @@ -116,8 +119,8 @@ class WaitFreeMPSCQueue */ bool pop(T& item) noexcept { - const auto head = head_.fetch_add(1, std::memory_order_relaxed) & modValue_; - if (elements_[head].isUsed_.load(std::memory_order_acquire) == 0) + const auto head = head_.fetch_add(1, std::memory_order_relaxed) & mod_value_; + if (elements_[head].is_used_.load(std::memory_order_acquire) == 0) { head_.fetch_sub(1, std::memory_order_relaxed); return false; @@ -136,7 +139,7 @@ class WaitFreeMPSCQueue } } - elements_[head].isUsed_.store(0, std::memory_order_relaxed); + elements_[head].is_used_.store(0, std::memory_order_relaxed); return true; } @@ -151,32 +154,35 @@ class WaitFreeMPSCQueue */ [[nodiscard]] bool empty() const noexcept { - const auto head = head_.load(std::memory_order_relaxed) & modValue_; - return elements_[head].isUsed_.load(std::memory_order_acquire) == 0; + const auto head = head_.load(std::memory_order_relaxed) & mod_value_; + return elements_[head].is_used_.load(std::memory_order_acquire) == 0; } private: - static constexpr size_t cacheLineSize_ = 64; - static constexpr uint32_t modValue_ = S - 1; + static constexpr size_t cache_line_size_ = 64; + static constexpr uint32_t mod_value_ = S - 1; - struct element { - alignas(cacheLineSize_) T value_; - std::atomic isUsed_; + struct element + { + alignas(cache_line_size_) T value_; + std::atomic is_used_; }; - alignas(cacheLineSize_) element* elements_; - alignas(cacheLineSize_) std::atomic head_; - alignas(cacheLineSize_) 
std::atomic tail_; + alignas(cache_line_size_) element* elements_; + alignas(cache_line_size_) std::atomic head_; + alignas(cache_line_size_) std::atomic tail_; - static constexpr bool isPowerOfTwo(const size_t size) + static constexpr bool is_power_of_two(const size_t size) { return (size & (size - 1)) == 0; } - static constexpr size_t roundUpToMultipleOf(const size_t size, const size_t multiple) + static constexpr size_t round_up_to_multiple_of(const size_t size, const size_t multiple) { const auto remaining = size % multiple; - const auto adjustedSize = remaining == 0 ? size : size + (multiple - remaining); - return adjustedSize; + const auto adjusted_size = remaining == 0 ? size : size + (multiple - remaining); + return adjusted_size; } }; + +}// namespace waitfree diff --git a/WaitFreeSPSCQueue.h b/spsc_queue.h similarity index 78% rename from WaitFreeSPSCQueue.h rename to spsc_queue.h index a06801c..c520717 100644 --- a/WaitFreeSPSCQueue.h +++ b/spsc_queue.h @@ -38,6 +38,9 @@ SOFTWARE. #include #include +namespace waitfree +{ + /** * Single header, wait-free, single producer, single consumer queue with possibility * to query queue size. @@ -46,38 +49,38 @@ SOFTWARE. * S is the maximum number of elements in the queue. S must be a power of 2. 
*/ template -class WaitFreeSPSCQueue +class spsc_queue { public: - WaitFreeSPSCQueue() + spsc_queue() : size_(0), head_(0), tail_(0) { - static_assert(isPowerOfTwo(S)); + static_assert(is_power_of_two(S)); constexpr auto alignment = std::max(alignof(T), sizeof(void*)); - constexpr auto adjustedSize = roundUpToMultipleOf(sizeof(T) * S, alignment); + constexpr auto adjusted_size = round_up_to_multiple_of(sizeof(T) * S, alignment); #ifdef _WIN32 - auto allocResult = _aligned_malloc(adjustedSize, alignment); + auto alloc_result = _aligned_malloc(adjusted_size, alignment); #else - auto allocResult = aligned_alloc(alignment, adjustedSize); + auto alloc_result = aligned_alloc(alignment, adjusted_size); #endif - if (!allocResult) + if (!alloc_result) { throw std::bad_alloc(); } - memset(allocResult, 0, adjustedSize); - elements_ = reinterpret_cast(allocResult); + memset(alloc_result, 0, adjusted_size); + elements_ = reinterpret_cast(alloc_result); } - ~WaitFreeSPSCQueue() + ~spsc_queue() { if constexpr (!std::is_trivially_destructible_v) { const auto size = size_.load(); for (size_t i = 0; i < size; ++i) { - const auto head = (head_ + i) & modValue_; + const auto head = (head_ + i) & mod_value_; (&elements_[head])->~T(); } } @@ -97,7 +100,7 @@ class WaitFreeSPSCQueue void push(U&&... 
item) noexcept { const auto tail = tail_; - tail_ = (tail_ + 1) & modValue_; + tail_ = (tail_ + 1) & mod_value_; new (&elements_[tail]) T(std::forward(item)...); [[maybe_unused]] const auto oldValue = size_.fetch_add(1, std::memory_order_acq_rel); assert(oldValue < S); @@ -119,7 +122,7 @@ class WaitFreeSPSCQueue } const auto head = head_; - head_ = (head_ + 1) & modValue_; + head_ = (head_ + 1) & mod_value_; if constexpr (std::is_move_assignable_v) { @@ -149,23 +152,25 @@ class WaitFreeSPSCQueue } private: - static constexpr size_t cacheLineSize_ = 64; - static constexpr uint32_t modValue_ = S - 1; + static constexpr size_t cacheLine_size_ = 64; + static constexpr uint32_t mod_value_ = S - 1; T* elements_; std::atomic size_; - alignas(cacheLineSize_) size_t head_; - alignas(cacheLineSize_) size_t tail_; + alignas(cacheLine_size_) size_t head_; + alignas(cacheLine_size_) size_t tail_; - static constexpr bool isPowerOfTwo(const size_t size) + static constexpr bool is_power_of_two(const size_t size) { return (size & (size - 1)) == 0; } - static constexpr size_t roundUpToMultipleOf(const size_t size, const size_t multiple) + static constexpr size_t round_up_to_multiple_of(const size_t size, const size_t multiple) { const auto remaining = size % multiple; - const auto adjustedSize = remaining == 0 ? size : size + (multiple - remaining); - return adjustedSize; + const auto adjusted_size = remaining == 0 ? 
size : size + (multiple - remaining); + return adjusted_size; } }; + +}// namespace waitfree diff --git a/test/TestWaitFreeMPSCQueue.cpp b/test/TestWaitFreeMPSCQueue.cpp deleted file mode 100644 index f5389d7..0000000 --- a/test/TestWaitFreeMPSCQueue.cpp +++ /dev/null @@ -1,418 +0,0 @@ -#include "WaitFreeMPSCQueue.h" -#include "test/TestHelpers.h" -#include "gtest/gtest.h" -#include -#include -#include -#include -#include - -namespace -{ - -constexpr size_t numElements = 65536; -constexpr size_t numIterations = 4; - -constexpr uint64_t makeValue(const uint64_t threadId, const uint64_t iteration, const uint64_t elementId) -{ - return (threadId << 32) | (iteration << 16) | elementId; -} - -}// namespace - -TEST(TestWaitFreeQueue, isEmpty) -{ - const size_t totalElements = numElements * 2; - auto queue = std::make_unique>(); - - for (uint64_t i = 0; i < numElements; ++i) - { - const auto value = makeValue(0, 0, i); - queue->push(value); - } - EXPECT_FALSE(queue->empty()); - - for (uint64_t i = 0; i < numElements; ++i) - { - uint64_t result; - const auto popResult = queue->pop(result); - EXPECT_TRUE(popResult); - } - EXPECT_TRUE(queue->empty()); - - { - const auto value = makeValue(0, 0, 0); - queue->push(value); - } - EXPECT_FALSE(queue->empty()); - - { - uint64_t result; - const auto popResult = queue->pop(result); - EXPECT_TRUE(popResult); - } - EXPECT_TRUE(queue->empty()); -} - -TEST(TestWaitFreeQueue, multiThreadPushPopCorrectness) -{ - const size_t totalElements = numElements * numIterations * 4; - auto queue = std::make_unique>(); - SyncBarrier<3> syncPoint; - - size_t count = 0; - std::unordered_set pushValues; - for (size_t iteration = 0; iteration < numIterations; ++iteration) - { - for (uint64_t i = 0; i < numElements; ++i) - { - const auto value = makeValue(0, iteration, i); - queue->push(value); - pushValues.insert(value); - ++count; - } - } - - size_t count0 = 0; - std::unordered_set pushValues1; - std::unordered_set popValues1; - auto thread0 = 
std::make_unique([&queue, &syncPoint, &count0, &pushValues1, &popValues1]() { - syncPoint.arrive(0); - for (size_t iteration = 0; iteration < numIterations; ++iteration) - { - for (uint64_t i = 0; i < numElements; ++i) - { - const auto value = makeValue(1, iteration, i); - queue->push(value); - pushValues1.insert(value); - ++count0; - - uint64_t result; - const auto popResult = queue->pop(result); - EXPECT_TRUE(popResult); - if (popResult) - { - --count0; - popValues1.insert(result); - } - } - } - }); - - size_t count1 = 0; - std::unordered_set pushValues2; - auto thread1 = std::make_unique([&queue, &syncPoint, &count1, &pushValues2]() { - syncPoint.arrive(1); - for (size_t iteration = 0; iteration < numIterations; ++iteration) - { - for (uint64_t i = 0; i < numElements; ++i) - { - const auto value = makeValue(2, iteration, i); - queue->push(value); - pushValues2.insert(value); - ++count1; - } - } - }); - - size_t count2 = 0; - std::unordered_set pushValues3; - auto thread_2 = std::make_unique([&queue, &syncPoint, &count2, &pushValues3]() { - syncPoint.arrive(2); - for (size_t iteration = 0; iteration < numIterations; ++iteration) - { - for (uint64_t i = 0; i < numElements; ++i) - { - const auto value = makeValue(3, iteration, i); - queue->push(value); - pushValues3.insert(value); - ++count2; - } - } - }); - - syncPoint.run(); - thread0->join(); - thread1->join(); - thread_2->join(); - - count += count0 + count1 + count2; - std::unordered_set popValues; - for (size_t iteration = 0; iteration < numIterations * 3; ++iteration) - { - for (uint64_t i = 0; i < numElements; ++i) - { - uint64_t result; - const auto popResult = queue->pop(result); - EXPECT_TRUE(popResult); - if (popResult) - { - --count; - popValues.insert(result); - } - } - } - - pushValues.insert(pushValues1.begin(), pushValues1.end()); - pushValues.insert(pushValues2.begin(), pushValues2.end()); - pushValues.insert(pushValues3.begin(), pushValues3.end()); - popValues.insert(popValues1.begin(), 
popValues1.end()); - - EXPECT_EQ(0, count); - EXPECT_EQ(totalElements, pushValues.size()); - EXPECT_EQ(totalElements, popValues.size()); - EXPECT_TRUE(queue->empty()); -} - -TEST(TestWaitFreeQueue, multiThreadPushPopCorrectnessPopCanFail) -{ - const size_t totalElements = numElements * numIterations; - auto queue = std::make_unique>(); - - std::unordered_set popValues; - auto thread0 = std::make_unique([&queue, &popValues]() { - size_t popCount = 0; - while (popCount != numElements * numIterations) - { - uint64_t result; - if (queue->pop(result)) - { - popValues.insert(result); - ++popCount; - } - } - }); - - std::unordered_set pushValues; - auto thread1 = std::make_unique([&queue, &pushValues]() { - for (size_t iteration = 0; iteration < numIterations; ++iteration) - { - for (uint64_t i = 0; i < numElements; ++i) - { - const auto value = makeValue(1, iteration, i); - queue->push(value); - pushValues.insert(value); - std::this_thread::yield(); - } - } - }); - - thread0->join(); - thread1->join(); - - for (const auto& value : pushValues) - { - EXPECT_NE(popValues.find(value), popValues.end()); - } - - EXPECT_EQ(totalElements, pushValues.size()); - EXPECT_EQ(totalElements, popValues.size()); -} - -TEST(TestWaitFreeQueue, multiThreadPushPerformance) -{ - auto queue = std::make_unique>(); - SyncBarrier<2> syncPoint; - - auto popThread0 = std::make_unique([&queue, &syncPoint]() { - syncPoint.arrive(0); - ScopedStatsAverage stats("TestWaitFreeQueue::multiThreadPushPerformance 0"); - for (size_t iteration = 0; iteration < numIterations; ++iteration) - { - ScopedTimer timer; - for (uint32_t i = 0; i < numElements; ++i) - { - queue->push(i); - } - stats.push(timer.getMs()); - } - }); - - auto popThread1 = std::make_unique([&queue, &syncPoint]() { - syncPoint.arrive(1); - ScopedStatsAverage stats("TestWaitFreeQueue::multiThreadPushPerformance 1"); - for (size_t iteration = 0; iteration < numIterations; ++iteration) - { - ScopedTimer timer; - for (uint32_t i = 0; i < 
numElements; ++i) - { - queue->push(i); - } - stats.push(timer.getMs()); - } - }); - - syncPoint.run(); - popThread0->join(); - popThread1->join(); -} - -TEST(TestWaitFreeQueue, multiThreadPushPopPerformance) -{ - auto queue = std::make_unique>(); - SyncBarrier<2> syncPoint; - - auto popThread0 = std::make_unique([&queue, &syncPoint]() { - syncPoint.arrive(0); - ScopedStatsAverage stats("TestWaitFreeQueue::multiThreadPushPopPerformance push pop"); - for (size_t iteration = 0; iteration < numIterations; ++iteration) - { - ScopedTimer timer; - for (uint32_t i = 0; i < numElements; ++i) - { - queue->push(i); - uint32_t result; - queue->pop(result); - } - stats.push(timer.getMs()); - } - }); - - auto popThread1 = std::make_unique([&queue, &syncPoint]() { - syncPoint.arrive(1); - ScopedStatsAverage stats("TestWaitFreeQueue::multiThreadPushPopPerformance push"); - for (size_t iteration = 0; iteration < numIterations; ++iteration) - { - ScopedTimer timer; - for (uint32_t i = 0; i < numElements; ++i) - { - queue->push(i); - } - stats.push(timer.getMs()); - } - }); - - syncPoint.run(); - popThread0->join(); - popThread1->join(); -} - -TEST(TestWaitFreeQueue, popPerformance) -{ - auto queue = std::make_unique>(); - - ScopedStatsAverage stats("TestWaitFreeQueue::popPerformance"); - for (size_t iteration = 0; iteration < numIterations; ++iteration) - { - for (uint32_t i = 0; i < numElements; ++i) - { - queue->push(i); - } - - { - ScopedTimer timer; - for (uint32_t i = 0; i < numElements; ++i) - { - uint32_t result; - const auto popResult = queue->pop(result); - EXPECT_TRUE(popResult); - } - stats.push(timer.getMs()); - } - } -} - -TEST(TestWaitFreeQueue, popNonMovableWithNonTrivialDestructor) -{ - uint32_t count = 0; - - struct Element { - Element() = delete; - - explicit Element(const uint32_t a, uint32_t* count) - : a_(a), - count_(count) - { - (*count_)++; - } - - Element(const Element& o) - : a_(o.a_), - count_(o.count_) - { - (*count_)++; - } - Element(Element&& o) 
noexcept = delete; - - ~Element() - { - a_ = 0; - (*count_)--; - } - - Element& operator=(Element&& o) = delete; - Element& operator=(const Element& o) - { - if (this == &o) - { - return *this; - } - - a_ = o.a_; - count_ = o.count_; - return *this; - } - - uint32_t a_; - uint32_t* count_; - }; - auto queue = std::make_unique>(); - - queue->push(1, &count); - queue->push(2, &count); - - { - Element result(0, &count); - const auto popResult = queue->pop(result); - EXPECT_TRUE(popResult); - EXPECT_EQ(1, result.a_); - } - EXPECT_EQ(1, count); - { - Element result(0, &count); - const auto popResult = queue->pop(result); - EXPECT_TRUE(popResult); - EXPECT_EQ(2, result.a_); - } - EXPECT_EQ(0, count); -} - -TEST(TestWaitFreeQueue, popNonMovableWithTrivialDestructor) -{ - struct Element { - Element() - : a_(0) - { - } - explicit Element(const uint32_t a) - : a_(a) - { - } - - Element(const Element& o) = default; - Element(Element&& o) noexcept = delete; - - ~Element() = default; - - Element& operator=(Element&& o) = delete; - Element& operator=(const Element& o) = default; - uint32_t a_; - }; - auto queue = std::make_unique>(); - - queue->push(1); - queue->push(2); - - { - Element result; - const auto popResult = queue->pop(result); - EXPECT_TRUE(popResult); - EXPECT_EQ(1, result.a_); - } - { - Element result; - const auto popResult = queue->pop(result); - EXPECT_TRUE(popResult); - EXPECT_EQ(2, result.a_); - } -} diff --git a/test/TestWaitFreeSPSCQueue.cpp b/test/TestWaitFreeSPSCQueue.cpp deleted file mode 100644 index 8899e1f..0000000 --- a/test/TestWaitFreeSPSCQueue.cpp +++ /dev/null @@ -1,373 +0,0 @@ -#include "WaitFreeSPSCQueue.h" -#include "test/TestHelpers.h" -#include "gtest/gtest.h" -#include -#include -#include -#include -#include - -namespace -{ - -constexpr size_t numElements = 65536; -constexpr size_t numIterations = 4; - -constexpr uint64_t makeValue(const uint64_t threadId, const uint64_t iteration, const uint64_t elementId) -{ - return (threadId << 32) | 
(iteration << 16) | elementId; -} - -}// namespace - -TEST(TestSPSCQueue, size) -{ - const size_t totalElements = numElements * 2; - auto queue = std::make_unique>(); - - for (uint64_t i = 0; i < numElements; ++i) - { - const auto value = makeValue(0, 0, i); - queue->push(value); - } - EXPECT_NE(0, queue->size()); - - for (uint64_t i = 0; i < numElements; ++i) - { - uint64_t result; - const auto popResult = queue->pop(result); - EXPECT_TRUE(popResult); - } - EXPECT_EQ(0, queue->size()); - - { - const auto value = makeValue(0, 0, 0); - queue->push(value); - } - EXPECT_NE(0, queue->size()); - - { - uint64_t result; - const auto popResult = queue->pop(result); - EXPECT_TRUE(popResult); - } - EXPECT_EQ(0, queue->size()); -} - -TEST(TestSPSCQueue, multiThreadPushPopCorrectness) -{ - const size_t totalElements = numElements * numIterations * 2; - auto queue = std::make_unique>(); - SyncBarrier<2> syncPoint; - - size_t count = 0; - std::unordered_set pushValues; - for (size_t iteration = 0; iteration < numIterations; ++iteration) - { - for (uint64_t i = 0; i < numElements; ++i) - { - const auto value = makeValue(0, iteration, i); - queue->push(value); - pushValues.insert(value); - ++count; - } - } - - size_t count0 = 0; - std::unordered_set popValues1; - auto thread0 = std::make_unique([&]() { - syncPoint.arrive(0); - for (size_t iteration = 0; iteration < numIterations; ++iteration) - { - for (uint64_t i = 0; i < numElements; ++i) - { - uint64_t result; - const auto popResult = queue->pop(result); - EXPECT_TRUE(popResult); - if (popResult) - { - --count0; - popValues1.insert(result); - } - } - } - }); - - size_t count1 = 0; - std::unordered_set pushValues2; - auto thread1 = std::make_unique([&]() { - syncPoint.arrive(1); - for (size_t iteration = 0; iteration < numIterations; ++iteration) - { - for (uint64_t i = 0; i < numElements; ++i) - { - const auto value = makeValue(2, iteration, i); - queue->push(value); - pushValues2.insert(value); - ++count1; - } - } - }); - - 
syncPoint.run(); - thread0->join(); - thread1->join(); - - count += count0 + count1; - std::unordered_set popValues; - for (size_t iteration = 0; iteration < numIterations; ++iteration) - { - for (uint64_t i = 0; i < numElements; ++i) - { - uint64_t result; - const auto popResult = queue->pop(result); - EXPECT_TRUE(popResult); - if (popResult) - { - --count; - popValues.insert(result); - } - } - } - - pushValues.insert(pushValues2.begin(), pushValues2.end()); - popValues.insert(popValues1.begin(), popValues1.end()); - - EXPECT_EQ(0, count); - EXPECT_EQ(totalElements, pushValues.size()); - EXPECT_EQ(totalElements, popValues.size()); - EXPECT_EQ(0, queue->size()); -} - -TEST(TestSPSCQueue, multiThreadPushPopCorrectnessPopCanFail) -{ - const size_t totalElements = numElements * numIterations; - auto queue = std::make_unique>(); - - std::unordered_set popValues; - auto thread0 = std::make_unique([&queue, &popValues]() { - size_t popCount = 0; - while (popCount != numElements * numIterations) - { - uint64_t result; - if (queue->pop(result)) - { - popValues.insert(result); - ++popCount; - } - } - }); - - std::unordered_set pushValues; - auto thread1 = std::make_unique([&queue, &pushValues]() { - for (size_t iteration = 0; iteration < numIterations; ++iteration) - { - for (uint64_t i = 0; i < numElements; ++i) - { - const auto value = makeValue(1, iteration, i); - queue->push(value); - pushValues.insert(value); - std::this_thread::yield(); - } - } - }); - - thread0->join(); - thread1->join(); - - for (const auto& value : pushValues) - { - EXPECT_NE(popValues.find(value), popValues.end()); - } - - EXPECT_EQ(totalElements, pushValues.size()); - EXPECT_EQ(totalElements, popValues.size()); -} - -TEST(TestSPSCQueue, pushPerformance) -{ - auto queue = std::make_unique>(); - - { - ScopedStatsAverage stats("TestSPSCQueue::pushPerformance"); - for (size_t iteration = 0; iteration < numIterations; ++iteration) - { - ScopedTimer timer; - for (uint64_t i = 0; i < numElements; ++i) - 
{ - queue->push(i); - } - stats.push(timer.getMs()); - } - } -} - -TEST(TestSPSCQueue, multiThreadPushPopPerformance) -{ - auto queue = std::make_unique>(); - SyncBarrier<2> syncPoint; - - auto thread0 = std::make_unique([&]() { - syncPoint.arrive(0); - ScopedStatsAverage stats("TestSPSCQueue::multiThreadPushPopPerformance pop"); - for (size_t iteration = 0; iteration < numIterations; ++iteration) - { - ScopedTimer timer; - for (uint64_t i = 0; i < numElements; ++i) - { - uint64_t result; - queue->pop(result); - } - stats.push(timer.getMs()); - } - }); - - auto thread1 = std::make_unique([&]() { - syncPoint.arrive(1); - ScopedStatsAverage stats("TestSPSCQueue::multiThreadPushPopPerformance push"); - for (size_t iteration = 0; iteration < numIterations; ++iteration) - { - ScopedTimer timer; - for (uint64_t i = 0; i < numElements; ++i) - { - queue->push(i); - } - stats.push(timer.getMs()); - } - }); - - syncPoint.run(); - thread0->join(); - thread1->join(); -} - - -TEST(TestSPSCQueue, popPerformance) -{ - auto queue = std::make_unique>(); - - ScopedStatsAverage stats("TestSPSCQueue::popPerformance"); - for (size_t iteration = 0; iteration < numIterations; ++iteration) - { - for (uint64_t i = 0; i < numElements; ++i) - { - queue->push(i); - } - - { - ScopedTimer timer; - for (uint32_t i = 0; i < numElements; ++i) - { - uint64_t result; - const auto popResult = queue->pop(result); - EXPECT_TRUE(popResult); - } - stats.push(timer.getMs()); - } - } -} - -TEST(TestSPSCQueue, popNonMovableWithNonTrivialDestructor) -{ - uint32_t count = 0; - - struct Element { - Element() = delete; - - explicit Element(const uint32_t a, uint32_t* count) - : a_(a), - count_(count) - { - (*count_)++; - } - - Element(const Element& o) - : a_(o.a_), - count_(o.count_) - { - (*count_)++; - } - Element(Element&& o) noexcept = delete; - - ~Element() - { - a_ = 0; - (*count_)--; - } - - Element& operator=(Element&& o) = delete; - Element& operator=(const Element& o) - { - if (this == &o) - { - 
return *this; - } - - a_ = o.a_; - count_ = o.count_; - return *this; - } - - uint32_t a_; - uint32_t* count_; - }; - auto queue = std::make_unique>(); - - queue->push(1, &count); - queue->push(2, &count); - - { - Element result(0, &count); - const auto popResult = queue->pop(result); - EXPECT_TRUE(popResult); - EXPECT_EQ(1, result.a_); - } - EXPECT_EQ(1, count); - { - Element result(0, &count); - const auto popResult = queue->pop(result); - EXPECT_TRUE(popResult); - EXPECT_EQ(2, result.a_); - } - EXPECT_EQ(0, count); -} - -TEST(TestSPSCQueue, popNonMovableWithTrivialDestructor) -{ - struct Element { - Element() - : a_(0) - { - } - explicit Element(const uint32_t a) - : a_(a) - { - } - - Element(const Element& o) = default; - Element(Element&& o) noexcept = delete; - - ~Element() = default; - - Element& operator=(Element&& o) = delete; - Element& operator=(const Element& o) = default; - uint32_t a_; - }; - auto queue = std::make_unique>(); - - queue->push(1); - queue->push(2); - - { - Element result; - const auto popResult = queue->pop(result); - EXPECT_TRUE(popResult); - EXPECT_EQ(1, result.a_); - } - { - Element result; - const auto popResult = queue->pop(result); - EXPECT_TRUE(popResult); - EXPECT_EQ(2, result.a_); - } -} diff --git a/test/TestHelpers.h b/test/test_helpers.h similarity index 63% rename from test/TestHelpers.h rename to test/test_helpers.h index 8ed96fd..df72cf8 100644 --- a/test/TestHelpers.h +++ b/test/test_helpers.h @@ -8,15 +8,15 @@ #include #include -class ScopedTimer +class scoped_timer { public: - explicit ScopedTimer() + explicit scoped_timer() : start_(std::chrono::high_resolution_clock::now()) { } - [[nodiscard]] double getMs() const + [[nodiscard]] double get_ms() const { auto elapsed = std::chrono::high_resolution_clock::now() - start_; return std::chrono::duration(elapsed).count(); @@ -28,15 +28,15 @@ class ScopedTimer }; template -class ScopedStatsAverage +class scoped_stats_average { public: - explicit 
ScopedStatsAverage(std::string name) + explicit scoped_stats_average(std::string name) : name_(std::move(name)) { } - ~ScopedStatsAverage() + ~scoped_stats_average() { double sum = 0.0; for (const auto value : values_) @@ -60,23 +60,24 @@ class ScopedStatsAverage }; template -struct SyncBarrier { - std::atomic_flag flagsWait_[N] = {ATOMIC_FLAG_INIT}; - std::atomic_flag flagsContinue_[N] = {ATOMIC_FLAG_INIT}; +struct sync_barrier +{ + std::atomic_flag flags_wait_[N] = {ATOMIC_FLAG_INIT}; + std::atomic_flag flags_continue_[N] = {ATOMIC_FLAG_INIT}; - SyncBarrier() + sync_barrier() { for (size_t i = 0; i < N; ++i) { - flagsWait_[i].test_and_set(std::memory_order_acquire); - flagsContinue_[i].test_and_set(std::memory_order_acquire); + flags_wait_[i].test_and_set(std::memory_order_acquire); + flags_continue_[i].test_and_set(std::memory_order_acquire); } } void arrive(const size_t threadId) { - flagsWait_[threadId].clear(std::memory_order_release); - while (flagsContinue_[threadId].test_and_set(std::memory_order_acquire)) + flags_wait_[threadId].clear(std::memory_order_release); + while (flags_continue_[threadId].test_and_set(std::memory_order_acquire)) { } } @@ -85,14 +86,14 @@ struct SyncBarrier { { for (size_t i = 0; i < N; ++i) { - while (flagsWait_[i].test_and_set(std::memory_order_acquire)) + while (flags_wait_[i].test_and_set(std::memory_order_acquire)) { } } for (size_t i = 0; i < N; ++i) { - flagsContinue_[i].clear(std::memory_order_release); + flags_continue_[i].clear(std::memory_order_release); } } }; diff --git a/test/test_mpsc_queue.cpp b/test/test_mpsc_queue.cpp new file mode 100644 index 0000000..9de55a7 --- /dev/null +++ b/test/test_mpsc_queue.cpp @@ -0,0 +1,420 @@ +#include "mpmc_queue.h" +#include "test/test_helpers.h" +#include "gtest/gtest.h" +#include +#include +#include +#include +#include + +namespace +{ + +constexpr size_t num_elements = 65536; +constexpr size_t num_iterations = 4; + +constexpr uint64_t make_value(const uint64_t thread_id, const 
uint64_t iteration, const uint64_t element_id) +{ + return (thread_id << 32) | (iteration << 16) | element_id; +} + +}// namespace + +TEST(test_mpsc_queue, is_empty) +{ + const size_t total_elements = num_elements * 2; + auto queue = std::make_unique>(); + + for (uint64_t i = 0; i < num_elements; ++i) + { + const auto value = make_value(0, 0, i); + queue->push(value); + } + EXPECT_FALSE(queue->empty()); + + for (uint64_t i = 0; i < num_elements; ++i) + { + uint64_t result; + const auto pop_result = queue->pop(result); + EXPECT_TRUE(pop_result); + } + EXPECT_TRUE(queue->empty()); + + { + const auto value = make_value(0, 0, 0); + queue->push(value); + } + EXPECT_FALSE(queue->empty()); + + { + uint64_t result; + const auto pop_result = queue->pop(result); + EXPECT_TRUE(pop_result); + } + EXPECT_TRUE(queue->empty()); +} + +TEST(test_mpsc_queue, multi_thread_push_pop_correctness) +{ + const size_t total_elements = num_elements * num_iterations * 4; + auto queue = std::make_unique>(); + sync_barrier<3> sync_point; + + size_t count = 0; + std::unordered_set push_values; + for (size_t iteration = 0; iteration < num_iterations; ++iteration) + { + for (uint64_t i = 0; i < num_elements; ++i) + { + const auto value = make_value(0, iteration, i); + queue->push(value); + push_values.insert(value); + ++count; + } + } + + size_t count_0 = 0; + std::unordered_set push_values_1; + std::unordered_set pop_values_1; + auto thread_0 = std::make_unique([&queue, &sync_point, &count_0, &push_values_1, &pop_values_1]() { + sync_point.arrive(0); + for (size_t iteration = 0; iteration < num_iterations; ++iteration) + { + for (uint64_t i = 0; i < num_elements; ++i) + { + const auto value = make_value(1, iteration, i); + queue->push(value); + push_values_1.insert(value); + ++count_0; + + uint64_t result; + const auto pop_result = queue->pop(result); + EXPECT_TRUE(pop_result); + if (pop_result) + { + --count_0; + pop_values_1.insert(result); + } + } + } + }); + + size_t count_1 = 0; + 
std::unordered_set push_values_2; + auto thread_1 = std::make_unique([&queue, &sync_point, &count_1, &push_values_2]() { + sync_point.arrive(1); + for (size_t iteration = 0; iteration < num_iterations; ++iteration) + { + for (uint64_t i = 0; i < num_elements; ++i) + { + const auto value = make_value(2, iteration, i); + queue->push(value); + push_values_2.insert(value); + ++count_1; + } + } + }); + + size_t count_2 = 0; + std::unordered_set push_values3; + auto thread_2 = std::make_unique([&queue, &sync_point, &count_2, &push_values3]() { + sync_point.arrive(2); + for (size_t iteration = 0; iteration < num_iterations; ++iteration) + { + for (uint64_t i = 0; i < num_elements; ++i) + { + const auto value = make_value(3, iteration, i); + queue->push(value); + push_values3.insert(value); + ++count_2; + } + } + }); + + sync_point.run(); + thread_0->join(); + thread_1->join(); + thread_2->join(); + + count += count_0 + count_1 + count_2; + std::unordered_set pop_values; + for (size_t iteration = 0; iteration < num_iterations * 3; ++iteration) + { + for (uint64_t i = 0; i < num_elements; ++i) + { + uint64_t result; + const auto pop_result = queue->pop(result); + EXPECT_TRUE(pop_result); + if (pop_result) + { + --count; + pop_values.insert(result); + } + } + } + + push_values.insert(push_values_1.begin(), push_values_1.end()); + push_values.insert(push_values_2.begin(), push_values_2.end()); + push_values.insert(push_values3.begin(), push_values3.end()); + pop_values.insert(pop_values_1.begin(), pop_values_1.end()); + + EXPECT_EQ(0, count); + EXPECT_EQ(total_elements, push_values.size()); + EXPECT_EQ(total_elements, pop_values.size()); + EXPECT_TRUE(queue->empty()); +} + +TEST(test_mpsc_queue, multi_thread_push_pop_correctness_pop_can_fail) +{ + const size_t total_elements = num_elements * num_iterations; + auto queue = std::make_unique>(); + + std::unordered_set pop_values; + auto thread_0 = std::make_unique([&queue, &pop_values]() { + size_t popCount = 0; + while 
(popCount != num_elements * num_iterations) + { + uint64_t result; + if (queue->pop(result)) + { + pop_values.insert(result); + ++popCount; + } + } + }); + + std::unordered_set push_values; + auto thread_1 = std::make_unique([&queue, &push_values]() { + for (size_t iteration = 0; iteration < num_iterations; ++iteration) + { + for (uint64_t i = 0; i < num_elements; ++i) + { + const auto value = make_value(1, iteration, i); + queue->push(value); + push_values.insert(value); + std::this_thread::yield(); + } + } + }); + + thread_0->join(); + thread_1->join(); + + for (const auto& value : push_values) + { + EXPECT_NE(pop_values.find(value), pop_values.end()); + } + + EXPECT_EQ(total_elements, push_values.size()); + EXPECT_EQ(total_elements, pop_values.size()); +} + +TEST(test_mpsc_queue, multi_thread_push_performance) +{ + auto queue = std::make_unique>(); + sync_barrier<2> sync_point; + + auto pop_thread_0 = std::make_unique([&queue, &sync_point]() { + sync_point.arrive(0); + scoped_stats_average stats("test_mpsc_queue::multiThreadPushPerformance 0"); + for (size_t iteration = 0; iteration < num_iterations; ++iteration) + { + scoped_timer timer; + for (uint32_t i = 0; i < num_elements; ++i) + { + queue->push(i); + } + stats.push(timer.get_ms()); + } + }); + + auto pop_thread_1 = std::make_unique([&queue, &sync_point]() { + sync_point.arrive(1); + scoped_stats_average stats("test_mpsc_queue::multiThreadPushPerformance 1"); + for (size_t iteration = 0; iteration < num_iterations; ++iteration) + { + scoped_timer timer; + for (uint32_t i = 0; i < num_elements; ++i) + { + queue->push(i); + } + stats.push(timer.get_ms()); + } + }); + + sync_point.run(); + pop_thread_0->join(); + pop_thread_1->join(); +} + +TEST(test_mpsc_queue, multi_thread_push_pop_performance) +{ + auto queue = std::make_unique>(); + sync_barrier<2> sync_point; + + auto pop_thread_0 = std::make_unique([&queue, &sync_point]() { + sync_point.arrive(0); + scoped_stats_average 
stats("test_mpsc_queue::multiThreadPushPopPerformance push pop"); + for (size_t iteration = 0; iteration < num_iterations; ++iteration) + { + scoped_timer timer; + for (uint32_t i = 0; i < num_elements; ++i) + { + queue->push(i); + uint32_t result; + queue->pop(result); + } + stats.push(timer.get_ms()); + } + }); + + auto pop_thread_1 = std::make_unique([&queue, &sync_point]() { + sync_point.arrive(1); + scoped_stats_average stats("test_mpsc_queue::multiThreadPushPopPerformance push"); + for (size_t iteration = 0; iteration < num_iterations; ++iteration) + { + scoped_timer timer; + for (uint32_t i = 0; i < num_elements; ++i) + { + queue->push(i); + } + stats.push(timer.get_ms()); + } + }); + + sync_point.run(); + pop_thread_0->join(); + pop_thread_1->join(); +} + +TEST(test_mpsc_queue, pop_performance) +{ + auto queue = std::make_unique>(); + + scoped_stats_average stats("test_mpsc_queue::popPerformance"); + for (size_t iteration = 0; iteration < num_iterations; ++iteration) + { + for (uint32_t i = 0; i < num_elements; ++i) + { + queue->push(i); + } + + { + scoped_timer timer; + for (uint32_t i = 0; i < num_elements; ++i) + { + uint32_t result; + const auto pop_result = queue->pop(result); + EXPECT_TRUE(pop_result); + } + stats.push(timer.get_ms()); + } + } +} + +TEST(test_mpsc_queue, pop_non_movable_with_non_trivial_destructor) +{ + uint32_t count = 0; + + struct element + { + element() = delete; + + explicit element(const uint32_t a, uint32_t* count) + : a_(a), + count_(count) + { + (*count_)++; + } + + element(const element& o) + : a_(o.a_), + count_(o.count_) + { + (*count_)++; + } + element(element&& o) noexcept = delete; + + ~element() + { + a_ = 0; + (*count_)--; + } + + element& operator=(element&& o) = delete; + element& operator=(const element& o) + { + if (this == &o) + { + return *this; + } + + a_ = o.a_; + count_ = o.count_; + return *this; + } + + uint32_t a_; + uint32_t* count_; + }; + auto queue = std::make_unique>(); + + queue->push(1, &count); + 
queue->push(2, &count); + + { + element result(0, &count); + const auto pop_result = queue->pop(result); + EXPECT_TRUE(pop_result); + EXPECT_EQ(1, result.a_); + } + EXPECT_EQ(1, count); + { + element result(0, &count); + const auto pop_result = queue->pop(result); + EXPECT_TRUE(pop_result); + EXPECT_EQ(2, result.a_); + } + EXPECT_EQ(0, count); +} + +TEST(test_mpsc_queue, pop_non_movable_with_trivial_destructor) +{ + struct element + { + element() + : a_(0) + { + } + explicit element(const uint32_t a) + : a_(a) + { + } + + element(const element& o) = default; + element(element&& o) noexcept = delete; + + ~element() = default; + + element& operator=(element&& o) = delete; + element& operator=(const element& o) = default; + uint32_t a_; + }; + auto queue = std::make_unique>(); + + queue->push(1); + queue->push(2); + + { + element result; + const auto pop_result = queue->pop(result); + EXPECT_TRUE(pop_result); + EXPECT_EQ(1, result.a_); + } + { + element result; + const auto pop_result = queue->pop(result); + EXPECT_TRUE(pop_result); + EXPECT_EQ(2, result.a_); + } +} diff --git a/test/test_spsc_queue.cpp b/test/test_spsc_queue.cpp new file mode 100644 index 0000000..95301a1 --- /dev/null +++ b/test/test_spsc_queue.cpp @@ -0,0 +1,374 @@ +#include "spsc_queue.h" +#include "test/test_helpers.h" +#include "gtest/gtest.h" +#include +#include +#include +#include +#include + +namespace +{ + +constexpr size_t num_elements = 65536; +constexpr size_t num_iterations = 4; + +constexpr uint64_t make_value(const uint64_t thread_id, const uint64_t iteration, const uint64_t element_id) +{ + return (thread_id << 32) | (iteration << 16) | element_id; +} + +}// namespace + +TEST(test_spsc_queue, size) +{ + const size_t total_elements = num_elements * 2; + auto queue = std::make_unique>(); + + for (uint64_t i = 0; i < num_elements; ++i) + { + const auto value = make_value(0, 0, i); + queue->push(value); + } + EXPECT_NE(0, queue->size()); + + for (uint64_t i = 0; i < num_elements; ++i) + { 
+ uint64_t result; + const auto pop_result = queue->pop(result); + EXPECT_TRUE(pop_result); + } + EXPECT_EQ(0, queue->size()); + + { + const auto value = make_value(0, 0, 0); + queue->push(value); + } + EXPECT_NE(0, queue->size()); + + { + uint64_t result; + const auto pop_result = queue->pop(result); + EXPECT_TRUE(pop_result); + } + EXPECT_EQ(0, queue->size()); +} + +TEST(test_spsc_queue, multi_thread_push_pop_correctness) +{ + const size_t total_elements = num_elements * num_iterations * 2; + auto queue = std::make_unique>(); + sync_barrier<2> sync_point; + + size_t count = 0; + std::unordered_set push_values; + for (size_t iteration = 0; iteration < num_iterations; ++iteration) + { + for (uint64_t i = 0; i < num_elements; ++i) + { + const auto value = make_value(0, iteration, i); + queue->push(value); + push_values.insert(value); + ++count; + } + } + + size_t count0 = 0; + std::unordered_set pop_values_1; + auto thread_0 = std::make_unique([&]() { + sync_point.arrive(0); + for (size_t iteration = 0; iteration < num_iterations; ++iteration) + { + for (uint64_t i = 0; i < num_elements; ++i) + { + uint64_t result; + const auto pop_result = queue->pop(result); + EXPECT_TRUE(pop_result); + if (pop_result) + { + --count0; + pop_values_1.insert(result); + } + } + } + }); + + size_t count1 = 0; + std::unordered_set pushValues2; + auto thread_1 = std::make_unique([&]() { + sync_point.arrive(1); + for (size_t iteration = 0; iteration < num_iterations; ++iteration) + { + for (uint64_t i = 0; i < num_elements; ++i) + { + const auto value = make_value(2, iteration, i); + queue->push(value); + pushValues2.insert(value); + ++count1; + } + } + }); + + sync_point.run(); + thread_0->join(); + thread_1->join(); + + count += count0 + count1; + std::unordered_set pop_values; + for (size_t iteration = 0; iteration < num_iterations; ++iteration) + { + for (uint64_t i = 0; i < num_elements; ++i) + { + uint64_t result; + const auto pop_result = queue->pop(result); + 
EXPECT_TRUE(pop_result); + if (pop_result) + { + --count; + pop_values.insert(result); + } + } + } + + push_values.insert(pushValues2.begin(), pushValues2.end()); + pop_values.insert(pop_values_1.begin(), pop_values_1.end()); + + EXPECT_EQ(0, count); + EXPECT_EQ(total_elements, push_values.size()); + EXPECT_EQ(total_elements, pop_values.size()); + EXPECT_EQ(0, queue->size()); +} + +TEST(test_spsc_queue, multi_thread_push_pop_correctness_pop_can_fail) +{ + const size_t total_elements = num_elements * num_iterations; + auto queue = std::make_unique>(); + + std::unordered_set pop_values; + auto thread_0 = std::make_unique([&queue, &pop_values]() { + size_t pop_count = 0; + while (pop_count != num_elements * num_iterations) + { + uint64_t result; + if (queue->pop(result)) + { + pop_values.insert(result); + ++pop_count; + } + } + }); + + std::unordered_set push_values; + auto thread_1 = std::make_unique([&queue, &push_values]() { + for (size_t iteration = 0; iteration < num_iterations; ++iteration) + { + for (uint64_t i = 0; i < num_elements; ++i) + { + const auto value = make_value(1, iteration, i); + queue->push(value); + push_values.insert(value); + std::this_thread::yield(); + } + } + }); + + thread_0->join(); + thread_1->join(); + + for (const auto& value : push_values) + { + EXPECT_NE(pop_values.find(value), pop_values.end()); + } + + EXPECT_EQ(total_elements, push_values.size()); + EXPECT_EQ(total_elements, pop_values.size()); +} + +TEST(test_spsc_queue, push_performance) +{ + auto queue = std::make_unique>(); + + { + scoped_stats_average stats("test_spsc_queue::pushPerformance"); + for (size_t iteration = 0; iteration < num_iterations; ++iteration) + { + scoped_timer timer; + for (uint64_t i = 0; i < num_elements; ++i) + { + queue->push(i); + } + stats.push(timer.get_ms()); + } + } +} + +TEST(test_spsc_queue, multi_thread_push_pop_performance) +{ + auto queue = std::make_unique>(); + sync_barrier<2> sync_point; + + auto thread_0 = std::make_unique([&]() { + 
sync_point.arrive(0); + scoped_stats_average stats("test_spsc_queue::multiThreadPushPopPerformance pop"); + for (size_t iteration = 0; iteration < num_iterations; ++iteration) + { + scoped_timer timer; + for (uint64_t i = 0; i < num_elements; ++i) + { + uint64_t result; + queue->pop(result); + } + stats.push(timer.get_ms()); + } + }); + + auto thread_1 = std::make_unique([&]() { + sync_point.arrive(1); + scoped_stats_average stats("test_spsc_queue::multiThreadPushPopPerformance push"); + for (size_t iteration = 0; iteration < num_iterations; ++iteration) + { + scoped_timer timer; + for (uint64_t i = 0; i < num_elements; ++i) + { + queue->push(i); + } + stats.push(timer.get_ms()); + } + }); + + sync_point.run(); + thread_0->join(); + thread_1->join(); +} + +TEST(test_spsc_queue, pop_performance) +{ + auto queue = std::make_unique>(); + + scoped_stats_average stats("test_spsc_queue::popPerformance"); + for (size_t iteration = 0; iteration < num_iterations; ++iteration) + { + for (uint64_t i = 0; i < num_elements; ++i) + { + queue->push(i); + } + + { + scoped_timer timer; + for (uint32_t i = 0; i < num_elements; ++i) + { + uint64_t result; + const auto pop_result = queue->pop(result); + EXPECT_TRUE(pop_result); + } + stats.push(timer.get_ms()); + } + } +} + +TEST(test_spsc_queue, pop_non_movable_with_non_trivial_destructor) +{ + uint32_t count = 0; + + struct element + { + element() = delete; + + explicit element(const uint32_t a, uint32_t* count) + : a_(a), + count_(count) + { + (*count_)++; + } + + element(const element& o) + : a_(o.a_), + count_(o.count_) + { + (*count_)++; + } + element(element&& o) noexcept = delete; + + ~element() + { + a_ = 0; + (*count_)--; + } + + element& operator=(element&& o) = delete; + element& operator=(const element& o) + { + if (this == &o) + { + return *this; + } + + a_ = o.a_; + count_ = o.count_; + return *this; + } + + uint32_t a_; + uint32_t* count_; + }; + auto queue = std::make_unique>(); + + queue->push(1, &count); + 
queue->push(2, &count); + + { + element result(0, &count); + const auto pop_result = queue->pop(result); + EXPECT_TRUE(pop_result); + EXPECT_EQ(1, result.a_); + } + EXPECT_EQ(1, count); + { + element result(0, &count); + const auto pop_result = queue->pop(result); + EXPECT_TRUE(pop_result); + EXPECT_EQ(2, result.a_); + } + EXPECT_EQ(0, count); +} + +TEST(test_spsc_queue, pop_non_movable_with_trivial_destructor) +{ + struct element + { + element() + : a_(0) + { + } + explicit element(const uint32_t a) + : a_(a) + { + } + + element(const element& o) = default; + element(element&& o) noexcept = delete; + + ~element() = default; + + element& operator=(element&& o) = delete; + element& operator=(const element& o) = default; + uint32_t a_; + }; + auto queue = std::make_unique>(); + + queue->push(1); + queue->push(2); + + { + element result; + const auto pop_result = queue->pop(result); + EXPECT_TRUE(pop_result); + EXPECT_EQ(1, result.a_); + } + { + element result; + const auto pop_result = queue->pop(result); + EXPECT_TRUE(pop_result); + EXPECT_EQ(2, result.a_); + } +}