diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-09-10 12:14:49 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-09-10 12:14:49 +0100 |
commit | 3a12c784c5b70b417b979f3cf0e789bb1915dcce (patch) | |
tree | 5e3c081b275a031a9621aefdb9cdcd5ae420a5d6 /src | |
parent | 5eb6dbe387b1aa93241333411aecb819bd9c864c (diff) | |
download | rabbitmq-server-3a12c784c5b70b417b979f3cf0e789bb1915dcce.tar.gz |
Restart exclusive queues correctly, update comment.
Diffstat (limited to 'src')
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 86 | ||||
-rw-r--r-- | src/rabbit_prequeue.erl | 6 |
2 files changed, 46 insertions, 46 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index e068b93c..d37e95c3 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -112,15 +112,49 @@ init(Q) -> {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}, ?MODULE}. -finish_init(Recover, From, State = #q{q = Q, - backing_queue = undefined, - backing_queue_state = undefined}) -> +init_state(Q) -> + State = #q{q = Q, + exclusive_consumer = none, + has_had_consumers = false, + consumers = rabbit_queue_consumers:new(), + senders = pmon:new(delegate), + msg_id_to_channel = gb_trees:empty(), + status = running, + args_policy_version = 0}, + rabbit_event:init_stats_timer(State, #q.stats_timer). + +init_it(Recover, From, State = #q{q = #amqqueue{exclusive_owner = none}}) -> + init_it2(Recover, From, State); + +%% You used to be able to declare an exclusive durable queue. Sadly we +%% need to still tidy up after that case, there could be the remnants +%% of one left over from an upgrade. So that's why we don't enforce +%% Recover = new here. +init_it(Recover, From, State = #q{q = #amqqueue{exclusive_owner = Owner}}) -> + case rabbit_misc:is_process_alive(Owner) of + true -> erlang:monitor(process, Owner), + init_it2(Recover, From, State); + false -> #q{backing_queue = undefined, + backing_queue_state = undefined, + q = Q} = State, + send_reply(From, {owner_died, Q}), + BQ = backing_queue_module(Q), + {_, Terms} = recovery_status(Recover), + BQS = bq_init(BQ, Q, Terms), + %% Rely on terminate to delete the queue. + {stop, {shutdown, missing_owner}, + State#q{backing_queue = BQ, backing_queue_state = BQS}} + end. + +init_it2(Recover, From, State = #q{q = Q, + backing_queue = undefined, + backing_queue_state = undefined}) -> {Barrier, TermsOrNew} = recovery_status(Recover), case rabbit_amqqueue:internal_declare(Q, Recover /= new) of #amqqueue{} = Q1 -> case matches(Recover, Q, Q1) of true -> - send_reply(From, Q), + send_reply(From, {new, Q}), ok = file_handle_cache:register_callback( rabbit_amqqueue, set_maximum_since_use, [self()]), ok = rabbit_memory_monitor:register( @@ -149,7 +183,7 @@ recovery_status(new) -> {no_barrier, new}; recovery_status({Recover, Terms}) -> {Recover, Terms}. send_reply(none, _Q) -> ok; -send_reply(From, Q) -> gen_server2:reply(From, {new, Q}). +send_reply(From, Q) -> gen_server2:reply(From, Q). matches(new, Q1, Q2) -> %% i.e. not policy @@ -192,17 +226,6 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, notify_decorators(startup, State3), State3. -init_state(Q) -> - State = #q{q = Q, - exclusive_consumer = none, - has_had_consumers = false, - consumers = rabbit_queue_consumers:new(), - senders = pmon:new(delegate), - msg_id_to_channel = gb_trees:empty(), - status = running, - args_policy_version = 0}, - rabbit_event:init_stats_timer(State, #q.stats_timer). - terminate(shutdown = R, State = #q{backing_queue = BQ}) -> terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State); terminate({shutdown, missing_owner} = Reason, State) -> @@ -919,30 +942,8 @@ prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) -> _ -> 0 end. -handle_call({init, Recover}, From, - State = #q{q = #amqqueue{exclusive_owner = none}}) -> - finish_init(Recover, From, State); - -%% You used to be able to declare an exclusive durable queue. Sadly we -%% need to still tidy up after that case, there could be the remnants -%% of one left over from an upgrade. So that's why we don't enforce -%% Recover = new here. -handle_call({init, Recover}, From, - State = #q{q = #amqqueue{exclusive_owner = Owner}}) -> - case rabbit_misc:is_process_alive(Owner) of - true -> erlang:monitor(process, Owner), - finish_init(Recover, From, State); - false -> #q{backing_queue = undefined, - backing_queue_state = undefined, - q = Q} = State, - gen_server2:reply(From, {owner_died, Q}), - BQ = backing_queue_module(Q), - {_, Terms} = recovery_status(Recover), - BQS = bq_init(BQ, Q, Terms), - %% Rely on terminate to delete the queue. - {stop, {shutdown, missing_owner}, - State#q{backing_queue = BQ, backing_queue_state = BQS}} - end; +handle_call({init, Recover}, From, State) -> + init_it(Recover, From, State); handle_call(info, _From, State) -> reply(infos(info_keys(), State), State); @@ -1087,9 +1088,8 @@ handle_call(sync_mirrors, _From, State) -> handle_call(cancel_sync_mirrors, _From, State) -> reply({ok, not_syncing}, State). -%% TODO exclusive? -handle_cast(init, State = #q{q = #amqqueue{exclusive_owner = none}}) -> - finish_init({no_barrier, non_clean_shutdown}, none, State); +handle_cast(init, State) -> + init_it({no_barrier, non_clean_shutdown}, none, State); handle_cast({run_backing_queue, Mod, Fun}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> diff --git a/src/rabbit_prequeue.erl b/src/rabbit_prequeue.erl index 708cad53..b1d92b89 100644 --- a/src/rabbit_prequeue.erl +++ b/src/rabbit_prequeue.erl @@ -18,9 +18,9 @@ %% This is the initial gen_server that all queue processes start off %% as. It handles the decision as to whether we need to start a new -%% slave, a new master/unmirrored, whether we lost a race to declare a -%% new queue, or whether we are in recovery. Thus a crashing queue -%% process can restart from here and always do the right thing. +%% slave, a new master/unmirrored, or whether we are restarting (and +%% if so, as what). Thus a crashing queue process can restart from here +%% and always do the right thing. -export([start_link/3]). |