diff options
Diffstat (limited to 'deps/rabbit/src')
-rw-r--r-- | deps/rabbit/src/rabbit.erl | 16 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_channel.erl | 3 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_message_interceptor.erl | 66 |
3 files changed, 77 insertions, 8 deletions
diff --git a/deps/rabbit/src/rabbit.erl b/deps/rabbit/src/rabbit.erl index 3e6e705a55..1269c77a97 100644 --- a/deps/rabbit/src/rabbit.erl +++ b/deps/rabbit/src/rabbit.erl @@ -1646,18 +1646,20 @@ ensure_working_fhc() -> %% should be placed into persistent_term for efficiency. persist_static_configuration() -> persist_static_configuration( - [{rabbit, classic_queue_index_v2_segment_entry_count}, - {rabbit, classic_queue_store_v2_max_cache_size}, - {rabbit, classic_queue_store_v2_check_crc32} + [classic_queue_index_v2_segment_entry_count, + classic_queue_store_v2_max_cache_size, + classic_queue_store_v2_check_crc32, + incoming_message_interceptors ]). -persist_static_configuration(AppParams) -> +persist_static_configuration(Params) -> + App = ?MODULE, lists:foreach( - fun(Key = {App, Param}) -> + fun(Param) -> case application:get_env(App, Param) of {ok, Value} -> - ok = persistent_term:put(Key, Value); + ok = persistent_term:put({App, Param}, Value); undefined -> ok end - end, AppParams). + end, Params). diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 666d6e88d6..a78b8e1c08 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -1288,9 +1288,10 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, check_write_permitted_on_topic(Exchange, User, RoutingKey, AuthzContext), %% We decode the content's properties here because we're almost %% certain to want to look at delivery-mode and priority. - DecodedContent = #content {properties = Props} = + DecodedContent0 = #content {properties = Props} = maybe_set_fast_reply_to( rabbit_binary_parser:ensure_content_decoded(Content), State), + DecodedContent = rabbit_message_interceptor:intercept(DecodedContent0), check_user_id_header(Props, State), check_expiration_header(Props), DoConfirm = Tx =/= none orelse ConfirmEnabled, diff --git a/deps/rabbit/src/rabbit_message_interceptor.erl b/deps/rabbit/src/rabbit_message_interceptor.erl new file mode 100644 index 0000000000..66ce391fe2 --- /dev/null +++ b/deps/rabbit/src/rabbit_message_interceptor.erl @@ -0,0 +1,66 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved. + +%% This module exists since 3.12 replacing plugins rabbitmq-message-timestamp +%% and rabbitmq-routing-node-stamp. Instead of using these plugins, RabbitMQ core can +%% now be configured to add such headers. This enables non-AMQP 0.9.1 protocols that +%% do not use rabbit_channel to also add headers to incoming messages. +-module(rabbit_message_interceptor). + +-export([intercept/1]). + +-include_lib("rabbit_common/include/rabbit.hrl"). +-include_lib("rabbit_common/include/rabbit_framing.hrl"). + +-define(HEADER_TIMESTAMP, <<"timestamp_in_ms">>). +-define(HEADER_ROUTING_NODE, <<"x-routed-by">>). + +-spec intercept(Content) -> + Content when Content :: rabbit_types:decoded_content(). +intercept(Content) -> + Interceptors = persistent_term:get({rabbit, incoming_message_interceptors}, []), + lists:foldl(fun(I, C) -> + intercept(C, I) + end, Content, Interceptors). + +intercept(Content, {set_header_routing_node, Overwrite}) -> + %% Binary node name could be placed into persistent_term for efficiency. + Node = atom_to_binary(node()), + set_header(Content, ?HEADER_ROUTING_NODE, longstr, Node, Overwrite); +intercept(Content0, {set_header_timestamp, Overwrite}) -> + NowMs = os:system_time(millisecond), + NowSecs = NowMs div 1_000, + Content = set_header(Content0, ?HEADER_TIMESTAMP, long, NowMs, Overwrite), + set_property_timestamp(Content, NowSecs, Overwrite). + +-spec set_header(Content, binary(), rabbit_framing:amqp_field_type(), + rabbit_framing:amqp_value(), boolean()) -> + Content when Content :: rabbit_types:decoded_content(). +set_header(Content = #content{properties = Props = #'P_basic'{headers = Headers0}}, + Key, Type, Value, Overwrite) -> + case {rabbit_basic:header(Key, Headers0), Overwrite} of + {Val, false} when Val =/= undefined -> + Content; + _ -> + Headers1 = if Headers0 =:= undefined -> []; + true -> Headers0 + end, + Headers = rabbit_misc:set_table_value(Headers1, Key, Type, Value), + Content#content{properties = Props#'P_basic'{headers = Headers}, + properties_bin = none} + end. + +-spec set_property_timestamp(Content, pos_integer(), boolean()) -> + Content when Content :: rabbit_types:decoded_content(). +set_property_timestamp(Content = #content{properties = Props = #'P_basic'{timestamp = Ts}}, + Timestamp, Overwrite) -> + case {Ts, Overwrite} of + {Secs, false} when is_integer(Secs) -> + Content; + _ -> + Content#content{properties = Props#'P_basic'{timestamp = Timestamp}, + properties_bin = none} + end. |