From 0cc5117167d0857cf6aaf3f53f58f372394b34c2 Mon Sep 17 00:00:00 2001
From: Matthew Sackman
Date: Thu, 17 Dec 2009 18:42:12 +0000
Subject: All sorts of stuff. See the bug. GC is off here (other than deleting
 empty files). File locking is in. Some other machinery is in wrt background
 GC. Lots of reworking of the GC code to get it into a more useful state
 (everything from adjust_meta downwards). Tests do actually pass, but with GC
 otherwise off, you'd be mad to run this.

---
 src/rabbit_misc.erl      |   6 +-
 src/rabbit_msg_store.erl | 459 +++++++++++++++++++++++++++--------------------
 2 files changed, 271 insertions(+), 194 deletions(-)

diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 97c96fc7..9f74f604 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -487,9 +487,9 @@ unfold(Fun, Acc, Init) ->
 
 ceil(N) ->
     T = trunc(N),
-    case N - T of
-        0 -> N;
-        _ -> 1 + T
+    case N == T of
+        true -> T;
+        false -> 1 + T
     end.
 
 %% Sorts a list of AMQP table fields as per the AMQP spec

diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 0702cf36..b8373fd1 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -83,20 +83,30 @@
          sync_timer_ref,        %% TRef for our interval timer
          message_cache,         %% ets message cache
          sum_valid_data,        %% sum of valid data in all files
-         sum_file_size          %% sum of file sizes
-        }).
+         sum_file_size,         %% sum of file sizes
+         pending_gc_completion, %% things to do once GC completes
+         gc_pid                 %% pid of the GC process
+        }).
 
 -record(msg_location,
         {msg_id, ref_count, file, offset, total_size}).
 
 -record(file_summary,
-        {file, valid_total_size, contiguous_top, left, right, file_size}).
+        {file, valid_total_size, contiguous_top, left, right, file_size,
+         locked}).
+
+-record(gcstate,
+        {dir
+        }).
 
 -define(MSG_LOC_NAME, rabbit_disk_queue_msg_location).
 -define(FILE_SUMMARY_ETS_NAME, rabbit_disk_queue_file_summary).
 -define(FILE_EXTENSION, ".rdq").
 -define(FILE_EXTENSION_TMP, ".rdt").
 -define(CACHE_ETS_NAME, rabbit_disk_queue_cache).
+%% We run GC whenever the amount of garbage is >= GARBAGE_FRACTION *
+%% Total Valid Data
+-define(GARBAGE_FRACTION, 1.0).
 
 -define(BINARY_MODE, [raw, binary]).
 -define(READ_MODE, [read]).
@@ -248,11 +258,12 @@ init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) ->
     ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
 
     MsgLocations = ets:new(?MSG_LOC_NAME,
-                           [set, private, {keypos, #msg_location.msg_id}]),
+                           [set, protected, {keypos, #msg_location.msg_id}]),
 
     InitFile = 0,
     FileSummary = ets:new(?FILE_SUMMARY_ETS_NAME,
-                          [set, private, {keypos, #file_summary.file}]),
+                          [ordered_set, protected,
+                           {keypos, #file_summary.file}]),
     MessageCache = ets:new(?CACHE_ETS_NAME, [set, private]),
 
     State = #msstate { dir = Dir,
@@ -266,7 +277,9 @@ init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) ->
                        sync_timer_ref = undefined,
                        message_cache = MessageCache,
                        sum_valid_data = 0,
-                       sum_file_size = 0
+                       sum_file_size = 0,
+                       pending_gc_completion = [],
+                       gc_pid = undefined
                       },
 
     ok = count_msg_refs(MsgRefDeltaGen, MsgRefDeltaGenInit, State),
@@ -289,52 +302,12 @@ init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) ->
 
     {ok, State1 #msstate { current_file_handle = FileHdl }}.
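
Note on the rabbit_misc:ceil/1 hunk above: the old clause matched N - T
against the integer pattern 0, but pattern matching in Erlang is exact
(=:=), so the float 0.0 produced for an integral input never matched and,
e.g., ceil(2.0) wrongly returned 1 + 2 = 3. The new clause compares with
==, which compares numerically across integer/float. A minimal shell
illustration (not part of the patch; results annotated):

    1> case 2.0 - trunc(2.0) of 0 -> integral; _ -> fractional end.
    fractional    %% 0.0 does not pattern-match the integer 0
    2> 2.0 == trunc(2.0).
    true          %% so the fixed clause returns trunc(2.0) =:= 2
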
-handle_call({read, MsgId}, _From, State =
-                #msstate { current_file = CurFile,
-                           current_file_handle = CurHdl }) ->
-    {Result, State1} =
-        case index_lookup(MsgId, State) of
-            not_found -> {not_found, State};
-            #msg_location { ref_count = RefCount,
-                            file = File,
-                            offset = Offset,
-                            total_size = TotalSize } ->
-                case fetch_and_increment_cache(MsgId, State) of
-                    not_found ->
-                        ok = case CurFile =:= File andalso {ok, Offset} >=
-                                 file_handle_cache:current_raw_offset(CurHdl) of
-                                 true -> file_handle_cache:flush(CurHdl);
-                                 false -> ok
-                             end,
-                        {Hdl, State2} = get_read_handle(File, State),
-                        {ok, Offset} = file_handle_cache:position(Hdl, Offset),
-                        {ok, {MsgId, Msg}} =
-                            case rabbit_msg_file:read(Hdl, TotalSize) of
-                                {ok, {MsgId, _}} = Obj -> Obj;
-                                Rest ->
-                                    throw({error, {misread, [{old_state, State},
-                                                             {file_num, File},
-                                                             {offset, Offset},
-                                                             {read, Rest},
-                                                             {proc_dict, get()}
-                                                            ]}})
-                            end,
-                        ok = case RefCount > 1 of
-                                 true ->
-                                     insert_into_cache(MsgId, Msg, State2);
-                                 false ->
-                                     %% it's not in the cache and we
-                                     %% only have one reference to the
-                                     %% message. So don't bother
-                                     %% putting it in the cache.
-                                     ok
-                             end,
-                        {{ok, Msg}, State2};
-                    {Msg, _RefCount} ->
-                        {{ok, Msg}, State}
-                end
-        end,
-    reply(Result, State1);
+handle_call({read, MsgId}, From, State) ->
+    case read_message(MsgId, State) of
+        {ok, Msg, State1} -> reply({ok, Msg}, State1);
+        {blocked, State1} -> noreply(add_to_pending_gc_completion(
+                                       {read, MsgId, From}, State1))
+    end;
 
 handle_call({contains, MsgId}, _From, State) ->
     reply(case index_lookup(MsgId, State) of
@@ -360,6 +333,7 @@ handle_cast({write, MsgId, Msg},
             [FSEntry = #file_summary { valid_total_size = ValidTotalSize,
                                        contiguous_top = ContiguousTop,
                                        right = undefined,
+                                       locked = false,
                                        file_size = FileSize }] =
                 ets:lookup(FileSummary, CurFile),
             ValidTotalSize1 = ValidTotalSize + TotalSize,
@@ -373,10 +347,11 @@ handle_cast({write, MsgId, Msg},
                                       contiguous_top = ContiguousTop1,
                                       file_size = FileSize + TotalSize }),
             NextOffset = CurOffset + TotalSize,
-            noreply(maybe_roll_to_new_file(
-                      NextOffset,
-                      State #msstate { sum_valid_data = SumValid + TotalSize,
-                                       sum_file_size = SumFileSize + TotalSize }));
+            noreply(maybe_compact(maybe_roll_to_new_file(
+                      NextOffset, State #msstate
+                      { sum_valid_data = SumValid + TotalSize,
+                        sum_file_size = SumFileSize + TotalSize }
+                      )));
         StoreEntry = #msg_location { ref_count = RefCount } ->
             %% We already know about it, just update counter
             ok = index_update(StoreEntry #msg_location {
@@ -384,20 +359,11 @@ handle_cast({write, MsgId, Msg},
             noreply(State)
     end;
 
-handle_cast({remove, MsgIds}, State = #msstate { current_file = CurFile }) ->
-    {Files, State1} =
-        lists:foldl(
-          fun (MsgId, {Files1, State2}) ->
-                  case remove_message(MsgId, State2) of
-                      {compact, File, State3} ->
-                          {if CurFile =:= File -> Files1;
-                              true -> sets:add_element(File, Files1)
-                           end, State3};
-                      {no_compact, State3} ->
-                          {Files1, State3}
-                  end
-          end, {sets:new(), State}, MsgIds),
-    noreply(compact(sets:to_list(Files), State1));
+handle_cast({remove, MsgIds}, State) ->
+    State1 = lists:foldl(
+               fun (MsgId, State2) -> remove_message(MsgId, State2) end,
+               State, MsgIds),
+    noreply(maybe_compact(State1));
 
 handle_cast({release, MsgIds}, State) ->
     lists:foreach(fun (MsgId) -> decrement_cache(MsgId, State) end, MsgIds),
@@ -421,11 +387,23 @@ handle_cast({sync, MsgIds, K},
 
 handle_cast(sync, State) ->
     noreply(sync(State)).
 
+%% handle_cast({gc_finished, GCPid, RemainingFile, DeletedFile, MsgLocations},
+%%             State = #msstate { file_summary = FileSummary,
+%%                                gc_pid = GCPid }) ->
+%%     true = ets:delete(FileSummary, DeletedFile),
+%%     true = ets:insert(FileSummary, RemainingFile),
+%%     State1 = lists:foldl(fun index_insert/2, State, MsgLocations),
+%%     noreply(maybe_compact(run_pending(State1))).
+
 handle_info(timeout, State) ->
     noreply(sync(State));
 
 handle_info({file_handle_cache, maximum_eldest_since_use, Age}, State) ->
     ok = file_handle_cache:set_maximum_since_use(Age),
+    noreply(State);
+
+handle_info({'EXIT', _Pid, normal}, State) ->
+    %% this is just the GC process going down
     noreply(State).
 
 terminate(_Reason, State = #msstate { msg_locations = MsgLocations,
@@ -510,6 +488,63 @@ sync(State = #msstate { current_file_handle = CurHdl,
             State1 #msstate { on_sync = [] }
     end.
 
+read_message(MsgId, State =
+                 #msstate { current_file = CurFile,
+                            current_file_handle = CurHdl,
+                            file_summary = FileSummary }) ->
+    case index_lookup(MsgId, State) of
+        not_found -> {ok, not_found, State};
+        #msg_location { ref_count = RefCount,
+                        file = File,
+                        offset = Offset,
+                        total_size = TotalSize } ->
+            case fetch_and_increment_cache(MsgId, State) of
+                not_found ->
+                    [#file_summary { locked = Locked }] =
+                        ets:lookup(FileSummary, File),
+                    case Locked of
+                        true ->
+                            {blocked, State};
+                        false ->
+                            ok = case CurFile =:= File andalso {ok, Offset} >=
+                                     file_handle_cache:current_raw_offset(
+                                       CurHdl) of
+                                     true -> file_handle_cache:flush(CurHdl);
+                                     false -> ok
+                                 end,
+                            {Hdl, State1} = get_read_handle(File, State),
+                            {ok, Offset} =
+                                file_handle_cache:position(Hdl, Offset),
+                            {ok, {MsgId, Msg}} =
+                                case rabbit_msg_file:read(Hdl, TotalSize) of
+                                    {ok, {MsgId, _}} = Obj -> Obj;
+                                    Rest ->
+                                        throw({error, {misread,
+                                                       [{old_state, State},
+                                                        {file_num, File},
+                                                        {offset, Offset},
+                                                        {read, Rest},
+                                                        {proc_dict, get()}
+                                                       ]}})
+                                end,
+                            ok = case RefCount > 1 of
+                                     true ->
+                                         insert_into_cache(MsgId, Msg, State1);
+                                     false ->
+                                         %% it's not in the cache and
+                                         %% we only have one reference
+                                         %% to the message. So don't
+                                         %% bother putting it in the
+                                         %% cache.
+                                         ok
+                                 end,
+                            {ok, Msg, State1}
+                    end;
+                {Msg, _RefCount} ->
+                    {ok, Msg, State}
            end
    end.
+
 remove_message(MsgId, State = #msstate { file_summary = FileSummary,
                                          sum_valid_data = SumValid }) ->
     StoreEntry = #msg_location { ref_count = RefCount, file = File,
@@ -517,25 +552,50 @@ remove_message(MsgId, State = #msstate { file_summary = FileSummary,
         index_lookup(MsgId, State),
     case RefCount of
         1 ->
-            ok = index_delete(MsgId, State),
             ok = remove_cache_entry(MsgId, State),
             [FSEntry = #file_summary { valid_total_size = ValidTotalSize,
-                                       contiguous_top = ContiguousTop }] =
+                                       contiguous_top = ContiguousTop,
+                                       locked = Locked }] =
                 ets:lookup(FileSummary, File),
-            ContiguousTop1 = lists:min([ContiguousTop, Offset]),
-            ValidTotalSize1 = ValidTotalSize - TotalSize,
-            true = ets:insert(FileSummary, FSEntry #file_summary {
-                                             valid_total_size = ValidTotalSize1,
-                                             contiguous_top = ContiguousTop1 }),
-            {compact, File, State #msstate {
-                              sum_valid_data = SumValid - TotalSize }};
+            case Locked of
+                true ->
+                    add_to_pending_gc_completion({remove, MsgId}, State);
+                false ->
+                    ok = index_delete(MsgId, State),
+                    ContiguousTop1 = lists:min([ContiguousTop, Offset]),
+                    ValidTotalSize1 = ValidTotalSize - TotalSize,
+                    true = ets:insert(
+                             FileSummary, FSEntry #file_summary {
+                                            valid_total_size = ValidTotalSize1,
+                                            contiguous_top = ContiguousTop1 }),
+                    State1 = delete_file_if_empty(File, State),
+                    State1 #msstate { sum_valid_data = SumValid - TotalSize }
+            end;
         _ when 1 < RefCount ->
             ok = decrement_cache(MsgId, State),
-            ok = index_update(StoreEntry #msg_location {
-                                ref_count = RefCount - 1 }, State),
-            {no_compact, State}
+            ok = index_update(StoreEntry #msg_location
+                              { ref_count = RefCount - 1 }, State),
+            State
     end.
 
+add_to_pending_gc_completion(
+  Op, State = #msstate { pending_gc_completion = Pending }) ->
+    %% cons onto the list (not a two-element list), newest op first
+    State #msstate { pending_gc_completion = [Op | Pending] }.
+
+run_pending(State = #msstate { pending_gc_completion = Pending }) ->
+    State1 = State #msstate { pending_gc_completion = [] },
+    %% ops were consed on, so reverse to replay in submission order
+    lists:foldl(fun run_pending/2, State1, lists:reverse(Pending)).
+
+run_pending({read, MsgId, From}, State) ->
+    case read_message(MsgId, State) of
+        {ok, Msg, State1} -> gen_server2:reply(From, {ok, Msg}),
+                             State1;
+        {blocked, State1} -> add_to_pending_gc_completion(
+                               {read, MsgId, From}, State1)
+    end;
+run_pending({remove, MsgId}, State) ->
+    remove_message(MsgId, State).
+
 close_handle(Key, State = #msstate { file_handle_cache = FHC }) ->
     case dict:find(Key, FHC) of
         {ok, Hdl} ->
@@ -791,20 +851,19 @@ find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, MsgIds) ->
     {ExpectedOffset, MsgIds}.
 
 build_index([], State) ->
-    build_index(undefined, [State #msstate.current_file], [], State);
+    build_index(undefined, [State #msstate.current_file], State);
 build_index(Files, State) ->
-    build_index(undefined, Files, [], State).
+    {Offset, State1} = build_index(undefined, Files, State),
+    {Offset, lists:foldl(fun delete_file_if_empty/2, State1, Files)}.
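
Note on add_to_pending_gc_completion/2 and run_pending/1 above: while GC
holds a file locked, operations against that file are consed onto
pending_gc_completion and replayed in submission order once GC hands the
file back. Blocked reads use the standard OTP deferred-reply idiom:
handle_call returned noreply earlier, and the answer goes out later via
gen_server2:reply/2. A sketch of the accumulation (illustration only;
MsgIdA, MsgIdB and From stand for arbitrary values):

    S0 = #msstate { pending_gc_completion = [] },
    S1 = add_to_pending_gc_completion({remove, MsgIdA}, S0),
    S2 = add_to_pending_gc_completion({read, MsgIdB, From}, S1),
    %% S2's pending_gc_completion is [{read, ...}, {remove, ...}]; the
    %% lists:reverse/1 in run_pending/1 replays {remove, MsgIdA} first.
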
 
-build_index(Left, [], FilesToCompact, State =
-                #msstate { file_summary = FileSummary }) ->
+build_index(Left, [], State = #msstate { file_summary = FileSummary }) ->
     ok = index_delete_by_file(undefined, State),
     Offset = case ets:lookup(FileSummary, Left) of
                  [] -> 0;
                  [#file_summary { file_size = FileSize }] -> FileSize
              end,
-    {Offset, compact(FilesToCompact, %% this never includes the current file
-                     State #msstate { current_file = Left })};
-build_index(Left, [File|Files], FilesToCompact,
+    {Offset, State #msstate { current_file = Left }};
+build_index(Left, [File|Files],
             State = #msstate { dir = Dir, file_summary = FileSummary,
                                sum_valid_data = SumValid,
                                sum_file_size = SumFileSize }) ->
@@ -842,14 +901,9 @@ build_index(Left, [File|Files],
     true = ets:insert_new(FileSummary, #file_summary {
                             file = File, valid_total_size = ValidTotalSize,
-                            contiguous_top = ContiguousTop,
+                            contiguous_top = ContiguousTop, locked = false,
                             left = Left, right = Right,
                             file_size = FileSize1 }),
-    FilesToCompact1 =
-        case FileSize1 == ContiguousTop orelse Right =:= undefined of
-            true -> FilesToCompact;
-            false -> [File | FilesToCompact]
-        end,
-    build_index(File, Files, FilesToCompact1,
+    build_index(File, Files,
                 State #msstate { sum_valid_data = SumValid + ValidTotalSize,
                                  sum_file_size = SumFileSize + FileSize1 }).
@@ -873,13 +927,27 @@ maybe_roll_to_new_file(Offset,
     true = ets:insert_new(
              FileSummary, #file_summary { file = NextFile, valid_total_size = 0,
                                           contiguous_top = 0,
-                                          left = CurFile, right = undefined,
-                                          file_size = 0 }),
-    State2 = State1 #msstate { current_file_handle = NextHdl,
-                               current_file = NextFile },
-    compact([CurFile], State2);
+                                          left = CurFile, right = undefined,
+                                          file_size = 0, locked = false }),
+    State1 #msstate { current_file_handle = NextHdl,
+                      current_file = NextFile };
 maybe_roll_to_new_file(_, State) ->
     State.
 
+maybe_compact(State = #msstate { sum_valid_data = SumValid,
+                                 sum_file_size = SumFileSize,
+                                 gc_pid = undefined,
+                                 file_summary = FileSummary })
+  when (SumFileSize - SumValid) > ?GARBAGE_FRACTION * SumValid ->
+    %% Pid = spawn_link(fun() ->
+    %%                          io:format("GC process!~n")
+    %%                          %% gen_server2:pcast(?SERVER, 9, {gc_finished, self(),}),
+    %%                  end),
+    %% State #msstate { gc_pid = Pid };
+    State;
+maybe_compact(State) ->
+    State.
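
Note on the maybe_compact/1 guard above: with GARBAGE_FRACTION = 1.0 it
fires once garbage exceeds valid data. Writing garbage for
SumFileSize - SumValid, the condition works out as

    SumFileSize - SumValid > 1.0 * SumValid
    <=> garbage > SumValid
    <=> garbage / SumFileSize > 0.5   (since SumFileSize = SumValid + garbage)

which is the same threshold that the "correct condition for doing GC"
commit below restates directly as a ratio, with GARBAGE_FRACTION
redefined to 0.5.
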
+
 compact(Files, State) ->
     %% smallest number, hence eldest, hence left-most, first
     SortedFiles = lists:sort(Files),
@@ -932,30 +1000,25 @@ combine_file(File, State = #msstate { file_summary = FileSummary,
 
 adjust_meta_and_combine(
   LeftObj = #file_summary {
     file = LeftFile, valid_total_size = LeftValidData, right = RightFile,
-    file_size = LeftFileSize },
+    file_size = LeftFileSize, locked = true },
   RightObj = #file_summary {
     file = RightFile, valid_total_size = RightValidData, left = LeftFile,
-    right = RightRight, file_size = RightFileSize },
-  State = #msstate { file_size_limit = FileSizeLimit,
-                     file_summary = FileSummary,
-                     sum_file_size = SumFileSize }) ->
+    right = RightRight, file_size = RightFileSize, locked = true },
+  State) ->
     TotalValidData = LeftValidData + RightValidData,
-    if FileSizeLimit >= TotalValidData ->
-            State1 = combine_files(RightObj, LeftObj, State),
-            %% this could fail if RightRight is undefined
-            ets:update_element(FileSummary, RightRight,
-                               {#file_summary.left, LeftFile}),
-            true = ets:insert(FileSummary, LeftObj #file_summary {
-                                             valid_total_size = TotalValidData,
-                                             contiguous_top = TotalValidData,
-                                             file_size = TotalValidData,
-                                             right = RightRight }),
-            true = ets:delete(FileSummary, RightFile),
-            {true, State1 #msstate { sum_file_size =
-                                       SumFileSize - LeftFileSize - RightFileSize
-                                       + TotalValidData }};
-       true -> {false, State}
-    end.
+    {NewMsgLocs, State1} = combine_files(RightObj, LeftObj, State),
+    %% %% this could fail if RightRight is undefined
+    %% ets:update_element(FileSummary, RightRight,
+    %%                    {#file_summary.left, LeftFile}),
+    %% true = ets:delete(FileSummary, RightFile),
+    LeftObj1 = LeftObj #file_summary {
+                 valid_total_size = TotalValidData,
+                 contiguous_top = TotalValidData,
+                 file_size = TotalValidData,
+                 right = RightRight },
+    {RightFile, LeftObj1, NewMsgLocs,
+     TotalValidData - LeftFileSize - RightFileSize,
+     State1}.
 
 combine_files(#file_summary { file = Source,
                               valid_total_size = SourceValid,
@@ -964,7 +1027,7 @@ combine_files(#file_summary { file = Source,
                               valid_total_size = DestinationValid,
                               contiguous_top = DestinationContiguousTop,
                               right = Source },
-              State = #msstate { dir = Dir }) ->
+              State = #gcstate { dir = Dir }) ->
     State1 = close_handle(Source, close_handle(Destination, State)),
     SourceName = filenum_to_name(Source),
     DestinationName = filenum_to_name(Destination),
@@ -978,54 +1041,62 @@ combine_files(#file_summary { file = Source,
     %% the DestinationContiguousTop to a tmp file then truncate,
     %% copy back in, and then copy over from Source
     %% otherwise we just truncate straight away and copy over from Source
-    if DestinationContiguousTop =:= DestinationValid ->
-            ok = truncate_and_extend_file(DestinationHdl,
-                                          DestinationValid, ExpectedSize);
-       true ->
-            Worklist =
-                lists:dropwhile(
-                  fun (#msg_location { offset = Offset })
-                      when Offset /= DestinationContiguousTop ->
-                          %% it cannot be that Offset ==
-                          %% DestinationContiguousTop because if it
-                          %% was then DestinationContiguousTop would
-                          %% have been extended by TotalSize
-                          Offset < DestinationContiguousTop
-                          %% Given expected access patterns, I suspect
-                          %% that the list should be naturally sorted
-                          %% as we require, however, we need to
-                          %% enforce it anyway
-                  end, find_unremoved_messages_in_file(Destination, State1)),
-            Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP,
-            {ok, TmpHdl} = open_file(Dir, Tmp, ?READ_AHEAD_MODE ++ ?WRITE_MODE),
-            ok = copy_messages(
-                   Worklist, DestinationContiguousTop, DestinationValid,
-                   DestinationHdl, TmpHdl, Destination, State1),
-            TmpSize = DestinationValid - DestinationContiguousTop,
-            %% so now Tmp contains everything we need to salvage from
-            %% Destination, and MsgLocation has been updated to
-            %% reflect compaction of Destination so truncate
-            %% Destination and copy from Tmp back to the end
-            {ok, 0} = file_handle_cache:position(TmpHdl, 0),
-            ok = truncate_and_extend_file(
-                   DestinationHdl, DestinationContiguousTop, ExpectedSize),
-            {ok, TmpSize} =
-                file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize),
-            %% position in DestinationHdl should now be DestinationValid
-            ok = file_handle_cache:sync(DestinationHdl),
-            ok = file_handle_cache:close(TmpHdl),
-            ok = file:delete(form_filename(Dir, Tmp))
-    end,
+    NewDestLocs =
+        if DestinationContiguousTop =:= DestinationValid ->
+                ok = truncate_and_extend_file(DestinationHdl,
+                                              DestinationValid, ExpectedSize),
+                [];
+           true ->
+                Worklist =
+                    lists:dropwhile(
+                      fun (#msg_location { offset = Offset })
+                          when Offset /= DestinationContiguousTop ->
+                              %% it cannot be that Offset ==
+                              %% DestinationContiguousTop because if
+                              %% it was then DestinationContiguousTop
+                              %% would have been extended by TotalSize
+                              Offset < DestinationContiguousTop
+                              %% Given expected access patterns, I
+                              %% suspect that the list should be
+                              %% naturally sorted as we require;
+                              %% however, we need to enforce it anyway
+                      end,
+                      find_unremoved_messages_in_file(Destination, State1)),
+                Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP,
+                {ok, TmpHdl} =
+                    open_file(Dir, Tmp, ?READ_AHEAD_MODE ++ ?WRITE_MODE),
+                {ok, NewDestLocs1} =
+                    copy_messages(
+                      Worklist, DestinationContiguousTop, DestinationValid,
+                      DestinationHdl, TmpHdl, Destination),
+                TmpSize = DestinationValid - DestinationContiguousTop,
+                %% so now Tmp contains everything we need to salvage
+                %% from Destination, and NewDestLocs1 contains
+                %% msg_locations reflecting the compaction of
+                %% Destination, so truncate Destination and copy from
+                %% Tmp back to the end
+                {ok, 0} = file_handle_cache:position(TmpHdl, 0),
+                ok = truncate_and_extend_file(
+                       DestinationHdl, DestinationContiguousTop, ExpectedSize),
+                {ok, TmpSize} =
+                    file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize),
+                %% position in DestinationHdl should now be DestinationValid
+                ok = file_handle_cache:sync(DestinationHdl),
+                ok = file_handle_cache:close(TmpHdl),
+                ok = file:delete(form_filename(Dir, Tmp)),
+                NewDestLocs1
+        end,
     SourceWorkList = find_unremoved_messages_in_file(Source, State1),
-    ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize,
-                       SourceHdl, DestinationHdl, Destination, State1),
+    {ok, NewSourceLocs} =
+        copy_messages(SourceWorkList, DestinationValid, ExpectedSize,
+                      SourceHdl, DestinationHdl, Destination),
     %% tidy up
     ok = file_handle_cache:close(SourceHdl),
     ok = file_handle_cache:close(DestinationHdl),
     ok = file:delete(form_filename(Dir, SourceName)),
-    State1.
+    {[NewDestLocs, NewSourceLocs], State1}.
 
-find_unremoved_messages_in_file(File, State = #msstate { dir = Dir }) ->
+find_unremoved_messages_in_file(File, State = #gcstate { dir = Dir }) ->
     %% Msgs here will be end-of-file at start-of-list
     {ok, Messages, _FileSize} =
         scan_file_for_valid_messages(Dir, filenum_to_name(File)),
@@ -1039,37 +1110,41 @@ find_unremoved_messages_in_file(File, State = #msstate { dir = Dir }) ->
             end, [], Messages).
 
 copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
-              Destination, State) ->
-    {FinalOffset, BlockStart1, BlockEnd1} =
+              Destination) ->
+    {FinalOffset, BlockStart1, BlockEnd1, NewMsgLocations} =
        lists:foldl(
          fun (StoreEntry = #msg_location { offset = Offset,
                                            total_size = TotalSize },
-               {CurOffset, BlockStart, BlockEnd}) ->
+               {CurOffset, BlockStart, BlockEnd, NewMsgLocs}) ->
                  %% CurOffset is in the DestinationFile.
                  %% Offset, BlockStart and BlockEnd are in the SourceFile
                  %% update MsgLocation to reflect change of file and offset
-                  ok = index_update(StoreEntry #msg_location {
-                                      file = Destination,
-                                      offset = CurOffset }, State),
+                  NewMsgLocs1 =
+                      [StoreEntry #msg_location {
+                         file = Destination,
+                         offset = CurOffset } | NewMsgLocs],
                   NextOffset = CurOffset + TotalSize,
-                  if BlockStart =:= undefined ->
-                          %% base case, called only for the first list elem
-                          {NextOffset, Offset, Offset + TotalSize};
-                     Offset =:= BlockEnd ->
-                          %% extend the current block because the next
-                          %% msg follows straight on
-                          {NextOffset, BlockStart, BlockEnd + TotalSize};
-                     true ->
-                          %% found a gap, so actually do the work for
-                          %% the previous block
-                          BSize = BlockEnd - BlockStart,
-                          {ok, BlockStart} =
-                              file_handle_cache:position(SourceHdl, BlockStart),
-                          {ok, BSize} = file_handle_cache:copy(
-                                          SourceHdl, DestinationHdl, BSize),
-                          {NextOffset, Offset, Offset + TotalSize}
-                  end
-          end, {InitOffset, undefined, undefined}, WorkList),
+                  {BlockStart2, BlockEnd2} =
+                      if BlockStart =:= undefined ->
+                              %% base case, called only for the first list elem
+                              {Offset, Offset + TotalSize};
+                         Offset =:= BlockEnd ->
+                              %% extend the current block because the
+                              %% next msg follows straight on
+                              {BlockStart, BlockEnd + TotalSize};
+                         true ->
+                              %% found a gap, so actually do the work
+                              %% for the previous block
+                              BSize = BlockEnd - BlockStart,
+                              {ok, BlockStart} =
+                                  file_handle_cache:position(SourceHdl,
+                                                             BlockStart),
+                              {ok, BSize} = file_handle_cache:copy(
+                                              SourceHdl, DestinationHdl, BSize),
+                              {Offset, Offset + TotalSize}
+                      end,
+                  {NextOffset, BlockStart2, BlockEnd2, NewMsgLocs1}
+          end, {InitOffset, undefined, undefined, []}, WorkList),
     case WorkList of
         [] ->
             ok;
@@ -1082,8 +1157,10 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
                 file_handle_cache:copy(SourceHdl, DestinationHdl, BSize1),
             ok = file_handle_cache:sync(DestinationHdl)
     end,
-    ok.
+    {ok, NewMsgLocations}.
 
+delete_file_if_empty(File, State = #msstate { current_file = File }) ->
+    State;
 delete_file_if_empty(File, #msstate { dir = Dir, file_summary = FileSummary,
                                       sum_file_size = SumFileSize } = State) ->
     [#file_summary { valid_total_size = ValidData, file_size = FileSize,
@@ -1108,6 +1185,6 @@ delete_file_if_empty(File, #msstate { dir = Dir, file_summary = FileSummary,
             true = ets:delete(FileSummary, File),
             State1 = close_handle(File, State),
             ok = file:delete(form_filename(Dir, filenum_to_name(File))),
-            {true, State1 #msstate { sum_file_size = SumFileSize - FileSize }};
-        _ -> {false, State}
+            State1 #msstate { sum_file_size = SumFileSize - FileSize };
+        _ -> State
     end.
-- 
cgit v1.2.1

From ae3a821babc64cb1c464fec9c9acebb3936ca2ae Mon Sep 17 00:00:00 2001
From: Matthew Sackman
Date: Thu, 17 Dec 2009 18:50:02 +0000
Subject: geometric distribution

---
 src/random_distributions.erl | 38 ++++++++++++++++++++++++++++++++++++++
 1 file changed, 38 insertions(+)
 create mode 100644 src/random_distributions.erl

diff --git a/src/random_distributions.erl b/src/random_distributions.erl
new file mode 100644
index 00000000..dfcdc834
--- /dev/null
+++ b/src/random_distributions.erl
@@ -0,0 +1,38 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(random_distributions).
+
+-export([geometric/1]).
+
+geometric(P) when 0.0 < P andalso P < 1.0 ->
+    U = 1.0 - random:uniform(),
+    rabbit_misc:ceil(math:log(U) / math:log(1.0 - P)).
-- 
cgit v1.2.1

From 509de3766e907871aefd699f4439025c9a7e7707 Mon Sep 17 00:00:00 2001
From: Matthew Sackman
Date: Fri, 18 Dec 2009 11:49:21 +0000
Subject: correct condition for doing GC

---
 src/rabbit_msg_store.erl | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index b8373fd1..0b711b13 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -104,9 +104,8 @@
 -define(FILE_EXTENSION, ".rdq").
 -define(FILE_EXTENSION_TMP, ".rdt").
 -define(CACHE_ETS_NAME, rabbit_disk_queue_cache).
-%% We run GC whenever the amount of garbage is >= GARBAGE_FRACTION *
-%% Total Valid Data
--define(GARBAGE_FRACTION, 1.0).
+%% We run GC whenever (garbage / sum_file_size) > ?GARBAGE_FRACTION
+-define(GARBAGE_FRACTION, 0.5).
 
 -define(BINARY_MODE, [raw, binary]).
 -define(READ_MODE, [read]).
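
Note on random_distributions:geometric/1 in the "geometric distribution"
commit above: it is inverse-transform sampling. For X ~ Geometric(P) on
{1, 2, ...}, P(X =< k) = 1 - (1 - P)^k, and the smallest integer k with
1 - (1 - P)^k >= U is ceil(log(1 - U) / log(1 - P)); since 1 - U is
uniform whenever U is, the code draws U = 1.0 - random:uniform() and uses
it directly. A sanity check one might run in the shell (illustration
only, not part of the patch; the mean of Geometric(P) is 1/P):

    1> MeanCheck = fun (P, N) ->
                       Xs = [random_distributions:geometric(P)
                             || _ <- lists:seq(1, N)],
                       lists:sum(Xs) / N
                   end.
    2> MeanCheck(0.25, 100000).   %% expect a value near 4.0
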
@@ -938,7 +937,7 @@ maybe_compact(State = #msstate { sum_valid_data = SumValid,
                                  sum_file_size = SumFileSize,
                                  gc_pid = undefined,
                                  file_summary = FileSummary })
-  when (SumFileSize - SumValid) > ?GARBAGE_FRACTION * SumValid ->
+  when (SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION ->
     %% Pid = spawn_link(fun() ->
     %%                          io:format("GC process!~n")
     %%                          %% gen_server2:pcast(?SERVER, 9, {gc_finished, self(),}),
-- 
cgit v1.2.1

From 7fd06f90d82f96a74cb3ea90f83baf55e8f74188 Mon Sep 17 00:00:00 2001
From: Matthew Sackman
Date: Fri, 18 Dec 2009 17:21:43 +0000
Subject: support insomniation

---
 src/gen_server2.erl | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 53edf8de..c7250827 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -126,6 +126,7 @@
 %%% handle_pre_hibernate(State)
 %%%
 %%%   ==> {hibernate, State}
+%%%       {insomniate, State}
 %%%       {stop, Reason, State}
 %%%              Reason = normal | shutdown | Term, terminate(State) is called
 %%%
@@ -545,6 +546,9 @@ pre_hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
             {hibernate, NState} ->
                 hibernate(Parent, Name, NState, Mod, TimeoutState, Queue,
                           Debug);
+            {insomniate, NState} ->
+                process_next_msg(Parent, Name, NState, Mod, hibernate,
+                                 TimeoutState, Queue, Debug);
             Reply ->
                 handle_common_termination(Reply, Name, pre_hibernate, Mod,
                                           State, Debug)
-- 
cgit v1.2.1

From b06aa9199fa6b424fe6a000bef7f785be7acbf06 Mon Sep 17 00:00:00 2001
From: Matthew Sackman
Date: Fri, 18 Dec 2009 17:32:16 +0000
Subject: document insomniation and also ready the msg_store for such gadgetry

---
 src/gen_server2.erl      |  9 ++++++---
 src/rabbit_msg_store.erl | 37 ++++++++++++++++++++++++++-----------
 2 files changed, 32 insertions(+), 14 deletions(-)

diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index c7250827..c4806151 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -25,8 +25,11 @@
 %% handle_pre_hibernate/1 and handle_post_hibernate/1. These will be
 %% called immediately prior to and post hibernation, respectively. If
 %% handle_pre_hibernate returns {hibernate, NewState} then the process
-%% will hibernate. If the module does not implement
-%% handle_pre_hibernate/1 then the default action is to hibernate.
+%% will hibernate. If handle_pre_hibernate returns {insomniate,
+%% NewState} then the process will go around again, trying to receive
+%% for up to the current timeout value before attempting to hibernate
+%% again. If the module does not implement handle_pre_hibernate/1 then
+%% the default action is to hibernate.
 %%
 %% 6) init can return a 4th arg, {backoff, InitialTimeout,
 %%    MinimumTimeout, DesiredHibernatePeriod} (all in
@@ -36,7 +39,7 @@
 %%    InitialTimeout supplied from init). After this timeout has
 %%    occurred, hibernation will occur as normal. Upon awaking, a new
 %%    current timeout value will be calculated.
-%% 
+%%
 %% The purpose is that the gen_server2 takes care of adjusting the
 %% current timeout value such that the process will increase the
 %% timeout value repeatedly if it is unable to sleep for the

diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 0b711b13..6306ac32 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -39,7 +39,7 @@
 -export([sync/0]). %% internal
 
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
-         terminate/2, code_change/3]).
+         terminate/2, code_change/3, handle_pre_hibernate/1]).
 
 -define(SERVER, ?MODULE).
 
@@ -112,6 +112,9 @@
 -define(READ_AHEAD_MODE, [read_ahead | ?READ_MODE]).
 -define(WRITE_MODE, [write]).
 
+-define(HIBERNATE_AFTER_MIN, 1000).
+-define(DESIRED_HIBERNATE, 10000).
+
 %% The components:
 %%
 %% MsgLocation: this is an ets table which contains:
@@ -299,7 +302,8 @@ init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) ->
     {ok, Offset} = file_handle_cache:position(FileHdl, Offset),
     ok = file_handle_cache:truncate(FileHdl),
 
-    {ok, State1 #msstate { current_file_handle = FileHdl }}.
+    {ok, State1 #msstate { current_file_handle = FileHdl }, hibernate,
+     {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
 
 handle_call({read, MsgId}, From, State) ->
     case read_message(MsgId, State) of
@@ -424,6 +428,13 @@ terminate(_Reason, State = #msstate { msg_locations = MsgLocations,
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
+handle_pre_hibernate(State) ->
+    {Result, State1} = maybe_compact1(State),
+    {case Result of
+         true -> insomniate;
+         false -> hibernate
+     end, State1}.
+
 %%----------------------------------------------------------------------------
 %% general helper functions
 %%----------------------------------------------------------------------------
@@ -437,11 +448,11 @@ reply(Reply, State) ->
     {reply, Reply, State1, Timeout}.
 
 next_state(State = #msstate { on_sync = [], sync_timer_ref = undefined }) ->
-    {State, infinity};
+    {State, hibernate};
 next_state(State = #msstate { sync_timer_ref = undefined }) ->
     {start_sync_timer(State), 0};
 next_state(State = #msstate { on_sync = [] }) ->
-    {stop_sync_timer(State), infinity};
+    {stop_sync_timer(State), hibernate};
 next_state(State) ->
     {State, 0}.
 
@@ -933,19 +944,23 @@ maybe_roll_to_new_file(Offset,
 maybe_roll_to_new_file(_, State) ->
     State.
 
-maybe_compact(State = #msstate { sum_valid_data = SumValid,
-                                 sum_file_size = SumFileSize,
-                                 gc_pid = undefined,
-                                 file_summary = FileSummary })
+maybe_compact(State) ->
+    {_Bool, State1} = maybe_compact1(State),
+    State1.
+
+maybe_compact1(State = #msstate { sum_valid_data = SumValid,
+                                  sum_file_size = SumFileSize,
+                                  gc_pid = undefined,
+                                  file_summary = FileSummary })
   when (SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION ->
     %% Pid = spawn_link(fun() ->
     %%                          io:format("GC process!~n")
     %%                          %% gen_server2:pcast(?SERVER, 9, {gc_finished, self(),}),
     %%                  end),
     %% State #msstate { gc_pid = Pid };
-    State;
-maybe_compact(State) ->
-    State.
+    {true, State};
+maybe_compact1(State) ->
+    {false, State}.
-- 
cgit v1.2.1
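
Note on the final commit: handle_pre_hibernate/1 in rabbit_msg_store is
the first user of the insomniation contract documented in gen_server2
above. When maybe_compact1/1 reports that compaction is warranted, the
server returns insomniate and goes round the receive loop again instead
of hibernating. The general shape of such a callback in any gen_server2
module (hypothetical example with an invented background_work field, not
part of the patch):

    handle_pre_hibernate(State = #state { background_work = none }) ->
        {hibernate, State};    %% nothing outstanding, so sleep deeply
    handle_pre_hibernate(State) ->
        {insomniate, State}.   %% work pending: stay awake, try again later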