summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-09-30 17:52:01 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2011-09-30 17:52:01 +0100
commit46fd22bc2a1eb42cd26353b0bea2862c2b5ec5ea (patch)
tree0d1e62c6c7c275cbddfa153a9f0c3066debb92f6
parent62a60bc7a134eeb4f0568a75cc8ae9a39d353ca1 (diff)
downloadrabbitmq-server-46fd22bc2a1eb42cd26353b0bea2862c2b5ec5ea.tar.gz
Avoid delta's low end extending beyond the low end of q3
-rw-r--r--src/rabbit_variable_queue.erl37
1 files changed, 21 insertions, 16 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 0c3ac4f7..a9b78be9 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -912,6 +912,8 @@ combine_deltas(#delta { start_seq_id = StartLow,
andalso ((StartLow + Count) =< EndHigh),
#delta { start_seq_id = StartLow, count = Count, end_seq_id = EndHigh }.
+combine_deltas_flip(A, B) -> combine_deltas(B, A).
+
beta_fold(Fun, Init, Q) -> ?QUEUE:queue_fold(Fun, Init, Q).
update_rate(Now, Then, Count, {OThen, OCount}) ->
@@ -1680,11 +1682,11 @@ push_betas_to_deltas(Quota,
ram_index_count = RamIndexCount }) ->
{Quota1, Delta2, Q2a, RamIndexCount2, IndexState2} =
push_betas_to_deltas(fun (Q2MinSeqId) -> Q2MinSeqId end,
- fun ?QUEUE:out/1, Quota, Q2,
+ fun ?QUEUE:out/1, fun combine_deltas/2, Quota, Q2,
RamIndexCount, IndexState),
{_Quota2, Delta3, Q3a, RamIndexCount3, IndexState3} =
push_betas_to_deltas(fun rabbit_queue_index:next_segment_boundary/1,
- fun ?QUEUE:out_r/1, Quota1, Q3,
+ fun ?QUEUE:out_r/1, fun combine_deltas_flip/2, Quota1, Q3,
RamIndexCount2, IndexState2),
Delta4 = combine_deltas(Delta3, combine_deltas(Delta, Delta2)),
State #vqstate { q2 = Q2a,
@@ -1693,7 +1695,7 @@ push_betas_to_deltas(Quota,
index_state = IndexState3,
ram_index_count = RamIndexCount3 }.
-push_betas_to_deltas(LimitFun, Generator, Quota, Q, RamIndexCount, IndexState) ->
+push_betas_to_deltas(LimitFun, Generator, Combine, Quota, Q, RamIndexCount, IndexState) ->
case ?QUEUE:out(Q) of
{empty, _Q} ->
{Quota, ?BLANK_DELTA, Q, RamIndexCount, IndexState};
@@ -1702,26 +1704,26 @@ push_betas_to_deltas(LimitFun, Generator, Quota, Q, RamIndexCount, IndexState) -
Limit = LimitFun(MinSeqId),
case MaxSeqId < Limit of
true -> {Quota, ?BLANK_DELTA, Q, RamIndexCount, IndexState};
- false -> {Len, Qc, RamIndexCount1, IndexState1} =
- push_betas_to_deltas(Generator, Limit, Quota, Q, 0,
- RamIndexCount, IndexState),
- {Quota - Len, #delta { start_seq_id = Limit,
- count = Len,
- end_seq_id = MaxSeqId + 1 },
+ false -> {Delta, Qc, RamIndexCount1, IndexState1} =
+ push_betas_to_deltas(
+ Generator, Combine, Limit, Quota, ?BLANK_DELTA,
+ Q, RamIndexCount, IndexState),
+ {Quota - Delta #delta.count, Delta,
Qc, RamIndexCount1, IndexState1}
end
end.
-push_betas_to_deltas(_Generator, _Limit, 0, Q, Count, RamIndexCount, IndexState) ->
- {Count, Q, RamIndexCount, IndexState};
-push_betas_to_deltas(Generator, Limit, Quota, Q, Count, RamIndexCount, IndexState) ->
+push_betas_to_deltas(_Generator, _Combine, _Limit, 0, Delta, Q, RamIndexCount, IndexState) ->
+ {Delta, Q, RamIndexCount, IndexState};
+push_betas_to_deltas(Generator, Combine, Limit, Quota, Delta, Q, RamIndexCount, IndexState) ->
case Generator(Q) of
{empty, _Q} ->
- {Count, Q, RamIndexCount, IndexState};
+ {Delta, Q, RamIndexCount, IndexState};
{{value, #msg_status { seq_id = SeqId }}, _Qa}
when SeqId < Limit ->
- {Count, Q, RamIndexCount, IndexState};
- {{value, MsgStatus = #msg_status { index_on_disk = IndexOnDisk }}, Qa} ->
+ {Delta, Q, RamIndexCount, IndexState};
+ {{value, MsgStatus = #msg_status { index_on_disk = IndexOnDisk,
+ seq_id = SeqId }}, Qa} ->
{Quota1, RamIndexCount1, IndexState1} =
case IndexOnDisk of
true -> {Quota, RamIndexCount, IndexState};
@@ -1731,8 +1733,11 @@ push_betas_to_deltas(Generator, Limit, Quota, Q, Count, RamIndexCount, IndexStat
IndexState),
{Quota - 1, RamIndexCount - 1, IndexState2}
end,
+ Delta1 = Combine(Delta, #delta { start_seq_id = SeqId,
+ count = 1,
+ end_seq_id = SeqId + 1 }),
push_betas_to_deltas(
- Generator, Limit, Quota1, Qa, Count + 1, RamIndexCount1, IndexState1)
+ Generator, Combine, Limit, Quota1, Delta1, Qa, RamIndexCount1, IndexState1)
end.
%%----------------------------------------------------------------------------