summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRickard Green <rickard@erlang.org>2019-10-24 14:32:50 +0200
committerRickard Green <rickard@erlang.org>2019-11-21 11:54:30 +0100
commit5e5579e881c586421ab781ec7cb039d4ea8696ef (patch)
treea98c706087a96a224396fa4149c4518cdeccad6e
parent6611181ae71422a1c66798718b37474641a090a9 (diff)
downloaderlang-5e5579e881c586421ab781ec7cb039d4ea8696ef.tar.gz
Fix race causing nodedown reason to be lost
-rw-r--r--erts/emulator/beam/atom.names1
-rw-r--r--erts/emulator/beam/bif.tab8
-rw-r--r--erts/emulator/beam/dist.c198
-rw-r--r--erts/emulator/beam/dist.h2
-rw-r--r--erts/emulator/beam/erl_lock_check.c2
-rw-r--r--erts/emulator/beam/erl_node_tables.c2
-rw-r--r--erts/emulator/beam/erl_process.h4
-rw-r--r--erts/emulator/beam/io.c20
-rw-r--r--erts/preloaded/ebin/erts_internal.beambin20156 -> 20176 bytes
-rw-r--r--erts/preloaded/src/erts.app.src2
-rw-r--r--erts/preloaded/src/erts_internal.erl6
-rw-r--r--lib/kernel/src/kernel.app.src2
-rw-r--r--lib/kernel/src/net_kernel.erl206
13 files changed, 316 insertions, 137 deletions
diff --git a/erts/emulator/beam/atom.names b/erts/emulator/beam/atom.names
index 93ba56dccd..64e12f7a3b 100644
--- a/erts/emulator/beam/atom.names
+++ b/erts/emulator/beam/atom.names
@@ -212,6 +212,7 @@ atom discard
atom dist
atom dist_cmd
atom dist_ctrl_put_data
+atom dist_ctrlr
atom dist_data
atom Div='/'
atom div
diff --git a/erts/emulator/beam/bif.tab b/erts/emulator/beam/bif.tab
index c9f5177bd3..a5f46c4264 100644
--- a/erts/emulator/beam/bif.tab
+++ b/erts/emulator/beam/bif.tab
@@ -695,7 +695,6 @@ bif erlang:iolist_to_iovec/1
bif erts_internal:get_dflags/0
bif erts_internal:new_connection/1
-bif erts_internal:abort_connection/2
bif erts_internal:map_next/3
bif ets:whereis/1
bif erts_internal:gather_alloc_histograms/1
@@ -752,3 +751,10 @@ bif persistent_term:get/2
bif erts_internal:ets_lookup_binary_info/2
bif erts_internal:ets_raw_first/1
bif erts_internal:ets_raw_next/2
+
+#
+# New in 22.2
+#
+
+bif erts_internal:abort_pending_connection/2
+
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c
index eb9e749a08..4fe3de29d3 100644
--- a/erts/emulator/beam/dist.c
+++ b/erts/emulator/beam/dist.c
@@ -182,7 +182,8 @@ static int dsig_send_exit(ErtsDSigSendContext *ctx, Eterm ctl, Eterm msg);
static int dsig_send_ctl(ErtsDSigSendContext *ctx, Eterm ctl);
static void send_nodes_mon_msgs(Process *, Eterm, Eterm, Eterm, Eterm);
static void init_nodes_monitors(void);
-static Sint abort_connection(DistEntry* dep, Uint32 conn_id);
+static Sint abort_pending_connection(DistEntry* dep, Uint32 conn_id,
+ int *was_connected_p);
static ErtsDistOutputBuf* clear_de_out_queues(DistEntry*);
static void free_de_out_queues(DistEntry*, ErtsDistOutputBuf*);
int erts_dist_seq_tree_foreach_delete_yielding(DistSeqNode **root,
@@ -612,7 +613,7 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason)
if (no_pending) {
for (i = 0; i < no_pending; i++) {
- abort_connection(pending[i], pending[i]->connection_id);
+ abort_pending_connection(pending[i], pending[i]->connection_id, NULL);
erts_deref_dist_entry(pending[i]);
}
erts_free(ERTS_ALC_T_TMP, pending);
@@ -3761,8 +3762,9 @@ int distribution_info(fmtfn_t to, void *arg) /* Called by break handler */
BIF_RETTYPE setnode_2(BIF_ALIST_2)
{
- Process *net_kernel;
+ Process *net_kernel = NULL;
Uint creation;
+ int success;
/* valid creation ? */
if(!term_to_Uint(BIF_ARG_2, &creation))
@@ -3787,21 +3789,11 @@ BIF_RETTYPE setnode_2(BIF_ALIST_2)
net_kernel = erts_whereis_process(BIF_P,
ERTS_PROC_LOCK_MAIN,
am_net_kernel,
- ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS,
- 0);
- if (!net_kernel || ERTS_PROC_GET_DIST_ENTRY(net_kernel))
+ 0,
+ ERTS_P2P_FLG_INC_REFC);
+ if (!net_kernel)
goto error;
- /* By setting F_DISTRIBUTION on net_kernel,
- * erts_do_net_exits will be called when net_kernel is terminated !! */
- net_kernel->flags |= F_DISTRIBUTION;
-
- erts_proc_unlock(net_kernel,
- (ERTS_PROC_LOCK_STATUS
- | ((net_kernel != BIF_P)
- ? ERTS_PROC_LOCK_MAIN
- : 0)));
-
#ifdef DEBUG
erts_rwmtx_rlock(&erts_dist_table_rwmtx);
ASSERT(!erts_visible_dist_entries && !erts_hidden_dist_entries);
@@ -3809,25 +3801,44 @@ BIF_RETTYPE setnode_2(BIF_ALIST_2)
#endif
erts_proc_unlock(BIF_P, ERTS_PROC_LOCK_MAIN);
+
erts_thr_progress_block();
- inc_no_nodes();
- erts_set_this_node(BIF_ARG_1, (Uint32) creation);
- erts_is_alive = 1;
- send_nodes_mon_msgs(NULL, am_nodeup, BIF_ARG_1, am_visible, NIL);
- erts_thr_progress_unblock();
- erts_proc_lock(BIF_P, ERTS_PROC_LOCK_MAIN);
- /*
- * Note erts_this_dist_entry is changed by erts_set_this_node(),
- * so we *need* to use the new one after erts_set_this_node()
- * is called.
- */
- erts_ref_dist_entry(erts_this_dist_entry);
- ERTS_PROC_SET_DIST_ENTRY(net_kernel, erts_this_dist_entry);
+ success = (!ERTS_PROC_IS_EXITING(net_kernel)
+ & !ERTS_PROC_GET_DIST_ENTRY(net_kernel));
+ if (success) {
+ inc_no_nodes();
+ erts_set_this_node(BIF_ARG_1, (Uint32) creation);
+ erts_is_alive = 1;
+ send_nodes_mon_msgs(NULL, am_nodeup, BIF_ARG_1, am_visible, NIL);
+ erts_proc_lock(net_kernel, ERTS_PROC_LOCKS_ALL);
+
+ /* By setting F_DISTRIBUTION on net_kernel,
+ * erts_do_net_exits will be called when net_kernel is terminated !! */
+ net_kernel->flags |= F_DISTRIBUTION;
+
+ /*
+ * Note erts_this_dist_entry is changed by erts_set_this_node(),
+ * so we *need* to use the new one after erts_set_this_node()
+ * is called.
+ */
+ erts_ref_dist_entry(erts_this_dist_entry);
+ ERTS_PROC_SET_DIST_ENTRY(net_kernel, erts_this_dist_entry);
+ erts_proc_unlock(net_kernel, ERTS_PROC_LOCKS_ALL);
+ }
+
+ erts_thr_progress_unblock();
- BIF_RET(am_true);
+ erts_proc_lock(BIF_P, ERTS_PROC_LOCK_MAIN);
+
+ if (success) {
+ erts_proc_dec_refc(net_kernel);
+ BIF_RET(am_true);
+ }
error:
+ if (net_kernel)
+ erts_proc_dec_refc(net_kernel);
BIF_ERROR(BIF_P, BADARG);
}
@@ -3840,12 +3851,15 @@ typedef struct {
DistEntry *dep;
Uint flags;
Uint version;
+ Eterm setup_pid;
+ Process *net_kernel;
} ErtsSetupConnDistCtrl;
-static void
+static int
setup_connection_epiloge_rwunlock(Process *c_p, DistEntry *dep,
Eterm ctrlr, Uint flags,
- Uint version);
+ Uint version, Eterm setup_pid,
+ Process *net_kernel);
static Eterm
setup_connection_distctrl(Process *c_p, void *arg,
@@ -3860,7 +3874,20 @@ BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4)
DistEntry *dep = NULL;
int de_locked = 0;
Port *pp = NULL;
+ int true_nk;
+ Process *net_kernel = erts_whereis_process(BIF_P, ERTS_PROC_LOCK_MAIN,
+ am_net_kernel,
+ ERTS_PROC_LOCK_STATUS,
+ ERTS_P2P_FLG_INC_REFC);
+
+ if (!net_kernel)
+ goto badarg;
+ true_nk = ERTS_PROC_GET_DIST_ENTRY(net_kernel) == erts_this_dist_entry;
+ erts_proc_unlock(net_kernel, ERTS_PROC_LOCK_STATUS);
+ if (!true_nk)
+ goto badarg;
+
/*
* Check and pick out arguments
*/
@@ -3922,8 +3949,12 @@ BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4)
scdc.dep = dep;
scdc.flags = flags;
scdc.version = version;
+ scdc.setup_pid = BIF_P->common.id;
+ scdc.net_kernel = net_kernel;
res = setup_connection_distctrl(BIF_P, &scdc, NULL, NULL);
+ /* Dec of refc on net_kernel by setup_connection_distctrl() */
+ net_kernel = NULL;
BUMP_REDS(BIF_P, 5);
dep = NULL;
@@ -3942,6 +3973,8 @@ BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4)
scdcp->dep = dep;
scdcp->flags = flags;
scdcp->version = version;
+ scdcp->setup_pid = BIF_P->common.id;
+ scdcp->net_kernel = net_kernel;
res = erts_proc_sig_send_rpc_request(BIF_P,
BIF_ARG_2,
@@ -3949,8 +3982,11 @@ BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4)
setup_connection_distctrl,
(void *) scdcp);
if (is_non_value(res))
- goto badarg;
+ goto badarg; /* Was not able to send signal... */
+ /* Dec of refc on net_kernel by setup_connection_distctrl() */
+ net_kernel = NULL;
+
dep = NULL;
ASSERT(is_internal_ordinary_ref(res));
@@ -3961,6 +3997,7 @@ BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4)
}
else {
Uint32 conn_id;
+ int set_res;
pp = erts_id2port_sflgs(BIF_ARG_2,
BIF_P,
@@ -4005,7 +4042,13 @@ BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4)
}
conn_id = dep->connection_id;
- setup_connection_epiloge_rwunlock(BIF_P, dep, BIF_ARG_2, flags, version);
+ set_res = setup_connection_epiloge_rwunlock(BIF_P, dep, BIF_ARG_2, flags,
+ version, BIF_P->common.id,
+ net_kernel);
+ /* Dec of refc on net_kernel by setup_connection_epiloge_rwunlock() */
+ net_kernel = NULL;
+ if (set_res == 0)
+ goto badarg;
de_locked = 0;
hp = HAlloc(BIF_P, 3 + ERTS_DHANDLE_SIZE);
@@ -4022,6 +4065,9 @@ BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4)
done:
+ if (net_kernel)
+ erts_proc_dec_refc(net_kernel);
+
if (dep && dep != erts_this_dist_entry) {
if (de_locked) {
if (de_locked > 0)
@@ -4046,14 +4092,38 @@ BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4)
goto done;
}
-static void
+static int
setup_connection_epiloge_rwunlock(Process *c_p, DistEntry *dep,
Eterm ctrlr, Uint flags,
- Uint version)
+ Uint version, Eterm setup_pid,
+ Process *net_kernel)
{
Eterm notify_proc = NIL;
erts_aint32_t qflgs;
-
+ ErtsProcLocks nk_locks;
+ int success = 0;
+
+ /* Notify net_kernel about the new dist controller... */
+ ASSERT(net_kernel);
+ nk_locks = ERTS_PROC_LOCK_MSGQ;
+ erts_proc_lock(net_kernel, nk_locks);
+ if (!ERTS_PROC_IS_EXITING(net_kernel)
+ && ERTS_PROC_GET_DIST_ENTRY(net_kernel) == erts_this_dist_entry) {
+ Eterm *hp;
+ ErlOffHeap *ohp;
+ ErtsMessage *mp = erts_alloc_message_heap(net_kernel, &nk_locks,
+ 5 /* 4-tuple */,
+ &hp, &ohp);
+ Eterm msg = TUPLE4(hp, am_dist_ctrlr, ctrlr, dep->sysname, setup_pid);
+ erts_queue_message(net_kernel, nk_locks, mp, msg, am_system);
+ success = !0;
+ }
+ erts_proc_unlock(net_kernel, nk_locks);
+ erts_proc_dec_refc(net_kernel);
+
+ if (!success)
+ return 0;
+
dep->version = version;
dep->creation = 0;
@@ -4095,6 +4165,8 @@ setup_connection_epiloge_rwunlock(Process *c_p, DistEntry *dep,
dep->sysname,
flags & DFLAG_PUBLISHED ? am_visible : am_hidden,
NIL);
+
+ return !0;
}
static Eterm
@@ -4105,11 +4177,13 @@ setup_connection_distctrl(Process *c_p, void *arg, int *redsp, ErlHeapFragment *
int dep_locked = 0;
Eterm *hp;
Uint32 conn_id;
+ int dec_net_kernel_on_error = !0;
if (redsp)
*redsp = 1;
- ASSERT(!ERTS_PROC_IS_EXITING(c_p));
+ if (ERTS_PROC_IS_EXITING(c_p))
+ goto badarg;
erts_de_rwlock(dep);
dep_locked = !0;
@@ -4122,16 +4196,30 @@ setup_connection_distctrl(Process *c_p, void *arg, int *redsp, ErlHeapFragment *
if (is_not_nil(dep->cid))
goto badarg;
+ if (ERTS_PROC_GET_DIST_ENTRY(c_p))
+ goto badarg;
+
+ erts_proc_lock(c_p, ERTS_PROC_LOCKS_ALL_MINOR);
c_p->flags |= F_DISTRIBUTION;
ERTS_PROC_SET_DIST_ENTRY(c_p, dep);
+ erts_proc_unlock(c_p, ERTS_PROC_LOCKS_ALL_MINOR);
dep->send = NULL; /* Only for distr ports... */
if (redsp)
*redsp = 5;
- setup_connection_epiloge_rwunlock(c_p, dep, c_p->common.id,
- scdcp->flags, scdcp->version);
+ if (!setup_connection_epiloge_rwunlock(c_p, dep, c_p->common.id,
+ scdcp->flags, scdcp->version,
+ scdcp->setup_pid,
+ scdcp->net_kernel)) {
+ erts_proc_lock(c_p, ERTS_PROC_LOCKS_ALL_MINOR);
+ c_p->flags &= ~F_DISTRIBUTION;
+ ERTS_PROC_SET_DIST_ENTRY(c_p, NULL);
+ erts_proc_unlock(c_p, ERTS_PROC_LOCKS_ALL_MINOR);
+ dec_net_kernel_on_error = 0; /* dec:ed in epilog... */
+ goto badarg;
+ }
/* we take over previous inc in refc of dep */
@@ -4146,6 +4234,9 @@ setup_connection_distctrl(Process *c_p, void *arg, int *redsp, ErlHeapFragment *
badarg:
+ if (dec_net_kernel_on_error)
+ erts_proc_dec_refc(scdcp->net_kernel);
+
if (bpp) /* not called directly */
erts_free(ERTS_ALC_T_SETUP_CONN_ARG, arg);
@@ -4202,14 +4293,15 @@ BIF_RETTYPE erts_internal_new_connection_1(BIF_ALIST_1)
BIF_RET(dhandle);
}
-Sint erts_abort_connection_rwunlock(DistEntry* dep)
+Sint erts_abort_pending_connection_rwunlock(DistEntry* dep,
+ int *was_connected_p)
{
ERTS_LC_ASSERT(erts_lc_is_de_rwlocked(dep));
- if (dep->state == ERTS_DE_STATE_CONNECTED) {
- kill_connection(dep);
- }
- else if (dep->state == ERTS_DE_STATE_PENDING) {
+ if (was_connected_p)
+ *was_connected_p = dep->state == ERTS_DE_STATE_CONNECTED;
+
+ if (dep->state == ERTS_DE_STATE_PENDING) {
ErtsAtomCache *cache;
ErtsDistOutputBuf *obuf;
ErtsProcList *resume_procs;
@@ -4254,20 +4346,24 @@ Sint erts_abort_connection_rwunlock(DistEntry* dep)
return 0;
}
-static Sint abort_connection(DistEntry *dep, Uint32 conn_id)
+static Sint abort_pending_connection(DistEntry *dep, Uint32 conn_id,
+ int *was_connected_p)
{
erts_de_rwlock(dep);
if (dep->connection_id == conn_id)
- return erts_abort_connection_rwunlock(dep);
+ return erts_abort_pending_connection_rwunlock(dep, was_connected_p);
erts_de_rwunlock(dep);
+ if (was_connected_p)
+ *was_connected_p = 0;
return 0;
}
-BIF_RETTYPE erts_internal_abort_connection_2(BIF_ALIST_2)
+BIF_RETTYPE erts_internal_abort_pending_connection_2(BIF_ALIST_2)
{
DistEntry* dep;
Uint32 conn_id;
Sint reds;
+ int was_connected;
if (is_not_atom(BIF_ARG_1))
BIF_ERROR(BIF_P, BADARG);
@@ -4277,9 +4373,9 @@ BIF_RETTYPE erts_internal_abort_connection_2(BIF_ALIST_2)
BIF_ERROR(BIF_P, BADARG);
}
- reds = abort_connection(dep, conn_id);
+ reds = abort_pending_connection(dep, conn_id, &was_connected);
BUMP_REDS(BIF_P, reds);
- BIF_RET(am_true);
+ BIF_RET(was_connected ? am_false : am_true);
}
int erts_auto_connect(DistEntry* dep, Process *proc, ErtsProcLocks proc_locks)
@@ -4304,7 +4400,7 @@ int erts_auto_connect(DistEntry* dep, Process *proc, ErtsProcLocks proc_locks)
net_kernel = erts_whereis_process(proc, proc_locks,
am_net_kernel, nk_locks, 0);
if (!net_kernel) {
- abort_connection(dep, conn_id);
+ abort_pending_connection(dep, conn_id, NULL);
return 0;
}
diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h
index a33fb7efcf..d31269bd08 100644
--- a/erts/emulator/beam/dist.h
+++ b/erts/emulator/beam/dist.h
@@ -316,7 +316,7 @@ extern void erts_kill_dist_connection(DistEntry *dep, Uint32);
extern Uint erts_dist_cache_size(void);
-extern Sint erts_abort_connection_rwunlock(DistEntry *dep);
+extern Sint erts_abort_pending_connection_rwunlock(DistEntry *dep, int *);
extern void erts_debug_dist_seq_tree_foreach(
DistEntry *dep,
diff --git a/erts/emulator/beam/erl_lock_check.c b/erts/emulator/beam/erl_lock_check.c
index 3aab4828cc..ba61f721b4 100644
--- a/erts/emulator/beam/erl_lock_check.c
+++ b/erts/emulator/beam/erl_lock_check.c
@@ -96,9 +96,9 @@ static erts_lc_lock_order_t erts_lock_order[] = {
{ "erl_db_catree_route_node", "index" },
{ "resource_monitors", "address" },
{ "driver_list", NULL },
+ { "dist_entry", "address" },
{ "proc_msgq", "pid" },
{ "proc_btm", "pid" },
- { "dist_entry", "address" },
{ "dist_entry_links", "address" },
{ "update_persistent_term_permission", NULL },
{ "persistent_term_delete_permission", NULL },
diff --git a/erts/emulator/beam/erl_node_tables.c b/erts/emulator/beam/erl_node_tables.c
index 8863e219e2..8eed7fc354 100644
--- a/erts/emulator/beam/erl_node_tables.c
+++ b/erts/emulator/beam/erl_node_tables.c
@@ -493,7 +493,7 @@ static void try_delete_dist_entry(DistEntry* dep)
if (dep->state != ERTS_DE_STATE_PENDING)
ERTS_INTERNAL_ERROR("Garbage collecting connected distribution entry");
- erts_abort_connection_rwunlock(dep);
+ erts_abort_pending_connection_rwunlock(dep, NULL);
report_gc_active_dist_entry(sysname, state);
}
else
diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h
index a0aec06208..91fe290f70 100644
--- a/erts/emulator/beam/erl_process.h
+++ b/erts/emulator/beam/erl_process.h
@@ -858,8 +858,8 @@ typedef struct {
#define ERTS_PSD_ETS_FIXED_TABLES_GET_LOCKS ERTS_PROC_LOCK_MAIN
#define ERTS_PSD_ETS_FIXED_TABLES_SET_LOCKS ERTS_PROC_LOCK_MAIN
-#define ERTS_PSD_DIST_ENTRY_GET_LOCKS ERTS_PROC_LOCK_MAIN
-#define ERTS_PSD_DIST_ENTRY_SET_LOCKS ERTS_PROC_LOCK_MAIN
+#define ERTS_PSD_DIST_ENTRY_GET_LOCKS ERTS_LC_PSD_ANY_LOCK
+#define ERTS_PSD_DIST_ENTRY_SET_LOCKS ERTS_PROC_LOCKS_ALL
#define ERTS_PSD_PENDING_SUSPEND_GET_LOCKS ERTS_PROC_LOCK_MAIN
#define ERTS_PSD_PENDING_SUSPEND_SET_LOCKS ERTS_PROC_LOCK_MAIN
diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c
index 382b1cff77..20a155e1d8 100644
--- a/erts/emulator/beam/io.c
+++ b/erts/emulator/beam/io.c
@@ -3773,6 +3773,16 @@ erts_deliver_port_exit(Port *prt, Eterm from, Eterm reason, int send_closed,
pectxt.port_id = prt->common.id;
pectxt.reason = modified_reason;
+ if (state & ERTS_PORT_SFLG_DISTRIBUTION) {
+ DistEntry *dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY);
+ ASSERT(dep);
+ erts_do_net_exits(dep, modified_reason);
+ erts_deref_dist_entry(dep);
+ erts_prtsd_set(prt, ERTS_PRTSD_DIST_ENTRY, NULL);
+ erts_atomic32_read_band_relb(&prt->state,
+ ~ERTS_PORT_SFLG_DISTRIBUTION);
+ }
+
if (links)
erts_monitor_tree_foreach_delete(&links,
link_port_exit,
@@ -3790,16 +3800,6 @@ erts_deliver_port_exit(Port *prt, Eterm from, Eterm reason, int send_closed,
(void *) &pectxt);
DRV_MONITOR_UNLOCK_PDL(prt);
}
-
- if (state & ERTS_PORT_SFLG_DISTRIBUTION) {
- DistEntry *dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY);
- ASSERT(dep);
- erts_do_net_exits(dep, modified_reason);
- erts_deref_dist_entry(dep);
- erts_prtsd_set(prt, ERTS_PRTSD_DIST_ENTRY, NULL);
- erts_atomic32_read_band_relb(&prt->state,
- ~ERTS_PORT_SFLG_DISTRIBUTION);
- }
if ((reason != am_kill) && !is_port_ioq_empty(prt)) {
/* must turn exiting flag off */
diff --git a/erts/preloaded/ebin/erts_internal.beam b/erts/preloaded/ebin/erts_internal.beam
index d5f6c16f09..8d00280daa 100644
--- a/erts/preloaded/ebin/erts_internal.beam
+++ b/erts/preloaded/ebin/erts_internal.beam
Binary files differ
diff --git a/erts/preloaded/src/erts.app.src b/erts/preloaded/src/erts.app.src
index 132397b478..6c265f6e60 100644
--- a/erts/preloaded/src/erts.app.src
+++ b/erts/preloaded/src/erts.app.src
@@ -41,7 +41,7 @@
{registered, []},
{applications, []},
{env, []},
- {runtime_dependencies, ["stdlib-3.5", "kernel-6.1", "sasl-3.3"]}
+ {runtime_dependencies, ["stdlib-3.5", "kernel-@OTP-16216@", "sasl-3.3"]}
]}.
%% vim: ft=erlang
diff --git a/erts/preloaded/src/erts_internal.erl b/erts/preloaded/src/erts_internal.erl
index 33363a3c82..dd2410f5ce 100644
--- a/erts/preloaded/src/erts_internal.erl
+++ b/erts/preloaded/src/erts_internal.erl
@@ -67,7 +67,7 @@
-export([get_dflags/0]).
-export([new_connection/1]).
--export([abort_connection/2]).
+-export([abort_pending_connection/2]).
-export([scheduler_wall_time/1, system_flag_scheduler_wall_time/1,
gather_sched_wall_time_result/1,
@@ -558,10 +558,10 @@ get_dflags() ->
new_connection(_Node) ->
erlang:nif_error(undefined).
--spec erts_internal:abort_connection(Node, ConnId) -> boolean() when
+-spec erts_internal:abort_pending_connection(Node, ConnId) -> boolean() when
Node :: atom(),
ConnId :: {integer(), erlang:dist_handle()}.
-abort_connection(_Node, _ConnId) ->
+abort_pending_connection(_Node, _ConnId) ->
erlang:nif_error(undefined).
%% Scheduler wall time
diff --git a/lib/kernel/src/kernel.app.src b/lib/kernel/src/kernel.app.src
index 4f5e6d782f..1fdb390a32 100644
--- a/lib/kernel/src/kernel.app.src
+++ b/lib/kernel/src/kernel.app.src
@@ -149,6 +149,6 @@
{logger_sasl_compatible, false}
]},
{mod, {kernel, []}},
- {runtime_dependencies, ["erts-10.2.5", "stdlib-3.5", "sasl-3.0"]}
+ {runtime_dependencies, ["erts-@OTP-16216@", "stdlib-3.5", "sasl-3.0"]}
]
}.
diff --git a/lib/kernel/src/net_kernel.erl b/lib/kernel/src/net_kernel.erl
index e333bcd307..677db1e33e 100644
--- a/lib/kernel/src/net_kernel.erl
+++ b/lib/kernel/src/net_kernel.erl
@@ -103,6 +103,7 @@
connecttime, %% the connection setuptime.
connections, %% table of connections
conn_owners = [], %% List of connection owner pids,
+ dist_ctrlrs = #{}, %% Map of dist controllers (local ports or pids),
pend_owners = [], %% List of potential owners
listen, %% list of #listen
allowed, %% list of allowed nodes in a restricted system
@@ -125,6 +126,7 @@
conn_id, %% Connection identity
state, %% pending | up | up_pending
owner, %% owner pid
+ ctrlr, %% Controller port or pid
pending_owner, %% possible new owner
address, %% #net_address
waiting = [], %% queued processes
@@ -356,7 +358,7 @@ do_auto_connect_1(Node, ConnId, From, State) ->
spawn(?MODULE,passive_connect_monitor,[From,Node]),
{noreply, State};
_ ->
- erts_internal:abort_connection(Node, ConnId),
+ erts_internal:abort_pending_connection(Node, ConnId),
{reply, false, State}
end;
@@ -389,7 +391,7 @@ do_auto_connect_2(Node, ConnId, From, State, ConnLookup) ->
case application:get_env(kernel, dist_auto_connect) of
{ok, never} ->
?connect_failure(Node,{dist_auto_connect,never}),
- erts_internal:abort_connection(Node, ConnId),
+ erts_internal:abort_pending_connection(Node, ConnId),
{reply, false, State};
%% This might happen due to connection close
@@ -399,7 +401,6 @@ do_auto_connect_2(Node, ConnId, From, State, ConnLookup) ->
(hd(ConnLookup))#connection.state =:= up ->
?connect_failure(Node,{barred_connection,
ets:lookup(sys_dist, Node)}),
- erts_internal:abort_connection(Node, ConnId),
{reply, false, State};
_ ->
case setup(Node, ConnId, normal, From, State) of
@@ -408,7 +409,7 @@ do_auto_connect_2(Node, ConnId, From, State, ConnLookup) ->
{noreply,State#state{conn_owners=Owners}};
_Error ->
?connect_failure(Node, {setup_call, failed, _Error}),
- erts_internal:abort_connection(Node, ConnId),
+ erts_internal:abort_pending_connection(Node, ConnId),
{reply, false, State}
end
end
@@ -468,7 +469,7 @@ handle_call({connect, Type, Node}, From, State) ->
{noreply, _S} -> %% connection pending
ok;
{reply, false, _S} -> %% connection refused
- erts_internal:abort_connection(Node, ConnId)
+ erts_internal:abort_pending_connection(Node, ConnId)
end,
R1
catch
@@ -715,20 +716,39 @@ handle_info({accept,AcceptPid,Socket,Family,Proto}, State) ->
end;
%%
+%% New dist controller has been registered
+%%
+handle_info({dist_ctrlr, Ctrlr, Node, SetupPid} = Msg,
+ #state{dist_ctrlrs = DistCtrlrs} = State) ->
+ case ets:lookup(sys_dist, Node) of
+ [Conn] when (Conn#connection.state =:= pending)
+ andalso (Conn#connection.owner =:= SetupPid)
+ andalso (Conn#connection.ctrlr =:= undefined)
+ andalso (is_port(Ctrlr) orelse is_pid(Ctrlr))
+ andalso (node(Ctrlr) == node()) ->
+ link(Ctrlr),
+ ets:insert(sys_dist, Conn#connection{ctrlr = Ctrlr}),
+ {noreply, State#state{dist_ctrlrs = DistCtrlrs#{Ctrlr => Node}}};
+ _ ->
+ error_msg("Net kernel got ~tw~n",[Msg]),
+ {noreply, State}
+ end;
+
+%%
%% A node has successfully been connected.
%%
-handle_info({SetupPid, {nodeup,Node,Address,Type,Immediate}},
- State) ->
+handle_info({SetupPid, {nodeup,Node,Address,Type,Immediate}}, State) ->
case {Immediate, ets:lookup(sys_dist, Node)} of
- {true, [Conn]} when Conn#connection.state =:= pending,
- Conn#connection.owner =:= SetupPid ->
- ets:insert(sys_dist, Conn#connection{state = up,
- address = Address,
- waiting = [],
- type = Type}),
- SetupPid ! {self(), inserted},
- reply_waiting(Node,Conn#connection.waiting, true),
- {noreply, State};
+ {true, [Conn]} when (Conn#connection.state =:= pending)
+ andalso (Conn#connection.owner =:= SetupPid)
+ andalso (Conn#connection.ctrlr /= undefined) ->
+ ets:insert(sys_dist, Conn#connection{state = up,
+ address = Address,
+ waiting = [],
+ type = Type}),
+ SetupPid ! {self(), inserted},
+ reply_waiting(Node,Conn#connection.waiting, true),
+ {noreply, State};
_ ->
SetupPid ! {self(), bad_request},
{noreply, State}
@@ -803,7 +823,7 @@ handle_info({SetupPid, {is_pending, Node}}, State) ->
%%
%% Handle different types of process terminations.
%%
-handle_info({'EXIT', From, Reason}, State) when is_pid(From) ->
+handle_info({'EXIT', From, Reason}, State) ->
verbose({'EXIT', From, Reason}, 1, State),
handle_exit(From, Reason, State);
@@ -873,6 +893,7 @@ do_handle_exit(Pid, Reason, State) ->
listen_exit(Pid, State),
accept_exit(Pid, State),
conn_own_exit(Pid, Reason, State),
+ dist_ctrlr_exit(Pid, Reason, State),
pending_own_exit(Pid, State),
ticker_exit(Pid, State),
{noreply,State}.
@@ -909,6 +930,12 @@ conn_own_exit(Pid, Reason, State) ->
false
end.
+dist_ctrlr_exit(Pid, Reason, #state{dist_ctrlrs = DCs} = State) ->
+ case maps:get(Pid, DCs, undefined) of
+ undefined -> false;
+ Node -> throw({noreply, nodedown(Pid, Node, Reason, State)})
+ end.
+
pending_own_exit(Pid, State) ->
Pend = State#state.pend_owners,
case lists:keysearch(Pid, 1, Pend) of
@@ -945,10 +972,10 @@ ticker_exit(_, _) ->
%% nodedown(Owner, Node, Reason, State) -> State'
%% -----------------------------------------------------------
-nodedown(Owner, Node, Reason, State) ->
+nodedown(Exited, Node, Reason, State) ->
case get_conn(Node) of
{ok, Conn} ->
- nodedown(Conn, Owner, Node, Reason, Conn#connection.type, State);
+ nodedown(Conn, Exited, Node, Reason, Conn#connection.type, State);
_ ->
State
end.
@@ -959,58 +986,117 @@ get_conn(Node) ->
_ -> error
end.
-nodedown(Conn, Owner, Node, Reason, Type, OldState) ->
- Owners = lists:keydelete(Owner, 1, OldState#state.conn_owners),
- State = OldState#state{conn_owners = Owners},
+delete_owner(Owner, #state{conn_owners = Owners} = State) ->
+ State#state{conn_owners = lists:keydelete(Owner, 1, Owners)}.
+
+delete_ctrlr(Ctrlr, #state{dist_ctrlrs = DCs} = State) ->
+ State#state{dist_ctrlrs = maps:remove(Ctrlr, DCs)}.
+
+nodedown(Conn, Exited, Node, Reason, Type, State) ->
case Conn#connection.state of
- pending when Conn#connection.owner =:= Owner ->
- pending_nodedown(Conn, Node, Type, State);
- up when Conn#connection.owner =:= Owner ->
- up_nodedown(Conn, Node, Reason, Type, State);
- up_pending when Conn#connection.owner =:= Owner ->
- up_pending_nodedown(Conn, Node, Reason, Type, State);
+ pending ->
+ pending_nodedown(Conn, Exited, Node, Type, State);
+ up ->
+ up_nodedown(Conn, Exited, Node, Reason, Type, State);
+ up_pending ->
+ up_pending_nodedown(Conn, Exited, Node, Reason, Type, State);
_ ->
- OldState
+ State
end.
-pending_nodedown(Conn, Node, Type, State) ->
- % Don't bar connections that have never been alive
- %mark_sys_dist_nodedown(Node),
- % - instead just delete the node:
- erts_internal:abort_connection(Node, Conn#connection.conn_id),
- ets:delete(sys_dist, Node),
- reply_waiting(Node,Conn#connection.waiting, false),
+pending_nodedown(#connection{owner = Owner,
+ waiting = Waiting,
+ conn_id = CID},
+ Exited, Node, Type, State) when Owner =:= Exited ->
+ %% Owner exited!
+ case erts_internal:abort_pending_connection(Node, CID) of
+ false ->
+ %% Just got connected but that message has not
+ %% reached us yet. Wait for controller to exit and
+ %% handle this then...
+ ok;
+ true ->
+ %% Don't bar connections that have never been alive, i.e.
+ %% no 'mark_sys_dist_nodedown(Node)'; instead just delete
+ %% the node:
+ ets:delete(sys_dist, Node),
+ reply_waiting(Node, Waiting, false),
+ case Type of
+ normal ->
+ ?nodedown(Node, State);
+ _ ->
+ ok
+ end
+ end,
+ delete_owner(Owner, State);
+pending_nodedown(#connection{owner = Owner,
+ ctrlr = Ctrlr,
+ waiting = Waiting},
+ Exited, Node, Type, State) when Ctrlr =:= Exited ->
+ %% Controller exited!
+ %%
+ %% Controller has been registered but crashed
+ %% before sending mark up message...
+ %%
+ %% 'nodeup' messages has been sent by the emulator,
+ %% so bar the connection...
+ mark_sys_dist_nodedown(Node),
+ reply_waiting(Node,Waiting, true),
case Type of
- normal ->
- ?nodedown(Node, State);
- _ ->
- ok
+ normal ->
+ ?nodedown(Node, State);
+ _ ->
+ ok
end,
+ delete_owner(Owner, delete_ctrlr(Ctrlr, State));
+pending_nodedown(_Conn, _Exited, _Node, _Type, State) ->
State.
-up_pending_nodedown(Conn, Node, _Reason, _Type, State) ->
- AcceptPid = Conn#connection.pending_owner,
+up_pending_nodedown(#connection{owner = Owner,
+ ctrlr = Ctrlr,
+ pending_owner = AcceptPid} = Conn,
+ Exited, Node, _Reason,
+ _Type, State) when Ctrlr =:= Exited ->
+ %% Controller exited!
Owners = State#state.conn_owners,
Pend = lists:keydelete(AcceptPid, 1, State#state.pend_owners),
- erts_internal:abort_connection(Node, Conn#connection.conn_id),
Conn1 = Conn#connection { owner = AcceptPid,
conn_id = erts_internal:new_connection(Node),
+ ctrlr = undefined,
pending_owner = undefined,
state = pending },
ets:insert(sys_dist, Conn1),
AcceptPid ! {self(), pending},
- State#state{conn_owners = [{AcceptPid,Node}|Owners], pend_owners = Pend}.
+ State1 = State#state{conn_owners = [{AcceptPid,Node}|Owners],
+ pend_owners = Pend},
+ delete_owner(Owner, delete_ctrlr(Ctrlr, State1));
+up_pending_nodedown(#connection{owner = Owner},
+ Exited, _Node, _Reason,
+ _Type, State) when Owner =:= Exited ->
+ %% Owner exited!
+ delete_owner(Owner, State);
+up_pending_nodedown(_Conn, _Exited, _Node, _Reason, _Type, State) ->
+ State.
-up_nodedown(Conn, Node, _Reason, Type, State) ->
- mark_sys_dist_nodedown(Conn, Node),
+up_nodedown(#connection{owner = Owner,
+ ctrlr = Ctrlr},
+ Exited, Node, _Reason, Type, State) when Ctrlr =:= Exited ->
+ %% Controller exited!
+ mark_sys_dist_nodedown(Node),
case Type of
normal -> ?nodedown(Node, State);
_ -> ok
end,
+ delete_owner(Owner, delete_ctrlr(Ctrlr, State));
+up_nodedown(#connection{owner = Owner},
+ Exited, _Node, _Reason,
+ _Type, State) when Owner =:= Exited ->
+ %% Owner exited!
+ delete_owner(Owner, State);
+up_nodedown(_Conn, _Exited, _Node, _Reason, _Type, State) ->
State.
-mark_sys_dist_nodedown(Conn, Node) ->
- erts_internal:abort_connection(Node, Conn#connection.conn_id),
+mark_sys_dist_nodedown(Node) ->
case application:get_env(kernel, dist_auto_connect) of
{ok, once} ->
ets:insert(sys_dist, #barred_connection{node = Node});
@@ -1098,29 +1184,19 @@ mk_monitor_nodes_error(_Flag, Opts) ->
do_disconnect(Node, State) ->
case ets:lookup(sys_dist, Node) of
[Conn] when Conn#connection.state =:= up ->
- disconnect_pid(Conn#connection.owner, State);
+ disconnect_ctrlr(Conn#connection.ctrlr, State);
[Conn] when Conn#connection.state =:= up_pending ->
- disconnect_pid(Conn#connection.owner, State);
+ disconnect_ctrlr(Conn#connection.ctrlr, State);
_ ->
{false, State}
end.
-disconnect_pid(Pid, State) ->
- exit(Pid, disconnect),
-
- %% This code used to only use exit + recv 'EXIT' to sync,
- %% but since OTP-22 links are no longer broken atomically
- %% so the exit message below can arrive before any remaining
- %% exit messages have killed the distribution port
- Ref = erlang:monitor(process, Pid),
- %% Sync wait for connection to die!!!
+disconnect_ctrlr(Ctrlr, State) ->
+ exit(Ctrlr, disconnect),
receive
- {'DOWN',Ref,_,_,_} ->
- receive
- {'EXIT',Pid,Reason} ->
- {_,State1} = handle_exit(Pid, Reason, State),
- {true, State1}
- end
+ {'EXIT',Ctrlr,Reason} ->
+ {_,State1} = handle_exit(Ctrlr, Reason, State),
+ {true, State1}
end.
%%