summaryrefslogtreecommitdiff
path: root/src/rabbit_channel.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_channel.erl')
-rw-r--r--src/rabbit_channel.erl42
1 files changed, 21 insertions, 21 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index c20cb16c..7e195d2f 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -526,24 +526,24 @@ handle_method(#'basic.recover'{requeue = false},
_, State = #ch{ transaction_id = none,
writer_pid = WriterPid,
unacked_message_q = UAMQ }) ->
- lists:foreach(
- fun ({_DeliveryTag, none, _Msg}) ->
- %% Was sent as a basic.get_ok. Don't redeliver
- %% it. FIXME: appropriate?
- ok;
- ({DeliveryTag, ConsumerTag,
- {QName, QPid, MsgId, _Redelivered, Message}}) ->
- %% Was sent as a proper consumer delivery. Resend it as
- %% before.
- %%
- %% FIXME: What should happen if the consumer's been
- %% cancelled since?
- %%
- %% FIXME: should we allocate a fresh DeliveryTag?
- ok = internal_deliver(
+ ok = rabbit_misc:queue_fold(
+ fun ({_DeliveryTag, none, _Msg}, ok) ->
+ %% Was sent as a basic.get_ok. Don't redeliver
+ %% it. FIXME: appropriate?
+ ok;
+ ({DeliveryTag, ConsumerTag,
+ {QName, QPid, MsgId, _Redelivered, Message}}, ok) ->
+ %% Was sent as a proper consumer delivery. Resend
+ %% it as before.
+ %%
+ %% FIXME: What should happen if the consumer's been
+ %% cancelled since?
+ %%
+ %% FIXME: should we allocate a fresh DeliveryTag?
+ internal_deliver(
WriterPid, false, ConsumerTag, DeliveryTag,
{QName, QPid, MsgId, true, Message})
- end, queue:to_list(UAMQ)),
+ end, ok, UAMQ),
%% No answer required, apparently!
{noreply, State};
@@ -872,7 +872,7 @@ rollback_and_notify(State) ->
notify_queues(internal_rollback(State)).
fold_per_queue(F, Acc0, UAQ) ->
- D = lists:foldl(
+ D = rabbit_misc:queue_fold(
fun ({_DTag, _CTag,
{_QName, QPid, MsgId, _Redelivered, _Message}}, D) ->
%% dict:append would be simpler and avoid the
@@ -883,7 +883,7 @@ fold_per_queue(F, Acc0, UAQ) ->
fun (MsgIds) -> [MsgId | MsgIds] end,
[MsgId],
D)
- end, dict:new(), queue:to_list(UAQ)),
+ end, dict:new(), UAQ),
dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end,
Acc0, D).
@@ -912,9 +912,9 @@ consumer_queues(Consumers) ->
notify_limiter(undefined, _Acked) ->
ok;
notify_limiter(LimiterPid, Acked) ->
- case lists:foldl(fun ({_, none, _}, Acc) -> Acc;
- ({_, _, _}, Acc) -> Acc + 1
- end, 0, queue:to_list(Acked)) of
+ case rabbit_misc:queue_fold(fun ({_, none, _}, Acc) -> Acc;
+ ({_, _, _}, Acc) -> Acc + 1
+ end, 0, Acked) of
0 -> ok;
Count -> rabbit_limiter:ack(LimiterPid, Count)
end.