summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-09-10 11:59:06 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-09-10 11:59:06 +0100
commit5eb6dbe387b1aa93241333411aecb819bd9c864c (patch)
tree411e17a69caaf4074523fc9db6df956db1af45b2
parent183007616bc81421ccd0bdf96f333591ca637b83 (diff)
downloadrabbitmq-server-5eb6dbe387b1aa93241333411aecb819bd9c864c.tar.gz
Further reduce distance to default (especially in slave), by getting the prequeue to pass straight into the right module at the end of init/1.
-rw-r--r--src/gen_server2.erl14
-rw-r--r--src/rabbit_amqqueue_process.erl12
-rw-r--r--src/rabbit_mirror_queue_misc.erl5
-rw-r--r--src/rabbit_mirror_queue_slave.erl42
-rw-r--r--src/rabbit_prequeue.erl64
5 files changed, 75 insertions, 62 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index ee82bcb3..d2f96b52 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -69,7 +69,9 @@
%% which will be passed into any of the callback functions in the new
%% module. Note there is no form also encompassing a reply, thus if
%% you wish to reply in handle_call/3 and change the callback module,
-%% you need to use gen_server2:reply/2 to issue the reply manually.
+%% you need to use gen_server2:reply/2 to issue the reply
+%% manually. The init function can similarly return a 5th argument,
+%% Module, in order to dynamically decide the callback module on init.
%%
%% 8) The callback module can optionally implement
%% format_message_queue/2 which is the equivalent of format_status/2
@@ -125,6 +127,7 @@
%%% ==> {ok, State}
%%% {ok, State, Timeout}
%%% {ok, State, Timeout, Backoff}
+%%% {ok, State, Timeout, Backoff, Module}
%%% ignore
%%% {stop, Reason}
%%%
@@ -242,6 +245,8 @@
{ok, State :: term(), timeout() | hibernate} |
{ok, State :: term(), timeout() | hibernate,
{backoff, millis(), millis(), millis()}} |
+ {ok, State :: term(), timeout() | hibernate,
+ {backoff, millis(), millis(), millis()}, atom()} |
ignore |
{stop, Reason :: term()}.
-callback handle_call(Request :: term(), From :: {pid(), Tag :: term()},
@@ -568,6 +573,13 @@ init_it(Starter, Parent, Name0, Mod, Args, Options) ->
loop(GS2State #gs2_state { state = State,
time = Timeout,
timeout_state = Backoff1 });
+ {ok, State, Timeout, Backoff = {backoff, _, _, _}, Mod1} ->
+ Backoff1 = extend_backoff(Backoff),
+ proc_lib:init_ack(Starter, {ok, self()}),
+ loop(GS2State #gs2_state { mod = Mod1,
+ state = State,
+ time = Timeout,
+ timeout_state = Backoff1 });
{stop, Reason} ->
%% For consistency, we must make sure that the
%% registered name (if any) is unregistered before
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index b21a4c88..e068b93c 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -26,7 +26,7 @@
-export([info_keys/0]).
--export([become/1, init_with_backing_queue_state/7]).
+-export([init_with_backing_queue_state/7]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1, prioritise_call/4,
@@ -105,12 +105,12 @@ statistics_keys() -> ?STATISTICS_KEYS ++ rabbit_backing_queue:info_keys().
%%----------------------------------------------------------------------------
-init(_) -> exit(cannot_be_called_directly).
-
-become(Q = #amqqueue{name = QName}) ->
+init(Q) ->
process_flag(trap_exit, true),
- ?store_proc_name(QName),
- {become, ?MODULE, init_state(Q), hibernate}.
+ ?store_proc_name(Q#amqqueue.name),
+ {ok, init_state(Q#amqqueue{pid = self()}), hibernate,
+ {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE},
+ ?MODULE}.
finish_init(Recover, From, State = #q{q = Q,
backing_queue = undefined,
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 5cb871e8..aec6f93d 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -210,10 +210,7 @@ add_mirror(QName, MirrorNode, SyncMode) ->
MirrorNode, Q, slave),
log_info(QName, "Adding mirror on node ~p: ~p~n",
[MirrorNode, SPid]),
- case SyncMode of
- sync -> rabbit_mirror_queue_slave:await(SPid);
- async -> ok
- end
+ rabbit_mirror_queue_slave:go(SPid, SyncMode)
end);
{error, not_found} = E ->
E
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index c2bf1b89..2da2e7a5 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -24,7 +24,7 @@
%% All instructions from the GM group must be processed in the order
%% in which they're received.
--export([set_maximum_since_use/2, info/1, become/1, await/1]).
+-export([set_maximum_since_use/2, info/1, go/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3, handle_pre_hibernate/1, prioritise_call/4,
@@ -76,12 +76,16 @@ set_maximum_since_use(QPid, Age) ->
info(QPid) -> gen_server2:call(QPid, info, infinity).
-await(SPid) -> gen_server2:call(SPid, await, infinity).
+init(Q) ->
+ ?store_proc_name(Q#amqqueue.name),
+ {ok, {not_started, Q}, hibernate,
+ {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN,
+ ?DESIRED_HIBERNATE}, ?MODULE}.
-init(_) -> exit(cannot_be_called_directly).
+go(SPid, sync) -> gen_server2:call(SPid, go, infinity);
+go(SPid, async) -> gen_server2:cast(SPid, go).
-become(Q = #amqqueue{name = QName}) ->
- ?store_proc_name(QName),
+handle_go(Q = #amqqueue{name = QName}) ->
%% We join the GM group before we add ourselves to the amqqueue
%% record. As a result:
%% 1. We can receive msgs from GM that correspond to messages we will
@@ -136,25 +140,25 @@ become(Q = #amqqueue{name = QName}) ->
ok = gm:broadcast(GM, request_depth),
ok = gm:validate_members(GM, [GM | [G || {G, _} <- GMPids]]),
rabbit_mirror_queue_misc:maybe_auto_sync(Q1),
- {become, ?MODULE, State, hibernate};
+ {ok, State};
{stale, StalePid} ->
rabbit_mirror_queue_misc:log_warning(
QName, "Detected stale HA master: ~p~n", [StalePid]),
gm:leave(GM),
- {stop, {stale_master_pid, StalePid}, Q};
+ {error, {stale_master_pid, StalePid}};
duplicate_live_master ->
gm:leave(GM),
- {stop, {duplicate_live_master, Node}, Q};
+ {error, {duplicate_live_master, Node}};
existing ->
gm:leave(GM),
- {stop, normal, Q};
+ {error, normal};
master_in_recovery ->
gm:leave(GM),
%% The queue record vanished - we must have a master starting
%% concurrently with us. In that case we can safely decide to do
%% nothing here, and the master will start us in
%% master:init_with_existing_bq/3
- {stop, normal, Q}
+ {error, normal}
end.
init_it(Self, GM, Node, QName) ->
@@ -188,8 +192,11 @@ add_slave(Q = #amqqueue { slave_pids = SPids, gm_pids = GMPids }, New, GM) ->
rabbit_mirror_queue_misc:store_updated_slaves(
Q#amqqueue{slave_pids = SPids ++ [New], gm_pids = [{GM, New} | GMPids]}).
-handle_call(await, _From, State) ->
- {reply, ok, State};
+handle_call(go, _From, {not_started, Q} = NotStarted) ->
+ case handle_go(Q) of
+ {ok, State} -> {reply, ok, State};
+ {error, Error} -> {stop, Error, NotStarted}
+ end;
handle_call({gm_deaths, DeadGMPids}, From,
State = #state { gm = GM, q = Q = #amqqueue {
@@ -227,6 +234,12 @@ handle_call({gm_deaths, DeadGMPids}, From,
handle_call(info, _From, State) ->
reply(infos(?INFO_KEYS, State), State).
+handle_cast(go, {not_started, Q} = NotStarted) ->
+ case handle_go(Q) of
+ {ok, State} -> {noreply, State};
+ {error, Error} -> {stop, Error, NotStarted}
+ end;
+
handle_cast({run_backing_queue, Mod, Fun}, State) ->
noreply(run_backing_queue(Mod, Fun, State));
@@ -307,6 +320,8 @@ handle_info({bump_credit, Msg}, State) ->
handle_info(Msg, State) ->
{stop, {unexpected_info, Msg}, State}.
+terminate(_Reason, {not_started, _Q}) ->
+ ok;
terminate(_Reason, #state { backing_queue_state = undefined }) ->
%% We've received a delete_and_terminate from gm, thus nothing to
%% do here.
@@ -345,6 +360,9 @@ terminate_common(State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
+handle_pre_hibernate({not_started, _Q} = State) ->
+ {hibernate, State};
+
handle_pre_hibernate(State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
{RamDuration, BQS1} = BQ:ram_duration(BQS),
diff --git a/src/rabbit_prequeue.erl b/src/rabbit_prequeue.erl
index f95b0ced..708cad53 100644
--- a/src/rabbit_prequeue.erl
+++ b/src/rabbit_prequeue.erl
@@ -51,54 +51,29 @@ start_link(Q, StartMode, Marker) ->
%%----------------------------------------------------------------------------
-init({Q, StartMode0, Marker}) ->
- %% Hand back to supervisor ASAP
- gen_server2:cast(self(), init),
- StartMode = case is_process_alive(Marker) of
- true -> StartMode0;
- false -> restart
- end,
- {ok, {Q#amqqueue{pid = self()}, StartMode}, hibernate,
- {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN,
- ?DESIRED_HIBERNATE}}.
-
-handle_call(Msg, _From, State) ->
- {stop, {unexpected_call, Msg}, {unexpected_call, Msg}, State}.
-
-handle_cast(init, {Q, declare}) -> init_master(Q);
-handle_cast(init, {Q, recovery}) -> init_master(Q);
-handle_cast(init, {Q, slave}) -> init_slave(Q);
-handle_cast(init, {Q, restart}) -> init_restart(Q);
-
-handle_cast(Msg, State) ->
- {stop, {unexpected_cast, Msg}, State}.
+init({Q, StartMode, Marker}) ->
+ init(Q, case {is_process_alive(Marker), StartMode} of
+ {true, slave} -> slave;
+ {true, _} -> master;
+ {false, _} -> restart
+ end).
-handle_info(Msg, State) ->
- {stop, {unexpected_info, Msg}, State}.
+init(Q, master) -> rabbit_amqqueue_process:init(Q);
+init(Q, slave) -> rabbit_mirror_queue_slave:init(Q);
-terminate(_Reason, _State) ->
- ok.
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-%%----------------------------------------------------------------------------
-
-init_master(Q) -> rabbit_amqqueue_process:become(Q).
-init_slave(Q) -> rabbit_mirror_queue_slave:become(Q).
-
-init_restart(#amqqueue{name = QueueName}) ->
+init(#amqqueue{name = QueueName}, restart) ->
{ok, Q = #amqqueue{pid = QPid,
slave_pids = SPids}} = rabbit_amqqueue:lookup(QueueName),
Local = node(QPid) =:= node(),
Slaves = [SPid || SPid <- SPids, rabbit_misc:is_process_alive(SPid)],
case rabbit_misc:is_process_alive(QPid) of
true -> false = Local, %% assertion
- rabbit_mirror_queue_slave:become(Q); %% [1]
+ rabbit_mirror_queue_slave:go(self(), async),
+ rabbit_mirror_queue_slave:init(Q); %% [1]
false -> case Local andalso Slaves =:= [] of
- true -> crash_restart(Q); %% [2]
+ true -> crash_restart(Q); %% [2]
false -> timer:sleep(25),
- init_restart(Q) %% [3]
+ init(Q, restart) %% [3]
end
end.
%% [1] There is a master on another node. Regardless of whether we
@@ -114,4 +89,15 @@ init_restart(#amqqueue{name = QueueName}) ->
crash_restart(Q = #amqqueue{name = QueueName}) ->
rabbit_log:error("Restarting crashed ~s.~n", [rabbit_misc:rs(QueueName)]),
gen_server2:cast(self(), init),
- rabbit_amqqueue_process:become(Q#amqqueue{pid = self()}).
+ rabbit_amqqueue_process:init(Q#amqqueue{pid = self()}).
+
+%%----------------------------------------------------------------------------
+
+%% This gen_server2 always hands over to some other module at the end
+%% of init/1.
+handle_call(_Msg, _From, _State) -> exit(unreachable).
+handle_cast(_Msg, _State) -> exit(unreachable).
+handle_info(_Msg, _State) -> exit(unreachable).
+terminate(_Reason, _State) -> exit(unreachable).
+code_change(_OldVsn, _State, _Extra) -> exit(unreachable).
+