summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2010-01-19 21:40:42 +0000
committerMatthias Radestock <matthias@lshift.net>2010-01-19 21:40:42 +0000
commit1b13d8c1784a6724d4a64e26eb53b95d3f40a40e (patch)
tree2b23768859ad9305d628bb69bb8c3b19d9f04101
parentc40b8f5e55a2a5cc24017413314f08277401fe56 (diff)
parentdbe59636978c95ff707e7c9b821465ed52beb222 (diff)
downloadrabbitmq-server-1b13d8c1784a6724d4a64e26eb53b95d3f40a40e.tar.gz
merge bug22223 into v1_7
-rw-r--r--src/rabbit_channel.erl42
-rw-r--r--src/rabbit_misc.erl9
2 files changed, 29 insertions, 22 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index c20cb16c..7e195d2f 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -526,24 +526,24 @@ handle_method(#'basic.recover'{requeue = false},
_, State = #ch{ transaction_id = none,
writer_pid = WriterPid,
unacked_message_q = UAMQ }) ->
- lists:foreach(
- fun ({_DeliveryTag, none, _Msg}) ->
- %% Was sent as a basic.get_ok. Don't redeliver
- %% it. FIXME: appropriate?
- ok;
- ({DeliveryTag, ConsumerTag,
- {QName, QPid, MsgId, _Redelivered, Message}}) ->
- %% Was sent as a proper consumer delivery. Resend it as
- %% before.
- %%
- %% FIXME: What should happen if the consumer's been
- %% cancelled since?
- %%
- %% FIXME: should we allocate a fresh DeliveryTag?
- ok = internal_deliver(
+ ok = rabbit_misc:queue_fold(
+ fun ({_DeliveryTag, none, _Msg}, ok) ->
+ %% Was sent as a basic.get_ok. Don't redeliver
+ %% it. FIXME: appropriate?
+ ok;
+ ({DeliveryTag, ConsumerTag,
+ {QName, QPid, MsgId, _Redelivered, Message}}, ok) ->
+ %% Was sent as a proper consumer delivery. Resend
+ %% it as before.
+ %%
+ %% FIXME: What should happen if the consumer's been
+ %% cancelled since?
+ %%
+ %% FIXME: should we allocate a fresh DeliveryTag?
+ internal_deliver(
WriterPid, false, ConsumerTag, DeliveryTag,
{QName, QPid, MsgId, true, Message})
- end, queue:to_list(UAMQ)),
+ end, ok, UAMQ),
%% No answer required, apparently!
{noreply, State};
@@ -872,7 +872,7 @@ rollback_and_notify(State) ->
notify_queues(internal_rollback(State)).
fold_per_queue(F, Acc0, UAQ) ->
- D = lists:foldl(
+ D = rabbit_misc:queue_fold(
fun ({_DTag, _CTag,
{_QName, QPid, MsgId, _Redelivered, _Message}}, D) ->
%% dict:append would be simpler and avoid the
@@ -883,7 +883,7 @@ fold_per_queue(F, Acc0, UAQ) ->
fun (MsgIds) -> [MsgId | MsgIds] end,
[MsgId],
D)
- end, dict:new(), queue:to_list(UAQ)),
+ end, dict:new(), UAQ),
dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end,
Acc0, D).
@@ -912,9 +912,9 @@ consumer_queues(Consumers) ->
notify_limiter(undefined, _Acked) ->
ok;
notify_limiter(LimiterPid, Acked) ->
- case lists:foldl(fun ({_, none, _}, Acc) -> Acc;
- ({_, _, _}, Acc) -> Acc + 1
- end, 0, queue:to_list(Acked)) of
+ case rabbit_misc:queue_fold(fun ({_, none, _}, Acc) -> Acc;
+ ({_, _, _}, Acc) -> Acc + 1
+ end, 0, Acked) of
0 -> ok;
Count -> rabbit_limiter:ack(LimiterPid, Count)
end.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 21764fce..9762619f 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -55,7 +55,7 @@
-export([append_file/2, ensure_parent_dirs_exist/1]).
-export([format_stderr/2]).
-export([start_applications/1, stop_applications/1]).
--export([unfold/2, ceil/1]).
+-export([unfold/2, ceil/1, queue_fold/3]).
-import(mnesia).
-import(lists).
@@ -126,6 +126,7 @@
-spec(stop_applications/1 :: ([atom()]) -> 'ok').
-spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}).
-spec(ceil/1 :: (number()) -> number()).
+-spec(queue_fold/3 :: (fun ((any(), B) -> B), B, queue()) -> B).
-endif.
@@ -489,3 +490,9 @@ ceil(N) ->
0 -> N;
_ -> 1 + T
end.
+
+queue_fold(Fun, Init, Q) ->
+ case queue:out(Q) of
+ {empty, _Q} -> Init;
+ {{value, V}, Q1} -> queue_fold(Fun, Fun(V, Init), Q1)
+ end.