diff options
author | Michael Klishin <klishinm@vmware.com> | 2023-05-15 18:13:40 +0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-05-15 18:13:40 +0400 |
commit | 42dbaf8fa6c8e2c310ac4f802887780d2e9e2bb8 (patch) | |
tree | 94399c7106121c5327670d5802b4aac5c40948aa /deps/rabbitmq_mqtt/test/shared_SUITE.erl | |
parent | 1ee8454129b7769fda6dfc1a528e886b290aa258 (diff) | |
parent | ddabc3519117c1556ef1a5467931242d07e5f425 (diff) | |
download | rabbitmq-server-git-42dbaf8fa6c8e2c310ac4f802887780d2e9e2bb8.tar.gz |
Merge pull request #8188 from rabbitmq/message-interceptor
Move plugin rabbitmq-message-timestamp to the core
Diffstat (limited to 'deps/rabbitmq_mqtt/test/shared_SUITE.erl')
-rw-r--r-- | deps/rabbitmq_mqtt/test/shared_SUITE.erl | 31 |
1 files changed, 31 insertions, 0 deletions
diff --git a/deps/rabbitmq_mqtt/test/shared_SUITE.erl b/deps/rabbitmq_mqtt/test/shared_SUITE.erl index de94bdee7a..6227aa0b57 100644 --- a/deps/rabbitmq_mqtt/test/shared_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/shared_SUITE.erl @@ -96,6 +96,7 @@ subgroups() -> ,trace ,max_packet_size_unauthenticated ,default_queue_type + ,incoming_message_interceptors ]} ]}, {cluster_size_3, [], @@ -1424,6 +1425,36 @@ default_queue_type(Config) -> ok = emqtt:disconnect(C2), ok = rabbit_ct_broker_helpers:delete_vhost(Config, Vhost). +incoming_message_interceptors(Config) -> + Key = {rabbit, ?FUNCTION_NAME}, + ok = rpc(Config, persistent_term, put, [Key, [{set_header_timestamp, false}]]), + Ch = rabbit_ct_client_helpers:open_channel(Config), + Payload = ClientId = QName = Topic = atom_to_binary(?FUNCTION_NAME), + declare_queue(Ch, QName, []), + bind(Ch, QName, Topic), + C = connect(ClientId, Config), + ok = emqtt:publish(C, Topic, Payload), + NowSecs = os:system_time(second), + NowMs = os:system_time(millisecond), + eventually( + ?_assertMatch( + {#'basic.get_ok'{}, + #amqp_msg{payload = Payload, + props = #'P_basic'{ + timestamp = Secs, + headers = [{<<"timestamp_in_ms">>, long, Ms}, + {<<"x-mqtt-publish-qos">>, byte, 0}] + }}} + when Ms < NowMs + 4000 andalso + Ms > NowMs - 4000 andalso + Secs < NowSecs + 4 andalso + Secs > NowSecs - 4, + amqp_channel:call(Ch, #'basic.get'{queue = QName}))), + + delete_queue(Ch, QName), + true = rpc(Config, persistent_term, erase, [Key]), + ok = emqtt:disconnect(C). + %% ------------------------------------------------------------------- %% Internal helpers %% ------------------------------------------------------------------- |