summaryrefslogtreecommitdiff
path: root/src/rabbit_queue_index.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_queue_index.erl')
-rw-r--r--src/rabbit_queue_index.erl85
1 files changed, 49 insertions, 36 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 248c1fbc..76c0a4ef 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -31,7 +31,7 @@
-module(rabbit_queue_index).
--export([init/1, shutdown_terms/1, recover/4,
+-export([init/2, shutdown_terms/1, recover/5,
terminate/2, delete_and_terminate/1,
publish/5, deliver/2, ack/2, sync/2, flush/1, read/3,
next_segment_boundary/1, bounds/1, recover/1]).
@@ -174,7 +174,7 @@
%%----------------------------------------------------------------------------
-record(qistate, { dir, segments, journal_handle, dirty_count,
- max_journal_entries }).
+ max_journal_entries, on_sync, unsynced_guids }).
-record(segment, { num, path, journal_entries, unacked }).
@@ -195,21 +195,24 @@
})).
-type(seq_id() :: integer()).
-type(seg_dict() :: {dict(), [segment()]}).
+-type(on_sync_fun() :: fun ((gb_set()) -> ok)).
-type(qistate() :: #qistate { dir :: file:filename(),
segments :: 'undefined' | seg_dict(),
journal_handle :: hdl(),
dirty_count :: integer(),
- max_journal_entries :: non_neg_integer()
+ max_journal_entries :: non_neg_integer(),
+ on_sync :: on_sync_fun(),
+ unsynced_guids :: [rabbit_guid:guid()]
}).
-type(startup_fun_state() ::
- {(fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A})),
+ {fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A}),
A}).
-type(shutdown_terms() :: [any()]).
--spec(init/1 :: (rabbit_amqqueue:name()) -> qistate()).
+-spec(init/2 :: (rabbit_amqqueue:name(), on_sync_fun()) -> qistate()).
-spec(shutdown_terms/1 :: (rabbit_amqqueue:name()) -> shutdown_terms()).
--spec(recover/4 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(),
- fun ((rabbit_guid:guid()) -> boolean())) ->
+-spec(recover/5 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(),
+ fun ((rabbit_guid:guid()) -> boolean()), on_sync_fun()) ->
{'undefined' | non_neg_integer(), qistate()}).
-spec(terminate/2 :: ([any()], qistate()) -> qistate()).
-spec(delete_and_terminate/1 :: (qistate()) -> qistate()).
@@ -227,8 +230,8 @@
-spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()).
-spec(bounds/1 :: (qistate()) ->
{non_neg_integer(), non_neg_integer(), qistate()}).
--spec(recover/1 ::
- ([rabbit_amqqueue:name()]) -> {[[any()]], startup_fun_state()}).
+-spec(recover/1 :: ([rabbit_amqqueue:name()]) ->
+ {[[any()]], startup_fun_state()}).
-spec(add_queue_ttl/0 :: () -> 'ok').
@@ -239,10 +242,10 @@
%% public API
%%----------------------------------------------------------------------------
-init(Name) ->
+init(Name, OnSyncFun) ->
State = #qistate { dir = Dir } = blank_state(Name),
false = filelib:is_file(Dir), %% is_file == is file or dir
- State.
+ State #qistate { on_sync = OnSyncFun }.
shutdown_terms(Name) ->
#qistate { dir = Dir } = blank_state(Name),
@@ -251,13 +254,14 @@ shutdown_terms(Name) ->
{ok, Terms1} -> Terms1
end.
-recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun) ->
+recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun, OnSyncFun) ->
State = #qistate { dir = Dir } = blank_state(Name),
+ State1 = State #qistate { on_sync = OnSyncFun },
CleanShutdown = detect_clean_shutdown(Dir),
case CleanShutdown andalso MsgStoreRecovered of
true -> RecoveredCounts = proplists:get_value(segments, Terms, []),
- init_clean(RecoveredCounts, State);
- false -> init_dirty(CleanShutdown, ContainsCheckFun, State)
+ init_clean(RecoveredCounts, State1);
+ false -> init_dirty(CleanShutdown, ContainsCheckFun, State1)
end.
terminate(Terms, State) ->
@@ -270,9 +274,13 @@ delete_and_terminate(State) ->
ok = rabbit_misc:recursive_delete([Dir]),
State1.
-publish(Guid, SeqId, MsgProps, IsPersistent, State) when is_binary(Guid) ->
+publish(Guid, SeqId, MsgProps, IsPersistent,
+ State = #qistate { unsynced_guids = UnsyncedGuids })
+ when is_binary(Guid) ->
?GUID_BYTES = size(Guid),
- {JournalHdl, State1} = get_journal_handle(State),
+ {JournalHdl, State1} = get_journal_handle(
+ State #qistate {
+ unsynced_guids = [Guid | UnsyncedGuids] }),
ok = file_handle_cache:append(
JournalHdl, [<<(case IsPersistent of
true -> ?PUB_PERSIST_JPREFIX;
@@ -303,7 +311,7 @@ sync(_SeqIds, State = #qistate { journal_handle = JournalHdl }) ->
%% seqids not being in the journal, provided the transaction isn't
%% emptied (handled above anyway).
ok = file_handle_cache:sync(JournalHdl),
- State.
+ notify_sync(State).
flush(State = #qistate { dirty_count = 0 }) -> State;
flush(State) -> flush_journal(State).
@@ -393,7 +401,9 @@ blank_state(QueueName) ->
segments = segments_new(),
journal_handle = undefined,
dirty_count = 0,
- max_journal_entries = MaxJournal }.
+ max_journal_entries = MaxJournal,
+ on_sync = fun (_) -> ok end,
+ unsynced_guids = [] }.
clean_file_name(Dir) -> filename:join(Dir, ?CLEAN_FILENAME).
@@ -625,7 +635,7 @@ flush_journal(State = #qistate { segments = Segments }) ->
{JournalHdl, State1} =
get_journal_handle(State #qistate { segments = Segments1 }),
ok = file_handle_cache:clear(JournalHdl),
- State1 #qistate { dirty_count = 0 }.
+ notify_sync(State1 #qistate { dirty_count = 0 }).
append_journal_to_segment(#segment { journal_entries = JEntries,
path = Path } = Segment) ->
@@ -713,6 +723,10 @@ deliver_or_ack(Kind, SeqIds, State) ->
add_to_journal(SeqId, Kind, StateN)
end, State1, SeqIds)).
+notify_sync(State = #qistate { unsynced_guids = UG, on_sync = OnSyncFun }) ->
+ OnSyncFun(gb_sets:from_list(UG)),
+ State #qistate { unsynced_guids = [] }.
+
%%----------------------------------------------------------------------------
%% segment manipulation
%%----------------------------------------------------------------------------
@@ -1039,27 +1053,26 @@ transform_queue(Dir, Gatherer, {JournalFun, SegmentFun}) ->
transform_file(Path, Fun) ->
PathTmp = Path ++ ".upgrade",
- Size = filelib:file_size(Path),
-
- {ok, PathTmpHdl} =
- file_handle_cache:open(PathTmp, [exclusive | ?WRITE_MODE],
- [{write_buffer, infinity}]),
+ case filelib:file_size(Path) of
+ 0 -> ok;
+ Size -> {ok, PathTmpHdl} =
+ file_handle_cache:open(PathTmp, ?WRITE_MODE,
+ [{write_buffer, infinity}]),
- {ok, PathHdl} =
- file_handle_cache:open(Path, [{read_ahead, Size} | ?READ_MODE], []),
- {ok, Content} = file_handle_cache:read(PathHdl, Size),
- ok = file_handle_cache:close(PathHdl),
+ {ok, PathHdl} = file_handle_cache:open(
+ Path, [{read_ahead, Size} | ?READ_MODE], []),
+ {ok, Content} = file_handle_cache:read(PathHdl, Size),
+ ok = file_handle_cache:close(PathHdl),
- ok = drive_transform_fun(Fun, PathTmpHdl, Content),
+ ok = drive_transform_fun(Fun, PathTmpHdl, Content),
- ok = file_handle_cache:close(PathTmpHdl),
- ok = file:rename(PathTmp, Path).
+ ok = file_handle_cache:close(PathTmpHdl),
+ ok = file:rename(PathTmp, Path)
+ end.
drive_transform_fun(Fun, Hdl, Contents) ->
case Fun(Contents) of
- stop ->
- ok;
- {Output, Contents1} ->
- ok = file_handle_cache:append(Hdl, Output),
- drive_transform_fun(Fun, Hdl, Contents1)
+ stop -> ok;
+ {Output, Contents1} -> ok = file_handle_cache:append(Hdl, Output),
+ drive_transform_fun(Fun, Hdl, Contents1)
end.