diff options
Diffstat (limited to 'src/rabbit_queue_index.erl')
-rw-r--r-- | src/rabbit_queue_index.erl | 102 |
1 files changed, 70 insertions, 32 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 0b98290c..6568aa70 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -31,7 +31,7 @@ -module(rabbit_queue_index). --export([init/4, terminate/2, delete_and_terminate/1, publish/4, +-export([init/4, terminate/2, delete_and_terminate/1, publish/5, deliver/2, ack/2, sync/2, flush/1, read/3, next_segment_boundary/1, bounds/1, recover/1]). @@ -98,7 +98,7 @@ %% and seeding the message store on start up. %% %% Note that in general, the representation of a message's state as -%% the tuple: {('no_pub'|{Guid, IsPersistent}), ('del'|'no_del'), +%% the tuple: {('no_pub'|{Guid, MsgProperties, IsPersistent}), ('del'|'no_del'), %% ('ack'|'no_ack')} is richer than strictly necessary for most %% operations. However, for startup, and to ensure the safe and %% correct combination of journal entries with entries read from the @@ -141,14 +141,19 @@ -define(REL_SEQ_ONLY_ENTRY_LENGTH_BYTES, 2). %% publish record is binary 1 followed by a bit for is_persistent, -%% then 14 bits of rel seq id, and 128 bits of md5sum msg id +%% then 14 bits of rel seq id, 64 bits for message expiry and 128 bits +%% of md5sum msg id -define(PUBLISH_PREFIX, 1). -define(PUBLISH_PREFIX_BITS, 1). +-define(EXPIRY_BYTES, 8). +-define(EXPIRY_BITS, (?EXPIRY_BYTES * 8)). +-define(NO_EXPIRY, 0). + -define(GUID_BYTES, 16). %% md5sum is 128 bit or 16 bytes -define(GUID_BITS, (?GUID_BYTES * 8)). -%% 16 bytes for md5sum + 2 for seq, bits and prefix --define(PUBLISH_RECORD_LENGTH_BYTES, ?GUID_BYTES + 2). +%% 16 bytes for md5sum + 8 for expiry + 2 for seq, bits and prefix +-define(PUBLISH_RECORD_LENGTH_BYTES, ?GUID_BYTES + ?EXPIRY_BYTES + 2). %% 1 publish, 1 deliver, 1 ack per msg -define(SEGMENT_TOTAL_SIZE, ?SEGMENT_ENTRY_COUNT * @@ -157,7 +162,7 @@ %% ---- misc ---- --define(PUB, {_, _}). %% {Guid, IsPersistent} +-define(PUB, {_, _, _}). %% {Guid, MsgProperties, IsPersistent} -define(READ_MODE, [binary, raw, read]). -define(READ_AHEAD_MODE, [{read_ahead, ?SEGMENT_TOTAL_SIZE} | ?READ_MODE]). @@ -200,15 +205,16 @@ {'undefined' | non_neg_integer(), [any()], qistate()}). -spec(terminate/2 :: ([any()], qistate()) -> qistate()). -spec(delete_and_terminate/1 :: (qistate()) -> qistate()). --spec(publish/4 :: (rabbit_guid:guid(), seq_id(), boolean(), qistate()) -> - qistate()). +-spec(publish/5 :: (rabbit_guid:guid(), seq_id(), rabbit_types:msg_properties(), + boolean(), qistate()) -> qistate()). -spec(deliver/2 :: ([seq_id()], qistate()) -> qistate()). -spec(ack/2 :: ([seq_id()], qistate()) -> qistate()). -spec(sync/2 :: ([seq_id()], qistate()) -> qistate()). -spec(flush/1 :: (qistate()) -> qistate()). -spec(read/3 :: (seq_id(), seq_id(), qistate()) -> - {[{rabbit_guid:guid(), seq_id(), boolean(), boolean()}], - qistate()}). + {[{rabbit_guid:guid(), seq_id(), + rabbit_types:msg_properties(), + boolean(), boolean()}], qistate()}). -spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()). -spec(bounds/1 :: (qistate()) -> {non_neg_integer(), non_neg_integer(), qistate()}). @@ -252,15 +258,19 @@ delete_and_terminate(State) -> ok = rabbit_misc:recursive_delete([Dir]), State1. -publish(Guid, SeqId, IsPersistent, State) when is_binary(Guid) -> +publish(Guid, SeqId, MsgProperties, IsPersistent, State) + when is_binary(Guid) -> ?GUID_BYTES = size(Guid), {JournalHdl, State1} = get_journal_handle(State), ok = file_handle_cache:append( JournalHdl, [<<(case IsPersistent of true -> ?PUB_PERSIST_JPREFIX; false -> ?PUB_TRANS_JPREFIX - end):?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Guid]), - maybe_flush_journal(add_to_journal(SeqId, {Guid, IsPersistent}, State1)). + end):?JPREFIX_BITS, + SeqId:?SEQ_BITS>>, + create_pub_record_body(Guid, MsgProperties)]), + maybe_flush_journal( + add_to_journal(SeqId, {Guid, MsgProperties, IsPersistent}, State1)). deliver(SeqIds, State) -> deliver_or_ack(del, SeqIds, State). @@ -289,7 +299,7 @@ flush(State) -> flush_journal(State). read(StartEnd, StartEnd, State) -> {[], State}; -read(Start, End, State = #qistate { segments = Segments, +read(Start, End, State = #qistate { segments = Segments, dir = Dir }) when Start =< End -> %% Start is inclusive, End is exclusive. LowerB = {StartSeg, _StartRelSeq} = seq_id_to_seg_and_rel_seq_id(Start), @@ -453,7 +463,9 @@ recover_segment(ContainsCheckFun, CleanShutdown, {SegEntries1, UnackedCountDelta} = segment_plus_journal(SegEntries, JEntries), array:sparse_foldl( - fun (RelSeq, {{Guid, _IsPersistent}, Del, no_ack}, Segment1) -> + fun (RelSeq, + {{Guid, _MsgProperties, _IsPersistent}, Del, no_ack}, + Segment1) -> recover_message(ContainsCheckFun(Guid), CleanShutdown, Del, RelSeq, Segment1) end, @@ -506,7 +518,9 @@ queue_index_walker_reader(QueueName, Gatherer) -> State = #qistate { segments = Segments, dir = Dir } = recover_journal(blank_state(QueueName)), [ok = segment_entries_foldr( - fun (_RelSeq, {{Guid, true}, _IsDelivered, no_ack}, ok) -> + fun (_RelSeq, + {{Guid, _MsgProps, true}, _IsDelivered, no_ack}, + ok) -> gatherer:in(Gatherer, {Guid, 1}); (_RelSeq, _Value, Acc) -> Acc @@ -516,6 +530,34 @@ queue_index_walker_reader(QueueName, Gatherer) -> ok = gatherer:finish(Gatherer). %%---------------------------------------------------------------------------- +%% expiry/binary manipulation +%%---------------------------------------------------------------------------- + +create_pub_record_body(Guid, #msg_properties{expiry = Expiry}) -> + [Guid, expiry_to_binary(Expiry)]. + +expiry_to_binary(undefined) -> + <<?NO_EXPIRY:?EXPIRY_BITS>>; +expiry_to_binary(Expiry) -> + <<Expiry:?EXPIRY_BITS>>. + +read_pub_record_body(Hdl) -> + case file_handle_cache:read(Hdl, ?GUID_BYTES + ?EXPIRY_BYTES) of + {ok, Bin} -> + %% work around for binary data fragmentation. See + %% rabbit_msg_file:read_next/2 + <<GuidNum:?GUID_BITS, Expiry:?EXPIRY_BITS>> = Bin, + <<Guid:?GUID_BYTES/binary>> = <<GuidNum:?GUID_BITS>>, + Exp = case Expiry of + ?NO_EXPIRY -> undefined; + X -> X + end, + {Guid, #msg_properties{expiry = Exp}}; + Error -> + Error + end. + +%%---------------------------------------------------------------------------- %% journal manipulation %%---------------------------------------------------------------------------- @@ -636,14 +678,9 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) -> ?ACK_JPREFIX -> load_journal_entries(add_to_journal(SeqId, ack, State)); _ -> - case file_handle_cache:read(Hdl, ?GUID_BYTES) of - {ok, <<GuidNum:?GUID_BITS>>} -> - %% work around for binary data - %% fragmentation. See - %% rabbit_msg_file:read_next/2 - <<Guid:?GUID_BYTES/binary>> = - <<GuidNum:?GUID_BITS>>, - Publish = {Guid, case Prefix of + case read_pub_record_body(Hdl) of + {Guid, MsgProperties} -> + Publish = {Guid, MsgProperties, case Prefix of ?PUB_PERSIST_JPREFIX -> true; ?PUB_TRANS_JPREFIX -> false end}, @@ -744,11 +781,12 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) -> ok = case Pub of no_pub -> ok; - {Guid, IsPersistent} -> + {Guid, MsgProperties, IsPersistent} -> file_handle_cache:append( Hdl, [<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, (bool_to_int(IsPersistent)):1, - RelSeq:?REL_SEQ_BITS>>, Guid]) + RelSeq:?REL_SEQ_BITS>>, + create_pub_record_body(Guid, MsgProperties)]) end, ok = case {Del, Ack} of {no_del, no_ack} -> @@ -768,10 +806,12 @@ read_bounded_segment(Seg, {StartSeg, StartRelSeq}, {EndSeg, EndRelSeq}, {Messages, Segments}, Dir) -> Segment = segment_find_or_new(Seg, Dir, Segments), {segment_entries_foldr( - fun (RelSeq, {{Guid, IsPersistent}, IsDelivered, no_ack}, Acc) + fun (RelSeq, + {{Guid, MsgProperties, IsPersistent}, IsDelivered, no_ack}, + Acc) when (Seg > StartSeg orelse StartRelSeq =< RelSeq) andalso (Seg < EndSeg orelse EndRelSeq >= RelSeq) -> - [ {Guid, reconstruct_seq_id(StartSeg, RelSeq), + [ {Guid, reconstruct_seq_id(StartSeg, RelSeq), MsgProperties, IsPersistent, IsDelivered == del} | Acc ]; (_RelSeq, _Value, Acc) -> Acc @@ -801,10 +841,8 @@ load_segment_entries(KeepAcked, Hdl, SegEntries, UnackedCount) -> case file_handle_cache:read(Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES) of {ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} -> - %% because we specify /binary, and binaries are complete - %% bytes, the size spec is in bytes, not bits. - {ok, Guid} = file_handle_cache:read(Hdl, ?GUID_BYTES), - Obj = {{Guid, 1 == IsPersistentNum}, no_del, no_ack}, + {Guid, MsgProperties} = read_pub_record_body(Hdl), + Obj = {{Guid, MsgProperties, 1 == IsPersistentNum}, no_del, no_ack}, SegEntries1 = array:set(RelSeq, Obj, SegEntries), load_segment_entries(KeepAcked, Hdl, SegEntries1, UnackedCount + 1); |