summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMaxim Fedorov <maximfca@gmail.com>2022-06-07 09:47:09 -0700
committerMaxim Fedorov <maximfca@gmail.com>2022-06-07 09:47:09 -0700
commit1dde3119e070472709f6d28296546b2d15b3c4a3 (patch)
treef56db6fc762c1596c2701e99037f0497e454220b
parent937c0c57afeddac4154cf334dfb72269d73f6630 (diff)
downloaderlang-1dde3119e070472709f6d28296546b2d15b3c4a3.tar.gz
[pg] refactor internals for readability
Original implementation has a number of inconsistencies that makes it harder to read. This patch: - renames 'monitors' to 'local' (processes running on the local node) - renames 'nodes' to 'remote' for consnstency with 'local' - makes naming for join/leave updating server state and ETS table consistent
-rw-r--r--lib/kernel/src/pg.erl184
1 files changed, 89 insertions, 95 deletions
diff --git a/lib/kernel/src/pg.erl b/lib/kernel/src/pg.erl
index 573d2e6953..8d0f6124f1 100644
--- a/lib/kernel/src/pg.erl
+++ b/lib/kernel/src/pg.erl
@@ -204,9 +204,9 @@ which_local_groups(Scope) when is_atom(Scope) ->
%% ETS table name, and also the registered process name (self())
scope :: atom(),
%% monitored local processes and groups they joined
- monitors = #{} :: #{pid() => {MRef :: reference(), Groups :: [group()]}},
+ local = #{} :: #{pid() => {MRef :: reference(), Groups :: [group()]}},
%% remote node: scope process monitor and map of groups to pids for fast sync routine
- nodes = #{} :: #{pid() => {reference(), #{group() => [pid()]}}}
+ remote = #{} :: #{pid() => {reference(), #{group() => [pid()]}}}
}).
-type state() :: #state{}.
@@ -214,44 +214,44 @@ which_local_groups(Scope) when is_atom(Scope) ->
-spec init([Scope :: atom()]) -> {ok, state()}.
init([Scope]) ->
ok = net_kernel:monitor_nodes(true),
- %% discover all nodes in the cluster
+ %% discover all nodes running this scope in the cluster
broadcast([{Scope, Node} || Node <- nodes()], {discover, self()}),
Scope = ets:new(Scope, [set, protected, named_table, {read_concurrency, true}]),
{ok, #state{scope = Scope}}.
-spec handle_call(Call :: {join_local, Group :: group(), Pid :: pid()}
| {leave_local, Group :: group(), Pid :: pid()},
- From :: {pid(),Tag :: any()},
+ From :: {pid(), Tag :: any()},
State :: state()) -> {reply, ok | not_joined, state()}.
-handle_call({join_local, Group, PidOrPids}, _From, #state{scope = Scope, monitors = Monitors, nodes = Nodes} = State) ->
- NewMons = join_monitors(PidOrPids, Group, Monitors),
- join_local_group(Scope, Group, PidOrPids),
- broadcast(maps:keys(Nodes), {join, self(), Group, PidOrPids}),
- {reply, ok, State#state{monitors = NewMons}};
+handle_call({join_local, Group, PidOrPids}, _From, #state{scope = Scope, local = Local, remote = Remote} = State) ->
+ NewLocal = join_local(PidOrPids, Group, Local),
+ join_local_update_ets(Scope, Group, PidOrPids),
+ broadcast(maps:keys(Remote), {join, self(), Group, PidOrPids}),
+ {reply, ok, State#state{local = NewLocal}};
-handle_call({leave_local, Group, PidOrPids}, _From, #state{scope = Scope, monitors = Monitors, nodes = Nodes} = State) ->
- case leave_monitors(PidOrPids, Group, Monitors) of
- Monitors ->
+handle_call({leave_local, Group, PidOrPids}, _From, #state{scope = Scope, local = Local, remote = Remote} = State) ->
+ case leave_local(PidOrPids, Group, Local) of
+ Local ->
{reply, not_joined, State};
- NewMons ->
- leave_local_group(Scope, Group, PidOrPids),
- broadcast(maps:keys(Nodes), {leave, self(), PidOrPids, [Group]}),
- {reply, ok, State#state{monitors = NewMons}}
+ NewLocal ->
+ leave_local_update_ets(Scope, Group, PidOrPids),
+ broadcast(maps:keys(Remote), {leave, self(), PidOrPids, [Group]}),
+ {reply, ok, State#state{local = NewLocal}}
end;
handle_call(_Request, _From, _S) ->
- error(badarg).
+ erlang:error(badarg).
-spec handle_cast(
{sync, Peer :: pid(), Groups :: [{group(), [pid()]}]},
State :: state()) -> {noreply, state()}.
-handle_cast({sync, Peer, Groups}, #state{scope = Scope, nodes = Nodes} = State) ->
- {noreply, State#state{nodes = handle_sync(Scope, Peer, Nodes, Groups)}};
+handle_cast({sync, Peer, Groups}, #state{scope = Scope, remote = Remote} = State) ->
+ {noreply, State#state{remote = handle_sync(Scope, Peer, Remote, Groups)}};
handle_cast(_, _State) ->
- error(badarg).
+ erlang:error(badarg).
-spec handle_info(
{discover, Peer :: pid()} |
@@ -261,13 +261,13 @@ handle_cast(_, _State) ->
{nodedown, node()} | {nodeup, node()}, State :: state()) -> {noreply, state()}.
%% remote pid or several pids joining the group
-handle_info({join, Peer, Group, PidOrPids}, #state{scope = Scope, nodes = Nodes} = State) ->
- case maps:get(Peer, Nodes, []) of
+handle_info({join, Peer, Group, PidOrPids}, #state{scope = Scope, remote = Remote} = State) ->
+ case maps:get(Peer, Remote, []) of
{MRef, RemoteGroups} ->
- join_remote(Scope, Group, PidOrPids),
+ join_remote_update_ets(Scope, Group, PidOrPids),
%% store remote group => pids map for fast sync operation
- NewRemoteGroups = join_remote_map(Group, PidOrPids, RemoteGroups),
- {noreply, State#state{nodes = Nodes#{Peer => {MRef, NewRemoteGroups}}}};
+ NewRemoteGroups = join_remote(Group, PidOrPids, RemoteGroups),
+ {noreply, State#state{remote = Remote#{Peer => {MRef, NewRemoteGroups}}}};
[] ->
%% handle possible race condition, when remote node is flickering up/down,
%% and remote join can happen after the node left overlay network
@@ -277,12 +277,12 @@ handle_info({join, Peer, Group, PidOrPids}, #state{scope = Scope, nodes = Nodes}
end;
%% remote pid leaving (multiple groups at once)
-handle_info({leave, Peer, PidOrPids, Groups}, #state{scope = Scope, nodes = Nodes} = State) ->
- case maps:get(Peer, Nodes, []) of
+handle_info({leave, Peer, PidOrPids, Groups}, #state{scope = Scope, remote = Remote} = State) ->
+ case maps:get(Peer, Remote, []) of
{MRef, RemoteMap} ->
- _ = leave_remote(Scope, PidOrPids, Groups),
- NewRemoteMap = leave_update_remote_map(PidOrPids, RemoteMap, Groups),
- {noreply, State#state{nodes = Nodes#{Peer => {MRef, NewRemoteMap}}}};
+ _ = leave_remote_update_ets(Scope, PidOrPids, Groups),
+ NewRemoteMap = leave_remote(PidOrPids, RemoteMap, Groups),
+ {noreply, State#state{remote = Remote#{Peer => {MRef, NewRemoteMap}}}};
[] ->
%% Handle race condition: remote node disconnected, but scope process
%% of the remote node was just about to send 'leave' message. In this
@@ -294,36 +294,36 @@ handle_info({leave, Peer, PidOrPids, Groups}, #state{scope = Scope, nodes = Node
end;
%% we're being discovered, let's exchange!
-handle_info({discover, Peer}, #state{nodes = Nodes, monitors = Monitors} = State) ->
- gen_server:cast(Peer, {sync, self(), all_local_pids(Monitors)}),
+handle_info({discover, Peer}, #state{remote = Remote, local = Local} = State) ->
+ gen_server:cast(Peer, {sync, self(), all_local_pids(Local)}),
%% do we know who is looking for us?
- case maps:is_key(Peer, Nodes) of
+ case maps:is_key(Peer, Remote) of
true ->
{noreply, State};
false ->
- MRef = monitor(process, Peer),
+ MRef = erlang:monitor(process, Peer),
erlang:send(Peer, {discover, self()}, [noconnect]),
- {noreply, State#state{nodes = Nodes#{Peer => {MRef, #{}}}}}
+ {noreply, State#state{remote = Remote#{Peer => {MRef, #{}}}}}
end;
%% handle local process exit
-handle_info({'DOWN', MRef, process, Pid, _Info}, #state{scope = Scope, monitors = Monitors, nodes = Nodes} = State) when node(Pid) =:= node() ->
- case maps:take(Pid, Monitors) of
+handle_info({'DOWN', MRef, process, Pid, _Info}, #state{scope = Scope, local = Local, remote = Remote} = State) when node(Pid) =:= node() ->
+ case maps:take(Pid, Local) of
error ->
%% this can only happen when leave request and 'DOWN' are in pg queue
{noreply, State};
- {{MRef, Groups}, NewMons} ->
- [leave_local_group(Scope, Group, Pid) || Group <- Groups],
- %% send update to all nodes
- broadcast(maps:keys(Nodes), {leave, self(), Pid, Groups}),
- {noreply, State#state{monitors = NewMons}}
+ {{MRef, Groups}, NewLocal} ->
+ [leave_local_update_ets(Scope, Group, Pid) || Group <- Groups],
+ %% send update to all scope processes on remote nodes
+ broadcast(maps:keys(Remote), {leave, self(), Pid, Groups}),
+ {noreply, State#state{local = NewLocal}}
end;
%% handle remote node down or leaving overlay network
-handle_info({'DOWN', MRef, process, Pid, _Info}, #state{scope = Scope, nodes = Nodes} = State) ->
- {{MRef, RemoteMap}, NewNodes} = maps:take(Pid, Nodes),
- maps:foreach(fun (Group, Pids) -> leave_remote(Scope, Pids, [Group]) end, RemoteMap),
- {noreply, State#state{nodes = NewNodes}};
+handle_info({'DOWN', MRef, process, Pid, _Info}, #state{scope = Scope, remote = Remote} = State) ->
+ {{MRef, RemoteMap}, NewRemote} = maps:take(Pid, Remote),
+ maps:foreach(fun (Group, Pids) -> leave_remote_update_ets(Scope, Pids, [Group]) end, RemoteMap),
+ {noreply, State#state{remote = NewRemote}};
%% nodedown: ignore, and wait for 'DOWN' signal for monitored process
handle_info({nodedown, _Node}, State) ->
@@ -337,7 +337,7 @@ handle_info({nodeup, Node}, #state{scope = Scope} = State) ->
{noreply, State};
handle_info(_Info, _State) ->
- error(badarg).
+ erlang:error(badarg).
-spec terminate(Reason :: any(), State :: state()) -> true.
terminate(_Reason, #state{scope = Scope}) ->
@@ -355,31 +355,31 @@ ensure_local(Pids) when is_list(Pids) ->
(Pid) when is_pid(Pid), node(Pid) =:= node() ->
ok;
(Bad) ->
- error({nolocal, Bad})
+ erlang:error({nolocal, Bad})
end, Pids);
ensure_local(Bad) ->
- error({nolocal, Bad}).
+ erlang:error({nolocal, Bad}).
%% Override all knowledge of the remote node with information it sends
%% to local node. Current implementation must do the full table scan
%% to remove stale pids (just as for 'nodedown').
-handle_sync(Scope, Peer, Nodes, Groups) ->
+handle_sync(Scope, Peer, Remote, Groups) ->
%% can't use maps:get() because it evaluates 'default' value first,
%% and in this case monitor() call has side effect.
{MRef, RemoteGroups} =
- case maps:find(Peer, Nodes) of
+ case maps:find(Peer, Remote) of
error ->
- {monitor(process, Peer), #{}};
+ {erlang:monitor(process, Peer), #{}};
{ok, MRef0} ->
MRef0
end,
%% sync RemoteMap and transform ETS table
_ = sync_groups(Scope, RemoteGroups, Groups),
- Nodes#{Peer => {MRef, maps:from_list(Groups)}}.
+ Remote#{Peer => {MRef, maps:from_list(Groups)}}.
sync_groups(Scope, RemoteGroups, []) ->
%% leave all missing groups
- [leave_remote(Scope, Pids, [Group]) || {Group, Pids} <- maps:to_list(RemoteGroups)];
+ [leave_remote_update_ets(Scope, Pids, [Group]) || {Group, Pids} <- maps:to_list(RemoteGroups)];
sync_groups(Scope, RemoteGroups, [{Group, Pids} | Tail]) ->
case maps:take(Group, RemoteGroups) of
{Pids, NewRemoteGroups} ->
@@ -391,31 +391,31 @@ sync_groups(Scope, RemoteGroups, [{Group, Pids} | Tail]) ->
true = ets:insert(Scope, {Group, AllNewPids, LocalPids}),
sync_groups(Scope, NewRemoteGroups, Tail);
error ->
- join_remote(Scope, Group, Pids),
+ join_remote_update_ets(Scope, Group, Pids),
sync_groups(Scope, RemoteGroups, Tail)
end.
-join_monitors(Pid, Group, Monitors) when is_pid(Pid) ->
- case maps:find(Pid, Monitors) of
+join_local(Pid, Group, Local) when is_pid(Pid) ->
+ case maps:find(Pid, Local) of
{ok, {MRef, Groups}} ->
- maps:put(Pid, {MRef, [Group | Groups]}, Monitors);
+ maps:put(Pid, {MRef, [Group | Groups]}, Local);
error ->
MRef = erlang:monitor(process, Pid),
- Monitors#{Pid => {MRef, [Group]}}
+ Local#{Pid => {MRef, [Group]}}
end;
-join_monitors([], _Group, Monitors) ->
- Monitors;
-join_monitors([Pid | Tail], Group, Monitors) ->
- join_monitors(Tail, Group, join_monitors(Pid, Group, Monitors)).
+join_local([], _Group, Local) ->
+ Local;
+join_local([Pid | Tail], Group, Local) ->
+ join_local(Tail, Group, join_local(Pid, Group, Local)).
-join_local_group(Scope, Group, Pid) when is_pid(Pid) ->
+join_local_update_ets(Scope, Group, Pid) when is_pid(Pid) ->
case ets:lookup(Scope, Group) of
[{Group, All, Local}] ->
ets:insert(Scope, {Group, [Pid | All], [Pid | Local]});
[] ->
ets:insert(Scope, {Group, [Pid], [Pid]})
end;
-join_local_group(Scope, Group, Pids) ->
+join_local_update_ets(Scope, Group, Pids) ->
case ets:lookup(Scope, Group) of
[{Group, All, Local}] ->
ets:insert(Scope, {Group, Pids ++ All, Pids ++ Local});
@@ -423,14 +423,14 @@ join_local_group(Scope, Group, Pids) ->
ets:insert(Scope, {Group, Pids, Pids})
end.
-join_remote(Scope, Group, Pid) when is_pid(Pid) ->
+join_remote_update_ets(Scope, Group, Pid) when is_pid(Pid) ->
case ets:lookup(Scope, Group) of
[{Group, All, Local}] ->
ets:insert(Scope, {Group, [Pid | All], Local});
[] ->
ets:insert(Scope, {Group, [Pid], []})
end;
-join_remote(Scope, Group, Pids) ->
+join_remote_update_ets(Scope, Group, Pids) ->
case ets:lookup(Scope, Group) of
[{Group, All, Local}] ->
ets:insert(Scope, {Group, Pids ++ All, Local});
@@ -438,32 +438,32 @@ join_remote(Scope, Group, Pids) ->
ets:insert(Scope, {Group, Pids, []})
end.
-join_remote_map(Group, Pid, RemoteGroups) when is_pid(Pid) ->
+join_remote(Group, Pid, RemoteGroups) when is_pid(Pid) ->
maps:update_with(Group, fun (List) -> [Pid | List] end, [Pid], RemoteGroups);
-join_remote_map(Group, Pids, RemoteGroups) ->
+join_remote(Group, Pids, RemoteGroups) ->
maps:update_with(Group, fun (List) -> Pids ++ List end, Pids, RemoteGroups).
-leave_monitors(Pid, Group, Monitors) when is_pid(Pid) ->
- case maps:find(Pid, Monitors) of
+leave_local(Pid, Group, Local) when is_pid(Pid) ->
+ case maps:find(Pid, Local) of
{ok, {MRef, [Group]}} ->
erlang:demonitor(MRef),
- maps:remove(Pid, Monitors);
+ maps:remove(Pid, Local);
{ok, {MRef, Groups}} ->
case lists:member(Group, Groups) of
true ->
- maps:put(Pid, {MRef, lists:delete(Group, Groups)}, Monitors);
+ maps:put(Pid, {MRef, lists:delete(Group, Groups)}, Local);
false ->
- Monitors
+ Local
end;
_ ->
- Monitors
+ Local
end;
-leave_monitors([], _Group, Monitors) ->
- Monitors;
-leave_monitors([Pid | Tail], Group, Monitors) ->
- leave_monitors(Tail, Group, leave_monitors(Pid, Group, Monitors)).
+leave_local([], _Group, Local) ->
+ Local;
+leave_local([Pid | Tail], Group, Local) ->
+ leave_local(Tail, Group, leave_local(Pid, Group, Local)).
-leave_local_group(Scope, Group, Pid) when is_pid(Pid) ->
+leave_local_update_ets(Scope, Group, Pid) when is_pid(Pid) ->
case ets:lookup(Scope, Group) of
[{Group, [Pid], [Pid]}] ->
ets:delete(Scope, Group);
@@ -473,7 +473,7 @@ leave_local_group(Scope, Group, Pid) when is_pid(Pid) ->
%% rare race condition when 'DOWN' from monitor stays in msg queue while process is leave-ing.
true
end;
-leave_local_group(Scope, Group, Pids) ->
+leave_local_update_ets(Scope, Group, Pids) ->
case ets:lookup(Scope, Group) of
[{Group, All, Local}] ->
case All -- Pids of
@@ -486,7 +486,7 @@ leave_local_group(Scope, Group, Pids) ->
true
end.
-leave_remote(Scope, Pid, Groups) when is_pid(Pid) ->
+leave_remote_update_ets(Scope, Pid, Groups) when is_pid(Pid) ->
_ = [
case ets:lookup(Scope, Group) of
[{Group, [Pid], []}] ->
@@ -497,7 +497,7 @@ leave_remote(Scope, Pid, Groups) when is_pid(Pid) ->
true
end ||
Group <- Groups];
-leave_remote(Scope, Pids, Groups) ->
+leave_remote_update_ets(Scope, Pids, Groups) ->
_ = [
case ets:lookup(Scope, Group) of
[{Group, All, Local}] ->
@@ -512,9 +512,9 @@ leave_remote(Scope, Pids, Groups) ->
end ||
Group <- Groups].
-leave_update_remote_map(Pid, RemoteMap, Groups) when is_pid(Pid) ->
- leave_update_remote_map([Pid], RemoteMap, Groups);
-leave_update_remote_map(Pids, RemoteMap, Groups) ->
+leave_remote(Pid, RemoteMap, Groups) when is_pid(Pid) ->
+ leave_remote([Pid], RemoteMap, Groups);
+leave_remote(Pids, RemoteMap, Groups) ->
lists:foldl(
fun (Group, Acc) ->
case maps:get(Group, Acc) -- Pids of
@@ -525,20 +525,14 @@ leave_update_remote_map(Pids, RemoteMap, Groups) ->
end
end, RemoteMap, Groups).
-all_local_pids(Monitors) ->
+all_local_pids(Local) ->
maps:to_list(maps:fold(
fun(Pid, {_Ref, Groups}, Acc) ->
lists:foldl(
fun(Group, Acc1) ->
Acc1#{Group => [Pid | maps:get(Group, Acc1, [])]}
- end,
- Acc,
- Groups
- )
- end,
- #{},
- Monitors
- )).
+ end, Acc, Groups)
+ end, #{}, Local)).
%% Works as gen_server:abcast(), but accepts a list of processes
%% instead of nodes list.