Skip to content

Commit

Permalink
Improve documentation, add of_list and drop_exn.
Browse files Browse the repository at this point in the history
  • Loading branch information
lyrm committed Nov 24, 2024
1 parent 2035010 commit 9c075df
Show file tree
Hide file tree
Showing 13 changed files with 447 additions and 148 deletions.
2 changes: 1 addition & 1 deletion bench/dune
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ let () =

(rule
(action
(copy ../src/michael_scott_queue/michael_scott_queue_intf.ml michael_scott_queue_intf.ml))
(copy ../src/michael_scott_queue/michael_scott_queue_intf.mli michael_scott_queue_intf.ml))
(package saturn))

(rule
Expand Down
2 changes: 1 addition & 1 deletion src/dune
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ let () =
(library
(name saturn)
(public_name saturn)
(modules_without_implementation htbl_intf bounded_queue_intf)
(modules_without_implementation htbl_intf bounded_queue_intf michael_scott_queue_intf)
(libraries backoff multicore-magic |}
^ maybe_threads
^ {| ))
Expand Down
8 changes: 8 additions & 0 deletions src/michael_scott_queue/dune
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
(mdx
(package saturn)
(enabled_if
(and
(<> %{os_type} Win32)
(>= %{ocaml_version} 5.0.0)))
(libraries saturn)
(files michael_scott_queue_intf.mli))
34 changes: 29 additions & 5 deletions src/michael_scott_queue/michael_scott_queue.ml
Original file line number Diff line number Diff line change
Expand Up @@ -33,38 +33,62 @@ let create () =

let is_empty { head; _ } = Atomic.get (Atomic.get head) == Nil

let of_list list =
let tail_ = Atomic.make Nil in
let tail = Atomic.make_contended tail_ in
let rec build_next = function
| [] -> tail_
| hd :: tl -> Atomic.make @@ Next (hd, build_next tl)
in
let head = Atomic.make_contended @@ build_next list in
{ head; tail }

(* *)

exception Empty

type ('a, _) poly = Option : ('a, 'a option) poly | Value : ('a, 'a) poly
type ('a, _) poly =
| Option : ('a, 'a option) poly
| Value : ('a, 'a) poly
| Unit : ('a, unit) poly

let rec pop_as :
type a r. a node Atomic.t Atomic.t -> Backoff.t -> (a, r) poly -> r =
fun head backoff poly ->
let old_head = Atomic.get head in
match Atomic.get old_head with
| Nil -> begin match poly with Value -> raise Empty | Option -> None end
| Nil -> begin
match poly with Value | Unit -> raise Empty | Option -> None
end
| Next (value, next) ->
if Atomic.compare_and_set head old_head next then begin
match poly with Value -> value | Option -> Some value
match poly with Value -> value | Option -> Some value | Unit -> ()
end
else
let backoff = Backoff.once backoff in
pop_as head backoff poly

let pop_exn t = pop_as t.head Backoff.default Value
let pop_opt t = pop_as t.head Backoff.default Option
let drop_exn t = pop_as t.head Backoff.default Unit

(* *)

let peek_as : type a r. a node Atomic.t Atomic.t -> (a, r) poly -> r =
fun head poly ->
let old_head = Atomic.get head in
match Atomic.get old_head with
| Nil -> begin match poly with Value -> raise Empty | Option -> None end
| Nil -> begin
match poly with Value | Unit -> raise Empty | Option -> None
end
| Next (value, _) -> (
match poly with Value -> value | Option -> Some value)
match poly with Value -> value | Option -> Some value | Unit -> ())

let peek_opt t = peek_as t.head Option
let peek_exn t = peek_as t.head Value

(* *)

let rec fix_tail tail new_tail =
let old_tail = Atomic.get tail in
if
Expand Down
61 changes: 0 additions & 61 deletions src/michael_scott_queue/michael_scott_queue_intf.ml

This file was deleted.

160 changes: 160 additions & 0 deletions src/michael_scott_queue/michael_scott_queue_intf.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
(*
* Copyright (c) 2015, Théo Laurent <[email protected]>
* Copyright (c) 2015, KC Sivaramakrishnan <[email protected]>
*
* Permission to use, copy, modify, and/or distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*)

module type MS_QUEUE = sig
(**
Michael-Scott classic multi-producer multi-consumer queue.
All functions are lockfree. It is the recommended starting point
when needing FIFO structure. It is inspired by {{:
https://www.cs.rochester.edu/~scott/papers/1996_PODC_queues.pdf}
Simple, Fast, and Practical Non-Blocking and Blocking Concurrent
Queue Algorithms}.
If you need a [length] function, you can use the bounded queue
{!Saturn.Bounded_queue} instead with maximun capacity (default value).
However, this adds a general overhead to the operation.
*)

(** {1 API} *)

type 'a t
(** Represents a lock-free queue holding elements of type ['a]. *)

val create : unit -> 'a t
(** [create ()] returns a new queue, initially empty. *)

val of_list : 'a list -> 'a t
(** [of_list list] creates a new queue from a list.
{[
# open Saturn.Queue
# let t : int t = of_list [1;2;3;4]
val t : int t = <abstr>
# pop_opt t
- : int option = Some 1
# pop_opt t
- : int option = Some 2
]}
*)

val is_empty : 'a t -> bool
(** [is_empty q] returns [true] if [q] is empty and [false otherwise]. *)

(** {2 Consumer functions} *)

exception Empty
(** Raised when {!pop_exn}, {!peek_exn}, or {!drop_exn} is applied to an empty
stack. *)

val peek_exn : 'a t -> 'a
(** [peek_exn queue] returns the first element of the [queue] without removing it.
@raises Empty if the [queue] is empty. *)

val peek_opt : 'a t -> 'a option
(** [peek_opt queue] returns [Some] of the first element of the [queue] without
removing it, or [None] if the [queue] is empty. *)

val pop_exn : 'a t -> 'a
(** [pop_exn queue] removes and returns the first element of the [queue].
@raises Empty if the [queue] is empty. *)

val pop_opt : 'a t -> 'a option
(** [pop_opt q] removes and returns the first element in queue [q], or
returns [None] if the queue is empty. *)

val drop_exn : 'a t -> unit
(** [drop_exn queue] removes the top element of the [queue].
@raises Empty if the [queue] is empty. *)

(** {2 Producer functions} *)

val push : 'a t -> 'a -> unit
(** [push q v] adds the element [v] at the end of the queue [q]. *)

(** {1 Examples} *)

(** {2 Sequential example}
An example top-level session:
{[
# open Saturn.Queue
# let t : int t = of_list [1;2;3]
val t : int t = <abstr>
# push t 42
- : unit = ()
# pop_exn t
- : int = 1
# peek_opt t
- : int option = Some 2
# drop_exn t
- : unit = ()
# pop_opt t
- : int option = Some 3
# pop_opt t
- : int option = Some 42
# pop_exn t
Exception: Saturn__Michael_scott_queue.Empty.]}
*)

(** {2 Parallel example}
**Note** that the use of a barrier is only necessary to make the result of
this example interesting by improving the likelihood of parallelism.
Spawning a domain is a costly operation compared to the work actually run on them
here. In practice, you should not use a barrier.
{@ocaml non-deterministic[
# open Saturn.Queue
# let t : string t = create ()
val t : string t = <abstr>
# Random.self_init ()
- : unit = ()
# let barrier = Atomic.make 2
val barrier : int Atomic.t = <abstr>
# let work id =
Atomic.decr barrier;
while Atomic.get barrier <> 0 do
Domain.cpu_relax ()
done;
for _ = 1 to 4 do
Domain.cpu_relax ();
if Random.bool () then push t id
else
match pop_opt t with
| None -> Format.printf "Domain %s sees an empty queue.\n%!" id
| Some v -> Format.printf "Domain %s pops values pushed by %s.\n%!" id v
done
val work : string -> unit = <fun>
# let domainA = Domain.spawn (fun () -> work "A")
val domainA : unit Domain.t = <abstr>
# let domainB = Domain.spawn (fun () -> work "B")
Domain B pops values pushed by B.
Domain A pops values pushed by A.
Domain B pops values pushed by A.
Domain B pops values pushed by A.
val domainB : unit Domain.t = <abstr>
# Domain.join domainA; Domain.join domainB
- : unit = ()
]}
*)
end
44 changes: 31 additions & 13 deletions src/michael_scott_queue/michael_scott_queue_unsafe.ml
Original file line number Diff line number Diff line change
Expand Up @@ -32,49 +32,67 @@ let create () =

let is_empty t = Atomic.get (Node.as_atomic (Atomic.get t.head)) == Nil

let of_list list =
match list with
| [] -> create ()
| _ ->
let tail, head = Node.node_of_list list in
let head = Node.Next { next = head; value = Obj.magic () } in
let head = Atomic.make head |> Multicore_magic.copy_as_padded in
let tail = Atomic.make tail |> Multicore_magic.copy_as_padded in
{ head; tail } |> Multicore_magic.copy_as_padded

(* *)

exception Empty

type ('a, _) poly = Option : ('a, 'a option) poly | Value : ('a, 'a) poly
type ('a, _) poly =
| Option : ('a, 'a option) poly
| Value : ('a, 'a) poly
| Unit : ('a, unit) poly

let rec pop_as :
type a r. (a, [ `Next ]) Node.t Atomic.t -> Backoff.t -> (a, r) poly -> r =
fun head backoff poly ->
let old_head = Atomic.get head in
match Atomic.get (Node.as_atomic old_head) with
| Nil -> begin match poly with Value -> raise Empty | Option -> None end
| Nil -> begin
match poly with Value | Unit -> raise Empty | Option -> None
end
| Next r as new_head ->
if Atomic.compare_and_set head old_head new_head then begin
match poly with
| Value ->
let value = r.value in
r.value <- Obj.magic ();
value
| Option ->
let value = r.value in
r.value <- Obj.magic ();
Some value
let value = r.value in
r.value <- Obj.magic ();
match poly with Value -> value | Option -> Some value | Unit -> ()
end
else
let backoff = Backoff.once backoff in
pop_as head backoff poly

let pop_opt t = pop_as t.head Backoff.default Option
let pop_exn t = pop_as t.head Backoff.default Value
let drop_exn t = pop_as t.head Backoff.default Unit

(* *)

let rec peek_as : type a r. (a, [ `Next ]) Node.t Atomic.t -> (a, r) poly -> r =
fun head poly ->
let old_head = Atomic.get head in
match Atomic.get (Node.as_atomic old_head) with
| Nil -> begin match poly with Value -> raise Empty | Option -> None end
| Nil -> begin
match poly with Value | Unit -> raise Empty | Option -> None
end
| Next r ->
let value = r.value in
if Atomic.get head == old_head then
match poly with Value -> value | Option -> Some value
match poly with Value -> value | Option -> Some value | Unit -> ()
else peek_as head poly

let peek_opt t = peek_as t.head Option
let peek_exn t = peek_as t.head Value

(* *)

let rec fix_tail tail new_tail backoff =
let old_tail = Atomic.get tail in
if
Expand Down
Loading

0 comments on commit 9c075df

Please sign in to comment.