Skip to content

Commit

Permalink
Merge pull request #92 from lyrm/more_functions
Browse files Browse the repository at this point in the history
`pop_opt`, `peek`, `peek_opt` (and if possible `try_push`) functions for all queues.
  • Loading branch information
lyrm authored Nov 5, 2023
2 parents 49e4e59 + 6699df5 commit 5b5d644
Show file tree
Hide file tree
Showing 24 changed files with 850 additions and 170 deletions.
2 changes: 1 addition & 1 deletion bench/bench_spsc_queue.ml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ let run () =
start_time)
in
for _ = 1 to item_count do
while Option.is_none (Spsc_queue.pop queue) do
while Option.is_none (Spsc_queue.pop_opt queue) do
()
done
done;
Expand Down
43 changes: 24 additions & 19 deletions src_lockfree/michael_scott_queue.ml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ let create () =

let is_empty { head; _ } = Atomic.get (Atomic.get head) == Nil

let pop { head; _ } =
exception Empty

let pop_opt { head; _ } =
let b = Backoff.create () in
let rec loop () =
let old_head = Atomic.get head in
Expand All @@ -45,6 +47,27 @@ let pop { head; _ } =
in
loop ()

let pop { head; _ } =
let b = Backoff.create () in
let rec loop () =
let old_head = Atomic.get head in
match Atomic.get old_head with
| Nil -> raise Empty
| Next (value, next) when Atomic.compare_and_set head old_head next -> value
| _ ->
Backoff.once b;
loop ()
in
loop ()

let peek_opt { head; _ } =
let old_head = Atomic.get head in
match Atomic.get old_head with Nil -> None | Next (value, _) -> Some value

let peek { head; _ } =
let old_head = Atomic.get head in
match Atomic.get old_head with Nil -> raise Empty | Next (value, _) -> value

let rec fix_tail tail new_tail =
let old_tail = Atomic.get tail in
if
Expand All @@ -66,24 +89,6 @@ let push { tail; _ } value =
if not (Atomic.compare_and_set tail old_tail new_tail) then
fix_tail tail new_tail

let clean_until { head; _ } f =
let b = Backoff.create () in
let rec loop () =
let old_head = Atomic.get head in
match Atomic.get old_head with
| Nil -> ()
| Next (value, next) ->
if not (f value) then
if Atomic.compare_and_set head old_head next then (
Backoff.reset b;
loop ())
else (
Backoff.once b;
loop ())
else ()
in
loop ()

type 'a cursor = 'a node

let snapshot { head; _ } = Atomic.get (Atomic.get head)
Expand Down
24 changes: 18 additions & 6 deletions src_lockfree/michael_scott_queue.mli
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,26 @@ val is_empty : 'a t -> bool
val push : 'a t -> 'a -> unit
(** [push q v] adds the element [v] at the end of the queue [q]. *)

val pop : 'a t -> 'a option
(** [pop q] removes and returns the first element in queue [q], or
exception Empty
(** Raised when {!pop} or {!peek} is applied to an empty queue. *)

val pop : 'a t -> 'a
(** [pop q] removes and returns the first element in queue [q].
@raise Empty if [q] is empty. *)

val pop_opt : 'a t -> 'a option
(** [pop_opt q] removes and returns the first element in queue [q], or
returns [None] if the queue is empty. *)

val clean_until : 'a t -> ('a -> bool) -> unit
(** [clean_until q f] drops the prefix of the queue until the element [e],
where [f e] is [true]. If no such element exists, then the queue is
emptied. *)
val peek : 'a t -> 'a
(** [peek q] returns the first element in queue [q].
@raise Empty if [q] is empty. *)

val peek_opt : 'a t -> 'a option
(** [peek_opt q] returns the first element in queue [q], or
returns [None] if the queue is empty. *)

type 'a cursor
(** The type of cursor. *)
Expand Down
29 changes: 28 additions & 1 deletion src_lockfree/mpsc_queue.ml
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ let rec close (t : 'a t) =
(* Retry *)
close t)

let pop t =
let pop_opt t =
let p = t.head in
(* [p] is the previously-popped item. *)
let node = Atomic.get p.next in
Expand All @@ -115,6 +115,33 @@ let pop t =
(* So it can be GC'd *)
Some v)

exception Empty

