From 665eda30bf499749fb187fc9a4cc97ad30fcdb23 Mon Sep 17 00:00:00 2001 From: Marek Majkowski Date: Tue, 6 Oct 2009 18:21:58 +0100 Subject: Memory monitor code. --- src/rabbit.erl | 3 +- src/rabbit_memory_monitor.erl | 207 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 209 insertions(+), 1 deletion(-) create mode 100644 src/rabbit_memory_monitor.erl diff --git a/src/rabbit.erl b/src/rabbit.erl index 18fd1b17..01e06db3 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -146,7 +146,8 @@ start(normal, []) -> ok = rabbit_amqqueue:start(), ok = start_child(rabbit_router), - ok = start_child(rabbit_node_monitor) + ok = start_child(rabbit_node_monitor), + ok = start_child(rabbit_memory_monitor) end}, {"recovery", fun () -> diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl new file mode 100644 index 00000000..8bdd394b --- /dev/null +++ b/src/rabbit_memory_monitor.erl @@ -0,0 +1,207 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + + +%% This module handles the node-wide memory statistics. +%% It receives statistics from all queues, counts the desired +%% queue length (in seconds), and sends this information back to +%% queues. +%% +%% Normally, messages are exchanged like that: +%% +%% (1) (2) (3) +%% Timer | | +%% v v +%% Queue -----+--------+-----<***hibernated***>-------------> +%% | ^ | ^ ^ +%% v | v | | +%% Monitor X--*-+--X---*-+--X------X----X-----X+-----------> +%% +%% Or to put it in words. Queue periodically sends (casts) 'push_drain_ratio' +%% message to the Monitor (cases 1 and 2 on the asciiart above). Monitor +%% _always_ replies with a 'set_bufsec_limit' cast. This way, +%% we're pretty sure that the Queue is not hibernated. +%% Monitor periodically recounts numbers ('X' on asciiart). If, during this +%% update we notice that a queue was using too much memory, we send a message +%% back. This will happen even if the queue is hibernated, as we really do want +%% it to reduce its memory footprint. + + +-module(rabbit_memory_monitor). + +-behaviour(gen_server2). + +-export([start_link/0]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-export([update/0]). + +-export([register/1]). + +-record(state, {timer, %% 'internal_update' timer + drain_dict, %% dict, queue_pid:seconds_till_queue_is_empty + drain_avg, %% global, the desired queue depth (in seconds) + memory_limit %% how much memory we intend to use + }). + +-define(SERVER, ?MODULE). +-define(DEFAULT_UPDATE_INTERVAL_MS, 2500). + +%% Enable debug reports in stdout: +-define(debug, true). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-endif. + +%%---------------------------------------------------------------------------- + +start_link() -> + gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []). + +update() -> + gen_server2:cast(?SERVER, update). + +%%---------------------------------------------------------------------------- + +register(Pid) -> + gen_server2:cast(?SERVER, {register, Pid}). + +%%---------------------------------------------------------------------------- + +init([]) -> + %% TODO: References to os_mon and rabbit_memsup_linux + %% should go away as bug 21457 removes it. + %% BTW: memsup:get_system_memory_data() doesn't work. + {state, TotalMemory, _Allocated} = rabbit_memsup_linux:update({state, 0,0}), + + {ok, TRef} = timer:apply_interval(?DEFAULT_UPDATE_INTERVAL_MS, + ?SERVER, update, []), + MemoryHighWatermark = os_mon:get_env(memsup, system_memory_high_watermark), + MemoryLimit = erlang:trunc(TotalMemory * MemoryHighWatermark), + {ok, #state{timer = TRef, + drain_dict = dict:new(), + drain_avg = infinity, + memory_limit = MemoryLimit}}. + +handle_call(_Request, _From, State) -> + {noreply, State}. + + +handle_cast(update, State) -> + {noreply, internal_update(State)}; + +handle_cast({register, Pid}, State) -> + _MRef = erlang:monitor(process, Pid), + {noreply, State}; + +handle_cast({push_drain_ratio, Pid, DrainRatio}, State) -> + gen_server2:cast(Pid, {set_bufsec_limit, State#state.drain_avg}), + {noreply, State#state{drain_dict = + dict:store(Pid, DrainRatio, State#state.drain_dict)}}; + +handle_cast(_Request, State) -> + {noreply, State}. + + +handle_info({'DOWN', _MRef, process, Pid, _Reason}, State) -> + {noreply, State#state{drain_dict = dict:erase(Pid, State#state.drain_dict)}}; + +handle_info(_Info, State) -> + {noreply, State}. + + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +-ifdef(debug). +ftoa(Float) -> + Str = case is_float(Float) of + true -> io_lib:format("~11.3f",[Float]); + false -> io_lib:format("~p", [Float]) + end, + lists:flatten(Str). + +print_debug_info(UsedSeconds, AvailableSeconds, UsedMemory, TotalMemory, + PerQueueSeconds, QueueSec) -> + io:format("Update ~s/~s ~s/~s PerQueueSeconds:~s ~s~n", + [ftoa(UsedSeconds), ftoa(AvailableSeconds), + ftoa(UsedMemory/1024.0/1024.0), ftoa(TotalMemory/1024.0/1024.0), + ftoa(PerQueueSeconds), + [" "] ++ lists:flatten([ftoa(Q)++" " || Q <- QueueSec]) + ]). +-else. +print_debug_info(_UsedSeconds, _AvailableSeconds, _UsedMemory, _TotalMemory, + _PerQueueSeconds, _QueueSec) -> + ok. + +-endif. + +internal_update(State) -> + UsedMemory = erlang:memory(total), + TotalMemory = State#state.memory_limit, + QueueSec = [V || {_K, V} <- dict:to_list(State#state.drain_dict) ], + UsedSeconds = lists:sum( lists:filter(fun (A) -> + is_number(A) or is_float(A) + end, + QueueSec) ), + AvailableSeconds = case UsedSeconds of + 0 -> infinity; + 0.0 -> infinity; + _ -> TotalMemory / (UsedMemory / UsedSeconds) + end, + QueuesNumber = dict:size(State#state.drain_dict), + PerQueueSeconds = case (QueuesNumber > 0) and (AvailableSeconds /= infinity) of + true -> AvailableSeconds / QueuesNumber; + false -> infinity + end, + print_debug_info(UsedSeconds, AvailableSeconds, UsedMemory, TotalMemory, + PerQueueSeconds, QueueSec), + %% Inform the queue to reduce it's memory usage when needed. + %% This can sometimes wake the queue from hibernation. Well, we don't care. + ReduceMemory = fun ({Pid, QueueS}) -> + case QueueS > PerQueueSeconds of + true -> + gen_server2:cast(Pid, {set_bufsec_limit, PerQueueSeconds}); + _ -> ok + end + end, + lists:map(ReduceMemory, dict:to_list(State#state.drain_dict)), + State#state{drain_avg = PerQueueSeconds}. + + -- cgit v1.2.1 From cd9371e17e8e64072dd04f6b10de3e05e1d52fe9 Mon Sep 17 00:00:00 2001 From: Marek Majkowski Date: Tue, 6 Oct 2009 18:22:36 +0100 Subject: Changes to amqqueue_process required to proove that the code works. --- src/rabbit_amqqueue_process.erl | 65 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 62 insertions(+), 3 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index fe2e8509..fa3d17a8 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -42,6 +42,7 @@ -export([start_link/1]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). +-export([send_memory_monitor_update/1]). -import(queue). -import(erlang). @@ -55,12 +56,18 @@ next_msg_id, message_buffer, active_consumers, - blocked_consumers}). + blocked_consumers, + drain_ratio}). -record(consumer, {tag, ack_required}). -record(tx, {ch_pid, is_persistent, pending_messages, pending_acks}). +-record(ratio, {ratio, %% float. messages/microsecond_us + t0, %% previous timestamp (us) + next_msg_id %% previous next_msg_id + }). + %% These are held in our process dictionary -record(cr, {consumer_count, ch_pid, @@ -92,9 +99,15 @@ start_link(Q) -> gen_server2:start_link(?MODULE, Q, []). %%---------------------------------------------------------------------------- +now_us() -> + {Megaseconds,Seconds,Microseconds} = erlang:now(), + Megaseconds * 1000000 * 1000000 + Seconds * 1000000 + Microseconds. init(Q) -> ?LOGDEBUG("Queue starting - ~p~n", [Q]), + rabbit_memory_monitor:register(self()), + %% Beware. This breaks hibernation! + timer:apply_interval(2500, ?MODULE, send_memory_monitor_update, [self()]), {ok, #q{q = Q, owner = none, exclusive_consumer = none, @@ -102,7 +115,11 @@ init(Q) -> next_msg_id = 1, message_buffer = queue:new(), active_consumers = queue:new(), - blocked_consumers = queue:new()}, hibernate, + blocked_consumers = queue:new(), + drain_ratio = #ratio{ratio = 0.0, + t0 = now_us(), + next_msg_id = 1} + }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. terminate(_Reason, State) -> @@ -797,7 +814,49 @@ handle_cast({limit, ChPid, LimiterPid}, State) -> end, NewLimited = Limited andalso LimiterPid =/= undefined, C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited} - end)). + end)); + +handle_cast(send_memory_monitor_update, State) -> + DrainRatio1 = update_ratio(State#q.drain_ratio, State#q.next_msg_id), + MsgSec = DrainRatio1#ratio.ratio * 1000000, % msg/sec + BufSec = case MsgSec < 0.016 of %% less than 1 msg/1 minute + true -> infinity; + false -> queue:len(State#q.message_buffer) / MsgSec + end, + gen_server2:cast(rabbit_memory_monitor, {push_drain_ratio, self(), BufSec}), + noreply(State#q{drain_ratio = DrainRatio1}); + +handle_cast({set_bufsec_limit, BufSec}, State) -> + DrainRatio = State#q.drain_ratio, + DesiredQueueLength = case BufSec of + infinity -> infinity; + _ -> BufSec * DrainRatio#ratio.ratio * 1000000 + end, + %% Just to proove that something is happening. + io:format("Queue size is ~8p, should be ~p~n", + [queue:len(State#q.message_buffer), DesiredQueueLength]), + noreply(State). + + +%% Based on kernel load average, as descibed: +%% http://www.teamquest.com/resources/gunther/display/5/ +calc_load(Load, Exp, N) -> + Load*Exp + N*(1.0-Exp). + +update_ratio(_RatioRec = #ratio{ratio=Ratio, t0 = T0, next_msg_id = MsgCount0}, MsgCount1) -> + T1 = now_us(), + Td = T1 - T0, + MsgCount = MsgCount1 - MsgCount0, + MsgUSec = MsgCount / Td, % msg/usec + %% Td is in usec. We're interested in "load average" from last 30 seconds. + Ratio1 = calc_load(Ratio, 1.0/ (math:exp(Td/(30*1000000))), MsgUSec), + + #ratio{ratio = Ratio1, t0=T1, next_msg_id = MsgCount1}. + + +send_memory_monitor_update(Pid) -> + gen_server2:cast(Pid, send_memory_monitor_update). + handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, State = #q{owner = {DownPid, MonitorRef}}) -> -- cgit v1.2.1 From 3270c8e1c60ca72bd2c7ece7eef6fc83c45981f1 Mon Sep 17 00:00:00 2001 From: Marek Majkowski Date: Wed, 7 Oct 2009 16:27:41 +0100 Subject: Few more lines of comments --- src/rabbit_memory_monitor.erl | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl index 8bdd394b..ebbae94a 100644 --- a/src/rabbit_memory_monitor.erl +++ b/src/rabbit_memory_monitor.erl @@ -53,6 +53,20 @@ %% update we notice that a queue was using too much memory, we send a message %% back. This will happen even if the queue is hibernated, as we really do want %% it to reduce its memory footprint. +%% +%% +%% The main job of this module, is to make sure that all the queues have +%% more or less the same number of seconds till become drained. +%% This average, seconds-till-queue-is-drained, is then multiplied by +%% the ratio of Used/Total memory. So, if we can 'afford' more memory to be +%% used, we'll report greater number back to the queues. In the out of +%% memory case, we are going to reduce the average drain-seconds. +%% To acheive all this we need to accumulate the information from every +%% queue, and count an average from that. +%% +%% real_drain_avg = avg([drain_from_queue_1, queue_2, queue_3, ...]) +%% memory_overcommit = used_memory / allowed_memory +%% desired_drain_avg = memory_overcommit * real_drain_avg -module(rabbit_memory_monitor). -- cgit v1.2.1 From 9e23636552326c807e4e1fa5618cace2c6e75f9e Mon Sep 17 00:00:00 2001 From: Marek Majkowski Date: Wed, 7 Oct 2009 18:16:48 +0100 Subject: Rewritten the couting, hopefully, it's simplified now --- src/rabbit_memory_monitor.erl | 81 +++++++++++++++++++++++++------------------ 1 file changed, 47 insertions(+), 34 deletions(-) diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl index ebbae94a..e878edda 100644 --- a/src/rabbit_memory_monitor.erl +++ b/src/rabbit_memory_monitor.erl @@ -66,7 +66,7 @@ %% %% real_drain_avg = avg([drain_from_queue_1, queue_2, queue_3, ...]) %% memory_overcommit = used_memory / allowed_memory -%% desired_drain_avg = memory_overcommit * real_drain_avg +%% desired_drain_avg = real_drain_avg / memory_overcommit -module(rabbit_memory_monitor). @@ -115,16 +115,30 @@ register(Pid) -> %%---------------------------------------------------------------------------- -init([]) -> +get_user_memory_limit() -> %% TODO: References to os_mon and rabbit_memsup_linux %% should go away as bug 21457 removes it. %% BTW: memsup:get_system_memory_data() doesn't work. {state, TotalMemory, _Allocated} = rabbit_memsup_linux:update({state, 0,0}), + MemoryHighWatermark = os_mon:get_env(memsup, system_memory_high_watermark), + Limit = erlang:trunc(TotalMemory * MemoryHighWatermark), + %% no more than two gigs on 32 bits. + case (Limit > 2*1024*1024*1024) and (erlang:system_info(wordsize) == 4) of + true -> 2*1024*1024*1024; + false -> Limit + end. + + +init([]) -> + %% We should never use more memory than user requested. As the memory + %% manager doesn't really know how much memory queues are using, we shall + %% try to remain safe distance from real limit. + MemoryLimit = get_user_memory_limit() * 0.6, + rabbit_log:warning("Memory monitor limit: ~pMB~n", + [erlang:trunc(MemoryLimit/1024/1024)]), {ok, TRef} = timer:apply_interval(?DEFAULT_UPDATE_INTERVAL_MS, ?SERVER, update, []), - MemoryHighWatermark = os_mon:get_env(memsup, system_memory_high_watermark), - MemoryLimit = erlang:trunc(TotalMemory * MemoryHighWatermark), {ok, #state{timer = TRef, drain_dict = dict:new(), drain_avg = infinity, @@ -171,51 +185,50 @@ ftoa(Float) -> end, lists:flatten(Str). -print_debug_info(UsedSeconds, AvailableSeconds, UsedMemory, TotalMemory, - PerQueueSeconds, QueueSec) -> - io:format("Update ~s/~s ~s/~s PerQueueSeconds:~s ~s~n", - [ftoa(UsedSeconds), ftoa(AvailableSeconds), - ftoa(UsedMemory/1024.0/1024.0), ftoa(TotalMemory/1024.0/1024.0), - ftoa(PerQueueSeconds), - [" "] ++ lists:flatten([ftoa(Q)++" " || Q <- QueueSec]) - ]). +print_debug_info(RealDrainAvg, DesiredDrainAvg, MemoryOvercommit) -> + io:format("DrainAvg Real/Desired:~s/~s MemoryOvercommit:~s~n", + [ftoa(RealDrainAvg), ftoa(DesiredDrainAvg), + ftoa(MemoryOvercommit)]). -else. -print_debug_info(_UsedSeconds, _AvailableSeconds, _UsedMemory, _TotalMemory, - _PerQueueSeconds, _QueueSec) -> +print_debug_info(_RealDrainAvg, _DesiredDrainAvg, _MemoryOvercommit) -> ok. -endif. +%% Count average from numbers, excluding atoms in the list. +count_average(List) -> + List1 = [V || V <- List, is_number(V) or is_float(V)], + case length(List1) of + 0 -> infinity; + Len -> lists:sum(List1) / Len + end. + internal_update(State) -> - UsedMemory = erlang:memory(total), - TotalMemory = State#state.memory_limit, - QueueSec = [V || {_K, V} <- dict:to_list(State#state.drain_dict) ], - UsedSeconds = lists:sum( lists:filter(fun (A) -> - is_number(A) or is_float(A) - end, - QueueSec) ), - AvailableSeconds = case UsedSeconds of + %% used memory / available memory + MemoryOvercommit = erlang:memory(total) / State#state.memory_limit, + + RealDrainAvg = count_average([V || {_K, V} <- + dict:to_list(State#state.drain_dict)]), + %% In case of no active queues, feel free to grow. We can't make any + %% decisionswe have no clue what is the average ram_usage/second. + %% Not does the queue. + DesiredDrainAvg = case RealDrainAvg of + infinity -> infinity; 0 -> infinity; 0.0 -> infinity; - _ -> TotalMemory / (UsedMemory / UsedSeconds) - end, - QueuesNumber = dict:size(State#state.drain_dict), - PerQueueSeconds = case (QueuesNumber > 0) and (AvailableSeconds /= infinity) of - true -> AvailableSeconds / QueuesNumber; - false -> infinity + _ -> RealDrainAvg / MemoryOvercommit end, - print_debug_info(UsedSeconds, AvailableSeconds, UsedMemory, TotalMemory, - PerQueueSeconds, QueueSec), + print_debug_info(RealDrainAvg, DesiredDrainAvg, MemoryOvercommit), %% Inform the queue to reduce it's memory usage when needed. %% This can sometimes wake the queue from hibernation. Well, we don't care. - ReduceMemory = fun ({Pid, QueueS}) -> - case QueueS > PerQueueSeconds of + ReduceMemory = fun ({Pid, QueueDrain}) -> + case QueueDrain > DesiredDrainAvg of true -> - gen_server2:cast(Pid, {set_bufsec_limit, PerQueueSeconds}); + gen_server2:cast(Pid, {set_bufsec_limit, DesiredDrainAvg}); _ -> ok end end, lists:map(ReduceMemory, dict:to_list(State#state.drain_dict)), - State#state{drain_avg = PerQueueSeconds}. + State#state{drain_avg = DesiredDrainAvg}. -- cgit v1.2.1 From 504c65eb7ccdd77d7ca30360deb943fa34dd7625 Mon Sep 17 00:00:00 2001 From: Marek Majkowski Date: Thu, 22 Oct 2009 09:00:57 -0400 Subject: QA: specs, and LOGDEBUGS --- src/rabbit_amqqueue_process.erl | 2 +- src/rabbit_memory_monitor.erl | 32 ++++++++++++++++---------------- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index fa3d17a8..d0123989 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -833,7 +833,7 @@ handle_cast({set_bufsec_limit, BufSec}, State) -> _ -> BufSec * DrainRatio#ratio.ratio * 1000000 end, %% Just to proove that something is happening. - io:format("Queue size is ~8p, should be ~p~n", + ?LOGDEBUG("Queue size is ~8p, should be ~p~n", [queue:len(State#q.message_buffer), DesiredQueueLength]), noreply(State). diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl index e878edda..87ee96ad 100644 --- a/src/rabbit_memory_monitor.erl +++ b/src/rabbit_memory_monitor.erl @@ -70,6 +70,7 @@ -module(rabbit_memory_monitor). +-include("rabbit.hrl"). -behaviour(gen_server2). @@ -91,13 +92,20 @@ -define(SERVER, ?MODULE). -define(DEFAULT_UPDATE_INTERVAL_MS, 2500). -%% Enable debug reports in stdout: --define(debug, true). - %%---------------------------------------------------------------------------- - -ifdef(use_specs). +-spec(start_link/0 :: () -> 'ignore' | {'error',_} | {'ok',pid()}). +-spec(register/1 :: (pid()) -> ok). + +-spec(init/1 :: ([]) -> {ok, #state{}}). + +-ifdef(debug). +-spec(ftoa/1 :: (any()) -> string()). +-endif. + +-spec(count_average/1 :: (list()) -> float() | infinity ). +-spec(internal_update/1 :: (#state{}) -> #state{}). -endif. %%---------------------------------------------------------------------------- @@ -133,7 +141,7 @@ init([]) -> %% We should never use more memory than user requested. As the memory %% manager doesn't really know how much memory queues are using, we shall %% try to remain safe distance from real limit. - MemoryLimit = get_user_memory_limit() * 0.6, + MemoryLimit = trunc(get_user_memory_limit() * 0.6), rabbit_log:warning("Memory monitor limit: ~pMB~n", [erlang:trunc(MemoryLimit/1024/1024)]), @@ -184,15 +192,6 @@ ftoa(Float) -> false -> io_lib:format("~p", [Float]) end, lists:flatten(Str). - -print_debug_info(RealDrainAvg, DesiredDrainAvg, MemoryOvercommit) -> - io:format("DrainAvg Real/Desired:~s/~s MemoryOvercommit:~s~n", - [ftoa(RealDrainAvg), ftoa(DesiredDrainAvg), - ftoa(MemoryOvercommit)]). --else. -print_debug_info(_RealDrainAvg, _DesiredDrainAvg, _MemoryOvercommit) -> - ok. - -endif. %% Count average from numbers, excluding atoms in the list. @@ -214,11 +213,12 @@ internal_update(State) -> %% Not does the queue. DesiredDrainAvg = case RealDrainAvg of infinity -> infinity; - 0 -> infinity; 0.0 -> infinity; _ -> RealDrainAvg / MemoryOvercommit end, - print_debug_info(RealDrainAvg, DesiredDrainAvg, MemoryOvercommit), + ?LOGDEBUG("DrainAvg Real/Desired:~s/~s MemoryOvercommit:~s~n", + [ftoa(RealDrainAvg), ftoa(DesiredDrainAvg), + ftoa(MemoryOvercommit)]), %% Inform the queue to reduce it's memory usage when needed. %% This can sometimes wake the queue from hibernation. Well, we don't care. ReduceMemory = fun ({Pid, QueueDrain}) -> -- cgit v1.2.1 From 155dab5b8d4cdf6cbea0b3a229cfff3f54ed6db5 Mon Sep 17 00:00:00 2001 From: Marek Majkowski Date: Thu, 22 Oct 2009 12:13:14 -0400 Subject: QA: changed names to: queue_duration, changed MemoryOvercommit to be available/used (instead of used/available) --- src/rabbit_amqqueue_process.erl | 2 +- src/rabbit_memory_monitor.erl | 53 ++++++++++++++++++++++------------------- 2 files changed, 30 insertions(+), 25 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index d0123989..a5400254 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -823,7 +823,7 @@ handle_cast(send_memory_monitor_update, State) -> true -> infinity; false -> queue:len(State#q.message_buffer) / MsgSec end, - gen_server2:cast(rabbit_memory_monitor, {push_drain_ratio, self(), BufSec}), + rabbit_memory_monitor:push_queue_duration(self(), BufSec), noreply(State#q{drain_ratio = DrainRatio1}); handle_cast({set_bufsec_limit, BufSec}, State) -> diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl index 87ee96ad..8c1db615 100644 --- a/src/rabbit_memory_monitor.erl +++ b/src/rabbit_memory_monitor.erl @@ -45,7 +45,7 @@ %% v | v | | %% Monitor X--*-+--X---*-+--X------X----X-----X+-----------> %% -%% Or to put it in words. Queue periodically sends (casts) 'push_drain_ratio' +%% Or to put it in words. Queue periodically sends (casts) 'push_queue_duration' %% message to the Monitor (cases 1 and 2 on the asciiart above). Monitor %% _always_ replies with a 'set_bufsec_limit' cast. This way, %% we're pretty sure that the Queue is not hibernated. @@ -58,15 +58,15 @@ %% The main job of this module, is to make sure that all the queues have %% more or less the same number of seconds till become drained. %% This average, seconds-till-queue-is-drained, is then multiplied by -%% the ratio of Used/Total memory. So, if we can 'afford' more memory to be +%% the ratio of Total/Used memory. So, if we can 'afford' more memory to be %% used, we'll report greater number back to the queues. In the out of %% memory case, we are going to reduce the average drain-seconds. %% To acheive all this we need to accumulate the information from every %% queue, and count an average from that. %% -%% real_drain_avg = avg([drain_from_queue_1, queue_2, queue_3, ...]) -%% memory_overcommit = used_memory / allowed_memory -%% desired_drain_avg = real_drain_avg / memory_overcommit +%% real_queue_duration_avg = avg([drain_from_queue_1, queue_2, queue_3, ...]) +%% memory_overcommit = allowed_memory / used_memory +%% desired_queue_duration_avg = real_queue_duration_avg * memory_overcommit -module(rabbit_memory_monitor). @@ -81,12 +81,12 @@ -export([update/0]). --export([register/1]). +-export([register/1, push_queue_duration/2]). --record(state, {timer, %% 'internal_update' timer - drain_dict, %% dict, queue_pid:seconds_till_queue_is_empty - drain_avg, %% global, the desired queue depth (in seconds) - memory_limit %% how much memory we intend to use +-record(state, {timer, %% 'internal_update' timer + queue_duration_dict, %% dict, qpid:seconds_till_queue_is_empty + queue_duration_avg, %% global, the desired queue depth (in sec) + memory_limit %% how much memory we intend to use }). -define(SERVER, ?MODULE). @@ -97,6 +97,7 @@ -spec(start_link/0 :: () -> 'ignore' | {'error',_} | {'ok',pid()}). -spec(register/1 :: (pid()) -> ok). +-spec(push_queue_duration/2 :: (pid(), float() | infinity) -> ok). -spec(init/1 :: ([]) -> {ok, #state{}}). @@ -121,6 +122,9 @@ update() -> register(Pid) -> gen_server2:cast(?SERVER, {register, Pid}). +push_queue_duration(Pid, BufSec) -> + gen_server2:cast(rabbit_memory_monitor, {push_queue_duration, Pid, BufSec}). + %%---------------------------------------------------------------------------- get_user_memory_limit() -> @@ -148,8 +152,8 @@ init([]) -> {ok, TRef} = timer:apply_interval(?DEFAULT_UPDATE_INTERVAL_MS, ?SERVER, update, []), {ok, #state{timer = TRef, - drain_dict = dict:new(), - drain_avg = infinity, + queue_duration_dict = dict:new(), + queue_duration_avg = infinity, memory_limit = MemoryLimit}}. handle_call(_Request, _From, State) -> @@ -163,17 +167,18 @@ handle_cast({register, Pid}, State) -> _MRef = erlang:monitor(process, Pid), {noreply, State}; -handle_cast({push_drain_ratio, Pid, DrainRatio}, State) -> - gen_server2:cast(Pid, {set_bufsec_limit, State#state.drain_avg}), - {noreply, State#state{drain_dict = - dict:store(Pid, DrainRatio, State#state.drain_dict)}}; +handle_cast({push_queue_duration, Pid, DrainRatio}, State) -> + gen_server2:cast(Pid, {set_bufsec_limit, State#state.queue_duration_avg}), + {noreply, State#state{queue_duration_dict = + dict:store(Pid, DrainRatio, State#state.queue_duration_dict)}}; handle_cast(_Request, State) -> {noreply, State}. handle_info({'DOWN', _MRef, process, Pid, _Reason}, State) -> - {noreply, State#state{drain_dict = dict:erase(Pid, State#state.drain_dict)}}; + {noreply, State#state{queue_duration_dict = + dict:erase(Pid, State#state.queue_duration_dict)}}; handle_info(_Info, State) -> {noreply, State}. @@ -203,18 +208,18 @@ count_average(List) -> end. internal_update(State) -> - %% used memory / available memory - MemoryOvercommit = erlang:memory(total) / State#state.memory_limit, - + %% available memory / used memory + UsedMemory = erlang:memory(total), + MemoryOvercommit = State#state.memory_limit / UsedMemory, RealDrainAvg = count_average([V || {_K, V} <- - dict:to_list(State#state.drain_dict)]), + dict:to_list(State#state.queue_duration_dict)]), %% In case of no active queues, feel free to grow. We can't make any %% decisionswe have no clue what is the average ram_usage/second. %% Not does the queue. DesiredDrainAvg = case RealDrainAvg of infinity -> infinity; 0.0 -> infinity; - _ -> RealDrainAvg / MemoryOvercommit + _ -> RealDrainAvg * MemoryOvercommit end, ?LOGDEBUG("DrainAvg Real/Desired:~s/~s MemoryOvercommit:~s~n", [ftoa(RealDrainAvg), ftoa(DesiredDrainAvg), @@ -228,7 +233,7 @@ internal_update(State) -> _ -> ok end end, - lists:map(ReduceMemory, dict:to_list(State#state.drain_dict)), - State#state{drain_avg = DesiredDrainAvg}. + lists:map(ReduceMemory, dict:to_list(State#state.queue_duration_dict)), + State#state{queue_duration_avg = DesiredDrainAvg}. -- cgit v1.2.1 From c6ceacd274f52ccbb40d1e4ace14e17b1939de7e Mon Sep 17 00:00:00 2001 From: Marek Majkowski Date: Fri, 23 Oct 2009 10:59:29 -0400 Subject: dict->ets, and a refactoring --- src/rabbit_amqqueue_process.erl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index a5400254..d402ef97 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -826,11 +826,11 @@ handle_cast(send_memory_monitor_update, State) -> rabbit_memory_monitor:push_queue_duration(self(), BufSec), noreply(State#q{drain_ratio = DrainRatio1}); -handle_cast({set_bufsec_limit, BufSec}, State) -> +handle_cast({set_queue_duration, QueueDuration}, State) -> DrainRatio = State#q.drain_ratio, - DesiredQueueLength = case BufSec of + DesiredQueueLength = case QueueDuration of infinity -> infinity; - _ -> BufSec * DrainRatio#ratio.ratio * 1000000 + _ -> QueueDuration * DrainRatio#ratio.ratio * 1000000 end, %% Just to proove that something is happening. ?LOGDEBUG("Queue size is ~8p, should be ~p~n", -- cgit v1.2.1 From a1e8ba425ab8cf0a3c882ce14a059f3bcd5f09cd Mon Sep 17 00:00:00 2001 From: Marek Majkowski Date: Mon, 26 Oct 2009 12:54:48 -0400 Subject: Major refactoring including ets, saving last-send-value and call instead of cast. --- src/rabbit_amqqueue_process.erl | 18 +++-- src/rabbit_memory_monitor.erl | 169 ++++++++++++++++++++++++++-------------- 2 files changed, 120 insertions(+), 67 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index d402ef97..3cedfd20 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -819,22 +819,26 @@ handle_cast({limit, ChPid, LimiterPid}, State) -> handle_cast(send_memory_monitor_update, State) -> DrainRatio1 = update_ratio(State#q.drain_ratio, State#q.next_msg_id), MsgSec = DrainRatio1#ratio.ratio * 1000000, % msg/sec - BufSec = case MsgSec < 0.016 of %% less than 1 msg/1 minute + QueueDuration = case MsgSec < 0.016 of %% less than 1 msg/1 minute true -> infinity; false -> queue:len(State#q.message_buffer) / MsgSec end, - rabbit_memory_monitor:push_queue_duration(self(), BufSec), + DesiredQueueDuration = rabbit_memory_monitor:push_queue_duration( + self(), QueueDuration), + ?LOGDEBUG("~p Queue duration current/desired ~p/~p~n", + [(State#q.q)#amqqueue.name, QueueDuration, DesiredQueueDuration]), noreply(State#q{drain_ratio = DrainRatio1}); -handle_cast({set_queue_duration, QueueDuration}, State) -> +handle_cast({set_queue_duration, DesiredQueueDuration}, State) -> DrainRatio = State#q.drain_ratio, - DesiredQueueLength = case QueueDuration of + DesiredBufLength = case DesiredQueueDuration of infinity -> infinity; - _ -> QueueDuration * DrainRatio#ratio.ratio * 1000000 + _ -> DesiredQueueDuration * DrainRatio#ratio.ratio * 1000000 end, %% Just to proove that something is happening. - ?LOGDEBUG("Queue size is ~8p, should be ~p~n", - [queue:len(State#q.message_buffer), DesiredQueueLength]), + ?LOGDEBUG("~p Queue length is~8p, should be ~p~n", + [(State#q.q)#amqqueue.name, queue:len(State#q.message_buffer), + DesiredBufLength]), noreply(State). diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl index 8c1db615..db4949e4 100644 --- a/src/rabbit_memory_monitor.erl +++ b/src/rabbit_memory_monitor.erl @@ -41,13 +41,13 @@ %% Timer | | %% v v %% Queue -----+--------+-----<***hibernated***>-------------> -%% | ^ | ^ ^ +%% | ^ | ^ ^ %% v | v | | %% Monitor X--*-+--X---*-+--X------X----X-----X+-----------> %% %% Or to put it in words. Queue periodically sends (casts) 'push_queue_duration' %% message to the Monitor (cases 1 and 2 on the asciiart above). Monitor -%% _always_ replies with a 'set_bufsec_limit' cast. This way, +%% _always_ replies with a 'set_queue_duration' cast. This way, %% we're pretty sure that the Queue is not hibernated. %% Monitor periodically recounts numbers ('X' on asciiart). If, during this %% update we notice that a queue was using too much memory, we send a message @@ -84,29 +84,37 @@ -export([register/1, push_queue_duration/2]). -record(state, {timer, %% 'internal_update' timer - queue_duration_dict, %% dict, qpid:seconds_till_queue_is_empty - queue_duration_avg, %% global, the desired queue depth (in sec) - memory_limit %% how much memory we intend to use + queue_durations, %% ets, (qpid, seconds_till_queue_is_empty) + queue_duration_sum, %% sum of all queue_durations + queue_duration_items,%% number of elements in sum + memory_limit, %% how much memory we intend to use + memory_ratio %% how much more memory we can use }). -define(SERVER, ?MODULE). -define(DEFAULT_UPDATE_INTERVAL_MS, 2500). - +-define(TABLE_NAME, ?MODULE). +-define(MAX_QUEUE_DURATION_ALLOWED, 60*60*24). % 1 day %%---------------------------------------------------------------------------- -ifdef(use_specs). - --spec(start_link/0 :: () -> 'ignore' | {'error',_} | {'ok',pid()}). +-type(state() :: #state{timer :: timer:tref(), + queue_durations :: tid(), + queue_duration_sum :: float(), + queue_duration_items:: non_neg_integer(), + memory_limit :: pos_integer(), + memory_ratio :: float() }). + +-spec(start_link/0 :: () -> ignore | {error, _} | {ok, pid()}). -spec(register/1 :: (pid()) -> ok). -spec(push_queue_duration/2 :: (pid(), float() | infinity) -> ok). --spec(init/1 :: ([]) -> {ok, #state{}}). +-spec(init/1 :: ([]) -> {ok, state()}). -ifdef(debug). -spec(ftoa/1 :: (any()) -> string()). -endif. --spec(count_average/1 :: (list()) -> float() | infinity ). --spec(internal_update/1 :: (#state{}) -> #state{}). +-spec(internal_update/1 :: (state()) -> state()). -endif. %%---------------------------------------------------------------------------- @@ -122,8 +130,9 @@ update() -> register(Pid) -> gen_server2:cast(?SERVER, {register, Pid}). -push_queue_duration(Pid, BufSec) -> - gen_server2:cast(rabbit_memory_monitor, {push_queue_duration, Pid, BufSec}). +push_queue_duration(Pid, QueueDuration) -> + gen_server2:call(rabbit_memory_monitor, + {push_queue_duration, Pid, QueueDuration}). %%---------------------------------------------------------------------------- @@ -141,20 +150,58 @@ get_user_memory_limit() -> end. -init([]) -> +init([]) -> %% We should never use more memory than user requested. As the memory %% manager doesn't really know how much memory queues are using, we shall %% try to remain safe distance from real limit. MemoryLimit = trunc(get_user_memory_limit() * 0.6), rabbit_log:warning("Memory monitor limit: ~pMB~n", - [erlang:trunc(MemoryLimit/1024/1024)]), - + [erlang:trunc(MemoryLimit/1048576)]), + {ok, TRef} = timer:apply_interval(?DEFAULT_UPDATE_INTERVAL_MS, ?SERVER, update, []), {ok, #state{timer = TRef, - queue_duration_dict = dict:new(), - queue_duration_avg = infinity, - memory_limit = MemoryLimit}}. + queue_durations = ets:new(?TABLE_NAME, [set, private]), + queue_duration_sum = 0.0, + queue_duration_items = 0, + memory_limit = MemoryLimit, + memory_ratio = 1.0}}. + +get_avg_duration(#state{queue_duration_sum = Sum, + queue_duration_items = Items}) -> + case Items of + 0 -> infinity; + _ -> Sum / Items + end. + +get_desired_duration(State) -> + case get_avg_duration(State) of + infinity -> infinity; + AvgQueueDuration -> AvgQueueDuration * State#state.memory_ratio + end. + +handle_call({push_queue_duration, Pid, QueueDuration0}, From, State) -> + SendDuration = get_desired_duration(State), + gen_server2:reply(From, SendDuration), + + QueueDuration = case QueueDuration0 > ?MAX_QUEUE_DURATION_ALLOWED of + true -> infinity; + false -> QueueDuration0 + end, + + {Sum, Items} = {State#state.queue_duration_sum, + State#state.queue_duration_items}, + [{_Pid, PrevQueueDuration, _PrevSendDuration}] = ets:lookup(State#state.queue_durations, Pid), + {Sum1, Items1} = + case {PrevQueueDuration == infinity, QueueDuration == infinity} of + {true, true} -> {Sum, Items}; + {true, false} -> {Sum + QueueDuration, Items + 1}; + {false, true} -> {Sum - PrevQueueDuration, Items - 1}; + {false, false} -> {Sum - PrevQueueDuration + QueueDuration, Items} + end, + ets:insert(State#state.queue_durations, {Pid, QueueDuration, SendDuration}), + {noreply, State#state{queue_duration_sum = Sum1, + queue_duration_items = Items1}}; handle_call(_Request, _From, State) -> {noreply, State}. @@ -165,20 +212,24 @@ handle_cast(update, State) -> handle_cast({register, Pid}, State) -> _MRef = erlang:monitor(process, Pid), + ets:insert(State#state.queue_durations, {Pid, infinity, infinity}), {noreply, State}; -handle_cast({push_queue_duration, Pid, DrainRatio}, State) -> - gen_server2:cast(Pid, {set_bufsec_limit, State#state.queue_duration_avg}), - {noreply, State#state{queue_duration_dict = - dict:store(Pid, DrainRatio, State#state.queue_duration_dict)}}; - handle_cast(_Request, State) -> {noreply, State}. handle_info({'DOWN', _MRef, process, Pid, _Reason}, State) -> - {noreply, State#state{queue_duration_dict = - dict:erase(Pid, State#state.queue_duration_dict)}}; + {Sum, Items} = {State#state.queue_duration_sum, + State#state.queue_duration_items}, + [{_Pid, PrevQueueDuration, _PrevSendDuration}] = ets:lookup(State#state.queue_durations, Pid), + Sum1 = case PrevQueueDuration == infinity of + true -> Sum; + false -> Sum - PrevQueueDuration + end, + ets:delete(State#state.queue_durations, Pid), + {noreply, State#state{queue_duration_sum = Sum1, + queue_duration_items = Items-1}}; handle_info(_Info, State) -> {noreply, State}. @@ -190,7 +241,11 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. --ifdef(debug). + +set_queue_duration(Pid, QueueDuration) -> + gen_server2:pcast(Pid, 7, {set_queue_duration, QueueDuration}). + +-ifdef(debug). ftoa(Float) -> Str = case is_float(Float) of true -> io_lib:format("~11.3f",[Float]); @@ -199,41 +254,35 @@ ftoa(Float) -> lists:flatten(Str). -endif. -%% Count average from numbers, excluding atoms in the list. -count_average(List) -> - List1 = [V || V <- List, is_number(V) or is_float(V)], - case length(List1) of - 0 -> infinity; - Len -> lists:sum(List1) / Len - end. -internal_update(State) -> +%% Update memory ratio. Count new DesiredQueueDuration. +%% Get queues that are using more than that, and send +%% pessimistic information back to them. +internal_update(State0) -> %% available memory / used memory - UsedMemory = erlang:memory(total), - MemoryOvercommit = State#state.memory_limit / UsedMemory, - RealDrainAvg = count_average([V || {_K, V} <- - dict:to_list(State#state.queue_duration_dict)]), - %% In case of no active queues, feel free to grow. We can't make any - %% decisionswe have no clue what is the average ram_usage/second. - %% Not does the queue. - DesiredDrainAvg = case RealDrainAvg of - infinity -> infinity; - 0.0 -> infinity; - _ -> RealDrainAvg * MemoryOvercommit - end, - ?LOGDEBUG("DrainAvg Real/Desired:~s/~s MemoryOvercommit:~s~n", - [ftoa(RealDrainAvg), ftoa(DesiredDrainAvg), - ftoa(MemoryOvercommit)]), - %% Inform the queue to reduce it's memory usage when needed. - %% This can sometimes wake the queue from hibernation. Well, we don't care. - ReduceMemory = fun ({Pid, QueueDrain}) -> - case QueueDrain > DesiredDrainAvg of - true -> - gen_server2:cast(Pid, {set_bufsec_limit, DesiredDrainAvg}); - _ -> ok - end + MemoryRatio = State0#state.memory_limit / erlang:memory(total), + State = State0#state{memory_ratio = MemoryRatio}, + + DesiredDurationAvg = get_desired_duration(State), + + ?LOGDEBUG("Avg duration: real/desired:~s/~s Memory ratio:~s Queues:~p~n", + [ftoa(get_avg_duration(State)), ftoa(DesiredDurationAvg), + ftoa(MemoryRatio), + ets:foldl(fun (_, Acc) -> Acc+1 end, + 0, State#state.queue_durations)] ), + + %% If we have pessimistic information, we need to inform queues + %% to reduce it's memory usage when needed. + %% This sometimes wakes up queues from hibernation. Well, we don't care. + PromptReduceDuraton = fun ({Pid, QueueDuration, PrevSendDuration}, Acc) -> + case (PrevSendDuration > DesiredDurationAvg) and (QueueDuration > DesiredDurationAvg) of + true -> set_queue_duration(Pid, DesiredDurationAvg), + ets:insert(State#state.queue_durations, {Pid, QueueDuration, DesiredDurationAvg}), + Acc + 1; + _ -> Acc + end end, - lists:map(ReduceMemory, dict:to_list(State#state.queue_duration_dict)), - State#state{queue_duration_avg = DesiredDrainAvg}. + ets:foldl(PromptReduceDuraton, 0, State#state.queue_durations), + State. -- cgit v1.2.1 From 3fa441167d0501b7c2cbec4d68f84c1f898f5b9d Mon Sep 17 00:00:00 2001 From: Marek Majkowski Date: Mon, 9 Nov 2009 09:57:28 -0500 Subject: Using vm_memory_manager. --- src/rabbit_memory_monitor.erl | 26 +++++++++++--------------- src/vm_memory_monitor.erl | 14 +++++++++++++- 2 files changed, 24 insertions(+), 16 deletions(-) diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl index db4949e4..0629591a 100644 --- a/src/rabbit_memory_monitor.erl +++ b/src/rabbit_memory_monitor.erl @@ -95,6 +95,10 @@ -define(DEFAULT_UPDATE_INTERVAL_MS, 2500). -define(TABLE_NAME, ?MODULE). -define(MAX_QUEUE_DURATION_ALLOWED, 60*60*24). % 1 day + +%% If user disabled vm_memory_monitor, let's assume 1GB of memory we can use. +-define(MEMORY_SIZE_FOR_DISABLED_VMM, 1073741824). + %%---------------------------------------------------------------------------- -ifdef(use_specs). -type(state() :: #state{timer :: timer:tref(), @@ -136,26 +140,18 @@ push_queue_duration(Pid, QueueDuration) -> %%---------------------------------------------------------------------------- -get_user_memory_limit() -> - %% TODO: References to os_mon and rabbit_memsup_linux - %% should go away as bug 21457 removes it. - %% BTW: memsup:get_system_memory_data() doesn't work. - {state, TotalMemory, _Allocated} = rabbit_memsup_linux:update({state, 0,0}), - MemoryHighWatermark = os_mon:get_env(memsup, system_memory_high_watermark), - Limit = erlang:trunc(TotalMemory * MemoryHighWatermark), - %% no more than two gigs on 32 bits. - case (Limit > 2*1024*1024*1024) and (erlang:system_info(wordsize) == 4) of - true -> 2*1024*1024*1024; - false -> Limit +get_memory_limit() -> + RabbitMemoryLimit = case vm_memory_monitor:get_memory_limit() of + undefined -> ?MEMORY_SIZE_FOR_DISABLED_VMM; + A -> A end. - init([]) -> %% We should never use more memory than user requested. As the memory %% manager doesn't really know how much memory queues are using, we shall - %% try to remain safe distance from real limit. - MemoryLimit = trunc(get_user_memory_limit() * 0.6), - rabbit_log:warning("Memory monitor limit: ~pMB~n", + %% try to remain safe distance from real throttle limit. + MemoryLimit = trunc(get_memory_limit() * 0.6), + rabbit_log:warning("Queues go to disk when memory is above: ~pMB~n", [erlang:trunc(MemoryLimit/1048576)]), {ok, TRef} = timer:apply_interval(?DEFAULT_UPDATE_INTERVAL_MS, diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl index 6da47933..6da6704d 100644 --- a/src/vm_memory_monitor.erl +++ b/src/vm_memory_monitor.erl @@ -51,7 +51,8 @@ -export([update/0, get_total_memory/0, get_check_interval/0, set_check_interval/1, - get_vm_memory_high_watermark/0, set_vm_memory_high_watermark/1]). + get_vm_memory_high_watermark/0, set_vm_memory_high_watermark/1, + get_memory_limit/0]). -define(SERVER, ?MODULE). @@ -76,6 +77,7 @@ -spec(start_link/1 :: (float()) -> ('ignore' | {error, any()} | {'ok', pid()})). -spec(update/0 :: () -> 'ok'). -spec(get_total_memory/0 :: () -> (non_neg_integer() | unknown)). +-spec(get_memory_limit/0 :: () -> (non_neg_integer() | undefined)). -spec(get_check_interval/0 :: () -> non_neg_integer()). -spec(set_check_interval/1 :: (non_neg_integer()) -> 'ok'). -spec(get_vm_memory_high_watermark/0 :: () -> float()). @@ -128,6 +130,9 @@ handle_call({set_check_interval, Timeout}, _From, State) -> {ok, cancel} = timer:cancel(State#state.timer), {reply, ok, State#state{timeout = Timeout, timer = start_timer(Timeout)}}; +handle_call(get_memory_limit, _From, State) -> + {reply, State#state.memory_limit, State}; + handle_call(_Request, _From, State) -> {noreply, State}. @@ -168,6 +173,13 @@ get_vm_memory_high_watermark() -> set_vm_memory_high_watermark(Fraction) -> gen_server2:call(?MODULE, {set_vm_memory_high_watermark, Fraction}). +get_memory_limit() -> + try + gen_server2:call(?MODULE, get_memory_limit) + catch + exit:{noproc, _} -> undefined + end. + %%---------------------------------------------------------------------------- %% Server Internals %%---------------------------------------------------------------------------- -- cgit v1.2.1 From 68fea373e2da2d5930976421e0cdd39c8574476d Mon Sep 17 00:00:00 2001 From: Marek Majkowski Date: Wed, 11 Nov 2009 06:25:21 -0500 Subject: QA: quoted atoms in specs, timer:now_diff --- src/rabbit_amqqueue_process.erl | 15 ++++----------- src/rabbit_memory_monitor.erl | 8 ++++---- src/vm_memory_monitor.erl | 6 +++--- 3 files changed, 11 insertions(+), 18 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index fad36f2c..9b97fe86 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -99,10 +99,6 @@ start_link(Q) -> gen_server2:start_link(?MODULE, Q, []). %%---------------------------------------------------------------------------- -now_us() -> - {Megaseconds,Seconds,Microseconds} = erlang:now(), - Megaseconds * 1000000 * 1000000 + Seconds * 1000000 + Microseconds. - init(Q) -> ?LOGDEBUG("Queue starting - ~p~n", [Q]), rabbit_memory_monitor:register(self()), @@ -117,7 +113,7 @@ init(Q) -> active_consumers = queue:new(), blocked_consumers = queue:new(), drain_ratio = #ratio{ratio = 0.0, - t0 = now_us(), + t0 = now(), next_msg_id = 1} }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -825,10 +821,7 @@ handle_cast({limit, ChPid, LimiterPid}, State) -> handle_cast(send_memory_monitor_update, State) -> DrainRatio1 = update_ratio(State#q.drain_ratio, State#q.next_msg_id), MsgSec = DrainRatio1#ratio.ratio * 1000000, % msg/sec - QueueDuration = case MsgSec < 0.016 of %% less than 1 msg/1 minute - true -> infinity; - false -> queue:len(State#q.message_buffer) / MsgSec - end, + QueueDuration = queue:len(State#q.message_buffer) / MsgSec, % seconds DesiredQueueDuration = rabbit_memory_monitor:push_queue_duration( self(), QueueDuration), ?LOGDEBUG("~p Queue duration current/desired ~p/~p~n", @@ -854,8 +847,8 @@ calc_load(Load, Exp, N) -> Load*Exp + N*(1.0-Exp). update_ratio(_RatioRec = #ratio{ratio=Ratio, t0 = T0, next_msg_id = MsgCount0}, MsgCount1) -> - T1 = now_us(), - Td = T1 - T0, + T1 = now(), + Td = timer:now_diff(T1, T0), MsgCount = MsgCount1 - MsgCount0, MsgUSec = MsgCount / Td, % msg/usec %% Td is in usec. We're interested in "load average" from last 30 seconds. diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl index 0629591a..ff7684bd 100644 --- a/src/rabbit_memory_monitor.erl +++ b/src/rabbit_memory_monitor.erl @@ -108,11 +108,11 @@ memory_limit :: pos_integer(), memory_ratio :: float() }). --spec(start_link/0 :: () -> ignore | {error, _} | {ok, pid()}). --spec(register/1 :: (pid()) -> ok). --spec(push_queue_duration/2 :: (pid(), float() | infinity) -> ok). +-spec(start_link/0 :: () -> 'ignore' | {'error', _} | {'ok', pid()}). +-spec(register/1 :: (pid()) -> 'ok'). +-spec(push_queue_duration/2 :: (pid(), float() | 'infinity') -> 'ok'). --spec(init/1 :: ([]) -> {ok, state()}). +-spec(init/1 :: ([]) -> {'ok', state()}). -ifdef(debug). -spec(ftoa/1 :: (any()) -> string()). diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl index 6da6704d..65d4a451 100644 --- a/src/vm_memory_monitor.erl +++ b/src/vm_memory_monitor.erl @@ -74,10 +74,10 @@ -ifdef(use_specs). --spec(start_link/1 :: (float()) -> ('ignore' | {error, any()} | {'ok', pid()})). +-spec(start_link/1 :: (float()) -> ('ignore' | {'error', any()} | {'ok', pid()})). -spec(update/0 :: () -> 'ok'). --spec(get_total_memory/0 :: () -> (non_neg_integer() | unknown)). --spec(get_memory_limit/0 :: () -> (non_neg_integer() | undefined)). +-spec(get_total_memory/0 :: () -> (non_neg_integer() | 'unknown')). +-spec(get_memory_limit/0 :: () -> (non_neg_integer() | 'undefined')). -spec(get_check_interval/0 :: () -> non_neg_integer()). -spec(set_check_interval/1 :: (non_neg_integer()) -> 'ok'). -spec(get_vm_memory_high_watermark/0 :: () -> float()). -- cgit v1.2.1 From 761d2515c086c7b3b221e120c28b5d22c81b4c3f Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Wed, 11 Nov 2009 15:02:08 +0000 Subject: Cosmetics --- src/rabbit_memory_monitor.erl | 194 ++++++++++++++++++++---------------------- 1 file changed, 90 insertions(+), 104 deletions(-) diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl index ff7684bd..4880b260 100644 --- a/src/rabbit_memory_monitor.erl +++ b/src/rabbit_memory_monitor.erl @@ -30,7 +30,7 @@ %% -%% This module handles the node-wide memory statistics. +%% This module handles the node-wide memory statistics. %% It receives statistics from all queues, counts the desired %% queue length (in seconds), and sends this information back to %% queues. @@ -46,8 +46,8 @@ %% Monitor X--*-+--X---*-+--X------X----X-----X+-----------> %% %% Or to put it in words. Queue periodically sends (casts) 'push_queue_duration' -%% message to the Monitor (cases 1 and 2 on the asciiart above). Monitor -%% _always_ replies with a 'set_queue_duration' cast. This way, +%% message to the Monitor (cases 1 and 2 on the asciiart above). Monitor +%% _always_ replies with a 'set_queue_duration' cast. This way, %% we're pretty sure that the Queue is not hibernated. %% Monitor periodically recounts numbers ('X' on asciiart). If, during this %% update we notice that a queue was using too much memory, we send a message @@ -57,13 +57,13 @@ %% %% The main job of this module, is to make sure that all the queues have %% more or less the same number of seconds till become drained. -%% This average, seconds-till-queue-is-drained, is then multiplied by +%% This average, seconds-till-queue-is-drained, is then multiplied by %% the ratio of Total/Used memory. So, if we can 'afford' more memory to be %% used, we'll report greater number back to the queues. In the out of %% memory case, we are going to reduce the average drain-seconds. %% To acheive all this we need to accumulate the information from every %% queue, and count an average from that. -%% +%% %% real_queue_duration_avg = avg([drain_from_queue_1, queue_2, queue_3, ...]) %% memory_overcommit = allowed_memory / used_memory %% desired_queue_duration_avg = real_queue_duration_avg * memory_overcommit @@ -84,9 +84,9 @@ -export([register/1, push_queue_duration/2]). -record(state, {timer, %% 'internal_update' timer - queue_durations, %% ets, (qpid, seconds_till_queue_is_empty) + queue_durations, %% ets, (qpid, last_reported, last_sent) queue_duration_sum, %% sum of all queue_durations - queue_duration_items,%% number of elements in sum + queue_duration_count,%% number of elements in sum memory_limit, %% how much memory we intend to use memory_ratio %% how much more memory we can use }). @@ -94,7 +94,7 @@ -define(SERVER, ?MODULE). -define(DEFAULT_UPDATE_INTERVAL_MS, 2500). -define(TABLE_NAME, ?MODULE). --define(MAX_QUEUE_DURATION_ALLOWED, 60*60*24). % 1 day +-define(MAX_QUEUE_DURATION, 60*60*24). % 1 day %% If user disabled vm_memory_monitor, let's assume 1GB of memory we can use. -define(MEMORY_SIZE_FOR_DISABLED_VMM, 1073741824). @@ -104,7 +104,7 @@ -type(state() :: #state{timer :: timer:tref(), queue_durations :: tid(), queue_duration_sum :: float(), - queue_duration_items:: non_neg_integer(), + queue_duration_count:: non_neg_integer(), memory_limit :: pos_integer(), memory_ratio :: float() }). @@ -114,10 +114,6 @@ -spec(init/1 :: ([]) -> {'ok', state()}). --ifdef(debug). --spec(ftoa/1 :: (any()) -> string()). --endif. - -spec(internal_update/1 :: (state()) -> state()). -endif. @@ -136,68 +132,67 @@ register(Pid) -> push_queue_duration(Pid, QueueDuration) -> gen_server2:call(rabbit_memory_monitor, - {push_queue_duration, Pid, QueueDuration}). + {push_queue_duration, Pid, QueueDuration}). %%---------------------------------------------------------------------------- get_memory_limit() -> - RabbitMemoryLimit = case vm_memory_monitor:get_memory_limit() of + case vm_memory_monitor:get_memory_limit() of undefined -> ?MEMORY_SIZE_FOR_DISABLED_VMM; A -> A end. init([]) -> - %% We should never use more memory than user requested. As the memory + %% We should never use more memory than user requested. As the memory %% manager doesn't really know how much memory queues are using, we shall %% try to remain safe distance from real throttle limit. MemoryLimit = trunc(get_memory_limit() * 0.6), - rabbit_log:warning("Queues go to disk when memory is above: ~pMB~n", - [erlang:trunc(MemoryLimit/1048576)]), - - {ok, TRef} = timer:apply_interval(?DEFAULT_UPDATE_INTERVAL_MS, - ?SERVER, update, []), - {ok, #state{timer = TRef, - queue_durations = ets:new(?TABLE_NAME, [set, private]), - queue_duration_sum = 0.0, - queue_duration_items = 0, - memory_limit = MemoryLimit, - memory_ratio = 1.0}}. + + {ok, TRef} = timer:apply_interval(?DEFAULT_UPDATE_INTERVAL_MS, + ?SERVER, update, []), + {ok, #state{timer = TRef, + queue_durations = ets:new(?TABLE_NAME, [set, private]), + queue_duration_sum = 0.0, + queue_duration_count = 0, + memory_limit = MemoryLimit, + memory_ratio = 1.0}}. get_avg_duration(#state{queue_duration_sum = Sum, - queue_duration_items = Items}) -> - case Items of + queue_duration_count = Count}) -> + case Count of 0 -> infinity; - _ -> Sum / Items + _ -> Sum / Count end. -get_desired_duration(State) -> +get_desired_duration(State = #state{memory_ratio = Ratio}) -> case get_avg_duration(State) of - infinity -> infinity; - AvgQueueDuration -> AvgQueueDuration * State#state.memory_ratio + infinity -> infinity; + AvgQueueDuration -> AvgQueueDuration * Ratio end. -handle_call({push_queue_duration, Pid, QueueDuration0}, From, State) -> +handle_call({push_queue_duration, Pid, QueueDuration}, From, + State = #state{queue_duration_sum = Sum, + queue_duration_count = Count, + queue_durations = Durations}) -> SendDuration = get_desired_duration(State), gen_server2:reply(From, SendDuration), - QueueDuration = case QueueDuration0 > ?MAX_QUEUE_DURATION_ALLOWED of - true -> infinity; - false -> QueueDuration0 - end, - - {Sum, Items} = {State#state.queue_duration_sum, - State#state.queue_duration_items}, - [{_Pid, PrevQueueDuration, _PrevSendDuration}] = ets:lookup(State#state.queue_durations, Pid), - {Sum1, Items1} = - case {PrevQueueDuration == infinity, QueueDuration == infinity} of - {true, true} -> {Sum, Items}; - {true, false} -> {Sum + QueueDuration, Items + 1}; - {false, true} -> {Sum - PrevQueueDuration, Items - 1}; - {false, false} -> {Sum - PrevQueueDuration + QueueDuration, Items} - end, - ets:insert(State#state.queue_durations, {Pid, QueueDuration, SendDuration}), + QueueDuration1 = case QueueDuration > ?MAX_QUEUE_DURATION of + true -> infinity; + false -> QueueDuration + end, + + [{_Pid, PrevQueueDuration, _PrevSendDuration}] = ets:lookup(Durations, Pid), + {Sum1, Count1} = + case {PrevQueueDuration, QueueDuration1} of + {infinity, infinity} -> {Sum, Count}; + {infinity, _} -> {Sum + QueueDuration1, Count + 1}; + {_, infinity} -> {Sum - PrevQueueDuration, Count - 1}; + {_, _} -> {Sum - PrevQueueDuration + QueueDuration1, Count} + end, + true = ets:insert(Durations, {Pid, QueueDuration1, SendDuration}), {noreply, State#state{queue_duration_sum = Sum1, - queue_duration_items = Items1}}; + queue_duration_count = Count1}}; handle_call(_Request, _From, State) -> {noreply, State}. @@ -208,77 +203,68 @@ handle_cast(update, State) -> handle_cast({register, Pid}, State) -> _MRef = erlang:monitor(process, Pid), - ets:insert(State#state.queue_durations, {Pid, infinity, infinity}), + true = ets:insert(State#state.queue_durations, {Pid, infinity, infinity}), {noreply, State}; handle_cast(_Request, State) -> {noreply, State}. -handle_info({'DOWN', _MRef, process, Pid, _Reason}, State) -> - {Sum, Items} = {State#state.queue_duration_sum, - State#state.queue_duration_items}, - [{_Pid, PrevQueueDuration, _PrevSendDuration}] = ets:lookup(State#state.queue_durations, Pid), - Sum1 = case PrevQueueDuration == infinity of - true -> Sum; - false -> Sum - PrevQueueDuration - end, - ets:delete(State#state.queue_durations, Pid), +handle_info({'DOWN', _MRef, process, Pid, _Reason}, + State = #state{queue_duration_sum = Sum, + queue_duration_count = Count, + queue_durations = Durations}) -> + [{_Pid, PrevQueueDuration, _PrevSendDuration}] = ets:lookup(Durations, Pid), + Sum1 = case PrevQueueDuration of + infinity -> Sum; + _ -> Sum - PrevQueueDuration + end, + true = ets:delete(State#state.queue_durations, Pid), {noreply, State#state{queue_duration_sum = Sum1, - queue_duration_items = Items-1}}; + queue_duration_count = Count-1}}; -handle_info(_Info, State) -> +handle_info(_Info, State) -> {noreply, State}. -terminate(_Reason, _State) -> +terminate(_Reason, _State) -> ok. -code_change(_OldVsn, State, _Extra) -> +code_change(_OldVsn, State, _Extra) -> {ok, State}. - set_queue_duration(Pid, QueueDuration) -> gen_server2:pcast(Pid, 7, {set_queue_duration, QueueDuration}). --ifdef(debug). -ftoa(Float) -> - Str = case is_float(Float) of - true -> io_lib:format("~11.3f",[Float]); - false -> io_lib:format("~p", [Float]) - end, - lists:flatten(Str). --endif. - - -%% Update memory ratio. Count new DesiredQueueDuration. -%% Get queues that are using more than that, and send -%% pessimistic information back to them. -internal_update(State0) -> - %% available memory / used memory - MemoryRatio = State0#state.memory_limit / erlang:memory(total), - State = State0#state{memory_ratio = MemoryRatio}, - +internal_update(State = #state{memory_limit = Limit, + queue_durations = Durations}) -> DesiredDurationAvg = get_desired_duration(State), - - ?LOGDEBUG("Avg duration: real/desired:~s/~s Memory ratio:~s Queues:~p~n", - [ftoa(get_avg_duration(State)), ftoa(DesiredDurationAvg), - ftoa(MemoryRatio), - ets:foldl(fun (_, Acc) -> Acc+1 end, - 0, State#state.queue_durations)] ), - - %% If we have pessimistic information, we need to inform queues - %% to reduce it's memory usage when needed. - %% This sometimes wakes up queues from hibernation. Well, we don't care. - PromptReduceDuraton = fun ({Pid, QueueDuration, PrevSendDuration}, Acc) -> - case (PrevSendDuration > DesiredDurationAvg) and (QueueDuration > DesiredDurationAvg) of - true -> set_queue_duration(Pid, DesiredDurationAvg), - ets:insert(State#state.queue_durations, {Pid, QueueDuration, DesiredDurationAvg}), - Acc + 1; - _ -> Acc - end + %% available memory / used memory + MemoryRatio = Limit / erlang:memory(total), + State1 = State#state{memory_ratio = MemoryRatio}, + DesiredDurationAvg1 = get_desired_duration(State1), + + %% only inform queues immediately if the desired duration has + %% decreased + case DesiredDurationAvg1 < DesiredDurationAvg of + true -> + %% If we have pessimistic information, we need to inform + %% queues to reduce it's memory usage when needed. This + %% sometimes wakes up queues from hibernation. + true = ets:foldl( + fun ({Pid, QueueDuration, PrevSendDuration}, true) -> + case DesiredDurationAvg1 < + lists:min([PrevSendDuration, QueueDuration]) of + true -> + set_queue_duration(Pid, + DesiredDurationAvg1), + ets:insert(Durations, + {Pid, QueueDuration, + DesiredDurationAvg1}); + _ -> true + end + end, true, Durations); + false -> ok end, - ets:foldl(PromptReduceDuraton, 0, State#state.queue_durations), - State. - + State1. -- cgit v1.2.1 From 194e7bc09679687eebce6691b4387786030d548c Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Wed, 11 Nov 2009 15:16:20 +0000 Subject: Unhappy with the idea that the desired duration is affected immediately by new queues and queues dying, but the memory ratio, which reflects the amount of memory erlang has used, is updated periodically. This mix of up-to-date and stale information in the calculation of the desired duration alarms me. Thus store the desired duration in the state, and always report that. That is then update periodically, thus is only ever calculated using current values. --- src/rabbit_amqqueue_process.erl | 4 +-- src/rabbit_memory_monitor.erl | 71 ++++++++++++++++++++--------------------- 2 files changed, 37 insertions(+), 38 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 9b97fe86..0bfa6df1 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -822,8 +822,8 @@ handle_cast(send_memory_monitor_update, State) -> DrainRatio1 = update_ratio(State#q.drain_ratio, State#q.next_msg_id), MsgSec = DrainRatio1#ratio.ratio * 1000000, % msg/sec QueueDuration = queue:len(State#q.message_buffer) / MsgSec, % seconds - DesiredQueueDuration = rabbit_memory_monitor:push_queue_duration( - self(), QueueDuration), + DesiredQueueDuration = rabbit_memory_monitor:report_queue_duration( + self(), QueueDuration), ?LOGDEBUG("~p Queue duration current/desired ~p/~p~n", [(State#q.q)#amqqueue.name, QueueDuration, DesiredQueueDuration]), noreply(State#q{drain_ratio = DrainRatio1}); diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl index 4880b260..7bd03c9c 100644 --- a/src/rabbit_memory_monitor.erl +++ b/src/rabbit_memory_monitor.erl @@ -81,18 +81,19 @@ -export([update/0]). --export([register/1, push_queue_duration/2]). - --record(state, {timer, %% 'internal_update' timer - queue_durations, %% ets, (qpid, last_reported, last_sent) - queue_duration_sum, %% sum of all queue_durations - queue_duration_count,%% number of elements in sum - memory_limit, %% how much memory we intend to use - memory_ratio %% how much more memory we can use +-export([register/1, report_queue_duration/2]). + +-record(state, {timer, %% 'internal_update' timer + queue_durations, %% ets, (qpid, last_reported, last_sent) + queue_duration_sum, %% sum of all queue_durations + queue_duration_count, %% number of elements in sum + memory_limit, %% how much memory we intend to use + memory_ratio, %% how much more memory we can use + desired_duration %% the desired queue duration }). -define(SERVER, ?MODULE). --define(DEFAULT_UPDATE_INTERVAL_MS, 2500). +-define(DEFAULT_UPDATE_INTERVAL, 2500). -define(TABLE_NAME, ?MODULE). -define(MAX_QUEUE_DURATION, 60*60*24). % 1 day @@ -106,11 +107,12 @@ queue_duration_sum :: float(), queue_duration_count:: non_neg_integer(), memory_limit :: pos_integer(), - memory_ratio :: float() }). + memory_ratio :: float(), + desired_duration :: float() | 'infinity' }). -spec(start_link/0 :: () -> 'ignore' | {'error', _} | {'ok', pid()}). -spec(register/1 :: (pid()) -> 'ok'). --spec(push_queue_duration/2 :: (pid(), float() | 'infinity') -> 'ok'). +-spec(report_queue_duration/2 :: (pid(), float() | 'infinity') -> 'ok'). -spec(init/1 :: ([]) -> {'ok', state()}). @@ -130,9 +132,9 @@ update() -> register(Pid) -> gen_server2:cast(?SERVER, {register, Pid}). -push_queue_duration(Pid, QueueDuration) -> +report_queue_duration(Pid, QueueDuration) -> gen_server2:call(rabbit_memory_monitor, - {push_queue_duration, Pid, QueueDuration}). + {report_queue_duration, Pid, QueueDuration}). %%---------------------------------------------------------------------------- @@ -148,33 +150,21 @@ init([]) -> %% try to remain safe distance from real throttle limit. MemoryLimit = trunc(get_memory_limit() * 0.6), - {ok, TRef} = timer:apply_interval(?DEFAULT_UPDATE_INTERVAL_MS, + {ok, TRef} = timer:apply_interval(?DEFAULT_UPDATE_INTERVAL, ?SERVER, update, []), {ok, #state{timer = TRef, queue_durations = ets:new(?TABLE_NAME, [set, private]), queue_duration_sum = 0.0, queue_duration_count = 0, memory_limit = MemoryLimit, - memory_ratio = 1.0}}. + memory_ratio = 1.0, + desired_duration = infinity}}. -get_avg_duration(#state{queue_duration_sum = Sum, - queue_duration_count = Count}) -> - case Count of - 0 -> infinity; - _ -> Sum / Count - end. - -get_desired_duration(State = #state{memory_ratio = Ratio}) -> - case get_avg_duration(State) of - infinity -> infinity; - AvgQueueDuration -> AvgQueueDuration * Ratio - end. - -handle_call({push_queue_duration, Pid, QueueDuration}, From, +handle_call({report_queue_duration, Pid, QueueDuration}, From, State = #state{queue_duration_sum = Sum, queue_duration_count = Count, - queue_durations = Durations}) -> - SendDuration = get_desired_duration(State), + queue_durations = Durations, + desired_duration = SendDuration}) -> gen_server2:reply(From, SendDuration), QueueDuration1 = case QueueDuration > ?MAX_QUEUE_DURATION of @@ -237,12 +227,22 @@ set_queue_duration(Pid, QueueDuration) -> gen_server2:pcast(Pid, 7, {set_queue_duration, QueueDuration}). internal_update(State = #state{memory_limit = Limit, - queue_durations = Durations}) -> - DesiredDurationAvg = get_desired_duration(State), + queue_durations = Durations, + desired_duration = DesiredDurationAvg, + queue_duration_sum = Sum, + queue_duration_count = Count}) -> %% available memory / used memory MemoryRatio = Limit / erlang:memory(total), - State1 = State#state{memory_ratio = MemoryRatio}, - DesiredDurationAvg1 = get_desired_duration(State1), + AvgDuration = case Count of + 0 -> infinity; + _ -> Sum / Count + end, + DesiredDurationAvg1 = case AvgDuration of + infinity -> infinity; + AvgQueueDuration -> AvgQueueDuration * MemoryRatio + end, + State1 = State#state{memory_ratio = MemoryRatio, + desired_duration = DesiredDurationAvg1}, %% only inform queues immediately if the desired duration has %% decreased @@ -267,4 +267,3 @@ internal_update(State = #state{memory_limit = Limit, false -> ok end, State1. - -- cgit v1.2.1 From 390bbf151ae557a4197f8199204b21417f85cf94 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Wed, 11 Nov 2009 16:29:14 +0000 Subject: Various amounts of tidying, post testing and a large amount of cosmetics. --- src/rabbit_amqqueue.erl | 11 ++++- src/rabbit_amqqueue_process.erl | 48 +++++++++++----------- src/rabbit_memory_monitor.erl | 89 +++++++++++++++++++++-------------------- 3 files changed, 81 insertions(+), 67 deletions(-) diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 1a5e82d7..4abfcd0b 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -39,7 +39,8 @@ -export([list/1, info/1, info/2, info_all/1, info_all/2]). -export([claim_queue/2]). -export([basic_get/3, basic_consume/8, basic_cancel/4]). --export([notify_sent/2, unblock/2]). +-export([notify_sent/2, unblock/2, set_queue_duration/2, + send_memory_monitor_update/1]). -export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). @@ -101,6 +102,8 @@ -spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). -spec(unblock/2 :: (pid(), pid()) -> 'ok'). +-spec(set_queue_duration/2 :: (pid(), number()) -> 'ok'). +-spec(send_memory_monitor_update/1 :: (pid()) -> 'ok'). -spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()). -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). -spec(on_node_down/1 :: (erlang_node()) -> 'ok'). @@ -308,6 +311,12 @@ notify_sent(QPid, ChPid) -> unblock(QPid, ChPid) -> gen_server2:pcast(QPid, 8, {unblock, ChPid}). +set_queue_duration(QPid, Duration) -> + gen_server2:pcast(QPid, 7, {set_queue_duration, Duration}). + +send_memory_monitor_update(QPid) -> + gen_server2:pcast(QPid, 7, send_memory_monitor_update). + internal_delete(QueueName) -> rabbit_misc:execute_mnesia_transaction( fun () -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 0bfa6df1..2d264fc2 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -42,7 +42,6 @@ -export([start_link/1]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). --export([send_memory_monitor_update/1]). -import(queue). -import(erlang). @@ -101,9 +100,11 @@ start_link(Q) -> %%---------------------------------------------------------------------------- init(Q) -> ?LOGDEBUG("Queue starting - ~p~n", [Q]), - rabbit_memory_monitor:register(self()), + rabbit_memory_monitor:register(self(), {rabbit_amqqueue, set_queue_duration, + [self()]}), %% Beware. This breaks hibernation! - timer:apply_interval(2500, ?MODULE, send_memory_monitor_update, [self()]), + timer:apply_interval(2500, rabbit_amqqueue, send_memory_monitor_update, + [self()]), {ok, #q{q = Q, owner = none, exclusive_consumer = none, @@ -821,26 +822,33 @@ handle_cast({limit, ChPid, LimiterPid}, State) -> handle_cast(send_memory_monitor_update, State) -> DrainRatio1 = update_ratio(State#q.drain_ratio, State#q.next_msg_id), MsgSec = DrainRatio1#ratio.ratio * 1000000, % msg/sec - QueueDuration = queue:len(State#q.message_buffer) / MsgSec, % seconds + QueueDuration = + case MsgSec == 0 of + true -> infinity; + false -> queue:len(State#q.message_buffer) / MsgSec % seconds + end, DesiredQueueDuration = rabbit_memory_monitor:report_queue_duration( self(), QueueDuration), - ?LOGDEBUG("~p Queue duration current/desired ~p/~p~n", - [(State#q.q)#amqqueue.name, QueueDuration, DesiredQueueDuration]), + ?LOGDEBUG("TIMER ~p Queue length is ~8p, should be ~p~n", + [(State#q.q)#amqqueue.name, queue:len(State#q.message_buffer), + case DesiredQueueDuration of + infinity -> infinity; + _ -> MsgSec * DesiredQueueDuration + end]), noreply(State#q{drain_ratio = DrainRatio1}); handle_cast({set_queue_duration, DesiredQueueDuration}, State) -> DrainRatio = State#q.drain_ratio, - DesiredBufLength = case DesiredQueueDuration of - infinity -> infinity; - _ -> DesiredQueueDuration * DrainRatio#ratio.ratio * 1000000 - end, - %% Just to proove that something is happening. - ?LOGDEBUG("~p Queue length is~8p, should be ~p~n", - [(State#q.q)#amqqueue.name, queue:len(State#q.message_buffer), - DesiredBufLength]), + DesiredBufLength = + case DesiredQueueDuration of + infinity -> infinity; + _ -> DesiredQueueDuration * DrainRatio#ratio.ratio * 1000000 + end, + ?LOGDEBUG("MAGIC ~p Queue length is ~8p, should be ~p~n", + [(State#q.q)#amqqueue.name, queue:len(State#q.message_buffer), + DesiredBufLength]), noreply(State). - %% Based on kernel load average, as descibed: %% http://www.teamquest.com/resources/gunther/display/5/ calc_load(Load, Exp, N) -> @@ -852,14 +860,8 @@ update_ratio(_RatioRec = #ratio{ratio=Ratio, t0 = T0, next_msg_id = MsgCount0}, MsgCount = MsgCount1 - MsgCount0, MsgUSec = MsgCount / Td, % msg/usec %% Td is in usec. We're interested in "load average" from last 30 seconds. - Ratio1 = calc_load(Ratio, 1.0/ (math:exp(Td/(30*1000000))), MsgUSec), - - #ratio{ratio = Ratio1, t0=T1, next_msg_id = MsgCount1}. - - -send_memory_monitor_update(Pid) -> - gen_server2:cast(Pid, send_memory_monitor_update). - + Ratio1 = calc_load(Ratio, 1.0/ (math:exp(Td/(30*1000000))), MsgUSec), + #ratio{ratio = Ratio1, t0=T1, next_msg_id = MsgCount1}. handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, State = #q{owner = {DownPid, MonitorRef}}) -> diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl index 7bd03c9c..cf184f3f 100644 --- a/src/rabbit_memory_monitor.erl +++ b/src/rabbit_memory_monitor.erl @@ -74,22 +74,19 @@ -behaviour(gen_server2). --export([start_link/0]). +-export([start_link/0, update/0, register/2, report_queue_duration/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([update/0]). - --export([register/1, report_queue_duration/2]). - -record(state, {timer, %% 'internal_update' timer queue_durations, %% ets, (qpid, last_reported, last_sent) queue_duration_sum, %% sum of all queue_durations queue_duration_count, %% number of elements in sum memory_limit, %% how much memory we intend to use memory_ratio, %% how much more memory we can use - desired_duration %% the desired queue duration + desired_duration, %% the desired queue duration + callbacks %% a dict of qpid -> {M,F,A}s }). -define(SERVER, ?MODULE). @@ -101,24 +98,18 @@ -define(MEMORY_SIZE_FOR_DISABLED_VMM, 1073741824). %%---------------------------------------------------------------------------- + -ifdef(use_specs). --type(state() :: #state{timer :: timer:tref(), - queue_durations :: tid(), - queue_duration_sum :: float(), - queue_duration_count:: non_neg_integer(), - memory_limit :: pos_integer(), - memory_ratio :: float(), - desired_duration :: float() | 'infinity' }). -spec(start_link/0 :: () -> 'ignore' | {'error', _} | {'ok', pid()}). --spec(register/1 :: (pid()) -> 'ok'). +-spec(update/0 :: () -> 'ok'). +-spec(register/2 :: (pid(), {atom(),atom(),[any()]}) -> 'ok'). -spec(report_queue_duration/2 :: (pid(), float() | 'infinity') -> 'ok'). --spec(init/1 :: ([]) -> {'ok', state()}). - --spec(internal_update/1 :: (state()) -> state()). -endif. +%%---------------------------------------------------------------------------- +%% Public API %%---------------------------------------------------------------------------- start_link() -> @@ -127,22 +118,17 @@ start_link() -> update() -> gen_server2:cast(?SERVER, update). -%%---------------------------------------------------------------------------- - -register(Pid) -> - gen_server2:cast(?SERVER, {register, Pid}). +register(Pid, MFA = {_M, _F, _A}) -> + gen_server2:cast(?SERVER, {register, Pid, MFA}). report_queue_duration(Pid, QueueDuration) -> gen_server2:call(rabbit_memory_monitor, {report_queue_duration, Pid, QueueDuration}). -%%---------------------------------------------------------------------------- -get_memory_limit() -> - case vm_memory_monitor:get_memory_limit() of - undefined -> ?MEMORY_SIZE_FOR_DISABLED_VMM; - A -> A - end. +%%---------------------------------------------------------------------------- +%% Gen_server callbacks +%%---------------------------------------------------------------------------- init([]) -> %% We should never use more memory than user requested. As the memory @@ -158,7 +144,8 @@ init([]) -> queue_duration_count = 0, memory_limit = MemoryLimit, memory_ratio = 1.0, - desired_duration = infinity}}. + desired_duration = infinity, + callbacks = dict:new()}}. handle_call({report_queue_duration, Pid, QueueDuration}, From, State = #state{queue_duration_sum = Sum, @@ -187,23 +174,23 @@ handle_call({report_queue_duration, Pid, QueueDuration}, From, handle_call(_Request, _From, State) -> {noreply, State}. - handle_cast(update, State) -> {noreply, internal_update(State)}; -handle_cast({register, Pid}, State) -> +handle_cast({register, Pid, MFA}, State = #state{queue_durations = Durations, + callbacks = Callbacks}) -> _MRef = erlang:monitor(process, Pid), - true = ets:insert(State#state.queue_durations, {Pid, infinity, infinity}), - {noreply, State}; + true = ets:insert(Durations, {Pid, infinity, infinity}), + {noreply, State#state{callbacks = dict:store(Pid, MFA, Callbacks)}}; handle_cast(_Request, State) -> {noreply, State}. - handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #state{queue_duration_sum = Sum, queue_duration_count = Count, - queue_durations = Durations}) -> + queue_durations = Durations, + callbacks = Callbacks}) -> [{_Pid, PrevQueueDuration, _PrevSendDuration}] = ets:lookup(Durations, Pid), Sum1 = case PrevQueueDuration of infinity -> Sum; @@ -211,26 +198,30 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, end, true = ets:delete(State#state.queue_durations, Pid), {noreply, State#state{queue_duration_sum = Sum1, - queue_duration_count = Count-1}}; + queue_duration_count = Count-1, + callbacks = dict:erase(Pid, Callbacks)}}; handle_info(_Info, State) -> {noreply, State}. - -terminate(_Reason, _State) -> +terminate(_Reason, #state{timer = TRef}) -> + timer:cancel(TRef), ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. -set_queue_duration(Pid, QueueDuration) -> - gen_server2:pcast(Pid, 7, {set_queue_duration, QueueDuration}). + +%%---------------------------------------------------------------------------- +%% Internal functions +%%---------------------------------------------------------------------------- internal_update(State = #state{memory_limit = Limit, queue_durations = Durations, desired_duration = DesiredDurationAvg, queue_duration_sum = Sum, - queue_duration_count = Count}) -> + queue_duration_count = Count, + callbacks = Callbacks}) -> %% available memory / used memory MemoryRatio = Limit / erlang:memory(total), AvgDuration = case Count of @@ -246,7 +237,8 @@ internal_update(State = #state{memory_limit = Limit, %% only inform queues immediately if the desired duration has %% decreased - case DesiredDurationAvg1 < DesiredDurationAvg of + case (DesiredDurationAvg == infinity andalso DesiredDurationAvg /= infinity) + orelse (DesiredDurationAvg1 < DesiredDurationAvg) of true -> %% If we have pessimistic information, we need to inform %% queues to reduce it's memory usage when needed. This @@ -256,8 +248,9 @@ internal_update(State = #state{memory_limit = Limit, case DesiredDurationAvg1 < lists:min([PrevSendDuration, QueueDuration]) of true -> - set_queue_duration(Pid, - DesiredDurationAvg1), + ok = + set_queue_duration( + Pid, DesiredDurationAvg1, Callbacks), ets:insert(Durations, {Pid, QueueDuration, DesiredDurationAvg1}); @@ -267,3 +260,13 @@ internal_update(State = #state{memory_limit = Limit, false -> ok end, State1. + +get_memory_limit() -> + case vm_memory_monitor:get_memory_limit() of + undefined -> ?MEMORY_SIZE_FOR_DISABLED_VMM; + A -> A + end. + +set_queue_duration(Pid, QueueDuration, Callbacks) -> + {M,F,A} = dict:fetch(Pid, Callbacks), + ok = erlang:apply(M, F, A++[QueueDuration]). -- cgit v1.2.1