From a0a63748403dc5ea0691ffcadedf404f71c624c3 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Mon, 2 Apr 2012 19:30:41 +0100 Subject: handle confirms and dlx When a dlx is configured we fall back to the normal go-via-the-backing-queue route since the dlx logic assumes that any dead-lettered message is present in the bq. Indeed the bq is the only place in which such messages are kept until they have been confirmed by the dead-letter destinations. --- src/rabbit_amqqueue_process.erl | 31 +++++++++++++++++++------------ 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 2de020bb..c58af698 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -544,19 +544,26 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, {Delivered, Confirm, State#q{backing_queue_state = BQS1}} end. -deliver_or_enqueue(Delivery = #delivery{message = Message, - sender = SenderPid}, State) -> +deliver_or_enqueue(Delivery = #delivery{message = Message, + msg_seq_no = MsgSeqNo, + sender = SenderPid}, State) -> {Delivered, Confirm, State1} = attempt_delivery(Delivery, State), - State2 = #q{backing_queue = BQ, backing_queue_state = BQS} = - maybe_record_confirm_message(Confirm, State1), - case {Delivered, State2#q.ttl} of - {true, _} -> State2; - {false, 0} -> State3 = discard_delivery(Delivery, State2), - %% TODO: handle confirms and dlx - State3; - {false, _} -> Props = message_properties(Confirm, State), - BQS1 = BQ:publish(Message, Props, SenderPid, BQS), - ensure_ttl_timer(State2#q{backing_queue_state = BQS1}) + case Delivered of + true -> + maybe_record_confirm_message(Confirm, State1); + %% optimisation + false when State1#q.ttl == 0 andalso State1#q.dlx == undefined -> + case Confirm of + never -> ok; + _ -> rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]) + end, + discard_delivery(Delivery, State1); + false -> + State2 = #q{backing_queue = BQ, backing_queue_state = BQS} = + maybe_record_confirm_message(Confirm, State1), + Props = message_properties(Confirm, State2), + BQS1 = BQ:publish(Message, Props, SenderPid, BQS), + ensure_ttl_timer(State2#q{backing_queue_state = BQS1}) end. requeue_and_run(AckTags, State = #q{backing_queue = BQ}) -> -- cgit v1.2.1