let pop t =
let p = t.head in
(* [p] is the previously-popped item. *)
let node = Atomic.get p.next in
Node.fold node
~none:(fun () -> raise Empty)
~some:(fun node ->
t.head <- node;
let v = node.value in
node.value <- Obj.magic ();
(* So it can be GC'd *)
v)

let peek_opt t =
let p = t.head in
(* [p] is the previously-popped item. *)
let node = Atomic.get p.next in
Node.fold node ~none:(fun () -> None) ~some:(fun node -> Some node.value)

let peek t =
let p = t.head in
(* [p] is the previously-popped item. *)
let node = Atomic.get p.next in
Node.fold node ~none:(fun () -> raise Empty) ~some:(fun node -> node.value)

let is_empty t =
Node.fold (Atomic.get t.head.next)
~none:(fun () -> true)
Expand Down
51 changes: 38 additions & 13 deletions src_lockfree/mpsc_queue.mli
Original file line number Diff line number Diff line change
Expand Up @@ -12,33 +12,58 @@ exception Closed
val create : unit -> 'a t
(** [create ()] returns a new empty queue. *)

val is_empty : 'a t -> bool
(** [is_empty q] is [true] if calling [pop] would return [None].
@raise Closed if [q] is closed and empty. *)

val close : 'a t -> unit
(** [close q] marks [q] as closed, preventing any further items from
being pushed by the producers (i.e. with {!push}).
@raise Closed if [q] has already been closed. *)

val push : 'a t -> 'a -> unit
(** [push q v] adds the element [v] at the end of the queue [q]. This
(** [push q v] adds the element [v] at the end of the queue [q]. This
can be used safely by multiple producer domains, in parallel with
the other operations.
@raise Closed if [q] is closed. *)

val pop : 'a t -> 'a option
(** [pop q] removes and returns the first element in queue [q] or
(** {2 Consumer-only functions} *)

exception Empty
(** Raised when {!pop} or {!peek} is applied to an empty queue. *)

val pop : 'a t -> 'a
(** [pop q] removes and returns the first element in queue [q].
@raise Empty if [q] is empty.
@raise Closed if [q] is closed and empty. *)

val pop_opt : 'a t -> 'a option
(** [pop_opt q] removes and returns the first element in queue [q] or
returns [None] if the queue is empty.
@raise Closed if [q] is closed and empty. *)

val push_head : 'a t -> 'a -> unit
(** [push_head q v] adds the element [v] at the head of the queue
[q]. This can only be used by the consumer (if run in parallel
with {!pop}, the item might be skipped).
val peek : 'a t -> 'a
(** [peek q] returns the first element in queue [q].
@raise Empty if [q] is empty.
@raise Closed if [q] is closed and empty. *)

val is_empty : 'a t -> bool
(** [is_empty q] is [true] if calling [pop] would return [None].
val peek_opt : 'a t -> 'a option
(** [peek_opt q] returns the first element in queue [q] or
returns [None] if the queue is empty.
@raise Closed if [q] is closed and empty. *)

val close : 'a t -> unit
(** [close q] marks [q] as closed, preventing any further items from
being pushed by the producers (i.e. with {!push}).
val push_head : 'a t -> 'a -> unit
(** [push_head q v] adds the element [v] at the head of the queue
[q]. This can only be used by the consumer (if run in parallel
with {!pop}, the item might be skipped).
@raise Closed if [q] has already been closed. *)
@raise Closed if [q] is closed and empty. *)
41 changes: 41 additions & 0 deletions src_lockfree/spsc_queue.ml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,31 @@ let push { array; head; tail; mask; _ } element =
Array.set array (tail_val land mask) (Some element);
Atomic.set tail (tail_val + 1))

let try_push { array; head; tail; mask; _ } element =
let size = mask + 1 in
let head_val = Atomic.get head in
let tail_val = Atomic.get tail in
if head_val + size == tail_val then false
else (
Array.set array (tail_val land mask) (Some element);
Atomic.set tail (tail_val + 1);
true)

exception Empty

let pop { array; head; tail; mask; _ } =
let head_val = Atomic.get head in
let tail_val = Atomic.get tail in
if head_val == tail_val then raise Empty
else
let index = head_val land mask in
let v = Array.get array index in
(* allow gc to collect it *)
Array.set array index None;
Atomic.set head (head_val + 1);
match v with None -> assert false | Some v -> v

let pop_opt { array; head; tail; mask; _ } =
let head_val = Atomic.get head in
let tail_val = Atomic.get tail in
if head_val == tail_val then None
Expand All @@ -61,4 +85,21 @@ let pop { array; head; tail; mask; _ } =
assert (Option.is_some v);
v

let peek_opt { array; head; tail; mask; _ } =
let head_val = Atomic.get head in
let tail_val = Atomic.get tail in
if head_val == tail_val then None
else
let v = Array.get array @@ (head_val land mask) in
assert (Option.is_some v);
v

let peek { array; head; tail; mask; _ } =
let head_val = Atomic.get head in
let tail_val = Atomic.get tail in
if head_val == tail_val then raise Empty
else
let v = Array.get array @@ (head_val land mask) in
match v with None -> assert false | Some v -> v

let size { head; tail; _ } = Atomic.get tail - Atomic.get head
53 changes: 43 additions & 10 deletions src_lockfree/spsc_queue.mli
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,59 @@ type 'a t
(** Type of single-producer single-consumer non-resizable domain-safe
queue that works in FIFO order. *)

exception Full
(** Raised when {!push} is applied to a full queue. *)

val create : size_exponent:int -> 'a t
(** [create ~size_exponent:int] returns a new queue of maximum size
[2^size_exponent] and initially empty. *)

val size : 'a t -> int
(** [size] returns the size of the queue. This method linearizes only when called
from either consumer or producer domain. Otherwise, it is safe to call but
provides only an *indication* of the size of the structure. *)

(** {1 Producer functions} *)

exception Full
(** Raised when {!push} is applied to a full queue. *)

val push : 'a t -> 'a -> unit
(** [push q v] adds the element [v] at the end of the queue [q]. This
method can be used by at most one domain at the time.
@raise [Full] if the queue is full.
@raise Full if the queue is full.
*)

val try_push : 'a t -> 'a -> bool
(** [try_push q v] tries to add the element [v] at the end of the
queue [q]. It fails it the queue [q] is full. This method can be
used by at most one domain at the time.
*)

(** {2 Consumer functions} *)

exception Empty
(** Raised when {!pop} or {!peek} is applied to an empty queue. *)

val pop : 'a t -> 'a
(** [pop q] removes and returns the first element in queue [q]. This
method can be used by at most one domain at the time.
@raise Empty if [q] is empty.
*)

val pop : 'a t -> 'a option
(** [pop q] removes and returns the first element in queue [q], or
val pop_opt : 'a t -> 'a option
(** [pop_opt q] removes and returns the first element in queue [q], or
returns [None] if the queue is empty. This method can be used by
at most one domain at the time. *)

val size : 'a t -> int
(** [size] returns the size of the queue. This method linearizes only when called
from either consumer or producer domain. Otherwise, it is safe to call but
provides only an *indication* of the size of the structure. *)
val peek : 'a t -> 'a
(** [peek q] returns the first element in queue [q]. This method can be
used by at most one domain at the time.
@raise Empty if [q] is empty.
*)

val peek_opt : 'a t -> 'a option
(** [peek_opt q] returns the first element in queue [q], or [None]
if the queue is empty. This method can be used by at most one
domain at the time.
*)
24 changes: 24 additions & 0 deletions src_lockfree/treiber_stack.ml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,31 @@ let push q v =
in
loop b

exception Empty

let pop q =
let rec loop b =
let s = Atomic.get q.head in
match s with
| Nil -> raise Empty
| Next (v, next) ->
if Atomic.compare_and_set q.head s next then v
else (
Backoff.once b;
loop b)
in

let s = Atomic.get q.head in
match s with
| Nil -> raise Empty
| Next (v, next) ->
if Atomic.compare_and_set q.head s next then v
else
let b = Backoff.create () in
Backoff.once b;
loop b

let pop_opt q =
let rec loop b =
let s = Atomic.get q.head in
match s with
Expand Down
17 changes: 14 additions & 3 deletions src_lockfree/treiber_stack.mli
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,17 @@ val is_empty : 'a t -> bool
val push : 'a t -> 'a -> unit
(** [push s v] adds the element [v] at the top of stack [s]. *)

val pop : 'a t -> 'a option
(** [pop a] removes and returns the topmost element in the
stack [s], or returns [None] if the stack is empty. *)
exception Empty
(** Raised when {!pop} or {!peek} is applied to an empty queue. *)

val pop : 'a t -> 'a
(** [pop s] removes and returns the topmost element in the
stack [s].
@raise Empty if [a] is empty.
*)

val pop_opt : 'a t -> 'a option
(** [pop_opt s] removes and returns the topmost element in the
stack [s], or returns [None] if the stack is empty.
*)
Loading

0 comments on commit 5b5d644

Please sign in to comment.