Skip to content

Commit

Permalink
Optimize work-stealing deque
Browse files Browse the repository at this point in the history
- Add padding to avoid false sharing
- Use a GADT to express desired result type
- Use various tweaks to improve performance
- Remove negative test that uses the WS deque in an invalid unsafe way
- Implement caching of the thief side index
  • Loading branch information
polytypic committed Aug 11, 2024
1 parent c6729d9 commit cd21775
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 142 deletions.
244 changes: 109 additions & 135 deletions src_lockfree/ws_deque.ml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
* Copyright (c) 2015, KC Sivaramakrishnan <[email protected]>
* Copyright (c) 2017, Nicolas ASSOUAD <[email protected]>
* Copyright (c) 2021, Tom Kelly <[email protected]>
* Copyright (c) 2024, Vesa Karvonen <[email protected]>
*
* Permission to use, copy, modify, and/or distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
Expand All @@ -27,6 +28,8 @@
* https://dl.acm.org/doi/abs/10.1145/2442516.2442524
*)

module Atomic = Multicore_magic.Transparent_atomic

module type S = sig
type 'a t

Expand All @@ -38,152 +41,123 @@ module type S = sig
val steal_opt : 'a t -> 'a option
end

module CArray = struct
type 'a t = 'a array

let rec log2 n = if n <= 1 then 0 else 1 + log2 (n asr 1)

let create sz v =
(* [sz] must be a power of two. *)
assert (0 < sz && sz = Int.shift_left 1 (log2 sz));
assert (Int.logand sz (sz - 1) == 0);
Array.make sz v

let size t = Array.length t [@@inline]
let mask t = size t - 1 [@@inline]

let index i t =
(* Because [size t] is a power of two, [i mod (size t)] is the same as
[i land (size t - 1)], that is, [i land (mask t)]. *)
Int.logand i (mask t)
[@@inline]

let get t i = Array.unsafe_get t (index i t) [@@inline]
let put t i v = Array.unsafe_set t (index i t) v [@@inline]

let transfer src dst top num =
ArrayExtra.blit_circularly (* source array and index: *)
src
(index top src) (* target array and index: *)
dst
(index top dst) (* number of elements: *)
num
[@@inline]

let grow t top bottom =
let sz = size t in
assert (bottom - top = sz);
let dst = create (2 * sz) (Obj.magic ()) in
transfer t dst top sz;
dst

let shrink t top bottom =
let sz = size t in
assert (bottom - top <= sz / 2);
let dst = create (sz / 2) (Obj.magic ()) in
transfer t dst top (bottom - top);
dst
end

module M : S = struct
let min_size = 32
let shrink_const = 3
(** This must be a power of two. *)
let min_capacity = 16

type 'a t = {
top : int Atomic.t;
bottom : int Atomic.t;
tab : 'a ref CArray.t Atomic.t;
mutable next_shrink : int;
top_cache : int ref;
mutable tab : 'a ref array;
}

let create () =
{
top = Atomic.make 1;
bottom = Atomic.make 1;
tab = Atomic.make (CArray.create min_size (Obj.magic ()));
next_shrink = 0;
}

let set_next_shrink q =
let sz = CArray.size (Atomic.get q.tab) in
if sz <= min_size then q.next_shrink <- 0
else q.next_shrink <- sz / shrink_const

let grow q t b =
Atomic.set q.tab (CArray.grow (Atomic.get q.tab) t b);
set_next_shrink q

let size q =
let b = Atomic.get q.bottom in
let t = Atomic.get q.top in
b - t
let top = Atomic.make 0 |> Multicore_magic.copy_as_padded in
let tab = Array.make min_capacity (Obj.magic ()) in
let bottom = Atomic.make 0 |> Multicore_magic.copy_as_padded in
let top_cache = ref 0 |> Multicore_magic.copy_as_padded in
{ top; bottom; top_cache; tab } |> Multicore_magic.copy_as_padded

let realloc a t b sz new_sz =
let new_a = Array.make new_sz (Obj.magic ()) in
ArrayExtra.blit_circularly a
(t land (sz - 1))
new_a
(t land (new_sz - 1))
(b - t);
new_a

let push q v =
let v' = ref v in
let b = Atomic.get q.bottom in
let t = Atomic.get q.top in
let a = Atomic.get q.tab in
let size = b - t in
let a =
if size = CArray.size a then (
grow q t b;
Atomic.get q.tab)
else a
in
CArray.put a b v';
Atomic.set q.bottom (b + 1)

let release ptr =
let res = !ptr in
(* we know this ptr will never be dereferenced, but want to
break the reference to ensure that the contents of the
deque array get garbage collected *)
ptr := Obj.magic ();
res
[@@inline]

let pop q =
if size q = 0 then raise Exit
else
let b = Atomic.get q.bottom - 1 in
Atomic.set q.bottom b;
let v = ref v in
(* Read of [bottom] by the owner simply does not require a fence as the
[bottom] is only mutated by the owner. *)
let b = Atomic.fenceless_get q.bottom in
let t_cache = !(q.top_cache) in
let a = q.tab in
let size = b - t_cache in
let capacity = Array.length a in
if
size < capacity
||
let t = Atomic.get q.top in
let a = Atomic.get q.tab in
let size = b - t in
if size < 0 then (
(* empty queue *)
Atomic.set q.bottom (b + 1);
raise Exit)
else
let out = CArray.get a b in
if b = t then
(* single last element *)
if Atomic.compare_and_set q.top t (t + 1) then (
Atomic.set q.bottom (b + 1);
release out)
else (
Atomic.set q.bottom (b + 1);
raise Exit)
else (
(* non-empty queue *)
if q.next_shrink > size then (
Atomic.set q.tab (CArray.shrink a t b);
set_next_shrink q);
release out)

