summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-09-10 12:14:49 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-09-10 12:14:49 +0100
commit3a12c784c5b70b417b979f3cf0e789bb1915dcce (patch)
tree5e3c081b275a031a9621aefdb9cdcd5ae420a5d6 /src
parent5eb6dbe387b1aa93241333411aecb819bd9c864c (diff)
downloadrabbitmq-server-3a12c784c5b70b417b979f3cf0e789bb1915dcce.tar.gz
Restart exclusive queues correctly, update comment.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl86
-rw-r--r--src/rabbit_prequeue.erl6
2 files changed, 46 insertions, 46 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index e068b93c..d37e95c3 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -112,15 +112,49 @@ init(Q) ->
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE},
?MODULE}.
-finish_init(Recover, From, State = #q{q = Q,
- backing_queue = undefined,
- backing_queue_state = undefined}) ->
+init_state(Q) ->
+ State = #q{q = Q,
+ exclusive_consumer = none,
+ has_had_consumers = false,
+ consumers = rabbit_queue_consumers:new(),
+ senders = pmon:new(delegate),
+ msg_id_to_channel = gb_trees:empty(),
+ status = running,
+ args_policy_version = 0},
+ rabbit_event:init_stats_timer(State, #q.stats_timer).
+
+init_it(Recover, From, State = #q{q = #amqqueue{exclusive_owner = none}}) ->
+ init_it2(Recover, From, State);
+
+%% You used to be able to declare an exclusive durable queue. Sadly we
+%% need to still tidy up after that case, there could be the remnants
+%% of one left over from an upgrade. So that's why we don't enforce
+%% Recover = new here.
+init_it(Recover, From, State = #q{q = #amqqueue{exclusive_owner = Owner}}) ->
+ case rabbit_misc:is_process_alive(Owner) of
+ true -> erlang:monitor(process, Owner),
+ init_it2(Recover, From, State);
+ false -> #q{backing_queue = undefined,
+ backing_queue_state = undefined,
+ q = Q} = State,
+ send_reply(From, {owner_died, Q}),
+ BQ = backing_queue_module(Q),
+ {_, Terms} = recovery_status(Recover),
+ BQS = bq_init(BQ, Q, Terms),
+ %% Rely on terminate to delete the queue.
+ {stop, {shutdown, missing_owner},
+ State#q{backing_queue = BQ, backing_queue_state = BQS}}
+ end.
+
+init_it2(Recover, From, State = #q{q = Q,
+ backing_queue = undefined,
+ backing_queue_state = undefined}) ->
{Barrier, TermsOrNew} = recovery_status(Recover),
case rabbit_amqqueue:internal_declare(Q, Recover /= new) of
#amqqueue{} = Q1 ->
case matches(Recover, Q, Q1) of
true ->
- send_reply(From, Q),
+ send_reply(From, {new, Q}),
ok = file_handle_cache:register_callback(
rabbit_amqqueue, set_maximum_since_use, [self()]),
ok = rabbit_memory_monitor:register(
@@ -149,7 +183,7 @@ recovery_status(new) -> {no_barrier, new};
recovery_status({Recover, Terms}) -> {Recover, Terms}.
send_reply(none, _Q) -> ok;
-send_reply(From, Q) -> gen_server2:reply(From, {new, Q}).
+send_reply(From, Q) -> gen_server2:reply(From, Q).
matches(new, Q1, Q2) ->
%% i.e. not policy
@@ -192,17 +226,6 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
notify_decorators(startup, State3),
State3.
-init_state(Q) ->
- State = #q{q = Q,
- exclusive_consumer = none,
- has_had_consumers = false,
- consumers = rabbit_queue_consumers:new(),
- senders = pmon:new(delegate),
- msg_id_to_channel = gb_trees:empty(),
- status = running,
- args_policy_version = 0},
- rabbit_event:init_stats_timer(State, #q.stats_timer).
-
terminate(shutdown = R, State = #q{backing_queue = BQ}) ->
terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
terminate({shutdown, missing_owner} = Reason, State) ->
@@ -919,30 +942,8 @@ prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
_ -> 0
end.
-handle_call({init, Recover}, From,
- State = #q{q = #amqqueue{exclusive_owner = none}}) ->
- finish_init(Recover, From, State);
-
-%% You used to be able to declare an exclusive durable queue. Sadly we
-%% need to still tidy up after that case, there could be the remnants
-%% of one left over from an upgrade. So that's why we don't enforce
-%% Recover = new here.
-handle_call({init, Recover}, From,
- State = #q{q = #amqqueue{exclusive_owner = Owner}}) ->
- case rabbit_misc:is_process_alive(Owner) of
- true -> erlang:monitor(process, Owner),
- finish_init(Recover, From, State);
- false -> #q{backing_queue = undefined,
- backing_queue_state = undefined,
- q = Q} = State,
- gen_server2:reply(From, {owner_died, Q}),
- BQ = backing_queue_module(Q),
- {_, Terms} = recovery_status(Recover),
- BQS = bq_init(BQ, Q, Terms),
- %% Rely on terminate to delete the queue.
- {stop, {shutdown, missing_owner},
- State#q{backing_queue = BQ, backing_queue_state = BQS}}
- end;
+handle_call({init, Recover}, From, State) ->
+ init_it(Recover, From, State);
handle_call(info, _From, State) ->
reply(infos(info_keys(), State), State);
@@ -1087,9 +1088,8 @@ handle_call(sync_mirrors, _From, State) ->
handle_call(cancel_sync_mirrors, _From, State) ->
reply({ok, not_syncing}, State).
-%% TODO exclusive?
-handle_cast(init, State = #q{q = #amqqueue{exclusive_owner = none}}) ->
- finish_init({no_barrier, non_clean_shutdown}, none, State);
+handle_cast(init, State) ->
+ init_it({no_barrier, non_clean_shutdown}, none, State);
handle_cast({run_backing_queue, Mod, Fun},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
diff --git a/src/rabbit_prequeue.erl b/src/rabbit_prequeue.erl
index 708cad53..b1d92b89 100644
--- a/src/rabbit_prequeue.erl
+++ b/src/rabbit_prequeue.erl
@@ -18,9 +18,9 @@
%% This is the initial gen_server that all queue processes start off
%% as. It handles the decision as to whether we need to start a new
-%% slave, a new master/unmirrored, whether we lost a race to declare a
-%% new queue, or whether we are in recovery. Thus a crashing queue
-%% process can restart from here and always do the right thing.
+%% slave, a new master/unmirrored, or whether we are restarting (and
+%% if so, as what). Thus a crashing queue process can restart from here
+%% and always do the right thing.
-export([start_link/3]).