summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRickard Green <rickard@erlang.org>2023-01-18 13:50:12 +0100
committerRickard Green <rickard@erlang.org>2023-01-18 13:50:12 +0100
commit3cc642374b340aa1509273b37b6725fd7f25f927 (patch)
treea0db6dbb52427a647a1170be66ea198860eedb0e
parent17c4460633931e061c769dfb5babc9a30e8b4ae9 (diff)
parent62df05e2eb123d94c594de77014526648b63de2e (diff)
downloaderlang-3cc642374b340aa1509273b37b6725fd7f25f927.tar.gz
Merge branch 'maint'
* maint: [erts] Support for truly asynchronous distributed signaling
-rw-r--r--erts/doc/src/erl_cmd.xml22
-rw-r--r--erts/doc/src/erlang.xml124
-rw-r--r--erts/emulator/beam/atom.names1
-rw-r--r--erts/emulator/beam/bif.c16
-rw-r--r--erts/emulator/beam/dist.c471
-rw-r--r--erts/emulator/beam/dist.h1
-rw-r--r--erts/emulator/beam/erl_bif_info.c14
-rw-r--r--erts/emulator/beam/erl_init.c26
-rw-r--r--erts/emulator/beam/erl_node_tables.c7
-rw-r--r--erts/emulator/beam/erl_node_tables.h13
-rw-r--r--erts/emulator/beam/erl_process.c19
-rw-r--r--erts/emulator/beam/erl_process.h7
-rw-r--r--erts/emulator/test/distribution_SUITE.erl280
-rw-r--r--erts/etc/common/erlexec.c4
-rw-r--r--erts/preloaded/ebin/erlang.beambin132912 -> 133052 bytes
-rw-r--r--erts/preloaded/src/erlang.erl11
16 files changed, 829 insertions, 187 deletions
diff --git a/erts/doc/src/erl_cmd.xml b/erts/doc/src/erl_cmd.xml
index b56caee569..9069061ae4 100644
--- a/erts/doc/src/erl_cmd.xml
+++ b/erts/doc/src/erl_cmd.xml
@@ -1048,6 +1048,23 @@ $ <input>erl \
<p>Memory allocator-specific flags. For more information, see
<seecref marker="erts_alloc"><c>erts_alloc(3)</c></seecref>.</p>
</item>
+ <tag><marker id="+pad"/><c>+pad true|false</c></tag>
+ <item>
+ <p>Since: OTP @OTP-18374@</p>
+ <p>
+ The boolean value used with the <c>+pad</c> parameter determines
+ the default value of the
+ <seeerl marker="erlang#process_flag_async_dist">
+ <c>async_dist</c></seeerl> process flag of newly spawned processes.
+ By default, if no <c>+pad</c> command line option is passed,
+ the <c>async_dist</c> flag will be set to <c>false</c>.
+ </p>
+ <p>
+ The value used in runtime can be inspected by calling
+ <seeerl marker="erlang#system_info_async_dist">
+ <c>erlang:system_info(async_dist)</c></seeerl>.
+ </p>
+ </item>
<tag><marker id="+pc"/><marker id="printable_character_range"/>
<c><![CDATA[+pc Range]]></c></tag>
<item>
@@ -1793,6 +1810,11 @@ $ <input>erl \
limit is per distribution channel. A higher limit
gives lower latency and higher throughput at the expense
of higher memory use.</p>
+ <p>
+ This limit only affects processes that have disabled
+ <seeerl marker="erlang#process_flag_async_dist"><i>fully
+ asynchronous distributed signaling</i></seeerl>.
+ </p>
</item>
<tag><marker id="+zdntgc"/><c>+zdntgc time</c></tag>
<item>
diff --git a/erts/doc/src/erlang.xml b/erts/doc/src/erlang.xml
index 27c72ce8b2..d409905ba3 100644
--- a/erts/doc/src/erlang.xml
+++ b/erts/doc/src/erlang.xml
@@ -6276,6 +6276,68 @@ receive_replies(ReqId, N, Acc) ->
<func>
<name name="process_flag" arity="2" clause_i="1"
+ anchor="process_flag_async_dist" since="OTP @OTP-18374@"/>
+ <fsummary>
+ Enable or disable fully asynchronous distributed signaling
+ for the calling process.
+ </fsummary>
+ <desc>
+ <p>
+ Enable or disable <i>fully asynchronous distributed signaling</i>
+ for the calling process. When disabled, which is the default, the
+ process sending a distributed signal will block in the send
+ operation if the buffer for the distribution channel reach the
+ <seecom marker="erts:erl#+zdbbl">distribution buffer busy
+ limit</seecom>. The process will remain blocked until the buffer
+ shrinks enough. This might in some cases take a substantial amount
+ of time. When <c>async_dist</c> is enabled, send operations of
+ distributed signals will always buffer the signal on the outgoing
+ distribution channel and then immediately return. That is, these
+ send operations will <em>never</em> block the sending process.
+ </p>
+ <note><p>
+ Since no flow control is enforced by the runtime system when
+ <c>async_dist</c> process flag is enabled, you need to make sure
+ that flow control for such data is implemented, or that the amount
+ of such data is known to always be limited. Unlimited signaling with
+ <c>async_dist</c> enabled in the absence of flow control will
+ typically cause the sending runtime system to crash on an out of
+ memory condition.
+ </p></note>
+ <p>
+ Blocking due to disabled <c>async_dist</c> can be monitored by
+ <seemfa marker="#system_monitor/2"><c>erlang:system_montor()</c></seemfa>
+ using the
+ <seeerl marker="#busy_dist_port"><c>busy_dist_port</c></seeerl>
+ option. Only data buffered by processes which (at the time of sending
+ a signal) have disabled <c>async_dist</c> will be counted when
+ determining whether or not an operation should block the caller.
+ </p>
+ <p>
+ The <c>async_dist</c> flag can also be set on a new process when
+ spawning it using the
+ <seemfa marker="#spawn_opt/4"><c>spawn_opt()</c></seemfa> BIF with the
+ option <seeerl marker="#spawn_opt_async_dist"><c>{async_dist,
+ Enable}</c></seeerl>. The default <c>async_dist</c> flag to use on
+ newly spawned processes can be set by passing the command line
+ argument <seecom marker="erl#+pad"><c>+pad
+ &lt;boolean&gt;</c></seecom> when starting the runtime system. If the
+ <c>+pad &lt;boolean&gt;</c> command line argument is not passed, the
+ default value of the <c>async_dist</c> flag will be <c>false</c>.
+ </p>
+ <p>
+ You can inspect the state of the <c>async_dist</c> process flag of a
+ process by calling <seeerl marker="#process_info_async_dist">
+ <c>process_info(Pid, async_dist)</c></seeerl>.
+ </p>
+ <p>
+ Returns the old value of the <c>async_dist</c> flag.
+ </p>
+ </desc>
+ </func>
+
+ <func>
+ <name name="process_flag" arity="2" clause_i="2"
anchor="process_flag_trap_exit" since=""/>
<fsummary>Set process flag trap_exit for the calling process.</fsummary>
<desc>
@@ -6293,7 +6355,7 @@ receive_replies(ReqId, N, Acc) ->
</func>
<func>
- <name name="process_flag" arity="2" clause_i="2" since=""/>
+ <name name="process_flag" arity="2" clause_i="3" since=""/>
<fsummary>Set process flag error_handler for the calling process.
</fsummary>
<desc>
@@ -6307,7 +6369,7 @@ receive_replies(ReqId, N, Acc) ->
</func>
<func>
- <name name="process_flag" arity="2" clause_i="3" since="OTP 24.0"/>
+ <name name="process_flag" arity="2" clause_i="4" since="OTP 24.0"/>
<fsummary>Set process flag fullsweep_after for the calling process.
</fsummary>
<desc>
@@ -6318,7 +6380,7 @@ receive_replies(ReqId, N, Acc) ->
</func>
<func>
- <name name="process_flag" arity="2" clause_i="4"
+ <name name="process_flag" arity="2" clause_i="5"
anchor="process_flag_min_heap_size" since=""/>
<fsummary>Set process flag min_heap_size for the calling process.
</fsummary>
@@ -6329,7 +6391,7 @@ receive_replies(ReqId, N, Acc) ->
</func>
<func>
- <name name="process_flag" arity="2" clause_i="5" since="OTP R13B04"/>
+ <name name="process_flag" arity="2" clause_i="6" since="OTP R13B04"/>
<fsummary>Set process flag min_bin_vheap_size for the calling process.
</fsummary>
<desc>
@@ -6340,7 +6402,7 @@ receive_replies(ReqId, N, Acc) ->
</func>
<func>
- <name name="process_flag" arity="2" clause_i="6"
+ <name name="process_flag" arity="2" clause_i="7"
anchor="process_flag_max_heap_size" since="OTP 19.0"/>
<fsummary>Set process flag max_heap_size for the calling process.
</fsummary>
@@ -6419,7 +6481,7 @@ receive_replies(ReqId, N, Acc) ->
</func>
<func>
- <name name="process_flag" arity="2" clause_i="7"
+ <name name="process_flag" arity="2" clause_i="8"
anchor="process_flag_message_queue_data" since="OTP 19.0"/>
<fsummary>Set process flag message_queue_data for the calling process.
</fsummary>
@@ -6461,7 +6523,7 @@ receive_replies(ReqId, N, Acc) ->
</func>
<func>
- <name name="process_flag" arity="2" clause_i="8"
+ <name name="process_flag" arity="2" clause_i="9"
anchor="process_flag_priority" since=""/>
<fsummary>Set process flag priority for the calling process.</fsummary>
<type name="priority_level"/>
@@ -6534,7 +6596,7 @@ receive_replies(ReqId, N, Acc) ->
</func>
<func>
- <name name="process_flag" arity="2" clause_i="9" since=""/>
+ <name name="process_flag" arity="2" clause_i="10" since=""/>
<fsummary>Set process flag save_calls for the calling process.</fsummary>
<desc>
<p><c><anno>N</anno></c> must be an integer in the interval 0..10000.
@@ -6565,7 +6627,7 @@ receive_replies(ReqId, N, Acc) ->
</func>
<func>
- <name name="process_flag" arity="2" clause_i="10" since=""/>
+ <name name="process_flag" arity="2" clause_i="11" since=""/>
<fsummary>Set process flag sensitive for the calling process.</fsummary>
<desc>
<p>Sets or clears flag <c>sensitive</c> for the current process.
@@ -6710,6 +6772,18 @@ receive_replies(ReqId, N, Acc) ->
<p>Valid <c><anno>InfoTuple</anno></c>s with corresponding
<c><anno>Item</anno></c>s:</p>
<taglist>
+ <tag>
+ <marker id="process_info_async_dist"/>
+ <c>{async_dist, Enabled}</c>
+ </tag>
+ <item>
+ <p>Since: OTP @OTP-18374@</p>
+ <p>
+ Current value of the
+ <seeerl marker="erlang#process_flag_async_dist">
+ <c>async_dist</c></seeerl> process flag.
+ </p>
+ </item>
<tag><c>{backtrace, <anno>Bin</anno>}</c></tag>
<item>
<p>Binary <c><anno>Bin</anno></c> contains the same information
@@ -7953,6 +8027,21 @@ true</pre>
<c>process_flag(message_queue_data,
<anno>MQD</anno>)</c></seeerl>.</p>
</item>
+ <tag>
+ <marker id="spawn_opt_async_dist"/>
+ <c>{async_dist, Enabled}</c>
+ </tag>
+ <item>
+ <p>Since: OTP @OTP-18374@</p>
+ <p>
+ Set the
+ <seeerl marker="erlang#process_flag_async_dist">
+ <c>async_dist</c></seeerl> process flag of the spawned process.
+ This option will override the default value set by the command
+ line argument
+ <seecom marker="erl#+pad"><c>+pad &lt;boolean&gt;</c></seecom>.
+ </p>
+ </item>
</taglist>
</desc>
</func>
@@ -10983,6 +11072,8 @@ Metadata = #{ pid => pid(),
</func>
<func>
+ <name name="system_info" arity="1" clause_i="79"
+ anchor="system_info_async_dist" since="OTP @OTP-18374@"/> <!-- async_dist -->
<name name="system_info" arity="1" clause_i="14"
anchor="system_info_dist" since=""/> <!-- creation -->
<name name="system_info" arity="1" clause_i="16" since="OTP 18.0"/> <!-- delayed_node_table_gc -->
@@ -10995,6 +11086,17 @@ Metadata = #{ pid => pid(),
<p>Returns information about Erlang Distribution in the
current system as specified by <c><anno>Item</anno></c>:</p>
<taglist>
+ <tag><marker id="system_info_async_dist"/><c>async_dist</c></tag>
+ <item>
+ <p>Since: OTP @OTP-18374@</p>
+ <p>
+ Returns the value of the command line argument
+ <seecom marker="erl#+pad">+pad &lt;boolean&gt;</seecom>
+ which the runtime system use. This value determines the default
+ <seeerl marker="erlang#process_flag_async_dist">
+ <c>async_dist</c></seeerl> value for newly spawned processes.
+ </p>
+ </item>
<tag><marker id="system_info_creation"/>
<c>creation</c></tag>
<item>
@@ -11133,7 +11235,7 @@ Metadata = #{ pid => pid(),
<!-- <name name="system_info" arity="1" clause_i="76"/> update_cpu_info -->
<name name="system_info" arity="1" clause_i="77" since=""/> <!-- version -->
<name name="system_info" arity="1" clause_i="78" since=""/> <!-- wordsize -->
- <!-- <name name="system_info" arity="1" clause_i="79"/> overview -->
+ <!-- <name name="system_info" arity="1" clause_i="80"/> overview -->
<!-- When adding any entry, make sure to update the overview clause_i -->
<fsummary>Information about the system.</fsummary>
<desc>
@@ -11501,7 +11603,7 @@ Metadata = #{ pid => pid(),
<c><anno>MonitorPid</anno></c>. <c>SusPid</c> is the pid
that got suspended when sending to <c>Port</c>.</p>
</item>
- <tag><c>busy_dist_port</c></tag>
+ <tag><c>busy_dist_port</c><marker id="busy_dist_port"/></tag>
<item>
<p>If a process in the system gets suspended because it
sends to a process on a remote node whose inter-node
diff --git a/erts/emulator/beam/atom.names b/erts/emulator/beam/atom.names
index 502697a432..274a8e23d4 100644
--- a/erts/emulator/beam/atom.names
+++ b/erts/emulator/beam/atom.names
@@ -110,6 +110,7 @@ atom arg0
atom arity
atom asn1
atom async
+atom async_dist
atom asynchronous
atom atom
atom atom_used
diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c
index 24b411633f..8d27d0b080 100644
--- a/erts/emulator/beam/bif.c
+++ b/erts/emulator/beam/bif.c
@@ -1780,7 +1780,21 @@ static Eterm process_flag_aux(Process *c_p, int *redsp, Eterm flag, Eterm val)
BIF_RETTYPE process_flag_2(BIF_ALIST_2)
{
Eterm old_value;
- if (BIF_ARG_1 == am_error_handler) {
+
+ if (BIF_ARG_1 == am_async_dist) {
+ old_value = (BIF_P->flags & F_ASYNC_DIST) ? am_true : am_false;
+ if (BIF_ARG_2 == am_false) {
+ BIF_P->flags &= ~F_ASYNC_DIST;
+ }
+ else if (BIF_ARG_2 == am_true) {
+ BIF_P->flags |= F_ASYNC_DIST;
+ }
+ else {
+ goto error;
+ }
+ BIF_RET(old_value);
+ }
+ else if (BIF_ARG_1 == am_error_handler) {
if (is_not_atom(BIF_ARG_2)) {
goto error;
}
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c
index c35b8ef08b..6ac22dfab9 100644
--- a/erts/emulator/beam/dist.c
+++ b/erts/emulator/beam/dist.c
@@ -211,6 +211,37 @@ struct {
ErlHeapFragment *bp;
} nodedown;
+/*
+ * Dist entry queue flags are only modified while
+ * the dist entry queue lock is held...
+ */
+static ERTS_INLINE erts_aint32_t
+de_qflags_read(DistEntry *dep)
+{
+ return erts_atomic32_read_nob(&dep->qflgs);
+}
+
+static ERTS_INLINE erts_aint32_t
+de_qflags_read_set(DistEntry *dep, erts_aint32_t set)
+{
+ erts_aint32_t qflgs, new_qflgs;
+ ERTS_LC_ASSERT(erts_lc_mtx_is_locked(&dep->qlock));
+ new_qflgs = qflgs = erts_atomic32_read_nob(&dep->qflgs);
+ new_qflgs |= set;
+ erts_atomic32_set_nob(&dep->qflgs, new_qflgs);
+ return qflgs;
+}
+
+static ERTS_INLINE erts_aint32_t
+de_qflags_read_unset(DistEntry *dep, erts_aint32_t unset)
+{
+ erts_aint32_t qflgs, new_qflgs;
+ ERTS_LC_ASSERT(erts_lc_mtx_is_locked(&dep->qlock));
+ new_qflgs = qflgs = erts_atomic32_read_nob(&dep->qflgs);
+ new_qflgs &= ~unset;
+ erts_atomic32_set_nob(&dep->qflgs, new_qflgs);
+ return qflgs;
+}
static void
delete_cache(ErtsAtomCache *cache)
@@ -250,7 +281,7 @@ get_suspended_on_de(DistEntry *dep, erts_aint32_t unset_qflgs)
{
erts_aint32_t qflgs;
ERTS_LC_ASSERT(erts_lc_mtx_is_locked(&dep->qlock));
- qflgs = erts_atomic32_read_band_acqb(&dep->qflgs, ~unset_qflgs);
+ qflgs = de_qflags_read_unset(dep, unset_qflgs);
qflgs &= ~unset_qflgs;
if (qflgs & ERTS_DE_QFLG_EXIT) {
/* No resume when exit has been scheduled */
@@ -1010,14 +1041,14 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason)
}
if (dep->state == ERTS_DE_STATE_EXITING) {
- ASSERT(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT);
+ ASSERT(de_qflags_read(dep) & ERTS_DE_QFLG_EXIT);
}
else {
ASSERT(dep->state == ERTS_DE_STATE_CONNECTED);
dep->state = ERTS_DE_STATE_EXITING;
erts_mtx_lock(&dep->qlock);
- ASSERT(!(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT));
- erts_atomic32_read_bor_relb(&dep->qflgs, ERTS_DE_QFLG_EXIT);
+ ASSERT(!(de_qflags_read(dep) & ERTS_DE_QFLG_EXIT));
+ de_qflags_read_set(dep, ERTS_DE_QFLG_EXIT);
erts_mtx_unlock(&dep->qlock);
}
@@ -1043,6 +1074,8 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason)
suspendees = get_suspended_on_de(dep, ERTS_DE_QFLGS_ALL);
erts_mtx_unlock(&dep->qlock);
+
+ erts_atomic32_set_relb(&dep->notify, 0);
erts_atomic_set_nob(&dep->dist_cmd_scheduled, 0);
dep->send = NULL;
@@ -1144,7 +1177,8 @@ void init_dist(void)
static ERTS_INLINE ErtsDistOutputBuf *
alloc_dist_obufs(byte **extp, TTBEncodeContext *ctx,
- Uint data_size, Uint fragments, Uint vlen)
+ Uint data_size, Uint fragments, Uint vlen,
+ int ignore_busy)
{
int ix;
ErtsDistOutputBuf *obuf;
@@ -1175,6 +1209,7 @@ alloc_dist_obufs(byte **extp, TTBEncodeContext *ctx,
erts_refc_add(&bin->intern.refc, fragments - 1, 1);
for (ix = 0; ix < fragments; ix++) {
+ obuf[ix].ignore_busy = ignore_busy;
obuf[ix].bin = bin;
obuf[ix].eiov = &ctx->fragment_eiovs[ix];
#ifdef DEBUG
@@ -1220,6 +1255,81 @@ size_obuf(ErtsDistOutputBuf *obuf)
return sz;
}
+static ERTS_INLINE void
+get_obuf_sizes(ErtsDistOutputBuf *obuf, Sint *size, Sint *ignore_size)
+{
+ Sint sz = size_obuf(obuf);
+ ASSERT(sz >= 0);
+ *size = sz;
+ *ignore_size = obuf->ignore_busy ? sz : 0;
+}
+
+static ERTS_INLINE void
+add_obuf_sizes(ErtsDistOutputBuf *obuf, Sint *size, Sint *ignore_size)
+{
+ Sint sz, isz;
+ get_obuf_sizes(obuf, &sz, &isz);
+ *size += sz;
+ *ignore_size += isz;
+}
+
+static ERTS_INLINE void
+subtract_obuf_sizes(ErtsDistOutputBuf *obuf, Sint *size, Sint *ignore_size)
+{
+ Sint sz, isz;
+ get_obuf_sizes(obuf, &sz, &isz);
+ *size -= sz;
+ *ignore_size -= isz;
+}
+
+static ERTS_INLINE void
+update_qsizes(DistEntry *dep, int *empty_fillp, Sint *qsizep,
+ Sint add_total_qsize, Sint ignore_qsize)
+{
+ /*
+ * All modifications of the 'total_qsize' and 'qsize' fields are
+ * made while holding the 'qlock', so read/modify/write of each
+ * field does not need to be atomic. Readers without the lock will
+ * still see consistent updates of each 'field'.
+ */
+ erts_aint_t qsize, add_qsize;
+
+ ERTS_LC_ASSERT(erts_lc_mtx_is_locked(&dep->qlock));
+
+ if (empty_fillp)
+ *empty_fillp = 0;
+
+ if (add_total_qsize) {
+ qsize = erts_atomic_read_nob(&dep->total_qsize);
+ qsize += (erts_aint_t) add_total_qsize;
+ if (empty_fillp && qsize == add_total_qsize)
+ *empty_fillp = !0;
+ erts_atomic_set_nob(&dep->total_qsize, (erts_aint_t) qsize);
+ }
+
+ add_qsize = (erts_aint_t) (add_total_qsize - ignore_qsize);
+ if (add_qsize) {
+ qsize = erts_atomic_read_nob(&dep->qsize);
+ qsize += add_qsize;
+ if (qsizep)
+ *qsizep = qsize;
+ erts_atomic_set_nob(&dep->qsize, (erts_aint_t) qsize);
+ }
+ else if (qsizep) {
+ *qsizep = erts_atomic_read_nob(&dep->qsize);
+ }
+
+#ifdef DEBUG
+ {
+ erts_aint_t tqsize = erts_atomic_read_nob(&dep->total_qsize);
+ qsize = erts_atomic_read_nob(&dep->qsize);
+ ASSERT(tqsize >= 0);
+ ASSERT(qsize >= 0);
+ ASSERT(tqsize >= qsize);
+ }
+#endif
+}
+
static ErtsDistOutputBuf* clear_de_out_queues(DistEntry* dep)
{
ErtsDistOutputBuf *obuf;
@@ -1250,21 +1360,19 @@ static ErtsDistOutputBuf* clear_de_out_queues(DistEntry* dep)
static void free_de_out_queues(DistEntry* dep, ErtsDistOutputBuf *obuf)
{
- Sint obufsize = 0;
+ Sint obufsize = 0, ignore_obufsize = 0;
while (obuf) {
ErtsDistOutputBuf *fobuf;
fobuf = obuf;
obuf = obuf->next;
- obufsize += size_obuf(fobuf);
+ add_obuf_sizes(fobuf, &obufsize, &ignore_obufsize);
free_dist_obuf(fobuf, !0);
}
if (obufsize) {
erts_mtx_lock(&dep->qlock);
- ASSERT(erts_atomic_read_nob(&dep->qsize) >= obufsize);
- erts_atomic_add_nob(&dep->qsize,
- (erts_aint_t) -obufsize);
+ update_qsizes(dep, NULL, NULL, -obufsize, -ignore_obufsize);
erts_mtx_unlock(&dep->qlock);
}
}
@@ -3031,11 +3139,17 @@ retry:
goto fail;
}
- if (no_suspend && proc) {
- if (erts_atomic32_read_acqb(&dep->qflgs) & ERTS_DE_QFLG_BUSY) {
- res = ERTS_DSIG_PREP_WOULD_SUSPEND;
- goto fail;
- }
+ if (!proc || (proc->flags & F_ASYNC_DIST)) {
+ ctx->ignore_busy = !0;
+ }
+ else {
+ ctx->ignore_busy = 0;
+ if (no_suspend) {
+ if (de_qflags_read(dep) & ERTS_DE_QFLG_BUSY) {
+ res = ERTS_DSIG_PREP_WOULD_SUSPEND;
+ goto fail;
+ }
+ }
}
ctx->c_p = proc;
@@ -3209,7 +3323,8 @@ erts_dsig_send(ErtsDSigSendContext *ctx)
+ ((ctx->fragments - 1)
* ERTS_DIST_FRAGMENT_HEADER_SIZE),
ctx->fragments,
- ctx->vlen);
+ ctx->vlen,
+ ctx->ignore_busy);
ctx->alloced_fragments = ctx->fragments;
/* Encode internal version of dist header */
ctx->dhdrp = ctx->extp;
@@ -3360,20 +3475,23 @@ erts_dsig_send(ErtsDSigSendContext *ctx)
ctx->fragments = 0;
}
else {
- Sint qsize = erts_atomic_read_nob(&dep->qsize);
+ Sint qsize = (Sint) erts_atomic_read_nob(&dep->qsize);
erts_aint32_t qflgs;
ErtsProcList *plp = NULL;
Eterm notify_proc = NIL;
Sint obsz;
- int fragments;
+ int fragments, empty_fill;
/* Calculate how many fragments to send. This depends on
the available space in the distr queue and the amount
of remaining reductions. */
for (fragments = 0, obsz = 0;
- fragments < ctx->fragments &&
- ((ctx->reds > 0 && (qsize + obsz) < erts_dist_buf_busy_limit) ||
- ctx->no_trap || ctx->no_suspend);
+ (fragments < ctx->fragments
+ && ((ctx->reds > 0
+ && (ctx->ignore_busy
+ || (qsize + obsz < erts_dist_buf_busy_limit)))
+ || ctx->no_trap
+ || ctx->no_suspend));
fragments++) {
#ifdef DEBUG
int reds = 100;
@@ -3389,33 +3507,27 @@ erts_dsig_send(ErtsDSigSendContext *ctx)
(!ctx->no_trap && !ctx->no_suspend));
erts_mtx_lock(&dep->qlock);
- qsize = erts_atomic_add_read_mb(&dep->qsize, (erts_aint_t) obsz);
- ASSERT(qsize >= obsz);
- qflgs = erts_atomic32_read_nob(&dep->qflgs);
- if (!(qflgs & ERTS_DE_QFLG_BUSY) && qsize >= erts_dist_buf_busy_limit) {
- erts_atomic32_read_bor_relb(&dep->qflgs, ERTS_DE_QFLG_BUSY);
+ update_qsizes(dep, &empty_fill, &qsize, obsz,
+ ctx->ignore_busy ? obsz : 0);
+ qflgs = de_qflags_read(dep);
+ if (!(qflgs & ERTS_DE_QFLG_BUSY)
+ && qsize >= erts_dist_buf_busy_limit) {
+ qflgs = de_qflags_read_set(dep, ERTS_DE_QFLG_BUSY);
qflgs |= ERTS_DE_QFLG_BUSY;
}
- if (qsize == obsz && (qflgs & ERTS_DE_QFLG_REQ_INFO)) {
- /* Previously empty queue and info requested... */
- qflgs = erts_atomic32_read_band_mb(&dep->qflgs,
- ~ERTS_DE_QFLG_REQ_INFO);
- if (qflgs & ERTS_DE_QFLG_REQ_INFO) {
+ if (empty_fill && is_internal_pid(dep->cid)) {
+ erts_aint32_t notify;
+ notify = erts_atomic32_xchg_mb(&dep->notify,
+ (erts_aint32_t) 0);
+ if (notify) {
+ /*
+ * Previously empty queue and notification
+ * requested...
+ */
notify_proc = dep->cid;
ASSERT(is_internal_pid(notify_proc));
}
- /* else: requester will send itself the message... */
- qflgs &= ~ERTS_DE_QFLG_REQ_INFO;
}
- if (!ctx->no_suspend && (qflgs & ERTS_DE_QFLG_BUSY)) {
- erts_mtx_unlock(&dep->qlock);
-
- plp = erts_proclist_create(ctx->c_p);
-
- erts_suspend(ctx->c_p, ERTS_PROC_LOCK_MAIN, NULL);
- suspended = 1;
- erts_mtx_lock(&dep->qlock);
- }
ASSERT(fragments < 2
|| (get_int64(&((char*)ctx->obuf->eiov->iov[1].iov_base)[10])
@@ -3433,30 +3545,41 @@ erts_dsig_send(ErtsDSigSendContext *ctx)
ctx->obuf = &ctx->obuf[fragments];
}
- if (!ctx->no_suspend) {
- qflgs = erts_atomic32_read_nob(&dep->qflgs);
- if (!(qflgs & ERTS_DE_QFLG_BUSY)) {
- if (suspended)
- resume = 1; /* was busy when we started, but isn't now */
+ if ((qflgs & ERTS_DE_QFLG_BUSY)
+ && !ctx->ignore_busy
+ && !ctx->no_suspend) {
+
+ erts_mtx_unlock(&dep->qlock);
+
+ plp = erts_proclist_create(ctx->c_p);
+
+ erts_suspend(ctx->c_p, ERTS_PROC_LOCK_MAIN, NULL);
+ suspended = 1;
+
+ erts_mtx_lock(&dep->qlock);
+
+ qflgs = de_qflags_read(dep);
+ if (qflgs & ERTS_DE_QFLG_BUSY) {
+ /* Enqueue suspended process on dist entry */
+ ASSERT(plp);
+ erts_proclist_store_last(&dep->suspended, plp);
+ }
+ else {
+ resume = 1; /* was busy, but isn't now */
#ifdef USE_VM_PROBES
- if (resume && DTRACE_ENABLED(dist_port_not_busy)) {
- DTRACE_CHARBUF(port_str, 64);
- DTRACE_CHARBUF(remote_str, 64);
-
- erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)),
- "%T", cid);
- erts_snprintf(remote_str, sizeof(DTRACE_CHARBUF_NAME(remote_str)),
- "%T", dep->sysname);
- DTRACE3(dist_port_not_busy, erts_this_node_sysname,
- port_str, remote_str);
- }
+ if (resume && DTRACE_ENABLED(dist_port_not_busy)) {
+ DTRACE_CHARBUF(port_str, 64);
+ DTRACE_CHARBUF(remote_str, 64);
+
+ erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)),
+ "%T", cid);
+ erts_snprintf(remote_str, sizeof(DTRACE_CHARBUF_NAME(remote_str)),
+ "%T", dep->sysname);
+ DTRACE3(dist_port_not_busy, erts_this_node_sysname,
+ port_str, remote_str);
+ }
#endif
- }
- else {
- /* Enqueue suspended process on dist entry */
- ASSERT(plp);
- erts_proclist_store_last(&dep->suspended, plp);
- }
+ }
}
erts_mtx_unlock(&dep->qlock);
@@ -3674,13 +3797,64 @@ dist_port_commandv(Port *prt, ErtsDistOutputBuf *obuf)
? ((Sint) 1) \
: ((((Sint) (SZ)) >> 10) & ((Sint) ERTS_PORT_REDS_MASK__)))
+#ifndef DEBUG
+#define ERTS_DBG_CHK_DIST_QSIZE(DEP, PRT)
+#else
+#define ERTS_DBG_CHK_DIST_QSIZE(DEP, PRT) \
+ dbg_check_dist_qsize((DEP), (PRT))
+
+static void
+dbg_check_dist_qsize(DistEntry *dep, Port *prt)
+{
+ int ix;
+ Sint sz = 0, isz = 0, tqsz, qsz;
+ ErtsDistOutputBuf *qs[2];
+
+ ERTS_LC_ASSERT(dep && erts_lc_mtx_is_locked(&dep->qlock));
+ ASSERT(prt && erts_lc_is_port_locked(prt));
+ ERTS_LC_ASSERT((erts_atomic32_read_nob(&prt->sched.flags)
+ & ERTS_PTS_FLG_EXIT)
+ || prt->common.id == dep->cid);
+
+ tqsz = erts_atomic_read_nob(&dep->total_qsize);
+ qsz = erts_atomic_read_nob(&dep->qsize);
+
+ ASSERT(tqsz >= 0);
+ ASSERT(qsz >= 0);
+ ASSERT(tqsz >= qsz);
+
+ qs[0] = dep->out_queue.first;
+ qs[1] = dep->finalized_out_queue.first;
+
+ for (ix = 0; ix < sizeof(qs)/sizeof(qs[0]); ix++) {
+ ErtsDistOutputBuf *obuf = qs[ix];
+ while (obuf) {
+ add_obuf_sizes(obuf, &sz, &isz);
+ obuf = obuf->next;
+ }
+ }
+
+ ASSERT(tqsz == sz);
+ ASSERT(qsz == sz - isz);
+}
+
+#endif
+
int
erts_dist_command(Port *prt, int initial_reds)
{
Sint reds = initial_reds - ERTS_PORT_REDS_DIST_CMD_START;
enum dist_entry_state state;
Uint64 flags;
- Sint qsize, obufsize = 0;
+ /*
+ * 'obufsize' and 'ignore_obufsize' contains the number of bytes removed
+ * from the queue which will be updated (in dep->total_qsize and
+ * dep->qsize) before we return from this function. Note that
+ * 'obufsize' and 'ignore_obufsize' may be negative if we added to the
+ * queue size. This may occur since finalization of a buffer may increase
+ * buffer size.
+ */
+ Sint qsize, obufsize = 0, ignore_obufsize = 0;
ErtsDistOutputQueue oq, foq;
DistEntry *dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY);
Uint (*send)(Port *prt, ErtsDistOutputBuf *obuf);
@@ -3716,6 +3890,7 @@ erts_dist_command(Port *prt, int initial_reds)
*/
erts_mtx_lock(&dep->qlock);
+ ERTS_DBG_CHK_DIST_QSIZE(dep, prt);
oq.first = dep->out_queue.first;
oq.last = dep->out_queue.last;
dep->out_queue.first = NULL;
@@ -3727,23 +3902,6 @@ erts_dist_command(Port *prt, int initial_reds)
dep->finalized_out_queue.first = NULL;
dep->finalized_out_queue.last = NULL;
-#ifdef DEBUG
- {
- Uint sz = 0;
- ErtsDistOutputBuf *curr = oq.first;
- while (curr) {
- sz += size_obuf(curr);
- curr = curr->next;
- }
- curr = foq.first;
- while (curr) {
- sz += size_obuf(curr);
- curr = curr->next;
- }
- ASSERT(sz <= erts_atomic_read_nob(&dep->qsize));
- }
-#endif
-
sched_flags = erts_atomic32_read_nob(&prt->sched.flags);
if (reds < 0)
@@ -3754,7 +3912,7 @@ erts_dist_command(Port *prt, int initial_reds)
do {
Uint size;
ErtsDistOutputBuf *fob;
- obufsize += size_obuf(foq.first);
+ add_obuf_sizes(foq.first, &obufsize, &ignore_obufsize);
size = (*send)(prt, foq.first);
erts_atomic64_inc_nob(&dep->out);
esdp->io.out += (Uint64) size;
@@ -3782,9 +3940,9 @@ erts_dist_command(Port *prt, int initial_reds)
ob = oq.first;
ASSERT(ob);
do {
- obufsize += size_obuf(ob);
+ add_obuf_sizes(ob, &obufsize, &ignore_obufsize);
reds = erts_encode_ext_dist_header_finalize(ob, dep, flags, reds);
- obufsize -= size_obuf(ob);
+ subtract_obuf_sizes(ob, &obufsize, &ignore_obufsize);
if (reds < 0)
break; /* finalize needs to be restarted... */
last_finalized = ob;
@@ -3822,12 +3980,11 @@ erts_dist_command(Port *prt, int initial_reds)
int preempt = 0;
while (oq.first && !preempt) {
ErtsDistOutputBuf *fob;
- Uint size, obsz;
- obufsize += size_obuf(oq.first);
+ Uint size;
+ add_obuf_sizes(oq.first, &obufsize, &ignore_obufsize);
reds = erts_encode_ext_dist_header_finalize(oq.first, dep, flags, reds);
- obsz = size_obuf(oq.first);
- obufsize -= obsz;
if (reds < 0) { /* finalize needs to be restarted... */
+ subtract_obuf_sizes(oq.first, &obufsize, &ignore_obufsize);
preempt = 1;
break;
}
@@ -3836,7 +3993,6 @@ erts_dist_command(Port *prt, int initial_reds)
esdp->io.out += (Uint64) size;
reds -= ERTS_PORT_REDS_DIST_CMD_DATA(size);
fob = oq.first;
- obufsize += obsz;
oq.first = oq.first->next;
free_dist_obuf(fob, !0);
sched_flags = erts_atomic32_read_nob(&prt->sched.flags);
@@ -3867,13 +4023,12 @@ erts_dist_command(Port *prt, int initial_reds)
* processes.
*/
erts_mtx_lock(&dep->qlock);
- de_busy = !!(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_BUSY);
- qsize = (Sint) erts_atomic_add_read_nob(&dep->qsize,
- (erts_aint_t) -obufsize);
- ASSERT(qsize >= 0);
- obufsize = 0;
+ de_busy = !!(de_qflags_read(dep) & ERTS_DE_QFLG_BUSY);
+ update_qsizes(dep, NULL, &qsize, -obufsize, -ignore_obufsize);
+ obufsize = ignore_obufsize = 0;
if (!(sched_flags & ERTS_PTS_FLG_BUSY_PORT)
- && de_busy && qsize < erts_dist_buf_busy_limit) {
+ && de_busy
+ && qsize < erts_dist_buf_busy_limit) {
int resumed;
ErtsProcList *suspendees = get_suspended_on_de(dep, ERTS_DE_QFLG_BUSY);
erts_mtx_unlock(&dep->qlock);
@@ -3889,17 +4044,7 @@ erts_dist_command(Port *prt, int initial_reds)
done:
- if (obufsize != 0) {
- erts_mtx_lock(&dep->qlock);
-#ifdef DEBUG
- qsize = (Sint) erts_atomic_add_read_nob(&dep->qsize,
- (erts_aint_t) -obufsize);
- ASSERT(qsize >= 0);
-#else
- erts_atomic_add_nob(&dep->qsize, (erts_aint_t) -obufsize);
-#endif
- erts_mtx_unlock(&dep->qlock);
- }
+ ASSERT(!ignore_obufsize || obufsize);
ASSERT(!!foq.first == !!foq.last);
ASSERT(!dep->finalized_out_queue.first);
@@ -3910,7 +4055,21 @@ erts_dist_command(Port *prt, int initial_reds)
dep->finalized_out_queue.last = foq.last;
}
- /* Avoid wrapping reduction counter... */
+ if (obufsize != 0) {
+ erts_mtx_lock(&dep->qlock);
+ update_qsizes(dep, NULL, NULL, -obufsize, -ignore_obufsize);
+ ERTS_DBG_CHK_DIST_QSIZE(dep, prt);
+ erts_mtx_unlock(&dep->qlock);
+ }
+#ifdef DEBUG
+ else {
+ erts_mtx_lock(&dep->qlock);
+ ERTS_DBG_CHK_DIST_QSIZE(dep, prt);
+ erts_mtx_unlock(&dep->qlock);
+ }
+#endif
+
+ /* Avoid wrapping reduction counter... */
if (reds < INT_MIN/2)
reds = INT_MIN/2;
@@ -3940,7 +4099,7 @@ erts_dist_command(Port *prt, int initial_reds)
while (oq.first) {
ErtsDistOutputBuf *fob = oq.first;
oq.first = oq.first->next;
- obufsize += size_obuf(fob);
+ add_obuf_sizes(fob, &obufsize, &ignore_obufsize);
free_dist_obuf(fob, !0);
}
@@ -3949,14 +4108,15 @@ erts_dist_command(Port *prt, int initial_reds)
}
else {
if (oq.first) {
+ erts_mtx_lock(&dep->qlock);
+ update_qsizes(dep, NULL, NULL, -obufsize, -ignore_obufsize);
+ obufsize = ignore_obufsize = 0;
+
/*
- * Unhandle buffers need to be put back first
+ * Unhandled buffers need to be put back first
* in out_queue.
*/
- erts_mtx_lock(&dep->qlock);
- erts_atomic_add_nob(&dep->qsize, -obufsize);
- obufsize = 0;
- oq.last->next = dep->out_queue.first;
+ oq.last->next = dep->out_queue.first;
dep->out_queue.first = oq.first;
if (!dep->out_queue.last)
dep->out_queue.last = oq.last;
@@ -3972,7 +4132,7 @@ BIF_RETTYPE
dist_ctrl_get_data_notification_1(BIF_ALIST_1)
{
DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(BIF_P);
- erts_aint32_t qflgs;
+ erts_aint32_t notify;
erts_aint_t qsize;
Eterm receiver = NIL;
Uint32 conn_id;
@@ -3985,7 +4145,7 @@ dist_ctrl_get_data_notification_1(BIF_ALIST_1)
/*
* Caller is the only one that can consume from this queue
- * and the only one that can set the req-info flag...
+ * and the only one that can set the notify field...
*/
erts_de_rlock(dep);
@@ -3997,23 +4157,21 @@ dist_ctrl_get_data_notification_1(BIF_ALIST_1)
ASSERT(dep->cid == BIF_P->common.id);
- qflgs = erts_atomic32_read_nob(&dep->qflgs);
+ notify = erts_atomic32_read_nob(&dep->notify);
- if (!(qflgs & ERTS_DE_QFLG_REQ_INFO)) {
+ if (!notify) {
ERTS_THR_READ_MEMORY_BARRIER;
- qsize = erts_atomic_read_nob(&dep->qsize);
+ qsize = erts_atomic_read_nob(&dep->total_qsize);
ASSERT(qsize >= 0);
if (qsize > 0)
receiver = BIF_P->common.id; /* Notify ourselves... */
- else { /* Empty queue; set req-info flag... */
- qflgs = erts_atomic32_read_bor_mb(&dep->qflgs,
- ERTS_DE_QFLG_REQ_INFO);
- qsize = erts_atomic_read_nob(&dep->qsize);
+ else { /* Empty queue; set the notify field... */
+ notify = erts_atomic32_xchg_mb(&dep->notify, (erts_aint32_t) !0);
+ qsize = erts_atomic_read_nob(&dep->total_qsize);
ASSERT(qsize >= 0);
if (qsize > 0) {
- qflgs = erts_atomic32_read_band_mb(&dep->qflgs,
- ~ERTS_DE_QFLG_REQ_INFO);
- if (qflgs & ERTS_DE_QFLG_REQ_INFO)
+ notify = erts_atomic32_xchg_mb(&dep->notify, (erts_aint32_t) 0);
+ if (notify)
receiver = BIF_P->common.id; /* Notify ourselves... */
/* else: someone else will notify us... */
}
@@ -4201,7 +4359,7 @@ dist_get_stat_1(BIF_ALIST_1)
}
read = (Sint64) erts_atomic64_read_nob(&dep->in);
write = (Sint64) erts_atomic64_read_nob(&dep->out);
- pend = (Sint64) erts_atomic_read_nob(&dep->qsize);
+ pend = (Sint64) erts_atomic_read_nob(&dep->total_qsize);
erts_de_runlock(dep);
@@ -4256,10 +4414,19 @@ dist_ctrl_get_data_1(BIF_ALIST_1)
{
DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(BIF_P);
const Sint initial_reds = ERTS_BIF_REDS_LEFT(BIF_P);
- Sint reds = initial_reds, obufsize = 0, ix, vlen;
+ Sint reds = initial_reds, ix, vlen;
+ /*
+ * 'obufsize' and 'ignore_obufsize' contains the number of bytes removed
+ * from the queue which will be updated (in dep->total_qsize and
+ * dep->qsize) before we return from this function. Note that
+ * 'obufsize' and 'ignore_obufsize' may be negative if we added to the
+ * queue size. This may occur since finalization of a buffer may increase
+ * buffer size.
+ */
+ Sint obufsize = 0, ignore_obufsize = 0;
ErtsDistOutputBuf *obuf;
Eterm *hp, res;
- erts_aint_t qsize;
+ Sint qsize;
Uint32 conn_id, get_size;
Uint hsz = 0, data_sz;
SysIOVec *iov;
@@ -4298,7 +4465,7 @@ dist_ctrl_get_data_1(BIF_ALIST_1)
{
if (!dep->tmp_out_queue.first) {
ASSERT(!dep->tmp_out_queue.last);
- qsize = erts_atomic_read_acqb(&dep->qsize);
+ qsize = (Sint) erts_atomic_read_acqb(&dep->total_qsize);
if (qsize > 0) {
erts_mtx_lock(&dep->qlock);
dep->tmp_out_queue.first = dep->out_queue.first;
@@ -4317,13 +4484,16 @@ dist_ctrl_get_data_1(BIF_ALIST_1)
}
obuf = dep->tmp_out_queue.first;
- obufsize += size_obuf(obuf);
+ add_obuf_sizes(obuf, &obufsize, &ignore_obufsize);
reds = erts_encode_ext_dist_header_finalize(obuf, dep, dep->dflags, reds);
- obufsize -= size_obuf(obuf);
+ subtract_obuf_sizes(obuf, &obufsize, &ignore_obufsize);
if (reds < 0) { /* finalize needs to be restarted... */
erts_de_runlock(dep);
- if (obufsize)
- erts_atomic_add_nob(&dep->qsize, (erts_aint_t) -obufsize);
+ if (obufsize) {
+ erts_mtx_lock(&dep->qlock);
+ update_qsizes(dep, NULL, NULL, -obufsize, -ignore_obufsize);
+ erts_mtx_unlock(&dep->qlock);
+ }
ERTS_BIF_YIELD1(BIF_TRAP_EXPORT(BIF_dist_ctrl_get_data_1),
BIF_P, BIF_ARG_1);
}
@@ -4404,16 +4574,18 @@ dist_ctrl_get_data_1(BIF_ALIST_1)
hp += 2;
}
- obufsize += size_obuf(obuf);
+ add_obuf_sizes(obuf, &obufsize, &ignore_obufsize);
- qsize = erts_atomic_add_read_nob(&dep->qsize, (erts_aint_t) -obufsize);
+ erts_mtx_lock(&dep->qlock);
- ASSERT(qsize >= 0);
+ update_qsizes(dep, NULL, &qsize, -obufsize, -ignore_obufsize);
- if (qsize < erts_dist_buf_busy_limit/2
- && (erts_atomic32_read_acqb(&dep->qflgs) & ERTS_DE_QFLG_BUSY)) {
+ if (qsize >= erts_dist_buf_busy_limit/2
+ || !(de_qflags_read(dep) & ERTS_DE_QFLG_BUSY)) {
+ erts_mtx_unlock(&dep->qlock);
+ }
+ else {
ErtsProcList *resume_procs = NULL;
- erts_mtx_lock(&dep->qlock);
resume_procs = get_suspended_on_de(dep, ERTS_DE_QFLG_BUSY);
erts_mtx_unlock(&dep->qlock);
if (resume_procs) {
@@ -4468,8 +4640,8 @@ static void kill_connection(DistEntry *dep)
dep->state = ERTS_DE_STATE_EXITING;
erts_mtx_lock(&dep->qlock);
- ASSERT(!(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT));
- erts_atomic32_read_bor_nob(&dep->qflgs, ERTS_DE_QFLG_EXIT);
+ ASSERT(!(de_qflags_read(dep) & ERTS_DE_QFLG_EXIT));
+ de_qflags_read_set(dep, ERTS_DE_QFLG_EXIT);
erts_mtx_unlock(&dep->qlock);
if (is_internal_port(dep->cid))
@@ -5069,7 +5241,6 @@ setup_connection_epiloge_rwunlock(Process *c_p, DistEntry *dep,
Process *net_kernel)
{
Eterm notify_proc = NIL;
- erts_aint32_t qflgs;
ErtsProcLocks nk_locks;
int success = 0;
@@ -5105,17 +5276,18 @@ setup_connection_epiloge_rwunlock(Process *c_p, DistEntry *dep,
erts_set_dist_entry_connected(dep, ctrlr, flags);
notify_proc = NIL;
- if (erts_atomic_read_nob(&dep->qsize)) {
+ if (erts_atomic_read_nob(&dep->total_qsize)) {
if (is_internal_port(dep->cid)) {
erts_schedule_dist_command(NULL, dep);
}
else {
+ erts_aint32_t notify;
ERTS_THR_READ_MEMORY_BARRIER;
- qflgs = erts_atomic32_read_nob(&dep->qflgs);
- if (qflgs & ERTS_DE_QFLG_REQ_INFO) {
- qflgs = erts_atomic32_read_band_mb(&dep->qflgs,
- ~ERTS_DE_QFLG_REQ_INFO);
- if (qflgs & ERTS_DE_QFLG_REQ_INFO) {
+ notify = erts_atomic32_read_nob(&dep->notify);
+ if (notify) {
+ notify = erts_atomic32_xchg_mb(&dep->notify,
+ (erts_aint32_t) 0);
+ if (notify) {
notify_proc = dep->cid;
ASSERT(is_internal_pid(notify_proc));
}
@@ -5335,6 +5507,7 @@ Sint erts_abort_pending_connection_rwunlock(DistEntry* dep,
ASSERT(!dep->finalized_out_queue.first);
resume_procs = get_suspended_on_de(dep, ERTS_DE_QFLGS_ALL);
erts_mtx_unlock(&dep->qlock);
+ erts_atomic32_set_relb(&dep->notify, 0);
erts_atomic_set_nob(&dep->dist_cmd_scheduled, 0);
dep->send = NULL;
diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h
index 372cdfc1a7..a5365fcb99 100644
--- a/erts/emulator/beam/dist.h
+++ b/erts/emulator/beam/dist.h
@@ -344,6 +344,7 @@ typedef struct erts_dsig_send_context {
int connect;
int no_suspend;
int no_trap;
+ int ignore_busy;
Eterm ctl;
Eterm msg;
diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c
index 35dfdd7983..b7642f2212 100644
--- a/erts/emulator/beam/erl_bif_info.c
+++ b/erts/emulator/beam/erl_bif_info.c
@@ -777,6 +777,7 @@ collect_one_suspend_monitor(ErtsMonitor *mon, void *vsmicp, Sint reds)
#define ERTS_PI_IX_MAGIC_REF 34
#define ERTS_PI_IX_FULLSWEEP_AFTER 35
#define ERTS_PI_IX_PARENT 36
+#define ERTS_PI_IX_ASYNC_DIST 37
#define ERTS_PI_FLAG_SINGELTON (1 << 0)
#define ERTS_PI_FLAG_ALWAYS_WRAP (1 << 1)
@@ -833,7 +834,8 @@ static ErtsProcessInfoArgs pi_args[] = {
{am_garbage_collection_info, ERTS_PROCESS_GC_INFO_MAX_SIZE, 0, ERTS_PROC_LOCK_MAIN},
{am_magic_ref, 0, ERTS_PI_FLAG_FORCE_SIG_SEND, ERTS_PROC_LOCK_MAIN},
{am_fullsweep_after, 0, 0, ERTS_PROC_LOCK_MAIN},
- {am_parent, 0, 0, ERTS_PROC_LOCK_MAIN}
+ {am_parent, 0, 0, ERTS_PROC_LOCK_MAIN},
+ {am_async_dist, 0, 0, ERTS_PROC_LOCK_MAIN}
};
#define ERTS_PI_ARGS ((int) (sizeof(pi_args)/sizeof(pi_args[0])))
@@ -954,6 +956,8 @@ pi_arg2ix(Eterm arg)
return ERTS_PI_IX_FULLSWEEP_AFTER;
case am_parent:
return ERTS_PI_IX_PARENT;
+ case am_async_dist:
+ return ERTS_PI_IX_ASYNC_DIST;
default:
return -1;
}
@@ -2129,6 +2133,10 @@ process_info_aux(Process *c_p,
}
break;
+ case ERTS_PI_IX_ASYNC_DIST:
+ res = (rp->flags & F_ASYNC_DIST) ? am_true : am_false;
+ break;
+
case ERTS_PI_IX_MAGIC_REF: {
Uint sz = 0;
(void) bld_magic_ref_bin_list(NULL, &sz, &MSO(rp));
@@ -2785,6 +2793,10 @@ BIF_RETTYPE system_info_1(BIF_ALIST_1)
res = new_binary(BIF_P, (byte *) dsbufp->str, dsbufp->str_len);
erts_destroy_info_dsbuf(dsbufp);
BIF_RET(res);
+ } else if (am_async_dist == BIF_ARG_1) {
+ BIF_RET((erts_default_spo_flags & SPO_ASYNC_DIST)
+ ? am_true
+ : am_false);
} else if (ERTS_IS_ATOM_STR("dist_ctrl", BIF_ARG_1)) {
DistEntry *dep;
i = 0;
diff --git a/erts/emulator/beam/erl_init.c b/erts/emulator/beam/erl_init.c
index a4f6a1b723..577657bbb2 100644
--- a/erts/emulator/beam/erl_init.c
+++ b/erts/emulator/beam/erl_init.c
@@ -664,6 +664,7 @@ void erts_usage(void)
erts_fprintf(stderr, "\n");
erts_fprintf(stderr, "-pc <set> control what characters are considered printable (default latin1)\n");
+ erts_fprintf(stderr, "-pad bool set default process async data (default false)\n");
erts_fprintf(stderr, "-P number set maximum number of processes on this node;\n");
erts_fprintf(stderr, " valid range is [%d-%d]\n",
ERTS_MIN_PROCESSES, ERTS_MAX_PROCESSES);
@@ -1418,11 +1419,28 @@ erl_start(int argc, char **argv)
erts_usage();
}
erts_set_printable_characters(printable_chars);
- break;
- } else {
- erts_fprintf(stderr, "%s unknown flag %s\n", argv[0], argv[i]);
- erts_usage();
}
+ else {
+ char *sub_param = argv[i]+2;
+ if (has_prefix("ad", sub_param)) {
+ arg = get_arg(sub_param+2, argv[i+1], &i);
+ if (sys_strcmp("true", arg) == 0) {
+ erts_default_spo_flags |= SPO_ASYNC_DIST;
+ }
+ else if (sys_strcmp("false", arg) == 0) {
+ erts_default_spo_flags &= ~SPO_ASYNC_DIST;
+ }
+ else {
+ erts_fprintf(stderr, "bad async dist value %s\n", arg);
+ erts_usage();
+ }
+ }
+ else {
+ erts_fprintf(stderr, "%s unknown flag %s\n", argv[0], argv[i]);
+ erts_usage();
+ }
+ }
+ break;
case 'f':
if (!sys_strncmp(argv[i],"-fn",3)) {
int warning_type = ERL_FILENAME_WARNING_WARNING;
diff --git a/erts/emulator/beam/erl_node_tables.c b/erts/emulator/beam/erl_node_tables.c
index 23a19064b1..2bec8ff20e 100644
--- a/erts/emulator/beam/erl_node_tables.c
+++ b/erts/emulator/beam/erl_node_tables.c
@@ -191,6 +191,8 @@ dist_table_alloc(void *dep_tmpl)
erts_mtx_init(&dep->qlock, "dist_entry_out_queue", sysname,
ERTS_LOCK_FLAGS_CATEGORY_DISTRIBUTION);
erts_atomic32_init_nob(&dep->qflgs, 0);
+ erts_atomic32_init_nob(&dep->notify, 0);
+ erts_atomic_init_nob(&dep->total_qsize, 0);
erts_atomic_init_nob(&dep->qsize, 0);
erts_atomic64_init_nob(&dep->in, 0);
erts_atomic64_init_nob(&dep->out, 0);
@@ -729,8 +731,6 @@ erts_set_dist_entry_pending(DistEntry *dep)
void
erts_set_dist_entry_connected(DistEntry *dep, Eterm cid, Uint64 flags)
{
- erts_aint32_t set_qflgs;
-
ASSERT(dep->mld);
ERTS_LC_ASSERT(erts_lc_is_de_rwlocked(dep));
@@ -767,9 +767,6 @@ erts_set_dist_entry_connected(DistEntry *dep, Eterm cid, Uint64 flags)
erts_atomic64_set_nob(&dep->in, 0);
erts_atomic64_set_nob(&dep->out, 0);
- set_qflgs = (is_internal_port(cid) ?
- ERTS_DE_QFLG_PORT_CTRL : ERTS_DE_QFLG_PROC_CTRL);
- erts_atomic32_read_bor_nob(&dep->qflgs, set_qflgs);
if(flags & DFLAG_PUBLISHED) {
dep->next = erts_visible_dist_entries;
diff --git a/erts/emulator/beam/erl_node_tables.h b/erts/emulator/beam/erl_node_tables.h
index f8c448de3c..9109c50e1d 100644
--- a/erts/emulator/beam/erl_node_tables.h
+++ b/erts/emulator/beam/erl_node_tables.h
@@ -77,15 +77,9 @@ enum dist_entry_state {
#define ERTS_DE_QFLG_BUSY (((erts_aint32_t) 1) << 0)
#define ERTS_DE_QFLG_EXIT (((erts_aint32_t) 1) << 1)
-#define ERTS_DE_QFLG_REQ_INFO (((erts_aint32_t) 1) << 2)
-#define ERTS_DE_QFLG_PORT_CTRL (((erts_aint32_t) 1) << 3)
-#define ERTS_DE_QFLG_PROC_CTRL (((erts_aint32_t) 1) << 4)
#define ERTS_DE_QFLGS_ALL (ERTS_DE_QFLG_BUSY \
- | ERTS_DE_QFLG_EXIT \
- | ERTS_DE_QFLG_REQ_INFO \
- | ERTS_DE_QFLG_PORT_CTRL \
- | ERTS_DE_QFLG_PROC_CTRL)
+ | ERTS_DE_QFLG_EXIT)
#if defined(ARCH_64)
#define ERTS_DIST_OUTPUT_BUF_DBG_PATTERN ((Uint) 0xf713f713f713f713UL)
@@ -105,6 +99,7 @@ struct ErtsDistOutputBuf_ {
* iov[2 ... vsize-1] data
*/
ErlIOVec *eiov;
+ int ignore_busy;
};
struct ErtsDistOutputBufsContainer_ {
@@ -153,7 +148,9 @@ struct dist_entry_ {
erts_mtx_t qlock; /* Protects qflgs and out_queue */
erts_atomic32_t qflgs;
- erts_atomic_t qsize;
+ erts_atomic32_t notify; /* User wants queue notification? */
+ erts_atomic_t qsize; /* Size of data in queue respecting busy dist */
+ erts_atomic_t total_qsize; /* Total size of data in queue */
erts_atomic64_t in;
erts_atomic64_t out;
ErtsDistOutputQueue out_queue;
diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c
index 1105f6c52d..31740bae08 100644
--- a/erts/emulator/beam/erl_process.c
+++ b/erts/emulator/beam/erl_process.c
@@ -12042,6 +12042,22 @@ erts_parse_spawn_opts(ErlSpawnOpts *sop, Eterm opts_list, Eterm *tag,
sop->priority = PRIORITY_LOW;
else
result = -1;
+ } else if (arg == am_async_dist) {
+ if (val == am_true) {
+ if (sop->flags & SPO_ASYNC_DIST)
+ sop->multi_set = !0;
+ else
+ sop->flags |= SPO_ASYNC_DIST;
+ }
+ else if (val == am_false) {
+ if (!(sop->flags & SPO_ASYNC_DIST))
+ sop->multi_set = !0;
+ else
+ sop->flags &= ~SPO_ASYNC_DIST;
+ }
+ else {
+ result = -1;
+ }
} else if (arg == am_message_queue_data) {
if (sop->flags & (SPO_OFF_HEAP_MSGQ|SPO_ON_HEAP_MSGQ))
sop->multi_set = !0;
@@ -12279,6 +12295,9 @@ erl_create_process(Process* parent, /* Parent of process (default group leader).
/* Reserve place for continuation pointer, redzone, etc */
heap_need = arg_size + S_RESERVED;
+ if (so->flags & SPO_ASYNC_DIST)
+ flags |= F_ASYNC_DIST;
+
p->flags = flags;
p->sig_qs.flags = qs_flags;
diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h
index 3f1f937052..bd281a2f65 100644
--- a/erts/emulator/beam/erl_process.h
+++ b/erts/emulator/beam/erl_process.h
@@ -1413,8 +1413,9 @@ void erts_check_for_holes(Process* p);
#define SPO_IX_ASYNC 11
#define SPO_IX_NO_SMSG 12
#define SPO_IX_NO_EMSG 13
+#define SPO_IX_ASYNC_DIST 14
-#define SPO_NO_INDICES (SPO_IX_ASYNC+1)
+#define SPO_NO_INDICES (SPO_IX_ASYNC_DIST+1)
#define SPO_LINK (1 << SPO_IX_LINK)
#define SPO_MONITOR (1 << SPO_IX_MONITOR)
@@ -1430,8 +1431,9 @@ void erts_check_for_holes(Process* p);
#define SPO_ASYNC (1 << SPO_IX_ASYNC)
#define SPO_NO_SMSG (1 << SPO_IX_NO_SMSG)
#define SPO_NO_EMSG (1 << SPO_IX_NO_EMSG)
+#define SPO_ASYNC_DIST (1 << SPO_IX_ASYNC_DIST)
-#define SPO_MAX_FLAG SPO_NO_EMSG
+#define SPO_MAX_FLAG SPO_ASYNC_DIST
#define SPO_USE_ARGS \
(SPO_MIN_HEAP_SIZE \
@@ -1573,6 +1575,7 @@ extern int erts_system_profile_ts_type;
#define F_FRAGMENTED_SEND (1 << 23) /* Process is doing a distributed fragmented send */
#define F_DBG_FORCED_TRAP (1 << 24) /* DEBUG: Last BIF call was a forced trap */
#define F_DIRTY_CHECK_CLA (1 << 25) /* Check if copy literal area GC scheduled */
+#define F_ASYNC_DIST (1 << 26) /* Truly asynchronous distribution */
/* Signal queue flags */
#define FS_OFF_HEAP_MSGQ (1 << 0) /* Off heap msg queue */
diff --git a/erts/emulator/test/distribution_SUITE.erl b/erts/emulator/test/distribution_SUITE.erl
index edab0be20b..c1a31c6abf 100644
--- a/erts/emulator/test/distribution_SUITE.erl
+++ b/erts/emulator/test/distribution_SUITE.erl
@@ -81,7 +81,10 @@
huge_iovec/1,
is_alive/1,
dyn_node_name_monitor_node/1,
- dyn_node_name_monitor/1]).
+ dyn_node_name_monitor/1,
+ async_dist_flag/1,
+ async_dist_port_dctrlr/1,
+ async_dist_proc_dctrlr/1]).
%% Internal exports.
-export([sender/3, receiver2/2, dummy_waiter/0, dead_process/0,
@@ -114,7 +117,8 @@ all() ->
dist_entry_refc_race,
start_epmd_false, no_epmd, epmd_module, system_limit,
hopefull_data_encoding, hopefull_export_fun_bug,
- huge_iovec, is_alive, dyn_node_name_monitor_node, dyn_node_name_monitor].
+ huge_iovec, is_alive, dyn_node_name_monitor_node, dyn_node_name_monitor,
+ {group, async_dist}].
groups() ->
[{bulk_send, [], [bulk_send_small, bulk_send_big, bulk_send_bigbig]},
@@ -133,7 +137,11 @@ groups() ->
[message_latency_large_message,
message_latency_large_link_exit,
message_latency_large_monitor_exit,
- message_latency_large_exit2]}
+ message_latency_large_exit2]},
+ {async_dist, [],
+ [async_dist_flag,
+ async_dist_port_dctrlr,
+ async_dist_proc_dctrlr]}
].
init_per_suite(Config) ->
@@ -3320,6 +3328,272 @@ dyn_node_name_monitor_test(StartOpts, TestNode) ->
end,
ok.
+async_dist_flag(Config) when is_list(Config) ->
+ {ok, Peer1, Node1} = ?CT_PEER(),
+ async_dist_flag_test(Node1, false),
+ peer:stop(Peer1),
+ {ok, Peer2, Node2} = ?CT_PEER(["+pad", "false"]),
+ async_dist_flag_test(Node2, false),
+ peer:stop(Peer2),
+ {ok, Peer3, Node3} = ?CT_PEER(["+pad", "true", "+pad", "false"]),
+ async_dist_flag_test(Node3, false),
+ peer:stop(Peer3),
+
+ {ok, Peer4, Node4} = ?CT_PEER(["+pad", "true"]),
+ async_dist_flag_test(Node4, true),
+ peer:stop(Peer4),
+ {ok, Peer5, Node5} = ?CT_PEER(["+pad", "false", "+pad", "true"]),
+ async_dist_flag_test(Node5, true),
+ peer:stop(Peer5),
+
+ ok.
+
+async_dist_flag_test(Node, Default) when is_atom(Node), is_boolean(Default) ->
+ Tester = self(),
+ NotDefault = not Default,
+
+ Default = erpc:call(Node, erlang, system_info, [async_dist]),
+
+ {P1, M1} = spawn_opt(Node, fun () ->
+ receive after infinity -> ok end
+ end, [link, monitor]),
+ {P2, M2} = spawn_opt(Node, fun () ->
+ receive after infinity -> ok end
+ end, [link, monitor, {async_dist, false}]),
+ {P3, M3} = spawn_opt(Node, fun () ->
+ receive after infinity -> ok end
+ end, [link, monitor, {async_dist, true}]),
+ {async_dist, Default} = erpc:call(Node, erlang, process_info, [P1, async_dist]),
+ {async_dist, false} = erpc:call(Node, erlang, process_info, [P2, async_dist]),
+ {async_dist, true} = erpc:call(Node, erlang, process_info, [P3, async_dist]),
+
+ R4 = make_ref(),
+ {P4, M4} = spawn_opt(Node, fun () ->
+ Default = process_flag(async_dist, NotDefault),
+ Tester ! R4,
+ receive after infinity -> ok end
+ end, [link, monitor]),
+
+ R5 = make_ref(),
+ {P5, M5} = spawn_opt(Node, fun () ->
+ false = process_flag(async_dist, true),
+ Tester ! R5,
+ receive after infinity -> ok end
+ end, [link, monitor, {async_dist, false}]),
+ R6 = make_ref(),
+ {P6, M6} = spawn_opt(Node, fun () ->
+ true = process_flag(async_dist, false),
+ Tester ! R6,
+ receive after infinity -> ok end
+ end, [link, monitor, {async_dist, true}]),
+ receive R4 -> ok end,
+ {async_dist, NotDefault} = erpc:call(Node, erlang, process_info, [P4, async_dist]),
+ receive R5 -> ok end,
+ {async_dist, true} = erpc:call(Node, erlang, process_info, [P5, async_dist]),
+ receive R6 -> ok end,
+ {async_dist, false} = erpc:call(Node, erlang, process_info, [P6, async_dist]),
+
+
+ R7 = make_ref(),
+ {P7, M7} = spawn_opt(Node, fun () ->
+ Default = process_flag(async_dist, NotDefault),
+ NotDefault = process_flag(async_dist, NotDefault),
+ NotDefault = process_flag(async_dist, NotDefault),
+ NotDefault = process_flag(async_dist, Default),
+ Default = process_flag(async_dist, Default),
+ Default = process_flag(async_dist, Default),
+ Tester ! R7,
+ receive after infinity -> ok end
+ end, [link, monitor]),
+ receive R7 -> ok end,
+
+ unlink(P1),
+ exit(P1, bang),
+ unlink(P2),
+ exit(P2, bang),
+ unlink(P3),
+ exit(P3, bang),
+ unlink(P4),
+ exit(P4, bang),
+ unlink(P5),
+ exit(P5, bang),
+ unlink(P6),
+ exit(P6, bang),
+ unlink(P7),
+ exit(P7, bang),
+
+ receive {'DOWN', M1, process, P1, bang} -> ok end,
+ receive {'DOWN', M2, process, P2, bang} -> ok end,
+ receive {'DOWN', M3, process, P3, bang} -> ok end,
+ receive {'DOWN', M4, process, P4, bang} -> ok end,
+ receive {'DOWN', M5, process, P5, bang} -> ok end,
+ receive {'DOWN', M6, process, P6, bang} -> ok end,
+ receive {'DOWN', M7, process, P7, bang} -> ok end,
+
+ ok.
+
+async_dist_port_dctrlr(Config) when is_list(Config) ->
+ {ok, RecvPeer, RecvNode} = ?CT_PEER(),
+ ok = async_dist_test(RecvNode),
+ peer:stop(RecvPeer),
+ ok.
+
+async_dist_proc_dctrlr(Config) when is_list(Config) ->
+ {ok, SendPeer, SendNode} = ?CT_PEER(["-proto_dist", "gen_tcp"]),
+ {ok, RecvPeer, RecvNode} = ?CT_PEER(["-proto_dist", "gen_tcp"]),
+ {Pid, Mon} = spawn_monitor(SendNode,
+ fun () ->
+ ok = async_dist_test(RecvNode),
+ exit(test_success)
+ end),
+ receive
+ {'DOWN', Mon, process, Pid, Reason} ->
+ test_success = Reason
+ end,
+ peer:stop(SendPeer),
+ peer:stop(RecvPeer),
+ ok.
+
+async_dist_test(Node) ->
+ Scale = case round(test_server:timetrap_scale_factor()/3) of
+ S when S < 1 -> 1;
+ S -> S
+ end,
+ _ = process_flag(async_dist, false),
+ Tester = self(),
+ AliveReceiver1 = spawn_link(Node, fun () ->
+ register(alive_receiver_1, self()),
+ Tester ! {registered, self()},
+ receive after infinity -> ok end
+ end),
+ receive {registered, AliveReceiver1} -> ok end,
+ AliveReceiver2 = spawn(Node, fun () -> receive after infinity -> ok end end),
+ {AliveReceiver3, AR3Mon} = spawn_monitor(Node,
+ fun () ->
+ receive after infinity -> ok end
+ end),
+ {DeadReceiver, DRMon} = spawn_monitor(Node, fun () -> ok end),
+ receive
+ {'DOWN', DRMon, process, DeadReceiver, DRReason} ->
+ normal = DRReason
+ end,
+ Data = lists:duplicate($x, 256),
+ GoNuts = fun GN () ->
+ DeadReceiver ! hello,
+ GN()
+ end,
+ erpc:call(Node, erts_debug, set_internal_state, [available_internal_state, true]),
+ erpc:cast(Node, erts_debug, set_internal_state, [block, 4000*Scale]),
+ DistBufFiller = spawn_link(fun () ->
+ process_flag(async_dist, false),
+ receive go_nuts -> ok end,
+ GoNuts()
+ end),
+ BDMon = spawn_link(fun SysMon () ->
+ receive
+ {monitor, Pid, busy_dist_port, _} ->
+ Tester ! {busy_dist_port, Pid}
+ end,
+ SysMon()
+ end),
+ _ = erlang:system_monitor(BDMon, [busy_dist_port]),
+ DistBufFiller ! go_nuts,
+
+ %% Busy dist entry may release after it has triggered even
+ %% though noone is consuming anything at the receiving end.
+ %% Continue banging until we stop getting new busy_dist_port...
+ WaitFilled = fun WF (Tmo) ->
+ receive
+ {busy_dist_port, DistBufFiller} ->
+ WF(1000*Scale)
+ after
+ Tmo ->
+ ok
+ end
+ end,
+ WaitFilled(infinity),
+
+ BusyDistChecker = spawn_link(fun () ->
+ process_flag(async_dist, false),
+ DeadReceiver ! hello,
+ exit(unexpected_return_from_bang)
+ end),
+ receive {busy_dist_port, BusyDistChecker} -> ok end,
+ {async_dist, false} = process_info(self(), async_dist),
+ {async_dist, false} = process_info(BusyDistChecker, async_dist),
+ false = process_flag(async_dist, true),
+ {async_dist, true} = process_info(self(), async_dist),
+
+ Start = erlang:monotonic_time(millisecond),
+ M1 = erlang:monitor(process, AliveReceiver1),
+ true = is_reference(M1),
+ M2 = erlang:monitor(process, AliveReceiver2),
+ true = is_reference(M2),
+ {pid, Data} = AliveReceiver1 ! {pid, Data},
+ {reg_name, Data} = {alive_receiver_1, Node} ! {reg_name, Data},
+ true = erlang:demonitor(M1),
+ true = link(AliveReceiver2),
+ true = unlink(AliveReceiver1),
+ RId = spawn_request(Node, fun () -> receive bye -> ok end end, [link, monitor]),
+ true = is_reference(RId),
+ erlang:group_leader(self(), AliveReceiver2),
+ AR3XReason = make_ref(),
+ true = exit(AliveReceiver3, AR3XReason),
+ End = erlang:monotonic_time(millisecond),
+
+ %% These signals should have been buffered immediately. Make sure
+ %% it did not take a long time...
+ true = 500*Scale >= End - Start,
+
+ receive after 500*Scale -> ok end,
+
+ unlink(BusyDistChecker),
+ exit(BusyDistChecker, bang),
+ false = is_process_alive(BusyDistChecker),
+
+ unlink(DistBufFiller),
+ exit(DistBufFiller, bang),
+ false = is_process_alive(DistBufFiller),
+
+ %% Verify that the signals eventually get trough when the other
+ %% node continue to work...
+ {links, []}
+ = erpc:call(Node, erlang, process_info, [AliveReceiver1, links]),
+ {links, [Tester]}
+ = erpc:call(Node, erlang, process_info, [AliveReceiver2, links]),
+ {monitored_by, []}
+ = erpc:call(Node, erlang, process_info, [AliveReceiver1, monitored_by]),
+ {monitored_by, [Tester]}
+ = erpc:call(Node, erlang, process_info, [AliveReceiver2, monitored_by]),
+ {messages, [{pid, Data}, {reg_name, Data}]}
+ = erpc:call(Node, erlang, process_info, [AliveReceiver1, messages]),
+ {group_leader, Tester}
+ = erpc:call(Node, erlang, process_info, [AliveReceiver2, group_leader]),
+
+ Spawned = receive
+ {spawn_reply, RId, SpawnRes, Pid} ->
+ ok = SpawnRes,
+ true = is_pid(Pid),
+ Pid
+ end,
+ {links, [Tester]}
+ = erpc:call(Node, erlang, process_info, [Spawned, links]),
+ {monitored_by, [Tester]}
+ = erpc:call(Node, erlang, process_info, [Spawned, monitored_by]),
+
+ receive
+ {'DOWN', AR3Mon, process, AliveReceiver3, ActualAR3XReason} ->
+ AR3XReason = ActualAR3XReason
+ end,
+
+ unlink(AliveReceiver2),
+ unlink(Spawned),
+
+ true = process_flag(async_dist, false),
+ {async_dist, false} = process_info(self(), async_dist),
+
+ ok.
+
%%% Utilities
wait_until(Fun) ->
diff --git a/erts/etc/common/erlexec.c b/erts/etc/common/erlexec.c
index f8e7fb09d7..c9a662e70f 100644
--- a/erts/etc/common/erlexec.c
+++ b/erts/etc/common/erlexec.c
@@ -1006,8 +1006,10 @@ int main(int argc, char **argv)
}
break;
case 'p':
- if (argv[i][2] != 'c' || argv[i][3] != '\0')
+ if (!(argv[i][2] == 'c' && argv[i][3] == '\0')
+ && !(argv[i][2] == 'a' && argv[i][3] == 'd' && argv[i][4] == '\0')) {
goto the_default;
+ }
NEXT_ARG_CHECK();
argv[i][0] = '-';
add_Eargs(argv[i]);
diff --git a/erts/preloaded/ebin/erlang.beam b/erts/preloaded/ebin/erlang.beam
index eb5b3a72e7..1a2a0de8c7 100644
--- a/erts/preloaded/ebin/erlang.beam
+++ b/erts/preloaded/ebin/erlang.beam
Binary files differ
diff --git a/erts/preloaded/src/erlang.erl b/erts/preloaded/src/erlang.erl
index b650eb70aa..6ac524528d 100644
--- a/erts/preloaded/src/erlang.erl
+++ b/erts/preloaded/src/erlang.erl
@@ -2444,7 +2444,10 @@ open_port(PortName, PortSettings) ->
-type message_queue_data() ::
off_heap | on_heap.
--spec process_flag(trap_exit, Boolean) -> OldBoolean when
+-spec process_flag(async_dist, Boolean) -> OldBoolean when
+ Boolean :: boolean(),
+ OldBoolean :: boolean();
+ (trap_exit, Boolean) -> OldBoolean when
Boolean :: boolean(),
OldBoolean :: boolean();
(error_handler, Module) -> OldModule when
@@ -2482,6 +2485,7 @@ process_flag(_Flag, _Value) ->
erlang:nif_error(undefined).
-type process_info_item() ::
+ async_dist |
backtrace |
binary |
catchlevel |
@@ -2518,6 +2522,7 @@ process_flag(_Flag, _Value) ->
trap_exit.
-type process_info_result_item() ::
+ {async_dist, Enabled :: boolean()} |
{backtrace, Bin :: binary()} |
{binary, BinInfo :: [{non_neg_integer(),
non_neg_integer(),
@@ -2969,6 +2974,7 @@ tuple_to_list(_Tuple) ->
(update_cpu_info) -> changed | unchanged;
(version) -> string();
(wordsize | {wordsize, internal} | {wordsize, external}) -> 4 | 8;
+ (async_dist) -> boolean();
(overview) -> boolean();
%% Deliberately left undocumented
(sequential_tracer) -> {sequential_tracer, pid() | port() | {module(),term()} | false}.
@@ -3098,7 +3104,8 @@ spawn_monitor(M, F, A) ->
| {min_heap_size, Size :: non_neg_integer()}
| {min_bin_vheap_size, VSize :: non_neg_integer()}
| {max_heap_size, Size :: max_heap_size()}
- | {message_queue_data, MQD :: message_queue_data()}.
+ | {message_queue_data, MQD :: message_queue_data()}
+ | {async_dist, Enabled :: boolean()}.
-spec spawn_opt(Fun, Options) -> pid() | {pid(), reference()} when
Fun :: function(),