diff options
Diffstat (limited to 'src/rabbit_exchange.erl')
-rw-r--r-- | src/rabbit_exchange.erl | 64 |
1 files changed, 40 insertions, 24 deletions
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 0e8b52ee..5ab3718d 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -32,7 +32,6 @@ list_vhost_exchanges/1, simple_publish/6, simple_publish/3, route/2]). --export([add_binding/1, delete_binding/1]). -export([add_binding/4, delete_binding/4]). -export([delete/2]). -export([delete_bindings/1]). @@ -67,13 +66,13 @@ -spec(simple_publish/3 :: (bool(), bool(), message()) -> publish_res()). -spec(route/2 :: (exchange(), routing_key()) -> [pid()]). -spec(add_binding/1 :: (binding()) -> 'ok' | not_found() | - {'error', 'durability_settings_incompatible'}). + {'error', 'durability_settings_incompatible'}). -spec(delete_binding/1 :: (binding()) -> 'ok' | not_found()). -spec(add_binding/4 :: - (queue_name(), exchange_name(), routing_key(), amqp_table()) -> + (exchange_name(), queue_name(), routing_key(), amqp_table()) -> bind_res() | {'error', 'durability_settings_incompatible'}). -spec(delete_binding/4 :: - (queue_name(), exchange_name(), routing_key(), amqp_table()) -> + (exchange_name(), queue_name(), routing_key(), amqp_table()) -> bind_res() | {'error', 'binding_not_found'}). -spec(delete_bindings/1 :: (queue_name()) -> 'ok' | not_found()). -spec(topic_matches/2 :: (binary(), binary()) -> bool()). @@ -85,7 +84,8 @@ %%---------------------------------------------------------------------------- recover() -> - % TODO: These two functions share commonalities - maybe a refactoring target + % TODO: These two functions share commonalities, hence + % maybe a refactoring target ok = recover_durable_exchanges(), ok = recover_durable_routes(), ok. @@ -164,7 +164,8 @@ list_vhost_exchanges(VHostPath) -> #exchange{name = rabbit_misc:r(VHostPath, exchange), _ = '_'}). routes_for_exchange(Name) -> - qlc:e(qlc:q([R || R = #route{binding = #binding{exchange_name = N}} <- mnesia:table(route), + qlc:e(qlc:q([R || R = #route{binding = #binding{exchange_name = N}} + <- mnesia:table(route), N == Name])). %% Usable by Erlang code that wants to publish messages. @@ -213,13 +214,19 @@ route(#exchange{name = Name, type = topic}, RoutingKey) -> topic_matches(BindingKey, RoutingKey)]), lookup_qpids(mnesia:async_dirty(fun qlc:e/1, [Query])); -route(X = #exchange{name = Name}, RoutingKey) -> +route(X = #exchange{type = fanout}, _) -> + route_internal(X, '_'); + +route(X = #exchange{type = direct}, RoutingKey) -> + route_internal(X, RoutingKey). + +route_internal(X = #exchange{name = Name}, RoutingKey) -> MatchHead = #route{binding = #binding{exchange_name = Name, key = RoutingKey, queue_name = '$1'}}, rabbit_cache:read_through({X, RoutingKey}, - fun() -> lookup_qpids(mnesia:dirty_select(route, [{MatchHead, [], ['$1']}])) end). - + fun() -> lookup_qpids(mnesia:dirty_select(route, + [{MatchHead, [], ['$1']}])) end). lookup_qpids(Queues) -> sets:fold( @@ -228,10 +235,12 @@ lookup_qpids(Queues) -> [QPid | Acc] end, [], sets:from_list(Queues)). -% TODO: Should all of the route and binding management not be refactored to it's own module -% Especially seeing as unbind will have to be implemented for 0.91 ? +% TODO: Should all of the route and binding management not be refactored to +% it's own module, especially seeing as unbind will have to be implemented +% for 0.91 ? delete_routes(QueueName) -> - Binding = #binding{queue_name = QueueName, exchange_name = '_', key = '_'}, + Binding = #binding{queue_name = QueueName, + exchange_name = '_', key = '_'}, {Route, ReverseRoute} = route_with_reverse(Binding), ok = mnesia:delete_object(Route), ok = mnesia:delete_object(ReverseRoute), @@ -251,17 +260,18 @@ call_with_exchange_and_queue(#binding{exchange_name = Exchange, end. -add_binding(QueueName, ExchangeName, RoutingKey, _Arguments) -> +add_binding(ExchangeName, QueueName, RoutingKey, _Arguments) -> Binding = #binding{exchange_name = ExchangeName, key = RoutingKey, queue_name = QueueName}, - rabbit_misc:execute_mnesia_transaction(fun add_binding/1, [Binding]). + rabbit_misc:execute_mnesia_transaction(fun() -> add_binding(Binding) end). -delete_binding(QueueName, ExchangeName, RoutingKey, _Arguments) -> +delete_binding(ExchangeName, QueueName, RoutingKey, _Arguments) -> Binding = #binding{exchange_name = ExchangeName, key = RoutingKey, queue_name = QueueName}, - rabbit_misc:execute_mnesia_transaction(fun delete_binding/1, [Binding]). + rabbit_misc:execute_mnesia_transaction(fun() -> delete_binding(Binding) + end). % Must be called from within a transaction add_binding(Binding) -> @@ -270,7 +280,8 @@ add_binding(Binding) -> fun (X, Q) -> if Q#amqqueue.durable and not(X#exchange.durable) -> {error, durability_settings_incompatible}; true -> - ok = sync_binding(Binding, Q#amqqueue.durable, fun mnesia:write/3) + ok = sync_binding(Binding, Q#amqqueue.durable, + fun mnesia:write/3) end end). @@ -278,11 +289,13 @@ add_binding(Binding) -> delete_binding(Binding) -> call_with_exchange_and_queue( Binding, - fun (X, Q) -> ok = sync_binding(Binding, Q#amqqueue.durable, fun mnesia:delete_object/3), + fun (X, Q) -> ok = sync_binding(Binding, Q#amqqueue.durable, + fun mnesia:delete_object/3), maybe_auto_delete(X) end). -% TODO: Should the exported function not get renamed to delete_routes instead of this indirection? +% TODO: Should the exported function not get renamed to delete_routes +% instead of this indirection? delete_bindings(QueueName) -> delete_routes(QueueName). @@ -295,7 +308,8 @@ maybe_auto_delete(#exchange{name = ExchangeName, auto_delete = true}) -> Other -> Other end. -reverse_binding(#binding{exchange_name = Exchange, key = Key, queue_name = Queue}) -> +reverse_binding(#binding{exchange_name = Exchange, key = Key, + queue_name = Queue}) -> #reverse_binding{exchange_name = Exchange, key = Key, queue_name = Queue}. %% Must run within a transaction. @@ -304,14 +318,15 @@ sync_binding(Binding, Durable, Fun) -> true -> Fun(durable_routes, #route{binding = Binding}, write); false -> ok end, - [ok, ok] = [Fun(element(1, R), R, write) || R <- tuple_to_list(route_with_reverse(Binding))], + [ok, ok] = [Fun(element(1, R), R, write) || R + <- tuple_to_list(route_with_reverse(Binding))], ok. route_with_reverse(#route{binding = Binding}) -> route_with_reverse(Binding); route_with_reverse(Binding = #binding{}) -> - Route = #route{ binding = Binding }, - ReverseRoute = #reverse_route{ reverse_binding = reverse_binding(Binding) }, + Route = #route{binding = Binding}, + ReverseRoute = #reverse_route{reverse_binding = reverse_binding(Binding)}, {Route, ReverseRoute}. split_topic_key(Key) -> @@ -361,7 +376,8 @@ do_internal_delete(ExchangeName, Routes) -> case mnesia:wread({exchange, ExchangeName}) of [] -> {error, not_found}; _ -> - lists:foreach(fun (R) -> ok = mnesia:delete_object(R) end, Routes), + lists:foreach(fun (R) -> ok = mnesia:delete_object(R) end, + Routes), ok = mnesia:delete({durable_exchanges, ExchangeName}), ok = mnesia:delete({exchange, ExchangeName}) end. |