From 209677404df56137c57d33b6a566ab4555792ae4 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Thu, 21 Mar 2013 17:22:31 +0000 Subject: cosmetic(ish) --- src/rabbit_channel.erl | 3 +-- src/rabbit_limiter.erl | 22 ++++++++++------------ 2 files changed, 11 insertions(+), 14 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 79a71b8d..39bd375a 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -330,8 +330,7 @@ handle_cast({send_credit_reply, Len}, State = #ch{writer_pid = WriterPid}) -> WriterPid, #'basic.credit_ok'{available = Len}), noreply(State); -handle_cast({send_drained, CTagCredit}, - State = #ch{writer_pid = WriterPid}) -> +handle_cast({send_drained, CTagCredit}, State = #ch{writer_pid = WriterPid}) -> [ok = rabbit_writer:send_command( WriterPid, #'basic.credit_drained'{consumer_tag = ConsumerTag, credit_drained = CreditDrained}) diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index a187fd7b..602681e5 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -68,7 +68,7 @@ %% 4. The limiter process maintains an internal counter of 'messages %% sent but not yet acknowledged', called the 'volume'. %% -%% 5. Queues ask the limiter for permission (with can_send/2) whenever +%% 5. Queues ask the limiter for permission (with can_send/3) whenever %% they want to deliver a message to a channel. The limiter checks %% whether a) the channel isn't blocked by channel.flow, and b) the %% volume has not yet reached the prefetch limit. If so it @@ -77,10 +77,10 @@ %% tells the queue not to proceed. %% %% 6. A queue that has told to proceed (by the return value of -%% can_send/2) sends the message to the channel. Conversely, a +%% can_send/3) sends the message to the channel. Conversely, a %% queue that has been told not to proceed, will not attempt to %% deliver that message, or any future messages, to the -%% channel. This is accomplished by can_send/2 capturing the +%% channel. This is accomplished by can_send/3 capturing the %% outcome in the local state, where it can be accessed with %% is_suspended/1. %% @@ -88,7 +88,7 @@ %% how many messages were ack'ed. The limiter process decrements %% the volume and if it falls below the prefetch_count then it %% notifies (through rabbit_amqqueue:resume/2) all the queues -%% requiring notification, i.e. all those that had a can_send/2 +%% requiring notification, i.e. all those that had a can_send/3 %% request denied. %% %% 8. Upon receipt of such a notification, queues resume delivery to @@ -227,10 +227,10 @@ activate(L = #qstate{state = dormant}) -> activate(L) -> L. can_send(L = #qstate{pid = Pid, state = State, credits = Credits}, - AckReq, CTag) -> + AckRequired, CTag) -> case is_consumer_blocked(L, CTag) of - false -> case State =/= active orelse call_can_send( - Pid, self(), AckReq) of + false -> case (State =/= active orelse + safe_call(Pid, {can_send, self(), AckRequired}, true)) of true -> {continue, L#qstate{ credits = record_send_q(CTag, Credits)}}; false -> {suspend, L#qstate{state = suspended}} @@ -238,12 +238,10 @@ can_send(L = #qstate{pid = Pid, state = State, credits = Credits}, true -> {suspend, L} end. -call_can_send(Pid, QPid, AckRequired) -> +safe_call(Pid, Msg, ExitValue) -> rabbit_misc:with_exit_handler( - fun () -> true end, - fun () -> - gen_server2:call(Pid, {can_send, QPid, AckRequired}, infinity) - end). + fun () -> ExitValue end, + fun () -> gen_server2:call(Pid, Msg, infinity) end). resume(L) -> L#qstate{state = active}. -- cgit v1.2.1