diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-01-29 16:06:46 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-01-29 16:06:46 +0000 |
commit | 39ac6385e1e3b0d215349ec3dc3476eda9432c7d (patch) | |
tree | 75623f6cd0c78ec587b95c4c156ee4d69a11f226 | |
parent | 5727b84d107c4116169d3798855c015fe531630e (diff) | |
download | rabbitmq-server-39ac6385e1e3b0d215349ec3dc3476eda9432c7d.tar.gz |
rabbit_limiter:initial_credit/6.
-rw-r--r-- | src/rabbit_amqqueue.erl | 23 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 20 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 17 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 22 |
4 files changed, 48 insertions, 34 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index b9d41c25..3673d06e 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -26,9 +26,9 @@ -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). -export([force_event_refresh/0, wake_up/1]). -export([consumers/1, consumers_all/1, consumer_info_keys/0]). --export([basic_get/3, basic_consume/7, basic_cancel/4]). +-export([basic_get/3, basic_consume/8, basic_cancel/4]). -export([notify_sent/2, notify_sent_queue_down/1, unblock/2, flush_all/2]). --export([notify_down_all/2, limit_all/3, credit/6]). +-export([notify_down_all/2, limit_all/3, credit/5]). -export([on_node_down/1]). -export([update/2, store_queue/1, policy_changed/2]). -export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1, @@ -145,13 +145,14 @@ -spec(notify_down_all/2 :: (qpids(), pid()) -> ok_or_errors()). -spec(limit_all/3 :: (qpids(), pid(), rabbit_limiter:token()) -> ok_or_errors()). --spec(credit/6 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), - non_neg_integer(), boolean(), boolean()) -> 'ok'). +-spec(credit/5 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), + non_neg_integer(), boolean()) -> 'ok'). -spec(basic_get/3 :: (rabbit_types:amqqueue(), pid(), boolean()) -> {'ok', non_neg_integer(), qmsg()} | 'empty'). --spec(basic_consume/7 :: +-spec(basic_consume/8 :: (rabbit_types:amqqueue(), boolean(), pid(), - rabbit_limiter:token(), rabbit_types:ctag(), boolean(), any()) + rabbit_limiter:token(), rabbit_types:ctag(), boolean(), + {non_neg_integer(), boolean()} | 'none', any()) -> rabbit_types:ok_or_error('exclusive_consume_unavailable')). -spec(basic_cancel/4 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok'). @@ -534,16 +535,16 @@ notify_down_all(QPids, ChPid) -> limit_all(QPids, ChPid, Limiter) -> delegate:cast(QPids, {limit, ChPid, Limiter}). -credit(#amqqueue{pid = QPid}, ChPid, CTag, Credit, Drain, Reply) -> - delegate:cast(QPid, {credit, ChPid, CTag, Credit, Drain, Reply}). +credit(#amqqueue{pid = QPid}, ChPid, CTag, Credit, Drain) -> + delegate:cast(QPid, {credit, ChPid, CTag, Credit, Drain}). basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> delegate:call(QPid, {basic_get, ChPid, NoAck}). basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, Limiter, - ConsumerTag, ExclusiveConsume, OkMsg) -> - delegate:call(QPid, {basic_consume, NoAck, ChPid, - Limiter, ConsumerTag, ExclusiveConsume, OkMsg}). + ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg) -> + delegate:call(QPid, {basic_consume, NoAck, ChPid, Limiter, + ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg}). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> delegate:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 5c1b68f4..0594e250 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1096,14 +1096,24 @@ handle_call({basic_get, ChPid, NoAck}, _From, end; handle_call({basic_consume, NoAck, ChPid, Limiter, - ConsumerTag, ExclusiveConsume, OkMsg}, - _From, State = #q{exclusive_consumer = Holder}) -> + ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg}, + _From, State = #q{exclusive_consumer = Holder, + backing_queue = BQ, + backing_queue_state = BQS}) -> case check_exclusive_access(Holder, ExclusiveConsume, State) of in_use -> reply({error, exclusive_consume_unavailable}, State); ok -> C = ch_record(ChPid), - C1 = update_consumer_count(C#cr{limiter = Limiter}, +1), + Limiter2 = case CreditArgs of + none -> + Limiter; + {Credit, Drain} -> + rabbit_limiter:initial_credit( + Limiter, ChPid, ConsumerTag, Credit, Drain, + BQ:len(BQS)) + end, + C1 = update_consumer_count(C#cr{limiter = Limiter2}, +1), Consumer = #consumer{tag = ConsumerTag, ack_required = not NoAck}, ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag}; @@ -1326,13 +1336,13 @@ handle_cast(stop_mirroring, State = #q{backing_queue = BQ, noreply(State#q{backing_queue = BQ1, backing_queue_state = BQS1}); -handle_cast({credit, ChPid, CTag, Credit, Drain, Reply}, +handle_cast({credit, ChPid, CTag, Credit, Drain}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> #cr{limiter = Lim, blocked_ctags = BCTags} = ch_record(ChPid), {Unblock, Lim2} = rabbit_limiter:credit( - Lim, ChPid, CTag, Credit, Drain, Reply, BQ:len(BQS)), + Lim, ChPid, CTag, Credit, Drain, BQ:len(BQS)), noreply(possibly_unblock( State, ChPid, fun(C) -> C#cr{blocked_ctags = BCTags -- Unblock, limiter = Lim2} end)); diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index f719d8f3..20976932 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -717,7 +717,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, no_ack = NoAck, exclusive = ExclusiveConsume, nowait = NoWait, - arguments = Args}, + arguments = Arguments}, _, State = #ch{conn_pid = ConnPid, limiter = Limiter, consumer_mapping = ConsumerMapping}) -> @@ -738,10 +738,10 @@ handle_method(#'basic.consume'{queue = QueueNameBin, case rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ConnPid, fun (Q) -> - maybe_set_initial_credit(Args, ActualConsumerTag, Q), {rabbit_amqqueue:basic_consume( Q, NoAck, self(), Limiter, ActualConsumerTag, ExclusiveConsume, + parse_credit_args(Arguments), ok_msg(NoWait, #'basic.consume_ok'{ consumer_tag = ActualConsumerTag})), Q} @@ -1119,7 +1119,7 @@ handle_method(#'basic.credit'{consumer_tag = CTag, State = #ch{consumer_mapping = Consumers}) -> case dict:find(CTag, Consumers) of {ok, Q} -> ok = rabbit_amqqueue:credit( - Q, self(), CTag, Credit, Drain, true), + Q, self(), CTag, Credit, Drain), {noreply, State}; error -> precondition_failed("unknown consumer tag '~s'", [CTag]) end; @@ -1190,17 +1190,14 @@ handle_consuming_queue_down(QPid, handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) -> State#ch{delivering_queues = sets:del_element(QPid, DQ)}. -maybe_set_initial_credit(Arguments, CTag, Q) -> +parse_credit_args(Arguments) -> case rabbit_misc:table_lookup(Arguments, <<"x-credit">>) of {table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>), rabbit_misc:table_lookup(T, <<"drain">>)} of - {{long, Credit}, {boolean, Drain}} -> - ok = rabbit_amqqueue:credit( - Q, self(), CTag, Credit, Drain, false); - _ -> - ok + {{long, Credit}, {boolean, Drain}} -> {Credit, Drain}; + _ -> none end; - undefined -> ok + undefined -> none end. binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 4796fc0e..865c4677 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -24,7 +24,7 @@ disable/1]). -export([limit/2, can_send/6, ack/2, register/2, unregister/2]). -export([get_limit/1, block/1, unblock/1, is_blocked/1]). --export([credit/7, forget_consumer/2, copy_queue_state/2]). +-export([initial_credit/6, credit/6, forget_consumer/2, copy_queue_state/2]). -import(rabbit_misc, [serial_add/2, serial_diff/2]). @@ -55,8 +55,11 @@ -spec(block/1 :: (token()) -> 'ok'). -spec(unblock/1 :: (token()) -> 'ok' | {'disabled', token()}). -spec(is_blocked/1 :: (token()) -> boolean()). --spec(credit/7 :: (token(), pid(), rabbit_types:ctag(), - non_neg_integer(), boolean(), boolean(), non_neg_integer()) +-spec(initial_credit/6 :: (token(), pid(), rabbit_types:ctag(), + non_neg_integer(), boolean(), non_neg_integer()) + -> token()). +-spec(credit/6 :: (token(), pid(), rabbit_types:ctag(), + non_neg_integer(), boolean(), non_neg_integer()) -> {[rabbit_types:ctag()], token()}). -spec(forget_consumer/2 :: (token(), rabbit_types:ctag()) -> token()). -spec(copy_queue_state/2 :: (token(), token()) -> token()). @@ -147,14 +150,17 @@ unblock(Limiter) -> is_blocked(Limiter) -> maybe_call(Limiter, is_blocked, false). +initial_credit(Limiter = #token{q_state = Credits}, + ChPid, CTag, Credit, Drain, Len) -> + {[], Credits2} = update_credit( + CTag, Len, ChPid, Credit, Drain, Credits), + Limiter#token{q_state = Credits2}. + credit(Limiter = #token{q_state = Credits}, - ChPid, CTag, Credit, Drain, Reply, Len) -> + ChPid, CTag, Credit, Drain, Len) -> {Unblock, Credits2} = update_credit( CTag, Len, ChPid, Credit, Drain, Credits), - case Reply of - true -> rabbit_channel:send_credit_reply(ChPid, Len); - false -> ok - end, + rabbit_channel:send_credit_reply(ChPid, Len), {Unblock, Limiter#token{q_state = Credits2}}. forget_consumer(Limiter = #token{q_state = Credits}, CTag) -> |