-
Notifications
You must be signed in to change notification settings - Fork 31
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
5 changed files
with
55 additions
and
58 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,4 @@ | ||
(* Copyright (c) 2023, Vesa Karvonen <[email protected]> | ||
(* Copyright (c) 2023-2024, Vesa Karvonen <[email protected]> | ||
Permission to use, copy, modify, and/or distribute this software for any | ||
purpose with or without fee is hereby granted, provided that the above | ||
|
@@ -12,7 +12,8 @@ | |
OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR | ||
PERFORMANCE OF THIS SOFTWARE. *) | ||
|
||
external fenceless_get : 'a Atomic.t -> 'a = "%field0" | ||
open Picos | ||
module Atomic = Multicore_magic.Transparent_atomic | ||
|
||
type ('a, _) node = | ||
| Null : ('a, [> `Null ]) node | ||
|
@@ -42,16 +43,18 @@ let[@inline] set_counter (Node r : (_, [< `Node ]) node) value = | |
r.counter <- value | ||
|
||
let[@inline] get_next node = Atomic.get (next_as_atomic node) | ||
let[@inline] fenceless_get_next node = fenceless_get (next_as_atomic node) | ||
|
||
let[@inline] fenceless_get_next node = | ||
Atomic.fenceless_get (next_as_atomic node) | ||
|
||
let[@inline] compare_and_set_next node before after = | ||
Atomic.compare_and_set (next_as_atomic node) before after | ||
|
||
type 'a t = { | ||
head : ('a, [ `Node ]) node Atomic.t; | ||
head_waiters : (unit -> unit) list Atomic.t; | ||
head_waiters : Trigger.t list Atomic.t; | ||
capacity : int; | ||
tail_waiters : (unit -> unit) list Atomic.t; | ||
tail_waiters : Trigger.t list Atomic.t; | ||
tail : ('a, [ `Node ]) node Atomic.t; | ||
} | ||
|
||
|
@@ -72,54 +75,48 @@ let is_empty t = | |
|
||
let rec snapshot t = | ||
let head = Atomic.get t.head in | ||
let tail = fenceless_get t.tail in | ||
let tail = Atomic.fenceless_get t.tail in | ||
match fenceless_get_next tail with | ||
| Link (Node _ as node) -> | ||
Atomic.compare_and_set t.tail tail node |> ignore; | ||
snapshot t | ||
| Link Null -> | ||
(* The [Sys.opaque_identity] below prevents OCaml 5 from optimizing the | ||
repeated load away. *) | ||
if Atomic.get (Sys.opaque_identity t.head) != head then snapshot t | ||
else (head, tail) | ||
| Link Null -> if Atomic.get t.head != head then snapshot t else (head, tail) | ||
|
||
let length t = | ||
let head, tail = snapshot t in | ||
get_counter tail - get_counter head | ||
|
||
(* *) | ||
|
||
let rec release_all waiters = | ||
let releases = fenceless_get waiters in | ||
if releases != [] then | ||
if Atomic.compare_and_set waiters releases [] then | ||
List.iter (fun release -> release ()) releases | ||
else release_all waiters | ||
let rec signal_all waiters = | ||
let triggers = Atomic.fenceless_get waiters in | ||
if triggers != [] then | ||
if Atomic.compare_and_set waiters triggers [] then | ||
List.iter Trigger.signal triggers | ||
else signal_all waiters | ||
|
||
(* *) | ||
|
||
let rec peek t = | ||
let old_head = Atomic.get t.head in | ||
match fenceless_get_next old_head with | ||
| Link Null -> | ||
let dla = Domain_local_await.prepare_for_await () in | ||
let releases = Atomic.get t.tail_waiters in | ||
if Atomic.compare_and_set t.tail_waiters releases (dla.release :: releases) | ||
let trigger = Trigger.create () in | ||
let triggers = Atomic.get t.tail_waiters in | ||
if Atomic.compare_and_set t.tail_waiters triggers (trigger :: triggers) | ||
then begin | ||
if old_head != Atomic.get t.tail then release_all t.tail_waiters | ||
if old_head != Atomic.get t.tail then signal_all t.tail_waiters | ||
else | ||
try dla.await () | ||
with exn -> | ||
release_all t.tail_waiters; | ||
raise exn | ||
match Trigger.await trigger with | ||
| None -> () | ||
| Some (exn, bt) -> | ||
signal_all t.tail_waiters; | ||
Printexc.raise_with_backtrace exn bt | ||
end; | ||
peek t | ||
| Link (Node r) -> | ||
let value = r.value in | ||
(* The [Sys.opaque_identity] below prevents OCaml 5 from optimizing the | ||
repeated load away. *) | ||
if Atomic.get (Sys.opaque_identity t.head) != old_head then peek t | ||
else value | ||
if Atomic.get t.head != old_head then peek t else value | ||
|
||
let[@inline] peek t = peek t | ||
|
||
|
@@ -131,10 +128,7 @@ let rec peek_opt t = | |
| Link Null -> None | ||
| Link (Node r) -> | ||
let value = r.value in | ||
(* The [Sys.opaque_identity] below prevents OCaml 5 from optimizing the | ||
repeated load away. *) | ||
if Atomic.get (Sys.opaque_identity t.head) != head then peek_opt t | ||
else Some value | ||
if Atomic.get t.head != head then peek_opt t else Some value | ||
|
||
let[@inline] peek_opt t = peek_opt t | ||
|
||
|
@@ -144,23 +138,24 @@ let rec pop backoff t = | |
let old_head = Atomic.get t.head in | ||
match fenceless_get_next old_head with | ||
| Link Null -> | ||
let dla = Domain_local_await.prepare_for_await () in | ||
let releases = Atomic.get t.tail_waiters in | ||
if Atomic.compare_and_set t.tail_waiters releases (dla.release :: releases) | ||
let trigger = Trigger.create () in | ||
let triggers = Atomic.get t.tail_waiters in | ||
if Atomic.compare_and_set t.tail_waiters triggers (trigger :: triggers) | ||
then begin | ||
if old_head != Atomic.get t.tail then release_all t.tail_waiters | ||
if old_head != Atomic.get t.tail then signal_all t.tail_waiters | ||
else | ||
try dla.await () | ||
with exn -> | ||
release_all t.tail_waiters; | ||
raise exn | ||
match Trigger.await trigger with | ||
| None -> () | ||
| Some (exn, bt) -> | ||
signal_all t.tail_waiters; | ||
Printexc.raise_with_backtrace exn bt | ||
end; | ||
pop backoff t | ||
| Link (Node node as new_head) -> | ||
if Atomic.compare_and_set t.head old_head new_head then begin | ||
let value = node.value in | ||
node.value <- Obj.magic (); | ||
release_all t.head_waiters; | ||
signal_all t.head_waiters; | ||
value | ||
end | ||
else pop (Backoff.once backoff) t | ||
|
@@ -177,7 +172,7 @@ let rec pop_opt backoff t = | |
if Atomic.compare_and_set t.head old_head new_head then begin | ||
let value = node.value in | ||
node.value <- Obj.magic (); | ||
release_all t.head_waiters; | ||
signal_all t.head_waiters; | ||
Some value | ||
end | ||
else pop_opt (Backoff.once backoff) t | ||
|
@@ -206,16 +201,18 @@ let rec push t new_node old_tail = | |
push t new_node old_tail | ||
end | ||
else | ||
let dla = Domain_local_await.prepare_for_await () in | ||
let releases = Atomic.get t.head_waiters in | ||
if Atomic.compare_and_set t.head_waiters releases (dla.release :: releases) | ||
then ( | ||
if old_head != Atomic.get t.head then release_all t.head_waiters | ||
let trigger = Trigger.create () in | ||
let triggers = Atomic.get t.head_waiters in | ||
if Atomic.compare_and_set t.head_waiters triggers (trigger :: triggers) | ||
then begin | ||
if old_head != Atomic.get t.head then signal_all t.head_waiters | ||
else | ||
try dla.await () | ||
with exn -> | ||
release_all t.head_waiters; | ||
raise exn); | ||
match Trigger.await trigger with | ||
| None -> () | ||
| Some (exn, bt) -> | ||
signal_all t.head_waiters; | ||
Printexc.raise_with_backtrace exn bt | ||
end; | ||
push t new_node old_tail | ||
end | ||
else begin | ||
|
@@ -226,7 +223,7 @@ let rec push t new_node old_tail = | |
else begin | ||
if not (Atomic.compare_and_set t.tail old_tail new_node) then | ||
fix_tail t.tail new_node; | ||
release_all t.tail_waiters | ||
signal_all t.tail_waiters | ||
end | ||
end | ||
|
||
|
@@ -255,7 +252,7 @@ let rec try_push t new_node old_tail = | |
else begin | ||
if not (Atomic.compare_and_set t.tail old_tail new_node) then | ||
fix_tail t.tail new_node; | ||
release_all t.tail_waiters; | ||
signal_all t.tail_waiters; | ||
true | ||
end | ||
end | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,4 @@ | ||
(* Copyright (c) 2023, Vesa Karvonen <[email protected]> | ||
(* Copyright (c) 2023-2024, Vesa Karvonen <[email protected]> | ||
Permission to use, copy, modify, and/or distribute this software for any | ||
purpose with or without fee is hereby granted, provided that the above | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters