summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-12-19 16:37:52 +0000
committerMatthew Sackman <matthew@lshift.net>2009-12-19 16:37:52 +0000
commit6d3e84b4d013b0dc306927324de34b0d41133c84 (patch)
tree2b9d4e49a9a2272271c1714ac537a1fdcc3acb46
parenta7c6239ec87cf151c6b24e33f04b665cdbb47c96 (diff)
parentfb7dae6fa8a942cb9e6ec8967e5baf4d25307872 (diff)
downloadrabbitmq-server-6d3e84b4d013b0dc306927324de34b0d41133c84.tar.gz
merging default/21673 into bug 22161
-rw-r--r--src/gen_server2.erl13
-rw-r--r--src/rabbit_misc.erl6
-rw-r--r--src/rabbit_msg_store.erl479
-rw-r--r--src/random_distributions.erl38
4 files changed, 334 insertions, 202 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 53edf8de..c4806151 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -25,8 +25,11 @@
%% handle_pre_hibernate/1 and handle_post_hibernate/1. These will be
%% called immediately prior to and post hibernation, respectively. If
%% handle_pre_hibernate returns {hibernate, NewState} then the process
-%% will hibernate. If the module does not implement
-%% handle_pre_hibernate/1 then the default action is to hibernate.
+%% will hibernate. If handle_pre_hibernate returns {insomniate,
+%% NewState} then the process will go around again, trying to receive
+%% for up to the current timeout value before attempting to hibernate
+%% again. If the module does not implement handle_pre_hibernate/1 then
+%% the default action is to hibernate.
%%
%% 6) init can return a 4th arg, {backoff, InitialTimeout,
%% MinimumTimeout, DesiredHibernatePeriod} (all in
@@ -36,7 +39,7 @@
%% InitialTimeout supplied from init). After this timeout has
%% occurred, hibernation will occur as normal. Upon awaking, a new
%% current timeout value will be calculated.
-%%
+%%
%% The purpose is that the gen_server2 takes care of adjusting the
%% current timeout value such that the process will increase the
%% timeout value repeatedly if it is unable to sleep for the
@@ -126,6 +129,7 @@
%%% handle_pre_hibernate(State)
%%%
%%% ==> {hibernate, State}
+%%% {insomniate, State}
%%% {stop, Reason, State}
%%% Reason = normal | shutdown | Term, terminate(State) is called
%%%
@@ -545,6 +549,9 @@ pre_hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
{hibernate, NState} ->
hibernate(Parent, Name, NState, Mod, TimeoutState, Queue,
Debug);
+ {insomniate, NState} ->
+ process_next_msg(Parent, Name, NState, Mod, hibernate,
+ TimeoutState, Queue, Debug);
Reply ->
handle_common_termination(Reply, Name, pre_hibernate,
Mod, State, Debug)
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 21764fce..23666a5f 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -485,7 +485,7 @@ unfold(Fun, Acc, Init) ->
ceil(N) ->
T = trunc(N),
- case N - T of
- 0 -> N;
- _ -> 1 + T
+ case N == T of
+ true -> T;
+ false -> 1 + T
end.
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index f139fc45..c8d27ba6 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -39,7 +39,7 @@
-export([sync/0]). %% internal
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
+ terminate/2, code_change/3, handle_pre_hibernate/1]).
-define(SERVER, ?MODULE).
@@ -83,26 +83,38 @@
sync_timer_ref, %% TRef for our interval timer
message_cache, %% ets message cache
sum_valid_data, %% sum of valid data in all files
- sum_file_size %% sum of file sizes
- }).
+ sum_file_size, %% sum of file sizes
+ pending_gc_completion, %% things to do once GC completes
+ gc_pid %% pid of the GC process
+ }).
-record(msg_location,
{msg_id, ref_count, file, offset, total_size}).
-record(file_summary,
- {file, valid_total_size, contiguous_top, left, right, file_size}).
+ {file, valid_total_size, contiguous_top, left, right, file_size,
+ locked}).
+
+-record(gcstate,
+ {dir
+ }).
-define(MSG_LOC_NAME, rabbit_disk_queue_msg_location).
-define(FILE_SUMMARY_ETS_NAME, rabbit_disk_queue_file_summary).
-define(FILE_EXTENSION, ".rdq").
-define(FILE_EXTENSION_TMP, ".rdt").
-define(CACHE_ETS_NAME, rabbit_disk_queue_cache).
+%% We run GC whenever (garbage / sum_file_size) > ?GARBAGE_FRACTION
+-define(GARBAGE_FRACTION, 0.5).
-define(BINARY_MODE, [raw, binary]).
-define(READ_MODE, [read]).
-define(READ_AHEAD_MODE, [read_ahead | ?READ_MODE]).
-define(WRITE_MODE, [write]).
+-define(HIBERNATE_AFTER_MIN, 1000).
+-define(DESIRED_HIBERNATE, 10000).
+
%% The components:
%%
%% MsgLocation: this is an ets table which contains:
@@ -248,11 +260,12 @@ init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) ->
ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
MsgLocations = ets:new(?MSG_LOC_NAME,
- [set, private, {keypos, #msg_location.msg_id}]),
+ [set, protected, {keypos, #msg_location.msg_id}]),
InitFile = 0,
FileSummary = ets:new(?FILE_SUMMARY_ETS_NAME,
- [set, private, {keypos, #file_summary.file}]),
+ [ordered_set, protected,
+ {keypos, #file_summary.file}]),
MessageCache = ets:new(?CACHE_ETS_NAME, [set, private]),
State =
#msstate { dir = Dir,
@@ -266,7 +279,9 @@ init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) ->
sync_timer_ref = undefined,
message_cache = MessageCache,
sum_valid_data = 0,
- sum_file_size = 0
+ sum_file_size = 0,
+ pending_gc_completion = [],
+ gc_pid = undefined
},
ok = count_msg_refs(MsgRefDeltaGen, MsgRefDeltaGenInit, State),
@@ -287,54 +302,15 @@ init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) ->
{ok, Offset} = file_handle_cache:position(FileHdl, Offset),
ok = file_handle_cache:truncate(FileHdl),
- {ok, State1 #msstate { current_file_handle = FileHdl }}.
-
-handle_call({read, MsgId}, _From, State =
- #msstate { current_file = CurFile,
- current_file_handle = CurHdl }) ->
- {Result, State1} =
- case index_lookup(MsgId, State) of
- not_found -> {not_found, State};
- #msg_location { ref_count = RefCount,
- file = File,
- offset = Offset,
- total_size = TotalSize } ->
- case fetch_and_increment_cache(MsgId, State) of
- not_found ->
- ok = case CurFile =:= File andalso {ok, Offset} >=
- file_handle_cache:current_raw_offset(CurHdl) of
- true -> file_handle_cache:flush(CurHdl);
- false -> ok
- end,
- {Hdl, State2} = get_read_handle(File, State),
- {ok, Offset} = file_handle_cache:position(Hdl, Offset),
- {ok, {MsgId, Msg}} =
- case rabbit_msg_file:read(Hdl, TotalSize) of
- {ok, {MsgId, _}} = Obj -> Obj;
- Rest ->
- throw({error, {misread, [{old_state, State},
- {file_num, File},
- {offset, Offset},
- {read, Rest},
- {proc_dict, get()}
- ]}})
- end,
- ok = case RefCount > 1 of
- true ->
- insert_into_cache(MsgId, Msg, State2);
- false ->
- %% it's not in the cache and we
- %% only have one reference to the
- %% message. So don't bother
- %% putting it in the cache.
- ok
- end,
- {{ok, Msg}, State2};
- {Msg, _RefCount} ->
- {{ok, Msg}, State}
- end
- end,
- reply(Result, State1);
+ {ok, State1 #msstate { current_file_handle = FileHdl }, hibernate,
+ {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+
+handle_call({read, MsgId}, From, State) ->
+ case read_message(MsgId, State) of
+ {ok, Msg, State1} -> reply({ok, Msg}, State1);
+ {blocked, State1} -> noreply(add_to_pending_gc_completion(
+ {read, MsgId, From}, State1))
+ end;
handle_call({contains, MsgId}, _From, State) ->
reply(case index_lookup(MsgId, State) of
@@ -360,6 +336,7 @@ handle_cast({write, MsgId, Msg},
[FSEntry = #file_summary { valid_total_size = ValidTotalSize,
contiguous_top = ContiguousTop,
right = undefined,
+ locked = false,
file_size = FileSize }] =
ets:lookup(FileSummary, CurFile),
ValidTotalSize1 = ValidTotalSize + TotalSize,
@@ -373,10 +350,11 @@ handle_cast({write, MsgId, Msg},
contiguous_top = ContiguousTop1,
file_size = FileSize + TotalSize }),
NextOffset = CurOffset + TotalSize,
- noreply(maybe_roll_to_new_file(
- NextOffset,
- State #msstate { sum_valid_data = SumValid + TotalSize,
- sum_file_size = SumFileSize + TotalSize }));
+ noreply(maybe_compact(maybe_roll_to_new_file(
+ NextOffset, State #msstate
+ { sum_valid_data = SumValid + TotalSize,
+ sum_file_size = SumFileSize + TotalSize }
+ )));
StoreEntry = #msg_location { ref_count = RefCount } ->
%% We already know about it, just update counter
ok = index_update(StoreEntry #msg_location {
@@ -384,20 +362,11 @@ handle_cast({write, MsgId, Msg},
noreply(State)
end;
-handle_cast({remove, MsgIds}, State = #msstate { current_file = CurFile }) ->
- {Files, State1} =
- lists:foldl(
- fun (MsgId, {Files1, State2}) ->
- case remove_message(MsgId, State2) of
- {compact, File, State3} ->
- {if CurFile =:= File -> Files1;
- true -> sets:add_element(File, Files1)
- end, State3};
- {no_compact, State3} ->
- {Files1, State3}
- end
- end, {sets:new(), State}, MsgIds),
- noreply(compact(sets:to_list(Files), State1));
+handle_cast({remove, MsgIds}, State) ->
+ State1 = lists:foldl(
+ fun (MsgId, State2) -> remove_message(MsgId, State2) end,
+ State, MsgIds),
+ noreply(maybe_compact(State1));
handle_cast({release, MsgIds}, State) ->
lists:foreach(fun (MsgId) -> decrement_cache(MsgId, State) end, MsgIds),
@@ -421,6 +390,14 @@ handle_cast({sync, MsgIds, K},
handle_cast(sync, State) ->
noreply(sync(State)).
+%% handle_cast({gc_finished, GCPid, RemainingFile, DeletedFile, MsgLocations},
+%% State = #msstate { file_summary = FileSummary,
+%% gc_pid = GCPid }) ->
+%% true = ets:delete(FileSummary, DeletedFile),
+%% true = ets:insert(FileSummary, RemainingFile),
+%% State1 = lists:foldl(fun index_insert/2, State, MsgLocations),
+%% noreply(maybe_compact(run_pending(State1))).
+
handle_info(timeout, State) ->
noreply(sync(State));
@@ -450,6 +427,13 @@ terminate(_Reason, State = #msstate { msg_locations = MsgLocations,
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
+%% gen_server2 callback run just before the process would hibernate.
+%% Asks maybe_compact1/1 for its verdict: a `true` verdict keeps the
+%% process awake (insomniate), a `false` verdict lets it hibernate.
+handle_pre_hibernate(State) ->
+    case maybe_compact1(State) of
+        {true,  Awake}  -> {insomniate, Awake};
+        {false, Sleepy} -> {hibernate, Sleepy}
+    end.
+
%%----------------------------------------------------------------------------
%% general helper functions
%%----------------------------------------------------------------------------
@@ -463,11 +447,11 @@ reply(Reply, State) ->
{reply, Reply, State1, Timeout}.
next_state(State = #msstate { on_sync = [], sync_timer_ref = undefined }) ->
- {State, infinity};
+ {State, hibernate};
next_state(State = #msstate { sync_timer_ref = undefined }) ->
{start_sync_timer(State), 0};
next_state(State = #msstate { on_sync = [] }) ->
- {stop_sync_timer(State), infinity};
+ {stop_sync_timer(State), hibernate};
next_state(State) ->
{State, 0}.
@@ -513,6 +497,63 @@ sync(State = #msstate { current_file_handle = CurHdl,
State1 #msstate { on_sync = [] }
end.
+read_message(MsgId, State = % returns {ok, Msg, State1}, {ok, not_found, State} or {blocked, State}
+ #msstate { current_file = CurFile,
+ current_file_handle = CurHdl,
+ file_summary = FileSummary }) ->
+ case index_lookup(MsgId, State) of
+ not_found -> {ok, not_found, State}; % unknown MsgId is reported as a successful 'not_found' result
+ #msg_location { ref_count = RefCount,
+ file = File,
+ offset = Offset,
+ total_size = TotalSize } ->
+ case fetch_and_increment_cache(MsgId, State) of
+ not_found -> % cache miss: the message has to be read from its file
+ [#file_summary { locked = Locked }] =
+ ets:lookup(FileSummary, File),
+ case Locked of
+ true -> % file is marked locked (presumably by GC - confirm): no read is attempted
+ {blocked, State};
+ false ->
+ ok = case CurFile =:= File andalso {ok, Offset} >= % message is in the current file at or beyond the handle's raw offset
+ file_handle_cache:current_raw_offset(
+ CurHdl) of
+ true -> file_handle_cache:flush(CurHdl); % flush the write handle before reading (presumably data not yet written out)
+ false -> ok
+ end,
+ {Hdl, State1} = get_read_handle(File, State),
+ {ok, Offset} = % asserts the seek landed exactly on Offset
+ file_handle_cache:position(Hdl, Offset),
+ {ok, {MsgId, Msg}} =
+ case rabbit_msg_file:read(Hdl, TotalSize) of
+ {ok, {MsgId, _}} = Obj -> Obj; % the MsgId read back must equal the one requested
+ Rest -> % anything else is thrown as a misread with diagnostic context
+ throw({error, {misread,
+ [{old_state, State},
+ {file_num, File},
+ {offset, Offset},
+ {read, Rest},
+ {proc_dict, get()}
+ ]}})
+ end,
+ ok = case RefCount > 1 of
+ true ->
+ insert_into_cache(MsgId, Msg, State1);
+ false ->
+ %% it's not in the cache and
+ %% we only have one reference
+ %% to the message. So don't
+ %% bother putting it in the
+ %% cache.
+ ok
+ end,
+ {ok, Msg, State1}
+ end;
+ {Msg, _RefCount} -> % cache hit: served without touching any file
+ {ok, Msg, State}
+ end
+ end.
+
remove_message(MsgId, State = #msstate { file_summary = FileSummary,
sum_valid_data = SumValid }) ->
StoreEntry = #msg_location { ref_count = RefCount, file = File,
@@ -520,25 +561,50 @@ remove_message(MsgId, State = #msstate { file_summary = FileSummary,
index_lookup(MsgId, State),
case RefCount of
1 ->
- ok = index_delete(MsgId, State),
ok = remove_cache_entry(MsgId, State),
[FSEntry = #file_summary { valid_total_size = ValidTotalSize,
- contiguous_top = ContiguousTop }] =
+ contiguous_top = ContiguousTop,
+ locked = Locked }] =
ets:lookup(FileSummary, File),
- ContiguousTop1 = lists:min([ContiguousTop, Offset]),
- ValidTotalSize1 = ValidTotalSize - TotalSize,
- true = ets:insert(FileSummary, FSEntry #file_summary {
- valid_total_size = ValidTotalSize1,
- contiguous_top = ContiguousTop1 }),
- {compact, File, State #msstate {
- sum_valid_data = SumValid - TotalSize }};
+ case Locked of
+ true ->
+ add_to_pending_gc_completion({remove, MsgId}, State);
+ false ->
+ ok = index_delete(MsgId, State),
+ ContiguousTop1 = lists:min([ContiguousTop, Offset]),
+ ValidTotalSize1 = ValidTotalSize - TotalSize,
+ true = ets:insert(
+ FileSummary, FSEntry #file_summary {
+ valid_total_size = ValidTotalSize1,
+ contiguous_top = ContiguousTop1 }),
+ State1 = delete_file_if_empty(File, State),
+ State1 #msstate { sum_valid_data = SumValid - TotalSize }
+ end;
_ when 1 < RefCount ->
ok = decrement_cache(MsgId, State),
- ok = index_update(StoreEntry #msg_location {
- ref_count = RefCount - 1 }, State),
- {no_compact, State}
+ ok = index_update(StoreEntry #msg_location
+ { ref_count = RefCount - 1 }, State),
+ State
end.
+%% Queue an operation ({read, MsgId, From} or {remove, MsgId}) that
+%% could not be carried out yet. The operation is consed onto the head
+%% of the pending list, so the list is held newest-first and stays a
+%% flat list of operations.
+add_to_pending_gc_completion(
+  Op, State = #msstate { pending_gc_completion = Pending }) ->
+    State #msstate { pending_gc_completion = [Op | Pending] }.
+
+%% Replay every queued operation against a state whose pending list has
+%% first been emptied. The list is reversed so operations run in the
+%% order they were queued (e.g. a read queued before a remove of the
+%% same message is served before the message is removed). An operation
+%% that is still blocked queues itself again via run_pending/2.
+run_pending(State = #msstate { pending_gc_completion = Pending }) ->
+    State1 = State #msstate { pending_gc_completion = [] },
+    lists:foldl(fun run_pending/2, State1, lists:reverse(Pending)).
+
+%% Replay a single deferred operation and return the resulting state.
+%%   {remove, MsgId}     - handed straight to remove_message/2.
+%%   {read, MsgId, From} - retried via read_message/2; on success the
+%%                         waiting caller is answered with {ok, Msg},
+%%                         and if the read is still blocked the same
+%%                         operation is queued again.
+run_pending({remove, MsgId}, State) ->
+    remove_message(MsgId, State);
+run_pending({read, MsgId, From} = ReadOp, State) ->
+    case read_message(MsgId, State) of
+        {blocked, StillBlocked} ->
+            add_to_pending_gc_completion(ReadOp, StillBlocked);
+        {ok, Msg, Served} ->
+            gen_server2:reply(From, {ok, Msg}),
+            Served
+    end.
+
close_handle(Key, State = #msstate { file_handle_cache = FHC }) ->
case dict:find(Key, FHC) of
{ok, Hdl} ->
@@ -794,20 +860,19 @@ find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, MsgIds) ->
{ExpectedOffset, MsgIds}.
build_index([], State) ->
- build_index(undefined, [State #msstate.current_file], [], State);
+ build_index(undefined, [State #msstate.current_file], State);
build_index(Files, State) ->
- build_index(undefined, Files, [], State).
+ {Offset, State1} = build_index(undefined, Files, State),
+ {Offset, lists:foldl(fun delete_file_if_empty/2, State1, Files)}.
-build_index(Left, [], FilesToCompact, State =
- #msstate { file_summary = FileSummary }) ->
+build_index(Left, [], State = #msstate { file_summary = FileSummary }) ->
ok = index_delete_by_file(undefined, State),
Offset = case ets:lookup(FileSummary, Left) of
[] -> 0;
[#file_summary { file_size = FileSize }] -> FileSize
end,
- {Offset, compact(FilesToCompact, %% this never includes the current file
- State #msstate { current_file = Left })};
-build_index(Left, [File|Files], FilesToCompact,
+ {Offset, State #msstate { current_file = Left }};
+build_index(Left, [File|Files],
State = #msstate { dir = Dir, file_summary = FileSummary,
sum_valid_data = SumValid,
sum_file_size = SumFileSize }) ->
@@ -845,14 +910,9 @@ build_index(Left, [File|Files], FilesToCompact,
true =
ets:insert_new(FileSummary, #file_summary {
file = File, valid_total_size = ValidTotalSize,
- contiguous_top = ContiguousTop,
+ contiguous_top = ContiguousTop, locked = false,
left = Left, right = Right, file_size = FileSize1 }),
- FilesToCompact1 =
- case FileSize1 == ContiguousTop orelse Right =:= undefined of
- true -> FilesToCompact;
- false -> [File | FilesToCompact]
- end,
- build_index(File, Files, FilesToCompact1,
+ build_index(File, Files,
State #msstate { sum_valid_data = SumValid + ValidTotalSize,
sum_file_size = SumFileSize + FileSize1 }).
@@ -876,13 +936,31 @@ maybe_roll_to_new_file(Offset,
true = ets:insert_new(
FileSummary, #file_summary {
file = NextFile, valid_total_size = 0, contiguous_top = 0,
- left = CurFile, right = undefined, file_size = 0 }),
- State2 = State1 #msstate { current_file_handle = NextHdl,
- current_file = NextFile },
- compact([CurFile], State2);
+ left = CurFile, right = undefined, file_size = 0,
+ locked = false }),
+ State1 #msstate { current_file_handle = NextHdl,
+ current_file = NextFile };
maybe_roll_to_new_file(_, State) ->
State.
+%% Run the compaction check and keep only the resulting state,
+%% discarding the boolean verdict.
+maybe_compact(State) ->
+    {_Bool, State1} = maybe_compact1(State),
+    State1.
+
+%% Decide whether compaction is warranted.
+%% Returns {true, State} when no GC process is recorded
+%% (gc_pid = undefined) and the garbage fraction
+%% (SumFileSize - SumValid) / SumFileSize exceeds ?GARBAGE_FRACTION;
+%% returns {false, State} in every other case. When SumFileSize is 0
+%% the division fails inside the guard, which just makes the guard
+%% false, so the second clause is taken rather than crashing.
+maybe_compact1(State = #msstate { sum_valid_data = SumValid,
+                                  sum_file_size  = SumFileSize,
+                                  gc_pid         = undefined })
+  when (SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION ->
+    %% TODO: no GC process is started yet; the intended sketch follows.
+    %% Pid = spawn_link(fun() ->
+    %% io:format("GC process!~n")
+    %% %% gen_server2:pcast(?SERVER, 9, {gc_finished, self(),}),
+    %% end),
+    %% State #msstate { gc_pid = Pid };
+    {true, State};
+maybe_compact1(State) ->
+    {false, State}.
+
compact(Files, State) ->
%% smallest number, hence eldest, hence left-most, first
SortedFiles = lists:sort(Files),
@@ -935,30 +1013,25 @@ combine_file(File, State = #msstate { file_summary = FileSummary,
adjust_meta_and_combine(
LeftObj = #file_summary {
file = LeftFile, valid_total_size = LeftValidData, right = RightFile,
- file_size = LeftFileSize },
+ file_size = LeftFileSize, locked = true },
RightObj = #file_summary {
file = RightFile, valid_total_size = RightValidData, left = LeftFile,
- right = RightRight, file_size = RightFileSize },
- State = #msstate { file_size_limit = FileSizeLimit,
- file_summary = FileSummary,
- sum_file_size = SumFileSize }) ->
+ right = RightRight, file_size = RightFileSize, locked = true },
+ State) ->
TotalValidData = LeftValidData + RightValidData,
- if FileSizeLimit >= TotalValidData ->
- State1 = combine_files(RightObj, LeftObj, State),
- %% this could fail if RightRight is undefined
- ets:update_element(FileSummary, RightRight,
- {#file_summary.left, LeftFile}),
- true = ets:insert(FileSummary, LeftObj #file_summary {
- valid_total_size = TotalValidData,
- contiguous_top = TotalValidData,
- file_size = TotalValidData,
- right = RightRight }),
- true = ets:delete(FileSummary, RightFile),
- {true, State1 #msstate { sum_file_size =
- SumFileSize - LeftFileSize - RightFileSize
- + TotalValidData }};
- true -> {false, State}
- end.
+ {NewMsgLocs, State1} = combine_files(RightObj, LeftObj, State),
+ %% %% this could fail if RightRight is undefined
+ %% ets:update_element(FileSummary, RightRight,
+ %% {#file_summary.left, LeftFile}),
+ %% true = ets:delete(FileSummary, RightFile),
+ LeftObj1 = LeftObj #file_summary {
+ valid_total_size = TotalValidData,
+ contiguous_top = TotalValidData,
+ file_size = TotalValidData,
+ right = RightRight },
+ {RightFile, LeftObj1, NewMsgLocs,
+ TotalValidData - LeftFileSize - RightFileSize,
+ State1}.
combine_files(#file_summary { file = Source,
valid_total_size = SourceValid,
@@ -967,7 +1040,7 @@ combine_files(#file_summary { file = Source,
valid_total_size = DestinationValid,
contiguous_top = DestinationContiguousTop,
right = Source },
- State = #msstate { dir = Dir }) ->
+ State = #gcstate { dir = Dir }) ->
State1 = close_handle(Source, close_handle(Destination, State)),
SourceName = filenum_to_name(Source),
DestinationName = filenum_to_name(Destination),
@@ -981,54 +1054,62 @@ combine_files(#file_summary { file = Source,
%% the DestinationContiguousTop to a tmp file then truncate,
%% copy back in, and then copy over from Source
%% otherwise we just truncate straight away and copy over from Source
- if DestinationContiguousTop =:= DestinationValid ->
- ok = truncate_and_extend_file(DestinationHdl,
- DestinationValid, ExpectedSize);
- true ->
- Worklist =
- lists:dropwhile(
- fun (#msg_location { offset = Offset })
- when Offset /= DestinationContiguousTop ->
- %% it cannot be that Offset ==
- %% DestinationContiguousTop because if it
- %% was then DestinationContiguousTop would
- %% have been extended by TotalSize
- Offset < DestinationContiguousTop
- %% Given expected access patterns, I suspect
- %% that the list should be naturally sorted
- %% as we require, however, we need to
- %% enforce it anyway
- end, find_unremoved_messages_in_file(Destination, State1)),
- Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP,
- {ok, TmpHdl} = open_file(Dir, Tmp, ?READ_AHEAD_MODE ++ ?WRITE_MODE),
- ok = copy_messages(
- Worklist, DestinationContiguousTop, DestinationValid,
- DestinationHdl, TmpHdl, Destination, State1),
- TmpSize = DestinationValid - DestinationContiguousTop,
- %% so now Tmp contains everything we need to salvage from
- %% Destination, and MsgLocation has been updated to
- %% reflect compaction of Destination so truncate
- %% Destination and copy from Tmp back to the end
- {ok, 0} = file_handle_cache:position(TmpHdl, 0),
- ok = truncate_and_extend_file(
- DestinationHdl, DestinationContiguousTop, ExpectedSize),
- {ok, TmpSize} =
- file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize),
- %% position in DestinationHdl should now be DestinationValid
- ok = file_handle_cache:sync(DestinationHdl),
- ok = file_handle_cache:close(TmpHdl),
- ok = file:delete(form_filename(Dir, Tmp))
- end,
+ NewDestLocs =
+ if DestinationContiguousTop =:= DestinationValid ->
+ ok = truncate_and_extend_file(DestinationHdl,
+ DestinationValid, ExpectedSize),
+ [];
+ true ->
+ Worklist =
+ lists:dropwhile(
+ fun (#msg_location { offset = Offset })
+ when Offset /= DestinationContiguousTop ->
+ %% it cannot be that Offset ==
+ %% DestinationContiguousTop because if
+ %% it was then DestinationContiguousTop
+ %% would have been extended by TotalSize
+ Offset < DestinationContiguousTop
+ %% Given expected access patterns, I
+ %% suspect that the list should be
+ %% naturally sorted as we require,
+ %% however, we need to enforce it anyway
+ end,
+ find_unremoved_messages_in_file(Destination, State1)),
+ Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP,
+ {ok, TmpHdl} =
+ open_file(Dir, Tmp, ?READ_AHEAD_MODE ++ ?WRITE_MODE),
+ {ok, NewDestLocs1} =
+ copy_messages(
+ Worklist, DestinationContiguousTop, DestinationValid,
+ DestinationHdl, TmpHdl, Destination),
+ TmpSize = DestinationValid - DestinationContiguousTop,
+ %% so now Tmp contains everything we need to salvage
+ %% from Destination, and NewDestLocs1 contains
+ %% msg_locations reflecting the compaction of
+ %% Destination so truncate Destination and copy from
+ %% Tmp back to the end
+ {ok, 0} = file_handle_cache:position(TmpHdl, 0),
+ ok = truncate_and_extend_file(
+ DestinationHdl, DestinationContiguousTop, ExpectedSize),
+ {ok, TmpSize} =
+ file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize),
+ %% position in DestinationHdl should now be DestinationValid
+ ok = file_handle_cache:sync(DestinationHdl),
+ ok = file_handle_cache:close(TmpHdl),
+ ok = file:delete(form_filename(Dir, Tmp)),
+ NewDestLocs1
+ end,
SourceWorkList = find_unremoved_messages_in_file(Source, State1),
- ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize,
- SourceHdl, DestinationHdl, Destination, State1),
+ {ok, NewSourceLocs} =
+ copy_messages(SourceWorkList, DestinationValid, ExpectedSize,
+ SourceHdl, DestinationHdl, Destination),
%% tidy up
ok = file_handle_cache:close(SourceHdl),
ok = file_handle_cache:close(DestinationHdl),
ok = file:delete(form_filename(Dir, SourceName)),
- State1.
+ {[NewDestLocs, NewSourceLocs], State1}.
-find_unremoved_messages_in_file(File, State = #msstate { dir = Dir }) ->
+find_unremoved_messages_in_file(File, State = #gcstate { dir = Dir }) ->
%% Msgs here will be end-of-file at start-of-list
{ok, Messages, _FileSize} =
scan_file_for_valid_messages(Dir, filenum_to_name(File)),
@@ -1042,37 +1123,41 @@ find_unremoved_messages_in_file(File, State = #msstate { dir = Dir }) ->
end, [], Messages).
copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
- Destination, State) ->
- {FinalOffset, BlockStart1, BlockEnd1} =
+ Destination) ->
+ {FinalOffset, BlockStart1, BlockEnd1, NewMsgLocations} =
lists:foldl(
fun (StoreEntry = #msg_location { offset = Offset,
total_size = TotalSize },
- {CurOffset, BlockStart, BlockEnd}) ->
+ {CurOffset, BlockStart, BlockEnd, NewMsgLocs}) ->
%% CurOffset is in the DestinationFile.
%% Offset, BlockStart and BlockEnd are in the SourceFile
%% update MsgLocation to reflect change of file and offset
- ok = index_update(StoreEntry #msg_location {
- file = Destination,
- offset = CurOffset }, State),
+ NewMsgLocs1 =
+ [StoreEntry #msg_location {
+ file = Destination,
+ offset = CurOffset } | NewMsgLocs],
NextOffset = CurOffset + TotalSize,
- if BlockStart =:= undefined ->
- %% base case, called only for the first list elem
- {NextOffset, Offset, Offset + TotalSize};
- Offset =:= BlockEnd ->
- %% extend the current block because the next
- %% msg follows straight on
- {NextOffset, BlockStart, BlockEnd + TotalSize};
- true ->
- %% found a gap, so actually do the work for
- %% the previous block
- BSize = BlockEnd - BlockStart,
- {ok, BlockStart} =
- file_handle_cache:position(SourceHdl, BlockStart),
- {ok, BSize} = file_handle_cache:copy(
- SourceHdl, DestinationHdl, BSize),
- {NextOffset, Offset, Offset + TotalSize}
- end
- end, {InitOffset, undefined, undefined}, WorkList),
+ {BlockStart2, BlockEnd2} =
+ if BlockStart =:= undefined ->
+ %% base case, called only for the first list elem
+ {Offset, Offset + TotalSize};
+ Offset =:= BlockEnd ->
+ %% extend the current block because the
+ %% next msg follows straight on
+ {BlockStart, BlockEnd + TotalSize};
+ true ->
+ %% found a gap, so actually do the work
+ %% for the previous block
+ BSize = BlockEnd - BlockStart,
+ {ok, BlockStart} =
+ file_handle_cache:position(SourceHdl,
+ BlockStart),
+ {ok, BSize} = file_handle_cache:copy(
+ SourceHdl, DestinationHdl, BSize),
+ {Offset, Offset + TotalSize}
+ end,
+ {NextOffset, BlockStart2, BlockEnd2, NewMsgLocs1}
+ end, {InitOffset, undefined, undefined, []}, WorkList),
case WorkList of
[] ->
ok;
@@ -1085,8 +1170,10 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
file_handle_cache:copy(SourceHdl, DestinationHdl, BSize1),
ok = file_handle_cache:sync(DestinationHdl)
end,
- ok.
+ {ok, NewMsgLocations}.
+delete_file_if_empty(File, State = #msstate { current_file = File }) ->
+ State;
delete_file_if_empty(File, #msstate { dir = Dir, file_summary = FileSummary,
sum_file_size = SumFileSize } = State) ->
[#file_summary { valid_total_size = ValidData, file_size = FileSize,
@@ -1111,6 +1198,6 @@ delete_file_if_empty(File, #msstate { dir = Dir, file_summary = FileSummary,
true = ets:delete(FileSummary, File),
State1 = close_handle(File, State),
ok = file:delete(form_filename(Dir, filenum_to_name(File))),
- {true, State1 #msstate { sum_file_size = SumFileSize - FileSize }};
- _ -> {false, State}
+ State1 #msstate { sum_file_size = SumFileSize - FileSize };
+ _ -> State
end.
diff --git a/src/random_distributions.erl b/src/random_distributions.erl
new file mode 100644
index 00000000..dfcdc834
--- /dev/null
+++ b/src/random_distributions.erl
@@ -0,0 +1,38 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(random_distributions).
+
+-export([geometric/1]).
+
+%% Draw a sample for success probability P, which must satisfy
+%% 0.0 < P < 1.0 (any other argument fails with function_clause).
+%% A uniform draw is mapped through the ratio of logarithms
+%% log(1 - uniform) / log(1 - P) and rounded up with rabbit_misc:ceil/1.
+geometric(P) when 0.0 < P andalso P < 1.0 ->
+    LogDraw = math:log(1.0 - random:uniform()),
+    LogFail = math:log(1.0 - P),
+    rabbit_misc:ceil(LogDraw / LogFail).