summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <klishinm@vmware.com>2021-11-30 15:22:18 +0300
committerGitHub <noreply@github.com>2021-11-30 15:22:18 +0300
commit051e504f8eeb6a856c983586f271cdede1452670 (patch)
tree15d74100ec90e682ecd2c66948e7312099496ca3
parentc9109e0dc1cf6e846a9156f089d9a4857f9be767 (diff)
parent50160a0232450735f97bea22d5310c0a470e61c2 (diff)
downloadrabbitmq-server-git-051e504f8eeb6a856c983586f271cdede1452670.tar.gz
Merge pull request #3805 from tomyouyou/deativate_limit_of_qpid
To deactivate limit of all QPids when the limiter has been changed from 'limit' to 'unlimit'.
-rw-r--r--deps/rabbit/src/rabbit_amqqueue.erl9
-rw-r--r--deps/rabbit/src/rabbit_amqqueue_process.erl4
-rw-r--r--deps/rabbit/src/rabbit_channel.erl10
-rw-r--r--deps/rabbit/src/rabbit_queue_consumers.erl9
4 files changed, 32 insertions, 0 deletions
diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl
index a10b30e0a5..91f4a3f130 100644
--- a/deps/rabbit/src/rabbit_amqqueue.erl
+++ b/deps/rabbit/src/rabbit_amqqueue.erl
@@ -64,6 +64,8 @@
-export([check_max_age/1]).
-export([get_queue_type/1, get_resource_vhost_name/1, get_resource_name/1]).
+-export([deactivate_limit_all/2]).
+
%% internal
-export([internal_declare/2, internal_delete/2, run_backing_queue/3,
set_ram_duration_target/2, set_maximum_since_use/2,
@@ -1618,6 +1620,13 @@ activate_limit_all(QRefs, ChPid) ->
delegate:invoke_no_result(QPids, {gen_server2, cast,
[{activate_limit, ChPid}]}).
+-spec deactivate_limit_all(qpids(), pid()) -> ok.
+
+deactivate_limit_all(QRefs, ChPid) ->
+ QPids = [P || P <- QRefs, ?IS_CLASSIC(P)],
+ delegate:invoke_no_result(QPids, {gen_server2, cast,
+ [{deactivate_limit, ChPid}]}).
+
-spec credit(amqqueue:amqqueue(),
rabbit_types:ctag(),
non_neg_integer(),
diff --git a/deps/rabbit/src/rabbit_amqqueue_process.erl b/deps/rabbit/src/rabbit_amqqueue_process.erl
index d8b29d1d32..a9d673d65d 100644
--- a/deps/rabbit/src/rabbit_amqqueue_process.erl
+++ b/deps/rabbit/src/rabbit_amqqueue_process.erl
@@ -1582,6 +1582,10 @@ handle_cast({activate_limit, ChPid}, State) ->
noreply(possibly_unblock(rabbit_queue_consumers:activate_limit_fun(),
ChPid, State));
+handle_cast({deactivate_limit, ChPid}, State) ->
+ noreply(possibly_unblock(rabbit_queue_consumers:deactivate_limit_fun(),
+ ChPid, State));
+
handle_cast({set_ram_duration_target, Duration},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
BQS1 = BQ:set_ram_duration_target(Duration, BQS),
diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl
index 37e8fd21ae..7fee29f2c3 100644
--- a/deps/rabbit/src/rabbit_channel.erl
+++ b/deps/rabbit/src/rabbit_channel.erl
@@ -1546,6 +1546,11 @@ handle_method(#'basic.qos'{global = false,
limiter = Limiter}) ->
%% Ensures that if default was set, it's overridden
Limiter1 = rabbit_limiter:unlimit_prefetch(Limiter),
+ case rabbit_limiter:is_active(Limiter) of
+ true -> rabbit_amqqueue:deactivate_limit_all(
+ classic_consumer_queue_pids(State#ch.consumer_mapping), self());
+ false -> ok
+ end,
{reply, #'basic.qos_ok'{}, State#ch{cfg = Cfg#conf{consumer_prefetch = PrefetchCount},
limiter = Limiter1}};
@@ -1553,6 +1558,11 @@ handle_method(#'basic.qos'{global = true,
prefetch_count = 0},
_, State = #ch{limiter = Limiter}) ->
Limiter1 = rabbit_limiter:unlimit_prefetch(Limiter),
+ case rabbit_limiter:is_active(Limiter) of
+ true -> rabbit_amqqueue:deactivate_limit_all(
+ classic_consumer_queue_pids(State#ch.consumer_mapping), self());
+ false -> ok
+ end,
{reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter1}};
handle_method(#'basic.qos'{global = true,
diff --git a/deps/rabbit/src/rabbit_queue_consumers.erl b/deps/rabbit/src/rabbit_queue_consumers.erl
index 83538bd5f5..39c759aa5e 100644
--- a/deps/rabbit/src/rabbit_queue_consumers.erl
+++ b/deps/rabbit/src/rabbit_queue_consumers.erl
@@ -15,6 +15,8 @@
credit/6, utilisation/1, capacity/1, is_same/3, get_consumer/1, get/3,
consumer_tag/1, get_infos/1]).
+-export([deactivate_limit_fun/0]).
+
%%----------------------------------------------------------------------------
-define(QUEUE, lqueue).
@@ -385,6 +387,13 @@ activate_limit_fun() ->
C#cr{limiter = rabbit_limiter:activate(Limiter)}
end.
+-spec deactivate_limit_fun() -> cr_fun().
+
+deactivate_limit_fun() ->
+ fun (C = #cr{limiter = Limiter}) ->
+ C#cr{limiter = rabbit_limiter:deactivate(Limiter)}
+ end.
+
-spec credit(boolean(), integer(), boolean(), ch(), rabbit_types:ctag(),
state()) -> 'unchanged' | {'unblocked', state()}.