diff options
Diffstat (limited to 'deps/rabbitmq_mqtt')
-rw-r--r-- | deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl | 15 | ||||
-rw-r--r-- | deps/rabbitmq_mqtt/test/shared_SUITE.erl | 31 |
2 files changed, 39 insertions, 7 deletions
diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl index 48bc67d45b..7810dc1fc7 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl @@ -1082,13 +1082,14 @@ publish_to_queues( headers = [{<<"x-mqtt-publish-qos">>, byte, Qos}], delivery_mode = delivery_mode(Qos)}, {ClassId, _MethodId} = rabbit_framing_amqp_0_9_1:method_id('basic.publish'), - Content = #content{ - class_id = ClassId, - properties = Props, - properties_bin = none, - protocol = none, - payload_fragments_rev = [Payload] - }, + Content0 = #content{ + class_id = ClassId, + properties = Props, + properties_bin = none, + protocol = none, + payload_fragments_rev = [Payload] + }, + Content = rabbit_message_interceptor:intercept(Content0), BasicMessage = #basic_message{ exchange_name = ExchangeName, routing_keys = [RoutingKey], diff --git a/deps/rabbitmq_mqtt/test/shared_SUITE.erl b/deps/rabbitmq_mqtt/test/shared_SUITE.erl index de94bdee7a..6227aa0b57 100644 --- a/deps/rabbitmq_mqtt/test/shared_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/shared_SUITE.erl @@ -96,6 +96,7 @@ subgroups() -> ,trace ,max_packet_size_unauthenticated ,default_queue_type + ,incoming_message_interceptors ]} ]}, {cluster_size_3, [], @@ -1424,6 +1425,36 @@ default_queue_type(Config) -> ok = emqtt:disconnect(C2), ok = rabbit_ct_broker_helpers:delete_vhost(Config, Vhost). +incoming_message_interceptors(Config) -> + Key = {rabbit, ?FUNCTION_NAME}, + ok = rpc(Config, persistent_term, put, [Key, [{set_header_timestamp, false}]]), + Ch = rabbit_ct_client_helpers:open_channel(Config), + Payload = ClientId = QName = Topic = atom_to_binary(?FUNCTION_NAME), + declare_queue(Ch, QName, []), + bind(Ch, QName, Topic), + C = connect(ClientId, Config), + ok = emqtt:publish(C, Topic, Payload), + NowSecs = os:system_time(second), + NowMs = os:system_time(millisecond), + eventually( + ?_assertMatch( + {#'basic.get_ok'{}, + #amqp_msg{payload = Payload, + props = #'P_basic'{ + timestamp = Secs, + headers = [{<<"timestamp_in_ms">>, long, Ms}, + {<<"x-mqtt-publish-qos">>, byte, 0}] + }}} + when Ms < NowMs + 4000 andalso + Ms > NowMs - 4000 andalso + Secs < NowSecs + 4 andalso + Secs > NowSecs - 4, + amqp_channel:call(Ch, #'basic.get'{queue = QName}))), + + delete_queue(Ch, QName), + true = rpc(Config, persistent_term, erase, [Key]), + ok = emqtt:disconnect(C). + %% ------------------------------------------------------------------- %% Internal helpers %% ------------------------------------------------------------------- |