diff options
author | Rickard Green <rickard@erlang.org> | 2022-02-06 23:50:35 +0100 |
---|---|---|
committer | Rickard Green <rickard@erlang.org> | 2022-02-06 23:50:35 +0100 |
commit | 20c9e3428bb7f902447b755289e09eecbeac73dc (patch) | |
tree | 5234a3a31c8e59e8a99f94fdb5f0d19546b35b82 /lib/kernel/src/global.erl | |
parent | df48c260e74c3e9058ff8681ce9f554e6fa0fe34 (diff) | |
parent | 78c9e20ed86831c6d9618735c295475a7b9fedc4 (diff) | |
download | erlang-20c9e3428bb7f902447b755289e09eecbeac73dc.tar.gz |
Merge branch 'rickard/prevent-overlapping-partitions/23.3.4/ERIERL-732/OTP-17843' into rickard/prevent-overlapping-partitions/24.2/ERIERL-732/OTP-17843
* rickard/prevent-overlapping-partitions/23.3.4/ERIERL-732/OTP-17843:
global: Preventing overlapping partitions fix
global: Propagate and save version between all nodes
Diffstat (limited to 'lib/kernel/src/global.erl')
-rw-r--r-- | lib/kernel/src/global.erl | 528 |
1 files changed, 464 insertions, 64 deletions
diff --git a/lib/kernel/src/global.erl b/lib/kernel/src/global.erl index 03f6c0aa94..32ad20bc61 100644 --- a/lib/kernel/src/global.erl +++ b/lib/kernel/src/global.erl @@ -32,7 +32,7 @@ whereis_name/1, register_name/2, register_name/3, register_name_external/2, register_name_external/3, unregister_name_external/1,re_register_name/2, re_register_name/3, - unregister_name/1, registered_names/0, send/2, node_disconnected/1, + unregister_name/1, registered_names/0, send/2, set_lock/1, set_lock/2, set_lock/3, del_lock/1, del_lock/2, trans/2, trans/3, trans/4, @@ -54,6 +54,10 @@ -define(N_CONNECT_RETRIES, global_connect_retries). -define(DEFAULT_N_CONNECT_RETRIES, 0). +%% Time that we keep information about multicasted lost_connection +%% messages... +-define(lost_conn_info_cleanup_time, 60*60*1000). + %%% In certain places in the server, calling io:format hangs everything, %%% so we'd better use erlang:display/1. %%% my_tracer is used in testsuites @@ -70,6 +74,9 @@ -define(trace(_), ok). -endif. +-define(MAX_64BIT_SMALL_INT, ((1 bsl 59) - 1)). +-define(MIN_64BIT_SMALL_INT, (-(1 bsl 59))). + %% These are the protocol versions: %% Vsn 1 is the original protocol. %% Vsn 2 is enhanced with code to take care of registration of names from @@ -82,15 +89,26 @@ %% nodes in the own partition. (R11B-) %% Vsn 6 does not send any message between locker processes on different %% nodes, but uses the server as a proxy. +%% Vsn 7 - propagate global versions between nodes, so we always know +%% versions of known nodes +%% - optional "prevent overlapping partitions" fix supported %% Current version of global does not support vsn 4 or earlier. --define(vsn, 6). +-define(vsn, 7). + +%% Version when the "prevent overlapping partitions" fix was introduced. +-define(pop_vsn, 7). +%% Version when the "propagate global protocol versions" feature +%% was introduced. +-define(pgpv_vsn, 7). 
%%----------------------------------------------------------------- %% connect_all = boolean() - true if we are supposed to set up a %% fully connected net -%% known = [Node] - all nodes known to us +%% known = #{} - Map of known nodes including protocol version +%% as well as some other information. See state +%% record declaration below for more info. %% synced = [Node] - all nodes that have the same names as us %% resolvers = [{Node, MyTag, Resolver}] - %% the tag separating different synch sessions, @@ -112,8 +130,21 @@ %% {sync_tag_his, Node} = The Node's tag, used at synchronization %% {lock_id, Node} = The resource locking the partitions %%----------------------------------------------------------------- --record(state, {connect_all :: boolean(), - known = [] :: [node()], +-record(conf, {connect_all :: boolean(), + prevent_over_part :: boolean() + }). + +-record(state, {conf = #conf{} :: #conf{}, + known = #{} :: #{ + %% Known connected node with protocol + %% version as value + node() => non_neg_integer(), + %% Connecting node, not yet known, with + %% protocol version as value + {pending, node()} => non_neg_integer(), + %% Node currently being removed + {removing, node()} => yes + }, synced = [] :: [node()], resolvers = [], syncers = [] :: [pid()], @@ -142,6 +173,12 @@ %%% Resources locked by Pid. %%% ref() is the same ref() as in global_locks. %%% +%%% global_lost_connections (set): +%%% {{NodeA, NodeB}, {ExtendedCreationA, OpIdA, Timer}} +%%% Information about lost connections (reported by NodeA) used by +%%% the "prevent overlapping partitions" fix. The timer is used +%%% to remove the entry when not needed anymore. +%%% %%% global_pid_names is a 'bag' for backward compatibility. %%% (Before vsn 5 more than one name could be registered for a process.) %%% @@ -207,9 +244,6 @@ send(Name, Msg) -> whereis_name(Name) -> where(Name). -node_disconnected(Node) -> - global_name_server ! {nodedown, Node}. 
- %%----------------------------------------------------------------- %% Method = function(Name, Pid1, Pid2) -> Pid | Pid2 | none %% Method is called if a name conflict is detected when two nodes @@ -452,6 +486,7 @@ init([]) -> _ = ets:new(global_pid_names, [bag, named_table, protected]), _ = ets:new(global_pid_ids, [bag, named_table, protected]), + _ = ets:new(global_lost_connections, [set, named_table, protected]), %% This is for troubleshooting only. DoTrace = os:getenv("GLOBAL_HIGH_LEVEL_TRACE") =:= "TRUE", @@ -469,10 +504,28 @@ init([]) -> _ -> true end, + POP = case application:get_env(kernel, + prevent_overlapping_partitions) of + {ok, Bool} when Bool == true; Bool == false -> + Bool; + {ok, Invalid} -> + error({invalid_parameter_value, + prevent_overlapping_partitions, + Invalid}); + undefined -> + false + end, S = #state{the_locker = start_the_locker(DoTrace), trace = T0, the_registrar = start_the_registrar(), - connect_all = Ca}, + conf = #conf{connect_all = Ca, + prevent_over_part = POP}}, + _ = rand:seed(default, + (erlang:monotonic_time(nanosecond) rem 1000000000) + + (erlang:system_time(nanosecond) rem 1000000000)), + CreX = ((rand:uniform(?MAX_64BIT_SMALL_INT - ?MIN_64BIT_SMALL_INT) + - 1) band (bnot ((1 bsl 32) -1))), + put(creation_extension, CreX), {ok, trace_message(S, {init, node()}, [])}. %%----------------------------------------------------------------- @@ -593,6 +646,51 @@ init([]) -> %% sent by each node to all new nodes (Node becomes known to them) %%----------------------------------------------------------------- +%% ---------------------------------------------------------------- +%% Prevent Overlapping Partitions Algorithm +%% ======================================== +%% +%% 1. When a node lose connection to another node it sends a +%% {lost_connection, LostConnNode, OtherNode} message to all +%% other nodes that it knows of. +%% 2. 
When a lost_connection message is received the receiver +%% first checks if it has seen this message before. If so, it +%% just ignores it. If it has not seen it before, it sends the +%% message to all nodes it knows of. This is in order to ensure +%% that all connected nodes will receive this message. It then +%% sends a {remove_connection, LostConnRecvNode} message (where +%% LostConnRecvNode is its own node name) to OtherNode and +%% clears all information about OtherNode so OtherNode won't be +%% part of ReceiverNode's cluster anymore. When this information +%% has been cleared, no lost_connection will be triggered when +%% a nodedown message for the connection to OtherNode is +%% received. +%% 3. When a {remove_connection, LostConnRecvNode} message is +%% received, the receiver node takes down the connection to +%% LostConnRecvNode and clears its information about +%% LostConnRecvNode so it is not part of its cluster anymore. +%% Both nodes will receive a nodedown message due to the +%% connection being closed, but none of them will send +%% lost_connection messages since they have cleared information +%% about the other node. +%% +%% This will take down more connections than the minimum amount +%% of connections to remove in order to form fully connected +%% partitions. For example, if the user takes down a connection +%% between two nodes, the rest of the nodes will disconnect from +%% both of these nodes instead of just one. This is due to: +%% * We do not want to partition a remaining network when a node +%% has halted. When you receive a nodedown and/or lost_connection +%% messages you don't know if the corresponding node has halted +%% or if there are network issues. +%% * We need to decide which connection to take down as soon as +%% we receive a lost_connection message in order to prevent +%% inconsistencies entering global's state. +%% * All nodes need to make the same choices independent of +%% each other. 
+%% +%% ---------------------------------------------------------------- + -spec handle_call(term(), {pid(), term()}, state()) -> {'noreply', state()} | {'reply', term(), state()} | @@ -625,7 +723,7 @@ handle_call({del_lock, Lock}, {Pid, _Tag}, S0) -> {reply, true, S}; handle_call(get_known, _From, S) -> - {reply, S#state.known, S}; + {reply, mk_known_list(0, S), S}; handle_call(get_synced, _From, S) -> {reply, S#state.synced, S}; @@ -676,27 +774,28 @@ handle_call(Request, From, S) -> -spec handle_cast(term(), state()) -> {'noreply', state()}. -handle_cast({init_connect, Vsn, Node, InitMsg}, S) -> +handle_cast({init_connect, Vsn, Node, InitMsg}, S0) -> %% Sent from global_name_server at Node. ?trace({'####', init_connect, {vsn, Vsn}, {node,Node},{initmsg,InitMsg}}), - case Vsn of - %% It is always the responsibility of newer versions to understand - %% older versions of the protocol. - {HisVsn, HisTag} when HisVsn > ?vsn -> - init_connect(?vsn, Node, InitMsg, HisTag, S#state.resolvers, S); - {HisVsn, HisTag} -> - init_connect(HisVsn, Node, InitMsg, HisTag, S#state.resolvers, S); - %% To be future compatible - Tuple when is_tuple(Tuple) -> - List = tuple_to_list(Tuple), - [_HisVsn, HisTag | _] = List, - %% use own version handling if his is newer. - init_connect(?vsn, Node, InitMsg, HisTag, S#state.resolvers, S); - _ -> - Txt = io_lib:format("Illegal global protocol version ~p Node: ~p\n", - [Vsn, Node]), - error_logger:info_report(lists:flatten(Txt)) - end, + S = case Vsn of + %% It is always the responsibility of newer versions to understand + %% older versions of the protocol. + {HisVsn, HisTag} when HisVsn > ?vsn -> + init_connect(?vsn, Node, InitMsg, HisTag, HisVsn, S0); + {HisVsn, HisTag} -> + init_connect(HisVsn, Node, InitMsg, HisTag, HisVsn, S0); + %% To be future compatible + Tuple when is_tuple(Tuple) -> + List = tuple_to_list(Tuple), + [HisVsn, HisTag | _] = List, + %% use own version handling if his is newer. 
+ init_connect(?vsn, Node, InitMsg, HisTag, HisVsn, S0); + _ -> + Txt = io_lib:format("Illegal global protocol version ~p Node: ~p\n", + [Vsn, Node]), + error_logger:info_report(lists:flatten(Txt)), + S0 + end, {noreply, S}; %%======================================================================= @@ -745,10 +844,10 @@ handle_cast({exchange_ops, Node, MyTag, Ops, Resolved}, S0) -> S = trace_message(S0, {exit_resolver, Node}, [MyTag]), case get({sync_tag_my, Node}) of MyTag -> - Known = S#state.known, + Known = mk_known_list(node_vsn(Node, S), S), gen_server:cast({global_name_server, Node}, {resolved, node(), Resolved, Known, - Known,get_names_ext(),get({sync_tag_his,Node})}), + unused,get_names_ext(),get({sync_tag_his,Node})}), case get({save_ops, Node}) of {resolved, HisKnown, Names_ext, HisResolved} -> put({save_ops, Node}, Ops), @@ -768,7 +867,7 @@ handle_cast({exchange_ops, Node, MyTag, Ops, Resolved}, S0) -> %% %% Here the name clashes are resolved. %%======================================================================== -handle_cast({resolved, Node, HisResolved, HisKnown, _HisKnown_v2, +handle_cast({resolved, Node, HisResolved, HisKnown, _Unused, Names_ext, MyTag}, S) -> %% Sent from global_name_server at Node. ?trace({'####', resolved, {his_resolved,HisResolved}, {node,Node}}), @@ -805,7 +904,7 @@ handle_cast({new_nodes, Node, Ops, Names_ext, Nodes, ExtraInfo}, S) -> %% %% We are in sync with this node (from the other node's known world). %%======================================================================== -handle_cast({in_sync, Node, _IsKnown}, S) -> +handle_cast({in_sync, Node, _IsKnown}, #state{known = Known} = S) -> %% Sent from global_name_server at Node (in the other partition). ?trace({'####', in_sync, {Node, _IsKnown}}), lists:foreach(fun(Pid) -> Pid ! 
{synced, [Node]} end, S#state.syncers), @@ -815,7 +914,8 @@ handle_cast({in_sync, Node, _IsKnown}, S) -> true -> Synced; false -> [Node | Synced] end, - {noreply, NewS#state{synced = NSynced}}; + {noreply, NewS#state{known = maps:remove({pending, Node}, Known), + synced = NSynced}}; %% Called when Pid on other node crashed handle_cast({async_del_name, _Name, _Pid}, S) -> @@ -868,13 +968,20 @@ handle_info({nodedown, Node}, S) when Node =:= S#state.node_name -> handle_info({nodedown, Node}, S0) -> ?trace({'####', nodedown, {node,Node}}), S1 = trace_message(S0, {nodedown, Node}, []), - S = handle_nodedown(Node, S1), + S = handle_nodedown(Node, S1, disconnected), {noreply, S}; handle_info({extra_nodedown, Node}, S0) -> ?trace({'####', extra_nodedown, {node,Node}}), S1 = trace_message(S0, {extra_nodedown, Node}, []), - S = handle_nodedown(Node, S1), + S = handle_nodedown(Node, S1, disconnected), + {noreply, S}; + +handle_info({ignore_node, Node}, S0) -> + %% global_group wants us to ignore this node... + ?trace({'####', ignore_node, {node,Node}}), + S1 = trace_message(S0, {ignore_node, Node}, []), + S = handle_nodedown(Node, S1, ignore_node), {noreply, S}; handle_info({nodeup, Node}, S) when Node =:= node() -> @@ -883,11 +990,12 @@ handle_info({nodeup, Node}, S) when Node =:= node() -> %% references to old node name ('nonode@nohost') to Node. {noreply, change_our_node_name(Node, S)}; -handle_info({nodeup, _Node}, S) when not S#state.connect_all -> +handle_info({nodeup, _Node}, + #state{conf = #conf{connect_all = false}} = S) -> {noreply, S}; -handle_info({nodeup, Node}, S0) when S0#state.connect_all -> - IsKnown = lists:member(Node, S0#state.known) or +handle_info({nodeup, Node}, S0) -> + IsKnown = maps:is_key(Node, S0#state.known) or %% This one is only for double nodeups (shouldn't occur!) 
lists:keymember(Node, 1, S0#state.resolvers), ?trace({'####', nodeup, {node,Node}, {isknown,IsKnown}}), @@ -928,6 +1036,110 @@ handle_info({whereis, Name, From}, S) -> _ = do_whereis(Name, From), {noreply, S}; +handle_info({lost_connection, NodeA, XCreationA, OpIdA, NodeB} = Msg, + #state{conf = #conf{connect_all = true, + prevent_over_part = true}} = S0) -> + %% Message introduced in protocol version ?pop_vsn + %% + %% NodeA reports that it lost connection to NodeB. If we got a + %% connection to NodeB, we need to disconnect it in order to + %% prevent overlapping partitions... + LcKey = {NodeA, NodeB}, + S1 = case get_lost_connection_info(LcKey) of + {XCreationA, OpId, _Tmr} when OpIdA =< OpId -> + %% We have already handled this lost connection + %% message... + S0; + {_, _, Tmr} -> + %% Inform all other nodes we know of about this as well. This + %% is in order to prevent that some nodes in this cluster won't + %% get the original message from NodeA. This ensures that all + %% nodes will know of this connection loss, even if some nodes + %% lose connection to NodeA while NodeA is multicasting that + %% it lost the connection to NodeB. + gns_volatile_multicast(Msg, NodeA, ?pop_vsn, true, S0), + + %% Save info about this lost_connection message... + save_lost_connection_info(LcKey, XCreationA, OpIdA, Tmr), + + RmNode = case node() == NodeB of + false -> + NodeB; + true -> + %% This together with NodeA being known by + %% us probably is unusual, but can happen + %% since lost_connection messages are + %% repeated by receiving nodes. All other + %% nodes will remove us, so there is no + %% need to make them remove NodeA as well. + %% We therefore request removal from NodeA + %% and wait for the nodedown which + %% eventually will come since NodeA reported + %% this... 
+ NodeA + end, + + case is_node_potentially_known(RmNode, S0) of + false -> + S0; + true -> + case node_vsn(RmNode, S0) of + Vsn when Vsn < ?pop_vsn -> + erlang:disconnect_node(RmNode), + error_logger:warning_msg( + "'global' at node ~p disconnected old " + "node ~p in order to prevent overlapping " + "partitions", + [node(), RmNode]), + ok; + _Vsn -> + gns_volatile_send(RmNode, + {remove_connection, node()}), + error_logger:warning_msg( + "'global' at node ~p requested disconnect " + "from node ~p in order to prevent " + "overlapping partitions", + [node(), RmNode]), + ok + end, + handle_nodedown(RmNode, S0, remove_connection) + end + end, + + {noreply, S1}; + +handle_info({lost_connection, _NodeA, _XCreationA, _OpIdA, _NodeB}, S) -> + %% Message introduced in protocol version ?pop_vsn + {noreply, S}; + +handle_info({timeout, _, _} = TmoMsg, S) -> + %% Instance specific message + remove_lost_connection_info(TmoMsg), + {noreply, S}; + +handle_info({remove_connection, Node}, S0) -> + %% Message introduced in protocol version ?pop_vsn + S2 = case is_node_potentially_known(Node, S0) of + false -> + S0; + true -> + erlang:disconnect_node(Node), + S1 = handle_nodedown(Node, S0, remove_connection), + error_logger:warning_msg( + "'global' at node ~p disconnected node ~p in order to " + "prevent overlapping partitions", [node(), Node]), + S1 + end, + {noreply, S2}; + +handle_info({prepare_shutdown, From, Ref}, S0) -> + %% Prevent lost_connection messages being sent due to + %% connections being taken down during the shutdown... + S1 = S0#state{conf = #conf{connect_all = false, + prevent_over_part = false}}, + From ! {Ref, ok}, + {noreply, S1}; + handle_info(known, S) -> io:format(">>>> ~p\n",[S#state.known]), {noreply, S}; @@ -1067,7 +1279,8 @@ check_replies([], _Id, _Replies) -> %% Another node wants to synchronize its registered names with us. %% Both nodes must have a lock before they are allowed to continue. 
%%======================================================================== -init_connect(Vsn, Node, InitMsg, HisTag, Resolvers, S) -> +init_connect(Vsn, Node, InitMsg, HisTag, HisVsn, + #state{resolvers = Resolvers, known = Known} = S) -> %% It is always the responsibility of newer versions to understand %% older versions of the protocol. put({prot_vsn, Node}, Vsn), @@ -1083,7 +1296,9 @@ init_connect(Vsn, Node, InitMsg, HisTag, Resolvers, S) -> false -> ?trace({init_connect,{pre_connect,Node},{histag,HisTag}}), put({pre_connect, Node}, {Vsn, InitMsg, HisTag}) - end. + end, + S#state{known = maps:put({pending, Node}, HisVsn, Known)}. + %%======================================================================== %% In the simple case, we'll get lock_is_set before we get exchange, @@ -1129,8 +1344,9 @@ resolved(Node, HisResolved, HisKnown, Names_ext, S0) -> %% Known may have shrunk since the lock was taken (due to nodedowns). Known = S0#state.known, Synced = S0#state.synced, - NewNodes = [Node | HisKnown], - sync_others(HisKnown), + NewNodes = make_node_vsn_list([Node | HisKnown], S0), + HisKnownNodes = node_list(HisKnown), + sync_others(HisKnownNodes), ExtraInfo = [{vsn,get({prot_vsn, Node})}, {lock, get({lock_id, Node})}], S = do_ops(Ops, node(), Names_ext, ExtraInfo, S0), %% I am synced with Node, but not with HisKnown yet @@ -1138,47 +1354,54 @@ resolved(Node, HisResolved, HisKnown, Names_ext, S0) -> S3 = lists:foldl(fun(Node1, S1) -> F = fun(Tag) -> cancel_locker(Node1,S1,Tag) end, cancel_resolved_locker(Node1, F) - end, S, HisKnown), + end, S, HisKnownNodes), %% The locker that took the lock is asked to send %% the {new_nodes, ...} message. This ensures that %% {del_lock, ...} is received after {new_nodes, ...} - %% (except when abcast spawns process(es)...). 
+ NewNodesMsg = {new_nodes, node(), Ops, Names_ext, NewNodes, ExtraInfo}, NewNodesF = fun() -> - gen_server:abcast(Known, global_name_server, - {new_nodes, node(), Ops, Names_ext, - NewNodes, ExtraInfo}) + maps:foreach( + fun (N, V) when is_atom(N), V >= ?pgpv_vsn -> + gen_server:cast({global_name_server, N}, + NewNodesMsg); + (N, _OldV) when is_atom(N) -> + gen_server:cast({global_name_server, N}, + {new_nodes, node(), Ops, + Names_ext, node_list(NewNodes), + ExtraInfo}); + (_, _) -> + ok + end, + Known) end, F = fun(Tag) -> cancel_locker(Node, S3, Tag, NewNodesF) end, S4 = cancel_resolved_locker(Node, F), %% See (*) below... we're node b in that description - AddedNodes = (NewNodes -- Known), - NewKnown = Known ++ AddedNodes, - S4#state.the_locker ! {add_to_known, AddedNodes}, - NewS = trace_message(S4, {added, AddedNodes}, - [{new_nodes, NewNodes}, {abcast, Known}, {ops,Ops}]), - NewS#state{known = NewKnown, synced = [Node | Synced]}. + {AddedNodes, S5} = add_to_known(NewNodes, S4), + S5#state.the_locker ! {add_to_known, AddedNodes}, + S6 = trace_message(S5, {added, AddedNodes}, + [{new_nodes, NewNodes}, {abcast, Known}, {ops,Ops}]), + S6#state{synced = [Node | Synced]}. cancel_resolved_locker(Node, CancelFun) -> Tag = get({sync_tag_my, Node}), ?trace({calling_cancel_locker,Tag,get()}), S = CancelFun(Tag), reset_node_state(Node), - S. + S#state{known = maps:remove({pending, Node}, S#state.known)}. new_nodes(Ops, ConnNode, Names_ext, Nodes, ExtraInfo, S0) -> - Known = S0#state.known, %% (*) This one requires some thought... %% We're node a, other nodes b and c: %% The problem is that {in_sync, a} may arrive before {resolved, [a]} to %% b from c, leading to b sending {new_nodes, [a]} to us (node a). %% Therefore, we make sure we never get duplicates in Known. 
- AddedNodes = lists:delete(node(), Nodes -- Known), + {AddedNodes, S1} = add_to_known(Nodes, S0), sync_others(AddedNodes), - S = do_ops(Ops, ConnNode, Names_ext, ExtraInfo, S0), + S2 = do_ops(Ops, ConnNode, Names_ext, ExtraInfo, S1), ?trace({added_nodes_in_sync,{added_nodes,AddedNodes}}), - S#state.the_locker ! {add_to_known, AddedNodes}, - S1 = trace_message(S, {added, AddedNodes}, [{ops,Ops}]), - S1#state{known = Known ++ AddedNodes}. + S2#state.the_locker ! {add_to_known, AddedNodes}, + trace_message(S2, {added, AddedNodes}, [{ops,Ops}]). do_whereis(Name, From) -> case is_global_lock_set() of @@ -2070,15 +2293,192 @@ pid_locks(Ref) -> ref_is_locking(Ref, PidRefs) -> lists:keyfind(Ref, 2, PidRefs) =/= false. -handle_nodedown(Node, S) -> +handle_nodedown(Node, #state{synced = Syncs, + known = Known0} = S, What) -> %% DOWN signals from monitors have removed locks and registered names. - #state{known = Known, synced = Syncs} = S, NewS = cancel_locker(Node, S, get({sync_tag_my, Node})), NewS#state.the_locker ! {remove_from_known, Node}, reset_node_state(Node), - NewS#state{known = lists:delete(Node, Known), + Known1 = case What of + remove_connection -> + maps:put({removing, Node}, yes, Known0); + ignore_node -> + maps:remove({removing, Node}, Known0); + disconnected -> + case maps:get({removing, Node}, Known0, no) of + yes -> + maps:remove({removing, Node}, Known0); + no -> + inform_connection_loss(Node, S), + Known0 + end + end, + NewS#state{known = maps:remove(Node, + maps:remove({pending, Node}, Known1)), synced = lists:delete(Node, Syncs)}. +inform_connection_loss(Node, + #state{conf = #conf{connect_all = true, + prevent_over_part = true}} = S) -> + Msg = {lost_connection, + node(), + (get(creation_extension) + + erlang:system_info(creation) + - ?MIN_64BIT_SMALL_INT), + erlang:unique_integer([monotonic]), + Node}, + gns_volatile_multicast(Msg, Node, ?pop_vsn, true, S); +inform_connection_loss(_Node, #state{}) -> + ok. 
+ +%% +%% Volatile send (does not bring up connections and does not +%% preserve signal order) of Msg to global name server at Node... +%% +gns_volatile_send(Node, Msg) -> + To = {global_name_server, Node}, + case erlang:send(To, Msg, [nosuspend, noconnect]) of + ok -> + ok; + noconnect -> + ok; + nosuspend -> + _ = spawn(fun () -> + _ = erlang:send(To, Msg, [noconnect]) + end), + ok + end. + +%% +%% Volatile multicast of Msg to all global name servers on known nodes +%% (and pending known nodes if AlsoPend is true) using protocol version +%% MinVer or larger... +%% +gns_volatile_multicast(Msg, IgnoreNode, MinVer, + AlsoPend, #state{known = Known}) -> + maps:foreach(fun (Node, Ver) when is_atom(Node), + Node =/= IgnoreNode, + Ver >= MinVer -> + gns_volatile_send(Node, Msg); + ({pending, Node}, Ver) when AlsoPend == true, + Node =/= IgnoreNode, + Ver >= MinVer -> + gns_volatile_send(Node, Msg); + (_, _) -> + ok + end, Known). + +is_node_potentially_known(Node, #state{known = Known}) -> + maps:is_key(Node, Known) orelse maps:is_key({pending, Node}, Known). + +node_vsn(Node, #state{}) when node() == Node -> + ?vsn; +node_vsn(Node, #state{known = Known}) -> + case maps:find(Node, Known) of + {ok, Ver} -> + Ver; + error -> + case maps:find({pending, Node}, Known) of + {ok, Ver} -> + Ver; + error -> + 0 + end + end. + +node_list(NList) -> + lists:map(fun (N) when is_atom(N) -> + N; + ({N, _V}) when is_atom(N) -> + N + end, NList). + + +make_node_vsn_list(NList, #state{} = S) -> + lists:map(fun ({N, 0}) when is_atom(N) -> + {N, node_vsn(N, S)}; + (N) when is_atom(N) -> + {N, node_vsn(N, S)}; + ({N, V} = NV) when is_atom(N), + is_integer(V) -> + NV + end, NList). 
+ +mk_known_list(Vsn, #state{known = Known}) when Vsn < ?pgpv_vsn -> + lists:foldl(fun ({N, _V}, Ns) when is_atom(N) -> + [N | Ns]; + (_, Ns) -> + Ns + end, + [], + maps:to_list(Known)); +mk_known_list(_Vsn, #state{known = Known}) -> + lists:foldl(fun ({N, _V} = NV, NVs) when is_atom(N) -> + [NV | NVs]; + (_, Ns) -> + Ns + end, + [], + maps:to_list(Known)). + +add_to_known(AddKnown, #state{known = Known} = S) -> + Fun = fun (N, Acc) when N == node() -> + Acc; + ({N, _V}, Acc) when N == node() -> + Acc; + (N, {A, K} = Acc) when is_atom(N) -> + case maps:is_key(N, K) of + true -> Acc; + false -> {[N|A], maps:put(N, 0, K)} + end; + ({N, V}, {A, K} = Acc) -> + case maps:find(N, K) of + error -> + {[N|A], maps:put(N, V, K)}; + {ok, NV} when NV >= 0 -> + Acc; + {ok, _UnknownVsn} -> + %% Update version, but don't count + %% it as an added node... + {A, maps:put(N, V, K)} + end + end, + + {Added, NewKnown} = lists:foldl(Fun, {[], Known}, AddKnown), + {Added, S#state{known = NewKnown}}. + +get_lost_connection_info(LcKey) -> + case ets:lookup(global_lost_connections, LcKey) of + [{LcKey, LcValue}] -> + LcValue; + _ -> + {undefined, undefined, undefined} + end. + +save_lost_connection_info(LcKey, XCre, OpId, undefined) -> + %% Make sure we clean up old unused information about + %% lost connections... + Tmr = erlang:start_timer(?lost_conn_info_cleanup_time, + self(), {lost_connection, LcKey}), + Value = {XCre, OpId, Tmr}, + _ = ets:insert(global_lost_connections, {LcKey, Value}), + ok; +save_lost_connection_info(LcKey, XCre, OpId, OldTmr) -> + %% Cancel lost connection info cleanup timer for info being replaced... + _ = erlang:cancel_timer(OldTmr, [{async, true}, {info, false}]), + save_lost_connection_info(LcKey, XCre, OpId, undefined). 
+ +remove_lost_connection_info({timeout, Tmr, {lost_connection, LcKey}}) -> + case ets:lookup(global_lost_connections, LcKey) of + [{LcKey, {_, _, Tmr}}] -> + _ = ets:delete(global_lost_connections, LcKey), + ok; + _ -> + ok + end; +remove_lost_connection_info(_) -> + ok. + get_names() -> ets:select(global_names, ets:fun2ms(fun({Name, Pid, Method, _Ref}) -> |