summaryrefslogtreecommitdiff
path: root/erts/emulator/beam/erl_proc_sig_queue.c
diff options
context:
space:
mode:
authorRickard Green <rickard@erlang.org>2021-05-11 19:17:32 +0200
committerRickard Green <rickard@erlang.org>2022-07-13 19:09:44 +0200
commitac579bbec058b313af19de4a6d536c6ab446bfed (patch)
tree406f890da197933180a7c6f860686aa505e75b8d /erts/emulator/beam/erl_proc_sig_queue.c
parente053340291b303064023707aff03f297cf219552 (diff)
downloaderlang-ac579bbec058b313af19de4a6d536c6ab446bfed.tar.gz
[erts] Improve flushing of signals
Diffstat (limited to 'erts/emulator/beam/erl_proc_sig_queue.c')
-rw-r--r--erts/emulator/beam/erl_proc_sig_queue.c181
1 files changed, 165 insertions, 16 deletions
diff --git a/erts/emulator/beam/erl_proc_sig_queue.c b/erts/emulator/beam/erl_proc_sig_queue.c
index 7cb5f9e491..bfddf9e48f 100644
--- a/erts/emulator/beam/erl_proc_sig_queue.c
+++ b/erts/emulator/beam/erl_proc_sig_queue.c
@@ -220,7 +220,7 @@ typedef struct {
Eterm request_id;
} ErtsCLAData;
-static void wait_handle_signals(Process *c_p);
+static erts_aint32_t wait_handle_signals(Process *c_p);
static void wake_handle_signals(Process *proc);
static int handle_msg_tracing(Process *c_p,
@@ -729,7 +729,7 @@ proc_queue_signal(ErtsPTabElementCommon *sender, Eterm from, Eterm pid,
/* Tracing requires sender for local procs and ports. The assertions below
* will not catch errors after time-of-death, but ought to find most
* problems. */
- ASSERT(sender != NULL ||
+ ASSERT(sender != NULL || op == ERTS_SIG_Q_OP_FLUSH ||
(is_normal_sched && esdp->pending_signal.sig == sig) ||
(!(is_internal_pid(from) &&
erts_proc_lookup(from) != NULL) &&
@@ -824,6 +824,11 @@ proc_queue_signal(ErtsPTabElementCommon *sender, Eterm from, Eterm pid,
sigp = &first;
first_last_done:
+
+ if ((void *) sender == (void *) rp)
+ (void) erts_atomic32_read_bor_nob(&((Process *) sender)->state,
+ ERTS_PSFLG_MAYBE_SELF_SIGS);
+
sig->common.specific.next = NULL;
/* may add signals before sig */
@@ -2751,6 +2756,83 @@ erts_proc_sig_send_move_msgq_off_heap(Eterm to)
}
}
+void
+erts_proc_sig_init_flush_signals(Process *c_p, int flags, Eterm id)
+{
+ int force_flush_buffers = 0, enqueue_mq, fetch_sigs;
+ ErtsSignal *sig;
+
+ ERTS_LC_ASSERT(ERTS_PROC_LOCK_MAIN == erts_proc_lc_my_proc_locks(c_p));
+
+ ASSERT(!(c_p->sig_qs.flags & (FS_FLUSHING_SIGS|FS_FLUSHED_SIGS)));
+ ASSERT(flags);
+ ASSERT((flags & ~ERTS_PROC_SIG_FLUSH_FLGS) == 0);
+ ASSERT(!(flags & ERTS_PROC_SIG_FLUSH_FLG_FROM_ID)
+ || is_internal_pid(id) || is_internal_port(id));
+
+ sig = erts_alloc(ERTS_ALC_T_SIG_DATA, sizeof(ErtsSignalCommon));
+ sig->common.next = NULL;
+ sig->common.specific.attachment = NULL;
+ sig->common.tag = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_FLUSH,
+ ERTS_SIG_Q_TYPE_UNDEFINED,
+ 0);
+ switch (flags) {
+ case ERTS_PROC_SIG_FLUSH_FLG_FROM_ALL:
+ id = c_p->common.id;
+ force_flush_buffers = !0;
+ /* Fall through... */
+ case ERTS_PROC_SIG_FLUSH_FLG_FROM_ID:
+ if (!proc_queue_signal(NULL, id, c_p->common.id, sig,
+ force_flush_buffers, ERTS_SIG_Q_OP_FLUSH))
+ ERTS_INTERNAL_ERROR("Failed to send flush signal to ourselves");
+ enqueue_mq = 0;
+ fetch_sigs = !0;
+ break;
+ case ERTS_PROC_SIG_FLUSH_FLG_CLEAN_SIGQ:
+ enqueue_mq = !0;
+ fetch_sigs = 0;
+ break;
+ default:
+ enqueue_mq = !!(flags & ERTS_PROC_SIG_FLUSH_FLG_CLEAN_SIGQ);
+ fetch_sigs = !0;
+ break;
+ }
+
+ erts_set_gc_state(c_p, 0);
+
+ if (fetch_sigs) {
+ erts_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ);
+ erts_proc_sig_fetch(c_p);
+ erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ);
+ }
+
+ c_p->sig_qs.flags |= FS_FLUSHING_SIGS;
+
+ if (enqueue_mq) {
+ if (!c_p->sig_qs.cont) {
+ c_p->sig_qs.flags |= FS_FLUSHED_SIGS;
+ erts_free(ERTS_ALC_T_SIG_DATA, sig);
+ }
+ else {
+ if (!c_p->sig_qs.nmsigs.last) {
+ ASSERT(!c_p->sig_qs.nmsigs.next);
+ c_p->sig_qs.nmsigs.next = c_p->sig_qs.cont_last;
+ }
+ else {
+ ErtsSignal *lsig = (ErtsSignal *) *c_p->sig_qs.nmsigs.last;
+ ASSERT(c_p->sig_qs.nmsigs.next);
+ ASSERT(lsig && !lsig->common.specific.next);
+ lsig->common.specific.next = c_p->sig_qs.cont_last;
+ }
+
+ c_p->sig_qs.nmsigs.last = c_p->sig_qs.cont_last;
+ *c_p->sig_qs.cont_last = (ErtsMessage *) sig;
+ c_p->sig_qs.cont_last = &sig->common.next;
+ }
+ }
+
+}
+
static int
handle_rpc(Process *c_p, ErtsProcSigRPC *rpc, int cnt, int limit, int *yieldp)
{
@@ -5149,22 +5231,21 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep,
int *redsp, int max_reds, int local_only)
{
Eterm tag;
- erts_aint32_t state;
+ erts_aint32_t state = *statep;
int yield, cnt, limit, abs_lim, msg_tracing, save_in_msgq;
ErtsMessage *sig, ***next_nm_sig;
ErtsSigRecvTracing tracing;
ASSERT(!(c_p->sig_qs.flags & FS_WAIT_HANDLE_SIGS));
if (c_p->sig_qs.flags & FS_HANDLING_SIGS)
- wait_handle_signals(c_p);
+ state = wait_handle_signals(c_p);
else
c_p->sig_qs.flags |= FS_HANDLING_SIGS;
ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(c_p, 0);
ERTS_LC_ASSERT(ERTS_PROC_LOCK_MAIN == erts_proc_lc_my_proc_locks(c_p));
- state = erts_atomic32_read_nob(&c_p->state);
- if (!local_only) {
+ if (!local_only && !(c_p->sig_qs.flags & FS_FLUSHING_SIGS)) {
if (ERTS_PSFLG_SIG_IN_Q & state) {
erts_proc_sig_queue_lock(c_p);
erts_proc_sig_fetch(c_p);
@@ -5694,6 +5775,20 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep,
ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);
break;
+ case ERTS_SIG_Q_OP_FLUSH:
+ ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);
+ ASSERT(c_p->sig_qs.flags & FS_FLUSHING_SIGS);
+ c_p->sig_qs.flags |= FS_FLUSHED_SIGS;
+ remove_nm_sig(c_p, sig, next_nm_sig);
+ erts_free(ERTS_ALC_T_SIG_DATA, sig);
+ ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);
+ /*
+ * The caller has been exclusively handling signals until this
+ * point. Break out and let the process continue with other
+ * things as well...
+ */
+ goto stop;
+
case ERTS_SIG_Q_OP_TRACE_CHANGE_STATE: {
Uint16 type = ERTS_PROC_SIG_TYPE(tag);
@@ -5810,7 +5905,7 @@ stop: {
*next_nm_sig = &c_p->sig_qs.cont;
if (c_p->sig_qs.nmsigs.last == tracing.messages.next)
c_p->sig_qs.nmsigs.last = &c_p->sig_qs.cont;
- *statep = erts_atomic32_read_nob(&c_p->state);
+ state = erts_atomic32_read_nob(&c_p->state);
}
else {
ASSERT(!c_p->sig_qs.nmsigs.next);
@@ -5818,7 +5913,6 @@ stop: {
state = erts_atomic32_read_band_nob(&c_p->state,
~ERTS_PSFLG_SIG_Q);
state &= ~ERTS_PSFLG_SIG_Q;
- *statep = state;
}
if (tracing.messages.next != &c_p->sig_qs.cont) {
@@ -5864,7 +5958,7 @@ stop: {
ASSERT(c_p->sig_qs.cont);
- *statep = erts_atomic32_read_nob(&c_p->state);
+ state = erts_atomic32_read_nob(&c_p->state);
res = 0;
}
@@ -5897,10 +5991,21 @@ stop: {
state = erts_atomic32_read_band_nob(&c_p->state,
~ERTS_PSFLG_SIG_Q);
state &= ~ERTS_PSFLG_SIG_Q;
- *statep = state;
res = !0;
}
+ if (!!(state & ERTS_PSFLG_MAYBE_SELF_SIGS)
+ & !(state & (ERTS_PSFLG_SIG_Q|ERTS_PSFLG_SIG_IN_Q))) {
+ /*
+ * We know we do not have any outstanding signals
+ * from ourselves...
+ */
+ (void) erts_atomic32_read_band_nob(&c_p->state,
+ ~ERTS_PSFLG_MAYBE_SELF_SIGS);
+ state &= ~ERTS_PSFLG_MAYBE_SELF_SIGS;
+ }
+ *statep = state;
+
/* Ensure that 'save' doesn't point to a receive marker... */
if (*c_p->sig_qs.save
&& ERTS_SIG_IS_RECV_MARKER(*c_p->sig_qs.save)) {
@@ -6183,6 +6288,12 @@ erts_proc_sig_handle_exit(Process *c_p, Sint *redsp,
}
break;
+ case ERTS_SIG_Q_OP_FLUSH:
+ ASSERT(c_p->sig_qs.flags & FS_FLUSHING_SIGS);
+ c_p->sig_qs.flags |= FS_FLUSHED_SIGS;
+ erts_free(ERTS_ALC_T_SIG_DATA, sig);
+ break;
+
case ERTS_SIG_Q_OP_TRACE_CHANGE_STATE:
destroy_trace_info((ErtsSigTraceInfo *) sig);
break;
@@ -6280,6 +6391,7 @@ clear_seq_trace_token(ErtsMessage *sig)
case ERTS_SIG_Q_OP_RPC:
case ERTS_SIG_Q_OP_RECV_MARK:
case ERTS_SIG_Q_OP_ADJ_MSGQ:
+ case ERTS_SIG_Q_OP_FLUSH:
break;
default:
@@ -6292,8 +6404,33 @@ clear_seq_trace_token(ErtsMessage *sig)
void
erts_proc_sig_clear_seq_trace_tokens(Process *c_p)
{
- erts_proc_sig_fetch(c_p);
- ERTS_FOREACH_SIG_PRIVQS(c_p, sig, clear_seq_trace_token(sig));
+ int ix;
+ ErtsSignalInQueueBufferArray *bap;
+ int unget_info;
+ ErtsMessage *qs[] = {c_p->sig_qs.first,
+ c_p->sig_qs.cont,
+ c_p->sig_inq.first};
+
+ ERTS_LC_ASSERT(erts_thr_progress_is_blocking());
+
+ for (ix = 0; ix < sizeof(qs)/sizeof(qs[0]); ix++) {
+ ErtsMessage *sigp;
+ for (sigp = qs[ix]; sigp; sigp = sigp->next)
+ clear_seq_trace_token(sigp);
+ }
+
+ bap = erts_proc_sig_queue_get_buffers(c_p, &unget_info);
+ if (bap) {
+ for (ix = 0; ix < ERTS_PROC_SIG_INQ_BUFFERED_NR_OF_BUFFERS; ix++) {
+ ErtsSignalInQueueBuffer* bp = &bap->slots[ix];
+ if (bp->b.alive) {
+ ErtsMessage *sigp;
+ for (sigp = bp->b.queue.first; sigp; sigp = sigp->next)
+ clear_seq_trace_token(sigp);
+ }
+ }
+ erts_proc_sig_queue_unget_buffers(bap, unget_info);
+ }
}
Uint
@@ -6419,6 +6556,10 @@ erts_proc_sig_signal_size(ErtsSignal *sig)
break;
}
+ case ERTS_SIG_Q_OP_FLUSH:
+ size = sizeof(ErtsSignalCommon);
+ break;
+
case ERTS_SIG_Q_OP_TRACE_CHANGE_STATE:
size = sizeof(ErtsSigTraceInfo);
break;
@@ -6537,6 +6678,7 @@ erts_proc_sig_receive_helper(Process *c_p,
if (max_reds < reds)
max_reds = reds;
#endif
+ state = erts_atomic32_read_nob(&c_p->state);
(void) erts_proc_sig_handle_incoming(c_p, &state, &reds,
max_reds, !0);
consumed_reds += reds;
@@ -7246,7 +7388,8 @@ erts_proc_sig_prep_msgq_for_inspection(Process *c_p,
Process *rp,
ErtsProcLocks rp_locks,
int info_on_self,
- ErtsMessageInfo *mip)
+ ErtsMessageInfo *mip,
+ Sint *msgq_len_p)
{
Uint tot_heap_size;
ErtsMessage *mp, **mpp;
@@ -7320,7 +7463,11 @@ erts_proc_sig_prep_msgq_for_inspection(Process *c_p,
mp = mp->next;
}
- ASSERT(c_p->sig_qs.len == i);
+
+ ASSERT(info_on_self || c_p->sig_qs.len == i);
+ ASSERT(!info_on_self || c_p->sig_qs.len >= i);
+
+ *msgq_len_p = i;
return tot_heap_size;
}
@@ -7409,7 +7556,6 @@ erts_internal_dirty_process_handle_signals_1(BIF_ALIST_1)
BIF_RET(am_normal); /* will handle signals itself... */
}
else {
- erts_aint32_t state;
int done;
Eterm res = am_false;
int reds = 0;
@@ -7477,7 +7623,7 @@ erts_proc_sig_cleanup_queues(Process *c_p)
/* Debug */
-static void
+static erts_aint32_t
wait_handle_signals(Process *c_p)
{
/*
@@ -7527,6 +7673,8 @@ wait_handle_signals(Process *c_p)
c_p->sig_qs.flags &= ~FS_WAIT_HANDLE_SIGS;
c_p->sig_qs.flags |= FS_HANDLING_SIGS;
+
+ return erts_atomic32_read_mb(&c_p->state);
}
static void
@@ -7708,6 +7856,7 @@ erts_proc_sig_debug_foreach_sig(Process *c_p,
case ERTS_SIG_Q_OP_PROCESS_INFO:
case ERTS_SIG_Q_OP_RECV_MARK:
case ERTS_SIG_Q_OP_MSGQ_LEN_OFFS_MARK:
+ case ERTS_SIG_Q_OP_FLUSH:
break;
default: