summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2014-02-25 11:45:01 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2014-02-25 11:45:01 +0000
commit68f38b3473415f7aa82c7261c503f879b5cbb492 (patch)
tree40668b6fb10983e5a776f7bb53c6fbcf4b199050
parentf9d0c870f8199c8be5ffdb300a439e7b91c237fd (diff)
downloadrabbitmq-server-bug26036.tar.gz
minimal changes necessary to assess potential performance gainbug26036
-rw-r--r--src/rabbit_variable_queue.erl53
1 files changed, 36 insertions, 17 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 73e9f6b5..5d5d5f5a 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -345,8 +345,8 @@
q3 :: ?QUEUE:?QUEUE(),
q4 :: ?QUEUE:?QUEUE(),
next_seq_id :: seq_id(),
- ram_pending_ack :: gb_tree(),
- disk_pending_ack :: gb_tree(),
+ ram_pending_ack :: ?QUEUE:?QUEUE(),
+ disk_pending_ack :: ?QUEUE:?QUEUE(),
index_state :: any(),
msg_store_clients :: 'undefined' | {{any(), binary()},
{any(), binary()}},
@@ -729,7 +729,7 @@ len(#vqstate { len = Len }) -> Len.
is_empty(State) -> 0 == len(State).
depth(State = #vqstate { ram_pending_ack = RPA, disk_pending_ack = DPA }) ->
- len(State) + gb_trees:size(RPA) + gb_trees:size(DPA).
+ len(State) + ?QUEUE:len(RPA) + ?QUEUE:len(DPA).
set_ram_duration_target(
DurationTarget, State = #vqstate {
@@ -798,7 +798,7 @@ ram_duration(State) ->
ram_ack_count_prev = RamAckCountPrev } =
update_rates(State),
- RamAckCount = gb_trees:size(RPA),
+ RamAckCount = ?QUEUE:len(RPA),
Duration = %% msgs+acks / (msgs+acks/sec) == sec
case (AvgEgressRate == 0 andalso AvgIngressRate == 0 andalso
@@ -860,10 +860,10 @@ status(#vqstate {
{q3 , ?QUEUE:len(Q3)},
{q4 , ?QUEUE:len(Q4)},
{len , Len},
- {pending_acks , gb_trees:size(RPA) + gb_trees:size(DPA)},
+ {pending_acks , ?QUEUE:len(RPA) + ?QUEUE:len(DPA)},
{target_ram_count , TargetRamCount},
{ram_msg_count , RamMsgCount},
- {ram_ack_count , gb_trees:size(RPA)},
+ {ram_ack_count , ?QUEUE:len(RPA)},
{next_seq_id , NextSeqId},
{persistent_count , PersistentCount},
{avg_ingress_rate , AvgIngressRate},
@@ -1071,8 +1071,8 @@ init(IsDurable, IndexState, DeltaCount, Terms,
q3 = ?QUEUE:new(),
q4 = ?QUEUE:new(),
next_seq_id = NextSeqId,
- ram_pending_ack = gb_trees:empty(),
- disk_pending_ack = gb_trees:empty(),
+ ram_pending_ack = ?QUEUE:new(),
+ disk_pending_ack = ?QUEUE:new(),
index_state = IndexState1,
msg_store_clients = {PersistentClient, TransientClient},
durable = IsDurable,
@@ -1290,10 +1290,13 @@ record_pending_ack(#msg_status { seq_id = SeqId, msg = Msg } = MsgStatus,
State = #vqstate { ram_pending_ack = RPA,
disk_pending_ack = DPA,
ack_in_counter = AckInCount}) ->
+ %% FIXME SeqId iss not necessarily higher than anything in
+ %% RPA/DPA, due to re-queueing, so we actually need to do a proper
+ %% 'insert' here.
{RPA1, DPA1} =
case Msg of
- undefined -> {RPA, gb_trees:insert(SeqId, MsgStatus, DPA)};
- _ -> {gb_trees:insert(SeqId, MsgStatus, RPA), DPA}
+ undefined -> {RPA, ?QUEUE:in({SeqId, MsgStatus}, DPA)};
+ _ -> {?QUEUE:in({SeqId, MsgStatus}, RPA), DPA}
end,
State #vqstate { ram_pending_ack = RPA1,
disk_pending_ack = DPA1,
@@ -1301,6 +1304,8 @@ record_pending_ack(#msg_status { seq_id = SeqId, msg = Msg } = MsgStatus,
lookup_pending_ack(SeqId, #vqstate { ram_pending_ack = RPA,
disk_pending_ack = DPA }) ->
+ %% FIXME this is only used in ackfold; and when RPA/DPA are queues
+ %% that context is important for optimisation
case gb_trees:lookup(SeqId, RPA) of
{value, V} -> V;
none -> gb_trees:get(SeqId, DPA)
@@ -1308,19 +1313,31 @@ lookup_pending_ack(SeqId, #vqstate { ram_pending_ack = RPA,
remove_pending_ack(SeqId, State = #vqstate { ram_pending_ack = RPA,
disk_pending_ack = DPA }) ->
- case gb_trees:lookup(SeqId, RPA) of
- {value, V} -> RPA1 = gb_trees:delete(SeqId, RPA),
- {V, State #vqstate { ram_pending_ack = RPA1 }};
- none -> DPA1 = gb_trees:delete(SeqId, DPA),
- {gb_trees:get(SeqId, DPA),
- State #vqstate { disk_pending_ack = DPA1 }}
+ case remove_pending_ack(SeqId, RPA, []) of
+ {V, RPA1} -> {V, State #vqstate { ram_pending_ack = RPA1 }};
+ not_found -> {V, DPA1} = remove_pending_ack(SeqId, DPA, []),
+ {V, State #vqstate { disk_pending_ack = DPA1 }}
end.
+remove_pending_ack(SeqId, Q, Prefix) ->
+ case ?QUEUE:out(Q) of
+ {{value, {SeqId1, MsgStatus}}, Q1} when SeqId =:= SeqId1 ->
+ {MsgStatus, prefix_queue(Q1, Prefix)};
+ {{value, {SeqId1, _MsgStatus} = V}, Q1} when SeqId < SeqId1 ->
+ remove_pending_ack(SeqId, Q1, [V | Prefix]);
+ _ ->
+ not_found
+ end.
+
+prefix_queue(Q, []) -> Q;
+prefix_queue(Q, P) -> ?QUEUE:join(?QUEUE:from_list(lists:reverse(P)), Q).
+
purge_pending_ack(KeepPersistent,
State = #vqstate { ram_pending_ack = RPA,
disk_pending_ack = DPA,
index_state = IndexState,
msg_store_clients = MSCState }) ->
+ %% FIXME
F = fun (_SeqId, MsgStatus, Acc) -> accumulate_ack(MsgStatus, Acc) end,
{IndexOnDiskSeqIds, MsgIdsByStore, _AllMsgIds} =
rabbit_misc:gb_trees_fold(
@@ -1582,7 +1599,7 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun,
ack_out = AvgAckEgress } }) ->
{Reduce, State1 = #vqstate { q2 = Q2, q3 = Q3 }} =
- case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of
+ case chunk_size(RamMsgCount + ?QUEUE:len(RPA), TargetRamCount) of
0 -> {false, State};
%% Reduce memory of pending acks and alphas. The order is
%% determined based on which is growing faster. Whichever
@@ -1605,6 +1622,7 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun,
_ -> {Reduce, State1}
end.
+%% FIXME
limit_ram_acks(0, State) ->
{0, State};
limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA,
@@ -1682,6 +1700,7 @@ fetch_from_q3(State = #vqstate { q1 = Q1,
{loaded, {MsgStatus, State2}}
end.
+%% FIXME
maybe_deltas_to_betas(State = #vqstate { delta = ?BLANK_DELTA_PATTERN(X) }) ->
State;
maybe_deltas_to_betas(State = #vqstate {