Skip to content

Commit

Permalink
Optimize work-stealing deque (#124)
Browse files Browse the repository at this point in the history
* Optimize work-stealing deque

- Add padding to avoid false sharing
- Use a GADT to express desired result type
- Use various tweaks to improve performance
- Remove negative test that uses the WS deque in an invalid unsafe way
- Implement caching of the thief side index

* Remove M module and rename pop to pop_exn,  steal to steal_exn and exception Exit to Empty.
---------

Co-authored-by: Carine Morel <[email protected]>
  • Loading branch information
polytypic and lyrm authored Nov 23, 2024
1 parent 471912c commit 2035010
Show file tree
Hide file tree
Showing 8 changed files with 179 additions and 194 deletions.
34 changes: 21 additions & 13 deletions bench/bench_ws_deque.ml
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,22 @@ let run_as_scheduler ~budgetf ?(n_domains = 1) () =
in

let rec try_own own =
match Ws_deque.pop (Array.unsafe_get deques own) with
match Ws_deque.pop_exn (Array.unsafe_get deques own) with
| work -> work
| exception Exit -> try_steal own (next own)
| exception Ws_deque.Empty -> try_steal own (next own)
and try_steal own other =
if other = own then raise_notrace Exit
if other = own then raise_notrace Ws_deque.Empty
else
match Ws_deque.steal (Array.unsafe_get deques other) with
match Ws_deque.steal_exn (Array.unsafe_get deques other) with
| work -> work
| exception Exit -> try_steal own (next other)
| exception Ws_deque.Empty -> try_steal own (next other)
in
let rec run own =
match try_own own with
| work ->
work own;
run own
| exception Exit ->
| exception Ws_deque.Empty ->
if not !exit then begin
Domain.cpu_relax ();
run own
Expand All @@ -47,7 +47,7 @@ let run_as_scheduler ~budgetf ?(n_domains = 1) () =
if x == Obj.magic exit then begin
begin
match try_own own with
| exception Exit -> Domain.cpu_relax ()
| exception Ws_deque.Empty -> Domain.cpu_relax ()
| work -> work own
end;
await own promise
Expand Down Expand Up @@ -90,14 +90,19 @@ let run_as_one_domain ~budgetf ?(n_msgs = 150 * Util.iter_factor) order =

let op_lifo push =
if push then Ws_deque.push t 101
else match Ws_deque.pop t with _ -> () | exception Exit -> ()
else
match Ws_deque.pop_exn t with _ -> () | exception Ws_deque.Empty -> ()
and op_fifo push =
if push then Ws_deque.push t 101
else match Ws_deque.steal t with _ -> () | exception Exit -> ()
else
match Ws_deque.steal_exn t with _ -> () | exception Ws_deque.Empty -> ()
in

let init _ =
assert (match Ws_deque.steal t with _ -> false | exception Exit -> true);
assert (
match Ws_deque.steal_exn t with
| _ -> false
| exception Ws_deque.Empty -> true);
Util.generate_push_and_pop_sequence n_msgs
in
let work _ bits =
Expand All @@ -121,7 +126,10 @@ let run_as_spmc ~budgetf ~n_thiefs () =
let n_msgs_to_steal = Atomic.make 0 |> Multicore_magic.copy_as_padded in

let init _ =
assert (match Ws_deque.steal t with _ -> false | exception Exit -> true);
assert (
match Ws_deque.steal_exn t with
| _ -> false
| exception Ws_deque.Empty -> true);
Atomic.set n_msgs_to_steal n_msgs
in
let work i () =
Expand All @@ -131,8 +139,8 @@ let run_as_spmc ~budgetf ~n_thiefs () =
if 0 < n then
let rec loop n =
if 0 < n then
match Ws_deque.steal t with
| exception Exit ->
match Ws_deque.steal_exn t with
| exception Ws_deque.Empty ->
Domain.cpu_relax ();
loop n
| _ -> loop (n - 1)
Expand Down
240 changes: 108 additions & 132 deletions src/ws_deque.ml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
* Copyright (c) 2015, KC Sivaramakrishnan <[email protected]>
* Copyright (c) 2017, Nicolas ASSOUAD <[email protected]>
* Copyright (c) 2021, Tom Kelly <[email protected]>
* Copyright (c) 2024, Vesa Karvonen <[email protected]>
*
* Permission to use, copy, modify, and/or distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
Expand All @@ -27,150 +28,125 @@
* https://dl.acm.org/doi/abs/10.1145/2442516.2442524
*)

module CArray = struct
type 'a t = 'a array
module Atomic = Multicore_magic.Transparent_atomic

let rec log2 n = if n <= 1 then 0 else 1 + log2 (n asr 1)

