Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bounded queue #160

Merged
merged 19 commits into from
Nov 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions bench/bench_bounded_queue.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
open Multicore_bench

module type BENCH = sig
val run_suite : budgetf:float -> Metric.t list
end

module Make (Bounded_queue : Bounded_queue_intf.BOUNDED_QUEUE) : BENCH = struct
let run_one_domain ~budgetf ?(n_msgs = 50 * Util.iter_factor) () =
let t = Bounded_queue.create () in

let op push =
if push then Bounded_queue.try_push t 101 |> ignore
else Bounded_queue.pop_opt t |> ignore
in

let init _ =
assert (Bounded_queue.is_empty t);
Util.generate_push_and_pop_sequence n_msgs
in
let work _ bits = Util.Bits.iter op bits in

Times.record ~budgetf ~n_domains:1 ~init ~work ()
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message"
~config:"one domain"

let run_one ~budgetf ?(n_adders = 2) ?(n_takers = 2)
?(n_msgs = 50 * Util.iter_factor) () =
let n_domains = n_adders + n_takers in

let t = Bounded_queue.create () in

let n_msgs_to_take = Atomic.make 0 |> Multicore_magic.copy_as_padded in
let n_msgs_to_add = Atomic.make 0 |> Multicore_magic.copy_as_padded in

let init _ =
assert (Bounded_queue.is_empty t);
Atomic.set n_msgs_to_take n_msgs;
Atomic.set n_msgs_to_add n_msgs
in
let work i () =
if i < n_adders then
let rec work () =
let n = Util.alloc n_msgs_to_add in
if 0 < n then begin
for i = 1 to n do
Bounded_queue.try_push t i |> ignore
done;
work ()
end
in
work ()
else
let rec work () =
let n = Util.alloc n_msgs_to_take in
if n <> 0 then
let rec loop n =
if 0 < n then begin
match Bounded_queue.pop_opt t with
| None ->
Domain.cpu_relax ();
loop n
| Some _ -> loop (n - 1)
end
else work ()
in
loop n
in
work ()
in

let config =
let format role n =
Printf.sprintf "%d %s%s" n role (if n = 1 then "" else "s")
in
Printf.sprintf "%s, %s"
(format "nb adder" n_adders)
(format "nb taker" n_takers)
in

Times.record ~budgetf ~n_domains ~init ~work ()
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config

let run_suite ~budgetf =
run_one_domain ~budgetf ()
@ (Util.cross [ 1; 2 ] [ 1; 2 ]
|> List.concat_map @@ fun (n_adders, n_takers) ->
run_one ~budgetf ~n_adders ~n_takers ())
end

module Safe = Make (Saturn.Bounded_queue)
module Unsafe = Make (Saturn.Bounded_queue_unsafe)
5 changes: 5 additions & 0 deletions bench/dune
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ let () =
(copy ../src/michael_scott_queue/michael_scott_queue_intf.ml michael_scott_queue_intf.ml))
(package saturn))

(rule
(action
(copy ../src/bounded_queue/bounded_queue_intf.mli bounded_queue_intf.ml))
(package saturn))

