diff options
author | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-09-04 15:42:22 +0100 |
---|---|---|
committer | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-09-04 15:42:22 +0100 |
commit | f0c218288954052e4786e65228f5ad8be24c85ec (patch) | |
tree | 7449b2d4a7418b661866292ecc6701a05e5aa2af | |
parent | c4d78500e17f080300710a9e36dcb85740c13675 (diff) | |
download | rabbitmq-server-f0c218288954052e4786e65228f5ad8be24c85ec.tar.gz |
track the delta of the depths, and replace `pending_ack' with `depth' in BQ
The kill-multi test is still failing...
-rw-r--r-- | src/rabbit_backing_queue.erl | 6 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 19 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 92 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 6 |
4 files changed, 52 insertions, 71 deletions
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index eac1db2f..d69a6c3b 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -152,8 +152,8 @@ %% Is my queue empty? -callback is_empty(state()) -> boolean(). -%% How many pending acks do we have? --callback pending_ack(state()) -> non_neg_integer(). +%% What's the queue depth, where depth = length + number of pending acks +-callback depth(state()) -> non_neg_integer(). %% For the next three functions, the assumption is that you're %% monitoring something like the ingress and egress rates of the @@ -215,7 +215,7 @@ behaviour_info(callbacks) -> {delete_and_terminate, 2}, {purge, 1}, {publish, 4}, {publish_delivered, 5}, {drain_confirmed, 1}, {dropwhile, 3}, {fetch, 2}, {ack, 2}, {fold, 3}, {requeue, 2}, {len, 1}, - {is_empty, 1}, {pending_ack, 1}, {set_ram_duration_target, 2}, + {is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2}, {ram_duration, 1}, {needs_timeout, 1}, {timeout, 1}, {handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2}, {discard, 3}]; diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 62109dae..ad66d059 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -18,7 +18,7 @@ -export([init/3, terminate/2, delete_and_terminate/2, purge/1, publish/4, publish_delivered/5, fetch/2, ack/2, - requeue/2, len/1, is_empty/1, pending_ack/1, drain_confirmed/1, + requeue/2, len/1, is_empty/1, depth/1, drain_confirmed/1, dropwhile/3, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, is_duplicate/2, discard/3, fold/3]). @@ -96,7 +96,7 @@ init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover, [rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- MNodes1], {ok, BQ} = application:get_env(backing_queue_module), BQS = BQ:init(Q, Recover, AsyncCallback), - ok = gm:broadcast(GM, {depth, depth(BQ, BQS)}), + ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}), #state { gm = GM, coordinator = CPid, backing_queue = BQ, @@ -274,8 +274,8 @@ len(#state { backing_queue = BQ, backing_queue_state = BQS }) -> is_empty(#state { backing_queue = BQ, backing_queue_state = BQS }) -> BQ:is_empty(BQS). -pending_ack(#state { backing_queue = BQ, backing_queue_state = BQS }) -> - BQ:pending_ack(BQS). +depth(#state { backing_queue = BQ, backing_queue_state = BQS }) -> + BQ:depth(BQS). set_ram_duration_target(Target, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> @@ -375,7 +375,7 @@ discard(Msg = #basic_message { id = MsgId }, ChPid, promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus, KS) -> Len = BQ:len(BQS), - ok = gm:broadcast(GM, {depth, depth(BQ, BQS)}), + ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}), #state { gm = GM, coordinator = CPid, backing_queue = BQ, @@ -407,7 +407,7 @@ length_fun() -> backing_queue = BQ, backing_queue_state = BQS }) -> ok = gm:broadcast( - GM, {depth, depth(BQ, BQS)}), + GM, {depth, BQ:depth(BQS)}), State end) end. @@ -425,10 +425,3 @@ ensure_monitoring(ChPid, State = #state { coordinator = CPid, CPid, [ChPid]), State #state { known_senders = sets:add_element(ChPid, KS) } end. - -%% --------------------------------------------------------------------------- -%% Internal exports -%% --------------------------------------------------------------------------- - -depth(BQ, BQS) -> - BQ:len(BQS) + BQ:pending_ack(BQS). diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 6f3d1382..ee65a0a7 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -77,10 +77,8 @@ msg_id_status, known_senders, - %% The depth is the BQ len + the number of messages pending - %% acks. - depth, - master_depth + %% Master depth - local depth + depth_delta }). start_link(Q) -> @@ -134,8 +132,7 @@ init(#amqqueue { name = QueueName } = Q) -> msg_id_status = dict:new(), known_senders = pmon:new(), - depth = 0, - master_depth = undefined + depth_delta = undefined }, rabbit_event:notify(queue_slave_created, infos(?CREATION_EVENT_KEYS, State)), @@ -395,7 +392,7 @@ infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. i(pid, _State) -> self(); i(name, #state { q = #amqqueue { name = Name } }) -> Name; i(master_pid, #state { master_pid = MPid }) -> MPid; -i(is_synchronised, #state { depth = D, master_depth = MD }) -> D =:= MD; +i(is_synchronised, #state { depth_delta = DD }) -> DD =:= 0; i(Item, _State) -> throw({bad_argument, Item}). bq_init(BQ, Q, Recover) -> @@ -770,22 +767,16 @@ process_instruction( SQ1 = dict:store(ChPid, {MQ1, PendingCh1}, SQ), State2 = State1 #state { sender_queues = SQ1, msg_id_status = MS1 }, - {State3, Delta} = - case Deliver of - false -> - BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS), - {State2 #state { backing_queue_state = BQS1 }, 1}; - {true, AckRequired} -> - {AckTag, BQS1} = BQ:publish_delivered( - AckRequired, Msg, MsgProps, ChPid, BQS), - {maybe_store_ack(AckRequired, MsgId, AckTag, - State2 #state {backing_queue_state = BQS1}), - case AckRequired of - true -> 1; - false -> 0 - end} - end, - {ok, set_synchronised(Delta, Delta, State3)}; + {ok, case Deliver of + false -> + BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS), + State2 #state { backing_queue_state = BQS1 }; + {true, AckRequired} -> + {AckTag, BQS1} = BQ:publish_delivered( + AckRequired, Msg, MsgProps, ChPid, BQS), + maybe_store_ack(AckRequired, MsgId, AckTag, + State2 #state {backing_queue_state = BQS1}) + end}; process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }}, State = #state { sender_queues = SQ, backing_queue = BQ, @@ -835,30 +826,27 @@ process_instruction({drop, Length, Dropped, AckRequired}, end, State, lists:duplicate(ToDrop, const)), {ok, case AckRequired of true -> State1; - false -> set_synchronised(-ToDrop, -Dropped, State1) + false -> set_synchronised(ToDrop - Dropped, State1) end}; process_instruction({fetch, AckRequired, MsgId, Remaining}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> QLen = BQ:len(BQS), - {State1, {Delta, MasterDelta}} = + {State1, Delta} = case {QLen - 1} of Remaining -> {{#basic_message{id = MsgId}, _IsDelivered, AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS), {maybe_store_ack(AckRequired, MsgId, AckTag, State #state { backing_queue_state = BQS1 }), - case AckRequired of - true -> {0, 0}; - false -> {-1, -1} - end}; + 0}; _ when QLen =< Remaining -> {State, case AckRequired of - true -> {0, 0}; - false -> {0, -1} + true -> 0; + false -> -1 end} end, - {ok, set_synchronised(Delta, MasterDelta, State1)}; + {ok, set_synchronised(Delta, State1)}; process_instruction({ack, MsgIds}, State = #state { backing_queue = BQ, backing_queue_state = BQS, @@ -866,7 +854,7 @@ process_instruction({ack, MsgIds}, {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA), {MsgIds1, BQS1} = BQ:ack(AckTags, BQS), [] = MsgIds1 -- MsgIds, %% ASSERTION - {ok, set_synchronised(-length(AckTags), -length(MsgIds), + {ok, set_synchronised(length(MsgIds1) - length(MsgIds), State #state { msg_id_ack = MA1, backing_queue_state = BQS1 })}; process_instruction({requeue, MsgIds}, @@ -895,8 +883,11 @@ process_instruction({sender_death, ChPid}, known_senders = pmon:demonitor(ChPid, KS) } end}; -process_instruction({depth, Depth}, State) -> - {ok, set_synchronised(0, 0, true, State #state { master_depth = Depth })}; +process_instruction({depth, Depth}, + State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + {ok, set_synchronised( + 0, true, State #state { depth_delta = Depth - BQ:depth(BQS) })}; process_instruction({delete_and_terminate, Reason}, State = #state { backing_queue = BQ, @@ -923,26 +914,23 @@ maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA, State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA), ack_num = Num + 1 }. -set_synchronised(Delta, MasterDelta, State) -> - set_synchronised(Delta, MasterDelta, false, State). - -set_synchronised(Delta, _MasterDelta, _AddAnyway, - State = #state { depth = Depth, - master_depth = undefined }) -> - State #state { depth = Depth + Delta }; -set_synchronised(Delta, MasterDelta, AddAnyway, - State = #state { depth = Depth, - master_depth = MasterDepth, - q = #amqqueue { name = QName }}) -> - Depth1 = Depth + Delta, - MasterDepth1 = MasterDepth + MasterDelta, +set_synchronised(Delta, State) -> + set_synchronised(Delta, false, State). + +set_synchronised(_Delta, _AddAnyway, + State = #state { depth_delta = undefined }) -> + State; +set_synchronised(Delta, AddAnyway, + State = #state { depth_delta = DepthDelta, + q = #amqqueue { name = QName }}) -> + DepthDelta1 = DepthDelta + Delta, %% We intentionally leave out the head where a slave becomes %% unsynchronised: we assert that can never happen. %% The `AddAnyway' param is there since in the `depth' instruction we %% receive the master depth for the first time, and we want to set the sync %% state anyway if we are synced. - case {Depth =:= MasterDepth, Depth1 =:= MasterDepth1} of - {WasSync, true} when not WasSync orelse AddAnyway -> + case DepthDelta1 =:= 0 of + true when not (DepthDelta =:= 0) orelse AddAnyway -> Self = self(), rabbit_misc:execute_mnesia_transaction( fun () -> @@ -957,7 +945,7 @@ set_synchronised(Delta, MasterDelta, AddAnyway, Q1#amqqueue{sync_slave_pids = [Self | SSPids1]}) end end); - {Same, Same} -> + _ when DepthDelta1 > 0-> ok end, - State #state { depth = Depth1, master_depth = MasterDepth1 }. + State #state { depth_delta = DepthDelta1 }. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 22829765..98c45717 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -19,7 +19,7 @@ -export([init/3, terminate/2, delete_and_terminate/2, purge/1, publish/4, publish_delivered/5, drain_confirmed/1, dropwhile/3, fetch/2, ack/2, requeue/2, len/1, is_empty/1, - pending_ack/1, set_ram_duration_target/2, ram_duration/1, + depth/1, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, is_duplicate/2, discard/3, multiple_routing_keys/0, fold/3]). @@ -681,8 +681,8 @@ len(#vqstate { len = Len }) -> Len. is_empty(State) -> 0 == len(State). -pending_ack(#vqstate { pending_ack = Ack }) -> - gb_trees:size(Ack). +depth(State = #vqstate { pending_ack = Ack }) -> + len(State) + gb_trees:size(Ack). set_ram_duration_target( DurationTarget, State = #vqstate { |