diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-12-11 17:17:16 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-12-11 17:17:16 +0000 |
commit | 1f20a76b85ff502efac5b40f322173e8ac55e695 (patch) | |
tree | 0e2edde2b0530f9077ed43026b864f4e39a11790 | |
parent | 919d601182560e9c55e4c4e5786e150fc57df7c2 (diff) | |
parent | 9d3539e908b3266fdb7c010bb2d7f32401f36a2b (diff) | |
download | rabbitmq-server-1f20a76b85ff502efac5b40f322173e8ac55e695.tar.gz |
Merge in default
-rw-r--r-- | ebin/rabbit_app.in | 1 | ||||
-rw-r--r-- | src/rabbit_queue_index.erl | 303 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 368 | ||||
-rw-r--r-- | test/src/rabbit_tests.erl | 40 |
4 files changed, 464 insertions, 248 deletions
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 9e5584a1..5ebef608 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -29,6 +29,7 @@ {heartbeat, 580}, {msg_store_file_size_limit, 16777216}, {queue_index_max_journal_entries, 65536}, + {queue_index_embed_msgs_below, 1024}, {default_user, <<"guest">>}, {default_pass, <<"guest">>}, {default_user_tags, [administrator]}, diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 0a2c88d4..75b13e02 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -16,19 +16,25 @@ -module(rabbit_queue_index). --export([erase/1, init/2, recover/5, +-export([erase/1, init/3, recover/6, terminate/2, delete_and_terminate/1, publish/5, deliver/2, ack/2, sync/1, needs_sync/1, flush/1, read/3, next_segment_boundary/1, bounds/1, start/1, stop/0]). --export([add_queue_ttl/0, avoid_zeroes/0, store_msg_size/0]). +-export([add_queue_ttl/0, avoid_zeroes/0, store_msg_size/0, store_msg/0]). -define(CLEAN_FILENAME, "clean.dot"). %%---------------------------------------------------------------------------- %% The queue index is responsible for recording the order of messages -%% within a queue on disk. +%% within a queue on disk. As such it contains records of messages +%% being published, delivered and acknowledged. The publish record +%% includes the sequence ID, message ID and a small quantity of +%% metadata about the message; the delivery and acknowledgement +%% records just contain the sequence ID. A publish record may also +%% contain the complete message if provided to publish/5; this allows +%% the message store to be avoided altogether for small messages. %% %% Because of the fact that the queue can decide at any point to send %% a queue entry to disk, you can not rely on publishes appearing in @@ -36,7 +42,7 @@ %% then delivered, then ack'd. %% %% In order to be able to clean up ack'd messages, we write to segment -%% files. These files have a fixed maximum size: ?SEGMENT_ENTRY_COUNT +%% files. These files have a fixed number of entries: ?SEGMENT_ENTRY_COUNT %% publishes, delivers and acknowledgements. They are numbered, and so %% it is known that the 0th segment contains messages 0 -> %% ?SEGMENT_ENTRY_COUNT - 1, the 1st segment contains messages @@ -128,8 +134,8 @@ -define(REL_SEQ_ONLY_RECORD_BYTES, 2). %% publish record is binary 1 followed by a bit for is_persistent, -%% then 14 bits of rel seq id, 64 bits for message expiry and 128 bits -%% of md5sum msg id +%% then 14 bits of rel seq id, 64 bits for message expiry, 32 bits of +%% size and then 128 bits of md5sum msg id. -define(PUB_PREFIX, 1). -define(PUB_PREFIX_BITS, 1). @@ -140,32 +146,40 @@ -define(MSG_ID_BYTES, 16). %% md5sum is 128 bit or 16 bytes -define(MSG_ID_BITS, (?MSG_ID_BYTES * 8)). +%% This is the size of the message body content, for stats -define(SIZE_BYTES, 4). -define(SIZE_BITS, (?SIZE_BYTES * 8)). +%% This is the size of the message record embedded in the queue +%% index. If 0, the message can be found in the message store. +-define(MSG_IN_INDEX_SIZE_BYTES, 4). +-define(MSG_IN_INDEX_SIZE_BITS, (?MSG_IN_INDEX_SIZE_BYTES * 8)). + %% 16 bytes for md5sum + 8 for expiry + 4 for size --define(PUB_RECORD_BODY_BYTES, (?MSG_ID_BYTES + ?EXPIRY_BYTES + ?SIZE_BYTES)). +-define(PUB_RECORD_BODY_BYTES, (?MSG_ID_BYTES + ?EXPIRY_BYTES + ?SIZE_BYTES + + ?MSG_IN_INDEX_SIZE_BYTES)). %% + 2 for seq, bits and prefix --define(PUB_RECORD_BYTES, (?PUB_RECORD_BODY_BYTES + 2)). +-define(PUB_RECORD_PREFIX_BYTES, 2). -%% 1 publish, 1 deliver, 1 ack per msg --define(SEGMENT_TOTAL_SIZE, ?SEGMENT_ENTRY_COUNT * - (?PUB_RECORD_BYTES + (2 * ?REL_SEQ_ONLY_RECORD_BYTES))). +-define(PUB_RECORD_BYTES, (?PUB_RECORD_BODY_BYTES + ?PUB_RECORD_PREFIX_BYTES)). %% ---- misc ---- -define(PUB, {_, _, _}). %% {MsgId, MsgProps, IsPersistent} -define(READ_MODE, [binary, raw, read]). --define(READ_AHEAD_MODE, [{read_ahead, ?SEGMENT_TOTAL_SIZE} | ?READ_MODE]). +-define(READ_AHEAD_MODE, ?READ_MODE). -define(WRITE_MODE, [write | ?READ_MODE]). +-define(READ_BUFFER_SIZE, 1048576). %% 1MB + %%---------------------------------------------------------------------------- --record(qistate, { dir, segments, journal_handle, dirty_count, - max_journal_entries, on_sync, unconfirmed }). +-record(qistate, {dir, segments, journal_handle, dirty_count, + max_journal_entries, on_sync, on_sync_msg, + unconfirmed, unconfirmed_msg}). --record(segment, { num, path, journal_entries, unacked }). +-record(segment, {num, path, journal_entries, unacked}). -include("rabbit.hrl"). @@ -174,6 +188,7 @@ -rabbit_upgrade({add_queue_ttl, local, []}). -rabbit_upgrade({avoid_zeroes, local, [add_queue_ttl]}). -rabbit_upgrade({store_msg_size, local, [avoid_zeroes]}). +-rabbit_upgrade({store_msg, local, [store_msg_size]}). -ifdef(use_specs). @@ -193,7 +208,9 @@ dirty_count :: integer(), max_journal_entries :: non_neg_integer(), on_sync :: on_sync_fun(), - unconfirmed :: gb_sets:set() + on_sync_msg :: on_sync_fun(), + unconfirmed :: gb_sets:set(), + unconfirmed_msg :: gb_sets:set() }). -type(contains_predicate() :: fun ((rabbit_types:msg_id()) -> boolean())). -type(walker(A) :: fun ((A) -> 'finished' | @@ -201,9 +218,11 @@ -type(shutdown_terms() :: [term()] | 'non_clean_shutdown'). -spec(erase/1 :: (rabbit_amqqueue:name()) -> 'ok'). --spec(init/2 :: (rabbit_amqqueue:name(), on_sync_fun()) -> qistate()). --spec(recover/5 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(), - contains_predicate(), on_sync_fun()) -> +-spec(init/3 :: (rabbit_amqqueue:name(), + on_sync_fun(), on_sync_fun()) -> qistate()). +-spec(recover/6 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(), + contains_predicate(), + on_sync_fun(), on_sync_fun()) -> {'undefined' | non_neg_integer(), 'undefined' | non_neg_integer(), qistate()}). -spec(terminate/2 :: ([any()], qistate()) -> qistate()). @@ -241,14 +260,17 @@ erase(Name) -> false -> ok end. -init(Name, OnSyncFun) -> +init(Name, OnSyncFun, OnSyncMsgFun) -> State = #qistate { dir = Dir } = blank_state(Name), false = rabbit_file:is_file(Dir), %% is_file == is file or dir - State #qistate { on_sync = OnSyncFun }. + State#qistate{on_sync = OnSyncFun, + on_sync_msg = OnSyncMsgFun}. -recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun, OnSyncFun) -> +recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun, + OnSyncFun, OnSyncMsgFun) -> State = blank_state(Name), - State1 = State #qistate { on_sync = OnSyncFun }, + State1 = State #qistate{on_sync = OnSyncFun, + on_sync_msg = OnSyncMsgFun}, CleanShutdown = Terms /= non_clean_shutdown, case CleanShutdown andalso MsgStoreRecovered of true -> RecoveredCounts = proplists:get_value(segments, Terms, []), @@ -267,16 +289,22 @@ delete_and_terminate(State) -> ok = rabbit_file:recursive_delete([Dir]), State1. -publish(MsgId, SeqId, MsgProps, IsPersistent, - State = #qistate { unconfirmed = Unconfirmed }) - when is_binary(MsgId) -> +publish(MsgOrId, SeqId, MsgProps, IsPersistent, + State = #qistate{unconfirmed = UC, + unconfirmed_msg = UCM}) -> + MsgId = case MsgOrId of + #basic_message{id = Id} -> Id; + Id when is_binary(Id) -> Id + end, ?MSG_ID_BYTES = size(MsgId), {JournalHdl, State1} = get_journal_handle( - case MsgProps#message_properties.needs_confirming of - true -> Unconfirmed1 = gb_sets:add_element(MsgId, Unconfirmed), - State #qistate { unconfirmed = Unconfirmed1 }; - false -> State + case {MsgProps#message_properties.needs_confirming, MsgOrId} of + {true, MsgId} -> UC1 = gb_sets:add_element(MsgId, UC), + State#qistate{unconfirmed = UC1}; + {true, _} -> UCM1 = gb_sets:add_element(MsgId, UCM), + State#qistate{unconfirmed_msg = UCM1}; + {false, _} -> State end), ok = file_handle_cache:append( JournalHdl, [<<(case IsPersistent of @@ -284,9 +312,9 @@ publish(MsgId, SeqId, MsgProps, IsPersistent, false -> ?PUB_TRANS_JPREFIX end):?JPREFIX_BITS, SeqId:?SEQ_BITS>>, - create_pub_record_body(MsgId, MsgProps)]), + create_pub_record_body(MsgOrId, MsgProps)]), maybe_flush_journal( - add_to_journal(SeqId, {MsgId, MsgProps, IsPersistent}, State1)). + add_to_journal(SeqId, {MsgOrId, MsgProps, IsPersistent}, State1)). deliver(SeqIds, State) -> deliver_or_ack(del, SeqIds, State). @@ -302,10 +330,12 @@ sync(State = #qistate { journal_handle = JournalHdl }) -> ok = file_handle_cache:sync(JournalHdl), notify_sync(State). -needs_sync(#qistate { journal_handle = undefined }) -> +needs_sync(#qistate{journal_handle = undefined}) -> false; -needs_sync(#qistate { journal_handle = JournalHdl, unconfirmed = UC }) -> - case gb_sets:is_empty(UC) of +needs_sync(#qistate{journal_handle = JournalHdl, + unconfirmed = UC, + unconfirmed_msg = UCM}) -> + case gb_sets:is_empty(UC) andalso gb_sets:is_empty(UCM) of true -> case file_handle_cache:needs_sync(JournalHdl) of true -> other; false -> false @@ -409,7 +439,9 @@ blank_state_dir(Dir) -> dirty_count = 0, max_journal_entries = MaxJournal, on_sync = fun (_) -> ok end, - unconfirmed = gb_sets:new() }. + on_sync_msg = fun (_) -> ok end, + unconfirmed = gb_sets:new(), + unconfirmed_msg = gb_sets:new() }. init_clean(RecoveredCounts, State) -> %% Load the journal. Since this is a clean recovery this (almost) @@ -541,7 +573,8 @@ queue_index_walker({next, Gatherer}) when is_pid(Gatherer) -> queue_index_walker_reader(QueueName, Gatherer) -> State = blank_state(QueueName), ok = scan_segments( - fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, no_ack, ok) -> + fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, no_ack, ok) + when is_binary(MsgId) -> gatherer:sync_in(Gatherer, {MsgId, 1}); (_SeqId, _MsgId, _MsgProps, _IsPersistent, _IsDelivered, _IsAcked, Acc) -> @@ -555,9 +588,9 @@ scan_segments(Fun, Acc, State) -> Result = lists:foldr( fun (Seg, AccN) -> segment_entries_foldr( - fun (RelSeq, {{MsgId, MsgProps, IsPersistent}, + fun (RelSeq, {{MsgOrId, MsgProps, IsPersistent}, IsDelivered, IsAcked}, AccM) -> - Fun(reconstruct_seq_id(Seg, RelSeq), MsgId, MsgProps, + Fun(reconstruct_seq_id(Seg, RelSeq), MsgOrId, MsgProps, IsPersistent, IsDelivered, IsAcked, AccM) end, AccN, segment_find_or_new(Seg, Dir, Segments)) end, Acc, all_segment_nums(State1)), @@ -568,24 +601,42 @@ scan_segments(Fun, Acc, State) -> %% expiry/binary manipulation %%---------------------------------------------------------------------------- -create_pub_record_body(MsgId, #message_properties { expiry = Expiry, - size = Size }) -> - [MsgId, expiry_to_binary(Expiry), <<Size:?SIZE_BITS>>]. +create_pub_record_body(MsgOrId, #message_properties { expiry = Expiry, + size = Size }) -> + case MsgOrId of + MsgId when is_binary(MsgId) -> + [MsgId, expiry_to_binary(Expiry), <<Size:?SIZE_BITS>>, + <<0:?MSG_IN_INDEX_SIZE_BITS>>]; + #basic_message{id = MsgId} -> + MsgBin = term_to_binary(MsgOrId), + MsgBinSize = size(MsgBin), + [MsgId, expiry_to_binary(Expiry), <<Size:?SIZE_BITS>>, + <<MsgBinSize:?MSG_IN_INDEX_SIZE_BITS>>, MsgBin] + end. expiry_to_binary(undefined) -> <<?NO_EXPIRY:?EXPIRY_BITS>>; expiry_to_binary(Expiry) -> <<Expiry:?EXPIRY_BITS>>. -parse_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, - Size:?SIZE_BITS>>) -> +read_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, + Size:?SIZE_BITS, IndexSize:?MSG_IN_INDEX_SIZE_BITS>>, + Hdl) -> %% work around for binary data fragmentation. See %% rabbit_msg_file:read_next/2 <<MsgId:?MSG_ID_BYTES/binary>> = <<MsgIdNum:?MSG_ID_BITS>>, - Exp = case Expiry of - ?NO_EXPIRY -> undefined; - X -> X - end, - {MsgId, #message_properties { expiry = Exp, - size = Size }}. + Props = #message_properties{expiry = case Expiry of + ?NO_EXPIRY -> undefined; + X -> X + end, + size = Size}, + case IndexSize of + 0 -> {MsgId, Props}; + _ -> case file_handle_cache:read(Hdl, IndexSize) of + {ok, MsgBin} -> Msg = #basic_message{id = MsgId} = + binary_to_term(MsgBin), + {Msg, Props}; + _ -> exit(could_not_read) %% TODO + end + end. %%---------------------------------------------------------------------------- %% journal manipulation @@ -719,14 +770,14 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) -> {ok, <<0:?PUB_RECORD_BODY_BYTES/unit:8>>} -> State; {ok, <<Bin:?PUB_RECORD_BODY_BYTES/binary>>} -> - {MsgId, MsgProps} = parse_pub_record_body(Bin), + {MsgOrId, Props} = read_pub_record_body(Bin, Hdl), IsPersistent = case Prefix of ?PUB_PERSIST_JPREFIX -> true; ?PUB_TRANS_JPREFIX -> false end, load_journal_entries( add_to_journal( - SeqId, {MsgId, MsgProps, IsPersistent}, State)); + SeqId, {MsgOrId, Props, IsPersistent}, State)); _ErrOrEoF -> %% err, we've lost at least a publish State end @@ -746,11 +797,19 @@ deliver_or_ack(Kind, SeqIds, State) -> add_to_journal(SeqId, Kind, StateN) end, State1, SeqIds)). -notify_sync(State = #qistate { unconfirmed = UC, on_sync = OnSyncFun }) -> - case gb_sets:is_empty(UC) of - true -> State; - false -> OnSyncFun(UC), - State #qistate { unconfirmed = gb_sets:new() } +notify_sync(State = #qistate{unconfirmed = UC, + unconfirmed_msg = UCM, + on_sync = OnSyncFun, + on_sync_msg = OnSyncMsgFun}) -> + State1 = case gb_sets:is_empty(UC) of + true -> State; + false -> OnSyncFun(UC), + State#qistate{unconfirmed = gb_sets:new()} + end, + case gb_sets:is_empty(UCM) of + true -> State1; + false -> OnSyncMsgFun(UCM), + State1#qistate{unconfirmed_msg = gb_sets:new()} end. %%---------------------------------------------------------------------------- @@ -829,12 +888,22 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) -> ok = case Pub of no_pub -> ok; - {MsgId, MsgProps, IsPersistent} -> + {MsgOrId, MsgProps, IsPersistent} -> + %% Body = create_pub_record_body(MsgOrId, MsgProps), + %% io:format("pub ~p~n", + %% [[{persist, IsPersistent}, + %% {relseq, RelSeq}, + %% {body, Body}]]), + %% io:format("write ~p~n", + %% [iolist_to_binary([<<?PUB_PREFIX:?PUB_PREFIX_BITS, + %% (bool_to_int(IsPersistent)):1, + %% RelSeq:?REL_SEQ_BITS>>, + %% Body])]), file_handle_cache:append( Hdl, [<<?PUB_PREFIX:?PUB_PREFIX_BITS, (bool_to_int(IsPersistent)):1, RelSeq:?REL_SEQ_BITS>>, - create_pub_record_body(MsgId, MsgProps)]) + create_pub_record_body(MsgOrId, MsgProps)]) end, ok = case {Del, Ack} of {no_del, no_ack} -> @@ -854,11 +923,12 @@ read_bounded_segment(Seg, {StartSeg, StartRelSeq}, {EndSeg, EndRelSeq}, {Messages, Segments}, Dir) -> Segment = segment_find_or_new(Seg, Dir, Segments), {segment_entries_foldr( - fun (RelSeq, {{MsgId, MsgProps, IsPersistent}, IsDelivered, no_ack}, Acc) + fun (RelSeq, {{MsgOrId, MsgProps, IsPersistent}, IsDelivered, no_ack}, + Acc) when (Seg > StartSeg orelse StartRelSeq =< RelSeq) andalso (Seg < EndSeg orelse EndRelSeq >= RelSeq) -> - [ {MsgId, reconstruct_seq_id(StartSeg, RelSeq), MsgProps, - IsPersistent, IsDelivered == del} | Acc ]; + [{MsgOrId, reconstruct_seq_id(StartSeg, RelSeq), MsgProps, + IsPersistent, IsDelivered == del} | Acc]; (_RelSeq, _Value, Acc) -> Acc end, Messages, Segment), @@ -877,44 +947,50 @@ load_segment(KeepAcked, #segment { path = Path }) -> Empty = {array_new(), 0}, case rabbit_file:is_file(Path) of false -> Empty; - true -> {ok, Hdl} = file_handle_cache:open(Path, ?READ_AHEAD_MODE, []), + true -> {ok, Hdl} = file_handle_cache:open( + Path, ?READ_AHEAD_MODE, + [{read_buffer, ?READ_BUFFER_SIZE}]), {ok, 0} = file_handle_cache:position(Hdl, bof), - Res = case file_handle_cache:read(Hdl, ?SEGMENT_TOTAL_SIZE) of - {ok, SegData} -> load_segment_entries( - KeepAcked, SegData, Empty); - eof -> Empty - end, + Res = load_segment_entries(Hdl, KeepAcked, Empty), ok = file_handle_cache:close(Hdl), Res end. -load_segment_entries(KeepAcked, - <<?PUB_PREFIX:?PUB_PREFIX_BITS, - IsPersistentNum:1, RelSeq:?REL_SEQ_BITS, - PubRecordBody:?PUB_RECORD_BODY_BYTES/binary, - SegData/binary>>, - {SegEntries, UnackedCount}) -> - {MsgId, MsgProps} = parse_pub_record_body(PubRecordBody), - Obj = {{MsgId, MsgProps, 1 == IsPersistentNum}, no_del, no_ack}, - SegEntries1 = array:set(RelSeq, Obj, SegEntries), - load_segment_entries(KeepAcked, SegData, {SegEntries1, UnackedCount + 1}); -load_segment_entries(KeepAcked, - <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, - RelSeq:?REL_SEQ_BITS, SegData/binary>>, - {SegEntries, UnackedCount}) -> - {UnackedCountDelta, SegEntries1} = - case array:get(RelSeq, SegEntries) of - {Pub, no_del, no_ack} -> - { 0, array:set(RelSeq, {Pub, del, no_ack}, SegEntries)}; - {Pub, del, no_ack} when KeepAcked -> - {-1, array:set(RelSeq, {Pub, del, ack}, SegEntries)}; - {_Pub, del, no_ack} -> - {-1, array:reset(RelSeq, SegEntries)} - end, - load_segment_entries(KeepAcked, SegData, - {SegEntries1, UnackedCount + UnackedCountDelta}); -load_segment_entries(_KeepAcked, _SegData, Res) -> - Res. +load_segment_entries(Hdl, KeepAcked, Acc) -> + case file_handle_cache:read(Hdl, ?PUB_RECORD_PREFIX_BYTES) of + {ok, <<?PUB_PREFIX:?PUB_PREFIX_BITS, + IsPersistNum:1, RelSeq:?REL_SEQ_BITS>>} -> + load_segment_entries( + Hdl, KeepAcked, + load_segment_publish_entry(Hdl, 1 == IsPersistNum, RelSeq, Acc)); + {ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, + RelSeq:?REL_SEQ_BITS>>} -> + load_segment_entries( + Hdl, KeepAcked, add_segment_relseq_entry(KeepAcked, RelSeq, Acc)); + eof -> %% TODO or maybe _ + Acc + end. + +load_segment_publish_entry(Hdl, IsPersistent, RelSeq, {SegEntries, Unacked}) -> + case file_handle_cache:read(Hdl, ?PUB_RECORD_BODY_BYTES) of + {ok, <<PubRecordBody:?PUB_RECORD_BODY_BYTES/binary>>} -> + {MsgOrId, MsgProps} = read_pub_record_body(PubRecordBody, Hdl), + Obj = {{MsgOrId, MsgProps, IsPersistent}, no_del, no_ack}, + SegEntries1 = array:set(RelSeq, Obj, SegEntries), + {SegEntries1, Unacked + 1}; + _ -> + {SegEntries, Unacked} + end. + +add_segment_relseq_entry(KeepAcked, RelSeq, {SegEntries, Unacked}) -> + case array:get(RelSeq, SegEntries) of + {Pub, no_del, no_ack} -> + {array:set(RelSeq, {Pub, del, no_ack}, SegEntries), Unacked}; + {Pub, del, no_ack} when KeepAcked -> + {array:set(RelSeq, {Pub, del, ack}, SegEntries), Unacked - 1}; + {_Pub, del, no_ack} -> + {array:reset(RelSeq, SegEntries), Unacked - 1} + end. array_new() -> array:new([{default, undefined}, fixed, {size, ?SEGMENT_ENTRY_COUNT}]). @@ -1121,6 +1197,40 @@ store_msg_size_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, store_msg_size_segment(_) -> stop. +store_msg() -> + foreach_queue_index({fun store_msg_journal/1, + fun store_msg_segment/1}). + +store_msg_journal(<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS, + Rest/binary>>) -> + {<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest}; +store_msg_journal(<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS, + Rest/binary>>) -> + {<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest}; +store_msg_journal(<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS, + MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, Size:?SIZE_BITS, + Rest/binary>>) -> + {<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS, MsgId:?MSG_ID_BITS, + Expiry:?EXPIRY_BITS, Size:?SIZE_BITS, + 0:?MSG_IN_INDEX_SIZE_BITS>>, Rest}; +store_msg_journal(_) -> + stop. + +store_msg_segment(<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, + RelSeq:?REL_SEQ_BITS, MsgId:?MSG_ID_BITS, + Expiry:?EXPIRY_BITS, Size:?SIZE_BITS, Rest/binary>>) -> + {<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS, + MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, Size:?SIZE_BITS, + 0:?MSG_IN_INDEX_SIZE_BITS>>, Rest}; +store_msg_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, + RelSeq:?REL_SEQ_BITS, Rest/binary>>) -> + {<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>, + Rest}; +store_msg_segment(_) -> + stop. + + + %%---------------------------------------------------------------------------- @@ -1157,7 +1267,8 @@ transform_file(Path, Fun) when is_function(Fun)-> [{write_buffer, infinity}]), {ok, PathHdl} = file_handle_cache:open( - Path, [{read_ahead, Size} | ?READ_MODE], []), + Path, [{read_ahead, Size} | ?READ_MODE], + [{read_buffer, ?READ_BUFFER_SIZE}]), {ok, Content} = file_handle_cache:read(PathHdl, Size), ok = file_handle_cache:close(PathHdl), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 1da3de26..9711673c 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -28,7 +28,7 @@ -export([start/1, stop/0]). %% exported for testing only --export([start_msg_store/2, stop_msg_store/0, init/5]). +-export([start_msg_store/2, stop_msg_store/0, init/6]). %%---------------------------------------------------------------------------- %% Definitions: @@ -248,8 +248,9 @@ q3, q4, next_seq_id, - ram_pending_ack, - disk_pending_ack, + ram_pending_ack, %% msgs using store, still in RAM + disk_pending_ack, %% msgs in store, paged out + qi_pending_ack, %% msgs using qi, *can't* be paged out index_state, msg_store_clients, durable, @@ -285,7 +286,7 @@ msg, is_persistent, is_delivered, - msg_on_disk, + msg_in_store, index_on_disk, msg_props }). @@ -341,6 +342,7 @@ next_seq_id :: seq_id(), ram_pending_ack :: gb_trees:tree(), disk_pending_ack :: gb_trees:tree(), + qi_pending_ack :: gb_trees:tree(), index_state :: any(), msg_store_clients :: 'undefined' | {{any(), binary()}, {any(), binary()}}, @@ -426,16 +428,19 @@ stop_msg_store() -> ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE), ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE). -init(Queue, Recover, AsyncCallback) -> - init(Queue, Recover, AsyncCallback, - fun (MsgIds, ActionTaken) -> - msgs_written_to_disk(AsyncCallback, MsgIds, ActionTaken) - end, - fun (MsgIds) -> msg_indices_written_to_disk(AsyncCallback, MsgIds) end). +init(Queue, Recover, Callback) -> + init( + Queue, Recover, Callback, + fun (MsgIds, ActionTaken) -> + msgs_written_to_disk(Callback, MsgIds, ActionTaken) + end, + fun (MsgIds) -> msg_indices_written_to_disk(Callback, MsgIds) end, + fun (MsgIds) -> msgs_and_indices_written_to_disk(Callback, MsgIds) end). init(#amqqueue { name = QueueName, durable = IsDurable }, new, - AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) -> - IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun), + AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) -> + IndexState = rabbit_queue_index:init(QueueName, + MsgIdxOnDiskFun, MsgAndIdxOnDiskFun), init(IsDurable, IndexState, 0, 0, [], case IsDurable of true -> msg_store_client_init(?PERSISTENT_MSG_STORE, @@ -446,13 +451,17 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, new, %% We can be recovering a transient queue if it crashed init(#amqqueue { name = QueueName, durable = IsDurable }, Terms, - AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) -> + AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) -> {PRef, RecoveryTerms} = process_recovery_terms(Terms), {PersistentClient, ContainsCheckFun} = case IsDurable of true -> C = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef, MsgOnDiskFun, AsyncCallback), - {C, fun (MId) -> rabbit_msg_store:contains(MId, C) end}; + {C, fun (MsgId) when is_binary(MsgId) -> + rabbit_msg_store:contains(MsgId, C); + (#basic_message{is_persistent = Persistent}) -> + Persistent + end}; false -> {undefined, fun(_MsgId) -> false end} end, TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE, @@ -461,7 +470,7 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, Terms, rabbit_queue_index:recover( QueueName, RecoveryTerms, rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE), - ContainsCheckFun, MsgIdxOnDiskFun), + ContainsCheckFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun), init(IsDurable, IndexState, DeltaCount, DeltaBytes, RecoveryTerms, PersistentClient, TransientClient). @@ -517,26 +526,25 @@ delete_crashed(#amqqueue{name = QName}) -> purge(State = #vqstate { q4 = Q4, index_state = IndexState, msg_store_clients = MSCState, - len = Len, - ram_bytes = RamBytes, - persistent_count = PCount, - persistent_bytes = PBytes }) -> + len = Len }) -> %% TODO: when there are no pending acks, which is a common case, %% we could simply wipe the qi instead of issuing delivers and %% acks for all the messages. - Stats = {RamBytes, PCount, PBytes}, + Stats = {0, 0, 0}, {Stats1, IndexState1} = remove_queue_entries(Q4, Stats, IndexState, MSCState), {Stats2, State1 = #vqstate { q1 = Q1, index_state = IndexState2, - msg_store_clients = MSCState1 }} = - + msg_store_clients = MSCState1, + ram_bytes = RamBytes, + persistent_count = PCount, + persistent_bytes = PBytes }} = purge_betas_and_deltas( Stats1, State #vqstate { q4 = ?QUEUE:new(), index_state = IndexState1 }), - {{RamBytes3, PCount3, PBytes3}, IndexState3} = + {{RamBytesDec, PCountDec, PBytesDec}, IndexState3} = remove_queue_entries(Q1, Stats2, IndexState2, MSCState1), {Len, a(State1 #vqstate { q1 = ?QUEUE:new(), @@ -544,9 +552,9 @@ purge(State = #vqstate { q4 = Q4, len = 0, bytes = 0, ram_msg_count = 0, - ram_bytes = RamBytes3, - persistent_count = PCount3, - persistent_bytes = PBytes3 })}. + ram_bytes = RamBytes - RamBytesDec, + persistent_count = PCount - PCountDec, + persistent_bytes = PBytes - PBytesDec })}. purge_acks(State) -> a(purge_pending_ack(false, State)). @@ -664,7 +672,7 @@ ack([], State) -> ack([SeqId], State) -> {#msg_status { msg_id = MsgId, is_persistent = IsPersistent, - msg_on_disk = MsgOnDisk, + msg_in_store = MsgInStore, index_on_disk = IndexOnDisk }, State1 = #vqstate { index_state = IndexState, msg_store_clients = MSCState, @@ -674,7 +682,7 @@ ack([SeqId], State) -> true -> rabbit_queue_index:ack([SeqId], IndexState); false -> IndexState end, - case MsgOnDisk of + case MsgInStore of true -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId]); false -> ok end, @@ -733,15 +741,18 @@ fold(Fun, Acc, State = #vqstate{index_state = IndexState}) -> {Its, IndexState1} = lists:foldl(fun inext/2, {[], IndexState}, [msg_iterator(State), disk_ack_iterator(State), - ram_ack_iterator(State)]), + ram_ack_iterator(State), + qi_ack_iterator(State)]), ifold(Fun, Acc, Its, State#vqstate{index_state = IndexState1}). len(#vqstate { len = Len }) -> Len. is_empty(State) -> 0 == len(State). -depth(State = #vqstate { ram_pending_ack = RPA, disk_pending_ack = DPA }) -> - len(State) + gb_trees:size(RPA) + gb_trees:size(DPA). +depth(State = #vqstate { ram_pending_ack = RPA, + disk_pending_ack = DPA, + qi_pending_ack = QPA }) -> + len(State) + gb_trees:size(RPA) + gb_trees:size(DPA) + gb_trees:size(QPA). set_ram_duration_target( DurationTarget, State = #vqstate { @@ -807,10 +818,11 @@ ram_duration(State) -> ram_msg_count = RamMsgCount, ram_msg_count_prev = RamMsgCountPrev, ram_pending_ack = RPA, + qi_pending_ack = QPA, ram_ack_count_prev = RamAckCountPrev } = update_rates(State), - RamAckCount = gb_trees:size(RPA), + RamAckCount = gb_trees:size(RPA) + gb_trees:size(QPA), Duration = %% msgs+acks / (msgs+acks/sec) == sec case lists:all(fun (X) -> X < 0.01 end, @@ -846,8 +858,9 @@ msg_rates(#vqstate { rates = #rates { in = AvgIngressRate, info(messages_ready_ram, #vqstate{ram_msg_count = RamMsgCount}) -> RamMsgCount; -info(messages_unacknowledged_ram, #vqstate{ram_pending_ack = RPA}) -> - gb_trees:size(RPA); +info(messages_unacknowledged_ram, #vqstate{ram_pending_ack = RPA, + qi_pending_ack = QPA}) -> + gb_trees:size(RPA) + gb_trees:size(QPA); info(messages_ram, State) -> info(messages_ready_ram, State) + info(messages_unacknowledged_ram, State); info(messages_persistent, #vqstate{persistent_count = PersistentCount}) -> @@ -935,12 +948,10 @@ d(Delta = #delta { start_seq_id = Start, count = Count, end_seq_id = End }) m(MsgStatus = #msg_status { msg = Msg, is_persistent = IsPersistent, - msg_on_disk = MsgOnDisk, + msg_in_store = MsgInStore, index_on_disk = IndexOnDisk }) -> true = (not IsPersistent) or IndexOnDisk, - true = (not IndexOnDisk) or MsgOnDisk, - true = (Msg =/= undefined) or MsgOnDisk, - + true = (Msg =/= undefined) or MsgInStore, MsgStatus. one_if(true ) -> 1; @@ -959,21 +970,36 @@ msg_status(IsPersistent, IsDelivered, SeqId, msg = Msg, is_persistent = IsPersistent, is_delivered = IsDelivered, - msg_on_disk = false, + msg_in_store = false, index_on_disk = false, msg_props = MsgProps}. +beta_msg_status({Msg = #basic_message{id = MsgId}, + SeqId, MsgProps, IsPersistent, IsDelivered}) -> + MS0 = beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered), + MS0#msg_status{msg_id = MsgId, + msg = Msg, + msg_in_store = false}; + beta_msg_status({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered}) -> + MS0 = beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered), + MS0#msg_status{msg_id = MsgId, + msg = undefined, + msg_in_store = true}. + +beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered) -> #msg_status{seq_id = SeqId, - msg_id = MsgId, msg = undefined, is_persistent = IsPersistent, is_delivered = IsDelivered, - msg_on_disk = true, index_on_disk = true, msg_props = MsgProps}. -trim_msg_status(MsgStatus) -> MsgStatus #msg_status { msg = undefined }. +trim_msg_status(MsgStatus = #msg_status{msg_props = MsgProps}) -> + case persist_to(MsgProps) of + msg_store -> MsgStatus#msg_status{msg = undefined}; + queue_index -> MsgStatus + end. with_msg_store_state({MSCStateP, MSCStateT}, true, Fun) -> {Result, MSCStateP1} = Fun(MSCStateP), @@ -1035,26 +1061,36 @@ maybe_write_delivered(false, _SeqId, IndexState) -> maybe_write_delivered(true, SeqId, IndexState) -> rabbit_queue_index:deliver([SeqId], IndexState). -betas_from_index_entries(List, TransientThreshold, RPA, DPA, IndexState) -> - {Filtered, Delivers, Acks} = +betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA, IndexState) -> + {Filtered, Delivers, Acks, RamReadyCount, RamBytes} = lists:foldr( - fun ({_MsgId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M, - {Filtered1, Delivers1, Acks1} = Acc) -> + fun ({_MsgOrId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M, + {Filtered1, Delivers1, Acks1, RRC, RB} = Acc) -> case SeqId < TransientThreshold andalso not IsPersistent of true -> {Filtered1, cons_if(not IsDelivered, SeqId, Delivers1), - [SeqId | Acks1]}; - false -> case (gb_trees:is_defined(SeqId, RPA) orelse - gb_trees:is_defined(SeqId, DPA)) of - false -> {?QUEUE:in_r(m(beta_msg_status(M)), - Filtered1), - Delivers1, Acks1}; - true -> Acc - end + [SeqId | Acks1], RRC, RB}; + false -> MsgStatus = m(beta_msg_status(M)), + HaveMsg = MsgStatus#msg_status.msg =/= undefined, + Size = msg_size(MsgStatus), + case (gb_trees:is_defined(SeqId, RPA) orelse + gb_trees:is_defined(SeqId, DPA) orelse + gb_trees:is_defined(SeqId, QPA)) of + false -> {?QUEUE:in_r(MsgStatus, Filtered1), + Delivers1, Acks1, + RRC + one_if(HaveMsg), + RB + one_if(HaveMsg) * Size}; + true -> Acc %% [0] + end end - end, {?QUEUE:new(), [], []}, List), - {Filtered, rabbit_queue_index:ack( - Acks, rabbit_queue_index:deliver(Delivers, IndexState))}. + end, {?QUEUE:new(), [], [], 0, 0}, List), + {Filtered, RamReadyCount, RamBytes, + rabbit_queue_index:ack( + Acks, rabbit_queue_index:deliver(Delivers, IndexState))}. +%% [0] We don't increase RamBytes here, even though it pertains to +%% unacked messages too, since if HaveMsg then the message must have +%% been stored in the QI, thus the message must have been in +%% qi_pending_ack, thus it must already have been in RAM. expand_delta(SeqId, ?BLANK_DELTA_PATTERN(X)) -> d(#delta { start_seq_id = SeqId, count = 1, end_seq_id = SeqId + 1 }); @@ -1101,6 +1137,7 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms, next_seq_id = NextSeqId, ram_pending_ack = gb_trees:empty(), disk_pending_ack = gb_trees:empty(), + qi_pending_ack = gb_trees:empty(), index_state = IndexState1, msg_store_clients = {PersistentClient, TransientClient}, durable = IsDurable, @@ -1141,11 +1178,10 @@ in_r(MsgStatus = #msg_status { msg = undefined }, true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) }; false -> {Msg, State1 = #vqstate { q4 = Q4a }} = read_msg(MsgStatus, State), - upd_ram_bytes( + upd_ram_bytes_count( 1, MsgStatus, - inc_ram_msg_count( - State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus#msg_status { - msg = Msg }, Q4a) })) + State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus#msg_status { + msg = Msg }, Q4a) }) end; in_r(MsgStatus, State = #vqstate { q4 = Q4 }) -> State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) }. @@ -1193,6 +1229,9 @@ upd_bytes0(SignReady, SignUnacked, MsgStatus = #msg_status{is_persistent = IsP}, unacked_bytes = UBytes + SignUnacked * S, persistent_bytes = PBytes + one_if(IsP) * S * SignTotal}. +upd_ram_bytes_count(Sign, MsgStatus, State = #vqstate{ram_msg_count = Count}) -> + upd_ram_bytes(Sign, MsgStatus, State#vqstate{ram_msg_count = Count + Sign}). + upd_ram_bytes(Sign, MsgStatus, State = #vqstate{ram_bytes = RamBytes}) -> State#vqstate{ram_bytes = RamBytes + Sign * msg_size(MsgStatus)}. @@ -1206,7 +1245,7 @@ remove(AckRequired, MsgStatus = #msg_status { msg = Msg, is_persistent = IsPersistent, is_delivered = IsDelivered, - msg_on_disk = MsgOnDisk, + msg_in_store = MsgInStore, index_on_disk = IndexOnDisk }, State = #vqstate {ram_msg_count = RamMsgCount, out_counter = OutCount, @@ -1224,10 +1263,11 @@ remove(AckRequired, MsgStatus = #msg_status { ok = msg_store_remove(MSCState, IsPersistent, [MsgId]) end, Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end, - IndexState2 = case {AckRequired, MsgOnDisk, IndexOnDisk} of - {false, true, false} -> Rem(), IndexState1; - {false, true, true} -> Rem(), Ack(); - _ -> IndexState1 + IndexState2 = case {AckRequired, MsgInStore, IndexOnDisk} of + {false, true, false} -> Rem(), IndexState1; + {false, true, true} -> Rem(), Ack(); + {false, false, true} -> Ack(); + _ -> IndexState1 end, %% 3. If an ack is required, add something sensible to PA @@ -1267,35 +1307,31 @@ purge_betas_and_deltas(Stats, index_state = IndexState1 })) end. -remove_queue_entries(Q, {RamBytes, PCount, PBytes}, +remove_queue_entries(Q, {RamBytesDec, PCountDec, PBytesDec}, IndexState, MSCState) -> - {MsgIdsByStore, RamBytes1, PBytes1, Delivers, Acks} = + {MsgIdsByStore, RamBytesDec1, PCountDec1, PBytesDec1, Delivers, Acks} = ?QUEUE:foldl(fun remove_queue_entries1/2, - {orddict:new(), RamBytes, PBytes, [], []}, Q), + {orddict:new(), RamBytesDec, PCountDec, PBytesDec, [], []}, Q), ok = orddict:fold(fun (IsPersistent, MsgIds, ok) -> msg_store_remove(MSCState, IsPersistent, MsgIds) end, ok, MsgIdsByStore), - {{RamBytes1, - PCount - case orddict:find(true, MsgIdsByStore) of - error -> 0; - {ok, Ids} -> length(Ids) - end, - PBytes1}, + {{RamBytesDec1, PCountDec1, PBytesDec1}, rabbit_queue_index:ack(Acks, rabbit_queue_index:deliver(Delivers, IndexState))}. remove_queue_entries1( #msg_status { msg_id = MsgId, seq_id = SeqId, msg = Msg, - is_delivered = IsDelivered, msg_on_disk = MsgOnDisk, + is_delivered = IsDelivered, msg_in_store = MsgInStore, index_on_disk = IndexOnDisk, is_persistent = IsPersistent, msg_props = #message_properties { size = Size } }, - {MsgIdsByStore, RamBytes, PBytes, Delivers, Acks}) -> - {case MsgOnDisk of + {MsgIdsByStore, RamBytesDec, PCountDec, PBytesDec, Delivers, Acks}) -> + {case MsgInStore of true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore); false -> MsgIdsByStore end, - RamBytes - Size * one_if(Msg =/= undefined), - PBytes - Size * one_if(IsPersistent), + RamBytesDec + Size * one_if(Msg =/= undefined), + PCountDec + one_if(IsPersistent), + PBytesDec + Size * one_if(IsPersistent), cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers), cons_if(IndexOnDisk, SeqId, Acks)}. @@ -1304,36 +1340,39 @@ remove_queue_entries1( %%---------------------------------------------------------------------------- maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status { - msg_on_disk = true }, _MSCState) -> + msg_in_store = true }, _MSCState) -> MsgStatus; maybe_write_msg_to_disk(Force, MsgStatus = #msg_status { msg = Msg, msg_id = MsgId, + msg_props = MsgProps, is_persistent = IsPersistent }, MSCState) when Force orelse IsPersistent -> - Msg1 = Msg #basic_message { - %% don't persist any recoverable decoded properties - content = rabbit_binary_parser:clear_decoded_content( - Msg #basic_message.content)}, - ok = msg_store_write(MSCState, IsPersistent, MsgId, Msg1), - MsgStatus #msg_status { msg_on_disk = true }; + case persist_to(MsgProps) of + msg_store -> ok = msg_store_write(MSCState, IsPersistent, MsgId, + prepare_to_store(Msg)), + MsgStatus#msg_status{msg_in_store = true}; + queue_index -> MsgStatus + end; maybe_write_msg_to_disk(_Force, MsgStatus, _MSCState) -> MsgStatus. maybe_write_index_to_disk(_Force, MsgStatus = #msg_status { index_on_disk = true }, IndexState) -> - true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION {MsgStatus, IndexState}; maybe_write_index_to_disk(Force, MsgStatus = #msg_status { + msg = Msg, msg_id = MsgId, seq_id = SeqId, is_persistent = IsPersistent, is_delivered = IsDelivered, msg_props = MsgProps}, IndexState) when Force orelse IsPersistent -> - true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION IndexState1 = rabbit_queue_index:publish( - MsgId, SeqId, MsgProps, IsPersistent, IndexState), - {MsgStatus #msg_status { index_on_disk = true }, + case persist_to(MsgProps) of + msg_store -> MsgId; + queue_index -> prepare_to_store(Msg) + end, SeqId, MsgProps, IsPersistent, IndexState), + {MsgStatus#msg_status{index_on_disk = true}, maybe_write_delivered(IsDelivered, SeqId, IndexState1)}; maybe_write_index_to_disk(_Force, MsgStatus, IndexState) -> {MsgStatus, IndexState}. @@ -1346,28 +1385,53 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, maybe_write_index_to_disk(ForceIndex, MsgStatus1, IndexState), {MsgStatus2, State #vqstate { index_state = IndexState1 }}. +persist_to(#message_properties{size = Size}) -> + {ok, IndexMaxSize} = application:get_env( + rabbit, queue_index_embed_msgs_below), + %% This is >= so that you can set the env to 0 and never persist + %% to the index. + case Size >= IndexMaxSize of + true -> msg_store; + false -> queue_index + end. + +prepare_to_store(Msg) -> + Msg#basic_message{ + %% don't persist any recoverable decoded properties + content = rabbit_binary_parser:clear_decoded_content( + Msg #basic_message.content)}. + %%---------------------------------------------------------------------------- %% Internal gubbins for acks %%---------------------------------------------------------------------------- -record_pending_ack(#msg_status { seq_id = SeqId, msg = Msg } = MsgStatus, +record_pending_ack(#msg_status { seq_id = SeqId, msg = Msg, + msg_props = MsgProps } = MsgStatus, State = #vqstate { ram_pending_ack = RPA, disk_pending_ack = DPA, + qi_pending_ack = QPA, ack_in_counter = AckInCount}) -> - {RPA1, DPA1} = - case Msg of - undefined -> {RPA, gb_trees:insert(SeqId, MsgStatus, DPA)}; - _ -> {gb_trees:insert(SeqId, MsgStatus, RPA), DPA} + Insert = fun (Tree) -> gb_trees:insert(SeqId, MsgStatus, Tree) end, + {RPA1, DPA1, QPA1} = + case {Msg, persist_to(MsgProps)} of + {undefined, _} -> {RPA, Insert(DPA), QPA}; + {_, queue_index} -> {RPA, DPA, Insert(QPA)}; + {_, msg_store} -> {Insert(RPA), DPA, QPA} end, State #vqstate { ram_pending_ack = RPA1, disk_pending_ack = DPA1, + qi_pending_ack = QPA1, ack_in_counter = AckInCount + 1}. lookup_pending_ack(SeqId, #vqstate { ram_pending_ack = RPA, - disk_pending_ack = DPA }) -> + disk_pending_ack = DPA, + qi_pending_ack = QPA}) -> case gb_trees:lookup(SeqId, RPA) of {value, V} -> V; - none -> gb_trees:get(SeqId, DPA) + none -> case gb_trees:lookup(SeqId, DPA) of + {value, V} -> V; + none -> gb_trees:get(SeqId, QPA) + end end. %% First parameter = UpdatePersistentCount @@ -1377,27 +1441,38 @@ remove_pending_ack(true, SeqId, State) -> PCount1 = PCount - one_if(MsgStatus#msg_status.is_persistent), {MsgStatus, upd_bytes(0, -1, MsgStatus, State1 # vqstate{ persistent_count = PCount1 })}; -remove_pending_ack(false, SeqId, State = #vqstate { ram_pending_ack = RPA, - disk_pending_ack = DPA }) -> +remove_pending_ack(false, SeqId, State = #vqstate{ram_pending_ack = RPA, + disk_pending_ack = DPA, + qi_pending_ack = QPA}) -> case gb_trees:lookup(SeqId, RPA) of {value, V} -> RPA1 = gb_trees:delete(SeqId, RPA), {V, State #vqstate { ram_pending_ack = RPA1 }}; - none -> DPA1 = gb_trees:delete(SeqId, DPA), - {gb_trees:get(SeqId, DPA), - State #vqstate { disk_pending_ack = DPA1 }} + none -> case gb_trees:lookup(SeqId, DPA) of + {value, V} -> + DPA1 = gb_trees:delete(SeqId, DPA), + {V, State#vqstate{disk_pending_ack = DPA1}}; + none -> + QPA1 = gb_trees:delete(SeqId, QPA), + {gb_trees:get(SeqId, QPA), + State#vqstate{qi_pending_ack = QPA1}} + end end. purge_pending_ack(KeepPersistent, State = #vqstate { ram_pending_ack = RPA, disk_pending_ack = DPA, + qi_pending_ack = QPA, index_state = IndexState, msg_store_clients = MSCState }) -> F = fun (_SeqId, MsgStatus, Acc) -> accumulate_ack(MsgStatus, Acc) end, {IndexOnDiskSeqIds, MsgIdsByStore, _AllMsgIds} = rabbit_misc:gb_trees_fold( - F, rabbit_misc:gb_trees_fold(F, accumulate_ack_init(), RPA), DPA), + F, rabbit_misc:gb_trees_fold( + F, rabbit_misc:gb_trees_fold( + F, accumulate_ack_init(), RPA), DPA), QPA), State1 = State #vqstate { ram_pending_ack = gb_trees:empty(), - disk_pending_ack = gb_trees:empty() }, + disk_pending_ack = gb_trees:empty(), + qi_pending_ack = gb_trees:empty()}, case KeepPersistent of true -> case orddict:find(false, MsgIdsByStore) of @@ -1418,11 +1493,11 @@ accumulate_ack_init() -> {[], orddict:new(), []}. accumulate_ack(#msg_status { seq_id = SeqId, msg_id = MsgId, is_persistent = IsPersistent, - msg_on_disk = MsgOnDisk, + msg_in_store = MsgInStore, index_on_disk = IndexOnDisk }, {IndexOnDiskSeqIdsAcc, MsgIdsByStore, AllMsgIds}) -> {cons_if(IndexOnDisk, SeqId, IndexOnDiskSeqIdsAcc), - case MsgOnDisk of + case MsgInStore of true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore); false -> MsgIdsByStore end, @@ -1469,6 +1544,10 @@ msg_indices_written_to_disk(Callback, MsgIdSet) -> gb_sets:union(MIOD, Confirmed) }) end). +msgs_and_indices_written_to_disk(Callback, MsgIdSet) -> + Callback(?MODULE, + fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end). + %%---------------------------------------------------------------------------- %% Internal plumbing for requeue %%---------------------------------------------------------------------------- @@ -1476,7 +1555,7 @@ msg_indices_written_to_disk(Callback, MsgIdSet) -> publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) -> {Msg, State1} = read_msg(MsgStatus, State), {MsgStatus#msg_status { msg = Msg }, - upd_ram_bytes(1, MsgStatus, inc_ram_msg_count(State1))}; %% [1] + upd_ram_bytes_count(1, MsgStatus, State1)}; %% [1] publish_alpha(MsgStatus, State) -> {MsgStatus, inc_ram_msg_count(State)}. %% [1] We increase the ram_bytes here because we paged the message in @@ -1488,10 +1567,11 @@ publish_alpha(MsgStatus, State) -> publish_beta(MsgStatus, State) -> {MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State), MsgStatus2 = m(trim_msg_status(MsgStatus1)), - case msg_in_ram(MsgStatus1) andalso not msg_in_ram(MsgStatus2) of - true -> {MsgStatus2, upd_ram_bytes(-1, MsgStatus, State1)}; - _ -> {MsgStatus2, State1} - end. + {MsgStatus2, case {msg_in_ram(MsgStatus1), msg_in_ram(MsgStatus2)} of + {true, false} -> upd_ram_bytes(-1, MsgStatus, State1); + {_, true} -> inc_ram_msg_count(State1); + _ -> State1 + end}. %% Rebuild queue, inserting sequence ids to maintain ordering queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, State) -> @@ -1563,6 +1643,9 @@ ram_ack_iterator(State) -> disk_ack_iterator(State) -> {ack, gb_trees:iterator(State#vqstate.disk_pending_ack)}. +qi_ack_iterator(State) -> + {ack, gb_trees:iterator(State#vqstate.qi_pending_ack)}. + msg_iterator(State) -> istate(start, State). istate(start, State) -> {q4, State#vqstate.q4, State}; @@ -1586,13 +1669,15 @@ next({delta, #delta{start_seq_id = SeqId, end_seq_id = SeqIdEnd} = Delta, State}, IndexState) -> SeqIdB = rabbit_queue_index:next_segment_boundary(SeqId), SeqId1 = lists:min([SeqIdB, SeqIdEnd]), - {List, IndexState1} = rabbit_queue_index:read(SeqId, SeqId1, IndexState), + {List, IndexState1} = + rabbit_queue_index:read(SeqId, SeqId1, IndexState), next({delta, Delta#delta{start_seq_id = SeqId1}, List, State}, IndexState1); next({delta, Delta, [], State}, IndexState) -> next({delta, Delta, State}, IndexState); next({delta, Delta, [{_, SeqId, _, _, _} = M | Rest], State}, IndexState) -> case (gb_trees:is_defined(SeqId, State#vqstate.ram_pending_ack) orelse - gb_trees:is_defined(SeqId, State#vqstate.disk_pending_ack)) of + gb_trees:is_defined(SeqId, State#vqstate.disk_pending_ack) orelse + gb_trees:is_defined(SeqId, State#vqstate.qi_pending_ack)) of false -> Next = {delta, Delta, Rest, State}, {value, beta_msg_status(M), false, Next, IndexState}; true -> next({delta, Delta, Rest, State}, IndexState) @@ -1755,8 +1840,11 @@ maybe_deltas_to_betas(State = #vqstate { delta = Delta, q3 = Q3, index_state = IndexState, + ram_msg_count = RamMsgCount, + ram_bytes = RamBytes, ram_pending_ack = RPA, disk_pending_ack = DPA, + qi_pending_ack = QPA, transient_threshold = TransientThreshold }) -> #delta { start_seq_id = DeltaSeqId, count = DeltaCount, @@ -1764,11 +1852,14 @@ maybe_deltas_to_betas(State = #vqstate { DeltaSeqId1 = lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId), DeltaSeqIdEnd]), - {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, - IndexState), - {Q3a, IndexState2} = betas_from_index_entries(List, TransientThreshold, - RPA, DPA, IndexState1), - State1 = State #vqstate { index_state = IndexState2 }, + {List, IndexState1} = + rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState), + {Q3a, RamCountsInc, RamBytesInc, IndexState2} = + betas_from_index_entries(List, TransientThreshold, + RPA, DPA, QPA, IndexState1), + State1 = State #vqstate { index_state = IndexState2, + ram_msg_count = RamMsgCount + RamCountsInc, + ram_bytes = RamBytes + RamBytesInc }, case ?QUEUE:len(Q3a) of 0 -> %% we ignored every message in the segment due to it being @@ -1826,18 +1917,17 @@ push_alphas_to_betas(Generator, Consumer, Quota, Q, State) -> {empty, _Q} -> {Quota, State}; {{value, MsgStatus}, Qa} -> - {MsgStatus1 = #msg_status { msg_on_disk = true }, - State1 = #vqstate { ram_msg_count = RamMsgCount }} = + {MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State), MsgStatus2 = m(trim_msg_status(MsgStatus1)), - State2 = Consumer( - MsgStatus2, Qa, - upd_ram_bytes( - -1, MsgStatus2, - State1 #vqstate { - ram_msg_count = RamMsgCount - 1})), + State2 = case MsgStatus2#msg_status.msg of + undefined -> upd_ram_bytes_count( + -1, MsgStatus2, State1); + _ -> State1 + end, + State3 = Consumer(MsgStatus2, Qa, State2), push_alphas_to_betas(Generator, Consumer, Quota - 1, - Qa, State2) + Qa, State3) end end. @@ -1845,7 +1935,7 @@ push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2, delta = Delta, q3 = Q3, index_state = IndexState }) -> - PushState = {Quota, Delta, IndexState}, + PushState = {Quota, Delta, IndexState, State}, {Q3a, PushState1} = push_betas_to_deltas( fun ?QUEUE:out_r/1, fun rabbit_queue_index:next_segment_boundary/1, @@ -1854,11 +1944,11 @@ push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2, fun ?QUEUE:out/1, fun (Q2MinSeqId) -> Q2MinSeqId end, Q2, PushState1), - {_, Delta1, IndexState1} = PushState2, - State #vqstate { q2 = Q2a, - delta = Delta1, - q3 = Q3a, - index_state = IndexState1 }. + {_, Delta1, IndexState1, State1} = PushState2, + State1 #vqstate { q2 = Q2a, + delta = Delta1, + q3 = Q3a, + index_state = IndexState1 }. push_betas_to_deltas(Generator, LimitFun, Q, PushState) -> case ?QUEUE:is_empty(Q) of @@ -1875,10 +1965,10 @@ push_betas_to_deltas(Generator, LimitFun, Q, PushState) -> end. push_betas_to_deltas1(_Generator, _Limit, Q, - {0, _Delta, _IndexState} = PushState) -> + {0, _Delta, _IndexState, _State} = PushState) -> {Q, PushState}; push_betas_to_deltas1(Generator, Limit, Q, - {Quota, Delta, IndexState} = PushState) -> + {Quota, Delta, IndexState, State} = PushState) -> case Generator(Q) of {empty, _Q} -> {Q, PushState}; @@ -1888,9 +1978,13 @@ push_betas_to_deltas1(Generator, Limit, Q, {{value, MsgStatus = #msg_status { seq_id = SeqId }}, Qa} -> {#msg_status { index_on_disk = true }, IndexState1} = maybe_write_index_to_disk(true, MsgStatus, IndexState), + State1 = case MsgStatus#msg_status.msg of + undefined -> State; + _ -> upd_ram_bytes_count(-1, MsgStatus, State) + end, Delta1 = expand_delta(SeqId, Delta), push_betas_to_deltas1(Generator, Limit, Qa, - {Quota - 1, Delta1, IndexState1}) + {Quota - 1, Delta1, IndexState1, State1}) end. %%---------------------------------------------------------------------------- diff --git a/test/src/rabbit_tests.erl b/test/src/rabbit_tests.erl index dcbec8f6..870dfdd9 100644 --- a/test/src/rabbit_tests.erl +++ b/test/src/rabbit_tests.erl @@ -1888,11 +1888,15 @@ test_backing_queue() -> passed = test_msg_store(), application:set_env(rabbit, msg_store_file_size_limit, FileSizeLimit), - passed = test_queue_index(), - passed = test_queue_index_props(), - passed = test_variable_queue(), - passed = test_variable_queue_delete_msg_store_files_callback(), - passed = test_queue_recover(), + [begin + application:set_env( + rabbit, queue_index_embed_msgs_below, Bytes), + passed = test_queue_index(), + passed = test_queue_index_props(), + passed = test_variable_queue(), + passed = test_variable_queue_delete_msg_store_files_callback(), + passed = test_queue_recover() + end || Bytes <- [0, 1024]], application:set_env(rabbit, queue_index_max_journal_entries, MaxJournal), %% We will have restarted the message store, and thus changed @@ -2219,7 +2223,7 @@ init_test_queue() -> fun (MsgId) -> rabbit_msg_store:contains(MsgId, PersistentClient) end, - fun nop/1), + fun nop/1, fun nop/1), ok = rabbit_msg_store:client_delete_and_terminate(PersistentClient), Res. @@ -2422,7 +2426,7 @@ variable_queue_init(Q, Recover) -> Q, case Recover of true -> non_clean_shutdown; false -> new - end, fun nop/2, fun nop/2, fun nop/1). + end, fun nop/2, fun nop/2, fun nop/1, fun nop/1). variable_queue_publish(IsPersistent, Count, VQ) -> variable_queue_publish(IsPersistent, Count, fun (_N, P) -> P end, VQ). @@ -2462,7 +2466,10 @@ variable_queue_set_ram_duration_target(Duration, VQ) -> rabbit_variable_queue:set_ram_duration_target(Duration, VQ)). assert_prop(List, Prop, Value) -> - Value = proplists:get_value(Prop, List). + case proplists:get_value(Prop, List)of + Value -> ok; + _ -> {exit, Prop, exp, Value, List} + end. assert_props(List, PropVals) -> [assert_prop(List, Prop, Value) || {Prop, Value} <- PropVals]. @@ -2485,12 +2492,18 @@ with_fresh_variable_queue(Fun) -> {delta, undefined, 0, undefined}}, {q3, 0}, {q4, 0}, {len, 0}]), - _ = rabbit_variable_queue:delete_and_terminate( - shutdown, Fun(VQ)), - Me ! Ref + try + _ = rabbit_variable_queue:delete_and_terminate( + shutdown, Fun(VQ)), + Me ! Ref + catch + Type:Error -> + Me ! {Ref, Type, Error, erlang:get_stacktrace()} + end end), receive - Ref -> ok + Ref -> ok; + {Ref, Type, Error, ST} -> exit({Type, Error, ST}) end, passed. @@ -2787,8 +2800,6 @@ test_variable_queue_dynamic_duration_change(VQ0) -> VQ7 = lists:foldl( fun (Duration1, VQ4) -> {_Duration, VQ5} = rabbit_variable_queue:ram_duration(VQ4), - io:format("~p:~n~p~n", - [Duration1, variable_queue_status(VQ5)]), VQ6 = variable_queue_set_ram_duration_target( Duration1, VQ5), publish_fetch_and_ack(Churn, Len, VQ6) @@ -2849,7 +2860,6 @@ test_variable_queue_partial_segments_delta_thing(VQ0) -> check_variable_queue_status(VQ0, Props) -> VQ1 = variable_queue_wait_for_shuffling_end(VQ0), S = variable_queue_status(VQ1), - io:format("~p~n", [S]), assert_props(S, Props), VQ1. |