summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrancesco Mazzoli <francesco@rabbitmq.com>2012-09-04 15:42:22 +0100
committerFrancesco Mazzoli <francesco@rabbitmq.com>2012-09-04 15:42:22 +0100
commitf0c218288954052e4786e65228f5ad8be24c85ec (patch)
tree7449b2d4a7418b661866292ecc6701a05e5aa2af
parentc4d78500e17f080300710a9e36dcb85740c13675 (diff)
downloadrabbitmq-server-f0c218288954052e4786e65228f5ad8be24c85ec.tar.gz
track the delta of the depths, and replace `pending_ack' with `depth' in BQ
The kill-multi test is still failing...
-rw-r--r--src/rabbit_backing_queue.erl6
-rw-r--r--src/rabbit_mirror_queue_master.erl19
-rw-r--r--src/rabbit_mirror_queue_slave.erl92
-rw-r--r--src/rabbit_variable_queue.erl6
4 files changed, 52 insertions, 71 deletions
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index eac1db2f..d69a6c3b 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -152,8 +152,8 @@
%% Is my queue empty?
-callback is_empty(state()) -> boolean().
-%% How many pending acks do we have?
--callback pending_ack(state()) -> non_neg_integer().
+%% What's the queue depth, where depth = length + number of pending acks
+-callback depth(state()) -> non_neg_integer().
%% For the next three functions, the assumption is that you're
%% monitoring something like the ingress and egress rates of the
@@ -215,7 +215,7 @@ behaviour_info(callbacks) ->
{delete_and_terminate, 2}, {purge, 1}, {publish, 4},
{publish_delivered, 5}, {drain_confirmed, 1}, {dropwhile, 3},
{fetch, 2}, {ack, 2}, {fold, 3}, {requeue, 2}, {len, 1},
- {is_empty, 1}, {pending_ack, 1}, {set_ram_duration_target, 2},
+ {is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2},
{ram_duration, 1}, {needs_timeout, 1}, {timeout, 1},
{handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2},
{discard, 3}];
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 62109dae..ad66d059 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -18,7 +18,7 @@
-export([init/3, terminate/2, delete_and_terminate/2,
purge/1, publish/4, publish_delivered/5, fetch/2, ack/2,
- requeue/2, len/1, is_empty/1, pending_ack/1, drain_confirmed/1,
+ requeue/2, len/1, is_empty/1, depth/1, drain_confirmed/1,
dropwhile/3, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
status/1, invoke/3, is_duplicate/2, discard/3, fold/3]).
@@ -96,7 +96,7 @@ init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover,
[rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- MNodes1],
{ok, BQ} = application:get_env(backing_queue_module),
BQS = BQ:init(Q, Recover, AsyncCallback),
- ok = gm:broadcast(GM, {depth, depth(BQ, BQS)}),
+ ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}),
#state { gm = GM,
coordinator = CPid,
backing_queue = BQ,
@@ -274,8 +274,8 @@ len(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
is_empty(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
BQ:is_empty(BQS).
-pending_ack(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
- BQ:pending_ack(BQS).
+depth(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
+ BQ:depth(BQS).
set_ram_duration_target(Target, State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
@@ -375,7 +375,7 @@ discard(Msg = #basic_message { id = MsgId }, ChPid,
promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus, KS) ->
Len = BQ:len(BQS),
- ok = gm:broadcast(GM, {depth, depth(BQ, BQS)}),
+ ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}),
#state { gm = GM,
coordinator = CPid,
backing_queue = BQ,
@@ -407,7 +407,7 @@ length_fun() ->
backing_queue = BQ,
backing_queue_state = BQS }) ->
ok = gm:broadcast(
- GM, {depth, depth(BQ, BQS)}),
+ GM, {depth, BQ:depth(BQS)}),
State
end)
end.
@@ -425,10 +425,3 @@ ensure_monitoring(ChPid, State = #state { coordinator = CPid,
CPid, [ChPid]),
State #state { known_senders = sets:add_element(ChPid, KS) }
end.
-
-%% ---------------------------------------------------------------------------
-%% Internal exports
-%% ---------------------------------------------------------------------------
-
-depth(BQ, BQS) ->
- BQ:len(BQS) + BQ:pending_ack(BQS).
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 6f3d1382..ee65a0a7 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -77,10 +77,8 @@
msg_id_status,
known_senders,
- %% The depth is the BQ len + the number of messages pending
- %% acks.
- depth,
- master_depth
+ %% Master depth - local depth
+ depth_delta
}).
start_link(Q) ->
@@ -134,8 +132,7 @@ init(#amqqueue { name = QueueName } = Q) ->
msg_id_status = dict:new(),
known_senders = pmon:new(),
- depth = 0,
- master_depth = undefined
+ depth_delta = undefined
},
rabbit_event:notify(queue_slave_created,
infos(?CREATION_EVENT_KEYS, State)),
@@ -395,7 +392,7 @@ infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(pid, _State) -> self();
i(name, #state { q = #amqqueue { name = Name } }) -> Name;
i(master_pid, #state { master_pid = MPid }) -> MPid;
-i(is_synchronised, #state { depth = D, master_depth = MD }) -> D =:= MD;
+i(is_synchronised, #state { depth_delta = DD }) -> DD =:= 0;
i(Item, _State) -> throw({bad_argument, Item}).
bq_init(BQ, Q, Recover) ->
@@ -770,22 +767,16 @@ process_instruction(
SQ1 = dict:store(ChPid, {MQ1, PendingCh1}, SQ),
State2 = State1 #state { sender_queues = SQ1, msg_id_status = MS1 },
- {State3, Delta} =
- case Deliver of
- false ->
- BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
- {State2 #state { backing_queue_state = BQS1 }, 1};
- {true, AckRequired} ->
- {AckTag, BQS1} = BQ:publish_delivered(
- AckRequired, Msg, MsgProps, ChPid, BQS),
- {maybe_store_ack(AckRequired, MsgId, AckTag,
- State2 #state {backing_queue_state = BQS1}),
- case AckRequired of
- true -> 1;
- false -> 0
- end}
- end,
- {ok, set_synchronised(Delta, Delta, State3)};
+ {ok, case Deliver of
+ false ->
+ BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
+ State2 #state { backing_queue_state = BQS1 };
+ {true, AckRequired} ->
+ {AckTag, BQS1} = BQ:publish_delivered(
+ AckRequired, Msg, MsgProps, ChPid, BQS),
+ maybe_store_ack(AckRequired, MsgId, AckTag,
+ State2 #state {backing_queue_state = BQS1})
+ end};
process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }},
State = #state { sender_queues = SQ,
backing_queue = BQ,
@@ -835,30 +826,27 @@ process_instruction({drop, Length, Dropped, AckRequired},
end, State, lists:duplicate(ToDrop, const)),
{ok, case AckRequired of
true -> State1;
- false -> set_synchronised(-ToDrop, -Dropped, State1)
+ false -> set_synchronised(ToDrop - Dropped, State1)
end};
process_instruction({fetch, AckRequired, MsgId, Remaining},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
QLen = BQ:len(BQS),
- {State1, {Delta, MasterDelta}} =
+ {State1, Delta} =
case {QLen - 1} of
Remaining ->
{{#basic_message{id = MsgId}, _IsDelivered,
AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS),
{maybe_store_ack(AckRequired, MsgId, AckTag,
State #state { backing_queue_state = BQS1 }),
- case AckRequired of
- true -> {0, 0};
- false -> {-1, -1}
- end};
+ 0};
_ when QLen =< Remaining ->
{State, case AckRequired of
- true -> {0, 0};
- false -> {0, -1}
+ true -> 0;
+ false -> -1
end}
end,
- {ok, set_synchronised(Delta, MasterDelta, State1)};
+ {ok, set_synchronised(Delta, State1)};
process_instruction({ack, MsgIds},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
@@ -866,7 +854,7 @@ process_instruction({ack, MsgIds},
{AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA),
{MsgIds1, BQS1} = BQ:ack(AckTags, BQS),
[] = MsgIds1 -- MsgIds, %% ASSERTION
- {ok, set_synchronised(-length(AckTags), -length(MsgIds),
+ {ok, set_synchronised(length(MsgIds1) - length(MsgIds),
State #state { msg_id_ack = MA1,
backing_queue_state = BQS1 })};
process_instruction({requeue, MsgIds},
@@ -895,8 +883,11 @@ process_instruction({sender_death, ChPid},
known_senders = pmon:demonitor(ChPid, KS) }
end};
-process_instruction({depth, Depth}, State) ->
- {ok, set_synchronised(0, 0, true, State #state { master_depth = Depth })};
+process_instruction({depth, Depth},
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ {ok, set_synchronised(
+ 0, true, State #state { depth_delta = Depth - BQ:depth(BQS) })};
process_instruction({delete_and_terminate, Reason},
State = #state { backing_queue = BQ,
@@ -923,26 +914,23 @@ maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA,
State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA),
ack_num = Num + 1 }.
-set_synchronised(Delta, MasterDelta, State) ->
- set_synchronised(Delta, MasterDelta, false, State).
-
-set_synchronised(Delta, _MasterDelta, _AddAnyway,
- State = #state { depth = Depth,
- master_depth = undefined }) ->
- State #state { depth = Depth + Delta };
-set_synchronised(Delta, MasterDelta, AddAnyway,
- State = #state { depth = Depth,
- master_depth = MasterDepth,
- q = #amqqueue { name = QName }}) ->
- Depth1 = Depth + Delta,
- MasterDepth1 = MasterDepth + MasterDelta,
+set_synchronised(Delta, State) ->
+ set_synchronised(Delta, false, State).
+
+set_synchronised(_Delta, _AddAnyway,
+ State = #state { depth_delta = undefined }) ->
+ State;
+set_synchronised(Delta, AddAnyway,
+ State = #state { depth_delta = DepthDelta,
+ q = #amqqueue { name = QName }}) ->
+ DepthDelta1 = DepthDelta + Delta,
%% We intentionally leave out the head where a slave becomes
%% unsynchronised: we assert that can never happen.
%% The `AddAnyway' param is there since in the `depth' instruction we
%% receive the master depth for the first time, and we want to set the sync
%% state anyway if we are synced.
- case {Depth =:= MasterDepth, Depth1 =:= MasterDepth1} of
- {WasSync, true} when not WasSync orelse AddAnyway ->
+ case DepthDelta1 =:= 0 of
+ true when not (DepthDelta =:= 0) orelse AddAnyway ->
Self = self(),
rabbit_misc:execute_mnesia_transaction(
fun () ->
@@ -957,7 +945,7 @@ set_synchronised(Delta, MasterDelta, AddAnyway,
Q1#amqqueue{sync_slave_pids = [Self | SSPids1]})
end
end);
- {Same, Same} ->
+ _ when DepthDelta1 > 0->
ok
end,
- State #state { depth = Depth1, master_depth = MasterDepth1 }.
+ State #state { depth_delta = DepthDelta1 }.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 22829765..98c45717 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -19,7 +19,7 @@
-export([init/3, terminate/2, delete_and_terminate/2, purge/1,
publish/4, publish_delivered/5, drain_confirmed/1,
dropwhile/3, fetch/2, ack/2, requeue/2, len/1, is_empty/1,
- pending_ack/1, set_ram_duration_target/2, ram_duration/1,
+ depth/1, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3,
is_duplicate/2, discard/3, multiple_routing_keys/0, fold/3]).
@@ -681,8 +681,8 @@ len(#vqstate { len = Len }) -> Len.
is_empty(State) -> 0 == len(State).
-pending_ack(#vqstate { pending_ack = Ack }) ->
- gb_trees:size(Ack).
+depth(State = #vqstate { pending_ack = Ack }) ->
+ len(State) + gb_trees:size(Ack).
set_ram_duration_target(
DurationTarget, State = #vqstate {