summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBen Hood <0x6e6562@gmail.com>2008-09-28 16:02:16 +0100
committerBen Hood <0x6e6562@gmail.com>2008-09-28 16:02:16 +0100
commit99752d99163773658401670544811c19803d82bf (patch)
treea8b13ac3c6e9daad946ec32515d45e0bdeff0b42
parent2556c8cd83b2d86f17656e2bf0971bc186ff8985 (diff)
parent387c9a72b16d491104aaf67069c20a987f3dc418 (diff)
downloadrabbitmq-server-bug19336.tar.gz
Re-merged 18776 into 19336bug19336
-rw-r--r--src/rabbit_amqqueue.erl2
-rw-r--r--src/rabbit_channel.erl2
-rw-r--r--src/rabbit_exchange.erl64
-rw-r--r--src/rabbit_misc.erl11
-rw-r--r--src/rabbit_mnesia.erl3
5 files changed, 48 insertions, 34 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index df41a5c6..a0d8d308 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -159,7 +159,7 @@ recover_queue(Q) ->
add_default_binding(#amqqueue{name = QueueName}) ->
Exchange = rabbit_misc:r(QueueName, exchange, <<>>),
RoutingKey = QueueName#resource.name,
- rabbit_exchange:add_binding(QueueName, Exchange, RoutingKey, []),
+ rabbit_exchange:add_binding(Exchange, QueueName, RoutingKey, []),
ok.
lookup(Name) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index b8abc8cc..ddd0ecf4 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -580,7 +580,7 @@ handle_method(#'queue.bind'{queue = QueueNameBin,
ActualRoutingKey = expand_routing_key_shortcut(QueueNameBin, RoutingKey,
State),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
- case rabbit_exchange:add_binding(QueueName, ExchangeName,
+ case rabbit_exchange:add_binding(ExchangeName, QueueName,
ActualRoutingKey, Arguments) of
{error, queue_not_found} ->
rabbit_misc:protocol_error(
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 0e8b52ee..5ab3718d 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -32,7 +32,6 @@
list_vhost_exchanges/1,
simple_publish/6, simple_publish/3,
route/2]).
--export([add_binding/1, delete_binding/1]).
-export([add_binding/4, delete_binding/4]).
-export([delete/2]).
-export([delete_bindings/1]).
@@ -67,13 +66,13 @@
-spec(simple_publish/3 :: (bool(), bool(), message()) -> publish_res()).
-spec(route/2 :: (exchange(), routing_key()) -> [pid()]).
-spec(add_binding/1 :: (binding()) -> 'ok' | not_found() |
- {'error', 'durability_settings_incompatible'}).
+ {'error', 'durability_settings_incompatible'}).
-spec(delete_binding/1 :: (binding()) -> 'ok' | not_found()).
-spec(add_binding/4 ::
- (queue_name(), exchange_name(), routing_key(), amqp_table()) ->
+ (exchange_name(), queue_name(), routing_key(), amqp_table()) ->
bind_res() | {'error', 'durability_settings_incompatible'}).
-spec(delete_binding/4 ::
- (queue_name(), exchange_name(), routing_key(), amqp_table()) ->
+ (exchange_name(), queue_name(), routing_key(), amqp_table()) ->
bind_res() | {'error', 'binding_not_found'}).
-spec(delete_bindings/1 :: (queue_name()) -> 'ok' | not_found()).
-spec(topic_matches/2 :: (binary(), binary()) -> bool()).
@@ -85,7 +84,8 @@
%%----------------------------------------------------------------------------
recover() ->
- % TODO: These two functions share commonalities - maybe a refactoring target
+ % TODO: These two functions share commonalities, hence
+ % maybe a refactoring target
ok = recover_durable_exchanges(),
ok = recover_durable_routes(),
ok.
@@ -164,7 +164,8 @@ list_vhost_exchanges(VHostPath) ->
#exchange{name = rabbit_misc:r(VHostPath, exchange), _ = '_'}).
routes_for_exchange(Name) ->
- qlc:e(qlc:q([R || R = #route{binding = #binding{exchange_name = N}} <- mnesia:table(route),
+ qlc:e(qlc:q([R || R = #route{binding = #binding{exchange_name = N}}
+ <- mnesia:table(route),
N == Name])).
%% Usable by Erlang code that wants to publish messages.
@@ -213,13 +214,19 @@ route(#exchange{name = Name, type = topic}, RoutingKey) ->
topic_matches(BindingKey, RoutingKey)]),
lookup_qpids(mnesia:async_dirty(fun qlc:e/1, [Query]));
-route(X = #exchange{name = Name}, RoutingKey) ->
+route(X = #exchange{type = fanout}, _) ->
+ route_internal(X, '_');
+
+route(X = #exchange{type = direct}, RoutingKey) ->
+ route_internal(X, RoutingKey).
+
+route_internal(X = #exchange{name = Name}, RoutingKey) ->
MatchHead = #route{binding = #binding{exchange_name = Name,
key = RoutingKey,
queue_name = '$1'}},
rabbit_cache:read_through({X, RoutingKey},
- fun() -> lookup_qpids(mnesia:dirty_select(route, [{MatchHead, [], ['$1']}])) end).
-
+ fun() -> lookup_qpids(mnesia:dirty_select(route,
+ [{MatchHead, [], ['$1']}])) end).
lookup_qpids(Queues) ->
sets:fold(
@@ -228,10 +235,12 @@ lookup_qpids(Queues) ->
[QPid | Acc]
end, [], sets:from_list(Queues)).
-% TODO: Should all of the route and binding management not be refactored to it's own module
-% Especially seeing as unbind will have to be implemented for 0.91 ?
+% TODO: Should all of the route and binding management not be refactored to
+% it's own module, especially seeing as unbind will have to be implemented
+% for 0.91 ?
delete_routes(QueueName) ->
- Binding = #binding{queue_name = QueueName, exchange_name = '_', key = '_'},
+ Binding = #binding{queue_name = QueueName,
+ exchange_name = '_', key = '_'},
{Route, ReverseRoute} = route_with_reverse(Binding),
ok = mnesia:delete_object(Route),
ok = mnesia:delete_object(ReverseRoute),
@@ -251,17 +260,18 @@ call_with_exchange_and_queue(#binding{exchange_name = Exchange,
end.
-add_binding(QueueName, ExchangeName, RoutingKey, _Arguments) ->
+add_binding(ExchangeName, QueueName, RoutingKey, _Arguments) ->
Binding = #binding{exchange_name = ExchangeName,
key = RoutingKey,
queue_name = QueueName},
- rabbit_misc:execute_mnesia_transaction(fun add_binding/1, [Binding]).
+ rabbit_misc:execute_mnesia_transaction(fun() -> add_binding(Binding) end).
-delete_binding(QueueName, ExchangeName, RoutingKey, _Arguments) ->
+delete_binding(ExchangeName, QueueName, RoutingKey, _Arguments) ->
Binding = #binding{exchange_name = ExchangeName,
key = RoutingKey,
queue_name = QueueName},
- rabbit_misc:execute_mnesia_transaction(fun delete_binding/1, [Binding]).
+ rabbit_misc:execute_mnesia_transaction(fun() -> delete_binding(Binding)
+ end).
% Must be called from within a transaction
add_binding(Binding) ->
@@ -270,7 +280,8 @@ add_binding(Binding) ->
fun (X, Q) -> if Q#amqqueue.durable and not(X#exchange.durable) ->
{error, durability_settings_incompatible};
true ->
- ok = sync_binding(Binding, Q#amqqueue.durable, fun mnesia:write/3)
+ ok = sync_binding(Binding, Q#amqqueue.durable,
+ fun mnesia:write/3)
end
end).
@@ -278,11 +289,13 @@ add_binding(Binding) ->
delete_binding(Binding) ->
call_with_exchange_and_queue(
Binding,
- fun (X, Q) -> ok = sync_binding(Binding, Q#amqqueue.durable, fun mnesia:delete_object/3),
+ fun (X, Q) -> ok = sync_binding(Binding, Q#amqqueue.durable,
+ fun mnesia:delete_object/3),
maybe_auto_delete(X)
end).
-% TODO: Should the exported function not get renamed to delete_routes instead of this indirection?
+% TODO: Should the exported function not get renamed to delete_routes
+% instead of this indirection?
delete_bindings(QueueName) ->
delete_routes(QueueName).
@@ -295,7 +308,8 @@ maybe_auto_delete(#exchange{name = ExchangeName, auto_delete = true}) ->
Other -> Other
end.
-reverse_binding(#binding{exchange_name = Exchange, key = Key, queue_name = Queue}) ->
+reverse_binding(#binding{exchange_name = Exchange, key = Key,
+ queue_name = Queue}) ->
#reverse_binding{exchange_name = Exchange, key = Key, queue_name = Queue}.
%% Must run within a transaction.
@@ -304,14 +318,15 @@ sync_binding(Binding, Durable, Fun) ->
true -> Fun(durable_routes, #route{binding = Binding}, write);
false -> ok
end,
- [ok, ok] = [Fun(element(1, R), R, write) || R <- tuple_to_list(route_with_reverse(Binding))],
+ [ok, ok] = [Fun(element(1, R), R, write) || R
+ <- tuple_to_list(route_with_reverse(Binding))],
ok.
route_with_reverse(#route{binding = Binding}) ->
route_with_reverse(Binding);
route_with_reverse(Binding = #binding{}) ->
- Route = #route{ binding = Binding },
- ReverseRoute = #reverse_route{ reverse_binding = reverse_binding(Binding) },
+ Route = #route{binding = Binding},
+ ReverseRoute = #reverse_route{reverse_binding = reverse_binding(Binding)},
{Route, ReverseRoute}.
split_topic_key(Key) ->
@@ -361,7 +376,8 @@ do_internal_delete(ExchangeName, Routes) ->
case mnesia:wread({exchange, ExchangeName}) of
[] -> {error, not_found};
_ ->
- lists:foreach(fun (R) -> ok = mnesia:delete_object(R) end, Routes),
+ lists:foreach(fun (R) -> ok = mnesia:delete_object(R) end,
+ Routes),
ok = mnesia:delete({durable_exchanges, ExchangeName}),
ok = mnesia:delete({exchange, ExchangeName})
end.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 2975cc4d..ad715cf7 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -36,7 +36,7 @@
-export([enable_cover/0, report_cover/0]).
-export([with_exit_handler/2]).
-export([with_user/2, with_vhost/2, with_user_and_vhost/3]).
--export([execute_mnesia_transaction/1, execute_mnesia_transaction/2]).
+-export([execute_mnesia_transaction/1]).
-export([ensure_ok/2]).
-export([localnode/1, tcp_name/3]).
-export([intersperse/2, upmap/2, map_in_order/2]).
@@ -68,7 +68,8 @@
-spec(get_config/2 :: (atom(), A) -> A).
-spec(set_config/2 :: (atom(), any()) -> 'ok').
-spec(dirty_read/1 :: ({atom(), any()}) -> {'ok', any()} | not_found()).
--spec(r/3 :: (vhost(), K, resource_name()) -> r(K) when is_subtype(K, atom())).
+-spec(r/3 :: (vhost() | r(atom()), K, resource_name()) -> r(K)
+ when is_subtype(K, atom())).
-spec(r/2 :: (vhost(), K) -> #resource{virtual_host :: vhost(),
kind :: K,
name :: '_'}
@@ -81,7 +82,6 @@
-spec(with_vhost/2 :: (vhost(), thunk(A)) -> A).
-spec(with_user_and_vhost/3 :: (username(), vhost(), thunk(A)) -> A).
-spec(execute_mnesia_transaction/1 :: (thunk(A)) -> A).
--spec(execute_mnesia_transaction/2 :: (thunk(A), list()) -> A).
-spec(ensure_ok/2 :: ('ok' | {'error', any()}, atom()) -> 'ok').
-spec(localnode/1 :: (atom()) -> node()).
-spec(tcp_name/3 :: (atom(), ip_address(), ip_port()) -> atom()).
@@ -233,10 +233,7 @@ with_user_and_vhost(Username, VHostPath, Thunk) ->
%% elsewhere and get a consistent result even when that read
%% executes on a different node.
execute_mnesia_transaction(TxFun) ->
- execute_mnesia_transaction(TxFun, []).
-
-execute_mnesia_transaction(TxFun, Args) ->
- case mnesia:sync_transaction(TxFun, Args) of
+ case mnesia:sync_transaction(TxFun) of
{atomic, Result} -> Result;
{aborted, Reason} -> throw({error, Reason})
end.
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index bf532757..bfb42746 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -110,7 +110,8 @@ table_definitions() ->
{record_name, route},
{attributes, record_info(fields, route)}]},
{route, [{type, ordered_set}, {attributes, record_info(fields, route)}]},
- {reverse_route, [{type, ordered_set}, {attributes, record_info(fields, reverse_route)}]},
+ {reverse_route, [{type, ordered_set},
+ {attributes, record_info(fields, reverse_route)}]},
{durable_exchanges, [{disc_copies, [node()]},
{record_name, exchange},
{attributes, record_info(fields, exchange)}]},