Skip to content

Commit

Permalink
Bounded stack (#156)
Browse files Browse the repository at this point in the history
* Add bound_stack.
* Add tests
  • Loading branch information
lyrm authored Nov 21, 2024
1 parent d509fc6 commit e50b636
Show file tree
Hide file tree
Showing 13 changed files with 854 additions and 3 deletions.
74 changes: 74 additions & 0 deletions bench/bench_bounded_stack.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
open Multicore_bench
module Stack = Saturn.Bounded_stack

let run_one_domain ~budgetf ?(n_msgs = 50 * Util.iter_factor) () =
let t = Stack.create () in

let op push =
if push then Stack.try_push t 101 |> ignore else Stack.pop_opt t |> ignore
in

let init _ =
assert (Stack.is_empty t);
Util.generate_push_and_pop_sequence n_msgs
in
let work _ bits = Util.Bits.iter op bits in

Times.record ~budgetf ~n_domains:1 ~init ~work ()
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config:"one domain"

let run_one ~budgetf ?(n_adders = 2) ?(n_takers = 2)
?(n_msgs = 50 * Util.iter_factor) () =
let n_domains = n_adders + n_takers in

let t = Stack.create () in

let n_msgs_to_take = Atomic.make 0 |> Multicore_magic.copy_as_padded in
let n_msgs_to_add = Atomic.make 0 |> Multicore_magic.copy_as_padded in

let init _ =
assert (Stack.is_empty t);
Atomic.set n_msgs_to_take n_msgs;
Atomic.set n_msgs_to_add n_msgs
in
let work i () =
if i < n_adders then
let rec work () =
let n = Util.alloc n_msgs_to_add in
if 0 < n then begin
for i = 1 to n do
Stack.try_push t i |> ignore
done;
work ()
end
in
work ()
else
let rec work () =
let n = Util.alloc n_msgs_to_take in
if n <> 0 then begin
for _ = 1 to n do
while Option.is_none (Stack.pop_opt t) do
Domain.cpu_relax ()
done
done;
work ()
end
in
work ()
in

let config =
let format role n =
Printf.sprintf "%d %s%s" n role (if n = 1 then "" else "s")
in
Printf.sprintf "%s, %s" (format "adder" n_adders) (format "taker" n_takers)
in
Times.record ~budgetf ~n_domains ~init ~work ()
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config

let run_suite ~budgetf =
run_one_domain ~budgetf ()
@ (Util.cross [ 1; 2 ] [ 1; 2 ]
|> List.concat_map @@ fun (n_adders, n_takers) ->
run_one ~budgetf ~n_adders ~n_takers ())
1 change: 1 addition & 0 deletions bench/main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ let benchmarks =
("Saturn Htbl", Bench_htbl.run_suite);
("Saturn Stack", Bench_stack.run_suite);
("Saturn Work_stealing_deque", Bench_ws_deque.run_suite);
("Saturn Bounded_Stack", Bench_bounded_stack.run_suite);
]

let () = Multicore_bench.Cmd.run ~benchmarks ()
1 change: 0 additions & 1 deletion dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
(documentation "https://ocaml-multicore.github.io/saturn/")
(using mdx 0.4)


(package
(name saturn)
(synopsis "Collection of concurent-safe data structures for Multicore OCaml")
Expand Down
117 changes: 117 additions & 0 deletions src/bounded_stack.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
type 'a t = { head : (int * 'a list) Atomic.t; capacity : int }

(* *)
let create ?(capacity = Int.max_int) () =
let head = Atomic.make_contended (0, []) in
{ head; capacity = max capacity 1 }

let length t = fst (Atomic.get t.head)
let is_full t = t.capacity = length t
let is_empty t = Atomic.get t.head = (0, [])

exception Empty
exception Full

let of_list ?(capacity = Int.max_int) list =
if capacity < List.length list then raise Full
else
let head = Atomic.make_contended (List.length list, List.rev list) in
{ head; capacity } |> Multicore_magic.copy_as_padded

let of_seq ?(capacity = Int.max_int) seq =
if capacity < Seq.length seq then raise Full
else
let list = List.of_seq seq in
let head = Atomic.make_contended (List.length list, List.rev list) in
{ head; capacity } |> Multicore_magic.copy_as_padded

(* *)

type ('a, _) poly1 = Option : ('a, 'a option) poly1 | Value : ('a, 'a) poly1

let peek_as : type a r. a t -> (a, r) poly1 -> r =
fun t poly ->
match Atomic.get t.head with
| _, [] -> begin match poly with Option -> None | Value -> raise Empty end
| _, value :: _ -> ( match poly with Option -> Some value | Value -> value)

let peek_exn t = peek_as t Value
let peek_opt t = peek_as t Option

type ('a, _) poly2 =
| Option : ('a, 'a option) poly2
| Value : ('a, 'a) poly2
| Unit : ('a, unit) poly2

(* *)
let rec pop_as : type a r. a t -> Backoff.t -> (a, r) poly2 -> r =
fun t backoff poly ->
match Atomic.get t.head with
| _, [] -> begin
match poly with
| Option -> None
| Value -> raise Empty
| Unit -> raise Empty
end
| (len, hd :: tl) as old_head ->
if Atomic.compare_and_set t.head old_head (len - 1, tl) then
match poly with Option -> Some hd | Value -> hd | Unit -> ()
else pop_as t (Backoff.once backoff) poly

let pop_exn t = pop_as t Backoff.default Value
let pop_opt t = pop_as t Backoff.default Option
let drop_exn t = pop_as t Backoff.default Unit

let rec pop_all t backoff =
match Atomic.get t.head with
| _, [] -> []
| (_, values) as old_head ->
if Atomic.compare_and_set t.head old_head (0, []) then values
else pop_all t (Backoff.once backoff)

let pop_all t = pop_all t Backoff.default

let to_seq t =
match Atomic.get t.head with
| _, [] -> Seq.empty
| _, values -> List.to_seq values
(* *)

type _ mono = Unit : unit mono | Bool : bool mono

let rec push_as : type r. 'a t -> Backoff.t -> 'a -> r mono -> r =
fun t backoff value mono ->
let ((len, values) as before) = Atomic.get t.head in
if len >= t.capacity then match mono with Bool -> false | Unit -> raise Full
else
let after = (len + 1, value :: values) in

if Atomic.compare_and_set t.head before after then
match mono with Bool -> true | Unit -> ()
else push_as t (Backoff.once backoff) value mono

let push_exn t value = push_as t Backoff.default value Unit
let try_push t value = push_as t Backoff.default value Bool

let rec push_all_as : type r. 'a t -> Backoff.t -> 'a list -> r mono -> r =
fun t backoff values mono ->
let len = List.length values in
if len = 0 then match mono with Unit -> () | Bool -> true
else if len > t.capacity then
match mono with Unit -> raise Full | Bool -> false
else
let ((curr_len, prev_values) as before) = Atomic.get t.head in
if curr_len + len > t.capacity then
match mono with Bool -> false | Unit -> raise Full
else
let after =
(curr_len + len, List.rev_append (List.rev values) prev_values)
in
if Atomic.compare_and_set t.head before after then
match mono with Bool -> true | Unit -> ()
else push_all_as t (Backoff.once backoff) values mono

let try_push_all t values = push_all_as t Backoff.default (List.rev values) Bool
let push_all_exn t values = push_all_as t Backoff.default (List.rev values) Unit
let add_seq_exn t seq = push_all_as t Backoff.default (List.of_seq seq) Unit
let try_add_seq t seq = push_all_as t Backoff.default (List.of_seq seq) Bool
Loading

0 comments on commit e50b636

Please sign in to comment.