Diffstat (limited to 'lib/kernel/src/global.erl')
-rw-r--r--  lib/kernel/src/global.erl  534
1 file changed, 470 insertions, 64 deletions
diff --git a/lib/kernel/src/global.erl b/lib/kernel/src/global.erl
index 491f60688a..764a0bfd0f 100644
--- a/lib/kernel/src/global.erl
+++ b/lib/kernel/src/global.erl
@@ -32,7 +32,7 @@
whereis_name/1, register_name/2,
register_name/3, register_name_external/2, register_name_external/3,
unregister_name_external/1,re_register_name/2, re_register_name/3,
- unregister_name/1, registered_names/0, send/2, node_disconnected/1,
+ unregister_name/1, registered_names/0, send/2,
set_lock/1, set_lock/2, set_lock/3,
del_lock/1, del_lock/2,
trans/2, trans/3, trans/4,
@@ -54,6 +54,10 @@
-define(N_CONNECT_RETRIES, global_connect_retries).
-define(DEFAULT_N_CONNECT_RETRIES, 0).
+%% Time that we keep information about multicasted lost_connection
+%% messages...
+-define(lost_conn_info_cleanup_time, 60*60*1000).
+
%%% In certain places in the server, calling io:format hangs everything,
%%% so we'd better use erlang:display/1.
%%% my_tracer is used in testsuites
@@ -70,6 +74,9 @@
-define(trace(_), ok).
-endif.
+-define(MAX_64BIT_SMALL_INT, ((1 bsl 59) - 1)).
+-define(MIN_64BIT_SMALL_INT, (-(1 bsl 59))).
+
%% These are the protocol versions:
%% Vsn 1 is the original protocol.
%% Vsn 2 is enhanced with code to take care of registration of names from
@@ -82,15 +89,26 @@
%% nodes in the own partition. (R11B-)
%% Vsn 6 does not send any message between locker processes on different
%% nodes, but uses the server as a proxy.
+%% Vsn 7 - propagate global versions between nodes, so we always know
+%% versions of known nodes
+%% - optional "prevent overlapping partitions" fix supported
%% Current version of global does not support vsn 4 or earlier.
--define(vsn, 6).
+-define(vsn, 7).
+
+%% Version when the "prevent overlapping partitions" fix was introduced.
+-define(pop_vsn, 7).
+%% Version when the "propagate global protocol versions" feature
+%% was introduced.
+-define(pgpv_vsn, 7).
%%-----------------------------------------------------------------
%% connect_all = boolean() - true if we are supposed to set up a
%% fully connected net
-%% known = [Node] - all nodes known to us
+%% known = #{} - Map of known nodes including protocol version
+%% as well as some other information. See state
+%% record declaration below for more info.
%% synced = [Node] - all nodes that have the same names as us
%% resolvers = [{Node, MyTag, Resolver}] -
%% the tag separating different synch sessions,
@@ -112,8 +130,21 @@
%% {sync_tag_his, Node} = The Node's tag, used at synchronization
%% {lock_id, Node} = The resource locking the partitions
%%-----------------------------------------------------------------
--record(state, {connect_all :: boolean(),
- known = [] :: [node()],
+-record(conf, {connect_all :: boolean(),
+ prevent_over_part :: boolean()
+ }).
+
+-record(state, {conf = #conf{} :: #conf{},
+ known = #{} :: #{
+ %% Known connected node with protocol
+ %% version as value
+ node() => non_neg_integer(),
+ %% Connecting node, not yet known, with
+ %% protocol version as value
+ {pending, node()} => non_neg_integer(),
+ %% Node currently being removed
+ {removing, node()} => yes
+ },
synced = [] :: [node()],
resolvers = [],
syncers = [] :: [pid()],
@@ -142,6 +173,12 @@
%%% Resources locked by Pid.
%%% ref() is the same ref() as in global_locks.
%%%
+%%% global_lost_connections (set):
+%%% {{NodeA, NodeB}, {ExtendedCreationA, OpIdA, Timer}}
+%%% Information about lost connections (reported by NodeA) used by
+%%% the "prevent overlapping partitions" fix. The timer is used
+%%% to remove the entry when it is not needed anymore.
+%%%
%%% global_pid_names is a 'bag' for backward compatibility.
%%% (Before vsn 5 more than one name could be registered for a process.)
%%%
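As a rough illustration of the entry layout described above (the node names, creation value, op id, and timer reference below are invented, not taken from the patch), an entry in the new table could be inspected from a shell on the node; the entries themselves are written by save_lost_connection_info/4 further down in this patch:

1> ets:lookup(global_lost_connections, {'a@host', 'b@host'}).
[{{'a@host','b@host'},{1234567890,42,#Ref<0.0.0.42>}}]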
@@ -207,9 +244,6 @@ send(Name, Msg) ->
whereis_name(Name) ->
where(Name).
-node_disconnected(Node) ->
- global_name_server ! {nodedown, Node}.
-
%%-----------------------------------------------------------------
%% Method = function(Name, Pid1, Pid2) -> Pid | Pid2 | none
%% Method is called if a name conflict is detected when two nodes
@@ -452,6 +486,7 @@ init([]) ->
_ = ets:new(global_pid_names, [bag, named_table, protected]),
_ = ets:new(global_pid_ids, [bag, named_table, protected]),
+ _ = ets:new(global_lost_connections, [set, named_table, protected]),
%% This is for troubleshooting only.
DoTrace = os:getenv("GLOBAL_HIGH_LEVEL_TRACE") =:= "TRUE",
@@ -469,10 +504,28 @@ init([]) ->
_ ->
true
end,
+ POP = case application:get_env(kernel,
+ prevent_overlapping_partitions) of
+ {ok, Bool} when Bool == true; Bool == false ->
+ Bool;
+ {ok, Invalid} ->
+ error({invalid_parameter_value,
+ prevent_overlapping_partitions,
+ Invalid});
+ undefined ->
+ false
+ end,
S = #state{the_locker = start_the_locker(DoTrace),
trace = T0,
the_registrar = start_the_registrar(),
- connect_all = Ca},
+ conf = #conf{connect_all = Ca,
+ prevent_over_part = POP}},
+ _ = rand:seed(exsss,
+ (erlang:monotonic_time(nanosecond) rem 1000000000)
+ + (erlang:system_time(nanosecond) rem 1000000000)),
+ CreX = ((rand:uniform(?MAX_64BIT_SMALL_INT - ?MIN_64BIT_SMALL_INT)
+ - 1) band (bnot ((1 bsl 2) -1))),
+ put(creation_extension, CreX),
{ok, trace_message(S, {init, node()}, [])}.
%%-----------------------------------------------------------------
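The init/1 change above reads the new prevent_overlapping_partitions kernel parameter (defaulting to false) and seeds a random creation extension. As a minimal illustration, assuming a release that contains this patch, the fix could be enabled through the ordinary kernel application configuration:

%% sys.config (illustrative only):
[{kernel, [{prevent_overlapping_partitions, true}]}].

%% or, equivalently, on the command line:
%%   erl -kernel prevent_overlapping_partitions true

Any value other than true or false makes the init/1 clause above fail with an {invalid_parameter_value, prevent_overlapping_partitions, Value} error.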
@@ -593,6 +646,51 @@ init([]) ->
%% sent by each node to all new nodes (Node becomes known to them)
%%-----------------------------------------------------------------
+%% ----------------------------------------------------------------
+%% Prevent Overlapping Partitions Algorithm
+%% ========================================
+%%
+%% 1. When a node loses the connection to another node, it sends a
+%% {lost_connection, LostConnNode, OtherNode} message to all
+%% other nodes that it knows of.
+%% 2. When a lost_connection message is received the receiver
+%% first checks if it has seen this message before. If so, it
+%% just ignores it. If it has not seen it before, it sends the
+%% message to all nodes it knows of. This is done in order to ensure
+%% that all connected nodes will receive this message. It then
+%% sends a {remove_connection, LostConnRecvNode} message (where
+%% LostConnRecvNode is its own node name) to OtherNode and
+%% clears all information about OtherNode so OtherNode won't be
+%% part of the receiving node's cluster anymore. When this information
+%% has been cleared, no lost_connection will be triggered when
+%% a nodedown message for the connection to OtherNode is
+%% received.
+%% 3. When a {remove_connection, LostConnRecvNode} message is
+%% received, the receiver node takes down the connection to
+%% LostConnRecvNode and clears its information about
+%% LostConnRecvNode so it is not part of its cluster anymore.
+%% Both nodes will receive a nodedown message due to the
+%% connection being closed, but neither of them will send
+%% lost_connection messages since they have cleared information
+%% about the other node.
+%%
+%% This will take down more connections than the minimum number
+%% that would need to be removed in order to form fully connected
+%% partitions. For example, if the user takes down a connection
+%% between two nodes, the rest of the nodes will disconnect from
+%% both of these nodes instead of just one. This is due to:
+%% * We do not want to partition a remaining network when a node
+%% has halted. When you receive nodedown and/or lost_connection
+%% messages, you don't know whether the corresponding node has halted
+%% or if there are network issues.
+%% * We need to decide which connection to take down as soon as
+%% we receive a lost_connection message in order to prevent
+%% inconsistencies from entering global's state.
+%% * All nodes need to make the same choices independently of
+%% each other.
+%%
+%% ----------------------------------------------------------------
+
-spec handle_call(term(), {pid(), term()}, state()) ->
{'noreply', state()} |
{'reply', term(), state()} |
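To make the three numbered steps above concrete, here is a minimal, hedged sketch of what a receiving node does when it learns that NodeA lost its connection to NodeB. It is illustration only; the actual handling is in the handle_info({lost_connection, ...}) and handle_info({remove_connection, ...}) clauses added later in this patch.

%% Minimal sketch only (not the patch's code): the real lost_connection
%% message also carries a creation and an operation id used for
%% deduplication, and the real handlers keep the known nodes in a map
%% rather than a list.
pop_sketch(NodeA, NodeB, KnownNodes) ->
    %% Step 2: repeat the report to every other node we know of, so the
    %% information survives even if some node missed NodeA's multicast...
    [{global_name_server, N} ! {lost_connection, NodeA, NodeB}
     || N <- KnownNodes, N =/= NodeA, N =/= NodeB],
    %% ...then ask NodeB to disconnect from us (step 3 runs on NodeB) and
    %% forget NodeB locally, so the coming nodedown for NodeB does not
    %% trigger a new round of lost_connection messages.
    {global_name_server, NodeB} ! {remove_connection, node()},
    lists:delete(NodeB, KnownNodes).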
@@ -625,7 +723,7 @@ handle_call({del_lock, Lock}, {Pid, _Tag}, S0) ->
{reply, true, S};
handle_call(get_known, _From, S) ->
- {reply, S#state.known, S};
+ {reply, mk_known_list(0, S), S};
handle_call(get_synced, _From, S) ->
{reply, S#state.synced, S};
@@ -676,27 +774,28 @@ handle_call(Request, From, S) ->
-spec handle_cast(term(), state()) -> {'noreply', state()}.
-handle_cast({init_connect, Vsn, Node, InitMsg}, S) ->
+handle_cast({init_connect, Vsn, Node, InitMsg}, S0) ->
%% Sent from global_name_server at Node.
?trace({'####', init_connect, {vsn, Vsn}, {node,Node},{initmsg,InitMsg}}),
- case Vsn of
- %% It is always the responsibility of newer versions to understand
- %% older versions of the protocol.
- {HisVsn, HisTag} when HisVsn > ?vsn ->
- init_connect(?vsn, Node, InitMsg, HisTag, S#state.resolvers, S);
- {HisVsn, HisTag} ->
- init_connect(HisVsn, Node, InitMsg, HisTag, S#state.resolvers, S);
- %% To be future compatible
- Tuple when is_tuple(Tuple) ->
- List = tuple_to_list(Tuple),
- [_HisVsn, HisTag | _] = List,
- %% use own version handling if his is newer.
- init_connect(?vsn, Node, InitMsg, HisTag, S#state.resolvers, S);
- _ ->
- Txt = io_lib:format("Illegal global protocol version ~p Node: ~p\n",
- [Vsn, Node]),
- error_logger:info_report(lists:flatten(Txt))
- end,
+ S = case Vsn of
+ %% It is always the responsibility of newer versions to understand
+ %% older versions of the protocol.
+ {HisVsn, HisTag} when HisVsn > ?vsn ->
+ init_connect(?vsn, Node, InitMsg, HisTag, HisVsn, S0);
+ {HisVsn, HisTag} ->
+ init_connect(HisVsn, Node, InitMsg, HisTag, HisVsn, S0);
+ %% To be future compatible
+ Tuple when is_tuple(Tuple) ->
+ List = tuple_to_list(Tuple),
+ [HisVsn, HisTag | _] = List,
+ %% use own version handling if his is newer.
+ init_connect(?vsn, Node, InitMsg, HisTag, HisVsn, S0);
+ _ ->
+ Txt = io_lib:format("Illegal global protocol version ~p Node: ~p\n",
+ [Vsn, Node]),
+ error_logger:info_report(lists:flatten(Txt)),
+ S0
+ end,
{noreply, S};
%%=======================================================================
@@ -745,10 +844,10 @@ handle_cast({exchange_ops, Node, MyTag, Ops, Resolved}, S0) ->
S = trace_message(S0, {exit_resolver, Node}, [MyTag]),
case get({sync_tag_my, Node}) of
MyTag ->
- Known = S#state.known,
+ Known = mk_known_list(node_vsn(Node, S), S),
gen_server:cast({global_name_server, Node},
{resolved, node(), Resolved, Known,
- Known,get_names_ext(),get({sync_tag_his,Node})}),
+ unused,get_names_ext(),get({sync_tag_his,Node})}),
case get({save_ops, Node}) of
{resolved, HisKnown, Names_ext, HisResolved} ->
put({save_ops, Node}, Ops),
@@ -768,7 +867,7 @@ handle_cast({exchange_ops, Node, MyTag, Ops, Resolved}, S0) ->
%%
%% Here the name clashes are resolved.
%%========================================================================
-handle_cast({resolved, Node, HisResolved, HisKnown, _HisKnown_v2,
+handle_cast({resolved, Node, HisResolved, HisKnown, _Unused,
Names_ext, MyTag}, S) ->
%% Sent from global_name_server at Node.
?trace({'####', resolved, {his_resolved,HisResolved}, {node,Node}}),
@@ -805,7 +904,7 @@ handle_cast({new_nodes, Node, Ops, Names_ext, Nodes, ExtraInfo}, S) ->
%%
%% We are in sync with this node (from the other node's known world).
%%========================================================================
-handle_cast({in_sync, Node, _IsKnown}, S) ->
+handle_cast({in_sync, Node, _IsKnown}, #state{known = Known} = S) ->
%% Sent from global_name_server at Node (in the other partition).
?trace({'####', in_sync, {Node, _IsKnown}}),
lists:foreach(fun(Pid) -> Pid ! {synced, [Node]} end, S#state.syncers),
@@ -815,7 +914,8 @@ handle_cast({in_sync, Node, _IsKnown}, S) ->
true -> Synced;
false -> [Node | Synced]
end,
- {noreply, NewS#state{synced = NSynced}};
+ {noreply, NewS#state{known = maps:remove({pending, Node}, Known),
+ synced = NSynced}};
%% Called when Pid on other node crashed
handle_cast({async_del_name, _Name, _Pid}, S) ->
@@ -868,13 +968,20 @@ handle_info({nodedown, Node}, S) when Node =:= S#state.node_name ->
handle_info({nodedown, Node}, S0) ->
?trace({'####', nodedown, {node,Node}}),
S1 = trace_message(S0, {nodedown, Node}, []),
- S = handle_nodedown(Node, S1),
+ S = handle_nodedown(Node, S1, disconnected),
{noreply, S};
handle_info({extra_nodedown, Node}, S0) ->
?trace({'####', extra_nodedown, {node,Node}}),
S1 = trace_message(S0, {extra_nodedown, Node}, []),
- S = handle_nodedown(Node, S1),
+ S = handle_nodedown(Node, S1, disconnected),
+ {noreply, S};
+
+handle_info({ignore_node, Node}, S0) ->
+ %% global_group wants us to ignore this node...
+ ?trace({'####', ignore_node, {node,Node}}),
+ S1 = trace_message(S0, {ignore_node, Node}, []),
+ S = handle_nodedown(Node, S1, ignore_node),
{noreply, S};
handle_info({nodeup, Node}, S) when Node =:= node() ->
@@ -883,11 +990,12 @@ handle_info({nodeup, Node}, S) when Node =:= node() ->
%% references to old node name ('nonode@nohost') to Node.
{noreply, change_our_node_name(Node, S)};
-handle_info({nodeup, _Node}, S) when not S#state.connect_all ->
+handle_info({nodeup, _Node},
+ #state{conf = #conf{connect_all = false}} = S) ->
{noreply, S};
-handle_info({nodeup, Node}, S0) when S0#state.connect_all ->
- IsKnown = lists:member(Node, S0#state.known) or
+handle_info({nodeup, Node}, S0) ->
+ IsKnown = maps:is_key(Node, S0#state.known) or
%% This one is only for double nodeups (shouldn't occur!)
lists:keymember(Node, 1, S0#state.resolvers),
?trace({'####', nodeup, {node,Node}, {isknown,IsKnown}}),
@@ -928,6 +1036,110 @@ handle_info({whereis, Name, From}, S) ->
do_whereis(Name, From),
{noreply, S};
+handle_info({lost_connection, NodeA, XCreationA, OpIdA, NodeB} = Msg,
+ #state{conf = #conf{connect_all = true,
+ prevent_over_part = true}} = S0) ->
+ %% Message introduced in protocol version ?pop_vsn
+ %%
+ %% NodeA reports that it lost connection to NodeB. If we got a
+ %% connection to NodeB, we need to disconnect it in order to
+ %% prevent overlapping partitions...
+ LcKey = {NodeA, NodeB},
+ S1 = case get_lost_connection_info(LcKey) of
+ {XCreationA, OpId, _Tmr} when OpIdA =< OpId ->
+ %% We have already handled this lost connection
+ %% message...
+ S0;
+ {_, _, Tmr} ->
+ %% Inform all other nodes we know of about this as well. This is
+ %% done in order to prevent some nodes in this cluster from missing
+ %% the original message from NodeA. This ensures that all
+ %% nodes will know of this connection loss, even if some nodes
+ %% lose connection to NodeA while NodeA is multicasting that
+ %% it lost the connection to NodeB.
+ gns_volatile_multicast(Msg, NodeA, ?pop_vsn, true, S0),
+
+ %% Save info about this lost_connection message...
+ save_lost_connection_info(LcKey, XCreationA, OpIdA, Tmr),
+
+ RmNode = case node() == NodeB of
+ false ->
+ NodeB;
+ true ->
+ %% This, together with NodeA being known by
+ %% us, is probably unusual, but it can happen
+ %% since lost_connection messages are
+ %% repeated by receiving nodes. All other
+ %% nodes will remove us, so there is no
+ %% need to make them remove NodeA as well.
+ %% We therefore request removal from NodeA
+ %% and wait for the nodedown which
+ %% eventually will come since NodeA reported
+ %% this...
+ NodeA
+ end,
+
+ case is_node_potentially_known(RmNode, S0) of
+ false ->
+ S0;
+ true ->
+ case node_vsn(RmNode, S0) of
+ Vsn when Vsn < ?pop_vsn ->
+ erlang:disconnect_node(RmNode),
+ error_logger:warning_msg(
+ "'global' at node ~p disconnected old "
+ "node ~p in order to prevent overlapping "
+ "partitions",
+ [node(), RmNode]),
+ ok;
+ _Vsn ->
+ gns_volatile_send(RmNode,
+ {remove_connection, node()}),
+ error_logger:warning_msg(
+ "'global' at node ~p requested disconnect "
+ "from node ~p in order to prevent "
+ "overlapping partitions",
+ [node(), RmNode]),
+ ok
+ end,
+ handle_nodedown(RmNode, S0, remove_connection)
+ end
+ end,
+
+ {noreply, S1};
+
+handle_info({lost_connection, _NodeA, _XCreationA, _OpIdA, _NodeB}, S) ->
+ %% Message introduced in protocol version ?pop_vsn
+ {noreply, S};
+
+handle_info({timeout, _, _} = TmoMsg, S) ->
+ %% Instance specific message
+ remove_lost_connection_info(TmoMsg),
+ {noreply, S};
+
+handle_info({remove_connection, Node}, S0) ->
+ %% Message introduced in protocol version ?pop_vsn
+ S2 = case is_node_potentially_known(Node, S0) of
+ false ->
+ S0;
+ true ->
+ erlang:disconnect_node(Node),
+ S1 = handle_nodedown(Node, S0, remove_connection),
+ error_logger:warning_msg(
+ "'global' at node ~p disconnected node ~p in order to "
+ "prevent overlapping partitions", [node(), Node]),
+ S1
+ end,
+ {noreply, S2};
+
+handle_info({prepare_shutdown, From, Ref}, S0) ->
+ %% Prevent lost_connection messages being sent due to
+ %% connections being taken down during the shutdown...
+ S1 = S0#state{conf = #conf{connect_all = false,
+ prevent_over_part = false}},
+ From ! {Ref, ok},
+ {noreply, S1};
+
handle_info(known, S) ->
io:format(">>>> ~p\n",[S#state.known]),
{noreply, S};
@@ -1067,7 +1279,8 @@ check_replies([], _Id, _Replies) ->
%% Another node wants to synchronize its registered names with us.
%% Both nodes must have a lock before they are allowed to continue.
%%========================================================================
-init_connect(Vsn, Node, InitMsg, HisTag, Resolvers, S) ->
+init_connect(Vsn, Node, InitMsg, HisTag, HisVsn,
+ #state{resolvers = Resolvers, known = Known} = S) ->
%% It is always the responsibility of newer versions to understand
%% older versions of the protocol.
put({prot_vsn, Node}, Vsn),
@@ -1083,7 +1296,9 @@ init_connect(Vsn, Node, InitMsg, HisTag, Resolvers, S) ->
false ->
?trace({init_connect,{pre_connect,Node},{histag,HisTag}}),
put({pre_connect, Node}, {Vsn, InitMsg, HisTag})
- end.
+ end,
+ S#state{known = maps:put({pending, Node}, HisVsn, Known)}.
+
%%========================================================================
%% In the simple case, we'll get lock_is_set before we get exchange,
@@ -1129,8 +1344,9 @@ resolved(Node, HisResolved, HisKnown, Names_ext, S0) ->
%% Known may have shrunk since the lock was taken (due to nodedowns).
Known = S0#state.known,
Synced = S0#state.synced,
- NewNodes = [Node | HisKnown],
- sync_others(HisKnown),
+ NewNodes = make_node_vsn_list([Node | HisKnown], S0),
+ HisKnownNodes = node_list(HisKnown),
+ sync_others(HisKnownNodes),
ExtraInfo = [{vsn,get({prot_vsn, Node})}, {lock, get({lock_id, Node})}],
S = do_ops(Ops, node(), Names_ext, ExtraInfo, S0),
%% I am synced with Node, but not with HisKnown yet
@@ -1138,47 +1354,57 @@ resolved(Node, HisResolved, HisKnown, Names_ext, S0) ->
S3 = lists:foldl(fun(Node1, S1) ->
F = fun(Tag) -> cancel_locker(Node1,S1,Tag) end,
cancel_resolved_locker(Node1, F)
- end, S, HisKnown),
+ end, S, HisKnownNodes),
%% The locker that took the lock is asked to send
%% the {new_nodes, ...} message. This ensures that
%% {del_lock, ...} is received after {new_nodes, ...}
- %% (except when abcast spawns process(es)...).
+ NewNodesMsg = {new_nodes, node(), Ops, Names_ext, NewNodes, ExtraInfo},
NewNodesF = fun() ->
- gen_server:abcast(Known, global_name_server,
- {new_nodes, node(), Ops, Names_ext,
- NewNodes, ExtraInfo})
+ lists:foreach(
+ fun (N) when is_atom(N) ->
+ case maps:get(N, Known) of
+ V when V >= ?pgpv_vsn ->
+ gen_server:cast({global_name_server, N},
+ NewNodesMsg);
+ _OldV ->
+ gen_server:cast({global_name_server, N},
+ {new_nodes, node(), Ops,
+ Names_ext, node_list(NewNodes),
+ ExtraInfo})
+ end;
+ (_) ->
+ ok
+ end,
+ maps:keys(Known))
end,
F = fun(Tag) -> cancel_locker(Node, S3, Tag, NewNodesF) end,
S4 = cancel_resolved_locker(Node, F),
%% See (*) below... we're node b in that description
- AddedNodes = (NewNodes -- Known),
- NewKnown = Known ++ AddedNodes,
- S4#state.the_locker ! {add_to_known, AddedNodes},
- NewS = trace_message(S4, {added, AddedNodes},
- [{new_nodes, NewNodes}, {abcast, Known}, {ops,Ops}]),
- NewS#state{known = NewKnown, synced = [Node | Synced]}.
+ {AddedNodes, S5} = add_to_known(NewNodes, S4),
+ S5#state.the_locker ! {add_to_known, AddedNodes},
+ S6 = trace_message(S5, {added, AddedNodes},
+ [{new_nodes, NewNodes}, {abcast, Known}, {ops,Ops}]),
+ S6#state{synced = [Node | Synced]}.
cancel_resolved_locker(Node, CancelFun) ->
Tag = get({sync_tag_my, Node}),
?trace({calling_cancel_locker,Tag,get()}),
S = CancelFun(Tag),
reset_node_state(Node),
- S.
+ S#state{known = maps:remove({pending, Node}, S#state.known)}.
new_nodes(Ops, ConnNode, Names_ext, Nodes, ExtraInfo, S0) ->
- Known = S0#state.known,
%% (*) This one requires some thought...
%% We're node a, other nodes b and c:
%% The problem is that {in_sync, a} may arrive before {resolved, [a]} to
%% b from c, leading to b sending {new_nodes, [a]} to us (node a).
%% Therefore, we make sure we never get duplicates in Known.
- AddedNodes = lists:delete(node(), Nodes -- Known),
+ {AddedNodes, S1} = add_to_known(Nodes, S0),
sync_others(AddedNodes),
- S = do_ops(Ops, ConnNode, Names_ext, ExtraInfo, S0),
+ S2 = do_ops(Ops, ConnNode, Names_ext, ExtraInfo, S1),
?trace({added_nodes_in_sync,{added_nodes,AddedNodes}}),
- S#state.the_locker ! {add_to_known, AddedNodes},
- S1 = trace_message(S, {added, AddedNodes}, [{ops,Ops}]),
- S1#state{known = Known ++ AddedNodes}.
+ S2#state.the_locker ! {add_to_known, AddedNodes},
+ trace_message(S2, {added, AddedNodes}, [{ops,Ops}]).
do_whereis(Name, From) ->
case is_global_lock_set() of
@@ -2070,15 +2296,195 @@ pid_locks(Ref) ->
ref_is_locking(Ref, PidRefs) ->
lists:keyfind(Ref, 2, PidRefs) =/= false.
-handle_nodedown(Node, S) ->
+handle_nodedown(Node, #state{synced = Syncs,
+ known = Known0} = S, What) ->
%% DOWN signals from monitors have removed locks and registered names.
- #state{known = Known, synced = Syncs} = S,
NewS = cancel_locker(Node, S, get({sync_tag_my, Node})),
NewS#state.the_locker ! {remove_from_known, Node},
reset_node_state(Node),
- NewS#state{known = lists:delete(Node, Known),
+ Known1 = case What of
+ remove_connection ->
+ maps:put({removing, Node}, yes, Known0);
+ ignore_node ->
+ maps:remove({removing, Node}, Known0);
+ disconnected ->
+ case maps:get({removing, Node}, Known0, no) of
+ yes ->
+ maps:remove({removing, Node}, Known0);
+ no ->
+ inform_connection_loss(Node, S),
+ Known0
+ end
+ end,
+ NewS#state{known = maps:remove(Node,
+ maps:remove({pending, Node}, Known1)),
synced = lists:delete(Node, Syncs)}.
+inform_connection_loss(Node,
+ #state{conf = #conf{connect_all = true,
+ prevent_over_part = true}} = S) ->
+ Msg = {lost_connection,
+ node(),
+ (get(creation_extension)
+ + erlang:system_info(creation)
+ - ?MIN_64BIT_SMALL_INT),
+ erlang:unique_integer([monotonic]),
+ Node},
+ gns_volatile_multicast(Msg, Node, ?pop_vsn, true, S);
+inform_connection_loss(_Node, #state{}) ->
+ ok.
+
+%%
+%% Volatile send (does not bring up connections and does not
+%% preserve signal order) of Msg to global name server at Node...
+%%
+gns_volatile_send(Node, Msg) ->
+ To = {global_name_server, Node},
+ case erlang:send(To, Msg, [nosuspend, noconnect]) of
+ ok ->
+ ok;
+ noconnect ->
+ ok;
+ nosuspend ->
+ _ = spawn(fun () ->
+ _ = erlang:send(To, Msg, [noconnect])
+ end),
+ ok
+ end.
+
+%%
+%% Volatile multicast of Msg to all global name servers on known nodes
+%% (and pending known nodes if AlsoPend is true) using protocol version
+%% MinVer or larger...
+%%
+gns_volatile_multicast(Msg, IgnoreNode, MinVer,
+ AlsoPend, #state{known = Known}) ->
+ Send = fun (Key, Node) ->
+ case maps:get(Key, Known) of
+ Ver when Ver < MinVer -> ok;
+ _Ver -> gns_volatile_send(Node, Msg)
+ end
+ end,
+ lists:foreach(fun (Node) when is_atom(Node), Node =/= IgnoreNode ->
+ Send(Node, Node);
+ ({pending, Node} = Key) when AlsoPend == true,
+ Node =/= IgnoreNode ->
+ Send(Key, Node);
+ (_) ->
+ ok
+ end, maps:keys(Known)).
+
+is_node_potentially_known(Node, #state{known = Known}) ->
+ maps:is_key(Node, Known) orelse maps:is_key({pending, Node}, Known).
+
+node_vsn(Node, #state{}) when node() == Node ->
+ ?vsn;
+node_vsn(Node, #state{known = Known}) ->
+ case maps:find(Node, Known) of
+ {ok, Ver} ->
+ Ver;
+ error ->
+ case maps:find({pending, Node}, Known) of
+ {ok, Ver} ->
+ Ver;
+ error ->
+ 0
+ end
+ end.
+
+node_list(NList) ->
+ lists:map(fun (N) when is_atom(N) ->
+ N;
+ ({N, _V}) when is_atom(N) ->
+ N
+ end, NList).
+
+
+make_node_vsn_list(NList, #state{} = S) ->
+ lists:map(fun ({N, 0}) when is_atom(N) ->
+ {N, node_vsn(N, S)};
+ (N) when is_atom(N) ->
+ {N, node_vsn(N, S)};
+ ({N, V} = NV) when is_atom(N),
+ is_integer(V) ->
+ NV
+ end, NList).
+
+mk_known_list(Vsn, #state{known = Known}) when Vsn < ?pgpv_vsn ->
+ lists:foldl(fun ({N, _V}, Ns) when is_atom(N) ->
+ [N | Ns];
+ (_, Ns) ->
+ Ns
+ end,
+ [],
+ maps:to_list(Known));
+mk_known_list(_Vsn, #state{known = Known}) ->
+ lists:foldl(fun ({N, _V} = NV, NVs) when is_atom(N) ->
+ [NV | NVs];
+ (_, Ns) ->
+ Ns
+ end,
+ [],
+ maps:to_list(Known)).
+
+add_to_known(AddKnown, #state{known = Known} = S) ->
+ Fun = fun (N, Acc) when N == node() ->
+ Acc;
+ ({N, _V}, Acc) when N == node() ->
+ Acc;
+ (N, {A, K} = Acc) when is_atom(N) ->
+ case maps:is_key(N, K) of
+ true -> Acc;
+ false -> {[N|A], maps:put(N, 0, K)}
+ end;
+ ({N, V}, {A, K} = Acc) ->
+ case maps:find(N, K) of
+ error ->
+ {[N|A], maps:put(N, V, K)};
+ {ok, NV} when NV >= 0 ->
+ Acc;
+ {ok, _UnknownVsn} ->
+ %% Update version, but don't count
+ %% it as an added node...
+ {A, maps:put(N, V, K)}
+ end
+ end,
+
+ {Added, NewKnown} = lists:foldl(Fun, {[], Known}, AddKnown),
+ {Added, S#state{known = NewKnown}}.
+
+get_lost_connection_info(LcKey) ->
+ case ets:lookup(global_lost_connections, LcKey) of
+ [{LcKey, LcValue}] ->
+ LcValue;
+ _ ->
+ {undefined, undefined, undefined}
+ end.
+
+save_lost_connection_info(LcKey, XCre, OpId, undefined) ->
+ %% Make sure we clean up old unused information about
+ %% lost connections...
+ Tmr = erlang:start_timer(?lost_conn_info_cleanup_time,
+ self(), {lost_connection, LcKey}),
+ Value = {XCre, OpId, Tmr},
+ _ = ets:insert(global_lost_connections, {LcKey, Value}),
+ ok;
+save_lost_connection_info(LcKey, XCre, OpId, OldTmr) ->
+ %% Cancel lost connection info cleanup timer for info being replaced...
+ _ = erlang:cancel_timer(OldTmr, [{async, true}, {info, false}]),
+ save_lost_connection_info(LcKey, XCre, OpId, undefined).
+
+remove_lost_connection_info({timeout, Tmr, {lost_connection, LcKey}}) ->
+ case ets:lookup(global_lost_connections, LcKey) of
+ [{LcKey, {_, _, Tmr}}] ->
+ _ = ets:delete(global_lost_connections, LcKey),
+ ok;
+ _ ->
+ ok
+ end;
+remove_lost_connection_info(_) ->
+ ok.
+
get_names() ->
ets:select(global_names,
ets:fun2ms(fun({Name, Pid, Method, _Ref}) ->