summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMaxim Fedorov <maximfca@gmail.com>2022-06-07 15:15:11 -0700
committerMaxim Fedorov <maximfca@gmail.com>2022-06-17 11:04:27 -0700
commitaf9b3d19d834e6bdc1da21eaaf7e43d74220004b (patch)
treeb5f6cf3a6601c28f4c2932375348871fe0b410a2
parentfadfcff7ac17538db6100350585aac6407d70305 (diff)
downloaderlang-af9b3d19d834e6bdc1da21eaaf7e43d74220004b.tar.gz
[pg] Implement single group monitoring
Implement pg:monitor/1,2 that subscribe to changes for a single process group (similar to monitor_scope/0,1).
-rw-r--r--lib/kernel/doc/src/pg.xml15
-rw-r--r--lib/kernel/src/pg.erl185
-rw-r--r--lib/kernel/test/pg_SUITE.erl97
3 files changed, 210 insertions, 87 deletions
diff --git a/lib/kernel/doc/src/pg.xml b/lib/kernel/doc/src/pg.xml
index 85b1529f44..4a308dca73 100644
--- a/lib/kernel/doc/src/pg.xml
+++ b/lib/kernel/doc/src/pg.xml
@@ -166,11 +166,24 @@
</func>
<func>
+ <name name="monitor" arity="1" since="OTP 25.1"/>
+ <name name="monitor" arity="2" since="OTP 25.1"/>
+ <fsummary>Starts membership monitoring for a specified group.</fsummary>
+ <desc>
+ <p>Subscribes the caller to updates for the specified group. Returns
+ list of processes currently in the group, and a reference to match
+ the upcoming notifications.</p>
+ <p>See <seemfa marker="#monitor_scope/0"><c>monitor_scope/0</c></seemfa>
+ for the update message structure.</p>
+ </desc>
+ </func>
+
+ <func>
<name name="demonitor" arity="1" since="OTP 25.1"/>
<name name="demonitor" arity="2" since="OTP 25.1"/>
<fsummary>Stops group membership monitoring.</fsummary>
<desc>
- <p>Unsubscribes the caller from updates off the specified scope.
+ <p>Unsubscribes the caller from updates (scope or group).
Flushes all outstanding updates that were already in the message
queue of the calling process.</p>
</desc>
diff --git a/lib/kernel/src/pg.erl b/lib/kernel/src/pg.erl
index 845021c7fa..a44794e352 100644
--- a/lib/kernel/src/pg.erl
+++ b/lib/kernel/src/pg.erl
@@ -62,6 +62,8 @@
leave/2,
monitor_scope/0,
monitor_scope/1,
+ monitor/1,
+ monitor/2,
demonitor/1,
demonitor/2,
get_members/1,
@@ -162,6 +164,26 @@ monitor_scope(Scope) ->
%%--------------------------------------------------------------------
%% @doc
+%% Returns list of processes in the requested group, and begins monitoring
+%% group changes. Calling process will receive {Ref, join, Group, Pids}
+%% message when new Pids join the Group, and {Ref, leave, Group, Pids} when
+%% Pids leave the group.
+-spec monitor(Group :: group()) -> {reference(), [pid()]}.
+monitor(Group) ->
+ ?MODULE:monitor(?DEFAULT_SCOPE, Group).
+
+%%--------------------------------------------------------------------
+%% @doc
+%% Returns list of processes in the requested group, and begins monitoring
+%% group changes. Calling process will receive {Ref, join, Group, Pids}
+%% message when new Pids join the Group, and {Ref, leave, Group, Pids} when
+%% Pids leave the group.
+-spec monitor(Scope :: atom(), Group :: group()) -> {reference(), [pid()]}.
+monitor(Scope, Group) ->
+ gen_server:call(Scope, {monitor, Group}, infinity).
+
+%%--------------------------------------------------------------------
+%% @doc
%% Stops monitoring Scope for groups changes. Flushes all
%% {Ref, join|leave, Group, Pids} messages from the calling process queue.
-spec demonitor(Ref :: reference()) -> ok | false.
@@ -238,7 +260,10 @@ which_local_groups(Scope) when is_atom(Scope) ->
%% remote node: scope process monitor and map of groups to pids for fast sync routine
remote = #{} :: #{pid() => {reference(), #{group() => [pid()]}}},
%% processes monitoring group membership
- scope_monitors = #{} :: #{reference() => pid()}
+ scope_monitors = #{} :: #{reference() => pid()},
+ %% processes monitoring specific groups (forward and reverse map)
+ group_monitors = #{} :: #{reference() => group()},
+ monitored_groups = #{} :: #{group() => [{pid(), reference()}]}
}).
-type state() :: #state{}.
@@ -254,25 +279,26 @@ init([Scope]) ->
-spec handle_call(Call :: {join_local, Group :: group(), Pid :: pid()}
| {leave_local, Group :: group(), Pid :: pid()}
| monitor
+ | {monitor, Group :: group()}
| {demonitor, Ref :: reference()},
From :: {pid(), Tag :: any()},
State :: state()) ->
{reply, ok | not_joined | {reference(), #{group() => [pid()]}} | false, state()}.
handle_call({join_local, Group, PidOrPids}, _From, #state{scope = Scope, local = Local,
- remote = Remote, scope_monitors = ScopeMon} = State) ->
+ remote = Remote, scope_monitors = ScopeMon, monitored_groups = MG} = State) ->
NewLocal = join_local(PidOrPids, Group, Local),
- join_local_update_ets(Scope, ScopeMon, Group, PidOrPids),
+ join_local_update_ets(Scope, ScopeMon, MG, Group, PidOrPids),
broadcast(maps:keys(Remote), {join, self(), Group, PidOrPids}),
{reply, ok, State#state{local = NewLocal}};
handle_call({leave_local, Group, PidOrPids}, _From, #state{scope = Scope, local = Local,
- remote = Remote, scope_monitors = ScopeMon} = State) ->
+ remote = Remote, scope_monitors = ScopeMon, monitored_groups = MG} = State) ->
case leave_local(PidOrPids, Group, Local) of
Local ->
{reply, not_joined, State};
NewLocal ->
- leave_local_update_ets(Scope, ScopeMon, Group, PidOrPids),
+ leave_local_update_ets(Scope, ScopeMon, MG, Group, PidOrPids),
broadcast(maps:keys(Remote), {leave, self(), PidOrPids, [Group]}),
{reply, ok, State#state{local = NewLocal}}
end;
@@ -283,24 +309,45 @@ handle_call(monitor, {Pid, _Tag}, #state{scope = Scope, scope_monitors = ScopeMo
MRef = erlang:monitor(process, Pid), %% monitor the monitor, to discard it upon termination, and generate MRef
{reply, {MRef, Local}, State#state{scope_monitors = ScopeMon#{MRef => Pid}}};
-handle_call({demonitor, Ref}, _From, #state{scope_monitors = ScopeMon} = State) ->
+handle_call({monitor, Group}, {Pid, _Tag}, #state{scope = Scope, group_monitors = GM, monitored_groups = MG} = State) ->
+ %% ETS cache is writable only from this process - so get_members is safe to use
+ Members = get_members(Scope, Group),
+ MRef = erlang:monitor(process, Pid),
+ NewMG = maps:update_with(Group, fun (Ex) -> [{Pid, MRef} | Ex] end, [{Pid, MRef}], MG),
+ {reply, {MRef, Members}, State#state{group_monitors = GM#{MRef => {Pid, Group}}, monitored_groups = NewMG}};
+
+handle_call({demonitor, Ref}, _From, #state{scope_monitors = ScopeMon, group_monitors = GM,
+ monitored_groups = MG} = State) ->
+ %% not using maybe_drop_monitor here as it does not demonitor, and can not return 'false'
case maps:take(Ref, ScopeMon) of
{_, NewMons} ->
erlang:demonitor(Ref),
{reply, ok, State#state{scope_monitors = NewMons}};
error ->
- {reply, false, State}
+ %% group monitor
+ case maps:take(Ref, GM) of
+ {{Pid, Group}, NewGM} ->
+ erlang:demonitor(Ref),
+ {reply, ok, State#state{group_monitors = NewGM,
+ monitored_groups = demonitor_group({Pid, Ref}, Group, MG)}};
+ error ->
+ {reply, false, State}
+ end
end;
handle_call(_Request, _From, _S) ->
erlang:error(badarg).
-spec handle_cast(
- {sync, Peer :: pid(), Groups :: [{group(), [pid()]}]},
+ {sync, Peer :: pid(), Groups :: [{group(), [pid()]}]} |
+ {discover, Peer :: pid()} |
+ {join, Peer :: pid(), group(), pid() | [pid()]} |
+ {leave, Peer :: pid(), pid() | [pid()], [group()]},
State :: state()) -> {noreply, state()}.
-handle_cast({sync, Peer, Groups}, #state{scope = Scope, remote = Remote, scope_monitors = ScopeMon} = State) ->
- {noreply, State#state{remote = handle_sync(Scope, ScopeMon, Peer, Remote, Groups)}};
+handle_cast({sync, Peer, Groups}, #state{scope = Scope, remote = Remote, scope_monitors = ScopeMon,
+ monitored_groups = MG} = State) ->
+ {noreply, State#state{remote = handle_sync(Scope, ScopeMon, MG, Peer, Remote, Groups)}};
handle_cast(_, _State) ->
erlang:error(badarg).
@@ -313,10 +360,11 @@ handle_cast(_, _State) ->
{nodedown, node()} | {nodeup, node()}, State :: state()) -> {noreply, state()}.
%% remote pid or several pids joining the group
-handle_info({join, Peer, Group, PidOrPids}, #state{scope = Scope, remote = Remote, scope_monitors = ScopeMon} = State) ->
+handle_info({join, Peer, Group, PidOrPids}, #state{scope = Scope, remote = Remote, scope_monitors = ScopeMon,
+ monitored_groups = MG} = State) ->
case maps:get(Peer, Remote, []) of
{MRef, RemoteGroups} ->
- join_remote_update_ets(Scope, ScopeMon, Group, PidOrPids),
+ join_remote_update_ets(Scope, ScopeMon, MG, Group, PidOrPids),
%% store remote group => pids map for fast sync operation
NewRemoteGroups = join_remote(Group, PidOrPids, RemoteGroups),
{noreply, State#state{remote = Remote#{Peer => {MRef, NewRemoteGroups}}}};
@@ -329,10 +377,11 @@ handle_info({join, Peer, Group, PidOrPids}, #state{scope = Scope, remote = Remot
end;
%% remote pid leaving (multiple groups at once)
-handle_info({leave, Peer, PidOrPids, Groups}, #state{scope = Scope, remote = Remote, scope_monitors = ScopeMon} = State) ->
+handle_info({leave, Peer, PidOrPids, Groups}, #state{scope = Scope, remote = Remote, scope_monitors = ScopeMon,
+ monitored_groups = MG} = State) ->
case maps:get(Peer, Remote, []) of
{MRef, RemoteMap} ->
- _ = leave_remote_update_ets(Scope, ScopeMon, PidOrPids, Groups),
+ _ = leave_remote_update_ets(Scope, ScopeMon, MG, PidOrPids, Groups),
NewRemoteMap = leave_remote(PidOrPids, RemoteMap, Groups),
{noreply, State#state{remote = Remote#{Peer => {MRef, NewRemoteMap}}}};
[] ->
@@ -358,28 +407,29 @@ handle_info({discover, Peer}, #state{remote = Remote, local = Local} = State) ->
{noreply, State#state{remote = Remote#{Peer => {MRef, #{}}}}}
end;
-%% handle local process exit
-handle_info({'DOWN', MRef, process, Pid, _Info}, #state{scope = Scope, local = Local, remote = Remote,
- scope_monitors = ScopeMon} = State) when node(Pid) =:= node() ->
+%% handle local process exit, or a local monitor exit
+handle_info({'DOWN', MRef, process, Pid, _Info}, #state{scope = Scope, local = Local,
+ remote = Remote, scope_monitors = ScopeMon, monitored_groups = MG} = State) when node(Pid) =:= node() ->
case maps:take(Pid, Local) of
error ->
- maybe_drop_monitor(MRef, State);
+ {noreply, maybe_drop_monitor(MRef, State)};
{{MRef, Groups}, NewLocal} ->
- [leave_local_update_ets(Scope, ScopeMon, Group, Pid) || Group <- Groups],
- %% send update to all scope processes on remote nodes
+ [leave_local_update_ets(Scope, ScopeMon, MG, Group, Pid) || Group <- Groups],
+ %% send update to all remote peers
broadcast(maps:keys(Remote), {leave, self(), Pid, Groups}),
{noreply, State#state{local = NewLocal}}
end;
%% handle remote node down or scope leaving overlay network, or a monitor from the remote node went down
handle_info({'DOWN', MRef, process, Pid, _Info}, #state{scope = Scope, remote = Remote,
- scope_monitors = ScopeMon} = State) ->
+ scope_monitors = ScopeMon, monitored_groups = MG} = State) ->
case maps:take(Pid, Remote) of
{{MRef, RemoteMap}, NewRemote} ->
- maps:foreach(fun (Group, Pids) -> leave_remote_update_ets(Scope, ScopeMon, Pids, [Group]) end, RemoteMap),
+ maps:foreach(fun (Group, Pids) ->
+ leave_remote_update_ets(Scope, ScopeMon, MG, Pids, [Group]) end, RemoteMap),
{noreply, State#state{remote = NewRemote}};
error ->
- maybe_drop_monitor(MRef, State)
+ {noreply, maybe_drop_monitor(MRef, State)}
end;
%% nodedown: ignore, and wait for 'DOWN' signal for monitored process
@@ -420,7 +470,7 @@ ensure_local(Bad) ->
%% Override all knowledge of the remote node with information it sends
%% to local node. Current implementation must do the full table scan
%% to remove stale pids (just as for 'nodedown').
-handle_sync(Scope, ScopeMon, Peer, Remote, Groups) ->
+handle_sync(Scope, ScopeMon, MG, Peer, Remote, Groups) ->
%% can't use maps:get() because it evaluates 'default' value first,
%% and in this case monitor() call has side effect.
{MRef, RemoteGroups} =
@@ -431,25 +481,25 @@ handle_sync(Scope, ScopeMon, Peer, Remote, Groups) ->
MRef0
end,
%% sync RemoteMap and transform ETS table
- _ = sync_groups(Scope, ScopeMon, RemoteGroups, Groups),
+ _ = sync_groups(Scope, ScopeMon, MG, RemoteGroups, Groups),
Remote#{Peer => {MRef, maps:from_list(Groups)}}.
-sync_groups(Scope, ScopeMon, RemoteGroups, []) ->
+sync_groups(Scope, ScopeMon, MG, RemoteGroups, []) ->
%% leave all missing groups
- [leave_remote_update_ets(Scope, ScopeMon, Pids, [Group]) || {Group, Pids} <- maps:to_list(RemoteGroups)];
-sync_groups(Scope, ScopeMon, RemoteGroups, [{Group, Pids} | Tail]) ->
+ [leave_remote_update_ets(Scope, ScopeMon, MG, Pids, [Group]) || {Group, Pids} <- maps:to_list(RemoteGroups)];
+sync_groups(Scope, ScopeMon, MG, RemoteGroups, [{Group, Pids} | Tail]) ->
case maps:take(Group, RemoteGroups) of
{Pids, NewRemoteGroups} ->
- sync_groups(Scope, ScopeMon, NewRemoteGroups, Tail);
+ sync_groups(Scope, ScopeMon, MG, NewRemoteGroups, Tail);
{OldPids, NewRemoteGroups} ->
[{Group, AllOldPids, LocalPids}] = ets:lookup(Scope, Group),
%% should be really rare...
AllNewPids = Pids ++ AllOldPids -- OldPids,
true = ets:insert(Scope, {Group, AllNewPids, LocalPids}),
- sync_groups(Scope, ScopeMon, NewRemoteGroups, Tail);
+ sync_groups(Scope, ScopeMon, MG, NewRemoteGroups, Tail);
error ->
- join_remote_update_ets(Scope, ScopeMon, Group, Pids),
- sync_groups(Scope, ScopeMon, RemoteGroups, Tail)
+ join_remote_update_ets(Scope, ScopeMon, MG, Group, Pids),
+ sync_groups(Scope, ScopeMon, MG, RemoteGroups, Tail)
end.
join_local(Pid, Group, Local) when is_pid(Pid) ->
@@ -465,39 +515,39 @@ join_local([], _Group, Local) ->
join_local([Pid | Tail], Group, Local) ->
join_local(Tail, Group, join_local(Pid, Group, Local)).
-join_local_update_ets(Scope, ScopeMon, Group, Pid) when is_pid(Pid) ->
+join_local_update_ets(Scope, ScopeMon, MG, Group, Pid) when is_pid(Pid) ->
case ets:lookup(Scope, Group) of
[{Group, All, Local}] ->
ets:insert(Scope, {Group, [Pid | All], [Pid | Local]});
[] ->
ets:insert(Scope, {Group, [Pid], [Pid]})
end,
- notify_group(ScopeMon, join, Group, [Pid]);
-join_local_update_ets(Scope, ScopeMon, Group, Pids) ->
+ notify_group(ScopeMon, MG, join, Group, [Pid]);
+join_local_update_ets(Scope, ScopeMon, MG, Group, Pids) ->
case ets:lookup(Scope, Group) of
[{Group, All, Local}] ->
ets:insert(Scope, {Group, Pids ++ All, Pids ++ Local});
[] ->
ets:insert(Scope, {Group, Pids, Pids})
end,
- notify_group(ScopeMon, join, Group, Pids).
+ notify_group(ScopeMon, MG, join, Group, Pids).
-join_remote_update_ets(Scope, ScopeMon, Group, Pid) when is_pid(Pid) ->
+join_remote_update_ets(Scope, ScopeMon, MG, Group, Pid) when is_pid(Pid) ->
case ets:lookup(Scope, Group) of
[{Group, All, Local}] ->
ets:insert(Scope, {Group, [Pid | All], Local});
[] ->
ets:insert(Scope, {Group, [Pid], []})
end,
- notify_group(ScopeMon, join, Group, [Pid]);
-join_remote_update_ets(Scope, ScopeMon, Group, Pids) ->
+ notify_group(ScopeMon, MG, join, Group, [Pid]);
+join_remote_update_ets(Scope, ScopeMon, MG, Group, Pids) ->
case ets:lookup(Scope, Group) of
[{Group, All, Local}] ->
ets:insert(Scope, {Group, Pids ++ All, Local});
[] ->
ets:insert(Scope, {Group, Pids, []})
end,
- notify_group(ScopeMon, join, Group, Pids).
+ notify_group(ScopeMon, MG, join, Group, Pids).
join_remote(Group, Pid, RemoteGroups) when is_pid(Pid) ->
maps:update_with(Group, fun (List) -> [Pid | List] end, [Pid], RemoteGroups);
@@ -524,19 +574,19 @@ leave_local([], _Group, Local) ->
leave_local([Pid | Tail], Group, Local) ->
leave_local(Tail, Group, leave_local(Pid, Group, Local)).
-leave_local_update_ets(Scope, ScopeMon, Group, Pid) when is_pid(Pid) ->
+leave_local_update_ets(Scope, ScopeMon, MG, Group, Pid) when is_pid(Pid) ->
case ets:lookup(Scope, Group) of
[{Group, [Pid], [Pid]}] ->
ets:delete(Scope, Group),
- notify_group(ScopeMon, leave, Group, [Pid]);
+ notify_group(ScopeMon, MG, leave, Group, [Pid]);
[{Group, All, Local}] ->
ets:insert(Scope, {Group, lists:delete(Pid, All), lists:delete(Pid, Local)}),
- notify_group(ScopeMon, leave, Group, [Pid]);
+ notify_group(ScopeMon, MG, leave, Group, [Pid]);
[] ->
%% rare race condition when 'DOWN' from monitor stays in msg queue while process is leave-ing.
true
end;
-leave_local_update_ets(Scope, ScopeMon, Group, Pids) ->
+leave_local_update_ets(Scope, ScopeMon, MG, Group, Pids) ->
case ets:lookup(Scope, Group) of
[{Group, All, Local}] ->
case All -- Pids of
@@ -545,25 +595,25 @@ leave_local_update_ets(Scope, ScopeMon, Group, Pids) ->
NewAll ->
ets:insert(Scope, {Group, NewAll, Local -- Pids})
end,
- notify_group(ScopeMon, leave, Group, Pids);
+ notify_group(ScopeMon, MG, leave, Group, Pids);
[] ->
true
end.
-leave_remote_update_ets(Scope, ScopeMon, Pid, Groups) when is_pid(Pid) ->
+leave_remote_update_ets(Scope, ScopeMon, MG, Pid, Groups) when is_pid(Pid) ->
_ = [
case ets:lookup(Scope, Group) of
[{Group, [Pid], []}] ->
ets:delete(Scope, Group),
- notify_group(ScopeMon, leave, Group, [Pid]);
+ notify_group(ScopeMon, MG, leave, Group, [Pid]);
[{Group, All, Local}] ->
ets:insert(Scope, {Group, lists:delete(Pid, All), Local}),
- notify_group(ScopeMon, leave, Group, [Pid]);
+ notify_group(ScopeMon, MG, leave, Group, [Pid]);
[] ->
true
end ||
Group <- Groups];
-leave_remote_update_ets(Scope, ScopeMon, Pids, Groups) ->
+leave_remote_update_ets(Scope, ScopeMon, MG, Pids, Groups) ->
_ = [
case ets:lookup(Scope, Group) of
[{Group, All, Local}] ->
@@ -573,7 +623,7 @@ leave_remote_update_ets(Scope, ScopeMon, Pids, Groups) ->
NewAll ->
ets:insert(Scope, {Group, NewAll, Local})
end,
- notify_group(ScopeMon, leave, Group, Pids);
+ notify_group(ScopeMon, MG, leave, Group, Pids);
[] ->
true
end ||
@@ -611,25 +661,44 @@ broadcast([Dest | Tail], Msg) ->
erlang:send(Dest, Msg, [noconnect]),
broadcast(Tail, Msg).
-
%% drops a monitor if DOWN was received
-maybe_drop_monitor(MRef, #state{scope_monitors = ScopeMon} = State) ->
+maybe_drop_monitor(MRef, #state{scope_monitors = ScopeMon, group_monitors = GMs, monitored_groups = MG} = State) ->
%% could be a local monitor going DOWN. Since it's a rare event, check should
%% not stay in front of any other, more frequent events
case maps:take(MRef, ScopeMon) of
error ->
- %% this can only happen when leave request and 'DOWN' are in pg queue
- {noreply, State};
+ case maps:take(MRef, GMs) of
+ error ->
+ State;
+ {{Pid, Group}, NewGM} ->
+ %% clean up the inverse map
+ State#state{group_monitors = NewGM, monitored_groups = demonitor_group({Pid, MRef}, Group, MG)}
+ end;
{_Pid, NewScopeMon} ->
- {noreply, State#state{scope_monitors = NewScopeMon}}
+ State#state{scope_monitors = NewScopeMon}
+ end.
+
+demonitor_group(Tag, Group, MG) ->
+ case maps:find(Group, MG) of
+ {ok, [Tag]} ->
+ maps:remove(Group, MG);
+ {ok, Tags} ->
+ maps:put(Group, Tags -- [Tag], MG)
end.
-%% notify all scope monitors about an Action in Groups for Pids
-notify_group(ScopeMon, Action, Group, Pids) ->
+%% notify all monitors about an Action in Groups for Pids
+notify_group(ScopeMonitors, MG, Action, Group, Pids) ->
maps:foreach(
fun (Ref, Pid) ->
erlang:send(Pid, {Ref, Action, Group, Pids}, [noconnect])
- end, ScopeMon).
+ end, ScopeMonitors),
+ case maps:find(Group, MG) of
+ error ->
+ ok;
+ {ok, Monitors} ->
+ [erlang:send(Pid, {Ref, Action, Group, Pids}, [noconnect]) || {Pid, Ref} <- Monitors],
+ ok
+ end.
%% remove all messages that were send to monitor groups
flush(Ref) ->
diff --git a/lib/kernel/test/pg_SUITE.erl b/lib/kernel/test/pg_SUITE.erl
index 5d56b751e0..130b54a5f8 100644
--- a/lib/kernel/test/pg_SUITE.erl
+++ b/lib/kernel/test/pg_SUITE.erl
@@ -55,7 +55,8 @@
disconnected_start/1,
forced_sync/0, forced_sync/1,
group_leave/1,
- monitor_scope/0, monitor_scope/1
+ monitor_scope/0, monitor_scope/1,
+ monitor/1
]).
-include_lib("common_test/include/ct.hrl").
@@ -82,7 +83,7 @@ groups() ->
{cluster, [parallel], [process_owner_check, two, initial, netsplit, trisplit, foursplit,
exchange, nolocal, double, scope_restart, missing_scope_join, empty_group_by_remote_leave,
disconnected_start, forced_sync, group_leave]},
- {monitor, [parallel], [monitor_scope]}
+ {monitor, [parallel], [monitor_scope, monitor]}
].
%%--------------------------------------------------------------------
@@ -111,7 +112,7 @@ errors(_Config) ->
?assertException(error, badarg, pg:handle_cast(garbage, garbage)),
%% kill with call
{ok, _Pid} = pg:start(second),
- ?assertException(exit, {{badarg, _}, _}, gen_server:call(second, garbage, 100)).
+ ?assertException(exit, {{badarg, _}, _}, gen_server:call(second, garbage)).
leave_exit_race() ->
[{doc, "Tests that pg correctly handles situation when leave and 'DOWN' messages are both in pg queue"}].
@@ -228,7 +229,9 @@ two(Config) when is_list(Config) ->
Pid = erlang:spawn(forever()),
?assertEqual(ok, pg:join(?FUNCTION_NAME, ?FUNCTION_NAME, Pid)),
?assertEqual([Pid], pg:get_local_members(?FUNCTION_NAME, ?FUNCTION_NAME)),
- %% first RPC must be serialised
+ %% first RPC must be serialised 3 times
+ sync({?FUNCTION_NAME, Node}),
+ sync(?FUNCTION_NAME),
sync({?FUNCTION_NAME, Node}),
?assertEqual([Pid], rpc:call(Node, pg, get_members, [?FUNCTION_NAME, ?FUNCTION_NAME])),
?assertEqual([], rpc:call(Node, pg, get_local_members, [?FUNCTION_NAME, ?FUNCTION_NAME])),
@@ -243,7 +246,7 @@ two(Config) when is_list(Config) ->
?assertEqual(ok, rpc:call(Node, pg, join, [?FUNCTION_NAME, ?FUNCTION_NAME, Pid2])),
?assertEqual(ok, rpc:call(Node, pg, join, [?FUNCTION_NAME, ?FUNCTION_NAME, Pid3])),
%% serialise through the *other* node
- sync({?FUNCTION_NAME, Node}),
+ sync_via({?FUNCTION_NAME, Node}, ?FUNCTION_NAME),
?assertEqual(lists:sort([Pid2, Pid3]),
lists:sort(pg:get_members(?FUNCTION_NAME, ?FUNCTION_NAME))),
%% stop the peer
@@ -311,7 +314,11 @@ initial(Config) when is_list(Config) ->
?assertEqual(ok, pg:join(?FUNCTION_NAME, ?FUNCTION_NAME, Pid)),
?assertEqual([Pid], pg:get_local_members(?FUNCTION_NAME, ?FUNCTION_NAME)),
{Peer, Node} = spawn_node(?FUNCTION_NAME),
- %% first RPC must be serialised
+ %% first sync makes the peer node to process 'nodeup' (and send discover)
+ sync({?FUNCTION_NAME, Node}),
+ %% second sync makes origin node pg to reply to discover'
+ sync(?FUNCTION_NAME),
+ %% third sync makes peer node to finish processing 'exchange'
sync({?FUNCTION_NAME, Node}),
?assertEqual([Pid], rpc:call(Node, pg, get_members, [?FUNCTION_NAME, ?FUNCTION_NAME])),
@@ -564,24 +571,44 @@ monitor_scope() ->
[{doc, "Tests monitor_scope/1 and demonitor/2"}].
monitor_scope(Config) when is_list(Config) ->
- Self = self(),
- Scope = ?FUNCTION_NAME,
- Group = ?FUNCTION_ARITY,
%% ensure that demonitoring returns 'false' when monitor is not installed
- ?assertEqual(false, pg:demonitor(Scope, erlang:make_ref())),
- %% start the actual test case
- {Ref, #{}} = pg:monitor_scope(Scope),
+ ?assertEqual(false, pg:demonitor(?FUNCTION_NAME, erlang:make_ref())),
+ InitialMonitor = fun (Scope) -> {Ref, #{}} = pg:monitor_scope(Scope), Ref end,
+ SecondMonitor = fun (Scope, Group, Control) -> {Ref, #{Group := [Control]}} = pg:monitor_scope(Scope), Ref end,
+ %% WHITE BOX: knowing pg state internals - only the original monitor should stay
+ DownMonitor = fun (Scope, Ref, Self) ->
+ {state, _, _, _, ScopeMonitors, _, _} = sys:get_state(Scope),
+ ?assertEqual(#{Ref => Self}, ScopeMonitors, "pg did not remove DOWNed scope monitor")
+ end,
+ monitor_test_impl(?FUNCTION_NAME, ?FUNCTION_ARITY, InitialMonitor, SecondMonitor, DownMonitor).
+
+monitor(Config) when is_list(Config) ->
+ ExpectedGroup = {?FUNCTION_NAME, ?FUNCTION_ARITY},
+ InitialMonitor = fun (Scope) -> {Ref, []} = pg:monitor(Scope, ExpectedGroup), Ref end,
+ SecondMonitor = fun (Scope, Group, Control) ->
+ {Ref, [Control]} = pg:monitor(Scope, Group), Ref end,
+ DownMonitor = fun (Scope, Ref, Self) ->
+ {state, _, _, _, _, GM, MG} = sys:get_state(Scope),
+ ?assertEqual(#{Ref => {Self, ExpectedGroup}}, GM, "pg did not remove DOWNed group monitor"),
+ ?assertEqual(#{ExpectedGroup => [{Self, Ref}]}, MG, "pg did not remove DOWNed group")
+ end,
+ monitor_test_impl(?FUNCTION_NAME, ExpectedGroup, InitialMonitor, SecondMonitor, DownMonitor).
+
+monitor_test_impl(Scope, Group, InitialMonitor, SecondMonitor, DownMonitor) ->
+ Self = self(),
+ Ref = InitialMonitor(Scope),
%% local join
?assertEqual(ok, pg:join(Scope, Group, Self)),
wait_message(Ref, join, Group, [Self], "Local"),
%% start second monitor (which has 1 local pid at the start)
- SecondMonitor = spawn_link(fun() -> second_monitor(Scope, Group, Self) end),
- Ref2 = receive {second, SecondRef} -> SecondRef end,
+ ExtraMonitor = spawn_link(fun() -> second_monitor(Scope, Group, Self, SecondMonitor) end),
+ Ref2 = receive {ExtraMonitor, SecondRef} -> SecondRef end,
%% start a remote node, and a remote monitor
{Peer, Node} = spawn_node(Scope),
ScopePid = whereis(Scope),
%% do not care about the remote monitor, it is started only to check DOWN handling
- _ThirdMonitor = spawn(Node, fun() -> second_monitor(ScopePid, Group, Self) end),
+ ThirdMonitor = spawn_link(Node, fun() -> second_monitor(ScopePid, Group, Self, SecondMonitor) end),
+ Ref3 = receive {ThirdMonitor, ThirdRef} -> ThirdRef end,
%% remote join
RemotePid = erlang:spawn(Node, forever()),
?assertEqual(ok, rpc:call(Node, pg, join, [Scope, Group, [RemotePid, RemotePid]])),
@@ -592,21 +619,28 @@ monitor_scope(Config) when is_list(Config) ->
wait_message(Ref, leave, Group, [Self], "Local"),
%% remote leave
?assertEqual(ok, rpc:call(Node, pg, leave, [Scope, Group, RemotePid])),
+ %% flush the local pg scope via remote pg (to ensure local pg finished sending notifications)
+ sync_via({?FUNCTION_NAME, Node}, ?FUNCTION_NAME),
wait_message(Ref, leave, Group, [RemotePid], "Remote"),
- %% drop the SecondMonitor - this keeps original and remote monitors
- SecondMonMsgs = gen_server:call(SecondMonitor, flush),
+ %% drop the ExtraMonitor - this keeps original and remote monitors
+ SecondMonMsgs = gen_server:call(ExtraMonitor, flush),
%% inspect the queue, it should contain double remote join, then single local and single remove leave
- ?assertEqual([
+ ExpectedLocalMessages = [
{Ref2, join, Group, [RemotePid, RemotePid]},
{Ref2, leave, Group, [Self]},
{Ref2, leave, Group, [RemotePid]}],
- SecondMonMsgs),
+ ?assertEqual(ExpectedLocalMessages, SecondMonMsgs, "Local monitor failed"),
+ %% inspect remote monitor queue
+ ThirdMonMsgs = gen_server:call(ThirdMonitor, flush),
+ ExpectedRemoteMessages = [
+ {Ref3, join, Group, [RemotePid, RemotePid]},
+ {Ref3, leave, Group, [Self]},
+ {Ref3, leave, Group, [RemotePid]}],
+ ?assertEqual(ExpectedRemoteMessages, ThirdMonMsgs, "Remote monitor failed"),
%% remote leave via stop (causes remote monitor to go DOWN)
ok = peer:stop(Peer),
wait_message(Ref, leave, Group, [RemotePid], "Remote stop"),
- %% WHITE BOX: knowing pg state internals - only the original monitor should stay
- {state, _, _, _, InternalMonitors} = sys:get_state(?FUNCTION_NAME),
- ?assertEqual(#{Ref => Self}, InternalMonitors, "pg did not remove DOWNed monitor"),
+ DownMonitor(Scope, Ref, Self),
%% demonitor
?assertEqual(ok, pg:demonitor(Scope, Ref)),
?assertEqual(false, pg:demonitor(Scope, Ref)),
@@ -615,7 +649,7 @@ monitor_scope(Config) when is_list(Config) ->
sync(Scope),
%% join should not be here
receive {Ref, Action, Group, [Self]} -> ?assert(false, lists:concat(["Unexpected ", Action, "event"]))
- after 0 -> ok end.
+ after 0 -> ok end.
wait_message(Ref, Action, Group, Pids, Msg) ->
receive
@@ -624,12 +658,12 @@ wait_message(Ref, Action, Group, Pids, Msg) ->
after 1000 ->
{messages, Msgs} = process_info(self(), messages),
ct:pal("Message queue: ~0p", [Msgs]),
- ?assert(false, Msg ++ " " ++ atom_to_list(Action) ++ " event failed to occur")
+ ?assert(false, lists:flatten(io_lib:format("Expected ~s ~s for ~p", [Msg, Action, Group])))
end.
-second_monitor(Scope, Group, Control) ->
- {Ref, #{Group := [Control]}} = pg:monitor_scope(Scope),
- Control ! {second, Ref},
+second_monitor(Scope, Group, Control, SecondMonitor) ->
+ Ref = SecondMonitor(Scope, Group, Control),
+ Control ! {self(), Ref},
second_monitor([]).
second_monitor(Msgs) ->
@@ -643,9 +677,16 @@ second_monitor(Msgs) ->
%%--------------------------------------------------------------------
%% Test Helpers - start/stop additional Erlang nodes
+%% flushes GS (GenServer) queue, ensuring that all prior
+%% messages have been processed
sync(GS) ->
_ = sys:log(GS, get).
+%% flushes GS queue from the point of view of a registered process RegName
+%% running on the Node.
+sync_via({RegName, Node}, GS) ->
+ rpc:call(Node, sys, replace_state, [RegName, fun (S) -> (catch sys:get_state(GS)), S end]).
+
ensure_peers_info(Scope, Nodes) ->
%% Ensures that pg server on local node has gotten info from
%% pg servers on all Peer nodes passed as argument (assuming
@@ -659,7 +700,7 @@ ensure_peers_info(Scope, Nodes) ->
%%
sync(Scope),
- %% Known: nodup handled and discover sent to Peer
+ %% Known: nodeup handled and discover sent to Peer
lists:foreach(fun (Node) -> sync({Scope, Node}) end, Nodes),
%% Known: nodeup handled by Peers and discover sent to local