Skip to content

Commit

Permalink
* Add of_list_exn and drop_exn functions and dscheck tests for these …
Browse files Browse the repository at this point in the history
…functions

* Rename Spsc_queue.size to length for uniformization.

* Add examples in documentation

* Add bench for spsc_unsafe.
  • Loading branch information
lyrm committed Nov 26, 2024
1 parent 2b56b33 commit 2b50833
Show file tree
Hide file tree
Showing 10 changed files with 317 additions and 96 deletions.
77 changes: 21 additions & 56 deletions bench/bench_spsc_queue.ml
Original file line number Diff line number Diff line change
@@ -1,51 +1,18 @@
open Multicore_bench

let run_one ~unsafe ~budgetf ?(size_exponent = 3)
?(n_msgs = 80 * Util.iter_factor) () =
let init _ = () in
let work, before =
if unsafe then
let module Queue = Saturn.Single_prod_single_cons_queue_unsafe in
let t = Queue.create ~size_exponent in
module type BENCH = sig
val run_suite : budgetf:float -> Metric.t list
end

let before () =
while Queue.size t <> 0 do
Queue.pop_exn t |> ignore
done;
let n = Random.int ((1 lsl size_exponent) + 1) in
for i = 1 to n do
Queue.push_exn t i
done
in
let work i () =
if i = 0 then
let rec loop n =
if 0 < n then
if Queue.try_push t n then loop (n - 1)
else begin
Domain.cpu_relax ();
loop n
end
in
loop n_msgs
else
let rec loop n =
if 0 < n then
match Queue.pop_opt t with
| Some _ -> loop (n - 1)
| None ->
Domain.cpu_relax ();
loop n
in
loop n_msgs
in
(work, before)
else
let module Queue = Saturn.Single_prod_single_cons_queue in
module Make (Queue : Spsc_queue_intf.SPSC_queue) : BENCH = struct
let run_one ~budgetf ?(size_exponent = 3) ?(n_msgs = 80 * Util.iter_factor) ()
=
let init _ = () in
let work, before =
let t = Queue.create ~size_exponent in

let before () =
while Queue.size t <> 0 do
while Queue.length t <> 0 do
Queue.pop_exn t |> ignore
done;
let n = Random.int ((1 lsl size_exponent) + 1) in
Expand Down Expand Up @@ -76,21 +43,19 @@ let run_one ~unsafe ~budgetf ?(size_exponent = 3)
loop n_msgs
in
(work, before)
in
in

let config =
Printf.sprintf "2 workers, capacity %d%s" (1 lsl size_exponent)
(if unsafe then " (unsafe)" else "")
in
Times.record ~budgetf ~n_domains:2 ~before ~init ~work ()
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config
let config =
Printf.sprintf "2 workers, capacity %d" (1 lsl size_exponent)
in
Times.record ~budgetf ~n_domains:2 ~before ~init ~work ()
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config

let run_suite ~budgetf =
let run ~unsafe =
let run_suite ~budgetf =
[ 0; 3; 6; 9; 12; 15 ]
|> List.concat_map @@ fun size_exponent ->
run_one ~budgetf ~size_exponent ~unsafe ()
in
List.fold_right2
(fun safe unsafe acc -> safe :: unsafe :: acc)
(run ~unsafe:false) (run ~unsafe:true) []
run_one ~budgetf ~size_exponent ()
end

module Safe = Make (Saturn.Single_prod_single_cons_queue)
module Unsafe = Make (Saturn.Single_prod_single_cons_queue_unsafe)
5 changes: 5 additions & 0 deletions bench/dune
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ let () =
(copy ../src/michael_scott_queue/michael_scott_queue_intf.ml michael_scott_queue_intf.ml))
(package saturn))

(rule
(action
(copy ../src/spsc_queue/spsc_queue_intf.mli spsc_queue_intf.ml))
(package saturn))

(rule
(action
(copy ../src/bounded_queue/bounded_queue_intf.mli bounded_queue_intf.ml))
Expand Down
4 changes: 3 additions & 1 deletion bench/main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ let benchmarks =
("Saturn Queue_unsafe", Bench_queue.Unsafe.run_suite);
("Saturn Bounded_Queue", Bench_bounded_queue.Safe.run_suite);
("Saturn Bounded_Queue_unsafe", Bench_bounded_queue.Unsafe.run_suite);
("Saturn Single_prod_single_cons_queue", Bench_spsc_queue.run_suite);
("Saturn Single_prod_single_cons_queue", Bench_spsc_queue.Safe.run_suite);
( "Saturn Single_prod_single_cons_queue_unsafe",
Bench_spsc_queue.Unsafe.run_suite );
("Saturn Size", Bench_size.run_suite);
("Saturn Skiplist", Bench_skiplist.run_suite);
("Saturn Htbl", Bench_htbl.Safe.run_suite);
Expand Down
8 changes: 8 additions & 0 deletions src/spsc_queue/dune
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
(mdx
(package saturn)
(enabled_if
(and
(<> %{os_type} Win32)
(>= %{ocaml_version} 5.0.0)))
(libraries saturn)
(files spsc_queue_intf.mli))
37 changes: 32 additions & 5 deletions src/spsc_queue/spsc_queue.ml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ type 'a t = {
}

