diff options
author | Tim Watson <watson.timothy@gmail.com> | 2014-01-15 14:30:56 +0000 |
---|---|---|
committer | Tim Watson <watson.timothy@gmail.com> | 2014-01-15 14:30:56 +0000 |
commit | d9506fe01ba41065df37e4f95de550918e08c2c8 (patch) | |
tree | 6e251ba6d49c59364f70ad23dffcdeaca2107b51 | |
parent | 01859e16637e82aacaad7d397d55b284c2543f83 (diff) | |
download | rabbitmq-server-d9506fe01ba41065df37e4f95de550918e08c2c8.tar.gz |
Refactor - simplify BQ/QI interface & reduce the distance to default
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 21 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 2 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 2 | ||||
-rw-r--r-- | src/rabbit_queue_index.erl | 7 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 17 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 23 |
6 files changed, 39 insertions, 33 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index e9094acf..51032fc7 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -173,10 +173,10 @@ code_change(_OldVsn, State, _Extra) -> declare(Recover, From, State = #q{q = Q, backing_queue = undefined, backing_queue_state = undefined}) -> - {IsRecovering, MediatorPid} = recovery_status(Recover), - case rabbit_amqqueue:internal_declare(Q, IsRecovering) of + {Recovery, TermsOrNew} = recovery_status(Recover), + case rabbit_amqqueue:internal_declare(Q, Recovery /= new) of #amqqueue{} = Q1 -> - case matches(IsRecovering, Q, Q1) of + case matches(Recovery, Q, Q1) of true -> gen_server2:reply(From, {new, Q}), ok = file_handle_cache:register_callback( @@ -185,8 +185,8 @@ declare(Recover, From, State = #q{q = Q, self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), BQ = backing_queue_module(Q1), - BQS = bq_init(BQ, Q, Recover), - recovery_barrier(MediatorPid), + BQS = bq_init(BQ, Q, TermsOrNew), + recovery_barrier(Recovery), State1 = process_args_policy( State#q{backing_queue = BQ, backing_queue_state = BQS}), @@ -203,10 +203,10 @@ declare(Recover, From, State = #q{q = Q, {stop, normal, Err, State} end. -recovery_status(new) -> {false, new}; -recovery_status({Recover, _}) -> {true, Recover}. +recovery_status(new) -> {new, new}; +recovery_status({Recover, Terms}) -> {Recover, Terms}. -matches(false, Q1, Q2) -> +matches(new, Q1, Q2) -> %% i.e. not policy Q1#amqqueue.name =:= Q2#amqqueue.name andalso Q1#amqqueue.durable =:= Q2#amqqueue.durable andalso @@ -859,7 +859,7 @@ handle_call({init, Recover}, From, %% You used to be able to declare an exclusive durable queue. Sadly we %% need to still tidy up after that case, there could be the remnants %% of one left over from an upgrade. So that's why we don't enforce -%% Recover = false here. +%% Recover = new here. handle_call({init, Recover}, From, State = #q{q = #amqqueue{exclusive_owner = Owner}}) -> case rabbit_misc:is_process_alive(Owner) of @@ -870,7 +870,8 @@ handle_call({init, Recover}, From, q = Q} = State, gen_server2:reply(From, {owner_died, Q}), BQ = backing_queue_module(Q), - BQS = bq_init(BQ, Q, Recover), + {_, Terms} = recovery_status(Recover), + BQS = bq_init(BQ, Q, Terms), %% Rely on terminate to delete the queue. {stop, {shutdown, missing_owner}, State#q{backing_queue = BQ, backing_queue_state = BQS}} diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 3361a02e..f66327ac 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -28,7 +28,7 @@ -type(drop_result(Ack) :: ('empty' | {rabbit_types:msg_id(), Ack})). -type(recovery_terms() :: [term()] | 'non_clean_shutdown'). --type(recovery_info() :: 'new' | {pid(), recovery_terms()}). +-type(recovery_info() :: 'new' | recovery_terms()). -type(purged_msg_count() :: non_neg_integer()). -type(async_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')). diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index cb2e272f..da185dce 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -115,7 +115,7 @@ handle_go(Q = #amqqueue{name = QName}) -> Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}), {ok, BQ} = application:get_env(backing_queue_module), Q1 = Q #amqqueue { pid = QPid }, - BQS = bq_init(BQ, Q1, {false, []}), + BQS = bq_init(BQ, Q1, []), State = #state { q = Q1, gm = GM, backing_queue = BQ, diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index e05b2174..292ad983 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -194,8 +194,7 @@ -type(contains_predicate() :: fun ((rabbit_types:msg_id()) -> boolean())). -type(walker(A) :: fun ((A) -> 'finished' | {rabbit_types:msg_id(), non_neg_integer(), A})). --type(recovery_type() :: 'clean_shutdown' | 'non_clean_shutdown'). --type(shutdown_terms() :: {recovery_type(), [any()]}). +-type(shutdown_terms() :: [any()] | 'non_clean_shutdown'). -spec(init/2 :: (rabbit_amqqueue:name(), on_sync_fun()) -> qistate()). -spec(recover/5 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(), @@ -234,11 +233,11 @@ init(Name, OnSyncFun) -> false = rabbit_file:is_file(Dir), %% is_file == is file or dir State #qistate { on_sync = OnSyncFun }. -recover(Name, {Recovery, Terms}, MsgStoreRecovered, +recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun, OnSyncFun) -> State = blank_state(Name), State1 = State #qistate { on_sync = OnSyncFun }, - CleanShutdown = Recovery =/= non_clean_shutdown, + CleanShutdown = Terms /= non_clean_shutdown, case CleanShutdown andalso MsgStoreRecovered of true -> RecoveredCounts = proplists:get_value(segments, Terms, []), init_clean(RecoveredCounts, State1); diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 07925dbf..af2a6e86 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2132,7 +2132,7 @@ init_test_queue() -> PRef = rabbit_guid:gen(), PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef), Res = rabbit_queue_index:recover( - TestQueue, {clean_shutdown, []}, false, + TestQueue, [], false, fun (MsgId) -> rabbit_msg_store:contains(MsgId, PersistentClient) end, @@ -2334,7 +2334,10 @@ test_queue_index() -> variable_queue_init(Q, Recover) -> rabbit_variable_queue:init( - Q, Recover, fun nop/2, fun nop/2, fun nop/1). + Q, case Recover of + true -> non_clean_shutdown; + false -> new + end, fun nop/2, fun nop/2, fun nop/1). variable_queue_publish(IsPersistent, Count, VQ) -> variable_queue_publish(IsPersistent, Count, fun (_N, P) -> P end, VQ). @@ -2384,7 +2387,7 @@ with_fresh_variable_queue(Fun) -> %% bump_credit messages and we want to ignore them spawn_link(fun() -> ok = empty_test_queue(), - VQ = variable_queue_init(test_amqqueue(true), new), + VQ = variable_queue_init(test_amqqueue(true), false), S0 = rabbit_variable_queue:status(VQ), assert_props(S0, [{q1, 0}, {q2, 0}, {delta, @@ -2773,8 +2776,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) -> {VQ5, _AckTags1} = variable_queue_fetch(Count, false, false, Count, VQ4), _VQ6 = rabbit_variable_queue:terminate(shutdown, VQ5), - VQ7 = variable_queue_init(test_amqqueue(true), - {self(), non_clean_shutdown}), + VQ7 = variable_queue_init(test_amqqueue(true), true), {{_Msg1, true, _AckTag1}, VQ8} = rabbit_variable_queue:fetch(true, VQ7), Count1 = rabbit_variable_queue:len(VQ8), VQ9 = variable_queue_publish(false, 1, VQ8), @@ -2791,8 +2793,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> rabbit_variable_queue:requeue(AckTags, VQ3), VQ5 = rabbit_variable_queue:timeout(VQ4), _VQ6 = rabbit_variable_queue:terminate(shutdown, VQ5), - VQ7 = variable_queue_init(test_amqqueue(true), - {self(), non_clean_shutdown}), + VQ7 = variable_queue_init(test_amqqueue(true), true), {empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7), VQ8. @@ -2824,7 +2825,7 @@ test_queue_recover() -> {ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} = rabbit_amqqueue:basic_get(Q1, self(), false, Limiter), exit(QPid1, shutdown), - VQ1 = variable_queue_init(Q, {self(), non_clean_shutdown}), + VQ1 = variable_queue_init(Q, true), {{_Msg1, true, _AckTag1}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), CountMinusOne = rabbit_variable_queue:len(VQ2), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 99035324..58647d3b 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -432,30 +432,30 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, new, end, msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback)); -init(#amqqueue { name = QueueName, durable = true }, {_, Terms}, +init(#amqqueue { name = QueueName, durable = true }, Terms, AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) -> - {PRef, Recovery, Terms1} = process_recovery_terms(Terms), + {PRef, RecoveryTerms} = process_recovery_terms(Terms), PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef, MsgOnDiskFun, AsyncCallback), TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback), {DeltaCount, IndexState} = rabbit_queue_index:recover( - QueueName, {Recovery, Terms1}, + QueueName, RecoveryTerms, rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE), fun (MsgId) -> rabbit_msg_store:contains(MsgId, PersistentClient) end, MsgIdxOnDiskFun), - init(true, IndexState, DeltaCount, Terms1, + init(true, IndexState, DeltaCount, RecoveryTerms, PersistentClient, TransientClient). -process_recovery_terms(Recovery=non_clean_shutdown) -> - {rabbit_guid:gen(), Recovery, []}; +process_recovery_terms(Terms=non_clean_shutdown) -> + {rabbit_guid:gen(), Terms}; process_recovery_terms(Terms) -> case proplists:get_value(persistent_ref, Terms) of - undefined -> {rabbit_guid:gen(), clean_shutdown, []}; - PRef1 -> {PRef1, clean_shutdown, Terms} + undefined -> {rabbit_guid:gen(), []}; + PRef1 -> {PRef1, Terms} end. terminate(_Reason, State) -> @@ -1008,7 +1008,12 @@ init(IsDurable, IndexState, DeltaCount, Terms, PersistentClient, TransientClient) -> {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState), - DeltaCount1 = proplists:get_value(persistent_count, Terms, DeltaCount), + DeltaCount1 = + case Terms of + non_clean_shutdown -> DeltaCount; + _ -> proplists:get_value(persistent_count, + Terms, DeltaCount) + end, Delta = case DeltaCount1 == 0 andalso DeltaCount /= undefined of true -> ?BLANK_DELTA; false -> d(#delta { start_seq_id = LowSeqId, |