diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-01-14 17:02:50 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-01-14 17:02:50 +0000 |
commit | 57663f28556439ab5ef9a15a35685732d92ea63a (patch) | |
tree | 904cba4e163e367d761e754d06689a0d559bf6f7 | |
parent | 903efe324ad46fa0978c0ed759eb03d83bb26542 (diff) | |
download | rabbitmq-server-57663f28556439ab5ef9a15a35685732d92ea63a.tar.gz |
Allow setting credit for a consumer tag that does not yet exist, since the adapter wants to do that. We leak if you set credit for a ctag and then don't consume... but the adapter never does that.
-rw-r--r-- | src/rabbit_channel.erl | 31 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 13 |
2 files changed, 31 insertions, 13 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index c3a5b16d..2bc2db83 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -38,7 +38,7 @@ queue_names, queue_monitors, consumer_mapping, blocking, queue_consumers, delivering_queues, queue_collector_pid, stats_timer, confirm_enabled, publish_seqno, - unconfirmed, confirmed, capabilities, trace_state}). + unconfirmed, confirmed, capabilities, trace_state, credit_map}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -209,7 +209,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, unconfirmed = dtree:empty(), confirmed = [], capabilities = Capabilities, - trace_state = rabbit_trace:init(VHost)}, + trace_state = rabbit_trace:init(VHost), + credit_map = dict:new()}, State1 = rabbit_event:init_stats_timer(State, #ch.stats_timer), rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State1)), rabbit_event:if_enabled(State1, #ch.stats_timer, @@ -684,7 +685,8 @@ handle_method(#'basic.consume'{queue = QueueNameBin, nowait = NoWait}, _, State = #ch{conn_pid = ConnPid, limiter = Limiter, - consumer_mapping = ConsumerMapping}) -> + consumer_mapping = ConsumerMapping, + credit_map = CreditMap}) -> case dict:find(ConsumerTag, ConsumerMapping) of error -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), @@ -702,6 +704,15 @@ handle_method(#'basic.consume'{queue = QueueNameBin, case rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ConnPid, fun (Q) -> + case dict:find(ActualConsumerTag, CreditMap) of + {ok, {Credit, Count, Drain}} -> + ok = rabbit_amqqueue:inform_limiter( + self(), Q#amqqueue.pid, + {basic_credit, ActualConsumerTag, + Credit, Count, Drain, false}); + error -> + ok + end, {rabbit_amqqueue:basic_consume( Q, NoAck, self(), Limiter, ActualConsumerTag, ExclusiveConsume, @@ -711,9 +722,11 @@ handle_method(#'basic.consume'{queue = QueueNameBin, end) of {ok, Q = #amqqueue{pid = QPid, name = QName}} -> CM1 = dict:store(ActualConsumerTag, Q, ConsumerMapping), + CrM1 = dict:erase(ActualConsumerTag, CreditMap), State1 = monitor_delivering_queue( NoAck, QPid, QName, - State#ch{consumer_mapping = CM1}), + State#ch{consumer_mapping = CM1, + credit_map = CrM1}), {noreply, case NoWait of true -> consumer_monitor(ActualConsumerTag, State1); @@ -1083,14 +1096,16 @@ handle_method(#'basic.credit'{consumer_tag = CTag, credit = Credit, count = Count, drain = Drain}, _, - State = #ch{consumer_mapping = Consumers}) -> + State = #ch{consumer_mapping = Consumers, + credit_map = CMap}) -> case dict:find(CTag, Consumers) of {ok, Q} -> ok = rabbit_amqqueue:inform_limiter( self(), Q#amqqueue.pid, - {basic_credit, CTag, Credit, Count, Drain}), + {basic_credit, CTag, Credit, Count, Drain, true}), {noreply, State}; - error -> rabbit_misc:protocol_error( - not_allowed, "unknown consumer tag '~s'", [CTag]) + error -> CMap2 = dict:store(CTag, {Credit, Count, Drain}, CMap), + {reply, #'basic.credit_ok'{available = 0}, + State#ch{credit_map = CMap2}} end; handle_method(_MethodRecord, _Content, _State) -> diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 39879063..900fbec3 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -136,9 +136,13 @@ is_blocked(Limiter) -> maybe_call(Limiter, is_blocked, false). inform(Limiter = #token{q_state = Credits}, - ChPid, Len, {basic_credit, CTag, Credit, Count, Drain}) -> - {Unblock, Credits2} = - update_credit(CTag, Len, ChPid, Credit, Count, Drain, Credits), + ChPid, Len, {basic_credit, CTag, Credit, Count, Drain, Reply}) -> + {Unblock, Credits2} = update_credit(CTag, Credit, Count, Drain, Credits), + case Reply of + true -> rabbit_channel:send_command( + ChPid, #'basic.credit_ok'{available = Len}); + false -> ok + end, {Unblock, Limiter#token{q_state = Credits2}}. forget_consumer(Limiter = #token{q_state = Credits}, CTag) -> @@ -188,7 +192,7 @@ send_drained(ChPid, CTag, Count) -> %% Update the credit state. %% TODO Edge case: if the queue has nothing in it, and drain is set, %% we want to send a basic.credit back. -update_credit(CTag, Len, ChPid, Credit, Count0, Drain, Credits) -> +update_credit(CTag, Credit, Count0, Drain, Credits) -> Count = case dict:find(CTag, Credits) of %% Use our count if we can, more accurate @@ -196,7 +200,6 @@ update_credit(CTag, Len, ChPid, Credit, Count0, Drain, Credits) -> %% But if this is new, take it from the adapter _ -> Count0 end, - rabbit_channel:send_command(ChPid, #'basic.credit_ok'{available = Len}), NewCredits = write_credit(CTag, Credit, Count, Drain, Credits), case Credit > 0 of true -> {[CTag], NewCredits}; |