diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-12-05 14:37:41 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-12-05 14:37:41 +0000 |
commit | 42fb44d8a398124333db478b23724b3a46686835 (patch) | |
tree | fe5f1ea4db36df3d203ce56519484f2bd9923e4d | |
parent | cb16018fc2376699a2e5b6a19417ad7157c0f310 (diff) | |
download | rabbitmq-server-42fb44d8a398124333db478b23724b3a46686835.tar.gz |
Fix bookkeeping of ram_msg_{count,bytes} - don't assume it decrements on alpha -> beta transition, allow it to decrement on beta -> delta as well if the message went to the QI. (We were already correct on delta -> beta, and there isn't a bulk beta -> alpha move.)
-rw-r--r-- | src/rabbit_variable_queue.erl | 51 |
1 files changed, 28 insertions, 23 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 9921417e..884342d8 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1157,11 +1157,10 @@ in_r(MsgStatus = #msg_status { msg = undefined }, true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) }; false -> {Msg, State1 = #vqstate { q4 = Q4a }} = read_msg(MsgStatus, State), - upd_ram_bytes( + upd_ram_bytes_count( 1, MsgStatus, - inc_ram_msg_count( - State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus#msg_status { - msg = Msg }, Q4a) })) + State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus#msg_status { + msg = Msg }, Q4a) }) end; in_r(MsgStatus, State = #vqstate { q4 = Q4 }) -> State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) }. @@ -1209,6 +1208,9 @@ upd_bytes0(SignReady, SignUnacked, MsgStatus = #msg_status{is_persistent = IsP}, unacked_bytes = UBytes + SignUnacked * S, persistent_bytes = PBytes + one_if(IsP) * S * SignTotal}. +upd_ram_bytes_count(Sign, MsgStatus, State = #vqstate{ram_msg_count = Count}) -> + upd_ram_bytes(Sign, MsgStatus, State#vqstate{ram_msg_count = Count + Sign}). + upd_ram_bytes(Sign, MsgStatus, State = #vqstate{ram_bytes = RamBytes}) -> State#vqstate{ram_bytes = RamBytes + Sign * msg_size(MsgStatus)}. @@ -1498,7 +1500,7 @@ msgs_and_indices_written_to_disk(Callback, MsgIdSet) -> publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) -> {Msg, State1} = read_msg(MsgStatus, State), {MsgStatus#msg_status { msg = Msg }, - upd_ram_bytes(1, MsgStatus, inc_ram_msg_count(State1))}; %% [1] + upd_ram_bytes_count(1, MsgStatus, State1)}; %% [1] publish_alpha(MsgStatus, State) -> {MsgStatus, inc_ram_msg_count(State)}. %% [1] We increase the ram_bytes here because we paged the message in @@ -1844,18 +1846,17 @@ push_alphas_to_betas(Generator, Consumer, Quota, Q, State) -> {empty, _Q} -> {Quota, State}; {{value, MsgStatus}, Qa} -> - {MsgStatus1, - State1 = #vqstate { ram_msg_count = RamMsgCount }} = + {MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State), MsgStatus2 = m(trim_msg_status(MsgStatus1)), - State2 = Consumer( - MsgStatus2, Qa, - upd_ram_bytes( - -1, MsgStatus2, - State1 #vqstate { - ram_msg_count = RamMsgCount - 1})), + State2 = case MsgStatus2#msg_status.msg of + undefined -> upd_ram_bytes_count( + -1, MsgStatus2, State1); + _ -> State1 + end, + State3 = Consumer(MsgStatus2, Qa, State2), push_alphas_to_betas(Generator, Consumer, Quota - 1, - Qa, State2) + Qa, State3) end end. @@ -1863,7 +1864,7 @@ push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2, delta = Delta, q3 = Q3, index_state = IndexState }) -> - PushState = {Quota, Delta, IndexState}, + PushState = {Quota, Delta, IndexState, State}, {Q3a, PushState1} = push_betas_to_deltas( fun ?QUEUE:out_r/1, fun rabbit_queue_index:next_segment_boundary/1, @@ -1872,11 +1873,11 @@ push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2, fun ?QUEUE:out/1, fun (Q2MinSeqId) -> Q2MinSeqId end, Q2, PushState1), - {_, Delta1, IndexState1} = PushState2, - State #vqstate { q2 = Q2a, - delta = Delta1, - q3 = Q3a, - index_state = IndexState1 }. + {_, Delta1, IndexState1, State1} = PushState2, + State1 #vqstate { q2 = Q2a, + delta = Delta1, + q3 = Q3a, + index_state = IndexState1 }. push_betas_to_deltas(Generator, LimitFun, Q, PushState) -> case ?QUEUE:is_empty(Q) of @@ -1893,10 +1894,10 @@ push_betas_to_deltas(Generator, LimitFun, Q, PushState) -> end. push_betas_to_deltas1(_Generator, _Limit, Q, - {0, _Delta, _IndexState} = PushState) -> + {0, _Delta, _IndexState, _State} = PushState) -> {Q, PushState}; push_betas_to_deltas1(Generator, Limit, Q, - {Quota, Delta, IndexState} = PushState) -> + {Quota, Delta, IndexState, State} = PushState) -> case Generator(Q) of {empty, _Q} -> {Q, PushState}; @@ -1906,9 +1907,13 @@ push_betas_to_deltas1(Generator, Limit, Q, {{value, MsgStatus = #msg_status { seq_id = SeqId }}, Qa} -> {#msg_status { index_on_disk = true }, IndexState1} = maybe_write_index_to_disk(true, MsgStatus, IndexState), + State1 = case MsgStatus#msg_status.msg of + undefined -> State; + _ -> upd_ram_bytes_count(-1, MsgStatus, State) + end, Delta1 = expand_delta(SeqId, Delta), push_betas_to_deltas1(Generator, Limit, Qa, - {Quota - 1, Delta1, IndexState1}) + {Quota - 1, Delta1, IndexState1, State1}) end. %%---------------------------------------------------------------------------- |