diff options
Diffstat (limited to 'src/rabbit_exchange.erl')
-rw-r--r-- | src/rabbit_exchange.erl | 95 |
1 files changed, 51 insertions, 44 deletions
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 2a19d5b1..46564233 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -82,8 +82,9 @@ (name(), boolean())-> 'ok' | rabbit_types:error('not_found') | rabbit_types:error('in_use')). --spec(maybe_auto_delete/1:: (rabbit_types:exchange()) -> - 'not_deleted' | 'auto_deleted'). +-spec(maybe_auto_delete/1:: + (rabbit_types:exchange()) + -> 'not_deleted' | {'deleted', rabbit_binding:deletions()}). -endif. @@ -99,11 +100,11 @@ recover() -> end, [], rabbit_durable_exchange), Bs = rabbit_binding:recover(), recover_with_bindings( - lists:keysort(#binding.exchange_name, Bs), + lists:keysort(#binding.source, Bs), lists:keysort(#exchange.name, Xs), []). -recover_with_bindings([B = #binding{exchange_name = Name} | Rest], - Xs = [#exchange{name = Name} | _], +recover_with_bindings([B = #binding{source = XName} | Rest], + Xs = [#exchange{name = XName} | _], Bindings) -> recover_with_bindings(Rest, Xs, [B | Bindings]); recover_with_bindings(Bs, [X = #exchange{type = Type} | Xs], Bindings) -> @@ -225,38 +226,44 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end). info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end). -publish(X, Delivery) -> - publish(X, [], Delivery). - -publish(X = #exchange{type = Type}, Seen, Delivery) -> - case (type_to_module(Type)):publish(X, Delivery) of - {_, []} = R -> - #exchange{name = XName, arguments = Args} = X, - case rabbit_misc:r_arg(XName, exchange, Args, - <<"alternate-exchange">>) of - undefined -> - R; - AName -> - NewSeen = [XName | Seen], - case lists:member(AName, NewSeen) of - true -> R; - false -> case lookup(AName) of - {ok, AX} -> - publish(AX, NewSeen, Delivery); - {error, not_found} -> - rabbit_log:warning( - "alternate exchange for ~s " - "does not exist: ~s", - [rabbit_misc:rs(XName), - rabbit_misc:rs(AName)]), - R - end - end - end; - R -> - R +publish(X = #exchange{name = XName}, Delivery) -> + rabbit_router:deliver( + route(Delivery, {queue:from_list([X]), sets:from_list([XName]), []}), + Delivery). + +route(Delivery, {WorkList, SeenXs, QNames}) -> + case queue:out(WorkList) of + {empty, _WorkList} -> + lists:usort(QNames); + {{value, X = #exchange{type = Type}}, WorkList1} -> + DstNames = process_alternate( + X, ((type_to_module(Type)):route(X, Delivery))), + route(Delivery, + lists:foldl(fun process_route/2, {WorkList1, SeenXs, QNames}, + DstNames)) end. +process_alternate(#exchange{name = XName, arguments = Args}, []) -> + case rabbit_misc:r_arg(XName, exchange, Args, <<"alternate-exchange">>) of + undefined -> []; + AName -> [AName] + end; +process_alternate(_X, Results) -> + Results. + +process_route(#resource{kind = exchange} = XName, + {WorkList, SeenXs, QNames} = Acc) -> + case sets:is_element(XName, SeenXs) of + true -> Acc; + false -> {case lookup(XName) of + {ok, X} -> queue:in(X, WorkList); + {error, not_found} -> WorkList + end, sets:add_element(XName, SeenXs), QNames} + end; +process_route(#resource{kind = queue} = QName, + {WorkList, SeenXs, QNames}) -> + {WorkList, SeenXs, [QName | QNames]}. + call_with_exchange(XName, Fun) -> rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:read({rabbit_exchange, XName}) of @@ -271,9 +278,10 @@ delete(XName, IfUnused) -> false -> fun unconditional_delete/1 end, case call_with_exchange(XName, Fun) of - {deleted, X = #exchange{type = Type}, Bs} -> - (type_to_module(Type)):delete(X, Bs), - ok; + {deleted, X, Bs, Deletions} -> + ok = rabbit_binding:process_deletions( + rabbit_binding:add_deletion( + XName, {X, deleted, Bs}, Deletions)); Error = {error, _InUseOrNotFound} -> Error end. @@ -282,19 +290,18 @@ maybe_auto_delete(#exchange{auto_delete = false}) -> not_deleted; maybe_auto_delete(#exchange{auto_delete = true} = X) -> case conditional_delete(X) of - {error, in_use} -> not_deleted; - {deleted, X, []} -> auto_deleted + {error, in_use} -> not_deleted; + {deleted, X, [], Deletions} -> {deleted, Deletions} end. conditional_delete(X = #exchange{name = XName}) -> - case rabbit_binding:has_for_exchange(XName) of + case rabbit_binding:has_for_source(XName) of false -> unconditional_delete(X); true -> {error, in_use} end. unconditional_delete(X = #exchange{name = XName}) -> - Bindings = rabbit_binding:remove_for_exchange(XName), ok = mnesia:delete({rabbit_durable_exchange, XName}), ok = mnesia:delete({rabbit_exchange, XName}), - rabbit_event:notify(exchange_deleted, [{name, XName}]), - {deleted, X, Bindings}. + Bindings = rabbit_binding:remove_for_source(XName), + {deleted, X, Bindings, rabbit_binding:remove_for_destination(XName)}. |