diff options
Diffstat (limited to 'src/rabbit_mirror_queue_slave.erl')
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 148 |
1 files changed, 67 insertions, 81 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index e4d78c45..964c3e24 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -19,17 +19,8 @@ %% For general documentation of HA design, see %% rabbit_mirror_queue_coordinator %% -%% We join the GM group before we add ourselves to the amqqueue -%% record. As a result: -%% 1. We can receive msgs from GM that correspond to messages we will -%% never receive from publishers. -%% 2. When we receive a message from publishers, we must receive a -%% message from the GM group for it. -%% 3. However, that instruction from the GM group can arrive either -%% before or after the actual message. We need to be able to -%% distinguish between GM instructions arriving early, and case (1) -%% above. -%% +%% We receive messages from GM and from publishers, and the gm +%% messages can arrive either before or after the 'actual' message. %% All instructions from the GM group must be processed in the order %% in which they're received. @@ -89,28 +80,33 @@ synchronised }). -start_link(Q) -> - gen_server2:start_link(?MODULE, Q, []). +start_link(Q) -> gen_server2:start_link(?MODULE, Q, []). set_maximum_since_use(QPid, Age) -> gen_server2:cast(QPid, {set_maximum_since_use, Age}). -info(QPid) -> - gen_server2:call(QPid, info, infinity). +info(QPid) -> gen_server2:call(QPid, info, infinity). init(#amqqueue { name = QueueName } = Q) -> + %% We join the GM group before we add ourselves to the amqqueue + %% record. As a result: + %% 1. We can receive msgs from GM that correspond to messages we will + %% never receive from publishers. + %% 2. When we receive a message from publishers, we must receive a + %% message from the GM group for it. + %% 3. However, that instruction from the GM group can arrive either + %% before or after the actual message. We need to be able to + %% distinguish between GM instructions arriving early, and case (1) + %% above. + %% + process_flag(trap_exit, true), %% amqqueue_process traps exits too. + {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]), + receive {joined, GM} -> ok end, Self = self(), Node = node(), - case rabbit_misc:execute_mnesia_transaction(fun() -> - init_it(Self, Node, - QueueName) - end) of + case rabbit_misc:execute_mnesia_transaction( + fun() -> init_it(Self, Node, QueueName) end) of {new, MPid} -> - process_flag(trap_exit, true), %% amqqueue_process traps exits too. - {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]), - receive {joined, GM} -> - ok - end, erlang:monitor(process, MPid), ok = file_handle_cache:register_callback( rabbit_amqqueue, set_maximum_since_use, [Self]), @@ -153,24 +149,21 @@ init_it(Self, Node, QueueName) -> [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] = mnesia:read({rabbit_queue, QueueName}), case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of - [] -> - MPids1 = MPids ++ [Self], - rabbit_mirror_queue_misc:store_updated_slaves( - Q1#amqqueue{slave_pids = MPids1}), - {new, QPid}; - [QPid] -> - case rabbit_misc:is_process_alive(QPid) of - true -> duplicate_live_master; - false -> {stale, QPid} - end; - [SPid] -> - case rabbit_misc:is_process_alive(SPid) of - true -> existing; - false -> MPids1 = (MPids -- [SPid]) ++ [Self], - rabbit_mirror_queue_misc:store_updated_slaves( - Q1#amqqueue{slave_pids = MPids1}), - {new, QPid} - end + [] -> MPids1 = MPids ++ [Self], + rabbit_mirror_queue_misc:store_updated_slaves( + Q1#amqqueue{slave_pids = MPids1}), + {new, QPid}; + [QPid] -> case rabbit_misc:is_process_alive(QPid) of + true -> duplicate_live_master; + false -> {stale, QPid} + end; + [SPid] -> case rabbit_misc:is_process_alive(SPid) of + true -> existing; + false -> MPids1 = (MPids -- [SPid]) ++ [Self], + rabbit_mirror_queue_misc:store_updated_slaves( + Q1#amqqueue{slave_pids = MPids1}), + {new, QPid} + end end. handle_call({deliver, Delivery = #delivery { immediate = true }}, @@ -356,14 +349,10 @@ prioritise_info(Msg, _State) -> %% GM %% --------------------------------------------------------------------------- -joined([SPid], _Members) -> - SPid ! {joined, self()}, - ok. +joined([SPid], _Members) -> SPid ! {joined, self()}, ok. -members_changed([_SPid], _Births, []) -> - ok; -members_changed([SPid], _Births, Deaths) -> - inform_deaths(SPid, Deaths). +members_changed([_SPid], _Births, []) -> ok; +members_changed([ SPid], _Births, Deaths) -> inform_deaths(SPid, Deaths). handle_msg([_SPid], _From, master_changed) -> ok; @@ -584,9 +573,9 @@ next_state(State = #state{backing_queue = BQ, backing_queue_state = BQS}) -> confirm_messages(MsgIds, State #state { backing_queue_state = BQS1 })), case BQ:needs_timeout(BQS1) of - false -> {stop_sync_timer(State1), hibernate}; - idle -> {stop_sync_timer(State1), 0 }; - timed -> {ensure_sync_timer(State1), 0 } + false -> {stop_sync_timer(State1), hibernate }; + idle -> {stop_sync_timer(State1), ?SYNC_INTERVAL}; + timed -> {ensure_sync_timer(State1), 0 } end. backing_queue_timeout(State = #state { backing_queue = BQ }) -> @@ -680,26 +669,24 @@ maybe_enqueue_message( %% msg_seq_no was at the time. We do now! ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]), SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), - State1 #state { sender_queues = SQ1, - msg_id_status = dict:erase(MsgId, MS) }; + State1 #state { msg_id_status = dict:erase(MsgId, MS), + sender_queues = SQ1 }; {ok, {published, ChPid}} -> %% It was published to the BQ and we didn't know the %% msg_seq_no so couldn't confirm it at the time. - case needs_confirming(Delivery, State1) of - never -> - SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), - State1 #state { msg_id_status = dict:erase(MsgId, MS), - sender_queues = SQ1 }; - eventually -> - State1 #state { - msg_id_status = - dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS) }; - immediately -> - ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]), - SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), - State1 #state { msg_id_status = dict:erase(MsgId, MS), - sender_queues = SQ1 } - end; + {MS1, SQ1} = + case needs_confirming(Delivery, State1) of + never -> {dict:erase(MsgId, MS), + remove_from_pending_ch(MsgId, ChPid, SQ)}; + eventually -> MMS = {published, ChPid, MsgSeqNo}, + {dict:store(MsgId, MMS, MS), SQ}; + immediately -> ok = rabbit_misc:confirm_to_sender( + ChPid, [MsgSeqNo]), + {dict:erase(MsgId, MS), + remove_from_pending_ch(MsgId, ChPid, SQ)} + end, + State1 #state { msg_id_status = MS1, + sender_queues = SQ1 }; {ok, discarded} -> %% We've already heard from GM that the msg is to be %% discarded. We won't see this again. @@ -748,18 +735,17 @@ process_instruction( msg_seq_no = MsgSeqNo, message = #basic_message { id = MsgId } }, _EnqueueOnPromotion}}, MQ2} -> - %% We received the msg from the channel first. Thus we - %% need to deal with confirms here. - case needs_confirming(Delivery, State1) of - never -> - {MQ2, PendingCh, MS}; - eventually -> - {MQ2, PendingCh, - dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS)}; - immediately -> - ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]), - {MQ2, PendingCh, MS} - end; + {MQ2, PendingCh, + %% We received the msg from the channel first. Thus + %% we need to deal with confirms here. + case needs_confirming(Delivery, State1) of + never -> MS; + eventually -> MMS = {published, ChPid, MsgSeqNo}, + dict:store(MsgId, MMS , MS); + immediately -> ok = rabbit_misc:confirm_to_sender( + ChPid, [MsgSeqNo]), + MS + end}; {{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ2} -> %% The instruction was sent to us before we were %% within the slave_pids within the #amqqueue{} |