summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSverker Eriksson <sverker@erlang.org>2022-06-28 14:30:35 +0200
committerSverker Eriksson <sverker@erlang.org>2022-06-28 14:30:35 +0200
commit0e40f9e1324f3fe68902cc6dac3599e98df01eb4 (patch)
tree4ca608f15a2298365f1794ae6430d6dac00b65c2
parentd27e17d6d6d03a518e86dd457e79add5699a1ff5 (diff)
parente41174a16654af83d1eef3052ed5ffa0e534e6e9 (diff)
downloaderlang-0e40f9e1324f3fe68902cc6dac3599e98df01eb4.tar.gz
Merge branch 'sverker/erts/test-signal-order' into maint
-rw-r--r--erts/emulator/beam/erl_bif_info.c11
-rw-r--r--erts/emulator/beam/erl_message.h2
-rw-r--r--erts/emulator/beam/erl_proc_sig_queue.c32
-rw-r--r--erts/emulator/beam/erl_proc_sig_queue.h1
-rw-r--r--erts/emulator/test/signal_SUITE.erl132
5 files changed, 167 insertions, 11 deletions
diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c
index 3cf822e757..9da481d036 100644
--- a/erts/emulator/beam/erl_bif_info.c
+++ b/erts/emulator/beam/erl_bif_info.c
@@ -4951,6 +4951,17 @@ BIF_RETTYPE erts_debug_set_internal_state_2(BIF_ALIST_2)
#else
BIF_RET(am_notsup);
#endif
+ } else if (ERTS_IS_ATOM_STR("proc_sig_buffers", BIF_ARG_1)) {
+ switch (BIF_ARG_2)
+ {
+ case am_true: {
+ int has_buffers = erts_proc_sig_queue_force_buffers(BIF_P);
+ BIF_RET(has_buffers ? am_true : am_false);
+ }
+ default:
+ break;
+ }
+ BIF_RET(am_notsup);
}
}
diff --git a/erts/emulator/beam/erl_message.h b/erts/emulator/beam/erl_message.h
index 0bfa974f1a..87cd96f4cd 100644
--- a/erts/emulator/beam/erl_message.h
+++ b/erts/emulator/beam/erl_message.h
@@ -417,7 +417,7 @@ typedef struct {
* part of the thread progress system.
*/
erts_atomic64_t dirty_refc;
- Uint nr_of_rounds;
+ Uint nr_of_rounds_left;
Uint nr_of_enqueues;
int alive;
} ErtsSignalInQueueBufferArray;
diff --git a/erts/emulator/beam/erl_proc_sig_queue.c b/erts/emulator/beam/erl_proc_sig_queue.c
index 814c3fdd52..0af9bf6818 100644
--- a/erts/emulator/beam/erl_proc_sig_queue.c
+++ b/erts/emulator/beam/erl_proc_sig_queue.c
@@ -8345,15 +8345,14 @@ erts_proc_sig_queue_flush_get_buffers(Process* proc, int *need_unget_buffers)
}
}
}
- buffers->nr_of_rounds += 1;
- if (buffers->nr_of_rounds >
- ERTS_PROC_SIG_INQ_BUFFERED_MIN_FLUSH_ALL_OPS_BEFORE_CHANGE) {
+ if (--buffers->nr_of_rounds_left == 0) {
/* Take decision if we should adapt back to the normal state */
if(buffers->nr_of_enqueues <
ERTS_PROC_SIG_INQ_BUFFERED_MIN_NO_ENQUEUES_TO_KEEP) {
erts_proc_sig_queue_flush_and_deinstall_buffers(proc);
} else {
- buffers->nr_of_rounds = 0;
+ buffers->nr_of_rounds_left =
+ ERTS_PROC_SIG_INQ_BUFFERED_MIN_FLUSH_ALL_OPS_BEFORE_CHANGE;
buffers->nr_of_enqueues = 0;
}
}
@@ -8465,7 +8464,8 @@ void erts_proc_sig_queue_maybe_install_buffers(Process* p, erts_aint32_t state)
erts_atomic64_init_nob(&buffers->nonmsg_slots, (erts_aint64_t)(Uint64)0);
erts_atomic64_init_nob(&buffers->dirty_refc, (erts_aint64_t)(Uint64)1);
buffers->nr_of_enqueues = 0;
- buffers->nr_of_rounds = 0;
+ buffers->nr_of_rounds_left =
+ ERTS_PROC_SIG_INQ_BUFFERED_MIN_FLUSH_ALL_OPS_BEFORE_CHANGE;
buffers->alive = 1;
/* Initialize slots */
for(i = 0; i < ERTS_PROC_SIG_INQ_BUFFERED_NR_OF_BUFFERS; i++) {
@@ -8524,3 +8524,25 @@ void erts_proc_sig_queue_unget_buffers(ErtsSignalInQueueBufferArray* buffers,
erts_free(ERTS_ALC_T_SIGQ_BUFFERS, buffers);
}
}
+
+/* Only for test purposes */
+int erts_proc_sig_queue_force_buffers(Process* p)
+{
+ erts_aint32_t state;
+ ErtsSignalInQueueBufferArray* buffers;
+
+ erts_proc_lock(p, ERTS_PROC_LOCK_MSGQ);
+ state = erts_atomic32_read_nob(&p->state);
+ /* Fake contention */
+ p->sig_inq_contention_counter =
+ 1 + ERTS_PROC_SIG_INQ_BUFFERED_CONTENTION_INSTALL_LIMIT;
+ erts_proc_sig_queue_maybe_install_buffers(p, state);
+ buffers = ((ErtsSignalInQueueBufferArray*)
+ erts_atomic_read_nob(&p->sig_inq_buffers));
+ if (buffers) {
+ /* "Prevent" buffer deinstallation */
+ buffers->nr_of_rounds_left = ERTS_UINT_MAX;
+ }
+ erts_proc_unlock(p, ERTS_PROC_LOCK_MSGQ);
+ return buffers != NULL;
+}
diff --git a/erts/emulator/beam/erl_proc_sig_queue.h b/erts/emulator/beam/erl_proc_sig_queue.h
index bd11c2712e..bde4a912db 100644
--- a/erts/emulator/beam/erl_proc_sig_queue.h
+++ b/erts/emulator/beam/erl_proc_sig_queue.h
@@ -260,6 +260,7 @@ int erts_proc_sig_queue_try_enqueue_to_buffer(Eterm from,
ErtsMessage** last_next,
Uint len,
int is_signal);
+int erts_proc_sig_queue_force_buffers(Process*);
#define ERTS_SIG_Q_OP_BITS 8
#define ERTS_SIG_Q_OP_SHIFT 0
diff --git a/erts/emulator/test/signal_SUITE.erl b/erts/emulator/test/signal_SUITE.erl
index 5893f69467..c6f7b1fc8d 100644
--- a/erts/emulator/test/signal_SUITE.erl
+++ b/erts/emulator/test/signal_SUITE.erl
@@ -42,7 +42,12 @@
busy_dist_down_signal/1,
busy_dist_spawn_reply_signal/1,
busy_dist_unlink_ack_signal/1,
- monitor_order/1]).
+ monitor_order/1,
+ monitor_named_order_local/1,
+ monitor_named_order_remote/1,
+ monitor_nodes_order/1]).
+
+-export([spawn_spammers/3]).
init_per_testcase(Func, Config) when is_atom(Func), is_list(Config) ->
[{testcase, Func}|Config].
@@ -69,14 +74,17 @@ all() ->
busy_dist_down_signal,
busy_dist_spawn_reply_signal,
busy_dist_unlink_ack_signal,
- monitor_order].
+ monitor_order,
+ monitor_named_order_local,
+ monitor_named_order_remote,
+ monitor_nodes_order].
%% Test that exit signals and messages are received in correct order
xm_sig_order(Config) when is_list(Config) ->
LNode = node(),
- repeat(fun () -> xm_sig_order_test(LNode) end, 1000),
+ repeat(fun (_) -> xm_sig_order_test(LNode) end, 1000),
{ok, Peer, RNode} = ?CT_PEER(),
- repeat(fun () -> xm_sig_order_test(RNode) end, 1000),
+ repeat(fun (_) -> xm_sig_order_test(RNode) end, 1000),
peer:stop(Peer),
ok.
@@ -490,10 +498,124 @@ monitor_order_1(N) ->
monitor_order_1(N - 1)
end.
+%% Signal order: Message vs DOWN from local process monitored by name.
+monitor_named_order_local(_Config) ->
+ process_flag(message_queue_data, off_heap),
+ erts_debug:set_internal_state(available_internal_state, true),
+ true = erts_debug:set_internal_state(proc_sig_buffers, true),
+
+ LNode = node(),
+ repeat(fun (N) -> monitor_named_order(LNode, N) end, 100),
+ ok.
+
+%% Signal order: Message vs DOWN from remote process monitored by name.
+monitor_named_order_remote(_Config) ->
+ process_flag(message_queue_data, off_heap),
+ erts_debug:set_internal_state(available_internal_state, true),
+ true = erts_debug:set_internal_state(proc_sig_buffers, true),
+
+ {ok, Peer, RNode} = ?CT_PEER(),
+ repeat(fun (N) -> monitor_named_order(RNode, N) end, 10),
+ peer:stop(Peer),
+ ok.
+
+monitor_named_order(Node, N) ->
+ %% Send messages using pid, name and alias.
+ Pid = self(),
+ register(tester, Pid),
+ Name = {tester, node()},
+ AliasA = alias(),
+ NumMsg = 1000 + N,
+ Sender = spawn_link(Node,
+ fun() ->
+ register(monitor_named_order, self()),
+ Pid ! {self(), ready},
+ {go, AliasM} = receive_any(),
+ send_msg_seq(Pid, Name, AliasA, AliasM, NumMsg),
+ exit(normal)
+ end),
+ {Sender, ready} = receive_any(),
+ AliasM = monitor(process, {monitor_named_order,Node},
+ [{alias,explicit_unalias}]),
+ Sender ! {go, AliasM},
+ recv_msg_seq(NumMsg),
+ {'DOWN', AliasM, process, {monitor_named_order,Node}, normal}
+ = receive_any(),
+ unregister(tester),
+ unalias(AliasA),
+ unalias(AliasM),
+ ok.
+
+send_msg_seq(_, _, _, _, 0) -> ok;
+send_msg_seq(To1, To2, To3, To4, N) ->
+ To1 ! N,
+ send_msg_seq(To2, To3, To4, To1, N-1).
+
+recv_msg_seq(0) -> ok;
+recv_msg_seq(N) ->
+ N = receive M -> M end,
+ recv_msg_seq(N-1).
+
+receive_any() ->
+ receive M -> M end.
+
+receive_any(Timeout) ->
+ receive M -> M
+ after Timeout -> timeout
+ end.
+
+monitor_nodes_order(_Config) ->
+ process_flag(message_queue_data, off_heap),
+ erts_debug:set_internal_state(available_internal_state, true),
+ true = erts_debug:set_internal_state(proc_sig_buffers, true),
+
+ {ok, Peer, RNode} = ?CT_PEER(#{peer_down => continue,
+ connection => 0}),
+ Self = self(),
+ ok = net_kernel:monitor_nodes(true, [nodedown_reason]),
+ [] = nodes(connected),
+ Pids = peer:call(Peer, ?MODULE, spawn_spammers, [64, Self, []]),
+ {nodeup, RNode, []} = receive_any(),
+
+ ok = peer:cast(Peer, erlang, halt, [0]),
+
+ [put(P, 0) || P <- Pids], % spam counters per sender
+ {nodedown, RNode, [{nodedown_reason,connection_closed}]} =
+ receive_filter_spam(),
+ [io:format("From spammer ~p: ~p messages\n", [P, get(P)]) || P <- Pids],
+ timeout = receive_any(100), % Nothing after nodedown
+
+ {down, tcp_closed} = peer:get_state(Peer),
+ peer:stop(Peer),
+ ok.
+
+spawn_spammers(0, _To, Acc) ->
+ Acc;
+spawn_spammers(N, To, Acc) ->
+ Pid = spawn(fun() -> spam_pid(To, 1) end),
+ spawn_spammers(N-1, To, [Pid | Acc]).
+
+spam_pid(To, N) ->
+ To ! {spam, self(), N},
+ erlang:yield(), % Let other spammers run to get lots of different senders
+ spam_pid(To, N+1).
+
+receive_filter_spam() ->
+ receive
+ {spam, From, N} ->
+ match(N, get(From) + 1),
+ put(From, N),
+ receive_filter_spam();
+ M -> M
+ end.
+
+
%%
%% -- Internal utils --------------------------------------------------------
%%
+match(X,X) -> ok.
+
load_driver(Config, Driver) ->
DataDir = proplists:get_value(data_dir, Config),
case erl_ddll:load_driver(DataDir, Driver) of
@@ -610,5 +732,5 @@ spam(To, Data) ->
repeat(_Fun, N) when is_integer(N), N =< 0 ->
ok;
repeat(Fun, N) when is_integer(N) ->
- Fun(),
+ Fun(N),
repeat(Fun, N-1).