summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-04-02 19:30:41 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2012-04-02 19:30:41 +0100
commita0a63748403dc5ea0691ffcadedf404f71c624c3 (patch)
tree97ed352df52466cc0e32bfb883828c00a6fb5ab9
parent9e86579e3accb67bd9e604dccc074cbc8043f10d (diff)
downloadrabbitmq-server-a0a63748403dc5ea0691ffcadedf404f71c624c3.tar.gz
handle confirms and dlx
When a dlx is configured we fall back to the normal go-via-the-backing-queue route since the dlx logic assumes that any dead-lettered message is present in the bq. Indeed the bq is the only place in which such messages are kept until they have been confirmed by the dead-letter destinations.
-rw-r--r--src/rabbit_amqqueue_process.erl31
1 files changed, 19 insertions, 12 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 2de020bb..c58af698 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -544,19 +544,26 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid,
{Delivered, Confirm, State#q{backing_queue_state = BQS1}}
end.
-deliver_or_enqueue(Delivery = #delivery{message = Message,
- sender = SenderPid}, State) ->
+deliver_or_enqueue(Delivery = #delivery{message = Message,
+ msg_seq_no = MsgSeqNo,
+ sender = SenderPid}, State) ->
{Delivered, Confirm, State1} = attempt_delivery(Delivery, State),
- State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- maybe_record_confirm_message(Confirm, State1),
- case {Delivered, State2#q.ttl} of
- {true, _} -> State2;
- {false, 0} -> State3 = discard_delivery(Delivery, State2),
- %% TODO: handle confirms and dlx
- State3;
- {false, _} -> Props = message_properties(Confirm, State),
- BQS1 = BQ:publish(Message, Props, SenderPid, BQS),
- ensure_ttl_timer(State2#q{backing_queue_state = BQS1})
+ case Delivered of
+ true ->
+ maybe_record_confirm_message(Confirm, State1);
+ %% optimisation
+ false when State1#q.ttl == 0 andalso State1#q.dlx == undefined ->
+ case Confirm of
+ never -> ok;
+ _ -> rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo])
+ end,
+ discard_delivery(Delivery, State1);
+ false ->
+ State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
+ maybe_record_confirm_message(Confirm, State1),
+ Props = message_properties(Confirm, State2),
+ BQS1 = BQ:publish(Message, Props, SenderPid, BQS),
+ ensure_ttl_timer(State2#q{backing_queue_state = BQS1})
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ}) ->