summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-02-13 21:25:44 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-02-13 21:25:44 +0000
commit65491f7748c0c79029fe4f601638d101d75b8d56 (patch)
treedd637043e7032140609db3aea44a7f1cf948a8f3
parentf994b25859adf71955c64e20a74e85c076977c26 (diff)
downloadrabbitmq-server-bug25451.tar.gz
drop_expired_msgs only when the queue head changesbug25451
And on recover. And when the timer goes off. That's all we need. new call sites: - in deliver_or_enqueue/3, when enqueuing a message (that we couldn't deliver to a consumer straight away) with an expiry to the head. the queue. NB: Previously we were always (re)setting a timer when enqueuing a message with an expiry, which is wasteful when the new message isn't at the head (i.e. the queue was non-empty) or when it needs expiring immediately. - requeue_and_run/2, since a message may get requeued to the head. This call site arises due to removal of the run_message_queue/1 call site (see below). unchanged call sites: - init_ttl/2 - this is the recovery case - fetch/2, after fetching - this is the basic "queue head changes" case - handle_info/drop_expired - this is the message expiry timer removed call sites: - run_message_queue/1 - this internally calls fetch/2 (see above) but also invoking drop_expired_msgs at the beginning. This now happens at the call sites, where it is necessary. Which actually only is in requeue_and_run, and not the others, none of which change the queue content prior to calling run_message_queue/1 - possibly_unblock/3 - unblocking of consumers - handle_call/basic_consumer - adding a consumer - handle_call/basic_get, prior to the call to fetch/2. - handle_call/stat
-rw-r--r--src/rabbit_amqqueue_process.erl32
1 files changed, 21 insertions, 11 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 4cd21c0a..4a0ccf81 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -519,13 +519,11 @@ discard(#delivery{sender = SenderPid,
BQS1 = BQ:discard(MsgId, SenderPid, BQS),
State1#q{backing_queue_state = BQS1}.
-run_message_queue(State) ->
- State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- drop_expired_msgs(State),
- {_IsEmpty1, State2} = deliver_msgs_to_consumers(
+run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
+ {_IsEmpty1, State1} = deliver_msgs_to_consumers(
fun deliver_from_queue_deliver/2,
- BQ:is_empty(BQS), State1),
- State2.
+ BQ:is_empty(BQS), State),
+ State1.
attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
Props, Delivered, State = #q{backing_queue = BQ,
@@ -559,15 +557,27 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
{false, State2 = #q{ttl = 0, dlx = undefined}} ->
discard(Delivery, State2);
{false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} ->
+ IsEmpty = BQ:is_empty(BQS),
BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS),
- ensure_ttl_timer(Props#message_properties.expiry,
- State2#q{backing_queue_state = BQS1})
+ State3 = State2#q{backing_queue_state = BQS1},
+ %% optimisation: it would be perfectly safe to always
+ %% invoke drop_expired_msgs here, but that is expensive so
+ %% we only do that IFF the new message ends up at the head
+ %% of the queue (because the queue was empty) and has an
+ %% expiry. Only then may it need expiring straight away,
+ %% or, if expiry is not due yet, the expiry timer may need
+ %% (re)scheduling.
+ case {IsEmpty, Props#message_properties.expiry} of
+ {false, _} -> State3;
+ {true, undefined} -> State3;
+ {true, _} -> drop_expired_msgs(State3)
+ end
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
{_MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
- run_message_queue(State#q{backing_queue_state = BQS1}).
+ run_message_queue(drop_expired_msgs(State#q{backing_queue_state = BQS1})).
fetch(AckRequired, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
@@ -1064,7 +1074,7 @@ handle_call({basic_get, ChPid, NoAck}, _From,
State = #q{q = #amqqueue{name = QName}}) ->
AckRequired = not NoAck,
State1 = ensure_expiry_timer(State),
- case fetch(AckRequired, drop_expired_msgs(State1)) of
+ case fetch(AckRequired, State1) of
{empty, State2} ->
reply(empty, State2);
{{Message, IsDelivered, AckTag}, State2} ->
@@ -1137,7 +1147,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,
handle_call(stat, _From, State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- drop_expired_msgs(ensure_expiry_timer(State)),
+ ensure_expiry_timer(State),
reply({ok, BQ:len(BQS), consumer_count()}, State1);
handle_call({delete, IfUnused, IfEmpty}, From,