From 7fb87d74c865651282c28dc8dd2490649826a821 Mon Sep 17 00:00:00 2001 From: Rickard Green Date: Fri, 7 Jan 2022 22:23:49 +0100 Subject: global: Propagate and save version between all nodes --- lib/kernel/src/global.erl | 134 ++++++++++++++++++++++++++++++++++++---------- 1 file changed, 105 insertions(+), 29 deletions(-) diff --git a/lib/kernel/src/global.erl b/lib/kernel/src/global.erl index 491f60688a..26ddf21dcb 100644 --- a/lib/kernel/src/global.erl +++ b/lib/kernel/src/global.erl @@ -82,10 +82,12 @@ %% nodes in the own partition. (R11B-) %% Vsn 6 does not send any message between locker processes on different %% nodes, but uses the server as a proxy. +%% Vsn 7 - propagate global versions between nodes, so we always know +%% versions of known nodes %% Current version of global does not support vsn 4 or earlier. --define(vsn, 6). +-define(vsn, 7). %%----------------------------------------------------------------- %% connect_all = boolean() - true if we are supposed to set up a @@ -113,7 +115,7 @@ %% {lock_id, Node} = The resource locking the partitions %%----------------------------------------------------------------- -record(state, {connect_all :: boolean(), - known = [] :: [node()], + known = #{}, synced = [] :: [node()], resolvers = [], syncers = [] :: [pid()], @@ -625,7 +627,7 @@ handle_call({del_lock, Lock}, {Pid, _Tag}, S0) -> {reply, true, S}; handle_call(get_known, _From, S) -> - {reply, S#state.known, S}; + {reply, mk_known_list(-1, S), S}; handle_call(get_synced, _From, S) -> {reply, S#state.synced, S}; @@ -683,13 +685,16 @@ handle_cast({init_connect, Vsn, Node, InitMsg}, S) -> %% It is always the responsibility of newer versions to understand %% older versions of the protocol. {HisVsn, HisTag} when HisVsn > ?vsn -> + put({pending_known, Node}, HisVsn), init_connect(?vsn, Node, InitMsg, HisTag, S#state.resolvers, S); {HisVsn, HisTag} -> + put({pending_known, Node}, HisVsn), init_connect(HisVsn, Node, InitMsg, HisTag, S#state.resolvers, S); %% To be future compatible Tuple when is_tuple(Tuple) -> List = tuple_to_list(Tuple), - [_HisVsn, HisTag | _] = List, + [HisVsn, HisTag | _] = List, + put({pending_known, Node}, HisVsn), %% use own version handling if his is newer. init_connect(?vsn, Node, InitMsg, HisTag, S#state.resolvers, S); _ -> @@ -745,10 +750,10 @@ handle_cast({exchange_ops, Node, MyTag, Ops, Resolved}, S0) -> S = trace_message(S0, {exit_resolver, Node}, [MyTag]), case get({sync_tag_my, Node}) of MyTag -> - Known = S#state.known, + Known = mk_known_list(node_vsn(Node, S), S), gen_server:cast({global_name_server, Node}, {resolved, node(), Resolved, Known, - Known,get_names_ext(),get({sync_tag_his,Node})}), + unused,get_names_ext(),get({sync_tag_his,Node})}), case get({save_ops, Node}) of {resolved, HisKnown, Names_ext, HisResolved} -> put({save_ops, Node}, Ops), @@ -768,7 +773,7 @@ handle_cast({exchange_ops, Node, MyTag, Ops, Resolved}, S0) -> %% %% Here the name clashes are resolved. %%======================================================================== -handle_cast({resolved, Node, HisResolved, HisKnown, _HisKnown_v2, +handle_cast({resolved, Node, HisResolved, HisKnown, _Unused, Names_ext, MyTag}, S) -> %% Sent from global_name_server at Node. ?trace({'####', resolved, {his_resolved,HisResolved}, {node,Node}}), @@ -887,7 +892,7 @@ handle_info({nodeup, _Node}, S) when not S#state.connect_all -> {noreply, S}; handle_info({nodeup, Node}, S0) when S0#state.connect_all -> - IsKnown = lists:member(Node, S0#state.known) or + IsKnown = maps:is_key(Node, S0#state.known) or %% This one is only for double nodeups (shouldn't occur!) lists:keymember(Node, 1, S0#state.resolvers), ?trace({'####', nodeup, {node,Node}, {isknown,IsKnown}}), @@ -1129,8 +1134,9 @@ resolved(Node, HisResolved, HisKnown, Names_ext, S0) -> %% Known may have shrunk since the lock was taken (due to nodedowns). Known = S0#state.known, Synced = S0#state.synced, - NewNodes = [Node | HisKnown], - sync_others(HisKnown), + NewNodes = make_node_vsn_list([Node | HisKnown], S0), + HisKnownNodes = node_list(HisKnown), + sync_others(HisKnownNodes), ExtraInfo = [{vsn,get({prot_vsn, Node})}, {lock, get({lock_id, Node})}], S = do_ops(Ops, node(), Names_ext, ExtraInfo, S0), %% I am synced with Node, but not with HisKnown yet @@ -1138,25 +1144,35 @@ resolved(Node, HisResolved, HisKnown, Names_ext, S0) -> S3 = lists:foldl(fun(Node1, S1) -> F = fun(Tag) -> cancel_locker(Node1,S1,Tag) end, cancel_resolved_locker(Node1, F) - end, S, HisKnown), + end, S, HisKnownNodes), %% The locker that took the lock is asked to send %% the {new_nodes, ...} message. This ensures that %% {del_lock, ...} is received after {new_nodes, ...} - %% (except when abcast spawns process(es)...). + NewNodesMsg = {new_nodes, node(), Ops, Names_ext, NewNodes, ExtraInfo}, NewNodesF = fun() -> - gen_server:abcast(Known, global_name_server, - {new_nodes, node(), Ops, Names_ext, - NewNodes, ExtraInfo}) + lists:foreach( + fun (N) -> + case maps:get(N, Known) of + V when V >= 7 -> + gen_server:cast({global_name_server, N}, + NewNodesMsg); + _OldV -> + gen_server:cast({global_name_server, N}, + {new_nodes, node(), Ops, + Names_ext, node_list(NewNodes), + ExtraInfo}) + end + end, + maps:keys(Known)) end, F = fun(Tag) -> cancel_locker(Node, S3, Tag, NewNodesF) end, S4 = cancel_resolved_locker(Node, F), %% See (*) below... we're node b in that description - AddedNodes = (NewNodes -- Known), - NewKnown = Known ++ AddedNodes, - S4#state.the_locker ! {add_to_known, AddedNodes}, - NewS = trace_message(S4, {added, AddedNodes}, - [{new_nodes, NewNodes}, {abcast, Known}, {ops,Ops}]), - NewS#state{known = NewKnown, synced = [Node | Synced]}. + {AddedNodes, S5} = add_to_known(NewNodes, S4), + S5#state.the_locker ! {add_to_known, AddedNodes}, + S6 = trace_message(S5, {added, AddedNodes}, + [{new_nodes, NewNodes}, {abcast, Known}, {ops,Ops}]), + S6#state{synced = [Node | Synced]}. cancel_resolved_locker(Node, CancelFun) -> Tag = get({sync_tag_my, Node}), @@ -1166,19 +1182,17 @@ cancel_resolved_locker(Node, CancelFun) -> S. new_nodes(Ops, ConnNode, Names_ext, Nodes, ExtraInfo, S0) -> - Known = S0#state.known, %% (*) This one requires some thought... %% We're node a, other nodes b and c: %% The problem is that {in_sync, a} may arrive before {resolved, [a]} to %% b from c, leading to b sending {new_nodes, [a]} to us (node a). %% Therefore, we make sure we never get duplicates in Known. - AddedNodes = lists:delete(node(), Nodes -- Known), + {AddedNodes, S1} = add_to_known(Nodes, S0), sync_others(AddedNodes), - S = do_ops(Ops, ConnNode, Names_ext, ExtraInfo, S0), + S2 = do_ops(Ops, ConnNode, Names_ext, ExtraInfo, S1), ?trace({added_nodes_in_sync,{added_nodes,AddedNodes}}), - S#state.the_locker ! {add_to_known, AddedNodes}, - S1 = trace_message(S, {added, AddedNodes}, [{ops,Ops}]), - S1#state{known = Known ++ AddedNodes}. + S2#state.the_locker ! {add_to_known, AddedNodes}, + trace_message(S2, {added, AddedNodes}, [{ops,Ops}]). do_whereis(Name, From) -> case is_global_lock_set() of @@ -1935,6 +1949,7 @@ cancel_locker(Node, S, Tag, ToBeRunOnLockerF) -> reset_node_state(Node) -> ?trace({{node,Node}, reset_node_state, get()}), + erase({pending_known, Node}), erase({wait_lock, Node}), erase({save_ops, Node}), erase({pre_connect, Node}), @@ -2072,13 +2087,74 @@ ref_is_locking(Ref, PidRefs) -> handle_nodedown(Node, S) -> %% DOWN signals from monitors have removed locks and registered names. - #state{known = Known, synced = Syncs} = S, + #state{synced = Syncs} = S, NewS = cancel_locker(Node, S, get({sync_tag_my, Node})), NewS#state.the_locker ! {remove_from_known, Node}, reset_node_state(Node), - NewS#state{known = lists:delete(Node, Known), + NewS#state{known = maps:remove(Node, S#state.known), synced = lists:delete(Node, Syncs)}. +node_vsn(Node, #state{known = Known}) -> + case maps:find(Node, Known) of + {ok, Ver} -> + Ver; + error -> + case get({pending_known, Node}) of + undefined -> -1; + Ver when is_integer(Ver) -> Ver + end + end. + +node_list(NList) -> + lists:map(fun (N) when is_atom(N) -> + N; + ({N, _V}) when is_atom(N) -> + N + end, NList). + + +make_node_vsn_list(NList, #state{} = S) -> + lists:map(fun ({N, -1}) when is_atom(N) -> + {N, node_vsn(N, S)}; + (N) when is_atom(N) -> + {N, node_vsn(N, S)}; + ({N, V} = NV) when is_atom(N), + is_integer(V) -> + NV + end, NList). + + +mk_known_list(Vsn, #state{known = Known}) when Vsn < 7 -> + lists:map(fun ({N, _V}) -> N end, maps:to_list(Known)); +mk_known_list(_Vsn, #state{known = Known}) -> + maps:to_list(Known). + +add_to_known(AddKnown, #state{known = Known} = S) -> + Fun = fun (N, Acc) when N == node() -> + Acc; + ({N, _V}, Acc) when N == node() -> + Acc; + (N, {A, K} = Acc) when is_atom(N) -> + case maps:is_key(N, K) of + true -> Acc; + false -> {[N|A], maps:put(N, -1, K)} + end; + ({N, V}, {A, K} = Acc) -> + case maps:find(N, K) of + error -> + {[N|A], maps:put(N, V, K)}; + {ok, NV} when NV >= 0 -> + Acc; + {ok, _UnknownVsn} -> + %% Update version, but don't count + %% it as an added node... + {A, maps:put(N, V, K)} + end + end, + + {Added, NewKnown} = lists:foldl(Fun, {[], Known}, AddKnown), + {Added, S#state{known = NewKnown}}. + get_names() -> ets:select(global_names, ets:fun2ms(fun({Name, Pid, Method, _Ref}) -> -- cgit v1.2.1