summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-01-14 16:00:38 +0000
committerSimon MacMullen <simon@rabbitmq.com>2013-01-14 16:00:38 +0000
commit903efe324ad46fa0978c0ed759eb03d83bb26542 (patch)
tree6f38afd4dfd38408b1d17d227531781201ea5a68
parente4502d77d2aec095f9983a1cb03d0ec7472612f1 (diff)
downloadrabbitmq-server-903efe324ad46fa0978c0ed759eb03d83bb26542.tar.gz
TODO--
-rw-r--r--src/rabbit_amqqueue_process.erl6
-rw-r--r--src/rabbit_limiter.erl7
2 files changed, 8 insertions, 5 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index f33ce920..47bc1641 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1152,10 +1152,12 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,
case lookup_ch(ChPid) of
not_found ->
reply(ok, State);
- C = #cr{blocked_consumers = Blocked} ->
+ C = #cr{limiter = Limiter, blocked_consumers = Blocked} ->
emit_consumer_deleted(ChPid, ConsumerTag, qname(State)),
+ Limiter1 = rabbit_limiter:forget_consumer(Limiter, ConsumerTag),
Blocked1 = remove_consumer(ChPid, ConsumerTag, Blocked),
- update_consumer_count(C#cr{blocked_consumers = Blocked1}, -1),
+ update_consumer_count(C#cr{limiter = Limiter1,
+ blocked_consumers = Blocked1}, -1),
State1 = State#q{
exclusive_consumer = case Holder of
{ChPid, ConsumerTag} -> none;
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 9da1bc6f..39879063 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -27,7 +27,7 @@
-export([limit/2, can_ch_send/3, can_cons_send/4,
ack/2, register/2, unregister/2]).
-export([get_limit/1, block/1, unblock/1, is_blocked/1]).
--export([inform/4]).
+-export([inform/4, forget_consumer/2]).
-import(rabbit_misc, [serial_add/2, serial_diff/2]).
@@ -141,6 +141,9 @@ inform(Limiter = #token{q_state = Credits},
update_credit(CTag, Len, ChPid, Credit, Count, Drain, Credits),
{Unblock, Limiter#token{q_state = Credits2}}.
+forget_consumer(Limiter = #token{q_state = Credits}, CTag) ->
+ Limiter#token{q_state = dict:erase(CTag, Credits)}.
+
%%----------------------------------------------------------------------------
%% Queue-local code
%%----------------------------------------------------------------------------
@@ -200,8 +203,6 @@ update_credit(CTag, Len, ChPid, Credit, Count0, Drain, Credits) ->
false -> {[], NewCredits}
end.
-%% TODO currently we leak when a single session creates and destroys
-%% lot of links.
write_credit(CTag, Credit, Count, Drain, Credits) ->
dict:store(CTag, #credit{credit = Credit,
count = Count,