summaryrefslogtreecommitdiff
path: root/src/rabbit_binding.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_binding.erl')
-rw-r--r--src/rabbit_binding.erl124
1 files changed, 67 insertions, 57 deletions
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index bb29580f..ec1816ca 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -33,11 +33,12 @@
-include("rabbit.hrl").
-export([recover/0, exists/1, add/1, remove/1, add/2, remove/2, list/1]).
--export([list_for_exchange/1, list_for_queue/1, list_for_exchange_and_queue/2]).
+-export([list_for_exchange/1, list_for_destination/1,
+ list_for_exchange_and_destination/2]).
-export([info_keys/0, info/1, info/2, info_all/1, info_all/2]).
%% these must all be run inside a mnesia tx
-export([has_for_exchange/1, remove_for_exchange/1,
- remove_for_queue/1, remove_transient_for_queue/1]).
+ remove_for_destination/1, remove_transient_for_destination/1]).
%%----------------------------------------------------------------------------
@@ -66,9 +67,11 @@
bind_res() | rabbit_types:error('binding_not_found')).
-spec(list/1 :: (rabbit_types:vhost()) -> bindings()).
-spec(list_for_exchange/1 :: (rabbit_exchange:name()) -> bindings()).
--spec(list_for_queue/1 :: (rabbit_amqqueue:name()) -> bindings()).
--spec(list_for_exchange_and_queue/2 ::
- (rabbit_exchange:name(), rabbit_amqqueue:name()) -> bindings()).
+-spec(list_for_destination/1 ::
+ (rabbit_amqqueue:name()|rabbit_exchange:name()) -> bindings()).
+-spec(list_for_exchange_and_destination/2 ::
+ (rabbit_exchange:name(),
+ rabbit_amqqueue:name() | rabbit_exchange:name()) -> bindings()).
-spec(info_keys/0 :: () -> [rabbit_types:info_key()]).
-spec(info/1 :: (rabbit_types:binding()) -> [rabbit_types:info()]).
-spec(info/2 :: (rabbit_types:binding(), [rabbit_types:info_key()]) ->
@@ -78,16 +81,16 @@
-> [[rabbit_types:info()]]).
-spec(has_for_exchange/1 :: (rabbit_exchange:name()) -> boolean()).
-spec(remove_for_exchange/1 :: (rabbit_exchange:name()) -> bindings()).
--spec(remove_for_queue/1 ::
- (rabbit_amqqueue:name()) -> fun (() -> any())).
--spec(remove_transient_for_queue/1 ::
- (rabbit_amqqueue:name()) -> fun (() -> any())).
+-spec(remove_for_destination/1 ::
+ (rabbit_amqqueue:name() | rabbit_exchange:name()) -> fun (() -> any())).
+-spec(remove_transient_for_destination/1 ::
+ (rabbit_amqqueue:name() | rabbit_exchange:name()) -> fun (() -> any())).
-endif.
%%----------------------------------------------------------------------------
--define(INFO_KEYS, [exchange_name, queue_name, routing_key, arguments]).
+-define(INFO_KEYS, [exchange_name, destination, routing_key, arguments]).
recover() ->
rabbit_misc:table_fold(
@@ -101,26 +104,24 @@ recover() ->
exists(Binding) ->
binding_action(
Binding,
- fun (_X, _Q, B) -> mnesia:read({rabbit_route, B}) /= [] end).
+ fun (_X, _D, B) -> mnesia:read({rabbit_route, B}) /= [] end).
-add(Binding) -> add(Binding, fun (_X, _Q) -> ok end).
+add(Binding) -> add(Binding, fun (_X, _D) -> ok end).
-remove(Binding) -> remove(Binding, fun (_X, _Q) -> ok end).
+remove(Binding) -> remove(Binding, fun (_X, _D) -> ok end).
add(Binding, InnerFun) ->
case binding_action(
Binding,
- fun (X, Q, B) ->
+ fun (X, D, B) ->
%% this argument is used to check queue exclusivity;
%% in general, we want to fail on that in preference to
%% anything else
- case InnerFun(X, Q) of
+ case InnerFun(X, D) of
ok ->
case mnesia:read({rabbit_route, B}) of
- [] -> Durable = (X#exchange.durable andalso
- Q#amqqueue.durable),
- ok = sync_binding(
- B, Durable,
+ [] -> ok = sync_binding(
+ B, are_endpoints_durable(X, D),
fun mnesia:write/3),
{new, X, B};
[_] -> {existing, X, B}
@@ -141,16 +142,14 @@ add(Binding, InnerFun) ->
remove(Binding, InnerFun) ->
case binding_action(
Binding,
- fun (X, Q, B) ->
+ fun (X, D, B) ->
case mnesia:match_object(rabbit_route, #route{binding = B},
write) of
[] -> {error, binding_not_found};
- [_] -> case InnerFun(X, Q) of
+ [_] -> case InnerFun(X, D) of
ok ->
- Durable = (X#exchange.durable andalso
- Q#amqqueue.durable),
ok = sync_binding(
- B, Durable,
+ B, are_endpoints_durable(X, D),
fun mnesia:delete_object/3),
Deleted =
rabbit_exchange:maybe_auto_delete(X),
@@ -175,7 +174,7 @@ remove(Binding, InnerFun) ->
list(VHostPath) ->
Route = #route{binding = #binding{
exchange_name = rabbit_misc:r(VHostPath, exchange),
- queue_name = rabbit_misc:r(VHostPath, queue),
+ destination = rabbit_misc:r(VHostPath, '_'),
_ = '_'},
_ = '_'},
[B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route,
@@ -186,15 +185,15 @@ list_for_exchange(ExchangeName) ->
[B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route,
Route)].
-list_for_queue(QueueName) ->
- Route = #route{binding = #binding{queue_name = QueueName, _ = '_'}},
+list_for_destination(DestinationName) ->
+ Route = #route{binding = #binding{destination = DestinationName, _ = '_'}},
[reverse_binding(B) || #reverse_route{reverse_binding = B} <-
mnesia:dirty_match_object(rabbit_reverse_route,
reverse_route(Route))].
-list_for_exchange_and_queue(ExchangeName, QueueName) ->
+list_for_exchange_and_destination(ExchangeName, DestinationName) ->
Route = #route{binding = #binding{exchange_name = ExchangeName,
- queue_name = QueueName,
+ destination = DestinationName,
_ = '_'}},
[B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route,
Route)].
@@ -208,10 +207,10 @@ map(VHostPath, F) ->
infos(Items, B) -> [{Item, i(Item, B)} || Item <- Items].
-i(exchange_name, #binding{exchange_name = XName}) -> XName;
-i(queue_name, #binding{queue_name = QName}) -> QName;
-i(routing_key, #binding{key = RoutingKey}) -> RoutingKey;
-i(arguments, #binding{args = Arguments}) -> Arguments;
+i(exchange_name, #binding{exchange_name = XName}) -> XName;
+i(destination, #binding{destination = Destination}) -> Destination;
+i(routing_key, #binding{key = RoutingKey}) -> RoutingKey;
+i(arguments, #binding{args = Arguments}) -> Arguments;
i(Item, _) -> throw({bad_argument, Item}).
info(B = #binding{}) -> infos(?INFO_KEYS, B).
@@ -241,22 +240,28 @@ remove_for_exchange(ExchangeName) ->
_ = '_'}},
write)].
-remove_for_queue(QueueName) ->
- remove_for_queue(QueueName, fun delete_forward_routes/1).
+remove_for_destination(DestinationName) ->
+ remove_for_destination(DestinationName, fun delete_forward_routes/1).
-remove_transient_for_queue(QueueName) ->
- remove_for_queue(QueueName, fun delete_transient_forward_routes/1).
+remove_transient_for_destination(DestinationName) ->
+ remove_for_destination(DestinationName,
+ fun delete_transient_forward_routes/1).
%%----------------------------------------------------------------------------
+are_endpoints_durable(#exchange{durable = A}, #amqqueue{durable = B}) ->
+ A andalso B;
+are_endpoints_durable(#exchange{durable = A}, #exchange{durable = B}) ->
+ A andalso B.
+
binding_action(Binding = #binding{exchange_name = ExchangeName,
- queue_name = QueueName,
+ destination = Destination,
args = Arguments}, Fun) ->
- call_with_exchange_and_queue(
- ExchangeName, QueueName,
- fun (X, Q) ->
+ call_with_exchange_and_destination(
+ ExchangeName, Destination,
+ fun (X, D) ->
SortedArgs = rabbit_misc:sort_field_table(Arguments),
- Fun(X, Q, Binding#binding{args = SortedArgs})
+ Fun(X, D, Binding#binding{args = SortedArgs})
end).
sync_binding(Binding, Durable, Fun) ->
@@ -270,15 +275,19 @@ sync_binding(Binding, Durable, Fun) ->
ok = Fun(rabbit_reverse_route, ReverseRoute, write),
ok.
-call_with_exchange_and_queue(Exchange, Queue, Fun) ->
+call_with_exchange_and_destination(Exchange, Destination, Fun) ->
+ DestTable = case Destination#resource.kind of
+ queue -> rabbit_queue;
+ exchange -> rabbit_exchange
+ end,
rabbit_misc:execute_mnesia_transaction(
fun () -> case {mnesia:read({rabbit_exchange, Exchange}),
- mnesia:read({rabbit_queue, Queue})} of
- {[X], [Q]} -> Fun(X, Q);
- {[ ], [_]} -> {error, exchange_not_found};
- {[_], [ ]} -> {error, queue_not_found};
- {[ ], [ ]} -> {error, exchange_and_queue_not_found}
- end
+ mnesia:read({DestTable, Destination})} of
+ {[X], [D]} -> Fun(X, D);
+ {[ ], [_]} -> {error, exchange_not_found};
+ {[_], [ ]} -> {error, destination_not_found};
+ {[ ], [ ]} -> {error, exchange_and_destination_not_found}
+ end
end).
%% Used with atoms from records; e.g., the type is expected to exist.
@@ -293,7 +302,7 @@ continue('$end_of_table') -> false;
continue({[_|_], _}) -> true;
continue({[], Continuation}) -> continue(mnesia:select(Continuation)).
-remove_for_queue(QueueName, FwdDeleteFun) ->
+remove_for_destination(DestinationName, FwdDeleteFun) ->
DeletedBindings =
[begin
Route = reverse_route(ReverseRoute),
@@ -304,9 +313,10 @@ remove_for_queue(QueueName, FwdDeleteFun) ->
end || ReverseRoute
<- mnesia:match_object(
rabbit_reverse_route,
- reverse_route(#route{binding = #binding{
- queue_name = QueueName,
- _ = '_'}}),
+ reverse_route(#route{
+ binding = #binding{
+ destination = DestinationName,
+ _ = '_'}}),
write)],
Grouped = group_bindings_and_auto_delete(
lists:keysort(#binding.exchange_name, DeletedBindings), []),
@@ -360,19 +370,19 @@ reverse_route(#reverse_route{reverse_binding = Binding}) ->
#route{binding = reverse_binding(Binding)}.
reverse_binding(#reverse_binding{exchange_name = Exchange,
- queue_name = Queue,
+ destination = Destination,
key = Key,
args = Args}) ->
#binding{exchange_name = Exchange,
- queue_name = Queue,
+ destination = Destination,
key = Key,
args = Args};
reverse_binding(#binding{exchange_name = Exchange,
- queue_name = Queue,
+ destination = Destination,
key = Key,
args = Args}) ->
#reverse_binding{exchange_name = Exchange,
- queue_name = Queue,
+ destination = Destination,
key = Key,
args = Args}.