Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ws deque update : documentation improvement and drop functions #168

Merged
merged 8 commits into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/dune
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,5 @@ let () =
(<> %{os_type} Win32)
(>= %{ocaml_version} 5.0.0)))
(libraries saturn)
(files treiber_stack.mli bounded_stack.mli))
(files treiber_stack.mli bounded_stack.mli ws_deque.mli))
|}
39 changes: 32 additions & 7 deletions src/ws_deque.ml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,22 @@ let create () =
let top_cache = ref 0 |> Multicore_magic.copy_as_padded in
{ top; bottom; top_cache; tab } |> Multicore_magic.copy_as_padded

let next_pow2 n =
let rec loop acc = if acc >= n then acc else loop (acc lsl 1) in
loop 1

let of_list l =
let len = List.length l in
let capacity = min min_capacity (next_pow2 len) in
lyrm marked this conversation as resolved.
Show resolved Hide resolved
let top = Atomic.make_contended 0 in
let tab = Array.make capacity (Obj.magic ()) in
List.iteri (fun i x -> Array.unsafe_set tab i (ref x)) l;
let bottom = Atomic.make_contended len in
let top_cache = ref 0 |> Multicore_magic.copy_as_padded in
{ top; bottom; top_cache; tab } |> Multicore_magic.copy_as_padded

(* *)

let realloc a t b sz new_sz =
let new_a = Array.make new_sz (Obj.magic ()) in
ArrayExtra.blit_circularly a
Expand Down Expand Up @@ -81,7 +97,12 @@ let push q v =
q.tab <- a;
Atomic.incr q.bottom

