From 07a05953022253190d60a0381bc6984e9ce2dd4e Mon Sep 17 00:00:00 2001 From: Paul Guyot Date: Thu, 9 Jan 2025 22:06:31 +0100 Subject: [PATCH] Add support for asynchronous socket:accept/2 API Signed-off-by: Paul Guyot --- CHANGELOG.md | 2 +- libs/estdlib/src/socket.erl | 53 +++++++++++++++---- src/libAtomVM/otp_socket.c | 4 +- tests/libs/estdlib/test_tcp_socket.erl | 72 ++++++++++++++++++++++++-- 4 files changed, 113 insertions(+), 18 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e19180b15..323f87282 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,7 +24,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Added `atomvm:subprocess/4` to perform pipe/fork/execve on POSIX platforms - Added `externalterm_to_term_with_roots` to efficiently preserve roots when allocating memory for external terms. - Added `erl_epmd` client implementation to epmd using `socket` module -- Added support for socket asynchronous API for `recv` and `recvfrom`. +- Added support for socket asynchronous API for `recv`, `recvfrom` and `accept`. ### Changed diff --git a/libs/estdlib/src/socket.erl b/libs/estdlib/src/socket.erl index 0de6087f9..447dbaee6 100644 --- a/libs/estdlib/src/socket.erl +++ b/libs/estdlib/src/socket.erl @@ -229,29 +229,47 @@ accept(Socket) -> %% be set to listen for connections. %% %% Note that this function will block until a connection is made -%% from a client. Typically, users will spawn a call to `accept' -%% in a separate process. +%% from a client, unless `nowait' or a reference is passed as `Timeout'. +%% Typically, users will spawn a call to `accept' in a separate process. %% %% Example: %% %% `{ok, ConnectedSocket} = socket:accept(ListeningSocket)' %% @end %%----------------------------------------------------------------------------- --spec accept(Socket :: socket(), Timeout :: timeout()) -> - {ok, Connection :: socket()} | {error, Reason :: term()}. +-spec accept(Socket :: socket(), Timeout :: timeout() | nowait | reference()) -> + {ok, Connection :: socket()} + | {select, {select_info, accept, reference()}} + | {error, Reason :: term()}. +accept(Socket, 0) -> + accept0_noselect(Socket); +accept(Socket, nowait) -> + accept0_nowait(Socket, erlang:make_ref()); +accept(Socket, Ref) when is_reference(Ref) -> + accept0_nowait(Socket, Ref); accept(Socket, Timeout) -> + accept0(Socket, Timeout). + +accept0_noselect(Socket) -> + case ?MODULE:nif_accept(Socket) of + {error, _} = E -> + E; + {ok, _Socket} = Reply -> + Reply + end. + +accept0(Socket, Timeout) -> Ref = erlang:make_ref(), - ?TRACE("select read for accept. self=~p ref=~p~n", [self(), Ref]), case ?MODULE:nif_select_read(Socket, Ref) of ok -> receive {'$socket', Socket, select, Ref} -> case ?MODULE:nif_accept(Socket) of - {error, closed} = E -> + {error, _} = E -> ?MODULE:nif_select_stop(Socket), E; - R -> - R + {ok, _Socket} = Reply -> + Reply end; {'$socket', Socket, abort, {Ref, closed}} -> % socket was closed by another process @@ -259,9 +277,7 @@ accept(Socket, Timeout) -> % (a) SELECT_STOP being scheduled % (b) flush of messages as we can have both in the % queue - {error, closed}; - Other -> - {error, {accept, unexpected, Other, {'$socket', Socket, select, Ref}}} + {error, closed} after Timeout -> {error, timeout} end; @@ -269,6 +285,21 @@ accept(Socket, Timeout) -> Error end. +accept0_nowait(Socket, Ref) -> + case ?MODULE:nif_accept(Socket) of + {error, eagain} -> + case ?MODULE:nif_select_read(Socket, Ref) of + ok -> + {select, {select_info, accept, Ref}}; + {error, _} = SelectError -> + SelectError + end; + {error, _} = RecvError -> + RecvError; + {ok, _Socket} = Reply -> + Reply + end. + %%----------------------------------------------------------------------------- %% @equiv socket:recv(Socket, 0) %% @end diff --git a/src/libAtomVM/otp_socket.c b/src/libAtomVM/otp_socket.c index 9edc10ae5..65656733c 100644 --- a/src/libAtomVM/otp_socket.c +++ b/src/libAtomVM/otp_socket.c @@ -1743,8 +1743,10 @@ static term nif_socket_accept(Context *ctx, int argc, term argv[]) int fd = accept(rsrc_obj->fd, (struct sockaddr *) &clientaddr, &clientlen); SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); if (UNLIKELY(fd == -1 || fd == CLOSED_FD)) { - AVM_LOGE(TAG, "Unable to accept on socket %i.", rsrc_obj->fd); int err = errno; + if (err != EAGAIN) { + AVM_LOGI(TAG, "Unable to accept on socket %i. errno=%i", rsrc_obj->fd, (int) err); + } term reason = (err == ECONNABORTED) ? CLOSED_ATOM : posix_errno_to_term(err, global); return make_error_tuple(reason, ctx); } else { diff --git a/tests/libs/estdlib/test_tcp_socket.erl b/tests/libs/estdlib/test_tcp_socket.erl index fa154af17..e642d8915 100644 --- a/tests/libs/estdlib/test_tcp_socket.erl +++ b/tests/libs/estdlib/test_tcp_socket.erl @@ -28,7 +28,8 @@ test() -> ok = test_close_by_another_process(), ok = test_buf_size(), ok = test_timeout(), - ok = test_nowait(), + ok = test_recv_nowait(), + ok = test_accept_nowait(), ok = test_setopt_getopt(), case get_otp_version() of atomvm -> @@ -430,12 +431,12 @@ test_timeout() -> ok = close_client_socket(Socket), ok = close_listen_socket(ListenSocket). -test_nowait() -> - ok = test_nowait(fun receive_loop_nowait/2), - ok = test_nowait(fun receive_loop_nowait_ref/2), +test_recv_nowait() -> + ok = test_recv_nowait(fun receive_loop_nowait/2), + ok = test_recv_nowait(fun receive_loop_nowait_ref/2), ok. -test_nowait(ReceiveFun) -> +test_recv_nowait(ReceiveFun) -> etest:flush_msg_queue(), Port = 44404, @@ -460,6 +461,67 @@ test_nowait(ReceiveFun) -> ok = close_listen_socket(ListenSocket). +test_accept_nowait() -> + OTPVersion = get_otp_version(), + ok = test_accept_nowait(nowait, OTPVersion), + ok = test_accept_nowait(make_ref(), OTPVersion), + ok. + +% actually since 22.1, but let's simplify here. +test_accept_nowait(_NoWaitRef, Version) when Version =/= atomvm andalso Version < 23 -> ok; +test_accept_nowait(Ref, Version) when + is_reference(Ref) andalso Version =/= atomvm andalso Version < 24 +-> + ok; +test_accept_nowait(NoWaitRef, _Version) -> + etest:flush_msg_queue(), + + Port = 44404, + {ok, Socket} = socket:open(inet, stream, tcp), + ok = socket:setopt(Socket, {socket, reuseaddr}, true), + ok = socket:setopt(Socket, {socket, linger}, #{onoff => true, linger => 0}), + + ok = socket:bind(Socket, #{ + family => inet, addr => loopback, port => Port + }), + + ok = socket:listen(Socket), + + Parent = self(), + {Child, MonitorRef} = spawn_opt( + fun() -> + {select, {select_info, accept, Ref}} = socket:accept(Socket, NoWaitRef), + Parent ! {self(), got_nowait}, + receive + {'$socket', Socket, select, Ref} -> + {ok, ConnSocket} = socket:accept(Socket, 0), + socket:send(ConnSocket, <<"hello">>), + socket:close(ConnSocket) + after 5000 -> + exit(timeout) + end + end, + [link, monitor] + ), + ok = + receive + {Child, got_nowait} -> ok + after 5000 -> timeout + end, + {ok, ClientSocket} = socket:open(inet, stream, tcp), + ok = socket:connect(ClientSocket, #{family => inet, addr => loopback, port => Port}), + {ok, <<"hello">>} = socket:recv(ClientSocket, 5), + + socket:close(ClientSocket), + ok = + receive + {'DOWN', MonitorRef, process, Child, normal} -> ok + after 5000 -> + timeout + end, + socket:close(Socket), + ok. + test_setopt_getopt() -> {ok, Socket} = socket:open(inet, stream, tcp), {ok, stream} = socket:getopt(Socket, {socket, type}),