diff options
author | Michael Klishin <klishinm@vmware.com> | 2023-03-14 12:51:59 +0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-03-14 12:51:59 +0400 |
commit | 9f6ea7bf1edcb4a1f11444730a92597c41412940 (patch) | |
tree | 0cbab0a50813ba927db5b35454e0fe5853dd179c | |
parent | 1100e70dc76d1feef4581949fed3f24c5d10a186 (diff) | |
parent | 0a3136a916f56d2650f7e90d27af3d4bc4556f44 (diff) | |
download | rabbitmq-server-git-9f6ea7bf1edcb4a1f11444730a92597c41412940.tar.gz |
Merge pull request #7601 from rabbitmq/policy-by-queue-type
Allow applying policies to specific queue types
-rw-r--r-- | deps/rabbit/docs/rabbitmqctl.8 | 8 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_policy.erl | 20 | ||||
-rw-r--r-- | deps/rabbit/test/policy_SUITE.erl | 47 | ||||
-rw-r--r-- | deps/rabbitmq_management/priv/www/js/tmpl/policies.ejs | 6 | ||||
-rw-r--r-- | release-notes/3.12.0.md | 5 |
5 files changed, 77 insertions, 9 deletions
diff --git a/deps/rabbit/docs/rabbitmqctl.8 b/deps/rabbit/docs/rabbitmqctl.8 index b5b2d7eabc..3c93c1ae55 100644 --- a/deps/rabbit/docs/rabbitmqctl.8 +++ b/deps/rabbit/docs/rabbitmqctl.8 @@ -1724,7 +1724,13 @@ Which types of object this policy should apply to. Possible values are: .Bl -bullet -compact .It -queues +queues (all queue types, including streams) +.It +classic_queues (classic queues only) +.It +quorum_queues (quorum queues only) +.It +streams (streams only) .It exchanges .It diff --git a/deps/rabbit/src/rabbit_policy.erl b/deps/rabbit/src/rabbit_policy.erl index bdb96468d9..f9b183c49b 100644 --- a/deps/rabbit/src/rabbit_policy.erl +++ b/deps/rabbit/src/rabbit_policy.erl @@ -193,8 +193,8 @@ match_all(NameOrQueue, Policies) -> lists:sort(fun priority_comparator/2, [P || P <- Policies, matches(NameOrQueue, P)]). matches(Q, Policy) when ?is_amqqueue(Q) -> - #resource{name = Name, kind = Kind, virtual_host = VHost} = amqqueue:get_name(Q), - matches_type(Kind, pget('apply-to', Policy)) andalso + #resource{name = Name, virtual_host = VHost} = amqqueue:get_name(Q), + matches_queue_type(queue, amqqueue:get_type(Q), pget('apply-to', Policy)) andalso is_applicable(Q, pget(definition, Policy)) andalso match =:= re:run(Name, pget(pattern, Policy), [{capture, none}]) andalso VHost =:= pget(vhost, Policy); @@ -517,11 +517,16 @@ maybe_notify_of_policy_change({Q1, Q2}, PolicyDef, ActingUser) when ?is_amqqueue rabbit_amqqueue:policy_changed(Q1, Q2). matches_type(exchange, <<"exchanges">>) -> true; -matches_type(queue, <<"queues">>) -> true; matches_type(exchange, <<"all">>) -> true; -matches_type(queue, <<"all">>) -> true; matches_type(_, _) -> false. +matches_queue_type(queue, _, <<"all">>) -> true; +matches_queue_type(queue, _, <<"queues">>) -> true; +matches_queue_type(queue, rabbit_classic_queue, <<"classic_queues">>) -> true; +matches_queue_type(queue, rabbit_quorum_queue, <<"quorum_queues">>) -> true; +matches_queue_type(queue, rabbit_stream_queue, <<"streams">>) -> true; +matches_queue_type(queue, _, _) -> false. + priority_comparator(A, B) -> pget(priority, A) >= pget(priority, B). is_applicable(Q, Policy) when ?is_amqqueue(Q) -> @@ -602,6 +607,9 @@ is_proplist(L) -> length(L) =:= length([I || I = {_, _} <- L]). apply_to_validation(_Name, <<"all">>) -> ok; apply_to_validation(_Name, <<"exchanges">>) -> ok; apply_to_validation(_Name, <<"queues">>) -> ok; +apply_to_validation(_Name, <<"classic_queues">>) -> ok; +apply_to_validation(_Name, <<"quorum_queues">>) -> ok; +apply_to_validation(_Name, <<"streams">>) -> ok; apply_to_validation(_Name, Term) -> - {error, "apply-to '~ts' unrecognised; should be 'queues', 'exchanges' " - "or 'all'", [Term]}. + {error, "apply-to '~ts' unrecognised; should be one of: 'queues', 'classic_queues', " + " 'quorum_queues', 'streams', 'exchanges', or 'all'", [Term]}. diff --git a/deps/rabbit/test/policy_SUITE.erl b/deps/rabbit/test/policy_SUITE.erl index 26457c4389..c638aaedd9 100644 --- a/deps/rabbit/test/policy_SUITE.erl +++ b/deps/rabbit/test/policy_SUITE.erl @@ -9,7 +9,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). - +-include_lib("stdlib/include/assert.hrl"). -compile(export_all). @@ -25,7 +25,8 @@ groups() -> policy_ttl, operator_policy_ttl, operator_retroactive_policy_ttl, - operator_retroactive_policy_publish_ttl + operator_retroactive_policy_publish_ttl, + queue_type_specific_policies ]} ]. @@ -203,6 +204,43 @@ target_count_policy(Config) -> rabbit_ct_client_helpers:close_connection(Conn), passed. +queue_type_specific_policies(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + ClassicQ = <<"policy_ttl-classic_queue">>, + QuorumQ = <<"policy_ttl-quorum_queue">>, + StreamQ = <<"policy_ttl-stream_queue">>, + + %% all policies match ".*" but different values should be applied based on queue type + rabbit_ct_broker_helpers:set_policy(Config, 0, <<"ttl-policy-classic">>, + <<".*">>, <<"classic_queues">>, [{<<"message-ttl">>, 20}]), + + rabbit_ct_broker_helpers:set_policy(Config, 0, <<"ttl-policy-quorum">>, + <<".*">>, <<"quorum_queues">>, [{<<"message-ttl">>, 40}]), + + rabbit_ct_broker_helpers:set_policy(Config, 0, <<"ttl-policy-stream">>, + <<".*">>, <<"streams">>, [{<<"max-age">>, "1h"}]), + + declare(Ch, ClassicQ, [{<<"x-queue-type">>, longstr, <<"classic">>}]), + declare(Ch, QuorumQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}]), + declare(Ch, StreamQ, [{<<"x-queue-type">>, longstr, <<"stream">>}]), + timer:sleep(1), + + ?assertMatch(20, check_policy_value(Server, ClassicQ, <<"message-ttl">>)), + ?assertMatch(40, check_policy_value(Server, QuorumQ, <<"message-ttl">>)), + ?assertMatch("1h", check_policy_value(Server, StreamQ, <<"max-age">>)), + + delete(Ch, ClassicQ), + delete(Ch, QuorumQ), + delete(Ch, StreamQ), + rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"ttl-policy-classic">>), + rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"ttl-policy-quorum">>), + rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"ttl-policy-stream">>), + + rabbit_ct_client_helpers:close_channel(Ch), + rabbit_ct_client_helpers:close_connection(Conn), + passed. + %%---------------------------------------------------------------------------- @@ -211,6 +249,11 @@ declare(Ch, Q) -> amqp_channel:call(Ch, #'queue.declare'{queue = Q, durable = true}). +declare(Ch, Q, Args) -> + amqp_channel:call(Ch, #'queue.declare'{queue = Q, + durable = true, + arguments = Args}). + delete(Ch, Q) -> amqp_channel:call(Ch, #'queue.delete'{queue = Q}). diff --git a/deps/rabbitmq_management/priv/www/js/tmpl/policies.ejs b/deps/rabbitmq_management/priv/www/js/tmpl/policies.ejs index f2a322b458..0bd2b2d297 100644 --- a/deps/rabbitmq_management/priv/www/js/tmpl/policies.ejs +++ b/deps/rabbitmq_management/priv/www/js/tmpl/policies.ejs @@ -83,6 +83,9 @@ <option value="all">Exchanges and queues</option> <option value="exchanges">Exchanges</option> <option value="queues">Queues</option> + <option value="classic_queues">Classic Queues</option> + <option value="quorum_queues">Quorum Queues</option> + <option value="streams">Streams</option> </select> </td> </tr> @@ -259,6 +262,9 @@ <td> <select name="apply-to"> <option value="queues">Queues</option> + <option value="classic_queues">Classic Queues</option> + <option value="quorum_queues">Quorum Queues</option> + <option value="streams">Streams</option> </select> </td> </tr> diff --git a/release-notes/3.12.0.md b/release-notes/3.12.0.md index 39d5db013c..76bef1f56b 100644 --- a/release-notes/3.12.0.md +++ b/release-notes/3.12.0.md @@ -146,6 +146,11 @@ in the `3.11.x` release series. GitHub issue: [#7208](https://github.com/rabbitmq/rabbitmq-server/issues/7208). + * Policies can now be defined to only apply to specific queue types. For example, you can have two policies that match all queue names ('.*') + but different queue types, so that different parameters are applied to all queues of a specific type. Example usage: + ``` + rabbitmqctl set_policy at-least-once-dead-lettering ".*" '{"dead-letter-strategy": "at-least-once"}' --apply-to quorum_queues + ``` ### CLI Tools |