summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Ansari <david.ansari@gmx.de>2023-05-12 14:12:50 +0000
committerDavid Ansari <david.ansari@gmx.de>2023-05-12 17:44:59 +0000
commit4ec930f729fbd4700810a74986c3728408d3e18d (patch)
tree30721c1bdebdd12a34fb2b74d6b34401d958dc48
parent316251a8d688d2e90a81d6b4ad11308566e61dff (diff)
downloadrabbitmq-server-git-message-interceptor.tar.gz
Move plugin rabbitmq-message-timestamp to the coremessage-interceptor
As reported in https://groups.google.com/g/rabbitmq-users/c/x8ACs4dBlkI/ plugins that implement rabbit_channel_interceptor break with Native MQTT in 3.12 because Native MQTT does not use rabbit_channel anymore. Specifically, these plugins don't work anymore in 3.12 when sending a message from an MQTT publisher to an AMQP 0.9.1 consumer. Two of these plugins are https://github.com/rabbitmq/rabbitmq-message-timestamp and https://github.com/rabbitmq/rabbitmq-routing-node-stamp This commit moves both plugins into rabbitmq-server. Therefore, these plugins are deprecated starting in 3.12. Instead of using these plugins, the user gets the same behaviour by configuring rabbitmq.conf as follows: ``` incoming_message_interceptors.set_header_timestamp.overwrite = false incoming_message_interceptors.set_header_routing_node.overwrite = false ``` While both plugins were incompatible to be used together, this commit allows to also set both headers. We name the top level configuration key `incoming_message_interceptors` because only incoming messages are intercepted. Currently, only `set_header_timestamp` and `set_header_routing_node` are supported. (We might support more in the future.) Both can set `overwrite` to `false` or `true`. The meaning of `overwrite` is the same as documented in https://github.com/rabbitmq/rabbitmq-message-timestamp#always-overwrite-timestamps i.e. whether headers should be overwritten if they are already present in the message. Both `set_header_timestamp` and `set_header_routing_node` behave exactly to plugins `rabbitmq-message-timestamp` and `rabbitmq-routing-node-stamp`, respectively. Upon node boot, the configuration is put into persistent_term to not cause any performance penalty in the default case where these settings are disabled. The channel and MQTT connection process will intercept incoming messages and - if configured - add the desired AMQP 0.9.1 headers. For now, this keeps the plugins behaviour when using Native MQTT in 3.12. In the future, once "message containers" are implemented, we can think about more generic message interceptors where plugins can be written to modify arbitrary headers or message contents for various protocols. Likewise, in the future, once MQTT 5.0 is implemented, we can think about an MQTT connection interceptor which could function similar to a `rabbit_channel_interceptor` allowing to modify any MQTT packet.
-rw-r--r--deps/rabbit/BUILD.bazel5
-rw-r--r--deps/rabbit/app.bzl12
-rw-r--r--deps/rabbit/priv/schema/rabbit.schema27
-rw-r--r--deps/rabbit/src/rabbit.erl16
-rw-r--r--deps/rabbit/src/rabbit_channel.erl3
-rw-r--r--deps/rabbit/src/rabbit_message_interceptor.erl66
-rw-r--r--deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets22
-rw-r--r--deps/rabbit/test/rabbit_message_interceptor_SUITE.erl112
-rw-r--r--deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl15
-rw-r--r--deps/rabbitmq_mqtt/test/shared_SUITE.erl31
-rwxr-xr-xmoduleindex.yaml1
11 files changed, 295 insertions, 15 deletions
diff --git a/deps/rabbit/BUILD.bazel b/deps/rabbit/BUILD.bazel
index 000e00d7d7..f7476b222d 100644
--- a/deps/rabbit/BUILD.bazel
+++ b/deps/rabbit/BUILD.bazel
@@ -499,6 +499,11 @@ rabbitmq_integration_suite(
)
rabbitmq_integration_suite(
+ name = "rabbit_message_interceptor_SUITE",
+ size = "medium",
+)
+
+rabbitmq_integration_suite(
name = "message_size_limit_SUITE",
size = "medium",
)
diff --git a/deps/rabbit/app.bzl b/deps/rabbit/app.bzl
index 950cdd31e8..5ab5a71ca7 100644
--- a/deps/rabbit/app.bzl
+++ b/deps/rabbit/app.bzl
@@ -139,6 +139,7 @@ def all_beam_files(name = "all_beam_files"):
"src/rabbit_looking_glass.erl",
"src/rabbit_maintenance.erl",
"src/rabbit_memory_monitor.erl",
+ "src/rabbit_message_interceptor.erl",
"src/rabbit_metrics.erl",
"src/rabbit_mirror_queue_coordinator.erl",
"src/rabbit_mirror_queue_master.erl",
@@ -379,6 +380,7 @@ def all_test_beam_files(name = "all_test_beam_files"):
"src/rabbit_looking_glass.erl",
"src/rabbit_maintenance.erl",
"src/rabbit_memory_monitor.erl",
+ "src/rabbit_message_interceptor.erl",
"src/rabbit_metrics.erl",
"src/rabbit_mirror_queue_coordinator.erl",
"src/rabbit_mirror_queue_master.erl",
@@ -635,6 +637,7 @@ def all_srcs(name = "all_srcs"):
"src/rabbit_looking_glass.erl",
"src/rabbit_maintenance.erl",
"src/rabbit_memory_monitor.erl",
+ "src/rabbit_message_interceptor.erl",
"src/rabbit_metrics.erl",
"src/rabbit_mirror_queue_coordinator.erl",
"src/rabbit_mirror_queue_master.erl",
@@ -1936,3 +1939,12 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
)
+ erlang_bytecode(
+ name = "rabbit_message_interceptor_SUITE_beam_files",
+ testonly = True,
+ srcs = ["test/rabbit_message_interceptor_SUITE.erl"],
+ outs = ["test/rabbit_message_interceptor_SUITE.beam"],
+ app_name = "rabbit",
+ erlc_opts = "//:test_erlc_opts",
+ deps = ["//deps/amqp_client:erlang_app"],
+ )
diff --git a/deps/rabbit/priv/schema/rabbit.schema b/deps/rabbit/priv/schema/rabbit.schema
index 5024376213..b49f673629 100644
--- a/deps/rabbit/priv/schema/rabbit.schema
+++ b/deps/rabbit/priv/schema/rabbit.schema
@@ -2504,6 +2504,33 @@ end}.
end
}.
+%%
+%% Message interceptors
+%%
+{mapping, "incoming_message_interceptors.$interceptor.overwrite", "rabbit.incoming_message_interceptors", [
+ {datatype, {enum, [true, false]}}]}.
+
+{translation, "rabbit.incoming_message_interceptors",
+ fun(Conf) ->
+ case cuttlefish_variable:filter_by_prefix("incoming_message_interceptors", Conf) of
+ [] ->
+ cuttlefish:unset();
+ L ->
+ [begin
+ Interceptor = list_to_atom(Interceptor0),
+ case lists:member(Interceptor, [set_header_timestamp,
+ set_header_routing_node]) of
+ true ->
+ {Interceptor, Overwrite};
+ false ->
+ cuttlefish:invalid(io_lib:format("~p is invalid", [Interceptor]))
+ end
+ end || {["incoming_message_interceptors", Interceptor0, "overwrite"], Overwrite} <- L,
+ is_list(Interceptor0) andalso is_boolean(Overwrite)]
+ end
+ end
+}.
+
% ===============================
% Validators
% ===============================
diff --git a/deps/rabbit/src/rabbit.erl b/deps/rabbit/src/rabbit.erl
index 3e6e705a55..1269c77a97 100644
--- a/deps/rabbit/src/rabbit.erl
+++ b/deps/rabbit/src/rabbit.erl
@@ -1646,18 +1646,20 @@ ensure_working_fhc() ->
%% should be placed into persistent_term for efficiency.
persist_static_configuration() ->
persist_static_configuration(
- [{rabbit, classic_queue_index_v2_segment_entry_count},
- {rabbit, classic_queue_store_v2_max_cache_size},
- {rabbit, classic_queue_store_v2_check_crc32}
+ [classic_queue_index_v2_segment_entry_count,
+ classic_queue_store_v2_max_cache_size,
+ classic_queue_store_v2_check_crc32,
+ incoming_message_interceptors
]).
-persist_static_configuration(AppParams) ->
+persist_static_configuration(Params) ->
+ App = ?MODULE,
lists:foreach(
- fun(Key = {App, Param}) ->
+ fun(Param) ->
case application:get_env(App, Param) of
{ok, Value} ->
- ok = persistent_term:put(Key, Value);
+ ok = persistent_term:put({App, Param}, Value);
undefined ->
ok
end
- end, AppParams).
+ end, Params).
diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl
index 666d6e88d6..a78b8e1c08 100644
--- a/deps/rabbit/src/rabbit_channel.erl
+++ b/deps/rabbit/src/rabbit_channel.erl
@@ -1288,9 +1288,10 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
check_write_permitted_on_topic(Exchange, User, RoutingKey, AuthzContext),
%% We decode the content's properties here because we're almost
%% certain to want to look at delivery-mode and priority.
- DecodedContent = #content {properties = Props} =
+ DecodedContent0 = #content {properties = Props} =
maybe_set_fast_reply_to(
rabbit_binary_parser:ensure_content_decoded(Content), State),
+ DecodedContent = rabbit_message_interceptor:intercept(DecodedContent0),
check_user_id_header(Props, State),
check_expiration_header(Props),
DoConfirm = Tx =/= none orelse ConfirmEnabled,
diff --git a/deps/rabbit/src/rabbit_message_interceptor.erl b/deps/rabbit/src/rabbit_message_interceptor.erl
new file mode 100644
index 0000000000..66ce391fe2
--- /dev/null
+++ b/deps/rabbit/src/rabbit_message_interceptor.erl
@@ -0,0 +1,66 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
+
+%% This module exists since 3.12 replacing plugins rabbitmq-message-timestamp
+%% and rabbitmq-routing-node-stamp. Instead of using these plugins, RabbitMQ core can
+%% now be configured to add such headers. This enables non-AMQP 0.9.1 protocols that
+%% do not use rabbit_channel to also add headers to incoming messages.
+-module(rabbit_message_interceptor).
+
+-export([intercept/1]).
+
+-include_lib("rabbit_common/include/rabbit.hrl").
+-include_lib("rabbit_common/include/rabbit_framing.hrl").
+
+-define(HEADER_TIMESTAMP, <<"timestamp_in_ms">>).
+-define(HEADER_ROUTING_NODE, <<"x-routed-by">>).
+
+-spec intercept(Content) ->
+ Content when Content :: rabbit_types:decoded_content().
+intercept(Content) ->
+ Interceptors = persistent_term:get({rabbit, incoming_message_interceptors}, []),
+ lists:foldl(fun(I, C) ->
+ intercept(C, I)
+ end, Content, Interceptors).
+
+intercept(Content, {set_header_routing_node, Overwrite}) ->
+ %% Binary node name could be placed into persistent_term for efficiency.
+ Node = atom_to_binary(node()),
+ set_header(Content, ?HEADER_ROUTING_NODE, longstr, Node, Overwrite);
+intercept(Content0, {set_header_timestamp, Overwrite}) ->
+ NowMs = os:system_time(millisecond),
+ NowSecs = NowMs div 1_000,
+ Content = set_header(Content0, ?HEADER_TIMESTAMP, long, NowMs, Overwrite),
+ set_property_timestamp(Content, NowSecs, Overwrite).
+
+-spec set_header(Content, binary(), rabbit_framing:amqp_field_type(),
+ rabbit_framing:amqp_value(), boolean()) ->
+ Content when Content :: rabbit_types:decoded_content().
+set_header(Content = #content{properties = Props = #'P_basic'{headers = Headers0}},
+ Key, Type, Value, Overwrite) ->
+ case {rabbit_basic:header(Key, Headers0), Overwrite} of
+ {Val, false} when Val =/= undefined ->
+ Content;
+ _ ->
+ Headers1 = if Headers0 =:= undefined -> [];
+ true -> Headers0
+ end,
+ Headers = rabbit_misc:set_table_value(Headers1, Key, Type, Value),
+ Content#content{properties = Props#'P_basic'{headers = Headers},
+ properties_bin = none}
+ end.
+
+-spec set_property_timestamp(Content, pos_integer(), boolean()) ->
+ Content when Content :: rabbit_types:decoded_content().
+set_property_timestamp(Content = #content{properties = Props = #'P_basic'{timestamp = Ts}},
+ Timestamp, Overwrite) ->
+ case {Ts, Overwrite} of
+ {Secs, false} when is_integer(Secs) ->
+ Content;
+ _ ->
+ Content#content{properties = Props#'P_basic'{timestamp = Timestamp},
+ properties_bin = none}
+ end.
diff --git a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets
index b9444eed47..2e6fae9aa5 100644
--- a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets
+++ b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets
@@ -925,6 +925,28 @@ credential_validator.regexp = ^abc\\d+",
[{rabbit, [
{permitted_deprecated_features, #{classic_mirrored_queues => false}}
]}],
+ []},
+
+ %%
+ %% Message interceptors
+ %%
+
+ {message_interceptors,
+ "incoming_message_interceptors.set_header_timestamp.overwrite = true",
+ [{rabbit, [
+ {incoming_message_interceptors, [{set_header_timestamp, true}]}
+ ]}],
+ []},
+
+ {message_interceptors,
+ "
+ incoming_message_interceptors.set_header_routing_node.overwrite = false
+ incoming_message_interceptors.set_header_timestamp.overwrite = false
+ ",
+ [{rabbit, [
+ {incoming_message_interceptors, [{set_header_routing_node, false},
+ {set_header_timestamp, false}]}
+ ]}],
[]}
].
diff --git a/deps/rabbit/test/rabbit_message_interceptor_SUITE.erl b/deps/rabbit/test/rabbit_message_interceptor_SUITE.erl
new file mode 100644
index 0000000000..db70c8e45f
--- /dev/null
+++ b/deps/rabbit/test/rabbit_message_interceptor_SUITE.erl
@@ -0,0 +1,112 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2018-2023 VMware, Inc. or its affiliates. All rights reserved.
+
+-module(rabbit_message_interceptor_SUITE).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+-compile([nowarn_export_all, export_all]).
+
+-import(rabbit_ct_helpers, [eventually/1]).
+
+all() ->
+ [
+ {group, tests}
+ ].
+
+groups() ->
+ [
+ {tests, [shuffle], [headers_overwrite,
+ headers_no_overwrite
+ ]}
+ ].
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_testcase(Testcase, Config0) ->
+ Config1 = rabbit_ct_helpers:set_config(
+ Config0, [{rmq_nodename_suffix, Testcase}]),
+ Overwrite = case Testcase of
+ headers_overwrite -> true;
+ headers_no_overwrite -> false
+ end,
+ Val = maps:to_list(
+ maps:from_keys([set_header_timestamp,
+ set_header_routing_node],
+ Overwrite)),
+ Config = rabbit_ct_helpers:merge_app_env(
+ Config1, {rabbit, [{incoming_message_interceptors, Val}]}),
+ rabbit_ct_helpers:run_steps(
+ Config,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()).
+
+end_per_testcase(Testcase, Config0) ->
+ Config = rabbit_ct_helpers:testcase_finished(Config0, Testcase),
+ rabbit_ct_helpers:run_teardown_steps(
+ Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+headers_overwrite(Config) ->
+ headers(true, Config).
+
+headers_no_overwrite(Config) ->
+ headers(false, Config).
+
+headers(Overwrite, Config) ->
+ Server = atom_to_binary(rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename)),
+ Payload = QName = atom_to_binary(?FUNCTION_NAME),
+ NowSecs = os:system_time(second),
+ NowMs = os:system_time(millisecond),
+ Ch = rabbit_ct_client_helpers:open_channel(Config),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}),
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName},
+ #amqp_msg{payload = Payload}),
+ AssertHeaders =
+ fun() ->
+ eventually(
+ ?_assertMatch(
+ {#'basic.get_ok'{},
+ #amqp_msg{payload = Payload,
+ props = #'P_basic'{
+ timestamp = Secs,
+ headers = [{<<"timestamp_in_ms">>, long, Ms},
+ {<<"x-routed-by">>, longstr, Server}]
+ }}}
+ when Ms < NowMs + 4000 andalso
+ Ms > NowMs - 4000 andalso
+ Secs < NowSecs + 4 andalso
+ Secs > NowSecs - 4,
+ amqp_channel:call(Ch, #'basic.get'{queue = QName})))
+ end,
+ AssertHeaders(),
+
+ Msg = #amqp_msg{payload = Payload,
+ props = #'P_basic'{
+ timestamp = 1,
+ headers = [{<<"timestamp_in_ms">>, long, 1000},
+ {<<"x-routed-by">>, longstr, <<"rabbit@my-node">>}]
+ }},
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, Msg),
+ case Overwrite of
+ true ->
+ AssertHeaders();
+ false ->
+ eventually(
+ ?_assertMatch(
+ {#'basic.get_ok'{}, Msg},
+ amqp_channel:call(Ch, #'basic.get'{queue = QName})))
+ end,
+
+ #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
+ ok.
diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
index 48bc67d45b..7810dc1fc7 100644
--- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
+++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
@@ -1082,13 +1082,14 @@ publish_to_queues(
headers = [{<<"x-mqtt-publish-qos">>, byte, Qos}],
delivery_mode = delivery_mode(Qos)},
{ClassId, _MethodId} = rabbit_framing_amqp_0_9_1:method_id('basic.publish'),
- Content = #content{
- class_id = ClassId,
- properties = Props,
- properties_bin = none,
- protocol = none,
- payload_fragments_rev = [Payload]
- },
+ Content0 = #content{
+ class_id = ClassId,
+ properties = Props,
+ properties_bin = none,
+ protocol = none,
+ payload_fragments_rev = [Payload]
+ },
+ Content = rabbit_message_interceptor:intercept(Content0),
BasicMessage = #basic_message{
exchange_name = ExchangeName,
routing_keys = [RoutingKey],
diff --git a/deps/rabbitmq_mqtt/test/shared_SUITE.erl b/deps/rabbitmq_mqtt/test/shared_SUITE.erl
index de94bdee7a..6227aa0b57 100644
--- a/deps/rabbitmq_mqtt/test/shared_SUITE.erl
+++ b/deps/rabbitmq_mqtt/test/shared_SUITE.erl
@@ -96,6 +96,7 @@ subgroups() ->
,trace
,max_packet_size_unauthenticated
,default_queue_type
+ ,incoming_message_interceptors
]}
]},
{cluster_size_3, [],
@@ -1424,6 +1425,36 @@ default_queue_type(Config) ->
ok = emqtt:disconnect(C2),
ok = rabbit_ct_broker_helpers:delete_vhost(Config, Vhost).
+incoming_message_interceptors(Config) ->
+ Key = {rabbit, ?FUNCTION_NAME},
+ ok = rpc(Config, persistent_term, put, [Key, [{set_header_timestamp, false}]]),
+ Ch = rabbit_ct_client_helpers:open_channel(Config),
+ Payload = ClientId = QName = Topic = atom_to_binary(?FUNCTION_NAME),
+ declare_queue(Ch, QName, []),
+ bind(Ch, QName, Topic),
+ C = connect(ClientId, Config),
+ ok = emqtt:publish(C, Topic, Payload),
+ NowSecs = os:system_time(second),
+ NowMs = os:system_time(millisecond),
+ eventually(
+ ?_assertMatch(
+ {#'basic.get_ok'{},
+ #amqp_msg{payload = Payload,
+ props = #'P_basic'{
+ timestamp = Secs,
+ headers = [{<<"timestamp_in_ms">>, long, Ms},
+ {<<"x-mqtt-publish-qos">>, byte, 0}]
+ }}}
+ when Ms < NowMs + 4000 andalso
+ Ms > NowMs - 4000 andalso
+ Secs < NowSecs + 4 andalso
+ Secs > NowSecs - 4,
+ amqp_channel:call(Ch, #'basic.get'{queue = QName}))),
+
+ delete_queue(Ch, QName),
+ true = rpc(Config, persistent_term, erase, [Key]),
+ ok = emqtt:disconnect(C).
+
%% -------------------------------------------------------------------
%% Internal helpers
%% -------------------------------------------------------------------
diff --git a/moduleindex.yaml b/moduleindex.yaml
index 8d9b7c854d..a4880c091d 100755
--- a/moduleindex.yaml
+++ b/moduleindex.yaml
@@ -575,6 +575,7 @@ rabbit:
- rabbit_looking_glass
- rabbit_maintenance
- rabbit_memory_monitor
+- rabbit_message_interceptor
- rabbit_metrics
- rabbit_mirror_queue_coordinator
- rabbit_mirror_queue_master