diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-07-22 14:37:45 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-07-22 14:37:45 +0100 |
commit | 9fb2f35c1afa4c911850edcb2adf6f8e55e7f666 (patch) | |
tree | 394291a9b8b48fcb37a339b970b52bbb05dd1774 /src/rabbit_channel.erl | |
parent | 8fd94c3ddba71bdb219ebae27f17b0423394dc6a (diff) | |
parent | 8b539291127aa38c5695820076ff407622fc4ff3 (diff) | |
download | rabbitmq-server-9fb2f35c1afa4c911850edcb2adf6f8e55e7f666.tar.gz |
stable to default
Diffstat (limited to 'src/rabbit_channel.erl')
-rw-r--r-- | src/rabbit_channel.erl | 43 |
1 files changed, 29 insertions, 14 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 043ec7e3..d2f6719c 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -341,7 +341,7 @@ handle_cast({send_drained, CTagCredit}, State = #ch{writer_pid = WriterPid}) -> handle_cast({force_event_refresh, Ref}, State) -> rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State), Ref), - noreply(State); + noreply(rabbit_event:init_stats_timer(State, #ch.stats_timer)); handle_cast({mandatory_received, MsgSeqNo}, State = #ch{mandatory = Mand}) -> %% NB: don't call noreply/1 since we don't want to send confirms. @@ -433,17 +433,22 @@ send(_Command, #ch{state = closing}) -> send(Command, #ch{writer_pid = WriterPid}) -> ok = rabbit_writer:send_command(WriterPid, Command). -handle_exception(Reason, State = #ch{protocol = Protocol, - channel = Channel, - writer_pid = WriterPid, - reader_pid = ReaderPid, - conn_pid = ConnPid}) -> +handle_exception(Reason, State = #ch{protocol = Protocol, + channel = Channel, + writer_pid = WriterPid, + reader_pid = ReaderPid, + conn_pid = ConnPid, + conn_name = ConnName, + virtual_host = VHost, + user = User}) -> %% something bad's happened: notify_queues may not be 'ok' {_Result, State1} = notify_queues(State), case rabbit_binary_generator:map_exception(Channel, Reason, Protocol) of {Channel, CloseMethod} -> - rabbit_log:error("connection ~p, channel ~p - soft error:~n~p~n", - [ConnPid, Channel, Reason]), + rabbit_log:error("Channel error on connection ~p (~s, vhost: '~s'," + " user: '~s'), channel ~p:~n~p~n", + [ConnPid, ConnName, VHost, User#user.username, + Channel, Reason]), ok = rabbit_writer:send_command(WriterPid, CloseMethod), {noreply, State1}; {0, _} -> @@ -668,8 +673,11 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, mandatory = Mandatory}, Content, State = #ch{virtual_host = VHostPath, tx = Tx, + channel = ChannelNum, confirm_enabled = ConfirmEnabled, - trace_state = TraceState}) -> + trace_state = TraceState, + user = #user{username = Username}, + conn_name = ConnName}) -> check_msg_size(Content), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_write_permitted(ExchangeName, State), @@ -690,7 +698,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, end, case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of {ok, Message} -> - rabbit_trace:tap_in(Message, TraceState), + rabbit_trace:tap_in(Message, ConnName, ChannelNum, + Username, TraceState), Delivery = rabbit_basic:delivery( Mandatory, DoConfirm, Message, MsgSeqNo), QNames = rabbit_exchange:route(Exchange, Delivery), @@ -992,7 +1001,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, QueueName, fun (Q) -> ok = rabbit_amqqueue:assert_equivalence( Q, Durable, AutoDelete, Args, Owner), - rabbit_amqqueue:stat(Q) + maybe_stat(NoWait, Q) end) of {ok, MessageCount, ConsumerCount} -> return_queue_declare_ok(QueueName, NoWait, MessageCount, @@ -1048,7 +1057,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin), {{ok, MessageCount, ConsumerCount}, #amqqueue{} = Q} = rabbit_amqqueue:with_or_die( - QueueName, fun (Q) -> {rabbit_amqqueue:stat(Q), Q} end), + QueueName, fun (Q) -> {maybe_stat(NoWait, Q), Q} end), ok = rabbit_amqqueue:check_exclusive_access(Q, ConnPid), return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount, State); @@ -1204,6 +1213,9 @@ basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag, E end. +maybe_stat(false, Q) -> rabbit_amqqueue:stat(Q); +maybe_stat(true, _Q) -> {ok, 0, 0}. + consumer_monitor(ConsumerTag, State = #ch{consumer_mapping = ConsumerMapping, queue_monitors = QMons, @@ -1365,7 +1377,10 @@ record_sent(ConsumerTag, AckRequired, Msg = {QName, QPid, MsgId, Redelivered, _Message}, State = #ch{unacked_message_q = UAMQ, next_tag = DeliveryTag, - trace_state = TraceState}) -> + trace_state = TraceState, + user = #user{username = Username}, + conn_name = ConnName, + channel = ChannelNum}) -> ?INCR_STATS([{queue_stats, QName, 1}], case {ConsumerTag, AckRequired} of {none, true} -> get; {none, false} -> get_no_ack; @@ -1376,7 +1391,7 @@ record_sent(ConsumerTag, AckRequired, true -> ?INCR_STATS([{queue_stats, QName, 1}], redeliver, State); false -> ok end, - rabbit_trace:tap_out(Msg, TraceState), + rabbit_trace:tap_out(Msg, ConnName, ChannelNum, Username, TraceState), UAMQ1 = case AckRequired of true -> queue:in({DeliveryTag, ConsumerTag, {QPid, MsgId}}, UAMQ); |