diff options
author | Lukas Larsson <lukas@erlang.org> | 2022-04-14 17:07:43 +0200 |
---|---|---|
committer | Lukas Larsson <lukas@erlang.org> | 2022-04-27 10:49:38 +0200 |
commit | c78a8e97538fe444575779336fca7701af330ff1 (patch) | |
tree | 385fa2b1b13933c7b991b903a9207ffed4b55e03 | |
parent | 6d5a5f31c36bbdaad21585d25974177bd1b75e66 (diff) | |
download | erlang-c78a8e97538fe444575779336fca7701af330ff1.tar.gz |
erts: Fix fragmented send to finish before exiting
If a process is suspended doing a fragmented send and then
receives an exit signal it was terminated before it could
finish sending the message leading to a memory leak on the
receiving side.
This change fixes that so that the message is allowed to
finish being sent before the process exits.
Closes #5876
-rw-r--r-- | erts/emulator/beam/bif.c | 5 | ||||
-rw-r--r-- | erts/emulator/beam/dist.c | 7 | ||||
-rw-r--r-- | erts/emulator/beam/erl_proc_sig_queue.c | 6 | ||||
-rw-r--r-- | erts/emulator/beam/erl_process.c | 34 | ||||
-rw-r--r-- | erts/emulator/beam/erl_process.h | 1 | ||||
-rw-r--r-- | erts/etc/unix/etp-commands.in | 7 |
6 files changed, 50 insertions, 10 deletions
diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c index c373fab934..b42c08ec40 100644 --- a/erts/emulator/beam/bif.c +++ b/erts/emulator/beam/bif.c @@ -54,7 +54,10 @@ static Export* flush_monitor_messages_trap = NULL; static Export* set_cpu_topology_trap = NULL; static Export* await_port_send_result_trap = NULL; Export* erts_format_cpu_topology_trap = NULL; -static Export dsend_continue_trap_export; +#ifndef DEBUG +static +#endif +Export dsend_continue_trap_export; Export *erts_convert_time_unit_trap = NULL; static Export *await_msacc_mod_trap = NULL; diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index f1cd97c3bf..cbbed5e4ac 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -2533,6 +2533,7 @@ erts_dsig_send(ErtsDSigSendContext *ctx) erts_mtx_unlock(&dep->qlock); plp = erts_proclist_create(ctx->c_p); + erts_suspend(ctx->c_p, ERTS_PROC_LOCK_MAIN, NULL); suspended = 1; erts_mtx_lock(&dep->qlock); @@ -2607,12 +2608,15 @@ erts_dsig_send(ErtsDSigSendContext *ctx) } /* More fragments left to be sent, yield and re-schedule */ if (ctx->fragments) { + ctx->c_p->flags |= F_FRAGMENTED_SEND; retval = ERTS_DSIG_SEND_CONTINUE; if (!resume && erts_system_monitor_flags.busy_dist_port) monitor_generic(ctx->c_p, am_busy_dist_port, cid); goto done; } } + + if (ctx->c_p) ctx->c_p->flags &= ~F_FRAGMENTED_SEND; ctx->obuf = NULL; if (suspended) { @@ -2991,9 +2995,8 @@ erts_dist_command(Port *prt, int initial_reds) obufsize = 0; if (!(sched_flags & ERTS_PTS_FLG_BUSY_PORT) && de_busy && qsize < erts_dist_buf_busy_limit) { - ErtsProcList *suspendees; int resumed; - suspendees = get_suspended_on_de(dep, ERTS_DE_QFLG_BUSY); + ErtsProcList *suspendees = get_suspended_on_de(dep, ERTS_DE_QFLG_BUSY); erts_mtx_unlock(&dep->qlock); resumed = erts_resume_processes(suspendees); diff --git a/erts/emulator/beam/erl_proc_sig_queue.c b/erts/emulator/beam/erl_proc_sig_queue.c index 5a062a0302..5c83d05f3d 100644 --- a/erts/emulator/beam/erl_proc_sig_queue.c +++ b/erts/emulator/beam/erl_proc_sig_queue.c @@ -3444,9 +3444,11 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep, ErtsMonitorSuspend *msp; erts_aint_t mstate; msp = (ErtsMonitorSuspend *) erts_monitor_to_data(tmon); - mstate = erts_atomic_read_acqb(&msp->state); - if (mstate & ERTS_MSUSPEND_STATE_FLG_ACTIVE) + mstate = erts_atomic_read_band_acqb( + &msp->state, ~ERTS_MSUSPEND_STATE_FLG_ACTIVE); + if (mstate & ERTS_MSUSPEND_STATE_FLG_ACTIVE) { erts_resume(c_p, ERTS_PROC_LOCK_MAIN); + } break; } default: diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index 406336fea7..26c11a6031 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -12692,6 +12692,7 @@ enum continue_exit_phase { ERTS_CONTINUE_EXIT_MONITORS, ERTS_CONTINUE_EXIT_LT_MONITORS, ERTS_CONTINUE_EXIT_HANDLE_PROC_SIG, + ERTS_CONTINUE_EXIT_DIST_SEND, ERTS_CONTINUE_EXIT_DIST_LINKS, ERTS_CONTINUE_EXIT_DIST_MONITORS, ERTS_CONTINUE_EXIT_DONE, @@ -12709,6 +12710,10 @@ struct continue_exit_state { Uint32 block_rla_ref; }; +#ifdef DEBUG +extern Export dsend_continue_trap_export; +#endif + void erts_continue_exit_process(Process *p) { @@ -12946,6 +12951,20 @@ restart: trap_state->pectxt.dist_state = NIL; trap_state->pectxt.yield = 0; + p->rcount = 0; + + if (p->flags & F_FRAGMENTED_SEND) { + /* The process was re-scheduled while doing a fragmented + distributed send (possibly because it was suspended). + We need to finish doing that send as otherwise incomplete + fragmented messages will be sent to other nodes potentially + causing memory leaks. + */ + ASSERT(p->current == &dsend_continue_trap_export.info.mfa); + /* arg_reg[0] is the argument used in dsend_continue_trap_export */ + trap_state->pectxt.dist_state = p->arg_reg[0]; + } + erts_proc_lock(p, ERTS_PROC_LOCK_MSGQ); erts_proc_sig_fetch(p); @@ -13004,11 +13023,12 @@ restart: reds -= r; - trap_state->phase = ERTS_CONTINUE_EXIT_DIST_LINKS; + trap_state->phase = ERTS_CONTINUE_EXIT_DIST_SEND; } - case ERTS_CONTINUE_EXIT_DIST_LINKS: { + case ERTS_CONTINUE_EXIT_DIST_SEND: { - continue_dist_send: + continue_dist_send: + ASSERT(p->rcount == 0); if (is_not_nil(trap_state->pectxt.dist_state)) { Binary* bin = erts_magic_ref2bin(trap_state->pectxt.dist_state); ErtsDSigSendContext* ctx = (ErtsDSigSendContext*) ERTS_MAGIC_BIN_DATA(bin); @@ -13041,6 +13061,13 @@ restart: goto restart; } + trap_state->phase = ERTS_CONTINUE_EXIT_DIST_LINKS; + } + case ERTS_CONTINUE_EXIT_DIST_LINKS: { + + if (is_not_nil(trap_state->pectxt.dist_state)) + goto continue_dist_send; + reds = erts_link_tree_foreach_delete_yielding( &trap_state->pectxt.dist_links, erts_proc_exit_handle_dist_link, @@ -13049,6 +13076,7 @@ restart: reds); if (reds <= 0 || trap_state->pectxt.yield) goto yield; + trap_state->phase = ERTS_CONTINUE_EXIT_DIST_MONITORS; } case ERTS_CONTINUE_EXIT_DIST_MONITORS: { diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h index 3c606fe9eb..a6f0017e56 100644 --- a/erts/emulator/beam/erl_process.h +++ b/erts/emulator/beam/erl_process.h @@ -1435,6 +1435,7 @@ extern int erts_system_profile_ts_type; #define F_DIRTY_MINOR_GC (1 << 21) /* Dirty minor GC scheduled */ #define F_HIBERNATED (1 << 22) /* Hibernated */ #define F_TRAP_EXIT (1 << 23) /* Trapping exit */ +#define F_FRAGMENTED_SEND (1 << 24) /* Process is doing a distributed fragmented send */ /* Signal queue flags */ #define FS_OFF_HEAP_MSGQ (1 << 0) /* Off heap msg queue */ diff --git a/erts/etc/unix/etp-commands.in b/erts/etc/unix/etp-commands.in index 9104d47148..ff14600f7e 100644 --- a/erts/etc/unix/etp-commands.in +++ b/erts/etc/unix/etp-commands.in @@ -2091,8 +2091,11 @@ end define etp-proc-flags-int # Args: int # - if ($arg0 & ~((1 << 24)-1)) - printf "GARBAGE<%x> ", ($arg0 & ~((1 << 24)-1)) + if ($arg0 & ~((1 << 25)-1)) + printf "GARBAGE<%x> ", ($arg0 & ~((1 << 25)-1)) + end + if ($arg0 & (1 << 24)) + printf "fragmented-send " end if ($arg0 & (1 << 23)) printf "trap-exit " |