Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Uniformize spcs documentation and functions #165

Merged
merged 4 commits into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 21 additions & 56 deletions bench/bench_spsc_queue.ml
Original file line number Diff line number Diff line change
@@ -1,51 +1,18 @@
open Multicore_bench

let run_one ~unsafe ~budgetf ?(size_exponent = 3)
?(n_msgs = 80 * Util.iter_factor) () =
let init _ = () in
let work, before =
if unsafe then
let module Queue = Saturn.Single_prod_single_cons_queue_unsafe in
let t = Queue.create ~size_exponent in
module type BENCH = sig
val run_suite : budgetf:float -> Metric.t list
end

let before () =
while Queue.size t <> 0 do
Queue.pop_exn t |> ignore
done;
let n = Random.int ((1 lsl size_exponent) + 1) in
for i = 1 to n do
Queue.push_exn t i
done
in
let work i () =
if i = 0 then
let rec loop n =
if 0 < n then
if Queue.try_push t n then loop (n - 1)
else begin
Domain.cpu_relax ();
loop n
end
in
loop n_msgs
else
let rec loop n =
if 0 < n then
match Queue.pop_opt t with
| Some _ -> loop (n - 1)
| None ->
Domain.cpu_relax ();
loop n
in
loop n_msgs
in
(work, before)
else
let module Queue = Saturn.Single_prod_single_cons_queue in
module Make (Queue : Spsc_queue_intf.SPSC_queue) : BENCH = struct
let run_one ~budgetf ?(size_exponent = 3) ?(n_msgs = 80 * Util.iter_factor) ()
=
let init _ = () in
let work, before =
let t = Queue.create ~size_exponent in

let before () =
while Queue.size t <> 0 do
while Queue.length t <> 0 do
Queue.pop_exn t |> ignore
done;
let n = Random.int ((1 lsl size_exponent) + 1) in
Expand Down Expand Up @@ -76,21 +43,19 @@ let run_one ~unsafe ~budgetf ?(size_exponent = 3)
loop n_msgs
in
(work, before)
in
in

let config =
Printf.sprintf "2 workers, capacity %d%s" (1 lsl size_exponent)
(if unsafe then " (unsafe)" else "")
in
Times.record ~budgetf ~n_domains:2 ~before ~init ~work ()
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config
let config =
Printf.sprintf "2 workers, capacity %d" (1 lsl size_exponent)
in
Times.record ~budgetf ~n_domains:2 ~before ~init ~work ()
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config

let run_suite ~budgetf =
let run ~unsafe =
let run_suite ~budgetf =
[ 0; 3; 6; 9; 12; 15 ]
|> List.concat_map @@ fun size_exponent ->
run_one ~budgetf ~size_exponent ~unsafe ()
in
List.fold_right2
(fun safe unsafe acc -> safe :: unsafe :: acc)
(run ~unsafe:false) (run ~unsafe:true) []
run_one ~budgetf ~size_exponent ()
end

module Safe = Make (Saturn.Single_prod_single_cons_queue)
module Unsafe = Make (Saturn.Single_prod_single_cons_queue_unsafe)
5 changes: 5 additions & 0 deletions bench/dune
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ let () =
(copy ../src/michael_scott_queue/michael_scott_queue_intf.ml michael_scott_queue_intf.ml))
(package saturn))

(rule
(action
(copy ../src/spsc_queue/spsc_queue_intf.mli spsc_queue_intf.ml))
(package saturn))

