summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBen Hood <0x6e6562@gmail.com>2008-10-04 15:45:56 +0100
committerBen Hood <0x6e6562@gmail.com>2008-10-04 15:45:56 +0100
commit10e1445a78fc50f8a8b0228c4be40344a28a0bd9 (patch)
treed01d905bb534f0ef6633c6cd2b39f6cfffd5e290
parent6d27482ec87bbbc8d249e1754f219d388dec39a1 (diff)
parent6b67dfc96b6c9c79b1c95674854925603108fcb3 (diff)
downloadrabbitmq-server-bug19490.tar.gz
Merged 19250 into 19490bug19490
-rw-r--r--src/rabbit_channel.erl70
1 files changed, 47 insertions, 23 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 5cc07aed..4cce60e5 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -572,29 +572,18 @@ handle_method(#'queue.bind'{queue = QueueNameBin,
exchange = ExchangeNameBin,
routing_key = RoutingKey,
nowait = NoWait,
- arguments = Arguments},
- _, State = #ch{ virtual_host = VHostPath }) ->
- %% FIXME: connection exception (!) on failure?? (see rule named "failure" in spec-XML)
- %% FIXME: don't allow binding to internal exchanges - including the one named "" !
- QueueName = expand_queue_name_shortcut(QueueNameBin, State),
- ActualRoutingKey = expand_routing_key_shortcut(QueueNameBin, RoutingKey,
- State),
- ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
- case rabbit_amqqueue:add_binding(QueueName, ExchangeName,
- ActualRoutingKey, Arguments) of
- {error, queue_not_found} ->
- rabbit_misc:protocol_error(
- not_found, "no ~s", [rabbit_misc:rs(QueueName)]);
- {error, exchange_not_found} ->
- rabbit_misc:protocol_error(
- not_found, "no ~s", [rabbit_misc:rs(ExchangeName)]);
- {error, durability_settings_incompatible} ->
- rabbit_misc:protocol_error(
- not_allowed, "durability settings of ~s incompatible with ~s",
- [rabbit_misc:rs(QueueName), rabbit_misc:rs(ExchangeName)]);
- {ok, _BindingCount} ->
- return_ok(State, NoWait, #'queue.bind_ok'{})
- end;
+ arguments = Arguments}, _, State) ->
+ binding_action(fun rabbit_amqqueue:add_binding/4, ExchangeNameBin,
+ QueueNameBin, RoutingKey, Arguments, #'queue.bind_ok'{},
+ NoWait, State);
+
+handle_method(#'queue.unbind'{queue = QueueNameBin,
+ exchange = ExchangeNameBin,
+ routing_key = RoutingKey,
+ arguments = Arguments}, _, State) ->
+ binding_action(fun rabbit_amqqueue:delete_binding/4, ExchangeNameBin,
+ QueueNameBin, RoutingKey, Arguments,
+ #'queue.unbind_ok'{}, State);
handle_method(#'queue.purge'{queue = QueueNameBin,
nowait = NoWait},
@@ -636,6 +625,41 @@ handle_method(_MethodRecord, _Content, _State) ->
%%----------------------------------------------------------------------------
+binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
+ ReturnMethod, State) ->
+ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
+ ReturnMethod, false, State).
+
+binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
+ ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath}) ->
+ %% FIXME: connection exception (!) on failure??
+ %% (see rule named "failure" in spec-XML)
+ %% FIXME: don't allow binding to internal exchanges -
+ %% including the one named "" !
+ QueueName = expand_queue_name_shortcut(QueueNameBin, State),
+ ActualRoutingKey = expand_routing_key_shortcut(QueueNameBin, RoutingKey,
+ State),
+ ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
+ case Fun(QueueName, ExchangeName, ActualRoutingKey, Arguments) of
+ {error, queue_not_found} ->
+ rabbit_misc:protocol_error(
+ not_found, "no ~s", [rabbit_misc:rs(QueueName)]);
+ {error, exchange_not_found} ->
+ rabbit_misc:protocol_error(
+ not_found, "no ~s", [rabbit_misc:rs(ExchangeName)]);
+ {error, binding_not_found} ->
+ rabbit_misc:protocol_error(
+ not_found, "no binding ~s between exhange ~s and queue ~s",
+ [RoutingKey, rabbit_misc:rs(ExchangeName),
+ rabbit_misc:rs(QueueName)]);
+ {error, durability_settings_incompatible} ->
+ rabbit_misc:protocol_error(
+ not_allowed, "durability settings of ~s incompatible with ~s",
+ [rabbit_misc:rs(QueueName), rabbit_misc:rs(ExchangeName)]);
+ {ok, _BindingCount} ->
+ return_ok(State, NoWait, ReturnMethod)
+ end.
+
publish(Mandatory, Immediate, Message, QPids,
State = #ch{transaction_id = TxnKey, writer_pid = WriterPid}) ->
Handled = deliver(QPids, Mandatory, Immediate, TxnKey,