summaryrefslogtreecommitdiff
path: root/src/rabbit_amqqueue_process.erl
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-11-14 17:09:09 +0000
committerSimon MacMullen <simon@rabbitmq.com>2013-11-14 17:09:09 +0000
commit1531a8bfefdfb1c0eb11b9e810134811c227a48b (patch)
tree3053e1468515305c60483633297dee36cf100e55 /src/rabbit_amqqueue_process.erl
parent8444163da6f023914044d6532cab70bff735273e (diff)
downloadrabbitmq-server-1531a8bfefdfb1c0eb11b9e810134811c227a48b.tar.gz
Experiment with the idea of "consumer utilisation". Still far too lumpy.
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r--src/rabbit_amqqueue_process.erl75
1 files changed, 57 insertions, 18 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index d4cba944..69ae6a7e 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -39,7 +39,7 @@
backing_queue,
backing_queue_state,
active_consumers,
- active_consumers_last_empty,
+ consumer_bound_info,
expires,
sync_timer_ref,
rate_timer_ref,
@@ -59,6 +59,11 @@
-record(consumer, {tag, ack_required, args}).
+-record(cb_info, {inactive,
+ active,
+ inactive_dur,
+ active_dur}).
+
%% These are held in our process dictionary
-record(cr, {ch_pid,
monitor_ref,
@@ -96,7 +101,7 @@
messages_unacknowledged,
messages,
consumers,
- consumer_bound,
+ consumer_utilisation,
memory,
slave_pids,
synchronised_slave_pids,
@@ -147,15 +152,18 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
State3.
init_state(Q) ->
- State = #q{q = Q,
- exclusive_consumer = none,
- has_had_consumers = false,
- active_consumers = priority_queue:new(),
- active_consumers_last_empty = erlang:now(),
- senders = pmon:new(delegate),
- msg_id_to_channel = gb_trees:empty(),
- status = running,
- args_policy_version = 0},
+ State = #q{q = Q,
+ exclusive_consumer = none,
+ has_had_consumers = false,
+ active_consumers = priority_queue:new(),
+ consumer_bound_info = #cb_info{inactive = erlang:now(),
+ active = erlang:now(),
+ inactive_dur = 1,
+ active_dur = 1},
+ senders = pmon:new(delegate),
+ msg_id_to_channel = gb_trees:empty(),
+ status = running,
+ args_policy_version = 0},
rabbit_event:init_stats_timer(State, #q.stats_timer).
terminate(shutdown = R, State = #q{backing_queue = BQ}) ->
@@ -485,10 +493,19 @@ send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) ->
deliver_msgs_to_consumers(_DeliverFun, true, State) ->
{true, State};
deliver_msgs_to_consumers(DeliverFun, false,
- State = #q{active_consumers = ActiveConsumers}) ->
+ State = #q{active_consumers = ActiveConsumers,
+ consumer_bound_info = CBInfo}) ->
case priority_queue:out_p(ActiveConsumers) of
{empty, _} ->
- {false, State#q{active_consumers_last_empty = erlang:now()}};
+ #cb_info{active = WhenActive,
+ inactive = WhenInactive} = CBInfo,
+ Now = erlang:now(),
+ CBInfo1 = case timer:now_diff(WhenInactive, WhenActive) > 0 of
+ true -> CBInfo;
+ false -> CBInfo#cb_info{active_dur = timer:now_diff(Now, WhenActive),
+ inactive = Now}
+ end,
+ {false, State#q{consumer_bound_info = CBInfo1}};
{{value, QEntry, Priority}, Tail} ->
{Stop, State1} = deliver_msg_to_consumer(
DeliverFun, QEntry, Priority,
@@ -728,12 +745,22 @@ unblock(State, C = #cr{limiter = Limiter}) ->
BlockedQ = priority_queue:from_list(Blocked),
UnblockedQ = priority_queue:from_list(Unblocked),
update_ch_record(C#cr{blocked_consumers = BlockedQ}),
+ #q{consumer_bound_info = CBInfo} = State,
+ #cb_info{inactive = WhenInactive,
+ active = WhenActive} = CBInfo,
+ State1 = case timer:now_diff(WhenActive, WhenInactive) > 0 of
+ true -> State;
+ false -> Now = erlang:now(),
+ CBInfo1 = CBInfo#cb_info{inactive_dur = timer:now_diff(Now, WhenInactive),
+ active = Now},
+ State#q{consumer_bound_info = CBInfo1}
+ end,
AC1 = priority_queue:join(State#q.active_consumers, UnblockedQ),
- State1 = State#q{active_consumers = AC1},
+ State2 = State1#q{active_consumers = AC1},
[notify_decorators(
- consumer_unblocked, [{consumer_tag, CTag}], State1) ||
+ consumer_unblocked, [{consumer_tag, CTag}], State2) ||
{_P, {_ChPid, #consumer{tag = CTag}}} <- Unblocked],
- run_message_queue(State1)
+ run_message_queue(State2)
end.
should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false;
@@ -1040,8 +1067,20 @@ i(messages, State) ->
messages_unacknowledged]]);
i(consumers, _) ->
consumer_count();
-i(consumer_bound, #q{active_consumers_last_empty = Last}) ->
- timer:now_diff(erlang:now(), Last) < 1000000;
+i(consumer_utilisation,
+ #q{consumer_bound_info = #cb_info{active_dur = ActiveDur,
+ active = WhenActive,
+ inactive_dur = InactiveDur,
+ inactive = WhenInactive}}) ->
+ Now = erlang:now(),
+ case timer:now_diff(Now, WhenInactive) < 5000000 of
+ false -> case timer:now_diff(WhenInactive, WhenActive) > 0 of
+ true -> 0;
+ false -> 100
+ end;
+ true -> Ratio = ActiveDur / (InactiveDur + ActiveDur),
+ trunc(Ratio * 100)
+ end;
i(memory, _) ->
{memory, M} = process_info(self(), memory),
M;