diff options
author | Matthew Sackman <matthew@lshift.net> | 2009-07-29 14:10:59 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2009-07-29 14:10:59 +0100 |
commit | e97c983c251463fa96e4adfde51f62bba6e611c3 (patch) | |
tree | f92c16dfa3dcd89f4ecad40263ee21ba1b610682 | |
parent | e1e3bab3b85dd8cdcf90cb956ed40ad09cac2296 (diff) | |
download | rabbitmq-server-e97c983c251463fa96e4adfde51f62bba6e611c3.tar.gz |
Implemented. Seems to work well. Changed the design and behaviour of the backoff as discussed with Matthias over IM. Documented in comments, reproduced here:
%% 5) init can return a 4-tuple {ok, State, Timeout, Backoff}, where
%% Backoff is {backoff, InitialTimeout, MinimumTimeout,
%% DesiredHibernatePeriod} (all in milliseconds). Then, on all
%% callbacks which can return a timeout (including init), the timeout
%% can be 'hibernate'. When this is the
%% case, the current timeout value will be used (initially, the
%% InitialTimeout supplied from init). After this timeout has
%% occurred, handle_pre_hibernate/1 will be called. If that returns
%% {hibernate, State} then the process will be hibernated. Upon
%% awaking, a new current timeout value will be calculated, and then
%% handle_post_hibernate/1 will be called. The purpose is that the
%% gen_server2 takes care of adjusting the current timeout value such
%% that the process will increase the timeout value repeatedly if it
%% is unable to sleep for the DesiredHibernatePeriod. If it is able to
%% sleep for the DesiredHibernatePeriod it will decrease the current
%% timeout (halving it each time, but never going below the
%% MinimumTimeout), so that the process is put to sleep sooner (and
%% hopefully for longer). In short, should a process
%% using this receive a burst of messages, it should not hibernate
%% between those messages, but as the messages become less frequent,
%% the process will not only hibernate, it will do so sooner after
%% each message.
%%
%% Normal timeout values (i.e. not 'hibernate') can still be used, and
%% if they are used then handle_info(timeout, State) will be
%% called as normal. In this case, returning 'hibernate' from
%% handle_info(timeout, State) will not hibernate the process
%% immediately, as it would if backoff wasn't being used. Instead
%% it'll wait for the current timeout as described above, before
%% calling handle_pre_hibernate(State).
-rw-r--r-- | src/gen_server2.erl | 190 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 18 |
2 files changed, 140 insertions, 68 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl index 438f4097..be2c5730 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -21,22 +21,34 @@ %% higher priorities are processed before requests with lower %% priorities. The default priority is 0. %% -%% 5) On return from init/1, the timeout value {binary, Min} creates a -%% binary exponential timeout, where Min is the minimum number of -%% milliseconds permitted, and is also used as the current timeout -%% value. Returning from handle_* with the timeout value set to -%% 'binary' will use the current binary timeout value. handle_info/2 -%% with the Info of 'timeout' will function normally, and supports the -%% return value of {noreply, State, hibernate} which will hibernate -%% the process. The current timeout value is: +%% 5) init can return a 4th arg, {backoff, InitialTimeout, +%% MinimumTimeout, DesiredHibernatePeriod} (all in +%% milliseconds). Then, on all callbacks which can return a timeout +%% (including init), timeout can be 'hibernate'. When this is the +%% case, the current timeout value will be used (initially, the +%% InitialTimeout supplied from init). After this timeout has +%% occurred, handle_pre_hibernate/1 will be called. If that returns +%% {hibernate, State} then the process will be hibernated. Upon +%% awaking, a new current timeout value will be calculated, and then +%% handle_post_hibernate/1 will be called. The purpose is that the +%% gen_server2 takes care of adjusting the current timeout value such +%% that the process will increase the timeout value repeatedly if it +%% is unable to sleep for the DesiredHibernatePeriod. If it is able to +%% sleep for the DesiredHibernatePeriod it will decrease the current +%% timeout down to the MinimumTimeout, so that the process is put to +%% sleep sooner (and hopefully for longer). 
In short, should a process +%% using this receive a burst of messages, it should not hibernate +%% between those messages, but as the messages become less frequent, +%% the process will not only hibernate, it will do so sooner after +%% each message. %% -%% a) doubled if the time spent in hibernation is < 4 * the current value; -%% b) halved if the time spent in hibernation is > 16 * the current value; -%% c) maintained in all other cases -%% -%% Explicit timeouts (i.e. not 'binary') from the handle_* functions -%% are still supported, and do not have any effect on the current -%% timeout value. +%% Normal timeout values (i.e. not 'hibernate') can still be used, and +%% if they are used then the handle_info(timeout, State) will be +%% called as normal. In this case, returning 'hibernate' from +%% handle_info(timeout, State) will not hibernate the process +%% immediately, as it would if backoff wasn't being used. Instead +%% it'll wait for the current timeout as described above, before +%% calling handle_pre_hibernate(State). %% All modifications are (C) 2009 LShift Ltd. @@ -72,6 +84,7 @@ %%% init(Args) %%% ==> {ok, State} %%% {ok, State, Timeout} +%%% {ok, State, Timeout, Backoff} %%% ignore %%% {stop, Reason} %%% @@ -103,6 +116,17 @@ %%% %%% ==> ok %%% +%%% handle_pre_hibernate(State) +%%% +%%% ==> {hibernate, State} +%%% {stop, Reason, State} +%%% Reason = normal | shutdown | Term, terminate(State) is called +%%% +%%% handle_post_hibernate(State) +%%% +%%% ==> {noreply, State} +%%% {stop, Reason, State} +%%% Reason = normal | shutdown | Term, terminate(State) is called %%% %%% The work flow (of the server) can be described as follows: %%% @@ -133,7 +157,7 @@ cast/2, pcast/3, reply/2, abcast/2, abcast/3, multi_call/2, multi_call/3, multi_call/4, - enter_loop/3, enter_loop/4, enter_loop/5, wake_hib/7]). + enter_loop/3, enter_loop/4, enter_loop/5, wake_hib/7, wake_hib/8]). -export([behaviour_info/1]). 
@@ -307,7 +331,7 @@ multi_call(Nodes, Name, Req, Timeout) %%----------------------------------------------------------------- -%% enter_loop(Mod, Options, State, <ServerName>, <TimeOut>) ->_ +%% enter_loop(Mod, Options, State, <ServerName>, <TimeOut>, <Backoff>) ->_ %% %% Description: Makes an existing process into a gen_server. %% The calling process will enter the gen_server receive @@ -318,21 +342,29 @@ multi_call(Nodes, Name, Req, Timeout) %% process, including registering a name for it. %%----------------------------------------------------------------- enter_loop(Mod, Options, State) -> - enter_loop(Mod, Options, State, self(), infinity). + enter_loop(Mod, Options, State, self(), infinity, undefined). + +enter_loop(Mod, Options, State, Backoff = {backoff, _, _ , _}) -> + enter_loop(Mod, Options, State, self(), infinity, Backoff); enter_loop(Mod, Options, State, ServerName = {_, _}) -> - enter_loop(Mod, Options, State, ServerName, infinity); + enter_loop(Mod, Options, State, ServerName, infinity, undefined); enter_loop(Mod, Options, State, Timeout) -> - enter_loop(Mod, Options, State, self(), Timeout). + enter_loop(Mod, Options, State, self(), Timeout, undefined). + +enter_loop(Mod, Options, State, ServerName, Backoff = {backoff, _, _, _}) -> + enter_loop(Mod, Options, State, ServerName, infinity, Backoff); enter_loop(Mod, Options, State, ServerName, Timeout) -> + enter_loop(Mod, Options, State, ServerName, Timeout, undefined). + +enter_loop(Mod, Options, State, ServerName, Timeout, Backoff) -> Name = get_proc_name(ServerName), Parent = get_parent(), Debug = debug_options(Name, Options), Queue = priority_queue:new(), - {Timeout1, TimeoutState} = build_timeout_state(Timeout), - loop(Parent, Name, State, Mod, Timeout1, TimeoutState, Queue, Debug). + loop(Parent, Name, State, Mod, Timeout, Backoff, Queue, Debug). 
%%%======================================================================== %%% Gen-callback functions @@ -357,9 +389,10 @@ init_it(Starter, Parent, Name0, Mod, Args, Options) -> loop(Parent, Name, State, Mod, infinity, undefined, Queue, Debug); {ok, State, Timeout} -> proc_lib:init_ack(Starter, {ok, self()}), - {Timeout1, TimeoutState} = build_timeout_state(Timeout), - loop(Parent, Name, State, Mod, Timeout1, TimeoutState, Queue, - Debug); + loop(Parent, Name, State, Mod, Timeout, undefined, Queue, Debug); + {ok, State, Timeout, Backoff = {backoff, _, _, _}} -> + proc_lib:init_ack(Starter, {ok, self()}), + loop(Parent, Name, State, Mod, Timeout, Backoff, Queue, Debug); {stop, Reason} -> %% For consistency, we must make sure that the %% registered name (if any) is unregistered before @@ -397,12 +430,6 @@ unregister_name({global,Name}) -> unregister_name(Pid) when is_pid(Pid) -> Pid. -build_timeout_state(Timeout) -> - case Timeout of - {binary, Min} -> {binary, {Min, Min, undefined}}; - _ -> {Timeout, undefined} - end. 
- %%%======================================================================== %%% Internal functions %%%======================================================================== @@ -412,10 +439,6 @@ build_timeout_state(Timeout) -> loop(Parent, Name, State, Mod, hibernate, undefined, Queue, Debug) -> proc_lib:hibernate(?MODULE,wake_hib, [Parent, Name, State, Mod, undefined, Queue, Debug]); -loop(Parent, Name, State, Mod, hibernate, {Current, Min, undefined}, Queue, - Debug) -> - proc_lib:hibernate(?MODULE,wake_hib,[Parent, Name, State, Mod, - {Current, Min, now()}, Queue, Debug]); loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) -> receive Input -> loop(Parent, Name, State, Mod, @@ -432,17 +455,27 @@ process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, process_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue1, Debug, Hib, Msg); {empty, Queue1} -> - Time1 = case {Time, TimeoutState} of - {binary, {Current, _Min, undefined}} -> Current; - _ -> Time - end, + {Time1, HibOnTimeout} + = case {Time, TimeoutState} of + {hibernate, {backoff, Current, _Min, _Desired}} -> + {Current, true}; + _ -> {Time, false} + end, receive Input -> loop(Parent, Name, State, Mod, Time, TimeoutState, in(Input, Queue1), Debug) after Time1 -> - process_msg(Parent, Name, State, Mod, - Time, TimeoutState, Queue1, Debug, Hib, timeout) + case HibOnTimeout of + true -> + backoff_pre_hibernate( + Parent, Name, State, Mod, TimeoutState, Queue1, + Debug); + false -> + process_msg( + Parent, Name, State, Mod, Time, TimeoutState, + Queue1, Debug, Hib, timeout) + end end end. @@ -451,29 +484,62 @@ wake_hib(Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> Input -> Input end, - TimeoutState1 = adjust_hibernate_after(TimeoutState), - process_next_msg(Parent, Name, State, Mod, hibernate, TimeoutState1, + process_next_msg(Parent, Name, State, Mod, hibernate, TimeoutState, in(Msg, Queue), Debug, true). 
-adjust_hibernate_after(undefined) -> - undefined; -adjust_hibernate_after({Current, Min, HibernatedAt}) -> - NapLengthMicros = timer:now_diff(now(), HibernatedAt), - CurrentMicros = Current * 1000, - LowTargetMicros = CurrentMicros * 4, - HighTargetMicros = LowTargetMicros * 4, - if - NapLengthMicros < LowTargetMicros -> - %% nap was too short, don't go to sleep as soon - {Current * 2, Min, undefined}; - - NapLengthMicros > HighTargetMicros -> - %% nap was long, try going to sleep sooner - {lists:max([Min, round(Current / 2)]), Min, undefined}; - - true -> - %% nap and timeout seem to be in the right relationship. stay here - {Current, Min, undefined} +wake_hib(Parent, Name, State, Mod, SleptAt, TimeoutState, Queue, Debug) -> + AwokeAt = now(), + Msg = receive + Input -> + Input + end, + backoff_post_hibernate(Parent, Name, State, Mod, SleptAt, AwokeAt, + TimeoutState, in(Msg, Queue), Debug). + +backoff_pre_hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> + case catch Mod:handle_pre_hibernate(State) of + {hibernate, NState} -> + proc_lib:hibernate(?MODULE, wake_hib, [Parent, Name, NState, Mod, + now(), TimeoutState, + Queue, Debug]); + {stop, Reason, NState} -> + terminate(Reason, Name, pre_hibernate, Mod, NState, []); + {'EXIT', What} -> + terminate(What, Name, pre_hibernate, Mod, State, []); + Reply -> + terminate({bad_return_value, Reply}, Name, pre_hibernate, Mod, + State, []) + end. 
+ +backoff_post_hibernate(Parent, Name, State, Mod, SleptAt, AwokeAt, + {backoff, CurrentTO, MinimumTO, DesiredHibPeriod}, + Queue, Debug) -> + NapLengthMicros = timer:now_diff(AwokeAt, SleptAt), + CurrentMicros = CurrentTO * 1000, + MinimumMicros = MinimumTO * 1000, + DesiredHibMicros = DesiredHibPeriod * 1000, + CurrentTO1 = case (NapLengthMicros + CurrentMicros) > + (MinimumMicros + DesiredHibMicros) of + true -> + lists:max([MinimumTO, round(CurrentTO/2)]); + false -> + CurrentTO + MinimumTO + end, + TimeoutState = {backoff, CurrentTO1, MinimumTO, DesiredHibPeriod}, + case catch Mod:handle_post_hibernate(State) of + {noreply, NState} -> + process_next_msg(Parent, Name, NState, Mod, infinity, TimeoutState, + Queue, Debug, true); + {noreply, NState, Time} -> + process_next_msg(Parent, Name, NState, Mod, Time, TimeoutState, + Queue, Debug, true); + {stop, Reason, NState} -> + terminate(Reason, Name, post_hibernate, Mod, NState, []); + {'EXIT', What} -> + terminate(What, Name, post_hibernate, Mod, State, []); + Reply -> + terminate({bad_return_value, Reply}, Name, post_hibernate, Mod, + State, []) end. in({'$gen_pcast', {Priority, Msg}}, Queue) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 9a9e75d6..6a30503e 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -37,10 +37,12 @@ -define(UNSENT_MESSAGE_LIMIT, 100). -define(HIBERNATE_AFTER_MIN, 1000). +-define(DESIRED_HIBERNATE, 10000). -export([start_link/1]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). +-export([handle_pre_hibernate/1, handle_post_hibernate/1]). -import(queue). -import(erlang). @@ -101,7 +103,8 @@ init(Q) -> next_msg_id = 1, message_buffer = queue:new(), active_consumers = queue:new(), - blocked_consumers = queue:new()}, {binary, ?HIBERNATE_AFTER_MIN}}. + blocked_consumers = queue:new()}, hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. 
terminate(_Reason, State) -> %% FIXME: How do we cancel active subscriptions? @@ -116,9 +119,9 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- -reply(Reply, NewState) -> {reply, Reply, NewState, binary}. +reply(Reply, NewState) -> {reply, Reply, NewState, hibernate}. -noreply(NewState) -> {noreply, NewState, binary}. +noreply(NewState) -> {noreply, NewState, hibernate}. lookup_ch(ChPid) -> case get({ch, ChPid}) of @@ -813,9 +816,12 @@ handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> handle_ch_down(DownPid, State); -handle_info(timeout, State) -> - {noreply, State, hibernate}; - handle_info(Info, State) -> ?LOGDEBUG("Info in queue: ~p~n", [Info]), {stop, {unhandled_info, Info}, State}. + +handle_pre_hibernate(State) -> + {hibernate, State}. + +handle_post_hibernate(State) -> + {noreply, State, hibernate}. |