summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBen Hood <0x6e6562@gmail.com>2008-10-05 18:36:17 +0100
committerBen Hood <0x6e6562@gmail.com>2008-10-05 18:36:17 +0100
commit87262ced6866eeb2c62a26949824140861d73f71 (patch)
treeeea6f46a112ad5344f6b3c4e5a1f67eb4eb8b81f
parentdbc799e2d322a3df515b9afa699bf294f03dceae (diff)
downloadrabbitmq-server-87262ced6866eeb2c62a26949824140861d73f71.tar.gz
Fixes for auto-delete exchanges
-rw-r--r--src/rabbit_exchange.erl94
1 files changed, 67 insertions, 27 deletions
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index bd75e301..bfd47039 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -43,6 +43,8 @@
-import(qlc).
-import(regexp).
+-define(CHUNK_SIZE, 10).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -224,38 +226,73 @@ delete_bindings(Binding = #binding{}) ->
ok = mnesia:delete_object(Route),
ok = mnesia:delete_object(ReverseRoute),
ok = mnesia:delete_object(durable_routes, Route, write);
-
+
+% Must be called in a transaction
delete_bindings(QueueName) ->
- % TODO: The head of this list *SHOULD* always be the default exchange
- % what if somebody nukes it?
- [_|Exchanges] = exchanges_for_queue(QueueName),
- lists:foreach(fun (Name) ->
- Exchange = #exchange{name = Name, auto_delete = true,
- type = '_', durable = '_',
- arguments = '_'},
- ok = mnesia:delete_object(Exchange),
- ok = mnesia:delete_object(durable_exchanges,
- Exchange, write)
- end, Exchanges),
-
- % TODO: What about auto_delete on durable exchanges?
delete_bindings(#binding{exchange_name = '_',
queue_name = QueueName,
- key = '_'}).
-
+ key = '_'}),
+ lists:foreach(fun(ExchangeName) ->
+ call_with_exchange(ExchangeName,
+ fun(Exchange) ->
+ if Exchange#exchange.auto_delete ->
+ Predicate = fun(E) -> E == QueueName end,
+ case has_bindings(ExchangeName, Predicate) of
+ true -> ok;
+ false -> do_internal_delete(ExchangeName)
+ end;
+ true -> ok
+ end
+ end)
+ end, exchanges_for_queue(QueueName)).
+
exchanges_for_queue(QueueName) ->
- MatchHead = #route{binding = #binding{exchange_name = '$1',
- queue_name = QueueName,
- key = '_'}},
- mnesia:dirty_select(route, [{MatchHead, [], ['$1']}]).
+ MatchHead = #reverse_route{reverse_binding =
+ #reverse_binding{exchange_name = '$1',
+ queue_name = QueueName,
+ key = '_'}},
+ sets:to_list(sets:from_list(
+ mnesia:dirty_select(reverse_route, [{MatchHead, [], ['$1']}]))).
+
+
+has_bindings(ExchangeName) ->
+ has_bindings(ExchangeName, fun(_) -> false end).
-routes_for_exchange(ExchangeName) ->
+has_bindings(ExchangeName, Predicate) ->
MatchHead = #route{binding = #binding{exchange_name = ExchangeName,
- queue_name = '_',
- key = '$1'}},
- mnesia:dirty_select(route, [{MatchHead, [], ['$1']}]).
+ queue_name = '$1',
+ key = '_'}},
+ case mnesia:select(route,
+ [{MatchHead, [], ['$1']}], ?CHUNK_SIZE, write) of
+ '$end_of_table' -> ok;
+ {Routes, Continuation} ->
+ case lists:dropwhile(Predicate, Routes) of
+ [] -> continue(Continuation, Predicate);
+ _ -> true
+ end
+ end.
+
+continue(Continuation, Predicate) ->
+ case mnesia:select(Continuation) of
+ '$end_of_table' -> false;
+ {Routes, Cont} ->
+ case lists:dropwhile(Predicate, Routes) of
+ [] -> continue(Cont, Predicate);
+ _ -> true
+ end
+ end.
+
+call_with_exchange(Exchange, Fun) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:wread({exchange, Exchange}) of
+ [] -> {error, exchange_not_found};
+ [X] -> Fun(X)
+ end
+ end).
call_with_exchange_and_queue(Exchange, Queue, Fun) ->
+ % TODO Refactor to avoid duplication with call_with_exchange/2
rabbit_misc:execute_mnesia_transaction(
fun () ->
case mnesia:wread({exchange, Exchange}) of
@@ -362,14 +399,17 @@ delete(ExchangeName, IfUnused) ->
fun () -> internal_delete(ExchangeName, IfUnused) end).
internal_delete(ExchangeName, _IfUnused = true) ->
- case routes_for_exchange(ExchangeName) of
- [] -> do_internal_delete(ExchangeName);
- _ -> {error, in_use}
+ case has_bindings(ExchangeName) of
+ false -> do_internal_delete(ExchangeName);
+ true -> {error, in_use}
end;
internal_delete(ExchangeName, false) ->
do_internal_delete(ExchangeName).
+% TODO: Think about an optional do_internal_delete that takes an Exchange
+% instead of an Exchange, i.e. something that has already done the lookup
+% already, e.g. delete_bindings/1
do_internal_delete(ExchangeName) ->
case mnesia:wread({exchange, ExchangeName}) of
[] -> {error, not_found};