summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-01-29 16:06:46 +0000
committerSimon MacMullen <simon@rabbitmq.com>2013-01-29 16:06:46 +0000
commit39ac6385e1e3b0d215349ec3dc3476eda9432c7d (patch)
tree75623f6cd0c78ec587b95c4c156ee4d69a11f226
parent5727b84d107c4116169d3798855c015fe531630e (diff)
downloadrabbitmq-server-39ac6385e1e3b0d215349ec3dc3476eda9432c7d.tar.gz
rabbit_limiter:initial_credit/6.
-rw-r--r--src/rabbit_amqqueue.erl23
-rw-r--r--src/rabbit_amqqueue_process.erl20
-rw-r--r--src/rabbit_channel.erl17
-rw-r--r--src/rabbit_limiter.erl22
4 files changed, 48 insertions, 34 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index b9d41c25..3673d06e 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -26,9 +26,9 @@
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([force_event_refresh/0, wake_up/1]).
-export([consumers/1, consumers_all/1, consumer_info_keys/0]).
--export([basic_get/3, basic_consume/7, basic_cancel/4]).
+-export([basic_get/3, basic_consume/8, basic_cancel/4]).
-export([notify_sent/2, notify_sent_queue_down/1, unblock/2, flush_all/2]).
--export([notify_down_all/2, limit_all/3, credit/6]).
+-export([notify_down_all/2, limit_all/3, credit/5]).
-export([on_node_down/1]).
-export([update/2, store_queue/1, policy_changed/2]).
-export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1,
@@ -145,13 +145,14 @@
-spec(notify_down_all/2 :: (qpids(), pid()) -> ok_or_errors()).
-spec(limit_all/3 :: (qpids(), pid(), rabbit_limiter:token()) ->
ok_or_errors()).
--spec(credit/6 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(),
- non_neg_integer(), boolean(), boolean()) -> 'ok').
+-spec(credit/5 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(),
+ non_neg_integer(), boolean()) -> 'ok').
-spec(basic_get/3 :: (rabbit_types:amqqueue(), pid(), boolean()) ->
{'ok', non_neg_integer(), qmsg()} | 'empty').
--spec(basic_consume/7 ::
+-spec(basic_consume/8 ::
(rabbit_types:amqqueue(), boolean(), pid(),
- rabbit_limiter:token(), rabbit_types:ctag(), boolean(), any())
+ rabbit_limiter:token(), rabbit_types:ctag(), boolean(),
+ {non_neg_integer(), boolean()} | 'none', any())
-> rabbit_types:ok_or_error('exclusive_consume_unavailable')).
-spec(basic_cancel/4 ::
(rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok').
@@ -534,16 +535,16 @@ notify_down_all(QPids, ChPid) ->
limit_all(QPids, ChPid, Limiter) ->
delegate:cast(QPids, {limit, ChPid, Limiter}).
-credit(#amqqueue{pid = QPid}, ChPid, CTag, Credit, Drain, Reply) ->
- delegate:cast(QPid, {credit, ChPid, CTag, Credit, Drain, Reply}).
+credit(#amqqueue{pid = QPid}, ChPid, CTag, Credit, Drain) ->
+ delegate:cast(QPid, {credit, ChPid, CTag, Credit, Drain}).
basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) ->
delegate:call(QPid, {basic_get, ChPid, NoAck}).
basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, Limiter,
- ConsumerTag, ExclusiveConsume, OkMsg) ->
- delegate:call(QPid, {basic_consume, NoAck, ChPid,
- Limiter, ConsumerTag, ExclusiveConsume, OkMsg}).
+ ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg) ->
+ delegate:call(QPid, {basic_consume, NoAck, ChPid, Limiter,
+ ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg}).
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
delegate:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 5c1b68f4..0594e250 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1096,14 +1096,24 @@ handle_call({basic_get, ChPid, NoAck}, _From,
end;
handle_call({basic_consume, NoAck, ChPid, Limiter,
- ConsumerTag, ExclusiveConsume, OkMsg},
- _From, State = #q{exclusive_consumer = Holder}) ->
+ ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg},
+ _From, State = #q{exclusive_consumer = Holder,
+ backing_queue = BQ,
+ backing_queue_state = BQS}) ->
case check_exclusive_access(Holder, ExclusiveConsume, State) of
in_use ->
reply({error, exclusive_consume_unavailable}, State);
ok ->
C = ch_record(ChPid),
- C1 = update_consumer_count(C#cr{limiter = Limiter}, +1),
+ Limiter2 = case CreditArgs of
+ none ->
+ Limiter;
+ {Credit, Drain} ->
+ rabbit_limiter:initial_credit(
+ Limiter, ChPid, ConsumerTag, Credit, Drain,
+ BQ:len(BQS))
+ end,
+ C1 = update_consumer_count(C#cr{limiter = Limiter2}, +1),
Consumer = #consumer{tag = ConsumerTag,
ack_required = not NoAck},
ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag};
@@ -1326,13 +1336,13 @@ handle_cast(stop_mirroring, State = #q{backing_queue = BQ,
noreply(State#q{backing_queue = BQ1,
backing_queue_state = BQS1});
-handle_cast({credit, ChPid, CTag, Credit, Drain, Reply},
+handle_cast({credit, ChPid, CTag, Credit, Drain},
State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
#cr{limiter = Lim,
blocked_ctags = BCTags} = ch_record(ChPid),
{Unblock, Lim2} = rabbit_limiter:credit(
- Lim, ChPid, CTag, Credit, Drain, Reply, BQ:len(BQS)),
+ Lim, ChPid, CTag, Credit, Drain, BQ:len(BQS)),
noreply(possibly_unblock(
State, ChPid, fun(C) -> C#cr{blocked_ctags = BCTags -- Unblock,
limiter = Lim2} end));
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index f719d8f3..20976932 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -717,7 +717,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
no_ack = NoAck,
exclusive = ExclusiveConsume,
nowait = NoWait,
- arguments = Args},
+ arguments = Arguments},
_, State = #ch{conn_pid = ConnPid,
limiter = Limiter,
consumer_mapping = ConsumerMapping}) ->
@@ -738,10 +738,10 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
case rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ConnPid,
fun (Q) ->
- maybe_set_initial_credit(Args, ActualConsumerTag, Q),
{rabbit_amqqueue:basic_consume(
Q, NoAck, self(), Limiter,
ActualConsumerTag, ExclusiveConsume,
+ parse_credit_args(Arguments),
ok_msg(NoWait, #'basic.consume_ok'{
consumer_tag = ActualConsumerTag})),
Q}
@@ -1119,7 +1119,7 @@ handle_method(#'basic.credit'{consumer_tag = CTag,
State = #ch{consumer_mapping = Consumers}) ->
case dict:find(CTag, Consumers) of
{ok, Q} -> ok = rabbit_amqqueue:credit(
- Q, self(), CTag, Credit, Drain, true),
+ Q, self(), CTag, Credit, Drain),
{noreply, State};
error -> precondition_failed("unknown consumer tag '~s'", [CTag])
end;
@@ -1190,17 +1190,14 @@ handle_consuming_queue_down(QPid,
handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) ->
State#ch{delivering_queues = sets:del_element(QPid, DQ)}.
-maybe_set_initial_credit(Arguments, CTag, Q) ->
+parse_credit_args(Arguments) ->
case rabbit_misc:table_lookup(Arguments, <<"x-credit">>) of
{table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>),
rabbit_misc:table_lookup(T, <<"drain">>)} of
- {{long, Credit}, {boolean, Drain}} ->
- ok = rabbit_amqqueue:credit(
- Q, self(), CTag, Credit, Drain, false);
- _ ->
- ok
+ {{long, Credit}, {boolean, Drain}} -> {Credit, Drain};
+ _ -> none
end;
- undefined -> ok
+ undefined -> none
end.
binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 4796fc0e..865c4677 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -24,7 +24,7 @@
disable/1]).
-export([limit/2, can_send/6, ack/2, register/2, unregister/2]).
-export([get_limit/1, block/1, unblock/1, is_blocked/1]).
--export([credit/7, forget_consumer/2, copy_queue_state/2]).
+-export([initial_credit/6, credit/6, forget_consumer/2, copy_queue_state/2]).
-import(rabbit_misc, [serial_add/2, serial_diff/2]).
@@ -55,8 +55,11 @@
-spec(block/1 :: (token()) -> 'ok').
-spec(unblock/1 :: (token()) -> 'ok' | {'disabled', token()}).
-spec(is_blocked/1 :: (token()) -> boolean()).
--spec(credit/7 :: (token(), pid(), rabbit_types:ctag(),
- non_neg_integer(), boolean(), boolean(), non_neg_integer())
+-spec(initial_credit/6 :: (token(), pid(), rabbit_types:ctag(),
+ non_neg_integer(), boolean(), non_neg_integer())
+ -> token()).
+-spec(credit/6 :: (token(), pid(), rabbit_types:ctag(),
+ non_neg_integer(), boolean(), non_neg_integer())
-> {[rabbit_types:ctag()], token()}).
-spec(forget_consumer/2 :: (token(), rabbit_types:ctag()) -> token()).
-spec(copy_queue_state/2 :: (token(), token()) -> token()).
@@ -147,14 +150,17 @@ unblock(Limiter) ->
is_blocked(Limiter) ->
maybe_call(Limiter, is_blocked, false).
+initial_credit(Limiter = #token{q_state = Credits},
+ ChPid, CTag, Credit, Drain, Len) ->
+ {[], Credits2} = update_credit(
+ CTag, Len, ChPid, Credit, Drain, Credits),
+ Limiter#token{q_state = Credits2}.
+
credit(Limiter = #token{q_state = Credits},
- ChPid, CTag, Credit, Drain, Reply, Len) ->
+ ChPid, CTag, Credit, Drain, Len) ->
{Unblock, Credits2} = update_credit(
CTag, Len, ChPid, Credit, Drain, Credits),
- case Reply of
- true -> rabbit_channel:send_credit_reply(ChPid, Len);
- false -> ok
- end,
+ rabbit_channel:send_credit_reply(ChPid, Len),
{Unblock, Limiter#token{q_state = Credits2}}.
forget_consumer(Limiter = #token{q_state = Credits}, CTag) ->