summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-12-11 13:58:14 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-12-11 13:58:14 +0000
commit8eae638319aa7ca9c1ddc84ae0756653d6f96cff (patch)
tree8b009259e63cb99ccff7efec462780116cbb3663
parent9e06374d6f93bf33c347661b433a445158b77711 (diff)
downloadrabbitmq-server-8eae638319aa7ca9c1ddc84ae0756653d6f96cff.tar.gz
A few fixes to VQ book-keeping, which get the tests passing again:
* Don't count messages / bytes read during read, do so in betas_from_index_entries since unacked msgs don't count * Introduce qi_pending_ack to prevent us trying to page out messages that don't touch the store * Fix purge bookkeeping, account for the fact messages can be paged in to be purged
-rw-r--r--src/rabbit_queue_index.erl44
-rw-r--r--src/rabbit_variable_queue.erl183
-rw-r--r--test/src/rabbit_tests.erl21
3 files changed, 139 insertions, 109 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 9a9c4d18..75b13e02 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -238,8 +238,7 @@
-spec(read/3 :: (seq_id(), seq_id(), qistate()) ->
{[{rabbit_types:msg_id(), seq_id(),
rabbit_types:message_properties(),
- boolean(), boolean()}],
- non_neg_integer(), non_neg_integer(), qistate()}).
+ boolean(), boolean()}], qistate()}).
-spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()).
-spec(bounds/1 :: (qistate()) ->
{non_neg_integer(), non_neg_integer(), qistate()}).
@@ -354,12 +353,11 @@ read(Start, End, State = #qistate { segments = Segments,
%% Start is inclusive, End is exclusive.
LowerB = {StartSeg, _StartRelSeq} = seq_id_to_seg_and_rel_seq_id(Start),
UpperB = {EndSeg, _EndRelSeq} = seq_id_to_seg_and_rel_seq_id(End - 1),
- {Messages, {BodiesRead, BodyBytesRead}, Segments1} =
+ {Messages, Segments1} =
lists:foldr(fun (Seg, Acc) ->
read_bounded_segment(Seg, LowerB, UpperB, Acc, Dir)
- end, {[], {0, 0}, Segments}, lists:seq(StartSeg, EndSeg)),
- {Messages, BodiesRead, BodyBytesRead,
- State #qistate { segments = Segments1 }}.
+ end, {[], Segments}, lists:seq(StartSeg, EndSeg)),
+ {Messages, State #qistate { segments = Segments1 }}.
next_segment_boundary(SeqId) ->
{Seg, _RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
@@ -636,7 +634,7 @@ read_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS,
{ok, MsgBin} -> Msg = #basic_message{id = MsgId} =
binary_to_term(MsgBin),
{Msg, Props};
- _ -> exit(could_not_read)
+ _ -> exit(could_not_read) %% TODO
end
end.
@@ -922,21 +920,19 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) ->
Hdl.
read_bounded_segment(Seg, {StartSeg, StartRelSeq}, {EndSeg, EndRelSeq},
- {Messages, BodyReadCounts, Segments}, Dir) ->
+ {Messages, Segments}, Dir) ->
Segment = segment_find_or_new(Seg, Dir, Segments),
- {Messages1, BodyReadCounts1} =
- segment_entries_foldr(
- fun (RelSeq, {{MsgOrId, MsgProps, IsPersistent}, IsDelivered, no_ack},
- {Acc, BodyReadAcc})
- when (Seg > StartSeg orelse StartRelSeq =< RelSeq) andalso
- (Seg < EndSeg orelse EndRelSeq >= RelSeq) ->
- {[{MsgOrId, reconstruct_seq_id(StartSeg, RelSeq), MsgProps,
- IsPersistent, IsDelivered == del} | Acc],
- incr_body_read_counts(MsgOrId, MsgProps, BodyReadAcc)};
- (_RelSeq, _Value, Acc) ->
- Acc
- end, {Messages, BodyReadCounts}, Segment),
- {Messages1, BodyReadCounts1, segment_store(Segment, Segments)}.
+ {segment_entries_foldr(
+ fun (RelSeq, {{MsgOrId, MsgProps, IsPersistent}, IsDelivered, no_ack},
+ Acc)
+ when (Seg > StartSeg orelse StartRelSeq =< RelSeq) andalso
+ (Seg < EndSeg orelse EndRelSeq >= RelSeq) ->
+ [{MsgOrId, reconstruct_seq_id(StartSeg, RelSeq), MsgProps,
+ IsPersistent, IsDelivered == del} | Acc];
+ (_RelSeq, _Value, Acc) ->
+ Acc
+ end, Messages, Segment),
+ segment_store(Segment, Segments)}.
segment_entries_foldr(Fun, Init,
Segment = #segment { journal_entries = JEntries }) ->
@@ -944,12 +940,6 @@ segment_entries_foldr(Fun, Init,
{SegEntries1, _UnackedCountD} = segment_plus_journal(SegEntries, JEntries),
array:sparse_foldr(Fun, Init, SegEntries1).
-incr_body_read_counts(MsgId, _MsgProps, Counts) when is_binary(MsgId) ->
- Counts;
-incr_body_read_counts(#basic_message{}, #message_properties{size = Size},
- {BodiesRead, BodyBytesRead}) ->
- {BodiesRead + 1, BodyBytesRead + Size}.
-
%% Loading segments
%%
%% Does not do any combining with the journal at all.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 905ed6be..f4a65333 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -248,8 +248,9 @@
q3,
q4,
next_seq_id,
- ram_pending_ack,
- disk_pending_ack,
+ ram_pending_ack, %% msgs using store, still in RAM
+ disk_pending_ack, %% msgs in store, paged out
+ qi_pending_ack, %% msgs using qi, *can't* be paged out
index_state,
msg_store_clients,
durable,
@@ -341,6 +342,7 @@
next_seq_id :: seq_id(),
ram_pending_ack :: gb_trees:tree(),
disk_pending_ack :: gb_trees:tree(),
+ qi_pending_ack :: gb_trees:tree(),
index_state :: any(),
msg_store_clients :: 'undefined' | {{any(), binary()},
{any(), binary()}},
@@ -524,26 +526,25 @@ delete_crashed(#amqqueue{name = QName}) ->
purge(State = #vqstate { q4 = Q4,
index_state = IndexState,
msg_store_clients = MSCState,
- len = Len,
- ram_bytes = RamBytes,
- persistent_count = PCount,
- persistent_bytes = PBytes }) ->
+ len = Len }) ->
%% TODO: when there are no pending acks, which is a common case,
%% we could simply wipe the qi instead of issuing delivers and
%% acks for all the messages.
- Stats = {RamBytes, PCount, PBytes},
+ Stats = {0, 0, 0},
{Stats1, IndexState1} =
remove_queue_entries(Q4, Stats, IndexState, MSCState),
{Stats2, State1 = #vqstate { q1 = Q1,
index_state = IndexState2,
- msg_store_clients = MSCState1 }} =
-
+ msg_store_clients = MSCState1,
+ ram_bytes = RamBytes,
+ persistent_count = PCount,
+ persistent_bytes = PBytes }} =
purge_betas_and_deltas(
Stats1, State #vqstate { q4 = ?QUEUE:new(),
index_state = IndexState1 }),
- {{RamBytes3, PCount3, PBytes3}, IndexState3} =
+ {{RamBytesDec, PCountDec, PBytesDec}, IndexState3} =
remove_queue_entries(Q1, Stats2, IndexState2, MSCState1),
{Len, a(State1 #vqstate { q1 = ?QUEUE:new(),
@@ -551,9 +552,9 @@ purge(State = #vqstate { q4 = Q4,
len = 0,
bytes = 0,
ram_msg_count = 0,
- ram_bytes = RamBytes3,
- persistent_count = PCount3,
- persistent_bytes = PBytes3 })}.
+ ram_bytes = RamBytes - RamBytesDec,
+ persistent_count = PCount - PCountDec,
+ persistent_bytes = PBytes - PBytesDec })}.
purge_acks(State) -> a(purge_pending_ack(false, State)).
@@ -740,15 +741,18 @@ fold(Fun, Acc, State = #vqstate{index_state = IndexState}) ->
{Its, IndexState1} = lists:foldl(fun inext/2, {[], IndexState},
[msg_iterator(State),
disk_ack_iterator(State),
- ram_ack_iterator(State)]),
+ ram_ack_iterator(State),
+ qi_ack_iterator(State)]),
ifold(Fun, Acc, Its, State#vqstate{index_state = IndexState1}).
len(#vqstate { len = Len }) -> Len.
is_empty(State) -> 0 == len(State).
-depth(State = #vqstate { ram_pending_ack = RPA, disk_pending_ack = DPA }) ->
- len(State) + gb_trees:size(RPA) + gb_trees:size(DPA).
+depth(State = #vqstate { ram_pending_ack = RPA,
+ disk_pending_ack = DPA,
+ qi_pending_ack = QPA }) ->
+ len(State) + gb_trees:size(RPA) + gb_trees:size(DPA) + gb_trees:size(QPA).
set_ram_duration_target(
DurationTarget, State = #vqstate {
@@ -814,10 +818,11 @@ ram_duration(State) ->
ram_msg_count = RamMsgCount,
ram_msg_count_prev = RamMsgCountPrev,
ram_pending_ack = RPA,
+ qi_pending_ack = QPA,
ram_ack_count_prev = RamAckCountPrev } =
update_rates(State),
- RamAckCount = gb_trees:size(RPA),
+ RamAckCount = gb_trees:size(RPA) + gb_trees:size(QPA),
Duration = %% msgs+acks / (msgs+acks/sec) == sec
case lists:all(fun (X) -> X < 0.01 end,
@@ -853,8 +858,9 @@ msg_rates(#vqstate { rates = #rates { in = AvgIngressRate,
info(messages_ready_ram, #vqstate{ram_msg_count = RamMsgCount}) ->
RamMsgCount;
-info(messages_unacknowledged_ram, #vqstate{ram_pending_ack = RPA}) ->
- gb_trees:size(RPA);
+info(messages_unacknowledged_ram, #vqstate{ram_pending_ack = RPA,
+ qi_pending_ack = QPA}) ->
+ gb_trees:size(RPA) + gb_trees:size(QPA);
info(messages_ram, State) ->
info(messages_ready_ram, State) + info(messages_unacknowledged_ram, State);
info(messages_persistent, #vqstate{persistent_count = PersistentCount}) ->
@@ -1055,26 +1061,36 @@ maybe_write_delivered(false, _SeqId, IndexState) ->
maybe_write_delivered(true, SeqId, IndexState) ->
rabbit_queue_index:deliver([SeqId], IndexState).
-betas_from_index_entries(List, TransientThreshold, RPA, DPA, IndexState) ->
- {Filtered, Delivers, Acks} =
+betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA, IndexState) ->
+ {Filtered, Delivers, Acks, RamReadyCount, RamBytes} =
lists:foldr(
fun ({_MsgOrId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M,
- {Filtered1, Delivers1, Acks1} = Acc) ->
+ {Filtered1, Delivers1, Acks1, RRC, RB} = Acc) ->
case SeqId < TransientThreshold andalso not IsPersistent of
true -> {Filtered1,
cons_if(not IsDelivered, SeqId, Delivers1),
- [SeqId | Acks1]};
- false -> case (gb_trees:is_defined(SeqId, RPA) orelse
- gb_trees:is_defined(SeqId, DPA)) of
- false -> {?QUEUE:in_r(m(beta_msg_status(M)),
- Filtered1),
- Delivers1, Acks1};
- true -> Acc
- end
+ [SeqId | Acks1], RRC, RB};
+ false -> MsgStatus = m(beta_msg_status(M)),
+ HaveMsg = MsgStatus#msg_status.msg =/= undefined,
+ Size = msg_size(MsgStatus),
+ case (gb_trees:is_defined(SeqId, RPA) orelse
+ gb_trees:is_defined(SeqId, DPA) orelse
+ gb_trees:is_defined(SeqId, QPA)) of
+ false -> {?QUEUE:in_r(MsgStatus, Filtered1),
+ Delivers1, Acks1,
+ RRC + one_if(HaveMsg),
+ RB + one_if(HaveMsg) * Size};
+ true -> Acc %% [0]
+ end
end
- end, {?QUEUE:new(), [], []}, List),
- {Filtered, rabbit_queue_index:ack(
- Acks, rabbit_queue_index:deliver(Delivers, IndexState))}.
+ end, {?QUEUE:new(), [], [], 0, 0}, List),
+ {Filtered, RamReadyCount, RamBytes,
+ rabbit_queue_index:ack(
+ Acks, rabbit_queue_index:deliver(Delivers, IndexState))}.
+%% [0] We don't increase RamBytes here, even though it pertains to
+%% unacked messages too, since if HaveMsg then the message must have
+%% been stored in the QI, thus the message must have been in
+%% qi_pending_ack, thus it must already have been in RAM.
expand_delta(SeqId, ?BLANK_DELTA_PATTERN(X)) ->
d(#delta { start_seq_id = SeqId, count = 1, end_seq_id = SeqId + 1 });
@@ -1121,6 +1137,7 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms,
next_seq_id = NextSeqId,
ram_pending_ack = gb_trees:empty(),
disk_pending_ack = gb_trees:empty(),
+ qi_pending_ack = gb_trees:empty(),
index_state = IndexState1,
msg_store_clients = {PersistentClient, TransientClient},
durable = IsDurable,
@@ -1288,20 +1305,15 @@ purge_betas_and_deltas(Stats,
index_state = IndexState1 }))
end.
-remove_queue_entries(Q, {RamBytes, PCount, PBytes},
+remove_queue_entries(Q, {RamBytesDec, PCountDec, PBytesDec},
IndexState, MSCState) ->
- {MsgIdsByStore, RamBytes1, PBytes1, Delivers, Acks} =
+ {MsgIdsByStore, RamBytesDec1, PCountDec1, PBytesDec1, Delivers, Acks} =
?QUEUE:foldl(fun remove_queue_entries1/2,
- {orddict:new(), RamBytes, PBytes, [], []}, Q),
+ {orddict:new(), RamBytesDec, PCountDec, PBytesDec, [], []}, Q),
ok = orddict:fold(fun (IsPersistent, MsgIds, ok) ->
msg_store_remove(MSCState, IsPersistent, MsgIds)
end, ok, MsgIdsByStore),
- {{RamBytes1,
- PCount - case orddict:find(true, MsgIdsByStore) of
- error -> 0;
- {ok, Ids} -> length(Ids)
- end,
- PBytes1},
+ {{RamBytesDec1, PCountDec1, PBytesDec1},
rabbit_queue_index:ack(Acks,
rabbit_queue_index:deliver(Delivers, IndexState))}.
@@ -1310,13 +1322,14 @@ remove_queue_entries1(
is_delivered = IsDelivered, msg_in_store = MsgInStore,
index_on_disk = IndexOnDisk, is_persistent = IsPersistent,
msg_props = #message_properties { size = Size } },
- {MsgIdsByStore, RamBytes, PBytes, Delivers, Acks}) ->
+ {MsgIdsByStore, RamBytesDec, PCountDec, PBytesDec, Delivers, Acks}) ->
{case MsgInStore of
true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore);
false -> MsgIdsByStore
end,
- RamBytes - Size * one_if(Msg =/= undefined),
- PBytes - Size * one_if(IsPersistent),
+ RamBytesDec + Size * one_if(Msg =/= undefined),
+ PCountDec + one_if(IsPersistent),
+ PBytesDec + Size * one_if(IsPersistent),
cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers),
cons_if(IndexOnDisk, SeqId, Acks)}.
@@ -1390,24 +1403,33 @@ prepare_to_store(Msg) ->
%% Internal gubbins for acks
%%----------------------------------------------------------------------------
-record_pending_ack(#msg_status { seq_id = SeqId, msg = Msg } = MsgStatus,
+record_pending_ack(#msg_status { seq_id = SeqId, msg = Msg,
+ msg_props = MsgProps } = MsgStatus,
State = #vqstate { ram_pending_ack = RPA,
disk_pending_ack = DPA,
+ qi_pending_ack = QPA,
ack_in_counter = AckInCount}) ->
- {RPA1, DPA1} =
- case Msg of
- undefined -> {RPA, gb_trees:insert(SeqId, MsgStatus, DPA)};
- _ -> {gb_trees:insert(SeqId, MsgStatus, RPA), DPA}
+ Insert = fun (Tree) -> gb_trees:insert(SeqId, MsgStatus, Tree) end,
+ {RPA1, DPA1, QPA1} =
+ case {Msg, persist_to(MsgProps)} of
+ {undefined, _} -> {RPA, Insert(DPA), QPA};
+ {_, queue_index} -> {RPA, DPA, Insert(QPA)};
+ {_, msg_store} -> {Insert(RPA), DPA, QPA}
end,
State #vqstate { ram_pending_ack = RPA1,
disk_pending_ack = DPA1,
+ qi_pending_ack = QPA1,
ack_in_counter = AckInCount + 1}.
lookup_pending_ack(SeqId, #vqstate { ram_pending_ack = RPA,
- disk_pending_ack = DPA }) ->
+ disk_pending_ack = DPA,
+ qi_pending_ack = QPA}) ->
case gb_trees:lookup(SeqId, RPA) of
{value, V} -> V;
- none -> gb_trees:get(SeqId, DPA)
+ none -> case gb_trees:lookup(SeqId, DPA) of
+ {value, V} -> V;
+ none -> gb_trees:get(SeqId, QPA)
+ end
end.
%% First parameter = UpdatePersistentCount
@@ -1417,27 +1439,38 @@ remove_pending_ack(true, SeqId, State) ->
PCount1 = PCount - one_if(MsgStatus#msg_status.is_persistent),
{MsgStatus, upd_bytes(0, -1, MsgStatus,
State1 # vqstate{ persistent_count = PCount1 })};
-remove_pending_ack(false, SeqId, State = #vqstate { ram_pending_ack = RPA,
- disk_pending_ack = DPA }) ->
+remove_pending_ack(false, SeqId, State = #vqstate{ram_pending_ack = RPA,
+ disk_pending_ack = DPA,
+ qi_pending_ack = QPA}) ->
case gb_trees:lookup(SeqId, RPA) of
{value, V} -> RPA1 = gb_trees:delete(SeqId, RPA),
{V, State #vqstate { ram_pending_ack = RPA1 }};
- none -> DPA1 = gb_trees:delete(SeqId, DPA),
- {gb_trees:get(SeqId, DPA),
- State #vqstate { disk_pending_ack = DPA1 }}
+ none -> case gb_trees:lookup(SeqId, DPA) of
+ {value, V} ->
+ DPA1 = gb_trees:delete(SeqId, DPA),
+ {V, State#vqstate{disk_pending_ack = DPA1}};
+ none ->
+ QPA1 = gb_trees:delete(SeqId, QPA),
+ {gb_trees:get(SeqId, QPA),
+ State#vqstate{qi_pending_ack = QPA1}}
+ end
end.
purge_pending_ack(KeepPersistent,
State = #vqstate { ram_pending_ack = RPA,
disk_pending_ack = DPA,
+ qi_pending_ack = QPA,
index_state = IndexState,
msg_store_clients = MSCState }) ->
F = fun (_SeqId, MsgStatus, Acc) -> accumulate_ack(MsgStatus, Acc) end,
{IndexOnDiskSeqIds, MsgIdsByStore, _AllMsgIds} =
rabbit_misc:gb_trees_fold(
- F, rabbit_misc:gb_trees_fold(F, accumulate_ack_init(), RPA), DPA),
+ F, rabbit_misc:gb_trees_fold(
+ F, rabbit_misc:gb_trees_fold(
+ F, accumulate_ack_init(), RPA), DPA), QPA),
State1 = State #vqstate { ram_pending_ack = gb_trees:empty(),
- disk_pending_ack = gb_trees:empty() },
+ disk_pending_ack = gb_trees:empty(),
+ qi_pending_ack = gb_trees:empty()},
case KeepPersistent of
true -> case orddict:find(false, MsgIdsByStore) of
@@ -1531,7 +1564,11 @@ publish_alpha(MsgStatus, State) ->
publish_beta(MsgStatus, State) ->
{MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State),
- {m(trim_msg_status(MsgStatus1)), State1}.
+ MsgStatus2 = m(trim_msg_status(MsgStatus1)),
+ {MsgStatus2, case MsgStatus2#msg_status.msg of
+ undefined -> State1;
+ _ -> inc_ram_msg_count(State1)
+ end}.
%% Rebuild queue, inserting sequence ids to maintain ordering
queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, State) ->
@@ -1567,8 +1604,12 @@ delta_merge(SeqIds, Delta, MsgIds, State) ->
msg_from_pending_ack(SeqId, State0),
{_MsgStatus, State2} =
maybe_write_to_disk(true, true, MsgStatus, State1),
+ State3 = case MsgStatus#msg_status.msg of
+ undefined -> State2;
+ _ -> upd_ram_bytes(-1, MsgStatus, State2)
+ end,
{expand_delta(SeqId, Delta0), [MsgId | MsgIds0],
- upd_bytes(1, -1, MsgStatus, State2)}
+ upd_bytes(1, -1, MsgStatus, State3)}
end, {Delta, MsgIds, State}, SeqIds).
%% Mostly opposite of record_pending_ack/2
@@ -1598,6 +1639,9 @@ ram_ack_iterator(State) ->
disk_ack_iterator(State) ->
{ack, gb_trees:iterator(State#vqstate.disk_pending_ack)}.
+qi_ack_iterator(State) ->
+ {ack, gb_trees:iterator(State#vqstate.qi_pending_ack)}.
+
msg_iterator(State) -> istate(start, State).
istate(start, State) -> {q4, State#vqstate.q4, State};
@@ -1621,14 +1665,15 @@ next({delta, #delta{start_seq_id = SeqId,
end_seq_id = SeqIdEnd} = Delta, State}, IndexState) ->
SeqIdB = rabbit_queue_index:next_segment_boundary(SeqId),
SeqId1 = lists:min([SeqIdB, SeqIdEnd]),
- {List, _, _, IndexState1} =
+ {List, IndexState1} =
rabbit_queue_index:read(SeqId, SeqId1, IndexState),
next({delta, Delta#delta{start_seq_id = SeqId1}, List, State}, IndexState1);
next({delta, Delta, [], State}, IndexState) ->
next({delta, Delta, State}, IndexState);
next({delta, Delta, [{_, SeqId, _, _, _} = M | Rest], State}, IndexState) ->
case (gb_trees:is_defined(SeqId, State#vqstate.ram_pending_ack) orelse
- gb_trees:is_defined(SeqId, State#vqstate.disk_pending_ack)) of
+ gb_trees:is_defined(SeqId, State#vqstate.disk_pending_ack) orelse
+ gb_trees:is_defined(SeqId, State#vqstate.qi_pending_ack)) of
false -> Next = {delta, Delta, Rest, State},
{value, beta_msg_status(M), false, Next, IndexState};
true -> next({delta, Delta, Rest, State}, IndexState)
@@ -1795,6 +1840,7 @@ maybe_deltas_to_betas(State = #vqstate {
ram_bytes = RamBytes,
ram_pending_ack = RPA,
disk_pending_ack = DPA,
+ qi_pending_ack = QPA,
transient_threshold = TransientThreshold }) ->
#delta { start_seq_id = DeltaSeqId,
count = DeltaCount,
@@ -1802,13 +1848,14 @@ maybe_deltas_to_betas(State = #vqstate {
DeltaSeqId1 =
lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId),
DeltaSeqIdEnd]),
- {List, RamCountDelta, RamBytesDelta, IndexState1} =
+ {List, IndexState1} =
rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState),
- {Q3a, IndexState2} = betas_from_index_entries(List, TransientThreshold,
- RPA, DPA, IndexState1),
+ {Q3a, RamCountsInc, RamBytesInc, IndexState2} =
+ betas_from_index_entries(List, TransientThreshold,
+ RPA, DPA, QPA, IndexState1),
State1 = State #vqstate { index_state = IndexState2,
- ram_msg_count = RamMsgCount + RamCountDelta,
- ram_bytes = RamBytes + RamBytesDelta },
+ ram_msg_count = RamMsgCount + RamCountsInc,
+ ram_bytes = RamBytes + RamBytesInc },
case ?QUEUE:len(Q3a) of
0 ->
%% we ignored every message in the segment due to it being
diff --git a/test/src/rabbit_tests.erl b/test/src/rabbit_tests.erl
index e7da3acb..6d9606b5 100644
--- a/test/src/rabbit_tests.erl
+++ b/test/src/rabbit_tests.erl
@@ -2283,7 +2283,7 @@ test_queue_index_props() ->
Props = #message_properties{expiry=12345, size = 10},
Qi1 = rabbit_queue_index:publish(MsgId, 1, Props, true, Qi0),
{[{MsgId, 1, Props, _, _}], Qi2} =
- queue_index_read(1, 2, Qi1),
+ rabbit_queue_index:read(1, 2, Qi1),
Qi2
end),
@@ -2306,7 +2306,7 @@ test_queue_index() ->
{0, 0, Qi1} = rabbit_queue_index:bounds(Qi0),
{Qi2, SeqIdsMsgIdsA} = queue_index_publish(SeqIdsA, false, Qi1),
{0, SegmentSize, Qi3} = rabbit_queue_index:bounds(Qi2),
- {ReadA, Qi4} = queue_index_read(0, SegmentSize, Qi3),
+ {ReadA, Qi4} = rabbit_queue_index:read(0, SegmentSize, Qi3),
ok = verify_read_with_published(false, false, ReadA,
lists:reverse(SeqIdsMsgIdsA)),
%% should get length back as 0, as all the msgs were transient
@@ -2314,7 +2314,7 @@ test_queue_index() ->
{0, 0, Qi7} = rabbit_queue_index:bounds(Qi6),
{Qi8, SeqIdsMsgIdsB} = queue_index_publish(SeqIdsB, true, Qi7),
{0, TwoSegs, Qi9} = rabbit_queue_index:bounds(Qi8),
- {ReadB, Qi10} = queue_index_read(0, SegmentSize, Qi9),
+ {ReadB, Qi10} = rabbit_queue_index:read(0, SegmentSize, Qi9),
ok = verify_read_with_published(false, true, ReadB,
lists:reverse(SeqIdsMsgIdsB)),
%% should get length back as MostOfASegment
@@ -2323,7 +2323,7 @@ test_queue_index() ->
{LenB, BytesB, Qi12} = restart_test_queue(Qi10),
{0, TwoSegs, Qi13} = rabbit_queue_index:bounds(Qi12),
Qi14 = rabbit_queue_index:deliver(SeqIdsB, Qi13),
- {ReadC, Qi15} = queue_index_read(0, SegmentSize, Qi14),
+ {ReadC, Qi15} = rabbit_queue_index:read(0, SegmentSize, Qi14),
ok = verify_read_with_published(true, true, ReadC,
lists:reverse(SeqIdsMsgIdsB)),
Qi16 = rabbit_queue_index:ack(SeqIdsB, Qi15),
@@ -2386,11 +2386,11 @@ test_queue_index() ->
{Qi5, [Eight,Six|_]} = queue_index_publish([3,6,8], false, Qi4),
Qi6 = rabbit_queue_index:deliver([2,3,5,6], Qi5),
Qi7 = rabbit_queue_index:ack([1,2,3], Qi6),
- {[], Qi8} = queue_index_read(0, 4, Qi7),
- {ReadD, Qi9} = queue_index_read(4, 7, Qi8),
+ {[], Qi8} = rabbit_queue_index:read(0, 4, Qi7),
+ {ReadD, Qi9} = rabbit_queue_index:read(4, 7, Qi8),
ok = verify_read_with_published(true, false, ReadD,
[Four, Five, Six]),
- {ReadE, Qi10} = queue_index_read(7, 9, Qi9),
+ {ReadE, Qi10} = rabbit_queue_index:read(7, 9, Qi9),
ok = verify_read_with_published(false, false, ReadE,
[Seven, Eight]),
Qi10
@@ -2417,10 +2417,6 @@ test_queue_index() ->
passed.
-queue_index_read(Seq1, Seq2, State) ->
- {Res, _, _, State1} = rabbit_queue_index:read(Seq1, Seq2, State),
- {Res, State1}.
-
variable_queue_init(Q, Recover) ->
rabbit_variable_queue:init(
Q, case Recover of
@@ -2800,8 +2796,6 @@ test_variable_queue_dynamic_duration_change(VQ0) ->
VQ7 = lists:foldl(
fun (Duration1, VQ4) ->
{_Duration, VQ5} = rabbit_variable_queue:ram_duration(VQ4),
- io:format("~p:~n~p~n",
- [Duration1, variable_queue_status(VQ5)]),
VQ6 = variable_queue_set_ram_duration_target(
Duration1, VQ5),
publish_fetch_and_ack(Churn, Len, VQ6)
@@ -2862,7 +2856,6 @@ test_variable_queue_partial_segments_delta_thing(VQ0) ->
check_variable_queue_status(VQ0, Props) ->
VQ1 = variable_queue_wait_for_shuffling_end(VQ0),
S = variable_queue_status(VQ1),
- io:format("~p~n", [S]),
assert_props(S, Props),
VQ1.