diff options
Diffstat (limited to 'src/rabbit_mirror_queue_slave.erl')
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 42 |
1 files changed, 30 insertions, 12 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index c2bf1b89..2da2e7a5 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -24,7 +24,7 @@ %% All instructions from the GM group must be processed in the order %% in which they're received. --export([set_maximum_since_use/2, info/1, become/1, await/1]). +-export([set_maximum_since_use/2, info/1, go/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, handle_pre_hibernate/1, prioritise_call/4, @@ -76,12 +76,16 @@ set_maximum_since_use(QPid, Age) -> info(QPid) -> gen_server2:call(QPid, info, infinity). -await(SPid) -> gen_server2:call(SPid, await, infinity). +init(Q) -> + ?store_proc_name(Q#amqqueue.name), + {ok, {not_started, Q}, hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, + ?DESIRED_HIBERNATE}, ?MODULE}. -init(_) -> exit(cannot_be_called_directly). +go(SPid, sync) -> gen_server2:call(SPid, go, infinity); +go(SPid, async) -> gen_server2:cast(SPid, go). -become(Q = #amqqueue{name = QName}) -> - ?store_proc_name(QName), +handle_go(Q = #amqqueue{name = QName}) -> %% We join the GM group before we add ourselves to the amqqueue %% record. As a result: %% 1. We can receive msgs from GM that correspond to messages we will @@ -136,25 +140,25 @@ become(Q = #amqqueue{name = QName}) -> ok = gm:broadcast(GM, request_depth), ok = gm:validate_members(GM, [GM | [G || {G, _} <- GMPids]]), rabbit_mirror_queue_misc:maybe_auto_sync(Q1), - {become, ?MODULE, State, hibernate}; + {ok, State}; {stale, StalePid} -> rabbit_mirror_queue_misc:log_warning( QName, "Detected stale HA master: ~p~n", [StalePid]), gm:leave(GM), - {stop, {stale_master_pid, StalePid}, Q}; + {error, {stale_master_pid, StalePid}}; duplicate_live_master -> gm:leave(GM), - {stop, {duplicate_live_master, Node}, Q}; + {error, {duplicate_live_master, Node}}; existing -> gm:leave(GM), - {stop, normal, Q}; + {error, normal}; master_in_recovery -> gm:leave(GM), %% The queue record vanished - we must have a master starting %% concurrently with us. In that case we can safely decide to do %% nothing here, and the master will start us in %% master:init_with_existing_bq/3 - {stop, normal, Q} + {error, normal} end. init_it(Self, GM, Node, QName) -> @@ -188,8 +192,11 @@ add_slave(Q = #amqqueue { slave_pids = SPids, gm_pids = GMPids }, New, GM) -> rabbit_mirror_queue_misc:store_updated_slaves( Q#amqqueue{slave_pids = SPids ++ [New], gm_pids = [{GM, New} | GMPids]}). -handle_call(await, _From, State) -> - {reply, ok, State}; +handle_call(go, _From, {not_started, Q} = NotStarted) -> + case handle_go(Q) of + {ok, State} -> {reply, ok, State}; + {error, Error} -> {stop, Error, NotStarted} + end; handle_call({gm_deaths, DeadGMPids}, From, State = #state { gm = GM, q = Q = #amqqueue { @@ -227,6 +234,12 @@ handle_call({gm_deaths, DeadGMPids}, From, handle_call(info, _From, State) -> reply(infos(?INFO_KEYS, State), State). +handle_cast(go, {not_started, Q} = NotStarted) -> + case handle_go(Q) of + {ok, State} -> {noreply, State}; + {error, Error} -> {stop, Error, NotStarted} + end; + handle_cast({run_backing_queue, Mod, Fun}, State) -> noreply(run_backing_queue(Mod, Fun, State)); @@ -307,6 +320,8 @@ handle_info({bump_credit, Msg}, State) -> handle_info(Msg, State) -> {stop, {unexpected_info, Msg}, State}. +terminate(_Reason, {not_started, _Q}) -> + ok; terminate(_Reason, #state { backing_queue_state = undefined }) -> %% We've received a delete_and_terminate from gm, thus nothing to %% do here. @@ -345,6 +360,9 @@ terminate_common(State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. +handle_pre_hibernate({not_started, _Q} = State) -> + {hibernate, State}; + handle_pre_hibernate(State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> {RamDuration, BQS1} = BQ:ram_duration(BQS), |