From ff296ce8d6523280e25dbcb81b3fc82bdcaf7bb5 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Wed, 16 Mar 2011 13:12:37 +0000 Subject: Use the correct connection pid for exclusivity (and error logging). --- src/rabbit_channel.erl | 48 +++++++++++++++++++++++++----------------------- 1 file changed, 25 insertions(+), 23 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index b27f6886..19b2eaf4 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -362,14 +362,15 @@ return_ok(State, false, Msg) -> {reply, Msg, State}. ok_msg(true, _Msg) -> undefined; ok_msg(false, Msg) -> Msg. -send_exception(Reason, State = #ch{protocol = Protocol, - channel = Channel, - writer_pid = WriterPid, - reader_pid = ReaderPid}) -> +send_exception(Reason, State = #ch{protocol = Protocol, + channel = Channel, + writer_pid = WriterPid, + reader_pid = ReaderPid, + connection_pid = ConnectionPid}) -> {CloseChannel, CloseMethod} = rabbit_binary_generator:map_exception(Channel, Reason, Protocol), rabbit_log:error("connection ~p, channel ~p - error:~n~p~n", - [ReaderPid, Channel, Reason]), + [ConnectionPid, Channel, Reason]), %% something bad's happened: rollback_and_notify may not be 'ok' {_Result, State1} = rollback_and_notify(State), case CloseChannel of @@ -650,13 +651,13 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck}, - _, State = #ch{writer_pid = WriterPid, - reader_pid = ReaderPid, - next_tag = DeliveryTag}) -> + _, State = #ch{writer_pid = WriterPid, + connection_pid = ConnectionPid, + next_tag = DeliveryTag}) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_read_permitted(QueueName, State), case rabbit_amqqueue:with_exclusive_access_or_die( - QueueName, ReaderPid, + QueueName, ConnectionPid, fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of {ok, MessageCount, Msg = {_QName, QPid, _MsgId, Redelivered, @@ -690,7 +691,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, no_ack = NoAck, exclusive = ExclusiveConsume, nowait = NoWait}, - _, State = #ch{reader_pid = ReaderPid, + _, State = #ch{connection_pid = ConnectionPid, limiter_pid = LimiterPid, consumer_mapping = ConsumerMapping}) -> case dict:find(ConsumerTag, ConsumerMapping) of @@ -707,7 +708,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, %% behalf. This is for symmetry with basic.cancel - see %% the comment in that method for why. case rabbit_amqqueue:with_exclusive_access_or_die( - QueueName, ReaderPid, + QueueName, ConnectionPid, fun (Q) -> {rabbit_amqqueue:basic_consume( Q, NoAck, self(), LimiterPid, @@ -922,10 +923,10 @@ handle_method(#'queue.declare'{queue = QueueNameBin, nowait = NoWait, arguments = Args} = Declare, _, State = #ch{virtual_host = VHostPath, - reader_pid = ReaderPid, + connection_pid = ConnectionPid, queue_collector_pid = CollectorPid}) -> Owner = case ExclusiveDeclare of - true -> ReaderPid; + true -> ConnectionPid; false -> none end, ActualNameBin = case QueueNameBin of @@ -967,14 +968,14 @@ handle_method(#'queue.declare'{queue = QueueNameBin, handle_method(#'queue.declare'{queue = QueueNameBin, passive = true, nowait = NoWait}, - _, State = #ch{virtual_host = VHostPath, - reader_pid = ReaderPid}) -> + _, State = #ch{virtual_host = VHostPath, + connection_pid = ConnectionPid}) -> QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin), check_configure_permitted(QueueName, State), {{ok, MessageCount, ConsumerCount}, #amqqueue{} = Q} = rabbit_amqqueue:with_or_die( QueueName, fun (Q) -> {rabbit_amqqueue:stat(Q), Q} end), - ok = rabbit_amqqueue:check_exclusive_access(Q, ReaderPid), + ok = rabbit_amqqueue:check_exclusive_access(Q, ConnectionPid), return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount, State); @@ -982,11 +983,11 @@ handle_method(#'queue.delete'{queue = QueueNameBin, if_unused = IfUnused, if_empty = IfEmpty, nowait = NoWait}, - _, State = #ch{reader_pid = ReaderPid}) -> + _, State = #ch{connection_pid = ConnectionPid}) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_configure_permitted(QueueName, State), case rabbit_amqqueue:with_exclusive_access_or_die( - QueueName, ReaderPid, + QueueName, ConnectionPid, fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of {error, in_use} -> rabbit_misc:protocol_error( @@ -1018,11 +1019,11 @@ handle_method(#'queue.unbind'{queue = QueueNameBin, handle_method(#'queue.purge'{queue = QueueNameBin, nowait = NoWait}, - _, State = #ch{reader_pid = ReaderPid}) -> + _, State = #ch{connection_pid = ConnectionPid}) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_read_permitted(QueueName, State), {ok, PurgedMessageCount} = rabbit_amqqueue:with_exclusive_access_or_die( - QueueName, ReaderPid, + QueueName, ConnectionPid, fun (Q) -> rabbit_amqqueue:purge(Q) end), return_ok(State, NoWait, #'queue.purge_ok'{message_count = PurgedMessageCount}); @@ -1142,8 +1143,8 @@ handle_consuming_queue_down(MRef, ConsumerTag, binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, RoutingKey, Arguments, ReturnMethod, NoWait, - State = #ch{virtual_host = VHostPath, - reader_pid = ReaderPid}) -> + State = #ch{virtual_host = VHostPath, + connection_pid = ConnectionPid }) -> %% FIXME: connection exception (!) on failure?? %% (see rule named "failure" in spec-XML) %% FIXME: don't allow binding to internal exchanges - @@ -1159,7 +1160,8 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, key = ActualRoutingKey, args = Arguments}, fun (_X, Q = #amqqueue{}) -> - try rabbit_amqqueue:check_exclusive_access(Q, ReaderPid) + try rabbit_amqqueue:check_exclusive_access(Q, + ConnectionPid) catch exit:Reason -> {error, Reason} end; (_X, #exchange{}) -> -- cgit v1.2.1