Skip to content

Commit

Permalink
From bounded and blocking queue to bounded queue.
Browse files Browse the repository at this point in the history
Additional functions and tests
  • Loading branch information
lyrm committed Nov 5, 2024
1 parent 5a61a00 commit 75f5687
Show file tree
Hide file tree
Showing 9 changed files with 668 additions and 233 deletions.
6 changes: 4 additions & 2 deletions bench/bench_bounded_stack.ml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ module Stack = Saturn_lockfree.Bounded_stack
let run_one_domain ~budgetf ?(n_msgs = 50 * Util.iter_factor) () =
let t = Stack.create () in

let op push = if push then Stack.push t 101 else Stack.pop_opt t |> ignore in
let op push =
if push then Stack.try_push t 101 |> ignore else Stack.pop_opt t |> ignore
in

let init _ =
assert (Stack.is_empty t);
Expand Down Expand Up @@ -35,7 +37,7 @@ let run_one ~budgetf ?(n_adders = 2) ?(n_takers = 2)
let n = Util.alloc n_msgs_to_add in
if 0 < n then begin
for i = 1 to n do
Stack.push t i
Stack.try_push t i |> ignore
done;
work ()
end
Expand Down
2 changes: 0 additions & 2 deletions dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,12 @@
(documentation "https://ocaml-multicore.github.io/saturn/")
(using mdx 0.4)


(package
(name saturn)
(synopsis "Collection of concurent-safe data structures for Multicore OCaml")
(depends
(ocaml (>= 4.14))
(backoff (>= 0.1.0))
(picos (>= 0.5.0))
(multicore-magic (>= 2.3.0))
(alcotest (and (>= 1.7.0) :with-test))
(domain_shims (and (>= 0.1.0) :with-test))
Expand Down
314 changes: 183 additions & 131 deletions src/bounded_stack.ml
Original file line number Diff line number Diff line change
@@ -1,156 +1,208 @@
open Picos

type 'a node = Nil | Cons of { value : 'a; tail : 'a node; capacity : int }
type 'a t = { head : 'a node Atomic.t; capacity : int }

type 'a t = {
head : 'a node Atomic.t;
cons_waiters : Trigger.t list Atomic.t;
prod_waiters : Trigger.t list Atomic.t;
capacity : int;
}

(* *)
let create ?(capacity = Int.max_int) () =
let head = Atomic.make_contended Nil in
let cons_waiters = Atomic.make_contended [] in
let prod_waiters = Atomic.make_contended [] in
{ head; cons_waiters; prod_waiters; capacity = max capacity 1 }
|> Multicore_magic.copy_as_padded

let is_empty t = Atomic.get t.head == Nil

let rec signal_all waiters =
let triggers = Atomic.get waiters in
if triggers != [] then
if Atomic.compare_and_set waiters triggers [] then
List.iter Trigger.signal triggers
else signal_all waiters

let rec peek t =
let old_head = Atomic.get t.head in
match old_head with
| Nil ->
let trigger = Trigger.create () in
let triggers = Atomic.get t.cons_waiters in
if Atomic.compare_and_set t.cons_waiters triggers (trigger :: triggers)
then begin
if Atomic.get t.head != Nil then signal_all t.cons_waiters
else
match Trigger.await trigger with
| None -> ()
| Some (exn, bt) ->
signal_all t.cons_waiters;
Printexc.raise_with_backtrace exn bt
end;
peek t
| Cons cons -> cons.value

let peek_opt t =
let head = Atomic.get t.head in
match head with Nil -> None | Cons cons -> Some cons.value

let rec pop t backoff =
match Atomic.get t.head with
| Nil ->
let trigger = Trigger.create () in
let triggers = Atomic.get t.cons_waiters in
if Atomic.compare_and_set t.cons_waiters triggers (trigger :: triggers)
then begin
if Atomic.get t.head != Nil then signal_all t.cons_waiters
else
match Trigger.await trigger with
| None -> ()
| Some (exn, bt) ->
signal_all t.cons_waiters;
Printexc.raise_with_backtrace exn bt
end;
pop t backoff
| Cons cons_r as old_head ->
if Atomic.compare_and_set t.head old_head cons_r.tail then (
signal_all t.prod_waiters;
cons_r.value)
else pop t (Backoff.once backoff)
{ head; capacity = max capacity 1 } |> Multicore_magic.copy_as_padded

let pop t = pop t Backoff.default
let length t =
match Atomic.get t.head with Nil -> 0 | Cons cons -> cons.capacity

