diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2010-07-22 09:36:55 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-07-22 09:36:55 +0100 |
commit | ac9cb2a32a2c3cf63e03a44416dd3a1d9c2969b7 (patch) | |
tree | 4d3cf7080d89f17874d8c20a57fd4c7138d08c3a | |
parent | 96479285ba6409174ae313ca7bdff491dbf78942 (diff) | |
download | rabbitmq-server-ac9cb2a32a2c3cf63e03a44416dd3a1d9c2969b7.tar.gz |
refactor: extract rates into separate part of vq state
-rw-r--r-- | src/rabbit_variable_queue.erl | 88 |
1 files changed, 45 insertions, 43 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 06094950..8bff66af 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -236,13 +236,11 @@ ram_index_count, out_counter, in_counter, - egress_rate, - avg_egress_rate, - ingress_rate, - avg_ingress_rate, - rate_timestamp + rates }). +-record(rates, { egress, ingress, avg_egress, avg_ingress, timestamp }). + -record(msg_status, { seq_id, guid, @@ -284,6 +282,12 @@ -type(seq_id() :: non_neg_integer()). -type(ack() :: seq_id() | 'blank_ack'). +-type(rates() :: #rates { egress :: {timestamp(), non_neg_integer()}, + ingress :: {timestamp(), non_neg_integer()}, + avg_egress :: float(), + avg_ingress :: float(), + timestamp :: timestamp() }). + -type(delta() :: #delta { start_seq_id :: non_neg_integer(), count :: non_neg_integer (), end_seq_id :: non_neg_integer() }). @@ -318,12 +322,7 @@ ram_index_count :: non_neg_integer(), out_counter :: non_neg_integer(), in_counter :: non_neg_integer(), - egress_rate :: {timestamp(), non_neg_integer()}, - avg_egress_rate :: float(), - ingress_rate :: {timestamp(), non_neg_integer()}, - avg_ingress_rate :: float(), - rate_timestamp :: timestamp() - }). + rates :: rates() }). -include("rabbit_backing_queue_spec.hrl"). @@ -425,12 +424,11 @@ init(QueueName, IsDurable, _Recover) -> ram_index_count = 0, out_counter = 0, in_counter = 0, - egress_rate = {Now, 0}, - avg_egress_rate = 0.0, - ingress_rate = {Now, DeltaCount1}, - avg_ingress_rate = 0.0, - rate_timestamp = Now - }, + rates = #rates { egress = {Now, 0}, + ingress = {Now, DeltaCount1}, + avg_egress = 0.0, + avg_ingress = 0.0, + timestamp = Now } }, a(maybe_deltas_to_betas(State)). terminate(State) -> @@ -648,8 +646,8 @@ is_empty(State) -> 0 == len(State). set_ram_duration_target(DurationTarget, State = #vqstate { - avg_egress_rate = AvgEgressRate, - avg_ingress_rate = AvgIngressRate, + rates = #rates { avg_egress = AvgEgressRate, + avg_ingress = AvgIngressRate }, target_ram_msg_count = TargetRamMsgCount }) -> Rate = AvgEgressRate + AvgIngressRate, TargetRamMsgCount1 = @@ -666,14 +664,15 @@ set_ram_duration_target(DurationTarget, false -> reduce_memory_use(State1) end). -ram_duration(State = #vqstate { egress_rate = Egress, - ingress_rate = Ingress, - rate_timestamp = Timestamp, - in_counter = InCount, - out_counter = OutCount, - ram_msg_count = RamMsgCount, - duration_target = DurationTarget, - ram_msg_count_prev = RamMsgCountPrev }) -> +ram_duration(State = #vqstate { + rates = #rates { egress = Egress, + ingress = Ingress, + timestamp = Timestamp } = Rates, + in_counter = InCount, + out_counter = OutCount, + ram_msg_count = RamMsgCount, + duration_target = DurationTarget, + ram_msg_count_prev = RamMsgCountPrev }) -> Now = now(), {AvgEgressRate, Egress1} = update_rate(Now, Timestamp, OutCount, Egress), {AvgIngressRate, Ingress1} = update_rate(Now, Timestamp, InCount, Ingress), @@ -685,16 +684,18 @@ ram_duration(State = #vqstate { egress_rate = Egress, (2 * (AvgEgressRate + AvgIngressRate)) end, - {Duration, set_ram_duration_target(DurationTarget, - State #vqstate { - egress_rate = Egress1, - avg_egress_rate = AvgEgressRate, - ingress_rate = Ingress1, - avg_ingress_rate = AvgIngressRate, - rate_timestamp = Now, - in_counter = 0, - out_counter = 0, - ram_msg_count_prev = RamMsgCount })}. + {Duration, set_ram_duration_target( + DurationTarget, + State #vqstate { + rates = Rates #rates { + egress = Egress1, + ingress = Ingress1, + avg_egress = AvgEgressRate, + avg_ingress = AvgIngressRate, + timestamp = Now }, + in_counter = 0, + out_counter = 0, + ram_msg_count_prev = RamMsgCount })}. needs_idle_timeout(State = #vqstate { on_sync = ?BLANK_SYNC }) -> {Res, _State} = reduce_memory_use(fun (_Quota, State1) -> State1 end, @@ -717,10 +718,11 @@ status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, target_ram_msg_count = TargetRamMsgCount, ram_msg_count = RamMsgCount, ram_index_count = RamIndexCount, - avg_egress_rate = AvgEgressRate, - avg_ingress_rate = AvgIngressRate, next_seq_id = NextSeqId, - persistent_count = PersistentCount }) -> + persistent_count = PersistentCount, + rates = #rates { + avg_egress = AvgEgressRate, + avg_ingress = AvgIngressRate } }) -> [ {q1 , queue:len(Q1)}, {q2 , bpqueue:len(Q2)}, {delta , Delta}, @@ -732,10 +734,10 @@ status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, {target_ram_msg_count , TargetRamMsgCount}, {ram_msg_count , RamMsgCount}, {ram_index_count , RamIndexCount}, - {avg_egress_rate , AvgEgressRate}, - {avg_ingress_rate , AvgIngressRate}, {next_seq_id , NextSeqId}, - {persistent_count , PersistentCount} ]. + {persistent_count , PersistentCount}, + {avg_egress_rate , AvgEgressRate}, + {avg_ingress_rate , AvgIngressRate} ]. %%---------------------------------------------------------------------------- %% Minor helpers |