diff options
Diffstat (limited to 'erts/emulator/beam/erl_proc_sig_queue.h')
-rw-r--r-- | erts/emulator/beam/erl_proc_sig_queue.h | 385 |
1 files changed, 298 insertions, 87 deletions
diff --git a/erts/emulator/beam/erl_proc_sig_queue.h b/erts/emulator/beam/erl_proc_sig_queue.h index 66ee35d1f7..60c7af5e66 100644 --- a/erts/emulator/beam/erl_proc_sig_queue.h +++ b/erts/emulator/beam/erl_proc_sig_queue.h @@ -1,7 +1,7 @@ /* * %CopyrightBegin% * - * Copyright Ericsson AB 2018-2021. All Rights Reserved. + * Copyright Ericsson AB 2018-2023. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -91,6 +91,9 @@ #if 0 # define ERTS_PROC_SIG_HARD_DEBUG_RECV_MARKER #endif +#if 0 +# define ERTS_PROC_SIG_HARD_DEBUG_SIGQ_BUFFERS +#endif struct erl_mesg; struct erl_dist_external; @@ -107,7 +110,7 @@ typedef struct { * Note that not all signal are handled using this functionality! */ -#define ERTS_SIG_Q_OP_MAX 18 +#define ERTS_SIG_Q_OP_MAX 19 #define ERTS_SIG_Q_OP_EXIT 0 /* Exit signal due to bif call */ #define ERTS_SIG_Q_OP_EXIT_LINKED 1 /* Exit signal due to link break*/ @@ -127,7 +130,8 @@ typedef struct { #define ERTS_SIG_Q_OP_ALIAS_MSG 15 #define ERTS_SIG_Q_OP_RECV_MARK 16 #define ERTS_SIG_Q_OP_UNLINK_ACK 17 -#define ERTS_SIG_Q_OP_ADJ_MSGQ ERTS_SIG_Q_OP_MAX +#define ERTS_SIG_Q_OP_ADJ_MSGQ 18 +#define ERTS_SIG_Q_OP_FLUSH ERTS_SIG_Q_OP_MAX #define ERTS_SIG_Q_TYPE_MAX (ERTS_MON_LNK_TYPE_MAX + 10) @@ -238,6 +242,27 @@ void erl_proc_sig_hdbg_chk_recv_marker_block(struct process *c_p); #include "erl_process.h" #include "erl_bif_unique.h" + +void erts_proc_sig_queue_maybe_install_buffers(Process* p, erts_aint32_t state); +void erts_proc_sig_queue_flush_and_deinstall_buffers(Process* proc); +void erts_proc_sig_queue_flush_buffers(Process* proc); +ErtsSignalInQueueBufferArray* +erts_proc_sig_queue_flush_get_buffers(Process* proc, int *need_unget_buffers); +void erts_proc_sig_queue_lock(Process* proc); +ErtsSignalInQueueBufferArray* +erts_proc_sig_queue_get_buffers(Process* p, int *need_unread); +void erts_proc_sig_queue_unget_buffers(ErtsSignalInQueueBufferArray* buffers, + int need_unget); +int erts_proc_sig_queue_try_enqueue_to_buffer(Eterm from, + Process* receiver, + ErtsProcLocks receiver_locks, + ErtsMessage* first, + ErtsMessage** last, + ErtsMessage** last_next, + Uint len, + int is_signal); +int erts_proc_sig_queue_force_buffers(Process*); + #define ERTS_SIG_Q_OP_BITS 8 #define ERTS_SIG_Q_OP_SHIFT 0 #define ERTS_SIG_Q_OP_MASK ((1 << ERTS_SIG_Q_OP_BITS) - 1) @@ -304,8 +329,10 @@ struct dist_entry_; * @brief Send an exit signal to a process. * * - * @param[in] c_p Pointer to process struct of - * currently executing process. + * @param[in] sender Pointer to the sending process/port, + * if any, as it may not be possible to + * resolve the sender (e.g. after it's + * dead). * * @param[in] from Identifier of sender. * @@ -322,7 +349,7 @@ struct dist_entry_; * */ void -erts_proc_sig_send_exit(Process *c_p, Eterm from, Eterm to, +erts_proc_sig_send_exit(ErtsPTabElementCommon *sender, Eterm from, Eterm to, Eterm reason, Eterm token, int normal_kills); /** @@ -359,11 +386,26 @@ erts_proc_sig_send_dist_exit(DistEntry *dep, /** * + * @brief Send an exit signal due to a link to a process being + * broken by connection loss. + * + * @param[in] lnk Pointer to link structure + * from the sending side. It + * should contain information + * about receiver. + */ +void +erts_proc_sig_send_link_exit_noconnection(ErtsLink *lnk); + +/** + * * @brief Send an exit signal due to broken link to a process. * * - * @param[in] c_p Pointer to process struct of - * currently executing process. + * @param[in] sender Pointer to the sending process/port, + * if any, as it may not be possible to + * resolve the sender (e.g. after it's + * dead). * * @param[in] from Identifier of sender. * @@ -378,16 +420,18 @@ erts_proc_sig_send_dist_exit(DistEntry *dep, * */ void -erts_proc_sig_send_link_exit(Process *c_p, Eterm from, ErtsLink *lnk, - Eterm reason, Eterm token); +erts_proc_sig_send_link_exit(ErtsPTabElementCommon *sender, Eterm from, + ErtsLink *lnk, Eterm reason, Eterm token); /** * * @brief Send an link signal to a process. * * - * @param[in] c_p Pointer to process struct of - * currently executing process. + * @param[in] sender Pointer to the sending process/port, + * if any, as it may not be possible to + * resolve the sender (e.g. after it's + * dead). * * @param[in] to Identifier of receiver. * @@ -405,7 +449,8 @@ erts_proc_sig_send_link_exit(Process *c_p, Eterm from, ErtsLink *lnk, * */ int -erts_proc_sig_send_link(Process *c_p, Eterm to, ErtsLink *lnk); +erts_proc_sig_send_link(ErtsPTabElementCommon *sender, Eterm from, + Eterm to, ErtsLink *lnk); /** * @@ -414,15 +459,15 @@ erts_proc_sig_send_link(Process *c_p, Eterm to, ErtsLink *lnk); * The newly created unlink identifier is to be used in an * unlink operation. * - * @param[in] c_p Pointer to process struct of - * currently executing process. + * @param[in] sender Pointer to the sending process/port. * * @return A new 64-bit unlink identifier * unique in context of the * calling process. The identifier * may be any value but zero. */ -ERTS_GLB_INLINE Uint64 erts_proc_sig_new_unlink_id(Process *c_p); +ERTS_GLB_INLINE +Uint64 erts_proc_sig_new_unlink_id(ErtsPTabElementCommon *sender); /** * @@ -431,13 +476,10 @@ ERTS_GLB_INLINE Uint64 erts_proc_sig_new_unlink_id(Process *c_p); * The structure will contain a newly created unlink * identifier to be used in the operation. * - * @param[in] c_p Pointer to process struct of - * currently executing process - * ('from' is a process - * identifier), or NULL if not - * called in the context of an - * executing process ('from' is - * a port identifier). + * @param[in] sender Pointer to the sending process/port, + * if any, as it may not be possible to + * resolve the sender (e.g. after it's + * dead). * * @param[in] from Id (as an erlang term) of * entity sending the unlink @@ -447,7 +489,7 @@ ERTS_GLB_INLINE Uint64 erts_proc_sig_new_unlink_id(Process *c_p); * structure. */ ErtsSigUnlinkOp * -erts_proc_sig_make_unlink_op(Process *c_p, Eterm from); +erts_proc_sig_make_unlink_op(ErtsPTabElementCommon *sender, Eterm from); /** * @@ -464,8 +506,10 @@ erts_proc_sig_destroy_unlink_op(ErtsSigUnlinkOp *sulnk); * @brief Send an unlink signal to a process. * * - * @param[in] c_p Pointer to process struct of - * currently executing process. + * @param[in] sender Pointer to the sending process/port, + * if any, as it may not be possible to + * resolve the sender (e.g. after it's + * dead). * * @param[in] from Id (as an erlang term) of * entity sending the unlink @@ -477,15 +521,18 @@ erts_proc_sig_destroy_unlink_op(ErtsSigUnlinkOp *sulnk); * receiver. */ Uint64 -erts_proc_sig_send_unlink(Process *c_p, Eterm from, ErtsLink *lnk); +erts_proc_sig_send_unlink(ErtsPTabElementCommon *sender, Eterm from, + ErtsLink *lnk); /** * * @brief Send an unlink acknowledgment signal to a process. * - * - * @param[in] c_p Pointer to process struct of - * currently executing process. + * + * @param[in] sender Pointer to the sending process/port, + * if any, as it may not be possible to + * resolve the sender (e.g. after it's + * dead). * * @param[in] from Id (as an erlang term) of * entity sending the unlink @@ -498,7 +545,7 @@ erts_proc_sig_send_unlink(Process *c_p, Eterm from, ErtsLink *lnk); * signal. */ void -erts_proc_sig_send_unlink_ack(Process *c_p, Eterm from, +erts_proc_sig_send_unlink_ack(ErtsPTabElementCommon *sender, Eterm from, ErtsSigUnlinkOp *sulnk); /** @@ -560,11 +607,6 @@ erts_proc_sig_send_dist_unlink(DistEntry *dep, Uint32 conn_id, * This function is used instead of erts_proc_sig_send_unlink_ack() * when the signal arrives via the distribution. * - * @param[in] c_p Pointer to process struct of - * currently executing process or - * NULL if not called in the context - * of an executing process. - * * @param[in] dep Distribution entry of channel * that the signal arrived on. * @@ -575,7 +617,7 @@ erts_proc_sig_send_dist_unlink(DistEntry *dep, Uint32 conn_id, * @param[in] id Identifier of unlink operation. */ void -erts_proc_sig_send_dist_unlink_ack(Process *c_p, DistEntry *dep, +erts_proc_sig_send_dist_unlink_ack(DistEntry *dep, Uint32 conn_id, Eterm from, Eterm to, Uint64 id); @@ -583,6 +625,14 @@ erts_proc_sig_send_dist_unlink_ack(Process *c_p, DistEntry *dep, * * @brief Send a monitor down signal to a process. * + * @param[in] sender Pointer to the sending process/port, + * if any, as it may not be possible to + * resolve the sender (e.g. after it's + * dead). + * + * @param[in] from Sending entity, must be provided + * to maintain signal order. + * * @param[in] mon Pointer to target monitor * structure from the sending * side. It should contain @@ -592,27 +642,48 @@ erts_proc_sig_send_dist_unlink_ack(Process *c_p, DistEntry *dep, * */ void -erts_proc_sig_send_monitor_down(ErtsMonitor *mon, Eterm reason); +erts_proc_sig_send_monitor_down(ErtsPTabElementCommon *sender, Eterm from, + ErtsMonitor *mon, Eterm reason); /** * * @brief Send a demonitor signal to a process. * - * @param[in] mon Pointer to origin monitor - * structure from the sending - * side. It should contain - * information about receiver. + * @param[in] sender Pointer to the sending process/port, + * if any, as it may not be possible to + * resolve the sender (e.g. after it's + * dead). * - * @param[in] reason Exit reason. + * @param[in] from Sending entity, must be provided + * to maintain signal order. + * + * @param[in] system Whether the sender is considered a + * system service, e.g. a NIF monitor, + * and it's okay to order by `from` + * even when it's not a pid or port. + * + * @param[in] mon Pointer to origin monitor + * structure from the sending + * side. It should contain + * information about receiver. * */ void -erts_proc_sig_send_demonitor(ErtsMonitor *mon); +erts_proc_sig_send_demonitor(ErtsPTabElementCommon *sender, Eterm from, + int system, ErtsMonitor *mon); /** * * @brief Send a monitor signal to a process. * + * @param[in] sender Pointer to the sending process/port, + * if any, as it may not be possible to + * resolve the sender (e.g. after it's + * dead). + * + * @param[in] from Sending entity, must be provided + * to maintain signal order. + * * @param[in] mon Pointer to target monitor * structure to insert on * receiver side. @@ -630,7 +701,8 @@ erts_proc_sig_send_demonitor(ErtsMonitor *mon); * */ int -erts_proc_sig_send_monitor(ErtsMonitor *mon, Eterm to); +erts_proc_sig_send_monitor(ErtsPTabElementCommon *sender, Eterm from, + ErtsMonitor *mon, Eterm to); /** * @@ -673,27 +745,22 @@ erts_proc_sig_send_dist_monitor_down(DistEntry *dep, Eterm ref, * when the signal arrives via the distribution and * no monitor structure is available. * + * @param[in] from Identifier of sender. + * * @param[in] to Identifier of receiver. * * @param[in] ref Reference identifying the monitor. * */ void -erts_proc_sig_send_dist_demonitor(Eterm to, Eterm ref); +erts_proc_sig_send_dist_demonitor(Eterm from, Eterm to, Eterm ref); /** * - * @brief Send a persistent monitor triggered signal to a process. - * - * Used by monitors that are not auto disabled such as for - * example 'time_offset' monitors. - * - * @param[in] type Monitor type. + * @brief Send a persistent "node down" monitor signal to a process * * @param[in] key Monitor key. * - * @param[in] from Identifier of sender. - * * @param[in] to Identifier of receiver. * * @param[in] msg Message template. @@ -702,27 +769,25 @@ erts_proc_sig_send_dist_demonitor(Eterm to, Eterm ref); * */ void -erts_proc_sig_send_persistent_monitor_msg(Uint16 type, Eterm key, - Eterm from, Eterm to, - Eterm msg, Uint msg_sz); +erts_proc_sig_send_monitor_nodes_msg(Eterm key, Eterm to, + Eterm msg, Uint msg_sz); /** * - * @brief Send a trace change signal to a process. + * @brief Send a persistent "time offset changed" monitor signal to a process * - * @param[in] to Identifier of receiver. + * @param[in] key Monitor key. * - * @param[in] on Trace flags to enable. + * @param[in] to Identifier of receiver. * - * @param[in] off Trace flags to disable. + * @param[in] msg Message template. * - * @param[in] tracer Tracer to set. If the non-value, - * tracer will not be changed. + * @param[in] msg_sz Heap size of message template. * */ void -erts_proc_sig_send_trace_change(Eterm to, Uint on, Uint off, - Eterm tracer); +erts_proc_sig_send_monitor_time_offset_msg(Eterm key, Eterm to, + Eterm msg, Uint msg_sz); /** * @@ -799,7 +864,7 @@ erts_proc_sig_send_is_alive_request(Process *c_p, Eterm to, * @param[in] item_ix Info index array to pass to * erts_process_info() * - * @param[in] len Lenght of info index array + * @param[in] len Length of info index array * * @param[in] need_msgq_len Non-zero if message queue * length is needed; otherwise, @@ -1043,14 +1108,11 @@ erts_proc_sig_send_cla_request(Process *c_p, Eterm to, Eterm req_id); * * When received, all on heap messages will be moved off heap. * - * @param[in] c_p Pointer to process struct of - * currently executing process. - * * @param[in] to Identifier of receiver. * */ void -erts_proc_sig_send_move_msgq_off_heap(Process *c_p, Eterm to); +erts_proc_sig_send_move_msgq_off_heap(Eterm to); /* * End of send operations of currently supported process signals. @@ -1072,6 +1134,9 @@ erts_proc_sig_send_move_msgq_off_heap(Process *c_p, Eterm to); * * @param[out] statep Pointer to process state after * signal handling. May not be NULL. + * The state should recently have + * been updated before calling + * this function. * * @param[in,out] redsp Pointer to an integer containing * reductions. On input, the amount @@ -1125,6 +1190,8 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep, * used by the call. On output, the * amount of reductions consumed. * + * @param[in,out] pe_ctxtp Process exit context pointer. + * * @return Returns a non-zero value, when * no more signals to handle in the * middle queue remain. A zero @@ -1134,8 +1201,7 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep, */ int erts_proc_sig_handle_exit(Process *c_p, Sint *redsp, - ErtsMonitor **pend_spawn_mon_pp, - Eterm reason); + ErtsProcExitContext *pe_ctxt_p); /** * @@ -1191,6 +1257,58 @@ erts_proc_sig_receive_helper(Process *c_p, int fcalls, int neg_o_reds, ErtsMessage **msgpp, int *get_outp); +/* + * CLEAN_SIGQ - Flush until middle queue is empty, i.e. + * the content of inner+middle queue equals + * the message queue. + */ +#define ERTS_PROC_SIG_FLUSH_FLG_CLEAN_SIGQ (1 << 0) +/* + * FROM_ALL - Flush signals from all local senders (processes + * and ports). + */ +#define ERTS_PROC_SIG_FLUSH_FLG_FROM_ALL (1 << 1) +/* + * FROM_ID - Flush signals from process or port identified + * by 'id'. + */ +#define ERTS_PROC_SIG_FLUSH_FLG_FROM_ID (1 << 2) + +/* + * All erts_proc_sig_init_flush_signals() flags. + */ +#define ERTS_PROC_SIG_FLUSH_FLGS \ + (ERTS_PROC_SIG_FLUSH_FLG_CLEAN_SIGQ \ + | ERTS_PROC_SIG_FLUSH_FLG_FROM_ALL \ + | ERTS_PROC_SIG_FLUSH_FLG_FROM_ID) + +/** + * + * @brief Initialize flush of signals from another process or port + * + * Inserts a flush signal in the outer signal queue of + * current process and sets the FS_FLUSHING_SIGS flag in + * 'c_p->sig_qs.flags'. When the flush signal has been + * handled the FS_FLUSHED_SIGS flag is set as well. + * + * While the flushing is ongoing the process *should* only + * handle incoming signals and not execute Erlang code. When + * the functionality that initiated the flush detects that + * the flush is done by the FS_FLUSHED_SIGS flag being set, + * it should clear both the FS_FLUSHED_SIGS flag and the + * FS_FLUSHING_SIGS flag. + * + * @param[in] c_p Pointer to process struct of + * currently executing process. + * flags Flags indicating how to flush. + * (see above). + * from Identifier of sender to flush + * signals from in case the + * FROM_ID flag is set. + */ +void +erts_proc_sig_init_flush_signals(Process *c_p, int flags, Eterm from); + /** * * @brief Fetch signals from the outer queue @@ -1256,7 +1374,7 @@ erts_enqueue_signals(Process *rp, ErtsMessage *first, * */ void -erts_proc_sig_send_pending(ErtsSchedulerData* esdp); +erts_proc_sig_send_pending(Process *c_p, ErtsSchedulerData* esdp); void @@ -1264,7 +1382,8 @@ erts_proc_sig_send_to_alias(Process *c_p, Eterm from, Eterm to, Eterm msg, Eterm token); void -erts_proc_sig_send_dist_to_alias(Eterm alias, ErtsDistExternal *edep, +erts_proc_sig_send_dist_to_alias(Eterm from, Eterm alias, + ErtsDistExternal *edep, ErlHeapFragment *hfrag, Eterm token); /** @@ -1307,12 +1426,16 @@ typedef struct { * * @param[in] mip Pointer to array of * ErtsMessageInfo structures. + * + * @param[out] msgq_lenp Pointer to integer containing + * amount of messages. */ Uint erts_proc_sig_prep_msgq_for_inspection(Process *c_p, Process *rp, ErtsProcLocks rp_locks, int info_on_self, - ErtsMessageInfo *mip); + ErtsMessageInfo *mip, + Sint *msgq_lenp); /** * @@ -1390,6 +1513,26 @@ int erts_proc_sig_decode_dist(Process *proc, ErtsProcLocks proc_locks, ErtsMessage *msgp, int force_off_heap); +/** + * + * @brief Check if a newly scheduled process needs to wait for + * ongoing dirty handling of signals to complete. + * + * @param[in] c_p Pointer to executing process. + * + * @param[in] state_in State of process. + * + * @return Updated state of process if + * we had to wait; otherwise, + * state_in. + */ +ERTS_GLB_INLINE erts_aint32_t +erts_proc_sig_check_wait_dirty_handle_signals(Process *c_p, + erts_aint32_t state_in); + +void erts_proc_sig_do_wait_dirty_handle_signals__(Process *c_p); + + ErtsDistExternal * erts_proc_sig_get_external(ErtsMessage *msgp); @@ -1613,7 +1756,7 @@ Eterm erts_msgq_recv_marker_create_insert(Process *c_p, Eterm id); void erts_msgq_recv_marker_create_insert_set_save(Process *c_p, Eterm id); ErtsMessage **erts_msgq_pass_recv_markers(Process *c_p, ErtsMessage **markpp); -void erts_msgq_remove_leading_recv_markers(Process *c_p); +void erts_msgq_remove_leading_recv_markers_set_save_first(Process *c_p); #define ERTS_RECV_MARKER_IX__(BLKP, MRKP) \ ((int) ((MRKP) - &(BLKP)->marker[0])) @@ -1621,14 +1764,30 @@ void erts_msgq_remove_leading_recv_markers(Process *c_p); #if ERTS_GLB_INLINE_INCL_FUNC_DEF ERTS_GLB_INLINE Uint64 -erts_proc_sig_new_unlink_id(Process *c_p) +erts_proc_sig_new_unlink_id(ErtsPTabElementCommon *sender) { Uint64 id; - ASSERT(c_p); - id = (Uint64) c_p->uniq++; - if (id == 0) + ASSERT(sender); + + if (is_internal_pid(sender->id)) { + Process *c_p = ErtsContainerStruct(sender, Process, common); id = (Uint64) c_p->uniq++; + + if (id == 0) { + id = (Uint64) c_p->uniq++; + } + } else { + ASSERT(is_internal_port(sender->id)); + + id = (Uint64) erts_raw_get_unique_monotonic_integer(); + if (id == 0) { + id = (Uint64) erts_raw_get_unique_monotonic_integer(); + } + } + + ASSERT(id != 0); + return id; } @@ -1637,7 +1796,8 @@ erts_proc_sig_fetch(Process *proc) { Sint res = 0; ErtsSignal *sig; - + ErtsSignalInQueueBufferArray* buffers; + int need_unget_buffers; ERTS_LC_ASSERT(ERTS_PROC_IS_EXITING(proc) || ((erts_proc_lc_my_proc_locks(proc) & (ERTS_PROC_LOCK_MAIN @@ -1645,9 +1805,18 @@ erts_proc_sig_fetch(Process *proc) == (ERTS_PROC_LOCK_MAIN | ERTS_PROC_LOCK_MSGQ))); + ASSERT(!(proc->sig_qs.flags & FS_FLUSHING_SIGS) + || ERTS_PROC_IS_EXITING(proc) + || ERTS_IS_CRASH_DUMPING); + + ASSERT(!(proc->sig_qs.flags & FS_HANDLING_SIGS)); + ERTS_HDBG_CHECK_SIGNAL_IN_QUEUE(proc); ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(proc, !0); + buffers = erts_proc_sig_queue_flush_get_buffers(proc, + &need_unget_buffers); + sig = (ErtsSignal *) proc->sig_inq.first; if (sig) { if (ERTS_LIKELY(sig->common.tag != ERTS_PROC_SIG_MSGQ_LEN_OFFS_MARK)) @@ -1655,7 +1824,20 @@ erts_proc_sig_fetch(Process *proc) else res = erts_proc_sig_fetch_msgq_len_offs__(proc); } - + if (buffers) { + Uint32 state = erts_atomic32_read_acqb(&proc->state); + if (!(ERTS_PSFLG_SIG_IN_Q & state) && + erts_atomic64_read_nob(&buffers->nonmsg_slots)) { + /* We may have raced with a thread inserting into a buffer + * when resetting the flag ERTS_PSFLG_SIG_IN_Q in one of + * the fetch functions above so we have to make sure that + * it is set when there is a nonmsg signal in the buffers. */ + erts_atomic32_read_bor_nob(&proc->state, + ERTS_PSFLG_SIG_IN_Q | + ERTS_PSFLG_ACTIVE); + } + erts_proc_sig_queue_unget_buffers(buffers, need_unget_buffers); + } res += proc->sig_qs.len; ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(proc, !0); @@ -1805,6 +1987,7 @@ erts_msgq_recv_marker_clear(Process *c_p, Eterm id) { ErtsRecvMarkerBlock *blkp = c_p->sig_qs.recv_mrk_blk; int ix; + ASSERT(!(c_p->sig_qs.flags & FS_HANDLING_SIGS)); if (!is_small(id) && !is_big(id) && !is_internal_ref(id)) return; @@ -1832,7 +2015,8 @@ erts_msgq_recv_marker_clear(Process *c_p, Eterm id) ERTS_GLB_INLINE Eterm erts_msgq_recv_marker_insert(Process *c_p) { - erts_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ); + ASSERT(!(c_p->sig_qs.flags & FS_HANDLING_SIGS)); + erts_proc_sig_queue_lock(c_p); erts_proc_sig_fetch(c_p); erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ); @@ -1845,6 +2029,7 @@ ERTS_GLB_INLINE void erts_msgq_recv_marker_bind(Process *c_p, Eterm insert_id, Eterm bind_id) { + ASSERT(!(c_p->sig_qs.flags & FS_HANDLING_SIGS)); #ifdef ERTS_SUPPORT_OLD_RECV_MARK_INSTRS ASSERT(bind_id != erts_old_recv_marker_id); #endif @@ -1874,6 +2059,7 @@ ERTS_GLB_INLINE void erts_msgq_recv_marker_bind(Process *c_p, ERTS_GLB_INLINE void erts_msgq_recv_marker_insert_bind(Process *c_p, Eterm id) { + ASSERT(!(c_p->sig_qs.flags & FS_HANDLING_SIGS)); if (is_internal_ref(id)) { #ifdef ERTS_SUPPORT_OLD_RECV_MARK_INSTRS ErtsRecvMarkerBlock *blkp = c_p->sig_qs.recv_mrk_blk; @@ -1881,7 +2067,7 @@ erts_msgq_recv_marker_insert_bind(Process *c_p, Eterm id) ERTS_PROC_SIG_RECV_MARK_CLEAR_OLD_MARK__(blkp); #endif - erts_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ); + erts_proc_sig_queue_lock(c_p); erts_proc_sig_fetch(c_p); erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ); @@ -1893,6 +2079,7 @@ erts_msgq_recv_marker_insert_bind(Process *c_p, Eterm id) ERTS_GLB_INLINE void erts_msgq_recv_marker_set_save(Process *c_p, Eterm id) { + ASSERT(!(c_p->sig_qs.flags & FS_HANDLING_SIGS)); if (is_internal_ref(id)) { ErtsRecvMarkerBlock *blkp = c_p->sig_qs.recv_mrk_blk; @@ -1913,6 +2100,7 @@ erts_msgq_recv_marker_set_save(Process *c_p, Eterm id) ERTS_GLB_INLINE ErtsMessage * erts_msgq_peek_msg(Process *c_p) { + ASSERT(!(c_p->sig_qs.flags & FS_HANDLING_SIGS)); ASSERT(!(*c_p->sig_qs.save) || ERTS_SIG_IS_MSG(*c_p->sig_qs.save)); return *c_p->sig_qs.save; } @@ -1921,6 +2109,7 @@ ERTS_GLB_INLINE void erts_msgq_unlink_msg(Process *c_p, ErtsMessage *msgp) { ErtsMessage *sigp = msgp->next; + ASSERT(!(c_p->sig_qs.flags & FS_HANDLING_SIGS)); ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE__(c_p, 0, "before"); *c_p->sig_qs.save = sigp; c_p->sig_qs.len--; @@ -1939,6 +2128,7 @@ ERTS_GLB_INLINE void erts_msgq_set_save_first(Process *c_p) { ErtsRecvMarkerBlock *blkp = c_p->sig_qs.recv_mrk_blk; + ASSERT(!(c_p->sig_qs.flags & FS_HANDLING_SIGS)); if (blkp) { ERTS_PROC_SIG_RECV_MARK_CLEAR_PENDING_SET_SAVE__(blkp); #ifdef ERTS_SUPPORT_OLD_RECV_MARK_INSTRS @@ -1952,14 +2142,16 @@ erts_msgq_set_save_first(Process *c_p) * anymore... */ if (c_p->sig_qs.first && ERTS_SIG_IS_RECV_MARKER(c_p->sig_qs.first)) - erts_msgq_remove_leading_recv_markers(c_p); - c_p->sig_qs.save = &c_p->sig_qs.first; + erts_msgq_remove_leading_recv_markers_set_save_first(c_p); + else + c_p->sig_qs.save = &c_p->sig_qs.first; } ERTS_GLB_INLINE void erts_msgq_unlink_msg_set_save_first(Process *c_p, ErtsMessage *msgp) { ErtsMessage *sigp = msgp->next; + ASSERT(!(c_p->sig_qs.flags & FS_HANDLING_SIGS)); ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE__(c_p, 0, "before"); *c_p->sig_qs.save = sigp; c_p->sig_qs.len--; @@ -1976,6 +2168,7 @@ erts_msgq_set_save_next(Process *c_p) { ErtsMessage *sigp = (*c_p->sig_qs.save)->next; ErtsMessage **sigpp = &(*c_p->sig_qs.save)->next; + ASSERT(!(c_p->sig_qs.flags & FS_HANDLING_SIGS)); ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(c_p, 0); if (sigp && ERTS_SIG_IS_RECV_MARKER(sigp)) sigpp = erts_msgq_pass_recv_markers(c_p, sigpp); @@ -1987,8 +2180,9 @@ ERTS_GLB_INLINE void erts_msgq_set_save_end(Process *c_p) { /* Set save pointer to end of message queue... */ + ASSERT(!(c_p->sig_qs.flags & FS_HANDLING_SIGS)); - erts_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ); + erts_proc_sig_queue_lock(c_p); erts_proc_sig_fetch(c_p); erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ); @@ -2006,6 +2200,23 @@ erts_msgq_set_save_end(Process *c_p) #undef ERTS_PROC_SIG_RECV_MARK_CLEAR_PENDING_SET_SAVE__ #undef ERTS_PROC_SIG_RECV_MARK_CLEAR_OLD_MARK__ +ERTS_GLB_INLINE erts_aint32_t +erts_proc_sig_check_wait_dirty_handle_signals(Process *c_p, + erts_aint32_t state_in) +{ + erts_aint32_t state = state_in; + ASSERT(erts_get_scheduler_data()->type == ERTS_SCHED_NORMAL); + ERTS_LC_ASSERT(ERTS_PROC_LOCK_MAIN == erts_proc_lc_my_proc_locks(c_p)); + + if (c_p->sig_qs.flags & FS_HANDLING_SIGS) { + erts_proc_sig_do_wait_dirty_handle_signals__(c_p); + state = erts_atomic32_read_mb(&c_p->state); + } + ERTS_LC_ASSERT(ERTS_PROC_LOCK_MAIN == erts_proc_lc_my_proc_locks(c_p)); + ASSERT(!(c_p->sig_qs.flags & FS_HANDLING_SIGS)); + return state; +} + #endif /* ERTS_GLB_INLINE_INCL_FUNC_DEF */ #endif /* ERTS_PROC_SIG_QUEUE_H__ */ |