Skip to content

Commit

Permalink
Add support for asynchronous socket API for recv and recvfrom.
Browse files Browse the repository at this point in the history
Also improve socket stability.

Signed-off-by: Paul Guyot <[email protected]>
  • Loading branch information
pguyot committed Dec 24, 2024
1 parent bd67862 commit f186dc1
Show file tree
Hide file tree
Showing 21 changed files with 1,373 additions and 311 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added support for external pids and encoded pids in external terms
- Added support for external refs and encoded refs in external terms
- Introduce ports to represent native processes and added support for external ports and encoded ports in external terms
- Added support for socket asynchronous API for `recv` and `recvfrom`.

## [0.6.6] - Unreleased

Expand Down
27 changes: 23 additions & 4 deletions libs/estdlib/src/gen_tcp_socket.erl
Original file line number Diff line number Diff line change
Expand Up @@ -339,10 +339,7 @@ handle_cast(_Request, State) ->
{noreply, State}.

%% @hidden
handle_info({select, _Socket, Ref, ready_input}, State) ->
?LOG_DEBUG("handle_info [~p], ~p]", [
{select, _Socket, Ref, ready_input}, State
]),
handle_info({'$socket', _Socket, select, Ref}, State) ->
%% TODO cancel timer
case maps:get(Ref, State#state.pending_selects, undefined) of
undefined ->
Expand All @@ -366,6 +363,28 @@ handle_info({select, _Socket, Ref, ready_input}, State) ->
pending_selects = maps:remove(Ref, State#state.pending_selects)
}}
end;
handle_info({'$socket', Socket, abort, {Ref, closed}}, State) ->
%% TODO cancel timer
case maps:get(Ref, State#state.pending_selects, undefined) of
undefined ->
?LOG_WARNING("Unable to find select ref ~p in pending selects", [Ref]),
socket:nif_select_stop(Socket),
{noreply, State};
{accept, From, _AcceptingProc, _Timeout} ->
socket:nif_select_stop(Socket),
gen_server:reply(From, {error, closed}),
{noreply, State};
active ->
WrappedSocket = {?GEN_TCP_MONIKER, self(), ?MODULE},
State#state.controlling_process ! {tcp_closed, WrappedSocket},
{noreply, State};
{passive, From, _Length, _Timeout} ->
socket:nif_select_stop(Socket),
gen_server:reply(From, {error, closed}),
{noreply, State#state{
pending_selects = maps:remove(Ref, State#state.pending_selects)
}}
end;
handle_info({timeout, Ref, From}, State) ->
?LOG_DEBUG("handle_info [~p], ~p]", [
{timeout, Ref, From}, State
Expand Down
2 changes: 1 addition & 1 deletion libs/estdlib/src/gen_udp_socket.erl
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ handle_cast(_Request, State) ->
{noreply, State}.

%% @hidden
handle_info({select, _Socket, Ref, ready_input}, State) ->
handle_info({'$socket', _Socket, select, Ref}, State) ->
case maps:get(Ref, State#state.pending_selects, undefined) of
undefined ->
?LOG_INFO("Unable to find select ref ~p in pending selects", [Ref]),
Expand Down
208 changes: 184 additions & 24 deletions libs/estdlib/src/socket.erl
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
send/2,
sendto/3,
setopt/3,
getopt/2,
connect/2,
shutdown/2
]).
Expand Down Expand Up @@ -66,7 +67,9 @@
-type in_addr() :: {0..255, 0..255, 0..255, 0..255}.
-type port_number() :: 0..65535.

-type socket_option() :: {socket, reuseaddr} | {socket, linger}.
-type socket_option() ::
{socket, reuseaddr | linger | type}
| {otp, recvbuf}.

-export_type([
socket/0,
Expand Down Expand Up @@ -242,22 +245,23 @@ accept(Socket, Timeout) ->
case ?MODULE:nif_select_read(Socket, Ref) of
ok ->
receive
{select, _AcceptedSocket, Ref, ready_input} ->
{'$socket', Socket, select, Ref} ->
case ?MODULE:nif_accept(Socket) of
{error, closed} = E ->
?MODULE:nif_select_stop(Socket),
E;
R ->
R
end;
{closed, Ref} ->
{'$socket', Socket, abort, {Ref, closed}} ->
% socket was closed by another process
% TODO: we need to handle:
% (a) SELECT_STOP being scheduled
% (b) flush of messages as we can have both
% {closed, Ref} and {select, _, Ref, _} in the
% (b) flush of messages as we can have both in the
% queue
{error, closed}
{error, closed};
Other ->
{error, {accept, unexpected, Other, {'$socket', Socket, select, Ref}}}
after Timeout ->
{error, timeout}
end;
Expand Down Expand Up @@ -296,25 +300,60 @@ recv(Socket, Length) ->
%% `{ok, Data} = socket:recv(ConnectedSocket)'
%% @end
%%-----------------------------------------------------------------------------
-spec recv(Socket :: socket(), Length :: non_neg_integer(), Timeout :: timeout()) ->
{ok, Data :: binary()} | {error, Reason :: term()}.
-spec recv(
Socket :: socket(), Length :: non_neg_integer(), Timeout :: timeout() | nowait | reference()
) ->
{ok, Data :: binary()}
| {select, {select_info, recvfrom, reference()}}
| {select, {{select_info, recvfrom, reference()}, Data :: binary()}}
| {error, Reason :: term()}.
recv(Socket, Length, 0) ->
recv0_noselect(Socket, Length);
recv(Socket, 0, Timeout) when is_integer(Timeout) orelse Timeout =:= infinity ->
recv0(Socket, 0, Timeout);
recv(Socket, Length, nowait) ->
recv0_nowait(Socket, Length, erlang:make_ref());
recv(Socket, Length, Ref) when is_reference(Ref) ->
recv0_nowait(Socket, Length, Ref);
recv(Socket, Length, Timeout) ->
case ?MODULE:getopt(Socket, {socket, type}) of
{ok, stream} when Timeout =/= infinity ->
recv0_r(Socket, Length, Timeout, erlang:system_time(millisecond) + Timeout, []);
{ok, stream} when Timeout =:= infinity ->
recv0_r(Socket, Length, Timeout, undefined, []);
_ ->
recv0(Socket, Length, Timeout)
end.

recv0_noselect(Socket, Length) ->
case ?MODULE:nif_recv(Socket, Length) of
{error, _} = E ->
E;
{ok, Data} when Length =:= 0 orelse byte_size(Data) =:= Length ->
{ok, Data};
{ok, Data} ->
case ?MODULE:getopt(Socket, {socket, type}) of
{ok, stream} ->
{error, {timeout, Data}};
{ok, dgram} ->
{ok, Data}
end
end.

recv0(Socket, Length, Timeout) ->
Ref = erlang:make_ref(),
?TRACE("select read for recv. self=~p ref=~p~n", [self(), Ref]),
case ?MODULE:nif_select_read(Socket, Ref) of
ok ->
receive
{select, _AcceptedSocket, Ref, ready_input} ->
{'$socket', Socket, select, Ref} ->
case ?MODULE:nif_recv(Socket, Length) of
{error, _} = E ->
?MODULE:nif_select_stop(Socket),
E;
% TODO: Assemble data to have more if Length > byte_size(Data)
% as long as timeout did not expire
{ok, Data} ->
{ok, Data}
end;
{closed, Ref} ->
{'$socket', Socket, abort, {Ref, closed}} ->
% socket was closed by another process
% TODO: see above in accept/2
{error, closed}
Expand All @@ -325,6 +364,72 @@ recv(Socket, Length, Timeout) ->
Error
end.

recv0_nowait(Socket, Length, Ref) ->
case ?MODULE:nif_recv(Socket, Length) of
{error, timeout} ->
case ?MODULE:nif_select_read(Socket, Ref) of
ok ->
{select, {select_info, recv, Ref}};
{error, _} = Error1 ->
Error1
end;
{error, _} = E ->
E;
{ok, Data} when byte_size(Data) < Length ->
case ?MODULE:getopt(Socket, {socket, type}) of
{ok, stream} ->
case ?MODULE:nif_select_read(Socket, Ref) of
ok ->
{select, {{select_info, recv, Ref}, Data}};
{error, _} = Error1 ->
Error1
end;
{ok, dgram} ->
{ok, Data}
end;
{ok, Data} ->
{ok, Data}
end.

recv0_r(Socket, Length, Timeout, EndQuery, Acc) ->
Ref = erlang:make_ref(),
case ?MODULE:nif_select_read(Socket, Ref) of
ok ->
receive
{'$socket', Socket, select, Ref} ->
case ?MODULE:nif_recv(Socket, Length) of
{error, _} = E ->
?MODULE:nif_select_stop(Socket),
E;
{ok, Data} ->
NewAcc = [Data | Acc],
Remaining = Length - byte_size(Data),
case Remaining of
0 ->
{ok, list_to_binary(lists:reverse(NewAcc))};
_ ->
NewTimeout =
case Timeout of
infinity -> infinity;
_ -> EndQuery - erlang:system_time(millisecond)
end,
recv0_r(Socket, Remaining, NewTimeout, EndQuery, NewAcc)
end
end;
{'$socket', Socket, abort, {Ref, closed}} ->
% socket was closed by another process
% TODO: see above in accept/2
{error, closed}
after Timeout ->
case Acc of
[] -> {error, timeout};
_ -> {error, {timeout, list_to_binary(lists:reverse(Acc))}}
end
end;
{error, _Reason} = Error ->
Error
end.

%%-----------------------------------------------------------------------------
%% @equiv socket:recvfrom(Socket, 0)
%% @end
Expand Down Expand Up @@ -367,25 +472,43 @@ recvfrom(Socket, Length) ->
%% bytes are available and return these bytes.
%% @end
%%-----------------------------------------------------------------------------
-spec recvfrom(Socket :: socket(), Length :: non_neg_integer(), Timeout :: timeout()) ->
{ok, {Address :: sockaddr(), Data :: binary()}} | {error, Reason :: term()}.
-spec recvfrom(
Socket :: socket(), Length :: non_neg_integer(), Timeout :: timeout() | nowait | reference()
) ->
{ok, {Address :: sockaddr(), Data :: binary()}}
| {select, {select_info, recvfrom, reference()}}
| {error, Reason :: term()}.
recvfrom(Socket, Length, 0) ->
recvfrom0_noselect(Socket, Length);
recvfrom(Socket, Length, nowait) ->
recvfrom0_nowait(Socket, Length, erlang:make_ref());
recvfrom(Socket, Length, Ref) when is_reference(Ref) ->
recvfrom0_nowait(Socket, Length, Ref);
recvfrom(Socket, Length, Timeout) ->
recvfrom0(Socket, Length, Timeout).

recvfrom0_noselect(Socket, Length) ->
case ?MODULE:nif_recvfrom(Socket, Length) of
{error, _} = E ->
E;
{ok, {_Address, _Data}} = Reply ->
Reply
end.

recvfrom0(Socket, Length, Timeout) ->
Ref = erlang:make_ref(),
?TRACE("select read for recvfrom. self=~p ref=~p", [self(), Ref]),
case ?MODULE:nif_select_read(Socket, Ref) of
ok ->
receive
{select, _AcceptedSocket, Ref, ready_input} ->
{'$socket', Socket, select, Ref} ->
case ?MODULE:nif_recvfrom(Socket, Length) of
{error, _} = E ->
?MODULE:nif_select_stop(Socket),
E;
% TODO: Assemble data to have more if Length > byte_size(Data)
% as long as timeout did not expire
{ok, {Address, Data}} ->
{ok, {Address, Data}}
{ok, {_Address, _Data}} = Reply ->
Reply
end;
{closed, Ref} ->
{'$socket', Socket, abort, {Ref, closed}} ->
% socket was closed by another process
% TODO: see above in accept/2
{error, closed}
Expand All @@ -396,6 +519,21 @@ recvfrom(Socket, Length, Timeout) ->
Error
end.

recvfrom0_nowait(Socket, Length, Ref) ->
case ?MODULE:nif_recvfrom(Socket, Length) of
{error, timeout} ->
case ?MODULE:nif_select_read(Socket, Ref) of
ok ->
{select, {select_info, recvfrom, Ref}};
{error, _} = SelectError ->
SelectError
end;
{error, _} = RecvError ->
RecvError;
{ok, {_Address, _Data}} = Reply ->
Reply
end.

%%-----------------------------------------------------------------------------
%% @param Socket the socket
%% @param Data the data to send
Expand Down Expand Up @@ -443,11 +581,32 @@ sendto(Socket, Data, Dest) when is_binary(Data) ->
sendto(Socket, Data, Dest) ->
?MODULE:nif_sendto(Socket, erlang:iolist_to_binary(Data), Dest).

%%-----------------------------------------------------------------------------
%% @param Socket the socket
%% @param SocketOption the option
%% @returns `{ok, Value}' if successful; `{error, Reason}', otherwise.
%% @doc Get a socket option.
%%
%% Currently, the following options are supported:
%% <table>
%% <tr><td>`{socket, type}'</td><td>`type()'</td></tr>
%% </table>
%%
%% Example:
%%
%% `{ok, stream} = socket:getopt(ListeningSocket, {socket, type})'
%% @end
%%-----------------------------------------------------------------------------
-spec getopt(Socket :: socket(), SocketOption :: socket_option()) ->
{ok, Value :: term()} | {error, Reason :: term()}.
getopt(_Socket, _SocketOption) ->
erlang:nif_error(undefined).

%%-----------------------------------------------------------------------------
%% @param Socket the socket
%% @param SocketOption the option
%% @param Value the option value
%% @returns `{ok, Address}' if successful; `{error, Reason}', otherwise.
%% @returns `ok' if successful; `{error, Reason}', otherwise.
%% @doc Set a socket option.
%%
%% Set an option on a socket.
Expand All @@ -456,6 +615,7 @@ sendto(Socket, Data, Dest) ->
%% <table>
%% <tr><td>`{socket, reuseaddr}'</td><td>`boolean()'</td></tr>
%% <tr><td>`{socket, linger}'</td><td>`#{onoff => boolean(), linger => non_neg_integer()}'</td></tr>
%% <tr><td>`{otp, recvbuf}'</td><td>`non_neg_integer()'</td></tr>
%% </table>
%%
%% Example:
Expand All @@ -465,7 +625,7 @@ sendto(Socket, Data, Dest) ->
%% @end
%%-----------------------------------------------------------------------------
-spec setopt(Socket :: socket(), SocketOption :: socket_option(), Value :: term()) ->
ok | {error, Reason :: term()}.
ok | {error, any()}.
setopt(_Socket, _SocketOption, _Value) ->
erlang:nif_error(undefined).

Expand Down
Loading

0 comments on commit f186dc1

Please sign in to comment.