author     Rob Harrop <rob@rabbitmq.com>  2010-11-11 19:40:24 +0000
committer  Rob Harrop <rob@rabbitmq.com>  2010-11-11 19:40:24 +0000
commit     18acdc0126fa7e36c1611e424d551ba39e70f139 (patch)
tree       327cec3a4839c8e182e130fbe0230e56c744bb68
parent     c5035d704a75a1b829a845fc475f7ece7df085db (diff)
parent     77b9aa447953dbb760c8e0bc766b089a5bdc9e79 (diff)
download   rabbitmq-server-18acdc0126fa7e36c1611e424d551ba39e70f139.tar.gz
Merge with default
-rw-r--r--  src/rabbit_tests.erl           |  32
-rw-r--r--  src/rabbit_variable_queue.erl  | 439
2 files changed, 327 insertions(+), 144 deletions(-)
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 71b23e01..3130bca3 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1865,9 +1865,39 @@ test_variable_queue() ->
fun test_variable_queue_partial_segments_delta_thing/1,
fun test_variable_queue_all_the_bits_not_covered_elsewhere1/1,
fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1,
- fun test_dropwhile/1]],
+ fun test_dropwhile/1,
+ fun test_variable_queue_ack_limiting/1]],
passed.
+test_variable_queue_ack_limiting(VQ0) ->
+ %% start by sending in a bunch of messages
+ Len = 1024,
+ VQ1 = variable_queue_publish(false, Len, VQ0),
+
+ %% squeeze and relax queue
+ Churn = Len div 32,
+ VQ2 = publish_fetch_and_ack(Churn, Len, VQ1),
+
+ %% update stats for duration
+ {_Duration, VQ3} = rabbit_variable_queue:ram_duration(VQ2),
+
+ %% fetch half the messages
+ {VQ4, _AckTags} = variable_queue_fetch(Len div 2, false, false, Len, VQ3),
+
+ VQ5 = check_variable_queue_status(VQ4, [{len , Len div 2},
+ {ram_ack_count , Len div 2},
+ {ram_msg_count , Len div 2}]),
+
+ %% quarter the allowed duration
+ VQ6 = check_variable_queue_status(
+ rabbit_variable_queue:set_ram_duration_target(0, VQ5),
+ [{len, Len div 2},
+ {target_ram_item_count, 0},
+ {ram_msg_count, 0},
+ {ram_ack_count, 0}]),
+
+ VQ6.
+
test_dropwhile(VQ0) ->
Count = 10,
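
For context: check_variable_queue_status/2, used by the new test above, is
defined elsewhere in rabbit_tests.erl and is not part of this diff. A rough
sketch of what such a helper needs to do (compare selected keys of
rabbit_variable_queue:status/1 against expected values and hand back the
queue state) might look like the following; the name and shape here are
illustrative only, not the real helper:

    %% Hypothetical sketch: assert that each expected {Key, Value} pair
    %% matches the corresponding entry in status/1, then return the queue.
    check_vq_status_sketch(VQ, ExpectedProps) ->
        Status = rabbit_variable_queue:status(VQ),
        [true = (proplists:get_value(Key, Status) =:= Expected)
         || {Key, Expected} <- ExpectedProps],
        VQ.
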
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 69d62fde..48f621e1 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -89,12 +89,14 @@
%%
%% The duration indicated to us by the memory_monitor is used to
%% calculate, given our current ingress and egress rates, how many
-%% messages we should hold in RAM. When we need to push alphas to
-%% betas or betas to gammas, we favour writing out messages that are
-%% further from the head of the queue. This minimises writes to disk,
-%% as the messages closer to the tail of the queue stay in the queue
-%% for longer, thus do not need to be replaced as quickly by sending
-%% other messages to disk.
+%% messages we should hold in RAM. We track the ingress and egress
+%% rates for both messages and pending acks, and both sets of rates
+%% are taken into account when calculating how many items to hold in
+%% RAM. When we need to push alphas to betas or betas to gammas, we
+%% favour writing out messages that are further from the head of the
+%% queue. This minimises writes to disk, as the messages closer to the
+%% tail of the queue stay in the queue for longer, thus do not need to
+%% be replaced as quickly by sending other messages to disk.
%%
%% Whilst messages are pushed to disk and forgotten from RAM as soon
%% as requested by a new setting of the queue RAM duration, the
@@ -156,7 +158,7 @@
%% The conversion from alphas to betas is also chunked, but only to
%% ensure no more than ?IO_BATCH_SIZE alphas are converted to betas at
%% any one time. This further smooths the effects of changes to the
-%% target_ram_msg_count and ensures the queue remains responsive
+%% target_ram_item_count and ensures the queue remains responsive
%% even when there is a large amount of IO work to do. The
%% idle_timeout callback is utilised to ensure that conversions are
%% done as promptly as possible whilst ensuring the queue remains
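
As a point of reference for the chunking described above: each round of
conversion is bounded both by how far we are over the target and by
?IO_BATCH_SIZE. A minimal standalone sketch of that calculation, assuming a
batch size of 64 and not necessarily matching this module's exact
implementation:

    -define(IO_BATCH_SIZE, 64).  %% assumed value, for illustration only

    %% Zero work if there is no target or we are already at or below it,
    %% otherwise the excess over the target, capped at one IO batch.
    chunk_size(_Current, infinity) ->
        0;
    chunk_size(Current, Target) when Target >= Current ->
        0;
    chunk_size(Current, Target) ->
        lists:min([Current - Target, ?IO_BATCH_SIZE]).
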
@@ -168,6 +170,30 @@
%% the latter) are both cheap and do not require any scanning through qi
%% segments.
%%
+%% Pending acks are recorded in memory either as the tuple
+%% {IsPersistent, Guid, MsgProps} (tuple form) or as the message itself
+%% (message form). Acks for persistent messages are always stored in the
+%% tuple form. Acks for transient messages are also stored in tuple form
+%% if the message has already been pushed out to disk as part of the
+%% memory reduction process. For transient messages that haven't been
+%% written to disk, acks are stored in message form to avoid the
+%% overhead of writing to disk.
+%%
+%% During memory reduction, acks stored in message form are converted
+%% to tuple form, and the corresponding messages are pushed out to
+%% disk.
+%%
+%% The order in which alphas are pushed to betas and message form acks
+%% are pushed to disk is determined dynamically. We always prefer to
+%% push messages for the source (alphas or acks) that is growing the
+%% fastest (with growth measured as avg. ingress - avg. egress). In
+%% each round of memory reduction a chunk of messages at most
+%% ?IO_BATCH_SIZE in size is allocated to be pushed to disk. The
+%% fastest growing source will be reduced by as much of this chunk as
+%% possible. If there is any remaining allocation in the chunk after
+%% the first source has been reduced to zero, the second source will
+%% be reduced by as much of the remaining chunk as possible.
+%%
%% Notes on Clean Shutdown
%% (This documents behaviour in variable_queue, queue_index and
%% msg_store.)
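
To make the two storage forms concrete, here is a minimal sketch, not part
of the patch (the #msg_status{} fields are as used in this module), of how
the ack entry is chosen: tuple form whenever the payload is already on disk,
message form otherwise:

    %% Sketch: choose the in-memory representation for a pending ack.
    ack_entry(#msg_status { msg_on_disk   = true,
                            is_persistent = IsPersistent,
                            guid          = Guid,
                            msg_props     = MsgProps }) ->
        {IsPersistent, Guid, MsgProps};    %% tuple form: payload safe on disk
    ack_entry(MsgStatus = #msg_status { msg_on_disk = false }) ->
        MsgStatus.                         %% message form: payload kept in RAM
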
@@ -220,6 +246,8 @@
q4,
next_seq_id,
pending_ack,
+ pending_ack_index,
+ ram_ack_index,
index_state,
msg_store_clients,
on_sync,
@@ -229,13 +257,17 @@
len,
persistent_count,
- target_ram_msg_count,
+ target_ram_item_count,
ram_msg_count,
ram_msg_count_prev,
+ ram_ack_count_prev,
ram_index_count,
out_counter,
in_counter,
- rates
+ ack_out_counter,
+ ack_in_counter,
+ rates,
+ ack_rates
}).
-record(rates, { egress, ingress, avg_egress, avg_ingress, timestamp }).
@@ -299,30 +331,34 @@
funs :: [fun (() -> any())] }).
-type(state() :: #vqstate {
- q1 :: queue(),
- q2 :: bpqueue:bpqueue(),
- delta :: delta(),
- q3 :: bpqueue:bpqueue(),
- q4 :: queue(),
- next_seq_id :: seq_id(),
- pending_ack :: dict(),
- index_state :: any(),
- msg_store_clients :: 'undefined' | {{any(), binary()},
+ q1 :: queue(),
+ q2 :: bpqueue:bpqueue(),
+ delta :: delta(),
+ q3 :: bpqueue:bpqueue(),
+ q4 :: queue(),
+ next_seq_id :: seq_id(),
+ pending_ack :: dict(),
+ ram_ack_index :: gb_tree(),
+ index_state :: any(),
+ msg_store_clients :: 'undefined' | {{any(), binary()},
{any(), binary()}},
- on_sync :: sync(),
- durable :: boolean(),
-
- len :: non_neg_integer(),
- persistent_count :: non_neg_integer(),
-
- transient_threshold :: non_neg_integer(),
- target_ram_msg_count :: non_neg_integer() | 'infinity',
- ram_msg_count :: non_neg_integer(),
- ram_msg_count_prev :: non_neg_integer(),
- ram_index_count :: non_neg_integer(),
- out_counter :: non_neg_integer(),
- in_counter :: non_neg_integer(),
- rates :: rates() }).
+ on_sync :: sync(),
+ durable :: boolean(),
+
+ len :: non_neg_integer(),
+ persistent_count :: non_neg_integer(),
+
+ transient_threshold :: non_neg_integer(),
+ target_ram_item_count :: non_neg_integer() | 'infinity',
+ ram_msg_count :: non_neg_integer(),
+ ram_msg_count_prev :: non_neg_integer(),
+ ram_index_count :: non_neg_integer(),
+ out_counter :: non_neg_integer(),
+ in_counter :: non_neg_integer(),
+ ack_out_counter :: non_neg_integer(),
+ ack_in_counter :: non_neg_integer(),
+ rates :: rates(),
+ ack_rates :: rates() }).
-include("rabbit_backing_queue_spec.hrl").
@@ -479,19 +515,17 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent },
out_counter = OutCount,
in_counter = InCount,
persistent_count = PCount,
- pending_ack = PA,
durable = IsDurable }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps))
#msg_status { is_delivered = true },
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
- PA1 = record_pending_ack(m(MsgStatus1), PA),
+ State2 = record_pending_ack(m(MsgStatus1), State1),
PCount1 = PCount + one_if(IsPersistent1),
- {SeqId, a(State1 #vqstate { next_seq_id = SeqId + 1,
+ {SeqId, a(State2 #vqstate { next_seq_id = SeqId + 1,
out_counter = OutCount + 1,
in_counter = InCount + 1,
- persistent_count = PCount1,
- pending_ack = PA1 })}.
+ persistent_count = PCount1 })}.
dropwhile(Pred, State) ->
{_OkOrEmpty, State1} = dropwhile1(Pred, State),
@@ -561,8 +595,7 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
index_state = IndexState,
msg_store_clients = MSCState,
len = Len,
- persistent_count = PCount,
- pending_ack = PA }) ->
+ persistent_count = PCount }) ->
%% 1. Mark it delivered if necessary
IndexState1 = maybe_write_delivered(
IndexOnDisk andalso not IsDelivered,
@@ -582,12 +615,12 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
end,
%% 3. If an ack is required, add something sensible to PA
- {AckTag, PA1} = case AckRequired of
- true -> PA2 = record_pending_ack(
- MsgStatus #msg_status {
- is_delivered = true }, PA),
- {SeqId, PA2};
- false -> {blank_ack, PA}
+ {AckTag, State1} = case AckRequired of
+ true -> StateN = record_pending_ack(
+ MsgStatus #msg_status {
+ is_delivered = true }, State),
+ {SeqId, StateN};
+ false -> {blank_ack, State}
end,
PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
@@ -595,12 +628,11 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined),
{{Msg, IsDelivered, AckTag, Len1},
- a(State #vqstate { ram_msg_count = RamMsgCount1,
- out_counter = OutCount + 1,
- index_state = IndexState2,
- len = Len1,
- persistent_count = PCount1,
- pending_ack = PA1 })}.
+ a(State1 #vqstate { ram_msg_count = RamMsgCount1,
+ out_counter = OutCount + 1,
+ index_state = IndexState2,
+ len = Len1,
+ persistent_count = PCount1 })}.
ack(AckTags, State) ->
a(ack(fun msg_store_remove/3,
@@ -680,17 +712,21 @@ set_ram_duration_target(DurationTarget,
State = #vqstate {
rates = #rates { avg_egress = AvgEgressRate,
avg_ingress = AvgIngressRate },
- target_ram_msg_count = TargetRamMsgCount }) ->
- Rate = AvgEgressRate + AvgIngressRate,
- TargetRamMsgCount1 =
+ ack_rates =
+ #rates { avg_egress = AvgAckEgressRate,
+ avg_ingress = AvgAckIngressRate },
+ target_ram_item_count = TargetRamItemCount }) ->
+ Rate = AvgEgressRate + AvgIngressRate + AvgAckEgressRate
+ + AvgAckIngressRate,
+ TargetRamItemCount1 =
case DurationTarget of
infinity -> infinity;
_ -> trunc(DurationTarget * Rate) %% msgs = sec * msgs/sec
end,
- State1 = State #vqstate { target_ram_msg_count = TargetRamMsgCount1 },
- a(case TargetRamMsgCount1 == infinity orelse
- (TargetRamMsgCount =/= infinity andalso
- TargetRamMsgCount1 >= TargetRamMsgCount) of
+ State1 = State #vqstate { target_ram_item_count = TargetRamItemCount1 },
+ a(case TargetRamItemCount1 == infinity orelse
+ (TargetRamItemCount =/= infinity andalso
+ TargetRamItemCount1 >= TargetRamItemCount) of
true -> State1;
false -> reduce_memory_use(State1)
end).
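
A worked example of the calculation above, with illustrative numbers: if the
average message ingress and egress rates are 120 and 80 msgs/sec and the
average ack ingress and egress rates are 60 and 40 acks/sec, the combined
Rate is 300 items/sec, so a duration target of 0.5 seconds gives
trunc(0.5 * 300) = 150 as the target_ram_item_count, a budget shared by
messages held in RAM and acks held in message form.
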
@@ -699,19 +735,35 @@ ram_duration(State = #vqstate {
rates = #rates { egress = Egress,
ingress = Ingress,
timestamp = Timestamp } = Rates,
+ ack_rates = #rates { egress = AckEgress,
+ ingress = AckIngress } = ARates,
in_counter = InCount,
out_counter = OutCount,
+ ack_in_counter = AckInCount,
+ ack_out_counter = AckOutCount,
ram_msg_count = RamMsgCount,
- ram_msg_count_prev = RamMsgCountPrev }) ->
+ ram_msg_count_prev = RamMsgCountPrev,
+ ram_ack_index = RamAckIndex,
+ ram_ack_count_prev = RamAckCountPrev }) ->
Now = now(),
{AvgEgressRate, Egress1} = update_rate(Now, Timestamp, OutCount, Egress),
{AvgIngressRate, Ingress1} = update_rate(Now, Timestamp, InCount, Ingress),
- Duration = %% msgs / (msgs/sec) == sec
- case AvgEgressRate == 0 andalso AvgIngressRate == 0 of
+ {AvgAckEgressRate, AckEgress1} =
+ update_rate(Now, Timestamp, AckOutCount, AckEgress),
+ {AvgAckIngressRate, AckIngress1} =
+ update_rate(Now, Timestamp, AckInCount, AckIngress),
+
+ RamAckCount = gb_trees:size(RamAckIndex),
+
+ Duration = %% msgs+acks / (msgs+acks/sec) == sec
+ case AvgEgressRate == 0 andalso AvgIngressRate == 0 andalso
+ AvgAckEgressRate == 0 andalso AvgAckIngressRate == 0 of
true -> infinity;
- false -> (RamMsgCountPrev + RamMsgCount) /
- (2 * (AvgEgressRate + AvgIngressRate))
+ false -> (RamMsgCountPrev + RamMsgCount +
+ RamAckCount + RamAckCountPrev) /
+ (2 * (AvgEgressRate + AvgIngressRate +
+ AvgAckEgressRate + AvgAckIngressRate))
end,
{Duration, State #vqstate {
@@ -721,14 +773,21 @@ ram_duration(State = #vqstate {
avg_egress = AvgEgressRate,
avg_ingress = AvgIngressRate,
timestamp = Now },
+ ack_rates = ARates #rates {
+ egress = AckEgress1,
+ ingress = AckIngress1,
+ avg_egress = AvgAckEgressRate,
+ avg_ingress = AvgAckIngressRate },
in_counter = 0,
out_counter = 0,
- ram_msg_count_prev = RamMsgCount }}.
+ ram_msg_count_prev = RamMsgCount,
+ ram_ack_count_prev = RamAckCount }}.
needs_idle_timeout(State = #vqstate { on_sync = ?BLANK_SYNC }) ->
- {Res, _State} = reduce_memory_use(fun (_Quota, State1) -> State1 end,
+ {Res, _State} = reduce_memory_use(fun (_Quota, State1) -> {0, State1} end,
fun (_Quota, State1) -> State1 end,
fun (State1) -> State1 end,
+ fun (_Quota, State1) -> {0, State1} end,
State),
Res;
needs_idle_timeout(_State) ->
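
To illustrate the extended Duration estimate in ram_duration/1 above, again
with illustrative numbers: with ram_msg_count_prev = 2000, ram_msg_count =
1000, ram_ack_count_prev = 400, a current RAM ack count of 600, and the four
averaged rates summing to 400 items/sec, the reported duration is
(2000 + 1000 + 600 + 400) / (2 * 400) = 5 seconds. If all four averaged
rates are zero the duration is still reported as infinity, as before.
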
@@ -740,32 +799,39 @@ handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }.
status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
- len = Len,
- pending_ack = PA,
- on_sync = #sync { funs = From },
- target_ram_msg_count = TargetRamMsgCount,
- ram_msg_count = RamMsgCount,
- ram_index_count = RamIndexCount,
- next_seq_id = NextSeqId,
- persistent_count = PersistentCount,
- rates = #rates {
+ len = Len,
+ pending_ack = PA,
+ ram_ack_index = RAI,
+ on_sync = #sync { funs = From },
+ target_ram_item_count = TargetRamItemCount,
+ ram_msg_count = RamMsgCount,
+ ram_index_count = RamIndexCount,
+ next_seq_id = NextSeqId,
+ persistent_count = PersistentCount,
+ rates = #rates {
avg_egress = AvgEgressRate,
- avg_ingress = AvgIngressRate } }) ->
- [ {q1 , queue:len(Q1)},
- {q2 , bpqueue:len(Q2)},
- {delta , Delta},
- {q3 , bpqueue:len(Q3)},
- {q4 , queue:len(Q4)},
- {len , Len},
- {pending_acks , dict:size(PA)},
- {outstanding_txns , length(From)},
- {target_ram_msg_count , TargetRamMsgCount},
- {ram_msg_count , RamMsgCount},
- {ram_index_count , RamIndexCount},
- {next_seq_id , NextSeqId},
- {persistent_count , PersistentCount},
- {avg_egress_rate , AvgEgressRate},
- {avg_ingress_rate , AvgIngressRate} ].
+ avg_ingress = AvgIngressRate },
+ ack_rates = #rates {
+ avg_egress = AvgAckEgressRate,
+ avg_ingress = AvgAckIngressRate } }) ->
+ [ {q1 , queue:len(Q1)},
+ {q2 , bpqueue:len(Q2)},
+ {delta , Delta},
+ {q3 , bpqueue:len(Q3)},
+ {q4 , queue:len(Q4)},
+ {len , Len},
+ {pending_acks , dict:size(PA)},
+ {ram_ack_count , gb_trees:size(RAI)},
+ {outstanding_txns , length(From)},
+ {target_ram_item_count , TargetRamItemCount},
+ {ram_msg_count , RamMsgCount},
+ {ram_index_count , RamIndexCount},
+ {next_seq_id , NextSeqId},
+ {persistent_count , PersistentCount},
+ {avg_egress_rate , AvgEgressRate},
+ {avg_ingress_rate , AvgIngressRate},
+ {avg_ack_egress_rate , AvgAckEgressRate},
+ {avg_ack_ingress_rate , AvgAckIngressRate}].
%%----------------------------------------------------------------------------
%% Minor helpers
@@ -955,33 +1021,42 @@ init(IsDurable, IndexState, DeltaCount, Terms,
end,
Now = now(),
State = #vqstate {
- q1 = queue:new(),
- q2 = bpqueue:new(),
- delta = Delta,
- q3 = bpqueue:new(),
- q4 = queue:new(),
- next_seq_id = NextSeqId,
- pending_ack = dict:new(),
- index_state = IndexState1,
- msg_store_clients = {PersistentClient, TransientClient},
- on_sync = ?BLANK_SYNC,
- durable = IsDurable,
- transient_threshold = NextSeqId,
-
- len = DeltaCount1,
- persistent_count = DeltaCount1,
-
- target_ram_msg_count = infinity,
- ram_msg_count = 0,
- ram_msg_count_prev = 0,
- ram_index_count = 0,
- out_counter = 0,
- in_counter = 0,
- rates = #rates { egress = {Now, 0},
- ingress = {Now, DeltaCount1},
- avg_egress = 0.0,
- avg_ingress = 0.0,
- timestamp = Now } },
+ q1 = queue:new(),
+ q2 = bpqueue:new(),
+ delta = Delta,
+ q3 = bpqueue:new(),
+ q4 = queue:new(),
+ next_seq_id = NextSeqId,
+ pending_ack = dict:new(),
+ ram_ack_index = gb_trees:empty(),
+ index_state = IndexState1,
+ msg_store_clients = {PersistentClient, TransientClient},
+ on_sync = ?BLANK_SYNC,
+ durable = IsDurable,
+ transient_threshold = NextSeqId,
+
+ len = DeltaCount1,
+ persistent_count = DeltaCount1,
+
+ target_ram_item_count = infinity,
+ ram_msg_count = 0,
+ ram_msg_count_prev = 0,
+ ram_ack_count_prev = 0,
+ ram_index_count = 0,
+ out_counter = 0,
+ in_counter = 0,
+ ack_out_counter = 0,
+ ack_in_counter = 0,
+ rates = #rates { egress = {Now, 0},
+ ingress = {Now, DeltaCount1},
+ avg_egress = 0.0,
+ avg_ingress = 0.0,
+ timestamp = Now },
+ ack_rates = #rates { egress = {Now, 0},
+ ingress = {Now, 0},
+ avg_egress = 0.0,
+ avg_ingress = 0.0,
+ timestamp = undefined } },
a(maybe_deltas_to_betas(State)).
msg_store_callback(PersistentGuids, Pubs, AckTags, Fun, MsgPropsFun) ->
@@ -1191,12 +1266,21 @@ record_pending_ack(#msg_status { seq_id = SeqId,
guid = Guid,
is_persistent = IsPersistent,
msg_on_disk = MsgOnDisk,
- msg_props = MsgProps } = MsgStatus, PA) ->
- AckEntry = case MsgOnDisk of
- true -> {IsPersistent, Guid, MsgProps};
- false -> MsgStatus
- end,
- dict:store(SeqId, AckEntry, PA).
+ msg_props = MsgProps } = MsgStatus,
+ State = #vqstate { pending_ack = PA,
+ ram_ack_index = RAI,
+ ack_in_counter = AckInCount}) ->
+ {AckEntry, RAI1} =
+ case MsgOnDisk of
+ true ->
+ {{IsPersistent, Guid, MsgProps}, RAI};
+ false ->
+ {MsgStatus, gb_trees:insert(SeqId, Guid, RAI)}
+ end,
+ PA1 = dict:store(SeqId, AckEntry, PA),
+ State #vqstate { pending_ack = PA1,
+ ram_ack_index = RAI1,
+ ack_in_counter = AckInCount + 1}.
remove_pending_ack(KeepPersistent,
State = #vqstate { pending_ack = PA,
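
The ram_ack_index added here is an ordered gb_tree mapping SeqId to Guid for
acks held in message form; keeping it ordered by SeqId is what later lets
limit_ram_acks evict the acks furthest from the head of the queue first. A
minimal, self-contained sketch of the standard gb_trees operations the patch
relies on (the helper name and values are illustrative):

    %% Sketch of the gb_trees calls used for ram_ack_index.
    ram_ack_index_demo() ->
        RAI0 = gb_trees:empty(),
        RAI1 = gb_trees:insert(1, <<"guid-a">>, RAI0),  %% record_pending_ack
        RAI2 = gb_trees:insert(2, <<"guid-b">>, RAI1),
        2 = gb_trees:size(RAI2),                        %% ram_ack_count in status/1
        {2, <<"guid-b">>, RAI3} = gb_trees:take_largest(RAI2), %% limit_ram_acks
        gb_trees:delete_any(1, RAI3).                   %% as in ack/4
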
@@ -1204,7 +1288,8 @@ remove_pending_ack(KeepPersistent,
msg_store_clients = MSCState }) ->
{SeqIds, GuidsByStore} = dict:fold(fun accumulate_ack/3,
{[], orddict:new()}, PA),
- State1 = State #vqstate { pending_ack = dict:new() },
+ State1 = State #vqstate { pending_ack = dict:new(),
+ ram_ack_index = gb_trees:empty() },
case KeepPersistent of
true -> case orddict:find(false, GuidsByStore) of
error -> State1;
@@ -1226,13 +1311,17 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
{{SeqIds, GuidsByStore},
State1 = #vqstate { index_state = IndexState,
msg_store_clients = MSCState,
- persistent_count = PCount }} =
+ persistent_count = PCount,
+ ack_out_counter = AckOutCount }} =
lists:foldl(
- fun (SeqId, {Acc, State2 = #vqstate { pending_ack = PA }}) ->
+ fun (SeqId, {Acc, State2 = #vqstate { pending_ack = PA,
+ ram_ack_index = RAI}}) ->
AckEntry = dict:fetch(SeqId, PA),
{accumulate_ack(SeqId, AckEntry, Acc),
Fun(AckEntry, State2 #vqstate {
- pending_ack = dict:erase(SeqId, PA) })}
+ pending_ack = dict:erase(SeqId, PA),
+ ram_ack_index =
+ gb_trees:delete_any(SeqId, RAI)})}
end, {{[], orddict:new()}, State}, AckTags),
IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState),
ok = orddict:fold(fun (IsPersistent, Guids, ok) ->
@@ -1241,7 +1330,8 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len(
orddict:new(), GuidsByStore)),
State1 #vqstate { index_state = IndexState1,
- persistent_count = PCount1 }.
+ persistent_count = PCount1,
+ ack_out_counter = AckOutCount + length(AckTags) }.
accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
msg_on_disk = false,
@@ -1270,7 +1360,7 @@ find_persistent_count(LensByStore) ->
%% though the conversion function for that is called as necessary. The
%% reason is twofold. Firstly, this is safe because the conversion is
%% only ever necessary just after a transition to a
-%% target_ram_msg_count of zero or after an incremental alpha->beta
+%% target_ram_item_count of zero or after an incremental alpha->beta
%% conversion. In the former case the conversion is performed straight
%% away (i.e. any betas present at the time are converted to deltas),
%% and in the latter case the need for a conversion is flagged up
@@ -1280,14 +1370,52 @@ find_persistent_count(LensByStore) ->
%% one segment's worth of messages in q3 - and thus would risk
%% perpetually reporting the need for a conversion when no such
%% conversion is needed. That in turn could cause an infinite loop.
-reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, State) ->
- {Reduce, State1} = case chunk_size(State #vqstate.ram_msg_count,
- State #vqstate.target_ram_msg_count) of
- 0 -> {false, State};
- S1 -> {true, AlphaBetaFun(S1, State)}
- end,
- case State1 #vqstate.target_ram_msg_count of
- infinity -> {Reduce, State1};
+reduce_memory_use(_AlphaBetaFun, _BetaGammaFun, _BetaDeltaFun, _AckFun,
+ State = #vqstate {target_ram_item_count = infinity}) ->
+ {false, State};
+reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, AckFun,
+ State = #vqstate {
+ ram_ack_index = RamAckIndex,
+ ram_msg_count = RamMsgCount,
+ target_ram_item_count = TargetRamItemCount,
+ rates = #rates {
+ avg_ingress = AvgIngress,
+ avg_egress = AvgEgress },
+ ack_rates = #rates {
+ avg_ingress = AvgAckIngress,
+ avg_egress = AvgAckEgress } }) ->
+
+ {Reduce, State1} =
+ case chunk_size(RamMsgCount + gb_trees:size(RamAckIndex),
+ TargetRamItemCount) of
+ 0 ->
+ {false, State};
+ S1 ->
+ ReduceFuns =
+ case (AvgAckIngress - AvgAckEgress) >
+ (AvgIngress - AvgEgress) of
+ true ->
+ %% ACKs are growing faster than the queue,
+ %% push messages from there first.
+ [AckFun, AlphaBetaFun];
+ false ->
+ %% The queue is growing faster than the
+ %% acks, push queue messages first.
+ [AlphaBetaFun, AckFun]
+ end,
+ {_, StateOut} =
+ %% Both reduce functions get a chance to reduce
+ %% memory. The second may very well get a quota of
+ %% 0 if the first function managed to push out the
+ %% maximum number of messages.
+ lists:foldl(
+ fun(ReduceFun, {QuotaN, StateN}) ->
+ ReduceFun(QuotaN, StateN)
+ end, {S1, State}, ReduceFuns),
+ {true, StateOut}
+ end,
+
+ case State1 #vqstate.target_ram_item_count of
0 -> {Reduce, BetaDeltaFun(State1)};
_ -> case chunk_size(State1 #vqstate.ram_index_count,
permitted_ram_index_count(State1)) of
@@ -1296,10 +1424,34 @@ reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, State) ->
end
end.
+
+limit_ram_acks(0, State) ->
+ {0, State};
+limit_ram_acks(Quota, State = #vqstate { pending_ack = PA,
+ ram_ack_index = RAI }) ->
+ case gb_trees:is_empty(RAI) of
+ true ->
+ {Quota, State};
+ false ->
+ {SeqId, Guid, RAI1} = gb_trees:take_largest(RAI),
+ MsgStatus = #msg_status {
+ guid = Guid, %% ASSERTION
+ is_persistent = false, %% ASSERTION
+ msg_props = MsgProps } = dict:fetch(SeqId, PA),
+ {_, State1} = maybe_write_to_disk(true, false, MsgStatus, State),
+ limit_ram_acks(Quota - 1,
+ State1 #vqstate {
+ pending_ack =
+ dict:store(SeqId, {false, Guid, MsgProps}, PA),
+ ram_ack_index = RAI1 })
+ end.
+
+
reduce_memory_use(State) ->
{_, State1} = reduce_memory_use(fun push_alphas_to_betas/2,
fun limit_ram_index/2,
fun push_betas_to_deltas/1,
+ fun limit_ram_acks/2,
State),
State1.
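
Since both reduce functions now return {RemainingQuota, State}, the foldl in
reduce_memory_use/4 simply threads one ?IO_BATCH_SIZE allocation through the
fastest-growing source first and hands whatever is left to the other. A
standalone sketch, with hypothetical names and illustrative sizes:

    %% Sketch: a reduce function consumes up to Quota items and reports the
    %% quota left over for the next reducer in the list.
    demo_reducer(Items) ->
        fun (Quota, State) ->
                Used = erlang:min(Quota, Items),
                {Quota - Used, State}
        end.

    %% With a chunk of 64, a reducer that pushes out 50 items leaves only 14
    %% for the second one, so quota_demo() returns {0, some_state}.
    quota_demo() ->
        Reducers = [demo_reducer(50), demo_reducer(30)],
        lists:foldl(fun (ReduceFun, {QuotaN, StateN}) ->
                            ReduceFun(QuotaN, StateN)
                    end, {64, some_state}, Reducers).
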
@@ -1432,9 +1584,9 @@ maybe_deltas_to_betas(State = #vqstate {
end.
push_alphas_to_betas(Quota, State) ->
- { Quota1, State1} = maybe_push_q1_to_betas(Quota, State),
- {_Quota2, State2} = maybe_push_q4_to_betas(Quota1, State1),
- State2.
+ {Quota1, State1} = maybe_push_q1_to_betas(Quota, State),
+ {Quota2, State2} = maybe_push_q4_to_betas(Quota1, State1),
+ {Quota2, State2}.
maybe_push_q1_to_betas(Quota, State = #vqstate { q1 = Q1 }) ->
maybe_push_alphas_to_betas(
@@ -1460,10 +1612,11 @@ maybe_push_q4_to_betas(Quota, State = #vqstate { q4 = Q4 }) ->
maybe_push_alphas_to_betas(_Generator, _Consumer, Quota, _Q,
State = #vqstate {
- ram_msg_count = RamMsgCount,
- target_ram_msg_count = TargetRamMsgCount })
+ ram_msg_count = RamMsgCount,
+ target_ram_item_count = TargetRamItemCount })
when Quota =:= 0 orelse
- TargetRamMsgCount =:= infinity orelse TargetRamMsgCount >= RamMsgCount ->
+ TargetRamItemCount =:= infinity orelse
+ TargetRamItemCount >= RamMsgCount ->
{Quota, State};
maybe_push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
case Generator(Q) of