Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CP-32622: avoid using select and instead use epoll #4

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
19 changes: 11 additions & 8 deletions ocaml/libs/http-svr/buf_io.ml
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,17 @@ let is_full ic = ic.cur = 0 && ic.max = Bytes.length ic.buf
let fill_buf ~buffered ic timeout =
let buf_size = Bytes.length ic.buf in
let fill_no_exc timeout len =
let l, _, _ = Unix.select [ic.fd] [] [] timeout in
if List.length l <> 0 then (
let n = Unix.read ic.fd ic.buf ic.max len in
ic.max <- n + ic.max ;
if n = 0 && len <> 0 then raise Eof ;
n
) else
-1
Unix.setsockopt_float ic.fd Unix.SO_RCVTIMEO timeout ;
let result =
try
let n = Unix.read ic.fd ic.buf ic.max len in
ic.max <- n + ic.max ;
if n = 0 && len <> 0 then raise Eof ;
n
with Unix.Unix_error (Unix.EAGAIN, _, _) -> -1
in
Unix.setsockopt_float ic.fd Unix.SO_RCVTIMEO 0. ;
result
in
(* If there's no space to read, shift *)
if ic.max = buf_size then shift ic ;
Expand Down
2 changes: 1 addition & 1 deletion ocaml/libs/stunnel/stunnel.ml
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ let rec retry f = function
try f ()
with Stunnel_initialisation_failed ->
(* Leave a few seconds between each attempt *)
ignore (Unix.select [] [] [] 3.) ;
Thread.delay 3. ;
retry f (n - 1)
)

Expand Down
1 change: 1 addition & 0 deletions ocaml/message-switch/unix/dune
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
(libraries
cohttp
message-switch-core
polly
rpclib.core
rpclib.json
threads.posix
Expand Down
37 changes: 21 additions & 16 deletions ocaml/message-switch/unix/protocol_unix_scheduler.ml
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,15 @@ module Delay = struct
(* Concrete type is the ends of a pipe *)
type t = {
(* A pipe is used to wake up a thread blocked in wait: *)
mutable pipe_out: Unix.file_descr option
; mutable pipe_in: Unix.file_descr option
mutable sock_out: Unix.file_descr option
; mutable sock_in: Unix.file_descr option
; (* Indicates that a signal arrived before a wait: *)
mutable signalled: bool
; m: Mutex.t
}

let make () =
{pipe_out= None; pipe_in= None; signalled= false; m= Mutex.create ()}
{sock_out= None; sock_in= None; signalled= false; m= Mutex.create ()}

exception Pre_signalled

Expand All @@ -59,39 +59,44 @@ module Delay = struct
finally'
(fun () ->
try
let pipe_out =
let sock_out =
Mutex.execute x.m (fun () ->
if x.signalled then (
x.signalled <- false ;
raise Pre_signalled
) ;
let pipe_out, pipe_in = Unix.pipe () in
let sock_out, sock_in =
Unix.socketpair Unix.PF_UNIX Unix.SOCK_STREAM 0
in
(* these will be unconditionally closed on exit *)
to_close := [pipe_out; pipe_in] ;
x.pipe_out <- Some pipe_out ;
x.pipe_in <- Some pipe_in ;
to_close := [sock_out; sock_in] ;
x.sock_out <- Some sock_out ;
x.sock_in <- Some sock_in ;
x.signalled <- false ;
pipe_out
sock_out
)
in
let r, _, _ = Unix.select [pipe_out] [] [] seconds in
(* flush the single byte from the pipe *)
if r <> [] then ignore (Unix.read pipe_out (Bytes.create 1) 0 1) ;
(* flush the single byte from the socket *)
Unix.setsockopt_float sock_out Unix.SO_RCVTIMEO seconds ;
(* return true if we waited the full length of time, false if we were woken *)
r = []
try
ignore (Unix.read sock_out (Bytes.create 1) 0 1) ;
Unix.setsockopt_float sock_out Unix.SO_RCVTIMEO 0. ;
false
with Unix.Unix_error (Unix.EAGAIN, _, _) -> true
with Pre_signalled -> false
)
(fun () ->
Mutex.execute x.m (fun () ->
x.pipe_out <- None ;
x.pipe_in <- None ;
x.sock_out <- None ;
x.sock_in <- None ;
List.iter close' !to_close
)
)

let signal (x : t) =
Mutex.execute x.m (fun () ->
match x.pipe_in with
match x.sock_in with
| Some fd ->
ignore (Unix.write fd (Bytes.of_string "X") 0 1)
| None ->
Expand Down
80 changes: 30 additions & 50 deletions ocaml/networkd/lib/jsonrpc_client.ml
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ let json_rpc_write_timeout = ref 60000000000L

(* timeout value in ns when writing RPC request *)

let to_s s = Int64.to_float s *. 1e-9

(* Read the entire contents of the fd, of unknown length *)
let timeout_read fd timeout =
let buf = Buffer.create !json_rpc_max_len in
Expand All @@ -42,13 +40,6 @@ let timeout_read fd timeout =
Mtime.Span.to_uint64_ns (Mtime_clock.count read_start)
in
let rec inner max_time max_bytes =
let ready_to_read, _, _ =
try Unix.select [fd] [] [] (to_s max_time)
with
(* in case the Unix.select call fails in a situation such as an interrupt *)
| Unix.Unix_error (Unix.EINTR, _, _) ->
([], [], [])
in
(* This is not accurate, because the time is calculated just for the select
part. However, we think the read time will be minor compared to the scale of
tens of seconds, and the current style keeps the code much more concise. *)
Expand All @@ -60,27 +51,25 @@ let timeout_read fd timeout =
debug "Timeout after read %d" (Buffer.length buf) ;
raise Timeout
) ;
if List.mem fd ready_to_read then
let bytes = Bytes.make 4096 '\000' in
match Unix.read fd bytes 0 4096 with
| 0 ->
Buffer.contents buf (* EOF *)
| n ->
if n > max_bytes then (
debug "exceeding maximum read limit %d, clear buffer"
!json_rpc_max_len ;
Buffer.clear buf ;
raise Read_error
) else (
Buffer.add_subbytes buf bytes 0 n ;
inner remain_time (max_bytes - n)
)
| exception
Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _)
->
inner remain_time max_bytes
else
inner remain_time max_bytes
( try Unix.setsockopt_float fd SO_RCVTIMEO (Int64.to_float max_time)
with Unix.Unix_error (Unix.ENOTSOCK, _, _) ->
(* In the unit tests, the fd comes from a pipe... ignore *)
()
) ;
let bytes = Bytes.make 4096 '\000' in
match Unix.read fd bytes 0 4096 with
| 0 ->
Buffer.contents buf (* EOF *)
| n when n > max_bytes ->
debug "exceeding maximum read limit %d, clear buffer" !json_rpc_max_len ;
Buffer.clear buf ;
raise Read_error
| n ->
Buffer.add_subbytes buf bytes 0 n ;
inner remain_time (max_bytes - n)
| exception
Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _) ->
inner remain_time max_bytes
in
inner timeout !json_rpc_max_len

Expand All @@ -95,13 +84,6 @@ let timeout_write filedesc total_length data response_time =
Mtime.Span.to_uint64_ns (Mtime_clock.count write_start)
in
let rec inner_write offset max_time =
let _, ready_to_write, _ =
try Unix.select [] [filedesc] [] (to_s max_time)
with
(* in case the Unix.select call fails in a situation such as an interrupt *)
| Unix.Unix_error (Unix.EINTR, _, _) ->
([], [], [])
in
let remain_time =
let used_time = get_total_used_time () in
Int64.sub response_time used_time
Expand All @@ -110,32 +92,30 @@ let timeout_write filedesc total_length data response_time =
debug "Timeout to write %d at offset %d" total_length offset ;
raise Timeout
) ;
if List.mem filedesc ready_to_write then
let length = total_length - offset in
let bytes_written =
try Unix.single_write filedesc data offset length
with
| Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _)
->
0
in
let new_offset = offset + bytes_written in
Unix.setsockopt_float filedesc Unix.SO_SNDTIMEO (Int64.to_float max_time) ;
let length = total_length - offset in
let bytes_written =
try Unix.single_write filedesc data offset length
with Unix.Unix_error (Unix.EINTR, _, _) -> 0
in
let new_offset = offset + bytes_written in
try
if length = bytes_written then
()
else
inner_write new_offset remain_time
else
inner_write offset remain_time
with Unix.Unix_error (Unix.EAGAIN, _, _) -> inner_write offset remain_time
in
inner_write 0 response_time

let with_rpc ?(version = Jsonrpc.V2) ~path ~call () =
let uri = Uri.of_string (Printf.sprintf "file://%s" path) in
Open_uri.with_open_uri uri (fun s ->
Unix.set_nonblock s ;
let req = Bytes.of_string (Jsonrpc.string_of_call ~version call) in
timeout_write s (Bytes.length req) req !json_rpc_write_timeout ;
Unix.setsockopt_float s SO_SNDTIMEO 0. ;
let res = timeout_read s !json_rpc_read_timeout in
Unix.setsockopt_float s SO_RCVTIMEO 0. ;
debug "Response: %s" res ;
Jsonrpc.response_of_string ~strict:false res
)
4 changes: 2 additions & 2 deletions ocaml/squeezed/src/squeeze_xen.ml
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,7 @@ let make_host ~verbose ~xc =
1024L
<> 0L
do
ignore (Unix.select [] [] [] 0.25)
Thread.delay 0.25
done ;

