diff options
author | Rickard Green <rickard@erlang.org> | 2021-05-11 19:17:32 +0200 |
---|---|---|
committer | Rickard Green <rickard@erlang.org> | 2022-07-13 19:09:44 +0200 |
commit | ac579bbec058b313af19de4a6d536c6ab446bfed (patch) | |
tree | 406f890da197933180a7c6f860686aa505e75b8d /erts/emulator/beam/erl_proc_sig_queue.c | |
parent | e053340291b303064023707aff03f297cf219552 (diff) | |
download | erlang-ac579bbec058b313af19de4a6d536c6ab446bfed.tar.gz |
[erts] Improve flushing of signals
Diffstat (limited to 'erts/emulator/beam/erl_proc_sig_queue.c')
-rw-r--r-- | erts/emulator/beam/erl_proc_sig_queue.c | 181 |
1 files changed, 165 insertions, 16 deletions
diff --git a/erts/emulator/beam/erl_proc_sig_queue.c b/erts/emulator/beam/erl_proc_sig_queue.c index 7cb5f9e491..bfddf9e48f 100644 --- a/erts/emulator/beam/erl_proc_sig_queue.c +++ b/erts/emulator/beam/erl_proc_sig_queue.c @@ -220,7 +220,7 @@ typedef struct { Eterm request_id; } ErtsCLAData; -static void wait_handle_signals(Process *c_p); +static erts_aint32_t wait_handle_signals(Process *c_p); static void wake_handle_signals(Process *proc); static int handle_msg_tracing(Process *c_p, @@ -729,7 +729,7 @@ proc_queue_signal(ErtsPTabElementCommon *sender, Eterm from, Eterm pid, /* Tracing requires sender for local procs and ports. The assertions below * will not catch errors after time-of-death, but ought to find most * problems. */ - ASSERT(sender != NULL || + ASSERT(sender != NULL || op == ERTS_SIG_Q_OP_FLUSH || (is_normal_sched && esdp->pending_signal.sig == sig) || (!(is_internal_pid(from) && erts_proc_lookup(from) != NULL) && @@ -824,6 +824,11 @@ proc_queue_signal(ErtsPTabElementCommon *sender, Eterm from, Eterm pid, sigp = &first; first_last_done: + + if ((void *) sender == (void *) rp) + (void) erts_atomic32_read_bor_nob(&((Process *) sender)->state, + ERTS_PSFLG_MAYBE_SELF_SIGS); + sig->common.specific.next = NULL; /* may add signals before sig */ @@ -2751,6 +2756,83 @@ erts_proc_sig_send_move_msgq_off_heap(Eterm to) } } +void +erts_proc_sig_init_flush_signals(Process *c_p, int flags, Eterm id) +{ + int force_flush_buffers = 0, enqueue_mq, fetch_sigs; + ErtsSignal *sig; + + ERTS_LC_ASSERT(ERTS_PROC_LOCK_MAIN == erts_proc_lc_my_proc_locks(c_p)); + + ASSERT(!(c_p->sig_qs.flags & (FS_FLUSHING_SIGS|FS_FLUSHED_SIGS))); + ASSERT(flags); + ASSERT((flags & ~ERTS_PROC_SIG_FLUSH_FLGS) == 0); + ASSERT(!(flags & ERTS_PROC_SIG_FLUSH_FLG_FROM_ID) + || is_internal_pid(id) || is_internal_port(id)); + + sig = erts_alloc(ERTS_ALC_T_SIG_DATA, sizeof(ErtsSignalCommon)); + sig->common.next = NULL; + sig->common.specific.attachment = NULL; + sig->common.tag = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_FLUSH, + ERTS_SIG_Q_TYPE_UNDEFINED, + 0); + switch (flags) { + case ERTS_PROC_SIG_FLUSH_FLG_FROM_ALL: + id = c_p->common.id; + force_flush_buffers = !0; + /* Fall through... */ + case ERTS_PROC_SIG_FLUSH_FLG_FROM_ID: + if (!proc_queue_signal(NULL, id, c_p->common.id, sig, + force_flush_buffers, ERTS_SIG_Q_OP_FLUSH)) + ERTS_INTERNAL_ERROR("Failed to send flush signal to ourselves"); + enqueue_mq = 0; + fetch_sigs = !0; + break; + case ERTS_PROC_SIG_FLUSH_FLG_CLEAN_SIGQ: + enqueue_mq = !0; + fetch_sigs = 0; + break; + default: + enqueue_mq = !!(flags & ERTS_PROC_SIG_FLUSH_FLG_CLEAN_SIGQ); + fetch_sigs = !0; + break; + } + + erts_set_gc_state(c_p, 0); + + if (fetch_sigs) { + erts_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ); + erts_proc_sig_fetch(c_p); + erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ); + } + + c_p->sig_qs.flags |= FS_FLUSHING_SIGS; + + if (enqueue_mq) { + if (!c_p->sig_qs.cont) { + c_p->sig_qs.flags |= FS_FLUSHED_SIGS; + erts_free(ERTS_ALC_T_SIG_DATA, sig); + } + else { + if (!c_p->sig_qs.nmsigs.last) { + ASSERT(!c_p->sig_qs.nmsigs.next); + c_p->sig_qs.nmsigs.next = c_p->sig_qs.cont_last; + } + else { + ErtsSignal *lsig = (ErtsSignal *) *c_p->sig_qs.nmsigs.last; + ASSERT(c_p->sig_qs.nmsigs.next); + ASSERT(lsig && !lsig->common.specific.next); + lsig->common.specific.next = c_p->sig_qs.cont_last; + } + + c_p->sig_qs.nmsigs.last = c_p->sig_qs.cont_last; + *c_p->sig_qs.cont_last = (ErtsMessage *) sig; + c_p->sig_qs.cont_last = &sig->common.next; + } + } + +} + static int handle_rpc(Process *c_p, ErtsProcSigRPC *rpc, int cnt, int limit, int *yieldp) { @@ -5149,22 +5231,21 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep, int *redsp, int max_reds, int local_only) { Eterm tag; - erts_aint32_t state; + erts_aint32_t state = *statep; int yield, cnt, limit, abs_lim, msg_tracing, save_in_msgq; ErtsMessage *sig, ***next_nm_sig; ErtsSigRecvTracing tracing; ASSERT(!(c_p->sig_qs.flags & FS_WAIT_HANDLE_SIGS)); if (c_p->sig_qs.flags & FS_HANDLING_SIGS) - wait_handle_signals(c_p); + state = wait_handle_signals(c_p); else c_p->sig_qs.flags |= FS_HANDLING_SIGS; ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(c_p, 0); ERTS_LC_ASSERT(ERTS_PROC_LOCK_MAIN == erts_proc_lc_my_proc_locks(c_p)); - state = erts_atomic32_read_nob(&c_p->state); - if (!local_only) { + if (!local_only && !(c_p->sig_qs.flags & FS_FLUSHING_SIGS)) { if (ERTS_PSFLG_SIG_IN_Q & state) { erts_proc_sig_queue_lock(c_p); erts_proc_sig_fetch(c_p); @@ -5694,6 +5775,20 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep, ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig); break; + case ERTS_SIG_Q_OP_FLUSH: + ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig); + ASSERT(c_p->sig_qs.flags & FS_FLUSHING_SIGS); + c_p->sig_qs.flags |= FS_FLUSHED_SIGS; + remove_nm_sig(c_p, sig, next_nm_sig); + erts_free(ERTS_ALC_T_SIG_DATA, sig); + ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig); + /* + * The caller has been exclusively handling signals until this + * point. Break out and let the process continue with other + * things as well... + */ + goto stop; + case ERTS_SIG_Q_OP_TRACE_CHANGE_STATE: { Uint16 type = ERTS_PROC_SIG_TYPE(tag); @@ -5810,7 +5905,7 @@ stop: { *next_nm_sig = &c_p->sig_qs.cont; if (c_p->sig_qs.nmsigs.last == tracing.messages.next) c_p->sig_qs.nmsigs.last = &c_p->sig_qs.cont; - *statep = erts_atomic32_read_nob(&c_p->state); + state = erts_atomic32_read_nob(&c_p->state); } else { ASSERT(!c_p->sig_qs.nmsigs.next); @@ -5818,7 +5913,6 @@ stop: { state = erts_atomic32_read_band_nob(&c_p->state, ~ERTS_PSFLG_SIG_Q); state &= ~ERTS_PSFLG_SIG_Q; - *statep = state; } if (tracing.messages.next != &c_p->sig_qs.cont) { @@ -5864,7 +5958,7 @@ stop: { ASSERT(c_p->sig_qs.cont); - *statep = erts_atomic32_read_nob(&c_p->state); + state = erts_atomic32_read_nob(&c_p->state); res = 0; } @@ -5897,10 +5991,21 @@ stop: { state = erts_atomic32_read_band_nob(&c_p->state, ~ERTS_PSFLG_SIG_Q); state &= ~ERTS_PSFLG_SIG_Q; - *statep = state; res = !0; } + if (!!(state & ERTS_PSFLG_MAYBE_SELF_SIGS) + & !(state & (ERTS_PSFLG_SIG_Q|ERTS_PSFLG_SIG_IN_Q))) { + /* + * We know we do not have any outstanding signals + * from ourselves... + */ + (void) erts_atomic32_read_band_nob(&c_p->state, + ~ERTS_PSFLG_MAYBE_SELF_SIGS); + state &= ~ERTS_PSFLG_MAYBE_SELF_SIGS; + } + *statep = state; + /* Ensure that 'save' doesn't point to a receive marker... */ if (*c_p->sig_qs.save && ERTS_SIG_IS_RECV_MARKER(*c_p->sig_qs.save)) { @@ -6183,6 +6288,12 @@ erts_proc_sig_handle_exit(Process *c_p, Sint *redsp, } break; + case ERTS_SIG_Q_OP_FLUSH: + ASSERT(c_p->sig_qs.flags & FS_FLUSHING_SIGS); + c_p->sig_qs.flags |= FS_FLUSHED_SIGS; + erts_free(ERTS_ALC_T_SIG_DATA, sig); + break; + case ERTS_SIG_Q_OP_TRACE_CHANGE_STATE: destroy_trace_info((ErtsSigTraceInfo *) sig); break; @@ -6280,6 +6391,7 @@ clear_seq_trace_token(ErtsMessage *sig) case ERTS_SIG_Q_OP_RPC: case ERTS_SIG_Q_OP_RECV_MARK: case ERTS_SIG_Q_OP_ADJ_MSGQ: + case ERTS_SIG_Q_OP_FLUSH: break; default: @@ -6292,8 +6404,33 @@ clear_seq_trace_token(ErtsMessage *sig) void erts_proc_sig_clear_seq_trace_tokens(Process *c_p) { - erts_proc_sig_fetch(c_p); - ERTS_FOREACH_SIG_PRIVQS(c_p, sig, clear_seq_trace_token(sig)); + int ix; + ErtsSignalInQueueBufferArray *bap; + int unget_info; + ErtsMessage *qs[] = {c_p->sig_qs.first, + c_p->sig_qs.cont, + c_p->sig_inq.first}; + + ERTS_LC_ASSERT(erts_thr_progress_is_blocking()); + + for (ix = 0; ix < sizeof(qs)/sizeof(qs[0]); ix++) { + ErtsMessage *sigp; + for (sigp = qs[ix]; sigp; sigp = sigp->next) + clear_seq_trace_token(sigp); + } + + bap = erts_proc_sig_queue_get_buffers(c_p, &unget_info); + if (bap) { + for (ix = 0; ix < ERTS_PROC_SIG_INQ_BUFFERED_NR_OF_BUFFERS; ix++) { + ErtsSignalInQueueBuffer* bp = &bap->slots[ix]; + if (bp->b.alive) { + ErtsMessage *sigp; + for (sigp = bp->b.queue.first; sigp; sigp = sigp->next) + clear_seq_trace_token(sigp); + } + } + erts_proc_sig_queue_unget_buffers(bap, unget_info); + } } Uint @@ -6419,6 +6556,10 @@ erts_proc_sig_signal_size(ErtsSignal *sig) break; } + case ERTS_SIG_Q_OP_FLUSH: + size = sizeof(ErtsSignalCommon); + break; + case ERTS_SIG_Q_OP_TRACE_CHANGE_STATE: size = sizeof(ErtsSigTraceInfo); break; @@ -6537,6 +6678,7 @@ erts_proc_sig_receive_helper(Process *c_p, if (max_reds < reds) max_reds = reds; #endif + state = erts_atomic32_read_nob(&c_p->state); (void) erts_proc_sig_handle_incoming(c_p, &state, &reds, max_reds, !0); consumed_reds += reds; @@ -7246,7 +7388,8 @@ erts_proc_sig_prep_msgq_for_inspection(Process *c_p, Process *rp, ErtsProcLocks rp_locks, int info_on_self, - ErtsMessageInfo *mip) + ErtsMessageInfo *mip, + Sint *msgq_len_p) { Uint tot_heap_size; ErtsMessage *mp, **mpp; @@ -7320,7 +7463,11 @@ erts_proc_sig_prep_msgq_for_inspection(Process *c_p, mp = mp->next; } - ASSERT(c_p->sig_qs.len == i); + + ASSERT(info_on_self || c_p->sig_qs.len == i); + ASSERT(!info_on_self || c_p->sig_qs.len >= i); + + *msgq_len_p = i; return tot_heap_size; } @@ -7409,7 +7556,6 @@ erts_internal_dirty_process_handle_signals_1(BIF_ALIST_1) BIF_RET(am_normal); /* will handle signals itself... */ } else { - erts_aint32_t state; int done; Eterm res = am_false; int reds = 0; @@ -7477,7 +7623,7 @@ erts_proc_sig_cleanup_queues(Process *c_p) /* Debug */ -static void +static erts_aint32_t wait_handle_signals(Process *c_p) { /* @@ -7527,6 +7673,8 @@ wait_handle_signals(Process *c_p) c_p->sig_qs.flags &= ~FS_WAIT_HANDLE_SIGS; c_p->sig_qs.flags |= FS_HANDLING_SIGS; + + return erts_atomic32_read_mb(&c_p->state); } static void @@ -7708,6 +7856,7 @@ erts_proc_sig_debug_foreach_sig(Process *c_p, case ERTS_SIG_Q_OP_PROCESS_INFO: case ERTS_SIG_Q_OP_RECV_MARK: case ERTS_SIG_Q_OP_MSGQ_LEN_OFFS_MARK: + case ERTS_SIG_Q_OP_FLUSH: break; default: |