summaryrefslogtreecommitdiff
path: root/src/rabbit_amqqueue_process.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r--src/rabbit_amqqueue_process.erl47
1 files changed, 28 insertions, 19 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 6bf290de..0250902f 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -576,7 +576,8 @@ deliver_or_enqueue(Delivery = #delivery{message = Message,
maybe_record_confirm_message(Confirm, State1),
Props = message_properties(Confirm, State2),
BQS1 = BQ:publish(Message, Props, SenderPid, BQS),
- ensure_ttl_timer(State2#q{backing_queue_state = BQS1})
+ ensure_ttl_timer(Props#message_properties.expiry,
+ State2#q{backing_queue_state = BQS1})
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ}) ->
@@ -716,30 +717,38 @@ drop_expired_messages(State = #q{backing_queue_state = BQS,
backing_queue = BQ }) ->
Now = now_micros(),
DLXFun = dead_letter_fun(expired, State),
- ExpirePred = fun (#message_properties{expiry = Expiry}) -> Now > Expiry end,
- BQS1 = case DLXFun of
- undefined -> {undefined, BQS2} =
- BQ:dropwhile(ExpirePred, false, BQS),
- BQS2;
- _ -> {Msgs, BQS2} = BQ:dropwhile(ExpirePred, true, BQS),
- lists:foreach(
- fun({Msg, AckTag}) -> DLXFun(Msg, AckTag) end,
+ ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end,
+ {Props, BQS1} =
+ case DLXFun of
+ undefined ->
+ {Next, undefined, BQS2} = BQ:dropwhile(ExpirePred, false, BQS),
+ {Next, BQS2};
+ _ ->
+ {Next, Msgs, BQS2} = BQ:dropwhile(ExpirePred, true, BQS),
+ lists:foreach(fun({Msg, AckTag}) -> DLXFun(Msg, AckTag) end,
Msgs),
- BQS2
- end,
- ensure_ttl_timer(State#q{backing_queue_state = BQS1}).
-
-ensure_ttl_timer(State = #q{backing_queue = BQ,
- backing_queue_state = BQS,
- ttl = TTL,
- ttl_timer_ref = undefined})
+ {Next, BQS2}
+ end,
+ ensure_ttl_timer(case Props of
+ undefined -> undefined;
+ #message_properties{expiry = Exp} -> Exp
+ end, State#q{backing_queue_state = BQS1}).
+
+ensure_ttl_timer(Expiry, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS,
+ ttl = TTL,
+ ttl_timer_ref = undefined})
when TTL =/= undefined ->
case BQ:is_empty(BQS) of
true -> State;
- false -> TRef = erlang:send_after(TTL, self(), drop_expired),
+ false -> After = (case Expiry - now_micros() of
+ V when V > 0 -> V + 999; %% always fire later
+ _ -> 0
+ end) div 1000,
+ TRef = erlang:send_after(After, self(), drop_expired),
State#q{ttl_timer_ref = TRef}
end;
-ensure_ttl_timer(State) ->
+ensure_ttl_timer(_Expiry, State) ->
State.
ack_if_no_dlx(AckTags, State = #q{dlx = undefined,