Diffstat (limited to 'src/rabbit_mirror_queue_coordinator.erl')
-rw-r--r-- | src/rabbit_mirror_queue_coordinator.erl | 69
1 file changed, 48 insertions, 21 deletions
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index f6664a27..0b9f053f 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -16,7 +16,7 @@
 
 -module(rabbit_mirror_queue_coordinator).
 
--export([start_link/3, get_gm/1, ensure_monitoring/2]).
+-export([start_link/4, get_gm/1, ensure_monitoring/2]).
 
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
          code_change/3]).
@@ -32,15 +32,17 @@
 -record(state, { q,
                  gm,
                  monitors,
-                 death_fun
+                 death_fun,
+                 length_fun
                }).
 
 -define(ONE_SECOND, 1000).
 
 -ifdef(use_specs).
 
--spec(start_link/3 :: (rabbit_types:amqqueue(), pid() | 'undefined',
-                       rabbit_mirror_queue_master:death_fun()) ->
+-spec(start_link/4 :: (rabbit_types:amqqueue(), pid() | 'undefined',
+                       rabbit_mirror_queue_master:death_fun(),
+                       rabbit_mirror_queue_master:length_fun()) ->
                            rabbit_types:ok_pid_or_error()).
 -spec(get_gm/1 :: (pid()) -> pid()).
 -spec(ensure_monitoring/2 :: (pid(), [pid()]) -> 'ok').
@@ -138,9 +140,28 @@
 %% state of the master. The detection of the sync-status of a slave is
 %% done entirely based on length: if the slave and the master both
 %% agree on the length of the queue after the fetch of the head of the
-%% queue, then the queues must be in sync. The only other possibility
-%% is that the slave's queue is shorter, and thus the fetch should be
-%% ignored.
+%% queue (or a 'set_length' results in a slave having to drop some
+%% messages from the head of its queue), then the queues must be in
+%% sync. The only other possibility is that the slave's queue is
+%% shorter, and thus the fetch should be ignored. In case slaves are
+%% joined to an empty queue which only goes on to receive publishes,
+%% they start by asking the master to broadcast its length. This is
+%% enough for slaves to always be able to work out when their head
+%% does not differ from the master (and is much simpler and cheaper
+%% than getting the master to hang on to the guid of the msg at the
+%% head of its queue). When a slave is promoted to a master, it
+%% unilaterally broadcasts its length, in order to solve the problem
+%% of length requests from new slaves being unanswered by a dead
+%% master.
+%%
+%% Obviously, due to the async nature of communication across gm, the
+%% slaves can fall behind. This does not matter from a sync pov: if
+%% they fall behind and the master dies then a) no publishes are lost
+%% because all publishes go to all mirrors anyway; b) the worst that
+%% happens is that acks get lost and so messages come back to
+%% life. This is no worse than normal given you never get confirmation
+%% that an ack has been received (not quite true with QoS-prefetch,
+%% but close enough for jazz).
 %%
 %% Because acktags are issued by the bq independently, and because
 %% there is no requirement for the master and all slaves to use the
@@ -279,8 +300,8 @@
 %%
 %%----------------------------------------------------------------------------
 
-start_link(Queue, GM, DeathFun) ->
-    gen_server2:start_link(?MODULE, [Queue, GM, DeathFun], []).
+start_link(Queue, GM, DeathFun, LengthFun) ->
+    gen_server2:start_link(?MODULE, [Queue, GM, DeathFun, LengthFun], []).
 
 get_gm(CPid) ->
     gen_server2:call(CPid, get_gm, infinity).
@@ -292,7 +313,7 @@ ensure_monitoring(CPid, Pids) ->
 %% gen_server
 %% ---------------------------------------------------------------------------
 
-init([#amqqueue { name = QueueName } = Q, GM, DeathFun]) ->
+init([#amqqueue { name = QueueName } = Q, GM, DeathFun, LengthFun]) ->
     GM1 = case GM of
               undefined ->
                   {ok, GM2} = gm:start_link(QueueName, ?MODULE, [self()]),
@@ -306,10 +327,11 @@ init([#amqqueue { name = QueueName } = Q, GM, DeathFun]) ->
           end,
     {ok, _TRef} = timer:apply_interval(?ONE_SECOND, gm, broadcast,
                                        [GM1, heartbeat]),
-    {ok, #state { q         = Q,
-                  gm        = GM1,
-                  monitors  = dict:new(),
-                  death_fun = DeathFun },
+    {ok, #state { q          = Q,
+                  gm         = GM1,
+                  monitors   = dict:new(),
+                  death_fun  = DeathFun,
+                  length_fun = LengthFun },
      hibernate,
      {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
 
@@ -329,6 +351,10 @@ handle_cast({gm_deaths, Deaths},
             {stop, normal, State}
     end;
 
+handle_cast(request_length, State = #state { length_fun = LengthFun }) ->
+    ok = LengthFun(),
+    noreply(State);
+
 handle_cast({ensure_monitoring, Pids},
             State = #state { monitors = Monitors }) ->
     Monitors1 =
@@ -343,13 +369,12 @@ handle_cast({ensure_monitoring, Pids},
 
 handle_info({'DOWN', _MonitorRef, process, Pid, _Reason},
             State = #state { monitors  = Monitors,
-                             death_fun = Fun }) ->
-    noreply(
-      case dict:is_key(Pid, Monitors) of
-          false -> State;
-          true  -> ok = Fun(Pid),
-                   State #state { monitors = dict:erase(Pid, Monitors) }
-      end);
+                             death_fun = DeathFun }) ->
+    noreply(case dict:is_key(Pid, Monitors) of
+                false -> State;
+                true  -> ok = DeathFun(Pid),
+                         State #state { monitors = dict:erase(Pid, Monitors) }
+            end);
 
 handle_info(Msg, State) ->
     {stop, {unexpected_info, Msg}, State}.
@@ -379,6 +404,8 @@ members_changed([CPid], _Births, Deaths) ->
 
 handle_msg([_CPid], _From, heartbeat) ->
     ok;
+handle_msg([CPid], _From, request_length = Msg) ->
+    ok = gen_server2:cast(CPid, Msg);
 handle_msg([CPid], _From, {ensure_monitoring, _Pids} = Msg) ->
     ok = gen_server2:cast(CPid, Msg);
 handle_msg([_CPid], _From, _Msg) ->
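The comment block added above describes the length-based sync detection in prose. As a reading aid only, and not part of this patch, the following is a minimal, self-contained Erlang sketch of that decision rule, modelling the slave's queue as a plain list of messages, oldest first. The module and function names (mirror_sync_sketch, handle_master_event/2) and the event tuples are invented for illustration; the real logic operates on the backing queue in the slave and master modules rather than on a list.

-module(mirror_sync_sketch).
-export([handle_master_event/2]).

%% {length, MasterLen}: the master broadcast its length (for instance in
%% answer to request_length from a newly joined slave, or unilaterally
%% on promotion). Equal lengths mean the slave's head matches the
%% master's.
handle_master_event({length, MasterLen}, SlaveQ)
  when length(SlaveQ) =:= MasterLen ->
    {synced, SlaveQ};
handle_master_event({length, _MasterLen}, SlaveQ) ->
    {not_synced, SlaveQ};

%% {fetch, Remaining}: the master fetched its head and now holds
%% Remaining messages. If the slave agrees on the length after dropping
%% its own head, the queues are in sync; if the slave is shorter, the
%% fetch is ignored.
handle_master_event({fetch, Remaining}, SlaveQ)
  when length(SlaveQ) =:= Remaining + 1 ->
    [_Head | Rest] = SlaveQ,
    {synced, Rest};
handle_master_event({fetch, _Remaining}, SlaveQ) ->
    {not_synced, SlaveQ};

%% {set_length, Len}: dropping messages from the head until the lengths
%% agree also brings the slave into sync.
handle_master_event({set_length, Len}, SlaveQ)
  when length(SlaveQ) >= Len ->
    {synced, lists:nthtail(length(SlaveQ) - Len, SlaveQ)};
handle_master_event({set_length, _Len}, SlaveQ) ->
    {not_synced, SlaveQ}.

For example, handle_master_event({fetch, 2}, [m1, m2, m3]) returns {synced, [m2, m3]}, while a slave that only holds [m3] ignores the same fetch and remains not_synced until the lengths next agree.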