summaryrefslogtreecommitdiff
path: root/src/rabbit_amqqueue_process.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r--src/rabbit_amqqueue_process.erl86
1 files changed, 41 insertions, 45 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index fe1ddba0..46f6674b 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -118,19 +118,19 @@ init(Q) ->
?LOGDEBUG("Queue starting - ~p~n", [Q]),
process_flag(trap_exit, true),
- {ok, #q{q = Q#amqqueue{pid = self()},
- exclusive_consumer = none,
- has_had_consumers = false,
- backing_queue = backing_queue_module(Q),
- backing_queue_state = undefined,
- active_consumers = queue:new(),
- expires = undefined,
- sync_timer_ref = undefined,
- rate_timer_ref = undefined,
- expiry_timer_ref = undefined,
- ttl = undefined,
- stats_timer = rabbit_event:init_stats_timer(),
- msg_id_to_channel = dict:new()}, hibernate,
+ State = #q{q = Q#amqqueue{pid = self()},
+ exclusive_consumer = none,
+ has_had_consumers = false,
+ backing_queue = backing_queue_module(Q),
+ backing_queue_state = undefined,
+ active_consumers = queue:new(),
+ expires = undefined,
+ sync_timer_ref = undefined,
+ rate_timer_ref = undefined,
+ expiry_timer_ref = undefined,
+ ttl = undefined,
+ msg_id_to_channel = dict:new()},
+ {ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
@@ -140,25 +140,24 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
none -> ok;
_ -> erlang:monitor(process, Owner)
end,
- State = requeue_and_run(
- AckTags,
- process_args(
- #q{q = Q,
- exclusive_consumer = none,
- has_had_consumers = false,
- backing_queue = BQ,
- backing_queue_state = BQS,
- active_consumers = queue:new(),
- expires = undefined,
- sync_timer_ref = undefined,
- rate_timer_ref = RateTRef,
- expiry_timer_ref = undefined,
- ttl = undefined,
- stats_timer = rabbit_event:init_stats_timer(),
- msg_id_to_channel = MTC})),
+ State = #q{q = Q,
+ exclusive_consumer = none,
+ has_had_consumers = false,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ active_consumers = queue:new(),
+ expires = undefined,
+ sync_timer_ref = undefined,
+ rate_timer_ref = RateTRef,
+ expiry_timer_ref = undefined,
+ ttl = undefined,
+ msg_id_to_channel = MTC},
+ State1 = requeue_and_run(AckTags, process_args(
+ rabbit_event:init_stats_timer(
+ State, #q.stats_timer))),
lists:foldl(
fun (Delivery, StateN) -> deliver_or_enqueue(Delivery, StateN) end,
- State, Deliveries).
+ State1, Deliveries).
terminate(shutdown = R, State = #q{backing_queue = BQ}) ->
terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
@@ -183,9 +182,9 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
-declare(Recover, From,
- State = #q{q = Q, backing_queue = BQ, backing_queue_state = undefined,
- stats_timer = StatsTimer}) ->
+declare(Recover, From, State = #q{q = Q,
+ backing_queue = BQ,
+ backing_queue_state = undefined}) ->
case rabbit_amqqueue:internal_declare(Q, Recover) of
not_found -> {stop, normal, not_found, State};
Q -> gen_server2:reply(From, {new, Q}),
@@ -199,7 +198,7 @@ declare(Recover, From,
State1 = process_args(State#q{backing_queue_state = BQS}),
rabbit_event:notify(queue_created,
infos(?CREATION_EVENT_KEYS, State1)),
- rabbit_event:if_enabled(StatsTimer,
+ rabbit_event:if_enabled(State1, #q.stats_timer,
fun() -> emit_stats(State1) end),
noreply(State1);
Q1 -> {stop, normal, {existing, Q1}, State}
@@ -315,10 +314,8 @@ ensure_expiry_timer(State = #q{expires = Expires}) ->
State
end.
-ensure_stats_timer(State = #q{stats_timer = StatsTimer,
- q = #amqqueue{pid = QPid}}) ->
- State#q{stats_timer = rabbit_event:ensure_stats_timer(
- StatsTimer, QPid, emit_stats)}.
+ensure_stats_timer(State) ->
+ rabbit_event:ensure_stats_timer(State, #q.stats_timer, emit_stats).
assert_invariant(#q{active_consumers = AC,
backing_queue = BQ, backing_queue_state = BQS}) ->
@@ -1120,10 +1117,10 @@ handle_info(maybe_expire, State) ->
handle_info(drop_expired, State) ->
noreply(drop_expired_messages(State#q{ttl_timer_ref = undefined}));
-handle_info(emit_stats, State = #q{stats_timer = StatsTimer}) ->
+handle_info(emit_stats, State) ->
%% Do not invoke noreply as it would see no timer and create a new one.
emit_stats(State),
- State1 = State#q{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)},
+ State1 = rabbit_event:reset_stats_timer(State, #q.stats_timer),
assert_invariant(State1),
{noreply, State1, hibernate};
@@ -1167,18 +1164,17 @@ handle_info(Info, State) ->
handle_pre_hibernate(State = #q{backing_queue_state = undefined}) ->
{hibernate, State};
handle_pre_hibernate(State = #q{backing_queue = BQ,
- backing_queue_state = BQS,
- stats_timer = StatsTimer}) ->
+ backing_queue_state = BQS}) ->
{RamDuration, BQS1} = BQ:ram_duration(BQS),
DesiredDuration =
rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
BQS3 = BQ:handle_pre_hibernate(BQS2),
rabbit_event:if_enabled(
- StatsTimer,
+ State, #q.stats_timer,
fun () -> emit_stats(State, [{idle_since, now()}]) end),
- State1 = State#q{stats_timer = rabbit_event:stop_stats_timer(StatsTimer),
- backing_queue_state = BQS3},
+ State1 = rabbit_event:stop_stats_timer(State#q{backing_queue_state = BQS3},
+ #q.stats_timer),
{hibernate, stop_rate_timer(State1)}.
format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).