diff --git a/bench/bench_spsc_queue.ml b/bench/bench_spsc_queue.ml index 6d7a823c..f82ddcf2 100644 --- a/bench/bench_spsc_queue.ml +++ b/bench/bench_spsc_queue.ml @@ -16,7 +16,7 @@ let run () = start_time) in for _ = 1 to item_count do - while Option.is_none (Spsc_queue.pop queue) do + while Option.is_none (Spsc_queue.pop_opt queue) do () done done; diff --git a/src_lockfree/michael_scott_queue.ml b/src_lockfree/michael_scott_queue.ml index 867153ce..b1a20d74 100644 --- a/src_lockfree/michael_scott_queue.ml +++ b/src_lockfree/michael_scott_queue.ml @@ -31,7 +31,9 @@ let create () = let is_empty { head; _ } = Atomic.get (Atomic.get head) == Nil -let pop { head; _ } = +exception Empty + +let pop_opt { head; _ } = let b = Backoff.create () in let rec loop () = let old_head = Atomic.get head in @@ -45,6 +47,27 @@ let pop { head; _ } = in loop () +let pop { head; _ } = + let b = Backoff.create () in + let rec loop () = + let old_head = Atomic.get head in + match Atomic.get old_head with + | Nil -> raise Empty + | Next (value, next) when Atomic.compare_and_set head old_head next -> value + | _ -> + Backoff.once b; + loop () + in + loop () + +let peek_opt { head; _ } = + let old_head = Atomic.get head in + match Atomic.get old_head with Nil -> None | Next (value, _) -> Some value + +let peek { head; _ } = + let old_head = Atomic.get head in + match Atomic.get old_head with Nil -> raise Empty | Next (value, _) -> value + let rec fix_tail tail new_tail = let old_tail = Atomic.get tail in if @@ -66,24 +89,6 @@ let push { tail; _ } value = if not (Atomic.compare_and_set tail old_tail new_tail) then fix_tail tail new_tail -let clean_until { head; _ } f = - let b = Backoff.create () in - let rec loop () = - let old_head = Atomic.get head in - match Atomic.get old_head with - | Nil -> () - | Next (value, next) -> - if not (f value) then - if Atomic.compare_and_set head old_head next then ( - Backoff.reset b; - loop ()) - else ( - Backoff.once b; - loop ()) - else () - in - loop () - type 'a cursor = 'a node let snapshot { head; _ } = Atomic.get (Atomic.get head) diff --git a/src_lockfree/michael_scott_queue.mli b/src_lockfree/michael_scott_queue.mli index 88090256..f40458a8 100644 --- a/src_lockfree/michael_scott_queue.mli +++ b/src_lockfree/michael_scott_queue.mli @@ -37,14 +37,26 @@ val is_empty : 'a t -> bool val push : 'a t -> 'a -> unit (** [push q v] adds the element [v] at the end of the queue [q]. *) -val pop : 'a t -> 'a option -(** [pop q] removes and returns the first element in queue [q], or +exception Empty +(** Raised when {!pop} or {!peek} is applied to an empty queue. *) + +val pop : 'a t -> 'a +(** [pop q] removes and returns the first element in queue [q]. + + @raise Empty if [q] is empty. *) + +val pop_opt : 'a t -> 'a option +(** [pop_opt q] removes and returns the first element in queue [q], or returns [None] if the queue is empty. *) -val clean_until : 'a t -> ('a -> bool) -> unit -(** [clean_until q f] drops the prefix of the queue until the element [e], - where [f e] is [true]. If no such element exists, then the queue is - emptied. *) +val peek : 'a t -> 'a +(** [peek q] returns the first element in queue [q]. + + @raise Empty if [q] is empty. *) + +val peek_opt : 'a t -> 'a option +(** [peek_opt q] returns the first element in queue [q], or + returns [None] if the queue is empty. *) type 'a cursor (** The type of cursor. *) diff --git a/src_lockfree/mpsc_queue.ml b/src_lockfree/mpsc_queue.ml index 49621fa9..f0cc4b7a 100644 --- a/src_lockfree/mpsc_queue.ml +++ b/src_lockfree/mpsc_queue.ml @@ -102,7 +102,7 @@ let rec close (t : 'a t) = (* Retry *) close t) -let pop t = +let pop_opt t = let p = t.head in (* [p] is the previously-popped item. *) let node = Atomic.get p.next in @@ -115,6 +115,33 @@ let pop t = (* So it can be GC'd *) Some v) +exception Empty + +let pop t = + let p = t.head in + (* [p] is the previously-popped item. *) + let node = Atomic.get p.next in + Node.fold node + ~none:(fun () -> raise Empty) + ~some:(fun node -> + t.head <- node; + let v = node.value in + node.value <- Obj.magic (); + (* So it can be GC'd *) + v) + +let peek_opt t = + let p = t.head in + (* [p] is the previously-popped item. *) + let node = Atomic.get p.next in + Node.fold node ~none:(fun () -> None) ~some:(fun node -> Some node.value) + +let peek t = + let p = t.head in + (* [p] is the previously-popped item. *) + let node = Atomic.get p.next in + Node.fold node ~none:(fun () -> raise Empty) ~some:(fun node -> node.value) + let is_empty t = Node.fold (Atomic.get t.head.next) ~none:(fun () -> true) diff --git a/src_lockfree/mpsc_queue.mli b/src_lockfree/mpsc_queue.mli index bc7b3b32..216f4033 100644 --- a/src_lockfree/mpsc_queue.mli +++ b/src_lockfree/mpsc_queue.mli @@ -12,33 +12,58 @@ exception Closed val create : unit -> 'a t (** [create ()] returns a new empty queue. *) +val is_empty : 'a t -> bool +(** [is_empty q] is [true] if calling [pop] would return [None]. + + @raise Closed if [q] is closed and empty. *) + +val close : 'a t -> unit +(** [close q] marks [q] as closed, preventing any further items from + being pushed by the producers (i.e. with {!push}). + + @raise Closed if [q] has already been closed. *) + val push : 'a t -> 'a -> unit -(** [push q v] adds the element [v] at the end of the queue [q]. This +(** [push q v] adds the element [v] at the end of the queue [q]. This can be used safely by multiple producer domains, in parallel with the other operations. @raise Closed if [q] is closed. *) -val pop : 'a t -> 'a option -(** [pop q] removes and returns the first element in queue [q] or +(** {2 Consumer-only functions} *) + +exception Empty +(** Raised when {!pop} or {!peek} is applied to an empty queue. *) + +val pop : 'a t -> 'a +(** [pop q] removes and returns the first element in queue [q]. + + @raise Empty if [q] is empty. + + @raise Closed if [q] is closed and empty. *) + +val pop_opt : 'a t -> 'a option +(** [pop_opt q] removes and returns the first element in queue [q] or returns [None] if the queue is empty. @raise Closed if [q] is closed and empty. *) -val push_head : 'a t -> 'a -> unit -(** [push_head q v] adds the element [v] at the head of the queue - [q]. This can only be used by the consumer (if run in parallel - with {!pop}, the item might be skipped). +val peek : 'a t -> 'a +(** [peek q] returns the first element in queue [q]. + + @raise Empty if [q] is empty. @raise Closed if [q] is closed and empty. *) -val is_empty : 'a t -> bool -(** [is_empty q] is [true] if calling [pop] would return [None]. +val peek_opt : 'a t -> 'a option +(** [peek_opt q] returns the first element in queue [q] or + returns [None] if the queue is empty. @raise Closed if [q] is closed and empty. *) -val close : 'a t -> unit -(** [close q] marks [q] as closed, preventing any further items from - being pushed by the producers (i.e. with {!push}). +val push_head : 'a t -> 'a -> unit +(** [push_head q v] adds the element [v] at the head of the queue + [q]. This can only be used by the consumer (if run in parallel + with {!pop}, the item might be skipped). - @raise Closed if [q] has already been closed. *) + @raise Closed if [q] is closed and empty. *) diff --git a/src_lockfree/spsc_queue.ml b/src_lockfree/spsc_queue.ml index 7562294f..d660adb8 100644 --- a/src_lockfree/spsc_queue.ml +++ b/src_lockfree/spsc_queue.ml @@ -48,7 +48,31 @@ let push { array; head; tail; mask; _ } element = Array.set array (tail_val land mask) (Some element); Atomic.set tail (tail_val + 1)) +let try_push { array; head; tail; mask; _ } element = + let size = mask + 1 in + let head_val = Atomic.get head in + let tail_val = Atomic.get tail in + if head_val + size == tail_val then false + else ( + Array.set array (tail_val land mask) (Some element); + Atomic.set tail (tail_val + 1); + true) + +exception Empty + let pop { array; head; tail; mask; _ } = + let head_val = Atomic.get head in + let tail_val = Atomic.get tail in + if head_val == tail_val then raise Empty + else + let index = head_val land mask in + let v = Array.get array index in + (* allow gc to collect it *) + Array.set array index None; + Atomic.set head (head_val + 1); + match v with None -> assert false | Some v -> v + +let pop_opt { array; head; tail; mask; _ } = let head_val = Atomic.get head in let tail_val = Atomic.get tail in if head_val == tail_val then None @@ -61,4 +85,21 @@ let pop { array; head; tail; mask; _ } = assert (Option.is_some v); v +let peek_opt { array; head; tail; mask; _ } = + let head_val = Atomic.get head in + let tail_val = Atomic.get tail in + if head_val == tail_val then None + else + let v = Array.get array @@ (head_val land mask) in + assert (Option.is_some v); + v + +let peek { array; head; tail; mask; _ } = + let head_val = Atomic.get head in + let tail_val = Atomic.get tail in + if head_val == tail_val then raise Empty + else + let v = Array.get array @@ (head_val land mask) in + match v with None -> assert false | Some v -> v + let size { head; tail; _ } = Atomic.get tail - Atomic.get head diff --git a/src_lockfree/spsc_queue.mli b/src_lockfree/spsc_queue.mli index dc69b3a4..62c1a38c 100644 --- a/src_lockfree/spsc_queue.mli +++ b/src_lockfree/spsc_queue.mli @@ -4,26 +4,59 @@ type 'a t (** Type of single-producer single-consumer non-resizable domain-safe queue that works in FIFO order. *) -exception Full -(** Raised when {!push} is applied to a full queue. *) - val create : size_exponent:int -> 'a t (** [create ~size_exponent:int] returns a new queue of maximum size [2^size_exponent] and initially empty. *) +val size : 'a t -> int +(** [size] returns the size of the queue. This method linearizes only when called + from either consumer or producer domain. Otherwise, it is safe to call but + provides only an *indication* of the size of the structure. *) + +(** {1 Producer functions} *) + +exception Full +(** Raised when {!push} is applied to a full queue. *) + val push : 'a t -> 'a -> unit (** [push q v] adds the element [v] at the end of the queue [q]. This method can be used by at most one domain at the time. - @raise [Full] if the queue is full. + @raise Full if the queue is full. +*) + +val try_push : 'a t -> 'a -> bool +(** [try_push q v] tries to add the element [v] at the end of the + queue [q]. It fails it the queue [q] is full. This method can be + used by at most one domain at the time. +*) + +(** {2 Consumer functions} *) + +exception Empty +(** Raised when {!pop} or {!peek} is applied to an empty queue. *) + +val pop : 'a t -> 'a +(** [pop q] removes and returns the first element in queue [q]. This + method can be used by at most one domain at the time. + + @raise Empty if [q] is empty. *) -val pop : 'a t -> 'a option -(** [pop q] removes and returns the first element in queue [q], or +val pop_opt : 'a t -> 'a option +(** [pop_opt q] removes and returns the first element in queue [q], or returns [None] if the queue is empty. This method can be used by at most one domain at the time. *) -val size : 'a t -> int -(** [size] returns the size of the queue. This method linearizes only when called - from either consumer or producer domain. Otherwise, it is safe to call but - provides only an *indication* of the size of the structure. *) +val peek : 'a t -> 'a +(** [peek q] returns the first element in queue [q]. This method can be + used by at most one domain at the time. + + @raise Empty if [q] is empty. +*) + +val peek_opt : 'a t -> 'a option +(** [peek_opt q] returns the first element in queue [q], or [None] + if the queue is empty. This method can be used by at most one + domain at the time. +*) diff --git a/src_lockfree/treiber_stack.ml b/src_lockfree/treiber_stack.ml index 690e73c6..a940643f 100644 --- a/src_lockfree/treiber_stack.ml +++ b/src_lockfree/treiber_stack.ml @@ -28,7 +28,31 @@ let push q v = in loop b +exception Empty + let pop q = + let rec loop b = + let s = Atomic.get q.head in + match s with + | Nil -> raise Empty + | Next (v, next) -> + if Atomic.compare_and_set q.head s next then v + else ( + Backoff.once b; + loop b) + in + + let s = Atomic.get q.head in + match s with + | Nil -> raise Empty + | Next (v, next) -> + if Atomic.compare_and_set q.head s next then v + else + let b = Backoff.create () in + Backoff.once b; + loop b + +let pop_opt q = let rec loop b = let s = Atomic.get q.head in match s with diff --git a/src_lockfree/treiber_stack.mli b/src_lockfree/treiber_stack.mli index 6dc3644c..f1e1f04b 100644 --- a/src_lockfree/treiber_stack.mli +++ b/src_lockfree/treiber_stack.mli @@ -15,6 +15,17 @@ val is_empty : 'a t -> bool val push : 'a t -> 'a -> unit (** [push s v] adds the element [v] at the top of stack [s]. *) -val pop : 'a t -> 'a option -(** [pop a] removes and returns the topmost element in the - stack [s], or returns [None] if the stack is empty. *) +exception Empty +(** Raised when {!pop} or {!peek} is applied to an empty queue. *) + +val pop : 'a t -> 'a +(** [pop s] removes and returns the topmost element in the + stack [s]. + + @raise Empty if [a] is empty. +*) + +val pop_opt : 'a t -> 'a option +(** [pop_opt s] removes and returns the topmost element in the + stack [s], or returns [None] if the stack is empty. +*) diff --git a/src_lockfree/ws_deque.ml b/src_lockfree/ws_deque.ml index 6ca249a4..5ebbd02b 100644 --- a/src_lockfree/ws_deque.ml +++ b/src_lockfree/ws_deque.ml @@ -33,7 +33,9 @@ module type S = sig val create : unit -> 'a t val push : 'a t -> 'a -> unit val pop : 'a t -> 'a + val pop_opt : 'a t -> 'a option val steal : 'a t -> 'a + val steal_opt : 'a t -> 'a option end module CArray = struct @@ -169,6 +171,8 @@ module M : S = struct set_next_shrink q); release out) + let pop_opt q = try Some (pop q) with Exit -> None + let rec steal q = let t = Atomic.get q.top in let b = Atomic.get q.bottom in @@ -181,4 +185,6 @@ module M : S = struct else ( Domain.cpu_relax (); steal q) + + let steal_opt q = try Some (steal q) with Exit -> None end diff --git a/src_lockfree/ws_deque.mli b/src_lockfree/ws_deque.mli index ff1f2e09..d24a2184 100644 --- a/src_lockfree/ws_deque.mli +++ b/src_lockfree/ws_deque.mli @@ -34,6 +34,10 @@ module type S = sig @raise [Exit] if the queue is empty. *) + val pop_opt : 'a t -> 'a option + (** [pop_opt q] removes and returns the first element in queue [q], or + returns [None] if the queue is empty. *) + (** {1 Stealers function} *) val steal : 'a t -> 'a @@ -42,7 +46,12 @@ module type S = sig queue [q]. @raise [Exit] if the queue is empty. - *) + *) + + val steal_opt : 'a t -> 'a option + (** [steal_opt q] removes and returns the last element from queue + [q], or returns [None] if the queue is empty. It should only be + invoked by domain which doesn't own the queue [q]. *) end module M : S diff --git a/test/michael_scott_queue/michael_scott_queue_dscheck.ml b/test/michael_scott_queue/michael_scott_queue_dscheck.ml index 889305f3..15949111 100644 --- a/test/michael_scott_queue/michael_scott_queue_dscheck.ml +++ b/test/michael_scott_queue/michael_scott_queue_dscheck.ml @@ -2,7 +2,7 @@ let drain queue = let remaining = ref 0 in while not (Michael_scott_queue.is_empty queue) do remaining := !remaining + 1; - assert (Option.is_some (Michael_scott_queue.pop queue)) + assert (Option.is_some (Michael_scott_queue.pop_opt queue)) done; !remaining @@ -21,7 +21,7 @@ let producer_consumer () = let popped = ref 0 in Atomic.spawn (fun () -> for _ = 1 to items_total do - match Michael_scott_queue.pop queue with + match Michael_scott_queue.pop_opt queue with | None -> () | Some v -> assert (v == !popped + 1); @@ -34,6 +34,47 @@ let producer_consumer () = let remaining = drain queue in !popped + remaining = items_total))) +let producer_consumer_peek () = + Atomic.trace (fun () -> + let queue = Michael_scott_queue.create () in + let items_total = 1 in + let pushed = List.init items_total (fun i -> i) in + + (* producer *) + Atomic.spawn (fun () -> + List.iter (fun elt -> Michael_scott_queue.push queue elt) pushed); + + (* consumer *) + let popped = ref [] in + let peeked = ref [] in + Atomic.spawn (fun () -> + for _ = 1 to items_total do + peeked := Michael_scott_queue.peek_opt queue :: !peeked; + popped := Michael_scott_queue.pop_opt queue :: !popped + done); + + (* checks*) + Atomic.final (fun () -> + Atomic.check (fun () -> + let rec check pushed peeked popped = + match (pushed, peeked, popped) with + | _, [], [] -> true + | _, None :: peeked, None :: popped -> + check pushed peeked popped + | push :: pushed, None :: peeked, Some pop :: popped + when push = pop -> + check pushed peeked popped + | push :: pushed, Some peek :: peeked, Some pop :: popped + when push = peek && push = pop -> + check pushed peeked popped + | _, _, _ -> false + in + check pushed (List.rev !peeked) (List.rev !popped)); + Atomic.check (fun () -> + let remaining = drain queue in + let popped = List.filter Option.is_some !popped in + List.length popped + remaining = items_total))) + let two_producers () = Atomic.trace (fun () -> let queue = Michael_scott_queue.create () in @@ -72,7 +113,8 @@ let two_domains () = (fun elt -> (* even nums belong to thr 1, odd nums to thr 2 *) Michael_scott_queue.push stack elt; - lpop := Option.get (Michael_scott_queue.pop stack) :: !lpop) + lpop := + Option.get (Michael_scott_queue.pop_opt stack) :: !lpop) lpush) |> ignore) lists; @@ -102,6 +144,7 @@ let () = ( "basic", [ test_case "1-producer-1-consumer" `Slow producer_consumer; + test_case "1-producer-1-consumer-peek" `Slow producer_consumer_peek; test_case "2-producers" `Slow two_producers; test_case "2-domains" `Slow two_domains; ] ); diff --git a/test/michael_scott_queue/qcheck_michael_scott_queue.ml b/test/michael_scott_queue/qcheck_michael_scott_queue.ml index 4b1add80..404348ff 100644 --- a/test/michael_scott_queue/qcheck_michael_scott_queue.ml +++ b/test/michael_scott_queue/qcheck_michael_scott_queue.ml @@ -13,7 +13,7 @@ let tests_sequential = (* Testing property *) not (is_empty queue)); (* TEST 2 - push, pop until empty *) - Test.make ~name:"push_pop_until_empty" (list int) (fun lpush -> + Test.make ~name:"push_pop_opt_until_empty" (list int) (fun lpush -> (* Building a random queue *) let queue = create () in List.iter (push queue) lpush; @@ -22,12 +22,12 @@ let tests_sequential = let count = ref 0 in while not (is_empty queue) do incr count; - ignore (pop queue) + ignore (pop_opt queue) done; (* Testing property *) - pop queue = None && !count = List.length lpush); - (* TEST 3 - push, pop, check FIFO *) + pop_opt queue = None && !count = List.length lpush); + (* TEST 3 - push, pop_opt, check FIFO *) Test.make ~name:"fifo" (list int) (fun lpush -> (* Building a random queue *) let queue = create () in @@ -37,33 +37,59 @@ let tests_sequential = let insert v = out := v :: !out in for _ = 1 to List.length lpush do - match pop queue with None -> assert false | Some v -> insert v + match pop_opt queue with None -> assert false | Some v -> insert v done; (* Testing property *) lpush = List.rev !out); + (* TEST 3 - push, pop_opt, peek_opt check FIFO *) + Test.make ~name:"fifo_peek_opt" (list int) (fun lpush -> + (* Building a random queue *) + let queue = create () in + List.iter (push queue) lpush; + + let pop = ref [] in + let peek = ref [] in + let insert out v = out := v :: !out in + + for _ = 1 to List.length lpush do + match peek_opt queue with + | None -> assert false + | Some v -> ( + insert peek v; + match pop_opt queue with + | None -> assert false + | Some v -> insert pop v) + done; + + (* Testing property *) + lpush = List.rev !pop && lpush = List.rev !peek); ] let tests_one_consumer_one_producer = QCheck. [ (* TEST 1 - one consumer one producer: - Parallel [push] and [pop]. *) - Test.make ~count:10_000 ~name:"parallel_fifo" (list int) (fun lpush -> + Parallel [push] and [pop_opt]. *) + Test.make ~name:"parallel_fifo" (list int) (fun lpush -> (* Initialization *) let queue = create () in + let barrier = Barrier.create 2 in (* Producer pushes. *) let producer = - Domain.spawn (fun () -> List.iter (push queue) lpush) + Domain.spawn (fun () -> + Barrier.await barrier; + List.iter (push queue) lpush) in + Barrier.await barrier; let fifo = List.fold_left (fun acc item -> let popped = ref None in while Option.is_none !popped do - popped := pop queue + popped := pop_opt queue done; acc && item = Option.get !popped) true lpush @@ -73,16 +99,53 @@ let tests_one_consumer_one_producer = (* Ensure nothing is left behind. *) Domain.join producer; fifo && empty); + (* TEST 2 - one consumer one producer: + Parallel [push] and [peek_opt] and [pop_opt]. *) + Test.make ~name:"parallel_peek" (list int) (fun pushed -> + (* Initialization *) + let npush = List.length pushed in + let queue = create () in + let barrier = Barrier.create 2 in + + (* Producer pushes. *) + let producer = + Domain.spawn (fun () -> + Barrier.await barrier; + List.iter (push queue) pushed) + in + + let peeked = ref [] in + let popped = ref [] in + Barrier.await barrier; + for _ = 1 to npush do + peeked := peek_opt queue :: !peeked; + popped := pop_opt queue :: !popped + done; + + Domain.join producer; + let rec check = function + | _, [], [] -> true + | pushed, None :: peeked, None :: popped -> + check (pushed, peeked, popped) + | push :: pushed, None :: peeked, Some pop :: popped when push = pop + -> + check (pushed, peeked, popped) + | push :: pushed, Some peek :: peeked, Some pop :: popped + when push = peek && push = pop -> + check (pushed, peeked, popped) + | _, _, _ -> false + in + check (pushed, List.rev @@ !peeked, List.rev @@ !popped)); ] let tests_two_domains = QCheck. [ - (* TEST 1 - two domains doing multiple times one push then one pop. - Parallel [push] and [pop]. + (* TEST 1 - two domains doing multiple times one push then one pop_opt. + Parallel [push] and [pop_opt]. *) - Test.make ~count:10_000 ~name:"parallel_pop_push" - (pair small_nat small_nat) (fun (npush1, npush2) -> + Test.make ~name:"parallel_pop_opt_push" (pair small_nat small_nat) + (fun (npush1, npush2) -> (* Initialization *) let queue = create () in let barrier = Barrier.create 2 in @@ -97,7 +160,7 @@ let tests_two_domains = (fun elt -> push queue elt; Domain.cpu_relax (); - pop queue) + pop_opt queue) lpush in @@ -134,13 +197,13 @@ let tests_two_domains = all_elt_in && is_sorted push1_pop1 && is_sorted push1_pop2 && is_sorted push2_pop1 && is_sorted push2_pop2); (* TEST 2 - - Parallel [push] and [pop] with two domains + Parallel [push] and [pop_opt] with two domains Two domains randomly pushs and pops in parallel. They stop as soon as they have finished pushing a list of element to push. *) - Test.make ~count:10_000 ~name:"parallel_pop_push_random" - (pair small_nat small_nat) (fun (npush1, npush2) -> + Test.make ~name:"parallel_pop_opt_push_random" (pair small_nat small_nat) + (fun (npush1, npush2) -> (* Initialization *) let queue = create () in let barrier = Barrier.create 2 in @@ -163,7 +226,7 @@ let tests_two_domains = loop xs popped) else ( incr consecutive_pop; - let p = pop queue in + let p = pop_opt queue in loop lpush (p :: popped)) in loop lpush [] @@ -193,7 +256,7 @@ let tests_two_domains = (* Pop everything that is still on the queue *) let popped3 = let rec loop popped = - match pop queue with + match pop_opt queue with | None -> popped | Some v -> loop (v :: popped) in diff --git a/test/michael_scott_queue/stm_michael_scott_queue.ml b/test/michael_scott_queue/stm_michael_scott_queue.ml index e36a464b..1efd129f 100644 --- a/test/michael_scott_queue/stm_michael_scott_queue.ml +++ b/test/michael_scott_queue/stm_michael_scott_queue.ml @@ -5,12 +5,13 @@ open STM module Ms_queue = Saturn.Queue module MSQConf = struct - type cmd = Push of int | Pop | Is_empty + type cmd = Push of int | Pop | Peek | Is_empty let show_cmd c = match c with | Push i -> "Push " ^ string_of_int i | Pop -> "Pop" + | Peek -> "Peek" | Is_empty -> "Is_empty" type state = int list @@ -23,6 +24,7 @@ module MSQConf = struct [ Gen.map (fun i -> Push i) int_gen; Gen.return Pop; + Gen.return Peek; Gen.return Is_empty; ]) @@ -34,21 +36,24 @@ module MSQConf = struct match c with | Push i -> i :: s | Pop -> ( match List.rev s with [] -> s | _ :: s' -> List.rev s') - | Is_empty -> s + | Peek | Is_empty -> s let precond _ _ = true let run c d = match c with | Push i -> Res (unit, Ms_queue.push d i) - | Pop -> Res (option int, Ms_queue.pop d) + | Pop -> Res (result int exn, protect Ms_queue.pop d) + | Peek -> Res (result int exn, protect Ms_queue.peek d) | Is_empty -> Res (bool, Ms_queue.is_empty d) let postcond c (s : state) res = match (c, res) with | Push _, Res ((Unit, _), _) -> true - | Pop, Res ((Option Int, _), res) -> ( - match List.rev s with [] -> res = None | j :: _ -> res = Some j) + | (Pop | Peek), Res ((Result (Int, Exn), _), res) -> ( + match List.rev s with + | [] -> res = Error Ms_queue.Empty + | j :: _ -> res = Ok j) | Is_empty, Res ((Bool, _), res) -> res = (s = []) | _, _ -> false end diff --git a/test/mpsc_queue/mpsc_queue_dscheck.ml b/test/mpsc_queue/mpsc_queue_dscheck.ml index d46f45b9..8edf203d 100644 --- a/test/mpsc_queue/mpsc_queue_dscheck.ml +++ b/test/mpsc_queue/mpsc_queue_dscheck.ml @@ -2,7 +2,7 @@ let drain queue = let remaining = ref 0 in while not (Mpsc_queue.is_empty queue) do remaining := !remaining + 1; - assert (Option.is_some (Mpsc_queue.pop queue)) + assert (Option.is_some (Mpsc_queue.pop_opt queue)) done; !remaining @@ -22,7 +22,7 @@ let producer_consumer () = Atomic.spawn (fun () -> Mpsc_queue.push_head queue 1; for _ = 1 to items_total do - match Mpsc_queue.pop queue with + match Mpsc_queue.pop_opt queue with | None -> () | Some _ -> popped := !popped + 1 done); @@ -33,6 +33,47 @@ let producer_consumer () = let remaining = drain queue in !popped + remaining = items_total))) +let producer_consumer_peek () = + Atomic.trace (fun () -> + let queue = Mpsc_queue.create () in + let items_total = 1 in + let pushed = List.init items_total (fun i -> i) in + + (* producer *) + Atomic.spawn (fun () -> + List.iter (fun elt -> Mpsc_queue.push queue elt) pushed); + + (* consumer *) + let popped = ref [] in + let peeked = ref [] in + Atomic.spawn (fun () -> + for _ = 1 to items_total do + peeked := Mpsc_queue.peek_opt queue :: !peeked; + popped := Mpsc_queue.pop_opt queue :: !popped + done); + + (* checks*) + Atomic.final (fun () -> + Atomic.check (fun () -> + let rec check pushed peeked popped = + match (pushed, peeked, popped) with + | _, [], [] -> true + | _, None :: peeked, None :: popped -> + check pushed peeked popped + | push :: pushed, None :: peeked, Some pop :: popped + when push = pop -> + check pushed peeked popped + | push :: pushed, Some peek :: peeked, Some pop :: popped + when push = peek && push = pop -> + check pushed peeked popped + | _, _, _ -> false + in + check pushed (List.rev !peeked) (List.rev !popped)); + Atomic.check (fun () -> + let remaining = drain queue in + let popped = List.filter Option.is_some !popped in + List.length popped + remaining = items_total))) + let two_producers () = Atomic.trace (fun () -> let queue = Mpsc_queue.create () in @@ -59,6 +100,7 @@ let () = ( "basic", [ test_case "1-producer-1-consumer" `Slow producer_consumer; + test_case "1-producer-1-consumer-peek" `Slow producer_consumer_peek; test_case "2-producers" `Slow two_producers; ] ); ] diff --git a/test/mpsc_queue/qcheck_mpsc_queue.ml b/test/mpsc_queue/qcheck_mpsc_queue.ml index be2af596..95a9df41 100644 --- a/test/mpsc_queue/qcheck_mpsc_queue.ml +++ b/test/mpsc_queue/qcheck_mpsc_queue.ml @@ -7,6 +7,7 @@ module Mpsc_queue = Saturn.Single_consumer_queue - [close] *) (* Consumer can use the functions - [pop], + - [push], - [push_head], - [is_empty], - [close] *) @@ -17,7 +18,7 @@ let extract_n q n close = | m -> if m = n - close then Mpsc_queue.close q; let res = - match Mpsc_queue.pop q with + match Mpsc_queue.pop_opt q with | Some elt -> `Some elt | None -> `None | exception Mpsc_queue.Closed -> `Closed @@ -28,10 +29,34 @@ let extract_n q n close = if n < 0 then failwith "Number of pop should be positive."; loop [] n |> List.rev +let extract_n_with_peek q n close = + let rec loop peeked popped = function + | 0 -> (peeked, popped) + | m -> + if m = n - close then Mpsc_queue.close q; + let peek = + match Mpsc_queue.peek_opt q with + | Some elt -> `Some elt + | None -> `None + | exception Mpsc_queue.Closed -> `Closed + in + let pop = + match Mpsc_queue.pop_opt q with + | Some elt -> `Some elt + | None -> `None + | exception Mpsc_queue.Closed -> `Closed + in + Domain.cpu_relax (); + loop (peek :: peeked) (pop :: popped) (m - 1) + in + if n < 0 then failwith "Number of pop should be positive."; + let peeked, popped = loop [] [] n in + (List.rev peeked, List.rev popped) + let popped_until_empty_and_closed q = let rec loop acc = try - let popped = Mpsc_queue.pop q in + let popped = Mpsc_queue.pop_opt q in Domain.cpu_relax (); loop (popped :: acc) with Mpsc_queue.Closed -> acc @@ -47,7 +72,19 @@ let tests_one_consumer = QCheck. [ (* TEST 1 - single consumer no producer: - forall q and n, pop (push queue i; queue) = Some i*) + forall q and n, pop_opt (push_head q i; q) = Some i*) + Test.make ~name:"push_head_pop_opt" + (pair (list int) int) + (fun (lpush, i) -> + (* Building a random queue *) + let queue = Mpsc_queue.create () in + List.iter (fun elt -> Mpsc_queue.push_head queue elt) (List.rev lpush); + + (* Testing property *) + Mpsc_queue.push_head queue i; + Mpsc_queue.pop_opt queue = Some i); + (* TEST 1b - single consumer no producer: + forall q and n, pop (push_head q i; q) = i*) Test.make ~name:"push_head_pop" (pair (list int) int) (fun (lpush, i) -> @@ -57,9 +94,48 @@ let tests_one_consumer = (* Testing property *) Mpsc_queue.push_head queue i; - Mpsc_queue.pop queue = Some i); + try Mpsc_queue.pop queue = i with Mpsc_queue.Empty -> false); + (* TEST 1c - single consumer no producer: + forall q and n, peek_opt (push_head q i; q) = Some i*) + Test.make ~name:"push_head_peek_opt" + (pair (list int) int) + (fun (lpush, i) -> + (* Building a random queue *) + let queue = Mpsc_queue.create () in + List.iter (fun elt -> Mpsc_queue.push_head queue elt) (List.rev lpush); + + (* Testing property *) + Mpsc_queue.push_head queue i; + Mpsc_queue.peek_opt queue = Some i); + (* TEST 1d - single consumer no producer: + forall q and n, peek (push_head q i; q) = Some i*) + Test.make ~name:"push_head_peek" + (pair (list int) int) + (fun (lpush, i) -> + (* Building a random queue *) + let queue = Mpsc_queue.create () in + List.iter (fun elt -> Mpsc_queue.push_head queue elt) (List.rev lpush); + + (* Testing property *) + Mpsc_queue.push_head queue i; + try Mpsc_queue.peek queue = i with Mpsc_queue.Empty -> false); (* TEST 2 - single consumer no producer: - forall q, if is_empty q then pop queue = None *) + forall q, if is_empty q then pop_opt queue = None *) + Test.make ~name:"pop_opt_empty" (list int) (fun lpush -> + (* Building a random queue *) + let queue = Mpsc_queue.create () in + List.iter (fun elt -> Mpsc_queue.push_head queue elt) (List.rev lpush); + (* Popping until [is_empty q] is true*) + let count = ref 0 in + while not (Mpsc_queue.is_empty queue) do + incr count; + ignore (Mpsc_queue.pop_opt queue) + done; + + (* Testing property *) + Mpsc_queue.pop_opt queue = None && !count = List.length lpush); + (* TEST 2b - single consumer no producer: + forall q, if is_empty q then pop queue raises Empty *) Test.make ~name:"pop_empty" (list int) (fun lpush -> (* Building a random queue *) let queue = Mpsc_queue.create () in @@ -68,11 +144,49 @@ let tests_one_consumer = let count = ref 0 in while not (Mpsc_queue.is_empty queue) do incr count; - ignore (Mpsc_queue.pop queue) + ignore (Mpsc_queue.pop_opt queue) done; (* Testing property *) - Mpsc_queue.pop queue = None && !count = List.length lpush); + (try + ignore (Mpsc_queue.pop queue); + false + with Mpsc_queue.Empty -> true) + && !count = List.length lpush); + (* TEST 2 - single consumer no producer: + forall q, if is_empty q then peek_opt queue = None *) + Test.make ~name:"peek_opt_empty" (list int) (fun lpush -> + (* Building a random queue *) + let queue = Mpsc_queue.create () in + List.iter (fun elt -> Mpsc_queue.push_head queue elt) (List.rev lpush); + (* Popping until [is_empty q] is true*) + let count = ref 0 in + while not (Mpsc_queue.is_empty queue) do + incr count; + ignore (Mpsc_queue.pop_opt queue) + done; + + (* Testing property *) + Mpsc_queue.peek_opt queue = None && !count = List.length lpush); + (* TEST 2b - single consumer no producer: + forall q, if is_empty q then peek queue raises Empty *) + Test.make ~name:"peek_empty" (list int) (fun lpush -> + (* Building a random queue *) + let queue = Mpsc_queue.create () in + List.iter (fun elt -> Mpsc_queue.push_head queue elt) (List.rev lpush); + (* Popping until [is_empty q] is true*) + let count = ref 0 in + while not (Mpsc_queue.is_empty queue) do + incr count; + ignore (Mpsc_queue.pop_opt queue) + done; + + (* Testing property *) + (try + ignore (Mpsc_queue.peek queue); + false + with Mpsc_queue.Empty -> true) + && !count = List.length lpush); (* TEST 3 - single consumer no producer: forall q and i, push_head q i; is_empty q = false*) Test.make ~name:"push_head_not_empty" (list int) (fun lpush -> @@ -131,7 +245,7 @@ let tests_one_consumer = with Mpsc_queue.Closed -> false); (* TEST 6 - single consumer no producer: forall q and i, [close q; pop q] raises Closed <=> q is empty *) - Test.make ~name:"close_pop" (list int) (fun lpush -> + Test.make ~name:"close_pop_opt" (list int) (fun lpush -> (* Building a random queue *) let queue = Mpsc_queue.create () in List.iter (fun elt -> Mpsc_queue.push_head queue elt) (List.rev lpush); @@ -142,21 +256,21 @@ let tests_one_consumer = (* Testing property *) if is_empty then try - ignore (Mpsc_queue.pop queue); + ignore (Mpsc_queue.pop_opt queue); false with Mpsc_queue.Closed -> true else - try Mpsc_queue.pop queue = Some (List.hd lpush) + try Mpsc_queue.pop_opt queue = Some (List.hd lpush) with Mpsc_queue.Closed -> false); (* TEST 7 - single consumer no producer: More complex test. Maybe redondant with tests 1 to 6. - Sequentially does n [push_head] then m [pops], [close] and may call [pop] again. + Sequentially does n [push_head] then m [pop_opt], [close] and may call [pop] again. Checks : - - that closing the queue does not prevent [pop] - - [pop] order (it's a LIFO) - - [pop] on a [close]d and empty queue raises [Closed] + - that closing the queue does not prevent [pop_opt] + - [pop_opt] order (it's a LIFO) + - [pop_opt] on a [close]d and empty queue raises [Closed] *) - Test.make ~name:"pop_order" + Test.make ~name:"pop_opt_order" (pair (list int) (pair small_nat small_nat)) (fun (lpush, (npop, when_close)) -> (* Initialisation*) @@ -166,7 +280,7 @@ let tests_one_consumer = (* Sequential [push_head] *) List.iter (fun elt -> Mpsc_queue.push_head queue elt) (List.rev lpush); - (* Call [pop] [npop] times and [close] after [when_close] pops. *) + (* Call [pop_opt] [npop] times and [close] after [when_close] pops. *) let popped = extract_n queue npop when_close in let expected = @@ -187,11 +301,11 @@ let tests_one_consumer = expected = popped); (* TEST 8 - single consumer no producer: More complex test. Maybe redondant with tests 1 to 6. - Sequentially does n [push_head], followed by m [pop] and n' more [push_head]. + Sequentially does n [push_head], followed by m [pop_opt] and n' more [push_head]. Checks : - - order of [pop] and [push_head] -> LIFO + - order of [pop_opt] and [push_head] -> LIFO *) - Test.make ~name:"seq_push_pop" + Test.make ~name:"seq_push_pop_opt" (pair small_nat (pair (list int) (list int))) (fun (npop, (lpush1, lpush2)) -> (* Initialisation*) @@ -199,7 +313,7 @@ let tests_one_consumer = (* Sequential [push_head] *) List.iter (fun elt -> Mpsc_queue.push_head queue elt) lpush1; - (* Call [pop] [npop] times without closing. *) + (* Call [pop_opt] [npop] times without closing. *) let popped = extract_n queue npop (npop + 1) in (* Sequential [push_head] *) List.iter (fun elt -> Mpsc_queue.push_head queue elt) lpush2; @@ -264,9 +378,9 @@ let tests_one_consumer_one_producer = QCheck. [ (* TEST 1 - one consumer one producer: - Sequential [push] then several [pop]. - Checks [pop] order. *) - Test.make ~name:"seq_push_pop" + Sequential [push] then several [peek_opt] followed by [pop_opt]. + Checks [peek_opt] and [pop_opt] are in FIFO order. *) + Test.make ~name:"seq_push_pop_opt_peek_opt" (pair (list int) small_nat) (fun (lpush, npop) -> (* Initialization *) @@ -280,16 +394,16 @@ let tests_one_consumer_one_producer = (* Sequential test: we wait for the producer to be finished *) let () = Domain.join producer in - let popped = extract_n queue npop (npop + 1) in + let peeked, popped = extract_n_with_peek queue npop (npop + 1) in (* Testing property *) let expected = (keep_n_first npop lpush |> list_some) @ List.init (Int.max 0 (npop - List.length lpush)) (fun _ -> `None) in - popped = expected); + popped = expected && peeked = expected); (* TEST 2 - one consumer one producer: - Parallel [push] and [pop]. *) + Parallel [push], [pop_opt], [peek_opt]. *) Test.make ~name:"par_push_pop" (pair (list int) small_nat) (fun (lpush, npop) -> @@ -315,12 +429,25 @@ let tests_one_consumer_one_producer = Barrier.await barrier; (* Consumer pops. *) - let popped = extract_n queue npop (npop + 1) in + let peeked, popped = extract_n_with_peek queue npop (npop + 1) in let closed = Domain.join producer in let popped_value = List.filter (function `Some _ -> true | _ -> false) popped in + let rec check pushed peeked popped = + match (pushed, peeked, popped) with + | _, [], [] -> true + | _, `None :: peeked, `None :: popped -> check pushed peeked popped + | push :: pushed, `None :: peeked, `Some pop :: popped + when pop = push -> + check pushed peeked popped + | push :: pushed, `Some peek :: peeked, `Some pop :: popped + when pop = push && pop = peek -> + check pushed peeked popped + | _, _, _ -> false + in + (* Testing property *) (not closed) && List.length popped = npop @@ -329,7 +456,8 @@ let tests_one_consumer_one_producer = && (List.for_all (function | `Some _ | `None -> true | `Closed -> false)) - popped); + popped + && check lpush peeked popped); (* TEST 3 - one consumer one producer: Parallel [push] and [push_head]. *) Test.make ~name:"par_push_push_head" @@ -370,9 +498,9 @@ let tests_one_consumer_one_producer = = list_some (lpush_head |> List.rev) && keep_n_last (List.length lpush) all_pushed = list_some lpush); (* TEST 4 - one consumer one producer - Consumer push then close while consumer pop until the queue + Consumer push then close while consumer pop_opt until the queue is empty and closed. *) - Test.make ~name:"par_pop_push2" (list int) (fun lpush -> + Test.make ~name:"par_pop_opt_push2" (list int) (fun lpush -> (* Initialisation*) let queue = Mpsc_queue.create () in let barrier = Barrier.create 2 in @@ -480,7 +608,7 @@ let tests_one_consumer_two_producers = Checks that closing the queue prevent other producers to push and that popping at the same time works. *) - Test.make ~name:"par_push_close_pop" + Test.make ~name:"par_push_close_pop_opt" (pair (list int) (list int)) (fun (lpush1, lpush2) -> (* Initialization *) @@ -512,7 +640,7 @@ let tests_one_consumer_two_producers = let producer2 = Domain.spawn (fun () -> guard_push lpush2) in (* Waiting to make sure the producers have time to - start. However, as the consumer will [pop] until one of + start. However, as the consumer will [pop_opt] until one of the producer closes the queue, it is not a requirement to wait here. *) Barrier.await barrier; diff --git a/test/mpsc_queue/stm_mpsc_queue.ml b/test/mpsc_queue/stm_mpsc_queue.ml index 0a981991..7ec3018c 100644 --- a/test/mpsc_queue/stm_mpsc_queue.ml +++ b/test/mpsc_queue/stm_mpsc_queue.ml @@ -6,12 +6,13 @@ open Util module Mpsc_queue = Saturn.Single_consumer_queue module MPSCConf = struct - type cmd = Push of int | Pop | Push_head of int | Is_empty | Close + type cmd = Push of int | Pop | Peek | Push_head of int | Is_empty | Close let show_cmd c = match c with | Push i -> "Push " ^ string_of_int i | Pop -> "Pop" + | Peek -> "Peek" | Push_head i -> "Push_head" ^ string_of_int i | Is_empty -> "Is_empty" | Close -> "Close" @@ -35,6 +36,8 @@ module MPSCConf = struct (Gen.oneof [ Gen.return Pop; + Gen.return Peek; + Gen.map (fun i -> Push i) int_gen; Gen.map (fun i -> Push_head i) int_gen; Gen.return Is_empty; Gen.return Close; @@ -51,6 +54,7 @@ module MPSCConf = struct | Push_head i -> (is_closed, if not (is_closed && s = []) then i :: s else s) | Is_empty -> (is_closed, s) | Pop -> (is_closed, match s with [] -> s | _ :: s' -> s') + | Peek -> (is_closed, s) | Close -> (true, s) let precond _ _ = true @@ -58,7 +62,8 @@ module MPSCConf = struct let run c d = match c with | Push i -> Res (result unit exn, protect (fun d -> Mpsc_queue.push d i) d) - | Pop -> Res (result (option int) exn, protect Mpsc_queue.pop d) + | Pop -> Res (result int exn, protect Mpsc_queue.pop d) + | Peek -> Res (result int exn, protect Mpsc_queue.peek d) | Push_head i -> Res (result unit exn, protect (fun d -> Mpsc_queue.push_head d i) d) | Is_empty -> Res (result bool exn, protect Mpsc_queue.is_empty d) @@ -71,11 +76,12 @@ module MPSCConf = struct | Push_head _, Res ((Result (Unit, Exn), _), res) -> if is_closed && s = [] then res = Error Mpsc_queue.Closed else res = Ok () - | Pop, Res ((Result (Option Int, Exn), _), res) -> ( + | (Pop | Peek), Res ((Result (Int, Exn), _), res) -> ( match s with | [] -> - if is_closed then res = Error Mpsc_queue.Closed else res = Ok None - | x :: _ -> res = Ok (Some x)) + if is_closed then res = Error Mpsc_queue.Closed + else res = Error Mpsc_queue.Empty + | x :: _ -> res = Ok x) | Is_empty, Res ((Result (Bool, Exn), _), res) -> if is_closed && s = [] then res = Error Mpsc_queue.Closed else res = Ok (s = []) diff --git a/test/spsc_queue/qcheck_spsc_queue.ml b/test/spsc_queue/qcheck_spsc_queue.ml index 069ca398..c67dcd24 100644 --- a/test/spsc_queue/qcheck_spsc_queue.ml +++ b/test/spsc_queue/qcheck_spsc_queue.ml @@ -3,21 +3,65 @@ module Spsc_queue = Saturn.Single_prod_single_cons_queue let keep_some l = List.filter Option.is_some l |> List.map Option.get let keep_n_first n = List.filteri (fun i _ -> i < n) -let pop_n_times q n = +let pop_opt_n_times q n = let rec loop count acc = if count = 0 then acc else - let v = Spsc_queue.pop q in + let v = Spsc_queue.pop_opt q in Domain.cpu_relax (); loop (count - 1) (v :: acc) in loop n [] |> List.rev +let pop_n_times q n = + let rec loop count acc = + if count = 0 then acc + else + try + let v = Spsc_queue.pop q in + Domain.cpu_relax (); + loop (count - 1) (Some v :: acc) + with Spsc_queue.Empty -> loop (count - 1) (None :: acc) + in + loop n [] |> List.rev + let tests = [ (* TEST 1 - one producer, one consumer: Sequential pushes then pops. Checks that the behaviour is similar to one of a FIFO queue. *) + QCheck.( + Test.make ~name:"seq_pop_opt_push" + (pair (list int) small_nat) + (fun (l, npop) -> + (* Making sure we do not create a too big queue. Other + tests are checking the behaviour of a full queue.*) + let size_exponent = 8 in + let size_max = Int.shift_left 1 size_exponent in + assume (List.length l < size_max); + + (* Initialization *) + let q = Spsc_queue.create ~size_exponent in + + (* Sequential pushed : not Full exception should be + raised. *) + let not_full_queue = + try + List.iter (Spsc_queue.push q) l; + true + with Spsc_queue.Full -> false + in + + (* Consumer domain pops *) + let consumer = Domain.spawn (fun () -> pop_opt_n_times q npop) in + let pops = Domain.join consumer in + + (* Property *) + not_full_queue + && List.length pops = npop + && keep_some pops = keep_n_first (min (List.length l) npop) l)); + (* TEST 1b - one producer, one consumer: + Same than previous with pop instead of pop_opt *) QCheck.( Test.make ~name:"seq_pop_push" (pair (list int) small_nat) @@ -44,6 +88,33 @@ let tests = let consumer = Domain.spawn (fun () -> pop_n_times q npop) in let pops = Domain.join consumer in + (* Property *) + not_full_queue + && List.length pops = npop + && keep_some pops = keep_n_first (min (List.length l) npop) l)); + (* TEST 1b - one producer, one consumer: + Same than TEST1 with try_push instead of push *) + QCheck.( + Test.make ~name:"seq_pop_try_push" + (pair (list int) small_nat) + (fun (l, npop) -> + (* Making sure we do not create a too big queue. Other + tests are checking the behaviour of a full queue.*) + let size_exponent = 8 in + let size_max = Int.shift_left 1 size_exponent in + assume (List.length l < size_max); + + (* Initialization *) + let q = Spsc_queue.create ~size_exponent in + + (* Sequential pushed : [try_push] should always returns + true. *) + let not_full_queue = List.for_all (Spsc_queue.try_push q) l in + + (* Consumer domain pops *) + let consumer = Domain.spawn (fun () -> pop_opt_n_times q npop) in + let pops = Domain.join consumer in + (* Property *) not_full_queue && List.length pops = npop @@ -70,7 +141,7 @@ let tests = let consumer = Domain.spawn (fun () -> Barrier.await barrier; - pop_n_times q npop) + pop_opt_n_times q npop) in let producer = @@ -92,7 +163,7 @@ let tests = List.length popped = npop && popped_val = keep_n_first (List.length popped_val) (l @ l'))); (* TEST 3 - one producer, one consumer: - Checks that pushing too much raise exception Full. *) + Checks that pushing too many elements raise exception Full. *) QCheck.( Test.make ~name:"push_full" (list int) (fun l -> let size_exponent = 4 in @@ -110,6 +181,130 @@ let tests = (* Property *) (List.length l > size_max && is_full) || (List.length l <= size_max && not is_full))); + (* TEST 4 - one producer, one consumer: + Sequential checks that [peek_opt] read the next value. *) + QCheck.( + Test.make ~name:"seq_peek_opt" (list int) (fun l -> + let size_exponent = 10 in + let size_max = Int.shift_left 1 size_exponent in + assume (size_max > List.length l); + + (* Initialisation : pushing l in a new spsc queue. *) + let q = Spsc_queue.create ~size_exponent in + List.iter (Spsc_queue.push q) l; + + (* Test : we consecutively peek and pop and check both + matches with pushed elements. *) + let rec loop pushed = + match (pushed, Spsc_queue.peek_opt q) with + | [], None -> ( + match Spsc_queue.pop_opt q with None -> true | Some _ -> false) + | x :: pushed, Some y when x = y -> ( + match Spsc_queue.pop_opt q with + | None -> false + | Some z when y = z -> loop pushed + | _ -> false) + | _, _ -> false + in + loop l)); + (* TEST 4b - one producer, one consumer: + Same then previous one for [peek] instead of [peek_one]. *) + QCheck.( + Test.make ~name:"seq_peek" (list int) (fun l -> + let size_exponent = 10 in + let size_max = Int.shift_left 1 size_exponent in + assume (size_max > List.length l); + + (* Initialisation : pushing l in a new spsc queue. *) + let q = Spsc_queue.create ~size_exponent in + List.iter (Spsc_queue.push q) l; + + (* Test : we consecutively peek and pop and check both + matches with pushed elements. *) + let rec loop pushed = + let peeked = + try Some (Spsc_queue.peek q) with Spsc_queue.Empty -> None + in + match (pushed, peeked) with + | [], None -> ( + match Spsc_queue.pop_opt q with None -> true | Some _ -> false) + | x :: pushed, Some y when x = y -> ( + match Spsc_queue.pop_opt q with + | None -> false + | Some z when y = z -> loop pushed + | _ -> false) + | _, _ -> false + in + loop l)); + (* TEST 5 - one producer, one consumer: + Parallel test of [peek_opt] with [try_push]. *) + QCheck.( + Test.make ~name:"par_peek_opt" (list int) (fun pushed -> + let size_exponent = 10 in + let size_max = Int.shift_left 1 size_exponent in + let npush = List.length pushed in + assume (size_max > npush); + let barrier = Barrier.create 2 in + + (* Initialisation : pushing l in a new spsc queue. *) + let q = Spsc_queue.create ~size_exponent in + + (* Test : + - domain1 pushes a list of element + - in parallel, domain2 peeks then pops. *) + let domain1 = + Domain.spawn (fun () -> + Barrier.await barrier; + List.iter + (fun elt -> + Domain.cpu_relax (); + Spsc_queue.push q elt) + pushed) + in + + let domain2 = + Domain.spawn (fun () -> + let peeked = ref [] in + let popped = ref [] in + Barrier.await barrier; + for _ = 0 to npush - 1 do + Domain.cpu_relax (); + (* peek then pop *) + let peek = Spsc_queue.peek_opt q in + let pop = Spsc_queue.pop_opt q in + peeked := peek :: !peeked; + popped := pop :: !popped + done; + (!peeked, !popped)) + in + Domain.join domain1; + let peeked, popped = Domain.join domain2 in + let peeked = List.rev peeked in + let popped = List.rev popped in + + let rec check pushed peeked popped = + match (pushed, peeked, popped) with + | _, [], [] -> + (* pushed can not be empty if the consumer + finished before the producer *) + true + | _, None :: peeked, None :: popped -> + (* consumer tries to peek then pop when the queue was empty *) + check pushed peeked popped + | push :: pushed, Some peek :: peeked, Some pop :: popped + when push = pop && push = peek -> + (* consumer peeks and pops on an non-empty queue. The + peeked and the popped element must be the same. *) + check pushed peeked popped + | push :: pushed, None :: peeked, Some pop :: popped when push = pop + -> + (* consumer peeks when the queue was empty, then + producer pushes at least once and then consumer + pops. *) + check pushed peeked popped + | _, _, _ -> false + in + check pushed peeked popped)); ] let main () = diff --git a/test/spsc_queue/spsc_queue_dscheck.ml b/test/spsc_queue/spsc_queue_dscheck.ml index 85135f76..2d7af042 100644 --- a/test/spsc_queue/spsc_queue_dscheck.ml +++ b/test/spsc_queue/spsc_queue_dscheck.ml @@ -5,7 +5,7 @@ let create_test ~shift_by () = (* shift the queue, that helps testing overlap handling *) for _ = 1 to shift_by do Spsc_queue.push queue (-1); - assert (Option.is_some (Spsc_queue.pop queue)) + assert (Option.is_some (Spsc_queue.pop_opt queue)) done; (* enqueuer *) @@ -18,10 +18,12 @@ let create_test ~shift_by () = let dequeued = ref 0 in Atomic.spawn (fun () -> for _ = 1 to items_count + 1 do - match Spsc_queue.pop queue with - | None -> () - | Some v -> + let peeked = Spsc_queue.peek_opt queue in + match Spsc_queue.pop_opt queue with + | None -> assert (peeked = None) + | Some v as popped -> assert (v = !dequeued + 1); + assert (popped = peeked || peeked = None); dequeued := v done); @@ -44,7 +46,7 @@ let size_linearizes_with_1_thr () = let size = ref 0 in Atomic.spawn (fun () -> - assert (Option.is_some (Spsc_queue.pop queue)); + assert (Option.is_some (Spsc_queue.pop_opt queue)); size := Spsc_queue.size queue); Atomic.final (fun () -> Atomic.check (fun () -> 1 <= !size && !size <= 5))) diff --git a/test/spsc_queue/stm_spsc_queue.ml b/test/spsc_queue/stm_spsc_queue.ml index d581872d..43f7f429 100644 --- a/test/spsc_queue/stm_spsc_queue.ml +++ b/test/spsc_queue/stm_spsc_queue.ml @@ -6,10 +6,13 @@ open Util module Spsc_queue = Saturn.Single_prod_single_cons_queue module SPSCConf = struct - type cmd = Push of int | Pop + type cmd = Push of int | Pop | Peek let show_cmd c = - match c with Push i -> "Push " ^ string_of_int i | Pop -> "Pop" + match c with + | Push i -> "Push " ^ string_of_int i + | Pop -> "Pop" + | Peek -> "Peek" type state = int * int list type sut = int Spsc_queue.t @@ -18,12 +21,14 @@ module SPSCConf = struct let int_gen = Gen.nat in QCheck.make ~print:show_cmd (Gen.map (fun i -> Push i) int_gen) - let consumer_cmd _s = QCheck.make ~print:show_cmd (Gen.return Pop) + let consumer_cmd _s = + QCheck.make ~print:show_cmd (Gen.oneof [ Gen.return Pop; Gen.return Peek ]) let arb_cmd _s = let int_gen = Gen.nat in QCheck.make ~print:show_cmd - (Gen.oneof [ Gen.return Pop; Gen.map (fun i -> Push i) int_gen ]) + (Gen.oneof + [ Gen.return Pop; Gen.return Peek; Gen.map (fun i -> Push i) int_gen ]) let size_exponent = 4 let max_size = Int.shift_left 1 size_exponent @@ -36,13 +41,15 @@ module SPSCConf = struct | Push i -> if n = max_size then (n, s) else (n + 1, i :: s) | Pop -> ( match List.rev s with [] -> (0, s) | _ :: s' -> (n - 1, List.rev s')) + | Peek -> (n, s) let precond _ _ = true let run c d = match c with | Push i -> Res (result unit exn, protect (fun d -> Spsc_queue.push d i) d) - | Pop -> Res (result (option int) exn, protect Spsc_queue.pop d) + | Pop -> Res (result int exn, protect Spsc_queue.pop d) + | Peek -> Res (result int exn, protect Spsc_queue.peek d) let postcond c ((n, s) : state) res = match (c, res) with @@ -51,10 +58,10 @@ module SPSCConf = struct | Error Spsc_queue.Full -> n = max_size | Ok () -> n < max_size | _ -> false) - | Pop, Res ((Result (Option Int, Exn), _), res) -> ( + | (Pop | Peek), Res ((Result (Int, Exn), _), res) -> ( match (res, List.rev s) with - | Ok None, [] -> true - | Ok (Some j), x :: _ -> x = j + | Error Spsc_queue.Empty, [] -> true + | Ok popped, x :: _ -> x = popped | _ -> false) | _, _ -> false end diff --git a/test/spsc_queue/test_spsc_queue.ml b/test/spsc_queue/test_spsc_queue.ml index e2c9d481..1e65a8ee 100644 --- a/test/spsc_queue/test_spsc_queue.ml +++ b/test/spsc_queue/test_spsc_queue.ml @@ -3,7 +3,7 @@ module Spsc_queue = Saturn.Single_prod_single_cons_queue let test_empty () = let q = Spsc_queue.create ~size_exponent:3 in - assert (Option.is_none (Spsc_queue.pop q)); + assert (Option.is_none (Spsc_queue.pop_opt q)); assert (Spsc_queue.size q == 0); print_string "test_spsc_queue_empty: ok\n" @@ -39,13 +39,13 @@ let test_parallel () = (* consumer *) let last_num = ref 0 in while !last_num < count do - match Spsc_queue.pop q with + match Spsc_queue.pop_opt q with | None -> () | Some v -> assert (v == !last_num + 1); last_num := v done; - assert (Option.is_none (Spsc_queue.pop q)); + assert (Option.is_none (Spsc_queue.pop_opt q)); assert (Spsc_queue.size q == 0); Domain.join producer; Printf.printf "test_spsc_queue_parallel: ok (transferred = %d)\n" !last_num diff --git a/test/treiber_stack/qcheck_treiber_stack.ml b/test/treiber_stack/qcheck_treiber_stack.ml index 4497d1fe..c1e78df1 100644 --- a/test/treiber_stack/qcheck_treiber_stack.ml +++ b/test/treiber_stack/qcheck_treiber_stack.ml @@ -22,12 +22,12 @@ let tests_sequential = let count = ref 0 in while not (is_empty stack) do incr count; - ignore (pop stack) + ignore (pop_opt stack) done; (* Testing property *) - pop stack = None && !count = List.length lpush); - (* TEST 3 - push, pop, check LIFO *) + pop_opt stack = None && !count = List.length lpush); + (* TEST 3 - push, pop_opt, check LIFO *) Test.make ~name:"lifo" (list int) (fun lpush -> (* Building a random stack *) let stack = create () in @@ -37,15 +37,9 @@ let tests_sequential = let insert v = out := v :: !out in for _ = 1 to List.length lpush do - match pop stack with None -> assert false | Some v -> insert v + match pop_opt stack with None -> assert false | Some v -> insert v done; - Printf.printf "------\n"; - Printf.printf "lpush: %s\n" - (List.map Int.to_string lpush |> String.concat ","); - Printf.printf "out: %s\n" - (List.map Int.to_string !out |> String.concat ","); - (* Testing property *) lpush = !out); ] @@ -54,8 +48,8 @@ let tests_one_consumer_one_producer = QCheck. [ (* TEST 1 - one consumer one producer: - Parallel [push] and [pop]. *) - Test.make ~count:10_000 ~name:"parallel" (list int) (fun lpush -> + Parallel [push] and [pop_opt]. *) + Test.make ~name:"parallel" (list int) (fun lpush -> (* Initialization *) let stack = create () in let barrier = Barrier.create 2 in @@ -69,7 +63,7 @@ let tests_one_consumer_one_producer = Barrier.await barrier; for _ = 1 to List.length lpush do - while Option.is_none (pop stack) do + while Option.is_none (pop_opt stack) do () done done; @@ -85,8 +79,8 @@ let tests_two_domains = (* TEST 1 - two domains doing multiple times one push then one pop. Parallel [push] and [pop]. *) - Test.make ~count:10_000 ~name:"parallel_pop_push" - (pair small_nat small_nat) (fun (npush1, npush2) -> + Test.make ~name:"parallel_pop_push" (pair small_nat small_nat) + (fun (npush1, npush2) -> (* Initialization *) let stack = create () in let barrier = Barrier.create 2 in @@ -99,7 +93,7 @@ let tests_two_domains = (fun elt -> push stack elt; Domain.cpu_relax (); - pop stack) + pop_opt stack) lpush in @@ -132,8 +126,8 @@ let tests_two_domains = Two domains randomly pushs and pops in parallel. They stop as soon as they have finished pushing a list of element to push. *) - Test.make ~count:10_000 ~name:"parallel_pop_push_random" - (pair small_nat small_nat) (fun (npush1, npush2) -> + Test.make ~name:"parallel_pop_push_random" (pair small_nat small_nat) + (fun (npush1, npush2) -> (* Initialization *) let stack = create () in @@ -156,7 +150,7 @@ let tests_two_domains = loop xs popped) else ( incr consecutive_pop; - let p = pop stack in + let p = pop_opt stack in loop lpush (p :: popped)) in loop lpush [] @@ -186,7 +180,7 @@ let tests_two_domains = (* Pop everything that is still on the queue *) let popped3 = let rec loop popped = - match pop stack with + match pop_opt stack with | None -> popped | Some v -> loop (v :: popped) in diff --git a/test/treiber_stack/stm_treiber_stack.ml b/test/treiber_stack/stm_treiber_stack.ml index b02bdcef..8c3386fb 100644 --- a/test/treiber_stack/stm_treiber_stack.ml +++ b/test/treiber_stack/stm_treiber_stack.ml @@ -41,14 +41,16 @@ module TSConf = struct let run c d = match c with | Push i -> Res (unit, Treiber_stack.push d i) - | Pop -> Res (option int, Treiber_stack.pop d) + | Pop -> Res (result int exn, protect Treiber_stack.pop d) | Is_empty -> Res (bool, Treiber_stack.is_empty d) let postcond c (s : state) res = match (c, res) with | Push _, Res ((Unit, _), _) -> true - | Pop, Res ((Option Int, _), res) -> ( - match s with [] -> res = None | j :: _ -> res = Some j) + | Pop, Res ((Result (Int, Exn), _), res) -> ( + match s with + | [] -> res = Error Treiber_stack.Empty + | j :: _ -> res = Ok j) | Is_empty, Res ((Bool, _), res) -> res = (s = []) | _, _ -> false end diff --git a/test/treiber_stack/treiber_stack_dscheck.ml b/test/treiber_stack/treiber_stack_dscheck.ml index 87d270c0..48b7065f 100644 --- a/test/treiber_stack/treiber_stack_dscheck.ml +++ b/test/treiber_stack/treiber_stack_dscheck.ml @@ -2,7 +2,7 @@ let drain stack = let remaining = ref 0 in while not (Treiber_stack.is_empty stack) do remaining := !remaining + 1; - assert (Option.is_some (Treiber_stack.pop stack)) + assert (Option.is_some (Treiber_stack.pop_opt stack)) done; !remaining @@ -21,7 +21,7 @@ let producer_consumer () = let popped = ref 0 in Atomic.spawn (fun () -> for _ = 1 to items_total do - match Treiber_stack.pop stack with + match Treiber_stack.pop_opt stack with | None -> () | Some _ -> popped := !popped + 1 done); @@ -51,7 +51,7 @@ let two_producers () = let rec get_items s = if Treiber_stack.is_empty s then [] else - let item = Option.get (Treiber_stack.pop s) in + let item = Option.get (Treiber_stack.pop_opt s) in item :: get_items s in let items = get_items stack in @@ -81,7 +81,7 @@ let two_consumers () = Atomic.spawn (fun () -> for _ = 1 to items_total / 2 do (* even nums belong to thr 1, odd nums to thr 2 *) - list := Option.get (Treiber_stack.pop stack) :: !list + list := Option.get (Treiber_stack.pop_opt stack) :: !list done) |> ignore) lists; @@ -117,7 +117,7 @@ let two_domains () = (fun elt -> (* even nums belong to thr 1, odd nums to thr 2 *) Treiber_stack.push stack elt; - lpop := Option.get (Treiber_stack.pop stack) :: !lpop) + lpop := Option.get (Treiber_stack.pop_opt stack) :: !lpop) lpush) |> ignore) lists; @@ -154,8 +154,8 @@ let two_domains_more_pop () = List.iter (fun elt -> Treiber_stack.push stack elt; - lpop := Treiber_stack.pop stack :: !lpop; - lpop := Treiber_stack.pop stack :: !lpop) + lpop := Treiber_stack.pop_opt stack :: !lpop; + lpop := Treiber_stack.pop_opt stack :: !lpop) lpush) |> ignore) lists;