summaryrefslogtreecommitdiff
path: root/src/rabbit_binding.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_binding.erl')
-rw-r--r--src/rabbit_binding.erl346
1 files changed, 204 insertions, 142 deletions
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 19150fa9..1af213c4 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -33,29 +33,35 @@
-include("rabbit.hrl").
-export([recover/0, exists/1, add/1, remove/1, add/2, remove/2, list/1]).
--export([list_for_exchange/1, list_for_queue/1, list_for_exchange_and_queue/2]).
+-export([list_for_source/1, list_for_destination/1,
+ list_for_source_and_destination/2]).
+-export([new_deletions/0, combine_deletions/2, add_deletion/3,
+ process_deletions/1]).
-export([info_keys/0, info/1, info/2, info_all/1, info_all/2]).
%% these must all be run inside a mnesia tx
--export([has_for_exchange/1, remove_for_exchange/1,
- remove_for_queue/1, remove_transient_for_queue/1]).
+-export([has_for_source/1, remove_for_source/1,
+ remove_for_destination/1, remove_transient_for_destination/1]).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
--export_type([key/0]).
+-export_type([key/0, deletions/0]).
-type(key() :: binary()).
--type(bind_errors() :: rabbit_types:error('queue_not_found' |
- 'exchange_not_found' |
- 'exchange_and_queue_not_found')).
+-type(bind_errors() :: rabbit_types:error('source_not_found' |
+ 'destination_not_found' |
+ 'source_and_destination_not_found')).
-type(bind_res() :: 'ok' | bind_errors()).
-type(inner_fun() ::
- fun((rabbit_types:exchange(), queue()) ->
+ fun((rabbit_types:exchange(),
+ rabbit_types:exchange() | rabbit_types:amqqueue()) ->
rabbit_types:ok_or_error(rabbit_types:amqp_error()))).
-type(bindings() :: [rabbit_types:binding()]).
+-opaque(deletions() :: dict:dictionary()).
+
-spec(recover/0 :: () -> [rabbit_types:binding()]).
-spec(exists/1 :: (rabbit_types:binding()) -> boolean() | bind_errors()).
-spec(add/1 :: (rabbit_types:binding()) -> bind_res()).
@@ -65,10 +71,13 @@
-spec(remove/2 :: (rabbit_types:binding(), inner_fun()) ->
bind_res() | rabbit_types:error('binding_not_found')).
-spec(list/1 :: (rabbit_types:vhost()) -> bindings()).
--spec(list_for_exchange/1 :: (rabbit_exchange:name()) -> bindings()).
--spec(list_for_queue/1 :: (rabbit_amqqueue:name()) -> bindings()).
--spec(list_for_exchange_and_queue/2 ::
- (rabbit_exchange:name(), rabbit_amqqueue:name()) -> bindings()).
+-spec(list_for_source/1 ::
+ (rabbit_types:binding_source()) -> bindings()).
+-spec(list_for_destination/1 ::
+ (rabbit_types:binding_destination()) -> bindings()).
+-spec(list_for_source_and_destination/2 ::
+ (rabbit_types:binding_source(), rabbit_types:binding_destination()) ->
+ bindings()).
-spec(info_keys/0 :: () -> [rabbit_types:info_key()]).
-spec(info/1 :: (rabbit_types:binding()) -> [rabbit_types:info()]).
-spec(info/2 :: (rabbit_types:binding(), [rabbit_types:info_key()]) ->
@@ -76,18 +85,27 @@
-spec(info_all/1 :: (rabbit_types:vhost()) -> [[rabbit_types:info()]]).
-spec(info_all/2 ::(rabbit_types:vhost(), [rabbit_types:info_key()])
-> [[rabbit_types:info()]]).
--spec(has_for_exchange/1 :: (rabbit_exchange:name()) -> boolean()).
--spec(remove_for_exchange/1 :: (rabbit_exchange:name()) -> bindings()).
--spec(remove_for_queue/1 ::
- (rabbit_amqqueue:name()) -> fun (() -> any())).
--spec(remove_transient_for_queue/1 ::
- (rabbit_amqqueue:name()) -> fun (() -> any())).
+-spec(has_for_source/1 :: (rabbit_types:binding_source()) -> boolean()).
+-spec(remove_for_source/1 :: (rabbit_types:binding_source()) -> bindings()).
+-spec(remove_for_destination/1 ::
+ (rabbit_types:binding_destination()) -> deletions()).
+-spec(remove_transient_for_destination/1 ::
+ (rabbit_types:binding_destination()) -> deletions()).
+-spec(process_deletions/1 :: (deletions()) -> 'ok').
+-spec(combine_deletions/2 :: (deletions(), deletions()) -> deletions()).
+-spec(add_deletion/3 :: (rabbit_exchange:name(),
+ {'undefined' | rabbit_types:binding_source(),
+ 'deleted' | 'not_deleted',
+ deletions()}, deletions()) -> deletions()).
+-spec(new_deletions/0 :: () -> deletions()).
-endif.
%%----------------------------------------------------------------------------
--define(INFO_KEYS, [exchange_name, queue_name, routing_key, arguments]).
+-define(INFO_KEYS, [source_name, source_kind,
+ destination_name, destination_kind,
+ routing_key, arguments]).
recover() ->
rabbit_misc:table_fold(
@@ -101,36 +119,34 @@ recover() ->
exists(Binding) ->
binding_action(
Binding,
- fun (_X, _Q, B) -> mnesia:read({rabbit_route, B}) /= [] end).
+ fun (_Src, _Dst, B) -> mnesia:read({rabbit_route, B}) /= [] end).
-add(Binding) -> add(Binding, fun (_X, _Q) -> ok end).
+add(Binding) -> add(Binding, fun (_Src, _Dst) -> ok end).
-remove(Binding) -> remove(Binding, fun (_X, _Q) -> ok end).
+remove(Binding) -> remove(Binding, fun (_Src, _Dst) -> ok end).
add(Binding, InnerFun) ->
case binding_action(
Binding,
- fun (X, Q, B) ->
+ fun (Src, Dst, B) ->
%% this argument is used to check queue exclusivity;
%% in general, we want to fail on that in preference to
%% anything else
- case InnerFun(X, Q) of
+ case InnerFun(Src, Dst) of
ok ->
case mnesia:read({rabbit_route, B}) of
- [] -> Durable = (X#exchange.durable andalso
- Q#amqqueue.durable),
- ok = sync_binding(
- B, Durable,
+ [] -> ok = sync_binding(
+ B, all_durable([Src, Dst]),
fun mnesia:write/3),
- {new, X, B};
- [_] -> {existing, X, B}
+ {new, Src, B};
+ [_] -> {existing, Src, B}
end;
{error, _} = E ->
E
end
end) of
- {new, X = #exchange{ type = Type }, B} ->
- ok = (type_to_module(Type)):add_binding(X, B),
+ {new, Src = #exchange{ type = Type }, B} ->
+ ok = (type_to_module(Type)):add_binding(Src, B),
rabbit_event:notify(binding_created, info(B));
{existing, _, _} ->
ok;
@@ -141,61 +157,55 @@ add(Binding, InnerFun) ->
remove(Binding, InnerFun) ->
case binding_action(
Binding,
- fun (X, Q, B) ->
+ fun (Src, Dst, B) ->
case mnesia:match_object(rabbit_route, #route{binding = B},
write) of
- [] -> {error, binding_not_found};
- [_] -> case InnerFun(X, Q) of
- ok ->
- Durable = (X#exchange.durable andalso
- Q#amqqueue.durable),
- ok = sync_binding(
- B, Durable,
- fun mnesia:delete_object/3),
- Deleted =
- rabbit_exchange:maybe_auto_delete(X),
- {{Deleted, X}, B};
- {error, _} = E ->
- E
- end
+ [] ->
+ {error, binding_not_found};
+ [_] ->
+ case InnerFun(Src, Dst) of
+ ok ->
+ ok = sync_binding(
+ B, all_durable([Src, Dst]),
+ fun mnesia:delete_object/3),
+ {ok,
+ maybe_auto_delete(B#binding.source,
+ [B], new_deletions())};
+ {error, _} = E ->
+ E
+ end
end
end) of
{error, _} = Err ->
Err;
- {{IsDeleted, X = #exchange{ type = Type }}, B} ->
- Module = type_to_module(Type),
- case IsDeleted of
- auto_deleted -> ok = Module:delete(X, [B]);
- not_deleted -> ok = Module:remove_bindings(X, [B])
- end,
- rabbit_event:notify(binding_deleted, info(B)),
- ok
+ {ok, Deletions} ->
+ ok = process_deletions(Deletions)
end.
list(VHostPath) ->
- Route = #route{binding = #binding{
- exchange_name = rabbit_misc:r(VHostPath, exchange),
- queue_name = rabbit_misc:r(VHostPath, queue),
- _ = '_'},
+ VHostResource = rabbit_misc:r(VHostPath, '_'),
+ Route = #route{binding = #binding{source = VHostResource,
+ destination = VHostResource,
+ _ = '_'},
_ = '_'},
[B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route,
Route)].
-list_for_exchange(XName) ->
- Route = #route{binding = #binding{exchange_name = XName, _ = '_'}},
+list_for_source(SrcName) ->
+ Route = #route{binding = #binding{source = SrcName, _ = '_'}},
[B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route,
Route)].
-list_for_queue(QueueName) ->
- Route = #route{binding = #binding{queue_name = QueueName, _ = '_'}},
+list_for_destination(DstName) ->
+ Route = #route{binding = #binding{destination = DstName, _ = '_'}},
[reverse_binding(B) || #reverse_route{reverse_binding = B} <-
mnesia:dirty_match_object(rabbit_reverse_route,
reverse_route(Route))].
-list_for_exchange_and_queue(XName, QueueName) ->
- Route = #route{binding = #binding{exchange_name = XName,
- queue_name = QueueName,
- _ = '_'}},
+list_for_source_and_destination(SrcName, DstName) ->
+ Route = #route{binding = #binding{source = SrcName,
+ destination = DstName,
+ _ = '_'}},
[B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route,
Route)].
@@ -208,10 +218,12 @@ map(VHostPath, F) ->
infos(Items, B) -> [{Item, i(Item, B)} || Item <- Items].
-i(exchange_name, #binding{exchange_name = XName}) -> XName;
-i(queue_name, #binding{queue_name = QName}) -> QName;
-i(routing_key, #binding{key = RoutingKey}) -> RoutingKey;
-i(arguments, #binding{args = Arguments}) -> Arguments;
+i(source_name, #binding{source = SrcName}) -> SrcName#resource.name;
+i(source_kind, #binding{source = SrcName}) -> SrcName#resource.kind;
+i(destination_name, #binding{destination = DstName}) -> DstName#resource.name;
+i(destination_kind, #binding{destination = DstName}) -> DstName#resource.kind;
+i(routing_key, #binding{key = RoutingKey}) -> RoutingKey;
+i(arguments, #binding{args = Arguments}) -> Arguments;
i(Item, _) -> throw({bad_argument, Item}).
info(B = #binding{}) -> infos(?INFO_KEYS, B).
@@ -222,14 +234,14 @@ info_all(VHostPath) -> map(VHostPath, fun (B) -> info(B) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (B) -> info(B, Items) end).
-has_for_exchange(XName) ->
- Match = #route{binding = #binding{exchange_name = XName, _ = '_'}},
+has_for_source(SrcName) ->
+ Match = #route{binding = #binding{source = SrcName, _ = '_'}},
%% we need to check for durable routes here too in case a bunch of
%% routes to durable queues have been removed temporarily as a
%% result of a node failure
contains(rabbit_route, Match) orelse contains(rabbit_durable_route, Match).
-remove_for_exchange(XName) ->
+remove_for_source(SrcName) ->
[begin
ok = mnesia:delete_object(rabbit_reverse_route,
reverse_route(Route), write),
@@ -237,26 +249,31 @@ remove_for_exchange(XName) ->
Route#route.binding
end || Route <- mnesia:match_object(
rabbit_route,
- #route{binding = #binding{exchange_name = XName,
- _ = '_'}},
+ #route{binding = #binding{source = SrcName,
+ _ = '_'}},
write)].
-remove_for_queue(QueueName) ->
- remove_for_queue(QueueName, fun delete_forward_routes/1).
+remove_for_destination(DstName) ->
+ remove_for_destination(DstName, fun delete_forward_routes/1).
-remove_transient_for_queue(QueueName) ->
- remove_for_queue(QueueName, fun delete_transient_forward_routes/1).
+remove_transient_for_destination(DstName) ->
+ remove_for_destination(DstName, fun delete_transient_forward_routes/1).
%%----------------------------------------------------------------------------
-binding_action(Binding = #binding{exchange_name = XName,
- queue_name = QueueName,
- args = Arguments}, Fun) ->
- call_with_exchange_and_queue(
- XName, QueueName,
- fun (X, Q) ->
+all_durable(Resources) ->
+ lists:all(fun (#exchange{durable = D}) -> D;
+ (#amqqueue{durable = D}) -> D
+ end, Resources).
+
+binding_action(Binding = #binding{source = SrcName,
+ destination = DstName,
+ args = Arguments}, Fun) ->
+ call_with_source_and_destination(
+ SrcName, DstName,
+ fun (Src, Dst) ->
SortedArgs = rabbit_misc:sort_field_table(Arguments),
- Fun(X, Q, Binding#binding{args = SortedArgs})
+ Fun(Src, Dst, Binding#binding{args = SortedArgs})
end).
sync_binding(Binding, Durable, Fun) ->
@@ -270,17 +287,22 @@ sync_binding(Binding, Durable, Fun) ->
ok = Fun(rabbit_reverse_route, ReverseRoute, write),
ok.
-call_with_exchange_and_queue(XName, QueueName, Fun) ->
+call_with_source_and_destination(SrcName, DstName, Fun) ->
+ SrcTable = table_for_resource(SrcName),
+ DstTable = table_for_resource(DstName),
rabbit_misc:execute_mnesia_transaction(
- fun () -> case {mnesia:read({rabbit_exchange, XName}),
- mnesia:read({rabbit_queue, QueueName})} of
- {[X], [Q]} -> Fun(X, Q);
- {[ ], [_]} -> {error, exchange_not_found};
- {[_], [ ]} -> {error, queue_not_found};
- {[ ], [ ]} -> {error, exchange_and_queue_not_found}
- end
+ fun () -> case {mnesia:read({SrcTable, SrcName}),
+ mnesia:read({DstTable, DstName})} of
+ {[Src], [Dst]} -> Fun(Src, Dst);
+ {[], [_] } -> {error, source_not_found};
+ {[_], [] } -> {error, destination_not_found};
+ {[], [] } -> {error, source_and_destination_not_found}
+ end
end).
+table_for_resource(#resource{kind = exchange}) -> rabbit_exchange;
+table_for_resource(#resource{kind = queue}) -> rabbit_queue.
+
%% Used with atoms from records; e.g., the type is expected to exist.
type_to_module(T) ->
{ok, Module} = rabbit_exchange_type_registry:lookup_module(T),
@@ -293,8 +315,8 @@ continue('$end_of_table') -> false;
continue({[_|_], _}) -> true;
continue({[], Continuation}) -> continue(mnesia:select(Continuation)).
-remove_for_queue(QueueName, FwdDeleteFun) ->
- DeletedBindings =
+remove_for_destination(DstName, FwdDeleteFun) ->
+ Bindings =
[begin
Route = reverse_route(ReverseRoute),
ok = FwdDeleteFun(Route),
@@ -304,40 +326,41 @@ remove_for_queue(QueueName, FwdDeleteFun) ->
end || ReverseRoute
<- mnesia:match_object(
rabbit_reverse_route,
- reverse_route(#route{binding = #binding{
- queue_name = QueueName,
- _ = '_'}}),
+ reverse_route(#route{
+ binding = #binding{
+ destination = DstName,
+ _ = '_'}}),
write)],
- Grouped = group_bindings_and_auto_delete(
- lists:keysort(#binding.exchange_name, DeletedBindings), []),
- fun () ->
- lists:foreach(
- fun ({{IsDeleted, X = #exchange{ type = Type }}, Bs}) ->
- Module = type_to_module(Type),
- case IsDeleted of
- auto_deleted -> Module:delete(X, Bs);
- not_deleted -> Module:remove_bindings(X, Bs)
- end
- end, Grouped)
- end.
+ group_bindings_fold(fun maybe_auto_delete/3, new_deletions(),
+ lists:keysort(#binding.source, Bindings)).
%% Requires that its input binding list is sorted in exchange-name
%% order, so that the grouping of bindings (for passing to
%% group_bindings_and_auto_delete1) works properly.
-group_bindings_and_auto_delete([], Acc) ->
+group_bindings_fold(_Fun, Acc, []) ->
Acc;
-group_bindings_and_auto_delete(
- [B = #binding{exchange_name = XName} | Bs], Acc) ->
- group_bindings_and_auto_delete(XName, Bs, [B], Acc).
-
-group_bindings_and_auto_delete(
- XName, [B = #binding{exchange_name = XName} | Bs], Bindings, Acc) ->
- group_bindings_and_auto_delete(XName, Bs, [B | Bindings], Acc);
-group_bindings_and_auto_delete(XName, Removed, Bindings, Acc) ->
- %% either Removed is [], or its head has a non-matching XName
- [X] = mnesia:read({rabbit_exchange, XName}),
- NewAcc = [{{rabbit_exchange:maybe_auto_delete(X), X}, Bindings} | Acc],
- group_bindings_and_auto_delete(Removed, NewAcc).
+group_bindings_fold(Fun, Acc, [B = #binding{source = SrcName} | Bs]) ->
+ group_bindings_fold(Fun, SrcName, Acc, Bs, [B]).
+
+group_bindings_fold(
+ Fun, SrcName, Acc, [B = #binding{source = SrcName} | Bs], Bindings) ->
+ group_bindings_fold(Fun, SrcName, Acc, Bs, [B | Bindings]);
+group_bindings_fold(Fun, SrcName, Acc, Removed, Bindings) ->
+ %% Either Removed is [], or its head has a non-matching SrcName.
+ group_bindings_fold(Fun, Fun(SrcName, Bindings, Acc), Removed).
+
+maybe_auto_delete(XName, Bindings, Deletions) ->
+ case rabbit_exchange:lookup(XName) of
+ {error, not_found} ->
+ add_deletion(XName, {undefined, not_deleted, Bindings}, Deletions);
+ {ok, X} ->
+ add_deletion(XName, {X, not_deleted, Bindings},
+ case rabbit_exchange:maybe_auto_delete(X) of
+ not_deleted -> Deletions;
+ {deleted, Deletions1} -> combine_deletions(
+ Deletions, Deletions1)
+ end)
+ end.
delete_forward_routes(Route) ->
ok = mnesia:delete_object(rabbit_route, Route, write),
@@ -358,20 +381,59 @@ reverse_route(#route{binding = Binding}) ->
reverse_route(#reverse_route{reverse_binding = Binding}) ->
#route{binding = reverse_binding(Binding)}.
-reverse_binding(#reverse_binding{exchange_name = XName,
- queue_name = QueueName,
- key = Key,
- args = Args}) ->
- #binding{exchange_name = XName,
- queue_name = QueueName,
- key = Key,
- args = Args};
-
-reverse_binding(#binding{exchange_name = XName,
- queue_name = QueueName,
- key = Key,
- args = Args}) ->
- #reverse_binding{exchange_name = XName,
- queue_name = QueueName,
- key = Key,
- args = Args}.
+reverse_binding(#reverse_binding{source = SrcName,
+ destination = DstName,
+ key = Key,
+ args = Args}) ->
+ #binding{source = SrcName,
+ destination = DstName,
+ key = Key,
+ args = Args};
+
+reverse_binding(#binding{source = SrcName,
+ destination = DstName,
+ key = Key,
+ args = Args}) ->
+ #reverse_binding{source = SrcName,
+ destination = DstName,
+ key = Key,
+ args = Args}.
+
+%% ----------------------------------------------------------------------------
+%% Binding / exchange deletion abstraction API
+%% ----------------------------------------------------------------------------
+
+anything_but( NotThis, NotThis, NotThis) -> NotThis;
+anything_but( NotThis, NotThis, This) -> This;
+anything_but( NotThis, This, NotThis) -> This;
+anything_but(_NotThis, This, This) -> This.
+
+new_deletions() -> dict:new().
+
+add_deletion(XName, Entry, Deletions) ->
+ dict:update(XName, fun (Entry1) -> merge_entry(Entry1, Entry) end,
+ Entry, Deletions).
+
+combine_deletions(Deletions1, Deletions2) ->
+ dict:merge(fun (_XName, Entry1, Entry2) -> merge_entry(Entry1, Entry2) end,
+ Deletions1, Deletions2).
+
+merge_entry({X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) ->
+ {anything_but(undefined, X1, X2),
+ anything_but(not_deleted, Deleted1, Deleted2),
+ [Bindings1 | Bindings2]}.
+
+process_deletions(Deletions) ->
+ dict:fold(
+ fun (_XName, {X = #exchange{ type = Type }, Deleted, Bindings}, ok) ->
+ FlatBindings = lists:flatten(Bindings),
+ [rabbit_event:notify(binding_deleted, info(B)) ||
+ B <- FlatBindings],
+ TypeModule = type_to_module(Type),
+ case Deleted of
+ not_deleted -> TypeModule:remove_bindings(X, FlatBindings);
+ deleted -> rabbit_event:notify(exchange_deleted,
+ [{name, X#exchange.name}]),
+ TypeModule:delete(X, FlatBindings)
+ end
+ end, ok, Deletions).