Skip to content

Commit

Permalink
Merge pull request #647 from talex5/new-lfqueue
Browse files Browse the repository at this point in the history
Faster and simpler Lf_queue
  • Loading branch information
talex5 authored Nov 19, 2023
2 parents bdc4282 + 5ab8302 commit 8a71a90
Showing 1 changed file with 69 additions and 131 deletions.
200 changes: 69 additions & 131 deletions lib_eio/utils/lf_queue.ml
Original file line number Diff line number Diff line change
@@ -1,142 +1,80 @@
(* A lock-free multi-producer, single-consumer, thread-safe queue without support for cancellation.
This makes a good data structure for a scheduler's run queue.
See: "Implementing lock-free queues"
https://people.cs.pitt.edu/~jacklange/teaching/cs2510-f12/papers/implementing_lock_free.pdf
It is simplified slightly because we don't need multiple consumers.
Therefore [head] is not atomic. *)
Based on Vesa Karvonen's examaple at:
https://github.com/ocaml-multicore/picos/blob/07d6c2d391e076b490098c0379d01208b3a9cc96/test/lib/foundation/mpsc_queue.ml
*)

exception Closed

module Node : sig
type 'a t = {
next : 'a opt Atomic.t;
mutable value : 'a;
}
and +'a opt

val make : next:'a opt -> 'a -> 'a t

val none : 'a opt
(** [t.next = none] means that [t] is currently the last node. *)

val closed : 'a opt
(** [t.next = closed] means that [t] will always be the last node. *)

val some : 'a t -> 'a opt
val fold : 'a opt -> none:(unit -> 'b) -> some:('a t -> 'b) -> 'b
end = struct
(* https://github.com/ocaml/RFCs/pull/14 should remove the need for magic here *)

type +'a opt (* special | 'a t *)

type 'a t = {
next : 'a opt Atomic.t;
mutable value : 'a;
}

type special =
| Nothing
| Closed

let none : 'a. 'a opt = Obj.magic Nothing
let closed : 'a. 'a opt = Obj.magic Closed
let some (t : 'a t) : 'a opt = Obj.magic t

let fold (opt : 'a opt) ~none:n ~some =
if opt == none then n ()
else if opt == closed then raise Closed
else some (Obj.magic opt : 'a t)

let make ~next value = { value; next = Atomic.make next }
end

(* A list where the end indicates whether the queue is closed. *)
type 'a clist =
| (::) of 'a * 'a clist
| Open
| Closed

(* [rev_append l1 l2] is like [rev l1 @ l2] *)
let rec rev_append l1 l2 =
match l1 with
| a :: l -> rev_append l (a :: l2)
| Open -> l2
| Closed -> assert false

let[@tail_mod_cons] rec ( @ ) l1 l2 =
match l1 with
| h1 :: tl -> h1 :: (tl @ l2)
| Open -> l2
| Closed -> assert false

(* The queue contains [head @ rev tail].
If [tail] is non-empty then it ends in [Open]. *)
type 'a t = {
tail : 'a Node.t Atomic.t;
mutable head : 'a Node.t;
mutable head : 'a clist;
tail : 'a clist Atomic.t;
}
(* [head] is the last node dequeued (or a dummy node, initially).
[head.next] gives the real first node, if not [Node.none].
If [tail.next] is [none] then it is the last node in the queue.
Otherwise, [tail.next] is a node that is closer to the tail. *)

let push t x =
let node = Node.(make ~next:none) x in
let rec aux () =
let p = Atomic.get t.tail in
(* While [p.next == none], [p] is the last node in the queue. *)
if Atomic.compare_and_set p.next Node.none (Node.some node) then (
(* [node] has now been added to the queue (and possibly even consumed).
Update [tail], unless someone else already did it for us. *)
ignore (Atomic.compare_and_set t.tail p node : bool)
) else (
(* Someone else added a different node first ([p.next] is not [none]).
Make [t.tail] more up-to-date, if it hasn't already changed, and try again. *)
Node.fold (Atomic.get p.next)
~none:(fun () -> assert false)
~some:(fun p_next ->
ignore (Atomic.compare_and_set t.tail p p_next : bool);
aux ()
)
)
in
aux ()

let rec push_head t x =
let p = t.head in
let next = Atomic.get p.next in
if next == Node.closed then raise Closed;
let node = Node.make ~next x in
if Atomic.compare_and_set p.next next (Node.some node) then (
(* We don't want to let [tail] get too far behind, so if the queue was empty, move it to the new node. *)
if next == Node.none then (
ignore (Atomic.compare_and_set t.tail p node : bool);
) else (
(* If the queue wasn't empty, there's nothing to do.
Either tail isn't at head or there is some [push] thread working to update it.
Either [push] will update it directly to the new tail, or will update it to [node]
and then retry. Either way, it ends up at the real tail. *)
)
) else (
(* Someone else changed it first. This can only happen if the queue was empty. *)
assert (next == Node.none);
push_head t x
)

let rec close (t:'a t) =
(* Mark the tail node as final. *)
let p = Atomic.get t.tail in
if not (Atomic.compare_and_set p.next Node.none Node.closed) then (
(* CAS failed because [p] is no longer the tail (or is already closed). *)
Node.fold (Atomic.get p.next)
~none:(fun () -> assert false) (* Can't switch from another state to [none] *)
~some:(fun p_next ->
(* Make [tail] more up-to-date if it hasn't changed already *)
ignore (Atomic.compare_and_set t.tail p p_next : bool);
(* Retry *)
close t
)
)

let pop t =
let p = t.head in
(* [p] is the previously-popped item. *)
let node = Atomic.get p.next in
Node.fold node
~none:(fun () -> None)
~some:(fun node ->
t.head <- node;
let v = node.value in
node.value <- Obj.magic (); (* So it can be GC'd *)
Some v
)
let rec push t x =
match Atomic.get t.tail with
| Closed -> raise Closed
| before ->
let after = x :: before in
if not (Atomic.compare_and_set t.tail before after) then
push t x

let push_head t x =
match t.head with
| Closed -> raise Closed
| before -> t.head <- x :: before

let rec pop t =
match t.head with
| x :: xs -> t.head <- xs; Some x
| Closed -> raise Closed
| Open ->
(* We know the tail is open because we just saw the head was open
and we don't run concurrently with [close]. *)
match Atomic.exchange t.tail Open with
| Closed -> assert false
| Open -> None (* Optimise the common case *)
| tail ->
t.head <- rev_append tail Open;
pop t

let close t =
match Atomic.exchange t.tail Closed with
| Closed -> invalid_arg "Lf_queue already closed!"
| xs -> t.head <- t.head @ rev_append xs Closed

let is_empty t =
Node.fold (Atomic.get t.head.next)
~none:(fun () -> true)
~some:(fun _ -> false)

let create () =
let dummy = { Node.value = Obj.magic (); next = Atomic.make Node.none } in
{ tail = Atomic.make dummy; head = dummy }
match t.head with
| _ :: _ -> false
| Closed -> raise Closed
| Open ->
match Atomic.get t.tail with
| _ :: _ -> false
| _ -> true

let create () = {
head = Open;
tail = Atomic.make Open;
}

0 comments on commit 8a71a90

Please sign in to comment.