Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CP-32622: avoid using select and instead use epoll #4877

Closed
1 change: 1 addition & 0 deletions ocaml/message-switch/unix/dune
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
(libraries
cohttp
message-switch-core
polly
rpclib.core
rpclib.json
threads.posix
Expand Down
37 changes: 21 additions & 16 deletions ocaml/message-switch/unix/protocol_unix_scheduler.ml
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,15 @@ module Delay = struct
(* Concrete type is the ends of a pipe *)
type t = {
(* A pipe is used to wake up a thread blocked in wait: *)
mutable pipe_out: Unix.file_descr option
; mutable pipe_in: Unix.file_descr option
mutable sock_out: Unix.file_descr option
; mutable sock_in: Unix.file_descr option
; (* Indicates that a signal arrived before a wait: *)
mutable signalled: bool
; m: Mutex.t
}

let make () =
{pipe_out= None; pipe_in= None; signalled= false; m= Mutex.create ()}
{sock_out= None; sock_in= None; signalled= false; m= Mutex.create ()}

exception Pre_signalled

Expand All @@ -59,39 +59,44 @@ module Delay = struct
finally'
(fun () ->
try
let pipe_out =
let sock_out =
Mutex.execute x.m (fun () ->
if x.signalled then (
x.signalled <- false ;
raise Pre_signalled
) ;
let pipe_out, pipe_in = Unix.pipe () in
let sock_out, sock_in =
Unix.socketpair Unix.PF_UNIX Unix.SOCK_STREAM 0
in
(* these will be unconditionally closed on exit *)
to_close := [pipe_out; pipe_in] ;
x.pipe_out <- Some pipe_out ;
x.pipe_in <- Some pipe_in ;
to_close := [sock_out; sock_in] ;
x.sock_out <- Some sock_out ;
x.sock_in <- Some sock_in ;
x.signalled <- false ;
pipe_out
sock_out
)
in
let r, _, _ = Unix.select [pipe_out] [] [] seconds in
(* flush the single byte from the pipe *)
if r <> [] then ignore (Unix.read pipe_out (Bytes.create 1) 0 1) ;
(* flush the single byte from the socket *)
Unix.setsockopt_float sock_out Unix.SO_RCVTIMEO seconds ;
(* return true if we waited the full length of time, false if we were woken *)
r = []
try
ignore (Unix.read sock_out (Bytes.create 1) 0 1) ;
Unix.setsockopt_float sock_out Unix.SO_RCVTIMEO 0. ;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we'd miss resetting the socket option in case of an exception.

false
with Unix.Unix_error (Unix.EAGAIN, _, _) -> true
with Pre_signalled -> false
)
(fun () ->
Mutex.execute x.m (fun () ->
x.pipe_out <- None ;
x.pipe_in <- None ;
x.sock_out <- None ;
x.sock_in <- None ;
List.iter close' !to_close
)
)

let signal (x : t) =
Mutex.execute x.m (fun () ->
match x.pipe_in with
match x.sock_in with
| Some fd ->
ignore (Unix.write fd (Bytes.of_string "X") 0 1)
| None ->
Expand Down