diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2012-04-03 12:20:54 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-04-03 12:20:54 +0100 |
commit | 8e771468c81ce806bc15b9309b7c61f3d411dcd2 (patch) | |
tree | f283aed2f77043fde29bfd4fe83b5d1b592a1733 | |
parent | 5beede6bc8fdaec706f23db17e226b91f5180443 (diff) | |
download | rabbitmq-server-bug24845.tar.gz |
do not confirm 'immediate' messages that failed to be deliveredbug24845
in particular non-persistent messages were always being confirmed
regardless
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 24 |
1 files changed, 11 insertions, 13 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 0f46f51e..19e1736a 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -487,21 +487,21 @@ should_confirm_message(#delivery{sender = SenderPid, id = MsgId}}, #q{q = #amqqueue{durable = true}}) -> {eventually, SenderPid, MsgSeqNo, MsgId}; -should_confirm_message(_Delivery, _State) -> - immediately. +should_confirm_message(#delivery{sender = SenderPid, + msg_seq_no = MsgSeqNo}, + _State) -> + {immediately, SenderPid, MsgSeqNo}. needs_confirming({eventually, _, _, _}) -> true; needs_confirming(_) -> false. -confirm_if(false, _Delivery) -> - ok; -confirm_if(true, #delivery{sender = SenderPid, msg_seq_no = MsgSeqNo}) -> - rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]). - maybe_record_confirm_message({eventually, SenderPid, MsgSeqNo, MsgId}, State = #q{msg_id_to_channel = MTC}) -> State#q{msg_id_to_channel = gb_trees:insert(MsgId, {SenderPid, MsgSeqNo}, MTC)}; +maybe_record_confirm_message({immediately, SenderPid, MsgSeqNo}, State) -> + rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]), + State; maybe_record_confirm_message(_Confirm, State) -> State. @@ -513,11 +513,8 @@ run_message_queue(State) -> BQ:is_empty(BQS), State1), State2. -attempt_delivery(Delivery = #delivery{sender = SenderPid, - message = Message}, - Confirm, +attempt_delivery(#delivery{sender = SenderPid, message = Message}, Confirm, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - confirm_if(Confirm == immediately, Delivery), case BQ:is_duplicate(Message, BQS) of {false, BQS1} -> deliver_msgs_to_consumers( @@ -1198,7 +1195,8 @@ handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined -> handle_cast({run_backing_queue, Mod, Fun}, State) -> noreply(run_backing_queue(Mod, Fun, State)); -handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, +handle_cast({deliver, Delivery = #delivery{sender = Sender, + msg_seq_no = MsgSeqNo}, Flow}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. case Flow of @@ -1213,7 +1211,7 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, case already_been_here(Delivery, State) of false -> noreply(deliver_or_enqueue(Delivery, State)); Qs -> log_cycle_once(Qs), - confirm_if(true, Delivery), + rabbit_misc:confirm_to_sender(Sender, [MsgSeqNo]), noreply(State) end; |