summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-01-15 14:00:28 +0000
committerSimon MacMullen <simon@rabbitmq.com>2013-01-15 14:00:28 +0000
commit8a816635bd03a5b9b3c98f771332612c15ab81a3 (patch)
tree346c467e50081ac32c23654ede273d36acb202ef
parentf230802d1366c0fce8207f20c45b9b5aae1c4e6b (diff)
downloadrabbitmq-server-8a816635bd03a5b9b3c98f771332612c15ab81a3.tar.gz
Separate out can_cons_send.
-rw-r--r--src/rabbit_amqqueue_process.erl29
-rw-r--r--src/rabbit_limiter.erl30
2 files changed, 30 insertions, 29 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 66b63386..fa70765d 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -429,12 +429,7 @@ deliver_msgs_to_consumers(DeliverFun, false,
deliver_msgs_to_consumers(DeliverFun, Stop, State1)
end.
-deliver_msg_to_consumer(DeliverFun,
- E = {ChPid,
- Consumer = #consumer{tag = CTag,
- ack_required = AckReq}},
- State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
+deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) ->
C = ch_record(ChPid),
case is_ch_blocked(C) of
true ->
@@ -442,21 +437,21 @@ deliver_msg_to_consumer(DeliverFun,
{false, State};
false ->
#cr{limiter = Limiter, ch_pid = ChPid, blocked_ctags = BCTags} = C,
- case rabbit_limiter:can_cons_send(
- Limiter, ChPid, CTag, BQ:len(BQS)) of
- {false, Lim2} ->
- block_consumer(C#cr{limiter = Lim2,
- blocked_ctags = [CTag | BCTags]}, E),
+ #consumer{tag = CTag} = Consumer,
+ case rabbit_limiter:can_cons_send(Limiter, CTag) of
+ false ->
+ block_consumer(C#cr{blocked_ctags = [CTag | BCTags]}, E),
{false, State};
- {true, Lim2} ->
- case rabbit_limiter:can_ch_send(Limiter, self(), AckReq) of
+ true ->
+ case rabbit_limiter:can_ch_send(
+ Limiter, self(), Consumer#consumer.ack_required) of
false ->
block_consumer(C#cr{is_limit_active = true}, E),
{false, State};
true ->
AC1 = queue:in(E, State#q.active_consumers),
deliver_msg_to_consumer(
- DeliverFun, Consumer, C#cr{limiter = Lim2},
+ DeliverFun, Consumer, C,
State#q{active_consumers = AC1})
end
end
@@ -467,8 +462,12 @@ deliver_msg_to_consumer(DeliverFun,
ack_required = AckRequired},
C = #cr{ch_pid = ChPid,
acktags = ChAckTags,
+ limiter = Limiter,
unsent_message_count = Count},
- State = #q{q = #amqqueue{name = QName}}) ->
+ State = #q{q = #amqqueue{name = QName},
+ backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ rabbit_limiter:record_cons_send(Limiter, ChPid, ConsumerTag, BQ:len(BQS)),
{{Message, IsDelivered, AckTag}, Stop, State1} =
DeliverFun(AckRequired, State),
rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired,
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index b5ec9f17..e32f072e 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -24,7 +24,7 @@
-export([start_link/0, make_token/0, make_token/1, is_enabled/1, enable/2,
disable/1]).
--export([limit/2, can_ch_send/3, can_cons_send/4,
+-export([limit/2, can_ch_send/3, can_cons_send/2, record_cons_send/4,
ack/2, register/2, unregister/2]).
-export([get_limit/1, block/1, unblock/1, is_blocked/1]).
-export([inform/4, forget_consumer/2, copy_queue_state/2]).
@@ -49,8 +49,7 @@
-spec(disable/1 :: (token()) -> token()).
-spec(limit/2 :: (token(), non_neg_integer()) -> 'ok' | {'disabled', token()}).
-spec(can_ch_send/3 :: (token(), pid(), boolean()) -> boolean()).
--spec(can_cons_send/4 :: (token(), pid(), rabbit_types:ctag(),
- non_neg_integer()) -> {boolean(), token()}).
+-spec(can_cons_send/2 :: (token(), rabbit_types:ctag()) -> boolean()).
-spec(ack/2 :: (token(), non_neg_integer()) -> 'ok').
-spec(register/2 :: (token(), pid()) -> 'ok').
-spec(unregister/2 :: (token(), pid()) -> 'ok').
@@ -112,9 +111,15 @@ can_ch_send(#token{pid = Pid, enabled = true}, QPid, AckRequired) ->
can_ch_send(_, _, _) ->
true.
-can_cons_send(#token{q_state = QState} = Token, ChPid, CTag, Len) ->
- {CanQ, NewQState} = can_send_q(CTag, Len, ChPid, QState),
- {CanQ, Token#token{q_state = NewQState}}.
+can_cons_send(#token{q_state = Credits}, CTag) ->
+ case dict:find(CTag, Credits) of
+ {ok, #credit{credit = C}} when C > 0 -> true;
+ {ok, #credit{}} -> false;
+ error -> true
+ end.
+
+record_cons_send(#token{q_state = QState} = Token, ChPid, CTag, Len) ->
+ Token#token{q_state = record_send_q(CTag, Len, ChPid, QState)}.
%% Let the limiter know that the channel has received some acks from a
%% consumer
@@ -164,15 +169,12 @@ copy_queue_state(#token{q_state = Credits}, Token) ->
%% we get the queue to hold a bit of state for us (#token.q_state), and
%% maintain a fiction that the limiter is making the decisions...
-can_send_q(CTag, Len, ChPid, Credits) ->
+record_send_q(CTag, Len, ChPid, Credits) ->
case dict:find(CTag, Credits) of
- {ok, #credit{credit = C} = Cred} ->
- if C > 0 -> Credits2 = decr_credit(CTag, Len, ChPid, Cred, Credits),
- {true, Credits2};
- true -> {false, Credits}
- end;
- _ ->
- {true, Credits}
+ {ok, #credit{credit = C} = Cred} when C > 0 ->
+ decr_credit(CTag, Len, ChPid, Cred, Credits);
+ error ->
+ Credits
end.
decr_credit(CTag, Len, ChPid, Cred, Credits) ->