summaryrefslogtreecommitdiff
path: root/src/rabbit_exchange.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_exchange.erl')
-rw-r--r--src/rabbit_exchange.erl95
1 files changed, 51 insertions, 44 deletions
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 2a19d5b1..46564233 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -82,8 +82,9 @@
(name(), boolean())-> 'ok' |
rabbit_types:error('not_found') |
rabbit_types:error('in_use')).
--spec(maybe_auto_delete/1:: (rabbit_types:exchange()) ->
- 'not_deleted' | 'auto_deleted').
+-spec(maybe_auto_delete/1::
+ (rabbit_types:exchange())
+ -> 'not_deleted' | {'deleted', rabbit_binding:deletions()}).
-endif.
@@ -99,11 +100,11 @@ recover() ->
end, [], rabbit_durable_exchange),
Bs = rabbit_binding:recover(),
recover_with_bindings(
- lists:keysort(#binding.exchange_name, Bs),
+ lists:keysort(#binding.source, Bs),
lists:keysort(#exchange.name, Xs), []).
-recover_with_bindings([B = #binding{exchange_name = Name} | Rest],
- Xs = [#exchange{name = Name} | _],
+recover_with_bindings([B = #binding{source = XName} | Rest],
+ Xs = [#exchange{name = XName} | _],
Bindings) ->
recover_with_bindings(Rest, Xs, [B | Bindings]);
recover_with_bindings(Bs, [X = #exchange{type = Type} | Xs], Bindings) ->
@@ -225,38 +226,44 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end).
-publish(X, Delivery) ->
- publish(X, [], Delivery).
-
-publish(X = #exchange{type = Type}, Seen, Delivery) ->
- case (type_to_module(Type)):publish(X, Delivery) of
- {_, []} = R ->
- #exchange{name = XName, arguments = Args} = X,
- case rabbit_misc:r_arg(XName, exchange, Args,
- <<"alternate-exchange">>) of
- undefined ->
- R;
- AName ->
- NewSeen = [XName | Seen],
- case lists:member(AName, NewSeen) of
- true -> R;
- false -> case lookup(AName) of
- {ok, AX} ->
- publish(AX, NewSeen, Delivery);
- {error, not_found} ->
- rabbit_log:warning(
- "alternate exchange for ~s "
- "does not exist: ~s",
- [rabbit_misc:rs(XName),
- rabbit_misc:rs(AName)]),
- R
- end
- end
- end;
- R ->
- R
+publish(X = #exchange{name = XName}, Delivery) ->
+ rabbit_router:deliver(
+ route(Delivery, {queue:from_list([X]), sets:from_list([XName]), []}),
+ Delivery).
+
+route(Delivery, {WorkList, SeenXs, QNames}) ->
+ case queue:out(WorkList) of
+ {empty, _WorkList} ->
+ lists:usort(QNames);
+ {{value, X = #exchange{type = Type}}, WorkList1} ->
+ DstNames = process_alternate(
+ X, ((type_to_module(Type)):route(X, Delivery))),
+ route(Delivery,
+ lists:foldl(fun process_route/2, {WorkList1, SeenXs, QNames},
+ DstNames))
end.
+process_alternate(#exchange{name = XName, arguments = Args}, []) ->
+ case rabbit_misc:r_arg(XName, exchange, Args, <<"alternate-exchange">>) of
+ undefined -> [];
+ AName -> [AName]
+ end;
+process_alternate(_X, Results) ->
+ Results.
+
+process_route(#resource{kind = exchange} = XName,
+ {WorkList, SeenXs, QNames} = Acc) ->
+ case sets:is_element(XName, SeenXs) of
+ true -> Acc;
+ false -> {case lookup(XName) of
+ {ok, X} -> queue:in(X, WorkList);
+ {error, not_found} -> WorkList
+ end, sets:add_element(XName, SeenXs), QNames}
+ end;
+process_route(#resource{kind = queue} = QName,
+ {WorkList, SeenXs, QNames}) ->
+ {WorkList, SeenXs, [QName | QNames]}.
+
call_with_exchange(XName, Fun) ->
rabbit_misc:execute_mnesia_transaction(
fun () -> case mnesia:read({rabbit_exchange, XName}) of
@@ -271,9 +278,10 @@ delete(XName, IfUnused) ->
false -> fun unconditional_delete/1
end,
case call_with_exchange(XName, Fun) of
- {deleted, X = #exchange{type = Type}, Bs} ->
- (type_to_module(Type)):delete(X, Bs),
- ok;
+ {deleted, X, Bs, Deletions} ->
+ ok = rabbit_binding:process_deletions(
+ rabbit_binding:add_deletion(
+ XName, {X, deleted, Bs}, Deletions));
Error = {error, _InUseOrNotFound} ->
Error
end.
@@ -282,19 +290,18 @@ maybe_auto_delete(#exchange{auto_delete = false}) ->
not_deleted;
maybe_auto_delete(#exchange{auto_delete = true} = X) ->
case conditional_delete(X) of
- {error, in_use} -> not_deleted;
- {deleted, X, []} -> auto_deleted
+ {error, in_use} -> not_deleted;
+ {deleted, X, [], Deletions} -> {deleted, Deletions}
end.
conditional_delete(X = #exchange{name = XName}) ->
- case rabbit_binding:has_for_exchange(XName) of
+ case rabbit_binding:has_for_source(XName) of
false -> unconditional_delete(X);
true -> {error, in_use}
end.
unconditional_delete(X = #exchange{name = XName}) ->
- Bindings = rabbit_binding:remove_for_exchange(XName),
ok = mnesia:delete({rabbit_durable_exchange, XName}),
ok = mnesia:delete({rabbit_exchange, XName}),
- rabbit_event:notify(exchange_deleted, [{name, XName}]),
- {deleted, X, Bindings}.
+ Bindings = rabbit_binding:remove_for_source(XName),
+ {deleted, X, Bindings, rabbit_binding:remove_for_destination(XName)}.