summaryrefslogtreecommitdiff
path: root/src/rabbit_variable_queue.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_variable_queue.erl')
-rw-r--r--src/rabbit_variable_queue.erl60
1 files changed, 16 insertions, 44 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 18cab48b..f7c6c729 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -262,8 +262,6 @@
durable,
transient_threshold,
- async_callback,
-
len,
persistent_count,
@@ -356,8 +354,6 @@
durable :: boolean(),
transient_threshold :: non_neg_integer(),
- async_callback :: rabbit_backing_queue:async_callback(),
-
len :: non_neg_integer(),
persistent_count :: non_neg_integer(),
@@ -426,7 +422,7 @@ init(Queue, Recover, AsyncCallback) ->
init(#amqqueue { name = QueueName, durable = IsDurable }, false,
AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) ->
IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun),
- init(IsDurable, IndexState, 0, [], AsyncCallback,
+ init(IsDurable, IndexState, 0, [],
case IsDurable of
true -> msg_store_client_init(?PERSISTENT_MSG_STORE,
MsgOnDiskFun, AsyncCallback);
@@ -454,7 +450,7 @@ init(#amqqueue { name = QueueName, durable = true }, true,
rabbit_msg_store:contains(MsgId, PersistentClient)
end,
MsgIdxOnDiskFun),
- init(true, IndexState, DeltaCount, Terms1, AsyncCallback,
+ init(true, IndexState, DeltaCount, Terms1,
PersistentClient, TransientClient).
terminate(_Reason, State) ->
@@ -772,24 +768,18 @@ ram_duration(State = #vqstate {
needs_timeout(State = #vqstate { index_state = IndexState,
target_ram_count = TargetRamCount }) ->
- case must_sync_index(State) of
- true -> timed;
- false ->
- case rabbit_queue_index:needs_sync(IndexState) of
- true -> idle;
- false -> case TargetRamCount of
- infinity -> false;
- _ -> case
- reduce_memory_use(
- fun (_Quota, State1) -> {0, State1} end,
- fun (_Quota, State1) -> State1 end,
- fun (_Quota, State1) -> {0, State1} end,
- State) of
- {true, _State} -> idle;
- {false, _State} -> false
- end
- end
- end
+ case rabbit_queue_index:needs_sync(IndexState) of
+ confirms -> timed;
+ other -> idle;
+ false when TargetRamCount == infinity -> false;
+ false -> case reduce_memory_use(
+ fun (_Quota, State1) -> {0, State1} end,
+ fun (_Quota, State1) -> State1 end,
+ fun (_Quota, State1) -> {0, State1} end,
+ State) of
+ {true, _State} -> idle;
+ {false, _State} -> false
+ end
end.
timeout(State = #vqstate { index_state = IndexState }) ->
@@ -883,8 +873,7 @@ cons_if(true, E, L) -> [E | L];
cons_if(false, _E, L) -> L.
gb_sets_maybe_insert(false, _Val, Set) -> Set;
-%% when requeueing, we re-add a msg_id to the unconfirmed set
-gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set).
+gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set).
msg_status(IsPersistent, IsDelivered, SeqId,
Msg = #basic_message {id = MsgId}, MsgProps) ->
@@ -1011,7 +1000,7 @@ update_rate(Now, Then, Count, {OThen, OCount}) ->
%% Internal major helpers for Public API
%%----------------------------------------------------------------------------
-init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback,
+init(IsDurable, IndexState, DeltaCount, Terms,
PersistentClient, TransientClient) ->
{LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState),
@@ -1037,8 +1026,6 @@ init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback,
durable = IsDurable,
transient_threshold = NextSeqId,
- async_callback = AsyncCallback,
-
len = DeltaCount1,
persistent_count = DeltaCount1,
@@ -1338,21 +1325,6 @@ record_confirms(MsgIdSet, State = #vqstate { msgs_on_disk = MOD,
unconfirmed = rabbit_misc:gb_sets_difference(UC, MsgIdSet),
confirmed = gb_sets:union(C, MsgIdSet) }.
-must_sync_index(#vqstate { msg_indices_on_disk = MIOD,
- unconfirmed = UC }) ->
- %% If UC is empty then by definition, MIOD and MOD are also empty
- %% and there's nothing that can be pending a sync.
-
- %% If UC is not empty, then we want to find is_empty(UC - MIOD),
- %% but the subtraction can be expensive. Thus instead, we test to
- %% see if UC is a subset of MIOD. This can only be the case if
- %% MIOD == UC, which would indicate that every message in UC is
- %% also in MIOD and is thus _all_ pending on a msg_store sync, not
- %% on a qi sync. Thus the negation of this is sufficient. Because
- %% is_subset is short circuiting, this is more efficient than the
- %% subtraction.
- not (gb_sets:is_empty(UC) orelse gb_sets:is_subset(UC, MIOD)).
-
msgs_written_to_disk(Callback, MsgIdSet, ignored) ->
Callback(?MODULE,
fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end);