diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-12-11 13:58:14 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-12-11 13:58:14 +0000 |
commit | 8eae638319aa7ca9c1ddc84ae0756653d6f96cff (patch) | |
tree | 8b009259e63cb99ccff7efec462780116cbb3663 | |
parent | 9e06374d6f93bf33c347661b433a445158b77711 (diff) | |
download | rabbitmq-server-8eae638319aa7ca9c1ddc84ae0756653d6f96cff.tar.gz |
A few fixes to VQ book-keeping, which get the tests passing again:
* Don't count messages / bytes read during read, do so in
betas_from_index_entries since unacked msgs don't count
* Introduce qi_pending_ack to prevent us trying to page out messages that
don't touch the store
* Fix purge bookkeeping, account for the fact messages can be paged in to be
purged
-rw-r--r-- | src/rabbit_queue_index.erl | 44 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 183 | ||||
-rw-r--r-- | test/src/rabbit_tests.erl | 21 |
3 files changed, 139 insertions, 109 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 9a9c4d18..75b13e02 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -238,8 +238,7 @@ -spec(read/3 :: (seq_id(), seq_id(), qistate()) -> {[{rabbit_types:msg_id(), seq_id(), rabbit_types:message_properties(), - boolean(), boolean()}], - non_neg_integer(), non_neg_integer(), qistate()}). + boolean(), boolean()}], qistate()}). -spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()). -spec(bounds/1 :: (qistate()) -> {non_neg_integer(), non_neg_integer(), qistate()}). @@ -354,12 +353,11 @@ read(Start, End, State = #qistate { segments = Segments, %% Start is inclusive, End is exclusive. LowerB = {StartSeg, _StartRelSeq} = seq_id_to_seg_and_rel_seq_id(Start), UpperB = {EndSeg, _EndRelSeq} = seq_id_to_seg_and_rel_seq_id(End - 1), - {Messages, {BodiesRead, BodyBytesRead}, Segments1} = + {Messages, Segments1} = lists:foldr(fun (Seg, Acc) -> read_bounded_segment(Seg, LowerB, UpperB, Acc, Dir) - end, {[], {0, 0}, Segments}, lists:seq(StartSeg, EndSeg)), - {Messages, BodiesRead, BodyBytesRead, - State #qistate { segments = Segments1 }}. + end, {[], Segments}, lists:seq(StartSeg, EndSeg)), + {Messages, State #qistate { segments = Segments1 }}. next_segment_boundary(SeqId) -> {Seg, _RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), @@ -636,7 +634,7 @@ read_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, {ok, MsgBin} -> Msg = #basic_message{id = MsgId} = binary_to_term(MsgBin), {Msg, Props}; - _ -> exit(could_not_read) + _ -> exit(could_not_read) %% TODO end end. @@ -922,21 +920,19 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) -> Hdl. read_bounded_segment(Seg, {StartSeg, StartRelSeq}, {EndSeg, EndRelSeq}, - {Messages, BodyReadCounts, Segments}, Dir) -> + {Messages, Segments}, Dir) -> Segment = segment_find_or_new(Seg, Dir, Segments), - {Messages1, BodyReadCounts1} = - segment_entries_foldr( - fun (RelSeq, {{MsgOrId, MsgProps, IsPersistent}, IsDelivered, no_ack}, - {Acc, BodyReadAcc}) - when (Seg > StartSeg orelse StartRelSeq =< RelSeq) andalso - (Seg < EndSeg orelse EndRelSeq >= RelSeq) -> - {[{MsgOrId, reconstruct_seq_id(StartSeg, RelSeq), MsgProps, - IsPersistent, IsDelivered == del} | Acc], - incr_body_read_counts(MsgOrId, MsgProps, BodyReadAcc)}; - (_RelSeq, _Value, Acc) -> - Acc - end, {Messages, BodyReadCounts}, Segment), - {Messages1, BodyReadCounts1, segment_store(Segment, Segments)}. + {segment_entries_foldr( + fun (RelSeq, {{MsgOrId, MsgProps, IsPersistent}, IsDelivered, no_ack}, + Acc) + when (Seg > StartSeg orelse StartRelSeq =< RelSeq) andalso + (Seg < EndSeg orelse EndRelSeq >= RelSeq) -> + [{MsgOrId, reconstruct_seq_id(StartSeg, RelSeq), MsgProps, + IsPersistent, IsDelivered == del} | Acc]; + (_RelSeq, _Value, Acc) -> + Acc + end, Messages, Segment), + segment_store(Segment, Segments)}. segment_entries_foldr(Fun, Init, Segment = #segment { journal_entries = JEntries }) -> @@ -944,12 +940,6 @@ segment_entries_foldr(Fun, Init, {SegEntries1, _UnackedCountD} = segment_plus_journal(SegEntries, JEntries), array:sparse_foldr(Fun, Init, SegEntries1). -incr_body_read_counts(MsgId, _MsgProps, Counts) when is_binary(MsgId) -> - Counts; -incr_body_read_counts(#basic_message{}, #message_properties{size = Size}, - {BodiesRead, BodyBytesRead}) -> - {BodiesRead + 1, BodyBytesRead + Size}. - %% Loading segments %% %% Does not do any combining with the journal at all. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 905ed6be..f4a65333 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -248,8 +248,9 @@ q3, q4, next_seq_id, - ram_pending_ack, - disk_pending_ack, + ram_pending_ack, %% msgs using store, still in RAM + disk_pending_ack, %% msgs in store, paged out + qi_pending_ack, %% msgs using qi, *can't* be paged out index_state, msg_store_clients, durable, @@ -341,6 +342,7 @@ next_seq_id :: seq_id(), ram_pending_ack :: gb_trees:tree(), disk_pending_ack :: gb_trees:tree(), + qi_pending_ack :: gb_trees:tree(), index_state :: any(), msg_store_clients :: 'undefined' | {{any(), binary()}, {any(), binary()}}, @@ -524,26 +526,25 @@ delete_crashed(#amqqueue{name = QName}) -> purge(State = #vqstate { q4 = Q4, index_state = IndexState, msg_store_clients = MSCState, - len = Len, - ram_bytes = RamBytes, - persistent_count = PCount, - persistent_bytes = PBytes }) -> + len = Len }) -> %% TODO: when there are no pending acks, which is a common case, %% we could simply wipe the qi instead of issuing delivers and %% acks for all the messages. - Stats = {RamBytes, PCount, PBytes}, + Stats = {0, 0, 0}, {Stats1, IndexState1} = remove_queue_entries(Q4, Stats, IndexState, MSCState), {Stats2, State1 = #vqstate { q1 = Q1, index_state = IndexState2, - msg_store_clients = MSCState1 }} = - + msg_store_clients = MSCState1, + ram_bytes = RamBytes, + persistent_count = PCount, + persistent_bytes = PBytes }} = purge_betas_and_deltas( Stats1, State #vqstate { q4 = ?QUEUE:new(), index_state = IndexState1 }), - {{RamBytes3, PCount3, PBytes3}, IndexState3} = + {{RamBytesDec, PCountDec, PBytesDec}, IndexState3} = remove_queue_entries(Q1, Stats2, IndexState2, MSCState1), {Len, a(State1 #vqstate { q1 = ?QUEUE:new(), @@ -551,9 +552,9 @@ purge(State = #vqstate { q4 = Q4, len = 0, bytes = 0, ram_msg_count = 0, - ram_bytes = RamBytes3, - persistent_count = PCount3, - persistent_bytes = PBytes3 })}. + ram_bytes = RamBytes - RamBytesDec, + persistent_count = PCount - PCountDec, + persistent_bytes = PBytes - PBytesDec })}. purge_acks(State) -> a(purge_pending_ack(false, State)). @@ -740,15 +741,18 @@ fold(Fun, Acc, State = #vqstate{index_state = IndexState}) -> {Its, IndexState1} = lists:foldl(fun inext/2, {[], IndexState}, [msg_iterator(State), disk_ack_iterator(State), - ram_ack_iterator(State)]), + ram_ack_iterator(State), + qi_ack_iterator(State)]), ifold(Fun, Acc, Its, State#vqstate{index_state = IndexState1}). len(#vqstate { len = Len }) -> Len. is_empty(State) -> 0 == len(State). -depth(State = #vqstate { ram_pending_ack = RPA, disk_pending_ack = DPA }) -> - len(State) + gb_trees:size(RPA) + gb_trees:size(DPA). +depth(State = #vqstate { ram_pending_ack = RPA, + disk_pending_ack = DPA, + qi_pending_ack = QPA }) -> + len(State) + gb_trees:size(RPA) + gb_trees:size(DPA) + gb_trees:size(QPA). set_ram_duration_target( DurationTarget, State = #vqstate { @@ -814,10 +818,11 @@ ram_duration(State) -> ram_msg_count = RamMsgCount, ram_msg_count_prev = RamMsgCountPrev, ram_pending_ack = RPA, + qi_pending_ack = QPA, ram_ack_count_prev = RamAckCountPrev } = update_rates(State), - RamAckCount = gb_trees:size(RPA), + RamAckCount = gb_trees:size(RPA) + gb_trees:size(QPA), Duration = %% msgs+acks / (msgs+acks/sec) == sec case lists:all(fun (X) -> X < 0.01 end, @@ -853,8 +858,9 @@ msg_rates(#vqstate { rates = #rates { in = AvgIngressRate, info(messages_ready_ram, #vqstate{ram_msg_count = RamMsgCount}) -> RamMsgCount; -info(messages_unacknowledged_ram, #vqstate{ram_pending_ack = RPA}) -> - gb_trees:size(RPA); +info(messages_unacknowledged_ram, #vqstate{ram_pending_ack = RPA, + qi_pending_ack = QPA}) -> + gb_trees:size(RPA) + gb_trees:size(QPA); info(messages_ram, State) -> info(messages_ready_ram, State) + info(messages_unacknowledged_ram, State); info(messages_persistent, #vqstate{persistent_count = PersistentCount}) -> @@ -1055,26 +1061,36 @@ maybe_write_delivered(false, _SeqId, IndexState) -> maybe_write_delivered(true, SeqId, IndexState) -> rabbit_queue_index:deliver([SeqId], IndexState). -betas_from_index_entries(List, TransientThreshold, RPA, DPA, IndexState) -> - {Filtered, Delivers, Acks} = +betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA, IndexState) -> + {Filtered, Delivers, Acks, RamReadyCount, RamBytes} = lists:foldr( fun ({_MsgOrId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M, - {Filtered1, Delivers1, Acks1} = Acc) -> + {Filtered1, Delivers1, Acks1, RRC, RB} = Acc) -> case SeqId < TransientThreshold andalso not IsPersistent of true -> {Filtered1, cons_if(not IsDelivered, SeqId, Delivers1), - [SeqId | Acks1]}; - false -> case (gb_trees:is_defined(SeqId, RPA) orelse - gb_trees:is_defined(SeqId, DPA)) of - false -> {?QUEUE:in_r(m(beta_msg_status(M)), - Filtered1), - Delivers1, Acks1}; - true -> Acc - end + [SeqId | Acks1], RRC, RB}; + false -> MsgStatus = m(beta_msg_status(M)), + HaveMsg = MsgStatus#msg_status.msg =/= undefined, + Size = msg_size(MsgStatus), + case (gb_trees:is_defined(SeqId, RPA) orelse + gb_trees:is_defined(SeqId, DPA) orelse + gb_trees:is_defined(SeqId, QPA)) of + false -> {?QUEUE:in_r(MsgStatus, Filtered1), + Delivers1, Acks1, + RRC + one_if(HaveMsg), + RB + one_if(HaveMsg) * Size}; + true -> Acc %% [0] + end end - end, {?QUEUE:new(), [], []}, List), - {Filtered, rabbit_queue_index:ack( - Acks, rabbit_queue_index:deliver(Delivers, IndexState))}. + end, {?QUEUE:new(), [], [], 0, 0}, List), + {Filtered, RamReadyCount, RamBytes, + rabbit_queue_index:ack( + Acks, rabbit_queue_index:deliver(Delivers, IndexState))}. +%% [0] We don't increase RamBytes here, even though it pertains to +%% unacked messages too, since if HaveMsg then the message must have +%% been stored in the QI, thus the message must have been in +%% qi_pending_ack, thus it must already have been in RAM. expand_delta(SeqId, ?BLANK_DELTA_PATTERN(X)) -> d(#delta { start_seq_id = SeqId, count = 1, end_seq_id = SeqId + 1 }); @@ -1121,6 +1137,7 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms, next_seq_id = NextSeqId, ram_pending_ack = gb_trees:empty(), disk_pending_ack = gb_trees:empty(), + qi_pending_ack = gb_trees:empty(), index_state = IndexState1, msg_store_clients = {PersistentClient, TransientClient}, durable = IsDurable, @@ -1288,20 +1305,15 @@ purge_betas_and_deltas(Stats, index_state = IndexState1 })) end. -remove_queue_entries(Q, {RamBytes, PCount, PBytes}, +remove_queue_entries(Q, {RamBytesDec, PCountDec, PBytesDec}, IndexState, MSCState) -> - {MsgIdsByStore, RamBytes1, PBytes1, Delivers, Acks} = + {MsgIdsByStore, RamBytesDec1, PCountDec1, PBytesDec1, Delivers, Acks} = ?QUEUE:foldl(fun remove_queue_entries1/2, - {orddict:new(), RamBytes, PBytes, [], []}, Q), + {orddict:new(), RamBytesDec, PCountDec, PBytesDec, [], []}, Q), ok = orddict:fold(fun (IsPersistent, MsgIds, ok) -> msg_store_remove(MSCState, IsPersistent, MsgIds) end, ok, MsgIdsByStore), - {{RamBytes1, - PCount - case orddict:find(true, MsgIdsByStore) of - error -> 0; - {ok, Ids} -> length(Ids) - end, - PBytes1}, + {{RamBytesDec1, PCountDec1, PBytesDec1}, rabbit_queue_index:ack(Acks, rabbit_queue_index:deliver(Delivers, IndexState))}. @@ -1310,13 +1322,14 @@ remove_queue_entries1( is_delivered = IsDelivered, msg_in_store = MsgInStore, index_on_disk = IndexOnDisk, is_persistent = IsPersistent, msg_props = #message_properties { size = Size } }, - {MsgIdsByStore, RamBytes, PBytes, Delivers, Acks}) -> + {MsgIdsByStore, RamBytesDec, PCountDec, PBytesDec, Delivers, Acks}) -> {case MsgInStore of true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore); false -> MsgIdsByStore end, - RamBytes - Size * one_if(Msg =/= undefined), - PBytes - Size * one_if(IsPersistent), + RamBytesDec + Size * one_if(Msg =/= undefined), + PCountDec + one_if(IsPersistent), + PBytesDec + Size * one_if(IsPersistent), cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers), cons_if(IndexOnDisk, SeqId, Acks)}. @@ -1390,24 +1403,33 @@ prepare_to_store(Msg) -> %% Internal gubbins for acks %%---------------------------------------------------------------------------- -record_pending_ack(#msg_status { seq_id = SeqId, msg = Msg } = MsgStatus, +record_pending_ack(#msg_status { seq_id = SeqId, msg = Msg, + msg_props = MsgProps } = MsgStatus, State = #vqstate { ram_pending_ack = RPA, disk_pending_ack = DPA, + qi_pending_ack = QPA, ack_in_counter = AckInCount}) -> - {RPA1, DPA1} = - case Msg of - undefined -> {RPA, gb_trees:insert(SeqId, MsgStatus, DPA)}; - _ -> {gb_trees:insert(SeqId, MsgStatus, RPA), DPA} + Insert = fun (Tree) -> gb_trees:insert(SeqId, MsgStatus, Tree) end, + {RPA1, DPA1, QPA1} = + case {Msg, persist_to(MsgProps)} of + {undefined, _} -> {RPA, Insert(DPA), QPA}; + {_, queue_index} -> {RPA, DPA, Insert(QPA)}; + {_, msg_store} -> {Insert(RPA), DPA, QPA} end, State #vqstate { ram_pending_ack = RPA1, disk_pending_ack = DPA1, + qi_pending_ack = QPA1, ack_in_counter = AckInCount + 1}. lookup_pending_ack(SeqId, #vqstate { ram_pending_ack = RPA, - disk_pending_ack = DPA }) -> + disk_pending_ack = DPA, + qi_pending_ack = QPA}) -> case gb_trees:lookup(SeqId, RPA) of {value, V} -> V; - none -> gb_trees:get(SeqId, DPA) + none -> case gb_trees:lookup(SeqId, DPA) of + {value, V} -> V; + none -> gb_trees:get(SeqId, QPA) + end end. %% First parameter = UpdatePersistentCount @@ -1417,27 +1439,38 @@ remove_pending_ack(true, SeqId, State) -> PCount1 = PCount - one_if(MsgStatus#msg_status.is_persistent), {MsgStatus, upd_bytes(0, -1, MsgStatus, State1 # vqstate{ persistent_count = PCount1 })}; -remove_pending_ack(false, SeqId, State = #vqstate { ram_pending_ack = RPA, - disk_pending_ack = DPA }) -> +remove_pending_ack(false, SeqId, State = #vqstate{ram_pending_ack = RPA, + disk_pending_ack = DPA, + qi_pending_ack = QPA}) -> case gb_trees:lookup(SeqId, RPA) of {value, V} -> RPA1 = gb_trees:delete(SeqId, RPA), {V, State #vqstate { ram_pending_ack = RPA1 }}; - none -> DPA1 = gb_trees:delete(SeqId, DPA), - {gb_trees:get(SeqId, DPA), - State #vqstate { disk_pending_ack = DPA1 }} + none -> case gb_trees:lookup(SeqId, DPA) of + {value, V} -> + DPA1 = gb_trees:delete(SeqId, DPA), + {V, State#vqstate{disk_pending_ack = DPA1}}; + none -> + QPA1 = gb_trees:delete(SeqId, QPA), + {gb_trees:get(SeqId, QPA), + State#vqstate{qi_pending_ack = QPA1}} + end end. purge_pending_ack(KeepPersistent, State = #vqstate { ram_pending_ack = RPA, disk_pending_ack = DPA, + qi_pending_ack = QPA, index_state = IndexState, msg_store_clients = MSCState }) -> F = fun (_SeqId, MsgStatus, Acc) -> accumulate_ack(MsgStatus, Acc) end, {IndexOnDiskSeqIds, MsgIdsByStore, _AllMsgIds} = rabbit_misc:gb_trees_fold( - F, rabbit_misc:gb_trees_fold(F, accumulate_ack_init(), RPA), DPA), + F, rabbit_misc:gb_trees_fold( + F, rabbit_misc:gb_trees_fold( + F, accumulate_ack_init(), RPA), DPA), QPA), State1 = State #vqstate { ram_pending_ack = gb_trees:empty(), - disk_pending_ack = gb_trees:empty() }, + disk_pending_ack = gb_trees:empty(), + qi_pending_ack = gb_trees:empty()}, case KeepPersistent of true -> case orddict:find(false, MsgIdsByStore) of @@ -1531,7 +1564,11 @@ publish_alpha(MsgStatus, State) -> publish_beta(MsgStatus, State) -> {MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State), - {m(trim_msg_status(MsgStatus1)), State1}. + MsgStatus2 = m(trim_msg_status(MsgStatus1)), + {MsgStatus2, case MsgStatus2#msg_status.msg of + undefined -> State1; + _ -> inc_ram_msg_count(State1) + end}. %% Rebuild queue, inserting sequence ids to maintain ordering queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, State) -> @@ -1567,8 +1604,12 @@ delta_merge(SeqIds, Delta, MsgIds, State) -> msg_from_pending_ack(SeqId, State0), {_MsgStatus, State2} = maybe_write_to_disk(true, true, MsgStatus, State1), + State3 = case MsgStatus#msg_status.msg of + undefined -> State2; + _ -> upd_ram_bytes(-1, MsgStatus, State2) + end, {expand_delta(SeqId, Delta0), [MsgId | MsgIds0], - upd_bytes(1, -1, MsgStatus, State2)} + upd_bytes(1, -1, MsgStatus, State3)} end, {Delta, MsgIds, State}, SeqIds). %% Mostly opposite of record_pending_ack/2 @@ -1598,6 +1639,9 @@ ram_ack_iterator(State) -> disk_ack_iterator(State) -> {ack, gb_trees:iterator(State#vqstate.disk_pending_ack)}. +qi_ack_iterator(State) -> + {ack, gb_trees:iterator(State#vqstate.qi_pending_ack)}. + msg_iterator(State) -> istate(start, State). istate(start, State) -> {q4, State#vqstate.q4, State}; @@ -1621,14 +1665,15 @@ next({delta, #delta{start_seq_id = SeqId, end_seq_id = SeqIdEnd} = Delta, State}, IndexState) -> SeqIdB = rabbit_queue_index:next_segment_boundary(SeqId), SeqId1 = lists:min([SeqIdB, SeqIdEnd]), - {List, _, _, IndexState1} = + {List, IndexState1} = rabbit_queue_index:read(SeqId, SeqId1, IndexState), next({delta, Delta#delta{start_seq_id = SeqId1}, List, State}, IndexState1); next({delta, Delta, [], State}, IndexState) -> next({delta, Delta, State}, IndexState); next({delta, Delta, [{_, SeqId, _, _, _} = M | Rest], State}, IndexState) -> case (gb_trees:is_defined(SeqId, State#vqstate.ram_pending_ack) orelse - gb_trees:is_defined(SeqId, State#vqstate.disk_pending_ack)) of + gb_trees:is_defined(SeqId, State#vqstate.disk_pending_ack) orelse + gb_trees:is_defined(SeqId, State#vqstate.qi_pending_ack)) of false -> Next = {delta, Delta, Rest, State}, {value, beta_msg_status(M), false, Next, IndexState}; true -> next({delta, Delta, Rest, State}, IndexState) @@ -1795,6 +1840,7 @@ maybe_deltas_to_betas(State = #vqstate { ram_bytes = RamBytes, ram_pending_ack = RPA, disk_pending_ack = DPA, + qi_pending_ack = QPA, transient_threshold = TransientThreshold }) -> #delta { start_seq_id = DeltaSeqId, count = DeltaCount, @@ -1802,13 +1848,14 @@ maybe_deltas_to_betas(State = #vqstate { DeltaSeqId1 = lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId), DeltaSeqIdEnd]), - {List, RamCountDelta, RamBytesDelta, IndexState1} = + {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState), - {Q3a, IndexState2} = betas_from_index_entries(List, TransientThreshold, - RPA, DPA, IndexState1), + {Q3a, RamCountsInc, RamBytesInc, IndexState2} = + betas_from_index_entries(List, TransientThreshold, + RPA, DPA, QPA, IndexState1), State1 = State #vqstate { index_state = IndexState2, - ram_msg_count = RamMsgCount + RamCountDelta, - ram_bytes = RamBytes + RamBytesDelta }, + ram_msg_count = RamMsgCount + RamCountsInc, + ram_bytes = RamBytes + RamBytesInc }, case ?QUEUE:len(Q3a) of 0 -> %% we ignored every message in the segment due to it being diff --git a/test/src/rabbit_tests.erl b/test/src/rabbit_tests.erl index e7da3acb..6d9606b5 100644 --- a/test/src/rabbit_tests.erl +++ b/test/src/rabbit_tests.erl @@ -2283,7 +2283,7 @@ test_queue_index_props() -> Props = #message_properties{expiry=12345, size = 10}, Qi1 = rabbit_queue_index:publish(MsgId, 1, Props, true, Qi0), {[{MsgId, 1, Props, _, _}], Qi2} = - queue_index_read(1, 2, Qi1), + rabbit_queue_index:read(1, 2, Qi1), Qi2 end), @@ -2306,7 +2306,7 @@ test_queue_index() -> {0, 0, Qi1} = rabbit_queue_index:bounds(Qi0), {Qi2, SeqIdsMsgIdsA} = queue_index_publish(SeqIdsA, false, Qi1), {0, SegmentSize, Qi3} = rabbit_queue_index:bounds(Qi2), - {ReadA, Qi4} = queue_index_read(0, SegmentSize, Qi3), + {ReadA, Qi4} = rabbit_queue_index:read(0, SegmentSize, Qi3), ok = verify_read_with_published(false, false, ReadA, lists:reverse(SeqIdsMsgIdsA)), %% should get length back as 0, as all the msgs were transient @@ -2314,7 +2314,7 @@ test_queue_index() -> {0, 0, Qi7} = rabbit_queue_index:bounds(Qi6), {Qi8, SeqIdsMsgIdsB} = queue_index_publish(SeqIdsB, true, Qi7), {0, TwoSegs, Qi9} = rabbit_queue_index:bounds(Qi8), - {ReadB, Qi10} = queue_index_read(0, SegmentSize, Qi9), + {ReadB, Qi10} = rabbit_queue_index:read(0, SegmentSize, Qi9), ok = verify_read_with_published(false, true, ReadB, lists:reverse(SeqIdsMsgIdsB)), %% should get length back as MostOfASegment @@ -2323,7 +2323,7 @@ test_queue_index() -> {LenB, BytesB, Qi12} = restart_test_queue(Qi10), {0, TwoSegs, Qi13} = rabbit_queue_index:bounds(Qi12), Qi14 = rabbit_queue_index:deliver(SeqIdsB, Qi13), - {ReadC, Qi15} = queue_index_read(0, SegmentSize, Qi14), + {ReadC, Qi15} = rabbit_queue_index:read(0, SegmentSize, Qi14), ok = verify_read_with_published(true, true, ReadC, lists:reverse(SeqIdsMsgIdsB)), Qi16 = rabbit_queue_index:ack(SeqIdsB, Qi15), @@ -2386,11 +2386,11 @@ test_queue_index() -> {Qi5, [Eight,Six|_]} = queue_index_publish([3,6,8], false, Qi4), Qi6 = rabbit_queue_index:deliver([2,3,5,6], Qi5), Qi7 = rabbit_queue_index:ack([1,2,3], Qi6), - {[], Qi8} = queue_index_read(0, 4, Qi7), - {ReadD, Qi9} = queue_index_read(4, 7, Qi8), + {[], Qi8} = rabbit_queue_index:read(0, 4, Qi7), + {ReadD, Qi9} = rabbit_queue_index:read(4, 7, Qi8), ok = verify_read_with_published(true, false, ReadD, [Four, Five, Six]), - {ReadE, Qi10} = queue_index_read(7, 9, Qi9), + {ReadE, Qi10} = rabbit_queue_index:read(7, 9, Qi9), ok = verify_read_with_published(false, false, ReadE, [Seven, Eight]), Qi10 @@ -2417,10 +2417,6 @@ test_queue_index() -> passed. -queue_index_read(Seq1, Seq2, State) -> - {Res, _, _, State1} = rabbit_queue_index:read(Seq1, Seq2, State), - {Res, State1}. - variable_queue_init(Q, Recover) -> rabbit_variable_queue:init( Q, case Recover of @@ -2800,8 +2796,6 @@ test_variable_queue_dynamic_duration_change(VQ0) -> VQ7 = lists:foldl( fun (Duration1, VQ4) -> {_Duration, VQ5} = rabbit_variable_queue:ram_duration(VQ4), - io:format("~p:~n~p~n", - [Duration1, variable_queue_status(VQ5)]), VQ6 = variable_queue_set_ram_duration_target( Duration1, VQ5), publish_fetch_and_ack(Churn, Len, VQ6) @@ -2862,7 +2856,6 @@ test_variable_queue_partial_segments_delta_thing(VQ0) -> check_variable_queue_status(VQ0, Props) -> VQ1 = variable_queue_wait_for_shuffling_end(VQ0), S = variable_queue_status(VQ1), - io:format("~p~n", [S]), assert_props(S, Props), VQ1. |