diff --git a/README.md b/README.md index 32680abf9..bf1ba503c 100644 --- a/README.md +++ b/README.md @@ -226,7 +226,7 @@ We can run the previous code with tracing enabled (writing to a new `trace.ctf` ```ocaml # let () = - Eio_unix.Ctf.with_tracing "trace.ctf" @@ fun () -> + Eio_unix.Trace.with_tracing "trace.ctf" @@ fun () -> Eio_main.run main;; +x = 1 +y = 1 diff --git a/lib_eio/core/cancel.ml b/lib_eio/core/cancel.ml index 855e7691c..3d477d2a4 100644 --- a/lib_eio/core/cancel.ml +++ b/lib_eio/core/cancel.ml @@ -22,7 +22,7 @@ type t = { domain : Domain.id; (* Prevent access from other domains *) } and fiber_context = { - tid : Ctf.id; + tid : Trace.id; mutable cancel_context : t; mutable cancel_node : fiber_context Lwt_dllist.node option; (* Our entry in [cancel_context.fibers] *) mutable cancel_fn : exn -> unit; (* Encourage the current operation to finish *) @@ -194,8 +194,8 @@ module Fiber_context = struct t.cancel_fn <- ignore let make ~cc ~vars = - let tid = Ctf.mint_id () in - Ctf.note_created tid Ctf.Task; + let tid = Trace.mint_id () in + Trace.create tid Fiber; let t = { tid; cancel_context = cc; cancel_node = None; cancel_fn = ignore; vars } in t.cancel_node <- Some (Lwt_dllist.add_r t cc.fibers); t diff --git a/lib_eio/core/ctf.mli b/lib_eio/core/ctf.mli deleted file mode 100644 index 8055cd756..000000000 --- a/lib_eio/core/ctf.mli +++ /dev/null @@ -1,110 +0,0 @@ -(** This library is used to write event traces in mirage-profile's CTF format. *) - -type id = private int -(** Each thread/fiber/promise is identified by a unique ID. *) - -(** {2 Recording events} - Libraries and applications can use these functions to make the traces more useful. *) - -val label : string -> unit -(** [label msg] attaches text [msg] to the current thread. *) - -val note_increase : string -> int -> unit -(** [note_increase counter delta] records that [counter] increased by [delta]. - If [delta] is negative, this records a decrease. *) - -val note_counter_value : string -> int -> unit -(** [note_counter_value counter value] records that [counter] is now [value]. *) - -val should_resolve : id -> unit -(** [should_resolve id] records that [id] is expected to resolve, and should be highlighted if it doesn't. *) - -(** {2 Recording system events} - These are normally only called by the scheduler. *) - -type hiatus_reason = - | Wait_for_work - | Suspend - | Hibernate - -type event = - | Wait - | Task - | Bind - | Try - | Choose - | Pick - | Join - | Map - | Condition - | On_success - | On_failure - | On_termination - | On_any - | Ignore_result - | Async - | Promise - | Semaphore - | Switch - | Stream - | Mutex -(** Types of threads or other recorded objects. *) - -val mint_id : unit -> id -(** [mint_id ()] is a fresh unique [id]. *) - -val note_created : ?label:string -> id -> event -> unit -(** [note_created t id ty] records the creation of [id]. *) - -val note_read : ?reader:id -> id -> unit -(** [note_read src] records that promise [src]'s value was read. - @param reader The thread doing the read (default is the current thread). *) - -val note_try_read : id -> unit -(** [note_try_read src] records that the current thread wants to read from [src] (which is not currently ready). *) - -val note_switch : id -> unit -(** [note_switch id] records that [id] is now the current thread. *) - -val note_hiatus : hiatus_reason -> unit -(** [note_hiatus r] records that the system will sleep for reason [r]. *) - -val note_resume : id -> unit -(** [note_resume id] records that the system has resumed (used after {!note_hiatus}), - and is now running [id]. *) - -val note_fork : unit -> id -(** [note_fork ()] records that a new thread has been forked and returns a fresh ID for it. *) - -val note_resolved : id -> ex:exn option -> unit -(** [note_resolved id ~ex] records that [id] is now resolved. - If [ex = None] then [id] was successful, otherwise it failed with exception [ex]. *) - -val note_signal : ?src:id -> id -> unit -(** [note_signal ~src dst] records that [dst] was signalled. - @param src The thread sending the signal (default is the current thread). *) - -(** {2 Controlling tracing} *) - -type log_buffer = (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t - -module Control : sig - type t - - val make : timestamper:(log_buffer -> int -> unit) -> log_buffer -> t - (** [make ~timestamper b] is a trace buffer that record events in [b]. - In most cases, the {!Ctf_unix} module provides a simpler interface. *) - - val start : t -> unit - (** [start t] begins recording events in [t]. *) - - val stop : t -> unit - (** [stop t] stops recording to [t] (which must be the current trace buffer). *) -end - -(**/**) - -module BS : sig - val set_int8 : Cstruct.buffer -> int -> int -> unit - val set_int64_le : Cstruct.buffer -> int -> int64 -> unit -end diff --git a/lib_eio/core/debug.ml b/lib_eio/core/debug.ml index a4b11696a..6665479ef 100644 --- a/lib_eio/core/debug.ml +++ b/lib_eio/core/debug.ml @@ -15,7 +15,7 @@ let default_traceln ?__POS__:pos fmt = Format.pp_close_box f (); Format.pp_print_flush f (); let msg = Buffer.contents b in - Ctf.label msg; + Trace.label msg; let lines = String.split_on_char '\n' msg in Mutex.lock traceln_mutex; Fun.protect ~finally:(fun () -> Mutex.unlock traceln_mutex) @@ fun () -> diff --git a/lib_eio/core/eio__core.ml b/lib_eio/core/eio__core.ml index daf97efdc..2c8f20536 100644 --- a/lib_eio/core/eio__core.ml +++ b/lib_eio/core/eio__core.ml @@ -7,7 +7,7 @@ module Private = struct module Suspend = Suspend module Cells = Cells module Broadcast = Broadcast - module Ctf = Ctf + module Trace = Trace module Fiber_context = Cancel.Fiber_context module Debug = Debug diff --git a/lib_eio/core/eio__core.mli b/lib_eio/core/eio__core.mli index 85f2690a5..0356d66ba 100644 --- a/lib_eio/core/eio__core.mli +++ b/lib_eio/core/eio__core.mli @@ -571,7 +571,7 @@ end (** @canonical Eio.Private *) module Private : sig - module Ctf = Ctf + module Trace = Trace module Cells = Cells module Broadcast = Broadcast @@ -586,7 +586,7 @@ module Private : sig val destroy : t -> unit (** [destroy t] removes [t] from its cancellation context. *) - val tid : t -> Ctf.id + val tid : t -> Trace.id (** {2 Cancellation} diff --git a/lib_eio/core/fiber.ml b/lib_eio/core/fiber.ml index 617590499..0785e8a12 100644 --- a/lib_eio/core/fiber.ml +++ b/lib_eio/core/fiber.ml @@ -19,10 +19,10 @@ let fork ~sw f = Switch.with_op sw @@ fun () -> match f () with | () -> - Ctf.note_resolved (Cancel.Fiber_context.tid new_fiber) ~ex:None + Trace.resolve (Cancel.Fiber_context.tid new_fiber) ~ex:None | exception ex -> Switch.fail sw ex; (* The [with_op] ensures this will succeed *) - Ctf.note_resolved (Cancel.Fiber_context.tid new_fiber) ~ex:(Some ex) + Trace.resolve (Cancel.Fiber_context.tid new_fiber) ~ex:(Some ex) ) (* else the fiber should report the error to [sw], but [sw] is failed anyway *) let fork_daemon ~sw f = @@ -35,13 +35,13 @@ let fork_daemon ~sw f = match f () with | `Stop_daemon -> (* The daemon asked to stop. *) - Ctf.note_resolved (Cancel.Fiber_context.tid new_fiber) ~ex:None + Trace.resolve (Cancel.Fiber_context.tid new_fiber) ~ex:None | exception Cancel.Cancelled Exit when not (Cancel.is_on sw.cancel) -> (* The daemon was cancelled because all non-daemon fibers are finished. *) - Ctf.note_resolved (Cancel.Fiber_context.tid new_fiber) ~ex:None + Trace.resolve (Cancel.Fiber_context.tid new_fiber) ~ex:None | exception ex -> Switch.fail sw ex; (* The [with_daemon] ensures this will succeed *) - Ctf.note_resolved (Cancel.Fiber_context.tid new_fiber) ~ex:(Some ex) + Trace.resolve (Cancel.Fiber_context.tid new_fiber) ~ex:(Some ex) ) (* else the fiber should report the error to [sw], but [sw] is failed anyway *) let fork_promise ~sw f = diff --git a/lib_eio/core/promise.ml b/lib_eio/core/promise.ml index 3b2fdcaa4..ca15c74b0 100644 --- a/lib_eio/core/promise.ml +++ b/lib_eio/core/promise.ml @@ -3,7 +3,7 @@ type 'a state = | Unresolved of Broadcast.t type !'a promise = { - id : Ctf.id; + id : Trace.id; state : 'a state Atomic.t; (* Note: we always switch to Resolved before broadcasting *) } @@ -25,20 +25,20 @@ let create_with_id id = to_public_promise t, to_public_resolver t let create ?label () = - let id = Ctf.mint_id () in - Ctf.note_created ?label id Ctf.Promise; + let id = Trace.mint_id () in + Trace.create ?label id Promise; create_with_id id let create_resolved x = - let id = Ctf.mint_id () in - Ctf.note_created id Ctf.Promise; + let id = Trace.mint_id () in + Trace.create id Promise; to_public_promise { id; state = Atomic.make (Resolved x) } let await t = let t = of_public_promise t in match Atomic.get t.state with | Resolved x -> - Ctf.note_read t.id; + Trace.read t.id; x | Unresolved b -> Suspend.enter (fun ctx enqueue -> @@ -53,7 +53,7 @@ let await t = | Unresolved _ -> (* We observed the promise to be still unresolved after registering a waiter. Therefore any resolution must happen after we were registered and we will be notified. *) - Ctf.note_try_read t.id; + Trace.try_read t.id; Cancel.Fiber_context.set_cancel_fn ctx (fun ex -> if Broadcast.cancel request then enqueue (Error ex) (* else already resumed *) @@ -61,7 +61,7 @@ let await t = ); match Atomic.get t.state with | Resolved x -> - Ctf.note_read t.id; + Trace.read t.id; x | Unresolved _ -> assert false @@ -76,7 +76,7 @@ let resolve t v = | Resolved _ -> invalid_arg "Can't resolve already-resolved promise" | Unresolved b as prev -> if Atomic.compare_and_set t.state prev (Resolved v) then ( - Ctf.note_resolved t.id ~ex:None; + Trace.resolve t.id ~ex:None; Broadcast.resume_all b ) else ( (* Otherwise, the promise was already resolved. Retry (to get the error). *) diff --git a/lib_eio/core/single_waiter.ml b/lib_eio/core/single_waiter.ml index 0c6c29283..aa27d7b37 100644 --- a/lib_eio/core/single_waiter.ml +++ b/lib_eio/core/single_waiter.ml @@ -19,6 +19,6 @@ let await t id = t.wake <- (fun x -> Cancel.Fiber_context.clear_cancel_fn ctx; t.wake <- ignore; - Ctf.note_read ~reader:id ctx.tid; + Trace.read ~reader:id ctx.tid; enqueue x ) diff --git a/lib_eio/core/switch.ml b/lib_eio/core/switch.ml index dd6d06d23..891f7db7f 100644 --- a/lib_eio/core/switch.ml +++ b/lib_eio/core/switch.ml @@ -1,5 +1,5 @@ type t = { - id : Ctf.id; + id : Trace.id; mutable fibers : int; (* Total, including daemon_fibers and the main function *) mutable daemon_fibers : int; mutable exs : (exn * Printexc.raw_backtrace) option; @@ -51,7 +51,7 @@ let combine_exn ex = function let fail ?(bt=Printexc.get_raw_backtrace ()) t ex = check_our_domain t; if t.exs = None then - Ctf.note_resolved t.id ~ex:(Some ex); + Trace.resolve t.id ~ex:(Some ex); t.exs <- Some (combine_exn (ex, bt) t.exs); try Cancel.cancel t.cancel ex @@ -91,7 +91,7 @@ let or_raise = function let rec await_idle t = (* Wait for fibers to finish: *) while t.fibers > 0 do - Ctf.note_try_read t.id; + Trace.try_read t.id; Single_waiter.await t.waiter t.id done; (* Call on_release handlers: *) @@ -118,8 +118,8 @@ let maybe_raise_exs t = | Some (ex, bt) -> Printexc.raise_with_backtrace ex bt let create cancel = - let id = Ctf.mint_id () in - Ctf.note_created id Ctf.Switch; + let id = Trace.mint_id () in + Trace.create id Switch; { id; fibers = 1; (* The main function counts as a fiber *) @@ -135,7 +135,7 @@ let run_internal t fn = | v -> dec_fibers t; await_idle t; - Ctf.note_read t.id; + Trace.read t.id; maybe_raise_exs t; (* Check for failure while finishing *) (* Success. *) v @@ -146,7 +146,7 @@ let run_internal t fn = dec_fibers t; fail ~bt t ex; await_idle t; - Ctf.note_read t.id; + Trace.read t.id; maybe_raise_exs t; assert false diff --git a/lib_eio/core/ctf.ml b/lib_eio/core/trace.ml similarity index 85% rename from lib_eio/core/ctf.ml rename to lib_eio/core/trace.ml index e6f0d6b28..d4987beb4 100644 --- a/lib_eio/core/ctf.ml +++ b/lib_eio/core/trace.ml @@ -47,27 +47,8 @@ let mint_id () = Domain.DLS.set next_id_key next_id_local_succ; next_id_local -type hiatus_reason = - | Wait_for_work - | Suspend - | Hibernate - -type event = - | Wait - | Task - | Bind - | Try - | Choose - | Pick - | Join - | Map - | Condition - | On_success - | On_failure - | On_termination - | On_any - | Ignore_result - | Async +type ty = + | Fiber | Promise | Semaphore | Switch @@ -80,21 +61,7 @@ let current_thread = ref (-1) let int_of_thread_type t = match t with - | Wait -> 0 - | Task -> 1 - | Bind -> 2 - | Try -> 3 - | Choose -> 4 - | Pick -> 5 - | Join -> 6 - | Map -> 7 - | Condition -> 8 - | On_success -> 9 - | On_failure -> 10 - | On_termination -> 11 - | On_any -> 12 - | Ignore_result -> 13 - | Async -> 14 + | Fiber -> 1 | Promise -> 15 | Semaphore -> 16 | Switch -> 17 @@ -207,12 +174,12 @@ module Control = struct let op_fails = 3 (* let op_becomes = 4 *) let op_label = 5 - let op_increase = 6 + (* let op_increase = 6 *) let op_switch = 7 (* let op_gc = 8 *) (* let op_old_signal = 9 *) let op_try_read = 10 - let op_counter_value = 11 + (* let op_counter_value = 11 *) let op_read_later = 12 let op_signal = 13 @@ -329,20 +296,6 @@ module Control = struct |> write_string log.log msg |> end_event - let note_increase log counter amount = - add_event log op_increase (17 + String.length counter) - |> write_tid log.log !current_thread - |> write64 log.log (Int64.of_int amount) - |> write_string log.log counter - |> end_event - - let note_counter_value log counter value = - add_event log op_counter_value (17 + String.length counter) - |> write_tid log.log !current_thread - |> write64 log.log (Int64.of_int value) - |> write_string log.log counter - |> end_event - let note_switch log new_current = if new_current <> !current_thread then ( current_thread := new_current; @@ -397,42 +350,34 @@ let label name = | None -> () | Some log -> Control.note_label log !current_thread name -let note_fork () = - let child = mint_id () in - begin match !Control.event_log with - | None -> () - | Some log -> Control.note_created log child Task - end; - child - -let note_created ?label id ty = +let create ?label id ty = match !Control.event_log with | None -> () | Some log -> Control.note_created log id ty; Option.iter (Control.note_label log id) label -let note_switch new_current = +let fiber new_current = match !Control.event_log with | None -> () | Some log -> Control.note_switch log new_current -let note_hiatus _reason = +let hiatus () = match !Control.event_log with | None -> () | Some log -> Control.note_suspend log () -let note_resume new_current = +let resume new_current = match !Control.event_log with | None -> () | Some log -> Control.note_switch log new_current -let note_try_read input = +let try_read input = match !Control.event_log with | None -> () | Some log -> Control.note_try_read log !current_thread input -let note_read ?reader input = +let read ?reader input = match !Control.event_log with | None -> () | Some log -> @@ -443,12 +388,12 @@ let note_read ?reader input = in Control.note_read log ~reader input -let note_resolved id ~ex = +let resolve id ~ex = match !Control.event_log with | None -> () | Some log -> Control.note_resolved log id ~ex -let note_signal ?src dst = +let signal ?src dst = match !Control.event_log with | None -> () | Some log -> @@ -458,18 +403,3 @@ let note_signal ?src dst = | Some x -> x in Control.note_signal ~src log dst - -let note_increase counter amount = - match !Control.event_log with - | None -> () - | Some log -> Control.note_increase log counter amount - -let note_counter_value counter value = - match !Control.event_log with - | None -> () - | Some log -> Control.note_counter_value log counter value - -let should_resolve thread = - match !Control.event_log with - | None -> () - | Some log -> Control.note_label log thread "__should_resolve" (* Hack! *) diff --git a/lib_eio/core/trace.mli b/lib_eio/core/trace.mli new file mode 100644 index 000000000..6e50b0cf1 --- /dev/null +++ b/lib_eio/core/trace.mli @@ -0,0 +1,78 @@ +(** This library is used to write event traces in mirage-profile's CTF format. *) + +type id = private int +(** Each thread/fiber/promise is identified by a unique ID. *) + +(** {2 Recording events} + Libraries and applications can use these functions to make the traces more useful. *) + +val label : string -> unit +(** [label msg] attaches text [msg] to the current thread. *) + +(** {2 Recording system events} + These are normally only called by the scheduler. *) + +type ty = + | Fiber + | Promise + | Semaphore + | Switch + | Stream + | Mutex +(** Types of recorded objects. *) + +val mint_id : unit -> id +(** [mint_id ()] is a fresh unique [id]. *) + +val create : ?label:string -> id -> ty -> unit +(** [create t id ty] records the creation of [id]. *) + +val read : ?reader:id -> id -> unit +(** [read src] records that promise [src]'s value was read. + @param reader The thread doing the read (default is the current thread). *) + +val try_read : id -> unit +(** [try_read src] records that the current thread wants to read from [src] (which is not currently ready). *) + +val fiber : id -> unit +(** [fiber id] records that fiber [id] is now running. *) + +val hiatus : unit -> unit +(** [hiatus ()] records that the system will sleep for reason [r]. *) + +val resume : id -> unit +(** [resume id] records that the system has resumed (used after {!hiatus}), + and is now running [id]. *) + +val resolve : id -> ex:exn option -> unit +(** [resolve id ~ex] records that [id] is now resolved. + If [ex = None] then [id] was successful, otherwise it failed with exception [ex]. *) + +val signal : ?src:id -> id -> unit +(** [signal ~src dst] records that [dst] was signalled. + @param src The thread sending the signal (default is the current thread). *) + +(** {2 Controlling tracing} *) + +type log_buffer = (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t + +module Control : sig + type t + + val make : timestamper:(log_buffer -> int -> unit) -> log_buffer -> t + (** [make ~timestamper b] is a trace buffer that record events in [b]. + In most cases, the {!Eio_unix.Trace} module provides a simpler interface. *) + + val start : t -> unit + (** [start t] begins recording events in [t]. *) + + val stop : t -> unit + (** [stop t] stops recording to [t] (which must be the current trace buffer). *) +end + +(**/**) + +module BS : sig + val set_int8 : Cstruct.buffer -> int -> int -> unit + val set_int64_le : Cstruct.buffer -> int -> int64 -> unit +end diff --git a/lib_eio/eio_mutex.ml b/lib_eio/eio_mutex.ml index 699ddf36b..18f7a5de5 100644 --- a/lib_eio/eio_mutex.ml +++ b/lib_eio/eio_mutex.ml @@ -6,7 +6,7 @@ type state = exception Poisoned of exn type t = { - id : Ctf.id; + id : Trace.id; mutex : Mutex.t; mutable state : state; (* Owned by [t.mutex] *) waiters : [`Take | `Error of exn] Waiters.t; (* Owned by [t.mutex] *) @@ -19,8 +19,8 @@ type t = { (* {R} t = create () {mutex t R} *) let create () = - let id = Ctf.mint_id () in - Ctf.note_created id Ctf.Mutex; + let id = Trace.mint_id () in + Trace.create id Mutex; { id; mutex = Mutex.create (); @@ -33,7 +33,7 @@ let create () = let unlock t = Mutex.lock t.mutex; (* We now have ownership of [t.state] and [t.waiters]. *) - Ctf.note_signal t.id; + Trace.signal t.id; match t.state with | Unlocked -> Mutex.unlock t.mutex; @@ -55,7 +55,7 @@ let lock t = Mutex.lock t.mutex; match t.state with | Locked -> - Ctf.note_try_read t.id; + Trace.try_read t.id; begin match Waiters.await ~mutex:(Some t.mutex) t.waiters t.id with | `Error ex -> raise ex (* Poisoned; stop waiting *) | `Take -> @@ -64,7 +64,7 @@ let lock t = () end | Unlocked -> - Ctf.note_read t.id; + Trace.read t.id; t.state <- Locked; (* We transfer R from the state to our caller. *) (* {locked t * R} *) Mutex.unlock t.mutex @@ -77,11 +77,11 @@ let try_lock t = Mutex.lock t.mutex; match t.state with | Locked -> - Ctf.note_try_read t.id; + Trace.try_read t.id; Mutex.unlock t.mutex; false | Unlocked -> - Ctf.note_read t.id; + Trace.read t.id; t.state <- Locked; (* We transfer R from the state to our caller. *) Mutex.unlock t.mutex; (* {locked t * R} *) diff --git a/lib_eio/semaphore.ml b/lib_eio/semaphore.ml index 2733be594..8846ec375 100644 --- a/lib_eio/semaphore.ml +++ b/lib_eio/semaphore.ml @@ -1,18 +1,18 @@ type t = { - id : Ctf.id; + id : Trace.id; state : Sem_state.t; } let make n = - let id = Ctf.mint_id () in - Ctf.note_created id Ctf.Semaphore; + let id = Trace.mint_id () in + Trace.create id Semaphore; { id; state = Sem_state.create n; } let release t = - Ctf.note_signal t.id; + Trace.signal t.id; Sem_state.release t.state let acquire t = @@ -24,7 +24,7 @@ let acquire t = match Sem_state.suspend t.state (fun () -> enqueue (Ok ())) with | None -> () (* Already resumed *) | Some request -> - Ctf.note_try_read t.id; + Trace.try_read t.id; match Fiber_context.get_error ctx with | Some ex -> if Sem_state.cancel request then enqueue (Error ex); @@ -36,7 +36,7 @@ let acquire t = ) ) ); - Ctf.note_read t.id + Trace.read t.id let get_value t = max 0 (Atomic.get t.state.state) diff --git a/lib_eio/stream.ml b/lib_eio/stream.ml index 974cfa3b7..eba7514b4 100644 --- a/lib_eio/stream.ml +++ b/lib_eio/stream.ml @@ -2,7 +2,7 @@ module Locking = struct type 'a t = { mutex : Mutex.t; - id : Ctf.id; + id : Trace.id; capacity : int; (* [capacity > 0] *) items : 'a Queue.t; @@ -29,8 +29,8 @@ module Locking = struct let create capacity = assert (capacity > 0); - let id = Ctf.mint_id () in - Ctf.note_created id Ctf.Stream; + let id = Trace.mint_id () in + Trace.create id Stream; { mutex = Mutex.create (); id; diff --git a/lib_eio/tests/ctf.md b/lib_eio/tests/trace.md similarity index 67% rename from lib_eio/tests/ctf.md rename to lib_eio/tests/trace.md index 652b019f2..16f18f72b 100644 --- a/lib_eio/tests/ctf.md +++ b/lib_eio/tests/trace.md @@ -4,7 +4,7 @@ ```ocaml # #require "eio";; # for _ = 1 to 5 do - Printf.printf "%d\n%!" (Eio.Private.Ctf.mint_id () :> int) + Printf.printf "%d\n%!" (Eio.Private.Trace.mint_id () :> int) done;; 1 2 @@ -20,7 +20,7 @@ A new domain gets a new chunk: # Domain.join @@ Domain.spawn (fun () -> for _ = 1 to 5 do - Printf.printf "%d\n%!" (Eio.Private.Ctf.mint_id () :> int) + Printf.printf "%d\n%!" (Eio.Private.Trace.mint_id () :> int) done);; 1024 1025 @@ -34,12 +34,12 @@ When the original domain exhausts its chunk, it jumps to the next free chunk: ```ocaml # for _ = 1 to 1024 - 9 do - Eio.Private.Ctf.mint_id () |> ignore + Eio.Private.Trace.mint_id () |> ignore done;; - : unit = () # for _ = 1 to 5 do - Printf.printf "%d\n%!" (Eio.Private.Ctf.mint_id () :> int) + Printf.printf "%d\n%!" (Eio.Private.Trace.mint_id () :> int) done;; 1021 1022 diff --git a/lib_eio/unix/ctf_unix.ml b/lib_eio/unix/ctf_unix.ml index 9bba82c12..214f9bc80 100644 --- a/lib_eio/unix/ctf_unix.ml +++ b/lib_eio/unix/ctf_unix.ml @@ -1,10 +1,10 @@ open Bigarray -module Ctf = Eio.Private.Ctf +module Trace = Eio.Private.Trace let timestamper log_buffer ofs = let ns = Mtime.to_uint64_ns @@ Mtime_clock.now () in - Ctf.BS.set_int64_le log_buffer ofs ns + Trace.BS.set_int64_le log_buffer ofs ns let mmap_buffer ~size path = let fd = Unix.(openfile path [O_RDWR; O_CREAT; O_TRUNC] 0o644) in @@ -16,6 +16,6 @@ let mmap_buffer ~size path = let with_tracing ?(size=0x100000) path fn = let buffer = mmap_buffer ~size path in - let trace_config = Ctf.Control.make ~timestamper buffer in - Ctf.Control.start trace_config; - Fun.protect fn ~finally:(fun () -> Ctf.Control.stop trace_config) + let trace_config = Trace.Control.make ~timestamper buffer in + Trace.Control.start trace_config; + Fun.protect fn ~finally:(fun () -> Trace.Control.stop trace_config) diff --git a/lib_eio/unix/ctf_unix.mli b/lib_eio/unix/ctf_unix.mli index 7d15fe0e6..7b0a7e2a1 100644 --- a/lib_eio/unix/ctf_unix.mli +++ b/lib_eio/unix/ctf_unix.mli @@ -1,9 +1,9 @@ -val timestamper : Eio.Private.Ctf.log_buffer -> int -> unit +val timestamper : Eio.Private.Trace.log_buffer -> int -> unit (** Uses [Mtime_clock] to write timestamps. *) -val mmap_buffer : size:int -> string -> Eio.Private.Ctf.log_buffer +val mmap_buffer : size:int -> string -> Eio.Private.Trace.log_buffer (** [mmap_buffer ~size path] initialises file [path] as an empty buffer for tracing. *) val with_tracing : ?size:int -> string -> (unit -> 'a) -> 'a (** [with_tracing path fn] is a convenience function that uses {!mmap_buffer} to create a log buffer, - calls {!Ctf.Control.start} to start recording, runs [fn], and then stops recording. *) + calls {!Trace.Control.start} to start recording, runs [fn], and then stops recording. *) diff --git a/lib_eio/unix/eio_unix.ml b/lib_eio/unix/eio_unix.ml index 9e8c978cf..41d2c59fa 100644 --- a/lib_eio/unix/eio_unix.ml +++ b/lib_eio/unix/eio_unix.ml @@ -26,7 +26,7 @@ let run_in_systhread = Private.run_in_systhread module Ipaddr = Net.Ipaddr -module Ctf = Ctf_unix +module Trace = Ctf_unix module Process = Process module Net = Net diff --git a/lib_eio/unix/eio_unix.mli b/lib_eio/unix/eio_unix.mli index f5ec0e1aa..5da4c3272 100644 --- a/lib_eio/unix/eio_unix.mli +++ b/lib_eio/unix/eio_unix.mli @@ -97,6 +97,6 @@ module Private : sig module Fork_action = Fork_action end -module Ctf = Ctf_unix +module Trace = Ctf_unix module Pi = Pi diff --git a/lib_eio/utils/suspended.ml b/lib_eio/utils/suspended.ml index 4bec1f9cc..e12a3f131 100644 --- a/lib_eio/utils/suspended.ml +++ b/lib_eio/utils/suspended.ml @@ -1,7 +1,7 @@ (** A suspended fiber with its context. *) open Effect.Deep -module Ctf = Eio.Private.Ctf +module Trace = Eio.Private.Trace type 'a t = { fiber : Eio.Private.Fiber_context.t; @@ -11,9 +11,9 @@ type 'a t = { let tid t = Eio.Private.Fiber_context.tid t.fiber let continue t v = - Ctf.note_switch (tid t); + Trace.fiber (tid t); continue t.k v let discontinue t ex = - Ctf.note_switch (tid t); + Trace.fiber (tid t); discontinue t.k ex diff --git a/lib_eio/waiters.ml b/lib_eio/waiters.ml index c0cbd4624..9bd4e0208 100644 --- a/lib_eio/waiters.ml +++ b/lib_eio/waiters.ml @@ -47,7 +47,7 @@ let await_internal ~mutex (t:'a t) id ctx enqueue = let resolved_waiter = ref Hook.null in let finished = Atomic.make false in let enqueue x = - Ctf.note_read ~reader:id (Fiber_context.tid ctx); + Trace.read ~reader:id (Fiber_context.tid ctx); enqueue x in let cancel ex = diff --git a/lib_eio/waiters.mli b/lib_eio/waiters.mli index 724cf96e7..f3c39fe0c 100644 --- a/lib_eio/waiters.mli +++ b/lib_eio/waiters.mli @@ -21,7 +21,7 @@ val is_empty : 'a t -> bool val await : mutex:Mutex.t option -> - 'a t -> Ctf.id -> 'a + 'a t -> Trace.id -> 'a (** [await ~mutex t id] suspends the current fiber and adds its continuation to [t]. When the waiter is woken, the fiber is resumed and returns the result. If [t] can be used from multiple domains: @@ -32,7 +32,7 @@ val await : val await_internal : mutex:Mutex.t option -> - 'a t -> Ctf.id -> Fiber_context.t -> + 'a t -> Trace.id -> Fiber_context.t -> (('a, exn) result -> unit) -> unit (** [await_internal ~mutex t id ctx enqueue] is like [await], but the caller has to suspend the fiber. This also allows wrapping the [enqueue] function. diff --git a/lib_eio_linux/eio_linux.ml b/lib_eio_linux/eio_linux.ml index 93abbf218..41c994f90 100644 --- a/lib_eio_linux/eio_linux.ml +++ b/lib_eio_linux/eio_linux.ml @@ -20,7 +20,7 @@ open Eio.Std module Fiber_context = Eio.Private.Fiber_context -module Ctf = Eio.Private.Ctf +module Trace = Eio.Private.Trace module Fd = Eio_unix.Fd module Suspended = Eio_utils.Suspended diff --git a/lib_eio_linux/low_level.ml b/lib_eio_linux/low_level.ml index c1ed4f873..d3a194d24 100644 --- a/lib_eio_linux/low_level.ml +++ b/lib_eio_linux/low_level.ml @@ -2,7 +2,7 @@ open Eio.Std -module Ctf = Eio.Private.Ctf +module Trace = Eio.Private.Trace module Fd = Eio_unix.Fd type dir_fd = @@ -20,12 +20,12 @@ let file_offset t = function let enqueue_read st action (file_offset,fd,buf,len) = let req = { Sched.op=`R; file_offset; len; fd; cur_off = 0; buf; action } in - Ctf.label "read"; + Trace.label "read"; Sched.submit_rw_req st req let rec enqueue_writev args st action = let (file_offset,fd,bufs) = args in - Ctf.label "writev"; + Trace.label "writev"; let retry = Sched.with_cancel_hook ~action st (fun () -> Uring.writev st.uring ~file_offset fd bufs (Job action) ) @@ -35,11 +35,11 @@ let rec enqueue_writev args st action = let enqueue_write st action (file_offset,fd,buf,len) = let req = { Sched.op=`W; file_offset; len; fd; cur_off = 0; buf; action } in - Ctf.label "write"; + Trace.label "write"; Sched.submit_rw_req st req let rec enqueue_splice ~src ~dst ~len st action = - Ctf.label "splice"; + Trace.label "splice"; let retry = Sched.with_cancel_hook ~action st (fun () -> Uring.splice st.uring (Job action) ~src ~dst ~len ) @@ -48,7 +48,7 @@ let rec enqueue_splice ~src ~dst ~len st action = Queue.push (fun st -> enqueue_splice ~src ~dst ~len st action) st.io_q let rec enqueue_openat2 ((access, flags, perm, resolve, fd, path) as args) st action = - Ctf.label "openat2"; + Trace.label "openat2"; let retry = Sched.with_cancel_hook ~action st (fun () -> Uring.openat2 st.uring ~access ~flags ~perm ~resolve ?fd path (Job action) ) @@ -57,7 +57,7 @@ let rec enqueue_openat2 ((access, flags, perm, resolve, fd, path) as args) st ac Queue.push (fun st -> enqueue_openat2 args st action) st.io_q let rec enqueue_statx ((fd, path, buf, flags, mask) as args) st action = - Ctf.label "statx"; + Trace.label "statx"; let retry = Sched.with_cancel_hook ~action st (fun () -> Uring.statx st.uring ?fd ~mask path buf flags (Job action) ) @@ -66,7 +66,7 @@ let rec enqueue_statx ((fd, path, buf, flags, mask) as args) st action = Queue.push (fun st -> enqueue_statx args st action) st.io_q let rec enqueue_unlink ((dir, fd, path) as args) st action = - Ctf.label "unlinkat"; + Trace.label "unlinkat"; let retry = Sched.with_cancel_hook ~action st (fun () -> Uring.unlink st.uring ~dir ~fd path (Job action) ) @@ -75,7 +75,7 @@ let rec enqueue_unlink ((dir, fd, path) as args) st action = Queue.push (fun st -> enqueue_unlink args st action) st.io_q let rec enqueue_connect fd addr st action = - Ctf.label "connect"; + Trace.label "connect"; let retry = Sched.with_cancel_hook ~action st (fun () -> Uring.connect st.uring fd addr (Job action) ) @@ -84,7 +84,7 @@ let rec enqueue_connect fd addr st action = Queue.push (fun st -> enqueue_connect fd addr st action) st.io_q let rec enqueue_send_msg fd ~fds ~dst buf st action = - Ctf.label "send_msg"; + Trace.label "send_msg"; let retry = Sched.with_cancel_hook ~action st (fun () -> Uring.send_msg st.uring fd ~fds ?dst buf (Job action) ) @@ -93,7 +93,7 @@ let rec enqueue_send_msg fd ~fds ~dst buf st action = Queue.push (fun st -> enqueue_send_msg fd ~fds ~dst buf st action) st.io_q let rec enqueue_recv_msg fd msghdr st action = - Ctf.label "recv_msg"; + Trace.label "recv_msg"; let retry = Sched.with_cancel_hook ~action st (fun () -> Uring.recv_msg st.uring fd msghdr (Job action); ) @@ -102,7 +102,7 @@ let rec enqueue_recv_msg fd msghdr st action = Queue.push (fun st -> enqueue_recv_msg fd msghdr st action) st.io_q let rec enqueue_accept fd client_addr st action = - Ctf.label "accept"; + Trace.label "accept"; let retry = Sched.with_cancel_hook ~action st (fun () -> Uring.accept st.uring fd client_addr (Job action) ) in @@ -112,7 +112,7 @@ let rec enqueue_accept fd client_addr st action = ) let rec enqueue_noop t action = - Ctf.label "noop"; + Trace.label "noop"; let job = Sched.enqueue_job t (fun () -> Uring.noop t.uring (Job_no_cancel action)) in if job = None then ( (* wait until an sqe is available *) @@ -147,7 +147,7 @@ let read_upto ?file_offset fd buf len = let rec enqueue_readv args st action = let (file_offset,fd,bufs) = args in - Ctf.label "readv"; + Trace.label "readv"; let retry = Sched.with_cancel_hook ~action st (fun () -> Uring.readv st.uring ~file_offset fd bufs (Job action)) in @@ -465,7 +465,7 @@ let shutdown socket command = | Unix.Unix_error (code, name, arg) -> raise @@ Err.wrap code name arg let accept ~sw fd = - Ctf.label "accept"; + Trace.label "accept"; Fd.use_exn "accept" fd @@ fun fd -> let client_addr = Uring.Sockaddr.create () in let res = Sched.enter (enqueue_accept fd client_addr) in diff --git a/lib_eio_linux/sched.ml b/lib_eio_linux/sched.ml index af6c58e4d..27b90ba26 100644 --- a/lib_eio_linux/sched.ml +++ b/lib_eio_linux/sched.ml @@ -3,13 +3,13 @@ open Eio.Std module Fiber_context = Eio.Private.Fiber_context -module Ctf = Eio.Private.Ctf +module Trace = Eio.Private.Trace module Suspended = Eio_utils.Suspended module Zzz = Eio_utils.Zzz module Lf_queue = Eio_utils.Lf_queue -let system_thread = Ctf.mint_id () +let system_thread = Trace.mint_id () let statx_works = ref false (* Before Linux 5.18, statx is unreliable *) @@ -119,7 +119,7 @@ let rec enqueue_job t fn = (* Cancellations always come from the same domain, so no need to send wake events here. *) let rec enqueue_cancel job t = - Ctf.label "cancel"; + Trace.label "cancel"; match enqueue_job t (fun () -> Uring.cancel t.uring job Cancel_job) with | None -> Queue.push (fun t -> enqueue_cancel job t) t.io_q | Some _ -> () @@ -162,7 +162,7 @@ let submit_pending_io st = match Queue.take_opt st.io_q with | None -> () | Some fn -> - Ctf.label "submit_pending_io"; + Trace.label "submit_pending_io"; fn st let rec submit_rw_req st ({op; file_offset; fd; buf; len; cur_off; action} as req) = @@ -183,7 +183,7 @@ let rec submit_rw_req st ({op; file_offset; fd; buf; len; cur_off; action} as re ) in if retry then ( - Ctf.label "await-sqe"; + Trace.label "await-sqe"; (* wait until an sqe is available *) Queue.push (fun st -> submit_rw_req st req) io_q ) @@ -244,9 +244,9 @@ let rec schedule ({run_q; sleep_q; mem_q; uring; _} as st) : [`Exit_scheduler] = (* At this point we're not going to check [run_q] again before sleeping. If [need_wakeup] is still [true], this is fine because we don't promise to do that. If [need_wakeup = false], a wake-up event will arrive and wake us up soon. *) - Ctf.(note_hiatus Wait_for_work); + Trace.hiatus (); let result = Uring.wait ?timeout uring in - Ctf.note_resume system_thread; + Trace.resume system_thread; Atomic.set st.need_wakeup false; Lf_queue.push run_q IO; (* Re-inject IO job in the run queue *) match result with @@ -338,7 +338,7 @@ let free_buf st buf = | Some k -> enqueue_thread st k buf let rec enqueue_poll_add fd poll_mask st action = - Ctf.label "poll_add"; + Trace.label "poll_add"; let retry = with_cancel_hook ~action st (fun () -> Uring.poll_add st.uring fd poll_mask (Job action) ) @@ -347,7 +347,7 @@ let rec enqueue_poll_add fd poll_mask st action = Queue.push (fun st -> enqueue_poll_add fd poll_mask st action) st.io_q let rec enqueue_poll_add_unix fd poll_mask st action cb = - Ctf.label "poll_add"; + Trace.label "poll_add"; let retry = with_cancel_hook ~action st (fun () -> Uring.poll_add st.uring fd poll_mask (Job_fn (action, cb)) ) @@ -357,7 +357,7 @@ let rec enqueue_poll_add_unix fd poll_mask st action cb = let rec enqueue_readv args st action = let (file_offset,fd,bufs) = args in - Ctf.label "readv"; + Trace.label "readv"; let retry = with_cancel_hook ~action st (fun () -> Uring.readv st.uring ~file_offset fd bufs (Job action)) in @@ -388,7 +388,7 @@ let monitor_event_fd t = let run ~extra_effects st main arg = let rec fork ~new_fiber:fiber fn = let open Effect.Deep in - Ctf.note_switch (Fiber_context.tid fiber); + Trace.fiber (Fiber_context.tid fiber); match_with fn () { retc = (fun () -> Fiber_context.destroy fiber; schedule st); exnc = (fun ex -> diff --git a/lib_eio_linux/tests/test.ml b/lib_eio_linux/tests/test.ml index 4e85f1b99..d0b22cca4 100644 --- a/lib_eio_linux/tests/test.ml +++ b/lib_eio_linux/tests/test.ml @@ -1,6 +1,6 @@ open Eio.Std -module Ctf = Eio.Private.Ctf +module Trace = Eio.Private.Trace let () = Logs.(set_level ~all:true (Some Debug)); @@ -78,8 +78,8 @@ let test_direct_copy () = let buffer = Buffer.create 20 in let to_output = Eio.Flow.buffer_sink buffer in Switch.run (fun sw -> - Fiber.fork ~sw (fun () -> Ctf.label "copy1"; Eio.Flow.copy from_pipe1 to_pipe2; Eio.Flow.close to_pipe2); - Fiber.fork ~sw (fun () -> Ctf.label "copy2"; Eio.Flow.copy from_pipe2 to_output); + Fiber.fork ~sw (fun () -> Trace.label "copy1"; Eio.Flow.copy from_pipe1 to_pipe2; Eio.Flow.close to_pipe2); + Fiber.fork ~sw (fun () -> Trace.label "copy2"; Eio.Flow.copy from_pipe2 to_output); Eio.Flow.copy (Eio.Flow.string_source msg) to_pipe1; Eio.Flow.close to_pipe1; ); diff --git a/lib_eio_posix/sched.ml b/lib_eio_posix/sched.ml index 7ff5ee85e..0fadc8308 100644 --- a/lib_eio_posix/sched.ml +++ b/lib_eio_posix/sched.ml @@ -18,13 +18,13 @@ module Suspended = Eio_utils.Suspended module Zzz = Eio_utils.Zzz module Lf_queue = Eio_utils.Lf_queue module Fiber_context = Eio.Private.Fiber_context -module Ctf = Eio.Private.Ctf +module Trace = Eio.Private.Trace module Rcfd = Eio_unix.Private.Rcfd module Poll = Iomux.Poll type exit = [`Exit_scheduler] -let system_thread = Ctf.mint_id () +let system_thread = Trace.mint_id () (* The type of items in the run queue. *) type runnable = @@ -209,12 +209,12 @@ let rec next t : [`Exit_scheduler] = (* At this point we're not going to check [run_q] again before sleeping. If [need_wakeup] is still [true], this is fine because we don't promise to do that. If [need_wakeup = false], a wake-up event will arrive and wake us up soon. *) - Ctf.(note_hiatus Wait_for_work); + Trace.hiatus (); let nready = try Poll.ppoll_or_poll t.poll (t.poll_maxi + 1) timeout with Unix.Unix_error (Unix.EINTR, _, "") -> 0 in - Ctf.note_resume system_thread; + Trace.fiber system_thread; Atomic.set t.need_wakeup false; Lf_queue.push t.run_q IO; (* Re-inject IO job in the run queue *) Poll.iter_ready t.poll nready (ready t); @@ -325,7 +325,7 @@ let enter fn = Effect.perform (Enter fn) let run ~extra_effects t main x = let rec fork ~new_fiber:fiber fn = let open Effect.Deep in - Ctf.note_switch (Fiber_context.tid fiber); + Trace.fiber (Fiber_context.tid fiber); match_with fn () { retc = (fun () -> Fiber_context.destroy fiber; next t); exnc = (fun ex -> diff --git a/lib_eio_windows/sched.ml b/lib_eio_windows/sched.ml index 0cc587c9e..eb28a0884 100755 --- a/lib_eio_windows/sched.ml +++ b/lib_eio_windows/sched.ml @@ -18,12 +18,12 @@ module Suspended = Eio_utils.Suspended module Zzz = Eio_utils.Zzz module Lf_queue = Eio_utils.Lf_queue module Fiber_context = Eio.Private.Fiber_context -module Ctf = Eio.Private.Ctf +module Trace = Eio.Private.Trace module Rcfd = Eio_unix.Private.Rcfd type exit = [`Exit_scheduler] -let system_thread = Ctf.mint_id () +let system_thread = Trace.mint_id () (* The type of items in the run queue. *) type runnable = @@ -204,14 +204,14 @@ let rec next t : [`Exit_scheduler] = (* At this point we're not going to check [run_q] again before sleeping. If [need_wakeup] is still [true], this is fine because we don't promise to do that. If [need_wakeup = false], a wake-up event will arrive and wake us up soon. *) - Ctf.(note_hiatus Wait_for_work); + Trace.hiatus (); let cons fd acc = fd :: acc in let read = FdSet.fold cons t.poll.to_read [] in let write = FdSet.fold cons t.poll.to_write [] in match Unix.select read write [] timeout with | exception Unix.(Unix_error (EINTR, _, _)) -> next t | readable, writeable, _ -> - Ctf.note_resume system_thread; + Trace.resume system_thread; Atomic.set t.need_wakeup false; Lf_queue.push t.run_q IO; (* Re-inject IO job in the run queue *) List.iter (ready t [ `W ]) writeable; @@ -317,7 +317,7 @@ let enter fn = Effect.perform (Enter fn) let run ~extra_effects t main x = let rec fork ~new_fiber:fiber fn = let open Effect.Deep in - Ctf.note_switch (Fiber_context.tid fiber); + Trace.fiber (Fiber_context.tid fiber); match_with fn () { retc = (fun () -> Fiber_context.destroy fiber; next t); exnc = (fun ex -> diff --git a/tests/nounix/nounix.ml b/tests/nounix/nounix.ml index 4c0c79dc0..3f855d236 100644 --- a/tests/nounix/nounix.ml +++ b/tests/nounix/nounix.ml @@ -1,9 +1,9 @@ (* This module also checks that Eio doesn't pull in a dependency on Unix. See the [dune] file. *) -module Ctf = Eio.Private.Ctf +module Trace = Eio.Private.Trace let () = let bs = Cstruct.create 8 in - Ctf.BS.set_int64_le bs.buffer 0 1234L; + Trace.BS.set_int64_le bs.buffer 0 1234L; assert (Cstruct.LE.get_uint64 bs 0 = 1234L) diff --git a/tests/sync.md b/tests/sync.md index 2b8df4ba7..f39d5af31 100644 --- a/tests/sync.md +++ b/tests/sync.md @@ -7,7 +7,7 @@ ```ocaml open Eio.Std -module Ctf = Eio.Private.Ctf +module Trace = Eio.Private.Trace let pp_promise pp f x = match Promise.peek x with @@ -96,10 +96,10 @@ Basic semaphore tests: let running = ref 0 in let sem = Semaphore.make 2 in let fork = Fiber.fork_promise ~sw in - let a = fork (fun () -> Ctf.label "a"; Semaphore.acquire sem; incr running) in - let b = fork (fun () -> Ctf.label "b"; Semaphore.acquire sem; incr running) in - let c = fork (fun () -> Ctf.label "c"; Semaphore.acquire sem; incr running) in - let d = fork (fun () -> Ctf.label "d"; Semaphore.acquire sem; incr running) in + let a = fork (fun () -> Trace.label "a"; Semaphore.acquire sem; incr running) in + let b = fork (fun () -> Trace.label "b"; Semaphore.acquire sem; incr running) in + let c = fork (fun () -> Trace.label "c"; Semaphore.acquire sem; incr running) in + let d = fork (fun () -> Trace.label "d"; Semaphore.acquire sem; incr running) in traceln "Semaphore means that only %d threads are running" !running; Promise.await_exn a; Promise.await_exn b; @@ -132,8 +132,8 @@ Releasing a semaphore when no-one is waiting for it: let sem = Semaphore.make 0 in Semaphore.release sem; (* Release with free-counter *) traceln "Initial config: %d" (Semaphore.get_value sem); - Fiber.fork ~sw (fun () -> Ctf.label "a"; Semaphore.acquire sem); - Fiber.fork ~sw (fun () -> Ctf.label "b"; Semaphore.acquire sem); + Fiber.fork ~sw (fun () -> Trace.label "a"; Semaphore.acquire sem); + Fiber.fork ~sw (fun () -> Trace.label "b"; Semaphore.acquire sem); traceln "A running: %d" (Semaphore.get_value sem); Semaphore.release sem; (* Release with a non-empty wait-queue *) traceln "Now b running: %d" (Semaphore.get_value sem);