Skip to content

Commit

Permalink
* Update mpsc queue from Eio: simplify the implementation.
Browse files Browse the repository at this point in the history
* Add bench

* Improve documentation

* Add of_list and push_all functions

* Add examples in documentation.

* More dscheck tests
  • Loading branch information
lyrm committed Nov 26, 2024
1 parent fc34768 commit 1a7b012
Show file tree
Hide file tree
Showing 8 changed files with 530 additions and 465 deletions.
79 changes: 79 additions & 0 deletions bench/bench_mpsc.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
open Multicore_bench
module Queue = Saturn.Single_consumer_queue

let run_one_domain ~budgetf ?(n_msgs = 50 * Util.iter_factor) () =
let t = Queue.create () in

let op push = if push then Queue.push t 101 else Queue.pop_opt t |> ignore in

let init _ =
assert (Queue.is_empty t);
Util.generate_push_and_pop_sequence n_msgs
in
let work _ bits = Util.Bits.iter op bits in

Times.record ~budgetf ~n_domains:1 ~init ~work ()
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config:"one domain"

let run_one ~budgetf ?(n_adders = 2) ?(n_takers = 2)
?(n_msgs = 50 * Util.iter_factor) () =
let n_domains = n_adders + n_takers in

let t = Queue.create () in

let n_msgs_to_take = Atomic.make 0 |> Multicore_magic.copy_as_padded in
let n_msgs_to_add = Atomic.make 0 |> Multicore_magic.copy_as_padded in

let init _ =
assert (Queue.is_empty t);
Atomic.set n_msgs_to_take n_msgs;
Atomic.set n_msgs_to_add n_msgs
in
let work i () =
if i < n_adders then
let rec work () =
let n = Util.alloc n_msgs_to_add in
if 0 < n then begin
for i = 1 to n do
Queue.push t i
done;
work ()
end
in
work ()
else
let rec work () =
let n = Util.alloc n_msgs_to_take in
if n <> 0 then
let rec loop n =
if 0 < n then begin
match Queue.pop_opt t with
| None ->
Domain.cpu_relax ();
loop n
| Some _ -> loop (n - 1)
end
else work ()
in
loop n
in
work ()
in

let config =
let format role n =
Printf.sprintf "%d %s%s" n role (if n = 1 then "" else "s")
in
Printf.sprintf "%s, %s"
(format "nb adder" n_adders)
(format "nb taker" n_takers)
in

Times.record ~budgetf ~n_domains ~init ~work ()
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config

let run_suite ~budgetf =
run_one_domain ~budgetf ()
@ (Util.cross [ 1; 2; 3; 4 ] [ 1 ]
|> List.concat_map @@ fun (n_adders, n_takers) ->
run_one ~budgetf ~n_adders ~n_takers ())
1 change: 1 addition & 0 deletions bench/main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ let benchmarks =
("Saturn Bounded_Queue", Bench_bounded_queue.Safe.run_suite);
("Saturn Bounded_Queue_unsafe", Bench_bounded_queue.Unsafe.run_suite);
("Saturn Single_prod_single_cons_queue", Bench_spsc_queue.run_suite);
("Saturn Single_consumer_queue", Bench_mpsc.run_suite);
("Saturn Size", Bench_size.run_suite);
("Saturn Skiplist", Bench_skiplist.run_suite);
("Saturn Htbl", Bench_htbl.Safe.run_suite);
Expand Down
2 changes: 1 addition & 1 deletion src/dune
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,5 @@ let () =
(<> %{os_type} Win32)
(>= %{ocaml_version} 5.0.0)))
(libraries saturn)
(files treiber_stack.mli bounded_stack.mli))
(files treiber_stack.mli bounded_stack.mli mpsc_queue.mli))
|}
261 changes: 119 additions & 142 deletions src/mpsc_queue.ml
Original file line number Diff line number Diff line change
@@ -1,152 +1,129 @@
(* A lock-free multi-producer, single-consumer, thread-safe queue without support for cancellation.
This makes a good data structure for a scheduler's run queue.
See: "Implementing lock-free queues"
https://people.cs.pitt.edu/~jacklange/teaching/cs2510-f12/papers/implementing_lock_free.pdf
It is simplified slightly because we don't need multiple consumers.
Therefore [head] is not atomic. *)
Based on Vesa Karvonen's example at:
https://github.com/ocaml-multicore/picos/blob/07d6c2d391e076b490098c0379d01208b3a9cc96/test/lib/foundation/mpsc_queue.ml
*)

