Skip to content

Commit

Permalink
Uniformize spcs documentation and functions (#165)
Browse files Browse the repository at this point in the history
* Improve documentation.

* Add of_list_exn and drop_exn functions and dscheck tests for these functions

* Rename Spsc_queue.size to length for uniformization.

* Add examples in documentation

* Add bench for spsc_unsafe.
  • Loading branch information
lyrm authored Dec 2, 2024
1 parent fc34768 commit fc0e6a2
Show file tree
Hide file tree
Showing 14 changed files with 375 additions and 164 deletions.
77 changes: 21 additions & 56 deletions bench/bench_spsc_queue.ml
Original file line number Diff line number Diff line change
@@ -1,51 +1,18 @@
open Multicore_bench

let run_one ~unsafe ~budgetf ?(size_exponent = 3)
?(n_msgs = 80 * Util.iter_factor) () =
let init _ = () in
let work, before =
if unsafe then
let module Queue = Saturn.Single_prod_single_cons_queue_unsafe in
let t = Queue.create ~size_exponent in
module type BENCH = sig
val run_suite : budgetf:float -> Metric.t list
end

let before () =
while Queue.size t <> 0 do
Queue.pop_exn t |> ignore
done;
let n = Random.int ((1 lsl size_exponent) + 1) in
for i = 1 to n do
Queue.push_exn t i
done
in
let work i () =
if i = 0 then
let rec loop n =
if 0 < n then
if Queue.try_push t n then loop (n - 1)
else begin
Domain.cpu_relax ();
loop n
end
in
loop n_msgs
else
let rec loop n =
if 0 < n then
match Queue.pop_opt t with
| Some _ -> loop (n - 1)
| None ->
Domain.cpu_relax ();
loop n
in
loop n_msgs
in
(work, before)
else
let module Queue = Saturn.Single_prod_single_cons_queue in
module Make (Queue : Spsc_queue_intf.SPSC_queue) : BENCH = struct
let run_one ~budgetf ?(size_exponent = 3) ?(n_msgs = 80 * Util.iter_factor) ()
=
let init _ = () in
let work, before =
let t = Queue.create ~size_exponent in

let before () =
while Queue.size t <> 0 do
while Queue.length t <> 0 do
Queue.pop_exn t |> ignore
done;
let n = Random.int ((1 lsl size_exponent) + 1) in
Expand Down Expand Up @@ -76,21 +43,19 @@ let run_one ~unsafe ~budgetf ?(size_exponent = 3)
loop n_msgs
in
(work, before)
in
in

let config =
Printf.sprintf "2 workers, capacity %d%s" (1 lsl size_exponent)
(if unsafe then " (unsafe)" else "")
in
Times.record ~budgetf ~n_domains:2 ~before ~init ~work ()
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config
let config =
Printf.sprintf "2 workers, capacity %d" (1 lsl size_exponent)
in
Times.record ~budgetf ~n_domains:2 ~before ~init ~work ()
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config

let run_suite ~budgetf =
let run ~unsafe =
let run_suite ~budgetf =
[ 0; 3; 6; 9; 12; 15 ]
|> List.concat_map @@ fun size_exponent ->
run_one ~budgetf ~size_exponent ~unsafe ()
in
List.fold_right2
(fun safe unsafe acc -> safe :: unsafe :: acc)
(run ~unsafe:false) (run ~unsafe:true) []
run_one ~budgetf ~size_exponent ()
end

module Safe = Make (Saturn.Single_prod_single_cons_queue)
module Unsafe = Make (Saturn.Single_prod_single_cons_queue_unsafe)
5 changes: 5 additions & 0 deletions bench/dune
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ let () =
(copy ../src/michael_scott_queue/michael_scott_queue_intf.ml michael_scott_queue_intf.ml))
(package saturn))

(rule
(action
(copy ../src/spsc_queue/spsc_queue_intf.mli spsc_queue_intf.ml))
(package saturn))

