diff options
author | Rickard Green <rickard@erlang.org> | 2019-11-27 18:23:10 +0100 |
---|---|---|
committer | Rickard Green <rickard@erlang.org> | 2019-11-27 18:23:10 +0100 |
commit | 6901bf27ade015c3bb6ff5766d7f71fbec90dd8f (patch) | |
tree | e2e879b262a3d82981fba4a6abd69886817e8f9a | |
parent | a09985fc298ead645210359f1fc2626da4ffe780 (diff) | |
parent | 46f4421c754c30b35dfd79fcb3c5c5fbb584e5e9 (diff) | |
download | erlang-6901bf27ade015c3bb6ff5766d7f71fbec90dd8f.tar.gz |
Merge branch 'rickard/nodedown-reason/OTP-16216' into maint
* rickard/nodedown-reason/OTP-16216:
net_kernel: save connection pending owners in map
net_kernel: save connection owners in map
Fix race causing nodedown reason to be lost
-rw-r--r-- | erts/emulator/beam/atom.names | 1 | ||||
-rw-r--r-- | erts/emulator/beam/bif.tab | 8 | ||||
-rw-r--r-- | erts/emulator/beam/dist.c | 198 | ||||
-rw-r--r-- | erts/emulator/beam/dist.h | 2 | ||||
-rw-r--r-- | erts/emulator/beam/erl_lock_check.c | 2 | ||||
-rw-r--r-- | erts/emulator/beam/erl_node_tables.c | 2 | ||||
-rw-r--r-- | erts/emulator/beam/erl_process.h | 4 | ||||
-rw-r--r-- | erts/emulator/beam/io.c | 20 | ||||
-rw-r--r-- | erts/preloaded/ebin/erts_internal.beam | bin | 20408 -> 20428 bytes | |||
-rw-r--r-- | erts/preloaded/src/erts.app.src | 2 | ||||
-rw-r--r-- | erts/preloaded/src/erts_internal.erl | 6 | ||||
-rw-r--r-- | lib/kernel/src/kernel.app.src | 2 | ||||
-rw-r--r-- | lib/kernel/src/net_kernel.erl | 286 |
13 files changed, 357 insertions, 176 deletions
diff --git a/erts/emulator/beam/atom.names b/erts/emulator/beam/atom.names index cd388f94a0..57f3a53481 100644 --- a/erts/emulator/beam/atom.names +++ b/erts/emulator/beam/atom.names @@ -212,6 +212,7 @@ atom discard atom dist atom dist_cmd atom dist_ctrl_put_data +atom dist_ctrlr atom dist_data atom Div='/' atom div diff --git a/erts/emulator/beam/bif.tab b/erts/emulator/beam/bif.tab index c9f5177bd3..a5f46c4264 100644 --- a/erts/emulator/beam/bif.tab +++ b/erts/emulator/beam/bif.tab @@ -695,7 +695,6 @@ bif erlang:iolist_to_iovec/1 bif erts_internal:get_dflags/0 bif erts_internal:new_connection/1 -bif erts_internal:abort_connection/2 bif erts_internal:map_next/3 bif ets:whereis/1 bif erts_internal:gather_alloc_histograms/1 @@ -752,3 +751,10 @@ bif persistent_term:get/2 bif erts_internal:ets_lookup_binary_info/2 bif erts_internal:ets_raw_first/1 bif erts_internal:ets_raw_next/2 + +# +# New in 22.2 +# + +bif erts_internal:abort_pending_connection/2 + diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index 7baba4f1e3..ecb5291400 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -182,7 +182,8 @@ static int dsig_send_exit(ErtsDSigSendContext *ctx, Eterm ctl, Eterm msg); static int dsig_send_ctl(ErtsDSigSendContext *ctx, Eterm ctl); static void send_nodes_mon_msgs(Process *, Eterm, Eterm, Eterm, Eterm); static void init_nodes_monitors(void); -static Sint abort_connection(DistEntry* dep, Uint32 conn_id); +static Sint abort_pending_connection(DistEntry* dep, Uint32 conn_id, + int *was_connected_p); static ErtsDistOutputBuf* clear_de_out_queues(DistEntry*); static void free_de_out_queues(DistEntry*, ErtsDistOutputBuf*); int erts_dist_seq_tree_foreach_delete_yielding(DistSeqNode **root, @@ -612,7 +613,7 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) if (no_pending) { for (i = 0; i < no_pending; i++) { - abort_connection(pending[i], pending[i]->connection_id); + abort_pending_connection(pending[i], pending[i]->connection_id, NULL); erts_deref_dist_entry(pending[i]); } erts_free(ERTS_ALC_T_TMP, pending); @@ -3762,8 +3763,9 @@ int distribution_info(fmtfn_t to, void *arg) /* Called by break handler */ BIF_RETTYPE setnode_2(BIF_ALIST_2) { - Process *net_kernel; + Process *net_kernel = NULL; Uint creation; + int success; /* valid creation ? */ if(!term_to_Uint(BIF_ARG_2, &creation)) @@ -3788,21 +3790,11 @@ BIF_RETTYPE setnode_2(BIF_ALIST_2) net_kernel = erts_whereis_process(BIF_P, ERTS_PROC_LOCK_MAIN, am_net_kernel, - ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS, - 0); - if (!net_kernel || ERTS_PROC_GET_DIST_ENTRY(net_kernel)) + 0, + ERTS_P2P_FLG_INC_REFC); + if (!net_kernel) goto error; - /* By setting F_DISTRIBUTION on net_kernel, - * erts_do_net_exits will be called when net_kernel is terminated !! */ - net_kernel->flags |= F_DISTRIBUTION; - - erts_proc_unlock(net_kernel, - (ERTS_PROC_LOCK_STATUS - | ((net_kernel != BIF_P) - ? ERTS_PROC_LOCK_MAIN - : 0))); - #ifdef DEBUG erts_rwmtx_rlock(&erts_dist_table_rwmtx); ASSERT(!erts_visible_dist_entries && !erts_hidden_dist_entries); @@ -3810,25 +3802,44 @@ BIF_RETTYPE setnode_2(BIF_ALIST_2) #endif erts_proc_unlock(BIF_P, ERTS_PROC_LOCK_MAIN); + erts_thr_progress_block(); - inc_no_nodes(); - erts_set_this_node(BIF_ARG_1, (Uint32) creation); - erts_is_alive = 1; - send_nodes_mon_msgs(NULL, am_nodeup, BIF_ARG_1, am_visible, NIL); - erts_thr_progress_unblock(); - erts_proc_lock(BIF_P, ERTS_PROC_LOCK_MAIN); - /* - * Note erts_this_dist_entry is changed by erts_set_this_node(), - * so we *need* to use the new one after erts_set_this_node() - * is called. - */ - erts_ref_dist_entry(erts_this_dist_entry); - ERTS_PROC_SET_DIST_ENTRY(net_kernel, erts_this_dist_entry); + success = (!ERTS_PROC_IS_EXITING(net_kernel) + & !ERTS_PROC_GET_DIST_ENTRY(net_kernel)); + if (success) { + inc_no_nodes(); + erts_set_this_node(BIF_ARG_1, (Uint32) creation); + erts_is_alive = 1; + send_nodes_mon_msgs(NULL, am_nodeup, BIF_ARG_1, am_visible, NIL); + erts_proc_lock(net_kernel, ERTS_PROC_LOCKS_ALL); + + /* By setting F_DISTRIBUTION on net_kernel, + * erts_do_net_exits will be called when net_kernel is terminated !! */ + net_kernel->flags |= F_DISTRIBUTION; + + /* + * Note erts_this_dist_entry is changed by erts_set_this_node(), + * so we *need* to use the new one after erts_set_this_node() + * is called. + */ + erts_ref_dist_entry(erts_this_dist_entry); + ERTS_PROC_SET_DIST_ENTRY(net_kernel, erts_this_dist_entry); + erts_proc_unlock(net_kernel, ERTS_PROC_LOCKS_ALL); + } + + erts_thr_progress_unblock(); - BIF_RET(am_true); + erts_proc_lock(BIF_P, ERTS_PROC_LOCK_MAIN); + + if (success) { + erts_proc_dec_refc(net_kernel); + BIF_RET(am_true); + } error: + if (net_kernel) + erts_proc_dec_refc(net_kernel); BIF_ERROR(BIF_P, BADARG); } @@ -3841,12 +3852,15 @@ typedef struct { DistEntry *dep; Uint flags; Uint version; + Eterm setup_pid; + Process *net_kernel; } ErtsSetupConnDistCtrl; -static void +static int setup_connection_epiloge_rwunlock(Process *c_p, DistEntry *dep, Eterm ctrlr, Uint flags, - Uint version); + Uint version, Eterm setup_pid, + Process *net_kernel); static Eterm setup_connection_distctrl(Process *c_p, void *arg, @@ -3861,7 +3875,20 @@ BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4) DistEntry *dep = NULL; int de_locked = 0; Port *pp = NULL; + int true_nk; + Process *net_kernel = erts_whereis_process(BIF_P, ERTS_PROC_LOCK_MAIN, + am_net_kernel, + ERTS_PROC_LOCK_STATUS, + ERTS_P2P_FLG_INC_REFC); + + if (!net_kernel) + goto badarg; + true_nk = ERTS_PROC_GET_DIST_ENTRY(net_kernel) == erts_this_dist_entry; + erts_proc_unlock(net_kernel, ERTS_PROC_LOCK_STATUS); + if (!true_nk) + goto badarg; + /* * Check and pick out arguments */ @@ -3923,8 +3950,12 @@ BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4) scdc.dep = dep; scdc.flags = flags; scdc.version = version; + scdc.setup_pid = BIF_P->common.id; + scdc.net_kernel = net_kernel; res = setup_connection_distctrl(BIF_P, &scdc, NULL, NULL); + /* Dec of refc on net_kernel by setup_connection_distctrl() */ + net_kernel = NULL; BUMP_REDS(BIF_P, 5); dep = NULL; @@ -3943,6 +3974,8 @@ BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4) scdcp->dep = dep; scdcp->flags = flags; scdcp->version = version; + scdcp->setup_pid = BIF_P->common.id; + scdcp->net_kernel = net_kernel; res = erts_proc_sig_send_rpc_request(BIF_P, BIF_ARG_2, @@ -3950,8 +3983,11 @@ BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4) setup_connection_distctrl, (void *) scdcp); if (is_non_value(res)) - goto badarg; + goto badarg; /* Was not able to send signal... */ + /* Dec of refc on net_kernel by setup_connection_distctrl() */ + net_kernel = NULL; + dep = NULL; ASSERT(is_internal_ordinary_ref(res)); @@ -3962,6 +3998,7 @@ BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4) } else { Uint32 conn_id; + int set_res; pp = erts_id2port_sflgs(BIF_ARG_2, BIF_P, @@ -4006,7 +4043,13 @@ BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4) } conn_id = dep->connection_id; - setup_connection_epiloge_rwunlock(BIF_P, dep, BIF_ARG_2, flags, version); + set_res = setup_connection_epiloge_rwunlock(BIF_P, dep, BIF_ARG_2, flags, + version, BIF_P->common.id, + net_kernel); + /* Dec of refc on net_kernel by setup_connection_epiloge_rwunlock() */ + net_kernel = NULL; + if (set_res == 0) + goto badarg; de_locked = 0; hp = HAlloc(BIF_P, 3 + ERTS_DHANDLE_SIZE); @@ -4023,6 +4066,9 @@ BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4) done: + if (net_kernel) + erts_proc_dec_refc(net_kernel); + if (dep && dep != erts_this_dist_entry) { if (de_locked) { if (de_locked > 0) @@ -4047,14 +4093,38 @@ BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4) goto done; } -static void +static int setup_connection_epiloge_rwunlock(Process *c_p, DistEntry *dep, Eterm ctrlr, Uint flags, - Uint version) + Uint version, Eterm setup_pid, + Process *net_kernel) { Eterm notify_proc = NIL; erts_aint32_t qflgs; - + ErtsProcLocks nk_locks; + int success = 0; + + /* Notify net_kernel about the new dist controller... */ + ASSERT(net_kernel); + nk_locks = ERTS_PROC_LOCK_MSGQ; + erts_proc_lock(net_kernel, nk_locks); + if (!ERTS_PROC_IS_EXITING(net_kernel) + && ERTS_PROC_GET_DIST_ENTRY(net_kernel) == erts_this_dist_entry) { + Eterm *hp; + ErlOffHeap *ohp; + ErtsMessage *mp = erts_alloc_message_heap(net_kernel, &nk_locks, + 5 /* 4-tuple */, + &hp, &ohp); + Eterm msg = TUPLE4(hp, am_dist_ctrlr, ctrlr, dep->sysname, setup_pid); + erts_queue_message(net_kernel, nk_locks, mp, msg, am_system); + success = !0; + } + erts_proc_unlock(net_kernel, nk_locks); + erts_proc_dec_refc(net_kernel); + + if (!success) + return 0; + dep->version = version; dep->creation = 0; @@ -4096,6 +4166,8 @@ setup_connection_epiloge_rwunlock(Process *c_p, DistEntry *dep, dep->sysname, flags & DFLAG_PUBLISHED ? am_visible : am_hidden, NIL); + + return !0; } static Eterm @@ -4106,11 +4178,13 @@ setup_connection_distctrl(Process *c_p, void *arg, int *redsp, ErlHeapFragment * int dep_locked = 0; Eterm *hp; Uint32 conn_id; + int dec_net_kernel_on_error = !0; if (redsp) *redsp = 1; - ASSERT(!ERTS_PROC_IS_EXITING(c_p)); + if (ERTS_PROC_IS_EXITING(c_p)) + goto badarg; erts_de_rwlock(dep); dep_locked = !0; @@ -4123,16 +4197,30 @@ setup_connection_distctrl(Process *c_p, void *arg, int *redsp, ErlHeapFragment * if (is_not_nil(dep->cid)) goto badarg; + if (ERTS_PROC_GET_DIST_ENTRY(c_p)) + goto badarg; + + erts_proc_lock(c_p, ERTS_PROC_LOCKS_ALL_MINOR); c_p->flags |= F_DISTRIBUTION; ERTS_PROC_SET_DIST_ENTRY(c_p, dep); + erts_proc_unlock(c_p, ERTS_PROC_LOCKS_ALL_MINOR); dep->send = NULL; /* Only for distr ports... */ if (redsp) *redsp = 5; - setup_connection_epiloge_rwunlock(c_p, dep, c_p->common.id, - scdcp->flags, scdcp->version); + if (!setup_connection_epiloge_rwunlock(c_p, dep, c_p->common.id, + scdcp->flags, scdcp->version, + scdcp->setup_pid, + scdcp->net_kernel)) { + erts_proc_lock(c_p, ERTS_PROC_LOCKS_ALL_MINOR); + c_p->flags &= ~F_DISTRIBUTION; + ERTS_PROC_SET_DIST_ENTRY(c_p, NULL); + erts_proc_unlock(c_p, ERTS_PROC_LOCKS_ALL_MINOR); + dec_net_kernel_on_error = 0; /* dec:ed in epilog... */ + goto badarg; + } /* we take over previous inc in refc of dep */ @@ -4147,6 +4235,9 @@ setup_connection_distctrl(Process *c_p, void *arg, int *redsp, ErlHeapFragment * badarg: + if (dec_net_kernel_on_error) + erts_proc_dec_refc(scdcp->net_kernel); + if (bpp) /* not called directly */ erts_free(ERTS_ALC_T_SETUP_CONN_ARG, arg); @@ -4203,14 +4294,15 @@ BIF_RETTYPE erts_internal_new_connection_1(BIF_ALIST_1) BIF_RET(dhandle); } -Sint erts_abort_connection_rwunlock(DistEntry* dep) +Sint erts_abort_pending_connection_rwunlock(DistEntry* dep, + int *was_connected_p) { ERTS_LC_ASSERT(erts_lc_is_de_rwlocked(dep)); - if (dep->state == ERTS_DE_STATE_CONNECTED) { - kill_connection(dep); - } - else if (dep->state == ERTS_DE_STATE_PENDING) { + if (was_connected_p) + *was_connected_p = dep->state == ERTS_DE_STATE_CONNECTED; + + if (dep->state == ERTS_DE_STATE_PENDING) { ErtsAtomCache *cache; ErtsDistOutputBuf *obuf; ErtsProcList *resume_procs; @@ -4255,20 +4347,24 @@ Sint erts_abort_connection_rwunlock(DistEntry* dep) return 0; } -static Sint abort_connection(DistEntry *dep, Uint32 conn_id) +static Sint abort_pending_connection(DistEntry *dep, Uint32 conn_id, + int *was_connected_p) { erts_de_rwlock(dep); if (dep->connection_id == conn_id) - return erts_abort_connection_rwunlock(dep); + return erts_abort_pending_connection_rwunlock(dep, was_connected_p); erts_de_rwunlock(dep); + if (was_connected_p) + *was_connected_p = 0; return 0; } -BIF_RETTYPE erts_internal_abort_connection_2(BIF_ALIST_2) +BIF_RETTYPE erts_internal_abort_pending_connection_2(BIF_ALIST_2) { DistEntry* dep; Uint32 conn_id; Sint reds; + int was_connected; if (is_not_atom(BIF_ARG_1)) BIF_ERROR(BIF_P, BADARG); @@ -4278,9 +4374,9 @@ BIF_RETTYPE erts_internal_abort_connection_2(BIF_ALIST_2) BIF_ERROR(BIF_P, BADARG); } - reds = abort_connection(dep, conn_id); + reds = abort_pending_connection(dep, conn_id, &was_connected); BUMP_REDS(BIF_P, reds); - BIF_RET(am_true); + BIF_RET(was_connected ? am_false : am_true); } int erts_auto_connect(DistEntry* dep, Process *proc, ErtsProcLocks proc_locks) @@ -4305,7 +4401,7 @@ int erts_auto_connect(DistEntry* dep, Process *proc, ErtsProcLocks proc_locks) net_kernel = erts_whereis_process(proc, proc_locks, am_net_kernel, nk_locks, 0); if (!net_kernel) { - abort_connection(dep, conn_id); + abort_pending_connection(dep, conn_id, NULL); return 0; } diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h index ac05fead3a..f6cb79472f 100644 --- a/erts/emulator/beam/dist.h +++ b/erts/emulator/beam/dist.h @@ -316,7 +316,7 @@ extern void erts_kill_dist_connection(DistEntry *dep, Uint32); extern Uint erts_dist_cache_size(void); -extern Sint erts_abort_connection_rwunlock(DistEntry *dep); +extern Sint erts_abort_pending_connection_rwunlock(DistEntry *dep, int *); extern void erts_debug_dist_seq_tree_foreach( DistEntry *dep, diff --git a/erts/emulator/beam/erl_lock_check.c b/erts/emulator/beam/erl_lock_check.c index 3aab4828cc..ba61f721b4 100644 --- a/erts/emulator/beam/erl_lock_check.c +++ b/erts/emulator/beam/erl_lock_check.c @@ -96,9 +96,9 @@ static erts_lc_lock_order_t erts_lock_order[] = { { "erl_db_catree_route_node", "index" }, { "resource_monitors", "address" }, { "driver_list", NULL }, + { "dist_entry", "address" }, { "proc_msgq", "pid" }, { "proc_btm", "pid" }, - { "dist_entry", "address" }, { "dist_entry_links", "address" }, { "update_persistent_term_permission", NULL }, { "persistent_term_delete_permission", NULL }, diff --git a/erts/emulator/beam/erl_node_tables.c b/erts/emulator/beam/erl_node_tables.c index 48c2f34031..0564aec846 100644 --- a/erts/emulator/beam/erl_node_tables.c +++ b/erts/emulator/beam/erl_node_tables.c @@ -492,7 +492,7 @@ static void try_delete_dist_entry(DistEntry* dep) if (dep->state != ERTS_DE_STATE_PENDING) ERTS_INTERNAL_ERROR("Garbage collecting connected distribution entry"); - erts_abort_connection_rwunlock(dep); + erts_abort_pending_connection_rwunlock(dep, NULL); report_gc_active_dist_entry(sysname, state); } else diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h index d640e135b0..d311122381 100644 --- a/erts/emulator/beam/erl_process.h +++ b/erts/emulator/beam/erl_process.h @@ -858,8 +858,8 @@ typedef struct { #define ERTS_PSD_ETS_FIXED_TABLES_GET_LOCKS ERTS_PROC_LOCK_MAIN #define ERTS_PSD_ETS_FIXED_TABLES_SET_LOCKS ERTS_PROC_LOCK_MAIN -#define ERTS_PSD_DIST_ENTRY_GET_LOCKS ERTS_PROC_LOCK_MAIN -#define ERTS_PSD_DIST_ENTRY_SET_LOCKS ERTS_PROC_LOCK_MAIN +#define ERTS_PSD_DIST_ENTRY_GET_LOCKS ERTS_LC_PSD_ANY_LOCK +#define ERTS_PSD_DIST_ENTRY_SET_LOCKS ERTS_PROC_LOCKS_ALL #define ERTS_PSD_PENDING_SUSPEND_GET_LOCKS ERTS_PROC_LOCK_MAIN #define ERTS_PSD_PENDING_SUSPEND_SET_LOCKS ERTS_PROC_LOCK_MAIN diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c index 382b1cff77..20a155e1d8 100644 --- a/erts/emulator/beam/io.c +++ b/erts/emulator/beam/io.c @@ -3773,6 +3773,16 @@ erts_deliver_port_exit(Port *prt, Eterm from, Eterm reason, int send_closed, pectxt.port_id = prt->common.id; pectxt.reason = modified_reason; + if (state & ERTS_PORT_SFLG_DISTRIBUTION) { + DistEntry *dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY); + ASSERT(dep); + erts_do_net_exits(dep, modified_reason); + erts_deref_dist_entry(dep); + erts_prtsd_set(prt, ERTS_PRTSD_DIST_ENTRY, NULL); + erts_atomic32_read_band_relb(&prt->state, + ~ERTS_PORT_SFLG_DISTRIBUTION); + } + if (links) erts_monitor_tree_foreach_delete(&links, link_port_exit, @@ -3790,16 +3800,6 @@ erts_deliver_port_exit(Port *prt, Eterm from, Eterm reason, int send_closed, (void *) &pectxt); DRV_MONITOR_UNLOCK_PDL(prt); } - - if (state & ERTS_PORT_SFLG_DISTRIBUTION) { - DistEntry *dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY); - ASSERT(dep); - erts_do_net_exits(dep, modified_reason); - erts_deref_dist_entry(dep); - erts_prtsd_set(prt, ERTS_PRTSD_DIST_ENTRY, NULL); - erts_atomic32_read_band_relb(&prt->state, - ~ERTS_PORT_SFLG_DISTRIBUTION); - } if ((reason != am_kill) && !is_port_ioq_empty(prt)) { /* must turn exiting flag off */ diff --git a/erts/preloaded/ebin/erts_internal.beam b/erts/preloaded/ebin/erts_internal.beam Binary files differindex 1756b108ff..354f2ec3c8 100644 --- a/erts/preloaded/ebin/erts_internal.beam +++ b/erts/preloaded/ebin/erts_internal.beam diff --git a/erts/preloaded/src/erts.app.src b/erts/preloaded/src/erts.app.src index c3479ec502..639744bfb7 100644 --- a/erts/preloaded/src/erts.app.src +++ b/erts/preloaded/src/erts.app.src @@ -42,7 +42,7 @@ {registered, []}, {applications, []}, {env, []}, - {runtime_dependencies, ["stdlib-3.5", "kernel-6.1", "sasl-3.3"]} + {runtime_dependencies, ["stdlib-3.5", "kernel-@OTP-16216@", "sasl-3.3"]} ]}. %% vim: ft=erlang diff --git a/erts/preloaded/src/erts_internal.erl b/erts/preloaded/src/erts_internal.erl index ea82c395d5..bbe9a9affa 100644 --- a/erts/preloaded/src/erts_internal.erl +++ b/erts/preloaded/src/erts_internal.erl @@ -68,7 +68,7 @@ -export([get_dflags/0]). -export([new_connection/1]). --export([abort_connection/2]). +-export([abort_pending_connection/2]). -export([scheduler_wall_time/1, system_flag_scheduler_wall_time/1, gather_sched_wall_time_result/1, @@ -568,10 +568,10 @@ get_dflags() -> new_connection(_Node) -> erlang:nif_error(undefined). --spec erts_internal:abort_connection(Node, ConnId) -> boolean() when +-spec erts_internal:abort_pending_connection(Node, ConnId) -> boolean() when Node :: atom(), ConnId :: {integer(), erlang:dist_handle()}. -abort_connection(_Node, _ConnId) -> +abort_pending_connection(_Node, _ConnId) -> erlang:nif_error(undefined). %% Scheduler wall time diff --git a/lib/kernel/src/kernel.app.src b/lib/kernel/src/kernel.app.src index 4f5e6d782f..1fdb390a32 100644 --- a/lib/kernel/src/kernel.app.src +++ b/lib/kernel/src/kernel.app.src @@ -149,6 +149,6 @@ {logger_sasl_compatible, false} ]}, {mod, {kernel, []}}, - {runtime_dependencies, ["erts-10.2.5", "stdlib-3.5", "sasl-3.0"]} + {runtime_dependencies, ["erts-@OTP-16216@", "stdlib-3.5", "sasl-3.0"]} ] }. diff --git a/lib/kernel/src/net_kernel.erl b/lib/kernel/src/net_kernel.erl index e333bcd307..4a68e6676d 100644 --- a/lib/kernel/src/net_kernel.erl +++ b/lib/kernel/src/net_kernel.erl @@ -102,8 +102,9 @@ tick, %% tick information connecttime, %% the connection setuptime. connections, %% table of connections - conn_owners = [], %% List of connection owner pids, - pend_owners = [], %% List of potential owners + conn_owners = #{}, %% Map of connection owner pids, + dist_ctrlrs = #{}, %% Map of dist controllers (local ports or pids), + pend_owners = #{}, %% Map of potential owners listen, %% list of #listen allowed, %% list of allowed nodes in a restricted system verbose = 0, %% level of verboseness @@ -125,6 +126,7 @@ conn_id, %% Connection identity state, %% pending | up | up_pending owner, %% owner pid + ctrlr, %% Controller port or pid pending_owner, %% possible new owner address, %% #net_address waiting = [], %% queued processes @@ -356,7 +358,7 @@ do_auto_connect_1(Node, ConnId, From, State) -> spawn(?MODULE,passive_connect_monitor,[From,Node]), {noreply, State}; _ -> - erts_internal:abort_connection(Node, ConnId), + erts_internal:abort_pending_connection(Node, ConnId), {reply, false, State} end; @@ -389,7 +391,7 @@ do_auto_connect_2(Node, ConnId, From, State, ConnLookup) -> case application:get_env(kernel, dist_auto_connect) of {ok, never} -> ?connect_failure(Node,{dist_auto_connect,never}), - erts_internal:abort_connection(Node, ConnId), + erts_internal:abort_pending_connection(Node, ConnId), {reply, false, State}; %% This might happen due to connection close @@ -399,16 +401,15 @@ do_auto_connect_2(Node, ConnId, From, State, ConnLookup) -> (hd(ConnLookup))#connection.state =:= up -> ?connect_failure(Node,{barred_connection, ets:lookup(sys_dist, Node)}), - erts_internal:abort_connection(Node, ConnId), {reply, false, State}; _ -> case setup(Node, ConnId, normal, From, State) of {ok, SetupPid} -> - Owners = [{SetupPid, Node} | State#state.conn_owners], - {noreply,State#state{conn_owners=Owners}}; + Owners = State#state.conn_owners, + {noreply,State#state{conn_owners=Owners#{SetupPid => Node}}}; _Error -> ?connect_failure(Node, {setup_call, failed, _Error}), - erts_internal:abort_connection(Node, ConnId), + erts_internal:abort_pending_connection(Node, ConnId), {reply, false, State} end end @@ -428,8 +429,8 @@ do_explicit_connect([#barred_connection{}], Type, Node, ConnId, From , State) -> do_explicit_connect(_ConnLookup, Type, Node, ConnId, From , State) -> case setup(Node,ConnId,Type,From,State) of {ok, SetupPid} -> - Owners = [{SetupPid, Node} | State#state.conn_owners], - {noreply,State#state{conn_owners=Owners}}; + Owners = State#state.conn_owners, + {noreply,State#state{conn_owners=Owners#{SetupPid => Node}}}; _Error -> ?connect_failure(Node, {setup_call, failed, _Error}), {reply, false, State} @@ -468,7 +469,7 @@ handle_call({connect, Type, Node}, From, State) -> {noreply, _S} -> %% connection pending ok; {reply, false, _S} -> %% connection refused - erts_internal:abort_connection(Node, ConnId) + erts_internal:abort_pending_connection(Node, ConnId) end, R1 catch @@ -715,20 +716,39 @@ handle_info({accept,AcceptPid,Socket,Family,Proto}, State) -> end; %% +%% New dist controller has been registered +%% +handle_info({dist_ctrlr, Ctrlr, Node, SetupPid} = Msg, + #state{dist_ctrlrs = DistCtrlrs} = State) -> + case ets:lookup(sys_dist, Node) of + [Conn] when (Conn#connection.state =:= pending) + andalso (Conn#connection.owner =:= SetupPid) + andalso (Conn#connection.ctrlr =:= undefined) + andalso (is_port(Ctrlr) orelse is_pid(Ctrlr)) + andalso (node(Ctrlr) == node()) -> + link(Ctrlr), + ets:insert(sys_dist, Conn#connection{ctrlr = Ctrlr}), + {noreply, State#state{dist_ctrlrs = DistCtrlrs#{Ctrlr => Node}}}; + _ -> + error_msg("Net kernel got ~tw~n",[Msg]), + {noreply, State} + end; + +%% %% A node has successfully been connected. %% -handle_info({SetupPid, {nodeup,Node,Address,Type,Immediate}}, - State) -> +handle_info({SetupPid, {nodeup,Node,Address,Type,Immediate}}, State) -> case {Immediate, ets:lookup(sys_dist, Node)} of - {true, [Conn]} when Conn#connection.state =:= pending, - Conn#connection.owner =:= SetupPid -> - ets:insert(sys_dist, Conn#connection{state = up, - address = Address, - waiting = [], - type = Type}), - SetupPid ! {self(), inserted}, - reply_waiting(Node,Conn#connection.waiting, true), - {noreply, State}; + {true, [Conn]} when (Conn#connection.state =:= pending) + andalso (Conn#connection.owner =:= SetupPid) + andalso (Conn#connection.ctrlr /= undefined) -> + ets:insert(sys_dist, Conn#connection{state = up, + address = Address, + waiting = [], + type = Type}), + SetupPid ! {self(), inserted}, + reply_waiting(Node,Conn#connection.waiting, true), + {noreply, State}; _ -> SetupPid ! {self(), bad_request}, {noreply, State} @@ -756,21 +776,17 @@ handle_info({AcceptPid, {accept_pending,MyNode,Node,Address,Type}}, State) -> {'EXIT', OldOwner, _} -> true end, - Owners = lists:keyreplace(OldOwner, - 1, - State#state.conn_owners, - {AcceptPid, Node}), ets:insert(sys_dist, Conn#connection{owner = AcceptPid}), AcceptPid ! {self(),{accept_pending,ok_pending}}, - State1 = State#state{conn_owners=Owners}, - {noreply,State1} + Owners = maps:remove(OldOwner, State#state.conn_owners), + {noreply, State#state{conn_owners=Owners#{AcceptPid => Node}}} end; [#connection{state=up}=Conn] -> AcceptPid ! {self(), {accept_pending, up_pending}}, ets:insert(sys_dist, Conn#connection { pending_owner = AcceptPid, state = up_pending }), - Pend = [{AcceptPid, Node} | State#state.pend_owners ], - {noreply, State#state { pend_owners = Pend }}; + Pend = State#state.pend_owners, + {noreply, State#state { pend_owners = Pend#{AcceptPid => Node} }}; [#connection{state=up_pending}] -> AcceptPid ! {self(), {accept_pending, already_pending}}, {noreply, State}; @@ -784,8 +800,8 @@ handle_info({AcceptPid, {accept_pending,MyNode,Node,Address,Type}}, State) -> address = Address, type = Type}), AcceptPid ! {self(),{accept_pending,ok}}, - Owners = [{AcceptPid,Node} | State#state.conn_owners], - {noreply, State#state{conn_owners = Owners}} + Owners = State#state.conn_owners, + {noreply, State#state{conn_owners = Owners#{AcceptPid => Node}}} catch _:_ -> error_logger:error_msg("~n** Cannot get connection id for node ~w~n", @@ -796,14 +812,17 @@ handle_info({AcceptPid, {accept_pending,MyNode,Node,Address,Type}}, State) -> end; handle_info({SetupPid, {is_pending, Node}}, State) -> - Reply = lists:member({SetupPid,Node},State#state.conn_owners), + Reply = case maps:get(SetupPid, State#state.conn_owners, undefined) of + Node -> true; + _ -> false + end, SetupPid ! {self(), {is_pending, Reply}}, {noreply, State}; %% %% Handle different types of process terminations. %% -handle_info({'EXIT', From, Reason}, State) when is_pid(From) -> +handle_info({'EXIT', From, Reason}, State) -> verbose({'EXIT', From, Reason}, 1, State), handle_exit(From, Reason, State); @@ -827,14 +846,22 @@ handle_info({From,badcookie,_To,_Mess}, State) -> %% handle_info(tick, State) -> ?tckr_dbg(tick), - lists:foreach(fun({Pid,_Node}) -> Pid ! {self(), tick} end, - State#state.conn_owners), + ok = maps:fold(fun (Pid, _Node, ok) -> + Pid ! {self(), tick}, + ok + end, + ok, + State#state.conn_owners), {noreply,State}; handle_info(aux_tick, State) -> ?tckr_dbg(aux_tick), - lists:foreach(fun({Pid,_Node}) -> Pid ! {self(), aux_tick} end, - State#state.conn_owners), + ok = maps:fold(fun (Pid, _Node, ok) -> + Pid ! {self(), aux_tick}, + ok + end, + ok, + State#state.conn_owners), {noreply,State}; handle_info(transition_period_end, @@ -873,6 +900,7 @@ do_handle_exit(Pid, Reason, State) -> listen_exit(Pid, State), accept_exit(Pid, State), conn_own_exit(Pid, Reason, State), + dist_ctrlr_exit(Pid, Reason, State), pending_own_exit(Pid, State), ticker_exit(Pid, State), {noreply,State}. @@ -900,21 +928,24 @@ accept_exit(Pid, State) -> false end. -conn_own_exit(Pid, Reason, State) -> - Owners = State#state.conn_owners, - case lists:keysearch(Pid, 1, Owners) of - {value, {Pid, Node}} -> - throw({noreply, nodedown(Pid, Node, Reason, State)}); - _ -> - false +conn_own_exit(Pid, Reason, #state{conn_owners = Owners} = State) -> + case maps:get(Pid, Owners, undefined) of + undefined -> false; + Node -> throw({noreply, nodedown(Pid, Node, Reason, State)}) + end. + +dist_ctrlr_exit(Pid, Reason, #state{dist_ctrlrs = DCs} = State) -> + case maps:get(Pid, DCs, undefined) of + undefined -> false; + Node -> throw({noreply, nodedown(Pid, Node, Reason, State)}) end. -pending_own_exit(Pid, State) -> - Pend = State#state.pend_owners, - case lists:keysearch(Pid, 1, Pend) of - {value, {Pid, Node}} -> - NewPend = lists:keydelete(Pid, 1, Pend), - State1 = State#state { pend_owners = NewPend }, +pending_own_exit(Pid, #state{pend_owners = Pend} = State) -> + case maps:get(Pid, Pend, undefined) of + undefined -> + false; + Node -> + State1 = State#state { pend_owners = maps:remove(Pid, Pend)}, case get_conn(Node) of {ok, Conn} when Conn#connection.state =:= up_pending -> reply_waiting(Node,Conn#connection.waiting, true), @@ -925,9 +956,7 @@ pending_own_exit(Pid, State) -> _ -> ok end, - throw({noreply, State1}); - _ -> - false + throw({noreply, State1}) end. ticker_exit(Pid, #state{tick = #tick{ticker = Pid, time = T} = Tck} = State) -> @@ -945,10 +974,10 @@ ticker_exit(_, _) -> %% nodedown(Owner, Node, Reason, State) -> State' %% ----------------------------------------------------------- -nodedown(Owner, Node, Reason, State) -> +nodedown(Exited, Node, Reason, State) -> case get_conn(Node) of {ok, Conn} -> - nodedown(Conn, Owner, Node, Reason, Conn#connection.type, State); + nodedown(Conn, Exited, Node, Reason, Conn#connection.type, State); _ -> State end. @@ -959,58 +988,117 @@ get_conn(Node) -> _ -> error end. -nodedown(Conn, Owner, Node, Reason, Type, OldState) -> - Owners = lists:keydelete(Owner, 1, OldState#state.conn_owners), - State = OldState#state{conn_owners = Owners}, +delete_owner(Owner, #state{conn_owners = Owners} = State) -> + State#state{conn_owners = maps:remove(Owner, Owners)}. + +delete_ctrlr(Ctrlr, #state{dist_ctrlrs = DCs} = State) -> + State#state{dist_ctrlrs = maps:remove(Ctrlr, DCs)}. + +nodedown(Conn, Exited, Node, Reason, Type, State) -> case Conn#connection.state of - pending when Conn#connection.owner =:= Owner -> - pending_nodedown(Conn, Node, Type, State); - up when Conn#connection.owner =:= Owner -> - up_nodedown(Conn, Node, Reason, Type, State); - up_pending when Conn#connection.owner =:= Owner -> - up_pending_nodedown(Conn, Node, Reason, Type, State); + pending -> + pending_nodedown(Conn, Exited, Node, Type, State); + up -> + up_nodedown(Conn, Exited, Node, Reason, Type, State); + up_pending -> + up_pending_nodedown(Conn, Exited, Node, Reason, Type, State); _ -> - OldState + State end. -pending_nodedown(Conn, Node, Type, State) -> - % Don't bar connections that have never been alive - %mark_sys_dist_nodedown(Node), - % - instead just delete the node: - erts_internal:abort_connection(Node, Conn#connection.conn_id), - ets:delete(sys_dist, Node), - reply_waiting(Node,Conn#connection.waiting, false), +pending_nodedown(#connection{owner = Owner, + waiting = Waiting, + conn_id = CID}, + Exited, Node, Type, State) when Owner =:= Exited -> + %% Owner exited! + case erts_internal:abort_pending_connection(Node, CID) of + false -> + %% Just got connected but that message has not + %% reached us yet. Wait for controller to exit and + %% handle this then... + ok; + true -> + %% Don't bar connections that have never been alive, i.e. + %% no 'mark_sys_dist_nodedown(Node)'; instead just delete + %% the node: + ets:delete(sys_dist, Node), + reply_waiting(Node, Waiting, false), + case Type of + normal -> + ?nodedown(Node, State); + _ -> + ok + end + end, + delete_owner(Owner, State); +pending_nodedown(#connection{owner = Owner, + ctrlr = Ctrlr, + waiting = Waiting}, + Exited, Node, Type, State) when Ctrlr =:= Exited -> + %% Controller exited! + %% + %% Controller has been registered but crashed + %% before sending mark up message... + %% + %% 'nodeup' messages has been sent by the emulator, + %% so bar the connection... + mark_sys_dist_nodedown(Node), + reply_waiting(Node,Waiting, true), case Type of - normal -> - ?nodedown(Node, State); - _ -> - ok + normal -> + ?nodedown(Node, State); + _ -> + ok end, + delete_owner(Owner, delete_ctrlr(Ctrlr, State)); +pending_nodedown(_Conn, _Exited, _Node, _Type, State) -> State. -up_pending_nodedown(Conn, Node, _Reason, _Type, State) -> - AcceptPid = Conn#connection.pending_owner, - Owners = State#state.conn_owners, - Pend = lists:keydelete(AcceptPid, 1, State#state.pend_owners), - erts_internal:abort_connection(Node, Conn#connection.conn_id), +up_pending_nodedown(#connection{owner = Owner, + ctrlr = Ctrlr, + pending_owner = AcceptPid} = Conn, + Exited, Node, _Reason, + _Type, State) when Ctrlr =:= Exited -> + %% Controller exited! Conn1 = Conn#connection { owner = AcceptPid, conn_id = erts_internal:new_connection(Node), + ctrlr = undefined, pending_owner = undefined, state = pending }, ets:insert(sys_dist, Conn1), AcceptPid ! {self(), pending}, - State#state{conn_owners = [{AcceptPid,Node}|Owners], pend_owners = Pend}. + Pend = maps:remove(AcceptPid, State#state.pend_owners), + Owners = State#state.conn_owners, + State1 = State#state{conn_owners = Owners#{AcceptPid => Node}, + pend_owners = Pend}, + delete_owner(Owner, delete_ctrlr(Ctrlr, State1)); +up_pending_nodedown(#connection{owner = Owner}, + Exited, _Node, _Reason, + _Type, State) when Owner =:= Exited -> + %% Owner exited! + delete_owner(Owner, State); +up_pending_nodedown(_Conn, _Exited, _Node, _Reason, _Type, State) -> + State. -up_nodedown(Conn, Node, _Reason, Type, State) -> - mark_sys_dist_nodedown(Conn, Node), +up_nodedown(#connection{owner = Owner, + ctrlr = Ctrlr}, + Exited, Node, _Reason, Type, State) when Ctrlr =:= Exited -> + %% Controller exited! + mark_sys_dist_nodedown(Node), case Type of normal -> ?nodedown(Node, State); _ -> ok end, + delete_owner(Owner, delete_ctrlr(Ctrlr, State)); +up_nodedown(#connection{owner = Owner}, + Exited, _Node, _Reason, + _Type, State) when Owner =:= Exited -> + %% Owner exited! + delete_owner(Owner, State); +up_nodedown(_Conn, _Exited, _Node, _Reason, _Type, State) -> State. -mark_sys_dist_nodedown(Conn, Node) -> - erts_internal:abort_connection(Node, Conn#connection.conn_id), +mark_sys_dist_nodedown(Node) -> case application:get_env(kernel, dist_auto_connect) of {ok, once} -> ets:insert(sys_dist, #barred_connection{node = Node}); @@ -1098,29 +1186,19 @@ mk_monitor_nodes_error(_Flag, Opts) -> do_disconnect(Node, State) -> case ets:lookup(sys_dist, Node) of [Conn] when Conn#connection.state =:= up -> - disconnect_pid(Conn#connection.owner, State); + disconnect_ctrlr(Conn#connection.ctrlr, State); [Conn] when Conn#connection.state =:= up_pending -> - disconnect_pid(Conn#connection.owner, State); + disconnect_ctrlr(Conn#connection.ctrlr, State); _ -> {false, State} end. -disconnect_pid(Pid, State) -> - exit(Pid, disconnect), - - %% This code used to only use exit + recv 'EXIT' to sync, - %% but since OTP-22 links are no longer broken atomically - %% so the exit message below can arrive before any remaining - %% exit messages have killed the distribution port - Ref = erlang:monitor(process, Pid), - %% Sync wait for connection to die!!! +disconnect_ctrlr(Ctrlr, State) -> + exit(Ctrlr, disconnect), receive - {'DOWN',Ref,_,_,_} -> - receive - {'EXIT',Pid,Reason} -> - {_,State1} = handle_exit(Pid, Reason, State), - {true, State1} - end + {'EXIT',Ctrlr,Reason} -> + {_,State1} = handle_exit(Ctrlr, Reason, State), + {true, State1} end. %% |