summaryrefslogtreecommitdiff
path: root/src/rabbit_limiter.erl
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-01-08 18:34:22 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-01-08 18:34:22 +0000
commit096ecfe44b1519f855cb435bbbb56c271e0ff8d4 (patch)
tree5187e252fd825d43f3ee87a60965c132155d313f /src/rabbit_limiter.erl
parentc56c0e6d42a9949606aa5661d69be4beed32c3e2 (diff)
downloadrabbitmq-server-096ecfe44b1519f855cb435bbbb56c271e0ff8d4.tar.gz
Add initial support for x-prefetch consumer arg (basically auto-credit). It's faster than basic.qos but not as fast as I would like. Also reject / nack is gratuitously broken.
Diffstat (limited to 'src/rabbit_limiter.erl')
-rw-r--r--src/rabbit_limiter.erl25
1 files changed, 22 insertions, 3 deletions
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 4e1e299c..d60e3e28 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -126,8 +126,9 @@
get_prefetch_limit/1, ack/2, pid/1]).
%% queue API
-export([client/1, activate/1, can_send/3, resume/1, deactivate/1,
- is_suspended/1, is_consumer_blocked/2, credit/5, drained/1,
- forget_consumer/2]).
+ is_suspended/1, is_consumer_blocked/2, credit/5,
+ set_consumer_prefetch/3, ack_from_queue/3,
+ drained/1, forget_consumer/2]).
%% callbacks
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, prioritise_call/4]).
@@ -170,6 +171,8 @@
-spec(is_consumer_blocked/2 :: (qstate(), rabbit_types:ctag()) -> boolean()).
-spec(credit/5 :: (qstate(), rabbit_types:ctag(), non_neg_integer(), boolean(),
boolean()) -> qstate()).
+-spec(ack_from_queue/3 :: (qstate(), rabbit_types:ctag(), non_neg_integer())
+ -> qstate()).
-spec(drained/1 :: (qstate())
-> {[{rabbit_types:ctag(), non_neg_integer()}], qstate()}).
-spec(forget_consumer/2 :: (qstate(), rabbit_types:ctag()) -> qstate()).
@@ -187,7 +190,7 @@
%% notified of a change in the limit or volume that may allow it to
%% deliver more messages via the limiter's channel.
--record(credit, {credit = 0, drain = false}).
+-record(credit, {credit = 0, drain = false, mode}).
%%----------------------------------------------------------------------------
%% API
@@ -281,6 +284,22 @@ credit(Limiter = #qstate{credits = Credits}, CTag, _Credit, true, true) ->
credit(Limiter = #qstate{credits = Credits}, CTag, Credit, false, Drain) ->
Limiter#qstate{credits = update_credit(CTag, Credit, Drain, Credits)}.
+set_consumer_prefetch(Limiter = #qstate{credits = Credits}, CTag, Credit) ->
+ Credits1 = gb_trees:enter(
+ CTag, #credit{credit = Credit, mode = auto}, Credits),
+ Limiter#qstate{credits = Credits1}.
+
+ack_from_queue(Limiter = #qstate{credits = Credits}, CTag, Credit) ->
+ Limiter#qstate{
+ credits = case gb_trees:lookup(CTag, Credits) of
+ {value, #credit{mode = auto,
+ credit = Current,
+ drain = Drain}} ->
+ update_credit(CTag, Current + Credit, Drain, Credits);
+ none ->
+ Credits
+ end}.
+
drained(Limiter = #qstate{credits = Credits}) ->
{CTagCredits, Credits2} =
rabbit_misc:gb_trees_fold(