summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2010-07-22 09:36:55 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2010-07-22 09:36:55 +0100
commitac9cb2a32a2c3cf63e03a44416dd3a1d9c2969b7 (patch)
tree4d3cf7080d89f17874d8c20a57fd4c7138d08c3a
parent96479285ba6409174ae313ca7bdff491dbf78942 (diff)
downloadrabbitmq-server-ac9cb2a32a2c3cf63e03a44416dd3a1d9c2969b7.tar.gz
refactor: extract rates into separate part of vq state
-rw-r--r--src/rabbit_variable_queue.erl88
1 files changed, 45 insertions, 43 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 06094950..8bff66af 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -236,13 +236,11 @@
ram_index_count,
out_counter,
in_counter,
- egress_rate,
- avg_egress_rate,
- ingress_rate,
- avg_ingress_rate,
- rate_timestamp
+ rates
}).
+-record(rates, { egress, ingress, avg_egress, avg_ingress, timestamp }).
+
-record(msg_status,
{ seq_id,
guid,
@@ -284,6 +282,12 @@
-type(seq_id() :: non_neg_integer()).
-type(ack() :: seq_id() | 'blank_ack').
+-type(rates() :: #rates { egress :: {timestamp(), non_neg_integer()},
+ ingress :: {timestamp(), non_neg_integer()},
+ avg_egress :: float(),
+ avg_ingress :: float(),
+ timestamp :: timestamp() }).
+
-type(delta() :: #delta { start_seq_id :: non_neg_integer(),
count :: non_neg_integer (),
end_seq_id :: non_neg_integer() }).
@@ -318,12 +322,7 @@
ram_index_count :: non_neg_integer(),
out_counter :: non_neg_integer(),
in_counter :: non_neg_integer(),
- egress_rate :: {timestamp(), non_neg_integer()},
- avg_egress_rate :: float(),
- ingress_rate :: {timestamp(), non_neg_integer()},
- avg_ingress_rate :: float(),
- rate_timestamp :: timestamp()
- }).
+ rates :: rates() }).
-include("rabbit_backing_queue_spec.hrl").
@@ -425,12 +424,11 @@ init(QueueName, IsDurable, _Recover) ->
ram_index_count = 0,
out_counter = 0,
in_counter = 0,
- egress_rate = {Now, 0},
- avg_egress_rate = 0.0,
- ingress_rate = {Now, DeltaCount1},
- avg_ingress_rate = 0.0,
- rate_timestamp = Now
- },
+ rates = #rates { egress = {Now, 0},
+ ingress = {Now, DeltaCount1},
+ avg_egress = 0.0,
+ avg_ingress = 0.0,
+ timestamp = Now } },
a(maybe_deltas_to_betas(State)).
terminate(State) ->
@@ -648,8 +646,8 @@ is_empty(State) -> 0 == len(State).
set_ram_duration_target(DurationTarget,
State = #vqstate {
- avg_egress_rate = AvgEgressRate,
- avg_ingress_rate = AvgIngressRate,
+ rates = #rates { avg_egress = AvgEgressRate,
+ avg_ingress = AvgIngressRate },
target_ram_msg_count = TargetRamMsgCount }) ->
Rate = AvgEgressRate + AvgIngressRate,
TargetRamMsgCount1 =
@@ -666,14 +664,15 @@ set_ram_duration_target(DurationTarget,
false -> reduce_memory_use(State1)
end).
-ram_duration(State = #vqstate { egress_rate = Egress,
- ingress_rate = Ingress,
- rate_timestamp = Timestamp,
- in_counter = InCount,
- out_counter = OutCount,
- ram_msg_count = RamMsgCount,
- duration_target = DurationTarget,
- ram_msg_count_prev = RamMsgCountPrev }) ->
+ram_duration(State = #vqstate {
+ rates = #rates { egress = Egress,
+ ingress = Ingress,
+ timestamp = Timestamp } = Rates,
+ in_counter = InCount,
+ out_counter = OutCount,
+ ram_msg_count = RamMsgCount,
+ duration_target = DurationTarget,
+ ram_msg_count_prev = RamMsgCountPrev }) ->
Now = now(),
{AvgEgressRate, Egress1} = update_rate(Now, Timestamp, OutCount, Egress),
{AvgIngressRate, Ingress1} = update_rate(Now, Timestamp, InCount, Ingress),
@@ -685,16 +684,18 @@ ram_duration(State = #vqstate { egress_rate = Egress,
(2 * (AvgEgressRate + AvgIngressRate))
end,
- {Duration, set_ram_duration_target(DurationTarget,
- State #vqstate {
- egress_rate = Egress1,
- avg_egress_rate = AvgEgressRate,
- ingress_rate = Ingress1,
- avg_ingress_rate = AvgIngressRate,
- rate_timestamp = Now,
- in_counter = 0,
- out_counter = 0,
- ram_msg_count_prev = RamMsgCount })}.
+ {Duration, set_ram_duration_target(
+ DurationTarget,
+ State #vqstate {
+ rates = Rates #rates {
+ egress = Egress1,
+ ingress = Ingress1,
+ avg_egress = AvgEgressRate,
+ avg_ingress = AvgIngressRate,
+ timestamp = Now },
+ in_counter = 0,
+ out_counter = 0,
+ ram_msg_count_prev = RamMsgCount })}.
needs_idle_timeout(State = #vqstate { on_sync = ?BLANK_SYNC }) ->
{Res, _State} = reduce_memory_use(fun (_Quota, State1) -> State1 end,
@@ -717,10 +718,11 @@ status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
target_ram_msg_count = TargetRamMsgCount,
ram_msg_count = RamMsgCount,
ram_index_count = RamIndexCount,
- avg_egress_rate = AvgEgressRate,
- avg_ingress_rate = AvgIngressRate,
next_seq_id = NextSeqId,
- persistent_count = PersistentCount }) ->
+ persistent_count = PersistentCount,
+ rates = #rates {
+ avg_egress = AvgEgressRate,
+ avg_ingress = AvgIngressRate } }) ->
[ {q1 , queue:len(Q1)},
{q2 , bpqueue:len(Q2)},
{delta , Delta},
@@ -732,10 +734,10 @@ status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
{target_ram_msg_count , TargetRamMsgCount},
{ram_msg_count , RamMsgCount},
{ram_index_count , RamIndexCount},
- {avg_egress_rate , AvgEgressRate},
- {avg_ingress_rate , AvgIngressRate},
{next_seq_id , NextSeqId},
- {persistent_count , PersistentCount} ].
+ {persistent_count , PersistentCount},
+ {avg_egress_rate , AvgEgressRate},
+ {avg_ingress_rate , AvgIngressRate} ].
%%----------------------------------------------------------------------------
%% Minor helpers