(* Some VMs are considered by us (but not by xen) to have an
Expand Down Expand Up @@ -857,7 +857,7 @@ let io ~xc ~verbose =
(fun domid kib ->
execute_action ~xc {Squeeze.action_domid= domid; new_target_kib= kib}
)
; wait= (fun delay -> ignore (Unix.select [] [] [] delay))
; wait= (fun delay -> Thread.delay delay)
; execute_action= (fun action -> execute_action ~xc action)
; target_host_free_mem_kib
; free_memory_tolerance_kib
Expand Down
1 change: 1 addition & 0 deletions ocaml/xapi-idl/lib/dune
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
message-switch-unix
mtime
mtime.clock.os
polly
ppx_sexp_conv.runtime-lib
re
rpclib.core
Expand Down
96 changes: 63 additions & 33 deletions ocaml/xapi-idl/lib/posix_channel.ml
Original file line number Diff line number Diff line change
Expand Up @@ -80,19 +80,34 @@ let proxy (a : Unix.file_descr) (b : Unix.file_descr) =
in
(* If we can't make any progress (because fds have been closed), then stop *)
if r = [] && w = [] then raise End_of_file ;
let r, w, _ = Unix.select r w [] (-1.0) in
(* Do the writing before the reading *)
List.iter
(fun fd -> if a = fd then CBuf.write b' a else CBuf.write a' b)
w ;
List.iter (fun fd -> if a = fd then CBuf.read a' a else CBuf.read b' b) r ;
(* If there's nothing else to read or write then signal the other end *)
List.iter
(fun (buf, fd) ->
if CBuf.end_of_reads buf then Unix.shutdown fd Unix.SHUTDOWN_SEND ;
if CBuf.end_of_writes buf then Unix.shutdown fd Unix.SHUTDOWN_RECEIVE
let epoll = Polly.create () in
List.iter (fun fd -> Polly.add epoll fd Polly.Events.inp) r ;
List.iter (fun fd -> Polly.add epoll fd Polly.Events.out) w ;
Fun.protect
~finally:(fun () -> Polly.close epoll)
(fun () ->
ignore
@@ Polly.wait epoll 4 (-1) (fun _ fd event ->
(* Note: only one fd is handled *)
if event = Polly.Events.inp then (
if a = fd then
CBuf.read a' a
else if b = fd then
CBuf.read b' b
) else if a = fd then
CBuf.write b' a
else if b = fd then
CBuf.write a' b
) ;
(* If there's nothing else to read or write then signal the other end *)
List.iter
(fun (buf, fd) ->
if CBuf.end_of_reads buf then Unix.shutdown fd Unix.SHUTDOWN_SEND ;
if CBuf.end_of_writes buf then
Unix.shutdown fd Unix.SHUTDOWN_RECEIVE
)
[(a', b); (b', a)]
)
[(a', b); (b', a)]
done
with _ -> (
(try Unix.clear_nonblock a with _ -> ()) ;
Expand Down Expand Up @@ -153,27 +168,42 @@ let send proxy_socket =
in
finally
(fun () ->
let readable, _, _ = Unix.select [s_ip; s_unix] [] [] (-1.0) in
if List.mem s_unix readable then (
let fd, _peer = Unix.accept s_unix in
to_close := fd :: !to_close ;
let buffer = Bytes.make (String.length token) '\000' in
let n = Unix.recv fd buffer 0 (Bytes.length buffer) [] in
let token' = Bytes.sub_string buffer 0 n in
if token = token' then
let (_ : int) =
Fd_send_recv.send_fd_substring fd token 0
(String.length token) [] proxy_socket
in
()
) else if List.mem s_ip readable then (
let fd, _peer = Unix.accept s_ip in
List.iter close !to_close ;
to_close := fd :: !to_close ;
proxy fd proxy_socket
) else
assert false
(* can never happen *)
let epoll = Polly.create () in
List.iter
(fun fd -> Polly.add epoll fd Polly.Events.inp)
[s_ip; s_unix] ;
Fun.protect
~finally:(fun () -> Polly.close epoll)
(fun () ->
ignore
@@ Polly.wait epoll 2 (-1) (fun _ fd _ ->
(* Note: only one fd is handled *)
if s_unix = fd then (
let fd, _peer = Unix.accept s_unix in
to_close := fd :: !to_close ;
let buffer =
Bytes.make (String.length token) '\000'
in
let n =
Unix.recv fd buffer 0 (Bytes.length buffer) []
in
let token' = Bytes.sub_string buffer 0 n in
if token = token' then
let (_ : int) =
Fd_send_recv.send_fd_substring fd token 0
(String.length token) [] proxy_socket
in
()
) else if s_ip = fd then (
let fd, _peer = Unix.accept s_ip in
List.iter close !to_close ;
to_close := fd :: !to_close ;
proxy fd proxy_socket
) else
Printf.fprintf stderr
"Unexpected file descriptor returned by epoll"
)
)
)
(fun () ->
List.iter close !to_close ;
Expand Down
1 change: 1 addition & 0 deletions ocaml/xe-cli/dune
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
astring
dune-build-info
fpath
polly
safe-resources
stunnel
threads
Expand Down
Loading