summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-05-23 12:59:36 +0100
committerSimon MacMullen <simon@rabbitmq.com>2012-05-23 12:59:36 +0100
commit1c83a53cd0a40e69e48a016e18df81efb1803381 (patch)
tree0b7b714066067971264a0fc19246e9c79be873e4
parent4d89531345d95406e8127bef87e2f578307102a4 (diff)
parent853b30bb9cfd76ef9bf74129e61e1fe6541786fb (diff)
downloadrabbitmq-server-1c83a53cd0a40e69e48a016e18df81efb1803381.tar.gz
Merge bug24919
-rw-r--r--docs/rabbitmqctl.1.xml3
-rw-r--r--src/mirrored_supervisor.erl81
-rw-r--r--src/mirrored_supervisor_tests.erl6
-rw-r--r--src/rabbit.erl8
-rw-r--r--src/rabbit_direct.erl12
-rw-r--r--src/rabbit_mirror_queue_slave_sup.erl17
-rw-r--r--src/rabbit_networking.erl13
-rw-r--r--src/rabbit_sup.erl38
8 files changed, 84 insertions, 94 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 1effd691..3eb83c88 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -978,7 +978,8 @@
</varlistentry>
<varlistentry>
<term>type</term>
- <listitem><para>The exchange type (one of [<command>direct</command>,
+ <listitem><para>The exchange type (such as
+ [<command>direct</command>,
<command>topic</command>, <command>headers</command>,
<command>fanout</command>]).</para></listitem>
</varlistentry>
diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl
index 4fe93981..4fc488b8 100644
--- a/src/mirrored_supervisor.erl
+++ b/src/mirrored_supervisor.erl
@@ -225,8 +225,8 @@ which_children(Sup) -> fold(which_children, Sup, fun lists:append/2).
count_children(Sup) -> fold(count_children, Sup, fun add_proplists/2).
check_childspecs(Specs) -> ?SUPERVISOR:check_childspecs(Specs).
-call(Sup, Msg) ->
- ?GEN_SERVER:call(child(Sup, mirroring), Msg, infinity).
+call(Sup, Msg) -> ?GEN_SERVER:call(mirroring(Sup), Msg, infinity).
+cast(Sup, Msg) -> ?GEN_SERVER:cast(mirroring(Sup), Msg).
find_call(Sup, Id, Msg) ->
Group = call(Sup, group),
@@ -237,7 +237,7 @@ find_call(Sup, Id, Msg) ->
%% immediately after the tx - we can't be 100% here. So we may as
%% well dirty_select.
case mnesia:dirty_select(?TABLE, [{MatchHead, [], ['$1']}]) of
- [Mirror] -> ?GEN_SERVER:call(Mirror, Msg, infinity);
+ [Mirror] -> call(Mirror, Msg);
[] -> {error, not_found}
end.
@@ -246,13 +246,16 @@ fold(FunAtom, Sup, AggFun) ->
lists:foldl(AggFun, [],
[apply(?SUPERVISOR, FunAtom, [D]) ||
M <- ?PG2:get_members(Group),
- D <- [?GEN_SERVER:call(M, delegate_supervisor, infinity)]]).
+ D <- [delegate(M)]]).
child(Sup, Id) ->
[Pid] = [Pid || {Id1, Pid, _, _} <- ?SUPERVISOR:which_children(Sup),
Id1 =:= Id],
Pid.
+delegate(Sup) -> child(Sup, delegate).
+mirroring(Sup) -> child(Sup, mirroring).
+
%%----------------------------------------------------------------------------
start_internal(Group, ChildSpecs) ->
@@ -288,28 +291,29 @@ handle_call({init, Overall}, _From,
initial_childspecs = ChildSpecs}) ->
process_flag(trap_exit, true),
?PG2:create(Group),
- ok = ?PG2:join(Group, self()),
- Rest = ?PG2:get_members(Group) -- [self()],
+ ok = ?PG2:join(Group, Overall),
+ Rest = ?PG2:get_members(Group) -- [Overall],
case Rest of
[] -> {atomic, _} = mnesia:transaction(fun() -> delete_all(Group) end);
_ -> ok
end,
[begin
- ?GEN_SERVER:cast(Pid, {ensure_monitoring, self()}),
+ ?GEN_SERVER:cast(mirroring(Pid), {ensure_monitoring, Overall}),
erlang:monitor(process, Pid)
end || Pid <- Rest],
- Delegate = child(Overall, delegate),
+ Delegate = delegate(Overall),
erlang:monitor(process, Delegate),
State1 = State#state{overall = Overall, delegate = Delegate},
- case errors([maybe_start(Group, Delegate, S) || S <- ChildSpecs]) of
+ case errors([maybe_start(Group, Overall, Delegate, S) || S <- ChildSpecs]) of
[] -> {reply, ok, State1};
Errors -> {stop, {shutdown, Errors}, State1}
end;
handle_call({start_child, ChildSpec}, _From,
- State = #state{delegate = Delegate,
+ State = #state{overall = Overall,
+ delegate = Delegate,
group = Group}) ->
- {reply, case maybe_start(Group, Delegate, ChildSpec) of
+ {reply, case maybe_start(Group, Overall, Delegate, ChildSpec) of
already_in_mnesia -> {error, already_present};
{already_in_mnesia, Pid} -> {error, {already_started, Pid}};
Else -> Else
@@ -322,9 +326,6 @@ handle_call({delete_child, Id}, _From, State = #state{delegate = Delegate,
handle_call({msg, F, A}, _From, State = #state{delegate = Delegate}) ->
{reply, apply(?SUPERVISOR, F, [Delegate | A]), State};
-handle_call(delegate_supervisor, _From, State = #state{delegate = Delegate}) ->
- {reply, Delegate, State};
-
handle_call(group, _From, State = #state{group = Group}) ->
{reply, Group, State};
@@ -343,7 +344,7 @@ handle_cast(Msg, State) ->
{stop, {unexpected_cast, Msg}, State}.
handle_info({'DOWN', _Ref, process, Pid, Reason},
- State = #state{delegate = Pid, group = Group}) ->
+ State = #state{overall = Pid, group = Group}) ->
%% Since the delegate is temporary, its death won't cause us to
%% die. Since the overall supervisor kills processes in reverse
%% order when shutting down "from above" and we started after the
@@ -357,15 +358,16 @@ handle_info({'DOWN', _Ref, process, Pid, Reason},
{stop, Reason, State};
handle_info({'DOWN', _Ref, process, Pid, _Reason},
- State = #state{delegate = Delegate, group = Group}) ->
+ State = #state{delegate = Delegate, group = Group,
+ overall = O}) ->
%% TODO load balance this
%% No guarantee pg2 will have received the DOWN before us.
- Self = self(),
R = case lists:sort(?PG2:get_members(Group)) -- [Pid] of
- [Self | _] -> {atomic, ChildSpecs} =
- mnesia:transaction(fun() -> update_all(Pid) end),
- [start(Delegate, ChildSpec) || ChildSpec <- ChildSpecs];
- _ -> []
+ [O | _] -> {atomic, ChildSpecs} =
+ mnesia:transaction(
+ fun() -> update_all(O, Pid) end),
+ [start(Delegate, ChildSpec) || ChildSpec <- ChildSpecs];
+ _ -> []
end,
case errors(R) of
[] -> {noreply, State};
@@ -384,13 +386,11 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
tell_all_peers_to_die(Group, Reason) ->
- [?GEN_SERVER:cast(P, {die, Reason}) ||
- P <- ?PG2:get_members(Group) -- [self()]].
+ [cast(P, {die, Reason}) || P <- ?PG2:get_members(Group) -- [self()]].
-maybe_start(Group, Delegate, ChildSpec) ->
- case mnesia:transaction(fun() ->
- check_start(Group, Delegate, ChildSpec)
- end) of
+maybe_start(Group, Overall, Delegate, ChildSpec) ->
+ case mnesia:transaction(
+ fun() -> check_start(Group, Overall, Delegate, ChildSpec) end) of
{atomic, start} -> start(Delegate, ChildSpec);
{atomic, undefined} -> already_in_mnesia;
{atomic, Pid} -> {already_in_mnesia, Pid};
@@ -398,31 +398,29 @@ maybe_start(Group, Delegate, ChildSpec) ->
{aborted, E} -> {error, E}
end.
-check_start(Group, Delegate, ChildSpec) ->
+check_start(Group, Overall, Delegate, ChildSpec) ->
case mnesia:wread({?TABLE, {Group, id(ChildSpec)}}) of
- [] -> write(Group, ChildSpec),
+ [] -> write(Group, Overall, ChildSpec),
start;
[S] -> #mirrored_sup_childspec{key = {Group, Id},
mirroring_pid = Pid} = S,
- case self() of
+ case Overall of
Pid -> child(Delegate, Id);
_ -> case supervisor(Pid) of
- dead -> write(Group, ChildSpec),
+ dead -> write(Group, Overall, ChildSpec),
start;
Delegate0 -> child(Delegate0, Id)
end
end
end.
-supervisor(Pid) ->
- with_exit_handler(
- fun() -> dead end,
- fun() -> gen_server:call(Pid, delegate_supervisor, infinity) end).
+supervisor(Pid) -> with_exit_handler(fun() -> dead end,
+ fun() -> delegate(Pid) end).
-write(Group, ChildSpec) ->
+write(Group, Overall, ChildSpec) ->
ok = mnesia:write(
#mirrored_sup_childspec{key = {Group, id(ChildSpec)},
- mirroring_pid = self(),
+ mirroring_pid = Overall,
childspec = ChildSpec}),
ChildSpec.
@@ -448,12 +446,12 @@ check_stop(Group, Delegate, Id) ->
id({Id, _, _, _, _, _}) -> Id.
-update_all(OldPid) ->
- MatchHead = #mirrored_sup_childspec{mirroring_pid = OldPid,
+update_all(Overall, OldOverall) ->
+ MatchHead = #mirrored_sup_childspec{mirroring_pid = OldOverall,
key = '$1',
childspec = '$2',
_ = '_'},
- [write(Group, C) ||
+ [write(Group, Overall, C) ||
[{Group, _Id}, C] <- mnesia:select(?TABLE, [{MatchHead, [], ['$$']}])].
delete_all(Group) ->
@@ -467,8 +465,7 @@ errors(Results) -> [E || {error, E} <- Results].
%%----------------------------------------------------------------------------
-create_tables() ->
- create_tables([?TABLE_DEF]).
+create_tables() -> create_tables([?TABLE_DEF]).
create_tables([]) ->
ok;
diff --git a/src/mirrored_supervisor_tests.erl b/src/mirrored_supervisor_tests.erl
index 60192b34..f8cbd853 100644
--- a/src/mirrored_supervisor_tests.erl
+++ b/src/mirrored_supervisor_tests.erl
@@ -157,7 +157,7 @@ test_no_migration_on_shutdown() ->
with_sups(fun([Evil, _]) ->
?MS:start_child(Evil, childspec(worker)),
try
- call(worker, ping),
+ call(worker, ping, 1000, 100),
exit(worker_should_not_have_migrated)
catch exit:{timeout_waiting_for_server, _, _} ->
ok
@@ -268,7 +268,7 @@ inc_group() ->
get_group(Group) ->
{Group, get(counter)}.
-call(Id, Msg) -> call(Id, Msg, 1000, 100).
+call(Id, Msg) -> call(Id, Msg, 10*1000, 100).
call(Id, Msg, 0, _Decr) ->
exit({timeout_waiting_for_server, {Id, Msg}, erlang:get_stacktrace()});
@@ -285,7 +285,7 @@ kill(Pid, Wait) when is_pid(Wait) -> kill(Pid, [Wait]);
kill(Pid, Waits) ->
erlang:monitor(process, Pid),
[erlang:monitor(process, P) || P <- Waits],
- exit(Pid, kill),
+ exit(Pid, bang),
kill_wait(Pid),
[kill_wait(P) || P <- Waits].
diff --git a/src/rabbit.erl b/src/rabbit.erl
index ea9731b6..df009529 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -60,7 +60,8 @@
-rabbit_boot_step({worker_pool,
[{description, "worker pool"},
- {mfa, {rabbit_sup, start_child, [worker_pool_sup]}},
+ {mfa, {rabbit_sup, start_supervisor_child,
+ [worker_pool_sup]}},
{requires, pre_boot},
{enables, external_infrastructure}]}).
@@ -143,7 +144,8 @@
-rabbit_boot_step({mirror_queue_slave_sup,
[{description, "mirror queue slave sup"},
- {mfa, {rabbit_mirror_queue_slave_sup, start, []}},
+ {mfa, {rabbit_sup, start_supervisor_child,
+ [rabbit_mirror_queue_slave_sup]}},
{requires, recovery},
{enables, routing_ready}]}).
@@ -538,7 +540,7 @@ boot_error(Format, Args) ->
boot_delegate() ->
{ok, Count} = application:get_env(rabbit, delegate_count),
- rabbit_sup:start_child(delegate_sup, [Count]).
+ rabbit_sup:start_supervisor_child(delegate_sup, [Count]).
recover() ->
rabbit_binding:recover(rabbit_exchange:recover(), rabbit_amqqueue:start()).
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index a471d282..c07ad832 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -47,16 +47,10 @@
%%----------------------------------------------------------------------------
-boot() ->
- {ok, _} =
- supervisor2:start_child(
- rabbit_sup,
- {rabbit_direct_client_sup,
- {rabbit_client_sup, start_link,
+boot() -> rabbit_sup:start_supervisor_child(
+ rabbit_direct_client_sup, rabbit_client_sup,
[{local, rabbit_direct_client_sup},
- {rabbit_channel_sup, start_link, []}]},
- transient, infinity, supervisor, [rabbit_client_sup]}),
- ok.
+ {rabbit_channel_sup, start_link, []}]).
force_event_refresh() ->
[Pid ! force_event_refresh || Pid<- list()],
diff --git a/src/rabbit_mirror_queue_slave_sup.erl b/src/rabbit_mirror_queue_slave_sup.erl
index 8eacb1f3..a2034876 100644
--- a/src/rabbit_mirror_queue_slave_sup.erl
+++ b/src/rabbit_mirror_queue_slave_sup.erl
@@ -18,7 +18,7 @@
-behaviour(supervisor2).
--export([start/0, start_link/0, start_child/2]).
+-export([start_link/0, start_child/2]).
-export([init/1]).
@@ -26,20 +26,9 @@
-define(SERVER, ?MODULE).
-start() ->
- {ok, _} =
- supervisor2:start_child(
- rabbit_sup,
- {rabbit_mirror_queue_slave_sup,
- {rabbit_mirror_queue_slave_sup, start_link, []},
- transient, infinity, supervisor, [rabbit_mirror_queue_slave_sup]}),
- ok.
+start_link() -> supervisor2:start_link({local, ?SERVER}, ?MODULE, []).
-start_link() ->
- supervisor2:start_link({local, ?SERVER}, ?MODULE, []).
-
-start_child(Node, Args) ->
- supervisor2:start_child({?SERVER, Node}, Args).
+start_child(Node, Args) -> supervisor2:start_child({?SERVER, Node}, Args).
init([]) ->
{ok, {{simple_one_for_one_terminate, 10, 10},
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index f0c75d23..78deea97 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -136,15 +136,10 @@ boot_ssl() ->
ok
end.
-start() ->
- {ok,_} = supervisor2:start_child(
- rabbit_sup,
- {rabbit_tcp_client_sup,
- {rabbit_client_sup, start_link,
- [{local, rabbit_tcp_client_sup},
- {rabbit_connection_sup,start_link,[]}]},
- transient, infinity, supervisor, [rabbit_client_sup]}),
- ok.
+start() -> rabbit_sup:start_supervisor_child(
+ rabbit_tcp_client_sup, rabbit_client_sup,
+ [{local, rabbit_tcp_client_sup},
+ {rabbit_connection_sup,start_link,[]}]).
ensure_ssl() ->
ok = rabbit_misc:start_applications([crypto, public_key, ssl]),
diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl
index bf2b4798..f142d233 100644
--- a/src/rabbit_sup.erl
+++ b/src/rabbit_sup.erl
@@ -19,6 +19,8 @@
-behaviour(supervisor).
-export([start_link/0, start_child/1, start_child/2, start_child/3,
+ start_supervisor_child/1, start_supervisor_child/2,
+ start_supervisor_child/3,
start_restartable_child/1, start_restartable_child/2, stop_child/1]).
-export([init/1]).
@@ -33,7 +35,11 @@
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
-spec(start_child/1 :: (atom()) -> 'ok').
+-spec(start_child/2 :: (atom(), [any()]) -> 'ok').
-spec(start_child/3 :: (atom(), atom(), [any()]) -> 'ok').
+-spec(start_supervisor_child/1 :: (atom()) -> 'ok').
+-spec(start_supervisor_child/2 :: (atom(), [any()]) -> 'ok').
+-spec(start_supervisor_child/3 :: (atom(), atom(), [any()]) -> 'ok').
-spec(start_restartable_child/1 :: (atom()) -> 'ok').
-spec(start_restartable_child/2 :: (atom(), [any()]) -> 'ok').
-spec(stop_child/1 :: (atom()) -> rabbit_types:ok_or_error(any())).
@@ -42,22 +48,29 @@
%%----------------------------------------------------------------------------
-start_link() ->
- supervisor:start_link({local, ?SERVER}, ?MODULE, []).
+start_link() -> supervisor:start_link({local, ?SERVER}, ?MODULE, []).
-start_child(Mod) ->
- start_child(Mod, []).
+start_child(Mod) -> start_child(Mod, []).
-start_child(Mod, Args) ->
- start_child(Mod, Mod, Args).
+start_child(Mod, Args) -> start_child(Mod, Mod, Args).
start_child(ChildId, Mod, Args) ->
- child_reply(supervisor:start_child(?SERVER,
- {ChildId, {Mod, start_link, Args},
- transient, ?MAX_WAIT, worker, [Mod]})).
+ child_reply(supervisor:start_child(
+ ?SERVER,
+ {ChildId, {Mod, start_link, Args},
+ transient, ?MAX_WAIT, worker, [Mod]})).
+
+start_supervisor_child(Mod) -> start_supervisor_child(Mod, []).
+
+start_supervisor_child(Mod, Args) -> start_supervisor_child(Mod, Mod, Args).
+
+start_supervisor_child(ChildId, Mod, Args) ->
+ child_reply(supervisor:start_child(
+ ?SERVER,
+ {ChildId, {Mod, start_link, Args},
+ transient, infinity, supervisor, [Mod]})).
-start_restartable_child(Mod) ->
- start_restartable_child(Mod, []).
+start_restartable_child(Mod) -> start_restartable_child(Mod, []).
start_restartable_child(Mod, Args) ->
Name = list_to_atom(atom_to_list(Mod) ++ "_sup"),
@@ -73,8 +86,7 @@ stop_child(ChildId) ->
E -> E
end.
-init([]) ->
- {ok, {{one_for_all, 0, 1}, []}}.
+init([]) -> {ok, {{one_for_all, 0, 1}, []}}.
%%----------------------------------------------------------------------------