exception Full
exception Empty

(* *)

let create ~size_exponent =
if size_exponent < 0 || Sys.int_size - 2 < size_exponent then
Expand All @@ -52,6 +55,23 @@ let create ~size_exponent =
let head_cache = Padded_int_ref.make s 0 in
{ array; tail; tail_cache; head; head_cache }

let of_list_exn ~size_exponent values =
if size_exponent < 0 || Sys.int_size - 2 < size_exponent then
invalid_arg "size_exponent out of range";
let size = 1 lsl size_exponent in
let nvalues = List.length values in
if nvalues > size then raise Full;
let array = Array.make size None in
List.iteri (fun i elt -> Array.unsafe_set array i (Some elt)) values;
let tail = Atomic.make_contended nvalues in
let s = Obj.size (Obj.repr tail) in
let tail_cache = Padded_int_ref.make s nvalues in
let head = Atomic.make_contended 0 in
let head_cache = Padded_int_ref.make s 0 in
{ array; tail; tail_cache; head; head_cache }

(* *)

type _ mono = Unit : unit mono | Bool : bool mono

let push_as (type r) t element (mono : r mono) : r =
Expand All @@ -74,9 +94,13 @@ let push_as (type r) t element (mono : r mono) : r =
let push_exn t element = push_as t element Unit
let try_push t element = push_as t element Bool

exception Empty
(* *)

