summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2010-06-22 16:41:10 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2010-06-22 16:41:10 +0100
commit4dbd911a7c5ca7e00ec38dc59f60a70f8aad976f (patch)
treec514521325b85e4cde027a507df58a595f3ea0fe
parent95b30665fba2bb6af7d8e4af6ad16be1a855a462 (diff)
downloadrabbitmq-server-4dbd911a7c5ca7e00ec38dc59f60a70f8aad976f.tar.gz
optimisation: don't call reduce_memory_use quite so often
specififcally, on requeue and tx_commit_index only call it once at the end rather than for every single message.
-rw-r--r--src/rabbit_variable_queue.erl39
1 files changed, 20 insertions, 19 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 452e0276..50fa0e26 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -427,7 +427,7 @@ purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len }) ->
publish(Msg, State) ->
{_SeqId, State1} = publish(Msg, false, false, State),
- a(limit_ram_index(State1)).
+ a(limit_ram_index(reduce_memory_use(State1))).
publish_delivered(false, _Msg, State = #vqstate { len = 0 }) ->
{blank_ack, a(State)};
@@ -560,17 +560,18 @@ tx_commit(Txn, Fun, State = #vqstate { durable = IsDurable }) ->
end)}.
requeue(AckTags, State) ->
- a(ack(fun (#msg_status { msg = Msg }, State1) ->
- {_SeqId, State2} = publish(Msg, true, false, State1),
- State2;
- ({IsPersistent, Guid}, State1) ->
- #vqstate { msg_store_clients = MSCState } = State1,
- {{ok, Msg = #basic_message{}}, MSCState1} =
- read_from_msg_store(MSCState, IsPersistent, Guid),
- State2 = State1 #vqstate { msg_store_clients = MSCState1 },
- {_SeqId, State3} = publish(Msg, true, true, State2),
- State3
- end, AckTags, State)).
+ a(reduce_memory_use(
+ ack(fun (#msg_status { msg = Msg }, State1) ->
+ {_SeqId, State2} = publish(Msg, true, false, State1),
+ State2;
+ ({IsPersistent, Guid}, State1) ->
+ #vqstate { msg_store_clients = MSCState } = State1,
+ {{ok, Msg = #basic_message{}}, MSCState1} =
+ read_from_msg_store(MSCState, IsPersistent, Guid),
+ State2 = State1 #vqstate { msg_store_clients = MSCState1 },
+ {_SeqId, State3} = publish(Msg, true, true, State2),
+ State3
+ end, AckTags, State))).
len(#vqstate { len = Len }) -> Len.
@@ -951,7 +952,8 @@ tx_commit_index(State = #vqstate { on_sync = {SAcks, SPubs, SFuns},
end, {Acks, ack(Acks, State)}, Pubs),
IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState),
[ Fun() || Fun <- lists:reverse(SFuns) ],
- State1 #vqstate { index_state = IndexState1, on_sync = {[], [], []} }.
+ reduce_memory_use(
+ State1 #vqstate { index_state = IndexState1, on_sync = {[], [], []} }).
purge_betas_and_deltas(State = #vqstate { q3 = Q3,
index_state = IndexState }) ->
@@ -1073,12 +1075,11 @@ publish(Msg = #basic_message { is_persistent = IsPersistent },
true -> State1 #vqstate { q4 = queue:in(MsgStatus1, Q4) }
end,
PCount1 = PCount + one_if(IsPersistent1),
- {SeqId, reduce_memory_use(
- State2 #vqstate { next_seq_id = SeqId + 1,
- len = Len + 1,
- in_counter = InCount + 1,
- persistent_count = PCount1,
- ram_msg_count = RamMsgCount + 1})}.
+ {SeqId, State2 #vqstate { next_seq_id = SeqId + 1,
+ len = Len + 1,
+ in_counter = InCount + 1,
+ persistent_count = PCount1,
+ ram_msg_count = RamMsgCount + 1}}.
maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status {
msg_on_disk = true }, MSCState) ->