diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-01-08 18:34:22 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-01-08 18:34:22 +0000 |
commit | 096ecfe44b1519f855cb435bbbb56c271e0ff8d4 (patch) | |
tree | 5187e252fd825d43f3ee87a60965c132155d313f /src/rabbit_limiter.erl | |
parent | c56c0e6d42a9949606aa5661d69be4beed32c3e2 (diff) | |
download | rabbitmq-server-096ecfe44b1519f855cb435bbbb56c271e0ff8d4.tar.gz |
Add initial support for x-prefetch consumer arg (basically auto-credit). It's faster than basic.qos but not as fast as I would like. Also reject / nack is gratuitously broken.
Diffstat (limited to 'src/rabbit_limiter.erl')
-rw-r--r-- | src/rabbit_limiter.erl | 25 |
1 files changed, 22 insertions, 3 deletions
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 4e1e299c..d60e3e28 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -126,8 +126,9 @@ get_prefetch_limit/1, ack/2, pid/1]). %% queue API -export([client/1, activate/1, can_send/3, resume/1, deactivate/1, - is_suspended/1, is_consumer_blocked/2, credit/5, drained/1, - forget_consumer/2]). + is_suspended/1, is_consumer_blocked/2, credit/5, + set_consumer_prefetch/3, ack_from_queue/3, + drained/1, forget_consumer/2]). %% callbacks -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, prioritise_call/4]). @@ -170,6 +171,8 @@ -spec(is_consumer_blocked/2 :: (qstate(), rabbit_types:ctag()) -> boolean()). -spec(credit/5 :: (qstate(), rabbit_types:ctag(), non_neg_integer(), boolean(), boolean()) -> qstate()). +-spec(ack_from_queue/3 :: (qstate(), rabbit_types:ctag(), non_neg_integer()) + -> qstate()). -spec(drained/1 :: (qstate()) -> {[{rabbit_types:ctag(), non_neg_integer()}], qstate()}). -spec(forget_consumer/2 :: (qstate(), rabbit_types:ctag()) -> qstate()). @@ -187,7 +190,7 @@ %% notified of a change in the limit or volume that may allow it to %% deliver more messages via the limiter's channel. --record(credit, {credit = 0, drain = false}). +-record(credit, {credit = 0, drain = false, mode}). %%---------------------------------------------------------------------------- %% API @@ -281,6 +284,22 @@ credit(Limiter = #qstate{credits = Credits}, CTag, _Credit, true, true) -> credit(Limiter = #qstate{credits = Credits}, CTag, Credit, false, Drain) -> Limiter#qstate{credits = update_credit(CTag, Credit, Drain, Credits)}. +set_consumer_prefetch(Limiter = #qstate{credits = Credits}, CTag, Credit) -> + Credits1 = gb_trees:enter( + CTag, #credit{credit = Credit, mode = auto}, Credits), + Limiter#qstate{credits = Credits1}. + +ack_from_queue(Limiter = #qstate{credits = Credits}, CTag, Credit) -> + Limiter#qstate{ + credits = case gb_trees:lookup(CTag, Credits) of + {value, #credit{mode = auto, + credit = Current, + drain = Drain}} -> + update_credit(CTag, Current + Credit, Drain, Credits); + none -> + Credits + end}. + drained(Limiter = #qstate{credits = Credits}) -> {CTagCredits, Credits2} = rabbit_misc:gb_trees_fold( |