diff options
Diffstat (limited to 'src/rabbit_channel.erl')
-rw-r--r-- | src/rabbit_channel.erl | 42 |
1 files changed, 21 insertions, 21 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index c20cb16c..7e195d2f 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -526,24 +526,24 @@ handle_method(#'basic.recover'{requeue = false}, _, State = #ch{ transaction_id = none, writer_pid = WriterPid, unacked_message_q = UAMQ }) -> - lists:foreach( - fun ({_DeliveryTag, none, _Msg}) -> - %% Was sent as a basic.get_ok. Don't redeliver - %% it. FIXME: appropriate? - ok; - ({DeliveryTag, ConsumerTag, - {QName, QPid, MsgId, _Redelivered, Message}}) -> - %% Was sent as a proper consumer delivery. Resend it as - %% before. - %% - %% FIXME: What should happen if the consumer's been - %% cancelled since? - %% - %% FIXME: should we allocate a fresh DeliveryTag? - ok = internal_deliver( + ok = rabbit_misc:queue_fold( + fun ({_DeliveryTag, none, _Msg}, ok) -> + %% Was sent as a basic.get_ok. Don't redeliver + %% it. FIXME: appropriate? + ok; + ({DeliveryTag, ConsumerTag, + {QName, QPid, MsgId, _Redelivered, Message}}, ok) -> + %% Was sent as a proper consumer delivery. Resend + %% it as before. + %% + %% FIXME: What should happen if the consumer's been + %% cancelled since? + %% + %% FIXME: should we allocate a fresh DeliveryTag? + internal_deliver( WriterPid, false, ConsumerTag, DeliveryTag, {QName, QPid, MsgId, true, Message}) - end, queue:to_list(UAMQ)), + end, ok, UAMQ), %% No answer required, apparently! {noreply, State}; @@ -872,7 +872,7 @@ rollback_and_notify(State) -> notify_queues(internal_rollback(State)). fold_per_queue(F, Acc0, UAQ) -> - D = lists:foldl( + D = rabbit_misc:queue_fold( fun ({_DTag, _CTag, {_QName, QPid, MsgId, _Redelivered, _Message}}, D) -> %% dict:append would be simpler and avoid the @@ -883,7 +883,7 @@ fold_per_queue(F, Acc0, UAQ) -> fun (MsgIds) -> [MsgId | MsgIds] end, [MsgId], D) - end, dict:new(), queue:to_list(UAQ)), + end, dict:new(), UAQ), dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end, Acc0, D). @@ -912,9 +912,9 @@ consumer_queues(Consumers) -> notify_limiter(undefined, _Acked) -> ok; notify_limiter(LimiterPid, Acked) -> - case lists:foldl(fun ({_, none, _}, Acc) -> Acc; - ({_, _, _}, Acc) -> Acc + 1 - end, 0, queue:to_list(Acked)) of + case rabbit_misc:queue_fold(fun ({_, none, _}, Acc) -> Acc; + ({_, _, _}, Acc) -> Acc + 1 + end, 0, Acked) of 0 -> ok; Count -> rabbit_limiter:ack(LimiterPid, Count) end. |