From c06cf2cbf604aa26936d5a2f67705b3a55ee43ce Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 19 Aug 2014 13:29:19 +0100 Subject: First step along the way to queue restarts. Create a "prequeue" module whose job it is to decide how to proceed, and then have the supervisor start that. At the moment we only handle rabbit_amqqueue_process startup through this, but that will change. Also move the tx which decides whether we successfully declared the queue or not into the first part of the queue startup (before we call into the queue) since in future it will be needed for other things. --- src/rabbit_amqqueue.erl | 47 ++++++-------- src/rabbit_amqqueue_process.erl | 138 ++++++++++++++++------------------------ src/rabbit_amqqueue_sup.erl | 2 +- src/rabbit_prequeue.erl | 104 ++++++++++++++++++++++++++++++ 4 files changed, 179 insertions(+), 112 deletions(-) create mode 100644 src/rabbit_prequeue.erl diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 692179fc..e625572e 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -35,7 +35,7 @@ cancel_sync_mirrors/1]). %% internal --export([internal_declare/2, internal_delete/1, run_backing_queue/3, +-export([internal_declare/1, internal_delete/1, run_backing_queue/3, set_ram_duration_target/2, set_maximum_since_use/2]). -include("rabbit.hrl"). @@ -76,9 +76,9 @@ rabbit_framing:amqp_table(), rabbit_types:maybe(pid()), node()) -> {'new' | 'existing' | 'absent' | 'owner_died', rabbit_types:amqqueue()} | rabbit_types:channel_exit()). --spec(internal_declare/2 :: - (rabbit_types:amqqueue(), boolean()) - -> queue_or_absent() | rabbit_misc:thunk(queue_or_absent())). +%% -spec(internal_declare/2 :: +%% (rabbit_types:amqqueue(), boolean()) +%% -> queue_or_absent() | rabbit_misc:thunk(queue_or_absent())). -spec(update/2 :: (name(), fun((rabbit_types:amqqueue()) -> rabbit_types:amqqueue())) @@ -196,6 +196,8 @@ arguments]). recover() -> + Marker = spawn_link(fun() -> receive stop -> ok end end), + register(rabbit_recovery, Marker), %% Clear out remnants of old incarnation, in case we restarted %% faster than other nodes handled DOWN messages from us. on_node_down(node()), @@ -212,7 +214,11 @@ recover() -> {rabbit_amqqueue_sup, {rabbit_amqqueue_sup, start_link, []}, transient, infinity, supervisor, [rabbit_amqqueue_sup]}), - recover_durable_queues(lists:zip(DurableQueues, OrderedRecoveryTerms)). + Recovered = recover_durable_queues( + lists:zip(DurableQueues, OrderedRecoveryTerms)), + unlink(Marker), + Marker ! stop, + Recovered. stop() -> ok = supervisor:terminate_child(rabbit_sup, rabbit_amqqueue_sup), @@ -271,29 +277,14 @@ declare(QueueName, Durable, AutoDelete, Args, Owner, Node) -> Node = rabbit_mirror_queue_misc:initial_queue_node(Q, Node), gen_server2:call(start_queue_process(Node, Q), {init, new}, infinity). -internal_declare(Q, true) -> - rabbit_misc:execute_mnesia_tx_with_tail( - fun () -> ok = store_queue(Q), rabbit_misc:const(Q) end); -internal_declare(Q = #amqqueue{name = QueueName}, false) -> - rabbit_misc:execute_mnesia_tx_with_tail( - fun () -> - case mnesia:wread({rabbit_queue, QueueName}) of - [] -> - case not_found_or_absent(QueueName) of - not_found -> Q1 = rabbit_policy:set(Q), - ok = store_queue(Q1), - B = add_default_binding(Q1), - fun () -> B(), Q1 end; - {absent, _Q} = R -> rabbit_misc:const(R) - end; - [ExistingQ = #amqqueue{pid = QPid}] -> - case rabbit_misc:is_process_alive(QPid) of - true -> rabbit_misc:const(ExistingQ); - false -> TailFun = internal_delete(QueueName), - fun () -> TailFun(), ExistingQ end - end - end - end). +internal_declare(Q = #amqqueue{name = QueueName}) -> + case not_found_or_absent(QueueName) of + not_found -> ok = store_queue(Q), + B = add_default_binding(Q), + %% TODO can we simplify return here? + {new, fun () -> B(), Q end}; + {absent, _Q} = R -> R + end. update(Name, Fun) -> case mnesia:wread({rabbit_queue, Name}) of diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index db297c1d..1c982dbb 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -24,9 +24,9 @@ -define(RAM_DURATION_UPDATE_INTERVAL, 5000). -define(CONSUMER_BIAS_RATIO, 1.1). %% i.e. consume 10% faster --export([start_link/1, info_keys/0]). +-export([info_keys/0]). --export([init_with_backing_queue_state/7]). +-export([init_declared/3, init_with_backing_queue_state/7]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/4, @@ -61,8 +61,8 @@ -ifdef(use_specs). --spec(start_link/1 :: - (rabbit_types:amqqueue()) -> rabbit_types:ok_pid_or_error()). +%% -spec(start_link/1 :: +%% (rabbit_types:amqqueue()) -> rabbit_types:ok_pid_or_error()). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). -spec(init_with_backing_queue_state/7 :: (rabbit_types:amqqueue(), atom(), tuple(), any(), @@ -102,19 +102,64 @@ %%---------------------------------------------------------------------------- -start_link(Q) -> gen_server2:start_link(?MODULE, Q, []). - info_keys() -> ?INFO_KEYS ++ rabbit_backing_queue:info_keys(). statistics_keys() -> ?STATISTICS_KEYS ++ rabbit_backing_queue:info_keys(). %%---------------------------------------------------------------------------- -init(Q) -> +init(_) -> + exit(cannot_be_called_directly). + +%% We have just been declared or recovered +init_declared(Recover, From, Q = #amqqueue{name = QName, + exclusive_owner = Owner}) -> process_flag(trap_exit, true), - ?store_proc_name(Q#amqqueue.name), - {ok, init_state(Q#amqqueue{pid = self()}), hibernate, - {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + ?store_proc_name(QName), + State = init_state(Q), + case Owner of + none -> finish_init(Recover, From, State); + _ -> case rabbit_misc:is_process_alive(Owner) of %% [1] + true -> erlang:monitor(process, Owner), + finish_init(Recover, From, State); + false -> gen_server2:reply(From, {owner_died, Q}), + BQ = backing_queue_module(Q), + {_, Terms} = recovery_status(Recover), + BQS = bq_init(BQ, Q, Terms), + %% Rely on terminate to delete the queue. + {stop, {shutdown, missing_owner}, + State#q{backing_queue = BQ, + backing_queue_state = BQS}} + end + end. +%% [1] You used to be able to declare an exclusive durable +%% queue. Sadly we need to still tidy up after that case, there could +%% be the remnants of one left over from an upgrade. So that's why we +%% don't enforce Recover = new here. + +finish_init(Recover, From, State = #q{q = Q, + backing_queue = undefined, + backing_queue_state = undefined}) -> + {Recovery, TermsOrNew} = recovery_status(Recover), + gen_server2:reply(From, {new, Q}), + ok = file_handle_cache:register_callback( + rabbit_amqqueue, set_maximum_since_use, [self()]), + ok = rabbit_memory_monitor:register( + self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), + BQ = backing_queue_module(Q), + BQS = bq_init(BQ, Q, TermsOrNew), + recovery_barrier(Recovery), + State1 = process_args_policy(State#q{backing_queue = BQ, + backing_queue_state = BQS}), + notify_decorators(startup, State1), + rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State1)), + rabbit_event:if_enabled(State1, #q.stats_timer, + fun() -> emit_stats(State1) end), + {become, ?MODULE, State1, hibernate}. + +recovery_status(new) -> {new, new}; +recovery_status({Recover, Terms}) -> {Recover, Terms}. +%% We have been promoted init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, RateTRef, Deliveries, Senders, MTC) -> case Owner of @@ -174,54 +219,6 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- -declare(Recover, From, State = #q{q = Q, - backing_queue = undefined, - backing_queue_state = undefined}) -> - {Recovery, TermsOrNew} = recovery_status(Recover), - case rabbit_amqqueue:internal_declare(Q, Recovery /= new) of - #amqqueue{} = Q1 -> - case matches(Recovery, Q, Q1) of - true -> - gen_server2:reply(From, {new, Q}), - ok = file_handle_cache:register_callback( - rabbit_amqqueue, set_maximum_since_use, [self()]), - ok = rabbit_memory_monitor:register( - self(), {rabbit_amqqueue, - set_ram_duration_target, [self()]}), - BQ = backing_queue_module(Q1), - BQS = bq_init(BQ, Q, TermsOrNew), - recovery_barrier(Recovery), - State1 = process_args_policy( - State#q{backing_queue = BQ, - backing_queue_state = BQS}), - notify_decorators(startup, State), - rabbit_event:notify(queue_created, - infos(?CREATION_EVENT_KEYS, State1)), - rabbit_event:if_enabled(State1, #q.stats_timer, - fun() -> emit_stats(State1) end), - noreply(State1); - false -> - {stop, normal, {existing, Q1}, State} - end; - Err -> - {stop, normal, Err, State} - end. - -recovery_status(new) -> {new, new}; -recovery_status({Recover, Terms}) -> {Recover, Terms}. - -matches(new, Q1, Q2) -> - %% i.e. not policy - Q1#amqqueue.name =:= Q2#amqqueue.name andalso - Q1#amqqueue.durable =:= Q2#amqqueue.durable andalso - Q1#amqqueue.auto_delete =:= Q2#amqqueue.auto_delete andalso - Q1#amqqueue.exclusive_owner =:= Q2#amqqueue.exclusive_owner andalso - Q1#amqqueue.arguments =:= Q2#amqqueue.arguments andalso - Q1#amqqueue.pid =:= Q2#amqqueue.pid andalso - Q1#amqqueue.slave_pids =:= Q2#amqqueue.slave_pids; -matches(_, Q, Q) -> true; -matches(_, _Q, _Q1) -> false. - maybe_notify_decorators(false, State) -> State; maybe_notify_decorators(true, State) -> notify_decorators(State), State. @@ -915,31 +912,6 @@ prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) -> _ -> 0 end. -handle_call({init, Recover}, From, - State = #q{q = #amqqueue{exclusive_owner = none}}) -> - declare(Recover, From, State); - -%% You used to be able to declare an exclusive durable queue. Sadly we -%% need to still tidy up after that case, there could be the remnants -%% of one left over from an upgrade. So that's why we don't enforce -%% Recover = new here. -handle_call({init, Recover}, From, - State = #q{q = #amqqueue{exclusive_owner = Owner}}) -> - case rabbit_misc:is_process_alive(Owner) of - true -> erlang:monitor(process, Owner), - declare(Recover, From, State); - false -> #q{backing_queue = undefined, - backing_queue_state = undefined, - q = Q} = State, - gen_server2:reply(From, {owner_died, Q}), - BQ = backing_queue_module(Q), - {_, Terms} = recovery_status(Recover), - BQS = bq_init(BQ, Q, Terms), - %% Rely on terminate to delete the queue. - {stop, {shutdown, missing_owner}, - State#q{backing_queue = BQ, backing_queue_state = BQS}} - end; - handle_call(info, _From, State) -> reply(infos(info_keys(), State), State); diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl index 0fd64c26..137422d4 100644 --- a/src/rabbit_amqqueue_sup.erl +++ b/src/rabbit_amqqueue_sup.erl @@ -48,5 +48,5 @@ start_child(Node, Args) -> init([]) -> {ok, {{simple_one_for_one, 10, 10}, - [{rabbit_amqqueue, {rabbit_amqqueue_process, start_link, []}, + [{rabbit_amqqueue, {rabbit_prequeue, start_link, []}, temporary, ?MAX_WAIT, worker, [rabbit_amqqueue_process]}]}}. diff --git a/src/rabbit_prequeue.erl b/src/rabbit_prequeue.erl new file mode 100644 index 00000000..148f5968 --- /dev/null +++ b/src/rabbit_prequeue.erl @@ -0,0 +1,104 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2010-2014 GoPivotal, Inc. All rights reserved. +%% + +-module(rabbit_prequeue). + +%% This is the initial gen_server that all queue processes start off +%% as. It handles the decision as to whether we need to start a new +%% slave, a new master/unmirrored, whether we lost a race to declare a +%% new queue, or whether we are in recovery. Thus a crashing queue +%% process can restart from here and always do the right thing. + +-export([start_link/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +-behaviour(gen_server2). + +-include("rabbit.hrl"). + +start_link(Q) -> + gen_server2:start_link(?MODULE, Q, []). + +%%---------------------------------------------------------------------------- + +init(Q) -> + %% Hand back to supervisor ASAP + gen_server2:cast(self(), init), + {ok, Q#amqqueue{pid = self()}, hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, + ?DESIRED_HIBERNATE}}. + +handle_call(Msg, _From, State) -> + {stop, {unexpected_call, Msg}, State}. + +handle_cast(init, Q) -> + case whereis(rabbit_recovery) of + undefined -> init_non_recovery(Q); + _Pid -> init_recovery(Q) + end; + +handle_cast(Msg, State) -> + {stop, {unexpected_cast, Msg}, State}. + +handle_info(Msg, State) -> + {stop, {unexpected_info, Msg}, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%---------------------------------------------------------------------------- + +init_non_recovery(Q = #amqqueue{name = QueueName}) -> + Result = rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:wread({rabbit_queue, QueueName}) of + [] -> + {decl, rabbit_amqqueue:internal_declare(Q)}; + [ExistingQ = #amqqueue{pid = QPid}] -> + case rabbit_misc:is_process_alive(QPid) of + true -> {decl, {existing, ExistingQ}}; + false -> exit(todo) + end + end + end), + case Result of + {decl, DeclResult} -> + %% We have just been declared. Block waiting for an init + %% call so that we don't respond to any other message first + receive {'$gen_call', From, {init, new}} -> + case DeclResult of + {new, Fun} -> + Q1 = Fun(), + rabbit_amqqueue_process:init_declared(new,From, Q1); + {F, _} when F =:= absent; F =:= existing -> + gen_server2:reply(From, DeclResult), + {stop, normal, Q} + end + end + end. + +init_recovery(Q) -> + rabbit_misc:execute_mnesia_transaction( + fun () -> ok = rabbit_amqqueue:store_queue(Q) end), + %% Again block waiting for an init call. + receive {'$gen_call', From, {init, Terms}} -> + rabbit_amqqueue_process:init_declared(Terms, From, Q) + end. -- cgit v1.2.1 From f0ee2e3a51f3635c69b0058283cb58d1ef35530a Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 19 Aug 2014 13:58:13 +0100 Subject: Various spec-ish fixes. --- src/rabbit_amqqueue.erl | 10 ++++------ src/rabbit_amqqueue_process.erl | 2 -- src/rabbit_prequeue.erl | 2 +- 3 files changed, 5 insertions(+), 9 deletions(-) diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index e625572e..b93b6be6 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -59,8 +59,6 @@ -type(msg_id() :: non_neg_integer()). -type(ok_or_errors() :: 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}). --type(queue_or_absent() :: rabbit_types:amqqueue() | - {'absent', rabbit_types:amqqueue()}). -type(not_found_or_absent() :: 'not_found' | {'absent', rabbit_types:amqqueue()}). -spec(recover/0 :: () -> [rabbit_types:amqqueue()]). @@ -76,9 +74,10 @@ rabbit_framing:amqp_table(), rabbit_types:maybe(pid()), node()) -> {'new' | 'existing' | 'absent' | 'owner_died', rabbit_types:amqqueue()} | rabbit_types:channel_exit()). -%% -spec(internal_declare/2 :: -%% (rabbit_types:amqqueue(), boolean()) -%% -> queue_or_absent() | rabbit_misc:thunk(queue_or_absent())). +-spec(internal_declare/1 :: + (rabbit_types:amqqueue()) + -> {'new', rabbit_misc:thunk(rabbit_types:amqqueue())} | + {'absent', rabbit_types:amqqueue()}). -spec(update/2 :: (name(), fun((rabbit_types:amqqueue()) -> rabbit_types:amqqueue())) @@ -281,7 +280,6 @@ internal_declare(Q = #amqqueue{name = QueueName}) -> case not_found_or_absent(QueueName) of not_found -> ok = store_queue(Q), B = add_default_binding(Q), - %% TODO can we simplify return here? {new, fun () -> B(), Q end}; {absent, _Q} = R -> R end. diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 1c982dbb..951542f8 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -61,8 +61,6 @@ -ifdef(use_specs). -%% -spec(start_link/1 :: -%% (rabbit_types:amqqueue()) -> rabbit_types:ok_pid_or_error()). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). -spec(init_with_backing_queue_state/7 :: (rabbit_types:amqqueue(), atom(), tuple(), any(), diff --git a/src/rabbit_prequeue.erl b/src/rabbit_prequeue.erl index 148f5968..07df581b 100644 --- a/src/rabbit_prequeue.erl +++ b/src/rabbit_prequeue.erl @@ -44,7 +44,7 @@ init(Q) -> ?DESIRED_HIBERNATE}}. handle_call(Msg, _From, State) -> - {stop, {unexpected_call, Msg}, State}. + {stop, {unexpected_call, Msg}, {unexpected_call, Msg}, State}. handle_cast(init, Q) -> case whereis(rabbit_recovery) of -- cgit v1.2.1 From 640fc3b30d9044e94b8466f233b58fe9dd5876cd Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 19 Aug 2014 14:55:42 +0100 Subject: Roll slave startup into the new mechanism. --- src/rabbit.erl | 9 +------ src/rabbit_amqqueue.erl | 9 +++---- src/rabbit_amqqueue_process.erl | 3 +-- src/rabbit_amqqueue_sup.erl | 15 ++++++------ src/rabbit_mirror_queue_misc.erl | 8 ++++--- src/rabbit_mirror_queue_slave.erl | 45 ++++++++++------------------------- src/rabbit_mirror_queue_slave_sup.erl | 37 ---------------------------- src/rabbit_prequeue.erl | 42 +++++++++++++++++++++++++------- src/rabbit_vm.erl | 2 +- 9 files changed, 65 insertions(+), 105 deletions(-) delete mode 100644 src/rabbit_mirror_queue_slave_sup.erl diff --git a/src/rabbit.erl b/src/rabbit.erl index b00a1ad7..bd34cf8b 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -134,17 +134,10 @@ {requires, core_initialized}, {enables, routing_ready}]}). --rabbit_boot_step({mirror_queue_slave_sup, - [{description, "mirror queue slave sup"}, - {mfa, {rabbit_sup, start_supervisor_child, - [rabbit_mirror_queue_slave_sup]}}, - {requires, recovery}, - {enables, routing_ready}]}). - -rabbit_boot_step({mirrored_queues, [{description, "adding mirrors to queues"}, {mfa, {rabbit_mirror_queue_misc, on_node_up, []}}, - {requires, mirror_queue_slave_sup}, + {requires, recovery}, {enables, routing_ready}]}). -rabbit_boot_step({routing_ready, diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index b93b6be6..e25e0f97 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -246,7 +246,7 @@ find_durable_queues() -> recover_durable_queues(QueuesAndRecoveryTerms) -> {Results, Failures} = - gen_server2:mcall([{start_queue_process(node(), Q), + gen_server2:mcall([{rabbit_amqqueue_sup:start_queue_process(node(), Q), {init, {self(), Terms}}} || {Q, Terms} <- QueuesAndRecoveryTerms]), [rabbit_log:error("Queue ~p failed to initialise: ~p~n", @@ -274,7 +274,8 @@ declare(QueueName, Durable, AutoDelete, Args, Owner, Node) -> down_slave_nodes = [], gm_pids = []})), Node = rabbit_mirror_queue_misc:initial_queue_node(Q, Node), - gen_server2:call(start_queue_process(Node, Q), {init, new}, infinity). + gen_server2:call( + rabbit_amqqueue_sup:start_queue_process(Node, Q), {init, new}, infinity). internal_declare(Q = #amqqueue{name = QueueName}) -> case not_found_or_absent(QueueName) of @@ -331,10 +332,6 @@ policy_changed(Q1 = #amqqueue{decorators = Decorators1}, %% mirroring-related has changed - the policy may have changed anyway. notify_policy_changed(Q1). -start_queue_process(Node, Q) -> - {ok, Pid} = rabbit_amqqueue_sup:start_child(Node, [Q]), - Pid. - add_default_binding(#amqqueue{name = QueueName}) -> ExchangeName = rabbit_misc:r(QueueName, exchange, <<>>), RoutingKey = QueueName#resource.name, diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 951542f8..84832f9f 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -105,8 +105,7 @@ statistics_keys() -> ?STATISTICS_KEYS ++ rabbit_backing_queue:info_keys(). %%---------------------------------------------------------------------------- -init(_) -> - exit(cannot_be_called_directly). +init(_) -> exit(cannot_be_called_directly). %% We have just been declared or recovered init_declared(Recover, From, Q = #amqqueue{name = QName, diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl index 137422d4..149014e8 100644 --- a/src/rabbit_amqqueue_sup.erl +++ b/src/rabbit_amqqueue_sup.erl @@ -18,7 +18,7 @@ -behaviour(supervisor2). --export([start_link/0, start_child/2]). +-export([start_link/0, start_queue_process/2]). -export([init/1]). @@ -31,10 +31,7 @@ -ifdef(use_specs). -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). --spec(start_child/2 :: - (node(), [any()]) -> rabbit_types:ok(pid() | undefined) | - rabbit_types:ok({pid(), any()}) | - rabbit_types:error(any())). +-spec(start_queue_process/2 :: (node(), rabbit_types:amqqueue()) -> pid()). -endif. @@ -43,10 +40,12 @@ start_link() -> supervisor2:start_link({local, ?SERVER}, ?MODULE, []). -start_child(Node, Args) -> - supervisor2:start_child({?SERVER, Node}, Args). +start_queue_process(Node, Q) -> + {ok, Pid} = supervisor2:start_child({?SERVER, Node}, [Q]), + Pid. init([]) -> {ok, {{simple_one_for_one, 10, 10}, [{rabbit_amqqueue, {rabbit_prequeue, start_link, []}, - temporary, ?MAX_WAIT, worker, [rabbit_amqqueue_process]}]}}. + temporary, ?MAX_WAIT, worker, [rabbit_amqqueue_process, + rabbit_mirror_queue_slave]}]}}. diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 9e8c4a18..86f73366 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -220,11 +220,13 @@ start_child(Name, MirrorNode, Q, SyncMode) -> rabbit_misc:with_exit_handler( rabbit_misc:const(ok), fun () -> - {ok, SPid} = rabbit_mirror_queue_slave_sup:start_child( - MirrorNode, [Q]), + SPid = rabbit_amqqueue_sup:start_queue_process(MirrorNode, Q), log_info(Name, "Adding mirror on node ~p: ~p~n", [MirrorNode, SPid]), - rabbit_mirror_queue_slave:go(SPid, SyncMode) + case SyncMode of + sync -> rabbit_mirror_queue_slave:await(SPid); + async -> ok + end end). report_deaths(_MirrorPid, _IsMaster, _QueueName, []) -> diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 6d0064ab..7f65af65 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -24,7 +24,7 @@ %% All instructions from the GM group must be processed in the order %% in which they're received. --export([start_link/1, set_maximum_since_use/2, info/1, go/2]). +-export([set_maximum_since_use/2, info/1, init_slave/1, await/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, handle_pre_hibernate/1, prioritise_call/4, @@ -71,23 +71,17 @@ %%---------------------------------------------------------------------------- -start_link(Q) -> gen_server2:start_link(?MODULE, Q, []). - set_maximum_since_use(QPid, Age) -> gen_server2:cast(QPid, {set_maximum_since_use, Age}). info(QPid) -> gen_server2:call(QPid, info, infinity). -init(Q) -> - ?store_proc_name(Q#amqqueue.name), - {ok, {not_started, Q}, hibernate, - {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, - ?DESIRED_HIBERNATE}}. +await(SPid) -> gen_server2:call(SPid, await, infinity). -go(SPid, sync) -> gen_server2:call(SPid, go, infinity); -go(SPid, async) -> gen_server2:cast(SPid, go). +init(_) -> exit(cannot_be_called_directly). -handle_go(Q = #amqqueue{name = QName}) -> +init_slave(Q = #amqqueue{name = QName}) -> + ?store_proc_name(QName), %% We join the GM group before we add ourselves to the amqqueue %% record. As a result: %% 1. We can receive msgs from GM that correspond to messages we will @@ -141,25 +135,26 @@ handle_go(Q = #amqqueue{name = QName}) -> ok = gm:broadcast(GM, request_depth), ok = gm:validate_members(GM, [GM | [G || {G, _} <- GMPids]]), rabbit_mirror_queue_misc:maybe_auto_sync(Q1), - {ok, State}; + {become, ?MODULE, State, hibernate}; {stale, StalePid} -> rabbit_mirror_queue_misc:log_warning( QName, "Detected stale HA master: ~p~n", [StalePid]), gm:leave(GM), - {error, {stale_master_pid, StalePid}}; + {stop, {stale_master_pid, StalePid}, Q}; duplicate_live_master -> gm:leave(GM), - {error, {duplicate_live_master, Node}}; + {stop, {duplicate_live_master, Node}, Q}; existing -> gm:leave(GM), - {error, normal}; + {stop, normal, Q}; + %% TODO what about this case? master_in_recovery -> gm:leave(GM), %% The queue record vanished - we must have a master starting %% concurrently with us. In that case we can safely decide to do %% nothing here, and the master will start us in %% master:init_with_existing_bq/3 - {error, normal} + {stop, normal, Q} end. init_it(Self, GM, Node, QName) -> @@ -193,11 +188,8 @@ add_slave(Q = #amqqueue { slave_pids = SPids, gm_pids = GMPids }, New, GM) -> rabbit_mirror_queue_misc:store_updated_slaves( Q#amqqueue{slave_pids = SPids ++ [New], gm_pids = [{GM, New} | GMPids]}). -handle_call(go, _From, {not_started, Q} = NotStarted) -> - case handle_go(Q) of - {ok, State} -> {reply, ok, State}; - {error, Error} -> {stop, Error, NotStarted} - end; +handle_call(await, _From, State) -> + {reply, ok, State}; handle_call({gm_deaths, DeadGMPids}, From, State = #state { gm = GM, q = Q = #amqqueue { @@ -235,12 +227,6 @@ handle_call({gm_deaths, DeadGMPids}, From, handle_call(info, _From, State) -> reply(infos(?INFO_KEYS, State), State). -handle_cast(go, {not_started, Q} = NotStarted) -> - case handle_go(Q) of - {ok, State} -> {noreply, State}; - {error, Error} -> {stop, Error, NotStarted} - end; - handle_cast({run_backing_queue, Mod, Fun}, State) -> noreply(run_backing_queue(Mod, Fun, State)); @@ -321,8 +307,6 @@ handle_info({bump_credit, Msg}, State) -> handle_info(Msg, State) -> {stop, {unexpected_info, Msg}, State}. -terminate(_Reason, {not_started, _Q}) -> - ok; terminate(_Reason, #state { backing_queue_state = undefined }) -> %% We've received a delete_and_terminate from gm, thus nothing to %% do here. @@ -361,9 +345,6 @@ terminate_common(State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -handle_pre_hibernate({not_started, _Q} = State) -> - {hibernate, State}; - handle_pre_hibernate(State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> {RamDuration, BQS1} = BQ:ram_duration(BQS), diff --git a/src/rabbit_mirror_queue_slave_sup.erl b/src/rabbit_mirror_queue_slave_sup.erl deleted file mode 100644 index b631cc31..00000000 --- a/src/rabbit_mirror_queue_slave_sup.erl +++ /dev/null @@ -1,37 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2010-2014 GoPivotal, Inc. All rights reserved. -%% - --module(rabbit_mirror_queue_slave_sup). - --behaviour(supervisor2). - --export([start_link/0, start_child/2]). - --export([init/1]). - --include_lib("rabbit.hrl"). - --define(SERVER, ?MODULE). - -start_link() -> supervisor2:start_link({local, ?SERVER}, ?MODULE, []). - -start_child(Node, Args) -> supervisor2:start_child({?SERVER, Node}, Args). - -init([]) -> - {ok, {{simple_one_for_one, 10, 10}, - [{rabbit_mirror_queue_slave, - {rabbit_mirror_queue_slave, start_link, []}, - temporary, ?MAX_WAIT, worker, [rabbit_mirror_queue_slave]}]}}. diff --git a/src/rabbit_prequeue.erl b/src/rabbit_prequeue.erl index 07df581b..ddf14326 100644 --- a/src/rabbit_prequeue.erl +++ b/src/rabbit_prequeue.erl @@ -71,16 +71,13 @@ init_non_recovery(Q = #amqqueue{name = QueueName}) -> fun () -> case mnesia:wread({rabbit_queue, QueueName}) of [] -> - {decl, rabbit_amqqueue:internal_declare(Q)}; - [ExistingQ = #amqqueue{pid = QPid}] -> - case rabbit_misc:is_process_alive(QPid) of - true -> {decl, {existing, ExistingQ}}; - false -> exit(todo) - end + {declared, rabbit_amqqueue:internal_declare(Q)}; + [ExistingQ] -> + init_existing(ExistingQ) end end), case Result of - {decl, DeclResult} -> + {declared, DeclResult} -> %% We have just been declared. Block waiting for an init %% call so that we don't respond to any other message first receive {'$gen_call', From, {init, new}} -> @@ -92,9 +89,38 @@ init_non_recovery(Q = #amqqueue{name = QueueName}) -> gen_server2:reply(From, DeclResult), {stop, normal, Q} end - end + end; + new_slave -> + rabbit_mirror_queue_slave:init_slave(Q); + crash_restart -> + exit(todo); + sleep_retry -> + timer:sleep(25), + init_non_recovery(Q) end. +init_existing(Q = #amqqueue{pid = QPid, slave_pids = SPids}) -> + Alive = fun rabbit_misc:is_process_alive/1, + case {Alive(QPid), node(QPid) =:= node()} of + {true, true} -> {declared, {existing, Q}}; %% [1] + {true, false} -> new_slave; %% [2] + {false, _} -> case [SPid || SPid <- SPids, Alive(SPid)] of + [] -> crash_restart; %% [3] + _ -> sleep_retry %% [4] + end + end. +%% [1] Lost a race to declare a queue - just return the winner. +%% +%% [2] There is a master on another node. Regardless of whether we +%% just crashed (as a master or slave) and restarted or were asked to +%% start as a slave, we are now a new slave. +%% +%% [3] Nothing is alive. We must have just died. Try to restart as a master. +%% +%% [4] The current master is dead but there are alive slaves. This is +%% not a stable situation. Sleep and wait for somebody else to make a +%% move - those slaves should either promote one of their own or die. + init_recovery(Q) -> rabbit_misc:execute_mnesia_transaction( fun () -> ok = rabbit_amqqueue:store_queue(Q) end), diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl index 6fe65c12..212cf973 100644 --- a/src/rabbit_vm.erl +++ b/src/rabbit_vm.erl @@ -34,7 +34,7 @@ %% Like erlang:memory(), but with awareness of rabbit-y things memory() -> ConnProcs = [rabbit_tcp_client_sup, ssl_connection_sup, amqp_sup], - QProcs = [rabbit_amqqueue_sup, rabbit_mirror_queue_slave_sup], + QProcs = [rabbit_amqqueue_sup], MsgIndexProcs = [msg_store_transient, msg_store_persistent], MgmtDbProcs = [rabbit_mgmt_sup_sup], PluginProcs = plugin_sups(), -- cgit v1.2.1 From a34e13465725855a8707a8897a12088ec49d1a29 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 19 Aug 2014 15:33:24 +0100 Subject: Another copy of the master_in_recovery check. Not sure that's very elegant, but I don't think we have much choice. --- src/rabbit_amqqueue.erl | 10 ++++---- src/rabbit_amqqueue_sup.erl | 9 +++---- src/rabbit_mirror_queue_misc.erl | 3 ++- src/rabbit_prequeue.erl | 52 +++++++++++++++++++++++++++++----------- 4 files changed, 50 insertions(+), 24 deletions(-) diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index e25e0f97..391da1ae 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -246,9 +246,9 @@ find_durable_queues() -> recover_durable_queues(QueuesAndRecoveryTerms) -> {Results, Failures} = - gen_server2:mcall([{rabbit_amqqueue_sup:start_queue_process(node(), Q), - {init, {self(), Terms}}} || - {Q, Terms} <- QueuesAndRecoveryTerms]), + gen_server2:mcall( + [{rabbit_amqqueue_sup:start_queue_process(node(), Q, recovery), + {init, {self(), Terms}}} || {Q, Terms} <- QueuesAndRecoveryTerms]), [rabbit_log:error("Queue ~p failed to initialise: ~p~n", [Pid, Error]) || {Pid, Error} <- Failures], [Q || {_, {new, Q}} <- Results]. @@ -274,8 +274,8 @@ declare(QueueName, Durable, AutoDelete, Args, Owner, Node) -> down_slave_nodes = [], gm_pids = []})), Node = rabbit_mirror_queue_misc:initial_queue_node(Q, Node), - gen_server2:call( - rabbit_amqqueue_sup:start_queue_process(Node, Q), {init, new}, infinity). + gen_server2:call(rabbit_amqqueue_sup:start_queue_process(Node, Q, declare), + {init, new}, infinity). internal_declare(Q = #amqqueue{name = QueueName}) -> case not_found_or_absent(QueueName) of diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl index 149014e8..8b6fcc01 100644 --- a/src/rabbit_amqqueue_sup.erl +++ b/src/rabbit_amqqueue_sup.erl @@ -18,7 +18,7 @@ -behaviour(supervisor2). --export([start_link/0, start_queue_process/2]). +-export([start_link/0, start_queue_process/3]). -export([init/1]). @@ -31,7 +31,8 @@ -ifdef(use_specs). -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). --spec(start_queue_process/2 :: (node(), rabbit_types:amqqueue()) -> pid()). +-spec(start_queue_process/3 :: (node(), rabbit_types:amqqueue(), + 'declare' | 'recovery' | 'slave') -> pid()). -endif. @@ -40,8 +41,8 @@ start_link() -> supervisor2:start_link({local, ?SERVER}, ?MODULE, []). -start_queue_process(Node, Q) -> - {ok, Pid} = supervisor2:start_child({?SERVER, Node}, [Q]), +start_queue_process(Node, Q, Hint) -> + {ok, Pid} = supervisor2:start_child({?SERVER, Node}, [Q, Hint]), Pid. init([]) -> diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 86f73366..017a5187 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -220,7 +220,8 @@ start_child(Name, MirrorNode, Q, SyncMode) -> rabbit_misc:with_exit_handler( rabbit_misc:const(ok), fun () -> - SPid = rabbit_amqqueue_sup:start_queue_process(MirrorNode, Q), + SPid = rabbit_amqqueue_sup:start_queue_process( + MirrorNode, Q, slave), log_info(Name, "Adding mirror on node ~p: ~p~n", [MirrorNode, SPid]), case SyncMode of diff --git a/src/rabbit_prequeue.erl b/src/rabbit_prequeue.erl index ddf14326..2bfffa28 100644 --- a/src/rabbit_prequeue.erl +++ b/src/rabbit_prequeue.erl @@ -22,7 +22,7 @@ %% new queue, or whether we are in recovery. Thus a crashing queue %% process can restart from here and always do the right thing. --export([start_link/1]). +-export([start_link/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -31,25 +31,36 @@ -include("rabbit.hrl"). -start_link(Q) -> - gen_server2:start_link(?MODULE, Q, []). +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +%%-spec(start_link/2 :: () -> rabbit_types:ok_pid_or_error()). + +-endif. %%---------------------------------------------------------------------------- -init(Q) -> +start_link(Q, Hint) -> + gen_server2:start_link(?MODULE, {Q, Hint}, []). + +%%---------------------------------------------------------------------------- + +init({Q, Hint}) -> %% Hand back to supervisor ASAP gen_server2:cast(self(), init), - {ok, Q#amqqueue{pid = self()}, hibernate, + {ok, {Q#amqqueue{pid = self()}, Hint}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. handle_call(Msg, _From, State) -> {stop, {unexpected_call, Msg}, {unexpected_call, Msg}, State}. -handle_cast(init, Q) -> +handle_cast(init, {Q, Hint}) -> case whereis(rabbit_recovery) of - undefined -> init_non_recovery(Q); - _Pid -> init_recovery(Q) + undefined -> init_non_recovery(Q, Hint); + _Pid -> recovery = Hint, %% assertion + init_recovery(Q) end; handle_cast(Msg, State) -> @@ -66,14 +77,12 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- -init_non_recovery(Q = #amqqueue{name = QueueName}) -> +init_non_recovery(Q = #amqqueue{name = QueueName}, Hint) -> Result = rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:wread({rabbit_queue, QueueName}) of - [] -> - {declared, rabbit_amqqueue:internal_declare(Q)}; - [ExistingQ] -> - init_existing(ExistingQ) + [] -> init_missing(Q, Hint); + [ExistingQ] -> init_existing(ExistingQ) end end), case Result of @@ -96,8 +105,23 @@ init_non_recovery(Q = #amqqueue{name = QueueName}) -> exit(todo); sleep_retry -> timer:sleep(25), - init_non_recovery(Q) + init_non_recovery(Q, Hint); + master_in_recovery -> + {stop, normal, Q} + end. + +%% The Hint is how we were originally started. Of course, if we +%% crashed it might no longer be true - but we can only get here if +%% there is no Mnesia record, which should mean we can't be here if we +%% crashed. +init_missing(Q, Hint) -> + case Hint of + declare -> {declared, rabbit_amqqueue:internal_declare(Q)}; + slave -> master_in_recovery %% [1] end. +%% [1] This is the same concept as the master_in_recovery case in the +%% slave startup code. Unfortunately since we start slaves with two +%% transactions we need to check twice. init_existing(Q = #amqqueue{pid = QPid, slave_pids = SPids}) -> Alive = fun rabbit_misc:is_process_alive/1, -- cgit v1.2.1 From e00d04982ae1b630939120dc4f1c94e95a1cc63d Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 19 Aug 2014 15:39:48 +0100 Subject: Remove a pointless check here - we will check from within an Mnesia tx in rabbit_mirror_queue_slave:init_it/4 anyway, which makes much more sense (and it will make little sense to check at slave start if we can restart a crashed slave). --- src/rabbit_mirror_queue_misc.erl | 37 +++++++++++++------------------------ 1 file changed, 13 insertions(+), 24 deletions(-) diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 017a5187..7c7fbbeb 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -202,34 +202,23 @@ add_mirrors(QName, Nodes, SyncMode) -> add_mirror(QName, MirrorNode, SyncMode) -> case rabbit_amqqueue:lookup(QName) of - {ok, #amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q} -> - case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of - [] -> - start_child(Name, MirrorNode, Q, SyncMode); - [SPid] -> - case rabbit_misc:is_process_alive(SPid) of - true -> {ok, already_mirrored}; - false -> start_child(Name, MirrorNode, Q, SyncMode) - end - end; + {ok, Q} -> + rabbit_misc:with_exit_handler( + rabbit_misc:const(ok), + fun () -> + SPid = rabbit_amqqueue_sup:start_queue_process( + MirrorNode, Q, slave), + log_info(QName, "Adding mirror on node ~p: ~p~n", + [MirrorNode, SPid]), + case SyncMode of + sync -> rabbit_mirror_queue_slave:await(SPid); + async -> ok + end + end); {error, not_found} = E -> E end. -start_child(Name, MirrorNode, Q, SyncMode) -> - rabbit_misc:with_exit_handler( - rabbit_misc:const(ok), - fun () -> - SPid = rabbit_amqqueue_sup:start_queue_process( - MirrorNode, Q, slave), - log_info(Name, "Adding mirror on node ~p: ~p~n", - [MirrorNode, SPid]), - case SyncMode of - sync -> rabbit_mirror_queue_slave:await(SPid); - async -> ok - end - end). - report_deaths(_MirrorPid, _IsMaster, _QueueName, []) -> ok; report_deaths(MirrorPid, IsMaster, QueueName, DeadPids) -> -- cgit v1.2.1 From 08ed6e8c220693d8dfe94f28de84debe8aa4383d Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 19 Aug 2014 16:12:55 +0100 Subject: Allow crashing queues to recover themselves. --- src/rabbit_amqqueue_process.erl | 79 +++++++++++++++++++++++------------------ src/rabbit_amqqueue_sup.erl | 2 +- src/rabbit_prequeue.erl | 3 +- 3 files changed, 48 insertions(+), 36 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 84832f9f..590a8be0 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -136,15 +136,15 @@ init_declared(Recover, From, Q = #amqqueue{name = QName, finish_init(Recover, From, State = #q{q = Q, backing_queue = undefined, backing_queue_state = undefined}) -> - {Recovery, TermsOrNew} = recovery_status(Recover), - gen_server2:reply(From, {new, Q}), + send_reply(From, Q), + {RecoveryPid, TermsOrNew} = recovery_status(Recover), ok = file_handle_cache:register_callback( rabbit_amqqueue, set_maximum_since_use, [self()]), ok = rabbit_memory_monitor:register( self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), BQ = backing_queue_module(Q), BQS = bq_init(BQ, Q, TermsOrNew), - recovery_barrier(Recovery), + recovery_barrier(RecoveryPid), State1 = process_args_policy(State#q{backing_queue = BQ, backing_queue_state = BQS}), notify_decorators(startup, State1), @@ -153,8 +153,20 @@ finish_init(Recover, From, State = #q{q = Q, fun() -> emit_stats(State1) end), {become, ?MODULE, State1, hibernate}. -recovery_status(new) -> {new, new}; -recovery_status({Recover, Terms}) -> {Recover, Terms}. +recovery_status(new) -> {no_barrier, new}; +recovery_status({Recover, Terms}) -> {Recover, Terms}. + +recovery_barrier(no_barrier) -> + ok; +recovery_barrier(BarrierPid) -> + MRef = erlang:monitor(process, BarrierPid), + receive + {BarrierPid, go} -> erlang:demonitor(MRef, [flush]); + {'DOWN', MRef, process, _, _} -> ok + end. + +send_reply(none, _Q) -> ok; +send_reply(From, Q) -> gen_server2:reply(From, {new, Q}). %% We have been promoted init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, @@ -194,8 +206,10 @@ terminate({shutdown, missing_owner} = Reason, State) -> terminate_shutdown(terminate_delete(false, Reason, State), State); terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) -> terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State); -terminate(Reason, State) -> - terminate_shutdown(terminate_delete(true, Reason, State), State). +terminate(normal, State) -> %% auto-delete case + terminate_shutdown(terminate_delete(true, normal, State), State); +terminate(_Reason, State) -> + terminate_crash(State). terminate_delete(EmitStats, Reason, State = #q{q = #amqqueue{name = QName}, @@ -211,6 +225,30 @@ terminate_delete(EmitStats, Reason, BQS1 end. +terminate_shutdown(Fun, State) -> + State1 = #q{backing_queue_state = BQS, consumers = Consumers} = + lists:foldl(fun (F, S) -> F(S) end, State, + [fun stop_sync_timer/1, + fun stop_rate_timer/1, + fun stop_expiry_timer/1, + fun stop_ttl_timer/1]), + case BQS of + undefined -> State1; + _ -> ok = rabbit_memory_monitor:deregister(self()), + QName = qname(State), + notify_decorators(shutdown, State), + [emit_consumer_deleted(Ch, CTag, QName) || + {Ch, CTag, _, _, _} <- + rabbit_queue_consumers:all(Consumers)], + State1#q{backing_queue_state = Fun(BQS)} + end. + +terminate_crash(State = #q{consumers = Consumers}) -> + QName = qname(State), + [emit_consumer_deleted(Ch, CTag, QName) || + {Ch, CTag, _, _, _} <- rabbit_queue_consumers:all(Consumers)], + ok. + code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -244,15 +282,6 @@ bq_init(BQ, Q, Recover) -> rabbit_amqqueue:run_backing_queue(Self, Mod, Fun) end). -recovery_barrier(new) -> - ok; -recovery_barrier(BarrierPid) -> - MRef = erlang:monitor(process, BarrierPid), - receive - {BarrierPid, go} -> erlang:demonitor(MRef, [flush]); - {'DOWN', MRef, process, _, _} -> ok - end. - process_args_policy(State = #q{q = Q, args_policy_version = N}) -> ArgsTable = @@ -304,24 +333,6 @@ init_max_bytes(MaxBytes, State) -> {_Dropped, State1} = maybe_drop_head(State#q{max_bytes = MaxBytes}), State1. -terminate_shutdown(Fun, State) -> - State1 = #q{backing_queue_state = BQS, consumers = Consumers} = - lists:foldl(fun (F, S) -> F(S) end, State, - [fun stop_sync_timer/1, - fun stop_rate_timer/1, - fun stop_expiry_timer/1, - fun stop_ttl_timer/1]), - case BQS of - undefined -> State1; - _ -> ok = rabbit_memory_monitor:deregister(self()), - QName = qname(State), - notify_decorators(shutdown, State), - [emit_consumer_deleted(Ch, CTag, QName) || - {Ch, CTag, _, _, _} <- - rabbit_queue_consumers:all(Consumers)], - State1#q{backing_queue_state = Fun(BQS)} - end. - reply(Reply, NewState) -> {NewState1, Timeout} = next_state(NewState), {reply, Reply, ensure_stats_timer(ensure_rate_timer(NewState1)), Timeout}. diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl index 8b6fcc01..99909e55 100644 --- a/src/rabbit_amqqueue_sup.erl +++ b/src/rabbit_amqqueue_sup.erl @@ -48,5 +48,5 @@ start_queue_process(Node, Q, Hint) -> init([]) -> {ok, {{simple_one_for_one, 10, 10}, [{rabbit_amqqueue, {rabbit_prequeue, start_link, []}, - temporary, ?MAX_WAIT, worker, [rabbit_amqqueue_process, + transient, ?MAX_WAIT, worker, [rabbit_amqqueue_process, rabbit_mirror_queue_slave]}]}}. diff --git a/src/rabbit_prequeue.erl b/src/rabbit_prequeue.erl index 2bfffa28..20808b1f 100644 --- a/src/rabbit_prequeue.erl +++ b/src/rabbit_prequeue.erl @@ -102,7 +102,8 @@ init_non_recovery(Q = #amqqueue{name = QueueName}, Hint) -> new_slave -> rabbit_mirror_queue_slave:init_slave(Q); crash_restart -> - exit(todo); + rabbit_amqqueue_process:init_declared( + {no_barrier, non_clean_shutdown}, none, Q); sleep_retry -> timer:sleep(25), init_non_recovery(Q, Hint); -- cgit v1.2.1 From 33544e9bacdbe59ffdf6515a32abad29882aae78 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 19 Aug 2014 16:48:34 +0100 Subject: Log that we recovered. Also store updated pid in Mnesia! --- src/rabbit_prequeue.erl | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/src/rabbit_prequeue.erl b/src/rabbit_prequeue.erl index 20808b1f..4f5ce244 100644 --- a/src/rabbit_prequeue.erl +++ b/src/rabbit_prequeue.erl @@ -101,9 +101,17 @@ init_non_recovery(Q = #amqqueue{name = QueueName}, Hint) -> end; new_slave -> rabbit_mirror_queue_slave:init_slave(Q); - crash_restart -> + {crash_restart, Q1} -> + rabbit_log:error( + "Recovering persistent messages from crashed ~s.~n", + [rabbit_misc:rs(QueueName)]), + Self = self(), + rabbit_misc:execute_mnesia_transaction( + fun () -> + ok = rabbit_amqqueue:store_queue(Q1#amqqueue{pid = Self}) + end), rabbit_amqqueue_process:init_declared( - {no_barrier, non_clean_shutdown}, none, Q); + {no_barrier, non_clean_shutdown}, none, Q1); sleep_retry -> timer:sleep(25), init_non_recovery(Q, Hint); @@ -127,11 +135,11 @@ init_missing(Q, Hint) -> init_existing(Q = #amqqueue{pid = QPid, slave_pids = SPids}) -> Alive = fun rabbit_misc:is_process_alive/1, case {Alive(QPid), node(QPid) =:= node()} of - {true, true} -> {declared, {existing, Q}}; %% [1] - {true, false} -> new_slave; %% [2] + {true, true} -> {declared, {existing, Q}}; %% [1] + {true, false} -> new_slave; %% [2] {false, _} -> case [SPid || SPid <- SPids, Alive(SPid)] of - [] -> crash_restart; %% [3] - _ -> sleep_retry %% [4] + [] -> {crash_restart, Q}; %% [3] + _ -> sleep_retry %% [4] end end. %% [1] Lost a race to declare a queue - just return the winner. -- cgit v1.2.1 From 403d476e5e077b9753bccc4c1404a214517c4f5c Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Wed, 20 Aug 2014 11:03:38 +0100 Subject: Add an intermediate supervisor so that if a queue exceeds its restart limit we don't bring the whole server down! --- src/rabbit_amqqueue.erl | 17 +++++++------- src/rabbit_amqqueue_sup.erl | 29 ++++++++++------------- src/rabbit_amqqueue_sup_sup.erl | 51 ++++++++++++++++++++++++++++++++++++++++ src/rabbit_mirror_queue_misc.erl | 2 +- 4 files changed, 73 insertions(+), 26 deletions(-) create mode 100644 src/rabbit_amqqueue_sup_sup.erl diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 391da1ae..8f25bf2e 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -210,9 +210,9 @@ recover() -> BQ:start([QName || #amqqueue{name = QName} <- DurableQueues]), {ok,_} = supervisor:start_child( rabbit_sup, - {rabbit_amqqueue_sup, - {rabbit_amqqueue_sup, start_link, []}, - transient, infinity, supervisor, [rabbit_amqqueue_sup]}), + {rabbit_amqqueue_sup_sup, + {rabbit_amqqueue_sup_sup, start_link, []}, + transient, infinity, supervisor, [rabbit_amqqueue_sup_sup]}), Recovered = recover_durable_queues( lists:zip(DurableQueues, OrderedRecoveryTerms)), unlink(Marker), @@ -220,8 +220,8 @@ recover() -> Recovered. stop() -> - ok = supervisor:terminate_child(rabbit_sup, rabbit_amqqueue_sup), - ok = supervisor:delete_child(rabbit_sup, rabbit_amqqueue_sup), + ok = supervisor:terminate_child(rabbit_sup, rabbit_amqqueue_sup_sup), + ok = supervisor:delete_child(rabbit_sup, rabbit_amqqueue_sup_sup), {ok, BQ} = application:get_env(rabbit, backing_queue_module), ok = BQ:stop(). @@ -247,7 +247,7 @@ find_durable_queues() -> recover_durable_queues(QueuesAndRecoveryTerms) -> {Results, Failures} = gen_server2:mcall( - [{rabbit_amqqueue_sup:start_queue_process(node(), Q, recovery), + [{rabbit_amqqueue_sup_sup:start_queue_process(node(), Q, recovery), {init, {self(), Terms}}} || {Q, Terms} <- QueuesAndRecoveryTerms]), [rabbit_log:error("Queue ~p failed to initialise: ~p~n", [Pid, Error]) || {Pid, Error} <- Failures], @@ -274,8 +274,9 @@ declare(QueueName, Durable, AutoDelete, Args, Owner, Node) -> down_slave_nodes = [], gm_pids = []})), Node = rabbit_mirror_queue_misc:initial_queue_node(Q, Node), - gen_server2:call(rabbit_amqqueue_sup:start_queue_process(Node, Q, declare), - {init, new}, infinity). + gen_server2:call( + rabbit_amqqueue_sup_sup:start_queue_process(Node, Q, declare), + {init, new}, infinity). internal_declare(Q = #amqqueue{name = QueueName}) -> case not_found_or_absent(QueueName) of diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl index 99909e55..591de408 100644 --- a/src/rabbit_amqqueue_sup.erl +++ b/src/rabbit_amqqueue_sup.erl @@ -18,35 +18,30 @@ -behaviour(supervisor2). --export([start_link/0, start_queue_process/3]). +-export([start_link/2]). -export([init/1]). -include("rabbit.hrl"). --define(SERVER, ?MODULE). - %%---------------------------------------------------------------------------- -ifdef(use_specs). --spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). --spec(start_queue_process/3 :: (node(), rabbit_types:amqqueue(), - 'declare' | 'recovery' | 'slave') -> pid()). +-spec(start_link/2 :: + (rabbit_types:amqqueue(), 'declare' | 'recovery' | 'slave') -> + {'ok', pid(), pid()}). -endif. %%---------------------------------------------------------------------------- -start_link() -> - supervisor2:start_link({local, ?SERVER}, ?MODULE, []). - -start_queue_process(Node, Q, Hint) -> - {ok, Pid} = supervisor2:start_child({?SERVER, Node}, [Q, Hint]), - Pid. +start_link(Q, Hint) -> + ChildSpec = {rabbit_amqqueue, {rabbit_prequeue, start_link, [Q, Hint]}, + transient, ?MAX_WAIT, worker, [rabbit_amqqueue_process, + rabbit_mirror_queue_slave]}, + {ok, SupPid} = supervisor2:start_link(?MODULE, []), + {ok, QPid} = supervisor2:start_child(SupPid, ChildSpec), + {ok, SupPid, QPid}. -init([]) -> - {ok, {{simple_one_for_one, 10, 10}, - [{rabbit_amqqueue, {rabbit_prequeue, start_link, []}, - transient, ?MAX_WAIT, worker, [rabbit_amqqueue_process, - rabbit_mirror_queue_slave]}]}}. +init([]) -> {ok, {{one_for_one, 5, 10}, []}}. diff --git a/src/rabbit_amqqueue_sup_sup.erl b/src/rabbit_amqqueue_sup_sup.erl new file mode 100644 index 00000000..6870f7c4 --- /dev/null +++ b/src/rabbit_amqqueue_sup_sup.erl @@ -0,0 +1,51 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved. +%% + +-module(rabbit_amqqueue_sup_sup). + +-behaviour(supervisor2). + +-export([start_link/0, start_queue_process/3]). + +-export([init/1]). + +-include("rabbit.hrl"). + +-define(SERVER, ?MODULE). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). +-spec(start_queue_process/3 :: (node(), rabbit_types:amqqueue(), + 'declare' | 'recovery' | 'slave') -> pid()). + +-endif. + +%%---------------------------------------------------------------------------- + +start_link() -> + supervisor2:start_link({local, ?SERVER}, ?MODULE, []). + +start_queue_process(Node, Q, Hint) -> + {ok, _SupPid, QPid} = supervisor2:start_child({?SERVER, Node}, [Q, Hint]), + QPid. + +init([]) -> + {ok, {{simple_one_for_one, 10, 10}, + [{rabbit_amqqueue_sup, {rabbit_amqqueue_sup, start_link, []}, + temporary, ?MAX_WAIT, supervisor, [rabbit_amqqueue_sup]}]}}. diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 7c7fbbeb..5cb871e8 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -206,7 +206,7 @@ add_mirror(QName, MirrorNode, SyncMode) -> rabbit_misc:with_exit_handler( rabbit_misc:const(ok), fun () -> - SPid = rabbit_amqqueue_sup:start_queue_process( + SPid = rabbit_amqqueue_sup_sup:start_queue_process( MirrorNode, Q, slave), log_info(QName, "Adding mirror on node ~p: ~p~n", [MirrorNode, SPid]), -- cgit v1.2.1 From fca583793b4b35550daff097a78b57ea4ee48871 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Wed, 20 Aug 2014 11:07:14 +0100 Subject: We handled that case in 359da524140b --- src/rabbit_mirror_queue_slave.erl | 1 - 1 file changed, 1 deletion(-) diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 7f65af65..c8bff0c4 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -147,7 +147,6 @@ init_slave(Q = #amqqueue{name = QName}) -> existing -> gm:leave(GM), {stop, normal, Q}; - %% TODO what about this case? master_in_recovery -> gm:leave(GM), %% The queue record vanished - we must have a master starting -- cgit v1.2.1 From 5985501260a9f95d4de8002dbb069098cec09266 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Wed, 20 Aug 2014 14:34:35 +0100 Subject: Allow transient queues to recover. --- src/rabbit_variable_queue.erl | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index e97ed491..c960fad4 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -443,22 +443,25 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, new, end, msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback)); -init(#amqqueue { name = QueueName, durable = true }, Terms, +%% We can be recovering a transient queue if it crashed +init(#amqqueue { name = QueueName, durable = IsDurable }, Terms, AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) -> {PRef, RecoveryTerms} = process_recovery_terms(Terms), - PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef, - MsgOnDiskFun, AsyncCallback), + {PersistentClient, ContainsCheckFun} = + case IsDurable of + true -> C = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef, + MsgOnDiskFun, AsyncCallback), + {C, fun (MId) -> rabbit_msg_store:contains(MId, C) end}; + false -> {undefined, fun(_MsgId) -> false end} + end, TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback), {DeltaCount, DeltaBytes, IndexState} = rabbit_queue_index:recover( QueueName, RecoveryTerms, rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE), - fun (MsgId) -> - rabbit_msg_store:contains(MsgId, PersistentClient) - end, - MsgIdxOnDiskFun), - init(true, IndexState, DeltaCount, DeltaBytes, RecoveryTerms, + ContainsCheckFun, MsgIdxOnDiskFun), + init(IsDurable, IndexState, DeltaCount, DeltaBytes, RecoveryTerms, PersistentClient, TransientClient). process_recovery_terms(Terms=non_clean_shutdown) -> -- cgit v1.2.1 From 9d592379afcc7b67f0d1d5c8e7789ad0049a979f Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Wed, 20 Aug 2014 14:45:00 +0100 Subject: This is perhaps more honest; if the queue was transient we will not recover anything and if the queue is mirrored we will restart a slave and then also not recover anything. --- src/rabbit_prequeue.erl | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/rabbit_prequeue.erl b/src/rabbit_prequeue.erl index 4f5ce244..059297cb 100644 --- a/src/rabbit_prequeue.erl +++ b/src/rabbit_prequeue.erl @@ -103,8 +103,7 @@ init_non_recovery(Q = #amqqueue{name = QueueName}, Hint) -> rabbit_mirror_queue_slave:init_slave(Q); {crash_restart, Q1} -> rabbit_log:error( - "Recovering persistent messages from crashed ~s.~n", - [rabbit_misc:rs(QueueName)]), + "Restarting crashed ~s.~n", [rabbit_misc:rs(QueueName)]), Self = self(), rabbit_misc:execute_mnesia_transaction( fun () -> -- cgit v1.2.1 From f1c0c943194a6bfb8503a38e1dbf15382ab0e0ff Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Wed, 20 Aug 2014 15:02:02 +0100 Subject: Allow crashing queue processes to come back as slaves if that's what we need. --- src/rabbit_mirror_queue_slave.erl | 1 + src/rabbit_queue_index.erl | 10 +++++++++- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index c8bff0c4..0c9e6c21 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -116,6 +116,7 @@ init_slave(Q = #amqqueue{name = QName}) -> Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}), {ok, BQ} = application:get_env(backing_queue_module), Q1 = Q #amqqueue { pid = QPid }, + ok = rabbit_queue_index:erase(QName), %% For crash recovery BQS = bq_init(BQ, Q1, new), State = #state { q = Q1, gm = GM, diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 0f572866..f21b44bc 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -16,7 +16,7 @@ -module(rabbit_queue_index). --export([init/2, recover/5, +-export([erase/1, init/2, recover/5, terminate/2, delete_and_terminate/1, publish/5, deliver/2, ack/2, sync/1, needs_sync/1, flush/1, read/3, next_segment_boundary/1, bounds/1, start/1, stop/0]). @@ -200,6 +200,7 @@ {rabbit_types:msg_id(), non_neg_integer(), A})). -type(shutdown_terms() :: [term()] | 'non_clean_shutdown'). +-spec(erase/1 :: (rabbit_amqqueue:name()) -> 'ok'). -spec(init/2 :: (rabbit_amqqueue:name(), on_sync_fun()) -> qistate()). -spec(recover/5 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(), contains_predicate(), on_sync_fun()) -> @@ -233,6 +234,13 @@ %% public API %%---------------------------------------------------------------------------- +erase(Name) -> + #qistate { dir = Dir } = blank_state(Name), + case rabbit_file:is_dir(Dir) of + true -> rabbit_file:recursive_delete([Dir]); + false -> ok + end. + init(Name, OnSyncFun) -> State = #qistate { dir = Dir } = blank_state(Name), false = rabbit_file:is_file(Dir), %% is_file == is file or dir -- cgit v1.2.1 From b4d0c121fc0d2507da17b7878dc7ffb7ebe4587e Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Wed, 20 Aug 2014 18:15:43 +0100 Subject: Switch to making our restart decisions based on explicitly determining whether we are starting for the first time or not. This is not very OTPish but it turns out to be necessary: there's no way to distinguish between losing a race to declare, starting a new slave, and restarting to a new slave otherwise. As an upside this code is shorter and more obviously correct (to me at least). --- src/rabbit_amqqueue.erl | 8 +-- src/rabbit_amqqueue_sup.erl | 8 ++- src/rabbit_amqqueue_sup_sup.erl | 5 +- src/rabbit_prequeue.erl | 153 ++++++++++++++++++---------------------- 4 files changed, 78 insertions(+), 96 deletions(-) diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 8f25bf2e..0ad6af51 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -195,8 +195,6 @@ arguments]). recover() -> - Marker = spawn_link(fun() -> receive stop -> ok end end), - register(rabbit_recovery, Marker), %% Clear out remnants of old incarnation, in case we restarted %% faster than other nodes handled DOWN messages from us. on_node_down(node()), @@ -213,11 +211,7 @@ recover() -> {rabbit_amqqueue_sup_sup, {rabbit_amqqueue_sup_sup, start_link, []}, transient, infinity, supervisor, [rabbit_amqqueue_sup_sup]}), - Recovered = recover_durable_queues( - lists:zip(DurableQueues, OrderedRecoveryTerms)), - unlink(Marker), - Marker ! stop, - Recovered. + recover_durable_queues(lists:zip(DurableQueues, OrderedRecoveryTerms)). stop() -> ok = supervisor:terminate_child(rabbit_sup, rabbit_amqqueue_sup_sup), diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl index 591de408..9f7060d7 100644 --- a/src/rabbit_amqqueue_sup.erl +++ b/src/rabbit_amqqueue_sup.erl @@ -36,12 +36,16 @@ %%---------------------------------------------------------------------------- -start_link(Q, Hint) -> - ChildSpec = {rabbit_amqqueue, {rabbit_prequeue, start_link, [Q, Hint]}, +start_link(Q, StartMode) -> + Marker = spawn_link(fun() -> receive stop -> ok end end), + ChildSpec = {rabbit_amqqueue, + {rabbit_prequeue, start_link, [Q, StartMode, Marker]}, transient, ?MAX_WAIT, worker, [rabbit_amqqueue_process, rabbit_mirror_queue_slave]}, {ok, SupPid} = supervisor2:start_link(?MODULE, []), {ok, QPid} = supervisor2:start_child(SupPid, ChildSpec), + unlink(Marker), + Marker ! stop, {ok, SupPid, QPid}. init([]) -> {ok, {{one_for_one, 5, 10}, []}}. diff --git a/src/rabbit_amqqueue_sup_sup.erl b/src/rabbit_amqqueue_sup_sup.erl index 6870f7c4..793cb7c9 100644 --- a/src/rabbit_amqqueue_sup_sup.erl +++ b/src/rabbit_amqqueue_sup_sup.erl @@ -41,8 +41,9 @@ start_link() -> supervisor2:start_link({local, ?SERVER}, ?MODULE, []). -start_queue_process(Node, Q, Hint) -> - {ok, _SupPid, QPid} = supervisor2:start_child({?SERVER, Node}, [Q, Hint]), +start_queue_process(Node, Q, StartMode) -> + {ok, _SupPid, QPid} = supervisor2:start_child( + {?SERVER, Node}, [Q, StartMode]), QPid. init([]) -> diff --git a/src/rabbit_prequeue.erl b/src/rabbit_prequeue.erl index 059297cb..b084967d 100644 --- a/src/rabbit_prequeue.erl +++ b/src/rabbit_prequeue.erl @@ -22,7 +22,7 @@ %% new queue, or whether we are in recovery. Thus a crashing queue %% process can restart from here and always do the right thing. --export([start_link/2]). +-export([start_link/3]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -41,27 +41,29 @@ %%---------------------------------------------------------------------------- -start_link(Q, Hint) -> - gen_server2:start_link(?MODULE, {Q, Hint}, []). +start_link(Q, StartMode, Marker) -> + gen_server2:start_link(?MODULE, {Q, StartMode, Marker}, []). %%---------------------------------------------------------------------------- -init({Q, Hint}) -> +init({Q, StartMode0, Marker}) -> %% Hand back to supervisor ASAP gen_server2:cast(self(), init), - {ok, {Q#amqqueue{pid = self()}, Hint}, hibernate, + StartMode = case is_process_alive(Marker) of + true -> StartMode0; + false -> restart + end, + {ok, {Q#amqqueue{pid = self()}, StartMode}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. handle_call(Msg, _From, State) -> {stop, {unexpected_call, Msg}, {unexpected_call, Msg}, State}. -handle_cast(init, {Q, Hint}) -> - case whereis(rabbit_recovery) of - undefined -> init_non_recovery(Q, Hint); - _Pid -> recovery = Hint, %% assertion - init_recovery(Q) - end; +handle_cast(init, {Q, declare}) -> init_declared(Q); +handle_cast(init, {Q, recovery}) -> init_recovery(Q); +handle_cast(init, {Q, slave}) -> init_slave(Q); +handle_cast(init, {Q, restart}) -> init_restart(Q); handle_cast(Msg, State) -> {stop, {unexpected_cast, Msg}, State}. @@ -77,82 +79,27 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- -init_non_recovery(Q = #amqqueue{name = QueueName}, Hint) -> - Result = rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:wread({rabbit_queue, QueueName}) of - [] -> init_missing(Q, Hint); - [ExistingQ] -> init_existing(ExistingQ) - end - end), - case Result of - {declared, DeclResult} -> - %% We have just been declared. Block waiting for an init - %% call so that we don't respond to any other message first - receive {'$gen_call', From, {init, new}} -> - case DeclResult of - {new, Fun} -> - Q1 = Fun(), - rabbit_amqqueue_process:init_declared(new,From, Q1); - {F, _} when F =:= absent; F =:= existing -> - gen_server2:reply(From, DeclResult), - {stop, normal, Q} - end - end; - new_slave -> - rabbit_mirror_queue_slave:init_slave(Q); - {crash_restart, Q1} -> - rabbit_log:error( - "Restarting crashed ~s.~n", [rabbit_misc:rs(QueueName)]), - Self = self(), - rabbit_misc:execute_mnesia_transaction( - fun () -> - ok = rabbit_amqqueue:store_queue(Q1#amqqueue{pid = Self}) - end), - rabbit_amqqueue_process:init_declared( - {no_barrier, non_clean_shutdown}, none, Q1); - sleep_retry -> - timer:sleep(25), - init_non_recovery(Q, Hint); - master_in_recovery -> - {stop, normal, Q} +init_declared(Q = #amqqueue{name = QueueName}) -> + Decl = rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:wread({rabbit_queue, QueueName}) of + [] -> rabbit_amqqueue:internal_declare(Q); + [ExistingQ] -> {existing, ExistingQ} + end + end), + %% We have just been declared. Block waiting for an init + %% call so that we don't respond to any other message first + receive {'$gen_call', From, {init, new}} -> + case Decl of + {new, Fun} -> + Q1 = Fun(), + rabbit_amqqueue_process:init_declared(new,From, Q1); + {F, _} when F =:= absent; F =:= existing -> + gen_server2:reply(From, Decl), + {stop, normal, Q} + end end. -%% The Hint is how we were originally started. Of course, if we -%% crashed it might no longer be true - but we can only get here if -%% there is no Mnesia record, which should mean we can't be here if we -%% crashed. -init_missing(Q, Hint) -> - case Hint of - declare -> {declared, rabbit_amqqueue:internal_declare(Q)}; - slave -> master_in_recovery %% [1] - end. -%% [1] This is the same concept as the master_in_recovery case in the -%% slave startup code. Unfortunately since we start slaves with two -%% transactions we need to check twice. - -init_existing(Q = #amqqueue{pid = QPid, slave_pids = SPids}) -> - Alive = fun rabbit_misc:is_process_alive/1, - case {Alive(QPid), node(QPid) =:= node()} of - {true, true} -> {declared, {existing, Q}}; %% [1] - {true, false} -> new_slave; %% [2] - {false, _} -> case [SPid || SPid <- SPids, Alive(SPid)] of - [] -> {crash_restart, Q}; %% [3] - _ -> sleep_retry %% [4] - end - end. -%% [1] Lost a race to declare a queue - just return the winner. -%% -%% [2] There is a master on another node. Regardless of whether we -%% just crashed (as a master or slave) and restarted or were asked to -%% start as a slave, we are now a new slave. -%% -%% [3] Nothing is alive. We must have just died. Try to restart as a master. -%% -%% [4] The current master is dead but there are alive slaves. This is -%% not a stable situation. Sleep and wait for somebody else to make a -%% move - those slaves should either promote one of their own or die. - init_recovery(Q) -> rabbit_misc:execute_mnesia_transaction( fun () -> ok = rabbit_amqqueue:store_queue(Q) end), @@ -160,3 +107,39 @@ init_recovery(Q) -> receive {'$gen_call', From, {init, Terms}} -> rabbit_amqqueue_process:init_declared(Terms, From, Q) end. + +init_slave(Q) -> + rabbit_mirror_queue_slave:init_slave(Q). + +init_restart(#amqqueue{name = QueueName}) -> + {ok, Q = #amqqueue{pid = QPid, + slave_pids = SPids}} = rabbit_amqqueue:lookup(QueueName), + Local = node(QPid) =:= node(), + Slaves = [SPid || SPid <- SPids, rabbit_misc:is_process_alive(SPid)], + case rabbit_misc:is_process_alive(QPid) of + true -> false = Local, %% assertion + rabbit_mirror_queue_slave:init_slave(Q); %% [1] + false -> case Local andalso Slaves =:= [] of + true -> crash_restart(Q); %% [2] + false -> timer:sleep(25), + init_restart(Q) %% [3] + end + end. +%% [1] There is a master on another node. Regardless of whether we +%% were originally a master or a slave, we are now a new slave. +%% +%% [2] Nothing is alive. We are the last best hope. Try to restart as a master. +%% +%% [3] The current master is dead but either there are alive slaves to +%% take over or it's all happening on a different node anyway. This is +%% not a stable situation. Sleep and wait for somebody else to make a +%% move. + +crash_restart(Q = #amqqueue{name = QueueName}) -> + rabbit_log:error( + "Restarting crashed ~s.~n", [rabbit_misc:rs(QueueName)]), + Self = self(), + rabbit_misc:execute_mnesia_transaction( + fun () -> ok = rabbit_amqqueue:store_queue(Q#amqqueue{pid = Self}) end), + rabbit_amqqueue_process:init_declared( + {no_barrier, non_clean_shutdown}, none, Q). -- cgit v1.2.1 From 36aa869f19d098280501c9c67fe6fa379e5b1b38 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Wed, 20 Aug 2014 18:16:46 +0100 Subject: Make this test make sense for the new regime. There will of course need to be lots of new tests but at least this passes now. --- src/rabbit_tests.erl | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index a186fb7a..bd11e8bf 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1666,16 +1666,12 @@ test_declare_on_dead_queue(SecondaryNode) -> rabbit_amqqueue:declare(QueueName, false, false, [], none), exit(QPid, kill), - Self ! {self(), killed, QPid} + Self ! {self(), killed} end), receive - {Pid, killed, QPid} -> - {existing, #amqqueue{name = QueueName, - pid = QPid}} = - rabbit_amqqueue:declare(QueueName, false, false, [], none), - false = rabbit_misc:is_process_alive(QPid), - {new, Q} = rabbit_amqqueue:declare(QueueName, false, false, [], - none), + {Pid, killed} -> + {existing, Q} = rabbit_amqqueue:declare(QueueName, false, false, [], + none), true = rabbit_misc:is_process_alive(Q#amqqueue.pid), {ok, 0} = rabbit_amqqueue:delete(Q, false, false), passed -- cgit v1.2.1 From cad192fa5534246f0984e8af5a233b3543968971 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Wed, 20 Aug 2014 18:21:32 +0100 Subject: Cosmetic --- src/rabbit_prequeue.erl | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/rabbit_prequeue.erl b/src/rabbit_prequeue.erl index b084967d..8035f48f 100644 --- a/src/rabbit_prequeue.erl +++ b/src/rabbit_prequeue.erl @@ -136,8 +136,7 @@ init_restart(#amqqueue{name = QueueName}) -> %% move. crash_restart(Q = #amqqueue{name = QueueName}) -> - rabbit_log:error( - "Restarting crashed ~s.~n", [rabbit_misc:rs(QueueName)]), + rabbit_log:error("Restarting crashed ~s.~n", [rabbit_misc:rs(QueueName)]), Self = self(), rabbit_misc:execute_mnesia_transaction( fun () -> ok = rabbit_amqqueue:store_queue(Q#amqqueue{pid = Self}) end), -- cgit v1.2.1 From 0a44fffb317dad08d73be561d64f1cf8322d4ed8 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Thu, 21 Aug 2014 13:50:38 +0100 Subject: Remove silly race. --- src/rabbit_tests.erl | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index bd11e8bf..7018bffe 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1666,18 +1666,25 @@ test_declare_on_dead_queue(SecondaryNode) -> rabbit_amqqueue:declare(QueueName, false, false, [], none), exit(QPid, kill), - Self ! {self(), killed} + Self ! {self(), killed, QPid} end), receive - {Pid, killed} -> - {existing, Q} = rabbit_amqqueue:declare(QueueName, false, false, [], - none), - true = rabbit_misc:is_process_alive(Q#amqqueue.pid), + {Pid, killed, OldPid} -> + Q = dead_queue_loop(QueueName, OldPid), {ok, 0} = rabbit_amqqueue:delete(Q, false, false), passed after ?TIMEOUT -> throw(failed_to_create_and_kill_queue) end. +dead_queue_loop(QueueName, OldPid) -> + {existing, Q} = rabbit_amqqueue:declare(QueueName, false, false, [], none), + case Q#amqqueue.pid of + OldPid -> timer:sleep(25), + dead_queue_loop(QueueName, OldPid); + _ -> true = rabbit_misc:is_process_alive(Q#amqqueue.pid), + Q + end. + %%--------------------------------------------------------------------- control_action(Command, Args) -> -- cgit v1.2.1 From d71f4809dae43189d357bf2f4f6c9762b3cf4b14 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Thu, 21 Aug 2014 16:18:04 +0100 Subject: Don't leak supervisors. --- src/rabbit_amqqueue_sup.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl index 9f7060d7..fba7a861 100644 --- a/src/rabbit_amqqueue_sup.erl +++ b/src/rabbit_amqqueue_sup.erl @@ -40,7 +40,7 @@ start_link(Q, StartMode) -> Marker = spawn_link(fun() -> receive stop -> ok end end), ChildSpec = {rabbit_amqqueue, {rabbit_prequeue, start_link, [Q, StartMode, Marker]}, - transient, ?MAX_WAIT, worker, [rabbit_amqqueue_process, + intrinsic, ?MAX_WAIT, worker, [rabbit_amqqueue_process, rabbit_mirror_queue_slave]}, {ok, SupPid} = supervisor2:start_link(?MODULE, []), {ok, QPid} = supervisor2:start_child(SupPid, ChildSpec), -- cgit v1.2.1 From cb29af7579dc22e1ae63f2479e9f942317e47a65 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Thu, 21 Aug 2014 16:21:24 +0100 Subject: Unmislead comment. --- src/rabbit_amqqueue_process.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 590a8be0..02f1fa16 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -206,7 +206,7 @@ terminate({shutdown, missing_owner} = Reason, State) -> terminate_shutdown(terminate_delete(false, Reason, State), State); terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) -> terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State); -terminate(normal, State) -> %% auto-delete case +terminate(normal, State) -> %% delete case terminate_shutdown(terminate_delete(true, normal, State), State); terminate(_Reason, State) -> terminate_crash(State). -- cgit v1.2.1 From b293c4fb3ad6b4928e35b4d2dd10f2126b81a5a7 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Mon, 8 Sep 2014 16:08:04 +0100 Subject: Spec --- src/rabbit_prequeue.erl | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/rabbit_prequeue.erl b/src/rabbit_prequeue.erl index 8035f48f..5400278c 100644 --- a/src/rabbit_prequeue.erl +++ b/src/rabbit_prequeue.erl @@ -35,7 +35,10 @@ -ifdef(use_specs). -%%-spec(start_link/2 :: () -> rabbit_types:ok_pid_or_error()). +-type(start_mode() :: 'declare' | 'recovery' | 'slave'). + +-spec(start_link/3 :: (rabbit_types:amqqueue(), start_mode(), pid()) + -> rabbit_types:ok_pid_or_error()). -endif. -- cgit v1.2.1 From 6775106769e34d8cce836a89d81f1915d7e02704 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 9 Sep 2014 10:47:16 +0100 Subject: Try to be consistent. --- src/rabbit_amqqueue_sup.erl | 3 +-- src/rabbit_prequeue.erl | 2 ++ 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl index fba7a861..465c0412 100644 --- a/src/rabbit_amqqueue_sup.erl +++ b/src/rabbit_amqqueue_sup.erl @@ -28,8 +28,7 @@ -ifdef(use_specs). --spec(start_link/2 :: - (rabbit_types:amqqueue(), 'declare' | 'recovery' | 'slave') -> +-spec(start_link/2 :: (rabbit_types:amqqueue(), rabbit_prequeue:start_mode()) -> {'ok', pid(), pid()}). -endif. diff --git a/src/rabbit_prequeue.erl b/src/rabbit_prequeue.erl index 5400278c..d71a68c5 100644 --- a/src/rabbit_prequeue.erl +++ b/src/rabbit_prequeue.erl @@ -35,6 +35,8 @@ -ifdef(use_specs). +-export_type([start_mode/0]). + -type(start_mode() :: 'declare' | 'recovery' | 'slave'). -spec(start_link/3 :: (rabbit_types:amqqueue(), start_mode(), pid()) -- cgit v1.2.1 From c95e1a0a32703a6c482e9e43652cfced11d1a953 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 9 Sep 2014 13:36:06 +0100 Subject: Introduce the idea that queues can be absent for a reason. The traditional absent reason is 'nodedown' and we have a new reason, 'crashed', for when crash recovery has failed and the supervisor has given up. An absent crashed queue is nearly the same as an absent nodedown queue, but we allow for deleting it since it can't be recovered by bringing a node back up. Currently absent crashed queues are not handled properly by mgmt (they appear to still be there); we might defer that to bug 26151. --- src/rabbit_amqqueue.erl | 61 +++++++++++++++++++++++++++++++------------ src/rabbit_backing_queue.erl | 4 +++ src/rabbit_binding.erl | 6 ++--- src/rabbit_channel.erl | 18 +++++++------ src/rabbit_misc.erl | 13 ++++++--- src/rabbit_prequeue.erl | 5 +++- src/rabbit_variable_queue.erl | 6 ++++- src/rabbit_vhost.erl | 8 +++--- 8 files changed, 84 insertions(+), 37 deletions(-) diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 0ad6af51..4906b252 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -17,7 +17,9 @@ -module(rabbit_amqqueue). -export([recover/0, stop/0, start/1, declare/5, declare/6, - delete_immediately/1, delete/3, purge/1, forget_all_durable/1]). + delete_immediately/1, delete/3, + delete_crashed/1, delete_crashed_internal/1, + purge/1, forget_all_durable/1]). -export([pseudo_queue/2, immutable/1]). -export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2, assert_equivalence/5, @@ -49,7 +51,7 @@ -ifdef(use_specs). --export_type([name/0, qmsg/0]). +-export_type([name/0, qmsg/0, absent_reason/0]). -type(name() :: rabbit_types:r('queue')). -type(qpids() :: [pid()]). @@ -59,8 +61,9 @@ -type(msg_id() :: non_neg_integer()). -type(ok_or_errors() :: 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}). --type(not_found_or_absent() :: 'not_found' | - {'absent', rabbit_types:amqqueue()}). +-type(absent_reason() :: 'nodedown' | 'crashed'). +-type(not_found_or_absent() :: + 'not_found' | {'absent', rabbit_types:amqqueue(), absent_reason()}). -spec(recover/0 :: () -> [rabbit_types:amqqueue()]). -spec(stop/0 :: () -> 'ok'). -spec(start/1 :: ([rabbit_types:amqqueue()]) -> 'ok'). @@ -72,8 +75,9 @@ -spec(declare/6 :: (name(), boolean(), boolean(), rabbit_framing:amqp_table(), rabbit_types:maybe(pid()), node()) - -> {'new' | 'existing' | 'absent' | 'owner_died', - rabbit_types:amqqueue()} | rabbit_types:channel_exit()). + -> {'new' | 'existing' | 'owner_died', rabbit_types:amqqueue()} | + {'absent', rabbit_types:amqqueue(), absent_reason()} | + rabbit_types:channel_exit()). -spec(internal_declare/1 :: (rabbit_types:amqqueue()) -> {'new', rabbit_misc:thunk(rabbit_types:amqqueue())} | @@ -137,6 +141,8 @@ -> qlen() | rabbit_types:error('in_use') | rabbit_types:error('not_empty')). +-spec(delete_crashed/1 :: (rabbit_types:amqqueue()) -> 'ok'). +-spec(delete_crashed_internal/1 :: (rabbit_types:amqqueue()) -> 'ok'). -spec(purge/1 :: (rabbit_types:amqqueue()) -> qlen()). -spec(forget_all_durable/1 :: (node()) -> 'ok'). -spec(deliver/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) -> @@ -274,10 +280,10 @@ declare(QueueName, Durable, AutoDelete, Args, Owner, Node) -> internal_declare(Q = #amqqueue{name = QueueName}) -> case not_found_or_absent(QueueName) of - not_found -> ok = store_queue(Q), - B = add_default_binding(Q), - {new, fun () -> B(), Q end}; - {absent, _Q} = R -> R + not_found -> ok = store_queue(Q), + B = add_default_binding(Q), + {new, fun () -> B(), Q end}; + {absent, Q, _Reason} = R -> R end. update(Name, Fun) -> @@ -349,7 +355,7 @@ not_found_or_absent(Name) -> %% rabbit_queue and not found anything case mnesia:read({rabbit_durable_queue, Name}) of [] -> not_found; - [Q] -> {absent, Q} %% Q exists on stopped node + [Q] -> {absent, Q, nodedown} %% Q exists on stopped node end. not_found_or_absent_dirty(Name) -> @@ -358,7 +364,7 @@ not_found_or_absent_dirty(Name) -> %% and only affect the error kind. case rabbit_misc:dirty_read({rabbit_durable_queue, Name}) of {error, not_found} -> not_found; - {ok, Q} -> {absent, Q} + {ok, Q} -> {absent, Q, nodedown} end. with(Name, F, E) -> @@ -372,8 +378,11 @@ with(Name, F, E) -> %% the retry loop. rabbit_misc:with_exit_handler( fun () -> false = rabbit_misc:is_process_alive(QPid), - timer:sleep(25), - with(Name, F, E) + case crashed_or_recovering(Q) of + crashed -> E({absent, Q, crashed}); + recovering -> timer:sleep(25), + with(Name, F, E) + end end, fun () -> F(Q) end); {error, not_found} -> E(not_found_or_absent_dirty(Name)) @@ -382,10 +391,22 @@ with(Name, F, E) -> with(Name, F) -> with(Name, F, fun (E) -> {error, E} end). with_or_die(Name, F) -> - with(Name, F, fun (not_found) -> rabbit_misc:not_found(Name); - ({absent, Q}) -> rabbit_misc:absent(Q) + with(Name, F, fun (not_found) -> rabbit_misc:not_found(Name); + ({absent, Q, Reason}) -> rabbit_misc:absent(Q, Reason) end). +%% TODO we could still be wrong here if we happen to call in the +%% middle of a crash-failover. We could try to figure out whether +%% that's happening by looking for the supervisor - but we'd need some +%% additional book keeping to know what it is... +crashed_or_recovering(#amqqueue{pid = QPid, slave_pids = []}) -> + case lists:member(node(QPid), [node() | nodes()]) of + true -> crashed; + false -> recovering + end; +crashed_or_recovering(_Q) -> + recovering. + assert_equivalence(#amqqueue{durable = Durable, auto_delete = AutoDelete} = Q, Durable, AutoDelete, RequiredArgs, Owner) -> @@ -546,6 +567,14 @@ delete_immediately(QPids) -> delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> delegate:call(QPid, {delete, IfUnused, IfEmpty}). +delete_crashed(#amqqueue{ pid = QPid } = Q) -> + rpc:call(node(QPid), ?MODULE, delete_crashed_internal, [Q]). + +delete_crashed_internal(#amqqueue{ name = QName }) -> + {ok, BQ} = application:get_env(backing_queue_module), + BQ:delete_crashed(QName), + ok = internal_delete(QName). + purge(#amqqueue{ pid = QPid }) -> delegate:call(QPid, purge). deliver(Qs, Delivery) -> deliver(Qs, Delivery, noflow). diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 098f5f43..310b8220 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -85,6 +85,10 @@ %% content. -callback delete_and_terminate(any(), state()) -> state(). +%% Called to clean up after a crashed queue. In this case we don't +%% have a process and thus a state(), we are just removing on-disk data. +-callback delete_crashed(rabbit_amqqueue:name()) -> 'ok'. + %% Remove all 'fetchable' messages from the queue, i.e. all messages %% except those that have been fetched already and are pending acks. -callback purge(state()) -> {purged_msg_count(), state()}. diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index d887f26a..12082af8 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -363,7 +363,7 @@ not_found_or_absent_errs(Names) -> absent_errs_only(Names) -> Errs = [E || Name <- Names, - {absent, _Q} = E <- [not_found_or_absent(Name)]], + {absent, _Q, _Reason} = E <- [not_found_or_absent(Name)]], rabbit_misc:const(case Errs of [] -> ok; _ -> {error, {resources_missing, Errs}} @@ -376,8 +376,8 @@ not_found_or_absent(#resource{kind = exchange} = Name) -> {not_found, Name}; not_found_or_absent(#resource{kind = queue} = Name) -> case rabbit_amqqueue:not_found_or_absent(Name) of - not_found -> {not_found, Name}; - {absent, _Q} = R -> R + not_found -> {not_found, Name}; + {absent, _Q, _Reason} = R -> R end. contains(Table, MatchHead) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index e5a90410..fc433898 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1189,16 +1189,16 @@ handle_method(#'queue.declare'{queue = QueueNameBin, %% must have been created between the stat and the %% declare. Loop around again. handle_method(Declare, none, State); - {absent, Q} -> - rabbit_misc:absent(Q); + {absent, Q, Reason} -> + rabbit_misc:absent(Q, Reason); {owner_died, _Q} -> %% Presumably our own days are numbered since the %% connection has died. Pretend the queue exists though, %% just so nothing fails. return_queue_declare_ok(QueueName, NoWait, 0, 0, State) end; - {error, {absent, Q}} -> - rabbit_misc:absent(Q) + {error, {absent, Q, Reason}} -> + rabbit_misc:absent(Q, Reason) end; handle_method(#'queue.declare'{queue = QueueNameBin, @@ -1227,8 +1227,10 @@ handle_method(#'queue.delete'{queue = QueueNameBin, rabbit_amqqueue:check_exclusive_access(Q, ConnPid), rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end, - fun (not_found) -> {ok, 0}; - ({absent, Q}) -> rabbit_misc:absent(Q) + fun (not_found) -> {ok, 0}; + ({absent, Q, crashed}) -> rabbit_amqqueue:delete_crashed(Q), + {ok, 0}; + ({absent, Q, Reason}) -> rabbit_misc:absent(Q, Reason) end) of {error, in_use} -> precondition_failed("~s in use", [rabbit_misc:rs(QueueName)]); @@ -1477,8 +1479,8 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, end) of {error, {resources_missing, [{not_found, Name} | _]}} -> rabbit_misc:not_found(Name); - {error, {resources_missing, [{absent, Q} | _]}} -> - rabbit_misc:absent(Q); + {error, {resources_missing, [{absent, Q, Reason} | _]}} -> + rabbit_misc:absent(Q, Reason); {error, binding_not_found} -> rabbit_misc:protocol_error( not_found, "no binding ~s between ~s and ~s", diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index c4148bbf..cf3a2edb 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -21,7 +21,7 @@ -export([method_record_type/1, polite_pause/0, polite_pause/1]). -export([die/1, frame_error/2, amqp_error/4, quit/1, protocol_error/3, protocol_error/4, protocol_error/1]). --export([not_found/1, absent/1]). +-export([not_found/1, absent/2]). -export([type_class/1, assert_args_equivalence/4]). -export([dirty_read/1]). -export([table_lookup/2, set_table_value/4]). @@ -119,7 +119,8 @@ -spec(protocol_error/1 :: (rabbit_types:amqp_error()) -> channel_or_connection_exit()). -spec(not_found/1 :: (rabbit_types:r(atom())) -> rabbit_types:channel_exit()). --spec(absent/1 :: (rabbit_types:amqqueue()) -> rabbit_types:channel_exit()). +-spec(absent/2 :: (rabbit_types:amqqueue(), rabbit_amqqueue:absent_reason()) + -> rabbit_types:channel_exit()). -spec(type_class/1 :: (rabbit_framing:amqp_field_type()) -> atom()). -spec(assert_args_equivalence/4 :: (rabbit_framing:amqp_table(), rabbit_framing:amqp_table(), @@ -292,14 +293,18 @@ protocol_error(#amqp_error{} = Error) -> not_found(R) -> protocol_error(not_found, "no ~s", [rs(R)]). -absent(#amqqueue{name = QueueName, pid = QPid, durable = true}) -> +absent(#amqqueue{name = QueueName, pid = QPid, durable = true}, down) -> %% The assertion of durability is mainly there because we mention %% durability in the error message. That way we will hopefully %% notice if at some future point our logic changes s.t. we get %% here with non-durable queues. protocol_error(not_found, "home node '~s' of durable ~s is down or inaccessible", - [node(QPid), rs(QueueName)]). + [node(QPid), rs(QueueName)]); + +absent(#amqqueue{name = QueueName}, crashed) -> + protocol_error(not_found, + "~s has crashed and failed to restart", [rs(QueueName)]). type_class(byte) -> int; type_class(short) -> int; diff --git a/src/rabbit_prequeue.erl b/src/rabbit_prequeue.erl index d71a68c5..fd20e926 100644 --- a/src/rabbit_prequeue.erl +++ b/src/rabbit_prequeue.erl @@ -99,7 +99,10 @@ init_declared(Q = #amqqueue{name = QueueName}) -> {new, Fun} -> Q1 = Fun(), rabbit_amqqueue_process:init_declared(new,From, Q1); - {F, _} when F =:= absent; F =:= existing -> + {absent, _, _} -> + gen_server2:reply(From, Decl), + {stop, normal, Q}; + {existing, _} -> gen_server2:reply(From, Decl), {stop, normal, Q} end diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index c960fad4..e858fb3d 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -16,7 +16,8 @@ -module(rabbit_variable_queue). --export([init/3, terminate/2, delete_and_terminate/2, purge/1, purge_acks/1, +-export([init/3, terminate/2, delete_and_terminate/2, delete_crashed/1, + purge/1, purge_acks/1, publish/5, publish_delivered/4, discard/3, drain_confirmed/1, dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1, is_empty/1, depth/1, @@ -510,6 +511,9 @@ delete_and_terminate(_Reason, State) -> a(State2 #vqstate { index_state = IndexState1, msg_store_clients = undefined }). +delete_crashed(QName) -> + ok = rabbit_queue_index:erase(QName). + purge(State = #vqstate { q4 = Q4, index_state = IndexState, msg_store_clients = MSCState, diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index cfa3add4..2c1e15f0 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -94,10 +94,10 @@ delete(VHostPath) -> [ok = Fun() || Fun <- Funs], ok. -assert_benign(ok) -> ok; -assert_benign({ok, _}) -> ok; -assert_benign({error, not_found}) -> ok; -assert_benign({error, {absent, Q}}) -> +assert_benign(ok) -> ok; +assert_benign({ok, _}) -> ok; +assert_benign({error, not_found}) -> ok; +assert_benign({error, {absent, Q, nodedown}}) -> %% We have a durable queue on a down node. Removing the mnesia %% entries here is safe. If/when the down node restarts, it will %% clear out the on-disk storage of the queue. -- cgit v1.2.1 From 6f19bae6147a3f3430ad66b10778f3334b661fb2 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 9 Sep 2014 13:41:38 +0100 Subject: Cosmetic --- src/rabbit_amqqueue.erl | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 4906b252..8ce24540 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -17,9 +17,8 @@ -module(rabbit_amqqueue). -export([recover/0, stop/0, start/1, declare/5, declare/6, - delete_immediately/1, delete/3, - delete_crashed/1, delete_crashed_internal/1, - purge/1, forget_all_durable/1]). + delete_immediately/1, delete/3, purge/1, forget_all_durable/1 + delete_crashed/1, delete_crashed_internal/1]). -export([pseudo_queue/2, immutable/1]). -export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2, assert_equivalence/5, @@ -280,10 +279,10 @@ declare(QueueName, Durable, AutoDelete, Args, Owner, Node) -> internal_declare(Q = #amqqueue{name = QueueName}) -> case not_found_or_absent(QueueName) of - not_found -> ok = store_queue(Q), - B = add_default_binding(Q), - {new, fun () -> B(), Q end}; - {absent, Q, _Reason} = R -> R + not_found -> ok = store_queue(Q), + B = add_default_binding(Q), + {new, fun () -> B(), Q end}; + {absent, _Q, _Reason} = R -> R end. update(Name, Fun) -> -- cgit v1.2.1 From 9cc2a0bcbe61539b6a4ef52bd3ea1165b07f9085 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 9 Sep 2014 13:46:53 +0100 Subject: Oops --- src/rabbit_amqqueue.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 8ce24540..8b13ac49 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -17,7 +17,7 @@ -module(rabbit_amqqueue). -export([recover/0, stop/0, start/1, declare/5, declare/6, - delete_immediately/1, delete/3, purge/1, forget_all_durable/1 + delete_immediately/1, delete/3, purge/1, forget_all_durable/1, delete_crashed/1, delete_crashed_internal/1]). -export([pseudo_queue/2, immutable/1]). -export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2, -- cgit v1.2.1 From 1c155a643f5985b0e3a34cc3b1edb9da087b8a9b Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 9 Sep 2014 14:37:23 +0100 Subject: Oops --- src/rabbit_misc.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index cf3a2edb..77ac5c44 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -293,7 +293,7 @@ protocol_error(#amqp_error{} = Error) -> not_found(R) -> protocol_error(not_found, "no ~s", [rs(R)]). -absent(#amqqueue{name = QueueName, pid = QPid, durable = true}, down) -> +absent(#amqqueue{name = QueueName, pid = QPid, durable = true}, nodedown) -> %% The assertion of durability is mainly there because we mention %% durability in the error message. That way we will hopefully %% notice if at some future point our logic changes s.t. we get -- cgit v1.2.1 From 0c5e442e2c02157d2721e04c506ff16812cf30af Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 9 Sep 2014 16:03:47 +0100 Subject: Rename slightly and improve comments. --- src/rabbit_amqqueue_process.erl | 6 +++--- src/rabbit_mirror_queue_slave.erl | 4 ++-- src/rabbit_prequeue.erl | 20 +++++++++++--------- 3 files changed, 16 insertions(+), 14 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 02f1fa16..537eaff5 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -26,7 +26,7 @@ -export([info_keys/0]). --export([init_declared/3, init_with_backing_queue_state/7]). +-export([become/3, init_with_backing_queue_state/7]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/4, @@ -108,8 +108,8 @@ statistics_keys() -> ?STATISTICS_KEYS ++ rabbit_backing_queue:info_keys(). init(_) -> exit(cannot_be_called_directly). %% We have just been declared or recovered -init_declared(Recover, From, Q = #amqqueue{name = QName, - exclusive_owner = Owner}) -> +become(Recover, From, Q = #amqqueue{name = QName, + exclusive_owner = Owner}) -> process_flag(trap_exit, true), ?store_proc_name(QName), State = init_state(Q), diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 0c9e6c21..c2bf1b89 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -24,7 +24,7 @@ %% All instructions from the GM group must be processed in the order %% in which they're received. --export([set_maximum_since_use/2, info/1, init_slave/1, await/1]). +-export([set_maximum_since_use/2, info/1, become/1, await/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, handle_pre_hibernate/1, prioritise_call/4, @@ -80,7 +80,7 @@ await(SPid) -> gen_server2:call(SPid, await, infinity). init(_) -> exit(cannot_be_called_directly). -init_slave(Q = #amqqueue{name = QName}) -> +become(Q = #amqqueue{name = QName}) -> ?store_proc_name(QName), %% We join the GM group before we add ourselves to the amqqueue %% record. As a result: diff --git a/src/rabbit_prequeue.erl b/src/rabbit_prequeue.erl index fd20e926..cfe21494 100644 --- a/src/rabbit_prequeue.erl +++ b/src/rabbit_prequeue.erl @@ -92,13 +92,14 @@ init_declared(Q = #amqqueue{name = QueueName}) -> [ExistingQ] -> {existing, ExistingQ} end end), - %% We have just been declared. Block waiting for an init - %% call so that we don't respond to any other message first + %% We have just been declared. Block waiting for an init call so + %% that we can let the declarer know whether we actually started + %% something. receive {'$gen_call', From, {init, new}} -> case Decl of {new, Fun} -> Q1 = Fun(), - rabbit_amqqueue_process:init_declared(new,From, Q1); + rabbit_amqqueue_process:become(new, From, Q1); {absent, _, _} -> gen_server2:reply(From, Decl), {stop, normal, Q}; @@ -111,9 +112,10 @@ init_declared(Q = #amqqueue{name = QueueName}) -> init_recovery(Q) -> rabbit_misc:execute_mnesia_transaction( fun () -> ok = rabbit_amqqueue:store_queue(Q) end), - %% Again block waiting for an init call. + %% This time we have an init call to ensure the recovery terms do + %% not sit in the supervisor forever. receive {'$gen_call', From, {init, Terms}} -> - rabbit_amqqueue_process:init_declared(Terms, From, Q) + rabbit_amqqueue_process:become(Terms, From, Q) end. init_slave(Q) -> @@ -126,11 +128,11 @@ init_restart(#amqqueue{name = QueueName}) -> Slaves = [SPid || SPid <- SPids, rabbit_misc:is_process_alive(SPid)], case rabbit_misc:is_process_alive(QPid) of true -> false = Local, %% assertion - rabbit_mirror_queue_slave:init_slave(Q); %% [1] + rabbit_mirror_queue_slave:become(Q); %% [1] false -> case Local andalso Slaves =:= [] of - true -> crash_restart(Q); %% [2] + true -> crash_restart(Q); %% [2] false -> timer:sleep(25), - init_restart(Q) %% [3] + init_restart(Q) %% [3] end end. %% [1] There is a master on another node. Regardless of whether we @@ -148,5 +150,5 @@ crash_restart(Q = #amqqueue{name = QueueName}) -> Self = self(), rabbit_misc:execute_mnesia_transaction( fun () -> ok = rabbit_amqqueue:store_queue(Q#amqqueue{pid = Self}) end), - rabbit_amqqueue_process:init_declared( + rabbit_amqqueue_process:become( {no_barrier, non_clean_shutdown}, none, Q). -- cgit v1.2.1 From 5a0e34e615827aa8bf24bd0bc85a103a65218ab6 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 9 Sep 2014 16:14:55 +0100 Subject: Oops --- src/rabbit_prequeue.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rabbit_prequeue.erl b/src/rabbit_prequeue.erl index cfe21494..5733c732 100644 --- a/src/rabbit_prequeue.erl +++ b/src/rabbit_prequeue.erl @@ -119,7 +119,7 @@ init_recovery(Q) -> end. init_slave(Q) -> - rabbit_mirror_queue_slave:init_slave(Q). + rabbit_mirror_queue_slave:become(Q). init_restart(#amqqueue{name = QueueName}) -> {ok, Q = #amqqueue{pid = QPid, -- cgit v1.2.1 From c243ede4fd2a08e058a4609ca6c747de48902526 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 9 Sep 2014 16:22:22 +0100 Subject: Minor simplification. --- src/rabbit_amqqueue_process.erl | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 537eaff5..829c5523 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -208,8 +208,9 @@ terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) -> terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State); terminate(normal, State) -> %% delete case terminate_shutdown(terminate_delete(true, normal, State), State); +%% If we crashed don't try to clean up the BQS, probably best to leave it. terminate(_Reason, State) -> - terminate_crash(State). + terminate_shutdown(fun (BQS) -> BQS end, State). terminate_delete(EmitStats, Reason, State = #q{q = #amqqueue{name = QName}, @@ -243,12 +244,6 @@ terminate_shutdown(Fun, State) -> State1#q{backing_queue_state = Fun(BQS)} end. -terminate_crash(State = #q{consumers = Consumers}) -> - QName = qname(State), - [emit_consumer_deleted(Ch, CTag, QName) || - {Ch, CTag, _, _, _} <- rabbit_queue_consumers:all(Consumers)], - ok. - code_change(_OldVsn, State, _Extra) -> {ok, State}. -- cgit v1.2.1 From 215887d34c8964b3a83b5fa4929dc1d06bc8bf69 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 9 Sep 2014 19:06:06 +0100 Subject: Reduce distance to default, especially WRT rabbit_amqqueue_process startup. --- src/rabbit_amqqueue.erl | 33 +++++++---- src/rabbit_amqqueue_process.erl | 119 +++++++++++++++++++++++++--------------- src/rabbit_prequeue.erl | 49 ++--------------- 3 files changed, 105 insertions(+), 96 deletions(-) diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 8b13ac49..aadedda7 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -36,7 +36,7 @@ cancel_sync_mirrors/1]). %% internal --export([internal_declare/1, internal_delete/1, run_backing_queue/3, +-export([internal_declare/2, internal_delete/1, run_backing_queue/3, set_ram_duration_target/2, set_maximum_since_use/2]). -include("rabbit.hrl"). @@ -77,8 +77,9 @@ -> {'new' | 'existing' | 'owner_died', rabbit_types:amqqueue()} | {'absent', rabbit_types:amqqueue(), absent_reason()} | rabbit_types:channel_exit()). --spec(internal_declare/1 :: - (rabbit_types:amqqueue()) +%% TODO nonsense +-spec(internal_declare/2 :: + (rabbit_types:amqqueue(), boolean()) -> {'new', rabbit_misc:thunk(rabbit_types:amqqueue())} | {'absent', rabbit_types:amqqueue()}). -spec(update/2 :: @@ -277,13 +278,25 @@ declare(QueueName, Durable, AutoDelete, Args, Owner, Node) -> rabbit_amqqueue_sup_sup:start_queue_process(Node, Q, declare), {init, new}, infinity). -internal_declare(Q = #amqqueue{name = QueueName}) -> - case not_found_or_absent(QueueName) of - not_found -> ok = store_queue(Q), - B = add_default_binding(Q), - {new, fun () -> B(), Q end}; - {absent, _Q, _Reason} = R -> R - end. +internal_declare(Q, true) -> + rabbit_misc:execute_mnesia_tx_with_tail( + fun () -> ok = store_queue(Q), rabbit_misc:const(Q) end); +internal_declare(Q = #amqqueue{name = QueueName}, false) -> + rabbit_misc:execute_mnesia_tx_with_tail( + fun () -> + case mnesia:wread({rabbit_queue, QueueName}) of + [] -> + case not_found_or_absent(QueueName) of + not_found -> Q1 = rabbit_policy:set(Q), + ok = store_queue(Q1), + B = add_default_binding(Q1), + fun () -> B(), Q1 end; + {absent, _Q, _} = R -> rabbit_misc:const(R) + end; + [ExistingQ] -> + rabbit_misc:const(ExistingQ) + end + end). update(Name, Fun) -> case mnesia:wread({rabbit_queue, Name}) of diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 829c5523..b21a4c88 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -26,7 +26,7 @@ -export([info_keys/0]). --export([become/3, init_with_backing_queue_state/7]). +-export([become/1, init_with_backing_queue_state/7]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/4, @@ -107,55 +107,62 @@ statistics_keys() -> ?STATISTICS_KEYS ++ rabbit_backing_queue:info_keys(). init(_) -> exit(cannot_be_called_directly). -%% We have just been declared or recovered -become(Recover, From, Q = #amqqueue{name = QName, - exclusive_owner = Owner}) -> +become(Q = #amqqueue{name = QName}) -> process_flag(trap_exit, true), ?store_proc_name(QName), - State = init_state(Q), - case Owner of - none -> finish_init(Recover, From, State); - _ -> case rabbit_misc:is_process_alive(Owner) of %% [1] - true -> erlang:monitor(process, Owner), - finish_init(Recover, From, State); - false -> gen_server2:reply(From, {owner_died, Q}), - BQ = backing_queue_module(Q), - {_, Terms} = recovery_status(Recover), - BQS = bq_init(BQ, Q, Terms), - %% Rely on terminate to delete the queue. - {stop, {shutdown, missing_owner}, - State#q{backing_queue = BQ, - backing_queue_state = BQS}} - end - end. -%% [1] You used to be able to declare an exclusive durable -%% queue. Sadly we need to still tidy up after that case, there could -%% be the remnants of one left over from an upgrade. So that's why we -%% don't enforce Recover = new here. + {become, ?MODULE, init_state(Q), hibernate}. finish_init(Recover, From, State = #q{q = Q, backing_queue = undefined, backing_queue_state = undefined}) -> - send_reply(From, Q), - {RecoveryPid, TermsOrNew} = recovery_status(Recover), - ok = file_handle_cache:register_callback( - rabbit_amqqueue, set_maximum_since_use, [self()]), - ok = rabbit_memory_monitor:register( - self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), - BQ = backing_queue_module(Q), - BQS = bq_init(BQ, Q, TermsOrNew), - recovery_barrier(RecoveryPid), - State1 = process_args_policy(State#q{backing_queue = BQ, - backing_queue_state = BQS}), - notify_decorators(startup, State1), - rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State1)), - rabbit_event:if_enabled(State1, #q.stats_timer, - fun() -> emit_stats(State1) end), - {become, ?MODULE, State1, hibernate}. + {Barrier, TermsOrNew} = recovery_status(Recover), + case rabbit_amqqueue:internal_declare(Q, Recover /= new) of + #amqqueue{} = Q1 -> + case matches(Recover, Q, Q1) of + true -> + send_reply(From, Q), + ok = file_handle_cache:register_callback( + rabbit_amqqueue, set_maximum_since_use, [self()]), + ok = rabbit_memory_monitor:register( + self(), {rabbit_amqqueue, + set_ram_duration_target, [self()]}), + BQ = backing_queue_module(Q1), + BQS = bq_init(BQ, Q, TermsOrNew), + recovery_barrier(Barrier), + State1 = process_args_policy( + State#q{backing_queue = BQ, + backing_queue_state = BQS}), + notify_decorators(startup, State), + rabbit_event:notify(queue_created, + infos(?CREATION_EVENT_KEYS, State1)), + rabbit_event:if_enabled(State1, #q.stats_timer, + fun() -> emit_stats(State1) end), + noreply(State1); + false -> + {stop, normal, {existing, Q1}, State} + end; + Err -> + {stop, normal, Err, State} + end. recovery_status(new) -> {no_barrier, new}; recovery_status({Recover, Terms}) -> {Recover, Terms}. +send_reply(none, _Q) -> ok; +send_reply(From, Q) -> gen_server2:reply(From, {new, Q}). + +matches(new, Q1, Q2) -> + %% i.e. not policy + Q1#amqqueue.name =:= Q2#amqqueue.name andalso + Q1#amqqueue.durable =:= Q2#amqqueue.durable andalso + Q1#amqqueue.auto_delete =:= Q2#amqqueue.auto_delete andalso + Q1#amqqueue.exclusive_owner =:= Q2#amqqueue.exclusive_owner andalso + Q1#amqqueue.arguments =:= Q2#amqqueue.arguments andalso + Q1#amqqueue.pid =:= Q2#amqqueue.pid andalso + Q1#amqqueue.slave_pids =:= Q2#amqqueue.slave_pids; +matches(_, Q, Q) -> true; +matches(_, _Q, _Q1) -> false. + recovery_barrier(no_barrier) -> ok; recovery_barrier(BarrierPid) -> @@ -165,9 +172,6 @@ recovery_barrier(BarrierPid) -> {'DOWN', MRef, process, _, _} -> ok end. -send_reply(none, _Q) -> ok; -send_reply(From, Q) -> gen_server2:reply(From, {new, Q}). - %% We have been promoted init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, RateTRef, Deliveries, Senders, MTC) -> @@ -915,6 +919,31 @@ prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) -> _ -> 0 end. +handle_call({init, Recover}, From, + State = #q{q = #amqqueue{exclusive_owner = none}}) -> + finish_init(Recover, From, State); + +%% You used to be able to declare an exclusive durable queue. Sadly we +%% need to still tidy up after that case, there could be the remnants +%% of one left over from an upgrade. So that's why we don't enforce +%% Recover = new here. +handle_call({init, Recover}, From, + State = #q{q = #amqqueue{exclusive_owner = Owner}}) -> + case rabbit_misc:is_process_alive(Owner) of + true -> erlang:monitor(process, Owner), + finish_init(Recover, From, State); + false -> #q{backing_queue = undefined, + backing_queue_state = undefined, + q = Q} = State, + gen_server2:reply(From, {owner_died, Q}), + BQ = backing_queue_module(Q), + {_, Terms} = recovery_status(Recover), + BQS = bq_init(BQ, Q, Terms), + %% Rely on terminate to delete the queue. + {stop, {shutdown, missing_owner}, + State#q{backing_queue = BQ, backing_queue_state = BQS}} + end; + handle_call(info, _From, State) -> reply(infos(info_keys(), State), State); @@ -1058,6 +1087,10 @@ handle_call(sync_mirrors, _From, State) -> handle_call(cancel_sync_mirrors, _From, State) -> reply({ok, not_syncing}, State). +%% TODO exclusive? +handle_cast(init, State = #q{q = #amqqueue{exclusive_owner = none}}) -> + finish_init({no_barrier, non_clean_shutdown}, none, State); + handle_cast({run_backing_queue, Mod, Fun}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> noreply(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}); diff --git a/src/rabbit_prequeue.erl b/src/rabbit_prequeue.erl index 5733c732..f95b0ced 100644 --- a/src/rabbit_prequeue.erl +++ b/src/rabbit_prequeue.erl @@ -65,8 +65,8 @@ init({Q, StartMode0, Marker}) -> handle_call(Msg, _From, State) -> {stop, {unexpected_call, Msg}, {unexpected_call, Msg}, State}. -handle_cast(init, {Q, declare}) -> init_declared(Q); -handle_cast(init, {Q, recovery}) -> init_recovery(Q); +handle_cast(init, {Q, declare}) -> init_master(Q); +handle_cast(init, {Q, recovery}) -> init_master(Q); handle_cast(init, {Q, slave}) -> init_slave(Q); handle_cast(init, {Q, restart}) -> init_restart(Q); @@ -84,42 +84,8 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- -init_declared(Q = #amqqueue{name = QueueName}) -> - Decl = rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:wread({rabbit_queue, QueueName}) of - [] -> rabbit_amqqueue:internal_declare(Q); - [ExistingQ] -> {existing, ExistingQ} - end - end), - %% We have just been declared. Block waiting for an init call so - %% that we can let the declarer know whether we actually started - %% something. - receive {'$gen_call', From, {init, new}} -> - case Decl of - {new, Fun} -> - Q1 = Fun(), - rabbit_amqqueue_process:become(new, From, Q1); - {absent, _, _} -> - gen_server2:reply(From, Decl), - {stop, normal, Q}; - {existing, _} -> - gen_server2:reply(From, Decl), - {stop, normal, Q} - end - end. - -init_recovery(Q) -> - rabbit_misc:execute_mnesia_transaction( - fun () -> ok = rabbit_amqqueue:store_queue(Q) end), - %% This time we have an init call to ensure the recovery terms do - %% not sit in the supervisor forever. - receive {'$gen_call', From, {init, Terms}} -> - rabbit_amqqueue_process:become(Terms, From, Q) - end. - -init_slave(Q) -> - rabbit_mirror_queue_slave:become(Q). +init_master(Q) -> rabbit_amqqueue_process:become(Q). +init_slave(Q) -> rabbit_mirror_queue_slave:become(Q). init_restart(#amqqueue{name = QueueName}) -> {ok, Q = #amqqueue{pid = QPid, @@ -147,8 +113,5 @@ init_restart(#amqqueue{name = QueueName}) -> crash_restart(Q = #amqqueue{name = QueueName}) -> rabbit_log:error("Restarting crashed ~s.~n", [rabbit_misc:rs(QueueName)]), - Self = self(), - rabbit_misc:execute_mnesia_transaction( - fun () -> ok = rabbit_amqqueue:store_queue(Q#amqqueue{pid = Self}) end), - rabbit_amqqueue_process:become( - {no_barrier, non_clean_shutdown}, none, Q). + gen_server2:cast(self(), init), + rabbit_amqqueue_process:become(Q#amqqueue{pid = self()}). -- cgit v1.2.1 From 183007616bc81421ccd0bdf96f333591ca637b83 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Wed, 10 Sep 2014 11:24:14 +0100 Subject: Fix type spec, fix a behaviour warning. --- src/rabbit_amqqueue.erl | 6 +++--- src/rabbit_mirror_queue_master.erl | 5 ++++- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index aadedda7..e026279f 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -61,6 +61,8 @@ -type(ok_or_errors() :: 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}). -type(absent_reason() :: 'nodedown' | 'crashed'). +-type(queue_or_absent() :: rabbit_types:amqqueue() | + {'absent', rabbit_types:amqqueue(),absent_reason()}). -type(not_found_or_absent() :: 'not_found' | {'absent', rabbit_types:amqqueue(), absent_reason()}). -spec(recover/0 :: () -> [rabbit_types:amqqueue()]). @@ -77,11 +79,9 @@ -> {'new' | 'existing' | 'owner_died', rabbit_types:amqqueue()} | {'absent', rabbit_types:amqqueue(), absent_reason()} | rabbit_types:channel_exit()). -%% TODO nonsense -spec(internal_declare/2 :: (rabbit_types:amqqueue(), boolean()) - -> {'new', rabbit_misc:thunk(rabbit_types:amqqueue())} | - {'absent', rabbit_types:amqqueue()}). + -> queue_or_absent() | rabbit_misc:thunk(queue_or_absent())). -spec(update/2 :: (name(), fun((rabbit_types:amqqueue()) -> rabbit_types:amqqueue())) diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 9bccf5dd..1bea8042 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -24,7 +24,7 @@ needs_timeout/1, timeout/1, handle_pre_hibernate/1, resume/1, msg_rates/1, info/2, invoke/3, is_duplicate/2]). --export([start/1, stop/0]). +-export([start/1, stop/0, delete_crashed/1]). -export([promote_backing_queue_state/8, sender_death_fun/0, depth_fun/0]). @@ -90,6 +90,9 @@ stop() -> %% Same as start/1. exit({not_valid_for_generic_backing_queue, ?MODULE}). +delete_crashed(_QName) -> + exit({not_valid_for_generic_backing_queue, ?MODULE}). + init(Q, Recover, AsyncCallback) -> {ok, BQ} = application:get_env(backing_queue_module), BQS = BQ:init(Q, Recover, AsyncCallback), -- cgit v1.2.1 From 5eb6dbe387b1aa93241333411aecb819bd9c864c Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Wed, 10 Sep 2014 11:59:06 +0100 Subject: Further reduce distance to default (especially in slave), by getting the prequeue to pass straight into the right module at the end of init/1. --- src/gen_server2.erl | 14 ++++++++- src/rabbit_amqqueue_process.erl | 12 ++++---- src/rabbit_mirror_queue_misc.erl | 5 +-- src/rabbit_mirror_queue_slave.erl | 42 +++++++++++++++++-------- src/rabbit_prequeue.erl | 64 +++++++++++++++------------------------ 5 files changed, 75 insertions(+), 62 deletions(-) diff --git a/src/gen_server2.erl b/src/gen_server2.erl index ee82bcb3..d2f96b52 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -69,7 +69,9 @@ %% which will be passed into any of the callback functions in the new %% module. Note there is no form also encompassing a reply, thus if %% you wish to reply in handle_call/3 and change the callback module, -%% you need to use gen_server2:reply/2 to issue the reply manually. +%% you need to use gen_server2:reply/2 to issue the reply +%% manually. The init function can similarly return a 5th argument, +%% Module, in order to dynamically decide the callback module on init. %% %% 8) The callback module can optionally implement %% format_message_queue/2 which is the equivalent of format_status/2 @@ -125,6 +127,7 @@ %%% ==> {ok, State} %%% {ok, State, Timeout} %%% {ok, State, Timeout, Backoff} +%%% {ok, State, Timeout, Backoff, Module} %%% ignore %%% {stop, Reason} %%% @@ -242,6 +245,8 @@ {ok, State :: term(), timeout() | hibernate} | {ok, State :: term(), timeout() | hibernate, {backoff, millis(), millis(), millis()}} | + {ok, State :: term(), timeout() | hibernate, + {backoff, millis(), millis(), millis()}, atom()} | ignore | {stop, Reason :: term()}. -callback handle_call(Request :: term(), From :: {pid(), Tag :: term()}, @@ -568,6 +573,13 @@ init_it(Starter, Parent, Name0, Mod, Args, Options) -> loop(GS2State #gs2_state { state = State, time = Timeout, timeout_state = Backoff1 }); + {ok, State, Timeout, Backoff = {backoff, _, _, _}, Mod1} -> + Backoff1 = extend_backoff(Backoff), + proc_lib:init_ack(Starter, {ok, self()}), + loop(GS2State #gs2_state { mod = Mod1, + state = State, + time = Timeout, + timeout_state = Backoff1 }); {stop, Reason} -> %% For consistency, we must make sure that the %% registered name (if any) is unregistered before diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b21a4c88..e068b93c 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -26,7 +26,7 @@ -export([info_keys/0]). --export([become/1, init_with_backing_queue_state/7]). +-export([init_with_backing_queue_state/7]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/4, @@ -105,12 +105,12 @@ statistics_keys() -> ?STATISTICS_KEYS ++ rabbit_backing_queue:info_keys(). %%---------------------------------------------------------------------------- -init(_) -> exit(cannot_be_called_directly). - -become(Q = #amqqueue{name = QName}) -> +init(Q) -> process_flag(trap_exit, true), - ?store_proc_name(QName), - {become, ?MODULE, init_state(Q), hibernate}. + ?store_proc_name(Q#amqqueue.name), + {ok, init_state(Q#amqqueue{pid = self()}), hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}, + ?MODULE}. finish_init(Recover, From, State = #q{q = Q, backing_queue = undefined, diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 5cb871e8..aec6f93d 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -210,10 +210,7 @@ add_mirror(QName, MirrorNode, SyncMode) -> MirrorNode, Q, slave), log_info(QName, "Adding mirror on node ~p: ~p~n", [MirrorNode, SPid]), - case SyncMode of - sync -> rabbit_mirror_queue_slave:await(SPid); - async -> ok - end + rabbit_mirror_queue_slave:go(SPid, SyncMode) end); {error, not_found} = E -> E diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index c2bf1b89..2da2e7a5 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -24,7 +24,7 @@ %% All instructions from the GM group must be processed in the order %% in which they're received. --export([set_maximum_since_use/2, info/1, become/1, await/1]). +-export([set_maximum_since_use/2, info/1, go/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, handle_pre_hibernate/1, prioritise_call/4, @@ -76,12 +76,16 @@ set_maximum_since_use(QPid, Age) -> info(QPid) -> gen_server2:call(QPid, info, infinity). -await(SPid) -> gen_server2:call(SPid, await, infinity). +init(Q) -> + ?store_proc_name(Q#amqqueue.name), + {ok, {not_started, Q}, hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, + ?DESIRED_HIBERNATE}, ?MODULE}. -init(_) -> exit(cannot_be_called_directly). +go(SPid, sync) -> gen_server2:call(SPid, go, infinity); +go(SPid, async) -> gen_server2:cast(SPid, go). -become(Q = #amqqueue{name = QName}) -> - ?store_proc_name(QName), +handle_go(Q = #amqqueue{name = QName}) -> %% We join the GM group before we add ourselves to the amqqueue %% record. As a result: %% 1. We can receive msgs from GM that correspond to messages we will @@ -136,25 +140,25 @@ become(Q = #amqqueue{name = QName}) -> ok = gm:broadcast(GM, request_depth), ok = gm:validate_members(GM, [GM | [G || {G, _} <- GMPids]]), rabbit_mirror_queue_misc:maybe_auto_sync(Q1), - {become, ?MODULE, State, hibernate}; + {ok, State}; {stale, StalePid} -> rabbit_mirror_queue_misc:log_warning( QName, "Detected stale HA master: ~p~n", [StalePid]), gm:leave(GM), - {stop, {stale_master_pid, StalePid}, Q}; + {error, {stale_master_pid, StalePid}}; duplicate_live_master -> gm:leave(GM), - {stop, {duplicate_live_master, Node}, Q}; + {error, {duplicate_live_master, Node}}; existing -> gm:leave(GM), - {stop, normal, Q}; + {error, normal}; master_in_recovery -> gm:leave(GM), %% The queue record vanished - we must have a master starting %% concurrently with us. In that case we can safely decide to do %% nothing here, and the master will start us in %% master:init_with_existing_bq/3 - {stop, normal, Q} + {error, normal} end. init_it(Self, GM, Node, QName) -> @@ -188,8 +192,11 @@ add_slave(Q = #amqqueue { slave_pids = SPids, gm_pids = GMPids }, New, GM) -> rabbit_mirror_queue_misc:store_updated_slaves( Q#amqqueue{slave_pids = SPids ++ [New], gm_pids = [{GM, New} | GMPids]}). -handle_call(await, _From, State) -> - {reply, ok, State}; +handle_call(go, _From, {not_started, Q} = NotStarted) -> + case handle_go(Q) of + {ok, State} -> {reply, ok, State}; + {error, Error} -> {stop, Error, NotStarted} + end; handle_call({gm_deaths, DeadGMPids}, From, State = #state { gm = GM, q = Q = #amqqueue { @@ -227,6 +234,12 @@ handle_call({gm_deaths, DeadGMPids}, From, handle_call(info, _From, State) -> reply(infos(?INFO_KEYS, State), State). +handle_cast(go, {not_started, Q} = NotStarted) -> + case handle_go(Q) of + {ok, State} -> {noreply, State}; + {error, Error} -> {stop, Error, NotStarted} + end; + handle_cast({run_backing_queue, Mod, Fun}, State) -> noreply(run_backing_queue(Mod, Fun, State)); @@ -307,6 +320,8 @@ handle_info({bump_credit, Msg}, State) -> handle_info(Msg, State) -> {stop, {unexpected_info, Msg}, State}. +terminate(_Reason, {not_started, _Q}) -> + ok; terminate(_Reason, #state { backing_queue_state = undefined }) -> %% We've received a delete_and_terminate from gm, thus nothing to %% do here. @@ -345,6 +360,9 @@ terminate_common(State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. +handle_pre_hibernate({not_started, _Q} = State) -> + {hibernate, State}; + handle_pre_hibernate(State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> {RamDuration, BQS1} = BQ:ram_duration(BQS), diff --git a/src/rabbit_prequeue.erl b/src/rabbit_prequeue.erl index f95b0ced..708cad53 100644 --- a/src/rabbit_prequeue.erl +++ b/src/rabbit_prequeue.erl @@ -51,54 +51,29 @@ start_link(Q, StartMode, Marker) -> %%---------------------------------------------------------------------------- -init({Q, StartMode0, Marker}) -> - %% Hand back to supervisor ASAP - gen_server2:cast(self(), init), - StartMode = case is_process_alive(Marker) of - true -> StartMode0; - false -> restart - end, - {ok, {Q#amqqueue{pid = self()}, StartMode}, hibernate, - {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, - ?DESIRED_HIBERNATE}}. - -handle_call(Msg, _From, State) -> - {stop, {unexpected_call, Msg}, {unexpected_call, Msg}, State}. - -handle_cast(init, {Q, declare}) -> init_master(Q); -handle_cast(init, {Q, recovery}) -> init_master(Q); -handle_cast(init, {Q, slave}) -> init_slave(Q); -handle_cast(init, {Q, restart}) -> init_restart(Q); - -handle_cast(Msg, State) -> - {stop, {unexpected_cast, Msg}, State}. +init({Q, StartMode, Marker}) -> + init(Q, case {is_process_alive(Marker), StartMode} of + {true, slave} -> slave; + {true, _} -> master; + {false, _} -> restart + end). -handle_info(Msg, State) -> - {stop, {unexpected_info, Msg}, State}. +init(Q, master) -> rabbit_amqqueue_process:init(Q); +init(Q, slave) -> rabbit_mirror_queue_slave:init(Q); -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%---------------------------------------------------------------------------- - -init_master(Q) -> rabbit_amqqueue_process:become(Q). -init_slave(Q) -> rabbit_mirror_queue_slave:become(Q). - -init_restart(#amqqueue{name = QueueName}) -> +init(#amqqueue{name = QueueName}, restart) -> {ok, Q = #amqqueue{pid = QPid, slave_pids = SPids}} = rabbit_amqqueue:lookup(QueueName), Local = node(QPid) =:= node(), Slaves = [SPid || SPid <- SPids, rabbit_misc:is_process_alive(SPid)], case rabbit_misc:is_process_alive(QPid) of true -> false = Local, %% assertion - rabbit_mirror_queue_slave:become(Q); %% [1] + rabbit_mirror_queue_slave:go(self(), async), + rabbit_mirror_queue_slave:init(Q); %% [1] false -> case Local andalso Slaves =:= [] of - true -> crash_restart(Q); %% [2] + true -> crash_restart(Q); %% [2] false -> timer:sleep(25), - init_restart(Q) %% [3] + init(Q, restart) %% [3] end end. %% [1] There is a master on another node. Regardless of whether we @@ -114,4 +89,15 @@ init_restart(#amqqueue{name = QueueName}) -> crash_restart(Q = #amqqueue{name = QueueName}) -> rabbit_log:error("Restarting crashed ~s.~n", [rabbit_misc:rs(QueueName)]), gen_server2:cast(self(), init), - rabbit_amqqueue_process:become(Q#amqqueue{pid = self()}). + rabbit_amqqueue_process:init(Q#amqqueue{pid = self()}). + +%%---------------------------------------------------------------------------- + +%% This gen_server2 always hands over to some other module at the end +%% of init/1. +handle_call(_Msg, _From, _State) -> exit(unreachable). +handle_cast(_Msg, _State) -> exit(unreachable). +handle_info(_Msg, _State) -> exit(unreachable). +terminate(_Reason, _State) -> exit(unreachable). +code_change(_OldVsn, _State, _Extra) -> exit(unreachable). + -- cgit v1.2.1 From 3a12c784c5b70b417b979f3cf0e789bb1915dcce Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Wed, 10 Sep 2014 12:14:49 +0100 Subject: Restart exclusive queues correctly, update comment. --- src/rabbit_amqqueue_process.erl | 86 ++++++++++++++++++++--------------------- src/rabbit_prequeue.erl | 6 +-- 2 files changed, 46 insertions(+), 46 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index e068b93c..d37e95c3 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -112,15 +112,49 @@ init(Q) -> {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}, ?MODULE}. -finish_init(Recover, From, State = #q{q = Q, - backing_queue = undefined, - backing_queue_state = undefined}) -> +init_state(Q) -> + State = #q{q = Q, + exclusive_consumer = none, + has_had_consumers = false, + consumers = rabbit_queue_consumers:new(), + senders = pmon:new(delegate), + msg_id_to_channel = gb_trees:empty(), + status = running, + args_policy_version = 0}, + rabbit_event:init_stats_timer(State, #q.stats_timer). + +init_it(Recover, From, State = #q{q = #amqqueue{exclusive_owner = none}}) -> + init_it2(Recover, From, State); + +%% You used to be able to declare an exclusive durable queue. Sadly we +%% need to still tidy up after that case, there could be the remnants +%% of one left over from an upgrade. So that's why we don't enforce +%% Recover = new here. +init_it(Recover, From, State = #q{q = #amqqueue{exclusive_owner = Owner}}) -> + case rabbit_misc:is_process_alive(Owner) of + true -> erlang:monitor(process, Owner), + init_it2(Recover, From, State); + false -> #q{backing_queue = undefined, + backing_queue_state = undefined, + q = Q} = State, + send_reply(From, {owner_died, Q}), + BQ = backing_queue_module(Q), + {_, Terms} = recovery_status(Recover), + BQS = bq_init(BQ, Q, Terms), + %% Rely on terminate to delete the queue. + {stop, {shutdown, missing_owner}, + State#q{backing_queue = BQ, backing_queue_state = BQS}} + end. + +init_it2(Recover, From, State = #q{q = Q, + backing_queue = undefined, + backing_queue_state = undefined}) -> {Barrier, TermsOrNew} = recovery_status(Recover), case rabbit_amqqueue:internal_declare(Q, Recover /= new) of #amqqueue{} = Q1 -> case matches(Recover, Q, Q1) of true -> - send_reply(From, Q), + send_reply(From, {new, Q}), ok = file_handle_cache:register_callback( rabbit_amqqueue, set_maximum_since_use, [self()]), ok = rabbit_memory_monitor:register( @@ -149,7 +183,7 @@ recovery_status(new) -> {no_barrier, new}; recovery_status({Recover, Terms}) -> {Recover, Terms}. send_reply(none, _Q) -> ok; -send_reply(From, Q) -> gen_server2:reply(From, {new, Q}). +send_reply(From, Q) -> gen_server2:reply(From, Q). matches(new, Q1, Q2) -> %% i.e. not policy @@ -192,17 +226,6 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, notify_decorators(startup, State3), State3. -init_state(Q) -> - State = #q{q = Q, - exclusive_consumer = none, - has_had_consumers = false, - consumers = rabbit_queue_consumers:new(), - senders = pmon:new(delegate), - msg_id_to_channel = gb_trees:empty(), - status = running, - args_policy_version = 0}, - rabbit_event:init_stats_timer(State, #q.stats_timer). - terminate(shutdown = R, State = #q{backing_queue = BQ}) -> terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State); terminate({shutdown, missing_owner} = Reason, State) -> @@ -919,30 +942,8 @@ prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) -> _ -> 0 end. -handle_call({init, Recover}, From, - State = #q{q = #amqqueue{exclusive_owner = none}}) -> - finish_init(Recover, From, State); - -%% You used to be able to declare an exclusive durable queue. Sadly we -%% need to still tidy up after that case, there could be the remnants -%% of one left over from an upgrade. So that's why we don't enforce -%% Recover = new here. -handle_call({init, Recover}, From, - State = #q{q = #amqqueue{exclusive_owner = Owner}}) -> - case rabbit_misc:is_process_alive(Owner) of - true -> erlang:monitor(process, Owner), - finish_init(Recover, From, State); - false -> #q{backing_queue = undefined, - backing_queue_state = undefined, - q = Q} = State, - gen_server2:reply(From, {owner_died, Q}), - BQ = backing_queue_module(Q), - {_, Terms} = recovery_status(Recover), - BQS = bq_init(BQ, Q, Terms), - %% Rely on terminate to delete the queue. - {stop, {shutdown, missing_owner}, - State#q{backing_queue = BQ, backing_queue_state = BQS}} - end; +handle_call({init, Recover}, From, State) -> + init_it(Recover, From, State); handle_call(info, _From, State) -> reply(infos(info_keys(), State), State); @@ -1087,9 +1088,8 @@ handle_call(sync_mirrors, _From, State) -> handle_call(cancel_sync_mirrors, _From, State) -> reply({ok, not_syncing}, State). -%% TODO exclusive? -handle_cast(init, State = #q{q = #amqqueue{exclusive_owner = none}}) -> - finish_init({no_barrier, non_clean_shutdown}, none, State); +handle_cast(init, State) -> + init_it({no_barrier, non_clean_shutdown}, none, State); handle_cast({run_backing_queue, Mod, Fun}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> diff --git a/src/rabbit_prequeue.erl b/src/rabbit_prequeue.erl index 708cad53..b1d92b89 100644 --- a/src/rabbit_prequeue.erl +++ b/src/rabbit_prequeue.erl @@ -18,9 +18,9 @@ %% This is the initial gen_server that all queue processes start off %% as. It handles the decision as to whether we need to start a new -%% slave, a new master/unmirrored, whether we lost a race to declare a -%% new queue, or whether we are in recovery. Thus a crashing queue -%% process can restart from here and always do the right thing. +%% slave, a new master/unmirrored, or whether we are restarting (and +%% if so, as what). Thus a crashing queue process can restart from here +%% and always do the right thing. -export([start_link/3]). -- cgit v1.2.1 From 01138a540397d641497224ccbbe14eedc4571e6a Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Thu, 11 Sep 2014 10:53:43 +0100 Subject: Improve comments --- src/rabbit_amqqueue.erl | 10 ++++++---- src/rabbit_amqqueue_process.erl | 1 - 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index e026279f..6f299a9e 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -407,10 +407,12 @@ with_or_die(Name, F) -> ({absent, Q, Reason}) -> rabbit_misc:absent(Q, Reason) end). -%% TODO we could still be wrong here if we happen to call in the -%% middle of a crash-failover. We could try to figure out whether -%% that's happening by looking for the supervisor - but we'd need some -%% additional book keeping to know what it is... +%% TODO we could say we are crashed when we mean recovering if we +%% happen to call in the middle of a crash-failover. We could try to +%% figure out whether that's happening by looking for the supervisor - +%% but we'd need some additional book keeping to know what it is. And +%% it will just mean a temporary glitch while crashing, which is +%% fairly tolerable. crashed_or_recovering(#amqqueue{pid = QPid, slave_pids = []}) -> case lists:member(node(QPid), [node() | nodes()]) of true -> crashed; diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index d37e95c3..12a3c9f0 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -206,7 +206,6 @@ recovery_barrier(BarrierPid) -> {'DOWN', MRef, process, _, _} -> ok end. -%% We have been promoted init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, RateTRef, Deliveries, Senders, MTC) -> case Owner of -- cgit v1.2.1 From acc7038fde6c9f244a4a0b8188cc293c1cbbe33c Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Thu, 11 Sep 2014 10:54:02 +0100 Subject: You should be able to delete a vhost containing a crashed queue too. --- src/rabbit_vhost.erl | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index 2c1e15f0..18d44225 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -97,10 +97,9 @@ delete(VHostPath) -> assert_benign(ok) -> ok; assert_benign({ok, _}) -> ok; assert_benign({error, not_found}) -> ok; -assert_benign({error, {absent, Q, nodedown}}) -> - %% We have a durable queue on a down node. Removing the mnesia - %% entries here is safe. If/when the down node restarts, it will - %% clear out the on-disk storage of the queue. +assert_benign({error, {absent, Q, _}}) -> + %% Removing the mnesia entries here is safe. If/when the down node + %% restarts, it will clear out the on-disk storage of the queue. case rabbit_amqqueue:internal_delete(Q#amqqueue.name) of ok -> ok; {error, not_found} -> ok -- cgit v1.2.1