author     Matthew Sackman <matthew@lshift.net>   2009-11-11 16:38:44 +0000
committer  Matthew Sackman <matthew@lshift.net>   2009-11-11 16:38:44 +0000
commit     648f6336ad9c2c2c3cf1166980bdf258a0d81b07 (patch)
tree       56673a17b42b783ef86171f1a8dffb8056d09620
parent     88433c43c79f45b045656aa26e11650776d003d8 (diff)
parent     390bbf151ae557a4197f8199204b21417f85cf94 (diff)
download   rabbitmq-server-bug21742.tar.gz
merging in from default (bug21742)
-rw-r--r--   src/rabbit.erl                  |   3
-rw-r--r--   src/rabbit_amqqueue.erl         |  11
-rw-r--r--   src/rabbit_amqqueue_process.erl |  66
-rw-r--r--   src/rabbit_memory_monitor.erl   | 272
-rw-r--r--   src/vm_memory_monitor.erl       |  18
5 files changed, 361 insertions, 9 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 29407e4e..b17711b4 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -154,7 +154,8 @@ start(normal, []) ->
ok = rabbit_amqqueue:start(),
ok = start_child(rabbit_router),
- ok = start_child(rabbit_node_monitor)
+ ok = start_child(rabbit_node_monitor),
+ ok = start_child(rabbit_memory_monitor)
end},
{"recovery",
fun () ->
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 1a5e82d7..4abfcd0b 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -39,7 +39,8 @@
-export([list/1, info/1, info/2, info_all/1, info_all/2]).
-export([claim_queue/2]).
-export([basic_get/3, basic_consume/8, basic_cancel/4]).
--export([notify_sent/2, unblock/2]).
+-export([notify_sent/2, unblock/2, set_queue_duration/2,
+ send_memory_monitor_update/1]).
-export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
@@ -101,6 +102,8 @@
-spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
+-spec(set_queue_duration/2 :: (pid(), number()) -> 'ok').
+-spec(send_memory_monitor_update/1 :: (pid()) -> 'ok').
-spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()).
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
-spec(on_node_down/1 :: (erlang_node()) -> 'ok').
@@ -308,6 +311,12 @@ notify_sent(QPid, ChPid) ->
unblock(QPid, ChPid) ->
gen_server2:pcast(QPid, 8, {unblock, ChPid}).
+set_queue_duration(QPid, Duration) ->
+ gen_server2:pcast(QPid, 7, {set_queue_duration, Duration}).
+
+send_memory_monitor_update(QPid) ->
+ gen_server2:pcast(QPid, 7, send_memory_monitor_update).
+
internal_delete(QueueName) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 6e88f259..2d264fc2 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -55,12 +55,18 @@
next_msg_id,
message_buffer,
active_consumers,
- blocked_consumers}).
+ blocked_consumers,
+ drain_ratio}).
-record(consumer, {tag, ack_required}).
-record(tx, {ch_pid, is_persistent, pending_messages, pending_acks}).
+-record(ratio, {ratio, %% float: messages per microsecond
+ t0, %% previous timestamp (us)
+ next_msg_id %% previous next_msg_id
+ }).
+
%% These are held in our process dictionary
-record(cr, {consumer_count,
ch_pid,
@@ -92,9 +98,13 @@ start_link(Q) ->
gen_server2:start_link(?MODULE, Q, []).
%%----------------------------------------------------------------------------
-
init(Q) ->
?LOGDEBUG("Queue starting - ~p~n", [Q]),
+ rabbit_memory_monitor:register(self(), {rabbit_amqqueue, set_queue_duration,
+ [self()]}),
+ %% Beware. This breaks hibernation!
+ timer:apply_interval(2500, rabbit_amqqueue, send_memory_monitor_update,
+ [self()]),
{ok, #q{q = Q,
owner = none,
exclusive_consumer = none,
@@ -102,7 +112,11 @@ init(Q) ->
next_msg_id = 1,
message_buffer = queue:new(),
active_consumers = queue:new(),
- blocked_consumers = queue:new()}, hibernate,
+ blocked_consumers = queue:new(),
+ drain_ratio = #ratio{ratio = 0.0,
+ t0 = now(),
+ next_msg_id = 1}
+ }, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
terminate(_Reason, State) ->
@@ -803,7 +817,51 @@ handle_cast({limit, ChPid, LimiterPid}, State) ->
end,
NewLimited = Limited andalso LimiterPid =/= undefined,
C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited}
- end)).
+ end));
+
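+%% Periodic self-cast (see init/1): convert the drain ratio from msg/us to
+%% msg/s and report how long the current backlog would take to drain.
+%% Illustrative numbers: a ratio of 0.0005 msg/us is 500 msg/s, so a buffer
+%% of 1500 messages is reported as a queue duration of 3 seconds.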
+handle_cast(send_memory_monitor_update, State) ->
+ DrainRatio1 = update_ratio(State#q.drain_ratio, State#q.next_msg_id),
+ MsgSec = DrainRatio1#ratio.ratio * 1000000, % msg/sec
+ QueueDuration =
+ case MsgSec == 0 of
+ true -> infinity;
+ false -> queue:len(State#q.message_buffer) / MsgSec % seconds
+ end,
+ DesiredQueueDuration = rabbit_memory_monitor:report_queue_duration(
+ self(), QueueDuration),
+ ?LOGDEBUG("TIMER ~p Queue length is ~8p, should be ~p~n",
+ [(State#q.q)#amqqueue.name, queue:len(State#q.message_buffer),
+ case DesiredQueueDuration of
+ infinity -> infinity;
+ _ -> MsgSec * DesiredQueueDuration
+ end]),
+ noreply(State#q{drain_ratio = DrainRatio1});
+
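+%% The memory monitor pushes back a desired duration; translate it into a
+%% target buffer length. With the same illustrative 0.0005 msg/us ratio, a
+%% desired duration of 4 seconds corresponds to 4 * 0.0005 * 1000000 = 2000
+%% buffered messages.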
+handle_cast({set_queue_duration, DesiredQueueDuration}, State) ->
+ DrainRatio = State#q.drain_ratio,
+ DesiredBufLength =
+ case DesiredQueueDuration of
+ infinity -> infinity;
+ _ -> DesiredQueueDuration * DrainRatio#ratio.ratio * 1000000
+ end,
+ ?LOGDEBUG("MAGIC ~p Queue length is ~8p, should be ~p~n",
+ [(State#q.q)#amqqueue.name, queue:len(State#q.message_buffer),
+ DesiredBufLength]),
+ noreply(State).
+
+%% Based on kernel load average, as described at:
+%% http://www.teamquest.com/resources/gunther/display/5/
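+%%
+%% calc_load/3 is an exponentially-weighted moving average:
+%%   NewLoad = OldLoad * Exp + Sample * (1 - Exp)
+%% update_ratio/2 passes Exp = exp(-Td / 30s), so with updates arriving
+%% every 2.5 seconds (illustrative) roughly 92% of the previous rate is
+%% kept and 8% of the instantaneous rate is mixed in.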
+calc_load(Load, Exp, N) ->
+ Load*Exp + N*(1.0-Exp).
+
+update_ratio(_RatioRec = #ratio{ratio=Ratio, t0 = T0, next_msg_id = MsgCount0}, MsgCount1) ->
+ T1 = now(),
+ Td = timer:now_diff(T1, T0),
+ MsgCount = MsgCount1 - MsgCount0,
+ MsgUSec = MsgCount / Td, % msg/usec
+    %% Td is in usec. We're interested in the "load average" over the last 30 seconds.
+ Ratio1 = calc_load(Ratio, 1.0/ (math:exp(Td/(30*1000000))), MsgUSec),
+ #ratio{ratio = Ratio1, t0=T1, next_msg_id = MsgCount1}.
handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
State = #q{owner = {DownPid, MonitorRef}}) ->
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl
new file mode 100644
index 00000000..cf184f3f
--- /dev/null
+++ b/src/rabbit_memory_monitor.erl
@@ -0,0 +1,272 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+
+%% This module handles the node-wide memory statistics.
+%% It receives statistics from all queues, computes the desired
+%% queue duration (in seconds), and sends this information back
+%% to the queues.
+%%
+%% Normally, messages are exchanged like that:
+%%
+%% (1) (2) (3)
+%% Timer | |
+%% v v
+%% Queue -----+--------+-----<***hibernated***>------------->
+%% | ^ | ^ ^
+%% v | v | |
+%% Monitor X--*-+--X---*-+--X------X----X-----X+----------->
+%%
+%% Or, in words: the queue periodically reports its drain duration to the
+%% monitor ('report_queue_duration', cases 1 and 2 in the diagram above),
+%% and the monitor _always_ replies with the currently desired duration.
+%% This way we can be reasonably sure that the queue is not hibernated.
+%% The monitor periodically recomputes its statistics ('X' in the diagram).
+%% If, during such an update, we notice that a queue is using too much
+%% memory, we push a 'set_queue_duration' message to it. This happens even
+%% if the queue is hibernated, as we really do want it to reduce its
+%% memory footprint.
+%%
+%%
+%% The main job of this module is to make sure that all the queues have
+%% more or less the same number of seconds until they become drained.
+%% This average, seconds-until-the-queue-is-drained, is then multiplied
+%% by the ratio of allowed/used memory. So, if we can 'afford' more
+%% memory to be used, we report a greater number back to the queues; in
+%% the out-of-memory case, we reduce the average drain-seconds instead.
+%% To achieve all this we need to accumulate the information from every
+%% queue and compute an average from it.
+%%
+%% real_queue_duration_avg = avg([drain_from_queue_1, queue_2, queue_3, ...])
+%% memory_overcommit = allowed_memory / used_memory
+%% desired_queue_duration_avg = real_queue_duration_avg * memory_overcommit
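+%%
+%% Worked example (illustrative numbers): three queues reporting drain
+%% durations of 10s, 20s and 60s give real_queue_duration_avg = 30s. If
+%% allowed_memory is 600MB and 300MB is in use, memory_overcommit = 2.0,
+%% so desired_queue_duration_avg = 60s is sent back to the queues; with
+%% used memory above the allowed limit the desired duration shrinks
+%% instead.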
+
+
+-module(rabbit_memory_monitor).
+-include("rabbit.hrl").
+
+-behaviour(gen_server2).
+
+-export([start_link/0, update/0, register/2, report_queue_duration/2]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-record(state, {timer, %% 'internal_update' timer
+ queue_durations, %% ets, (qpid, last_reported, last_sent)
+ queue_duration_sum, %% sum of all queue_durations
+ queue_duration_count, %% number of elements in sum
+ memory_limit, %% how much memory we intend to use
+ memory_ratio, %% how much more memory we can use
+ desired_duration, %% the desired queue duration
+ callbacks %% a dict of qpid -> {M,F,A}s
+ }).
+
+-define(SERVER, ?MODULE).
+-define(DEFAULT_UPDATE_INTERVAL, 2500).
+-define(TABLE_NAME, ?MODULE).
+-define(MAX_QUEUE_DURATION, 60*60*24). % 1 day
+
+%% If the user disabled vm_memory_monitor, assume we can use 1GB of memory.
+-define(MEMORY_SIZE_FOR_DISABLED_VMM, 1073741824).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/0 :: () -> 'ignore' | {'error', _} | {'ok', pid()}).
+-spec(update/0 :: () -> 'ok').
+-spec(register/2 :: (pid(), {atom(),atom(),[any()]}) -> 'ok').
+-spec(report_queue_duration/2 :: (pid(), float() | 'infinity') -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+%% Public API
+%%----------------------------------------------------------------------------
+
+start_link() ->
+ gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []).
+
+update() ->
+ gen_server2:cast(?SERVER, update).
+
+register(Pid, MFA = {_M, _F, _A}) ->
+ gen_server2:cast(?SERVER, {register, Pid, MFA}).
+
+report_queue_duration(Pid, QueueDuration) ->
+ gen_server2:call(rabbit_memory_monitor,
+ {report_queue_duration, Pid, QueueDuration}).
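+
+%% Typical usage, as in rabbit_amqqueue_process: a queue registers a
+%% callback through which the desired duration can be pushed back to it,
+%% and then periodically reports its own drain duration:
+%%
+%%   rabbit_memory_monitor:register(
+%%     self(), {rabbit_amqqueue, set_queue_duration, [self()]}),
+%%   Desired = rabbit_memory_monitor:report_queue_duration(self(), Duration)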
+
+
+%%----------------------------------------------------------------------------
+%% Gen_server callbacks
+%%----------------------------------------------------------------------------
+
+init([]) ->
+    %% We should never use more memory than the user requested. As the
+    %% memory manager doesn't really know how much memory queues are
+    %% using, we try to remain a safe distance from the real throttle
+    %% limit.
+ MemoryLimit = trunc(get_memory_limit() * 0.6),
+
+ {ok, TRef} = timer:apply_interval(?DEFAULT_UPDATE_INTERVAL,
+ ?SERVER, update, []),
+ {ok, #state{timer = TRef,
+ queue_durations = ets:new(?TABLE_NAME, [set, private]),
+ queue_duration_sum = 0.0,
+ queue_duration_count = 0,
+ memory_limit = MemoryLimit,
+ memory_ratio = 1.0,
+ desired_duration = infinity,
+ callbacks = dict:new()}}.
+
+handle_call({report_queue_duration, Pid, QueueDuration}, From,
+ State = #state{queue_duration_sum = Sum,
+ queue_duration_count = Count,
+ queue_durations = Durations,
+ desired_duration = SendDuration}) ->
+ gen_server2:reply(From, SendDuration),
+
+ QueueDuration1 = case QueueDuration > ?MAX_QUEUE_DURATION of
+ true -> infinity;
+ false -> QueueDuration
+ end,
+
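+    %% Maintain a running sum/count over finite durations only: a queue
+    %% moving between a finite duration and 'infinity' adjusts the count,
+    %% otherwise only the sum changes.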
+ [{_Pid, PrevQueueDuration, _PrevSendDuration}] = ets:lookup(Durations, Pid),
+ {Sum1, Count1} =
+ case {PrevQueueDuration, QueueDuration1} of
+ {infinity, infinity} -> {Sum, Count};
+ {infinity, _} -> {Sum + QueueDuration1, Count + 1};
+ {_, infinity} -> {Sum - PrevQueueDuration, Count - 1};
+ {_, _} -> {Sum - PrevQueueDuration + QueueDuration1, Count}
+ end,
+ true = ets:insert(Durations, {Pid, QueueDuration1, SendDuration}),
+ {noreply, State#state{queue_duration_sum = Sum1,
+ queue_duration_count = Count1}};
+
+handle_call(_Request, _From, State) ->
+ {noreply, State}.
+
+handle_cast(update, State) ->
+ {noreply, internal_update(State)};
+
+handle_cast({register, Pid, MFA}, State = #state{queue_durations = Durations,
+ callbacks = Callbacks}) ->
+ _MRef = erlang:monitor(process, Pid),
+ true = ets:insert(Durations, {Pid, infinity, infinity}),
+ {noreply, State#state{callbacks = dict:store(Pid, MFA, Callbacks)}};
+
+handle_cast(_Request, State) ->
+ {noreply, State}.
+
+handle_info({'DOWN', _MRef, process, Pid, _Reason},
+ State = #state{queue_duration_sum = Sum,
+ queue_duration_count = Count,
+ queue_durations = Durations,
+ callbacks = Callbacks}) ->
+ [{_Pid, PrevQueueDuration, _PrevSendDuration}] = ets:lookup(Durations, Pid),
+    {Sum1, Count1} =
+        case PrevQueueDuration of
+            %% 'infinity' durations are not included in the running
+            %% sum/count, so only finite ones need adjusting here
+            infinity -> {Sum, Count};
+            _        -> {Sum - PrevQueueDuration, Count - 1}
+        end,
+    true = ets:delete(Durations, Pid),
+    {noreply, State#state{queue_duration_sum = Sum1,
+                          queue_duration_count = Count1,
+ callbacks = dict:erase(Pid, Callbacks)}};
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(_Reason, #state{timer = TRef}) ->
+ timer:cancel(TRef),
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+
+%%----------------------------------------------------------------------------
+%% Internal functions
+%%----------------------------------------------------------------------------
+
+internal_update(State = #state{memory_limit = Limit,
+ queue_durations = Durations,
+ desired_duration = DesiredDurationAvg,
+ queue_duration_sum = Sum,
+ queue_duration_count = Count,
+ callbacks = Callbacks}) ->
+    %% allowed memory / used memory, i.e. memory_overcommit
+ MemoryRatio = Limit / erlang:memory(total),
+ AvgDuration = case Count of
+ 0 -> infinity;
+ _ -> Sum / Count
+ end,
+ DesiredDurationAvg1 = case AvgDuration of
+ infinity -> infinity;
+ AvgQueueDuration -> AvgQueueDuration * MemoryRatio
+ end,
+ State1 = State#state{memory_ratio = MemoryRatio,
+ desired_duration = DesiredDurationAvg1},
+
+ %% only inform queues immediately if the desired duration has
+ %% decreased
+    case (DesiredDurationAvg == infinity andalso DesiredDurationAvg1 /= infinity)
+ orelse (DesiredDurationAvg1 < DesiredDurationAvg) of
+ true ->
+            %% If we have pessimistic information, we need to inform the
+            %% queues to reduce their memory usage. This sometimes wakes
+            %% up queues from hibernation.
+ true = ets:foldl(
+ fun ({Pid, QueueDuration, PrevSendDuration}, true) ->
+ case DesiredDurationAvg1 <
+ lists:min([PrevSendDuration, QueueDuration]) of
+ true ->
+ ok =
+ set_queue_duration(
+ Pid, DesiredDurationAvg1, Callbacks),
+ ets:insert(Durations,
+ {Pid, QueueDuration,
+ DesiredDurationAvg1});
+ _ -> true
+ end
+ end, true, Durations);
+ false -> ok
+ end,
+ State1.
+
+get_memory_limit() ->
+ case vm_memory_monitor:get_memory_limit() of
+ undefined -> ?MEMORY_SIZE_FOR_DISABLED_VMM;
+ A -> A
+ end.
+
+set_queue_duration(Pid, QueueDuration, Callbacks) ->
+ {M,F,A} = dict:fetch(Pid, Callbacks),
+ ok = erlang:apply(M, F, A++[QueueDuration]).
diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl
index 6da47933..65d4a451 100644
--- a/src/vm_memory_monitor.erl
+++ b/src/vm_memory_monitor.erl
@@ -51,7 +51,8 @@
-export([update/0, get_total_memory/0,
get_check_interval/0, set_check_interval/1,
- get_vm_memory_high_watermark/0, set_vm_memory_high_watermark/1]).
+ get_vm_memory_high_watermark/0, set_vm_memory_high_watermark/1,
+ get_memory_limit/0]).
-define(SERVER, ?MODULE).
@@ -73,9 +74,10 @@
-ifdef(use_specs).
--spec(start_link/1 :: (float()) -> ('ignore' | {error, any()} | {'ok', pid()})).
+-spec(start_link/1 :: (float()) -> ('ignore' | {'error', any()} | {'ok', pid()})).
-spec(update/0 :: () -> 'ok').
--spec(get_total_memory/0 :: () -> (non_neg_integer() | unknown)).
+-spec(get_total_memory/0 :: () -> (non_neg_integer() | 'unknown')).
+-spec(get_memory_limit/0 :: () -> (non_neg_integer() | 'undefined')).
-spec(get_check_interval/0 :: () -> non_neg_integer()).
-spec(set_check_interval/1 :: (non_neg_integer()) -> 'ok').
-spec(get_vm_memory_high_watermark/0 :: () -> float()).
@@ -128,6 +130,9 @@ handle_call({set_check_interval, Timeout}, _From, State) ->
{ok, cancel} = timer:cancel(State#state.timer),
{reply, ok, State#state{timeout = Timeout, timer = start_timer(Timeout)}};
+handle_call(get_memory_limit, _From, State) ->
+ {reply, State#state.memory_limit, State};
+
handle_call(_Request, _From, State) ->
{noreply, State}.
@@ -168,6 +173,13 @@ get_vm_memory_high_watermark() ->
set_vm_memory_high_watermark(Fraction) ->
gen_server2:call(?MODULE, {set_vm_memory_high_watermark, Fraction}).
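+%% Returns the memory limit in bytes, or 'undefined' if the monitor is not
+%% running (e.g. when it has been disabled).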
+get_memory_limit() ->
+ try
+ gen_server2:call(?MODULE, get_memory_limit)
+ catch
+ exit:{noproc, _} -> undefined
+ end.
+
%%----------------------------------------------------------------------------
%% Server Internals
%%----------------------------------------------------------------------------