diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2013-02-13 21:25:44 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2013-02-13 21:25:44 +0000 |
commit | 65491f7748c0c79029fe4f601638d101d75b8d56 (patch) | |
tree | dd637043e7032140609db3aea44a7f1cf948a8f3 | |
parent | f994b25859adf71955c64e20a74e85c076977c26 (diff) | |
download | rabbitmq-server-bug25451.tar.gz |
drop_expired_msgs only when the queue head changesbug25451
And on recover. And when the timer goes off. That's all we need.
new call sites:
- in deliver_or_enqueue/3, when enqueuing a message (that we couldn't
deliver to a consumer straight away) with an expiry to the head.
the queue. NB: Previously we were always (re)setting a timer when
enqueuing a message with an expiry, which is wasteful when the new
message isn't at the head (i.e. the queue was non-empty) or when it
needs expiring immediately.
- requeue_and_run/2, since a message may get requeued to the
head. This call site arises due to removal of the
run_message_queue/1 call site (see below).
unchanged call sites:
- init_ttl/2 - this is the recovery case
- fetch/2, after fetching - this is the basic "queue head changes"
case
- handle_info/drop_expired - this is the message expiry timer
removed call sites:
- run_message_queue/1 - this internally calls fetch/2 (see above) but
also invoking drop_expired_msgs at the beginning. This now happens
at the call sites, where it is necessary. Which actually only is in
requeue_and_run, and not the others, none of which change the queue
content prior to calling run_message_queue/1
- possibly_unblock/3 - unblocking of consumers
- handle_call/basic_consumer - adding a consumer
- handle_call/basic_get, prior to the call to fetch/2.
- handle_call/stat
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 32 |
1 files changed, 21 insertions, 11 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 4cd21c0a..4a0ccf81 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -519,13 +519,11 @@ discard(#delivery{sender = SenderPid, BQS1 = BQ:discard(MsgId, SenderPid, BQS), State1#q{backing_queue_state = BQS1}. -run_message_queue(State) -> - State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = - drop_expired_msgs(State), - {_IsEmpty1, State2} = deliver_msgs_to_consumers( +run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> + {_IsEmpty1, State1} = deliver_msgs_to_consumers( fun deliver_from_queue_deliver/2, - BQ:is_empty(BQS), State1), - State2. + BQ:is_empty(BQS), State), + State1. attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, Props, Delivered, State = #q{backing_queue = BQ, @@ -559,15 +557,27 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, {false, State2 = #q{ttl = 0, dlx = undefined}} -> discard(Delivery, State2); {false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} -> + IsEmpty = BQ:is_empty(BQS), BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS), - ensure_ttl_timer(Props#message_properties.expiry, - State2#q{backing_queue_state = BQS1}) + State3 = State2#q{backing_queue_state = BQS1}, + %% optimisation: it would be perfectly safe to always + %% invoke drop_expired_msgs here, but that is expensive so + %% we only do that IFF the new message ends up at the head + %% of the queue (because the queue was empty) and has an + %% expiry. Only then may it need expiring straight away, + %% or, if expiry is not due yet, the expiry timer may need + %% (re)scheduling. + case {IsEmpty, Props#message_properties.expiry} of + {false, _} -> State3; + {true, undefined} -> State3; + {true, _} -> drop_expired_msgs(State3) + end end. requeue_and_run(AckTags, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS), - run_message_queue(State#q{backing_queue_state = BQS1}). + run_message_queue(drop_expired_msgs(State#q{backing_queue_state = BQS1})). fetch(AckRequired, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> @@ -1064,7 +1074,7 @@ handle_call({basic_get, ChPid, NoAck}, _From, State = #q{q = #amqqueue{name = QName}}) -> AckRequired = not NoAck, State1 = ensure_expiry_timer(State), - case fetch(AckRequired, drop_expired_msgs(State1)) of + case fetch(AckRequired, State1) of {empty, State2} -> reply(empty, State2); {{Message, IsDelivered, AckTag}, State2} -> @@ -1137,7 +1147,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From, handle_call(stat, _From, State) -> State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = - drop_expired_msgs(ensure_expiry_timer(State)), + ensure_expiry_timer(State), reply({ok, BQ:len(BQS), consumer_count()}, State1); handle_call({delete, IfUnused, IfEmpty}, From, |