diff options
Diffstat (limited to 'deps/rabbit/src/rabbit_message_interceptor.erl')
-rw-r--r-- | deps/rabbit/src/rabbit_message_interceptor.erl | 66 |
1 files changed, 66 insertions, 0 deletions
diff --git a/deps/rabbit/src/rabbit_message_interceptor.erl b/deps/rabbit/src/rabbit_message_interceptor.erl new file mode 100644 index 0000000000..66ce391fe2 --- /dev/null +++ b/deps/rabbit/src/rabbit_message_interceptor.erl @@ -0,0 +1,66 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved. + +%% This module exists since 3.12 replacing plugins rabbitmq-message-timestamp +%% and rabbitmq-routing-node-stamp. Instead of using these plugins, RabbitMQ core can +%% now be configured to add such headers. This enables non-AMQP 0.9.1 protocols that +%% do not use rabbit_channel to also add headers to incoming messages. +-module(rabbit_message_interceptor). + +-export([intercept/1]). + +-include_lib("rabbit_common/include/rabbit.hrl"). +-include_lib("rabbit_common/include/rabbit_framing.hrl"). + +-define(HEADER_TIMESTAMP, <<"timestamp_in_ms">>). +-define(HEADER_ROUTING_NODE, <<"x-routed-by">>). + +-spec intercept(Content) -> + Content when Content :: rabbit_types:decoded_content(). +intercept(Content) -> + Interceptors = persistent_term:get({rabbit, incoming_message_interceptors}, []), + lists:foldl(fun(I, C) -> + intercept(C, I) + end, Content, Interceptors). + +intercept(Content, {set_header_routing_node, Overwrite}) -> + %% Binary node name could be placed into persistent_term for efficiency. + Node = atom_to_binary(node()), + set_header(Content, ?HEADER_ROUTING_NODE, longstr, Node, Overwrite); +intercept(Content0, {set_header_timestamp, Overwrite}) -> + NowMs = os:system_time(millisecond), + NowSecs = NowMs div 1_000, + Content = set_header(Content0, ?HEADER_TIMESTAMP, long, NowMs, Overwrite), + set_property_timestamp(Content, NowSecs, Overwrite). + +-spec set_header(Content, binary(), rabbit_framing:amqp_field_type(), + rabbit_framing:amqp_value(), boolean()) -> + Content when Content :: rabbit_types:decoded_content(). +set_header(Content = #content{properties = Props = #'P_basic'{headers = Headers0}}, + Key, Type, Value, Overwrite) -> + case {rabbit_basic:header(Key, Headers0), Overwrite} of + {Val, false} when Val =/= undefined -> + Content; + _ -> + Headers1 = if Headers0 =:= undefined -> []; + true -> Headers0 + end, + Headers = rabbit_misc:set_table_value(Headers1, Key, Type, Value), + Content#content{properties = Props#'P_basic'{headers = Headers}, + properties_bin = none} + end. + +-spec set_property_timestamp(Content, pos_integer(), boolean()) -> + Content when Content :: rabbit_types:decoded_content(). +set_property_timestamp(Content = #content{properties = Props = #'P_basic'{timestamp = Ts}}, + Timestamp, Overwrite) -> + case {Ts, Overwrite} of + {Secs, false} when is_integer(Secs) -> + Content; + _ -> + Content#content{properties = Props#'P_basic'{timestamp = Timestamp}, + properties_bin = none} + end. |