summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-01-29 14:50:16 +0000
committerSimon MacMullen <simon@rabbitmq.com>2013-01-29 14:50:16 +0000
commitf3025ce7562d1f511baef3dbfb7231651b9e3509 (patch)
tree99b244a8b409dd4f52490243b34164d3fdb47dc9
parent3cd142a2d4027e64883681237de936b27fb0ecf5 (diff)
downloadrabbitmq-server-f3025ce7562d1f511baef3dbfb7231651b9e3509.tar.gz
inform_limiter -> credit.
-rw-r--r--src/rabbit_amqqueue.erl9
-rw-r--r--src/rabbit_amqqueue_process.erl5
-rw-r--r--src/rabbit_channel.erl11
-rw-r--r--src/rabbit_limiter.erl11
4 files changed, 18 insertions, 18 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index cb7e961d..b9d41c25 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -28,7 +28,7 @@
-export([consumers/1, consumers_all/1, consumer_info_keys/0]).
-export([basic_get/3, basic_consume/7, basic_cancel/4]).
-export([notify_sent/2, notify_sent_queue_down/1, unblock/2, flush_all/2]).
--export([notify_down_all/2, limit_all/3, inform_limiter/3]).
+-export([notify_down_all/2, limit_all/3, credit/6]).
-export([on_node_down/1]).
-export([update/2, store_queue/1, policy_changed/2]).
-export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1,
@@ -145,7 +145,8 @@
-spec(notify_down_all/2 :: (qpids(), pid()) -> ok_or_errors()).
-spec(limit_all/3 :: (qpids(), pid(), rabbit_limiter:token()) ->
ok_or_errors()).
--spec(inform_limiter/3 :: (rabbit_types:amqqueue(), pid(), any()) -> 'ok').
+-spec(credit/6 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(),
+ non_neg_integer(), boolean(), boolean()) -> 'ok').
-spec(basic_get/3 :: (rabbit_types:amqqueue(), pid(), boolean()) ->
{'ok', non_neg_integer(), qmsg()} | 'empty').
-spec(basic_consume/7 ::
@@ -533,8 +534,8 @@ notify_down_all(QPids, ChPid) ->
limit_all(QPids, ChPid, Limiter) ->
delegate:cast(QPids, {limit, ChPid, Limiter}).
-inform_limiter(#amqqueue{pid = QPid}, ChPid, Msg) ->
- delegate:cast(QPid, {inform_limiter, ChPid, Msg}).
+credit(#amqqueue{pid = QPid}, ChPid, CTag, Credit, Drain, Reply) ->
+ delegate:cast(QPid, {credit, ChPid, CTag, Credit, Drain, Reply}).
basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) ->
delegate:call(QPid, {basic_get, ChPid, NoAck}).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 4e249365..aaa4b537 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1329,12 +1329,13 @@ handle_cast(stop_mirroring, State = #q{backing_queue = BQ,
noreply(State#q{backing_queue = BQ1,
backing_queue_state = BQS1});
-handle_cast({inform_limiter, ChPid, Msg},
+handle_cast({credit, ChPid, CTag, Credit, Drain, Reply},
State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
#cr{limiter = Lim,
blocked_ctags = BCTags} = ch_record(ChPid),
- {Unblock, Lim2} = rabbit_limiter:inform(Lim, ChPid, BQ:len(BQS), Msg),
+ {Unblock, Lim2} = rabbit_limiter:credit(
+ Lim, ChPid, CTag, Credit, Drain, Reply, BQ:len(BQS)),
noreply(possibly_unblock(
State, ChPid, fun(C) -> C#cr{blocked_ctags = BCTags -- Unblock,
limiter = Lim2} end));
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 7f6dc3c8..f719d8f3 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1118,9 +1118,8 @@ handle_method(#'basic.credit'{consumer_tag = CTag,
drain = Drain}, _,
State = #ch{consumer_mapping = Consumers}) ->
case dict:find(CTag, Consumers) of
- {ok, Q} -> ok = rabbit_amqqueue:inform_limiter(
- Q, self(),
- {basic_credit, CTag, Credit, Drain, true}),
+ {ok, Q} -> ok = rabbit_amqqueue:credit(
+ Q, self(), CTag, Credit, Drain, true),
{noreply, State};
error -> precondition_failed("unknown consumer tag '~s'", [CTag])
end;
@@ -1196,10 +1195,8 @@ maybe_set_initial_credit(Arguments, CTag, Q) ->
{table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>),
rabbit_misc:table_lookup(T, <<"drain">>)} of
{{long, Credit}, {boolean, Drain}} ->
- ok = rabbit_amqqueue:inform_limiter(
- Q, self(),
- {basic_credit, CTag, Credit, Drain,
- false});
+ ok = rabbit_amqqueue:credit(
+ Q, self(), CTag, Credit, Drain, false);
_ ->
ok
end;
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index ae9c7918..d9019bfa 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -27,7 +27,7 @@
-export([limit/2, can_ch_send/3, can_cons_send/2, record_cons_send/4,
ack/2, register/2, unregister/2]).
-export([get_limit/1, block/1, unblock/1, is_blocked/1]).
--export([inform/4, forget_consumer/2, copy_queue_state/2]).
+-export([credit/7, forget_consumer/2, copy_queue_state/2]).
-import(rabbit_misc, [serial_add/2, serial_diff/2]).
@@ -57,8 +57,9 @@
-spec(block/1 :: (token()) -> 'ok').
-spec(unblock/1 :: (token()) -> 'ok' | {'disabled', token()}).
-spec(is_blocked/1 :: (token()) -> boolean()).
--spec(inform/4 :: (token(), pid(), non_neg_integer(), any()) ->
- {[rabbit_types:ctag()], token()}).
+-spec(credit/7 :: (token(), pid(), rabbit_types:ctag(),
+ non_neg_integer(), boolean(), boolean(), non_neg_integer())
+ -> {[rabbit_types:ctag()], token()}).
-spec(forget_consumer/2 :: (token(), rabbit_types:ctag()) -> token()).
-spec(copy_queue_state/2 :: (token(), token()) -> token()).
@@ -143,8 +144,8 @@ unblock(Limiter) ->
is_blocked(Limiter) ->
maybe_call(Limiter, is_blocked, false).
-inform(Limiter = #token{q_state = Credits},
- ChPid, Len, {basic_credit, CTag, Credit, Drain, Reply}) ->
+credit(Limiter = #token{q_state = Credits},
+ ChPid, CTag, Credit, Drain, Reply, Len) ->
{Unblock, Credits2} = update_credit(
CTag, Len, ChPid, Credit, Drain, Credits),
case Reply of