summaryrefslogtreecommitdiff
path: root/src/rabbit_queue_index.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_queue_index.erl')
-rw-r--r--src/rabbit_queue_index.erl106
1 files changed, 53 insertions, 53 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 00f5a752..8227e4cd 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -86,7 +86,7 @@
%% and seeding the message store on start up.
%%
%% Note that in general, the representation of a message's state as
-%% the tuple: {('no_pub'|{Guid, MsgProps, IsPersistent}),
+%% the tuple: {('no_pub'|{MsgId, MsgProps, IsPersistent}),
%% ('del'|'no_del'), ('ack'|'no_ack')} is richer than strictly
%% necessary for most operations. However, for startup, and to ensure
%% the safe and correct combination of journal entries with entries
@@ -138,10 +138,10 @@
-define(EXPIRY_BITS, (?EXPIRY_BYTES * 8)).
-define(NO_EXPIRY, 0).
--define(GUID_BYTES, 16). %% md5sum is 128 bit or 16 bytes
--define(GUID_BITS, (?GUID_BYTES * 8)).
+-define(MSG_ID_BYTES, 16). %% md5sum is 128 bit or 16 bytes
+-define(MSG_ID_BITS, (?MSG_ID_BYTES * 8)).
%% 16 bytes for md5sum + 8 for expiry + 2 for seq, bits and prefix
--define(PUBLISH_RECORD_LENGTH_BYTES, ?GUID_BYTES + ?EXPIRY_BYTES + 2).
+-define(PUBLISH_RECORD_LENGTH_BYTES, ?MSG_ID_BYTES + ?EXPIRY_BYTES + 2).
%% 1 publish, 1 deliver, 1 ack per msg
-define(SEGMENT_TOTAL_SIZE, ?SEGMENT_ENTRY_COUNT *
@@ -150,7 +150,7 @@
%% ---- misc ----
--define(PUB, {_, _, _}). %% {Guid, MsgProps, IsPersistent}
+-define(PUB, {_, _, _}). %% {MsgId, MsgProps, IsPersistent}
-define(READ_MODE, [binary, raw, read]).
-define(READ_AHEAD_MODE, [{read_ahead, ?SEGMENT_TOTAL_SIZE} | ?READ_MODE]).
@@ -159,7 +159,7 @@
%%----------------------------------------------------------------------------
-record(qistate, { dir, segments, journal_handle, dirty_count,
- max_journal_entries, on_sync, unsynced_guids }).
+ max_journal_entries, on_sync, unsynced_msg_ids }).
-record(segment, { num, path, journal_entries, unacked }).
@@ -187,21 +187,21 @@
dirty_count :: integer(),
max_journal_entries :: non_neg_integer(),
on_sync :: on_sync_fun(),
- unsynced_guids :: [rabbit_guid:guid()]
+ unsynced_msg_ids :: [rabbit_types:msg_id()]
}).
--type(startup_fun_state() ::
- {fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A}),
- A}).
+-type(contains_predicate() :: fun ((rabbit_types:msg_id()) -> boolean())).
+-type(walker(A) :: fun ((A) -> 'finished' |
+ {rabbit_types:msg_id(), non_neg_integer(), A})).
-type(shutdown_terms() :: [any()]).
-spec(init/2 :: (rabbit_amqqueue:name(), on_sync_fun()) -> qistate()).
-spec(shutdown_terms/1 :: (rabbit_amqqueue:name()) -> shutdown_terms()).
-spec(recover/5 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(),
- fun ((rabbit_guid:guid()) -> boolean()), on_sync_fun()) ->
- {'undefined' | non_neg_integer(), qistate()}).
+ contains_predicate(), on_sync_fun()) ->
+ {'undefined' | non_neg_integer(), qistate()}).
-spec(terminate/2 :: ([any()], qistate()) -> qistate()).
-spec(delete_and_terminate/1 :: (qistate()) -> qistate()).
--spec(publish/5 :: (rabbit_guid:guid(), seq_id(),
+-spec(publish/5 :: (rabbit_types:msg_id(), seq_id(),
rabbit_types:message_properties(), boolean(), qistate())
-> qistate()).
-spec(deliver/2 :: ([seq_id()], qistate()) -> qistate()).
@@ -209,14 +209,13 @@
-spec(sync/2 :: ([seq_id()], qistate()) -> qistate()).
-spec(flush/1 :: (qistate()) -> qistate()).
-spec(read/3 :: (seq_id(), seq_id(), qistate()) ->
- {[{rabbit_guid:guid(), seq_id(),
+ {[{rabbit_types:msg_id(), seq_id(),
rabbit_types:message_properties(),
boolean(), boolean()}], qistate()}).
-spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()).
-spec(bounds/1 :: (qistate()) ->
- {non_neg_integer(), non_neg_integer(), qistate()}).
--spec(recover/1 :: ([rabbit_amqqueue:name()]) ->
- {[[any()]], startup_fun_state()}).
+ {non_neg_integer(), non_neg_integer(), qistate()}).
+-spec(recover/1 :: ([rabbit_amqqueue:name()]) -> {[[any()]], {walker(A), A}}).
-spec(add_queue_ttl/0 :: () -> 'ok').
@@ -259,22 +258,22 @@ delete_and_terminate(State) ->
ok = rabbit_misc:recursive_delete([Dir]),
State1.
-publish(Guid, SeqId, MsgProps, IsPersistent,
- State = #qistate { unsynced_guids = UnsyncedGuids })
- when is_binary(Guid) ->
- ?GUID_BYTES = size(Guid),
+publish(MsgId, SeqId, MsgProps, IsPersistent,
+ State = #qistate { unsynced_msg_ids = UnsyncedMsgIds })
+ when is_binary(MsgId) ->
+ ?MSG_ID_BYTES = size(MsgId),
{JournalHdl, State1} = get_journal_handle(
State #qistate {
- unsynced_guids = [Guid | UnsyncedGuids] }),
+ unsynced_msg_ids = [MsgId | UnsyncedMsgIds] }),
ok = file_handle_cache:append(
JournalHdl, [<<(case IsPersistent of
true -> ?PUB_PERSIST_JPREFIX;
false -> ?PUB_TRANS_JPREFIX
end):?JPREFIX_BITS,
SeqId:?SEQ_BITS>>,
- create_pub_record_body(Guid, MsgProps)]),
+ create_pub_record_body(MsgId, MsgProps)]),
maybe_flush_journal(
- add_to_journal(SeqId, {Guid, MsgProps, IsPersistent}, State1)).
+ add_to_journal(SeqId, {MsgId, MsgProps, IsPersistent}, State1)).
deliver(SeqIds, State) ->
deliver_or_ack(del, SeqIds, State).
@@ -284,8 +283,8 @@ ack(SeqIds, State) ->
%% This is only called when there are outstanding confirms and the
%% queue is idle.
-sync(State = #qistate { unsynced_guids = Guids }) ->
- sync_if([] =/= Guids, State).
+sync(State = #qistate { unsynced_msg_ids = MsgIds }) ->
+ sync_if([] =/= MsgIds, State).
sync(SeqIds, State) ->
%% The SeqIds here contains the SeqId of every publish and ack in
@@ -388,7 +387,7 @@ blank_state(QueueName) ->
dirty_count = 0,
max_journal_entries = MaxJournal,
on_sync = fun (_) -> ok end,
- unsynced_guids = [] }.
+ unsynced_msg_ids = [] }.
clean_file_name(Dir) -> filename:join(Dir, ?CLEAN_FILENAME).
@@ -470,8 +469,9 @@ recover_segment(ContainsCheckFun, CleanShutdown,
{SegEntries1, UnackedCountDelta} =
segment_plus_journal(SegEntries, JEntries),
array:sparse_foldl(
- fun (RelSeq, {{Guid, _MsgProps, _IsPersistent}, Del, no_ack}, Segment1) ->
- recover_message(ContainsCheckFun(Guid), CleanShutdown,
+ fun (RelSeq, {{MsgId, _MsgProps, _IsPersistent}, Del, no_ack},
+ Segment1) ->
+ recover_message(ContainsCheckFun(MsgId), CleanShutdown,
Del, RelSeq, Segment1)
end,
Segment #segment { unacked = UnackedCount + UnackedCountDelta },
@@ -515,17 +515,17 @@ queue_index_walker({next, Gatherer}) when is_pid(Gatherer) ->
ok = gatherer:stop(Gatherer),
ok = rabbit_misc:unlink_and_capture_exit(Gatherer),
finished;
- {value, {Guid, Count}} ->
- {Guid, Count, {next, Gatherer}}
+ {value, {MsgId, Count}} ->
+ {MsgId, Count, {next, Gatherer}}
end.
queue_index_walker_reader(QueueName, Gatherer) ->
State = #qistate { segments = Segments, dir = Dir } =
recover_journal(blank_state(QueueName)),
[ok = segment_entries_foldr(
- fun (_RelSeq, {{Guid, _MsgProps, true}, _IsDelivered, no_ack},
+ fun (_RelSeq, {{MsgId, _MsgProps, true}, _IsDelivered, no_ack},
ok) ->
- gatherer:in(Gatherer, {Guid, 1});
+ gatherer:in(Gatherer, {MsgId, 1});
(_RelSeq, _Value, Acc) ->
Acc
end, ok, segment_find_or_new(Seg, Dir, Segments)) ||
@@ -537,24 +537,24 @@ queue_index_walker_reader(QueueName, Gatherer) ->
%% expiry/binary manipulation
%%----------------------------------------------------------------------------
-create_pub_record_body(Guid, #message_properties{expiry = Expiry}) ->
- [Guid, expiry_to_binary(Expiry)].
+create_pub_record_body(MsgId, #message_properties{expiry = Expiry}) ->
+ [MsgId, expiry_to_binary(Expiry)].
expiry_to_binary(undefined) -> <<?NO_EXPIRY:?EXPIRY_BITS>>;
expiry_to_binary(Expiry) -> <<Expiry:?EXPIRY_BITS>>.
read_pub_record_body(Hdl) ->
- case file_handle_cache:read(Hdl, ?GUID_BYTES + ?EXPIRY_BYTES) of
+ case file_handle_cache:read(Hdl, ?MSG_ID_BYTES + ?EXPIRY_BYTES) of
{ok, Bin} ->
%% work around for binary data fragmentation. See
%% rabbit_msg_file:read_next/2
- <<GuidNum:?GUID_BITS, Expiry:?EXPIRY_BITS>> = Bin,
- <<Guid:?GUID_BYTES/binary>> = <<GuidNum:?GUID_BITS>>,
+ <<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS>> = Bin,
+ <<MsgId:?MSG_ID_BYTES/binary>> = <<MsgIdNum:?MSG_ID_BITS>>,
Exp = case Expiry of
?NO_EXPIRY -> undefined;
X -> X
end,
- {Guid, #message_properties{expiry = Exp}};
+ {MsgId, #message_properties{expiry = Exp}};
Error ->
Error
end.
@@ -681,8 +681,8 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) ->
load_journal_entries(add_to_journal(SeqId, ack, State));
_ ->
case read_pub_record_body(Hdl) of
- {Guid, MsgProps} ->
- Publish = {Guid, MsgProps,
+ {MsgId, MsgProps} ->
+ Publish = {MsgId, MsgProps,
case Prefix of
?PUB_PERSIST_JPREFIX -> true;
?PUB_TRANS_JPREFIX -> false
@@ -716,9 +716,9 @@ sync_if(true, State = #qistate { journal_handle = JournalHdl }) ->
ok = file_handle_cache:sync(JournalHdl),
notify_sync(State).
-notify_sync(State = #qistate { unsynced_guids = UG, on_sync = OnSyncFun }) ->
+notify_sync(State = #qistate { unsynced_msg_ids = UG, on_sync = OnSyncFun }) ->
OnSyncFun(gb_sets:from_list(UG)),
- State #qistate { unsynced_guids = [] }.
+ State #qistate { unsynced_msg_ids = [] }.
%%----------------------------------------------------------------------------
%% segment manipulation
@@ -796,12 +796,12 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) ->
ok = case Pub of
no_pub ->
ok;
- {Guid, MsgProps, IsPersistent} ->
+ {MsgId, MsgProps, IsPersistent} ->
file_handle_cache:append(
Hdl, [<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
(bool_to_int(IsPersistent)):1,
RelSeq:?REL_SEQ_BITS>>,
- create_pub_record_body(Guid, MsgProps)])
+ create_pub_record_body(MsgId, MsgProps)])
end,
ok = case {Del, Ack} of
{no_del, no_ack} ->
@@ -821,10 +821,10 @@ read_bounded_segment(Seg, {StartSeg, StartRelSeq}, {EndSeg, EndRelSeq},
{Messages, Segments}, Dir) ->
Segment = segment_find_or_new(Seg, Dir, Segments),
{segment_entries_foldr(
- fun (RelSeq, {{Guid, MsgProps, IsPersistent}, IsDelivered, no_ack}, Acc)
+ fun (RelSeq, {{MsgId, MsgProps, IsPersistent}, IsDelivered, no_ack}, Acc)
when (Seg > StartSeg orelse StartRelSeq =< RelSeq) andalso
(Seg < EndSeg orelse EndRelSeq >= RelSeq) ->
- [ {Guid, reconstruct_seq_id(StartSeg, RelSeq), MsgProps,
+ [ {MsgId, reconstruct_seq_id(StartSeg, RelSeq), MsgProps,
IsPersistent, IsDelivered == del} | Acc ];
(_RelSeq, _Value, Acc) ->
Acc
@@ -854,8 +854,8 @@ load_segment_entries(KeepAcked, Hdl, SegEntries, UnackedCount) ->
case file_handle_cache:read(Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES) of
{ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} ->
- {Guid, MsgProps} = read_pub_record_body(Hdl),
- Obj = {{Guid, MsgProps, 1 == IsPersistentNum}, no_del, no_ack},
+ {MsgId, MsgProps} = read_pub_record_body(Hdl),
+ Obj = {{MsgId, MsgProps, 1 == IsPersistentNum}, no_del, no_ack},
SegEntries1 = array:set(RelSeq, Obj, SegEntries),
load_segment_entries(KeepAcked, Hdl, SegEntries1,
UnackedCount + 1);
@@ -1002,17 +1002,17 @@ add_queue_ttl_journal(<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
Rest/binary>>) ->
{<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest};
add_queue_ttl_journal(<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS,
- Guid:?GUID_BYTES/binary, Rest/binary>>) ->
- {[<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Guid,
+ MsgId:?MSG_ID_BYTES/binary, Rest/binary>>) ->
+ {[<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, MsgId,
expiry_to_binary(undefined)], Rest};
add_queue_ttl_journal(_) ->
stop.
add_queue_ttl_segment(<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, IsPersistentNum:1,
- RelSeq:?REL_SEQ_BITS, Guid:?GUID_BYTES/binary,
+ RelSeq:?REL_SEQ_BITS, MsgId:?MSG_ID_BYTES/binary,
Rest/binary>>) ->
{[<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, IsPersistentNum:1,
- RelSeq:?REL_SEQ_BITS>>, Guid, expiry_to_binary(undefined)], Rest};
+ RelSeq:?REL_SEQ_BITS>>, MsgId, expiry_to_binary(undefined)], Rest};
add_queue_ttl_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
RelSeq:?REL_SEQ_BITS, Rest>>) ->
{<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>,