let create sz v =
(* [sz] must be a power of two. *)
assert (0 < sz && sz = Int.shift_left 1 (log2 sz));
assert (Int.logand sz (sz - 1) == 0);
Array.make sz v

let size t = Array.length t [@@inline]
let mask t = size t - 1 [@@inline]

let index i t =
(* Because [size t] is a power of two, [i mod (size t)] is the same as
[i land (size t - 1)], that is, [i land (mask t)]. *)
Int.logand i (mask t)
[@@inline]

let get t i = Array.unsafe_get t (index i t) [@@inline]
let put t i v = Array.unsafe_set t (index i t) v [@@inline]

let transfer src dst top num =
ArrayExtra.blit_circularly (* source array and index: *)
src
(index top src) (* target array and index: *)
dst
(index top dst) (* number of elements: *)
num
[@@inline]

let grow t top bottom =
let sz = size t in
assert (bottom - top = sz);
let dst = create (2 * sz) (Obj.magic ()) in
transfer t dst top sz;
dst

let shrink t top bottom =
let sz = size t in
assert (bottom - top <= sz / 2);
let dst = create (sz / 2) (Obj.magic ()) in
transfer t dst top (bottom - top);
dst
end

let min_size = 32
let shrink_const = 3
(** This must be a power of two. *)
let min_capacity = 16

type 'a t = {
top : int Atomic.t;
bottom : int Atomic.t;
tab : 'a ref CArray.t Atomic.t;
mutable next_shrink : int;
top_cache : int ref;
mutable tab : 'a ref array;
}

let create () =
{
top = Atomic.make 1;
bottom = Atomic.make 1;
tab = Atomic.make (CArray.create min_size (Obj.magic ()));
next_shrink = 0;
}

let set_next_shrink q =
let sz = CArray.size (Atomic.get q.tab) in
if sz <= min_size then q.next_shrink <- 0
else q.next_shrink <- sz / shrink_const

let grow q t b =
Atomic.set q.tab (CArray.grow (Atomic.get q.tab) t b);
set_next_shrink q

let size q =
let b = Atomic.get q.bottom in
let t = Atomic.get q.top in
b - t
let top = Atomic.make_contended 0 in
let tab = Array.make min_capacity (Obj.magic ()) in
let bottom = Atomic.make_contended 0 in
let top_cache = ref 0 |> Multicore_magic.copy_as_padded in
{ top; bottom; top_cache; tab } |> Multicore_magic.copy_as_padded

let realloc a t b sz new_sz =
let new_a = Array.make new_sz (Obj.magic ()) in
ArrayExtra.blit_circularly a
(t land (sz - 1))
new_a
(t land (new_sz - 1))
(b - t);
new_a

let push q v =
let v' = ref v in
let b = Atomic.get q.bottom in
let t = Atomic.get q.top in
let a = Atomic.get q.tab in
let size = b - t in
let a =
if size = CArray.size a then (
grow q t b;
Atomic.get q.tab)
else a
in
CArray.put a b v';
Atomic.set q.bottom (b + 1)

let release ptr =
let res = !ptr in
(* we know this ptr will never be dereferenced, but want to
break the reference to ensure that the contents of the
deque array get garbage collected *)
ptr := Obj.magic ();
res
[@@inline]

let pop q =
if size q = 0 then raise Exit
else
let b = Atomic.get q.bottom - 1 in
Atomic.set q.bottom b;
let v = ref v in
(* Read of [bottom] by the owner simply does not require a fence as the
[bottom] is only mutated by the owner. *)
let b = Atomic.fenceless_get q.bottom in
let t_cache = !(q.top_cache) in
let a = q.tab in
let size = b - t_cache in
let capacity = Array.length a in
if
size < capacity
||
let t = Atomic.get q.top in
let a = Atomic.get q.tab in
let size = b - t in
if size < 0 then (
(* empty queue *)
Atomic.set q.bottom (b + 1);
raise Exit)
else
let out = CArray.get a b in
if b = t then
(* single last element *)
if Atomic.compare_and_set q.top t (t + 1) then (
Atomic.set q.bottom (b + 1);
release out)
else (
Atomic.set q.bottom (b + 1);
raise Exit)
else (
(* non-empty queue *)
if q.next_shrink > size then (
Atomic.set q.tab (CArray.shrink a t b);
set_next_shrink q);
release out)
q.top_cache := t;
t != t_cache
then begin
Array.unsafe_set a (b land (capacity - 1)) v;
Atomic.incr q.bottom
end
else
let a = realloc a t_cache b capacity (capacity lsl 1) in
Array.unsafe_set a (b land (Array.length a - 1)) v;
q.tab <- a;
Atomic.incr q.bottom

