diff --git a/bench/bench_mpsc.ml b/bench/bench_mpsc.ml new file mode 100644 index 00000000..24f238e5 --- /dev/null +++ b/bench/bench_mpsc.ml @@ -0,0 +1,79 @@ +open Multicore_bench +module Queue = Saturn.Single_consumer_queue + +let run_one_domain ~budgetf ?(n_msgs = 50 * Util.iter_factor) () = + let t = Queue.create () in + + let op push = if push then Queue.push t 101 else Queue.pop_opt t |> ignore in + + let init _ = + assert (Queue.is_empty t); + Util.generate_push_and_pop_sequence n_msgs + in + let work _ bits = Util.Bits.iter op bits in + + Times.record ~budgetf ~n_domains:1 ~init ~work () + |> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config:"one domain" + +let run_one ~budgetf ?(n_adders = 2) ?(n_takers = 2) + ?(n_msgs = 50 * Util.iter_factor) () = + let n_domains = n_adders + n_takers in + + let t = Queue.create () in + + let n_msgs_to_take = Atomic.make 0 |> Multicore_magic.copy_as_padded in + let n_msgs_to_add = Atomic.make 0 |> Multicore_magic.copy_as_padded in + + let init _ = + assert (Queue.is_empty t); + Atomic.set n_msgs_to_take n_msgs; + Atomic.set n_msgs_to_add n_msgs + in + let work i () = + if i < n_adders then + let rec work () = + let n = Util.alloc n_msgs_to_add in + if 0 < n then begin + for i = 1 to n do + Queue.push t i + done; + work () + end + in + work () + else + let rec work () = + let n = Util.alloc n_msgs_to_take in + if n <> 0 then + let rec loop n = + if 0 < n then begin + match Queue.pop_opt t with + | None -> + Domain.cpu_relax (); + loop n + | Some _ -> loop (n - 1) + end + else work () + in + loop n + in + work () + in + + let config = + let format role n = + Printf.sprintf "%d %s%s" n role (if n = 1 then "" else "s") + in + Printf.sprintf "%s, %s" + (format "nb adder" n_adders) + (format "nb taker" n_takers) + in + + Times.record ~budgetf ~n_domains ~init ~work () + |> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config + +let run_suite ~budgetf = + run_one_domain ~budgetf () + @ (Util.cross [ 1; 2; 3; 4 ] [ 1 ] + |> List.concat_map @@ fun (n_adders, n_takers) -> + run_one ~budgetf ~n_adders ~n_takers ()) diff --git a/bench/main.ml b/bench/main.ml index 4cf7f0dd..a872190e 100644 --- a/bench/main.ml +++ b/bench/main.ml @@ -5,6 +5,7 @@ let benchmarks = ("Saturn Bounded_Queue", Bench_bounded_queue.Safe.run_suite); ("Saturn Bounded_Queue_unsafe", Bench_bounded_queue.Unsafe.run_suite); ("Saturn Single_prod_single_cons_queue", Bench_spsc_queue.run_suite); + ("Saturn Single_consumer_queue", Bench_mpsc.run_suite); ("Saturn Size", Bench_size.run_suite); ("Saturn Skiplist", Bench_skiplist.run_suite); ("Saturn Htbl", Bench_htbl.Safe.run_suite); diff --git a/src/dune b/src/dune index fa8a6d27..cdadc797 100644 --- a/src/dune +++ b/src/dune @@ -42,5 +42,5 @@ let () = (<> %{os_type} Win32) (>= %{ocaml_version} 5.0.0))) (libraries saturn) - (files treiber_stack.mli bounded_stack.mli)) + (files treiber_stack.mli bounded_stack.mli mpsc_queue.mli)) |} diff --git a/src/mpsc_queue.ml b/src/mpsc_queue.ml index f0cc4b7a..75818764 100644 --- a/src/mpsc_queue.ml +++ b/src/mpsc_queue.ml @@ -1,152 +1,129 @@ (* A lock-free multi-producer, single-consumer, thread-safe queue without support for cancellation. This makes a good data structure for a scheduler's run queue. - See: "Implementing lock-free queues" - https://people.cs.pitt.edu/~jacklange/teaching/cs2510-f12/papers/implementing_lock_free.pdf - - It is simplified slightly because we don't need multiple consumers. - Therefore [head] is not atomic. *) + Based on Vesa Karvonen's example at: + https://github.com/ocaml-multicore/picos/blob/07d6c2d391e076b490098c0379d01208b3a9cc96/test/lib/foundation/mpsc_queue.ml +*) exception Closed +exception Empty -module Node : sig - type 'a t = { next : 'a opt Atomic.t; mutable value : 'a } - and +'a opt - - val make : next:'a opt -> 'a -> 'a t - - val none : 'a opt - (** [t.next = none] means that [t] is currently the last node. *) - - val closed : 'a opt - (** [t.next = closed] means that [t] will always be the last node. *) - - val some : 'a t -> 'a opt - val fold : 'a opt -> none:(unit -> 'b) -> some:('a t -> 'b) -> 'b -end = struct - (* https://github.com/ocaml/RFCs/pull/14 should remove the need for magic here *) - - type +'a opt (* special | 'a t *) - type 'a t = { next : 'a opt Atomic.t; mutable value : 'a } - type special = Nothing | Closed - - let none : 'a. 'a opt = Obj.magic Nothing - let closed : 'a. 'a opt = Obj.magic Closed - let some (t : 'a t) : 'a opt = Obj.magic t - - let fold (opt : 'a opt) ~none:n ~some = - if opt == none then n () - else if opt == closed then raise Closed - else some (Obj.magic opt : 'a t) - - let make ~next value = { value; next = Atomic.make next } -end - -type 'a t = { tail : 'a Node.t Atomic.t; mutable head : 'a Node.t } -(* [head] is the last node dequeued (or a dummy node, initially). - [head.next] gives the real first node, if not [Node.none]. - If [tail.next] is [none] then it is the last node in the queue. - Otherwise, [tail.next] is a node that is closer to the tail. *) - -let push t x = - let node = Node.(make ~next:none) x in - let rec aux () = - let p = Atomic.get t.tail in - (* While [p.next == none], [p] is the last node in the queue. *) - if Atomic.compare_and_set p.next Node.none (Node.some node) then - (* [node] has now been added to the queue (and possibly even consumed). - Update [tail], unless someone else already did it for us. *) - ignore (Atomic.compare_and_set t.tail p node : bool) - else - (* Someone else added a different node first ([p.next] is not [none]). - Make [t.tail] more up-to-date, if it hasn't already changed, and try again. *) - Node.fold (Atomic.get p.next) - ~none:(fun () -> assert false) - ~some:(fun p_next -> - ignore (Atomic.compare_and_set t.tail p p_next : bool); - aux ()) - in - aux () - -let rec push_head t x = - let p = t.head in - let next = Atomic.get p.next in - if next == Node.closed then raise Closed; - let node = Node.make ~next x in - if Atomic.compare_and_set p.next next (Node.some node) then - if - (* We don't want to let [tail] get too far behind, so if the queue was empty, move it to the new node. *) - next == Node.none - then ignore (Atomic.compare_and_set t.tail p node : bool) - else - ( (* If the queue wasn't empty, there's nothing to do. - Either tail isn't at head or there is some [push] thread working to update it. - Either [push] will update it directly to the new tail, or will update it to [node] - and then retry. Either way, it ends up at the real tail. *) ) - else ( - (* Someone else changed it first. This can only happen if the queue was empty. *) - assert (next == Node.none); - push_head t x) - -let rec close (t : 'a t) = - (* Mark the tail node as final. *) - let p = Atomic.get t.tail in - if not (Atomic.compare_and_set p.next Node.none Node.closed) then - (* CAS failed because [p] is no longer the tail (or is already closed). *) - Node.fold (Atomic.get p.next) - ~none:(fun () -> assert false) - (* Can't switch from another state to [none] *) - ~some:(fun p_next -> - (* Make [tail] more up-to-date if it hasn't changed already *) - ignore (Atomic.compare_and_set t.tail p p_next : bool); - (* Retry *) - close t) - -let pop_opt t = - let p = t.head in - (* [p] is the previously-popped item. *) - let node = Atomic.get p.next in - Node.fold node - ~none:(fun () -> None) - ~some:(fun node -> - t.head <- node; - let v = node.value in - node.value <- Obj.magic (); - (* So it can be GC'd *) - Some v) +(* A list where the end indicates whether the queue is closed. *) +type 'a clist = ( :: ) of 'a * 'a clist | Open | Closed -exception Empty +(* [rev_append l1 l2] is like [rev l1 @ l2] *) +let rec rev_append l1 l2 = + match l1 with + | a :: l -> rev_append l (a :: l2) + | Open -> l2 + | Closed -> assert false -let pop t = - let p = t.head in - (* [p] is the previously-popped item. *) - let node = Atomic.get p.next in - Node.fold node - ~none:(fun () -> raise Empty) - ~some:(fun node -> - t.head <- node; - let v = node.value in - node.value <- Obj.magic (); - (* So it can be GC'd *) - v) - -let peek_opt t = - let p = t.head in - (* [p] is the previously-popped item. *) - let node = Atomic.get p.next in - Node.fold node ~none:(fun () -> None) ~some:(fun node -> Some node.value) - -let peek t = - let p = t.head in - (* [p] is the previously-popped item. *) - let node = Atomic.get p.next in - Node.fold node ~none:(fun () -> raise Empty) ~some:(fun node -> node.value) +let[@tail_mod_cons] rec ( @ ) l1 l2 = + match l1 with + | h1 :: tl -> h1 :: (tl @ l2) + | Open -> l2 + | Closed -> assert false -let is_empty t = - Node.fold (Atomic.get t.head.next) - ~none:(fun () -> true) - ~some:(fun _ -> false) +(* *) + +(* The queue contains [head @ rev tail]. + If [tail] is non-empty then it ends in [Open]. *) +type 'a t = { mutable head : 'a clist; tail : 'a clist Atomic.t } -let create () = - let dummy = { Node.value = Obj.magic (); next = Atomic.make Node.none } in - { tail = Atomic.make dummy; head = dummy } +let create () = { head = Open; tail = Atomic.make_contended Open } + +let[@tail_mod_cons] rec append_list_to_clist l l' = + match l with [] -> l' | List.(x :: xs) -> x :: append_list_to_clist xs l' + +let of_list l = + { head = append_list_to_clist l Open; tail = Atomic.make_contended Open } + +(* *) + +let is_empty t = + match t.head with + | _ :: _ -> false + | Closed -> raise Closed + | Open -> ( match Atomic.get t.tail with _ :: _ -> false | _ -> true) + +let close t = + match Atomic.exchange t.tail Closed with + | Closed -> raise Closed + | xs -> t.head <- t.head @ rev_append xs Closed + +(* *) + +let rec push t x = + match Atomic.get t.tail with + | Closed -> raise Closed + | before -> + let after = x :: before in + if not (Atomic.compare_and_set t.tail before after) then push t x + +let push_head t x = + match t.head with Closed -> raise Closed | before -> t.head <- x :: before + +let[@tail_mod_cons] rec append_list_to_clist l l' = + match l with [] -> l' | List.(x :: xs) -> x :: append_list_to_clist xs l' + +let rec push_all t values = + match Atomic.get t.tail with + | Closed -> raise Closed + | before -> + let after = append_list_to_clist (List.rev values) before in + if not (Atomic.compare_and_set t.tail before after) then push_all t values + +(* *) + +type ('a, _) poly = + | Option : ('a, 'a option) poly + | Value : ('a, 'a) poly + | Unit : ('a, unit) poly + +let rec pop_as : type a r. a t -> (a, r) poly -> r = + fun t poly -> + match t.head with + | x :: xs -> begin + t.head <- xs; + match poly with Option -> Some x | Value -> x | Unit -> () + end + | Closed -> raise Closed + | Open -> ( + (* We know the tail is open because we just saw the head was open + and we don't run concurrently with [close]. *) + match Atomic.exchange t.tail Open with + | Closed -> assert false + | Open -> ( + match poly with + | Option -> None + | Value | Unit -> raise Empty (* Optimise the common case *)) + | tail -> + t.head <- rev_append tail Open; + pop_as t poly) + +(* *) + +type ('a, _) poly2 = Option : ('a, 'a option) poly2 | Value : ('a, 'a) poly2 + +let rec peek_as : type a r. a t -> (a, r) poly2 -> r = + fun t poly -> + match t.head with + | x :: _ -> ( match poly with Option -> Some x | Value -> x) + | Closed -> raise Closed + | Open -> ( + (* We know the tail is open because we just saw the head was open + and we don't run concurrently with [close]. *) + match Atomic.exchange t.tail Open with + | Closed -> assert false + | Open -> ( match poly with Option -> None | Value -> raise Empty) + | tail -> + t.head <- rev_append tail Open; + peek_as t poly) + +(* *) + +let pop_opt t = pop_as t Option +let pop_exn t = pop_as t Value +let drop_exn t = pop_as t Unit +let peek_exn t = peek_as t Value +let peek_opt t = peek_as t Option diff --git a/src/mpsc_queue.mli b/src/mpsc_queue.mli index 216f4033..b9d427bd 100644 --- a/src/mpsc_queue.mli +++ b/src/mpsc_queue.mli @@ -1,27 +1,43 @@ (** Lock-free multi-producer, single-consumer, domain-safe queue without support for cancellation. - This makes a good data structure for a scheduler's run queue and - is currently (September 2022) used for Eio's scheduler. *) + This data structure is well-suited for use as a scheduler's run queue. + + **Note**: This queue does not include safety mechanisms to prevent + misuse. If consumer-only functions are called concurrently by multiple + domains, the queue may enter an unexpected state, due to data races + and a lack of linearizability. +*) + +(** {1 API} *) type 'a t -(** A queue of items of type ['a]. *) +(** Represents a single-consumer queue of items of type ['a]. *) exception Closed val create : unit -> 'a t -(** [create ()] returns a new empty queue. *) - -val is_empty : 'a t -> bool -(** [is_empty q] is [true] if calling [pop] would return [None]. - - @raise Closed if [q] is closed and empty. *) - -val close : 'a t -> unit -(** [close q] marks [q] as closed, preventing any further items from - being pushed by the producers (i.e. with {!push}). - - @raise Closed if [q] has already been closed. *) +(** [create ()] returns a new empty single-consumer queue. *) + +val of_list : 'a list -> 'a t +(** [of_list l] creates a new single-consumer queue from list [l]. + + {[ + # open Saturn.Single_consumer_queue + # let t : int t = of_list [1; 2; 3] + val t : int t = + # pop_opt t + - : int option = Some 1 + # peek_opt t + - : int option = Some 2 + # pop_opt t + - : int option = Some 2 + # pop_opt t + - : int option = Some 3 + ]} +*) + +(** {2 Producer-only functions} *) val push : 'a t -> 'a -> unit (** [push q v] adds the element [v] at the end of the queue [q]. This @@ -30,13 +46,53 @@ val push : 'a t -> 'a -> unit @raise Closed if [q] is closed. *) +val push_all : 'a t -> 'a list -> unit +(** [push_all q vs] adds all the elements [vs] at the end of the queue [q]. + This can be used safely by multiple producer domains, in parallel with + the other operations. + + @raise Closed if [q] is closed. + + {[ + # open Saturn.Single_consumer_queue + # let t : int t = create () + val t : int t = + # push_all t [1; 2; 3] + - : unit = () + # pop_opt t + - : int option = Some 1 + # peek_opt t + - : int option = Some 2 + # pop_opt t + - : int option = Some 2 + # pop_opt t + - : int option = Some 3 + # pop_exn t + Exception: Saturn__Mpsc_queue.Empty. + ]} + *) + (** {2 Consumer-only functions} *) exception Empty (** Raised when {!pop} or {!peek} is applied to an empty queue. *) -val pop : 'a t -> 'a -(** [pop q] removes and returns the first element in queue [q]. +val is_empty : 'a t -> bool +(** [is_empty q] is [true] if calling [pop] would return [None]. + This can only be used by the consumer. + + @raise Closed if [q] is closed and empty. *) + +val close : 'a t -> unit +(** [close q] marks [q] as closed, preventing any further items from + being pushed by the producers (i.e. with {!push}). This can only + be used by the consumer. + + @raise Closed if [q] has already been closed. *) + +val pop_exn : 'a t -> 'a +(** [pop_exn q] removes and returns the first element in queue [q]. This + can only be used by the consumer. @raise Empty if [q] is empty. @@ -44,20 +100,30 @@ val pop : 'a t -> 'a val pop_opt : 'a t -> 'a option (** [pop_opt q] removes and returns the first element in queue [q] or - returns [None] if the queue is empty. + returns [None] if the queue is empty. This can only be used by the + consumer. + + @raise Closed if [q] is closed and empty. *) +val drop_exn : 'a t -> unit +(** [drop_exn q] removes the first element in queue [q]. This can only be used by the consumer. + + @raise Empty if [q] is empty. + @raise Closed if [q] is closed and empty. *) -val peek : 'a t -> 'a -(** [peek q] returns the first element in queue [q]. +val peek_exn : 'a t -> 'a +(** [peek_exn q] returns the first element in queue [q]. This can only + be used by the consumer @raise Empty if [q] is empty. @raise Closed if [q] is closed and empty. *) val peek_opt : 'a t -> 'a option -(** [peek_opt q] returns the first element in queue [q] or - returns [None] if the queue is empty. +(** [peek_opt q] returns the first element in queue [q] or returns + [None] if the queue is empty. This can only be used by the + consumer. @raise Closed if [q] is closed and empty. *) @@ -67,3 +133,80 @@ val push_head : 'a t -> 'a -> unit with {!pop}, the item might be skipped). @raise Closed if [q] is closed and empty. *) + +(** {1 Examples} + An example top-level session: + {[ + # open Saturn.Single_consumer_queue + # let t : int t = create () + val t : int t = + # push t 1 + - : unit = () + # push t 42 + - : unit = () + # pop_opt t + - : int option = Some 1 + # peek_opt t + - : int option = Some 42 + # drop_exn t + - : unit = () + # pop_exn t + Exception: Saturn__Mpsc_queue.Empty.]} + + A multicore example: + {@ocaml non-deterministic[ + # open Saturn.Single_consumer_queue + # let t : (string * int) t = create () + val t : (string * int) t = + # let barrier = Atomic.make 3 + val barrier : int Atomic.t = + # let n = 10 + val n : int = 10 + + # let work_consumer () = + Atomic.decr barrier; + while Atomic.get barrier <> 0 do Domain.cpu_relax () done; + for i = 1 to n do + begin + match pop_opt t with + | None -> Printf.printf "Empty.\n%!" + | Some (s, n) -> + Printf.printf "Consumed ressource #%d from %s.\n%!" n s + end; + Domain.cpu_relax () + done; + val work_consumer : unit -> unit = + + # let work_producer id () = + Atomic.decr barrier; + while Atomic.get barrier <> 0 do Domain.cpu_relax () done; + List.init n Fun.id + |> List.iter (fun i -> push t (id , i); + Domain.cpu_relax ()) + val work_producer : string -> unit -> unit = + + # let consumer = Domain.spawn work_consumer + val consumer : unit Domain.t = + # let producerA = Domain.spawn (work_producer "A") + val producerA : unit Domain.t = + # let producerB = Domain.spawn (work_producer "B") + Empty. + Consumed ressource #0 from A. + Consumed ressource #0 from B. + Consumed ressource #1 from B. + Consumed ressource #2 from B. + Consumed ressource #3 from B. + Consumed ressource #4 from B. + Consumed ressource #5 from B. + Consumed ressource #6 from B. + Consumed ressource #7 from B. + val producerB : unit Domain.t = + + # Domain.join consumer + - : unit = () + # Domain.join producerA + - : unit = () + # Domain.join producerB + - : unit = () + ]} + *) diff --git a/test/mpsc_queue/mpsc_queue_dscheck.ml b/test/mpsc_queue/mpsc_queue_dscheck.ml index 8edf203d..152de14b 100644 --- a/test/mpsc_queue/mpsc_queue_dscheck.ml +++ b/test/mpsc_queue/mpsc_queue_dscheck.ml @@ -1,106 +1,175 @@ +module Atomic = Dscheck.TracedAtomic +module Queue = Mpsc_queue + let drain queue = - let remaining = ref 0 in - while not (Mpsc_queue.is_empty queue) do - remaining := !remaining + 1; - assert (Option.is_some (Mpsc_queue.pop_opt queue)) - done; - !remaining - -let producer_consumer () = + let rec pop_until_empty acc = + match Queue.pop_opt queue with + | None -> acc |> List.rev + | Some v -> pop_until_empty (v :: acc) + in + pop_until_empty [] + +let push_pop () = Atomic.trace (fun () -> - let queue = Mpsc_queue.create () in + let queue = Queue.create () in let items_total = 4 in (* producer *) Atomic.spawn (fun () -> - for _ = 1 to items_total - 1 do - Mpsc_queue.push queue 0 + for i = 1 to items_total do + Queue.push queue i done); (* consumer *) - let popped = ref 0 in + let popped = ref [] in Atomic.spawn (fun () -> - Mpsc_queue.push_head queue 1; for _ = 1 to items_total do - match Mpsc_queue.pop_opt queue with - | None -> () - | Some _ -> popped := !popped + 1 + begin + match Queue.pop_opt queue with + | None -> () + | Some v -> popped := v :: !popped + end; + (* Ensure is_empty does not interfere with other functions *) + Queue.is_empty queue |> ignore done); (* checks*) Atomic.final (fun () -> Atomic.check (fun () -> let remaining = drain queue in - !popped + remaining = items_total))) + let pushed = List.init items_total (fun x -> x + 1) in + List.sort Int.compare (!popped @ remaining) = pushed))) -let producer_consumer_peek () = +let is_empty () = Atomic.trace (fun () -> - let queue = Mpsc_queue.create () in - let items_total = 1 in - let pushed = List.init items_total (fun i -> i) in + let queue = Queue.create () in (* producer *) + Atomic.spawn (fun () -> Queue.push queue 1); + + (* consumer *) + let res = ref false in Atomic.spawn (fun () -> - List.iter (fun elt -> Mpsc_queue.push queue elt) pushed); + match Queue.pop_opt queue with + | None -> res := true + | Some _ -> res := Queue.is_empty queue); + + (* checks*) + Atomic.final (fun () -> Atomic.check (fun () -> !res))) + +let push_drop () = + Atomic.trace (fun () -> + let queue = Queue.create () in + let items_total = 4 in + + (* producer *) + Atomic.spawn (fun () -> + for i = 1 to items_total do + Queue.push queue i + done); (* consumer *) - let popped = ref [] in - let peeked = ref [] in + let dropped = ref 0 in Atomic.spawn (fun () -> for _ = 1 to items_total do - peeked := Mpsc_queue.peek_opt queue :: !peeked; - popped := Mpsc_queue.pop_opt queue :: !popped + match Queue.drop_exn queue with + | () -> dropped := !dropped + 1 + | exception Queue.Empty -> () done); (* checks*) Atomic.final (fun () -> - Atomic.check (fun () -> - let rec check pushed peeked popped = - match (pushed, peeked, popped) with - | _, [], [] -> true - | _, None :: peeked, None :: popped -> - check pushed peeked popped - | push :: pushed, None :: peeked, Some pop :: popped - when push = pop -> - check pushed peeked popped - | push :: pushed, Some peek :: peeked, Some pop :: popped - when push = peek && push = pop -> - check pushed peeked popped - | _, _, _ -> false - in - check pushed (List.rev !peeked) (List.rev !popped)); Atomic.check (fun () -> let remaining = drain queue in - let popped = List.filter Option.is_some !popped in - List.length popped + remaining = items_total))) + remaining + = List.init (items_total - !dropped) (fun x -> x + !dropped + 1)))) -let two_producers () = +let push_push () = Atomic.trace (fun () -> - let queue = Mpsc_queue.create () in - let items_total = 4 in + let queue = Queue.create () in + let items_total = 6 in - (* producers *) - for _ = 1 to 2 do + (* two producers *) + for i = 0 to 1 do Atomic.spawn (fun () -> - for _ = 1 to items_total / 2 do - Mpsc_queue.push queue 0 + for j = 1 to items_total / 2 do + (* even nums belong to thr 1, odd nums to thr 2 *) + Queue.push queue (i + (j * 2)) done) done; (* checks*) + Atomic.final (fun () -> + let items = drain queue in + + (* got the same number of items out as in *) + Atomic.check (fun () -> items_total = List.length items); + + (* they are in fifo order *) + let odd, even = List.partition (fun v -> v mod 2 == 0) items in + + Atomic.check (fun () -> List.sort Int.compare odd = odd); + Atomic.check (fun () -> List.sort Int.compare even = even))) + +let two_producers_one_consumer () = + Atomic.trace (fun () -> + let ninit_push = 3 in + let queue = Queue.of_list (List.init ninit_push (fun i -> i + 1)) in + let nproducers = 3 in + let ntotal = (2 * nproducers) + ninit_push in + + (* producer 1 *) + Atomic.spawn (fun () -> + for i = 1 to nproducers do + Queue.push queue (i + ninit_push) + done); + + (* producer 2 *) + Atomic.spawn (fun () -> + for i = 1 to nproducers do + Queue.push queue (i + ninit_push + nproducers) + done); + + (* consumer *) + let popped = ref [] in + Atomic.spawn (fun () -> + for _ = 1 to 5 do + match Queue.pop_opt queue with + | None -> () + | Some v -> popped := v :: !popped + done); + + (* checks *) Atomic.final (fun () -> Atomic.check (fun () -> let remaining = drain queue in - remaining = items_total))) + let pushed = List.init ntotal (fun i -> i + 1) in + List.sort Int.compare (!popped @ remaining) = pushed); + + Atomic.check (fun () -> + let pushed_1 = + List.filter (fun x -> x <= ninit_push + nproducers) !popped + in + let pushed_2 = + List.filter (fun x -> x > ninit_push + nproducers) !popped + in + + List.sort Int.compare pushed_1 = List.rev pushed_1 + && List.sort Int.compare pushed_2 = List.rev pushed_2))) + +let tests = + let open Alcotest in + [ + ( "basic", + [ + test_case "1-producer-1-consumer" `Slow push_pop; + test_case "2-domains-is_empty" `Slow is_empty; + test_case "1-push-1-drop" `Slow push_drop; + test_case "2-producers" `Slow push_push; + test_case "2-producers-1-consumer" `Slow two_producers_one_consumer; + ] ); + ] let () = let open Alcotest in - run "mpsc_queue_dscheck" - [ - ( "basic", - [ - test_case "1-producer-1-consumer" `Slow producer_consumer; - test_case "1-producer-1-consumer-peek" `Slow producer_consumer_peek; - test_case "2-producers" `Slow two_producers; - ] ); - ] + run "dscheck_bounded_queue" tests diff --git a/test/mpsc_queue/qcheck_mpsc_queue.ml b/test/mpsc_queue/qcheck_mpsc_queue.ml index 95a9df41..7369223f 100644 --- a/test/mpsc_queue/qcheck_mpsc_queue.ml +++ b/test/mpsc_queue/qcheck_mpsc_queue.ml @@ -2,9 +2,8 @@ module Mpsc_queue = Saturn.Single_consumer_queue (* Mpsc_queue is a multiple producers, single consumer queue. *) (* Producers can use the functions - - [push], - - [is_empty], - - [close] *) + - [push] +*) (* Consumer can use the functions - [pop], - [push], @@ -53,16 +52,6 @@ let extract_n_with_peek q n close = let peeked, popped = loop [] [] n in (List.rev peeked, List.rev popped) -let popped_until_empty_and_closed q = - let rec loop acc = - try - let popped = Mpsc_queue.pop_opt q in - Domain.cpu_relax (); - loop (popped :: acc) - with Mpsc_queue.Closed -> acc - in - loop [] |> List.rev - let keep_n_first n = List.filteri (fun i _ -> i < n) let keep_n_last n l = List.filteri (fun i _ -> i >= List.length l - n) l let list_some = List.map (fun elt -> `Some elt) @@ -94,7 +83,7 @@ let tests_one_consumer = (* Testing property *) Mpsc_queue.push_head queue i; - try Mpsc_queue.pop queue = i with Mpsc_queue.Empty -> false); + try Mpsc_queue.pop_exn queue = i with Mpsc_queue.Empty -> false); (* TEST 1c - single consumer no producer: forall q and n, peek_opt (push_head q i; q) = Some i*) Test.make ~name:"push_head_peek_opt" @@ -118,7 +107,7 @@ let tests_one_consumer = (* Testing property *) Mpsc_queue.push_head queue i; - try Mpsc_queue.peek queue = i with Mpsc_queue.Empty -> false); + try Mpsc_queue.peek_exn queue = i with Mpsc_queue.Empty -> false); (* TEST 2 - single consumer no producer: forall q, if is_empty q then pop_opt queue = None *) Test.make ~name:"pop_opt_empty" (list int) (fun lpush -> @@ -149,7 +138,7 @@ let tests_one_consumer = (* Testing property *) (try - ignore (Mpsc_queue.pop queue); + ignore (Mpsc_queue.pop_exn queue); false with Mpsc_queue.Empty -> true) && !count = List.length lpush); @@ -183,7 +172,7 @@ let tests_one_consumer = (* Testing property *) (try - ignore (Mpsc_queue.peek queue); + ignore (Mpsc_queue.peek_exn queue); false with Mpsc_queue.Empty -> true) && !count = List.length lpush); @@ -497,213 +486,6 @@ let tests_one_consumer_one_producer = && keep_n_first (List.length lpush_head) all_pushed = list_some (lpush_head |> List.rev) && keep_n_last (List.length lpush) all_pushed = list_some lpush); - (* TEST 4 - one consumer one producer - Consumer push then close while consumer pop_opt until the queue - is empty and closed. *) - Test.make ~name:"par_pop_opt_push2" (list int) (fun lpush -> - (* Initialisation*) - let queue = Mpsc_queue.create () in - let barrier = Barrier.create 2 in - - (* Sequential [push_head] *) - let producer = - Domain.spawn (fun () -> - Barrier.await barrier; - let res = - try - List.iter - (fun elt -> - Mpsc_queue.push queue elt; - Domain.cpu_relax ()) - lpush; - false - with Mpsc_queue.Closed -> true - in - Mpsc_queue.close queue; - res) - in - - Barrier.await barrier; - let popped = popped_until_empty_and_closed queue in - let unexpected_closed = Domain.join producer in - let popped_value = - List.filter (function Some _ -> true | _ -> false) popped - in - - (not unexpected_closed) - && lpush |> List.map (fun elt -> Some elt) = popped_value); - ] - -let tests_one_consumer_two_producers = - QCheck. - [ - (* TEST 1 - one consumer two producers: - Two producers push at the same time. - Checks that producers do not erase each other [pushes]. *) - Test.make ~name:"par_push" - (pair (list int) (list int)) - (fun (lpush1, lpush2) -> - (* Initialization *) - let npush1, npush2 = (List.length lpush1, List.length lpush2) in - let queue = Mpsc_queue.create () in - let barrier = Barrier.create 2 in - - let multi_push lpush = - Barrier.await barrier; - try - List.iter - (fun elt -> - Mpsc_queue.push queue elt; - Domain.cpu_relax ()) - lpush; - false - with Mpsc_queue.Closed -> true - in - - (* Producers pushes. *) - let producer1 = Domain.spawn (fun () -> multi_push lpush1) in - let producer2 = Domain.spawn (fun () -> multi_push lpush2) in - - let closed1 = Domain.join producer1 in - let closed2 = Domain.join producer2 in - - Mpsc_queue.close queue; - - (* Retrieve pushed values. *) - let popped = popped_until_empty_and_closed queue in - - let popped_value = - List.fold_left - (fun acc elt -> - match elt with Some elt -> elt :: acc | _ -> acc) - [] popped - |> List.rev - in - - let rec compare l l1 l2 = - match (l, l1, l2) with - | [], [], [] -> true - | [], _, _ -> false - | _, [], _ -> l = l2 - | _, _, [] -> l = l1 - | x :: l', y :: l1', z :: l2' -> - if x = y && x = z then compare l' l1 l2' || compare l' l1' l2 - else if x = y then compare l' l1' l2 - else if x = z then compare l' l1 l2' - else false - in - - (* Testing property : - - no Close exception raised before the queue being actually closed - - all pushed values are in the queue - *) - (not closed1) && (not closed2) - && List.length popped_value = npush1 + npush2 - && compare popped_value lpush1 lpush2); - (* TEST 2 - one consumer two producers: - - Two producers push and close the queue when one has finished - pushing. At the same time a consumer popes. - - Checks that closing the queue prevent other producers to push - and that popping at the same time works. - *) - Test.make ~name:"par_push_close_pop_opt" - (pair (list int) (list int)) - (fun (lpush1, lpush2) -> - (* Initialization *) - let npush1, npush2 = (List.length lpush1, List.length lpush2) in - let queue = Mpsc_queue.create () in - let barrier = Barrier.create 3 in - - let guard_push lpush = - Barrier.await barrier; - let closed_when_pushing = - try - List.iter - (fun elt -> - Mpsc_queue.push queue elt; - Domain.cpu_relax ()) - lpush; - false - with Mpsc_queue.Closed -> true - in - ( closed_when_pushing, - try - Mpsc_queue.close queue; - true - with Mpsc_queue.Closed -> false ) - in - - (* Producers pushes. *) - let producer1 = Domain.spawn (fun () -> guard_push lpush1) in - let producer2 = Domain.spawn (fun () -> guard_push lpush2) in - - (* Waiting to make sure the producers have time to - start. However, as the consumer will [pop_opt] until one of - the producer closes the queue, it is not a requirement to wait here. *) - Barrier.await barrier; - - let popped = popped_until_empty_and_closed queue in - - let closed_when_pushing1, has_closed1 = Domain.join producer1 in - let closed_when_pushing2, has_closed2 = Domain.join producer2 in - - let popped_value = - List.fold_left - (fun acc elt -> - match elt with Some elt -> elt :: acc | _ -> acc) - [] popped - |> List.rev - in - - let rec compare l l1 l2 = - match (l, l1, l2) with - | [], [], [] -> true - | [], _, _ -> false - | _, [], _ -> l = l2 - | _, _, [] -> l = l1 - | x :: l', y :: l1', z :: l2' -> - if x = y && x = z then compare l' l1 l2' || compare l' l1' l2 - else if x = y then compare l' l1' l2 - else if x = z then compare l' l1 l2' - else false - in - - (* Testing property : - - there should be only 4 workings combinaisons for the boolean values - [closed_when_pushing] and [has_closed] : - + CASE 1 : if producer 1 closed the queue before producer 2 have finised - pushing. In this case returned values will be: - 1 -> false, true / 2 -> true, false - + CASE 2 : if producer 1 closed the queue and producer 2 have finised - pushing but have not closed the queue. - 1 -> false, true / 2 -> false, false - + two symetrical cases. - - in case 1, the closing producer should have pushed everything but not - the other. - - in case 2, both queues should have finished pushing. *) - match - ( closed_when_pushing1, - has_closed1, - closed_when_pushing2, - has_closed2 ) - with - | false, true, true, false -> - (* CASE 1 *) - let real_npush2 = List.length popped_value - npush1 in - real_npush2 < npush2 - && compare popped_value lpush1 (keep_n_first real_npush2 lpush2) - | true, false, false, true -> - (* CASE 1, sym *) - let real_npush1 = List.length popped_value - npush2 in - real_npush1 < npush1 - && compare popped_value (keep_n_first real_npush1 lpush1) lpush2 - | false, true, false, false | false, false, false, true -> - (* CASE 2*) - List.length popped_value = npush1 + npush2 - && compare popped_value lpush1 lpush2 - | _, _, _, _ -> false); ] let main () = @@ -713,7 +495,6 @@ let main () = ("one_consumer", to_alcotest tests_one_consumer); ("one_producer", to_alcotest tests_one_producer); ("one_cons_one_prod", to_alcotest tests_one_consumer_one_producer); - ("one_cons_two_prod", to_alcotest tests_one_consumer_two_producers); ] ;; diff --git a/test/mpsc_queue/stm_mpsc_queue.ml b/test/mpsc_queue/stm_mpsc_queue.ml index 21899ce1..82c5a3f3 100644 --- a/test/mpsc_queue/stm_mpsc_queue.ml +++ b/test/mpsc_queue/stm_mpsc_queue.ml @@ -6,13 +6,24 @@ open Util module Mpsc_queue = Saturn.Single_consumer_queue module Spec = struct - type cmd = Push of int | Pop | Peek | Push_head of int | Is_empty | Close + type cmd = + | Push of int + | Push_all of int list + | Pop_exn + | Peek_exn + | Push_head of int + | Is_empty + | Close + + let string_of_int_list l = + "[" ^ String.concat "; " (List.map string_of_int l) ^ "]" let show_cmd c = match c with | Push i -> "Push " ^ string_of_int i - | Pop -> "Pop" - | Peek -> "Peek" + | Push_all l -> "Push_all " ^ string_of_int_list l + | Pop_exn -> "Pop_exn" + | Peek_exn -> "Peek_exn" | Push_head i -> "Push_head" ^ string_of_int i | Is_empty -> "Is_empty" | Close -> "Close" @@ -26,8 +37,9 @@ module Spec = struct (Gen.oneof [ Gen.map (fun i -> Push i) int_gen; - Gen.return Is_empty; - Gen.return Close; + Gen.map (fun l -> Push_all l) (Gen.small_list int_gen); + (* Gen.return Is_empty; *) + (* Gen.return Close; *) ]) let arb_cmd _s = @@ -35,8 +47,8 @@ module Spec = struct QCheck.make ~print:show_cmd (Gen.oneof [ - Gen.return Pop; - Gen.return Peek; + Gen.return Pop_exn; + Gen.return Peek_exn; Gen.map (fun i -> Push i) int_gen; Gen.map (fun i -> Push_head i) int_gen; Gen.return Is_empty; @@ -51,10 +63,11 @@ module Spec = struct match c with | Push i -> (is_closed, if not is_closed then i :: List.rev s |> List.rev else s) + | Push_all l -> (is_closed, if not is_closed then s @ l else s) | Push_head i -> (is_closed, if not (is_closed && s = []) then i :: s else s) | Is_empty -> (is_closed, s) - | Pop -> (is_closed, match s with [] -> s | _ :: s' -> s') - | Peek -> (is_closed, s) + | Pop_exn -> (is_closed, match s with [] -> s | _ :: s' -> s') + | Peek_exn -> (is_closed, s) | Close -> (true, s) let precond _ _ = true @@ -62,8 +75,10 @@ module Spec = struct let run c d = match c with | Push i -> Res (result unit exn, protect (fun d -> Mpsc_queue.push d i) d) - | Pop -> Res (result int exn, protect Mpsc_queue.pop d) - | Peek -> Res (result int exn, protect Mpsc_queue.peek d) + | Push_all l -> + Res (result unit exn, protect (fun d -> Mpsc_queue.push_all d l) d) + | Pop_exn -> Res (result int exn, protect Mpsc_queue.pop_exn d) + | Peek_exn -> Res (result int exn, protect Mpsc_queue.peek_exn d) | Push_head i -> Res (result unit exn, protect (fun d -> Mpsc_queue.push_head d i) d) | Is_empty -> Res (result bool exn, protect Mpsc_queue.is_empty d) @@ -71,12 +86,12 @@ module Spec = struct let postcond c ((is_closed, s) : state) res = match (c, res) with - | Push _, Res ((Result (Unit, Exn), _), res) -> + | (Push _ | Push_all _), Res ((Result (Unit, Exn), _), res) -> if is_closed then res = Error Mpsc_queue.Closed else res = Ok () | Push_head _, Res ((Result (Unit, Exn), _), res) -> if is_closed && s = [] then res = Error Mpsc_queue.Closed else res = Ok () - | (Pop | Peek), Res ((Result (Int, Exn), _), res) -> ( + | (Pop_exn | Peek_exn), Res ((Result (Int, Exn), _), res) -> ( match s with | [] -> if is_closed then res = Error Mpsc_queue.Closed @@ -98,7 +113,7 @@ let () = and type Spec.sut = Spec.sut) = (* [arb_cmds_par] differs in what each triple component generates: "Consumer domain" cmds can't be [Push] (but can be [Pop], [Is_empty], [Close] or [Push_head]), - "producer domain" cmds can't be [Push_head] or [Pop] (but can be [Push], [Is_empty] or [Close]). *) + "producer domain" cmds can only by [Push]). *) let arb_cmds_par = Dom.arb_triple 20 12 Spec.arb_cmd Spec.arb_cmd Spec.producer_cmd in @@ -111,7 +126,7 @@ let () = in [ agree_test_par_asym ~count ~name:(name ^ " parallel"); - Dom.neg_agree_test_par ~count ~name:(name ^ " parallel, negative"); + (* Dom.neg_agree_test_par ~count ~name:(name ^ " parallel, negative"); *) ] in Stm_run.run ~name:"Saturn.Mpsc_queue" ~make_domain (module Spec) |> exit