diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-01-14 16:00:38 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-01-14 16:00:38 +0000 |
commit | 903efe324ad46fa0978c0ed759eb03d83bb26542 (patch) | |
tree | 6f38afd4dfd38408b1d17d227531781201ea5a68 | |
parent | e4502d77d2aec095f9983a1cb03d0ec7472612f1 (diff) | |
download | rabbitmq-server-903efe324ad46fa0978c0ed759eb03d83bb26542.tar.gz |
TODO--
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 6 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 7 |
2 files changed, 8 insertions, 5 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index f33ce920..47bc1641 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1152,10 +1152,12 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From, case lookup_ch(ChPid) of not_found -> reply(ok, State); - C = #cr{blocked_consumers = Blocked} -> + C = #cr{limiter = Limiter, blocked_consumers = Blocked} -> emit_consumer_deleted(ChPid, ConsumerTag, qname(State)), + Limiter1 = rabbit_limiter:forget_consumer(Limiter, ConsumerTag), Blocked1 = remove_consumer(ChPid, ConsumerTag, Blocked), - update_consumer_count(C#cr{blocked_consumers = Blocked1}, -1), + update_consumer_count(C#cr{limiter = Limiter1, + blocked_consumers = Blocked1}, -1), State1 = State#q{ exclusive_consumer = case Holder of {ChPid, ConsumerTag} -> none; diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 9da1bc6f..39879063 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -27,7 +27,7 @@ -export([limit/2, can_ch_send/3, can_cons_send/4, ack/2, register/2, unregister/2]). -export([get_limit/1, block/1, unblock/1, is_blocked/1]). --export([inform/4]). +-export([inform/4, forget_consumer/2]). -import(rabbit_misc, [serial_add/2, serial_diff/2]). @@ -141,6 +141,9 @@ inform(Limiter = #token{q_state = Credits}, update_credit(CTag, Len, ChPid, Credit, Count, Drain, Credits), {Unblock, Limiter#token{q_state = Credits2}}. +forget_consumer(Limiter = #token{q_state = Credits}, CTag) -> + Limiter#token{q_state = dict:erase(CTag, Credits)}. + %%---------------------------------------------------------------------------- %% Queue-local code %%---------------------------------------------------------------------------- @@ -200,8 +203,6 @@ update_credit(CTag, Len, ChPid, Credit, Count0, Drain, Credits) -> false -> {[], NewCredits} end. -%% TODO currently we leak when a single session creates and destroys -%% lot of links. write_credit(CTag, Credit, Count, Drain, Credits) -> dict:store(CTag, #credit{credit = Credit, count = Count, |