summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-12-03 15:52:49 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-12-03 15:52:49 +0000
commitd869defa7db1d94eb3dcfeef4b46d3a706a59390 (patch)
tree83b8381df55df4d435639bc4e426865f89fad283
parentd205a4285ca02d12493ac0dd0f5256af4294723c (diff)
downloadrabbitmq-server-d869defa7db1d94eb3dcfeef4b46d3a706a59390.tar.gz
Hacked-together implementation of persisting messages in the QI. Currently does it for all messages, but in reality we'd only want to do this for small ones (and make it configurable). Confirms are probably broken, maybe some other things.
-rw-r--r--src/rabbit_queue_index.erl223
-rw-r--r--src/rabbit_variable_queue.erl56
2 files changed, 181 insertions, 98 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 0a2c88d4..90729e33 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -128,8 +128,8 @@
-define(REL_SEQ_ONLY_RECORD_BYTES, 2).
%% publish record is binary 1 followed by a bit for is_persistent,
-%% then 14 bits of rel seq id, 64 bits for message expiry and 128 bits
-%% of md5sum msg id
+%% then 14 bits of rel seq id, 64 bits for message expiry, 32 bits of
+%% size and then 128 bits of md5sum msg id.
-define(PUB_PREFIX, 1).
-define(PUB_PREFIX_BITS, 1).
@@ -140,26 +140,37 @@
-define(MSG_ID_BYTES, 16). %% md5sum is 128 bit or 16 bytes
-define(MSG_ID_BITS, (?MSG_ID_BYTES * 8)).
+%% This is the size of the message body content, for stats
-define(SIZE_BYTES, 4).
-define(SIZE_BITS, (?SIZE_BYTES * 8)).
+%% This is the size of the message record embedded in the queue
+%% index. If 0, the message can be found in the message store.
+-define(MSG_IN_INDEX_SIZE_BYTES, 4).
+-define(MSG_IN_INDEX_SIZE_BITS, (?MSG_IN_INDEX_SIZE_BYTES * 8)).
+
%% 16 bytes for md5sum + 8 for expiry + 4 for size
--define(PUB_RECORD_BODY_BYTES, (?MSG_ID_BYTES + ?EXPIRY_BYTES + ?SIZE_BYTES)).
+-define(PUB_RECORD_BODY_BYTES, (?MSG_ID_BYTES + ?EXPIRY_BYTES + ?SIZE_BYTES +
+ ?MSG_IN_INDEX_SIZE_BYTES)).
%% + 2 for seq, bits and prefix
--define(PUB_RECORD_BYTES, (?PUB_RECORD_BODY_BYTES + 2)).
+-define(PUB_RECORD_PREFIX_BYTES, 2).
+
+-define(PUB_RECORD_BYTES, (?PUB_RECORD_BODY_BYTES + ?PUB_RECORD_PREFIX_BYTES)).
-%% 1 publish, 1 deliver, 1 ack per msg
--define(SEGMENT_TOTAL_SIZE, ?SEGMENT_ENTRY_COUNT *
- (?PUB_RECORD_BYTES + (2 * ?REL_SEQ_ONLY_RECORD_BYTES))).
+%% %% 1 publish, 1 deliver, 1 ack per msg
+%% -define(SEGMENT_TOTAL_SIZE, ?SEGMENT_ENTRY_COUNT *
+%% (?PUB_RECORD_BYTES + (2 * ?REL_SEQ_ONLY_RECORD_BYTES))).
%% ---- misc ----
-define(PUB, {_, _, _}). %% {MsgId, MsgProps, IsPersistent}
-define(READ_MODE, [binary, raw, read]).
--define(READ_AHEAD_MODE, [{read_ahead, ?SEGMENT_TOTAL_SIZE} | ?READ_MODE]).
+-define(READ_AHEAD_MODE, ?READ_MODE).
-define(WRITE_MODE, [write | ?READ_MODE]).
+-define(READ_BUFFER_SIZE, 1048576). %% 1MB
+
%%----------------------------------------------------------------------------
-record(qistate, { dir, segments, journal_handle, dirty_count,
@@ -219,7 +230,8 @@
-spec(read/3 :: (seq_id(), seq_id(), qistate()) ->
{[{rabbit_types:msg_id(), seq_id(),
rabbit_types:message_properties(),
- boolean(), boolean()}], qistate()}).
+ boolean(), boolean()}],
+ non_neg_integer(), non_neg_integer(), qistate()}).
-spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()).
-spec(bounds/1 :: (qistate()) ->
{non_neg_integer(), non_neg_integer(), qistate()}).
@@ -267,9 +279,12 @@ delete_and_terminate(State) ->
ok = rabbit_file:recursive_delete([Dir]),
State1.
-publish(MsgId, SeqId, MsgProps, IsPersistent,
- State = #qistate { unconfirmed = Unconfirmed })
- when is_binary(MsgId) ->
+publish(MsgOrId, SeqId, MsgProps, IsPersistent,
+ State = #qistate { unconfirmed = Unconfirmed }) ->
+ MsgId = case MsgOrId of
+ #basic_message{id = Id} -> Id;
+ Id when is_binary(Id) -> Id
+ end,
?MSG_ID_BYTES = size(MsgId),
{JournalHdl, State1} =
get_journal_handle(
@@ -284,9 +299,9 @@ publish(MsgId, SeqId, MsgProps, IsPersistent,
false -> ?PUB_TRANS_JPREFIX
end):?JPREFIX_BITS,
SeqId:?SEQ_BITS>>,
- create_pub_record_body(MsgId, MsgProps)]),
+ create_pub_record_body(MsgOrId, MsgProps)]),
maybe_flush_journal(
- add_to_journal(SeqId, {MsgId, MsgProps, IsPersistent}, State1)).
+ add_to_journal(SeqId, {MsgOrId, MsgProps, IsPersistent}, State1)).
deliver(SeqIds, State) ->
deliver_or_ack(del, SeqIds, State).
@@ -323,11 +338,12 @@ read(Start, End, State = #qistate { segments = Segments,
%% Start is inclusive, End is exclusive.
LowerB = {StartSeg, _StartRelSeq} = seq_id_to_seg_and_rel_seq_id(Start),
UpperB = {EndSeg, _EndRelSeq} = seq_id_to_seg_and_rel_seq_id(End - 1),
- {Messages, Segments1} =
+ {Messages, {BodiesRead, BodyBytesRead}, Segments1} =
lists:foldr(fun (Seg, Acc) ->
read_bounded_segment(Seg, LowerB, UpperB, Acc, Dir)
- end, {[], Segments}, lists:seq(StartSeg, EndSeg)),
- {Messages, State #qistate { segments = Segments1 }}.
+ end, {[], {0, 0}, Segments}, lists:seq(StartSeg, EndSeg)),
+ {Messages, BodiesRead, BodyBytesRead,
+ State #qistate { segments = Segments1 }}.
next_segment_boundary(SeqId) ->
{Seg, _RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
@@ -541,7 +557,8 @@ queue_index_walker({next, Gatherer}) when is_pid(Gatherer) ->
queue_index_walker_reader(QueueName, Gatherer) ->
State = blank_state(QueueName),
ok = scan_segments(
- fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, no_ack, ok) ->
+ fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, no_ack, ok)
+ when is_binary(MsgId) ->
gatherer:sync_in(Gatherer, {MsgId, 1});
(_SeqId, _MsgId, _MsgProps, _IsPersistent, _IsDelivered,
_IsAcked, Acc) ->
@@ -555,9 +572,9 @@ scan_segments(Fun, Acc, State) ->
Result = lists:foldr(
fun (Seg, AccN) ->
segment_entries_foldr(
- fun (RelSeq, {{MsgId, MsgProps, IsPersistent},
+ fun (RelSeq, {{MsgOrId, MsgProps, IsPersistent},
IsDelivered, IsAcked}, AccM) ->
- Fun(reconstruct_seq_id(Seg, RelSeq), MsgId, MsgProps,
+ Fun(reconstruct_seq_id(Seg, RelSeq), MsgOrId, MsgProps,
IsPersistent, IsDelivered, IsAcked, AccM)
end, AccN, segment_find_or_new(Seg, Dir, Segments))
end, Acc, all_segment_nums(State1)),
@@ -568,24 +585,42 @@ scan_segments(Fun, Acc, State) ->
%% expiry/binary manipulation
%%----------------------------------------------------------------------------
-create_pub_record_body(MsgId, #message_properties { expiry = Expiry,
- size = Size }) ->
- [MsgId, expiry_to_binary(Expiry), <<Size:?SIZE_BITS>>].
+create_pub_record_body(MsgOrId, #message_properties { expiry = Expiry,
+ size = Size }) ->
+ case MsgOrId of
+ MsgId when is_binary(MsgId) ->
+ [MsgId, expiry_to_binary(Expiry), <<Size:?SIZE_BITS>>,
+ <<0:?MSG_IN_INDEX_SIZE_BITS>>];
+ #basic_message{id = MsgId} ->
+ MsgBin = term_to_binary(MsgOrId),
+ MsgBinSize = size(MsgBin),
+ [MsgId, expiry_to_binary(Expiry), <<Size:?SIZE_BITS>>,
+ <<MsgBinSize:?MSG_IN_INDEX_SIZE_BITS>>, MsgBin]
+ end.
expiry_to_binary(undefined) -> <<?NO_EXPIRY:?EXPIRY_BITS>>;
expiry_to_binary(Expiry) -> <<Expiry:?EXPIRY_BITS>>.
-parse_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS,
- Size:?SIZE_BITS>>) ->
+read_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS,
+ Size:?SIZE_BITS, IndexSize:?MSG_IN_INDEX_SIZE_BITS>>,
+ Hdl) ->
%% work around for binary data fragmentation. See
%% rabbit_msg_file:read_next/2
<<MsgId:?MSG_ID_BYTES/binary>> = <<MsgIdNum:?MSG_ID_BITS>>,
- Exp = case Expiry of
- ?NO_EXPIRY -> undefined;
- X -> X
- end,
- {MsgId, #message_properties { expiry = Exp,
- size = Size }}.
+ Props = #message_properties{expiry = case Expiry of
+ ?NO_EXPIRY -> undefined;
+ X -> X
+ end,
+ size = Size},
+ case IndexSize of
+ 0 -> {MsgId, Props};
+ _ -> case file_handle_cache:read(Hdl, IndexSize) of
+ {ok, MsgBin} -> Msg = #basic_message{id = MsgId} =
+ binary_to_term(MsgBin),
+ {Msg, Props};
+ _ -> exit(could_not_read)
+ end
+ end.
%%----------------------------------------------------------------------------
%% journal manipulation
@@ -719,14 +754,14 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) ->
{ok, <<0:?PUB_RECORD_BODY_BYTES/unit:8>>} ->
State;
{ok, <<Bin:?PUB_RECORD_BODY_BYTES/binary>>} ->
- {MsgId, MsgProps} = parse_pub_record_body(Bin),
+ {MsgOrId, Props} = read_pub_record_body(Bin, Hdl),
IsPersistent = case Prefix of
?PUB_PERSIST_JPREFIX -> true;
?PUB_TRANS_JPREFIX -> false
end,
load_journal_entries(
add_to_journal(
- SeqId, {MsgId, MsgProps, IsPersistent}, State));
+ SeqId, {MsgOrId, Props, IsPersistent}, State));
_ErrOrEoF -> %% err, we've lost at least a publish
State
end
@@ -829,12 +864,22 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) ->
ok = case Pub of
no_pub ->
ok;
- {MsgId, MsgProps, IsPersistent} ->
+ {MsgOrId, MsgProps, IsPersistent} ->
+ %% Body = create_pub_record_body(MsgOrId, MsgProps),
+ %% io:format("pub ~p~n",
+ %% [[{persist, IsPersistent},
+ %% {relseq, RelSeq},
+ %% {body, Body}]]),
+ %% io:format("write ~p~n",
+ %% [iolist_to_binary([<<?PUB_PREFIX:?PUB_PREFIX_BITS,
+ %% (bool_to_int(IsPersistent)):1,
+ %% RelSeq:?REL_SEQ_BITS>>,
+ %% Body])]),
file_handle_cache:append(
Hdl, [<<?PUB_PREFIX:?PUB_PREFIX_BITS,
(bool_to_int(IsPersistent)):1,
RelSeq:?REL_SEQ_BITS>>,
- create_pub_record_body(MsgId, MsgProps)])
+ create_pub_record_body(MsgOrId, MsgProps)])
end,
ok = case {Del, Ack} of
{no_del, no_ack} ->
@@ -851,18 +896,21 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) ->
Hdl.
read_bounded_segment(Seg, {StartSeg, StartRelSeq}, {EndSeg, EndRelSeq},
- {Messages, Segments}, Dir) ->
+ {Messages, BodyReadCounts, Segments}, Dir) ->
Segment = segment_find_or_new(Seg, Dir, Segments),
- {segment_entries_foldr(
- fun (RelSeq, {{MsgId, MsgProps, IsPersistent}, IsDelivered, no_ack}, Acc)
- when (Seg > StartSeg orelse StartRelSeq =< RelSeq) andalso
- (Seg < EndSeg orelse EndRelSeq >= RelSeq) ->
- [ {MsgId, reconstruct_seq_id(StartSeg, RelSeq), MsgProps,
- IsPersistent, IsDelivered == del} | Acc ];
- (_RelSeq, _Value, Acc) ->
- Acc
- end, Messages, Segment),
- segment_store(Segment, Segments)}.
+ {Messages1, BodyReadCounts1} =
+ segment_entries_foldr(
+ fun (RelSeq, {{MsgOrId, MsgProps, IsPersistent}, IsDelivered, no_ack},
+ {Acc, BodyReadAcc})
+ when (Seg > StartSeg orelse StartRelSeq =< RelSeq) andalso
+ (Seg < EndSeg orelse EndRelSeq >= RelSeq) ->
+ {[{MsgOrId, reconstruct_seq_id(StartSeg, RelSeq), MsgProps,
+ IsPersistent, IsDelivered == del} | Acc],
+ incr_body_read_counts(MsgOrId, MsgProps, BodyReadAcc)};
+ (_RelSeq, _Value, Acc) ->
+ Acc
+ end, {Messages, BodyReadCounts}, Segment),
+ {Messages1, BodyReadCounts1, segment_store(Segment, Segments)}.
segment_entries_foldr(Fun, Init,
Segment = #segment { journal_entries = JEntries }) ->
@@ -870,6 +918,12 @@ segment_entries_foldr(Fun, Init,
{SegEntries1, _UnackedCountD} = segment_plus_journal(SegEntries, JEntries),
array:sparse_foldr(Fun, Init, SegEntries1).
+incr_body_read_counts(MsgId, _MsgProps, Counts) when is_binary(MsgId) ->
+ Counts;
+incr_body_read_counts(#basic_message{}, #message_properties{size = Size},
+ {BodiesRead, BodyBytesRead}) ->
+ {BodiesRead + 1, BodyBytesRead + Size}.
+
%% Loading segments
%%
%% Does not do any combining with the journal at all.
@@ -877,44 +931,50 @@ load_segment(KeepAcked, #segment { path = Path }) ->
Empty = {array_new(), 0},
case rabbit_file:is_file(Path) of
false -> Empty;
- true -> {ok, Hdl} = file_handle_cache:open(Path, ?READ_AHEAD_MODE, []),
+ true -> {ok, Hdl} = file_handle_cache:open(
+ Path, ?READ_AHEAD_MODE,
+ [{read_buffer, ?READ_BUFFER_SIZE}]),
{ok, 0} = file_handle_cache:position(Hdl, bof),
- Res = case file_handle_cache:read(Hdl, ?SEGMENT_TOTAL_SIZE) of
- {ok, SegData} -> load_segment_entries(
- KeepAcked, SegData, Empty);
- eof -> Empty
- end,
+ Res = load_segment_entries(Hdl, KeepAcked, Empty),
ok = file_handle_cache:close(Hdl),
Res
end.
-load_segment_entries(KeepAcked,
- <<?PUB_PREFIX:?PUB_PREFIX_BITS,
- IsPersistentNum:1, RelSeq:?REL_SEQ_BITS,
- PubRecordBody:?PUB_RECORD_BODY_BYTES/binary,
- SegData/binary>>,
- {SegEntries, UnackedCount}) ->
- {MsgId, MsgProps} = parse_pub_record_body(PubRecordBody),
- Obj = {{MsgId, MsgProps, 1 == IsPersistentNum}, no_del, no_ack},
- SegEntries1 = array:set(RelSeq, Obj, SegEntries),
- load_segment_entries(KeepAcked, SegData, {SegEntries1, UnackedCount + 1});
-load_segment_entries(KeepAcked,
- <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
- RelSeq:?REL_SEQ_BITS, SegData/binary>>,
- {SegEntries, UnackedCount}) ->
- {UnackedCountDelta, SegEntries1} =
- case array:get(RelSeq, SegEntries) of
- {Pub, no_del, no_ack} ->
- { 0, array:set(RelSeq, {Pub, del, no_ack}, SegEntries)};
- {Pub, del, no_ack} when KeepAcked ->
- {-1, array:set(RelSeq, {Pub, del, ack}, SegEntries)};
- {_Pub, del, no_ack} ->
- {-1, array:reset(RelSeq, SegEntries)}
- end,
- load_segment_entries(KeepAcked, SegData,
- {SegEntries1, UnackedCount + UnackedCountDelta});
-load_segment_entries(_KeepAcked, _SegData, Res) ->
- Res.
+load_segment_entries(Hdl, KeepAcked, Acc) ->
+ case file_handle_cache:read(Hdl, ?PUB_RECORD_PREFIX_BYTES) of
+ {ok, <<?PUB_PREFIX:?PUB_PREFIX_BITS,
+ IsPersistNum:1, RelSeq:?REL_SEQ_BITS>>} ->
+ load_segment_entries(
+ Hdl, KeepAcked,
+ load_segment_publish_entry(Hdl, 1 == IsPersistNum, RelSeq, Acc));
+ {ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
+ RelSeq:?REL_SEQ_BITS>>} ->
+ load_segment_entries(
+ Hdl, KeepAcked, add_segment_relseq_entry(KeepAcked, RelSeq, Acc));
+ eof -> %% TODO or maybe _
+ Acc
+ end.
+
+load_segment_publish_entry(Hdl, IsPersistent, RelSeq, {SegEntries, Unacked}) ->
+ case file_handle_cache:read(Hdl, ?PUB_RECORD_BODY_BYTES) of
+ {ok, <<PubRecordBody:?PUB_RECORD_BODY_BYTES/binary>>} ->
+ {MsgOrId, MsgProps} = read_pub_record_body(PubRecordBody, Hdl),
+ Obj = {{MsgOrId, MsgProps, IsPersistent}, no_del, no_ack},
+ SegEntries1 = array:set(RelSeq, Obj, SegEntries),
+ {SegEntries1, Unacked + 1};
+ _ ->
+ {SegEntries, Unacked}
+ end.
+
+add_segment_relseq_entry(KeepAcked, RelSeq, {SegEntries, Unacked}) ->
+ case array:get(RelSeq, SegEntries) of
+ {Pub, no_del, no_ack} ->
+ {array:set(RelSeq, {Pub, del, no_ack}, SegEntries), Unacked};
+ {Pub, del, no_ack} when KeepAcked ->
+ {array:set(RelSeq, {Pub, del, ack}, SegEntries), Unacked - 1};
+ {_Pub, del, no_ack} ->
+ {array:reset(RelSeq, SegEntries), Unacked - 1}
+ end.
array_new() ->
array:new([{default, undefined}, fixed, {size, ?SEGMENT_ENTRY_COUNT}]).
@@ -1124,6 +1184,8 @@ store_msg_size_segment(_) ->
%%----------------------------------------------------------------------------
+%% TODO here?
+
foreach_queue_index(Funs) ->
QueuesDir = queues_dir(),
QueueDirNames = all_queue_directory_names(QueuesDir),
@@ -1157,7 +1219,8 @@ transform_file(Path, Fun) when is_function(Fun)->
[{write_buffer, infinity}]),
{ok, PathHdl} = file_handle_cache:open(
- Path, [{read_ahead, Size} | ?READ_MODE], []),
+ Path, [{read_ahead, Size} | ?READ_MODE],
+ [{read_buffer, ?READ_BUFFER_SIZE}]),
{ok, Content} = file_handle_cache:read(PathHdl, Size),
ok = file_handle_cache:close(PathHdl),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index d076b534..41c55652 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -362,7 +362,7 @@
out_counter :: non_neg_integer(),
in_counter :: non_neg_integer(),
rates :: rates(),
- msgs_on_disk :: gb_sets:set(),
+ msgs_on_disk :: gb_sets:set(), %% TODO fix confirms!
msg_indices_on_disk :: gb_sets:set(),
unconfirmed :: gb_sets:set(),
confirmed :: gb_sets:set(),
@@ -645,7 +645,7 @@ fetch(AckRequired, State) ->
%% at this point, so read it in.
{Msg, State2} = read_msg(MsgStatus, State1),
{AckTag, State3} = remove(AckRequired, MsgStatus, State2),
- {{Msg, MsgStatus#msg_status.is_delivered, AckTag}, a(State3)}
+ {{Msg, MsgStatus#msg_status.is_delivered, AckTag}, State3}
end.
drop(AckRequired, State) ->
@@ -963,9 +963,19 @@ msg_status(IsPersistent, IsDelivered, SeqId,
index_on_disk = false,
msg_props = MsgProps}.
+beta_msg_status({Msg = #basic_message{id = MsgId},
+ SeqId, MsgProps, IsPersistent, IsDelivered}) ->
+ MS0 = beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered),
+ MS0#msg_status{msg_id = MsgId,
+ msg = Msg};
+
beta_msg_status({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered}) ->
+ MS0 = beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered),
+ MS0#msg_status{msg_id = MsgId,
+ msg = undefined}.
+
+beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered) ->
#msg_status{seq_id = SeqId,
- msg_id = MsgId,
msg = undefined,
is_persistent = IsPersistent,
is_delivered = IsDelivered,
@@ -973,7 +983,7 @@ beta_msg_status({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered}) ->
index_on_disk = true,
msg_props = MsgProps}.
-trim_msg_status(MsgStatus) -> MsgStatus #msg_status { msg = undefined }.
+trim_msg_status(MsgStatus) -> MsgStatus.%% TODO #msg_status { msg = undefined }.
with_msg_store_state({MSCStateP, MSCStateT}, true, Fun) ->
{Result, MSCStateP1} = Fun(MSCStateP),
@@ -1002,21 +1012,21 @@ msg_store_write(MSCState, IsPersistent, MsgId, Msg) ->
with_immutable_msg_store_state(
MSCState, IsPersistent,
fun (MSCState1) ->
- rabbit_msg_store:write_flow(MsgId, Msg, MSCState1)
+ ok %% rabbit_msg_store:write_flow(MsgId, Msg, MSCState1)
end).
msg_store_read(MSCState, IsPersistent, MsgId) ->
with_msg_store_state(
MSCState, IsPersistent,
fun (MSCState1) ->
- rabbit_msg_store:read(MsgId, MSCState1)
+ exit(nah) %% rabbit_msg_store:read(MsgId, MSCState1)
end).
msg_store_remove(MSCState, IsPersistent, MsgIds) ->
with_immutable_msg_store_state(
MSCState, IsPersistent,
fun (MCSState1) ->
- rabbit_msg_store:remove(MsgIds, MCSState1)
+ ok %% rabbit_msg_store:remove(MsgIds, MCSState1)
end).
msg_store_close_fds(MSCState, IsPersistent) ->
@@ -1038,7 +1048,7 @@ maybe_write_delivered(true, SeqId, IndexState) ->
betas_from_index_entries(List, TransientThreshold, RPA, DPA, IndexState) ->
{Filtered, Delivers, Acks} =
lists:foldr(
- fun ({_MsgId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M,
+ fun ({_MsgOrId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M,
{Filtered1, Delivers1, Acks1} = Acc) ->
case SeqId < TransientThreshold andalso not IsPersistent of
true -> {Filtered1,
@@ -1308,11 +1318,11 @@ maybe_write_msg_to_disk(Force, MsgStatus = #msg_status {
msg = Msg, msg_id = MsgId,
is_persistent = IsPersistent }, MSCState)
when Force orelse IsPersistent ->
- Msg1 = Msg #basic_message {
- %% don't persist any recoverable decoded properties
- content = rabbit_binary_parser:clear_decoded_content(
- Msg #basic_message.content)},
- ok = msg_store_write(MSCState, IsPersistent, MsgId, Msg1),
+ %% Msg1 = Msg #basic_message {
+ %% %% don't persist any recoverable decoded properties
+ %% content = rabbit_binary_parser:clear_decoded_content(
+ %% Msg #basic_message.content)},
+ %% ok = msg_store_write(MSCState, IsPersistent, MsgId, Msg1),
MsgStatus #msg_status { msg_on_disk = true };
maybe_write_msg_to_disk(_Force, MsgStatus, _MSCState) ->
MsgStatus.
@@ -1322,6 +1332,7 @@ maybe_write_index_to_disk(_Force, MsgStatus = #msg_status {
true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION
{MsgStatus, IndexState};
maybe_write_index_to_disk(Force, MsgStatus = #msg_status {
+ msg = Msg,
msg_id = MsgId,
seq_id = SeqId,
is_persistent = IsPersistent,
@@ -1329,8 +1340,12 @@ maybe_write_index_to_disk(Force, MsgStatus = #msg_status {
msg_props = MsgProps}, IndexState)
when Force orelse IsPersistent ->
true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION
+ Msg1 = Msg #basic_message {
+ %% don't persist any recoverable decoded properties
+ content = rabbit_binary_parser:clear_decoded_content(
+ Msg #basic_message.content)},
IndexState1 = rabbit_queue_index:publish(
- MsgId, SeqId, MsgProps, IsPersistent, IndexState),
+ Msg1, SeqId, MsgProps, IsPersistent, IndexState),
{MsgStatus #msg_status { index_on_disk = true },
maybe_write_delivered(IsDelivered, SeqId, IndexState1)};
maybe_write_index_to_disk(_Force, MsgStatus, IndexState) ->
@@ -1575,7 +1590,8 @@ next({delta, #delta{start_seq_id = SeqId,
end_seq_id = SeqIdEnd} = Delta, State}, IndexState) ->
SeqIdB = rabbit_queue_index:next_segment_boundary(SeqId),
SeqId1 = lists:min([SeqIdB, SeqIdEnd]),
- {List, IndexState1} = rabbit_queue_index:read(SeqId, SeqId1, IndexState),
+ {List, _, _, IndexState1} =
+ rabbit_queue_index:read(SeqId, SeqId1, IndexState),
next({delta, Delta#delta{start_seq_id = SeqId1}, List, State}, IndexState1);
next({delta, Delta, [], State}, IndexState) ->
next({delta, Delta, State}, IndexState);
@@ -1744,6 +1760,8 @@ maybe_deltas_to_betas(State = #vqstate {
delta = Delta,
q3 = Q3,
index_state = IndexState,
+ ram_msg_count = RamMsgCount,
+ ram_bytes = RamBytes,
ram_pending_ack = RPA,
disk_pending_ack = DPA,
transient_threshold = TransientThreshold }) ->
@@ -1753,11 +1771,13 @@ maybe_deltas_to_betas(State = #vqstate {
DeltaSeqId1 =
lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId),
DeltaSeqIdEnd]),
- {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1,
- IndexState),
+ {List, RamCountDelta, RamBytesDelta, IndexState1} =
+ rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState),
{Q3a, IndexState2} = betas_from_index_entries(List, TransientThreshold,
RPA, DPA, IndexState1),
- State1 = State #vqstate { index_state = IndexState2 },
+ State1 = State #vqstate { index_state = IndexState2,
+ ram_msg_count = RamMsgCount + RamCountDelta,
+ ram_bytes = RamBytes + RamBytesDelta },
case ?QUEUE:len(Q3a) of
0 ->
%% we ignored every message in the segment due to it being