summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-12-05 14:37:41 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-12-05 14:37:41 +0000
commit42fb44d8a398124333db478b23724b3a46686835 (patch)
treefe5f1ea4db36df3d203ce56519484f2bd9923e4d
parentcb16018fc2376699a2e5b6a19417ad7157c0f310 (diff)
downloadrabbitmq-server-42fb44d8a398124333db478b23724b3a46686835.tar.gz
Fix bookkeeping of ram_msg_{count,bytes} - don't assume it decrements on alpha -> beta transition, allow it to decrement on beta -> delta as well if the message went to the QI. (We were already correct on delta -> beta, and there isn't a bulk beta -> alpha move.)
-rw-r--r--src/rabbit_variable_queue.erl51
1 files changed, 28 insertions, 23 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 9921417e..884342d8 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -1157,11 +1157,10 @@ in_r(MsgStatus = #msg_status { msg = undefined },
true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) };
false -> {Msg, State1 = #vqstate { q4 = Q4a }} =
read_msg(MsgStatus, State),
- upd_ram_bytes(
+ upd_ram_bytes_count(
1, MsgStatus,
- inc_ram_msg_count(
- State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus#msg_status {
- msg = Msg }, Q4a) }))
+ State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus#msg_status {
+ msg = Msg }, Q4a) })
end;
in_r(MsgStatus, State = #vqstate { q4 = Q4 }) ->
State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) }.
@@ -1209,6 +1208,9 @@ upd_bytes0(SignReady, SignUnacked, MsgStatus = #msg_status{is_persistent = IsP},
unacked_bytes = UBytes + SignUnacked * S,
persistent_bytes = PBytes + one_if(IsP) * S * SignTotal}.
+upd_ram_bytes_count(Sign, MsgStatus, State = #vqstate{ram_msg_count = Count}) ->
+ upd_ram_bytes(Sign, MsgStatus, State#vqstate{ram_msg_count = Count + Sign}).
+
upd_ram_bytes(Sign, MsgStatus, State = #vqstate{ram_bytes = RamBytes}) ->
State#vqstate{ram_bytes = RamBytes + Sign * msg_size(MsgStatus)}.
@@ -1498,7 +1500,7 @@ msgs_and_indices_written_to_disk(Callback, MsgIdSet) ->
publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) ->
{Msg, State1} = read_msg(MsgStatus, State),
{MsgStatus#msg_status { msg = Msg },
- upd_ram_bytes(1, MsgStatus, inc_ram_msg_count(State1))}; %% [1]
+ upd_ram_bytes_count(1, MsgStatus, State1)}; %% [1]
publish_alpha(MsgStatus, State) ->
{MsgStatus, inc_ram_msg_count(State)}.
%% [1] We increase the ram_bytes here because we paged the message in
@@ -1844,18 +1846,17 @@ push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
{empty, _Q} ->
{Quota, State};
{{value, MsgStatus}, Qa} ->
- {MsgStatus1,
- State1 = #vqstate { ram_msg_count = RamMsgCount }} =
+ {MsgStatus1, State1} =
maybe_write_to_disk(true, false, MsgStatus, State),
MsgStatus2 = m(trim_msg_status(MsgStatus1)),
- State2 = Consumer(
- MsgStatus2, Qa,
- upd_ram_bytes(
- -1, MsgStatus2,
- State1 #vqstate {
- ram_msg_count = RamMsgCount - 1})),
+ State2 = case MsgStatus2#msg_status.msg of
+ undefined -> upd_ram_bytes_count(
+ -1, MsgStatus2, State1);
+ _ -> State1
+ end,
+ State3 = Consumer(MsgStatus2, Qa, State2),
push_alphas_to_betas(Generator, Consumer, Quota - 1,
- Qa, State2)
+ Qa, State3)
end
end.
@@ -1863,7 +1864,7 @@ push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2,
delta = Delta,
q3 = Q3,
index_state = IndexState }) ->
- PushState = {Quota, Delta, IndexState},
+ PushState = {Quota, Delta, IndexState, State},
{Q3a, PushState1} = push_betas_to_deltas(
fun ?QUEUE:out_r/1,
fun rabbit_queue_index:next_segment_boundary/1,
@@ -1872,11 +1873,11 @@ push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2,
fun ?QUEUE:out/1,
fun (Q2MinSeqId) -> Q2MinSeqId end,
Q2, PushState1),
- {_, Delta1, IndexState1} = PushState2,
- State #vqstate { q2 = Q2a,
- delta = Delta1,
- q3 = Q3a,
- index_state = IndexState1 }.
+ {_, Delta1, IndexState1, State1} = PushState2,
+ State1 #vqstate { q2 = Q2a,
+ delta = Delta1,
+ q3 = Q3a,
+ index_state = IndexState1 }.
push_betas_to_deltas(Generator, LimitFun, Q, PushState) ->
case ?QUEUE:is_empty(Q) of
@@ -1893,10 +1894,10 @@ push_betas_to_deltas(Generator, LimitFun, Q, PushState) ->
end.
push_betas_to_deltas1(_Generator, _Limit, Q,
- {0, _Delta, _IndexState} = PushState) ->
+ {0, _Delta, _IndexState, _State} = PushState) ->
{Q, PushState};
push_betas_to_deltas1(Generator, Limit, Q,
- {Quota, Delta, IndexState} = PushState) ->
+ {Quota, Delta, IndexState, State} = PushState) ->
case Generator(Q) of
{empty, _Q} ->
{Q, PushState};
@@ -1906,9 +1907,13 @@ push_betas_to_deltas1(Generator, Limit, Q,
{{value, MsgStatus = #msg_status { seq_id = SeqId }}, Qa} ->
{#msg_status { index_on_disk = true }, IndexState1} =
maybe_write_index_to_disk(true, MsgStatus, IndexState),
+ State1 = case MsgStatus#msg_status.msg of
+ undefined -> State;
+ _ -> upd_ram_bytes_count(-1, MsgStatus, State)
+ end,
Delta1 = expand_delta(SeqId, Delta),
push_betas_to_deltas1(Generator, Limit, Qa,
- {Quota - 1, Delta1, IndexState1})
+ {Quota - 1, Delta1, IndexState1, State1})
end.
%%----------------------------------------------------------------------------