-
Notifications
You must be signed in to change notification settings - Fork 31
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
1 changed file
with
129 additions
and
60 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,106 +1,175 @@ | ||
module Atomic = Dscheck.TracedAtomic | ||
module Queue = Mpsc_queue | ||
|
||
let drain queue = | ||
let remaining = ref 0 in | ||
while not (Mpsc_queue.is_empty queue) do | ||
remaining := !remaining + 1; | ||
assert (Option.is_some (Mpsc_queue.pop_opt queue)) | ||
done; | ||
!remaining | ||
|
||
let producer_consumer () = | ||
let rec pop_until_empty acc = | ||
match Queue.pop_opt queue with | ||
| None -> acc |> List.rev | ||
| Some v -> pop_until_empty (v :: acc) | ||
in | ||
pop_until_empty [] | ||
|
||
let push_pop () = | ||
Atomic.trace (fun () -> | ||
let queue = Mpsc_queue.create () in | ||
let queue = Queue.create () in | ||
let items_total = 4 in | ||
|
||
(* producer *) | ||
Atomic.spawn (fun () -> | ||
for _ = 1 to items_total - 1 do | ||
Mpsc_queue.push queue 0 | ||
for i = 1 to items_total do | ||
Queue.push queue i | ||
done); | ||
|
||
(* consumer *) | ||
let popped = ref 0 in | ||
let popped = ref [] in | ||
Atomic.spawn (fun () -> | ||
Mpsc_queue.push_head queue 1; | ||
for _ = 1 to items_total do | ||
match Mpsc_queue.pop_opt queue with | ||
| None -> () | ||
| Some _ -> popped := !popped + 1 | ||
begin | ||
match Queue.pop_opt queue with | ||
| None -> () | ||
| Some v -> popped := v :: !popped | ||
end; | ||
(* Ensure is_empty does not interfere with other functions *) | ||
Queue.is_empty queue |> ignore | ||
done); | ||
|
||
(* checks*) | ||
Atomic.final (fun () -> | ||
Atomic.check (fun () -> | ||
let remaining = drain queue in | ||
!popped + remaining = items_total))) | ||
let pushed = List.init items_total (fun x -> x + 1) in | ||
List.sort Int.compare (!popped @ remaining) = pushed))) | ||
|
||
let producer_consumer_peek () = | ||
let is_empty () = | ||
Atomic.trace (fun () -> | ||
let queue = Mpsc_queue.create () in | ||
let items_total = 1 in | ||
let pushed = List.init items_total (fun i -> i) in | ||
let queue = Queue.create () in | ||
|
||
(* producer *) | ||
Atomic.spawn (fun () -> Queue.push queue 1); | ||
|
||
(* consumer *) | ||
let res = ref false in | ||
Atomic.spawn (fun () -> | ||
List.iter (fun elt -> Mpsc_queue.push queue elt) pushed); | ||
match Queue.pop_opt queue with | ||
| None -> res := true | ||
| Some _ -> res := Queue.is_empty queue); | ||
|
||
(* checks*) | ||
Atomic.final (fun () -> Atomic.check (fun () -> !res))) | ||
|
||
let push_drop () = | ||
Atomic.trace (fun () -> | ||
let queue = Queue.create () in | ||
let items_total = 4 in | ||
|
||
(* producer *) | ||
Atomic.spawn (fun () -> | ||
for i = 1 to items_total do | ||
Queue.push queue i | ||
done); | ||
|
||
(* consumer *) | ||
let popped = ref [] in | ||
let peeked = ref [] in | ||
let dropped = ref 0 in | ||
Atomic.spawn (fun () -> | ||
for _ = 1 to items_total do | ||
peeked := Mpsc_queue.peek_opt queue :: !peeked; | ||
popped := Mpsc_queue.pop_opt queue :: !popped | ||
match Queue.drop_exn queue with | ||
| () -> dropped := !dropped + 1 | ||
| exception Queue.Empty -> () | ||
done); | ||
|
||
(* checks*) | ||
Atomic.final (fun () -> | ||
Atomic.check (fun () -> | ||
let rec check pushed peeked popped = | ||
match (pushed, peeked, popped) with | ||
| _, [], [] -> true | ||
| _, None :: peeked, None :: popped -> | ||
check pushed peeked popped | ||
| push :: pushed, None :: peeked, Some pop :: popped | ||
when push = pop -> | ||
check pushed peeked popped | ||
| push :: pushed, Some peek :: peeked, Some pop :: popped | ||
when push = peek && push = pop -> | ||
check pushed peeked popped | ||
| _, _, _ -> false | ||
in | ||
check pushed (List.rev !peeked) (List.rev !popped)); | ||
Atomic.check (fun () -> | ||
let remaining = drain queue in | ||
let popped = List.filter Option.is_some !popped in | ||
List.length popped + remaining = items_total))) | ||
remaining | ||
= List.init (items_total - !dropped) (fun x -> x + !dropped + 1)))) | ||
|
||
let two_producers () = | ||
let push_push () = | ||
Atomic.trace (fun () -> | ||
let queue = Mpsc_queue.create () in | ||
let items_total = 4 in | ||
let queue = Queue.create () in | ||
let items_total = 6 in | ||
|
||
(* producers *) | ||
for _ = 1 to 2 do | ||
(* two producers *) | ||
for i = 0 to 1 do | ||
Atomic.spawn (fun () -> | ||
for _ = 1 to items_total / 2 do | ||
Mpsc_queue.push queue 0 | ||
for j = 1 to items_total / 2 do | ||
(* even nums belong to thr 1, odd nums to thr 2 *) | ||
Queue.push queue (i + (j * 2)) | ||
done) | ||
done; | ||
|
||
(* checks*) | ||
Atomic.final (fun () -> | ||
let items = drain queue in | ||
|
||
(* got the same number of items out as in *) | ||
Atomic.check (fun () -> items_total = List.length items); | ||
|
||
(* they are in fifo order *) | ||
let odd, even = List.partition (fun v -> v mod 2 == 0) items in | ||
|
||
Atomic.check (fun () -> List.sort Int.compare odd = odd); | ||
Atomic.check (fun () -> List.sort Int.compare even = even))) | ||
|
||
let two_producers_one_consumer () = | ||
Atomic.trace (fun () -> | ||
let ninit_push = 3 in | ||
let queue = Queue.of_list (List.init ninit_push (fun i -> i + 1)) in | ||
let nproducers = 3 in | ||
let ntotal = (2 * nproducers) + ninit_push in | ||
|
||
(* producer 1 *) | ||
Atomic.spawn (fun () -> | ||
for i = 1 to nproducers do | ||
Queue.push queue (i + ninit_push) | ||
done); | ||
|
||
(* producer 2 *) | ||
Atomic.spawn (fun () -> | ||
for i = 1 to nproducers do | ||
Queue.push queue (i + ninit_push + nproducers) | ||
done); | ||
|
||
(* consumer *) | ||
let popped = ref [] in | ||
Atomic.spawn (fun () -> | ||
for _ = 1 to 5 do | ||
match Queue.pop_opt queue with | ||
| None -> () | ||
| Some v -> popped := v :: !popped | ||
done); | ||
|
||
(* checks *) | ||
Atomic.final (fun () -> | ||
Atomic.check (fun () -> | ||
let remaining = drain queue in | ||
remaining = items_total))) | ||
let pushed = List.init ntotal (fun i -> i + 1) in | ||
List.sort Int.compare (!popped @ remaining) = pushed); | ||
|
||
Atomic.check (fun () -> | ||
let pushed_1 = | ||
List.filter (fun x -> x <= ninit_push + nproducers) !popped | ||
in | ||
let pushed_2 = | ||
List.filter (fun x -> x > ninit_push + nproducers) !popped | ||
in | ||
|
||
List.sort Int.compare pushed_1 = List.rev pushed_1 | ||
&& List.sort Int.compare pushed_2 = List.rev pushed_2))) | ||
|
||
let tests = | ||
let open Alcotest in | ||
[ | ||
( "basic", | ||
[ | ||
test_case "1-producer-1-consumer" `Slow push_pop; | ||
test_case "2-domains-is_empty" `Slow is_empty; | ||
test_case "1-push-1-drop" `Slow push_drop; | ||
test_case "2-producers" `Slow push_push; | ||
test_case "2-producers-1-consumer" `Slow two_producers_one_consumer; | ||
] ); | ||
] | ||
|
||
let () = | ||
let open Alcotest in | ||
run "mpsc_queue_dscheck" | ||
[ | ||
( "basic", | ||
[ | ||
test_case "1-producer-1-consumer" `Slow producer_consumer; | ||
test_case "1-producer-1-consumer-peek" `Slow producer_consumer_peek; | ||
test_case "2-producers" `Slow two_producers; | ||
] ); | ||
] | ||
run "dscheck_bounded_queue" tests |