diff options
author | dcorbacho <dparracorbacho@piotal.io> | 2020-11-18 14:27:41 +0000 |
---|---|---|
committer | dcorbacho <dparracorbacho@piotal.io> | 2020-11-18 14:27:41 +0000 |
commit | f23a51261d9502ec39df0f8db47ba6b22aa7659f (patch) | |
tree | 53dcdf46e7dc2c14e81ee960bce8793879b488d3 /deps/rabbit/src/rabbit_basic.erl | |
parent | afa2c2bf6c7e0e9b63f4fb53dc931c70388e1c82 (diff) | |
parent | 9f6d64ec4a4b1eeac24d7846c5c64fd96798d892 (diff) | |
download | rabbitmq-server-git-stream-timestamp-offset.tar.gz |
Merge remote-tracking branch 'origin/master' into stream-timestamp-offsetstream-timestamp-offset
Diffstat (limited to 'deps/rabbit/src/rabbit_basic.erl')
-rw-r--r-- | deps/rabbit/src/rabbit_basic.erl | 354 |
1 files changed, 354 insertions, 0 deletions
diff --git a/deps/rabbit/src/rabbit_basic.erl b/deps/rabbit/src/rabbit_basic.erl new file mode 100644 index 0000000000..cdc9e082e4 --- /dev/null +++ b/deps/rabbit/src/rabbit_basic.erl @@ -0,0 +1,354 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_basic). +-include("rabbit.hrl"). +-include("rabbit_framing.hrl"). + +-export([publish/4, publish/5, publish/1, + message/3, message/4, properties/1, prepend_table_header/3, + extract_headers/1, extract_timestamp/1, map_headers/2, delivery/4, + header_routes/1, parse_expiration/1, header/2, header/3]). +-export([build_content/2, from_content/1, msg_size/1, + maybe_gc_large_msg/1, maybe_gc_large_msg/2]). +-export([add_header/4, + peek_fmt_message/1]). + +%%---------------------------------------------------------------------------- + +-type properties_input() :: + rabbit_framing:amqp_property_record() | [{atom(), any()}]. +-type publish_result() :: + ok | rabbit_types:error('not_found'). +-type header() :: any(). +-type headers() :: rabbit_framing:amqp_table() | 'undefined'. + +-type exchange_input() :: rabbit_types:exchange() | rabbit_exchange:name(). +-type body_input() :: binary() | [binary()]. + +%%---------------------------------------------------------------------------- + +%% Convenience function, for avoiding round-trips in calls across the +%% erlang distributed network. + +-spec publish + (exchange_input(), rabbit_router:routing_key(), properties_input(), + body_input()) -> + publish_result(). + +publish(Exchange, RoutingKeyBin, Properties, Body) -> + publish(Exchange, RoutingKeyBin, false, Properties, Body). + +%% Convenience function, for avoiding round-trips in calls across the +%% erlang distributed network. + +-spec publish + (exchange_input(), rabbit_router:routing_key(), boolean(), + properties_input(), body_input()) -> + publish_result(). + +publish(X = #exchange{name = XName}, RKey, Mandatory, Props, Body) -> + Message = message(XName, RKey, properties(Props), Body), + publish(X, delivery(Mandatory, false, Message, undefined)); +publish(XName, RKey, Mandatory, Props, Body) -> + Message = message(XName, RKey, properties(Props), Body), + publish(delivery(Mandatory, false, Message, undefined)). + +-spec publish(rabbit_types:delivery()) -> publish_result(). + +publish(Delivery = #delivery{ + message = #basic_message{exchange_name = XName}}) -> + case rabbit_exchange:lookup(XName) of + {ok, X} -> publish(X, Delivery); + Err -> Err + end. + +publish(X, Delivery) -> + Qs = rabbit_amqqueue:lookup(rabbit_exchange:route(X, Delivery)), + _ = rabbit_queue_type:deliver(Qs, Delivery, stateless), + ok. + +-spec delivery + (boolean(), boolean(), rabbit_types:message(), undefined | integer()) -> + rabbit_types:delivery(). + +delivery(Mandatory, Confirm, Message, MsgSeqNo) -> + #delivery{mandatory = Mandatory, confirm = Confirm, sender = self(), + message = Message, msg_seq_no = MsgSeqNo, flow = noflow}. + +-spec build_content + (rabbit_framing:amqp_property_record(), binary() | [binary()]) -> + rabbit_types:content(). + +build_content(Properties, BodyBin) when is_binary(BodyBin) -> + build_content(Properties, [BodyBin]); + +build_content(Properties, PFR) -> + %% basic.publish hasn't changed so we can just hard-code amqp_0_9_1 + {ClassId, _MethodId} = + rabbit_framing_amqp_0_9_1:method_id('basic.publish'), + #content{class_id = ClassId, + properties = Properties, + properties_bin = none, + protocol = none, + payload_fragments_rev = PFR}. + +-spec from_content + (rabbit_types:content()) -> + {rabbit_framing:amqp_property_record(), binary()}. + +from_content(Content) -> + #content{class_id = ClassId, + properties = Props, + payload_fragments_rev = FragmentsRev} = + rabbit_binary_parser:ensure_content_decoded(Content), + %% basic.publish hasn't changed so we can just hard-code amqp_0_9_1 + {ClassId, _MethodId} = + rabbit_framing_amqp_0_9_1:method_id('basic.publish'), + {Props, list_to_binary(lists:reverse(FragmentsRev))}. + +%% This breaks the spec rule forbidding message modification +strip_header(#content{properties = #'P_basic'{headers = undefined}} + = DecodedContent, _Key) -> + DecodedContent; +strip_header(#content{properties = Props = #'P_basic'{headers = Headers}} + = DecodedContent, Key) -> + case lists:keysearch(Key, 1, Headers) of + false -> DecodedContent; + {value, Found} -> Headers0 = lists:delete(Found, Headers), + rabbit_binary_generator:clear_encoded_content( + DecodedContent#content{ + properties = Props#'P_basic'{ + headers = Headers0}}) + end. + +-spec message + (rabbit_exchange:name(), rabbit_router:routing_key(), + rabbit_types:decoded_content()) -> + rabbit_types:ok_or_error2(rabbit_types:message(), any()). + +message(XName, RoutingKey, #content{properties = Props} = DecodedContent) -> + try + {ok, #basic_message{ + exchange_name = XName, + content = strip_header(DecodedContent, ?DELETED_HEADER), + id = rabbit_guid:gen(), + is_persistent = is_message_persistent(DecodedContent), + routing_keys = [RoutingKey | + header_routes(Props#'P_basic'.headers)]}} + catch + {error, _Reason} = Error -> Error + end. + +-spec message + (rabbit_exchange:name(), rabbit_router:routing_key(), properties_input(), + binary()) -> + rabbit_types:message(). + +message(XName, RoutingKey, RawProperties, Body) -> + Properties = properties(RawProperties), + Content = build_content(Properties, Body), + {ok, Msg} = message(XName, RoutingKey, Content), + Msg. + +-spec properties + (properties_input()) -> rabbit_framing:amqp_property_record(). + +properties(P = #'P_basic'{}) -> + P; +properties(P) when is_list(P) -> + %% Yes, this is O(length(P) * record_info(size, 'P_basic') / 2), + %% i.e. slow. Use the definition of 'P_basic' directly if + %% possible! + lists:foldl(fun ({Key, Value}, Acc) -> + case indexof(record_info(fields, 'P_basic'), Key) of + 0 -> throw({unknown_basic_property, Key}); + N -> setelement(N + 1, Acc, Value) + end + end, #'P_basic'{}, P). + +-spec prepend_table_header + (binary(), rabbit_framing:amqp_table(), headers()) -> headers(). + +prepend_table_header(Name, Info, undefined) -> + prepend_table_header(Name, Info, []); +prepend_table_header(Name, Info, Headers) -> + case rabbit_misc:table_lookup(Headers, Name) of + {array, Existing} -> + prepend_table(Name, Info, Existing, Headers); + undefined -> + prepend_table(Name, Info, [], Headers); + Other -> + Headers2 = prepend_table(Name, Info, [], Headers), + set_invalid_header(Name, Other, Headers2) + end. + +prepend_table(Name, Info, Prior, Headers) -> + rabbit_misc:set_table_value(Headers, Name, array, [{table, Info} | Prior]). + +set_invalid_header(Name, {_, _}=Value, Headers) when is_list(Headers) -> + case rabbit_misc:table_lookup(Headers, ?INVALID_HEADERS_KEY) of + undefined -> + set_invalid([{Name, array, [Value]}], Headers); + {table, ExistingHdr} -> + update_invalid(Name, Value, ExistingHdr, Headers); + Other -> + %% somehow the x-invalid-headers header is corrupt + Invalid = [{?INVALID_HEADERS_KEY, array, [Other]}], + set_invalid_header(Name, Value, set_invalid(Invalid, Headers)) + end. + +set_invalid(NewHdr, Headers) -> + rabbit_misc:set_table_value(Headers, ?INVALID_HEADERS_KEY, table, NewHdr). + +update_invalid(Name, Value, ExistingHdr, Header) -> + Values = case rabbit_misc:table_lookup(ExistingHdr, Name) of + undefined -> [Value]; + {array, Prior} -> [Value | Prior] + end, + NewHdr = rabbit_misc:set_table_value(ExistingHdr, Name, array, Values), + set_invalid(NewHdr, Header). + +-spec header(header(), headers()) -> 'undefined' | any(). + +header(_Header, undefined) -> + undefined; +header(_Header, []) -> + undefined; +header(Header, Headers) -> + header(Header, Headers, undefined). + +-spec header(header(), headers(), any()) -> 'undefined' | any(). + +header(Header, Headers, Default) -> + case lists:keysearch(Header, 1, Headers) of + false -> Default; + {value, Val} -> Val + end. + +-spec extract_headers(rabbit_types:content()) -> headers(). + +extract_headers(Content) -> + #content{properties = #'P_basic'{headers = Headers}} = + rabbit_binary_parser:ensure_content_decoded(Content), + Headers. + +extract_timestamp(Content) -> + #content{properties = #'P_basic'{timestamp = Timestamp}} = + rabbit_binary_parser:ensure_content_decoded(Content), + Timestamp. + +-spec map_headers + (fun((headers()) -> headers()), rabbit_types:content()) -> + rabbit_types:content(). + +map_headers(F, Content) -> + Content1 = rabbit_binary_parser:ensure_content_decoded(Content), + #content{properties = #'P_basic'{headers = Headers} = Props} = Content1, + Headers1 = F(Headers), + rabbit_binary_generator:clear_encoded_content( + Content1#content{properties = Props#'P_basic'{headers = Headers1}}). + +indexof(L, Element) -> indexof(L, Element, 1). + +indexof([], _Element, _N) -> 0; +indexof([Element | _Rest], Element, N) -> N; +indexof([_ | Rest], Element, N) -> indexof(Rest, Element, N + 1). + +is_message_persistent(#content{properties = #'P_basic'{ + delivery_mode = Mode}}) -> + case Mode of + 1 -> false; + 2 -> true; + undefined -> false; + Other -> throw({error, {delivery_mode_unknown, Other}}) + end. + +%% Extract CC routes from headers + +-spec header_routes(undefined | rabbit_framing:amqp_table()) -> [string()]. + +header_routes(undefined) -> + []; +header_routes(HeadersTable) -> + lists:append( + [case rabbit_misc:table_lookup(HeadersTable, HeaderKey) of + {array, Routes} -> [Route || {longstr, Route} <- Routes]; + undefined -> []; + {Type, _Val} -> throw({error, {unacceptable_type_in_header, + binary_to_list(HeaderKey), Type}}) + end || HeaderKey <- ?ROUTING_HEADERS]). + +-spec parse_expiration + (rabbit_framing:amqp_property_record()) -> + rabbit_types:ok_or_error2('undefined' | non_neg_integer(), any()). + +parse_expiration(#'P_basic'{expiration = undefined}) -> + {ok, undefined}; +parse_expiration(#'P_basic'{expiration = Expiration}) -> + case string:to_integer(binary_to_list(Expiration)) of + {error, no_integer} = E -> + E; + {N, ""} -> + case rabbit_misc:check_expiry(N) of + ok -> {ok, N}; + E = {error, _} -> E + end; + {_, S} -> + {error, {leftover_string, S}} + end. + +maybe_gc_large_msg(Content) -> + rabbit_writer:maybe_gc_large_msg(Content). + +maybe_gc_large_msg(Content, undefined) -> + rabbit_writer:msg_size(Content); +maybe_gc_large_msg(Content, GCThreshold) -> + rabbit_writer:maybe_gc_large_msg(Content, GCThreshold). + +msg_size(Content) -> + rabbit_writer:msg_size(Content). + +add_header(Name, Type, Value, #basic_message{content = Content0} = Msg) -> + Content = rabbit_basic:map_headers( + fun(undefined) -> + rabbit_misc:set_table_value([], Name, Type, Value); + (Headers) -> + rabbit_misc:set_table_value(Headers, Name, Type, Value) + end, Content0), + Msg#basic_message{content = Content}. + +peek_fmt_message(#basic_message{exchange_name = Ex, + routing_keys = RKeys, + content = + #content{payload_fragments_rev = Payl0, + properties = Props}}) -> + Fields = [atom_to_binary(F, utf8) || F <- record_info(fields, 'P_basic')], + T = lists:zip(Fields, tl(tuple_to_list(Props))), + lists:foldl( + fun ({<<"headers">>, Hdrs}, Acc) -> + case Hdrs of + [] -> + Acc; + _ -> + Acc ++ [{header_key(H), V} || {H, _T, V} <- Hdrs] + end; + ({_, undefined}, Acc) -> + Acc; + (KV, Acc) -> + [KV | Acc] + end, [], [{<<"payload (max 64 bytes)">>, + %% restric payload to 64 bytes + binary_prefix_64(iolist_to_binary(lists:reverse(Payl0)), 64)}, + {<<"exchange">>, Ex#resource.name}, + {<<"routing_keys">>, RKeys} | T]). + +header_key(A) -> + <<"header.", A/binary>>. + +binary_prefix_64(Bin, Len) -> + binary:part(Bin, 0, min(byte_size(Bin), Len)). |