diff options
author | Matthew Sackman <matthew@lshift.net> | 2009-11-11 16:38:44 +0000 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2009-11-11 16:38:44 +0000 |
commit | 648f6336ad9c2c2c3cf1166980bdf258a0d81b07 (patch) | |
tree | 56673a17b42b783ef86171f1a8dffb8056d09620 | |
parent | 88433c43c79f45b045656aa26e11650776d003d8 (diff) | |
parent | 390bbf151ae557a4197f8199204b21417f85cf94 (diff) | |
download | rabbitmq-server-bug21742.tar.gz |
merging in from default (branch: bug21742)
-rw-r--r-- | src/rabbit.erl | 3 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 11 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 66 | ||||
-rw-r--r-- | src/rabbit_memory_monitor.erl | 272 | ||||
-rw-r--r-- | src/vm_memory_monitor.erl | 18 |
5 files changed, 361 insertions, 9 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 29407e4e..b17711b4 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -154,7 +154,8 @@ start(normal, []) -> ok = rabbit_amqqueue:start(), ok = start_child(rabbit_router), - ok = start_child(rabbit_node_monitor) + ok = start_child(rabbit_node_monitor), + ok = start_child(rabbit_memory_monitor) end}, {"recovery", fun () -> diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 1a5e82d7..4abfcd0b 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -39,7 +39,8 @@ -export([list/1, info/1, info/2, info_all/1, info_all/2]). -export([claim_queue/2]). -export([basic_get/3, basic_consume/8, basic_cancel/4]). --export([notify_sent/2, unblock/2]). +-export([notify_sent/2, unblock/2, set_queue_duration/2, + send_memory_monitor_update/1]). -export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). @@ -101,6 +102,8 @@ -spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). -spec(unblock/2 :: (pid(), pid()) -> 'ok'). +-spec(set_queue_duration/2 :: (pid(), number()) -> 'ok'). +-spec(send_memory_monitor_update/1 :: (pid()) -> 'ok'). -spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()). -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). -spec(on_node_down/1 :: (erlang_node()) -> 'ok'). @@ -308,6 +311,12 @@ notify_sent(QPid, ChPid) -> unblock(QPid, ChPid) -> gen_server2:pcast(QPid, 8, {unblock, ChPid}). +set_queue_duration(QPid, Duration) -> + gen_server2:pcast(QPid, 7, {set_queue_duration, Duration}). + +send_memory_monitor_update(QPid) -> + gen_server2:pcast(QPid, 7, send_memory_monitor_update). 
+ internal_delete(QueueName) -> rabbit_misc:execute_mnesia_transaction( fun () -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 6e88f259..2d264fc2 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -55,12 +55,18 @@ next_msg_id, message_buffer, active_consumers, - blocked_consumers}). + blocked_consumers, + drain_ratio}). -record(consumer, {tag, ack_required}). -record(tx, {ch_pid, is_persistent, pending_messages, pending_acks}). +-record(ratio, {ratio, %% float. messages/microsecond_us + t0, %% previous timestamp (us) + next_msg_id %% previous next_msg_id + }). + %% These are held in our process dictionary -record(cr, {consumer_count, ch_pid, @@ -92,9 +98,13 @@ start_link(Q) -> gen_server2:start_link(?MODULE, Q, []). %%---------------------------------------------------------------------------- - init(Q) -> ?LOGDEBUG("Queue starting - ~p~n", [Q]), + rabbit_memory_monitor:register(self(), {rabbit_amqqueue, set_queue_duration, + [self()]}), + %% Beware. This breaks hibernation! + timer:apply_interval(2500, rabbit_amqqueue, send_memory_monitor_update, + [self()]), {ok, #q{q = Q, owner = none, exclusive_consumer = none, @@ -102,7 +112,11 @@ init(Q) -> next_msg_id = 1, message_buffer = queue:new(), active_consumers = queue:new(), - blocked_consumers = queue:new()}, hibernate, + blocked_consumers = queue:new(), + drain_ratio = #ratio{ratio = 0.0, + t0 = now(), + next_msg_id = 1} + }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. terminate(_Reason, State) -> @@ -803,7 +817,51 @@ handle_cast({limit, ChPid, LimiterPid}, State) -> end, NewLimited = Limited andalso LimiterPid =/= undefined, C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited} - end)). 
+ end)); + +handle_cast(send_memory_monitor_update, State) -> + DrainRatio1 = update_ratio(State#q.drain_ratio, State#q.next_msg_id), + MsgSec = DrainRatio1#ratio.ratio * 1000000, % msg/sec + QueueDuration = + case MsgSec == 0 of + true -> infinity; + false -> queue:len(State#q.message_buffer) / MsgSec % seconds + end, + DesiredQueueDuration = rabbit_memory_monitor:report_queue_duration( + self(), QueueDuration), + ?LOGDEBUG("TIMER ~p Queue length is ~8p, should be ~p~n", + [(State#q.q)#amqqueue.name, queue:len(State#q.message_buffer), + case DesiredQueueDuration of + infinity -> infinity; + _ -> MsgSec * DesiredQueueDuration + end]), + noreply(State#q{drain_ratio = DrainRatio1}); + +handle_cast({set_queue_duration, DesiredQueueDuration}, State) -> + DrainRatio = State#q.drain_ratio, + DesiredBufLength = + case DesiredQueueDuration of + infinity -> infinity; + _ -> DesiredQueueDuration * DrainRatio#ratio.ratio * 1000000 + end, + ?LOGDEBUG("MAGIC ~p Queue length is ~8p, should be ~p~n", + [(State#q.q)#amqqueue.name, queue:len(State#q.message_buffer), + DesiredBufLength]), + noreply(State). + +%% Based on kernel load average, as described: +%% http://www.teamquest.com/resources/gunther/display/5/ +calc_load(Load, Exp, N) -> + Load*Exp + N*(1.0-Exp). + +update_ratio(_RatioRec = #ratio{ratio=Ratio, t0 = T0, next_msg_id = MsgCount0}, MsgCount1) -> + T1 = now(), + Td = timer:now_diff(T1, T0), + MsgCount = MsgCount1 - MsgCount0, + MsgUSec = MsgCount / Td, % msg/usec + %% Td is in usec. We're interested in "load average" from the last 30 seconds. + Ratio1 = calc_load(Ratio, 1.0/ (math:exp(Td/(30*1000000))), MsgUSec), + #ratio{ratio = Ratio1, t0=T1, next_msg_id = MsgCount1}. 
handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, State = #q{owner = {DownPid, MonitorRef}}) -> diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl new file mode 100644 index 00000000..cf184f3f --- /dev/null +++ b/src/rabbit_memory_monitor.erl @@ -0,0 +1,272 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + + +%% This module handles the node-wide memory statistics. +%% It receives statistics from all queues, counts the desired +%% queue length (in seconds), and sends this information back to +%% queues. 
+%% +%% Normally, messages are exchanged like this: +%% +%% (1) (2) (3) +%% Timer | | +%% v v +%% Queue -----+--------+-----<***hibernated***>-------------> +%% | ^ | ^ ^ +%% v | v | | +%% Monitor X--*-+--X---*-+--X------X----X-----X+-----------> +%% +%% Or, to put it in words: the Queue periodically sends (casts) 'push_queue_duration' +%% message to the Monitor (cases 1 and 2 on the asciiart above). Monitor +%% _always_ replies with a 'set_queue_duration' cast. This way, +%% we're pretty sure that the Queue is not hibernated. +%% Monitor periodically recounts numbers ('X' on asciiart). If, during this +%% update we notice that a queue was using too much memory, we send a message +%% back. This will happen even if the queue is hibernated, as we really do want +%% it to reduce its memory footprint. +%% +%% +%% The main job of this module is to make sure that all the queues have +%% more or less the same number of seconds until they are drained. +%% This average, seconds-till-queue-is-drained, is then multiplied by +%% the ratio of Total/Used memory. So, if we can 'afford' more memory to be +%% used, we'll report a greater number back to the queues. In the out of +%% memory case, we are going to reduce the average drain-seconds. +%% To achieve all this we need to accumulate the information from every +%% queue, and count an average from that. +%% +%% real_queue_duration_avg = avg([drain_from_queue_1, queue_2, queue_3, ...]) +%% memory_overcommit = allowed_memory / used_memory +%% desired_queue_duration_avg = real_queue_duration_avg * memory_overcommit + + +-module(rabbit_memory_monitor). +-include("rabbit.hrl"). + +-behaviour(gen_server2). + +-export([start_link/0, update/0, register/2, report_queue_duration/2]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). 
+ +-record(state, {timer, %% 'internal_update' timer + queue_durations, %% ets, (qpid, last_reported, last_sent) + queue_duration_sum, %% sum of all queue_durations + queue_duration_count, %% number of elements in sum + memory_limit, %% how much memory we intend to use + memory_ratio, %% how much more memory we can use + desired_duration, %% the desired queue duration + callbacks %% a dict of qpid -> {M,F,A}s + }). + +-define(SERVER, ?MODULE). +-define(DEFAULT_UPDATE_INTERVAL, 2500). +-define(TABLE_NAME, ?MODULE). +-define(MAX_QUEUE_DURATION, 60*60*24). % 1 day + +%% If user disabled vm_memory_monitor, let's assume 1GB of memory we can use. +-define(MEMORY_SIZE_FOR_DISABLED_VMM, 1073741824). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/0 :: () -> 'ignore' | {'error', _} | {'ok', pid()}). +-spec(update/0 :: () -> 'ok'). +-spec(register/2 :: (pid(), {atom(),atom(),[any()]}) -> 'ok'). +-spec(report_queue_duration/2 :: (pid(), float() | 'infinity') -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- +%% Public API +%%---------------------------------------------------------------------------- + +start_link() -> + gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []). + +update() -> + gen_server2:cast(?SERVER, update). + +register(Pid, MFA = {_M, _F, _A}) -> + gen_server2:cast(?SERVER, {register, Pid, MFA}). + +report_queue_duration(Pid, QueueDuration) -> + gen_server2:call(rabbit_memory_monitor, + {report_queue_duration, Pid, QueueDuration}). + + +%%---------------------------------------------------------------------------- +%% Gen_server callbacks +%%---------------------------------------------------------------------------- + +init([]) -> + %% We should never use more memory than user requested. 
As the memory + %% manager doesn't really know how much memory queues are using, we shall + %% try to remain safe distance from real throttle limit. + MemoryLimit = trunc(get_memory_limit() * 0.6), + + {ok, TRef} = timer:apply_interval(?DEFAULT_UPDATE_INTERVAL, + ?SERVER, update, []), + {ok, #state{timer = TRef, + queue_durations = ets:new(?TABLE_NAME, [set, private]), + queue_duration_sum = 0.0, + queue_duration_count = 0, + memory_limit = MemoryLimit, + memory_ratio = 1.0, + desired_duration = infinity, + callbacks = dict:new()}}. + +handle_call({report_queue_duration, Pid, QueueDuration}, From, + State = #state{queue_duration_sum = Sum, + queue_duration_count = Count, + queue_durations = Durations, + desired_duration = SendDuration}) -> + gen_server2:reply(From, SendDuration), + + QueueDuration1 = case QueueDuration > ?MAX_QUEUE_DURATION of + true -> infinity; + false -> QueueDuration + end, + + [{_Pid, PrevQueueDuration, _PrevSendDuration}] = ets:lookup(Durations, Pid), + {Sum1, Count1} = + case {PrevQueueDuration, QueueDuration1} of + {infinity, infinity} -> {Sum, Count}; + {infinity, _} -> {Sum + QueueDuration1, Count + 1}; + {_, infinity} -> {Sum - PrevQueueDuration, Count - 1}; + {_, _} -> {Sum - PrevQueueDuration + QueueDuration1, Count} + end, + true = ets:insert(Durations, {Pid, QueueDuration1, SendDuration}), + {noreply, State#state{queue_duration_sum = Sum1, + queue_duration_count = Count1}}; + +handle_call(_Request, _From, State) -> + {noreply, State}. + +handle_cast(update, State) -> + {noreply, internal_update(State)}; + +handle_cast({register, Pid, MFA}, State = #state{queue_durations = Durations, + callbacks = Callbacks}) -> + _MRef = erlang:monitor(process, Pid), + true = ets:insert(Durations, {Pid, infinity, infinity}), + {noreply, State#state{callbacks = dict:store(Pid, MFA, Callbacks)}}; + +handle_cast(_Request, State) -> + {noreply, State}. 
+ +handle_info({'DOWN', _MRef, process, Pid, _Reason}, + State = #state{queue_duration_sum = Sum, + queue_duration_count = Count, + queue_durations = Durations, + callbacks = Callbacks}) -> + [{_Pid, PrevQueueDuration, _PrevSendDuration}] = ets:lookup(Durations, Pid), + Sum1 = case PrevQueueDuration of + infinity -> Sum; + _ -> Sum - PrevQueueDuration + end, + true = ets:delete(State#state.queue_durations, Pid), + {noreply, State#state{queue_duration_sum = Sum1, + queue_duration_count = Count-1, + callbacks = dict:erase(Pid, Callbacks)}}; + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, #state{timer = TRef}) -> + timer:cancel(TRef), + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + + +%%---------------------------------------------------------------------------- +%% Internal functions +%%---------------------------------------------------------------------------- + +internal_update(State = #state{memory_limit = Limit, + queue_durations = Durations, + desired_duration = DesiredDurationAvg, + queue_duration_sum = Sum, + queue_duration_count = Count, + callbacks = Callbacks}) -> + %% available memory / used memory + MemoryRatio = Limit / erlang:memory(total), + AvgDuration = case Count of + 0 -> infinity; + _ -> Sum / Count + end, + DesiredDurationAvg1 = case AvgDuration of + infinity -> infinity; + AvgQueueDuration -> AvgQueueDuration * MemoryRatio + end, + State1 = State#state{memory_ratio = MemoryRatio, + desired_duration = DesiredDurationAvg1}, + + %% only inform queues immediately if the desired duration has + %% decreased + case (DesiredDurationAvg == infinity andalso DesiredDurationAvg /= infinity) + orelse (DesiredDurationAvg1 < DesiredDurationAvg) of + true -> + %% If we have pessimistic information, we need to inform + %% queues to reduce their memory usage when needed. This + %% sometimes wakes up queues from hibernation. 
+ true = ets:foldl( + fun ({Pid, QueueDuration, PrevSendDuration}, true) -> + case DesiredDurationAvg1 < + lists:min([PrevSendDuration, QueueDuration]) of + true -> + ok = + set_queue_duration( + Pid, DesiredDurationAvg1, Callbacks), + ets:insert(Durations, + {Pid, QueueDuration, + DesiredDurationAvg1}); + _ -> true + end + end, true, Durations); + false -> ok + end, + State1. + +get_memory_limit() -> + case vm_memory_monitor:get_memory_limit() of + undefined -> ?MEMORY_SIZE_FOR_DISABLED_VMM; + A -> A + end. + +set_queue_duration(Pid, QueueDuration, Callbacks) -> + {M,F,A} = dict:fetch(Pid, Callbacks), + ok = erlang:apply(M, F, A++[QueueDuration]). diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl index 6da47933..65d4a451 100644 --- a/src/vm_memory_monitor.erl +++ b/src/vm_memory_monitor.erl @@ -51,7 +51,8 @@ -export([update/0, get_total_memory/0, get_check_interval/0, set_check_interval/1, - get_vm_memory_high_watermark/0, set_vm_memory_high_watermark/1]). + get_vm_memory_high_watermark/0, set_vm_memory_high_watermark/1, + get_memory_limit/0]). -define(SERVER, ?MODULE). @@ -73,9 +74,10 @@ -ifdef(use_specs). --spec(start_link/1 :: (float()) -> ('ignore' | {error, any()} | {'ok', pid()})). +-spec(start_link/1 :: (float()) -> ('ignore' | {'error', any()} | {'ok', pid()})). -spec(update/0 :: () -> 'ok'). --spec(get_total_memory/0 :: () -> (non_neg_integer() | unknown)). +-spec(get_total_memory/0 :: () -> (non_neg_integer() | 'unknown')). +-spec(get_memory_limit/0 :: () -> (non_neg_integer() | 'undefined')). -spec(get_check_interval/0 :: () -> non_neg_integer()). -spec(set_check_interval/1 :: (non_neg_integer()) -> 'ok'). -spec(get_vm_memory_high_watermark/0 :: () -> float()). 
@@ -128,6 +130,9 @@ handle_call({set_check_interval, Timeout}, _From, State) -> {ok, cancel} = timer:cancel(State#state.timer), {reply, ok, State#state{timeout = Timeout, timer = start_timer(Timeout)}}; +handle_call(get_memory_limit, _From, State) -> + {reply, State#state.memory_limit, State}; + handle_call(_Request, _From, State) -> {noreply, State}. @@ -168,6 +173,13 @@ get_vm_memory_high_watermark() -> set_vm_memory_high_watermark(Fraction) -> gen_server2:call(?MODULE, {set_vm_memory_high_watermark, Fraction}). +get_memory_limit() -> + try + gen_server2:call(?MODULE, get_memory_limit) + catch + exit:{noproc, _} -> undefined + end. + %%---------------------------------------------------------------------------- %% Server Internals %%---------------------------------------------------------------------------- |