summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-09-09 19:06:06 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-09-09 19:06:06 +0100
commit215887d34c8964b3a83b5fa4929dc1d06bc8bf69 (patch)
tree801ee5ddb2dfde0a41f31549ae50fe2773964060
parentc243ede4fd2a08e058a4609ca6c747de48902526 (diff)
downloadrabbitmq-server-215887d34c8964b3a83b5fa4929dc1d06bc8bf69.tar.gz
Reduce distance to default, especially WRT rabbit_amqqueue_process startup.
-rw-r--r--src/rabbit_amqqueue.erl33
-rw-r--r--src/rabbit_amqqueue_process.erl119
-rw-r--r--src/rabbit_prequeue.erl49
3 files changed, 105 insertions, 96 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 8b13ac49..aadedda7 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -36,7 +36,7 @@
cancel_sync_mirrors/1]).
%% internal
--export([internal_declare/1, internal_delete/1, run_backing_queue/3,
+-export([internal_declare/2, internal_delete/1, run_backing_queue/3,
set_ram_duration_target/2, set_maximum_since_use/2]).
-include("rabbit.hrl").
@@ -77,8 +77,9 @@
-> {'new' | 'existing' | 'owner_died', rabbit_types:amqqueue()} |
{'absent', rabbit_types:amqqueue(), absent_reason()} |
rabbit_types:channel_exit()).
--spec(internal_declare/1 ::
- (rabbit_types:amqqueue())
+%% TODO nonsense
+-spec(internal_declare/2 ::
+ (rabbit_types:amqqueue(), boolean())
-> {'new', rabbit_misc:thunk(rabbit_types:amqqueue())} |
{'absent', rabbit_types:amqqueue()}).
-spec(update/2 ::
@@ -277,13 +278,25 @@ declare(QueueName, Durable, AutoDelete, Args, Owner, Node) ->
rabbit_amqqueue_sup_sup:start_queue_process(Node, Q, declare),
{init, new}, infinity).
-internal_declare(Q = #amqqueue{name = QueueName}) ->
- case not_found_or_absent(QueueName) of
- not_found -> ok = store_queue(Q),
- B = add_default_binding(Q),
- {new, fun () -> B(), Q end};
- {absent, _Q, _Reason} = R -> R
- end.
+internal_declare(Q, true) ->
+ rabbit_misc:execute_mnesia_tx_with_tail(
+ fun () -> ok = store_queue(Q), rabbit_misc:const(Q) end);
+internal_declare(Q = #amqqueue{name = QueueName}, false) ->
+ rabbit_misc:execute_mnesia_tx_with_tail(
+ fun () ->
+ case mnesia:wread({rabbit_queue, QueueName}) of
+ [] ->
+ case not_found_or_absent(QueueName) of
+ not_found -> Q1 = rabbit_policy:set(Q),
+ ok = store_queue(Q1),
+ B = add_default_binding(Q1),
+ fun () -> B(), Q1 end;
+ {absent, _Q, _} = R -> rabbit_misc:const(R)
+ end;
+ [ExistingQ] ->
+ rabbit_misc:const(ExistingQ)
+ end
+ end).
update(Name, Fun) ->
case mnesia:wread({rabbit_queue, Name}) of
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 829c5523..b21a4c88 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -26,7 +26,7 @@
-export([info_keys/0]).
--export([become/3, init_with_backing_queue_state/7]).
+-export([become/1, init_with_backing_queue_state/7]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1, prioritise_call/4,
@@ -107,55 +107,62 @@ statistics_keys() -> ?STATISTICS_KEYS ++ rabbit_backing_queue:info_keys().
init(_) -> exit(cannot_be_called_directly).
-%% We have just been declared or recovered
-become(Recover, From, Q = #amqqueue{name = QName,
- exclusive_owner = Owner}) ->
+become(Q = #amqqueue{name = QName}) ->
process_flag(trap_exit, true),
?store_proc_name(QName),
- State = init_state(Q),
- case Owner of
- none -> finish_init(Recover, From, State);
- _ -> case rabbit_misc:is_process_alive(Owner) of %% [1]
- true -> erlang:monitor(process, Owner),
- finish_init(Recover, From, State);
- false -> gen_server2:reply(From, {owner_died, Q}),
- BQ = backing_queue_module(Q),
- {_, Terms} = recovery_status(Recover),
- BQS = bq_init(BQ, Q, Terms),
- %% Rely on terminate to delete the queue.
- {stop, {shutdown, missing_owner},
- State#q{backing_queue = BQ,
- backing_queue_state = BQS}}
- end
- end.
-%% [1] You used to be able to declare an exclusive durable
-%% queue. Sadly we need to still tidy up after that case, there could
-%% be the remnants of one left over from an upgrade. So that's why we
-%% don't enforce Recover = new here.
+ {become, ?MODULE, init_state(Q), hibernate}.
finish_init(Recover, From, State = #q{q = Q,
backing_queue = undefined,
backing_queue_state = undefined}) ->
- send_reply(From, Q),
- {RecoveryPid, TermsOrNew} = recovery_status(Recover),
- ok = file_handle_cache:register_callback(
- rabbit_amqqueue, set_maximum_since_use, [self()]),
- ok = rabbit_memory_monitor:register(
- self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}),
- BQ = backing_queue_module(Q),
- BQS = bq_init(BQ, Q, TermsOrNew),
- recovery_barrier(RecoveryPid),
- State1 = process_args_policy(State#q{backing_queue = BQ,
- backing_queue_state = BQS}),
- notify_decorators(startup, State1),
- rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State1)),
- rabbit_event:if_enabled(State1, #q.stats_timer,
- fun() -> emit_stats(State1) end),
- {become, ?MODULE, State1, hibernate}.
+ {Barrier, TermsOrNew} = recovery_status(Recover),
+ case rabbit_amqqueue:internal_declare(Q, Recover /= new) of
+ #amqqueue{} = Q1 ->
+ case matches(Recover, Q, Q1) of
+ true ->
+ send_reply(From, Q),
+ ok = file_handle_cache:register_callback(
+ rabbit_amqqueue, set_maximum_since_use, [self()]),
+ ok = rabbit_memory_monitor:register(
+ self(), {rabbit_amqqueue,
+ set_ram_duration_target, [self()]}),
+ BQ = backing_queue_module(Q1),
+ BQS = bq_init(BQ, Q, TermsOrNew),
+ recovery_barrier(Barrier),
+ State1 = process_args_policy(
+ State#q{backing_queue = BQ,
+ backing_queue_state = BQS}),
+ notify_decorators(startup, State),
+ rabbit_event:notify(queue_created,
+ infos(?CREATION_EVENT_KEYS, State1)),
+ rabbit_event:if_enabled(State1, #q.stats_timer,
+ fun() -> emit_stats(State1) end),
+ noreply(State1);
+ false ->
+ {stop, normal, {existing, Q1}, State}
+ end;
+ Err ->
+ {stop, normal, Err, State}
+ end.
recovery_status(new) -> {no_barrier, new};
recovery_status({Recover, Terms}) -> {Recover, Terms}.
+send_reply(none, _Q) -> ok;
+send_reply(From, Q) -> gen_server2:reply(From, {new, Q}).
+
+matches(new, Q1, Q2) ->
+ %% i.e. not policy
+ Q1#amqqueue.name =:= Q2#amqqueue.name andalso
+ Q1#amqqueue.durable =:= Q2#amqqueue.durable andalso
+ Q1#amqqueue.auto_delete =:= Q2#amqqueue.auto_delete andalso
+ Q1#amqqueue.exclusive_owner =:= Q2#amqqueue.exclusive_owner andalso
+ Q1#amqqueue.arguments =:= Q2#amqqueue.arguments andalso
+ Q1#amqqueue.pid =:= Q2#amqqueue.pid andalso
+ Q1#amqqueue.slave_pids =:= Q2#amqqueue.slave_pids;
+matches(_, Q, Q) -> true;
+matches(_, _Q, _Q1) -> false.
+
recovery_barrier(no_barrier) ->
ok;
recovery_barrier(BarrierPid) ->
@@ -165,9 +172,6 @@ recovery_barrier(BarrierPid) ->
{'DOWN', MRef, process, _, _} -> ok
end.
-send_reply(none, _Q) -> ok;
-send_reply(From, Q) -> gen_server2:reply(From, {new, Q}).
-
%% We have been promoted
init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
RateTRef, Deliveries, Senders, MTC) ->
@@ -915,6 +919,31 @@ prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
_ -> 0
end.
+handle_call({init, Recover}, From,
+ State = #q{q = #amqqueue{exclusive_owner = none}}) ->
+ finish_init(Recover, From, State);
+
+%% You used to be able to declare an exclusive durable queue. Sadly we
+%% need to still tidy up after that case, there could be the remnants
+%% of one left over from an upgrade. So that's why we don't enforce
+%% Recover = new here.
+handle_call({init, Recover}, From,
+ State = #q{q = #amqqueue{exclusive_owner = Owner}}) ->
+ case rabbit_misc:is_process_alive(Owner) of
+ true -> erlang:monitor(process, Owner),
+ finish_init(Recover, From, State);
+ false -> #q{backing_queue = undefined,
+ backing_queue_state = undefined,
+ q = Q} = State,
+ gen_server2:reply(From, {owner_died, Q}),
+ BQ = backing_queue_module(Q),
+ {_, Terms} = recovery_status(Recover),
+ BQS = bq_init(BQ, Q, Terms),
+ %% Rely on terminate to delete the queue.
+ {stop, {shutdown, missing_owner},
+ State#q{backing_queue = BQ, backing_queue_state = BQS}}
+ end;
+
handle_call(info, _From, State) ->
reply(infos(info_keys(), State), State);
@@ -1058,6 +1087,10 @@ handle_call(sync_mirrors, _From, State) ->
handle_call(cancel_sync_mirrors, _From, State) ->
reply({ok, not_syncing}, State).
+%% TODO exclusive?
+handle_cast(init, State = #q{q = #amqqueue{exclusive_owner = none}}) ->
+ finish_init({no_barrier, non_clean_shutdown}, none, State);
+
handle_cast({run_backing_queue, Mod, Fun},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
noreply(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)});
diff --git a/src/rabbit_prequeue.erl b/src/rabbit_prequeue.erl
index 5733c732..f95b0ced 100644
--- a/src/rabbit_prequeue.erl
+++ b/src/rabbit_prequeue.erl
@@ -65,8 +65,8 @@ init({Q, StartMode0, Marker}) ->
handle_call(Msg, _From, State) ->
{stop, {unexpected_call, Msg}, {unexpected_call, Msg}, State}.
-handle_cast(init, {Q, declare}) -> init_declared(Q);
-handle_cast(init, {Q, recovery}) -> init_recovery(Q);
+handle_cast(init, {Q, declare}) -> init_master(Q);
+handle_cast(init, {Q, recovery}) -> init_master(Q);
handle_cast(init, {Q, slave}) -> init_slave(Q);
handle_cast(init, {Q, restart}) -> init_restart(Q);
@@ -84,42 +84,8 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
-init_declared(Q = #amqqueue{name = QueueName}) ->
- Decl = rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:wread({rabbit_queue, QueueName}) of
- [] -> rabbit_amqqueue:internal_declare(Q);
- [ExistingQ] -> {existing, ExistingQ}
- end
- end),
- %% We have just been declared. Block waiting for an init call so
- %% that we can let the declarer know whether we actually started
- %% something.
- receive {'$gen_call', From, {init, new}} ->
- case Decl of
- {new, Fun} ->
- Q1 = Fun(),
- rabbit_amqqueue_process:become(new, From, Q1);
- {absent, _, _} ->
- gen_server2:reply(From, Decl),
- {stop, normal, Q};
- {existing, _} ->
- gen_server2:reply(From, Decl),
- {stop, normal, Q}
- end
- end.
-
-init_recovery(Q) ->
- rabbit_misc:execute_mnesia_transaction(
- fun () -> ok = rabbit_amqqueue:store_queue(Q) end),
- %% This time we have an init call to ensure the recovery terms do
- %% not sit in the supervisor forever.
- receive {'$gen_call', From, {init, Terms}} ->
- rabbit_amqqueue_process:become(Terms, From, Q)
- end.
-
-init_slave(Q) ->
- rabbit_mirror_queue_slave:become(Q).
+init_master(Q) -> rabbit_amqqueue_process:become(Q).
+init_slave(Q) -> rabbit_mirror_queue_slave:become(Q).
init_restart(#amqqueue{name = QueueName}) ->
{ok, Q = #amqqueue{pid = QPid,
@@ -147,8 +113,5 @@ init_restart(#amqqueue{name = QueueName}) ->
crash_restart(Q = #amqqueue{name = QueueName}) ->
rabbit_log:error("Restarting crashed ~s.~n", [rabbit_misc:rs(QueueName)]),
- Self = self(),
- rabbit_misc:execute_mnesia_transaction(
- fun () -> ok = rabbit_amqqueue:store_queue(Q#amqqueue{pid = Self}) end),
- rabbit_amqqueue_process:become(
- {no_barrier, non_clean_shutdown}, none, Q).
+ gen_server2:cast(self(), init),
+ rabbit_amqqueue_process:become(Q#amqqueue{pid = self()}).