From b67b322aba29653d7e7a3808828ac8b85c13294e Mon Sep 17 00:00:00 2001 From: Francesco Mazzoli Date: Sun, 4 Mar 2012 13:51:28 +0000 Subject: Notifying queue deletion in rabbit_amqqueue:internal_delete/2. --- src/rabbit_amqqueue.erl | 24 +++++++++++++++--------- src/rabbit_amqqueue_process.erl | 18 ++++++++---------- src/rabbit_tests.erl | 2 +- 3 files changed, 24 insertions(+), 20 deletions(-) diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index c95efa14..0ce7efd6 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -32,7 +32,7 @@ %% internal --export([internal_declare/2, internal_delete/1, run_backing_queue/3, +-export([internal_declare/2, internal_delete/2, run_backing_queue/3, set_ram_duration_target/2, set_maximum_since_use/2]). -include("rabbit.hrl"). @@ -142,11 +142,11 @@ -spec(notify_sent_queue_down/1 :: (pid()) -> 'ok'). -spec(unblock/2 :: (pid(), pid()) -> 'ok'). -spec(flush_all/2 :: (qpids(), pid()) -> 'ok'). --spec(internal_delete/1 :: - (name()) -> rabbit_types:ok_or_error('not_found') | - rabbit_types:connection_exit() | - fun (() -> rabbit_types:ok_or_error('not_found') | - rabbit_types:connection_exit())). +-spec(internal_delete/2 :: + (name(), pid()) -> rabbit_types:ok_or_error('not_found') | + rabbit_types:connection_exit() | + fun (() -> rabbit_types:ok_or_error('not_found') | + rabbit_types:connection_exit())). -spec(run_backing_queue/3 :: (pid(), atom(), (fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok'). @@ -229,7 +229,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) -> [ExistingQ = #amqqueue{pid = QPid}] -> case rabbit_misc:is_process_alive(QPid) of true -> rabbit_misc:const(ExistingQ); - false -> TailFun = internal_delete(QueueName), + false -> TailFun = internal_delete(QueueName, QPid), fun () -> TailFun(), ExistingQ end end end @@ -515,13 +515,19 @@ internal_delete1(QueueName) -> %% after the transaction. rabbit_binding:remove_for_destination(QueueName). -internal_delete(QueueName) -> +internal_delete(QueueName, QPid) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> case mnesia:wread({rabbit_queue, QueueName}) of [] -> rabbit_misc:const({error, not_found}); [_] -> Deletions = internal_delete1(QueueName), - rabbit_binding:process_deletions(Deletions) + T = rabbit_binding:process_deletions(Deletions), + fun() -> + ok = T(), + ok = rabbit_event:notify(queue_deleted, + [{pid, QPid}, + {name, QueueName}]) + end end end). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b8b27443..82962715 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -183,16 +183,14 @@ terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) -> terminate(Reason, State = #q{q = #amqqueue{name = QName}, backing_queue = BQ}) -> %% FIXME: How do we cancel active subscriptions? - terminate_shutdown(fun (BQS) -> - rabbit_event:notify( - queue_deleted, [{pid, self()}, - {name, QName}]), - BQS1 = BQ:delete_and_terminate(Reason, BQS), - %% don't care if the internal delete - %% doesn't return 'ok'. - rabbit_amqqueue:internal_delete(qname(State)), - BQS1 - end, State). + terminate_shutdown( + fun (BQS) -> + BQS1 = BQ:delete_and_terminate(Reason, BQS), + %% don't care if the internal delete + %% doesn't return 'ok'. + rabbit_amqqueue:internal_delete(QName, self()), + BQS1 + end, State). code_change(_OldVsn, State, _Extra) -> {ok, State}. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index f7e3baa7..629224b2 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2530,7 +2530,7 @@ test_queue_recover() -> {{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), _VQ3 = rabbit_variable_queue:delete_and_terminate(shutdown, VQ2), - rabbit_amqqueue:internal_delete(QName) + rabbit_amqqueue:internal_delete(QName, QPid1) end), passed. -- cgit v1.2.1 From 7121f5a1f3766e6fef48d5a2ec54ee224e4feaf7 Mon Sep 17 00:00:00 2001 From: Francesco Mazzoli Date: Sun, 4 Mar 2012 17:36:18 +0000 Subject: Notifying queue deletion in rabbit_amqqueue:on_node_down/1 --- src/rabbit_amqqueue.erl | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 0ce7efd6..48236ca5 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -542,14 +542,25 @@ set_maximum_since_use(QPid, Age) -> on_node_down(Node) -> rabbit_misc:execute_mnesia_tx_with_tail( - fun () -> Dels = qlc:e(qlc:q([delete_queue(QueueName) || - #amqqueue{name = QueueName, pid = Pid, - slave_pids = []} - <- mnesia:table(rabbit_queue), - node(Pid) == Node])), - rabbit_binding:process_deletions( - lists:foldl(fun rabbit_binding:combine_deletions/2, - rabbit_binding:new_deletions(), Dels)) + fun () -> QsDels = + qlc:e(qlc:q([{{QName, Pid}, delete_queue(QName)} || + #amqqueue{name = QName, pid = Pid, + slave_pids = []} + <- mnesia:table(rabbit_queue), + node(Pid) == Node])), + {Qs, Dels} = lists:unzip(QsDels), + T = rabbit_binding:process_deletions( + lists:foldl(fun rabbit_binding:combine_deletions/2, + rabbit_binding:new_deletions(), Dels)), + fun () -> + T(), + lists:foreach( + fun({QName, QPid}) -> + ok = rabbit_event:notify(queue_deleted, + [{pid, QPid}, + {name, QName}]) + end, Qs) + end end). delete_queue(QueueName) -> -- cgit v1.2.1 From c63f2bf395054787bb855cebfd0f3158acdf84f3 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Mon, 26 Mar 2012 16:58:43 +0100 Subject: Cosmetic --- src/rabbit_amqqueue_process.erl | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 82962715..ee547138 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -186,8 +186,7 @@ terminate(Reason, State = #q{q = #amqqueue{name = QName}, terminate_shutdown( fun (BQS) -> BQS1 = BQ:delete_and_terminate(Reason, BQS), - %% don't care if the internal delete - %% doesn't return 'ok'. + %% don't care if the internal delete doesn't return 'ok'. rabbit_amqqueue:internal_delete(QName, self()), BQS1 end, State). -- cgit v1.2.1