summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-04-03 12:20:54 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2012-04-03 12:20:54 +0100
commit8e771468c81ce806bc15b9309b7c61f3d411dcd2 (patch)
treef283aed2f77043fde29bfd4fe83b5d1b592a1733
parent5beede6bc8fdaec706f23db17e226b91f5180443 (diff)
downloadrabbitmq-server-bug24845.tar.gz
do not confirm 'immediate' messages that failed to be deliveredbug24845
in particular non-persistent messages were always being confirmed regardless
-rw-r--r--src/rabbit_amqqueue_process.erl24
1 files changed, 11 insertions, 13 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 0f46f51e..19e1736a 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -487,21 +487,21 @@ should_confirm_message(#delivery{sender = SenderPid,
id = MsgId}},
#q{q = #amqqueue{durable = true}}) ->
{eventually, SenderPid, MsgSeqNo, MsgId};
-should_confirm_message(_Delivery, _State) ->
- immediately.
+should_confirm_message(#delivery{sender = SenderPid,
+ msg_seq_no = MsgSeqNo},
+ _State) ->
+ {immediately, SenderPid, MsgSeqNo}.
needs_confirming({eventually, _, _, _}) -> true;
needs_confirming(_) -> false.
-confirm_if(false, _Delivery) ->
- ok;
-confirm_if(true, #delivery{sender = SenderPid, msg_seq_no = MsgSeqNo}) ->
- rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]).
-
maybe_record_confirm_message({eventually, SenderPid, MsgSeqNo, MsgId},
State = #q{msg_id_to_channel = MTC}) ->
State#q{msg_id_to_channel =
gb_trees:insert(MsgId, {SenderPid, MsgSeqNo}, MTC)};
+maybe_record_confirm_message({immediately, SenderPid, MsgSeqNo}, State) ->
+ rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]),
+ State;
maybe_record_confirm_message(_Confirm, State) ->
State.
@@ -513,11 +513,8 @@ run_message_queue(State) ->
BQ:is_empty(BQS), State1),
State2.
-attempt_delivery(Delivery = #delivery{sender = SenderPid,
- message = Message},
- Confirm,
+attempt_delivery(#delivery{sender = SenderPid, message = Message}, Confirm,
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
- confirm_if(Confirm == immediately, Delivery),
case BQ:is_duplicate(Message, BQS) of
{false, BQS1} ->
deliver_msgs_to_consumers(
@@ -1198,7 +1195,8 @@ handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined ->
handle_cast({run_backing_queue, Mod, Fun}, State) ->
noreply(run_backing_queue(Mod, Fun, State));
-handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow},
+handle_cast({deliver, Delivery = #delivery{sender = Sender,
+ msg_seq_no = MsgSeqNo}, Flow},
State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
case Flow of
@@ -1213,7 +1211,7 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow},
case already_been_here(Delivery, State) of
false -> noreply(deliver_or_enqueue(Delivery, State));
Qs -> log_cycle_once(Qs),
- confirm_if(true, Delivery),
+ rabbit_misc:confirm_to_sender(Sender, [MsgSeqNo]),
noreply(State)
end;