diff --git a/README.md b/README.md
index a95a08c0..6f683c85 100644
--- a/README.md
+++ b/README.md
@@ -101,14 +101,13 @@ if you prefer to use only lock-free data structures.
## Provided data structures
-| Name | Module in `Saturn` (in `Saturn_lockfree`) | Description | Sources |
-| ------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
-| Treiber Stack | [`Stack`](https://ocaml-multicore.github.io/saturn/saturn_lockfree/Lockfree/Stack/index.html) (same) | A classic multi-producer multi-consumer stack, robust and flexible. Recommended starting point when needing a LIFO structure | |
-| Michael-Scott Queue | [`Queue`](https://ocaml-multicore.github.io/saturn/saturn_lockfree/Lockfree/Queue/index.html) (same) | A classic multi-producer multi-consumer queue, robust and flexible. Recommended starting point when needing a FIFO structure. | [Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms](https://www.cs.rochester.edu/~scott/papers/1996_PODC_queues.pdf) |
-| Chase-Lev Work-Stealing Dequeue | [`Work_stealing_deque`](https://ocaml-multicore.github.io/saturn/saturn_lockfree/Lockfree/Work_stealing_deque/index.html) (same) | Single-producer, multi-consumer, dynamic-size, double-ended queue (deque). Ideal for throughput-focused scheduling using per-core work distribution. Note, `pop` and `steal` follow different ordering (respectively LIFO and FIFO) and have different linearisation contraints. | [Dynamic Circular Work-Stealing Deque](https://dl.acm.org/doi/10.1145/1073970.1073974) and [Correct and Efficient Work-Stealing for Weak Memory Models](https://dl.acm.org/doi/abs/10.1145/2442516.2442524)) |
-| SPSC Queue | [`Single_prod_single_cons_queue`](https://ocaml-multicore.github.io/saturn/saturn_lockfree/Lockfree/Single_prod_single_cons_queue/index.html) (same) | Simple single-producer single-consumer fixed-size queue. Thread-safe as long as at most one thread acts as producer and at most one as consumer at any single point in time. | |
-| MPMC Bounded Relaxed Queue | [`Relaxed_queue`](https://ocaml-multicore.github.io/saturn/saturn/Saturn/Relaxed_queue/index.html) ([same](https://ocaml-multicore.github.io/saturn/saturn_lockfree/Lockfree/Relaxed_queue/index.html)) | Multi-producer, multi-consumer, fixed-size relaxed queue. Optimised for high number of threads. Not strictly FIFO. Note, it exposes two interfaces: a lockfree and a non-lockfree (albeit more practical) one. See the `mli` for details. | |
-| MPSC Queue | [`Single_consumer_queue`](https://ocaml-multicore.github.io/saturn/saturn_lockfree/Lockfree/Single_consumer_queue/index.html) (same) | A multi-producer, single-consumer, thread-safe queue without support for cancellation. This makes a good data structure for a scheduler's run queue. It is used in [Eio](https://github.com/ocaml-multicore/eio). | It is a single consumer version of the queue described in [Implementing Lock-Free Queues](https://people.cs.pitt.edu/~jacklange/teaching/cs2510-f12/papers/implementing_lock_free.pdf). |
+| Name | Module in `Saturn` (in `Saturn_lockfree`) | Description | Sources |
+| ------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
+| Treiber Stack | [`Stack`](https://ocaml-multicore.github.io/saturn/saturn_lockfree/Lockfree/Stack/index.html) (same) | A classic multi-producer multi-consumer stack, robust and flexible. Recommended starting point when needing a LIFO structure. | |
+| Michael-Scott Queue | [`Queue`](https://ocaml-multicore.github.io/saturn/saturn_lockfree/Lockfree/Queue/index.html) (same) | A classic multi-producer multi-consumer queue, robust and flexible. Recommended starting point when needing a FIFO structure. | [Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms](https://www.cs.rochester.edu/~scott/papers/1996_PODC_queues.pdf) |
+| Chase-Lev Work-Stealing Dequeue | [`Work_stealing_deque`](https://ocaml-multicore.github.io/saturn/saturn_lockfree/Lockfree/Work_stealing_deque/index.html) (same) | Single-producer, multi-consumer, dynamic-size, double-ended queue (deque). Ideal for throughput-focused scheduling using per-core work distribution. Note, `pop` and `steal` follow different ordering (respectively LIFO and FIFO) and have different linearisation constraints. | [Dynamic Circular Work-Stealing Deque](https://dl.acm.org/doi/10.1145/1073970.1073974) and [Correct and Efficient Work-Stealing for Weak Memory Models](https://dl.acm.org/doi/abs/10.1145/2442516.2442524) |
+| SPSC Queue | [`Single_prod_single_cons_queue`](https://ocaml-multicore.github.io/saturn/saturn_lockfree/Lockfree/Single_prod_single_cons_queue/index.html) (same) | Simple single-producer single-consumer fixed-size queue. Thread-safe as long as at most one thread acts as producer and at most one as consumer at any single point in time. | |
+| MPSC Queue | [`Single_consumer_queue`](https://ocaml-multicore.github.io/saturn/saturn_lockfree/Lockfree/Single_consumer_queue/index.html) (same) | A multi-producer, single-consumer, thread-safe queue without support for cancellation. This makes a good data structure for a scheduler's run queue. It is used in [Eio](https://github.com/ocaml-multicore/eio). | It is a single consumer version of the queue described in [Implementing Lock-Free Queues](https://people.cs.pitt.edu/~jacklange/teaching/cs2510-f12/papers/implementing_lock_free.pdf). |
## Motivation
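A minimal usage sketch for the work-stealing deque row above (assumptions: the flattened module path this patch introduces, `Saturn_lockfree.Work_stealing_deque`, and the `create`/`push`/`pop`/`steal_opt` signatures from `ws_deque.mli` below; the `q` and `stealer` names are illustrative):

```ocaml
(* Sketch: the owner domain pushes and pops LIFO; other domains steal FIFO. *)
module Ws_deque = Saturn_lockfree.Work_stealing_deque

let () =
  let q = Ws_deque.create () in
  (* Owner side: push work items, pop from the same end (LIFO). *)
  List.iter (Ws_deque.push q) [ 1; 2; 3 ];
  assert (Ws_deque.pop q = 3);
  (* Stealer side: another domain takes from the opposite end (FIFO);
     [steal_opt] returns [None] instead of raising [Exit] when empty. *)
  let stealer = Domain.spawn (fun () -> Ws_deque.steal_opt q) in
  match Domain.join stealer with
  | Some v -> Printf.printf "stole %d\n" v
  | None -> print_endline "nothing to steal"
```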
diff --git a/bench/bench_relaxed_queue.ml b/bench/bench_relaxed_queue.ml
deleted file mode 100644
index 9d8f1cfa..00000000
--- a/bench/bench_relaxed_queue.ml
+++ /dev/null
@@ -1,115 +0,0 @@
-open Multicore_bench
-module Queue = Saturn.Relaxed_queue
-module Spin = Queue.Spin
-module Not_lockfree = Queue.Not_lockfree
-module CAS_interface = Queue.Not_lockfree.CAS_interface
-
-let run_one ~budgetf ~n_adders ~n_takers ?(n_msgs = 50 * Util.iter_factor)
- ?(api = `Spin) () =
- let n_domains = n_adders + n_takers in
-
- let t = Queue.create ~size_exponent:10 () in
-
- let n_msgs_to_take = Atomic.make 0 |> Multicore_magic.copy_as_padded in
- let n_msgs_to_add = Atomic.make 0 |> Multicore_magic.copy_as_padded in
-
- let init _ =
- assert (Not_lockfree.pop t == None);
- Atomic.set n_msgs_to_take n_msgs;
- Atomic.set n_msgs_to_add n_msgs
- in
- let work i () =
- if i < n_adders then
- let rec work () =
- let n = Util.alloc n_msgs_to_add in
- if n <> 0 then begin
- match api with
- | `Spin ->
- for i = 1 to n do
- Spin.push t i
- done;
- work ()
- | `Not_lockfree ->
- let rec loop n =
- if 0 < n then
- if Not_lockfree.push t i then loop (n - 1)
- else begin
- Domain.cpu_relax ();
- loop n
- end
- else work ()
- in
- loop n
- | `CAS_interface ->
- let rec loop n =
- if 0 < n then
- if CAS_interface.push t i then loop (n - 1)
- else begin
- Domain.cpu_relax ();
- loop n
- end
- else work ()
- in
- loop n
- end
- in
- work ()
- else
- let rec work () =
- let n = Util.alloc n_msgs_to_take in
- if n <> 0 then
- match api with
- | `Spin ->
- for _ = 1 to n do
- Spin.pop t |> ignore
- done;
- work ()
- | `Not_lockfree ->
- let rec loop n =
- if 0 < n then begin
- match Not_lockfree.pop t with
- | None ->
- Domain.cpu_relax ();
- loop n
- | Some _ -> loop (n - 1)
- end
- else work ()
- in
- loop n
- | `CAS_interface ->
- let rec loop n =
- if 0 < n then begin
- match CAS_interface.pop t with
- | None ->
- Domain.cpu_relax ();
- loop n
- | Some _ -> loop (n - 1)
- end
- else work ()
- in
- loop n
- in
- work ()
- in
-
- let config =
- let plural role n =
- Printf.sprintf "%d %s%s" n role (if n = 1 then "" else "s")
- in
- Printf.sprintf "%s, %s (%s)" (plural "adder" n_adders)
- (plural "taker" n_takers)
- (match api with
- | `Spin -> "spin"
- | `Not_lockfree -> "not lf"
- | `CAS_interface -> "cas")
- in
-
- Times.record ~budgetf ~n_domains ~init ~work ()
- |> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config
-
-let run_suite ~budgetf =
- Util.cross
- [ `Spin; `Not_lockfree; `CAS_interface ]
- (Util.cross [ 1; 2 ] [ 1; 2 ])
- |> List.concat_map @@ fun (api, (n_adders, n_takers)) ->
- run_one ~budgetf ~n_adders ~n_takers ~api ()
diff --git a/bench/bench_ws_deque.ml b/bench/bench_ws_deque.ml
index a6f5fce4..585225cf 100644
--- a/bench/bench_ws_deque.ml
+++ b/bench/bench_ws_deque.ml
@@ -1,5 +1,5 @@
open Multicore_bench
-module Ws_deque = Saturn_lockfree.Work_stealing_deque.M
+module Ws_deque = Saturn_lockfree.Work_stealing_deque
let run_as_scheduler ~budgetf ?(n_domains = 1) () =
let spawns =
diff --git a/bench/main.ml b/bench/main.ml
index be50b92b..6724fa12 100644
--- a/bench/main.ml
+++ b/bench/main.ml
@@ -1,6 +1,5 @@
let benchmarks =
[
- ("Saturn Relaxed_queue", Bench_relaxed_queue.run_suite);
("Saturn_lockfree Queue", Bench_queue.run_suite);
("Saturn_lockfree Single_prod_single_cons_queue", Bench_spsc_queue.run_suite);
("Saturn_lockfree Size", Bench_size.run_suite);
diff --git a/src/mpmc_relaxed_queue.ml b/src/mpmc_relaxed_queue.ml
deleted file mode 100644
index 24ffc811..00000000
--- a/src/mpmc_relaxed_queue.ml
+++ /dev/null
@@ -1,152 +0,0 @@
-include Saturn_lockfree.Relaxed_queue
-
-module Spin = struct
- let push = push
- let pop = pop
-end
-
-(* [ccas] A slightly nicer CAS. Tries without taking microarch lock first. Use on indices. *)
-let ccas cell seen v =
- if Atomic.get cell != seen then false else Atomic.compare_and_set cell seen v
-
-module Not_lockfree = struct
- (* [spin_threshold] Number of times on spin on a slot before trying an exit strategy. *)
- let spin_threshold = 30
-
- (* [try_other_exit_every_n] There is two strategies that push/pop can take to fix state (
- to be able to return without completion). Generally, we want to try to do "rollback" more
- than "push forward", as the latter adds contention to the side that might already not
- be keeping up. *)
- let try_other_exit_every_n = 10
- let time_to_try_push_forward n = n mod try_other_exit_every_n == 0
-
- let push { array; tail; head; mask; _ } item =
- let tail_val = Atomic.fetch_and_add tail 1 in
- let index = tail_val land mask in
- let cell = Array.get array index in
-
- (* spin for a bit *)
- let i = ref 0 in
- while
- !i < spin_threshold && not (Atomic.compare_and_set cell None (Some item))
- do
- i := !i + 1
- done;
-
- (* define clean up function *)
- let rec take_or_rollback nth_attempt =
- if Atomic.compare_and_set cell None (Some item) then
- (* succedded to push *)
- true
- else if ccas tail (tail_val + 1) tail_val then (* rolled back tail *)
- false
- else if
- time_to_try_push_forward nth_attempt && ccas head tail_val (tail_val + 1)
- then (* pushed forward head *)
- false
- else begin
- Domain.cpu_relax ();
- (* retry *)
- take_or_rollback (nth_attempt + 1)
- end
- in
-
- (* if succeeded return true otherwise clean up *)
- if !i < spin_threshold then true else take_or_rollback 0
-
- let take_item cell =
- let value = Atomic.get cell in
- if Option.is_some value && Atomic.compare_and_set cell value None then value
- else None
-
- let pop queue =
- let ({ array; head; tail; mask; _ } : 'a t) = queue in
- let head_value = Atomic.get head in
- let tail_value = Atomic.get tail in
- if head_value - tail_value >= 0 then None
- else
- let old_head = Atomic.fetch_and_add head 1 in
- let cell = Array.get array (old_head land mask) in
-
- (* spin for a bit *)
- let i = ref 0 in
- let item = ref None in
- while !i < spin_threshold && not (Option.is_some !item) do
- item := take_item cell;
- i := !i + 1
- done;
-
- (* define clean up function *)
- let rec take_or_rollback nth_attempt =
- let value = Atomic.get cell in
- if Option.is_some value && Atomic.compare_and_set cell value None then
- (* dequeued an item, return it *)
- value
- else if ccas head (old_head + 1) old_head then
- (* rolled back head *)
- None
- else if
- time_to_try_push_forward nth_attempt
- && ccas tail old_head (old_head + 1)
- then (* pushed tail forward *)
- None
- else begin
- Domain.cpu_relax ();
- take_or_rollback (nth_attempt + 1)
- end
- in
-
- (* return if got item, clean up otherwise *)
- if Option.is_some !item then !item else take_or_rollback 0
-
- module CAS_interface = struct
- let rec push ({ array; tail; head; mask; _ } as t) item =
- let tail_val = Atomic.get tail in
- let head_val = Atomic.get head in
- let size = mask + 1 in
- if tail_val - head_val >= size then false
- else if ccas tail tail_val (tail_val + 1) then begin
- let index = tail_val land mask in
- let cell = Array.get array index in
- (*
- Given that code above checks for overlap, is this CAS needed?
-
- Yes. Even though a thread cannot explicitely enter overlap,
- it can still occur just because enqueuer may theoretically be
- unscheduled for unbounded amount of time between incrementing
- index and filling the slot.
-
- I doubt we'd observe that case in real-life (outside some
- extreme circumstances), but this optimization has to be left
- for the user to decide. After all, algorithm would not pass
- model-checking without it.
-
- Incidentally, it also makes this method interoperable with
- standard interface.
- *)
- while not (Atomic.compare_and_set cell None (Some item)) do
- Domain.cpu_relax ()
- done;
- true
- end
- else push t item
-
- let rec pop ({ array; tail; head; mask; _ } as t) =
- let tail_val = Atomic.get tail in
- let head_val = Atomic.get head in
- if head_val - tail_val >= 0 then None
- else if ccas head head_val (head_val + 1) then begin
- let index = head_val land mask in
- let cell = Array.get array index in
- let item = ref (Atomic.get cell) in
- while
- not (Option.is_some !item && Atomic.compare_and_set cell !item None)
- do
- Domain.cpu_relax ();
- item := Atomic.get cell
- done;
- !item
- end
- else pop t
- end
-end
diff --git a/src/mpmc_relaxed_queue.mli b/src/mpmc_relaxed_queue.mli
deleted file mode 100644
index b7267926..00000000
--- a/src/mpmc_relaxed_queue.mli
+++ /dev/null
@@ -1,68 +0,0 @@
-(**
- A multi-producer, multi-consumer, thread-safe, bounded relaxed-FIFO queue.
-
- It exposes two interfaces: [Spin] and [Not_lockfree]. [Spin] is lock-free
- formally, but the property is achieved in a fairly counterintuitive way -
- - by using the fact that lock-freedom does not impose any constraints on
- partial methods. In simple words, an invocation of function that cannot
- logically terminate (`push` on full queue, `pop` on empty queue), it is
- allowed to *busy-wait* until the precondition is meet.
-
- Above interface is impractical outside specialized applications. Thus,
- [Mpmc_relaxed_queue] also exposes [Not_lockfree] interface. [Not_lockfree]
- contains non-lockfree paths. While formally a locked algorithm, it will
- often be the more practical solution as it allows having an overflow
- queue, etc.
-*)
-
-type 'a t = private {
- array : 'a Option.t Atomic.t Array.t;
- head : int Atomic.t;
- tail : int Atomic.t;
- mask : int;
-}
-(** A queue of items of type ['a]. Implementation exposed for testing. *)
-
-val create : size_exponent:int -> unit -> 'a t
-(** [create ~size_exponent:int] creates an empty queue of size
- [2^size_exponent]. *)
-
-module Spin : sig
- (** [Spin] exposes a formally lock-free interface as per the [A lock-free relaxed
- concurrent queue for fast work distribution] paper. Functions here busy-wait if
- the action cannot be completed (i.e. [push] on full queue, [pop] on empty queue). *)
-
- val push : 'a t -> 'a -> unit
- (** [push t x] adds [x] to the tail of the queue. If the queue is full, [push]
- busy-waits until another thread removes an item. *)
-
- val pop : 'a t -> 'a
- (** [pop t] removes an item from the head of the queue. If the queue is empty,
- [pop] busy-waits until an item appear. *)
-end
-
-module Not_lockfree : sig
- (** [Non_lockfree] exposes an interface that contains non-lockfree paths, i.e. threads
- may need to cooperate to terminate. It is often more practical than [Spin], in
- particular when using a fair OS scheduler. *)
-
- val push : 'a t -> 'a -> bool
- (** [push t x] adds [x] to the tail of the queue.
- Returns [false] if [t] is currently full. *)
-
- val pop : 'a t -> 'a option
- (** [pop t] removes the head item from [t] and returns it.
- Returns [None] if [t] is currently empty. *)
-
- module CAS_interface : sig
- (** Alternative interface, which may perform better on architectures without
- FAD instructions (e.g. AArch).
-
- CAS_interface should not be the default choice. It may be a little faster
- on ARM, but it is going to be a few times slower than standard on x86.
- *)
-
- val push : 'a t -> 'a -> bool
- val pop : 'a t -> 'a option
- end
-end
diff --git a/src/saturn.ml b/src/saturn.ml
index f8464bae..8df10afd 100644
--- a/src/saturn.ml
+++ b/src/saturn.ml
@@ -38,7 +38,6 @@ module Single_prod_single_cons_queue_unsafe =
Saturn_lockfree.Single_prod_single_cons_queue_unsafe
module Single_consumer_queue = Saturn_lockfree.Single_consumer_queue
-module Relaxed_queue = Mpmc_relaxed_queue
module Skiplist = Saturn_lockfree.Skiplist
module Htbl = Saturn_lockfree.Htbl
module Htbl_unsafe = Saturn_lockfree.Htbl_unsafe
diff --git a/src/saturn.mli b/src/saturn.mli
index 2906447c..c684525a 100644
--- a/src/saturn.mli
+++ b/src/saturn.mli
@@ -42,7 +42,6 @@ module Single_prod_single_cons_queue_unsafe =
Saturn_lockfree.Single_prod_single_cons_queue_unsafe
module Single_consumer_queue = Saturn_lockfree.Single_consumer_queue
-module Relaxed_queue = Mpmc_relaxed_queue
module Skiplist = Saturn_lockfree.Skiplist
module Htbl = Saturn_lockfree.Htbl
module Htbl_unsafe = Saturn_lockfree.Htbl_unsafe
diff --git a/src_lockfree/mpmc_relaxed_queue.ml b/src_lockfree/mpmc_relaxed_queue.ml
deleted file mode 100644
index db2ca43e..00000000
--- a/src_lockfree/mpmc_relaxed_queue.ml
+++ /dev/null
@@ -1,134 +0,0 @@
-(*
- # General idea
-
- It is the easiest to explain the general idea on an array of infinite size.
- Let's start with that. Each element in such an array constitutes a single-use
- exchange slot. Enqueuer increments [tail] and treats prior value as index of
- its slot. Same for dequeuer and [head]. This effectively creates pairs
- (enqueuer, dequeuer) assigned to the same slot. Enqueuer leaves the value in
- the slot, dequer copies it out.
-
- Enqueuer never fails. It always gets a brand-new slot and places item in it.
- Dequeuer, on the other hand, may witness an empty slot. That's because [head]
- may jump behind [tail]. Remember, indices are implemented blindy. For now,
- assume dequeuer simply spins on the empty slot until an item appears.
-
- That's it. There's a few things flowing from this construction:
- * Slots are atomic. This is where paired enqueuer and dequeuer communicate.
- * [head] overshooting [tail] is a normal condition and that's good - we want
- to keep operations on [head] and [tail] independent.
-
- # Finite array
-
- Now, to make it work in real-world, simply treat finite array as circular,
- i.e. wrap around when reached the end. Slots are now re-used, so we need to be
- more careful.
-
- Firstly, if there's too many items, enqueuer may witness a full slot. Let's assume
- enqueuer simply spins on full slot until some dequeuer appears and takes the old
- value.
-
- Secondly, in the case of overlap, there can be more than 2 threads (1x enqueuer,
- 1x dequeuer) assigned to a single slot (imagine 10 enqueuers spinning on an 8-slot
- array). In fact, it could be any number. Thus, all operations on slot have to use
- CAS to ensure that no item is overwrriten on store and no item is dequeued by two
- threads at once.
-
- Above works okay in practise, and there is some relevant literature, e.g.
- (DOI: 10.1145/3437801.3441583) analyzed this particular design. There's also
- plenty older papers looking at similar approaches
- (e.g. DOI: 10.1145/2851141.2851168).
-
- Note, this design may violate FIFO (on overlap). The risk can be minimized by
- ensuring size of array >> number of threads but it's never zero.
- (github.com/rigtorp/MPMCQueue has a nice way of fixing this, we could add it).
-
- # Blocking (non-lockfree paths on full, empty)
-
- Up until now [push] and [pop] were allowed to block indefinitely on empty and full
- queue. Overall, what can be done in those states?
-
- 1. Busy wait until able to finish.
- 2. Rollback own index with CAS (unassign itself from slot).
- 3. Move forward other index with CAS (assign itself to the same slot as opposite
- action).
- 4. Mark slot as burned - dequeue only.
-
- Which one then?
-
- Let's optimize for stability, i.e. some reasonable latency that won't get much worse
- under heavy load. Busy wait is great because it does not cause any contention in the
- hotspots ([head], [tail]). Thus, start with busy wait (1). If queue is busy and
- moving fast, there is a fair chance that within, say, 30 spins, we'll manage to
- complete action without having to add contention elsewhere.
-
- Once N busy-loops happen and nothing changes, we probably want to return even if its
- costs. (2), (3) both allow that. (2) doesn't add contention to the other index like
- (3) does. Say, there's a lot more dequeuers than enqueuers, if all dequeurs did (3),
- they would add a fair amount of contention to the [tail] index and slow the
- already-outnumbered enqueuers further. So, (2) > (3) for that reason.
-
- However, with just (2), some dequeuers will struggle to return. If many dequeuers
- constatly try to pop an element and fail, they will form a chain.
-
- tl hd
- | |
- [.]-[A]-[B]-[C]-..-[X]
-
- For A to rollback, B has to rollback first. For B to rollback C has to rollback first.
-
- [A] is likely to experience a large latency spike. In such a case, it is easier for [A]
- to do (3) rather than hope all other active dequeuers will unblock it at some point.
- Thus, it's worthwile also trying to do (3) periodically.
-
- Thus, the current policy does (1) for a bit, then (1), (2) with periodic (3).
-
- What about burned slots (4)?
-
- It's present in the literature. Weakly I'm not a fan. If dequeuers are faster to remove
- items than enqueuers supply them, slots burned by dequeuers are going to make enqueuers
- do even more work.
-
- # Resizing
-
- The queue does not support resizing, but it can be simulated by wrapping it in a
- lockfree list.
-*)
-
-type 'a t = {
- array : 'a Option.t Atomic.t Array.t;
- head : int Atomic.t;
- tail : int Atomic.t;
- mask : int;
-}
-
-let create ~size_exponent () : 'a t =
- let size = 1 lsl size_exponent in
- let array = Array.init size (fun _ -> Atomic.make None) in
- let mask = size - 1 in
- let head = Atomic.make 0 in
- let tail = Atomic.make 0 in
- { array; head; tail; mask }
-
-(* [ccas] A slightly nicer CAS. Tries without taking microarch lock first. Use on indices. *)
-let ccas cell seen v =
- if Atomic.get cell != seen then false else Atomic.compare_and_set cell seen v
-
-let push { array; tail; mask; _ } item =
- let tail_val = Atomic.fetch_and_add tail 1 in
- let index = tail_val land mask in
- let cell = Array.get array index in
- while not (ccas cell None (Some item)) do
- Domain.cpu_relax ()
- done
-
-let pop { array; head; mask; _ } =
- let head_val = Atomic.fetch_and_add head 1 in
- let index = head_val land mask in
- let cell = Array.get array index in
- let item = ref (Atomic.get cell) in
- while Option.is_none !item || not (ccas cell !item None) do
- Domain.cpu_relax ();
- item := Atomic.get cell
- done;
- Option.get !item
diff --git a/src_lockfree/mpmc_relaxed_queue.mli b/src_lockfree/mpmc_relaxed_queue.mli
deleted file mode 100644
index 04049734..00000000
--- a/src_lockfree/mpmc_relaxed_queue.mli
+++ /dev/null
@@ -1,36 +0,0 @@
-(**
- A multi-producer, multi-consumer, thread-safe, bounded relaxed-FIFO queue.
-
- This interface is a subset of the one in `Saturn.Relaxed_queue`
- that exposes a formally lock-free interface as per the [A
- lock-free relaxed concurrent queue for fast work distribution]
- paper.
-
- [push] and [pop] are said to be `lock-free formally`, because the
- property is achieved in a fairly counterintuitive way - by using
- the fact that lock-freedom does not impose any constraints on
- partial methods. In simple words, an invocation of function that
- cannot logically terminate (`push` on full queue, `pop` on empty
- queue), it is allowed to *busy-wait* until the precondition is
- meet.
-*)
-
-type 'a t = private {
- array : 'a Option.t Atomic.t Array.t;
- head : int Atomic.t;
- tail : int Atomic.t;
- mask : int;
-}
-(** A queue of items of type ['a]. Implementation exposed for testing. *)
-
-val create : size_exponent:int -> unit -> 'a t
-(** [create ~size_exponent:int] creates an empty queue of size
- [2^size_exponent]. *)
-
-val push : 'a t -> 'a -> unit
-(** [push t x] adds [x] to the tail of the queue. If the queue is full, [push]
- busy-waits until another thread removes an item. *)
-
-val pop : 'a t -> 'a
-(** [pop t] removes an item from the head of the queue. If the queue is empty,
- [pop] busy-waits until an item appear. *)
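For reference while reviewing the removal, a sketch of how the deleted busy-waiting interface documented above was used (module path per the `Relaxed_queue = Mpmc_relaxed_queue` alias removed below; `demo` and the string payload are illustrative):

```ocaml
(* Sketch of the removed API: per the deleted mli, [push]/[pop] busy-wait
   rather than fail when the queue is full/empty. *)
let demo () =
  let q = Saturn_lockfree.Relaxed_queue.create ~size_exponent:3 () in
  (* capacity is 2^3 = 8 slots *)
  Saturn_lockfree.Relaxed_queue.push q "job";  (* spins while the queue is full *)
  Saturn_lockfree.Relaxed_queue.pop q          (* spins while the queue is empty *)
```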
diff --git a/src_lockfree/saturn_lockfree.ml b/src_lockfree/saturn_lockfree.ml
index 3c1fd09c..eff05f9d 100644
--- a/src_lockfree/saturn_lockfree.ml
+++ b/src_lockfree/saturn_lockfree.ml
@@ -33,7 +33,6 @@ module Work_stealing_deque = Ws_deque
module Single_prod_single_cons_queue = Spsc_queue
module Single_prod_single_cons_queue_unsafe = Spsc_queue_unsafe
module Single_consumer_queue = Mpsc_queue
-module Relaxed_queue = Mpmc_relaxed_queue
module Size = Size
module Skiplist = Skiplist
module Htbl = Htbl
diff --git a/src_lockfree/saturn_lockfree.mli b/src_lockfree/saturn_lockfree.mli
index 214a92e9..eac1789b 100644
--- a/src_lockfree/saturn_lockfree.mli
+++ b/src_lockfree/saturn_lockfree.mli
@@ -37,7 +37,6 @@ module Work_stealing_deque = Ws_deque
module Single_prod_single_cons_queue = Spsc_queue
module Single_prod_single_cons_queue_unsafe = Spsc_queue_unsafe
module Single_consumer_queue = Mpsc_queue
-module Relaxed_queue = Mpmc_relaxed_queue
module Skiplist = Skiplist
module Size = Size
module Htbl = Htbl
diff --git a/src_lockfree/ws_deque.ml b/src_lockfree/ws_deque.ml
index 02bd6d6d..e09e9530 100644
--- a/src_lockfree/ws_deque.ml
+++ b/src_lockfree/ws_deque.ml
@@ -27,17 +27,6 @@
* https://dl.acm.org/doi/abs/10.1145/2442516.2442524
*)
-module type S = sig
- type 'a t
-
- val create : unit -> 'a t
- val push : 'a t -> 'a -> unit
- val pop : 'a t -> 'a
- val pop_opt : 'a t -> 'a option
- val steal : 'a t -> 'a
- val steal_opt : 'a t -> 'a option
-end
-
module CArray = struct
type 'a t = 'a array
@@ -85,105 +74,103 @@ module CArray = struct
dst
end
-module M : S = struct
- let min_size = 32
- let shrink_const = 3
-
- type 'a t = {
- top : int Atomic.t;
- bottom : int Atomic.t;
- tab : 'a ref CArray.t Atomic.t;
- mutable next_shrink : int;
+let min_size = 32
+let shrink_const = 3
+
+type 'a t = {
+ top : int Atomic.t;
+ bottom : int Atomic.t;
+ tab : 'a ref CArray.t Atomic.t;
+ mutable next_shrink : int;
+}
+
+let create () =
+ {
+ top = Atomic.make 1;
+ bottom = Atomic.make 1;
+ tab = Atomic.make (CArray.create min_size (Obj.magic ()));
+ next_shrink = 0;
}
- let create () =
- {
- top = Atomic.make 1;
- bottom = Atomic.make 1;
- tab = Atomic.make (CArray.create min_size (Obj.magic ()));
- next_shrink = 0;
- }
-
- let set_next_shrink q =
- let sz = CArray.size (Atomic.get q.tab) in
- if sz <= min_size then q.next_shrink <- 0
- else q.next_shrink <- sz / shrink_const
-
- let grow q t b =
- Atomic.set q.tab (CArray.grow (Atomic.get q.tab) t b);
- set_next_shrink q
-
- let size q =
- let b = Atomic.get q.bottom in
- let t = Atomic.get q.top in
- b - t
-
- let push q v =
- let v' = ref v in
- let b = Atomic.get q.bottom in
+let set_next_shrink q =
+ let sz = CArray.size (Atomic.get q.tab) in
+ if sz <= min_size then q.next_shrink <- 0
+ else q.next_shrink <- sz / shrink_const
+
+let grow q t b =
+ Atomic.set q.tab (CArray.grow (Atomic.get q.tab) t b);
+ set_next_shrink q
+
+let size q =
+ let b = Atomic.get q.bottom in
+ let t = Atomic.get q.top in
+ b - t
+
+let push q v =
+ let v' = ref v in
+ let b = Atomic.get q.bottom in
+ let t = Atomic.get q.top in
+ let a = Atomic.get q.tab in
+ let size = b - t in
+ let a =
+ if size = CArray.size a then (
+ grow q t b;
+ Atomic.get q.tab)
+ else a
+ in
+ CArray.put a b v';
+ Atomic.set q.bottom (b + 1)
+
+let release ptr =
+ let res = !ptr in
+ (* we know this ptr will never be dereferenced, but want to
+ break the reference to ensure that the contents of the
+ deque array get garbage collected *)
+ ptr := Obj.magic ();
+ res
+[@@inline]
+
+let pop q =
+ if size q = 0 then raise Exit
+ else
+ let b = Atomic.get q.bottom - 1 in
+ Atomic.set q.bottom b;
let t = Atomic.get q.top in
let a = Atomic.get q.tab in
let size = b - t in
- let a =
- if size = CArray.size a then (
- grow q t b;
- Atomic.get q.tab)
- else a
- in
- CArray.put a b v';
- Atomic.set q.bottom (b + 1)
-
- let release ptr =
- let res = !ptr in
- (* we know this ptr will never be dereferenced, but want to
- break the reference to ensure that the contents of the
- deque array get garbage collected *)
- ptr := Obj.magic ();
- res
- [@@inline]
-
- let pop q =
- if size q = 0 then raise Exit
+ if size < 0 then (
+ (* empty queue *)
+ Atomic.set q.bottom (b + 1);
+ raise Exit)
else
- let b = Atomic.get q.bottom - 1 in
- Atomic.set q.bottom b;
- let t = Atomic.get q.top in
- let a = Atomic.get q.tab in
- let size = b - t in
- if size < 0 then (
- (* empty queue *)
- Atomic.set q.bottom (b + 1);
- raise Exit)
- else
- let out = CArray.get a b in
- if b = t then
- (* single last element *)
- if Atomic.compare_and_set q.top t (t + 1) then (
- Atomic.set q.bottom (b + 1);
- release out)
- else (
- Atomic.set q.bottom (b + 1);
- raise Exit)
- else (
- (* non-empty queue *)
- if q.next_shrink > size then (
- Atomic.set q.tab (CArray.shrink a t b);
- set_next_shrink q);
+ let out = CArray.get a b in
+ if b = t then
+ (* single last element *)
+ if Atomic.compare_and_set q.top t (t + 1) then (
+ Atomic.set q.bottom (b + 1);
release out)
+ else (
+ Atomic.set q.bottom (b + 1);
+ raise Exit)
+ else (
+ (* non-empty queue *)
+ if q.next_shrink > size then (
+ Atomic.set q.tab (CArray.shrink a t b);
+ set_next_shrink q);
+ release out)
+
+let pop_opt q = try Some (pop q) with Exit -> None
+
+let rec steal backoff q =
+ let t = Atomic.get q.top in
+ let b = Atomic.get q.bottom in
+ let size = b - t in
+ if size <= 0 then raise Exit
+ else
+ let a = Atomic.get q.tab in
+ let out = CArray.get a t in
+ if Atomic.compare_and_set q.top t (t + 1) then release out
+ else steal (Backoff.once backoff) q
- let pop_opt q = try Some (pop q) with Exit -> None
-
- let rec steal backoff q =
- let t = Atomic.get q.top in
- let b = Atomic.get q.bottom in
- let size = b - t in
- if size <= 0 then raise Exit
- else
- let a = Atomic.get q.tab in
- let out = CArray.get a t in
- if Atomic.compare_and_set q.top t (t + 1) then release out
- else steal (Backoff.once backoff) q
-
- let steal q = steal Backoff.default q
- let steal_opt q = try Some (steal q) with Exit -> None
-end
+let steal q = steal Backoff.default q
+let steal_opt q = try Some (steal q) with Exit -> None
diff --git a/src_lockfree/ws_deque.mli b/src_lockfree/ws_deque.mli
index 7f958584..08654e09 100644
--- a/src_lockfree/ws_deque.mli
+++ b/src_lockfree/ws_deque.mli
@@ -13,45 +13,41 @@
Thus, it is not the best choice when tail latency matters.
*)
-module type S = sig
- type 'a t
- (** Type of work-stealing queue *)
+type 'a t
+(** Type of work-stealing queue *)
- val create : unit -> 'a t
- (** [create ()] returns a new empty work-stealing queue. *)
+val create : unit -> 'a t
+(** [create ()] returns a new empty work-stealing queue. *)
- (** {1 Queue owner functions} *)
+(** {1 Queue owner functions} *)
- val push : 'a t -> 'a -> unit
- (** [push t v] adds [v] to the front of the queue [q].
+val push : 'a t -> 'a -> unit
+(** [push q v] adds [v] to the front of the queue [q].
It should only be invoked by the domain which owns the queue [q]. *)
- val pop : 'a t -> 'a
- (** [pop q] removes and returns the first element in queue
+val pop : 'a t -> 'a
+(** [pop q] removes and returns the first element in queue
      [q]. It should only be invoked by the domain which owns the queue
[q].
@raise Exit if the queue is empty.
*)
- val pop_opt : 'a t -> 'a option
- (** [pop_opt q] removes and returns the first element in queue [q], or
+val pop_opt : 'a t -> 'a option
+(** [pop_opt q] removes and returns the first element in queue [q], or
returns [None] if the queue is empty. *)
- (** {1 Stealers function} *)
+(** {1 Stealer functions} *)
- val steal : 'a t -> 'a
- (** [steal q] removes and returns the last element from queue
+val steal : 'a t -> 'a
+(** [steal q] removes and returns the last element from queue
    [q]. It should only be invoked by a domain which doesn't own the
queue [q].
@raise Exit if the queue is empty.
*)
- val steal_opt : 'a t -> 'a option
- (** [steal_opt q] removes and returns the last element from queue
+val steal_opt : 'a t -> 'a option
+(** [steal_opt q] removes and returns the last element from queue
[q], or returns [None] if the queue is empty. It should only be
    invoked by a domain which doesn't own the queue [q]. *)
-end
-
-module M : S
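For callers, the visible effect of dropping `module type S` and `M` is a shorter module path, as the bench and test updates elsewhere in this diff show. A migration sketch (the caller code and `drain` helper are hypothetical):

```ocaml
(* Before: module Ws_deque = Saturn_lockfree.Work_stealing_deque.M
   After this patch, the module is used directly: *)
module Ws_deque = Saturn_lockfree.Work_stealing_deque

(* Owner-side helper: [pop_opt] returns [None] where [pop] raises [Exit]. *)
let drain q =
  let rec go acc =
    match Ws_deque.pop_opt q with
    | Some v -> go (v :: acc)
    | None -> List.rev acc
  in
  go []
```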
diff --git a/test/mpmc_relaxed_queue/dune b/test/mpmc_relaxed_queue/dune
deleted file mode 100644
index 9b505137..00000000
--- a/test/mpmc_relaxed_queue/dune
+++ /dev/null
@@ -1,5 +0,0 @@
-(test
- (package saturn)
- (name test_mpmc_relaxed_queue)
- (libraries saturn unix alcotest domain_shims)
- (modules test_mpmc_relaxed_queue))
diff --git a/test/mpmc_relaxed_queue/test_mpmc_relaxed_queue.ml b/test/mpmc_relaxed_queue/test_mpmc_relaxed_queue.ml
deleted file mode 100644
index dd850a59..00000000
--- a/test/mpmc_relaxed_queue/test_mpmc_relaxed_queue.ml
+++ /dev/null
@@ -1,173 +0,0 @@
-module Relaxed_queue = Saturn.Relaxed_queue
-
-let smoke_test (push, pop) () =
- let queue = Relaxed_queue.create ~size_exponent:2 () in
- (* enqueue 4 *)
- for i = 1 to 4 do
- Alcotest.(check bool)
- "there should be space in the queue" (push queue i) true
- done;
- assert (not (push queue 0));
- let ({ tail; head; _ } : 'a Relaxed_queue.t) = queue in
- assert (Atomic.get tail = 4);
- assert (Atomic.get head = 0);
- (* dequeue 4 *)
- for i = 1 to 4 do
- Alcotest.(check (option int))
- "items should come out in FIFO order" (Some i) (pop queue)
- done;
- Alcotest.(check (option int)) "queue should be empty" None (pop queue)
-
-let two_threads_test (push, pop) () =
- let queue = Relaxed_queue.create ~size_exponent:2 () in
- let num_of_elements = 1_000_000 in
- (* start dequeuer *)
- let dequeuer =
- Domain.spawn (fun () ->
- let i = ref 0 in
- while !i < num_of_elements do
- match pop queue with
- | Some item ->
- Alcotest.(check int)
- "popped items should follow FIFO order" item !i;
- i := !i + 1
- | None -> Domain.cpu_relax ()
- done)
- in
- (* enqueue *)
- let i = ref 0 in
- while !i < num_of_elements do
- if push queue !i then i := !i + 1 else Domain.cpu_relax ()
- done;
- Domain.join dequeuer |> ignore;
- ()
-
-module Wait_for_others = struct
- type t = { currently : int Atomic.t; total_expected : int }
-
- let init ~total_expected = { currently = Atomic.make 0; total_expected }
-
- let wait { currently; total_expected } =
- Atomic.incr currently;
- while Atomic.get currently < total_expected do
- Domain.cpu_relax ()
- done
-end
-
-let taker wfo queue num_of_elements () =
- Wait_for_others.wait wfo;
- let i = ref 0 in
- while !i < num_of_elements do
- if Option.is_some (Relaxed_queue.Not_lockfree.pop queue) then i := !i + 1
- else Domain.cpu_relax ()
- done
-
-let pusher wfo queue num_of_elements () =
- Wait_for_others.wait wfo;
- let i = ref 0 in
- while !i < num_of_elements do
- if Relaxed_queue.Not_lockfree.push queue !i then i := !i + 1
- else Domain.cpu_relax ()
- done
-
-let run_test num_takers num_pushers () =
- let queue = Relaxed_queue.create ~size_exponent:3 () in
- let num_of_elements = 4_000_000 in
- let wfo = Wait_for_others.init ~total_expected:(num_takers + num_pushers) in
- let _ =
- let takers =
- assert (num_of_elements mod num_takers == 0);
- let items_per_taker = num_of_elements / num_takers in
- List.init num_takers (fun _ ->
- Domain.spawn (taker wfo queue items_per_taker))
- in
- let pushers =
- assert (num_of_elements mod num_pushers == 0);
- let items_per_pusher = num_of_elements / num_pushers in
- List.init num_pushers (fun _ ->
- Domain.spawn (pusher wfo queue items_per_pusher))
- in
- Sys.opaque_identity (List.map Domain.join (pushers @ takers))
- in
- let ({ array; head; tail; _ } : 'a Relaxed_queue.t) = queue in
- let head_val = Atomic.get head in
- let tail_val = Atomic.get tail in
- Alcotest.(check int) "hd an tl match" head_val tail_val;
- Array.iter
- (fun item ->
- Alcotest.(check (option int))
- "ghost item in the queue!" None (Atomic.get item))
- array
-
-let smoke_test_spinning () =
- let queue = Relaxed_queue.create ~size_exponent:2 () in
- (* enqueue 4 *)
- for i = 1 to 4 do
- Relaxed_queue.Spin.push queue i
- done;
- assert (not (Relaxed_queue.Not_lockfree.push queue 0));
- let ({ tail; head; _ } : 'a Relaxed_queue.t) = queue in
- assert (Atomic.get tail = 4);
- assert (Atomic.get head = 0);
- (* dequeue 4 *)
- for i = 1 to 4 do
- Alcotest.(check (option int))
- "items should come out in FIFO order" (Some i)
- (Relaxed_queue.Not_lockfree.pop queue)
- done;
- Alcotest.(check (option int))
- "queue should be empty" None
- (Relaxed_queue.Not_lockfree.pop queue)
-
-let two_threads_spin_test () =
- let queue = Relaxed_queue.create ~size_exponent:2 () in
- let num_of_elements = 1_000_000 in
- (* start dequeuer *)
- let dequeuer =
- Domain.spawn (fun () ->
- for i = 1 to num_of_elements do
- assert (Relaxed_queue.Spin.pop queue == i)
- done)
- in
- (* enqueue *)
- for i = 1 to num_of_elements do
- Relaxed_queue.Spin.push queue i
- done;
- Domain.join dequeuer |> ignore;
- ()
-
-let doms1 = if Sys.word_size >= 64 then 4 else 1
-let doms2 = if Sys.word_size >= 64 then 8 else 1
-
-let () =
- let open Alcotest in
- run "Mpmc_queue"
- (let open Relaxed_queue.Not_lockfree in
- [
- ( "single-thread",
- [ test_case "is it a queue" `Quick (smoke_test (push, pop)) ] );
- ( "validate items",
- [ test_case "1 prod. 1 cons." `Quick (two_threads_test (push, pop)) ]
- );
- ( "validate indices under load",
- [
- test_case " 4 prod. 4 cons." `Slow (run_test doms1 doms1);
- test_case " 8 prod. 1 cons." `Slow (run_test doms2 1);
- test_case " 1 prod. 8 cons." `Slow (run_test 1 doms2);
- ] );
- ]
- @
- let open Relaxed_queue.Not_lockfree.CAS_interface in
- [
- ( "single-thread-CAS-intf",
- [ test_case "is it a queue" `Quick (smoke_test (push, pop)) ] );
- ( "validate items-CAS-intf",
- [ test_case "1 prod. 1 cons." `Quick (two_threads_test (push, pop)) ]
- );
- ]
- @ [
- ( "single-thread-spinning",
- [ test_case "is it a queue" `Quick smoke_test_spinning ] );
- ( "validate-items-spinning",
- [ test_case "1 prod. 1 cons" `Quick two_threads_spin_test ] );
- ])
diff --git a/test/ws_deque/qcheck_ws_deque.ml b/test/ws_deque/qcheck_ws_deque.ml
index 5255ad87..7360a972 100644
--- a/test/ws_deque/qcheck_ws_deque.ml
+++ b/test/ws_deque/qcheck_ws_deque.ml
@@ -1,4 +1,4 @@
-module Ws_deque = Saturn_lockfree.Work_stealing_deque.M
+module Ws_deque = Saturn_lockfree.Work_stealing_deque
(* Sequential building of a deque *)
let deque_of_list l =
diff --git a/test/ws_deque/stm_ws_deque.ml b/test/ws_deque/stm_ws_deque.ml
index f12235f9..15412f5f 100644
--- a/test/ws_deque/stm_ws_deque.ml
+++ b/test/ws_deque/stm_ws_deque.ml
@@ -15,7 +15,7 @@ module Spec = struct
| Steal -> "Steal"
type state = int list
- type sut = int Ws_deque.M.t
+ type sut = int Ws_deque.t
let arb_cmd _s =
let int_gen = Gen.nat in
@@ -30,7 +30,7 @@ module Spec = struct
let stealer_cmd _s = QCheck.make ~print:show_cmd (Gen.return Steal)
let init_state = []
- let init_sut () = Ws_deque.M.create ()
+ let init_sut () = Ws_deque.create ()
let cleanup _ = ()
let next_state c s =
@@ -46,9 +46,9 @@ module Spec = struct
let run c d =
match c with
- | Push i -> Res (unit, Ws_deque.M.push d i)
- | Pop -> Res (result int exn, protect Ws_deque.M.pop d)
- | Steal -> Res (result int exn, protect Ws_deque.M.steal d)
+ | Push i -> Res (unit, Ws_deque.push d i)
+ | Pop -> Res (result int exn, protect Ws_deque.pop d)
+ | Steal -> Res (result int exn, protect Ws_deque.steal d)
let postcond c (s : state) res =
match (c, res) with
diff --git a/test/ws_deque/test_ws_deque.ml b/test/ws_deque/test_ws_deque.ml
index 3681c239..b4094c54 100644
--- a/test/ws_deque/test_ws_deque.ml
+++ b/test/ws_deque/test_ws_deque.ml
@@ -1,4 +1,4 @@
-open Saturn_lockfree.Work_stealing_deque.M
+open Saturn_lockfree.Work_stealing_deque
(** Tests *)
let test_empty () =
diff --git a/test/ws_deque/ws_deque_dscheck.ml b/test/ws_deque/ws_deque_dscheck.ml
index 2c2c87dc..c8a3afce 100644
--- a/test/ws_deque/ws_deque_dscheck.ml
+++ b/test/ws_deque/ws_deque_dscheck.ml
@@ -2,7 +2,7 @@ let drain_remaining queue =
let remaining = ref 0 in
(try
while true do
- Ws_deque.M.pop queue |> ignore;
+ Ws_deque.pop queue |> ignore;
remaining := !remaining + 1
done
with _ -> ());
@@ -10,7 +10,7 @@ let drain_remaining queue =
let owner_stealer () =
Atomic.trace (fun () ->
- let queue = Ws_deque.M.create () in
+ let queue = Ws_deque.create () in
let total_items = 3 in
let popped = ref 0 in
@@ -18,10 +18,10 @@ let owner_stealer () =
(* owner thr *)
Atomic.spawn (fun () ->
for _ = 1 to total_items do
- Ws_deque.M.push queue 0
+ Ws_deque.push queue 0
done;
for _ = 1 to total_items / 2 do
- match Ws_deque.M.pop queue with
+ match Ws_deque.pop queue with
| exception _ -> ()
| _ -> popped := !popped + 1
done);
@@ -29,7 +29,7 @@ let owner_stealer () =
(* stealer *)
Atomic.spawn (fun () ->
for _ = 1 to total_items / 2 do
- match Ws_deque.M.steal queue with
+ match Ws_deque.steal queue with
| exception _ -> ()
| _ -> popped := !popped + 1
done);
@@ -41,16 +41,16 @@ let owner_stealer () =
let popper_stealer () =
Atomic.trace (fun () ->
- let queue = Ws_deque.M.create () in
+ let queue = Ws_deque.create () in
let total_items = 3 in
for _ = 1 to total_items do
- Ws_deque.M.push queue 0
+ Ws_deque.push queue 0
done;
(* stealers *)
let popped = ref 0 in
let stealer () =
- match Ws_deque.M.steal queue with
+ match Ws_deque.steal queue with
| exception _ -> ()
| _ -> popped := !popped + 1
in