diff options
Diffstat (limited to 'src/rabbit_queue_index.erl')
-rw-r--r-- | src/rabbit_queue_index.erl | 21 |
1 files changed, 12 insertions, 9 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 91b19976..d6b8bb28 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -31,7 +31,7 @@ -module(rabbit_queue_index). --export([init/3, terminate/2, delete_and_terminate/1, publish/4, +-export([init/4, terminate/2, delete_and_terminate/1, publish/4, deliver/2, ack/2, sync/2, flush/1, read/3, next_segment_boundary/1, bounds/1, recover/1]). @@ -193,7 +193,7 @@ {(fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A})), A}). --spec(init/3 :: (rabbit_amqqueue:name(), boolean(), +-spec(init/4 :: (rabbit_amqqueue:name(), boolean(), boolean(), fun ((rabbit_guid:guid()) -> boolean())) -> {'undefined' | non_neg_integer(), [any()], qistate()}). -spec(terminate/2 :: ([any()], qistate()) -> qistate()). @@ -220,8 +220,8 @@ %% public API %%---------------------------------------------------------------------------- -init(Name, MsgStoreRecovered, ContainsCheckFun) -> - State = #qistate { dir = Dir } = blank_state(Name), +init(Name, Recover, MsgStoreRecovered, ContainsCheckFun) -> + State = #qistate { dir = Dir } = blank_state(Name, not Recover), Terms = case read_shutdown_terms(Dir) of {error, _} -> []; {ok, Terms1} -> Terms1 @@ -356,9 +356,14 @@ recover(DurableQueues) -> %% startup and shutdown %%---------------------------------------------------------------------------- -blank_state(QueueName) -> +blank_state(QueueName, EnsureFresh) -> StrName = queue_name_to_dir_name(QueueName), Dir = filename:join(queues_dir(), StrName), + ok = case EnsureFresh of + true -> false = filelib:is_file(Dir), %% is_file == is file or dir + ok; + false -> ok + end, ok = filelib:ensure_dir(filename:join(Dir, "nothing")), {ok, MaxJournal} = application:get_env(rabbit, queue_index_max_journal_entries), @@ -463,9 +468,7 @@ recover_message(false, _, no_del, RelSeq, Segment) -> add_to_journal(RelSeq, ack, add_to_journal(RelSeq, del, Segment)). queue_name_to_dir_name(Name = #resource { kind = queue }) -> - Bin = term_to_binary(Name), - Size = 8*size(Bin), - <<Num:Size>> = Bin, + <<Num:128>> = erlang:md5(term_to_binary(Name)), lists:flatten(io_lib:format("~.36B", [Num])). queues_dir() -> @@ -497,7 +500,7 @@ queue_index_walker({next, Gatherer}) when is_pid(Gatherer) -> queue_index_walker_reader(QueueName, Gatherer) -> State = #qistate { segments = Segments, dir = Dir } = - recover_journal(blank_state(QueueName)), + recover_journal(blank_state(QueueName, false)), [ok = segment_entries_foldr( fun (_RelSeq, {{Guid, true}, _IsDelivered, no_ack}, ok) -> gatherer:in(Gatherer, {Guid, 1}); |