diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2014-02-25 11:45:01 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2014-02-25 11:45:01 +0000 |
commit | 68f38b3473415f7aa82c7261c503f879b5cbb492 (patch) | |
tree | 40668b6fb10983e5a776f7bb53c6fbcf4b199050 | |
parent | f9d0c870f8199c8be5ffdb300a439e7b91c237fd (diff) | |
download | rabbitmq-server-bug26036.tar.gz |
minimal changes necessary to assess potential performance gainbug26036
-rw-r--r-- | src/rabbit_variable_queue.erl | 53 |
1 files changed, 36 insertions, 17 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 73e9f6b5..5d5d5f5a 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -345,8 +345,8 @@ q3 :: ?QUEUE:?QUEUE(), q4 :: ?QUEUE:?QUEUE(), next_seq_id :: seq_id(), - ram_pending_ack :: gb_tree(), - disk_pending_ack :: gb_tree(), + ram_pending_ack :: ?QUEUE:?QUEUE(), + disk_pending_ack :: ?QUEUE:?QUEUE(), index_state :: any(), msg_store_clients :: 'undefined' | {{any(), binary()}, {any(), binary()}}, @@ -729,7 +729,7 @@ len(#vqstate { len = Len }) -> Len. is_empty(State) -> 0 == len(State). depth(State = #vqstate { ram_pending_ack = RPA, disk_pending_ack = DPA }) -> - len(State) + gb_trees:size(RPA) + gb_trees:size(DPA). + len(State) + ?QUEUE:len(RPA) + ?QUEUE:len(DPA). set_ram_duration_target( DurationTarget, State = #vqstate { @@ -798,7 +798,7 @@ ram_duration(State) -> ram_ack_count_prev = RamAckCountPrev } = update_rates(State), - RamAckCount = gb_trees:size(RPA), + RamAckCount = ?QUEUE:len(RPA), Duration = %% msgs+acks / (msgs+acks/sec) == sec case (AvgEgressRate == 0 andalso AvgIngressRate == 0 andalso @@ -860,10 +860,10 @@ status(#vqstate { {q3 , ?QUEUE:len(Q3)}, {q4 , ?QUEUE:len(Q4)}, {len , Len}, - {pending_acks , gb_trees:size(RPA) + gb_trees:size(DPA)}, + {pending_acks , ?QUEUE:len(RPA) + ?QUEUE:len(DPA)}, {target_ram_count , TargetRamCount}, {ram_msg_count , RamMsgCount}, - {ram_ack_count , gb_trees:size(RPA)}, + {ram_ack_count , ?QUEUE:len(RPA)}, {next_seq_id , NextSeqId}, {persistent_count , PersistentCount}, {avg_ingress_rate , AvgIngressRate}, @@ -1071,8 +1071,8 @@ init(IsDurable, IndexState, DeltaCount, Terms, q3 = ?QUEUE:new(), q4 = ?QUEUE:new(), next_seq_id = NextSeqId, - ram_pending_ack = gb_trees:empty(), - disk_pending_ack = gb_trees:empty(), + ram_pending_ack = ?QUEUE:new(), + disk_pending_ack = ?QUEUE:new(), index_state = IndexState1, msg_store_clients = {PersistentClient, TransientClient}, durable = IsDurable, @@ -1290,10 +1290,13 @@ record_pending_ack(#msg_status { seq_id = SeqId, msg = Msg } = MsgStatus, State = #vqstate { ram_pending_ack = RPA, disk_pending_ack = DPA, ack_in_counter = AckInCount}) -> + %% FIXME SeqId iss not necessarily higher than anything in + %% RPA/DPA, due to re-queueing, so we actually need to do a proper + %% 'insert' here. {RPA1, DPA1} = case Msg of - undefined -> {RPA, gb_trees:insert(SeqId, MsgStatus, DPA)}; - _ -> {gb_trees:insert(SeqId, MsgStatus, RPA), DPA} + undefined -> {RPA, ?QUEUE:in({SeqId, MsgStatus}, DPA)}; + _ -> {?QUEUE:in({SeqId, MsgStatus}, RPA), DPA} end, State #vqstate { ram_pending_ack = RPA1, disk_pending_ack = DPA1, @@ -1301,6 +1304,8 @@ record_pending_ack(#msg_status { seq_id = SeqId, msg = Msg } = MsgStatus, lookup_pending_ack(SeqId, #vqstate { ram_pending_ack = RPA, disk_pending_ack = DPA }) -> + %% FIXME this is only used in ackfold; and when RPA/DPA are queues + %% that context is important for optimisation case gb_trees:lookup(SeqId, RPA) of {value, V} -> V; none -> gb_trees:get(SeqId, DPA) @@ -1308,19 +1313,31 @@ lookup_pending_ack(SeqId, #vqstate { ram_pending_ack = RPA, remove_pending_ack(SeqId, State = #vqstate { ram_pending_ack = RPA, disk_pending_ack = DPA }) -> - case gb_trees:lookup(SeqId, RPA) of - {value, V} -> RPA1 = gb_trees:delete(SeqId, RPA), - {V, State #vqstate { ram_pending_ack = RPA1 }}; - none -> DPA1 = gb_trees:delete(SeqId, DPA), - {gb_trees:get(SeqId, DPA), - State #vqstate { disk_pending_ack = DPA1 }} + case remove_pending_ack(SeqId, RPA, []) of + {V, RPA1} -> {V, State #vqstate { ram_pending_ack = RPA1 }}; + not_found -> {V, DPA1} = remove_pending_ack(SeqId, DPA, []), + {V, State #vqstate { disk_pending_ack = DPA1 }} end. +remove_pending_ack(SeqId, Q, Prefix) -> + case ?QUEUE:out(Q) of + {{value, {SeqId1, MsgStatus}}, Q1} when SeqId =:= SeqId1 -> + {MsgStatus, prefix_queue(Q1, Prefix)}; + {{value, {SeqId1, _MsgStatus} = V}, Q1} when SeqId < SeqId1 -> + remove_pending_ack(SeqId, Q1, [V | Prefix]); + _ -> + not_found + end. + +prefix_queue(Q, []) -> Q; +prefix_queue(Q, P) -> ?QUEUE:join(?QUEUE:from_list(lists:reverse(P)), Q). + purge_pending_ack(KeepPersistent, State = #vqstate { ram_pending_ack = RPA, disk_pending_ack = DPA, index_state = IndexState, msg_store_clients = MSCState }) -> + %% FIXME F = fun (_SeqId, MsgStatus, Acc) -> accumulate_ack(MsgStatus, Acc) end, {IndexOnDiskSeqIds, MsgIdsByStore, _AllMsgIds} = rabbit_misc:gb_trees_fold( @@ -1582,7 +1599,7 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun, ack_out = AvgAckEgress } }) -> {Reduce, State1 = #vqstate { q2 = Q2, q3 = Q3 }} = - case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of + case chunk_size(RamMsgCount + ?QUEUE:len(RPA), TargetRamCount) of 0 -> {false, State}; %% Reduce memory of pending acks and alphas. The order is %% determined based on which is growing faster. Whichever @@ -1605,6 +1622,7 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun, _ -> {Reduce, State1} end. +%% FIXME limit_ram_acks(0, State) -> {0, State}; limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA, @@ -1682,6 +1700,7 @@ fetch_from_q3(State = #vqstate { q1 = Q1, {loaded, {MsgStatus, State2}} end. +%% FIXME maybe_deltas_to_betas(State = #vqstate { delta = ?BLANK_DELTA_PATTERN(X) }) -> State; maybe_deltas_to_betas(State = #vqstate { |