summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-12-11 17:17:16 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-12-11 17:17:16 +0000
commit1f20a76b85ff502efac5b40f322173e8ac55e695 (patch)
tree0e2edde2b0530f9077ed43026b864f4e39a11790
parent919d601182560e9c55e4c4e5786e150fc57df7c2 (diff)
parent9d3539e908b3266fdb7c010bb2d7f32401f36a2b (diff)
downloadrabbitmq-server-1f20a76b85ff502efac5b40f322173e8ac55e695.tar.gz
Merge in default
-rw-r--r--ebin/rabbit_app.in1
-rw-r--r--src/rabbit_queue_index.erl303
-rw-r--r--src/rabbit_variable_queue.erl368
-rw-r--r--test/src/rabbit_tests.erl40
4 files changed, 464 insertions, 248 deletions
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 9e5584a1..5ebef608 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -29,6 +29,7 @@
{heartbeat, 580},
{msg_store_file_size_limit, 16777216},
{queue_index_max_journal_entries, 65536},
+ {queue_index_embed_msgs_below, 1024},
{default_user, <<"guest">>},
{default_pass, <<"guest">>},
{default_user_tags, [administrator]},
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 0a2c88d4..75b13e02 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -16,19 +16,25 @@
-module(rabbit_queue_index).
--export([erase/1, init/2, recover/5,
+-export([erase/1, init/3, recover/6,
terminate/2, delete_and_terminate/1,
publish/5, deliver/2, ack/2, sync/1, needs_sync/1, flush/1,
read/3, next_segment_boundary/1, bounds/1, start/1, stop/0]).
--export([add_queue_ttl/0, avoid_zeroes/0, store_msg_size/0]).
+-export([add_queue_ttl/0, avoid_zeroes/0, store_msg_size/0, store_msg/0]).
-define(CLEAN_FILENAME, "clean.dot").
%%----------------------------------------------------------------------------
%% The queue index is responsible for recording the order of messages
-%% within a queue on disk.
+%% within a queue on disk. As such it contains records of messages
+%% being published, delivered and acknowledged. The publish record
+%% includes the sequence ID, message ID and a small quantity of
+%% metadata about the message; the delivery and acknowledgement
+%% records just contain the sequence ID. A publish record may also
+%% contain the complete message if provided to publish/5; this allows
+%% the message store to be avoided altogether for small messages.
%%
%% Because of the fact that the queue can decide at any point to send
%% a queue entry to disk, you can not rely on publishes appearing in
@@ -36,7 +42,7 @@
%% then delivered, then ack'd.
%%
%% In order to be able to clean up ack'd messages, we write to segment
-%% files. These files have a fixed maximum size: ?SEGMENT_ENTRY_COUNT
+%% files. These files have a fixed number of entries: ?SEGMENT_ENTRY_COUNT
%% publishes, delivers and acknowledgements. They are numbered, and so
%% it is known that the 0th segment contains messages 0 ->
%% ?SEGMENT_ENTRY_COUNT - 1, the 1st segment contains messages
@@ -128,8 +134,8 @@
-define(REL_SEQ_ONLY_RECORD_BYTES, 2).
%% publish record is binary 1 followed by a bit for is_persistent,
-%% then 14 bits of rel seq id, 64 bits for message expiry and 128 bits
-%% of md5sum msg id
+%% then 14 bits of rel seq id, 64 bits for message expiry, 32 bits of
+%% size and then 128 bits of md5sum msg id.
-define(PUB_PREFIX, 1).
-define(PUB_PREFIX_BITS, 1).
@@ -140,32 +146,40 @@
-define(MSG_ID_BYTES, 16). %% md5sum is 128 bit or 16 bytes
-define(MSG_ID_BITS, (?MSG_ID_BYTES * 8)).
+%% This is the size of the message body content, for stats
-define(SIZE_BYTES, 4).
-define(SIZE_BITS, (?SIZE_BYTES * 8)).
+%% This is the size of the message record embedded in the queue
+%% index. If 0, the message can be found in the message store.
+-define(MSG_IN_INDEX_SIZE_BYTES, 4).
+-define(MSG_IN_INDEX_SIZE_BITS, (?MSG_IN_INDEX_SIZE_BYTES * 8)).
+
%% 16 bytes for md5sum + 8 for expiry + 4 for size
--define(PUB_RECORD_BODY_BYTES, (?MSG_ID_BYTES + ?EXPIRY_BYTES + ?SIZE_BYTES)).
+-define(PUB_RECORD_BODY_BYTES, (?MSG_ID_BYTES + ?EXPIRY_BYTES + ?SIZE_BYTES +
+ ?MSG_IN_INDEX_SIZE_BYTES)).
%% + 2 for seq, bits and prefix
--define(PUB_RECORD_BYTES, (?PUB_RECORD_BODY_BYTES + 2)).
+-define(PUB_RECORD_PREFIX_BYTES, 2).
-%% 1 publish, 1 deliver, 1 ack per msg
--define(SEGMENT_TOTAL_SIZE, ?SEGMENT_ENTRY_COUNT *
- (?PUB_RECORD_BYTES + (2 * ?REL_SEQ_ONLY_RECORD_BYTES))).
+-define(PUB_RECORD_BYTES, (?PUB_RECORD_BODY_BYTES + ?PUB_RECORD_PREFIX_BYTES)).
%% ---- misc ----
-define(PUB, {_, _, _}). %% {MsgId, MsgProps, IsPersistent}
-define(READ_MODE, [binary, raw, read]).
--define(READ_AHEAD_MODE, [{read_ahead, ?SEGMENT_TOTAL_SIZE} | ?READ_MODE]).
+-define(READ_AHEAD_MODE, ?READ_MODE).
-define(WRITE_MODE, [write | ?READ_MODE]).
+-define(READ_BUFFER_SIZE, 1048576). %% 1MB
+
%%----------------------------------------------------------------------------
--record(qistate, { dir, segments, journal_handle, dirty_count,
- max_journal_entries, on_sync, unconfirmed }).
+-record(qistate, {dir, segments, journal_handle, dirty_count,
+ max_journal_entries, on_sync, on_sync_msg,
+ unconfirmed, unconfirmed_msg}).
--record(segment, { num, path, journal_entries, unacked }).
+-record(segment, {num, path, journal_entries, unacked}).
-include("rabbit.hrl").
@@ -174,6 +188,7 @@
-rabbit_upgrade({add_queue_ttl, local, []}).
-rabbit_upgrade({avoid_zeroes, local, [add_queue_ttl]}).
-rabbit_upgrade({store_msg_size, local, [avoid_zeroes]}).
+-rabbit_upgrade({store_msg, local, [store_msg_size]}).
-ifdef(use_specs).
@@ -193,7 +208,9 @@
dirty_count :: integer(),
max_journal_entries :: non_neg_integer(),
on_sync :: on_sync_fun(),
- unconfirmed :: gb_sets:set()
+ on_sync_msg :: on_sync_fun(),
+ unconfirmed :: gb_sets:set(),
+ unconfirmed_msg :: gb_sets:set()
}).
-type(contains_predicate() :: fun ((rabbit_types:msg_id()) -> boolean())).
-type(walker(A) :: fun ((A) -> 'finished' |
@@ -201,9 +218,11 @@
-type(shutdown_terms() :: [term()] | 'non_clean_shutdown').
-spec(erase/1 :: (rabbit_amqqueue:name()) -> 'ok').
--spec(init/2 :: (rabbit_amqqueue:name(), on_sync_fun()) -> qistate()).
--spec(recover/5 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(),
- contains_predicate(), on_sync_fun()) ->
+-spec(init/3 :: (rabbit_amqqueue:name(),
+ on_sync_fun(), on_sync_fun()) -> qistate()).
+-spec(recover/6 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(),
+ contains_predicate(),
+ on_sync_fun(), on_sync_fun()) ->
{'undefined' | non_neg_integer(),
'undefined' | non_neg_integer(), qistate()}).
-spec(terminate/2 :: ([any()], qistate()) -> qistate()).
@@ -241,14 +260,17 @@ erase(Name) ->
false -> ok
end.
-init(Name, OnSyncFun) ->
+init(Name, OnSyncFun, OnSyncMsgFun) ->
State = #qistate { dir = Dir } = blank_state(Name),
false = rabbit_file:is_file(Dir), %% is_file == is file or dir
- State #qistate { on_sync = OnSyncFun }.
+ State#qistate{on_sync = OnSyncFun,
+ on_sync_msg = OnSyncMsgFun}.
-recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun, OnSyncFun) ->
+recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun,
+ OnSyncFun, OnSyncMsgFun) ->
State = blank_state(Name),
- State1 = State #qistate { on_sync = OnSyncFun },
+ State1 = State #qistate{on_sync = OnSyncFun,
+ on_sync_msg = OnSyncMsgFun},
CleanShutdown = Terms /= non_clean_shutdown,
case CleanShutdown andalso MsgStoreRecovered of
true -> RecoveredCounts = proplists:get_value(segments, Terms, []),
@@ -267,16 +289,22 @@ delete_and_terminate(State) ->
ok = rabbit_file:recursive_delete([Dir]),
State1.
-publish(MsgId, SeqId, MsgProps, IsPersistent,
- State = #qistate { unconfirmed = Unconfirmed })
- when is_binary(MsgId) ->
+publish(MsgOrId, SeqId, MsgProps, IsPersistent,
+ State = #qistate{unconfirmed = UC,
+ unconfirmed_msg = UCM}) ->
+ MsgId = case MsgOrId of
+ #basic_message{id = Id} -> Id;
+ Id when is_binary(Id) -> Id
+ end,
?MSG_ID_BYTES = size(MsgId),
{JournalHdl, State1} =
get_journal_handle(
- case MsgProps#message_properties.needs_confirming of
- true -> Unconfirmed1 = gb_sets:add_element(MsgId, Unconfirmed),
- State #qistate { unconfirmed = Unconfirmed1 };
- false -> State
+ case {MsgProps#message_properties.needs_confirming, MsgOrId} of
+ {true, MsgId} -> UC1 = gb_sets:add_element(MsgId, UC),
+ State#qistate{unconfirmed = UC1};
+ {true, _} -> UCM1 = gb_sets:add_element(MsgId, UCM),
+ State#qistate{unconfirmed_msg = UCM1};
+ {false, _} -> State
end),
ok = file_handle_cache:append(
JournalHdl, [<<(case IsPersistent of
@@ -284,9 +312,9 @@ publish(MsgId, SeqId, MsgProps, IsPersistent,
false -> ?PUB_TRANS_JPREFIX
end):?JPREFIX_BITS,
SeqId:?SEQ_BITS>>,
- create_pub_record_body(MsgId, MsgProps)]),
+ create_pub_record_body(MsgOrId, MsgProps)]),
maybe_flush_journal(
- add_to_journal(SeqId, {MsgId, MsgProps, IsPersistent}, State1)).
+ add_to_journal(SeqId, {MsgOrId, MsgProps, IsPersistent}, State1)).
deliver(SeqIds, State) ->
deliver_or_ack(del, SeqIds, State).
@@ -302,10 +330,12 @@ sync(State = #qistate { journal_handle = JournalHdl }) ->
ok = file_handle_cache:sync(JournalHdl),
notify_sync(State).
-needs_sync(#qistate { journal_handle = undefined }) ->
+needs_sync(#qistate{journal_handle = undefined}) ->
false;
-needs_sync(#qistate { journal_handle = JournalHdl, unconfirmed = UC }) ->
- case gb_sets:is_empty(UC) of
+needs_sync(#qistate{journal_handle = JournalHdl,
+ unconfirmed = UC,
+ unconfirmed_msg = UCM}) ->
+ case gb_sets:is_empty(UC) andalso gb_sets:is_empty(UCM) of
true -> case file_handle_cache:needs_sync(JournalHdl) of
true -> other;
false -> false
@@ -409,7 +439,9 @@ blank_state_dir(Dir) ->
dirty_count = 0,
max_journal_entries = MaxJournal,
on_sync = fun (_) -> ok end,
- unconfirmed = gb_sets:new() }.
+ on_sync_msg = fun (_) -> ok end,
+ unconfirmed = gb_sets:new(),
+ unconfirmed_msg = gb_sets:new() }.
init_clean(RecoveredCounts, State) ->
%% Load the journal. Since this is a clean recovery this (almost)
@@ -541,7 +573,8 @@ queue_index_walker({next, Gatherer}) when is_pid(Gatherer) ->
queue_index_walker_reader(QueueName, Gatherer) ->
State = blank_state(QueueName),
ok = scan_segments(
- fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, no_ack, ok) ->
+ fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, no_ack, ok)
+ when is_binary(MsgId) ->
gatherer:sync_in(Gatherer, {MsgId, 1});
(_SeqId, _MsgId, _MsgProps, _IsPersistent, _IsDelivered,
_IsAcked, Acc) ->
@@ -555,9 +588,9 @@ scan_segments(Fun, Acc, State) ->
Result = lists:foldr(
fun (Seg, AccN) ->
segment_entries_foldr(
- fun (RelSeq, {{MsgId, MsgProps, IsPersistent},
+ fun (RelSeq, {{MsgOrId, MsgProps, IsPersistent},
IsDelivered, IsAcked}, AccM) ->
- Fun(reconstruct_seq_id(Seg, RelSeq), MsgId, MsgProps,
+ Fun(reconstruct_seq_id(Seg, RelSeq), MsgOrId, MsgProps,
IsPersistent, IsDelivered, IsAcked, AccM)
end, AccN, segment_find_or_new(Seg, Dir, Segments))
end, Acc, all_segment_nums(State1)),
@@ -568,24 +601,42 @@ scan_segments(Fun, Acc, State) ->
%% expiry/binary manipulation
%%----------------------------------------------------------------------------
-create_pub_record_body(MsgId, #message_properties { expiry = Expiry,
- size = Size }) ->
- [MsgId, expiry_to_binary(Expiry), <<Size:?SIZE_BITS>>].
+create_pub_record_body(MsgOrId, #message_properties { expiry = Expiry,
+ size = Size }) ->
+ case MsgOrId of
+ MsgId when is_binary(MsgId) ->
+ [MsgId, expiry_to_binary(Expiry), <<Size:?SIZE_BITS>>,
+ <<0:?MSG_IN_INDEX_SIZE_BITS>>];
+ #basic_message{id = MsgId} ->
+ MsgBin = term_to_binary(MsgOrId),
+ MsgBinSize = size(MsgBin),
+ [MsgId, expiry_to_binary(Expiry), <<Size:?SIZE_BITS>>,
+ <<MsgBinSize:?MSG_IN_INDEX_SIZE_BITS>>, MsgBin]
+ end.
expiry_to_binary(undefined) -> <<?NO_EXPIRY:?EXPIRY_BITS>>;
expiry_to_binary(Expiry) -> <<Expiry:?EXPIRY_BITS>>.
-parse_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS,
- Size:?SIZE_BITS>>) ->
+read_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS,
+ Size:?SIZE_BITS, IndexSize:?MSG_IN_INDEX_SIZE_BITS>>,
+ Hdl) ->
%% work around for binary data fragmentation. See
%% rabbit_msg_file:read_next/2
<<MsgId:?MSG_ID_BYTES/binary>> = <<MsgIdNum:?MSG_ID_BITS>>,
- Exp = case Expiry of
- ?NO_EXPIRY -> undefined;
- X -> X
- end,
- {MsgId, #message_properties { expiry = Exp,
- size = Size }}.
+ Props = #message_properties{expiry = case Expiry of
+ ?NO_EXPIRY -> undefined;
+ X -> X
+ end,
+ size = Size},
+ case IndexSize of
+ 0 -> {MsgId, Props};
+ _ -> case file_handle_cache:read(Hdl, IndexSize) of
+ {ok, MsgBin} -> Msg = #basic_message{id = MsgId} =
+ binary_to_term(MsgBin),
+ {Msg, Props};
+ _ -> exit(could_not_read) %% TODO
+ end
+ end.
%%----------------------------------------------------------------------------
%% journal manipulation
@@ -719,14 +770,14 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) ->
{ok, <<0:?PUB_RECORD_BODY_BYTES/unit:8>>} ->
State;
{ok, <<Bin:?PUB_RECORD_BODY_BYTES/binary>>} ->
- {MsgId, MsgProps} = parse_pub_record_body(Bin),
+ {MsgOrId, Props} = read_pub_record_body(Bin, Hdl),
IsPersistent = case Prefix of
?PUB_PERSIST_JPREFIX -> true;
?PUB_TRANS_JPREFIX -> false
end,
load_journal_entries(
add_to_journal(
- SeqId, {MsgId, MsgProps, IsPersistent}, State));
+ SeqId, {MsgOrId, Props, IsPersistent}, State));
_ErrOrEoF -> %% err, we've lost at least a publish
State
end
@@ -746,11 +797,19 @@ deliver_or_ack(Kind, SeqIds, State) ->
add_to_journal(SeqId, Kind, StateN)
end, State1, SeqIds)).
-notify_sync(State = #qistate { unconfirmed = UC, on_sync = OnSyncFun }) ->
- case gb_sets:is_empty(UC) of
- true -> State;
- false -> OnSyncFun(UC),
- State #qistate { unconfirmed = gb_sets:new() }
+notify_sync(State = #qistate{unconfirmed = UC,
+ unconfirmed_msg = UCM,
+ on_sync = OnSyncFun,
+ on_sync_msg = OnSyncMsgFun}) ->
+ State1 = case gb_sets:is_empty(UC) of
+ true -> State;
+ false -> OnSyncFun(UC),
+ State#qistate{unconfirmed = gb_sets:new()}
+ end,
+ case gb_sets:is_empty(UCM) of
+ true -> State1;
+ false -> OnSyncMsgFun(UCM),
+ State1#qistate{unconfirmed_msg = gb_sets:new()}
end.
%%----------------------------------------------------------------------------
@@ -829,12 +888,22 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) ->
ok = case Pub of
no_pub ->
ok;
- {MsgId, MsgProps, IsPersistent} ->
+ {MsgOrId, MsgProps, IsPersistent} ->
+ %% Body = create_pub_record_body(MsgOrId, MsgProps),
+ %% io:format("pub ~p~n",
+ %% [[{persist, IsPersistent},
+ %% {relseq, RelSeq},
+ %% {body, Body}]]),
+ %% io:format("write ~p~n",
+ %% [iolist_to_binary([<<?PUB_PREFIX:?PUB_PREFIX_BITS,
+ %% (bool_to_int(IsPersistent)):1,
+ %% RelSeq:?REL_SEQ_BITS>>,
+ %% Body])]),
file_handle_cache:append(
Hdl, [<<?PUB_PREFIX:?PUB_PREFIX_BITS,
(bool_to_int(IsPersistent)):1,
RelSeq:?REL_SEQ_BITS>>,
- create_pub_record_body(MsgId, MsgProps)])
+ create_pub_record_body(MsgOrId, MsgProps)])
end,
ok = case {Del, Ack} of
{no_del, no_ack} ->
@@ -854,11 +923,12 @@ read_bounded_segment(Seg, {StartSeg, StartRelSeq}, {EndSeg, EndRelSeq},
{Messages, Segments}, Dir) ->
Segment = segment_find_or_new(Seg, Dir, Segments),
{segment_entries_foldr(
- fun (RelSeq, {{MsgId, MsgProps, IsPersistent}, IsDelivered, no_ack}, Acc)
+ fun (RelSeq, {{MsgOrId, MsgProps, IsPersistent}, IsDelivered, no_ack},
+ Acc)
when (Seg > StartSeg orelse StartRelSeq =< RelSeq) andalso
(Seg < EndSeg orelse EndRelSeq >= RelSeq) ->
- [ {MsgId, reconstruct_seq_id(StartSeg, RelSeq), MsgProps,
- IsPersistent, IsDelivered == del} | Acc ];
+ [{MsgOrId, reconstruct_seq_id(StartSeg, RelSeq), MsgProps,
+ IsPersistent, IsDelivered == del} | Acc];
(_RelSeq, _Value, Acc) ->
Acc
end, Messages, Segment),
@@ -877,44 +947,50 @@ load_segment(KeepAcked, #segment { path = Path }) ->
Empty = {array_new(), 0},
case rabbit_file:is_file(Path) of
false -> Empty;
- true -> {ok, Hdl} = file_handle_cache:open(Path, ?READ_AHEAD_MODE, []),
+ true -> {ok, Hdl} = file_handle_cache:open(
+ Path, ?READ_AHEAD_MODE,
+ [{read_buffer, ?READ_BUFFER_SIZE}]),
{ok, 0} = file_handle_cache:position(Hdl, bof),
- Res = case file_handle_cache:read(Hdl, ?SEGMENT_TOTAL_SIZE) of
- {ok, SegData} -> load_segment_entries(
- KeepAcked, SegData, Empty);
- eof -> Empty
- end,
+ Res = load_segment_entries(Hdl, KeepAcked, Empty),
ok = file_handle_cache:close(Hdl),
Res
end.
-load_segment_entries(KeepAcked,
- <<?PUB_PREFIX:?PUB_PREFIX_BITS,
- IsPersistentNum:1, RelSeq:?REL_SEQ_BITS,
- PubRecordBody:?PUB_RECORD_BODY_BYTES/binary,
- SegData/binary>>,
- {SegEntries, UnackedCount}) ->
- {MsgId, MsgProps} = parse_pub_record_body(PubRecordBody),
- Obj = {{MsgId, MsgProps, 1 == IsPersistentNum}, no_del, no_ack},
- SegEntries1 = array:set(RelSeq, Obj, SegEntries),
- load_segment_entries(KeepAcked, SegData, {SegEntries1, UnackedCount + 1});
-load_segment_entries(KeepAcked,
- <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
- RelSeq:?REL_SEQ_BITS, SegData/binary>>,
- {SegEntries, UnackedCount}) ->
- {UnackedCountDelta, SegEntries1} =
- case array:get(RelSeq, SegEntries) of
- {Pub, no_del, no_ack} ->
- { 0, array:set(RelSeq, {Pub, del, no_ack}, SegEntries)};
- {Pub, del, no_ack} when KeepAcked ->
- {-1, array:set(RelSeq, {Pub, del, ack}, SegEntries)};
- {_Pub, del, no_ack} ->
- {-1, array:reset(RelSeq, SegEntries)}
- end,
- load_segment_entries(KeepAcked, SegData,
- {SegEntries1, UnackedCount + UnackedCountDelta});
-load_segment_entries(_KeepAcked, _SegData, Res) ->
- Res.
+load_segment_entries(Hdl, KeepAcked, Acc) ->
+ case file_handle_cache:read(Hdl, ?PUB_RECORD_PREFIX_BYTES) of
+ {ok, <<?PUB_PREFIX:?PUB_PREFIX_BITS,
+ IsPersistNum:1, RelSeq:?REL_SEQ_BITS>>} ->
+ load_segment_entries(
+ Hdl, KeepAcked,
+ load_segment_publish_entry(Hdl, 1 == IsPersistNum, RelSeq, Acc));
+ {ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
+ RelSeq:?REL_SEQ_BITS>>} ->
+ load_segment_entries(
+ Hdl, KeepAcked, add_segment_relseq_entry(KeepAcked, RelSeq, Acc));
+ eof -> %% TODO or maybe _
+ Acc
+ end.
+
+load_segment_publish_entry(Hdl, IsPersistent, RelSeq, {SegEntries, Unacked}) ->
+ case file_handle_cache:read(Hdl, ?PUB_RECORD_BODY_BYTES) of
+ {ok, <<PubRecordBody:?PUB_RECORD_BODY_BYTES/binary>>} ->
+ {MsgOrId, MsgProps} = read_pub_record_body(PubRecordBody, Hdl),
+ Obj = {{MsgOrId, MsgProps, IsPersistent}, no_del, no_ack},
+ SegEntries1 = array:set(RelSeq, Obj, SegEntries),
+ {SegEntries1, Unacked + 1};
+ _ ->
+ {SegEntries, Unacked}
+ end.
+
+add_segment_relseq_entry(KeepAcked, RelSeq, {SegEntries, Unacked}) ->
+ case array:get(RelSeq, SegEntries) of
+ {Pub, no_del, no_ack} ->
+ {array:set(RelSeq, {Pub, del, no_ack}, SegEntries), Unacked};
+ {Pub, del, no_ack} when KeepAcked ->
+ {array:set(RelSeq, {Pub, del, ack}, SegEntries), Unacked - 1};
+ {_Pub, del, no_ack} ->
+ {array:reset(RelSeq, SegEntries), Unacked - 1}
+ end.
array_new() ->
array:new([{default, undefined}, fixed, {size, ?SEGMENT_ENTRY_COUNT}]).
@@ -1121,6 +1197,40 @@ store_msg_size_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
store_msg_size_segment(_) ->
stop.
+store_msg() ->
+ foreach_queue_index({fun store_msg_journal/1,
+ fun store_msg_segment/1}).
+
+store_msg_journal(<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ Rest/binary>>) ->
+ {<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest};
+store_msg_journal(<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ Rest/binary>>) ->
+ {<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest};
+store_msg_journal(<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, Size:?SIZE_BITS,
+ Rest/binary>>) ->
+ {<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS, MsgId:?MSG_ID_BITS,
+ Expiry:?EXPIRY_BITS, Size:?SIZE_BITS,
+ 0:?MSG_IN_INDEX_SIZE_BITS>>, Rest};
+store_msg_journal(_) ->
+ stop.
+
+store_msg_segment(<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1,
+ RelSeq:?REL_SEQ_BITS, MsgId:?MSG_ID_BITS,
+ Expiry:?EXPIRY_BITS, Size:?SIZE_BITS, Rest/binary>>) ->
+ {<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS,
+ MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, Size:?SIZE_BITS,
+ 0:?MSG_IN_INDEX_SIZE_BITS>>, Rest};
+store_msg_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
+ RelSeq:?REL_SEQ_BITS, Rest/binary>>) ->
+ {<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>,
+ Rest};
+store_msg_segment(_) ->
+ stop.
+
+
+
%%----------------------------------------------------------------------------
@@ -1157,7 +1267,8 @@ transform_file(Path, Fun) when is_function(Fun)->
[{write_buffer, infinity}]),
{ok, PathHdl} = file_handle_cache:open(
- Path, [{read_ahead, Size} | ?READ_MODE], []),
+ Path, [{read_ahead, Size} | ?READ_MODE],
+ [{read_buffer, ?READ_BUFFER_SIZE}]),
{ok, Content} = file_handle_cache:read(PathHdl, Size),
ok = file_handle_cache:close(PathHdl),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 1da3de26..9711673c 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -28,7 +28,7 @@
-export([start/1, stop/0]).
%% exported for testing only
--export([start_msg_store/2, stop_msg_store/0, init/5]).
+-export([start_msg_store/2, stop_msg_store/0, init/6]).
%%----------------------------------------------------------------------------
%% Definitions:
@@ -248,8 +248,9 @@
q3,
q4,
next_seq_id,
- ram_pending_ack,
- disk_pending_ack,
+ ram_pending_ack, %% msgs using store, still in RAM
+ disk_pending_ack, %% msgs in store, paged out
+ qi_pending_ack, %% msgs using qi, *can't* be paged out
index_state,
msg_store_clients,
durable,
@@ -285,7 +286,7 @@
msg,
is_persistent,
is_delivered,
- msg_on_disk,
+ msg_in_store,
index_on_disk,
msg_props
}).
@@ -341,6 +342,7 @@
next_seq_id :: seq_id(),
ram_pending_ack :: gb_trees:tree(),
disk_pending_ack :: gb_trees:tree(),
+ qi_pending_ack :: gb_trees:tree(),
index_state :: any(),
msg_store_clients :: 'undefined' | {{any(), binary()},
{any(), binary()}},
@@ -426,16 +428,19 @@ stop_msg_store() ->
ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE),
ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE).
-init(Queue, Recover, AsyncCallback) ->
- init(Queue, Recover, AsyncCallback,
- fun (MsgIds, ActionTaken) ->
- msgs_written_to_disk(AsyncCallback, MsgIds, ActionTaken)
- end,
- fun (MsgIds) -> msg_indices_written_to_disk(AsyncCallback, MsgIds) end).
+init(Queue, Recover, Callback) ->
+ init(
+ Queue, Recover, Callback,
+ fun (MsgIds, ActionTaken) ->
+ msgs_written_to_disk(Callback, MsgIds, ActionTaken)
+ end,
+ fun (MsgIds) -> msg_indices_written_to_disk(Callback, MsgIds) end,
+ fun (MsgIds) -> msgs_and_indices_written_to_disk(Callback, MsgIds) end).
init(#amqqueue { name = QueueName, durable = IsDurable }, new,
- AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) ->
- IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun),
+ AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) ->
+ IndexState = rabbit_queue_index:init(QueueName,
+ MsgIdxOnDiskFun, MsgAndIdxOnDiskFun),
init(IsDurable, IndexState, 0, 0, [],
case IsDurable of
true -> msg_store_client_init(?PERSISTENT_MSG_STORE,
@@ -446,13 +451,17 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, new,
%% We can be recovering a transient queue if it crashed
init(#amqqueue { name = QueueName, durable = IsDurable }, Terms,
- AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) ->
+ AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) ->
{PRef, RecoveryTerms} = process_recovery_terms(Terms),
{PersistentClient, ContainsCheckFun} =
case IsDurable of
true -> C = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef,
MsgOnDiskFun, AsyncCallback),
- {C, fun (MId) -> rabbit_msg_store:contains(MId, C) end};
+ {C, fun (MsgId) when is_binary(MsgId) ->
+ rabbit_msg_store:contains(MsgId, C);
+ (#basic_message{is_persistent = Persistent}) ->
+ Persistent
+ end};
false -> {undefined, fun(_MsgId) -> false end}
end,
TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE,
@@ -461,7 +470,7 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, Terms,
rabbit_queue_index:recover(
QueueName, RecoveryTerms,
rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE),
- ContainsCheckFun, MsgIdxOnDiskFun),
+ ContainsCheckFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun),
init(IsDurable, IndexState, DeltaCount, DeltaBytes, RecoveryTerms,
PersistentClient, TransientClient).
@@ -517,26 +526,25 @@ delete_crashed(#amqqueue{name = QName}) ->
purge(State = #vqstate { q4 = Q4,
index_state = IndexState,
msg_store_clients = MSCState,
- len = Len,
- ram_bytes = RamBytes,
- persistent_count = PCount,
- persistent_bytes = PBytes }) ->
+ len = Len }) ->
%% TODO: when there are no pending acks, which is a common case,
%% we could simply wipe the qi instead of issuing delivers and
%% acks for all the messages.
- Stats = {RamBytes, PCount, PBytes},
+ Stats = {0, 0, 0},
{Stats1, IndexState1} =
remove_queue_entries(Q4, Stats, IndexState, MSCState),
{Stats2, State1 = #vqstate { q1 = Q1,
index_state = IndexState2,
- msg_store_clients = MSCState1 }} =
-
+ msg_store_clients = MSCState1,
+ ram_bytes = RamBytes,
+ persistent_count = PCount,
+ persistent_bytes = PBytes }} =
purge_betas_and_deltas(
Stats1, State #vqstate { q4 = ?QUEUE:new(),
index_state = IndexState1 }),
- {{RamBytes3, PCount3, PBytes3}, IndexState3} =
+ {{RamBytesDec, PCountDec, PBytesDec}, IndexState3} =
remove_queue_entries(Q1, Stats2, IndexState2, MSCState1),
{Len, a(State1 #vqstate { q1 = ?QUEUE:new(),
@@ -544,9 +552,9 @@ purge(State = #vqstate { q4 = Q4,
len = 0,
bytes = 0,
ram_msg_count = 0,
- ram_bytes = RamBytes3,
- persistent_count = PCount3,
- persistent_bytes = PBytes3 })}.
+ ram_bytes = RamBytes - RamBytesDec,
+ persistent_count = PCount - PCountDec,
+ persistent_bytes = PBytes - PBytesDec })}.
purge_acks(State) -> a(purge_pending_ack(false, State)).
@@ -664,7 +672,7 @@ ack([], State) ->
ack([SeqId], State) ->
{#msg_status { msg_id = MsgId,
is_persistent = IsPersistent,
- msg_on_disk = MsgOnDisk,
+ msg_in_store = MsgInStore,
index_on_disk = IndexOnDisk },
State1 = #vqstate { index_state = IndexState,
msg_store_clients = MSCState,
@@ -674,7 +682,7 @@ ack([SeqId], State) ->
true -> rabbit_queue_index:ack([SeqId], IndexState);
false -> IndexState
end,
- case MsgOnDisk of
+ case MsgInStore of
true -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId]);
false -> ok
end,
@@ -733,15 +741,18 @@ fold(Fun, Acc, State = #vqstate{index_state = IndexState}) ->
{Its, IndexState1} = lists:foldl(fun inext/2, {[], IndexState},
[msg_iterator(State),
disk_ack_iterator(State),
- ram_ack_iterator(State)]),
+ ram_ack_iterator(State),
+ qi_ack_iterator(State)]),
ifold(Fun, Acc, Its, State#vqstate{index_state = IndexState1}).
len(#vqstate { len = Len }) -> Len.
is_empty(State) -> 0 == len(State).
-depth(State = #vqstate { ram_pending_ack = RPA, disk_pending_ack = DPA }) ->
- len(State) + gb_trees:size(RPA) + gb_trees:size(DPA).
+depth(State = #vqstate { ram_pending_ack = RPA,
+ disk_pending_ack = DPA,
+ qi_pending_ack = QPA }) ->
+ len(State) + gb_trees:size(RPA) + gb_trees:size(DPA) + gb_trees:size(QPA).
set_ram_duration_target(
DurationTarget, State = #vqstate {
@@ -807,10 +818,11 @@ ram_duration(State) ->
ram_msg_count = RamMsgCount,
ram_msg_count_prev = RamMsgCountPrev,
ram_pending_ack = RPA,
+ qi_pending_ack = QPA,
ram_ack_count_prev = RamAckCountPrev } =
update_rates(State),
- RamAckCount = gb_trees:size(RPA),
+ RamAckCount = gb_trees:size(RPA) + gb_trees:size(QPA),
Duration = %% msgs+acks / (msgs+acks/sec) == sec
case lists:all(fun (X) -> X < 0.01 end,
@@ -846,8 +858,9 @@ msg_rates(#vqstate { rates = #rates { in = AvgIngressRate,
info(messages_ready_ram, #vqstate{ram_msg_count = RamMsgCount}) ->
RamMsgCount;
-info(messages_unacknowledged_ram, #vqstate{ram_pending_ack = RPA}) ->
- gb_trees:size(RPA);
+info(messages_unacknowledged_ram, #vqstate{ram_pending_ack = RPA,
+ qi_pending_ack = QPA}) ->
+ gb_trees:size(RPA) + gb_trees:size(QPA);
info(messages_ram, State) ->
info(messages_ready_ram, State) + info(messages_unacknowledged_ram, State);
info(messages_persistent, #vqstate{persistent_count = PersistentCount}) ->
@@ -935,12 +948,10 @@ d(Delta = #delta { start_seq_id = Start, count = Count, end_seq_id = End })
m(MsgStatus = #msg_status { msg = Msg,
is_persistent = IsPersistent,
- msg_on_disk = MsgOnDisk,
+ msg_in_store = MsgInStore,
index_on_disk = IndexOnDisk }) ->
true = (not IsPersistent) or IndexOnDisk,
- true = (not IndexOnDisk) or MsgOnDisk,
- true = (Msg =/= undefined) or MsgOnDisk,
-
+ true = (Msg =/= undefined) or MsgInStore,
MsgStatus.
one_if(true ) -> 1;
@@ -959,21 +970,36 @@ msg_status(IsPersistent, IsDelivered, SeqId,
msg = Msg,
is_persistent = IsPersistent,
is_delivered = IsDelivered,
- msg_on_disk = false,
+ msg_in_store = false,
index_on_disk = false,
msg_props = MsgProps}.
+beta_msg_status({Msg = #basic_message{id = MsgId},
+ SeqId, MsgProps, IsPersistent, IsDelivered}) ->
+ MS0 = beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered),
+ MS0#msg_status{msg_id = MsgId,
+ msg = Msg,
+ msg_in_store = false};
+
beta_msg_status({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered}) ->
+ MS0 = beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered),
+ MS0#msg_status{msg_id = MsgId,
+ msg = undefined,
+ msg_in_store = true}.
+
+beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered) ->
#msg_status{seq_id = SeqId,
- msg_id = MsgId,
msg = undefined,
is_persistent = IsPersistent,
is_delivered = IsDelivered,
- msg_on_disk = true,
index_on_disk = true,
msg_props = MsgProps}.
-trim_msg_status(MsgStatus) -> MsgStatus #msg_status { msg = undefined }.
+trim_msg_status(MsgStatus = #msg_status{msg_props = MsgProps}) ->
+ case persist_to(MsgProps) of
+ msg_store -> MsgStatus#msg_status{msg = undefined};
+ queue_index -> MsgStatus
+ end.
with_msg_store_state({MSCStateP, MSCStateT}, true, Fun) ->
{Result, MSCStateP1} = Fun(MSCStateP),
@@ -1035,26 +1061,36 @@ maybe_write_delivered(false, _SeqId, IndexState) ->
maybe_write_delivered(true, SeqId, IndexState) ->
rabbit_queue_index:deliver([SeqId], IndexState).
-betas_from_index_entries(List, TransientThreshold, RPA, DPA, IndexState) ->
- {Filtered, Delivers, Acks} =
+betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA, IndexState) ->
+ {Filtered, Delivers, Acks, RamReadyCount, RamBytes} =
lists:foldr(
- fun ({_MsgId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M,
- {Filtered1, Delivers1, Acks1} = Acc) ->
+ fun ({_MsgOrId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M,
+ {Filtered1, Delivers1, Acks1, RRC, RB} = Acc) ->
case SeqId < TransientThreshold andalso not IsPersistent of
true -> {Filtered1,
cons_if(not IsDelivered, SeqId, Delivers1),
- [SeqId | Acks1]};
- false -> case (gb_trees:is_defined(SeqId, RPA) orelse
- gb_trees:is_defined(SeqId, DPA)) of
- false -> {?QUEUE:in_r(m(beta_msg_status(M)),
- Filtered1),
- Delivers1, Acks1};
- true -> Acc
- end
+ [SeqId | Acks1], RRC, RB};
+ false -> MsgStatus = m(beta_msg_status(M)),
+ HaveMsg = MsgStatus#msg_status.msg =/= undefined,
+ Size = msg_size(MsgStatus),
+ case (gb_trees:is_defined(SeqId, RPA) orelse
+ gb_trees:is_defined(SeqId, DPA) orelse
+ gb_trees:is_defined(SeqId, QPA)) of
+ false -> {?QUEUE:in_r(MsgStatus, Filtered1),
+ Delivers1, Acks1,
+ RRC + one_if(HaveMsg),
+ RB + one_if(HaveMsg) * Size};
+ true -> Acc %% [0]
+ end
end
- end, {?QUEUE:new(), [], []}, List),
- {Filtered, rabbit_queue_index:ack(
- Acks, rabbit_queue_index:deliver(Delivers, IndexState))}.
+ end, {?QUEUE:new(), [], [], 0, 0}, List),
+ {Filtered, RamReadyCount, RamBytes,
+ rabbit_queue_index:ack(
+ Acks, rabbit_queue_index:deliver(Delivers, IndexState))}.
+%% [0] We don't increase RamBytes here, even though it pertains to
+%% unacked messages too, since if HaveMsg then the message must have
+%% been stored in the QI, thus the message must have been in
+%% qi_pending_ack, thus it must already have been in RAM.
expand_delta(SeqId, ?BLANK_DELTA_PATTERN(X)) ->
d(#delta { start_seq_id = SeqId, count = 1, end_seq_id = SeqId + 1 });
@@ -1101,6 +1137,7 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms,
next_seq_id = NextSeqId,
ram_pending_ack = gb_trees:empty(),
disk_pending_ack = gb_trees:empty(),
+ qi_pending_ack = gb_trees:empty(),
index_state = IndexState1,
msg_store_clients = {PersistentClient, TransientClient},
durable = IsDurable,
@@ -1141,11 +1178,10 @@ in_r(MsgStatus = #msg_status { msg = undefined },
true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) };
false -> {Msg, State1 = #vqstate { q4 = Q4a }} =
read_msg(MsgStatus, State),
- upd_ram_bytes(
+ upd_ram_bytes_count(
1, MsgStatus,
- inc_ram_msg_count(
- State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus#msg_status {
- msg = Msg }, Q4a) }))
+ State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus#msg_status {
+ msg = Msg }, Q4a) })
end;
in_r(MsgStatus, State = #vqstate { q4 = Q4 }) ->
State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) }.
@@ -1193,6 +1229,9 @@ upd_bytes0(SignReady, SignUnacked, MsgStatus = #msg_status{is_persistent = IsP},
unacked_bytes = UBytes + SignUnacked * S,
persistent_bytes = PBytes + one_if(IsP) * S * SignTotal}.
+upd_ram_bytes_count(Sign, MsgStatus, State = #vqstate{ram_msg_count = Count}) ->
+ upd_ram_bytes(Sign, MsgStatus, State#vqstate{ram_msg_count = Count + Sign}).
+
upd_ram_bytes(Sign, MsgStatus, State = #vqstate{ram_bytes = RamBytes}) ->
State#vqstate{ram_bytes = RamBytes + Sign * msg_size(MsgStatus)}.
@@ -1206,7 +1245,7 @@ remove(AckRequired, MsgStatus = #msg_status {
msg = Msg,
is_persistent = IsPersistent,
is_delivered = IsDelivered,
- msg_on_disk = MsgOnDisk,
+ msg_in_store = MsgInStore,
index_on_disk = IndexOnDisk },
State = #vqstate {ram_msg_count = RamMsgCount,
out_counter = OutCount,
@@ -1224,10 +1263,11 @@ remove(AckRequired, MsgStatus = #msg_status {
ok = msg_store_remove(MSCState, IsPersistent, [MsgId])
end,
Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end,
- IndexState2 = case {AckRequired, MsgOnDisk, IndexOnDisk} of
- {false, true, false} -> Rem(), IndexState1;
- {false, true, true} -> Rem(), Ack();
- _ -> IndexState1
+ IndexState2 = case {AckRequired, MsgInStore, IndexOnDisk} of
+ {false, true, false} -> Rem(), IndexState1;
+ {false, true, true} -> Rem(), Ack();
+ {false, false, true} -> Ack();
+ _ -> IndexState1
end,
%% 3. If an ack is required, add something sensible to PA
@@ -1267,35 +1307,31 @@ purge_betas_and_deltas(Stats,
index_state = IndexState1 }))
end.
-remove_queue_entries(Q, {RamBytes, PCount, PBytes},
+remove_queue_entries(Q, {RamBytesDec, PCountDec, PBytesDec},
IndexState, MSCState) ->
- {MsgIdsByStore, RamBytes1, PBytes1, Delivers, Acks} =
+ {MsgIdsByStore, RamBytesDec1, PCountDec1, PBytesDec1, Delivers, Acks} =
?QUEUE:foldl(fun remove_queue_entries1/2,
- {orddict:new(), RamBytes, PBytes, [], []}, Q),
+ {orddict:new(), RamBytesDec, PCountDec, PBytesDec, [], []}, Q),
ok = orddict:fold(fun (IsPersistent, MsgIds, ok) ->
msg_store_remove(MSCState, IsPersistent, MsgIds)
end, ok, MsgIdsByStore),
- {{RamBytes1,
- PCount - case orddict:find(true, MsgIdsByStore) of
- error -> 0;
- {ok, Ids} -> length(Ids)
- end,
- PBytes1},
+ {{RamBytesDec1, PCountDec1, PBytesDec1},
rabbit_queue_index:ack(Acks,
rabbit_queue_index:deliver(Delivers, IndexState))}.
remove_queue_entries1(
#msg_status { msg_id = MsgId, seq_id = SeqId, msg = Msg,
- is_delivered = IsDelivered, msg_on_disk = MsgOnDisk,
+ is_delivered = IsDelivered, msg_in_store = MsgInStore,
index_on_disk = IndexOnDisk, is_persistent = IsPersistent,
msg_props = #message_properties { size = Size } },
- {MsgIdsByStore, RamBytes, PBytes, Delivers, Acks}) ->
- {case MsgOnDisk of
+ {MsgIdsByStore, RamBytesDec, PCountDec, PBytesDec, Delivers, Acks}) ->
+ {case MsgInStore of
true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore);
false -> MsgIdsByStore
end,
- RamBytes - Size * one_if(Msg =/= undefined),
- PBytes - Size * one_if(IsPersistent),
+ RamBytesDec + Size * one_if(Msg =/= undefined),
+ PCountDec + one_if(IsPersistent),
+ PBytesDec + Size * one_if(IsPersistent),
cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers),
cons_if(IndexOnDisk, SeqId, Acks)}.
@@ -1304,36 +1340,39 @@ remove_queue_entries1(
%%----------------------------------------------------------------------------
maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status {
- msg_on_disk = true }, _MSCState) ->
+ msg_in_store = true }, _MSCState) ->
MsgStatus;
maybe_write_msg_to_disk(Force, MsgStatus = #msg_status {
msg = Msg, msg_id = MsgId,
+ msg_props = MsgProps,
is_persistent = IsPersistent }, MSCState)
when Force orelse IsPersistent ->
- Msg1 = Msg #basic_message {
- %% don't persist any recoverable decoded properties
- content = rabbit_binary_parser:clear_decoded_content(
- Msg #basic_message.content)},
- ok = msg_store_write(MSCState, IsPersistent, MsgId, Msg1),
- MsgStatus #msg_status { msg_on_disk = true };
+ case persist_to(MsgProps) of
+ msg_store -> ok = msg_store_write(MSCState, IsPersistent, MsgId,
+ prepare_to_store(Msg)),
+ MsgStatus#msg_status{msg_in_store = true};
+ queue_index -> MsgStatus
+ end;
maybe_write_msg_to_disk(_Force, MsgStatus, _MSCState) ->
MsgStatus.
maybe_write_index_to_disk(_Force, MsgStatus = #msg_status {
index_on_disk = true }, IndexState) ->
- true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION
{MsgStatus, IndexState};
maybe_write_index_to_disk(Force, MsgStatus = #msg_status {
+ msg = Msg,
msg_id = MsgId,
seq_id = SeqId,
is_persistent = IsPersistent,
is_delivered = IsDelivered,
msg_props = MsgProps}, IndexState)
when Force orelse IsPersistent ->
- true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION
IndexState1 = rabbit_queue_index:publish(
- MsgId, SeqId, MsgProps, IsPersistent, IndexState),
- {MsgStatus #msg_status { index_on_disk = true },
+ case persist_to(MsgProps) of
+ msg_store -> MsgId;
+ queue_index -> prepare_to_store(Msg)
+ end, SeqId, MsgProps, IsPersistent, IndexState),
+ {MsgStatus#msg_status{index_on_disk = true},
maybe_write_delivered(IsDelivered, SeqId, IndexState1)};
maybe_write_index_to_disk(_Force, MsgStatus, IndexState) ->
{MsgStatus, IndexState}.
@@ -1346,28 +1385,53 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus,
maybe_write_index_to_disk(ForceIndex, MsgStatus1, IndexState),
{MsgStatus2, State #vqstate { index_state = IndexState1 }}.
+persist_to(#message_properties{size = Size}) ->
+ {ok, IndexMaxSize} = application:get_env(
+ rabbit, queue_index_embed_msgs_below),
+ %% This is >= so that you can set the env to 0 and never persist
+ %% to the index.
+ case Size >= IndexMaxSize of
+ true -> msg_store;
+ false -> queue_index
+ end.
+
+prepare_to_store(Msg) ->
+ Msg#basic_message{
+ %% don't persist any recoverable decoded properties
+ content = rabbit_binary_parser:clear_decoded_content(
+ Msg #basic_message.content)}.
+
%%----------------------------------------------------------------------------
%% Internal gubbins for acks
%%----------------------------------------------------------------------------
-record_pending_ack(#msg_status { seq_id = SeqId, msg = Msg } = MsgStatus,
+record_pending_ack(#msg_status { seq_id = SeqId, msg = Msg,
+ msg_props = MsgProps } = MsgStatus,
State = #vqstate { ram_pending_ack = RPA,
disk_pending_ack = DPA,
+ qi_pending_ack = QPA,
ack_in_counter = AckInCount}) ->
- {RPA1, DPA1} =
- case Msg of
- undefined -> {RPA, gb_trees:insert(SeqId, MsgStatus, DPA)};
- _ -> {gb_trees:insert(SeqId, MsgStatus, RPA), DPA}
+ Insert = fun (Tree) -> gb_trees:insert(SeqId, MsgStatus, Tree) end,
+ {RPA1, DPA1, QPA1} =
+ case {Msg, persist_to(MsgProps)} of
+ {undefined, _} -> {RPA, Insert(DPA), QPA};
+ {_, queue_index} -> {RPA, DPA, Insert(QPA)};
+ {_, msg_store} -> {Insert(RPA), DPA, QPA}
end,
State #vqstate { ram_pending_ack = RPA1,
disk_pending_ack = DPA1,
+ qi_pending_ack = QPA1,
ack_in_counter = AckInCount + 1}.
lookup_pending_ack(SeqId, #vqstate { ram_pending_ack = RPA,
- disk_pending_ack = DPA }) ->
+ disk_pending_ack = DPA,
+ qi_pending_ack = QPA}) ->
case gb_trees:lookup(SeqId, RPA) of
{value, V} -> V;
- none -> gb_trees:get(SeqId, DPA)
+ none -> case gb_trees:lookup(SeqId, DPA) of
+ {value, V} -> V;
+ none -> gb_trees:get(SeqId, QPA)
+ end
end.
%% First parameter = UpdatePersistentCount
@@ -1377,27 +1441,38 @@ remove_pending_ack(true, SeqId, State) ->
PCount1 = PCount - one_if(MsgStatus#msg_status.is_persistent),
{MsgStatus, upd_bytes(0, -1, MsgStatus,
State1 # vqstate{ persistent_count = PCount1 })};
-remove_pending_ack(false, SeqId, State = #vqstate { ram_pending_ack = RPA,
- disk_pending_ack = DPA }) ->
+remove_pending_ack(false, SeqId, State = #vqstate{ram_pending_ack = RPA,
+ disk_pending_ack = DPA,
+ qi_pending_ack = QPA}) ->
case gb_trees:lookup(SeqId, RPA) of
{value, V} -> RPA1 = gb_trees:delete(SeqId, RPA),
{V, State #vqstate { ram_pending_ack = RPA1 }};
- none -> DPA1 = gb_trees:delete(SeqId, DPA),
- {gb_trees:get(SeqId, DPA),
- State #vqstate { disk_pending_ack = DPA1 }}
+ none -> case gb_trees:lookup(SeqId, DPA) of
+ {value, V} ->
+ DPA1 = gb_trees:delete(SeqId, DPA),
+ {V, State#vqstate{disk_pending_ack = DPA1}};
+ none ->
+ QPA1 = gb_trees:delete(SeqId, QPA),
+ {gb_trees:get(SeqId, QPA),
+ State#vqstate{qi_pending_ack = QPA1}}
+ end
end.
purge_pending_ack(KeepPersistent,
State = #vqstate { ram_pending_ack = RPA,
disk_pending_ack = DPA,
+ qi_pending_ack = QPA,
index_state = IndexState,
msg_store_clients = MSCState }) ->
F = fun (_SeqId, MsgStatus, Acc) -> accumulate_ack(MsgStatus, Acc) end,
{IndexOnDiskSeqIds, MsgIdsByStore, _AllMsgIds} =
rabbit_misc:gb_trees_fold(
- F, rabbit_misc:gb_trees_fold(F, accumulate_ack_init(), RPA), DPA),
+ F, rabbit_misc:gb_trees_fold(
+ F, rabbit_misc:gb_trees_fold(
+ F, accumulate_ack_init(), RPA), DPA), QPA),
State1 = State #vqstate { ram_pending_ack = gb_trees:empty(),
- disk_pending_ack = gb_trees:empty() },
+ disk_pending_ack = gb_trees:empty(),
+ qi_pending_ack = gb_trees:empty()},
case KeepPersistent of
true -> case orddict:find(false, MsgIdsByStore) of
@@ -1418,11 +1493,11 @@ accumulate_ack_init() -> {[], orddict:new(), []}.
accumulate_ack(#msg_status { seq_id = SeqId,
msg_id = MsgId,
is_persistent = IsPersistent,
- msg_on_disk = MsgOnDisk,
+ msg_in_store = MsgInStore,
index_on_disk = IndexOnDisk },
{IndexOnDiskSeqIdsAcc, MsgIdsByStore, AllMsgIds}) ->
{cons_if(IndexOnDisk, SeqId, IndexOnDiskSeqIdsAcc),
- case MsgOnDisk of
+ case MsgInStore of
true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore);
false -> MsgIdsByStore
end,
@@ -1469,6 +1544,10 @@ msg_indices_written_to_disk(Callback, MsgIdSet) ->
gb_sets:union(MIOD, Confirmed) })
end).
+msgs_and_indices_written_to_disk(Callback, MsgIdSet) ->
+ Callback(?MODULE,
+ fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end).
+
%%----------------------------------------------------------------------------
%% Internal plumbing for requeue
%%----------------------------------------------------------------------------
@@ -1476,7 +1555,7 @@ msg_indices_written_to_disk(Callback, MsgIdSet) ->
publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) ->
{Msg, State1} = read_msg(MsgStatus, State),
{MsgStatus#msg_status { msg = Msg },
- upd_ram_bytes(1, MsgStatus, inc_ram_msg_count(State1))}; %% [1]
+ upd_ram_bytes_count(1, MsgStatus, State1)}; %% [1]
publish_alpha(MsgStatus, State) ->
{MsgStatus, inc_ram_msg_count(State)}.
%% [1] We increase the ram_bytes here because we paged the message in
@@ -1488,10 +1567,11 @@ publish_alpha(MsgStatus, State) ->
publish_beta(MsgStatus, State) ->
{MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State),
MsgStatus2 = m(trim_msg_status(MsgStatus1)),
- case msg_in_ram(MsgStatus1) andalso not msg_in_ram(MsgStatus2) of
- true -> {MsgStatus2, upd_ram_bytes(-1, MsgStatus, State1)};
- _ -> {MsgStatus2, State1}
- end.
+ {MsgStatus2, case {msg_in_ram(MsgStatus1), msg_in_ram(MsgStatus2)} of
+ {true, false} -> upd_ram_bytes(-1, MsgStatus, State1);
+ {_, true} -> inc_ram_msg_count(State1);
+ _ -> State1
+ end}.
%% Rebuild queue, inserting sequence ids to maintain ordering
queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, State) ->
@@ -1563,6 +1643,9 @@ ram_ack_iterator(State) ->
disk_ack_iterator(State) ->
{ack, gb_trees:iterator(State#vqstate.disk_pending_ack)}.
+qi_ack_iterator(State) ->
+ {ack, gb_trees:iterator(State#vqstate.qi_pending_ack)}.
+
msg_iterator(State) -> istate(start, State).
istate(start, State) -> {q4, State#vqstate.q4, State};
@@ -1586,13 +1669,15 @@ next({delta, #delta{start_seq_id = SeqId,
end_seq_id = SeqIdEnd} = Delta, State}, IndexState) ->
SeqIdB = rabbit_queue_index:next_segment_boundary(SeqId),
SeqId1 = lists:min([SeqIdB, SeqIdEnd]),
- {List, IndexState1} = rabbit_queue_index:read(SeqId, SeqId1, IndexState),
+ {List, IndexState1} =
+ rabbit_queue_index:read(SeqId, SeqId1, IndexState),
next({delta, Delta#delta{start_seq_id = SeqId1}, List, State}, IndexState1);
next({delta, Delta, [], State}, IndexState) ->
next({delta, Delta, State}, IndexState);
next({delta, Delta, [{_, SeqId, _, _, _} = M | Rest], State}, IndexState) ->
case (gb_trees:is_defined(SeqId, State#vqstate.ram_pending_ack) orelse
- gb_trees:is_defined(SeqId, State#vqstate.disk_pending_ack)) of
+ gb_trees:is_defined(SeqId, State#vqstate.disk_pending_ack) orelse
+ gb_trees:is_defined(SeqId, State#vqstate.qi_pending_ack)) of
false -> Next = {delta, Delta, Rest, State},
{value, beta_msg_status(M), false, Next, IndexState};
true -> next({delta, Delta, Rest, State}, IndexState)
@@ -1755,8 +1840,11 @@ maybe_deltas_to_betas(State = #vqstate {
delta = Delta,
q3 = Q3,
index_state = IndexState,
+ ram_msg_count = RamMsgCount,
+ ram_bytes = RamBytes,
ram_pending_ack = RPA,
disk_pending_ack = DPA,
+ qi_pending_ack = QPA,
transient_threshold = TransientThreshold }) ->
#delta { start_seq_id = DeltaSeqId,
count = DeltaCount,
@@ -1764,11 +1852,14 @@ maybe_deltas_to_betas(State = #vqstate {
DeltaSeqId1 =
lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId),
DeltaSeqIdEnd]),
- {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1,
- IndexState),
- {Q3a, IndexState2} = betas_from_index_entries(List, TransientThreshold,
- RPA, DPA, IndexState1),
- State1 = State #vqstate { index_state = IndexState2 },
+ {List, IndexState1} =
+ rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState),
+ {Q3a, RamCountsInc, RamBytesInc, IndexState2} =
+ betas_from_index_entries(List, TransientThreshold,
+ RPA, DPA, QPA, IndexState1),
+ State1 = State #vqstate { index_state = IndexState2,
+ ram_msg_count = RamMsgCount + RamCountsInc,
+ ram_bytes = RamBytes + RamBytesInc },
case ?QUEUE:len(Q3a) of
0 ->
%% we ignored every message in the segment due to it being
@@ -1826,18 +1917,17 @@ push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
{empty, _Q} ->
{Quota, State};
{{value, MsgStatus}, Qa} ->
- {MsgStatus1 = #msg_status { msg_on_disk = true },
- State1 = #vqstate { ram_msg_count = RamMsgCount }} =
+ {MsgStatus1, State1} =
maybe_write_to_disk(true, false, MsgStatus, State),
MsgStatus2 = m(trim_msg_status(MsgStatus1)),
- State2 = Consumer(
- MsgStatus2, Qa,
- upd_ram_bytes(
- -1, MsgStatus2,
- State1 #vqstate {
- ram_msg_count = RamMsgCount - 1})),
+ State2 = case MsgStatus2#msg_status.msg of
+ undefined -> upd_ram_bytes_count(
+ -1, MsgStatus2, State1);
+ _ -> State1
+ end,
+ State3 = Consumer(MsgStatus2, Qa, State2),
push_alphas_to_betas(Generator, Consumer, Quota - 1,
- Qa, State2)
+ Qa, State3)
end
end.
@@ -1845,7 +1935,7 @@ push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2,
delta = Delta,
q3 = Q3,
index_state = IndexState }) ->
- PushState = {Quota, Delta, IndexState},
+ PushState = {Quota, Delta, IndexState, State},
{Q3a, PushState1} = push_betas_to_deltas(
fun ?QUEUE:out_r/1,
fun rabbit_queue_index:next_segment_boundary/1,
@@ -1854,11 +1944,11 @@ push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2,
fun ?QUEUE:out/1,
fun (Q2MinSeqId) -> Q2MinSeqId end,
Q2, PushState1),
- {_, Delta1, IndexState1} = PushState2,
- State #vqstate { q2 = Q2a,
- delta = Delta1,
- q3 = Q3a,
- index_state = IndexState1 }.
+ {_, Delta1, IndexState1, State1} = PushState2,
+ State1 #vqstate { q2 = Q2a,
+ delta = Delta1,
+ q3 = Q3a,
+ index_state = IndexState1 }.
push_betas_to_deltas(Generator, LimitFun, Q, PushState) ->
case ?QUEUE:is_empty(Q) of
@@ -1875,10 +1965,10 @@ push_betas_to_deltas(Generator, LimitFun, Q, PushState) ->
end.
push_betas_to_deltas1(_Generator, _Limit, Q,
- {0, _Delta, _IndexState} = PushState) ->
+ {0, _Delta, _IndexState, _State} = PushState) ->
{Q, PushState};
push_betas_to_deltas1(Generator, Limit, Q,
- {Quota, Delta, IndexState} = PushState) ->
+ {Quota, Delta, IndexState, State} = PushState) ->
case Generator(Q) of
{empty, _Q} ->
{Q, PushState};
@@ -1888,9 +1978,13 @@ push_betas_to_deltas1(Generator, Limit, Q,
{{value, MsgStatus = #msg_status { seq_id = SeqId }}, Qa} ->
{#msg_status { index_on_disk = true }, IndexState1} =
maybe_write_index_to_disk(true, MsgStatus, IndexState),
+ State1 = case MsgStatus#msg_status.msg of
+ undefined -> State;
+ _ -> upd_ram_bytes_count(-1, MsgStatus, State)
+ end,
Delta1 = expand_delta(SeqId, Delta),
push_betas_to_deltas1(Generator, Limit, Qa,
- {Quota - 1, Delta1, IndexState1})
+ {Quota - 1, Delta1, IndexState1, State1})
end.
%%----------------------------------------------------------------------------
diff --git a/test/src/rabbit_tests.erl b/test/src/rabbit_tests.erl
index dcbec8f6..870dfdd9 100644
--- a/test/src/rabbit_tests.erl
+++ b/test/src/rabbit_tests.erl
@@ -1888,11 +1888,15 @@ test_backing_queue() ->
passed = test_msg_store(),
application:set_env(rabbit, msg_store_file_size_limit,
FileSizeLimit),
- passed = test_queue_index(),
- passed = test_queue_index_props(),
- passed = test_variable_queue(),
- passed = test_variable_queue_delete_msg_store_files_callback(),
- passed = test_queue_recover(),
+ [begin
+ application:set_env(
+ rabbit, queue_index_embed_msgs_below, Bytes),
+ passed = test_queue_index(),
+ passed = test_queue_index_props(),
+ passed = test_variable_queue(),
+ passed = test_variable_queue_delete_msg_store_files_callback(),
+ passed = test_queue_recover()
+ end || Bytes <- [0, 1024]],
application:set_env(rabbit, queue_index_max_journal_entries,
MaxJournal),
%% We will have restarted the message store, and thus changed
@@ -2219,7 +2223,7 @@ init_test_queue() ->
fun (MsgId) ->
rabbit_msg_store:contains(MsgId, PersistentClient)
end,
- fun nop/1),
+ fun nop/1, fun nop/1),
ok = rabbit_msg_store:client_delete_and_terminate(PersistentClient),
Res.
@@ -2422,7 +2426,7 @@ variable_queue_init(Q, Recover) ->
Q, case Recover of
true -> non_clean_shutdown;
false -> new
- end, fun nop/2, fun nop/2, fun nop/1).
+ end, fun nop/2, fun nop/2, fun nop/1, fun nop/1).
variable_queue_publish(IsPersistent, Count, VQ) ->
variable_queue_publish(IsPersistent, Count, fun (_N, P) -> P end, VQ).
@@ -2462,7 +2466,10 @@ variable_queue_set_ram_duration_target(Duration, VQ) ->
rabbit_variable_queue:set_ram_duration_target(Duration, VQ)).
assert_prop(List, Prop, Value) ->
- Value = proplists:get_value(Prop, List).
+ case proplists:get_value(Prop, List)of
+ Value -> ok;
+ _ -> {exit, Prop, exp, Value, List}
+ end.
assert_props(List, PropVals) ->
[assert_prop(List, Prop, Value) || {Prop, Value} <- PropVals].
@@ -2485,12 +2492,18 @@ with_fresh_variable_queue(Fun) ->
{delta, undefined, 0, undefined}},
{q3, 0}, {q4, 0},
{len, 0}]),
- _ = rabbit_variable_queue:delete_and_terminate(
- shutdown, Fun(VQ)),
- Me ! Ref
+ try
+ _ = rabbit_variable_queue:delete_and_terminate(
+ shutdown, Fun(VQ)),
+ Me ! Ref
+ catch
+ Type:Error ->
+ Me ! {Ref, Type, Error, erlang:get_stacktrace()}
+ end
end),
receive
- Ref -> ok
+ Ref -> ok;
+ {Ref, Type, Error, ST} -> exit({Type, Error, ST})
end,
passed.
@@ -2787,8 +2800,6 @@ test_variable_queue_dynamic_duration_change(VQ0) ->
VQ7 = lists:foldl(
fun (Duration1, VQ4) ->
{_Duration, VQ5} = rabbit_variable_queue:ram_duration(VQ4),
- io:format("~p:~n~p~n",
- [Duration1, variable_queue_status(VQ5)]),
VQ6 = variable_queue_set_ram_duration_target(
Duration1, VQ5),
publish_fetch_and_ack(Churn, Len, VQ6)
@@ -2849,7 +2860,6 @@ test_variable_queue_partial_segments_delta_thing(VQ0) ->
check_variable_queue_status(VQ0, Props) ->
VQ1 = variable_queue_wait_for_shuffling_end(VQ0),
S = variable_queue_status(VQ1),
- io:format("~p~n", [S]),
assert_props(S, Props),
VQ1.