diff --git a/bench/bench_ws_deque.ml b/bench/bench_ws_deque.ml index f822ef9c..6c12e4e1 100644 --- a/bench/bench_ws_deque.ml +++ b/bench/bench_ws_deque.ml @@ -14,22 +14,22 @@ let run_as_scheduler ~budgetf ?(n_domains = 1) () = in let rec try_own own = - match Ws_deque.pop (Array.unsafe_get deques own) with + match Ws_deque.pop_exn (Array.unsafe_get deques own) with | work -> work - | exception Exit -> try_steal own (next own) + | exception Ws_deque.Empty -> try_steal own (next own) and try_steal own other = - if other = own then raise_notrace Exit + if other = own then raise_notrace Ws_deque.Empty else - match Ws_deque.steal (Array.unsafe_get deques other) with + match Ws_deque.steal_exn (Array.unsafe_get deques other) with | work -> work - | exception Exit -> try_steal own (next other) + | exception Ws_deque.Empty -> try_steal own (next other) in let rec run own = match try_own own with | work -> work own; run own - | exception Exit -> + | exception Ws_deque.Empty -> if not !exit then begin Domain.cpu_relax (); run own @@ -47,7 +47,7 @@ let run_as_scheduler ~budgetf ?(n_domains = 1) () = if x == Obj.magic exit then begin begin match try_own own with - | exception Exit -> Domain.cpu_relax () + | exception Ws_deque.Empty -> Domain.cpu_relax () | work -> work own end; await own promise @@ -90,14 +90,19 @@ let run_as_one_domain ~budgetf ?(n_msgs = 150 * Util.iter_factor) order = let op_lifo push = if push then Ws_deque.push t 101 - else match Ws_deque.pop t with _ -> () | exception Exit -> () + else + match Ws_deque.pop_exn t with _ -> () | exception Ws_deque.Empty -> () and op_fifo push = if push then Ws_deque.push t 101 - else match Ws_deque.steal t with _ -> () | exception Exit -> () + else + match Ws_deque.steal_exn t with _ -> () | exception Ws_deque.Empty -> () in let init _ = - assert (match Ws_deque.steal t with _ -> false | exception Exit -> true); + assert ( + match Ws_deque.steal_exn t with + | _ -> false + | exception Ws_deque.Empty -> true); Util.generate_push_and_pop_sequence n_msgs in let work _ bits = @@ -121,7 +126,10 @@ let run_as_spmc ~budgetf ~n_thiefs () = let n_msgs_to_steal = Atomic.make 0 |> Multicore_magic.copy_as_padded in let init _ = - assert (match Ws_deque.steal t with _ -> false | exception Exit -> true); + assert ( + match Ws_deque.steal_exn t with + | _ -> false + | exception Ws_deque.Empty -> true); Atomic.set n_msgs_to_steal n_msgs in let work i () = @@ -131,8 +139,8 @@ let run_as_spmc ~budgetf ~n_thiefs () = if 0 < n then let rec loop n = if 0 < n then - match Ws_deque.steal t with - | exception Exit -> + match Ws_deque.steal_exn t with + | exception Ws_deque.Empty -> Domain.cpu_relax (); loop n | _ -> loop (n - 1) diff --git a/src/ws_deque.ml b/src/ws_deque.ml index e09e9530..5a5b6bd7 100644 --- a/src/ws_deque.ml +++ b/src/ws_deque.ml @@ -3,6 +3,7 @@ * Copyright (c) 2015, KC Sivaramakrishnan * Copyright (c) 2017, Nicolas ASSOUAD * Copyright (c) 2021, Tom Kelly + * Copyright (c) 2024, Vesa Karvonen * * Permission to use, copy, modify, and/or distribute this software for any * purpose with or without fee is hereby granted, provided that the above @@ -27,150 +28,125 @@ * https://dl.acm.org/doi/abs/10.1145/2442516.2442524 *) -module CArray = struct - type 'a t = 'a array +module Atomic = Multicore_magic.Transparent_atomic - let rec log2 n = if n <= 1 then 0 else 1 + log2 (n asr 1) - - let create sz v = - (* [sz] must be a power of two. *) - assert (0 < sz && sz = Int.shift_left 1 (log2 sz)); - assert (Int.logand sz (sz - 1) == 0); - Array.make sz v - - let size t = Array.length t [@@inline] - let mask t = size t - 1 [@@inline] - - let index i t = - (* Because [size t] is a power of two, [i mod (size t)] is the same as - [i land (size t - 1)], that is, [i land (mask t)]. *) - Int.logand i (mask t) - [@@inline] - - let get t i = Array.unsafe_get t (index i t) [@@inline] - let put t i v = Array.unsafe_set t (index i t) v [@@inline] - - let transfer src dst top num = - ArrayExtra.blit_circularly (* source array and index: *) - src - (index top src) (* target array and index: *) - dst - (index top dst) (* number of elements: *) - num - [@@inline] - - let grow t top bottom = - let sz = size t in - assert (bottom - top = sz); - let dst = create (2 * sz) (Obj.magic ()) in - transfer t dst top sz; - dst - - let shrink t top bottom = - let sz = size t in - assert (bottom - top <= sz / 2); - let dst = create (sz / 2) (Obj.magic ()) in - transfer t dst top (bottom - top); - dst -end - -let min_size = 32 -let shrink_const = 3 +(** This must be a power of two. *) +let min_capacity = 16 type 'a t = { top : int Atomic.t; bottom : int Atomic.t; - tab : 'a ref CArray.t Atomic.t; - mutable next_shrink : int; + top_cache : int ref; + mutable tab : 'a ref array; } let create () = - { - top = Atomic.make 1; - bottom = Atomic.make 1; - tab = Atomic.make (CArray.create min_size (Obj.magic ())); - next_shrink = 0; - } - -let set_next_shrink q = - let sz = CArray.size (Atomic.get q.tab) in - if sz <= min_size then q.next_shrink <- 0 - else q.next_shrink <- sz / shrink_const - -let grow q t b = - Atomic.set q.tab (CArray.grow (Atomic.get q.tab) t b); - set_next_shrink q - -let size q = - let b = Atomic.get q.bottom in - let t = Atomic.get q.top in - b - t + let top = Atomic.make_contended 0 in + let tab = Array.make min_capacity (Obj.magic ()) in + let bottom = Atomic.make_contended 0 in + let top_cache = ref 0 |> Multicore_magic.copy_as_padded in + { top; bottom; top_cache; tab } |> Multicore_magic.copy_as_padded + +let realloc a t b sz new_sz = + let new_a = Array.make new_sz (Obj.magic ()) in + ArrayExtra.blit_circularly a + (t land (sz - 1)) + new_a + (t land (new_sz - 1)) + (b - t); + new_a let push q v = - let v' = ref v in - let b = Atomic.get q.bottom in - let t = Atomic.get q.top in - let a = Atomic.get q.tab in - let size = b - t in - let a = - if size = CArray.size a then ( - grow q t b; - Atomic.get q.tab) - else a - in - CArray.put a b v'; - Atomic.set q.bottom (b + 1) - -let release ptr = - let res = !ptr in - (* we know this ptr will never be dereferenced, but want to - break the reference to ensure that the contents of the - deque array get garbage collected *) - ptr := Obj.magic (); - res -[@@inline] - -let pop q = - if size q = 0 then raise Exit - else - let b = Atomic.get q.bottom - 1 in - Atomic.set q.bottom b; + let v = ref v in + (* Read of [bottom] by the owner simply does not require a fence as the + [bottom] is only mutated by the owner. *) + let b = Atomic.fenceless_get q.bottom in + let t_cache = !(q.top_cache) in + let a = q.tab in + let size = b - t_cache in + let capacity = Array.length a in + if + size < capacity + || let t = Atomic.get q.top in - let a = Atomic.get q.tab in - let size = b - t in - if size < 0 then ( - (* empty queue *) - Atomic.set q.bottom (b + 1); - raise Exit) - else - let out = CArray.get a b in - if b = t then - (* single last element *) - if Atomic.compare_and_set q.top t (t + 1) then ( - Atomic.set q.bottom (b + 1); - release out) - else ( - Atomic.set q.bottom (b + 1); - raise Exit) - else ( - (* non-empty queue *) - if q.next_shrink > size then ( - Atomic.set q.tab (CArray.shrink a t b); - set_next_shrink q); - release out) + q.top_cache := t; + t != t_cache + then begin + Array.unsafe_set a (b land (capacity - 1)) v; + Atomic.incr q.bottom + end + else + let a = realloc a t_cache b capacity (capacity lsl 1) in + Array.unsafe_set a (b land (Array.length a - 1)) v; + q.tab <- a; + Atomic.incr q.bottom + +type ('a, _) poly = Option : ('a, 'a option) poly | Value : ('a, 'a) poly -let pop_opt q = try Some (pop q) with Exit -> None +exception Empty -let rec steal backoff q = - let t = Atomic.get q.top in +let pop_as : type a r. a t -> (a, r) poly -> r = + fun q poly -> + let b = Atomic.fetch_and_add q.bottom (-1) - 1 in + (* Read of [top] at this point requires no fence as we simply need to ensure + that the read happens after updating [bottom]. *) + let t = Atomic.fenceless_get q.top in + let size = b - t in + if 0 < size then begin + let a = q.tab in + let capacity = Array.length a in + let out = Array.unsafe_get a (b land (capacity - 1)) in + let res = !out in + out := Obj.magic (); + if size + size + size <= capacity - min_capacity then + q.tab <- realloc a t b capacity (capacity lsr 1); + match poly with Option -> Some res | Value -> res + end + else if b = t then begin + (* Whether or not the [compare_and_set] below succeeds, [top_cache] can be + updated, because in either case [top] has been incremented. *) + q.top_cache := t + 1; + let got = Atomic.compare_and_set q.top t (t + 1) in + (* This write of [bottom] requires no fence. The deque is empty and + remains so until the next [push]. *) + Atomic.fenceless_set q.bottom (b + 1); + if got then begin + let a = q.tab in + let out = Array.unsafe_get a (b land (Array.length a - 1)) in + let res = !out in + out := Obj.magic (); + match poly with Option -> Some res | Value -> res + end + else match poly with Option -> None | Value -> raise Empty + end + else begin + (* This write of [bottom] requires no fence. The deque is empty and + remains so until the next [push]. *) + Atomic.fenceless_set q.bottom (b + 1); + match poly with Option -> None | Value -> raise Empty + end + +let pop_exn q = pop_as q Value +let pop_opt q = pop_as q Option + +let rec steal_as : type a r. a t -> Backoff.t -> (a, r) poly -> r = + fun q backoff poly -> + (* Read of [top] does not require a fence at this point, but the read of + [top] must happen before the read of [bottom]. The write of [top] later + has no effect in case we happened to read an old value of [top]. *) + let t = Atomic.fenceless_get q.top in let b = Atomic.get q.bottom in let size = b - t in - if size <= 0 then raise Exit - else - let a = Atomic.get q.tab in - let out = CArray.get a t in - if Atomic.compare_and_set q.top t (t + 1) then release out - else steal (Backoff.once backoff) q - -let steal q = steal Backoff.default q -let steal_opt q = try Some (steal q) with Exit -> None + if 0 < size then + let a = q.tab in + let out = Array.unsafe_get a (t land (Array.length a - 1)) in + if Atomic.compare_and_set q.top t (t + 1) then begin + let res = !out in + out := Obj.magic (); + match poly with Option -> Some res | Value -> res + end + else steal_as q (Backoff.once backoff) poly + else match poly with Option -> None | Value -> raise Empty + +let steal_exn q = steal_as q Backoff.default Value +let steal_opt q = steal_as q Backoff.default Option diff --git a/src/ws_deque.mli b/src/ws_deque.mli index 08654e09..5c111ea8 100644 --- a/src/ws_deque.mli +++ b/src/ws_deque.mli @@ -19,14 +19,16 @@ type 'a t val create : unit -> 'a t (** [create ()] returns a new empty work-stealing queue. *) +exception Empty + (** {1 Queue owner functions} *) val push : 'a t -> 'a -> unit (** [push t v] adds [v] to the front of the queue [q]. It should only be invoked by the domain which owns the queue [q]. *) -val pop : 'a t -> 'a -(** [pop q] removes and returns the first element in queue +val pop_exn : 'a t -> 'a +(** [pop_exn q] removes and returns the first element in queue [q].It should only be invoked by the domain which owns the queue [q]. @@ -39,8 +41,8 @@ val pop_opt : 'a t -> 'a option (** {1 Stealers function} *) -val steal : 'a t -> 'a -(** [steal q] removes and returns the last element from queue +val steal_exn : 'a t -> 'a +(** [steal_exn q] removes and returns the last element from queue [q]. It should only be invoked by domain which doesn't own the queue [q]. diff --git a/test/ws_deque/dune b/test/ws_deque/dune index 30f0858f..4540b230 100644 --- a/test/ws_deque/dune +++ b/test/ws_deque/dune @@ -11,10 +11,12 @@ (test (package saturn) (name ws_deque_dscheck) - (libraries atomic dscheck alcotest backoff) + (libraries atomic dscheck alcotest backoff multicore-magic-dscheck) (build_if (>= %{ocaml_version} 5)) - (modules ArrayExtra ws_deque ws_deque_dscheck)) + (modules ArrayExtra ws_deque ws_deque_dscheck) + (flags + (:standard -open Multicore_magic_dscheck))) (test (package saturn) diff --git a/test/ws_deque/qcheck_ws_deque.ml b/test/ws_deque/qcheck_ws_deque.ml index 1a2a52d5..299b2c90 100644 --- a/test/ws_deque/qcheck_ws_deque.ml +++ b/test/ws_deque/qcheck_ws_deque.ml @@ -33,13 +33,13 @@ let tests_one_producer = let deque = deque_of_list (l @ l') in let pop_list = - extract_n_of_deque deque Ws_deque.pop (List.length l') + extract_n_of_deque deque Ws_deque.pop_exn (List.length l') in pop_list = List.rev l')); (* TEST 2 - single producer no stealer : - forall q of size n, forall m > n, poping m times raises Exit (m-n) times. *) + forall q of size n, forall m > n, poping m times raises Empty (m-n) times. *) QCheck.( - Test.make ~name:"pop_on_empty_deque_raises_exit" ~count:1 + Test.make ~name:"pop_on_empty_deque_raises_empty" ~count:1 (pair (list int) small_nat) (fun (l, m) -> assume (m > 0); @@ -49,7 +49,8 @@ let tests_one_producer = let deque = deque_of_list l in for _i = 0 to m - 1 do - try ignore (Ws_deque.pop deque) with Exit -> incr count + try ignore (Ws_deque.pop_exn deque) + with Ws_deque.Empty -> incr count done; !count = m - n)); @@ -63,7 +64,7 @@ let tests_one_producer_one_stealer = This checks : - order is preserved (first push = first steal) - - Exit is raised only when the deque is empty *) + - Empty is raised only when the deque is empty *) QCheck.( Test.make ~name:"steals_are_in_order" (pair (list int) small_nat) @@ -72,14 +73,14 @@ let tests_one_producer_one_stealer = let deque = deque_of_list l in (* Then the stealer domain steals [n] times. The output list - is composed of all stolen value. If an [Exit] is raised, + is composed of all stolen value. If an [Empty] is raised, it is register as a [None] value in the returned list.*) let stealer = Domain.spawn (fun () -> let steal' deque = - match Ws_deque.steal deque with + match Ws_deque.steal_exn deque with | value -> Some value - | exception Exit -> + | exception Ws_deque.Empty -> Domain.cpu_relax (); None in @@ -99,7 +100,7 @@ let tests_one_producer_one_stealer = nfirst expected_stolen) && (* The [n - (List.length l)] last values of [steal_list] - should be [None] (i.e. the [steal] function had raised [Exit]). *) + should be [None] (i.e. the [steal] function had raised [Empty]). *) let exits = List.filteri (fun i _ -> i >= List.length l) steal_list in List.for_all (function None -> true | _ -> false) exits)); (* TEST 2 with 1 producer, 1 stealer and parallel execution. @@ -108,7 +109,7 @@ let tests_one_producer_one_stealer = This test checks : - order is preserved (first push = first steal) - - Exit is raised only when the deque is empty *) + - Empty is raised only when the deque is empty *) QCheck.( Test.make ~name:"parallel_pushes_and_steals" (pair (list small_int) (int_bound 200)) @@ -119,14 +120,14 @@ let tests_one_producer_one_stealer = (* The stealer domain steals n times. If a value [v] is stolen, it is registered as [Some v] in the returned list whereas any - [Exit] raised is registered as a [None].*) + [Empty] raised is registered as a [None].*) let stealer = Domain.spawn (fun () -> Barrier.await barrier; let steal' deque = - match Ws_deque.steal deque with + match Ws_deque.steal_exn deque with | value -> Some value - | exception Exit -> + | exception Ws_deque.Empty -> Domain.cpu_relax (); None in @@ -175,23 +176,23 @@ let tests_one_producer_one_stealer = let barrier = Barrier.create 2 in Random.self_init (); let pop' deque = - match Ws_deque.pop deque with + match Ws_deque.pop_exn deque with | value -> Some value - | exception Exit -> + | exception Ws_deque.Empty -> Domain.cpu_relax (); None in (* The stealer domain steals [nsteal] times. If a value [v] is stolen, - it is registered as [Some v] in the returned list whereas any [Exit] + it is registered as [Some v] in the returned list whereas any [Empty] raised, it is registered as a [None].*) let stealer = Domain.spawn (fun () -> Barrier.await barrier; let steal' deque = - match Ws_deque.steal deque with + match Ws_deque.steal_exn deque with | value -> Some value - | exception Exit -> + | exception Ws_deque.Empty -> Domain.cpu_relax (); None in @@ -225,7 +226,7 @@ let tests_one_producer_two_stealers = This test checks : - order is preserved (first push = first steal) - no element is stolen by both stealers - - Exit is raised only when the deque is empty *) + - Empty is raised only when the deque is empty *) QCheck.( Test.make ~name:"parallel_steals" (pair (list small_int) (pair small_nat small_nat)) @@ -242,9 +243,9 @@ let tests_one_producer_two_stealers = for i = 0 to nsteal - 1 do res.(i) <- - (match Ws_deque.steal deque with + (match Ws_deque.steal_exn deque with | value -> Some value - | exception Exit -> + | exception Ws_deque.Empty -> Domain.cpu_relax (); None) done; diff --git a/test/ws_deque/stm_ws_deque.ml b/test/ws_deque/stm_ws_deque.ml index 5db982a4..c2090ddc 100644 --- a/test/ws_deque/stm_ws_deque.ml +++ b/test/ws_deque/stm_ws_deque.ml @@ -47,14 +47,14 @@ module Spec = struct let run c d = match c with | Push i -> Res (unit, Ws_deque.push d i) - | Pop -> Res (result int exn, protect Ws_deque.pop d) - | Steal -> Res (result int exn, protect Ws_deque.steal d) + | Pop -> Res (result int exn, protect Ws_deque.pop_exn d) + | Steal -> Res (result int exn, protect Ws_deque.steal_exn d) let postcond c (s : state) res = match (c, res) with | Push _, Res ((Unit, _), _) -> true | Pop, Res ((Result (Int, Exn), _), res) -> ( - match s with [] -> res = Error Exit | j :: _ -> res = Ok j) + match s with [] -> res = Error Ws_deque.Empty | j :: _ -> res = Ok j) | Steal, Res ((Result (Int, Exn), _), res) -> ( match List.rev s with [] -> Result.is_error res | j :: _ -> res = Ok j) | _, _ -> false @@ -77,10 +77,6 @@ let () = assume (Dom.all_interleavings_ok triple); repeat rep_count Dom.agree_prop_par_asym triple) in - [ - agree_test_par_asym ~count ~name:(name ^ " parallel"); - (* Note: this can generate, e.g., pop commands/actions in different threads, thus violating the spec. *) - Dom.neg_agree_test_par ~count ~name:(name ^ " parallel, negative"); - ] + [ agree_test_par_asym ~count ~name:(name ^ " parallel") ] in Stm_run.run ~name:"Saturn.Ws_deque" ~make_domain (module Spec) |> exit diff --git a/test/ws_deque/test_ws_deque.ml b/test/ws_deque/test_ws_deque.ml index bc6a1bc5..1173a448 100644 --- a/test/ws_deque/test_ws_deque.ml +++ b/test/ws_deque/test_ws_deque.ml @@ -3,8 +3,8 @@ open Saturn.Work_stealing_deque let test_empty () = let q = create () in - match pop q with - | exception Exit -> print_string "test_exit: ok\n" + match pop_exn q with + | exception Empty -> print_string "test_exit: ok\n" | _ -> assert false let test_push_and_pop () = @@ -12,9 +12,9 @@ let test_push_and_pop () = push q 1; push q 10; push q 100; - assert (pop q = 100); - assert (pop q = 10); - assert (pop q = 1); + assert (pop_exn q = 100); + assert (pop_exn q = 10); + assert (pop_exn q = 1); print_string "test_push_and_pop: ok\n" let test_push_and_steal () = @@ -25,7 +25,7 @@ let test_push_and_steal () = let domains = Array.init 3 (fun _ -> Domain.spawn (fun _ -> - let v = steal q in + let v = steal_exn q in assert (v = 1 || v = 10 || v = 100))) in Array.iter Domain.join domains; @@ -64,8 +64,8 @@ let test_concurrent_workload () = pushed := x :: !pushed; decr n and pop () = - match pop q with - | exception Exit -> + match pop_exn q with + | exception Empty -> Domain.cpu_relax (); false | x -> @@ -89,8 +89,8 @@ let test_concurrent_workload () = Array.init thieves (fun i -> Domain.spawn (fun () -> let steal () = - match steal q with - | exception Exit -> Domain.cpu_relax () + match steal_exn q with + | exception Empty -> Domain.cpu_relax () | x -> stolen.(i) <- x :: stolen.(i) in diff --git a/test/ws_deque/ws_deque_dscheck.ml b/test/ws_deque/ws_deque_dscheck.ml index c8a3afce..1cfe9775 100644 --- a/test/ws_deque/ws_deque_dscheck.ml +++ b/test/ws_deque/ws_deque_dscheck.ml @@ -2,7 +2,7 @@ let drain_remaining queue = let remaining = ref 0 in (try while true do - Ws_deque.pop queue |> ignore; + Ws_deque.pop_exn queue |> ignore; remaining := !remaining + 1 done with _ -> ()); @@ -21,7 +21,7 @@ let owner_stealer () = Ws_deque.push queue 0 done; for _ = 1 to total_items / 2 do - match Ws_deque.pop queue with + match Ws_deque.pop_exn queue with | exception _ -> () | _ -> popped := !popped + 1 done); @@ -29,7 +29,7 @@ let owner_stealer () = (* stealer *) Atomic.spawn (fun () -> for _ = 1 to total_items / 2 do - match Ws_deque.steal queue with + match Ws_deque.steal_exn queue with | exception _ -> () | _ -> popped := !popped + 1 done); @@ -50,7 +50,7 @@ let popper_stealer () = (* stealers *) let popped = ref 0 in let stealer () = - match Ws_deque.steal queue with + match Ws_deque.steal_exn queue with | exception _ -> () | _ -> popped := !popped + 1 in