summaryrefslogtreecommitdiff
path: root/deps/rabbitmq_mqtt/test/shared_SUITE.erl
diff options
context:
space:
mode:
Diffstat (limited to 'deps/rabbitmq_mqtt/test/shared_SUITE.erl')
-rw-r--r--deps/rabbitmq_mqtt/test/shared_SUITE.erl31
1 files changed, 31 insertions, 0 deletions
diff --git a/deps/rabbitmq_mqtt/test/shared_SUITE.erl b/deps/rabbitmq_mqtt/test/shared_SUITE.erl
index de94bdee7a..6227aa0b57 100644
--- a/deps/rabbitmq_mqtt/test/shared_SUITE.erl
+++ b/deps/rabbitmq_mqtt/test/shared_SUITE.erl
@@ -96,6 +96,7 @@ subgroups() ->
,trace
,max_packet_size_unauthenticated
,default_queue_type
+ ,incoming_message_interceptors
]}
]},
{cluster_size_3, [],
@@ -1424,6 +1425,36 @@ default_queue_type(Config) ->
ok = emqtt:disconnect(C2),
ok = rabbit_ct_broker_helpers:delete_vhost(Config, Vhost).
+incoming_message_interceptors(Config) ->
+ Key = {rabbit, ?FUNCTION_NAME},
+ ok = rpc(Config, persistent_term, put, [Key, [{set_header_timestamp, false}]]),
+ Ch = rabbit_ct_client_helpers:open_channel(Config),
+ Payload = ClientId = QName = Topic = atom_to_binary(?FUNCTION_NAME),
+ declare_queue(Ch, QName, []),
+ bind(Ch, QName, Topic),
+ C = connect(ClientId, Config),
+ ok = emqtt:publish(C, Topic, Payload),
+ NowSecs = os:system_time(second),
+ NowMs = os:system_time(millisecond),
+ eventually(
+ ?_assertMatch(
+ {#'basic.get_ok'{},
+ #amqp_msg{payload = Payload,
+ props = #'P_basic'{
+ timestamp = Secs,
+ headers = [{<<"timestamp_in_ms">>, long, Ms},
+ {<<"x-mqtt-publish-qos">>, byte, 0}]
+ }}}
+ when Ms < NowMs + 4000 andalso
+ Ms > NowMs - 4000 andalso
+ Secs < NowSecs + 4 andalso
+ Secs > NowSecs - 4,
+ amqp_channel:call(Ch, #'basic.get'{queue = QName}))),
+
+ delete_queue(Ch, QName),
+ true = rpc(Config, persistent_term, erase, [Key]),
+ ok = emqtt:disconnect(C).
+
%% -------------------------------------------------------------------
%% Internal helpers
%% -------------------------------------------------------------------