summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRob Harrop <rob@rabbitmq.com>2011-01-14 10:55:43 +0000
committerRob Harrop <rob@rabbitmq.com>2011-01-14 10:55:43 +0000
commita6554f87ed0ee87274ec379cce537c16360bb7fd (patch)
treeb37c88715023bce9331a98a72511afbbe7709fcf
parent2450a19f4341c7cea84960d7e2ef8fd26b6eeb07 (diff)
parent321d3ecbe4b78087970b53f447b0ec693620b3c3 (diff)
downloadrabbitmq-server-bug23675.tar.gz
Merge with defaultbug23675
-rw-r--r--src/rabbit_channel.erl25
1 files changed, 17 insertions, 8 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 7b5f096b..a1bc56e7 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -556,6 +556,12 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
_ -> add_tx_participants(DeliveredQPids, State2)
end};
+handle_method(#'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = Multiple,
+ requeue = Requeue},
+ _, State) ->
+ reject(DeliveryTag, Requeue, Multiple, State);
+
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
multiple = Multiple},
_, State = #ch{transaction_id = TxnKey,
@@ -741,14 +747,8 @@ handle_method(#'basic.recover'{requeue = Requeue}, Content, State) ->
handle_method(#'basic.reject'{delivery_tag = DeliveryTag,
requeue = Requeue},
- _, State = #ch{unacked_message_q = UAMQ}) ->
- {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, false),
- ok = fold_per_queue(
- fun (QPid, MsgIds, ok) ->
- rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self())
- end, ok, Acked),
- ok = notify_limiter(State#ch.limiter_pid, Acked),
- {noreply, State#ch{unacked_message_q = Remaining}};
+ _, State) ->
+ reject(DeliveryTag, Requeue, false, State);
handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
type = TypeNameBin,
@@ -1066,6 +1066,15 @@ basic_return(#basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey},
Content).
+reject(DeliveryTag, Requeue, Multiple, State = #ch{unacked_message_q = UAMQ}) ->
+ {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
+ ok = fold_per_queue(
+ fun (QPid, MsgIds, ok) ->
+ rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self())
+ end, ok, Acked),
+ ok = notify_limiter(State#ch.limiter_pid, Acked),
+ {noreply, State#ch{unacked_message_q = Remaining}}.
+
ack_record(DeliveryTag, ConsumerTag,
_MsgStruct = {_QName, QPid, MsgId, _Redelivered, _Msg}) ->
{DeliveryTag, ConsumerTag, {QPid, MsgId}}.