diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-11-14 17:09:09 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-11-14 17:09:09 +0000 |
commit | 1531a8bfefdfb1c0eb11b9e810134811c227a48b (patch) | |
tree | 3053e1468515305c60483633297dee36cf100e55 /src/rabbit_amqqueue_process.erl | |
parent | 8444163da6f023914044d6532cab70bff735273e (diff) | |
download | rabbitmq-server-1531a8bfefdfb1c0eb11b9e810134811c227a48b.tar.gz |
Experiment with the idea of "consumer utilisation". Still far too lumpy.
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 75 |
1 files changed, 57 insertions, 18 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index d4cba944..69ae6a7e 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -39,7 +39,7 @@ backing_queue, backing_queue_state, active_consumers, - active_consumers_last_empty, + consumer_bound_info, expires, sync_timer_ref, rate_timer_ref, @@ -59,6 +59,11 @@ -record(consumer, {tag, ack_required, args}). +-record(cb_info, {inactive, + active, + inactive_dur, + active_dur}). + %% These are held in our process dictionary -record(cr, {ch_pid, monitor_ref, @@ -96,7 +101,7 @@ messages_unacknowledged, messages, consumers, - consumer_bound, + consumer_utilisation, memory, slave_pids, synchronised_slave_pids, @@ -147,15 +152,18 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, State3. init_state(Q) -> - State = #q{q = Q, - exclusive_consumer = none, - has_had_consumers = false, - active_consumers = priority_queue:new(), - active_consumers_last_empty = erlang:now(), - senders = pmon:new(delegate), - msg_id_to_channel = gb_trees:empty(), - status = running, - args_policy_version = 0}, + State = #q{q = Q, + exclusive_consumer = none, + has_had_consumers = false, + active_consumers = priority_queue:new(), + consumer_bound_info = #cb_info{inactive = erlang:now(), + active = erlang:now(), + inactive_dur = 1, + active_dur = 1}, + senders = pmon:new(delegate), + msg_id_to_channel = gb_trees:empty(), + status = running, + args_policy_version = 0}, rabbit_event:init_stats_timer(State, #q.stats_timer). terminate(shutdown = R, State = #q{backing_queue = BQ}) -> @@ -485,10 +493,19 @@ send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) -> deliver_msgs_to_consumers(_DeliverFun, true, State) -> {true, State}; deliver_msgs_to_consumers(DeliverFun, false, - State = #q{active_consumers = ActiveConsumers}) -> + State = #q{active_consumers = ActiveConsumers, + consumer_bound_info = CBInfo}) -> case priority_queue:out_p(ActiveConsumers) of {empty, _} -> - {false, State#q{active_consumers_last_empty = erlang:now()}}; + #cb_info{active = WhenActive, + inactive = WhenInactive} = CBInfo, + Now = erlang:now(), + CBInfo1 = case timer:now_diff(WhenInactive, WhenActive) > 0 of + true -> CBInfo; + false -> CBInfo#cb_info{active_dur = timer:now_diff(Now, WhenActive), + inactive = Now} + end, + {false, State#q{consumer_bound_info = CBInfo1}}; {{value, QEntry, Priority}, Tail} -> {Stop, State1} = deliver_msg_to_consumer( DeliverFun, QEntry, Priority, @@ -728,12 +745,22 @@ unblock(State, C = #cr{limiter = Limiter}) -> BlockedQ = priority_queue:from_list(Blocked), UnblockedQ = priority_queue:from_list(Unblocked), update_ch_record(C#cr{blocked_consumers = BlockedQ}), + #q{consumer_bound_info = CBInfo} = State, + #cb_info{inactive = WhenInactive, + active = WhenActive} = CBInfo, + State1 = case timer:now_diff(WhenActive, WhenInactive) > 0 of + true -> State; + false -> Now = erlang:now(), + CBInfo1 = CBInfo#cb_info{inactive_dur = timer:now_diff(Now, WhenInactive), + active = Now}, + State#q{consumer_bound_info = CBInfo1} + end, AC1 = priority_queue:join(State#q.active_consumers, UnblockedQ), - State1 = State#q{active_consumers = AC1}, + State2 = State1#q{active_consumers = AC1}, [notify_decorators( - consumer_unblocked, [{consumer_tag, CTag}], State1) || + consumer_unblocked, [{consumer_tag, CTag}], State2) || {_P, {_ChPid, #consumer{tag = CTag}}} <- Unblocked], - run_message_queue(State1) + run_message_queue(State2) end. should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false; @@ -1040,8 +1067,20 @@ i(messages, State) -> messages_unacknowledged]]); i(consumers, _) -> consumer_count(); -i(consumer_bound, #q{active_consumers_last_empty = Last}) -> - timer:now_diff(erlang:now(), Last) < 1000000; +i(consumer_utilisation, + #q{consumer_bound_info = #cb_info{active_dur = ActiveDur, + active = WhenActive, + inactive_dur = InactiveDur, + inactive = WhenInactive}}) -> + Now = erlang:now(), + case timer:now_diff(Now, WhenInactive) < 5000000 of + false -> case timer:now_diff(WhenInactive, WhenActive) > 0 of + true -> 0; + false -> 100 + end; + true -> Ratio = ActiveDur / (InactiveDur + ActiveDur), + trunc(Ratio * 100) + end; i(memory, _) -> {memory, M} = process_info(self(), memory), M; |