diff options
author | Matthew Sackman <matthew@lshift.net> | 2009-12-16 19:14:53 +0000 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2009-12-16 19:14:53 +0000 |
commit | d7424357ff5d05d3307ff491c761befdf18ad95c (patch) | |
tree | 80a49eae9205188674fe69313268e0307b57c60e | |
parent | 2cef7bbf99a623f978a6fe7f987f01b2460f0306 (diff) | |
download | rabbitmq-server-d7424357ff5d05d3307ff491c761befdf18ad95c.tar.gz |
Added accountancy for file size and sums thereof plus sums of valid data. This simplifies build_index slightly and complicates remove_message slightly. Also found that in delete_if_empty, was failing to close any file handle open on the file prior to it being deleted.
-rw-r--r-- | src/rabbit_msg_file.erl | 6 | ||||
-rw-r--r-- | src/rabbit_msg_store.erl | 176 |
2 files changed, 107 insertions, 75 deletions
diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl index c0826159..bf367ede 100644 --- a/src/rabbit_msg_file.erl +++ b/src/rabbit_msg_file.erl @@ -59,7 +59,7 @@ -spec(read/2 :: (io_device(), msg_size()) -> ({'ok', {msg_id(), msg()}} | {'error', any()})). -spec(scan/1 :: (io_device()) -> - {'ok', [{msg_id(), msg_size(), position()}]}). + {'ok', [{msg_id(), msg_size(), position()}], position()}). -endif. @@ -95,14 +95,14 @@ scan(FileHdl) -> scan(FileHdl, 0, []). scan(FileHdl, Offset, Acc) -> case read_next(FileHdl, Offset) of - eof -> {ok, Acc}; + eof -> {ok, Acc, Offset}; {corrupted, NextOffset} -> scan(FileHdl, NextOffset, Acc); {ok, {MsgId, TotalSize, NextOffset}} -> scan(FileHdl, NextOffset, [{MsgId, TotalSize, Offset} | Acc]); _KO -> %% bad message, but we may still have recovered some valid messages - {ok, Acc} + {ok, Acc, Offset} end. read_next(FileHdl, Offset) -> diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 798e9d65..b9bffef6 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -81,14 +81,16 @@ file_handle_cache, %% file handle cache on_sync, %% pending sync requests sync_timer_ref, %% TRef for our interval timer - message_cache %% ets message cache + message_cache, %% ets message cache + sum_valid_data, %% sum of valid data in all files + sum_file_size %% sum of file sizes }). -record(msg_location, {msg_id, ref_count, file, offset, total_size}). -record(file_summary, - {file, valid_total_size, contiguous_top, left, right}). + {file, valid_total_size, contiguous_top, left, right, file_size}). -define(MSG_LOC_NAME, rabbit_disk_queue_msg_location). -define(FILE_SUMMARY_ETS_NAME, rabbit_disk_queue_file_summary). @@ -262,7 +264,9 @@ init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) -> file_handle_cache = dict:new(), on_sync = [], sync_timer_ref = undefined, - message_cache = MessageCache + message_cache = MessageCache, + sum_valid_data = 0, + sum_file_size = 0 }, ok = count_msg_refs(MsgRefDeltaGen, MsgRefDeltaGenInit, State), @@ -341,7 +345,9 @@ handle_call({contains, MsgId}, _From, State) -> handle_cast({write, MsgId, Msg}, State = #msstate { current_file_handle = CurHdl, current_file = CurFile, - file_summary = FileSummary }) -> + file_summary = FileSummary, + sum_valid_data = SumValid, + sum_file_size = SumFileSize }) -> case index_lookup(MsgId, State) of not_found -> %% New message, lots to do @@ -353,7 +359,8 @@ handle_cast({write, MsgId, Msg}, State), [FSEntry = #file_summary { valid_total_size = ValidTotalSize, contiguous_top = ContiguousTop, - right = undefined }] = + right = undefined, + file_size = FileSize }] = ets:lookup(FileSummary, CurFile), ValidTotalSize1 = ValidTotalSize + TotalSize, ContiguousTop1 = if CurOffset =:= ContiguousTop -> @@ -363,9 +370,13 @@ handle_cast({write, MsgId, Msg}, end, true = ets:insert(FileSummary, FSEntry #file_summary { valid_total_size = ValidTotalSize1, - contiguous_top = ContiguousTop1 }), + contiguous_top = ContiguousTop1, + file_size = FileSize + TotalSize }), NextOffset = CurOffset + TotalSize, - noreply(maybe_roll_to_new_file(NextOffset, State)); + noreply(maybe_roll_to_new_file( + NextOffset, + State #msstate { sum_valid_data = SumValid + TotalSize, + sum_file_size = SumFileSize + TotalSize })); StoreEntry = #msg_location { ref_count = RefCount } -> %% We already know about it, just update counter ok = index_update(StoreEntry #msg_location { @@ -374,19 +385,19 @@ handle_cast({write, MsgId, Msg}, end; handle_cast({remove, MsgIds}, State = #msstate { current_file = CurFile }) -> - noreply( - compact(sets:to_list( - lists:foldl( - fun (MsgId, Files1) -> - case remove_message(MsgId, State) of - {compact, File} -> - if CurFile =:= File -> Files1; - true -> sets:add_element(File, Files1) - end; - no_compact -> Files1 - end - end, sets:new(), MsgIds)), - State)); + {Files, State1} = + lists:foldl( + fun (MsgId, {Files1, State2}) -> + case remove_message(MsgId, State2) of + {compact, File, State3} -> + {if CurFile =:= File -> Files1; + true -> sets:add_element(File, Files1) + end, State3}; + {no_compact, State3} -> + {Files1, State3} + end + end, {sets:new(), State}, MsgIds), + noreply(compact(sets:to_list(Files), State1)); handle_cast({release, MsgIds}, State) -> lists:foreach(fun (MsgId) -> decrement_cache(MsgId, State) end, MsgIds), @@ -499,7 +510,8 @@ sync(State = #msstate { current_file_handle = CurHdl, State1 #msstate { on_sync = [] } end. -remove_message(MsgId, State = #msstate { file_summary = FileSummary }) -> +remove_message(MsgId, State = #msstate { file_summary = FileSummary, + sum_valid_data = SumValid }) -> StoreEntry = #msg_location { ref_count = RefCount, file = File, offset = Offset, total_size = TotalSize } = index_lookup(MsgId, State), @@ -515,12 +527,13 @@ remove_message(MsgId, State = #msstate { file_summary = FileSummary }) -> true = ets:insert(FileSummary, FSEntry #file_summary { valid_total_size = ValidTotalSize1, contiguous_top = ContiguousTop1 }), - {compact, File}; + {compact, File, State #msstate { + sum_valid_data = SumValid - TotalSize }}; _ when 1 < RefCount -> ok = decrement_cache(MsgId, State), ok = index_update(StoreEntry #msg_location { ref_count = RefCount - 1 }, State), - no_compact + {no_compact, State} end. close_handle(Key, State = #msstate { file_handle_cache = FHC }) -> @@ -752,7 +765,7 @@ is_disjoint(SmallerL, BiggerL) -> lists:all(fun (Item) -> not lists:member(Item, BiggerL) end, SmallerL). scan_file_for_valid_messages_msg_ids(Dir, FileName) -> - {ok, Messages} = scan_file_for_valid_messages(Dir, FileName), + {ok, Messages, _FileSize} = scan_file_for_valid_messages(Dir, FileName), {ok, Messages, [MsgId || {MsgId, _TotalSize, _FileOffset} <- Messages]}. scan_file_for_valid_messages(Dir, FileName) -> @@ -763,7 +776,7 @@ scan_file_for_valid_messages(Dir, FileName) -> %% but ignore file_handle_cache:close(Hdl), Valid; - {error, enoent} -> {ok, []}; + {error, enoent} -> {ok, [], 0}; {error, Reason} -> throw({error, {unable_to_scan_file, FileName, Reason}}) end. @@ -790,50 +803,63 @@ build_index([], State) -> build_index(Files, State) -> build_index(undefined, Files, [], State). -build_index(Left, [], FilesToCompact, State) -> +build_index(Left, [], FilesToCompact, State = + #msstate { file_summary = FileSummary }) -> ok = index_delete_by_file(undefined, State), - Offset = case index_search_by_file(Left, State) of - [] -> 0; - List -> #msg_location { offset = MaxOffset, - total_size = TotalSize } = - lists:last(List), - MaxOffset + TotalSize + Offset = case ets:lookup(FileSummary, Left) of + [] -> 0; + [#file_summary { file_size = FileSize }] -> FileSize end, {Offset, compact(FilesToCompact, %% this never includes the current file State #msstate { current_file = Left })}; build_index(Left, [File|Files], FilesToCompact, - State = #msstate { dir = Dir, file_summary = FileSummary }) -> - {ok, Messages} = scan_file_for_valid_messages(Dir, filenum_to_name(File)), - {ValidMessages, ValidTotalSize, AllValid} = + State = #msstate { dir = Dir, file_summary = FileSummary, + sum_valid_data = SumValid, + sum_file_size = SumFileSize }) -> + {ok, Messages, FileSize} = + scan_file_for_valid_messages(Dir, filenum_to_name(File)), + {ValidMessages, ValidTotalSize} = lists:foldl( - fun (Obj = {MsgId, TotalSize, Offset}, - {VMAcc, VTSAcc, AVAcc}) -> + fun (Obj = {MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) -> case index_lookup(MsgId, State) of - not_found -> {VMAcc, VTSAcc, false}; + not_found -> {VMAcc, VTSAcc}; StoreEntry -> ok = index_update(StoreEntry #msg_location { file = File, offset = Offset, total_size = TotalSize }, State), - {[Obj | VMAcc], VTSAcc + TotalSize, AVAcc} + {[Obj | VMAcc], VTSAcc + TotalSize} end - end, {[], 0, Messages =/= []}, Messages), + end, {[], 0}, Messages), %% foldl reverses lists, find_contiguous_block_prefix needs %% msgs eldest first, so, ValidMessages is the right way round {ContiguousTop, _} = find_contiguous_block_prefix(ValidMessages), - Right = case Files of - [] -> undefined; - [F|_] -> F - end, - true = ets:insert_new(FileSummary, #file_summary { - file = File, valid_total_size = ValidTotalSize, - contiguous_top = ContiguousTop, - left = Left, right = Right }), - FilesToCompact1 = case AllValid orelse Right =:= undefined of - true -> FilesToCompact; - false -> [File | FilesToCompact] - end, - build_index(File, Files, FilesToCompact1, State). + {Right, FileSize1} = + case Files of + %% if it's the last file, we'll truncate to remove any + %% rubbish above the last valid message. This affects the + %% file size. + [] -> {undefined, case ValidMessages of + [] -> 0; + _ -> {_MsgId, TotalSize, Offset} = + lists:last(ValidMessages), + Offset + TotalSize + end}; + [F|_] -> {F, FileSize} + end, + true = + ets:insert_new(FileSummary, #file_summary { + file = File, valid_total_size = ValidTotalSize, + contiguous_top = ContiguousTop, + left = Left, right = Right, file_size = FileSize1 }), + FilesToCompact1 = + case FileSize1 == ContiguousTop orelse Right =:= undefined of + true -> FilesToCompact; + false -> [File | FilesToCompact] + end, + build_index(File, Files, FilesToCompact1, + State #msstate { sum_valid_data = SumValid + ValidTotalSize, + sum_file_size = SumFileSize + FileSize1 }). %%---------------------------------------------------------------------------- %% garbage collection / compaction / aggregation @@ -855,7 +881,7 @@ maybe_roll_to_new_file(Offset, true = ets:insert_new( FileSummary, #file_summary { file = NextFile, valid_total_size = 0, contiguous_top = 0, - left = CurFile, right = undefined }), + left = CurFile, right = undefined, file_size = 0 }), State2 = State1 #msstate { current_file_handle = NextHdl, current_file = NextFile }, compact([CurFile], State2); @@ -866,14 +892,14 @@ compact(Files, State) -> %% smallest number, hence eldest, hence left-most, first SortedFiles = lists:sort(Files), %% foldl reverses, so now youngest/right-most first - RemainingFiles = - lists:foldl(fun (File, Acc) -> - case delete_file_if_empty(File, State) of - true -> Acc; - false -> [File | Acc] + {RemainingFiles, State1} = + lists:foldl(fun (File, {Acc, State2}) -> + case delete_file_if_empty(File, State2) of + {true, State3} -> {Acc, State3}; + {false, State3} -> {[File | Acc], State3} end - end, [], SortedFiles), - lists:foldl(fun combine_file/2, State, lists:reverse(RemainingFiles)). + end, {[], State}, SortedFiles), + lists:foldl(fun combine_file/2, State1, lists:reverse(RemainingFiles)). %% At this stage, we simply know that the file has had msgs removed %% from it. However, we don't know if we need to merge it left (which @@ -913,12 +939,14 @@ combine_file(File, State = #msstate { file_summary = FileSummary, adjust_meta_and_combine( LeftObj = #file_summary { - file = LeftFile, valid_total_size = LeftValidData, right = RightFile }, + file = LeftFile, valid_total_size = LeftValidData, right = RightFile, + file_size = LeftFileSize }, RightObj = #file_summary { file = RightFile, valid_total_size = RightValidData, left = LeftFile, - right = RightRight }, + right = RightRight, file_size = RightFileSize }, State = #msstate { file_size_limit = FileSizeLimit, - file_summary = FileSummary }) -> + file_summary = FileSummary, + sum_file_size = SumFileSize }) -> TotalValidData = LeftValidData + RightValidData, if FileSizeLimit >= TotalValidData -> State1 = combine_files(RightObj, LeftObj, State), @@ -928,9 +956,12 @@ adjust_meta_and_combine( true = ets:insert(FileSummary, LeftObj #file_summary { valid_total_size = TotalValidData, contiguous_top = TotalValidData, + file_size = TotalValidData, right = RightRight }), true = ets:delete(FileSummary, RightFile), - {true, State1}; + {true, State1 #msstate { sum_file_size = + SumFileSize - LeftFileSize - RightFileSize + + TotalValidData }}; true -> {false, State} end. @@ -980,7 +1011,7 @@ combine_files(#file_summary { file = Source, DestinationHdl, TmpHdl, Destination, State1), TmpSize = DestinationValid - DestinationContiguousTop, %% so now Tmp contains everything we need to salvage from - %% Destination, and MsgLocationDets has been updated to + %% Destination, and MsgLocation has been updated to %% reflect compaction of Destination so truncate %% Destination and copy from Tmp back to the end {ok, 0} = file_handle_cache:position(TmpHdl, 0), @@ -1011,7 +1042,7 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, {CurOffset, BlockStart, BlockEnd}) -> %% CurOffset is in the DestinationFile. %% Offset, BlockStart and BlockEnd are in the SourceFile - %% update MsgLocationDets to reflect change of file and offset + %% update MsgLocation to reflect change of file and offset ok = index_update(StoreEntry #msg_location { file = Destination, offset = CurOffset }, State), @@ -1041,9 +1072,9 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, ok = file_handle_cache:sync(DestinationHdl), ok. -delete_file_if_empty(File, - #msstate { dir = Dir, file_summary = FileSummary }) -> - [#file_summary { valid_total_size = ValidData, +delete_file_if_empty(File, #msstate { dir = Dir, file_summary = FileSummary, + sum_file_size = SumFileSize } = State) -> + [#file_summary { valid_total_size = ValidData, file_size = FileSize, left = Left, right = Right }] = ets:lookup(FileSummary, File), case ValidData of @@ -1063,7 +1094,8 @@ delete_file_if_empty(File, {#file_summary.right, Right}) end, true = ets:delete(FileSummary, File), + State1 = close_handle(File, State), ok = file:delete(form_filename(Dir, filenum_to_name(File))), - true; - _ -> false + {true, State1 #msstate { sum_file_size = SumFileSize - FileSize }}; + _ -> {false, State} end. |