summaryrefslogtreecommitdiff
path: root/src/rabbit_variable_queue.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_variable_queue.erl')
-rw-r--r--src/rabbit_variable_queue.erl780
1 files changed, 439 insertions, 341 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index fef44037..ac8fcfee 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -32,9 +32,9 @@
-module(rabbit_variable_queue).
-export([init/3, terminate/1, delete_and_terminate/1,
- purge/1, publish/2, publish_delivered/3, fetch/2, ack/2,
- tx_publish/3, tx_ack/3, tx_rollback/2, tx_commit/3,
- requeue/2, len/1, is_empty/1,
+ purge/1, publish/3, publish_delivered/4, fetch/2, ack/2,
+ tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4,
+ requeue/3, len/1, is_empty/1, dropwhile/2,
set_ram_duration_target/2, ram_duration/1,
needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1,
status/1]).
@@ -247,7 +247,8 @@
is_persistent,
is_delivered,
msg_on_disk,
- index_on_disk
+ index_on_disk,
+ msg_props
}).
-record(delta,
@@ -293,7 +294,8 @@
-type(sync() :: #sync { acks_persistent :: [[seq_id()]],
acks_all :: [[seq_id()]],
- pubs :: [[rabbit_guid:guid()]],
+ pubs :: [{message_properties_transformer(),
+ [rabbit_types:basic_message()]}],
funs :: [fun (() -> any())] }).
-type(state() :: #vqstate {
@@ -366,16 +368,17 @@ stop_msg_store() ->
ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE),
ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE).
-init(QueueName, IsDurable, Recover) ->
- {DeltaCount, Terms, IndexState} =
- rabbit_queue_index:init(
- QueueName, Recover,
- rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE),
- fun (Guid) ->
- rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid)
- end),
- {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState),
+init(QueueName, IsDurable, false) ->
+ IndexState = rabbit_queue_index:init(QueueName),
+ init(IsDurable, IndexState, 0, [],
+ case IsDurable of
+ true -> msg_store_client_init(?PERSISTENT_MSG_STORE);
+ false -> undefined
+ end,
+ msg_store_client_init(?TRANSIENT_MSG_STORE));
+init(QueueName, true, true) ->
+ Terms = rabbit_queue_index:shutdown_terms(QueueName),
{PRef, TRef, Terms1} =
case [persistent_ref, transient_ref] -- proplists:get_keys(Terms) of
[] -> {proplists:get_value(persistent_ref, Terms),
@@ -383,63 +386,32 @@ init(QueueName, IsDurable, Recover) ->
Terms};
_ -> {rabbit_guid:guid(), rabbit_guid:guid(), []}
end,
- DeltaCount1 = proplists:get_value(persistent_count, Terms1, DeltaCount),
- Delta = case DeltaCount1 == 0 andalso DeltaCount /= undefined of
- true -> ?BLANK_DELTA;
- false -> #delta { start_seq_id = LowSeqId,
- count = DeltaCount1,
- end_seq_id = NextSeqId }
- end,
- Now = now(),
- PersistentClient =
- case IsDurable of
- true -> rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, PRef);
- false -> undefined
- end,
- TransientClient = rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, TRef),
- State = #vqstate {
- q1 = queue:new(),
- q2 = bpqueue:new(),
- delta = Delta,
- q3 = bpqueue:new(),
- q4 = queue:new(),
- next_seq_id = NextSeqId,
- pending_ack = dict:new(),
- index_state = IndexState1,
- msg_store_clients = {{PersistentClient, PRef},
- {TransientClient, TRef}},
- on_sync = ?BLANK_SYNC,
- durable = IsDurable,
- transient_threshold = NextSeqId,
-
- len = DeltaCount1,
- persistent_count = DeltaCount1,
-
- target_ram_msg_count = infinity,
- ram_msg_count = 0,
- ram_msg_count_prev = 0,
- ram_index_count = 0,
- out_counter = 0,
- in_counter = 0,
- rates = #rates { egress = {Now, 0},
- ingress = {Now, DeltaCount1},
- avg_egress = 0.0,
- avg_ingress = 0.0,
- timestamp = Now } },
- a(maybe_deltas_to_betas(State)).
+ PersistentClient = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE,
+ PRef),
+ TransientClient = rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE,
+ TRef),
+ {DeltaCount, IndexState} =
+ rabbit_queue_index:recover(
+ QueueName, Terms1,
+ rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE),
+ fun (Guid) ->
+ rabbit_msg_store:contains(Guid, PersistentClient)
+ end),
+ init(true, IndexState, DeltaCount, Terms1,
+ PersistentClient, TransientClient).
terminate(State) ->
State1 = #vqstate { persistent_count = PCount,
index_state = IndexState,
- msg_store_clients = {{MSCStateP, PRef},
- {MSCStateT, TRef}} } =
+ msg_store_clients = {MSCStateP, MSCStateT} } =
remove_pending_ack(true, tx_commit_index(State)),
- case MSCStateP of
- undefined -> ok;
- _ -> rabbit_msg_store:client_terminate(
- MSCStateP, ?PERSISTENT_MSG_STORE)
- end,
- rabbit_msg_store:client_terminate(MSCStateT, ?TRANSIENT_MSG_STORE),
+ PRef = case MSCStateP of
+ undefined -> undefined;
+ _ -> ok = rabbit_msg_store:client_terminate(MSCStateP),
+ rabbit_msg_store:client_ref(MSCStateP)
+ end,
+ ok = rabbit_msg_store:client_terminate(MSCStateT),
+ TRef = rabbit_msg_store:client_ref(MSCStateT),
Terms = [{persistent_ref, PRef},
{transient_ref, TRef},
{persistent_count, PCount}],
@@ -455,191 +427,247 @@ delete_and_terminate(State) ->
%% deleting it.
{_PurgeCount, State1} = purge(State),
State2 = #vqstate { index_state = IndexState,
- msg_store_clients = {{MSCStateP, PRef},
- {MSCStateT, TRef}} } =
+ msg_store_clients = {MSCStateP, MSCStateT} } =
remove_pending_ack(false, State1),
IndexState1 = rabbit_queue_index:delete_and_terminate(IndexState),
case MSCStateP of
undefined -> ok;
- _ -> rabbit_msg_store:client_delete_and_terminate(
- MSCStateP, ?PERSISTENT_MSG_STORE, PRef)
+ _ -> rabbit_msg_store:client_delete_and_terminate(MSCStateP)
end,
- rabbit_msg_store:client_delete_and_terminate(
- MSCStateT, ?TRANSIENT_MSG_STORE, TRef),
+ rabbit_msg_store:client_delete_and_terminate(MSCStateT),
a(State2 #vqstate { index_state = IndexState1,
msg_store_clients = undefined }).
-purge(State = #vqstate { q4 = Q4,
- index_state = IndexState,
- len = Len,
- persistent_count = PCount }) ->
+purge(State = #vqstate { q4 = Q4,
+ index_state = IndexState,
+ msg_store_clients = MSCState,
+ len = Len,
+ persistent_count = PCount }) ->
%% TODO: when there are no pending acks, which is a common case,
%% we could simply wipe the qi instead of issuing delivers and
%% acks for all the messages.
{LensByStore, IndexState1} = remove_queue_entries(
fun rabbit_misc:queue_fold/3, Q4,
- orddict:new(), IndexState),
- {LensByStore1, State1 = #vqstate { q1 = Q1, index_state = IndexState2 }} =
+ orddict:new(), IndexState, MSCState),
+ {LensByStore1, State1 = #vqstate { q1 = Q1,
+ index_state = IndexState2,
+ msg_store_clients = MSCState1 }} =
purge_betas_and_deltas(LensByStore,
State #vqstate { q4 = queue:new(),
index_state = IndexState1 }),
{LensByStore2, IndexState3} = remove_queue_entries(
fun rabbit_misc:queue_fold/3, Q1,
- LensByStore1, IndexState2),
+ LensByStore1, IndexState2, MSCState1),
PCount1 = PCount - find_persistent_count(LensByStore2),
- {Len, a(State1 #vqstate { q1 = queue:new(),
- index_state = IndexState3,
- len = 0,
- ram_msg_count = 0,
- ram_index_count = 0,
- persistent_count = PCount1 })}.
-
-publish(Msg, State) ->
- {_SeqId, State1} = publish(Msg, false, false, State),
+ {Len, a(State1 #vqstate { q1 = queue:new(),
+ index_state = IndexState3,
+ len = 0,
+ ram_msg_count = 0,
+ ram_index_count = 0,
+ persistent_count = PCount1 })}.
+
+publish(Msg, MsgProps, State) ->
+ {_SeqId, State1} = publish(Msg, MsgProps, false, false, State),
a(reduce_memory_use(State1)).
-publish_delivered(false, _Msg, State = #vqstate { len = 0 }) ->
+publish_delivered(false, _Msg, _MsgProps, State = #vqstate { len = 0 }) ->
{blank_ack, a(State)};
publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent },
- State = #vqstate { len = 0,
- next_seq_id = SeqId,
- out_counter = OutCount,
- in_counter = InCount,
- persistent_count = PCount,
- pending_ack = PA,
- durable = IsDurable }) ->
+ MsgProps,
+ State = #vqstate { len = 0,
+ next_seq_id = SeqId,
+ out_counter = OutCount,
+ in_counter = InCount,
+ persistent_count = PCount,
+ pending_ack = PA,
+ durable = IsDurable }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
- MsgStatus = (msg_status(IsPersistent1, SeqId, Msg))
+ MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps))
#msg_status { is_delivered = true },
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
PA1 = record_pending_ack(m(MsgStatus1), PA),
PCount1 = PCount + one_if(IsPersistent1),
- {SeqId, a(State1 #vqstate { next_seq_id = SeqId + 1,
- out_counter = OutCount + 1,
- in_counter = InCount + 1,
- persistent_count = PCount1,
- pending_ack = PA1 })}.
-
-fetch(AckRequired, State = #vqstate { q4 = Q4,
- ram_msg_count = RamMsgCount,
- out_counter = OutCount,
- index_state = IndexState,
- len = Len,
- persistent_count = PCount,
- pending_ack = PA }) ->
+ {SeqId, a(State1 #vqstate { next_seq_id = SeqId + 1,
+ out_counter = OutCount + 1,
+ in_counter = InCount + 1,
+ persistent_count = PCount1,
+ pending_ack = PA1 })}.
+
+dropwhile(Pred, State) ->
+ {_OkOrEmpty, State1} = dropwhile1(Pred, State),
+ State1.
+
+dropwhile1(Pred, State) ->
+ internal_queue_out(
+ fun(MsgStatus = #msg_status { msg_props = MsgProps }, State1) ->
+ case Pred(MsgProps) of
+ true ->
+ {_, State2} = internal_fetch(false, MsgStatus, State1),
+ dropwhile1(Pred, State2);
+ false ->
+ %% message needs to go back into Q4 (or maybe go
+ %% in for the first time if it was loaded from
+ %% Q3). Also the msg contents might not be in
+ %% RAM, so read them in now
+ {MsgStatus1, State2 = #vqstate { q4 = Q4 }} =
+ read_msg(MsgStatus, State1),
+ {ok, State2 #vqstate {q4 = queue:in_r(MsgStatus1, Q4) }}
+ end
+ end, State).
+
+fetch(AckRequired, State) ->
+ internal_queue_out(
+ fun(MsgStatus, State1) ->
+ %% it's possible that the message wasn't read from disk
+ %% at this point, so read it in.
+ {MsgStatus1, State2} = read_msg(MsgStatus, State1),
+ internal_fetch(AckRequired, MsgStatus1, State2)
+ end, State).
+
+internal_queue_out(Fun, State = #vqstate { q4 = Q4 }) ->
case queue:out(Q4) of
{empty, _Q4} ->
- case fetch_from_q3_to_q4(State) of
- {empty, State1} = Result -> a(State1), Result;
- {loaded, State1} -> fetch(AckRequired, State1)
+ case fetch_from_q3(State) of
+ {empty, State1} = Result -> a(State1), Result;
+ {loaded, {MsgStatus, State1}} -> Fun(MsgStatus, State1)
end;
- {{value, MsgStatus = #msg_status {
- msg = Msg, guid = Guid, seq_id = SeqId,
- is_persistent = IsPersistent, is_delivered = IsDelivered,
- msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }},
- Q4a} ->
-
- %% 1. Mark it delivered if necessary
- IndexState1 = maybe_write_delivered(
- IndexOnDisk andalso not IsDelivered,
- SeqId, IndexState),
-
- %% 2. Remove from msg_store and queue index, if necessary
- MsgStore = find_msg_store(IsPersistent),
- Rem = fun () -> ok = rabbit_msg_store:remove(MsgStore, [Guid]) end,
- Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end,
- IndexState2 =
- case {AckRequired, MsgOnDisk, IndexOnDisk, IsPersistent} of
- {false, true, false, _} -> Rem(), IndexState1;
- {false, true, true, _} -> Rem(), Ack();
- { true, true, true, false} -> Ack();
- _ -> IndexState1
- end,
-
- %% 3. If an ack is required, add something sensible to PA
- {AckTag, PA1} = case AckRequired of
- true -> PA2 = record_pending_ack(
- MsgStatus #msg_status {
- is_delivered = true }, PA),
- {SeqId, PA2};
- false -> {blank_ack, PA}
- end,
-
- PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
- Len1 = Len - 1,
- {{Msg, IsDelivered, AckTag, Len1},
- a(State #vqstate { q4 = Q4a,
- ram_msg_count = RamMsgCount - 1,
- out_counter = OutCount + 1,
- index_state = IndexState2,
- len = Len1,
- persistent_count = PCount1,
- pending_ack = PA1 })}
+ {{value, MsgStatus}, Q4a} ->
+ Fun(MsgStatus, State #vqstate { q4 = Q4a })
end.
+read_msg(MsgStatus = #msg_status { msg = undefined,
+ guid = Guid,
+ is_persistent = IsPersistent },
+ State = #vqstate { ram_msg_count = RamMsgCount,
+ msg_store_clients = MSCState}) ->
+ {{ok, Msg = #basic_message {}}, MSCState1} =
+ msg_store_read(MSCState, IsPersistent, Guid),
+ {MsgStatus #msg_status { msg = Msg },
+ State #vqstate { ram_msg_count = RamMsgCount + 1,
+ msg_store_clients = MSCState1 }};
+read_msg(MsgStatus, State) ->
+ {MsgStatus, State}.
+
+internal_fetch(AckRequired, MsgStatus = #msg_status {
+ seq_id = SeqId,
+ guid = Guid,
+ msg = Msg,
+ is_persistent = IsPersistent,
+ is_delivered = IsDelivered,
+ msg_on_disk = MsgOnDisk,
+ index_on_disk = IndexOnDisk },
+ State = #vqstate {ram_msg_count = RamMsgCount,
+ out_counter = OutCount,
+ index_state = IndexState,
+ msg_store_clients = MSCState,
+ len = Len,
+ persistent_count = PCount,
+ pending_ack = PA }) ->
+ %% 1. Mark it delivered if necessary
+ IndexState1 = maybe_write_delivered(
+ IndexOnDisk andalso not IsDelivered,
+ SeqId, IndexState),
+
+ %% 2. Remove from msg_store and queue index, if necessary
+ Rem = fun () ->
+ ok = msg_store_remove(MSCState, IsPersistent, [Guid])
+ end,
+ Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end,
+ IndexState2 =
+ case {AckRequired, MsgOnDisk, IndexOnDisk, IsPersistent} of
+ {false, true, false, _} -> Rem(), IndexState1;
+ {false, true, true, _} -> Rem(), Ack();
+ { true, true, true, false} -> Ack();
+ _ -> IndexState1
+ end,
+
+ %% 3. If an ack is required, add something sensible to PA
+ {AckTag, PA1} = case AckRequired of
+ true -> PA2 = record_pending_ack(
+ MsgStatus #msg_status {
+ is_delivered = true }, PA),
+ {SeqId, PA2};
+ false -> {blank_ack, PA}
+ end,
+
+ PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
+ Len1 = Len - 1,
+ RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined),
+
+ {{Msg, IsDelivered, AckTag, Len1},
+ a(State #vqstate { ram_msg_count = RamMsgCount1,
+ out_counter = OutCount + 1,
+ index_state = IndexState2,
+ len = Len1,
+ persistent_count = PCount1,
+ pending_ack = PA1 })}.
+
ack(AckTags, State) ->
- a(ack(fun rabbit_msg_store:remove/2,
+ a(ack(fun msg_store_remove/3,
fun (_AckEntry, State1) -> State1 end,
AckTags, State)).
-tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent },
+tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, MsgProps,
State = #vqstate { durable = IsDurable,
msg_store_clients = MSCState }) ->
Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn),
- store_tx(Txn, Tx #tx { pending_messages = [Msg | Pubs] }),
- a(case IsPersistent andalso IsDurable of
- true -> MsgStatus = msg_status(true, undefined, Msg),
- {#msg_status { msg_on_disk = true }, MSCState1} =
- maybe_write_msg_to_disk(false, MsgStatus, MSCState),
- State #vqstate { msg_store_clients = MSCState1 };
- false -> State
- end).
+ store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProps} | Pubs] }),
+ case IsPersistent andalso IsDurable of
+ true -> MsgStatus = msg_status(true, undefined, Msg, MsgProps),
+ #msg_status { msg_on_disk = true } =
+ maybe_write_msg_to_disk(false, MsgStatus, MSCState);
+ false -> ok
+ end,
+ a(State).
tx_ack(Txn, AckTags, State) ->
Tx = #tx { pending_acks = Acks } = lookup_tx(Txn),
store_tx(Txn, Tx #tx { pending_acks = [AckTags | Acks] }),
State.
-tx_rollback(Txn, State = #vqstate { durable = IsDurable }) ->
+tx_rollback(Txn, State = #vqstate { durable = IsDurable,
+ msg_store_clients = MSCState }) ->
#tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn),
erase_tx(Txn),
ok = case IsDurable of
- true -> rabbit_msg_store:remove(?PERSISTENT_MSG_STORE,
- persistent_guids(Pubs));
+ true -> msg_store_remove(MSCState, true, persistent_guids(Pubs));
false -> ok
end,
{lists:append(AckTags), a(State)}.
-tx_commit(Txn, Fun, State = #vqstate { durable = IsDurable }) ->
+tx_commit(Txn, Fun, MsgPropsFun,
+ State = #vqstate { durable = IsDurable,
+ msg_store_clients = MSCState }) ->
#tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn),
erase_tx(Txn),
- PubsOrdered = lists:reverse(Pubs),
AckTags1 = lists:append(AckTags),
- PersistentGuids = persistent_guids(PubsOrdered),
+ PersistentGuids = persistent_guids(Pubs),
HasPersistentPubs = PersistentGuids =/= [],
{AckTags1,
a(case IsDurable andalso HasPersistentPubs of
- true -> ok = rabbit_msg_store:sync(
- ?PERSISTENT_MSG_STORE, PersistentGuids,
- msg_store_callback(PersistentGuids,
- PubsOrdered, AckTags1, Fun)),
+ true -> ok = msg_store_sync(
+ MSCState, true, PersistentGuids,
+ msg_store_callback(PersistentGuids, Pubs, AckTags1,
+ Fun, MsgPropsFun)),
State;
- false -> tx_commit_post_msg_store(
- HasPersistentPubs, PubsOrdered, AckTags1, Fun, State)
+ false -> tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags1,
+ Fun, MsgPropsFun, State)
end)}.
-requeue(AckTags, State) ->
+requeue(AckTags, MsgPropsFun, State) ->
a(reduce_memory_use(
- ack(fun rabbit_msg_store:release/2,
- fun (#msg_status { msg = Msg }, State1) ->
- {_SeqId, State2} = publish(Msg, true, false, State1),
+ ack(fun msg_store_release/3,
+ fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) ->
+ {_SeqId, State2} = publish(Msg, MsgPropsFun(MsgProps),
+ true, false, State1),
State2;
- ({IsPersistent, Guid}, State1) ->
+ ({IsPersistent, Guid, MsgProps}, State1) ->
#vqstate { msg_store_clients = MSCState } = State1,
{{ok, Msg = #basic_message{}}, MSCState1} =
- read_from_msg_store(MSCState, IsPersistent, Guid),
+ msg_store_read(MSCState, IsPersistent, Guid),
State2 = State1 #vqstate { msg_store_clients = MSCState1 },
- {_SeqId, State3} = publish(Msg, true, true, State2),
+ {_SeqId, State3} = publish(Msg, MsgPropsFun(MsgProps),
+ true, true, State2),
State3
end,
AckTags, State))).
@@ -783,27 +811,54 @@ one_if(false) -> 0.
cons_if(true, E, L) -> [E | L];
cons_if(false, _E, L) -> L.
-msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }) ->
+msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid },
+ MsgProps) ->
#msg_status { seq_id = SeqId, guid = Guid, msg = Msg,
is_persistent = IsPersistent, is_delivered = false,
- msg_on_disk = false, index_on_disk = false }.
+ msg_on_disk = false, index_on_disk = false,
+ msg_props = MsgProps }.
+
+with_msg_store_state({MSCStateP, MSCStateT}, true, Fun) ->
+ {Result, MSCStateP1} = Fun(MSCStateP),
+ {Result, {MSCStateP1, MSCStateT}};
+with_msg_store_state({MSCStateP, MSCStateT}, false, Fun) ->
+ {Result, MSCStateT1} = Fun(MSCStateT),
+ {Result, {MSCStateP, MSCStateT1}}.
+
+with_immutable_msg_store_state(MSCState, IsPersistent, Fun) ->
+ {Res, MSCState} = with_msg_store_state(MSCState, IsPersistent,
+ fun (MSCState1) ->
+ {Fun(MSCState1), MSCState1}
+ end),
+ Res.
+
+msg_store_client_init(MsgStore) ->
+ rabbit_msg_store:client_init(MsgStore, rabbit_guid:guid()).
+
+msg_store_write(MSCState, IsPersistent, Guid, Msg) ->
+ with_immutable_msg_store_state(
+ MSCState, IsPersistent,
+ fun (MSCState1) -> rabbit_msg_store:write(Guid, Msg, MSCState1) end).
-find_msg_store(true) -> ?PERSISTENT_MSG_STORE;
-find_msg_store(false) -> ?TRANSIENT_MSG_STORE.
+msg_store_read(MSCState, IsPersistent, Guid) ->
+ with_msg_store_state(
+ MSCState, IsPersistent,
+ fun (MSCState1) -> rabbit_msg_store:read(Guid, MSCState1) end).
-with_msg_store_state({{MSCStateP, PRef}, MSCStateT}, true, Fun) ->
- {Result, MSCStateP1} = Fun(?PERSISTENT_MSG_STORE, MSCStateP),
- {Result, {{MSCStateP1, PRef}, MSCStateT}};
-with_msg_store_state({MSCStateP, {MSCStateT, TRef}}, false, Fun) ->
- {Result, MSCStateT1} = Fun(?TRANSIENT_MSG_STORE, MSCStateT),
- {Result, {MSCStateP, {MSCStateT1, TRef}}}.
+msg_store_remove(MSCState, IsPersistent, Guids) ->
+ with_immutable_msg_store_state(
+ MSCState, IsPersistent,
+ fun (MCSState1) -> rabbit_msg_store:remove(Guids, MCSState1) end).
-read_from_msg_store(MSCState, IsPersistent, Guid) ->
- with_msg_store_state(
+msg_store_release(MSCState, IsPersistent, Guids) ->
+ with_immutable_msg_store_state(
MSCState, IsPersistent,
- fun (MsgStore, MSCState1) ->
- rabbit_msg_store:read(MsgStore, Guid, MSCState1)
- end).
+ fun (MCSState1) -> rabbit_msg_store:release(Guids, MCSState1) end).
+
+msg_store_sync(MSCState, IsPersistent, Guids, Callback) ->
+ with_immutable_msg_store_state(
+ MSCState, IsPersistent,
+ fun (MSCState1) -> rabbit_msg_store:sync(Guids, Callback, MSCState1) end).
maybe_write_delivered(false, _SeqId, IndexState) ->
IndexState;
@@ -821,12 +876,13 @@ store_tx(Txn, Tx) -> put({txn, Txn}, Tx).
erase_tx(Txn) -> erase({txn, Txn}).
persistent_guids(Pubs) ->
- [Guid || #basic_message { guid = Guid, is_persistent = true } <- Pubs].
+ [Guid || {#basic_message { guid = Guid,
+ is_persistent = true }, _MsgProps} <- Pubs].
betas_from_index_entries(List, TransientThreshold, IndexState) ->
{Filtered, Delivers, Acks} =
lists:foldr(
- fun ({Guid, SeqId, IsPersistent, IsDelivered},
+ fun ({Guid, SeqId, MsgProps, IsPersistent, IsDelivered},
{Filtered1, Delivers1, Acks1}) ->
case SeqId < TransientThreshold andalso not IsPersistent of
true -> {Filtered1,
@@ -838,7 +894,8 @@ betas_from_index_entries(List, TransientThreshold, IndexState) ->
is_persistent = IsPersistent,
is_delivered = IsDelivered,
msg_on_disk = true,
- index_on_disk = true
+ index_on_disk = true,
+ msg_props = MsgProps
}) | Filtered1],
Delivers1,
Acks1}
@@ -885,22 +942,69 @@ update_rate(Now, Then, Count, {OThen, OCount}) ->
%% Internal major helpers for Public API
%%----------------------------------------------------------------------------
-msg_store_callback(PersistentGuids, Pubs, AckTags, Fun) ->
+init(IsDurable, IndexState, DeltaCount, Terms,
+ PersistentClient, TransientClient) ->
+ {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState),
+
+ DeltaCount1 = proplists:get_value(persistent_count, Terms, DeltaCount),
+ Delta = case DeltaCount1 == 0 andalso DeltaCount /= undefined of
+ true -> ?BLANK_DELTA;
+ false -> #delta { start_seq_id = LowSeqId,
+ count = DeltaCount1,
+ end_seq_id = NextSeqId }
+ end,
+ Now = now(),
+ State = #vqstate {
+ q1 = queue:new(),
+ q2 = bpqueue:new(),
+ delta = Delta,
+ q3 = bpqueue:new(),
+ q4 = queue:new(),
+ next_seq_id = NextSeqId,
+ pending_ack = dict:new(),
+ index_state = IndexState1,
+ msg_store_clients = {PersistentClient, TransientClient},
+ on_sync = ?BLANK_SYNC,
+ durable = IsDurable,
+ transient_threshold = NextSeqId,
+
+ len = DeltaCount1,
+ persistent_count = DeltaCount1,
+
+ target_ram_msg_count = infinity,
+ ram_msg_count = 0,
+ ram_msg_count_prev = 0,
+ ram_index_count = 0,
+ out_counter = 0,
+ in_counter = 0,
+ rates = #rates { egress = {Now, 0},
+ ingress = {Now, DeltaCount1},
+ avg_egress = 0.0,
+ avg_ingress = 0.0,
+ timestamp = Now } },
+ a(maybe_deltas_to_betas(State)).
+
+msg_store_callback(PersistentGuids, Pubs, AckTags, Fun, MsgPropsFun) ->
Self = self(),
F = fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue(
Self, fun (StateN) -> tx_commit_post_msg_store(
- true, Pubs, AckTags, Fun, StateN)
+ true, Pubs, AckTags,
+ Fun, MsgPropsFun, StateN)
end)
end,
fun () -> spawn(fun () -> ok = rabbit_misc:with_exit_handler(
- fun () -> rabbit_msg_store:remove(
- ?PERSISTENT_MSG_STORE,
+ fun () -> remove_persistent_messages(
PersistentGuids)
end, F)
end)
end.
-tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun,
+remove_persistent_messages(Guids) ->
+ PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE),
+ ok = rabbit_msg_store:remove(Guids, PersistentClient),
+ rabbit_msg_store:client_delete_and_terminate(PersistentClient).
+
+tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun, MsgPropsFun,
State = #vqstate {
on_sync = OnSync = #sync {
acks_persistent = SPAcks,
@@ -913,23 +1017,27 @@ tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun,
case IsDurable of
true -> [AckTag || AckTag <- AckTags,
case dict:fetch(AckTag, PA) of
- #msg_status {} -> false;
- {IsPersistent, _Guid} -> IsPersistent
+ #msg_status {} ->
+ false;
+ {IsPersistent, _Guid, _MsgProps} ->
+ IsPersistent
end];
false -> []
end,
case IsDurable andalso (HasPersistentPubs orelse PersistentAcks =/= []) of
- true -> State #vqstate { on_sync = #sync {
- acks_persistent = [PersistentAcks | SPAcks],
- acks_all = [AckTags | SAcks],
- pubs = [Pubs | SPubs],
- funs = [Fun | SFuns] }};
+ true -> State #vqstate {
+ on_sync = #sync {
+ acks_persistent = [PersistentAcks | SPAcks],
+ acks_all = [AckTags | SAcks],
+ pubs = [{MsgPropsFun, Pubs} | SPubs],
+ funs = [Fun | SFuns] }};
false -> State1 = tx_commit_index(
- State #vqstate { on_sync = #sync {
- acks_persistent = [],
- acks_all = [AckTags],
- pubs = [Pubs],
- funs = [Fun] } }),
+ State #vqstate {
+ on_sync = #sync {
+ acks_persistent = [],
+ acks_all = [AckTags],
+ pubs = [{MsgPropsFun, Pubs}],
+ funs = [Fun] } }),
State1 #vqstate { on_sync = OnSync }
end.
@@ -943,13 +1051,16 @@ tx_commit_index(State = #vqstate { on_sync = #sync {
durable = IsDurable }) ->
PAcks = lists:append(SPAcks),
Acks = lists:append(SAcks),
- Pubs = lists:append(lists:reverse(SPubs)),
+ Pubs = [{Msg, Fun(MsgProps)} || {Fun, PubsN} <- lists:reverse(SPubs),
+ {Msg, MsgProps} <- lists:reverse(PubsN)],
{SeqIds, State1 = #vqstate { index_state = IndexState }} =
lists:foldl(
- fun (Msg = #basic_message { is_persistent = IsPersistent },
+ fun ({Msg = #basic_message { is_persistent = IsPersistent },
+ MsgProps},
{SeqIdsAcc, State2}) ->
IsPersistent1 = IsDurable andalso IsPersistent,
- {SeqId, State3} = publish(Msg, false, IsPersistent1, State2),
+ {SeqId, State3} =
+ publish(Msg, MsgProps, false, IsPersistent1, State2),
{cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3}
end, {PAcks, ack(Acks, State)}, Pubs),
IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState),
@@ -958,13 +1069,14 @@ tx_commit_index(State = #vqstate { on_sync = #sync {
State1 #vqstate { index_state = IndexState1, on_sync = ?BLANK_SYNC }).
purge_betas_and_deltas(LensByStore,
- State = #vqstate { q3 = Q3,
- index_state = IndexState }) ->
+ State = #vqstate { q3 = Q3,
+ index_state = IndexState,
+ msg_store_clients = MSCState }) ->
case bpqueue:is_empty(Q3) of
true -> {LensByStore, State};
- false -> {LensByStore1, IndexState1} = remove_queue_entries(
- fun beta_fold/3, Q3,
- LensByStore, IndexState),
+ false -> {LensByStore1, IndexState1} =
+ remove_queue_entries(fun beta_fold/3, Q3,
+ LensByStore, IndexState, MSCState),
purge_betas_and_deltas(LensByStore1,
maybe_deltas_to_betas(
State #vqstate {
@@ -972,11 +1084,11 @@ purge_betas_and_deltas(LensByStore,
index_state = IndexState1 }))
end.
-remove_queue_entries(Fold, Q, LensByStore, IndexState) ->
+remove_queue_entries(Fold, Q, LensByStore, IndexState, MSCState) ->
{GuidsByStore, Delivers, Acks} =
Fold(fun remove_queue_entries1/2, {orddict:new(), [], []}, Q),
- ok = orddict:fold(fun (MsgStore, Guids, ok) ->
- rabbit_msg_store:remove(MsgStore, Guids)
+ ok = orddict:fold(fun (IsPersistent, Guids, ok) ->
+ msg_store_remove(MSCState, IsPersistent, Guids)
end, ok, GuidsByStore),
{sum_guids_by_store_to_len(LensByStore, GuidsByStore),
rabbit_queue_index:ack(Acks,
@@ -988,8 +1100,7 @@ remove_queue_entries1(
index_on_disk = IndexOnDisk, is_persistent = IsPersistent },
{GuidsByStore, Delivers, Acks}) ->
{case MsgOnDisk of
- true -> rabbit_misc:orddict_cons(find_msg_store(IsPersistent), Guid,
- GuidsByStore);
+ true -> rabbit_misc:orddict_cons(IsPersistent, Guid, GuidsByStore);
false -> GuidsByStore
end,
cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers),
@@ -997,8 +1108,8 @@ remove_queue_entries1(
sum_guids_by_store_to_len(LensByStore, GuidsByStore) ->
orddict:fold(
- fun (MsgStore, Guids, LensByStore1) ->
- orddict:update_counter(MsgStore, length(Guids), LensByStore1)
+ fun (IsPersistent, Guids, LensByStore1) ->
+ orddict:update_counter(IsPersistent, length(Guids), LensByStore1)
end, LensByStore, GuidsByStore).
%%----------------------------------------------------------------------------
@@ -1006,7 +1117,7 @@ sum_guids_by_store_to_len(LensByStore, GuidsByStore) ->
%%----------------------------------------------------------------------------
publish(Msg = #basic_message { is_persistent = IsPersistent },
- IsDelivered, MsgOnDisk,
+ MsgProps, IsDelivered, MsgOnDisk,
State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
next_seq_id = SeqId,
len = Len,
@@ -1015,8 +1126,8 @@ publish(Msg = #basic_message { is_persistent = IsPersistent },
durable = IsDurable,
ram_msg_count = RamMsgCount }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
- MsgStatus = (msg_status(IsPersistent1, SeqId, Msg))
- #msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk },
+ MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps))
+ #msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk},
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
State2 = case bpqueue:is_empty(Q3) of
false -> State1 #vqstate { q1 = queue:in(m(MsgStatus1), Q1) };
@@ -1030,38 +1141,35 @@ publish(Msg = #basic_message { is_persistent = IsPersistent },
ram_msg_count = RamMsgCount + 1}}.
maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status {
- msg_on_disk = true }, MSCState) ->
- {MsgStatus, MSCState};
+ msg_on_disk = true }, _MSCState) ->
+ MsgStatus;
maybe_write_msg_to_disk(Force, MsgStatus = #msg_status {
msg = Msg, guid = Guid,
is_persistent = IsPersistent }, MSCState)
when Force orelse IsPersistent ->
- {ok, MSCState1} =
- with_msg_store_state(
- MSCState, IsPersistent,
- fun (MsgStore, MSCState2) ->
- Msg1 = Msg #basic_message {
- %% don't persist any recoverable decoded properties
- content = rabbit_binary_parser:clear_decoded_content(
- Msg #basic_message.content)},
- rabbit_msg_store:write(MsgStore, Guid, Msg1, MSCState2)
- end),
- {MsgStatus #msg_status { msg_on_disk = true }, MSCState1};
-maybe_write_msg_to_disk(_Force, MsgStatus, MSCState) ->
- {MsgStatus, MSCState}.
+ Msg1 = Msg #basic_message {
+ %% don't persist any recoverable decoded properties
+ content = rabbit_binary_parser:clear_decoded_content(
+ Msg #basic_message.content)},
+ ok = msg_store_write(MSCState, IsPersistent, Guid, Msg1),
+ MsgStatus #msg_status { msg_on_disk = true };
+maybe_write_msg_to_disk(_Force, MsgStatus, _MSCState) ->
+ MsgStatus.
maybe_write_index_to_disk(_Force, MsgStatus = #msg_status {
index_on_disk = true }, IndexState) ->
true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION
{MsgStatus, IndexState};
maybe_write_index_to_disk(Force, MsgStatus = #msg_status {
- guid = Guid, seq_id = SeqId,
+ guid = Guid,
+ seq_id = SeqId,
is_persistent = IsPersistent,
- is_delivered = IsDelivered }, IndexState)
+ is_delivered = IsDelivered,
+ msg_props = MsgProps}, IndexState)
when Force orelse IsPersistent ->
true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION
- IndexState1 = rabbit_queue_index:publish(Guid, SeqId, IsPersistent,
- IndexState),
+ IndexState1 = rabbit_queue_index:publish(
+ Guid, SeqId, MsgProps, IsPersistent, IndexState),
{MsgStatus #msg_status { index_on_disk = true },
maybe_write_delivered(IsDelivered, SeqId, IndexState1)};
maybe_write_index_to_disk(_Force, MsgStatus, IndexState) ->
@@ -1070,43 +1178,44 @@ maybe_write_index_to_disk(_Force, MsgStatus, IndexState) ->
maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus,
State = #vqstate { index_state = IndexState,
msg_store_clients = MSCState }) ->
- {MsgStatus1, MSCState1} = maybe_write_msg_to_disk(
- ForceMsg, MsgStatus, MSCState),
- {MsgStatus2, IndexState1} = maybe_write_index_to_disk(
- ForceIndex, MsgStatus1, IndexState),
- {MsgStatus2, State #vqstate { index_state = IndexState1,
- msg_store_clients = MSCState1 }}.
+ MsgStatus1 = maybe_write_msg_to_disk(ForceMsg, MsgStatus, MSCState),
+ {MsgStatus2, IndexState1} =
+ maybe_write_index_to_disk(ForceIndex, MsgStatus1, IndexState),
+ {MsgStatus2, State #vqstate { index_state = IndexState1 }}.
%%----------------------------------------------------------------------------
%% Internal gubbins for acks
%%----------------------------------------------------------------------------
-record_pending_ack(#msg_status { guid = Guid, seq_id = SeqId,
+record_pending_ack(#msg_status { seq_id = SeqId,
+ guid = Guid,
is_persistent = IsPersistent,
- msg_on_disk = MsgOnDisk } = MsgStatus, PA) ->
+ msg_on_disk = MsgOnDisk,
+ msg_props = MsgProps } = MsgStatus, PA) ->
AckEntry = case MsgOnDisk of
- true -> {IsPersistent, Guid};
+ true -> {IsPersistent, Guid, MsgProps};
false -> MsgStatus
end,
dict:store(SeqId, AckEntry, PA).
remove_pending_ack(KeepPersistent,
- State = #vqstate { pending_ack = PA,
- index_state = IndexState }) ->
+ State = #vqstate { pending_ack = PA,
+ index_state = IndexState,
+ msg_store_clients = MSCState }) ->
{SeqIds, GuidsByStore} = dict:fold(fun accumulate_ack/3,
{[], orddict:new()}, PA),
State1 = State #vqstate { pending_ack = dict:new() },
case KeepPersistent of
- true -> case orddict:find(?TRANSIENT_MSG_STORE, GuidsByStore) of
+ true -> case orddict:find(false, GuidsByStore) of
error -> State1;
- {ok, Guids} -> ok = rabbit_msg_store:remove(
- ?TRANSIENT_MSG_STORE, Guids),
+ {ok, Guids} -> ok = msg_store_remove(MSCState, false,
+ Guids),
State1
end;
false -> IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState),
ok = orddict:fold(
- fun (MsgStore, Guids, ok) ->
- rabbit_msg_store:remove(MsgStore, Guids)
+ fun (IsPersistent, Guids, ok) ->
+ msg_store_remove(MSCState, IsPersistent, Guids)
end, ok, GuidsByStore),
State1 #vqstate { index_state = IndexState1 }
end.
@@ -1114,8 +1223,10 @@ remove_pending_ack(KeepPersistent,
ack(_MsgStoreFun, _Fun, [], State) ->
State;
ack(MsgStoreFun, Fun, AckTags, State) ->
- {{SeqIds, GuidsByStore}, State1 = #vqstate { index_state = IndexState,
- persistent_count = PCount }} =
+ {{SeqIds, GuidsByStore},
+ State1 = #vqstate { index_state = IndexState,
+ msg_store_clients = MSCState,
+ persistent_count = PCount }} =
lists:foldl(
fun (SeqId, {Acc, State2 = #vqstate { pending_ack = PA }}) ->
{ok, AckEntry} = dict:find(SeqId, PA),
@@ -1124,8 +1235,8 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
pending_ack = dict:erase(SeqId, PA) })}
end, {{[], orddict:new()}, State}, AckTags),
IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState),
- ok = orddict:fold(fun (MsgStore, Guids, ok) ->
- MsgStoreFun(MsgStore, Guids)
+ ok = orddict:fold(fun (IsPersistent, Guids, ok) ->
+ MsgStoreFun(MSCState, IsPersistent, Guids)
end, ok, GuidsByStore),
PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len(
orddict:new(), GuidsByStore)),
@@ -1136,12 +1247,12 @@ accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
msg_on_disk = false,
index_on_disk = false }, Acc) ->
Acc;
-accumulate_ack(SeqId, {IsPersistent, Guid}, {SeqIdsAcc, Dict}) ->
+accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProps}, {SeqIdsAcc, Dict}) ->
{cons_if(IsPersistent, SeqId, SeqIdsAcc),
- rabbit_misc:orddict_cons(find_msg_store(IsPersistent), Guid, Dict)}.
+ rabbit_misc:orddict_cons(IsPersistent, Guid, Dict)}.
find_persistent_count(LensByStore) ->
- case orddict:find(?PERSISTENT_MSG_STORE, LensByStore) of
+ case orddict:find(true, LensByStore) of
error -> 0;
{ok, Len} -> Len
end.
@@ -1238,40 +1349,33 @@ chunk_size(Current, Permitted)
chunk_size(Current, Permitted) ->
lists:min([Current - Permitted, ?IO_BATCH_SIZE]).
-fetch_from_q3_to_q4(State = #vqstate {
- q1 = Q1,
- q2 = Q2,
- delta = #delta { count = DeltaCount },
- q3 = Q3,
- q4 = Q4,
- ram_msg_count = RamMsgCount,
- ram_index_count = RamIndexCount,
- msg_store_clients = MSCState }) ->
+fetch_from_q3(State = #vqstate {
+ q1 = Q1,
+ q2 = Q2,
+ delta = #delta { count = DeltaCount },
+ q3 = Q3,
+ q4 = Q4,
+ ram_index_count = RamIndexCount}) ->
case bpqueue:out(Q3) of
{empty, _Q3} ->
{empty, State};
- {{value, IndexOnDisk, MsgStatus = #msg_status {
- msg = undefined, guid = Guid,
- is_persistent = IsPersistent }}, Q3a} ->
- {{ok, Msg = #basic_message {}}, MSCState1} =
- read_from_msg_store(MSCState, IsPersistent, Guid),
- Q4a = queue:in(m(MsgStatus #msg_status { msg = Msg }), Q4),
+ {{value, IndexOnDisk, MsgStatus}, Q3a} ->
RamIndexCount1 = RamIndexCount - one_if(not IndexOnDisk),
true = RamIndexCount1 >= 0, %% ASSERTION
- State1 = State #vqstate { q3 = Q3a,
- q4 = Q4a,
- ram_msg_count = RamMsgCount + 1,
- ram_index_count = RamIndexCount1,
- msg_store_clients = MSCState1 },
+ State1 = State #vqstate { q3 = Q3a,
+ ram_index_count = RamIndexCount1 },
State2 =
case {bpqueue:is_empty(Q3a), 0 == DeltaCount} of
{true, true} ->
%% q3 is now empty, it wasn't before; delta is
- %% still empty. So q2 must be empty, and q1
- %% can now be joined onto q4
+ %% still empty. So q2 must be empty, and we
+ %% know q4 is empty otherwise we wouldn't be
+ %% loading from q3. As such, we can just set
+ %% q4 to Q1.
true = bpqueue:is_empty(Q2), %% ASSERTION
+ true = queue:is_empty(Q4), %% ASSERTION
State1 #vqstate { q1 = queue:new(),
- q4 = queue:join(Q4a, Q1) };
+ q4 = Q1 };
{true, false} ->
maybe_deltas_to_betas(State1);
{false, _} ->
@@ -1280,7 +1384,7 @@ fetch_from_q3_to_q4(State = #vqstate {
%% delta and q3 are maintained
State1
end,
- {loaded, State2}
+ {loaded, {MsgStatus, State2}}
end.
maybe_deltas_to_betas(State = #vqstate { delta = ?BLANK_DELTA_PATTERN(X) }) ->
@@ -1290,46 +1394,40 @@ maybe_deltas_to_betas(State = #vqstate {
delta = Delta,
q3 = Q3,
index_state = IndexState,
- target_ram_msg_count = TargetRamMsgCount,
transient_threshold = TransientThreshold }) ->
- case bpqueue:is_empty(Q3) orelse (TargetRamMsgCount /= 0) of
- false ->
- State;
- true ->
- #delta { start_seq_id = DeltaSeqId,
- count = DeltaCount,
- end_seq_id = DeltaSeqIdEnd } = Delta,
- DeltaSeqId1 =
- lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId),
- DeltaSeqIdEnd]),
- {List, IndexState1} =
- rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState),
- {Q3a, IndexState2} = betas_from_index_entries(
- List, TransientThreshold, IndexState1),
- State1 = State #vqstate { index_state = IndexState2 },
- case bpqueue:len(Q3a) of
+ #delta { start_seq_id = DeltaSeqId,
+ count = DeltaCount,
+ end_seq_id = DeltaSeqIdEnd } = Delta,
+ DeltaSeqId1 =
+ lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId),
+ DeltaSeqIdEnd]),
+ {List, IndexState1} =
+ rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState),
+ {Q3a, IndexState2} =
+ betas_from_index_entries(List, TransientThreshold, IndexState1),
+ State1 = State #vqstate { index_state = IndexState2 },
+ case bpqueue:len(Q3a) of
+ 0 ->
+ %% we ignored every message in the segment due to it being
+ %% transient and below the threshold
+ maybe_deltas_to_betas(
+ State1 #vqstate {
+ delta = Delta #delta { start_seq_id = DeltaSeqId1 }});
+ Q3aLen ->
+ Q3b = bpqueue:join(Q3, Q3a),
+ case DeltaCount - Q3aLen of
0 ->
- %% we ignored every message in the segment due to
- %% it being transient and below the threshold
- maybe_deltas_to_betas(
- State #vqstate {
- delta = Delta #delta { start_seq_id = DeltaSeqId1 }});
- Q3aLen ->
- Q3b = bpqueue:join(Q3, Q3a),
- case DeltaCount - Q3aLen of
- 0 ->
- %% delta is now empty, but it wasn't
- %% before, so can now join q2 onto q3
- State1 #vqstate { q2 = bpqueue:new(),
- delta = ?BLANK_DELTA,
- q3 = bpqueue:join(Q3b, Q2) };
- N when N > 0 ->
- Delta1 = #delta { start_seq_id = DeltaSeqId1,
- count = N,
- end_seq_id = DeltaSeqIdEnd },
- State1 #vqstate { delta = Delta1,
- q3 = Q3b }
- end
+ %% delta is now empty, but it wasn't before, so
+ %% can now join q2 onto q3
+ State1 #vqstate { q2 = bpqueue:new(),
+ delta = ?BLANK_DELTA,
+ q3 = bpqueue:join(Q3b, Q2) };
+ N when N > 0 ->
+ Delta1 = #delta { start_seq_id = DeltaSeqId1,
+ count = N,
+ end_seq_id = DeltaSeqIdEnd },
+ State1 #vqstate { delta = Delta1,
+ q3 = Q3b }
end
end.