From 2b578bd845a2a514137a2a1a4c7ee536f85e6239 Mon Sep 17 00:00:00 2001 From: Micael Karlberg Date: Thu, 14 May 2020 18:45:33 +0200 Subject: [erts] Inform all async(s) when closing udp/sctp socket The UDP and SCTP sockets did not inform all (async) waiting processes about socket close, basically leaving them hanging. OTP-16654 --- erts/emulator/drivers/common/inet_drv.c | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/erts/emulator/drivers/common/inet_drv.c b/erts/emulator/drivers/common/inet_drv.c index 2ef452fa01..1ac6dc4a93 100644 --- a/erts/emulator/drivers/common/inet_drv.c +++ b/erts/emulator/drivers/common/inet_drv.c @@ -12246,14 +12246,15 @@ static void packet_inet_stop(ErlDrvData e) into "udp_descriptor*" or "inet_descriptor*": */ udp_descriptor * udesc = (udp_descriptor*) e; - inet_descriptor* descr = INETP(udesc); + inet_descriptor* desc = INETP(udesc); if (udesc->i_buf != NULL) { release_buffer(udesc->i_buf); udesc->i_buf = NULL; } - ASSERT(NO_SUBSCRIBERS(&(descr->empty_out_q_subs))); - inet_stop(descr); + ASSERT(NO_SUBSCRIBERS(&(desc->empty_out_q_subs))); + async_error_am_all(desc, am_closed); + inet_stop(desc); } static int packet_error(udp_descriptor* udesc, int err) -- cgit v1.2.1 From 0e15083a36e93fb47a650e1da19e81700eb0d232 Mon Sep 17 00:00:00 2001 From: Micael Karlberg Date: Thu, 14 May 2020 18:48:03 +0200 Subject: [kernel|test] Add UDP test case for recv socket close OTP-16654 --- lib/kernel/test/gen_udp_SUITE.erl | 84 ++++++++++++++++++++++++++++++++++----- 1 file changed, 75 insertions(+), 9 deletions(-) diff --git a/lib/kernel/test/gen_udp_SUITE.erl b/lib/kernel/test/gen_udp_SUITE.erl index 2720d3cc77..7ec553ec51 100644 --- a/lib/kernel/test/gen_udp_SUITE.erl +++ b/lib/kernel/test/gen_udp_SUITE.erl @@ -40,25 +40,45 @@ recvtos/1, recvtosttl/1, recvttl/1, recvtclass/1, sendtos/1, sendtosttl/1, sendttl/1, sendtclass/1, local_basic/1, local_unbound/1, - local_fdopen/1, local_fdopen_unbound/1, local_abstract/1]). + local_fdopen/1, local_fdopen_unbound/1, local_abstract/1, + recv_close/1]). suite() -> [{ct_hooks,[ts_install_cth]}, {timetrap,{minutes,1}}]. all() -> - [send_to_closed, buffer_size, binary_passive_recv, max_buffer_size, - bad_address, read_packets, recv_poll_after_active_once, - open_fd, connect, - implicit_inet6, active_n, + [ + send_to_closed, + buffer_size, + binary_passive_recv, + max_buffer_size, + bad_address, + read_packets, + recv_poll_after_active_once, + open_fd, + connect, + implicit_inet6, + active_n, recvtos, recvtosttl, recvttl, recvtclass, sendtos, sendtosttl, sendttl, sendtclass, - {group, local}]. + {group, local}, + recv_close + ]. groups() -> - [{local, [], - [local_basic, local_unbound, - local_fdopen, local_fdopen_unbound, local_abstract]}]. + [ + {local, [], local_cases()} + ]. + +local_cases() -> + [ + local_basic, + local_unbound, + local_fdopen, + local_fdopen_unbound, + local_abstract + ]. init_per_suite(Config) -> Config. @@ -969,6 +989,52 @@ local_handshake(S, SAddr, C, CAddr) -> end. + + +%%------------------------------------------------------------- +%% Open a passive socket. Create a socket that reads from it. +%% Then close the socket. +recv_close(Config) when is_list(Config) -> + {ok, Sock} = gen_udp:open(0, [{active, false}]), + RECV = fun() -> + io:format("~p try recv~n", [self()]), + Res = gen_udp:recv(Sock, 0), + io:format("~p recv res: ~p~n", [self(), Res]), + exit(Res) + end, + io:format("~p spawn reader", [self()]), + {Pid, MRef} = spawn_monitor(RECV), + receive + {'DOWN', MRef, process, Pid, PreReason} -> + %% Make sure id does not die for some other reason... + ?line ct:fail("Unexpected pre close from reader (~p): ~p", + [Pid, PreReason]) + after 5000 -> % Just in case... + ok + end, + io:format("~p close socket", [self()]), + ok = gen_udp:close(Sock), + io:format("~p await reader termination", [self()]), + receive + {'DOWN', MRef, process, Pid, {error, closed}} -> + io:format("~p expected reader termination result", [self()]), + ok; + {'DOWN', MRef, process, Pid, PostReason} -> + io:format("~p unexpected reader termination: ~p", + [self(), PostReason]), + ?line ct:fail("Unexpected post close from reader (~p): ~p", + [Pid, PostReason]) + after 5000 -> + io:format("~p unexpected reader termination timeout", [self()]), + demonitor(MRef, [flush]), + exit(Pid, kill), + ?line ct:fail("Reader (~p) termination timeout", [Pid]) + end, + ok. + + + + %% %% Utils %% -- cgit v1.2.1 From c3bd412f73a4bf7f67c9179e0bb0feb8a14630f1 Mon Sep 17 00:00:00 2001 From: Micael Karlberg Date: Fri, 15 May 2020 09:04:55 +0200 Subject: [kernel|test] Add SCTP test case for recv socket close OTP-16654 --- lib/kernel/test/gen_sctp_SUITE.erl | 152 +++++++++++++++++++++++++++++++++++-- 1 file changed, 146 insertions(+), 6 deletions(-) diff --git a/lib/kernel/test/gen_sctp_SUITE.erl b/lib/kernel/test/gen_sctp_SUITE.erl index 65183d83cc..55109a5178 100644 --- a/lib/kernel/test/gen_sctp_SUITE.erl +++ b/lib/kernel/test/gen_sctp_SUITE.erl @@ -41,7 +41,8 @@ peeloff_active_once/1, peeloff_active_true/1, peeloff_active_n/1, buffers/1, names_unihoming_ipv4/1, names_unihoming_ipv6/1, - names_multihoming_ipv4/1, names_multihoming_ipv6/1]). + names_multihoming_ipv4/1, names_multihoming_ipv6/1, + recv_close/1]). suite() -> [{ct_hooks,[ts_install_cth]}, @@ -56,10 +57,25 @@ all() -> {group,G}]. groups() -> - [{smoke,[],[basic,basic_stream]}, - {old_solaris,[],[skip_old_solaris]}, - {extensive,[], - [api_open_close, api_listen, api_connect_init, + [ + {smoke, [], smoke_cases()}, + {old_solaris, [], old_solaris_cases()}, + {extensive, [], extensive_cases()} + ]. + +smoke_cases() -> + [ + basic, + basic_stream + ]. + +old_solaris_cases() -> + [ + skip_old_solaris + ]. + +extensive_cases() -> + [api_open_close, api_listen, api_connect_init, api_opts, xfer_min, xfer_active, def_sndrcvinfo, implicit_inet6, open_multihoming_ipv4_socket, open_unihoming_ipv6_socket, @@ -68,7 +84,8 @@ groups() -> xfer_stream_min, peeloff_active_once, peeloff_active_true, peeloff_active_n, buffers, names_unihoming_ipv4, names_unihoming_ipv6, - names_multihoming_ipv4, names_multihoming_ipv6]}]. + names_multihoming_ipv4, names_multihoming_ipv6, + recv_close]. init_per_suite(_Config) -> case gen_sctp:open() of @@ -1511,6 +1528,111 @@ recv_comm_up_eventually(S) -> recv_comm_up_eventually(S) end. + +%% +recv_close(Config) when is_list(Config) -> + p("create server socket (and listen)"), + {ok, S} = gen_sctp:open(), + gen_sctp:listen(S, true), + {ok, SPort} = inet:port(S), + + p("create client socket (and connect)"), + {ok, C} = gen_sctp:open(), + {ok, _} = gen_sctp:connect(C, localhost, SPort, []), + + TC = self(), + RECV = fun() -> + p("try setup recv(s)"), + ok = recv_close_setup_recv(S), + p("announce ready"), + TC ! {self(), ready}, + p("try data recv"), + Res = gen_sctp:recv(S), + p("recv res: " + "~n ~p", [Res]), + exit(Res) + end, + p("spawn reader - then await reader ready"), + {Pid, MRef} = spawn_monitor(RECV), + receive + {'DOWN', MRef, process, Pid, PreReason} -> + %% Make sure it does not die for some other reason... + p("unexpected reader termination:" + "~n ~p", [PreReason]), + (catch gen_sctp:close(S)), + (catch gen_sctp:close(C)), + ?line ct:fail("Unexpected pre close from reader (~p): ~p", + [Pid, PreReason]); + {Pid, ready} -> + p("reader ready"), + ok + after 30000 -> % Just in case... + %% This is **extreme**, but there is no way to know + %% how long it will take to iterate through all the + %% addresses of a host... + p("reader ready timeout"), + (catch gen_sctp:close(S)), + (catch gen_sctp:close(C)), + ?line ct:fail("Unexpected pre close timeout (~p)", [Pid]) + end, + + p("\"ensure\" reader reading..."), + receive + Any -> + p("Received unexpected message: " + "~n ~p", [Any]), + (catch gen_sctp:close(S)), + (catch gen_sctp:close(C)), + ?line ct:fail("Unexpected message: ~p", [Any]) + after 5000 -> + ok + end, + + p("close server socket"), + ok = gen_sctp:close(S), + p("await reader termination"), + receive + {'DOWN', MRef, process, Pid, {error, closed}} -> + p("expected reader termination result"), + (catch gen_sctp:close(C)), + ok; + {'DOWN', MRef, process, Pid, PostReason} -> + p("unexpected reader termination: " + "~n ~p", [PostReason]), + (catch gen_sctp:close(C)), + ?line ct:fail("Unexpected post close from reader (~p): ~p", + [Pid, PostReason]) + after 5000 -> + p("unexpected reader termination timeout"), + demonitor(MRef, [flush]), + (catch gen_sctp:close(C)), + exit(Pid, kill), + ?line ct:fail("Reader (~p) termination timeout", [Pid]) + end, + p("close client socket"), + (catch gen_sctp:close(C)), + p("done"), + ok. + + +recv_close_setup_recv(S) -> + recv_close_setup_recv(S, 1). + +recv_close_setup_recv(S, N) -> + p("try setup recv ~w", [N]), + case gen_sctp:recv(S, 5000) of + {ok, {Addr, + Port, + _AncData, + Data}} when is_tuple(Addr) andalso is_integer(Port) -> + p("setup recv ~w: " + "~n ~p", [N, Data]), + recv_close_setup_recv(S, N+1); + {error, timeout} -> + ok + end. + + %%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%% socket gen_server ultra light @@ -1745,3 +1867,21 @@ match_unless_solaris(A, B) -> timestamp() -> erlang:monotonic_time(). + +formated_timestamp() -> + format_timestamp(os:timestamp()). + +format_timestamp({_N1, _N2, N3} = TS) -> + {_Date, Time} = calendar:now_to_local_time(TS), + {Hour, Min, Sec} = Time, + FormatTS = io_lib:format("~.2.0w:~.2.0w:~.2.0w.~.3.0w", + [Hour, Min, Sec, N3 div 1000]), + lists:flatten(FormatTS). + +p(F) -> + p(F, []). + +p(F, A) -> + io:format("~s ~p " ++ F ++ "~n", [formated_timestamp(), self() | A]). + + -- cgit v1.2.1