summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <klishinm@vmware.com>2023-05-15 18:13:40 +0400
committerGitHub <noreply@github.com>2023-05-15 18:13:40 +0400
commit42dbaf8fa6c8e2c310ac4f802887780d2e9e2bb8 (patch)
tree94399c7106121c5327670d5802b4aac5c40948aa
parent1ee8454129b7769fda6dfc1a528e886b290aa258 (diff)
parentddabc3519117c1556ef1a5467931242d07e5f425 (diff)
downloadrabbitmq-server-git-42dbaf8fa6c8e2c310ac4f802887780d2e9e2bb8.tar.gz
Merge pull request #8188 from rabbitmq/message-interceptor
Move plugin rabbitmq-message-timestamp to the core
-rw-r--r--deps/rabbit/BUILD.bazel5
-rw-r--r--deps/rabbit/app.bzl12
-rw-r--r--deps/rabbit/priv/schema/rabbit.schema26
-rw-r--r--deps/rabbit/src/rabbit.erl16
-rw-r--r--deps/rabbit/src/rabbit_channel.erl3
-rw-r--r--deps/rabbit/src/rabbit_message_interceptor.erl65
-rw-r--r--deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets22
-rw-r--r--deps/rabbit/test/rabbit_message_interceptor_SUITE.erl112
-rw-r--r--deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl15
-rw-r--r--deps/rabbitmq_mqtt/test/shared_SUITE.erl31
-rwxr-xr-xmoduleindex.yaml1
11 files changed, 293 insertions, 15 deletions
diff --git a/deps/rabbit/BUILD.bazel b/deps/rabbit/BUILD.bazel
index 000e00d7d7..f7476b222d 100644
--- a/deps/rabbit/BUILD.bazel
+++ b/deps/rabbit/BUILD.bazel
@@ -499,6 +499,11 @@ rabbitmq_integration_suite(
)
rabbitmq_integration_suite(
+ name = "rabbit_message_interceptor_SUITE",
+ size = "medium",
+)
+
+rabbitmq_integration_suite(
name = "message_size_limit_SUITE",
size = "medium",
)
diff --git a/deps/rabbit/app.bzl b/deps/rabbit/app.bzl
index 950cdd31e8..5ab5a71ca7 100644
--- a/deps/rabbit/app.bzl
+++ b/deps/rabbit/app.bzl
@@ -139,6 +139,7 @@ def all_beam_files(name = "all_beam_files"):
"src/rabbit_looking_glass.erl",
"src/rabbit_maintenance.erl",
"src/rabbit_memory_monitor.erl",
+ "src/rabbit_message_interceptor.erl",
"src/rabbit_metrics.erl",
"src/rabbit_mirror_queue_coordinator.erl",
"src/rabbit_mirror_queue_master.erl",
@@ -379,6 +380,7 @@ def all_test_beam_files(name = "all_test_beam_files"):
"src/rabbit_looking_glass.erl",
"src/rabbit_maintenance.erl",
"src/rabbit_memory_monitor.erl",
+ "src/rabbit_message_interceptor.erl",
"src/rabbit_metrics.erl",
"src/rabbit_mirror_queue_coordinator.erl",
"src/rabbit_mirror_queue_master.erl",
@@ -635,6 +637,7 @@ def all_srcs(name = "all_srcs"):
"src/rabbit_looking_glass.erl",
"src/rabbit_maintenance.erl",
"src/rabbit_memory_monitor.erl",
+ "src/rabbit_message_interceptor.erl",
"src/rabbit_metrics.erl",
"src/rabbit_mirror_queue_coordinator.erl",
"src/rabbit_mirror_queue_master.erl",
@@ -1936,3 +1939,12 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
)
+ erlang_bytecode(
+ name = "rabbit_message_interceptor_SUITE_beam_files",
+ testonly = True,
+ srcs = ["test/rabbit_message_interceptor_SUITE.erl"],
+ outs = ["test/rabbit_message_interceptor_SUITE.beam"],
+ app_name = "rabbit",
+ erlc_opts = "//:test_erlc_opts",
+ deps = ["//deps/amqp_client:erlang_app"],
+ )
diff --git a/deps/rabbit/priv/schema/rabbit.schema b/deps/rabbit/priv/schema/rabbit.schema
index 5024376213..7c8b627e9c 100644
--- a/deps/rabbit/priv/schema/rabbit.schema
+++ b/deps/rabbit/priv/schema/rabbit.schema
@@ -2504,6 +2504,32 @@ end}.
end
}.
+%%
+%% Message interceptors
+%%
+{mapping, "message_interceptors.incoming.$interceptor.overwrite", "rabbit.incoming_message_interceptors", [
+ {datatype, {enum, [true, false]}}]}.
+
+{translation, "rabbit.incoming_message_interceptors",
+ fun(Conf) ->
+ case cuttlefish_variable:filter_by_prefix("message_interceptors", Conf) of
+ [] ->
+ cuttlefish:unset();
+ L ->
+ [begin
+ Interceptor = list_to_atom(Interceptor0),
+ case lists:member(Interceptor, [set_header_timestamp,
+ set_header_routing_node]) of
+ true ->
+ {Interceptor, Overwrite};
+ false ->
+ cuttlefish:invalid(io_lib:format("~p is invalid", [Interceptor]))
+ end
+ end || {["message_interceptors", "incoming", Interceptor0, "overwrite"], Overwrite} <- L]
+ end
+ end
+}.
+
% ===============================
% Validators
% ===============================
diff --git a/deps/rabbit/src/rabbit.erl b/deps/rabbit/src/rabbit.erl
index 3e6e705a55..1269c77a97 100644
--- a/deps/rabbit/src/rabbit.erl
+++ b/deps/rabbit/src/rabbit.erl
@@ -1646,18 +1646,20 @@ ensure_working_fhc() ->
%% should be placed into persistent_term for efficiency.
persist_static_configuration() ->
persist_static_configuration(
- [{rabbit, classic_queue_index_v2_segment_entry_count},
- {rabbit, classic_queue_store_v2_max_cache_size},
- {rabbit, classic_queue_store_v2_check_crc32}
+ [classic_queue_index_v2_segment_entry_count,
+ classic_queue_store_v2_max_cache_size,
+ classic_queue_store_v2_check_crc32,
+ incoming_message_interceptors
]).
-persist_static_configuration(AppParams) ->
+persist_static_configuration(Params) ->
+ App = ?MODULE,
lists:foreach(
- fun(Key = {App, Param}) ->
+ fun(Param) ->
case application:get_env(App, Param) of
{ok, Value} ->
- ok = persistent_term:put(Key, Value);
+ ok = persistent_term:put({App, Param}, Value);
undefined ->
ok
end
- end, AppParams).
+ end, Params).
diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl
index 666d6e88d6..a78b8e1c08 100644
--- a/deps/rabbit/src/rabbit_channel.erl
+++ b/deps/rabbit/src/rabbit_channel.erl
@@ -1288,9 +1288,10 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
check_write_permitted_on_topic(Exchange, User, RoutingKey, AuthzContext),
%% We decode the content's properties here because we're almost
%% certain to want to look at delivery-mode and priority.
- DecodedContent = #content {properties = Props} =
+ DecodedContent0 = #content {properties = Props} =
maybe_set_fast_reply_to(
rabbit_binary_parser:ensure_content_decoded(Content), State),
+ DecodedContent = rabbit_message_interceptor:intercept(DecodedContent0),
check_user_id_header(Props, State),
check_expiration_header(Props),
DoConfirm = Tx =/= none orelse ConfirmEnabled,
diff --git a/deps/rabbit/src/rabbit_message_interceptor.erl b/deps/rabbit/src/rabbit_message_interceptor.erl
new file mode 100644
index 0000000000..05407349bf
--- /dev/null
+++ b/deps/rabbit/src/rabbit_message_interceptor.erl
@@ -0,0 +1,65 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
+
+%% This module exists since 3.12 replacing plugins rabbitmq-message-timestamp
+%% and rabbitmq-routing-node-stamp. Instead of using these plugins, RabbitMQ core can
+%% now be configured to add such headers. This enables non-AMQP 0.9.1 protocols (that
+%% do not use rabbit_channel) to also add AMQP 0.9.1 headers to incoming messages.
+-module(rabbit_message_interceptor).
+
+-export([intercept/1]).
+
+-include_lib("rabbit_common/include/rabbit.hrl").
+-include_lib("rabbit_common/include/rabbit_framing.hrl").
+
+-define(HEADER_TIMESTAMP, <<"timestamp_in_ms">>).
+-define(HEADER_ROUTING_NODE, <<"x-routed-by">>).
+
+-type content() :: rabbit_types:decoded_content().
+
+-spec intercept(content()) -> content().
+intercept(Content) ->
+ Interceptors = persistent_term:get({rabbit, incoming_message_interceptors}, []),
+ lists:foldl(fun(I, C) ->
+ intercept(C, I)
+ end, Content, Interceptors).
+
+intercept(Content, {set_header_routing_node, Overwrite}) ->
+ Node = atom_to_binary(node()),
+ set_header(Content, {?HEADER_ROUTING_NODE, longstr, Node}, Overwrite);
+intercept(Content0, {set_header_timestamp, Overwrite}) ->
+ NowMs = os:system_time(millisecond),
+ NowSecs = NowMs div 1_000,
+ Content = set_header(Content0, {?HEADER_TIMESTAMP, long, NowMs}, Overwrite),
+ set_property_timestamp(Content, NowSecs, Overwrite).
+
+-spec set_header(content(),
+ {binary(), rabbit_framing:amqp_field_type(), rabbit_framing:amqp_value()},
+ boolean()) ->
+ content().
+set_header(Content = #content{properties = Props = #'P_basic'{headers = Headers0}},
+ Header = {Key, Type, Value}, Overwrite) ->
+ case {rabbit_basic:header(Key, Headers0), Overwrite} of
+ {Val, false} when Val =/= undefined ->
+ Content;
+ _ ->
+ Headers = if Headers0 =:= undefined -> [Header];
+ true -> rabbit_misc:set_table_value(Headers0, Key, Type, Value)
+ end,
+ Content#content{properties = Props#'P_basic'{headers = Headers},
+ properties_bin = none}
+ end.
+
+-spec set_property_timestamp(content(), pos_integer(), boolean()) -> content().
+set_property_timestamp(Content = #content{properties = Props = #'P_basic'{timestamp = Ts}},
+ Timestamp, Overwrite) ->
+ case {Ts, Overwrite} of
+ {Secs, false} when is_integer(Secs) ->
+ Content;
+ _ ->
+ Content#content{properties = Props#'P_basic'{timestamp = Timestamp},
+ properties_bin = none}
+ end.
diff --git a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets
index b9444eed47..b387befeee 100644
--- a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets
+++ b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets
@@ -925,6 +925,28 @@ credential_validator.regexp = ^abc\\d+",
[{rabbit, [
{permitted_deprecated_features, #{classic_mirrored_queues => false}}
]}],
+ []},
+
+ %%
+ %% Message interceptors
+ %%
+
+ {message_interceptors,
+ "message_interceptors.incoming.set_header_timestamp.overwrite = true",
+ [{rabbit, [
+ {incoming_message_interceptors, [{set_header_timestamp, true}]}
+ ]}],
+ []},
+
+ {message_interceptors,
+ "
+ message_interceptors.incoming.set_header_routing_node.overwrite = false
+ message_interceptors.incoming.set_header_timestamp.overwrite = false
+ ",
+ [{rabbit, [
+ {incoming_message_interceptors, [{set_header_routing_node, false},
+ {set_header_timestamp, false}]}
+ ]}],
[]}
].
diff --git a/deps/rabbit/test/rabbit_message_interceptor_SUITE.erl b/deps/rabbit/test/rabbit_message_interceptor_SUITE.erl
new file mode 100644
index 0000000000..db70c8e45f
--- /dev/null
+++ b/deps/rabbit/test/rabbit_message_interceptor_SUITE.erl
@@ -0,0 +1,112 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2018-2023 VMware, Inc. or its affiliates. All rights reserved.
+
+-module(rabbit_message_interceptor_SUITE).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+-compile([nowarn_export_all, export_all]).
+
+-import(rabbit_ct_helpers, [eventually/1]).
+
+all() ->
+ [
+ {group, tests}
+ ].
+
+groups() ->
+ [
+ {tests, [shuffle], [headers_overwrite,
+ headers_no_overwrite
+ ]}
+ ].
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_testcase(Testcase, Config0) ->
+ Config1 = rabbit_ct_helpers:set_config(
+ Config0, [{rmq_nodename_suffix, Testcase}]),
+ Overwrite = case Testcase of
+ headers_overwrite -> true;
+ headers_no_overwrite -> false
+ end,
+ Val = maps:to_list(
+ maps:from_keys([set_header_timestamp,
+ set_header_routing_node],
+ Overwrite)),
+ Config = rabbit_ct_helpers:merge_app_env(
+ Config1, {rabbit, [{incoming_message_interceptors, Val}]}),
+ rabbit_ct_helpers:run_steps(
+ Config,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()).
+
+end_per_testcase(Testcase, Config0) ->
+ Config = rabbit_ct_helpers:testcase_finished(Config0, Testcase),
+ rabbit_ct_helpers:run_teardown_steps(
+ Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+headers_overwrite(Config) ->
+ headers(true, Config).
+
+headers_no_overwrite(Config) ->
+ headers(false, Config).
+
+headers(Overwrite, Config) ->
+ Server = atom_to_binary(rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename)),
+ Payload = QName = atom_to_binary(?FUNCTION_NAME),
+ NowSecs = os:system_time(second),
+ NowMs = os:system_time(millisecond),
+ Ch = rabbit_ct_client_helpers:open_channel(Config),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}),
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName},
+ #amqp_msg{payload = Payload}),
+ AssertHeaders =
+ fun() ->
+ eventually(
+ ?_assertMatch(
+ {#'basic.get_ok'{},
+ #amqp_msg{payload = Payload,
+ props = #'P_basic'{
+ timestamp = Secs,
+ headers = [{<<"timestamp_in_ms">>, long, Ms},
+ {<<"x-routed-by">>, longstr, Server}]
+ }}}
+ when Ms < NowMs + 4000 andalso
+ Ms > NowMs - 4000 andalso
+ Secs < NowSecs + 4 andalso
+ Secs > NowSecs - 4,
+ amqp_channel:call(Ch, #'basic.get'{queue = QName})))
+ end,
+ AssertHeaders(),
+
+ Msg = #amqp_msg{payload = Payload,
+ props = #'P_basic'{
+ timestamp = 1,
+ headers = [{<<"timestamp_in_ms">>, long, 1000},
+ {<<"x-routed-by">>, longstr, <<"rabbit@my-node">>}]
+ }},
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, Msg),
+ case Overwrite of
+ true ->
+ AssertHeaders();
+ false ->
+ eventually(
+ ?_assertMatch(
+ {#'basic.get_ok'{}, Msg},
+ amqp_channel:call(Ch, #'basic.get'{queue = QName})))
+ end,
+
+ #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
+ ok.
diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
index 48bc67d45b..7810dc1fc7 100644
--- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
+++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
@@ -1082,13 +1082,14 @@ publish_to_queues(
headers = [{<<"x-mqtt-publish-qos">>, byte, Qos}],
delivery_mode = delivery_mode(Qos)},
{ClassId, _MethodId} = rabbit_framing_amqp_0_9_1:method_id('basic.publish'),
- Content = #content{
- class_id = ClassId,
- properties = Props,
- properties_bin = none,
- protocol = none,
- payload_fragments_rev = [Payload]
- },
+ Content0 = #content{
+ class_id = ClassId,
+ properties = Props,
+ properties_bin = none,
+ protocol = none,
+ payload_fragments_rev = [Payload]
+ },
+ Content = rabbit_message_interceptor:intercept(Content0),
BasicMessage = #basic_message{
exchange_name = ExchangeName,
routing_keys = [RoutingKey],
diff --git a/deps/rabbitmq_mqtt/test/shared_SUITE.erl b/deps/rabbitmq_mqtt/test/shared_SUITE.erl
index de94bdee7a..6227aa0b57 100644
--- a/deps/rabbitmq_mqtt/test/shared_SUITE.erl
+++ b/deps/rabbitmq_mqtt/test/shared_SUITE.erl
@@ -96,6 +96,7 @@ subgroups() ->
,trace
,max_packet_size_unauthenticated
,default_queue_type
+ ,incoming_message_interceptors
]}
]},
{cluster_size_3, [],
@@ -1424,6 +1425,36 @@ default_queue_type(Config) ->
ok = emqtt:disconnect(C2),
ok = rabbit_ct_broker_helpers:delete_vhost(Config, Vhost).
+incoming_message_interceptors(Config) ->
+ Key = {rabbit, ?FUNCTION_NAME},
+ ok = rpc(Config, persistent_term, put, [Key, [{set_header_timestamp, false}]]),
+ Ch = rabbit_ct_client_helpers:open_channel(Config),
+ Payload = ClientId = QName = Topic = atom_to_binary(?FUNCTION_NAME),
+ declare_queue(Ch, QName, []),
+ bind(Ch, QName, Topic),
+ C = connect(ClientId, Config),
+ ok = emqtt:publish(C, Topic, Payload),
+ NowSecs = os:system_time(second),
+ NowMs = os:system_time(millisecond),
+ eventually(
+ ?_assertMatch(
+ {#'basic.get_ok'{},
+ #amqp_msg{payload = Payload,
+ props = #'P_basic'{
+ timestamp = Secs,
+ headers = [{<<"timestamp_in_ms">>, long, Ms},
+ {<<"x-mqtt-publish-qos">>, byte, 0}]
+ }}}
+ when Ms < NowMs + 4000 andalso
+ Ms > NowMs - 4000 andalso
+ Secs < NowSecs + 4 andalso
+ Secs > NowSecs - 4,
+ amqp_channel:call(Ch, #'basic.get'{queue = QName}))),
+
+ delete_queue(Ch, QName),
+ true = rpc(Config, persistent_term, erase, [Key]),
+ ok = emqtt:disconnect(C).
+
%% -------------------------------------------------------------------
%% Internal helpers
%% -------------------------------------------------------------------
diff --git a/moduleindex.yaml b/moduleindex.yaml
index 8d9b7c854d..a4880c091d 100755
--- a/moduleindex.yaml
+++ b/moduleindex.yaml
@@ -575,6 +575,7 @@ rabbit:
- rabbit_looking_glass
- rabbit_maintenance
- rabbit_memory_monitor
+- rabbit_message_interceptor
- rabbit_metrics
- rabbit_mirror_queue_coordinator
- rabbit_mirror_queue_master