summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRickard Green <rickard@erlang.org>2022-01-07 22:23:49 +0100
committerRickard Green <rickard@erlang.org>2022-01-07 22:36:19 +0100
commit7fb87d74c865651282c28dc8dd2490649826a821 (patch)
tree21a77df6fe2f37727175192f8ab2cbcfa5b9f0bf
parentf2bd4a073961b4788a3b0a4e265b840a20f453eb (diff)
downloaderlang-7fb87d74c865651282c28dc8dd2490649826a821.tar.gz
global: Propagate and save version between all nodes
-rw-r--r--lib/kernel/src/global.erl134
1 files changed, 105 insertions, 29 deletions
diff --git a/lib/kernel/src/global.erl b/lib/kernel/src/global.erl
index 491f60688a..26ddf21dcb 100644
--- a/lib/kernel/src/global.erl
+++ b/lib/kernel/src/global.erl
@@ -82,10 +82,12 @@
%% nodes in the own partition. (R11B-)
%% Vsn 6 does not send any message between locker processes on different
%% nodes, but uses the server as a proxy.
+%% Vsn 7 - propagate global versions between nodes, so we always know
+%% versions of known nodes
%% Current version of global does not support vsn 4 or earlier.
--define(vsn, 6).
+-define(vsn, 7).
%%-----------------------------------------------------------------
%% connect_all = boolean() - true if we are supposed to set up a
@@ -113,7 +115,7 @@
%% {lock_id, Node} = The resource locking the partitions
%%-----------------------------------------------------------------
-record(state, {connect_all :: boolean(),
- known = [] :: [node()],
+ known = #{},
synced = [] :: [node()],
resolvers = [],
syncers = [] :: [pid()],
@@ -625,7 +627,7 @@ handle_call({del_lock, Lock}, {Pid, _Tag}, S0) ->
{reply, true, S};
handle_call(get_known, _From, S) ->
- {reply, S#state.known, S};
+ {reply, mk_known_list(-1, S), S};
handle_call(get_synced, _From, S) ->
{reply, S#state.synced, S};
@@ -683,13 +685,16 @@ handle_cast({init_connect, Vsn, Node, InitMsg}, S) ->
%% It is always the responsibility of newer versions to understand
%% older versions of the protocol.
{HisVsn, HisTag} when HisVsn > ?vsn ->
+ put({pending_known, Node}, HisVsn),
init_connect(?vsn, Node, InitMsg, HisTag, S#state.resolvers, S);
{HisVsn, HisTag} ->
+ put({pending_known, Node}, HisVsn),
init_connect(HisVsn, Node, InitMsg, HisTag, S#state.resolvers, S);
%% To be future compatible
Tuple when is_tuple(Tuple) ->
List = tuple_to_list(Tuple),
- [_HisVsn, HisTag | _] = List,
+ [HisVsn, HisTag | _] = List,
+ put({pending_known, Node}, HisVsn),
%% use own version handling if his is newer.
init_connect(?vsn, Node, InitMsg, HisTag, S#state.resolvers, S);
_ ->
@@ -745,10 +750,10 @@ handle_cast({exchange_ops, Node, MyTag, Ops, Resolved}, S0) ->
S = trace_message(S0, {exit_resolver, Node}, [MyTag]),
case get({sync_tag_my, Node}) of
MyTag ->
- Known = S#state.known,
+ Known = mk_known_list(node_vsn(Node, S), S),
gen_server:cast({global_name_server, Node},
{resolved, node(), Resolved, Known,
- Known,get_names_ext(),get({sync_tag_his,Node})}),
+ unused,get_names_ext(),get({sync_tag_his,Node})}),
case get({save_ops, Node}) of
{resolved, HisKnown, Names_ext, HisResolved} ->
put({save_ops, Node}, Ops),
@@ -768,7 +773,7 @@ handle_cast({exchange_ops, Node, MyTag, Ops, Resolved}, S0) ->
%%
%% Here the name clashes are resolved.
%%========================================================================
-handle_cast({resolved, Node, HisResolved, HisKnown, _HisKnown_v2,
+handle_cast({resolved, Node, HisResolved, HisKnown, _Unused,
Names_ext, MyTag}, S) ->
%% Sent from global_name_server at Node.
?trace({'####', resolved, {his_resolved,HisResolved}, {node,Node}}),
@@ -887,7 +892,7 @@ handle_info({nodeup, _Node}, S) when not S#state.connect_all ->
{noreply, S};
handle_info({nodeup, Node}, S0) when S0#state.connect_all ->
- IsKnown = lists:member(Node, S0#state.known) or
+ IsKnown = maps:is_key(Node, S0#state.known) or
%% This one is only for double nodeups (shouldn't occur!)
lists:keymember(Node, 1, S0#state.resolvers),
?trace({'####', nodeup, {node,Node}, {isknown,IsKnown}}),
@@ -1129,8 +1134,9 @@ resolved(Node, HisResolved, HisKnown, Names_ext, S0) ->
%% Known may have shrunk since the lock was taken (due to nodedowns).
Known = S0#state.known,
Synced = S0#state.synced,
- NewNodes = [Node | HisKnown],
- sync_others(HisKnown),
+ NewNodes = make_node_vsn_list([Node | HisKnown], S0),
+ HisKnownNodes = node_list(HisKnown),
+ sync_others(HisKnownNodes),
ExtraInfo = [{vsn,get({prot_vsn, Node})}, {lock, get({lock_id, Node})}],
S = do_ops(Ops, node(), Names_ext, ExtraInfo, S0),
%% I am synced with Node, but not with HisKnown yet
@@ -1138,25 +1144,35 @@ resolved(Node, HisResolved, HisKnown, Names_ext, S0) ->
S3 = lists:foldl(fun(Node1, S1) ->
F = fun(Tag) -> cancel_locker(Node1,S1,Tag) end,
cancel_resolved_locker(Node1, F)
- end, S, HisKnown),
+ end, S, HisKnownNodes),
%% The locker that took the lock is asked to send
%% the {new_nodes, ...} message. This ensures that
%% {del_lock, ...} is received after {new_nodes, ...}
- %% (except when abcast spawns process(es)...).
+ NewNodesMsg = {new_nodes, node(), Ops, Names_ext, NewNodes, ExtraInfo},
NewNodesF = fun() ->
- gen_server:abcast(Known, global_name_server,
- {new_nodes, node(), Ops, Names_ext,
- NewNodes, ExtraInfo})
+ lists:foreach(
+ fun (N) ->
+ case maps:get(N, Known) of
+ V when V >= 7 ->
+ gen_server:cast({global_name_server, N},
+ NewNodesMsg);
+ _OldV ->
+ gen_server:cast({global_name_server, N},
+ {new_nodes, node(), Ops,
+ Names_ext, node_list(NewNodes),
+ ExtraInfo})
+ end
+ end,
+ maps:keys(Known))
end,
F = fun(Tag) -> cancel_locker(Node, S3, Tag, NewNodesF) end,
S4 = cancel_resolved_locker(Node, F),
%% See (*) below... we're node b in that description
- AddedNodes = (NewNodes -- Known),
- NewKnown = Known ++ AddedNodes,
- S4#state.the_locker ! {add_to_known, AddedNodes},
- NewS = trace_message(S4, {added, AddedNodes},
- [{new_nodes, NewNodes}, {abcast, Known}, {ops,Ops}]),
- NewS#state{known = NewKnown, synced = [Node | Synced]}.
+ {AddedNodes, S5} = add_to_known(NewNodes, S4),
+ S5#state.the_locker ! {add_to_known, AddedNodes},
+ S6 = trace_message(S5, {added, AddedNodes},
+ [{new_nodes, NewNodes}, {abcast, Known}, {ops,Ops}]),
+ S6#state{synced = [Node | Synced]}.
cancel_resolved_locker(Node, CancelFun) ->
Tag = get({sync_tag_my, Node}),
@@ -1166,19 +1182,17 @@ cancel_resolved_locker(Node, CancelFun) ->
S.
new_nodes(Ops, ConnNode, Names_ext, Nodes, ExtraInfo, S0) ->
- Known = S0#state.known,
%% (*) This one requires some thought...
%% We're node a, other nodes b and c:
%% The problem is that {in_sync, a} may arrive before {resolved, [a]} to
%% b from c, leading to b sending {new_nodes, [a]} to us (node a).
%% Therefore, we make sure we never get duplicates in Known.
- AddedNodes = lists:delete(node(), Nodes -- Known),
+ {AddedNodes, S1} = add_to_known(Nodes, S0),
sync_others(AddedNodes),
- S = do_ops(Ops, ConnNode, Names_ext, ExtraInfo, S0),
+ S2 = do_ops(Ops, ConnNode, Names_ext, ExtraInfo, S1),
?trace({added_nodes_in_sync,{added_nodes,AddedNodes}}),
- S#state.the_locker ! {add_to_known, AddedNodes},
- S1 = trace_message(S, {added, AddedNodes}, [{ops,Ops}]),
- S1#state{known = Known ++ AddedNodes}.
+ S2#state.the_locker ! {add_to_known, AddedNodes},
+ trace_message(S2, {added, AddedNodes}, [{ops,Ops}]).
do_whereis(Name, From) ->
case is_global_lock_set() of
@@ -1935,6 +1949,7 @@ cancel_locker(Node, S, Tag, ToBeRunOnLockerF) ->
reset_node_state(Node) ->
?trace({{node,Node}, reset_node_state, get()}),
+ erase({pending_known, Node}),
erase({wait_lock, Node}),
erase({save_ops, Node}),
erase({pre_connect, Node}),
@@ -2072,13 +2087,74 @@ ref_is_locking(Ref, PidRefs) ->
handle_nodedown(Node, S) ->
%% DOWN signals from monitors have removed locks and registered names.
- #state{known = Known, synced = Syncs} = S,
+ #state{synced = Syncs} = S,
NewS = cancel_locker(Node, S, get({sync_tag_my, Node})),
NewS#state.the_locker ! {remove_from_known, Node},
reset_node_state(Node),
- NewS#state{known = lists:delete(Node, Known),
+ NewS#state{known = maps:remove(Node, S#state.known),
synced = lists:delete(Node, Syncs)}.
+node_vsn(Node, #state{known = Known}) ->
+ case maps:find(Node, Known) of
+ {ok, Ver} ->
+ Ver;
+ error ->
+ case get({pending_known, Node}) of
+ undefined -> -1;
+ Ver when is_integer(Ver) -> Ver
+ end
+ end.
+
+node_list(NList) ->
+ lists:map(fun (N) when is_atom(N) ->
+ N;
+ ({N, _V}) when is_atom(N) ->
+ N
+ end, NList).
+
+
+make_node_vsn_list(NList, #state{} = S) ->
+ lists:map(fun ({N, -1}) when is_atom(N) ->
+ {N, node_vsn(N, S)};
+ (N) when is_atom(N) ->
+ {N, node_vsn(N, S)};
+ ({N, V} = NV) when is_atom(N),
+ is_integer(V) ->
+ NV
+ end, NList).
+
+
+mk_known_list(Vsn, #state{known = Known}) when Vsn < 7 ->
+ lists:map(fun ({N, _V}) -> N end, maps:to_list(Known));
+mk_known_list(_Vsn, #state{known = Known}) ->
+ maps:to_list(Known).
+
+add_to_known(AddKnown, #state{known = Known} = S) ->
+ Fun = fun (N, Acc) when N == node() ->
+ Acc;
+ ({N, _V}, Acc) when N == node() ->
+ Acc;
+ (N, {A, K} = Acc) when is_atom(N) ->
+ case maps:is_key(N, K) of
+ true -> Acc;
+ false -> {[N|A], maps:put(N, -1, K)}
+ end;
+ ({N, V}, {A, K} = Acc) ->
+ case maps:find(N, K) of
+ error ->
+ {[N|A], maps:put(N, V, K)};
+ {ok, NV} when NV >= 0 ->
+ Acc;
+ {ok, _UnknownVsn} ->
+ %% Update version, but don't count
+ %% it as an added node...
+ {A, maps:put(N, V, K)}
+ end
+ end,
+
+ {Added, NewKnown} = lists:foldl(Fun, {[], Known}, AddKnown),
+ {Added, S#state{known = NewKnown}}.
+
get_names() ->
ets:select(global_names,
ets:fun2ms(fun({Name, Pid, Method, _Ref}) ->