summary | refs | log | tree | commit | diff
path: root/src/rabbit_msg_store.erl
diff options
context:
space:
mode:
author: Alexandru Scvortov <alexandru@rabbitmq.com> 2011-08-02 10:48:18 +0100
committer: Alexandru Scvortov <alexandru@rabbitmq.com> 2011-08-02 10:48:18 +0100
commit: 62b6ce3e78a42a69a336049e5872bf4686a8543a (patch)
tree: 7a55098c74ec108cdb5d094ed16ccd9eb661e06c /src/rabbit_msg_store.erl
parent: c6248e4437c04032e9231d265181e2e87d615ef5 (diff)
parent: e2c57c78fcc0281eeb78dd1914287e539265244c (diff)
download: rabbitmq-server-62b6ce3e78a42a69a336049e5872bf4686a8543a.tar.gz
merge default into bug23504
Diffstat (limited to 'src/rabbit_msg_store.erl')
-rw-r--r-- src/rabbit_msg_store.erl | 615
1 files changed, 300 insertions, 315 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 75ca0b8b..6c5035a0 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -21,14 +21,16 @@
-export([start_link/4, successfully_recovered_state/1,
client_init/4, client_terminate/1, client_delete_and_terminate/1,
client_ref/1, close_all_indicated/1,
- write/3, read/2, contains/2, remove/2, release/2, sync/3]).
+ write/3, read/2, contains/2, remove/2, sync/3]).
-export([set_maximum_since_use/2, has_readers/2, combine_files/3,
delete_file/2]). %% internal
+-export([transform_dir/3, force_recovery/2]). %% upgrade
+
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3, prioritise_call/3, prioritise_cast/2,
- prioritise_info/2]).
+ prioritise_info/2, format_message_queue/2]).
%%----------------------------------------------------------------------------
@@ -37,6 +39,7 @@
-define(SYNC_INTERVAL, 5). %% milliseconds
-define(CLEAN_FILENAME, "clean.dot").
-define(FILE_SUMMARY_FILENAME, "file_summary.ets").
+-define(TRANSFORM_TMP, "transform_tmp").
-define(BINARY_MODE, [raw, binary]).
-define(READ_MODE, [read]).
@@ -65,15 +68,14 @@
gc_pid, %% pid of our GC
file_handles_ets, %% tid of the shared file handles table
file_summary_ets, %% tid of the file summary table
- dedup_cache_ets, %% tid of dedup cache table
cur_file_cache_ets, %% tid of current file cache table
dying_clients, %% set of dying clients
clients, %% map of references of all registered clients
%% to callbacks
successfully_recovered, %% boolean: did we recover state?
file_size_limit, %% how big are our files allowed to get?
- cref_to_guids %% client ref to synced messages mapping
- }).
+ cref_to_msg_ids %% client ref to synced messages mapping
+ }).
-record(client_msstate,
{ server,
@@ -85,9 +87,8 @@
gc_pid,
file_handles_ets,
file_summary_ets,
- dedup_cache_ets,
cur_file_cache_ets
- }).
+ }).
-record(file_summary,
{file, valid_total_size, left, right, file_size, locked, readers}).
@@ -128,38 +129,39 @@
gc_pid :: pid(),
file_handles_ets :: ets:tid(),
file_summary_ets :: ets:tid(),
- dedup_cache_ets :: ets:tid(),
cur_file_cache_ets :: ets:tid()}).
--type(startup_fun_state() ::
- {(fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A})),
- A}).
--type(maybe_guid_fun() :: 'undefined' | fun ((gb_set()) -> any())).
+-type(msg_ref_delta_gen(A) ::
+ fun ((A) -> 'finished' |
+ {rabbit_types:msg_id(), non_neg_integer(), A})).
+-type(maybe_msg_id_fun() :: 'undefined' | fun ((gb_set()) -> any())).
-type(maybe_close_fds_fun() :: 'undefined' | fun (() -> 'ok')).
-type(deletion_thunk() :: fun (() -> boolean())).
-spec(start_link/4 ::
(atom(), file:filename(), [binary()] | 'undefined',
- startup_fun_state()) -> rabbit_types:ok_pid_or_error()).
+ {msg_ref_delta_gen(A), A}) -> rabbit_types:ok_pid_or_error()).
-spec(successfully_recovered_state/1 :: (server()) -> boolean()).
--spec(client_init/4 :: (server(), client_ref(), maybe_guid_fun(),
+-spec(client_init/4 :: (server(), client_ref(), maybe_msg_id_fun(),
maybe_close_fds_fun()) -> client_msstate()).
-spec(client_terminate/1 :: (client_msstate()) -> 'ok').
-spec(client_delete_and_terminate/1 :: (client_msstate()) -> 'ok').
-spec(client_ref/1 :: (client_msstate()) -> client_ref()).
--spec(write/3 :: (rabbit_guid:guid(), msg(), client_msstate()) -> 'ok').
--spec(read/2 :: (rabbit_guid:guid(), client_msstate()) ->
- {rabbit_types:ok(msg()) | 'not_found', client_msstate()}).
--spec(contains/2 :: (rabbit_guid:guid(), client_msstate()) -> boolean()).
--spec(remove/2 :: ([rabbit_guid:guid()], client_msstate()) -> 'ok').
--spec(release/2 :: ([rabbit_guid:guid()], client_msstate()) -> 'ok').
--spec(sync/3 :: ([rabbit_guid:guid()], fun (() -> any()), client_msstate()) ->
- 'ok').
+-spec(write/3 :: (rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok').
+-spec(read/2 :: (rabbit_types:msg_id(), client_msstate()) ->
+ {rabbit_types:ok(msg()) | 'not_found', client_msstate()}).
+-spec(contains/2 :: (rabbit_types:msg_id(), client_msstate()) -> boolean()).
+-spec(remove/2 :: ([rabbit_types:msg_id()], client_msstate()) -> 'ok').
+-spec(sync/3 ::
+ ([rabbit_types:msg_id()], fun (() -> any()), client_msstate()) -> 'ok').
-spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok').
-spec(has_readers/2 :: (non_neg_integer(), gc_state()) -> boolean()).
-spec(combine_files/3 :: (non_neg_integer(), non_neg_integer(), gc_state()) ->
deletion_thunk()).
-spec(delete_file/2 :: (non_neg_integer(), gc_state()) -> deletion_thunk()).
+-spec(force_recovery/2 :: (file:filename(), server()) -> 'ok').
+-spec(transform_dir/3 :: (file:filename(), server(),
+ fun ((any()) -> (rabbit_types:ok_or_error2(msg(), any())))) -> 'ok').
-endif.
@@ -171,8 +173,8 @@
%% The components:
%%
-%% Index: this is a mapping from Guid to #msg_location{}:
-%% {Guid, RefCount, File, Offset, TotalSize}
+%% Index: this is a mapping from MsgId to #msg_location{}:
+%% {MsgId, RefCount, File, Offset, TotalSize}
%% By default, it's in ets, but it's also pluggable.
%% FileSummary: this is an ets table which maps File to #file_summary{}:
%% {File, ValidTotalSize, Left, Right, FileSize, Locked, Readers}
@@ -273,7 +275,7 @@
%% alternating full files and files with only one tiny message in
%% them).
%%
-%% Messages are reference-counted. When a message with the same guid
+%% Messages are reference-counted. When a message with the same msg id
%% is written several times we only store it once, and only remove it
%% from the store when it has been removed the same number of times.
%%
@@ -390,7 +392,7 @@ successfully_recovered_state(Server) ->
client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) ->
{IState, IModule, Dir, GCPid,
- FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} =
+ FileHandlesEts, FileSummaryEts, CurFileCacheEts} =
gen_server2:call(
Server, {new_client_state, Ref, MsgOnDiskFun, CloseFDsFun}, infinity),
#client_msstate { server = Server,
@@ -402,7 +404,6 @@ client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) ->
gc_pid = GCPid,
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts }.
client_terminate(CState = #client_msstate { client_ref = Ref }) ->
@@ -416,44 +417,31 @@ client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) ->
client_ref(#client_msstate { client_ref = Ref }) -> Ref.
-write(Guid, Msg,
+write(MsgId, Msg,
CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts,
client_ref = CRef }) ->
- ok = update_msg_cache(CurFileCacheEts, Guid, Msg),
- ok = server_cast(CState, {write, CRef, Guid}).
-
-read(Guid,
- CState = #client_msstate { dedup_cache_ets = DedupCacheEts,
- cur_file_cache_ets = CurFileCacheEts }) ->
- %% 1. Check the dedup cache
- case fetch_and_increment_cache(DedupCacheEts, Guid) of
- not_found ->
- %% 2. Check the cur file cache
- case ets:lookup(CurFileCacheEts, Guid) of
- [] ->
- Defer = fun() ->
- {server_call(CState, {read, Guid}), CState}
- end,
- case index_lookup_positive_ref_count(Guid, CState) of
- not_found -> Defer();
- MsgLocation -> client_read1(MsgLocation, Defer, CState)
- end;
- [{Guid, Msg, _CacheRefCount}] ->
- %% Although we've found it, we don't know the
- %% refcount, so can't insert into dedup cache
- {{ok, Msg}, CState}
+ ok = update_msg_cache(CurFileCacheEts, MsgId, Msg),
+ ok = server_cast(CState, {write, CRef, MsgId}).
+
+read(MsgId,
+ CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) ->
+ %% Check the cur file cache
+ case ets:lookup(CurFileCacheEts, MsgId) of
+ [] ->
+ Defer = fun() -> {server_call(CState, {read, MsgId}), CState} end,
+ case index_lookup_positive_ref_count(MsgId, CState) of
+ not_found -> Defer();
+ MsgLocation -> client_read1(MsgLocation, Defer, CState)
end;
- Msg ->
+ [{MsgId, Msg, _CacheRefCount}] ->
{{ok, Msg}, CState}
end.
-contains(Guid, CState) -> server_call(CState, {contains, Guid}).
+contains(MsgId, CState) -> server_call(CState, {contains, MsgId}).
remove([], _CState) -> ok;
-remove(Guids, CState = #client_msstate { client_ref = CRef }) ->
- server_cast(CState, {remove, CRef, Guids}).
-release([], _CState) -> ok;
-release(Guids, CState) -> server_cast(CState, {release, Guids}).
-sync(Guids, K, CState) -> server_cast(CState, {sync, Guids, K}).
+remove(MsgIds, CState = #client_msstate { client_ref = CRef }) ->
+ server_cast(CState, {remove, CRef, MsgIds}).
+sync(MsgIds, K, CState) -> server_cast(CState, {sync, MsgIds, K}).
set_maximum_since_use(Server, Age) ->
gen_server2:cast(Server, {set_maximum_since_use, Age}).
@@ -468,11 +456,11 @@ server_call(#client_msstate { server = Server }, Msg) ->
server_cast(#client_msstate { server = Server }, Msg) ->
gen_server2:cast(Server, Msg).
-client_read1(#msg_location { guid = Guid, file = File } = MsgLocation, Defer,
+client_read1(#msg_location { msg_id = MsgId, file = File } = MsgLocation, Defer,
CState = #client_msstate { file_summary_ets = FileSummaryEts }) ->
case ets:lookup(FileSummaryEts, File) of
[] -> %% File has been GC'd and no longer exists. Go around again.
- read(Guid, CState);
+ read(MsgId, CState);
[#file_summary { locked = Locked, right = Right }] ->
client_read2(Locked, Right, MsgLocation, Defer, CState)
end.
@@ -494,7 +482,7 @@ client_read2(true, _Right, _MsgLocation, Defer, _CState) ->
%% the safest and simplest thing to do.
Defer();
client_read2(false, _Right,
- MsgLocation = #msg_location { guid = Guid, file = File },
+ MsgLocation = #msg_location { msg_id = MsgId, file = File },
Defer,
CState = #client_msstate { file_summary_ets = FileSummaryEts }) ->
%% It's entirely possible that everything we're doing from here on
@@ -503,12 +491,11 @@ client_read2(false, _Right,
safe_ets_update_counter(
FileSummaryEts, File, {#file_summary.readers, +1},
fun (_) -> client_read3(MsgLocation, Defer, CState) end,
- fun () -> read(Guid, CState) end).
+ fun () -> read(MsgId, CState) end).
-client_read3(#msg_location { guid = Guid, file = File }, Defer,
+client_read3(#msg_location { msg_id = MsgId, file = File }, Defer,
CState = #client_msstate { file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts,
gc_pid = GCPid,
client_ref = Ref }) ->
Release =
@@ -530,7 +517,7 @@ client_read3(#msg_location { guid = Guid, file = File }, Defer,
%% too).
case ets:lookup(FileSummaryEts, File) of
[] -> %% GC has deleted our file, just go round again.
- read(Guid, CState);
+ read(MsgId, CState);
[#file_summary { locked = true }] ->
%% If we get a badarg here, then the GC has finished and
%% deleted our file. Try going around again. Otherwise,
@@ -540,8 +527,8 @@ client_read3(#msg_location { guid = Guid, file = File }, Defer,
%% GC ends, we +1 readers, msg_store ets:deletes (and
%% unlocks the dest)
try Release(),
- Defer()
- catch error:badarg -> read(Guid, CState)
+ Defer()
+ catch error:badarg -> read(MsgId, CState)
end;
[#file_summary { locked = false }] ->
%% Ok, we're definitely safe to continue - a GC involving
@@ -554,7 +541,7 @@ client_read3(#msg_location { guid = Guid, file = File }, Defer,
%% us doing the lookup and the +1 on the readers. (Same as
%% badarg scenario above, but we don't have a missing file
%% - we just have the /wrong/ file).
- case index_lookup(Guid, CState) of
+ case index_lookup(MsgId, CState) of
#msg_location { file = File } = MsgLocation ->
%% Still the same file.
{ok, CState1} = close_all_indicated(CState),
@@ -565,8 +552,8 @@ client_read3(#msg_location { guid = Guid, file = File }, Defer,
%% Could the msg_store now mark the file to be
%% closed? No: marks for closing are issued only
%% when the msg_store has locked the file.
- {Msg, CState2} = %% This will never be the current file
- read_from_disk(MsgLocation, CState1, DedupCacheEts),
+ %% This will never be the current file
+ {Msg, CState2} = read_from_disk(MsgLocation, CState1),
Release(), %% this MUST NOT fail with badarg
{{ok, Msg}, CState2};
#msg_location {} = MsgLocation -> %% different file!
@@ -580,9 +567,9 @@ client_read3(#msg_location { guid = Guid, file = File }, Defer,
end
end.
-clear_client(CRef, State = #msstate { cref_to_guids = CTG,
+clear_client(CRef, State = #msstate { cref_to_msg_ids = CTM,
dying_clients = DyingClients }) ->
- State #msstate { cref_to_guids = dict:erase(CRef, CTG),
+ State #msstate { cref_to_msg_ids = dict:erase(CRef, CTM),
dying_clients = sets:del_element(CRef, DyingClients) }.
@@ -630,13 +617,21 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
%% CleanShutdown <=> msg location index and file_summary both
%% recovered correctly.
- DedupCacheEts = ets:new(rabbit_msg_store_dedup_cache, [set, public]),
FileHandlesEts = ets:new(rabbit_msg_store_shared_file_handles,
[ordered_set, public]),
CurFileCacheEts = ets:new(rabbit_msg_store_cur_file, [set, public]),
{ok, FileSizeLimit} = application:get_env(msg_store_file_size_limit),
+ {ok, GCPid} = rabbit_msg_store_gc:start_link(
+ #gc_state { dir = Dir,
+ index_module = IndexModule,
+ index_state = IndexState,
+ file_summary_ets = FileSummaryEts,
+ file_handles_ets = FileHandlesEts,
+ msg_store = self()
+ }),
+
State = #msstate { dir = Dir,
index_module = IndexModule,
index_state = IndexState,
@@ -648,17 +643,16 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
sum_valid_data = 0,
sum_file_size = 0,
pending_gc_completion = orddict:new(),
- gc_pid = undefined,
+ gc_pid = GCPid,
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts,
dying_clients = sets:new(),
clients = Clients,
successfully_recovered = CleanShutdown,
file_size_limit = FileSizeLimit,
- cref_to_guids = dict:new()
- },
+ cref_to_msg_ids = dict:new()
+ },
%% If we didn't recover the msg location index then we need to
%% rebuild it now.
@@ -671,17 +665,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
{ok, Offset} = file_handle_cache:position(CurHdl, Offset),
ok = file_handle_cache:truncate(CurHdl),
- {ok, GCPid} = rabbit_msg_store_gc:start_link(
- #gc_state { dir = Dir,
- index_module = IndexModule,
- index_state = IndexState,
- file_summary_ets = FileSummaryEts,
- file_handles_ets = FileHandlesEts,
- msg_store = self()
- }),
-
- {ok, maybe_compact(
- State1 #msstate { current_file_handle = CurHdl, gc_pid = GCPid }),
+ {ok, maybe_compact(State1 #msstate { current_file_handle = CurHdl }),
hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -689,7 +673,7 @@ prioritise_call(Msg, _From, _State) ->
case Msg of
successfully_recovered_state -> 7;
{new_client_state, _Ref, _MODC, _CloseFDsFun} -> 7;
- {read, _Guid} -> 2;
+ {read, _MsgId} -> 2;
_ -> 0
end.
@@ -712,29 +696,27 @@ handle_call(successfully_recovered_state, _From, State) ->
reply(State #msstate.successfully_recovered, State);
handle_call({new_client_state, CRef, MsgOnDiskFun, CloseFDsFun}, _From,
- State = #msstate { dir = Dir,
- index_state = IndexState,
- index_module = IndexModule,
- file_handles_ets = FileHandlesEts,
- file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts,
- cur_file_cache_ets = CurFileCacheEts,
- clients = Clients,
- gc_pid = GCPid }) ->
+ State = #msstate { dir = Dir,
+ index_state = IndexState,
+ index_module = IndexModule,
+ file_handles_ets = FileHandlesEts,
+ file_summary_ets = FileSummaryEts,
+ cur_file_cache_ets = CurFileCacheEts,
+ clients = Clients,
+ gc_pid = GCPid }) ->
Clients1 = dict:store(CRef, {MsgOnDiskFun, CloseFDsFun}, Clients),
- reply({IndexState, IndexModule, Dir, GCPid,
- FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts},
- State #msstate { clients = Clients1 });
+ reply({IndexState, IndexModule, Dir, GCPid, FileHandlesEts, FileSummaryEts,
+ CurFileCacheEts}, State #msstate { clients = Clients1 });
handle_call({client_terminate, CRef}, _From, State) ->
reply(ok, clear_client(CRef, State));
-handle_call({read, Guid}, From, State) ->
- State1 = read_message(Guid, From, State),
+handle_call({read, MsgId}, From, State) ->
+ State1 = read_message(MsgId, From, State),
noreply(State1);
-handle_call({contains, Guid}, From, State) ->
- State1 = contains_message(Guid, From, State),
+handle_call({contains, MsgId}, From, State) ->
+ State1 = contains_message(MsgId, From, State),
noreply(State1).
handle_cast({client_dying, CRef},
@@ -747,53 +729,47 @@ handle_cast({client_delete, CRef}, State = #msstate { clients = Clients }) ->
State1 = State #msstate { clients = dict:erase(CRef, Clients) },
noreply(remove_message(CRef, CRef, clear_client(CRef, State1)));
-handle_cast({write, CRef, Guid},
+handle_cast({write, CRef, MsgId},
State = #msstate { cur_file_cache_ets = CurFileCacheEts }) ->
- true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}),
- [{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid),
+ true = 0 =< ets:update_counter(CurFileCacheEts, MsgId, {3, -1}),
+ [{MsgId, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, MsgId),
noreply(
- case write_action(should_mask_action(CRef, Guid, State), Guid, State) of
+ case write_action(should_mask_action(CRef, MsgId, State), MsgId, State) of
{write, State1} ->
- write_message(CRef, Guid, Msg, State1);
+ write_message(CRef, MsgId, Msg, State1);
{ignore, CurFile, State1 = #msstate { current_file = CurFile }} ->
State1;
{ignore, _File, State1} ->
- true = ets:delete_object(CurFileCacheEts, {Guid, Msg, 0}),
+ true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}),
State1;
{confirm, CurFile, State1 = #msstate { current_file = CurFile }}->
- record_pending_confirm(CRef, Guid, State1);
+ record_pending_confirm(CRef, MsgId, State1);
{confirm, _File, State1} ->
- true = ets:delete_object(CurFileCacheEts, {Guid, Msg, 0}),
+ true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}),
update_pending_confirms(
- fun (MsgOnDiskFun, CTG) ->
- MsgOnDiskFun(gb_sets:singleton(Guid), written),
- CTG
+ fun (MsgOnDiskFun, CTM) ->
+ MsgOnDiskFun(gb_sets:singleton(MsgId), written),
+ CTM
end, CRef, State1)
end);
-handle_cast({remove, CRef, Guids}, State) ->
+handle_cast({remove, CRef, MsgIds}, State) ->
State1 = lists:foldl(
- fun (Guid, State2) -> remove_message(Guid, CRef, State2) end,
- State, Guids),
- noreply(maybe_compact(
- client_confirm(CRef, gb_sets:from_list(Guids), removed, State1)));
-
-handle_cast({release, Guids}, State =
- #msstate { dedup_cache_ets = DedupCacheEts }) ->
- lists:foreach(
- fun (Guid) -> decrement_cache(DedupCacheEts, Guid) end, Guids),
- noreply(State);
+ fun (MsgId, State2) -> remove_message(MsgId, CRef, State2) end,
+ State, MsgIds),
+ noreply(maybe_compact(client_confirm(CRef, gb_sets:from_list(MsgIds),
+ removed, State1)));
-handle_cast({sync, Guids, K},
+handle_cast({sync, MsgIds, K},
State = #msstate { current_file = CurFile,
current_file_handle = CurHdl,
on_sync = Syncs }) ->
{ok, SyncOffset} = file_handle_cache:last_sync_offset(CurHdl),
- case lists:any(fun (Guid) ->
+ case lists:any(fun (MsgId) ->
#msg_location { file = File, offset = Offset } =
- index_lookup(Guid, State),
+ index_lookup(MsgId, State),
File =:= CurFile andalso Offset >= SyncOffset
- end, Guids) of
+ end, MsgIds) of
false -> K(),
noreply(State);
true -> noreply(State #msstate { on_sync = [K | Syncs] })
@@ -837,7 +813,6 @@ terminate(_Reason, State = #msstate { index_state = IndexState,
gc_pid = GCPid,
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts,
clients = Clients,
dir = Dir }) ->
@@ -847,22 +822,24 @@ terminate(_Reason, State = #msstate { index_state = IndexState,
State1 = case CurHdl of
undefined -> State;
_ -> State2 = internal_sync(State),
- file_handle_cache:close(CurHdl),
+ ok = file_handle_cache:close(CurHdl),
State2
end,
State3 = close_all_handles(State1),
- store_file_summary(FileSummaryEts, Dir),
- [ets:delete(T) ||
- T <- [FileSummaryEts, DedupCacheEts, FileHandlesEts, CurFileCacheEts]],
+ ok = store_file_summary(FileSummaryEts, Dir),
+ [true = ets:delete(T) ||
+ T <- [FileSummaryEts, FileHandlesEts, CurFileCacheEts]],
IndexModule:terminate(IndexState),
- store_recovery_terms([{client_refs, dict:fetch_keys(Clients)},
- {index_module, IndexModule}], Dir),
+ ok = store_recovery_terms([{client_refs, dict:fetch_keys(Clients)},
+ {index_module, IndexModule}], Dir),
State3 #msstate { index_state = undefined,
current_file_handle = undefined }.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
+format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
+
%%----------------------------------------------------------------------------
%% general helper functions
%%----------------------------------------------------------------------------
@@ -875,16 +852,16 @@ reply(Reply, State) ->
{State1, Timeout} = next_state(State),
{reply, Reply, State1, Timeout}.
-next_state(State = #msstate { sync_timer_ref = undefined,
- on_sync = Syncs,
- cref_to_guids = CTG }) ->
- case {Syncs, dict:size(CTG)} of
+next_state(State = #msstate { sync_timer_ref = undefined,
+ on_sync = Syncs,
+ cref_to_msg_ids = CTM }) ->
+ case {Syncs, dict:size(CTM)} of
{[], 0} -> {State, hibernate};
_ -> {start_sync_timer(State), 0}
end;
-next_state(State = #msstate { on_sync = Syncs,
- cref_to_guids = CTG }) ->
- case {Syncs, dict:size(CTG)} of
+next_state(State = #msstate { on_sync = Syncs,
+ cref_to_msg_ids = CTM }) ->
+ case {Syncs, dict:size(CTM)} of
{[], 0} -> {stop_sync_timer(State), hibernate};
_ -> {State, 0}
end.
@@ -901,66 +878,69 @@ stop_sync_timer(State = #msstate { sync_timer_ref = TRef }) ->
internal_sync(State = #msstate { current_file_handle = CurHdl,
on_sync = Syncs,
- cref_to_guids = CTG }) ->
+ cref_to_msg_ids = CTM }) ->
State1 = stop_sync_timer(State),
- CGs = dict:fold(fun (CRef, Guids, NS) ->
- case gb_sets:is_empty(Guids) of
+ CGs = dict:fold(fun (CRef, MsgIds, NS) ->
+ case gb_sets:is_empty(MsgIds) of
true -> NS;
- false -> [{CRef, Guids} | NS]
+ false -> [{CRef, MsgIds} | NS]
end
- end, [], CTG),
- case {Syncs, CGs} of
- {[], []} -> ok;
- _ -> file_handle_cache:sync(CurHdl)
- end,
+ end, [], CTM),
+ ok = case {Syncs, CGs} of
+ {[], []} -> ok;
+ _ -> file_handle_cache:sync(CurHdl)
+ end,
[K() || K <- lists:reverse(Syncs)],
- [client_confirm(CRef, Guids, written, State1) || {CRef, Guids} <- CGs],
- State1 #msstate { cref_to_guids = dict:new(), on_sync = [] }.
+ State2 = lists:foldl(
+ fun ({CRef, MsgIds}, StateN) ->
+ client_confirm(CRef, MsgIds, written, StateN)
+ end, State1, CGs),
+ State2 #msstate { on_sync = [] }.
-write_action({true, not_found}, _Guid, State) ->
+write_action({true, not_found}, _MsgId, State) ->
{ignore, undefined, State};
-write_action({true, #msg_location { file = File }}, _Guid, State) ->
+write_action({true, #msg_location { file = File }}, _MsgId, State) ->
{ignore, File, State};
-write_action({false, not_found}, _Guid, State) ->
+write_action({false, not_found}, _MsgId, State) ->
{write, State};
write_action({Mask, #msg_location { ref_count = 0, file = File,
total_size = TotalSize }},
- Guid, State = #msstate { file_summary_ets = FileSummaryEts }) ->
+ MsgId, State = #msstate { file_summary_ets = FileSummaryEts }) ->
case {Mask, ets:lookup(FileSummaryEts, File)} of
{false, [#file_summary { locked = true }]} ->
- ok = index_delete(Guid, State),
+ ok = index_delete(MsgId, State),
{write, State};
{false_if_increment, [#file_summary { locked = true }]} ->
- %% The msg for Guid is older than the client death
+ %% The msg for MsgId is older than the client death
%% message, but as it is being GC'd currently we'll have
%% to write a new copy, which will then be younger, so
%% ignore this write.
{ignore, File, State};
{_Mask, [#file_summary {}]} ->
- ok = index_update_ref_count(Guid, 1, State),
+ ok = index_update_ref_count(MsgId, 1, State),
State1 = adjust_valid_total_size(File, TotalSize, State),
{confirm, File, State1}
end;
write_action({_Mask, #msg_location { ref_count = RefCount, file = File }},
- Guid, State) ->
- ok = index_update_ref_count(Guid, RefCount + 1, State),
+ MsgId, State) ->
+ ok = index_update_ref_count(MsgId, RefCount + 1, State),
%% We already know about it, just update counter. Only update
%% field otherwise bad interaction with concurrent GC
{confirm, File, State}.
-write_message(CRef, Guid, Msg, State) ->
- write_message(Guid, Msg, record_pending_confirm(CRef, Guid, State)).
+write_message(CRef, MsgId, Msg, State) ->
+ write_message(MsgId, Msg, record_pending_confirm(CRef, MsgId, State)).
-write_message(Guid, Msg,
+write_message(MsgId, Msg,
State = #msstate { current_file_handle = CurHdl,
current_file = CurFile,
sum_valid_data = SumValid,
sum_file_size = SumFileSize,
file_summary_ets = FileSummaryEts }) ->
{ok, CurOffset} = file_handle_cache:current_virtual_offset(CurHdl),
- {ok, TotalSize} = rabbit_msg_file:append(CurHdl, Guid, Msg),
+ {ok, TotalSize} = rabbit_msg_file:append(CurHdl, MsgId, Msg),
ok = index_insert(
- #msg_location { guid = Guid, ref_count = 1, file = CurFile,
+ #msg_location { msg_id = MsgId, ref_count = 1, file = CurFile,
offset = CurOffset, total_size = TotalSize }, State),
[#file_summary { right = undefined, locked = false }] =
ets:lookup(FileSummaryEts, CurFile),
@@ -972,31 +952,23 @@ write_message(Guid, Msg,
sum_valid_data = SumValid + TotalSize,
sum_file_size = SumFileSize + TotalSize }).
-read_message(Guid, From,
- State = #msstate { dedup_cache_ets = DedupCacheEts }) ->
- case index_lookup_positive_ref_count(Guid, State) of
- not_found ->
- gen_server2:reply(From, not_found),
- State;
- MsgLocation ->
- case fetch_and_increment_cache(DedupCacheEts, Guid) of
- not_found -> read_message1(From, MsgLocation, State);
- Msg -> gen_server2:reply(From, {ok, Msg}),
- State
- end
+read_message(MsgId, From, State) ->
+ case index_lookup_positive_ref_count(MsgId, State) of
+ not_found -> gen_server2:reply(From, not_found),
+ State;
+ MsgLocation -> read_message1(From, MsgLocation, State)
end.
-read_message1(From, #msg_location { guid = Guid, ref_count = RefCount,
- file = File, offset = Offset } = MsgLoc,
+read_message1(From, #msg_location { msg_id = MsgId, file = File,
+ offset = Offset } = MsgLoc,
State = #msstate { current_file = CurFile,
current_file_handle = CurHdl,
file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts }) ->
case File =:= CurFile of
true -> {Msg, State1} =
%% can return [] if msg in file existed on startup
- case ets:lookup(CurFileCacheEts, Guid) of
+ case ets:lookup(CurFileCacheEts, MsgId) of
[] ->
{ok, RawOffSet} =
file_handle_cache:current_raw_offset(CurHdl),
@@ -1004,10 +976,8 @@ read_message1(From, #msg_location { guid = Guid, ref_count = RefCount,
true -> file_handle_cache:flush(CurHdl);
false -> ok
end,
- read_from_disk(MsgLoc, State, DedupCacheEts);
- [{Guid, Msg1, _CacheRefCount}] ->
- ok = maybe_insert_into_cache(
- DedupCacheEts, RefCount, Guid, Msg1),
+ read_from_disk(MsgLoc, State);
+ [{MsgId, Msg1, _CacheRefCount}] ->
{Msg1, State}
end,
gen_server2:reply(From, {ok, Msg}),
@@ -1015,56 +985,51 @@ read_message1(From, #msg_location { guid = Guid, ref_count = RefCount,
false -> [#file_summary { locked = Locked }] =
ets:lookup(FileSummaryEts, File),
case Locked of
- true -> add_to_pending_gc_completion({read, Guid, From},
+ true -> add_to_pending_gc_completion({read, MsgId, From},
File, State);
- false -> {Msg, State1} =
- read_from_disk(MsgLoc, State, DedupCacheEts),
+ false -> {Msg, State1} = read_from_disk(MsgLoc, State),
gen_server2:reply(From, {ok, Msg}),
State1
end
end.
-read_from_disk(#msg_location { guid = Guid, ref_count = RefCount,
- file = File, offset = Offset,
- total_size = TotalSize },
- State, DedupCacheEts) ->
+read_from_disk(#msg_location { msg_id = MsgId, file = File, offset = Offset,
+ total_size = TotalSize }, State) ->
{Hdl, State1} = get_read_handle(File, State),
{ok, Offset} = file_handle_cache:position(Hdl, Offset),
- {ok, {Guid, Msg}} =
+ {ok, {MsgId, Msg}} =
case rabbit_msg_file:read(Hdl, TotalSize) of
- {ok, {Guid, _}} = Obj ->
+ {ok, {MsgId, _}} = Obj ->
Obj;
Rest ->
{error, {misread, [{old_state, State},
{file_num, File},
{offset, Offset},
- {guid, Guid},
+ {msg_id, MsgId},
{read, Rest},
{proc_dict, get()}
]}}
end,
- ok = maybe_insert_into_cache(DedupCacheEts, RefCount, Guid, Msg),
{Msg, State1}.
-contains_message(Guid, From,
+contains_message(MsgId, From,
State = #msstate { pending_gc_completion = Pending }) ->
- case index_lookup_positive_ref_count(Guid, State) of
+ case index_lookup_positive_ref_count(MsgId, State) of
not_found ->
gen_server2:reply(From, false),
State;
#msg_location { file = File } ->
case orddict:is_key(File, Pending) of
true -> add_to_pending_gc_completion(
- {contains, Guid, From}, File, State);
+ {contains, MsgId, From}, File, State);
false -> gen_server2:reply(From, true),
State
end
end.
-remove_message(Guid, CRef,
- State = #msstate { file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts }) ->
- case should_mask_action(CRef, Guid, State) of
+remove_message(MsgId, CRef,
+ State = #msstate { file_summary_ets = FileSummaryEts }) ->
+ case should_mask_action(CRef, MsgId, State) of
{true, _Location} ->
State;
{false_if_increment, #msg_location { ref_count = 0 }} ->
@@ -1077,25 +1042,24 @@ remove_message(Guid, CRef,
total_size = TotalSize }} when RefCount > 0 ->
%% only update field, otherwise bad interaction with
%% concurrent GC
- Dec =
- fun () -> index_update_ref_count(Guid, RefCount - 1, State) end,
+ Dec = fun () ->
+ index_update_ref_count(MsgId, RefCount - 1, State)
+ end,
case RefCount of
%% don't remove from CUR_FILE_CACHE_ETS_NAME here
%% because there may be further writes in the mailbox
%% for the same msg.
- 1 -> ok = remove_cache_entry(DedupCacheEts, Guid),
- case ets:lookup(FileSummaryEts, File) of
+ 1 -> case ets:lookup(FileSummaryEts, File) of
[#file_summary { locked = true }] ->
add_to_pending_gc_completion(
- {remove, Guid, CRef}, File, State);
+ {remove, MsgId, CRef}, File, State);
[#file_summary {}] ->
ok = Dec(),
delete_file_if_empty(
File, adjust_valid_total_size(File, -TotalSize,
State))
end;
- _ -> ok = decrement_cache(DedupCacheEts, Guid),
- ok = Dec(),
+ _ -> ok = Dec(),
State
end
end.
@@ -1115,12 +1079,12 @@ run_pending(Files, State) ->
lists:reverse(orddict:fetch(File, Pending)))
end, State, Files).
-run_pending_action({read, Guid, From}, State) ->
- read_message(Guid, From, State);
-run_pending_action({contains, Guid, From}, State) ->
- contains_message(Guid, From, State);
-run_pending_action({remove, Guid, CRef}, State) ->
- remove_message(Guid, CRef, State).
+run_pending_action({read, MsgId, From}, State) ->
+ read_message(MsgId, From, State);
+run_pending_action({contains, MsgId, From}, State) ->
+ contains_message(MsgId, From, State);
+run_pending_action({remove, MsgId, CRef}, State) ->
+ remove_message(MsgId, CRef, State).
safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailThunk) ->
try
@@ -1142,44 +1106,46 @@ orddict_store(Key, Val, Dict) ->
false = orddict:is_key(Key, Dict),
orddict:store(Key, Val, Dict).
-update_pending_confirms(Fun, CRef, State = #msstate { clients = Clients,
- cref_to_guids = CTG }) ->
+%% Applies Fun to the cref_to_msg_ids dict on behalf of client CRef.
+%% CRef is looked up in the clients dict with dict:fetch/2, so an
+%% unknown CRef raises. If the client's first tuple element is
+%% 'undefined' (no MsgOnDiskFun), State is returned untouched.
+%% Otherwise Fun(MsgOnDiskFun, CTM) is called and its result becomes
+%% the new cref_to_msg_ids field of the returned state.
+update_pending_confirms(Fun, CRef,
+ State = #msstate { clients = Clients,
+ cref_to_msg_ids = CTM }) ->
case dict:fetch(CRef, Clients) of
{undefined, _CloseFDsFun} -> State;
- {MsgOnDiskFun, _CloseFDsFun} -> CTG1 = Fun(MsgOnDiskFun, CTG),
- State #msstate { cref_to_guids = CTG1 }
+ {MsgOnDiskFun, _CloseFDsFun} -> CTM1 = Fun(MsgOnDiskFun, CTM),
+ State #msstate {
+ cref_to_msg_ids = CTM1 }
end.
-record_pending_confirm(CRef, Guid, State) ->
+%% Records MsgId as awaiting confirmation for client CRef: MsgId is
+%% added to the gb_set stored under CRef, or a singleton set is
+%% created when CRef has no entry yet. Goes through
+%% update_pending_confirms/3 and ignores the MsgOnDiskFun argument.
+record_pending_confirm(CRef, MsgId, State) ->
update_pending_confirms(
- fun (_MsgOnDiskFun, CTG) ->
- dict:update(CRef, fun (Guids) -> gb_sets:add(Guid, Guids) end,
- gb_sets:singleton(Guid), CTG)
+ fun (_MsgOnDiskFun, CTM) ->
+ dict:update(CRef, fun (MsgIds) -> gb_sets:add(MsgId, MsgIds) end,
+ gb_sets:singleton(MsgId), CTM)
end, CRef, State).
-client_confirm(CRef, Guids, ActionTaken, State) ->
+%% Notifies client CRef about MsgIds and clears them from its pending
+%% set. MsgOnDiskFun(MsgIds, ActionTaken) is invoked first; then
+%% MsgIds is subtracted (gb_sets:difference/2, so MsgIds must be a
+%% gb_set) from the set stored under CRef. The CRef entry is erased
+%% once its set becomes empty, and the dict is left unchanged when
+%% CRef has no entry. Goes through update_pending_confirms/3.
+client_confirm(CRef, MsgIds, ActionTaken, State) ->
update_pending_confirms(
- fun (MsgOnDiskFun, CTG) ->
- MsgOnDiskFun(Guids, ActionTaken),
- case dict:find(CRef, CTG) of
- {ok, Gs} -> Guids1 = gb_sets:difference(Gs, Guids),
- case gb_sets:is_empty(Guids1) of
- true -> dict:erase(CRef, CTG);
- false -> dict:store(CRef, Guids1, CTG)
+ fun (MsgOnDiskFun, CTM) ->
+ MsgOnDiskFun(MsgIds, ActionTaken),
+ case dict:find(CRef, CTM) of
+ {ok, Gs} -> MsgIds1 = gb_sets:difference(Gs, MsgIds),
+ case gb_sets:is_empty(MsgIds1) of
+ true -> dict:erase(CRef, CTM);
+ false -> dict:store(CRef, MsgIds1, CTM)
end;
- error -> CTG
+ error -> CTM
end
end, CRef, State).
-%% Detect whether the Guid is older or younger than the client's death
+%% Detect whether the MsgId is older or younger than the client's death
%% msg (if there is one). If the msg is older than the client death
%% msg, and it has a 0 ref_count we must only alter the ref_count, not
%% rewrite the msg - rewriting it would make it younger than the death
%% msg and thus should be ignored. Note that this (correctly) returns
%% false when testing to remove the death msg itself.
-should_mask_action(CRef, Guid,
+should_mask_action(CRef, MsgId,
State = #msstate { dying_clients = DyingClients }) ->
- case {sets:is_element(CRef, DyingClients), index_lookup(Guid, State)} of
+ case {sets:is_element(CRef, DyingClients), index_lookup(MsgId, State)} of
{false, Location} ->
{false, Location};
{true, not_found} ->
@@ -1252,7 +1218,7 @@ safe_file_delete(File, Dir, FileHandlesEts) ->
close_all_indicated(#client_msstate { file_handles_ets = FileHandlesEts,
client_ref = Ref } =
- CState) ->
+ CState) ->
Objs = ets:match_object(FileHandlesEts, {{Ref, '_'}, close}),
{ok, lists:foldl(fun ({Key = {_Ref, File}, close}, CStateM) ->
true = ets:delete(FileHandlesEts, Key),
@@ -1316,48 +1282,14 @@ list_sorted_file_names(Dir, Ext) ->
%% message cache helper functions
%%----------------------------------------------------------------------------
-maybe_insert_into_cache(DedupCacheEts, RefCount, Guid, Msg)
- when RefCount > 1 ->
- update_msg_cache(DedupCacheEts, Guid, Msg);
-maybe_insert_into_cache(_DedupCacheEts, _RefCount, _Guid, _Msg) ->
- ok.
-
-update_msg_cache(CacheEts, Guid, Msg) ->
- case ets:insert_new(CacheEts, {Guid, Msg, 1}) of
+update_msg_cache(CacheEts, MsgId, Msg) ->
+ case ets:insert_new(CacheEts, {MsgId, Msg, 1}) of
true -> ok;
false -> safe_ets_update_counter_ok(
- CacheEts, Guid, {3, +1},
- fun () -> update_msg_cache(CacheEts, Guid, Msg) end)
+ CacheEts, MsgId, {3, +1},
+ fun () -> update_msg_cache(CacheEts, MsgId, Msg) end)
end.
-remove_cache_entry(DedupCacheEts, Guid) ->
- true = ets:delete(DedupCacheEts, Guid),
- ok.
-
-fetch_and_increment_cache(DedupCacheEts, Guid) ->
- case ets:lookup(DedupCacheEts, Guid) of
- [] ->
- not_found;
- [{_Guid, Msg, _RefCount}] ->
- safe_ets_update_counter_ok(
- DedupCacheEts, Guid, {3, +1},
- %% someone has deleted us in the meantime, insert us
- fun () -> ok = update_msg_cache(DedupCacheEts, Guid, Msg) end),
- Msg
- end.
-
-decrement_cache(DedupCacheEts, Guid) ->
- true = safe_ets_update_counter(
- DedupCacheEts, Guid, {3, -1},
- fun (N) when N =< 0 -> true = ets:delete(DedupCacheEts, Guid);
- (_N) -> true
- end,
- %% Guid is not in there because although it's been
- %% delivered, it's never actually been read (think:
- %% persistent message held in RAM)
- fun () -> true end),
- ok.
-
%%----------------------------------------------------------------------------
%% index
%%----------------------------------------------------------------------------
@@ -1460,8 +1392,8 @@ recover_file_summary(false, _Dir) ->
recover_file_summary(true, Dir) ->
Path = filename:join(Dir, ?FILE_SUMMARY_FILENAME),
case ets:file2tab(Path) of
- {ok, Tid} -> file:delete(Path),
- {true, Tid};
+ {ok, Tid} -> ok = file:delete(Path),
+ {true, Tid};
{error, _Error} -> recover_file_summary(false, Dir)
end.
@@ -1469,19 +1401,19 @@ count_msg_refs(Gen, Seed, State) ->
case Gen(Seed) of
finished ->
ok;
- {_Guid, 0, Next} ->
+ {_MsgId, 0, Next} ->
count_msg_refs(Gen, Next, State);
- {Guid, Delta, Next} ->
- ok = case index_lookup(Guid, State) of
+ {MsgId, Delta, Next} ->
+ ok = case index_lookup(MsgId, State) of
not_found ->
- index_insert(#msg_location { guid = Guid,
+ index_insert(#msg_location { msg_id = MsgId,
file = undefined,
ref_count = Delta },
State);
#msg_location { ref_count = RefCount } = StoreEntry ->
NewRefCount = RefCount + Delta,
case NewRefCount of
- 0 -> index_delete(Guid, State);
+ 0 -> index_delete(MsgId, State);
_ -> index_update(StoreEntry #msg_location {
ref_count = NewRefCount },
State)
@@ -1525,15 +1457,17 @@ scan_file_for_valid_messages(Dir, FileName) ->
case open_file(Dir, FileName, ?READ_MODE) of
{ok, Hdl} -> Valid = rabbit_msg_file:scan(
Hdl, filelib:file_size(
- form_filename(Dir, FileName))),
- %% if something really bad has happened,
- %% the close could fail, but ignore
- file_handle_cache:close(Hdl),
+ form_filename(Dir, FileName)),
+ fun scan_fun/2, []),
+ ok = file_handle_cache:close(Hdl),
Valid;
{error, enoent} -> {ok, [], 0};
{error, Reason} -> {error, {unable_to_scan_file, FileName, Reason}}
end.
+%% Fold callback handed to rabbit_msg_file:scan/4 by
+%% scan_file_for_valid_messages/2. Discards the message payload and
+%% prepends {MsgId, TotalSize, Offset} to the accumulator list.
+scan_fun({MsgId, TotalSize, Offset, _Msg}, Acc) ->
+ [{MsgId, TotalSize, Offset} | Acc].
+
%% Takes the list in *ascending* order (i.e. eldest message
%% first). This is the opposite of what scan_file_for_valid_messages
%% produces. The list of msgs that is produced is youngest first.
@@ -1581,8 +1515,8 @@ build_index(Gatherer, Left, [],
sum_file_size = SumFileSize }) ->
case gatherer:out(Gatherer) of
empty ->
+ unlink(Gatherer),
ok = gatherer:stop(Gatherer),
- ok = rabbit_misc:unlink_and_capture_exit(Gatherer),
ok = index_delete_by_file(undefined, State),
Offset = case ets:lookup(FileSummaryEts, Left) of
[] -> 0;
@@ -1611,8 +1545,8 @@ build_index_worker(Gatherer, State = #msstate { dir = Dir },
scan_file_for_valid_messages(Dir, filenum_to_name(File)),
{ValidMessages, ValidTotalSize} =
lists:foldl(
- fun (Obj = {Guid, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
- case index_lookup(Guid, State) of
+ fun (Obj = {MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
+ case index_lookup(MsgId, State) of
#msg_location { file = undefined } = StoreEntry ->
ok = index_update(StoreEntry #msg_location {
file = File, offset = Offset,
@@ -1630,7 +1564,7 @@ build_index_worker(Gatherer, State = #msstate { dir = Dir },
%% file size.
[] -> {undefined, case ValidMessages of
[] -> 0;
- _ -> {_Guid, TotalSize, Offset} =
+ _ -> {_MsgId, TotalSize, Offset} =
lists:last(ValidMessages),
Offset + TotalSize
end};
@@ -1685,8 +1619,8 @@ maybe_compact(State = #msstate { sum_valid_data = SumValid,
pending_gc_completion = Pending,
file_summary_ets = FileSummaryEts,
file_size_limit = FileSizeLimit })
- when (SumFileSize > 2 * FileSizeLimit andalso
- (SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION) ->
+ when SumFileSize > 2 * FileSizeLimit andalso
+ (SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION ->
%% TODO: the algorithm here is sub-optimal - it may result in a
%% complete traversal of FileSummaryEts.
case ets:first(FileSummaryEts) of
@@ -1749,10 +1683,10 @@ delete_file_if_empty(File, State = #msstate {
locked = false }] =
ets:lookup(FileSummaryEts, File),
case ValidData of
- 0 -> %% don't delete the file_summary_ets entry for File here
- %% because we could have readers which need to be able to
- %% decrement the readers count.
- true = ets:update_element(FileSummaryEts, File,
+ %% don't delete the file_summary_ets entry for File here
+ %% because we could have readers which need to be able to
+ %% decrement the readers count.
+ 0 -> true = ets:update_element(FileSummaryEts, File,
{#file_summary.locked, true}),
ok = rabbit_msg_store_gc:delete(GCPid, File),
Pending1 = orddict_store(File, [], Pending),
@@ -1805,17 +1739,17 @@ combine_files(Source, Destination,
dir = Dir,
msg_store = Server }) ->
[#file_summary {
- readers = 0,
- left = Destination,
- valid_total_size = SourceValid,
- file_size = SourceFileSize,
- locked = true }] = ets:lookup(FileSummaryEts, Source),
+ readers = 0,
+ left = Destination,
+ valid_total_size = SourceValid,
+ file_size = SourceFileSize,
+ locked = true }] = ets:lookup(FileSummaryEts, Source),
[#file_summary {
- readers = 0,
- right = Source,
- valid_total_size = DestinationValid,
- file_size = DestinationFileSize,
- locked = true }] = ets:lookup(FileSummaryEts, Destination),
+ readers = 0,
+ right = Source,
+ valid_total_size = DestinationValid,
+ file_size = DestinationFileSize,
+ locked = true }] = ets:lookup(FileSummaryEts, Destination),
SourceName = filenum_to_name(Source),
DestinationName = filenum_to_name(Destination),
@@ -1895,8 +1829,8 @@ load_and_vacuum_message_file(File, #gc_state { dir = Dir,
scan_file_for_valid_messages(Dir, filenum_to_name(File)),
%% foldl will reverse so will end up with msgs in ascending offset order
lists:foldl(
- fun ({Guid, TotalSize, Offset}, Acc = {List, Size}) ->
- case Index:lookup(Guid, IndexState) of
+ fun ({MsgId, TotalSize, Offset}, Acc = {List, Size}) ->
+ case Index:lookup(MsgId, IndexState) of
#msg_location { file = File, total_size = TotalSize,
offset = Offset, ref_count = 0 } = Entry ->
ok = Index:delete_object(Entry, IndexState),
@@ -1921,13 +1855,13 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
end,
case
lists:foldl(
- fun (#msg_location { guid = Guid, offset = Offset,
+ fun (#msg_location { msg_id = MsgId, offset = Offset,
total_size = TotalSize },
{CurOffset, Block = {BlockStart, BlockEnd}}) ->
%% CurOffset is in the DestinationFile.
%% Offset, BlockStart and BlockEnd are in the SourceFile
%% update MsgLocation to reflect change of file and offset
- ok = Index:update_fields(Guid,
+ ok = Index:update_fields(MsgId,
[{#msg_location.file, Destination},
{#msg_location.offset, CurOffset}],
IndexState),
@@ -1958,3 +1892,54 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
{got, FinalOffsetZ},
{destination, Destination}]}
end.
+
+%% Upgrade helper. Deletes ?CLEAN_FILENAME from the store directory
+%% (BaseDir joined with the Store atom's name), treating an already
+%% missing file as success; any other file:delete/1 error is a
+%% case_clause crash. Presumably the absence of that file makes the
+%% store run a full recovery on its next start - confirm against
+%% init/1. Afterwards recover_crashed_compactions/1 is run. Returns ok.
+%% NOTE(review): Dir is computed here but BaseDir, not Dir, is what
+%% gets passed to recover_crashed_compactions/1 - verify that this is
+%% the intended directory.
+force_recovery(BaseDir, Store) ->
+ Dir = filename:join(BaseDir, atom_to_list(Store)),
+ case file:delete(filename:join(Dir, ?CLEAN_FILENAME)) of
+ ok -> ok;
+ {error, enoent} -> ok
+ end,
+ recover_crashed_compactions(BaseDir),
+ ok.
+
+%% Calls Fun(Path) for every name in Files, where Path is the name
+%% joined onto directory D. Each call must return ok; the first
+%% non-ok result aborts with a badmatch. Returns a list of ok atoms,
+%% one per file.
+foreach_file(D, Fun, Files) ->
+ [ok = Fun(filename:join(D, File)) || File <- Files].
+
+%% Two-directory variant: calls Fun(Path1, Path2) for every name in
+%% Files, with the same file name joined onto D1 and D2 respectively.
+%% Each call must return ok (badmatch otherwise). Returns a list of
+%% ok atoms, one per file.
+foreach_file(D1, D2, Fun, Files) ->
+ [ok = Fun(filename:join(D1, File), filename:join(D2, File)) || File <- Files].
+
+%% Upgrade helper. Rewrites every ?FILE_EXTENSION file of the store
+%% directory (BaseDir joined with the Store atom's name) through
+%% TransformFun, staging the results in the ?TRANSFORM_TMP
+%% subdirectory. The steps run strictly in this order:
+%% 1. transform each file from the store dir into the temp dir,
+%% 2. delete the original files,
+%% 3. copy the transformed files back into the store dir,
+%% 4. delete the temp copies and remove the temp dir.
+%% Returns ok. Throws {error, transform_failed_previously} if the
+%% temp dir already exists on entry; since the temp dir is only
+%% removed by the final step, a crash in any earlier step leaves it
+%% behind and makes later calls throw.
+%% NOTE(review): nothing in this function creates the temp dir
+%% (presumably transform_msg_file/3 does, via
+%% ensure_parent_dirs_exist), so with an empty file list the final
+%% file:del_dir/1 may fail its ok match - confirm whether the file
+%% list can ever be empty here.
+transform_dir(BaseDir, Store, TransformFun) ->
+ Dir = filename:join(BaseDir, atom_to_list(Store)),
+ TmpDir = filename:join(Dir, ?TRANSFORM_TMP),
+ TransformFile = fun (A, B) -> transform_msg_file(A, B, TransformFun) end,
+ CopyFile = fun (Src, Dst) -> {ok, _Bytes} = file:copy(Src, Dst), ok end,
+ case filelib:is_dir(TmpDir) of
+ true -> throw({error, transform_failed_previously});
+ false -> FileList = list_sorted_file_names(Dir, ?FILE_EXTENSION),
+ foreach_file(Dir, TmpDir, TransformFile, FileList),
+ foreach_file(Dir, fun file:delete/1, FileList),
+ foreach_file(TmpDir, Dir, CopyFile, FileList),
+ foreach_file(TmpDir, fun file:delete/1, FileList),
+ ok = file:del_dir(TmpDir)
+ end.
+
+%% Copies one message file from FileOld to FileNew, passing every
+%% message through TransformFun on the way. Both files are opened
+%% through file_handle_cache (FileNew with a write buffer of
+%% ?HANDLE_CACHE_BUFFER_SIZE) and FileOld is walked with
+%% rabbit_msg_file:scan/4. For each scanned entry the payload is
+%% decoded with binary_to_term/1: an empty binary is written back
+%% unchanged, anything else is replaced by the MsgNew taken from
+%% TransformFun(Msg), which must return {ok, MsgNew}. The result is
+%% appended to FileNew under the same MsgId. Returns ok; every step
+%% is asserted by pattern match, so any failure crashes the caller
+%% (in which case the handles are not closed by this function).
+transform_msg_file(FileOld, FileNew, TransformFun) ->
+ ok = rabbit_misc:ensure_parent_dirs_exist(FileNew),
+ {ok, RefOld} = file_handle_cache:open(FileOld, [raw, binary, read], []),
+ {ok, RefNew} = file_handle_cache:open(FileNew, [raw, binary, write],
+ [{write_buffer,
+ ?HANDLE_CACHE_BUFFER_SIZE}]),
+ {ok, _Acc, _IgnoreSize} =
+ rabbit_msg_file:scan(
+ RefOld, filelib:file_size(FileOld),
+ fun({MsgId, _Size, _Offset, BinMsg}, ok) ->
+ {ok, MsgNew} = case binary_to_term(BinMsg) of
+ <<>> -> {ok, <<>>}; %% dying client marker
+ Msg -> TransformFun(Msg)
+ end,
+ {ok, _} = rabbit_msg_file:append(RefNew, MsgId, MsgNew),
+ ok
+ end, ok),
+ ok = file_handle_cache:close(RefOld),
+ ok = file_handle_cache:close(RefNew),
+ ok.