diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-01-25 11:29:33 +0000 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-01-25 11:29:33 +0000 |
commit | 194eff02348bb64d1624ad9741fd7a30c631a359 (patch) | |
tree | de802622ff1f3980531db6a6d6a9ad94bc8125d7 | |
parent | a96cfec75431e3574a45ec2eb98ada0b08d01834 (diff) | |
download | rabbitmq-server-194eff02348bb64d1624ad9741fd7a30c631a359.tar.gz |
send rejects instead of confirms in case of non-normal queue deaths
-rw-r--r-- | src/rabbit_channel.erl | 19 |
1 files changed, 16 insertions, 3 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 91559ea6..321f5ca6 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -277,7 +277,7 @@ handle_cast({confirm, MsgSeqNos, From}, State) -> handle_info(timeout, State) -> noreply(State); -handle_info({'DOWN', _MRef, process, QPid, _Reason}, +handle_info({'DOWN', _MRef, process, QPid, Reason}, State = #ch{unconfirmed = UC}) -> %% TODO: this does a complete scan and partial rebuild of the %% tree, which is quite efficient. To do better we'd need to @@ -286,8 +286,11 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason}, gb_trees:next(gb_trees:iterator(UC)), QPid, {[], UC}, State), erase_queue_stats(QPid), - noreply( - queue_blocked(QPid, record_confirms(MXs, State#ch{unconfirmed = UC1}))). + State1 = case Reason of + normal -> record_confirms(MXs, State#ch{unconfirmed = UC1}); + _ -> send_rejects(MXs, State#ch{unconfirmed = UC1}) + end, + noreply(queue_blocked(QPid, State1)). handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> ok = clear_permission_cache(), @@ -1249,6 +1252,16 @@ lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> lock_message(false, _MsgStruct, State) -> State. +send_rejects(MXs, State = #ch{writer_pid = WriterPid}) -> + [ begin maybe_incr_stats([{ExchangeName, 1}], reject, State), + send_reject(MsgSeqNo, WriterPid) + end || {MsgSeqNo, ExchangeName} <- MXs ], + State. + +send_reject(SeqNo, WriterPid) -> + ok = rabbit_writer:send_command(WriterPid, + #'basic.reject'{delivery_tag = SeqNo}). + send_confirms(State = #ch{confirmed = C}) -> C1 = lists:append(C), MsgSeqNos = [ begin maybe_incr_stats([{ExchangeName, 1}], confirm, State), |