path: root/src/rabbit_msg_store_gc.erl
diff options
authorMatthew Sackman <>2009-12-20 03:43:21 +0000
committerMatthew Sackman <>2009-12-20 03:43:21 +0000
commit5307e4c6a7f21458fda7fb97020dd838005046e2 (patch)
tree7989c1dcdd689a11903ea8475cf74776039c486d /src/rabbit_msg_store_gc.erl
parent6d3e84b4d013b0dc306927324de34b0d41133c84 (diff)
background lazy GC in and working
Diffstat (limited to 'src/rabbit_msg_store_gc.erl')
1 files changed, 249 insertions, 0 deletions
diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl
new file mode 100644
index 00000000..729cd287
--- /dev/null
+++ b/src/rabbit_msg_store_gc.erl
@@ -0,0 +1,249 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%% The Original Code is RabbitMQ.
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% All Rights Reserved.
+%% Contributor(s): ______________________________________.
+-export([start_link/4, gc/2, stop/0]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+ {dir,
+ index_state,
+ file_summary,
+ index_module
+ }).
+-define(SERVER, ?MODULE).
+start_link(Dir, IndexState, FileSummary, IndexModule) ->
+ gen_server2:start_link({local, ?SERVER}, ?MODULE,
+ [Dir, IndexState, FileSummary, IndexModule],
+ [{timeout, infinity}]).
+gc(Source, Destination) ->
+ gen_server2:cast(?SERVER, {gc, Source, Destination}).
+stop() ->
+ gen_server2:call(?SERVER, stop).
+init([Dir, IndexState, FileSummary, IndexModule]) ->
+ {ok, #gcstate { dir = Dir, index_state = IndexState,
+ file_summary = FileSummary, index_module = IndexModule },
+ hibernate,
+handle_call(stop, _From, State) ->
+ {stop, normal, ok, State}.
+handle_cast({gc, Source, Destination}, State) ->
+ Reclaimed = adjust_meta_and_combine(Source, Destination, State),
+ ok = rabbit_msg_store:gc_done(Reclaimed, Source, Destination),
+ {noreply, State, hibernate}.
+handle_info(Info, State) ->
+ {stop, {unhandled_info, Info}, State}.
+terminate(_Reason, State) ->
+ State.
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+adjust_meta_and_combine(SourceFile, DestFile,
+ State = #gcstate { file_summary = FileSummary }) ->
+ [SourceObj = #file_summary {
+ valid_total_size = SourceValidData, left = DestFile,
+ file_size = SourceFileSize, locked = true }] =
+ ets:lookup(FileSummary, SourceFile),
+ [DestObj = #file_summary {
+ valid_total_size = DestValidData, right = SourceFile,
+ file_size = DestFileSize, locked = true }] =
+ ets:lookup(FileSummary, DestFile),
+ TotalValidData = DestValidData + SourceValidData,
+ ok = combine_files(SourceObj, DestObj, State),
+ %% don't update dest.right, because it could be changing at the same time
+ true =
+ ets:update_element(FileSummary, DestFile,
+ [{#file_summary.valid_total_size, TotalValidData},
+ {#file_summary.contiguous_top, TotalValidData},
+ {#file_summary.file_size, TotalValidData}]),
+ SourceFileSize + DestFileSize - TotalValidData.
+combine_files(#file_summary { file = Source,
+ valid_total_size = SourceValid,
+ left = Destination },
+ #file_summary { file = Destination,
+ valid_total_size = DestinationValid,
+ contiguous_top = DestinationContiguousTop,
+ right = Source },
+ State = #gcstate { dir = Dir }) ->
+ SourceName = rabbit_msg_store_misc:filenum_to_name(Source),
+ DestinationName = rabbit_msg_store_misc:filenum_to_name(Destination),
+ {ok, SourceHdl} =
+ rabbit_msg_store_misc:open_file(Dir, SourceName, ?READ_AHEAD_MODE),
+ {ok, DestinationHdl} =
+ rabbit_msg_store_misc:open_file(Dir, DestinationName,
+ ExpectedSize = SourceValid + DestinationValid,
+ %% if DestinationValid =:= DestinationContiguousTop then we don't
+ %% need a tmp file
+ %% if they're not equal, then we need to write out everything past
+ %% the DestinationContiguousTop to a tmp file then truncate,
+ %% copy back in, and then copy over from Source
+ %% otherwise we just truncate straight away and copy over from Source
+ if DestinationContiguousTop =:= DestinationValid ->
+ ok = rabbit_msg_store_misc:truncate_and_extend_file(
+ DestinationHdl, DestinationValid, ExpectedSize);
+ true ->
+ Worklist =
+ lists:dropwhile(
+ fun (#msg_location { offset = Offset })
+ when Offset /= DestinationContiguousTop ->
+ %% it cannot be that Offset ==
+ %% DestinationContiguousTop because if it
+ %% was then DestinationContiguousTop would
+ %% have been extended by TotalSize
+ Offset < DestinationContiguousTop
+ %% Given expected access patterns, I suspect
+ %% that the list should be naturally sorted
+ %% as we require, however, we need to
+ %% enforce it anyway
+ end,
+ find_unremoved_messages_in_file(Destination, State)),
+ Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP,
+ {ok, TmpHdl} = rabbit_msg_store_misc:open_file(
+ ok = copy_messages(
+ Worklist, DestinationContiguousTop, DestinationValid,
+ DestinationHdl, TmpHdl, Destination, State),
+ TmpSize = DestinationValid - DestinationContiguousTop,
+ %% so now Tmp contains everything we need to salvage from
+ %% Destination, and index_state has been updated to
+ %% reflect the compaction of Destination so truncate
+ %% Destination and copy from Tmp back to the end
+ {ok, 0} = file_handle_cache:position(TmpHdl, 0),
+ ok = rabbit_msg_store_misc:truncate_and_extend_file(
+ DestinationHdl, DestinationContiguousTop, ExpectedSize),
+ {ok, TmpSize} =
+ file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize),
+ %% position in DestinationHdl should now be DestinationValid
+ ok = file_handle_cache:sync(DestinationHdl),
+ ok = file_handle_cache:close(TmpHdl),
+ ok = file:delete(rabbit_msg_store_misc:form_filename(Dir, Tmp))
+ end,
+ SourceWorkList = find_unremoved_messages_in_file(Source, State),
+ ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize,
+ SourceHdl, DestinationHdl, Destination, State),
+ %% tidy up
+ ok = file_handle_cache:close(SourceHdl),
+ ok = file_handle_cache:close(DestinationHdl),
+ ok = file:delete(rabbit_msg_store_misc:form_filename(Dir, SourceName)),
+ ok.
+find_unremoved_messages_in_file(File, #gcstate { dir = Dir,
+ index_state = IndexState,
+ index_module = Index }) ->
+ %% Msgs here will be end-of-file at start-of-list
+ {ok, Messages, _FileSize} =
+ rabbit_msg_store_misc:scan_file_for_valid_messages(
+ Dir, rabbit_msg_store_misc:filenum_to_name(File)),
+ %% foldl will reverse so will end up with msgs in ascending offset order
+ lists:foldl(
+ fun ({MsgId, _TotalSize, _Offset}, Acc) ->
+ case Index:lookup(MsgId, IndexState) of
+ Entry = #msg_location { file = File } -> [ Entry | Acc ];
+ _ -> Acc
+ end
+ end, [], Messages).
+copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
+ Destination, #gcstate { index_module = Index,
+ index_state = IndexState }) ->
+ {FinalOffset, BlockStart1, BlockEnd1} =
+ lists:foldl(
+ fun (#msg_location { msg_id = MsgId, offset = Offset,
+ total_size = TotalSize },
+ {CurOffset, BlockStart, BlockEnd}) ->
+ %% CurOffset is in the DestinationFile.
+ %% Offset, BlockStart and BlockEnd are in the SourceFile
+ %% update MsgLocation to reflect change of file and offset
+ ok = Index:update_fields(MsgId,
+ [{#msg_location.file, Destination},
+ {#msg_location.offset, CurOffset}],
+ IndexState),
+ {BlockStart2, BlockEnd2} =
+ if BlockStart =:= undefined ->
+ %% base case, called only for the first list elem
+ {Offset, Offset + TotalSize};
+ Offset =:= BlockEnd ->
+ %% extend the current block because the
+ %% next msg follows straight on
+ {BlockStart, BlockEnd + TotalSize};
+ true ->
+ %% found a gap, so actually do the work
+ %% for the previous block
+ BSize = BlockEnd - BlockStart,
+ {ok, BlockStart} =
+ file_handle_cache:position(SourceHdl,
+ BlockStart),
+ {ok, BSize} = file_handle_cache:copy(
+ SourceHdl, DestinationHdl, BSize),
+ {Offset, Offset + TotalSize}
+ end,
+ {CurOffset + TotalSize, BlockStart2, BlockEnd2}
+ end, {InitOffset, undefined, undefined}, WorkList),
+ case WorkList of
+ [] ->
+ ok;
+ _ ->
+ %% do the last remaining block
+ BSize1 = BlockEnd1 - BlockStart1,
+ {ok, BlockStart1} =
+ file_handle_cache:position(SourceHdl, BlockStart1),
+ {ok, BSize1} =
+ file_handle_cache:copy(SourceHdl, DestinationHdl, BSize1),
+ ok = file_handle_cache:sync(DestinationHdl)
+ end,
+ ok.