Skip to content

Commit

Permalink
Eio.Executor_pool
Browse files Browse the repository at this point in the history
  • Loading branch information
SGrondin committed Nov 6, 2023
1 parent bc1e231 commit e6e2550
Show file tree
Hide file tree
Showing 5 changed files with 379 additions and 0 deletions.
1 change: 1 addition & 0 deletions lib_eio/eio.ml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ module Condition = Condition
module Stream = Stream
module Lazy = Lazy
module Pool = Pool
module Executor_pool = Executor_pool
module Exn = Exn
module Resource = Resource
module Buf_read = Buf_read
Expand Down
3 changes: 3 additions & 0 deletions lib_eio/eio.mli
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ module Pool = Pool
(** Cancelling fibers. *)
module Cancel = Eio__core.Cancel

(** A high-level domain task pool *)
module Executor_pool = Executor_pool

(** Commonly used standard features. This module is intended to be [open]ed. *)
module Std = Std

Expand Down
73 changes: 73 additions & 0 deletions lib_eio/executor_pool.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
type job = Pack : (unit -> 'a) * ('a, exn) Result.t Promise.u -> job

(* Worker: 1 domain/thread
m jobs per worker, n domains per executor_pool *)

type t = {
stream: job Stream.t;
}

(* This function is the core of executor_pool.ml.
Each worker recursively calls [loop ()] until the [terminating]
promise is resolved. Workers pull one job at a time from the Stream. *)
let start_worker ~limit stream =
Switch.run @@ fun sw ->
let capacity = Semaphore.make limit in
let run_job job w =
Fiber.fork ~sw (fun () ->
Promise.resolve w
(try Ok (job ()) with
| exn -> Error exn);
Semaphore.release capacity )
in
(* The main worker loop. *)
let rec loop () =
Semaphore.acquire capacity;
let actions = Stream.take stream in
match actions with
| Pack (job, w) ->
(* Give a chance to other domains to start waiting on the Stream before the current thread blocks on [Stream.take] again. *)
Fiber.yield ();
run_job job w;
(loop [@tailcall]) ()
in
loop ()

