summaryrefslogtreecommitdiff
path: root/erts/emulator/beam/erl_proc_sig_queue.h
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam/erl_proc_sig_queue.h')
-rw-r--r--erts/emulator/beam/erl_proc_sig_queue.h385
1 files changed, 298 insertions, 87 deletions
diff --git a/erts/emulator/beam/erl_proc_sig_queue.h b/erts/emulator/beam/erl_proc_sig_queue.h
index 66ee35d1f7..60c7af5e66 100644
--- a/erts/emulator/beam/erl_proc_sig_queue.h
+++ b/erts/emulator/beam/erl_proc_sig_queue.h
@@ -1,7 +1,7 @@
/*
* %CopyrightBegin%
*
- * Copyright Ericsson AB 2018-2021. All Rights Reserved.
+ * Copyright Ericsson AB 2018-2023. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -91,6 +91,9 @@
#if 0
# define ERTS_PROC_SIG_HARD_DEBUG_RECV_MARKER
#endif
+#if 0
+# define ERTS_PROC_SIG_HARD_DEBUG_SIGQ_BUFFERS
+#endif
struct erl_mesg;
struct erl_dist_external;
@@ -107,7 +110,7 @@ typedef struct {
* Note that not all signal are handled using this functionality!
*/
-#define ERTS_SIG_Q_OP_MAX 18
+#define ERTS_SIG_Q_OP_MAX 19
#define ERTS_SIG_Q_OP_EXIT 0 /* Exit signal due to bif call */
#define ERTS_SIG_Q_OP_EXIT_LINKED 1 /* Exit signal due to link break*/
@@ -127,7 +130,8 @@ typedef struct {
#define ERTS_SIG_Q_OP_ALIAS_MSG 15
#define ERTS_SIG_Q_OP_RECV_MARK 16
#define ERTS_SIG_Q_OP_UNLINK_ACK 17
-#define ERTS_SIG_Q_OP_ADJ_MSGQ ERTS_SIG_Q_OP_MAX
+#define ERTS_SIG_Q_OP_ADJ_MSGQ 18
+#define ERTS_SIG_Q_OP_FLUSH ERTS_SIG_Q_OP_MAX
#define ERTS_SIG_Q_TYPE_MAX (ERTS_MON_LNK_TYPE_MAX + 10)
@@ -238,6 +242,27 @@ void erl_proc_sig_hdbg_chk_recv_marker_block(struct process *c_p);
#include "erl_process.h"
#include "erl_bif_unique.h"
+
+void erts_proc_sig_queue_maybe_install_buffers(Process* p, erts_aint32_t state);
+void erts_proc_sig_queue_flush_and_deinstall_buffers(Process* proc);
+void erts_proc_sig_queue_flush_buffers(Process* proc);
+ErtsSignalInQueueBufferArray*
+erts_proc_sig_queue_flush_get_buffers(Process* proc, int *need_unget_buffers);
+void erts_proc_sig_queue_lock(Process* proc);
+ErtsSignalInQueueBufferArray*
+erts_proc_sig_queue_get_buffers(Process* p, int *need_unread);
+void erts_proc_sig_queue_unget_buffers(ErtsSignalInQueueBufferArray* buffers,
+ int need_unget);
+int erts_proc_sig_queue_try_enqueue_to_buffer(Eterm from,
+ Process* receiver,
+ ErtsProcLocks receiver_locks,
+ ErtsMessage* first,
+ ErtsMessage** last,
+ ErtsMessage** last_next,
+ Uint len,
+ int is_signal);
+int erts_proc_sig_queue_force_buffers(Process*);
+
#define ERTS_SIG_Q_OP_BITS 8
#define ERTS_SIG_Q_OP_SHIFT 0
#define ERTS_SIG_Q_OP_MASK ((1 << ERTS_SIG_Q_OP_BITS) - 1)
@@ -304,8 +329,10 @@ struct dist_entry_;
* @brief Send an exit signal to a process.
*
*
- * @param[in] c_p Pointer to process struct of
- * currently executing process.
+ * @param[in] sender Pointer to the sending process/port,
+ * if any, as it may not be possible to
+ * resolve the sender (e.g. after it's
+ * dead).
*
* @param[in] from Identifier of sender.
*
@@ -322,7 +349,7 @@ struct dist_entry_;
*
*/
void
-erts_proc_sig_send_exit(Process *c_p, Eterm from, Eterm to,
+erts_proc_sig_send_exit(ErtsPTabElementCommon *sender, Eterm from, Eterm to,
Eterm reason, Eterm token, int normal_kills);
/**
@@ -359,11 +386,26 @@ erts_proc_sig_send_dist_exit(DistEntry *dep,
/**
*
+ * @brief Send an exit signal due to a link to a process being
+ * broken by connection loss.
+ *
+ * @param[in] lnk Pointer to link structure
+ * from the sending side. It
+ * should contain information
+ * about receiver.
+ */
+void
+erts_proc_sig_send_link_exit_noconnection(ErtsLink *lnk);
+
+/**
+ *
* @brief Send an exit signal due to broken link to a process.
*
*
- * @param[in] c_p Pointer to process struct of
- * currently executing process.
+ * @param[in] sender Pointer to the sending process/port,
+ * if any, as it may not be possible to
+ * resolve the sender (e.g. after it's
+ * dead).
*
* @param[in] from Identifier of sender.
*
@@ -378,16 +420,18 @@ erts_proc_sig_send_dist_exit(DistEntry *dep,
*
*/
void
-erts_proc_sig_send_link_exit(Process *c_p, Eterm from, ErtsLink *lnk,
- Eterm reason, Eterm token);
+erts_proc_sig_send_link_exit(ErtsPTabElementCommon *sender, Eterm from,
+ ErtsLink *lnk, Eterm reason, Eterm token);
/**
*
* @brief Send an link signal to a process.
*
*
- * @param[in] c_p Pointer to process struct of
- * currently executing process.
+ * @param[in] sender Pointer to the sending process/port,
+ * if any, as it may not be possible to
+ * resolve the sender (e.g. after it's
+ * dead).
*
* @param[in] to Identifier of receiver.
*
@@ -405,7 +449,8 @@ erts_proc_sig_send_link_exit(Process *c_p, Eterm from, ErtsLink *lnk,
*
*/
int
-erts_proc_sig_send_link(Process *c_p, Eterm to, ErtsLink *lnk);
+erts_proc_sig_send_link(ErtsPTabElementCommon *sender, Eterm from,
+ Eterm to, ErtsLink *lnk);
/**
*
@@ -414,15 +459,15 @@ erts_proc_sig_send_link(Process *c_p, Eterm to, ErtsLink *lnk);
* The newly created unlink identifier is to be used in an
* unlink operation.
*
- * @param[in] c_p Pointer to process struct of
- * currently executing process.
+ * @param[in] sender Pointer to the sending process/port.
*
* @return A new 64-bit unlink identifier
* unique in context of the
* calling process. The identifier
* may be any value but zero.
*/
-ERTS_GLB_INLINE Uint64 erts_proc_sig_new_unlink_id(Process *c_p);
+ERTS_GLB_INLINE
+Uint64 erts_proc_sig_new_unlink_id(ErtsPTabElementCommon *sender);
/**
*
@@ -431,13 +476,10 @@ ERTS_GLB_INLINE Uint64 erts_proc_sig_new_unlink_id(Process *c_p);
* The structure will contain a newly created unlink
* identifier to be used in the operation.
*
- * @param[in] c_p Pointer to process struct of
- * currently executing process
- * ('from' is a process
- * identifier), or NULL if not
- * called in the context of an
- * executing process ('from' is
- * a port identifier).
+ * @param[in] sender Pointer to the sending process/port,
+ * if any, as it may not be possible to
+ * resolve the sender (e.g. after it's
+ * dead).
*
* @param[in] from Id (as an erlang term) of
* entity sending the unlink
@@ -447,7 +489,7 @@ ERTS_GLB_INLINE Uint64 erts_proc_sig_new_unlink_id(Process *c_p);
* structure.
*/
ErtsSigUnlinkOp *
-erts_proc_sig_make_unlink_op(Process *c_p, Eterm from);
+erts_proc_sig_make_unlink_op(ErtsPTabElementCommon *sender, Eterm from);
/**
*
@@ -464,8 +506,10 @@ erts_proc_sig_destroy_unlink_op(ErtsSigUnlinkOp *sulnk);
* @brief Send an unlink signal to a process.
*
*
- * @param[in] c_p Pointer to process struct of
- * currently executing process.
+ * @param[in] sender Pointer to the sending process/port,
+ * if any, as it may not be possible to
+ * resolve the sender (e.g. after it's
+ * dead).
*
* @param[in] from Id (as an erlang term) of
* entity sending the unlink
@@ -477,15 +521,18 @@ erts_proc_sig_destroy_unlink_op(ErtsSigUnlinkOp *sulnk);
* receiver.
*/
Uint64
-erts_proc_sig_send_unlink(Process *c_p, Eterm from, ErtsLink *lnk);
+erts_proc_sig_send_unlink(ErtsPTabElementCommon *sender, Eterm from,
+ ErtsLink *lnk);
/**
*
* @brief Send an unlink acknowledgment signal to a process.
*
- *
- * @param[in] c_p Pointer to process struct of
- * currently executing process.
+ *
+ * @param[in] sender Pointer to the sending process/port,
+ * if any, as it may not be possible to
+ * resolve the sender (e.g. after it's
+ * dead).
*
* @param[in] from Id (as an erlang term) of
* entity sending the unlink
@@ -498,7 +545,7 @@ erts_proc_sig_send_unlink(Process *c_p, Eterm from, ErtsLink *lnk);
* signal.
*/
void
-erts_proc_sig_send_unlink_ack(Process *c_p, Eterm from,
+erts_proc_sig_send_unlink_ack(ErtsPTabElementCommon *sender, Eterm from,
ErtsSigUnlinkOp *sulnk);
/**
@@ -560,11 +607,6 @@ erts_proc_sig_send_dist_unlink(DistEntry *dep, Uint32 conn_id,
* This function is used instead of erts_proc_sig_send_unlink_ack()
* when the signal arrives via the distribution.
*
- * @param[in] c_p Pointer to process struct of
- * currently executing process or
- * NULL if not called in the context
- * of an executing process.
- *
* @param[in] dep Distribution entry of channel
* that the signal arrived on.
*
@@ -575,7 +617,7 @@ erts_proc_sig_send_dist_unlink(DistEntry *dep, Uint32 conn_id,
* @param[in] id Identifier of unlink operation.
*/
void
-erts_proc_sig_send_dist_unlink_ack(Process *c_p, DistEntry *dep,
+erts_proc_sig_send_dist_unlink_ack(DistEntry *dep,
Uint32 conn_id, Eterm from, Eterm to,
Uint64 id);
@@ -583,6 +625,14 @@ erts_proc_sig_send_dist_unlink_ack(Process *c_p, DistEntry *dep,
*
* @brief Send a monitor down signal to a process.
*
+ * @param[in] sender Pointer to the sending process/port,
+ * if any, as it may not be possible to
+ * resolve the sender (e.g. after it's
+ * dead).
+ *
+ * @param[in] from Sending entity, must be provided
+ * to maintain signal order.
+ *
* @param[in] mon Pointer to target monitor
* structure from the sending
* side. It should contain
@@ -592,27 +642,48 @@ erts_proc_sig_send_dist_unlink_ack(Process *c_p, DistEntry *dep,
*
*/
void
-erts_proc_sig_send_monitor_down(ErtsMonitor *mon, Eterm reason);
+erts_proc_sig_send_monitor_down(ErtsPTabElementCommon *sender, Eterm from,
+ ErtsMonitor *mon, Eterm reason);
/**
*
* @brief Send a demonitor signal to a process.
*
- * @param[in] mon Pointer to origin monitor
- * structure from the sending
- * side. It should contain
- * information about receiver.
+ * @param[in] sender Pointer to the sending process/port,
+ * if any, as it may not be possible to
+ * resolve the sender (e.g. after it's
+ * dead).
*
- * @param[in] reason Exit reason.
+ * @param[in] from Sending entity, must be provided
+ * to maintain signal order.
+ *
+ * @param[in] system Whether the sender is considered a
+ * system service, e.g. a NIF monitor,
+ * and it's okay to order by `from`
+ * even when it's not a pid or port.
+ *
+ * @param[in] mon Pointer to origin monitor
+ * structure from the sending
+ * side. It should contain
+ * information about receiver.
*
*/
void
-erts_proc_sig_send_demonitor(ErtsMonitor *mon);
+erts_proc_sig_send_demonitor(ErtsPTabElementCommon *sender, Eterm from,
+ int system, ErtsMonitor *mon);
/**
*
* @brief Send a monitor signal to a process.
*
+ * @param[in] sender Pointer to the sending process/port,
+ * if any, as it may not be possible to
+ * resolve the sender (e.g. after it's
+ * dead).
+ *
+ * @param[in] from Sending entity, must be provided
+ * to maintain signal order.
+ *
* @param[in] mon Pointer to target monitor
* structure to insert on
* receiver side.
@@ -630,7 +701,8 @@ erts_proc_sig_send_demonitor(ErtsMonitor *mon);
*
*/
int
-erts_proc_sig_send_monitor(ErtsMonitor *mon, Eterm to);
+erts_proc_sig_send_monitor(ErtsPTabElementCommon *sender, Eterm from,
+ ErtsMonitor *mon, Eterm to);
/**
*
@@ -673,27 +745,22 @@ erts_proc_sig_send_dist_monitor_down(DistEntry *dep, Eterm ref,
* when the signal arrives via the distribution and
* no monitor structure is available.
*
+ * @param[in] from Identifier of sender.
+ *
* @param[in] to Identifier of receiver.
*
* @param[in] ref Reference identifying the monitor.
*
*/
void
-erts_proc_sig_send_dist_demonitor(Eterm to, Eterm ref);
+erts_proc_sig_send_dist_demonitor(Eterm from, Eterm to, Eterm ref);
/**
*
- * @brief Send a persistent monitor triggered signal to a process.
- *
- * Used by monitors that are not auto disabled such as for
- * example 'time_offset' monitors.
- *
- * @param[in] type Monitor type.
+ * @brief Send a persistent "node down" monitor signal to a process
*
* @param[in] key Monitor key.
*
- * @param[in] from Identifier of sender.
- *
* @param[in] to Identifier of receiver.
*
* @param[in] msg Message template.
@@ -702,27 +769,25 @@ erts_proc_sig_send_dist_demonitor(Eterm to, Eterm ref);
*
*/
void
-erts_proc_sig_send_persistent_monitor_msg(Uint16 type, Eterm key,
- Eterm from, Eterm to,
- Eterm msg, Uint msg_sz);
+erts_proc_sig_send_monitor_nodes_msg(Eterm key, Eterm to,
+ Eterm msg, Uint msg_sz);
/**
*
- * @brief Send a trace change signal to a process.
+ * @brief Send a persistent "time offset changed" monitor signal to a process
*
- * @param[in] to Identifier of receiver.
+ * @param[in] key Monitor key.
*
- * @param[in] on Trace flags to enable.
+ * @param[in] to Identifier of receiver.
*
- * @param[in] off Trace flags to disable.
+ * @param[in] msg Message template.
*
- * @param[in] tracer Tracer to set. If the non-value,
- * tracer will not be changed.
+ * @param[in] msg_sz Heap size of message template.
*
*/
void
-erts_proc_sig_send_trace_change(Eterm to, Uint on, Uint off,
- Eterm tracer);
+erts_proc_sig_send_monitor_time_offset_msg(Eterm key, Eterm to,
+ Eterm msg, Uint msg_sz);
/**
*
@@ -799,7 +864,7 @@ erts_proc_sig_send_is_alive_request(Process *c_p, Eterm to,
* @param[in] item_ix Info index array to pass to
* erts_process_info()
*
- * @param[in] len Lenght of info index array
+ * @param[in] len Length of info index array
*
* @param[in] need_msgq_len Non-zero if message queue
* length is needed; otherwise,
@@ -1043,14 +1108,11 @@ erts_proc_sig_send_cla_request(Process *c_p, Eterm to, Eterm req_id);
*
* When received, all on heap messages will be moved off heap.
*
- * @param[in] c_p Pointer to process struct of
- * currently executing process.
- *
* @param[in] to Identifier of receiver.
*
*/
void
-erts_proc_sig_send_move_msgq_off_heap(Process *c_p, Eterm to);
+erts_proc_sig_send_move_msgq_off_heap(Eterm to);
/*
* End of send operations of currently supported process signals.
@@ -1072,6 +1134,9 @@ erts_proc_sig_send_move_msgq_off_heap(Process *c_p, Eterm to);
*
* @param[out] statep Pointer to process state after
* signal handling. May not be NULL.
+ * The state should recently have
+ * been updated before calling
+ * this function.
*
* @param[in,out] redsp Pointer to an integer containing
* reductions. On input, the amount
@@ -1125,6 +1190,8 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep,
* used by the call. On output, the
* amount of reductions consumed.
*
+ * @param[in,out] pe_ctxtp Process exit context pointer.
+ *
* @return Returns a non-zero value, when
* no more signals to handle in the
* middle queue remain. A zero
@@ -1134,8 +1201,7 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep,
*/
int
erts_proc_sig_handle_exit(Process *c_p, Sint *redsp,
- ErtsMonitor **pend_spawn_mon_pp,
- Eterm reason);
+ ErtsProcExitContext *pe_ctxt_p);
/**
*
@@ -1191,6 +1257,58 @@ erts_proc_sig_receive_helper(Process *c_p, int fcalls,
int neg_o_reds, ErtsMessage **msgpp,
int *get_outp);
+/*
+ * CLEAN_SIGQ - Flush until middle queue is empty, i.e.
+ * the content of inner+middle queue equals
+ * the message queue.
+ */
+#define ERTS_PROC_SIG_FLUSH_FLG_CLEAN_SIGQ (1 << 0)
+/*
+ * FROM_ALL - Flush signals from all local senders (processes
+ * and ports).
+ */
+#define ERTS_PROC_SIG_FLUSH_FLG_FROM_ALL (1 << 1)
+/*
+ * FROM_ID - Flush signals from process or port identified
+ * by 'id'.
+ */
+#define ERTS_PROC_SIG_FLUSH_FLG_FROM_ID (1 << 2)
+
+/*
+ * All erts_proc_sig_init_flush_signals() flags.
+ */
+#define ERTS_PROC_SIG_FLUSH_FLGS \
+ (ERTS_PROC_SIG_FLUSH_FLG_CLEAN_SIGQ \
+ | ERTS_PROC_SIG_FLUSH_FLG_FROM_ALL \
+ | ERTS_PROC_SIG_FLUSH_FLG_FROM_ID)
+
+/**
+ *
+ * @brief Initialize flush of signals from another process or port
+ *
+ * Inserts a flush signal in the outer signal queue of
+ * current process and sets the FS_FLUSHING_SIGS flag in
+ * 'c_p->sig_qs.flags'. When the flush signal has been
+ * handled the FS_FLUSHED_SIGS flag is set as well.
+ *
+ * While the flushing is ongoing the process *should* only
+ * handle incoming signals and not execute Erlang code. When
+ * the functionality that initiated the flush detects that
+ * the flush is done by the FS_FLUSHED_SIGS flag being set,
+ * it should clear both the FS_FLUSHED_SIGS flag and the
+ * FS_FLUSHING_SIGS flag.
+ *
+ * @param[in] c_p Pointer to process struct of
+ * currently executing process.
+ * flags Flags indicating how to flush.
+ * (see above).
+ * from Identifier of sender to flush
+ * signals from in case the
+ * FROM_ID flag is set.
+ */
+void
+erts_proc_sig_init_flush_signals(Process *c_p, int flags, Eterm from);
+
/**
*
* @brief Fetch signals from the outer queue
@@ -1256,7 +1374,7 @@ erts_enqueue_signals(Process *rp, ErtsMessage *first,
*
*/
void
-erts_proc_sig_send_pending(ErtsSchedulerData* esdp);
+erts_proc_sig_send_pending(Process *c_p, ErtsSchedulerData* esdp);
void
@@ -1264,7 +1382,8 @@ erts_proc_sig_send_to_alias(Process *c_p, Eterm from, Eterm to,
Eterm msg, Eterm token);
void
-erts_proc_sig_send_dist_to_alias(Eterm alias, ErtsDistExternal *edep,
+erts_proc_sig_send_dist_to_alias(Eterm from, Eterm alias,
+ ErtsDistExternal *edep,
ErlHeapFragment *hfrag, Eterm token);
/**
@@ -1307,12 +1426,16 @@ typedef struct {
*
* @param[in] mip Pointer to array of
* ErtsMessageInfo structures.
+ *
+ * @param[out] msgq_lenp Pointer to integer containing
+ * amount of messages.
*/
Uint erts_proc_sig_prep_msgq_for_inspection(Process *c_p,
Process *rp,
ErtsProcLocks rp_locks,
int info_on_self,
- ErtsMessageInfo *mip);
+ ErtsMessageInfo *mip,
+ Sint *msgq_lenp);
/**
*
@@ -1390,6 +1513,26 @@ int
erts_proc_sig_decode_dist(Process *proc, ErtsProcLocks proc_locks,
ErtsMessage *msgp, int force_off_heap);
+/**
+ *
+ * @brief Check if a newly scheduled process needs to wait for
+ * ongoing dirty handling of signals to complete.
+ *
+ * @param[in] c_p Pointer to executing process.
+ *
+ * @param[in] state_in State of process.
+ *
+ * @return Updated state of process if
+ * we had to wait; otherwise,
+ * state_in.
+ */
+ERTS_GLB_INLINE erts_aint32_t
+erts_proc_sig_check_wait_dirty_handle_signals(Process *c_p,
+ erts_aint32_t state_in);
+
+void erts_proc_sig_do_wait_dirty_handle_signals__(Process *c_p);
+
+
ErtsDistExternal *
erts_proc_sig_get_external(ErtsMessage *msgp);
@@ -1613,7 +1756,7 @@ Eterm erts_msgq_recv_marker_create_insert(Process *c_p, Eterm id);
void erts_msgq_recv_marker_create_insert_set_save(Process *c_p, Eterm id);
ErtsMessage **erts_msgq_pass_recv_markers(Process *c_p,
ErtsMessage **markpp);
-void erts_msgq_remove_leading_recv_markers(Process *c_p);
+void erts_msgq_remove_leading_recv_markers_set_save_first(Process *c_p);
#define ERTS_RECV_MARKER_IX__(BLKP, MRKP) \
((int) ((MRKP) - &(BLKP)->marker[0]))
@@ -1621,14 +1764,30 @@ void erts_msgq_remove_leading_recv_markers(Process *c_p);
#if ERTS_GLB_INLINE_INCL_FUNC_DEF
ERTS_GLB_INLINE Uint64
-erts_proc_sig_new_unlink_id(Process *c_p)
+erts_proc_sig_new_unlink_id(ErtsPTabElementCommon *sender)
{
Uint64 id;
- ASSERT(c_p);
- id = (Uint64) c_p->uniq++;
- if (id == 0)
+ ASSERT(sender);
+
+ if (is_internal_pid(sender->id)) {
+ Process *c_p = ErtsContainerStruct(sender, Process, common);
id = (Uint64) c_p->uniq++;
+
+ if (id == 0) {
+ id = (Uint64) c_p->uniq++;
+ }
+ } else {
+ ASSERT(is_internal_port(sender->id));
+
+ id = (Uint64) erts_raw_get_unique_monotonic_integer();
+ if (id == 0) {
+ id = (Uint64) erts_raw_get_unique_monotonic_integer();
+ }
+ }
+
+ ASSERT(id != 0);
+
return id;
}
@@ -1637,7 +1796,8 @@ erts_proc_sig_fetch(Process *proc)
{
Sint res = 0;
ErtsSignal *sig;
-
+ ErtsSignalInQueueBufferArray* buffers;
+ int need_unget_buffers;
ERTS_LC_ASSERT(ERTS_PROC_IS_EXITING(proc)
|| ((erts_proc_lc_my_proc_locks(proc)
& (ERTS_PROC_LOCK_MAIN
@@ -1645,9 +1805,18 @@ erts_proc_sig_fetch(Process *proc)
== (ERTS_PROC_LOCK_MAIN
| ERTS_PROC_LOCK_MSGQ)));
+ ASSERT(!(proc->sig_qs.flags & FS_FLUSHING_SIGS)
+ || ERTS_PROC_IS_EXITING(proc)
+ || ERTS_IS_CRASH_DUMPING);
+
+ ASSERT(!(proc->sig_qs.flags & FS_HANDLING_SIGS));
+
ERTS_HDBG_CHECK_SIGNAL_IN_QUEUE(proc);
ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(proc, !0);
+ buffers = erts_proc_sig_queue_flush_get_buffers(proc,
+ &need_unget_buffers);
+
sig = (ErtsSignal *) proc->sig_inq.first;
if (sig) {
if (ERTS_LIKELY(sig->common.tag != ERTS_PROC_SIG_MSGQ_LEN_OFFS_MARK))
@@ -1655,7 +1824,20 @@ erts_proc_sig_fetch(Process *proc)
else
res = erts_proc_sig_fetch_msgq_len_offs__(proc);
}
-
+ if (buffers) {
+ Uint32 state = erts_atomic32_read_acqb(&proc->state);
+ if (!(ERTS_PSFLG_SIG_IN_Q & state) &&
+ erts_atomic64_read_nob(&buffers->nonmsg_slots)) {
+ /* We may have raced with a thread inserting into a buffer
+ * when resetting the flag ERTS_PSFLG_SIG_IN_Q in one of
+ * the fetch functions above so we have to make sure that
+ * it is set when there is a nonmsg signal in the buffers. */
+ erts_atomic32_read_bor_nob(&proc->state,
+ ERTS_PSFLG_SIG_IN_Q |
+ ERTS_PSFLG_ACTIVE);
+ }
+ erts_proc_sig_queue_unget_buffers(buffers, need_unget_buffers);
+ }
res += proc->sig_qs.len;
ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(proc, !0);
@@ -1805,6 +1987,7 @@ erts_msgq_recv_marker_clear(Process *c_p, Eterm id)
{
ErtsRecvMarkerBlock *blkp = c_p->sig_qs.recv_mrk_blk;
int ix;
+ ASSERT(!(c_p->sig_qs.flags & FS_HANDLING_SIGS));
if (!is_small(id) && !is_big(id) && !is_internal_ref(id))
return;
@@ -1832,7 +2015,8 @@ erts_msgq_recv_marker_clear(Process *c_p, Eterm id)
ERTS_GLB_INLINE Eterm
erts_msgq_recv_marker_insert(Process *c_p)
{
- erts_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ);
+ ASSERT(!(c_p->sig_qs.flags & FS_HANDLING_SIGS));
+ erts_proc_sig_queue_lock(c_p);
erts_proc_sig_fetch(c_p);
erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ);
@@ -1845,6 +2029,7 @@ ERTS_GLB_INLINE void erts_msgq_recv_marker_bind(Process *c_p,
Eterm insert_id,
Eterm bind_id)
{
+ ASSERT(!(c_p->sig_qs.flags & FS_HANDLING_SIGS));
#ifdef ERTS_SUPPORT_OLD_RECV_MARK_INSTRS
ASSERT(bind_id != erts_old_recv_marker_id);
#endif
@@ -1874,6 +2059,7 @@ ERTS_GLB_INLINE void erts_msgq_recv_marker_bind(Process *c_p,
ERTS_GLB_INLINE void
erts_msgq_recv_marker_insert_bind(Process *c_p, Eterm id)
{
+ ASSERT(!(c_p->sig_qs.flags & FS_HANDLING_SIGS));
if (is_internal_ref(id)) {
#ifdef ERTS_SUPPORT_OLD_RECV_MARK_INSTRS
ErtsRecvMarkerBlock *blkp = c_p->sig_qs.recv_mrk_blk;
@@ -1881,7 +2067,7 @@ erts_msgq_recv_marker_insert_bind(Process *c_p, Eterm id)
ERTS_PROC_SIG_RECV_MARK_CLEAR_OLD_MARK__(blkp);
#endif
- erts_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ);
+ erts_proc_sig_queue_lock(c_p);
erts_proc_sig_fetch(c_p);
erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ);
@@ -1893,6 +2079,7 @@ erts_msgq_recv_marker_insert_bind(Process *c_p, Eterm id)
ERTS_GLB_INLINE void
erts_msgq_recv_marker_set_save(Process *c_p, Eterm id)
{
+ ASSERT(!(c_p->sig_qs.flags & FS_HANDLING_SIGS));
if (is_internal_ref(id)) {
ErtsRecvMarkerBlock *blkp = c_p->sig_qs.recv_mrk_blk;
@@ -1913,6 +2100,7 @@ erts_msgq_recv_marker_set_save(Process *c_p, Eterm id)
ERTS_GLB_INLINE ErtsMessage *
erts_msgq_peek_msg(Process *c_p)
{
+ ASSERT(!(c_p->sig_qs.flags & FS_HANDLING_SIGS));
ASSERT(!(*c_p->sig_qs.save) || ERTS_SIG_IS_MSG(*c_p->sig_qs.save));
return *c_p->sig_qs.save;
}
@@ -1921,6 +2109,7 @@ ERTS_GLB_INLINE void
erts_msgq_unlink_msg(Process *c_p, ErtsMessage *msgp)
{
ErtsMessage *sigp = msgp->next;
+ ASSERT(!(c_p->sig_qs.flags & FS_HANDLING_SIGS));
ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE__(c_p, 0, "before");
*c_p->sig_qs.save = sigp;
c_p->sig_qs.len--;
@@ -1939,6 +2128,7 @@ ERTS_GLB_INLINE void
erts_msgq_set_save_first(Process *c_p)
{
ErtsRecvMarkerBlock *blkp = c_p->sig_qs.recv_mrk_blk;
+ ASSERT(!(c_p->sig_qs.flags & FS_HANDLING_SIGS));
if (blkp) {
ERTS_PROC_SIG_RECV_MARK_CLEAR_PENDING_SET_SAVE__(blkp);
#ifdef ERTS_SUPPORT_OLD_RECV_MARK_INSTRS
@@ -1952,14 +2142,16 @@ erts_msgq_set_save_first(Process *c_p)
* anymore...
*/
if (c_p->sig_qs.first && ERTS_SIG_IS_RECV_MARKER(c_p->sig_qs.first))
- erts_msgq_remove_leading_recv_markers(c_p);
- c_p->sig_qs.save = &c_p->sig_qs.first;
+ erts_msgq_remove_leading_recv_markers_set_save_first(c_p);
+ else
+ c_p->sig_qs.save = &c_p->sig_qs.first;
}
ERTS_GLB_INLINE void
erts_msgq_unlink_msg_set_save_first(Process *c_p, ErtsMessage *msgp)
{
ErtsMessage *sigp = msgp->next;
+ ASSERT(!(c_p->sig_qs.flags & FS_HANDLING_SIGS));
ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE__(c_p, 0, "before");
*c_p->sig_qs.save = sigp;
c_p->sig_qs.len--;
@@ -1976,6 +2168,7 @@ erts_msgq_set_save_next(Process *c_p)
{
ErtsMessage *sigp = (*c_p->sig_qs.save)->next;
ErtsMessage **sigpp = &(*c_p->sig_qs.save)->next;
+ ASSERT(!(c_p->sig_qs.flags & FS_HANDLING_SIGS));
ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(c_p, 0);
if (sigp && ERTS_SIG_IS_RECV_MARKER(sigp))
sigpp = erts_msgq_pass_recv_markers(c_p, sigpp);
@@ -1987,8 +2180,9 @@ ERTS_GLB_INLINE void
erts_msgq_set_save_end(Process *c_p)
{
/* Set save pointer to end of message queue... */
+ ASSERT(!(c_p->sig_qs.flags & FS_HANDLING_SIGS));
- erts_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ);
+ erts_proc_sig_queue_lock(c_p);
erts_proc_sig_fetch(c_p);
erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ);
@@ -2006,6 +2200,23 @@ erts_msgq_set_save_end(Process *c_p)
#undef ERTS_PROC_SIG_RECV_MARK_CLEAR_PENDING_SET_SAVE__
#undef ERTS_PROC_SIG_RECV_MARK_CLEAR_OLD_MARK__
+ERTS_GLB_INLINE erts_aint32_t
+erts_proc_sig_check_wait_dirty_handle_signals(Process *c_p,
+ erts_aint32_t state_in)
+{
+ erts_aint32_t state = state_in;
+ ASSERT(erts_get_scheduler_data()->type == ERTS_SCHED_NORMAL);
+ ERTS_LC_ASSERT(ERTS_PROC_LOCK_MAIN == erts_proc_lc_my_proc_locks(c_p));
+
+ if (c_p->sig_qs.flags & FS_HANDLING_SIGS) {
+ erts_proc_sig_do_wait_dirty_handle_signals__(c_p);
+ state = erts_atomic32_read_mb(&c_p->state);
+ }
+ ERTS_LC_ASSERT(ERTS_PROC_LOCK_MAIN == erts_proc_lc_my_proc_locks(c_p));
+ ASSERT(!(c_p->sig_qs.flags & FS_HANDLING_SIGS));
+ return state;
+}
+
#endif /* ERTS_GLB_INLINE_INCL_FUNC_DEF */
#endif /* ERTS_PROC_SIG_QUEUE_H__ */