summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Matthew Sackman <matthew@lshift.net> 2009-12-16 19:14:53 +0000
committer Matthew Sackman <matthew@lshift.net> 2009-12-16 19:14:53 +0000
commit d7424357ff5d05d3307ff491c761befdf18ad95c (patch)
tree 80a49eae9205188674fe69313268e0307b57c60e
parent 2cef7bbf99a623f978a6fe7f987f01b2460f0306 (diff)
download rabbitmq-server-d7424357ff5d05d3307ff491c761befdf18ad95c.tar.gz
Added accounting for the size of each file and the sum of all file sizes, plus the sum of valid data across all files. This simplifies build_index slightly and complicates remove_message slightly. Also found that delete_file_if_empty was failing to close any file handle open on the file before deleting it.
-rw-r--r-- src/rabbit_msg_file.erl 6
-rw-r--r-- src/rabbit_msg_store.erl 176
2 files changed, 107 insertions, 75 deletions
diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl
index c0826159..bf367ede 100644
--- a/src/rabbit_msg_file.erl
+++ b/src/rabbit_msg_file.erl
@@ -59,7 +59,7 @@
-spec(read/2 :: (io_device(), msg_size()) ->
({'ok', {msg_id(), msg()}} | {'error', any()})).
-spec(scan/1 :: (io_device()) ->
- {'ok', [{msg_id(), msg_size(), position()}]}).
+ {'ok', [{msg_id(), msg_size(), position()}], position()}).
-endif.
@@ -95,14 +95,14 @@ scan(FileHdl) -> scan(FileHdl, 0, []).
scan(FileHdl, Offset, Acc) ->
case read_next(FileHdl, Offset) of
- eof -> {ok, Acc};
+ eof -> {ok, Acc, Offset};
{corrupted, NextOffset} ->
scan(FileHdl, NextOffset, Acc);
{ok, {MsgId, TotalSize, NextOffset}} ->
scan(FileHdl, NextOffset, [{MsgId, TotalSize, Offset} | Acc]);
_KO ->
%% bad message, but we may still have recovered some valid messages
- {ok, Acc}
+ {ok, Acc, Offset}
end.
read_next(FileHdl, Offset) ->
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 798e9d65..b9bffef6 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -81,14 +81,16 @@
file_handle_cache, %% file handle cache
on_sync, %% pending sync requests
sync_timer_ref, %% TRef for our interval timer
- message_cache %% ets message cache
+ message_cache, %% ets message cache
+ sum_valid_data, %% sum of valid data in all files
+ sum_file_size %% sum of file sizes
}).
-record(msg_location,
{msg_id, ref_count, file, offset, total_size}).
-record(file_summary,
- {file, valid_total_size, contiguous_top, left, right}).
+ {file, valid_total_size, contiguous_top, left, right, file_size}).
-define(MSG_LOC_NAME, rabbit_disk_queue_msg_location).
-define(FILE_SUMMARY_ETS_NAME, rabbit_disk_queue_file_summary).
@@ -262,7 +264,9 @@ init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) ->
file_handle_cache = dict:new(),
on_sync = [],
sync_timer_ref = undefined,
- message_cache = MessageCache
+ message_cache = MessageCache,
+ sum_valid_data = 0,
+ sum_file_size = 0
},
ok = count_msg_refs(MsgRefDeltaGen, MsgRefDeltaGenInit, State),
@@ -341,7 +345,9 @@ handle_call({contains, MsgId}, _From, State) ->
handle_cast({write, MsgId, Msg},
State = #msstate { current_file_handle = CurHdl,
current_file = CurFile,
- file_summary = FileSummary }) ->
+ file_summary = FileSummary,
+ sum_valid_data = SumValid,
+ sum_file_size = SumFileSize }) ->
case index_lookup(MsgId, State) of
not_found ->
%% New message, lots to do
@@ -353,7 +359,8 @@ handle_cast({write, MsgId, Msg},
State),
[FSEntry = #file_summary { valid_total_size = ValidTotalSize,
contiguous_top = ContiguousTop,
- right = undefined }] =
+ right = undefined,
+ file_size = FileSize }] =
ets:lookup(FileSummary, CurFile),
ValidTotalSize1 = ValidTotalSize + TotalSize,
ContiguousTop1 = if CurOffset =:= ContiguousTop ->
@@ -363,9 +370,13 @@ handle_cast({write, MsgId, Msg},
end,
true = ets:insert(FileSummary, FSEntry #file_summary {
valid_total_size = ValidTotalSize1,
- contiguous_top = ContiguousTop1 }),
+ contiguous_top = ContiguousTop1,
+ file_size = FileSize + TotalSize }),
NextOffset = CurOffset + TotalSize,
- noreply(maybe_roll_to_new_file(NextOffset, State));
+ noreply(maybe_roll_to_new_file(
+ NextOffset,
+ State #msstate { sum_valid_data = SumValid + TotalSize,
+ sum_file_size = SumFileSize + TotalSize }));
StoreEntry = #msg_location { ref_count = RefCount } ->
%% We already know about it, just update counter
ok = index_update(StoreEntry #msg_location {
@@ -374,19 +385,19 @@ handle_cast({write, MsgId, Msg},
end;
handle_cast({remove, MsgIds}, State = #msstate { current_file = CurFile }) ->
- noreply(
- compact(sets:to_list(
- lists:foldl(
- fun (MsgId, Files1) ->
- case remove_message(MsgId, State) of
- {compact, File} ->
- if CurFile =:= File -> Files1;
- true -> sets:add_element(File, Files1)
- end;
- no_compact -> Files1
- end
- end, sets:new(), MsgIds)),
- State));
+ {Files, State1} =
+ lists:foldl(
+ fun (MsgId, {Files1, State2}) ->
+ case remove_message(MsgId, State2) of
+ {compact, File, State3} ->
+ {if CurFile =:= File -> Files1;
+ true -> sets:add_element(File, Files1)
+ end, State3};
+ {no_compact, State3} ->
+ {Files1, State3}
+ end
+ end, {sets:new(), State}, MsgIds),
+ noreply(compact(sets:to_list(Files), State1));
handle_cast({release, MsgIds}, State) ->
lists:foreach(fun (MsgId) -> decrement_cache(MsgId, State) end, MsgIds),
@@ -499,7 +510,8 @@ sync(State = #msstate { current_file_handle = CurHdl,
State1 #msstate { on_sync = [] }
end.
-remove_message(MsgId, State = #msstate { file_summary = FileSummary }) ->
+remove_message(MsgId, State = #msstate { file_summary = FileSummary,
+ sum_valid_data = SumValid }) ->
StoreEntry = #msg_location { ref_count = RefCount, file = File,
offset = Offset, total_size = TotalSize } =
index_lookup(MsgId, State),
@@ -515,12 +527,13 @@ remove_message(MsgId, State = #msstate { file_summary = FileSummary }) ->
true = ets:insert(FileSummary, FSEntry #file_summary {
valid_total_size = ValidTotalSize1,
contiguous_top = ContiguousTop1 }),
- {compact, File};
+ {compact, File, State #msstate {
+ sum_valid_data = SumValid - TotalSize }};
_ when 1 < RefCount ->
ok = decrement_cache(MsgId, State),
ok = index_update(StoreEntry #msg_location {
ref_count = RefCount - 1 }, State),
- no_compact
+ {no_compact, State}
end.
close_handle(Key, State = #msstate { file_handle_cache = FHC }) ->
@@ -752,7 +765,7 @@ is_disjoint(SmallerL, BiggerL) ->
lists:all(fun (Item) -> not lists:member(Item, BiggerL) end, SmallerL).
scan_file_for_valid_messages_msg_ids(Dir, FileName) ->
- {ok, Messages} = scan_file_for_valid_messages(Dir, FileName),
+ {ok, Messages, _FileSize} = scan_file_for_valid_messages(Dir, FileName),
{ok, Messages, [MsgId || {MsgId, _TotalSize, _FileOffset} <- Messages]}.
scan_file_for_valid_messages(Dir, FileName) ->
@@ -763,7 +776,7 @@ scan_file_for_valid_messages(Dir, FileName) ->
%% but ignore
file_handle_cache:close(Hdl),
Valid;
- {error, enoent} -> {ok, []};
+ {error, enoent} -> {ok, [], 0};
{error, Reason} -> throw({error,
{unable_to_scan_file, FileName, Reason}})
end.
@@ -790,50 +803,63 @@ build_index([], State) ->
build_index(Files, State) ->
build_index(undefined, Files, [], State).
-build_index(Left, [], FilesToCompact, State) ->
+build_index(Left, [], FilesToCompact, State =
+ #msstate { file_summary = FileSummary }) ->
ok = index_delete_by_file(undefined, State),
- Offset = case index_search_by_file(Left, State) of
- [] -> 0;
- List -> #msg_location { offset = MaxOffset,
- total_size = TotalSize } =
- lists:last(List),
- MaxOffset + TotalSize
+ Offset = case ets:lookup(FileSummary, Left) of
+ [] -> 0;
+ [#file_summary { file_size = FileSize }] -> FileSize
end,
{Offset, compact(FilesToCompact, %% this never includes the current file
State #msstate { current_file = Left })};
build_index(Left, [File|Files], FilesToCompact,
- State = #msstate { dir = Dir, file_summary = FileSummary }) ->
- {ok, Messages} = scan_file_for_valid_messages(Dir, filenum_to_name(File)),
- {ValidMessages, ValidTotalSize, AllValid} =
+ State = #msstate { dir = Dir, file_summary = FileSummary,
+ sum_valid_data = SumValid,
+ sum_file_size = SumFileSize }) ->
+ {ok, Messages, FileSize} =
+ scan_file_for_valid_messages(Dir, filenum_to_name(File)),
+ {ValidMessages, ValidTotalSize} =
lists:foldl(
- fun (Obj = {MsgId, TotalSize, Offset},
- {VMAcc, VTSAcc, AVAcc}) ->
+ fun (Obj = {MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
case index_lookup(MsgId, State) of
- not_found -> {VMAcc, VTSAcc, false};
+ not_found -> {VMAcc, VTSAcc};
StoreEntry ->
ok = index_update(StoreEntry #msg_location {
file = File, offset = Offset,
total_size = TotalSize },
State),
- {[Obj | VMAcc], VTSAcc + TotalSize, AVAcc}
+ {[Obj | VMAcc], VTSAcc + TotalSize}
end
- end, {[], 0, Messages =/= []}, Messages),
+ end, {[], 0}, Messages),
%% foldl reverses lists, find_contiguous_block_prefix needs
%% msgs eldest first, so, ValidMessages is the right way round
{ContiguousTop, _} = find_contiguous_block_prefix(ValidMessages),
- Right = case Files of
- [] -> undefined;
- [F|_] -> F
- end,
- true = ets:insert_new(FileSummary, #file_summary {
- file = File, valid_total_size = ValidTotalSize,
- contiguous_top = ContiguousTop,
- left = Left, right = Right }),
- FilesToCompact1 = case AllValid orelse Right =:= undefined of
- true -> FilesToCompact;
- false -> [File | FilesToCompact]
- end,
- build_index(File, Files, FilesToCompact1, State).
+ {Right, FileSize1} =
+ case Files of
+ %% if it's the last file, we'll truncate to remove any
+ %% rubbish above the last valid message. This affects the
+ %% file size.
+ [] -> {undefined, case ValidMessages of
+ [] -> 0;
+ _ -> {_MsgId, TotalSize, Offset} =
+ lists:last(ValidMessages),
+ Offset + TotalSize
+ end};
+ [F|_] -> {F, FileSize}
+ end,
+ true =
+ ets:insert_new(FileSummary, #file_summary {
+ file = File, valid_total_size = ValidTotalSize,
+ contiguous_top = ContiguousTop,
+ left = Left, right = Right, file_size = FileSize1 }),
+ FilesToCompact1 =
+ case FileSize1 == ContiguousTop orelse Right =:= undefined of
+ true -> FilesToCompact;
+ false -> [File | FilesToCompact]
+ end,
+ build_index(File, Files, FilesToCompact1,
+ State #msstate { sum_valid_data = SumValid + ValidTotalSize,
+ sum_file_size = SumFileSize + FileSize1 }).
%%----------------------------------------------------------------------------
%% garbage collection / compaction / aggregation
@@ -855,7 +881,7 @@ maybe_roll_to_new_file(Offset,
true = ets:insert_new(
FileSummary, #file_summary {
file = NextFile, valid_total_size = 0, contiguous_top = 0,
- left = CurFile, right = undefined }),
+ left = CurFile, right = undefined, file_size = 0 }),
State2 = State1 #msstate { current_file_handle = NextHdl,
current_file = NextFile },
compact([CurFile], State2);
@@ -866,14 +892,14 @@ compact(Files, State) ->
%% smallest number, hence eldest, hence left-most, first
SortedFiles = lists:sort(Files),
%% foldl reverses, so now youngest/right-most first
- RemainingFiles =
- lists:foldl(fun (File, Acc) ->
- case delete_file_if_empty(File, State) of
- true -> Acc;
- false -> [File | Acc]
+ {RemainingFiles, State1} =
+ lists:foldl(fun (File, {Acc, State2}) ->
+ case delete_file_if_empty(File, State2) of
+ {true, State3} -> {Acc, State3};
+ {false, State3} -> {[File | Acc], State3}
end
- end, [], SortedFiles),
- lists:foldl(fun combine_file/2, State, lists:reverse(RemainingFiles)).
+ end, {[], State}, SortedFiles),
+ lists:foldl(fun combine_file/2, State1, lists:reverse(RemainingFiles)).
%% At this stage, we simply know that the file has had msgs removed
%% from it. However, we don't know if we need to merge it left (which
@@ -913,12 +939,14 @@ combine_file(File, State = #msstate { file_summary = FileSummary,
adjust_meta_and_combine(
LeftObj = #file_summary {
- file = LeftFile, valid_total_size = LeftValidData, right = RightFile },
+ file = LeftFile, valid_total_size = LeftValidData, right = RightFile,
+ file_size = LeftFileSize },
RightObj = #file_summary {
file = RightFile, valid_total_size = RightValidData, left = LeftFile,
- right = RightRight },
+ right = RightRight, file_size = RightFileSize },
State = #msstate { file_size_limit = FileSizeLimit,
- file_summary = FileSummary }) ->
+ file_summary = FileSummary,
+ sum_file_size = SumFileSize }) ->
TotalValidData = LeftValidData + RightValidData,
if FileSizeLimit >= TotalValidData ->
State1 = combine_files(RightObj, LeftObj, State),
@@ -928,9 +956,12 @@ adjust_meta_and_combine(
true = ets:insert(FileSummary, LeftObj #file_summary {
valid_total_size = TotalValidData,
contiguous_top = TotalValidData,
+ file_size = TotalValidData,
right = RightRight }),
true = ets:delete(FileSummary, RightFile),
- {true, State1};
+ {true, State1 #msstate { sum_file_size =
+ SumFileSize - LeftFileSize - RightFileSize
+ + TotalValidData }};
true -> {false, State}
end.
@@ -980,7 +1011,7 @@ combine_files(#file_summary { file = Source,
DestinationHdl, TmpHdl, Destination, State1),
TmpSize = DestinationValid - DestinationContiguousTop,
%% so now Tmp contains everything we need to salvage from
- %% Destination, and MsgLocationDets has been updated to
+ %% Destination, and MsgLocation has been updated to
%% reflect compaction of Destination so truncate
%% Destination and copy from Tmp back to the end
{ok, 0} = file_handle_cache:position(TmpHdl, 0),
@@ -1011,7 +1042,7 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
{CurOffset, BlockStart, BlockEnd}) ->
%% CurOffset is in the DestinationFile.
%% Offset, BlockStart and BlockEnd are in the SourceFile
- %% update MsgLocationDets to reflect change of file and offset
+ %% update MsgLocation to reflect change of file and offset
ok = index_update(StoreEntry #msg_location {
file = Destination,
offset = CurOffset }, State),
@@ -1041,9 +1072,9 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
ok = file_handle_cache:sync(DestinationHdl),
ok.
-delete_file_if_empty(File,
- #msstate { dir = Dir, file_summary = FileSummary }) ->
- [#file_summary { valid_total_size = ValidData,
+delete_file_if_empty(File, #msstate { dir = Dir, file_summary = FileSummary,
+ sum_file_size = SumFileSize } = State) ->
+ [#file_summary { valid_total_size = ValidData, file_size = FileSize,
left = Left, right = Right }] =
ets:lookup(FileSummary, File),
case ValidData of
@@ -1063,7 +1094,8 @@ delete_file_if_empty(File,
{#file_summary.right, Right})
end,
true = ets:delete(FileSummary, File),
+ State1 = close_handle(File, State),
ok = file:delete(form_filename(Dir, filenum_to_name(File))),
- true;
- _ -> false
+ {true, State1 #msstate { sum_file_size = SumFileSize - FileSize }};
+ _ -> {false, State}
end.