summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTim Watson <tim@rabbitmq.com>2012-09-14 15:06:05 +0100
committerTim Watson <tim@rabbitmq.com>2012-09-14 15:06:05 +0100
commit947a900f6597325433659b737fc12924f227f940 (patch)
tree661a540174c9ad07be761424a655fdc25acc131b
parentef341f575b7a96c4c0c958e9e3400dcf73991597 (diff)
parent6a5d5733a1254472eefb5a69bc946ccfb94ca33f (diff)
downloadrabbitmq-server-bug25118.tar.gz
merge default into bug25118bug25118
-rw-r--r--src/rabbit_backing_queue.erl10
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl44
-rw-r--r--src/rabbit_mirror_queue_master.erl17
-rw-r--r--src/rabbit_mirror_queue_slave.erl154
-rw-r--r--src/rabbit_variable_queue.erl7
5 files changed, 127 insertions, 105 deletions
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index ed5340fe..d69a6c3b 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -152,6 +152,9 @@
%% Is my queue empty?
-callback is_empty(state()) -> boolean().
+%% What's the queue depth, where depth = length + number of pending acks
+-callback depth(state()) -> non_neg_integer().
+
%% For the next three functions, the assumption is that you're
%% monitoring something like the ingress and egress rates of the
%% queue. The RAM duration is thus the length of time represented by
@@ -212,9 +215,10 @@ behaviour_info(callbacks) ->
{delete_and_terminate, 2}, {purge, 1}, {publish, 4},
{publish_delivered, 5}, {drain_confirmed, 1}, {dropwhile, 3},
{fetch, 2}, {ack, 2}, {fold, 3}, {requeue, 2}, {len, 1},
- {is_empty, 1}, {set_ram_duration_target, 2}, {ram_duration, 1},
- {needs_timeout, 1}, {timeout, 1}, {handle_pre_hibernate, 1},
- {status, 1}, {invoke, 3}, {is_duplicate, 2}, {discard, 3}];
+ {is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2},
+ {ram_duration, 1}, {needs_timeout, 1}, {timeout, 1},
+ {handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2},
+ {discard, 3}];
behaviour_info(_Other) ->
undefined.
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index 10debb0b..4455b441 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -132,25 +132,31 @@
%% gm should be processed as normal, but fetches which are for
%% messages the slave has never seen should be ignored. Similarly,
%% acks for messages the slave never fetched should be
-%% ignored. Eventually, as the master is consumed from, the messages
-%% at the head of the queue which were there before the slave joined
-%% will disappear, and the slave will become fully synced with the
-%% state of the master. The detection of the sync-status of a slave is
-%% done entirely based on length: if the slave and the master both
-%% agree on the length of the queue after the fetch of the head of the
-%% queue (or a 'set_length' results in a slave having to drop some
-%% messages from the head of its queue), then the queues must be in
-%% sync. The only other possibility is that the slave's queue is
-%% shorter, and thus the fetch should be ignored. In case slaves are
-%% joined to an empty queue which only goes on to receive publishes,
-%% they start by asking the master to broadcast its length. This is
-%% enough for slaves to always be able to work out when their head
-%% does not differ from the master (and is much simpler and cheaper
-%% than getting the master to hang on to the guid of the msg at the
-%% head of its queue). When a slave is promoted to a master, it
-%% unilaterally broadcasts its length, in order to solve the problem
-%% of length requests from new slaves being unanswered by a dead
-%% master.
+%% ignored. Similarly, we don't republish rejected messages that we
+%% haven't seen. Eventually, as the master is consumed from, the
+%% messages at the head of the queue which were there before the slave
+%% joined will disappear, and the slave will become fully synced with
+%% the state of the master.
+%%
+%% The detection of the sync-status is based on the depth of the BQs,
+%% where the depth is defined as the sum of the length of the BQ (as
+%% per BQ:len) and the messages pending an acknowledgement. When the
+%% depth of the slave is equal to the master's, then the slave is
+%% synchronised. We only store the difference between the two for
+%% simplicity. Comparing the length is not enough since we need to
+%% take into account rejected messages which will make it back into
+%% the master queue but can't go back in the slave, since we don't
+%% want "holes" in the slave queue. Note that the depth, and the
+%% length likewise, must always be shorter on the slave - we assert
+%% that in various places. In case slaves are joined to an empty queue
+%% which only goes on to receive publishes, they start by asking the
+%% master to broadcast its depth. This is enough for slaves to always
+%% be able to work out when their head does not differ from the master
+%% (and is much simpler and cheaper than getting the master to hang on
+%% to the guid of the msg at the head of its queue). When a slave is
+%% promoted to a master, it unilaterally broadcasts its length, in
+%% order to solve the problem of length requests from new slaves being
+%% unanswered by a dead master.
%%
%% Obviously, due to the async nature of communication across gm, the
%% slaves can fall behind. This does not matter from a sync pov: if
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 477449e3..fb9f7e34 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -18,8 +18,8 @@
-export([init/3, terminate/2, delete_and_terminate/2,
purge/1, publish/4, publish_delivered/5, fetch/2, ack/2,
- requeue/2, len/1, is_empty/1, drain_confirmed/1, dropwhile/3,
- set_ram_duration_target/2, ram_duration/1,
+ requeue/2, len/1, is_empty/1, depth/1, drain_confirmed/1,
+ dropwhile/3, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
status/1, invoke/3, is_duplicate/2, discard/3, fold/3]).
@@ -96,7 +96,7 @@ init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover,
[rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- MNodes1],
{ok, BQ} = application:get_env(backing_queue_module),
BQS = BQ:init(Q, Recover, AsyncCallback),
- ok = gm:broadcast(GM, {length, BQ:len(BQS)}),
+ ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}),
#state { gm = GM,
coordinator = CPid,
backing_queue = BQ,
@@ -145,7 +145,7 @@ monitor_wait([MRef | MRefs]) ->
purge(State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS }) ->
- ok = gm:broadcast(GM, {set_length, 0, false}),
+ ok = gm:broadcast(GM, {drop, 0, BQ:len(BQS), false}),
{Count, BQS1} = BQ:purge(BQS),
{Count, State #state { backing_queue_state = BQS1,
set_delivered = 0 }}.
@@ -187,8 +187,8 @@ dropwhile(Pred, AckRequired,
Len = BQ:len(BQS),
{Next, Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS),
Len1 = BQ:len(BQS1),
- ok = gm:broadcast(GM, {set_length, Len1, AckRequired}),
Dropped = Len - Len1,
+ ok = gm:broadcast(GM, {drop, Len1, Dropped, AckRequired}),
SetDelivered1 = lists:max([0, SetDelivered - Dropped]),
{Next, Msgs, State #state { backing_queue_state = BQS1,
set_delivered = SetDelivered1 } }.
@@ -274,6 +274,9 @@ len(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
is_empty(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
BQ:is_empty(BQS).
+depth(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
+ BQ:depth(BQS).
+
set_ram_duration_target(Target, State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
State #state { backing_queue_state =
@@ -372,7 +375,7 @@ discard(Msg = #basic_message { id = MsgId }, ChPid,
promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus, KS) ->
Len = BQ:len(BQS),
- ok = gm:broadcast(GM, {length, Len}),
+ ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}),
#state { gm = GM,
coordinator = CPid,
backing_queue = BQ,
@@ -403,7 +406,7 @@ length_fun() ->
fun (?MODULE, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS }) ->
- ok = gm:broadcast(GM, {length, BQ:len(BQS)}),
+ ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}),
State
end)
end.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 964c3e24..3e45f026 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -77,7 +77,8 @@
msg_id_status,
known_senders,
- synchronised
+ %% Master depth - local depth
+ depth_delta
}).
start_link(Q) -> gen_server2:start_link(?MODULE, Q, []).
@@ -129,7 +130,7 @@ init(#amqqueue { name = QueueName } = Q) ->
msg_id_status = dict:new(),
known_senders = pmon:new(),
- synchronised = false
+ depth_delta = undefined
},
rabbit_event:notify(queue_slave_created,
infos(?CREATION_EVENT_KEYS, State)),
@@ -385,7 +386,7 @@ infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(pid, _State) -> self();
i(name, #state { q = #amqqueue { name = Name } }) -> Name;
i(master_pid, #state { master_pid = MPid }) -> MPid;
-i(is_synchronised, #state { synchronised = Synchronised }) -> Synchronised;
+i(is_synchronised, #state { depth_delta = DD }) -> DD =:= 0;
i(Item, _State) -> throw({bad_argument, Item}).
bq_init(BQ, Q, Recover) ->
@@ -800,43 +801,45 @@ process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }},
{ok, State1 #state { sender_queues = SQ1,
msg_id_status = MS1,
backing_queue_state = BQS1 }};
-process_instruction({set_length, Length, AckRequired},
+process_instruction({drop, Length, Dropped, AckRequired},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
QLen = BQ:len(BQS),
- ToDrop = QLen - Length,
- {ok,
- case ToDrop >= 0 of
- true ->
- State1 =
- lists:foldl(
- fun (const, StateN = #state {backing_queue_state = BQSN}) ->
- {{#basic_message{id = MsgId}, _IsDelivered, AckTag,
- _Remaining}, BQSN1} = BQ:fetch(AckRequired, BQSN),
- maybe_store_ack(
- AckRequired, MsgId, AckTag,
- StateN #state { backing_queue_state = BQSN1 })
- end, State, lists:duplicate(ToDrop, const)),
- set_synchronised(true, State1);
- false ->
- State
- end};
+ ToDrop = case QLen - Length of
+ N when N > 0 -> N;
+ _ -> 0
+ end,
+ State1 = lists:foldl(
+ fun (const, StateN = #state{backing_queue_state = BQSN}) ->
+ {{#basic_message{id = MsgId}, _, AckTag, _}, BQSN1} =
+ BQ:fetch(AckRequired, BQSN),
+ maybe_store_ack(
+ AckRequired, MsgId, AckTag,
+ StateN #state { backing_queue_state = BQSN1 })
+ end, State, lists:duplicate(ToDrop, const)),
+ {ok, case AckRequired of
+ true -> State1;
+ false -> set_synchronised(ToDrop - Dropped, State1)
+ end};
process_instruction({fetch, AckRequired, MsgId, Remaining},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
QLen = BQ:len(BQS),
- {ok, case QLen - 1 of
- Remaining ->
- {{#basic_message{id = MsgId}, _IsDelivered,
- AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS),
- maybe_store_ack(AckRequired, MsgId, AckTag,
- State #state { backing_queue_state = BQS1 });
- Other when Other + 1 =:= Remaining ->
- set_synchronised(true, State);
- Other when Other < Remaining ->
- %% we must be shorter than the master
- State
- end};
+ {State1, Delta} =
+ case QLen - 1 of
+ Remaining ->
+ {{#basic_message{id = MsgId}, _IsDelivered,
+ AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS),
+ {maybe_store_ack(AckRequired, MsgId, AckTag,
+ State #state { backing_queue_state = BQS1 }),
+ 0};
+ _ when QLen =< Remaining ->
+ {State, case AckRequired of
+ true -> 0;
+ false -> -1
+ end}
+ end,
+ {ok, set_synchronised(Delta, State1)};
process_instruction({ack, MsgIds},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
@@ -844,27 +847,17 @@ process_instruction({ack, MsgIds},
{AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA),
{MsgIds1, BQS1} = BQ:ack(AckTags, BQS),
[] = MsgIds1 -- MsgIds, %% ASSERTION
- {ok, State #state { msg_id_ack = MA1,
- backing_queue_state = BQS1 }};
+ {ok, set_synchronised(length(MsgIds1) - length(MsgIds),
+ State #state { msg_id_ack = MA1,
+ backing_queue_state = BQS1 })};
process_instruction({requeue, MsgIds},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
msg_id_ack = MA }) ->
{AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA),
- {ok, case length(AckTags) =:= length(MsgIds) of
- true ->
- {MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
- State #state { msg_id_ack = MA1,
- backing_queue_state = BQS1 };
- false ->
- %% The only thing we can safely do is nuke out our BQ
- %% and MA. The interaction between this and confirms
- %% doesn't really bear thinking about...
- {_Count, BQS1} = BQ:purge(BQS),
- {_MsgIds, BQS2} = ack_all(BQ, MA, BQS1),
- State #state { msg_id_ack = dict:new(),
- backing_queue_state = BQS2 }
- end};
+ {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
+ {ok, State #state { msg_id_ack = MA1,
+ backing_queue_state = BQS1 }};
process_instruction({sender_death, ChPid},
State = #state { sender_queues = SQ,
msg_id_status = MS,
@@ -882,10 +875,11 @@ process_instruction({sender_death, ChPid},
msg_id_status = MS1,
known_senders = pmon:demonitor(ChPid, KS) }
end};
-process_instruction({length, Length},
- State = #state { backing_queue = BQ,
+process_instruction({depth, Depth},
+ State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
- {ok, set_synchronised(Length =:= BQ:len(BQS), State)};
+ {ok, set_synchronised(
+ 0, true, State #state { depth_delta = Depth - BQ:depth(BQS) })};
process_instruction({delete_and_terminate, Reason},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
@@ -904,9 +898,6 @@ msg_ids_to_acktags(MsgIds, MA) ->
end, {[], MA}, MsgIds),
{lists:reverse(AckTags), MA1}.
-ack_all(BQ, MA, BQS) ->
- BQ:ack([AckTag || {_MsgId, {_Num, AckTag}} <- dict:to_list(MA)], BQS).
-
maybe_store_ack(false, _MsgId, _AckTag, State) ->
State;
maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA,
@@ -914,23 +905,38 @@ maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA,
State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA),
ack_num = Num + 1 }.
-%% We intentionally leave out the head where a slave becomes
-%% unsynchronised: we assert that can never happen.
-set_synchronised(true, State = #state { q = #amqqueue { name = QName },
- synchronised = false }) ->
- Self = self(),
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:read({rabbit_queue, QName}) of
- [] ->
- ok;
- [Q1 = #amqqueue{sync_slave_pids = SSPids}] ->
- Q2 = Q1#amqqueue{sync_slave_pids = [Self | SSPids]},
- rabbit_mirror_queue_misc:store_updated_slaves(Q2)
- end
- end),
- State #state { synchronised = true };
-set_synchronised(true, State) ->
+set_synchronised(Delta, State) ->
+ set_synchronised(Delta, false, State).
+
+set_synchronised(_Delta, _AddAnyway,
+ State = #state { depth_delta = undefined }) ->
State;
-set_synchronised(false, State = #state { synchronised = false }) ->
- State.
+set_synchronised(Delta, AddAnyway,
+ State = #state { depth_delta = DepthDelta,
+ q = #amqqueue { name = QName }}) ->
+ DepthDelta1 = DepthDelta + Delta,
+ %% We intentionally leave out the head where a slave becomes
+ %% unsynchronised: we assert that can never happen.
+ %% The `AddAnyway' param is there since in the `depth' instruction we
+ %% receive the master depth for the first time, and we want to set the sync
+ %% state anyway if we are synced.
+ case DepthDelta1 =:= 0 of
+ true when not (DepthDelta =:= 0) orelse AddAnyway ->
+ Self = self(),
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:read({rabbit_queue, QName}) of
+ [] ->
+ ok;
+ [Q1 = #amqqueue{sync_slave_pids = SSPids}] ->
+ %% We might be there already, in the `AddAnyway'
+ %% case
+ SSPids1 = SSPids -- [Self],
+ rabbit_mirror_queue_misc:store_updated_slaves(
+ Q1#amqqueue{sync_slave_pids = [Self | SSPids1]})
+ end
+ end);
+ _ when DepthDelta1 >= 0 ->
+ ok
+ end,
+ State #state { depth_delta = DepthDelta1 }.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index bd606dfb..98c45717 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -19,8 +19,8 @@
-export([init/3, terminate/2, delete_and_terminate/2, purge/1,
publish/4, publish_delivered/5, drain_confirmed/1,
dropwhile/3, fetch/2, ack/2, requeue/2, len/1, is_empty/1,
- set_ram_duration_target/2, ram_duration/1, needs_timeout/1,
- timeout/1, handle_pre_hibernate/1, status/1, invoke/3,
+ depth/1, set_ram_duration_target/2, ram_duration/1,
+ needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3,
is_duplicate/2, discard/3, multiple_routing_keys/0, fold/3]).
-export([start/1, stop/0]).
@@ -681,6 +681,9 @@ len(#vqstate { len = Len }) -> Len.
is_empty(State) -> 0 == len(State).
+depth(State = #vqstate { pending_ack = Ack }) ->
+ len(State) + gb_trees:size(Ack).
+
set_ram_duration_target(
DurationTarget, State = #vqstate {
rates = #rates { avg_egress = AvgEgressRate,