diff options
author | Michael Klishin <klishinm@vmware.com> | 2021-11-30 15:22:18 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-11-30 15:22:18 +0300 |
commit | 051e504f8eeb6a856c983586f271cdede1452670 (patch) | |
tree | 15d74100ec90e682ecd2c66948e7312099496ca3 | |
parent | c9109e0dc1cf6e846a9156f089d9a4857f9be767 (diff) | |
parent | 50160a0232450735f97bea22d5310c0a470e61c2 (diff) | |
download | rabbitmq-server-git-051e504f8eeb6a856c983586f271cdede1452670.tar.gz |
Merge pull request #3805 from tomyouyou/deativate_limit_of_qpid
To deactivate limit of all QPids when the limiter has been changed from 'limit' to 'unlimit'.
-rw-r--r-- | deps/rabbit/src/rabbit_amqqueue.erl | 9 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_amqqueue_process.erl | 4 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_channel.erl | 10 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_queue_consumers.erl | 9 |
4 files changed, 32 insertions, 0 deletions
diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl index a10b30e0a5..91f4a3f130 100644 --- a/deps/rabbit/src/rabbit_amqqueue.erl +++ b/deps/rabbit/src/rabbit_amqqueue.erl @@ -64,6 +64,8 @@ -export([check_max_age/1]). -export([get_queue_type/1, get_resource_vhost_name/1, get_resource_name/1]). +-export([deactivate_limit_all/2]). + %% internal -export([internal_declare/2, internal_delete/2, run_backing_queue/3, set_ram_duration_target/2, set_maximum_since_use/2, @@ -1618,6 +1620,13 @@ activate_limit_all(QRefs, ChPid) -> delegate:invoke_no_result(QPids, {gen_server2, cast, [{activate_limit, ChPid}]}). +-spec deactivate_limit_all(qpids(), pid()) -> ok. + +deactivate_limit_all(QRefs, ChPid) -> + QPids = [P || P <- QRefs, ?IS_CLASSIC(P)], + delegate:invoke_no_result(QPids, {gen_server2, cast, + [{deactivate_limit, ChPid}]}). + -spec credit(amqqueue:amqqueue(), rabbit_types:ctag(), non_neg_integer(), diff --git a/deps/rabbit/src/rabbit_amqqueue_process.erl b/deps/rabbit/src/rabbit_amqqueue_process.erl index d8b29d1d32..a9d673d65d 100644 --- a/deps/rabbit/src/rabbit_amqqueue_process.erl +++ b/deps/rabbit/src/rabbit_amqqueue_process.erl @@ -1582,6 +1582,10 @@ handle_cast({activate_limit, ChPid}, State) -> noreply(possibly_unblock(rabbit_queue_consumers:activate_limit_fun(), ChPid, State)); +handle_cast({deactivate_limit, ChPid}, State) -> + noreply(possibly_unblock(rabbit_queue_consumers:deactivate_limit_fun(), + ChPid, State)); + handle_cast({set_ram_duration_target, Duration}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> BQS1 = BQ:set_ram_duration_target(Duration, BQS), diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 37e8fd21ae..7fee29f2c3 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -1546,6 +1546,11 @@ handle_method(#'basic.qos'{global = false, limiter = Limiter}) -> %% Ensures that if default was set, it's overridden Limiter1 = rabbit_limiter:unlimit_prefetch(Limiter), + case rabbit_limiter:is_active(Limiter) of + true -> rabbit_amqqueue:deactivate_limit_all( + classic_consumer_queue_pids(State#ch.consumer_mapping), self()); + false -> ok + end, {reply, #'basic.qos_ok'{}, State#ch{cfg = Cfg#conf{consumer_prefetch = PrefetchCount}, limiter = Limiter1}}; @@ -1553,6 +1558,11 @@ handle_method(#'basic.qos'{global = true, prefetch_count = 0}, _, State = #ch{limiter = Limiter}) -> Limiter1 = rabbit_limiter:unlimit_prefetch(Limiter), + case rabbit_limiter:is_active(Limiter) of + true -> rabbit_amqqueue:deactivate_limit_all( + classic_consumer_queue_pids(State#ch.consumer_mapping), self()); + false -> ok + end, {reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter1}}; handle_method(#'basic.qos'{global = true, diff --git a/deps/rabbit/src/rabbit_queue_consumers.erl b/deps/rabbit/src/rabbit_queue_consumers.erl index 83538bd5f5..39c759aa5e 100644 --- a/deps/rabbit/src/rabbit_queue_consumers.erl +++ b/deps/rabbit/src/rabbit_queue_consumers.erl @@ -15,6 +15,8 @@ credit/6, utilisation/1, capacity/1, is_same/3, get_consumer/1, get/3, consumer_tag/1, get_infos/1]). +-export([deactivate_limit_fun/0]). + %%---------------------------------------------------------------------------- -define(QUEUE, lqueue). @@ -385,6 +387,13 @@ activate_limit_fun() -> C#cr{limiter = rabbit_limiter:activate(Limiter)} end. +-spec deactivate_limit_fun() -> cr_fun(). + +deactivate_limit_fun() -> + fun (C = #cr{limiter = Limiter}) -> + C#cr{limiter = rabbit_limiter:deactivate(Limiter)} + end. + -spec credit(boolean(), integer(), boolean(), ch(), rabbit_types:ctag(), state()) -> 'unchanged' | {'unblocked', state()}. |