summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-05-28 16:43:42 +0100
committerSimon MacMullen <simon@rabbitmq.com>2012-05-28 16:43:42 +0100
commit422add5ea8793aa024eca016a6e0045be12e8fd3 (patch)
treebf34b042d2a9e12ea5e32b62b752e6ec4f25ba4d
parent91d50c8280419f391b8c5dd14863bfcf5c506d06 (diff)
downloadrabbitmq-server-422add5ea8793aa024eca016a6e0045be12e8fd3.tar.gz
Backport 55823b694d55 (Merge of bug24362; mirrored_supervisor tests frequently fail)
-rw-r--r--src/mirrored_supervisor.erl81
-rw-r--r--src/mirrored_supervisor_tests.erl6
2 files changed, 42 insertions, 45 deletions
diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl
index c5f22c51..ab734fd9 100644
--- a/src/mirrored_supervisor.erl
+++ b/src/mirrored_supervisor.erl
@@ -225,8 +225,8 @@ which_children(Sup) -> fold(which_children, Sup, fun lists:append/2).
count_children(Sup) -> fold(count_children, Sup, fun add_proplists/2).
check_childspecs(Specs) -> ?SUPERVISOR:check_childspecs(Specs).
-call(Sup, Msg) ->
- ?GEN_SERVER:call(child(Sup, mirroring), Msg, infinity).
+call(Sup, Msg) -> ?GEN_SERVER:call(mirroring(Sup), Msg, infinity).
+cast(Sup, Msg) -> ?GEN_SERVER:cast(mirroring(Sup), Msg).
find_call(Sup, Id, Msg) ->
Group = call(Sup, group),
@@ -237,7 +237,7 @@ find_call(Sup, Id, Msg) ->
%% immediately after the tx - we can't be 100% here. So we may as
%% well dirty_select.
case mnesia:dirty_select(?TABLE, [{MatchHead, [], ['$1']}]) of
- [Mirror] -> ?GEN_SERVER:call(Mirror, Msg, infinity);
+ [Mirror] -> call(Mirror, Msg);
[] -> {error, not_found}
end.
@@ -246,13 +246,16 @@ fold(FunAtom, Sup, AggFun) ->
lists:foldl(AggFun, [],
[apply(?SUPERVISOR, FunAtom, [D]) ||
M <- ?PG2:get_members(Group),
- D <- [?GEN_SERVER:call(M, delegate_supervisor, infinity)]]).
+ D <- [delegate(M)]]).
child(Sup, Id) ->
[Pid] = [Pid || {Id1, Pid, _, _} <- ?SUPERVISOR:which_children(Sup),
Id1 =:= Id],
Pid.
+delegate(Sup) -> child(Sup, delegate).
+mirroring(Sup) -> child(Sup, mirroring).
+
%%----------------------------------------------------------------------------
start_internal(Group, ChildSpecs) ->
@@ -293,28 +296,29 @@ handle_call({init, Overall}, _From,
initial_childspecs = ChildSpecs}) ->
process_flag(trap_exit, true),
?PG2:create(Group),
- ok = ?PG2:join(Group, self()),
- Rest = ?PG2:get_members(Group) -- [self()],
+ ok = ?PG2:join(Group, Overall),
+ Rest = ?PG2:get_members(Group) -- [Overall],
case Rest of
[] -> {atomic, _} = mnesia:transaction(fun() -> delete_all(Group) end);
_ -> ok
end,
[begin
- ?GEN_SERVER:cast(Pid, {ensure_monitoring, self()}),
+ ?GEN_SERVER:cast(mirroring(Pid), {ensure_monitoring, Overall}),
erlang:monitor(process, Pid)
end || Pid <- Rest],
- Delegate = child(Overall, delegate),
+ Delegate = delegate(Overall),
erlang:monitor(process, Delegate),
State1 = State#state{overall = Overall, delegate = Delegate},
- case errors([maybe_start(Group, Delegate, S) || S <- ChildSpecs]) of
+ case errors([maybe_start(Group, Overall, Delegate, S) || S <- ChildSpecs]) of
[] -> {reply, ok, State1};
Errors -> {stop, {shutdown, Errors}, State1}
end;
handle_call({start_child, ChildSpec}, _From,
- State = #state{delegate = Delegate,
+ State = #state{overall = Overall,
+ delegate = Delegate,
group = Group}) ->
- {reply, case maybe_start(Group, Delegate, ChildSpec) of
+ {reply, case maybe_start(Group, Overall, Delegate, ChildSpec) of
already_in_mnesia -> {error, already_present};
{already_in_mnesia, Pid} -> {error, {already_started, Pid}};
Else -> Else
@@ -327,9 +331,6 @@ handle_call({delete_child, Id}, _From, State = #state{delegate = Delegate,
handle_call({msg, F, A}, _From, State = #state{delegate = Delegate}) ->
{reply, apply(?SUPERVISOR, F, [Delegate | A]), State};
-handle_call(delegate_supervisor, _From, State = #state{delegate = Delegate}) ->
- {reply, Delegate, State};
-
handle_call(group, _From, State = #state{group = Group}) ->
{reply, Group, State};
@@ -348,7 +349,7 @@ handle_cast(Msg, State) ->
{stop, {unexpected_cast, Msg}, State}.
handle_info({'DOWN', _Ref, process, Pid, Reason},
- State = #state{delegate = Pid, group = Group}) ->
+ State = #state{overall = Pid, group = Group}) ->
%% Since the delegate is temporary, its death won't cause us to
%% die. Since the overall supervisor kills processes in reverse
%% order when shutting down "from above" and we started after the
@@ -362,15 +363,16 @@ handle_info({'DOWN', _Ref, process, Pid, Reason},
{stop, Reason, State};
handle_info({'DOWN', _Ref, process, Pid, _Reason},
- State = #state{delegate = Delegate, group = Group}) ->
+ State = #state{delegate = Delegate, group = Group,
+ overall = O}) ->
%% TODO load balance this
%% No guarantee pg2 will have received the DOWN before us.
- Self = self(),
R = case lists:sort(?PG2:get_members(Group)) -- [Pid] of
- [Self | _] -> {atomic, ChildSpecs} =
- mnesia:transaction(fun() -> update_all(Pid) end),
- [start(Delegate, ChildSpec) || ChildSpec <- ChildSpecs];
- _ -> []
+ [O | _] -> {atomic, ChildSpecs} =
+ mnesia:transaction(
+ fun() -> update_all(O, Pid) end),
+ [start(Delegate, ChildSpec) || ChildSpec <- ChildSpecs];
+ _ -> []
end,
case errors(R) of
[] -> {noreply, State};
@@ -389,13 +391,11 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
tell_all_peers_to_die(Group, Reason) ->
- [?GEN_SERVER:cast(P, {die, Reason}) ||
- P <- ?PG2:get_members(Group) -- [self()]].
+ [cast(P, {die, Reason}) || P <- ?PG2:get_members(Group) -- [self()]].
-maybe_start(Group, Delegate, ChildSpec) ->
- case mnesia:transaction(fun() ->
- check_start(Group, Delegate, ChildSpec)
- end) of
+maybe_start(Group, Overall, Delegate, ChildSpec) ->
+ case mnesia:transaction(
+ fun() -> check_start(Group, Overall, Delegate, ChildSpec) end) of
{atomic, start} -> start(Delegate, ChildSpec);
{atomic, undefined} -> already_in_mnesia;
{atomic, Pid} -> {already_in_mnesia, Pid};
@@ -403,31 +403,29 @@ maybe_start(Group, Delegate, ChildSpec) ->
{aborted, E} -> {error, E}
end.
-check_start(Group, Delegate, ChildSpec) ->
+check_start(Group, Overall, Delegate, ChildSpec) ->
case mnesia:wread({?TABLE, {Group, id(ChildSpec)}}) of
- [] -> write(Group, ChildSpec),
+ [] -> write(Group, Overall, ChildSpec),
start;
[S] -> #mirrored_sup_childspec{key = {Group, Id},
mirroring_pid = Pid} = S,
- case self() of
+ case Overall of
Pid -> child(Delegate, Id);
_ -> case supervisor(Pid) of
- dead -> write(Group, ChildSpec),
+ dead -> write(Group, Overall, ChildSpec),
start;
Delegate0 -> child(Delegate0, Id)
end
end
end.
-supervisor(Pid) ->
- with_exit_handler(
- fun() -> dead end,
- fun() -> gen_server:call(Pid, delegate_supervisor, infinity) end).
+supervisor(Pid) -> with_exit_handler(fun() -> dead end,
+ fun() -> delegate(Pid) end).
-write(Group, ChildSpec) ->
+write(Group, Overall, ChildSpec) ->
ok = mnesia:write(
#mirrored_sup_childspec{key = {Group, id(ChildSpec)},
- mirroring_pid = self(),
+ mirroring_pid = Overall,
childspec = ChildSpec}),
ChildSpec.
@@ -453,12 +451,12 @@ check_stop(Group, Delegate, Id) ->
id({Id, _, _, _, _, _}) -> Id.
-update_all(OldPid) ->
- MatchHead = #mirrored_sup_childspec{mirroring_pid = OldPid,
+update_all(Overall, OldOverall) ->
+ MatchHead = #mirrored_sup_childspec{mirroring_pid = OldOverall,
key = '$1',
childspec = '$2',
_ = '_'},
- [write(Group, C) ||
+ [write(Group, Overall, C) ||
[{Group, _Id}, C] <- mnesia:select(?TABLE, [{MatchHead, [], ['$$']}])].
delete_all(Group) ->
@@ -472,8 +470,7 @@ errors(Results) -> [E || {error, E} <- Results].
%%----------------------------------------------------------------------------
-create_tables() ->
- create_tables([?TABLE_DEF]).
+create_tables() -> create_tables([?TABLE_DEF]).
create_tables([]) ->
ok;
diff --git a/src/mirrored_supervisor_tests.erl b/src/mirrored_supervisor_tests.erl
index 4b5873a9..8bf71671 100644
--- a/src/mirrored_supervisor_tests.erl
+++ b/src/mirrored_supervisor_tests.erl
@@ -157,7 +157,7 @@ test_no_migration_on_shutdown() ->
with_sups(fun([Evil, _]) ->
?MS:start_child(Evil, childspec(worker)),
try
- call(worker, ping),
+ call(worker, ping, 10000, 100),
exit(worker_should_not_have_migrated)
catch exit:{timeout_waiting_for_server, _, _} ->
ok
@@ -268,7 +268,7 @@ inc_group() ->
get_group(Group) ->
{Group, get(counter)}.
-call(Id, Msg) -> call(Id, Msg, 60000, 100).
+call(Id, Msg) -> call(Id, Msg, 5*24*60*60*1000, 100).
call(Id, Msg, 0, _Decr) ->
exit({timeout_waiting_for_server, {Id, Msg}, erlang:get_stacktrace()});
@@ -285,7 +285,7 @@ kill(Pid, Wait) when is_pid(Wait) -> kill(Pid, [Wait]);
kill(Pid, Waits) ->
erlang:monitor(process, Pid),
[erlang:monitor(process, P) || P <- Waits],
- exit(Pid, kill),
+ exit(Pid, bang),
kill_wait(Pid),
[kill_wait(P) || P <- Waits].