summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2011-01-25 11:29:33 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2011-01-25 11:29:33 +0000
commit194eff02348bb64d1624ad9741fd7a30c631a359 (patch)
treede802622ff1f3980531db6a6d6a9ad94bc8125d7
parenta96cfec75431e3574a45ec2eb98ada0b08d01834 (diff)
downloadrabbitmq-server-194eff02348bb64d1624ad9741fd7a30c631a359.tar.gz
send rejects instead of confirms in case of non-normal queue deaths
-rw-r--r--src/rabbit_channel.erl19
1 files changed, 16 insertions, 3 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 91559ea6..321f5ca6 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -277,7 +277,7 @@ handle_cast({confirm, MsgSeqNos, From}, State) ->
handle_info(timeout, State) ->
noreply(State);
-handle_info({'DOWN', _MRef, process, QPid, _Reason},
+handle_info({'DOWN', _MRef, process, QPid, Reason},
State = #ch{unconfirmed = UC}) ->
%% TODO: this does a complete scan and partial rebuild of the
%% tree, which is quite efficient. To do better we'd need to
@@ -286,8 +286,11 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason},
gb_trees:next(gb_trees:iterator(UC)), QPid,
{[], UC}, State),
erase_queue_stats(QPid),
- noreply(
- queue_blocked(QPid, record_confirms(MXs, State#ch{unconfirmed = UC1}))).
+ State1 = case Reason of
+ normal -> record_confirms(MXs, State#ch{unconfirmed = UC1});
+ _ -> send_rejects(MXs, State#ch{unconfirmed = UC1})
+ end,
+ noreply(queue_blocked(QPid, State1)).
handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
ok = clear_permission_cache(),
@@ -1249,6 +1252,16 @@ lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
lock_message(false, _MsgStruct, State) ->
State.
+send_rejects(MXs, State = #ch{writer_pid = WriterPid}) ->
+ [ begin maybe_incr_stats([{ExchangeName, 1}], reject, State),
+ send_reject(MsgSeqNo, WriterPid)
+ end || {MsgSeqNo, ExchangeName} <- MXs ],
+ State.
+
+send_reject(SeqNo, WriterPid) ->
+ ok = rabbit_writer:send_command(WriterPid,
+ #'basic.reject'{delivery_tag = SeqNo}).
+
send_confirms(State = #ch{confirmed = C}) ->
C1 = lists:append(C),
MsgSeqNos = [ begin maybe_incr_stats([{ExchangeName, 1}], confirm, State),