summaryrefslogtreecommitdiff
path: root/src/rabbit_mirror_queue_slave.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_mirror_queue_slave.erl')
-rw-r--r--src/rabbit_mirror_queue_slave.erl42
1 files changed, 30 insertions, 12 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index c2bf1b89..2da2e7a5 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -24,7 +24,7 @@
%% All instructions from the GM group must be processed in the order
%% in which they're received.
--export([set_maximum_since_use/2, info/1, become/1, await/1]).
+-export([set_maximum_since_use/2, info/1, go/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3, handle_pre_hibernate/1, prioritise_call/4,
@@ -76,12 +76,16 @@ set_maximum_since_use(QPid, Age) ->
info(QPid) -> gen_server2:call(QPid, info, infinity).
-await(SPid) -> gen_server2:call(SPid, await, infinity).
+init(Q) ->
+ ?store_proc_name(Q#amqqueue.name),
+ {ok, {not_started, Q}, hibernate,
+ {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN,
+ ?DESIRED_HIBERNATE}, ?MODULE}.
-init(_) -> exit(cannot_be_called_directly).
+go(SPid, sync) -> gen_server2:call(SPid, go, infinity);
+go(SPid, async) -> gen_server2:cast(SPid, go).
-become(Q = #amqqueue{name = QName}) ->
- ?store_proc_name(QName),
+handle_go(Q = #amqqueue{name = QName}) ->
%% We join the GM group before we add ourselves to the amqqueue
%% record. As a result:
%% 1. We can receive msgs from GM that correspond to messages we will
@@ -136,25 +140,25 @@ become(Q = #amqqueue{name = QName}) ->
ok = gm:broadcast(GM, request_depth),
ok = gm:validate_members(GM, [GM | [G || {G, _} <- GMPids]]),
rabbit_mirror_queue_misc:maybe_auto_sync(Q1),
- {become, ?MODULE, State, hibernate};
+ {ok, State};
{stale, StalePid} ->
rabbit_mirror_queue_misc:log_warning(
QName, "Detected stale HA master: ~p~n", [StalePid]),
gm:leave(GM),
- {stop, {stale_master_pid, StalePid}, Q};
+ {error, {stale_master_pid, StalePid}};
duplicate_live_master ->
gm:leave(GM),
- {stop, {duplicate_live_master, Node}, Q};
+ {error, {duplicate_live_master, Node}};
existing ->
gm:leave(GM),
- {stop, normal, Q};
+ {error, normal};
master_in_recovery ->
gm:leave(GM),
%% The queue record vanished - we must have a master starting
%% concurrently with us. In that case we can safely decide to do
%% nothing here, and the master will start us in
%% master:init_with_existing_bq/3
- {stop, normal, Q}
+ {error, normal}
end.
init_it(Self, GM, Node, QName) ->
@@ -188,8 +192,11 @@ add_slave(Q = #amqqueue { slave_pids = SPids, gm_pids = GMPids }, New, GM) ->
rabbit_mirror_queue_misc:store_updated_slaves(
Q#amqqueue{slave_pids = SPids ++ [New], gm_pids = [{GM, New} | GMPids]}).
-handle_call(await, _From, State) ->
- {reply, ok, State};
+handle_call(go, _From, {not_started, Q} = NotStarted) ->
+ case handle_go(Q) of
+ {ok, State} -> {reply, ok, State};
+ {error, Error} -> {stop, Error, NotStarted}
+ end;
handle_call({gm_deaths, DeadGMPids}, From,
State = #state { gm = GM, q = Q = #amqqueue {
@@ -227,6 +234,12 @@ handle_call({gm_deaths, DeadGMPids}, From,
handle_call(info, _From, State) ->
reply(infos(?INFO_KEYS, State), State).
+handle_cast(go, {not_started, Q} = NotStarted) ->
+ case handle_go(Q) of
+ {ok, State} -> {noreply, State};
+ {error, Error} -> {stop, Error, NotStarted}
+ end;
+
handle_cast({run_backing_queue, Mod, Fun}, State) ->
noreply(run_backing_queue(Mod, Fun, State));
@@ -307,6 +320,8 @@ handle_info({bump_credit, Msg}, State) ->
handle_info(Msg, State) ->
{stop, {unexpected_info, Msg}, State}.
+terminate(_Reason, {not_started, _Q}) ->
+ ok;
terminate(_Reason, #state { backing_queue_state = undefined }) ->
%% We've received a delete_and_terminate from gm, thus nothing to
%% do here.
@@ -345,6 +360,9 @@ terminate_common(State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
+handle_pre_hibernate({not_started, _Q} = State) ->
+ {hibernate, State};
+
handle_pre_hibernate(State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
{RamDuration, BQS1} = BQ:ram_duration(BQS),