summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-04-08 14:12:26 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2011-04-08 14:12:26 +0100
commitce51765ac7299ea27796d57c3903a15e4f4120ca (patch)
tree131df7ebfbb697ee699f55a6f07cc85d51470284
parent644af26c9866e6af05dd58cd1cd02b39c8933647 (diff)
downloadrabbitmq-server-bug24038.tar.gz
Abstract out mainly timer maintanence functionsbug24038
-rw-r--r--src/rabbit_amqqueue_process.erl54
-rw-r--r--src/rabbit_amqqueue_process_utils.erl99
2 files changed, 118 insertions, 35 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 2b0fe17e..435edc07 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -21,8 +21,6 @@
-behaviour(gen_server2).
-define(UNSENT_MESSAGE_LIMIT, 100).
--define(SYNC_INTERVAL, 25). %% milliseconds
--define(RAM_DURATION_UPDATE_INTERVAL, 5000).
-define(BASE_MESSAGE_PROPERTIES,
#message_properties{expiry = undefined, needs_confirming = false}).
@@ -226,37 +224,27 @@ next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
false -> {stop_sync_timer(State1), hibernate}
end.
-ensure_sync_timer(State = #q{sync_timer_ref = undefined}) ->
- {ok, TRef} = timer:apply_after(
- ?SYNC_INTERVAL, rabbit_amqqueue, sync_timeout, [self()]),
- State#q{sync_timer_ref = TRef};
ensure_sync_timer(State) ->
- State.
+ rabbit_amqqueue_process_utils:ensure_sync_timer(
+ fun sync_timer_getter/1, fun sync_timer_setter/2, State).
+
+stop_sync_timer(State) ->
+ rabbit_amqqueue_process_utils:stop_sync_timer(
+ fun sync_timer_getter/1, fun sync_timer_setter/2, State).
+
+sync_timer_getter(State) -> State#q.sync_timer_ref.
+sync_timer_setter(Timer, State) -> State#q{sync_timer_ref = Timer}.
-stop_sync_timer(State = #q{sync_timer_ref = undefined}) ->
- State;
-stop_sync_timer(State = #q{sync_timer_ref = TRef}) ->
- {ok, cancel} = timer:cancel(TRef),
- State#q{sync_timer_ref = undefined}.
-
-ensure_rate_timer(State = #q{rate_timer_ref = undefined}) ->
- {ok, TRef} = timer:apply_after(
- ?RAM_DURATION_UPDATE_INTERVAL,
- rabbit_amqqueue, update_ram_duration,
- [self()]),
- State#q{rate_timer_ref = TRef};
-ensure_rate_timer(State = #q{rate_timer_ref = just_measured}) ->
- State#q{rate_timer_ref = undefined};
ensure_rate_timer(State) ->
- State.
+ rabbit_amqqueue_process_utils:ensure_rate_timer(
+ fun rate_timer_getter/1, fun rate_timer_setter/2, State).
-stop_rate_timer(State = #q{rate_timer_ref = undefined}) ->
- State;
-stop_rate_timer(State = #q{rate_timer_ref = just_measured}) ->
- State#q{rate_timer_ref = undefined};
-stop_rate_timer(State = #q{rate_timer_ref = TRef}) ->
- {ok, cancel} = timer:cancel(TRef),
- State#q{rate_timer_ref = undefined}.
+stop_rate_timer(State) ->
+ rabbit_amqqueue_process_utils:stop_rate_timer(
+ fun rate_timer_getter/1, fun rate_timer_setter/2, State).
+
+rate_timer_getter(State) -> State#q.rate_timer_ref.
+rate_timer_setter(Timer, State) -> State#q{rate_timer_ref = Timer}.
stop_expiry_timer(State = #q{expiry_timer_ref = undefined}) ->
State;
@@ -1160,15 +1148,11 @@ handle_pre_hibernate(State = #q{backing_queue_state = undefined}) ->
handle_pre_hibernate(State = #q{backing_queue = BQ,
backing_queue_state = BQS,
stats_timer = StatsTimer}) ->
- {RamDuration, BQS1} = BQ:ram_duration(BQS),
- DesiredDuration =
- rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
- BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
- BQS3 = BQ:handle_pre_hibernate(BQS2),
+ BQS1 = rabbit_amqqueue_process_utils:backing_queue_pre_hibernate(BQ, BQS),
rabbit_event:if_enabled(StatsTimer,
fun () ->
emit_stats(State, [{idle_since, now()}])
end),
State1 = State#q{stats_timer = rabbit_event:stop_stats_timer(StatsTimer),
- backing_queue_state = BQS3},
+ backing_queue_state = BQS1},
{hibernate, stop_rate_timer(State1)}.
diff --git a/src/rabbit_amqqueue_process_utils.erl b/src/rabbit_amqqueue_process_utils.erl
new file mode 100644
index 00000000..feb2a79c
--- /dev/null
+++ b/src/rabbit_amqqueue_process_utils.erl
@@ -0,0 +1,99 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 201-2011 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_amqqueue_process_utils).
+
+-define(SYNC_INTERVAL, 25). %% milliseconds
+-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
+
+-export([backing_queue_pre_hibernate/2,
+ ensure_sync_timer/3, stop_sync_timer/3,
+ ensure_rate_timer/3, stop_rate_timer/3]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-type(bq_mod() :: atom()).
+-type(bq_state() :: any()). %% A good example of dialyzer's shortcomings
+
+-type(queue_state() :: any()). %% Another such example.
+-type(getter(A) :: fun ((queue_state()) -> A)).
+-type(setter(A) :: fun ((A, queue_state()) -> queue_state())).
+
+-type(tref() :: term()). %% Sigh. According to timer docs.
+
+-spec(backing_queue_pre_hibernate/2 :: (bq_mod(), bq_state()) -> bq_state()).
+
+-spec(ensure_sync_timer/3 :: (getter('undefined'|tref()),
+ setter('undefined'|tref()),
+ queue_state()) -> queue_state()).
+-spec(stop_sync_timer/3 :: (getter('undefined'|tref()),
+ setter('undefined'|tref()),
+ queue_state()) -> queue_state()).
+
+-spec(ensure_rate_timer/3 :: (getter('undefined'|'just_measured'|tref()),
+ setter('undefined'|'just_measured'|tref()),
+ queue_state()) -> queue_state()).
+-spec(stop_rate_timer/3 :: (getter('undefined'|'just_measured'|tref()),
+ setter('undefined'|'just_measured'|tref()),
+ queue_state()) -> queue_state()).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+backing_queue_pre_hibernate(BQ, BQS) ->
+ {RamDuration, BQS1} = BQ:ram_duration(BQS),
+ DesiredDuration =
+ rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
+ BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
+ BQ:handle_pre_hibernate(BQS2).
+
+ensure_sync_timer(Getter, Setter, State) ->
+ case Getter(State) of
+ undefined -> {ok, TRef} = timer:apply_after(
+ ?SYNC_INTERVAL, rabbit_amqqueue,
+ sync_timeout, [self()]),
+ Setter(TRef, State);
+ _TRef -> State
+ end.
+
+stop_sync_timer(Getter, Setter, State) ->
+ case Getter(State) of
+ undefined -> State;
+ TRef -> {ok, cancel} = timer:cancel(TRef),
+ Setter(undefined, State)
+ end.
+
+ensure_rate_timer(Getter, Setter, State) ->
+ case Getter(State) of
+ undefined -> {ok, TRef} =
+ timer:apply_after(
+ ?RAM_DURATION_UPDATE_INTERVAL, rabbit_amqqueue,
+ update_ram_duration, [self()]),
+ Setter(TRef, State);
+ just_measured -> Setter(undefined, State);
+ _TRef -> State
+ end.
+
+stop_rate_timer(Getter, Setter, State) ->
+ case Getter(State) of
+ undefined -> State;
+ just_measured -> Setter(undefined, State);
+ TRef -> {ok, cancel} = timer:cancel(TRef),
+ Setter(undefined, State)
+ end.