summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-10-01 15:31:42 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2011-10-01 15:31:42 +0100
commitf64f46070dd6e8af7e033c3af1b9c1a6b5bc2d73 (patch)
tree0c4f204a425cbd139e935204ce779596bddf0867
parentd5bf6e5bc74c1b483391ab70c2304e20446d0f99 (diff)
parentd6fa107b407757c085fc7a97c2ad6529e4b23f90 (diff)
downloadrabbitmq-server-bug24459.tar.gz
merge default into bug24459bug24459
-rw-r--r--src/rabbit_channel.erl64
1 files changed, 34 insertions, 30 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 3c61447a..bcffe2af 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -34,7 +34,7 @@
-record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid,
limiter, tx_status, next_tag,
- unacked_message_q, uncommitted_message_q, uncommitted_ack_q,
+ unacked_message_q, uncommitted_message_q, uncommitted_acks,
user, virtual_host, most_recently_declared_queue, queue_monitors,
consumer_mapping, blocking, queue_consumers, queue_collector_pid,
stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq,
@@ -185,7 +185,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
next_tag = 1,
unacked_message_q = queue:new(),
uncommitted_message_q = queue:new(),
- uncommitted_ack_q = queue:new(),
+ uncommitted_acks = [],
user = User,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
@@ -669,15 +669,14 @@ handle_method(#'basic.nack'{delivery_tag = DeliveryTag,
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
multiple = Multiple},
- _, State = #ch{unacked_message_q = UAMQ,
- tx_status = TxStatus}) ->
+ _, State = #ch{unacked_message_q = UAMQ, tx_status = TxStatus}) ->
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
State1 = State#ch{unacked_message_q = Remaining},
{noreply,
case TxStatus of
none -> ack(Acked, State1);
- in_progress -> NewTAQ = queue:join(State1#ch.uncommitted_ack_q, Acked),
- State1#ch{uncommitted_ack_q = NewTAQ}
+ in_progress -> State1#ch{uncommitted_acks =
+ Acked ++ State1#ch.uncommitted_acks}
end};
handle_method(#'basic.get'{queue = QueueNameBin,
@@ -839,6 +838,7 @@ handle_method(#'basic.recover_async'{requeue = true},
_, State = #ch{unacked_message_q = UAMQ,
limiter = Limiter}) ->
OkFun = fun () -> ok end,
+ UAMQL = queue:to_list(UAMQ),
ok = fold_per_queue(
fun (QPid, MsgIds, ok) ->
rabbit_misc:with_exit_handler(
@@ -846,8 +846,8 @@ handle_method(#'basic.recover_async'{requeue = true},
rabbit_amqqueue:requeue(
QPid, MsgIds, self())
end)
- end, ok, UAMQ),
- ok = notify_limiter(Limiter, UAMQ),
+ end, ok, UAMQL),
+ ok = notify_limiter(Limiter, UAMQL),
%% No answer required - basic.recover is the newer, synchronous
%% variant of this method
{noreply, State#ch{unacked_message_q = queue:new()}};
@@ -1071,8 +1071,8 @@ handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) ->
precondition_failed, "channel is not transactional", []);
handle_method(#'tx.commit'{}, _, State = #ch{uncommitted_message_q = TMQ,
- uncommitted_ack_q = TAQ}) ->
- State1 = new_tx(ack(TAQ, rabbit_misc:queue_fold(fun deliver_to_queues/2,
+ uncommitted_acks = TAL}) ->
+ State1 = new_tx(ack(TAL, rabbit_misc:queue_fold(fun deliver_to_queues/2,
State, TMQ))),
{noreply, maybe_complete_tx(State1#ch{tx_status = committing})};
@@ -1080,10 +1080,11 @@ handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) ->
rabbit_misc:protocol_error(
precondition_failed, "channel is not transactional", []);
-handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ,
- uncommitted_ack_q = TAQ}) ->
- {reply, #'tx.rollback_ok'{}, new_tx(State#ch{unacked_message_q =
- queue:join(TAQ, UAMQ)})};
+handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ,
+ uncommitted_acks = TAL}) ->
+ TAQ = queue:from_list(lists:reverse(TAL)),
+ {reply, #'tx.rollback_ok'{},
+ new_tx(State#ch{unacked_message_q = queue:join(TAQ, UAMQ)})};
handle_method(#'confirm.select'{}, _, #ch{tx_status = in_progress}) ->
rabbit_misc:protocol_error(
@@ -1282,18 +1283,18 @@ ack_record(DeliveryTag, ConsumerTag,
{DeliveryTag, ConsumerTag, {QPid, MsgId}}.
collect_acks(Q, 0, true) ->
- {Q, queue:new()};
+ {queue:to_list(Q), queue:new()};
collect_acks(Q, DeliveryTag, Multiple) ->
- collect_acks(queue:new(), queue:new(), Q, DeliveryTag, Multiple).
+ collect_acks([], queue:new(), Q, DeliveryTag, Multiple).
collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
case queue:out(Q) of
{{value, UnackedMsg = {CurrentDeliveryTag, _ConsumerTag, _Msg}},
QTail} ->
if CurrentDeliveryTag == DeliveryTag ->
- {queue:in(UnackedMsg, ToAcc), queue:join(PrefixAcc, QTail)};
+ {[UnackedMsg | ToAcc], queue:join(PrefixAcc, QTail)};
Multiple ->
- collect_acks(queue:in(UnackedMsg, ToAcc), PrefixAcc,
+ collect_acks([UnackedMsg | ToAcc], PrefixAcc,
QTail, DeliveryTag, Multiple);
true ->
collect_acks(ToAcc, queue:in(UnackedMsg, PrefixAcc),
@@ -1314,7 +1315,7 @@ ack(Acked, State) ->
maybe_incr_stats(QIncs, ack, State).
new_tx(State) -> State#ch{uncommitted_message_q = queue:new(),
- uncommitted_ack_q = queue:new()}.
+ uncommitted_acks = []}.
notify_queues(State = #ch{state = closing}) ->
{ok, State};
@@ -1322,12 +1323,15 @@ notify_queues(State = #ch{consumer_mapping = Consumers}) ->
{rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), self()),
State#ch{state = closing}}.
-fold_per_queue(F, Acc0, UAQ) ->
- T = rabbit_misc:queue_fold(
- fun ({_DTag, _CTag, {QPid, MsgId}}, T) ->
- rabbit_misc:gb_trees_cons(QPid, MsgId, T)
- end, gb_trees:empty(), UAQ),
- rabbit_misc:gb_trees_fold(F, Acc0, T).
+fold_per_queue(_F, Acc, []) ->
+ Acc;
+fold_per_queue(F, Acc, [{_DTag, _CTag, {QPid, MsgId}}]) -> %% common case
+ F(QPid, [MsgId], Acc);
+fold_per_queue(F, Acc, UAL) ->
+ T = lists:foldl(fun ({_DTag, _CTag, {QPid, MsgId}}, T) ->
+ rabbit_misc:gb_trees_cons(QPid, MsgId, T)
+ end, gb_trees:empty(), UAL),
+ rabbit_misc:gb_trees_fold(F, Acc, T).
enable_limiter(State = #ch{unacked_message_q = UAMQ,
limiter = Limiter}) ->
@@ -1349,9 +1353,9 @@ consumer_queues(Consumers) ->
notify_limiter(Limiter, Acked) ->
case rabbit_limiter:is_enabled(Limiter) of
false -> ok;
- true -> case rabbit_misc:queue_fold(fun ({_, none, _}, Acc) -> Acc;
- ({_, _, _}, Acc) -> Acc + 1
- end, 0, Acked) of
+ true -> case lists:foldl(fun ({_, none, _}, Acc) -> Acc;
+ ({_, _, _}, Acc) -> Acc + 1
+ end, 0, Acked) of
0 -> ok;
Count -> rabbit_limiter:ack(Limiter, Count)
end
@@ -1493,8 +1497,8 @@ i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) ->
queue:len(UAMQ);
i(messages_uncommitted, #ch{uncommitted_message_q = TMQ}) ->
queue:len(TMQ);
-i(acks_uncommitted, #ch{uncommitted_ack_q = TAQ}) ->
- queue:len(TAQ);
+i(acks_uncommitted, #ch{uncommitted_acks = TAL}) ->
+ length(TAL);
i(prefetch_count, #ch{limiter = Limiter}) ->
rabbit_limiter:get_limit(Limiter);
i(client_flow_blocked, #ch{limiter = Limiter}) ->