summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Matthew Sackman <matthew@lshift.net> 2009-07-29 14:10:59 +0100
committer: Matthew Sackman <matthew@lshift.net> 2009-07-29 14:10:59 +0100
commit: e97c983c251463fa96e4adfde51f62bba6e611c3 (patch)
tree: f92c16dfa3dcd89f4ecad40263ee21ba1b610682
parent: e1e3bab3b85dd8cdcf90cb956ed40ad09cac2296 (diff)
download: rabbitmq-server-e97c983c251463fa96e4adfde51f62bba6e611c3.tar.gz
Implemented. Seems to work well. Changed the design and behaviour of the backoff as discussed with Matthias over IM. Documented in comments, reproduced here:
%% 5) init can return a 4th arg, {backoff, InitialTimeout,
%% MinimumTimeout, DesiredHibernatePeriod} (all in
%% milliseconds). Then, on all callbacks which can return a timeout
%% (including init), timeout can be 'hibernate'. When this is the
%% case, the current timeout value will be used (initially, the
%% InitialTimeout supplied from init). After this timeout has
%% occurred, handle_pre_hibernate/1 will be called. If that returns
%% {hibernate, State} then the process will be hibernated. Upon
%% awaking, a new current timeout value will be calculated, and then
%% handle_post_hibernate/1 will be called. The purpose is that the
%% gen_server2 takes care of adjusting the current timeout value such
%% that the process will increase the timeout value repeatedly if it
%% is unable to sleep for the DesiredHibernatePeriod. If it is able to
%% sleep for the DesiredHibernatePeriod it will decrease the current
%% timeout down to the MinimumTimeout, so that the process is put to
%% sleep sooner (and hopefully for longer). In short, should a process
%% using this receive a burst of messages, it should not hibernate
%% between those messages, but as the messages become less frequent,
%% the process will not only hibernate, it will do so sooner after
%% each message.
%%
%% Normal timeout values (i.e. not 'hibernate') can still be used, and
%% if they are used then the handle_info(timeout, State) will be
%% called as normal. In this case, returning 'hibernate' from
%% handle_info(timeout, State) will not hibernate the process
%% immediately, as it would if backoff wasn't being used. Instead
%% it'll wait for the current timeout as described above, before
%% calling handle_pre_hibernate(State).
-rw-r--r-- src/gen_server2.erl 190
-rw-r--r-- src/rabbit_amqqueue_process.erl 18
2 files changed, 140 insertions, 68 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 438f4097..be2c5730 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -21,22 +21,34 @@
%% higher priorities are processed before requests with lower
%% priorities. The default priority is 0.
%%
-%% 5) On return from init/1, the timeout value {binary, Min} creates a
-%% binary exponential timeout, where Min is the minimum number of
-%% milliseconds permitted, and is also used as the current timeout
-%% value. Returning from handle_* with the timeout value set to
-%% 'binary' will use the current binary timeout value. handle_info/2
-%% with the Info of 'timeout' will function normally, and supports the
-%% return value of {noreply, State, hibernate} which will hibernate
-%% the process. The current timeout value is:
+%% 5) init can return a 4th arg, {backoff, InitialTimeout,
+%% MinimumTimeout, DesiredHibernatePeriod} (all in
+%% milliseconds). Then, on all callbacks which can return a timeout
+%% (including init), timeout can be 'hibernate'. When this is the
+%% case, the current timeout value will be used (initially, the
+%% InitialTimeout supplied from init). After this timeout has
+%% occurred, handle_pre_hibernate/1 will be called. If that returns
+%% {hibernate, State} then the process will be hibernated. Upon
+%% awaking, a new current timeout value will be calculated, and then
+%% handle_post_hibernate/1 will be called. The purpose is that the
+%% gen_server2 takes care of adjusting the current timeout value such
+%% that the process will increase the timeout value repeatedly if it
+%% is unable to sleep for the DesiredHibernatePeriod. If it is able to
+%% sleep for the DesiredHibernatePeriod it will decrease the current
+%% timeout down to the MinimumTimeout, so that the process is put to
+%% sleep sooner (and hopefully for longer). In short, should a process
+%% using this receive a burst of messages, it should not hibernate
+%% between those messages, but as the messages become less frequent,
+%% the process will not only hibernate, it will do so sooner after
+%% each message.
%%
-%% a) doubled if the time spent in hibernation is < 4 * the current value;
-%% b) halved if the time spent in hibernation is > 16 * the current value;
-%% c) maintained in all other cases
-%%
-%% Explicit timeouts (i.e. not 'binary') from the handle_* functions
-%% are still supported, and do not have any effect on the current
-%% timeout value.
+%% Normal timeout values (i.e. not 'hibernate') can still be used, and
+%% if they are used then the handle_info(timeout, State) will be
+%% called as normal. In this case, returning 'hibernate' from
+%% handle_info(timeout, State) will not hibernate the process
+%% immediately, as it would if backoff wasn't being used. Instead
+%% it'll wait for the current timeout as described above, before
+%% calling handle_pre_hibernate(State).
%% All modifications are (C) 2009 LShift Ltd.
@@ -72,6 +84,7 @@
%%% init(Args)
%%% ==> {ok, State}
%%% {ok, State, Timeout}
+%%% {ok, State, Timeout, Backoff}
%%% ignore
%%% {stop, Reason}
%%%
@@ -103,6 +116,17 @@
%%%
%%% ==> ok
%%%
+%%% handle_pre_hibernate(State)
+%%%
+%%% ==> {hibernate, State}
+%%% {stop, Reason, State}
+%%% Reason = normal | shutdown | Term, terminate(State) is called
+%%%
+%%% handle_post_hibernate(State)
+%%%
+%%% ==> {noreply, State}
+%%% {stop, Reason, State}
+%%% Reason = normal | shutdown | Term, terminate(State) is called
%%%
%%% The work flow (of the server) can be described as follows:
%%%
@@ -133,7 +157,7 @@
cast/2, pcast/3, reply/2,
abcast/2, abcast/3,
multi_call/2, multi_call/3, multi_call/4,
- enter_loop/3, enter_loop/4, enter_loop/5, wake_hib/7]).
+ enter_loop/3, enter_loop/4, enter_loop/5, wake_hib/7, wake_hib/8]).
-export([behaviour_info/1]).
@@ -307,7 +331,7 @@ multi_call(Nodes, Name, Req, Timeout)
%%-----------------------------------------------------------------
-%% enter_loop(Mod, Options, State, <ServerName>, <TimeOut>) ->_
+%% enter_loop(Mod, Options, State, <ServerName>, <TimeOut>, <Backoff>) ->_
%%
%% Description: Makes an existing process into a gen_server.
%% The calling process will enter the gen_server receive
@@ -318,21 +342,29 @@ multi_call(Nodes, Name, Req, Timeout)
%% process, including registering a name for it.
%%-----------------------------------------------------------------
enter_loop(Mod, Options, State) ->
- enter_loop(Mod, Options, State, self(), infinity).
+ enter_loop(Mod, Options, State, self(), infinity, undefined).
+
+enter_loop(Mod, Options, State, Backoff = {backoff, _, _ , _}) ->
+ enter_loop(Mod, Options, State, self(), infinity, Backoff);
enter_loop(Mod, Options, State, ServerName = {_, _}) ->
- enter_loop(Mod, Options, State, ServerName, infinity);
+ enter_loop(Mod, Options, State, ServerName, infinity, undefined);
enter_loop(Mod, Options, State, Timeout) ->
- enter_loop(Mod, Options, State, self(), Timeout).
+ enter_loop(Mod, Options, State, self(), Timeout, undefined).
+
+enter_loop(Mod, Options, State, ServerName, Backoff = {backoff, _, _, _}) ->
+ enter_loop(Mod, Options, State, ServerName, infinity, Backoff);
enter_loop(Mod, Options, State, ServerName, Timeout) ->
+ enter_loop(Mod, Options, State, ServerName, Timeout, undefined).
+
+enter_loop(Mod, Options, State, ServerName, Timeout, Backoff) ->
Name = get_proc_name(ServerName),
Parent = get_parent(),
Debug = debug_options(Name, Options),
Queue = priority_queue:new(),
- {Timeout1, TimeoutState} = build_timeout_state(Timeout),
- loop(Parent, Name, State, Mod, Timeout1, TimeoutState, Queue, Debug).
+ loop(Parent, Name, State, Mod, Timeout, Backoff, Queue, Debug).
%%%========================================================================
%%% Gen-callback functions
@@ -357,9 +389,10 @@ init_it(Starter, Parent, Name0, Mod, Args, Options) ->
loop(Parent, Name, State, Mod, infinity, undefined, Queue, Debug);
{ok, State, Timeout} ->
proc_lib:init_ack(Starter, {ok, self()}),
- {Timeout1, TimeoutState} = build_timeout_state(Timeout),
- loop(Parent, Name, State, Mod, Timeout1, TimeoutState, Queue,
- Debug);
+ loop(Parent, Name, State, Mod, Timeout, undefined, Queue, Debug);
+ {ok, State, Timeout, Backoff = {backoff, _, _, _}} ->
+ proc_lib:init_ack(Starter, {ok, self()}),
+ loop(Parent, Name, State, Mod, Timeout, Backoff, Queue, Debug);
{stop, Reason} ->
%% For consistency, we must make sure that the
%% registered name (if any) is unregistered before
@@ -397,12 +430,6 @@ unregister_name({global,Name}) ->
unregister_name(Pid) when is_pid(Pid) ->
Pid.
-build_timeout_state(Timeout) ->
- case Timeout of
- {binary, Min} -> {binary, {Min, Min, undefined}};
- _ -> {Timeout, undefined}
- end.
-
%%%========================================================================
%%% Internal functions
%%%========================================================================
@@ -412,10 +439,6 @@ build_timeout_state(Timeout) ->
loop(Parent, Name, State, Mod, hibernate, undefined, Queue, Debug) ->
proc_lib:hibernate(?MODULE,wake_hib,
[Parent, Name, State, Mod, undefined, Queue, Debug]);
-loop(Parent, Name, State, Mod, hibernate, {Current, Min, undefined}, Queue,
- Debug) ->
- proc_lib:hibernate(?MODULE,wake_hib,[Parent, Name, State, Mod,
- {Current, Min, now()}, Queue, Debug]);
loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) ->
receive
Input -> loop(Parent, Name, State, Mod,
@@ -432,17 +455,27 @@ process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue,
process_msg(Parent, Name, State, Mod,
Time, TimeoutState, Queue1, Debug, Hib, Msg);
{empty, Queue1} ->
- Time1 = case {Time, TimeoutState} of
- {binary, {Current, _Min, undefined}} -> Current;
- _ -> Time
- end,
+ {Time1, HibOnTimeout}
+ = case {Time, TimeoutState} of
+ {hibernate, {backoff, Current, _Min, _Desired}} ->
+ {Current, true};
+ _ -> {Time, false}
+ end,
receive
Input ->
loop(Parent, Name, State, Mod,
Time, TimeoutState, in(Input, Queue1), Debug)
after Time1 ->
- process_msg(Parent, Name, State, Mod,
- Time, TimeoutState, Queue1, Debug, Hib, timeout)
+ case HibOnTimeout of
+ true ->
+ backoff_pre_hibernate(
+ Parent, Name, State, Mod, TimeoutState, Queue1,
+ Debug);
+ false ->
+ process_msg(
+ Parent, Name, State, Mod, Time, TimeoutState,
+ Queue1, Debug, Hib, timeout)
+ end
end
end.
@@ -451,29 +484,62 @@ wake_hib(Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
Input ->
Input
end,
- TimeoutState1 = adjust_hibernate_after(TimeoutState),
- process_next_msg(Parent, Name, State, Mod, hibernate, TimeoutState1,
+ process_next_msg(Parent, Name, State, Mod, hibernate, TimeoutState,
in(Msg, Queue), Debug, true).
-adjust_hibernate_after(undefined) ->
- undefined;
-adjust_hibernate_after({Current, Min, HibernatedAt}) ->
- NapLengthMicros = timer:now_diff(now(), HibernatedAt),
- CurrentMicros = Current * 1000,
- LowTargetMicros = CurrentMicros * 4,
- HighTargetMicros = LowTargetMicros * 4,
- if
- NapLengthMicros < LowTargetMicros ->
- %% nap was too short, don't go to sleep as soon
- {Current * 2, Min, undefined};
-
- NapLengthMicros > HighTargetMicros ->
- %% nap was long, try going to sleep sooner
- {lists:max([Min, round(Current / 2)]), Min, undefined};
-
- true ->
- %% nap and timeout seem to be in the right relationship. stay here
- {Current, Min, undefined}
+wake_hib(Parent, Name, State, Mod, SleptAt, TimeoutState, Queue, Debug) ->
+ AwokeAt = now(),
+ Msg = receive
+ Input ->
+ Input
+ end,
+ backoff_post_hibernate(Parent, Name, State, Mod, SleptAt, AwokeAt,
+ TimeoutState, in(Msg, Queue), Debug).
+
+backoff_pre_hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
+ case catch Mod:handle_pre_hibernate(State) of
+ {hibernate, NState} ->
+ proc_lib:hibernate(?MODULE, wake_hib, [Parent, Name, NState, Mod,
+ now(), TimeoutState,
+ Queue, Debug]);
+ {stop, Reason, NState} ->
+ terminate(Reason, Name, pre_hibernate, Mod, NState, []);
+ {'EXIT', What} ->
+ terminate(What, Name, pre_hibernate, Mod, State, []);
+ Reply ->
+ terminate({bad_return_value, Reply}, Name, pre_hibernate, Mod,
+ State, [])
+ end.
+
+backoff_post_hibernate(Parent, Name, State, Mod, SleptAt, AwokeAt,
+ {backoff, CurrentTO, MinimumTO, DesiredHibPeriod},
+ Queue, Debug) ->
+ NapLengthMicros = timer:now_diff(AwokeAt, SleptAt),
+ CurrentMicros = CurrentTO * 1000,
+ MinimumMicros = MinimumTO * 1000,
+ DesiredHibMicros = DesiredHibPeriod * 1000,
+ CurrentTO1 = case (NapLengthMicros + CurrentMicros) >
+ (MinimumMicros + DesiredHibMicros) of
+ true ->
+ lists:max([MinimumTO, round(CurrentTO/2)]);
+ false ->
+ CurrentTO + MinimumTO
+ end,
+ TimeoutState = {backoff, CurrentTO1, MinimumTO, DesiredHibPeriod},
+ case catch Mod:handle_post_hibernate(State) of
+ {noreply, NState} ->
+ process_next_msg(Parent, Name, NState, Mod, infinity, TimeoutState,
+ Queue, Debug, true);
+ {noreply, NState, Time} ->
+ process_next_msg(Parent, Name, NState, Mod, Time, TimeoutState,
+ Queue, Debug, true);
+ {stop, Reason, NState} ->
+ terminate(Reason, Name, post_hibernate, Mod, NState, []);
+ {'EXIT', What} ->
+ terminate(What, Name, post_hibernate, Mod, State, []);
+ Reply ->
+ terminate({bad_return_value, Reply}, Name, post_hibernate, Mod,
+ State, [])
end.
in({'$gen_pcast', {Priority, Msg}}, Queue) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 9a9e75d6..6a30503e 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -37,10 +37,12 @@
-define(UNSENT_MESSAGE_LIMIT, 100).
-define(HIBERNATE_AFTER_MIN, 1000).
+-define(DESIRED_HIBERNATE, 10000).
-export([start_link/1]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]).
+-export([handle_pre_hibernate/1, handle_post_hibernate/1]).
-import(queue).
-import(erlang).
@@ -101,7 +103,8 @@ init(Q) ->
next_msg_id = 1,
message_buffer = queue:new(),
active_consumers = queue:new(),
- blocked_consumers = queue:new()}, {binary, ?HIBERNATE_AFTER_MIN}}.
+ blocked_consumers = queue:new()}, hibernate,
+ {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
terminate(_Reason, State) ->
%% FIXME: How do we cancel active subscriptions?
@@ -116,9 +119,9 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
-reply(Reply, NewState) -> {reply, Reply, NewState, binary}.
+reply(Reply, NewState) -> {reply, Reply, NewState, hibernate}.
-noreply(NewState) -> {noreply, NewState, binary}.
+noreply(NewState) -> {noreply, NewState, hibernate}.
lookup_ch(ChPid) ->
case get({ch, ChPid}) of
@@ -813,9 +816,12 @@ handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
handle_ch_down(DownPid, State);
-handle_info(timeout, State) ->
- {noreply, State, hibernate};
-
handle_info(Info, State) ->
?LOGDEBUG("Info in queue: ~p~n", [Info]),
{stop, {unhandled_info, Info}, State}.
+
+handle_pre_hibernate(State) ->
+ {hibernate, State}.
+
+handle_post_hibernate(State) ->
+ {noreply, State, hibernate}.