let pop_opt q = try Some (pop q) with Exit -> None

let rec steal backoff q =
let t = Atomic.get q.top in
q.top_cache := t;
t != t_cache
then begin
Array.unsafe_set a (b land (capacity - 1)) v;
Atomic.incr q.bottom
end
else
let a = realloc a t_cache b capacity (capacity lsl 1) in
Array.unsafe_set a (b land (Array.length a - 1)) v;
q.tab <- a;
Atomic.incr q.bottom

type ('a, _) poly = Option : ('a, 'a option) poly | Value : ('a, 'a) poly

let pop_as : type a r. a t -> (a, r) poly -> r =
fun q poly ->
let b = Atomic.fetch_and_add q.bottom (-1) - 1 in
(* Read of [top] at this point requires no fence as we simply need to ensure
that the read happens after updating [bottom]. *)
let t = Atomic.fenceless_get q.top in
let size = b - t in
if 0 < size then begin
let a = q.tab in
let capacity = Array.length a in
let out = Array.unsafe_get a (b land (capacity - 1)) in
let res = !out in
out := Obj.magic ();
if size + size + size <= capacity - min_capacity then
q.tab <- realloc a t b capacity (capacity lsr 1);
match poly with Option -> Some res | Value -> res
end
else if b = t then begin
(* Whether or not the [compare_and_set] below succeeds, [top_cache] can be
updated, because in either case [top] has been incremented. *)
q.top_cache := t + 1;
let got = Atomic.compare_and_set q.top t (t + 1) in
(* This write of [bottom] requires no fence. The deque is empty and
remains so until the next [push]. *)
Atomic.fenceless_set q.bottom (b + 1);
if got then begin
let a = q.tab in
let out = Array.unsafe_get a (b land (Array.length a - 1)) in
let res = !out in
out := Obj.magic ();
match poly with Option -> Some res | Value -> res
end
else match poly with Option -> None | Value -> raise_notrace Exit
end
else begin
(* This write of [bottom] requires no fence. The deque is empty and
remains so until the next [push]. *)
Atomic.fenceless_set q.bottom (b + 1);
match poly with Option -> None | Value -> raise_notrace Exit
end

let pop q = pop_as q Value
let pop_opt q = pop_as q Option

let rec steal_as : type a r. a t -> Backoff.t -> (a, r) poly -> r =
fun q backoff poly ->
(* Read of [top] does not require a fence at this point, but the read of
[top] must happen before the read of [bottom]. The write of [top] later
has no effect in case we happened to read an old value of [top]. *)
let t = Atomic.fenceless_get q.top in
let b = Atomic.get q.bottom in
let size = b - t in
if size <= 0 then raise Exit
else
let a = Atomic.get q.tab in
let out = CArray.get a t in
if Atomic.compare_and_set q.top t (t + 1) then release out
else steal (Backoff.once backoff) q

let steal q = steal Backoff.default q
let steal_opt q = try Some (steal q) with Exit -> None
if 0 < size then
let a = q.tab in
let out = Array.unsafe_get a (t land (Array.length a - 1)) in
if Atomic.compare_and_set q.top t (t + 1) then begin
let res = !out in
out := Obj.magic ();
match poly with Option -> Some res | Value -> res
end
else steal_as q (Backoff.once backoff) poly
else match poly with Option -> None | Value -> raise_notrace Exit

let steal q = steal_as q Backoff.default Value
let steal_opt q = steal_as q Backoff.default Option
end
6 changes: 4 additions & 2 deletions test/ws_deque/dune
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@
(test
(package saturn_lockfree)
(name ws_deque_dscheck)
(libraries atomic dscheck alcotest backoff)
(libraries atomic dscheck alcotest backoff multicore-magic-dscheck)
(build_if
(>= %{ocaml_version} 5))
(modules ArrayExtra ws_deque ws_deque_dscheck))
(modules ArrayExtra ws_deque ws_deque_dscheck)
(flags
(:standard -open Multicore_magic_dscheck)))

(test
(package saturn_lockfree)
Expand Down
6 changes: 1 addition & 5 deletions test/ws_deque/stm_ws_deque.ml
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,7 @@ let () =
assume (Dom.all_interleavings_ok triple);
repeat rep_count Dom.agree_prop_par_asym triple)
in
[
agree_test_par_asym ~count ~name:(name ^ " parallel");
(* Note: this can generate, e.g., pop commands/actions in different threads, thus violating the spec. *)
Dom.neg_agree_test_par ~count ~name:(name ^ " parallel, negative");
]
[ agree_test_par_asym ~count ~name:(name ^ " parallel") ]
in
Stm_run.run ~count:1000 ~name:"Saturn_lockfree.Ws_deque" ~verbose:true
~make_domain
Expand Down

0 comments on commit cd21775

Please sign in to comment.