diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-09-09 19:06:06 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-09-09 19:06:06 +0100 |
commit | 215887d34c8964b3a83b5fa4929dc1d06bc8bf69 (patch) | |
tree | 801ee5ddb2dfde0a41f31549ae50fe2773964060 | |
parent | c243ede4fd2a08e058a4609ca6c747de48902526 (diff) | |
download | rabbitmq-server-215887d34c8964b3a83b5fa4929dc1d06bc8bf69.tar.gz |
Reduce distance to default, especially WRT rabbit_amqqueue_process startup.
-rw-r--r-- | src/rabbit_amqqueue.erl | 33 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 119 | ||||
-rw-r--r-- | src/rabbit_prequeue.erl | 49 |
3 files changed, 105 insertions, 96 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 8b13ac49..aadedda7 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -36,7 +36,7 @@ cancel_sync_mirrors/1]). %% internal --export([internal_declare/1, internal_delete/1, run_backing_queue/3, +-export([internal_declare/2, internal_delete/1, run_backing_queue/3, set_ram_duration_target/2, set_maximum_since_use/2]). -include("rabbit.hrl"). @@ -77,8 +77,9 @@ -> {'new' | 'existing' | 'owner_died', rabbit_types:amqqueue()} | {'absent', rabbit_types:amqqueue(), absent_reason()} | rabbit_types:channel_exit()). --spec(internal_declare/1 :: - (rabbit_types:amqqueue()) +%% TODO nonsense +-spec(internal_declare/2 :: + (rabbit_types:amqqueue(), boolean()) -> {'new', rabbit_misc:thunk(rabbit_types:amqqueue())} | {'absent', rabbit_types:amqqueue()}). -spec(update/2 :: @@ -277,13 +278,25 @@ declare(QueueName, Durable, AutoDelete, Args, Owner, Node) -> rabbit_amqqueue_sup_sup:start_queue_process(Node, Q, declare), {init, new}, infinity). -internal_declare(Q = #amqqueue{name = QueueName}) -> - case not_found_or_absent(QueueName) of - not_found -> ok = store_queue(Q), - B = add_default_binding(Q), - {new, fun () -> B(), Q end}; - {absent, _Q, _Reason} = R -> R - end. +internal_declare(Q, true) -> + rabbit_misc:execute_mnesia_tx_with_tail( + fun () -> ok = store_queue(Q), rabbit_misc:const(Q) end); +internal_declare(Q = #amqqueue{name = QueueName}, false) -> + rabbit_misc:execute_mnesia_tx_with_tail( + fun () -> + case mnesia:wread({rabbit_queue, QueueName}) of + [] -> + case not_found_or_absent(QueueName) of + not_found -> Q1 = rabbit_policy:set(Q), + ok = store_queue(Q1), + B = add_default_binding(Q1), + fun () -> B(), Q1 end; + {absent, _Q, _} = R -> rabbit_misc:const(R) + end; + [ExistingQ] -> + rabbit_misc:const(ExistingQ) + end + end). update(Name, Fun) -> case mnesia:wread({rabbit_queue, Name}) of diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 829c5523..b21a4c88 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -26,7 +26,7 @@ -export([info_keys/0]). --export([become/3, init_with_backing_queue_state/7]). +-export([become/1, init_with_backing_queue_state/7]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/4, @@ -107,55 +107,62 @@ statistics_keys() -> ?STATISTICS_KEYS ++ rabbit_backing_queue:info_keys(). init(_) -> exit(cannot_be_called_directly). -%% We have just been declared or recovered -become(Recover, From, Q = #amqqueue{name = QName, - exclusive_owner = Owner}) -> +become(Q = #amqqueue{name = QName}) -> process_flag(trap_exit, true), ?store_proc_name(QName), - State = init_state(Q), - case Owner of - none -> finish_init(Recover, From, State); - _ -> case rabbit_misc:is_process_alive(Owner) of %% [1] - true -> erlang:monitor(process, Owner), - finish_init(Recover, From, State); - false -> gen_server2:reply(From, {owner_died, Q}), - BQ = backing_queue_module(Q), - {_, Terms} = recovery_status(Recover), - BQS = bq_init(BQ, Q, Terms), - %% Rely on terminate to delete the queue. - {stop, {shutdown, missing_owner}, - State#q{backing_queue = BQ, - backing_queue_state = BQS}} - end - end. -%% [1] You used to be able to declare an exclusive durable -%% queue. Sadly we need to still tidy up after that case, there could -%% be the remnants of one left over from an upgrade. So that's why we -%% don't enforce Recover = new here. + {become, ?MODULE, init_state(Q), hibernate}. finish_init(Recover, From, State = #q{q = Q, backing_queue = undefined, backing_queue_state = undefined}) -> - send_reply(From, Q), - {RecoveryPid, TermsOrNew} = recovery_status(Recover), - ok = file_handle_cache:register_callback( - rabbit_amqqueue, set_maximum_since_use, [self()]), - ok = rabbit_memory_monitor:register( - self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), - BQ = backing_queue_module(Q), - BQS = bq_init(BQ, Q, TermsOrNew), - recovery_barrier(RecoveryPid), - State1 = process_args_policy(State#q{backing_queue = BQ, - backing_queue_state = BQS}), - notify_decorators(startup, State1), - rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State1)), - rabbit_event:if_enabled(State1, #q.stats_timer, - fun() -> emit_stats(State1) end), - {become, ?MODULE, State1, hibernate}. + {Barrier, TermsOrNew} = recovery_status(Recover), + case rabbit_amqqueue:internal_declare(Q, Recover /= new) of + #amqqueue{} = Q1 -> + case matches(Recover, Q, Q1) of + true -> + send_reply(From, Q), + ok = file_handle_cache:register_callback( + rabbit_amqqueue, set_maximum_since_use, [self()]), + ok = rabbit_memory_monitor:register( + self(), {rabbit_amqqueue, + set_ram_duration_target, [self()]}), + BQ = backing_queue_module(Q1), + BQS = bq_init(BQ, Q, TermsOrNew), + recovery_barrier(Barrier), + State1 = process_args_policy( + State#q{backing_queue = BQ, + backing_queue_state = BQS}), + notify_decorators(startup, State), + rabbit_event:notify(queue_created, + infos(?CREATION_EVENT_KEYS, State1)), + rabbit_event:if_enabled(State1, #q.stats_timer, + fun() -> emit_stats(State1) end), + noreply(State1); + false -> + {stop, normal, {existing, Q1}, State} + end; + Err -> + {stop, normal, Err, State} + end. recovery_status(new) -> {no_barrier, new}; recovery_status({Recover, Terms}) -> {Recover, Terms}. +send_reply(none, _Q) -> ok; +send_reply(From, Q) -> gen_server2:reply(From, {new, Q}). + +matches(new, Q1, Q2) -> + %% i.e. not policy + Q1#amqqueue.name =:= Q2#amqqueue.name andalso + Q1#amqqueue.durable =:= Q2#amqqueue.durable andalso + Q1#amqqueue.auto_delete =:= Q2#amqqueue.auto_delete andalso + Q1#amqqueue.exclusive_owner =:= Q2#amqqueue.exclusive_owner andalso + Q1#amqqueue.arguments =:= Q2#amqqueue.arguments andalso + Q1#amqqueue.pid =:= Q2#amqqueue.pid andalso + Q1#amqqueue.slave_pids =:= Q2#amqqueue.slave_pids; +matches(_, Q, Q) -> true; +matches(_, _Q, _Q1) -> false. + recovery_barrier(no_barrier) -> ok; recovery_barrier(BarrierPid) -> @@ -165,9 +172,6 @@ recovery_barrier(BarrierPid) -> {'DOWN', MRef, process, _, _} -> ok end. -send_reply(none, _Q) -> ok; -send_reply(From, Q) -> gen_server2:reply(From, {new, Q}). - %% We have been promoted init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, RateTRef, Deliveries, Senders, MTC) -> @@ -915,6 +919,31 @@ prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) -> _ -> 0 end. +handle_call({init, Recover}, From, + State = #q{q = #amqqueue{exclusive_owner = none}}) -> + finish_init(Recover, From, State); + +%% You used to be able to declare an exclusive durable queue. Sadly we +%% need to still tidy up after that case, there could be the remnants +%% of one left over from an upgrade. So that's why we don't enforce +%% Recover = new here. +handle_call({init, Recover}, From, + State = #q{q = #amqqueue{exclusive_owner = Owner}}) -> + case rabbit_misc:is_process_alive(Owner) of + true -> erlang:monitor(process, Owner), + finish_init(Recover, From, State); + false -> #q{backing_queue = undefined, + backing_queue_state = undefined, + q = Q} = State, + gen_server2:reply(From, {owner_died, Q}), + BQ = backing_queue_module(Q), + {_, Terms} = recovery_status(Recover), + BQS = bq_init(BQ, Q, Terms), + %% Rely on terminate to delete the queue. + {stop, {shutdown, missing_owner}, + State#q{backing_queue = BQ, backing_queue_state = BQS}} + end; + handle_call(info, _From, State) -> reply(infos(info_keys(), State), State); @@ -1058,6 +1087,10 @@ handle_call(sync_mirrors, _From, State) -> handle_call(cancel_sync_mirrors, _From, State) -> reply({ok, not_syncing}, State). +%% TODO exclusive? +handle_cast(init, State = #q{q = #amqqueue{exclusive_owner = none}}) -> + finish_init({no_barrier, non_clean_shutdown}, none, State); + handle_cast({run_backing_queue, Mod, Fun}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> noreply(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}); diff --git a/src/rabbit_prequeue.erl b/src/rabbit_prequeue.erl index 5733c732..f95b0ced 100644 --- a/src/rabbit_prequeue.erl +++ b/src/rabbit_prequeue.erl @@ -65,8 +65,8 @@ init({Q, StartMode0, Marker}) -> handle_call(Msg, _From, State) -> {stop, {unexpected_call, Msg}, {unexpected_call, Msg}, State}. -handle_cast(init, {Q, declare}) -> init_declared(Q); -handle_cast(init, {Q, recovery}) -> init_recovery(Q); +handle_cast(init, {Q, declare}) -> init_master(Q); +handle_cast(init, {Q, recovery}) -> init_master(Q); handle_cast(init, {Q, slave}) -> init_slave(Q); handle_cast(init, {Q, restart}) -> init_restart(Q); @@ -84,42 +84,8 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- -init_declared(Q = #amqqueue{name = QueueName}) -> - Decl = rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:wread({rabbit_queue, QueueName}) of - [] -> rabbit_amqqueue:internal_declare(Q); - [ExistingQ] -> {existing, ExistingQ} - end - end), - %% We have just been declared. Block waiting for an init call so - %% that we can let the declarer know whether we actually started - %% something. - receive {'$gen_call', From, {init, new}} -> - case Decl of - {new, Fun} -> - Q1 = Fun(), - rabbit_amqqueue_process:become(new, From, Q1); - {absent, _, _} -> - gen_server2:reply(From, Decl), - {stop, normal, Q}; - {existing, _} -> - gen_server2:reply(From, Decl), - {stop, normal, Q} - end - end. - -init_recovery(Q) -> - rabbit_misc:execute_mnesia_transaction( - fun () -> ok = rabbit_amqqueue:store_queue(Q) end), - %% This time we have an init call to ensure the recovery terms do - %% not sit in the supervisor forever. - receive {'$gen_call', From, {init, Terms}} -> - rabbit_amqqueue_process:become(Terms, From, Q) - end. - -init_slave(Q) -> - rabbit_mirror_queue_slave:become(Q). +init_master(Q) -> rabbit_amqqueue_process:become(Q). +init_slave(Q) -> rabbit_mirror_queue_slave:become(Q). init_restart(#amqqueue{name = QueueName}) -> {ok, Q = #amqqueue{pid = QPid, @@ -147,8 +113,5 @@ init_restart(#amqqueue{name = QueueName}) -> crash_restart(Q = #amqqueue{name = QueueName}) -> rabbit_log:error("Restarting crashed ~s.~n", [rabbit_misc:rs(QueueName)]), - Self = self(), - rabbit_misc:execute_mnesia_transaction( - fun () -> ok = rabbit_amqqueue:store_queue(Q#amqqueue{pid = Self}) end), - rabbit_amqqueue_process:become( - {no_barrier, non_clean_shutdown}, none, Q). + gen_server2:cast(self(), init), + rabbit_amqqueue_process:become(Q#amqqueue{pid = self()}). |