diff options
author | Ben Hood <0x6e6562@gmail.com> | 2008-09-28 16:02:16 +0100 |
---|---|---|
committer | Ben Hood <0x6e6562@gmail.com> | 2008-09-28 16:02:16 +0100 |
commit | 99752d99163773658401670544811c19803d82bf (patch) | |
tree | a8b13ac3c6e9daad946ec32515d45e0bdeff0b42 | |
parent | 2556c8cd83b2d86f17656e2bf0971bc186ff8985 (diff) | |
parent | 387c9a72b16d491104aaf67069c20a987f3dc418 (diff) | |
download | rabbitmq-server-bug19336.tar.gz |
Re-merged 18776 into 19336bug19336
-rw-r--r-- | src/rabbit_amqqueue.erl | 2 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 2 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 64 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 11 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 3 |
5 files changed, 48 insertions, 34 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index df41a5c6..a0d8d308 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -159,7 +159,7 @@ recover_queue(Q) -> add_default_binding(#amqqueue{name = QueueName}) -> Exchange = rabbit_misc:r(QueueName, exchange, <<>>), RoutingKey = QueueName#resource.name, - rabbit_exchange:add_binding(QueueName, Exchange, RoutingKey, []), + rabbit_exchange:add_binding(Exchange, QueueName, RoutingKey, []), ok. lookup(Name) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index b8abc8cc..ddd0ecf4 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -580,7 +580,7 @@ handle_method(#'queue.bind'{queue = QueueNameBin, ActualRoutingKey = expand_routing_key_shortcut(QueueNameBin, RoutingKey, State), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), - case rabbit_exchange:add_binding(QueueName, ExchangeName, + case rabbit_exchange:add_binding(ExchangeName, QueueName, ActualRoutingKey, Arguments) of {error, queue_not_found} -> rabbit_misc:protocol_error( diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 0e8b52ee..5ab3718d 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -32,7 +32,6 @@ list_vhost_exchanges/1, simple_publish/6, simple_publish/3, route/2]). --export([add_binding/1, delete_binding/1]). -export([add_binding/4, delete_binding/4]). -export([delete/2]). -export([delete_bindings/1]). @@ -67,13 +66,13 @@ -spec(simple_publish/3 :: (bool(), bool(), message()) -> publish_res()). -spec(route/2 :: (exchange(), routing_key()) -> [pid()]). -spec(add_binding/1 :: (binding()) -> 'ok' | not_found() | - {'error', 'durability_settings_incompatible'}). + {'error', 'durability_settings_incompatible'}). -spec(delete_binding/1 :: (binding()) -> 'ok' | not_found()). -spec(add_binding/4 :: - (queue_name(), exchange_name(), routing_key(), amqp_table()) -> + (exchange_name(), queue_name(), routing_key(), amqp_table()) -> bind_res() | {'error', 'durability_settings_incompatible'}). -spec(delete_binding/4 :: - (queue_name(), exchange_name(), routing_key(), amqp_table()) -> + (exchange_name(), queue_name(), routing_key(), amqp_table()) -> bind_res() | {'error', 'binding_not_found'}). -spec(delete_bindings/1 :: (queue_name()) -> 'ok' | not_found()). -spec(topic_matches/2 :: (binary(), binary()) -> bool()). @@ -85,7 +84,8 @@ %%---------------------------------------------------------------------------- recover() -> - % TODO: These two functions share commonalities - maybe a refactoring target + % TODO: These two functions share commonalities, hence + % maybe a refactoring target ok = recover_durable_exchanges(), ok = recover_durable_routes(), ok. @@ -164,7 +164,8 @@ list_vhost_exchanges(VHostPath) -> #exchange{name = rabbit_misc:r(VHostPath, exchange), _ = '_'}). routes_for_exchange(Name) -> - qlc:e(qlc:q([R || R = #route{binding = #binding{exchange_name = N}} <- mnesia:table(route), + qlc:e(qlc:q([R || R = #route{binding = #binding{exchange_name = N}} + <- mnesia:table(route), N == Name])). %% Usable by Erlang code that wants to publish messages. @@ -213,13 +214,19 @@ route(#exchange{name = Name, type = topic}, RoutingKey) -> topic_matches(BindingKey, RoutingKey)]), lookup_qpids(mnesia:async_dirty(fun qlc:e/1, [Query])); -route(X = #exchange{name = Name}, RoutingKey) -> +route(X = #exchange{type = fanout}, _) -> + route_internal(X, '_'); + +route(X = #exchange{type = direct}, RoutingKey) -> + route_internal(X, RoutingKey). + +route_internal(X = #exchange{name = Name}, RoutingKey) -> MatchHead = #route{binding = #binding{exchange_name = Name, key = RoutingKey, queue_name = '$1'}}, rabbit_cache:read_through({X, RoutingKey}, - fun() -> lookup_qpids(mnesia:dirty_select(route, [{MatchHead, [], ['$1']}])) end). - + fun() -> lookup_qpids(mnesia:dirty_select(route, + [{MatchHead, [], ['$1']}])) end). lookup_qpids(Queues) -> sets:fold( @@ -228,10 +235,12 @@ lookup_qpids(Queues) -> [QPid | Acc] end, [], sets:from_list(Queues)). -% TODO: Should all of the route and binding management not be refactored to it's own module -% Especially seeing as unbind will have to be implemented for 0.91 ? +% TODO: Should all of the route and binding management not be refactored to +% it's own module, especially seeing as unbind will have to be implemented +% for 0.91 ? delete_routes(QueueName) -> - Binding = #binding{queue_name = QueueName, exchange_name = '_', key = '_'}, + Binding = #binding{queue_name = QueueName, + exchange_name = '_', key = '_'}, {Route, ReverseRoute} = route_with_reverse(Binding), ok = mnesia:delete_object(Route), ok = mnesia:delete_object(ReverseRoute), @@ -251,17 +260,18 @@ call_with_exchange_and_queue(#binding{exchange_name = Exchange, end. -add_binding(QueueName, ExchangeName, RoutingKey, _Arguments) -> +add_binding(ExchangeName, QueueName, RoutingKey, _Arguments) -> Binding = #binding{exchange_name = ExchangeName, key = RoutingKey, queue_name = QueueName}, - rabbit_misc:execute_mnesia_transaction(fun add_binding/1, [Binding]). + rabbit_misc:execute_mnesia_transaction(fun() -> add_binding(Binding) end). -delete_binding(QueueName, ExchangeName, RoutingKey, _Arguments) -> +delete_binding(ExchangeName, QueueName, RoutingKey, _Arguments) -> Binding = #binding{exchange_name = ExchangeName, key = RoutingKey, queue_name = QueueName}, - rabbit_misc:execute_mnesia_transaction(fun delete_binding/1, [Binding]). + rabbit_misc:execute_mnesia_transaction(fun() -> delete_binding(Binding) + end). % Must be called from within a transaction add_binding(Binding) -> @@ -270,7 +280,8 @@ add_binding(Binding) -> fun (X, Q) -> if Q#amqqueue.durable and not(X#exchange.durable) -> {error, durability_settings_incompatible}; true -> - ok = sync_binding(Binding, Q#amqqueue.durable, fun mnesia:write/3) + ok = sync_binding(Binding, Q#amqqueue.durable, + fun mnesia:write/3) end end). @@ -278,11 +289,13 @@ add_binding(Binding) -> delete_binding(Binding) -> call_with_exchange_and_queue( Binding, - fun (X, Q) -> ok = sync_binding(Binding, Q#amqqueue.durable, fun mnesia:delete_object/3), + fun (X, Q) -> ok = sync_binding(Binding, Q#amqqueue.durable, + fun mnesia:delete_object/3), maybe_auto_delete(X) end). -% TODO: Should the exported function not get renamed to delete_routes instead of this indirection? +% TODO: Should the exported function not get renamed to delete_routes +% instead of this indirection? delete_bindings(QueueName) -> delete_routes(QueueName). @@ -295,7 +308,8 @@ maybe_auto_delete(#exchange{name = ExchangeName, auto_delete = true}) -> Other -> Other end. -reverse_binding(#binding{exchange_name = Exchange, key = Key, queue_name = Queue}) -> +reverse_binding(#binding{exchange_name = Exchange, key = Key, + queue_name = Queue}) -> #reverse_binding{exchange_name = Exchange, key = Key, queue_name = Queue}. %% Must run within a transaction. @@ -304,14 +318,15 @@ sync_binding(Binding, Durable, Fun) -> true -> Fun(durable_routes, #route{binding = Binding}, write); false -> ok end, - [ok, ok] = [Fun(element(1, R), R, write) || R <- tuple_to_list(route_with_reverse(Binding))], + [ok, ok] = [Fun(element(1, R), R, write) || R + <- tuple_to_list(route_with_reverse(Binding))], ok. route_with_reverse(#route{binding = Binding}) -> route_with_reverse(Binding); route_with_reverse(Binding = #binding{}) -> - Route = #route{ binding = Binding }, - ReverseRoute = #reverse_route{ reverse_binding = reverse_binding(Binding) }, + Route = #route{binding = Binding}, + ReverseRoute = #reverse_route{reverse_binding = reverse_binding(Binding)}, {Route, ReverseRoute}. split_topic_key(Key) -> @@ -361,7 +376,8 @@ do_internal_delete(ExchangeName, Routes) -> case mnesia:wread({exchange, ExchangeName}) of [] -> {error, not_found}; _ -> - lists:foreach(fun (R) -> ok = mnesia:delete_object(R) end, Routes), + lists:foreach(fun (R) -> ok = mnesia:delete_object(R) end, + Routes), ok = mnesia:delete({durable_exchanges, ExchangeName}), ok = mnesia:delete({exchange, ExchangeName}) end. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 2975cc4d..ad715cf7 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -36,7 +36,7 @@ -export([enable_cover/0, report_cover/0]). -export([with_exit_handler/2]). -export([with_user/2, with_vhost/2, with_user_and_vhost/3]). --export([execute_mnesia_transaction/1, execute_mnesia_transaction/2]). +-export([execute_mnesia_transaction/1]). -export([ensure_ok/2]). -export([localnode/1, tcp_name/3]). -export([intersperse/2, upmap/2, map_in_order/2]). @@ -68,7 +68,8 @@ -spec(get_config/2 :: (atom(), A) -> A). -spec(set_config/2 :: (atom(), any()) -> 'ok'). -spec(dirty_read/1 :: ({atom(), any()}) -> {'ok', any()} | not_found()). --spec(r/3 :: (vhost(), K, resource_name()) -> r(K) when is_subtype(K, atom())). +-spec(r/3 :: (vhost() | r(atom()), K, resource_name()) -> r(K) + when is_subtype(K, atom())). -spec(r/2 :: (vhost(), K) -> #resource{virtual_host :: vhost(), kind :: K, name :: '_'} @@ -81,7 +82,6 @@ -spec(with_vhost/2 :: (vhost(), thunk(A)) -> A). -spec(with_user_and_vhost/3 :: (username(), vhost(), thunk(A)) -> A). -spec(execute_mnesia_transaction/1 :: (thunk(A)) -> A). --spec(execute_mnesia_transaction/2 :: (thunk(A), list()) -> A). -spec(ensure_ok/2 :: ('ok' | {'error', any()}, atom()) -> 'ok'). -spec(localnode/1 :: (atom()) -> node()). -spec(tcp_name/3 :: (atom(), ip_address(), ip_port()) -> atom()). @@ -233,10 +233,7 @@ with_user_and_vhost(Username, VHostPath, Thunk) -> %% elsewhere and get a consistent result even when that read %% executes on a different node. execute_mnesia_transaction(TxFun) -> - execute_mnesia_transaction(TxFun, []). - -execute_mnesia_transaction(TxFun, Args) -> - case mnesia:sync_transaction(TxFun, Args) of + case mnesia:sync_transaction(TxFun) of {atomic, Result} -> Result; {aborted, Reason} -> throw({error, Reason}) end. diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index bf532757..bfb42746 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -110,7 +110,8 @@ table_definitions() -> {record_name, route}, {attributes, record_info(fields, route)}]}, {route, [{type, ordered_set}, {attributes, record_info(fields, route)}]}, - {reverse_route, [{type, ordered_set}, {attributes, record_info(fields, reverse_route)}]}, + {reverse_route, [{type, ordered_set}, + {attributes, record_info(fields, reverse_route)}]}, {durable_exchanges, [{disc_copies, [node()]}, {record_name, exchange}, {attributes, record_info(fields, exchange)}]}, |