exception Closed
exception Empty

module Node : sig
type 'a t = { next : 'a opt Atomic.t; mutable value : 'a }
and +'a opt

val make : next:'a opt -> 'a -> 'a t

val none : 'a opt
(** [t.next = none] means that [t] is currently the last node. *)

val closed : 'a opt
(** [t.next = closed] means that [t] will always be the last node. *)

val some : 'a t -> 'a opt
val fold : 'a opt -> none:(unit -> 'b) -> some:('a t -> 'b) -> 'b
end = struct
(* https://github.com/ocaml/RFCs/pull/14 should remove the need for magic here *)

type +'a opt (* special | 'a t *)
type 'a t = { next : 'a opt Atomic.t; mutable value : 'a }
type special = Nothing | Closed

let none : 'a. 'a opt = Obj.magic Nothing
let closed : 'a. 'a opt = Obj.magic Closed
let some (t : 'a t) : 'a opt = Obj.magic t

let fold (opt : 'a opt) ~none:n ~some =
if opt == none then n ()
else if opt == closed then raise Closed
else some (Obj.magic opt : 'a t)

let make ~next value = { value; next = Atomic.make next }
end

type 'a t = { tail : 'a Node.t Atomic.t; mutable head : 'a Node.t }
(* [head] is the last node dequeued (or a dummy node, initially).
[head.next] gives the real first node, if not [Node.none].
If [tail.next] is [none] then it is the last node in the queue.
Otherwise, [tail.next] is a node that is closer to the tail. *)

let push t x =
let node = Node.(make ~next:none) x in
let rec aux () =
let p = Atomic.get t.tail in
(* While [p.next == none], [p] is the last node in the queue. *)
if Atomic.compare_and_set p.next Node.none (Node.some node) then
(* [node] has now been added to the queue (and possibly even consumed).
Update [tail], unless someone else already did it for us. *)
ignore (Atomic.compare_and_set t.tail p node : bool)
else
(* Someone else added a different node first ([p.next] is not [none]).
Make [t.tail] more up-to-date, if it hasn't already changed, and try again. *)
Node.fold (Atomic.get p.next)
~none:(fun () -> assert false)
~some:(fun p_next ->
ignore (Atomic.compare_and_set t.tail p p_next : bool);
aux ())
in
aux ()

let rec push_head t x =
let p = t.head in
let next = Atomic.get p.next in
if next == Node.closed then raise Closed;
let node = Node.make ~next x in
if Atomic.compare_and_set p.next next (Node.some node) then
if
(* We don't want to let [tail] get too far behind, so if the queue was empty, move it to the new node. *)
next == Node.none
then ignore (Atomic.compare_and_set t.tail p node : bool)
else
( (* If the queue wasn't empty, there's nothing to do.
Either tail isn't at head or there is some [push] thread working to update it.
Either [push] will update it directly to the new tail, or will update it to [node]
and then retry. Either way, it ends up at the real tail. *) )
else (
(* Someone else changed it first. This can only happen if the queue was empty. *)
assert (next == Node.none);
push_head t x)

let rec close (t : 'a t) =
(* Mark the tail node as final. *)
let p = Atomic.get t.tail in
if not (Atomic.compare_and_set p.next Node.none Node.closed) then
(* CAS failed because [p] is no longer the tail (or is already closed). *)
Node.fold (Atomic.get p.next)
~none:(fun () -> assert false)
(* Can't switch from another state to [none] *)
~some:(fun p_next ->
(* Make [tail] more up-to-date if it hasn't changed already *)
ignore (Atomic.compare_and_set t.tail p p_next : bool);
(* Retry *)
close t)

let pop_opt t =
let p = t.head in
(* [p] is the previously-popped item. *)
let node = Atomic.get p.next in
Node.fold node
~none:(fun () -> None)
~some:(fun node ->
t.head <- node;
let v = node.value in
node.value <- Obj.magic ();
(* So it can be GC'd *)
Some v)
(* A list where the end indicates whether the queue is closed. *)
type 'a clist = ( :: ) of 'a * 'a clist | Open | Closed

exception Empty
(* [rev_append l1 l2] is like [rev l1 @ l2] *)
let rec rev_append l1 l2 =
match l1 with
| a :: l -> rev_append l (a :: l2)
| Open -> l2
| Closed -> assert false

let pop t =
let p = t.head in
(* [p] is the previously-popped item. *)
let node = Atomic.get p.next in
Node.fold node
~none:(fun () -> raise Empty)
~some:(fun node ->
t.head <- node;
let v = node.value in
node.value <- Obj.magic ();
(* So it can be GC'd *)
v)

let peek_opt t =
let p = t.head in
(* [p] is the previously-popped item. *)
let node = Atomic.get p.next in
Node.fold node ~none:(fun () -> None) ~some:(fun node -> Some node.value)

let peek t =
let p = t.head in
(* [p] is the previously-popped item. *)
let node = Atomic.get p.next in
Node.fold node ~none:(fun () -> raise Empty) ~some:(fun node -> node.value)
let[@tail_mod_cons] rec ( @ ) l1 l2 =
match l1 with
| h1 :: tl -> h1 :: (tl @ l2)
| Open -> l2
| Closed -> assert false

let is_empty t =
Node.fold (Atomic.get t.head.next)
~none:(fun () -> true)
~some:(fun _ -> false)
(* *)

(* The queue contains [head @ rev tail].
If [tail] is non-empty then it ends in [Open]. *)
type 'a t = { mutable head : 'a clist; tail : 'a clist Atomic.t }

let create () =
let dummy = { Node.value = Obj.magic (); next = Atomic.make Node.none } in
{ tail = Atomic.make dummy; head = dummy }
let create () = { head = Open; tail = Atomic.make_contended Open }

let[@tail_mod_cons] rec append_list_to_clist l l' =
match l with [] -> l' | List.(x :: xs) -> x :: append_list_to_clist xs l'

let of_list l =
{ head = append_list_to_clist l Open; tail = Atomic.make_contended Open }

(* *)

let is_empty t =
match t.head with
| _ :: _ -> false
| Closed -> raise Closed
| Open -> ( match Atomic.get t.tail with _ :: _ -> false | _ -> true)

let close t =
match Atomic.exchange t.tail Closed with
| Closed -> raise Closed
| xs -> t.head <- t.head @ rev_append xs Closed

(* *)

let rec push t x =
match Atomic.get t.tail with
| Closed -> raise Closed
| before ->
let after = x :: before in
if not (Atomic.compare_and_set t.tail before after) then push t x

let push_head t x =
match t.head with Closed -> raise Closed | before -> t.head <- x :: before

let[@tail_mod_cons] rec append_list_to_clist l l' =
match l with [] -> l' | List.(x :: xs) -> x :: append_list_to_clist xs l'

let rec push_all t values =
match Atomic.get t.tail with
| Closed -> raise Closed
| before ->
let after = append_list_to_clist (List.rev values) before in
if not (Atomic.compare_and_set t.tail before after) then push_all t values

(* *)

type ('a, _) poly =
| Option : ('a, 'a option) poly
| Value : ('a, 'a) poly
| Unit : ('a, unit) poly

let rec pop_as : type a r. a t -> (a, r) poly -> r =
fun t poly ->
match t.head with
| x :: xs -> begin
t.head <- xs;
match poly with Option -> Some x | Value -> x | Unit -> ()
end
| Closed -> raise Closed
| Open -> (
(* We know the tail is open because we just saw the head was open
and we don't run concurrently with [close]. *)
match Atomic.exchange t.tail Open with
| Closed -> assert false
| Open -> (
match poly with
| Option -> None
| Value | Unit -> raise Empty (* Optimise the common case *))
| tail ->
t.head <- rev_append tail Open;
pop_as t poly)

(* *)

type ('a, _) poly2 = Option : ('a, 'a option) poly2 | Value : ('a, 'a) poly2

let rec peek_as : type a r. a t -> (a, r) poly2 -> r =
fun t poly ->
match t.head with
| x :: _ -> ( match poly with Option -> Some x | Value -> x)
| Closed -> raise Closed
| Open -> (
(* We know the tail is open because we just saw the head was open
and we don't run concurrently with [close]. *)
match Atomic.exchange t.tail Open with
| Closed -> assert false
| Open -> ( match poly with Option -> None | Value -> raise Empty)
| tail ->
t.head <- rev_append tail Open;
peek_as t poly)

(* *)

let pop_opt t = pop_as t Option
let pop_exn t = pop_as t Value
let drop_exn t = pop_as t Unit
let peek_exn t = peek_as t Value
let peek_opt t = peek_as t Option
Loading

0 comments on commit 1a7b012

Please sign in to comment.