diff options
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 15 |
1 files changed, 9 insertions, 6 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 74717ace..a6b3829b 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -725,14 +725,15 @@ drop_expired_messages(State = #q{dlx = DLX, Now = now_micros(), ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end, {Props, BQS1} = case DLX of - undefined -> {Next, undefined, BQS2} = - BQ:dropwhile(ExpirePred, false, BQS), - {Next, BQS2}; - _ -> {Next, Msgs, BQS2} = - BQ:dropwhile(ExpirePred, true, BQS), + undefined -> BQ:dropwhile(ExpirePred, BQS); + _ -> {Next, Msgs, BQS2} = + BQ:fetchwhile(ExpirePred, + fun accumulate_msgs/4, + [], BQS), case Msgs of [] -> ok; - _ -> (dead_letter_fun(expired))(Msgs) + _ -> (dead_letter_fun(expired))( + lists:reverse(Msgs)) end, {Next, BQS2} end, @@ -741,6 +742,8 @@ drop_expired_messages(State = #q{dlx = DLX, #message_properties{expiry = Exp} -> Exp end, State#q{backing_queue_state = BQS1}). +accumulate_msgs(Msg, _IsDelivered, AckTag, Acc) -> [{Msg, AckTag} | Acc]. + ensure_ttl_timer(undefined, State) -> State; ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined}) -> |