From 63c9e1de03ec055598beed856aef4f2cddc50ff0 Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Mon, 11 Oct 2010 15:54:48 +0100 Subject: Started recording the acks that are stored as full messages in memory --- src/rabbit_variable_queue.erl | 61 +++++++++++++++++++++++++------------------ 1 file changed, 36 insertions(+), 25 deletions(-) diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index cbc71bcc..208f3d56 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -220,6 +220,7 @@ q4, next_seq_id, pending_ack, + pending_ack_index, index_state, msg_store_clients, on_sync, @@ -305,6 +306,7 @@ q4 :: queue(), next_seq_id :: seq_id(), pending_ack :: dict:dictionary(), + pending_ack_index :: gb_trees:gb_tree(), index_state :: any(), msg_store_clients :: 'undefined' | {{any(), binary()}, {any(), binary()}}, @@ -407,6 +409,7 @@ init(QueueName, IsDurable, Recover) -> q4 = queue:new(), next_seq_id = NextSeqId, pending_ack = dict:new(), + pending_ack_index = gb_trees:empty(), index_state = IndexState1, msg_store_clients = {{PersistentClient, PRef}, {TransientClient, TRef}}, @@ -509,27 +512,24 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent }, out_counter = OutCount, in_counter = InCount, persistent_count = PCount, - pending_ack = PA, durable = IsDurable }) -> IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = (msg_status(IsPersistent1, SeqId, Msg)) #msg_status { is_delivered = true }, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), - PA1 = record_pending_ack(m(MsgStatus1), PA), + State2 = record_pending_ack(m(MsgStatus1), State1), PCount1 = PCount + one_if(IsPersistent1), - {SeqId, a(State1 #vqstate { next_seq_id = SeqId + 1, + {SeqId, a(State2 #vqstate { next_seq_id = SeqId + 1, out_counter = OutCount + 1, in_counter = InCount + 1, - persistent_count = PCount1, - pending_ack = PA1 })}. + persistent_count = PCount1 })}. 
fetch(AckRequired, State = #vqstate { q4 = Q4, ram_msg_count = RamMsgCount, out_counter = OutCount, index_state = IndexState, len = Len, - persistent_count = PCount, - pending_ack = PA }) -> + persistent_count = PCount }) -> case queue:out(Q4) of {empty, _Q4} -> case fetch_from_q3_to_q4(State) of @@ -560,24 +560,24 @@ fetch(AckRequired, State = #vqstate { q4 = Q4, end, %% 3. If an ack is required, add something sensible to PA - {AckTag, PA1} = case AckRequired of - true -> PA2 = record_pending_ack( - MsgStatus #msg_status { - is_delivered = true }, PA), - {SeqId, PA2}; - false -> {blank_ack, PA} + {AckTag, State1} = case AckRequired of + true -> StateN = record_pending_ack( + MsgStatus #msg_status { + is_delivered = true }, + State), + {SeqId, StateN}; + false -> {blank_ack, State} end, PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), Len1 = Len - 1, {{Msg, IsDelivered, AckTag, Len1}, - a(State #vqstate { q4 = Q4a, + a(State1 #vqstate { q4 = Q4a, ram_msg_count = RamMsgCount - 1, out_counter = OutCount + 1, index_state = IndexState2, len = Len1, - persistent_count = PCount1, - pending_ack = PA1 })} + persistent_count = PCount1})} end. ack(AckTags, State) -> @@ -1090,19 +1090,27 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, record_pending_ack(#msg_status { guid = Guid, seq_id = SeqId, is_persistent = IsPersistent, - msg_on_disk = MsgOnDisk } = MsgStatus, PA) -> - AckEntry = case MsgOnDisk of - true -> {IsPersistent, Guid}; - false -> MsgStatus - end, - dict:store(SeqId, AckEntry, PA). + msg_on_disk = MsgOnDisk } = MsgStatus, + State = #vqstate { pending_ack = PA, + pending_ack_index = PAI }) -> + {AckEntry, PAI1} = + case MsgOnDisk of + true -> + {{IsPersistent, Guid}, PAI}; + false -> + {MsgStatus, gb_trees:insert(SeqId, Guid, PAI)} + end, + PA1 = dict:store(SeqId, AckEntry, PA), + State #vqstate { pending_ack = PA1, pending_ack_index = PAI1 }. 
+%% TODO: On remove, need to prevent any seqids that remove_pending_ack(KeepPersistent, State = #vqstate { pending_ack = PA, index_state = IndexState }) -> {SeqIds, GuidsByStore} = dict:fold(fun accumulate_ack/3, {[], orddict:new()}, PA), - State1 = State #vqstate { pending_ack = dict:new() }, + State1 = State #vqstate { pending_ack = dict:new(), + pending_ack_index = gb_trees:empty() }, case KeepPersistent of true -> case orddict:find(?TRANSIENT_MSG_STORE, GuidsByStore) of error -> State1; @@ -1124,11 +1132,14 @@ ack(MsgStoreFun, Fun, AckTags, State) -> {{SeqIds, GuidsByStore}, State1 = #vqstate { index_state = IndexState, persistent_count = PCount }} = lists:foldl( - fun (SeqId, {Acc, State2 = #vqstate { pending_ack = PA }}) -> + fun (SeqId, {Acc, State2 = #vqstate { pending_ack = PA, + pending_ack_index = PAI }}) -> {ok, AckEntry} = dict:find(SeqId, PA), {accumulate_ack(SeqId, AckEntry, Acc), Fun(AckEntry, State2 #vqstate { - pending_ack = dict:erase(SeqId, PA) })} + pending_ack = dict:erase(SeqId, PA), + pending_ack_index = + gb_trees:delete_any(SeqId, PAI)})} end, {{[], orddict:new()}, State}, AckTags), IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState), ok = orddict:fold(fun (MsgStore, Guids, ok) -> -- cgit v1.2.1 From 13b4bea3b3f437cb1f066dd9e16b7611cf4b2a7c Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Thu, 14 Oct 2010 12:57:44 +0100 Subject: rename pending_ack_index to ram_ack_index --- src/rabbit_variable_queue.erl | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 208f3d56..28055af3 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -220,7 +220,7 @@ q4, next_seq_id, pending_ack, - pending_ack_index, + ram_ack_index, index_state, msg_store_clients, on_sync, @@ -306,7 +306,7 @@ q4 :: queue(), next_seq_id :: seq_id(), pending_ack :: dict:dictionary(), - pending_ack_index :: gb_trees:gb_tree(), + ram_ack_index 
:: gb_tree(), index_state :: any(), msg_store_clients :: 'undefined' | {{any(), binary()}, {any(), binary()}}, @@ -409,7 +409,7 @@ init(QueueName, IsDurable, Recover) -> q4 = queue:new(), next_seq_id = NextSeqId, pending_ack = dict:new(), - pending_ack_index = gb_trees:empty(), + ram_ack_index = gb_trees:empty(), index_state = IndexState1, msg_store_clients = {{PersistentClient, PRef}, {TransientClient, TRef}}, @@ -1092,16 +1092,16 @@ record_pending_ack(#msg_status { guid = Guid, seq_id = SeqId, is_persistent = IsPersistent, msg_on_disk = MsgOnDisk } = MsgStatus, State = #vqstate { pending_ack = PA, - pending_ack_index = PAI }) -> - {AckEntry, PAI1} = + ram_ack_index = RAI }) -> + {AckEntry, RAI1} = case MsgOnDisk of true -> - {{IsPersistent, Guid}, PAI}; + {{IsPersistent, Guid}, RAI}; false -> - {MsgStatus, gb_trees:insert(SeqId, Guid, PAI)} + {MsgStatus, gb_trees:insert(SeqId, Guid, RAI)} end, PA1 = dict:store(SeqId, AckEntry, PA), - State #vqstate { pending_ack = PA1, pending_ack_index = PAI1 }. + State #vqstate { pending_ack = PA1, ram_ack_index = RAI1 }. 
%% TODO: On remove, need to prevent any seqids that remove_pending_ack(KeepPersistent, @@ -1110,7 +1110,7 @@ remove_pending_ack(KeepPersistent, {SeqIds, GuidsByStore} = dict:fold(fun accumulate_ack/3, {[], orddict:new()}, PA), State1 = State #vqstate { pending_ack = dict:new(), - pending_ack_index = gb_trees:empty() }, + ram_ack_index = gb_trees:empty() }, case KeepPersistent of true -> case orddict:find(?TRANSIENT_MSG_STORE, GuidsByStore) of error -> State1; @@ -1133,13 +1133,13 @@ ack(MsgStoreFun, Fun, AckTags, State) -> persistent_count = PCount }} = lists:foldl( fun (SeqId, {Acc, State2 = #vqstate { pending_ack = PA, - pending_ack_index = PAI }}) -> + ram_ack_index = RAI }}) -> {ok, AckEntry} = dict:find(SeqId, PA), {accumulate_ack(SeqId, AckEntry, Acc), Fun(AckEntry, State2 #vqstate { pending_ack = dict:erase(SeqId, PA), - pending_ack_index = - gb_trees:delete_any(SeqId, PAI)})} + ram_ack_index = + gb_trees:delete_any(SeqId, RAI)})} end, {{[], orddict:new()}, State}, AckTags), IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState), ok = orddict:fold(fun (MsgStore, Guids, ok) -> -- cgit v1.2.1 From ae551e5eed1320800f0851d267f3c8192abf64ec Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Thu, 14 Oct 2010 14:19:21 +0100 Subject: experimenting with moving acks to disk --- src/rabbit_variable_queue.erl | 61 ++++++++++++++++++++++++++++++++----------- 1 file changed, 46 insertions(+), 15 deletions(-) diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 28055af3..90fbcf81 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -232,6 +232,7 @@ duration_target, target_ram_msg_count, + target_ram_ack_count, ram_msg_count, ram_msg_count_prev, ram_index_count, @@ -319,6 +320,7 @@ transient_threshold :: non_neg_integer(), duration_target :: number() | 'infinity', target_ram_msg_count :: non_neg_integer() | 'infinity', + target_ram_ack_count :: non_neg_integer() | 'infinity', ram_msg_count :: non_neg_integer(), 
ram_msg_count_prev :: non_neg_integer(), ram_index_count :: non_neg_integer(), @@ -422,6 +424,7 @@ init(QueueName, IsDurable, Recover) -> duration_target = infinity, target_ram_msg_count = infinity, + target_ram_ack_count = infinity, ram_msg_count = 0, ram_msg_count_prev = 0, ram_index_count = 0, @@ -651,10 +654,16 @@ len(#vqstate { len = Len }) -> Len. is_empty(State) -> 0 == len(State). +within_limits(Previous, Current) -> + Current == infinity + orelse (Previous =/= infinity andalso Current >= Previous). + set_ram_duration_target(DurationTarget, State = #vqstate { + len = Len, pending_ack = PA, rates = #rates { avg_egress = AvgEgressRate, avg_ingress = AvgIngressRate }, + target_ram_ack_count = TargetRamAckCount, target_ram_msg_count = TargetRamMsgCount }) -> Rate = AvgEgressRate + AvgIngressRate, TargetRamMsgCount1 = @@ -662,13 +671,24 @@ set_ram_duration_target(DurationTarget, infinity -> infinity; _ -> trunc(DurationTarget * Rate) %% msgs = sec * msgs/sec end, - State1 = State #vqstate { target_ram_msg_count = TargetRamMsgCount1, + + io:format("MSG: ~p[~p]~n", [TargetRamMsgCount1, Len]), + + TargetRamAckCount1 = case TargetRamMsgCount1 == infinity orelse Len == 0 of + true -> TargetRamMsgCount1; + _ -> (TargetRamMsgCount1 / Len) * dict:size(PA) + end, + + io:format("ACK: ~p~n", [TargetRamAckCount1]), + + State1 = State #vqstate { target_ram_ack_count = TargetRamAckCount1, + target_ram_msg_count = TargetRamMsgCount1, duration_target = DurationTarget }, - a(case TargetRamMsgCount1 == infinity orelse - (TargetRamMsgCount =/= infinity andalso - TargetRamMsgCount1 >= TargetRamMsgCount) of + + a(case within_limits(TargetRamMsgCount, TargetRamMsgCount1) andalso + within_limits(TargetRamAckCount, TargetRamAckCount1) of true -> State1; - false -> reduce_memory_use(State1) + false -> io:format("Reducing~n"), reduce_memory_use(State1) end). 
ram_duration(State = #vqstate { @@ -1188,21 +1208,32 @@ find_persistent_count(LensByStore) -> %% perpetually reporting the need for a conversion when no such %% conversion is needed. That in turn could cause an infinite loop. reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, State) -> - {Reduce, State1} = case chunk_size(State #vqstate.ram_msg_count, + Size = gb_trees:size(State #vqstate.ram_ack_index), + State1 = case chunk_size(Size, State #vqstate.target_ram_ack_count) of + 0 -> State; + S -> limit_ram_acks(S, State) + end, + {Reduce, State2} = case chunk_size(State #vqstate.ram_msg_count, State #vqstate.target_ram_msg_count) of - 0 -> {false, State}; - S1 -> {true, AlphaBetaFun(S1, State)} + 0 -> {false, State1}; + S1 -> {true, AlphaBetaFun(S1, State1)} end, - case State1 #vqstate.target_ram_msg_count of - infinity -> {Reduce, State1}; - 0 -> {Reduce, BetaDeltaFun(State1)}; - _ -> case chunk_size(State1 #vqstate.ram_index_count, - permitted_ram_index_count(State1)) of - ?IO_BATCH_SIZE = S2 -> {true, BetaGammaFun(S2, State1)}; - _ -> {Reduce, State1} + case State2 #vqstate.target_ram_msg_count of + infinity -> {Reduce, State2}; + 0 -> {Reduce, BetaDeltaFun(State2)}; + _ -> case chunk_size(State2 #vqstate.ram_index_count, + permitted_ram_index_count(State2)) of + ?IO_BATCH_SIZE = S2 -> {true, BetaGammaFun(S2, State2)}; + _ -> {Reduce, State2} end end. +limit_ram_acks(0, State) -> + State; +limit_ram_acks(Quota, State) -> + + + reduce_memory_use(State) -> {_, State1} = reduce_memory_use(fun push_alphas_to_betas/2, fun limit_ram_index/2, -- cgit v1.2.1 From 3288425642fb401db913e5bf783515ebf218957c Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Thu, 14 Oct 2010 19:22:11 +0100 Subject: rough sketch of pushing ram acks to disk. doesnt actually work... 
--- src/rabbit_variable_queue.erl | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 90fbcf81..f09fb4d4 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1211,7 +1211,7 @@ reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, State) -> Size = gb_trees:size(State #vqstate.ram_ack_index), State1 = case chunk_size(Size, State #vqstate.target_ram_ack_count) of 0 -> State; - S -> limit_ram_acks(S, State) + S -> io:format("Limiting~n"), limit_ram_acks(S, State) end, {Reduce, State2} = case chunk_size(State #vqstate.ram_msg_count, State #vqstate.target_ram_msg_count) of @@ -1230,8 +1230,24 @@ reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, State) -> limit_ram_acks(0, State) -> State; -limit_ram_acks(Quota, State) -> - +limit_ram_acks(Quota, State = #vqstate { pending_ack = PA, + ram_ack_index = RAI }) -> + io:format("Limiting acks~p~n", [Quota]), + case gb_trees:is_empty(RAI) of + true -> + State; + false -> + {SeqId, Guid, RAI1} = gb_trees:take_largest(RAI), + io:format("Largest~p~n", [SeqId]), + MsgStatus = dict:fetch(SeqId, PA), + State1 = maybe_write_to_disk(true, false, MsgStatus, State), + io:format("Wrote~n"), + limit_ram_acks(Quota - 1, + State1 #vqstate { + pending_ack = + dict:update(SeqId, {false, Guid}, PA), + ram_ack_index = RAI1 }) + end. 
reduce_memory_use(State) -> -- cgit v1.2.1 From 423a27191f248d75362ba6675c401d6968cf8497 Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Tue, 2 Nov 2010 16:57:38 +0000 Subject: Calculating RAM duration takes into consideration ack rates --- src/rabbit_variable_queue.erl | 35 +++++++++++++++++++++++++++++------ 1 file changed, 29 insertions(+), 6 deletions(-) diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 97833991..d2f79eb6 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -235,6 +235,7 @@ target_ram_ack_count, ram_msg_count, ram_msg_count_prev, + ram_ack_count_prev, ram_index_count, out_counter, in_counter, @@ -706,19 +707,34 @@ ram_duration(State = #vqstate { rates = #rates { egress = Egress, ingress = Ingress, timestamp = Timestamp } = Rates, + ack_rates = #rates { egress = AckEgress, + ingress = AckIngress } = ARates, in_counter = InCount, out_counter = OutCount, + ack_in_counter = AckInCount, + ack_out_counter = AckOutCount, ram_msg_count = RamMsgCount, - ram_msg_count_prev = RamMsgCountPrev }) -> + ram_msg_count_prev = RamMsgCountPrev, + ram_ack_index = RamAckIndex, + ram_ack_count_prev = RamAckCountPrev }) -> Now = now(), {AvgEgressRate, Egress1} = update_rate(Now, Timestamp, OutCount, Egress), {AvgIngressRate, Ingress1} = update_rate(Now, Timestamp, InCount, Ingress), - Duration = %% msgs / (msgs/sec) == sec + {AvgAckEgressRate, AckEgress1} = + update_rate(Now, Timestamp, AckOutCount, AckEgress), + {AvgAckIngressRate, AckIngress1} = + update_rate(Now, Timestamp, AckInCount, AckIngress), + + RamAckCount = gb_trees:size(RamAckIndex), + + Duration = %% msgs+acks / (msgs+acks/sec) == sec case AvgEgressRate == 0 andalso AvgIngressRate == 0 of true -> infinity; - false -> (RamMsgCountPrev + RamMsgCount) / - (2 * (AvgEgressRate + AvgIngressRate)) + false -> (RamMsgCountPrev + RamMsgCount + + RamAckCount + RamAckCountPrev) / + (2 * (AvgEgressRate + AvgIngressRate + + AvgAckEgressRate + 
AvgAckIngressRate)) end, {Duration, State #vqstate { @@ -728,9 +744,15 @@ ram_duration(State = #vqstate { avg_egress = AvgEgressRate, avg_ingress = AvgIngressRate, timestamp = Now }, + ack_rates = ARates #rates { + egress = AckEgress1, + ingress = AckIngress1, + avg_egress = AvgAckEgressRate, + avg_ingress = AvgAckIngressRate }, in_counter = 0, out_counter = 0, - ram_msg_count_prev = RamMsgCount }}. + ram_msg_count_prev = RamMsgCount, + ram_ack_count_prev = RamAckCount }}. needs_idle_timeout(State = #vqstate { on_sync = ?BLANK_SYNC }) -> {Res, _State} = reduce_memory_use(fun (_Quota, State1) -> State1 end, @@ -982,6 +1004,7 @@ init(IsDurable, IndexState, DeltaCount, Terms, target_ram_msg_count = infinity, ram_msg_count = 0, ram_msg_count_prev = 0, + ram_ack_count_prev = 0, ram_index_count = 0, out_counter = 0, in_counter = 0, @@ -996,7 +1019,7 @@ init(IsDurable, IndexState, DeltaCount, Terms, ingress = {Now, 0}, avg_egress = 0.0, avg_ingress = 0.0, - timestamp = Now } }, + timestamp = undefined } }, a(maybe_deltas_to_betas(State)). msg_store_callback(PersistentGuids, Pubs, AckTags, Fun, MsgPropsFun) -> -- cgit v1.2.1 From 3499cb05a36e41b456a807935811e097bd4a676d Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Wed, 3 Nov 2010 11:29:38 +0000 Subject: Further experimentation. Still not quite working --- src/rabbit_amqqueue_process.erl | 3 +++ src/rabbit_variable_queue.erl | 42 +++++++++++++++++++++++++++++++---------- 2 files changed, 35 insertions(+), 10 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index fe2c975b..83b360b8 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -239,6 +239,7 @@ stop_sync_timer(State = #q{sync_timer_ref = TRef}) -> State#q{sync_timer_ref = undefined}. 
ensure_rate_timer(State = #q{rate_timer_ref = undefined}) -> + io:format("Ensuring rate timer~n"), {ok, TRef} = timer:apply_after( ?RAM_DURATION_UPDATE_INTERVAL, rabbit_amqqueue, update_ram_duration, @@ -963,7 +964,9 @@ handle_cast({flush, ChPid}, State) -> handle_cast(update_ram_duration, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> + io:format("Before ram duration~n"), {RamDuration, BQS1} = BQ:ram_duration(BQS), + io:format("RamDuration~p~n", [RamDuration]), DesiredDuration = rabbit_memory_monitor:report_ram_duration(self(), RamDuration), BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index d2f79eb6..ad62248a 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -688,8 +688,13 @@ set_ram_duration_target(DurationTarget, State = #vqstate { rates = #rates { avg_egress = AvgEgressRate, avg_ingress = AvgIngressRate }, + ack_rates = + #rates { avg_egress = AvgAckEgressRate, + avg_ingress = AvgAckIngressRate }, target_ram_msg_count = TargetRamMsgCount }) -> - Rate = AvgEgressRate + AvgIngressRate, + io:format("set:~p~n", [DurationTarget]), + Rate = AvgEgressRate + AvgIngressRate + AvgAckEgressRate + + AvgAckIngressRate, TargetRamMsgCount1 = case DurationTarget of infinity -> infinity; @@ -717,6 +722,7 @@ ram_duration(State = #vqstate { ram_msg_count_prev = RamMsgCountPrev, ram_ack_index = RamAckIndex, ram_ack_count_prev = RamAckCountPrev }) -> + io:format("Setting ram duration~n"), Now = now(), {AvgEgressRate, Egress1} = update_rate(Now, Timestamp, OutCount, Egress), {AvgIngressRate, Ingress1} = update_rate(Now, Timestamp, InCount, Ingress), @@ -729,7 +735,8 @@ ram_duration(State = #vqstate { RamAckCount = gb_trees:size(RamAckIndex), Duration = %% msgs+acks / (msgs+acks/sec) == sec - case AvgEgressRate == 0 andalso AvgIngressRate == 0 of + case AvgEgressRate == 0 andalso AvgIngressRate == 0 andalso + AvgAckEgressRate == 0 andalso 
AvgAckIngressRate == 0 of true -> infinity; false -> (RamMsgCountPrev + RamMsgCount + RamAckCount + RamAckCountPrev) / @@ -737,6 +744,7 @@ ram_duration(State = #vqstate { AvgAckEgressRate + AvgAckIngressRate)) end, + io:format("Duration:~p~n", [Duration]), {Duration, State #vqstate { rates = Rates #rates { egress = Egress1, @@ -1335,14 +1343,11 @@ find_persistent_count(LensByStore) -> %% perpetually reporting the need for a conversion when no such %% conversion is needed. That in turn could cause an infinite loop. reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, State) -> - Size = gb_trees:size(State #vqstate.ram_ack_index), - State1 = case chunk_size(Size, State #vqstate.target_ram_ack_count) of - 0 -> State; - S -> io:format("Limiting~n"), limit_ram_acks(S, State) - end, - {Reduce, State2} = case chunk_size(State #vqstate.ram_msg_count, - State #vqstate.target_ram_msg_count) of - 0 -> {false, State1}; + %%io:format("Reducing mem~p~p~n", [State #vqstate.target_ram_msg_count, State #vqstate.ram_msg_count]), + {ReduceAck, State1} = {false, State}, %%reduce_ack_memory_use(State), + {Reduce, State2} = case chunk_size(State1 #vqstate.ram_msg_count, + State1 #vqstate.target_ram_msg_count) of + 0 -> {ReduceAck, State1}; S1 -> {true, AlphaBetaFun(S1, State1)} end, case State2 #vqstate.target_ram_msg_count of @@ -1355,6 +1360,23 @@ reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, State) -> end end. 
+reduce_ack_memory_use(State = #vqstate { target_ram_msg_count = infinity }) -> + {false, State}; +reduce_ack_memory_use(State = #vqstate {target_ram_msg_count = TargetRamMsgCount, + ram_msg_count = RamMsgCount, + ram_ack_index = RamAckIndex} ) -> + io:format("RAI:~p,TRMC:~p,RMC:~p~n", [gb_trees:size(RamAckIndex), TargetRamMsgCount, RamMsgCount]), + PermittedAckCount = case TargetRamMsgCount > RamMsgCount of + true -> TargetRamMsgCount - RamMsgCount; + false -> 0 + end, + case chunk_size(gb_trees:size(RamAckIndex), PermittedAckCount) of + 0 -> {false, State}; + C -> {true, limit_ram_acks(C, State)} + end. + + + limit_ram_acks(0, State) -> State; limit_ram_acks(Quota, State = #vqstate { pending_ack = PA, -- cgit v1.2.1 From a14752efdaac19e4f771673e06c26c7b2037db39 Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Wed, 3 Nov 2010 14:30:39 +0000 Subject: Roughly working ack shedding --- src/rabbit_amqqueue_process.erl | 5 +++++ src/rabbit_variable_queue.erl | 37 ++++++++++++++++++++++--------------- 2 files changed, 27 insertions(+), 15 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 83b360b8..9699ac32 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -969,6 +969,7 @@ handle_cast(update_ram_duration, State = #q{backing_queue = BQ, io:format("RamDuration~p~n", [RamDuration]), DesiredDuration = rabbit_memory_monitor:report_ram_duration(self(), RamDuration), + io:format("Desired duration:~p~n", [DesiredDuration]), BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), noreply(State#q{rate_timer_ref = just_measured, backing_queue_state = BQS2}); @@ -1025,11 +1026,15 @@ handle_info(Info, State) -> ?LOGDEBUG("Info in queue: ~p~n", [Info]), {stop, {unhandled_info, Info}, State}. +handle_post_hibernate(_) -> + io:format("hello~n"). 
+ handle_pre_hibernate(State = #q{backing_queue_state = undefined}) -> {hibernate, State}; handle_pre_hibernate(State = #q{backing_queue = BQ, backing_queue_state = BQS, stats_timer = StatsTimer}) -> + io:format("Hibernating~p~n", [self()]), BQS1 = BQ:handle_pre_hibernate(BQS), %% no activity for a while == 0 egress and ingress rates DesiredDuration = diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index ad62248a..491d4e38 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -722,7 +722,7 @@ ram_duration(State = #vqstate { ram_msg_count_prev = RamMsgCountPrev, ram_ack_index = RamAckIndex, ram_ack_count_prev = RamAckCountPrev }) -> - io:format("Setting ram duration~n"), + io:format("Ram duration~n"), Now = now(), {AvgEgressRate, Egress1} = update_rate(Now, Timestamp, OutCount, Egress), {AvgIngressRate, Ingress1} = update_rate(Now, Timestamp, InCount, Ingress), @@ -766,6 +766,7 @@ needs_idle_timeout(State = #vqstate { on_sync = ?BLANK_SYNC }) -> {Res, _State} = reduce_memory_use(fun (_Quota, State1) -> State1 end, fun (_Quota, State1) -> State1 end, fun (State1) -> State1 end, + fun (_Quota, State1) -> State1 end, State), Res; needs_idle_timeout(_State) -> @@ -779,6 +780,7 @@ handle_pre_hibernate(State = #vqstate { index_state = IndexState }) -> status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, len = Len, pending_ack = PA, + ram_ack_index = RAI, on_sync = #sync { funs = From }, target_ram_msg_count = TargetRamMsgCount, ram_msg_count = RamMsgCount, @@ -787,7 +789,10 @@ status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, persistent_count = PersistentCount, rates = #rates { avg_egress = AvgEgressRate, - avg_ingress = AvgIngressRate } }) -> + avg_ingress = AvgIngressRate }, + ack_rates = #rates { + avg_egress = AvgAckEgressRate, + avg_ingress = AvgAckIngressRate } }) -> [ {q1 , queue:len(Q1)}, {q2 , bpqueue:len(Q2)}, {delta , Delta}, @@ -795,6 +800,7 @@ status(#vqstate { q1 = Q1, 
q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, {q4 , queue:len(Q4)}, {len , Len}, {pending_acks , dict:size(PA)}, + {ram_ack_count , gb_trees:size(RAI)}, {outstanding_txns , length(From)}, {target_ram_msg_count , TargetRamMsgCount}, {ram_msg_count , RamMsgCount}, @@ -802,7 +808,9 @@ status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, {next_seq_id , NextSeqId}, {persistent_count , PersistentCount}, {avg_egress_rate , AvgEgressRate}, - {avg_ingress_rate , AvgIngressRate} ]. + {avg_ingress_rate , AvgIngressRate}, + {avg_ack_egress_rate , AvgAckEgressRate}, + {avg_ack_ingress_rate , AvgAckIngressRate}]. %%---------------------------------------------------------------------------- %% Minor helpers @@ -1342,9 +1350,8 @@ find_persistent_count(LensByStore) -> %% one segment's worth of messages in q3 - and thus would risk %% perpetually reporting the need for a conversion when no such %% conversion is needed. That in turn could cause an infinite loop. -reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, State) -> - %%io:format("Reducing mem~p~p~n", [State #vqstate.target_ram_msg_count, State #vqstate.ram_msg_count]), - {ReduceAck, State1} = {false, State}, %%reduce_ack_memory_use(State), +reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, AckFun, State) -> + {ReduceAck, State1} = reduce_ack_memory_use(AckFun, State), {Reduce, State2} = case chunk_size(State1 #vqstate.ram_msg_count, State1 #vqstate.target_ram_msg_count) of 0 -> {ReduceAck, State1}; @@ -1360,9 +1367,9 @@ reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, State) -> end end. 
-reduce_ack_memory_use(State = #vqstate { target_ram_msg_count = infinity }) -> +reduce_ack_memory_use(_AckFun, State = #vqstate { target_ram_msg_count = infinity }) -> {false, State}; -reduce_ack_memory_use(State = #vqstate {target_ram_msg_count = TargetRamMsgCount, +reduce_ack_memory_use(AckFun, State = #vqstate {target_ram_msg_count = TargetRamMsgCount, ram_msg_count = RamMsgCount, ram_ack_index = RamAckIndex} ) -> io:format("RAI:~p,TRMC:~p,RMC:~p~n", [gb_trees:size(RamAckIndex), TargetRamMsgCount, RamMsgCount]), @@ -1372,7 +1379,7 @@ reduce_ack_memory_use(State = #vqstate {target_ram_msg_count = TargetRamMsgCount end, case chunk_size(gb_trees:size(RamAckIndex), PermittedAckCount) of 0 -> {false, State}; - C -> {true, limit_ram_acks(C, State)} + C -> {true, AckFun(C, State)} end. @@ -1381,20 +1388,19 @@ limit_ram_acks(0, State) -> State; limit_ram_acks(Quota, State = #vqstate { pending_ack = PA, ram_ack_index = RAI }) -> - io:format("Limiting acks~p~n", [Quota]), case gb_trees:is_empty(RAI) of true -> State; false -> {SeqId, Guid, RAI1} = gb_trees:take_largest(RAI), - io:format("Largest~p~n", [SeqId]), - MsgStatus = dict:fetch(SeqId, PA), - State1 = maybe_write_to_disk(true, false, MsgStatus, State), - io:format("Wrote~n"), + MsgStatus = #msg_status { + guid = Guid, %% ASSERTION + msg_props = MsgProps } = dict:fetch(SeqId, PA), + {_, State1} = maybe_write_to_disk(true, false, MsgStatus, State), limit_ram_acks(Quota - 1, State1 #vqstate { pending_ack = - dict:update(SeqId, {false, Guid}, PA), + dict:store(SeqId, {false, Guid, MsgProps}, PA), ram_ack_index = RAI1 }) end. @@ -1403,6 +1409,7 @@ reduce_memory_use(State) -> {_, State1} = reduce_memory_use(fun push_alphas_to_betas/2, fun limit_ram_index/2, fun push_betas_to_deltas/1, + fun limit_ram_acks/2, State), State1. 
-- cgit v1.2.1 From 1a45fb36b79da3c4c2531f36fd3647fd0e8b20ca Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Wed, 3 Nov 2010 15:19:05 +0000 Subject: cleaned up some io:format --- src/rabbit_amqqueue_process.erl | 3 --- src/rabbit_variable_queue.erl | 6 +----- 2 files changed, 1 insertion(+), 8 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 9699ac32..2cf7f268 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1026,9 +1026,6 @@ handle_info(Info, State) -> ?LOGDEBUG("Info in queue: ~p~n", [Info]), {stop, {unhandled_info, Info}, State}. -handle_post_hibernate(_) -> - io:format("hello~n"). - handle_pre_hibernate(State = #q{backing_queue_state = undefined}) -> {hibernate, State}; handle_pre_hibernate(State = #q{backing_queue = BQ, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 491d4e38..3a8a913f 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -692,7 +692,6 @@ set_ram_duration_target(DurationTarget, #rates { avg_egress = AvgAckEgressRate, avg_ingress = AvgAckIngressRate }, target_ram_msg_count = TargetRamMsgCount }) -> - io:format("set:~p~n", [DurationTarget]), Rate = AvgEgressRate + AvgIngressRate + AvgAckEgressRate + AvgAckIngressRate, TargetRamMsgCount1 = @@ -705,7 +704,7 @@ set_ram_duration_target(DurationTarget, (TargetRamMsgCount =/= infinity andalso TargetRamMsgCount1 >= TargetRamMsgCount) of true -> State1; - false -> io:format("Reducing~n"), reduce_memory_use(State1) + false -> reduce_memory_use(State1) end). 
ram_duration(State = #vqstate { @@ -722,7 +721,6 @@ ram_duration(State = #vqstate { ram_msg_count_prev = RamMsgCountPrev, ram_ack_index = RamAckIndex, ram_ack_count_prev = RamAckCountPrev }) -> - io:format("Ram duration~n"), Now = now(), {AvgEgressRate, Egress1} = update_rate(Now, Timestamp, OutCount, Egress), {AvgIngressRate, Ingress1} = update_rate(Now, Timestamp, InCount, Ingress), @@ -744,7 +742,6 @@ ram_duration(State = #vqstate { AvgAckEgressRate + AvgAckIngressRate)) end, - io:format("Duration:~p~n", [Duration]), {Duration, State #vqstate { rates = Rates #rates { egress = Egress1, @@ -1372,7 +1369,6 @@ reduce_ack_memory_use(_AckFun, State = #vqstate { target_ram_msg_count = infinit reduce_ack_memory_use(AckFun, State = #vqstate {target_ram_msg_count = TargetRamMsgCount, ram_msg_count = RamMsgCount, ram_ack_index = RamAckIndex} ) -> - io:format("RAI:~p,TRMC:~p,RMC:~p~n", [gb_trees:size(RamAckIndex), TargetRamMsgCount, RamMsgCount]), PermittedAckCount = case TargetRamMsgCount > RamMsgCount of true -> TargetRamMsgCount - RamMsgCount; false -> 0 -- cgit v1.2.1 From f0eaffef9b6eefb01721deb88d8c7b98c5753875 Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Thu, 4 Nov 2010 12:25:43 +0000 Subject: Removed io:format calls --- src/rabbit_amqqueue_process.erl | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 2cf7f268..fe2c975b 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -239,7 +239,6 @@ stop_sync_timer(State = #q{sync_timer_ref = TRef}) -> State#q{sync_timer_ref = undefined}. 
ensure_rate_timer(State = #q{rate_timer_ref = undefined}) -> - io:format("Ensuring rate timer~n"), {ok, TRef} = timer:apply_after( ?RAM_DURATION_UPDATE_INTERVAL, rabbit_amqqueue, update_ram_duration, @@ -964,12 +963,9 @@ handle_cast({flush, ChPid}, State) -> handle_cast(update_ram_duration, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - io:format("Before ram duration~n"), {RamDuration, BQS1} = BQ:ram_duration(BQS), - io:format("RamDuration~p~n", [RamDuration]), DesiredDuration = rabbit_memory_monitor:report_ram_duration(self(), RamDuration), - io:format("Desired duration:~p~n", [DesiredDuration]), BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), noreply(State#q{rate_timer_ref = just_measured, backing_queue_state = BQS2}); @@ -1031,7 +1027,6 @@ handle_pre_hibernate(State = #q{backing_queue_state = undefined}) -> handle_pre_hibernate(State = #q{backing_queue = BQ, backing_queue_state = BQS, stats_timer = StatsTimer}) -> - io:format("Hibernating~p~n", [self()]), BQS1 = BQ:handle_pre_hibernate(BQS), %% no activity for a while == 0 egress and ingress rates DesiredDuration = -- cgit v1.2.1 From c70c248fb698be85b8c30e4b9d20dd154e353091 Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Thu, 4 Nov 2010 12:32:21 +0000 Subject: clean up --- src/rabbit_variable_queue.erl | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 3a8a913f..3b3f9c8d 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1248,7 +1248,7 @@ record_pending_ack(#msg_status { seq_id = SeqId, ack_in_counter = AckInCount}) -> {AckEntry, RAI1} = case MsgOnDisk of - true -> + true -> {{IsPersistent, Guid, MsgProps}, RAI}; false -> {MsgStatus, gb_trees:insert(SeqId, Guid, RAI)} @@ -1364,11 +1364,14 @@ reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, AckFun, State) -> end end. 
-reduce_ack_memory_use(_AckFun, State = #vqstate { target_ram_msg_count = infinity }) -> +reduce_ack_memory_use(_AckFun, + State = #vqstate { target_ram_msg_count = infinity }) -> {false, State}; -reduce_ack_memory_use(AckFun, State = #vqstate {target_ram_msg_count = TargetRamMsgCount, - ram_msg_count = RamMsgCount, - ram_ack_index = RamAckIndex} ) -> +reduce_ack_memory_use(AckFun, + State = #vqstate { + target_ram_msg_count = TargetRamMsgCount, + ram_msg_count = RamMsgCount, + ram_ack_index = RamAckIndex} ) -> PermittedAckCount = case TargetRamMsgCount > RamMsgCount of true -> TargetRamMsgCount - RamMsgCount; false -> 0 @@ -1395,7 +1398,7 @@ limit_ram_acks(Quota, State = #vqstate { pending_ack = PA, {_, State1} = maybe_write_to_disk(true, false, MsgStatus, State), limit_ram_acks(Quota - 1, State1 #vqstate { - pending_ack = + pending_ack = dict:store(SeqId, {false, Guid, MsgProps}, PA), ram_ack_index = RAI1 }) end. -- cgit v1.2.1 From 39740b60706338a45fec7c856c40026d086e5d0f Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Thu, 4 Nov 2010 12:52:46 +0000 Subject: Removed stray TODO --- src/rabbit_variable_queue.erl | 1 - 1 file changed, 1 deletion(-) diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 3b3f9c8d..2ef4ee27 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1258,7 +1258,6 @@ record_pending_ack(#msg_status { seq_id = SeqId, ram_ack_index = RAI1, ack_in_counter = AckInCount + 1}. -%% TODO: On remove, need to prevent any seqids that remove_pending_ack(KeepPersistent, State = #vqstate { pending_ack = PA, index_state = IndexState, -- cgit v1.2.1 From 867c225ccf18ae85529f3125c82e651e26ab6632 Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Thu, 4 Nov 2010 16:00:08 +0000 Subject: Added a test for the ram ack flushing. 
Tweaked the way RAM is shared between acks and msgs --- src/rabbit_tests.erl | 33 ++++++++++++++++++++++++++++++++- src/rabbit_variable_queue.erl | 20 +++++++++++++++----- 2 files changed, 47 insertions(+), 6 deletions(-) diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 71b23e01..8efcf239 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1865,9 +1865,40 @@ test_variable_queue() -> fun test_variable_queue_partial_segments_delta_thing/1, fun test_variable_queue_all_the_bits_not_covered_elsewhere1/1, fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1, - fun test_dropwhile/1]], + fun test_dropwhile/1, + fun test_variable_queue_ack_limiting/1]], passed. +test_variable_queue_ack_limiting(VQ0) -> + %% start by sending in a bunch of messages + Len = 1024, + VQ1 = variable_queue_publish(false, Len, VQ0), + + %% squeeze and relax queue + Churn = Len div 32, + VQ2 = publish_fetch_and_ack(Churn, Len, VQ1), + + %% update stats for duration + {Duration, VQ3} = rabbit_variable_queue:ram_duration(VQ2), + + %% fetch half the messages + {VQ4, _AckTags} = variable_queue_fetch(Len div 2, false, false, Len, VQ3), + + VQ5 = check_variable_queue_status(VQ4, [{len , Len div 2}, + {ram_ack_count , Len div 2}, + {ram_msg_count , Len div 2}]), + + %% quarter the allowed duration + VQ6 = check_variable_queue_status( + rabbit_variable_queue:set_ram_duration_target(Duration / 4, VQ5), + [{len, Len div 2}, + {target_ram_msg_count, Len div 8}, + {ram_msg_count, Len div 8}, + {ram_ack_count, 0}]), + + VQ6. + + test_dropwhile(VQ0) -> Count = 10, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 2ef4ee27..57d1344e 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1307,7 +1307,7 @@ ack(MsgStoreFun, Fun, AckTags, State) -> orddict:new(), GuidsByStore)), State1 #vqstate { index_state = IndexState1, persistent_count = PCount1, - ack_out_counter = AckOutCount - length(AckTags) }. 
+ ack_out_counter = AckOutCount + length(AckTags) }. accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS msg_on_disk = false, @@ -1348,10 +1348,20 @@ find_persistent_count(LensByStore) -> %% conversion is needed. That in turn could cause an infinite loop. reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, AckFun, State) -> {ReduceAck, State1} = reduce_ack_memory_use(AckFun, State), - {Reduce, State2} = case chunk_size(State1 #vqstate.ram_msg_count, - State1 #vqstate.target_ram_msg_count) of - 0 -> {ReduceAck, State1}; - S1 -> {true, AlphaBetaFun(S1, State1)} + + {Reduce, State2} = case ReduceAck of + true -> + %% Don't want to reduce the number of + %% ram messages if we might yet be able + %% to reduce more acks. + {true, State1}; + false -> + case chunk_size( + State1 #vqstate.ram_msg_count, + State1 #vqstate.target_ram_msg_count) of + 0 -> {false, State1}; + S1 -> {true, AlphaBetaFun(S1, State1)} + end end, case State2 #vqstate.target_ram_msg_count of infinity -> {Reduce, State2}; -- cgit v1.2.1 From e8821b7480c0c2df2536571647b1ed012ddc58f3 Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Thu, 4 Nov 2010 22:51:56 +0000 Subject: Purely cosmetic --- src/rabbit_tests.erl | 1 - 1 file changed, 1 deletion(-) diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 8efcf239..eecc27b3 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1898,7 +1898,6 @@ test_variable_queue_ack_limiting(VQ0) -> VQ6. 
- test_dropwhile(VQ0) -> Count = 10, -- cgit v1.2.1 From 6fde9cbafc73040b33cb750a8e429209df1ae473 Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Fri, 5 Nov 2010 00:11:30 +0000 Subject: Cosmetic change and first draft of comment updates --- src/rabbit_variable_queue.erl | 31 ++++++++++++++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 57d1344e..8b36767c 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -168,6 +168,22 @@ %% the latter) are both cheap and do require any scanning through qi %% segments. %% +%% Pending acks are recorded in memory either as the tuple {SeqId, +%% Guid, MsgProps} (tuple form) or as the message itself (message +%% form). Acks for persistent messages are always stored in the tuple +%% form. Acks for transient messages are also stored in tuple form if +%% the message has been forgotten to disk as part of the memory +%% reduction process. For transient messages that haven't already been +%% written to disk, acks are stored in message form to avoid the +%% overhead of writing to disk. +%% +%% During memory reduction, messages stored as transient ack records +%% are pushed out to disk before messages in the queue. More +%% precisely, messages from the queue will not be pushed out to disk +%% while the number of messages stored for acks is greater than +%% zero. Messages for acks are written to disk in batches of at most +%% ?IO_BATCH_SIZE. +%% %% Notes on Clean Shutdown %% (This documents behaviour in variable_queue, queue_index and %% msg_store.) @@ -1347,7 +1363,20 @@ find_persistent_count(LensByStore) -> %% perpetually reporting the need for a conversion when no such %% conversion is needed. That in turn could cause an infinite loop. 
reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, AckFun, State) -> - {ReduceAck, State1} = reduce_ack_memory_use(AckFun, State), + {Reduce, State2} = case reduce_ack_memory_use(AckFun, State) of + {true, State1} -> + %% Don't want to reduce the number of + %% ram messages if we might yet be able + %% to reduce more acks. + {true, State1}; + {false, State1} -> + case chunk_size( + State1 #vqstate.ram_msg_count, + State1 #vqstate.target_ram_msg_count) of + 0 -> {false, State1}; + S1 -> {true, AlphaBetaFun(S1, State1)} + end + end, {Reduce, State2} = case ReduceAck of true -> -- cgit v1.2.1 From 5d600413dee1ad71d4bb6efc1597e1c3e334fb9b Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Fri, 5 Nov 2010 00:43:08 +0000 Subject: a bit more clarification on the comments --- src/rabbit_variable_queue.erl | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 8b36767c..11e55353 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -89,12 +89,14 @@ %% %% The duration indicated to us by the memory_monitor is used to %% calculate, given our current ingress and egress rates, how many -%% messages we should hold in RAM. When we need to push alphas to -%% betas or betas to gammas, we favour writing out messages that are -%% further from the head of the queue. This minimises writes to disk, -%% as the messages closer to the tail of the queue stay in the queue -%% for longer, thus do not need to be replaced as quickly by sending -%% other messages to disk. +%% messages we should hold in RAM. We track the ingress and egress +%% rates for both messages and pending acks and rates for both are +%% included when calculating the number of messages to hold in +%% RAM. When we need to push alphas to betas or betas to gammas, we +%% favour writing out messages that are further from the head of the +%% queue. 
This minimises writes to disk, as the messages closer to the +%% tail of the queue stay in the queue for longer, thus do not need to +%% be replaced as quickly by sending other messages to disk. %% %% Whilst messages are pushed to disk and forgotten from RAM as soon %% as requested by a new setting of the queue RAM duration, the -- cgit v1.2.1 From 8a7809f7d02db1354a8d1d335031c76592a5e468 Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Fri, 5 Nov 2010 09:18:05 +0000 Subject: Clarification of the comments in VQ --- src/rabbit_variable_queue.erl | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 11e55353..1b86320b 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -91,7 +91,7 @@ %% calculate, given our current ingress and egress rates, how many %% messages we should hold in RAM. We track the ingress and egress %% rates for both messages and pending acks and rates for both are -%% included when calculating the number of messages to hold in +%% considered when calculating the number of messages to hold in %% RAM. When we need to push alphas to betas or betas to gammas, we %% favour writing out messages that are further from the head of the %% queue. This minimises writes to disk, as the messages closer to the @@ -179,12 +179,13 @@ %% written to disk, acks are stored in message form to avoid the %% overhead of writing to disk. %% -%% During memory reduction, messages stored as transient ack records -%% are pushed out to disk before messages in the queue. More -%% precisely, messages from the queue will not be pushed out to disk -%% while the number of messages stored for acks is greater than -%% zero. Messages for acks are written to disk in batches of at most -%% ?IO_BATCH_SIZE. +%% During memory reduction, acks stored in message form are converted +%% to tuple form, and the corresponding messages are pushed out to +%% disk. 
Message form acks are always pushed to disk before messages +%% stored in the queue. More precisely, messages from the queue will +%% not be pushed out to disk while the number of message form acks is +%% greater than zero. Message form acks are converted to tuple form in +%% batches of at most ?IO_BATCH_SIZE. %% %% Notes on Clean Shutdown %% (This documents behaviour in variable_queue, queue_index and -- cgit v1.2.1 From 4767dd7933fa5343d17bbeb1befc3fbbeb04298b Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Fri, 5 Nov 2010 09:30:37 +0000 Subject: That was quite special --- src/rabbit_variable_queue.erl | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 1b86320b..b3f58ef9 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1381,20 +1381,6 @@ reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, AckFun, State) -> end end, - {Reduce, State2} = case ReduceAck of - true -> - %% Don't want to reduce the number of - %% ram messages if we might yet be able - %% to reduce more acks. 
- {true, State1}; - false -> - case chunk_size( - State1 #vqstate.ram_msg_count, - State1 #vqstate.target_ram_msg_count) of - 0 -> {false, State1}; - S1 -> {true, AlphaBetaFun(S1, State1)} - end - end, case State2 #vqstate.target_ram_msg_count of infinity -> {Reduce, State2}; 0 -> {Reduce, BetaDeltaFun(State2)}; -- cgit v1.2.1 From 40f91556e3d9a673d4a1c34f9e46f98052b92b45 Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Fri, 5 Nov 2010 12:54:03 +0000 Subject: Reworked the test to be more predictable --- src/rabbit_tests.erl | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index eecc27b3..4f543704 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1879,7 +1879,7 @@ test_variable_queue_ack_limiting(VQ0) -> VQ2 = publish_fetch_and_ack(Churn, Len, VQ1), %% update stats for duration - {Duration, VQ3} = rabbit_variable_queue:ram_duration(VQ2), + {_Duration, VQ3} = rabbit_variable_queue:ram_duration(VQ2), %% fetch half the messages {VQ4, _AckTags} = variable_queue_fetch(Len div 2, false, false, Len, VQ3), @@ -1890,10 +1890,10 @@ test_variable_queue_ack_limiting(VQ0) -> %% quarter the allowed duration VQ6 = check_variable_queue_status( - rabbit_variable_queue:set_ram_duration_target(Duration / 4, VQ5), + rabbit_variable_queue:set_ram_duration_target(0, VQ5), [{len, Len div 2}, - {target_ram_msg_count, Len div 8}, - {ram_msg_count, Len div 8}, + {target_ram_msg_count, 0}, + {ram_msg_count, 0}, {ram_ack_count, 0}]), VQ6. 
-- cgit v1.2.1 From 3ed9259814c38bc0b65317ab306740dacdea7c21 Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Fri, 5 Nov 2010 14:59:21 +0000 Subject: Reworked ack shedding algorithm based on discussion with Matthew --- src/rabbit_variable_queue.erl | 125 ++++++++++++++++++++++++------------------ 1 file changed, 71 insertions(+), 54 deletions(-) diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index b3f58ef9..9c25540e 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -181,11 +181,18 @@ %% %% During memory reduction, acks stored in message form are converted %% to tuple form, and the corresponding messages are pushed out to -%% disk. Message form acks are always pushed to disk before messages -%% stored in the queue. More precisely, messages from the queue will -%% not be pushed out to disk while the number of messages form acks is -%% greater than zero. Message form acks are converted to tuple form in -%% batches of at most ?IO_BATCH_SIZE. +%% disk. +%% +%% The order in which alphas are pushed to betas and message form acks +%% are pushed to disk is determined dynamically. We always prefer to +%% push messages for the source (alphas or acks) that is growing the +%% fastest (with growth measured as avg. ingress - avg. egress). In +%% each round of memory reduction a chunk of messages at most +%% ?IO_BATCH_SIZE in size is allocated to be pushed to disk. The +%% fastest growing source will be reduced by as much of this chunk as +%% possible. If there is any remaining allocation in the chunk after +%% the first source has been reduced to zero, the second source will +%% be reduced by as much of the remaining chunk as possible. %% %% Notes on Clean Shutdown %% (This documents behaviour in variable_queue, queue_index and @@ -779,10 +786,10 @@ ram_duration(State = #vqstate { ram_ack_count_prev = RamAckCount }}. 
needs_idle_timeout(State = #vqstate { on_sync = ?BLANK_SYNC }) -> - {Res, _State} = reduce_memory_use(fun (_Quota, State1) -> State1 end, + {Res, _State} = reduce_memory_use(fun (_Quota, State1) -> {0, State1} end, fun (_Quota, State1) -> State1 end, fun (State1) -> State1 end, - fun (_Quota, State1) -> State1 end, + fun (_Quota, State1) -> {0, State1} end, State), Res; needs_idle_timeout(_State) -> @@ -1365,58 +1372,68 @@ find_persistent_count(LensByStore) -> %% one segment's worth of messages in q3 - and thus would risk %% perpetually reporting the need for a conversion when no such %% conversion is needed. That in turn could cause an infinite loop. -reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, AckFun, State) -> - {Reduce, State2} = case reduce_ack_memory_use(AckFun, State) of - {true, State1} -> - %% Don't want to reduce the number of - %% ram messages if we might yet be able - %% to reduce more acks. - {true, State1}; - {false, State1} -> - case chunk_size( - State1 #vqstate.ram_msg_count, - State1 #vqstate.target_ram_msg_count) of - 0 -> {false, State1}; - S1 -> {true, AlphaBetaFun(S1, State1)} - end - end, - - case State2 #vqstate.target_ram_msg_count of - infinity -> {Reduce, State2}; - 0 -> {Reduce, BetaDeltaFun(State2)}; - _ -> case chunk_size(State2 #vqstate.ram_index_count, - permitted_ram_index_count(State2)) of - ?IO_BATCH_SIZE = S2 -> {true, BetaGammaFun(S2, State2)}; - _ -> {Reduce, State2} - end - end. 
- -reduce_ack_memory_use(_AckFun, - State = #vqstate { target_ram_msg_count = infinity }) -> +reduce_memory_use(_AlphaBetaFun, _BetaGammaFun, _BetaDeltaFun, _AckFun, + State = #vqstate {target_ram_msg_count = infinity}) -> {false, State}; -reduce_ack_memory_use(AckFun, - State = #vqstate { - target_ram_msg_count = TargetRamMsgCount, - ram_msg_count = RamMsgCount, - ram_ack_index = RamAckIndex} ) -> - PermittedAckCount = case TargetRamMsgCount > RamMsgCount of - true -> TargetRamMsgCount - RamMsgCount; - false -> 0 - end, - case chunk_size(gb_trees:size(RamAckIndex), PermittedAckCount) of - 0 -> {false, State}; - C -> {true, AckFun(C, State)} - end. +reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, AckFun, + State = #vqstate { + ram_ack_index = RamAckIndex, + ram_msg_count = RamMsgCount, + target_ram_msg_count = TargetRamMsgCount, + rates = #rates { + avg_ingress = AvgIngress, + avg_egress = AvgEgress }, + ack_rates = #rates { + avg_ingress = AvgAckIngress, + avg_egress = AvgAckEgress } }) -> + + {Reduce, State1} = + case chunk_size(RamMsgCount + gb_trees:size(RamAckIndex), + TargetRamMsgCount) of + 0 -> + {false, State}; + S1 -> + ReduceFuns = + case (AvgAckIngress - AvgAckEgress) > + (AvgIngress - AvgEgress) of + true -> + %% ACKs are growing faster than the queue, + %% push messages from there first. + [AckFun, AlphaBetaFun]; + false -> + %% The queue is growing faster than the + %% acks, push queue messages first. + [AlphaBetaFun, AckFun] + end, + {_, StateOut} = + %% Both reduce functions get a chance to reduce + %% memory. The second may very well get a quota of + %% 0 if the first function managed to push out the + %% maximum number of messages. 
+ lists:foldl( + fun(ReduceFun, {QuotaN, StateN}) -> + ReduceFun(QuotaN, StateN) + end, {S1, State}, ReduceFuns), + {true, StateOut} + end, + case State1 #vqstate.target_ram_msg_count of + 0 -> {Reduce, BetaDeltaFun(State1)}; + _ -> case chunk_size(State1 #vqstate.ram_index_count, + permitted_ram_index_count(State1)) of + ?IO_BATCH_SIZE = S2 -> {true, BetaGammaFun(S2, State1)}; + _ -> {Reduce, State1} + end + end. limit_ram_acks(0, State) -> - State; + {0, State}; limit_ram_acks(Quota, State = #vqstate { pending_ack = PA, ram_ack_index = RAI }) -> case gb_trees:is_empty(RAI) of true -> - State; + {Quota, State}; false -> {SeqId, Guid, RAI1} = gb_trees:take_largest(RAI), MsgStatus = #msg_status { @@ -1480,7 +1497,7 @@ permitted_ram_index_count(#vqstate { len = Len, BetaLen - trunc(BetaLen * BetaLen / (Len - DeltaCount)). chunk_size(Current, Permitted) - when Permitted =:= infinity orelse Permitted >= Current -> + when Permitted >= Current -> 0; chunk_size(Current, Permitted) -> lists:min([Current - Permitted, ?IO_BATCH_SIZE]). @@ -1568,9 +1585,9 @@ maybe_deltas_to_betas(State = #vqstate { end. push_alphas_to_betas(Quota, State) -> - { Quota1, State1} = maybe_push_q1_to_betas(Quota, State), - {_Quota2, State2} = maybe_push_q4_to_betas(Quota1, State1), - State2. + {Quota1, State1} = maybe_push_q1_to_betas(Quota, State), + {Quota2, State2} = maybe_push_q4_to_betas(Quota1, State1), + {Quota2, State2}. 
maybe_push_q1_to_betas(Quota, State = #vqstate { q1 = Q1 }) -> maybe_push_alphas_to_betas( -- cgit v1.2.1 From b4ce991122d79a1b25de0f5bdd3f50af6fa0b032 Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Thu, 11 Nov 2010 18:44:41 +0000 Subject: Added assertion in limit_ram_acks for is_persistent=false --- src/rabbit_variable_queue.erl | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 9c25540e..6f8fd3bc 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1437,8 +1437,9 @@ limit_ram_acks(Quota, State = #vqstate { pending_ack = PA, false -> {SeqId, Guid, RAI1} = gb_trees:take_largest(RAI), MsgStatus = #msg_status { - guid = Guid, %% ASSERTION - msg_props = MsgProps } = dict:fetch(SeqId, PA), + guid = Guid, %% ASSERTION + is_persistent = false, %% ASSERTION + msg_props = MsgProps } = dict:fetch(SeqId, PA), {_, State1} = maybe_write_to_disk(true, false, MsgStatus, State), limit_ram_acks(Quota - 1, State1 #vqstate { -- cgit v1.2.1 From 89f14ce3f48e8c47cc2407bfb0c50d8d784d69fe Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Thu, 11 Nov 2010 18:45:22 +0000 Subject: Reinstated the infinity check for chunk_size --- src/rabbit_variable_queue.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 6f8fd3bc..4b57779b 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1498,7 +1498,7 @@ permitted_ram_index_count(#vqstate { len = Len, BetaLen - trunc(BetaLen * BetaLen / (Len - DeltaCount)). chunk_size(Current, Permitted) - when Permitted >= Current -> + when Permitted =:= infinity orelse Permitted >= Current -> 0; chunk_size(Current, Permitted) -> lists:min([Current - Permitted, ?IO_BATCH_SIZE]). 
-- cgit v1.2.1 From 2885ee4b3f2c8b12415aa0a3808b04ba7c410576 Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Thu, 11 Nov 2010 18:46:15 +0000 Subject: Removed stray references to target_ram_ack_count --- src/rabbit_variable_queue.erl | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 4b57779b..cbeb9a75 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -258,7 +258,6 @@ persistent_count, target_ram_msg_count, - target_ram_ack_count, ram_msg_count, ram_msg_count_prev, ram_ack_count_prev, @@ -351,7 +350,6 @@ transient_threshold :: non_neg_integer(), target_ram_msg_count :: non_neg_integer() | 'infinity', - target_ram_ack_count :: non_neg_integer() | 'infinity', ram_msg_count :: non_neg_integer(), ram_msg_count_prev :: non_neg_integer(), ram_index_count :: non_neg_integer(), -- cgit v1.2.1 From 77b9aa447953dbb760c8e0bc766b089a5bdc9e79 Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Thu, 11 Nov 2010 19:33:43 +0000 Subject: Rename target_ram_msg_count to target_ram_item_count --- src/rabbit_tests.erl | 2 +- src/rabbit_variable_queue.erl | 225 +++++++++++++++++++++--------------------- 2 files changed, 114 insertions(+), 113 deletions(-) diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 4f543704..3130bca3 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1892,7 +1892,7 @@ test_variable_queue_ack_limiting(VQ0) -> VQ6 = check_variable_queue_status( rabbit_variable_queue:set_ram_duration_target(0, VQ5), [{len, Len div 2}, - {target_ram_msg_count, 0}, + {target_ram_item_count, 0}, {ram_msg_count, 0}, {ram_ack_count, 0}]), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index cbeb9a75..833fc789 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -158,7 +158,7 @@ %% The conversion from alphas to betas is also chunked, but only to %% ensure no more than ?IO_BATCH_SIZE alphas are converted to betas 
at %% any one time. This further smooths the effects of changes to the -%% target_ram_msg_count and ensures the queue remains responsive +%% target_ram_item_count and ensures the queue remains responsive %% even when there is a large amount of IO work to do. The %% idle_timeout callback is utilised to ensure that conversions are %% done as promptly as possible whilst ensuring the queue remains @@ -257,7 +257,7 @@ len, persistent_count, - target_ram_msg_count, + target_ram_item_count, ram_msg_count, ram_msg_count_prev, ram_ack_count_prev, @@ -331,34 +331,34 @@ funs :: [fun (() -> any())] }). -type(state() :: #vqstate { - q1 :: queue(), - q2 :: bpqueue:bpqueue(), - delta :: delta(), - q3 :: bpqueue:bpqueue(), - q4 :: queue(), - next_seq_id :: seq_id(), - pending_ack :: dict:dictionary(), - ram_ack_index :: gb_tree(), - index_state :: any(), - msg_store_clients :: 'undefined' | {{any(), binary()}, + q1 :: queue(), + q2 :: bpqueue:bpqueue(), + delta :: delta(), + q3 :: bpqueue:bpqueue(), + q4 :: queue(), + next_seq_id :: seq_id(), + pending_ack :: dict:dictionary(), + ram_ack_index :: gb_tree(), + index_state :: any(), + msg_store_clients :: 'undefined' | {{any(), binary()}, {any(), binary()}}, - on_sync :: sync(), - durable :: boolean(), - - len :: non_neg_integer(), - persistent_count :: non_neg_integer(), - - transient_threshold :: non_neg_integer(), - target_ram_msg_count :: non_neg_integer() | 'infinity', - ram_msg_count :: non_neg_integer(), - ram_msg_count_prev :: non_neg_integer(), - ram_index_count :: non_neg_integer(), - out_counter :: non_neg_integer(), - in_counter :: non_neg_integer(), - ack_out_counter :: non_neg_integer(), - ack_in_counter :: non_neg_integer(), - rates :: rates(), - ack_rates :: rates() }). 
+ on_sync :: sync(), + durable :: boolean(), + + len :: non_neg_integer(), + persistent_count :: non_neg_integer(), + + transient_threshold :: non_neg_integer(), + target_ram_item_count :: non_neg_integer() | 'infinity', + ram_msg_count :: non_neg_integer(), + ram_msg_count_prev :: non_neg_integer(), + ram_index_count :: non_neg_integer(), + out_counter :: non_neg_integer(), + in_counter :: non_neg_integer(), + ack_out_counter :: non_neg_integer(), + ack_in_counter :: non_neg_integer(), + rates :: rates(), + ack_rates :: rates() }). -include("rabbit_backing_queue_spec.hrl"). @@ -715,18 +715,18 @@ set_ram_duration_target(DurationTarget, ack_rates = #rates { avg_egress = AvgAckEgressRate, avg_ingress = AvgAckIngressRate }, - target_ram_msg_count = TargetRamMsgCount }) -> + target_ram_item_count = TargetRamItemCount }) -> Rate = AvgEgressRate + AvgIngressRate + AvgAckEgressRate + AvgAckIngressRate, - TargetRamMsgCount1 = + TargetRamItemCount1 = case DurationTarget of infinity -> infinity; _ -> trunc(DurationTarget * Rate) %% msgs = sec * msgs/sec end, - State1 = State #vqstate { target_ram_msg_count = TargetRamMsgCount1 }, - a(case TargetRamMsgCount1 == infinity orelse - (TargetRamMsgCount =/= infinity andalso - TargetRamMsgCount1 >= TargetRamMsgCount) of + State1 = State #vqstate { target_ram_item_count = TargetRamItemCount1 }, + a(case TargetRamItemCount1 == infinity orelse + (TargetRamItemCount =/= infinity andalso + TargetRamItemCount1 >= TargetRamItemCount) of true -> State1; false -> reduce_memory_use(State1) end). @@ -799,39 +799,39 @@ handle_pre_hibernate(State = #vqstate { index_state = IndexState }) -> State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }. 
status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, - len = Len, - pending_ack = PA, - ram_ack_index = RAI, - on_sync = #sync { funs = From }, - target_ram_msg_count = TargetRamMsgCount, - ram_msg_count = RamMsgCount, - ram_index_count = RamIndexCount, - next_seq_id = NextSeqId, - persistent_count = PersistentCount, - rates = #rates { + len = Len, + pending_ack = PA, + ram_ack_index = RAI, + on_sync = #sync { funs = From }, + target_ram_item_count = TargetRamItemCount, + ram_msg_count = RamMsgCount, + ram_index_count = RamIndexCount, + next_seq_id = NextSeqId, + persistent_count = PersistentCount, + rates = #rates { avg_egress = AvgEgressRate, avg_ingress = AvgIngressRate }, - ack_rates = #rates { + ack_rates = #rates { avg_egress = AvgAckEgressRate, avg_ingress = AvgAckIngressRate } }) -> - [ {q1 , queue:len(Q1)}, - {q2 , bpqueue:len(Q2)}, - {delta , Delta}, - {q3 , bpqueue:len(Q3)}, - {q4 , queue:len(Q4)}, - {len , Len}, - {pending_acks , dict:size(PA)}, - {ram_ack_count , gb_trees:size(RAI)}, - {outstanding_txns , length(From)}, - {target_ram_msg_count , TargetRamMsgCount}, - {ram_msg_count , RamMsgCount}, - {ram_index_count , RamIndexCount}, - {next_seq_id , NextSeqId}, - {persistent_count , PersistentCount}, - {avg_egress_rate , AvgEgressRate}, - {avg_ingress_rate , AvgIngressRate}, - {avg_ack_egress_rate , AvgAckEgressRate}, - {avg_ack_ingress_rate , AvgAckIngressRate}]. 
+ [ {q1 , queue:len(Q1)}, + {q2 , bpqueue:len(Q2)}, + {delta , Delta}, + {q3 , bpqueue:len(Q3)}, + {q4 , queue:len(Q4)}, + {len , Len}, + {pending_acks , dict:size(PA)}, + {ram_ack_count , gb_trees:size(RAI)}, + {outstanding_txns , length(From)}, + {target_ram_item_count , TargetRamItemCount}, + {ram_msg_count , RamMsgCount}, + {ram_index_count , RamIndexCount}, + {next_seq_id , NextSeqId}, + {persistent_count , PersistentCount}, + {avg_egress_rate , AvgEgressRate}, + {avg_ingress_rate , AvgIngressRate}, + {avg_ack_egress_rate , AvgAckEgressRate}, + {avg_ack_ingress_rate , AvgAckIngressRate}]. %%---------------------------------------------------------------------------- %% Minor helpers @@ -1021,42 +1021,42 @@ init(IsDurable, IndexState, DeltaCount, Terms, end, Now = now(), State = #vqstate { - q1 = queue:new(), - q2 = bpqueue:new(), - delta = Delta, - q3 = bpqueue:new(), - q4 = queue:new(), - next_seq_id = NextSeqId, - pending_ack = dict:new(), - ram_ack_index = gb_trees:empty(), - index_state = IndexState1, - msg_store_clients = {PersistentClient, TransientClient}, - on_sync = ?BLANK_SYNC, - durable = IsDurable, - transient_threshold = NextSeqId, - - len = DeltaCount1, - persistent_count = DeltaCount1, - - target_ram_msg_count = infinity, - ram_msg_count = 0, - ram_msg_count_prev = 0, - ram_ack_count_prev = 0, - ram_index_count = 0, - out_counter = 0, - in_counter = 0, - ack_out_counter = 0, - ack_in_counter = 0, - rates = #rates { egress = {Now, 0}, - ingress = {Now, DeltaCount1}, - avg_egress = 0.0, - avg_ingress = 0.0, - timestamp = Now }, - ack_rates = #rates { egress = {Now, 0}, - ingress = {Now, 0}, - avg_egress = 0.0, - avg_ingress = 0.0, - timestamp = undefined } }, + q1 = queue:new(), + q2 = bpqueue:new(), + delta = Delta, + q3 = bpqueue:new(), + q4 = queue:new(), + next_seq_id = NextSeqId, + pending_ack = dict:new(), + ram_ack_index = gb_trees:empty(), + index_state = IndexState1, + msg_store_clients = {PersistentClient, TransientClient}, + on_sync = 
?BLANK_SYNC, + durable = IsDurable, + transient_threshold = NextSeqId, + + len = DeltaCount1, + persistent_count = DeltaCount1, + + target_ram_item_count = infinity, + ram_msg_count = 0, + ram_msg_count_prev = 0, + ram_ack_count_prev = 0, + ram_index_count = 0, + out_counter = 0, + in_counter = 0, + ack_out_counter = 0, + ack_in_counter = 0, + rates = #rates { egress = {Now, 0}, + ingress = {Now, DeltaCount1}, + avg_egress = 0.0, + avg_ingress = 0.0, + timestamp = Now }, + ack_rates = #rates { egress = {Now, 0}, + ingress = {Now, 0}, + avg_egress = 0.0, + avg_ingress = 0.0, + timestamp = undefined } }, a(maybe_deltas_to_betas(State)). msg_store_callback(PersistentGuids, Pubs, AckTags, Fun, MsgPropsFun) -> @@ -1360,7 +1360,7 @@ find_persistent_count(LensByStore) -> %% though the conversion function for that is called as necessary. The %% reason is twofold. Firstly, this is safe because the conversion is %% only ever necessary just after a transition to a -%% target_ram_msg_count of zero or after an incremental alpha->beta +%% target_ram_item_count of zero or after an incremental alpha->beta %% conversion. In the former case the conversion is performed straight %% away (i.e. any betas present at the time are converted to deltas), %% and in the latter case the need for a conversion is flagged up @@ -1371,23 +1371,23 @@ find_persistent_count(LensByStore) -> %% perpetually reporting the need for a conversion when no such %% conversion is needed. That in turn could cause an infinite loop. 
reduce_memory_use(_AlphaBetaFun, _BetaGammaFun, _BetaDeltaFun, _AckFun, - State = #vqstate {target_ram_msg_count = infinity}) -> + State = #vqstate {target_ram_item_count = infinity}) -> {false, State}; reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, AckFun, State = #vqstate { - ram_ack_index = RamAckIndex, - ram_msg_count = RamMsgCount, - target_ram_msg_count = TargetRamMsgCount, - rates = #rates { + ram_ack_index = RamAckIndex, + ram_msg_count = RamMsgCount, + target_ram_item_count = TargetRamItemCount, + rates = #rates { avg_ingress = AvgIngress, avg_egress = AvgEgress }, - ack_rates = #rates { + ack_rates = #rates { avg_ingress = AvgAckIngress, avg_egress = AvgAckEgress } }) -> {Reduce, State1} = case chunk_size(RamMsgCount + gb_trees:size(RamAckIndex), - TargetRamMsgCount) of + TargetRamItemCount) of 0 -> {false, State}; S1 -> @@ -1415,7 +1415,7 @@ reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, AckFun, {true, StateOut} end, - case State1 #vqstate.target_ram_msg_count of + case State1 #vqstate.target_ram_item_count of 0 -> {Reduce, BetaDeltaFun(State1)}; _ -> case chunk_size(State1 #vqstate.ram_index_count, permitted_ram_index_count(State1)) of @@ -1612,10 +1612,11 @@ maybe_push_q4_to_betas(Quota, State = #vqstate { q4 = Q4 }) -> maybe_push_alphas_to_betas(_Generator, _Consumer, Quota, _Q, State = #vqstate { - ram_msg_count = RamMsgCount, - target_ram_msg_count = TargetRamMsgCount }) + ram_msg_count = RamMsgCount, + target_ram_item_count = TargetRamItemCount }) when Quota =:= 0 orelse - TargetRamMsgCount =:= infinity orelse TargetRamMsgCount >= RamMsgCount -> + TargetRamItemCount =:= infinity orelse + TargetRamItemCount >= RamMsgCount -> {Quota, State}; maybe_push_alphas_to_betas(Generator, Consumer, Quota, Q, State) -> case Generator(Q) of -- cgit v1.2.1