summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue_process.erl32
1 files changed, 21 insertions, 11 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 4cd21c0a..4a0ccf81 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -519,13 +519,11 @@ discard(#delivery{sender = SenderPid,
BQS1 = BQ:discard(MsgId, SenderPid, BQS),
State1#q{backing_queue_state = BQS1}.
-run_message_queue(State) ->
- State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- drop_expired_msgs(State),
- {_IsEmpty1, State2} = deliver_msgs_to_consumers(
+run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
+ {_IsEmpty1, State1} = deliver_msgs_to_consumers(
fun deliver_from_queue_deliver/2,
- BQ:is_empty(BQS), State1),
- State2.
+ BQ:is_empty(BQS), State),
+ State1.
attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
Props, Delivered, State = #q{backing_queue = BQ,
@@ -559,15 +557,27 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
{false, State2 = #q{ttl = 0, dlx = undefined}} ->
discard(Delivery, State2);
{false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} ->
+ IsEmpty = BQ:is_empty(BQS),
BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS),
- ensure_ttl_timer(Props#message_properties.expiry,
- State2#q{backing_queue_state = BQS1})
+ State3 = State2#q{backing_queue_state = BQS1},
+ %% optimisation: it would be perfectly safe to always
+ %% invoke drop_expired_msgs here, but that is expensive so
+ %% we only do that IFF the new message ends up at the head
+ %% of the queue (because the queue was empty) and has an
+ %% expiry. Only then may it need expiring straight away,
+ %% or, if expiry is not due yet, the expiry timer may need
+ %% (re)scheduling.
+ case {IsEmpty, Props#message_properties.expiry} of
+ {false, _} -> State3;
+ {true, undefined} -> State3;
+ {true, _} -> drop_expired_msgs(State3)
+ end
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
{_MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
- run_message_queue(State#q{backing_queue_state = BQS1}).
+ run_message_queue(drop_expired_msgs(State#q{backing_queue_state = BQS1})).
fetch(AckRequired, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
@@ -1064,7 +1074,7 @@ handle_call({basic_get, ChPid, NoAck}, _From,
State = #q{q = #amqqueue{name = QName}}) ->
AckRequired = not NoAck,
State1 = ensure_expiry_timer(State),
- case fetch(AckRequired, drop_expired_msgs(State1)) of
+ case fetch(AckRequired, State1) of
{empty, State2} ->
reply(empty, State2);
{{Message, IsDelivered, AckTag}, State2} ->
@@ -1137,7 +1147,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,
handle_call(stat, _From, State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- drop_expired_msgs(ensure_expiry_timer(State)),
+ ensure_expiry_timer(State),
reply({ok, BQ:len(BQS), consumer_count()}, State1);
handle_call({delete, IfUnused, IfEmpty}, From,