diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2011-10-01 15:31:42 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-10-01 15:31:42 +0100 |
commit | f64f46070dd6e8af7e033c3af1b9c1a6b5bc2d73 (patch) | |
tree | 0c4f204a425cbd139e935204ce779596bddf0867 | |
parent | d5bf6e5bc74c1b483391ab70c2304e20446d0f99 (diff) | |
parent | d6fa107b407757c085fc7a97c2ad6529e4b23f90 (diff) | |
download | rabbitmq-server-bug24459.tar.gz |
merge default into bug24459bug24459
-rw-r--r-- | src/rabbit_channel.erl | 64 |
1 files changed, 34 insertions, 30 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 3c61447a..bcffe2af 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -34,7 +34,7 @@ -record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid, limiter, tx_status, next_tag, - unacked_message_q, uncommitted_message_q, uncommitted_ack_q, + unacked_message_q, uncommitted_message_q, uncommitted_acks, user, virtual_host, most_recently_declared_queue, queue_monitors, consumer_mapping, blocking, queue_consumers, queue_collector_pid, stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq, @@ -185,7 +185,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, next_tag = 1, unacked_message_q = queue:new(), uncommitted_message_q = queue:new(), - uncommitted_ack_q = queue:new(), + uncommitted_acks = [], user = User, virtual_host = VHost, most_recently_declared_queue = <<>>, @@ -669,15 +669,14 @@ handle_method(#'basic.nack'{delivery_tag = DeliveryTag, handle_method(#'basic.ack'{delivery_tag = DeliveryTag, multiple = Multiple}, - _, State = #ch{unacked_message_q = UAMQ, - tx_status = TxStatus}) -> + _, State = #ch{unacked_message_q = UAMQ, tx_status = TxStatus}) -> {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), State1 = State#ch{unacked_message_q = Remaining}, {noreply, case TxStatus of none -> ack(Acked, State1); - in_progress -> NewTAQ = queue:join(State1#ch.uncommitted_ack_q, Acked), - State1#ch{uncommitted_ack_q = NewTAQ} + in_progress -> State1#ch{uncommitted_acks = + Acked ++ State1#ch.uncommitted_acks} end}; handle_method(#'basic.get'{queue = QueueNameBin, @@ -839,6 +838,7 @@ handle_method(#'basic.recover_async'{requeue = true}, _, State = #ch{unacked_message_q = UAMQ, limiter = Limiter}) -> OkFun = fun () -> ok end, + UAMQL = queue:to_list(UAMQ), ok = fold_per_queue( fun (QPid, MsgIds, ok) -> rabbit_misc:with_exit_handler( @@ -846,8 +846,8 @@ handle_method(#'basic.recover_async'{requeue = true}, rabbit_amqqueue:requeue( QPid, MsgIds, self()) end) - end, ok, UAMQ), - ok = notify_limiter(Limiter, UAMQ), + end, ok, UAMQL), + ok = notify_limiter(Limiter, UAMQL), %% No answer required - basic.recover is the newer, synchronous %% variant of this method {noreply, State#ch{unacked_message_q = queue:new()}}; @@ -1071,8 +1071,8 @@ handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) -> precondition_failed, "channel is not transactional", []); handle_method(#'tx.commit'{}, _, State = #ch{uncommitted_message_q = TMQ, - uncommitted_ack_q = TAQ}) -> - State1 = new_tx(ack(TAQ, rabbit_misc:queue_fold(fun deliver_to_queues/2, + uncommitted_acks = TAL}) -> + State1 = new_tx(ack(TAL, rabbit_misc:queue_fold(fun deliver_to_queues/2, State, TMQ))), {noreply, maybe_complete_tx(State1#ch{tx_status = committing})}; @@ -1080,10 +1080,11 @@ handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) -> rabbit_misc:protocol_error( precondition_failed, "channel is not transactional", []); -handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ, - uncommitted_ack_q = TAQ}) -> - {reply, #'tx.rollback_ok'{}, new_tx(State#ch{unacked_message_q = - queue:join(TAQ, UAMQ)})}; +handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ, + uncommitted_acks = TAL}) -> + TAQ = queue:from_list(lists:reverse(TAL)), + {reply, #'tx.rollback_ok'{}, + new_tx(State#ch{unacked_message_q = queue:join(TAQ, UAMQ)})}; handle_method(#'confirm.select'{}, _, #ch{tx_status = in_progress}) -> rabbit_misc:protocol_error( @@ -1282,18 +1283,18 @@ ack_record(DeliveryTag, ConsumerTag, {DeliveryTag, ConsumerTag, {QPid, MsgId}}. collect_acks(Q, 0, true) -> - {Q, queue:new()}; + {queue:to_list(Q), queue:new()}; collect_acks(Q, DeliveryTag, Multiple) -> - collect_acks(queue:new(), queue:new(), Q, DeliveryTag, Multiple). + collect_acks([], queue:new(), Q, DeliveryTag, Multiple). collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) -> case queue:out(Q) of {{value, UnackedMsg = {CurrentDeliveryTag, _ConsumerTag, _Msg}}, QTail} -> if CurrentDeliveryTag == DeliveryTag -> - {queue:in(UnackedMsg, ToAcc), queue:join(PrefixAcc, QTail)}; + {[UnackedMsg | ToAcc], queue:join(PrefixAcc, QTail)}; Multiple -> - collect_acks(queue:in(UnackedMsg, ToAcc), PrefixAcc, + collect_acks([UnackedMsg | ToAcc], PrefixAcc, QTail, DeliveryTag, Multiple); true -> collect_acks(ToAcc, queue:in(UnackedMsg, PrefixAcc), @@ -1314,7 +1315,7 @@ ack(Acked, State) -> maybe_incr_stats(QIncs, ack, State). new_tx(State) -> State#ch{uncommitted_message_q = queue:new(), - uncommitted_ack_q = queue:new()}. + uncommitted_acks = []}. notify_queues(State = #ch{state = closing}) -> {ok, State}; @@ -1322,12 +1323,15 @@ notify_queues(State = #ch{consumer_mapping = Consumers}) -> {rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), self()), State#ch{state = closing}}. -fold_per_queue(F, Acc0, UAQ) -> - T = rabbit_misc:queue_fold( - fun ({_DTag, _CTag, {QPid, MsgId}}, T) -> - rabbit_misc:gb_trees_cons(QPid, MsgId, T) - end, gb_trees:empty(), UAQ), - rabbit_misc:gb_trees_fold(F, Acc0, T). +fold_per_queue(_F, Acc, []) -> + Acc; +fold_per_queue(F, Acc, [{_DTag, _CTag, {QPid, MsgId}}]) -> %% common case + F(QPid, [MsgId], Acc); +fold_per_queue(F, Acc, UAL) -> + T = lists:foldl(fun ({_DTag, _CTag, {QPid, MsgId}}, T) -> + rabbit_misc:gb_trees_cons(QPid, MsgId, T) + end, gb_trees:empty(), UAL), + rabbit_misc:gb_trees_fold(F, Acc, T). enable_limiter(State = #ch{unacked_message_q = UAMQ, limiter = Limiter}) -> @@ -1349,9 +1353,9 @@ consumer_queues(Consumers) -> notify_limiter(Limiter, Acked) -> case rabbit_limiter:is_enabled(Limiter) of false -> ok; - true -> case rabbit_misc:queue_fold(fun ({_, none, _}, Acc) -> Acc; - ({_, _, _}, Acc) -> Acc + 1 - end, 0, Acked) of + true -> case lists:foldl(fun ({_, none, _}, Acc) -> Acc; + ({_, _, _}, Acc) -> Acc + 1 + end, 0, Acked) of 0 -> ok; Count -> rabbit_limiter:ack(Limiter, Count) end @@ -1493,8 +1497,8 @@ i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) -> queue:len(UAMQ); i(messages_uncommitted, #ch{uncommitted_message_q = TMQ}) -> queue:len(TMQ); -i(acks_uncommitted, #ch{uncommitted_ack_q = TAQ}) -> - queue:len(TAQ); +i(acks_uncommitted, #ch{uncommitted_acks = TAL}) -> + length(TAL); i(prefetch_count, #ch{limiter = Limiter}) -> rabbit_limiter:get_limit(Limiter); i(client_flow_blocked, #ch{limiter = Limiter}) -> |