type ('a, _) poly = Option : ('a, 'a option) poly | Value : ('a, 'a) poly
(* *)

type ('a, _) poly =
| Option : ('a, 'a option) poly
| Value : ('a, 'a) poly
| Unit : ('a, unit) poly

exception Empty

Expand All @@ -100,7 +121,7 @@ let pop_as : type a r. a t -> (a, r) poly -> r =
out := Obj.magic ();
if size + size + size <= capacity - min_capacity then
q.tab <- realloc a t b capacity (capacity lsr 1);
match poly with Option -> Some res | Value -> res
match poly with Option -> Some res | Value -> res | Unit -> ()
end
else if b = t then begin
(* Whether or not the [compare_and_set] below succeeds, [top_cache] can be
Expand All @@ -115,19 +136,22 @@ let pop_as : type a r. a t -> (a, r) poly -> r =
let out = Array.unsafe_get a (b land (Array.length a - 1)) in
let res = !out in
out := Obj.magic ();
match poly with Option -> Some res | Value -> res
match poly with Option -> Some res | Value -> res | Unit -> ()
end
else match poly with Option -> None | Value -> raise Empty
else match poly with Option -> None | Value | Unit -> raise Empty
end
else begin
(* This write of [bottom] requires no fence. The deque is empty and
remains so until the next [push]. *)
Atomic.fenceless_set q.bottom (b + 1);
match poly with Option -> None | Value -> raise Empty
match poly with Option -> None | Value | Unit -> raise Empty
end

let pop_exn q = pop_as q Value
let pop_opt q = pop_as q Option
let drop_exn q = pop_as q Unit

(* *)

let rec steal_as : type a r. a t -> Backoff.t -> (a, r) poly -> r =
fun q backoff poly ->
Expand All @@ -143,10 +167,11 @@ let rec steal_as : type a r. a t -> Backoff.t -> (a, r) poly -> r =
if Atomic.compare_and_set q.top t (t + 1) then begin
let res = !out in
out := Obj.magic ();
match poly with Option -> Some res | Value -> res
match poly with Option -> Some res | Value -> res | Unit -> ()
end
else steal_as q (Backoff.once backoff) poly
else match poly with Option -> None | Value -> raise Empty
else match poly with Option -> None | Value | Unit -> raise Empty

let steal_exn q = steal_as q Backoff.default Value
let steal_opt q = steal_as q Backoff.default Option
let steal_drop_exn q = steal_as q Backoff.default Unit
157 changes: 129 additions & 28 deletions src/ws_deque.mli
Original file line number Diff line number Diff line change
@@ -1,55 +1,156 @@
(** Lock-free single-producer, multi-consumer dynamic-size double-ended queue (deque).

The main strength of deque in a typical work-stealing setup with per-core structure
is efficient work distribution. Owner uses [push] and [pop] method to operate at
one end of the deque, while other (free) cores can efficiently steal work on the
other side.
The main strength of a deque in a typical work-stealing setup with a
per-core structure, is efficient work distribution. The owner uses [push]
and [pop] methods to operate at one end of the deque, while other (free)
cores can efficiently steal work from the other side.

This approach is great for throughput. Stealers and owner working on different sides
reduces contention in work distribution. Further, local LIFO order runs related tasks
one after one improves locality.
This approach is great for throughput. Stealers and the owner working on
different sides, reduce contention in work distribution. Further, the
local LIFO order, running related tasks one after another, improves locality.

On the other hand, the local LIFO order does not offer any fairness guarantees.
Thus, it is not the best choice when tail latency matters.
On the other hand, the local LIFO order does not offer any fairness
guarantees. Thus, it is not the best choice when tail latency matters.
*)

(** {1 API} *)

type 'a t
(** Type of work-stealing queue *)

val create : unit -> 'a t
(** [create ()] returns a new empty work-stealing queue. *)

val of_list : 'a list -> 'a t
(** [of_list list] creates a new work-stealing queue from [list].

🐌 This is a linear-time operation.

{[
# open Saturn.Work_stealing_deque
# let t : int t = of_list [1;2;3;4]
val t : int t = <abstr>
# pop_opt t
- : int option = Some 4
# pop_opt t
- : int option = Some 3
]}
*)

exception Empty

(** {1 Queue owner functions} *)
(** {2 Queue owner functions} *)

val push : 'a t -> 'a -> unit
(** [push t v] adds [v] to the front of the queue [q].
It should only be invoked by the domain which owns the queue [q]. *)
(** [push queue element] adds [element] at the end of the [queue].
It should only be invoked by the domain that owns the [queue]. *)

val pop_exn : 'a t -> 'a
(** [pop_exn q] removes and returns the first element in queue
[q].It should only be invoked by the domain which owns the queue
[q].
(** [pop_exn queue] removes and returns the last element of the [queue]. It
should only be invoked by the domain that owns the [queue].

@raise Exit if the queue is empty.
*)
@raises Empty if the [queue] is empty. *)

val pop_opt : 'a t -> 'a option
(** [pop_opt q] removes and returns the first element in queue [q], or
returns [None] if the queue is empty. *)
(** [pop_opt queue] removes and returns [Some] of the last element of the
[queue], or returns [None] if the [queue] is empty. *)
lyrm marked this conversation as resolved.
Show resolved Hide resolved

(** {1 Stealers function} *)
val drop_exn : 'a t -> unit
(** [drop_exn queue] removes the last element of the [queue].

@raises Empty if the [queue] is empty. *)

(** {2 Stealer functions} *)

val steal_exn : 'a t -> 'a
(** [steal_exn q] removes and returns the last element from queue
[q]. It should only be invoked by domain which doesn't own the
queue [q].
(** [steal_exn queue] removes and returns the first element of the [queue].
It should only be invoked by a domain that doesn't own the [queue].
lyrm marked this conversation as resolved.
Show resolved Hide resolved

@raise Exit if the queue is empty.
*)
@raises Empty if the [queue] is empty. *)

val steal_opt : 'a t -> 'a option
(** [steal_opt q] removes and returns the last element from queue
[q], or returns [None] if the queue is empty. It should only be
invoked by domain which doesn't own the queue [q]. *)
(** [steal_opt queue] removes and returns [Some] of the first element of the
[queue], or returns [None] if the [queue] is empty. It should only be
invoked by a domain that doesn't own the [queue]. *)

val steal_drop_exn : 'a t -> unit
(** [steal_drop_exn queue] removes the first element of the [queue].

@raises Empty if the [queue] is empty. *)

(** {1 Examples} *)

(** {2 Sequential example}
An example top-level session:
{[
# open Saturn.Work_stealing_deque
# let t : int t = of_list [1;2;3;4;5;6]
val t : int t = <abstr>
# pop_opt t
- : int option = Some 6
# steal_opt t
- : int option = Some 1
# drop_exn t
- : unit = ()
# pop_opt t
- : int option = Some 4
# steal_drop_exn t
- : unit = ()
# steal_exn t
- : int = 3
# steal_exn t
Exception: Saturn__Ws_deque.Empty.
]}
*)

(** {2 Multicore example}
Note: The barrier is used in this example solely to make the results more
interesting by increasing the likelihood of parallelism. Spawning a domain is
a costly operation, especially compared to the relatively small amount of work
being performed here. In practice, using a barrier in this manner is unnecessary.


{@ocaml non-deterministic=command[
# open Saturn.Work_stealing_deque
# let t : int t = create ()
val t : int t = <abstr>
# let barrier = Atomic.make 3
val barrier : int Atomic.t = <abstr>

# let owner () =
Atomic.decr barrier;
while Atomic.get barrier <> 0 do Domain.cpu_relax () done;
for i = 1 to 10 do push t i; Domain.cpu_relax () done
val owner : unit -> unit = <fun>

# let stealer id () =
Atomic.decr barrier;
while Atomic.get barrier <> 0 do Domain.cpu_relax () done;

for _ = 1 to 5 do
match steal_opt t with
| None -> ()
| Some v -> Format.printf "Stealer %s stole %d@." id v
done
val stealer : string -> unit -> unit = <fun>

# let stealerA = Domain.spawn (stealer "A")
val stealerA : unit Domain.t = <abstr>
# let stealerB = Domain.spawn (stealer "B")
val stealerB : unit Domain.t = <abstr>
# owner ()
Stealer A stole 1
Stealer B stole 2
Stealer A stole 3
Stealer B stole 4
Stealer A stole 5
Stealer A stole 7
Stealer B stole 6
Stealer A stole 8
Stealer B stole 9
Stealer B stole 10
- : unit = ()
# Domain.join stealerA; Domain.join stealerB
- : unit = ()
]}
*)
18 changes: 16 additions & 2 deletions test/ws_deque/stm_ws_deque.ml
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ open Util
module Ws_deque = Saturn.Work_stealing_deque

module Spec = struct
type cmd = Push of int | Pop | Steal
type cmd = Push of int | Pop | Drop | Steal | Steal_drop

let show_cmd c =
match c with
| Push i -> "Push " ^ string_of_int i
| Pop -> "Pop"
| Drop -> "Drop"
| Steal -> "Steal"
| Steal_drop -> "Steal_drop"

type state = int list
type sut = int Ws_deque.t
Expand All @@ -24,11 +26,15 @@ module Spec = struct
[
Gen.map (fun i -> Push i) int_gen;
Gen.return Pop;
Gen.return Drop;
(*Gen.return Steal;*)
(* No point in stealing from yourself :-D *)
])

let stealer_cmd _s = QCheck.make ~print:show_cmd (Gen.return Steal)
let stealer_cmd _s =
QCheck.make ~print:show_cmd
(Gen.oneof [ Gen.return Steal; Gen.return Steal_drop ])

let init_state = []
let init_sut () = Ws_deque.create ()
let cleanup _ = ()
Expand All @@ -40,23 +46,31 @@ module Spec = struct
(*if i<>1213 then i::s else s*)
(* an artificial fault *)
| Pop -> ( match s with [] -> s | _ :: s' -> s')
| Drop -> ( match s with [] -> s | _ :: s' -> s')
| Steal -> ( match List.rev s with [] -> s | _ :: s' -> List.rev s')
| Steal_drop -> ( match List.rev s with [] -> s | _ :: s' -> List.rev s')

let precond _ _ = true

let run c d =
match c with
| Push i -> Res (unit, Ws_deque.push d i)
| Pop -> Res (result int exn, protect Ws_deque.pop_exn d)
| Drop -> Res (result unit exn, protect Ws_deque.drop_exn d)
| Steal -> Res (result int exn, protect Ws_deque.steal_exn d)
| Steal_drop -> Res (result unit exn, protect Ws_deque.steal_drop_exn d)

let postcond c (s : state) res =
match (c, res) with
| Push _, Res ((Unit, _), _) -> true
| Pop, Res ((Result (Int, Exn), _), res) -> (
match s with [] -> res = Error Ws_deque.Empty | j :: _ -> res = Ok j)
| Drop, Res ((Result (Unit, Exn), _), res) -> (
match s with [] -> res = Error Ws_deque.Empty | _ -> res = Ok ())
| Steal, Res ((Result (Int, Exn), _), res) -> (
match List.rev s with [] -> Result.is_error res | j :: _ -> res = Ok j)
| Steal_drop, Res ((Result (Unit, Exn), _), res) -> (
match List.rev s with [] -> Result.is_error res | _ -> res = Ok ())
| _, _ -> false
end

Expand Down
Loading