summaryrefslogtreecommitdiff
path: root/src/rabbit_amqqueue_process.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r--src/rabbit_amqqueue_process.erl43
1 files changed, 22 insertions, 21 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 1df05922..f4459e45 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -122,11 +122,10 @@ info_keys() -> ?INFO_KEYS.
init(Q) ->
process_flag(trap_exit, true),
-
State = #q{q = Q#amqqueue{pid = self()},
exclusive_consumer = none,
has_had_consumers = false,
- backing_queue = backing_queue_module(Q),
+ backing_queue = undefined,
backing_queue_state = undefined,
active_consumers = queue:new(),
expires = undefined,
@@ -193,7 +192,7 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
declare(Recover, From, State = #q{q = Q,
- backing_queue = BQ,
+ backing_queue = undefined,
backing_queue_state = undefined}) ->
case rabbit_amqqueue:internal_declare(Q, Recover =/= new) of
#amqqueue{} = Q1 ->
@@ -205,9 +204,11 @@ declare(Recover, From, State = #q{q = Q,
ok = rabbit_memory_monitor:register(
self(), {rabbit_amqqueue,
set_ram_duration_target, [self()]}),
+ BQ = backing_queue_module(Q1),
BQS = bq_init(BQ, Q, Recover),
recovery_barrier(Recover),
- State1 = process_args(State#q{backing_queue_state = BQS}),
+ State1 = process_args(State#q{backing_queue = BQ,
+ backing_queue_state = BQS}),
rabbit_event:notify(queue_created,
infos(?CREATION_EVENT_KEYS, State1)),
rabbit_event:if_enabled(State1, #q.stats_timer,
@@ -1152,23 +1153,6 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
gen_server2:reply(From, ok),
noreply(requeue(AckTags, ChPid, State));
-handle_call(start_mirroring, _From, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- %% lookup again to get policy for init_with_existing_bq
- {ok, Q} = rabbit_amqqueue:lookup(qname(State)),
- true = BQ =/= rabbit_mirror_queue_master, %% assertion
- BQ1 = rabbit_mirror_queue_master,
- BQS1 = BQ1:init_with_existing_bq(Q, BQ, BQS),
- reply(ok, State#q{backing_queue = BQ1,
- backing_queue_state = BQS1});
-
-handle_call(stop_mirroring, _From, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- BQ = rabbit_mirror_queue_master, %% assertion
- {BQ1, BQS1} = BQ:stop_mirroring(BQS),
- reply(ok, State#q{backing_queue = BQ1,
- backing_queue_state = BQS1});
-
handle_call(force_event_refresh, _From,
State = #q{exclusive_consumer = Exclusive}) ->
rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)),
@@ -1298,6 +1282,23 @@ handle_cast({dead_letter, Msgs, Reason}, State = #q{dlx = XName}) ->
cleanup_after_confirm([AckTag || {_, AckTag} <- Msgs], State)
end;
+handle_cast(start_mirroring, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ %% lookup again to get policy for init_with_existing_bq
+ {ok, Q} = rabbit_amqqueue:lookup(qname(State)),
+ true = BQ =/= rabbit_mirror_queue_master, %% assertion
+ BQ1 = rabbit_mirror_queue_master,
+ BQS1 = BQ1:init_with_existing_bq(Q, BQ, BQS),
+ noreply(State#q{backing_queue = BQ1,
+ backing_queue_state = BQS1});
+
+handle_cast(stop_mirroring, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ BQ = rabbit_mirror_queue_master, %% assertion
+ {BQ1, BQS1} = BQ:stop_mirroring(BQS),
+ noreply(State#q{backing_queue = BQ1,
+ backing_queue_state = BQS1});
+
handle_cast(wake_up, State) ->
noreply(State).