let rec pop_opt t backoff =
let is_empty t = Atomic.get t.head = Nil

exception Empty
exception Full

let of_list ?(capacity = Int.max_int) list =
if capacity < List.length list then raise Full
else
let head =
List.fold_left
(fun (len, acc) elt ->
(len + 1, Cons { value = elt; tail = acc; capacity = len }))
(1, Nil) list
|> snd |> Atomic.make_contended
in
{ head; capacity } |> Multicore_magic.copy_as_padded

let of_seq ?(capacity = Int.max_int) seq =
if capacity < Seq.length seq then raise Full
else
let head =
Seq.fold_left
(fun (len, acc) elt ->
(len + 1, Cons { value = elt; tail = acc; capacity = len }))
(1, Nil) seq
|> snd |> Atomic.make_contended
in
{ head; capacity } |> Multicore_magic.copy_as_padded

(* *)

type ('a, _) poly1 = Option : ('a, 'a option) poly1 | Value : ('a, 'a) poly1

let peek_as : type a r. a t -> (a, r) poly1 -> r =
fun t poly ->
match Atomic.get t.head with
| Nil -> begin
match poly with Option -> None | Value -> raise_notrace Empty
end
| Cons cons_r -> (
match poly with Option -> Some cons_r.value | Value -> cons_r.value)

let peek_exn t = peek_as t Value
let peek_opt t = peek_as t Option

type ('a, _) poly2 =
| Option : ('a, 'a option) poly2
| Value : ('a, 'a) poly2
| Unit : ('a, unit) poly2

(* *)
let rec pop_as : type a r. a t -> Backoff.t -> (a, r) poly2 -> r =
fun t backoff poly ->
match Atomic.get t.head with
| Nil -> None
| Nil -> begin
match poly with
| Option -> None
| Value -> raise_notrace Empty
| Unit -> raise_notrace Empty
end
| Cons cons_r as old_head ->
if Atomic.compare_and_set t.head old_head cons_r.tail then (
signal_all t.prod_waiters;
Some cons_r.value)
else pop_opt t (Backoff.once backoff)
if Atomic.compare_and_set t.head old_head cons_r.tail then
match poly with
| Option -> Some cons_r.value
| Value -> cons_r.value
| Unit -> ()
else pop_as t (Backoff.once backoff) poly

let pop_opt t = pop_opt t Backoff.default
let pop_exn t = pop_as t Backoff.default Value
let pop_opt t = pop_as t Backoff.default Option
let drop_exn t = pop_as t Backoff.default Unit

let rec push t backoff value =
let rec pop_all t backoff =
match Atomic.get t.head with
| Nil ->
let new_head = Cons { value; tail = Nil; capacity = 1 } in
if Atomic.compare_and_set t.head Nil new_head then
signal_all t.cons_waiters
else push t (Backoff.once backoff) value
| Cons cons_r as old_head ->
if cons_r.capacity >= t.capacity then begin
let trigger = Trigger.create () in
let triggers = Atomic.get t.prod_waiters in
if Atomic.compare_and_set t.prod_waiters triggers (trigger :: triggers)
then begin
if Atomic.get t.head != old_head then signal_all t.prod_waiters
else
match Trigger.await trigger with
| None -> ()
| Some (exn, bt) ->
signal_all t.prod_waiters;
Printexc.raise_with_backtrace exn bt
end;
push t backoff value
end
else
let new_head =
Cons { value; tail = old_head; capacity = cons_r.capacity + 1 }
| Nil -> []
| old_head ->
if Atomic.compare_and_set t.head old_head Nil then
let[@tail_mod_cons] rec aux = function
| Nil -> []
| Cons cons -> cons.value :: aux cons.tail
in
if Atomic.compare_and_set t.head old_head new_head then
signal_all t.cons_waiters
else push t (Backoff.once backoff) value
aux old_head
else pop_all t (Backoff.once backoff)

let push t value = push t Backoff.default value
let pop_all t = pop_all t Backoff.default

let rec try_push t backoff value =
let to_seq t =
match Atomic.get t.head with
| Nil -> Seq.empty
| old_head ->
let rec aux s () =
match s with
| Nil -> Seq.Nil
| Cons cons -> Seq.Cons (cons.value, aux cons.tail)
in
aux old_head

(* *)

type _ mono = Unit : unit mono | Bool : bool mono

let rec push_as : type r. 'a t -> Backoff.t -> 'a -> r mono -> r =
fun t backoff value mono ->
match Atomic.get t.head with
| Nil ->
let new_head = Cons { value; tail = Nil; capacity = 1 } in
if Atomic.compare_and_set t.head Nil new_head then (
signal_all t.cons_waiters;
true)
else try_push t (Backoff.once backoff) value
if
Atomic.compare_and_set t.head Nil
@@ Cons { value; tail = Nil; capacity = 1 }
then match mono with Bool -> true | Unit -> ()
else push_as t (Backoff.once backoff) value mono
| Cons cons_r as old_head ->
if cons_r.capacity >= t.capacity then false
if cons_r.capacity >= t.capacity then
match mono with Bool -> false | Unit -> raise Full
else
let new_head =
Cons { value; tail = old_head; capacity = cons_r.capacity + 1 }
in
if Atomic.compare_and_set t.head old_head new_head then (
signal_all t.cons_waiters;
true)
else try_push t (Backoff.once backoff) value
if Atomic.compare_and_set t.head old_head new_head then
match mono with Bool -> true | Unit -> ()
else push_as t (Backoff.once backoff) value mono

let try_push t value = try_push t Backoff.default value
let push_exn t value = push_as t Backoff.default value Unit
let try_push t value = push_as t Backoff.default value Bool

let length t =
match Atomic.get t.head with Nil -> 0 | Cons cons -> cons.capacity
type ('a, _) poly3 = Value : ('a, 'a) poly3 | Bool : ('a, bool) poly3

let rec pop_all t backoff =
let rec set_as : type v r. v t -> v -> Backoff.t -> (v, r) poly3 -> r =
fun t value backoff poly ->
match Atomic.get t.head with
| Nil -> []
| old_head ->
if Atomic.compare_and_set t.head old_head Nil then (
signal_all t.prod_waiters;
let rec aux acc = function
| Nil -> List.rev acc
| Cons cons -> aux (cons.value :: acc) cons.tail
in
aux [] old_head)
else pop_all t (Backoff.once backoff)

let pop_all t = pop_all t Backoff.default
| Nil -> ( match poly with Value -> raise_notrace Empty | Bool -> false)
| Cons cons_r as old_head ->
if Atomic.compare_and_set t.head old_head @@ Cons { cons_r with value }
then match poly with Value -> cons_r.value | Bool -> true
else set_as t value (Backoff.once backoff) poly

let set_exn t value = set_as t value Backoff.default Value
let try_set t value = set_as t value Backoff.default Bool

let rec push_all_as : type r. 'a t -> Backoff.t -> 'a list -> r mono -> r =
fun t backoff values mono ->
let len = List.length values in
if len = 0 then match mono with Unit -> () | Bool -> true
else if len > t.capacity then
match mono with Unit -> raise Full | Bool -> false
else
let rec build_node len acc = function
| [] -> acc
| x :: xs ->
build_node (len + 1)
(Cons { capacity = len + 1; tail = acc; value = x })
xs
in
match Atomic.get t.head with
| Nil ->
if Atomic.compare_and_set t.head Nil (build_node 0 Nil values) then
match mono with Bool -> true | Unit -> ()
else push_all_as t (Backoff.once backoff) values mono
| Cons cons_r as old_head ->
if cons_r.capacity + len > t.capacity then
match mono with Bool -> false | Unit -> raise Full
else if
Atomic.compare_and_set t.head old_head
@@ build_node cons_r.capacity old_head values
then match mono with Bool -> true | Unit -> ()
else push_all_as t (Backoff.once backoff) values mono

let try_push_all t values = push_all_as t Backoff.default values Bool
let push_all_exn t values = push_all_as t Backoff.default values Unit
let add_seq_exn t seq = push_all_as t Backoff.default (List.of_seq seq) Unit
let try_add_seq t seq = push_all_as t Backoff.default (List.of_seq seq) Bool

(* *)

type op = Set | Pop

let try_compare_and_ t old_value new_value op =
let rec aux backoff =
match Atomic.get t.head with
| Nil -> false
| Cons cons_r as old_head ->
if cons_r.value == old_value then
if
Atomic.compare_and_set t.head old_head
@@
match op with
| Set -> Cons { cons_r with value = new_value }
| Pop -> cons_r.tail
then true
else aux (Backoff.once backoff)
else false
in
aux Backoff.default

let try_compare_and_pop t value = try_compare_and_ t value value Pop

let try_compare_and_set t old_value new_value =
try_compare_and_ t old_value new_value Set
Loading

0 comments on commit 75f5687

Please sign in to comment.