summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTim Watson <watson.timothy@gmail.com>2014-01-15 14:30:56 +0000
committerTim Watson <watson.timothy@gmail.com>2014-01-15 14:30:56 +0000
commitd9506fe01ba41065df37e4f95de550918e08c2c8 (patch)
tree6e251ba6d49c59364f70ad23dffcdeaca2107b51
parent01859e16637e82aacaad7d397d55b284c2543f83 (diff)
downloadrabbitmq-server-d9506fe01ba41065df37e4f95de550918e08c2c8.tar.gz
Refactor - simplify BQ/QI interface & reduce the distance to default
-rw-r--r--src/rabbit_amqqueue_process.erl21
-rw-r--r--src/rabbit_backing_queue.erl2
-rw-r--r--src/rabbit_mirror_queue_slave.erl2
-rw-r--r--src/rabbit_queue_index.erl7
-rw-r--r--src/rabbit_tests.erl17
-rw-r--r--src/rabbit_variable_queue.erl23
6 files changed, 39 insertions, 33 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index e9094acf..51032fc7 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -173,10 +173,10 @@ code_change(_OldVsn, State, _Extra) ->
declare(Recover, From, State = #q{q = Q,
backing_queue = undefined,
backing_queue_state = undefined}) ->
- {IsRecovering, MediatorPid} = recovery_status(Recover),
- case rabbit_amqqueue:internal_declare(Q, IsRecovering) of
+ {Recovery, TermsOrNew} = recovery_status(Recover),
+ case rabbit_amqqueue:internal_declare(Q, Recovery /= new) of
#amqqueue{} = Q1 ->
- case matches(IsRecovering, Q, Q1) of
+ case matches(Recovery, Q, Q1) of
true ->
gen_server2:reply(From, {new, Q}),
ok = file_handle_cache:register_callback(
@@ -185,8 +185,8 @@ declare(Recover, From, State = #q{q = Q,
self(), {rabbit_amqqueue,
set_ram_duration_target, [self()]}),
BQ = backing_queue_module(Q1),
- BQS = bq_init(BQ, Q, Recover),
- recovery_barrier(MediatorPid),
+ BQS = bq_init(BQ, Q, TermsOrNew),
+ recovery_barrier(Recovery),
State1 = process_args_policy(
State#q{backing_queue = BQ,
backing_queue_state = BQS}),
@@ -203,10 +203,10 @@ declare(Recover, From, State = #q{q = Q,
{stop, normal, Err, State}
end.
-recovery_status(new) -> {false, new};
-recovery_status({Recover, _}) -> {true, Recover}.
+recovery_status(new) -> {new, new};
+recovery_status({Recover, Terms}) -> {Recover, Terms}.
-matches(false, Q1, Q2) ->
+matches(new, Q1, Q2) ->
%% i.e. not policy
Q1#amqqueue.name =:= Q2#amqqueue.name andalso
Q1#amqqueue.durable =:= Q2#amqqueue.durable andalso
@@ -859,7 +859,7 @@ handle_call({init, Recover}, From,
%% You used to be able to declare an exclusive durable queue. Sadly we
%% need to still tidy up after that case, there could be the remnants
%% of one left over from an upgrade. So that's why we don't enforce
-%% Recover = false here.
+%% Recover = new here.
handle_call({init, Recover}, From,
State = #q{q = #amqqueue{exclusive_owner = Owner}}) ->
case rabbit_misc:is_process_alive(Owner) of
@@ -870,7 +870,8 @@ handle_call({init, Recover}, From,
q = Q} = State,
gen_server2:reply(From, {owner_died, Q}),
BQ = backing_queue_module(Q),
- BQS = bq_init(BQ, Q, Recover),
+ {_, Terms} = recovery_status(Recover),
+ BQS = bq_init(BQ, Q, Terms),
%% Rely on terminate to delete the queue.
{stop, {shutdown, missing_owner},
State#q{backing_queue = BQ, backing_queue_state = BQS}}
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 3361a02e..f66327ac 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -28,7 +28,7 @@
-type(drop_result(Ack) ::
('empty' | {rabbit_types:msg_id(), Ack})).
-type(recovery_terms() :: [term()] | 'non_clean_shutdown').
--type(recovery_info() :: 'new' | {pid(), recovery_terms()}).
+-type(recovery_info() :: 'new' | recovery_terms()).
-type(purged_msg_count() :: non_neg_integer()).
-type(async_callback() ::
fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')).
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index cb2e272f..da185dce 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -115,7 +115,7 @@ handle_go(Q = #amqqueue{name = QName}) ->
Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}),
{ok, BQ} = application:get_env(backing_queue_module),
Q1 = Q #amqqueue { pid = QPid },
- BQS = bq_init(BQ, Q1, {false, []}),
+ BQS = bq_init(BQ, Q1, []),
State = #state { q = Q1,
gm = GM,
backing_queue = BQ,
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index e05b2174..292ad983 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -194,8 +194,7 @@
-type(contains_predicate() :: fun ((rabbit_types:msg_id()) -> boolean())).
-type(walker(A) :: fun ((A) -> 'finished' |
{rabbit_types:msg_id(), non_neg_integer(), A})).
--type(recovery_type() :: 'clean_shutdown' | 'non_clean_shutdown').
--type(shutdown_terms() :: {recovery_type(), [any()]}).
+-type(shutdown_terms() :: [any()] | 'non_clean_shutdown').
-spec(init/2 :: (rabbit_amqqueue:name(), on_sync_fun()) -> qistate()).
-spec(recover/5 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(),
@@ -234,11 +233,11 @@ init(Name, OnSyncFun) ->
false = rabbit_file:is_file(Dir), %% is_file == is file or dir
State #qistate { on_sync = OnSyncFun }.
-recover(Name, {Recovery, Terms}, MsgStoreRecovered,
+recover(Name, Terms, MsgStoreRecovered,
ContainsCheckFun, OnSyncFun) ->
State = blank_state(Name),
State1 = State #qistate { on_sync = OnSyncFun },
- CleanShutdown = Recovery =/= non_clean_shutdown,
+ CleanShutdown = Terms /= non_clean_shutdown,
case CleanShutdown andalso MsgStoreRecovered of
true -> RecoveredCounts = proplists:get_value(segments, Terms, []),
init_clean(RecoveredCounts, State1);
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 07925dbf..af2a6e86 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2132,7 +2132,7 @@ init_test_queue() ->
PRef = rabbit_guid:gen(),
PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef),
Res = rabbit_queue_index:recover(
- TestQueue, {clean_shutdown, []}, false,
+ TestQueue, [], false,
fun (MsgId) ->
rabbit_msg_store:contains(MsgId, PersistentClient)
end,
@@ -2334,7 +2334,10 @@ test_queue_index() ->
variable_queue_init(Q, Recover) ->
rabbit_variable_queue:init(
- Q, Recover, fun nop/2, fun nop/2, fun nop/1).
+ Q, case Recover of
+ true -> non_clean_shutdown;
+ false -> new
+ end, fun nop/2, fun nop/2, fun nop/1).
variable_queue_publish(IsPersistent, Count, VQ) ->
variable_queue_publish(IsPersistent, Count, fun (_N, P) -> P end, VQ).
@@ -2384,7 +2387,7 @@ with_fresh_variable_queue(Fun) ->
%% bump_credit messages and we want to ignore them
spawn_link(fun() ->
ok = empty_test_queue(),
- VQ = variable_queue_init(test_amqqueue(true), new),
+ VQ = variable_queue_init(test_amqqueue(true), false),
S0 = rabbit_variable_queue:status(VQ),
assert_props(S0, [{q1, 0}, {q2, 0},
{delta,
@@ -2773,8 +2776,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) ->
{VQ5, _AckTags1} = variable_queue_fetch(Count, false, false,
Count, VQ4),
_VQ6 = rabbit_variable_queue:terminate(shutdown, VQ5),
- VQ7 = variable_queue_init(test_amqqueue(true),
- {self(), non_clean_shutdown}),
+ VQ7 = variable_queue_init(test_amqqueue(true), true),
{{_Msg1, true, _AckTag1}, VQ8} = rabbit_variable_queue:fetch(true, VQ7),
Count1 = rabbit_variable_queue:len(VQ8),
VQ9 = variable_queue_publish(false, 1, VQ8),
@@ -2791,8 +2793,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
rabbit_variable_queue:requeue(AckTags, VQ3),
VQ5 = rabbit_variable_queue:timeout(VQ4),
_VQ6 = rabbit_variable_queue:terminate(shutdown, VQ5),
- VQ7 = variable_queue_init(test_amqqueue(true),
- {self(), non_clean_shutdown}),
+ VQ7 = variable_queue_init(test_amqqueue(true), true),
{empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7),
VQ8.
@@ -2824,7 +2825,7 @@ test_queue_recover() ->
{ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} =
rabbit_amqqueue:basic_get(Q1, self(), false, Limiter),
exit(QPid1, shutdown),
- VQ1 = variable_queue_init(Q, {self(), non_clean_shutdown}),
+ VQ1 = variable_queue_init(Q, true),
{{_Msg1, true, _AckTag1}, VQ2} =
rabbit_variable_queue:fetch(true, VQ1),
CountMinusOne = rabbit_variable_queue:len(VQ2),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 99035324..58647d3b 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -432,30 +432,30 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, new,
end,
msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback));
-init(#amqqueue { name = QueueName, durable = true }, {_, Terms},
+init(#amqqueue { name = QueueName, durable = true }, Terms,
AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) ->
- {PRef, Recovery, Terms1} = process_recovery_terms(Terms),
+ {PRef, RecoveryTerms} = process_recovery_terms(Terms),
PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef,
MsgOnDiskFun, AsyncCallback),
TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE,
undefined, AsyncCallback),
{DeltaCount, IndexState} =
rabbit_queue_index:recover(
- QueueName, {Recovery, Terms1},
+ QueueName, RecoveryTerms,
rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE),
fun (MsgId) ->
rabbit_msg_store:contains(MsgId, PersistentClient)
end,
MsgIdxOnDiskFun),
- init(true, IndexState, DeltaCount, Terms1,
+ init(true, IndexState, DeltaCount, RecoveryTerms,
PersistentClient, TransientClient).
-process_recovery_terms(Recovery=non_clean_shutdown) ->
- {rabbit_guid:gen(), Recovery, []};
+process_recovery_terms(Terms=non_clean_shutdown) ->
+ {rabbit_guid:gen(), Terms};
process_recovery_terms(Terms) ->
case proplists:get_value(persistent_ref, Terms) of
- undefined -> {rabbit_guid:gen(), clean_shutdown, []};
- PRef1 -> {PRef1, clean_shutdown, Terms}
+ undefined -> {rabbit_guid:gen(), []};
+ PRef1 -> {PRef1, Terms}
end.
terminate(_Reason, State) ->
@@ -1008,7 +1008,12 @@ init(IsDurable, IndexState, DeltaCount, Terms,
PersistentClient, TransientClient) ->
{LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState),
- DeltaCount1 = proplists:get_value(persistent_count, Terms, DeltaCount),
+ DeltaCount1 =
+ case Terms of
+ non_clean_shutdown -> DeltaCount;
+ _ -> proplists:get_value(persistent_count,
+ Terms, DeltaCount)
+ end,
Delta = case DeltaCount1 == 0 andalso DeltaCount /= undefined of
true -> ?BLANK_DELTA;
false -> d(#delta { start_seq_id = LowSeqId,