diff --git a/src_lockfree/ws_deque.ml b/src_lockfree/ws_deque.ml index f6cddabc..49cc76b4 100644 --- a/src_lockfree/ws_deque.ml +++ b/src_lockfree/ws_deque.ml @@ -3,6 +3,7 @@ * Copyright (c) 2015, KC Sivaramakrishnan * Copyright (c) 2017, Nicolas ASSOUAD * Copyright (c) 2021, Tom Kelly + * Copyright (c) 2024, Vesa Karvonen * * Permission to use, copy, modify, and/or distribute this software for any * purpose with or without fee is hereby granted, provided that the above @@ -27,6 +28,8 @@ * https://dl.acm.org/doi/abs/10.1145/2442516.2442524 *) +module Atomic = Transparent_atomic + module type S = sig type 'a t @@ -38,162 +41,110 @@ module type S = sig val steal_opt : 'a t -> 'a option end -module CArray = struct - type 'a t = 'a array - - let rec log2 n = if n <= 1 then 0 else 1 + log2 (n asr 1) - - let create sz v = - (* [sz] must be a power of two. *) - assert (0 < sz && sz = Int.shift_left 1 (log2 sz)); - assert (Int.logand sz (sz - 1) == 0); - Array.make sz v - - let[@inline] size t = Array.length t - let[@inline] mask t = size t - 1 - - let[@inline] index i t = - (* Because [size t] is a power of two, [i mod (size t)] is the same as - [i land (size t - 1)], that is, [i land (mask t)]. *) - Int.logand i (mask t) - - let[@inline] get t i = Array.unsafe_get t (index i t) - let[@inline] put t i v = Array.unsafe_set t (index i t) v - - let[@inline] transfer src dst top num = - ArrayExtra.blit_circularly (* source array and index: *) - src - (index top src) (* target array and index: *) - dst - (index top dst) (* number of elements: *) - num - - let grow t top bottom = - let sz = size t in - assert (bottom - top = sz); - let dst = create (2 * sz) (Obj.magic ()) in - transfer t dst top sz; - dst - - let shrink t top bottom = - let sz = size t in - assert (bottom - top <= sz / 2); - let dst = create (sz / 2) (Obj.magic ()) in - transfer t dst top (bottom - top); - dst -end - module M : S = struct - let min_size = 32 - let shrink_const = 3 + (** This must be a power of two. *) + let min_capacity = 16 type 'a t = { top : int Atomic.t; bottom : int Atomic.t; - tab : 'a ref CArray.t Atomic.t; - mutable next_shrink : int; + mutable tab : 'a ref array; } let create () = - let top = Atomic.make 1 |> Multicore_magic.copy_as_padded in - let bottom = Atomic.make 1 |> Multicore_magic.copy_as_padded in - let tab = - Atomic.make (CArray.create min_size (Obj.magic ())) - |> Multicore_magic.copy_as_padded - in - let next_shrink = 0 in - { top; bottom; tab; next_shrink } |> Multicore_magic.copy_as_padded - - let set_next_shrink q = - let sz = CArray.size (Atomic.get q.tab) in - if sz <= min_size then q.next_shrink <- 0 - else q.next_shrink <- sz / shrink_const - - let grow q t b = - Atomic.set q.tab (CArray.grow (Atomic.get q.tab) t b); - set_next_shrink q - - let size q = - let b = Atomic.get q.bottom in - let t = Atomic.get q.top in - b - t + let top = Atomic.make 0 |> Multicore_magic.copy_as_padded in + let bottom = Atomic.make 0 |> Multicore_magic.copy_as_padded in + let tab = Array.make min_capacity (Obj.magic ()) in + { top; bottom; tab } |> Multicore_magic.copy_as_padded + + let realloc a t b sz new_sz = + let new_a = Array.make new_sz (Obj.magic ()) in + ArrayExtra.blit_circularly a + (t land (sz - 1)) + new_a + (t land (new_sz - 1)) + (b - t); + new_a let push q v = - let v' = ref v in - let b = Atomic.get q.bottom in + let v = ref v in + (* Read of [bottom] by the owner simply does not require a fence as the + [bottom] is only mutated by the owner. *) + let b = Atomic.fenceless_get q.bottom in let t = Atomic.get q.top in - let a = Atomic.get q.tab in + let a = q.tab in let size = b - t in - let a = - if size = CArray.size a then begin - grow q t b; - Atomic.get q.tab - end - else a - in - CArray.put a b v'; - Atomic.set q.bottom (b + 1) + let capacity = Array.length a in + if size < capacity then begin + Array.unsafe_set a (b land (capacity - 1)) v; + Atomic.incr q.bottom + end + else + let a = realloc a t b capacity (capacity lsl 1) in + Array.unsafe_set a (b land (Array.length a - 1)) v; + q.tab <- a; + Atomic.incr q.bottom type ('a, _) poly = Option : ('a, 'a option) poly | Value : ('a, 'a) poly - let[@inline] release : type a r. a ref -> (a, r) poly -> r = - fun ptr poly -> - let res = !ptr in - (* we know this ptr will never be dereferenced, but want to - break the reference to ensure that the contents of the - deque array get garbage collected *) - ptr := Obj.magic (); - match poly with Option -> Some res | Value -> res - let pop_as : type a r. a t -> (a, r) poly -> r = fun q poly -> - if size q = 0 then match poly with Option -> None | Value -> raise Exit + let b = Atomic.fetch_and_add q.bottom (-1) - 1 in + (* Read of [top] at this point requires no fence as we simply need to ensure + that the read happens after updating [bottom]. *) + let t = Atomic.fenceless_get q.top in + let size = b - t in + if 0 < size then begin + let a = q.tab in + let capacity = Array.length a in + let out = Array.unsafe_get a (b land (capacity - 1)) in + let res = !out in + out := Obj.magic (); + if size + size + size <= capacity - min_capacity then + q.tab <- realloc a t b capacity (capacity lsr 1); + match poly with Option -> Some res | Value -> res + end + else if size < 0 then begin + (* This write of [bottom] requires no fence. The deque is empty and + remains so until the next [push]. *) + Atomic.fenceless_set q.bottom (b + 1); + match poly with Option -> None | Value -> raise_notrace Exit + end else - let b = Atomic.get q.bottom - 1 in - Atomic.set q.bottom b; - let t = Atomic.get q.top in - let a = Atomic.get q.tab in - let size = b - t in - if size < 0 then begin - (* empty queue *) - Atomic.set q.bottom (b + 1); - match poly with Option -> None | Value -> raise Exit + let got = Atomic.compare_and_set q.top t (t + 1) in + (* This write of [bottom] requires no fence. The deque is empty and + remains so until the next [push]. *) + Atomic.fenceless_set q.bottom (b + 1); + let a = q.tab in + let out = Array.unsafe_get a (b land (Array.length a - 1)) in + if got then begin + let res = !out in + out := Obj.magic (); + match poly with Option -> Some res | Value -> res end - else - let out = CArray.get a b in - if b = t then - (* single last element *) - if Atomic.compare_and_set q.top t (t + 1) then begin - Atomic.set q.bottom (b + 1); - release out poly - end - else begin - Atomic.set q.bottom (b + 1); - match poly with Option -> None | Value -> raise Exit - end - else begin - (* non-empty queue *) - if q.next_shrink > size then begin - Atomic.set q.tab (CArray.shrink a t b); - set_next_shrink q - end; - release out poly - end + else match poly with Option -> None | Value -> raise_notrace Exit let pop q = pop_as q Value let pop_opt q = pop_as q Option let rec steal_as : type a r. a t -> Backoff.t -> (a, r) poly -> r = fun q backoff poly -> - let t = Atomic.get q.top in + (* Read of [top] does not require a fence at this point, but the read of + [top] must happen before the read of [bottom]. The write of [top] later + has no effect in case we happened to read an old value of [top]. *) + let t = Atomic.fenceless_get q.top in let b = Atomic.get q.bottom in let size = b - t in - if size <= 0 then match poly with Option -> None | Value -> raise Exit - else - let a = Atomic.get q.tab in - let out = CArray.get a t in - if Atomic.compare_and_set q.top t (t + 1) then release out poly + if 0 < size then + let a = q.tab in + let out = Array.unsafe_get a (t land (Array.length a - 1)) in + if Atomic.compare_and_set q.top t (t + 1) then begin + let res = !out in + out := Obj.magic (); + match poly with Option -> Some res | Value -> res + end else steal_as q (Backoff.once backoff) poly + else match poly with Option -> None | Value -> raise_notrace Exit let steal q = steal_as q Backoff.default Value let steal_opt q = steal_as q Backoff.default Option diff --git a/test/ws_deque/dune b/test/ws_deque/dune index 79970ef0..72a13a5a 100644 --- a/test/ws_deque/dune +++ b/test/ws_deque/dune @@ -11,7 +11,13 @@ (test (package saturn_lockfree) (name ws_deque_dscheck) - (libraries atomic dscheck alcotest backoff multicore-magic) + (libraries + atomic + dscheck + alcotest + backoff + transparent_atomic + multicore-magic) (modules ArrayExtra ws_deque ws_deque_dscheck)) (test diff --git a/test/ws_deque/stm_ws_deque.ml b/test/ws_deque/stm_ws_deque.ml index 7ba91283..6a04b468 100644 --- a/test/ws_deque/stm_ws_deque.ml +++ b/test/ws_deque/stm_ws_deque.ml @@ -83,6 +83,7 @@ let () = agree_test_par_asym ~count ~name:"STM Saturn_lockfree.Ws_deque test parallel"; (* Note: this can generate, e.g., pop commands/actions in different threads, thus violating the spec. *) + (* WSDT_dom.neg_agree_test_par ~count - ~name:"STM Saturn_lockfree.Ws_deque test parallel, negative"; + ~name:"STM Saturn_lockfree.Ws_deque test parallel, negative"; *) ]