diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-09-10 11:59:06 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-09-10 11:59:06 +0100 |
commit | 5eb6dbe387b1aa93241333411aecb819bd9c864c (patch) | |
tree | 411e17a69caaf4074523fc9db6df956db1af45b2 | |
parent | 183007616bc81421ccd0bdf96f333591ca637b83 (diff) | |
download | rabbitmq-server-5eb6dbe387b1aa93241333411aecb819bd9c864c.tar.gz |
Further reduce distance to default (especially in slave), by getting the prequeue to pass straight into the right module at the end of init/1.
-rw-r--r-- | src/gen_server2.erl | 14 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 12 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 5 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 42 | ||||
-rw-r--r-- | src/rabbit_prequeue.erl | 64 |
5 files changed, 75 insertions, 62 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl index ee82bcb3..d2f96b52 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -69,7 +69,9 @@ %% which will be passed into any of the callback functions in the new %% module. Note there is no form also encompassing a reply, thus if %% you wish to reply in handle_call/3 and change the callback module, -%% you need to use gen_server2:reply/2 to issue the reply manually. +%% you need to use gen_server2:reply/2 to issue the reply +%% manually. The init function can similarly return a 5th argument, +%% Module, in order to dynamically decide the callback module on init. %% %% 8) The callback module can optionally implement %% format_message_queue/2 which is the equivalent of format_status/2 @@ -125,6 +127,7 @@ %%% ==> {ok, State} %%% {ok, State, Timeout} %%% {ok, State, Timeout, Backoff} +%%% {ok, State, Timeout, Backoff, Module} %%% ignore %%% {stop, Reason} %%% @@ -242,6 +245,8 @@ {ok, State :: term(), timeout() | hibernate} | {ok, State :: term(), timeout() | hibernate, {backoff, millis(), millis(), millis()}} | + {ok, State :: term(), timeout() | hibernate, + {backoff, millis(), millis(), millis()}, atom()} | ignore | {stop, Reason :: term()}. -callback handle_call(Request :: term(), From :: {pid(), Tag :: term()}, @@ -568,6 +573,13 @@ init_it(Starter, Parent, Name0, Mod, Args, Options) -> loop(GS2State #gs2_state { state = State, time = Timeout, timeout_state = Backoff1 }); + {ok, State, Timeout, Backoff = {backoff, _, _, _}, Mod1} -> + Backoff1 = extend_backoff(Backoff), + proc_lib:init_ack(Starter, {ok, self()}), + loop(GS2State #gs2_state { mod = Mod1, + state = State, + time = Timeout, + timeout_state = Backoff1 }); {stop, Reason} -> %% For consistency, we must make sure that the %% registered name (if any) is unregistered before diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b21a4c88..e068b93c 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -26,7 +26,7 @@ -export([info_keys/0]). --export([become/1, init_with_backing_queue_state/7]). +-export([init_with_backing_queue_state/7]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/4, @@ -105,12 +105,12 @@ statistics_keys() -> ?STATISTICS_KEYS ++ rabbit_backing_queue:info_keys(). %%---------------------------------------------------------------------------- -init(_) -> exit(cannot_be_called_directly). - -become(Q = #amqqueue{name = QName}) -> +init(Q) -> process_flag(trap_exit, true), - ?store_proc_name(QName), - {become, ?MODULE, init_state(Q), hibernate}. + ?store_proc_name(Q#amqqueue.name), + {ok, init_state(Q#amqqueue{pid = self()}), hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}, + ?MODULE}. finish_init(Recover, From, State = #q{q = Q, backing_queue = undefined, diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 5cb871e8..aec6f93d 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -210,10 +210,7 @@ add_mirror(QName, MirrorNode, SyncMode) -> MirrorNode, Q, slave), log_info(QName, "Adding mirror on node ~p: ~p~n", [MirrorNode, SPid]), - case SyncMode of - sync -> rabbit_mirror_queue_slave:await(SPid); - async -> ok - end + rabbit_mirror_queue_slave:go(SPid, SyncMode) end); {error, not_found} = E -> E diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index c2bf1b89..2da2e7a5 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -24,7 +24,7 @@ %% All instructions from the GM group must be processed in the order %% in which they're received. --export([set_maximum_since_use/2, info/1, become/1, await/1]). +-export([set_maximum_since_use/2, info/1, go/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, handle_pre_hibernate/1, prioritise_call/4, @@ -76,12 +76,16 @@ set_maximum_since_use(QPid, Age) -> info(QPid) -> gen_server2:call(QPid, info, infinity). -await(SPid) -> gen_server2:call(SPid, await, infinity). +init(Q) -> + ?store_proc_name(Q#amqqueue.name), + {ok, {not_started, Q}, hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, + ?DESIRED_HIBERNATE}, ?MODULE}. -init(_) -> exit(cannot_be_called_directly). +go(SPid, sync) -> gen_server2:call(SPid, go, infinity); +go(SPid, async) -> gen_server2:cast(SPid, go). -become(Q = #amqqueue{name = QName}) -> - ?store_proc_name(QName), +handle_go(Q = #amqqueue{name = QName}) -> %% We join the GM group before we add ourselves to the amqqueue %% record. As a result: %% 1. We can receive msgs from GM that correspond to messages we will @@ -136,25 +140,25 @@ become(Q = #amqqueue{name = QName}) -> ok = gm:broadcast(GM, request_depth), ok = gm:validate_members(GM, [GM | [G || {G, _} <- GMPids]]), rabbit_mirror_queue_misc:maybe_auto_sync(Q1), - {become, ?MODULE, State, hibernate}; + {ok, State}; {stale, StalePid} -> rabbit_mirror_queue_misc:log_warning( QName, "Detected stale HA master: ~p~n", [StalePid]), gm:leave(GM), - {stop, {stale_master_pid, StalePid}, Q}; + {error, {stale_master_pid, StalePid}}; duplicate_live_master -> gm:leave(GM), - {stop, {duplicate_live_master, Node}, Q}; + {error, {duplicate_live_master, Node}}; existing -> gm:leave(GM), - {stop, normal, Q}; + {error, normal}; master_in_recovery -> gm:leave(GM), %% The queue record vanished - we must have a master starting %% concurrently with us. In that case we can safely decide to do %% nothing here, and the master will start us in %% master:init_with_existing_bq/3 - {stop, normal, Q} + {error, normal} end. init_it(Self, GM, Node, QName) -> @@ -188,8 +192,11 @@ add_slave(Q = #amqqueue { slave_pids = SPids, gm_pids = GMPids }, New, GM) -> rabbit_mirror_queue_misc:store_updated_slaves( Q#amqqueue{slave_pids = SPids ++ [New], gm_pids = [{GM, New} | GMPids]}). -handle_call(await, _From, State) -> - {reply, ok, State}; +handle_call(go, _From, {not_started, Q} = NotStarted) -> + case handle_go(Q) of + {ok, State} -> {reply, ok, State}; + {error, Error} -> {stop, Error, NotStarted} + end; handle_call({gm_deaths, DeadGMPids}, From, State = #state { gm = GM, q = Q = #amqqueue { @@ -227,6 +234,12 @@ handle_call({gm_deaths, DeadGMPids}, From, handle_call(info, _From, State) -> reply(infos(?INFO_KEYS, State), State). +handle_cast(go, {not_started, Q} = NotStarted) -> + case handle_go(Q) of + {ok, State} -> {noreply, State}; + {error, Error} -> {stop, Error, NotStarted} + end; + handle_cast({run_backing_queue, Mod, Fun}, State) -> noreply(run_backing_queue(Mod, Fun, State)); @@ -307,6 +320,8 @@ handle_info({bump_credit, Msg}, State) -> handle_info(Msg, State) -> {stop, {unexpected_info, Msg}, State}. +terminate(_Reason, {not_started, _Q}) -> + ok; terminate(_Reason, #state { backing_queue_state = undefined }) -> %% We've received a delete_and_terminate from gm, thus nothing to %% do here. @@ -345,6 +360,9 @@ terminate_common(State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. +handle_pre_hibernate({not_started, _Q} = State) -> + {hibernate, State}; + handle_pre_hibernate(State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> {RamDuration, BQS1} = BQ:ram_duration(BQS), diff --git a/src/rabbit_prequeue.erl b/src/rabbit_prequeue.erl index f95b0ced..708cad53 100644 --- a/src/rabbit_prequeue.erl +++ b/src/rabbit_prequeue.erl @@ -51,54 +51,29 @@ start_link(Q, StartMode, Marker) -> %%---------------------------------------------------------------------------- -init({Q, StartMode0, Marker}) -> - %% Hand back to supervisor ASAP - gen_server2:cast(self(), init), - StartMode = case is_process_alive(Marker) of - true -> StartMode0; - false -> restart - end, - {ok, {Q#amqqueue{pid = self()}, StartMode}, hibernate, - {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, - ?DESIRED_HIBERNATE}}. - -handle_call(Msg, _From, State) -> - {stop, {unexpected_call, Msg}, {unexpected_call, Msg}, State}. - -handle_cast(init, {Q, declare}) -> init_master(Q); -handle_cast(init, {Q, recovery}) -> init_master(Q); -handle_cast(init, {Q, slave}) -> init_slave(Q); -handle_cast(init, {Q, restart}) -> init_restart(Q); - -handle_cast(Msg, State) -> - {stop, {unexpected_cast, Msg}, State}. +init({Q, StartMode, Marker}) -> + init(Q, case {is_process_alive(Marker), StartMode} of + {true, slave} -> slave; + {true, _} -> master; + {false, _} -> restart + end). -handle_info(Msg, State) -> - {stop, {unexpected_info, Msg}, State}. +init(Q, master) -> rabbit_amqqueue_process:init(Q); +init(Q, slave) -> rabbit_mirror_queue_slave:init(Q); -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%---------------------------------------------------------------------------- - -init_master(Q) -> rabbit_amqqueue_process:become(Q). -init_slave(Q) -> rabbit_mirror_queue_slave:become(Q). - -init_restart(#amqqueue{name = QueueName}) -> +init(#amqqueue{name = QueueName}, restart) -> {ok, Q = #amqqueue{pid = QPid, slave_pids = SPids}} = rabbit_amqqueue:lookup(QueueName), Local = node(QPid) =:= node(), Slaves = [SPid || SPid <- SPids, rabbit_misc:is_process_alive(SPid)], case rabbit_misc:is_process_alive(QPid) of true -> false = Local, %% assertion - rabbit_mirror_queue_slave:become(Q); %% [1] + rabbit_mirror_queue_slave:go(self(), async), + rabbit_mirror_queue_slave:init(Q); %% [1] false -> case Local andalso Slaves =:= [] of - true -> crash_restart(Q); %% [2] + true -> crash_restart(Q); %% [2] false -> timer:sleep(25), - init_restart(Q) %% [3] + init(Q, restart) %% [3] end end. %% [1] There is a master on another node. Regardless of whether we @@ -114,4 +89,15 @@ init_restart(#amqqueue{name = QueueName}) -> crash_restart(Q = #amqqueue{name = QueueName}) -> rabbit_log:error("Restarting crashed ~s.~n", [rabbit_misc:rs(QueueName)]), gen_server2:cast(self(), init), - rabbit_amqqueue_process:become(Q#amqqueue{pid = self()}). + rabbit_amqqueue_process:init(Q#amqqueue{pid = self()}). + +%%---------------------------------------------------------------------------- + +%% This gen_server2 always hands over to some other module at the end +%% of init/1. +handle_call(_Msg, _From, _State) -> exit(unreachable). +handle_cast(_Msg, _State) -> exit(unreachable). +handle_info(_Msg, _State) -> exit(unreachable). +terminate(_Reason, _State) -> exit(unreachable). +code_change(_OldVsn, _State, _Extra) -> exit(unreachable). + |