diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-12-03 15:52:49 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-12-03 15:52:49 +0000 |
commit | d869defa7db1d94eb3dcfeef4b46d3a706a59390 (patch) | |
tree | 83b8381df55df4d435639bc4e426865f89fad283 | |
parent | d205a4285ca02d12493ac0dd0f5256af4294723c (diff) | |
download | rabbitmq-server-d869defa7db1d94eb3dcfeef4b46d3a706a59390.tar.gz |
Hacked-together implementation of persisting messages in the QI. Currently does it for all messages, but in reality we'd only want to do this for small ones (and make it configurable). Confirms are probably broken, maybe some other things.
-rw-r--r-- | src/rabbit_queue_index.erl | 223 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 56 |
2 files changed, 181 insertions, 98 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 0a2c88d4..90729e33 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -128,8 +128,8 @@ -define(REL_SEQ_ONLY_RECORD_BYTES, 2). %% publish record is binary 1 followed by a bit for is_persistent, -%% then 14 bits of rel seq id, 64 bits for message expiry and 128 bits -%% of md5sum msg id +%% then 14 bits of rel seq id, 64 bits for message expiry, 32 bits of +%% size and then 128 bits of md5sum msg id. -define(PUB_PREFIX, 1). -define(PUB_PREFIX_BITS, 1). @@ -140,26 +140,37 @@ -define(MSG_ID_BYTES, 16). %% md5sum is 128 bit or 16 bytes -define(MSG_ID_BITS, (?MSG_ID_BYTES * 8)). +%% This is the size of the message body content, for stats -define(SIZE_BYTES, 4). -define(SIZE_BITS, (?SIZE_BYTES * 8)). +%% This is the size of the message record embedded in the queue +%% index. If 0, the message can be found in the message store. +-define(MSG_IN_INDEX_SIZE_BYTES, 4). +-define(MSG_IN_INDEX_SIZE_BITS, (?MSG_IN_INDEX_SIZE_BYTES * 8)). + %% 16 bytes for md5sum + 8 for expiry + 4 for size --define(PUB_RECORD_BODY_BYTES, (?MSG_ID_BYTES + ?EXPIRY_BYTES + ?SIZE_BYTES)). +-define(PUB_RECORD_BODY_BYTES, (?MSG_ID_BYTES + ?EXPIRY_BYTES + ?SIZE_BYTES + + ?MSG_IN_INDEX_SIZE_BYTES)). %% + 2 for seq, bits and prefix --define(PUB_RECORD_BYTES, (?PUB_RECORD_BODY_BYTES + 2)). +-define(PUB_RECORD_PREFIX_BYTES, 2). + +-define(PUB_RECORD_BYTES, (?PUB_RECORD_BODY_BYTES + ?PUB_RECORD_PREFIX_BYTES)). -%% 1 publish, 1 deliver, 1 ack per msg --define(SEGMENT_TOTAL_SIZE, ?SEGMENT_ENTRY_COUNT * - (?PUB_RECORD_BYTES + (2 * ?REL_SEQ_ONLY_RECORD_BYTES))). +%% %% 1 publish, 1 deliver, 1 ack per msg +%% -define(SEGMENT_TOTAL_SIZE, ?SEGMENT_ENTRY_COUNT * +%% (?PUB_RECORD_BYTES + (2 * ?REL_SEQ_ONLY_RECORD_BYTES))). %% ---- misc ---- -define(PUB, {_, _, _}). %% {MsgId, MsgProps, IsPersistent} -define(READ_MODE, [binary, raw, read]). --define(READ_AHEAD_MODE, [{read_ahead, ?SEGMENT_TOTAL_SIZE} | ?READ_MODE]). +-define(READ_AHEAD_MODE, ?READ_MODE). -define(WRITE_MODE, [write | ?READ_MODE]). +-define(READ_BUFFER_SIZE, 1048576). %% 1MB + %%---------------------------------------------------------------------------- -record(qistate, { dir, segments, journal_handle, dirty_count, @@ -219,7 +230,8 @@ -spec(read/3 :: (seq_id(), seq_id(), qistate()) -> {[{rabbit_types:msg_id(), seq_id(), rabbit_types:message_properties(), - boolean(), boolean()}], qistate()}). + boolean(), boolean()}], + non_neg_integer(), non_neg_integer(), qistate()}). -spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()). -spec(bounds/1 :: (qistate()) -> {non_neg_integer(), non_neg_integer(), qistate()}). @@ -267,9 +279,12 @@ delete_and_terminate(State) -> ok = rabbit_file:recursive_delete([Dir]), State1. -publish(MsgId, SeqId, MsgProps, IsPersistent, - State = #qistate { unconfirmed = Unconfirmed }) - when is_binary(MsgId) -> +publish(MsgOrId, SeqId, MsgProps, IsPersistent, + State = #qistate { unconfirmed = Unconfirmed }) -> + MsgId = case MsgOrId of + #basic_message{id = Id} -> Id; + Id when is_binary(Id) -> Id + end, ?MSG_ID_BYTES = size(MsgId), {JournalHdl, State1} = get_journal_handle( @@ -284,9 +299,9 @@ publish(MsgId, SeqId, MsgProps, IsPersistent, false -> ?PUB_TRANS_JPREFIX end):?JPREFIX_BITS, SeqId:?SEQ_BITS>>, - create_pub_record_body(MsgId, MsgProps)]), + create_pub_record_body(MsgOrId, MsgProps)]), maybe_flush_journal( - add_to_journal(SeqId, {MsgId, MsgProps, IsPersistent}, State1)). + add_to_journal(SeqId, {MsgOrId, MsgProps, IsPersistent}, State1)). deliver(SeqIds, State) -> deliver_or_ack(del, SeqIds, State). @@ -323,11 +338,12 @@ read(Start, End, State = #qistate { segments = Segments, %% Start is inclusive, End is exclusive. LowerB = {StartSeg, _StartRelSeq} = seq_id_to_seg_and_rel_seq_id(Start), UpperB = {EndSeg, _EndRelSeq} = seq_id_to_seg_and_rel_seq_id(End - 1), - {Messages, Segments1} = + {Messages, {BodiesRead, BodyBytesRead}, Segments1} = lists:foldr(fun (Seg, Acc) -> read_bounded_segment(Seg, LowerB, UpperB, Acc, Dir) - end, {[], Segments}, lists:seq(StartSeg, EndSeg)), - {Messages, State #qistate { segments = Segments1 }}. + end, {[], {0, 0}, Segments}, lists:seq(StartSeg, EndSeg)), + {Messages, BodiesRead, BodyBytesRead, + State #qistate { segments = Segments1 }}. next_segment_boundary(SeqId) -> {Seg, _RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), @@ -541,7 +557,8 @@ queue_index_walker({next, Gatherer}) when is_pid(Gatherer) -> queue_index_walker_reader(QueueName, Gatherer) -> State = blank_state(QueueName), ok = scan_segments( - fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, no_ack, ok) -> + fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, no_ack, ok) + when is_binary(MsgId) -> gatherer:sync_in(Gatherer, {MsgId, 1}); (_SeqId, _MsgId, _MsgProps, _IsPersistent, _IsDelivered, _IsAcked, Acc) -> @@ -555,9 +572,9 @@ scan_segments(Fun, Acc, State) -> Result = lists:foldr( fun (Seg, AccN) -> segment_entries_foldr( - fun (RelSeq, {{MsgId, MsgProps, IsPersistent}, + fun (RelSeq, {{MsgOrId, MsgProps, IsPersistent}, IsDelivered, IsAcked}, AccM) -> - Fun(reconstruct_seq_id(Seg, RelSeq), MsgId, MsgProps, + Fun(reconstruct_seq_id(Seg, RelSeq), MsgOrId, MsgProps, IsPersistent, IsDelivered, IsAcked, AccM) end, AccN, segment_find_or_new(Seg, Dir, Segments)) end, Acc, all_segment_nums(State1)), @@ -568,24 +585,42 @@ scan_segments(Fun, Acc, State) -> %% expiry/binary manipulation %%---------------------------------------------------------------------------- -create_pub_record_body(MsgId, #message_properties { expiry = Expiry, - size = Size }) -> - [MsgId, expiry_to_binary(Expiry), <<Size:?SIZE_BITS>>]. +create_pub_record_body(MsgOrId, #message_properties { expiry = Expiry, + size = Size }) -> + case MsgOrId of + MsgId when is_binary(MsgId) -> + [MsgId, expiry_to_binary(Expiry), <<Size:?SIZE_BITS>>, + <<0:?MSG_IN_INDEX_SIZE_BITS>>]; + #basic_message{id = MsgId} -> + MsgBin = term_to_binary(MsgOrId), + MsgBinSize = size(MsgBin), + [MsgId, expiry_to_binary(Expiry), <<Size:?SIZE_BITS>>, + <<MsgBinSize:?MSG_IN_INDEX_SIZE_BITS>>, MsgBin] + end. expiry_to_binary(undefined) -> <<?NO_EXPIRY:?EXPIRY_BITS>>; expiry_to_binary(Expiry) -> <<Expiry:?EXPIRY_BITS>>. -parse_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, - Size:?SIZE_BITS>>) -> +read_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, + Size:?SIZE_BITS, IndexSize:?MSG_IN_INDEX_SIZE_BITS>>, + Hdl) -> %% work around for binary data fragmentation. See %% rabbit_msg_file:read_next/2 <<MsgId:?MSG_ID_BYTES/binary>> = <<MsgIdNum:?MSG_ID_BITS>>, - Exp = case Expiry of - ?NO_EXPIRY -> undefined; - X -> X - end, - {MsgId, #message_properties { expiry = Exp, - size = Size }}. + Props = #message_properties{expiry = case Expiry of + ?NO_EXPIRY -> undefined; + X -> X + end, + size = Size}, + case IndexSize of + 0 -> {MsgId, Props}; + _ -> case file_handle_cache:read(Hdl, IndexSize) of + {ok, MsgBin} -> Msg = #basic_message{id = MsgId} = + binary_to_term(MsgBin), + {Msg, Props}; + _ -> exit(could_not_read) + end + end. %%---------------------------------------------------------------------------- %% journal manipulation @@ -719,14 +754,14 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) -> {ok, <<0:?PUB_RECORD_BODY_BYTES/unit:8>>} -> State; {ok, <<Bin:?PUB_RECORD_BODY_BYTES/binary>>} -> - {MsgId, MsgProps} = parse_pub_record_body(Bin), + {MsgOrId, Props} = read_pub_record_body(Bin, Hdl), IsPersistent = case Prefix of ?PUB_PERSIST_JPREFIX -> true; ?PUB_TRANS_JPREFIX -> false end, load_journal_entries( add_to_journal( - SeqId, {MsgId, MsgProps, IsPersistent}, State)); + SeqId, {MsgOrId, Props, IsPersistent}, State)); _ErrOrEoF -> %% err, we've lost at least a publish State end @@ -829,12 +864,22 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) -> ok = case Pub of no_pub -> ok; - {MsgId, MsgProps, IsPersistent} -> + {MsgOrId, MsgProps, IsPersistent} -> + %% Body = create_pub_record_body(MsgOrId, MsgProps), + %% io:format("pub ~p~n", + %% [[{persist, IsPersistent}, + %% {relseq, RelSeq}, + %% {body, Body}]]), + %% io:format("write ~p~n", + %% [iolist_to_binary([<<?PUB_PREFIX:?PUB_PREFIX_BITS, + %% (bool_to_int(IsPersistent)):1, + %% RelSeq:?REL_SEQ_BITS>>, + %% Body])]), file_handle_cache:append( Hdl, [<<?PUB_PREFIX:?PUB_PREFIX_BITS, (bool_to_int(IsPersistent)):1, RelSeq:?REL_SEQ_BITS>>, - create_pub_record_body(MsgId, MsgProps)]) + create_pub_record_body(MsgOrId, MsgProps)]) end, ok = case {Del, Ack} of {no_del, no_ack} -> @@ -851,18 +896,21 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) -> Hdl. read_bounded_segment(Seg, {StartSeg, StartRelSeq}, {EndSeg, EndRelSeq}, - {Messages, Segments}, Dir) -> + {Messages, BodyReadCounts, Segments}, Dir) -> Segment = segment_find_or_new(Seg, Dir, Segments), - {segment_entries_foldr( - fun (RelSeq, {{MsgId, MsgProps, IsPersistent}, IsDelivered, no_ack}, Acc) - when (Seg > StartSeg orelse StartRelSeq =< RelSeq) andalso - (Seg < EndSeg orelse EndRelSeq >= RelSeq) -> - [ {MsgId, reconstruct_seq_id(StartSeg, RelSeq), MsgProps, - IsPersistent, IsDelivered == del} | Acc ]; - (_RelSeq, _Value, Acc) -> - Acc - end, Messages, Segment), - segment_store(Segment, Segments)}. + {Messages1, BodyReadCounts1} = + segment_entries_foldr( + fun (RelSeq, {{MsgOrId, MsgProps, IsPersistent}, IsDelivered, no_ack}, + {Acc, BodyReadAcc}) + when (Seg > StartSeg orelse StartRelSeq =< RelSeq) andalso + (Seg < EndSeg orelse EndRelSeq >= RelSeq) -> + {[{MsgOrId, reconstruct_seq_id(StartSeg, RelSeq), MsgProps, + IsPersistent, IsDelivered == del} | Acc], + incr_body_read_counts(MsgOrId, MsgProps, BodyReadAcc)}; + (_RelSeq, _Value, Acc) -> + Acc + end, {Messages, BodyReadCounts}, Segment), + {Messages1, BodyReadCounts1, segment_store(Segment, Segments)}. segment_entries_foldr(Fun, Init, Segment = #segment { journal_entries = JEntries }) -> @@ -870,6 +918,12 @@ segment_entries_foldr(Fun, Init, {SegEntries1, _UnackedCountD} = segment_plus_journal(SegEntries, JEntries), array:sparse_foldr(Fun, Init, SegEntries1). +incr_body_read_counts(MsgId, _MsgProps, Counts) when is_binary(MsgId) -> + Counts; +incr_body_read_counts(#basic_message{}, #message_properties{size = Size}, + {BodiesRead, BodyBytesRead}) -> + {BodiesRead + 1, BodyBytesRead + Size}. + %% Loading segments %% %% Does not do any combining with the journal at all. @@ -877,44 +931,50 @@ load_segment(KeepAcked, #segment { path = Path }) -> Empty = {array_new(), 0}, case rabbit_file:is_file(Path) of false -> Empty; - true -> {ok, Hdl} = file_handle_cache:open(Path, ?READ_AHEAD_MODE, []), + true -> {ok, Hdl} = file_handle_cache:open( + Path, ?READ_AHEAD_MODE, + [{read_buffer, ?READ_BUFFER_SIZE}]), {ok, 0} = file_handle_cache:position(Hdl, bof), - Res = case file_handle_cache:read(Hdl, ?SEGMENT_TOTAL_SIZE) of - {ok, SegData} -> load_segment_entries( - KeepAcked, SegData, Empty); - eof -> Empty - end, + Res = load_segment_entries(Hdl, KeepAcked, Empty), ok = file_handle_cache:close(Hdl), Res end. -load_segment_entries(KeepAcked, - <<?PUB_PREFIX:?PUB_PREFIX_BITS, - IsPersistentNum:1, RelSeq:?REL_SEQ_BITS, - PubRecordBody:?PUB_RECORD_BODY_BYTES/binary, - SegData/binary>>, - {SegEntries, UnackedCount}) -> - {MsgId, MsgProps} = parse_pub_record_body(PubRecordBody), - Obj = {{MsgId, MsgProps, 1 == IsPersistentNum}, no_del, no_ack}, - SegEntries1 = array:set(RelSeq, Obj, SegEntries), - load_segment_entries(KeepAcked, SegData, {SegEntries1, UnackedCount + 1}); -load_segment_entries(KeepAcked, - <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, - RelSeq:?REL_SEQ_BITS, SegData/binary>>, - {SegEntries, UnackedCount}) -> - {UnackedCountDelta, SegEntries1} = - case array:get(RelSeq, SegEntries) of - {Pub, no_del, no_ack} -> - { 0, array:set(RelSeq, {Pub, del, no_ack}, SegEntries)}; - {Pub, del, no_ack} when KeepAcked -> - {-1, array:set(RelSeq, {Pub, del, ack}, SegEntries)}; - {_Pub, del, no_ack} -> - {-1, array:reset(RelSeq, SegEntries)} - end, - load_segment_entries(KeepAcked, SegData, - {SegEntries1, UnackedCount + UnackedCountDelta}); -load_segment_entries(_KeepAcked, _SegData, Res) -> - Res. +load_segment_entries(Hdl, KeepAcked, Acc) -> + case file_handle_cache:read(Hdl, ?PUB_RECORD_PREFIX_BYTES) of + {ok, <<?PUB_PREFIX:?PUB_PREFIX_BITS, + IsPersistNum:1, RelSeq:?REL_SEQ_BITS>>} -> + load_segment_entries( + Hdl, KeepAcked, + load_segment_publish_entry(Hdl, 1 == IsPersistNum, RelSeq, Acc)); + {ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, + RelSeq:?REL_SEQ_BITS>>} -> + load_segment_entries( + Hdl, KeepAcked, add_segment_relseq_entry(KeepAcked, RelSeq, Acc)); + eof -> %% TODO or maybe _ + Acc + end. + +load_segment_publish_entry(Hdl, IsPersistent, RelSeq, {SegEntries, Unacked}) -> + case file_handle_cache:read(Hdl, ?PUB_RECORD_BODY_BYTES) of + {ok, <<PubRecordBody:?PUB_RECORD_BODY_BYTES/binary>>} -> + {MsgOrId, MsgProps} = read_pub_record_body(PubRecordBody, Hdl), + Obj = {{MsgOrId, MsgProps, IsPersistent}, no_del, no_ack}, + SegEntries1 = array:set(RelSeq, Obj, SegEntries), + {SegEntries1, Unacked + 1}; + _ -> + {SegEntries, Unacked} + end. + +add_segment_relseq_entry(KeepAcked, RelSeq, {SegEntries, Unacked}) -> + case array:get(RelSeq, SegEntries) of + {Pub, no_del, no_ack} -> + {array:set(RelSeq, {Pub, del, no_ack}, SegEntries), Unacked}; + {Pub, del, no_ack} when KeepAcked -> + {array:set(RelSeq, {Pub, del, ack}, SegEntries), Unacked - 1}; + {_Pub, del, no_ack} -> + {array:reset(RelSeq, SegEntries), Unacked - 1} + end. array_new() -> array:new([{default, undefined}, fixed, {size, ?SEGMENT_ENTRY_COUNT}]). @@ -1124,6 +1184,8 @@ store_msg_size_segment(_) -> %%---------------------------------------------------------------------------- +%% TODO here? + foreach_queue_index(Funs) -> QueuesDir = queues_dir(), QueueDirNames = all_queue_directory_names(QueuesDir), @@ -1157,7 +1219,8 @@ transform_file(Path, Fun) when is_function(Fun)-> [{write_buffer, infinity}]), {ok, PathHdl} = file_handle_cache:open( - Path, [{read_ahead, Size} | ?READ_MODE], []), + Path, [{read_ahead, Size} | ?READ_MODE], + [{read_buffer, ?READ_BUFFER_SIZE}]), {ok, Content} = file_handle_cache:read(PathHdl, Size), ok = file_handle_cache:close(PathHdl), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index d076b534..41c55652 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -362,7 +362,7 @@ out_counter :: non_neg_integer(), in_counter :: non_neg_integer(), rates :: rates(), - msgs_on_disk :: gb_sets:set(), + msgs_on_disk :: gb_sets:set(), %% TODO fix confirms! msg_indices_on_disk :: gb_sets:set(), unconfirmed :: gb_sets:set(), confirmed :: gb_sets:set(), @@ -645,7 +645,7 @@ fetch(AckRequired, State) -> %% at this point, so read it in. {Msg, State2} = read_msg(MsgStatus, State1), {AckTag, State3} = remove(AckRequired, MsgStatus, State2), - {{Msg, MsgStatus#msg_status.is_delivered, AckTag}, a(State3)} + {{Msg, MsgStatus#msg_status.is_delivered, AckTag}, State3} end. drop(AckRequired, State) -> @@ -963,9 +963,19 @@ msg_status(IsPersistent, IsDelivered, SeqId, index_on_disk = false, msg_props = MsgProps}. +beta_msg_status({Msg = #basic_message{id = MsgId}, + SeqId, MsgProps, IsPersistent, IsDelivered}) -> + MS0 = beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered), + MS0#msg_status{msg_id = MsgId, + msg = Msg}; + beta_msg_status({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered}) -> + MS0 = beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered), + MS0#msg_status{msg_id = MsgId, + msg = undefined}. + +beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered) -> #msg_status{seq_id = SeqId, - msg_id = MsgId, msg = undefined, is_persistent = IsPersistent, is_delivered = IsDelivered, @@ -973,7 +983,7 @@ beta_msg_status({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered}) -> index_on_disk = true, msg_props = MsgProps}. -trim_msg_status(MsgStatus) -> MsgStatus #msg_status { msg = undefined }. +trim_msg_status(MsgStatus) -> MsgStatus.%% TODO #msg_status { msg = undefined }. with_msg_store_state({MSCStateP, MSCStateT}, true, Fun) -> {Result, MSCStateP1} = Fun(MSCStateP), @@ -1002,21 +1012,21 @@ msg_store_write(MSCState, IsPersistent, MsgId, Msg) -> with_immutable_msg_store_state( MSCState, IsPersistent, fun (MSCState1) -> - rabbit_msg_store:write_flow(MsgId, Msg, MSCState1) + ok %% rabbit_msg_store:write_flow(MsgId, Msg, MSCState1) end). msg_store_read(MSCState, IsPersistent, MsgId) -> with_msg_store_state( MSCState, IsPersistent, fun (MSCState1) -> - rabbit_msg_store:read(MsgId, MSCState1) + exit(nah) %% rabbit_msg_store:read(MsgId, MSCState1) end). msg_store_remove(MSCState, IsPersistent, MsgIds) -> with_immutable_msg_store_state( MSCState, IsPersistent, fun (MCSState1) -> - rabbit_msg_store:remove(MsgIds, MCSState1) + ok %% rabbit_msg_store:remove(MsgIds, MCSState1) end). msg_store_close_fds(MSCState, IsPersistent) -> @@ -1038,7 +1048,7 @@ maybe_write_delivered(true, SeqId, IndexState) -> betas_from_index_entries(List, TransientThreshold, RPA, DPA, IndexState) -> {Filtered, Delivers, Acks} = lists:foldr( - fun ({_MsgId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M, + fun ({_MsgOrId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M, {Filtered1, Delivers1, Acks1} = Acc) -> case SeqId < TransientThreshold andalso not IsPersistent of true -> {Filtered1, @@ -1308,11 +1318,11 @@ maybe_write_msg_to_disk(Force, MsgStatus = #msg_status { msg = Msg, msg_id = MsgId, is_persistent = IsPersistent }, MSCState) when Force orelse IsPersistent -> - Msg1 = Msg #basic_message { - %% don't persist any recoverable decoded properties - content = rabbit_binary_parser:clear_decoded_content( - Msg #basic_message.content)}, - ok = msg_store_write(MSCState, IsPersistent, MsgId, Msg1), + %% Msg1 = Msg #basic_message { + %% %% don't persist any recoverable decoded properties + %% content = rabbit_binary_parser:clear_decoded_content( + %% Msg #basic_message.content)}, + %% ok = msg_store_write(MSCState, IsPersistent, MsgId, Msg1), MsgStatus #msg_status { msg_on_disk = true }; maybe_write_msg_to_disk(_Force, MsgStatus, _MSCState) -> MsgStatus. @@ -1322,6 +1332,7 @@ maybe_write_index_to_disk(_Force, MsgStatus = #msg_status { true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION {MsgStatus, IndexState}; maybe_write_index_to_disk(Force, MsgStatus = #msg_status { + msg = Msg, msg_id = MsgId, seq_id = SeqId, is_persistent = IsPersistent, @@ -1329,8 +1340,12 @@ maybe_write_index_to_disk(Force, MsgStatus = #msg_status { msg_props = MsgProps}, IndexState) when Force orelse IsPersistent -> true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION + Msg1 = Msg #basic_message { + %% don't persist any recoverable decoded properties + content = rabbit_binary_parser:clear_decoded_content( + Msg #basic_message.content)}, IndexState1 = rabbit_queue_index:publish( - MsgId, SeqId, MsgProps, IsPersistent, IndexState), + Msg1, SeqId, MsgProps, IsPersistent, IndexState), {MsgStatus #msg_status { index_on_disk = true }, maybe_write_delivered(IsDelivered, SeqId, IndexState1)}; maybe_write_index_to_disk(_Force, MsgStatus, IndexState) -> @@ -1575,7 +1590,8 @@ next({delta, #delta{start_seq_id = SeqId, end_seq_id = SeqIdEnd} = Delta, State}, IndexState) -> SeqIdB = rabbit_queue_index:next_segment_boundary(SeqId), SeqId1 = lists:min([SeqIdB, SeqIdEnd]), - {List, IndexState1} = rabbit_queue_index:read(SeqId, SeqId1, IndexState), + {List, _, _, IndexState1} = + rabbit_queue_index:read(SeqId, SeqId1, IndexState), next({delta, Delta#delta{start_seq_id = SeqId1}, List, State}, IndexState1); next({delta, Delta, [], State}, IndexState) -> next({delta, Delta, State}, IndexState); @@ -1744,6 +1760,8 @@ maybe_deltas_to_betas(State = #vqstate { delta = Delta, q3 = Q3, index_state = IndexState, + ram_msg_count = RamMsgCount, + ram_bytes = RamBytes, ram_pending_ack = RPA, disk_pending_ack = DPA, transient_threshold = TransientThreshold }) -> @@ -1753,11 +1771,13 @@ maybe_deltas_to_betas(State = #vqstate { DeltaSeqId1 = lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId), DeltaSeqIdEnd]), - {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, - IndexState), + {List, RamCountDelta, RamBytesDelta, IndexState1} = + rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState), {Q3a, IndexState2} = betas_from_index_entries(List, TransientThreshold, RPA, DPA, IndexState1), - State1 = State #vqstate { index_state = IndexState2 }, + State1 = State #vqstate { index_state = IndexState2, + ram_msg_count = RamMsgCount + RamCountDelta, + ram_bytes = RamBytes + RamBytesDelta }, case ?QUEUE:len(Q3a) of 0 -> %% we ignored every message in the segment due to it being |