diff options
author | Matthew Sackman <matthew@lshift.net> | 2009-12-19 16:37:52 +0000 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2009-12-19 16:37:52 +0000 |
commit | 6d3e84b4d013b0dc306927324de34b0d41133c84 (patch) | |
tree | 2b9d4e49a9a2272271c1714ac537a1fdcc3acb46 | |
parent | a7c6239ec87cf151c6b24e33f04b665cdbb47c96 (diff) | |
parent | fb7dae6fa8a942cb9e6ec8967e5baf4d25307872 (diff) | |
download | rabbitmq-server-6d3e84b4d013b0dc306927324de34b0d41133c84.tar.gz |
merging default/21673 into bug 22161
-rw-r--r-- | src/gen_server2.erl | 13 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 6 | ||||
-rw-r--r-- | src/rabbit_msg_store.erl | 479 | ||||
-rw-r--r-- | src/random_distributions.erl | 38 |
4 files changed, 334 insertions, 202 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl index 53edf8de..c4806151 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -25,8 +25,11 @@ %% handle_pre_hibernate/1 and handle_post_hibernate/1. These will be %% called immediately prior to and post hibernation, respectively. If %% handle_pre_hibernate returns {hibernate, NewState} then the process -%% will hibernate. If the module does not implement -%% handle_pre_hibernate/1 then the default action is to hibernate. +%% will hibernate. If handle_pre_hibernate returns {insomniate, +%% NewState} then the process will go around again, trying to receive +%% for up to the current timeout value before attempting to hibernate +%% again. If the module does not implement handle_pre_hibernate/1 then +%% the default action is to hibernate. %% %% 6) init can return a 4th arg, {backoff, InitialTimeout, %% MinimumTimeout, DesiredHibernatePeriod} (all in @@ -36,7 +39,7 @@ %% InitialTimeout supplied from init). After this timeout has %% occurred, hibernation will occur as normal. Upon awaking, a new %% current timeout value will be calculated. -%% +%% %% The purpose is that the gen_server2 takes care of adjusting the %% current timeout value such that the process will increase the %% timeout value repeatedly if it is unable to sleep for the @@ -126,6 +129,7 @@ %%% handle_pre_hibernate(State) %%% %%% ==> {hibernate, State} +%%% {insomniate, State} %%% {stop, Reason, State} %%% Reason = normal | shutdown | Term, terminate(State) is called %%% @@ -545,6 +549,9 @@ pre_hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> {hibernate, NState} -> hibernate(Parent, Name, NState, Mod, TimeoutState, Queue, Debug); + {insomniate, NState} -> + process_next_msg(Parent, Name, NState, Mod, hibernate, + TimeoutState, Queue, Debug); Reply -> handle_common_termination(Reply, Name, pre_hibernate, Mod, State, Debug) diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 21764fce..23666a5f 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -485,7 +485,7 @@ unfold(Fun, Acc, Init) -> ceil(N) -> T = trunc(N), - case N - T of - 0 -> N; - _ -> 1 + T + case N == T of + true -> T; + false -> 1 + T end. diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index f139fc45..c8d27ba6 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -39,7 +39,7 @@ -export([sync/0]). %% internal -export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). + terminate/2, code_change/3, handle_pre_hibernate/1]). -define(SERVER, ?MODULE). @@ -83,26 +83,38 @@ sync_timer_ref, %% TRef for our interval timer message_cache, %% ets message cache sum_valid_data, %% sum of valid data in all files - sum_file_size %% sum of file sizes - }). + sum_file_size, %% sum of file sizes + pending_gc_completion, %% things to do once GC completes + gc_pid %% pid of the GC process + }). -record(msg_location, {msg_id, ref_count, file, offset, total_size}). -record(file_summary, - {file, valid_total_size, contiguous_top, left, right, file_size}). + {file, valid_total_size, contiguous_top, left, right, file_size, + locked}). + +-record(gcstate, + {dir + }). -define(MSG_LOC_NAME, rabbit_disk_queue_msg_location). -define(FILE_SUMMARY_ETS_NAME, rabbit_disk_queue_file_summary). -define(FILE_EXTENSION, ".rdq"). -define(FILE_EXTENSION_TMP, ".rdt"). -define(CACHE_ETS_NAME, rabbit_disk_queue_cache). +%% We run GC whenever (garbage / sum_file_size) > ?GARBAGE_FRACTION +-define(GARBAGE_FRACTION, 0.5). -define(BINARY_MODE, [raw, binary]). -define(READ_MODE, [read]). -define(READ_AHEAD_MODE, [read_ahead | ?READ_MODE]). -define(WRITE_MODE, [write]). +-define(HIBERNATE_AFTER_MIN, 1000). +-define(DESIRED_HIBERNATE, 10000). + %% The components: %% %% MsgLocation: this is an ets table which contains: @@ -248,11 +260,12 @@ init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) -> ok = filelib:ensure_dir(filename:join(Dir, "nothing")), MsgLocations = ets:new(?MSG_LOC_NAME, - [set, private, {keypos, #msg_location.msg_id}]), + [set, protected, {keypos, #msg_location.msg_id}]), InitFile = 0, FileSummary = ets:new(?FILE_SUMMARY_ETS_NAME, - [set, private, {keypos, #file_summary.file}]), + [ordered_set, protected, + {keypos, #file_summary.file}]), MessageCache = ets:new(?CACHE_ETS_NAME, [set, private]), State = #msstate { dir = Dir, @@ -266,7 +279,9 @@ init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) -> sync_timer_ref = undefined, message_cache = MessageCache, sum_valid_data = 0, - sum_file_size = 0 + sum_file_size = 0, + pending_gc_completion = [], + gc_pid = undefined }, ok = count_msg_refs(MsgRefDeltaGen, MsgRefDeltaGenInit, State), @@ -287,54 +302,15 @@ init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) -> {ok, Offset} = file_handle_cache:position(FileHdl, Offset), ok = file_handle_cache:truncate(FileHdl), - {ok, State1 #msstate { current_file_handle = FileHdl }}. - -handle_call({read, MsgId}, _From, State = - #msstate { current_file = CurFile, - current_file_handle = CurHdl }) -> - {Result, State1} = - case index_lookup(MsgId, State) of - not_found -> {not_found, State}; - #msg_location { ref_count = RefCount, - file = File, - offset = Offset, - total_size = TotalSize } -> - case fetch_and_increment_cache(MsgId, State) of - not_found -> - ok = case CurFile =:= File andalso {ok, Offset} >= - file_handle_cache:current_raw_offset(CurHdl) of - true -> file_handle_cache:flush(CurHdl); - false -> ok - end, - {Hdl, State2} = get_read_handle(File, State), - {ok, Offset} = file_handle_cache:position(Hdl, Offset), - {ok, {MsgId, Msg}} = - case rabbit_msg_file:read(Hdl, TotalSize) of - {ok, {MsgId, _}} = Obj -> Obj; - Rest -> - throw({error, {misread, [{old_state, State}, - {file_num, File}, - {offset, Offset}, - {read, Rest}, - {proc_dict, get()} - ]}}) - end, - ok = case RefCount > 1 of - true -> - insert_into_cache(MsgId, Msg, State2); - false -> - %% it's not in the cache and we - %% only have one reference to the - %% message. So don't bother - %% putting it in the cache. - ok - end, - {{ok, Msg}, State2}; - {Msg, _RefCount} -> - {{ok, Msg}, State} - end - end, - reply(Result, State1); + {ok, State1 #msstate { current_file_handle = FileHdl }, hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + +handle_call({read, MsgId}, From, State) -> + case read_message(MsgId, State) of + {ok, Msg, State1} -> reply({ok, Msg}, State1); + {blocked, State1} -> noreply(add_to_pending_gc_completion( + {read, MsgId, From}, State1)) + end; handle_call({contains, MsgId}, _From, State) -> reply(case index_lookup(MsgId, State) of @@ -360,6 +336,7 @@ handle_cast({write, MsgId, Msg}, [FSEntry = #file_summary { valid_total_size = ValidTotalSize, contiguous_top = ContiguousTop, right = undefined, + locked = false, file_size = FileSize }] = ets:lookup(FileSummary, CurFile), ValidTotalSize1 = ValidTotalSize + TotalSize, @@ -373,10 +350,11 @@ handle_cast({write, MsgId, Msg}, contiguous_top = ContiguousTop1, file_size = FileSize + TotalSize }), NextOffset = CurOffset + TotalSize, - noreply(maybe_roll_to_new_file( - NextOffset, - State #msstate { sum_valid_data = SumValid + TotalSize, - sum_file_size = SumFileSize + TotalSize })); + noreply(maybe_compact(maybe_roll_to_new_file( + NextOffset, State #msstate + { sum_valid_data = SumValid + TotalSize, + sum_file_size = SumFileSize + TotalSize } + ))); StoreEntry = #msg_location { ref_count = RefCount } -> %% We already know about it, just update counter ok = index_update(StoreEntry #msg_location { @@ -384,20 +362,11 @@ handle_cast({write, MsgId, Msg}, noreply(State) end; -handle_cast({remove, MsgIds}, State = #msstate { current_file = CurFile }) -> - {Files, State1} = - lists:foldl( - fun (MsgId, {Files1, State2}) -> - case remove_message(MsgId, State2) of - {compact, File, State3} -> - {if CurFile =:= File -> Files1; - true -> sets:add_element(File, Files1) - end, State3}; - {no_compact, State3} -> - {Files1, State3} - end - end, {sets:new(), State}, MsgIds), - noreply(compact(sets:to_list(Files), State1)); +handle_cast({remove, MsgIds}, State) -> + State1 = lists:foldl( + fun (MsgId, State2) -> remove_message(MsgId, State2) end, + State, MsgIds), + noreply(maybe_compact(State1)); handle_cast({release, MsgIds}, State) -> lists:foreach(fun (MsgId) -> decrement_cache(MsgId, State) end, MsgIds), @@ -421,6 +390,14 @@ handle_cast({sync, MsgIds, K}, handle_cast(sync, State) -> noreply(sync(State)). +%% handle_cast({gc_finished, GCPid, RemainingFile, DeletedFile, MsgLocations}, +%% State = #msstate { file_summary = FileSummary, +%% gc_pid = GCPid }) -> +%% true = ets:delete(FileSummary, DeletedFile), +%% true = ets:insert(FileSummary, RemainingFile), +%% State1 = lists:foldl(fun index_insert/2, State, MsgLocations), +%% noreply(maybe_compact(run_pending(State1))). + handle_info(timeout, State) -> noreply(sync(State)); @@ -450,6 +427,13 @@ terminate(_Reason, State = #msstate { msg_locations = MsgLocations, code_change(_OldVsn, State, _Extra) -> {ok, State}. +handle_pre_hibernate(State) -> + {Result, State1} = maybe_compact1(State), + {case Result of + true -> insomniate; + false -> hibernate + end, State1}. + %%---------------------------------------------------------------------------- %% general helper functions %%---------------------------------------------------------------------------- @@ -463,11 +447,11 @@ reply(Reply, State) -> {reply, Reply, State1, Timeout}. next_state(State = #msstate { on_sync = [], sync_timer_ref = undefined }) -> - {State, infinity}; + {State, hibernate}; next_state(State = #msstate { sync_timer_ref = undefined }) -> {start_sync_timer(State), 0}; next_state(State = #msstate { on_sync = [] }) -> - {stop_sync_timer(State), infinity}; + {stop_sync_timer(State), hibernate}; next_state(State) -> {State, 0}. @@ -513,6 +497,63 @@ sync(State = #msstate { current_file_handle = CurHdl, State1 #msstate { on_sync = [] } end. +read_message(MsgId, State = + #msstate { current_file = CurFile, + current_file_handle = CurHdl, + file_summary = FileSummary }) -> + case index_lookup(MsgId, State) of + not_found -> {ok, not_found, State}; + #msg_location { ref_count = RefCount, + file = File, + offset = Offset, + total_size = TotalSize } -> + case fetch_and_increment_cache(MsgId, State) of + not_found -> + [#file_summary { locked = Locked }] = + ets:lookup(FileSummary, File), + case Locked of + true -> + {blocked, State}; + false -> + ok = case CurFile =:= File andalso {ok, Offset} >= + file_handle_cache:current_raw_offset( + CurHdl) of + true -> file_handle_cache:flush(CurHdl); + false -> ok + end, + {Hdl, State1} = get_read_handle(File, State), + {ok, Offset} = + file_handle_cache:position(Hdl, Offset), + {ok, {MsgId, Msg}} = + case rabbit_msg_file:read(Hdl, TotalSize) of + {ok, {MsgId, _}} = Obj -> Obj; + Rest -> + throw({error, {misread, + [{old_state, State}, + {file_num, File}, + {offset, Offset}, + {read, Rest}, + {proc_dict, get()} + ]}}) + end, + ok = case RefCount > 1 of + true -> + insert_into_cache(MsgId, Msg, State1); + false -> + %% it's not in the cache and + %% we only have one reference + %% to the message. So don't + %% bother putting it in the + %% cache. + ok + end, + {ok, Msg, State1} + end; + {Msg, _RefCount} -> + {ok, Msg, State} + end + end. + remove_message(MsgId, State = #msstate { file_summary = FileSummary, sum_valid_data = SumValid }) -> StoreEntry = #msg_location { ref_count = RefCount, file = File, @@ -520,25 +561,50 @@ remove_message(MsgId, State = #msstate { file_summary = FileSummary, index_lookup(MsgId, State), case RefCount of 1 -> - ok = index_delete(MsgId, State), ok = remove_cache_entry(MsgId, State), [FSEntry = #file_summary { valid_total_size = ValidTotalSize, - contiguous_top = ContiguousTop }] = + contiguous_top = ContiguousTop, + locked = Locked }] = ets:lookup(FileSummary, File), - ContiguousTop1 = lists:min([ContiguousTop, Offset]), - ValidTotalSize1 = ValidTotalSize - TotalSize, - true = ets:insert(FileSummary, FSEntry #file_summary { - valid_total_size = ValidTotalSize1, - contiguous_top = ContiguousTop1 }), - {compact, File, State #msstate { - sum_valid_data = SumValid - TotalSize }}; + case Locked of + true -> + add_to_pending_gc_completion({remove, MsgId}, State); + false -> + ok = index_delete(MsgId, State), + ContiguousTop1 = lists:min([ContiguousTop, Offset]), + ValidTotalSize1 = ValidTotalSize - TotalSize, + true = ets:insert( + FileSummary, FSEntry #file_summary { + valid_total_size = ValidTotalSize1, + contiguous_top = ContiguousTop1 }), + State1 = delete_file_if_empty(File, State), + State1 #msstate { sum_valid_data = SumValid - TotalSize } + end; _ when 1 < RefCount -> ok = decrement_cache(MsgId, State), - ok = index_update(StoreEntry #msg_location { - ref_count = RefCount - 1 }, State), - {no_compact, State} + ok = index_update(StoreEntry #msg_location + { ref_count = RefCount - 1 }, State), + State end. +add_to_pending_gc_completion( + Op, State = #msstate { pending_gc_completion = Pending }) -> + State #msstate { pending_gc_completion = [Op, Pending] }. + +run_pending(State = #msstate { pending_gc_completion = Pending }) -> + State1 = State #msstate { pending_gc_completion = [] }, + lists:foldl(fun run_pending/2, State1, Pending). + +run_pending({read, MsgId, From}, State) -> + case read_message(MsgId, State) of + {ok, Msg, State1} -> gen_server2:reply(From, {ok, Msg}), + State1; + {blocked, State1} -> add_to_pending_gc_completion( + {read, MsgId, From}, State1) + end; +run_pending({remove, MsgId}, State) -> + remove_message(MsgId, State). + close_handle(Key, State = #msstate { file_handle_cache = FHC }) -> case dict:find(Key, FHC) of {ok, Hdl} -> @@ -794,20 +860,19 @@ find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, MsgIds) -> {ExpectedOffset, MsgIds}. build_index([], State) -> - build_index(undefined, [State #msstate.current_file], [], State); + build_index(undefined, [State #msstate.current_file], State); build_index(Files, State) -> - build_index(undefined, Files, [], State). + {Offset, State1} = build_index(undefined, Files, State), + {Offset, lists:foldl(fun delete_file_if_empty/2, State1, Files)}. -build_index(Left, [], FilesToCompact, State = - #msstate { file_summary = FileSummary }) -> +build_index(Left, [], State = #msstate { file_summary = FileSummary }) -> ok = index_delete_by_file(undefined, State), Offset = case ets:lookup(FileSummary, Left) of [] -> 0; [#file_summary { file_size = FileSize }] -> FileSize end, - {Offset, compact(FilesToCompact, %% this never includes the current file - State #msstate { current_file = Left })}; -build_index(Left, [File|Files], FilesToCompact, + {Offset, State #msstate { current_file = Left }}; +build_index(Left, [File|Files], State = #msstate { dir = Dir, file_summary = FileSummary, sum_valid_data = SumValid, sum_file_size = SumFileSize }) -> @@ -845,14 +910,9 @@ build_index(Left, [File|Files], FilesToCompact, true = ets:insert_new(FileSummary, #file_summary { file = File, valid_total_size = ValidTotalSize, - contiguous_top = ContiguousTop, + contiguous_top = ContiguousTop, locked = false, left = Left, right = Right, file_size = FileSize1 }), - FilesToCompact1 = - case FileSize1 == ContiguousTop orelse Right =:= undefined of - true -> FilesToCompact; - false -> [File | FilesToCompact] - end, - build_index(File, Files, FilesToCompact1, + build_index(File, Files, State #msstate { sum_valid_data = SumValid + ValidTotalSize, sum_file_size = SumFileSize + FileSize1 }). @@ -876,13 +936,31 @@ maybe_roll_to_new_file(Offset, true = ets:insert_new( FileSummary, #file_summary { file = NextFile, valid_total_size = 0, contiguous_top = 0, - left = CurFile, right = undefined, file_size = 0 }), - State2 = State1 #msstate { current_file_handle = NextHdl, - current_file = NextFile }, - compact([CurFile], State2); + left = CurFile, right = undefined, file_size = 0, + locked = false }), + State1 #msstate { current_file_handle = NextHdl, + current_file = NextFile }; maybe_roll_to_new_file(_, State) -> State. +maybe_compact(State) -> + {_Bool, State1} = maybe_compact1(State), + State1. + +maybe_compact1(State = #msstate { sum_valid_data = SumValid, + sum_file_size = SumFileSize, + gc_pid = undefined, + file_summary = FileSummary }) + when (SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION -> + %% Pid = spawn_link(fun() -> + %% io:format("GC process!~n") + %% %% gen_server2:pcast(?SERVER, 9, {gc_finished, self(),}), + %% end), + %% State #msstate { gc_pid = Pid }; + {true, State}; +maybe_compact1(State) -> + {false, State}. + compact(Files, State) -> %% smallest number, hence eldest, hence left-most, first SortedFiles = lists:sort(Files), @@ -935,30 +1013,25 @@ combine_file(File, State = #msstate { file_summary = FileSummary, adjust_meta_and_combine( LeftObj = #file_summary { file = LeftFile, valid_total_size = LeftValidData, right = RightFile, - file_size = LeftFileSize }, + file_size = LeftFileSize, locked = true }, RightObj = #file_summary { file = RightFile, valid_total_size = RightValidData, left = LeftFile, - right = RightRight, file_size = RightFileSize }, - State = #msstate { file_size_limit = FileSizeLimit, - file_summary = FileSummary, - sum_file_size = SumFileSize }) -> + right = RightRight, file_size = RightFileSize, locked = true }, + State) -> TotalValidData = LeftValidData + RightValidData, - if FileSizeLimit >= TotalValidData -> - State1 = combine_files(RightObj, LeftObj, State), - %% this could fail if RightRight is undefined - ets:update_element(FileSummary, RightRight, - {#file_summary.left, LeftFile}), - true = ets:insert(FileSummary, LeftObj #file_summary { - valid_total_size = TotalValidData, - contiguous_top = TotalValidData, - file_size = TotalValidData, - right = RightRight }), - true = ets:delete(FileSummary, RightFile), - {true, State1 #msstate { sum_file_size = - SumFileSize - LeftFileSize - RightFileSize - + TotalValidData }}; - true -> {false, State} - end. + {NewMsgLocs, State1} = combine_files(RightObj, LeftObj, State), + %% %% this could fail if RightRight is undefined + %% ets:update_element(FileSummary, RightRight, + %% {#file_summary.left, LeftFile}), + %% true = ets:delete(FileSummary, RightFile), + LeftObj1 = LeftObj #file_summary { + valid_total_size = TotalValidData, + contiguous_top = TotalValidData, + file_size = TotalValidData, + right = RightRight }, + {RightFile, LeftObj1, NewMsgLocs, + TotalValidData - LeftFileSize - RightFileSize, + State1}. combine_files(#file_summary { file = Source, valid_total_size = SourceValid, @@ -967,7 +1040,7 @@ combine_files(#file_summary { file = Source, valid_total_size = DestinationValid, contiguous_top = DestinationContiguousTop, right = Source }, - State = #msstate { dir = Dir }) -> + State = #gcstate { dir = Dir }) -> State1 = close_handle(Source, close_handle(Destination, State)), SourceName = filenum_to_name(Source), DestinationName = filenum_to_name(Destination), @@ -981,54 +1054,62 @@ combine_files(#file_summary { file = Source, %% the DestinationContiguousTop to a tmp file then truncate, %% copy back in, and then copy over from Source %% otherwise we just truncate straight away and copy over from Source - if DestinationContiguousTop =:= DestinationValid -> - ok = truncate_and_extend_file(DestinationHdl, - DestinationValid, ExpectedSize); - true -> - Worklist = - lists:dropwhile( - fun (#msg_location { offset = Offset }) - when Offset /= DestinationContiguousTop -> - %% it cannot be that Offset == - %% DestinationContiguousTop because if it - %% was then DestinationContiguousTop would - %% have been extended by TotalSize - Offset < DestinationContiguousTop - %% Given expected access patterns, I suspect - %% that the list should be naturally sorted - %% as we require, however, we need to - %% enforce it anyway - end, find_unremoved_messages_in_file(Destination, State1)), - Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP, - {ok, TmpHdl} = open_file(Dir, Tmp, ?READ_AHEAD_MODE ++ ?WRITE_MODE), - ok = copy_messages( - Worklist, DestinationContiguousTop, DestinationValid, - DestinationHdl, TmpHdl, Destination, State1), - TmpSize = DestinationValid - DestinationContiguousTop, - %% so now Tmp contains everything we need to salvage from - %% Destination, and MsgLocation has been updated to - %% reflect compaction of Destination so truncate - %% Destination and copy from Tmp back to the end - {ok, 0} = file_handle_cache:position(TmpHdl, 0), - ok = truncate_and_extend_file( - DestinationHdl, DestinationContiguousTop, ExpectedSize), - {ok, TmpSize} = - file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize), - %% position in DestinationHdl should now be DestinationValid - ok = file_handle_cache:sync(DestinationHdl), - ok = file_handle_cache:close(TmpHdl), - ok = file:delete(form_filename(Dir, Tmp)) - end, + NewDestLocs = + if DestinationContiguousTop =:= DestinationValid -> + ok = truncate_and_extend_file(DestinationHdl, + DestinationValid, ExpectedSize), + []; + true -> + Worklist = + lists:dropwhile( + fun (#msg_location { offset = Offset }) + when Offset /= DestinationContiguousTop -> + %% it cannot be that Offset == + %% DestinationContiguousTop because if + %% it was then DestinationContiguousTop + %% would have been extended by TotalSize + Offset < DestinationContiguousTop + %% Given expected access patterns, I + %% suspect that the list should be + %% naturally sorted as we require, + %% however, we need to enforce it anyway + end, + find_unremoved_messages_in_file(Destination, State1)), + Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP, + {ok, TmpHdl} = + open_file(Dir, Tmp, ?READ_AHEAD_MODE ++ ?WRITE_MODE), + {ok, NewDestLocs1} = + copy_messages( + Worklist, DestinationContiguousTop, DestinationValid, + DestinationHdl, TmpHdl, Destination), + TmpSize = DestinationValid - DestinationContiguousTop, + %% so now Tmp contains everything we need to salvage + %% from Destination, and NewDestLocs1 contains + %% msg_locations reflecting the compaction of + %% Destination so truncate Destination and copy from + %% Tmp back to the end + {ok, 0} = file_handle_cache:position(TmpHdl, 0), + ok = truncate_and_extend_file( + DestinationHdl, DestinationContiguousTop, ExpectedSize), + {ok, TmpSize} = + file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize), + %% position in DestinationHdl should now be DestinationValid + ok = file_handle_cache:sync(DestinationHdl), + ok = file_handle_cache:close(TmpHdl), + ok = file:delete(form_filename(Dir, Tmp)), + NewDestLocs1 + end, SourceWorkList = find_unremoved_messages_in_file(Source, State1), - ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize, - SourceHdl, DestinationHdl, Destination, State1), + {ok, NewSourceLocs} = + copy_messages(SourceWorkList, DestinationValid, ExpectedSize, + SourceHdl, DestinationHdl, Destination), %% tidy up ok = file_handle_cache:close(SourceHdl), ok = file_handle_cache:close(DestinationHdl), ok = file:delete(form_filename(Dir, SourceName)), - State1. + {[NewDestLocs, NewSourceLocs], State1}. -find_unremoved_messages_in_file(File, State = #msstate { dir = Dir }) -> +find_unremoved_messages_in_file(File, State = #gcstate { dir = Dir }) -> %% Msgs here will be end-of-file at start-of-list {ok, Messages, _FileSize} = scan_file_for_valid_messages(Dir, filenum_to_name(File)), @@ -1042,37 +1123,41 @@ find_unremoved_messages_in_file(File, State = #msstate { dir = Dir }) -> end, [], Messages). copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, - Destination, State) -> - {FinalOffset, BlockStart1, BlockEnd1} = + Destination) -> + {FinalOffset, BlockStart1, BlockEnd1, NewMsgLocations} = lists:foldl( fun (StoreEntry = #msg_location { offset = Offset, total_size = TotalSize }, - {CurOffset, BlockStart, BlockEnd}) -> + {CurOffset, BlockStart, BlockEnd, NewMsgLocs}) -> %% CurOffset is in the DestinationFile. %% Offset, BlockStart and BlockEnd are in the SourceFile %% update MsgLocation to reflect change of file and offset - ok = index_update(StoreEntry #msg_location { - file = Destination, - offset = CurOffset }, State), + NewMsgLocs1 = + [StoreEntry #msg_location { + file = Destination, + offset = CurOffset } | NewMsgLocs], NextOffset = CurOffset + TotalSize, - if BlockStart =:= undefined -> - %% base case, called only for the first list elem - {NextOffset, Offset, Offset + TotalSize}; - Offset =:= BlockEnd -> - %% extend the current block because the next - %% msg follows straight on - {NextOffset, BlockStart, BlockEnd + TotalSize}; - true -> - %% found a gap, so actually do the work for - %% the previous block - BSize = BlockEnd - BlockStart, - {ok, BlockStart} = - file_handle_cache:position(SourceHdl, BlockStart), - {ok, BSize} = file_handle_cache:copy( - SourceHdl, DestinationHdl, BSize), - {NextOffset, Offset, Offset + TotalSize} - end - end, {InitOffset, undefined, undefined}, WorkList), + {BlockStart2, BlockEnd2} = + if BlockStart =:= undefined -> + %% base case, called only for the first list elem + {Offset, Offset + TotalSize}; + Offset =:= BlockEnd -> + %% extend the current block because the + %% next msg follows straight on + {BlockStart, BlockEnd + TotalSize}; + true -> + %% found a gap, so actually do the work + %% for the previous block + BSize = BlockEnd - BlockStart, + {ok, BlockStart} = + file_handle_cache:position(SourceHdl, + BlockStart), + {ok, BSize} = file_handle_cache:copy( + SourceHdl, DestinationHdl, BSize), + {Offset, Offset + TotalSize} + end, + {NextOffset, BlockStart2, BlockEnd2, NewMsgLocs1} + end, {InitOffset, undefined, undefined, []}, WorkList), case WorkList of [] -> ok; @@ -1085,8 +1170,10 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, file_handle_cache:copy(SourceHdl, DestinationHdl, BSize1), ok = file_handle_cache:sync(DestinationHdl) end, - ok. + {ok, NewMsgLocations}. +delete_file_if_empty(File, State = #msstate { current_file = File }) -> + State; delete_file_if_empty(File, #msstate { dir = Dir, file_summary = FileSummary, sum_file_size = SumFileSize } = State) -> [#file_summary { valid_total_size = ValidData, file_size = FileSize, @@ -1111,6 +1198,6 @@ delete_file_if_empty(File, #msstate { dir = Dir, file_summary = FileSummary, true = ets:delete(FileSummary, File), State1 = close_handle(File, State), ok = file:delete(form_filename(Dir, filenum_to_name(File))), - {true, State1 #msstate { sum_file_size = SumFileSize - FileSize }}; - _ -> {false, State} + State1 #msstate { sum_file_size = SumFileSize - FileSize }; + _ -> State end. diff --git a/src/random_distributions.erl b/src/random_distributions.erl new file mode 100644 index 00000000..dfcdc834 --- /dev/null +++ b/src/random_distributions.erl @@ -0,0 +1,38 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(random_distributions). + +-export([geometric/1]). + +geometric(P) when 0.0 < P andalso P < 1.0 -> + U = 1.0 - random:uniform(), + rabbit_misc:ceil(math:log(U) / math:log(1.0 - P)). |