diff options
Diffstat (limited to 'deps/rabbitmq_mqtt/test')
-rw-r--r-- | deps/rabbitmq_mqtt/test/shared_SUITE.erl | 31 |
1 files changed, 31 insertions, 0 deletions
diff --git a/deps/rabbitmq_mqtt/test/shared_SUITE.erl b/deps/rabbitmq_mqtt/test/shared_SUITE.erl index de94bdee7a..6227aa0b57 100644 --- a/deps/rabbitmq_mqtt/test/shared_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/shared_SUITE.erl @@ -96,6 +96,7 @@ subgroups() -> ,trace ,max_packet_size_unauthenticated ,default_queue_type + ,incoming_message_interceptors ]} ]}, {cluster_size_3, [], @@ -1424,6 +1425,36 @@ default_queue_type(Config) -> ok = emqtt:disconnect(C2), ok = rabbit_ct_broker_helpers:delete_vhost(Config, Vhost). +incoming_message_interceptors(Config) -> + Key = {rabbit, ?FUNCTION_NAME}, + ok = rpc(Config, persistent_term, put, [Key, [{set_header_timestamp, false}]]), + Ch = rabbit_ct_client_helpers:open_channel(Config), + Payload = ClientId = QName = Topic = atom_to_binary(?FUNCTION_NAME), + declare_queue(Ch, QName, []), + bind(Ch, QName, Topic), + C = connect(ClientId, Config), + ok = emqtt:publish(C, Topic, Payload), + NowSecs = os:system_time(second), + NowMs = os:system_time(millisecond), + eventually( + ?_assertMatch( + {#'basic.get_ok'{}, + #amqp_msg{payload = Payload, + props = #'P_basic'{ + timestamp = Secs, + headers = [{<<"timestamp_in_ms">>, long, Ms}, + {<<"x-mqtt-publish-qos">>, byte, 0}] + }}} + when Ms < NowMs + 4000 andalso + Ms > NowMs - 4000 andalso + Secs < NowSecs + 4 andalso + Secs > NowSecs - 4, + amqp_channel:call(Ch, #'basic.get'{queue = QName}))), + + delete_queue(Ch, QName), + true = rpc(Config, persistent_term, erase, [Key]), + ok = emqtt:disconnect(C). + %% ------------------------------------------------------------------- %% Internal helpers %% ------------------------------------------------------------------- |