Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename files and classes #8

Merged
merged 2 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .clang-format
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ BreakBeforeBraces: Custom
BraceWrapping:
AfterCaseLabel: false
AfterClass: true
AfterStruct: true
AfterControlStatement: Always
AfterEnum: true
AfterFunction: true
Expand Down
14 changes: 7 additions & 7 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ if (NOT WAITFREEQUEUE_DISABLE_TESTS)

add_executable(
test_${PROJECT_NAME}
WaitFreeMPSCQueue.h
test/TestWaitFreeMPSCQueue.cpp
test/TestHelpers.h
WaitFreeSPSCQueue.h
test/TestWaitFreeSPSCQueue.cpp
mpmc_queue.h
spsc_queue.h
test/test_mpsc_queue.cpp
test/test_helpers.h
test/test_spsc_queue.cpp
)

target_include_directories(
Expand All @@ -42,6 +42,6 @@ if (NOT WAITFREEQUEUE_DISABLE_TESTS)
endif ()

install(FILES
WaitFreeMPSCQueue.h
WaitFreeSPSCQueue.h
mpmc_queue.h
spsc_queue.h
DESTINATION include/waitfreequeue)
78 changes: 70 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,20 +1,28 @@
[![main](https://github.com/marcusspangenberg/waitfreequeue/actions/workflows/cmake-multi-platform.yml/badge.svg?branch=main)](https://github.com/marcusspangenberg/waitfreequeue/actions/workflows/cmake-multi-platform.yml)

# waitfreequeue
Single header, wait-free, multiple producer, single consumer queue for C++.

Single header, wait-free queues for C++.
* mpsc_queue.h - Multiple producer, single consumer queue
* spsc_queue.h - Single producer, single consumer queue with queryable size

Both queue types are currently being used in production systems, handling low-latency network packet delivery.

# Requirements
* C++17
* Tested on Linux (clang, gcc) x64, OSX arm64 (clang) and Windows x64 (vs2022)

# Usage

## mpsc_queue

```
#include "waitfreequeue/WaitFreeMPSCQueue.h"
#include "waitfreequeue/mpsc_queue.h"


// T is the type of the elements in the queue.
// S is the maximum number of elements in the queue. S must be a power of 2.
WaitFreeMPSCQueue<T, S> queue;
waitfree::mpsc_queue<T, S> queue;


// Push an item to the queue.
Expand All @@ -37,13 +45,13 @@ struct ElementType
};

ElementType element;
WaitFreeMPSCQueue<ElementType, 16> queue;
waitfree::mpsc_queue<ElementType, 16> queue;
queue.push(1, "some string");


// Pop an item from the queue.
// Returns false if the queue is empty, otherwise true. item is only valid if the function returns true.
// Not thread safe with regards to other pop operations, thread safe with regards to push operations.
// Returns false if the queue is empty, otherwise true. The item output parameter is only valid if the function returns
// true. Not thread safe with regards to other pop operations, thread safe with regards to push operations.
// Element values will be moved if possible, otherwise copied. If elements are not trivially destructible, the element
// destructor will be called after copy.

Expand All @@ -57,13 +65,67 @@ if (queue.pop(element)) {

// Checks if the queue is empty.
// Returns true if the queue is empty, otherwise false.
// Not thread safe with regards to pop operations, thread safe with regards to push operations. Regarding
// thread safety empty() is considered a pop operation.
// Not thread safe with regards to pop operations, thread safe with regards to push operations.

if (queue.empty()) {
// queue is empty
}
```

## spsc_queue

```
#include "waitfreequeue/spsc_queue.h"


// T is the type of the elements in the queue.
// S is the maximum number of elements in the queue. S must be a power of 2.
waitfree::spsc_queue<T, S> queue;


// Push an item to the queue.
// Will assert if the queue is full if asserts are enabled,
// otherwise the behaviour is undefined. The queue should be dimensioned so that this never happens.
// Not thread safe with regards to other push operations. Thread safe with regards to pop operations.
T element;
queue.push(element);


// Elements can be constructed in place as well
struct ElementType
{
ElementType(int a, const std::string& b)
: a_(a), b_(b)
{}

int a_;
std::string b_;
};

ElementType element;
waitfree::spsc_queue<ElementType, 16> queue;
queue.push(1, "some string");


// Pop an item from the queue.
// Returns false if the queue is empty, otherwise true. The item output parameter is only valid if the function returns
// true. Not thread safe with regards to other pop operations. Thread safe with regards to push operations.
// Element values will be moved if possible, otherwise copied. If elements are not trivially destructible, the element
// destructor will be called after copy.

T element;
if (queue.pop(element)) {
// element is valid
} else {
// queue is empty
}


// Get the current size of the queue.
// Thread safe with regards to push and pop operations.

const size_t size = queue.size();
```

# License
MIT License
70 changes: 38 additions & 32 deletions WaitFreeMPSCQueue.h → mpmc_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,43 +38,46 @@ SOFTWARE.
#include <type_traits>
#include <utility>

namespace waitfree
{

/**
* Single header, wait-free, multiple producer, single consumer queue.
* Wait-free, multiple producer, single consumer queue.
*
* T is the type of the elements in the queue.
* S is the maximum number of elements in the queue. S must be a power of 2.
*/
template<typename T, size_t S>
class WaitFreeMPSCQueue
class mpsc_queue
{
public:
WaitFreeMPSCQueue()
mpsc_queue()
: head_(0),
tail_(0)
{
static_assert(isPowerOfTwo(S));
static_assert(is_power_of_two(S));
constexpr auto alignment = std::max(alignof(T), sizeof(void*));
constexpr auto adjustedSize = roundUpToMultipleOf(sizeof(element) * S, alignment);
constexpr auto adjusted_size = round_up_to_multiple_of(sizeof(element) * S, alignment);
#ifdef _WIN32
auto allocResult = _aligned_malloc(adjustedSize, alignment);
auto alloc_result = _aligned_malloc(adjusted_size, alignment);
#else
auto allocResult = aligned_alloc(alignment, adjustedSize);
auto alloc_result = aligned_alloc(alignment, adjusted_size);
#endif
if (!allocResult)
if (!alloc_result)
{
throw std::bad_alloc();
}
memset(allocResult, 0, adjustedSize);
elements_ = reinterpret_cast<element*>(allocResult);
memset(alloc_result, 0, adjusted_size);
elements_ = reinterpret_cast<element*>(alloc_result);
}

~WaitFreeMPSCQueue()
~mpsc_queue()
{
if constexpr (!std::is_trivially_destructible_v<T>)
{
for (size_t i = 0; i < S; ++i)
{
if (elements_[i].isUsed_.load(std::memory_order_seq_cst) == 0)
if (elements_[i].is_used_.load(std::memory_order_seq_cst) == 0)
{
continue;
}
Expand All @@ -100,10 +103,10 @@ class WaitFreeMPSCQueue
template<typename... U>
void push(U&&... item) noexcept
{
const auto tail = tail_.fetch_add(1, std::memory_order_relaxed) & modValue_;
const auto tail = tail_.fetch_add(1, std::memory_order_relaxed) & mod_value_;
new (&elements_[tail].value_) T(std::forward<U>(item)...);
assert(elements_[tail].isUsed_.load(std::memory_order_acquire) == 0);
elements_[tail].isUsed_.store(1, std::memory_order_release);
assert(elements_[tail].is_used_.load(std::memory_order_acquire) == 0);
elements_[tail].is_used_.store(1, std::memory_order_release);
}

/**
Expand All @@ -116,8 +119,8 @@ class WaitFreeMPSCQueue
*/
bool pop(T& item) noexcept
{
const auto head = head_.fetch_add(1, std::memory_order_relaxed) & modValue_;
if (elements_[head].isUsed_.load(std::memory_order_acquire) == 0)
const auto head = head_.fetch_add(1, std::memory_order_relaxed) & mod_value_;
if (elements_[head].is_used_.load(std::memory_order_acquire) == 0)
{
head_.fetch_sub(1, std::memory_order_relaxed);
return false;
Expand All @@ -136,7 +139,7 @@ class WaitFreeMPSCQueue
}
}

elements_[head].isUsed_.store(0, std::memory_order_relaxed);
elements_[head].is_used_.store(0, std::memory_order_relaxed);
return true;
}

Expand All @@ -151,32 +154,35 @@ class WaitFreeMPSCQueue
*/
[[nodiscard]] bool empty() const noexcept
{
const auto head = head_.load(std::memory_order_relaxed) & modValue_;
return elements_[head].isUsed_.load(std::memory_order_acquire) == 0;
const auto head = head_.load(std::memory_order_relaxed) & mod_value_;
return elements_[head].is_used_.load(std::memory_order_acquire) == 0;
}

private:
static constexpr size_t cacheLineSize_ = 64;
static constexpr uint32_t modValue_ = S - 1;
static constexpr size_t cache_line_size_ = 64;
static constexpr uint32_t mod_value_ = S - 1;

struct element {
alignas(cacheLineSize_) T value_;
std::atomic<uint_fast32_t> isUsed_;
struct element
{
alignas(cache_line_size_) T value_;
std::atomic<uint_fast32_t> is_used_;
};

alignas(cacheLineSize_) element* elements_;
alignas(cacheLineSize_) std::atomic<uint_fast32_t> head_;
alignas(cacheLineSize_) std::atomic<uint_fast32_t> tail_;
alignas(cache_line_size_) element* elements_;
alignas(cache_line_size_) std::atomic<uint_fast32_t> head_;
alignas(cache_line_size_) std::atomic<uint_fast32_t> tail_;

static constexpr bool isPowerOfTwo(const size_t size)
static constexpr bool is_power_of_two(const size_t size)
{
return (size & (size - 1)) == 0;
}

static constexpr size_t roundUpToMultipleOf(const size_t size, const size_t multiple)
static constexpr size_t round_up_to_multiple_of(const size_t size, const size_t multiple)
{
const auto remaining = size % multiple;
const auto adjustedSize = remaining == 0 ? size : size + (multiple - remaining);
return adjustedSize;
const auto adjusted_size = remaining == 0 ? size : size + (multiple - remaining);
return adjusted_size;
}
};

}// namespace waitfree
Loading
Loading