diff options
-rw-r--r-- | deps/rabbit/BUILD.bazel | 5 | ||||
-rw-r--r-- | deps/rabbit/app.bzl | 12 | ||||
-rw-r--r-- | deps/rabbit/priv/schema/rabbit.schema | 27 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit.erl | 16 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_channel.erl | 3 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_message_interceptor.erl | 66 | ||||
-rw-r--r-- | deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets | 22 | ||||
-rw-r--r-- | deps/rabbit/test/rabbit_message_interceptor_SUITE.erl | 112 | ||||
-rw-r--r-- | deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl | 15 | ||||
-rw-r--r-- | deps/rabbitmq_mqtt/test/shared_SUITE.erl | 31 | ||||
-rwxr-xr-x | moduleindex.yaml | 1 |
11 files changed, 295 insertions, 15 deletions
diff --git a/deps/rabbit/BUILD.bazel b/deps/rabbit/BUILD.bazel index 000e00d7d7..f7476b222d 100644 --- a/deps/rabbit/BUILD.bazel +++ b/deps/rabbit/BUILD.bazel @@ -499,6 +499,11 @@ rabbitmq_integration_suite( ) rabbitmq_integration_suite( + name = "rabbit_message_interceptor_SUITE", + size = "medium", +) + +rabbitmq_integration_suite( name = "message_size_limit_SUITE", size = "medium", ) diff --git a/deps/rabbit/app.bzl b/deps/rabbit/app.bzl index 950cdd31e8..5ab5a71ca7 100644 --- a/deps/rabbit/app.bzl +++ b/deps/rabbit/app.bzl @@ -139,6 +139,7 @@ def all_beam_files(name = "all_beam_files"): "src/rabbit_looking_glass.erl", "src/rabbit_maintenance.erl", "src/rabbit_memory_monitor.erl", + "src/rabbit_message_interceptor.erl", "src/rabbit_metrics.erl", "src/rabbit_mirror_queue_coordinator.erl", "src/rabbit_mirror_queue_master.erl", @@ -379,6 +380,7 @@ def all_test_beam_files(name = "all_test_beam_files"): "src/rabbit_looking_glass.erl", "src/rabbit_maintenance.erl", "src/rabbit_memory_monitor.erl", + "src/rabbit_message_interceptor.erl", "src/rabbit_metrics.erl", "src/rabbit_mirror_queue_coordinator.erl", "src/rabbit_mirror_queue_master.erl", @@ -635,6 +637,7 @@ def all_srcs(name = "all_srcs"): "src/rabbit_looking_glass.erl", "src/rabbit_maintenance.erl", "src/rabbit_memory_monitor.erl", + "src/rabbit_message_interceptor.erl", "src/rabbit_metrics.erl", "src/rabbit_mirror_queue_coordinator.erl", "src/rabbit_mirror_queue_master.erl", @@ -1936,3 +1939,12 @@ def test_suite_beam_files(name = "test_suite_beam_files"): app_name = "rabbit", erlc_opts = "//:test_erlc_opts", ) + erlang_bytecode( + name = "rabbit_message_interceptor_SUITE_beam_files", + testonly = True, + srcs = ["test/rabbit_message_interceptor_SUITE.erl"], + outs = ["test/rabbit_message_interceptor_SUITE.beam"], + app_name = "rabbit", + erlc_opts = "//:test_erlc_opts", + deps = ["//deps/amqp_client:erlang_app"], + ) diff --git a/deps/rabbit/priv/schema/rabbit.schema b/deps/rabbit/priv/schema/rabbit.schema index 5024376213..b49f673629 100644 --- a/deps/rabbit/priv/schema/rabbit.schema +++ b/deps/rabbit/priv/schema/rabbit.schema @@ -2504,6 +2504,33 @@ end}. end }. +%% +%% Message interceptors +%% +{mapping, "incoming_message_interceptors.$interceptor.overwrite", "rabbit.incoming_message_interceptors", [ + {datatype, {enum, [true, false]}}]}. + +{translation, "rabbit.incoming_message_interceptors", + fun(Conf) -> + case cuttlefish_variable:filter_by_prefix("incoming_message_interceptors", Conf) of + [] -> + cuttlefish:unset(); + L -> + [begin + Interceptor = list_to_atom(Interceptor0), + case lists:member(Interceptor, [set_header_timestamp, + set_header_routing_node]) of + true -> + {Interceptor, Overwrite}; + false -> + cuttlefish:invalid(io_lib:format("~p is invalid", [Interceptor])) + end + end || {["incoming_message_interceptors", Interceptor0, "overwrite"], Overwrite} <- L, + is_list(Interceptor0) andalso is_boolean(Overwrite)] + end + end +}. + % =============================== % Validators % =============================== diff --git a/deps/rabbit/src/rabbit.erl b/deps/rabbit/src/rabbit.erl index 3e6e705a55..1269c77a97 100644 --- a/deps/rabbit/src/rabbit.erl +++ b/deps/rabbit/src/rabbit.erl @@ -1646,18 +1646,20 @@ ensure_working_fhc() -> %% should be placed into persistent_term for efficiency. persist_static_configuration() -> persist_static_configuration( - [{rabbit, classic_queue_index_v2_segment_entry_count}, - {rabbit, classic_queue_store_v2_max_cache_size}, - {rabbit, classic_queue_store_v2_check_crc32} + [classic_queue_index_v2_segment_entry_count, + classic_queue_store_v2_max_cache_size, + classic_queue_store_v2_check_crc32, + incoming_message_interceptors ]). -persist_static_configuration(AppParams) -> +persist_static_configuration(Params) -> + App = ?MODULE, lists:foreach( - fun(Key = {App, Param}) -> + fun(Param) -> case application:get_env(App, Param) of {ok, Value} -> - ok = persistent_term:put(Key, Value); + ok = persistent_term:put({App, Param}, Value); undefined -> ok end - end, AppParams). + end, Params). diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 666d6e88d6..a78b8e1c08 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -1288,9 +1288,10 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, check_write_permitted_on_topic(Exchange, User, RoutingKey, AuthzContext), %% We decode the content's properties here because we're almost %% certain to want to look at delivery-mode and priority. - DecodedContent = #content {properties = Props} = + DecodedContent0 = #content {properties = Props} = maybe_set_fast_reply_to( rabbit_binary_parser:ensure_content_decoded(Content), State), + DecodedContent = rabbit_message_interceptor:intercept(DecodedContent0), check_user_id_header(Props, State), check_expiration_header(Props), DoConfirm = Tx =/= none orelse ConfirmEnabled, diff --git a/deps/rabbit/src/rabbit_message_interceptor.erl b/deps/rabbit/src/rabbit_message_interceptor.erl new file mode 100644 index 0000000000..66ce391fe2 --- /dev/null +++ b/deps/rabbit/src/rabbit_message_interceptor.erl @@ -0,0 +1,66 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved. + +%% This module exists since 3.12 replacing plugins rabbitmq-message-timestamp +%% and rabbitmq-routing-node-stamp. Instead of using these plugins, RabbitMQ core can +%% now be configured to add such headers. This enables non-AMQP 0.9.1 protocols that +%% do not use rabbit_channel to also add headers to incoming messages. +-module(rabbit_message_interceptor). + +-export([intercept/1]). + +-include_lib("rabbit_common/include/rabbit.hrl"). +-include_lib("rabbit_common/include/rabbit_framing.hrl"). + +-define(HEADER_TIMESTAMP, <<"timestamp_in_ms">>). +-define(HEADER_ROUTING_NODE, <<"x-routed-by">>). + +-spec intercept(Content) -> + Content when Content :: rabbit_types:decoded_content(). +intercept(Content) -> + Interceptors = persistent_term:get({rabbit, incoming_message_interceptors}, []), + lists:foldl(fun(I, C) -> + intercept(C, I) + end, Content, Interceptors). + +intercept(Content, {set_header_routing_node, Overwrite}) -> + %% Binary node name could be placed into persistent_term for efficiency. + Node = atom_to_binary(node()), + set_header(Content, ?HEADER_ROUTING_NODE, longstr, Node, Overwrite); +intercept(Content0, {set_header_timestamp, Overwrite}) -> + NowMs = os:system_time(millisecond), + NowSecs = NowMs div 1_000, + Content = set_header(Content0, ?HEADER_TIMESTAMP, long, NowMs, Overwrite), + set_property_timestamp(Content, NowSecs, Overwrite). + +-spec set_header(Content, binary(), rabbit_framing:amqp_field_type(), + rabbit_framing:amqp_value(), boolean()) -> + Content when Content :: rabbit_types:decoded_content(). +set_header(Content = #content{properties = Props = #'P_basic'{headers = Headers0}}, + Key, Type, Value, Overwrite) -> + case {rabbit_basic:header(Key, Headers0), Overwrite} of + {Val, false} when Val =/= undefined -> + Content; + _ -> + Headers1 = if Headers0 =:= undefined -> []; + true -> Headers0 + end, + Headers = rabbit_misc:set_table_value(Headers1, Key, Type, Value), + Content#content{properties = Props#'P_basic'{headers = Headers}, + properties_bin = none} + end. + +-spec set_property_timestamp(Content, pos_integer(), boolean()) -> + Content when Content :: rabbit_types:decoded_content(). +set_property_timestamp(Content = #content{properties = Props = #'P_basic'{timestamp = Ts}}, + Timestamp, Overwrite) -> + case {Ts, Overwrite} of + {Secs, false} when is_integer(Secs) -> + Content; + _ -> + Content#content{properties = Props#'P_basic'{timestamp = Timestamp}, + properties_bin = none} + end. diff --git a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets index b9444eed47..2e6fae9aa5 100644 --- a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets +++ b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets @@ -925,6 +925,28 @@ credential_validator.regexp = ^abc\\d+", [{rabbit, [ {permitted_deprecated_features, #{classic_mirrored_queues => false}} ]}], + []}, + + %% + %% Message interceptors + %% + + {message_interceptors, + "incoming_message_interceptors.set_header_timestamp.overwrite = true", + [{rabbit, [ + {incoming_message_interceptors, [{set_header_timestamp, true}]} + ]}], + []}, + + {message_interceptors, + " + incoming_message_interceptors.set_header_routing_node.overwrite = false + incoming_message_interceptors.set_header_timestamp.overwrite = false + ", + [{rabbit, [ + {incoming_message_interceptors, [{set_header_routing_node, false}, + {set_header_timestamp, false}]} + ]}], []} ]. diff --git a/deps/rabbit/test/rabbit_message_interceptor_SUITE.erl b/deps/rabbit/test/rabbit_message_interceptor_SUITE.erl new file mode 100644 index 0000000000..db70c8e45f --- /dev/null +++ b/deps/rabbit/test/rabbit_message_interceptor_SUITE.erl @@ -0,0 +1,112 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2018-2023 VMware, Inc. or its affiliates. All rights reserved. + +-module(rabbit_message_interceptor_SUITE). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). + +-compile([nowarn_export_all, export_all]). + +-import(rabbit_ct_helpers, [eventually/1]). + +all() -> + [ + {group, tests} + ]. + +groups() -> + [ + {tests, [shuffle], [headers_overwrite, + headers_no_overwrite + ]} + ]. + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +init_per_testcase(Testcase, Config0) -> + Config1 = rabbit_ct_helpers:set_config( + Config0, [{rmq_nodename_suffix, Testcase}]), + Overwrite = case Testcase of + headers_overwrite -> true; + headers_no_overwrite -> false + end, + Val = maps:to_list( + maps:from_keys([set_header_timestamp, + set_header_routing_node], + Overwrite)), + Config = rabbit_ct_helpers:merge_app_env( + Config1, {rabbit, [{incoming_message_interceptors, Val}]}), + rabbit_ct_helpers:run_steps( + Config, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +end_per_testcase(Testcase, Config0) -> + Config = rabbit_ct_helpers:testcase_finished(Config0, Testcase), + rabbit_ct_helpers:run_teardown_steps( + Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). + +headers_overwrite(Config) -> + headers(true, Config). + +headers_no_overwrite(Config) -> + headers(false, Config). + +headers(Overwrite, Config) -> + Server = atom_to_binary(rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename)), + Payload = QName = atom_to_binary(?FUNCTION_NAME), + NowSecs = os:system_time(second), + NowMs = os:system_time(millisecond), + Ch = rabbit_ct_client_helpers:open_channel(Config), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}), + amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, + #amqp_msg{payload = Payload}), + AssertHeaders = + fun() -> + eventually( + ?_assertMatch( + {#'basic.get_ok'{}, + #amqp_msg{payload = Payload, + props = #'P_basic'{ + timestamp = Secs, + headers = [{<<"timestamp_in_ms">>, long, Ms}, + {<<"x-routed-by">>, longstr, Server}] + }}} + when Ms < NowMs + 4000 andalso + Ms > NowMs - 4000 andalso + Secs < NowSecs + 4 andalso + Secs > NowSecs - 4, + amqp_channel:call(Ch, #'basic.get'{queue = QName}))) + end, + AssertHeaders(), + + Msg = #amqp_msg{payload = Payload, + props = #'P_basic'{ + timestamp = 1, + headers = [{<<"timestamp_in_ms">>, long, 1000}, + {<<"x-routed-by">>, longstr, <<"rabbit@my-node">>}] + }}, + amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, Msg), + case Overwrite of + true -> + AssertHeaders(); + false -> + eventually( + ?_assertMatch( + {#'basic.get_ok'{}, Msg}, + amqp_channel:call(Ch, #'basic.get'{queue = QName}))) + end, + + #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), + ok. diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl index 48bc67d45b..7810dc1fc7 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl @@ -1082,13 +1082,14 @@ publish_to_queues( headers = [{<<"x-mqtt-publish-qos">>, byte, Qos}], delivery_mode = delivery_mode(Qos)}, {ClassId, _MethodId} = rabbit_framing_amqp_0_9_1:method_id('basic.publish'), - Content = #content{ - class_id = ClassId, - properties = Props, - properties_bin = none, - protocol = none, - payload_fragments_rev = [Payload] - }, + Content0 = #content{ + class_id = ClassId, + properties = Props, + properties_bin = none, + protocol = none, + payload_fragments_rev = [Payload] + }, + Content = rabbit_message_interceptor:intercept(Content0), BasicMessage = #basic_message{ exchange_name = ExchangeName, routing_keys = [RoutingKey], diff --git a/deps/rabbitmq_mqtt/test/shared_SUITE.erl b/deps/rabbitmq_mqtt/test/shared_SUITE.erl index de94bdee7a..6227aa0b57 100644 --- a/deps/rabbitmq_mqtt/test/shared_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/shared_SUITE.erl @@ -96,6 +96,7 @@ subgroups() -> ,trace ,max_packet_size_unauthenticated ,default_queue_type + ,incoming_message_interceptors ]} ]}, {cluster_size_3, [], @@ -1424,6 +1425,36 @@ default_queue_type(Config) -> ok = emqtt:disconnect(C2), ok = rabbit_ct_broker_helpers:delete_vhost(Config, Vhost). +incoming_message_interceptors(Config) -> + Key = {rabbit, ?FUNCTION_NAME}, + ok = rpc(Config, persistent_term, put, [Key, [{set_header_timestamp, false}]]), + Ch = rabbit_ct_client_helpers:open_channel(Config), + Payload = ClientId = QName = Topic = atom_to_binary(?FUNCTION_NAME), + declare_queue(Ch, QName, []), + bind(Ch, QName, Topic), + C = connect(ClientId, Config), + ok = emqtt:publish(C, Topic, Payload), + NowSecs = os:system_time(second), + NowMs = os:system_time(millisecond), + eventually( + ?_assertMatch( + {#'basic.get_ok'{}, + #amqp_msg{payload = Payload, + props = #'P_basic'{ + timestamp = Secs, + headers = [{<<"timestamp_in_ms">>, long, Ms}, + {<<"x-mqtt-publish-qos">>, byte, 0}] + }}} + when Ms < NowMs + 4000 andalso + Ms > NowMs - 4000 andalso + Secs < NowSecs + 4 andalso + Secs > NowSecs - 4, + amqp_channel:call(Ch, #'basic.get'{queue = QName}))), + + delete_queue(Ch, QName), + true = rpc(Config, persistent_term, erase, [Key]), + ok = emqtt:disconnect(C). + %% ------------------------------------------------------------------- %% Internal helpers %% ------------------------------------------------------------------- diff --git a/moduleindex.yaml b/moduleindex.yaml index 8d9b7c854d..a4880c091d 100755 --- a/moduleindex.yaml +++ b/moduleindex.yaml @@ -575,6 +575,7 @@ rabbit: - rabbit_looking_glass - rabbit_maintenance - rabbit_memory_monitor +- rabbit_message_interceptor - rabbit_metrics - rabbit_mirror_queue_coordinator - rabbit_mirror_queue_master |