summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLukas Larsson <lukas@erlang.org>2022-04-14 17:07:43 +0200
committerLukas Larsson <lukas@erlang.org>2022-04-27 10:49:38 +0200
commitc78a8e97538fe444575779336fca7701af330ff1 (patch)
tree385fa2b1b13933c7b991b903a9207ffed4b55e03
parent6d5a5f31c36bbdaad21585d25974177bd1b75e66 (diff)
downloaderlang-c78a8e97538fe444575779336fca7701af330ff1.tar.gz
erts: Fix fragmented send to finish before exiting
If a process is suspended doing a fragmented send and then receives an exit signal it was terminated before it could finish sending the message leading to a memory leak on the receiving side. This change fixes that so that the message is allowed to finish being sent before the process exits. Closes #5876
-rw-r--r--erts/emulator/beam/bif.c5
-rw-r--r--erts/emulator/beam/dist.c7
-rw-r--r--erts/emulator/beam/erl_proc_sig_queue.c6
-rw-r--r--erts/emulator/beam/erl_process.c34
-rw-r--r--erts/emulator/beam/erl_process.h1
-rw-r--r--erts/etc/unix/etp-commands.in7
6 files changed, 50 insertions, 10 deletions
diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c
index c373fab934..b42c08ec40 100644
--- a/erts/emulator/beam/bif.c
+++ b/erts/emulator/beam/bif.c
@@ -54,7 +54,10 @@ static Export* flush_monitor_messages_trap = NULL;
static Export* set_cpu_topology_trap = NULL;
static Export* await_port_send_result_trap = NULL;
Export* erts_format_cpu_topology_trap = NULL;
-static Export dsend_continue_trap_export;
+#ifndef DEBUG
+static
+#endif
+Export dsend_continue_trap_export;
Export *erts_convert_time_unit_trap = NULL;
static Export *await_msacc_mod_trap = NULL;
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c
index f1cd97c3bf..cbbed5e4ac 100644
--- a/erts/emulator/beam/dist.c
+++ b/erts/emulator/beam/dist.c
@@ -2533,6 +2533,7 @@ erts_dsig_send(ErtsDSigSendContext *ctx)
erts_mtx_unlock(&dep->qlock);
plp = erts_proclist_create(ctx->c_p);
+
erts_suspend(ctx->c_p, ERTS_PROC_LOCK_MAIN, NULL);
suspended = 1;
erts_mtx_lock(&dep->qlock);
@@ -2607,12 +2608,15 @@ erts_dsig_send(ErtsDSigSendContext *ctx)
}
/* More fragments left to be sent, yield and re-schedule */
if (ctx->fragments) {
+ ctx->c_p->flags |= F_FRAGMENTED_SEND;
retval = ERTS_DSIG_SEND_CONTINUE;
if (!resume && erts_system_monitor_flags.busy_dist_port)
monitor_generic(ctx->c_p, am_busy_dist_port, cid);
goto done;
}
}
+
+ if (ctx->c_p) ctx->c_p->flags &= ~F_FRAGMENTED_SEND;
ctx->obuf = NULL;
if (suspended) {
@@ -2991,9 +2995,8 @@ erts_dist_command(Port *prt, int initial_reds)
obufsize = 0;
if (!(sched_flags & ERTS_PTS_FLG_BUSY_PORT)
&& de_busy && qsize < erts_dist_buf_busy_limit) {
- ErtsProcList *suspendees;
int resumed;
- suspendees = get_suspended_on_de(dep, ERTS_DE_QFLG_BUSY);
+ ErtsProcList *suspendees = get_suspended_on_de(dep, ERTS_DE_QFLG_BUSY);
erts_mtx_unlock(&dep->qlock);
resumed = erts_resume_processes(suspendees);
diff --git a/erts/emulator/beam/erl_proc_sig_queue.c b/erts/emulator/beam/erl_proc_sig_queue.c
index 5a062a0302..5c83d05f3d 100644
--- a/erts/emulator/beam/erl_proc_sig_queue.c
+++ b/erts/emulator/beam/erl_proc_sig_queue.c
@@ -3444,9 +3444,11 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep,
ErtsMonitorSuspend *msp;
erts_aint_t mstate;
msp = (ErtsMonitorSuspend *) erts_monitor_to_data(tmon);
- mstate = erts_atomic_read_acqb(&msp->state);
- if (mstate & ERTS_MSUSPEND_STATE_FLG_ACTIVE)
+ mstate = erts_atomic_read_band_acqb(
+ &msp->state, ~ERTS_MSUSPEND_STATE_FLG_ACTIVE);
+ if (mstate & ERTS_MSUSPEND_STATE_FLG_ACTIVE) {
erts_resume(c_p, ERTS_PROC_LOCK_MAIN);
+ }
break;
}
default:
diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c
index 406336fea7..26c11a6031 100644
--- a/erts/emulator/beam/erl_process.c
+++ b/erts/emulator/beam/erl_process.c
@@ -12692,6 +12692,7 @@ enum continue_exit_phase {
ERTS_CONTINUE_EXIT_MONITORS,
ERTS_CONTINUE_EXIT_LT_MONITORS,
ERTS_CONTINUE_EXIT_HANDLE_PROC_SIG,
+ ERTS_CONTINUE_EXIT_DIST_SEND,
ERTS_CONTINUE_EXIT_DIST_LINKS,
ERTS_CONTINUE_EXIT_DIST_MONITORS,
ERTS_CONTINUE_EXIT_DONE,
@@ -12709,6 +12710,10 @@ struct continue_exit_state {
Uint32 block_rla_ref;
};
+#ifdef DEBUG
+extern Export dsend_continue_trap_export;
+#endif
+
void
erts_continue_exit_process(Process *p)
{
@@ -12946,6 +12951,20 @@ restart:
trap_state->pectxt.dist_state = NIL;
trap_state->pectxt.yield = 0;
+ p->rcount = 0;
+
+ if (p->flags & F_FRAGMENTED_SEND) {
+ /* The process was re-scheduled while doing a fragmented
+ distributed send (possibly because it was suspended).
+ We need to finish doing that send as otherwise incomplete
+ fragmented messages will be sent to other nodes potentially
+ causing memory leaks.
+ */
+ ASSERT(p->current == &dsend_continue_trap_export.info.mfa);
+ /* arg_reg[0] is the argument used in dsend_continue_trap_export */
+ trap_state->pectxt.dist_state = p->arg_reg[0];
+ }
+
erts_proc_lock(p, ERTS_PROC_LOCK_MSGQ);
erts_proc_sig_fetch(p);
@@ -13004,11 +13023,12 @@ restart:
reds -= r;
- trap_state->phase = ERTS_CONTINUE_EXIT_DIST_LINKS;
+ trap_state->phase = ERTS_CONTINUE_EXIT_DIST_SEND;
}
- case ERTS_CONTINUE_EXIT_DIST_LINKS: {
+ case ERTS_CONTINUE_EXIT_DIST_SEND: {
- continue_dist_send:
+ continue_dist_send:
+ ASSERT(p->rcount == 0);
if (is_not_nil(trap_state->pectxt.dist_state)) {
Binary* bin = erts_magic_ref2bin(trap_state->pectxt.dist_state);
ErtsDSigSendContext* ctx = (ErtsDSigSendContext*) ERTS_MAGIC_BIN_DATA(bin);
@@ -13041,6 +13061,13 @@ restart:
goto restart;
}
+ trap_state->phase = ERTS_CONTINUE_EXIT_DIST_LINKS;
+ }
+ case ERTS_CONTINUE_EXIT_DIST_LINKS: {
+
+ if (is_not_nil(trap_state->pectxt.dist_state))
+ goto continue_dist_send;
+
reds = erts_link_tree_foreach_delete_yielding(
&trap_state->pectxt.dist_links,
erts_proc_exit_handle_dist_link,
@@ -13049,6 +13076,7 @@ restart:
reds);
if (reds <= 0 || trap_state->pectxt.yield)
goto yield;
+
trap_state->phase = ERTS_CONTINUE_EXIT_DIST_MONITORS;
}
case ERTS_CONTINUE_EXIT_DIST_MONITORS: {
diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h
index 3c606fe9eb..a6f0017e56 100644
--- a/erts/emulator/beam/erl_process.h
+++ b/erts/emulator/beam/erl_process.h
@@ -1435,6 +1435,7 @@ extern int erts_system_profile_ts_type;
#define F_DIRTY_MINOR_GC (1 << 21) /* Dirty minor GC scheduled */
#define F_HIBERNATED (1 << 22) /* Hibernated */
#define F_TRAP_EXIT (1 << 23) /* Trapping exit */
+#define F_FRAGMENTED_SEND (1 << 24) /* Process is doing a distributed fragmented send */
/* Signal queue flags */
#define FS_OFF_HEAP_MSGQ (1 << 0) /* Off heap msg queue */
diff --git a/erts/etc/unix/etp-commands.in b/erts/etc/unix/etp-commands.in
index 9104d47148..ff14600f7e 100644
--- a/erts/etc/unix/etp-commands.in
+++ b/erts/etc/unix/etp-commands.in
@@ -2091,8 +2091,11 @@ end
define etp-proc-flags-int
# Args: int
#
- if ($arg0 & ~((1 << 24)-1))
- printf "GARBAGE<%x> ", ($arg0 & ~((1 << 24)-1))
+ if ($arg0 & ~((1 << 25)-1))
+ printf "GARBAGE<%x> ", ($arg0 & ~((1 << 25)-1))
+ end
+ if ($arg0 & (1 << 24))
+ printf "fragmented-send "
end
if ($arg0 & (1 << 23))
printf "trap-exit "