Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mpsc update #162

Merged
merged 2 commits into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions bench/bench_mpsc.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
open Multicore_bench
module Queue = Saturn.Single_consumer_queue

let run_one_domain ~budgetf ?(n_msgs = 50 * Util.iter_factor) () =
let t = Queue.create () in

let op push = if push then Queue.push t 101 else Queue.pop_opt t |> ignore in

let init _ =
assert (Queue.is_empty t);
Util.generate_push_and_pop_sequence n_msgs
in
let work _ bits = Util.Bits.iter op bits in

Times.record ~budgetf ~n_domains:1 ~init ~work ()
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config:"one domain"

let run_one ~budgetf ?(n_adders = 2) ?(n_takers = 2)
?(n_msgs = 50 * Util.iter_factor) () =
let n_domains = n_adders + n_takers in

let t = Queue.create () in

let n_msgs_to_take = Atomic.make 0 |> Multicore_magic.copy_as_padded in
let n_msgs_to_add = Atomic.make 0 |> Multicore_magic.copy_as_padded in

let init _ =
assert (Queue.is_empty t);
Atomic.set n_msgs_to_take n_msgs;
Atomic.set n_msgs_to_add n_msgs
in
let work i () =
if i < n_adders then
let rec work () =
let n = Util.alloc n_msgs_to_add in
if 0 < n then begin
for i = 1 to n do
Queue.push t i
done;
work ()
end
in
work ()
else
let rec work () =
let n = Util.alloc n_msgs_to_take in
if n <> 0 then
let rec loop n =
if 0 < n then begin
match Queue.pop_opt t with
| None ->
Domain.cpu_relax ();
loop n
| Some _ -> loop (n - 1)
end
else work ()
in
loop n
in
work ()
in

let config =
let format role n =
Printf.sprintf "%d %s%s" n role (if n = 1 then "" else "s")
in
Printf.sprintf "%s, %s"
(format "nb adder" n_adders)
(format "nb taker" n_takers)
in

Times.record ~budgetf ~n_domains ~init ~work ()
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config

let run_suite ~budgetf =
run_one_domain ~budgetf ()
@ (Util.cross [ 1; 2; 3; 4 ] [ 1 ]
|> List.concat_map @@ fun (n_adders, n_takers) ->
run_one ~budgetf ~n_adders ~n_takers ())
1 change: 1 addition & 0 deletions bench/main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ let benchmarks =
("Saturn Single_prod_single_cons_queue", Bench_spsc_queue.Safe.run_suite);
( "Saturn Single_prod_single_cons_queue_unsafe",
Bench_spsc_queue.Unsafe.run_suite );
("Saturn Single_consumer_queue", Bench_mpsc.run_suite);
("Saturn Size", Bench_size.run_suite);
("Saturn Skiplist", Bench_skiplist.run_suite);
("Saturn Htbl", Bench_htbl.Safe.run_suite);
Expand Down
2 changes: 1 addition & 1 deletion src/dune
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,5 @@ let () =
(<> %{os_type} Win32)
(>= %{ocaml_version} 5.0.0)))
(libraries saturn)
(files treiber_stack.mli bounded_stack.mli ws_deque.mli))
(files treiber_stack.mli bounded_stack.mli ws_deque.mli mpsc_queue.mli))
|}
261 changes: 119 additions & 142 deletions src/mpsc_queue.ml
Original file line number Diff line number Diff line change
@@ -1,152 +1,129 @@
(* A lock-free multi-producer, single-consumer, thread-safe queue without support for cancellation.
This makes a good data structure for a scheduler's run queue.

See: "Implementing lock-free queues"
https://people.cs.pitt.edu/~jacklange/teaching/cs2510-f12/papers/implementing_lock_free.pdf

It is simplified slightly because we don't need multiple consumers.
Therefore [head] is not atomic. *)
Based on Vesa Karvonen's example at:
https://github.com/ocaml-multicore/picos/blob/07d6c2d391e076b490098c0379d01208b3a9cc96/test/lib/foundation/mpsc_queue.ml
*)

exception Closed
exception Empty

module Node : sig
type 'a t = { next : 'a opt Atomic.t; mutable value : 'a }
and +'a opt

val make : next:'a opt -> 'a -> 'a t