(rule
(action
(copy ../src/bounded_queue/bounded_queue_intf.mli bounded_queue_intf.ml))
Expand Down
4 changes: 3 additions & 1 deletion bench/main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ let benchmarks =
("Saturn Queue_unsafe", Bench_queue.Unsafe.run_suite);
("Saturn Bounded_Queue", Bench_bounded_queue.Safe.run_suite);
("Saturn Bounded_Queue_unsafe", Bench_bounded_queue.Unsafe.run_suite);
("Saturn Single_prod_single_cons_queue", Bench_spsc_queue.run_suite);
("Saturn Single_prod_single_cons_queue", Bench_spsc_queue.Safe.run_suite);
( "Saturn Single_prod_single_cons_queue_unsafe",
Bench_spsc_queue.Unsafe.run_suite );
("Saturn Size", Bench_size.run_suite);
("Saturn Skiplist", Bench_skiplist.run_suite);
("Saturn Htbl", Bench_htbl.Safe.run_suite);
Expand Down
2 changes: 1 addition & 1 deletion src/dune
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ let () =
(library
(name saturn)
(public_name saturn)
(modules_without_implementation htbl_intf bounded_queue_intf)
(modules_without_implementation htbl_intf bounded_queue_intf spsc_queue_intf)
(libraries backoff multicore-magic |}
^ maybe_threads
^ {| ))
Expand Down
8 changes: 8 additions & 0 deletions src/spsc_queue/dune
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
(mdx
(package saturn)
(enabled_if
(and
(<> %{os_type} Win32)
(>= %{ocaml_version} 5.0.0)))
(libraries saturn)
(files spsc_queue_intf.mli))
43 changes: 35 additions & 8 deletions src/spsc_queue/spsc_queue.ml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ type 'a t = {
}

exception Full
exception Empty

(* *)

let create ~size_exponent =
if size_exponent < 0 || Sys.int_size - 2 < size_exponent then
Expand All @@ -52,6 +55,23 @@ let create ~size_exponent =
let head_cache = Padded_int_ref.make s 0 in
{ array; tail; tail_cache; head; head_cache }

let of_list_exn ~size_exponent values =
if size_exponent < 0 || Sys.int_size - 2 < size_exponent then
invalid_arg "size_exponent out of range";
let size = 1 lsl size_exponent in
let nvalues = List.length values in
if nvalues > size then raise Full;
let array = Array.make size None in
List.iteri (fun i elt -> Array.set array i (Some elt)) values;
let tail = Atomic.make_contended nvalues in
let s = Obj.size (Obj.repr tail) in
let tail_cache = Padded_int_ref.make s nvalues in
let head = Atomic.make_contended 0 in
let head_cache = Padded_int_ref.make s 0 in
{ array; tail; tail_cache; head; head_cache }

(* *)

type _ mono = Unit : unit mono | Bool : bool mono

let push_as (type r) t element (mono : r mono) : r =
Expand All @@ -66,17 +86,21 @@ let push_as (type r) t element (mono : r mono) : r =
head == head_cache
then match mono with Unit -> raise_notrace Full | Bool -> false
else begin
Array.unsafe_set t.array (tail land (size - 1)) (Some element);
Array.set t.array (tail land (size - 1)) (Some element);
Atomic.incr t.tail;
match mono with Unit -> () | Bool -> true
end

let push_exn t element = push_as t element Unit
let try_push t element = push_as t element Bool

exception Empty
(* *)

type ('a, _) poly =
| Option : ('a, 'a option) poly
| Value : ('a, 'a) poly
| Unit : ('a, unit) poly

type ('a, _) poly = Option : ('a, 'a option) poly | Value : ('a, 'a) poly
type op = Peek | Pop

let pop_or_peek_as (type a r) (t : a t) op (poly : (a, r) poly) : r =
Expand All @@ -88,25 +112,28 @@ let pop_or_peek_as (type a r) (t : a t) op (poly : (a, r) poly) : r =
let tail = Atomic.get t.tail in
Padded_int_ref.set t.tail_cache tail;
tail_cache == tail
then match poly with Value -> raise_notrace Empty | Option -> None
then match poly with Value | Unit -> raise_notrace Empty | Option -> None
else
let index = head land (Array.length t.array - 1) in
let v = Array.unsafe_get t.array index in
let v = Array.get t.array index in
begin
match op with
| Pop ->
Array.unsafe_set t.array index None;
Array.set t.array index None;
Atomic.incr t.head
| Peek -> ()
end;
match poly with Value -> Option.get v | Option -> v
match poly with Value -> Option.get v | Option -> v | Unit -> ()

let pop_exn t = pop_or_peek_as t Pop Value
let pop_opt t = pop_or_peek_as t Pop Option
let peek_exn t = pop_or_peek_as t Peek Value
let peek_opt t = pop_or_peek_as t Peek Option
let drop_exn t = pop_or_peek_as t Pop Unit

(* *)

let size t =
let length t =
let tail = Atomic.get t.tail in
let head = Atomic.get t.head in
tail - head
79 changes: 0 additions & 79 deletions src/spsc_queue/spsc_queue_intf.ml

This file was deleted.

Loading