Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bounded stack #156

Merged
merged 11 commits into from
Nov 21, 2024
74 changes: 74 additions & 0 deletions bench/bench_bounded_stack.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
open Multicore_bench
module Stack = Saturn.Bounded_stack

let run_one_domain ~budgetf ?(n_msgs = 50 * Util.iter_factor) () =
let t = Stack.create () in

let op push =
if push then Stack.try_push t 101 |> ignore else Stack.pop_opt t |> ignore
in

let init _ =
assert (Stack.is_empty t);
Util.generate_push_and_pop_sequence n_msgs
in
let work _ bits = Util.Bits.iter op bits in

Times.record ~budgetf ~n_domains:1 ~init ~work ()
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config:"one domain"

let run_one ~budgetf ?(n_adders = 2) ?(n_takers = 2)
?(n_msgs = 50 * Util.iter_factor) () =
let n_domains = n_adders + n_takers in

let t = Stack.create () in

let n_msgs_to_take = Atomic.make 0 |> Multicore_magic.copy_as_padded in
let n_msgs_to_add = Atomic.make 0 |> Multicore_magic.copy_as_padded in

let init _ =
assert (Stack.is_empty t);
Atomic.set n_msgs_to_take n_msgs;
Atomic.set n_msgs_to_add n_msgs
in
let work i () =
if i < n_adders then
let rec work () =
let n = Util.alloc n_msgs_to_add in
if 0 < n then begin
for i = 1 to n do
Stack.try_push t i |> ignore
done;
work ()
end
in
work ()
else
let rec work () =
let n = Util.alloc n_msgs_to_take in
if n <> 0 then begin
for _ = 1 to n do
while Option.is_none (Stack.pop_opt t) do
Domain.cpu_relax ()
done
done;
work ()
end
in
work ()
in

let config =
let format role n =
Printf.sprintf "%d %s%s" n role (if n = 1 then "" else "s")
in
Printf.sprintf "%s, %s" (format "adder" n_adders) (format "taker" n_takers)
in
Times.record ~budgetf ~n_domains ~init ~work ()
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config

let run_suite ~budgetf =
run_one_domain ~budgetf ()
@ (Util.cross [ 1; 2 ] [ 1; 2 ]
|> List.concat_map @@ fun (n_adders, n_takers) ->
run_one ~budgetf ~n_adders ~n_takers ())
1 change: 1 addition & 0 deletions bench/main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ let benchmarks =
("Saturn Htbl", Bench_htbl.run_suite);
("Saturn Stack", Bench_stack.run_suite);
("Saturn Work_stealing_deque", Bench_ws_deque.run_suite);
("Saturn Bounded_Stack", Bench_bounded_stack.run_suite);
]

let () = Multicore_bench.Cmd.run ~benchmarks ()
1 change: 0 additions & 1 deletion dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
(documentation "https://ocaml-multicore.github.io/saturn/")
(using mdx 0.4)


(package
(name saturn)
(synopsis "Collection of concurent-safe data structures for Multicore OCaml")
Expand Down
117 changes: 117 additions & 0 deletions src/bounded_stack.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
type 'a t = { head : (int * 'a list) Atomic.t; capacity : int }

(* *)
let create ?(capacity = Int.max_int) () =
let head = Atomic.make_contended (0, []) in
{ head; capacity = max capacity 1 }

let length t = fst (Atomic.get t.head)
let is_full t = t.capacity = length t
let is_empty t = Atomic.get t.head = (0, [])

exception Empty
exception Full

let of_list ?(capacity = Int.max_int) list =
if capacity < List.length list then raise Full
else
let head = Atomic.make_contended (List.length list, List.rev list) in
{ head; capacity } |> Multicore_magic.copy_as_padded

let of_seq ?(capacity = Int.max_int) seq =
if capacity < Seq.length seq then raise Full
else
let list = List.of_seq seq in
let head = Atomic.make_contended (List.length list, List.rev list) in
{ head; capacity } |> Multicore_magic.copy_as_padded

(* *)

type ('a, _) poly1 = Option : ('a, 'a option) poly1 | Value : ('a, 'a) poly1

let peek_as : type a r. a t -> (a, r) poly1 -> r =
fun t poly ->
match Atomic.get t.head with
| _, [] -> begin match poly with Option -> None | Value -> raise Empty end
| _, value :: _ -> ( match poly with Option -> Some value | Value -> value)

let peek_exn t = peek_as t Value
let peek_opt t = peek_as t Option

type ('a, _) poly2 =
| Option : ('a, 'a option) poly2
| Value : ('a, 'a) poly2
| Unit : ('a, unit) poly2

(* *)
let rec pop_as : type a r. a t -> Backoff.t -> (a, r) poly2 -> r =
fun t backoff poly ->
match Atomic.get t.head with
| _, [] -> begin
match poly with
| Option -> None
| Value -> raise Empty
| Unit -> raise Empty
end
| (len, hd :: tl) as old_head ->
if Atomic.compare_and_set t.head old_head (len - 1, tl) then
match poly with Option -> Some hd | Value -> hd | Unit -> ()
else pop_as t (Backoff.once backoff) poly

let pop_exn t = pop_as t Backoff.default Value
let pop_opt t = pop_as t Backoff.default Option
let drop_exn t = pop_as t Backoff.default Unit

let rec pop_all t backoff =
match Atomic.get t.head with
| _, [] -> []
| (_, values) as old_head ->
if Atomic.compare_and_set t.head old_head (0, []) then values
else pop_all t (Backoff.once backoff)

let pop_all t = pop_all t Backoff.default

let to_seq t =
match Atomic.get t.head with
| _, [] -> Seq.empty
| _, values -> List.to_seq values
(* *)

type _ mono = Unit : unit mono | Bool : bool mono

let rec push_as : type r. 'a t -> Backoff.t -> 'a -> r mono -> r =
fun t backoff value mono ->
let ((len, values) as before) = Atomic.get t.head in
if len >= t.capacity then match mono with Bool -> false | Unit -> raise Full
else
let after = (len + 1, value :: values) in

if Atomic.compare_and_set t.head before after then
match mono with Bool -> true | Unit -> ()
else push_as t (Backoff.once backoff) value mono

let push_exn t value = push_as t Backoff.default value Unit
let try_push t value = push_as t Backoff.default value Bool

let rec push_all_as : type r. 'a t -> Backoff.t -> 'a list -> r mono -> r =
fun t backoff values mono ->
let len = List.length values in
if len = 0 then match mono with Unit -> () | Bool -> true
else if len > t.capacity then
match mono with Unit -> raise Full | Bool -> false
else
let ((curr_len, prev_values) as before) = Atomic.get t.head in
if curr_len + len > t.capacity then
match mono with Bool -> false | Unit -> raise Full
else
let after =
(curr_len + len, List.rev_append (List.rev values) prev_values)
in
if Atomic.compare_and_set t.head before after then
match mono with Bool -> true | Unit -> ()
else push_all_as t (Backoff.once backoff) values mono

let try_push_all t values = push_all_as t Backoff.default (List.rev values) Bool
let push_all_exn t values = push_all_as t Backoff.default (List.rev values) Unit
let add_seq_exn t seq = push_all_as t Backoff.default (List.of_seq seq) Unit
let try_add_seq t seq = push_all_as t Backoff.default (List.of_seq seq) Bool
Loading
Loading