summaryrefslogtreecommitdiff
path: root/erts/emulator
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator')
-rw-r--r--erts/emulator/beam/bif.c75
-rw-r--r--erts/emulator/beam/bif.h10
-rw-r--r--erts/emulator/beam/erl_bif_info.c221
-rw-r--r--erts/emulator/beam/erl_bif_port.c89
-rw-r--r--erts/emulator/beam/erl_message.c6
-rw-r--r--erts/emulator/beam/erl_proc_sig_queue.c181
-rw-r--r--erts/emulator/beam/erl_proc_sig_queue.h70
-rw-r--r--erts/emulator/beam/erl_process.c62
-rw-r--r--erts/emulator/beam/erl_process.h9
-rw-r--r--erts/emulator/beam/erl_process_dump.c17
-rw-r--r--erts/emulator/test/bif_SUITE.erl60
-rw-r--r--erts/emulator/test/nif_SUITE.erl5
-rw-r--r--erts/emulator/test/port_SUITE.erl1
-rw-r--r--erts/emulator/test/process_SUITE.erl178
14 files changed, 792 insertions, 192 deletions
diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c
index a30d6b6a9e..87241e6dce 100644
--- a/erts/emulator/beam/bif.c
+++ b/erts/emulator/beam/bif.c
@@ -1524,7 +1524,7 @@ erts_internal_await_exit_trap(BIF_ALIST_0)
* terminated in order to ensure that signal order
* is preserved. Yield if necessary.
*/
- erts_aint32_t state;
+ erts_aint32_t state = erts_atomic32_read_nob(&BIF_P->state);
int reds = ERTS_BIF_REDS_LEFT(BIF_P);
(void) erts_proc_sig_handle_incoming(BIF_P, &state, &reds,
reds, !0);
@@ -5395,45 +5395,52 @@ static BIF_RETTYPE bif_return_trap(BIF_ALIST_2)
}
static BIF_RETTYPE
-bif_handle_signals_return(BIF_ALIST_1)
+bif_handle_signals_return(BIF_ALIST_2)
{
- int local_only = BIF_P->sig_qs.flags & FS_LOCAL_SIGS_ONLY;
- int sres, sreds, reds_left;
+ int reds_left;
erts_aint32_t state;
- reds_left = ERTS_BIF_REDS_LEFT(BIF_P);
- sreds = reds_left;
-
- if (!local_only) {
- erts_proc_lock(BIF_P, ERTS_PROC_LOCK_MSGQ);
- erts_proc_sig_fetch(BIF_P);
- erts_proc_unlock(BIF_P, ERTS_PROC_LOCK_MSGQ);
+ if (BIF_P->sig_qs.flags & FS_FLUSHED_SIGS) {
+ flushed:
+ ASSERT(BIF_P->sig_qs.flags & FS_FLUSHING_SIGS);
+ BIF_P->sig_qs.flags &= ~(FS_FLUSHED_SIGS|FS_FLUSHING_SIGS);
+ erts_set_gc_state(BIF_P, !0); /* Allow GC again... */
+ BIF_RET(BIF_ARG_2);
}
+
+ if (!(BIF_P->sig_qs.flags & FS_FLUSHING_SIGS)) {
+ int flags = ((is_internal_pid(BIF_ARG_1)
+ || is_internal_port(BIF_ARG_1))
+ ? ERTS_PROC_SIG_FLUSH_FLG_FROM_ID
+ : ERTS_PROC_SIG_FLUSH_FLG_FROM_ALL);
+ erts_proc_sig_init_flush_signals(BIF_P, flags, BIF_ARG_1);
+ if (BIF_P->sig_qs.flags & FS_FLUSHED_SIGS)
+ goto flushed;
+ }
+
+ ASSERT(BIF_P->sig_qs.flags & FS_FLUSHING_SIGS);
- state = erts_atomic32_read_nob(&BIF_P->state);
- sres = erts_proc_sig_handle_incoming(BIF_P, &state, &sreds,
- sreds, !0);
-
- BUMP_REDS(BIF_P, (int) sreds);
- reds_left -= sreds;
+ reds_left = ERTS_BIF_REDS_LEFT(BIF_P);
- if (state & ERTS_PSFLG_EXITING) {
- BIF_P->sig_qs.flags &= ~FS_LOCAL_SIGS_ONLY;
- ERTS_BIF_EXITED(BIF_P);
- }
- if (!sres | (reds_left <= 0)) {
- /*
- * More signals to handle or out of reds; need
- * to yield and continue. Prevent fetching of
- * more signals by setting local-sigs-only flag.
- */
- BIF_P->sig_qs.flags |= FS_LOCAL_SIGS_ONLY;
- ERTS_BIF_YIELD1(&erts_bif_handle_signals_return_export,
- BIF_P, BIF_ARG_1);
- }
+ state = erts_atomic32_read_nob(&BIF_P->state);
+ do {
+ int sreds = reds_left;
+ (void) erts_proc_sig_handle_incoming(BIF_P, &state, &sreds,
+ sreds, !0);
+ BUMP_REDS(BIF_P, (int) sreds);
+ if (state & ERTS_PSFLG_EXITING)
+ ERTS_BIF_EXITED(BIF_P);
+ if (BIF_P->sig_qs.flags & FS_FLUSHED_SIGS)
+ goto flushed;
+ reds_left -= sreds;
+ } while (reds_left > 0);
- BIF_P->sig_qs.flags &= ~FS_LOCAL_SIGS_ONLY;
- BIF_RET(BIF_ARG_1);
+ /*
+ * More signals to handle, but out of reductions. Yield
+ * and come back here and continue...
+ */
+ ERTS_BIF_YIELD2(&erts_bif_handle_signals_return_export,
+ BIF_P, BIF_ARG_1, BIF_ARG_2);
}
Export bif_return_trap_export;
@@ -5475,7 +5482,7 @@ void erts_init_bif(void)
&bif_return_trap);
erts_init_trap_export(&erts_bif_handle_signals_return_export,
- am_erlang, am_bif_handle_signals_return, 1,
+ am_erlang, am_bif_handle_signals_return, 2,
&bif_handle_signals_return);
erts_await_result = erts_export_put(am_erts_internal,
diff --git a/erts/emulator/beam/bif.h b/erts/emulator/beam/bif.h
index 350f2ad430..a89ea5d4f2 100644
--- a/erts/emulator/beam/bif.h
+++ b/erts/emulator/beam/bif.h
@@ -522,12 +522,12 @@ do { \
extern Export erts_bif_handle_signals_return_export;
-#define ERTS_BIF_HANDLE_SIGNALS_RETURN(P, VAL) \
- BIF_TRAP1(&erts_bif_handle_signals_return_export, (P), (VAL))
+#define ERTS_BIF_HANDLE_SIGNALS_FROM_RETURN(P, FROM, VAL) \
+ BIF_TRAP2(&erts_bif_handle_signals_return_export, (P), (FROM), (VAL))
-#define ERTS_BIF_PREP_HANDLE_SIGNALS_RETURN(Ret, P, Val) \
- ERTS_BIF_PREP_TRAP1((Ret), &erts_bif_handle_signals_return_export, \
- (P), (Val))
+#define ERTS_BIF_PREP_HANDLE_SIGNALS_FROM_RETURN(Ret, P, From, Val) \
+ ERTS_BIF_PREP_TRAP2((Ret), &erts_bif_handle_signals_return_export, \
+ (P), (From), (Val))
#define ERTS_BIF_PREP_EXITED(RET, PROC) \
do { \
diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c
index 2c27d460fd..430c4f1d80 100644
--- a/erts/emulator/beam/erl_bif_info.c
+++ b/erts/emulator/beam/erl_bif_info.c
@@ -1007,6 +1007,7 @@ process_info_aux(Process *c_p,
Process *rp,
ErtsProcLocks rp_locks,
int item_ix,
+ Sint *msgq_len_p,
int flags,
Uint *reserve_sizep,
Uint *reds);
@@ -1025,11 +1026,12 @@ erts_process_info(Process *c_p,
Eterm res;
Eterm part_res[ERTS_PI_ARGS];
int item_ix_ix, ix;
+ Sint msgq_len = -1;
if (ERTS_PI_FLAG_SINGELTON & flags) {
ASSERT(item_ix_len == 1);
res = process_info_aux(c_p, hfact, rp, rp_locks, item_ix[0],
- flags, &reserve_size, reds);
+ &msgq_len, flags, &reserve_size, reds);
return res;
}
@@ -1047,7 +1049,7 @@ erts_process_info(Process *c_p,
ix = pi_arg2ix(am_messages);
ASSERT(part_res[ix] == THE_NON_VALUE);
res = process_info_aux(c_p, hfact, rp, rp_locks, ix,
- flags, &reserve_size, reds);
+ &msgq_len, flags, &reserve_size, reds);
ASSERT(res != am_undefined);
ASSERT(res != THE_NON_VALUE);
part_res[ix] = res;
@@ -1057,7 +1059,7 @@ erts_process_info(Process *c_p,
ix = item_ix[item_ix_ix];
if (part_res[ix] == THE_NON_VALUE) {
res = process_info_aux(c_p, hfact, rp, rp_locks, ix,
- flags, &reserve_size, reds);
+ &msgq_len, flags, &reserve_size, reds);
ASSERT(res != am_undefined);
ASSERT(res != THE_NON_VALUE);
part_res[ix] = res;
@@ -1092,6 +1094,92 @@ erts_process_info(Process *c_p,
static void
pi_setup_grow(int **arr, int *def_arr, Uint *sz, int ix);
+static ERTS_INLINE int
+pi_maybe_flush_signals(Process *c_p, int pi_flags)
+{
+ int reds_left;
+ erts_aint32_t state;
+
+ /*
+ * pi_maybe_flush_signals() flush signals in callers
+ * signal queue for two different reasons:
+ *
+ * 1. If we need 'message_queue_len', but not 'messages', we need
+ * to handle all signals in the middle queue in order for
+ * 'c_p->sig_qs.len' to reflect the amount of messages in the
+ * message queue. We could count traverse the queues, but it
+ * is better to handle all signals in the queue instead since
+ * this is work we anyway need to do at some point.
+ *
+ * 2. Ensures that all signals that the caller might have sent to
+ * itself are handled before we gather information.
+ *
+ * This is, however, not strictly necessary. process_info() is
+ * not documented to send itself a signal when gathering
+ * information about itself. That is, the operation is not
+ * bound by the signal order guarantee when gathering
+ * information about itself. If we do not handle outstanding
+ * signals before gathering the information, outstanding signals
+ * from the caller to itself will not be part of the result.
+ * This would not be wrong, but perhaps surprising for the user.
+ * We continue doing it this way for now, since this is how it
+ * has been done for a very long time. We should, however,
+ * consider changing this in a future release, since this signal
+ * handling is not for free, although quite cheap since these
+ * signals anyway must be handled at some point.
+ */
+
+ if (c_p->sig_qs.flags & FS_FLUSHED_SIGS) {
+ flushed:
+
+ ASSERT(((pi_flags & (ERTS_PI_FLAG_WANT_MSGS
+ | ERTS_PI_FLAG_NEED_MSGQ_LEN))
+ != ERTS_PI_FLAG_NEED_MSGQ_LEN)
+ || !c_p->sig_qs.cont);
+ ASSERT(c_p->sig_qs.flags & FS_FLUSHING_SIGS);
+
+ c_p->sig_qs.flags &= ~(FS_FLUSHED_SIGS|FS_FLUSHING_SIGS);
+ erts_set_gc_state(c_p, !0); /* Allow GC again... */
+ return 0; /* done, all signals handled... */
+ }
+
+ state = erts_atomic32_read_nob(&c_p->state);
+
+ if (!(c_p->sig_qs.flags & FS_FLUSHING_SIGS)) {
+ int flush_flags = 0;
+ if (((pi_flags & (ERTS_PI_FLAG_WANT_MSGS
+ | ERTS_PI_FLAG_NEED_MSGQ_LEN))
+ == ERTS_PI_FLAG_NEED_MSGQ_LEN)
+ && c_p->sig_qs.cont) {
+ flush_flags |= ERTS_PROC_SIG_FLUSH_FLG_CLEAN_SIGQ;
+ }
+ if (state & ERTS_PSFLG_MAYBE_SELF_SIGS)
+ flush_flags |= ERTS_PROC_SIG_FLUSH_FLG_FROM_ID;
+ if (!flush_flags)
+ return 0; /* done; no need to flush... */
+ erts_proc_sig_init_flush_signals(c_p, flush_flags, c_p->common.id);
+ if (c_p->sig_qs.flags & FS_FLUSHED_SIGS)
+ goto flushed;
+ }
+
+ ASSERT(c_p->sig_qs.flags & FS_FLUSHING_SIGS);
+ reds_left = ERTS_BIF_REDS_LEFT(c_p);
+
+ do {
+ int sreds = reds_left;
+ (void) erts_proc_sig_handle_incoming(c_p, &state, &sreds,
+ sreds, !0);
+ BUMP_REDS(c_p, (int) sreds);
+ if (state & ERTS_PSFLG_EXITING)
+ return -1; /* process exiting... */
+ if (c_p->sig_qs.flags & FS_FLUSHED_SIGS)
+ goto flushed;
+ reds_left -= sreds;
+ } while (reds_left > 0);
+
+ return 1; /* yield and continue here later... */
+}
+
static BIF_RETTYPE
process_info_bif(Process *c_p, Eterm pid, Eterm opt, int always_wrap, int pi2)
{
@@ -1110,41 +1198,6 @@ process_info_bif(Process *c_p, Eterm pid, Eterm opt, int always_wrap, int pi2)
ERTS_CT_ASSERT(ERTS_PI_DEF_ARR_SZ > 0);
- if (c_p->common.id == pid) {
- int local_only = c_p->sig_qs.flags & FS_LOCAL_SIGS_ONLY;
- int sres, sreds, reds_left;
-
- reds_left = ERTS_BIF_REDS_LEFT(c_p);
- sreds = reds_left;
-
- if (!local_only) {
- erts_proc_sig_queue_lock(c_p);
- erts_proc_sig_fetch(c_p);
- erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ);
- }
-
- sres = erts_proc_sig_handle_incoming(c_p, &state, &sreds, sreds, !0);
-
- BUMP_REDS(c_p, (int) sreds);
- reds_left -= sreds;
-
- if (state & ERTS_PSFLG_EXITING) {
- c_p->sig_qs.flags &= ~FS_LOCAL_SIGS_ONLY;
- goto exited;
- }
- if (!sres | (reds_left <= 0)) {
- /*
- * More signals to handle or out of reds; need
- * to yield and continue. Prevent fetching of
- * more signals by setting local-sigs-only flag.
- */
- c_p->sig_qs.flags |= FS_LOCAL_SIGS_ONLY;
- goto yield;
- }
-
- c_p->sig_qs.flags &= ~FS_LOCAL_SIGS_ONLY;
- }
-
if (is_atom(opt)) {
int ix = pi_arg2ix(opt);
item_ix[0] = ix;
@@ -1190,7 +1243,16 @@ process_info_bif(Process *c_p, Eterm pid, Eterm opt, int always_wrap, int pi2)
goto badarg;
}
- if (is_not_internal_pid(pid)) {
+ if (c_p->common.id == pid) {
+ int res = pi_maybe_flush_signals(c_p, flags);
+ if (res != 0) {
+ if (res > 0)
+ goto yield;
+ else
+ goto exited;
+ }
+ }
+ else if (is_not_internal_pid(pid)) {
if (is_external_pid(pid)
&& external_pid_dist_entry(pid) == erts_this_dist_entry)
goto undefined;
@@ -1226,6 +1288,10 @@ process_info_bif(Process *c_p, Eterm pid, Eterm opt, int always_wrap, int pi2)
}
if (flags & ERTS_PI_FLAG_NEED_MSGQ_LEN) {
ASSERT(locks & ERTS_PROC_LOCK_MAIN);
+ if (rp->sig_qs.flags & FS_FLUSHING_SIGS) {
+ erts_proc_unlock(rp, locks);
+ goto send_signal;
+ }
erts_proc_sig_queue_lock(rp);
erts_proc_sig_fetch(rp);
if (rp->sig_qs.cont) {
@@ -1264,7 +1330,8 @@ process_info_bif(Process *c_p, Eterm pid, Eterm opt, int always_wrap, int pi2)
if (c_p == rp || !ERTS_PROC_HAS_INCOMING_SIGNALS(c_p))
ERTS_BIF_PREP_RET(ret, res);
else
- ERTS_BIF_PREP_HANDLE_SIGNALS_RETURN(ret, c_p, res);
+ ERTS_BIF_PREP_HANDLE_SIGNALS_FROM_RETURN(ret, c_p,
+ pid, res);
done:
@@ -1355,6 +1422,7 @@ process_info_aux(Process *c_p,
Process *rp,
ErtsProcLocks rp_locks,
int item_ix,
+ Sint *msgq_len_p,
int flags,
Uint *reserve_sizep,
Uint *reds)
@@ -1468,8 +1536,10 @@ process_info_aux(Process *c_p,
case ERTS_PI_IX_MESSAGES: {
ASSERT(flags & ERTS_PI_FLAG_NEED_MSGQ_LEN);
- if (rp->sig_qs.len == 0 || (ERTS_TRACE_FLAGS(rp) & F_SENSITIVE))
+ if (rp->sig_qs.len == 0 || (ERTS_TRACE_FLAGS(rp) & F_SENSITIVE)) {
+ *msgq_len_p = 0;
res = NIL;
+ }
else {
int info_on_self = !(flags & ERTS_PI_FLAG_REQUEST_FOR_OTHER);
ErtsMessageInfo *mip;
@@ -1487,8 +1557,8 @@ process_info_aux(Process *c_p,
heap_need = erts_proc_sig_prep_msgq_for_inspection(c_p, rp,
rp_locks,
info_on_self,
- mip);
- len = rp->sig_qs.len;
+ mip, msgq_len_p);
+ len = *msgq_len_p;
heap_need += len*2; /* Cons cells */
@@ -1517,9 +1587,13 @@ process_info_aux(Process *c_p,
}
case ERTS_PI_IX_MESSAGE_QUEUE_LEN: {
- Sint len = rp->sig_qs.len;
+ Sint len = *msgq_len_p;
+ if (len < 0) {
+ ASSERT((flags & ERTS_PI_FLAG_REQUEST_FOR_OTHER)
+ || !rp->sig_qs.cont);
+ len = rp->sig_qs.len;
+ }
ASSERT(flags & ERTS_PI_FLAG_NEED_MSGQ_LEN);
- ASSERT((flags & ERTS_PI_FLAG_REQUEST_FOR_OTHER) || !rp->sig_qs.cont);
ASSERT(len >= 0);
if (len <= MAX_SMALL)
res = make_small(len);
@@ -3690,42 +3764,54 @@ BIF_RETTYPE erts_internal_is_process_alive_2(BIF_ALIST_2)
BIF_ERROR(BIF_P, BADARG);
if (!erts_proc_sig_send_is_alive_request(BIF_P, BIF_ARG_1, BIF_ARG_2)) {
if (ERTS_PROC_HAS_INCOMING_SIGNALS(BIF_P))
- ERTS_BIF_HANDLE_SIGNALS_RETURN(BIF_P, am_ok);
+ ERTS_BIF_HANDLE_SIGNALS_FROM_RETURN(BIF_P, BIF_ARG_1, am_ok);
}
BIF_RET(am_ok);
}
BIF_RETTYPE is_process_alive_1(BIF_ALIST_1)
{
+
if (is_internal_pid(BIF_ARG_1)) {
- erts_aint32_t state;
+ BIF_RETTYPE result;
Process *rp;
if (BIF_ARG_1 == BIF_P->common.id)
BIF_RET(am_true);
rp = erts_proc_lookup_raw(BIF_ARG_1);
- if (!rp)
- BIF_RET(am_false);
+ if (!rp) {
+ result = am_false;
+ }
+ else {
+ erts_aint32_t state = erts_atomic32_read_acqb(&rp->state);
+ if (state & (ERTS_PSFLG_EXITING
+ | ERTS_PSFLG_SIG_Q
+ | ERTS_PSFLG_SIG_IN_Q)) {
+ /*
+ * If in exiting state, trap out and send 'is alive'
+ * request and wait for it to complete termination.
+ *
+ * If process has signals enqueued, we need to
+ * send it an 'is alive' request via its signal
+ * queue in order to ensure that signal order is
+ * preserved (we may earlier have sent it an
+ * exit signal that has not been processed yet).
+ */
+ BIF_TRAP1(is_process_alive_trap, BIF_P, BIF_ARG_1);
+ }
- state = erts_atomic32_read_acqb(&rp->state);
- if (state & (ERTS_PSFLG_EXITING
- | ERTS_PSFLG_SIG_Q
- | ERTS_PSFLG_SIG_IN_Q)) {
+ result = am_true;
+ }
+
+ if (ERTS_PROC_HAS_INCOMING_SIGNALS(BIF_P)) {
/*
- * If in exiting state, trap out and send 'is alive'
- * request and wait for it to complete termination.
- *
- * If process has signals enqueued, we need to
- * send it an 'is alive' request via its signal
- * queue in order to ensure that signal order is
- * preserved (we may earlier have sent it an
- * exit signal that has not been processed yet).
+ * Ensure that signal order of signals from inspected
+ * process to us is preserved...
*/
- BIF_TRAP1(is_process_alive_trap, BIF_P, BIF_ARG_1);
+ ERTS_BIF_HANDLE_SIGNALS_FROM_RETURN(BIF_P, BIF_ARG_1, result);
}
-
- BIF_RET(am_true);
+ BIF_RET(result);
}
if (is_external_pid(BIF_ARG_1)) {
@@ -3734,6 +3820,8 @@ BIF_RETTYPE is_process_alive_1(BIF_ALIST_1)
}
BIF_ERROR(BIF_P, BADARG);
+
+
}
static Eterm
@@ -4218,10 +4306,10 @@ BIF_RETTYPE erts_debug_get_internal_state_1(BIF_ALIST_1)
ERTS_ASSERT_IS_NOT_EXITING(BIF_P);
BIF_RET(am_undefined);
}
-
erts_proc_lock(p, ERTS_PROC_LOCK_MSGQ);
erts_proc_sig_fetch(p);
erts_proc_unlock(p, ERTS_PROC_LOCK_MSGQ);
+ state = erts_atomic32_read_nob(&BIF_P->state);
do {
int reds = CONTEXT_REDS;
sigs_done = erts_proc_sig_handle_incoming(p,
@@ -4284,10 +4372,11 @@ BIF_RETTYPE erts_debug_get_internal_state_1(BIF_ALIST_1)
ERTS_ASSERT_IS_NOT_EXITING(BIF_P);
BIF_RET(am_undefined);
}
-
+
erts_proc_lock(p, ERTS_PROC_LOCK_MSGQ);
erts_proc_sig_fetch(p);
erts_proc_unlock(p, ERTS_PROC_LOCK_MSGQ);
+ state = erts_atomic32_read_nob(&BIF_P->state);
do {
int reds = CONTEXT_REDS;
sigs_done = erts_proc_sig_handle_incoming(p,
diff --git a/erts/emulator/beam/erl_bif_port.c b/erts/emulator/beam/erl_bif_port.c
index fc415aa4e5..737a6be15f 100644
--- a/erts/emulator/beam/erl_bif_port.c
+++ b/erts/emulator/beam/erl_bif_port.c
@@ -238,8 +238,17 @@ BIF_RETTYPE erts_internal_port_command_3(BIF_ALIST_3)
}
else {
/* Ensure signal order is preserved... */
- if (state & (ERTS_PSFLG_SIG_Q|ERTS_PSFLG_SIG_IN_Q))
- ERTS_BIF_PREP_HANDLE_SIGNALS_RETURN(res, BIF_P, res);
+ if (state & (ERTS_PSFLG_SIG_Q|ERTS_PSFLG_SIG_IN_Q)) {
+ Eterm from;
+ if (is_internal_port(BIF_ARG_1))
+ from = BIF_ARG_1;
+ else if (prt)
+ from = prt->common.id;
+ else
+ from = NIL;
+ ERTS_BIF_PREP_HANDLE_SIGNALS_FROM_RETURN(res, BIF_P,
+ from, res);
+ }
}
return res;
@@ -287,8 +296,16 @@ BIF_RETTYPE erts_internal_port_call_3(BIF_ALIST_3)
ERTS_BIF_EXITED(BIF_P);
else {
/* Ensure signal order is preserved... */
- if (state & (ERTS_PSFLG_SIG_Q|ERTS_PSFLG_SIG_IN_Q))
- ERTS_BIF_HANDLE_SIGNALS_RETURN(BIF_P, retval);
+ if (state & (ERTS_PSFLG_SIG_Q|ERTS_PSFLG_SIG_IN_Q)) {
+ Eterm from;
+ if (is_internal_port(BIF_ARG_1))
+ from = BIF_ARG_1;
+ else if (prt)
+ from = prt->common.id;
+ else
+ from = NIL;
+ ERTS_BIF_HANDLE_SIGNALS_FROM_RETURN(BIF_P, from, retval);
+ }
}
BIF_RET(retval);
@@ -335,8 +352,16 @@ BIF_RETTYPE erts_internal_port_control_3(BIF_ALIST_3)
ERTS_BIF_EXITED(BIF_P);
else {
/* Ensure signal order is preserved... */
- if (state & (ERTS_PSFLG_SIG_Q|ERTS_PSFLG_SIG_IN_Q))
- ERTS_BIF_HANDLE_SIGNALS_RETURN(BIF_P, retval);
+ if (state & (ERTS_PSFLG_SIG_Q|ERTS_PSFLG_SIG_IN_Q)) {
+ Eterm from;
+ if (is_internal_port(BIF_ARG_1))
+ from = BIF_ARG_1;
+ else if (prt)
+ from = prt->common.id;
+ else
+ from = NIL;
+ ERTS_BIF_HANDLE_SIGNALS_FROM_RETURN(BIF_P, from, retval);
+ }
}
BIF_RET(retval);
@@ -382,8 +407,16 @@ BIF_RETTYPE erts_internal_port_close_1(BIF_ALIST_1)
}
/* Ensure signal order is preserved... */
- if (ERTS_PROC_HAS_INCOMING_SIGNALS(BIF_P))
- ERTS_BIF_HANDLE_SIGNALS_RETURN(BIF_P, retval);
+ if (ERTS_PROC_HAS_INCOMING_SIGNALS(BIF_P)) {
+ Eterm from;
+ if (is_internal_port(BIF_ARG_1))
+ from = BIF_ARG_1;
+ else if (prt)
+ from = prt->common.id;
+ else
+ from = NIL;
+ ERTS_BIF_HANDLE_SIGNALS_FROM_RETURN(BIF_P, from, retval);
+ }
BIF_RET(retval);
}
@@ -426,8 +459,16 @@ BIF_RETTYPE erts_internal_port_connect_2(BIF_ALIST_2)
}
/* Ensure signal order is preserved... */
- if (ERTS_PROC_HAS_INCOMING_SIGNALS(BIF_P))
- ERTS_BIF_HANDLE_SIGNALS_RETURN(BIF_P, retval);
+ if (ERTS_PROC_HAS_INCOMING_SIGNALS(BIF_P)) {
+ Eterm from;
+ if (is_internal_port(BIF_ARG_1))
+ from = BIF_ARG_1;
+ else if (prt)
+ from = prt->common.id;
+ else
+ from = NIL;
+ ERTS_BIF_HANDLE_SIGNALS_FROM_RETURN(BIF_P, from, retval);
+ }
BIF_RET(retval);
}
@@ -435,7 +476,7 @@ BIF_RETTYPE erts_internal_port_connect_2(BIF_ALIST_2)
BIF_RETTYPE erts_internal_port_info_1(BIF_ALIST_1)
{
Eterm retval;
- Port* prt;
+ Port* prt = NULL;
if (is_internal_port(BIF_ARG_1) || is_atom(BIF_ARG_1)) {
prt = sig_lookup_port(BIF_P, BIF_ARG_1);
@@ -474,8 +515,16 @@ BIF_RETTYPE erts_internal_port_info_1(BIF_ALIST_1)
}
/* Ensure signal order is preserved... */
- if (ERTS_PROC_HAS_INCOMING_SIGNALS(BIF_P))
- ERTS_BIF_HANDLE_SIGNALS_RETURN(BIF_P, retval);
+ if (ERTS_PROC_HAS_INCOMING_SIGNALS(BIF_P)) {
+ Eterm from;
+ if (is_internal_port(BIF_ARG_1))
+ from = BIF_ARG_1;
+ else if (prt)
+ from = prt->common.id;
+ else
+ from = NIL;
+ ERTS_BIF_HANDLE_SIGNALS_FROM_RETURN(BIF_P, from, retval);
+ }
BIF_RET(retval);
}
@@ -484,7 +533,7 @@ BIF_RETTYPE erts_internal_port_info_1(BIF_ALIST_1)
BIF_RETTYPE erts_internal_port_info_2(BIF_ALIST_2)
{
Eterm retval;
- Port* prt;
+ Port* prt = NULL;
if (is_internal_port(BIF_ARG_1) || is_atom(BIF_ARG_1)) {
prt = sig_lookup_port(BIF_P, BIF_ARG_1);
@@ -523,8 +572,16 @@ BIF_RETTYPE erts_internal_port_info_2(BIF_ALIST_2)
}
/* Ensure signal order is preserved... */
- if (ERTS_PROC_HAS_INCOMING_SIGNALS(BIF_P))
- ERTS_BIF_HANDLE_SIGNALS_RETURN(BIF_P, retval);
+ if (ERTS_PROC_HAS_INCOMING_SIGNALS(BIF_P)) {
+ Eterm from;
+ if (is_internal_port(BIF_ARG_1))
+ from = BIF_ARG_1;
+ else if (prt)
+ from = prt->common.id;
+ else
+ from = NIL;
+ ERTS_BIF_HANDLE_SIGNALS_FROM_RETURN(BIF_P, from, retval);
+ }
BIF_RET(retval);
}
diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c
index 151d0b41ad..0026ecce99 100644
--- a/erts/emulator/beam/erl_message.c
+++ b/erts/emulator/beam/erl_message.c
@@ -539,6 +539,9 @@ erts_queue_proc_message(Process* sender,
Process* receiver, ErtsProcLocks receiver_locks,
ErtsMessage* mp, Eterm msg)
{
+ if (sender == receiver)
+ (void) erts_atomic32_read_bor_nob(&sender->state,
+ ERTS_PSFLG_MAYBE_SELF_SIGS);
ERL_MESSAGE_TERM(mp) = msg;
ERL_MESSAGE_FROM(mp) = sender->common.id;
queue_messages(sender->common.id, receiver, receiver_locks,
@@ -552,6 +555,9 @@ erts_queue_proc_messages(Process* sender,
Process* receiver, ErtsProcLocks receiver_locks,
ErtsMessage* first, ErtsMessage** last, Uint len)
{
+ if (sender == receiver)
+ (void) erts_atomic32_read_bor_nob(&sender->state,
+ ERTS_PSFLG_MAYBE_SELF_SIGS);
queue_messages(sender->common.id, receiver, receiver_locks,
prepend_pending_sig_maybe(sender, receiver, first),
last, len);
diff --git a/erts/emulator/beam/erl_proc_sig_queue.c b/erts/emulator/beam/erl_proc_sig_queue.c
index 7cb5f9e491..bfddf9e48f 100644
--- a/erts/emulator/beam/erl_proc_sig_queue.c
+++ b/erts/emulator/beam/erl_proc_sig_queue.c
@@ -220,7 +220,7 @@ typedef struct {
Eterm request_id;
} ErtsCLAData;
-static void wait_handle_signals(Process *c_p);
+static erts_aint32_t wait_handle_signals(Process *c_p);
static void wake_handle_signals(Process *proc);
static int handle_msg_tracing(Process *c_p,
@@ -729,7 +729,7 @@ proc_queue_signal(ErtsPTabElementCommon *sender, Eterm from, Eterm pid,
/* Tracing requires sender for local procs and ports. The assertions below
* will not catch errors after time-of-death, but ought to find most
* problems. */
- ASSERT(sender != NULL ||
+ ASSERT(sender != NULL || op == ERTS_SIG_Q_OP_FLUSH ||
(is_normal_sched && esdp->pending_signal.sig == sig) ||
(!(is_internal_pid(from) &&
erts_proc_lookup(from) != NULL) &&
@@ -824,6 +824,11 @@ proc_queue_signal(ErtsPTabElementCommon *sender, Eterm from, Eterm pid,
sigp = &first;
first_last_done:
+
+ if ((void *) sender == (void *) rp)
+ (void) erts_atomic32_read_bor_nob(&((Process *) sender)->state,
+ ERTS_PSFLG_MAYBE_SELF_SIGS);
+
sig->common.specific.next = NULL;
/* may add signals before sig */
@@ -2751,6 +2756,83 @@ erts_proc_sig_send_move_msgq_off_heap(Eterm to)
}
}
+void
+erts_proc_sig_init_flush_signals(Process *c_p, int flags, Eterm id)
+{
+ int force_flush_buffers = 0, enqueue_mq, fetch_sigs;
+ ErtsSignal *sig;
+
+ ERTS_LC_ASSERT(ERTS_PROC_LOCK_MAIN == erts_proc_lc_my_proc_locks(c_p));
+
+ ASSERT(!(c_p->sig_qs.flags & (FS_FLUSHING_SIGS|FS_FLUSHED_SIGS)));
+ ASSERT(flags);
+ ASSERT((flags & ~ERTS_PROC_SIG_FLUSH_FLGS) == 0);
+ ASSERT(!(flags & ERTS_PROC_SIG_FLUSH_FLG_FROM_ID)
+ || is_internal_pid(id) || is_internal_port(id));
+
+ sig = erts_alloc(ERTS_ALC_T_SIG_DATA, sizeof(ErtsSignalCommon));
+ sig->common.next = NULL;
+ sig->common.specific.attachment = NULL;
+ sig->common.tag = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_FLUSH,
+ ERTS_SIG_Q_TYPE_UNDEFINED,
+ 0);
+ switch (flags) {
+ case ERTS_PROC_SIG_FLUSH_FLG_FROM_ALL:
+ id = c_p->common.id;
+ force_flush_buffers = !0;
+ /* Fall through... */
+ case ERTS_PROC_SIG_FLUSH_FLG_FROM_ID:
+ if (!proc_queue_signal(NULL, id, c_p->common.id, sig,
+ force_flush_buffers, ERTS_SIG_Q_OP_FLUSH))
+ ERTS_INTERNAL_ERROR("Failed to send flush signal to ourselves");
+ enqueue_mq = 0;
+ fetch_sigs = !0;
+ break;
+ case ERTS_PROC_SIG_FLUSH_FLG_CLEAN_SIGQ:
+ enqueue_mq = !0;
+ fetch_sigs = 0;
+ break;
+ default:
+ enqueue_mq = !!(flags & ERTS_PROC_SIG_FLUSH_FLG_CLEAN_SIGQ);
+ fetch_sigs = !0;
+ break;
+ }
+
+ erts_set_gc_state(c_p, 0);
+
+ if (fetch_sigs) {
+ erts_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ);
+ erts_proc_sig_fetch(c_p);
+ erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ);
+ }
+
+ c_p->sig_qs.flags |= FS_FLUSHING_SIGS;
+
+ if (enqueue_mq) {
+ if (!c_p->sig_qs.cont) {
+ c_p->sig_qs.flags |= FS_FLUSHED_SIGS;
+ erts_free(ERTS_ALC_T_SIG_DATA, sig);
+ }
+ else {
+ if (!c_p->sig_qs.nmsigs.last) {
+ ASSERT(!c_p->sig_qs.nmsigs.next);
+ c_p->sig_qs.nmsigs.next = c_p->sig_qs.cont_last;
+ }
+ else {
+ ErtsSignal *lsig = (ErtsSignal *) *c_p->sig_qs.nmsigs.last;
+ ASSERT(c_p->sig_qs.nmsigs.next);
+ ASSERT(lsig && !lsig->common.specific.next);
+ lsig->common.specific.next = c_p->sig_qs.cont_last;
+ }
+
+ c_p->sig_qs.nmsigs.last = c_p->sig_qs.cont_last;
+ *c_p->sig_qs.cont_last = (ErtsMessage *) sig;
+ c_p->sig_qs.cont_last = &sig->common.next;
+ }
+ }
+
+}
+
static int
handle_rpc(Process *c_p, ErtsProcSigRPC *rpc, int cnt, int limit, int *yieldp)
{
@@ -5149,22 +5231,21 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep,
int *redsp, int max_reds, int local_only)
{
Eterm tag;
- erts_aint32_t state;
+ erts_aint32_t state = *statep;
int yield, cnt, limit, abs_lim, msg_tracing, save_in_msgq;
ErtsMessage *sig, ***next_nm_sig;
ErtsSigRecvTracing tracing;
ASSERT(!(c_p->sig_qs.flags & FS_WAIT_HANDLE_SIGS));
if (c_p->sig_qs.flags & FS_HANDLING_SIGS)
- wait_handle_signals(c_p);
+ state = wait_handle_signals(c_p);
else
c_p->sig_qs.flags |= FS_HANDLING_SIGS;
ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(c_p, 0);
ERTS_LC_ASSERT(ERTS_PROC_LOCK_MAIN == erts_proc_lc_my_proc_locks(c_p));
- state = erts_atomic32_read_nob(&c_p->state);
- if (!local_only) {
+ if (!local_only && !(c_p->sig_qs.flags & FS_FLUSHING_SIGS)) {
if (ERTS_PSFLG_SIG_IN_Q & state) {
erts_proc_sig_queue_lock(c_p);
erts_proc_sig_fetch(c_p);
@@ -5694,6 +5775,20 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep,
ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);
break;
+ case ERTS_SIG_Q_OP_FLUSH:
+ ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);
+ ASSERT(c_p->sig_qs.flags & FS_FLUSHING_SIGS);
+ c_p->sig_qs.flags |= FS_FLUSHED_SIGS;
+ remove_nm_sig(c_p, sig, next_nm_sig);
+ erts_free(ERTS_ALC_T_SIG_DATA, sig);
+ ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);
+ /*
+ * The caller has been exclusively handling signals until this
+ * point. Break out and let the process continue with other
+ * things as well...
+ */
+ goto stop;
+
case ERTS_SIG_Q_OP_TRACE_CHANGE_STATE: {
Uint16 type = ERTS_PROC_SIG_TYPE(tag);
@@ -5810,7 +5905,7 @@ stop: {
*next_nm_sig = &c_p->sig_qs.cont;
if (c_p->sig_qs.nmsigs.last == tracing.messages.next)
c_p->sig_qs.nmsigs.last = &c_p->sig_qs.cont;
- *statep = erts_atomic32_read_nob(&c_p->state);
+ state = erts_atomic32_read_nob(&c_p->state);
}
else {
ASSERT(!c_p->sig_qs.nmsigs.next);
@@ -5818,7 +5913,6 @@ stop: {
state = erts_atomic32_read_band_nob(&c_p->state,
~ERTS_PSFLG_SIG_Q);
state &= ~ERTS_PSFLG_SIG_Q;
- *statep = state;
}
if (tracing.messages.next != &c_p->sig_qs.cont) {
@@ -5864,7 +5958,7 @@ stop: {
ASSERT(c_p->sig_qs.cont);
- *statep = erts_atomic32_read_nob(&c_p->state);
+ state = erts_atomic32_read_nob(&c_p->state);
res = 0;
}
@@ -5897,10 +5991,21 @@ stop: {
state = erts_atomic32_read_band_nob(&c_p->state,
~ERTS_PSFLG_SIG_Q);
state &= ~ERTS_PSFLG_SIG_Q;
- *statep = state;
res = !0;
}
+ if (!!(state & ERTS_PSFLG_MAYBE_SELF_SIGS)
+ & !(state & (ERTS_PSFLG_SIG_Q|ERTS_PSFLG_SIG_IN_Q))) {
+ /*
+ * We know we do not have any outstanding signals
+ * from ourselves...
+ */
+ (void) erts_atomic32_read_band_nob(&c_p->state,
+ ~ERTS_PSFLG_MAYBE_SELF_SIGS);
+ state &= ~ERTS_PSFLG_MAYBE_SELF_SIGS;
+ }
+ *statep = state;
+
/* Ensure that 'save' doesn't point to a receive marker... */
if (*c_p->sig_qs.save
&& ERTS_SIG_IS_RECV_MARKER(*c_p->sig_qs.save)) {
@@ -6183,6 +6288,12 @@ erts_proc_sig_handle_exit(Process *c_p, Sint *redsp,
}
break;
+ case ERTS_SIG_Q_OP_FLUSH:
+ ASSERT(c_p->sig_qs.flags & FS_FLUSHING_SIGS);
+ c_p->sig_qs.flags |= FS_FLUSHED_SIGS;
+ erts_free(ERTS_ALC_T_SIG_DATA, sig);
+ break;
+
case ERTS_SIG_Q_OP_TRACE_CHANGE_STATE:
destroy_trace_info((ErtsSigTraceInfo *) sig);
break;
@@ -6280,6 +6391,7 @@ clear_seq_trace_token(ErtsMessage *sig)
case ERTS_SIG_Q_OP_RPC:
case ERTS_SIG_Q_OP_RECV_MARK:
case ERTS_SIG_Q_OP_ADJ_MSGQ:
+ case ERTS_SIG_Q_OP_FLUSH:
break;
default:
@@ -6292,8 +6404,33 @@ clear_seq_trace_token(ErtsMessage *sig)
void
erts_proc_sig_clear_seq_trace_tokens(Process *c_p)
{
- erts_proc_sig_fetch(c_p);
- ERTS_FOREACH_SIG_PRIVQS(c_p, sig, clear_seq_trace_token(sig));
+ int ix;
+ ErtsSignalInQueueBufferArray *bap;
+ int unget_info;
+ ErtsMessage *qs[] = {c_p->sig_qs.first,
+ c_p->sig_qs.cont,
+ c_p->sig_inq.first};
+
+ ERTS_LC_ASSERT(erts_thr_progress_is_blocking());
+
+ for (ix = 0; ix < sizeof(qs)/sizeof(qs[0]); ix++) {
+ ErtsMessage *sigp;
+ for (sigp = qs[ix]; sigp; sigp = sigp->next)
+ clear_seq_trace_token(sigp);
+ }
+
+ bap = erts_proc_sig_queue_get_buffers(c_p, &unget_info);
+ if (bap) {
+ for (ix = 0; ix < ERTS_PROC_SIG_INQ_BUFFERED_NR_OF_BUFFERS; ix++) {
+ ErtsSignalInQueueBuffer* bp = &bap->slots[ix];
+ if (bp->b.alive) {
+ ErtsMessage *sigp;
+ for (sigp = bp->b.queue.first; sigp; sigp = sigp->next)
+ clear_seq_trace_token(sigp);
+ }
+ }
+ erts_proc_sig_queue_unget_buffers(bap, unget_info);
+ }
}
Uint
@@ -6419,6 +6556,10 @@ erts_proc_sig_signal_size(ErtsSignal *sig)
break;
}
+ case ERTS_SIG_Q_OP_FLUSH:
+ size = sizeof(ErtsSignalCommon);
+ break;
+
case ERTS_SIG_Q_OP_TRACE_CHANGE_STATE:
size = sizeof(ErtsSigTraceInfo);
break;
@@ -6537,6 +6678,7 @@ erts_proc_sig_receive_helper(Process *c_p,
if (max_reds < reds)
max_reds = reds;
#endif
+ state = erts_atomic32_read_nob(&c_p->state);
(void) erts_proc_sig_handle_incoming(c_p, &state, &reds,
max_reds, !0);
consumed_reds += reds;
@@ -7246,7 +7388,8 @@ erts_proc_sig_prep_msgq_for_inspection(Process *c_p,
Process *rp,
ErtsProcLocks rp_locks,
int info_on_self,
- ErtsMessageInfo *mip)
+ ErtsMessageInfo *mip,
+ Sint *msgq_len_p)
{
Uint tot_heap_size;
ErtsMessage *mp, **mpp;
@@ -7320,7 +7463,11 @@ erts_proc_sig_prep_msgq_for_inspection(Process *c_p,
mp = mp->next;
}
- ASSERT(c_p->sig_qs.len == i);
+
+ ASSERT(info_on_self || c_p->sig_qs.len == i);
+ ASSERT(!info_on_self || c_p->sig_qs.len >= i);
+
+ *msgq_len_p = i;
return tot_heap_size;
}
@@ -7409,7 +7556,6 @@ erts_internal_dirty_process_handle_signals_1(BIF_ALIST_1)
BIF_RET(am_normal); /* will handle signals itself... */
}
else {
- erts_aint32_t state;
int done;
Eterm res = am_false;
int reds = 0;
@@ -7477,7 +7623,7 @@ erts_proc_sig_cleanup_queues(Process *c_p)
/* Debug */
-static void
+static erts_aint32_t
wait_handle_signals(Process *c_p)
{
/*
@@ -7527,6 +7673,8 @@ wait_handle_signals(Process *c_p)
c_p->sig_qs.flags &= ~FS_WAIT_HANDLE_SIGS;
c_p->sig_qs.flags |= FS_HANDLING_SIGS;
+
+ return erts_atomic32_read_mb(&c_p->state);
}
static void
@@ -7708,6 +7856,7 @@ erts_proc_sig_debug_foreach_sig(Process *c_p,
case ERTS_SIG_Q_OP_PROCESS_INFO:
case ERTS_SIG_Q_OP_RECV_MARK:
case ERTS_SIG_Q_OP_MSGQ_LEN_OFFS_MARK:
+ case ERTS_SIG_Q_OP_FLUSH:
break;
default:
diff --git a/erts/emulator/beam/erl_proc_sig_queue.h b/erts/emulator/beam/erl_proc_sig_queue.h
index b3aefff0b0..deb1b802e1 100644
--- a/erts/emulator/beam/erl_proc_sig_queue.h
+++ b/erts/emulator/beam/erl_proc_sig_queue.h
@@ -110,7 +110,7 @@ typedef struct {
* Note that not all signal are handled using this functionality!
*/
-#define ERTS_SIG_Q_OP_MAX 18
+#define ERTS_SIG_Q_OP_MAX 19
#define ERTS_SIG_Q_OP_EXIT 0 /* Exit signal due to bif call */
#define ERTS_SIG_Q_OP_EXIT_LINKED 1 /* Exit signal due to link break*/
@@ -130,7 +130,8 @@ typedef struct {
#define ERTS_SIG_Q_OP_ALIAS_MSG 15
#define ERTS_SIG_Q_OP_RECV_MARK 16
#define ERTS_SIG_Q_OP_UNLINK_ACK 17
-#define ERTS_SIG_Q_OP_ADJ_MSGQ ERTS_SIG_Q_OP_MAX
+#define ERTS_SIG_Q_OP_ADJ_MSGQ 18
+#define ERTS_SIG_Q_OP_FLUSH ERTS_SIG_Q_OP_MAX
#define ERTS_SIG_Q_TYPE_MAX (ERTS_MON_LNK_TYPE_MAX + 10)
@@ -1133,6 +1134,9 @@ erts_proc_sig_send_move_msgq_off_heap(Eterm to);
*
* @param[out] statep Pointer to process state after
* signal handling. May not be NULL.
+ * The state should recently have
+ * been updated before calling
+ * this function.
*
* @param[in,out] redsp Pointer to an integer containing
* reductions. On input, the amount
@@ -1253,6 +1257,58 @@ erts_proc_sig_receive_helper(Process *c_p, int fcalls,
int neg_o_reds, ErtsMessage **msgpp,
int *get_outp);
+/*
+ * CLEAN_SIGQ - Flush until middle queue is empty, i.e.
+ * the content of inner+middle queue equals
+ * the message queue.
+ */
+#define ERTS_PROC_SIG_FLUSH_FLG_CLEAN_SIGQ (1 << 0)
+/*
+ * FROM_ALL - Flush signals from all local senders (processes
+ * and ports).
+ */
+#define ERTS_PROC_SIG_FLUSH_FLG_FROM_ALL (1 << 1)
+/*
+ * FROM_ID - Flush signals from process or port identified
+ * by 'id'.
+ */
+#define ERTS_PROC_SIG_FLUSH_FLG_FROM_ID (1 << 2)
+
+/*
+ * All erts_proc_sig_init_flush_signals() flags.
+ */
+#define ERTS_PROC_SIG_FLUSH_FLGS \
+ (ERTS_PROC_SIG_FLUSH_FLG_CLEAN_SIGQ \
+ | ERTS_PROC_SIG_FLUSH_FLG_FROM_ALL \
+ | ERTS_PROC_SIG_FLUSH_FLG_FROM_ID)
+
+/**
+ *
+ * @brief Initialize flush of signals from another process or port
+ *
+ * Inserts a flush signal in the outer signal queue of
+ * current process and sets the FS_FLUSHING_SIGS flag in
+ * 'c_p->sig_qs.flags'. When the flush signal has been
+ * handled the FS_FLUSHED_SIGS flag is set as well.
+ *
+ * While the flushing is ongoing the process *should* only
+ * handle incoming signals and not execute Erlang code. When
+ * the functionality that initiated the flush detects that
+ * the flush is done by the FS_FLUSHED_SIGS flag being set,
+ * it should clear both the FS_FLUSHED_SIGS flag and the
+ * FS_FLUSHING_SIGS flag.
+ *
+ * @param[in] c_p Pointer to process struct of
+ * currently executing process.
+ * flags Flags indicating how to flush.
+ * (see above).
+ * from Identifier of sender to flush
+ * signals from in case the
+ * FROM_ID flag is set.
+ */
+void
+erts_proc_sig_init_flush_signals(Process *c_p, int flags, Eterm from);
+
/**
*
* @brief Fetch signals from the outer queue
@@ -1370,12 +1426,16 @@ typedef struct {
*
* @param[in] mip Pointer to array of
* ErtsMessageInfo structures.
+ *
+ * @param[out] msgq_lenp Pointer to integer containing
+ * amount of messages.
*/
Uint erts_proc_sig_prep_msgq_for_inspection(Process *c_p,
Process *rp,
ErtsProcLocks rp_locks,
int info_on_self,
- ErtsMessageInfo *mip);
+ ErtsMessageInfo *mip,
+ Sint *msgq_lenp);
/**
*
@@ -1725,6 +1785,10 @@ erts_proc_sig_fetch(Process *proc)
== (ERTS_PROC_LOCK_MAIN
| ERTS_PROC_LOCK_MSGQ)));
+ ASSERT(!(proc->sig_qs.flags & FS_FLUSHING_SIGS)
+ || ERTS_PROC_IS_EXITING(proc)
+ || ERTS_IS_CRASH_DUMPING);
+
ERTS_HDBG_CHECK_SIGNAL_IN_QUEUE(proc);
ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(proc, !0);
diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c
index ff81b453a7..393de2b7b1 100644
--- a/erts/emulator/beam/erl_process.c
+++ b/erts/emulator/beam/erl_process.c
@@ -10063,27 +10063,30 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls)
/* On normal scheduler */
if (state & ERTS_PSFLG_RUNNING_SYS) {
if (state & (ERTS_PSFLG_SIG_Q|ERTS_PSFLG_SIG_IN_Q)) {
- int local_only = (!!(p->sig_qs.flags & FS_LOCAL_SIGS_ONLY)
- & !(state & (ERTS_PSFLG_SUSPENDED|ERTS_PSFLGS_DIRTY_WORK)));
- if (!local_only | !!(state & ERTS_PSFLG_SIG_Q)) {
- int sig_reds;
- /*
- * If we have dirty work scheduled we allow
- * usage of all reductions since we need to
- * handle all signals before doing dirty
- * work...
- */
- if (state & ERTS_PSFLGS_DIRTY_WORK)
- sig_reds = reds;
- else
- sig_reds = ERTS_SIG_HANDLE_REDS_MAX_PREFERED;
- (void) erts_proc_sig_handle_incoming(p,
- &state,
- &sig_reds,
- sig_reds,
- local_only);
- reds -= sig_reds;
- }
+ int sig_reds;
+ /*
+ * If we have dirty work scheduled we allow
+ * usage of all reductions since we need to
+ * handle all signals before doing dirty
+ * work...
+ *
+ * If a BIF is flushing signals, we also allow
+ * usage of all reductions since the BIF cannot
+ * continue exectution until the flush
+ * completes...
+ */
+ if (state & ERTS_PSFLGS_DIRTY_WORK)
+ sig_reds = reds;
+ else if (p->sig_qs.flags & FS_FLUSHING_SIGS)
+ sig_reds = reds;
+ else
+ sig_reds = ERTS_SIG_HANDLE_REDS_MAX_PREFERED;
+ (void) erts_proc_sig_handle_incoming(p,
+ &state,
+ &sig_reds,
+ sig_reds,
+ 0);
+ reds -= sig_reds;
}
if ((state & (ERTS_PSFLG_SYS_TASKS
| ERTS_PSFLG_EXITING)) == ERTS_PSFLG_SYS_TASKS) {
@@ -10093,8 +10096,14 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls)
* hand written beam assembly in
* prim_eval:'receive'. If GC is delayed we are
* not allowed to execute system tasks.
+ *
+ * We also don't allow execution of system tasks
+ * if a BIF is flushing signals, since there are
+ * system tasks that might need to fetch from the
+ * outer signal queue...
*/
- if (!(p->flags & F_DELAY_GC)) {
+ if (!(p->flags & F_DELAY_GC)
+ && !(p->sig_qs.flags & FS_FLUSHING_SIGS)) {
int cost = execute_sys_tasks(p, &state, reds);
calls += cost;
reds -= cost;
@@ -10622,15 +10631,19 @@ execute_sys_tasks(Process *c_p, erts_aint32_t *statep, int in_reds)
break;
case ERTS_PSTT_PRIO_SIG: {
erts_aint32_t fail_state, state;
- int sig_res, sig_reds = reds;
+ int sig_res, sig_reds;
st_res = am_false;
+ ASSERT(!(c_p->sig_qs.flags & FS_FLUSHING_SIGS));
+
if (st->arg[0] == am_false) {
erts_proc_sig_queue_lock(c_p);
erts_proc_sig_fetch(c_p);
erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ);
+ st->arg[0] = am_true;
}
+ state = erts_atomic32_read_nob(&c_p->state);
sig_reds = reds;
sig_res = erts_proc_sig_handle_incoming(c_p, &state, &sig_reds,
reds, !0);
@@ -10644,8 +10657,6 @@ execute_sys_tasks(Process *c_p, erts_aint32_t *statep, int in_reds)
if (sig_res)
break;
- st->arg[0] = am_true;
-
fail_state = ERTS_PSFLG_EXITING;
if (schedule_process_sys_task(c_p, st_prio, st, &fail_state)) {
@@ -10656,6 +10667,7 @@ execute_sys_tasks(Process *c_p, erts_aint32_t *statep, int in_reds)
state = erts_atomic32_read_nob(&c_p->state);
exit_permanent_prio_elevation(c_p, state, st_prio);
}
+
break;
}
case ERTS_PSTT_TEST:
diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h
index 81958e373a..0459aeaba5 100644
--- a/erts/emulator/beam/erl_process.h
+++ b/erts/emulator/beam/erl_process.h
@@ -1246,8 +1246,9 @@ void erts_check_for_holes(Process* p);
process table. Always ACTIVE while EXITING. Never
SUSPENDED unless also FREE. */
#define ERTS_PSFLG_EXITING ERTS_PSFLG_BIT(5)
-/* UNUSED */
-#define ERTS_PSFLG_UNUSED ERTS_PSFLG_BIT(6)
+/* MAYBE_SELF_SIGS - We might have outstanding signals
+ from ourselves to ourselvs. */
+#define ERTS_PSFLG_MAYBE_SELF_SIGS ERTS_PSFLG_BIT(6)
/* ACTIVE - Process "wants" to execute */
#define ERTS_PSFLG_ACTIVE ERTS_PSFLG_BIT(7)
/* IN_RUNQ - Real process (not proxy) struct used in a
@@ -1572,10 +1573,12 @@ extern int erts_system_profile_ts_type;
#define FS_OFF_HEAP_MSGQ (1 << 0) /* Off heap msg queue */
#define FS_ON_HEAP_MSGQ (1 << 1) /* On heap msg queue */
#define FS_OFF_HEAP_MSGQ_CHNG (1 << 2) /* Off heap msg queue changing */
-#define FS_LOCAL_SIGS_ONLY (1 << 3) /* Handle privq sigs only */
+#define FS_UNUSED (1 << 3) /* Unused */
#define FS_HANDLING_SIGS (1 << 4) /* Process is handling signals */
#define FS_WAIT_HANDLE_SIGS (1 << 5) /* Process is waiting to handle signals */
#define FS_DELAYED_PSIGQS_LEN (1 << 6) /* Delayed update of sig_qs.len */
+#define FS_FLUSHING_SIGS (1 << 7) /* Currently flushing signals */
+#define FS_FLUSHED_SIGS (1 << 8) /* Flushing of signals completed */
/*
* F_DISABLE_GC and F_DELAY_GC are similar. Both will prevent
diff --git a/erts/emulator/beam/erl_process_dump.c b/erts/emulator/beam/erl_process_dump.c
index f95c7ad1d7..9fa5969b5e 100644
--- a/erts/emulator/beam/erl_process_dump.c
+++ b/erts/emulator/beam/erl_process_dump.c
@@ -159,9 +159,12 @@ Uint erts_process_memory(Process *p, int include_sigs_in_transit)
* Size of message queue plus size of all signals
* in transit to the process!
*/
- erts_proc_sig_queue_lock(p);
- erts_proc_sig_fetch(p);
- erts_proc_unlock(p, ERTS_PROC_LOCK_MSGQ);
+ if (!(p->sig_qs.flags & FS_FLUSHING_SIGS)
+ || ERTS_IS_CRASH_DUMPING) {
+ erts_proc_sig_queue_lock(p);
+ erts_proc_sig_fetch(p);
+ erts_proc_unlock(p, ERTS_PROC_LOCK_MSGQ);
+ }
ERTS_FOREACH_SIG_PRIVQS(
p, mp,
@@ -228,7 +231,9 @@ dump_process_info(fmtfn_t to, void *to_arg, Process *p)
if (ERTS_TRACE_FLAGS(p) & F_SENSITIVE)
return;
- erts_proc_sig_fetch(p);
+ if (!(p->sig_qs.flags & FS_FLUSHING_SIGS) || ERTS_IS_CRASH_DUMPING) {
+ erts_proc_sig_fetch(p);
+ }
if (p->sig_qs.first || p->sig_qs.cont) {
erts_print(to, to_arg, "=proc_messages:%T\n", p->common.id);
@@ -1123,8 +1128,8 @@ erts_dump_extended_process_state(fmtfn_t to, void *to_arg, erts_aint32_t psflg)
erts_print(to, to_arg, "FREE"); break;
case ERTS_PSFLG_EXITING:
erts_print(to, to_arg, "EXITING"); break;
- case ERTS_PSFLG_UNUSED:
- erts_print(to, to_arg, "UNUSED"); break;
+ case ERTS_PSFLG_MAYBE_SELF_SIGS:
+ erts_print(to, to_arg, "MAYBE_SELF_SIGS"); break;
case ERTS_PSFLG_ACTIVE:
erts_print(to, to_arg, "ACTIVE"); break;
case ERTS_PSFLG_IN_RUNQ:
diff --git a/erts/emulator/test/bif_SUITE.erl b/erts/emulator/test/bif_SUITE.erl
index c5be79528b..ee6c494145 100644
--- a/erts/emulator/test/bif_SUITE.erl
+++ b/erts/emulator/test/bif_SUITE.erl
@@ -37,6 +37,7 @@
error_stacktrace_during_call_trace/1,
group_leader_prio/1, group_leader_prio_dirty/1,
is_process_alive/1,
+ is_process_alive_signal_from/1,
process_info_blast/1,
os_env_case_sensitivity/1,
verify_middle_queue_save/1,
@@ -57,7 +58,8 @@ all() ->
erl_crash_dump_bytes, min_max, erlang_halt, is_builtin,
error_stacktrace, error_stacktrace_during_call_trace,
group_leader_prio, group_leader_prio_dirty,
- is_process_alive, process_info_blast, os_env_case_sensitivity,
+ is_process_alive, is_process_alive_signal_from,
+ process_info_blast, os_env_case_sensitivity,
verify_middle_queue_save, test_length,fixed_apply_badarg,
external_fun_apply3].
@@ -1205,6 +1207,51 @@ is_process_alive(Config) when is_list(Config) ->
Ps),
ok.
+is_process_alive_signal_from(Config) when is_list(Config) ->
+ process_flag(priority, high),
+ process_flag(scheduler, 1),
+ Schdlr = case erlang:system_info(schedulers_online) of
+ 1 -> 1;
+ _ -> 2
+ end,
+ X = is_process_alive_signal_from_test(100000, 0, Schdlr),
+ erlang:display({exits_detected, X}),
+ {comment, integer_to_list(X) ++ " exited processes detected"}.
+
+is_process_alive_signal_from_test(0, X, _Schdlr) ->
+ X;
+is_process_alive_signal_from_test(N, X, Schdlr) ->
+ Tester = self(),
+ {Testee, TMon} = spawn_opt(fun () ->
+ Mon = erlang:monitor(process, Tester),
+ Tester ! {self(), ready},
+ busy_wait_go(),
+ _ = erlang:demonitor(Mon),
+ exit(normal)
+ end,
+ [link,
+ monitor,
+ {priority, high},
+ {scheduler, Schdlr}]),
+ receive {Testee, ready} -> ok end,
+ {monitored_by, MBList1} = process_info(self(), monitored_by),
+ true = lists:member(Testee, MBList1),
+ erlang:yield(),
+ Testee ! {go, ok},
+ erlang:yield(),
+ NewX = case erlang:is_process_alive(Testee) of
+ true ->
+ X;
+ false ->
+ %% Demonitor signal should have reached us before the
+ %% is-process-alive reply...
+ {monitored_by, MBList2} = process_info(self(), monitored_by),
+ false = lists:member(Testee, MBList2),
+ X+1
+ end,
+ receive {'DOWN', TMon, process, Testee, normal} -> ok end,
+ is_process_alive_signal_from_test(N-1, NewX, Schdlr).
+
process_info_blast(Config) when is_list(Config) ->
Tester = self(),
NoAttackers = 1000,
@@ -1396,7 +1443,16 @@ external_fun_apply3(_Config) ->
ok.
%% helpers
-
+
+busy_wait_go() ->
+ receive
+ {go, Info} ->
+ Info
+ after
+ 0 ->
+ busy_wait_go()
+ end.
+
id(I) -> I.
%% Get code path, including the path for the erts application.
diff --git a/erts/emulator/test/nif_SUITE.erl b/erts/emulator/test/nif_SUITE.erl
index 4aa9cf6b9f..2f28b4a0e9 100644
--- a/erts/emulator/test/nif_SUITE.erl
+++ b/erts/emulator/test/nif_SUITE.erl
@@ -1152,6 +1152,7 @@ monitor_process_c(Config) ->
Pid = spawn_link(fun() ->
R_ptr = alloc_monitor_resource_nif(),
{0,Mon} = monitor_process_nif(R_ptr, self(), true, Papa),
+ receive after 1000 -> ok end,
[R_ptr] = monitored_by(self()),
put(store, make_resource(R_ptr)),
ok = release_resource(R_ptr),
@@ -1159,8 +1160,8 @@ monitor_process_c(Config) ->
Papa ! {self(), done, R_ptr, Mon},
exit
end),
- [{Pid, done, R_ptr, Mon1},
- {monitor_resource_down, R_ptr, Pid, Mon2}] = flush(2),
+ receive {Pid, done, R_ptr, Mon1} -> ok end,
+ [{monitor_resource_down, R_ptr, Pid, Mon2}] = flush(1),
compare_monitors_nif(Mon1, Mon2),
{R_ptr, _, 1} = last_resource_dtor_call(),
ok.
diff --git a/erts/emulator/test/port_SUITE.erl b/erts/emulator/test/port_SUITE.erl
index 4a3ecef397..1fa6922648 100644
--- a/erts/emulator/test/port_SUITE.erl
+++ b/erts/emulator/test/port_SUITE.erl
@@ -1915,6 +1915,7 @@ otp_5112(Config) when is_list(Config) ->
true = lists:member(Port, Links1),
Port ! {self(), {command, ""}},
wait_until(fun () -> lists:member(Port, erlang:ports()) == false end),
+ receive after 1000 -> ok end, %% Give signal some time to propagate...
{links, Links2} = process_info(self(),links),
io:format("Links2: ~p~n",[Links2]),
false = lists:member(Port, Links2), %% This used to fail
diff --git a/erts/emulator/test/process_SUITE.erl b/erts/emulator/test/process_SUITE.erl
index 0e549d424c..23a208adb2 100644
--- a/erts/emulator/test/process_SUITE.erl
+++ b/erts/emulator/test/process_SUITE.erl
@@ -49,6 +49,10 @@
process_info_smoke_all/1,
process_info_status_handled_signal/1,
process_info_reductions/1,
+ process_info_self_signal/1,
+ process_info_self_msgq_len/1,
+ process_info_self_msgq_len_messages/1,
+ process_info_self_msgq_len_more/1,
bump_reductions/1, low_prio/1, binary_owner/1, yield/1, yield2/1,
otp_4725/1, dist_unlink_ack_exit_leak/1, bad_register/1,
garbage_collect/1, otp_6237/1,
@@ -107,21 +111,8 @@ suite() ->
all() ->
[spawn_with_binaries, t_exit_1, {group, t_exit_2},
trap_exit_badarg, trap_exit_badarg_in_bif,
- t_process_info, process_info_other, process_info_other_msg,
- process_info_other_dist_msg, process_info_other_status,
- process_info_2_list,
- process_info_lock_reschedule,
- process_info_lock_reschedule2,
- process_info_lock_reschedule3,
- process_info_other_message_queue_len_signal_race,
- process_info_garbage_collection,
- process_info_parent,
- process_info_smoke_all,
- process_info_status_handled_signal,
- process_info_reductions,
bump_reductions, low_prio, yield, yield2, otp_4725,
- dist_unlink_ack_exit_leak,
- bad_register, garbage_collect, process_info_messages,
+ dist_unlink_ack_exit_leak, bad_register, garbage_collect,
process_flag_badarg,
process_flag_fullsweep_after, process_flag_heap_size,
command_line_max_heap_size,
@@ -129,6 +120,7 @@ all() ->
spawn_huge_arglist,
otp_6237,
{group, spawn_request},
+ {group, process_info_bif},
{group, processes_bif},
{group, otp_7738}, garb_other_running,
{group, system_task},
@@ -159,6 +151,24 @@ groups() ->
processes_small_tab, processes_this_tab,
processes_last_call_trap, processes_apply_trap,
processes_gc_trap, processes_term_proc_list]},
+ {process_info_bif, [],
+ [t_process_info, process_info_messages,
+ process_info_other, process_info_other_msg,
+ process_info_other_message_queue_len_signal_race,
+ process_info_other_dist_msg, process_info_other_status,
+ process_info_2_list,
+ process_info_lock_reschedule,
+ process_info_lock_reschedule2,
+ process_info_lock_reschedule3,
+ process_info_garbage_collection,
+ process_info_parent,
+ process_info_smoke_all,
+ process_info_status_handled_signal,
+ process_info_reductions,
+ process_info_self_signal,
+ process_info_self_msgq_len,
+ process_info_self_msgq_len_messages,
+ process_info_self_msgq_len_more]},
{otp_7738, [],
[otp_7738_waiting, otp_7738_suspended,
otp_7738_resume]},
@@ -1351,6 +1361,146 @@ pi_reductions_main_unlocker_loop(Other) ->
erlang:yield(),
pi_reductions_main_unlocker_loop(Other).
+process_info_self_signal(Config) when is_list(Config) ->
+ %% Test that signals that we have sent to ourselves are
+ %% visible in process_info() result. This is not strictly
+ %% a necessary property, but implemented so now. See
+ %% process_info.c:process_info_bif() for more info.
+ Self = self(),
+ Ref = make_ref(),
+ pi_sig_spam_test(fun () ->
+ process_info_self_signal_spammer(Self)
+ end,
+ fun () ->
+ self() ! Ref,
+ process_info(self(), messages)
+ end,
+ fun (Res) ->
+ {messages, [Ref]} = Res
+ end).
+
+process_info_self_signal_spammer(To) ->
+ erlang:demonitor(erlang:monitor(process, To)),
+ process_info_self_signal_spammer(To).
+
+process_info_self_msgq_len(Config) when is_list(Config) ->
+ %% Spam ourselves with signals forcing us to flush own
+ %% signal queue..
+ Self = self(),
+ pi_sig_spam_test(fun () ->
+ process_info_self_msgq_len_spammer(Self)
+ end,
+ fun () ->
+ process_info(self(), message_queue_len)
+ end,
+ fun (Res) ->
+ {message_queue_len, Len} = Res,
+ true = Len > 0,
+ ok
+ end).
+
+
+process_info_self_msgq_len_messages(Config) when is_list(Config) ->
+ %% Spam ourselves with signals normally forcing us to flush own
+ %% signal queue, but since we also want messages wont be flushed...
+ Self = self(),
+ pi_sig_spam_test(fun () ->
+ process_info_self_msgq_len_spammer(Self, 100000)
+ end,
+ fun () ->
+ process_info(self(),
+ [message_queue_len,
+ messages])
+ end,
+ fun (Res) ->
+ [{message_queue_len, Len},
+ {messages, Msgs}] = Res,
+ Len = length(Msgs),
+ ok
+ end).
+
+process_info_self_msgq_len_more(Config) when is_list(Config) ->
+ self() ! hej,
+ BodyRes = process_info_self_msgq_len_more_caller_body(),
+ ok = process_info_self_msgq_len_more_caller_body_result(BodyRes),
+ TailRes = process_info_self_msgq_len_more_caller_tail(),
+ ok = process_info_self_msgq_len_more_caller_tail_result(TailRes),
+ receive hej -> ok end,
+ %% Check that current_function, current_location, and
+ %% current_stacktrace give sane results flushing or not...
+ Self = self(),
+ pi_sig_spam_test(fun () ->
+ process_info_self_msgq_len_spammer(Self)
+ end,
+ fun process_info_self_msgq_len_more_caller_body/0,
+ fun process_info_self_msgq_len_more_caller_body_result/1),
+ pi_sig_spam_test(fun () ->
+ process_info_self_msgq_len_spammer(Self)
+ end,
+ fun process_info_self_msgq_len_more_caller_tail/0,
+ fun process_info_self_msgq_len_more_caller_tail_result/1).
+
+process_info_self_msgq_len_more_caller_body() ->
+ Res = process_info(self(),
+ [message_queue_len,
+ current_function,
+ current_location,
+ current_stacktrace]),
+ id(Res).
+
+process_info_self_msgq_len_more_caller_body_result(Res) ->
+ [{message_queue_len, Len},
+ {current_function, {process_SUITE,process_info_self_msgq_len_more_caller_body,0}},
+ {current_location, {process_SUITE,process_info_self_msgq_len_more_caller_body,0,_}},
+ {current_stacktrace,
+ [{process_SUITE,process_info_self_msgq_len_more_caller_body,0,_} | _]}] = Res,
+ true = Len > 0,
+ ok.
+
+process_info_self_msgq_len_more_caller_tail() ->
+ process_info(self(),
+ [message_queue_len,
+ current_function,
+ current_location,
+ current_stacktrace]).
+
+process_info_self_msgq_len_more_caller_tail_result(Res) ->
+ [{message_queue_len, Len},
+ {current_function, {process_SUITE,process_info_self_msgq_len_more_caller_tail,0}},
+ {current_location, {process_SUITE,process_info_self_msgq_len_more_caller_tail,0,_}},
+ {current_stacktrace,
+ [{process_SUITE,process_info_self_msgq_len_more_caller_tail,0,_} | _]}] = Res,
+ true = Len > 0,
+ ok.
+
+
+process_info_self_msgq_len_spammer(To) ->
+ process_info_self_msgq_len_spammer(To, 10000000).
+
+process_info_self_msgq_len_spammer(_To, 0) ->
+ ok;
+process_info_self_msgq_len_spammer(To, N) ->
+ To ! hejhopp,
+ erlang:demonitor(erlang:monitor(process, To)),
+ process_info_self_msgq_len_spammer(To, N-1).
+
+pi_sig_spam_test(SpamFun, PITest, PICheckRes) ->
+ SO = erlang:system_flag(schedulers_online, 1),
+ try
+ Self = self(),
+ SigSpammer = spawn_link(SpamFun),
+ process_flag(priority, low),
+ receive after 10 -> ok end,
+ Res = PITest(),
+ process_flag(priority, high),
+ unlink(SigSpammer),
+ exit(SigSpammer, kill),
+ false = is_process_alive(SigSpammer),
+ PICheckRes(Res)
+ after
+ _ = erlang:system_flag(schedulers_online, SO)
+ end.
+
%% Tests erlang:bump_reductions/1.
bump_reductions(Config) when is_list(Config) ->