Skip to content

Commit

Permalink
Add new functions and improve documentations of Treiber stack.
Browse files Browse the repository at this point in the history
  • Loading branch information
lyrm committed Nov 5, 2024
1 parent 37d41e2 commit 8f72d22
Show file tree
Hide file tree
Showing 4 changed files with 633 additions and 80 deletions.
144 changes: 133 additions & 11 deletions src/treiber_stack.ml
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,151 @@
type 'a node = Nil | Cons of { value : 'a; tail : 'a node }
type 'a t = 'a node Atomic.t

let create () = Atomic.make Nil |> Multicore_magic.copy_as_padded
let create () = Atomic.make_contended Nil
let is_empty t = Atomic.get t == Nil

let rec push t value backoff =
let tail = Atomic.get t in
let cons = Cons { value; tail } in
if not (Atomic.compare_and_set t tail cons) then
push t value (Backoff.once backoff)
let of_list list =
List.fold_left (fun acc elt -> Cons { value = elt; tail = acc }) Nil list
|> Atomic.make_contended

let push t value = push t value Backoff.default
let of_seq seq =
Seq.fold_left (fun acc elt -> Cons { value = elt; tail = acc }) Nil seq
|> Atomic.make_contended

(* *)

exception Empty

type ('a, _) poly = Option : ('a, 'a option) poly | Value : ('a, 'a) poly

let rec pop_as : type a r. a t -> Backoff.t -> (a, r) poly -> r =
let peek_as : type a r. a t -> (a, r) poly -> r =
fun t poly ->
match Atomic.get t with
| Nil -> begin
match poly with Option -> None | Value -> raise_notrace Empty
end
| Cons cons -> (
match poly with Option -> Some cons.value | Value -> cons.value)

let peek_exn t = peek_as t Value
let peek_opt t = peek_as t Option

type ('a, _) poly2 =
| Option : ('a, 'a option) poly2
| Value : ('a, 'a) poly2
| Unit : ('a, unit) poly2

let rec pop_as : type a r. a t -> Backoff.t -> (a, r) poly2 -> r =
fun t backoff poly ->
match Atomic.get t with
| Nil -> begin match poly with Option -> None | Value -> raise Empty end
| Nil -> begin
match poly with Option -> None | Value | Unit -> raise_notrace Empty
end
| Cons cons_r as cons ->
if Atomic.compare_and_set t cons cons_r.tail then
match poly with Option -> Some cons_r.value | Value -> cons_r.value
match poly with
| Option -> Some cons_r.value
| Value -> cons_r.value
| Unit -> ()
else pop_as t (Backoff.once backoff) poly

let pop t = pop_as t Backoff.default Value
let pop_exn t = pop_as t Backoff.default Value
let pop_opt t = pop_as t Backoff.default Option
let drop_exn t = pop_as t Backoff.default Unit

let rec pop_all t backoff =
match Atomic.get t with
| Nil -> []
| old_head ->
if Atomic.compare_and_set t old_head Nil then
let[@tail_mod_cons] rec aux = function
| Nil -> []
| Cons cons -> cons.value :: aux cons.tail
in
aux old_head
else pop_all t (Backoff.once backoff)

let pop_all t = pop_all t Backoff.default

let to_seq t =
match Atomic.get t with
| Nil -> Seq.empty
| old_head ->
let rec aux s () =
match s with
| Nil -> Seq.Nil
| Cons cons -> Seq.Cons (cons.value, aux cons.tail)
in
aux old_head
(* *)

let rec push t value backoff =
let tail = Atomic.get t in
let cons = Cons { value; tail } in
if not (Atomic.compare_and_set t tail cons) then
push t value (Backoff.once backoff)

let push t value = push t value Backoff.default

(**)

type ('a, _) poly3 = Value : ('a, 'a) poly3 | Bool : ('a, bool) poly3

let rec set_as : type v r. v t -> v -> Backoff.t -> (v, r) poly3 -> r =
fun t value backoff poly ->
match Atomic.get t with
| Nil -> ( match poly with Value -> raise_notrace Empty | Bool -> false)
| Cons cons_r as old_head ->
if Atomic.compare_and_set t old_head @@ Cons { cons_r with value } then
match poly with Value -> cons_r.value | Bool -> true
else set_as t value (Backoff.once backoff) poly

let set_exn t value = set_as t value Backoff.default Value
let try_set t value = set_as t value Backoff.default Bool

(**)

let rec push_all_ t backoff values =
let rec build_node acc = function
| [] -> acc
| x :: xs -> build_node (Cons { tail = acc; value = x }) xs
in
match Atomic.get t with
| Nil ->
if Atomic.compare_and_set t Nil (build_node Nil values) then ()
else push_all_ t (Backoff.once backoff) values
| Cons _ as old_head ->
if Atomic.compare_and_set t old_head @@ build_node old_head values then ()
else push_all_ t (Backoff.once backoff) values

let push_all t values =
match values with [] -> () | _ -> push_all_ t Backoff.default values

let add_seq t seq = push_all_ t Backoff.default (List.of_seq seq)

(* *)

type op = Set | Pop

let try_compare_and_ t old_value new_value op =
let rec aux backoff =
match Atomic.get t with
| Nil -> false
| Cons cons_r as old_head ->
if cons_r.value == old_value then
if
Atomic.compare_and_set t old_head
@@
match op with
| Set -> Cons { cons_r with value = new_value }
| Pop -> cons_r.tail
then true
else aux (Backoff.once backoff)
else false
in
aux Backoff.default

let try_compare_and_pop t value = try_compare_and_ t value value Pop

let try_compare_and_set t old_value new_value =
try_compare_and_ t old_value new_value Set
190 changes: 175 additions & 15 deletions src/treiber_stack.mli
Original file line number Diff line number Diff line change
@@ -1,31 +1,191 @@
(** Classic multi-producer multi-consumer Treiber stack.
All function are lockfree. It is the recommended starting point
when needing LIFO structure. *)
All functions are lock-free. It is the recommended starting point
when needing a LIFO structure. *)

(** {1 API} *)

type 'a t
(** Type of Treiber stack holding items of type [t]. *)
(** Represents a lock-free Treiber stack holding elements of type ['a]. *)

val create : unit -> 'a t
(** [create ()] returns a new and empty Treiber stack. *)
(** [create ()] creates a new empty Treiber stack. *)

val of_list : 'a list -> 'a t
(** [of_list list] creates a new Treiber stack from a list. *)

val is_empty : 'a t -> bool
(** [is_empty s] checks whether stack [s] is empty. *)
(** [is_empty stack] returns [true] if the [stack] is empty, otherwise [false]. *)

val push : 'a t -> 'a -> unit
(** [push s v] adds the element [v] at the top of stack [s]. *)
(** {2 Consumer functions} *)

exception Empty
(** Raised when {!pop} is applied to an empty queue. *)
(** Raised when {!pop_exn}, {!peek_exn}, {!drop_exn}, or {!set_exn} is
applied to an empty stack.
val pop : 'a t -> 'a
(** [pop s] removes and returns the topmost element in the
stack [s].
This exception is meant to avoid allocations required by an option type.
As such, it does not register backtrace information and it is recommended to
use the following pattern to catch it.
@raise Empty if [a] is empty.
*)
{@ocaml skip[
match pop_exn s with
| value -> () (* ... *)
| exception Empty -> () (* ... *)
]} *)

val peek_exn : 'a t -> 'a
(** [peek_exn stack] returns the top element of the [stack] without removing it.
@raises Empty if the [stack] is empty. *)

val peek_opt : 'a t -> 'a option
(** [peek_opt stack] returns [Some] of the top element of the [stack] without
removing it, or [None] if the [stack] is empty. *)

val pop_exn : 'a t -> 'a
(** [pop_exn stack] removes and returns the top element of the [stack].
@raises Empty if the [stack] is empty. *)

val pop_opt : 'a t -> 'a option
(** [pop_opt s] removes and returns the topmost element in the
stack [s], or returns [None] if the stack is empty.
(** [pop_opt stack] removes and returns [Some] of the top element of the [stack],
or [None] if the [stack] is empty. *)

val drop_exn : 'a t -> unit
(** [drop_exn stack] removes the top element of the [stack].
@raises Empty if the [stack] is empty. *)

val try_compare_and_pop : 'a t -> 'a -> bool
(** [try_compare_and_pop stack before] tries to remove the top element of the
[stack] if it is equal to [before]. Returns [true] on success and [false] in
case the stack is empty or if the top element is not equal to [before].
ℹ️ The values are compared using physical equality, i.e. the [==] operator. *)

val pop_all : 'a t -> 'a list
(** [pop_all stack] removes and returns all elements of the [stack] in LIFO
order.
{[
# open Saturn_lockfree.Stack
# let t : int t = create ()
val t : int t = <abstr>
# push t 1
- : unit = ()
# push t 2
- : unit = ()
# push t 3
- : unit = ()
# pop_all t
- : int list = [3; 2; 1]
]}
*)

(** {2 Producer functions} *)

val push : 'a t -> 'a -> unit
(** [push stack element] adds [element] to the top of the [stack]. *)

val push_all : 'a t -> 'a list -> unit
(** [push_all stack elements] adds all [elements] to the top of the [stack].
{[
# let t : int t = create ()
val t : int t = <abstr>
# push_all t [1; 2; 3; 4]
- : unit = ()
# pop_opt t
- : int option = Some 4
# pop_opt t
- : int option = Some 3
# pop_all t
- : int list = [2; 1]
]}
*)

(** {3 Updating bindings} *)

val try_set : 'a t -> 'a -> bool
(** [try_set stack value] tries to update the top element of the [stack] to
[value]. Returns [true] on success and [false] if the [stack] is empty.
*)

val try_compare_and_set : 'a t -> 'a -> 'a -> bool
(** [try_compare_and_set stack before after] tries to update the top element of
the [stack] from the [before] value to the [after] value. Returns [true] on
success and [false] if the [stack] is empty or the top element is not equal
to [before].
ℹ️ The values are compared using physical equality, i.e. the [==]
operator. *)

val set_exn : 'a t -> 'a -> 'a
(** [set_exn stack after] tries to update the top element of [stack] from some
[before] value to the [after] value. Returns the [before] value on success.
@raise Empty if the [stack] is empty. *)

(** {2 With Sequences }*)
val to_seq : 'a t -> 'a Seq.t
(** [to_seq stack] takes a snapshot of [stack] and returns its value top to
bottom.
🐌 This is a linear time operation. *)

val of_seq : 'a Seq.t -> 'a t
(** [of_seq seq] creates a stack from a [seq]. It must be finite. *)

val add_seq : 'a t -> 'a Seq.t -> unit
(** [add_seq stack seq] adds all elements of [seq] to the top of the
[stack]. [seq] must be finite. *)

(** {1 Examples}
An example top-level session:
{[
# open Saturn_lockfree.Stack
# let t : int t = create ()
val t : int t = <abstr>
# push t 42
- : unit = ()
# push_all t [1; 2; 3]
- : unit = ()
# pop_exn t
- : int = 3
# peek_opt t
- : int option = Some 2
# pop_all t
- : int list = [2; 1; 42]
# pop_exn t
Exception: Saturn_lockfree__Treiber_stack.Empty.]}
A multicore example:
{@ocaml non-deterministic[
# open Saturn_lockfree.Stack
# let t : int t = create ()
val t : int t = <abstr>
# let barrier = Atomic.make 2
val barrier : int Atomic.t = <abstr>
# let pusher () =
Atomic.decr barrier;
while Atomic.get barrier != 0 do Domain.cpu_relax () done;
push_all t [1;2;3] |> ignore;
push t 42;
push t 12
val pusher : unit -> unit = <fun>
# let popper () =
Atomic.decr barrier;
while Atomic.get barrier != 0 do Domain.cpu_relax () done;
List.init 6 (fun i -> Domain.cpu_relax (); pop_opt t)
val popper : unit -> int option list = <fun>
# let domain_pusher = Domain.spawn pusher
val domain_pusher : unit Domain.t = <abstr>
# let domain_popper = Domain.spawn popper
val domain_popper : int option list Domain.t = <abstr>
# Domain.join domain_pusher
- : unit = ()
# Domain.join domain_popper
- : int option list = [Some 42; Some 3; Some 2; Some 1; None; Some 12]
]}
*)
Loading

0 comments on commit 8f72d22

Please sign in to comment.