Skip to content

Commit

Permalink
Add support for asynchronous socket API for recv and recvfrom.
Browse files Browse the repository at this point in the history
Signed-off-by: Paul Guyot <[email protected]>
  • Loading branch information
pguyot committed Jan 5, 2025
1 parent 27ffddd commit ea66fee
Show file tree
Hide file tree
Showing 18 changed files with 976 additions and 253 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added `socket:getopt/2`
- Added `atomvm:subprocess/4` to perform pipe/fork/execve on POSIX platforms
- Added `externalterm_to_term_with_roots` to efficiently preserve roots when allocating memory for external terms.
- Added support for socket asynchronous API for `recv` and `recvfrom`.

### Changed

Expand Down
27 changes: 23 additions & 4 deletions libs/estdlib/src/gen_tcp_socket.erl
Original file line number Diff line number Diff line change
Expand Up @@ -339,10 +339,7 @@ handle_cast(_Request, State) ->
{noreply, State}.

%% @hidden
handle_info({select, _Socket, Ref, ready_input}, State) ->
?LOG_DEBUG("handle_info [~p], ~p]", [
{select, _Socket, Ref, ready_input}, State
]),
handle_info({'$socket', _Socket, select, Ref}, State) ->
%% TODO cancel timer
case maps:get(Ref, State#state.pending_selects, undefined) of
undefined ->
Expand All @@ -366,6 +363,28 @@ handle_info({select, _Socket, Ref, ready_input}, State) ->
pending_selects = maps:remove(Ref, State#state.pending_selects)
}}
end;
handle_info({'$socket', Socket, abort, {Ref, closed}}, State) ->
%% TODO cancel timer
case maps:get(Ref, State#state.pending_selects, undefined) of
undefined ->
?LOG_WARNING("Unable to find select ref ~p in pending selects", [Ref]),
socket:nif_select_stop(Socket),
{noreply, State};
{accept, From, _AcceptingProc, _Timeout} ->
socket:nif_select_stop(Socket),
gen_server:reply(From, {error, closed}),
{noreply, State};
active ->
WrappedSocket = {?GEN_TCP_MONIKER, self(), ?MODULE},
State#state.controlling_process ! {tcp_closed, WrappedSocket},
{noreply, State};
{passive, From, _Length, _Timeout} ->
socket:nif_select_stop(Socket),
gen_server:reply(From, {error, closed}),
{noreply, State#state{
pending_selects = maps:remove(Ref, State#state.pending_selects)
}}
end;
handle_info({timeout, Ref, From}, State) ->
?LOG_DEBUG("handle_info [~p], ~p]", [
{timeout, Ref, From}, State
Expand Down
2 changes: 1 addition & 1 deletion libs/estdlib/src/gen_udp_socket.erl
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ handle_cast(_Request, State) ->
{noreply, State}.

%% @hidden
handle_info({select, _Socket, Ref, ready_input}, State) ->
handle_info({'$socket', _Socket, select, Ref}, State) ->
case maps:get(Ref, State#state.pending_selects, undefined) of
undefined ->
?LOG_INFO("Unable to find select ref ~p in pending selects", [Ref]),
Expand Down
177 changes: 156 additions & 21 deletions libs/estdlib/src/socket.erl
Original file line number Diff line number Diff line change
Expand Up @@ -245,22 +245,23 @@ accept(Socket, Timeout) ->
case ?MODULE:nif_select_read(Socket, Ref) of
ok ->
receive
{select, _AcceptedSocket, Ref, ready_input} ->
{'$socket', Socket, select, Ref} ->
case ?MODULE:nif_accept(Socket) of
{error, closed} = E ->
?MODULE:nif_select_stop(Socket),
E;
R ->
R
end;
{closed, Ref} ->
{'$socket', Socket, abort, {Ref, closed}} ->
% socket was closed by another process
% TODO: we need to handle:
% (a) SELECT_STOP being scheduled
% (b) flush of messages as we can have both
% {closed, Ref} and {select, _, Ref, _} in the
% (b) flush of messages as we can have both in the
% queue
{error, closed}
{error, closed};
Other ->
{error, {accept, unexpected, Other, {'$socket', Socket, select, Ref}}}
after Timeout ->
{error, timeout}
end;
Expand Down Expand Up @@ -299,25 +300,60 @@ recv(Socket, Length) ->
%% `{ok, Data} = socket:recv(ConnectedSocket)'
%% @end
%%-----------------------------------------------------------------------------
-spec recv(Socket :: socket(), Length :: non_neg_integer(), Timeout :: timeout()) ->
{ok, Data :: binary()} | {error, Reason :: term()}.
-spec recv(
Socket :: socket(), Length :: non_neg_integer(), Timeout :: timeout() | nowait | reference()
) ->
{ok, Data :: binary()}
| {select, {select_info, recvfrom, reference()}}
| {select, {{select_info, recvfrom, reference()}, Data :: binary()}}
| {error, Reason :: term()}.
recv(Socket, Length, 0) ->
recv0_noselect(Socket, Length);
recv(Socket, 0, Timeout) when is_integer(Timeout) orelse Timeout =:= infinity ->
recv0(Socket, 0, Timeout);
recv(Socket, Length, nowait) ->
recv0_nowait(Socket, Length, erlang:make_ref());
recv(Socket, Length, Ref) when is_reference(Ref) ->
recv0_nowait(Socket, Length, Ref);
recv(Socket, Length, Timeout) ->
case ?MODULE:getopt(Socket, {socket, type}) of
{ok, stream} when Timeout =/= infinity ->
recv0_r(Socket, Length, Timeout, erlang:system_time(millisecond) + Timeout, []);
{ok, stream} when Timeout =:= infinity ->
recv0_r(Socket, Length, Timeout, undefined, []);
_ ->
recv0(Socket, Length, Timeout)
end.

recv0_noselect(Socket, Length) ->
case ?MODULE:nif_recv(Socket, Length) of
{error, _} = E ->
E;
{ok, Data} when Length =:= 0 orelse byte_size(Data) =:= Length ->
{ok, Data};
{ok, Data} ->
case ?MODULE:getopt(Socket, {socket, type}) of
{ok, stream} ->
{error, {timeout, Data}};
{ok, dgram} ->
{ok, Data}
end
end.

recv0(Socket, Length, Timeout) ->
Ref = erlang:make_ref(),
?TRACE("select read for recv. self=~p ref=~p~n", [self(), Ref]),
case ?MODULE:nif_select_read(Socket, Ref) of
ok ->
receive
{select, _AcceptedSocket, Ref, ready_input} ->
{'$socket', Socket, select, Ref} ->
case ?MODULE:nif_recv(Socket, Length) of
{error, _} = E ->
?MODULE:nif_select_stop(Socket),
E;
% TODO: Assemble data to have more if Length > byte_size(Data)
% as long as timeout did not expire
{ok, Data} ->
{ok, Data}
end;
{closed, Ref} ->
{'$socket', Socket, abort, {Ref, closed}} ->
% socket was closed by another process
% TODO: see above in accept/2
{error, closed}
Expand All @@ -328,6 +364,72 @@ recv(Socket, Length, Timeout) ->
Error
end.

recv0_nowait(Socket, Length, Ref) ->
case ?MODULE:nif_recv(Socket, Length) of
{error, timeout} ->
case ?MODULE:nif_select_read(Socket, Ref) of
ok ->
{select, {select_info, recv, Ref}};
{error, _} = Error1 ->
Error1
end;
{error, _} = E ->
E;
{ok, Data} when byte_size(Data) < Length ->
case ?MODULE:getopt(Socket, {socket, type}) of
{ok, stream} ->
case ?MODULE:nif_select_read(Socket, Ref) of
ok ->
{select, {{select_info, recv, Ref}, Data}};
{error, _} = Error1 ->
Error1
end;
{ok, dgram} ->
{ok, Data}
end;
{ok, Data} ->
{ok, Data}
end.

recv0_r(Socket, Length, Timeout, EndQuery, Acc) ->
Ref = erlang:make_ref(),
case ?MODULE:nif_select_read(Socket, Ref) of
ok ->
receive
{'$socket', Socket, select, Ref} ->
case ?MODULE:nif_recv(Socket, Length) of
{error, _} = E ->
?MODULE:nif_select_stop(Socket),
E;
{ok, Data} ->
NewAcc = [Data | Acc],
Remaining = Length - byte_size(Data),
case Remaining of
0 ->
{ok, list_to_binary(lists:reverse(NewAcc))};
_ ->
NewTimeout =
case Timeout of
infinity -> infinity;
_ -> EndQuery - erlang:system_time(millisecond)
end,
recv0_r(Socket, Remaining, NewTimeout, EndQuery, NewAcc)
end
end;
{'$socket', Socket, abort, {Ref, closed}} ->
% socket was closed by another process
% TODO: see above in accept/2
{error, closed}
after Timeout ->
case Acc of
[] -> {error, timeout};
_ -> {error, {timeout, list_to_binary(lists:reverse(Acc))}}
end
end;
{error, _Reason} = Error ->
Error
end.

%%-----------------------------------------------------------------------------
%% @equiv socket:recvfrom(Socket, 0)
%% @end
Expand Down Expand Up @@ -370,25 +472,43 @@ recvfrom(Socket, Length) ->
%% bytes are available and return these bytes.
%% @end
%%-----------------------------------------------------------------------------
-spec recvfrom(Socket :: socket(), Length :: non_neg_integer(), Timeout :: timeout()) ->
{ok, {Address :: sockaddr(), Data :: binary()}} | {error, Reason :: term()}.
-spec recvfrom(
Socket :: socket(), Length :: non_neg_integer(), Timeout :: timeout() | nowait | reference()
) ->
{ok, {Address :: sockaddr(), Data :: binary()}}
| {select, {select_info, recvfrom, reference()}}
| {error, Reason :: term()}.
recvfrom(Socket, Length, 0) ->
recvfrom0_noselect(Socket, Length);
recvfrom(Socket, Length, nowait) ->
recvfrom0_nowait(Socket, Length, erlang:make_ref());
recvfrom(Socket, Length, Ref) when is_reference(Ref) ->
recvfrom0_nowait(Socket, Length, Ref);
recvfrom(Socket, Length, Timeout) ->
recvfrom0(Socket, Length, Timeout).

recvfrom0_noselect(Socket, Length) ->
case ?MODULE:nif_recvfrom(Socket, Length) of
{error, _} = E ->
E;
{ok, {_Address, _Data}} = Reply ->
Reply
end.

recvfrom0(Socket, Length, Timeout) ->
Ref = erlang:make_ref(),
?TRACE("select read for recvfrom. self=~p ref=~p", [self(), Ref]),
case ?MODULE:nif_select_read(Socket, Ref) of
ok ->
receive
{select, _AcceptedSocket, Ref, ready_input} ->
{'$socket', Socket, select, Ref} ->
case ?MODULE:nif_recvfrom(Socket, Length) of
{error, _} = E ->
?MODULE:nif_select_stop(Socket),
E;
% TODO: Assemble data to have more if Length > byte_size(Data)
% as long as timeout did not expire
{ok, {Address, Data}} ->
{ok, {Address, Data}}
{ok, {_Address, _Data}} = Reply ->
Reply
end;
{closed, Ref} ->
{'$socket', Socket, abort, {Ref, closed}} ->
% socket was closed by another process
% TODO: see above in accept/2
{error, closed}
Expand All @@ -399,6 +519,21 @@ recvfrom(Socket, Length, Timeout) ->
Error
end.

recvfrom0_nowait(Socket, Length, Ref) ->
case ?MODULE:nif_recvfrom(Socket, Length) of
{error, timeout} ->
case ?MODULE:nif_select_read(Socket, Ref) of
ok ->
{select, {select_info, recvfrom, Ref}};
{error, _} = SelectError ->
SelectError
end;
{error, _} = RecvError ->
RecvError;
{ok, {_Address, _Data}} = Reply ->
Reply
end.

%%-----------------------------------------------------------------------------
%% @param Socket the socket
%% @param Data the data to send
Expand Down
16 changes: 8 additions & 8 deletions libs/estdlib/src/ssl.erl
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,9 @@ handshake_loop(SSLContext, Socket) ->
case socket:nif_select_read(Socket, Ref) of
ok ->
receive
{select, _SocketResource, Ref, ready_input} ->
{'$socket', Socket, select, Ref} ->
handshake_loop(SSLContext, Socket);
{closed, Ref} ->
{'$socket', Socket, abort, {Ref, closed}} ->
ok = socket:close(Socket),
{error, closed}
end;
Expand Down Expand Up @@ -242,9 +242,9 @@ close_notify_loop(SSLContext, Socket) ->
case socket:nif_select_read(Socket, Ref) of
ok ->
receive
{select, _SocketResource, Ref, ready_input} ->
{'$socket', Socket, select, Ref} ->
close_notify_loop(SSLContext, Socket);
{closed, Ref} ->
{'$socket', Socket, abort, {Ref, closed}} ->
ok = socket:close(Socket),
{error, closed}
end;
Expand Down Expand Up @@ -274,9 +274,9 @@ send({SSLContext, Socket} = SSLSocket, Binary) ->
case socket:nif_select_read(Socket, Ref) of
ok ->
receive
{select, _SocketResource, Ref, ready_input} ->
{'$socket', Socket, select, Ref} ->
send(SSLSocket, Binary);
{closed, Ref} ->
{'$socket', Socket, abort, {Ref, closed}} ->
{error, closed}
end;
{error, _Reason} = Error ->
Expand Down Expand Up @@ -309,9 +309,9 @@ recv0({SSLContext, Socket} = SSLSocket, Length, Remaining, Acc) ->
case socket:nif_select_read(Socket, Ref) of
ok ->
receive
{select, _SocketResource, Ref, ready_input} ->
{'$socket', Socket, select, Ref} ->
recv0(SSLSocket, Length, Remaining, Acc);
{closed, Ref} ->
{'$socket', Socket, abort, {Ref, closed}} ->
{error, closed}
end;
{error, _Reason} = Error ->
Expand Down
6 changes: 6 additions & 0 deletions src/libAtomVM/defaultatoms.def
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,9 @@ X(REGISTERED_NAME_ATOM, "\xF", "registered_name")

X(NONODE_AT_NOHOST_ATOM, "\xD", "nonode@nohost")
X(NET_KERNEL_ATOM, "\xA", "net_kernel")

X(DOLLAR_SOCKET_ATOM, "\x7", "$socket")
X(ABORT_ATOM, "\x5", "abort")
X(FAMILY_ATOM, "\x6", "family")
X(INET_ATOM, "\x4", "inet")
X(TIMEOUT_ATOM, "\x7", "timeout")
26 changes: 26 additions & 0 deletions src/libAtomVM/erl_nif.h
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,32 @@ ERL_NIF_TERM enif_make_resource(ErlNifEnv *env, void *obj);
*/
int enif_select(ErlNifEnv *env, ErlNifEvent event, enum ErlNifSelectFlags mode, void *obj, const ErlNifPid *pid, ERL_NIF_TERM ref);

/**
* @brief Variant of `enif_select` where sent message is `msg` instead of default.
*
* @param env current environment
* @param event event object (typically a file descriptor)
* @param obj resource object working as a container of the event object.
* @param pid process id to send a message to or NULL to use the current process (from `env`)
* @param msg message to send (copied).
* @param msg_env must be NULL.
* @return a negative value on failure, 0 or flags on success.
*/
int enif_select_read(ErlNifEnv *env, ErlNifEvent event, void *obj, const ErlNifPid *pid, ERL_NIF_TERM msg, ErlNifEnv *msg_env);

/**
* @brief Variant of `enif_select` where sent message is `msg` instead of default.
*
* @param env current environment
* @param event event object (typically a file descriptor)
* @param obj resource object working as a container of the event object.
* @param pid process id to send a message to or NULL to use the current process (from `env`)
* @param msg message to send (copied).
* @param msg_env must be NULL.
* @return a negative value on failure, 0 or flags on success.
*/
int enif_select_write(ErlNifEnv *env, ErlNifEvent event, void *obj, const ErlNifPid *pid, ERL_NIF_TERM msg, ErlNifEnv *msg_env);

/**
* @brief Monitor a process by using a resource object.
* @details The monitor is automatically removed after being triggered or if the
Expand Down
Loading

0 comments on commit ea66fee

Please sign in to comment.