val none : 'a opt
(** [t.next = none] means that [t] is currently the last node. *)

val closed : 'a opt
(** [t.next = closed] means that [t] will always be the last node. *)

val some : 'a t -> 'a opt
val fold : 'a opt -> none:(unit -> 'b) -> some:('a t -> 'b) -> 'b
end = struct
(* https://github.com/ocaml/RFCs/pull/14 should remove the need for magic here *)

type +'a opt (* special | 'a t *)
type 'a t = { next : 'a opt Atomic.t; mutable value : 'a }
type special = Nothing | Closed

let none : 'a. 'a opt = Obj.magic Nothing
let closed : 'a. 'a opt = Obj.magic Closed
let some (t : 'a t) : 'a opt = Obj.magic t

let fold (opt : 'a opt) ~none:n ~some =
if opt == none then n ()
else if opt == closed then raise Closed
else some (Obj.magic opt : 'a t)

let make ~next value = { value; next = Atomic.make next }
end

type 'a t = { tail : 'a Node.t Atomic.t; mutable head : 'a Node.t }
(* [head] is the last node dequeued (or a dummy node, initially).
[head.next] gives the real first node, if not [Node.none].
If [tail.next] is [none] then it is the last node in the queue.
Otherwise, [tail.next] is a node that is closer to the tail. *)

let push t x =
let node = Node.(make ~next:none) x in
let rec aux () =
let p = Atomic.get t.tail in
(* While [p.next == none], [p] is the last node in the queue. *)
if Atomic.compare_and_set p.next Node.none (Node.some node) then
(* [node] has now been added to the queue (and possibly even consumed).
Update [tail], unless someone else already did it for us. *)
ignore (Atomic.compare_and_set t.tail p node : bool)
else
(* Someone else added a different node first ([p.next] is not [none]).
Make [t.tail] more up-to-date, if it hasn't already changed, and try again. *)
Node.fold (Atomic.get p.next)
~none:(fun () -> assert false)
~some:(fun p_next ->
ignore (Atomic.compare_and_set t.tail p p_next : bool);
aux ())
in
aux ()

let rec push_head t x =
let p = t.head in
let next = Atomic.get p.next in
if next == Node.closed then raise Closed;
let node = Node.make ~next x in
if Atomic.compare_and_set p.next next (Node.some node) then
if
(* We don't want to let [tail] get too far behind, so if the queue was empty, move it to the new node. *)
next == Node.none
then ignore (Atomic.compare_and_set t.tail p node : bool)
else
( (* If the queue wasn't empty, there's nothing to do.
Either tail isn't at head or there is some [push] thread working to update it.
Either [push] will update it directly to the new tail, or will update it to [node]
and then retry. Either way, it ends up at the real tail. *) )
else (
(* Someone else changed it first. This can only happen if the queue was empty. *)
assert (next == Node.none);
push_head t x)

let rec close (t : 'a t) =
(* Mark the tail node as final. *)
let p = Atomic.get t.tail in
if not (Atomic.compare_and_set p.next Node.none Node.closed) then
(* CAS failed because [p] is no longer the tail (or is already closed). *)
Node.fold (Atomic.get p.next)
~none:(fun () -> assert false)
(* Can't switch from another state to [none] *)
~some:(fun p_next ->
(* Make [tail] more up-to-date if it hasn't changed already *)
ignore (Atomic.compare_and_set t.tail p p_next : bool);
(* Retry *)
close t)

let pop_opt t =
let p = t.head in
(* [p] is the previously-popped item. *)
let node = Atomic.get p.next in
Node.fold node
~none:(fun () -> None)
~some:(fun node ->
t.head <- node;
let v = node.value in
node.value <- Obj.magic ();
(* So it can be GC'd *)
Some v)
(* A list where the end indicates whether the queue is closed. *)
type 'a clist = ( :: ) of 'a * 'a clist | Open | Closed

exception Empty
(* [rev_append l1 l2] is like [rev l1 @ l2] *)
let rec rev_append l1 l2 =
match l1 with
| a :: l -> rev_append l (a :: l2)
| Open -> l2
| Closed -> assert false

let pop t =
let p = t.head in
(* [p] is the previously-popped item. *)
let node = Atomic.get p.next in
Node.fold node
~none:(fun () -> raise Empty)
~some:(fun node ->
t.head <- node;
let v = node.value in
node.value <- Obj.magic ();
(* So it can be GC'd *)
v)

let peek_opt t =
let p = t.head in
(* [p] is the previously-popped item. *)
let node = Atomic.get p.next in
Node.fold node ~none:(fun () -> None) ~some:(fun node -> Some node.value)

let peek t =
let p = t.head in
(* [p] is the previously-popped item. *)
let node = Atomic.get p.next in
Node.fold node ~none:(fun () -> raise Empty) ~some:(fun node -> node.value)
let[@tail_mod_cons] rec ( @ ) l1 l2 =
match l1 with
| h1 :: tl -> h1 :: (tl @ l2)
| Open -> l2
| Closed -> assert false

let is_empty t =
Node.fold (Atomic.get t.head.next)
~none:(fun () -> true)
~some:(fun _ -> false)
(* *)

(* The queue contains [head @ rev tail].
If [tail] is non-empty then it ends in [Open]. *)
type 'a t = { mutable head : 'a clist; tail : 'a clist Atomic.t }

let create () =
let dummy = { Node.value = Obj.magic (); next = Atomic.make Node.none } in
{ tail = Atomic.make dummy; head = dummy }
let create () = { head = Open; tail = Atomic.make_contended Open }

let[@tail_mod_cons] rec append_list_to_clist l l' =
match l with [] -> l' | List.(x :: xs) -> x :: append_list_to_clist xs l'

let of_list l =
{ head = append_list_to_clist l Open; tail = Atomic.make_contended Open }

(* *)

let is_empty t =
match t.head with
| _ :: _ -> false
| Closed -> raise Closed
| Open -> ( match Atomic.get t.tail with _ :: _ -> false | _ -> true)

let close t =
match Atomic.exchange t.tail Closed with
| Closed -> raise Closed
| xs -> t.head <- t.head @ rev_append xs Closed

(* *)

let rec push t x =
match Atomic.get t.tail with
| Closed -> raise Closed
| before ->
let after = x :: before in
if not (Atomic.compare_and_set t.tail before after) then push t x

let push_head t x =
match t.head with Closed -> raise Closed | before -> t.head <- x :: before

let[@tail_mod_cons] rec append_list_to_clist l l' =
match l with [] -> l' | List.(x :: xs) -> x :: append_list_to_clist xs l'

let rec push_all t values =
match Atomic.get t.tail with
| Closed -> raise Closed
| before ->
let after = append_list_to_clist (List.rev values) before in
if not (Atomic.compare_and_set t.tail before after) then push_all t values

(* *)

type ('a, _) poly =
| Option : ('a, 'a option) poly
| Value : ('a, 'a) poly
| Unit : ('a, unit) poly

let rec pop_as : type a r. a t -> (a, r) poly -> r =
fun t poly ->
match t.head with
| x :: xs -> begin
t.head <- xs;
match poly with Option -> Some x | Value -> x | Unit -> ()
end
| Closed -> raise Closed
| Open -> (
(* We know the tail is open because we just saw the head was open
and we don't run concurrently with [close]. *)
match Atomic.exchange t.tail Open with
| Closed -> assert false
| Open -> (
match poly with
| Option -> None
| Value | Unit -> raise Empty (* Optimise the common case *))
| tail ->
t.head <- rev_append tail Open;
pop_as t poly)

(* *)

type ('a, _) poly2 = Option : ('a, 'a option) poly2 | Value : ('a, 'a) poly2

let rec peek_as : type a r. a t -> (a, r) poly2 -> r =
fun t poly ->
match t.head with
| x :: _ -> ( match poly with Option -> Some x | Value -> x)
| Closed -> raise Closed
| Open -> (
(* We know the tail is open because we just saw the head was open
and we don't run concurrently with [close]. *)
match Atomic.exchange t.tail Open with
| Closed -> assert false
| Open -> ( match poly with Option -> None | Value -> raise Empty)
| tail ->
t.head <- rev_append tail Open;
peek_as t poly)

(* *)

let pop_opt t = pop_as t Option
let pop_exn t = pop_as t Value
let drop_exn t = pop_as t Unit
let peek_exn t = peek_as t Value
let peek_opt t = peek_as t Option
Loading