From 469b42439174f9c999558997cc0a104426ceb073 Mon Sep 17 00:00:00 2001 From: Paul Guyot Date: Sat, 7 Dec 2024 23:09:05 +0100 Subject: [PATCH] Add support for asynchronous socket API for `recv` and `recvfrom`. Signed-off-by: Paul Guyot --- CHANGELOG.md | 1 + libs/estdlib/src/gen_tcp_socket.erl | 27 +- libs/estdlib/src/gen_udp_socket.erl | 2 +- libs/estdlib/src/socket.erl | 177 ++++++++-- libs/estdlib/src/ssl.erl | 16 +- src/libAtomVM/defaultatoms.def | 6 + src/libAtomVM/erl_nif.h | 26 ++ src/libAtomVM/globalcontext.c | 34 +- src/libAtomVM/globalcontext.h | 36 ++ src/libAtomVM/mailbox.c | 20 +- src/libAtomVM/mailbox.h | 36 ++ src/libAtomVM/otp_socket.c | 131 ++++++-- src/libAtomVM/resources.c | 96 ++++-- src/libAtomVM/resources.h | 2 + .../test/main/test_erl_sources/test_ssl.erl | 16 +- .../rp2/tests/test_erl_sources/CMakeLists.txt | 1 + tests/libs/estdlib/test_tcp_socket.erl | 312 ++++++++++++------ tests/libs/estdlib/test_udp_socket.erl | 292 ++++++++++++---- 18 files changed, 977 insertions(+), 254 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0c6194f6d..e19180b15 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Added `atomvm:subprocess/4` to perform pipe/fork/execve on POSIX platforms - Added `externalterm_to_term_with_roots` to efficiently preserve roots when allocating memory for external terms. - Added `erl_epmd` client implementation to epmd using `socket` module +- Added support for socket asynchronous API for `recv` and `recvfrom`. ### Changed diff --git a/libs/estdlib/src/gen_tcp_socket.erl b/libs/estdlib/src/gen_tcp_socket.erl index 6d808c417..86079a42e 100644 --- a/libs/estdlib/src/gen_tcp_socket.erl +++ b/libs/estdlib/src/gen_tcp_socket.erl @@ -339,10 +339,7 @@ handle_cast(_Request, State) -> {noreply, State}. %% @hidden -handle_info({select, _Socket, Ref, ready_input}, State) -> - ?LOG_DEBUG("handle_info [~p], ~p]", [ - {select, _Socket, Ref, ready_input}, State - ]), +handle_info({'$socket', _Socket, select, Ref}, State) -> %% TODO cancel timer case maps:get(Ref, State#state.pending_selects, undefined) of undefined -> @@ -366,6 +363,28 @@ handle_info({select, _Socket, Ref, ready_input}, State) -> pending_selects = maps:remove(Ref, State#state.pending_selects) }} end; +handle_info({'$socket', Socket, abort, {Ref, closed}}, State) -> + %% TODO cancel timer + case maps:get(Ref, State#state.pending_selects, undefined) of + undefined -> + ?LOG_WARNING("Unable to find select ref ~p in pending selects", [Ref]), + socket:nif_select_stop(Socket), + {noreply, State}; + {accept, From, _AcceptingProc, _Timeout} -> + socket:nif_select_stop(Socket), + gen_server:reply(From, {error, closed}), + {noreply, State}; + active -> + WrappedSocket = {?GEN_TCP_MONIKER, self(), ?MODULE}, + State#state.controlling_process ! {tcp_closed, WrappedSocket}, + {noreply, State}; + {passive, From, _Length, _Timeout} -> + socket:nif_select_stop(Socket), + gen_server:reply(From, {error, closed}), + {noreply, State#state{ + pending_selects = maps:remove(Ref, State#state.pending_selects) + }} + end; handle_info({timeout, Ref, From}, State) -> ?LOG_DEBUG("handle_info [~p], ~p]", [ {timeout, Ref, From}, State diff --git a/libs/estdlib/src/gen_udp_socket.erl b/libs/estdlib/src/gen_udp_socket.erl index 56aa27fe5..c3a561e53 100644 --- a/libs/estdlib/src/gen_udp_socket.erl +++ b/libs/estdlib/src/gen_udp_socket.erl @@ -242,7 +242,7 @@ handle_cast(_Request, State) -> {noreply, State}. %% @hidden -handle_info({select, _Socket, Ref, ready_input}, State) -> +handle_info({'$socket', _Socket, select, Ref}, State) -> case maps:get(Ref, State#state.pending_selects, undefined) of undefined -> ?LOG_INFO("Unable to find select ref ~p in pending selects", [Ref]), diff --git a/libs/estdlib/src/socket.erl b/libs/estdlib/src/socket.erl index bd4444d81..0de6087f9 100644 --- a/libs/estdlib/src/socket.erl +++ b/libs/estdlib/src/socket.erl @@ -245,7 +245,7 @@ accept(Socket, Timeout) -> case ?MODULE:nif_select_read(Socket, Ref) of ok -> receive - {select, _AcceptedSocket, Ref, ready_input} -> + {'$socket', Socket, select, Ref} -> case ?MODULE:nif_accept(Socket) of {error, closed} = E -> ?MODULE:nif_select_stop(Socket), @@ -253,14 +253,15 @@ accept(Socket, Timeout) -> R -> R end; - {closed, Ref} -> + {'$socket', Socket, abort, {Ref, closed}} -> % socket was closed by another process % TODO: we need to handle: % (a) SELECT_STOP being scheduled - % (b) flush of messages as we can have both - % {closed, Ref} and {select, _, Ref, _} in the + % (b) flush of messages as we can have both in the % queue - {error, closed} + {error, closed}; + Other -> + {error, {accept, unexpected, Other, {'$socket', Socket, select, Ref}}} after Timeout -> {error, timeout} end; @@ -299,25 +300,60 @@ recv(Socket, Length) -> %% `{ok, Data} = socket:recv(ConnectedSocket)' %% @end %%----------------------------------------------------------------------------- --spec recv(Socket :: socket(), Length :: non_neg_integer(), Timeout :: timeout()) -> - {ok, Data :: binary()} | {error, Reason :: term()}. +-spec recv( + Socket :: socket(), Length :: non_neg_integer(), Timeout :: timeout() | nowait | reference() +) -> + {ok, Data :: binary()} + | {select, {select_info, recvfrom, reference()}} + | {select, {{select_info, recvfrom, reference()}, Data :: binary()}} + | {error, Reason :: term()}. +recv(Socket, Length, 0) -> + recv0_noselect(Socket, Length); +recv(Socket, 0, Timeout) when is_integer(Timeout) orelse Timeout =:= infinity -> + recv0(Socket, 0, Timeout); +recv(Socket, Length, nowait) -> + recv0_nowait(Socket, Length, erlang:make_ref()); +recv(Socket, Length, Ref) when is_reference(Ref) -> + recv0_nowait(Socket, Length, Ref); recv(Socket, Length, Timeout) -> + case ?MODULE:getopt(Socket, {socket, type}) of + {ok, stream} when Timeout =/= infinity -> + recv0_r(Socket, Length, Timeout, erlang:system_time(millisecond) + Timeout, []); + {ok, stream} when Timeout =:= infinity -> + recv0_r(Socket, Length, Timeout, undefined, []); + _ -> + recv0(Socket, Length, Timeout) + end. + +recv0_noselect(Socket, Length) -> + case ?MODULE:nif_recv(Socket, Length) of + {error, _} = E -> + E; + {ok, Data} when Length =:= 0 orelse byte_size(Data) =:= Length -> + {ok, Data}; + {ok, Data} -> + case ?MODULE:getopt(Socket, {socket, type}) of + {ok, stream} -> + {error, {timeout, Data}}; + {ok, dgram} -> + {ok, Data} + end + end. + +recv0(Socket, Length, Timeout) -> Ref = erlang:make_ref(), - ?TRACE("select read for recv. self=~p ref=~p~n", [self(), Ref]), case ?MODULE:nif_select_read(Socket, Ref) of ok -> receive - {select, _AcceptedSocket, Ref, ready_input} -> + {'$socket', Socket, select, Ref} -> case ?MODULE:nif_recv(Socket, Length) of {error, _} = E -> ?MODULE:nif_select_stop(Socket), E; - % TODO: Assemble data to have more if Length > byte_size(Data) - % as long as timeout did not expire {ok, Data} -> {ok, Data} end; - {closed, Ref} -> + {'$socket', Socket, abort, {Ref, closed}} -> % socket was closed by another process % TODO: see above in accept/2 {error, closed} @@ -328,6 +364,72 @@ recv(Socket, Length, Timeout) -> Error end. +recv0_nowait(Socket, Length, Ref) -> + case ?MODULE:nif_recv(Socket, Length) of + {error, timeout} -> + case ?MODULE:nif_select_read(Socket, Ref) of + ok -> + {select, {select_info, recv, Ref}}; + {error, _} = Error1 -> + Error1 + end; + {error, _} = E -> + E; + {ok, Data} when byte_size(Data) < Length -> + case ?MODULE:getopt(Socket, {socket, type}) of + {ok, stream} -> + case ?MODULE:nif_select_read(Socket, Ref) of + ok -> + {select, {{select_info, recv, Ref}, Data}}; + {error, _} = Error1 -> + Error1 + end; + {ok, dgram} -> + {ok, Data} + end; + {ok, Data} -> + {ok, Data} + end. + +recv0_r(Socket, Length, Timeout, EndQuery, Acc) -> + Ref = erlang:make_ref(), + case ?MODULE:nif_select_read(Socket, Ref) of + ok -> + receive + {'$socket', Socket, select, Ref} -> + case ?MODULE:nif_recv(Socket, Length) of + {error, _} = E -> + ?MODULE:nif_select_stop(Socket), + E; + {ok, Data} -> + NewAcc = [Data | Acc], + Remaining = Length - byte_size(Data), + case Remaining of + 0 -> + {ok, list_to_binary(lists:reverse(NewAcc))}; + _ -> + NewTimeout = + case Timeout of + infinity -> infinity; + _ -> EndQuery - erlang:system_time(millisecond) + end, + recv0_r(Socket, Remaining, NewTimeout, EndQuery, NewAcc) + end + end; + {'$socket', Socket, abort, {Ref, closed}} -> + % socket was closed by another process + % TODO: see above in accept/2 + {error, closed} + after Timeout -> + case Acc of + [] -> {error, timeout}; + _ -> {error, {timeout, list_to_binary(lists:reverse(Acc))}} + end + end; + {error, _Reason} = Error -> + Error + end. + %%----------------------------------------------------------------------------- %% @equiv socket:recvfrom(Socket, 0) %% @end @@ -370,25 +472,43 @@ recvfrom(Socket, Length) -> %% bytes are available and return these bytes. %% @end %%----------------------------------------------------------------------------- --spec recvfrom(Socket :: socket(), Length :: non_neg_integer(), Timeout :: timeout()) -> - {ok, {Address :: sockaddr(), Data :: binary()}} | {error, Reason :: term()}. +-spec recvfrom( + Socket :: socket(), Length :: non_neg_integer(), Timeout :: timeout() | nowait | reference() +) -> + {ok, {Address :: sockaddr(), Data :: binary()}} + | {select, {select_info, recvfrom, reference()}} + | {error, Reason :: term()}. +recvfrom(Socket, Length, 0) -> + recvfrom0_noselect(Socket, Length); +recvfrom(Socket, Length, nowait) -> + recvfrom0_nowait(Socket, Length, erlang:make_ref()); +recvfrom(Socket, Length, Ref) when is_reference(Ref) -> + recvfrom0_nowait(Socket, Length, Ref); recvfrom(Socket, Length, Timeout) -> + recvfrom0(Socket, Length, Timeout). + +recvfrom0_noselect(Socket, Length) -> + case ?MODULE:nif_recvfrom(Socket, Length) of + {error, _} = E -> + E; + {ok, {_Address, _Data}} = Reply -> + Reply + end. + +recvfrom0(Socket, Length, Timeout) -> Ref = erlang:make_ref(), - ?TRACE("select read for recvfrom. self=~p ref=~p", [self(), Ref]), case ?MODULE:nif_select_read(Socket, Ref) of ok -> receive - {select, _AcceptedSocket, Ref, ready_input} -> + {'$socket', Socket, select, Ref} -> case ?MODULE:nif_recvfrom(Socket, Length) of {error, _} = E -> ?MODULE:nif_select_stop(Socket), E; - % TODO: Assemble data to have more if Length > byte_size(Data) - % as long as timeout did not expire - {ok, {Address, Data}} -> - {ok, {Address, Data}} + {ok, {_Address, _Data}} = Reply -> + Reply end; - {closed, Ref} -> + {'$socket', Socket, abort, {Ref, closed}} -> % socket was closed by another process % TODO: see above in accept/2 {error, closed} @@ -399,6 +519,21 @@ recvfrom(Socket, Length, Timeout) -> Error end. +recvfrom0_nowait(Socket, Length, Ref) -> + case ?MODULE:nif_recvfrom(Socket, Length) of + {error, timeout} -> + case ?MODULE:nif_select_read(Socket, Ref) of + ok -> + {select, {select_info, recvfrom, Ref}}; + {error, _} = SelectError -> + SelectError + end; + {error, _} = RecvError -> + RecvError; + {ok, {_Address, _Data}} = Reply -> + Reply + end. + %%----------------------------------------------------------------------------- %% @param Socket the socket %% @param Data the data to send diff --git a/libs/estdlib/src/ssl.erl b/libs/estdlib/src/ssl.erl index 95919765d..46385b066 100644 --- a/libs/estdlib/src/ssl.erl +++ b/libs/estdlib/src/ssl.erl @@ -189,9 +189,9 @@ handshake_loop(SSLContext, Socket) -> case socket:nif_select_read(Socket, Ref) of ok -> receive - {select, _SocketResource, Ref, ready_input} -> + {'$socket', Socket, select, Ref} -> handshake_loop(SSLContext, Socket); - {closed, Ref} -> + {'$socket', Socket, abort, {Ref, closed}} -> ok = socket:close(Socket), {error, closed} end; @@ -242,9 +242,9 @@ close_notify_loop(SSLContext, Socket) -> case socket:nif_select_read(Socket, Ref) of ok -> receive - {select, _SocketResource, Ref, ready_input} -> + {'$socket', Socket, select, Ref} -> close_notify_loop(SSLContext, Socket); - {closed, Ref} -> + {'$socket', Socket, abort, {Ref, closed}} -> ok = socket:close(Socket), {error, closed} end; @@ -274,9 +274,9 @@ send({SSLContext, Socket} = SSLSocket, Binary) -> case socket:nif_select_read(Socket, Ref) of ok -> receive - {select, _SocketResource, Ref, ready_input} -> + {'$socket', Socket, select, Ref} -> send(SSLSocket, Binary); - {closed, Ref} -> + {'$socket', Socket, abort, {Ref, closed}} -> {error, closed} end; {error, _Reason} = Error -> @@ -309,9 +309,9 @@ recv0({SSLContext, Socket} = SSLSocket, Length, Remaining, Acc) -> case socket:nif_select_read(Socket, Ref) of ok -> receive - {select, _SocketResource, Ref, ready_input} -> + {'$socket', Socket, select, Ref} -> recv0(SSLSocket, Length, Remaining, Acc); - {closed, Ref} -> + {'$socket', Socket, abort, {Ref, closed}} -> {error, closed} end; {error, _Reason} = Error -> diff --git a/src/libAtomVM/defaultatoms.def b/src/libAtomVM/defaultatoms.def index 0a5ab513b..a8e6a42f3 100644 --- a/src/libAtomVM/defaultatoms.def +++ b/src/libAtomVM/defaultatoms.def @@ -170,3 +170,9 @@ X(SHUTDOWN_ATOM, "\x8", "shutdown") X(NONODE_AT_NOHOST_ATOM, "\xD", "nonode@nohost") X(NET_KERNEL_ATOM, "\xA", "net_kernel") + +X(DOLLAR_SOCKET_ATOM, "\x7", "$socket") +X(ABORT_ATOM, "\x5", "abort") +X(FAMILY_ATOM, "\x6", "family") +X(INET_ATOM, "\x4", "inet") +X(TIMEOUT_ATOM, "\x7", "timeout") diff --git a/src/libAtomVM/erl_nif.h b/src/libAtomVM/erl_nif.h index 23d3f256a..b13114d77 100644 --- a/src/libAtomVM/erl_nif.h +++ b/src/libAtomVM/erl_nif.h @@ -215,6 +215,32 @@ ERL_NIF_TERM enif_make_resource(ErlNifEnv *env, void *obj); */ int enif_select(ErlNifEnv *env, ErlNifEvent event, enum ErlNifSelectFlags mode, void *obj, const ErlNifPid *pid, ERL_NIF_TERM ref); +/** + * @brief Variant of `enif_select` where sent message is `msg` instead of default. + * + * @param env current environment + * @param event event object (typically a file descriptor) + * @param obj resource object working as a container of the event object. + * @param pid process id to send a message to or NULL to use the current process (from `env`) + * @param msg message to send (copied). + * @param msg_env must be NULL. + * @return a negative value on failure, 0 or flags on success. + */ +int enif_select_read(ErlNifEnv *env, ErlNifEvent event, void *obj, const ErlNifPid *pid, ERL_NIF_TERM msg, ErlNifEnv *msg_env); + +/** + * @brief Variant of `enif_select` where sent message is `msg` instead of default. + * + * @param env current environment + * @param event event object (typically a file descriptor) + * @param obj resource object working as a container of the event object. + * @param pid process id to send a message to or NULL to use the current process (from `env`) + * @param msg message to send (copied). + * @param msg_env must be NULL. + * @return a negative value on failure, 0 or flags on success. + */ +int enif_select_write(ErlNifEnv *env, ErlNifEvent event, void *obj, const ErlNifPid *pid, ERL_NIF_TERM msg, ErlNifEnv *msg_env); + /** * @brief Monitor a process by using a resource object. * @details The monitor is automatically removed after being triggered or if the diff --git a/src/libAtomVM/globalcontext.c b/src/libAtomVM/globalcontext.c index 27d5e3d1a..950f0a6f6 100644 --- a/src/libAtomVM/globalcontext.c +++ b/src/libAtomVM/globalcontext.c @@ -334,6 +334,18 @@ bool globalcontext_process_exists(GlobalContext *glb, int32_t process_id) return p != NULL; } +enum SendMessageResult globalcontext_post_message(GlobalContext *glb, int32_t process_id, Message *m) +{ + Context *p = globalcontext_get_process_lock(glb, process_id); + enum SendMessageResult result = SEND_MESSAGE_PROCESS_NOT_FOUND; + if (p) { + mailbox_post_message(p, &m->base); + globalcontext_get_process_unlock(glb, p); + result = SEND_MESSAGE_OK; + } + return result; +} + void globalcontext_send_message(GlobalContext *glb, int32_t process_id, term t) { Context *p = globalcontext_get_process_lock(glb, process_id); @@ -352,15 +364,17 @@ void globalcontext_send_message_nolock(GlobalContext *glb, int32_t process_id, t } #ifdef AVM_TASK_DRIVER_ENABLED -void globalcontext_send_message_from_task(GlobalContext *glb, int32_t process_id, enum MessageType type, term t) +static inline enum SendMessageResult globalcontext_send_message_from_task_common(GlobalContext *glb, int32_t process_id, MailboxMessage *message, enum MessageType type, term t) { - MailboxMessage *message = NULL; + enum SendMessageResult result = SEND_MESSAGE_PROCESS_NOT_FOUND; bool postponed = false; #ifndef AVM_NO_SMP Context *p = NULL; if (globalcontext_get_process_trylock(glb, process_id, &p)) { if (p) { - message = mailbox_message_create_from_term(type, t); + if (message == NULL) { + message = mailbox_message_create_from_term(type, t); + } // Ensure we can acquire the spinlock if (smp_spinlock_trylock(&glb->processes_spinlock)) { // We can send the message. @@ -371,6 +385,7 @@ void globalcontext_send_message_from_task(GlobalContext *glb, int32_t process_id postponed = true; } globalcontext_get_process_unlock(glb, p); + result = SEND_MESSAGE_OK; } } else { postponed = true; @@ -397,7 +412,20 @@ void globalcontext_send_message_from_task(GlobalContext *glb, int32_t process_id } while (!ATOMIC_COMPARE_EXCHANGE_WEAK_PTR(&glb->message_queue, ¤t_first, queued_item)); // Make sure the scheduler is busy sys_signal(glb); + + result = SEND_MESSAGE_OK; } + return result; +} + +enum SendMessageResult globalcontext_post_message_from_task(GlobalContext *glb, int32_t process_id, Message *message) +{ + return globalcontext_send_message_from_task_common(glb, process_id, &message->base, NormalMessage, term_nil()); +} + +void globalcontext_send_message_from_task(GlobalContext *glb, int32_t process_id, enum MessageType type, term t) +{ + globalcontext_send_message_from_task_common(glb, process_id, NULL, type, t); } static inline void globalcontext_process_message_queue(GlobalContext *glb) diff --git a/src/libAtomVM/globalcontext.h b/src/libAtomVM/globalcontext.h index 02c54c551..e093e4e59 100644 --- a/src/libAtomVM/globalcontext.h +++ b/src/libAtomVM/globalcontext.h @@ -71,6 +71,11 @@ typedef struct GlobalContext GlobalContext; typedef struct MailboxMessage MailboxMessage; #endif +#ifndef TYPEDEF_MESSAGE +#define TYPEDEF_MESSAGE +typedef struct Message Message; +#endif + struct MessageQueueItem { struct MessageQueueItem *next; @@ -165,6 +170,12 @@ struct GlobalContext void *platform_data; }; +enum SendMessageResult +{ + SEND_MESSAGE_OK = 0, + SEND_MESSAGE_PROCESS_NOT_FOUND = 1 +}; + /** * @brief Creates a new GlobalContext * @@ -250,6 +261,18 @@ void globalcontext_send_message(GlobalContext *glb, int32_t process_id, term t); */ void globalcontext_send_message_nolock(GlobalContext *glb, int32_t process_id, term t); +/** + * @brief Post a mailbox message to a process identified by its id. + * @details This function is only used by enif_select_read/enif_select_write to + * post a message that is built before. + * + * @param glb the global context (that owns the process table). + * @param process_id the local process id. + * @param m the mailbox message to send. + * @return SEND_MESSAGE_OK if the message was sent (and ownership transfered). + */ +enum SendMessageResult globalcontext_post_message(GlobalContext *glb, int32_t process_id, Message *m); + #ifdef AVM_TASK_DRIVER_ENABLED /** * @brief Send a message to a process identified by its id. This variant is to @@ -267,6 +290,19 @@ void globalcontext_send_message_nolock(GlobalContext *glb, int32_t process_id, t */ void globalcontext_send_message_from_task(GlobalContext *glb, int32_t process_id, enum MessageType type, term t); +/** + * @brief Post a mailbox message to a process identified by its id. Variant + * to be used from task drivers. + * @details This function is only used by enif_select_read/enif_select_write to + * post a message that is built before. + * + * @param glb the global context (that owns the process table). + * @param process_id the local process id. + * @param m the mailbox message to send. + * @return SEND_MESSAGE_OK if the message was sent (and ownership transfered). + */ +enum SendMessageResult globalcontext_post_message_from_task(GlobalContext *glb, int32_t process_id, Message *m); + /** * @brief Enqueue a refc binary from a task driver, to be refc decremented * later from the scheduler. diff --git a/src/libAtomVM/mailbox.c b/src/libAtomVM/mailbox.c index 9d78bb1db..bcd75a75b 100644 --- a/src/libAtomVM/mailbox.c +++ b/src/libAtomVM/mailbox.c @@ -126,6 +126,16 @@ void mailbox_message_dispose(MailboxMessage *m, Heap *heap) } } +// Dispose message. Normal / signal messages are not destroyed, instead they +// are appended to the current heap. +void mailbox_message_dispose_unsent(Message *m, GlobalContext *global, bool from_task) +{ + term mso_list = m->storage[STORAGE_MSO_LIST_INDEX]; + HeapFragment *fragment = mailbox_message_to_heap_fragment(m, m->heap_end); + memory_sweep_mso_list(mso_list, global, from_task); + memory_destroy_heap_fragment(fragment); +} + void mailbox_destroy(Mailbox *mbox, Heap *heap) { MailboxMessage *msg = mbox->outer_first; @@ -191,13 +201,13 @@ inline void mailbox_enqueue_message(Context *c, MailboxMessage *m) } while (!ATOMIC_COMPARE_EXCHANGE_WEAK_PTR(&c->mailbox.outer_first, ¤t_first, m)); } -static void mailbox_post_message(Context *c, MailboxMessage *m) +void mailbox_post_message(Context *c, MailboxMessage *m) { mailbox_enqueue_message(c, m); scheduler_signal_message(c); } #else -static void mailbox_post_message(Context *c, MailboxMessage *m) +void mailbox_post_message(Context *c, MailboxMessage *m) { m->next = c->mailbox.outer_first; c->mailbox.outer_first = m; @@ -231,6 +241,12 @@ MailboxMessage *mailbox_message_create_from_term(enum MessageType type, term t) } } +Message *mailbox_message_create_normal_message_from_term(term t) +{ + MailboxMessage *message = mailbox_message_create_from_term(NormalMessage, t); + return CONTAINER_OF(message, Message, base); +} + void mailbox_send(Context *c, term t) { MailboxMessage *msg = mailbox_message_create_from_term(NormalMessage, t); diff --git a/src/libAtomVM/mailbox.h b/src/libAtomVM/mailbox.h index 64d53a6f4..68fb92ae1 100644 --- a/src/libAtomVM/mailbox.h +++ b/src/libAtomVM/mailbox.h @@ -57,6 +57,11 @@ struct Context; typedef struct Context Context; #endif +#ifndef TYPEDEF_GLOBALCONTEXT +#define TYPEDEF_GLOBALCONTEXT +typedef struct GlobalContext GlobalContext; +#endif + struct Heap; #ifndef TYPEDEF_HEAP @@ -69,7 +74,10 @@ typedef struct Heap Heap; typedef struct MailboxMessage MailboxMessage; #endif +#ifndef TYPEDEF_MESSAGE +#define TYPEDEF_MESSAGE typedef struct Message Message; +#endif enum MessageType { @@ -238,6 +246,17 @@ void mailbox_send_ref_signal(Context *c, enum MessageType type, uint64_t ref_tic */ void mailbox_send_empty_body_signal(Context *c, enum MessageType type); +/** + * @brief Post a message. + * + * @details Post a message to a given context. Context gets ownership of the + * created message. + * + * @param c the process context. + * @param m the mailbox message to send + */ +void mailbox_post_message(Context *c, MailboxMessage *m); + #ifdef AVM_TASK_DRIVER_ENABLED /** * @brief Enqueue message @@ -341,6 +360,23 @@ MailboxMessage *mailbox_message_create_from_term(enum MessageType type, term t); */ void mailbox_message_dispose(MailboxMessage *m, Heap *heap); +/** + * @brief Allocate and serialize a term to a normal message. + * + * @details Can be called from a task or even ISR (provided malloc works). + * @param t the term that will be sent + */ +Message *mailbox_message_create_normal_message_from_term(term t); + +/** + * @brief Dispose an unsent normal message. The message will be freed. + * + * @param m the message to dispose. + * @param global the global context + * @param from_task boolean, true if called from a task, false otherwise + */ +void mailbox_message_dispose_unsent(Message *m, GlobalContext *global, bool from_task); + /** * @brief Remove next message from mailbox. * diff --git a/src/libAtomVM/otp_socket.c b/src/libAtomVM/otp_socket.c index d493a7b78..9edc10ae5 100644 --- a/src/libAtomVM/otp_socket.c +++ b/src/libAtomVM/otp_socket.c @@ -149,7 +149,8 @@ static void udp_recv_cb(void *arg, struct udp_pcb *pcb, struct pbuf *p, const ip struct SocketResource { int fd; - uint64_t ref_ticks; + uint64_t socket_ref_ticks; + uint64_t select_ref_ticks; int32_t selecting_process_id; ErlNifMonitor selecting_process_monitor; size_t buf_size; @@ -166,7 +167,8 @@ struct SocketResource struct tcp_pcb *tcp_pcb; struct udp_pcb *udp_pcb; }; - uint64_t ref_ticks; + uint64_t socket_ref_ticks; + uint64_t select_ref_ticks; int32_t selecting_process_id; // trapped or selecting ErlNifMonitor selecting_process_monitor; bool linger_on; @@ -235,6 +237,9 @@ static const AtomStringIntPair otp_socket_setopt_level_table[] = { static ErlNifResourceType *socket_resource_type; +#define SOCKET_MAKE_SELECT_NOTIFICATION_SIZE (TUPLE_SIZE(4) + REF_SIZE + TUPLE_SIZE(2) + REF_SIZE + TERM_BOXED_RESOURCE_SIZE) +static term socket_make_select_notification(struct SocketResource *rsrc_obj, Heap *heap); + // // resource operations // @@ -356,12 +361,40 @@ static const ErlNifResourceTypeInit SocketResourceTypeInit = { .down = socket_down, }; +// Make a notification message, using SOCKET_MAKE_SELECT_NOTIFICATION_SIZE on heap +static term socket_make_select_notification(struct SocketResource *rsrc_obj, Heap *heap) +{ + term notification = term_alloc_tuple(4, heap); + term_put_tuple_element(notification, 0, DOLLAR_SOCKET_ATOM); + term socket_tuple = term_alloc_tuple(2, heap); + term_put_tuple_element(socket_tuple, 0, term_from_resource(rsrc_obj, heap)); + struct RefcBinary *rsrc_refc = refc_binary_from_data(rsrc_obj); + refc_binary_increment_refcount(rsrc_refc); + term socket_ref; + if (rsrc_obj->socket_ref_ticks == 0) { + socket_ref = UNDEFINED_ATOM; + } else { + socket_ref = term_from_ref_ticks(rsrc_obj->socket_ref_ticks, heap); + } + term_put_tuple_element(socket_tuple, 1, socket_ref); + term_put_tuple_element(notification, 1, socket_tuple); + term_put_tuple_element(notification, 2, SELECT_ATOM); + term select_ref; + if (rsrc_obj->select_ref_ticks == 0) { + select_ref = UNDEFINED_ATOM; + } else { + select_ref = term_from_ref_ticks(rsrc_obj->select_ref_ticks, heap); + } + term_put_tuple_element(notification, 3, select_ref); + return notification; +} + // select emulation for lwIP that doesn't have select. #if OTP_SOCKET_LWIP static void select_event_send_notification_from_nif(struct SocketResource *rsrc_obj, Context *locked_ctx) { - BEGIN_WITH_STACK_HEAP(SELECT_EVENT_NOTIFICATION_SIZE, heap) - term notification = select_event_make_notification(rsrc_obj, rsrc_obj->ref_ticks, false, &heap); + BEGIN_WITH_STACK_HEAP(SOCKET_MAKE_SELECT_NOTIFICATION_SIZE, heap) + term notification = socket_make_select_notification(rsrc_obj, &heap); mailbox_send(locked_ctx, notification); END_WITH_STACK_HEAP(heap, locked_ctx->global) } @@ -370,8 +403,8 @@ static void select_event_send_notification_from_handler(struct SocketResource *r { struct RefcBinary *rsrc_refc = refc_binary_from_data(rsrc_obj); GlobalContext *global = rsrc_refc->resource_type->global; - BEGIN_WITH_STACK_HEAP(SELECT_EVENT_NOTIFICATION_SIZE, heap) - term notification = select_event_make_notification(rsrc_obj, rsrc_obj->ref_ticks, false, &heap); + BEGIN_WITH_STACK_HEAP(SOCKET_MAKE_SELECT_NOTIFICATION_SIZE, heap) + term notification = socket_make_select_notification(rsrc_obj, &heap); globalcontext_send_message(global, process_id, notification); END_WITH_STACK_HEAP(heap, global) } @@ -539,6 +572,14 @@ static term nif_socket_open(Context *ctx, int argc, term argv[]) TRACE("nif_socket_open: Created socket fd=%i\n", rsrc_obj->fd); rsrc_obj->selecting_process_id = INVALID_PROCESS_ID; + if (type != SOCK_STREAM) { + // TCP sockets are made non-blocking after connect, for now. + if (UNLIKELY(fcntl(rsrc_obj->fd, F_SETFL, O_NONBLOCK) != 0)) { + AVM_LOGE(TAG, "Unable to configure fd=%d to be non blocking.", rsrc_obj->fd); + return make_errno_tuple(ctx); + } + } + #elif OTP_SOCKET_LWIP if (domain == PF_INET && type == SOCK_STREAM && protocol == IPPROTO_TCP) { LWIP_BEGIN(); @@ -592,6 +633,7 @@ static term nif_socket_open(Context *ctx, int argc, term argv[]) term socket_term = term_alloc_tuple(2, &ctx->heap); uint64_t ref_ticks = globalcontext_get_ref_ticks(ctx->global); + rsrc_obj->socket_ref_ticks = ref_ticks; term ref = term_from_ref_ticks(ref_ticks, &ctx->heap); term_put_tuple_element(socket_term, 0, obj); term_put_tuple_element(socket_term, 1, ref); @@ -633,19 +675,25 @@ bool term_is_otp_socket(term socket_term) static int send_closed_notification(Context *ctx, term socket_term, int32_t selecting_process_id, struct SocketResource *rsrc_obj) { - // send a {closed, Ref | undefined} message to the pid - if (UNLIKELY(memory_ensure_free_with_roots(ctx, TUPLE_SIZE(2) + REF_SIZE, 1, &socket_term, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { + // send a {'$socket', Socket, abort, {Ref | undefined, closed}} message to the pid + if (UNLIKELY(memory_ensure_free_with_roots(ctx, TUPLE_SIZE(4) + TUPLE_SIZE(2) + REF_SIZE, 1, &socket_term, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); return -1; } + term socket_tuple = term_alloc_tuple(4, &ctx->heap); + term_put_tuple_element(socket_tuple, 0, DOLLAR_SOCKET_ATOM); + term_put_tuple_element(socket_tuple, 1, socket_term); + term_put_tuple_element(socket_tuple, 2, ABORT_ATOM); + term error_tuple = term_alloc_tuple(2, &ctx->heap); - term_put_tuple_element(error_tuple, 0, CLOSED_ATOM); - term ref = (rsrc_obj->ref_ticks == 0) ? UNDEFINED_ATOM : term_from_ref_ticks(rsrc_obj->ref_ticks, &ctx->heap); - term_put_tuple_element(error_tuple, 1, ref); + term ref = (rsrc_obj->select_ref_ticks == 0) ? UNDEFINED_ATOM : term_from_ref_ticks(rsrc_obj->select_ref_ticks, &ctx->heap); + term_put_tuple_element(error_tuple, 0, ref); + term_put_tuple_element(error_tuple, 1, CLOSED_ATOM); + term_put_tuple_element(socket_tuple, 3, error_tuple); TRACE("nif_socket_close: Sending msg to process %i, rsrc_obj = %p\n", (int) selecting_process_id, (void *) rsrc_obj); - globalcontext_send_message(ctx->global, selecting_process_id, error_tuple); + globalcontext_send_message(ctx->global, selecting_process_id, socket_tuple); return 0; } @@ -698,8 +746,7 @@ static term nif_socket_close(Context *ctx, int argc, term argv[]) // calling process. In this case we don't send any notification. // if (selecting_process_id != ctx->process_id) { - - // send a {closed, Ref | undefined} message to the pid + // send a {'$socket', Socket, abort, {Ref | undefined, closed}} message to the pid if (UNLIKELY(send_closed_notification(ctx, argv[0], selecting_process_id, rsrc_obj) < 0)) { SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); RAISE_ERROR(OUT_OF_MEMORY_ATOM); @@ -717,11 +764,11 @@ static term nif_socket_close(Context *ctx, int argc, term argv[]) TRACE("Double close on socket fd %i", rsrc_obj->fd); } #elif OTP_SOCKET_LWIP - // If the socket is being selected by another process, send a closed tuple. + // If the socket is being selected by another process, send a closed notification. if (rsrc_obj->socket_state & SocketStateSelectingRead && rsrc_obj->selecting_process_id != INVALID_PROCESS_ID && rsrc_obj->selecting_process_id != ctx->process_id) { - // send a {closed, Ref | undefined} message to the pid + // send a {'$socket', Socket, abort, {Ref | undefined, closed}} message to the pid if (UNLIKELY(send_closed_notification(ctx, argv[0], rsrc_obj->selecting_process_id, rsrc_obj) < 0)) { SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); RAISE_ERROR(OUT_OF_MEMORY_ATOM); @@ -967,7 +1014,7 @@ static term nif_socket_select_read(Context *ctx, int argc, term argv[]) rsrc_obj->selecting_process_id = ctx->process_id; } - rsrc_obj->ref_ticks = (select_ref_term == UNDEFINED_ATOM) ? 0 : term_to_ref_ticks(select_ref_term); + rsrc_obj->select_ref_ticks = (select_ref_term == UNDEFINED_ATOM) ? 0 : term_to_ref_ticks(select_ref_term); #if OTP_SOCKET_BSD TRACE("rsrc_obj->fd=%i\n", (int) rsrc_obj->fd); @@ -976,7 +1023,13 @@ static term nif_socket_select_read(Context *ctx, int argc, term argv[]) if (rsrc_obj->fd == CLOSED_FD) { send_closed_notification(ctx, argv[0], ctx->process_id, rsrc_obj); } else { - if (UNLIKELY(enif_select(erl_nif_env_from_context(ctx), rsrc_obj->fd, ERL_NIF_SELECT_READ, rsrc_obj, &ctx->process_id, select_ref_term) < 0)) { + if (UNLIKELY(memory_ensure_free_with_roots(ctx, SOCKET_MAKE_SELECT_NOTIFICATION_SIZE, 2, argv, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { + AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); + RAISE_ERROR(OUT_OF_MEMORY_ATOM); + } + term notification = socket_make_select_notification(rsrc_obj, &ctx->heap); + if (UNLIKELY(enif_select_read(erl_nif_env_from_context(ctx), rsrc_obj->fd, rsrc_obj, &ctx->process_id, notification, NULL) < 0)) { enif_demonitor_process(env, rsrc_obj, &rsrc_obj->selecting_process_monitor); rsrc_obj->selecting_process_id = INVALID_PROCESS_ID; SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); @@ -1635,6 +1688,7 @@ static term make_accepted_socket_term(struct SocketResource *conn_rsrc_obj, Heap term socket_term = term_alloc_tuple(2, heap); uint64_t ref_ticks = globalcontext_get_ref_ticks(global); + conn_rsrc_obj->socket_ref_ticks = ref_ticks; term ref = term_from_ref_ticks(ref_ticks, heap); term_put_tuple_element(socket_term, 0, obj); term_put_tuple_element(socket_term, 1, ref); @@ -1681,6 +1735,11 @@ static term nif_socket_accept(Context *ctx, int argc, term argv[]) #if OTP_SOCKET_BSD struct sockaddr_in clientaddr; socklen_t clientlen = sizeof(clientaddr); + if (UNLIKELY(fcntl(rsrc_obj->fd, F_SETFL, O_NONBLOCK) != 0)) { + AVM_LOGE(TAG, "Unable to configure fd=%d to be non blocking.", rsrc_obj->fd); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); + return make_errno_tuple(ctx); + } int fd = accept(rsrc_obj->fd, (struct sockaddr *) &clientaddr, &clientlen); SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); if (UNLIKELY(fd == -1 || fd == CLOSED_FD)) { @@ -1689,7 +1748,11 @@ static term nif_socket_accept(Context *ctx, int argc, term argv[]) term reason = (err == ECONNABORTED) ? CLOSED_ATOM : posix_errno_to_term(err, global); return make_error_tuple(reason, ctx); } else { - + if (UNLIKELY(fcntl(fd, F_SETFL, O_NONBLOCK) != 0)) { + AVM_LOGE(TAG, "Unable to configure fd=%d to be non blocking.", fd); + close(fd); + return make_errno_tuple(ctx); + } struct SocketResource *conn_rsrc_obj = enif_alloc_resource(socket_resource_type, sizeof(struct SocketResource)); conn_rsrc_obj->fd = fd; conn_rsrc_obj->selecting_process_id = INVALID_PROCESS_ID; @@ -1714,6 +1777,7 @@ static term nif_socket_accept(Context *ctx, int argc, term argv[]) term socket_term = term_alloc_tuple(2, &ctx->heap); uint64_t ref_ticks = globalcontext_get_ref_ticks(ctx->global); + conn_rsrc_obj->socket_ref_ticks = ref_ticks; term ref = term_from_ref_ticks(ref_ticks, &ctx->heap); term_put_tuple_element(socket_term, 0, new_resource); term_put_tuple_element(socket_term, 1, ref); @@ -1806,9 +1870,10 @@ static ssize_t do_socket_recv(struct SocketResource *rsrc_obj, uint8_t *buf, siz term address = inet_make_addr4(ntohl(addr.sin_addr.s_addr), heap); term port_number = term_from_int(ntohs(addr.sin_port)); - term map = term_alloc_map(2, heap); + term map = term_alloc_map(3, heap); term_set_map_assoc(map, 0, ADDR_ATOM, address); - term_set_map_assoc(map, 1, PORT_ATOM, port_number); + term_set_map_assoc(map, 1, FAMILY_ATOM, INET_ATOM); + term_set_map_assoc(map, 2, PORT_ATOM, port_number); *from = map; } else { res = recv(rsrc_obj->fd, buf, len, flags); @@ -1893,7 +1958,7 @@ static ssize_t do_socket_recv(struct SocketResource *rsrc_obj, uint8_t *buf, siz term port_number = term_from_int(port_u16); term map = term_alloc_map(2, heap); - term_set_map_assoc(map, 0, globalcontext_make_atom(global, addr_atom), address); + term_set_map_assoc(map, 0, ADDR_ATOM, address); term_set_map_assoc(map, 1, PORT_ATOM, port_number); *from = map; @@ -1925,6 +1990,12 @@ static term nif_socket_recv_with_peek(Context *ctx, term resource_term, struct S ssize_t res = recvfrom(rsrc_obj->fd, NULL, rsrc_obj->buf_size, MSG_PEEK | flags, NULL, NULL); TRACE("%li bytes available.\n", (long int) res); if (res < 0) { + if (errno == EAGAIN) { + return make_error_tuple(TIMEOUT_ATOM, ctx); + } else if (errno == ECONNRESET) { + TRACE("Peer closed connection."); + return make_error_tuple(CLOSED_ATOM, ctx); + } AVM_LOGI(TAG, "Unable to receive data on fd %i. errno=%i", rsrc_obj->fd, errno); return make_errno_tuple(ctx); } else if (res == 0) { @@ -2004,15 +2075,16 @@ static term nif_socket_recv_without_peek(Context *ctx, term resource_term, struc if (res < 0) { int err = errno; - term reason = (err == ECONNRESET) ? globalcontext_make_atom(global, ATOM_STR("\xA", "econnreset")) : posix_errno_to_term(err, global); - if (err == ECONNRESET) { - AVM_LOGI(TAG, "Peer closed connection."); + TRACE("Peer closed connection."); + return make_error_tuple(CLOSED_ATOM, ctx); + } else if (err == EAGAIN) { + return make_error_tuple(TIMEOUT_ATOM, ctx); } else { - AVM_LOGE(TAG, "Unable to read data on socket %i. errno=%i", rsrc_obj->fd, errno); + TRACE("Unable to read data on socket %i. errno=%i", rsrc_obj->fd, errno); } - return make_error_tuple(reason, ctx); + return make_errno_tuple(ctx); } if (res == 0) { @@ -2533,6 +2605,11 @@ static term nif_socket_connect(Context *ctx, int argc, term argv[]) return make_error_tuple(CLOSED_ATOM, ctx); } } else if (res == 0) { + if (UNLIKELY(fcntl(rsrc_obj->fd, F_SETFL, O_NONBLOCK) != 0)) { + AVM_LOGE(TAG, "Unable to configure fd=%d to be non blocking.", rsrc_obj->fd); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); + return make_errno_tuple(ctx); + } SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return OK_ATOM; } else { diff --git a/src/libAtomVM/resources.c b/src/libAtomVM/resources.c index f19eb3834..3d5036539 100644 --- a/src/libAtomVM/resources.c +++ b/src/libAtomVM/resources.c @@ -116,19 +116,20 @@ ERL_NIF_TERM enif_make_resource(ErlNifEnv *env, void *obj) return term_from_resource(obj, &env->heap); } -int enif_select(ErlNifEnv *env, ErlNifEvent event, enum ErlNifSelectFlags mode, void *obj, const ErlNifPid *pid, ERL_NIF_TERM ref) +static void enif_select_event_message_dispose(Message *message, GlobalContext *global, bool from_task) { - if (!(mode & (ERL_NIF_SELECT_STOP | ERL_NIF_SELECT_READ | ERL_NIF_SELECT_WRITE))) { - return ERL_NIF_SELECT_BADARG; - } - if (UNLIKELY(mode & (ERL_NIF_SELECT_READ | ERL_NIF_SELECT_WRITE) && !term_is_local_reference(ref) && ref != UNDEFINED_ATOM)) { - return ERL_NIF_SELECT_BADARG; + if (message) { + mailbox_message_dispose_unsent(message, global, from_task); } +} +static int enif_select_common(ErlNifEnv *env, ErlNifEvent event, enum ErlNifSelectFlags mode, void *obj, const ErlNifPid *pid, ERL_NIF_TERM ref, Message *message) +{ + GlobalContext *global = env->global; struct RefcBinary *resource = refc_binary_from_data(obj); // Search for event and obj struct ListHead *item; - struct ListHead *select_events = synclist_wrlock(&env->global->select_events); + struct ListHead *select_events = synclist_wrlock(&global->select_events); struct SelectEvent *select_event = NULL; LIST_FOR_EACH (item, select_events) { select_event = GET_LIST_ENTRY(item, struct SelectEvent, head); @@ -139,19 +140,20 @@ int enif_select(ErlNifEnv *env, ErlNifEvent event, enum ErlNifSelectFlags mode, } if (mode & ERL_NIF_SELECT_STOP) { if (select_event == NULL) { - synclist_unlock(&env->global->select_events); + synclist_unlock(&global->select_events); return ERL_NIF_SELECT_INVALID_EVENT; } bool was_read = select_event->read; bool was_write = select_event->write; if (!was_read && !was_write) { list_remove(&select_event->head); - synclist_unlock(&env->global->select_events); + synclist_unlock(&global->select_events); // We can call stop now. if (resource->resource_type->stop) { resource->resource_type->stop(env, obj, event, true); } - refc_binary_decrement_refcount(resource, env->global); + refc_binary_decrement_refcount(resource, global); + enif_select_event_message_dispose(select_event->message, global, false); free((void *) select_event); return ERL_NIF_SELECT_STOP_CALLED; } @@ -161,13 +163,13 @@ int enif_select(ErlNifEnv *env, ErlNifEvent event, enum ErlNifSelectFlags mode, select_event->close = 1; select_event->read = 0; select_event->write = 0; - synclist_unlock(&env->global->select_events); + synclist_unlock(&global->select_events); // Platform loop should check close flag after unregister is called if (was_read) { - sys_unregister_select_event(env->global, event, false); + sys_unregister_select_event(global, event, false); } if (was_write) { - sys_unregister_select_event(env->global, event, true); + sys_unregister_select_event(global, event, true); } return ERL_NIF_SELECT_STOP_SCHEDULED; } @@ -179,31 +181,60 @@ int enif_select(ErlNifEnv *env, ErlNifEvent event, enum ErlNifSelectFlags mode, } select_event->event = event; select_event->resource = resource; + select_event->message = NULL; + select_event->ref_ticks = 0; // Resource is used in select_event, so we increase refcount. refc_binary_increment_refcount(resource); list_init(&select_event->head); list_append(select_events, &select_event->head); } - // Second read or second write overwrite ref & pid. - if (ref == UNDEFINED_ATOM) { + // Second read or second write overwrite ref/message & pid. + enif_select_event_message_dispose(select_event->message, global, false); + select_event->message = message; + if (message) { select_event->ref_ticks = 0; } else { - select_event->ref_ticks = term_to_ref_ticks(ref); + if (ref == UNDEFINED_ATOM) { + select_event->ref_ticks = 0; + } else { + select_event->ref_ticks = term_to_ref_ticks(ref); + } } select_event->local_pid = *pid; select_event->read = mode & ERL_NIF_SELECT_READ; select_event->write = mode & ERL_NIF_SELECT_WRITE; select_event->close = 0; - synclist_unlock(&env->global->select_events); + synclist_unlock(&global->select_events); if (select_event->read) { - sys_register_select_event(env->global, event, false); + sys_register_select_event(global, event, false); } if (select_event->write) { - sys_register_select_event(env->global, event, true); + sys_register_select_event(global, event, true); } return 0; } +int enif_select(ErlNifEnv *env, ErlNifEvent event, enum ErlNifSelectFlags mode, void *obj, const ErlNifPid *pid, ERL_NIF_TERM ref) +{ + if (!(mode & (ERL_NIF_SELECT_STOP | ERL_NIF_SELECT_READ | ERL_NIF_SELECT_WRITE))) { + return ERL_NIF_SELECT_BADARG; + } + if (UNLIKELY(mode & (ERL_NIF_SELECT_READ | ERL_NIF_SELECT_WRITE) && !term_is_local_reference(ref) && ref != UNDEFINED_ATOM)) { + return ERL_NIF_SELECT_BADARG; + } + return enif_select_common(env, event, mode, obj, pid, ref, NULL); +} + +int enif_select_read(ErlNifEnv *env, ErlNifEvent event, void *obj, const ErlNifPid *pid, ERL_NIF_TERM msg, ErlNifEnv *msg_env) +{ + if (UNLIKELY(msg_env != NULL)) { + return ERL_NIF_SELECT_BADARG; + } + Message *message = mailbox_message_create_normal_message_from_term(msg); + enum ErlNifSelectFlags mode = ERL_NIF_SELECT_READ; + return enif_select_common(env, event, mode, obj, pid, term_nil(), message); +} + term select_event_make_notification(void *rsrc_obj, uint64_t ref_ticks, bool is_write, Heap *heap) { term notification = term_alloc_tuple(4, heap); @@ -224,19 +255,33 @@ term select_event_make_notification(void *rsrc_obj, uint64_t ref_ticks, bool is_ static void select_event_send_notification(struct SelectEvent *select_event, bool is_write, GlobalContext *global) { - BEGIN_WITH_STACK_HEAP(SELECT_EVENT_NOTIFICATION_SIZE, heap) - term notification = select_event_make_notification(select_event->resource->data, select_event->ref_ticks, is_write, &heap); + if (select_event->message) { + enum SendMessageResult result; #ifdef AVM_SELECT_IN_TASK - globalcontext_send_message_from_task(global, select_event->local_pid, NormalMessage, notification); + result = globalcontext_post_message_from_task(global, select_event->local_pid, select_event->message); #else - globalcontext_send_message(global, select_event->local_pid, notification); + result = globalcontext_post_message(global, select_event->local_pid, select_event->message); #endif + if (result == SEND_MESSAGE_OK) { + // Ownership was properly transfered. + // Otherwise, it will be destroyed when we have a context (when enif_select is called with stop for example) + select_event->message = NULL; + } + } else { + BEGIN_WITH_STACK_HEAP(SELECT_EVENT_NOTIFICATION_SIZE, heap) + term notification = select_event_make_notification(select_event->resource->data, select_event->ref_ticks, is_write, &heap); +#ifdef AVM_SELECT_IN_TASK + globalcontext_send_message_from_task(global, select_event->local_pid, NormalMessage, notification); +#else + globalcontext_send_message(global, select_event->local_pid, notification); +#endif + END_WITH_STACK_HEAP(heap, global) + } if (is_write) { select_event->write = 0; } else { select_event->read = 0; } - END_WITH_STACK_HEAP(heap, global) sys_unregister_select_event(global, select_event->event, is_write); } @@ -253,7 +298,6 @@ bool select_event_notify(ErlNifEvent event, bool is_read, bool is_write, GlobalC } select_event = NULL; } - synclist_unlock(&global->select_events); if (select_event) { if (is_read && select_event->read) { select_event_send_notification(select_event, false, global); @@ -264,6 +308,7 @@ bool select_event_notify(ErlNifEvent event, bool is_read, bool is_write, GlobalC result = true; } } + synclist_unlock(&global->select_events); return result; } @@ -279,6 +324,7 @@ static inline void select_event_destroy(struct SelectEvent *select_event, Global #else refc_binary_decrement_refcount(select_event->resource, global); #endif + enif_select_event_message_dispose(select_event->message, global, true); free((void *) select_event); } diff --git a/src/libAtomVM/resources.h b/src/libAtomVM/resources.h index b9f2c7b5f..f9ba20162 100644 --- a/src/libAtomVM/resources.h +++ b/src/libAtomVM/resources.h @@ -25,6 +25,7 @@ #include "erl_nif.h" #include "list.h" +#include "mailbox.h" #include "memory.h" #ifdef __cplusplus @@ -70,6 +71,7 @@ struct SelectEvent bool close; int32_t local_pid; uint64_t ref_ticks; + Message *message; }; static inline void resource_type_destroy(struct ResourceType *resource_type) diff --git a/src/platforms/esp32/test/main/test_erl_sources/test_ssl.erl b/src/platforms/esp32/test/main/test_erl_sources/test_ssl.erl index 15f7a835e..42449c135 100644 --- a/src/platforms/esp32/test/main/test_erl_sources/test_ssl.erl +++ b/src/platforms/esp32/test/main/test_erl_sources/test_ssl.erl @@ -70,9 +70,9 @@ handshake_loop(SSLContext, Socket) -> case socket:nif_select_read(Socket, Ref) of ok -> receive - {select, _SocketResource, Ref, ready_input} -> + {'$socket', Socket, select, Ref} -> handshake_loop(SSLContext, Socket); - {closed, Ref} -> + {'$socket', Socket, abort, {Ref, closed}} -> ok = socket:close(Socket), {error, closed} end; @@ -98,9 +98,9 @@ send_loop(SSLContext, Socket, Binary) -> case socket:nif_select_read(Socket, Ref) of ok -> receive - {select, _SocketResource, Ref, ready_input} -> + {'$socket', Socket, select, Ref} -> send_loop(SSLContext, Socket, Binary); - {closed, Ref} -> + {'$socket', Socket, abort, {Ref, closed}} -> {error, closed} end; {error, _Reason} = Error -> @@ -124,9 +124,9 @@ recv_loop(SSLContext, Socket, Remaining, Acc) -> case socket:nif_select_read(Socket, Ref) of ok -> receive - {select, _SocketResource, Ref, ready_input} -> + {'$socket', Socket, select, Ref} -> recv_loop(SSLContext, Socket, Remaining, Acc); - {closed, Ref} -> + {'$socket', Socket, abort, {Ref, closed}} -> {error, closed} end; {error, _Reason} = Error -> @@ -147,9 +147,9 @@ close_notify_loop(SSLContext, Socket) -> case socket:nif_select_read(Socket, Ref) of ok -> receive - {select, _SocketResource, Ref, ready_input} -> + {'$socket', Socket, select, Ref} -> close_notify_loop(SSLContext, Socket); - {closed, Ref} -> + {'$socket', Socket, abort, {Ref, closed}} -> {error, closed} end; {error, _Reason} = Error -> diff --git a/src/platforms/rp2/tests/test_erl_sources/CMakeLists.txt b/src/platforms/rp2/tests/test_erl_sources/CMakeLists.txt index 6007ce23f..773a555dc 100644 --- a/src/platforms/rp2/tests/test_erl_sources/CMakeLists.txt +++ b/src/platforms/rp2/tests/test_erl_sources/CMakeLists.txt @@ -18,6 +18,7 @@ # SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later # +include(ExternalProject) ExternalProject_Add(HostAtomVM SOURCE_DIR ../../../../../../ INSTALL_COMMAND cmake -E echo "Skipping install step." diff --git a/tests/libs/estdlib/test_tcp_socket.erl b/tests/libs/estdlib/test_tcp_socket.erl index cca597891..fa154af17 100644 --- a/tests/libs/estdlib/test_tcp_socket.erl +++ b/tests/libs/estdlib/test_tcp_socket.erl @@ -27,7 +27,8 @@ test() -> ok = test_shutdown(), ok = test_close_by_another_process(), ok = test_buf_size(), - ok = test_override_buf_size(), + ok = test_timeout(), + ok = test_nowait(), ok = test_setopt_getopt(), case get_otp_version() of atomvm -> @@ -37,6 +38,8 @@ test() -> end, ok. +-define(PACKET_SIZE, 7). + test_echo_server() -> etest:flush_msg_queue(), @@ -45,7 +48,7 @@ test_echo_server() -> test_send_receive(Port, 10), - close_listen_socket(ListenSocket). + ok = close_listen_socket(ListenSocket). %% %% test_shutdown @@ -64,12 +67,12 @@ test_shutdown() -> id(ok). test_shutdown_of_client_sockets(Port) -> - ok = test_shutdown_of_side(Port, write), - ok = test_shutdown_of_side(Port, read_write), - ok = test_shutdown_of_side(Port, read), + ok = test_shutdown_of_side(Port, write, <<"echo:01">>), + ok = test_shutdown_of_side(Port, read_write, <<"echo:02">>), + ok = test_shutdown_of_side(Port, read, <<"echo:03">>), id(ok). -test_shutdown_of_side(Port, Side) -> +test_shutdown_of_side(Port, Side, Packet) -> {ok, Socket} = socket:open(inet, stream, tcp), ok = try_connect(Socket, Port, 10), @@ -77,21 +80,25 @@ test_shutdown_of_side(Port, Side) -> case Side of read -> %% read on the socket should fail - socket:send(Socket, erlang:atom_to_binary(Side, latin1)), + socket:send(Socket, Packet), case catch (socket:recv(Socket)) of {error, _} -> ok; {ok, Data} -> - %% On some Linux kernels, shutdown is not guaranteed to - %% result in an error on read. + %% On some Linux kernels, shutdown doesn't return an error + %% until all buffered data is read. %% C.f. https://stackoverflow.com/questions/740817/behavior-of-shutdownsock-shut-rd-with-tcp - erlang:display({warning, expected_error_on_recv, Side, Data}), - % error({expected_error_on_recv, Side, Data}) - ok + %% Second recv will fail + case catch (socket:recv(Socket)) of + {error, _} -> + ok; + {ok, Data} -> + error({expected_error_on_recv, Side, Data}) + end end; _ -> %% write on the socket should fail - case catch (socket:send(Socket, erlang:atom_to_binary(Side, latin1))) of + case catch (socket:send(Socket, Packet)) of {error, _} -> ok; {ok, Data} -> @@ -123,25 +130,7 @@ test_close_by_another_process() -> timer:sleep(10), - ok = close_listen_socket(ListenSocket), - - id(ok). - -check_receive(Socket, Packet, Length, Expect) -> - case socket:send(Socket, Packet) of - ok -> - ok = - case socket:recv(Socket, Length) of - {ok, Expect} -> - ok; - Error -> - io:format("Unexpected value on recv: ~p~n", [Error]), - Error - end; - {error, Reason} = Error -> - io:format("Error on send: ~p~n", [Reason]), - Error - end. + ok = close_listen_socket(ListenSocket). test_buf_size() -> etest:flush_msg_queue(), @@ -157,50 +146,26 @@ test_buf_size() -> {error, _} = socket:setopt(Socket, {otp, rcvbuf}, not_an_int), {error, _} = socket:setopt(Socket, {otp, rcvbuf}, -1), - %% limit the recv buffer size to 10 bytes - ok = socket:setopt(Socket, {otp, rcvbuf}, 10), - - Packet = "012345678901234567890123456789", + %% limit the recv buffer size to 5 bytes + ok = socket:setopt(Socket, {otp, rcvbuf}, 5), + true = 5 < ?PACKET_SIZE, %% we should only be able to receive - ok = check_receive(Socket, Packet, 0, <<"0123456789">>), - ok = check_receive(Socket, Packet, 0, <<"0123456789">>), - ok = check_receive(Socket, Packet, 0, <<"0123456789">>), - - timer:sleep(10), - - ok = close_client_socket(Socket), - - ok = close_listen_socket(ListenSocket), - - id(ok). - -test_override_buf_size() -> - etest:flush_msg_queue(), - - Port = 44404, - ListenSocket = start_echo_server(Port), - - {ok, Socket} = socket:open(inet, stream, tcp), - ok = try_connect(Socket, Port, 10), - - %% limit the recv buffer size to 10 bytes - ok = socket:setopt(Socket, {otp, rcvbuf}, 10), - - Packet = "012345678901234567890123456789", + ok = socket:send(Socket, <<"echo:01">>), + {ok, <<"echo:">>} = socket:recv(Socket, 0, 5000), + {ok, <<"01">>} = socket:recv(Socket, 0, 5000), + ok = socket:send(Socket, <<"echo:02">>), + {ok, <<"echo:">>} = socket:recv(Socket, 0, 5000), + {ok, <<"02">>} = socket:recv(Socket, 0, 5000), %% verify that the socket:recv length parameter takes %% precedence over the default - ok = check_receive(Socket, Packet, 15, <<"012345678901234">>), - ok = check_receive(Socket, Packet, 15, <<"567890123456789">>), - - timer:sleep(10), + ok = socket:send(Socket, <<"echo:03">>), + {ok, <<"echo:03">>} = socket:recv(Socket, ?PACKET_SIZE, 5000), ok = close_client_socket(Socket), - ok = close_listen_socket(ListenSocket), - - id(ok). + ok = close_listen_socket(ListenSocket). %% %% echo_server @@ -245,18 +210,18 @@ accept(Pid, ListenSocket) -> end. echo(Pid, Socket) -> - case socket:recv(Socket) of - {ok, Packet} -> - % Pid ! {packet_received, Packet}, - ok = - case socket:send(Socket, Packet) of - ok -> - ok; - E -> - %% TODO support returning Rest when Packet > buffer_size - {unexpected_reply_from_send, E} - end, - % Pid ! {packet_echoed, Packet}, + case socket:recv(Socket, ?PACKET_SIZE) of + {ok, <<"echo:", _/binary>> = Packet} -> + ok = socket:send(Socket, Packet), + echo(Pid, Socket); + {ok, <<"wait:", _/binary>> = Packet} -> + timer:sleep(500), + ok = socket:send(Socket, Packet), + echo(Pid, Socket); + {ok, <<"chnk:", Rest/binary>>} -> + ok = socket:send(Socket, <<"chnk:">>), + timer:sleep(500), + ok = socket:send(Socket, Rest), echo(Pid, Socket); %% estdlib TODO {error, closed} -> @@ -266,6 +231,9 @@ echo(Pid, Socket) -> {error, econnreset} -> Pid ! recv_terminated, ok; + {error, {closed, <<"read">>}} -> + Pid ! recv_terminated, + ok; SomethingElse -> error({unexpected_return_from_recv, SomethingElse}) end. @@ -275,23 +243,13 @@ close_listen_socket(ListenSocket) -> %% Close the socket, and wait for a signal that we came out of accept %% ok = socket:close(ListenSocket), - receive - accept_terminated -> - ok - after 1000 -> - %% - %% Closing the listening socket from another process may in some - %% cases not result in the blocking accept to break out of its - %% call with an expected return value. In this case, we will - %% allow the wait for the `accept_terminated` message to fail - %% and simply warn the user. See TODO comment to this effect in - %% `nif_socket_close` function in `otp_socket.c` - %% - erlang:display({warning, timeout, waiting, accept_terminated}) - % throw({timeout, waiting, accept_terminated}) - end, - - ok. + ok = + receive + accept_terminated -> + ok + after 1000 -> + {error, {timeout, accept_terminated}} + end. %% %% send_receive loop @@ -313,7 +271,7 @@ close_client_socket(Socket) -> receive recv_terminated -> ok - after 1000 -> + after 2000 -> throw({timeout, waiting, recv_terminated}) end. @@ -331,7 +289,8 @@ try_connect(Socket, Port, Tries) -> send_receive_loop(_Socket, 0) -> ok; send_receive_loop(Socket, I) -> - Packet = pid_to_list(self()) ++ ":" ++ integer_to_list(I), + Packet = list_to_binary(io_lib:format("echo:~2.10.0B", [I])), + ?PACKET_SIZE = byte_size(Packet), case socket:send(Socket, Packet) of ok -> case socket:recv(Socket) of @@ -346,6 +305,161 @@ send_receive_loop(Socket, I) -> Error end. +receive_loop_nowait(Socket, Packet) when byte_size(Packet) > 0 -> + case socket:recv(Socket, byte_size(Packet), nowait) of + {ok, ReceivedPacket} when ReceivedPacket =:= Packet -> + ok; + {select, {select_info, recv, SelectHandle}} when is_reference(SelectHandle) -> + receive + {'$socket', Socket, select, SelectHandle} -> + receive_loop_nowait(Socket, Packet) + after 5000 -> + {error, timeout} + end; + {select, {{select_info, recv, SelectHandle}, Data}} when is_reference(SelectHandle) -> + {Data, Rest} = split_binary(Packet, byte_size(Data)), + receive + {'$socket', Socket, select, SelectHandle} -> + receive_loop_nowait(Socket, Rest) + after 5000 -> + {error, timeout} + end; + {error, _} = Error -> + io:format("Error on recv: ~p~n", [Error]), + Error + end. + +receive_loop_nowait_ref(Socket, Packet) when byte_size(Packet) > 0 -> + Ref = make_ref(), + case socket:recv(Socket, byte_size(Packet), Ref) of + {ok, ReceivedPacket} when ReceivedPacket =:= Packet -> + ok; + {select, {select_info, recv, Ref}} -> + receive + {'$socket', Socket, select, Ref} -> + receive_loop_nowait_ref(Socket, Packet) + after 5000 -> + {error, timeout} + end; + {select, {{select_info, recv, Ref}, Data}} -> + {Data, Rest} = split_binary(Packet, byte_size(Data)), + receive + {'$socket', Socket, select, Ref} -> + receive_loop_nowait_ref(Socket, Rest) + after 5000 -> + {error, timeout} + end; + {error, _} = Error -> + io:format("Error on recv: ~p~n", [Error]), + Error + end. + +test_timeout() -> + etest:flush_msg_queue(), + + Port = 44404, + ListenSocket = start_echo_server(Port), + + {ok, Socket} = socket:open(inet, stream, tcp), + ok = try_connect(Socket, Port, 10), + + % receive of two chunks with an infinity timeout + Packet0 = <<"chnk:00">>, + ok = socket:send(Socket, Packet0), + {ok, Packet0} = socket:recv(Socket, ?PACKET_SIZE, infinity), + + % receive of two chunks with a large timeout + Packet1 = <<"chnk:01">>, + ok = socket:send(Socket, Packet1), + {ok, Packet1} = socket:recv(Socket, ?PACKET_SIZE, 5000), + + % receive of two chunks with a small timeout causing a timeout error + Packet2 = <<"chnk:02">>, + ok = socket:send(Socket, Packet2), + {error, Timeout02} = socket:recv(Socket, ?PACKET_SIZE, 250), + case Timeout02 of + {timeout, <<"chnk:">>} -> + % AtomVM usually does return partial data + {ok, <<"02">>} = socket:recv(Socket, 2, infinity); + timeout -> + % BEAM OTP-27 seems to never return partial data + {ok, <<"chnk:02">>} = socket:recv(Socket, ?PACKET_SIZE, infinity) + end, + + % receive of two chunks with a null timeout causing a timeout error + Packet3 = <<"chnk:03">>, + ok = socket:send(Socket, Packet3), + timer:sleep(250), + case socket:recv(Socket, ?PACKET_SIZE, 0) of + {ok, <<"chnk:">>} -> + % BEAM OTP-22 to OTP-24 returns this on Linux on the CI. + {ok, <<"03">>} = socket:recv(Socket, 2); + {error, Timeout03} -> + case Timeout03 of + {timeout, <<"chnk:">>} -> + % BEAM OTP-27 seems to always return partial data + % AtomVM usually does + {ok, <<"03">>} = socket:recv(Socket, 2); + timeout -> + % Depending on scheduling, AtomVM may return no partial data + {ok, <<"chnk:03">>} = socket:recv(Socket, ?PACKET_SIZE) + end + end, + + % Test recv + ok = socket:send(Socket, <<"wait:01">>), + {error, timeout} = socket:recv(Socket, 0, 100), + {ok, <<"wait:01">>} = socket:recv(Socket, 0, 5000), + + ok = socket:send(Socket, <<"wait:02">>), + {error, timeout} = socket:recv(Socket, ?PACKET_SIZE, 0), + {ok, <<"wait:02">>} = socket:recv(Socket, ?PACKET_SIZE, 5000), + + ok = socket:send(Socket, <<"wait:03">>), + {error, Timeout04} = socket:recv(Socket, 2 * ?PACKET_SIZE, 5000), + ok = + case Timeout04 of + {timeout, <<"wait:03">>} -> + % AtomVM usually does return partial data + ok; + timeout -> + % BEAM OTP-27 seems to never return partial data + ok + end, + + ok = close_client_socket(Socket), + ok = close_listen_socket(ListenSocket). + +test_nowait() -> + ok = test_nowait(fun receive_loop_nowait/2), + ok = test_nowait(fun receive_loop_nowait_ref/2), + ok. + +test_nowait(ReceiveFun) -> + etest:flush_msg_queue(), + + Port = 44404, + ListenSocket = start_echo_server(Port), + + {ok, Socket} = socket:open(inet, stream, tcp), + ok = try_connect(Socket, Port, 10), + + Packet0 = <<"echo:00">>, + ok = socket:send(Socket, Packet0), + ok = ReceiveFun(Socket, Packet0), + + Packet1 = <<"wait:00">>, + ok = socket:send(Socket, Packet1), + ok = ReceiveFun(Socket, Packet1), + + Packet2 = <<"chnk:00">>, + ok = socket:send(Socket, Packet2), + ok = ReceiveFun(Socket, Packet2), + + ok = close_client_socket(Socket), + + ok = close_listen_socket(ListenSocket). + test_setopt_getopt() -> {ok, Socket} = socket:open(inet, stream, tcp), {ok, stream} = socket:getopt(Socket, {socket, type}), diff --git a/tests/libs/estdlib/test_udp_socket.erl b/tests/libs/estdlib/test_udp_socket.erl index f861f57d7..99147fa5c 100644 --- a/tests/libs/estdlib/test_udp_socket.erl +++ b/tests/libs/estdlib/test_udp_socket.erl @@ -23,82 +23,262 @@ -export([test/0]). test() -> - ok = test_echo_server(), + ok = test_echo(), + ok = test_buf_size(), + ok = test_timeout(), + ok = test_nowait(), ok = test_setopt_getopt(), ok. -test_echo_server() -> - Port = 44405, - {ok, ReceiveSocket} = socket:open(inet, dgram, udp), +-define(PACKET_SIZE, 7). + +start_echo_server(Port) -> + {ok, Socket} = socket:open(inet, dgram, udp), - ok = socket:setopt(ReceiveSocket, {socket, reuseaddr}, true), - ok = socket:setopt(ReceiveSocket, {socket, linger}, #{onoff => true, linger => 0}), + ok = socket:setopt(Socket, {socket, reuseaddr}, true), + ok = socket:setopt(Socket, {socket, linger}, #{onoff => true, linger => 0}), - ok = socket:bind(ReceiveSocket, #{ + ok = socket:bind(Socket, #{ family => inet, addr => loopback, port => Port }), - Self = self(), - spawn(fun() -> - Self ! ready, - receive_loop(Self, ReceiveSocket) - end), - - receive - ready -> - ok - end, - - test_send_receive(Port, 10), - - %% - %% Close the socket, and wait for a signal that we came out of recvfrom - %% - ok = socket:close(ReceiveSocket), - receive - recv_terminated -> ok - after 1000 -> - %% This is UDP, so raising an error might not be fair here. - %% Let's just log instead. - erlang:display({innocuous_udp_timeout, waiting, recv_terminated}) - end, - ok. + {Pid, MonitorRef} = spawn_opt( + fun() -> + echo_server_loop(Socket) + end, + [monitor] + ), + + {Pid, MonitorRef, Socket}. -receive_loop(Pid, ReceiveSocket) -> - case socket:recvfrom(ReceiveSocket) of - {ok, {_Source, Packet}} -> - Pid ! {received, Packet}, - receive_loop(Pid, ReceiveSocket); +echo_server_loop(Socket) -> + case socket:recvfrom(Socket, 0, 5000) of + {ok, {Source, <<"echo:", _/binary>> = Packet}} -> + ok = socket:sendto(Socket, Packet, Source), + echo_server_loop(Socket); + {ok, {Source, <<"wait:", _/binary>> = Packet}} -> + timer:sleep(500), + ok = socket:sendto(Socket, Packet, Source), + echo_server_loop(Socket); + {ok, {Source, <<"chnk:", Rest/binary>>}} -> + ok = socket:sendto(Socket, <<"chnk:">>, Source), + ok = socket:sendto(Socket, Rest, Source), + echo_server_loop(Socket); {error, closed} -> - Pid ! recv_terminated; + ok; SomethingElse -> - Pid ! recv_terminated, error({unexpected_return_from_recv, SomethingElse}) end. -test_send_receive(Port, N) -> +stop_echo_server({Pid, MonitorRef, Socket}) -> + % We stop the server by closing the packet. + ok = socket:close(Socket), + normal = + receive + {'DOWN', MonitorRef, process, Pid, Reason} -> Reason + end, + ok. + +test_echo() -> + Port = 44405, + EchoServer = start_echo_server(Port), + Dest = #{family => inet, addr => {127, 0, 0, 1}, port => Port}, + {ok, Socket} = socket:open(inet, dgram, udp), + + % Test recvfrom + ok = socket:sendto(Socket, <<"echo:01">>, Dest), + {ok, {Dest, <<"echo:01">>}} = socket:recvfrom(Socket, 0, 5000), + + % Test recv + ok = socket:sendto(Socket, <<"echo:02">>, Dest), + {ok, <<"echo:02">>} = socket:recv(Socket, 0, 5000), + + % Test loopback + ok = socket:sendto(Socket, <<"echo:03">>, #{family => inet, addr => loopback, port => Port}), + {ok, {Dest, <<"echo:03">>}} = socket:recvfrom(Socket, 0, 5000), + + % Chunk means two packets with UDP + ok = socket:sendto(Socket, <<"chnk:01">>, Dest), + timer:sleep(200), + {ok, {Dest, <<"chnk:">>}} = socket:recvfrom(Socket, 0, 5000), + {ok, {Dest, <<"01">>}} = socket:recvfrom(Socket, 0, 5000), + + % Chunk means two packets with UDP, including with recv + ok = socket:sendto(Socket, <<"chnk:02">>, Dest), + timer:sleep(200), + {ok, <<"chnk:">>} = socket:recv(Socket, 0, 5000), + {ok, <<"02">>} = socket:recv(Socket, 0, 5000), + + ok = socket:close(Socket), + ok = stop_echo_server(EchoServer). + +test_buf_size() -> + Port = 44405, + EchoServer = start_echo_server(Port), + Dest = #{family => inet, addr => {127, 0, 0, 1}, port => Port}, {ok, Socket} = socket:open(inet, dgram, udp), - ok = loop(Socket, Port, N), + %% try a few failures first + {error, _} = socket:setopt(Socket, {otp, badopt}, any_value), + {error, _} = socket:setopt(Socket, {otp, rcvbuf}, not_an_int), + {error, _} = socket:setopt(Socket, {otp, rcvbuf}, -1), + + %% limit the recv buffer size to 5 bytes + ok = socket:setopt(Socket, {otp, rcvbuf}, 5), + true = 5 < ?PACKET_SIZE, + + %% we should only be able to receive + ok = socket:sendto(Socket, <<"echo:01">>, Dest), + {ok, {Dest, <<"echo:">>}} = socket:recvfrom(Socket, 0, 5000), + {error, timeout} = socket:recvfrom(Socket, 0, 0), + ok = socket:sendto(Socket, <<"echo:01">>, Dest), + {ok, {Dest, <<"echo:">>}} = socket:recvfrom(Socket, 0, 5000), + {error, timeout} = socket:recvfrom(Socket, 0, 0), + + %% verify that the socket:recv length parameter takes + %% precedence over the default + ok = socket:sendto(Socket, <<"echo:03">>, Dest), + {ok, {Dest, <<"echo:03">>}} = socket:recvfrom(Socket, ?PACKET_SIZE, 5000), + + ok = socket:close(Socket), + ok = stop_echo_server(EchoServer). + +test_timeout() -> + Port = 44405, + EchoServer = start_echo_server(Port), + Dest = #{family => inet, addr => {127, 0, 0, 1}, port => Port}, + {ok, Socket} = socket:open(inet, dgram, udp), + + % Test recvfrom + ok = socket:sendto(Socket, <<"wait:01">>, Dest), + {error, timeout} = socket:recvfrom(Socket, 0, 100), + {ok, {Dest, <<"wait:01">>}} = socket:recvfrom(Socket, 0, 5000), + + ok = socket:sendto(Socket, <<"wait:02">>, Dest), + {error, timeout} = socket:recvfrom(Socket, ?PACKET_SIZE, 0), + {ok, {Dest, <<"wait:02">>}} = socket:recvfrom(Socket, ?PACKET_SIZE, 5000), - %% - %% Close the socket - %% - ok = socket:close(Socket). + ok = socket:sendto(Socket, <<"wait:03">>, Dest), + {error, timeout} = socket:recvfrom(Socket, 0, 0), + {ok, {Dest, <<"wait:03">>}} = socket:recvfrom(Socket, 10, infinity), + + % Test recv + ok = socket:sendto(Socket, <<"wait:01">>, Dest), + {error, timeout} = socket:recv(Socket, 0, 100), + {ok, <<"wait:01">>} = socket:recv(Socket, 0, 5000), + + ok = socket:sendto(Socket, <<"wait:02">>, Dest), + {error, timeout} = socket:recv(Socket, ?PACKET_SIZE, 0), + {ok, <<"wait:02">>} = socket:recv(Socket, ?PACKET_SIZE, 5000), + + ok = socket:sendto(Socket, <<"wait:03">>, Dest), + {error, timeout} = socket:recv(Socket, 2 * ?PACKET_SIZE, 0), + ok = + case socket:recv(Socket, 2 * ?PACKET_SIZE, 5000) of + {ok, <<"wait:03">>} -> + ok; + % https://github.com/erlang/otp/issues/9172 + {error, {timeout, <<"wait:03">>}} -> + "BEAM" = erlang:system_info(machine), + case erlang:system_info(otp_release) of + "26" -> ok; + "27" -> ok + end, + ok + end, + + ok = socket:close(Socket), + ok = stop_echo_server(EchoServer). + +test_nowait() -> + ok = test_nowait(fun receive_loop_nowait/2), + ok = test_nowait(fun receive_loop_nowait_ref/2), + ok = test_nowait(fun receive_loop_recvfrom_nowait/2), + ok = test_nowait(fun receive_loop_recvfrom_nowait_ref/2), + ok. + +test_nowait(ReceiveFun) -> + etest:flush_msg_queue(), + + Port = 44404, + EchoServer = start_echo_server(Port), + Dest = #{family => inet, addr => {127, 0, 0, 1}, port => Port}, + {ok, Socket} = socket:open(inet, dgram, udp), + + Packet0 = <<"echo:00">>, + ok = socket:sendto(Socket, Packet0, Dest), + ok = ReceiveFun(Socket, Packet0), + + Packet1 = <<"wait:00">>, + ok = socket:sendto(Socket, Packet1, Dest), + ok = ReceiveFun(Socket, Packet1), + + ok = socket:close(Socket), + ok = stop_echo_server(EchoServer). + +receive_loop_nowait(Socket, Packet) -> + case socket:recv(Socket, byte_size(Packet), nowait) of + {ok, ReceivedPacket} when ReceivedPacket =:= Packet -> + ok; + {select, {select_info, recv, SelectHandle}} when is_reference(SelectHandle) -> + receive + {'$socket', Socket, select, SelectHandle} -> + receive_loop_nowait(Socket, Packet) + after 5000 -> + {error, timeout} + end; + {error, _} = Error -> + io:format("Error on recv: ~p~n", [Error]), + Error + end. + +receive_loop_nowait_ref(Socket, Packet) -> + Ref = make_ref(), + case socket:recv(Socket, byte_size(Packet), Ref) of + {ok, ReceivedPacket} when ReceivedPacket =:= Packet -> + ok; + {select, {select_info, recv, Ref}} -> + receive + {'$socket', Socket, select, Ref} -> + receive_loop_nowait_ref(Socket, Packet) + after 5000 -> + {error, timeout} + end; + {error, _} = Error -> + io:format("Error on recv: ~p~n", [Error]), + Error + end. + +receive_loop_recvfrom_nowait(Socket, Packet) -> + case socket:recvfrom(Socket, byte_size(Packet), nowait) of + {ok, {_Source, ReceivedPacket}} when ReceivedPacket =:= Packet -> + ok; + {select, {select_info, recvfrom, SelectHandle}} when is_reference(SelectHandle) -> + receive + {'$socket', Socket, select, SelectHandle} -> + receive_loop_nowait(Socket, Packet) + after 5000 -> + {error, timeout} + end; + {error, _} = Error -> + io:format("Error on recv: ~p~n", [Error]), + Error + end. -loop(_Socket, _Port, 0) -> - ok; -loop(Socket, Port, I) -> - Packet = pid_to_list(self()) ++ ":" ++ integer_to_list(I), - Dest = #{family => inet, addr => loopback, port => Port}, - case socket:sendto(Socket, Packet, Dest) of - ok -> +receive_loop_recvfrom_nowait_ref(Socket, Packet) -> + Ref = make_ref(), + case socket:recvfrom(Socket, byte_size(Packet), Ref) of + {ok, {_Source, ReceivedPacket}} when ReceivedPacket =:= Packet -> + ok; + {select, {select_info, recvfrom, Ref}} -> receive - {received, _Packet} -> - loop(Socket, Port, I - 1) + {'$socket', Socket, select, Ref} -> + receive_loop_nowait_ref(Socket, Packet) + after 5000 -> + {error, timeout} end; - {error, _Reason} = Error -> - io:format("Error on sendto: ~p~n", [Error]), + {error, _} = Error -> + io:format("Error on recv: ~p~n", [Error]), Error end.