(* Start a new domain. The worker will need a switch, then we start the worker. *)
let start_domain ~sw ~domain_mgr ~limit stream =
let go () =
Domain_manager.run domain_mgr (fun () -> start_worker ~limit stream )
in
(* Executor_pools run as daemons to not hold the user's switch from completing.
It's up to the user to hold the switch open (and thus, the executor_pool)
by blocking on the jobs issued to the executor_pool. *)
Fiber.fork_daemon ~sw (fun () ->
let _ = go () in
`Stop_daemon )

let create ~sw ~domain_count ~domain_concurrency domain_mgr =
let stream = Stream.create 0 in
let instance = { stream; } in
for _ = 1 to domain_count do
start_domain ~sw ~domain_mgr ~limit:domain_concurrency stream
done;
instance

let enqueue { stream } f =
let p, w = Promise.create () in
Stream.add stream (Pack (f, w));
p

let submit_fork ~sw instance f =
Fiber.fork_promise ~sw (fun () ->
enqueue instance f
|> Promise.await_exn )

let submit instance f =
enqueue instance f
|> Promise.await

let submit_exn instance f =
match submit instance f with
| Ok x -> x
| Error exn -> raise exn
22 changes: 22 additions & 0 deletions lib_eio/executor_pool.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
type t

(** Creates a new executorpool with [domain_count].
[domain_concurrency] is the maximum number of jobs that each domain can run at a time.
The executorpool will not block the [~sw] Switch from completing. *)
val create :
sw:Switch.t ->
domain_count:int ->
domain_concurrency:int ->
_ Domain_manager.t ->
t

(** Run a job on this executorpool. It is placed at the end of the queue. *)
val submit : t -> (unit -> 'a) -> ('a, exn) result

(** Same as [submit] but raises if the job failed. *)
val submit_exn : t -> (unit -> 'a) -> 'a

(** Same as [submit] but returns immediately, without blocking. *)
val submit_fork : sw:Switch.t -> t -> (unit -> 'a) -> ('a, exn) result Promise.t
280 changes: 280 additions & 0 deletions tests/executor_pool.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
# Setting up the environment

```ocaml
# #require "eio_main";;
# #require "eio.mock";;
```

Creating some useful helper functions

```ocaml
open Eio.Std
module Executor_pool = Eio.Executor_pool
let () = Eio.Exn.Backend.show := false
let run fn =
Eio_mock.Backend.run @@ fun () ->
Eio_mock.Domain_manager.run @@ fun mgr ->
let clock = Eio_mock.Clock.make () in
let sleep ms =
let t0 = Eio.Time.now clock in
let t1 = t0 +. ms in
traceln "Sleeping %.0f: %.0f -> %.0f" ms t0 t1;
Fiber.both
(fun () -> Eio.Time.sleep_until clock t1)
(fun () ->
Fiber.yield ();
Fiber.yield ();
Fiber.yield ();
Fiber.yield ();
Fiber.yield ();
Fiber.yield ();
Fiber.yield ();
if Float.(Eio.Time.now clock <> t1) then
Eio_mock.Clock.advance clock)
in
let duration expected f =
let t0 = Eio.Time.now clock in
let res = f () in
let t1 = Eio.Time.now clock in
let actual = t1 -. t0 in
if Float.(actual = expected)
then (traceln "Duration (valid): %.0f" expected; res)
else failwith (Format.sprintf "Duration was not %.0f: %.0f" expected actual)
in
fn mgr sleep duration
```

# Concurrency

Runs jobs in parallel as much as possible (domains):

```ocaml
# run @@ fun mgr sleep duration ->
Switch.run @@ fun sw ->
let total = ref 0 in
let ep = Executor_pool.create ~sw ~domain_count:2 ~domain_concurrency:1 mgr in
duration 150. (fun () ->
List.init 5 (fun i -> i + 1)
|> Fiber.List.iter (fun i -> Executor_pool.submit_exn ep (fun () ->
sleep 50.;
total := !total + i
));
!total
);;
+[1] Sleeping 50: 0 -> 50
+[2] Sleeping 50: 0 -> 50
+[1] mock time is now 50
+[1] Sleeping 50: 50 -> 100
+[2] Sleeping 50: 50 -> 100
+[1] mock time is now 100
+[1] Sleeping 50: 100 -> 150
+[1] mock time is now 150
+[0] Duration (valid): 150
- : int = 15
```

Runs jobs in parallel as much as possible (workers):

```ocaml
# run @@ fun mgr sleep duration ->
Switch.run @@ fun sw ->
let total = ref 0 in
let ep = Executor_pool.create ~sw ~domain_count:1 ~domain_concurrency:2 mgr in
duration 150. (fun () ->
List.init 5 (fun i -> i + 1)
|> Fiber.List.iter (fun i -> Executor_pool.submit_exn ep (fun () ->
sleep 50.;
total := !total + i
));
!total
);;
+[1] Sleeping 50: 0 -> 50
+[1] Sleeping 50: 0 -> 50
+[1] mock time is now 50
+[1] Sleeping 50: 50 -> 100
+[1] Sleeping 50: 50 -> 100
+[1] mock time is now 100
+[1] Sleeping 50: 100 -> 150
+[1] mock time is now 150
+[0] Duration (valid): 150
- : int = 15
```

Runs jobs in parallel as much as possible (both):

```ocaml
# run @@ fun mgr sleep duration ->
Switch.run @@ fun sw ->
let total = ref 0 in
let ep = Executor_pool.create ~sw ~domain_count:2 ~domain_concurrency:2 mgr in
duration 100. (fun () ->
List.init 5 (fun i -> i + 1)
|> Fiber.List.iter (fun i -> Executor_pool.submit_exn ep (fun () ->
sleep 50.;
total := !total + i
));
!total
);;
+[1] Sleeping 50: 0 -> 50
+[2] Sleeping 50: 0 -> 50
+[1] Sleeping 50: 0 -> 50
+[2] Sleeping 50: 0 -> 50
+[1] mock time is now 50
+[1] Sleeping 50: 50 -> 100
+[1] mock time is now 100
+[0] Duration (valid): 100
- : int = 15
```

# Job error handling

`Executor_pool.submit` returns a Result:

```ocaml
# run @@ fun mgr sleep duration ->
Switch.run @@ fun sw ->
let total = ref 0 in
let ep = Executor_pool.create ~sw ~domain_count:1 ~domain_concurrency:4 mgr in
duration 100. (fun () ->
let results =
List.init 5 (fun i -> i + 1)
|> Fiber.List.map (fun i -> Executor_pool.submit ep (fun () ->
sleep 50.;
if i mod 2 = 0
then failwith (Int.to_string i)
else (let x = !total in total := !total + i; x)
))
in
results, !total
);;
+[1] Sleeping 50: 0 -> 50
+[1] Sleeping 50: 0 -> 50
+[1] Sleeping 50: 0 -> 50
+[1] Sleeping 50: 0 -> 50
+[1] mock time is now 50
+[1] Sleeping 50: 50 -> 100
+[1] mock time is now 100
+[0] Duration (valid): 100
- : (int, exn) result list * int =
([Ok 0; Error (Failure "2"); Ok 1; Error (Failure "4"); Ok 4], 9)
```

`Executor_pool.submit_exn` raises:

```ocaml
# run @@ fun mgr sleep duration ->
Switch.run @@ fun sw ->
let total = ref 0 in
let ep = Executor_pool.create ~sw ~domain_count:1 ~domain_concurrency:2 mgr in
List.init 5 (fun i -> i + 1)
|> Fiber.List.map (fun i -> Executor_pool.submit_exn ep (fun () ->
traceln "Started %d" i;
let x = !total in
total := !total + i;
if x = 3
then failwith (Int.to_string i)
else x
));;
+[1] Started 1
+[1] Started 2
+[1] Started 3
+[1] Started 4
Exception: Failure "3".
```

# Blocking for capacity

`Executor_pool.submit` will block waiting for room in the queue:

```ocaml
# run @@ fun mgr sleep duration ->
Switch.run @@ fun sw ->
let ep = Executor_pool.create ~sw ~domain_count:1 ~domain_concurrency:1 mgr in
let p1 = Fiber.fork_promise ~sw (fun () -> Executor_pool.submit_exn ep (fun () -> sleep 50.)) in
duration 50. (fun () -> Executor_pool.submit_exn ep @@ fun () -> ());
duration 0. (fun () -> Promise.await_exn p1)
;;
+[1] Sleeping 50: 0 -> 50
+[1] mock time is now 50
+[0] Duration (valid): 50
+[0] Duration (valid): 0
- : unit = ()
```

`Executor_pool.submit_fork` will not block if there's not enough room in the queue:

```ocaml
# run @@ fun mgr sleep duration ->
Switch.run @@ fun sw ->
let ep = Executor_pool.create ~sw ~domain_count:1 ~domain_concurrency:1 mgr in
let p1 = duration 0. (fun () ->
Fiber.fork_promise ~sw (fun () -> Executor_pool.submit_exn ep (fun () -> sleep 50.))
)
in
let p2 = duration 0. (fun () ->
Fiber.fork_promise ~sw (fun () -> Executor_pool.submit_exn ep (fun () -> sleep 50.))
)
in
let p3 = duration 0. (fun () ->
Executor_pool.submit_fork ~sw ep (fun () -> ())
)
in
duration 100. (fun () ->
Promise.await_exn p1;
Promise.await_exn p2;
Promise.await_exn p3;
(* Value restriction :( *)
Promise.create_resolved (Ok ())
)
|> Promise.await_exn
;;
+[0] Duration (valid): 0
+[0] Duration (valid): 0
+[0] Duration (valid): 0
+[1] Sleeping 50: 0 -> 50
+[1] mock time is now 50
+[1] Sleeping 50: 50 -> 100
+[1] mock time is now 100
+[0] Duration (valid): 100
- : unit = ()
```

# Checks switch status

```ocaml
# run @@ fun mgr sleep duration ->
let leak = ref None in
let count = ref 0 in
let () =
try (
Switch.run @@ fun sw ->
let ep = Executor_pool.create ~sw ~domain_count:1 ~domain_concurrency:1 mgr in
leak := Some ep;
let p1 = duration 0. (fun () ->
Fiber.fork_promise ~sw (fun () -> Executor_pool.submit_exn ep (fun () -> sleep 50.; incr count))
)
in
Switch.fail sw (Failure "Abort mission!");
Promise.await_exn p1;
Executor_pool.submit_exn ep (fun () -> sleep 50.; incr count) )
with _ -> ()
in
match !leak with
| None -> assert false
| Some ep ->
Executor_pool.submit_exn ep (fun () -> sleep 50.; incr count);
traceln "Count: %d" !count
+[0] Duration (valid): 0
Exception: Eio_mock__Backend.Deadlock_detected.
```

0 comments on commit e6e2550

Please sign in to comment.