summaryrefslogtreecommitdiff
path: root/src/rabbit_exchange.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_exchange.erl')
-rw-r--r--src/rabbit_exchange.erl64
1 files changed, 40 insertions, 24 deletions
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 0e8b52ee..5ab3718d 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -32,7 +32,6 @@
list_vhost_exchanges/1,
simple_publish/6, simple_publish/3,
route/2]).
--export([add_binding/1, delete_binding/1]).
-export([add_binding/4, delete_binding/4]).
-export([delete/2]).
-export([delete_bindings/1]).
@@ -67,13 +66,13 @@
-spec(simple_publish/3 :: (bool(), bool(), message()) -> publish_res()).
-spec(route/2 :: (exchange(), routing_key()) -> [pid()]).
-spec(add_binding/1 :: (binding()) -> 'ok' | not_found() |
- {'error', 'durability_settings_incompatible'}).
+ {'error', 'durability_settings_incompatible'}).
-spec(delete_binding/1 :: (binding()) -> 'ok' | not_found()).
-spec(add_binding/4 ::
- (queue_name(), exchange_name(), routing_key(), amqp_table()) ->
+ (exchange_name(), queue_name(), routing_key(), amqp_table()) ->
bind_res() | {'error', 'durability_settings_incompatible'}).
-spec(delete_binding/4 ::
- (queue_name(), exchange_name(), routing_key(), amqp_table()) ->
+ (exchange_name(), queue_name(), routing_key(), amqp_table()) ->
bind_res() | {'error', 'binding_not_found'}).
-spec(delete_bindings/1 :: (queue_name()) -> 'ok' | not_found()).
-spec(topic_matches/2 :: (binary(), binary()) -> bool()).
@@ -85,7 +84,8 @@
%%----------------------------------------------------------------------------
recover() ->
- % TODO: These two functions share commonalities - maybe a refactoring target
+ % TODO: These two functions share commonalities, hence
+ % maybe a refactoring target
ok = recover_durable_exchanges(),
ok = recover_durable_routes(),
ok.
@@ -164,7 +164,8 @@ list_vhost_exchanges(VHostPath) ->
#exchange{name = rabbit_misc:r(VHostPath, exchange), _ = '_'}).
routes_for_exchange(Name) ->
- qlc:e(qlc:q([R || R = #route{binding = #binding{exchange_name = N}} <- mnesia:table(route),
+ qlc:e(qlc:q([R || R = #route{binding = #binding{exchange_name = N}}
+ <- mnesia:table(route),
N == Name])).
%% Usable by Erlang code that wants to publish messages.
@@ -213,13 +214,19 @@ route(#exchange{name = Name, type = topic}, RoutingKey) ->
topic_matches(BindingKey, RoutingKey)]),
lookup_qpids(mnesia:async_dirty(fun qlc:e/1, [Query]));
-route(X = #exchange{name = Name}, RoutingKey) ->
+route(X = #exchange{type = fanout}, _) ->
+ route_internal(X, '_');
+
+route(X = #exchange{type = direct}, RoutingKey) ->
+ route_internal(X, RoutingKey).
+
+route_internal(X = #exchange{name = Name}, RoutingKey) ->
MatchHead = #route{binding = #binding{exchange_name = Name,
key = RoutingKey,
queue_name = '$1'}},
rabbit_cache:read_through({X, RoutingKey},
- fun() -> lookup_qpids(mnesia:dirty_select(route, [{MatchHead, [], ['$1']}])) end).
-
+ fun() -> lookup_qpids(mnesia:dirty_select(route,
+ [{MatchHead, [], ['$1']}])) end).
lookup_qpids(Queues) ->
sets:fold(
@@ -228,10 +235,12 @@ lookup_qpids(Queues) ->
[QPid | Acc]
end, [], sets:from_list(Queues)).
-% TODO: Should all of the route and binding management not be refactored to it's own module
-% Especially seeing as unbind will have to be implemented for 0.91 ?
+% TODO: Should all of the route and binding management not be refactored to
+% it's own module, especially seeing as unbind will have to be implemented
+% for 0.91 ?
delete_routes(QueueName) ->
- Binding = #binding{queue_name = QueueName, exchange_name = '_', key = '_'},
+ Binding = #binding{queue_name = QueueName,
+ exchange_name = '_', key = '_'},
{Route, ReverseRoute} = route_with_reverse(Binding),
ok = mnesia:delete_object(Route),
ok = mnesia:delete_object(ReverseRoute),
@@ -251,17 +260,18 @@ call_with_exchange_and_queue(#binding{exchange_name = Exchange,
end.
-add_binding(QueueName, ExchangeName, RoutingKey, _Arguments) ->
+add_binding(ExchangeName, QueueName, RoutingKey, _Arguments) ->
Binding = #binding{exchange_name = ExchangeName,
key = RoutingKey,
queue_name = QueueName},
- rabbit_misc:execute_mnesia_transaction(fun add_binding/1, [Binding]).
+ rabbit_misc:execute_mnesia_transaction(fun() -> add_binding(Binding) end).
-delete_binding(QueueName, ExchangeName, RoutingKey, _Arguments) ->
+delete_binding(ExchangeName, QueueName, RoutingKey, _Arguments) ->
Binding = #binding{exchange_name = ExchangeName,
key = RoutingKey,
queue_name = QueueName},
- rabbit_misc:execute_mnesia_transaction(fun delete_binding/1, [Binding]).
+ rabbit_misc:execute_mnesia_transaction(fun() -> delete_binding(Binding)
+ end).
% Must be called from within a transaction
add_binding(Binding) ->
@@ -270,7 +280,8 @@ add_binding(Binding) ->
fun (X, Q) -> if Q#amqqueue.durable and not(X#exchange.durable) ->
{error, durability_settings_incompatible};
true ->
- ok = sync_binding(Binding, Q#amqqueue.durable, fun mnesia:write/3)
+ ok = sync_binding(Binding, Q#amqqueue.durable,
+ fun mnesia:write/3)
end
end).
@@ -278,11 +289,13 @@ add_binding(Binding) ->
delete_binding(Binding) ->
call_with_exchange_and_queue(
Binding,
- fun (X, Q) -> ok = sync_binding(Binding, Q#amqqueue.durable, fun mnesia:delete_object/3),
+ fun (X, Q) -> ok = sync_binding(Binding, Q#amqqueue.durable,
+ fun mnesia:delete_object/3),
maybe_auto_delete(X)
end).
-% TODO: Should the exported function not get renamed to delete_routes instead of this indirection?
+% TODO: Should the exported function not get renamed to delete_routes
+% instead of this indirection?
delete_bindings(QueueName) ->
delete_routes(QueueName).
@@ -295,7 +308,8 @@ maybe_auto_delete(#exchange{name = ExchangeName, auto_delete = true}) ->
Other -> Other
end.
-reverse_binding(#binding{exchange_name = Exchange, key = Key, queue_name = Queue}) ->
+reverse_binding(#binding{exchange_name = Exchange, key = Key,
+ queue_name = Queue}) ->
#reverse_binding{exchange_name = Exchange, key = Key, queue_name = Queue}.
%% Must run within a transaction.
@@ -304,14 +318,15 @@ sync_binding(Binding, Durable, Fun) ->
true -> Fun(durable_routes, #route{binding = Binding}, write);
false -> ok
end,
- [ok, ok] = [Fun(element(1, R), R, write) || R <- tuple_to_list(route_with_reverse(Binding))],
+ [ok, ok] = [Fun(element(1, R), R, write) || R
+ <- tuple_to_list(route_with_reverse(Binding))],
ok.
route_with_reverse(#route{binding = Binding}) ->
route_with_reverse(Binding);
route_with_reverse(Binding = #binding{}) ->
- Route = #route{ binding = Binding },
- ReverseRoute = #reverse_route{ reverse_binding = reverse_binding(Binding) },
+ Route = #route{binding = Binding},
+ ReverseRoute = #reverse_route{reverse_binding = reverse_binding(Binding)},
{Route, ReverseRoute}.
split_topic_key(Key) ->
@@ -361,7 +376,8 @@ do_internal_delete(ExchangeName, Routes) ->
case mnesia:wread({exchange, ExchangeName}) of
[] -> {error, not_found};
_ ->
- lists:foreach(fun (R) -> ok = mnesia:delete_object(R) end, Routes),
+ lists:foreach(fun (R) -> ok = mnesia:delete_object(R) end,
+ Routes),
ok = mnesia:delete({durable_exchanges, ExchangeName}),
ok = mnesia:delete({exchange, ExchangeName})
end.