diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-01-29 14:50:16 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-01-29 14:50:16 +0000 |
commit | f3025ce7562d1f511baef3dbfb7231651b9e3509 (patch) | |
tree | 99b244a8b409dd4f52490243b34164d3fdb47dc9 | |
parent | 3cd142a2d4027e64883681237de936b27fb0ecf5 (diff) | |
download | rabbitmq-server-f3025ce7562d1f511baef3dbfb7231651b9e3509.tar.gz |
inform_limiter -> credit.
-rw-r--r-- | src/rabbit_amqqueue.erl | 9 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 5 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 11 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 11 |
4 files changed, 18 insertions, 18 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index cb7e961d..b9d41c25 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -28,7 +28,7 @@ -export([consumers/1, consumers_all/1, consumer_info_keys/0]). -export([basic_get/3, basic_consume/7, basic_cancel/4]). -export([notify_sent/2, notify_sent_queue_down/1, unblock/2, flush_all/2]). --export([notify_down_all/2, limit_all/3, inform_limiter/3]). +-export([notify_down_all/2, limit_all/3, credit/6]). -export([on_node_down/1]). -export([update/2, store_queue/1, policy_changed/2]). -export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1, @@ -145,7 +145,8 @@ -spec(notify_down_all/2 :: (qpids(), pid()) -> ok_or_errors()). -spec(limit_all/3 :: (qpids(), pid(), rabbit_limiter:token()) -> ok_or_errors()). --spec(inform_limiter/3 :: (rabbit_types:amqqueue(), pid(), any()) -> 'ok'). +-spec(credit/6 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), + non_neg_integer(), boolean(), boolean()) -> 'ok'). -spec(basic_get/3 :: (rabbit_types:amqqueue(), pid(), boolean()) -> {'ok', non_neg_integer(), qmsg()} | 'empty'). -spec(basic_consume/7 :: @@ -533,8 +534,8 @@ notify_down_all(QPids, ChPid) -> limit_all(QPids, ChPid, Limiter) -> delegate:cast(QPids, {limit, ChPid, Limiter}). -inform_limiter(#amqqueue{pid = QPid}, ChPid, Msg) -> - delegate:cast(QPid, {inform_limiter, ChPid, Msg}). +credit(#amqqueue{pid = QPid}, ChPid, CTag, Credit, Drain, Reply) -> + delegate:cast(QPid, {credit, ChPid, CTag, Credit, Drain, Reply}). basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> delegate:call(QPid, {basic_get, ChPid, NoAck}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 4e249365..aaa4b537 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1329,12 +1329,13 @@ handle_cast(stop_mirroring, State = #q{backing_queue = BQ, noreply(State#q{backing_queue = BQ1, backing_queue_state = BQS1}); -handle_cast({inform_limiter, ChPid, Msg}, +handle_cast({credit, ChPid, CTag, Credit, Drain, Reply}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> #cr{limiter = Lim, blocked_ctags = BCTags} = ch_record(ChPid), - {Unblock, Lim2} = rabbit_limiter:inform(Lim, ChPid, BQ:len(BQS), Msg), + {Unblock, Lim2} = rabbit_limiter:credit( + Lim, ChPid, CTag, Credit, Drain, Reply, BQ:len(BQS)), noreply(possibly_unblock( State, ChPid, fun(C) -> C#cr{blocked_ctags = BCTags -- Unblock, limiter = Lim2} end)); diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 7f6dc3c8..f719d8f3 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1118,9 +1118,8 @@ handle_method(#'basic.credit'{consumer_tag = CTag, drain = Drain}, _, State = #ch{consumer_mapping = Consumers}) -> case dict:find(CTag, Consumers) of - {ok, Q} -> ok = rabbit_amqqueue:inform_limiter( - Q, self(), - {basic_credit, CTag, Credit, Drain, true}), + {ok, Q} -> ok = rabbit_amqqueue:credit( + Q, self(), CTag, Credit, Drain, true), {noreply, State}; error -> precondition_failed("unknown consumer tag '~s'", [CTag]) end; @@ -1196,10 +1195,8 @@ maybe_set_initial_credit(Arguments, CTag, Q) -> {table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>), rabbit_misc:table_lookup(T, <<"drain">>)} of {{long, Credit}, {boolean, Drain}} -> - ok = rabbit_amqqueue:inform_limiter( - Q, self(), - {basic_credit, CTag, Credit, Drain, - false}); + ok = rabbit_amqqueue:credit( + Q, self(), CTag, Credit, Drain, false); _ -> ok end; diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index ae9c7918..d9019bfa 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -27,7 +27,7 @@ -export([limit/2, can_ch_send/3, can_cons_send/2, record_cons_send/4, ack/2, register/2, unregister/2]). -export([get_limit/1, block/1, unblock/1, is_blocked/1]). --export([inform/4, forget_consumer/2, copy_queue_state/2]). +-export([credit/7, forget_consumer/2, copy_queue_state/2]). -import(rabbit_misc, [serial_add/2, serial_diff/2]). @@ -57,8 +57,9 @@ -spec(block/1 :: (token()) -> 'ok'). -spec(unblock/1 :: (token()) -> 'ok' | {'disabled', token()}). -spec(is_blocked/1 :: (token()) -> boolean()). --spec(inform/4 :: (token(), pid(), non_neg_integer(), any()) -> - {[rabbit_types:ctag()], token()}). +-spec(credit/7 :: (token(), pid(), rabbit_types:ctag(), + non_neg_integer(), boolean(), boolean(), non_neg_integer()) + -> {[rabbit_types:ctag()], token()}). -spec(forget_consumer/2 :: (token(), rabbit_types:ctag()) -> token()). -spec(copy_queue_state/2 :: (token(), token()) -> token()). @@ -143,8 +144,8 @@ unblock(Limiter) -> is_blocked(Limiter) -> maybe_call(Limiter, is_blocked, false). -inform(Limiter = #token{q_state = Credits}, - ChPid, Len, {basic_credit, CTag, Credit, Drain, Reply}) -> +credit(Limiter = #token{q_state = Credits}, + ChPid, CTag, Credit, Drain, Reply, Len) -> {Unblock, Credits2} = update_credit( CTag, Len, ChPid, Credit, Drain, Credits), case Reply of |