summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2011-03-16 13:12:37 +0000
committerSimon MacMullen <simon@rabbitmq.com>2011-03-16 13:12:37 +0000
commitff296ce8d6523280e25dbcb81b3fc82bdcaf7bb5 (patch)
tree14d3c2a15afce4c1c61e9f45832830393daf4311
parenta6586a1c333ce2499a787c07ec7cb2c8a2cfc180 (diff)
downloadrabbitmq-server-ff296ce8d6523280e25dbcb81b3fc82bdcaf7bb5.tar.gz
Use the correct connection pid for exclusivity (and error logging).
-rw-r--r--src/rabbit_channel.erl48
1 files changed, 25 insertions, 23 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index b27f6886..19b2eaf4 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -362,14 +362,15 @@ return_ok(State, false, Msg) -> {reply, Msg, State}.
ok_msg(true, _Msg) -> undefined;
ok_msg(false, Msg) -> Msg.
-send_exception(Reason, State = #ch{protocol = Protocol,
- channel = Channel,
- writer_pid = WriterPid,
- reader_pid = ReaderPid}) ->
+send_exception(Reason, State = #ch{protocol = Protocol,
+ channel = Channel,
+ writer_pid = WriterPid,
+ reader_pid = ReaderPid,
+ connection_pid = ConnectionPid}) ->
{CloseChannel, CloseMethod} =
rabbit_binary_generator:map_exception(Channel, Reason, Protocol),
rabbit_log:error("connection ~p, channel ~p - error:~n~p~n",
- [ReaderPid, Channel, Reason]),
+ [ConnectionPid, Channel, Reason]),
%% something bad's happened: rollback_and_notify may not be 'ok'
{_Result, State1} = rollback_and_notify(State),
case CloseChannel of
@@ -650,13 +651,13 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
handle_method(#'basic.get'{queue = QueueNameBin,
no_ack = NoAck},
- _, State = #ch{writer_pid = WriterPid,
- reader_pid = ReaderPid,
- next_tag = DeliveryTag}) ->
+ _, State = #ch{writer_pid = WriterPid,
+ connection_pid = ConnectionPid,
+ next_tag = DeliveryTag}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_read_permitted(QueueName, State),
case rabbit_amqqueue:with_exclusive_access_or_die(
- QueueName, ReaderPid,
+ QueueName, ConnectionPid,
fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of
{ok, MessageCount,
Msg = {_QName, QPid, _MsgId, Redelivered,
@@ -690,7 +691,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
no_ack = NoAck,
exclusive = ExclusiveConsume,
nowait = NoWait},
- _, State = #ch{reader_pid = ReaderPid,
+ _, State = #ch{connection_pid = ConnectionPid,
limiter_pid = LimiterPid,
consumer_mapping = ConsumerMapping}) ->
case dict:find(ConsumerTag, ConsumerMapping) of
@@ -707,7 +708,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
%% behalf. This is for symmetry with basic.cancel - see
%% the comment in that method for why.
case rabbit_amqqueue:with_exclusive_access_or_die(
- QueueName, ReaderPid,
+ QueueName, ConnectionPid,
fun (Q) ->
{rabbit_amqqueue:basic_consume(
Q, NoAck, self(), LimiterPid,
@@ -922,10 +923,10 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
nowait = NoWait,
arguments = Args} = Declare,
_, State = #ch{virtual_host = VHostPath,
- reader_pid = ReaderPid,
+ connection_pid = ConnectionPid,
queue_collector_pid = CollectorPid}) ->
Owner = case ExclusiveDeclare of
- true -> ReaderPid;
+ true -> ConnectionPid;
false -> none
end,
ActualNameBin = case QueueNameBin of
@@ -967,14 +968,14 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
handle_method(#'queue.declare'{queue = QueueNameBin,
passive = true,
nowait = NoWait},
- _, State = #ch{virtual_host = VHostPath,
- reader_pid = ReaderPid}) ->
+ _, State = #ch{virtual_host = VHostPath,
+ connection_pid = ConnectionPid}) ->
QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin),
check_configure_permitted(QueueName, State),
{{ok, MessageCount, ConsumerCount}, #amqqueue{} = Q} =
rabbit_amqqueue:with_or_die(
QueueName, fun (Q) -> {rabbit_amqqueue:stat(Q), Q} end),
- ok = rabbit_amqqueue:check_exclusive_access(Q, ReaderPid),
+ ok = rabbit_amqqueue:check_exclusive_access(Q, ConnectionPid),
return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount,
State);
@@ -982,11 +983,11 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
if_unused = IfUnused,
if_empty = IfEmpty,
nowait = NoWait},
- _, State = #ch{reader_pid = ReaderPid}) ->
+ _, State = #ch{connection_pid = ConnectionPid}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_configure_permitted(QueueName, State),
case rabbit_amqqueue:with_exclusive_access_or_die(
- QueueName, ReaderPid,
+ QueueName, ConnectionPid,
fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of
{error, in_use} ->
rabbit_misc:protocol_error(
@@ -1018,11 +1019,11 @@ handle_method(#'queue.unbind'{queue = QueueNameBin,
handle_method(#'queue.purge'{queue = QueueNameBin,
nowait = NoWait},
- _, State = #ch{reader_pid = ReaderPid}) ->
+ _, State = #ch{connection_pid = ConnectionPid}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_read_permitted(QueueName, State),
{ok, PurgedMessageCount} = rabbit_amqqueue:with_exclusive_access_or_die(
- QueueName, ReaderPid,
+ QueueName, ConnectionPid,
fun (Q) -> rabbit_amqqueue:purge(Q) end),
return_ok(State, NoWait,
#'queue.purge_ok'{message_count = PurgedMessageCount});
@@ -1142,8 +1143,8 @@ handle_consuming_queue_down(MRef, ConsumerTag,
binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
RoutingKey, Arguments, ReturnMethod, NoWait,
- State = #ch{virtual_host = VHostPath,
- reader_pid = ReaderPid}) ->
+ State = #ch{virtual_host = VHostPath,
+ connection_pid = ConnectionPid }) ->
%% FIXME: connection exception (!) on failure??
%% (see rule named "failure" in spec-XML)
%% FIXME: don't allow binding to internal exchanges -
@@ -1159,7 +1160,8 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
key = ActualRoutingKey,
args = Arguments},
fun (_X, Q = #amqqueue{}) ->
- try rabbit_amqqueue:check_exclusive_access(Q, ReaderPid)
+ try rabbit_amqqueue:check_exclusive_access(Q,
+ ConnectionPid)
catch exit:Reason -> {error, Reason}
end;
(_X, #exchange{}) ->