summaryrefslogtreecommitdiff
path: root/src/rabbit_basic.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_basic.erl')
-rw-r--r--src/rabbit_basic.erl354
1 files changed, 0 insertions, 354 deletions
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
deleted file mode 100644
index cdc9e082e4..0000000000
--- a/src/rabbit_basic.erl
+++ /dev/null
@@ -1,354 +0,0 @@
-%% This Source Code Form is subject to the terms of the Mozilla Public
-%% License, v. 2.0. If a copy of the MPL was not distributed with this
-%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
-%%
-%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
-%%
-
--module(rabbit_basic).
--include("rabbit.hrl").
--include("rabbit_framing.hrl").
-
--export([publish/4, publish/5, publish/1,
- message/3, message/4, properties/1, prepend_table_header/3,
- extract_headers/1, extract_timestamp/1, map_headers/2, delivery/4,
- header_routes/1, parse_expiration/1, header/2, header/3]).
--export([build_content/2, from_content/1, msg_size/1,
- maybe_gc_large_msg/1, maybe_gc_large_msg/2]).
--export([add_header/4,
- peek_fmt_message/1]).
-
-%%----------------------------------------------------------------------------
-
--type properties_input() ::
- rabbit_framing:amqp_property_record() | [{atom(), any()}].
--type publish_result() ::
- ok | rabbit_types:error('not_found').
--type header() :: any().
--type headers() :: rabbit_framing:amqp_table() | 'undefined'.
-
--type exchange_input() :: rabbit_types:exchange() | rabbit_exchange:name().
--type body_input() :: binary() | [binary()].
-
-%%----------------------------------------------------------------------------
-
-%% Convenience function, for avoiding round-trips in calls across the
-%% erlang distributed network.
-
--spec publish
- (exchange_input(), rabbit_router:routing_key(), properties_input(),
- body_input()) ->
- publish_result().
-
-publish(Exchange, RoutingKeyBin, Properties, Body) ->
- publish(Exchange, RoutingKeyBin, false, Properties, Body).
-
-%% Convenience function, for avoiding round-trips in calls across the
-%% erlang distributed network.
-
--spec publish
- (exchange_input(), rabbit_router:routing_key(), boolean(),
- properties_input(), body_input()) ->
- publish_result().
-
-publish(X = #exchange{name = XName}, RKey, Mandatory, Props, Body) ->
- Message = message(XName, RKey, properties(Props), Body),
- publish(X, delivery(Mandatory, false, Message, undefined));
-publish(XName, RKey, Mandatory, Props, Body) ->
- Message = message(XName, RKey, properties(Props), Body),
- publish(delivery(Mandatory, false, Message, undefined)).
-
--spec publish(rabbit_types:delivery()) -> publish_result().
-
-publish(Delivery = #delivery{
- message = #basic_message{exchange_name = XName}}) ->
- case rabbit_exchange:lookup(XName) of
- {ok, X} -> publish(X, Delivery);
- Err -> Err
- end.
-
-publish(X, Delivery) ->
- Qs = rabbit_amqqueue:lookup(rabbit_exchange:route(X, Delivery)),
- _ = rabbit_queue_type:deliver(Qs, Delivery, stateless),
- ok.
-
--spec delivery
- (boolean(), boolean(), rabbit_types:message(), undefined | integer()) ->
- rabbit_types:delivery().
-
-delivery(Mandatory, Confirm, Message, MsgSeqNo) ->
- #delivery{mandatory = Mandatory, confirm = Confirm, sender = self(),
- message = Message, msg_seq_no = MsgSeqNo, flow = noflow}.
-
--spec build_content
- (rabbit_framing:amqp_property_record(), binary() | [binary()]) ->
- rabbit_types:content().
-
-build_content(Properties, BodyBin) when is_binary(BodyBin) ->
- build_content(Properties, [BodyBin]);
-
-build_content(Properties, PFR) ->
- %% basic.publish hasn't changed so we can just hard-code amqp_0_9_1
- {ClassId, _MethodId} =
- rabbit_framing_amqp_0_9_1:method_id('basic.publish'),
- #content{class_id = ClassId,
- properties = Properties,
- properties_bin = none,
- protocol = none,
- payload_fragments_rev = PFR}.
-
--spec from_content
- (rabbit_types:content()) ->
- {rabbit_framing:amqp_property_record(), binary()}.
-
-from_content(Content) ->
- #content{class_id = ClassId,
- properties = Props,
- payload_fragments_rev = FragmentsRev} =
- rabbit_binary_parser:ensure_content_decoded(Content),
- %% basic.publish hasn't changed so we can just hard-code amqp_0_9_1
- {ClassId, _MethodId} =
- rabbit_framing_amqp_0_9_1:method_id('basic.publish'),
- {Props, list_to_binary(lists:reverse(FragmentsRev))}.
-
-%% This breaks the spec rule forbidding message modification
-strip_header(#content{properties = #'P_basic'{headers = undefined}}
- = DecodedContent, _Key) ->
- DecodedContent;
-strip_header(#content{properties = Props = #'P_basic'{headers = Headers}}
- = DecodedContent, Key) ->
- case lists:keysearch(Key, 1, Headers) of
- false -> DecodedContent;
- {value, Found} -> Headers0 = lists:delete(Found, Headers),
- rabbit_binary_generator:clear_encoded_content(
- DecodedContent#content{
- properties = Props#'P_basic'{
- headers = Headers0}})
- end.
-
--spec message
- (rabbit_exchange:name(), rabbit_router:routing_key(),
- rabbit_types:decoded_content()) ->
- rabbit_types:ok_or_error2(rabbit_types:message(), any()).
-
-message(XName, RoutingKey, #content{properties = Props} = DecodedContent) ->
- try
- {ok, #basic_message{
- exchange_name = XName,
- content = strip_header(DecodedContent, ?DELETED_HEADER),
- id = rabbit_guid:gen(),
- is_persistent = is_message_persistent(DecodedContent),
- routing_keys = [RoutingKey |
- header_routes(Props#'P_basic'.headers)]}}
- catch
- {error, _Reason} = Error -> Error
- end.
-
--spec message
- (rabbit_exchange:name(), rabbit_router:routing_key(), properties_input(),
- binary()) ->
- rabbit_types:message().
-
-message(XName, RoutingKey, RawProperties, Body) ->
- Properties = properties(RawProperties),
- Content = build_content(Properties, Body),
- {ok, Msg} = message(XName, RoutingKey, Content),
- Msg.
-
--spec properties
- (properties_input()) -> rabbit_framing:amqp_property_record().
-
-properties(P = #'P_basic'{}) ->
- P;
-properties(P) when is_list(P) ->
- %% Yes, this is O(length(P) * record_info(size, 'P_basic') / 2),
- %% i.e. slow. Use the definition of 'P_basic' directly if
- %% possible!
- lists:foldl(fun ({Key, Value}, Acc) ->
- case indexof(record_info(fields, 'P_basic'), Key) of
- 0 -> throw({unknown_basic_property, Key});
- N -> setelement(N + 1, Acc, Value)
- end
- end, #'P_basic'{}, P).
-
--spec prepend_table_header
- (binary(), rabbit_framing:amqp_table(), headers()) -> headers().
-
-prepend_table_header(Name, Info, undefined) ->
- prepend_table_header(Name, Info, []);
-prepend_table_header(Name, Info, Headers) ->
- case rabbit_misc:table_lookup(Headers, Name) of
- {array, Existing} ->
- prepend_table(Name, Info, Existing, Headers);
- undefined ->
- prepend_table(Name, Info, [], Headers);
- Other ->
- Headers2 = prepend_table(Name, Info, [], Headers),
- set_invalid_header(Name, Other, Headers2)
- end.
-
-prepend_table(Name, Info, Prior, Headers) ->
- rabbit_misc:set_table_value(Headers, Name, array, [{table, Info} | Prior]).
-
-set_invalid_header(Name, {_, _}=Value, Headers) when is_list(Headers) ->
- case rabbit_misc:table_lookup(Headers, ?INVALID_HEADERS_KEY) of
- undefined ->
- set_invalid([{Name, array, [Value]}], Headers);
- {table, ExistingHdr} ->
- update_invalid(Name, Value, ExistingHdr, Headers);
- Other ->
- %% somehow the x-invalid-headers header is corrupt
- Invalid = [{?INVALID_HEADERS_KEY, array, [Other]}],
- set_invalid_header(Name, Value, set_invalid(Invalid, Headers))
- end.
-
-set_invalid(NewHdr, Headers) ->
- rabbit_misc:set_table_value(Headers, ?INVALID_HEADERS_KEY, table, NewHdr).
-
-update_invalid(Name, Value, ExistingHdr, Header) ->
- Values = case rabbit_misc:table_lookup(ExistingHdr, Name) of
- undefined -> [Value];
- {array, Prior} -> [Value | Prior]
- end,
- NewHdr = rabbit_misc:set_table_value(ExistingHdr, Name, array, Values),
- set_invalid(NewHdr, Header).
-
--spec header(header(), headers()) -> 'undefined' | any().
-
-header(_Header, undefined) ->
- undefined;
-header(_Header, []) ->
- undefined;
-header(Header, Headers) ->
- header(Header, Headers, undefined).
-
--spec header(header(), headers(), any()) -> 'undefined' | any().
-
-header(Header, Headers, Default) ->
- case lists:keysearch(Header, 1, Headers) of
- false -> Default;
- {value, Val} -> Val
- end.
-
--spec extract_headers(rabbit_types:content()) -> headers().
-
-extract_headers(Content) ->
- #content{properties = #'P_basic'{headers = Headers}} =
- rabbit_binary_parser:ensure_content_decoded(Content),
- Headers.
-
-extract_timestamp(Content) ->
- #content{properties = #'P_basic'{timestamp = Timestamp}} =
- rabbit_binary_parser:ensure_content_decoded(Content),
- Timestamp.
-
--spec map_headers
- (fun((headers()) -> headers()), rabbit_types:content()) ->
- rabbit_types:content().
-
-map_headers(F, Content) ->
- Content1 = rabbit_binary_parser:ensure_content_decoded(Content),
- #content{properties = #'P_basic'{headers = Headers} = Props} = Content1,
- Headers1 = F(Headers),
- rabbit_binary_generator:clear_encoded_content(
- Content1#content{properties = Props#'P_basic'{headers = Headers1}}).
-
-indexof(L, Element) -> indexof(L, Element, 1).
-
-indexof([], _Element, _N) -> 0;
-indexof([Element | _Rest], Element, N) -> N;
-indexof([_ | Rest], Element, N) -> indexof(Rest, Element, N + 1).
-
-is_message_persistent(#content{properties = #'P_basic'{
- delivery_mode = Mode}}) ->
- case Mode of
- 1 -> false;
- 2 -> true;
- undefined -> false;
- Other -> throw({error, {delivery_mode_unknown, Other}})
- end.
-
-%% Extract CC routes from headers
-
--spec header_routes(undefined | rabbit_framing:amqp_table()) -> [string()].
-
-header_routes(undefined) ->
- [];
-header_routes(HeadersTable) ->
- lists:append(
- [case rabbit_misc:table_lookup(HeadersTable, HeaderKey) of
- {array, Routes} -> [Route || {longstr, Route} <- Routes];
- undefined -> [];
- {Type, _Val} -> throw({error, {unacceptable_type_in_header,
- binary_to_list(HeaderKey), Type}})
- end || HeaderKey <- ?ROUTING_HEADERS]).
-
--spec parse_expiration
- (rabbit_framing:amqp_property_record()) ->
- rabbit_types:ok_or_error2('undefined' | non_neg_integer(), any()).
-
-parse_expiration(#'P_basic'{expiration = undefined}) ->
- {ok, undefined};
-parse_expiration(#'P_basic'{expiration = Expiration}) ->
- case string:to_integer(binary_to_list(Expiration)) of
- {error, no_integer} = E ->
- E;
- {N, ""} ->
- case rabbit_misc:check_expiry(N) of
- ok -> {ok, N};
- E = {error, _} -> E
- end;
- {_, S} ->
- {error, {leftover_string, S}}
- end.
-
-maybe_gc_large_msg(Content) ->
- rabbit_writer:maybe_gc_large_msg(Content).
-
-maybe_gc_large_msg(Content, undefined) ->
- rabbit_writer:msg_size(Content);
-maybe_gc_large_msg(Content, GCThreshold) ->
- rabbit_writer:maybe_gc_large_msg(Content, GCThreshold).
-
-msg_size(Content) ->
- rabbit_writer:msg_size(Content).
-
-add_header(Name, Type, Value, #basic_message{content = Content0} = Msg) ->
- Content = rabbit_basic:map_headers(
- fun(undefined) ->
- rabbit_misc:set_table_value([], Name, Type, Value);
- (Headers) ->
- rabbit_misc:set_table_value(Headers, Name, Type, Value)
- end, Content0),
- Msg#basic_message{content = Content}.
-
-peek_fmt_message(#basic_message{exchange_name = Ex,
- routing_keys = RKeys,
- content =
- #content{payload_fragments_rev = Payl0,
- properties = Props}}) ->
- Fields = [atom_to_binary(F, utf8) || F <- record_info(fields, 'P_basic')],
- T = lists:zip(Fields, tl(tuple_to_list(Props))),
- lists:foldl(
- fun ({<<"headers">>, Hdrs}, Acc) ->
- case Hdrs of
- [] ->
- Acc;
- _ ->
- Acc ++ [{header_key(H), V} || {H, _T, V} <- Hdrs]
- end;
- ({_, undefined}, Acc) ->
- Acc;
- (KV, Acc) ->
- [KV | Acc]
- end, [], [{<<"payload (max 64 bytes)">>,
- %% restric payload to 64 bytes
- binary_prefix_64(iolist_to_binary(lists:reverse(Payl0)), 64)},
- {<<"exchange">>, Ex#resource.name},
- {<<"routing_keys">>, RKeys} | T]).
-
-header_key(A) ->
- <<"header.", A/binary>>.
-
-binary_prefix_64(Bin, Len) ->
- binary:part(Bin, 0, min(byte_size(Bin), Len)).