summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-09-11 23:46:41 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-09-11 23:46:41 +0100
commitf182b6135b803e0d77887046ad5fb6bb7adddea5 (patch)
treec2b5c09c192929e1ee61f4a7472490ff01bfcbc7
parent432aefa0b24e199ec5b694da32187b777348cf61 (diff)
downloadrabbitmq-server-f182b6135b803e0d77887046ad5fb6bb7adddea5.tar.gz
Implement exchange-to-exchange bindings
-rw-r--r--docs/rabbitmqctl.1.xml8
-rw-r--r--include/rabbit.hrl4
-rw-r--r--include/rabbit_exchange_type_spec.hrl2
-rw-r--r--src/rabbit_amqqueue.erl6
-rw-r--r--src/rabbit_binding.erl124
-rw-r--r--src/rabbit_channel.erl55
-rw-r--r--src/rabbit_control.erl2
-rw-r--r--src/rabbit_exchange.erl139
-rw-r--r--src/rabbit_exchange_type_direct.erl5
-rw-r--r--src/rabbit_exchange_type_fanout.erl4
-rw-r--r--src/rabbit_exchange_type_headers.erl9
-rw-r--r--src/rabbit_exchange_type_topic.erl11
-rw-r--r--src/rabbit_mnesia.erl6
-rw-r--r--src/rabbit_router.erl44
-rw-r--r--src/rabbit_tests.erl4
-rw-r--r--src/rabbit_types.erl12
16 files changed, 242 insertions, 193 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 5179eb25..607838ff 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -900,10 +900,10 @@
escaped as in C.</para></listitem>
</varlistentry>
<varlistentry>
- <term>queue_name</term>
- <listitem><para>The name of the queue to which the
- binding is attached. with non-ASCII characters
- escaped as in C.</para></listitem>
+ <term>destination</term>
+ <listitem><para>The type and name of the destination
+ to which the binding is attached. with non-ASCII
+ characters escaped as in C.</para></listitem>
</varlistentry>
<varlistentry>
<term>routing_key</term>
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 24aa8d98..6bf6c87e 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -60,8 +60,8 @@
-record(route, {binding, value = const}).
-record(reverse_route, {reverse_binding, value = const}).
--record(binding, {exchange_name, key, queue_name, args = []}).
--record(reverse_binding, {queue_name, key, exchange_name, args = []}).
+-record(binding, {exchange_name, key, destination, args = []}).
+-record(reverse_binding, {destination, key, exchange_name, args = []}).
-record(listener, {node, protocol, host, port}).
diff --git a/include/rabbit_exchange_type_spec.hrl b/include/rabbit_exchange_type_spec.hrl
index cecd666b..014b0736 100644
--- a/include/rabbit_exchange_type_spec.hrl
+++ b/include/rabbit_exchange_type_spec.hrl
@@ -32,7 +32,7 @@
-spec(description/0 :: () -> [{atom(), any()}]).
-spec(publish/2 :: (rabbit_types:exchange(), rabbit_types:delivery())
- -> {rabbit_router:routing_result(), [pid()]}).
+ -> rabbit_router:match_result()).
-spec(validate/1 :: (rabbit_types:exchange()) -> 'ok').
-spec(create/1 :: (rabbit_types:exchange()) -> 'ok').
-spec(recover/2 :: (rabbit_types:exchange(),
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 7116653c..f4256c8f 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -252,7 +252,7 @@ add_default_binding(#amqqueue{name = QueueName}) ->
ExchangeName = rabbit_misc:r(QueueName, exchange, <<>>),
RoutingKey = QueueName#resource.name,
rabbit_binding:add(#binding{exchange_name = ExchangeName,
- queue_name = QueueName,
+ destination = QueueName,
key = RoutingKey,
args = []}).
@@ -434,7 +434,7 @@ internal_delete1(QueueName) ->
%% we want to execute some things, as
%% decided by rabbit_exchange, after the
%% transaction.
- rabbit_binding:remove_for_queue(QueueName).
+ rabbit_binding:remove_for_destination(QueueName).
internal_delete(QueueName) ->
case
@@ -479,7 +479,7 @@ on_node_down(Node) ->
ok.
delete_queue(QueueName) ->
- Post = rabbit_binding:remove_transient_for_queue(QueueName),
+ Post = rabbit_binding:remove_transient_for_destination(QueueName),
ok = mnesia:delete({rabbit_queue, QueueName}),
Post.
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index bb29580f..ec1816ca 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -33,11 +33,12 @@
-include("rabbit.hrl").
-export([recover/0, exists/1, add/1, remove/1, add/2, remove/2, list/1]).
--export([list_for_exchange/1, list_for_queue/1, list_for_exchange_and_queue/2]).
+-export([list_for_exchange/1, list_for_destination/1,
+ list_for_exchange_and_destination/2]).
-export([info_keys/0, info/1, info/2, info_all/1, info_all/2]).
%% these must all be run inside a mnesia tx
-export([has_for_exchange/1, remove_for_exchange/1,
- remove_for_queue/1, remove_transient_for_queue/1]).
+ remove_for_destination/1, remove_transient_for_destination/1]).
%%----------------------------------------------------------------------------
@@ -66,9 +67,11 @@
bind_res() | rabbit_types:error('binding_not_found')).
-spec(list/1 :: (rabbit_types:vhost()) -> bindings()).
-spec(list_for_exchange/1 :: (rabbit_exchange:name()) -> bindings()).
--spec(list_for_queue/1 :: (rabbit_amqqueue:name()) -> bindings()).
--spec(list_for_exchange_and_queue/2 ::
- (rabbit_exchange:name(), rabbit_amqqueue:name()) -> bindings()).
+-spec(list_for_destination/1 ::
+ (rabbit_amqqueue:name()|rabbit_exchange:name()) -> bindings()).
+-spec(list_for_exchange_and_destination/2 ::
+ (rabbit_exchange:name(),
+ rabbit_amqqueue:name() | rabbit_exchange:name()) -> bindings()).
-spec(info_keys/0 :: () -> [rabbit_types:info_key()]).
-spec(info/1 :: (rabbit_types:binding()) -> [rabbit_types:info()]).
-spec(info/2 :: (rabbit_types:binding(), [rabbit_types:info_key()]) ->
@@ -78,16 +81,16 @@
-> [[rabbit_types:info()]]).
-spec(has_for_exchange/1 :: (rabbit_exchange:name()) -> boolean()).
-spec(remove_for_exchange/1 :: (rabbit_exchange:name()) -> bindings()).
--spec(remove_for_queue/1 ::
- (rabbit_amqqueue:name()) -> fun (() -> any())).
--spec(remove_transient_for_queue/1 ::
- (rabbit_amqqueue:name()) -> fun (() -> any())).
+-spec(remove_for_destination/1 ::
+ (rabbit_amqqueue:name() | rabbit_exchange:name()) -> fun (() -> any())).
+-spec(remove_transient_for_destination/1 ::
+ (rabbit_amqqueue:name() | rabbit_exchange:name()) -> fun (() -> any())).
-endif.
%%----------------------------------------------------------------------------
--define(INFO_KEYS, [exchange_name, queue_name, routing_key, arguments]).
+-define(INFO_KEYS, [exchange_name, destination, routing_key, arguments]).
recover() ->
rabbit_misc:table_fold(
@@ -101,26 +104,24 @@ recover() ->
exists(Binding) ->
binding_action(
Binding,
- fun (_X, _Q, B) -> mnesia:read({rabbit_route, B}) /= [] end).
+ fun (_X, _D, B) -> mnesia:read({rabbit_route, B}) /= [] end).
-add(Binding) -> add(Binding, fun (_X, _Q) -> ok end).
+add(Binding) -> add(Binding, fun (_X, _D) -> ok end).
-remove(Binding) -> remove(Binding, fun (_X, _Q) -> ok end).
+remove(Binding) -> remove(Binding, fun (_X, _D) -> ok end).
add(Binding, InnerFun) ->
case binding_action(
Binding,
- fun (X, Q, B) ->
+ fun (X, D, B) ->
%% this argument is used to check queue exclusivity;
%% in general, we want to fail on that in preference to
%% anything else
- case InnerFun(X, Q) of
+ case InnerFun(X, D) of
ok ->
case mnesia:read({rabbit_route, B}) of
- [] -> Durable = (X#exchange.durable andalso
- Q#amqqueue.durable),
- ok = sync_binding(
- B, Durable,
+ [] -> ok = sync_binding(
+ B, are_endpoints_durable(X, D),
fun mnesia:write/3),
{new, X, B};
[_] -> {existing, X, B}
@@ -141,16 +142,14 @@ add(Binding, InnerFun) ->
remove(Binding, InnerFun) ->
case binding_action(
Binding,
- fun (X, Q, B) ->
+ fun (X, D, B) ->
case mnesia:match_object(rabbit_route, #route{binding = B},
write) of
[] -> {error, binding_not_found};
- [_] -> case InnerFun(X, Q) of
+ [_] -> case InnerFun(X, D) of
ok ->
- Durable = (X#exchange.durable andalso
- Q#amqqueue.durable),
ok = sync_binding(
- B, Durable,
+ B, are_endpoints_durable(X, D),
fun mnesia:delete_object/3),
Deleted =
rabbit_exchange:maybe_auto_delete(X),
@@ -175,7 +174,7 @@ remove(Binding, InnerFun) ->
list(VHostPath) ->
Route = #route{binding = #binding{
exchange_name = rabbit_misc:r(VHostPath, exchange),
- queue_name = rabbit_misc:r(VHostPath, queue),
+ destination = rabbit_misc:r(VHostPath, '_'),
_ = '_'},
_ = '_'},
[B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route,
@@ -186,15 +185,15 @@ list_for_exchange(ExchangeName) ->
[B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route,
Route)].
-list_for_queue(QueueName) ->
- Route = #route{binding = #binding{queue_name = QueueName, _ = '_'}},
+list_for_destination(DestinationName) ->
+ Route = #route{binding = #binding{destination = DestinationName, _ = '_'}},
[reverse_binding(B) || #reverse_route{reverse_binding = B} <-
mnesia:dirty_match_object(rabbit_reverse_route,
reverse_route(Route))].
-list_for_exchange_and_queue(ExchangeName, QueueName) ->
+list_for_exchange_and_destination(ExchangeName, DestinationName) ->
Route = #route{binding = #binding{exchange_name = ExchangeName,
- queue_name = QueueName,
+ destination = DestinationName,
_ = '_'}},
[B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route,
Route)].
@@ -208,10 +207,10 @@ map(VHostPath, F) ->
infos(Items, B) -> [{Item, i(Item, B)} || Item <- Items].
-i(exchange_name, #binding{exchange_name = XName}) -> XName;
-i(queue_name, #binding{queue_name = QName}) -> QName;
-i(routing_key, #binding{key = RoutingKey}) -> RoutingKey;
-i(arguments, #binding{args = Arguments}) -> Arguments;
+i(exchange_name, #binding{exchange_name = XName}) -> XName;
+i(destination, #binding{destination = Destination}) -> Destination;
+i(routing_key, #binding{key = RoutingKey}) -> RoutingKey;
+i(arguments, #binding{args = Arguments}) -> Arguments;
i(Item, _) -> throw({bad_argument, Item}).
info(B = #binding{}) -> infos(?INFO_KEYS, B).
@@ -241,22 +240,28 @@ remove_for_exchange(ExchangeName) ->
_ = '_'}},
write)].
-remove_for_queue(QueueName) ->
- remove_for_queue(QueueName, fun delete_forward_routes/1).
+remove_for_destination(DestinationName) ->
+ remove_for_destination(DestinationName, fun delete_forward_routes/1).
-remove_transient_for_queue(QueueName) ->
- remove_for_queue(QueueName, fun delete_transient_forward_routes/1).
+remove_transient_for_destination(DestinationName) ->
+ remove_for_destination(DestinationName,
+ fun delete_transient_forward_routes/1).
%%----------------------------------------------------------------------------
+are_endpoints_durable(#exchange{durable = A}, #amqqueue{durable = B}) ->
+ A andalso B;
+are_endpoints_durable(#exchange{durable = A}, #exchange{durable = B}) ->
+ A andalso B.
+
binding_action(Binding = #binding{exchange_name = ExchangeName,
- queue_name = QueueName,
+ destination = Destination,
args = Arguments}, Fun) ->
- call_with_exchange_and_queue(
- ExchangeName, QueueName,
- fun (X, Q) ->
+ call_with_exchange_and_destination(
+ ExchangeName, Destination,
+ fun (X, D) ->
SortedArgs = rabbit_misc:sort_field_table(Arguments),
- Fun(X, Q, Binding#binding{args = SortedArgs})
+ Fun(X, D, Binding#binding{args = SortedArgs})
end).
sync_binding(Binding, Durable, Fun) ->
@@ -270,15 +275,19 @@ sync_binding(Binding, Durable, Fun) ->
ok = Fun(rabbit_reverse_route, ReverseRoute, write),
ok.
-call_with_exchange_and_queue(Exchange, Queue, Fun) ->
+call_with_exchange_and_destination(Exchange, Destination, Fun) ->
+ DestTable = case Destination#resource.kind of
+ queue -> rabbit_queue;
+ exchange -> rabbit_exchange
+ end,
rabbit_misc:execute_mnesia_transaction(
fun () -> case {mnesia:read({rabbit_exchange, Exchange}),
- mnesia:read({rabbit_queue, Queue})} of
- {[X], [Q]} -> Fun(X, Q);
- {[ ], [_]} -> {error, exchange_not_found};
- {[_], [ ]} -> {error, queue_not_found};
- {[ ], [ ]} -> {error, exchange_and_queue_not_found}
- end
+ mnesia:read({DestTable, Destination})} of
+ {[X], [D]} -> Fun(X, D);
+ {[ ], [_]} -> {error, exchange_not_found};
+ {[_], [ ]} -> {error, destination_not_found};
+ {[ ], [ ]} -> {error, exchange_and_destination_not_found}
+ end
end).
%% Used with atoms from records; e.g., the type is expected to exist.
@@ -293,7 +302,7 @@ continue('$end_of_table') -> false;
continue({[_|_], _}) -> true;
continue({[], Continuation}) -> continue(mnesia:select(Continuation)).
-remove_for_queue(QueueName, FwdDeleteFun) ->
+remove_for_destination(DestinationName, FwdDeleteFun) ->
DeletedBindings =
[begin
Route = reverse_route(ReverseRoute),
@@ -304,9 +313,10 @@ remove_for_queue(QueueName, FwdDeleteFun) ->
end || ReverseRoute
<- mnesia:match_object(
rabbit_reverse_route,
- reverse_route(#route{binding = #binding{
- queue_name = QueueName,
- _ = '_'}}),
+ reverse_route(#route{
+ binding = #binding{
+ destination = DestinationName,
+ _ = '_'}}),
write)],
Grouped = group_bindings_and_auto_delete(
lists:keysort(#binding.exchange_name, DeletedBindings), []),
@@ -360,19 +370,19 @@ reverse_route(#reverse_route{reverse_binding = Binding}) ->
#route{binding = reverse_binding(Binding)}.
reverse_binding(#reverse_binding{exchange_name = Exchange,
- queue_name = Queue,
+ destination = Destination,
key = Key,
args = Args}) ->
#binding{exchange_name = Exchange,
- queue_name = Queue,
+ destination = Destination,
key = Key,
args = Args};
reverse_binding(#binding{exchange_name = Exchange,
- queue_name = Queue,
+ destination = Destination,
key = Key,
args = Args}) ->
#reverse_binding{exchange_name = Exchange,
- queue_name = Queue,
+ destination = Destination,
key = Key,
args = Args}.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 174eab40..924f1bbe 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -715,6 +715,23 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
return_ok(State, NoWait, #'exchange.delete_ok'{})
end;
+handle_method(#'exchange.bind'{destination = DestinationNameBin,
+ source = SourceNameBin,
+ routing_key = RoutingKey,
+ nowait = NoWait,
+ arguments = Arguments}, _, State) ->
+ binding_action(fun rabbit_binding:add/2,
+ SourceNameBin, exchange, DestinationNameBin, RoutingKey,
+ Arguments, #'exchange.bind_ok'{}, NoWait, State);
+
+handle_method(#'exchange.unbind'{destination = DestinationNameBin,
+ source = SourceNameBin,
+ routing_key = RoutingKey,
+ arguments = Arguments}, _, State) ->
+ binding_action(fun rabbit_binding:remove/2,
+ SourceNameBin, exchange, DestinationNameBin, RoutingKey,
+ Arguments, #'exchange.unbind_ok'{}, false, State);
+
handle_method(#'queue.declare'{queue = QueueNameBin,
passive = false,
durable = Durable,
@@ -806,7 +823,7 @@ handle_method(#'queue.bind'{queue = QueueNameBin,
nowait = NoWait,
arguments = Arguments}, _, State) ->
binding_action(fun rabbit_binding:add/2,
- ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
+ ExchangeNameBin, queue, QueueNameBin, RoutingKey, Arguments,
#'queue.bind_ok'{}, NoWait, State);
handle_method(#'queue.unbind'{queue = QueueNameBin,
@@ -814,7 +831,7 @@ handle_method(#'queue.unbind'{queue = QueueNameBin,
routing_key = RoutingKey,
arguments = Arguments}, _, State) ->
binding_action(fun rabbit_binding:remove/2,
- ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
+ ExchangeNameBin, queue, QueueNameBin, RoutingKey, Arguments,
#'queue.unbind_ok'{}, false, State);
handle_method(#'queue.purge'{queue = QueueNameBin,
@@ -879,42 +896,48 @@ handle_method(_MethodRecord, _Content, _State) ->
%%----------------------------------------------------------------------------
-binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
- ReturnMethod, NoWait,
+binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
+ RoutingKey, Arguments, ReturnMethod, NoWait,
State = #ch{virtual_host = VHostPath,
reader_pid = ReaderPid}) ->
%% FIXME: connection exception (!) on failure??
%% (see rule named "failure" in spec-XML)
%% FIXME: don't allow binding to internal exchanges -
%% including the one named "" !
- QueueName = expand_queue_name_shortcut(QueueNameBin, State),
- check_write_permitted(QueueName, State),
- ActualRoutingKey = expand_routing_key_shortcut(QueueNameBin, RoutingKey,
- State),
+ DestinationName =
+ case DestinationType of
+ queue -> expand_queue_name_shortcut(DestinationNameBin, State);
+ exchange -> rabbit_misc:r(VHostPath, exchange, DestinationNameBin)
+ end,
+ check_write_permitted(DestinationName, State),
+ ActualRoutingKey =
+ expand_routing_key_shortcut(DestinationNameBin, RoutingKey, State),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_read_permitted(ExchangeName, State),
case Fun(#binding{exchange_name = ExchangeName,
- queue_name = QueueName,
+ destination = DestinationName,
key = ActualRoutingKey,
args = Arguments},
- fun (_X, Q) ->
+ fun (_X, Q = #amqqueue{}) ->
try rabbit_amqqueue:check_exclusive_access(Q, ReaderPid)
catch exit:Reason -> {error, Reason}
- end
+ end;
+ (_X, #exchange{}) ->
+ ok
end) of
{error, exchange_not_found} ->
rabbit_misc:not_found(ExchangeName);
- {error, queue_not_found} ->
- rabbit_misc:not_found(QueueName);
- {error, exchange_and_queue_not_found} ->
+ {error, destination_not_found} ->
+ rabbit_misc:not_found(DestinationName);
+ {error, exchange_and_destination_not_found} ->
rabbit_misc:protocol_error(
not_found, "no ~s and no ~s", [rabbit_misc:rs(ExchangeName),
- rabbit_misc:rs(QueueName)]);
+ rabbit_misc:rs(DestinationName)]);
{error, binding_not_found} ->
rabbit_misc:protocol_error(
not_found, "no binding ~s between ~s and ~s",
[RoutingKey, rabbit_misc:rs(ExchangeName),
- rabbit_misc:rs(QueueName)]);
+ rabbit_misc:rs(DestinationName)]);
{error, #amqp_error{} = Error} ->
rabbit_misc:protocol_error(Error);
ok -> return_ok(State, NoWait, ReturnMethod)
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index a3b6f369..59d3c889 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -257,7 +257,7 @@ action(list_exchanges, Node, Args, Opts, Inform) ->
action(list_bindings, Node, Args, Opts, Inform) ->
Inform("Listing bindings", []),
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
- ArgAtoms = default_if_empty(Args, [exchange_name, queue_name,
+ ArgAtoms = default_if_empty(Args, [exchange_name, destination,
routing_key, arguments]),
display_info_list(rpc_call(Node, rabbit_binding, info_all,
[VHostArg, ArgAtoms]),
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 40bee25f..06fd819c 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -93,9 +93,9 @@
recover() ->
Exs = rabbit_misc:table_fold(
- fun (Exchange, Acc) ->
- ok = mnesia:write(rabbit_exchange, Exchange, write),
- [Exchange | Acc]
+ fun (X, Acc) ->
+ ok = mnesia:write(rabbit_exchange, X, write),
+ [X | Acc]
end, [], rabbit_durable_exchange),
Bs = rabbit_binding:recover(),
recover_with_bindings(
@@ -112,30 +112,30 @@ recover_with_bindings(Bs, [X = #exchange{type = Type} | Xs], Bindings) ->
recover_with_bindings([], [], []) ->
ok.
-declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
- Exchange = #exchange{name = ExchangeName,
- type = Type,
- durable = Durable,
- auto_delete = AutoDelete,
- arguments = Args},
+declare(XName, Type, Durable, AutoDelete, Args) ->
+ X = #exchange{name = XName,
+ type = Type,
+ durable = Durable,
+ auto_delete = AutoDelete,
+ arguments = Args},
%% We want to upset things if it isn't ok; this is different from
%% the other hooks invocations, where we tend to ignore the return
%% value.
TypeModule = type_to_module(Type),
- ok = TypeModule:validate(Exchange),
+ ok = TypeModule:validate(X),
case rabbit_misc:execute_mnesia_transaction(
fun () ->
- case mnesia:wread({rabbit_exchange, ExchangeName}) of
+ case mnesia:wread({rabbit_exchange, XName}) of
[] ->
- ok = mnesia:write(rabbit_exchange, Exchange, write),
+ ok = mnesia:write(rabbit_exchange, X, write),
ok = case Durable of
true ->
mnesia:write(rabbit_durable_exchange,
- Exchange, write);
+ X, write);
false ->
ok
end,
- {new, Exchange};
+ {new, X};
[ExistingX] ->
{existing, ExistingX}
end
@@ -225,52 +225,69 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end).
-publish(X, Delivery) ->
- publish(X, [], Delivery).
-
-publish(X = #exchange{type = Type}, Seen, Delivery) ->
- case (type_to_module(Type)):publish(X, Delivery) of
- {_, []} = R ->
- #exchange{name = XName, arguments = Args} = X,
- case rabbit_misc:r_arg(XName, exchange, Args,
- <<"alternate-exchange">>) of
- undefined ->
- R;
- AName ->
- NewSeen = [XName | Seen],
- case lists:member(AName, NewSeen) of
- true -> R;
- false -> case lookup(AName) of
- {ok, AX} ->
- publish(AX, NewSeen, Delivery);
- {error, not_found} ->
- rabbit_log:warning(
- "alternate exchange for ~s "
- "does not exist: ~s",
- [rabbit_misc:rs(XName),
- rabbit_misc:rs(AName)]),
- R
- end
- end
- end;
- R ->
- R
+publish(X = #exchange{name = XName}, Delivery) ->
+ QueueNames = find_queues(Delivery, queue:from_list([X]), [XName], []),
+ QueuePids = lookup_qpids(QueueNames),
+ rabbit_router:deliver(QueuePids, Delivery).
+
+find_queues(Delivery, WorkList, SeenExchanges, QueueNames) ->
+ case queue:out(WorkList) of
+ {empty, _WorkList} ->
+ lists:usort(lists:flatten(QueueNames));
+ {{value, X = #exchange{type = Type}}, WorkList1} ->
+ {NewQueueNames, NewExchangeNames} =
+ process_alternate(
+ X, ((type_to_module(Type)):publish(X, Delivery))),
+ {WorkList2, SeenExchanges1} =
+ lists:foldl(
+ fun (XName, {WorkListN, SeenExchangesN} = Acc) ->
+ case lists:member(XName, SeenExchangesN) of
+ true -> Acc;
+ false -> {case lookup(XName) of
+ {ok, X1} ->
+ queue:in(X1, WorkListN);
+ {error, not_found} ->
+ WorkListN
+ end, [XName | SeenExchangesN]}
+ end
+ end, {WorkList1, SeenExchanges}, NewExchangeNames),
+ find_queues(Delivery, WorkList2, SeenExchanges1,
+ [NewQueueNames | QueueNames])
end.
-call_with_exchange(Exchange, Fun) ->
+process_alternate(#exchange{name = XName, arguments = Args}, {[], []}) ->
+ case rabbit_misc:r_arg(XName, exchange, Args, <<"alternate-exchange">>) of
+ undefined ->
+ {[], []};
+ AName ->
+ {[], [AName]}
+ end;
+process_alternate(_X, Results) ->
+ Results.
+
+lookup_qpids(QueueNames) ->
+ lists:foldl(
+ fun (Key, Acc) ->
+ case mnesia:dirty_read({rabbit_queue, Key}) of
+ [#amqqueue{pid = QPid}] -> [QPid | Acc];
+ [] -> Acc
+ end
+ end, [], QueueNames).
+
+call_with_exchange(XName, Fun) ->
rabbit_misc:execute_mnesia_transaction(
- fun () -> case mnesia:read({rabbit_exchange, Exchange}) of
+ fun () -> case mnesia:read({rabbit_exchange, XName}) of
[] -> {error, not_found};
[X] -> Fun(X)
end
end).
-delete(ExchangeName, IfUnused) ->
+delete(XName, IfUnused) ->
Fun = case IfUnused of
true -> fun conditional_delete/1;
false -> fun unconditional_delete/1
end,
- case call_with_exchange(ExchangeName, Fun) of
+ case call_with_exchange(XName, Fun) of
{deleted, X = #exchange{type = Type}, Bs} ->
(type_to_module(Type)):delete(X, Bs),
ok;
@@ -280,21 +297,21 @@ delete(ExchangeName, IfUnused) ->
maybe_auto_delete(#exchange{auto_delete = false}) ->
not_deleted;
-maybe_auto_delete(#exchange{auto_delete = true} = Exchange) ->
- case conditional_delete(Exchange) of
- {error, in_use} -> not_deleted;
- {deleted, Exchange, []} -> auto_deleted
+maybe_auto_delete(#exchange{auto_delete = true} = X) ->
+ case conditional_delete(X) of
+ {error, in_use} -> not_deleted;
+ {deleted, X, []} -> auto_deleted
end.
-conditional_delete(Exchange = #exchange{name = ExchangeName}) ->
- case rabbit_binding:has_for_exchange(ExchangeName) of
- false -> unconditional_delete(Exchange);
+conditional_delete(X = #exchange{name = XName}) ->
+ case rabbit_binding:has_for_exchange(XName) of
+ false -> unconditional_delete(X);
true -> {error, in_use}
end.
-unconditional_delete(Exchange = #exchange{name = ExchangeName}) ->
- Bindings = rabbit_binding:remove_for_exchange(ExchangeName),
- ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}),
- ok = mnesia:delete({rabbit_exchange, ExchangeName}),
- rabbit_event:notify(exchange_deleted, [{name, ExchangeName}]),
- {deleted, Exchange, Bindings}.
+unconditional_delete(X = #exchange{name = XName}) ->
+ Bindings = rabbit_binding:remove_for_exchange(XName),
+ ok = mnesia:delete({rabbit_durable_exchange, XName}),
+ ok = mnesia:delete({rabbit_exchange, XName}),
+ rabbit_event:notify(exchange_deleted, [{name, XName}]),
+ {deleted, X, Bindings}.
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
index 4f6eb851..bc740d4b 100644
--- a/src/rabbit_exchange_type_direct.erl
+++ b/src/rabbit_exchange_type_direct.erl
@@ -50,10 +50,9 @@ description() ->
[{name, <<"direct">>},
{description, <<"AMQP direct exchange, as per the AMQP specification">>}].
-publish(#exchange{name = Name}, Delivery =
+publish(#exchange{name = Name},
#delivery{message = #basic_message{routing_key = RoutingKey}}) ->
- rabbit_router:deliver(rabbit_router:match_routing_key(Name, RoutingKey),
- Delivery).
+ rabbit_router:match_routing_key(Name, RoutingKey).
validate(_X) -> ok.
create(_X) -> ok.
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
index 94798c78..4dad9cdd 100644
--- a/src/rabbit_exchange_type_fanout.erl
+++ b/src/rabbit_exchange_type_fanout.erl
@@ -50,8 +50,8 @@ description() ->
[{name, <<"fanout">>},
{description, <<"AMQP fanout exchange, as per the AMQP specification">>}].
-publish(#exchange{name = Name}, Delivery) ->
- rabbit_router:deliver(rabbit_router:match_routing_key(Name, '_'), Delivery).
+publish(#exchange{name = Name}, _Delivery) ->
+ rabbit_router:match_routing_key(Name, '_').
validate(_X) -> ok.
create(_X) -> ok.
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
index 0a59a175..7edc6f7b 100644
--- a/src/rabbit_exchange_type_headers.erl
+++ b/src/rabbit_exchange_type_headers.erl
@@ -57,16 +57,13 @@ description() ->
{description, <<"AMQP headers exchange, as per the AMQP specification">>}].
publish(#exchange{name = Name},
- Delivery = #delivery{message = #basic_message{content = Content}}) ->
+ #delivery{message = #basic_message{content = Content}}) ->
Headers = case (Content#content.properties)#'P_basic'.headers of
undefined -> [];
H -> rabbit_misc:sort_field_table(H)
end,
- rabbit_router:deliver(rabbit_router:match_bindings(
- Name, fun (#binding{args = Spec}) ->
- headers_match(Spec, Headers)
- end),
- Delivery).
+ rabbit_router:match_bindings(
+ Name, fun (#binding{args = Spec}) -> headers_match(Spec, Headers) end).
default_headers_match_kind() -> all.
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index e796acf3..f4a9c904 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -58,13 +58,12 @@ description() ->
[{name, <<"topic">>},
{description, <<"AMQP topic exchange, as per the AMQP specification">>}].
-publish(#exchange{name = Name}, Delivery =
+publish(#exchange{name = Name},
#delivery{message = #basic_message{routing_key = RoutingKey}}) ->
- rabbit_router:deliver(rabbit_router:match_bindings(
- Name, fun (#binding{key = BindingKey}) ->
- topic_matches(BindingKey, RoutingKey)
- end),
- Delivery).
+ rabbit_router:match_bindings(Name,
+ fun (#binding{key = BindingKey}) ->
+ topic_matches(BindingKey, RoutingKey)
+ end).
split_topic_key(Key) ->
string:tokens(binary_to_list(Key), ".").
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index a3214888..e9ef61a2 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -212,13 +212,15 @@ table_definitions() ->
{match, #amqqueue{name = queue_name_match(), _='_'}}]}].
binding_match() ->
- #binding{queue_name = queue_name_match(),
+ #binding{destination = binding_destination_match(),
exchange_name = exchange_name_match(),
_='_'}.
reverse_binding_match() ->
- #reverse_binding{queue_name = queue_name_match(),
+ #reverse_binding{destination = binding_destination_match(),
exchange_name = exchange_name_match(),
_='_'}.
+binding_destination_match() ->
+ resource_match('_').
exchange_name_match() ->
resource_match(exchange).
queue_name_match() ->
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index bfccb0da..7e5a6b84 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -39,19 +39,19 @@
-ifdef(use_specs).
--export_type([routing_key/0, routing_result/0]).
+-export_type([routing_key/0, routing_result/0, match_result/0]).
-type(routing_key() :: binary()).
-type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered').
--type(qpids() :: [pid()]).
+-type(match_result() :: {[rabbit_amqqueue:name()], [rabbit_exchange:name()]}).
-spec(deliver/2 ::
- (qpids(), rabbit_types:delivery()) -> {routing_result(), qpids()}).
+ ([pid()], rabbit_types:delivery()) -> {routing_result(), [pid()]}).
-spec(match_bindings/2 :: (rabbit_exchange:name(),
fun ((rabbit_types:binding()) -> boolean())) ->
- qpids()).
+ match_result()).
-spec(match_routing_key/2 :: (rabbit_exchange:name(), routing_key() | '_') ->
- qpids()).
+ match_result()).
-endif.
@@ -84,29 +84,27 @@ deliver(QPids, Delivery) ->
%% TODO: Maybe this should be handled by a cursor instead.
%% TODO: This causes a full scan for each entry with the same exchange
match_bindings(Name, Match) ->
- Query = qlc:q([QName || #route{binding = Binding = #binding{
- exchange_name = ExchangeName,
- queue_name = QName}} <-
- mnesia:table(rabbit_route),
- ExchangeName == Name,
- Match(Binding)]),
- lookup_qpids(mnesia:async_dirty(fun qlc:e/1, [Query])).
+ Query = qlc:q([Destination ||
+ #route{binding = Binding = #binding{
+ exchange_name = ExchangeName,
+ destination = Destination}} <-
+ mnesia:table(rabbit_route),
+ ExchangeName == Name,
+ Match(Binding)]),
+ partition_destinations(mnesia:async_dirty(fun qlc:e/1, [Query])).
match_routing_key(Name, RoutingKey) ->
MatchHead = #route{binding = #binding{exchange_name = Name,
- queue_name = '$1',
+ destination = '$1',
key = RoutingKey,
_ = '_'}},
- lookup_qpids(mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])).
-
-lookup_qpids(Queues) ->
- lists:foldl(
- fun (Key, Acc) ->
- case mnesia:dirty_read({rabbit_queue, Key}) of
- [#amqqueue{pid = QPid}] -> [QPid | Acc];
- [] -> Acc
- end
- end, [], lists:usort(Queues)).
+ partition_destinations(
+ mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])).
+
+partition_destinations(Destinations) ->
+ lists:partition(
+ fun (DestinationName) -> DestinationName#resource.kind =:= queue end,
+ Destinations).
%%--------------------------------------------------------------------
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index a72656b7..eb08087a 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1043,9 +1043,9 @@ test_server_status() ->
%% misc binding listing APIs
[_|_] = rabbit_binding:list_for_exchange(
rabbit_misc:r(<<"/">>, exchange, <<"">>)),
- [_] = rabbit_binding:list_for_queue(
+ [_] = rabbit_binding:list_for_destination(
rabbit_misc:r(<<"/">>, queue, <<"foo">>)),
- [_] = rabbit_binding:list_for_exchange_and_queue(
+ [_] = rabbit_binding:list_for_exchange_and_destination(
rabbit_misc:r(<<"/">>, exchange, <<"">>),
rabbit_misc:r(<<"/">>, queue, <<"foo">>)),
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index 0b6a15ec..03fbe55a 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -39,9 +39,10 @@
delivery/0, content/0, decoded_content/0, undecoded_content/0,
unencoded_content/0, encoded_content/0, vhost/0, ctag/0,
amqp_error/0, r/1, r2/2, r3/3, listener/0,
- binding/0, amqqueue/0, exchange/0, connection/0, protocol/0,
- user/0, ok/1, error/1, ok_or_error/1, ok_or_error2/2,
- ok_pid_or_error/0, channel_exit/0, connection_exit/0]).
+ binding/0, binding_destination/0, amqqueue/0, exchange/0,
+ connection/0, protocol/0, user/0, ok/1, error/1, ok_or_error/1,
+ ok_or_error2/2, ok_pid_or_error/0, channel_exit/0,
+ connection_exit/0]).
-type(channel_exit() :: no_return()).
-type(connection_exit() :: no_return()).
@@ -113,9 +114,12 @@
host :: rabbit_networking:hostname(),
port :: rabbit_networking:ip_port()}).
+-type(binding_destination() ::
+ rabbit_amqqueue:name() | rabbit_exchange:name()).
+
-type(binding() ::
#binding{exchange_name :: rabbit_exchange:name(),
- queue_name :: rabbit_amqqueue:name(),
+ destination :: binding_destination(),
key :: rabbit_binding:key(),
args :: rabbit_framing:amqp_table()}).