diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2012-03-26 16:59:00 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2012-03-26 16:59:00 +0100 |
commit | a8497c025c50278ddae3c4c13ef3717c156147d8 (patch) | |
tree | 8bd3119d58a38534eea1e3dfc8393a4e7e1f26f1 | |
parent | dbecbebefad7e5c4401820e80dd966fa990db39a (diff) | |
parent | c63f2bf395054787bb855cebfd0f3158acdf84f3 (diff) | |
download | rabbitmq-server-a8497c025c50278ddae3c4c13ef3717c156147d8.tar.gz |
Merge bug23610
-rw-r--r-- | src/rabbit_amqqueue.erl | 51 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 17 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 2 |
3 files changed, 42 insertions, 28 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 9b6f14ca..c2724a12 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -32,7 +32,7 @@ %% internal --export([internal_declare/2, internal_delete/1, run_backing_queue/3, +-export([internal_declare/2, internal_delete/2, run_backing_queue/3, set_ram_duration_target/2, set_maximum_since_use/2]). -include("rabbit.hrl"). @@ -144,11 +144,11 @@ -spec(notify_sent_queue_down/1 :: (pid()) -> 'ok'). -spec(unblock/2 :: (pid(), pid()) -> 'ok'). -spec(flush_all/2 :: (qpids(), pid()) -> 'ok'). --spec(internal_delete/1 :: - (name()) -> rabbit_types:ok_or_error('not_found') | - rabbit_types:connection_exit() | - fun (() -> rabbit_types:ok_or_error('not_found') | - rabbit_types:connection_exit())). +-spec(internal_delete/2 :: + (name(), pid()) -> rabbit_types:ok_or_error('not_found') | + rabbit_types:connection_exit() | + fun (() -> rabbit_types:ok_or_error('not_found') | + rabbit_types:connection_exit())). -spec(run_backing_queue/3 :: (pid(), atom(), (fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok'). @@ -231,7 +231,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) -> [ExistingQ = #amqqueue{pid = QPid}] -> case rabbit_misc:is_process_alive(QPid) of true -> rabbit_misc:const(ExistingQ); - false -> TailFun = internal_delete(QueueName), + false -> TailFun = internal_delete(QueueName, QPid), fun () -> TailFun(), ExistingQ end end end @@ -534,13 +534,19 @@ internal_delete1(QueueName) -> %% after the transaction. rabbit_binding:remove_for_destination(QueueName). -internal_delete(QueueName) -> +internal_delete(QueueName, QPid) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> case mnesia:wread({rabbit_queue, QueueName}) of [] -> rabbit_misc:const({error, not_found}); [_] -> Deletions = internal_delete1(QueueName), - rabbit_binding:process_deletions(Deletions) + T = rabbit_binding:process_deletions(Deletions), + fun() -> + ok = T(), + ok = rabbit_event:notify(queue_deleted, + [{pid, QPid}, + {name, QueueName}]) + end end end). @@ -555,14 +561,25 @@ set_maximum_since_use(QPid, Age) -> on_node_down(Node) -> rabbit_misc:execute_mnesia_tx_with_tail( - fun () -> Dels = qlc:e(qlc:q([delete_queue(QueueName) || - #amqqueue{name = QueueName, pid = Pid, - slave_pids = []} - <- mnesia:table(rabbit_queue), - node(Pid) == Node])), - rabbit_binding:process_deletions( - lists:foldl(fun rabbit_binding:combine_deletions/2, - rabbit_binding:new_deletions(), Dels)) + fun () -> QsDels = + qlc:e(qlc:q([{{QName, Pid}, delete_queue(QName)} || + #amqqueue{name = QName, pid = Pid, + slave_pids = []} + <- mnesia:table(rabbit_queue), + node(Pid) == Node])), + {Qs, Dels} = lists:unzip(QsDels), + T = rabbit_binding:process_deletions( + lists:foldl(fun rabbit_binding:combine_deletions/2, + rabbit_binding:new_deletions(), Dels)), + fun () -> + T(), + lists:foreach( + fun({QName, QPid}) -> + ok = rabbit_event:notify(queue_deleted, + [{pid, QPid}, + {name, QName}]) + end, Qs) + end end). delete_queue(QueueName) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 83d9ae22..e1fd9bbc 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -180,16 +180,13 @@ terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) -> terminate(Reason, State = #q{q = #amqqueue{name = QName}, backing_queue = BQ}) -> %% FIXME: How do we cancel active subscriptions? - terminate_shutdown(fun (BQS) -> - rabbit_event:notify( - queue_deleted, [{pid, self()}, - {name, QName}]), - BQS1 = BQ:delete_and_terminate(Reason, BQS), - %% don't care if the internal delete - %% doesn't return 'ok'. - rabbit_amqqueue:internal_delete(qname(State)), - BQS1 - end, State). + terminate_shutdown( + fun (BQS) -> + BQS1 = BQ:delete_and_terminate(Reason, BQS), + %% don't care if the internal delete doesn't return 'ok'. + rabbit_amqqueue:internal_delete(QName, self()), + BQS1 + end, State). code_change(_OldVsn, State, _Extra) -> {ok, State}. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 85fe5426..55e4a6f8 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2551,7 +2551,7 @@ test_queue_recover() -> {{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), _VQ3 = rabbit_variable_queue:delete_and_terminate(shutdown, VQ2), - rabbit_amqqueue:internal_delete(QName) + rabbit_amqqueue:internal_delete(QName, QPid1) end), passed. |