diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-01-15 14:00:28 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-01-15 14:00:28 +0000 |
commit | 8a816635bd03a5b9b3c98f771332612c15ab81a3 (patch) | |
tree | 346c467e50081ac32c23654ede273d36acb202ef | |
parent | f230802d1366c0fce8207f20c45b9b5aae1c4e6b (diff) | |
download | rabbitmq-server-8a816635bd03a5b9b3c98f771332612c15ab81a3.tar.gz |
Separate out can_cons_send.
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 29 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 30 |
2 files changed, 30 insertions, 29 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 66b63386..fa70765d 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -429,12 +429,7 @@ deliver_msgs_to_consumers(DeliverFun, false, deliver_msgs_to_consumers(DeliverFun, Stop, State1) end. -deliver_msg_to_consumer(DeliverFun, - E = {ChPid, - Consumer = #consumer{tag = CTag, - ack_required = AckReq}}, - State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> +deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) -> C = ch_record(ChPid), case is_ch_blocked(C) of true -> @@ -442,21 +437,21 @@ deliver_msg_to_consumer(DeliverFun, {false, State}; false -> #cr{limiter = Limiter, ch_pid = ChPid, blocked_ctags = BCTags} = C, - case rabbit_limiter:can_cons_send( - Limiter, ChPid, CTag, BQ:len(BQS)) of - {false, Lim2} -> - block_consumer(C#cr{limiter = Lim2, - blocked_ctags = [CTag | BCTags]}, E), + #consumer{tag = CTag} = Consumer, + case rabbit_limiter:can_cons_send(Limiter, CTag) of + false -> + block_consumer(C#cr{blocked_ctags = [CTag | BCTags]}, E), {false, State}; - {true, Lim2} -> - case rabbit_limiter:can_ch_send(Limiter, self(), AckReq) of + true -> + case rabbit_limiter:can_ch_send( + Limiter, self(), Consumer#consumer.ack_required) of false -> block_consumer(C#cr{is_limit_active = true}, E), {false, State}; true -> AC1 = queue:in(E, State#q.active_consumers), deliver_msg_to_consumer( - DeliverFun, Consumer, C#cr{limiter = Lim2}, + DeliverFun, Consumer, C, State#q{active_consumers = AC1}) end end @@ -467,8 +462,12 @@ deliver_msg_to_consumer(DeliverFun, ack_required = AckRequired}, C = #cr{ch_pid = ChPid, acktags = ChAckTags, + limiter = Limiter, unsent_message_count = Count}, - State = #q{q = #amqqueue{name = QName}}) -> + State = #q{q = #amqqueue{name = QName}, + backing_queue = BQ, + backing_queue_state = BQS}) -> + rabbit_limiter:record_cons_send(Limiter, ChPid, ConsumerTag, BQ:len(BQS)), {{Message, IsDelivered, AckTag}, Stop, State1} = DeliverFun(AckRequired, State), rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired, diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index b5ec9f17..e32f072e 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -24,7 +24,7 @@ -export([start_link/0, make_token/0, make_token/1, is_enabled/1, enable/2, disable/1]). --export([limit/2, can_ch_send/3, can_cons_send/4, +-export([limit/2, can_ch_send/3, can_cons_send/2, record_cons_send/4, ack/2, register/2, unregister/2]). -export([get_limit/1, block/1, unblock/1, is_blocked/1]). -export([inform/4, forget_consumer/2, copy_queue_state/2]). @@ -49,8 +49,7 @@ -spec(disable/1 :: (token()) -> token()). -spec(limit/2 :: (token(), non_neg_integer()) -> 'ok' | {'disabled', token()}). -spec(can_ch_send/3 :: (token(), pid(), boolean()) -> boolean()). --spec(can_cons_send/4 :: (token(), pid(), rabbit_types:ctag(), - non_neg_integer()) -> {boolean(), token()}). +-spec(can_cons_send/2 :: (token(), rabbit_types:ctag()) -> boolean()). -spec(ack/2 :: (token(), non_neg_integer()) -> 'ok'). -spec(register/2 :: (token(), pid()) -> 'ok'). -spec(unregister/2 :: (token(), pid()) -> 'ok'). @@ -112,9 +111,15 @@ can_ch_send(#token{pid = Pid, enabled = true}, QPid, AckRequired) -> can_ch_send(_, _, _) -> true. -can_cons_send(#token{q_state = QState} = Token, ChPid, CTag, Len) -> - {CanQ, NewQState} = can_send_q(CTag, Len, ChPid, QState), - {CanQ, Token#token{q_state = NewQState}}. +can_cons_send(#token{q_state = Credits}, CTag) -> + case dict:find(CTag, Credits) of + {ok, #credit{credit = C}} when C > 0 -> true; + {ok, #credit{}} -> false; + error -> true + end. + +record_cons_send(#token{q_state = QState} = Token, ChPid, CTag, Len) -> + Token#token{q_state = record_send_q(CTag, Len, ChPid, QState)}. %% Let the limiter know that the channel has received some acks from a %% consumer @@ -164,15 +169,12 @@ copy_queue_state(#token{q_state = Credits}, Token) -> %% we get the queue to hold a bit of state for us (#token.q_state), and %% maintain a fiction that the limiter is making the decisions... -can_send_q(CTag, Len, ChPid, Credits) -> +record_send_q(CTag, Len, ChPid, Credits) -> case dict:find(CTag, Credits) of - {ok, #credit{credit = C} = Cred} -> - if C > 0 -> Credits2 = decr_credit(CTag, Len, ChPid, Cred, Credits), - {true, Credits2}; - true -> {false, Credits} - end; - _ -> - {true, Credits} + {ok, #credit{credit = C} = Cred} when C > 0 -> + decr_credit(CTag, Len, ChPid, Cred, Credits); + error -> + Credits end. decr_credit(CTag, Len, ChPid, Cred, Credits) -> |