summaryrefslogtreecommitdiff
path: root/src/rabbit_queue_index.erl
diff options
context:
space:
mode:
authorTim Watson <tim@rabbitmq.com>2013-12-09 14:19:52 +0000
committerTim Watson <tim@rabbitmq.com>2013-12-09 14:19:52 +0000
commit87c78c3029b4b0e5deadfbd2c6e1f1849ff3a832 (patch)
treec3cbfc38aaecae0e0ac73865e926f758d31574a3 /src/rabbit_queue_index.erl
parent79c905c57216c2128a74f9f976a35686a586d7c2 (diff)
downloadrabbitmq-server-87c78c3029b4b0e5deadfbd2c6e1f1849ff3a832.tar.gz
Track queue index recovery data using dets
We now hold a single dets table with all the queue recovery data. This is synchronised after we've started up (at which point all recovery data should be deleted) and just before shutting down, after the queue indexes have writen their recovery data to it.
Diffstat (limited to 'src/rabbit_queue_index.erl')
-rw-r--r--src/rabbit_queue_index.erl42
1 files changed, 16 insertions, 26 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index f69d8355..3bd54d88 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -244,24 +244,26 @@ init(Name, OnSyncFun) ->
shutdown_terms(Name) ->
#qistate { dir = Dir } = blank_state(Name),
- case read_shutdown_terms(Dir) of
- {error, _} -> [];
- {ok, Terms1} -> Terms1
+ case rabbit_recovery_indexes:read_recovery_terms(Dir) of
+ {error, _} -> [];
+ {ok, {_, Terms1}} -> Terms1
end.
recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun, OnSyncFun) ->
State = #qistate { dir = Dir } = blank_state(Name),
State1 = State #qistate { on_sync = OnSyncFun },
- CleanShutdown = detect_clean_shutdown(Dir),
+ CleanShutdown =
+ rabbit_recovery_indexes:had_clean_shutdown(Dir),
case CleanShutdown andalso MsgStoreRecovered of
true -> RecoveredCounts = proplists:get_value(segments, Terms, []),
init_clean(RecoveredCounts, State1);
false -> init_dirty(CleanShutdown, ContainsCheckFun, State1)
end.
-terminate(Terms, State) ->
- {SegmentCounts, State1 = #qistate { dir = Dir }} = terminate(State),
- store_clean_shutdown([{segments, SegmentCounts} | Terms], Dir),
+terminate(Terms, State = #qistate { dir = Dir }) ->
+ {SegmentCounts, State1} = terminate(State),
+ rabbit_recovery_indexes:store_recovery_terms(
+ Dir, [{segments, SegmentCounts} | Terms]),
State1.
delete_and_terminate(State) ->
@@ -358,8 +360,9 @@ bounds(State = #qistate { segments = Segments }) ->
{LowSeqId, NextSeqId, State}.
recover(DurableQueues) ->
- DurableDict = dict:from_list([ {queue_name_to_dir_name(Queue), Queue} ||
- Queue <- DurableQueues ]),
+ ok = rabbit_recovery_indexes:recover(),
+ DurableDict = dict:from_list([{queue_name_to_dir_name(Queue), Queue} ||
+ Queue <- DurableQueues ]),
QueuesDir = queues_dir(),
QueueDirNames = all_queue_directory_names(QueuesDir),
DurableDirectories = sets:from_list(dict:fetch_keys(DurableDict)),
@@ -370,7 +373,8 @@ recover(DurableQueues) ->
case sets:is_element(QueueDirName, DurableDirectories) of
true ->
TermsAcc1 =
- case read_shutdown_terms(QueueDirPath) of
+ case rabbit_recovery_indexes:read_recovery_terms(
+ QueueDirPath) of
{error, _} -> TermsAcc;
{ok, Terms} -> [Terms | TermsAcc]
end,
@@ -378,6 +382,8 @@ recover(DurableQueues) ->
TermsAcc1};
false ->
ok = rabbit_file:recursive_delete([QueueDirPath]),
+ rabbit_recovery_indexes:remove_recovery_terms(
+ QueueDirPath),
{DurableAcc, TermsAcc}
end
end, {[], []}, QueueDirNames),
@@ -410,22 +416,6 @@ blank_state_dir(Dir) ->
on_sync = fun (_) -> ok end,
unconfirmed = gb_sets:new() }.
-clean_filename(Dir) -> filename:join(Dir, ?CLEAN_FILENAME).
-
-detect_clean_shutdown(Dir) ->
- case rabbit_file:delete(clean_filename(Dir)) of
- ok -> true;
- {error, enoent} -> false
- end.
-
-read_shutdown_terms(Dir) ->
- rabbit_file:read_term_file(clean_filename(Dir)).
-
-store_clean_shutdown(Terms, Dir) ->
- CleanFileName = clean_filename(Dir),
- ok = rabbit_file:ensure_dir(CleanFileName),
- rabbit_file:write_term_file(CleanFileName, Terms).
-
init_clean(RecoveredCounts, State) ->
%% Load the journal. Since this is a clean recovery this (almost)
%% gets us back to where we were on shutdown.