summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-09-11 11:32:09 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-09-11 11:32:09 +0100
commitbc973358fae4cbd6be23cd6572aed323340fdf08 (patch)
tree774d6286a28184af4d7846f13179833bc74f6b87
parent4da65790598f6387d99092e7d67745d34d3ab821 (diff)
parentacc7038fde6c9f244a4a0b8188cc293c1cbbe33c (diff)
downloadrabbitmq-server-bc973358fae4cbd6be23cd6572aed323340fdf08.tar.gz
Merge bug21446
-rw-r--r--src/gen_server2.erl14
-rw-r--r--src/rabbit.erl9
-rw-r--r--src/rabbit_amqqueue.erl98
-rw-r--r--src/rabbit_amqqueue_process.erl40
-rw-r--r--src/rabbit_amqqueue_sup.erl34
-rw-r--r--src/rabbit_amqqueue_sup_sup.erl52
-rw-r--r--src/rabbit_backing_queue.erl4
-rw-r--r--src/rabbit_binding.erl6
-rw-r--r--src/rabbit_channel.erl18
-rw-r--r--src/rabbit_mirror_queue_master.erl5
-rw-r--r--src/rabbit_mirror_queue_misc.erl31
-rw-r--r--src/rabbit_mirror_queue_slave.erl7
-rw-r--r--src/rabbit_mirror_queue_slave_sup.erl37
-rw-r--r--src/rabbit_misc.erl13
-rw-r--r--src/rabbit_prequeue.erl103
-rw-r--r--src/rabbit_queue_index.erl10
-rw-r--r--src/rabbit_tests.erl19
-rw-r--r--src/rabbit_variable_queue.erl25
-rw-r--r--src/rabbit_vhost.erl13
-rw-r--r--src/rabbit_vm.erl2
20 files changed, 355 insertions, 185 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index ee82bcb3..d2f96b52 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -69,7 +69,9 @@
%% which will be passed into any of the callback functions in the new
%% module. Note there is no form also encompassing a reply, thus if
%% you wish to reply in handle_call/3 and change the callback module,
-%% you need to use gen_server2:reply/2 to issue the reply manually.
+%% you need to use gen_server2:reply/2 to issue the reply
+%% manually. The init function can similarly return a 5th argument,
+%% Module, in order to dynamically decide the callback module on init.
%%
%% 8) The callback module can optionally implement
%% format_message_queue/2 which is the equivalent of format_status/2
@@ -125,6 +127,7 @@
%%% ==> {ok, State}
%%% {ok, State, Timeout}
%%% {ok, State, Timeout, Backoff}
+%%% {ok, State, Timeout, Backoff, Module}
%%% ignore
%%% {stop, Reason}
%%%
@@ -242,6 +245,8 @@
{ok, State :: term(), timeout() | hibernate} |
{ok, State :: term(), timeout() | hibernate,
{backoff, millis(), millis(), millis()}} |
+ {ok, State :: term(), timeout() | hibernate,
+ {backoff, millis(), millis(), millis()}, atom()} |
ignore |
{stop, Reason :: term()}.
-callback handle_call(Request :: term(), From :: {pid(), Tag :: term()},
@@ -568,6 +573,13 @@ init_it(Starter, Parent, Name0, Mod, Args, Options) ->
loop(GS2State #gs2_state { state = State,
time = Timeout,
timeout_state = Backoff1 });
+ {ok, State, Timeout, Backoff = {backoff, _, _, _}, Mod1} ->
+ Backoff1 = extend_backoff(Backoff),
+ proc_lib:init_ack(Starter, {ok, self()}),
+ loop(GS2State #gs2_state { mod = Mod1,
+ state = State,
+ time = Timeout,
+ timeout_state = Backoff1 });
{stop, Reason} ->
%% For consistency, we must make sure that the
%% registered name (if any) is unregistered before
diff --git a/src/rabbit.erl b/src/rabbit.erl
index b00a1ad7..bd34cf8b 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -134,17 +134,10 @@
{requires, core_initialized},
{enables, routing_ready}]}).
--rabbit_boot_step({mirror_queue_slave_sup,
- [{description, "mirror queue slave sup"},
- {mfa, {rabbit_sup, start_supervisor_child,
- [rabbit_mirror_queue_slave_sup]}},
- {requires, recovery},
- {enables, routing_ready}]}).
-
-rabbit_boot_step({mirrored_queues,
[{description, "adding mirrors to queues"},
{mfa, {rabbit_mirror_queue_misc, on_node_up, []}},
- {requires, mirror_queue_slave_sup},
+ {requires, recovery},
{enables, routing_ready}]}).
-rabbit_boot_step({routing_ready,
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 692179fc..6f299a9e 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -17,7 +17,8 @@
-module(rabbit_amqqueue).
-export([recover/0, stop/0, start/1, declare/5, declare/6,
- delete_immediately/1, delete/3, purge/1, forget_all_durable/1]).
+ delete_immediately/1, delete/3, purge/1, forget_all_durable/1,
+ delete_crashed/1, delete_crashed_internal/1]).
-export([pseudo_queue/2, immutable/1]).
-export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2,
assert_equivalence/5,
@@ -49,7 +50,7 @@
-ifdef(use_specs).
--export_type([name/0, qmsg/0]).
+-export_type([name/0, qmsg/0, absent_reason/0]).
-type(name() :: rabbit_types:r('queue')).
-type(qpids() :: [pid()]).
@@ -59,10 +60,11 @@
-type(msg_id() :: non_neg_integer()).
-type(ok_or_errors() ::
'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}).
+-type(absent_reason() :: 'nodedown' | 'crashed').
-type(queue_or_absent() :: rabbit_types:amqqueue() |
- {'absent', rabbit_types:amqqueue()}).
--type(not_found_or_absent() :: 'not_found' |
- {'absent', rabbit_types:amqqueue()}).
+ {'absent', rabbit_types:amqqueue(),absent_reason()}).
+-type(not_found_or_absent() ::
+ 'not_found' | {'absent', rabbit_types:amqqueue(), absent_reason()}).
-spec(recover/0 :: () -> [rabbit_types:amqqueue()]).
-spec(stop/0 :: () -> 'ok').
-spec(start/1 :: ([rabbit_types:amqqueue()]) -> 'ok').
@@ -74,8 +76,9 @@
-spec(declare/6 ::
(name(), boolean(), boolean(),
rabbit_framing:amqp_table(), rabbit_types:maybe(pid()), node())
- -> {'new' | 'existing' | 'absent' | 'owner_died',
- rabbit_types:amqqueue()} | rabbit_types:channel_exit()).
+ -> {'new' | 'existing' | 'owner_died', rabbit_types:amqqueue()} |
+ {'absent', rabbit_types:amqqueue(), absent_reason()} |
+ rabbit_types:channel_exit()).
-spec(internal_declare/2 ::
(rabbit_types:amqqueue(), boolean())
-> queue_or_absent() | rabbit_misc:thunk(queue_or_absent())).
@@ -138,6 +141,8 @@
-> qlen() |
rabbit_types:error('in_use') |
rabbit_types:error('not_empty')).
+-spec(delete_crashed/1 :: (rabbit_types:amqqueue()) -> 'ok').
+-spec(delete_crashed_internal/1 :: (rabbit_types:amqqueue()) -> 'ok').
-spec(purge/1 :: (rabbit_types:amqqueue()) -> qlen()).
-spec(forget_all_durable/1 :: (node()) -> 'ok').
-spec(deliver/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) ->
@@ -209,14 +214,14 @@ recover() ->
BQ:start([QName || #amqqueue{name = QName} <- DurableQueues]),
{ok,_} = supervisor:start_child(
rabbit_sup,
- {rabbit_amqqueue_sup,
- {rabbit_amqqueue_sup, start_link, []},
- transient, infinity, supervisor, [rabbit_amqqueue_sup]}),
+ {rabbit_amqqueue_sup_sup,
+ {rabbit_amqqueue_sup_sup, start_link, []},
+ transient, infinity, supervisor, [rabbit_amqqueue_sup_sup]}),
recover_durable_queues(lists:zip(DurableQueues, OrderedRecoveryTerms)).
stop() ->
- ok = supervisor:terminate_child(rabbit_sup, rabbit_amqqueue_sup),
- ok = supervisor:delete_child(rabbit_sup, rabbit_amqqueue_sup),
+ ok = supervisor:terminate_child(rabbit_sup, rabbit_amqqueue_sup_sup),
+ ok = supervisor:delete_child(rabbit_sup, rabbit_amqqueue_sup_sup),
{ok, BQ} = application:get_env(rabbit, backing_queue_module),
ok = BQ:stop().
@@ -241,9 +246,9 @@ find_durable_queues() ->
recover_durable_queues(QueuesAndRecoveryTerms) ->
{Results, Failures} =
- gen_server2:mcall([{start_queue_process(node(), Q),
- {init, {self(), Terms}}} ||
- {Q, Terms} <- QueuesAndRecoveryTerms]),
+ gen_server2:mcall(
+ [{rabbit_amqqueue_sup_sup:start_queue_process(node(), Q, recovery),
+ {init, {self(), Terms}}} || {Q, Terms} <- QueuesAndRecoveryTerms]),
[rabbit_log:error("Queue ~p failed to initialise: ~p~n",
[Pid, Error]) || {Pid, Error} <- Failures],
[Q || {_, {new, Q}} <- Results].
@@ -269,7 +274,9 @@ declare(QueueName, Durable, AutoDelete, Args, Owner, Node) ->
down_slave_nodes = [],
gm_pids = []})),
Node = rabbit_mirror_queue_misc:initial_queue_node(Q, Node),
- gen_server2:call(start_queue_process(Node, Q), {init, new}, infinity).
+ gen_server2:call(
+ rabbit_amqqueue_sup_sup:start_queue_process(Node, Q, declare),
+ {init, new}, infinity).
internal_declare(Q, true) ->
rabbit_misc:execute_mnesia_tx_with_tail(
@@ -280,18 +287,14 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) ->
case mnesia:wread({rabbit_queue, QueueName}) of
[] ->
case not_found_or_absent(QueueName) of
- not_found -> Q1 = rabbit_policy:set(Q),
- ok = store_queue(Q1),
- B = add_default_binding(Q1),
- fun () -> B(), Q1 end;
- {absent, _Q} = R -> rabbit_misc:const(R)
+ not_found -> Q1 = rabbit_policy:set(Q),
+ ok = store_queue(Q1),
+ B = add_default_binding(Q1),
+ fun () -> B(), Q1 end;
+ {absent, _Q, _} = R -> rabbit_misc:const(R)
end;
- [ExistingQ = #amqqueue{pid = QPid}] ->
- case rabbit_misc:is_process_alive(QPid) of
- true -> rabbit_misc:const(ExistingQ);
- false -> TailFun = internal_delete(QueueName),
- fun () -> TailFun(), ExistingQ end
- end
+ [ExistingQ] ->
+ rabbit_misc:const(ExistingQ)
end
end).
@@ -342,10 +345,6 @@ policy_changed(Q1 = #amqqueue{decorators = Decorators1},
%% mirroring-related has changed - the policy may have changed anyway.
notify_policy_changed(Q1).
-start_queue_process(Node, Q) ->
- {ok, Pid} = rabbit_amqqueue_sup:start_child(Node, [Q]),
- Pid.
-
add_default_binding(#amqqueue{name = QueueName}) ->
ExchangeName = rabbit_misc:r(QueueName, exchange, <<>>),
RoutingKey = QueueName#resource.name,
@@ -368,7 +367,7 @@ not_found_or_absent(Name) ->
%% rabbit_queue and not found anything
case mnesia:read({rabbit_durable_queue, Name}) of
[] -> not_found;
- [Q] -> {absent, Q} %% Q exists on stopped node
+ [Q] -> {absent, Q, nodedown} %% Q exists on stopped node
end.
not_found_or_absent_dirty(Name) ->
@@ -377,7 +376,7 @@ not_found_or_absent_dirty(Name) ->
%% and only affect the error kind.
case rabbit_misc:dirty_read({rabbit_durable_queue, Name}) of
{error, not_found} -> not_found;
- {ok, Q} -> {absent, Q}
+ {ok, Q} -> {absent, Q, nodedown}
end.
with(Name, F, E) ->
@@ -391,8 +390,11 @@ with(Name, F, E) ->
%% the retry loop.
rabbit_misc:with_exit_handler(
fun () -> false = rabbit_misc:is_process_alive(QPid),
- timer:sleep(25),
- with(Name, F, E)
+ case crashed_or_recovering(Q) of
+ crashed -> E({absent, Q, crashed});
+ recovering -> timer:sleep(25),
+ with(Name, F, E)
+ end
end, fun () -> F(Q) end);
{error, not_found} ->
E(not_found_or_absent_dirty(Name))
@@ -401,10 +403,24 @@ with(Name, F, E) ->
with(Name, F) -> with(Name, F, fun (E) -> {error, E} end).
with_or_die(Name, F) ->
- with(Name, F, fun (not_found) -> rabbit_misc:not_found(Name);
- ({absent, Q}) -> rabbit_misc:absent(Q)
+ with(Name, F, fun (not_found) -> rabbit_misc:not_found(Name);
+ ({absent, Q, Reason}) -> rabbit_misc:absent(Q, Reason)
end).
+%% TODO we could say we are crashed when we mean recovering if we
+%% happen to call in the middle of a crash-failover. We could try to
+%% figure out whether that's happening by looking for the supervisor -
+%% but we'd need some additional book keeping to know what it is. And
+%% it will just mean a temporary glitch while crashing, which is
+%% fairly tolerable.
+crashed_or_recovering(#amqqueue{pid = QPid, slave_pids = []}) ->
+ case lists:member(node(QPid), [node() | nodes()]) of
+ true -> crashed;
+ false -> recovering
+ end;
+crashed_or_recovering(_Q) ->
+ recovering.
+
assert_equivalence(#amqqueue{durable = Durable,
auto_delete = AutoDelete} = Q,
Durable, AutoDelete, RequiredArgs, Owner) ->
@@ -565,6 +581,14 @@ delete_immediately(QPids) ->
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
delegate:call(QPid, {delete, IfUnused, IfEmpty}).
+delete_crashed(#amqqueue{ pid = QPid } = Q) ->
+ rpc:call(node(QPid), ?MODULE, delete_crashed_internal, [Q]).
+
+delete_crashed_internal(#amqqueue{ name = QName }) ->
+ {ok, BQ} = application:get_env(backing_queue_module),
+ BQ:delete_crashed(QName),
+ ok = internal_delete(QName).
+
purge(#amqqueue{ pid = QPid }) -> delegate:call(QPid, purge).
deliver(Qs, Delivery) -> deliver(Qs, Delivery, noflow).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 42c96807..12a3c9f0 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -24,7 +24,7 @@
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
-define(CONSUMER_BIAS_RATIO, 1.1). %% i.e. consume 10% faster
--export([start_link/1, info_keys/0]).
+-export([info_keys/0]).
-export([init_with_backing_queue_state/7]).
@@ -61,8 +61,6 @@
-ifdef(use_specs).
--spec(start_link/1 ::
- (rabbit_types:amqqueue()) -> rabbit_types:ok_pid_or_error()).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(init_with_backing_queue_state/7 ::
(rabbit_types:amqqueue(), atom(), tuple(), any(),
@@ -102,8 +100,6 @@
%%----------------------------------------------------------------------------
-start_link(Q) -> gen_server2:start_link(?MODULE, Q, []).
-
info_keys() -> ?INFO_KEYS ++ rabbit_backing_queue:info_keys().
statistics_keys() -> ?STATISTICS_KEYS ++ rabbit_backing_queue:info_keys().
@@ -113,7 +109,8 @@ init(Q) ->
process_flag(trap_exit, true),
?store_proc_name(Q#amqqueue.name),
{ok, init_state(Q#amqqueue{pid = self()}), hibernate,
- {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+ {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE},
+ ?MODULE}.
init_state(Q) ->
State = #q{q = Q,
@@ -140,7 +137,7 @@ init_it(Recover, From, State = #q{q = #amqqueue{exclusive_owner = Owner}}) ->
false -> #q{backing_queue = undefined,
backing_queue_state = undefined,
q = Q} = State,
- gen_server2:reply(From, {owner_died, Q}),
+ send_reply(From, {owner_died, Q}),
BQ = backing_queue_module(Q),
{_, Terms} = recovery_status(Recover),
BQS = bq_init(BQ, Q, Terms),
@@ -152,12 +149,12 @@ init_it(Recover, From, State = #q{q = #amqqueue{exclusive_owner = Owner}}) ->
init_it2(Recover, From, State = #q{q = Q,
backing_queue = undefined,
backing_queue_state = undefined}) ->
- {Recovery, TermsOrNew} = recovery_status(Recover),
- case rabbit_amqqueue:internal_declare(Q, Recovery /= new) of
+ {Barrier, TermsOrNew} = recovery_status(Recover),
+ case rabbit_amqqueue:internal_declare(Q, Recover /= new) of
#amqqueue{} = Q1 ->
- case matches(Recovery, Q, Q1) of
+ case matches(Recover, Q, Q1) of
true ->
- gen_server2:reply(From, {new, Q}),
+ send_reply(From, {new, Q}),
ok = file_handle_cache:register_callback(
rabbit_amqqueue, set_maximum_since_use, [self()]),
ok = rabbit_memory_monitor:register(
@@ -165,7 +162,7 @@ init_it2(Recover, From, State = #q{q = Q,
set_ram_duration_target, [self()]}),
BQ = backing_queue_module(Q1),
BQS = bq_init(BQ, Q, TermsOrNew),
- recovery_barrier(Recovery),
+ recovery_barrier(Barrier),
State1 = process_args_policy(
State#q{backing_queue = BQ,
backing_queue_state = BQS}),
@@ -182,8 +179,11 @@ init_it2(Recover, From, State = #q{q = Q,
{stop, normal, Err, State}
end.
-recovery_status(new) -> {new, new};
-recovery_status({Recover, Terms}) -> {Recover, Terms}.
+recovery_status(new) -> {no_barrier, new};
+recovery_status({Recover, Terms}) -> {Recover, Terms}.
+
+send_reply(none, _Q) -> ok;
+send_reply(From, Q) -> gen_server2:reply(From, Q).
matches(new, Q1, Q2) ->
%% i.e. not policy
@@ -197,7 +197,7 @@ matches(new, Q1, Q2) ->
matches(_, Q, Q) -> true;
matches(_, _Q, _Q1) -> false.
-recovery_barrier(new) ->
+recovery_barrier(no_barrier) ->
ok;
recovery_barrier(BarrierPid) ->
MRef = erlang:monitor(process, BarrierPid),
@@ -232,8 +232,11 @@ terminate({shutdown, missing_owner} = Reason, State) ->
terminate_shutdown(terminate_delete(false, Reason, State), State);
terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) ->
terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
-terminate(Reason, State) ->
- terminate_shutdown(terminate_delete(true, Reason, State), State).
+terminate(normal, State) -> %% delete case
+ terminate_shutdown(terminate_delete(true, normal, State), State);
+%% If we crashed don't try to clean up the BQS, probably best to leave it.
+terminate(_Reason, State) ->
+ terminate_shutdown(fun (BQS) -> BQS end, State).
terminate_delete(EmitStats, Reason,
State = #q{q = #amqqueue{name = QName},
@@ -1084,6 +1087,9 @@ handle_call(sync_mirrors, _From, State) ->
handle_call(cancel_sync_mirrors, _From, State) ->
reply({ok, not_syncing}, State).
+handle_cast(init, State) ->
+ init_it({no_barrier, non_clean_shutdown}, none, State);
+
handle_cast({run_backing_queue, Mod, Fun},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
noreply(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)});
diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl
index 0fd64c26..465c0412 100644
--- a/src/rabbit_amqqueue_sup.erl
+++ b/src/rabbit_amqqueue_sup.erl
@@ -18,35 +18,33 @@
-behaviour(supervisor2).
--export([start_link/0, start_child/2]).
+-export([start_link/2]).
-export([init/1]).
-include("rabbit.hrl").
--define(SERVER, ?MODULE).
-
%%----------------------------------------------------------------------------
-ifdef(use_specs).
--spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
--spec(start_child/2 ::
- (node(), [any()]) -> rabbit_types:ok(pid() | undefined) |
- rabbit_types:ok({pid(), any()}) |
- rabbit_types:error(any())).
+-spec(start_link/2 :: (rabbit_types:amqqueue(), rabbit_prequeue:start_mode()) ->
+ {'ok', pid(), pid()}).
-endif.
%%----------------------------------------------------------------------------
-start_link() ->
- supervisor2:start_link({local, ?SERVER}, ?MODULE, []).
-
-start_child(Node, Args) ->
- supervisor2:start_child({?SERVER, Node}, Args).
-
-init([]) ->
- {ok, {{simple_one_for_one, 10, 10},
- [{rabbit_amqqueue, {rabbit_amqqueue_process, start_link, []},
- temporary, ?MAX_WAIT, worker, [rabbit_amqqueue_process]}]}}.
+start_link(Q, StartMode) ->
+ Marker = spawn_link(fun() -> receive stop -> ok end end),
+ ChildSpec = {rabbit_amqqueue,
+ {rabbit_prequeue, start_link, [Q, StartMode, Marker]},
+ intrinsic, ?MAX_WAIT, worker, [rabbit_amqqueue_process,
+ rabbit_mirror_queue_slave]},
+ {ok, SupPid} = supervisor2:start_link(?MODULE, []),
+ {ok, QPid} = supervisor2:start_child(SupPid, ChildSpec),
+ unlink(Marker),
+ Marker ! stop,
+ {ok, SupPid, QPid}.
+
+init([]) -> {ok, {{one_for_one, 5, 10}, []}}.
diff --git a/src/rabbit_amqqueue_sup_sup.erl b/src/rabbit_amqqueue_sup_sup.erl
new file mode 100644
index 00000000..793cb7c9
--- /dev/null
+++ b/src/rabbit_amqqueue_sup_sup.erl
@@ -0,0 +1,52 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved.
+%%
+
+-module(rabbit_amqqueue_sup_sup).
+
+-behaviour(supervisor2).
+
+-export([start_link/0, start_queue_process/3]).
+
+-export([init/1]).
+
+-include("rabbit.hrl").
+
+-define(SERVER, ?MODULE).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
+-spec(start_queue_process/3 :: (node(), rabbit_types:amqqueue(),
+ 'declare' | 'recovery' | 'slave') -> pid()).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+start_link() ->
+ supervisor2:start_link({local, ?SERVER}, ?MODULE, []).
+
+start_queue_process(Node, Q, StartMode) ->
+ {ok, _SupPid, QPid} = supervisor2:start_child(
+ {?SERVER, Node}, [Q, StartMode]),
+ QPid.
+
+init([]) ->
+ {ok, {{simple_one_for_one, 10, 10},
+ [{rabbit_amqqueue_sup, {rabbit_amqqueue_sup, start_link, []},
+ temporary, ?MAX_WAIT, supervisor, [rabbit_amqqueue_sup]}]}}.
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 098f5f43..310b8220 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -85,6 +85,10 @@
%% content.
-callback delete_and_terminate(any(), state()) -> state().
+%% Called to clean up after a crashed queue. In this case we don't
+%% have a process and thus a state(), we are just removing on-disk data.
+-callback delete_crashed(rabbit_amqqueue:name()) -> 'ok'.
+
%% Remove all 'fetchable' messages from the queue, i.e. all messages
%% except those that have been fetched already and are pending acks.
-callback purge(state()) -> {purged_msg_count(), state()}.
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index d887f26a..12082af8 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -363,7 +363,7 @@ not_found_or_absent_errs(Names) ->
absent_errs_only(Names) ->
Errs = [E || Name <- Names,
- {absent, _Q} = E <- [not_found_or_absent(Name)]],
+ {absent, _Q, _Reason} = E <- [not_found_or_absent(Name)]],
rabbit_misc:const(case Errs of
[] -> ok;
_ -> {error, {resources_missing, Errs}}
@@ -376,8 +376,8 @@ not_found_or_absent(#resource{kind = exchange} = Name) ->
{not_found, Name};
not_found_or_absent(#resource{kind = queue} = Name) ->
case rabbit_amqqueue:not_found_or_absent(Name) of
- not_found -> {not_found, Name};
- {absent, _Q} = R -> R
+ not_found -> {not_found, Name};
+ {absent, _Q, _Reason} = R -> R
end.
contains(Table, MatchHead) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index e5a90410..fc433898 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1189,16 +1189,16 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
%% must have been created between the stat and the
%% declare. Loop around again.
handle_method(Declare, none, State);
- {absent, Q} ->
- rabbit_misc:absent(Q);
+ {absent, Q, Reason} ->
+ rabbit_misc:absent(Q, Reason);
{owner_died, _Q} ->
%% Presumably our own days are numbered since the
%% connection has died. Pretend the queue exists though,
%% just so nothing fails.
return_queue_declare_ok(QueueName, NoWait, 0, 0, State)
end;
- {error, {absent, Q}} ->
- rabbit_misc:absent(Q)
+ {error, {absent, Q, Reason}} ->
+ rabbit_misc:absent(Q, Reason)
end;
handle_method(#'queue.declare'{queue = QueueNameBin,
@@ -1227,8 +1227,10 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
rabbit_amqqueue:check_exclusive_access(Q, ConnPid),
rabbit_amqqueue:delete(Q, IfUnused, IfEmpty)
end,
- fun (not_found) -> {ok, 0};
- ({absent, Q}) -> rabbit_misc:absent(Q)
+ fun (not_found) -> {ok, 0};
+ ({absent, Q, crashed}) -> rabbit_amqqueue:delete_crashed(Q),
+ {ok, 0};
+ ({absent, Q, Reason}) -> rabbit_misc:absent(Q, Reason)
end) of
{error, in_use} ->
precondition_failed("~s in use", [rabbit_misc:rs(QueueName)]);
@@ -1477,8 +1479,8 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
end) of
{error, {resources_missing, [{not_found, Name} | _]}} ->
rabbit_misc:not_found(Name);
- {error, {resources_missing, [{absent, Q} | _]}} ->
- rabbit_misc:absent(Q);
+ {error, {resources_missing, [{absent, Q, Reason} | _]}} ->
+ rabbit_misc:absent(Q, Reason);
{error, binding_not_found} ->
rabbit_misc:protocol_error(
not_found, "no binding ~s between ~s and ~s",
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 9bccf5dd..1bea8042 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -24,7 +24,7 @@
needs_timeout/1, timeout/1, handle_pre_hibernate/1, resume/1,
msg_rates/1, info/2, invoke/3, is_duplicate/2]).
--export([start/1, stop/0]).
+-export([start/1, stop/0, delete_crashed/1]).
-export([promote_backing_queue_state/8, sender_death_fun/0, depth_fun/0]).
@@ -90,6 +90,9 @@ stop() ->
%% Same as start/1.
exit({not_valid_for_generic_backing_queue, ?MODULE}).
+delete_crashed(_QName) ->
+ exit({not_valid_for_generic_backing_queue, ?MODULE}).
+
init(Q, Recover, AsyncCallback) ->
{ok, BQ} = application:get_env(backing_queue_module),
BQS = BQ:init(Q, Recover, AsyncCallback),
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 9e8c4a18..aec6f93d 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -202,31 +202,20 @@ add_mirrors(QName, Nodes, SyncMode) ->
add_mirror(QName, MirrorNode, SyncMode) ->
case rabbit_amqqueue:lookup(QName) of
- {ok, #amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q} ->
- case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of
- [] ->
- start_child(Name, MirrorNode, Q, SyncMode);
- [SPid] ->
- case rabbit_misc:is_process_alive(SPid) of
- true -> {ok, already_mirrored};
- false -> start_child(Name, MirrorNode, Q, SyncMode)
- end
- end;
+ {ok, Q} ->
+ rabbit_misc:with_exit_handler(
+ rabbit_misc:const(ok),
+ fun () ->
+ SPid = rabbit_amqqueue_sup_sup:start_queue_process(
+ MirrorNode, Q, slave),
+ log_info(QName, "Adding mirror on node ~p: ~p~n",
+ [MirrorNode, SPid]),
+ rabbit_mirror_queue_slave:go(SPid, SyncMode)
+ end);
{error, not_found} = E ->
E
end.
-start_child(Name, MirrorNode, Q, SyncMode) ->
- rabbit_misc:with_exit_handler(
- rabbit_misc:const(ok),
- fun () ->
- {ok, SPid} = rabbit_mirror_queue_slave_sup:start_child(
- MirrorNode, [Q]),
- log_info(Name, "Adding mirror on node ~p: ~p~n",
- [MirrorNode, SPid]),
- rabbit_mirror_queue_slave:go(SPid, SyncMode)
- end).
-
report_deaths(_MirrorPid, _IsMaster, _QueueName, []) ->
ok;
report_deaths(MirrorPid, IsMaster, QueueName, DeadPids) ->
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 6d0064ab..2da2e7a5 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -24,7 +24,7 @@
%% All instructions from the GM group must be processed in the order
%% in which they're received.
--export([start_link/1, set_maximum_since_use/2, info/1, go/2]).
+-export([set_maximum_since_use/2, info/1, go/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3, handle_pre_hibernate/1, prioritise_call/4,
@@ -71,8 +71,6 @@
%%----------------------------------------------------------------------------
-start_link(Q) -> gen_server2:start_link(?MODULE, Q, []).
-
set_maximum_since_use(QPid, Age) ->
gen_server2:cast(QPid, {set_maximum_since_use, Age}).
@@ -82,7 +80,7 @@ init(Q) ->
?store_proc_name(Q#amqqueue.name),
{ok, {not_started, Q}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN,
- ?DESIRED_HIBERNATE}}.
+ ?DESIRED_HIBERNATE}, ?MODULE}.
go(SPid, sync) -> gen_server2:call(SPid, go, infinity);
go(SPid, async) -> gen_server2:cast(SPid, go).
@@ -122,6 +120,7 @@ handle_go(Q = #amqqueue{name = QName}) ->
Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}),
{ok, BQ} = application:get_env(backing_queue_module),
Q1 = Q #amqqueue { pid = QPid },
+ ok = rabbit_queue_index:erase(QName), %% For crash recovery
BQS = bq_init(BQ, Q1, new),
State = #state { q = Q1,
gm = GM,
diff --git a/src/rabbit_mirror_queue_slave_sup.erl b/src/rabbit_mirror_queue_slave_sup.erl
deleted file mode 100644
index b631cc31..00000000
--- a/src/rabbit_mirror_queue_slave_sup.erl
+++ /dev/null
@@ -1,37 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2010-2014 GoPivotal, Inc. All rights reserved.
-%%
-
--module(rabbit_mirror_queue_slave_sup).
-
--behaviour(supervisor2).
-
--export([start_link/0, start_child/2]).
-
--export([init/1]).
-
--include_lib("rabbit.hrl").
-
--define(SERVER, ?MODULE).
-
-start_link() -> supervisor2:start_link({local, ?SERVER}, ?MODULE, []).
-
-start_child(Node, Args) -> supervisor2:start_child({?SERVER, Node}, Args).
-
-init([]) ->
- {ok, {{simple_one_for_one, 10, 10},
- [{rabbit_mirror_queue_slave,
- {rabbit_mirror_queue_slave, start_link, []},
- temporary, ?MAX_WAIT, worker, [rabbit_mirror_queue_slave]}]}}.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index c4148bbf..77ac5c44 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -21,7 +21,7 @@
-export([method_record_type/1, polite_pause/0, polite_pause/1]).
-export([die/1, frame_error/2, amqp_error/4, quit/1,
protocol_error/3, protocol_error/4, protocol_error/1]).
--export([not_found/1, absent/1]).
+-export([not_found/1, absent/2]).
-export([type_class/1, assert_args_equivalence/4]).
-export([dirty_read/1]).
-export([table_lookup/2, set_table_value/4]).
@@ -119,7 +119,8 @@
-spec(protocol_error/1 ::
(rabbit_types:amqp_error()) -> channel_or_connection_exit()).
-spec(not_found/1 :: (rabbit_types:r(atom())) -> rabbit_types:channel_exit()).
--spec(absent/1 :: (rabbit_types:amqqueue()) -> rabbit_types:channel_exit()).
+-spec(absent/2 :: (rabbit_types:amqqueue(), rabbit_amqqueue:absent_reason())
+ -> rabbit_types:channel_exit()).
-spec(type_class/1 :: (rabbit_framing:amqp_field_type()) -> atom()).
-spec(assert_args_equivalence/4 :: (rabbit_framing:amqp_table(),
rabbit_framing:amqp_table(),
@@ -292,14 +293,18 @@ protocol_error(#amqp_error{} = Error) ->
not_found(R) -> protocol_error(not_found, "no ~s", [rs(R)]).
-absent(#amqqueue{name = QueueName, pid = QPid, durable = true}) ->
+absent(#amqqueue{name = QueueName, pid = QPid, durable = true}, nodedown) ->
%% The assertion of durability is mainly there because we mention
%% durability in the error message. That way we will hopefully
%% notice if at some future point our logic changes s.t. we get
%% here with non-durable queues.
protocol_error(not_found,
"home node '~s' of durable ~s is down or inaccessible",
- [node(QPid), rs(QueueName)]).
+ [node(QPid), rs(QueueName)]);
+
+absent(#amqqueue{name = QueueName}, crashed) ->
+ protocol_error(not_found,
+ "~s has crashed and failed to restart", [rs(QueueName)]).
type_class(byte) -> int;
type_class(short) -> int;
diff --git a/src/rabbit_prequeue.erl b/src/rabbit_prequeue.erl
new file mode 100644
index 00000000..b1d92b89
--- /dev/null
+++ b/src/rabbit_prequeue.erl
@@ -0,0 +1,103 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2010-2014 GoPivotal, Inc. All rights reserved.
+%%
+
+-module(rabbit_prequeue).
+
+%% This is the initial gen_server that all queue processes start off
+%% as. It handles the decision as to whether we need to start a new
+%% slave, a new master/unmirrored, or whether we are restarting (and
+%% if so, as what). Thus a crashing queue process can restart from here
+%% and always do the right thing.
+
+-export([start_link/3]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
+
+-behaviour(gen_server2).
+
+-include("rabbit.hrl").
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-export_type([start_mode/0]).
+
+-type(start_mode() :: 'declare' | 'recovery' | 'slave').
+
+-spec(start_link/3 :: (rabbit_types:amqqueue(), start_mode(), pid())
+ -> rabbit_types:ok_pid_or_error()).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+start_link(Q, StartMode, Marker) ->
+ gen_server2:start_link(?MODULE, {Q, StartMode, Marker}, []).
+
+%%----------------------------------------------------------------------------
+
+init({Q, StartMode, Marker}) ->
+ init(Q, case {is_process_alive(Marker), StartMode} of
+ {true, slave} -> slave;
+ {true, _} -> master;
+ {false, _} -> restart
+ end).
+
+init(Q, master) -> rabbit_amqqueue_process:init(Q);
+init(Q, slave) -> rabbit_mirror_queue_slave:init(Q);
+
+init(#amqqueue{name = QueueName}, restart) ->
+ {ok, Q = #amqqueue{pid = QPid,
+ slave_pids = SPids}} = rabbit_amqqueue:lookup(QueueName),
+ Local = node(QPid) =:= node(),
+ Slaves = [SPid || SPid <- SPids, rabbit_misc:is_process_alive(SPid)],
+ case rabbit_misc:is_process_alive(QPid) of
+ true -> false = Local, %% assertion
+ rabbit_mirror_queue_slave:go(self(), async),
+ rabbit_mirror_queue_slave:init(Q); %% [1]
+ false -> case Local andalso Slaves =:= [] of
+ true -> crash_restart(Q); %% [2]
+ false -> timer:sleep(25),
+ init(Q, restart) %% [3]
+ end
+ end.
+%% [1] There is a master on another node. Regardless of whether we
+%% were originally a master or a slave, we are now a new slave.
+%%
+%% [2] Nothing is alive. We are the last best hope. Try to restart as a master.
+%%
+%% [3] The current master is dead but either there are alive slaves to
+%% take over or it's all happening on a different node anyway. This is
+%% not a stable situation. Sleep and wait for somebody else to make a
+%% move.
+
+crash_restart(Q = #amqqueue{name = QueueName}) ->
+ rabbit_log:error("Restarting crashed ~s.~n", [rabbit_misc:rs(QueueName)]),
+ gen_server2:cast(self(), init),
+ rabbit_amqqueue_process:init(Q#amqqueue{pid = self()}).
+
+%%----------------------------------------------------------------------------
+
+%% This gen_server2 always hands over to some other module at the end
+%% of init/1.
+handle_call(_Msg, _From, _State) -> exit(unreachable).
+handle_cast(_Msg, _State) -> exit(unreachable).
+handle_info(_Msg, _State) -> exit(unreachable).
+terminate(_Reason, _State) -> exit(unreachable).
+code_change(_OldVsn, _State, _Extra) -> exit(unreachable).
+
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 0f572866..f21b44bc 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -16,7 +16,7 @@
-module(rabbit_queue_index).
--export([init/2, recover/5,
+-export([erase/1, init/2, recover/5,
terminate/2, delete_and_terminate/1,
publish/5, deliver/2, ack/2, sync/1, needs_sync/1, flush/1,
read/3, next_segment_boundary/1, bounds/1, start/1, stop/0]).
@@ -200,6 +200,7 @@
{rabbit_types:msg_id(), non_neg_integer(), A})).
-type(shutdown_terms() :: [term()] | 'non_clean_shutdown').
+-spec(erase/1 :: (rabbit_amqqueue:name()) -> 'ok').
-spec(init/2 :: (rabbit_amqqueue:name(), on_sync_fun()) -> qistate()).
-spec(recover/5 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(),
contains_predicate(), on_sync_fun()) ->
@@ -233,6 +234,13 @@
%% public API
%%----------------------------------------------------------------------------
+erase(Name) ->
+ #qistate { dir = Dir } = blank_state(Name),
+ case rabbit_file:is_dir(Dir) of
+ true -> rabbit_file:recursive_delete([Dir]);
+ false -> ok
+ end.
+
init(Name, OnSyncFun) ->
State = #qistate { dir = Dir } = blank_state(Name),
false = rabbit_file:is_file(Dir), %% is_file == is file or dir
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index a186fb7a..7018bffe 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1669,19 +1669,22 @@ test_declare_on_dead_queue(SecondaryNode) ->
Self ! {self(), killed, QPid}
end),
receive
- {Pid, killed, QPid} ->
- {existing, #amqqueue{name = QueueName,
- pid = QPid}} =
- rabbit_amqqueue:declare(QueueName, false, false, [], none),
- false = rabbit_misc:is_process_alive(QPid),
- {new, Q} = rabbit_amqqueue:declare(QueueName, false, false, [],
- none),
- true = rabbit_misc:is_process_alive(Q#amqqueue.pid),
+ {Pid, killed, OldPid} ->
+ Q = dead_queue_loop(QueueName, OldPid),
{ok, 0} = rabbit_amqqueue:delete(Q, false, false),
passed
after ?TIMEOUT -> throw(failed_to_create_and_kill_queue)
end.
+dead_queue_loop(QueueName, OldPid) ->
+ {existing, Q} = rabbit_amqqueue:declare(QueueName, false, false, [], none),
+ case Q#amqqueue.pid of
+ OldPid -> timer:sleep(25),
+ dead_queue_loop(QueueName, OldPid);
+ _ -> true = rabbit_misc:is_process_alive(Q#amqqueue.pid),
+ Q
+ end.
+
%%---------------------------------------------------------------------
control_action(Command, Args) ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index e97ed491..e858fb3d 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -16,7 +16,8 @@
-module(rabbit_variable_queue).
--export([init/3, terminate/2, delete_and_terminate/2, purge/1, purge_acks/1,
+-export([init/3, terminate/2, delete_and_terminate/2, delete_crashed/1,
+ purge/1, purge_acks/1,
publish/5, publish_delivered/4, discard/3, drain_confirmed/1,
dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/2,
ackfold/4, fold/3, len/1, is_empty/1, depth/1,
@@ -443,22 +444,25 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, new,
end,
msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback));
-init(#amqqueue { name = QueueName, durable = true }, Terms,
+%% We can be recovering a transient queue if it crashed
+init(#amqqueue { name = QueueName, durable = IsDurable }, Terms,
AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) ->
{PRef, RecoveryTerms} = process_recovery_terms(Terms),
- PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef,
- MsgOnDiskFun, AsyncCallback),
+ {PersistentClient, ContainsCheckFun} =
+ case IsDurable of
+ true -> C = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef,
+ MsgOnDiskFun, AsyncCallback),
+ {C, fun (MId) -> rabbit_msg_store:contains(MId, C) end};
+ false -> {undefined, fun(_MsgId) -> false end}
+ end,
TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE,
undefined, AsyncCallback),
{DeltaCount, DeltaBytes, IndexState} =
rabbit_queue_index:recover(
QueueName, RecoveryTerms,
rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE),
- fun (MsgId) ->
- rabbit_msg_store:contains(MsgId, PersistentClient)
- end,
- MsgIdxOnDiskFun),
- init(true, IndexState, DeltaCount, DeltaBytes, RecoveryTerms,
+ ContainsCheckFun, MsgIdxOnDiskFun),
+ init(IsDurable, IndexState, DeltaCount, DeltaBytes, RecoveryTerms,
PersistentClient, TransientClient).
process_recovery_terms(Terms=non_clean_shutdown) ->
@@ -507,6 +511,9 @@ delete_and_terminate(_Reason, State) ->
a(State2 #vqstate { index_state = IndexState1,
msg_store_clients = undefined }).
+delete_crashed(QName) ->
+ ok = rabbit_queue_index:erase(QName).
+
purge(State = #vqstate { q4 = Q4,
index_state = IndexState,
msg_store_clients = MSCState,
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index cfa3add4..18d44225 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -94,13 +94,12 @@ delete(VHostPath) ->
[ok = Fun() || Fun <- Funs],
ok.
-assert_benign(ok) -> ok;
-assert_benign({ok, _}) -> ok;
-assert_benign({error, not_found}) -> ok;
-assert_benign({error, {absent, Q}}) ->
- %% We have a durable queue on a down node. Removing the mnesia
- %% entries here is safe. If/when the down node restarts, it will
- %% clear out the on-disk storage of the queue.
+assert_benign(ok) -> ok;
+assert_benign({ok, _}) -> ok;
+assert_benign({error, not_found}) -> ok;
+assert_benign({error, {absent, Q, _}}) ->
+ %% Removing the mnesia entries here is safe. If/when the down node
+ %% restarts, it will clear out the on-disk storage of the queue.
case rabbit_amqqueue:internal_delete(Q#amqqueue.name) of
ok -> ok;
{error, not_found} -> ok
diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl
index 6fe65c12..212cf973 100644
--- a/src/rabbit_vm.erl
+++ b/src/rabbit_vm.erl
@@ -34,7 +34,7 @@
%% Like erlang:memory(), but with awareness of rabbit-y things
memory() ->
ConnProcs = [rabbit_tcp_client_sup, ssl_connection_sup, amqp_sup],
- QProcs = [rabbit_amqqueue_sup, rabbit_mirror_queue_slave_sup],
+ QProcs = [rabbit_amqqueue_sup],
MsgIndexProcs = [msg_store_transient, msg_store_persistent],
MgmtDbProcs = [rabbit_mgmt_sup_sup],
PluginProcs = plugin_sups(),