summaryrefslogtreecommitdiff
path: root/src/rabbit_mirror_queue_slave.erl
diff options
context:
space:
mode:
authorTim Watson <tim@rabbitmq.com>2012-09-24 12:17:23 +0100
committerTim Watson <tim@rabbitmq.com>2012-09-24 12:17:23 +0100
commit7739d5a402b7bbc564e36030147bc6f936fcf92f (patch)
tree7da5192ebb2d876a93b858aae75667a082ad71a4 /src/rabbit_mirror_queue_slave.erl
parent48a77b93a2ef54deb201fcc3dc085239601cd03e (diff)
parentaca8685a571472041fdd34e6ff5f7f22e86da932 (diff)
downloadrabbitmq-server-bug25148.tar.gz
merge default into bug25148bug25148
Diffstat (limited to 'src/rabbit_mirror_queue_slave.erl')
-rw-r--r--src/rabbit_mirror_queue_slave.erl205
1 files changed, 96 insertions, 109 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 964c3e24..1f6567e0 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -70,14 +70,15 @@
sync_timer_ref,
rate_timer_ref,
- sender_queues, %% :: Pid -> {Q {Msg, Bool}, Set MsgId}
+ sender_queues, %% :: Pid -> {Q Msg, Set MsgId}
msg_id_ack, %% :: MsgId -> AckTag
ack_num,
msg_id_status,
known_senders,
- synchronised
+ %% Master depth - local depth
+ depth_delta
}).
start_link(Q) -> gen_server2:start_link(?MODULE, Q, []).
@@ -129,7 +130,7 @@ init(#amqqueue { name = QueueName } = Q) ->
msg_id_status = dict:new(),
known_senders = pmon:new(),
- synchronised = false
+ depth_delta = undefined
},
rabbit_event:notify(queue_slave_created,
infos(?CREATION_EVENT_KEYS, State)),
@@ -166,27 +167,10 @@ init_it(Self, Node, QueueName) ->
end
end.
-handle_call({deliver, Delivery = #delivery { immediate = true }},
- From, State) ->
- %% It is safe to reply 'false' here even if a) we've not seen the
- %% msg via gm, or b) the master dies before we receive the msg via
- %% gm. In the case of (a), we will eventually receive the msg via
- %% gm, and it's only the master's result to the channel that is
- %% important. In the case of (b), if the master does die and we do
- %% get promoted then at that point we have no consumers, thus
- %% 'false' is precisely the correct answer. However, we must be
- %% careful to _not_ enqueue the message in this case.
-
- %% Note this is distinct from the case where we receive the msg
- %% via gm first, then we're promoted to master, and only then do
- %% we receive the msg from the channel.
- gen_server2:reply(From, false), %% master may deliver it, not us
- noreply(maybe_enqueue_message(Delivery, false, State));
-
-handle_call({deliver, Delivery = #delivery { mandatory = true }},
- From, State) ->
- gen_server2:reply(From, true), %% amqqueue throws away the result anyway
- noreply(maybe_enqueue_message(Delivery, true, State));
+handle_call({deliver, Delivery}, From, State) ->
+ %% Synchronous, "mandatory" deliver mode.
+ gen_server2:reply(From, ok),
+ noreply(maybe_enqueue_message(Delivery, State));
handle_call({gm_deaths, Deaths}, From,
State = #state { q = #amqqueue { name = QueueName },
@@ -231,12 +215,12 @@ handle_cast({gm, Instruction}, State) ->
handle_process_result(process_instruction(Instruction, State));
handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, State) ->
- %% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
+ %% Asynchronous, non-"mandatory", deliver mode.
case Flow of
flow -> credit_flow:ack(Sender);
noflow -> ok
end,
- noreply(maybe_enqueue_message(Delivery, true, State));
+ noreply(maybe_enqueue_message(Delivery, State));
handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
@@ -385,7 +369,7 @@ infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(pid, _State) -> self();
i(name, #state { q = #amqqueue { name = Name } }) -> Name;
i(master_pid, #state { master_pid = MPid }) -> MPid;
-i(is_synchronised, #state { synchronised = Synchronised }) -> Synchronised;
+i(is_synchronised, #state { depth_delta = DD }) -> DD =:= 0;
i(Item, _State) -> throw({bad_argument, Item}).
bq_init(BQ, Q, Recover) ->
@@ -553,7 +537,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
NumAckTags = [NumAckTag || {_MsgId, NumAckTag} <- dict:to_list(MA)],
AckTags = [AckTag || {_Num, AckTag} <- lists:sort(NumAckTags)],
Deliveries = [Delivery || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ),
- {Delivery, true} <- queue:to_list(PubQ)],
+ Delivery <- queue:to_list(PubQ)],
QueueState = rabbit_amqqueue_process:init_with_backing_queue_state(
Q1, rabbit_mirror_queue_master, MasterState, RateTRef,
AckTags, Deliveries, KS, MTC),
@@ -654,14 +638,13 @@ maybe_enqueue_message(
Delivery = #delivery { message = #basic_message { id = MsgId },
msg_seq_no = MsgSeqNo,
sender = ChPid },
- EnqueueOnPromotion,
State = #state { sender_queues = SQ, msg_id_status = MS }) ->
State1 = ensure_monitoring(ChPid, State),
%% We will never see {published, ChPid, MsgSeqNo} here.
case dict:find(MsgId, MS) of
error ->
{MQ, PendingCh} = get_sender_queue(ChPid, SQ),
- MQ1 = queue:in({Delivery, EnqueueOnPromotion}, MQ),
+ MQ1 = queue:in(Delivery, MQ),
SQ1 = dict:store(ChPid, {MQ1, PendingCh}, SQ),
State1 #state { sender_queues = SQ1 };
{ok, {confirmed, ChPid}} ->
@@ -731,10 +714,9 @@ process_instruction(
{empty, _MQ2} ->
{MQ, sets:add_element(MsgId, PendingCh),
dict:store(MsgId, {published, ChPid}, MS)};
- {{value, {Delivery = #delivery {
- msg_seq_no = MsgSeqNo,
- message = #basic_message { id = MsgId } },
- _EnqueueOnPromotion}}, MQ2} ->
+ {{value, Delivery = #delivery {
+ msg_seq_no = MsgSeqNo,
+ message = #basic_message { id = MsgId } }}, MQ2} ->
{MQ2, PendingCh,
%% We received the msg from the channel first. Thus
%% we need to deal with confirms here.
@@ -746,7 +728,7 @@ process_instruction(
ChPid, [MsgSeqNo]),
MS
end};
- {{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ2} ->
+ {{value, #delivery {}}, _MQ2} ->
%% The instruction was sent to us before we were
%% within the slave_pids within the #amqqueue{}
%% record. We'll never receive the message directly
@@ -783,12 +765,12 @@ process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }},
{empty, _MQ} ->
{MQ, sets:add_element(MsgId, PendingCh),
dict:store(MsgId, discarded, MS)};
- {{value, {#delivery { message = #basic_message { id = MsgId } },
- _EnqueueOnPromotion}}, MQ2} ->
+ {{value, #delivery { message = #basic_message { id = MsgId } }},
+ MQ2} ->
%% We've already seen it from the channel, we're not
%% going to see this again, so don't add it to MS
{MQ2, PendingCh, MS};
- {{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ2} ->
+ {{value, #delivery {}}, _MQ2} ->
%% The instruction was sent to us before we were
%% within the slave_pids within the #amqqueue{}
%% record. We'll never receive the message directly
@@ -800,43 +782,45 @@ process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }},
{ok, State1 #state { sender_queues = SQ1,
msg_id_status = MS1,
backing_queue_state = BQS1 }};
-process_instruction({set_length, Length, AckRequired},
+process_instruction({drop, Length, Dropped, AckRequired},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
QLen = BQ:len(BQS),
- ToDrop = QLen - Length,
- {ok,
- case ToDrop >= 0 of
- true ->
- State1 =
- lists:foldl(
- fun (const, StateN = #state {backing_queue_state = BQSN}) ->
- {{#basic_message{id = MsgId}, _IsDelivered, AckTag,
- _Remaining}, BQSN1} = BQ:fetch(AckRequired, BQSN),
- maybe_store_ack(
- AckRequired, MsgId, AckTag,
- StateN #state { backing_queue_state = BQSN1 })
- end, State, lists:duplicate(ToDrop, const)),
- set_synchronised(true, State1);
- false ->
- State
- end};
+ ToDrop = case QLen - Length of
+ N when N > 0 -> N;
+ _ -> 0
+ end,
+ State1 = lists:foldl(
+ fun (const, StateN = #state{backing_queue_state = BQSN}) ->
+ {{#basic_message{id = MsgId}, _, AckTag, _}, BQSN1} =
+ BQ:fetch(AckRequired, BQSN),
+ maybe_store_ack(
+ AckRequired, MsgId, AckTag,
+ StateN #state { backing_queue_state = BQSN1 })
+ end, State, lists:duplicate(ToDrop, const)),
+ {ok, case AckRequired of
+ true -> State1;
+ false -> set_synchronised(ToDrop - Dropped, State1)
+ end};
process_instruction({fetch, AckRequired, MsgId, Remaining},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
QLen = BQ:len(BQS),
- {ok, case QLen - 1 of
- Remaining ->
- {{#basic_message{id = MsgId}, _IsDelivered,
- AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS),
- maybe_store_ack(AckRequired, MsgId, AckTag,
- State #state { backing_queue_state = BQS1 });
- Other when Other + 1 =:= Remaining ->
- set_synchronised(true, State);
- Other when Other < Remaining ->
- %% we must be shorter than the master
- State
- end};
+ {State1, Delta} =
+ case QLen - 1 of
+ Remaining ->
+ {{#basic_message{id = MsgId}, _IsDelivered,
+ AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS),
+ {maybe_store_ack(AckRequired, MsgId, AckTag,
+ State #state { backing_queue_state = BQS1 }),
+ 0};
+ _ when QLen =< Remaining ->
+ {State, case AckRequired of
+ true -> 0;
+ false -> -1
+ end}
+ end,
+ {ok, set_synchronised(Delta, State1)};
process_instruction({ack, MsgIds},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
@@ -844,27 +828,17 @@ process_instruction({ack, MsgIds},
{AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA),
{MsgIds1, BQS1} = BQ:ack(AckTags, BQS),
[] = MsgIds1 -- MsgIds, %% ASSERTION
- {ok, State #state { msg_id_ack = MA1,
- backing_queue_state = BQS1 }};
+ {ok, set_synchronised(length(MsgIds1) - length(MsgIds),
+ State #state { msg_id_ack = MA1,
+ backing_queue_state = BQS1 })};
process_instruction({requeue, MsgIds},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
msg_id_ack = MA }) ->
{AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA),
- {ok, case length(AckTags) =:= length(MsgIds) of
- true ->
- {MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
- State #state { msg_id_ack = MA1,
- backing_queue_state = BQS1 };
- false ->
- %% The only thing we can safely do is nuke out our BQ
- %% and MA. The interaction between this and confirms
- %% doesn't really bear thinking about...
- {_Count, BQS1} = BQ:purge(BQS),
- {_MsgIds, BQS2} = ack_all(BQ, MA, BQS1),
- State #state { msg_id_ack = dict:new(),
- backing_queue_state = BQS2 }
- end};
+ {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
+ {ok, State #state { msg_id_ack = MA1,
+ backing_queue_state = BQS1 }};
process_instruction({sender_death, ChPid},
State = #state { sender_queues = SQ,
msg_id_status = MS,
@@ -882,10 +856,11 @@ process_instruction({sender_death, ChPid},
msg_id_status = MS1,
known_senders = pmon:demonitor(ChPid, KS) }
end};
-process_instruction({length, Length},
- State = #state { backing_queue = BQ,
+process_instruction({depth, Depth},
+ State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
- {ok, set_synchronised(Length =:= BQ:len(BQS), State)};
+ {ok, set_synchronised(
+ 0, true, State #state { depth_delta = Depth - BQ:depth(BQS) })};
process_instruction({delete_and_terminate, Reason},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
@@ -904,9 +879,6 @@ msg_ids_to_acktags(MsgIds, MA) ->
end, {[], MA}, MsgIds),
{lists:reverse(AckTags), MA1}.
-ack_all(BQ, MA, BQS) ->
- BQ:ack([AckTag || {_MsgId, {_Num, AckTag}} <- dict:to_list(MA)], BQS).
-
maybe_store_ack(false, _MsgId, _AckTag, State) ->
State;
maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA,
@@ -914,23 +886,38 @@ maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA,
State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA),
ack_num = Num + 1 }.
-%% We intentionally leave out the head where a slave becomes
-%% unsynchronised: we assert that can never happen.
-set_synchronised(true, State = #state { q = #amqqueue { name = QName },
- synchronised = false }) ->
- Self = self(),
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:read({rabbit_queue, QName}) of
- [] ->
- ok;
- [Q1 = #amqqueue{sync_slave_pids = SSPids}] ->
- Q2 = Q1#amqqueue{sync_slave_pids = [Self | SSPids]},
- rabbit_mirror_queue_misc:store_updated_slaves(Q2)
- end
- end),
- State #state { synchronised = true };
-set_synchronised(true, State) ->
+set_synchronised(Delta, State) ->
+ set_synchronised(Delta, false, State).
+
+set_synchronised(_Delta, _AddAnyway,
+ State = #state { depth_delta = undefined }) ->
State;
-set_synchronised(false, State = #state { synchronised = false }) ->
- State.
+set_synchronised(Delta, AddAnyway,
+ State = #state { depth_delta = DepthDelta,
+ q = #amqqueue { name = QName }}) ->
+ DepthDelta1 = DepthDelta + Delta,
+ %% We intentionally leave out the head where a slave becomes
+ %% unsynchronised: we assert that can never happen.
+ %% The `AddAnyway' param is there since in the `depth' instruction we
+ %% receive the master depth for the first time, and we want to set the sync
+ %% state anyway if we are synced.
+ case DepthDelta1 =:= 0 of
+ true when not (DepthDelta =:= 0) orelse AddAnyway ->
+ Self = self(),
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:read({rabbit_queue, QName}) of
+ [] ->
+ ok;
+ [Q1 = #amqqueue{sync_slave_pids = SSPids}] ->
+ %% We might be there already, in the `AddAnyway'
+ %% case
+ SSPids1 = SSPids -- [Self],
+ rabbit_mirror_queue_misc:store_updated_slaves(
+ Q1#amqqueue{sync_slave_pids = [Self | SSPids1]})
+ end
+ end);
+ _ when DepthDelta1 >= 0 ->
+ ok
+ end,
+ State #state { depth_delta = DepthDelta1 }.