type ('a, _) poly =
| Option : ('a, 'a option) poly
| Value : ('a, 'a) poly
| Unit : ('a, unit) poly

type ('a, _) poly = Option : ('a, 'a option) poly | Value : ('a, 'a) poly
type op = Peek | Pop

let pop_or_peek_as (type a r) (t : a t) op (poly : (a, r) poly) : r =
Expand All @@ -88,7 +112,7 @@ let pop_or_peek_as (type a r) (t : a t) op (poly : (a, r) poly) : r =
let tail = Atomic.get t.tail in
Padded_int_ref.set t.tail_cache tail;
tail_cache == tail
then match poly with Value -> raise_notrace Empty | Option -> None
then match poly with Value | Unit -> raise_notrace Empty | Option -> None
else
let index = head land (Array.length t.array - 1) in
let v = Array.unsafe_get t.array index in
Expand All @@ -99,14 +123,17 @@ let pop_or_peek_as (type a r) (t : a t) op (poly : (a, r) poly) : r =
Atomic.incr t.head
| Peek -> ()
end;
match poly with Value -> Option.get v | Option -> v
match poly with Value -> Option.get v | Option -> v | Unit -> ()

let pop_exn t = pop_or_peek_as t Pop Value
let pop_opt t = pop_or_peek_as t Pop Option
let peek_exn t = pop_or_peek_as t Peek Value
let peek_opt t = pop_or_peek_as t Peek Option
let drop_exn t = pop_or_peek_as t Pop Unit

(* *)

let size t =
let length t =
let tail = Atomic.get t.tail in
let head = Atomic.get t.head in
tail - head
151 changes: 134 additions & 17 deletions src/spsc_queue/spsc_queue_intf.mli
Original file line number Diff line number Diff line change
Expand Up @@ -4,44 +4,63 @@ module type SPSC_queue = sig
(** {1 API} *)

type 'a t
(** Represent single-producer single-consumer non-resizable queue
that works in FIFO order. *)
(** Represents a single-producer single-consumer non-resizable queue
that works in FIFO order. *)

val create : size_exponent:int -> 'a t
(** [create ~size_exponent:int] create a new single-producer single-consumer
(** [create ~size_exponent:int] creates a new single-producer single-consumer
queue of maximum size [2^size_exponent] and initially empty. *)

val size : 'a t -> int
(** [size] returns the size of the queue. This method linearizes only when
called from either consumer or producer domain. Otherwise, it is safe to
call but provides only an *indication* of the size of the structure. *)
val of_list_exn : size_exponent:int -> 'a list -> 'a t
(** [of_list_exn ~size_exponent list] creates a new queue from a list.
@raises Full if the length of [list] is greater than [2^size_exponent].
🐌 This is a linear-time operation.
{[
# open Saturn.Single_prod_single_cons_queue
# let t : int t = of_list_exn ~size_exponent:6 [1;2;3;4]
val t : int t = <abstr>
# pop_opt t
- : int option = Some 1
# pop_opt t
- : int option = Some 2
]}
*)

val length : 'a t -> int
(** [length] returns the length of the queue. This method linearizes only when
called from either the consumer or producer domain. Otherwise, it is safe to
call but provides only an *indication* of the size of the structure. *)

(** {2 Producer functions} *)

exception Full
(** Raised when {!push_exn} is applied to a full queue. *)

val push_exn : 'a t -> 'a -> unit
(** [push qeueu elt] adds the element [elt] at the end of the [queue].
(** [push queue elt] adds the element [elt] at the end of the [queue].
This method can be used by at most one domain at a time.
@raise Full if the [queue] is full. *)
@raises Full if the [queue] is full. *)

val try_push : 'a t -> 'a -> bool
(** [try_push qeueue elt] tries to add the element [elt] at the end of the
(** [try_push queue elt] tries to add the element [elt] at the end of the
[queue]. If the queue [q] is full, [false] is returned. This method can be
used by at most one domain at a time. *)

(** {2 Consumer functions} *)

exception Empty
(** Raised when {!pop_exn} or {!peek_exn} is applied to an empty queue. *)
(** Raised when {!pop_exn}, {!peek_exn}, or {!drop_exn} is applied to an empty
queue. *)

val pop_exn : 'a t -> 'a
(** [pop_exn queue] removes and returns the first element in [queue]. This
method can be used by at most one domain at a time.
method can be used by at most one domain at a time.
@raise Empty if the [queue] is empty. *)
@raises Empty if the [queue] is empty. *)

val pop_opt : 'a t -> 'a option
(** [pop_opt queue] removes and returns [Some] of the first element of the
Expand All @@ -50,12 +69,110 @@ module type SPSC_queue = sig

val peek_exn : 'a t -> 'a
(** [peek_exn queue] returns the first element in [queue] without removing it.
This method can be used by at most one domain at a time.
This method can be used by at most one domain at a time.
@raise Empty if the [queue] is empty. *)
@raises Empty if the [queue] is empty. *)

val peek_opt : 'a t -> 'a option
(** [peek_opt q] returns [Some] of the first element in queue [q], or [None]
if the queue is empty. This method can be used by at most one domain at a
(** [peek_opt queue] returns [Some] of the first element in [queue], or [None]
if the queue is empty. This method can be used by at most one domain at a
time. *)

val drop_exn : 'a t -> unit
(** [drop_exn queue] removes the top element of the [queue].
@raises Empty if the [queue] is empty. *)

(** {1 Examples} *)

(** {2 Sequential example} *)

(** {[
# open Saturn.Single_prod_single_cons_queue
# let t : int t = create ~size_exponent:2
val t : int t = <abstr>
# push_exn t 1
- : unit = ()
# push_exn t 2
- : unit = ()
# try_push t 3
- : bool = true
# try_push t 4
- : bool = true
# try_push t 5
- : bool = false
# pop_opt t
- : int option = Some 1
# peek_opt t
- : int option = Some 2
# drop_exn t
- : unit = ()
# pop_exn t
- : int = 3
# pop_opt t
- : int option = Some 4
# pop_exn t
Exception: Saturn__Spsc_queue.Empty.
]} *)

(** {2 Parallel example}
Note: The barrier is used in this example solely to make the results more
interesting by increasing the likelihood of parallelism. Spawning a domain is
a costly operation, especially compared to the relatively small amount of work
being performed here. In practice, using a barrier in this manner is unnecessary.
{@ocaml non-deterministic[
# open Saturn.Single_prod_single_cons_queue
# let t : int t = create ~size_exponent:5
val t : int t = <abstr>
# let nwork = 5
val nwork : int = 5
# let barrier = Atomic.make 2
val barrier : int Atomic.t = <abstr>
# let consumer_work () =
(* Atomic.decr barrier;
while Atomic.get barrier <> 0 do Domain.cpu_relax () done; *)
let rec loop n =
if n < 1 then ()
else
(Domain.cpu_relax ();
match pop_opt t with
| Some p -> Format.printf "Popped %d\n%!" p; loop (n-1)
| None -> loop n)
in
loop nwork
val consumer_work : unit -> unit = <fun>
# let producer_work () =
(* Atomic.decr barrier;
while Atomic.get barrier <> 0 do Domain.cpu_relax () done; *)
for i = 1 to nwork do
Domain.cpu_relax ();
try_push t i |> ignore;
Format.printf "Pushed %d\n%!" i
done
val producer_work : unit -> unit = <fun>
# let consumer = Domain.spawn consumer_work
val consumer : unit Domain.t = <abstr>
# let producer = Domain.spawn producer_work
Pushed 1
Popped 1
Pushed 2
Popped 2
Pushed 3
Popped 3
Pushed 4
Popped 4
Popped 5
Pushed 5
val producer : unit Domain.t = <abstr>
# Domain.join consumer
- : unit = ()
# Domain.join producer
- : unit = ()
]} *)
end
Loading

0 comments on commit 2b50833

Please sign in to comment.