(rule
(action
(copy ../src/bounded_queue/bounded_queue_intf.mli bounded_queue_intf.ml))
Expand Down
4 changes: 3 additions & 1 deletion bench/main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ let benchmarks =
("Saturn Queue_unsafe", Bench_queue.Unsafe.run_suite);
("Saturn Bounded_Queue", Bench_bounded_queue.Safe.run_suite);
("Saturn Bounded_Queue_unsafe", Bench_bounded_queue.Unsafe.run_suite);
("Saturn Single_prod_single_cons_queue", Bench_spsc_queue.run_suite);
("Saturn Single_prod_single_cons_queue", Bench_spsc_queue.Safe.run_suite);
( "Saturn Single_prod_single_cons_queue_unsafe",
Bench_spsc_queue.Unsafe.run_suite );
("Saturn Size", Bench_size.run_suite);
("Saturn Skiplist", Bench_skiplist.run_suite);
("Saturn Htbl", Bench_htbl.Safe.run_suite);
Expand Down
2 changes: 1 addition & 1 deletion src/dune
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ let () =
(library
(name saturn)
(public_name saturn)
(modules_without_implementation htbl_intf bounded_queue_intf)
(modules_without_implementation htbl_intf bounded_queue_intf spsc_queue_intf)
(libraries backoff multicore-magic |}
^ maybe_threads
^ {| ))
Expand Down
8 changes: 8 additions & 0 deletions src/spsc_queue/dune
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
(mdx
(package saturn)
(enabled_if
(and
(<> %{os_type} Win32)
(>= %{ocaml_version} 5.0.0)))
(libraries saturn)
(files spsc_queue_intf.mli))
43 changes: 35 additions & 8 deletions src/spsc_queue/spsc_queue.ml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ type 'a t = {
}

exception Full
exception Empty

(* *)

let create ~size_exponent =
if size_exponent < 0 || Sys.int_size - 2 < size_exponent then
Expand All @@ -52,6 +55,23 @@ let create ~size_exponent =
let head_cache = Padded_int_ref.make s 0 in
{ array; tail; tail_cache; head; head_cache }

let of_list_exn ~size_exponent values =
if size_exponent < 0 || Sys.int_size - 2 < size_exponent then
invalid_arg "size_exponent out of range";
let size = 1 lsl size_exponent in
let nvalues = List.length values in
if nvalues > size then raise Full;
let array = Array.make size None in
List.iteri (fun i elt -> Array.set array i (Some elt)) values;
let tail = Atomic.make_contended nvalues in
let s = Obj.size (Obj.repr tail) in
let tail_cache = Padded_int_ref.make s nvalues in
let head = Atomic.make_contended 0 in
let head_cache = Padded_int_ref.make s 0 in
{ array; tail; tail_cache; head; head_cache }

(* *)

type _ mono = Unit : unit mono | Bool : bool mono

let push_as (type r) t element (mono : r mono) : r =
Expand All @@ -66,17 +86,21 @@ let push_as (type r) t element (mono : r mono) : r =
head == head_cache
then match mono with Unit -> raise_notrace Full | Bool -> false
else begin
Array.unsafe_set t.array (tail land (size - 1)) (Some element);
Array.set t.array (tail land (size - 1)) (Some element);
Atomic.incr t.tail;
match mono with Unit -> () | Bool -> true
end

let push_exn t element = push_as t element Unit
let try_push t element = push_as t element Bool

exception Empty
(* *)

type ('a, _) poly =
| Option : ('a, 'a option) poly
| Value : ('a, 'a) poly
| Unit : ('a, unit) poly

type ('a, _) poly = Option : ('a, 'a option) poly | Value : ('a, 'a) poly
type op = Peek | Pop

let pop_or_peek_as (type a r) (t : a t) op (poly : (a, r) poly) : r =
Expand All @@ -88,25 +112,28 @@ let pop_or_peek_as (type a r) (t : a t) op (poly : (a, r) poly) : r =
let tail = Atomic.get t.tail in
Padded_int_ref.set t.tail_cache tail;
tail_cache == tail
then match poly with Value -> raise_notrace Empty | Option -> None
then match poly with Value | Unit -> raise_notrace Empty | Option -> None
else
let index = head land (Array.length t.array - 1) in
let v = Array.unsafe_get t.array index in
let v = Array.get t.array index in
begin
match op with
| Pop ->
Array.unsafe_set t.array index None;
Array.set t.array index None;
Atomic.incr t.head
| Peek -> ()
end;
match poly with Value -> Option.get v | Option -> v
match poly with Value -> Option.get v | Option -> v | Unit -> ()

let pop_exn t = pop_or_peek_as t Pop Value
let pop_opt t = pop_or_peek_as t Pop Option
let peek_exn t = pop_or_peek_as t Peek Value
let peek_opt t = pop_or_peek_as t Peek Option
let drop_exn t = pop_or_peek_as t Pop Unit

(* *)

let size t =
let length t =
let tail = Atomic.get t.tail in
let head = Atomic.get t.head in
tail - head
79 changes: 0 additions & 79 deletions src/spsc_queue/spsc_queue_intf.ml

This file was deleted.

Loading

0 comments on commit fc0e6a2

Please sign in to comment.