diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2010-06-22 16:41:10 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-06-22 16:41:10 +0100 |
commit | 4dbd911a7c5ca7e00ec38dc59f60a70f8aad976f (patch) | |
tree | c514521325b85e4cde027a507df58a595f3ea0fe | |
parent | 95b30665fba2bb6af7d8e4af6ad16be1a855a462 (diff) | |
download | rabbitmq-server-4dbd911a7c5ca7e00ec38dc59f60a70f8aad976f.tar.gz |
optimisation: don't call reduce_memory_use quite so often
specififcally, on requeue and tx_commit_index only call it once at the
end rather than for every single message.
-rw-r--r-- | src/rabbit_variable_queue.erl | 39 |
1 files changed, 20 insertions, 19 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 452e0276..50fa0e26 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -427,7 +427,7 @@ purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len }) -> publish(Msg, State) -> {_SeqId, State1} = publish(Msg, false, false, State), - a(limit_ram_index(State1)). + a(limit_ram_index(reduce_memory_use(State1))). publish_delivered(false, _Msg, State = #vqstate { len = 0 }) -> {blank_ack, a(State)}; @@ -560,17 +560,18 @@ tx_commit(Txn, Fun, State = #vqstate { durable = IsDurable }) -> end)}. requeue(AckTags, State) -> - a(ack(fun (#msg_status { msg = Msg }, State1) -> - {_SeqId, State2} = publish(Msg, true, false, State1), - State2; - ({IsPersistent, Guid}, State1) -> - #vqstate { msg_store_clients = MSCState } = State1, - {{ok, Msg = #basic_message{}}, MSCState1} = - read_from_msg_store(MSCState, IsPersistent, Guid), - State2 = State1 #vqstate { msg_store_clients = MSCState1 }, - {_SeqId, State3} = publish(Msg, true, true, State2), - State3 - end, AckTags, State)). + a(reduce_memory_use( + ack(fun (#msg_status { msg = Msg }, State1) -> + {_SeqId, State2} = publish(Msg, true, false, State1), + State2; + ({IsPersistent, Guid}, State1) -> + #vqstate { msg_store_clients = MSCState } = State1, + {{ok, Msg = #basic_message{}}, MSCState1} = + read_from_msg_store(MSCState, IsPersistent, Guid), + State2 = State1 #vqstate { msg_store_clients = MSCState1 }, + {_SeqId, State3} = publish(Msg, true, true, State2), + State3 + end, AckTags, State))). len(#vqstate { len = Len }) -> Len. @@ -951,7 +952,8 @@ tx_commit_index(State = #vqstate { on_sync = {SAcks, SPubs, SFuns}, end, {Acks, ack(Acks, State)}, Pubs), IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState), [ Fun() || Fun <- lists:reverse(SFuns) ], - State1 #vqstate { index_state = IndexState1, on_sync = {[], [], []} }. + reduce_memory_use( + State1 #vqstate { index_state = IndexState1, on_sync = {[], [], []} }). purge_betas_and_deltas(State = #vqstate { q3 = Q3, index_state = IndexState }) -> @@ -1073,12 +1075,11 @@ publish(Msg = #basic_message { is_persistent = IsPersistent }, true -> State1 #vqstate { q4 = queue:in(MsgStatus1, Q4) } end, PCount1 = PCount + one_if(IsPersistent1), - {SeqId, reduce_memory_use( - State2 #vqstate { next_seq_id = SeqId + 1, - len = Len + 1, - in_counter = InCount + 1, - persistent_count = PCount1, - ram_msg_count = RamMsgCount + 1})}. + {SeqId, State2 #vqstate { next_seq_id = SeqId + 1, + len = Len + 1, + in_counter = InCount + 1, + persistent_count = PCount1, + ram_msg_count = RamMsgCount + 1}}. maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status { msg_on_disk = true }, MSCState) -> |