Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for socket asynchronous api for recv & recvfrom #1390

Open
wants to merge 1 commit into
base: feature/distributed-erlang
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added `atomvm:subprocess/4` to perform pipe/fork/execve on POSIX platforms
- Added `externalterm_to_term_with_roots` to efficiently preserve roots when allocating memory for external terms.
- Added `erl_epmd` client implementation to epmd using `socket` module
- Added support for socket asynchronous API for `recv` and `recvfrom`.

### Changed

Expand Down
27 changes: 23 additions & 4 deletions libs/estdlib/src/gen_tcp_socket.erl
Original file line number Diff line number Diff line change
Expand Up @@ -339,10 +339,7 @@ handle_cast(_Request, State) ->
{noreply, State}.

%% @hidden
handle_info({select, _Socket, Ref, ready_input}, State) ->
?LOG_DEBUG("handle_info [~p], ~p]", [
{select, _Socket, Ref, ready_input}, State
]),
handle_info({'$socket', _Socket, select, Ref}, State) ->
%% TODO cancel timer
case maps:get(Ref, State#state.pending_selects, undefined) of
undefined ->
Expand All @@ -366,6 +363,28 @@ handle_info({select, _Socket, Ref, ready_input}, State) ->
pending_selects = maps:remove(Ref, State#state.pending_selects)
}}
end;
handle_info({'$socket', Socket, abort, {Ref, closed}}, State) ->
%% TODO cancel timer
case maps:get(Ref, State#state.pending_selects, undefined) of
undefined ->
?LOG_WARNING("Unable to find select ref ~p in pending selects", [Ref]),
socket:nif_select_stop(Socket),
{noreply, State};
{accept, From, _AcceptingProc, _Timeout} ->
socket:nif_select_stop(Socket),
gen_server:reply(From, {error, closed}),
{noreply, State};
active ->
WrappedSocket = {?GEN_TCP_MONIKER, self(), ?MODULE},
State#state.controlling_process ! {tcp_closed, WrappedSocket},
{noreply, State};
{passive, From, _Length, _Timeout} ->
socket:nif_select_stop(Socket),
gen_server:reply(From, {error, closed}),
{noreply, State#state{
pending_selects = maps:remove(Ref, State#state.pending_selects)
}}
end;
handle_info({timeout, Ref, From}, State) ->
?LOG_DEBUG("handle_info [~p], ~p]", [
{timeout, Ref, From}, State
Expand Down
2 changes: 1 addition & 1 deletion libs/estdlib/src/gen_udp_socket.erl
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ handle_cast(_Request, State) ->
{noreply, State}.

%% @hidden
handle_info({select, _Socket, Ref, ready_input}, State) ->
handle_info({'$socket', _Socket, select, Ref}, State) ->
case maps:get(Ref, State#state.pending_selects, undefined) of
undefined ->
?LOG_INFO("Unable to find select ref ~p in pending selects", [Ref]),
Expand Down
177 changes: 156 additions & 21 deletions libs/estdlib/src/socket.erl
Original file line number Diff line number Diff line change
Expand Up @@ -245,22 +245,23 @@ accept(Socket, Timeout) ->
case ?MODULE:nif_select_read(Socket, Ref) of
ok ->
receive
{select, _AcceptedSocket, Ref, ready_input} ->
{'$socket', Socket, select, Ref} ->
case ?MODULE:nif_accept(Socket) of
{error, closed} = E ->
?MODULE:nif_select_stop(Socket),
E;
R ->
R
end;
{closed, Ref} ->
{'$socket', Socket, abort, {Ref, closed}} ->
% socket was closed by another process
% TODO: we need to handle:
% (a) SELECT_STOP being scheduled
% (b) flush of messages as we can have both
% {closed, Ref} and {select, _, Ref, _} in the
% (b) flush of messages as we can have both in the
% queue
{error, closed}
{error, closed};
Other ->
{error, {accept, unexpected, Other, {'$socket', Socket, select, Ref}}}
after Timeout ->
{error, timeout}
end;
Expand Down Expand Up @@ -299,25 +300,60 @@ recv(Socket, Length) ->
%% `{ok, Data} = socket:recv(ConnectedSocket)'
%% @end
%%-----------------------------------------------------------------------------
-spec recv(Socket :: socket(), Length :: non_neg_integer(), Timeout :: timeout()) ->
{ok, Data :: binary()} | {error, Reason :: term()}.
-spec recv(
Socket :: socket(), Length :: non_neg_integer(), Timeout :: timeout() | nowait | reference()
) ->
{ok, Data :: binary()}
| {select, {select_info, recvfrom, reference()}}
| {select, {{select_info, recvfrom, reference()}, Data :: binary()}}
| {error, Reason :: term()}.
recv(Socket, Length, 0) ->
recv0_noselect(Socket, Length);
recv(Socket, 0, Timeout) when is_integer(Timeout) orelse Timeout =:= infinity ->
recv0(Socket, 0, Timeout);
recv(Socket, Length, nowait) ->
recv0_nowait(Socket, Length, erlang:make_ref());
recv(Socket, Length, Ref) when is_reference(Ref) ->
recv0_nowait(Socket, Length, Ref);
recv(Socket, Length, Timeout) ->
case ?MODULE:getopt(Socket, {socket, type}) of
{ok, stream} when Timeout =/= infinity ->
recv0_r(Socket, Length, Timeout, erlang:system_time(millisecond) + Timeout, []);
{ok, stream} when Timeout =:= infinity ->
recv0_r(Socket, Length, Timeout, undefined, []);
_ ->
recv0(Socket, Length, Timeout)
end.

recv0_noselect(Socket, Length) ->
case ?MODULE:nif_recv(Socket, Length) of
{error, _} = E ->
E;
{ok, Data} when Length =:= 0 orelse byte_size(Data) =:= Length ->
{ok, Data};
{ok, Data} ->
case ?MODULE:getopt(Socket, {socket, type}) of
{ok, stream} ->
{error, {timeout, Data}};
{ok, dgram} ->
{ok, Data}
end
end.

recv0(Socket, Length, Timeout) ->
Ref = erlang:make_ref(),
?TRACE("select read for recv. self=~p ref=~p~n", [self(), Ref]),
case ?MODULE:nif_select_read(Socket, Ref) of
ok ->
receive
{select, _AcceptedSocket, Ref, ready_input} ->
{'$socket', Socket, select, Ref} ->
case ?MODULE:nif_recv(Socket, Length) of
{error, _} = E ->
?MODULE:nif_select_stop(Socket),
E;
% TODO: Assemble data to have more if Length > byte_size(Data)
% as long as timeout did not expire
{ok, Data} ->
{ok, Data}
end;
{closed, Ref} ->
{'$socket', Socket, abort, {Ref, closed}} ->
% socket was closed by another process
% TODO: see above in accept/2
{error, closed}
Expand All @@ -328,6 +364,72 @@ recv(Socket, Length, Timeout) ->
Error
end.

recv0_nowait(Socket, Length, Ref) ->
case ?MODULE:nif_recv(Socket, Length) of
{error, timeout} ->
case ?MODULE:nif_select_read(Socket, Ref) of
ok ->
{select, {select_info, recv, Ref}};
{error, _} = Error1 ->
Error1
end;
{error, _} = E ->
E;
{ok, Data} when byte_size(Data) < Length ->
case ?MODULE:getopt(Socket, {socket, type}) of
{ok, stream} ->
case ?MODULE:nif_select_read(Socket, Ref) of
ok ->
{select, {{select_info, recv, Ref}, Data}};
{error, _} = Error1 ->
Error1
end;
{ok, dgram} ->
{ok, Data}
end;
{ok, Data} ->
{ok, Data}
end.

recv0_r(Socket, Length, Timeout, EndQuery, Acc) ->
Ref = erlang:make_ref(),
case ?MODULE:nif_select_read(Socket, Ref) of
ok ->
receive
{'$socket', Socket, select, Ref} ->
case ?MODULE:nif_recv(Socket, Length) of
{error, _} = E ->
?MODULE:nif_select_stop(Socket),
E;
{ok, Data} ->
NewAcc = [Data | Acc],
Remaining = Length - byte_size(Data),
case Remaining of
0 ->
{ok, list_to_binary(lists:reverse(NewAcc))};
_ ->
NewTimeout =
case Timeout of
infinity -> infinity;
_ -> EndQuery - erlang:system_time(millisecond)
end,
recv0_r(Socket, Remaining, NewTimeout, EndQuery, NewAcc)
end
end;
{'$socket', Socket, abort, {Ref, closed}} ->
% socket was closed by another process
% TODO: see above in accept/2
{error, closed}
after Timeout ->
case Acc of
[] -> {error, timeout};
_ -> {error, {timeout, list_to_binary(lists:reverse(Acc))}}
end
end;
{error, _Reason} = Error ->
Error
end.

%%-----------------------------------------------------------------------------
%% @equiv socket:recvfrom(Socket, 0)
%% @end
Expand Down Expand Up @@ -370,25 +472,43 @@ recvfrom(Socket, Length) ->
%% bytes are available and return these bytes.
%% @end
%%-----------------------------------------------------------------------------
-spec recvfrom(Socket :: socket(), Length :: non_neg_integer(), Timeout :: timeout()) ->
{ok, {Address :: sockaddr(), Data :: binary()}} | {error, Reason :: term()}.
-spec recvfrom(
Socket :: socket(), Length :: non_neg_integer(), Timeout :: timeout() | nowait | reference()
) ->
{ok, {Address :: sockaddr(), Data :: binary()}}
| {select, {select_info, recvfrom, reference()}}
| {error, Reason :: term()}.
recvfrom(Socket, Length, 0) ->
recvfrom0_noselect(Socket, Length);
recvfrom(Socket, Length, nowait) ->
recvfrom0_nowait(Socket, Length, erlang:make_ref());
recvfrom(Socket, Length, Ref) when is_reference(Ref) ->
recvfrom0_nowait(Socket, Length, Ref);
recvfrom(Socket, Length, Timeout) ->
recvfrom0(Socket, Length, Timeout).

recvfrom0_noselect(Socket, Length) ->
case ?MODULE:nif_recvfrom(Socket, Length) of
{error, _} = E ->
E;
{ok, {_Address, _Data}} = Reply ->
Reply
end.

recvfrom0(Socket, Length, Timeout) ->
Ref = erlang:make_ref(),
?TRACE("select read for recvfrom. self=~p ref=~p", [self(), Ref]),
case ?MODULE:nif_select_read(Socket, Ref) of
ok ->
receive
{select, _AcceptedSocket, Ref, ready_input} ->
{'$socket', Socket, select, Ref} ->
case ?MODULE:nif_recvfrom(Socket, Length) of
{error, _} = E ->
?MODULE:nif_select_stop(Socket),
E;
% TODO: Assemble data to have more if Length > byte_size(Data)
% as long as timeout did not expire
{ok, {Address, Data}} ->
{ok, {Address, Data}}
{ok, {_Address, _Data}} = Reply ->
Reply
end;
{closed, Ref} ->
{'$socket', Socket, abort, {Ref, closed}} ->
% socket was closed by another process
% TODO: see above in accept/2
{error, closed}
Expand All @@ -399,6 +519,21 @@ recvfrom(Socket, Length, Timeout) ->
Error
end.

recvfrom0_nowait(Socket, Length, Ref) ->
case ?MODULE:nif_recvfrom(Socket, Length) of
{error, timeout} ->
case ?MODULE:nif_select_read(Socket, Ref) of
ok ->
{select, {select_info, recvfrom, Ref}};
{error, _} = SelectError ->
SelectError
end;
{error, _} = RecvError ->
RecvError;
{ok, {_Address, _Data}} = Reply ->
Reply
end.

%%-----------------------------------------------------------------------------
%% @param Socket the socket
%% @param Data the data to send
Expand Down
16 changes: 8 additions & 8 deletions libs/estdlib/src/ssl.erl
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,9 @@ handshake_loop(SSLContext, Socket) ->
case socket:nif_select_read(Socket, Ref) of
ok ->
receive
{select, _SocketResource, Ref, ready_input} ->
{'$socket', Socket, select, Ref} ->
handshake_loop(SSLContext, Socket);
{closed, Ref} ->
{'$socket', Socket, abort, {Ref, closed}} ->
ok = socket:close(Socket),
{error, closed}
end;
Expand Down Expand Up @@ -242,9 +242,9 @@ close_notify_loop(SSLContext, Socket) ->
case socket:nif_select_read(Socket, Ref) of
ok ->
receive
{select, _SocketResource, Ref, ready_input} ->
{'$socket', Socket, select, Ref} ->
close_notify_loop(SSLContext, Socket);
{closed, Ref} ->
{'$socket', Socket, abort, {Ref, closed}} ->
ok = socket:close(Socket),
{error, closed}
end;
Expand Down Expand Up @@ -274,9 +274,9 @@ send({SSLContext, Socket} = SSLSocket, Binary) ->
case socket:nif_select_read(Socket, Ref) of
ok ->
receive
{select, _SocketResource, Ref, ready_input} ->
{'$socket', Socket, select, Ref} ->
send(SSLSocket, Binary);
{closed, Ref} ->
{'$socket', Socket, abort, {Ref, closed}} ->
{error, closed}
end;
{error, _Reason} = Error ->
Expand Down Expand Up @@ -309,9 +309,9 @@ recv0({SSLContext, Socket} = SSLSocket, Length, Remaining, Acc) ->
case socket:nif_select_read(Socket, Ref) of
ok ->
receive
{select, _SocketResource, Ref, ready_input} ->
{'$socket', Socket, select, Ref} ->
recv0(SSLSocket, Length, Remaining, Acc);
{closed, Ref} ->
{'$socket', Socket, abort, {Ref, closed}} ->
{error, closed}
end;
{error, _Reason} = Error ->
Expand Down
6 changes: 6 additions & 0 deletions src/libAtomVM/defaultatoms.def
Original file line number Diff line number Diff line change
Expand Up @@ -170,3 +170,9 @@ X(SHUTDOWN_ATOM, "\x8", "shutdown")

X(NONODE_AT_NOHOST_ATOM, "\xD", "nonode@nohost")
X(NET_KERNEL_ATOM, "\xA", "net_kernel")

X(DOLLAR_SOCKET_ATOM, "\x7", "$socket")
X(ABORT_ATOM, "\x5", "abort")
X(FAMILY_ATOM, "\x6", "family")
X(INET_ATOM, "\x4", "inet")
X(TIMEOUT_ATOM, "\x7", "timeout")
26 changes: 26 additions & 0 deletions src/libAtomVM/erl_nif.h
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,32 @@ ERL_NIF_TERM enif_make_resource(ErlNifEnv *env, void *obj);
*/
int enif_select(ErlNifEnv *env, ErlNifEvent event, enum ErlNifSelectFlags mode, void *obj, const ErlNifPid *pid, ERL_NIF_TERM ref);

/**
* @brief Variant of `enif_select` where sent message is `msg` instead of default.
*
* @param env current environment
* @param event event object (typically a file descriptor)
* @param obj resource object working as a container of the event object.
* @param pid process id to send a message to or NULL to use the current process (from `env`)
* @param msg message to send (copied).
* @param msg_env must be NULL.
* @return a negative value on failure, 0 or flags on success.
*/
int enif_select_read(ErlNifEnv *env, ErlNifEvent event, void *obj, const ErlNifPid *pid, ERL_NIF_TERM msg, ErlNifEnv *msg_env);

/**
* @brief Variant of `enif_select` where sent message is `msg` instead of default.
*
* @param env current environment
* @param event event object (typically a file descriptor)
* @param obj resource object working as a container of the event object.
* @param pid process id to send a message to or NULL to use the current process (from `env`)
* @param msg message to send (copied).
* @param msg_env must be NULL.
* @return a negative value on failure, 0 or flags on success.
*/
int enif_select_write(ErlNifEnv *env, ErlNifEvent event, void *obj, const ErlNifPid *pid, ERL_NIF_TERM msg, ErlNifEnv *msg_env);

/**
* @brief Monitor a process by using a resource object.
* @details The monitor is automatically removed after being triggered or if the
Expand Down
Loading
Loading