summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-12-12 18:11:40 +0000
committerSimon MacMullen <simon@rabbitmq.com>2012-12-12 18:11:40 +0000
commit7b91034557990edd2546fc8b86976f32b0279b97 (patch)
tree317db48716359f947001cf5a88c6f991814a8562
parent0d6becf0d292553430d0c5c93dd2ec0984e0ca27 (diff)
parentb26a13371e30a09281d2f3d4d1d8db804682403d (diff)
downloadrabbitmq-server-7b91034557990edd2546fc8b86976f32b0279b97.tar.gz
Merge bug25353
-rw-r--r--src/rabbit_amqqueue_process.erl30
1 files changed, 12 insertions, 18 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index f4459e45..ab735be6 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -284,21 +284,17 @@ terminate_shutdown(Fun, State) ->
end.
reply(Reply, NewState) ->
- assert_invariant(NewState),
{NewState1, Timeout} = next_state(NewState),
- {reply, Reply, NewState1, Timeout}.
+ {reply, Reply, ensure_stats_timer(ensure_rate_timer(NewState1)), Timeout}.
noreply(NewState) ->
- assert_invariant(NewState),
{NewState1, Timeout} = next_state(NewState),
- {noreply, NewState1, Timeout}.
+ {noreply, ensure_stats_timer(ensure_rate_timer(NewState1)), Timeout}.
next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
+ assert_invariant(State),
{MsgIds, BQS1} = BQ:drain_confirmed(BQS),
- State1 = ensure_stats_timer(
- ensure_rate_timer(
- confirm_messages(MsgIds, State#q{
- backing_queue_state = BQS1}))),
+ State1 = confirm_messages(MsgIds, State#q{backing_queue_state = BQS1}),
case BQ:needs_timeout(BQS1) of
false -> {stop_sync_timer(State1), hibernate };
idle -> {stop_sync_timer(State1), ?SYNC_INTERVAL};
@@ -328,15 +324,11 @@ ensure_rate_timer(State = #q{rate_timer_ref = undefined}) ->
TRef = erlang:send_after(
?RAM_DURATION_UPDATE_INTERVAL, self(), update_ram_duration),
State#q{rate_timer_ref = TRef};
-ensure_rate_timer(State = #q{rate_timer_ref = just_measured}) ->
- State#q{rate_timer_ref = undefined};
ensure_rate_timer(State) ->
State.
stop_rate_timer(State = #q{rate_timer_ref = undefined}) ->
State;
-stop_rate_timer(State = #q{rate_timer_ref = just_measured}) ->
- State#q{rate_timer_ref = undefined};
stop_rate_timer(State = #q{rate_timer_ref = TRef}) ->
erlang:cancel_timer(TRef),
State#q{rate_timer_ref = undefined}.
@@ -1322,10 +1314,10 @@ handle_info(drop_expired, State) ->
handle_info(emit_stats, State) ->
emit_stats(State),
- {noreply, State1, Timeout} = noreply(State),
- %% Need to reset *after* we've been through noreply/1 so we do not
- %% just create another timer always and therefore never hibernate
- {noreply, rabbit_event:reset_stats_timer(State1, #q.stats_timer), Timeout};
+ %% Don't call noreply/1, we don't want to set timers
+ {State1, Timeout} = next_state(rabbit_event:reset_stats_timer(
+ State, #q.stats_timer)),
+ {noreply, State1, Timeout};
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
State = #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
@@ -1349,8 +1341,10 @@ handle_info(update_ram_duration, State = #q{backing_queue = BQ,
DesiredDuration =
rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
- noreply(State#q{rate_timer_ref = just_measured,
- backing_queue_state = BQS2});
+ %% Don't call noreply/1, we don't want to set timers
+ {State1, Timeout} = next_state(State#q{rate_timer_ref = undefined,
+ backing_queue_state = BQS2}),
+ {noreply, State1, Timeout};
handle_info(sync_timeout, State) ->
noreply(backing_queue_timeout(State#q{sync_timer_ref = undefined}));