summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-01-14 17:02:50 +0000
committerSimon MacMullen <simon@rabbitmq.com>2013-01-14 17:02:50 +0000
commit57663f28556439ab5ef9a15a35685732d92ea63a (patch)
tree904cba4e163e367d761e754d06689a0d559bf6f7
parent903efe324ad46fa0978c0ed759eb03d83bb26542 (diff)
downloadrabbitmq-server-57663f28556439ab5ef9a15a35685732d92ea63a.tar.gz
Allow setting credit for a consumer tag that does not yet exist, since the adapter wants to do that. We leak if you set credit for a ctag and then don't consume... but the adapter never does that.
-rw-r--r--src/rabbit_channel.erl31
-rw-r--r--src/rabbit_limiter.erl13
2 files changed, 31 insertions, 13 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index c3a5b16d..2bc2db83 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -38,7 +38,7 @@
queue_names, queue_monitors, consumer_mapping,
blocking, queue_consumers, delivering_queues,
queue_collector_pid, stats_timer, confirm_enabled, publish_seqno,
- unconfirmed, confirmed, capabilities, trace_state}).
+ unconfirmed, confirmed, capabilities, trace_state, credit_map}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -209,7 +209,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
unconfirmed = dtree:empty(),
confirmed = [],
capabilities = Capabilities,
- trace_state = rabbit_trace:init(VHost)},
+ trace_state = rabbit_trace:init(VHost),
+ credit_map = dict:new()},
State1 = rabbit_event:init_stats_timer(State, #ch.stats_timer),
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State1)),
rabbit_event:if_enabled(State1, #ch.stats_timer,
@@ -684,7 +685,8 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
nowait = NoWait},
_, State = #ch{conn_pid = ConnPid,
limiter = Limiter,
- consumer_mapping = ConsumerMapping}) ->
+ consumer_mapping = ConsumerMapping,
+ credit_map = CreditMap}) ->
case dict:find(ConsumerTag, ConsumerMapping) of
error ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
@@ -702,6 +704,15 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
case rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ConnPid,
fun (Q) ->
+ case dict:find(ActualConsumerTag, CreditMap) of
+ {ok, {Credit, Count, Drain}} ->
+ ok = rabbit_amqqueue:inform_limiter(
+ self(), Q#amqqueue.pid,
+ {basic_credit, ActualConsumerTag,
+ Credit, Count, Drain, false});
+ error ->
+ ok
+ end,
{rabbit_amqqueue:basic_consume(
Q, NoAck, self(), Limiter,
ActualConsumerTag, ExclusiveConsume,
@@ -711,9 +722,11 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
end) of
{ok, Q = #amqqueue{pid = QPid, name = QName}} ->
CM1 = dict:store(ActualConsumerTag, Q, ConsumerMapping),
+ CrM1 = dict:erase(ActualConsumerTag, CreditMap),
State1 = monitor_delivering_queue(
NoAck, QPid, QName,
- State#ch{consumer_mapping = CM1}),
+ State#ch{consumer_mapping = CM1,
+ credit_map = CrM1}),
{noreply,
case NoWait of
true -> consumer_monitor(ActualConsumerTag, State1);
@@ -1083,14 +1096,16 @@ handle_method(#'basic.credit'{consumer_tag = CTag,
credit = Credit,
count = Count,
drain = Drain}, _,
- State = #ch{consumer_mapping = Consumers}) ->
+ State = #ch{consumer_mapping = Consumers,
+ credit_map = CMap}) ->
case dict:find(CTag, Consumers) of
{ok, Q} -> ok = rabbit_amqqueue:inform_limiter(
self(), Q#amqqueue.pid,
- {basic_credit, CTag, Credit, Count, Drain}),
+ {basic_credit, CTag, Credit, Count, Drain, true}),
{noreply, State};
- error -> rabbit_misc:protocol_error(
- not_allowed, "unknown consumer tag '~s'", [CTag])
+ error -> CMap2 = dict:store(CTag, {Credit, Count, Drain}, CMap),
+ {reply, #'basic.credit_ok'{available = 0},
+ State#ch{credit_map = CMap2}}
end;
handle_method(_MethodRecord, _Content, _State) ->
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 39879063..900fbec3 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -136,9 +136,13 @@ is_blocked(Limiter) ->
maybe_call(Limiter, is_blocked, false).
inform(Limiter = #token{q_state = Credits},
- ChPid, Len, {basic_credit, CTag, Credit, Count, Drain}) ->
- {Unblock, Credits2} =
- update_credit(CTag, Len, ChPid, Credit, Count, Drain, Credits),
+ ChPid, Len, {basic_credit, CTag, Credit, Count, Drain, Reply}) ->
+ {Unblock, Credits2} = update_credit(CTag, Credit, Count, Drain, Credits),
+ case Reply of
+ true -> rabbit_channel:send_command(
+ ChPid, #'basic.credit_ok'{available = Len});
+ false -> ok
+ end,
{Unblock, Limiter#token{q_state = Credits2}}.
forget_consumer(Limiter = #token{q_state = Credits}, CTag) ->
@@ -188,7 +192,7 @@ send_drained(ChPid, CTag, Count) ->
%% Update the credit state.
%% TODO Edge case: if the queue has nothing in it, and drain is set,
%% we want to send a basic.credit back.
-update_credit(CTag, Len, ChPid, Credit, Count0, Drain, Credits) ->
+update_credit(CTag, Credit, Count0, Drain, Credits) ->
Count =
case dict:find(CTag, Credits) of
%% Use our count if we can, more accurate
@@ -196,7 +200,6 @@ update_credit(CTag, Len, ChPid, Credit, Count0, Drain, Credits) ->
%% But if this is new, take it from the adapter
_ -> Count0
end,
- rabbit_channel:send_command(ChPid, #'basic.credit_ok'{available = Len}),
NewCredits = write_credit(CTag, Credit, Count, Drain, Credits),
case Credit > 0 of
true -> {[CTag], NewCredits};