diff --git a/bench/bench_bounded_queue.ml b/bench/bench_bounded_queue.ml new file mode 100644 index 00000000..aae68491 --- /dev/null +++ b/bench/bench_bounded_queue.ml @@ -0,0 +1,91 @@ +open Multicore_bench + +module type BENCH = sig + val run_suite : budgetf:float -> Metric.t list +end + +module Make (Bounded_queue : Bounded_queue_intf.BOUNDED_QUEUE) : BENCH = struct + let run_one_domain ~budgetf ?(n_msgs = 50 * Util.iter_factor) () = + let t = Bounded_queue.create () in + + let op push = + if push then Bounded_queue.try_push t 101 |> ignore + else Bounded_queue.pop_opt t |> ignore + in + + let init _ = + assert (Bounded_queue.is_empty t); + Util.generate_push_and_pop_sequence n_msgs + in + let work _ bits = Util.Bits.iter op bits in + + Times.record ~budgetf ~n_domains:1 ~init ~work () + |> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" + ~config:"one domain" + + let run_one ~budgetf ?(n_adders = 2) ?(n_takers = 2) + ?(n_msgs = 50 * Util.iter_factor) () = + let n_domains = n_adders + n_takers in + + let t = Bounded_queue.create () in + + let n_msgs_to_take = Atomic.make 0 |> Multicore_magic.copy_as_padded in + let n_msgs_to_add = Atomic.make 0 |> Multicore_magic.copy_as_padded in + + let init _ = + assert (Bounded_queue.is_empty t); + Atomic.set n_msgs_to_take n_msgs; + Atomic.set n_msgs_to_add n_msgs + in + let work i () = + if i < n_adders then + let rec work () = + let n = Util.alloc n_msgs_to_add in + if 0 < n then begin + for i = 1 to n do + Bounded_queue.try_push t i |> ignore + done; + work () + end + in + work () + else + let rec work () = + let n = Util.alloc n_msgs_to_take in + if n <> 0 then + let rec loop n = + if 0 < n then begin + match Bounded_queue.pop_opt t with + | None -> + Domain.cpu_relax (); + loop n + | Some _ -> loop (n - 1) + end + else work () + in + loop n + in + work () + in + + let config = + let format role n = + Printf.sprintf "%d %s%s" n role (if n = 1 then "" else "s") + in + Printf.sprintf "%s, %s" + (format "nb adder" n_adders) + (format "nb taker" n_takers) + in + + Times.record ~budgetf ~n_domains ~init ~work () + |> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config + + let run_suite ~budgetf = + run_one_domain ~budgetf () + @ (Util.cross [ 1; 2 ] [ 1; 2 ] + |> List.concat_map @@ fun (n_adders, n_takers) -> + run_one ~budgetf ~n_adders ~n_takers ()) +end + +module Safe = Make (Saturn.Bounded_queue) +module Unsafe = Make (Saturn.Bounded_queue_unsafe) diff --git a/bench/dune b/bench/dune index 671a62c5..cf68fa33 100644 --- a/bench/dune +++ b/bench/dune @@ -17,6 +17,11 @@ let () = (copy ../src/michael_scott_queue/michael_scott_queue_intf.ml michael_scott_queue_intf.ml)) (package saturn)) +(rule + (action + (copy ../src/bounded_queue/bounded_queue_intf.mli bounded_queue_intf.ml)) + (package saturn)) + (test (package saturn) (name main) diff --git a/bench/main.ml b/bench/main.ml index c072d174..4cf7f0dd 100644 --- a/bench/main.ml +++ b/bench/main.ml @@ -2,6 +2,8 @@ let benchmarks = [ ("Saturn Queue", Bench_queue.Safe.run_suite); ("Saturn Queue_unsafe", Bench_queue.Unsafe.run_suite); + ("Saturn Bounded_Queue", Bench_bounded_queue.Safe.run_suite); + ("Saturn Bounded_Queue_unsafe", Bench_bounded_queue.Unsafe.run_suite); ("Saturn Single_prod_single_cons_queue", Bench_spsc_queue.run_suite); ("Saturn Size", Bench_size.run_suite); ("Saturn Skiplist", Bench_skiplist.run_suite); diff --git a/src/bounded_queue/bounded_queue.body.ml b/src/bounded_queue/bounded_queue.body.ml new file mode 100644 index 00000000..6e44356e --- /dev/null +++ b/src/bounded_queue/bounded_queue.body.ml @@ -0,0 +1,190 @@ +(* Copyright (c) 2023-2024, Vesa Karvonen + Copyright (c) 2024, Carine Morel + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH + REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY + AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, + INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM + LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR + OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR + PERFORMANCE OF THIS SOFTWARE. *) + +let[@inline] get_capacity (Node r : (_, [< `Node ]) node) = r.capacity + +let[@inline] set_capacity (Node r : (_, [< `Node ]) node) value = + r.capacity <- value + +let[@inline] get_counter (Node r : (_, [< `Node ]) node) = r.counter + +let[@inline] set_counter (Node r : (_, [< `Node ]) node) value = + r.counter <- value + +type 'a t = { + head : ('a, [ `Node ]) node Atomic.t; + capacity : int; + tail : ('a, [ `Node ]) node Atomic.t; +} + +exception Full +exception Empty + +let create ?(capacity = Int.max_int) () = + let value = Obj.magic () in + let node = make_node ~value ~capacity ~counter:0 Null in + let head = Atomic.make_contended node and tail = Atomic.make_contended node in + { head; capacity; tail } + +let of_list_exn ?(capacity = Int.max_int) list : 'a t = + let len = List.length list in + if len > capacity then raise Full + else + match list |> List.rev with + | [] -> create ~capacity () + | hd :: tl -> + let tail = + make_node ~value:hd ~capacity:(capacity - len - 1) ~counter:len Null + in + let _, _, next = + List.fold_left + (fun (counter, capacity, next) value -> + ( counter - 1, + capacity + 1, + make_node ~value ~capacity ~counter next )) + (len - 1, capacity - len, tail) + tl + in + let head = + Atomic.make_contended + (make_node ~value:(Obj.magic ()) ~capacity ~counter:0 next) + in + + { head; capacity; tail = Atomic.make tail } + +let capacity_of t = t.capacity + +let is_empty t = + let head = Atomic.get t.head in + fenceless_get_next head == Link Null + +let is_full t = + let tail = Atomic.get t.tail in + let capacity = get_capacity tail in + if capacity = 0 then begin + let old_head = Atomic.get t.head in + let length = get_counter tail - get_counter old_head in + let capacity = t.capacity - length in + if 0 < capacity then begin + set_capacity tail capacity; + false + end + else true + end + else false + +let rec snapshot t = + let head = Atomic.get t.head in + let tail = fenceless_get t.tail in + match fenceless_get_next tail with + | Link (Node _ as node) -> + Atomic.compare_and_set t.tail tail node |> ignore; + snapshot t + | Link Null -> if Atomic.get t.head != head then snapshot t else (head, tail) + +let length t = + let head, tail = snapshot t in + get_counter tail - get_counter head + +(* *) + +type ('a, _) poly = Option : ('a, 'a option) poly | Value : ('a, 'a) poly + +let rec peek_as : type a r. a t -> (a, r) poly -> r = + fun t poly -> + let old_head = Atomic.get t.head in + match fenceless_get_next old_head with + | Link Null -> ( match poly with Value -> raise Empty | Option -> None) + | Link (Node r) -> ( + let value = r.value in + if Atomic.get t.head != old_head then peek_as t poly + else match poly with Value -> value | Option -> Some value) + +(* *) + +type ('a, _) poly2 = + | Option : ('a, 'a option) poly2 + | Value : ('a, 'a) poly2 + | Unit : ('a, unit) poly2 + +let rec pop_as : type a r. a t -> Backoff.t -> (a, r) poly2 -> r = + fun t backoff poly -> + let old_head = Atomic.get t.head in + match fenceless_get_next old_head with + | Link Null -> ( + match poly with Option -> None | Value | Unit -> raise Empty) + | Link (Node node as new_head) -> + if Atomic.compare_and_set t.head old_head new_head then begin + let value = node.value in + node.value <- Obj.magic (); + match poly with Option -> Some value | Value -> value | Unit -> () + end + else pop_as t (Backoff.once backoff) poly + +(* *) + +let rec fix_tail tail new_tail = + let old_tail = Atomic.get tail in + if + get_next new_tail == Link Null + && not (Atomic.compare_and_set tail old_tail new_tail) + then fix_tail tail new_tail + +(* *) + +type _ mono = Bool : bool mono | Unit : unit mono + +let rec push_as : + type r. 'a t -> ('a, [ `Node ]) node -> ('a, [ `Node ]) node -> r mono -> r + = + fun t new_node old_tail mono -> + let capacity = get_capacity old_tail in + if capacity = 0 then begin + let old_head = Atomic.get t.head in + let length = get_counter old_tail - get_counter old_head in + let capacity = t.capacity - length in + if 0 < capacity then begin + set_capacity old_tail capacity; + push_as t new_node old_tail mono + end + else match mono with Unit -> raise Full | Bool -> false + end + else begin + set_capacity new_node (capacity - 1); + set_counter new_node (get_counter old_tail + 1); + if not (compare_and_set_next old_tail (Link Null) (Link new_node)) then + push_as t new_node (link_as_node (get_next old_tail)) mono + else begin + if not (Atomic.compare_and_set t.tail old_tail new_node) then + fix_tail t.tail new_node; + match mono with Unit -> () | Bool -> true + end + end + +(* *) + +let[@inline] peek_opt t = peek_as t Option +let[@inline] peek_exn t = peek_as t Value +let[@inline] pop_opt t = pop_as t Backoff.default Option +let[@inline] pop_exn t = pop_as t Backoff.default Value +let[@inline] drop_exn t = pop_as t Backoff.default Unit + +let[@inline] try_push t value = + let new_node = make_node ~value ~capacity:0 ~counter:0 Null in + push_as t new_node (Atomic.get t.tail) Bool + +let[@inline] push_exn t value = + let new_node = make_node ~value ~capacity:0 ~counter:0 Null in + push_as t new_node (Atomic.get t.tail) Unit diff --git a/src/bounded_queue/bounded_queue.head_safe.ml b/src/bounded_queue/bounded_queue.head_safe.ml new file mode 100644 index 00000000..e30f3338 --- /dev/null +++ b/src/bounded_queue/bounded_queue.head_safe.ml @@ -0,0 +1,28 @@ +type ('a, _) node = + | Null : ('a, [> `Null ]) node + | Node : { + next : 'a link Atomic.t; + mutable value : 'a; + mutable capacity : int; + mutable counter : int; + } + -> ('a, [> `Node ]) node + +and 'a link = Link : ('a, [< `Null | `Node ]) node -> 'a link [@@unboxed] + +let[@inline] make_node ~value ~capacity ~counter next = + Node { next = Atomic.make (Link next); value; capacity; counter } + +let[@inline] link_as_node (Link n) : (_, [< `Node ]) node = + match n with Null -> assert false | Node _ as node -> node + +let[@inline] get_next (Node node : (_, [< `Node ]) node) = Atomic.get node.next + +let[@inline] fenceless_get_next (Node node : (_, [< `Node ]) node) = + Atomic.get node.next + +let[@inline] compare_and_set_next (Node node : (_, [< `Node ]) node) before + after = + Atomic.compare_and_set node.next before after + +let fenceless_get = Atomic.get diff --git a/src/bounded_queue/bounded_queue.head_unsafe.ml b/src/bounded_queue/bounded_queue.head_unsafe.ml new file mode 100644 index 00000000..1c397046 --- /dev/null +++ b/src/bounded_queue/bounded_queue.head_unsafe.ml @@ -0,0 +1,31 @@ +module Atomic = Multicore_magic.Transparent_atomic + +type ('a, _) node = + | Null : ('a, [> `Null ]) node + | Node : { + mutable _next : 'a link; + mutable value : 'a; + mutable capacity : int; + mutable counter : int; + } + -> ('a, [> `Node ]) node + +and 'a link = Link : ('a, [< `Null | `Node ]) node -> 'a link [@@unboxed] + +let[@inline] make_node ~value ~capacity ~counter next = + Node { _next = Link next; value; capacity; counter } + +external link_as_node : 'a link -> ('a, [ `Node ]) node = "%identity" + +external next_as_atomic : ('a, [< `Node ]) node -> 'a link Atomic.t + = "%identity" + +let[@inline] get_next node = Atomic.get (next_as_atomic node) + +let[@inline] fenceless_get_next node = + Atomic.fenceless_get (next_as_atomic node) + +let[@inline] compare_and_set_next node before after = + Atomic.compare_and_set (next_as_atomic node) before after + +let fenceless_get = Atomic.fenceless_get diff --git a/src/bounded_queue/bounded_queue.mli b/src/bounded_queue/bounded_queue.mli new file mode 100644 index 00000000..d64f6612 --- /dev/null +++ b/src/bounded_queue/bounded_queue.mli @@ -0,0 +1 @@ +include Bounded_queue_intf.BOUNDED_QUEUE diff --git a/src/bounded_queue/bounded_queue_intf.mli b/src/bounded_queue/bounded_queue_intf.mli new file mode 100644 index 00000000..509f822c --- /dev/null +++ b/src/bounded_queue/bounded_queue_intf.mli @@ -0,0 +1,172 @@ +(* Copyright (c) 2023-2024, Vesa Karvonen + Copyright (c) 2024, Carine Morel + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH + REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY + AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, + INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM + LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR + OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR + PERFORMANCE OF THIS SOFTWARE. *) + +module type BOUNDED_QUEUE = sig + (** Lock-free bounded Queue. + + This module implements a lock-free bounded queue based on Michael-Scott's queue + algorithm. Adding a capacity to this algorithm adds a general overhead to the + operations, and thus, it is recommended to use the unbounded queue + {!Saturn.Queue} if you don't need it. + *) + + (** {1 API} *) + + type 'a t + (** Represents a lock-free bounded queue holding elements of type ['a]. *) + + val create : ?capacity:int -> unit -> 'a t + (** [create ~capacity ()] creates a new empty bounded queue with a maximum + capacity of [capacity]. The default [capacity] value is [Int.max_int].*) + + val of_list_exn : ?capacity:int -> 'a list -> 'a t + (** [of_list_exn ~capacity list] creates a new queue from a list. + + @raises Full if the length of [list] is greater than [capacity]. + + {[ + # open Saturn.Bounded_queue + # let t : int t = of_list_exn [1;2;3;4] + val t : int t = + # pop_opt t + - : int option = Some 1 + # pop_opt t + - : int option = Some 2 + # length t + - : int = 2 + ]} + *) + + val length : 'a t -> int + (** [length queue] returns the number of elements currently in the [queue]. *) + + val capacity_of : 'a t -> int + (** [capacity_of queue] returns the maximum number of elements that the [queue] + can hold. *) + + val is_empty : 'a t -> bool + (** [is_empty queue] returns [true] if the [queue] is empty, otherwise [false]. *) + + val is_full : 'a t -> bool + (** [is_full queue] returns [true] if the [queue] is full, otherwise [false]. *) + + (** {2 Consumer functions} *) + + exception Empty + (** Raised when {!pop_exn}, {!peek_exn}, or {!drop_exn} is applied to an empty + stack. *) + + val peek_exn : 'a t -> 'a + (** [peek_exn queue] returns the first element of the [queue] without removing it. + + @raises Empty if the [queue] is empty. *) + + val peek_opt : 'a t -> 'a option + (** [peek_opt queue] returns [Some] of the first element of the [queue] without + removing it, or [None] if the [queue] is empty. *) + + val pop_exn : 'a t -> 'a + (** [pop_exn queue] removes and returns the first element of the [queue]. + + @raises Empty if the [queue] is empty. *) + + val pop_opt : 'a t -> 'a option + (** [pop_opt queue] removes and returns [Some] of the first element of the [queue], + or [None] if the [queue] is empty. *) + + val drop_exn : 'a t -> unit + (** [drop_exn queue] removes the top element of the [queue]. + @raises Empty if the [queue] is empty. *) + + (** {2 Producer functions} *) + + exception Full + (** Raised when {!push_exn} or {!push_all_exn} is applied to a full queue. *) + + val push_exn : 'a t -> 'a -> unit + (** [push_exn queue element] adds [element] at the end of the [queue]. + + @raises Full if the [queue] is full. *) + + val try_push : 'a t -> 'a -> bool + (** [try_push queue element] tries to add [element] at the end of the [queue]. + Returns [true] if the element was successfully added, or [false] if the + queue is full. *) +end + +(** {1 Examples} + An example top-level session: + {[ + # open Saturn.Bounded_queue + # let t : int t = create ~capacity:3 () + val t : int t = + # try_push t 1 + - : bool = true + # try_push t 2 + - : bool = true + # push_exn t 3 + - : unit = () + # push_exn t 4 + Exception: Saturn__Bounded_queue.Full. + # try_push t 4 + - : bool = false + # pop_exn t + - : int = 1 + # peek_opt t + - : int option = Some 2 + # drop_exn t + - : unit = () + # pop_opt t + - : int option = Some 3 + # pop_opt t + - : int option = None + # pop_exn t + Exception: Saturn__Bounded_queue.Empty.]} + + A multicore example: + {@ocaml non-deterministic[ + # open Saturn.Bounded_queue + # let t :int t = create ~capacity:4 () + val t : int t = + + # let barrier = Atomic.make 2 + val barrier : int Atomic.t = + + # let pusher () = + Atomic.decr barrier; + while Atomic.get barrier != 0 do Domain.cpu_relax () done; + List.init 8 (fun i -> i) + |> List.map (fun i -> Domain.cpu_relax (); try_push t i) + val pusher : unit -> bool list = + + # let popper () = + Atomic.decr barrier; + while Atomic.get barrier != 0 do Domain.cpu_relax () done; + List.init 8 (fun i -> Domain.cpu_relax (); pop_opt t) + val popper : unit -> int option list = + + # let domain_pusher = Domain.spawn pusher + val domain_pusher : bool list Domain.t = + + # let domain_popper = Domain.spawn popper + val domain_popper : int option list Domain.t = + + # Domain.join domain_pusher + - : bool list = [true; true; true; true; true; false; true; true] + # Domain.join domain_popper + - : int option list = [None; None; Some 0; None; Some 1; Some 2; Some 3; Some 4] + ]} + + *) diff --git a/src/bounded_queue/bounded_queue_unsafe.mli b/src/bounded_queue/bounded_queue_unsafe.mli new file mode 100644 index 00000000..d64f6612 --- /dev/null +++ b/src/bounded_queue/bounded_queue_unsafe.mli @@ -0,0 +1 @@ +include Bounded_queue_intf.BOUNDED_QUEUE diff --git a/src/bounded_queue/dune b/src/bounded_queue/dune new file mode 100644 index 00000000..843ccd9a --- /dev/null +++ b/src/bounded_queue/dune @@ -0,0 +1,28 @@ +(rule + (action + (with-stdout-to + bounded_queue.ml + (progn + (echo "# 1 \"bounded_queue.head_safe.ml\"\n") + (cat bounded_queue.head_safe.ml) + (echo "# 1 \"bounded_queue.body.ml\"\n") + (cat bounded_queue.body.ml))))) + +(rule + (action + (with-stdout-to + bounded_queue_unsafe.ml + (progn + (echo "# 1 \"bounded_queue.head_unsafe.ml\"\n") + (cat bounded_queue.head_unsafe.ml) + (echo "# 1 \"bounded_queue.body.ml\"\n") + (cat bounded_queue.body.ml))))) + +(mdx + (package saturn) + (enabled_if + (and + (<> %{os_type} Win32) + (>= %{ocaml_version} 5.0.0))) + (libraries saturn) + (files bounded_queue_intf.mli)) diff --git a/src/dune b/src/dune index edec87ee..fa8a6d27 100644 --- a/src/dune +++ b/src/dune @@ -12,7 +12,7 @@ let () = (library (name saturn) (public_name saturn) - (modules_without_implementation htbl_intf) + (modules_without_implementation htbl_intf bounded_queue_intf) (libraries backoff multicore-magic |} ^ maybe_threads ^ {| )) diff --git a/src/saturn.ml b/src/saturn.ml index 7da90e4d..5441421f 100644 --- a/src/saturn.ml +++ b/src/saturn.ml @@ -28,6 +28,8 @@ Copyright (c) 2017, Nicolas ASSOUAD module Queue = Michael_scott_queue module Queue_unsafe = Michael_scott_queue_unsafe +module Bounded_queue = Bounded_queue +module Bounded_queue_unsafe = Bounded_queue_unsafe module Stack = Treiber_stack module Bounded_stack = Bounded_stack module Work_stealing_deque = Ws_deque diff --git a/src/saturn.mli b/src/saturn.mli index 52f9f821..21b5c526 100644 --- a/src/saturn.mli +++ b/src/saturn.mli @@ -34,6 +34,8 @@ module Queue = Michael_scott_queue module Queue_unsafe = Michael_scott_queue_unsafe module Stack = Treiber_stack module Bounded_stack = Bounded_stack +module Bounded_queue = Bounded_queue +module Bounded_queue_unsafe = Bounded_queue_unsafe module Work_stealing_deque = Ws_deque module Single_prod_single_cons_queue = Spsc_queue module Single_prod_single_cons_queue_unsafe = Spsc_queue_unsafe diff --git a/test/bounded_queue/bounded_queues/bounded_queues.ml b/test/bounded_queue/bounded_queues/bounded_queues.ml new file mode 100644 index 00000000..13853976 --- /dev/null +++ b/test/bounded_queue/bounded_queues/bounded_queues.ml @@ -0,0 +1,17 @@ +module type Bounded_queue_tests = sig + include Bounded_queue_intf.BOUNDED_QUEUE + + val name : string +end + +module Bounded_queue : Bounded_queue_tests = struct + include Saturn.Bounded_queue + + let name = "Bounded_queue_safe" +end + +module Bounded_queue_unsafe : Bounded_queue_tests = struct + include Saturn.Bounded_queue_unsafe + + let name = "Bounded_queue_unsafe" +end diff --git a/test/bounded_queue/bounded_queues/dune b/test/bounded_queue/bounded_queues/dune new file mode 100644 index 00000000..6e281fc5 --- /dev/null +++ b/test/bounded_queue/bounded_queues/dune @@ -0,0 +1,10 @@ +(rule + (action + (copy + ../../../src/bounded_queue/bounded_queue_intf.mli + bounded_queue_intf.ml)) + (package saturn)) + +(library + (name bounded_queues) + (libraries saturn)) diff --git a/test/bounded_queue/dscheck_bounded_queue.ml b/test/bounded_queue/dscheck_bounded_queue.ml new file mode 100644 index 00000000..239498ca --- /dev/null +++ b/test/bounded_queue/dscheck_bounded_queue.ml @@ -0,0 +1,382 @@ +[@@@warning "-32"] + +module Atomic = Dscheck.TracedAtomic +module Cue = Bounded_queue + +(* Dscheck only tests the safe implementation of Bounded_queue. To make Bounded_queue_unsafe compatible with Dscheck, it needs to be modified to essentially become the safe version. *) + +let drain cue = + let rec pop_until_empty acc = + match Cue.pop_opt cue with + | None -> acc |> List.rev + | Some v -> pop_until_empty (v :: acc) + in + pop_until_empty [] + +let push_pop () = + Atomic.trace (fun () -> + let cue = Cue.create () in + let items_total = 4 in + + (* producer *) + Atomic.spawn (fun () -> + for i = 1 to items_total do + Cue.try_push cue i |> ignore + done); + + (* consumer *) + let popped = ref [] in + Atomic.spawn (fun () -> + for _ = 1 to items_total do + begin + match Cue.pop_opt cue with + | None -> () + | Some v -> popped := v :: !popped + end; + (* Ensure is_empty does not interfere with other functions *) + Cue.is_empty cue |> ignore + done); + + (* checks*) + Atomic.final (fun () -> + Atomic.check (fun () -> + let remaining = drain cue in + let pushed = List.init items_total (fun x -> x + 1) in + List.sort Int.compare (!popped @ remaining) = pushed))) + +let is_empty () = + Atomic.trace (fun () -> + let cue = Cue.create () in + + (* producer *) + Atomic.spawn (fun () -> Cue.try_push cue 1 |> ignore); + + (* consumer *) + let res = ref false in + Atomic.spawn (fun () -> + match Cue.pop_opt cue with + | None -> res := true + | Some _ -> res := Cue.is_empty cue); + + (* checks*) + Atomic.final (fun () -> Atomic.check (fun () -> !res))) + +let push_length_is_full () = + Atomic.trace (fun () -> + let cue = Cue.create () in + let items_total = 4 in + + Cue.try_push cue 0 |> ignore; + + (* producer *) + Atomic.spawn (fun () -> + for i = 1 to items_total do + Cue.try_push cue i |> ignore + done); + + (* consumer *) + let length_res = ref [] in + let is_full_res = ref false in + Atomic.spawn (fun () -> + for _ = 1 to items_total do + length_res := Cue.length cue :: !length_res; + is_full_res := Cue.is_full cue || !is_full_res + done); + + (* checks*) + Atomic.final (fun () -> + Atomic.check (fun () -> not !is_full_res); + + Atomic.check (fun () -> + let pushed = drain cue in + pushed = List.init (items_total + 1) Fun.id); + + Atomic.check (fun () -> + !length_res |> List.rev = List.sort compare !length_res + && List.for_all + (fun x -> x >= 1 && x <= items_total + 1) + !length_res))) + +let push_length_is_full_with_capacity () = + Atomic.trace (fun () -> + let capacity = 3 in + let cue = Cue.create ~capacity () in + let items_total = 5 in + + Cue.try_push cue 0 |> ignore; + + (* producer *) + Atomic.spawn (fun () -> + for i = 1 to items_total - 1 do + Cue.try_push cue i |> ignore + done); + + (* consumer *) + let length_res = ref [] in + let test = ref true in + Atomic.spawn (fun () -> + for _ = 1 to items_total do + let len = Cue.length cue in + length_res := len :: !length_res; + test := if len < capacity then !test else !test && Cue.is_full cue + done); + + (* checks*) + Atomic.final (fun () -> + Atomic.check (fun () -> !test); + + Atomic.check (fun () -> + let pushed = drain cue in + pushed = List.init capacity Fun.id); + + Atomic.check (fun () -> + !length_res |> List.rev = List.sort compare !length_res + && List.for_all (fun x -> x >= 1 && x <= capacity) !length_res))) + +let push_drop () = + Atomic.trace (fun () -> + let cue = Cue.create () in + let items_total = 4 in + + (* producer *) + Atomic.spawn (fun () -> + for i = 1 to items_total do + Cue.try_push cue i |> ignore + done); + + (* consumer *) + let dropped = ref 0 in + Atomic.spawn (fun () -> + for _ = 1 to items_total do + match Cue.drop_exn cue with + | () -> dropped := !dropped + 1 + | exception Cue.Empty -> () + done); + + (* checks*) + Atomic.final (fun () -> + Atomic.check (fun () -> + let remaining = drain cue in + remaining + = List.init (items_total - !dropped) (fun x -> x + !dropped + 1)))) + +let push_pop_with_capacity () = + Atomic.trace (fun () -> + let cue = Cue.create ~capacity:2 () in + let items_total = 4 in + + (* producer *) + let pushed = Array.make items_total false in + Atomic.spawn (fun () -> + Array.iteri (fun i _ -> pushed.(i) <- Cue.try_push cue i) pushed); + + (* consumer *) + let popped = Array.make items_total None in + Atomic.spawn (fun () -> + Array.iteri (fun i _ -> popped.(i) <- Cue.pop_opt cue) popped); + (* checks*) + Atomic.final (fun () -> + let popped = Array.to_list popped |> List.filter_map Fun.id in + let remaining = drain cue in + Atomic.check (fun () -> + let xor a b = (a && not b) || ((not a) && b) in + try + Array.iteri + (fun i elt -> + if elt then begin + if not @@ xor (List.mem i remaining) (List.mem i popped) + then raise Exit + end + else if List.mem i remaining || List.mem i popped then + raise Exit) + pushed; + true + with _ -> false))) + +let push_push () = + Atomic.trace (fun () -> + let cue = Cue.create () in + let items_total = 6 in + + (* two producers *) + for i = 0 to 1 do + Atomic.spawn (fun () -> + for j = 1 to items_total / 2 do + (* even nums belong to thr 1, odd nums to thr 2 *) + Cue.try_push cue (i + (j * 2)) |> ignore + done) + done; + + (* checks*) + Atomic.final (fun () -> + let items = drain cue in + + (* got the same number of items out as in *) + Atomic.check (fun () -> items_total = List.length items); + + (* they are in fifo order *) + let odd, even = List.partition (fun v -> v mod 2 == 0) items in + + Atomic.check (fun () -> List.sort Int.compare odd = odd); + Atomic.check (fun () -> List.sort Int.compare even = even))) + +let push_push_with_capacity () = + Atomic.trace (fun () -> + let capacity = 3 in + let cue = Cue.create ~capacity () in + let items_total = 6 in + + (* two producers *) + for i = 0 to 1 do + Atomic.spawn (fun () -> + for j = 1 to items_total / 2 do + (* even nums belong to thr 1, odd nums to thr 2 *) + Cue.try_push cue (i + (j * 2)) |> ignore + done) + done; + + (* checks*) + Atomic.final (fun () -> + let items = drain cue in + + (* got the same number of items out as in *) + Atomic.check (fun () -> capacity = List.length items))) + +let pop_pop () = + Atomic.trace (fun () -> + let cue = Cue.create () in + let items_total = 4 in + + for i = 1 to items_total do + Cue.try_push cue i |> ignore + done; + + (* two consumers *) + let lists = [ ref []; ref [] ] in + List.iter + (fun list -> + Atomic.spawn (fun () -> + for _ = 1 to items_total / 2 do + (* even nums belong to thr 1, odd nums to thr 2 *) + list := Option.get (Cue.pop_opt cue) :: !list + done) + |> ignore) + lists; + + (* checks*) + Atomic.final (fun () -> + let l1 = !(List.nth lists 0) in + let l2 = !(List.nth lists 1) in + + (* got the same number of items out as in *) + Atomic.check (fun () -> items_total = List.length l1 + List.length l2); + + (* they are in fifo order *) + Atomic.check (fun () -> List.sort Int.compare l1 = List.rev l1); + Atomic.check (fun () -> List.sort Int.compare l2 = List.rev l2))) + +let two_domains () = + Atomic.trace (fun () -> + let cue = Cue.create () in + let n1, n2 = (1, 2) in + + (* two producers *) + let lists = + [ + (List.init n1 (fun i -> i), ref []); + (List.init n2 (fun i -> i + n1), ref []); + ] + in + List.iter + (fun (lpush, lpop) -> + Atomic.spawn (fun () -> + List.iter + (fun elt -> + (* even nums belong to thr 1, odd nums to thr 2 *) + Cue.try_push cue elt |> ignore; + lpop := Option.get (Cue.pop_opt cue) :: !lpop) + lpush) + |> ignore) + lists; + + (* checks*) + Atomic.final (fun () -> + let lpop1 = !(List.nth lists 0 |> snd) in + let lpop2 = !(List.nth lists 1 |> snd) in + + (* got the same number of items out as in *) + Atomic.check (fun () -> List.length lpop1 = 1); + Atomic.check (fun () -> List.length lpop2 = 2); + + (* no element are missing *) + Atomic.check (fun () -> + List.sort Int.compare (lpop1 @ lpop2) + = List.init (n1 + n2) (fun i -> i)))) + +let two_domains_more_pop () = + Atomic.trace (fun () -> + let cue = Cue.create () in + let n1, n2 = (2, 1) in + + (* two producers *) + let lists = + [ + (List.init n1 (fun i -> i), ref []); + (List.init n2 (fun i -> i + n1), ref []); + ] + in + List.iter + (fun (lpush, lpop) -> + Atomic.spawn (fun () -> + List.iter + (fun elt -> + Cue.try_push cue elt |> ignore; + lpop := Cue.pop_opt cue :: !lpop; + lpop := Cue.pop_opt cue :: !lpop) + lpush) + |> ignore) + lists; + + (* checks*) + Atomic.final (fun () -> + let lpop1 = + !(List.nth lists 0 |> snd) + |> List.filter Option.is_some |> List.map Option.get + in + let lpop2 = + !(List.nth lists 1 |> snd) + |> List.filter Option.is_some |> List.map Option.get + in + + (* got the same number of items out as in *) + Atomic.check (fun () -> + n1 + n2 = List.length lpop1 + List.length lpop2); + + (* no element are missing *) + Atomic.check (fun () -> + List.sort Int.compare (lpop1 @ lpop2) + = List.init (n1 + n2) (fun i -> i)))) + +let tests = + let open Alcotest in + [ + ( "basic", + [ + test_case "1-producer-1-consumer" `Slow push_pop; + test_case "push-length-is_full" `Slow push_length_is_full; + test_case "push-length-is_full-capacity" `Slow + push_length_is_full_with_capacity; + test_case "2-domains-is_empty" `Slow is_empty; + test_case "1-producer-1-consumer-capacity" `Slow push_pop_with_capacity; + test_case "1-push-1-drop" `Slow push_drop; + test_case "2-producers" `Slow push_push; + test_case "2-producers-capacity" `Slow push_push_with_capacity; + test_case "2-consumers" `Slow pop_pop; + test_case "2-domains" `Slow two_domains; + test_case "2-domains-more-pops" `Slow two_domains_more_pop; + ] ); + ] + +let () = + let open Alcotest in + run "dscheck_bounded_queue" tests diff --git a/test/bounded_queue/dune b/test/bounded_queue/dune new file mode 100644 index 00000000..3e1b23b7 --- /dev/null +++ b/test/bounded_queue/dune @@ -0,0 +1,57 @@ +(rule + (action + (copy ../../src/bounded_queue/bounded_queue.ml bounded_queue.ml)) + (package saturn)) + +(rule + (action + (copy + ../../src/bounded_queue/bounded_queue_unsafe.ml + bounded_queue_unsafe.ml)) + (package saturn)) + +(rule + (action + (copy ../../src/bounded_queue/bounded_queue_intf.mli bounded_queue_intf.ml)) + (package saturn)) + +(test + (package saturn) + (name dscheck_bounded_queue) + (libraries alcotest atomic backoff dscheck multicore-magic-dscheck) + (build_if + (and + (>= %{ocaml_version} 5) + (not + (and + (= %{arch_sixtyfour} false) + (= %{architecture} arm))))) + (modules + bounded_queue + bounded_queue_unsafe + bounded_queue_intf + dscheck_bounded_queue) + (flags + (:standard -open Multicore_magic_dscheck))) + +(test + (package saturn) + (name stm_bounded_queue) + (modules stm_bounded_queue) + (libraries bounded_queues saturn qcheck-core qcheck-stm.stm stm_run) + (enabled_if + (= %{arch_sixtyfour} true))) + +(test + (package saturn) + (name qcheck_bounded_queue) + (libraries + bounded_queues + saturn + barrier + qcheck + qcheck-core + qcheck-alcotest + domain_shims + alcotest) + (modules qcheck_bounded_queue)) diff --git a/test/bounded_queue/qcheck_bounded_queue.ml b/test/bounded_queue/qcheck_bounded_queue.ml new file mode 100644 index 00000000..b2b847fd --- /dev/null +++ b/test/bounded_queue/qcheck_bounded_queue.ml @@ -0,0 +1,339 @@ +module Qcheck_bounded_queue (Bounded_queue : Bounded_queues.Bounded_queue_tests) = +struct + let tests_sequential = + QCheck. + [ + (* TEST 1: push *) + Test.make ~name:"push" (list int) (fun lpush -> + assume (lpush <> []); + (* Building a random Bounded_queue *) + let queue = Bounded_queue.create () in + List.iter (Bounded_queue.push_exn queue) lpush; + + (* Testing property *) + (not (Bounded_queue.is_empty queue)) + && Bounded_queue.length queue = List.length lpush); + (* TEST 1: of_list *) + Test.make ~name:"of_list_exn" (list int) (fun lpush -> + assume (lpush <> []); + (* Building a random Bounded_queue *) + let queue = Bounded_queue.of_list_exn lpush in + + (* Testing property *) + (not (Bounded_queue.is_empty queue)) + && Bounded_queue.length queue = List.length lpush); + (* TEST 1bis: push *) + Test.make ~name:"of_list_exn_raise_full" + (pair (list int) small_nat) + (fun (lpush, capacity) -> + assume (lpush <> []); + (* Building a random Bounded_queue *) + match Bounded_queue.of_list_exn ~capacity lpush with + | queue -> + capacity >= List.length lpush + && (not (Bounded_queue.is_empty queue)) + && Bounded_queue.length queue = List.length lpush + | exception Bounded_queue.Full -> capacity <= List.length lpush); + (* TEST 1: push and full *) + Test.make ~name:"push_capacity" (list int) (fun lpush -> + assume (lpush <> []); + (* Building a random Bounded_queue *) + let capacity = 10 in + let queue = Bounded_queue.create ~capacity () in + List.map + (fun elt -> + try + Bounded_queue.push_exn queue elt; + true + with Bounded_queue.Full -> false) + lpush + |> List.filteri (fun i elt -> if i < capacity then not elt else elt) + |> ( = ) []); + (* TEST 2 - push, pop until empty *) + Test.make ~name:"push_pop_opt_until_empty" (list int) (fun lpush -> + (* Building a random Bounded_queue *) + let queue = Bounded_queue.create () in + List.iter (Bounded_queue.push_exn queue) lpush; + + (* Popping until [is_empty q] is true *) + let count = ref 0 in + while not (Bounded_queue.is_empty queue) do + incr count; + ignore (Bounded_queue.pop_opt queue) + done; + + (* Testing property *) + Bounded_queue.pop_opt queue = None && !count = List.length lpush); + (* TEST 3 - push, pop_opt, check FIFO *) + Test.make ~name:"fifo" (list int) (fun lpush -> + (* Building a random Bounded_queue *) + let queue = Bounded_queue.create () in + List.iter (Bounded_queue.push_exn queue) lpush; + + let out = ref [] in + let insert v = out := v :: !out in + + for _ = 1 to List.length lpush do + match Bounded_queue.pop_opt queue with + | None -> assert false + | Some v -> insert v + done; + + (* Testing property *) + lpush = List.rev !out); + (* TEST 3 - push, pop_opt, peek_opt check FIFO *) + Test.make ~name:"fifo_peek_opt" (list int) (fun lpush -> + (* Building a random Bounded_queue *) + let queue = Bounded_queue.create () in + List.iter (Bounded_queue.push_exn queue) lpush; + + let pop = ref [] in + let peek = ref [] in + let insert out v = out := v :: !out in + + for _ = 1 to List.length lpush do + match Bounded_queue.peek_opt queue with + | None -> assert false + | Some v -> ( + insert peek v; + match Bounded_queue.pop_opt queue with + | None -> assert false + | Some v -> insert pop v) + done; + + (* Testing property *) + lpush = List.rev !pop && lpush = List.rev !peek); + ] + + let tests_one_consumer_one_producer = + QCheck. + [ + (* TEST 1 - one consumer one producer: + Parallel [push] and [pop_opt]. *) + Test.make ~name:"parallel_fifo" (list int) (fun lpush -> + (* Initialization *) + let queue = Bounded_queue.create () in + let barrier = Barrier.create 2 in + + (* Producer pushes. *) + let producer = + Domain.spawn (fun () -> + Barrier.await barrier; + List.iter (Bounded_queue.push_exn queue) lpush) + in + + Barrier.await barrier; + let fifo = + List.fold_left + (fun acc item -> + let rec pop_one () = + match Bounded_queue.pop_opt queue with + | None -> + Domain.cpu_relax (); + pop_one () + | Some item' -> acc && item = item' + in + pop_one ()) + true lpush + in + let empty = Bounded_queue.is_empty queue in + + (* Ensure nothing is left behind. *) + Domain.join producer; + fifo && empty); + (* TEST 2 - one consumer one producer: + Parallel [push] and [peek_opt] and [pop_opt]. *) + Test.make ~name:"parallel_peek" (list int) (fun pushed -> + (* Initialization *) + let npush = List.length pushed in + let queue = Bounded_queue.create () in + let barrier = Barrier.create 2 in + + (* Producer pushes. *) + let producer = + Domain.spawn (fun () -> + Barrier.await barrier; + List.iter (Bounded_queue.push_exn queue) pushed) + in + + let peeked = ref [] in + let popped = ref [] in + Barrier.await barrier; + for _ = 1 to npush do + peeked := Bounded_queue.peek_opt queue :: !peeked; + popped := Bounded_queue.pop_opt queue :: !popped + done; + + Domain.join producer; + let rec check = function + | _, [], [] -> true + | pushed, None :: peeked, None :: popped -> + check (pushed, peeked, popped) + | push :: pushed, None :: peeked, Some pop :: popped + when push = pop -> + check (pushed, peeked, popped) + | push :: pushed, Some peek :: peeked, Some pop :: popped + when push = peek && push = pop -> + check (pushed, peeked, popped) + | _, _, _ -> false + in + check (pushed, List.rev @@ !peeked, List.rev @@ !popped)); + ] + + let tests_two_domains = + QCheck. + [ + (* TEST 1 - two domains doing multiple times one push then one pop_opt. + Parallel [push] and [pop_opt]. + *) + Test.make ~name:"parallel_pop_opt_push" (pair small_nat small_nat) + (fun (npush1, npush2) -> + (* Initialization *) + let queue = Bounded_queue.create () in + let barrier = Barrier.create 2 in + + (* Using these lists instead of a random one enables to + check for more properties. *) + let lpush1 = List.init npush1 (fun i -> i) in + let lpush2 = List.init npush2 (fun i -> i + npush1) in + + let work lpush = + List.map + (fun elt -> + Bounded_queue.push_exn queue elt; + Domain.cpu_relax (); + Bounded_queue.pop_opt queue) + lpush + in + + let domain1 = + Domain.spawn (fun () -> + Barrier.await barrier; + work lpush1) + in + let popped2 = + Barrier.await barrier; + work lpush2 + in + + (* As a domain always pushs before popping, all pops + succeeds. *) + let popped1 = Domain.join domain1 |> List.map Option.get in + let popped2 = popped2 |> List.map Option.get in + + (* Check 1 : no elements are missing (everyting is popped). *) + let all_elt_in = + List.sort compare (popped1 @ popped2) = lpush1 @ lpush2 + in + (* filter the elements pushed and popped by domain 1 *) + let push1_pop1 = List.filter (fun elt -> elt < npush1) popped1 in + (* filter the elements pushed by domain 2 and popped by domain 1 *) + let push2_pop1 = List.filter (fun elt -> elt >= npush1) popped1 in + (* filter the elements pushed by domain 1 and popped by domain 2 *) + let push1_pop2 = List.filter (fun elt -> elt < npush1) popped2 in + (* filter the elements pushed and popped by domain 2 *) + let push2_pop2 = List.filter (fun elt -> elt >= npush1) popped2 in + + (* all these lists must be sorted *) + let is_sorted list = List.sort compare list = list in + all_elt_in && is_sorted push1_pop1 && is_sorted push1_pop2 + && is_sorted push2_pop1 && is_sorted push2_pop2); + (* TEST 2 - + Parallel [push] and [pop_opt] with two domains + + Two domains randomly pushs and pops in parallel. They stop as + soon as they have finished pushing a list of element to + push. *) + Test.make ~name:"parallel_pop_opt_push_random" + (pair small_nat small_nat) (fun (npush1, npush2) -> + (* Initialization *) + let queue = Bounded_queue.create () in + let barrier = Barrier.create 2 in + + let lpush1 = List.init npush1 (fun i -> i) in + let lpush2 = List.init npush2 (fun i -> i + npush1) in + + let work lpush = + let consecutive_pop = ref 0 in + let rec loop lpush popped = + let what_to_do = Random.int 2 in + if what_to_do = 0 || !consecutive_pop > 10 then ( + (* randomly choosing between pushing and popping except + if too many consecutive pops have already occurred *) + consecutive_pop := 0; + match lpush with + | [] -> popped + | elt :: xs -> + Bounded_queue.push_exn queue elt; + loop xs popped) + else ( + incr consecutive_pop; + let p = Bounded_queue.pop_opt queue in + loop lpush (p :: popped)) + in + loop lpush [] + in + + let domain1 = + Domain.spawn (fun () -> + Barrier.await barrier; + work lpush1) + in + let popped2 = + Barrier.await barrier; + work lpush2 + in + + let popped1 = + Domain.join domain1 + |> List.filter (function None -> false | _ -> true) + |> List.map Option.get + in + let popped2 = + popped2 + |> List.filter (function None -> false | _ -> true) + |> List.map Option.get + in + + (* Pop everything that is still on the Bounded_queue *) + let popped3 = + let rec loop popped = + match Bounded_queue.pop_opt queue with + | None -> popped + | Some v -> loop (v :: popped) + in + loop [] + in + (* Check that no element is missing. *) + let all_n_elt_in = + List.sort compare (popped1 @ popped2 @ popped3) = lpush1 @ lpush2 + in + + all_n_elt_in); + ] +end + +let () = + let to_alcotest = List.map QCheck_alcotest.to_alcotest in + + let module Safe = Qcheck_bounded_queue (Bounded_queues.Bounded_queue) in + let name = "safe" in + let safe_tests = + [ + ("test_sequential_" ^ name, to_alcotest Safe.tests_sequential); + ( "one_cons_one_prod_" ^ name, + to_alcotest Safe.tests_one_consumer_one_producer ); + ("two_domains_" ^ name, to_alcotest Safe.tests_two_domains); + ] + in + let module Unsafe = Qcheck_bounded_queue (Bounded_queues.Bounded_queue_unsafe) in + let name = "unsafe" in + let unsafe_tests = + [ + ("test_sequential_" ^ name, to_alcotest Unsafe.tests_sequential); + ( "one_cons_one_prod_" ^ name, + to_alcotest Unsafe.tests_one_consumer_one_producer ); + ("two_domains_" ^ name, to_alcotest Unsafe.tests_two_domains); + ] + in + Alcotest.run "Bounded_queue" (safe_tests @ unsafe_tests) diff --git a/test/bounded_queue/stm_bounded_queue.ml b/test/bounded_queue/stm_bounded_queue.ml new file mode 100644 index 00000000..f9163016 --- /dev/null +++ b/test/bounded_queue/stm_bounded_queue.ml @@ -0,0 +1,110 @@ +(** Sequential and Parallel model-based tests of (bounded queue) Bounded_queue. *) + +open QCheck +open STM +module Bounded_queue = Saturn.Bounded_queue + +module STM_Bounded_queue (Bounded_queue : Bounded_queues.Bounded_queue_tests) = +struct + module Spec = struct + type cmd = + | Try_push of int + | Pop_opt + | Peek_opt + | Drop_exn + | Length + | Is_empty + | Is_full + + let show_cmd c = + match c with + | Try_push i -> "Try_push " ^ string_of_int i + | Pop_opt -> "Pop_opt" + | Peek_opt -> "Peek_opt" + | Drop_exn -> "Drop_exn" + | Length -> "Length" + | Is_empty -> "Is_empty" + | Is_full -> "Is_full" + + type state = int * int * int list + type sut = int Bounded_queue.t + + let arb_cmd _s = + let int_gen = Gen.nat in + QCheck.make ~print:show_cmd + (Gen.oneof + [ + Gen.map (fun i -> Try_push i) int_gen; + Gen.return Pop_opt; + Gen.return Peek_opt; + Gen.return Drop_exn; + Gen.return Length; + Gen.return Is_empty; + Gen.return Is_full; + ]) + + let init_state = (100, 0, []) + let init_sut () = Bounded_queue.create ~capacity:100 () + let cleanup _ = () + + let next_state c ((capacity, size, content) as s) = + match c with + | Try_push i -> + if size = capacity then s else (capacity, size + 1, i :: content) + | Pop_opt | Drop_exn -> ( + match List.rev content with + | [] -> s + | _ :: content' -> (capacity, size - 1, List.rev content')) + | Peek_opt -> s + | Is_empty -> s + | Is_full -> s + | Length -> s + + let precond _ _ = true + + let run c d = + match c with + | Try_push i -> + Res + ( bool, + match Bounded_queue.push_exn d i with + | () -> true + | exception Bounded_queue.Full -> + true (*Bounded_queue.try_push d i*) ) + | Pop_opt -> Res (option int, Bounded_queue.pop_opt d) + | Peek_opt -> Res (option int, Bounded_queue.peek_opt d) + | Drop_exn -> Res (result unit exn, protect Bounded_queue.drop_exn d) + | Is_empty -> Res (bool, Bounded_queue.is_empty d) + | Is_full -> Res (bool, Bounded_queue.is_full d) + | Length -> Res (int, Bounded_queue.length d) + + let postcond c ((capacity, size, content) : state) res = + match (c, res) with + | Try_push _, Res ((Bool, _), res) -> res = (size < capacity) + | (Pop_opt | Peek_opt), Res ((Option Int, _), res) -> ( + match List.rev content with + | [] -> res = None + | j :: _ -> res = Some j) + | Drop_exn, Res ((Result (Unit, Exn), _), res) -> ( + match List.rev content with + | [] -> res = Error Bounded_queue.Empty + | _ -> res = Ok ()) + | Is_empty, Res ((Bool, _), res) -> res = (content = []) + | Is_full, Res ((Bool, _), res) -> res = (size = capacity) + | Length, Res ((Int, _), res) -> res = size + | _, _ -> false + end + + let run () = Stm_run.run ~name:"Saturn.Bounded_queue" (module Spec) |> exit +end + +let () = + (* Since Bounded_queue and Bounded_queue_unsafe share the same implementation, it is not necessary + to test both of them. *) + Random.self_init (); + if Random.bool () then + let module Safe = STM_Bounded_queue (Bounded_queues.Bounded_queue) in + Safe.run () |> exit + else + let module Unsafe = STM_Bounded_queue (Bounded_queues.Bounded_queue_unsafe) in + Unsafe.run () |> exit