From f182b6135b803e0d77887046ad5fb6bb7adddea5 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Sat, 11 Sep 2010 23:46:41 +0100 Subject: Implement exchange-to-exchange bindings --- docs/rabbitmqctl.1.xml | 8 +- include/rabbit.hrl | 4 +- include/rabbit_exchange_type_spec.hrl | 2 +- src/rabbit_amqqueue.erl | 6 +- src/rabbit_binding.erl | 124 ++++++++++++++++-------------- src/rabbit_channel.erl | 55 ++++++++++---- src/rabbit_control.erl | 2 +- src/rabbit_exchange.erl | 139 +++++++++++++++++++--------------- src/rabbit_exchange_type_direct.erl | 5 +- src/rabbit_exchange_type_fanout.erl | 4 +- src/rabbit_exchange_type_headers.erl | 9 +-- src/rabbit_exchange_type_topic.erl | 11 ++- src/rabbit_mnesia.erl | 6 +- src/rabbit_router.erl | 44 +++++------ src/rabbit_tests.erl | 4 +- src/rabbit_types.erl | 12 ++- 16 files changed, 242 insertions(+), 193 deletions(-) diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 5179eb25..607838ff 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -900,10 +900,10 @@ escaped as in C. - queue_name - The name of the queue to which the - binding is attached. with non-ASCII characters - escaped as in C. + destination + The type and name of the destination + to which the binding is attached. with non-ASCII + characters escaped as in C. routing_key diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 24aa8d98..6bf6c87e 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -60,8 +60,8 @@ -record(route, {binding, value = const}). -record(reverse_route, {reverse_binding, value = const}). --record(binding, {exchange_name, key, queue_name, args = []}). --record(reverse_binding, {queue_name, key, exchange_name, args = []}). +-record(binding, {exchange_name, key, destination, args = []}). +-record(reverse_binding, {destination, key, exchange_name, args = []}). -record(listener, {node, protocol, host, port}). diff --git a/include/rabbit_exchange_type_spec.hrl b/include/rabbit_exchange_type_spec.hrl index cecd666b..014b0736 100644 --- a/include/rabbit_exchange_type_spec.hrl +++ b/include/rabbit_exchange_type_spec.hrl @@ -32,7 +32,7 @@ -spec(description/0 :: () -> [{atom(), any()}]). -spec(publish/2 :: (rabbit_types:exchange(), rabbit_types:delivery()) - -> {rabbit_router:routing_result(), [pid()]}). + -> rabbit_router:match_result()). -spec(validate/1 :: (rabbit_types:exchange()) -> 'ok'). -spec(create/1 :: (rabbit_types:exchange()) -> 'ok'). -spec(recover/2 :: (rabbit_types:exchange(), diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 7116653c..f4256c8f 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -252,7 +252,7 @@ add_default_binding(#amqqueue{name = QueueName}) -> ExchangeName = rabbit_misc:r(QueueName, exchange, <<>>), RoutingKey = QueueName#resource.name, rabbit_binding:add(#binding{exchange_name = ExchangeName, - queue_name = QueueName, + destination = QueueName, key = RoutingKey, args = []}). @@ -434,7 +434,7 @@ internal_delete1(QueueName) -> %% we want to execute some things, as %% decided by rabbit_exchange, after the %% transaction. - rabbit_binding:remove_for_queue(QueueName). + rabbit_binding:remove_for_destination(QueueName). internal_delete(QueueName) -> case @@ -479,7 +479,7 @@ on_node_down(Node) -> ok. delete_queue(QueueName) -> - Post = rabbit_binding:remove_transient_for_queue(QueueName), + Post = rabbit_binding:remove_transient_for_destination(QueueName), ok = mnesia:delete({rabbit_queue, QueueName}), Post. diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index bb29580f..ec1816ca 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -33,11 +33,12 @@ -include("rabbit.hrl"). -export([recover/0, exists/1, add/1, remove/1, add/2, remove/2, list/1]). --export([list_for_exchange/1, list_for_queue/1, list_for_exchange_and_queue/2]). +-export([list_for_exchange/1, list_for_destination/1, + list_for_exchange_and_destination/2]). -export([info_keys/0, info/1, info/2, info_all/1, info_all/2]). %% these must all be run inside a mnesia tx -export([has_for_exchange/1, remove_for_exchange/1, - remove_for_queue/1, remove_transient_for_queue/1]). + remove_for_destination/1, remove_transient_for_destination/1]). %%---------------------------------------------------------------------------- @@ -66,9 +67,11 @@ bind_res() | rabbit_types:error('binding_not_found')). -spec(list/1 :: (rabbit_types:vhost()) -> bindings()). -spec(list_for_exchange/1 :: (rabbit_exchange:name()) -> bindings()). --spec(list_for_queue/1 :: (rabbit_amqqueue:name()) -> bindings()). --spec(list_for_exchange_and_queue/2 :: - (rabbit_exchange:name(), rabbit_amqqueue:name()) -> bindings()). +-spec(list_for_destination/1 :: + (rabbit_amqqueue:name()|rabbit_exchange:name()) -> bindings()). +-spec(list_for_exchange_and_destination/2 :: + (rabbit_exchange:name(), + rabbit_amqqueue:name() | rabbit_exchange:name()) -> bindings()). -spec(info_keys/0 :: () -> [rabbit_types:info_key()]). -spec(info/1 :: (rabbit_types:binding()) -> [rabbit_types:info()]). -spec(info/2 :: (rabbit_types:binding(), [rabbit_types:info_key()]) -> @@ -78,16 +81,16 @@ -> [[rabbit_types:info()]]). -spec(has_for_exchange/1 :: (rabbit_exchange:name()) -> boolean()). -spec(remove_for_exchange/1 :: (rabbit_exchange:name()) -> bindings()). --spec(remove_for_queue/1 :: - (rabbit_amqqueue:name()) -> fun (() -> any())). --spec(remove_transient_for_queue/1 :: - (rabbit_amqqueue:name()) -> fun (() -> any())). +-spec(remove_for_destination/1 :: + (rabbit_amqqueue:name() | rabbit_exchange:name()) -> fun (() -> any())). +-spec(remove_transient_for_destination/1 :: + (rabbit_amqqueue:name() | rabbit_exchange:name()) -> fun (() -> any())). -endif. %%---------------------------------------------------------------------------- --define(INFO_KEYS, [exchange_name, queue_name, routing_key, arguments]). +-define(INFO_KEYS, [exchange_name, destination, routing_key, arguments]). recover() -> rabbit_misc:table_fold( @@ -101,26 +104,24 @@ recover() -> exists(Binding) -> binding_action( Binding, - fun (_X, _Q, B) -> mnesia:read({rabbit_route, B}) /= [] end). + fun (_X, _D, B) -> mnesia:read({rabbit_route, B}) /= [] end). -add(Binding) -> add(Binding, fun (_X, _Q) -> ok end). +add(Binding) -> add(Binding, fun (_X, _D) -> ok end). -remove(Binding) -> remove(Binding, fun (_X, _Q) -> ok end). +remove(Binding) -> remove(Binding, fun (_X, _D) -> ok end). add(Binding, InnerFun) -> case binding_action( Binding, - fun (X, Q, B) -> + fun (X, D, B) -> %% this argument is used to check queue exclusivity; %% in general, we want to fail on that in preference to %% anything else - case InnerFun(X, Q) of + case InnerFun(X, D) of ok -> case mnesia:read({rabbit_route, B}) of - [] -> Durable = (X#exchange.durable andalso - Q#amqqueue.durable), - ok = sync_binding( - B, Durable, + [] -> ok = sync_binding( + B, are_endpoints_durable(X, D), fun mnesia:write/3), {new, X, B}; [_] -> {existing, X, B} @@ -141,16 +142,14 @@ add(Binding, InnerFun) -> remove(Binding, InnerFun) -> case binding_action( Binding, - fun (X, Q, B) -> + fun (X, D, B) -> case mnesia:match_object(rabbit_route, #route{binding = B}, write) of [] -> {error, binding_not_found}; - [_] -> case InnerFun(X, Q) of + [_] -> case InnerFun(X, D) of ok -> - Durable = (X#exchange.durable andalso - Q#amqqueue.durable), ok = sync_binding( - B, Durable, + B, are_endpoints_durable(X, D), fun mnesia:delete_object/3), Deleted = rabbit_exchange:maybe_auto_delete(X), @@ -175,7 +174,7 @@ remove(Binding, InnerFun) -> list(VHostPath) -> Route = #route{binding = #binding{ exchange_name = rabbit_misc:r(VHostPath, exchange), - queue_name = rabbit_misc:r(VHostPath, queue), + destination = rabbit_misc:r(VHostPath, '_'), _ = '_'}, _ = '_'}, [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, @@ -186,15 +185,15 @@ list_for_exchange(ExchangeName) -> [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, Route)]. -list_for_queue(QueueName) -> - Route = #route{binding = #binding{queue_name = QueueName, _ = '_'}}, +list_for_destination(DestinationName) -> + Route = #route{binding = #binding{destination = DestinationName, _ = '_'}}, [reverse_binding(B) || #reverse_route{reverse_binding = B} <- mnesia:dirty_match_object(rabbit_reverse_route, reverse_route(Route))]. -list_for_exchange_and_queue(ExchangeName, QueueName) -> +list_for_exchange_and_destination(ExchangeName, DestinationName) -> Route = #route{binding = #binding{exchange_name = ExchangeName, - queue_name = QueueName, + destination = DestinationName, _ = '_'}}, [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, Route)]. @@ -208,10 +207,10 @@ map(VHostPath, F) -> infos(Items, B) -> [{Item, i(Item, B)} || Item <- Items]. -i(exchange_name, #binding{exchange_name = XName}) -> XName; -i(queue_name, #binding{queue_name = QName}) -> QName; -i(routing_key, #binding{key = RoutingKey}) -> RoutingKey; -i(arguments, #binding{args = Arguments}) -> Arguments; +i(exchange_name, #binding{exchange_name = XName}) -> XName; +i(destination, #binding{destination = Destination}) -> Destination; +i(routing_key, #binding{key = RoutingKey}) -> RoutingKey; +i(arguments, #binding{args = Arguments}) -> Arguments; i(Item, _) -> throw({bad_argument, Item}). info(B = #binding{}) -> infos(?INFO_KEYS, B). @@ -241,22 +240,28 @@ remove_for_exchange(ExchangeName) -> _ = '_'}}, write)]. -remove_for_queue(QueueName) -> - remove_for_queue(QueueName, fun delete_forward_routes/1). +remove_for_destination(DestinationName) -> + remove_for_destination(DestinationName, fun delete_forward_routes/1). -remove_transient_for_queue(QueueName) -> - remove_for_queue(QueueName, fun delete_transient_forward_routes/1). +remove_transient_for_destination(DestinationName) -> + remove_for_destination(DestinationName, + fun delete_transient_forward_routes/1). %%---------------------------------------------------------------------------- +are_endpoints_durable(#exchange{durable = A}, #amqqueue{durable = B}) -> + A andalso B; +are_endpoints_durable(#exchange{durable = A}, #exchange{durable = B}) -> + A andalso B. + binding_action(Binding = #binding{exchange_name = ExchangeName, - queue_name = QueueName, + destination = Destination, args = Arguments}, Fun) -> - call_with_exchange_and_queue( - ExchangeName, QueueName, - fun (X, Q) -> + call_with_exchange_and_destination( + ExchangeName, Destination, + fun (X, D) -> SortedArgs = rabbit_misc:sort_field_table(Arguments), - Fun(X, Q, Binding#binding{args = SortedArgs}) + Fun(X, D, Binding#binding{args = SortedArgs}) end). sync_binding(Binding, Durable, Fun) -> @@ -270,15 +275,19 @@ sync_binding(Binding, Durable, Fun) -> ok = Fun(rabbit_reverse_route, ReverseRoute, write), ok. -call_with_exchange_and_queue(Exchange, Queue, Fun) -> +call_with_exchange_and_destination(Exchange, Destination, Fun) -> + DestTable = case Destination#resource.kind of + queue -> rabbit_queue; + exchange -> rabbit_exchange + end, rabbit_misc:execute_mnesia_transaction( fun () -> case {mnesia:read({rabbit_exchange, Exchange}), - mnesia:read({rabbit_queue, Queue})} of - {[X], [Q]} -> Fun(X, Q); - {[ ], [_]} -> {error, exchange_not_found}; - {[_], [ ]} -> {error, queue_not_found}; - {[ ], [ ]} -> {error, exchange_and_queue_not_found} - end + mnesia:read({DestTable, Destination})} of + {[X], [D]} -> Fun(X, D); + {[ ], [_]} -> {error, exchange_not_found}; + {[_], [ ]} -> {error, destination_not_found}; + {[ ], [ ]} -> {error, exchange_and_destination_not_found} + end end). %% Used with atoms from records; e.g., the type is expected to exist. @@ -293,7 +302,7 @@ continue('$end_of_table') -> false; continue({[_|_], _}) -> true; continue({[], Continuation}) -> continue(mnesia:select(Continuation)). -remove_for_queue(QueueName, FwdDeleteFun) -> +remove_for_destination(DestinationName, FwdDeleteFun) -> DeletedBindings = [begin Route = reverse_route(ReverseRoute), @@ -304,9 +313,10 @@ remove_for_queue(QueueName, FwdDeleteFun) -> end || ReverseRoute <- mnesia:match_object( rabbit_reverse_route, - reverse_route(#route{binding = #binding{ - queue_name = QueueName, - _ = '_'}}), + reverse_route(#route{ + binding = #binding{ + destination = DestinationName, + _ = '_'}}), write)], Grouped = group_bindings_and_auto_delete( lists:keysort(#binding.exchange_name, DeletedBindings), []), @@ -360,19 +370,19 @@ reverse_route(#reverse_route{reverse_binding = Binding}) -> #route{binding = reverse_binding(Binding)}. reverse_binding(#reverse_binding{exchange_name = Exchange, - queue_name = Queue, + destination = Destination, key = Key, args = Args}) -> #binding{exchange_name = Exchange, - queue_name = Queue, + destination = Destination, key = Key, args = Args}; reverse_binding(#binding{exchange_name = Exchange, - queue_name = Queue, + destination = Destination, key = Key, args = Args}) -> #reverse_binding{exchange_name = Exchange, - queue_name = Queue, + destination = Destination, key = Key, args = Args}. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 174eab40..924f1bbe 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -715,6 +715,23 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin, return_ok(State, NoWait, #'exchange.delete_ok'{}) end; +handle_method(#'exchange.bind'{destination = DestinationNameBin, + source = SourceNameBin, + routing_key = RoutingKey, + nowait = NoWait, + arguments = Arguments}, _, State) -> + binding_action(fun rabbit_binding:add/2, + SourceNameBin, exchange, DestinationNameBin, RoutingKey, + Arguments, #'exchange.bind_ok'{}, NoWait, State); + +handle_method(#'exchange.unbind'{destination = DestinationNameBin, + source = SourceNameBin, + routing_key = RoutingKey, + arguments = Arguments}, _, State) -> + binding_action(fun rabbit_binding:remove/2, + SourceNameBin, exchange, DestinationNameBin, RoutingKey, + Arguments, #'exchange.unbind_ok'{}, false, State); + handle_method(#'queue.declare'{queue = QueueNameBin, passive = false, durable = Durable, @@ -806,7 +823,7 @@ handle_method(#'queue.bind'{queue = QueueNameBin, nowait = NoWait, arguments = Arguments}, _, State) -> binding_action(fun rabbit_binding:add/2, - ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, + ExchangeNameBin, queue, QueueNameBin, RoutingKey, Arguments, #'queue.bind_ok'{}, NoWait, State); handle_method(#'queue.unbind'{queue = QueueNameBin, @@ -814,7 +831,7 @@ handle_method(#'queue.unbind'{queue = QueueNameBin, routing_key = RoutingKey, arguments = Arguments}, _, State) -> binding_action(fun rabbit_binding:remove/2, - ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, + ExchangeNameBin, queue, QueueNameBin, RoutingKey, Arguments, #'queue.unbind_ok'{}, false, State); handle_method(#'queue.purge'{queue = QueueNameBin, @@ -879,42 +896,48 @@ handle_method(_MethodRecord, _Content, _State) -> %%---------------------------------------------------------------------------- -binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, - ReturnMethod, NoWait, +binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, + RoutingKey, Arguments, ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath, reader_pid = ReaderPid}) -> %% FIXME: connection exception (!) on failure?? %% (see rule named "failure" in spec-XML) %% FIXME: don't allow binding to internal exchanges - %% including the one named "" ! - QueueName = expand_queue_name_shortcut(QueueNameBin, State), - check_write_permitted(QueueName, State), - ActualRoutingKey = expand_routing_key_shortcut(QueueNameBin, RoutingKey, - State), + DestinationName = + case DestinationType of + queue -> expand_queue_name_shortcut(DestinationNameBin, State); + exchange -> rabbit_misc:r(VHostPath, exchange, DestinationNameBin) + end, + check_write_permitted(DestinationName, State), + ActualRoutingKey = + expand_routing_key_shortcut(DestinationNameBin, RoutingKey, State), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_read_permitted(ExchangeName, State), case Fun(#binding{exchange_name = ExchangeName, - queue_name = QueueName, + destination = DestinationName, key = ActualRoutingKey, args = Arguments}, - fun (_X, Q) -> + fun (_X, Q = #amqqueue{}) -> try rabbit_amqqueue:check_exclusive_access(Q, ReaderPid) catch exit:Reason -> {error, Reason} - end + end; + (_X, #exchange{}) -> + ok end) of {error, exchange_not_found} -> rabbit_misc:not_found(ExchangeName); - {error, queue_not_found} -> - rabbit_misc:not_found(QueueName); - {error, exchange_and_queue_not_found} -> + {error, destination_not_found} -> + rabbit_misc:not_found(DestinationName); + {error, exchange_and_destination_not_found} -> rabbit_misc:protocol_error( not_found, "no ~s and no ~s", [rabbit_misc:rs(ExchangeName), - rabbit_misc:rs(QueueName)]); + rabbit_misc:rs(DestinationName)]); {error, binding_not_found} -> rabbit_misc:protocol_error( not_found, "no binding ~s between ~s and ~s", [RoutingKey, rabbit_misc:rs(ExchangeName), - rabbit_misc:rs(QueueName)]); + rabbit_misc:rs(DestinationName)]); {error, #amqp_error{} = Error} -> rabbit_misc:protocol_error(Error); ok -> return_ok(State, NoWait, ReturnMethod) diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index a3b6f369..59d3c889 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -257,7 +257,7 @@ action(list_exchanges, Node, Args, Opts, Inform) -> action(list_bindings, Node, Args, Opts, Inform) -> Inform("Listing bindings", []), VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), - ArgAtoms = default_if_empty(Args, [exchange_name, queue_name, + ArgAtoms = default_if_empty(Args, [exchange_name, destination, routing_key, arguments]), display_info_list(rpc_call(Node, rabbit_binding, info_all, [VHostArg, ArgAtoms]), diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 40bee25f..06fd819c 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -93,9 +93,9 @@ recover() -> Exs = rabbit_misc:table_fold( - fun (Exchange, Acc) -> - ok = mnesia:write(rabbit_exchange, Exchange, write), - [Exchange | Acc] + fun (X, Acc) -> + ok = mnesia:write(rabbit_exchange, X, write), + [X | Acc] end, [], rabbit_durable_exchange), Bs = rabbit_binding:recover(), recover_with_bindings( @@ -112,30 +112,30 @@ recover_with_bindings(Bs, [X = #exchange{type = Type} | Xs], Bindings) -> recover_with_bindings([], [], []) -> ok. -declare(ExchangeName, Type, Durable, AutoDelete, Args) -> - Exchange = #exchange{name = ExchangeName, - type = Type, - durable = Durable, - auto_delete = AutoDelete, - arguments = Args}, +declare(XName, Type, Durable, AutoDelete, Args) -> + X = #exchange{name = XName, + type = Type, + durable = Durable, + auto_delete = AutoDelete, + arguments = Args}, %% We want to upset things if it isn't ok; this is different from %% the other hooks invocations, where we tend to ignore the return %% value. TypeModule = type_to_module(Type), - ok = TypeModule:validate(Exchange), + ok = TypeModule:validate(X), case rabbit_misc:execute_mnesia_transaction( fun () -> - case mnesia:wread({rabbit_exchange, ExchangeName}) of + case mnesia:wread({rabbit_exchange, XName}) of [] -> - ok = mnesia:write(rabbit_exchange, Exchange, write), + ok = mnesia:write(rabbit_exchange, X, write), ok = case Durable of true -> mnesia:write(rabbit_durable_exchange, - Exchange, write); + X, write); false -> ok end, - {new, Exchange}; + {new, X}; [ExistingX] -> {existing, ExistingX} end @@ -225,52 +225,69 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end). info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end). -publish(X, Delivery) -> - publish(X, [], Delivery). - -publish(X = #exchange{type = Type}, Seen, Delivery) -> - case (type_to_module(Type)):publish(X, Delivery) of - {_, []} = R -> - #exchange{name = XName, arguments = Args} = X, - case rabbit_misc:r_arg(XName, exchange, Args, - <<"alternate-exchange">>) of - undefined -> - R; - AName -> - NewSeen = [XName | Seen], - case lists:member(AName, NewSeen) of - true -> R; - false -> case lookup(AName) of - {ok, AX} -> - publish(AX, NewSeen, Delivery); - {error, not_found} -> - rabbit_log:warning( - "alternate exchange for ~s " - "does not exist: ~s", - [rabbit_misc:rs(XName), - rabbit_misc:rs(AName)]), - R - end - end - end; - R -> - R +publish(X = #exchange{name = XName}, Delivery) -> + QueueNames = find_queues(Delivery, queue:from_list([X]), [XName], []), + QueuePids = lookup_qpids(QueueNames), + rabbit_router:deliver(QueuePids, Delivery). + +find_queues(Delivery, WorkList, SeenExchanges, QueueNames) -> + case queue:out(WorkList) of + {empty, _WorkList} -> + lists:usort(lists:flatten(QueueNames)); + {{value, X = #exchange{type = Type}}, WorkList1} -> + {NewQueueNames, NewExchangeNames} = + process_alternate( + X, ((type_to_module(Type)):publish(X, Delivery))), + {WorkList2, SeenExchanges1} = + lists:foldl( + fun (XName, {WorkListN, SeenExchangesN} = Acc) -> + case lists:member(XName, SeenExchangesN) of + true -> Acc; + false -> {case lookup(XName) of + {ok, X1} -> + queue:in(X1, WorkListN); + {error, not_found} -> + WorkListN + end, [XName | SeenExchangesN]} + end + end, {WorkList1, SeenExchanges}, NewExchangeNames), + find_queues(Delivery, WorkList2, SeenExchanges1, + [NewQueueNames | QueueNames]) end. -call_with_exchange(Exchange, Fun) -> +process_alternate(#exchange{name = XName, arguments = Args}, {[], []}) -> + case rabbit_misc:r_arg(XName, exchange, Args, <<"alternate-exchange">>) of + undefined -> + {[], []}; + AName -> + {[], [AName]} + end; +process_alternate(_X, Results) -> + Results. + +lookup_qpids(QueueNames) -> + lists:foldl( + fun (Key, Acc) -> + case mnesia:dirty_read({rabbit_queue, Key}) of + [#amqqueue{pid = QPid}] -> [QPid | Acc]; + [] -> Acc + end + end, [], QueueNames). + +call_with_exchange(XName, Fun) -> rabbit_misc:execute_mnesia_transaction( - fun () -> case mnesia:read({rabbit_exchange, Exchange}) of + fun () -> case mnesia:read({rabbit_exchange, XName}) of [] -> {error, not_found}; [X] -> Fun(X) end end). -delete(ExchangeName, IfUnused) -> +delete(XName, IfUnused) -> Fun = case IfUnused of true -> fun conditional_delete/1; false -> fun unconditional_delete/1 end, - case call_with_exchange(ExchangeName, Fun) of + case call_with_exchange(XName, Fun) of {deleted, X = #exchange{type = Type}, Bs} -> (type_to_module(Type)):delete(X, Bs), ok; @@ -280,21 +297,21 @@ delete(ExchangeName, IfUnused) -> maybe_auto_delete(#exchange{auto_delete = false}) -> not_deleted; -maybe_auto_delete(#exchange{auto_delete = true} = Exchange) -> - case conditional_delete(Exchange) of - {error, in_use} -> not_deleted; - {deleted, Exchange, []} -> auto_deleted +maybe_auto_delete(#exchange{auto_delete = true} = X) -> + case conditional_delete(X) of + {error, in_use} -> not_deleted; + {deleted, X, []} -> auto_deleted end. -conditional_delete(Exchange = #exchange{name = ExchangeName}) -> - case rabbit_binding:has_for_exchange(ExchangeName) of - false -> unconditional_delete(Exchange); +conditional_delete(X = #exchange{name = XName}) -> + case rabbit_binding:has_for_exchange(XName) of + false -> unconditional_delete(X); true -> {error, in_use} end. -unconditional_delete(Exchange = #exchange{name = ExchangeName}) -> - Bindings = rabbit_binding:remove_for_exchange(ExchangeName), - ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}), - ok = mnesia:delete({rabbit_exchange, ExchangeName}), - rabbit_event:notify(exchange_deleted, [{name, ExchangeName}]), - {deleted, Exchange, Bindings}. +unconditional_delete(X = #exchange{name = XName}) -> + Bindings = rabbit_binding:remove_for_exchange(XName), + ok = mnesia:delete({rabbit_durable_exchange, XName}), + ok = mnesia:delete({rabbit_exchange, XName}), + rabbit_event:notify(exchange_deleted, [{name, XName}]), + {deleted, X, Bindings}. diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl index 4f6eb851..bc740d4b 100644 --- a/src/rabbit_exchange_type_direct.erl +++ b/src/rabbit_exchange_type_direct.erl @@ -50,10 +50,9 @@ description() -> [{name, <<"direct">>}, {description, <<"AMQP direct exchange, as per the AMQP specification">>}]. -publish(#exchange{name = Name}, Delivery = +publish(#exchange{name = Name}, #delivery{message = #basic_message{routing_key = RoutingKey}}) -> - rabbit_router:deliver(rabbit_router:match_routing_key(Name, RoutingKey), - Delivery). + rabbit_router:match_routing_key(Name, RoutingKey). validate(_X) -> ok. create(_X) -> ok. diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl index 94798c78..4dad9cdd 100644 --- a/src/rabbit_exchange_type_fanout.erl +++ b/src/rabbit_exchange_type_fanout.erl @@ -50,8 +50,8 @@ description() -> [{name, <<"fanout">>}, {description, <<"AMQP fanout exchange, as per the AMQP specification">>}]. -publish(#exchange{name = Name}, Delivery) -> - rabbit_router:deliver(rabbit_router:match_routing_key(Name, '_'), Delivery). +publish(#exchange{name = Name}, _Delivery) -> + rabbit_router:match_routing_key(Name, '_'). validate(_X) -> ok. create(_X) -> ok. diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl index 0a59a175..7edc6f7b 100644 --- a/src/rabbit_exchange_type_headers.erl +++ b/src/rabbit_exchange_type_headers.erl @@ -57,16 +57,13 @@ description() -> {description, <<"AMQP headers exchange, as per the AMQP specification">>}]. publish(#exchange{name = Name}, - Delivery = #delivery{message = #basic_message{content = Content}}) -> + #delivery{message = #basic_message{content = Content}}) -> Headers = case (Content#content.properties)#'P_basic'.headers of undefined -> []; H -> rabbit_misc:sort_field_table(H) end, - rabbit_router:deliver(rabbit_router:match_bindings( - Name, fun (#binding{args = Spec}) -> - headers_match(Spec, Headers) - end), - Delivery). + rabbit_router:match_bindings( + Name, fun (#binding{args = Spec}) -> headers_match(Spec, Headers) end). default_headers_match_kind() -> all. diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index e796acf3..f4a9c904 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -58,13 +58,12 @@ description() -> [{name, <<"topic">>}, {description, <<"AMQP topic exchange, as per the AMQP specification">>}]. -publish(#exchange{name = Name}, Delivery = +publish(#exchange{name = Name}, #delivery{message = #basic_message{routing_key = RoutingKey}}) -> - rabbit_router:deliver(rabbit_router:match_bindings( - Name, fun (#binding{key = BindingKey}) -> - topic_matches(BindingKey, RoutingKey) - end), - Delivery). + rabbit_router:match_bindings(Name, + fun (#binding{key = BindingKey}) -> + topic_matches(BindingKey, RoutingKey) + end). split_topic_key(Key) -> string:tokens(binary_to_list(Key), "."). diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index a3214888..e9ef61a2 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -212,13 +212,15 @@ table_definitions() -> {match, #amqqueue{name = queue_name_match(), _='_'}}]}]. binding_match() -> - #binding{queue_name = queue_name_match(), + #binding{destination = binding_destination_match(), exchange_name = exchange_name_match(), _='_'}. reverse_binding_match() -> - #reverse_binding{queue_name = queue_name_match(), + #reverse_binding{destination = binding_destination_match(), exchange_name = exchange_name_match(), _='_'}. +binding_destination_match() -> + resource_match('_'). exchange_name_match() -> resource_match(exchange). queue_name_match() -> diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index bfccb0da..7e5a6b84 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -39,19 +39,19 @@ -ifdef(use_specs). --export_type([routing_key/0, routing_result/0]). +-export_type([routing_key/0, routing_result/0, match_result/0]). -type(routing_key() :: binary()). -type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered'). --type(qpids() :: [pid()]). +-type(match_result() :: {[rabbit_amqqueue:name()], [rabbit_exchange:name()]}). -spec(deliver/2 :: - (qpids(), rabbit_types:delivery()) -> {routing_result(), qpids()}). + ([pid()], rabbit_types:delivery()) -> {routing_result(), [pid()]}). -spec(match_bindings/2 :: (rabbit_exchange:name(), fun ((rabbit_types:binding()) -> boolean())) -> - qpids()). + match_result()). -spec(match_routing_key/2 :: (rabbit_exchange:name(), routing_key() | '_') -> - qpids()). + match_result()). -endif. @@ -84,29 +84,27 @@ deliver(QPids, Delivery) -> %% TODO: Maybe this should be handled by a cursor instead. %% TODO: This causes a full scan for each entry with the same exchange match_bindings(Name, Match) -> - Query = qlc:q([QName || #route{binding = Binding = #binding{ - exchange_name = ExchangeName, - queue_name = QName}} <- - mnesia:table(rabbit_route), - ExchangeName == Name, - Match(Binding)]), - lookup_qpids(mnesia:async_dirty(fun qlc:e/1, [Query])). + Query = qlc:q([Destination || + #route{binding = Binding = #binding{ + exchange_name = ExchangeName, + destination = Destination}} <- + mnesia:table(rabbit_route), + ExchangeName == Name, + Match(Binding)]), + partition_destinations(mnesia:async_dirty(fun qlc:e/1, [Query])). match_routing_key(Name, RoutingKey) -> MatchHead = #route{binding = #binding{exchange_name = Name, - queue_name = '$1', + destination = '$1', key = RoutingKey, _ = '_'}}, - lookup_qpids(mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])). - -lookup_qpids(Queues) -> - lists:foldl( - fun (Key, Acc) -> - case mnesia:dirty_read({rabbit_queue, Key}) of - [#amqqueue{pid = QPid}] -> [QPid | Acc]; - [] -> Acc - end - end, [], lists:usort(Queues)). + partition_destinations( + mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])). + +partition_destinations(Destinations) -> + lists:partition( + fun (DestinationName) -> DestinationName#resource.kind =:= queue end, + Destinations). %%-------------------------------------------------------------------- diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index a72656b7..eb08087a 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1043,9 +1043,9 @@ test_server_status() -> %% misc binding listing APIs [_|_] = rabbit_binding:list_for_exchange( rabbit_misc:r(<<"/">>, exchange, <<"">>)), - [_] = rabbit_binding:list_for_queue( + [_] = rabbit_binding:list_for_destination( rabbit_misc:r(<<"/">>, queue, <<"foo">>)), - [_] = rabbit_binding:list_for_exchange_and_queue( + [_] = rabbit_binding:list_for_exchange_and_destination( rabbit_misc:r(<<"/">>, exchange, <<"">>), rabbit_misc:r(<<"/">>, queue, <<"foo">>)), diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index 0b6a15ec..03fbe55a 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -39,9 +39,10 @@ delivery/0, content/0, decoded_content/0, undecoded_content/0, unencoded_content/0, encoded_content/0, vhost/0, ctag/0, amqp_error/0, r/1, r2/2, r3/3, listener/0, - binding/0, amqqueue/0, exchange/0, connection/0, protocol/0, - user/0, ok/1, error/1, ok_or_error/1, ok_or_error2/2, - ok_pid_or_error/0, channel_exit/0, connection_exit/0]). + binding/0, binding_destination/0, amqqueue/0, exchange/0, + connection/0, protocol/0, user/0, ok/1, error/1, ok_or_error/1, + ok_or_error2/2, ok_pid_or_error/0, channel_exit/0, + connection_exit/0]). -type(channel_exit() :: no_return()). -type(connection_exit() :: no_return()). @@ -113,9 +114,12 @@ host :: rabbit_networking:hostname(), port :: rabbit_networking:ip_port()}). +-type(binding_destination() :: + rabbit_amqqueue:name() | rabbit_exchange:name()). + -type(binding() :: #binding{exchange_name :: rabbit_exchange:name(), - queue_name :: rabbit_amqqueue:name(), + destination :: binding_destination(), key :: rabbit_binding:key(), args :: rabbit_framing:amqp_table()}). -- cgit v1.2.1 From 9ffaa55446fbe90d145dd23cf48ae640f360bf84 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Sun, 12 Sep 2010 16:49:20 +0100 Subject: exchange_name => source; consistent naming of Src{Name} and Dst{Name}, except in rabbit_exchange when we clearly are talking only about exchanges; improvements to rabbit_control so that list_bindings returns the type of the endpoints as well as the endpoint names --- docs/rabbitmqctl.1.xml | 14 ++-- include/rabbit.hrl | 4 +- src/rabbit_amqqueue.erl | 8 +- src/rabbit_binding.erl | 213 ++++++++++++++++++++++++------------------------ src/rabbit_channel.erl | 12 +-- src/rabbit_control.erl | 23 ++++-- src/rabbit_exchange.erl | 36 ++++---- src/rabbit_mnesia.erl | 6 +- src/rabbit_router.erl | 14 ++-- src/rabbit_tests.erl | 4 +- src/rabbit_types.erl | 8 +- 11 files changed, 177 insertions(+), 165 deletions(-) diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 607838ff..ab16a532 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -894,16 +894,18 @@ - exchange_name - The name of the exchange to which the - binding is attached. with non-ASCII characters - escaped as in C. + source + The name and type of the source of + messages to which the binding is attached. With + non-ASCII characters escaped as in + C. destination The type and name of the destination - to which the binding is attached. with non-ASCII - characters escaped as in C. + of messages to which the binding is attached. With + non-ASCII characters escaped as in + C. routing_key diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 6bf6c87e..bce9dfa3 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -60,8 +60,8 @@ -record(route, {binding, value = const}). -record(reverse_route, {reverse_binding, value = const}). --record(binding, {exchange_name, key, destination, args = []}). --record(reverse_binding, {destination, key, exchange_name, args = []}). +-record(binding, {source, key, destination, args = []}). +-record(reverse_binding, {destination, key, source, args = []}). -record(listener, {node, protocol, host, port}). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index f4256c8f..90804e49 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -251,10 +251,10 @@ start_queue_process(Q) -> add_default_binding(#amqqueue{name = QueueName}) -> ExchangeName = rabbit_misc:r(QueueName, exchange, <<>>), RoutingKey = QueueName#resource.name, - rabbit_binding:add(#binding{exchange_name = ExchangeName, - destination = QueueName, - key = RoutingKey, - args = []}). + rabbit_binding:add(#binding{source = ExchangeName, + destination = QueueName, + key = RoutingKey, + args = []}). lookup(Name) -> rabbit_misc:dirty_read({rabbit_queue, Name}). diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 0a93d1a4..eff93baf 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -33,8 +33,8 @@ -include("rabbit.hrl"). -export([recover/0, exists/1, add/1, remove/1, add/2, remove/2, list/1]). --export([list_for_exchange/1, list_for_destination/1, - list_for_exchange_and_destination/2]). +-export([list_for_source/1, list_for_destination/1, + list_for_source_and_destination/2]). -export([info_keys/0, info/1, info/2, info_all/1, info_all/2]). %% these must all be run inside a mnesia tx -export([has_for_exchange/1, remove_for_exchange/1, @@ -48,9 +48,9 @@ -type(key() :: binary()). --type(bind_errors() :: rabbit_types:error('queue_not_found' | - 'exchange_not_found' | - 'exchange_and_queue_not_found')). +-type(bind_errors() :: rabbit_types:error('source_not_found' | + 'destination_not_found' | + 'source_and_destination_not_found')). -type(bind_res() :: 'ok' | bind_errors()). -type(inner_fun() :: fun((rabbit_types:exchange(), queue()) -> @@ -66,10 +66,10 @@ -spec(remove/2 :: (rabbit_types:binding(), inner_fun()) -> bind_res() | rabbit_types:error('binding_not_found')). -spec(list/1 :: (rabbit_types:vhost()) -> bindings()). --spec(list_for_exchange/1 :: (rabbit_exchange:name()) -> bindings()). +-spec(list_for_source/1 :: (rabbit_exchange:name()) -> bindings()). -spec(list_for_destination/1 :: (rabbit_amqqueue:name()|rabbit_exchange:name()) -> bindings()). --spec(list_for_exchange_and_destination/2 :: +-spec(list_for_source_and_destination/2 :: (rabbit_exchange:name(), rabbit_amqqueue:name() | rabbit_exchange:name()) -> bindings()). -spec(info_keys/0 :: () -> [rabbit_types:info_key()]). @@ -90,7 +90,7 @@ %%---------------------------------------------------------------------------- --define(INFO_KEYS, [exchange_name, destination, routing_key, arguments]). +-define(INFO_KEYS, [source, destination, routing_key, arguments]). recover() -> rabbit_misc:table_fold( @@ -104,34 +104,34 @@ recover() -> exists(Binding) -> binding_action( Binding, - fun (_X, _D, B) -> mnesia:read({rabbit_route, B}) /= [] end). + fun (_Src, _Dst, B) -> mnesia:read({rabbit_route, B}) /= [] end). -add(Binding) -> add(Binding, fun (_X, _D) -> ok end). +add(Binding) -> add(Binding, fun (_Src, _Dst) -> ok end). -remove(Binding) -> remove(Binding, fun (_X, _D) -> ok end). +remove(Binding) -> remove(Binding, fun (_Src, _Dst) -> ok end). add(Binding, InnerFun) -> case binding_action( Binding, - fun (X, D, B) -> + fun (Src, Dst, B) -> %% this argument is used to check queue exclusivity; %% in general, we want to fail on that in preference to %% anything else - case InnerFun(X, D) of + case InnerFun(Src, Dst) of ok -> case mnesia:read({rabbit_route, B}) of [] -> ok = sync_binding( - B, are_endpoints_durable(X, D), + B, are_endpoints_durable(Src, Dst), fun mnesia:write/3), - {new, X, B}; - [_] -> {existing, X, B} + {new, Src, B}; + [_] -> {existing, Src, B} end; {error, _} = E -> E end end) of - {new, X = #exchange{ type = Type }, B} -> - ok = (type_to_module(Type)):add_binding(X, B), + {new, Src = #exchange{ type = Type }, B} -> + ok = (type_to_module(Type)):add_binding(Src, B), rabbit_event:notify(binding_created, info(B)); {existing, _, _} -> ok; @@ -142,30 +142,32 @@ add(Binding, InnerFun) -> remove(Binding, InnerFun) -> case binding_action( Binding, - fun (X, D, B) -> + fun (Src, Dst, B) -> case mnesia:match_object(rabbit_route, #route{binding = B}, write) of - [] -> {error, binding_not_found}; - [_] -> case InnerFun(X, D) of - ok -> - ok = sync_binding( - B, are_endpoints_durable(X, D), - fun mnesia:delete_object/3), - Deleted = - rabbit_exchange:maybe_auto_delete(X), - {{Deleted, X}, B}; - {error, _} = E -> - E - end + [] -> + {error, binding_not_found}; + [_] -> + case InnerFun(Src, Dst) of + ok -> + ok = sync_binding( + B, are_endpoints_durable(Src, Dst), + fun mnesia:delete_object/3), + Deleted = + rabbit_exchange:maybe_auto_delete(Src), + {{Deleted, Src}, B}; + {error, _} = E -> + E + end end end) of {error, _} = Err -> Err; - {{IsDeleted, X = #exchange{ type = Type }}, B} -> + {{IsDeleted, Src = #exchange{ type = Type }}, B} -> Module = type_to_module(Type), case IsDeleted of - auto_deleted -> ok = Module:delete(X, [B]); - not_deleted -> ok = Module:remove_bindings(X, [B]) + auto_deleted -> ok = Module:delete(Src, [B]); + not_deleted -> ok = Module:remove_bindings(Src, [B]) end, rabbit_event:notify(binding_deleted, info(B)), ok @@ -173,28 +175,28 @@ remove(Binding, InnerFun) -> list(VHostPath) -> Route = #route{binding = #binding{ - exchange_name = rabbit_misc:r(VHostPath, exchange), - destination = rabbit_misc:r(VHostPath, '_'), - _ = '_'}, + source = rabbit_misc:r(VHostPath, exchange), + destination = rabbit_misc:r(VHostPath, '_'), + _ = '_'}, _ = '_'}, [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, Route)]. -list_for_exchange(XName) -> - Route = #route{binding = #binding{exchange_name = XName, _ = '_'}}, +list_for_source(SrcName) -> + Route = #route{binding = #binding{source = SrcName, _ = '_'}}, [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, Route)]. -list_for_destination(DestinationName) -> - Route = #route{binding = #binding{destination = DestinationName, _ = '_'}}, +list_for_destination(DstName) -> + Route = #route{binding = #binding{destination = DstName, _ = '_'}}, [reverse_binding(B) || #reverse_route{reverse_binding = B} <- mnesia:dirty_match_object(rabbit_reverse_route, reverse_route(Route))]. -list_for_exchange_and_destination(XName, DestinationName) -> - Route = #route{binding = #binding{exchange_name = XName, - destination = DestinationName, - _ = '_'}}, +list_for_source_and_destination(SrcName, DstName) -> + Route = #route{binding = #binding{source = SrcName, + destination = DstName, + _ = '_'}}, [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, Route)]. @@ -207,10 +209,10 @@ map(VHostPath, F) -> infos(Items, B) -> [{Item, i(Item, B)} || Item <- Items]. -i(exchange_name, #binding{exchange_name = XName}) -> XName; -i(destination, #binding{destination = DestinationName}) -> DestinationName; -i(routing_key, #binding{key = RoutingKey}) -> RoutingKey; -i(arguments, #binding{args = Arguments}) -> Arguments; +i(source, #binding{source = SrcName}) -> SrcName; +i(destination, #binding{destination = DstName}) -> DstName; +i(routing_key, #binding{key = RoutingKey}) -> RoutingKey; +i(arguments, #binding{args = Arguments}) -> Arguments; i(Item, _) -> throw({bad_argument, Item}). info(B = #binding{}) -> infos(?INFO_KEYS, B). @@ -222,7 +224,7 @@ info_all(VHostPath) -> map(VHostPath, fun (B) -> info(B) end). info_all(VHostPath, Items) -> map(VHostPath, fun (B) -> info(B, Items) end). has_for_exchange(XName) -> - Match = #route{binding = #binding{exchange_name = XName, _ = '_'}}, + Match = #route{binding = #binding{source = XName, _ = '_'}}, %% we need to check for durable routes here too in case a bunch of %% routes to durable queues have been removed temporarily as a %% result of a node failure @@ -236,16 +238,15 @@ remove_for_exchange(XName) -> Route#route.binding end || Route <- mnesia:match_object( rabbit_route, - #route{binding = #binding{exchange_name = XName, - _ = '_'}}, + #route{binding = #binding{source = XName, + _ = '_'}}, write)]. -remove_for_destination(DestinationName) -> - remove_for_destination(DestinationName, fun delete_forward_routes/1). +remove_for_destination(DstName) -> + remove_for_destination(DstName, fun delete_forward_routes/1). -remove_transient_for_destination(DestinationName) -> - remove_for_destination(DestinationName, - fun delete_transient_forward_routes/1). +remove_transient_for_destination(DstName) -> + remove_for_destination(DstName, fun delete_transient_forward_routes/1). %%---------------------------------------------------------------------------- @@ -254,14 +255,14 @@ are_endpoints_durable(#exchange{durable = A}, #amqqueue{durable = B}) -> are_endpoints_durable(#exchange{durable = A}, #exchange{durable = B}) -> A andalso B. -binding_action(Binding = #binding{exchange_name = XName, - destination = DestinationName, - args = Arguments}, Fun) -> - call_with_exchange_and_destination( - XName, DestinationName, - fun (X, D) -> +binding_action(Binding = #binding{source = SrcName, + destination = DstName, + args = Arguments}, Fun) -> + call_with_source_and_destination( + SrcName, DstName, + fun (Src, Dst) -> SortedArgs = rabbit_misc:sort_field_table(Arguments), - Fun(X, D, Binding#binding{args = SortedArgs}) + Fun(Src, Dst, Binding#binding{args = SortedArgs}) end). sync_binding(Binding, Durable, Fun) -> @@ -275,18 +276,18 @@ sync_binding(Binding, Durable, Fun) -> ok = Fun(rabbit_reverse_route, ReverseRoute, write), ok. -call_with_exchange_and_destination(XName, DestinationName, Fun) -> - DestTable = case DestinationName#resource.kind of - queue -> rabbit_queue; - exchange -> rabbit_exchange - end, +call_with_source_and_destination(SrcName, DstName, Fun) -> + DstTable = case DstName#resource.kind of + queue -> rabbit_queue; + exchange -> rabbit_exchange + end, rabbit_misc:execute_mnesia_transaction( - fun () -> case {mnesia:read({rabbit_exchange, XName}), - mnesia:read({DestTable, DestinationName})} of - {[X], [D]} -> Fun(X, D); - {[ ], [_]} -> {error, exchange_not_found}; - {[_], [ ]} -> {error, destination_not_found}; - {[ ], [ ]} -> {error, exchange_and_destination_not_found} + fun () -> case {mnesia:read({rabbit_exchange, SrcName}), + mnesia:read({DstTable, DstName})} of + {[Src], [Dst]} -> Fun(Src, Dst); + {[], [_] } -> {error, source_not_found}; + {[_], [] } -> {error, destination_not_found}; + {[], [] } -> {error, source_and_destination_not_found} end end). @@ -302,7 +303,7 @@ continue('$end_of_table') -> false; continue({[_|_], _}) -> true; continue({[], Continuation}) -> continue(mnesia:select(Continuation)). -remove_for_destination(DestinationName, FwdDeleteFun) -> +remove_for_destination(DstName, FwdDeleteFun) -> DeletedBindings = [begin Route = reverse_route(ReverseRoute), @@ -315,18 +316,18 @@ remove_for_destination(DestinationName, FwdDeleteFun) -> rabbit_reverse_route, reverse_route(#route{ binding = #binding{ - destination = DestinationName, + destination = DstName, _ = '_'}}), write)], Grouped = group_bindings_and_auto_delete( - lists:keysort(#binding.exchange_name, DeletedBindings), []), + lists:keysort(#binding.source, DeletedBindings), []), fun () -> lists:foreach( - fun ({{IsDeleted, X = #exchange{ type = Type }}, Bs}) -> + fun ({{IsDeleted, Src = #exchange{ type = Type }}, Bs}) -> Module = type_to_module(Type), case IsDeleted of - auto_deleted -> Module:delete(X, Bs); - not_deleted -> Module:remove_bindings(X, Bs) + auto_deleted -> Module:delete(Src, Bs); + not_deleted -> Module:remove_bindings(Src, Bs) end end, Grouped) end. @@ -337,16 +338,16 @@ remove_for_destination(DestinationName, FwdDeleteFun) -> group_bindings_and_auto_delete([], Acc) -> Acc; group_bindings_and_auto_delete( - [B = #binding{exchange_name = XName} | Bs], Acc) -> - group_bindings_and_auto_delete(XName, Bs, [B], Acc). + [B = #binding{source = SrcName} | Bs], Acc) -> + group_bindings_and_auto_delete(SrcName, Bs, [B], Acc). group_bindings_and_auto_delete( - XName, [B = #binding{exchange_name = XName} | Bs], Bindings, Acc) -> - group_bindings_and_auto_delete(XName, Bs, [B | Bindings], Acc); -group_bindings_and_auto_delete(XName, Removed, Bindings, Acc) -> - %% either Removed is [], or its head has a non-matching XName - [X] = mnesia:read({rabbit_exchange, XName}), - NewAcc = [{{rabbit_exchange:maybe_auto_delete(X), X}, Bindings} | Acc], + SrcName, [B = #binding{source = SrcName} | Bs], Bindings, Acc) -> + group_bindings_and_auto_delete(SrcName, Bs, [B | Bindings], Acc); +group_bindings_and_auto_delete(SrcName, Removed, Bindings, Acc) -> + %% either Removed is [], or its head has a non-matching SrcName + [Src] = mnesia:read({rabbit_exchange, SrcName}), + NewAcc = [{{rabbit_exchange:maybe_auto_delete(Src), Src}, Bindings} | Acc], group_bindings_and_auto_delete(Removed, NewAcc). delete_forward_routes(Route) -> @@ -368,20 +369,20 @@ reverse_route(#route{binding = Binding}) -> reverse_route(#reverse_route{reverse_binding = Binding}) -> #route{binding = reverse_binding(Binding)}. -reverse_binding(#reverse_binding{exchange_name = XName, - destination = DestinationName, - key = Key, - args = Args}) -> - #binding{exchange_name = XName, - destination = DestinationName, - key = Key, - args = Args}; - -reverse_binding(#binding{exchange_name = XName, - destination = DestinationName, - key = Key, - args = Args}) -> - #reverse_binding{exchange_name = XName, - destination = DestinationName, - key = Key, - args = Args}. +reverse_binding(#reverse_binding{source = SrcName, + destination = DstName, + key = Key, + args = Args}) -> + #binding{source = SrcName, + destination = DstName, + key = Key, + args = Args}; + +reverse_binding(#binding{source = SrcName, + destination = DstName, + key = Key, + args = Args}) -> + #reverse_binding{source = SrcName, + destination = DstName, + key = Key, + args = Args}. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 924f1bbe..0613422c 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -914,10 +914,10 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, expand_routing_key_shortcut(DestinationNameBin, RoutingKey, State), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_read_permitted(ExchangeName, State), - case Fun(#binding{exchange_name = ExchangeName, - destination = DestinationName, - key = ActualRoutingKey, - args = Arguments}, + case Fun(#binding{source = ExchangeName, + destination = DestinationName, + key = ActualRoutingKey, + args = Arguments}, fun (_X, Q = #amqqueue{}) -> try rabbit_amqqueue:check_exclusive_access(Q, ReaderPid) catch exit:Reason -> {error, Reason} @@ -925,11 +925,11 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, (_X, #exchange{}) -> ok end) of - {error, exchange_not_found} -> + {error, source_not_found} -> rabbit_misc:not_found(ExchangeName); {error, destination_not_found} -> rabbit_misc:not_found(DestinationName); - {error, exchange_and_destination_not_found} -> + {error, source_and_destination_not_found} -> rabbit_misc:protocol_error( not_found, "no ~s and no ~s", [rabbit_misc:rs(ExchangeName), rabbit_misc:rs(DestinationName)]); diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 59d3c889..e7050ef0 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -257,11 +257,16 @@ action(list_exchanges, Node, Args, Opts, Inform) -> action(list_bindings, Node, Args, Opts, Inform) -> Inform("Listing bindings", []), VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), - ArgAtoms = default_if_empty(Args, [exchange_name, destination, + FormatFun = fun (#resource{name = Name, kind = Kind}) -> + "{" ++ format_info_item(Kind) ++ ": " ++ + format_info_item(Name) ++ "}" + end, + ArgAtoms = default_if_empty(Args, [source, destination, routing_key, arguments]), display_info_list(rpc_call(Node, rabbit_binding, info_all, [VHostArg, ArgAtoms]), - ArgAtoms); + ArgAtoms, [{source, FormatFun}, + {destination, FormatFun}]); action(list_connections, Node, Args, _Opts, Inform) -> Inform("Listing connections", []), @@ -311,14 +316,18 @@ default_if_empty(List, Default) when is_list(List) -> [list_to_atom(X) || X <- List] end. -display_info_list(Results, InfoItemKeys) when is_list(Results) -> +display_info_list(Results, InfoItemKeys) -> + display_info_list(Results, InfoItemKeys, []). + +display_info_list(Results, InfoItemKeys, FormatFuns) when is_list(Results) -> lists:foreach( - fun (Result) -> display_row( - [format_info_item(proplists:get_value(X, Result)) || - X <- InfoItemKeys]) + fun (Result) -> + display_row( + [(proplists:get_value(X, FormatFuns, fun format_info_item/1))( + proplists:get_value(X, Result)) || X <- InfoItemKeys]) end, Results), ok; -display_info_list(Other, _) -> +display_info_list(Other, _, _) -> Other. display_row(Row) -> diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 9481301a..9f85f4cc 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -99,11 +99,11 @@ recover() -> end, [], rabbit_durable_exchange), Bs = rabbit_binding:recover(), recover_with_bindings( - lists:keysort(#binding.exchange_name, Bs), + lists:keysort(#binding.source, Bs), lists:keysort(#exchange.name, Xs), []). -recover_with_bindings([B = #binding{exchange_name = Name} | Rest], - Xs = [#exchange{name = Name} | _], +recover_with_bindings([B = #binding{source = XName} | Rest], + Xs = [#exchange{name = XName} | _], Bindings) -> recover_with_bindings(Rest, Xs, [B | Bindings]); recover_with_bindings(Bs, [X = #exchange{type = Type} | Xs], Bindings) -> @@ -226,33 +226,33 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end). info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end). publish(X = #exchange{name = XName}, Delivery) -> - QueueNames = find_queues(Delivery, queue:from_list([X]), [XName], []), - QueuePids = lookup_qpids(QueueNames), - rabbit_router:deliver(QueuePids, Delivery). + QNames = find_qnames(Delivery, queue:from_list([X]), [XName], []), + QPids = lookup_qpids(QNames), + rabbit_router:deliver(QPids, Delivery). -find_queues(Delivery, WorkList, SeenExchanges, QueueNames) -> +find_qnames(Delivery, WorkList, SeenXs, QNames) -> case queue:out(WorkList) of {empty, _WorkList} -> - lists:usort(lists:flatten(QueueNames)); + lists:usort(lists:flatten(QNames)); {{value, X = #exchange{type = Type}}, WorkList1} -> - {NewQueueNames, NewExchangeNames} = + {NewQNames, NewXNames} = process_alternate( X, ((type_to_module(Type)):publish(X, Delivery))), - {WorkList2, SeenExchanges1} = + {WorkList2, SeenXs1} = lists:foldl( - fun (XName, {WorkListN, SeenExchangesN} = Acc) -> - case lists:member(XName, SeenExchangesN) of + fun (XName, {WorkListN, SeenXsN} = Acc) -> + case lists:member(XName, SeenXsN) of true -> Acc; false -> {case lookup(XName) of {ok, X1} -> queue:in(X1, WorkListN); {error, not_found} -> WorkListN - end, [XName | SeenExchangesN]} + end, [XName | SeenXsN]} end - end, {WorkList1, SeenExchanges}, NewExchangeNames), - find_queues(Delivery, WorkList2, SeenExchanges1, - [NewQueueNames | QueueNames]) + end, {WorkList1, SeenXs}, NewXNames), + find_qnames(Delivery, WorkList2, SeenXs1, + [NewQNames | QNames]) end. process_alternate(#exchange{name = XName, arguments = Args}, {[], []}) -> @@ -265,14 +265,14 @@ process_alternate(#exchange{name = XName, arguments = Args}, {[], []}) -> process_alternate(_X, Results) -> Results. -lookup_qpids(QueueNames) -> +lookup_qpids(QNames) -> lists:foldl( fun (Key, Acc) -> case mnesia:dirty_read({rabbit_queue, Key}) of [#amqqueue{pid = QPid}] -> [QPid | Acc]; [] -> Acc end - end, [], QueueNames). + end, [], QNames). call_with_exchange(XName, Fun) -> rabbit_misc:execute_mnesia_transaction( diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index e9ef61a2..33d13978 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -212,12 +212,12 @@ table_definitions() -> {match, #amqqueue{name = queue_name_match(), _='_'}}]}]. binding_match() -> - #binding{destination = binding_destination_match(), - exchange_name = exchange_name_match(), + #binding{source = exchange_name_match(), + destination = binding_destination_match(), _='_'}. reverse_binding_match() -> #reverse_binding{destination = binding_destination_match(), - exchange_name = exchange_name_match(), + source = exchange_name_match(), _='_'}. binding_destination_match() -> resource_match('_'). diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 89bafb8a..c5a1c440 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -83,21 +83,21 @@ deliver(QPids, Delivery) -> %% TODO: Maybe this should be handled by a cursor instead. %% TODO: This causes a full scan for each entry with the same exchange -match_bindings(XName, Match) -> +match_bindings(SrcName, Match) -> Query = qlc:q([DestinationName || #route{binding = Binding = #binding{ - exchange_name = XName1, + source = SrcName1, destination = DestinationName}} <- mnesia:table(rabbit_route), - XName == XName1, + SrcName == SrcName1, Match(Binding)]), partition_destinations(mnesia:async_dirty(fun qlc:e/1, [Query])). -match_routing_key(XName, RoutingKey) -> - MatchHead = #route{binding = #binding{exchange_name = XName, +match_routing_key(SrcName, RoutingKey) -> + MatchHead = #route{binding = #binding{source = SrcName, destination = '$1', - key = RoutingKey, - _ = '_'}}, + key = RoutingKey, + _ = '_'}}, partition_destinations( mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])). diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index eb08087a..ea46357e 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1041,11 +1041,11 @@ test_server_status() -> %% list bindings ok = info_action(list_bindings, rabbit_binding:info_keys(), true), %% misc binding listing APIs - [_|_] = rabbit_binding:list_for_exchange( + [_|_] = rabbit_binding:list_for_source( rabbit_misc:r(<<"/">>, exchange, <<"">>)), [_] = rabbit_binding:list_for_destination( rabbit_misc:r(<<"/">>, queue, <<"foo">>)), - [_] = rabbit_binding:list_for_exchange_and_destination( + [_] = rabbit_binding:list_for_source_and_destination( rabbit_misc:r(<<"/">>, exchange, <<"">>), rabbit_misc:r(<<"/">>, queue, <<"foo">>)), diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index 03fbe55a..603c45bd 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -118,10 +118,10 @@ rabbit_amqqueue:name() | rabbit_exchange:name()). -type(binding() :: - #binding{exchange_name :: rabbit_exchange:name(), - destination :: binding_destination(), - key :: rabbit_binding:key(), - args :: rabbit_framing:amqp_table()}). + #binding{source :: rabbit_exchange:name(), + destination :: binding_destination(), + key :: rabbit_binding:key(), + args :: rabbit_framing:amqp_table()}). -type(amqqueue() :: #amqqueue{name :: rabbit_amqqueue:name(), -- cgit v1.2.1 From c5126e4b19d2b5da1e8ad291fea29617c8b412ee Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Sun, 12 Sep 2010 17:18:38 +0100 Subject: Avoid traversing lists multiple times --- src/rabbit_exchange.erl | 29 +++++++++++++++-------------- src/rabbit_router.erl | 12 +++--------- 2 files changed, 18 insertions(+), 23 deletions(-) diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 9f85f4cc..14cd5cf4 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -233,14 +233,15 @@ publish(X = #exchange{name = XName}, Delivery) -> find_qnames(Delivery, WorkList, SeenXs, QNames) -> case queue:out(WorkList) of {empty, _WorkList} -> - lists:usort(lists:flatten(QNames)); + lists:usort(QNames); {{value, X = #exchange{type = Type}}, WorkList1} -> - {NewQNames, NewXNames} = + DstNames = process_alternate( X, ((type_to_module(Type)):publish(X, Delivery))), - {WorkList2, SeenXs1} = + {WorkList2, SeenXs1, QNames1} = lists:foldl( - fun (XName, {WorkListN, SeenXsN} = Acc) -> + fun (XName = #resource{kind = exchange}, + {WorkListN, SeenXsN, QNamesN} = Acc) -> case lists:member(XName, SeenXsN) of true -> Acc; false -> {case lookup(XName) of @@ -248,19 +249,19 @@ find_qnames(Delivery, WorkList, SeenXs, QNames) -> queue:in(X1, WorkListN); {error, not_found} -> WorkListN - end, [XName | SeenXsN]} - end - end, {WorkList1, SeenXs}, NewXNames), - find_qnames(Delivery, WorkList2, SeenXs1, - [NewQNames | QNames]) + end, [XName | SeenXsN], QNamesN} + end; + (QName = #resource{kind = queue}, + {WorkListN, SeenXsN, QNamesN})-> + {WorkListN, SeenXsN, [QName | QNamesN]} + end, {WorkList1, SeenXs, QNames}, DstNames), + find_qnames(Delivery, WorkList2, SeenXs1, QNames1) end. -process_alternate(#exchange{name = XName, arguments = Args}, {[], []}) -> +process_alternate(#exchange{name = XName, arguments = Args}, []) -> case rabbit_misc:r_arg(XName, exchange, Args, <<"alternate-exchange">>) of - undefined -> - {[], []}; - AName -> - {[], [AName]} + undefined -> []; + AName -> [AName] end; process_alternate(_X, Results) -> Results. diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index c5a1c440..0fccd61b 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -43,7 +43,7 @@ -type(routing_key() :: binary()). -type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered'). --type(match_result() :: {[rabbit_amqqueue:name()], [rabbit_exchange:name()]}). +-type(match_result() :: [rabbit_amqqueue:name() | rabbit_exchange:name()]). -spec(deliver/2 :: ([pid()], rabbit_types:delivery()) -> {routing_result(), [pid()]}). @@ -91,20 +91,14 @@ match_bindings(SrcName, Match) -> mnesia:table(rabbit_route), SrcName == SrcName1, Match(Binding)]), - partition_destinations(mnesia:async_dirty(fun qlc:e/1, [Query])). + mnesia:async_dirty(fun qlc:e/1, [Query]). match_routing_key(SrcName, RoutingKey) -> MatchHead = #route{binding = #binding{source = SrcName, destination = '$1', key = RoutingKey, _ = '_'}}, - partition_destinations( - mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])). - -partition_destinations(Destinations) -> - lists:partition( - fun (DestinationName) -> DestinationName#resource.kind =:= queue end, - Destinations). + mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}]). %%-------------------------------------------------------------------- -- cgit v1.2.1 From 05f8a9148f1b7bb701fde463cb64a3fc8c93f08b Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Sun, 12 Sep 2010 17:36:10 +0100 Subject: exchange.unbind has nowait --- src/rabbit_channel.erl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 0613422c..08b16f69 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -727,10 +727,11 @@ handle_method(#'exchange.bind'{destination = DestinationNameBin, handle_method(#'exchange.unbind'{destination = DestinationNameBin, source = SourceNameBin, routing_key = RoutingKey, + nowait = NoWait, arguments = Arguments}, _, State) -> binding_action(fun rabbit_binding:remove/2, SourceNameBin, exchange, DestinationNameBin, RoutingKey, - Arguments, #'exchange.unbind_ok'{}, false, State); + Arguments, #'exchange.unbind_ok'{}, NoWait, State); handle_method(#'queue.declare'{queue = QueueNameBin, passive = false, -- cgit v1.2.1 From 64612f412607729a96604dc347ef422302ff1442 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Sun, 12 Sep 2010 19:12:18 +0100 Subject: Do the simplest thing and have additional fields to indicate the type of the binding endpoints --- docs/rabbitmqctl.1.xml | 23 ++++++++++++++++++----- src/rabbit_binding.erl | 14 +++++++++----- src/rabbit_control.erl | 22 +++++++--------------- 3 files changed, 34 insertions(+), 25 deletions(-) diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index ab16a532..940bf6a8 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -894,16 +894,29 @@ - source - The name and type of the source of + source_name + The name of the source of messages to + which the binding is attached. With non-ASCII + characters escaped as in C. + + + source_kind + The kind of the source of messages to + which the binding is attached. Currently always + queue. With non-ASCII characters escaped as in + C. + + + destination_name + The name of the destination of messages to which the binding is attached. With non-ASCII characters escaped as in C. - destination - The type and name of the destination - of messages to which the binding is attached. With + destination_kind + The kind of the destination of + messages to which the binding is attached. With non-ASCII characters escaped as in C. diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index eff93baf..b12927e8 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -90,7 +90,9 @@ %%---------------------------------------------------------------------------- --define(INFO_KEYS, [source, destination, routing_key, arguments]). +-define(INFO_KEYS, [source_name, source_kind, + destination_name, destination_kind, + routing_key, arguments]). recover() -> rabbit_misc:table_fold( @@ -209,10 +211,12 @@ map(VHostPath, F) -> infos(Items, B) -> [{Item, i(Item, B)} || Item <- Items]. -i(source, #binding{source = SrcName}) -> SrcName; -i(destination, #binding{destination = DstName}) -> DstName; -i(routing_key, #binding{key = RoutingKey}) -> RoutingKey; -i(arguments, #binding{args = Arguments}) -> Arguments; +i(source_name, #binding{source = SrcName}) -> SrcName#resource.name; +i(source_kind, #binding{source = SrcName}) -> SrcName#resource.kind; +i(destination_name, #binding{destination = DstName}) -> DstName#resource.name; +i(destination_kind, #binding{destination = DstName}) -> DstName#resource.kind; +i(routing_key, #binding{key = RoutingKey}) -> RoutingKey; +i(arguments, #binding{args = Arguments}) -> Arguments; i(Item, _) -> throw({bad_argument, Item}). info(B = #binding{}) -> infos(?INFO_KEYS, B). diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index e7050ef0..2d62b999 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -257,16 +257,12 @@ action(list_exchanges, Node, Args, Opts, Inform) -> action(list_bindings, Node, Args, Opts, Inform) -> Inform("Listing bindings", []), VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), - FormatFun = fun (#resource{name = Name, kind = Kind}) -> - "{" ++ format_info_item(Kind) ++ ": " ++ - format_info_item(Name) ++ "}" - end, - ArgAtoms = default_if_empty(Args, [source, destination, + ArgAtoms = default_if_empty(Args, [source_name, source_kind, + destination_name, destination_kind, routing_key, arguments]), display_info_list(rpc_call(Node, rabbit_binding, info_all, [VHostArg, ArgAtoms]), - ArgAtoms, [{source, FormatFun}, - {destination, FormatFun}]); + ArgAtoms); action(list_connections, Node, Args, _Opts, Inform) -> Inform("Listing connections", []), @@ -316,18 +312,14 @@ default_if_empty(List, Default) when is_list(List) -> [list_to_atom(X) || X <- List] end. -display_info_list(Results, InfoItemKeys) -> - display_info_list(Results, InfoItemKeys, []). - -display_info_list(Results, InfoItemKeys, FormatFuns) when is_list(Results) -> +display_info_list(Results, InfoItemKeys) when is_list(Results) -> lists:foreach( fun (Result) -> - display_row( - [(proplists:get_value(X, FormatFuns, fun format_info_item/1))( - proplists:get_value(X, Result)) || X <- InfoItemKeys]) + display_row([format_info_item(proplists:get_value(X, Result)) + || X <- InfoItemKeys]) end, Results), ok; -display_info_list(Other, _, _) -> +display_info_list(Other, _) -> Other. display_row(Row) -> -- cgit v1.2.1 From 483d59916f24f3f35a6ec8663e5fc9c8248e06ee Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Mon, 27 Sep 2010 21:43:51 +0100 Subject: tweak specs --- src/rabbit_binding.erl | 12 ++++++------ src/rabbit_router.erl | 5 +++-- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index b12927e8..f7ec3b27 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -67,11 +67,11 @@ bind_res() | rabbit_types:error('binding_not_found')). -spec(list/1 :: (rabbit_types:vhost()) -> bindings()). -spec(list_for_source/1 :: (rabbit_exchange:name()) -> bindings()). --spec(list_for_destination/1 :: - (rabbit_amqqueue:name()|rabbit_exchange:name()) -> bindings()). +-spec(list_for_destination/1 :: (rabbit_types:binding_destination()) -> + bindings()). -spec(list_for_source_and_destination/2 :: - (rabbit_exchange:name(), - rabbit_amqqueue:name() | rabbit_exchange:name()) -> bindings()). + (rabbit_exchange:name(), rabbit_types:binding_destination()) -> + bindings()). -spec(info_keys/0 :: () -> [rabbit_types:info_key()]). -spec(info/1 :: (rabbit_types:binding()) -> [rabbit_types:info()]). -spec(info/2 :: (rabbit_types:binding(), [rabbit_types:info_key()]) -> @@ -82,9 +82,9 @@ -spec(has_for_exchange/1 :: (rabbit_exchange:name()) -> boolean()). -spec(remove_for_exchange/1 :: (rabbit_exchange:name()) -> bindings()). -spec(remove_for_destination/1 :: - (rabbit_amqqueue:name() | rabbit_exchange:name()) -> fun (() -> any())). + (rabbit_types:binding_destination()) -> fun (() -> any())). -spec(remove_transient_for_destination/1 :: - (rabbit_amqqueue:name() | rabbit_exchange:name()) -> fun (() -> any())). + (rabbit_types:binding_destination()) -> fun (() -> any())). -endif. diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 0fccd61b..a2fbd3ca 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -43,10 +43,11 @@ -type(routing_key() :: binary()). -type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered'). --type(match_result() :: [rabbit_amqqueue:name() | rabbit_exchange:name()]). +-type(qpids() :: [pid()]). +-type(match_result() :: [rabbit_types:binding_destination()]). -spec(deliver/2 :: - ([pid()], rabbit_types:delivery()) -> {routing_result(), [pid()]}). + (qpids(), rabbit_types:delivery()) -> {routing_result(), qpids()}). -spec(match_bindings/2 :: (rabbit_exchange:name(), fun ((rabbit_types:binding()) -> boolean())) -> match_result()). -- cgit v1.2.1 From da43ca9e9e0b3881b1ba11137b2dcb157c19a789 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Mon, 27 Sep 2010 21:53:18 +0100 Subject: minor refactor --- src/rabbit_binding.erl | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index f7ec3b27..71b9960b 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -123,7 +123,7 @@ add(Binding, InnerFun) -> ok -> case mnesia:read({rabbit_route, B}) of [] -> ok = sync_binding( - B, are_endpoints_durable(Src, Dst), + B, all_durable([Src, Dst]), fun mnesia:write/3), {new, Src, B}; [_] -> {existing, Src, B} @@ -153,7 +153,7 @@ remove(Binding, InnerFun) -> case InnerFun(Src, Dst) of ok -> ok = sync_binding( - B, are_endpoints_durable(Src, Dst), + B, all_durable([Src, Dst]), fun mnesia:delete_object/3), Deleted = rabbit_exchange:maybe_auto_delete(Src), @@ -254,10 +254,10 @@ remove_transient_for_destination(DstName) -> %%---------------------------------------------------------------------------- -are_endpoints_durable(#exchange{durable = A}, #amqqueue{durable = B}) -> - A andalso B; -are_endpoints_durable(#exchange{durable = A}, #exchange{durable = B}) -> - A andalso B. +all_durable(Resources) -> + lists:all(fun (#exchange{durable = D}) -> D; + (#amqqueue{durable = D}) -> D + end, Resources). binding_action(Binding = #binding{source = SrcName, destination = DstName, -- cgit v1.2.1 From cc86e7c2354a826aca17b961faf2696d26fedaad Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Mon, 27 Sep 2010 21:58:09 +0100 Subject: generalise binding:list in case we ever come up with other source types --- src/rabbit_binding.erl | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 71b9960b..ff3fa5ac 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -176,10 +176,10 @@ remove(Binding, InnerFun) -> end. list(VHostPath) -> - Route = #route{binding = #binding{ - source = rabbit_misc:r(VHostPath, exchange), - destination = rabbit_misc:r(VHostPath, '_'), - _ = '_'}, + VHostResource = rabbit_misc:r(VHostPath, '_'), + Route = #route{binding = #binding{source = VHostResource, + destination = VHostResource, + _ = '_'}, _ = '_'}, [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, Route)]. -- cgit v1.2.1 From aab0e35461eca14e023f822edae07a07b15b729e Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Mon, 27 Sep 2010 22:04:11 +0100 Subject: rename a few funs to make their purpose clearer --- src/rabbit_binding.erl | 14 +++++++------- src/rabbit_exchange.erl | 4 ++-- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index ff3fa5ac..555b17cc 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -37,7 +37,7 @@ list_for_source_and_destination/2]). -export([info_keys/0, info/1, info/2, info_all/1, info_all/2]). %% these must all be run inside a mnesia tx --export([has_for_exchange/1, remove_for_exchange/1, +-export([has_for_source/1, remove_for_source/1, remove_for_destination/1, remove_transient_for_destination/1]). %%---------------------------------------------------------------------------- @@ -79,8 +79,8 @@ -spec(info_all/1 :: (rabbit_types:vhost()) -> [[rabbit_types:info()]]). -spec(info_all/2 ::(rabbit_types:vhost(), [rabbit_types:info_key()]) -> [[rabbit_types:info()]]). --spec(has_for_exchange/1 :: (rabbit_exchange:name()) -> boolean()). --spec(remove_for_exchange/1 :: (rabbit_exchange:name()) -> bindings()). +-spec(has_for_source/1 :: (rabbit_exchange:name()) -> boolean()). +-spec(remove_for_source/1 :: (rabbit_exchange:name()) -> bindings()). -spec(remove_for_destination/1 :: (rabbit_types:binding_destination()) -> fun (() -> any())). -spec(remove_transient_for_destination/1 :: @@ -227,14 +227,14 @@ info_all(VHostPath) -> map(VHostPath, fun (B) -> info(B) end). info_all(VHostPath, Items) -> map(VHostPath, fun (B) -> info(B, Items) end). -has_for_exchange(XName) -> - Match = #route{binding = #binding{source = XName, _ = '_'}}, +has_for_source(SrcName) -> + Match = #route{binding = #binding{source = SrcName, _ = '_'}}, %% we need to check for durable routes here too in case a bunch of %% routes to durable queues have been removed temporarily as a %% result of a node failure contains(rabbit_route, Match) orelse contains(rabbit_durable_route, Match). -remove_for_exchange(XName) -> +remove_for_source(SrcName) -> [begin ok = mnesia:delete_object(rabbit_reverse_route, reverse_route(Route), write), @@ -242,7 +242,7 @@ remove_for_exchange(XName) -> Route#route.binding end || Route <- mnesia:match_object( rabbit_route, - #route{binding = #binding{source = XName, + #route{binding = #binding{source = SrcName, _ = '_'}}, write)]. diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 14cd5cf4..8497aa7b 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -305,13 +305,13 @@ maybe_auto_delete(#exchange{auto_delete = true} = X) -> end. conditional_delete(X = #exchange{name = XName}) -> - case rabbit_binding:has_for_exchange(XName) of + case rabbit_binding:has_for_source(XName) of false -> unconditional_delete(X); true -> {error, in_use} end. unconditional_delete(X = #exchange{name = XName}) -> - Bindings = rabbit_binding:remove_for_exchange(XName), + Bindings = rabbit_binding:remove_for_source(XName), ok = mnesia:delete({rabbit_durable_exchange, XName}), ok = mnesia:delete({rabbit_exchange, XName}), rabbit_event:notify(exchange_deleted, [{name, XName}]), -- cgit v1.2.1 From 9ae26329a9795b74a4121fe5d64557ebbf136bac Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Mon, 27 Sep 2010 22:12:35 +0100 Subject: correct a spec this was in fact wrong to start with --- src/rabbit_binding.erl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 555b17cc..78403b81 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -53,7 +53,8 @@ 'source_and_destination_not_found')). -type(bind_res() :: 'ok' | bind_errors()). -type(inner_fun() :: - fun((rabbit_types:exchange(), queue()) -> + fun((rabbit_types:exchange(), + rabbit_types:exchange() | rabbit_types:amqqueue()) -> rabbit_types:ok_or_error(rabbit_types:amqp_error()))). -type(bindings() :: [rabbit_types:binding()]). -- cgit v1.2.1 From 1c20328505cee4767526c2e218f46980fe607f77 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Mon, 27 Sep 2010 23:20:47 +0100 Subject: generalise binding code somewhat to mostly abstract away the source kind - not completely, since that would require substantial changes, but enough to make the code more pleasing to the eye --- src/rabbit_binding.erl | 24 +++++++++++++----------- src/rabbit_router.erl | 8 ++++---- src/rabbit_types.erl | 7 ++++--- 3 files changed, 21 insertions(+), 18 deletions(-) diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 78403b81..0199a354 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -67,11 +67,12 @@ -spec(remove/2 :: (rabbit_types:binding(), inner_fun()) -> bind_res() | rabbit_types:error('binding_not_found')). -spec(list/1 :: (rabbit_types:vhost()) -> bindings()). --spec(list_for_source/1 :: (rabbit_exchange:name()) -> bindings()). --spec(list_for_destination/1 :: (rabbit_types:binding_destination()) -> - bindings()). +-spec(list_for_source/1 :: + (rabbit_types:binding_source()) -> bindings()). +-spec(list_for_destination/1 :: + (rabbit_types:binding_destination()) -> bindings()). -spec(list_for_source_and_destination/2 :: - (rabbit_exchange:name(), rabbit_types:binding_destination()) -> + (rabbit_types:binding_source(), rabbit_types:binding_destination()) -> bindings()). -spec(info_keys/0 :: () -> [rabbit_types:info_key()]). -spec(info/1 :: (rabbit_types:binding()) -> [rabbit_types:info()]). @@ -80,8 +81,8 @@ -spec(info_all/1 :: (rabbit_types:vhost()) -> [[rabbit_types:info()]]). -spec(info_all/2 ::(rabbit_types:vhost(), [rabbit_types:info_key()]) -> [[rabbit_types:info()]]). --spec(has_for_source/1 :: (rabbit_exchange:name()) -> boolean()). --spec(remove_for_source/1 :: (rabbit_exchange:name()) -> bindings()). +-spec(has_for_source/1 :: (rabbit_types:binding_source()) -> boolean()). +-spec(remove_for_source/1 :: (rabbit_types:binding_source()) -> bindings()). -spec(remove_for_destination/1 :: (rabbit_types:binding_destination()) -> fun (() -> any())). -spec(remove_transient_for_destination/1 :: @@ -282,12 +283,10 @@ sync_binding(Binding, Durable, Fun) -> ok. call_with_source_and_destination(SrcName, DstName, Fun) -> - DstTable = case DstName#resource.kind of - queue -> rabbit_queue; - exchange -> rabbit_exchange - end, + SrcTable = table_for_resource(SrcName), + DstTable = table_for_resource(DstName), rabbit_misc:execute_mnesia_transaction( - fun () -> case {mnesia:read({rabbit_exchange, SrcName}), + fun () -> case {mnesia:read({SrcTable, SrcName}), mnesia:read({DstTable, DstName})} of {[Src], [Dst]} -> Fun(Src, Dst); {[], [_] } -> {error, source_not_found}; @@ -296,6 +295,9 @@ call_with_source_and_destination(SrcName, DstName, Fun) -> end end). +table_for_resource(#resource{kind = exchange}) -> rabbit_exchange; +table_for_resource(#resource{kind = queue}) -> rabbit_queue. + %% Used with atoms from records; e.g., the type is expected to exist. type_to_module(T) -> {ok, Module} = rabbit_exchange_type_registry:lookup_module(T), diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index a2fbd3ca..6f91633e 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -48,11 +48,11 @@ -spec(deliver/2 :: (qpids(), rabbit_types:delivery()) -> {routing_result(), qpids()}). --spec(match_bindings/2 :: (rabbit_exchange:name(), +-spec(match_bindings/2 :: (rabbit_types:binding_source(), fun ((rabbit_types:binding()) -> boolean())) -> match_result()). --spec(match_routing_key/2 :: (rabbit_exchange:name(), routing_key() | '_') -> - match_result()). +-spec(match_routing_key/2 :: (rabbit_types:binding_source(), + routing_key() | '_') -> match_result()). -endif. @@ -83,7 +83,7 @@ deliver(QPids, Delivery) -> {Routed, Handled}). %% TODO: Maybe this should be handled by a cursor instead. -%% TODO: This causes a full scan for each entry with the same exchange +%% TODO: This causes a full scan for each entry with the same source match_bindings(SrcName, Match) -> Query = qlc:q([DestinationName || #route{binding = Binding = #binding{ diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index 603c45bd..b971a63f 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -39,7 +39,8 @@ delivery/0, content/0, decoded_content/0, undecoded_content/0, unencoded_content/0, encoded_content/0, vhost/0, ctag/0, amqp_error/0, r/1, r2/2, r3/3, listener/0, - binding/0, binding_destination/0, amqqueue/0, exchange/0, + binding/0, binding_source/0, binding_destination/0, + amqqueue/0, exchange/0, connection/0, protocol/0, user/0, ok/1, error/1, ok_or_error/1, ok_or_error2/2, ok_pid_or_error/0, channel_exit/0, connection_exit/0]). @@ -114,8 +115,8 @@ host :: rabbit_networking:hostname(), port :: rabbit_networking:ip_port()}). --type(binding_destination() :: - rabbit_amqqueue:name() | rabbit_exchange:name()). +-type(binding_source() :: rabbit_exchange:name()). +-type(binding_destination() :: rabbit_amqqueue:name() | rabbit_exchange:name()). -type(binding() :: #binding{source :: rabbit_exchange:name(), -- cgit v1.2.1 From 68001e01905c771d4f6ebb712804e11be02a1c3a Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Thu, 30 Sep 2010 15:24:24 +0100 Subject: Exchange auto-deletion works even with e2e. Diamond property works. Correct behaviour if explicitly deleted entity is queue or exchange --- src/rabbit_binding.erl | 25 +++++++++++++------------ src/rabbit_exchange.erl | 14 ++++++++------ 2 files changed, 21 insertions(+), 18 deletions(-) diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 0199a354..a0389a52 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -167,12 +167,8 @@ remove(Binding, InnerFun) -> end) of {error, _} = Err -> Err; - {{IsDeleted, Src = #exchange{ type = Type }}, B} -> - Module = type_to_module(Type), - case IsDeleted of - auto_deleted -> ok = Module:delete(Src, [B]); - not_deleted -> ok = Module:remove_bindings(Src, [B]) - end, + {{IsDeleted, Src}, B} -> + ok = post_binding_removal(IsDeleted, Src, [B]), rabbit_event:notify(binding_deleted, info(B)), ok end. @@ -330,15 +326,20 @@ remove_for_destination(DstName, FwdDeleteFun) -> lists:keysort(#binding.source, DeletedBindings), []), fun () -> lists:foreach( - fun ({{IsDeleted, Src = #exchange{ type = Type }}, Bs}) -> - Module = type_to_module(Type), - case IsDeleted of - auto_deleted -> Module:delete(Src, Bs); - not_deleted -> Module:remove_bindings(Src, Bs) - end + fun ({{IsDeleted, Src}, Bs}) -> + ok = post_binding_removal(IsDeleted, Src, Bs) end, Grouped) end. +post_binding_removal(IsDeleted, Src = #exchange{ type = Type }, Bs) -> + Module = type_to_module(Type), + case IsDeleted of + {auto_deleted, Fun} -> ok = Module:delete(Src, Bs), + Fun(), + ok; + not_deleted -> ok = Module:remove_bindings(Src, Bs) + end. + %% Requires that its input binding list is sorted in exchange-name %% order, so that the grouping of bindings (for passing to %% group_bindings_and_auto_delete1) works properly. diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 8497aa7b..e3ec1dcc 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -82,8 +82,9 @@ (name(), boolean())-> 'ok' | rabbit_types:error('not_found') | rabbit_types:error('in_use')). --spec(maybe_auto_delete/1:: (rabbit_types:exchange()) -> - 'not_deleted' | 'auto_deleted'). +-spec(maybe_auto_delete/1:: + (rabbit_types:exchange()) + -> 'not_deleted' | {'auto_deleted', fun (() -> any())}). -endif. @@ -289,8 +290,9 @@ delete(XName, IfUnused) -> false -> fun unconditional_delete/1 end, case call_with_exchange(XName, Fun) of - {deleted, X = #exchange{type = Type}, Bs} -> + {deleted, X = #exchange{type = Type}, Bs, Fun1} -> (type_to_module(Type)):delete(X, Bs), + Fun1(), ok; Error = {error, _InUseOrNotFound} -> Error @@ -300,8 +302,8 @@ maybe_auto_delete(#exchange{auto_delete = false}) -> not_deleted; maybe_auto_delete(#exchange{auto_delete = true} = X) -> case conditional_delete(X) of - {error, in_use} -> not_deleted; - {deleted, X, []} -> auto_deleted + {error, in_use} -> not_deleted; + {deleted, X, [], Fun} -> {auto_deleted, Fun} end. conditional_delete(X = #exchange{name = XName}) -> @@ -315,4 +317,4 @@ unconditional_delete(X = #exchange{name = XName}) -> ok = mnesia:delete({rabbit_durable_exchange, XName}), ok = mnesia:delete({rabbit_exchange, XName}), rabbit_event:notify(exchange_deleted, [{name, XName}]), - {deleted, X, Bindings}. + {deleted, X, Bindings, rabbit_binding:remove_for_destination(XName)}. -- cgit v1.2.1 From 7816a942369260675a61066713f8e76bcb6d0432 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Thu, 30 Sep 2010 17:09:37 +0100 Subject: Change the SeenXs to a sets rather than a list --- src/rabbit_exchange.erl | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index e3ec1dcc..62c14179 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -227,7 +227,8 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end). info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end). publish(X = #exchange{name = XName}, Delivery) -> - QNames = find_qnames(Delivery, queue:from_list([X]), [XName], []), + QNames = find_qnames(Delivery, queue:from_list([X]), + sets:from_list([XName]), []), QPids = lookup_qpids(QNames), rabbit_router:deliver(QPids, Delivery). @@ -243,14 +244,16 @@ find_qnames(Delivery, WorkList, SeenXs, QNames) -> lists:foldl( fun (XName = #resource{kind = exchange}, {WorkListN, SeenXsN, QNamesN} = Acc) -> - case lists:member(XName, SeenXsN) of + case sets:is_element(XName, SeenXsN) of true -> Acc; false -> {case lookup(XName) of {ok, X1} -> queue:in(X1, WorkListN); {error, not_found} -> WorkListN - end, [XName | SeenXsN], QNamesN} + end, + sets:add_element(XName, SeenXsN), + QNamesN} end; (QName = #resource{kind = queue}, {WorkListN, SeenXsN, QNamesN})-> -- cgit v1.2.1 From 41046e8502fae1638f49172b3e756b3b5793d5e2 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Thu, 30 Sep 2010 17:22:47 +0100 Subject: Move lookup_qpids into router and associated changes to deliver API --- src/rabbit_exchange.erl | 12 +----------- src/rabbit_router.erl | 21 ++++++++++++++++----- 2 files changed, 17 insertions(+), 16 deletions(-) diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 62c14179..db5a46bd 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -229,8 +229,7 @@ info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end). publish(X = #exchange{name = XName}, Delivery) -> QNames = find_qnames(Delivery, queue:from_list([X]), sets:from_list([XName]), []), - QPids = lookup_qpids(QNames), - rabbit_router:deliver(QPids, Delivery). + rabbit_router:deliver(QNames, Delivery). find_qnames(Delivery, WorkList, SeenXs, QNames) -> case queue:out(WorkList) of @@ -270,15 +269,6 @@ process_alternate(#exchange{name = XName, arguments = Args}, []) -> process_alternate(_X, Results) -> Results. -lookup_qpids(QNames) -> - lists:foldl( - fun (Key, Acc) -> - case mnesia:dirty_read({rabbit_queue, Key}) of - [#amqqueue{pid = QPid}] -> [QPid | Acc]; - [] -> Acc - end - end, [], QNames). - call_with_exchange(XName, Fun) -> rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:read({rabbit_exchange, XName}) of diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 6f91633e..ebe28162 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -46,8 +46,8 @@ -type(qpids() :: [pid()]). -type(match_result() :: [rabbit_types:binding_destination()]). --spec(deliver/2 :: - (qpids(), rabbit_types:delivery()) -> {routing_result(), qpids()}). +-spec(deliver/2 :: ([rabbit_amqqueue:name()], rabbit_types:delivery()) -> + {routing_result(), qpids()}). -spec(match_bindings/2 :: (rabbit_types:binding_source(), fun ((rabbit_types:binding()) -> boolean())) -> match_result()). @@ -58,8 +58,8 @@ %%---------------------------------------------------------------------------- -deliver(QPids, Delivery = #delivery{mandatory = false, - immediate = false}) -> +deliver(QNames, Delivery = #delivery{mandatory = false, + immediate = false}) -> %% optimisation: when Mandatory = false and Immediate = false, %% rabbit_amqqueue:deliver will deliver the message to the queue %% process asynchronously, and return true, which means all the @@ -67,11 +67,13 @@ deliver(QPids, Delivery = #delivery{mandatory = false, %% fire-and-forget cast here and return the QPids - the semantics %% is preserved. This scales much better than the non-immediate %% case below. + QPids = lookup_qpids(QNames), delegate:invoke_no_result( QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end), {routed, QPids}; -deliver(QPids, Delivery) -> +deliver(QNames, Delivery) -> + QPids = lookup_qpids(QNames), {Success, _} = delegate:invoke(QPids, fun (Pid) -> @@ -82,6 +84,15 @@ deliver(QPids, Delivery) -> check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate, {Routed, Handled}). +lookup_qpids(QNames) -> + lists:foldl( + fun (Key, Acc) -> + case mnesia:dirty_read({rabbit_queue, Key}) of + [#amqqueue{pid = QPid}] -> [QPid | Acc]; + [] -> Acc + end + end, [], QNames). + %% TODO: Maybe this should be handled by a cursor instead. %% TODO: This causes a full scan for each entry with the same source match_bindings(SrcName, Match) -> -- cgit v1.2.1 From 985972a1d3fba56a9846a1674cfdbe5f7980b008 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Tue, 5 Oct 2010 14:26:38 +0100 Subject: Revert unnecessary reformatting --- src/rabbit_control.erl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index a9c798fc..58dedf68 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -314,9 +314,9 @@ default_if_empty(List, Default) when is_list(List) -> display_info_list(Results, InfoItemKeys) when is_list(Results) -> lists:foreach( - fun (Result) -> - display_row([format_info_item(proplists:get_value(X, Result)) - || X <- InfoItemKeys]) + fun (Result) -> display_row( + [format_info_item(proplists:get_value(X, Result)) || + X <- InfoItemKeys]) end, Results), ok; display_info_list(Other, _) -> -- cgit v1.2.1 From 1d75b6173d1e5db5eb0e9027283ed60d819bfa95 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Thu, 7 Oct 2010 20:46:21 +0100 Subject: refactor --- src/rabbit_exchange.erl | 49 ++++++++++++++++++++++--------------------------- 1 file changed, 22 insertions(+), 27 deletions(-) diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index db5a46bd..21b3e780 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -227,38 +227,20 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end). info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end). publish(X = #exchange{name = XName}, Delivery) -> - QNames = find_qnames(Delivery, queue:from_list([X]), - sets:from_list([XName]), []), - rabbit_router:deliver(QNames, Delivery). + rabbit_router:deliver( + route(Delivery, {queue:from_list([X]), sets:from_list([XName]), []}), + Delivery). -find_qnames(Delivery, WorkList, SeenXs, QNames) -> +route(Delivery, {WorkList, SeenXs, QNames}) -> case queue:out(WorkList) of {empty, _WorkList} -> lists:usort(QNames); {{value, X = #exchange{type = Type}}, WorkList1} -> - DstNames = - process_alternate( - X, ((type_to_module(Type)):publish(X, Delivery))), - {WorkList2, SeenXs1, QNames1} = - lists:foldl( - fun (XName = #resource{kind = exchange}, - {WorkListN, SeenXsN, QNamesN} = Acc) -> - case sets:is_element(XName, SeenXsN) of - true -> Acc; - false -> {case lookup(XName) of - {ok, X1} -> - queue:in(X1, WorkListN); - {error, not_found} -> - WorkListN - end, - sets:add_element(XName, SeenXsN), - QNamesN} - end; - (QName = #resource{kind = queue}, - {WorkListN, SeenXsN, QNamesN})-> - {WorkListN, SeenXsN, [QName | QNamesN]} - end, {WorkList1, SeenXs, QNames}, DstNames), - find_qnames(Delivery, WorkList2, SeenXs1, QNames1) + DstNames = process_alternate( + X, ((type_to_module(Type)):publish(X, Delivery))), + route(Delivery, + lists:foldl(fun process_route/2, {WorkList1, SeenXs, QNames}, + DstNames)) end. process_alternate(#exchange{name = XName, arguments = Args}, []) -> @@ -269,6 +251,19 @@ process_alternate(#exchange{name = XName, arguments = Args}, []) -> process_alternate(_X, Results) -> Results. +process_route(#resource{kind = exchange} = XName, + {WorkList, SeenXs, QNames} = Acc) -> + case sets:is_element(XName, SeenXs) of + true -> Acc; + false -> {case lookup(XName) of + {ok, X} -> queue:in(X, WorkList); + {error, not_found} -> WorkList + end, sets:add_element(XName, SeenXs), QNames} + end; +process_route(#resource{kind = queue} = QName, + {WorkList, SeenXs, QNames}) -> + {WorkList, SeenXs, [QName | QNames]}. + call_with_exchange(XName, Fun) -> rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:read({rabbit_exchange, XName}) of -- cgit v1.2.1 From 9f1bb370e9d46cf8b02bdf60f43700b4c1a80bed Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Fri, 8 Oct 2010 15:48:35 +0100 Subject: XT:publish => XT:route --- include/rabbit_exchange_type_spec.hrl | 4 ++-- src/rabbit_exchange.erl | 2 +- src/rabbit_exchange_type.erl | 2 +- src/rabbit_exchange_type_direct.erl | 6 +++--- src/rabbit_exchange_type_fanout.erl | 4 ++-- src/rabbit_exchange_type_headers.erl | 8 ++++---- src/rabbit_exchange_type_topic.erl | 4 ++-- 7 files changed, 15 insertions(+), 15 deletions(-) diff --git a/include/rabbit_exchange_type_spec.hrl b/include/rabbit_exchange_type_spec.hrl index 014b0736..ae326a87 100644 --- a/include/rabbit_exchange_type_spec.hrl +++ b/include/rabbit_exchange_type_spec.hrl @@ -31,8 +31,8 @@ -ifdef(use_specs). -spec(description/0 :: () -> [{atom(), any()}]). --spec(publish/2 :: (rabbit_types:exchange(), rabbit_types:delivery()) - -> rabbit_router:match_result()). +-spec(route/2 :: (rabbit_types:exchange(), rabbit_types:delivery()) + -> rabbit_router:match_result()). -spec(validate/1 :: (rabbit_types:exchange()) -> 'ok'). -spec(create/1 :: (rabbit_types:exchange()) -> 'ok'). -spec(recover/2 :: (rabbit_types:exchange(), diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 21b3e780..9581c229 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -237,7 +237,7 @@ route(Delivery, {WorkList, SeenXs, QNames}) -> lists:usort(QNames); {{value, X = #exchange{type = Type}}, WorkList1} -> DstNames = process_alternate( - X, ((type_to_module(Type)):publish(X, Delivery))), + X, ((type_to_module(Type)):route(X, Delivery))), route(Delivery, lists:foldl(fun process_route/2, {WorkList1, SeenXs, QNames}, DstNames)) diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl index 85760edc..742944dc 100644 --- a/src/rabbit_exchange_type.erl +++ b/src/rabbit_exchange_type.erl @@ -36,7 +36,7 @@ behaviour_info(callbacks) -> [ {description, 0}, - {publish, 2}, + {route, 2}, %% called BEFORE declaration, to check args etc; may exit with #amqp_error{} {validate, 1}, diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl index bc740d4b..d934a497 100644 --- a/src/rabbit_exchange_type_direct.erl +++ b/src/rabbit_exchange_type_direct.erl @@ -34,7 +34,7 @@ -behaviour(rabbit_exchange_type). --export([description/0, publish/2]). +-export([description/0, route/2]). -export([validate/1, create/1, recover/2, delete/2, add_binding/2, remove_bindings/2, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). @@ -50,8 +50,8 @@ description() -> [{name, <<"direct">>}, {description, <<"AMQP direct exchange, as per the AMQP specification">>}]. -publish(#exchange{name = Name}, - #delivery{message = #basic_message{routing_key = RoutingKey}}) -> +route(#exchange{name = Name}, + #delivery{message = #basic_message{routing_key = RoutingKey}}) -> rabbit_router:match_routing_key(Name, RoutingKey). validate(_X) -> ok. diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl index 4dad9cdd..77ca9686 100644 --- a/src/rabbit_exchange_type_fanout.erl +++ b/src/rabbit_exchange_type_fanout.erl @@ -34,7 +34,7 @@ -behaviour(rabbit_exchange_type). --export([description/0, publish/2]). +-export([description/0, route/2]). -export([validate/1, create/1, recover/2, delete/2, add_binding/2, remove_bindings/2, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). @@ -50,7 +50,7 @@ description() -> [{name, <<"fanout">>}, {description, <<"AMQP fanout exchange, as per the AMQP specification">>}]. -publish(#exchange{name = Name}, _Delivery) -> +route(#exchange{name = Name}, _Delivery) -> rabbit_router:match_routing_key(Name, '_'). validate(_X) -> ok. diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl index 7edc6f7b..ec9e7ba4 100644 --- a/src/rabbit_exchange_type_headers.erl +++ b/src/rabbit_exchange_type_headers.erl @@ -35,7 +35,7 @@ -behaviour(rabbit_exchange_type). --export([description/0, publish/2]). +-export([description/0, route/2]). -export([validate/1, create/1, recover/2, delete/2, add_binding/2, remove_bindings/2, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). @@ -56,8 +56,8 @@ description() -> [{name, <<"headers">>}, {description, <<"AMQP headers exchange, as per the AMQP specification">>}]. -publish(#exchange{name = Name}, - #delivery{message = #basic_message{content = Content}}) -> +route(#exchange{name = Name}, + #delivery{message = #basic_message{content = Content}}) -> Headers = case (Content#content.properties)#'P_basic'.headers of undefined -> []; H -> rabbit_misc:sort_field_table(H) @@ -76,7 +76,7 @@ parse_x_match(Other) -> %% Horrendous matching algorithm. Depends for its merge-like %% (linear-time) behaviour on the lists:keysort -%% (rabbit_misc:sort_field_table) that publish/1 and +%% (rabbit_misc:sort_field_table) that route/1 and %% rabbit_binding:{add,remove}/2 do. %% %% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index f4a9c904..d3ecdd4d 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -34,7 +34,7 @@ -behaviour(rabbit_exchange_type). --export([description/0, publish/2]). +-export([description/0, route/2]). -export([validate/1, create/1, recover/2, delete/2, add_binding/2, remove_bindings/2, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). @@ -58,7 +58,7 @@ description() -> [{name, <<"topic">>}, {description, <<"AMQP topic exchange, as per the AMQP specification">>}]. -publish(#exchange{name = Name}, +route(#exchange{name = Name}, #delivery{message = #basic_message{routing_key = RoutingKey}}) -> rabbit_router:match_bindings(Name, fun (#binding{key = BindingKey}) -> -- cgit v1.2.1 From 24c8e03cdf4e8cf1455d5d098515bd61eb2c89ba Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Fri, 8 Oct 2010 15:54:44 +0100 Subject: Gratuitous refactoring --- src/rabbit_binding.erl | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index a0389a52..61a3b7f5 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -331,14 +331,12 @@ remove_for_destination(DstName, FwdDeleteFun) -> end, Grouped) end. -post_binding_removal(IsDeleted, Src = #exchange{ type = Type }, Bs) -> - Module = type_to_module(Type), - case IsDeleted of - {auto_deleted, Fun} -> ok = Module:delete(Src, Bs), - Fun(), - ok; - not_deleted -> ok = Module:remove_bindings(Src, Bs) - end. +post_binding_removal(not_deleted, Src = #exchange{ type = Type }, Bs) -> + ok = type_to_module(Type):remove_bindings(Src, Bs); +post_binding_removal({auto_deleted, Fun}, Src = #exchange{ type = Type }, Bs) -> + ok = type_to_module(Type):delete(Src, Bs), + Fun(), + ok. %% Requires that its input binding list is sorted in exchange-name %% order, so that the grouping of bindings (for passing to -- cgit v1.2.1 From 325f0c2291e25bfdda468954ac9f77894a851788 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Fri, 8 Oct 2010 16:09:43 +0100 Subject: remove R14ism --- src/rabbit_binding.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 61a3b7f5..eabb422e 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -332,9 +332,9 @@ remove_for_destination(DstName, FwdDeleteFun) -> end. post_binding_removal(not_deleted, Src = #exchange{ type = Type }, Bs) -> - ok = type_to_module(Type):remove_bindings(Src, Bs); + ok = (type_to_module(Type)):remove_bindings(Src, Bs); post_binding_removal({auto_deleted, Fun}, Src = #exchange{ type = Type }, Bs) -> - ok = type_to_module(Type):delete(Src, Bs), + ok = (type_to_module(Type)):delete(Src, Bs), Fun(), ok. -- cgit v1.2.1 From 17e1c2b4e8966eefb9dc6195dfa64d97f495a36c Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Fri, 8 Oct 2010 18:56:36 +0100 Subject: Minimised the number of calls to the XTs on deletion --- src/rabbit_amqqueue.erl | 1 - src/rabbit_binding.erl | 114 ++++++++++++++++++++++++++++++++++++------------ src/rabbit_exchange.erl | 15 ++++--- 3 files changed, 93 insertions(+), 37 deletions(-) diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 7dfe41ba..3c767eef 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -509,4 +509,3 @@ delegate_call(Pid, Msg, Timeout) -> delegate_cast(Pid, Msg) -> delegate:invoke(Pid, fun (P) -> gen_server2:cast(P, Msg) end). - diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index eabb422e..2e9c580a 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -36,9 +36,11 @@ -export([list_for_source/1, list_for_destination/1, list_for_source_and_destination/2]). -export([info_keys/0, info/1, info/2, info_all/1, info_all/2]). +-export([post_binding_removal_fun/1]). %% these must all be run inside a mnesia tx -export([has_for_source/1, remove_for_source/1, - remove_for_destination/1, remove_transient_for_destination/1]). + remove_for_destination/1, remove_transient_for_destination/1, + remove_for_destination_inner/1]). %%---------------------------------------------------------------------------- @@ -84,9 +86,12 @@ -spec(has_for_source/1 :: (rabbit_types:binding_source()) -> boolean()). -spec(remove_for_source/1 :: (rabbit_types:binding_source()) -> bindings()). -spec(remove_for_destination/1 :: - (rabbit_types:binding_destination()) -> fun (() -> any())). + (rabbit_types:binding_destination()) -> fun (() -> 'ok')). -spec(remove_transient_for_destination/1 :: - (rabbit_types:binding_destination()) -> fun (() -> any())). + (rabbit_types:binding_destination()) -> fun (() -> 'ok')). +-spec(remove_for_destination_inner/1 :: + (rabbit_types:binding_destination()) -> dict:dictionary()). +-spec(post_binding_removal_fun/1 :: (dict:dictionary()) -> fun (() -> 'ok')). -endif. @@ -157,9 +162,9 @@ remove(Binding, InnerFun) -> ok = sync_binding( B, all_durable([Src, Dst]), fun mnesia:delete_object/3), - Deleted = - rabbit_exchange:maybe_auto_delete(Src), - {{Deleted, Src}, B}; + {ok, merge_maybe_auto_delete( + Binding#binding.source, [B], + dict:new())}; {error, _} = E -> E end @@ -167,10 +172,8 @@ remove(Binding, InnerFun) -> end) of {error, _} = Err -> Err; - {{IsDeleted, Src}, B} -> - ok = post_binding_removal(IsDeleted, Src, [B]), - rabbit_event:notify(binding_deleted, info(B)), - ok + {ok, Grouped} -> + ok = (post_binding_removal_fun(Grouped))() end. list(VHostPath) -> @@ -247,6 +250,9 @@ remove_for_source(SrcName) -> remove_for_destination(DstName) -> remove_for_destination(DstName, fun delete_forward_routes/1). +remove_for_destination_inner(DstName) -> + remove_for_destination_inner(DstName, fun delete_forward_routes/1). + remove_transient_for_destination(DstName) -> remove_for_destination(DstName, fun delete_transient_forward_routes/1). @@ -307,6 +313,10 @@ continue({[_|_], _}) -> true; continue({[], Continuation}) -> continue(mnesia:select(Continuation)). remove_for_destination(DstName, FwdDeleteFun) -> + post_binding_removal_fun( + remove_for_destination_inner(DstName, FwdDeleteFun)). + +remove_for_destination_inner(DstName, FwdDeleteFun) -> DeletedBindings = [begin Route = reverse_route(ReverseRoute), @@ -322,39 +332,85 @@ remove_for_destination(DstName, FwdDeleteFun) -> destination = DstName, _ = '_'}}), write)], - Grouped = group_bindings_and_auto_delete( - lists:keysort(#binding.source, DeletedBindings), []), - fun () -> - lists:foreach( - fun ({{IsDeleted, Src}, Bs}) -> - ok = post_binding_removal(IsDeleted, Src, Bs) - end, Grouped) + group_bindings_and_auto_delete( + lists:keysort(#binding.source, DeletedBindings), dict:new()). + +post_binding_removal_fun(Grouped) -> + fun () -> dict:fold( + fun (_SrcName, {Src, IsDeleted, Bs}, ok) -> + post_binding_removal(IsDeleted, Src, + lists:usort(lists:flatten(Bs))) + end, ok, Grouped) end. post_binding_removal(not_deleted, Src = #exchange{ type = Type }, Bs) -> ok = (type_to_module(Type)):remove_bindings(Src, Bs); -post_binding_removal({auto_deleted, Fun}, Src = #exchange{ type = Type }, Bs) -> - ok = (type_to_module(Type)):delete(Src, Bs), - Fun(), - ok. +post_binding_removal(deleted, Src = #exchange{ type = Type }, Bs) -> + ok = (type_to_module(Type)):delete(Src, Bs). %% Requires that its input binding list is sorted in exchange-name %% order, so that the grouping of bindings (for passing to %% group_bindings_and_auto_delete1) works properly. group_bindings_and_auto_delete([], Acc) -> Acc; -group_bindings_and_auto_delete( - [B = #binding{source = SrcName} | Bs], Acc) -> +group_bindings_and_auto_delete([B = #binding{source = SrcName} | Bs], Acc) -> group_bindings_and_auto_delete(SrcName, Bs, [B], Acc). -group_bindings_and_auto_delete( - SrcName, [B = #binding{source = SrcName} | Bs], Bindings, Acc) -> +group_bindings_and_auto_delete(SrcName, [B = #binding{source = SrcName} | Bs], + Bindings, Acc) -> group_bindings_and_auto_delete(SrcName, Bs, [B | Bindings], Acc); group_bindings_and_auto_delete(SrcName, Removed, Bindings, Acc) -> - %% either Removed is [], or its head has a non-matching SrcName - [Src] = mnesia:read({rabbit_exchange, SrcName}), - NewAcc = [{{rabbit_exchange:maybe_auto_delete(Src), Src}, Bindings} | Acc], - group_bindings_and_auto_delete(Removed, NewAcc). + %% Either Removed is [], or its head has a non-matching SrcName. + group_bindings_and_auto_delete( + Removed, merge_maybe_auto_delete(SrcName, Bindings, Acc)). + +%% Once a binding source is deleted, we'll never revisit it, so we +%% should never find that the existing entry is {deleted, Bindings}. +merge_maybe_auto_delete(SrcName, Bindings, Acc) -> + UpdateFun = fun (NewResult, Src) -> + dict:update( + SrcName, + fun ({Src1, Result, Bindings1}) -> + {not_undef(Src, Src1), + boolean_or(deleted, Result, NewResult), + [Bindings | Bindings1]} + end, {Src, NewResult, Bindings}, Acc) + end, + case mnesia:read({rabbit_exchange, SrcName}) of + [] -> UpdateFun(deleted, undefined); + [Src] -> case rabbit_exchange:maybe_auto_delete(Src) of + not_deleted -> + UpdateFun(not_deleted, Src); + {auto_deleted, Acc1} -> + merge_binding_dicts(UpdateFun(deleted, Src), Acc1) + end + end. + +%% Should never find that both have deleted the exchange. +merge_binding_dicts(LHS, RHS) -> + dict:merge( + fun (_SrcName, + {SrcA, IsDeletedA, BindingsL}, {SrcB, IsDeletedB, BindingsR}) -> + {not_undef(SrcA, SrcB), + boolean_or(deleted, IsDeletedA, IsDeletedB), + [BindingsL | BindingsR]} + end, LHS, RHS). + +not_undef(undefined, undefined) -> + undefined; +not_undef(undefined, N) -> + N; +not_undef(N, undefined) -> + N; +not_undef(N, N) -> + N. + +boolean_or(True, True, _Any) -> + True; +boolean_or(True, _Any, True) -> + True; +boolean_or(_True, Any, Any) -> + Any. delete_forward_routes(Route) -> ok = mnesia:delete_object(rabbit_route, Route, write), diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 9581c229..50f49d0c 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -84,7 +84,7 @@ rabbit_types:error('in_use')). -spec(maybe_auto_delete/1:: (rabbit_types:exchange()) - -> 'not_deleted' | {'auto_deleted', fun (() -> any())}). + -> 'not_deleted' | {'auto_deleted', dict:dictionary()}). -endif. @@ -278,10 +278,11 @@ delete(XName, IfUnused) -> false -> fun unconditional_delete/1 end, case call_with_exchange(XName, Fun) of - {deleted, X = #exchange{type = Type}, Bs, Fun1} -> - (type_to_module(Type)):delete(X, Bs), - Fun1(), - ok; + {deleted, X, Bs, Grouped} -> + Grouped1 = dict:update(XName, fun ({_X, _MaybeDeleted, Bs1}) -> + {X, deleted, [Bs | Bs1]} + end, {X, deleted, Bs}, Grouped), + ok = (rabbit_binding:post_binding_removal_fun(Grouped1))(); Error = {error, _InUseOrNotFound} -> Error end. @@ -291,7 +292,7 @@ maybe_auto_delete(#exchange{auto_delete = false}) -> maybe_auto_delete(#exchange{auto_delete = true} = X) -> case conditional_delete(X) of {error, in_use} -> not_deleted; - {deleted, X, [], Fun} -> {auto_deleted, Fun} + {deleted, X, [], Res} -> {auto_deleted, Res} end. conditional_delete(X = #exchange{name = XName}) -> @@ -305,4 +306,4 @@ unconditional_delete(X = #exchange{name = XName}) -> ok = mnesia:delete({rabbit_durable_exchange, XName}), ok = mnesia:delete({rabbit_exchange, XName}), rabbit_event:notify(exchange_deleted, [{name, XName}]), - {deleted, X, Bindings, rabbit_binding:remove_for_destination(XName)}. + {deleted, X, Bindings, rabbit_binding:remove_for_destination_inner(XName)}. -- cgit v1.2.1 From 688ab242b766dca9e67ab08e209e1b52355ff946 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Mon, 11 Oct 2010 15:54:06 +0100 Subject: Reworked binding / exchange autodeletion with better abstracted and cleaner API --- src/rabbit_amqqueue.erl | 28 ++++---- src/rabbit_binding.erl | 184 ++++++++++++++++++++++++------------------------ src/rabbit_exchange.erl | 20 +++--- 3 files changed, 115 insertions(+), 117 deletions(-) diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 3c767eef..d047f7ca 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -450,8 +450,7 @@ internal_delete(QueueName) -> end end) of {error, _} = Err -> Err; - PostHook -> PostHook(), - ok + Deletions -> ok = rabbit_binding:process_deletions(Deletions) end. maybe_run_queue_via_backing_queue(QPid, Fun) -> @@ -470,20 +469,23 @@ maybe_expire(QPid) -> gen_server2:cast(QPid, maybe_expire). on_node_down(Node) -> - [Hook() || - Hook <- rabbit_misc:execute_mnesia_transaction( - fun () -> - qlc:e(qlc:q([delete_queue(QueueName) || - #amqqueue{name = QueueName, pid = Pid} - <- mnesia:table(rabbit_queue), - node(Pid) == Node])) - end)], - ok. + Deletions = + lists:foldl( + fun rabbit_binding:combine_deletions/2, + rabbit_binding:new_deletions(), + rabbit_misc:execute_mnesia_transaction( + fun () -> + qlc:e(qlc:q([delete_queue(QueueName) || + #amqqueue{name = QueueName, pid = Pid} + <- mnesia:table(rabbit_queue), + node(Pid) == Node])) + end)), + ok = rabbit_binding:process_deletions(Deletions). delete_queue(QueueName) -> - Post = rabbit_binding:remove_transient_for_destination(QueueName), + Deletions = rabbit_binding:remove_transient_for_destination(QueueName), ok = mnesia:delete({rabbit_queue, QueueName}), - Post. + Deletions. pseudo_queue(QueueName, Pid) -> #amqqueue{name = QueueName, diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 2e9c580a..a40671b8 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -35,18 +35,18 @@ -export([recover/0, exists/1, add/1, remove/1, add/2, remove/2, list/1]). -export([list_for_source/1, list_for_destination/1, list_for_source_and_destination/2]). +-export([new_deletions/0, combine_deletions/2, add_deletion/3, + process_deletions/1]). -export([info_keys/0, info/1, info/2, info_all/1, info_all/2]). --export([post_binding_removal_fun/1]). %% these must all be run inside a mnesia tx -export([has_for_source/1, remove_for_source/1, - remove_for_destination/1, remove_transient_for_destination/1, - remove_for_destination_inner/1]). + remove_for_destination/1, remove_transient_for_destination/1]). %%---------------------------------------------------------------------------- -ifdef(use_specs). --export_type([key/0]). +-export_type([key/0, deletions/0]). -type(key() :: binary()). @@ -60,6 +60,8 @@ rabbit_types:ok_or_error(rabbit_types:amqp_error()))). -type(bindings() :: [rabbit_types:binding()]). +-opaque(deletions() :: dict:dictionary()). + -spec(recover/0 :: () -> [rabbit_types:binding()]). -spec(exists/1 :: (rabbit_types:binding()) -> boolean() | bind_errors()). -spec(add/1 :: (rabbit_types:binding()) -> bind_res()). @@ -86,12 +88,16 @@ -spec(has_for_source/1 :: (rabbit_types:binding_source()) -> boolean()). -spec(remove_for_source/1 :: (rabbit_types:binding_source()) -> bindings()). -spec(remove_for_destination/1 :: - (rabbit_types:binding_destination()) -> fun (() -> 'ok')). + (rabbit_types:binding_destination()) -> deletions()). -spec(remove_transient_for_destination/1 :: - (rabbit_types:binding_destination()) -> fun (() -> 'ok')). --spec(remove_for_destination_inner/1 :: - (rabbit_types:binding_destination()) -> dict:dictionary()). --spec(post_binding_removal_fun/1 :: (dict:dictionary()) -> fun (() -> 'ok')). + (rabbit_types:binding_destination()) -> deletions()). +-spec(process_deletions/1 :: (deletions()) -> 'ok'). +-spec(combine_deletions/2 :: (deletions(), deletions()) -> deletions()). +-spec(add_deletion/3 :: (rabbit_types:binding_source(), + {'undefined' | rabbit_exchange:name(), + 'deleted' | 'not_deleted', + deletions()}, deletions()) -> deletions()). +-spec(new_deletions/0 :: () -> deletions()). -endif. @@ -162,9 +168,9 @@ remove(Binding, InnerFun) -> ok = sync_binding( B, all_durable([Src, Dst]), fun mnesia:delete_object/3), - {ok, merge_maybe_auto_delete( - Binding#binding.source, [B], - dict:new())}; + {ok, + maybe_auto_delete(B#binding.source, + [B], new_deletions())}; {error, _} = E -> E end @@ -172,8 +178,8 @@ remove(Binding, InnerFun) -> end) of {error, _} = Err -> Err; - {ok, Grouped} -> - ok = (post_binding_removal_fun(Grouped))() + {ok, Deletions} -> + ok = process_deletions(Deletions) end. list(VHostPath) -> @@ -250,9 +256,6 @@ remove_for_source(SrcName) -> remove_for_destination(DstName) -> remove_for_destination(DstName, fun delete_forward_routes/1). -remove_for_destination_inner(DstName) -> - remove_for_destination_inner(DstName, fun delete_forward_routes/1). - remove_transient_for_destination(DstName) -> remove_for_destination(DstName, fun delete_transient_forward_routes/1). @@ -313,11 +316,7 @@ continue({[_|_], _}) -> true; continue({[], Continuation}) -> continue(mnesia:select(Continuation)). remove_for_destination(DstName, FwdDeleteFun) -> - post_binding_removal_fun( - remove_for_destination_inner(DstName, FwdDeleteFun)). - -remove_for_destination_inner(DstName, FwdDeleteFun) -> - DeletedBindings = + Bindings = [begin Route = reverse_route(ReverseRoute), ok = FwdDeleteFun(Route), @@ -332,86 +331,38 @@ remove_for_destination_inner(DstName, FwdDeleteFun) -> destination = DstName, _ = '_'}}), write)], - group_bindings_and_auto_delete( - lists:keysort(#binding.source, DeletedBindings), dict:new()). - -post_binding_removal_fun(Grouped) -> - fun () -> dict:fold( - fun (_SrcName, {Src, IsDeleted, Bs}, ok) -> - post_binding_removal(IsDeleted, Src, - lists:usort(lists:flatten(Bs))) - end, ok, Grouped) - end. - -post_binding_removal(not_deleted, Src = #exchange{ type = Type }, Bs) -> - ok = (type_to_module(Type)):remove_bindings(Src, Bs); -post_binding_removal(deleted, Src = #exchange{ type = Type }, Bs) -> - ok = (type_to_module(Type)):delete(Src, Bs). + group_bindings_fold(fun maybe_auto_delete/3, new_deletions(), + lists:keysort(#binding.source, Bindings)). %% Requires that its input binding list is sorted in exchange-name %% order, so that the grouping of bindings (for passing to %% group_bindings_and_auto_delete1) works properly. -group_bindings_and_auto_delete([], Acc) -> +group_bindings_fold(_Fun, Acc, []) -> Acc; -group_bindings_and_auto_delete([B = #binding{source = SrcName} | Bs], Acc) -> - group_bindings_and_auto_delete(SrcName, Bs, [B], Acc). +group_bindings_fold(Fun, Acc, [B = #binding{source = SrcName} | Bs]) -> + group_bindings_fold(Fun, SrcName, Acc, Bs, [B]). -group_bindings_and_auto_delete(SrcName, [B = #binding{source = SrcName} | Bs], - Bindings, Acc) -> - group_bindings_and_auto_delete(SrcName, Bs, [B | Bindings], Acc); -group_bindings_and_auto_delete(SrcName, Removed, Bindings, Acc) -> +group_bindings_fold( + Fun, SrcName, Acc, [B = #binding{source = SrcName} | Bs], Bindings) -> + group_bindings_fold(Fun, SrcName, Acc, Bs, [B | Bindings]); +group_bindings_fold(Fun, SrcName, Acc, Removed, Bindings) -> %% Either Removed is [], or its head has a non-matching SrcName. - group_bindings_and_auto_delete( - Removed, merge_maybe_auto_delete(SrcName, Bindings, Acc)). - -%% Once a binding source is deleted, we'll never revisit it, so we -%% should never find that the existing entry is {deleted, Bindings}. -merge_maybe_auto_delete(SrcName, Bindings, Acc) -> - UpdateFun = fun (NewResult, Src) -> - dict:update( - SrcName, - fun ({Src1, Result, Bindings1}) -> - {not_undef(Src, Src1), - boolean_or(deleted, Result, NewResult), - [Bindings | Bindings1]} - end, {Src, NewResult, Bindings}, Acc) - end, - case mnesia:read({rabbit_exchange, SrcName}) of - [] -> UpdateFun(deleted, undefined); - [Src] -> case rabbit_exchange:maybe_auto_delete(Src) of - not_deleted -> - UpdateFun(not_deleted, Src); - {auto_deleted, Acc1} -> - merge_binding_dicts(UpdateFun(deleted, Src), Acc1) - end + group_bindings_fold(Fun, Fun(SrcName, Bindings, Acc), Removed). + +maybe_auto_delete(XName, Bindings, Deletions) -> + case rabbit_exchange:lookup(XName) of + {error, not_found} -> + add_deletion(XName, {undefined, not_deleted, Bindings}, Deletions); + {ok, X} -> + Deletions1 = + add_deletion(XName, {X, not_deleted, Bindings}, Deletions), + case rabbit_exchange:maybe_auto_delete(X) of + not_deleted -> Deletions1; + {deleted, Deletions2} -> combine_deletions(Deletions1, + Deletions2) + end end. -%% Should never find that both have deleted the exchange. -merge_binding_dicts(LHS, RHS) -> - dict:merge( - fun (_SrcName, - {SrcA, IsDeletedA, BindingsL}, {SrcB, IsDeletedB, BindingsR}) -> - {not_undef(SrcA, SrcB), - boolean_or(deleted, IsDeletedA, IsDeletedB), - [BindingsL | BindingsR]} - end, LHS, RHS). - -not_undef(undefined, undefined) -> - undefined; -not_undef(undefined, N) -> - N; -not_undef(N, undefined) -> - N; -not_undef(N, N) -> - N. - -boolean_or(True, True, _Any) -> - True; -boolean_or(True, _Any, True) -> - True; -boolean_or(_True, Any, Any) -> - Any. - delete_forward_routes(Route) -> ok = mnesia:delete_object(rabbit_route, Route, write), ok = mnesia:delete_object(rabbit_durable_route, Route, write). @@ -448,3 +399,50 @@ reverse_binding(#binding{source = SrcName, destination = DstName, key = Key, args = Args}. + +%% ---------------------------------------------------------------------------- +%% Binding / exchange deletion abstraction API +%% ---------------------------------------------------------------------------- + +anything_but(NotThis, NotThis, NotThis) -> + NotThis; +anything_but(NotThis, NotThis, This) -> + This; +anything_but(NotThis, This, NotThis) -> + This; +anything_but(_NotThis, This, This) -> + This. + +boolean_or(True, True, _Any) -> + True; +boolean_or(True, _Any, True) -> + True; +boolean_or(_True, Any, Any) -> + Any. + +new_deletions() -> + dict:new(). + +add_deletion(XName, Init = {X, Deleted, Bindings}, Deletions) -> + dict:update( + XName, fun ({X1, Deleted1, Bindings1}) -> + {anything_but(undefined, X, X1), + boolean_or(deleted, Deleted, Deleted1), + [Bindings | Bindings1]} + end, Init, Deletions). + +combine_deletions(Deletions1, Deletions2) -> + dict:merge( + fun (_XName, {X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) -> + {anything_but(undefined, X1, X2), + boolean_or(deleted, Deleted1, Deleted2), + [Bindings1 | Bindings2]} + end, Deletions1, Deletions2). + +process_deletions(Deletions) -> + dict:fold( + fun (_XName, {X = #exchange{ type = Type }, not_deleted, Bindings}, ok) -> + (type_to_module(Type)):remove_bindings(X, lists:flatten(Bindings)); + (_XName, {X = #exchange{ type = Type }, deleted, Bindings}, ok) -> + (type_to_module(Type)):delete(X, lists:flatten(Bindings)) + end, ok, Deletions). diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 50f49d0c..0ddeca37 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -84,7 +84,7 @@ rabbit_types:error('in_use')). -spec(maybe_auto_delete/1:: (rabbit_types:exchange()) - -> 'not_deleted' | {'auto_deleted', dict:dictionary()}). + -> 'not_deleted' | {'deleted', rabbit_binding:dictionary()}). -endif. @@ -278,11 +278,10 @@ delete(XName, IfUnused) -> false -> fun unconditional_delete/1 end, case call_with_exchange(XName, Fun) of - {deleted, X, Bs, Grouped} -> - Grouped1 = dict:update(XName, fun ({_X, _MaybeDeleted, Bs1}) -> - {X, deleted, [Bs | Bs1]} - end, {X, deleted, Bs}, Grouped), - ok = (rabbit_binding:post_binding_removal_fun(Grouped1))(); + {deleted, X, Bs, Deletions} -> + ok = rabbit_binding:process_deletions( + rabbit_binding:add_deletion( + XName, {X, deleted, Bs}, Deletions)); Error = {error, _InUseOrNotFound} -> Error end. @@ -291,8 +290,8 @@ maybe_auto_delete(#exchange{auto_delete = false}) -> not_deleted; maybe_auto_delete(#exchange{auto_delete = true} = X) -> case conditional_delete(X) of - {error, in_use} -> not_deleted; - {deleted, X, [], Res} -> {auto_deleted, Res} + {error, in_use} -> not_deleted; + {deleted, X, [], Deletions} -> {deleted, Deletions} end. conditional_delete(X = #exchange{name = XName}) -> @@ -302,8 +301,7 @@ conditional_delete(X = #exchange{name = XName}) -> end. unconditional_delete(X = #exchange{name = XName}) -> - Bindings = rabbit_binding:remove_for_source(XName), ok = mnesia:delete({rabbit_durable_exchange, XName}), ok = mnesia:delete({rabbit_exchange, XName}), - rabbit_event:notify(exchange_deleted, [{name, XName}]), - {deleted, X, Bindings, rabbit_binding:remove_for_destination_inner(XName)}. + Bindings = rabbit_binding:remove_for_source(XName), + {deleted, X, Bindings, rabbit_binding:remove_for_destination(XName)}. -- cgit v1.2.1 From ac7722bba506629cf4ede0d8994f176fb0bc7614 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Mon, 11 Oct 2010 16:11:50 +0100 Subject: Typeo in spec, and simplify delete_queues as order of operations here doesn't matter --- src/rabbit_amqqueue.erl | 12 +++++------- src/rabbit_exchange.erl | 2 +- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index d047f7ca..572bfc01 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -474,18 +474,16 @@ on_node_down(Node) -> fun rabbit_binding:combine_deletions/2, rabbit_binding:new_deletions(), rabbit_misc:execute_mnesia_transaction( - fun () -> - qlc:e(qlc:q([delete_queue(QueueName) || - #amqqueue{name = QueueName, pid = Pid} - <- mnesia:table(rabbit_queue), - node(Pid) == Node])) + fun () -> qlc:e(qlc:q([delete_queue(QueueName) || + #amqqueue{name = QueueName, pid = Pid} + <- mnesia:table(rabbit_queue), + node(Pid) == Node])) end)), ok = rabbit_binding:process_deletions(Deletions). delete_queue(QueueName) -> - Deletions = rabbit_binding:remove_transient_for_destination(QueueName), ok = mnesia:delete({rabbit_queue, QueueName}), - Deletions. + rabbit_binding:remove_transient_for_destination(QueueName). pseudo_queue(QueueName, Pid) -> #amqqueue{name = QueueName, diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 0ddeca37..46564233 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -84,7 +84,7 @@ rabbit_types:error('in_use')). -spec(maybe_auto_delete/1:: (rabbit_types:exchange()) - -> 'not_deleted' | {'deleted', rabbit_binding:dictionary()}). + -> 'not_deleted' | {'deleted', rabbit_binding:deletions()}). -endif. -- cgit v1.2.1 From 1c272ccff6319fff22328649a1f3c5b93bd79130 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Mon, 11 Oct 2010 16:17:22 +0100 Subject: Make dialyzer marginally happier --- src/rabbit_binding.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index a40671b8..59c59072 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -93,8 +93,8 @@ (rabbit_types:binding_destination()) -> deletions()). -spec(process_deletions/1 :: (deletions()) -> 'ok'). -spec(combine_deletions/2 :: (deletions(), deletions()) -> deletions()). --spec(add_deletion/3 :: (rabbit_types:binding_source(), - {'undefined' | rabbit_exchange:name(), +-spec(add_deletion/3 :: (rabbit_exchange:name(), + {'undefined' | rabbit_types:binding_source(), 'deleted' | 'not_deleted', deletions()}, deletions()) -> deletions()). -spec(new_deletions/0 :: () -> deletions()). -- cgit v1.2.1 From bd43b942e85d73431f112119ecdb2090556bcb86 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Mon, 11 Oct 2010 22:01:00 +0100 Subject: cosmetic --- src/rabbit_router.erl | 1 + 1 file changed, 1 insertion(+) diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index f185bad4..f337eda3 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -102,6 +102,7 @@ match_routing_key(SrcName, RoutingKey) -> key = RoutingKey, _ = '_'}}, mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}]). + %%-------------------------------------------------------------------- fold_deliveries({Pid, true},{_, Handled}) -> {true, [Pid|Handled]}; -- cgit v1.2.1 From 786a473fb99e3a2c687fe1cbde0e1978f0687214 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Mon, 11 Oct 2010 22:47:25 +0100 Subject: refactoring --- src/rabbit_amqqueue.erl | 21 +++++++------- src/rabbit_binding.erl | 76 +++++++++++++++++++++---------------------------- 2 files changed, 43 insertions(+), 54 deletions(-) diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 572bfc01..6dcd04d5 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -469,17 +469,16 @@ maybe_expire(QPid) -> gen_server2:cast(QPid, maybe_expire). on_node_down(Node) -> - Deletions = - lists:foldl( - fun rabbit_binding:combine_deletions/2, - rabbit_binding:new_deletions(), - rabbit_misc:execute_mnesia_transaction( - fun () -> qlc:e(qlc:q([delete_queue(QueueName) || - #amqqueue{name = QueueName, pid = Pid} - <- mnesia:table(rabbit_queue), - node(Pid) == Node])) - end)), - ok = rabbit_binding:process_deletions(Deletions). + rabbit_binding:process_deletions( + lists:foldl( + fun rabbit_binding:combine_deletions/2, + rabbit_binding:new_deletions(), + rabbit_misc:execute_mnesia_transaction( + fun () -> qlc:e(qlc:q([delete_queue(QueueName) || + #amqqueue{name = QueueName, pid = Pid} + <- mnesia:table(rabbit_queue), + node(Pid) == Node])) + end))). delete_queue(QueueName) -> ok = mnesia:delete({rabbit_queue, QueueName}), diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 59c59072..8b041fde 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -354,13 +354,12 @@ maybe_auto_delete(XName, Bindings, Deletions) -> {error, not_found} -> add_deletion(XName, {undefined, not_deleted, Bindings}, Deletions); {ok, X} -> - Deletions1 = - add_deletion(XName, {X, not_deleted, Bindings}, Deletions), - case rabbit_exchange:maybe_auto_delete(X) of - not_deleted -> Deletions1; - {deleted, Deletions2} -> combine_deletions(Deletions1, - Deletions2) - end + add_deletion(XName, {X, not_deleted, Bindings}, + case rabbit_exchange:maybe_auto_delete(X) of + not_deleted -> Deletions; + {deleted, Deletions1} -> combine_deletions( + Deletions, Deletions1) + end) end. delete_forward_routes(Route) -> @@ -404,45 +403,36 @@ reverse_binding(#binding{source = SrcName, %% Binding / exchange deletion abstraction API %% ---------------------------------------------------------------------------- -anything_but(NotThis, NotThis, NotThis) -> - NotThis; -anything_but(NotThis, NotThis, This) -> - This; -anything_but(NotThis, This, NotThis) -> - This; -anything_but(_NotThis, This, This) -> - This. - -boolean_or(True, True, _Any) -> - True; -boolean_or(True, _Any, True) -> - True; -boolean_or(_True, Any, Any) -> - Any. - -new_deletions() -> - dict:new(). - -add_deletion(XName, Init = {X, Deleted, Bindings}, Deletions) -> - dict:update( - XName, fun ({X1, Deleted1, Bindings1}) -> - {anything_but(undefined, X, X1), - boolean_or(deleted, Deleted, Deleted1), - [Bindings | Bindings1]} - end, Init, Deletions). +anything_but( NotThis, NotThis, NotThis) -> NotThis; +anything_but( NotThis, NotThis, This) -> This; +anything_but( NotThis, This, NotThis) -> This; +anything_but(_NotThis, This, This) -> This. + +boolean_or( True, True, _Any) -> True; +boolean_or( True, _Any, True) -> True; +boolean_or(_True, Any, Any) -> Any. + +new_deletions() -> dict:new(). + +add_deletion(XName, Entry, Deletions) -> + dict:update(XName, fun (Entry1) -> combine_entries(Entry1, Entry) end, + Entry, Deletions). combine_deletions(Deletions1, Deletions2) -> - dict:merge( - fun (_XName, {X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) -> - {anything_but(undefined, X1, X2), - boolean_or(deleted, Deleted1, Deleted2), - [Bindings1 | Bindings2]} - end, Deletions1, Deletions2). + dict:merge(fun (_XName, E1, E2) -> combine_entries(E1, E2) end, + Deletions1, Deletions2). + +combine_entries({X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) -> + {anything_but(undefined, X1, X2), boolean_or(deleted, Deleted1, Deleted2), + [Bindings1 | Bindings2]}. process_deletions(Deletions) -> dict:fold( - fun (_XName, {X = #exchange{ type = Type }, not_deleted, Bindings}, ok) -> - (type_to_module(Type)):remove_bindings(X, lists:flatten(Bindings)); - (_XName, {X = #exchange{ type = Type }, deleted, Bindings}, ok) -> - (type_to_module(Type)):delete(X, lists:flatten(Bindings)) + fun (_XName, {X = #exchange{ type = Type }, Deleted, Bindings}, ok) -> + TypeModule = type_to_module(Type), + FlatBindings = lists:flatten(Bindings), + case Deleted of + not_deleted -> TypeModule:remove_bindings(X, FlatBindings); + deleted -> TypeModule:delete(X, FlatBindings) + end end, ok, Deletions). -- cgit v1.2.1 From fc49c47a99b53c8ca27382ec6cff515b753433ec Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Mon, 11 Oct 2010 23:00:54 +0100 Subject: rename --- src/rabbit_binding.erl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 8b041fde..517434c0 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -415,14 +415,14 @@ boolean_or(_True, Any, Any) -> Any. new_deletions() -> dict:new(). add_deletion(XName, Entry, Deletions) -> - dict:update(XName, fun (Entry1) -> combine_entries(Entry1, Entry) end, + dict:update(XName, fun (Entry1) -> merge_entry(Entry1, Entry) end, Entry, Deletions). combine_deletions(Deletions1, Deletions2) -> - dict:merge(fun (_XName, E1, E2) -> combine_entries(E1, E2) end, + dict:merge(fun (_XName, Entry1, Entry2) -> merge_entry(Entry1, Entry2) end, Deletions1, Deletions2). -combine_entries({X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) -> +merge_entry({X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) -> {anything_but(undefined, X1, X2), boolean_or(deleted, Deleted1, Deleted2), [Bindings1 | Bindings2]}. -- cgit v1.2.1 From 088c1f341fd3c5fb4f8bbcff8ecf0fd7bf86b1be Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Mon, 11 Oct 2010 23:42:17 +0100 Subject: boolean_or(True, X, Y) = anything_but(False, X, Y) --- src/rabbit_binding.erl | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 517434c0..53c9c663 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -408,10 +408,6 @@ anything_but( NotThis, NotThis, This) -> This; anything_but( NotThis, This, NotThis) -> This; anything_but(_NotThis, This, This) -> This. -boolean_or( True, True, _Any) -> True; -boolean_or( True, _Any, True) -> True; -boolean_or(_True, Any, Any) -> Any. - new_deletions() -> dict:new(). add_deletion(XName, Entry, Deletions) -> @@ -423,7 +419,8 @@ combine_deletions(Deletions1, Deletions2) -> Deletions1, Deletions2). merge_entry({X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) -> - {anything_but(undefined, X1, X2), boolean_or(deleted, Deleted1, Deleted2), + {anything_but(undefined, X1, X2), + anything_but(not_deleted, Deleted1, Deleted2), [Bindings1 | Bindings2]}. process_deletions(Deletions) -> -- cgit v1.2.1