diff options
-rw-r--r-- | src/rabbit_amqqueue.erl | 8 | ||||
-rw-r--r-- | src/rabbit_binding.erl | 58 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 27 |
3 files changed, 53 insertions, 40 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 1aba7ecb..e45e026e 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -612,7 +612,7 @@ notify_sent_queue_down(QPid) -> resume(QPid, ChPid) -> delegate:cast(QPid, {resume, ChPid}). -internal_delete1(QueueName) -> +internal_delete1(QueueName, OnlyDurable) -> ok = mnesia:delete({rabbit_queue, QueueName}), %% this 'guarded' delete prevents unnecessary writes to the mnesia %% disk log @@ -622,7 +622,7 @@ internal_delete1(QueueName) -> end, %% we want to execute some things, as decided by rabbit_exchange, %% after the transaction. - rabbit_binding:remove_for_destination(QueueName). + rabbit_binding:remove_for_destination(QueueName, OnlyDurable). internal_delete(QueueName) -> rabbit_misc:execute_mnesia_tx_with_tail( @@ -632,7 +632,7 @@ internal_delete(QueueName) -> {[], []} -> rabbit_misc:const({error, not_found}); _ -> - Deletions = internal_delete1(QueueName), + Deletions = internal_delete1(QueueName, false), T = rabbit_binding:process_deletions(Deletions), fun() -> ok = T(), @@ -651,7 +651,7 @@ forget_all_durable(Node) -> Qs = mnesia:match_object(rabbit_durable_queue, #amqqueue{_ = '_'}, write), [rabbit_binding:process_deletions( - internal_delete1(Name)) || + internal_delete1(Name, true)) || #amqqueue{name = Name, pid = Pid} = Q <- Qs, node(Pid) =:= Node, rabbit_policy:get(<<"ha-mode">>, Q) =:= undefined], diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 7a095e06..d887f26a 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -25,7 +25,7 @@ -export([info_keys/0, info/1, info/2, info_all/1, info_all/2]). %% these must all be run inside a mnesia tx -export([has_for_source/1, remove_for_source/1, - remove_for_destination/1, remove_transient_for_destination/1]). + remove_for_destination/2, remove_transient_for_destination/1]). %%---------------------------------------------------------------------------- @@ -78,8 +78,8 @@ -> [rabbit_types:infos()]). -spec(has_for_source/1 :: (rabbit_types:binding_source()) -> boolean()). -spec(remove_for_source/1 :: (rabbit_types:binding_source()) -> bindings()). --spec(remove_for_destination/1 :: - (rabbit_types:binding_destination()) -> deletions()). +-spec(remove_for_destination/2 :: + (rabbit_types:binding_destination(), boolean()) -> deletions()). -spec(remove_transient_for_destination/1 :: (rabbit_types:binding_destination()) -> deletions()). -spec(process_deletions/1 :: (deletions()) -> rabbit_misc:thunk('ok')). @@ -215,7 +215,8 @@ remove(Binding, InnerFun) -> remove(Src, Dst, B) -> ok = sync_route(#route{binding = B}, durable(Src), durable(Dst), fun mnesia:delete_object/3), - Deletions = maybe_auto_delete(B#binding.source, [B], new_deletions()), + Deletions = maybe_auto_delete( + B#binding.source, [B], new_deletions(), false), process_deletions(Deletions). list(VHostPath) -> @@ -298,11 +299,11 @@ remove_for_source(SrcName) -> mnesia:match_object(rabbit_route, Match, write) ++ mnesia:match_object(rabbit_semi_durable_route, Match, write))). -remove_for_destination(DstName) -> - remove_for_destination(DstName, fun remove_routes/1). +remove_for_destination(DstName, OnlyDurable) -> + remove_for_destination(DstName, OnlyDurable, fun remove_routes/1). remove_transient_for_destination(DstName) -> - remove_for_destination(DstName, fun remove_transient_routes/1). + remove_for_destination(DstName, false, fun remove_transient_routes/1). %%---------------------------------------------------------------------------- @@ -428,36 +429,47 @@ remove_transient_routes(Routes) -> R#route.binding end || R <- Routes]. -remove_for_destination(DstName, Fun) -> +remove_for_destination(DstName, OnlyDurable, Fun) -> lock_route_tables(), - Match = reverse_route( - #route{binding = #binding{destination = DstName, _ = '_'}}), - Routes = [reverse_route(R) || R <- mnesia:match_object( - rabbit_reverse_route, Match, write)], + MatchFwd = #route{binding = #binding{destination = DstName, _ = '_'}}, + MatchRev = reverse_route(MatchFwd), + Routes = case OnlyDurable of + false -> [reverse_route(R) || + R <- mnesia:match_object( + rabbit_reverse_route, MatchRev, write)]; + true -> lists:usort( + mnesia:match_object( + rabbit_durable_route, MatchFwd, write) ++ + mnesia:match_object( + rabbit_semi_durable_route, MatchFwd, write)) + end, Bindings = Fun(Routes), - group_bindings_fold(fun maybe_auto_delete/3, new_deletions(), - lists:keysort(#binding.source, Bindings)). + group_bindings_fold(fun maybe_auto_delete/4, new_deletions(), + lists:keysort(#binding.source, Bindings), OnlyDurable). %% Requires that its input binding list is sorted in exchange-name %% order, so that the grouping of bindings (for passing to %% group_bindings_and_auto_delete1) works properly. -group_bindings_fold(_Fun, Acc, []) -> +group_bindings_fold(_Fun, Acc, [], _OnlyDurable) -> Acc; -group_bindings_fold(Fun, Acc, [B = #binding{source = SrcName} | Bs]) -> - group_bindings_fold(Fun, SrcName, Acc, Bs, [B]). +group_bindings_fold(Fun, Acc, [B = #binding{source = SrcName} | Bs], + OnlyDurable) -> + group_bindings_fold(Fun, SrcName, Acc, Bs, [B], OnlyDurable). group_bindings_fold( - Fun, SrcName, Acc, [B = #binding{source = SrcName} | Bs], Bindings) -> - group_bindings_fold(Fun, SrcName, Acc, Bs, [B | Bindings]); -group_bindings_fold(Fun, SrcName, Acc, Removed, Bindings) -> + Fun, SrcName, Acc, [B = #binding{source = SrcName} | Bs], Bindings, + OnlyDurable) -> + group_bindings_fold(Fun, SrcName, Acc, Bs, [B | Bindings], OnlyDurable); +group_bindings_fold(Fun, SrcName, Acc, Removed, Bindings, OnlyDurable) -> %% Either Removed is [], or its head has a non-matching SrcName. - group_bindings_fold(Fun, Fun(SrcName, Bindings, Acc), Removed). + group_bindings_fold(Fun, Fun(SrcName, Bindings, Acc, OnlyDurable), Removed, + OnlyDurable). -maybe_auto_delete(XName, Bindings, Deletions) -> +maybe_auto_delete(XName, Bindings, Deletions, OnlyDurable) -> {Entry, Deletions1} = case mnesia:read({rabbit_exchange, XName}) of [] -> {{undefined, not_deleted, Bindings}, Deletions}; - [X] -> case rabbit_exchange:maybe_auto_delete(X) of + [X] -> case rabbit_exchange:maybe_auto_delete(X, OnlyDurable) of not_deleted -> {{X, not_deleted, Bindings}, Deletions}; {deleted, Deletions2} -> diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 4d4a2a58..685c311f 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -24,7 +24,7 @@ info_keys/0, info/1, info/2, info_all/1, info_all/2, route/2, delete/2, validate_binding/2]). %% these must be run inside a mnesia tx --export([maybe_auto_delete/1, serial/1, peek_serial/1, update/2]). +-export([maybe_auto_delete/2, serial/1, peek_serial/1, update/2]). %%---------------------------------------------------------------------------- @@ -86,8 +86,8 @@ -spec(validate_binding/2 :: (rabbit_types:exchange(), rabbit_types:binding()) -> rabbit_types:ok_or_error({'binding_invalid', string(), [any()]})). --spec(maybe_auto_delete/1:: - (rabbit_types:exchange()) +-spec(maybe_auto_delete/2:: + (rabbit_types:exchange(), boolean()) -> 'not_deleted' | {'deleted', rabbit_binding:deletions()}). -spec(serial/1 :: (rabbit_types:exchange()) -> fun((boolean()) -> 'none' | pos_integer())). @@ -400,13 +400,13 @@ call_with_exchange(XName, Fun) -> delete(XName, IfUnused) -> Fun = case IfUnused of - true -> fun conditional_delete/1; - false -> fun unconditional_delete/1 + true -> fun conditional_delete/2; + false -> fun unconditional_delete/2 end, call_with_exchange( XName, fun (X) -> - case Fun(X) of + case Fun(X, false) of {deleted, X, Bs, Deletions} -> rabbit_binding:process_deletions( rabbit_binding:add_deletion( @@ -420,21 +420,21 @@ validate_binding(X = #exchange{type = XType}, Binding) -> Module = type_to_module(XType), Module:validate_binding(X, Binding). -maybe_auto_delete(#exchange{auto_delete = false}) -> +maybe_auto_delete(#exchange{auto_delete = false}, _OnlyDurable) -> not_deleted; -maybe_auto_delete(#exchange{auto_delete = true} = X) -> - case conditional_delete(X) of +maybe_auto_delete(#exchange{auto_delete = true} = X, OnlyDurable) -> + case conditional_delete(X, OnlyDurable) of {error, in_use} -> not_deleted; {deleted, X, [], Deletions} -> {deleted, Deletions} end. -conditional_delete(X = #exchange{name = XName}) -> +conditional_delete(X = #exchange{name = XName}, OnlyDurable) -> case rabbit_binding:has_for_source(XName) of - false -> unconditional_delete(X); + false -> unconditional_delete(X, OnlyDurable); true -> {error, in_use} end. -unconditional_delete(X = #exchange{name = XName}) -> +unconditional_delete(X = #exchange{name = XName}, OnlyDurable) -> %% this 'guarded' delete prevents unnecessary writes to the mnesia %% disk log case mnesia:wread({rabbit_durable_exchange, XName}) of @@ -444,7 +444,8 @@ unconditional_delete(X = #exchange{name = XName}) -> ok = mnesia:delete({rabbit_exchange, XName}), ok = mnesia:delete({rabbit_exchange_serial, XName}), Bindings = rabbit_binding:remove_for_source(XName), - {deleted, X, Bindings, rabbit_binding:remove_for_destination(XName)}. + {deleted, X, Bindings, rabbit_binding:remove_for_destination( + XName, OnlyDurable)}. next_serial(XName) -> Serial = peek_serial(XName, write), |