summaryrefslogtreecommitdiff
path: root/src/rabbit_variable_queue.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_variable_queue.erl')
-rw-r--r--src/rabbit_variable_queue.erl187
1 files changed, 97 insertions, 90 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index be6691e9..1b29756b 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -16,9 +16,9 @@
-module(rabbit_variable_queue).
--export([init/3, terminate/1, delete_and_terminate/1,
- purge/1, publish/3, publish_delivered/4, fetch/2, ack/2,
- tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4,
+-export([init/5, terminate/1, delete_and_terminate/1,
+ purge/1, publish/3, publish_delivered/4, drain_confirmed/1,
+ fetch/2, ack/2, tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4,
requeue/3, len/1, is_empty/1, dropwhile/2,
set_ram_duration_target/2, ram_duration/1,
needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1,
@@ -27,7 +27,7 @@
-export([start/1, stop/0]).
%% exported for testing only
--export([start_msg_store/2, stop_msg_store/0, init/5]).
+-export([start_msg_store/2, stop_msg_store/0, init/7]).
%%----------------------------------------------------------------------------
%% Definitions:
@@ -238,6 +238,9 @@
durable,
transient_threshold,
+ async_callback,
+ sync_callback,
+
len,
persistent_count,
@@ -252,6 +255,7 @@
msgs_on_disk,
msg_indices_on_disk,
unconfirmed,
+ confirmed,
ack_out_counter,
ack_in_counter,
ack_rates
@@ -332,11 +336,14 @@
{any(), binary()}},
on_sync :: sync(),
durable :: boolean(),
+ transient_threshold :: non_neg_integer(),
+
+ async_callback :: async_callback(),
+ sync_callback :: sync_callback(),
len :: non_neg_integer(),
persistent_count :: non_neg_integer(),
- transient_threshold :: non_neg_integer(),
target_ram_count :: non_neg_integer() | 'infinity',
ram_msg_count :: non_neg_integer(),
ram_msg_count_prev :: non_neg_integer(),
@@ -347,6 +354,7 @@
msgs_on_disk :: gb_set(),
msg_indices_on_disk :: gb_set(),
unconfirmed :: gb_set(),
+ confirmed :: gb_set(),
ack_out_counter :: non_neg_integer(),
ack_in_counter :: non_neg_integer(),
ack_rates :: rates() }).
@@ -397,25 +405,26 @@ stop_msg_store() ->
ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE),
ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE).
-init(QueueName, IsDurable, Recover) ->
- Self = self(),
- init(QueueName, IsDurable, Recover,
+init(QueueName, IsDurable, Recover, AsyncCallback, SyncCallback) ->
+ init(QueueName, IsDurable, Recover, AsyncCallback, SyncCallback,
fun (MsgIds, ActionTaken) ->
- msgs_written_to_disk(Self, MsgIds, ActionTaken)
+ msgs_written_to_disk(AsyncCallback, MsgIds, ActionTaken)
end,
- fun (MsgIds) -> msg_indices_written_to_disk(Self, MsgIds) end).
+ fun (MsgIds) -> msg_indices_written_to_disk(AsyncCallback, MsgIds) end).
-init(QueueName, IsDurable, false, MsgOnDiskFun, MsgIdxOnDiskFun) ->
+init(QueueName, IsDurable, false, AsyncCallback, SyncCallback,
+ MsgOnDiskFun, MsgIdxOnDiskFun) ->
IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun),
- init(IsDurable, IndexState, 0, [],
+ init(IsDurable, IndexState, 0, [], AsyncCallback, SyncCallback,
case IsDurable of
true -> msg_store_client_init(?PERSISTENT_MSG_STORE,
- MsgOnDiskFun);
+ MsgOnDiskFun, AsyncCallback);
false -> undefined
end,
- msg_store_client_init(?TRANSIENT_MSG_STORE, undefined));
+ msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback));
-init(QueueName, true, true, MsgOnDiskFun, MsgIdxOnDiskFun) ->
+init(QueueName, true, true, AsyncCallback, SyncCallback,
+ MsgOnDiskFun, MsgIdxOnDiskFun) ->
Terms = rabbit_queue_index:shutdown_terms(QueueName),
{PRef, TRef, Terms1} =
case [persistent_ref, transient_ref] -- proplists:get_keys(Terms) of
@@ -425,9 +434,9 @@ init(QueueName, true, true, MsgOnDiskFun, MsgIdxOnDiskFun) ->
_ -> {rabbit_guid:guid(), rabbit_guid:guid(), []}
end,
PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef,
- MsgOnDiskFun),
+ MsgOnDiskFun, AsyncCallback),
TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE, TRef,
- undefined),
+ undefined, AsyncCallback),
{DeltaCount, IndexState} =
rabbit_queue_index:recover(
QueueName, Terms1,
@@ -436,7 +445,7 @@ init(QueueName, true, true, MsgOnDiskFun, MsgIdxOnDiskFun) ->
rabbit_msg_store:contains(MsgId, PersistentClient)
end,
MsgIdxOnDiskFun),
- init(true, IndexState, DeltaCount, Terms1,
+ init(true, IndexState, DeltaCount, Terms1, AsyncCallback, SyncCallback,
PersistentClient, TransientClient).
terminate(State) ->
@@ -511,9 +520,9 @@ publish(Msg, MsgProps, State) ->
publish_delivered(false, #basic_message { id = MsgId },
#message_properties { needs_confirming = NeedsConfirming },
- State = #vqstate { len = 0 }) ->
+ State = #vqstate { async_callback = Callback, len = 0 }) ->
case NeedsConfirming of
- true -> blind_confirm(self(), gb_sets:singleton(MsgId));
+ true -> blind_confirm(Callback, gb_sets:singleton(MsgId));
false -> ok
end,
{undefined, a(State)};
@@ -542,6 +551,9 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
persistent_count = PCount1,
unconfirmed = UC1 }))}.
+drain_confirmed(State = #vqstate { confirmed = C }) ->
+ {gb_sets:to_list(C), State #vqstate { confirmed = gb_sets:new() }}.
+
dropwhile(Pred, State) ->
{_OkOrEmpty, State1} = dropwhile1(Pred, State),
a(State1).
@@ -685,6 +697,8 @@ tx_rollback(Txn, State = #vqstate { durable = IsDurable,
tx_commit(Txn, Fun, MsgPropsFun,
State = #vqstate { durable = IsDurable,
+ async_callback = AsyncCallback,
+ sync_callback = SyncCallback,
msg_store_clients = MSCState }) ->
#tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn),
erase_tx(Txn),
@@ -693,10 +707,13 @@ tx_commit(Txn, Fun, MsgPropsFun,
HasPersistentPubs = PersistentMsgIds =/= [],
{AckTags1,
a(case IsDurable andalso HasPersistentPubs of
- true -> ok = msg_store_sync(
- MSCState, true, PersistentMsgIds,
- msg_store_callback(PersistentMsgIds, Pubs, AckTags1,
- Fun, MsgPropsFun)),
+ true -> MsgStoreCallback =
+ fun () -> msg_store_callback(
+ PersistentMsgIds, Pubs, AckTags1, Fun,
+ MsgPropsFun, AsyncCallback, SyncCallback)
+ end,
+ ok = msg_store_sync(MSCState, true, PersistentMsgIds,
+ fun () -> spawn(MsgStoreCallback) end),
State;
false -> tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags1,
Fun, MsgPropsFun, State)
@@ -929,13 +946,13 @@ with_immutable_msg_store_state(MSCState, IsPersistent, Fun) ->
end),
Res.
-msg_store_client_init(MsgStore, MsgOnDiskFun) ->
- msg_store_client_init(MsgStore, rabbit_guid:guid(), MsgOnDiskFun).
+msg_store_client_init(MsgStore, MsgOnDiskFun, Callback) ->
+ msg_store_client_init(MsgStore, rabbit_guid:guid(), MsgOnDiskFun, Callback).
-msg_store_client_init(MsgStore, Ref, MsgOnDiskFun) ->
+msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback) ->
+ CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE),
rabbit_msg_store:client_init(
- MsgStore, Ref, MsgOnDiskFun,
- msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE)).
+ MsgStore, Ref, MsgOnDiskFun, fun () -> Callback(CloseFDsFun) end).
msg_store_write(MSCState, IsPersistent, MsgId, Msg) ->
with_immutable_msg_store_state(
@@ -968,15 +985,9 @@ msg_store_close_fds(MSCState, IsPersistent) ->
fun (MSCState1) -> rabbit_msg_store:close_all_indicated(MSCState1) end).
msg_store_close_fds_fun(IsPersistent) ->
- Self = self(),
- fun () ->
- rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
- Self,
- fun (State = #vqstate { msg_store_clients = MSCState }) ->
- {ok, MSCState1} =
- msg_store_close_fds(MSCState, IsPersistent),
- {[], State #vqstate { msg_store_clients = MSCState1 }}
- end)
+ fun (State = #vqstate { msg_store_clients = MSCState }) ->
+ {ok, MSCState1} = msg_store_close_fds(MSCState, IsPersistent),
+ State #vqstate { msg_store_clients = MSCState1 }
end.
maybe_write_delivered(false, _SeqId, IndexState) ->
@@ -1062,7 +1073,7 @@ update_rate(Now, Then, Count, {OThen, OCount}) ->
%%----------------------------------------------------------------------------
init(IsDurable, IndexState, DeltaCount, Terms,
- PersistentClient, TransientClient) ->
+ AsyncCallback, SyncCallback, PersistentClient, TransientClient) ->
{LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState),
DeltaCount1 = proplists:get_value(persistent_count, Terms, DeltaCount),
@@ -1088,6 +1099,9 @@ init(IsDurable, IndexState, DeltaCount, Terms,
durable = IsDurable,
transient_threshold = NextSeqId,
+ async_callback = AsyncCallback,
+ sync_callback = SyncCallback,
+
len = DeltaCount1,
persistent_count = DeltaCount1,
@@ -1102,6 +1116,7 @@ init(IsDurable, IndexState, DeltaCount, Terms,
msgs_on_disk = gb_sets:new(),
msg_indices_on_disk = gb_sets:new(),
unconfirmed = gb_sets:new(),
+ confirmed = gb_sets:new(),
ack_out_counter = 0,
ack_in_counter = 0,
ack_rates = blank_rate(Now, 0) },
@@ -1114,23 +1129,19 @@ blank_rate(Timestamp, IngressLength) ->
avg_ingress = 0.0,
timestamp = Timestamp }.
-msg_store_callback(PersistentMsgIds, Pubs, AckTags, Fun, MsgPropsFun) ->
- Self = self(),
- F = fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue(
- Self, fun (StateN) -> {[], tx_commit_post_msg_store(
- true, Pubs, AckTags,
- Fun, MsgPropsFun, StateN)}
- end)
- end,
- fun () -> spawn(fun () -> ok = rabbit_misc:with_exit_handler(
- fun () -> remove_persistent_messages(
- PersistentMsgIds)
- end, F)
- end)
+msg_store_callback(PersistentMsgIds, Pubs, AckTags, Fun, MsgPropsFun,
+ AsyncCallback, SyncCallback) ->
+ case SyncCallback(fun (StateN) ->
+ tx_commit_post_msg_store(true, Pubs, AckTags,
+ Fun, MsgPropsFun, StateN)
+ end) of
+ ok -> ok;
+ error -> remove_persistent_messages(PersistentMsgIds, AsyncCallback)
end.
-remove_persistent_messages(MsgIds) ->
- PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, undefined),
+remove_persistent_messages(MsgIds, AsyncCallback) ->
+ PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE,
+ undefined, AsyncCallback),
ok = rabbit_msg_store:remove(MsgIds, PersistentClient),
rabbit_msg_store:client_delete_and_terminate(PersistentClient).
@@ -1417,12 +1428,14 @@ confirm_commit_index(State = #vqstate { index_state = IndexState }) ->
false -> State
end.
-remove_confirms(MsgIdSet, State = #vqstate { msgs_on_disk = MOD,
- msg_indices_on_disk = MIOD,
- unconfirmed = UC }) ->
+record_confirms(MsgIdSet, State = #vqstate { msgs_on_disk = MOD,
+ msg_indices_on_disk = MIOD,
+ unconfirmed = UC,
+ confirmed = C }) ->
State #vqstate { msgs_on_disk = gb_sets:difference(MOD, MsgIdSet),
msg_indices_on_disk = gb_sets:difference(MIOD, MsgIdSet),
- unconfirmed = gb_sets:difference(UC, MsgIdSet) }.
+ unconfirmed = gb_sets:difference(UC, MsgIdSet),
+ confirmed = gb_sets:union (C, MsgIdSet) }.
needs_index_sync(#vqstate { msg_indices_on_disk = MIOD,
unconfirmed = UC }) ->
@@ -1439,38 +1452,32 @@ needs_index_sync(#vqstate { msg_indices_on_disk = MIOD,
%% subtraction.
not (gb_sets:is_empty(UC) orelse gb_sets:is_subset(UC, MIOD)).
-msgs_confirmed(MsgIdSet, State) ->
- {gb_sets:to_list(MsgIdSet), remove_confirms(MsgIdSet, State)}.
-
-blind_confirm(QPid, MsgIdSet) ->
- rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
- QPid, fun (State) -> msgs_confirmed(MsgIdSet, State) end).
-
-msgs_written_to_disk(QPid, MsgIdSet, removed) ->
- blind_confirm(QPid, MsgIdSet);
-msgs_written_to_disk(QPid, MsgIdSet, written) ->
- rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
- QPid, fun (State = #vqstate { msgs_on_disk = MOD,
- msg_indices_on_disk = MIOD,
- unconfirmed = UC }) ->
- Written = gb_sets:intersection(UC, MsgIdSet),
- msgs_confirmed(gb_sets:intersection(MsgIdSet, MIOD),
- State #vqstate {
- msgs_on_disk =
- gb_sets:union(MOD, Written) })
- end).
-
-msg_indices_written_to_disk(QPid, MsgIdSet) ->
- rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
- QPid, fun (State = #vqstate { msgs_on_disk = MOD,
- msg_indices_on_disk = MIOD,
- unconfirmed = UC }) ->
- Written = gb_sets:intersection(UC, MsgIdSet),
- msgs_confirmed(gb_sets:intersection(MsgIdSet, MOD),
- State #vqstate {
- msg_indices_on_disk =
- gb_sets:union(MIOD, Written) })
- end).
+blind_confirm(Callback, MsgIdSet) ->
+ Callback(fun (State) -> record_confirms(MsgIdSet, State) end).
+
+msgs_written_to_disk(Callback, MsgIdSet, removed) ->
+ blind_confirm(Callback, MsgIdSet);
+msgs_written_to_disk(Callback, MsgIdSet, written) ->
+ Callback(fun (State = #vqstate { msgs_on_disk = MOD,
+ msg_indices_on_disk = MIOD,
+ unconfirmed = UC }) ->
+ Confirmed = gb_sets:intersection(UC, MsgIdSet),
+ record_confirms(gb_sets:intersection(MsgIdSet, MIOD),
+ State #vqstate {
+ msgs_on_disk =
+ gb_sets:union(MOD, Confirmed) })
+ end).
+
+msg_indices_written_to_disk(Callback, MsgIdSet) ->
+ Callback(fun (State = #vqstate { msgs_on_disk = MOD,
+ msg_indices_on_disk = MIOD,
+ unconfirmed = UC }) ->
+ Confirmed = gb_sets:intersection(UC, MsgIdSet),
+ record_confirms(gb_sets:intersection(MsgIdSet, MOD),
+ State #vqstate {
+ msg_indices_on_disk =
+ gb_sets:union(MIOD, Confirmed) })
+ end).
%%----------------------------------------------------------------------------
%% Phase changes