diff options
| author | Tim Watson <tim@rabbitmq.com> | 2012-09-24 12:17:23 +0100 |
|---|---|---|
| committer | Tim Watson <tim@rabbitmq.com> | 2012-09-24 12:17:23 +0100 |
| commit | 7739d5a402b7bbc564e36030147bc6f936fcf92f (patch) | |
| tree | 7da5192ebb2d876a93b858aae75667a082ad71a4 /src/rabbit_mirror_queue_slave.erl | |
| parent | 48a77b93a2ef54deb201fcc3dc085239601cd03e (diff) | |
| parent | aca8685a571472041fdd34e6ff5f7f22e86da932 (diff) | |
| download | rabbitmq-server-bug25148.tar.gz | |
merge default into bug25148bug25148
Diffstat (limited to 'src/rabbit_mirror_queue_slave.erl')
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 205 |
1 files changed, 96 insertions, 109 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 964c3e24..1f6567e0 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -70,14 +70,15 @@ sync_timer_ref, rate_timer_ref, - sender_queues, %% :: Pid -> {Q {Msg, Bool}, Set MsgId} + sender_queues, %% :: Pid -> {Q Msg, Set MsgId} msg_id_ack, %% :: MsgId -> AckTag ack_num, msg_id_status, known_senders, - synchronised + %% Master depth - local depth + depth_delta }). start_link(Q) -> gen_server2:start_link(?MODULE, Q, []). @@ -129,7 +130,7 @@ init(#amqqueue { name = QueueName } = Q) -> msg_id_status = dict:new(), known_senders = pmon:new(), - synchronised = false + depth_delta = undefined }, rabbit_event:notify(queue_slave_created, infos(?CREATION_EVENT_KEYS, State)), @@ -166,27 +167,10 @@ init_it(Self, Node, QueueName) -> end end. -handle_call({deliver, Delivery = #delivery { immediate = true }}, - From, State) -> - %% It is safe to reply 'false' here even if a) we've not seen the - %% msg via gm, or b) the master dies before we receive the msg via - %% gm. In the case of (a), we will eventually receive the msg via - %% gm, and it's only the master's result to the channel that is - %% important. In the case of (b), if the master does die and we do - %% get promoted then at that point we have no consumers, thus - %% 'false' is precisely the correct answer. However, we must be - %% careful to _not_ enqueue the message in this case. - - %% Note this is distinct from the case where we receive the msg - %% via gm first, then we're promoted to master, and only then do - %% we receive the msg from the channel. - gen_server2:reply(From, false), %% master may deliver it, not us - noreply(maybe_enqueue_message(Delivery, false, State)); - -handle_call({deliver, Delivery = #delivery { mandatory = true }}, - From, State) -> - gen_server2:reply(From, true), %% amqqueue throws away the result anyway - noreply(maybe_enqueue_message(Delivery, true, State)); +handle_call({deliver, Delivery}, From, State) -> + %% Synchronous, "mandatory" deliver mode. + gen_server2:reply(From, ok), + noreply(maybe_enqueue_message(Delivery, State)); handle_call({gm_deaths, Deaths}, From, State = #state { q = #amqqueue { name = QueueName }, @@ -231,12 +215,12 @@ handle_cast({gm, Instruction}, State) -> handle_process_result(process_instruction(Instruction, State)); handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, State) -> - %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. + %% Asynchronous, non-"mandatory", deliver mode. case Flow of flow -> credit_flow:ack(Sender); noflow -> ok end, - noreply(maybe_enqueue_message(Delivery, true, State)); + noreply(maybe_enqueue_message(Delivery, State)); handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), @@ -385,7 +369,7 @@ infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. i(pid, _State) -> self(); i(name, #state { q = #amqqueue { name = Name } }) -> Name; i(master_pid, #state { master_pid = MPid }) -> MPid; -i(is_synchronised, #state { synchronised = Synchronised }) -> Synchronised; +i(is_synchronised, #state { depth_delta = DD }) -> DD =:= 0; i(Item, _State) -> throw({bad_argument, Item}). bq_init(BQ, Q, Recover) -> @@ -553,7 +537,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, NumAckTags = [NumAckTag || {_MsgId, NumAckTag} <- dict:to_list(MA)], AckTags = [AckTag || {_Num, AckTag} <- lists:sort(NumAckTags)], Deliveries = [Delivery || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ), - {Delivery, true} <- queue:to_list(PubQ)], + Delivery <- queue:to_list(PubQ)], QueueState = rabbit_amqqueue_process:init_with_backing_queue_state( Q1, rabbit_mirror_queue_master, MasterState, RateTRef, AckTags, Deliveries, KS, MTC), @@ -654,14 +638,13 @@ maybe_enqueue_message( Delivery = #delivery { message = #basic_message { id = MsgId }, msg_seq_no = MsgSeqNo, sender = ChPid }, - EnqueueOnPromotion, State = #state { sender_queues = SQ, msg_id_status = MS }) -> State1 = ensure_monitoring(ChPid, State), %% We will never see {published, ChPid, MsgSeqNo} here. case dict:find(MsgId, MS) of error -> {MQ, PendingCh} = get_sender_queue(ChPid, SQ), - MQ1 = queue:in({Delivery, EnqueueOnPromotion}, MQ), + MQ1 = queue:in(Delivery, MQ), SQ1 = dict:store(ChPid, {MQ1, PendingCh}, SQ), State1 #state { sender_queues = SQ1 }; {ok, {confirmed, ChPid}} -> @@ -731,10 +714,9 @@ process_instruction( {empty, _MQ2} -> {MQ, sets:add_element(MsgId, PendingCh), dict:store(MsgId, {published, ChPid}, MS)}; - {{value, {Delivery = #delivery { - msg_seq_no = MsgSeqNo, - message = #basic_message { id = MsgId } }, - _EnqueueOnPromotion}}, MQ2} -> + {{value, Delivery = #delivery { + msg_seq_no = MsgSeqNo, + message = #basic_message { id = MsgId } }}, MQ2} -> {MQ2, PendingCh, %% We received the msg from the channel first. Thus %% we need to deal with confirms here. @@ -746,7 +728,7 @@ process_instruction( ChPid, [MsgSeqNo]), MS end}; - {{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ2} -> + {{value, #delivery {}}, _MQ2} -> %% The instruction was sent to us before we were %% within the slave_pids within the #amqqueue{} %% record. We'll never receive the message directly @@ -783,12 +765,12 @@ process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }}, {empty, _MQ} -> {MQ, sets:add_element(MsgId, PendingCh), dict:store(MsgId, discarded, MS)}; - {{value, {#delivery { message = #basic_message { id = MsgId } }, - _EnqueueOnPromotion}}, MQ2} -> + {{value, #delivery { message = #basic_message { id = MsgId } }}, + MQ2} -> %% We've already seen it from the channel, we're not %% going to see this again, so don't add it to MS {MQ2, PendingCh, MS}; - {{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ2} -> + {{value, #delivery {}}, _MQ2} -> %% The instruction was sent to us before we were %% within the slave_pids within the #amqqueue{} %% record. We'll never receive the message directly @@ -800,43 +782,45 @@ process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }}, {ok, State1 #state { sender_queues = SQ1, msg_id_status = MS1, backing_queue_state = BQS1 }}; -process_instruction({set_length, Length, AckRequired}, +process_instruction({drop, Length, Dropped, AckRequired}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> QLen = BQ:len(BQS), - ToDrop = QLen - Length, - {ok, - case ToDrop >= 0 of - true -> - State1 = - lists:foldl( - fun (const, StateN = #state {backing_queue_state = BQSN}) -> - {{#basic_message{id = MsgId}, _IsDelivered, AckTag, - _Remaining}, BQSN1} = BQ:fetch(AckRequired, BQSN), - maybe_store_ack( - AckRequired, MsgId, AckTag, - StateN #state { backing_queue_state = BQSN1 }) - end, State, lists:duplicate(ToDrop, const)), - set_synchronised(true, State1); - false -> - State - end}; + ToDrop = case QLen - Length of + N when N > 0 -> N; + _ -> 0 + end, + State1 = lists:foldl( + fun (const, StateN = #state{backing_queue_state = BQSN}) -> + {{#basic_message{id = MsgId}, _, AckTag, _}, BQSN1} = + BQ:fetch(AckRequired, BQSN), + maybe_store_ack( + AckRequired, MsgId, AckTag, + StateN #state { backing_queue_state = BQSN1 }) + end, State, lists:duplicate(ToDrop, const)), + {ok, case AckRequired of + true -> State1; + false -> set_synchronised(ToDrop - Dropped, State1) + end}; process_instruction({fetch, AckRequired, MsgId, Remaining}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> QLen = BQ:len(BQS), - {ok, case QLen - 1 of - Remaining -> - {{#basic_message{id = MsgId}, _IsDelivered, - AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS), - maybe_store_ack(AckRequired, MsgId, AckTag, - State #state { backing_queue_state = BQS1 }); - Other when Other + 1 =:= Remaining -> - set_synchronised(true, State); - Other when Other < Remaining -> - %% we must be shorter than the master - State - end}; + {State1, Delta} = + case QLen - 1 of + Remaining -> + {{#basic_message{id = MsgId}, _IsDelivered, + AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS), + {maybe_store_ack(AckRequired, MsgId, AckTag, + State #state { backing_queue_state = BQS1 }), + 0}; + _ when QLen =< Remaining -> + {State, case AckRequired of + true -> 0; + false -> -1 + end} + end, + {ok, set_synchronised(Delta, State1)}; process_instruction({ack, MsgIds}, State = #state { backing_queue = BQ, backing_queue_state = BQS, @@ -844,27 +828,17 @@ process_instruction({ack, MsgIds}, {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA), {MsgIds1, BQS1} = BQ:ack(AckTags, BQS), [] = MsgIds1 -- MsgIds, %% ASSERTION - {ok, State #state { msg_id_ack = MA1, - backing_queue_state = BQS1 }}; + {ok, set_synchronised(length(MsgIds1) - length(MsgIds), + State #state { msg_id_ack = MA1, + backing_queue_state = BQS1 })}; process_instruction({requeue, MsgIds}, State = #state { backing_queue = BQ, backing_queue_state = BQS, msg_id_ack = MA }) -> {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA), - {ok, case length(AckTags) =:= length(MsgIds) of - true -> - {MsgIds, BQS1} = BQ:requeue(AckTags, BQS), - State #state { msg_id_ack = MA1, - backing_queue_state = BQS1 }; - false -> - %% The only thing we can safely do is nuke out our BQ - %% and MA. The interaction between this and confirms - %% doesn't really bear thinking about... - {_Count, BQS1} = BQ:purge(BQS), - {_MsgIds, BQS2} = ack_all(BQ, MA, BQS1), - State #state { msg_id_ack = dict:new(), - backing_queue_state = BQS2 } - end}; + {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS), + {ok, State #state { msg_id_ack = MA1, + backing_queue_state = BQS1 }}; process_instruction({sender_death, ChPid}, State = #state { sender_queues = SQ, msg_id_status = MS, @@ -882,10 +856,11 @@ process_instruction({sender_death, ChPid}, msg_id_status = MS1, known_senders = pmon:demonitor(ChPid, KS) } end}; -process_instruction({length, Length}, - State = #state { backing_queue = BQ, +process_instruction({depth, Depth}, + State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> - {ok, set_synchronised(Length =:= BQ:len(BQS), State)}; + {ok, set_synchronised( + 0, true, State #state { depth_delta = Depth - BQ:depth(BQS) })}; process_instruction({delete_and_terminate, Reason}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> @@ -904,9 +879,6 @@ msg_ids_to_acktags(MsgIds, MA) -> end, {[], MA}, MsgIds), {lists:reverse(AckTags), MA1}. -ack_all(BQ, MA, BQS) -> - BQ:ack([AckTag || {_MsgId, {_Num, AckTag}} <- dict:to_list(MA)], BQS). - maybe_store_ack(false, _MsgId, _AckTag, State) -> State; maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA, @@ -914,23 +886,38 @@ maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA, State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA), ack_num = Num + 1 }. -%% We intentionally leave out the head where a slave becomes -%% unsynchronised: we assert that can never happen. -set_synchronised(true, State = #state { q = #amqqueue { name = QName }, - synchronised = false }) -> - Self = self(), - rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:read({rabbit_queue, QName}) of - [] -> - ok; - [Q1 = #amqqueue{sync_slave_pids = SSPids}] -> - Q2 = Q1#amqqueue{sync_slave_pids = [Self | SSPids]}, - rabbit_mirror_queue_misc:store_updated_slaves(Q2) - end - end), - State #state { synchronised = true }; -set_synchronised(true, State) -> +set_synchronised(Delta, State) -> + set_synchronised(Delta, false, State). + +set_synchronised(_Delta, _AddAnyway, + State = #state { depth_delta = undefined }) -> State; -set_synchronised(false, State = #state { synchronised = false }) -> - State. +set_synchronised(Delta, AddAnyway, + State = #state { depth_delta = DepthDelta, + q = #amqqueue { name = QName }}) -> + DepthDelta1 = DepthDelta + Delta, + %% We intentionally leave out the head where a slave becomes + %% unsynchronised: we assert that can never happen. + %% The `AddAnyway' param is there since in the `depth' instruction we + %% receive the master depth for the first time, and we want to set the sync + %% state anyway if we are synced. + case DepthDelta1 =:= 0 of + true when not (DepthDelta =:= 0) orelse AddAnyway -> + Self = self(), + rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:read({rabbit_queue, QName}) of + [] -> + ok; + [Q1 = #amqqueue{sync_slave_pids = SSPids}] -> + %% We might be there already, in the `AddAnyway' + %% case + SSPids1 = SSPids -- [Self], + rabbit_mirror_queue_misc:store_updated_slaves( + Q1#amqqueue{sync_slave_pids = [Self | SSPids1]}) + end + end); + _ when DepthDelta1 >= 0 -> + ok + end, + State #state { depth_delta = DepthDelta1 }. |
