summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-11-15 13:53:46 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2012-11-15 13:53:46 +0000
commitf1e03821a9b62e1f28ffddf0a4ee4fa31006461b (patch)
tree0a606ed31b283364afde44fa0f5f3f2e14890189
parent24e4abdbabcd2e47c219d1f5e554214517601f60 (diff)
downloadrabbitmq-server-f1e03821a9b62e1f28ffddf0a4ee4fa31006461b.tar.gz
refactor stats recording in channel
- specify stats type explicitly rather than inferring it from the structure of the key (yuck!) - s/maybe_incr_stats/incr_stats
-rw-r--r--src/rabbit_channel.erl64
1 files changed, 29 insertions, 35 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 53610e6d..9dbfbdea 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1236,14 +1236,14 @@ record_sent(ConsumerTag, AckRequired,
State = #ch{unacked_message_q = UAMQ,
next_tag = DeliveryTag,
trace_state = TraceState}) ->
- maybe_incr_stats([{QPid, 1}], case {ConsumerTag, AckRequired} of
- {none, true} -> get;
- {none, false} -> get_no_ack;
- {_ , true} -> deliver;
- {_ , false} -> deliver_no_ack
- end, State),
+ incr_stats([{queue_stats, QPid, 1}], case {ConsumerTag, AckRequired} of
+ {none, true} -> get;
+ {none, false} -> get_no_ack;
+ {_ , true} -> deliver;
+ {_ , false} -> deliver_no_ack
+ end, State),
case Redelivered of
- true -> maybe_incr_stats([{QPid, 1}], redeliver, State);
+ true -> incr_stats([{queue_stats, QPid, 1}], redeliver, State);
false -> ok
end,
rabbit_trace:tap_trace_out(Msg, TraceState),
@@ -1277,13 +1277,13 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
end.
ack(Acked, State) ->
- QIncs = fold_per_queue(
- fun (QPid, MsgIds, L) ->
- ok = rabbit_amqqueue:ack(QPid, MsgIds, self()),
- [{QPid, length(MsgIds)} | L]
- end, [], Acked),
+ Incs = fold_per_queue(
+ fun (QPid, MsgIds, L) ->
+ ok = rabbit_amqqueue:ack(QPid, MsgIds, self()),
+ [{queue_stats, QPid, length(MsgIds)} | L]
+ end, [], Acked),
ok = notify_limiter(State#ch.limiter, Acked),
- maybe_incr_stats(QIncs, ack, State).
+ incr_stats(Incs, ack, State).
new_tx(State) -> State#ch{uncommitted_message_q = queue:new(),
uncommitted_acks = [],
@@ -1346,15 +1346,15 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
State#ch.queue_monitors)},
State2 = process_routing_result(RoutingRes, DeliveredQPids,
XName, MsgSeqNo, Message, State1),
- maybe_incr_stats([{XName, 1} |
- [{{QPid, XName}, 1} ||
- QPid <- DeliveredQPids]], publish, State2),
+ incr_stats([{exchange_stats, XName, 1} |
+ [{queue_exchange_stats, {QPid, XName}, 1} ||
+ QPid <- DeliveredQPids]], publish, State2),
State2.
process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
ok = basic_return(Msg, State, no_route),
- maybe_incr_stats([{Msg#basic_message.exchange_name, 1}],
- return_unroutable, State),
+ incr_stats([{exchange_stats, Msg#basic_message.exchange_name, 1}],
+ return_unroutable, State),
record_confirm(MsgSeqNo, XName, State);
process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
record_confirm(MsgSeqNo, XName, State);
@@ -1379,10 +1379,11 @@ send_confirms(State = #ch{tx_status = none, confirmed = []}) ->
State;
send_confirms(State = #ch{tx_status = none, confirmed = C}) ->
MsgSeqNos =
- lists:foldl(fun ({MsgSeqNo, XName}, MSNs) ->
- maybe_incr_stats([{XName, 1}], confirm, State),
- [MsgSeqNo | MSNs]
- end, [], lists:append(C)),
+ lists:foldl(
+ fun ({MsgSeqNo, XName}, MSNs) ->
+ incr_stats([{exchange_stats, XName, 1}], confirm, State),
+ [MsgSeqNo | MSNs]
+ end, [], lists:append(C)),
send_confirms(MsgSeqNos, State#ch{confirmed = []});
send_confirms(State) ->
maybe_complete_tx(State).
@@ -1465,21 +1466,15 @@ i(Item, _) ->
name(#ch{conn_name = ConnName, channel = Channel}) ->
list_to_binary(rabbit_misc:format("~s (~p)", [ConnName, Channel])).
-maybe_incr_stats(QXIncs, Measure, State) ->
+incr_stats(Incs, Measure, State) ->
case rabbit_event:stats_level(State, #ch.stats_timer) of
- fine -> [incr_stats(QX, Inc, Measure) || {QX, Inc} <- QXIncs];
+ fine -> [update_measures(Type, Key, Inc, Measure) ||
+ {Type, Key, Inc} <- Incs];
_ -> ok
end.
-incr_stats({_, _} = QX, Inc, Measure) ->
- update_measures(queue_exchange_stats, QX, Inc, Measure);
-incr_stats(QPid, Inc, Measure) when is_pid(QPid) ->
- update_measures(queue_stats, QPid, Inc, Measure);
-incr_stats(X, Inc, Measure) ->
- update_measures(exchange_stats, X, Inc, Measure).
-
-update_measures(Type, QX, Inc, Measure) ->
- Measures = case get({Type, QX}) of
+update_measures(Type, Key, Inc, Measure) ->
+ Measures = case get({Type, Key}) of
undefined -> [];
D -> D
end,
@@ -1487,8 +1482,7 @@ update_measures(Type, QX, Inc, Measure) ->
error -> 0;
{ok, C} -> C
end,
- put({Type, QX},
- orddict:store(Measure, Cur + Inc, Measures)).
+ put({Type, Key}, orddict:store(Measure, Cur + Inc, Measures)).
emit_stats(State) ->
emit_stats(State, []).