From f1e03821a9b62e1f28ffddf0a4ee4fa31006461b Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Thu, 15 Nov 2012 13:53:46 +0000 Subject: refactor stats recording in channel - specify stats type explicitly rather than inferring it from the structure of the key (yuck!) - s/maybe_incr_stats/incr_stats --- src/rabbit_channel.erl | 64 +++++++++++++++++++++++--------------------------- 1 file changed, 29 insertions(+), 35 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 53610e6d..9dbfbdea 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1236,14 +1236,14 @@ record_sent(ConsumerTag, AckRequired, State = #ch{unacked_message_q = UAMQ, next_tag = DeliveryTag, trace_state = TraceState}) -> - maybe_incr_stats([{QPid, 1}], case {ConsumerTag, AckRequired} of - {none, true} -> get; - {none, false} -> get_no_ack; - {_ , true} -> deliver; - {_ , false} -> deliver_no_ack - end, State), + incr_stats([{queue_stats, QPid, 1}], case {ConsumerTag, AckRequired} of + {none, true} -> get; + {none, false} -> get_no_ack; + {_ , true} -> deliver; + {_ , false} -> deliver_no_ack + end, State), case Redelivered of - true -> maybe_incr_stats([{QPid, 1}], redeliver, State); + true -> incr_stats([{queue_stats, QPid, 1}], redeliver, State); false -> ok end, rabbit_trace:tap_trace_out(Msg, TraceState), @@ -1277,13 +1277,13 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) -> end. ack(Acked, State) -> - QIncs = fold_per_queue( - fun (QPid, MsgIds, L) -> - ok = rabbit_amqqueue:ack(QPid, MsgIds, self()), - [{QPid, length(MsgIds)} | L] - end, [], Acked), + Incs = fold_per_queue( + fun (QPid, MsgIds, L) -> + ok = rabbit_amqqueue:ack(QPid, MsgIds, self()), + [{queue_stats, QPid, length(MsgIds)} | L] + end, [], Acked), ok = notify_limiter(State#ch.limiter, Acked), - maybe_incr_stats(QIncs, ack, State). + incr_stats(Incs, ack, State). new_tx(State) -> State#ch{uncommitted_message_q = queue:new(), uncommitted_acks = [], @@ -1346,15 +1346,15 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ State#ch.queue_monitors)}, State2 = process_routing_result(RoutingRes, DeliveredQPids, XName, MsgSeqNo, Message, State1), - maybe_incr_stats([{XName, 1} | - [{{QPid, XName}, 1} || - QPid <- DeliveredQPids]], publish, State2), + incr_stats([{exchange_stats, XName, 1} | + [{queue_exchange_stats, {QPid, XName}, 1} || + QPid <- DeliveredQPids]], publish, State2), State2. process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) -> ok = basic_return(Msg, State, no_route), - maybe_incr_stats([{Msg#basic_message.exchange_name, 1}], - return_unroutable, State), + incr_stats([{exchange_stats, Msg#basic_message.exchange_name, 1}], + return_unroutable, State), record_confirm(MsgSeqNo, XName, State); process_routing_result(routed, [], XName, MsgSeqNo, _, State) -> record_confirm(MsgSeqNo, XName, State); @@ -1379,10 +1379,11 @@ send_confirms(State = #ch{tx_status = none, confirmed = []}) -> State; send_confirms(State = #ch{tx_status = none, confirmed = C}) -> MsgSeqNos = - lists:foldl(fun ({MsgSeqNo, XName}, MSNs) -> - maybe_incr_stats([{XName, 1}], confirm, State), - [MsgSeqNo | MSNs] - end, [], lists:append(C)), + lists:foldl( + fun ({MsgSeqNo, XName}, MSNs) -> + incr_stats([{exchange_stats, XName, 1}], confirm, State), + [MsgSeqNo | MSNs] + end, [], lists:append(C)), send_confirms(MsgSeqNos, State#ch{confirmed = []}); send_confirms(State) -> maybe_complete_tx(State). @@ -1465,21 +1466,15 @@ i(Item, _) -> name(#ch{conn_name = ConnName, channel = Channel}) -> list_to_binary(rabbit_misc:format("~s (~p)", [ConnName, Channel])). -maybe_incr_stats(QXIncs, Measure, State) -> +incr_stats(Incs, Measure, State) -> case rabbit_event:stats_level(State, #ch.stats_timer) of - fine -> [incr_stats(QX, Inc, Measure) || {QX, Inc} <- QXIncs]; + fine -> [update_measures(Type, Key, Inc, Measure) || + {Type, Key, Inc} <- Incs]; _ -> ok end. -incr_stats({_, _} = QX, Inc, Measure) -> - update_measures(queue_exchange_stats, QX, Inc, Measure); -incr_stats(QPid, Inc, Measure) when is_pid(QPid) -> - update_measures(queue_stats, QPid, Inc, Measure); -incr_stats(X, Inc, Measure) -> - update_measures(exchange_stats, X, Inc, Measure). - -update_measures(Type, QX, Inc, Measure) -> - Measures = case get({Type, QX}) of +update_measures(Type, Key, Inc, Measure) -> + Measures = case get({Type, Key}) of undefined -> []; D -> D end, @@ -1487,8 +1482,7 @@ update_measures(Type, QX, Inc, Measure) -> error -> 0; {ok, C} -> C end, - put({Type, QX}, - orddict:store(Measure, Cur + Inc, Measures)). + put({Type, Key}, orddict:store(Measure, Cur + Inc, Measures)). emit_stats(State) -> emit_stats(State, []). -- cgit v1.2.1