diff options
-rw-r--r-- | erts/doc/src/erl_cmd.xml | 22 | ||||
-rw-r--r-- | erts/doc/src/erlang.xml | 124 | ||||
-rw-r--r-- | erts/emulator/beam/atom.names | 1 | ||||
-rw-r--r-- | erts/emulator/beam/bif.c | 16 | ||||
-rw-r--r-- | erts/emulator/beam/dist.c | 471 | ||||
-rw-r--r-- | erts/emulator/beam/dist.h | 1 | ||||
-rw-r--r-- | erts/emulator/beam/erl_bif_info.c | 14 | ||||
-rw-r--r-- | erts/emulator/beam/erl_init.c | 26 | ||||
-rw-r--r-- | erts/emulator/beam/erl_node_tables.c | 7 | ||||
-rw-r--r-- | erts/emulator/beam/erl_node_tables.h | 13 | ||||
-rw-r--r-- | erts/emulator/beam/erl_process.c | 19 | ||||
-rw-r--r-- | erts/emulator/beam/erl_process.h | 7 | ||||
-rw-r--r-- | erts/emulator/test/distribution_SUITE.erl | 280 | ||||
-rw-r--r-- | erts/etc/common/erlexec.c | 4 | ||||
-rw-r--r-- | erts/preloaded/ebin/erlang.beam | bin | 132912 -> 133052 bytes | |||
-rw-r--r-- | erts/preloaded/src/erlang.erl | 11 |
16 files changed, 829 insertions, 187 deletions
diff --git a/erts/doc/src/erl_cmd.xml b/erts/doc/src/erl_cmd.xml index b56caee569..9069061ae4 100644 --- a/erts/doc/src/erl_cmd.xml +++ b/erts/doc/src/erl_cmd.xml @@ -1048,6 +1048,23 @@ $ <input>erl \ <p>Memory allocator-specific flags. For more information, see <seecref marker="erts_alloc"><c>erts_alloc(3)</c></seecref>.</p> </item> + <tag><marker id="+pad"/><c>+pad true|false</c></tag> + <item> + <p>Since: OTP @OTP-18374@</p> + <p> + The boolean value used with the <c>+pad</c> parameter determines + the default value of the + <seeerl marker="erlang#process_flag_async_dist"> + <c>async_dist</c></seeerl> process flag of newly spawned processes. + By default, if no <c>+pad</c> command line option is passed, + the <c>async_dist</c> flag will be set to <c>false</c>. + </p> + <p> + The value used in runtime can be inspected by calling + <seeerl marker="erlang#system_info_async_dist"> + <c>erlang:system_info(async_dist)</c></seeerl>. + </p> + </item> <tag><marker id="+pc"/><marker id="printable_character_range"/> <c><![CDATA[+pc Range]]></c></tag> <item> @@ -1793,6 +1810,11 @@ $ <input>erl \ limit is per distribution channel. A higher limit gives lower latency and higher throughput at the expense of higher memory use.</p> + <p> + This limit only affects processes that have disabled + <seeerl marker="erlang#process_flag_async_dist"><i>fully + asynchronous distributed signaling</i></seeerl>. + </p> </item> <tag><marker id="+zdntgc"/><c>+zdntgc time</c></tag> <item> diff --git a/erts/doc/src/erlang.xml b/erts/doc/src/erlang.xml index 27c72ce8b2..d409905ba3 100644 --- a/erts/doc/src/erlang.xml +++ b/erts/doc/src/erlang.xml @@ -6276,6 +6276,68 @@ receive_replies(ReqId, N, Acc) -> <func> <name name="process_flag" arity="2" clause_i="1" + anchor="process_flag_async_dist" since="OTP @OTP-18374@"/> + <fsummary> + Enable or disable fully asynchronous distributed signaling + for the calling process. 
+ </fsummary> + <desc> + <p> + Enable or disable <i>fully asynchronous distributed signaling</i> + for the calling process. When disabled, which is the default, the + process sending a distributed signal will block in the send + operation if the buffer for the distribution channel reaches the + <seecom marker="erts:erl#+zdbbl">distribution buffer busy + limit</seecom>. The process will remain blocked until the buffer + shrinks enough. This might in some cases take a substantial amount + of time. When <c>async_dist</c> is enabled, send operations of + distributed signals will always buffer the signal on the outgoing + distribution channel and then immediately return. That is, these + send operations will <em>never</em> block the sending process. + </p> + <note><p> + Since no flow control is enforced by the runtime system when the + <c>async_dist</c> process flag is enabled, you need to make sure + that flow control for such data is implemented, or that the amount + of such data is known to always be limited. Unlimited signaling with + <c>async_dist</c> enabled in the absence of flow control will + typically cause the sending runtime system to crash on an out of + memory condition. + </p></note> + <p> + Blocking due to disabled <c>async_dist</c> can be monitored by + <seemfa marker="#system_monitor/2"><c>erlang:system_monitor()</c></seemfa> + using the + <seeerl marker="#busy_dist_port"><c>busy_dist_port</c></seeerl> + option. Only data buffered by processes which (at the time of sending + a signal) have disabled <c>async_dist</c> will be counted when + determining whether or not an operation should block the caller. + </p> + <p> + The <c>async_dist</c> flag can also be set on a new process when + spawning it using the + <seemfa marker="#spawn_opt/4"><c>spawn_opt()</c></seemfa> BIF with the + option <seeerl marker="#spawn_opt_async_dist"><c>{async_dist, + Enable}</c></seeerl>. 
The default <c>async_dist</c> flag to use on + newly spawned processes can be set by passing the command line + argument <seecom marker="erl#+pad"><c>+pad + <boolean></c></seecom> when starting the runtime system. If the + <c>+pad <boolean></c> command line argument is not passed, the + default value of the <c>async_dist</c> flag will be <c>false</c>. + </p> + <p> + You can inspect the state of the <c>async_dist</c> process flag of a + process by calling <seeerl marker="#process_info_async_dist"> + <c>process_info(Pid, async_dist)</c></seeerl>. + </p> + <p> + Returns the old value of the <c>async_dist</c> flag. + </p> + </desc> + </func> + + <func> + <name name="process_flag" arity="2" clause_i="2" anchor="process_flag_trap_exit" since=""/> <fsummary>Set process flag trap_exit for the calling process.</fsummary> <desc> @@ -6293,7 +6355,7 @@ receive_replies(ReqId, N, Acc) -> </func> <func> - <name name="process_flag" arity="2" clause_i="2" since=""/> + <name name="process_flag" arity="2" clause_i="3" since=""/> <fsummary>Set process flag error_handler for the calling process. </fsummary> <desc> @@ -6307,7 +6369,7 @@ receive_replies(ReqId, N, Acc) -> </func> <func> - <name name="process_flag" arity="2" clause_i="3" since="OTP 24.0"/> + <name name="process_flag" arity="2" clause_i="4" since="OTP 24.0"/> <fsummary>Set process flag fullsweep_after for the calling process. </fsummary> <desc> @@ -6318,7 +6380,7 @@ receive_replies(ReqId, N, Acc) -> </func> <func> - <name name="process_flag" arity="2" clause_i="4" + <name name="process_flag" arity="2" clause_i="5" anchor="process_flag_min_heap_size" since=""/> <fsummary>Set process flag min_heap_size for the calling process. </fsummary> @@ -6329,7 +6391,7 @@ receive_replies(ReqId, N, Acc) -> </func> <func> - <name name="process_flag" arity="2" clause_i="5" since="OTP R13B04"/> + <name name="process_flag" arity="2" clause_i="6" since="OTP R13B04"/> <fsummary>Set process flag min_bin_vheap_size for the calling process. 
</fsummary> <desc> @@ -6340,7 +6402,7 @@ receive_replies(ReqId, N, Acc) -> </func> <func> - <name name="process_flag" arity="2" clause_i="6" + <name name="process_flag" arity="2" clause_i="7" anchor="process_flag_max_heap_size" since="OTP 19.0"/> <fsummary>Set process flag max_heap_size for the calling process. </fsummary> @@ -6419,7 +6481,7 @@ receive_replies(ReqId, N, Acc) -> </func> <func> - <name name="process_flag" arity="2" clause_i="7" + <name name="process_flag" arity="2" clause_i="8" anchor="process_flag_message_queue_data" since="OTP 19.0"/> <fsummary>Set process flag message_queue_data for the calling process. </fsummary> @@ -6461,7 +6523,7 @@ receive_replies(ReqId, N, Acc) -> </func> <func> - <name name="process_flag" arity="2" clause_i="8" + <name name="process_flag" arity="2" clause_i="9" anchor="process_flag_priority" since=""/> <fsummary>Set process flag priority for the calling process.</fsummary> <type name="priority_level"/> @@ -6534,7 +6596,7 @@ receive_replies(ReqId, N, Acc) -> </func> <func> - <name name="process_flag" arity="2" clause_i="9" since=""/> + <name name="process_flag" arity="2" clause_i="10" since=""/> <fsummary>Set process flag save_calls for the calling process.</fsummary> <desc> <p><c><anno>N</anno></c> must be an integer in the interval 0..10000. @@ -6565,7 +6627,7 @@ receive_replies(ReqId, N, Acc) -> </func> <func> - <name name="process_flag" arity="2" clause_i="10" since=""/> + <name name="process_flag" arity="2" clause_i="11" since=""/> <fsummary>Set process flag sensitive for the calling process.</fsummary> <desc> <p>Sets or clears flag <c>sensitive</c> for the current process. 
@@ -6710,6 +6772,18 @@ receive_replies(ReqId, N, Acc) -> <p>Valid <c><anno>InfoTuple</anno></c>s with corresponding <c><anno>Item</anno></c>s:</p> <taglist> + <tag> + <marker id="process_info_async_dist"/> + <c>{async_dist, Enabled}</c> + </tag> + <item> + <p>Since: OTP @OTP-18374@</p> + <p> + Current value of the + <seeerl marker="erlang#process_flag_async_dist"> + <c>async_dist</c></seeerl> process flag. + </p> + </item> <tag><c>{backtrace, <anno>Bin</anno>}</c></tag> <item> <p>Binary <c><anno>Bin</anno></c> contains the same information @@ -7953,6 +8027,21 @@ true</pre> <c>process_flag(message_queue_data, <anno>MQD</anno>)</c></seeerl>.</p> </item> + <tag> + <marker id="spawn_opt_async_dist"/> + <c>{async_dist, Enabled}</c> + </tag> + <item> + <p>Since: OTP @OTP-18374@</p> + <p> + Set the + <seeerl marker="erlang#process_flag_async_dist"> + <c>async_dist</c></seeerl> process flag of the spawned process. + This option will override the default value set by the command + line argument + <seecom marker="erl#+pad"><c>+pad <boolean></c></seecom>. + </p> + </item> </taglist> </desc> </func> @@ -10983,6 +11072,8 @@ Metadata = #{ pid => pid(), </func> <func> + <name name="system_info" arity="1" clause_i="79" + anchor="system_info_async_dist" since="OTP @OTP-18374@"/> <!-- async_dist --> <name name="system_info" arity="1" clause_i="14" anchor="system_info_dist" since=""/> <!-- creation --> <name name="system_info" arity="1" clause_i="16" since="OTP 18.0"/> <!-- delayed_node_table_gc --> @@ -10995,6 +11086,17 @@ Metadata = #{ pid => pid(), <p>Returns information about Erlang Distribution in the current system as specified by <c><anno>Item</anno></c>:</p> <taglist> + <tag><marker id="system_info_async_dist"/><c>async_dist</c></tag> + <item> + <p>Since: OTP @OTP-18374@</p> + <p> + Returns the value of the command line argument + <seecom marker="erl#+pad">+pad <boolean></seecom> + which the runtime system uses. 
This value determines the default + <seeerl marker="erlang#process_flag_async_dist"> + <c>async_dist</c></seeerl> value for newly spawned processes. + </p> + </item> <tag><marker id="system_info_creation"/> <c>creation</c></tag> <item> @@ -11133,7 +11235,7 @@ Metadata = #{ pid => pid(), <!-- <name name="system_info" arity="1" clause_i="76"/> update_cpu_info --> <name name="system_info" arity="1" clause_i="77" since=""/> <!-- version --> <name name="system_info" arity="1" clause_i="78" since=""/> <!-- wordsize --> - <!-- <name name="system_info" arity="1" clause_i="79"/> overview --> + <!-- <name name="system_info" arity="1" clause_i="80"/> overview --> <!-- When adding any entry, make sure to update the overview clause_i --> <fsummary>Information about the system.</fsummary> <desc> @@ -11501,7 +11603,7 @@ Metadata = #{ pid => pid(), <c><anno>MonitorPid</anno></c>. <c>SusPid</c> is the pid that got suspended when sending to <c>Port</c>.</p> </item> - <tag><c>busy_dist_port</c></tag> + <tag><c>busy_dist_port</c><marker id="busy_dist_port"/></tag> <item> <p>If a process in the system gets suspended because it sends to a process on a remote node whose inter-node diff --git a/erts/emulator/beam/atom.names b/erts/emulator/beam/atom.names index 502697a432..274a8e23d4 100644 --- a/erts/emulator/beam/atom.names +++ b/erts/emulator/beam/atom.names @@ -110,6 +110,7 @@ atom arg0 atom arity atom asn1 atom async +atom async_dist atom asynchronous atom atom atom atom_used diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c index 24b411633f..8d27d0b080 100644 --- a/erts/emulator/beam/bif.c +++ b/erts/emulator/beam/bif.c @@ -1780,7 +1780,21 @@ static Eterm process_flag_aux(Process *c_p, int *redsp, Eterm flag, Eterm val) BIF_RETTYPE process_flag_2(BIF_ALIST_2) { Eterm old_value; - if (BIF_ARG_1 == am_error_handler) { + + if (BIF_ARG_1 == am_async_dist) { + old_value = (BIF_P->flags & F_ASYNC_DIST) ? 
am_true : am_false; + if (BIF_ARG_2 == am_false) { + BIF_P->flags &= ~F_ASYNC_DIST; + } + else if (BIF_ARG_2 == am_true) { + BIF_P->flags |= F_ASYNC_DIST; + } + else { + goto error; + } + BIF_RET(old_value); + } + else if (BIF_ARG_1 == am_error_handler) { if (is_not_atom(BIF_ARG_2)) { goto error; } diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index c35b8ef08b..6ac22dfab9 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -211,6 +211,37 @@ struct { ErlHeapFragment *bp; } nodedown; +/* + * Dist entry queue flags are only modified while + * the dist entry queue lock is held... + */ +static ERTS_INLINE erts_aint32_t +de_qflags_read(DistEntry *dep) +{ + return erts_atomic32_read_nob(&dep->qflgs); +} + +static ERTS_INLINE erts_aint32_t +de_qflags_read_set(DistEntry *dep, erts_aint32_t set) +{ + erts_aint32_t qflgs, new_qflgs; + ERTS_LC_ASSERT(erts_lc_mtx_is_locked(&dep->qlock)); + new_qflgs = qflgs = erts_atomic32_read_nob(&dep->qflgs); + new_qflgs |= set; + erts_atomic32_set_nob(&dep->qflgs, new_qflgs); + return qflgs; +} + +static ERTS_INLINE erts_aint32_t +de_qflags_read_unset(DistEntry *dep, erts_aint32_t unset) +{ + erts_aint32_t qflgs, new_qflgs; + ERTS_LC_ASSERT(erts_lc_mtx_is_locked(&dep->qlock)); + new_qflgs = qflgs = erts_atomic32_read_nob(&dep->qflgs); + new_qflgs &= ~unset; + erts_atomic32_set_nob(&dep->qflgs, new_qflgs); + return qflgs; +} static void delete_cache(ErtsAtomCache *cache) @@ -250,7 +281,7 @@ get_suspended_on_de(DistEntry *dep, erts_aint32_t unset_qflgs) { erts_aint32_t qflgs; ERTS_LC_ASSERT(erts_lc_mtx_is_locked(&dep->qlock)); - qflgs = erts_atomic32_read_band_acqb(&dep->qflgs, ~unset_qflgs); + qflgs = de_qflags_read_unset(dep, unset_qflgs); qflgs &= ~unset_qflgs; if (qflgs & ERTS_DE_QFLG_EXIT) { /* No resume when exit has been scheduled */ @@ -1010,14 +1041,14 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) } if (dep->state == ERTS_DE_STATE_EXITING) { - 
ASSERT(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT); + ASSERT(de_qflags_read(dep) & ERTS_DE_QFLG_EXIT); } else { ASSERT(dep->state == ERTS_DE_STATE_CONNECTED); dep->state = ERTS_DE_STATE_EXITING; erts_mtx_lock(&dep->qlock); - ASSERT(!(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT)); - erts_atomic32_read_bor_relb(&dep->qflgs, ERTS_DE_QFLG_EXIT); + ASSERT(!(de_qflags_read(dep) & ERTS_DE_QFLG_EXIT)); + de_qflags_read_set(dep, ERTS_DE_QFLG_EXIT); erts_mtx_unlock(&dep->qlock); } @@ -1043,6 +1074,8 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) suspendees = get_suspended_on_de(dep, ERTS_DE_QFLGS_ALL); erts_mtx_unlock(&dep->qlock); + + erts_atomic32_set_relb(&dep->notify, 0); erts_atomic_set_nob(&dep->dist_cmd_scheduled, 0); dep->send = NULL; @@ -1144,7 +1177,8 @@ void init_dist(void) static ERTS_INLINE ErtsDistOutputBuf * alloc_dist_obufs(byte **extp, TTBEncodeContext *ctx, - Uint data_size, Uint fragments, Uint vlen) + Uint data_size, Uint fragments, Uint vlen, + int ignore_busy) { int ix; ErtsDistOutputBuf *obuf; @@ -1175,6 +1209,7 @@ alloc_dist_obufs(byte **extp, TTBEncodeContext *ctx, erts_refc_add(&bin->intern.refc, fragments - 1, 1); for (ix = 0; ix < fragments; ix++) { + obuf[ix].ignore_busy = ignore_busy; obuf[ix].bin = bin; obuf[ix].eiov = &ctx->fragment_eiovs[ix]; #ifdef DEBUG @@ -1220,6 +1255,81 @@ size_obuf(ErtsDistOutputBuf *obuf) return sz; } +static ERTS_INLINE void +get_obuf_sizes(ErtsDistOutputBuf *obuf, Sint *size, Sint *ignore_size) +{ + Sint sz = size_obuf(obuf); + ASSERT(sz >= 0); + *size = sz; + *ignore_size = obuf->ignore_busy ? 
sz : 0; +} + +static ERTS_INLINE void +add_obuf_sizes(ErtsDistOutputBuf *obuf, Sint *size, Sint *ignore_size) +{ + Sint sz, isz; + get_obuf_sizes(obuf, &sz, &isz); + *size += sz; + *ignore_size += isz; +} + +static ERTS_INLINE void +subtract_obuf_sizes(ErtsDistOutputBuf *obuf, Sint *size, Sint *ignore_size) +{ + Sint sz, isz; + get_obuf_sizes(obuf, &sz, &isz); + *size -= sz; + *ignore_size -= isz; +} + +static ERTS_INLINE void +update_qsizes(DistEntry *dep, int *empty_fillp, Sint *qsizep, + Sint add_total_qsize, Sint ignore_qsize) +{ + /* + * All modifications of the 'total_qsize' and 'qsize' fields are + * made while holding the 'qlock', so read/modify/write of each + * field does not need to be atomic. Readers without the lock will + * still see consistent updates of each 'field'. + */ + erts_aint_t qsize, add_qsize; + + ERTS_LC_ASSERT(erts_lc_mtx_is_locked(&dep->qlock)); + + if (empty_fillp) + *empty_fillp = 0; + + if (add_total_qsize) { + qsize = erts_atomic_read_nob(&dep->total_qsize); + qsize += (erts_aint_t) add_total_qsize; + if (empty_fillp && qsize == add_total_qsize) + *empty_fillp = !0; + erts_atomic_set_nob(&dep->total_qsize, (erts_aint_t) qsize); + } + + add_qsize = (erts_aint_t) (add_total_qsize - ignore_qsize); + if (add_qsize) { + qsize = erts_atomic_read_nob(&dep->qsize); + qsize += add_qsize; + if (qsizep) + *qsizep = qsize; + erts_atomic_set_nob(&dep->qsize, (erts_aint_t) qsize); + } + else if (qsizep) { + *qsizep = erts_atomic_read_nob(&dep->qsize); + } + +#ifdef DEBUG + { + erts_aint_t tqsize = erts_atomic_read_nob(&dep->total_qsize); + qsize = erts_atomic_read_nob(&dep->qsize); + ASSERT(tqsize >= 0); + ASSERT(qsize >= 0); + ASSERT(tqsize >= qsize); + } +#endif +} + static ErtsDistOutputBuf* clear_de_out_queues(DistEntry* dep) { ErtsDistOutputBuf *obuf; @@ -1250,21 +1360,19 @@ static ErtsDistOutputBuf* clear_de_out_queues(DistEntry* dep) static void free_de_out_queues(DistEntry* dep, ErtsDistOutputBuf *obuf) { - Sint obufsize = 0; + Sint 
obufsize = 0, ignore_obufsize = 0; while (obuf) { ErtsDistOutputBuf *fobuf; fobuf = obuf; obuf = obuf->next; - obufsize += size_obuf(fobuf); + add_obuf_sizes(fobuf, &obufsize, &ignore_obufsize); free_dist_obuf(fobuf, !0); } if (obufsize) { erts_mtx_lock(&dep->qlock); - ASSERT(erts_atomic_read_nob(&dep->qsize) >= obufsize); - erts_atomic_add_nob(&dep->qsize, - (erts_aint_t) -obufsize); + update_qsizes(dep, NULL, NULL, -obufsize, -ignore_obufsize); erts_mtx_unlock(&dep->qlock); } } @@ -3031,11 +3139,17 @@ retry: goto fail; } - if (no_suspend && proc) { - if (erts_atomic32_read_acqb(&dep->qflgs) & ERTS_DE_QFLG_BUSY) { - res = ERTS_DSIG_PREP_WOULD_SUSPEND; - goto fail; - } + if (!proc || (proc->flags & F_ASYNC_DIST)) { + ctx->ignore_busy = !0; + } + else { + ctx->ignore_busy = 0; + if (no_suspend) { + if (de_qflags_read(dep) & ERTS_DE_QFLG_BUSY) { + res = ERTS_DSIG_PREP_WOULD_SUSPEND; + goto fail; + } + } } ctx->c_p = proc; @@ -3209,7 +3323,8 @@ erts_dsig_send(ErtsDSigSendContext *ctx) + ((ctx->fragments - 1) * ERTS_DIST_FRAGMENT_HEADER_SIZE), ctx->fragments, - ctx->vlen); + ctx->vlen, + ctx->ignore_busy); ctx->alloced_fragments = ctx->fragments; /* Encode internal version of dist header */ ctx->dhdrp = ctx->extp; @@ -3360,20 +3475,23 @@ erts_dsig_send(ErtsDSigSendContext *ctx) ctx->fragments = 0; } else { - Sint qsize = erts_atomic_read_nob(&dep->qsize); + Sint qsize = (Sint) erts_atomic_read_nob(&dep->qsize); erts_aint32_t qflgs; ErtsProcList *plp = NULL; Eterm notify_proc = NIL; Sint obsz; - int fragments; + int fragments, empty_fill; /* Calculate how many fragments to send. This depends on the available space in the distr queue and the amount of remaining reductions. 
*/ for (fragments = 0, obsz = 0; - fragments < ctx->fragments && - ((ctx->reds > 0 && (qsize + obsz) < erts_dist_buf_busy_limit) || - ctx->no_trap || ctx->no_suspend); + (fragments < ctx->fragments + && ((ctx->reds > 0 + && (ctx->ignore_busy + || (qsize + obsz < erts_dist_buf_busy_limit))) + || ctx->no_trap + || ctx->no_suspend)); fragments++) { #ifdef DEBUG int reds = 100; @@ -3389,33 +3507,27 @@ erts_dsig_send(ErtsDSigSendContext *ctx) (!ctx->no_trap && !ctx->no_suspend)); erts_mtx_lock(&dep->qlock); - qsize = erts_atomic_add_read_mb(&dep->qsize, (erts_aint_t) obsz); - ASSERT(qsize >= obsz); - qflgs = erts_atomic32_read_nob(&dep->qflgs); - if (!(qflgs & ERTS_DE_QFLG_BUSY) && qsize >= erts_dist_buf_busy_limit) { - erts_atomic32_read_bor_relb(&dep->qflgs, ERTS_DE_QFLG_BUSY); + update_qsizes(dep, &empty_fill, &qsize, obsz, + ctx->ignore_busy ? obsz : 0); + qflgs = de_qflags_read(dep); + if (!(qflgs & ERTS_DE_QFLG_BUSY) + && qsize >= erts_dist_buf_busy_limit) { + qflgs = de_qflags_read_set(dep, ERTS_DE_QFLG_BUSY); qflgs |= ERTS_DE_QFLG_BUSY; } - if (qsize == obsz && (qflgs & ERTS_DE_QFLG_REQ_INFO)) { - /* Previously empty queue and info requested... */ - qflgs = erts_atomic32_read_band_mb(&dep->qflgs, - ~ERTS_DE_QFLG_REQ_INFO); - if (qflgs & ERTS_DE_QFLG_REQ_INFO) { + if (empty_fill && is_internal_pid(dep->cid)) { + erts_aint32_t notify; + notify = erts_atomic32_xchg_mb(&dep->notify, + (erts_aint32_t) 0); + if (notify) { + /* + * Previously empty queue and notification + * requested... + */ notify_proc = dep->cid; ASSERT(is_internal_pid(notify_proc)); } - /* else: requester will send itself the message... 
*/ - qflgs &= ~ERTS_DE_QFLG_REQ_INFO; } - if (!ctx->no_suspend && (qflgs & ERTS_DE_QFLG_BUSY)) { - erts_mtx_unlock(&dep->qlock); - - plp = erts_proclist_create(ctx->c_p); - - erts_suspend(ctx->c_p, ERTS_PROC_LOCK_MAIN, NULL); - suspended = 1; - erts_mtx_lock(&dep->qlock); - } ASSERT(fragments < 2 || (get_int64(&((char*)ctx->obuf->eiov->iov[1].iov_base)[10]) @@ -3433,30 +3545,41 @@ erts_dsig_send(ErtsDSigSendContext *ctx) ctx->obuf = &ctx->obuf[fragments]; } - if (!ctx->no_suspend) { - qflgs = erts_atomic32_read_nob(&dep->qflgs); - if (!(qflgs & ERTS_DE_QFLG_BUSY)) { - if (suspended) - resume = 1; /* was busy when we started, but isn't now */ + if ((qflgs & ERTS_DE_QFLG_BUSY) + && !ctx->ignore_busy + && !ctx->no_suspend) { + + erts_mtx_unlock(&dep->qlock); + + plp = erts_proclist_create(ctx->c_p); + + erts_suspend(ctx->c_p, ERTS_PROC_LOCK_MAIN, NULL); + suspended = 1; + + erts_mtx_lock(&dep->qlock); + + qflgs = de_qflags_read(dep); + if (qflgs & ERTS_DE_QFLG_BUSY) { + /* Enqueue suspended process on dist entry */ + ASSERT(plp); + erts_proclist_store_last(&dep->suspended, plp); + } + else { + resume = 1; /* was busy, but isn't now */ #ifdef USE_VM_PROBES - if (resume && DTRACE_ENABLED(dist_port_not_busy)) { - DTRACE_CHARBUF(port_str, 64); - DTRACE_CHARBUF(remote_str, 64); - - erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)), - "%T", cid); - erts_snprintf(remote_str, sizeof(DTRACE_CHARBUF_NAME(remote_str)), - "%T", dep->sysname); - DTRACE3(dist_port_not_busy, erts_this_node_sysname, - port_str, remote_str); - } + if (resume && DTRACE_ENABLED(dist_port_not_busy)) { + DTRACE_CHARBUF(port_str, 64); + DTRACE_CHARBUF(remote_str, 64); + + erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)), + "%T", cid); + erts_snprintf(remote_str, sizeof(DTRACE_CHARBUF_NAME(remote_str)), + "%T", dep->sysname); + DTRACE3(dist_port_not_busy, erts_this_node_sysname, + port_str, remote_str); + } #endif - } - else { - /* Enqueue suspended process on dist entry */ - 
ASSERT(plp); - erts_proclist_store_last(&dep->suspended, plp); - } + } } erts_mtx_unlock(&dep->qlock); @@ -3674,13 +3797,64 @@ dist_port_commandv(Port *prt, ErtsDistOutputBuf *obuf) ? ((Sint) 1) \ : ((((Sint) (SZ)) >> 10) & ((Sint) ERTS_PORT_REDS_MASK__))) +#ifndef DEBUG +#define ERTS_DBG_CHK_DIST_QSIZE(DEP, PRT) +#else +#define ERTS_DBG_CHK_DIST_QSIZE(DEP, PRT) \ + dbg_check_dist_qsize((DEP), (PRT)) + +static void +dbg_check_dist_qsize(DistEntry *dep, Port *prt) +{ + int ix; + Sint sz = 0, isz = 0, tqsz, qsz; + ErtsDistOutputBuf *qs[2]; + + ERTS_LC_ASSERT(dep && erts_lc_mtx_is_locked(&dep->qlock)); + ASSERT(prt && erts_lc_is_port_locked(prt)); + ERTS_LC_ASSERT((erts_atomic32_read_nob(&prt->sched.flags) + & ERTS_PTS_FLG_EXIT) + || prt->common.id == dep->cid); + + tqsz = erts_atomic_read_nob(&dep->total_qsize); + qsz = erts_atomic_read_nob(&dep->qsize); + + ASSERT(tqsz >= 0); + ASSERT(qsz >= 0); + ASSERT(tqsz >= qsz); + + qs[0] = dep->out_queue.first; + qs[1] = dep->finalized_out_queue.first; + + for (ix = 0; ix < sizeof(qs)/sizeof(qs[0]); ix++) { + ErtsDistOutputBuf *obuf = qs[ix]; + while (obuf) { + add_obuf_sizes(obuf, &sz, &isz); + obuf = obuf->next; + } + } + + ASSERT(tqsz == sz); + ASSERT(qsz == sz - isz); +} + +#endif + int erts_dist_command(Port *prt, int initial_reds) { Sint reds = initial_reds - ERTS_PORT_REDS_DIST_CMD_START; enum dist_entry_state state; Uint64 flags; - Sint qsize, obufsize = 0; + /* + * 'obufsize' and 'ignore_obufsize' contains the number of bytes removed + * from the queue which will be updated (in dep->total_qsize and + * dep->qsize) before we return from this function. Note that + * 'obufsize' and 'ignore_obufsize' may be negative if we added to the + * queue size. This may occur since finalization of a buffer may increase + * buffer size. 
+ */ + Sint qsize, obufsize = 0, ignore_obufsize = 0; ErtsDistOutputQueue oq, foq; DistEntry *dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY); Uint (*send)(Port *prt, ErtsDistOutputBuf *obuf); @@ -3716,6 +3890,7 @@ erts_dist_command(Port *prt, int initial_reds) */ erts_mtx_lock(&dep->qlock); + ERTS_DBG_CHK_DIST_QSIZE(dep, prt); oq.first = dep->out_queue.first; oq.last = dep->out_queue.last; dep->out_queue.first = NULL; @@ -3727,23 +3902,6 @@ erts_dist_command(Port *prt, int initial_reds) dep->finalized_out_queue.first = NULL; dep->finalized_out_queue.last = NULL; -#ifdef DEBUG - { - Uint sz = 0; - ErtsDistOutputBuf *curr = oq.first; - while (curr) { - sz += size_obuf(curr); - curr = curr->next; - } - curr = foq.first; - while (curr) { - sz += size_obuf(curr); - curr = curr->next; - } - ASSERT(sz <= erts_atomic_read_nob(&dep->qsize)); - } -#endif - sched_flags = erts_atomic32_read_nob(&prt->sched.flags); if (reds < 0) @@ -3754,7 +3912,7 @@ erts_dist_command(Port *prt, int initial_reds) do { Uint size; ErtsDistOutputBuf *fob; - obufsize += size_obuf(foq.first); + add_obuf_sizes(foq.first, &obufsize, &ignore_obufsize); size = (*send)(prt, foq.first); erts_atomic64_inc_nob(&dep->out); esdp->io.out += (Uint64) size; @@ -3782,9 +3940,9 @@ erts_dist_command(Port *prt, int initial_reds) ob = oq.first; ASSERT(ob); do { - obufsize += size_obuf(ob); + add_obuf_sizes(ob, &obufsize, &ignore_obufsize); reds = erts_encode_ext_dist_header_finalize(ob, dep, flags, reds); - obufsize -= size_obuf(ob); + subtract_obuf_sizes(ob, &obufsize, &ignore_obufsize); if (reds < 0) break; /* finalize needs to be restarted... 
*/ last_finalized = ob; @@ -3822,12 +3980,11 @@ erts_dist_command(Port *prt, int initial_reds) int preempt = 0; while (oq.first && !preempt) { ErtsDistOutputBuf *fob; - Uint size, obsz; - obufsize += size_obuf(oq.first); + Uint size; + add_obuf_sizes(oq.first, &obufsize, &ignore_obufsize); reds = erts_encode_ext_dist_header_finalize(oq.first, dep, flags, reds); - obsz = size_obuf(oq.first); - obufsize -= obsz; if (reds < 0) { /* finalize needs to be restarted... */ + subtract_obuf_sizes(oq.first, &obufsize, &ignore_obufsize); preempt = 1; break; } @@ -3836,7 +3993,6 @@ erts_dist_command(Port *prt, int initial_reds) esdp->io.out += (Uint64) size; reds -= ERTS_PORT_REDS_DIST_CMD_DATA(size); fob = oq.first; - obufsize += obsz; oq.first = oq.first->next; free_dist_obuf(fob, !0); sched_flags = erts_atomic32_read_nob(&prt->sched.flags); @@ -3867,13 +4023,12 @@ erts_dist_command(Port *prt, int initial_reds) * processes. */ erts_mtx_lock(&dep->qlock); - de_busy = !!(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_BUSY); - qsize = (Sint) erts_atomic_add_read_nob(&dep->qsize, - (erts_aint_t) -obufsize); - ASSERT(qsize >= 0); - obufsize = 0; + de_busy = !!(de_qflags_read(dep) & ERTS_DE_QFLG_BUSY); + update_qsizes(dep, NULL, &qsize, -obufsize, -ignore_obufsize); + obufsize = ignore_obufsize = 0; if (!(sched_flags & ERTS_PTS_FLG_BUSY_PORT) - && de_busy && qsize < erts_dist_buf_busy_limit) { + && de_busy + && qsize < erts_dist_buf_busy_limit) { int resumed; ErtsProcList *suspendees = get_suspended_on_de(dep, ERTS_DE_QFLG_BUSY); erts_mtx_unlock(&dep->qlock); @@ -3889,17 +4044,7 @@ erts_dist_command(Port *prt, int initial_reds) done: - if (obufsize != 0) { - erts_mtx_lock(&dep->qlock); -#ifdef DEBUG - qsize = (Sint) erts_atomic_add_read_nob(&dep->qsize, - (erts_aint_t) -obufsize); - ASSERT(qsize >= 0); -#else - erts_atomic_add_nob(&dep->qsize, (erts_aint_t) -obufsize); -#endif - erts_mtx_unlock(&dep->qlock); - } + ASSERT(!ignore_obufsize || obufsize); ASSERT(!!foq.first == 
!!foq.last); ASSERT(!dep->finalized_out_queue.first); @@ -3910,7 +4055,21 @@ erts_dist_command(Port *prt, int initial_reds) dep->finalized_out_queue.last = foq.last; } - /* Avoid wrapping reduction counter... */ + if (obufsize != 0) { + erts_mtx_lock(&dep->qlock); + update_qsizes(dep, NULL, NULL, -obufsize, -ignore_obufsize); + ERTS_DBG_CHK_DIST_QSIZE(dep, prt); + erts_mtx_unlock(&dep->qlock); + } +#ifdef DEBUG + else { + erts_mtx_lock(&dep->qlock); + ERTS_DBG_CHK_DIST_QSIZE(dep, prt); + erts_mtx_unlock(&dep->qlock); + } +#endif + + /* Avoid wrapping reduction counter... */ if (reds < INT_MIN/2) reds = INT_MIN/2; @@ -3940,7 +4099,7 @@ erts_dist_command(Port *prt, int initial_reds) while (oq.first) { ErtsDistOutputBuf *fob = oq.first; oq.first = oq.first->next; - obufsize += size_obuf(fob); + add_obuf_sizes(fob, &obufsize, &ignore_obufsize); free_dist_obuf(fob, !0); } @@ -3949,14 +4108,15 @@ erts_dist_command(Port *prt, int initial_reds) } else { if (oq.first) { + erts_mtx_lock(&dep->qlock); + update_qsizes(dep, NULL, NULL, -obufsize, -ignore_obufsize); + obufsize = ignore_obufsize = 0; + /* - * Unhandle buffers need to be put back first + * Unhandled buffers need to be put back first * in out_queue. */ - erts_mtx_lock(&dep->qlock); - erts_atomic_add_nob(&dep->qsize, -obufsize); - obufsize = 0; - oq.last->next = dep->out_queue.first; + oq.last->next = dep->out_queue.first; dep->out_queue.first = oq.first; if (!dep->out_queue.last) dep->out_queue.last = oq.last; @@ -3972,7 +4132,7 @@ BIF_RETTYPE dist_ctrl_get_data_notification_1(BIF_ALIST_1) { DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(BIF_P); - erts_aint32_t qflgs; + erts_aint32_t notify; erts_aint_t qsize; Eterm receiver = NIL; Uint32 conn_id; @@ -3985,7 +4145,7 @@ dist_ctrl_get_data_notification_1(BIF_ALIST_1) /* * Caller is the only one that can consume from this queue - * and the only one that can set the req-info flag... + * and the only one that can set the notify field... 
*/ erts_de_rlock(dep); @@ -3997,23 +4157,21 @@ dist_ctrl_get_data_notification_1(BIF_ALIST_1) ASSERT(dep->cid == BIF_P->common.id); - qflgs = erts_atomic32_read_nob(&dep->qflgs); + notify = erts_atomic32_read_nob(&dep->notify); - if (!(qflgs & ERTS_DE_QFLG_REQ_INFO)) { + if (!notify) { ERTS_THR_READ_MEMORY_BARRIER; - qsize = erts_atomic_read_nob(&dep->qsize); + qsize = erts_atomic_read_nob(&dep->total_qsize); ASSERT(qsize >= 0); if (qsize > 0) receiver = BIF_P->common.id; /* Notify ourselves... */ - else { /* Empty queue; set req-info flag... */ - qflgs = erts_atomic32_read_bor_mb(&dep->qflgs, - ERTS_DE_QFLG_REQ_INFO); - qsize = erts_atomic_read_nob(&dep->qsize); + else { /* Empty queue; set the notify field... */ + notify = erts_atomic32_xchg_mb(&dep->notify, (erts_aint32_t) !0); + qsize = erts_atomic_read_nob(&dep->total_qsize); ASSERT(qsize >= 0); if (qsize > 0) { - qflgs = erts_atomic32_read_band_mb(&dep->qflgs, - ~ERTS_DE_QFLG_REQ_INFO); - if (qflgs & ERTS_DE_QFLG_REQ_INFO) + notify = erts_atomic32_xchg_mb(&dep->notify, (erts_aint32_t) 0); + if (notify) receiver = BIF_P->common.id; /* Notify ourselves... */ /* else: someone else will notify us... */ } @@ -4201,7 +4359,7 @@ dist_get_stat_1(BIF_ALIST_1) } read = (Sint64) erts_atomic64_read_nob(&dep->in); write = (Sint64) erts_atomic64_read_nob(&dep->out); - pend = (Sint64) erts_atomic_read_nob(&dep->qsize); + pend = (Sint64) erts_atomic_read_nob(&dep->total_qsize); erts_de_runlock(dep); @@ -4256,10 +4414,19 @@ dist_ctrl_get_data_1(BIF_ALIST_1) { DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(BIF_P); const Sint initial_reds = ERTS_BIF_REDS_LEFT(BIF_P); - Sint reds = initial_reds, obufsize = 0, ix, vlen; + Sint reds = initial_reds, ix, vlen; + /* + * 'obufsize' and 'ignore_obufsize' contains the number of bytes removed + * from the queue which will be updated (in dep->total_qsize and + * dep->qsize) before we return from this function. 
Note that + * 'obufsize' and 'ignore_obufsize' may be negative if we added to the + * queue size. This may occur since finalization of a buffer may increase + * buffer size. + */ + Sint obufsize = 0, ignore_obufsize = 0; ErtsDistOutputBuf *obuf; Eterm *hp, res; - erts_aint_t qsize; + Sint qsize; Uint32 conn_id, get_size; Uint hsz = 0, data_sz; SysIOVec *iov; @@ -4298,7 +4465,7 @@ dist_ctrl_get_data_1(BIF_ALIST_1) { if (!dep->tmp_out_queue.first) { ASSERT(!dep->tmp_out_queue.last); - qsize = erts_atomic_read_acqb(&dep->qsize); + qsize = (Sint) erts_atomic_read_acqb(&dep->total_qsize); if (qsize > 0) { erts_mtx_lock(&dep->qlock); dep->tmp_out_queue.first = dep->out_queue.first; @@ -4317,13 +4484,16 @@ dist_ctrl_get_data_1(BIF_ALIST_1) } obuf = dep->tmp_out_queue.first; - obufsize += size_obuf(obuf); + add_obuf_sizes(obuf, &obufsize, &ignore_obufsize); reds = erts_encode_ext_dist_header_finalize(obuf, dep, dep->dflags, reds); - obufsize -= size_obuf(obuf); + subtract_obuf_sizes(obuf, &obufsize, &ignore_obufsize); if (reds < 0) { /* finalize needs to be restarted... 
*/ erts_de_runlock(dep); - if (obufsize) - erts_atomic_add_nob(&dep->qsize, (erts_aint_t) -obufsize); + if (obufsize) { + erts_mtx_lock(&dep->qlock); + update_qsizes(dep, NULL, NULL, -obufsize, -ignore_obufsize); + erts_mtx_unlock(&dep->qlock); + } ERTS_BIF_YIELD1(BIF_TRAP_EXPORT(BIF_dist_ctrl_get_data_1), BIF_P, BIF_ARG_1); } @@ -4404,16 +4574,18 @@ dist_ctrl_get_data_1(BIF_ALIST_1) hp += 2; } - obufsize += size_obuf(obuf); + add_obuf_sizes(obuf, &obufsize, &ignore_obufsize); - qsize = erts_atomic_add_read_nob(&dep->qsize, (erts_aint_t) -obufsize); + erts_mtx_lock(&dep->qlock); - ASSERT(qsize >= 0); + update_qsizes(dep, NULL, &qsize, -obufsize, -ignore_obufsize); - if (qsize < erts_dist_buf_busy_limit/2 - && (erts_atomic32_read_acqb(&dep->qflgs) & ERTS_DE_QFLG_BUSY)) { + if (qsize >= erts_dist_buf_busy_limit/2 + || !(de_qflags_read(dep) & ERTS_DE_QFLG_BUSY)) { + erts_mtx_unlock(&dep->qlock); + } + else { ErtsProcList *resume_procs = NULL; - erts_mtx_lock(&dep->qlock); resume_procs = get_suspended_on_de(dep, ERTS_DE_QFLG_BUSY); erts_mtx_unlock(&dep->qlock); if (resume_procs) { @@ -4468,8 +4640,8 @@ static void kill_connection(DistEntry *dep) dep->state = ERTS_DE_STATE_EXITING; erts_mtx_lock(&dep->qlock); - ASSERT(!(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT)); - erts_atomic32_read_bor_nob(&dep->qflgs, ERTS_DE_QFLG_EXIT); + ASSERT(!(de_qflags_read(dep) & ERTS_DE_QFLG_EXIT)); + de_qflags_read_set(dep, ERTS_DE_QFLG_EXIT); erts_mtx_unlock(&dep->qlock); if (is_internal_port(dep->cid)) @@ -5069,7 +5241,6 @@ setup_connection_epiloge_rwunlock(Process *c_p, DistEntry *dep, Process *net_kernel) { Eterm notify_proc = NIL; - erts_aint32_t qflgs; ErtsProcLocks nk_locks; int success = 0; @@ -5105,17 +5276,18 @@ setup_connection_epiloge_rwunlock(Process *c_p, DistEntry *dep, erts_set_dist_entry_connected(dep, ctrlr, flags); notify_proc = NIL; - if (erts_atomic_read_nob(&dep->qsize)) { + if (erts_atomic_read_nob(&dep->total_qsize)) { if (is_internal_port(dep->cid)) { 
erts_schedule_dist_command(NULL, dep); } else { + erts_aint32_t notify; ERTS_THR_READ_MEMORY_BARRIER; - qflgs = erts_atomic32_read_nob(&dep->qflgs); - if (qflgs & ERTS_DE_QFLG_REQ_INFO) { - qflgs = erts_atomic32_read_band_mb(&dep->qflgs, - ~ERTS_DE_QFLG_REQ_INFO); - if (qflgs & ERTS_DE_QFLG_REQ_INFO) { + notify = erts_atomic32_read_nob(&dep->notify); + if (notify) { + notify = erts_atomic32_xchg_mb(&dep->notify, + (erts_aint32_t) 0); + if (notify) { notify_proc = dep->cid; ASSERT(is_internal_pid(notify_proc)); } @@ -5335,6 +5507,7 @@ Sint erts_abort_pending_connection_rwunlock(DistEntry* dep, ASSERT(!dep->finalized_out_queue.first); resume_procs = get_suspended_on_de(dep, ERTS_DE_QFLGS_ALL); erts_mtx_unlock(&dep->qlock); + erts_atomic32_set_relb(&dep->notify, 0); erts_atomic_set_nob(&dep->dist_cmd_scheduled, 0); dep->send = NULL; diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h index 372cdfc1a7..a5365fcb99 100644 --- a/erts/emulator/beam/dist.h +++ b/erts/emulator/beam/dist.h @@ -344,6 +344,7 @@ typedef struct erts_dsig_send_context { int connect; int no_suspend; int no_trap; + int ignore_busy; Eterm ctl; Eterm msg; diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c index 35dfdd7983..b7642f2212 100644 --- a/erts/emulator/beam/erl_bif_info.c +++ b/erts/emulator/beam/erl_bif_info.c @@ -777,6 +777,7 @@ collect_one_suspend_monitor(ErtsMonitor *mon, void *vsmicp, Sint reds) #define ERTS_PI_IX_MAGIC_REF 34 #define ERTS_PI_IX_FULLSWEEP_AFTER 35 #define ERTS_PI_IX_PARENT 36 +#define ERTS_PI_IX_ASYNC_DIST 37 #define ERTS_PI_FLAG_SINGELTON (1 << 0) #define ERTS_PI_FLAG_ALWAYS_WRAP (1 << 1) @@ -833,7 +834,8 @@ static ErtsProcessInfoArgs pi_args[] = { {am_garbage_collection_info, ERTS_PROCESS_GC_INFO_MAX_SIZE, 0, ERTS_PROC_LOCK_MAIN}, {am_magic_ref, 0, ERTS_PI_FLAG_FORCE_SIG_SEND, ERTS_PROC_LOCK_MAIN}, {am_fullsweep_after, 0, 0, ERTS_PROC_LOCK_MAIN}, - {am_parent, 0, 0, ERTS_PROC_LOCK_MAIN} + {am_parent, 0, 0, 
ERTS_PROC_LOCK_MAIN}, + {am_async_dist, 0, 0, ERTS_PROC_LOCK_MAIN} }; #define ERTS_PI_ARGS ((int) (sizeof(pi_args)/sizeof(pi_args[0]))) @@ -954,6 +956,8 @@ pi_arg2ix(Eterm arg) return ERTS_PI_IX_FULLSWEEP_AFTER; case am_parent: return ERTS_PI_IX_PARENT; + case am_async_dist: + return ERTS_PI_IX_ASYNC_DIST; default: return -1; } @@ -2129,6 +2133,10 @@ process_info_aux(Process *c_p, } break; + case ERTS_PI_IX_ASYNC_DIST: + res = (rp->flags & F_ASYNC_DIST) ? am_true : am_false; + break; + case ERTS_PI_IX_MAGIC_REF: { Uint sz = 0; (void) bld_magic_ref_bin_list(NULL, &sz, &MSO(rp)); @@ -2785,6 +2793,10 @@ BIF_RETTYPE system_info_1(BIF_ALIST_1) res = new_binary(BIF_P, (byte *) dsbufp->str, dsbufp->str_len); erts_destroy_info_dsbuf(dsbufp); BIF_RET(res); + } else if (am_async_dist == BIF_ARG_1) { + BIF_RET((erts_default_spo_flags & SPO_ASYNC_DIST) + ? am_true + : am_false); } else if (ERTS_IS_ATOM_STR("dist_ctrl", BIF_ARG_1)) { DistEntry *dep; i = 0; diff --git a/erts/emulator/beam/erl_init.c b/erts/emulator/beam/erl_init.c index a4f6a1b723..577657bbb2 100644 --- a/erts/emulator/beam/erl_init.c +++ b/erts/emulator/beam/erl_init.c @@ -664,6 +664,7 @@ void erts_usage(void) erts_fprintf(stderr, "\n"); erts_fprintf(stderr, "-pc <set> control what characters are considered printable (default latin1)\n"); + erts_fprintf(stderr, "-pad bool set default process async data (default false)\n"); erts_fprintf(stderr, "-P number set maximum number of processes on this node;\n"); erts_fprintf(stderr, " valid range is [%d-%d]\n", ERTS_MIN_PROCESSES, ERTS_MAX_PROCESSES); @@ -1418,11 +1419,28 @@ erl_start(int argc, char **argv) erts_usage(); } erts_set_printable_characters(printable_chars); - break; - } else { - erts_fprintf(stderr, "%s unknown flag %s\n", argv[0], argv[i]); - erts_usage(); } + else { + char *sub_param = argv[i]+2; + if (has_prefix("ad", sub_param)) { + arg = get_arg(sub_param+2, argv[i+1], &i); + if (sys_strcmp("true", arg) == 0) { + erts_default_spo_flags |= 
SPO_ASYNC_DIST; + } + else if (sys_strcmp("false", arg) == 0) { + erts_default_spo_flags &= ~SPO_ASYNC_DIST; + } + else { + erts_fprintf(stderr, "bad async dist value %s\n", arg); + erts_usage(); + } + } + else { + erts_fprintf(stderr, "%s unknown flag %s\n", argv[0], argv[i]); + erts_usage(); + } + } + break; case 'f': if (!sys_strncmp(argv[i],"-fn",3)) { int warning_type = ERL_FILENAME_WARNING_WARNING; diff --git a/erts/emulator/beam/erl_node_tables.c b/erts/emulator/beam/erl_node_tables.c index 23a19064b1..2bec8ff20e 100644 --- a/erts/emulator/beam/erl_node_tables.c +++ b/erts/emulator/beam/erl_node_tables.c @@ -191,6 +191,8 @@ dist_table_alloc(void *dep_tmpl) erts_mtx_init(&dep->qlock, "dist_entry_out_queue", sysname, ERTS_LOCK_FLAGS_CATEGORY_DISTRIBUTION); erts_atomic32_init_nob(&dep->qflgs, 0); + erts_atomic32_init_nob(&dep->notify, 0); + erts_atomic_init_nob(&dep->total_qsize, 0); erts_atomic_init_nob(&dep->qsize, 0); erts_atomic64_init_nob(&dep->in, 0); erts_atomic64_init_nob(&dep->out, 0); @@ -729,8 +731,6 @@ erts_set_dist_entry_pending(DistEntry *dep) void erts_set_dist_entry_connected(DistEntry *dep, Eterm cid, Uint64 flags) { - erts_aint32_t set_qflgs; - ASSERT(dep->mld); ERTS_LC_ASSERT(erts_lc_is_de_rwlocked(dep)); @@ -767,9 +767,6 @@ erts_set_dist_entry_connected(DistEntry *dep, Eterm cid, Uint64 flags) erts_atomic64_set_nob(&dep->in, 0); erts_atomic64_set_nob(&dep->out, 0); - set_qflgs = (is_internal_port(cid) ? 
- ERTS_DE_QFLG_PORT_CTRL : ERTS_DE_QFLG_PROC_CTRL); - erts_atomic32_read_bor_nob(&dep->qflgs, set_qflgs); if(flags & DFLAG_PUBLISHED) { dep->next = erts_visible_dist_entries; diff --git a/erts/emulator/beam/erl_node_tables.h b/erts/emulator/beam/erl_node_tables.h index f8c448de3c..9109c50e1d 100644 --- a/erts/emulator/beam/erl_node_tables.h +++ b/erts/emulator/beam/erl_node_tables.h @@ -77,15 +77,9 @@ enum dist_entry_state { #define ERTS_DE_QFLG_BUSY (((erts_aint32_t) 1) << 0) #define ERTS_DE_QFLG_EXIT (((erts_aint32_t) 1) << 1) -#define ERTS_DE_QFLG_REQ_INFO (((erts_aint32_t) 1) << 2) -#define ERTS_DE_QFLG_PORT_CTRL (((erts_aint32_t) 1) << 3) -#define ERTS_DE_QFLG_PROC_CTRL (((erts_aint32_t) 1) << 4) #define ERTS_DE_QFLGS_ALL (ERTS_DE_QFLG_BUSY \ - | ERTS_DE_QFLG_EXIT \ - | ERTS_DE_QFLG_REQ_INFO \ - | ERTS_DE_QFLG_PORT_CTRL \ - | ERTS_DE_QFLG_PROC_CTRL) + | ERTS_DE_QFLG_EXIT) #if defined(ARCH_64) #define ERTS_DIST_OUTPUT_BUF_DBG_PATTERN ((Uint) 0xf713f713f713f713UL) @@ -105,6 +99,7 @@ struct ErtsDistOutputBuf_ { * iov[2 ... vsize-1] data */ ErlIOVec *eiov; + int ignore_busy; }; struct ErtsDistOutputBufsContainer_ { @@ -153,7 +148,9 @@ struct dist_entry_ { erts_mtx_t qlock; /* Protects qflgs and out_queue */ erts_atomic32_t qflgs; - erts_atomic_t qsize; + erts_atomic32_t notify; /* User wants queue notification? 
*/ + erts_atomic_t qsize; /* Size of data in queue respecting busy dist */ + erts_atomic_t total_qsize; /* Total size of data in queue */ erts_atomic64_t in; erts_atomic64_t out; ErtsDistOutputQueue out_queue; diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index 1105f6c52d..31740bae08 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -12042,6 +12042,22 @@ erts_parse_spawn_opts(ErlSpawnOpts *sop, Eterm opts_list, Eterm *tag, sop->priority = PRIORITY_LOW; else result = -1; + } else if (arg == am_async_dist) { + if (val == am_true) { + if (sop->flags & SPO_ASYNC_DIST) + sop->multi_set = !0; + else + sop->flags |= SPO_ASYNC_DIST; + } + else if (val == am_false) { + if (!(sop->flags & SPO_ASYNC_DIST)) + sop->multi_set = !0; + else + sop->flags &= ~SPO_ASYNC_DIST; + } + else { + result = -1; + } } else if (arg == am_message_queue_data) { if (sop->flags & (SPO_OFF_HEAP_MSGQ|SPO_ON_HEAP_MSGQ)) sop->multi_set = !0; @@ -12279,6 +12295,9 @@ erl_create_process(Process* parent, /* Parent of process (default group leader). 
/* Reserve place for continuation pointer, redzone, etc */ heap_need = arg_size + S_RESERVED; + if (so->flags & SPO_ASYNC_DIST) + flags |= F_ASYNC_DIST; + p->flags = flags; p->sig_qs.flags = qs_flags; diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h index 3f1f937052..bd281a2f65 100644 --- a/erts/emulator/beam/erl_process.h +++ b/erts/emulator/beam/erl_process.h @@ -1413,8 +1413,9 @@ void erts_check_for_holes(Process* p); #define SPO_IX_ASYNC 11 #define SPO_IX_NO_SMSG 12 #define SPO_IX_NO_EMSG 13 +#define SPO_IX_ASYNC_DIST 14 -#define SPO_NO_INDICES (SPO_IX_ASYNC+1) +#define SPO_NO_INDICES (SPO_IX_ASYNC_DIST+1) #define SPO_LINK (1 << SPO_IX_LINK) #define SPO_MONITOR (1 << SPO_IX_MONITOR) @@ -1430,8 +1431,9 @@ void erts_check_for_holes(Process* p); #define SPO_ASYNC (1 << SPO_IX_ASYNC) #define SPO_NO_SMSG (1 << SPO_IX_NO_SMSG) #define SPO_NO_EMSG (1 << SPO_IX_NO_EMSG) +#define SPO_ASYNC_DIST (1 << SPO_IX_ASYNC_DIST) -#define SPO_MAX_FLAG SPO_NO_EMSG +#define SPO_MAX_FLAG SPO_ASYNC_DIST #define SPO_USE_ARGS \ (SPO_MIN_HEAP_SIZE \ @@ -1573,6 +1575,7 @@ extern int erts_system_profile_ts_type; #define F_FRAGMENTED_SEND (1 << 23) /* Process is doing a distributed fragmented send */ #define F_DBG_FORCED_TRAP (1 << 24) /* DEBUG: Last BIF call was a forced trap */ #define F_DIRTY_CHECK_CLA (1 << 25) /* Check if copy literal area GC scheduled */ +#define F_ASYNC_DIST (1 << 26) /* Truly asynchronous distribution */ /* Signal queue flags */ #define FS_OFF_HEAP_MSGQ (1 << 0) /* Off heap msg queue */ diff --git a/erts/emulator/test/distribution_SUITE.erl b/erts/emulator/test/distribution_SUITE.erl index edab0be20b..c1a31c6abf 100644 --- a/erts/emulator/test/distribution_SUITE.erl +++ b/erts/emulator/test/distribution_SUITE.erl @@ -81,7 +81,10 @@ huge_iovec/1, is_alive/1, dyn_node_name_monitor_node/1, - dyn_node_name_monitor/1]). + dyn_node_name_monitor/1, + async_dist_flag/1, + async_dist_port_dctrlr/1, + async_dist_proc_dctrlr/1]). 
%% Internal exports. -export([sender/3, receiver2/2, dummy_waiter/0, dead_process/0, @@ -114,7 +117,8 @@ all() -> dist_entry_refc_race, start_epmd_false, no_epmd, epmd_module, system_limit, hopefull_data_encoding, hopefull_export_fun_bug, - huge_iovec, is_alive, dyn_node_name_monitor_node, dyn_node_name_monitor]. + huge_iovec, is_alive, dyn_node_name_monitor_node, dyn_node_name_monitor, + {group, async_dist}]. groups() -> [{bulk_send, [], [bulk_send_small, bulk_send_big, bulk_send_bigbig]}, @@ -133,7 +137,11 @@ groups() -> [message_latency_large_message, message_latency_large_link_exit, message_latency_large_monitor_exit, - message_latency_large_exit2]} + message_latency_large_exit2]}, + {async_dist, [], + [async_dist_flag, + async_dist_port_dctrlr, + async_dist_proc_dctrlr]} ]. init_per_suite(Config) -> @@ -3320,6 +3328,272 @@ dyn_node_name_monitor_test(StartOpts, TestNode) -> end, ok. +async_dist_flag(Config) when is_list(Config) -> + {ok, Peer1, Node1} = ?CT_PEER(), + async_dist_flag_test(Node1, false), + peer:stop(Peer1), + {ok, Peer2, Node2} = ?CT_PEER(["+pad", "false"]), + async_dist_flag_test(Node2, false), + peer:stop(Peer2), + {ok, Peer3, Node3} = ?CT_PEER(["+pad", "true", "+pad", "false"]), + async_dist_flag_test(Node3, false), + peer:stop(Peer3), + + {ok, Peer4, Node4} = ?CT_PEER(["+pad", "true"]), + async_dist_flag_test(Node4, true), + peer:stop(Peer4), + {ok, Peer5, Node5} = ?CT_PEER(["+pad", "false", "+pad", "true"]), + async_dist_flag_test(Node5, true), + peer:stop(Peer5), + + ok. 
+ +async_dist_flag_test(Node, Default) when is_atom(Node), is_boolean(Default) -> + Tester = self(), + NotDefault = not Default, + + Default = erpc:call(Node, erlang, system_info, [async_dist]), + + {P1, M1} = spawn_opt(Node, fun () -> + receive after infinity -> ok end + end, [link, monitor]), + {P2, M2} = spawn_opt(Node, fun () -> + receive after infinity -> ok end + end, [link, monitor, {async_dist, false}]), + {P3, M3} = spawn_opt(Node, fun () -> + receive after infinity -> ok end + end, [link, monitor, {async_dist, true}]), + {async_dist, Default} = erpc:call(Node, erlang, process_info, [P1, async_dist]), + {async_dist, false} = erpc:call(Node, erlang, process_info, [P2, async_dist]), + {async_dist, true} = erpc:call(Node, erlang, process_info, [P3, async_dist]), + + R4 = make_ref(), + {P4, M4} = spawn_opt(Node, fun () -> + Default = process_flag(async_dist, NotDefault), + Tester ! R4, + receive after infinity -> ok end + end, [link, monitor]), + + R5 = make_ref(), + {P5, M5} = spawn_opt(Node, fun () -> + false = process_flag(async_dist, true), + Tester ! R5, + receive after infinity -> ok end + end, [link, monitor, {async_dist, false}]), + R6 = make_ref(), + {P6, M6} = spawn_opt(Node, fun () -> + true = process_flag(async_dist, false), + Tester ! 
R6, + receive after infinity -> ok end + end, [link, monitor, {async_dist, true}]), + receive R4 -> ok end, + {async_dist, NotDefault} = erpc:call(Node, erlang, process_info, [P4, async_dist]), + receive R5 -> ok end, + {async_dist, true} = erpc:call(Node, erlang, process_info, [P5, async_dist]), + receive R6 -> ok end, + {async_dist, false} = erpc:call(Node, erlang, process_info, [P6, async_dist]), + + + R7 = make_ref(), + {P7, M7} = spawn_opt(Node, fun () -> + Default = process_flag(async_dist, NotDefault), + NotDefault = process_flag(async_dist, NotDefault), + NotDefault = process_flag(async_dist, NotDefault), + NotDefault = process_flag(async_dist, Default), + Default = process_flag(async_dist, Default), + Default = process_flag(async_dist, Default), + Tester ! R7, + receive after infinity -> ok end + end, [link, monitor]), + receive R7 -> ok end, + + unlink(P1), + exit(P1, bang), + unlink(P2), + exit(P2, bang), + unlink(P3), + exit(P3, bang), + unlink(P4), + exit(P4, bang), + unlink(P5), + exit(P5, bang), + unlink(P6), + exit(P6, bang), + unlink(P7), + exit(P7, bang), + + receive {'DOWN', M1, process, P1, bang} -> ok end, + receive {'DOWN', M2, process, P2, bang} -> ok end, + receive {'DOWN', M3, process, P3, bang} -> ok end, + receive {'DOWN', M4, process, P4, bang} -> ok end, + receive {'DOWN', M5, process, P5, bang} -> ok end, + receive {'DOWN', M6, process, P6, bang} -> ok end, + receive {'DOWN', M7, process, P7, bang} -> ok end, + + ok. + +async_dist_port_dctrlr(Config) when is_list(Config) -> + {ok, RecvPeer, RecvNode} = ?CT_PEER(), + ok = async_dist_test(RecvNode), + peer:stop(RecvPeer), + ok. 
+ +async_dist_proc_dctrlr(Config) when is_list(Config) -> + {ok, SendPeer, SendNode} = ?CT_PEER(["-proto_dist", "gen_tcp"]), + {ok, RecvPeer, RecvNode} = ?CT_PEER(["-proto_dist", "gen_tcp"]), + {Pid, Mon} = spawn_monitor(SendNode, + fun () -> + ok = async_dist_test(RecvNode), + exit(test_success) + end), + receive + {'DOWN', Mon, process, Pid, Reason} -> + test_success = Reason + end, + peer:stop(SendPeer), + peer:stop(RecvPeer), + ok. + +async_dist_test(Node) -> + Scale = case round(test_server:timetrap_scale_factor()/3) of + S when S < 1 -> 1; + S -> S + end, + _ = process_flag(async_dist, false), + Tester = self(), + AliveReceiver1 = spawn_link(Node, fun () -> + register(alive_receiver_1, self()), + Tester ! {registered, self()}, + receive after infinity -> ok end + end), + receive {registered, AliveReceiver1} -> ok end, + AliveReceiver2 = spawn(Node, fun () -> receive after infinity -> ok end end), + {AliveReceiver3, AR3Mon} = spawn_monitor(Node, + fun () -> + receive after infinity -> ok end + end), + {DeadReceiver, DRMon} = spawn_monitor(Node, fun () -> ok end), + receive + {'DOWN', DRMon, process, DeadReceiver, DRReason} -> + normal = DRReason + end, + Data = lists:duplicate($x, 256), + GoNuts = fun GN () -> + DeadReceiver ! hello, + GN() + end, + erpc:call(Node, erts_debug, set_internal_state, [available_internal_state, true]), + erpc:cast(Node, erts_debug, set_internal_state, [block, 4000*Scale]), + DistBufFiller = spawn_link(fun () -> + process_flag(async_dist, false), + receive go_nuts -> ok end, + GoNuts() + end), + BDMon = spawn_link(fun SysMon () -> + receive + {monitor, Pid, busy_dist_port, _} -> + Tester ! {busy_dist_port, Pid} + end, + SysMon() + end), + _ = erlang:system_monitor(BDMon, [busy_dist_port]), + DistBufFiller ! go_nuts, + + %% Busy dist entry may release after it has triggered even + %% though no one is consuming anything at the receiving end. + %% Continue banging until we stop getting new busy_dist_port... 
+ WaitFilled = fun WF (Tmo) -> + receive + {busy_dist_port, DistBufFiller} -> + WF(1000*Scale) + after + Tmo -> + ok + end + end, + WaitFilled(infinity), + + BusyDistChecker = spawn_link(fun () -> + process_flag(async_dist, false), + DeadReceiver ! hello, + exit(unexpected_return_from_bang) + end), + receive {busy_dist_port, BusyDistChecker} -> ok end, + {async_dist, false} = process_info(self(), async_dist), + {async_dist, false} = process_info(BusyDistChecker, async_dist), + false = process_flag(async_dist, true), + {async_dist, true} = process_info(self(), async_dist), + + Start = erlang:monotonic_time(millisecond), + M1 = erlang:monitor(process, AliveReceiver1), + true = is_reference(M1), + M2 = erlang:monitor(process, AliveReceiver2), + true = is_reference(M2), + {pid, Data} = AliveReceiver1 ! {pid, Data}, + {reg_name, Data} = {alive_receiver_1, Node} ! {reg_name, Data}, + true = erlang:demonitor(M1), + true = link(AliveReceiver2), + true = unlink(AliveReceiver1), + RId = spawn_request(Node, fun () -> receive bye -> ok end end, [link, monitor]), + true = is_reference(RId), + erlang:group_leader(self(), AliveReceiver2), + AR3XReason = make_ref(), + true = exit(AliveReceiver3, AR3XReason), + End = erlang:monotonic_time(millisecond), + + %% These signals should have been buffered immediately. Make sure + %% it did not take a long time... + true = 500*Scale >= End - Start, + + receive after 500*Scale -> ok end, + + unlink(BusyDistChecker), + exit(BusyDistChecker, bang), + false = is_process_alive(BusyDistChecker), + + unlink(DistBufFiller), + exit(DistBufFiller, bang), + false = is_process_alive(DistBufFiller), + + %% Verify that the signals eventually get through when the other + %% node continues to work... 
+ {links, []} + = erpc:call(Node, erlang, process_info, [AliveReceiver1, links]), + {links, [Tester]} + = erpc:call(Node, erlang, process_info, [AliveReceiver2, links]), + {monitored_by, []} + = erpc:call(Node, erlang, process_info, [AliveReceiver1, monitored_by]), + {monitored_by, [Tester]} + = erpc:call(Node, erlang, process_info, [AliveReceiver2, monitored_by]), + {messages, [{pid, Data}, {reg_name, Data}]} + = erpc:call(Node, erlang, process_info, [AliveReceiver1, messages]), + {group_leader, Tester} + = erpc:call(Node, erlang, process_info, [AliveReceiver2, group_leader]), + + Spawned = receive + {spawn_reply, RId, SpawnRes, Pid} -> + ok = SpawnRes, + true = is_pid(Pid), + Pid + end, + {links, [Tester]} + = erpc:call(Node, erlang, process_info, [Spawned, links]), + {monitored_by, [Tester]} + = erpc:call(Node, erlang, process_info, [Spawned, monitored_by]), + + receive + {'DOWN', AR3Mon, process, AliveReceiver3, ActualAR3XReason} -> + AR3XReason = ActualAR3XReason + end, + + unlink(AliveReceiver2), + unlink(Spawned), + + true = process_flag(async_dist, false), + {async_dist, false} = process_info(self(), async_dist), + + ok. 
+ %%% Utilities wait_until(Fun) -> diff --git a/erts/etc/common/erlexec.c b/erts/etc/common/erlexec.c index f8e7fb09d7..c9a662e70f 100644 --- a/erts/etc/common/erlexec.c +++ b/erts/etc/common/erlexec.c @@ -1006,8 +1006,10 @@ int main(int argc, char **argv) } break; case 'p': - if (argv[i][2] != 'c' || argv[i][3] != '\0') + if (!(argv[i][2] == 'c' && argv[i][3] == '\0') + && !(argv[i][2] == 'a' && argv[i][3] == 'd' && argv[i][4] == '\0')) { goto the_default; + } NEXT_ARG_CHECK(); argv[i][0] = '-'; add_Eargs(argv[i]); diff --git a/erts/preloaded/ebin/erlang.beam b/erts/preloaded/ebin/erlang.beam Binary files differindex eb5b3a72e7..1a2a0de8c7 100644 --- a/erts/preloaded/ebin/erlang.beam +++ b/erts/preloaded/ebin/erlang.beam diff --git a/erts/preloaded/src/erlang.erl b/erts/preloaded/src/erlang.erl index b650eb70aa..6ac524528d 100644 --- a/erts/preloaded/src/erlang.erl +++ b/erts/preloaded/src/erlang.erl @@ -2444,7 +2444,10 @@ open_port(PortName, PortSettings) -> -type message_queue_data() :: off_heap | on_heap. --spec process_flag(trap_exit, Boolean) -> OldBoolean when +-spec process_flag(async_dist, Boolean) -> OldBoolean when + Boolean :: boolean(), + OldBoolean :: boolean(); + (trap_exit, Boolean) -> OldBoolean when Boolean :: boolean(), OldBoolean :: boolean(); (error_handler, Module) -> OldModule when @@ -2482,6 +2485,7 @@ process_flag(_Flag, _Value) -> erlang:nif_error(undefined). -type process_info_item() :: + async_dist | backtrace | binary | catchlevel | @@ -2518,6 +2522,7 @@ process_flag(_Flag, _Value) -> trap_exit. 
-type process_info_result_item() :: + {async_dist, Enabled :: boolean()} | {backtrace, Bin :: binary()} | {binary, BinInfo :: [{non_neg_integer(), non_neg_integer(), @@ -2969,6 +2974,7 @@ tuple_to_list(_Tuple) -> (update_cpu_info) -> changed | unchanged; (version) -> string(); (wordsize | {wordsize, internal} | {wordsize, external}) -> 4 | 8; + (async_dist) -> boolean(); (overview) -> boolean(); %% Deliberately left undocumented (sequential_tracer) -> {sequential_tracer, pid() | port() | {module(),term()} | false}. @@ -3098,7 +3104,8 @@ spawn_monitor(M, F, A) -> | {min_heap_size, Size :: non_neg_integer()} | {min_bin_vheap_size, VSize :: non_neg_integer()} | {max_heap_size, Size :: max_heap_size()} - | {message_queue_data, MQD :: message_queue_data()}. + | {message_queue_data, MQD :: message_queue_data()} + | {async_dist, Enabled :: boolean()}. -spec spawn_opt(Fun, Options) -> pid() | {pid(), reference()} when Fun :: function(), |