From 46fd22bc2a1eb42cd26353b0bea2862c2b5ec5ea Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Fri, 30 Sep 2011 17:52:01 +0100 Subject: Avoid delta's low end extending beyond the low end of q3 --- src/rabbit_variable_queue.erl | 37 +++++++++++++++++++++---------------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 0c3ac4f7..a9b78be9 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -912,6 +912,8 @@ combine_deltas(#delta { start_seq_id = StartLow, andalso ((StartLow + Count) =< EndHigh), #delta { start_seq_id = StartLow, count = Count, end_seq_id = EndHigh }. +combine_deltas_flip(A, B) -> combine_deltas(B, A). + beta_fold(Fun, Init, Q) -> ?QUEUE:queue_fold(Fun, Init, Q). update_rate(Now, Then, Count, {OThen, OCount}) -> @@ -1680,11 +1682,11 @@ push_betas_to_deltas(Quota, ram_index_count = RamIndexCount }) -> {Quota1, Delta2, Q2a, RamIndexCount2, IndexState2} = push_betas_to_deltas(fun (Q2MinSeqId) -> Q2MinSeqId end, - fun ?QUEUE:out/1, Quota, Q2, + fun ?QUEUE:out/1, fun combine_deltas/2, Quota, Q2, RamIndexCount, IndexState), {_Quota2, Delta3, Q3a, RamIndexCount3, IndexState3} = push_betas_to_deltas(fun rabbit_queue_index:next_segment_boundary/1, - fun ?QUEUE:out_r/1, Quota1, Q3, + fun ?QUEUE:out_r/1, fun combine_deltas_flip/2, Quota1, Q3, RamIndexCount2, IndexState2), Delta4 = combine_deltas(Delta3, combine_deltas(Delta, Delta2)), State #vqstate { q2 = Q2a, @@ -1693,7 +1695,7 @@ push_betas_to_deltas(Quota, index_state = IndexState3, ram_index_count = RamIndexCount3 }. -push_betas_to_deltas(LimitFun, Generator, Quota, Q, RamIndexCount, IndexState) -> +push_betas_to_deltas(LimitFun, Generator, Combine, Quota, Q, RamIndexCount, IndexState) -> case ?QUEUE:out(Q) of {empty, _Q} -> {Quota, ?BLANK_DELTA, Q, RamIndexCount, IndexState}; @@ -1702,26 +1704,26 @@ push_betas_to_deltas(LimitFun, Generator, Quota, Q, RamIndexCount, IndexState) - Limit = LimitFun(MinSeqId), case MaxSeqId < Limit of true -> {Quota, ?BLANK_DELTA, Q, RamIndexCount, IndexState}; - false -> {Len, Qc, RamIndexCount1, IndexState1} = - push_betas_to_deltas(Generator, Limit, Quota, Q, 0, - RamIndexCount, IndexState), - {Quota - Len, #delta { start_seq_id = Limit, - count = Len, - end_seq_id = MaxSeqId + 1 }, + false -> {Delta, Qc, RamIndexCount1, IndexState1} = + push_betas_to_deltas( + Generator, Combine, Limit, Quota, ?BLANK_DELTA, + Q, RamIndexCount, IndexState), + {Quota - Delta #delta.count, Delta, Qc, RamIndexCount1, IndexState1} end end. -push_betas_to_deltas(_Generator, _Limit, 0, Q, Count, RamIndexCount, IndexState) -> - {Count, Q, RamIndexCount, IndexState}; -push_betas_to_deltas(Generator, Limit, Quota, Q, Count, RamIndexCount, IndexState) -> +push_betas_to_deltas(_Generator, _Combine, _Limit, 0, Delta, Q, RamIndexCount, IndexState) -> + {Delta, Q, RamIndexCount, IndexState}; +push_betas_to_deltas(Generator, Combine, Limit, Quota, Delta, Q, RamIndexCount, IndexState) -> case Generator(Q) of {empty, _Q} -> - {Count, Q, RamIndexCount, IndexState}; + {Delta, Q, RamIndexCount, IndexState}; {{value, #msg_status { seq_id = SeqId }}, _Qa} when SeqId < Limit -> - {Count, Q, RamIndexCount, IndexState}; - {{value, MsgStatus = #msg_status { index_on_disk = IndexOnDisk }}, Qa} -> + {Delta, Q, RamIndexCount, IndexState}; + {{value, MsgStatus = #msg_status { index_on_disk = IndexOnDisk, + seq_id = SeqId }}, Qa} -> {Quota1, RamIndexCount1, IndexState1} = case IndexOnDisk of true -> {Quota, RamIndexCount, IndexState}; @@ -1731,8 +1733,11 @@ push_betas_to_deltas(Generator, Limit, Quota, Q, Count, RamIndexCount, IndexStat IndexState), {Quota - 1, RamIndexCount - 1, IndexState2} end, + Delta1 = Combine(Delta, #delta { start_seq_id = SeqId, + count = 1, + end_seq_id = SeqId + 1 }), push_betas_to_deltas( - Generator, Limit, Quota1, Qa, Count + 1, RamIndexCount1, IndexState1) + Generator, Combine, Limit, Quota1, Delta1, Qa, RamIndexCount1, IndexState1) end. %%---------------------------------------------------------------------------- -- cgit v1.2.1