summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docs/rabbitmqctl.1.xml31
-rw-r--r--include/rabbit.hrl4
-rw-r--r--include/rabbit_exchange_type_spec.hrl2
-rw-r--r--src/rabbit_amqqueue.erl12
-rw-r--r--src/rabbit_binding.erl275
-rw-r--r--src/rabbit_channel.erl64
-rw-r--r--src/rabbit_control.erl9
-rw-r--r--src/rabbit_exchange.erl95
-rw-r--r--src/rabbit_exchange_type_direct.erl5
-rw-r--r--src/rabbit_exchange_type_fanout.erl4
-rw-r--r--src/rabbit_exchange_type_headers.erl9
-rw-r--r--src/rabbit_exchange_type_topic.erl11
-rw-r--r--src/rabbit_mnesia.erl10
-rw-r--r--src/rabbit_router.erl64
-rw-r--r--src/rabbit_tests.erl6
-rw-r--r--src/rabbit_types.erl19
16 files changed, 349 insertions, 271 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 73882861..3b7244c7 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -894,16 +894,31 @@
</para>
<variablelist>
<varlistentry>
- <term>exchange_name</term>
- <listitem><para>The name of the exchange to which the
- binding is attached. with non-ASCII characters
- escaped as in C.</para></listitem>
+ <term>source_name</term>
+ <listitem><para>The name of the source of messages to
+ which the binding is attached. With non-ASCII
+ characters escaped as in C.</para></listitem>
</varlistentry>
<varlistentry>
- <term>queue_name</term>
- <listitem><para>The name of the queue to which the
- binding is attached. with non-ASCII characters
- escaped as in C.</para></listitem>
+ <term>source_kind</term>
+ <listitem><para>The kind of the source of messages to
+ which the binding is attached. Currently always
+ queue. With non-ASCII characters escaped as in
+ C.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>destination_name</term>
+ <listitem><para>The name of the destination of
+ messages to which the binding is attached. With
+ non-ASCII characters escaped as in
+ C.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>destination_kind</term>
+ <listitem><para>The kind of the destination of
+ messages to which the binding is attached. With
+ non-ASCII characters escaped as in
+ C.</para></listitem>
</varlistentry>
<varlistentry>
<term>routing_key</term>
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 24aa8d98..bce9dfa3 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -60,8 +60,8 @@
-record(route, {binding, value = const}).
-record(reverse_route, {reverse_binding, value = const}).
--record(binding, {exchange_name, key, queue_name, args = []}).
--record(reverse_binding, {queue_name, key, exchange_name, args = []}).
+-record(binding, {source, key, destination, args = []}).
+-record(reverse_binding, {destination, key, source, args = []}).
-record(listener, {node, protocol, host, port}).
diff --git a/include/rabbit_exchange_type_spec.hrl b/include/rabbit_exchange_type_spec.hrl
index cecd666b..014b0736 100644
--- a/include/rabbit_exchange_type_spec.hrl
+++ b/include/rabbit_exchange_type_spec.hrl
@@ -32,7 +32,7 @@
-spec(description/0 :: () -> [{atom(), any()}]).
-spec(publish/2 :: (rabbit_types:exchange(), rabbit_types:delivery())
- -> {rabbit_router:routing_result(), [pid()]}).
+ -> rabbit_router:match_result()).
-spec(validate/1 :: (rabbit_types:exchange()) -> 'ok').
-spec(create/1 :: (rabbit_types:exchange()) -> 'ok').
-spec(recover/2 :: (rabbit_types:exchange(),
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 42bddc5e..fe67ea9b 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -254,10 +254,10 @@ start_queue_process(Q) ->
add_default_binding(#amqqueue{name = QueueName}) ->
ExchangeName = rabbit_misc:r(QueueName, exchange, <<>>),
RoutingKey = QueueName#resource.name,
- rabbit_binding:add(#binding{exchange_name = ExchangeName,
- queue_name = QueueName,
- key = RoutingKey,
- args = []}).
+ rabbit_binding:add(#binding{source = ExchangeName,
+ destination = QueueName,
+ key = RoutingKey,
+ args = []}).
lookup(Name) ->
rabbit_misc:dirty_read({rabbit_queue, Name}).
@@ -440,7 +440,7 @@ internal_delete1(QueueName) ->
%% we want to execute some things, as
%% decided by rabbit_exchange, after the
%% transaction.
- rabbit_binding:remove_for_queue(QueueName).
+ rabbit_binding:remove_for_destination(QueueName).
internal_delete(QueueName) ->
case
@@ -484,7 +484,7 @@ on_node_down(Node) ->
ok.
delete_queue(QueueName) ->
- Post = rabbit_binding:remove_transient_for_queue(QueueName),
+ Post = rabbit_binding:remove_transient_for_destination(QueueName),
ok = mnesia:delete({rabbit_queue, QueueName}),
Post.
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 19150fa9..a0389a52 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -33,11 +33,12 @@
-include("rabbit.hrl").
-export([recover/0, exists/1, add/1, remove/1, add/2, remove/2, list/1]).
--export([list_for_exchange/1, list_for_queue/1, list_for_exchange_and_queue/2]).
+-export([list_for_source/1, list_for_destination/1,
+ list_for_source_and_destination/2]).
-export([info_keys/0, info/1, info/2, info_all/1, info_all/2]).
%% these must all be run inside a mnesia tx
--export([has_for_exchange/1, remove_for_exchange/1,
- remove_for_queue/1, remove_transient_for_queue/1]).
+-export([has_for_source/1, remove_for_source/1,
+ remove_for_destination/1, remove_transient_for_destination/1]).
%%----------------------------------------------------------------------------
@@ -47,12 +48,13 @@
-type(key() :: binary()).
--type(bind_errors() :: rabbit_types:error('queue_not_found' |
- 'exchange_not_found' |
- 'exchange_and_queue_not_found')).
+-type(bind_errors() :: rabbit_types:error('source_not_found' |
+ 'destination_not_found' |
+ 'source_and_destination_not_found')).
-type(bind_res() :: 'ok' | bind_errors()).
-type(inner_fun() ::
- fun((rabbit_types:exchange(), queue()) ->
+ fun((rabbit_types:exchange(),
+ rabbit_types:exchange() | rabbit_types:amqqueue()) ->
rabbit_types:ok_or_error(rabbit_types:amqp_error()))).
-type(bindings() :: [rabbit_types:binding()]).
@@ -65,10 +67,13 @@
-spec(remove/2 :: (rabbit_types:binding(), inner_fun()) ->
bind_res() | rabbit_types:error('binding_not_found')).
-spec(list/1 :: (rabbit_types:vhost()) -> bindings()).
--spec(list_for_exchange/1 :: (rabbit_exchange:name()) -> bindings()).
--spec(list_for_queue/1 :: (rabbit_amqqueue:name()) -> bindings()).
--spec(list_for_exchange_and_queue/2 ::
- (rabbit_exchange:name(), rabbit_amqqueue:name()) -> bindings()).
+-spec(list_for_source/1 ::
+ (rabbit_types:binding_source()) -> bindings()).
+-spec(list_for_destination/1 ::
+ (rabbit_types:binding_destination()) -> bindings()).
+-spec(list_for_source_and_destination/2 ::
+ (rabbit_types:binding_source(), rabbit_types:binding_destination()) ->
+ bindings()).
-spec(info_keys/0 :: () -> [rabbit_types:info_key()]).
-spec(info/1 :: (rabbit_types:binding()) -> [rabbit_types:info()]).
-spec(info/2 :: (rabbit_types:binding(), [rabbit_types:info_key()]) ->
@@ -76,18 +81,20 @@
-spec(info_all/1 :: (rabbit_types:vhost()) -> [[rabbit_types:info()]]).
-spec(info_all/2 ::(rabbit_types:vhost(), [rabbit_types:info_key()])
-> [[rabbit_types:info()]]).
--spec(has_for_exchange/1 :: (rabbit_exchange:name()) -> boolean()).
--spec(remove_for_exchange/1 :: (rabbit_exchange:name()) -> bindings()).
--spec(remove_for_queue/1 ::
- (rabbit_amqqueue:name()) -> fun (() -> any())).
--spec(remove_transient_for_queue/1 ::
- (rabbit_amqqueue:name()) -> fun (() -> any())).
+-spec(has_for_source/1 :: (rabbit_types:binding_source()) -> boolean()).
+-spec(remove_for_source/1 :: (rabbit_types:binding_source()) -> bindings()).
+-spec(remove_for_destination/1 ::
+ (rabbit_types:binding_destination()) -> fun (() -> any())).
+-spec(remove_transient_for_destination/1 ::
+ (rabbit_types:binding_destination()) -> fun (() -> any())).
-endif.
%%----------------------------------------------------------------------------
--define(INFO_KEYS, [exchange_name, queue_name, routing_key, arguments]).
+-define(INFO_KEYS, [source_name, source_kind,
+ destination_name, destination_kind,
+ routing_key, arguments]).
recover() ->
rabbit_misc:table_fold(
@@ -101,36 +108,34 @@ recover() ->
exists(Binding) ->
binding_action(
Binding,
- fun (_X, _Q, B) -> mnesia:read({rabbit_route, B}) /= [] end).
+ fun (_Src, _Dst, B) -> mnesia:read({rabbit_route, B}) /= [] end).
-add(Binding) -> add(Binding, fun (_X, _Q) -> ok end).
+add(Binding) -> add(Binding, fun (_Src, _Dst) -> ok end).
-remove(Binding) -> remove(Binding, fun (_X, _Q) -> ok end).
+remove(Binding) -> remove(Binding, fun (_Src, _Dst) -> ok end).
add(Binding, InnerFun) ->
case binding_action(
Binding,
- fun (X, Q, B) ->
+ fun (Src, Dst, B) ->
%% this argument is used to check queue exclusivity;
%% in general, we want to fail on that in preference to
%% anything else
- case InnerFun(X, Q) of
+ case InnerFun(Src, Dst) of
ok ->
case mnesia:read({rabbit_route, B}) of
- [] -> Durable = (X#exchange.durable andalso
- Q#amqqueue.durable),
- ok = sync_binding(
- B, Durable,
+ [] -> ok = sync_binding(
+ B, all_durable([Src, Dst]),
fun mnesia:write/3),
- {new, X, B};
- [_] -> {existing, X, B}
+ {new, Src, B};
+ [_] -> {existing, Src, B}
end;
{error, _} = E ->
E
end
end) of
- {new, X = #exchange{ type = Type }, B} ->
- ok = (type_to_module(Type)):add_binding(X, B),
+ {new, Src = #exchange{ type = Type }, B} ->
+ ok = (type_to_module(Type)):add_binding(Src, B),
rabbit_event:notify(binding_created, info(B));
{existing, _, _} ->
ok;
@@ -141,61 +146,57 @@ add(Binding, InnerFun) ->
remove(Binding, InnerFun) ->
case binding_action(
Binding,
- fun (X, Q, B) ->
+ fun (Src, Dst, B) ->
case mnesia:match_object(rabbit_route, #route{binding = B},
write) of
- [] -> {error, binding_not_found};
- [_] -> case InnerFun(X, Q) of
- ok ->
- Durable = (X#exchange.durable andalso
- Q#amqqueue.durable),
- ok = sync_binding(
- B, Durable,
- fun mnesia:delete_object/3),
- Deleted =
- rabbit_exchange:maybe_auto_delete(X),
- {{Deleted, X}, B};
- {error, _} = E ->
- E
- end
+ [] ->
+ {error, binding_not_found};
+ [_] ->
+ case InnerFun(Src, Dst) of
+ ok ->
+ ok = sync_binding(
+ B, all_durable([Src, Dst]),
+ fun mnesia:delete_object/3),
+ Deleted =
+ rabbit_exchange:maybe_auto_delete(Src),
+ {{Deleted, Src}, B};
+ {error, _} = E ->
+ E
+ end
end
end) of
{error, _} = Err ->
Err;
- {{IsDeleted, X = #exchange{ type = Type }}, B} ->
- Module = type_to_module(Type),
- case IsDeleted of
- auto_deleted -> ok = Module:delete(X, [B]);
- not_deleted -> ok = Module:remove_bindings(X, [B])
- end,
+ {{IsDeleted, Src}, B} ->
+ ok = post_binding_removal(IsDeleted, Src, [B]),
rabbit_event:notify(binding_deleted, info(B)),
ok
end.
list(VHostPath) ->
- Route = #route{binding = #binding{
- exchange_name = rabbit_misc:r(VHostPath, exchange),
- queue_name = rabbit_misc:r(VHostPath, queue),
- _ = '_'},
+ VHostResource = rabbit_misc:r(VHostPath, '_'),
+ Route = #route{binding = #binding{source = VHostResource,
+ destination = VHostResource,
+ _ = '_'},
_ = '_'},
[B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route,
Route)].
-list_for_exchange(XName) ->
- Route = #route{binding = #binding{exchange_name = XName, _ = '_'}},
+list_for_source(SrcName) ->
+ Route = #route{binding = #binding{source = SrcName, _ = '_'}},
[B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route,
Route)].
-list_for_queue(QueueName) ->
- Route = #route{binding = #binding{queue_name = QueueName, _ = '_'}},
+list_for_destination(DstName) ->
+ Route = #route{binding = #binding{destination = DstName, _ = '_'}},
[reverse_binding(B) || #reverse_route{reverse_binding = B} <-
mnesia:dirty_match_object(rabbit_reverse_route,
reverse_route(Route))].
-list_for_exchange_and_queue(XName, QueueName) ->
- Route = #route{binding = #binding{exchange_name = XName,
- queue_name = QueueName,
- _ = '_'}},
+list_for_source_and_destination(SrcName, DstName) ->
+ Route = #route{binding = #binding{source = SrcName,
+ destination = DstName,
+ _ = '_'}},
[B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route,
Route)].
@@ -208,10 +209,12 @@ map(VHostPath, F) ->
infos(Items, B) -> [{Item, i(Item, B)} || Item <- Items].
-i(exchange_name, #binding{exchange_name = XName}) -> XName;
-i(queue_name, #binding{queue_name = QName}) -> QName;
-i(routing_key, #binding{key = RoutingKey}) -> RoutingKey;
-i(arguments, #binding{args = Arguments}) -> Arguments;
+i(source_name, #binding{source = SrcName}) -> SrcName#resource.name;
+i(source_kind, #binding{source = SrcName}) -> SrcName#resource.kind;
+i(destination_name, #binding{destination = DstName}) -> DstName#resource.name;
+i(destination_kind, #binding{destination = DstName}) -> DstName#resource.kind;
+i(routing_key, #binding{key = RoutingKey}) -> RoutingKey;
+i(arguments, #binding{args = Arguments}) -> Arguments;
i(Item, _) -> throw({bad_argument, Item}).
info(B = #binding{}) -> infos(?INFO_KEYS, B).
@@ -222,14 +225,14 @@ info_all(VHostPath) -> map(VHostPath, fun (B) -> info(B) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (B) -> info(B, Items) end).
-has_for_exchange(XName) ->
- Match = #route{binding = #binding{exchange_name = XName, _ = '_'}},
+has_for_source(SrcName) ->
+ Match = #route{binding = #binding{source = SrcName, _ = '_'}},
%% we need to check for durable routes here too in case a bunch of
%% routes to durable queues have been removed temporarily as a
%% result of a node failure
contains(rabbit_route, Match) orelse contains(rabbit_durable_route, Match).
-remove_for_exchange(XName) ->
+remove_for_source(SrcName) ->
[begin
ok = mnesia:delete_object(rabbit_reverse_route,
reverse_route(Route), write),
@@ -237,26 +240,31 @@ remove_for_exchange(XName) ->
Route#route.binding
end || Route <- mnesia:match_object(
rabbit_route,
- #route{binding = #binding{exchange_name = XName,
- _ = '_'}},
+ #route{binding = #binding{source = SrcName,
+ _ = '_'}},
write)].
-remove_for_queue(QueueName) ->
- remove_for_queue(QueueName, fun delete_forward_routes/1).
+remove_for_destination(DstName) ->
+ remove_for_destination(DstName, fun delete_forward_routes/1).
-remove_transient_for_queue(QueueName) ->
- remove_for_queue(QueueName, fun delete_transient_forward_routes/1).
+remove_transient_for_destination(DstName) ->
+ remove_for_destination(DstName, fun delete_transient_forward_routes/1).
%%----------------------------------------------------------------------------
-binding_action(Binding = #binding{exchange_name = XName,
- queue_name = QueueName,
- args = Arguments}, Fun) ->
- call_with_exchange_and_queue(
- XName, QueueName,
- fun (X, Q) ->
+all_durable(Resources) ->
+ lists:all(fun (#exchange{durable = D}) -> D;
+ (#amqqueue{durable = D}) -> D
+ end, Resources).
+
+binding_action(Binding = #binding{source = SrcName,
+ destination = DstName,
+ args = Arguments}, Fun) ->
+ call_with_source_and_destination(
+ SrcName, DstName,
+ fun (Src, Dst) ->
SortedArgs = rabbit_misc:sort_field_table(Arguments),
- Fun(X, Q, Binding#binding{args = SortedArgs})
+ Fun(Src, Dst, Binding#binding{args = SortedArgs})
end).
sync_binding(Binding, Durable, Fun) ->
@@ -270,17 +278,22 @@ sync_binding(Binding, Durable, Fun) ->
ok = Fun(rabbit_reverse_route, ReverseRoute, write),
ok.
-call_with_exchange_and_queue(XName, QueueName, Fun) ->
+call_with_source_and_destination(SrcName, DstName, Fun) ->
+ SrcTable = table_for_resource(SrcName),
+ DstTable = table_for_resource(DstName),
rabbit_misc:execute_mnesia_transaction(
- fun () -> case {mnesia:read({rabbit_exchange, XName}),
- mnesia:read({rabbit_queue, QueueName})} of
- {[X], [Q]} -> Fun(X, Q);
- {[ ], [_]} -> {error, exchange_not_found};
- {[_], [ ]} -> {error, queue_not_found};
- {[ ], [ ]} -> {error, exchange_and_queue_not_found}
- end
+ fun () -> case {mnesia:read({SrcTable, SrcName}),
+ mnesia:read({DstTable, DstName})} of
+ {[Src], [Dst]} -> Fun(Src, Dst);
+ {[], [_] } -> {error, source_not_found};
+ {[_], [] } -> {error, destination_not_found};
+ {[], [] } -> {error, source_and_destination_not_found}
+ end
end).
+table_for_resource(#resource{kind = exchange}) -> rabbit_exchange;
+table_for_resource(#resource{kind = queue}) -> rabbit_queue.
+
%% Used with atoms from records; e.g., the type is expected to exist.
type_to_module(T) ->
{ok, Module} = rabbit_exchange_type_registry:lookup_module(T),
@@ -293,7 +306,7 @@ continue('$end_of_table') -> false;
continue({[_|_], _}) -> true;
continue({[], Continuation}) -> continue(mnesia:select(Continuation)).
-remove_for_queue(QueueName, FwdDeleteFun) ->
+remove_for_destination(DstName, FwdDeleteFun) ->
DeletedBindings =
[begin
Route = reverse_route(ReverseRoute),
@@ -304,39 +317,45 @@ remove_for_queue(QueueName, FwdDeleteFun) ->
end || ReverseRoute
<- mnesia:match_object(
rabbit_reverse_route,
- reverse_route(#route{binding = #binding{
- queue_name = QueueName,
- _ = '_'}}),
+ reverse_route(#route{
+ binding = #binding{
+ destination = DstName,
+ _ = '_'}}),
write)],
Grouped = group_bindings_and_auto_delete(
- lists:keysort(#binding.exchange_name, DeletedBindings), []),
+ lists:keysort(#binding.source, DeletedBindings), []),
fun () ->
lists:foreach(
- fun ({{IsDeleted, X = #exchange{ type = Type }}, Bs}) ->
- Module = type_to_module(Type),
- case IsDeleted of
- auto_deleted -> Module:delete(X, Bs);
- not_deleted -> Module:remove_bindings(X, Bs)
- end
+ fun ({{IsDeleted, Src}, Bs}) ->
+ ok = post_binding_removal(IsDeleted, Src, Bs)
end, Grouped)
end.
+post_binding_removal(IsDeleted, Src = #exchange{ type = Type }, Bs) ->
+ Module = type_to_module(Type),
+ case IsDeleted of
+ {auto_deleted, Fun} -> ok = Module:delete(Src, Bs),
+ Fun(),
+ ok;
+ not_deleted -> ok = Module:remove_bindings(Src, Bs)
+ end.
+
%% Requires that its input binding list is sorted in exchange-name
%% order, so that the grouping of bindings (for passing to
%% group_bindings_and_auto_delete1) works properly.
group_bindings_and_auto_delete([], Acc) ->
Acc;
group_bindings_and_auto_delete(
- [B = #binding{exchange_name = XName} | Bs], Acc) ->
- group_bindings_and_auto_delete(XName, Bs, [B], Acc).
+ [B = #binding{source = SrcName} | Bs], Acc) ->
+ group_bindings_and_auto_delete(SrcName, Bs, [B], Acc).
group_bindings_and_auto_delete(
- XName, [B = #binding{exchange_name = XName} | Bs], Bindings, Acc) ->
- group_bindings_and_auto_delete(XName, Bs, [B | Bindings], Acc);
-group_bindings_and_auto_delete(XName, Removed, Bindings, Acc) ->
- %% either Removed is [], or its head has a non-matching XName
- [X] = mnesia:read({rabbit_exchange, XName}),
- NewAcc = [{{rabbit_exchange:maybe_auto_delete(X), X}, Bindings} | Acc],
+ SrcName, [B = #binding{source = SrcName} | Bs], Bindings, Acc) ->
+ group_bindings_and_auto_delete(SrcName, Bs, [B | Bindings], Acc);
+group_bindings_and_auto_delete(SrcName, Removed, Bindings, Acc) ->
+ %% either Removed is [], or its head has a non-matching SrcName
+ [Src] = mnesia:read({rabbit_exchange, SrcName}),
+ NewAcc = [{{rabbit_exchange:maybe_auto_delete(Src), Src}, Bindings} | Acc],
group_bindings_and_auto_delete(Removed, NewAcc).
delete_forward_routes(Route) ->
@@ -358,20 +377,20 @@ reverse_route(#route{binding = Binding}) ->
reverse_route(#reverse_route{reverse_binding = Binding}) ->
#route{binding = reverse_binding(Binding)}.
-reverse_binding(#reverse_binding{exchange_name = XName,
- queue_name = QueueName,
- key = Key,
- args = Args}) ->
- #binding{exchange_name = XName,
- queue_name = QueueName,
- key = Key,
- args = Args};
-
-reverse_binding(#binding{exchange_name = XName,
- queue_name = QueueName,
- key = Key,
- args = Args}) ->
- #reverse_binding{exchange_name = XName,
- queue_name = QueueName,
- key = Key,
- args = Args}.
+reverse_binding(#reverse_binding{source = SrcName,
+ destination = DstName,
+ key = Key,
+ args = Args}) ->
+ #binding{source = SrcName,
+ destination = DstName,
+ key = Key,
+ args = Args};
+
+reverse_binding(#binding{source = SrcName,
+ destination = DstName,
+ key = Key,
+ args = Args}) ->
+ #reverse_binding{source = SrcName,
+ destination = DstName,
+ key = Key,
+ args = Args}.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index fe36cef9..f75707c3 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -728,6 +728,24 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
return_ok(State, NoWait, #'exchange.delete_ok'{})
end;
+handle_method(#'exchange.bind'{destination = DestinationNameBin,
+ source = SourceNameBin,
+ routing_key = RoutingKey,
+ nowait = NoWait,
+ arguments = Arguments}, _, State) ->
+ binding_action(fun rabbit_binding:add/2,
+ SourceNameBin, exchange, DestinationNameBin, RoutingKey,
+ Arguments, #'exchange.bind_ok'{}, NoWait, State);
+
+handle_method(#'exchange.unbind'{destination = DestinationNameBin,
+ source = SourceNameBin,
+ routing_key = RoutingKey,
+ nowait = NoWait,
+ arguments = Arguments}, _, State) ->
+ binding_action(fun rabbit_binding:remove/2,
+ SourceNameBin, exchange, DestinationNameBin, RoutingKey,
+ Arguments, #'exchange.unbind_ok'{}, NoWait, State);
+
handle_method(#'queue.declare'{queue = QueueNameBin,
passive = false,
durable = Durable,
@@ -819,7 +837,7 @@ handle_method(#'queue.bind'{queue = QueueNameBin,
nowait = NoWait,
arguments = Arguments}, _, State) ->
binding_action(fun rabbit_binding:add/2,
- ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
+ ExchangeNameBin, queue, QueueNameBin, RoutingKey, Arguments,
#'queue.bind_ok'{}, NoWait, State);
handle_method(#'queue.unbind'{queue = QueueNameBin,
@@ -827,7 +845,7 @@ handle_method(#'queue.unbind'{queue = QueueNameBin,
routing_key = RoutingKey,
arguments = Arguments}, _, State) ->
binding_action(fun rabbit_binding:remove/2,
- ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
+ ExchangeNameBin, queue, QueueNameBin, RoutingKey, Arguments,
#'queue.unbind_ok'{}, false, State);
handle_method(#'queue.purge'{queue = QueueNameBin,
@@ -893,42 +911,48 @@ handle_method(_MethodRecord, _Content, _State) ->
%%----------------------------------------------------------------------------
-binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
- ReturnMethod, NoWait,
+binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
+ RoutingKey, Arguments, ReturnMethod, NoWait,
State = #ch{virtual_host = VHostPath,
reader_pid = ReaderPid}) ->
%% FIXME: connection exception (!) on failure??
%% (see rule named "failure" in spec-XML)
%% FIXME: don't allow binding to internal exchanges -
%% including the one named "" !
- QueueName = expand_queue_name_shortcut(QueueNameBin, State),
- check_write_permitted(QueueName, State),
- ActualRoutingKey = expand_routing_key_shortcut(QueueNameBin, RoutingKey,
- State),
+ DestinationName =
+ case DestinationType of
+ queue -> expand_queue_name_shortcut(DestinationNameBin, State);
+ exchange -> rabbit_misc:r(VHostPath, exchange, DestinationNameBin)
+ end,
+ check_write_permitted(DestinationName, State),
+ ActualRoutingKey =
+ expand_routing_key_shortcut(DestinationNameBin, RoutingKey, State),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_read_permitted(ExchangeName, State),
- case Fun(#binding{exchange_name = ExchangeName,
- queue_name = QueueName,
- key = ActualRoutingKey,
- args = Arguments},
- fun (_X, Q) ->
+ case Fun(#binding{source = ExchangeName,
+ destination = DestinationName,
+ key = ActualRoutingKey,
+ args = Arguments},
+ fun (_X, Q = #amqqueue{}) ->
try rabbit_amqqueue:check_exclusive_access(Q, ReaderPid)
catch exit:Reason -> {error, Reason}
- end
+ end;
+ (_X, #exchange{}) ->
+ ok
end) of
- {error, exchange_not_found} ->
+ {error, source_not_found} ->
rabbit_misc:not_found(ExchangeName);
- {error, queue_not_found} ->
- rabbit_misc:not_found(QueueName);
- {error, exchange_and_queue_not_found} ->
+ {error, destination_not_found} ->
+ rabbit_misc:not_found(DestinationName);
+ {error, source_and_destination_not_found} ->
rabbit_misc:protocol_error(
not_found, "no ~s and no ~s", [rabbit_misc:rs(ExchangeName),
- rabbit_misc:rs(QueueName)]);
+ rabbit_misc:rs(DestinationName)]);
{error, binding_not_found} ->
rabbit_misc:protocol_error(
not_found, "no binding ~s between ~s and ~s",
[RoutingKey, rabbit_misc:rs(ExchangeName),
- rabbit_misc:rs(QueueName)]);
+ rabbit_misc:rs(DestinationName)]);
{error, #amqp_error{} = Error} ->
rabbit_misc:protocol_error(Error);
ok -> return_ok(State, NoWait, ReturnMethod)
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 57efe7cc..a9c798fc 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -257,7 +257,8 @@ action(list_exchanges, Node, Args, Opts, Inform) ->
action(list_bindings, Node, Args, Opts, Inform) ->
Inform("Listing bindings", []),
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
- ArgAtoms = default_if_empty(Args, [exchange_name, queue_name,
+ ArgAtoms = default_if_empty(Args, [source_name, source_kind,
+ destination_name, destination_kind,
routing_key, arguments]),
display_info_list(rpc_call(Node, rabbit_binding, info_all,
[VHostArg, ArgAtoms]),
@@ -313,9 +314,9 @@ default_if_empty(List, Default) when is_list(List) ->
display_info_list(Results, InfoItemKeys) when is_list(Results) ->
lists:foreach(
- fun (Result) -> display_row(
- [format_info_item(proplists:get_value(X, Result)) ||
- X <- InfoItemKeys])
+ fun (Result) ->
+ display_row([format_info_item(proplists:get_value(X, Result))
+ || X <- InfoItemKeys])
end, Results),
ok;
display_info_list(Other, _) ->
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 2a19d5b1..db5a46bd 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -82,8 +82,9 @@
(name(), boolean())-> 'ok' |
rabbit_types:error('not_found') |
rabbit_types:error('in_use')).
--spec(maybe_auto_delete/1:: (rabbit_types:exchange()) ->
- 'not_deleted' | 'auto_deleted').
+-spec(maybe_auto_delete/1::
+ (rabbit_types:exchange())
+ -> 'not_deleted' | {'auto_deleted', fun (() -> any())}).
-endif.
@@ -99,11 +100,11 @@ recover() ->
end, [], rabbit_durable_exchange),
Bs = rabbit_binding:recover(),
recover_with_bindings(
- lists:keysort(#binding.exchange_name, Bs),
+ lists:keysort(#binding.source, Bs),
lists:keysort(#exchange.name, Xs), []).
-recover_with_bindings([B = #binding{exchange_name = Name} | Rest],
- Xs = [#exchange{name = Name} | _],
+recover_with_bindings([B = #binding{source = XName} | Rest],
+ Xs = [#exchange{name = XName} | _],
Bindings) ->
recover_with_bindings(Rest, Xs, [B | Bindings]);
recover_with_bindings(Bs, [X = #exchange{type = Type} | Xs], Bindings) ->
@@ -225,38 +226,49 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end).
-publish(X, Delivery) ->
- publish(X, [], Delivery).
-
-publish(X = #exchange{type = Type}, Seen, Delivery) ->
- case (type_to_module(Type)):publish(X, Delivery) of
- {_, []} = R ->
- #exchange{name = XName, arguments = Args} = X,
- case rabbit_misc:r_arg(XName, exchange, Args,
- <<"alternate-exchange">>) of
- undefined ->
- R;
- AName ->
- NewSeen = [XName | Seen],
- case lists:member(AName, NewSeen) of
- true -> R;
- false -> case lookup(AName) of
- {ok, AX} ->
- publish(AX, NewSeen, Delivery);
- {error, not_found} ->
- rabbit_log:warning(
- "alternate exchange for ~s "
- "does not exist: ~s",
- [rabbit_misc:rs(XName),
- rabbit_misc:rs(AName)]),
- R
- end
- end
- end;
- R ->
- R
+publish(X = #exchange{name = XName}, Delivery) ->
+ QNames = find_qnames(Delivery, queue:from_list([X]),
+ sets:from_list([XName]), []),
+ rabbit_router:deliver(QNames, Delivery).
+
+find_qnames(Delivery, WorkList, SeenXs, QNames) ->
+ case queue:out(WorkList) of
+ {empty, _WorkList} ->
+ lists:usort(QNames);
+ {{value, X = #exchange{type = Type}}, WorkList1} ->
+ DstNames =
+ process_alternate(
+ X, ((type_to_module(Type)):publish(X, Delivery))),
+ {WorkList2, SeenXs1, QNames1} =
+ lists:foldl(
+ fun (XName = #resource{kind = exchange},
+ {WorkListN, SeenXsN, QNamesN} = Acc) ->
+ case sets:is_element(XName, SeenXsN) of
+ true -> Acc;
+ false -> {case lookup(XName) of
+ {ok, X1} ->
+ queue:in(X1, WorkListN);
+ {error, not_found} ->
+ WorkListN
+ end,
+ sets:add_element(XName, SeenXsN),
+ QNamesN}
+ end;
+ (QName = #resource{kind = queue},
+ {WorkListN, SeenXsN, QNamesN})->
+ {WorkListN, SeenXsN, [QName | QNamesN]}
+ end, {WorkList1, SeenXs, QNames}, DstNames),
+ find_qnames(Delivery, WorkList2, SeenXs1, QNames1)
end.
+process_alternate(#exchange{name = XName, arguments = Args}, []) ->
+ case rabbit_misc:r_arg(XName, exchange, Args, <<"alternate-exchange">>) of
+ undefined -> [];
+ AName -> [AName]
+ end;
+process_alternate(_X, Results) ->
+ Results.
+
call_with_exchange(XName, Fun) ->
rabbit_misc:execute_mnesia_transaction(
fun () -> case mnesia:read({rabbit_exchange, XName}) of
@@ -271,8 +283,9 @@ delete(XName, IfUnused) ->
false -> fun unconditional_delete/1
end,
case call_with_exchange(XName, Fun) of
- {deleted, X = #exchange{type = Type}, Bs} ->
+ {deleted, X = #exchange{type = Type}, Bs, Fun1} ->
(type_to_module(Type)):delete(X, Bs),
+ Fun1(),
ok;
Error = {error, _InUseOrNotFound} ->
Error
@@ -282,19 +295,19 @@ maybe_auto_delete(#exchange{auto_delete = false}) ->
not_deleted;
maybe_auto_delete(#exchange{auto_delete = true} = X) ->
case conditional_delete(X) of
- {error, in_use} -> not_deleted;
- {deleted, X, []} -> auto_deleted
+ {error, in_use} -> not_deleted;
+ {deleted, X, [], Fun} -> {auto_deleted, Fun}
end.
conditional_delete(X = #exchange{name = XName}) ->
- case rabbit_binding:has_for_exchange(XName) of
+ case rabbit_binding:has_for_source(XName) of
false -> unconditional_delete(X);
true -> {error, in_use}
end.
unconditional_delete(X = #exchange{name = XName}) ->
- Bindings = rabbit_binding:remove_for_exchange(XName),
+ Bindings = rabbit_binding:remove_for_source(XName),
ok = mnesia:delete({rabbit_durable_exchange, XName}),
ok = mnesia:delete({rabbit_exchange, XName}),
rabbit_event:notify(exchange_deleted, [{name, XName}]),
- {deleted, X, Bindings}.
+ {deleted, X, Bindings, rabbit_binding:remove_for_destination(XName)}.
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
index 4f6eb851..bc740d4b 100644
--- a/src/rabbit_exchange_type_direct.erl
+++ b/src/rabbit_exchange_type_direct.erl
@@ -50,10 +50,9 @@ description() ->
[{name, <<"direct">>},
{description, <<"AMQP direct exchange, as per the AMQP specification">>}].
-publish(#exchange{name = Name}, Delivery =
+publish(#exchange{name = Name},
#delivery{message = #basic_message{routing_key = RoutingKey}}) ->
- rabbit_router:deliver(rabbit_router:match_routing_key(Name, RoutingKey),
- Delivery).
+ rabbit_router:match_routing_key(Name, RoutingKey).
validate(_X) -> ok.
create(_X) -> ok.
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
index 94798c78..4dad9cdd 100644
--- a/src/rabbit_exchange_type_fanout.erl
+++ b/src/rabbit_exchange_type_fanout.erl
@@ -50,8 +50,8 @@ description() ->
[{name, <<"fanout">>},
{description, <<"AMQP fanout exchange, as per the AMQP specification">>}].
-publish(#exchange{name = Name}, Delivery) ->
- rabbit_router:deliver(rabbit_router:match_routing_key(Name, '_'), Delivery).
+publish(#exchange{name = Name}, _Delivery) ->
+ rabbit_router:match_routing_key(Name, '_').
validate(_X) -> ok.
create(_X) -> ok.
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
index 0a59a175..7edc6f7b 100644
--- a/src/rabbit_exchange_type_headers.erl
+++ b/src/rabbit_exchange_type_headers.erl
@@ -57,16 +57,13 @@ description() ->
{description, <<"AMQP headers exchange, as per the AMQP specification">>}].
publish(#exchange{name = Name},
- Delivery = #delivery{message = #basic_message{content = Content}}) ->
+ #delivery{message = #basic_message{content = Content}}) ->
Headers = case (Content#content.properties)#'P_basic'.headers of
undefined -> [];
H -> rabbit_misc:sort_field_table(H)
end,
- rabbit_router:deliver(rabbit_router:match_bindings(
- Name, fun (#binding{args = Spec}) ->
- headers_match(Spec, Headers)
- end),
- Delivery).
+ rabbit_router:match_bindings(
+ Name, fun (#binding{args = Spec}) -> headers_match(Spec, Headers) end).
default_headers_match_kind() -> all.
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index e796acf3..f4a9c904 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -58,13 +58,12 @@ description() ->
[{name, <<"topic">>},
{description, <<"AMQP topic exchange, as per the AMQP specification">>}].
-publish(#exchange{name = Name}, Delivery =
+publish(#exchange{name = Name},
#delivery{message = #basic_message{routing_key = RoutingKey}}) ->
- rabbit_router:deliver(rabbit_router:match_bindings(
- Name, fun (#binding{key = BindingKey}) ->
- topic_matches(BindingKey, RoutingKey)
- end),
- Delivery).
+ rabbit_router:match_bindings(Name,
+ fun (#binding{key = BindingKey}) ->
+ topic_matches(BindingKey, RoutingKey)
+ end).
split_topic_key(Key) ->
string:tokens(binary_to_list(Key), ".").
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index a3214888..33d13978 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -212,13 +212,15 @@ table_definitions() ->
{match, #amqqueue{name = queue_name_match(), _='_'}}]}].
binding_match() ->
- #binding{queue_name = queue_name_match(),
- exchange_name = exchange_name_match(),
+ #binding{source = exchange_name_match(),
+ destination = binding_destination_match(),
_='_'}.
reverse_binding_match() ->
- #reverse_binding{queue_name = queue_name_match(),
- exchange_name = exchange_name_match(),
+ #reverse_binding{destination = binding_destination_match(),
+ source = exchange_name_match(),
_='_'}.
+binding_destination_match() ->
+ resource_match('_').
exchange_name_match() ->
resource_match(exchange).
queue_name_match() ->
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index bd57f737..ebe28162 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -39,26 +39,27 @@
-ifdef(use_specs).
--export_type([routing_key/0, routing_result/0]).
+-export_type([routing_key/0, routing_result/0, match_result/0]).
-type(routing_key() :: binary()).
-type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered').
-type(qpids() :: [pid()]).
+-type(match_result() :: [rabbit_types:binding_destination()]).
--spec(deliver/2 ::
- (qpids(), rabbit_types:delivery()) -> {routing_result(), qpids()}).
--spec(match_bindings/2 :: (rabbit_exchange:name(),
+-spec(deliver/2 :: ([rabbit_amqqueue:name()], rabbit_types:delivery()) ->
+ {routing_result(), qpids()}).
+-spec(match_bindings/2 :: (rabbit_types:binding_source(),
fun ((rabbit_types:binding()) -> boolean())) ->
- qpids()).
--spec(match_routing_key/2 :: (rabbit_exchange:name(), routing_key() | '_') ->
- qpids()).
+ match_result()).
+-spec(match_routing_key/2 :: (rabbit_types:binding_source(),
+ routing_key() | '_') -> match_result()).
-endif.
%%----------------------------------------------------------------------------
-deliver(QPids, Delivery = #delivery{mandatory = false,
- immediate = false}) ->
+deliver(QNames, Delivery = #delivery{mandatory = false,
+ immediate = false}) ->
%% optimisation: when Mandatory = false and Immediate = false,
%% rabbit_amqqueue:deliver will deliver the message to the queue
%% process asynchronously, and return true, which means all the
@@ -66,11 +67,13 @@ deliver(QPids, Delivery = #delivery{mandatory = false,
%% fire-and-forget cast here and return the QPids - the semantics
%% is preserved. This scales much better than the non-immediate
%% case below.
+ QPids = lookup_qpids(QNames),
delegate:invoke_no_result(
QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end),
{routed, QPids};
-deliver(QPids, Delivery) ->
+deliver(QNames, Delivery) ->
+ QPids = lookup_qpids(QNames),
{Success, _} =
delegate:invoke(QPids,
fun (Pid) ->
@@ -81,32 +84,33 @@ deliver(QPids, Delivery) ->
check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate,
{Routed, Handled}).
-%% TODO: Maybe this should be handled by a cursor instead.
-%% TODO: This causes a full scan for each entry with the same exchange
-match_bindings(Name, Match) ->
- Query = qlc:q([QName || #route{binding = Binding = #binding{
- exchange_name = XName,
- queue_name = QName}} <-
- mnesia:table(rabbit_route),
- XName == Name,
- Match(Binding)]),
- lookup_qpids(mnesia:async_dirty(fun qlc:e/1, [Query])).
-
-match_routing_key(Name, RoutingKey) ->
- MatchHead = #route{binding = #binding{exchange_name = Name,
- queue_name = '$1',
- key = RoutingKey,
- _ = '_'}},
- lookup_qpids(mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])).
-
-lookup_qpids(Queues) ->
+lookup_qpids(QNames) ->
lists:foldl(
fun (Key, Acc) ->
case mnesia:dirty_read({rabbit_queue, Key}) of
[#amqqueue{pid = QPid}] -> [QPid | Acc];
[] -> Acc
end
- end, [], lists:usort(Queues)).
+ end, [], QNames).
+
+%% TODO: Maybe this should be handled by a cursor instead.
+%% TODO: This causes a full scan for each entry with the same source
+match_bindings(SrcName, Match) ->
+ Query = qlc:q([DestinationName ||
+ #route{binding = Binding = #binding{
+ source = SrcName1,
+ destination = DestinationName}} <-
+ mnesia:table(rabbit_route),
+ SrcName == SrcName1,
+ Match(Binding)]),
+ mnesia:async_dirty(fun qlc:e/1, [Query]).
+
+match_routing_key(SrcName, RoutingKey) ->
+ MatchHead = #route{binding = #binding{source = SrcName,
+ destination = '$1',
+ key = RoutingKey,
+ _ = '_'}},
+ mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}]).
%%--------------------------------------------------------------------
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index b36ee0be..1b47cdb7 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1040,11 +1040,11 @@ test_server_status() ->
%% list bindings
ok = info_action(list_bindings, rabbit_binding:info_keys(), true),
%% misc binding listing APIs
- [_|_] = rabbit_binding:list_for_exchange(
+ [_|_] = rabbit_binding:list_for_source(
rabbit_misc:r(<<"/">>, exchange, <<"">>)),
- [_] = rabbit_binding:list_for_queue(
+ [_] = rabbit_binding:list_for_destination(
rabbit_misc:r(<<"/">>, queue, <<"foo">>)),
- [_] = rabbit_binding:list_for_exchange_and_queue(
+ [_] = rabbit_binding:list_for_source_and_destination(
rabbit_misc:r(<<"/">>, exchange, <<"">>),
rabbit_misc:r(<<"/">>, queue, <<"foo">>)),
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index 0b6a15ec..b971a63f 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -39,9 +39,11 @@
delivery/0, content/0, decoded_content/0, undecoded_content/0,
unencoded_content/0, encoded_content/0, vhost/0, ctag/0,
amqp_error/0, r/1, r2/2, r3/3, listener/0,
- binding/0, amqqueue/0, exchange/0, connection/0, protocol/0,
- user/0, ok/1, error/1, ok_or_error/1, ok_or_error2/2,
- ok_pid_or_error/0, channel_exit/0, connection_exit/0]).
+ binding/0, binding_source/0, binding_destination/0,
+ amqqueue/0, exchange/0,
+ connection/0, protocol/0, user/0, ok/1, error/1, ok_or_error/1,
+ ok_or_error2/2, ok_pid_or_error/0, channel_exit/0,
+ connection_exit/0]).
-type(channel_exit() :: no_return()).
-type(connection_exit() :: no_return()).
@@ -113,11 +115,14 @@
host :: rabbit_networking:hostname(),
port :: rabbit_networking:ip_port()}).
+-type(binding_source() :: rabbit_exchange:name()).
+-type(binding_destination() :: rabbit_amqqueue:name() | rabbit_exchange:name()).
+
-type(binding() ::
- #binding{exchange_name :: rabbit_exchange:name(),
- queue_name :: rabbit_amqqueue:name(),
- key :: rabbit_binding:key(),
- args :: rabbit_framing:amqp_table()}).
+ #binding{source :: rabbit_exchange:name(),
+ destination :: binding_destination(),
+ key :: rabbit_binding:key(),
+ args :: rabbit_framing:amqp_table()}).
-type(amqqueue() ::
#amqqueue{name :: rabbit_amqqueue:name(),