(test
(package saturn)
(name main)
Expand Down
2 changes: 2 additions & 0 deletions bench/main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ let benchmarks =
[
("Saturn Queue", Bench_queue.Safe.run_suite);
("Saturn Queue_unsafe", Bench_queue.Unsafe.run_suite);
("Saturn Bounded_Queue", Bench_bounded_queue.Safe.run_suite);
("Saturn Bounded_Queue_unsafe", Bench_bounded_queue.Unsafe.run_suite);
("Saturn Single_prod_single_cons_queue", Bench_spsc_queue.run_suite);
("Saturn Size", Bench_size.run_suite);
("Saturn Skiplist", Bench_skiplist.run_suite);
Expand Down
190 changes: 190 additions & 0 deletions src/bounded_queue/bounded_queue.body.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
(* Copyright (c) 2023-2024, Vesa Karvonen <[email protected]>
Copyright (c) 2024, Carine Morel <[email protected]>

Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.

THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR
OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
PERFORMANCE OF THIS SOFTWARE. *)

let[@inline] get_capacity (Node r : (_, [< `Node ]) node) = r.capacity

let[@inline] set_capacity (Node r : (_, [< `Node ]) node) value =
r.capacity <- value

let[@inline] get_counter (Node r : (_, [< `Node ]) node) = r.counter

let[@inline] set_counter (Node r : (_, [< `Node ]) node) value =
r.counter <- value

type 'a t = {
head : ('a, [ `Node ]) node Atomic.t;
capacity : int;
tail : ('a, [ `Node ]) node Atomic.t;
}

exception Full
exception Empty

let create ?(capacity = Int.max_int) () =
let value = Obj.magic () in
let node = make_node ~value ~capacity ~counter:0 Null in
let head = Atomic.make_contended node and tail = Atomic.make_contended node in
{ head; capacity; tail }

let of_list_exn ?(capacity = Int.max_int) list : 'a t =
let len = List.length list in
if len > capacity then raise Full
else
match list |> List.rev with
| [] -> create ~capacity ()
| hd :: tl ->
let tail =
make_node ~value:hd ~capacity:(capacity - len - 1) ~counter:len Null
in
let _, _, next =
List.fold_left
(fun (counter, capacity, next) value ->
( counter - 1,
capacity + 1,
make_node ~value ~capacity ~counter next ))
(len - 1, capacity - len, tail)
tl
in
let head =
Atomic.make_contended
(make_node ~value:(Obj.magic ()) ~capacity ~counter:0 next)
in

{ head; capacity; tail = Atomic.make tail }

let capacity_of t = t.capacity

let is_empty t =
let head = Atomic.get t.head in
fenceless_get_next head == Link Null

let is_full t =
let tail = Atomic.get t.tail in
let capacity = get_capacity tail in
if capacity = 0 then begin
let old_head = Atomic.get t.head in
let length = get_counter tail - get_counter old_head in
let capacity = t.capacity - length in
if 0 < capacity then begin
set_capacity tail capacity;
false
end
else true
end
else false

let rec snapshot t =
let head = Atomic.get t.head in
let tail = fenceless_get t.tail in
match fenceless_get_next tail with
| Link (Node _ as node) ->
Atomic.compare_and_set t.tail tail node |> ignore;
snapshot t
| Link Null -> if Atomic.get t.head != head then snapshot t else (head, tail)

let length t =
let head, tail = snapshot t in
get_counter tail - get_counter head

(* *)

type ('a, _) poly = Option : ('a, 'a option) poly | Value : ('a, 'a) poly

let rec peek_as : type a r. a t -> (a, r) poly -> r =
fun t poly ->
let old_head = Atomic.get t.head in
match fenceless_get_next old_head with
| Link Null -> ( match poly with Value -> raise Empty | Option -> None)
| Link (Node r) -> (
let value = r.value in
if Atomic.get t.head != old_head then peek_as t poly
else match poly with Value -> value | Option -> Some value)

(* *)

type ('a, _) poly2 =
| Option : ('a, 'a option) poly2
| Value : ('a, 'a) poly2
| Unit : ('a, unit) poly2

let rec pop_as : type a r. a t -> Backoff.t -> (a, r) poly2 -> r =
fun t backoff poly ->
let old_head = Atomic.get t.head in
match fenceless_get_next old_head with
| Link Null -> (
match poly with Option -> None | Value | Unit -> raise Empty)
| Link (Node node as new_head) ->
if Atomic.compare_and_set t.head old_head new_head then begin
let value = node.value in
node.value <- Obj.magic ();
match poly with Option -> Some value | Value -> value | Unit -> ()
end
else pop_as t (Backoff.once backoff) poly

(* *)

let rec fix_tail tail new_tail =
let old_tail = Atomic.get tail in
if
get_next new_tail == Link Null
&& not (Atomic.compare_and_set tail old_tail new_tail)
then fix_tail tail new_tail

(* *)

type _ mono = Bool : bool mono | Unit : unit mono

let rec push_as :
type r. 'a t -> ('a, [ `Node ]) node -> ('a, [ `Node ]) node -> r mono -> r
=
fun t new_node old_tail mono ->
let capacity = get_capacity old_tail in
if capacity = 0 then begin
let old_head = Atomic.get t.head in
let length = get_counter old_tail - get_counter old_head in
let capacity = t.capacity - length in
if 0 < capacity then begin
set_capacity old_tail capacity;
push_as t new_node old_tail mono
end
else match mono with Unit -> raise Full | Bool -> false
end
else begin
set_capacity new_node (capacity - 1);
set_counter new_node (get_counter old_tail + 1);
if not (compare_and_set_next old_tail (Link Null) (Link new_node)) then
push_as t new_node (link_as_node (get_next old_tail)) mono
else begin
if not (Atomic.compare_and_set t.tail old_tail new_node) then
fix_tail t.tail new_node;
match mono with Unit -> () | Bool -> true
end
end

(* *)

let[@inline] peek_opt t = peek_as t Option
let[@inline] peek_exn t = peek_as t Value
let[@inline] pop_opt t = pop_as t Backoff.default Option
let[@inline] pop_exn t = pop_as t Backoff.default Value
let[@inline] drop_exn t = pop_as t Backoff.default Unit

let[@inline] try_push t value =
let new_node = make_node ~value ~capacity:0 ~counter:0 Null in
push_as t new_node (Atomic.get t.tail) Bool

let[@inline] push_exn t value =
let new_node = make_node ~value ~capacity:0 ~counter:0 Null in
push_as t new_node (Atomic.get t.tail) Unit
28 changes: 28 additions & 0 deletions src/bounded_queue/bounded_queue.head_safe.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
type ('a, _) node =
| Null : ('a, [> `Null ]) node
| Node : {
next : 'a link Atomic.t;
mutable value : 'a;
mutable capacity : int;
mutable counter : int;
}
-> ('a, [> `Node ]) node

and 'a link = Link : ('a, [< `Null | `Node ]) node -> 'a link [@@unboxed]

let[@inline] make_node ~value ~capacity ~counter next =
Node { next = Atomic.make (Link next); value; capacity; counter }

let[@inline] link_as_node (Link n) : (_, [< `Node ]) node =
match n with Null -> assert false | Node _ as node -> node

let[@inline] get_next (Node node : (_, [< `Node ]) node) = Atomic.get node.next

let[@inline] fenceless_get_next (Node node : (_, [< `Node ]) node) =
Atomic.get node.next

let[@inline] compare_and_set_next (Node node : (_, [< `Node ]) node) before
after =
Atomic.compare_and_set node.next before after

let fenceless_get = Atomic.get
31 changes: 31 additions & 0 deletions src/bounded_queue/bounded_queue.head_unsafe.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
module Atomic = Multicore_magic.Transparent_atomic

type ('a, _) node =
| Null : ('a, [> `Null ]) node
| Node : {
mutable _next : 'a link;
mutable value : 'a;
mutable capacity : int;
mutable counter : int;
}
-> ('a, [> `Node ]) node

and 'a link = Link : ('a, [< `Null | `Node ]) node -> 'a link [@@unboxed]

let[@inline] make_node ~value ~capacity ~counter next =
Node { _next = Link next; value; capacity; counter }

external link_as_node : 'a link -> ('a, [ `Node ]) node = "%identity"

external next_as_atomic : ('a, [< `Node ]) node -> 'a link Atomic.t
= "%identity"

let[@inline] get_next node = Atomic.get (next_as_atomic node)

let[@inline] fenceless_get_next node =
Atomic.fenceless_get (next_as_atomic node)

let[@inline] compare_and_set_next node before after =
Atomic.compare_and_set (next_as_atomic node) before after

let fenceless_get = Atomic.fenceless_get
1 change: 1 addition & 0 deletions src/bounded_queue/bounded_queue.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include Bounded_queue_intf.BOUNDED_QUEUE
Loading
Loading