summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <klishinm@vmware.com>2023-03-14 12:51:59 +0400
committerGitHub <noreply@github.com>2023-03-14 12:51:59 +0400
commit9f6ea7bf1edcb4a1f11444730a92597c41412940 (patch)
tree0cbab0a50813ba927db5b35454e0fe5853dd179c
parent1100e70dc76d1feef4581949fed3f24c5d10a186 (diff)
parent0a3136a916f56d2650f7e90d27af3d4bc4556f44 (diff)
downloadrabbitmq-server-git-9f6ea7bf1edcb4a1f11444730a92597c41412940.tar.gz
Merge pull request #7601 from rabbitmq/policy-by-queue-type
Allow applying policies to specific queue types
-rw-r--r--deps/rabbit/docs/rabbitmqctl.88
-rw-r--r--deps/rabbit/src/rabbit_policy.erl20
-rw-r--r--deps/rabbit/test/policy_SUITE.erl47
-rw-r--r--deps/rabbitmq_management/priv/www/js/tmpl/policies.ejs6
-rw-r--r--release-notes/3.12.0.md5
5 files changed, 77 insertions, 9 deletions
diff --git a/deps/rabbit/docs/rabbitmqctl.8 b/deps/rabbit/docs/rabbitmqctl.8
index b5b2d7eabc..3c93c1ae55 100644
--- a/deps/rabbit/docs/rabbitmqctl.8
+++ b/deps/rabbit/docs/rabbitmqctl.8
@@ -1724,7 +1724,13 @@ Which types of object this policy should apply to.
Possible values are:
.Bl -bullet -compact
.It
-queues
+queues (all queue types, including streams)
+.It
+classic_queues (classic queues only)
+.It
+quorum_queues (quorum queues only)
+.It
+streams (streams only)
.It
exchanges
.It
diff --git a/deps/rabbit/src/rabbit_policy.erl b/deps/rabbit/src/rabbit_policy.erl
index bdb96468d9..f9b183c49b 100644
--- a/deps/rabbit/src/rabbit_policy.erl
+++ b/deps/rabbit/src/rabbit_policy.erl
@@ -193,8 +193,8 @@ match_all(NameOrQueue, Policies) ->
lists:sort(fun priority_comparator/2, [P || P <- Policies, matches(NameOrQueue, P)]).
matches(Q, Policy) when ?is_amqqueue(Q) ->
- #resource{name = Name, kind = Kind, virtual_host = VHost} = amqqueue:get_name(Q),
- matches_type(Kind, pget('apply-to', Policy)) andalso
+ #resource{name = Name, virtual_host = VHost} = amqqueue:get_name(Q),
+ matches_queue_type(queue, amqqueue:get_type(Q), pget('apply-to', Policy)) andalso
is_applicable(Q, pget(definition, Policy)) andalso
match =:= re:run(Name, pget(pattern, Policy), [{capture, none}]) andalso
VHost =:= pget(vhost, Policy);
@@ -517,11 +517,16 @@ maybe_notify_of_policy_change({Q1, Q2}, PolicyDef, ActingUser) when ?is_amqqueue
rabbit_amqqueue:policy_changed(Q1, Q2).
matches_type(exchange, <<"exchanges">>) -> true;
-matches_type(queue, <<"queues">>) -> true;
matches_type(exchange, <<"all">>) -> true;
-matches_type(queue, <<"all">>) -> true;
matches_type(_, _) -> false.
+matches_queue_type(queue, _, <<"all">>) -> true;
+matches_queue_type(queue, _, <<"queues">>) -> true;
+matches_queue_type(queue, rabbit_classic_queue, <<"classic_queues">>) -> true;
+matches_queue_type(queue, rabbit_quorum_queue, <<"quorum_queues">>) -> true;
+matches_queue_type(queue, rabbit_stream_queue, <<"streams">>) -> true;
+matches_queue_type(queue, _, _) -> false.
+
priority_comparator(A, B) -> pget(priority, A) >= pget(priority, B).
is_applicable(Q, Policy) when ?is_amqqueue(Q) ->
@@ -602,6 +607,9 @@ is_proplist(L) -> length(L) =:= length([I || I = {_, _} <- L]).
apply_to_validation(_Name, <<"all">>) -> ok;
apply_to_validation(_Name, <<"exchanges">>) -> ok;
apply_to_validation(_Name, <<"queues">>) -> ok;
+apply_to_validation(_Name, <<"classic_queues">>) -> ok;
+apply_to_validation(_Name, <<"quorum_queues">>) -> ok;
+apply_to_validation(_Name, <<"streams">>) -> ok;
apply_to_validation(_Name, Term) ->
- {error, "apply-to '~ts' unrecognised; should be 'queues', 'exchanges' "
- "or 'all'", [Term]}.
+ {error, "apply-to '~ts' unrecognised; should be one of: 'queues', 'classic_queues', "
+ " 'quorum_queues', 'streams', 'exchanges', or 'all'", [Term]}.
diff --git a/deps/rabbit/test/policy_SUITE.erl b/deps/rabbit/test/policy_SUITE.erl
index 26457c4389..c638aaedd9 100644
--- a/deps/rabbit/test/policy_SUITE.erl
+++ b/deps/rabbit/test/policy_SUITE.erl
@@ -9,7 +9,7 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-
+-include_lib("stdlib/include/assert.hrl").
-compile(export_all).
@@ -25,7 +25,8 @@ groups() ->
policy_ttl,
operator_policy_ttl,
operator_retroactive_policy_ttl,
- operator_retroactive_policy_publish_ttl
+ operator_retroactive_policy_publish_ttl,
+ queue_type_specific_policies
]}
].
@@ -203,6 +204,43 @@ target_count_policy(Config) ->
rabbit_ct_client_helpers:close_connection(Conn),
passed.
+queue_type_specific_policies(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ ClassicQ = <<"policy_ttl-classic_queue">>,
+ QuorumQ = <<"policy_ttl-quorum_queue">>,
+ StreamQ = <<"policy_ttl-stream_queue">>,
+
+ %% all policies match ".*" but different values should be applied based on queue type
+ rabbit_ct_broker_helpers:set_policy(Config, 0, <<"ttl-policy-classic">>,
+ <<".*">>, <<"classic_queues">>, [{<<"message-ttl">>, 20}]),
+
+ rabbit_ct_broker_helpers:set_policy(Config, 0, <<"ttl-policy-quorum">>,
+ <<".*">>, <<"quorum_queues">>, [{<<"message-ttl">>, 40}]),
+
+ rabbit_ct_broker_helpers:set_policy(Config, 0, <<"ttl-policy-stream">>,
+ <<".*">>, <<"streams">>, [{<<"max-age">>, "1h"}]),
+
+ declare(Ch, ClassicQ, [{<<"x-queue-type">>, longstr, <<"classic">>}]),
+ declare(Ch, QuorumQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}]),
+ declare(Ch, StreamQ, [{<<"x-queue-type">>, longstr, <<"stream">>}]),
+ timer:sleep(1),
+
+ ?assertMatch(20, check_policy_value(Server, ClassicQ, <<"message-ttl">>)),
+ ?assertMatch(40, check_policy_value(Server, QuorumQ, <<"message-ttl">>)),
+ ?assertMatch("1h", check_policy_value(Server, StreamQ, <<"max-age">>)),
+
+ delete(Ch, ClassicQ),
+ delete(Ch, QuorumQ),
+ delete(Ch, StreamQ),
+ rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"ttl-policy-classic">>),
+ rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"ttl-policy-quorum">>),
+ rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"ttl-policy-stream">>),
+
+ rabbit_ct_client_helpers:close_channel(Ch),
+ rabbit_ct_client_helpers:close_connection(Conn),
+ passed.
+
%%----------------------------------------------------------------------------
@@ -211,6 +249,11 @@ declare(Ch, Q) ->
amqp_channel:call(Ch, #'queue.declare'{queue = Q,
durable = true}).
+declare(Ch, Q, Args) ->
+ amqp_channel:call(Ch, #'queue.declare'{queue = Q,
+ durable = true,
+ arguments = Args}).
+
delete(Ch, Q) ->
amqp_channel:call(Ch, #'queue.delete'{queue = Q}).
diff --git a/deps/rabbitmq_management/priv/www/js/tmpl/policies.ejs b/deps/rabbitmq_management/priv/www/js/tmpl/policies.ejs
index f2a322b458..0bd2b2d297 100644
--- a/deps/rabbitmq_management/priv/www/js/tmpl/policies.ejs
+++ b/deps/rabbitmq_management/priv/www/js/tmpl/policies.ejs
@@ -83,6 +83,9 @@
<option value="all">Exchanges and queues</option>
<option value="exchanges">Exchanges</option>
<option value="queues">Queues</option>
+ <option value="classic_queues">Classic Queues</option>
+ <option value="quorum_queues">Quorum Queues</option>
+ <option value="streams">Streams</option>
</select>
</td>
</tr>
@@ -259,6 +262,9 @@
<td>
<select name="apply-to">
<option value="queues">Queues</option>
+ <option value="classic_queues">Classic Queues</option>
+ <option value="quorum_queues">Quorum Queues</option>
+ <option value="streams">Streams</option>
</select>
</td>
</tr>
diff --git a/release-notes/3.12.0.md b/release-notes/3.12.0.md
index 39d5db013c..76bef1f56b 100644
--- a/release-notes/3.12.0.md
+++ b/release-notes/3.12.0.md
@@ -146,6 +146,11 @@ in the `3.11.x` release series.
GitHub issue: [#7208](https://github.com/rabbitmq/rabbitmq-server/issues/7208).
+ * Policies can now be defined to only apply to specific queue types. For example, you can have two policies that match all queue names ('.*')
+ but different queue types, so that different parameters are applied to all queues of a specific type. Example usage:
+ ```
+ rabbitmqctl set_policy at-least-once-dead-lettering ".*" '{"dead-letter-strategy": "at-least-once"}' --apply-to quorum_queues
+ ```
### CLI Tools