diff options
author | Sverker Eriksson <sverker@erlang.org> | 2022-06-28 14:30:35 +0200 |
---|---|---|
committer | Sverker Eriksson <sverker@erlang.org> | 2022-06-28 14:30:35 +0200 |
commit | 0e40f9e1324f3fe68902cc6dac3599e98df01eb4 (patch) | |
tree | 4ca608f15a2298365f1794ae6430d6dac00b65c2 | |
parent | d27e17d6d6d03a518e86dd457e79add5699a1ff5 (diff) | |
parent | e41174a16654af83d1eef3052ed5ffa0e534e6e9 (diff) | |
download | erlang-0e40f9e1324f3fe68902cc6dac3599e98df01eb4.tar.gz |
Merge branch 'sverker/erts/test-signal-order' into maint
-rw-r--r-- | erts/emulator/beam/erl_bif_info.c | 11 | ||||
-rw-r--r-- | erts/emulator/beam/erl_message.h | 2 | ||||
-rw-r--r-- | erts/emulator/beam/erl_proc_sig_queue.c | 32 | ||||
-rw-r--r-- | erts/emulator/beam/erl_proc_sig_queue.h | 1 | ||||
-rw-r--r-- | erts/emulator/test/signal_SUITE.erl | 132 |
5 files changed, 167 insertions, 11 deletions
diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c index 3cf822e757..9da481d036 100644 --- a/erts/emulator/beam/erl_bif_info.c +++ b/erts/emulator/beam/erl_bif_info.c @@ -4951,6 +4951,17 @@ BIF_RETTYPE erts_debug_set_internal_state_2(BIF_ALIST_2) #else BIF_RET(am_notsup); #endif + } else if (ERTS_IS_ATOM_STR("proc_sig_buffers", BIF_ARG_1)) { + switch (BIF_ARG_2) + { + case am_true: { + int has_buffers = erts_proc_sig_queue_force_buffers(BIF_P); + BIF_RET(has_buffers ? am_true : am_false); + } + default: + break; + } + BIF_RET(am_notsup); } } diff --git a/erts/emulator/beam/erl_message.h b/erts/emulator/beam/erl_message.h index 0bfa974f1a..87cd96f4cd 100644 --- a/erts/emulator/beam/erl_message.h +++ b/erts/emulator/beam/erl_message.h @@ -417,7 +417,7 @@ typedef struct { * part of the thread progress system. */ erts_atomic64_t dirty_refc; - Uint nr_of_rounds; + Uint nr_of_rounds_left; Uint nr_of_enqueues; int alive; } ErtsSignalInQueueBufferArray; diff --git a/erts/emulator/beam/erl_proc_sig_queue.c b/erts/emulator/beam/erl_proc_sig_queue.c index 814c3fdd52..0af9bf6818 100644 --- a/erts/emulator/beam/erl_proc_sig_queue.c +++ b/erts/emulator/beam/erl_proc_sig_queue.c @@ -8345,15 +8345,14 @@ erts_proc_sig_queue_flush_get_buffers(Process* proc, int *need_unget_buffers) } } } - buffers->nr_of_rounds += 1; - if (buffers->nr_of_rounds > - ERTS_PROC_SIG_INQ_BUFFERED_MIN_FLUSH_ALL_OPS_BEFORE_CHANGE) { + if (--buffers->nr_of_rounds_left == 0) { /* Take decision if we should adapt back to the normal state */ if(buffers->nr_of_enqueues < ERTS_PROC_SIG_INQ_BUFFERED_MIN_NO_ENQUEUES_TO_KEEP) { erts_proc_sig_queue_flush_and_deinstall_buffers(proc); } else { - buffers->nr_of_rounds = 0; + buffers->nr_of_rounds_left = + ERTS_PROC_SIG_INQ_BUFFERED_MIN_FLUSH_ALL_OPS_BEFORE_CHANGE; buffers->nr_of_enqueues = 0; } } @@ -8465,7 +8464,8 @@ void erts_proc_sig_queue_maybe_install_buffers(Process* p, erts_aint32_t state) erts_atomic64_init_nob(&buffers->nonmsg_slots, (erts_aint64_t)(Uint64)0); erts_atomic64_init_nob(&buffers->dirty_refc, (erts_aint64_t)(Uint64)1); buffers->nr_of_enqueues = 0; - buffers->nr_of_rounds = 0; + buffers->nr_of_rounds_left = + ERTS_PROC_SIG_INQ_BUFFERED_MIN_FLUSH_ALL_OPS_BEFORE_CHANGE; buffers->alive = 1; /* Initialize slots */ for(i = 0; i < ERTS_PROC_SIG_INQ_BUFFERED_NR_OF_BUFFERS; i++) { @@ -8524,3 +8524,25 @@ void erts_proc_sig_queue_unget_buffers(ErtsSignalInQueueBufferArray* buffers, erts_free(ERTS_ALC_T_SIGQ_BUFFERS, buffers); } } + +/* Only for test purposes */ +int erts_proc_sig_queue_force_buffers(Process* p) +{ + erts_aint32_t state; + ErtsSignalInQueueBufferArray* buffers; + + erts_proc_lock(p, ERTS_PROC_LOCK_MSGQ); + state = erts_atomic32_read_nob(&p->state); + /* Fake contention */ + p->sig_inq_contention_counter = + 1 + ERTS_PROC_SIG_INQ_BUFFERED_CONTENTION_INSTALL_LIMIT; + erts_proc_sig_queue_maybe_install_buffers(p, state); + buffers = ((ErtsSignalInQueueBufferArray*) + erts_atomic_read_nob(&p->sig_inq_buffers)); + if (buffers) { + /* "Prevent" buffer deinstallation */ + buffers->nr_of_rounds_left = ERTS_UINT_MAX; + } + erts_proc_unlock(p, ERTS_PROC_LOCK_MSGQ); + return buffers != NULL; +} diff --git a/erts/emulator/beam/erl_proc_sig_queue.h b/erts/emulator/beam/erl_proc_sig_queue.h index bd11c2712e..bde4a912db 100644 --- a/erts/emulator/beam/erl_proc_sig_queue.h +++ b/erts/emulator/beam/erl_proc_sig_queue.h @@ -260,6 +260,7 @@ int erts_proc_sig_queue_try_enqueue_to_buffer(Eterm from, ErtsMessage** last_next, Uint len, int is_signal); +int erts_proc_sig_queue_force_buffers(Process*); #define ERTS_SIG_Q_OP_BITS 8 #define ERTS_SIG_Q_OP_SHIFT 0 diff --git a/erts/emulator/test/signal_SUITE.erl b/erts/emulator/test/signal_SUITE.erl index 5893f69467..c6f7b1fc8d 100644 --- a/erts/emulator/test/signal_SUITE.erl +++ b/erts/emulator/test/signal_SUITE.erl @@ -42,7 +42,12 @@ busy_dist_down_signal/1, busy_dist_spawn_reply_signal/1, busy_dist_unlink_ack_signal/1, - monitor_order/1]). + monitor_order/1, + monitor_named_order_local/1, + monitor_named_order_remote/1, + monitor_nodes_order/1]). + +-export([spawn_spammers/3]). init_per_testcase(Func, Config) when is_atom(Func), is_list(Config) -> [{testcase, Func}|Config]. @@ -69,14 +74,17 @@ all() -> busy_dist_down_signal, busy_dist_spawn_reply_signal, busy_dist_unlink_ack_signal, - monitor_order]. + monitor_order, + monitor_named_order_local, + monitor_named_order_remote, + monitor_nodes_order]. %% Test that exit signals and messages are received in correct order xm_sig_order(Config) when is_list(Config) -> LNode = node(), - repeat(fun () -> xm_sig_order_test(LNode) end, 1000), + repeat(fun (_) -> xm_sig_order_test(LNode) end, 1000), {ok, Peer, RNode} = ?CT_PEER(), - repeat(fun () -> xm_sig_order_test(RNode) end, 1000), + repeat(fun (_) -> xm_sig_order_test(RNode) end, 1000), peer:stop(Peer), ok. @@ -490,10 +498,124 @@ monitor_order_1(N) -> monitor_order_1(N - 1) end. +%% Signal order: Message vs DOWN from local process monitored by name. +monitor_named_order_local(_Config) -> + process_flag(message_queue_data, off_heap), + erts_debug:set_internal_state(available_internal_state, true), + true = erts_debug:set_internal_state(proc_sig_buffers, true), + + LNode = node(), + repeat(fun (N) -> monitor_named_order(LNode, N) end, 100), + ok. + +%% Signal order: Message vs DOWN from remote process monitored by name. +monitor_named_order_remote(_Config) -> + process_flag(message_queue_data, off_heap), + erts_debug:set_internal_state(available_internal_state, true), + true = erts_debug:set_internal_state(proc_sig_buffers, true), + + {ok, Peer, RNode} = ?CT_PEER(), + repeat(fun (N) -> monitor_named_order(RNode, N) end, 10), + peer:stop(Peer), + ok. + +monitor_named_order(Node, N) -> + %% Send messages using pid, name and alias. + Pid = self(), + register(tester, Pid), + Name = {tester, node()}, + AliasA = alias(), + NumMsg = 1000 + N, + Sender = spawn_link(Node, + fun() -> + register(monitor_named_order, self()), + Pid ! {self(), ready}, + {go, AliasM} = receive_any(), + send_msg_seq(Pid, Name, AliasA, AliasM, NumMsg), + exit(normal) + end), + {Sender, ready} = receive_any(), + AliasM = monitor(process, {monitor_named_order,Node}, + [{alias,explicit_unalias}]), + Sender ! {go, AliasM}, + recv_msg_seq(NumMsg), + {'DOWN', AliasM, process, {monitor_named_order,Node}, normal} + = receive_any(), + unregister(tester), + unalias(AliasA), + unalias(AliasM), + ok. + +send_msg_seq(_, _, _, _, 0) -> ok; +send_msg_seq(To1, To2, To3, To4, N) -> + To1 ! N, + send_msg_seq(To2, To3, To4, To1, N-1). + +recv_msg_seq(0) -> ok; +recv_msg_seq(N) -> + N = receive M -> M end, + recv_msg_seq(N-1). + +receive_any() -> + receive M -> M end. + +receive_any(Timeout) -> + receive M -> M + after Timeout -> timeout + end. + +monitor_nodes_order(_Config) -> + process_flag(message_queue_data, off_heap), + erts_debug:set_internal_state(available_internal_state, true), + true = erts_debug:set_internal_state(proc_sig_buffers, true), + + {ok, Peer, RNode} = ?CT_PEER(#{peer_down => continue, + connection => 0}), + Self = self(), + ok = net_kernel:monitor_nodes(true, [nodedown_reason]), + [] = nodes(connected), + Pids = peer:call(Peer, ?MODULE, spawn_spammers, [64, Self, []]), + {nodeup, RNode, []} = receive_any(), + + ok = peer:cast(Peer, erlang, halt, [0]), + + [put(P, 0) || P <- Pids], % spam counters per sender + {nodedown, RNode, [{nodedown_reason,connection_closed}]} = + receive_filter_spam(), + [io:format("From spammer ~p: ~p messages\n", [P, get(P)]) || P <- Pids], + timeout = receive_any(100), % Nothing after nodedown + + {down, tcp_closed} = peer:get_state(Peer), + peer:stop(Peer), + ok. + +spawn_spammers(0, _To, Acc) -> + Acc; +spawn_spammers(N, To, Acc) -> + Pid = spawn(fun() -> spam_pid(To, 1) end), + spawn_spammers(N-1, To, [Pid | Acc]). + +spam_pid(To, N) -> + To ! {spam, self(), N}, + erlang:yield(), % Let other spammers run to get lots of different senders + spam_pid(To, N+1). + +receive_filter_spam() -> + receive + {spam, From, N} -> + match(N, get(From) + 1), + put(From, N), + receive_filter_spam(); + M -> M + end. + + %% %% -- Internal utils -------------------------------------------------------- %% +match(X,X) -> ok. + load_driver(Config, Driver) -> DataDir = proplists:get_value(data_dir, Config), case erl_ddll:load_driver(DataDir, Driver) of @@ -610,5 +732,5 @@ spam(To, Data) -> repeat(_Fun, N) when is_integer(N), N =< 0 -> ok; repeat(Fun, N) when is_integer(N) -> - Fun(), + Fun(N), repeat(Fun, N-1). |