Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Uniformization of Michael Scott queue with other queues #167

Merged
merged 4 commits into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bench/dune
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ let () =

(rule
(action
(copy ../src/michael_scott_queue/michael_scott_queue_intf.ml michael_scott_queue_intf.ml))
(copy ../src/michael_scott_queue/michael_scott_queue_intf.mli michael_scott_queue_intf.ml))
(package saturn))

(rule
Expand Down
2 changes: 1 addition & 1 deletion src/dune
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ let () =
(library
(name saturn)
(public_name saturn)
(modules_without_implementation htbl_intf bounded_queue_intf spsc_queue_intf)
(modules_without_implementation htbl_intf bounded_queue_intf spsc_queue_intf michael_scott_queue_intf)
(libraries backoff multicore-magic |}
^ maybe_threads
^ {| ))
Expand Down
8 changes: 8 additions & 0 deletions src/michael_scott_queue/dune
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
(mdx
(package saturn)
(enabled_if
(and
(<> %{os_type} Win32)
(>= %{ocaml_version} 5.0.0)))
(libraries saturn)
(files michael_scott_queue_intf.mli))
34 changes: 29 additions & 5 deletions src/michael_scott_queue/michael_scott_queue.ml
Original file line number Diff line number Diff line change
Expand Up @@ -33,38 +33,62 @@ let create () =

let is_empty { head; _ } = Atomic.get (Atomic.get head) == Nil

let of_list list =
let tail_ = Atomic.make Nil in
let tail = Atomic.make_contended tail_ in
let rec build_next = function
| [] -> tail_
| hd :: tl -> Atomic.make @@ Next (hd, build_next tl)
in
let head = Atomic.make_contended @@ build_next list in
{ head; tail }

(* *)

exception Empty

type ('a, _) poly = Option : ('a, 'a option) poly | Value : ('a, 'a) poly
type ('a, _) poly =
| Option : ('a, 'a option) poly
| Value : ('a, 'a) poly
| Unit : ('a, unit) poly

let rec pop_as :
type a r. a node Atomic.t Atomic.t -> Backoff.t -> (a, r) poly -> r =
fun head backoff poly ->
let old_head = Atomic.get head in
match Atomic.get old_head with
| Nil -> begin match poly with Value -> raise Empty | Option -> None end
| Nil -> begin
match poly with Value | Unit -> raise Empty | Option -> None
end
| Next (value, next) ->
if Atomic.compare_and_set head old_head next then begin
match poly with Value -> value | Option -> Some value
match poly with Value -> value | Option -> Some value | Unit -> ()
end
else
let backoff = Backoff.once backoff in
pop_as head backoff poly

let pop_exn t = pop_as t.head Backoff.default Value
let pop_opt t = pop_as t.head Backoff.default Option
let drop_exn t = pop_as t.head Backoff.default Unit

(* *)

let peek_as : type a r. a node Atomic.t Atomic.t -> (a, r) poly -> r =
fun head poly ->
let old_head = Atomic.get head in
match Atomic.get old_head with
| Nil -> begin match poly with Value -> raise Empty | Option -> None end
| Nil -> begin
match poly with Value | Unit -> raise Empty | Option -> None
end
| Next (value, _) -> (
match poly with Value -> value | Option -> Some value)
match poly with Value -> value | Option -> Some value | Unit -> ())

let peek_opt t = peek_as t.head Option
let peek_exn t = peek_as t.head Value

(* *)

let rec fix_tail tail new_tail =
let old_tail = Atomic.get tail in
if
Expand Down
61 changes: 0 additions & 61 deletions src/michael_scott_queue/michael_scott_queue_intf.ml

This file was deleted.

163 changes: 163 additions & 0 deletions src/michael_scott_queue/michael_scott_queue_intf.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
(*
* Copyright (c) 2015, Théo Laurent <[email protected]>
* Copyright (c) 2015, KC Sivaramakrishnan <[email protected]>
*
* Permission to use, copy, modify, and/or distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*)

module type MS_QUEUE = sig
(**
Michael-Scott classic multi-producer multi-consumer queue.

All functions are lockfree. It is the recommended starting point
when needing FIFO structure. It is inspired by {{:
https://www.cs.rochester.edu/~scott/papers/1996_PODC_queues.pdf}
Simple, Fast, and Practical Non-Blocking and Blocking Concurrent
Queue Algorithms}.

If you need a [length] function, you can use the bounded queue
{!Saturn.Bounded_queue} instead with maximun capacity (default value).
However, this adds a general overhead to the operation.
*)

(** {1 API} *)

type 'a t
(** Represents a lock-free queue holding elements of type ['a]. *)

val create : unit -> 'a t
(** [create ()] returns a new queue, initially empty. *)

val of_list : 'a list -> 'a t
(** [of_list list] creates a new queue from a list.

🐌 This is a linear-time operation.

{[
# open Saturn.Queue
# let t : int t = of_list [1;2;3;4]
val t : int t = <abstr>
# pop_opt t
- : int option = Some 1
# pop_opt t
- : int option = Some 2
]}
*)

val is_empty : 'a t -> bool
(** [is_empty q] returns [true] if [q] is empty and [false] otherwise. *)

(** {2 Consumer functions} *)

exception Empty
(** Raised when {!pop_exn}, {!peek_exn}, or {!drop_exn} is applied to an empty
queue. *)

val peek_exn : 'a t -> 'a
(** [peek_exn queue] returns the first element of the [queue] without removing it.

@raises Empty if the [queue] is empty. *)

val peek_opt : 'a t -> 'a option
(** [peek_opt queue] returns [Some] of the first element of the [queue] without
removing it, or [None] if the [queue] is empty. *)

val pop_exn : 'a t -> 'a
(** [pop_exn queue] removes and returns the first element of the [queue].

@raises Empty if the [queue] is empty. *)

val pop_opt : 'a t -> 'a option
(** [pop_opt q] removes and returns the first element in queue [q], or
returns [None] if the queue is empty. *)

val drop_exn : 'a t -> unit
(** [drop_exn queue] removes the top element of the [queue].

@raises Empty if the [queue] is empty. *)

(** {2 Producer functions} *)

val push : 'a t -> 'a -> unit
(** [push q v] adds the element [v] at the end of the queue [q]. *)

(** {1 Examples} *)

(** {2 Sequential example}

An example top-level session:
{[
# open Saturn.Queue
# let t : int t = of_list [1;2;3]
val t : int t = <abstr>
# push t 42
- : unit = ()
# pop_exn t
- : int = 1
# peek_opt t
- : int option = Some 2
# drop_exn t
- : unit = ()
# pop_opt t
- : int option = Some 3
# pop_opt t
- : int option = Some 42
# pop_exn t
Exception: Saturn__Michael_scott_queue.Empty.]}
*)

(** {2 Parallel example}
Note: The barrier is used in this example solely to make the results more
interesting by increasing the likelihood of parallelism. Spawning a domain is
a costly operation, especially compared to the relatively small amount of work
being performed here. In practice, using a barrier in this manner is unnecessary.


{@ocaml non-deterministic=command[
# open Saturn.Queue
# let t : string t = create ()
val t : string t = <abstr>
# Random.self_init ()
- : unit = ()
# let barrier = Atomic.make 2
val barrier : int Atomic.t = <abstr>

# let work id =
Atomic.decr barrier;
while Atomic.get barrier <> 0 do
Domain.cpu_relax ()
done;
for _ = 1 to 4 do
Domain.cpu_relax ();
if Random.bool () then push t id
else
match pop_opt t with
| None -> Format.printf "Domain %s sees an empty queue.\n%!" id
| Some v -> Format.printf "Domain %s pops values pushed by %s.\n%!" id v
done
val work : string -> unit = <fun>

# let domainA = Domain.spawn (fun () -> work "A")
val domainA : unit Domain.t = <abstr>
# let domainB = Domain.spawn (fun () -> work "B")
Domain B pops values pushed by B.
Domain A pops values pushed by A.
Domain B pops values pushed by A.
Domain B pops values pushed by A.
val domainB : unit Domain.t = <abstr>

# Domain.join domainA; Domain.join domainB
- : unit = ()
]}
*)
end
Loading