type ('a, _) poly = Option : ('a, 'a option) poly | Value : ('a, 'a) poly

let pop_opt q = try Some (pop q) with Exit -> None
exception Empty

let rec steal backoff q =
let t = Atomic.get q.top in
let pop_as : type a r. a t -> (a, r) poly -> r =
fun q poly ->
let b = Atomic.fetch_and_add q.bottom (-1) - 1 in
(* Read of [top] at this point requires no fence as we simply need to ensure
that the read happens after updating [bottom]. *)
let t = Atomic.fenceless_get q.top in
let size = b - t in
if 0 < size then begin
let a = q.tab in
let capacity = Array.length a in
let out = Array.unsafe_get a (b land (capacity - 1)) in
let res = !out in
out := Obj.magic ();
if size + size + size <= capacity - min_capacity then
q.tab <- realloc a t b capacity (capacity lsr 1);
match poly with Option -> Some res | Value -> res
end
else if b = t then begin
(* Whether or not the [compare_and_set] below succeeds, [top_cache] can be
updated, because in either case [top] has been incremented. *)
q.top_cache := t + 1;
let got = Atomic.compare_and_set q.top t (t + 1) in
(* This write of [bottom] requires no fence. The deque is empty and
remains so until the next [push]. *)
Atomic.fenceless_set q.bottom (b + 1);
if got then begin
let a = q.tab in
let out = Array.unsafe_get a (b land (Array.length a - 1)) in
let res = !out in
out := Obj.magic ();
match poly with Option -> Some res | Value -> res
end
else match poly with Option -> None | Value -> raise Empty
end
else begin
(* This write of [bottom] requires no fence. The deque is empty and
remains so until the next [push]. *)
Atomic.fenceless_set q.bottom (b + 1);
match poly with Option -> None | Value -> raise Empty
end

let pop_exn q = pop_as q Value
let pop_opt q = pop_as q Option

let rec steal_as : type a r. a t -> Backoff.t -> (a, r) poly -> r =
fun q backoff poly ->
(* Read of [top] does not require a fence at this point, but the read of
[top] must happen before the read of [bottom]. The write of [top] later
has no effect in case we happened to read an old value of [top]. *)
let t = Atomic.fenceless_get q.top in
let b = Atomic.get q.bottom in
let size = b - t in
if size <= 0 then raise Exit
else
let a = Atomic.get q.tab in
let out = CArray.get a t in
if Atomic.compare_and_set q.top t (t + 1) then release out
else steal (Backoff.once backoff) q

let steal q = steal Backoff.default q
let steal_opt q = try Some (steal q) with Exit -> None
if 0 < size then
let a = q.tab in
let out = Array.unsafe_get a (t land (Array.length a - 1)) in
if Atomic.compare_and_set q.top t (t + 1) then begin
let res = !out in
out := Obj.magic ();
match poly with Option -> Some res | Value -> res
end
else steal_as q (Backoff.once backoff) poly
else match poly with Option -> None | Value -> raise Empty

let steal_exn q = steal_as q Backoff.default Value
let steal_opt q = steal_as q Backoff.default Option
10 changes: 6 additions & 4 deletions src/ws_deque.mli
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@ type 'a t
val create : unit -> 'a t
(** [create ()] returns a new empty work-stealing queue. *)

exception Empty

(** {1 Queue owner functions} *)

val push : 'a t -> 'a -> unit
(** [push t v] adds [v] to the front of the queue [q].
It should only be invoked by the domain which owns the queue [q]. *)

val pop : 'a t -> 'a
(** [pop q] removes and returns the first element in queue
val pop_exn : 'a t -> 'a
(** [pop_exn q] removes and returns the first element in queue
[q].It should only be invoked by the domain which owns the queue
[q].
Expand All @@ -39,8 +41,8 @@ val pop_opt : 'a t -> 'a option

(** {1 Stealers function} *)

val steal : 'a t -> 'a
(** [steal q] removes and returns the last element from queue
val steal_exn : 'a t -> 'a
(** [steal_exn q] removes and returns the last element from queue
[q]. It should only be invoked by domain which doesn't own the
queue [q].
Expand Down
6 changes: 4 additions & 2 deletions test/ws_deque/dune
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@
(test
(package saturn)
(name ws_deque_dscheck)
(libraries atomic dscheck alcotest backoff)
(libraries atomic dscheck alcotest backoff multicore-magic-dscheck)
(build_if
(>= %{ocaml_version} 5))
(modules ArrayExtra ws_deque ws_deque_dscheck))
(modules ArrayExtra ws_deque ws_deque_dscheck)
(flags
(:standard -open Multicore_magic_dscheck)))

(test
(package saturn)
Expand Down
Loading

0 comments on commit 2035010

Please sign in to comment.