From dd65549abcab4554f46ff1f41057b81c3fa00812 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Tue, 12 Feb 2013 22:34:33 +0000 Subject: drop expired messages post basic_get ...so messages with an expiry that are at the head of the queue after a basic.get do not get stuck there in the absence of other queue activity. Rather than simply adding a call to drop_expired_messages/1 after the call to fetch/1 in the basic_get code, we insert the call into fetch/1, which allows us to remove it from the other call site. Thus fetch/1 preserves the invariant we are after, namely that whenever a queue has a message at the head with an expiry, there is a timer set to drop said message. Note that the message count returned by basic.get does not reflect the dropping of expired messages after the fetched message. That's ok since we make no guarantee that messages are expired straight away. And note that on 'default' (rather than 'stable') the behaviour is actually different; due to various other changes there we will in fact return the reduced count. --- src/rabbit_amqqueue_process.erl | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 49fcf070..e3885644 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -478,11 +478,10 @@ deliver_msg_to_consumer(DeliverFun, {Stop, State1}. deliver_from_queue_deliver(AckRequired, State) -> - {{Message, IsDelivered, AckTag, _Remaining}, State1} = + {{Message, IsDelivered, AckTag, _Remaining}, + State1 = #q{backing_queue = BQ, backing_queue_state = BQS}} = fetch(AckRequired, State), - State2 = #q{backing_queue = BQ, backing_queue_state = BQS} = - drop_expired_messages(State1), - {{Message, IsDelivered, AckTag}, BQ:is_empty(BQS), State2}. + {{Message, IsDelivered, AckTag}, BQ:is_empty(BQS), State1}. confirm_messages([], State) -> State; @@ -579,7 +578,7 @@ requeue_and_run(AckTags, State = #q{backing_queue = BQ, fetch(AckRequired, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> {Result, BQS1} = BQ:fetch(AckRequired, BQS), - {Result, State#q{backing_queue_state = BQS1}}. + {Result, drop_expired_messages(State#q{backing_queue_state = BQS1})}. ack(AckTags, ChPid, State) -> subtract_acks(ChPid, AckTags, State, -- cgit v1.2.1