From 36eee0ca2a10a05895e8f48cf22e5c3a2b6f2085 Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Tue, 11 Jan 2011 17:06:07 +0000 Subject: Extract common logic for message rejection. Introduce basic.nack and rework basic.reject --- src/rabbit_channel.erl | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 930e48e6..1ad92318 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -568,6 +568,11 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, _ -> add_tx_participants(DeliveredQPids, State2) end}; +handle_method(#'basic.nack'{delivery_tag = DeliveryTag, + multiple = Multiple}, + _, State) -> + reject(DeliveryTag, true, Multiple, State); + handle_method(#'basic.ack'{delivery_tag = DeliveryTag, multiple = Multiple}, _, State = #ch{transaction_id = TxnKey, @@ -753,14 +758,8 @@ handle_method(#'basic.recover'{requeue = Requeue}, Content, State) -> handle_method(#'basic.reject'{delivery_tag = DeliveryTag, requeue = Requeue}, - _, State = #ch{unacked_message_q = UAMQ}) -> - {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, false), - ok = fold_per_queue( - fun (QPid, MsgIds, ok) -> - rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self()) - end, ok, Acked), - ok = notify_limiter(State#ch.limiter_pid, Acked), - {noreply, State#ch{unacked_message_q = Remaining}}; + _, State) -> + reject(DeliveryTag, Requeue, false, State); handle_method(#'exchange.declare'{exchange = ExchangeNameBin, type = TypeNameBin, @@ -1078,6 +1077,15 @@ basic_return(#basic_message{exchange_name = ExchangeName, routing_key = RoutingKey}, Content). +reject(DeliveryTag, Requeue, Multiple, State = #ch{unacked_message_q = UAMQ}) -> + {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), + ok = fold_per_queue( + fun (QPid, MsgIds, ok) -> + rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self()) + end, ok, Acked), + ok = notify_limiter(State#ch.limiter_pid, Acked), + {noreply, State#ch{unacked_message_q = Remaining}}. + ack_record(DeliveryTag, ConsumerTag, _MsgStruct = {_QName, QPid, MsgId, _Redelivered, _Msg}) -> {DeliveryTag, ConsumerTag, {QPid, MsgId}}. -- cgit v1.2.1 From 299c60ac89f6a1cdb8025101eb2ab7d8fdf0f1fd Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Wed, 12 Jan 2011 09:20:52 +0000 Subject: Honour the requeue flag in basic.nack --- src/rabbit_channel.erl | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 1ad92318..a56615a9 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -569,9 +569,10 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, end}; handle_method(#'basic.nack'{delivery_tag = DeliveryTag, - multiple = Multiple}, + multiple = Multiple, + requeue = Requeue}, _, State) -> - reject(DeliveryTag, true, Multiple, State); + reject(DeliveryTag, Requeue, Multiple, State); handle_method(#'basic.ack'{delivery_tag = DeliveryTag, multiple = Multiple}, -- cgit v1.2.1