From 357c1fe0b5be911865ddfa242e1169c77956aef8 Mon Sep 17 00:00:00 2001 From: Carine Morel Date: Thu, 31 Oct 2024 10:42:10 +0100 Subject: [PATCH] Remove relaxed_queue and M module in ws_deque (#157) * Remove relaxed queue. * Remove module M in ws_deque. --- README.md | 15 +- bench/bench_relaxed_queue.ml | 115 ---------- bench/bench_ws_deque.ml | 2 +- bench/main.ml | 1 - src/mpmc_relaxed_queue.ml | 152 -------------- src/mpmc_relaxed_queue.mli | 68 ------ src/saturn.ml | 1 - src/saturn.mli | 1 - src_lockfree/mpmc_relaxed_queue.ml | 134 ------------ src_lockfree/mpmc_relaxed_queue.mli | 36 ---- src_lockfree/saturn_lockfree.ml | 1 - src_lockfree/saturn_lockfree.mli | 1 - src_lockfree/ws_deque.ml | 197 ++++++++---------- src_lockfree/ws_deque.mli | 36 ++-- test/mpmc_relaxed_queue/dune | 5 - .../test_mpmc_relaxed_queue.ml | 173 --------------- test/ws_deque/qcheck_ws_deque.ml | 2 +- test/ws_deque/stm_ws_deque.ml | 10 +- test/ws_deque/test_ws_deque.ml | 2 +- test/ws_deque/ws_deque_dscheck.ml | 16 +- 20 files changed, 131 insertions(+), 837 deletions(-) delete mode 100644 bench/bench_relaxed_queue.ml delete mode 100644 src/mpmc_relaxed_queue.ml delete mode 100644 src/mpmc_relaxed_queue.mli delete mode 100644 src_lockfree/mpmc_relaxed_queue.ml delete mode 100644 src_lockfree/mpmc_relaxed_queue.mli delete mode 100644 test/mpmc_relaxed_queue/dune delete mode 100644 test/mpmc_relaxed_queue/test_mpmc_relaxed_queue.ml diff --git a/README.md b/README.md index a95a08c0..6f683c85 100644 --- a/README.md +++ b/README.md @@ -101,14 +101,13 @@ if you prefer to use only lock-free data structures. ## Provided data structures -| Name | Module in `Saturn`
(in `Saturn_lockfree`) | Description | Sources | -| ------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | -| Treiber Stack | [`Stack`](https://ocaml-multicore.github.io/saturn/saturn_lockfree/Lockfree/Stack/index.html) (same) | A classic multi-producer multi-consumer stack, robust and flexible. Recommended starting point when needing a LIFO structure | | -| Michael-Scott Queue | [`Queue`](https://ocaml-multicore.github.io/saturn/saturn_lockfree/Lockfree/Queue/index.html) (same) | A classic multi-producer multi-consumer queue, robust and flexible. Recommended starting point when needing a FIFO structure. | [Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms](https://www.cs.rochester.edu/~scott/papers/1996_PODC_queues.pdf) | -| Chase-Lev Work-Stealing Dequeue | [`Work_stealing_deque`](https://ocaml-multicore.github.io/saturn/saturn_lockfree/Lockfree/Work_stealing_deque/index.html) (same) | Single-producer, multi-consumer, dynamic-size, double-ended queue (deque). Ideal for throughput-focused scheduling using per-core work distribution. Note, `pop` and `steal` follow different ordering (respectively LIFO and FIFO) and have different linearisation contraints. | [Dynamic Circular Work-Stealing Deque](https://dl.acm.org/doi/10.1145/1073970.1073974) and [Correct and Efficient Work-Stealing for Weak Memory Models](https://dl.acm.org/doi/abs/10.1145/2442516.2442524)) | -| SPSC Queue | [`Single_prod_single_`
`cons_queue`](https://ocaml-multicore.github.io/saturn/saturn_lockfree/Lockfree/Single_prod_single_cons_queue/index.html) (same) | Simple single-producer single-consumer fixed-size queue. Thread-safe as long as at most one thread acts as producer and at most one as consumer at any single point in time. | | -| MPMC Bounded Relaxed Queue | [`Relaxed_queue`](https://ocaml-multicore.github.io/saturn/saturn/Saturn/Relaxed_queue/index.html) ([same](https://ocaml-multicore.github.io/saturn/saturn_lockfree/Lockfree/Relaxed_queue/index.html)) | Multi-producer, multi-consumer, fixed-size relaxed queue. Optimised for high number of threads. Not strictly FIFO. Note, it exposes two interfaces: a lockfree and a non-lockfree (albeit more practical) one. See the `mli` for details. | | -| MPSC Queue | [`Single_consumer_queue`](https://ocaml-multicore.github.io/saturn/saturn_lockfree/Lockfree/Single_consumer_queue/index.html) (same) | A multi-producer, single-consumer, thread-safe queue without support for cancellation. This makes a good data structure for a scheduler's run queue. It is used in [Eio](https://github.com/ocaml-multicore/eio). | It is a single consumer version of the queue described in [Implementing Lock-Free Queues](https://people.cs.pitt.edu/~jacklange/teaching/cs2510-f12/papers/implementing_lock_free.pdf). | +| Name | Module in `Saturn`
(in `Saturn_lockfree`) | Description | Sources | +| ------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| Treiber Stack | [`Stack`](https://ocaml-multicore.github.io/saturn/saturn_lockfree/Lockfree/Stack/index.html) (same) | A classic multi-producer multi-consumer stack, robust and flexible. Recommended starting point when needing a LIFO structure | | +| Michael-Scott Queue | [`Queue`](https://ocaml-multicore.github.io/saturn/saturn_lockfree/Lockfree/Queue/index.html) (same) | A classic multi-producer multi-consumer queue, robust and flexible. Recommended starting point when needing a FIFO structure. | [Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms](https://www.cs.rochester.edu/~scott/papers/1996_PODC_queues.pdf) | +| Chase-Lev Work-Stealing Dequeue | [`Work_stealing_deque`](https://ocaml-multicore.github.io/saturn/saturn_lockfree/Lockfree/Work_stealing_deque/index.html) (same) | Single-producer, multi-consumer, dynamic-size, double-ended queue (deque). Ideal for throughput-focused scheduling using per-core work distribution. Note, `pop` and `steal` follow different ordering (respectively LIFO and FIFO) and have different linearisation constraints. | [Dynamic Circular Work-Stealing Deque](https://dl.acm.org/doi/10.1145/1073970.1073974) and [Correct and Efficient Work-Stealing for Weak Memory Models](https://dl.acm.org/doi/abs/10.1145/2442516.2442524) | +| SPSC Queue | [`Single_prod_single_cons_queue`](https://ocaml-multicore.github.io/saturn/saturn_lockfree/Lockfree/Single_prod_single_cons_queue/index.html) (same) | Simple single-producer single-consumer fixed-size queue. Thread-safe as long as at most one thread acts as producer and at most one as consumer at any single point in time. | +| MPSC Queue | [`Single_consumer_queue`](https://ocaml-multicore.github.io/saturn/saturn_lockfree/Lockfree/Single_consumer_queue/index.html) (same) | A multi-producer, single-consumer, thread-safe queue without support for cancellation. This makes a good data structure for a scheduler's run queue. It is used in [Eio](https://github.com/ocaml-multicore/eio). | It is a single consumer version of the queue described in [Implementing Lock-Free Queues](https://people.cs.pitt.edu/~jacklange/teaching/cs2510-f12/papers/implementing_lock_free.pdf). | ## Motivation diff --git a/bench/bench_relaxed_queue.ml b/bench/bench_relaxed_queue.ml deleted file mode 100644 index 9d8f1cfa..00000000 --- a/bench/bench_relaxed_queue.ml +++ /dev/null @@ -1,115 +0,0 @@ -open Multicore_bench -module Queue = Saturn.Relaxed_queue -module Spin = Queue.Spin -module Not_lockfree = Queue.Not_lockfree -module CAS_interface = Queue.Not_lockfree.CAS_interface - -let run_one ~budgetf ~n_adders ~n_takers ?(n_msgs = 50 * Util.iter_factor) - ?(api = `Spin) () = - let n_domains = n_adders + n_takers in - - let t = Queue.create ~size_exponent:10 () in - - let n_msgs_to_take = Atomic.make 0 |> Multicore_magic.copy_as_padded in - let n_msgs_to_add = Atomic.make 0 |> Multicore_magic.copy_as_padded in - - let init _ = - assert (Not_lockfree.pop t == None); - Atomic.set n_msgs_to_take n_msgs; - Atomic.set n_msgs_to_add n_msgs - in - let work i () = - if i < n_adders then - let rec work () = - let n = Util.alloc n_msgs_to_add in - if n <> 0 then begin - match api with - | `Spin -> - for i = 1 to n do - Spin.push t i - done; - work () - | `Not_lockfree -> - let rec loop n = - if 0 < n then - if Not_lockfree.push t i then loop (n - 1) - else begin - Domain.cpu_relax (); - loop n - end - else work () - in - loop n - | `CAS_interface -> - let rec loop n = - if 0 < n then - if CAS_interface.push t i then loop (n - 1) - else begin - Domain.cpu_relax (); - loop n - end - else work () - in - loop n - end - in - work () - else - let rec work () = - let n = Util.alloc n_msgs_to_take in - if n <> 0 then - match api with - | `Spin -> - for _ = 1 to n do - Spin.pop t |> ignore - done; - work () - | `Not_lockfree -> - let rec loop n = - if 0 < n then begin - match Not_lockfree.pop t with - | None -> - Domain.cpu_relax (); - loop n - | Some _ -> loop (n - 1) - end - else work () - in - loop n - | `CAS_interface -> - let rec loop n = - if 0 < n then begin - match CAS_interface.pop t with - | None -> - Domain.cpu_relax (); - loop n - | Some _ -> loop (n - 1) - end - else work () - in - loop n - in - work () - in - - let config = - let plural role n = - Printf.sprintf "%d %s%s" n role (if n = 1 then "" else "s") - in - Printf.sprintf "%s, %s (%s)" (plural "adder" n_adders) - (plural "taker" n_takers) - (match api with - | `Spin -> "spin" - | `Not_lockfree -> "not lf" - | `CAS_interface -> "cas") - in - - Times.record ~budgetf ~n_domains ~init ~work () - |> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config - -let run_suite ~budgetf = - Util.cross - [ `Spin; `Not_lockfree; `CAS_interface ] - (Util.cross [ 1; 2 ] [ 1; 2 ]) - |> List.concat_map @@ fun (api, (n_adders, n_takers)) -> - run_one ~budgetf ~n_adders ~n_takers ~api () diff --git a/bench/bench_ws_deque.ml b/bench/bench_ws_deque.ml index a6f5fce4..585225cf 100644 --- a/bench/bench_ws_deque.ml +++ b/bench/bench_ws_deque.ml @@ -1,5 +1,5 @@ open Multicore_bench -module Ws_deque = Saturn_lockfree.Work_stealing_deque.M +module Ws_deque = Saturn_lockfree.Work_stealing_deque let run_as_scheduler ~budgetf ?(n_domains = 1) () = let spawns = diff --git a/bench/main.ml b/bench/main.ml index be50b92b..6724fa12 100644 --- a/bench/main.ml +++ b/bench/main.ml @@ -1,6 +1,5 @@ let benchmarks = [ - ("Saturn Relaxed_queue", Bench_relaxed_queue.run_suite); ("Saturn_lockfree Queue", Bench_queue.run_suite); ("Saturn_lockfree Single_prod_single_cons_queue", Bench_spsc_queue.run_suite); ("Saturn_lockfree Size", Bench_size.run_suite); diff --git a/src/mpmc_relaxed_queue.ml b/src/mpmc_relaxed_queue.ml deleted file mode 100644 index 24ffc811..00000000 --- a/src/mpmc_relaxed_queue.ml +++ /dev/null @@ -1,152 +0,0 @@ -include Saturn_lockfree.Relaxed_queue - -module Spin = struct - let push = push - let pop = pop -end - -(* [ccas] A slightly nicer CAS. Tries without taking microarch lock first. Use on indices. *) -let ccas cell seen v = - if Atomic.get cell != seen then false else Atomic.compare_and_set cell seen v - -module Not_lockfree = struct - (* [spin_threshold] Number of times on spin on a slot before trying an exit strategy. *) - let spin_threshold = 30 - - (* [try_other_exit_every_n] There is two strategies that push/pop can take to fix state ( - to be able to return without completion). Generally, we want to try to do "rollback" more - than "push forward", as the latter adds contention to the side that might already not - be keeping up. *) - let try_other_exit_every_n = 10 - let time_to_try_push_forward n = n mod try_other_exit_every_n == 0 - - let push { array; tail; head; mask; _ } item = - let tail_val = Atomic.fetch_and_add tail 1 in - let index = tail_val land mask in - let cell = Array.get array index in - - (* spin for a bit *) - let i = ref 0 in - while - !i < spin_threshold && not (Atomic.compare_and_set cell None (Some item)) - do - i := !i + 1 - done; - - (* define clean up function *) - let rec take_or_rollback nth_attempt = - if Atomic.compare_and_set cell None (Some item) then - (* succedded to push *) - true - else if ccas tail (tail_val + 1) tail_val then (* rolled back tail *) - false - else if - time_to_try_push_forward nth_attempt && ccas head tail_val (tail_val + 1) - then (* pushed forward head *) - false - else begin - Domain.cpu_relax (); - (* retry *) - take_or_rollback (nth_attempt + 1) - end - in - - (* if succeeded return true otherwise clean up *) - if !i < spin_threshold then true else take_or_rollback 0 - - let take_item cell = - let value = Atomic.get cell in - if Option.is_some value && Atomic.compare_and_set cell value None then value - else None - - let pop queue = - let ({ array; head; tail; mask; _ } : 'a t) = queue in - let head_value = Atomic.get head in - let tail_value = Atomic.get tail in - if head_value - tail_value >= 0 then None - else - let old_head = Atomic.fetch_and_add head 1 in - let cell = Array.get array (old_head land mask) in - - (* spin for a bit *) - let i = ref 0 in - let item = ref None in - while !i < spin_threshold && not (Option.is_some !item) do - item := take_item cell; - i := !i + 1 - done; - - (* define clean up function *) - let rec take_or_rollback nth_attempt = - let value = Atomic.get cell in - if Option.is_some value && Atomic.compare_and_set cell value None then - (* dequeued an item, return it *) - value - else if ccas head (old_head + 1) old_head then - (* rolled back head *) - None - else if - time_to_try_push_forward nth_attempt - && ccas tail old_head (old_head + 1) - then (* pushed tail forward *) - None - else begin - Domain.cpu_relax (); - take_or_rollback (nth_attempt + 1) - end - in - - (* return if got item, clean up otherwise *) - if Option.is_some !item then !item else take_or_rollback 0 - - module CAS_interface = struct - let rec push ({ array; tail; head; mask; _ } as t) item = - let tail_val = Atomic.get tail in - let head_val = Atomic.get head in - let size = mask + 1 in - if tail_val - head_val >= size then false - else if ccas tail tail_val (tail_val + 1) then begin - let index = tail_val land mask in - let cell = Array.get array index in - (* - Given that code above checks for overlap, is this CAS needed? - - Yes. Even though a thread cannot explicitely enter overlap, - it can still occur just because enqueuer may theoretically be - unscheduled for unbounded amount of time between incrementing - index and filling the slot. - - I doubt we'd observe that case in real-life (outside some - extreme circumstances), but this optimization has to be left - for the user to decide. After all, algorithm would not pass - model-checking without it. - - Incidentally, it also makes this method interoperable with - standard interface. - *) - while not (Atomic.compare_and_set cell None (Some item)) do - Domain.cpu_relax () - done; - true - end - else push t item - - let rec pop ({ array; tail; head; mask; _ } as t) = - let tail_val = Atomic.get tail in - let head_val = Atomic.get head in - if head_val - tail_val >= 0 then None - else if ccas head head_val (head_val + 1) then begin - let index = head_val land mask in - let cell = Array.get array index in - let item = ref (Atomic.get cell) in - while - not (Option.is_some !item && Atomic.compare_and_set cell !item None) - do - Domain.cpu_relax (); - item := Atomic.get cell - done; - !item - end - else pop t - end -end diff --git a/src/mpmc_relaxed_queue.mli b/src/mpmc_relaxed_queue.mli deleted file mode 100644 index b7267926..00000000 --- a/src/mpmc_relaxed_queue.mli +++ /dev/null @@ -1,68 +0,0 @@ -(** - A multi-producer, multi-consumer, thread-safe, bounded relaxed-FIFO queue. - - It exposes two interfaces: [Spin] and [Not_lockfree]. [Spin] is lock-free - formally, but the property is achieved in a fairly counterintuitive way - - - by using the fact that lock-freedom does not impose any constraints on - partial methods. In simple words, an invocation of function that cannot - logically terminate (`push` on full queue, `pop` on empty queue), it is - allowed to *busy-wait* until the precondition is meet. - - Above interface is impractical outside specialized applications. Thus, - [Mpmc_relaxed_queue] also exposes [Not_lockfree] interface. [Not_lockfree] - contains non-lockfree paths. While formally a locked algorithm, it will - often be the more practical solution as it allows having an overflow - queue, etc. -*) - -type 'a t = private { - array : 'a Option.t Atomic.t Array.t; - head : int Atomic.t; - tail : int Atomic.t; - mask : int; -} -(** A queue of items of type ['a]. Implementation exposed for testing. *) - -val create : size_exponent:int -> unit -> 'a t -(** [create ~size_exponent:int] creates an empty queue of size - [2^size_exponent]. *) - -module Spin : sig - (** [Spin] exposes a formally lock-free interface as per the [A lock-free relaxed - concurrent queue for fast work distribution] paper. Functions here busy-wait if - the action cannot be completed (i.e. [push] on full queue, [pop] on empty queue). *) - - val push : 'a t -> 'a -> unit - (** [push t x] adds [x] to the tail of the queue. If the queue is full, [push] - busy-waits until another thread removes an item. *) - - val pop : 'a t -> 'a - (** [pop t] removes an item from the head of the queue. If the queue is empty, - [pop] busy-waits until an item appear. *) -end - -module Not_lockfree : sig - (** [Non_lockfree] exposes an interface that contains non-lockfree paths, i.e. threads - may need to cooperate to terminate. It is often more practical than [Spin], in - particular when using a fair OS scheduler. *) - - val push : 'a t -> 'a -> bool - (** [push t x] adds [x] to the tail of the queue. - Returns [false] if [t] is currently full. *) - - val pop : 'a t -> 'a option - (** [pop t] removes the head item from [t] and returns it. - Returns [None] if [t] is currently empty. *) - - module CAS_interface : sig - (** Alternative interface, which may perform better on architectures without - FAD instructions (e.g. AArch). - - CAS_interface should not be the default choice. It may be a little faster - on ARM, but it is going to be a few times slower than standard on x86. - *) - - val push : 'a t -> 'a -> bool - val pop : 'a t -> 'a option - end -end diff --git a/src/saturn.ml b/src/saturn.ml index f8464bae..8df10afd 100644 --- a/src/saturn.ml +++ b/src/saturn.ml @@ -38,7 +38,6 @@ module Single_prod_single_cons_queue_unsafe = Saturn_lockfree.Single_prod_single_cons_queue_unsafe module Single_consumer_queue = Saturn_lockfree.Single_consumer_queue -module Relaxed_queue = Mpmc_relaxed_queue module Skiplist = Saturn_lockfree.Skiplist module Htbl = Saturn_lockfree.Htbl module Htbl_unsafe = Saturn_lockfree.Htbl_unsafe diff --git a/src/saturn.mli b/src/saturn.mli index 2906447c..c684525a 100644 --- a/src/saturn.mli +++ b/src/saturn.mli @@ -42,7 +42,6 @@ module Single_prod_single_cons_queue_unsafe = Saturn_lockfree.Single_prod_single_cons_queue_unsafe module Single_consumer_queue = Saturn_lockfree.Single_consumer_queue -module Relaxed_queue = Mpmc_relaxed_queue module Skiplist = Saturn_lockfree.Skiplist module Htbl = Saturn_lockfree.Htbl module Htbl_unsafe = Saturn_lockfree.Htbl_unsafe diff --git a/src_lockfree/mpmc_relaxed_queue.ml b/src_lockfree/mpmc_relaxed_queue.ml deleted file mode 100644 index db2ca43e..00000000 --- a/src_lockfree/mpmc_relaxed_queue.ml +++ /dev/null @@ -1,134 +0,0 @@ -(* - # General idea - - It is the easiest to explain the general idea on an array of infinite size. - Let's start with that. Each element in such an array constitutes a single-use - exchange slot. Enqueuer increments [tail] and treats prior value as index of - its slot. Same for dequeuer and [head]. This effectively creates pairs - (enqueuer, dequeuer) assigned to the same slot. Enqueuer leaves the value in - the slot, dequer copies it out. - - Enqueuer never fails. It always gets a brand-new slot and places item in it. - Dequeuer, on the other hand, may witness an empty slot. That's because [head] - may jump behind [tail]. Remember, indices are implemented blindy. For now, - assume dequeuer simply spins on the empty slot until an item appears. - - That's it. There's a few things flowing from this construction: - * Slots are atomic. This is where paired enqueuer and dequeuer communicate. - * [head] overshooting [tail] is a normal condition and that's good - we want - to keep operations on [head] and [tail] independent. - - # Finite array - - Now, to make it work in real-world, simply treat finite array as circular, - i.e. wrap around when reached the end. Slots are now re-used, so we need to be - more careful. - - Firstly, if there's too many items, enqueuer may witness a full slot. Let's assume - enqueuer simply spins on full slot until some dequeuer appears and takes the old - value. - - Secondly, in the case of overlap, there can be more than 2 threads (1x enqueuer, - 1x dequeuer) assigned to a single slot (imagine 10 enqueuers spinning on an 8-slot - array). In fact, it could be any number. Thus, all operations on slot have to use - CAS to ensure that no item is overwrriten on store and no item is dequeued by two - threads at once. - - Above works okay in practise, and there is some relevant literature, e.g. - (DOI: 10.1145/3437801.3441583) analyzed this particular design. There's also - plenty older papers looking at similar approaches - (e.g. DOI: 10.1145/2851141.2851168). - - Note, this design may violate FIFO (on overlap). The risk can be minimized by - ensuring size of array >> number of threads but it's never zero. - (github.com/rigtorp/MPMCQueue has a nice way of fixing this, we could add it). - - # Blocking (non-lockfree paths on full, empty) - - Up until now [push] and [pop] were allowed to block indefinitely on empty and full - queue. Overall, what can be done in those states? - - 1. Busy wait until able to finish. - 2. Rollback own index with CAS (unassign itself from slot). - 3. Move forward other index with CAS (assign itself to the same slot as opposite - action). - 4. Mark slot as burned - dequeue only. - - Which one then? - - Let's optimize for stability, i.e. some reasonable latency that won't get much worse - under heavy load. Busy wait is great because it does not cause any contention in the - hotspots ([head], [tail]). Thus, start with busy wait (1). If queue is busy and - moving fast, there is a fair chance that within, say, 30 spins, we'll manage to - complete action without having to add contention elsewhere. - - Once N busy-loops happen and nothing changes, we probably want to return even if its - costs. (2), (3) both allow that. (2) doesn't add contention to the other index like - (3) does. Say, there's a lot more dequeuers than enqueuers, if all dequeurs did (3), - they would add a fair amount of contention to the [tail] index and slow the - already-outnumbered enqueuers further. So, (2) > (3) for that reason. - - However, with just (2), some dequeuers will struggle to return. If many dequeuers - constatly try to pop an element and fail, they will form a chain. - - tl hd - | | - [.]-[A]-[B]-[C]-..-[X] - - For A to rollback, B has to rollback first. For B to rollback C has to rollback first. - - [A] is likely to experience a large latency spike. In such a case, it is easier for [A] - to do (3) rather than hope all other active dequeuers will unblock it at some point. - Thus, it's worthwile also trying to do (3) periodically. - - Thus, the current policy does (1) for a bit, then (1), (2) with periodic (3). - - What about burned slots (4)? - - It's present in the literature. Weakly I'm not a fan. If dequeuers are faster to remove - items than enqueuers supply them, slots burned by dequeuers are going to make enqueuers - do even more work. - - # Resizing - - The queue does not support resizing, but it can be simulated by wrapping it in a - lockfree list. -*) - -type 'a t = { - array : 'a Option.t Atomic.t Array.t; - head : int Atomic.t; - tail : int Atomic.t; - mask : int; -} - -let create ~size_exponent () : 'a t = - let size = 1 lsl size_exponent in - let array = Array.init size (fun _ -> Atomic.make None) in - let mask = size - 1 in - let head = Atomic.make 0 in - let tail = Atomic.make 0 in - { array; head; tail; mask } - -(* [ccas] A slightly nicer CAS. Tries without taking microarch lock first. Use on indices. *) -let ccas cell seen v = - if Atomic.get cell != seen then false else Atomic.compare_and_set cell seen v - -let push { array; tail; mask; _ } item = - let tail_val = Atomic.fetch_and_add tail 1 in - let index = tail_val land mask in - let cell = Array.get array index in - while not (ccas cell None (Some item)) do - Domain.cpu_relax () - done - -let pop { array; head; mask; _ } = - let head_val = Atomic.fetch_and_add head 1 in - let index = head_val land mask in - let cell = Array.get array index in - let item = ref (Atomic.get cell) in - while Option.is_none !item || not (ccas cell !item None) do - Domain.cpu_relax (); - item := Atomic.get cell - done; - Option.get !item diff --git a/src_lockfree/mpmc_relaxed_queue.mli b/src_lockfree/mpmc_relaxed_queue.mli deleted file mode 100644 index 04049734..00000000 --- a/src_lockfree/mpmc_relaxed_queue.mli +++ /dev/null @@ -1,36 +0,0 @@ -(** - A multi-producer, multi-consumer, thread-safe, bounded relaxed-FIFO queue. - - This interface is a subset of the one in `Saturn.Relaxed_queue` - that exposes a formally lock-free interface as per the [A - lock-free relaxed concurrent queue for fast work distribution] - paper. - - [push] and [pop] are said to be `lock-free formally`, because the - property is achieved in a fairly counterintuitive way - by using - the fact that lock-freedom does not impose any constraints on - partial methods. In simple words, an invocation of function that - cannot logically terminate (`push` on full queue, `pop` on empty - queue), it is allowed to *busy-wait* until the precondition is - meet. -*) - -type 'a t = private { - array : 'a Option.t Atomic.t Array.t; - head : int Atomic.t; - tail : int Atomic.t; - mask : int; -} -(** A queue of items of type ['a]. Implementation exposed for testing. *) - -val create : size_exponent:int -> unit -> 'a t -(** [create ~size_exponent:int] creates an empty queue of size - [2^size_exponent]. *) - -val push : 'a t -> 'a -> unit -(** [push t x] adds [x] to the tail of the queue. If the queue is full, [push] - busy-waits until another thread removes an item. *) - -val pop : 'a t -> 'a -(** [pop t] removes an item from the head of the queue. If the queue is empty, - [pop] busy-waits until an item appear. *) diff --git a/src_lockfree/saturn_lockfree.ml b/src_lockfree/saturn_lockfree.ml index 3c1fd09c..eff05f9d 100644 --- a/src_lockfree/saturn_lockfree.ml +++ b/src_lockfree/saturn_lockfree.ml @@ -33,7 +33,6 @@ module Work_stealing_deque = Ws_deque module Single_prod_single_cons_queue = Spsc_queue module Single_prod_single_cons_queue_unsafe = Spsc_queue_unsafe module Single_consumer_queue = Mpsc_queue -module Relaxed_queue = Mpmc_relaxed_queue module Size = Size module Skiplist = Skiplist module Htbl = Htbl diff --git a/src_lockfree/saturn_lockfree.mli b/src_lockfree/saturn_lockfree.mli index 214a92e9..eac1789b 100644 --- a/src_lockfree/saturn_lockfree.mli +++ b/src_lockfree/saturn_lockfree.mli @@ -37,7 +37,6 @@ module Work_stealing_deque = Ws_deque module Single_prod_single_cons_queue = Spsc_queue module Single_prod_single_cons_queue_unsafe = Spsc_queue_unsafe module Single_consumer_queue = Mpsc_queue -module Relaxed_queue = Mpmc_relaxed_queue module Skiplist = Skiplist module Size = Size module Htbl = Htbl diff --git a/src_lockfree/ws_deque.ml b/src_lockfree/ws_deque.ml index 02bd6d6d..e09e9530 100644 --- a/src_lockfree/ws_deque.ml +++ b/src_lockfree/ws_deque.ml @@ -27,17 +27,6 @@ * https://dl.acm.org/doi/abs/10.1145/2442516.2442524 *) -module type S = sig - type 'a t - - val create : unit -> 'a t - val push : 'a t -> 'a -> unit - val pop : 'a t -> 'a - val pop_opt : 'a t -> 'a option - val steal : 'a t -> 'a - val steal_opt : 'a t -> 'a option -end - module CArray = struct type 'a t = 'a array @@ -85,105 +74,103 @@ module CArray = struct dst end -module M : S = struct - let min_size = 32 - let shrink_const = 3 - - type 'a t = { - top : int Atomic.t; - bottom : int Atomic.t; - tab : 'a ref CArray.t Atomic.t; - mutable next_shrink : int; +let min_size = 32 +let shrink_const = 3 + +type 'a t = { + top : int Atomic.t; + bottom : int Atomic.t; + tab : 'a ref CArray.t Atomic.t; + mutable next_shrink : int; +} + +let create () = + { + top = Atomic.make 1; + bottom = Atomic.make 1; + tab = Atomic.make (CArray.create min_size (Obj.magic ())); + next_shrink = 0; } - let create () = - { - top = Atomic.make 1; - bottom = Atomic.make 1; - tab = Atomic.make (CArray.create min_size (Obj.magic ())); - next_shrink = 0; - } - - let set_next_shrink q = - let sz = CArray.size (Atomic.get q.tab) in - if sz <= min_size then q.next_shrink <- 0 - else q.next_shrink <- sz / shrink_const - - let grow q t b = - Atomic.set q.tab (CArray.grow (Atomic.get q.tab) t b); - set_next_shrink q - - let size q = - let b = Atomic.get q.bottom in - let t = Atomic.get q.top in - b - t - - let push q v = - let v' = ref v in - let b = Atomic.get q.bottom in +let set_next_shrink q = + let sz = CArray.size (Atomic.get q.tab) in + if sz <= min_size then q.next_shrink <- 0 + else q.next_shrink <- sz / shrink_const + +let grow q t b = + Atomic.set q.tab (CArray.grow (Atomic.get q.tab) t b); + set_next_shrink q + +let size q = + let b = Atomic.get q.bottom in + let t = Atomic.get q.top in + b - t + +let push q v = + let v' = ref v in + let b = Atomic.get q.bottom in + let t = Atomic.get q.top in + let a = Atomic.get q.tab in + let size = b - t in + let a = + if size = CArray.size a then ( + grow q t b; + Atomic.get q.tab) + else a + in + CArray.put a b v'; + Atomic.set q.bottom (b + 1) + +let release ptr = + let res = !ptr in + (* we know this ptr will never be dereferenced, but want to + break the reference to ensure that the contents of the + deque array get garbage collected *) + ptr := Obj.magic (); + res +[@@inline] + +let pop q = + if size q = 0 then raise Exit + else + let b = Atomic.get q.bottom - 1 in + Atomic.set q.bottom b; let t = Atomic.get q.top in let a = Atomic.get q.tab in let size = b - t in - let a = - if size = CArray.size a then ( - grow q t b; - Atomic.get q.tab) - else a - in - CArray.put a b v'; - Atomic.set q.bottom (b + 1) - - let release ptr = - let res = !ptr in - (* we know this ptr will never be dereferenced, but want to - break the reference to ensure that the contents of the - deque array get garbage collected *) - ptr := Obj.magic (); - res - [@@inline] - - let pop q = - if size q = 0 then raise Exit + if size < 0 then ( + (* empty queue *) + Atomic.set q.bottom (b + 1); + raise Exit) else - let b = Atomic.get q.bottom - 1 in - Atomic.set q.bottom b; - let t = Atomic.get q.top in - let a = Atomic.get q.tab in - let size = b - t in - if size < 0 then ( - (* empty queue *) - Atomic.set q.bottom (b + 1); - raise Exit) - else - let out = CArray.get a b in - if b = t then - (* single last element *) - if Atomic.compare_and_set q.top t (t + 1) then ( - Atomic.set q.bottom (b + 1); - release out) - else ( - Atomic.set q.bottom (b + 1); - raise Exit) - else ( - (* non-empty queue *) - if q.next_shrink > size then ( - Atomic.set q.tab (CArray.shrink a t b); - set_next_shrink q); + let out = CArray.get a b in + if b = t then + (* single last element *) + if Atomic.compare_and_set q.top t (t + 1) then ( + Atomic.set q.bottom (b + 1); release out) + else ( + Atomic.set q.bottom (b + 1); + raise Exit) + else ( + (* non-empty queue *) + if q.next_shrink > size then ( + Atomic.set q.tab (CArray.shrink a t b); + set_next_shrink q); + release out) + +let pop_opt q = try Some (pop q) with Exit -> None + +let rec steal backoff q = + let t = Atomic.get q.top in + let b = Atomic.get q.bottom in + let size = b - t in + if size <= 0 then raise Exit + else + let a = Atomic.get q.tab in + let out = CArray.get a t in + if Atomic.compare_and_set q.top t (t + 1) then release out + else steal (Backoff.once backoff) q - let pop_opt q = try Some (pop q) with Exit -> None - - let rec steal backoff q = - let t = Atomic.get q.top in - let b = Atomic.get q.bottom in - let size = b - t in - if size <= 0 then raise Exit - else - let a = Atomic.get q.tab in - let out = CArray.get a t in - if Atomic.compare_and_set q.top t (t + 1) then release out - else steal (Backoff.once backoff) q - - let steal q = steal Backoff.default q - let steal_opt q = try Some (steal q) with Exit -> None -end +let steal q = steal Backoff.default q +let steal_opt q = try Some (steal q) with Exit -> None diff --git a/src_lockfree/ws_deque.mli b/src_lockfree/ws_deque.mli index 7f958584..08654e09 100644 --- a/src_lockfree/ws_deque.mli +++ b/src_lockfree/ws_deque.mli @@ -13,45 +13,41 @@ Thus, it is not the best choice when tail latency matters. *) -module type S = sig - type 'a t - (** Type of work-stealing queue *) +type 'a t +(** Type of work-stealing queue *) - val create : unit -> 'a t - (** [create ()] returns a new empty work-stealing queue. *) +val create : unit -> 'a t +(** [create ()] returns a new empty work-stealing queue. *) - (** {1 Queue owner functions} *) +(** {1 Queue owner functions} *) - val push : 'a t -> 'a -> unit - (** [push t v] adds [v] to the front of the queue [q]. +val push : 'a t -> 'a -> unit +(** [push t v] adds [v] to the front of the queue [q]. It should only be invoked by the domain which owns the queue [q]. *) - val pop : 'a t -> 'a - (** [pop q] removes and returns the first element in queue +val pop : 'a t -> 'a +(** [pop q] removes and returns the first element in queue [q].It should only be invoked by the domain which owns the queue [q]. @raise Exit if the queue is empty. *) - val pop_opt : 'a t -> 'a option - (** [pop_opt q] removes and returns the first element in queue [q], or +val pop_opt : 'a t -> 'a option +(** [pop_opt q] removes and returns the first element in queue [q], or returns [None] if the queue is empty. *) - (** {1 Stealers function} *) +(** {1 Stealers function} *) - val steal : 'a t -> 'a - (** [steal q] removes and returns the last element from queue +val steal : 'a t -> 'a +(** [steal q] removes and returns the last element from queue [q]. It should only be invoked by domain which doesn't own the queue [q]. @raise Exit if the queue is empty. *) - val steal_opt : 'a t -> 'a option - (** [steal_opt q] removes and returns the last element from queue +val steal_opt : 'a t -> 'a option +(** [steal_opt q] removes and returns the last element from queue [q], or returns [None] if the queue is empty. It should only be invoked by domain which doesn't own the queue [q]. *) -end - -module M : S diff --git a/test/mpmc_relaxed_queue/dune b/test/mpmc_relaxed_queue/dune deleted file mode 100644 index 9b505137..00000000 --- a/test/mpmc_relaxed_queue/dune +++ /dev/null @@ -1,5 +0,0 @@ -(test - (package saturn) - (name test_mpmc_relaxed_queue) - (libraries saturn unix alcotest domain_shims) - (modules test_mpmc_relaxed_queue)) diff --git a/test/mpmc_relaxed_queue/test_mpmc_relaxed_queue.ml b/test/mpmc_relaxed_queue/test_mpmc_relaxed_queue.ml deleted file mode 100644 index dd850a59..00000000 --- a/test/mpmc_relaxed_queue/test_mpmc_relaxed_queue.ml +++ /dev/null @@ -1,173 +0,0 @@ -module Relaxed_queue = Saturn.Relaxed_queue - -let smoke_test (push, pop) () = - let queue = Relaxed_queue.create ~size_exponent:2 () in - (* enqueue 4 *) - for i = 1 to 4 do - Alcotest.(check bool) - "there should be space in the queue" (push queue i) true - done; - assert (not (push queue 0)); - let ({ tail; head; _ } : 'a Relaxed_queue.t) = queue in - assert (Atomic.get tail = 4); - assert (Atomic.get head = 0); - (* dequeue 4 *) - for i = 1 to 4 do - Alcotest.(check (option int)) - "items should come out in FIFO order" (Some i) (pop queue) - done; - Alcotest.(check (option int)) "queue should be empty" None (pop queue) - -let two_threads_test (push, pop) () = - let queue = Relaxed_queue.create ~size_exponent:2 () in - let num_of_elements = 1_000_000 in - (* start dequeuer *) - let dequeuer = - Domain.spawn (fun () -> - let i = ref 0 in - while !i < num_of_elements do - match pop queue with - | Some item -> - Alcotest.(check int) - "popped items should follow FIFO order" item !i; - i := !i + 1 - | None -> Domain.cpu_relax () - done) - in - (* enqueue *) - let i = ref 0 in - while !i < num_of_elements do - if push queue !i then i := !i + 1 else Domain.cpu_relax () - done; - Domain.join dequeuer |> ignore; - () - -module Wait_for_others = struct - type t = { currently : int Atomic.t; total_expected : int } - - let init ~total_expected = { currently = Atomic.make 0; total_expected } - - let wait { currently; total_expected } = - Atomic.incr currently; - while Atomic.get currently < total_expected do - Domain.cpu_relax () - done -end - -let taker wfo queue num_of_elements () = - Wait_for_others.wait wfo; - let i = ref 0 in - while !i < num_of_elements do - if Option.is_some (Relaxed_queue.Not_lockfree.pop queue) then i := !i + 1 - else Domain.cpu_relax () - done - -let pusher wfo queue num_of_elements () = - Wait_for_others.wait wfo; - let i = ref 0 in - while !i < num_of_elements do - if Relaxed_queue.Not_lockfree.push queue !i then i := !i + 1 - else Domain.cpu_relax () - done - -let run_test num_takers num_pushers () = - let queue = Relaxed_queue.create ~size_exponent:3 () in - let num_of_elements = 4_000_000 in - let wfo = Wait_for_others.init ~total_expected:(num_takers + num_pushers) in - let _ = - let takers = - assert (num_of_elements mod num_takers == 0); - let items_per_taker = num_of_elements / num_takers in - List.init num_takers (fun _ -> - Domain.spawn (taker wfo queue items_per_taker)) - in - let pushers = - assert (num_of_elements mod num_pushers == 0); - let items_per_pusher = num_of_elements / num_pushers in - List.init num_pushers (fun _ -> - Domain.spawn (pusher wfo queue items_per_pusher)) - in - Sys.opaque_identity (List.map Domain.join (pushers @ takers)) - in - let ({ array; head; tail; _ } : 'a Relaxed_queue.t) = queue in - let head_val = Atomic.get head in - let tail_val = Atomic.get tail in - Alcotest.(check int) "hd an tl match" head_val tail_val; - Array.iter - (fun item -> - Alcotest.(check (option int)) - "ghost item in the queue!" None (Atomic.get item)) - array - -let smoke_test_spinning () = - let queue = Relaxed_queue.create ~size_exponent:2 () in - (* enqueue 4 *) - for i = 1 to 4 do - Relaxed_queue.Spin.push queue i - done; - assert (not (Relaxed_queue.Not_lockfree.push queue 0)); - let ({ tail; head; _ } : 'a Relaxed_queue.t) = queue in - assert (Atomic.get tail = 4); - assert (Atomic.get head = 0); - (* dequeue 4 *) - for i = 1 to 4 do - Alcotest.(check (option int)) - "items should come out in FIFO order" (Some i) - (Relaxed_queue.Not_lockfree.pop queue) - done; - Alcotest.(check (option int)) - "queue should be empty" None - (Relaxed_queue.Not_lockfree.pop queue) - -let two_threads_spin_test () = - let queue = Relaxed_queue.create ~size_exponent:2 () in - let num_of_elements = 1_000_000 in - (* start dequeuer *) - let dequeuer = - Domain.spawn (fun () -> - for i = 1 to num_of_elements do - assert (Relaxed_queue.Spin.pop queue == i) - done) - in - (* enqueue *) - for i = 1 to num_of_elements do - Relaxed_queue.Spin.push queue i - done; - Domain.join dequeuer |> ignore; - () - -let doms1 = if Sys.word_size >= 64 then 4 else 1 -let doms2 = if Sys.word_size >= 64 then 8 else 1 - -let () = - let open Alcotest in - run "Mpmc_queue" - (let open Relaxed_queue.Not_lockfree in - [ - ( "single-thread", - [ test_case "is it a queue" `Quick (smoke_test (push, pop)) ] ); - ( "validate items", - [ test_case "1 prod. 1 cons." `Quick (two_threads_test (push, pop)) ] - ); - ( "validate indices under load", - [ - test_case " 4 prod. 4 cons." `Slow (run_test doms1 doms1); - test_case " 8 prod. 1 cons." `Slow (run_test doms2 1); - test_case " 1 prod. 8 cons." `Slow (run_test 1 doms2); - ] ); - ] - @ - let open Relaxed_queue.Not_lockfree.CAS_interface in - [ - ( "single-thread-CAS-intf", - [ test_case "is it a queue" `Quick (smoke_test (push, pop)) ] ); - ( "validate items-CAS-intf", - [ test_case "1 prod. 1 cons." `Quick (two_threads_test (push, pop)) ] - ); - ] - @ [ - ( "single-thread-spinning", - [ test_case "is it a queue" `Quick smoke_test_spinning ] ); - ( "validate-items-spinning", - [ test_case "1 prod. 1 cons" `Quick two_threads_spin_test ] ); - ]) diff --git a/test/ws_deque/qcheck_ws_deque.ml b/test/ws_deque/qcheck_ws_deque.ml index 5255ad87..7360a972 100644 --- a/test/ws_deque/qcheck_ws_deque.ml +++ b/test/ws_deque/qcheck_ws_deque.ml @@ -1,4 +1,4 @@ -module Ws_deque = Saturn_lockfree.Work_stealing_deque.M +module Ws_deque = Saturn_lockfree.Work_stealing_deque (* Sequential building of a deque *) let deque_of_list l = diff --git a/test/ws_deque/stm_ws_deque.ml b/test/ws_deque/stm_ws_deque.ml index f12235f9..15412f5f 100644 --- a/test/ws_deque/stm_ws_deque.ml +++ b/test/ws_deque/stm_ws_deque.ml @@ -15,7 +15,7 @@ module Spec = struct | Steal -> "Steal" type state = int list - type sut = int Ws_deque.M.t + type sut = int Ws_deque.t let arb_cmd _s = let int_gen = Gen.nat in @@ -30,7 +30,7 @@ module Spec = struct let stealer_cmd _s = QCheck.make ~print:show_cmd (Gen.return Steal) let init_state = [] - let init_sut () = Ws_deque.M.create () + let init_sut () = Ws_deque.create () let cleanup _ = () let next_state c s = @@ -46,9 +46,9 @@ module Spec = struct let run c d = match c with - | Push i -> Res (unit, Ws_deque.M.push d i) - | Pop -> Res (result int exn, protect Ws_deque.M.pop d) - | Steal -> Res (result int exn, protect Ws_deque.M.steal d) + | Push i -> Res (unit, Ws_deque.push d i) + | Pop -> Res (result int exn, protect Ws_deque.pop d) + | Steal -> Res (result int exn, protect Ws_deque.steal d) let postcond c (s : state) res = match (c, res) with diff --git a/test/ws_deque/test_ws_deque.ml b/test/ws_deque/test_ws_deque.ml index 3681c239..b4094c54 100644 --- a/test/ws_deque/test_ws_deque.ml +++ b/test/ws_deque/test_ws_deque.ml @@ -1,4 +1,4 @@ -open Saturn_lockfree.Work_stealing_deque.M +open Saturn_lockfree.Work_stealing_deque (** Tests *) let test_empty () = diff --git a/test/ws_deque/ws_deque_dscheck.ml b/test/ws_deque/ws_deque_dscheck.ml index 2c2c87dc..c8a3afce 100644 --- a/test/ws_deque/ws_deque_dscheck.ml +++ b/test/ws_deque/ws_deque_dscheck.ml @@ -2,7 +2,7 @@ let drain_remaining queue = let remaining = ref 0 in (try while true do - Ws_deque.M.pop queue |> ignore; + Ws_deque.pop queue |> ignore; remaining := !remaining + 1 done with _ -> ()); @@ -10,7 +10,7 @@ let drain_remaining queue = let owner_stealer () = Atomic.trace (fun () -> - let queue = Ws_deque.M.create () in + let queue = Ws_deque.create () in let total_items = 3 in let popped = ref 0 in @@ -18,10 +18,10 @@ let owner_stealer () = (* owner thr *) Atomic.spawn (fun () -> for _ = 1 to total_items do - Ws_deque.M.push queue 0 + Ws_deque.push queue 0 done; for _ = 1 to total_items / 2 do - match Ws_deque.M.pop queue with + match Ws_deque.pop queue with | exception _ -> () | _ -> popped := !popped + 1 done); @@ -29,7 +29,7 @@ let owner_stealer () = (* stealer *) Atomic.spawn (fun () -> for _ = 1 to total_items / 2 do - match Ws_deque.M.steal queue with + match Ws_deque.steal queue with | exception _ -> () | _ -> popped := !popped + 1 done); @@ -41,16 +41,16 @@ let owner_stealer () = let popper_stealer () = Atomic.trace (fun () -> - let queue = Ws_deque.M.create () in + let queue = Ws_deque.create () in let total_items = 3 in for _ = 1 to total_items do - Ws_deque.M.push queue 0 + Ws_deque.push queue 0 done; (* stealers *) let popped = ref 0 in let stealer () = - match Ws_deque.M.steal queue with + match Ws_deque.steal queue with | exception _ -> () | _ -> popped := !popped + 1 in