summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-06-02 15:40:26 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2011-06-02 15:40:26 +0100
commit4013096a30291bd1f98b6016e018f405c7dbe0f8 (patch)
treeb387fa6fa21e9e0db3aa7d54c8ec2651f0fab217
parent3b57619d1e4b2ed349390f82e68ba8ae2517c425 (diff)
downloadrabbitmq-server-4013096a30291bd1f98b6016e018f405c7dbe0f8.tar.gz
Reduce diff from bug23554: Extend backing queue api to present reason for termination
-rw-r--r--include/rabbit_backing_queue_spec.hrl4
-rw-r--r--src/rabbit_amqqueue_process.erl12
-rw-r--r--src/rabbit_backing_queue.erl4
-rw-r--r--src/rabbit_tests.erl8
-rw-r--r--src/rabbit_variable_queue.erl6
5 files changed, 17 insertions, 17 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index 1c2b94e2..295d9039 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -32,8 +32,8 @@
-spec(stop/0 :: () -> 'ok').
-spec(init/4 :: (rabbit_types:amqqueue(), attempt_recovery(),
async_callback(), sync_callback()) -> state()).
--spec(terminate/1 :: (state()) -> state()).
--spec(delete_and_terminate/1 :: (state()) -> state()).
+-spec(terminate/2 :: (any(), state()) -> state()).
+-spec(delete_and_terminate/2 :: (any(), state()) -> state()).
-spec(purge/1 :: (state()) -> {purged_msg_count(), state()}).
-spec(publish/4 :: (rabbit_types:basic_message(),
rabbit_types:message_properties(), pid(), state()) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index f7b710a4..07a24af8 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -114,16 +114,16 @@ init(Q) ->
msg_id_to_channel = dict:new()}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-terminate(shutdown, State = #q{backing_queue = BQ}) ->
- terminate_shutdown(fun (BQS) -> BQ:terminate(BQS) end, State);
-terminate({shutdown, _}, State = #q{backing_queue = BQ}) ->
- terminate_shutdown(fun (BQS) -> BQ:terminate(BQS) end, State);
-terminate(_Reason, State = #q{backing_queue = BQ}) ->
+terminate(shutdown = R, State = #q{backing_queue = BQ}) ->
+ terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
+terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) ->
+ terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
+terminate(Reason, State = #q{backing_queue = BQ}) ->
%% FIXME: How do we cancel active subscriptions?
terminate_shutdown(fun (BQS) ->
rabbit_event:notify(
queue_deleted, [{pid, self()}]),
- BQS1 = BQ:delete_and_terminate(BQS),
+ BQS1 = BQ:delete_and_terminate(Reason, BQS),
%% don't care if the internal delete
%% doesn't return 'ok'.
rabbit_amqqueue:internal_delete(qname(State)),
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index addaabc5..217ad3eb 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -49,11 +49,11 @@ behaviour_info(callbacks) ->
{init, 4},
%% Called on queue shutdown when queue isn't being deleted.
- {terminate, 1},
+ {terminate, 2},
%% Called when the queue is terminating and needs to delete all
%% its content.
- {delete_and_terminate, 1},
+ {delete_and_terminate, 2},
%% Remove all messages in the queue, but not messages which have
%% been fetched and are pending acks.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 1a37cdff..3f4aa54e 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2116,7 +2116,7 @@ with_fresh_variable_queue(Fun) ->
{delta, {delta, undefined, 0, undefined}},
{q3, 0}, {q4, 0},
{len, 0}]),
- _ = rabbit_variable_queue:delete_and_terminate(Fun(VQ)),
+ _ = rabbit_variable_queue:delete_and_terminate(shutdown, Fun(VQ)),
passed.
test_variable_queue() ->
@@ -2284,7 +2284,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) ->
Count + Count, VQ3),
{VQ5, _AckTags1} = variable_queue_fetch(Count, false, false,
Count, VQ4),
- _VQ6 = rabbit_variable_queue:terminate(VQ5),
+ _VQ6 = rabbit_variable_queue:terminate(shutdown, VQ5),
VQ7 = variable_queue_init(test_amqqueue(true), true),
{{_Msg1, true, _AckTag1, Count1}, VQ8} =
rabbit_variable_queue:fetch(true, VQ7),
@@ -2301,7 +2301,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
{_Guids, VQ4} =
rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3),
VQ5 = rabbit_variable_queue:timeout(VQ4),
- _VQ6 = rabbit_variable_queue:terminate(VQ5),
+ _VQ6 = rabbit_variable_queue:terminate(shutdown, VQ5),
VQ7 = variable_queue_init(test_amqqueue(true), true),
{empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7),
VQ8.
@@ -2336,7 +2336,7 @@ test_queue_recover() ->
VQ1 = variable_queue_init(Q, true),
{{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} =
rabbit_variable_queue:fetch(true, VQ1),
- _VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2),
+ _VQ3 = rabbit_variable_queue:delete_and_terminate(shutdown, VQ2),
rabbit_amqqueue:internal_delete(QName)
end),
passed.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 8ac3ad43..a167cca0 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -16,7 +16,7 @@
-module(rabbit_variable_queue).
--export([init/4, terminate/1, delete_and_terminate/1,
+-export([init/4, terminate/2, delete_and_terminate/2,
purge/1, publish/4, publish_delivered/5, drain_confirmed/1,
fetch/2, ack/2, tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4,
requeue/3, len/1, is_empty/1, dropwhile/2,
@@ -452,7 +452,7 @@ init(#amqqueue { name = QueueName, durable = true }, true,
init(true, IndexState, DeltaCount, Terms1, AsyncCallback, SyncCallback,
PersistentClient, TransientClient).
-terminate(State) ->
+terminate(_Reason, State) ->
State1 = #vqstate { persistent_count = PCount,
index_state = IndexState,
msg_store_clients = {MSCStateP, MSCStateT} } =
@@ -473,7 +473,7 @@ terminate(State) ->
%% the only difference between purge and delete is that delete also
%% needs to delete everything that's been delivered and not ack'd.
-delete_and_terminate(State) ->
+delete_and_terminate(_Reason, State) ->
%% TODO: there is no need to interact with qi at all - which we do
%% as part of 'purge' and 'remove_pending_ack', other than
%% deleting it.