summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-06-26 18:19:34 +0100
committerMatthew Sackman <matthew@lshift.net>2009-06-26 18:19:34 +0100
commit9dde46a3958bcf4dab1fa0b2c8b7f2ae7ffde623 (patch)
tree6dfc3174c2c6a26b09aeae52b1b0cc4e013f8044
parentf5b1b57521a7c2a79f60879fd0895916f0697b27 (diff)
downloadrabbitmq-server-bug20470.tar.gz
Had been thinking about this optimisation for a while but someone mentioned it to me yesterday at the Erlang Factory conference.bug20470
When you sync, you know that everything up to the current state of the file is sync'd. Given that we're always appending, we know that any message before the current length of the file is available. Thus when we're reading messages from the current write file, even if the file is dirty, we don't need to sync unless the message we're reading is beyond the length of the file at the last sync. This can be very effective, for example, if there are a few hundred messages in the queue and then you're reading and writing to the queue at the same rate, then this will mean that rather than doing a sync for every read, we now only sync once per size of queue (altitude or ramp size). Sure enough, my publish_one_in_one_out_receive(1000) (altitude of 1000, then 5000 @ one in, one out) reduces from 6089 calls to fsync to 21, and from 15.4 seconds to 3.6. It's also possible to apply the same optimisation in tx_commit - not only do we now return immediately if the current file is not dirty or if none of the messages in the txn are in the current file, but we can also return immediately if the current file is dirty and messages are in the current file, but they're all below the last sync file size. Surprising very little extra code needed.
-rw-r--r--src/rabbit_disk_queue.erl42
1 files changed, 26 insertions, 16 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index ac58d89d..2a7505a7 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -86,7 +86,8 @@
read_file_handles, %% file handles for reading (LRU)
read_file_handles_limit, %% how many file handles can we open?
on_sync_froms, %% list of commiters to run on sync (reversed)
- timer_ref %% TRef for our interval timer
+ timer_ref, %% TRef for our interval timer
+ last_sync_offset %% current_offset at the last time we sync'd
}).
%% The components:
@@ -394,7 +395,8 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
read_file_handles = {dict:new(), gb_trees:empty()},
read_file_handles_limit = ReadFileHandlesLimit,
on_sync_froms = [],
- timer_ref = undefined
+ timer_ref = undefined,
+ last_sync_offset = 0
},
{ok, State1 = #dqstate { current_file_name = CurrentName,
current_offset = Offset } } =
@@ -648,13 +650,14 @@ determine_next_read_id(CurrentRead, CurrentWrite, NextWrite)
when NextWrite >= CurrentWrite ->
CurrentRead.
-get_read_handle(File, State =
+get_read_handle(File, Offset, State =
#dqstate { read_file_handles = {ReadHdls, ReadHdlsAge},
read_file_handles_limit = ReadFileHandlesLimit,
current_file_name = CurName,
- current_dirty = IsDirty
+ current_dirty = IsDirty,
+ last_sync_offset = SyncOffset
}) ->
- State1 = if CurName =:= File andalso IsDirty ->
+ State1 = if CurName =:= File andalso IsDirty andalso Offset >= SyncOffset ->
sync_current_file_handle(State);
true -> State
end,
@@ -727,15 +730,19 @@ sync_current_file_handle(State = #dqstate { current_dirty = false,
State;
sync_current_file_handle(State = #dqstate { current_file_handle = CurHdl,
current_dirty = IsDirty,
- on_sync_froms = Froms
+ current_offset = CurOffset,
+ on_sync_froms = Froms,
+ last_sync_offset = SyncOffset
}) ->
- ok = case IsDirty of
- true -> file:sync(CurHdl);
- false -> ok
- end,
+ SyncOffset1 = case IsDirty of
+ true -> ok = file:sync(CurHdl),
+ CurOffset;
+ false -> SyncOffset
+ end,
lists:map(fun (From) -> gen_server2:reply(From, ok) end,
lists:reverse(Froms)),
- State #dqstate { current_dirty = false, on_sync_froms = [] }.
+ State #dqstate { current_dirty = false, on_sync_froms = [],
+ last_sync_offset = SyncOffset1 }.
%% ---- INTERNAL RAW FUNCTIONS ----
@@ -776,7 +783,7 @@ internal_read_message(Q, ReadSeqId, FakeDeliver, ReadMsg, State) ->
end,
case ReadMsg of
true ->
- {FileHdl, State1} = get_read_handle(File, State),
+ {FileHdl, State1} = get_read_handle(File, Offset, State),
{ok, {MsgBody, BodySize}} =
read_message_at_offset(FileHdl, Offset, TotalSize),
{ok, {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}},
@@ -883,7 +890,8 @@ internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, From,
State = #dqstate { sequences = Sequences,
current_file_name = CurFile,
current_dirty = IsDirty,
- on_sync_froms = SyncFroms
+ on_sync_froms = SyncFroms,
+ last_sync_offset = SyncOffset
}) ->
{PubList, PubAcc, ReadSeqId, Length} =
case PubMsgSeqIds of
@@ -909,7 +917,7 @@ internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, From,
lists:foldl(
fun ({{MsgId, SeqId}, {_NextMsgId, NextSeqId}},
{InCurFileAcc, ExpectedSeqId}) ->
- [{MsgId, _RefCount, File, _Offset,
+ [{MsgId, _RefCount, File, Offset,
_TotalSize}] = dets_ets_lookup(State, MsgId),
SeqId1 = adjust_last_msg_seq_id(
Q, ExpectedSeqId, SeqId, write),
@@ -924,7 +932,8 @@ internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, From,
next_seq_id = NextSeqId1
},
write),
- {InCurFileAcc orelse File =:= CurFile,
+ {InCurFileAcc orelse (File =:= CurFile andalso
+ Offset >= SyncOffset),
NextSeqId1}
end, {false, PubAcc}, PubList),
{ok, State2} = remove_messages(Q, AckSeqIds, txn, State),
@@ -1126,7 +1135,8 @@ maybe_roll_to_new_file(Offset,
State2 = State1 #dqstate { current_file_name = NextName,
current_file_handle = NextHdl,
current_file_num = NextNum,
- current_offset = 0
+ current_offset = 0,
+ last_sync_offset = 0
},
{ok, compact(sets:from_list([CurName]), State2)};
maybe_roll_to_new_file(_, State) ->