summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl41
-rw-r--r--src/rabbit_basic.erl32
2 files changed, 40 insertions, 33 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 951a5008bf..b92a0d3ccb 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -896,48 +896,27 @@ make_dead_letter_msg(DLX, Reason,
exchange_name = Exchange,
routing_keys = RoutingKeys},
State = #q{dlx_routing_key = DlxRoutingKey}) ->
- Content1 = #content{
- properties = Props = #'P_basic'{headers = Headers}} =
- rabbit_binary_parser:ensure_content_decoded(Content),
-
+ Headers = rabbit_basic:extract_headers(Content),
#resource{name = QName} = qname(State),
-
%% The first routing key is the one specified in the
%% basic.publish; all others are CC or BCC keys.
RoutingKeys1 = [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)],
- DeathTable = {table, [{<<"reason">>, longstr,
- list_to_binary(atom_to_list(Reason))},
- {<<"queue">>, longstr, QName},
- {<<"time">>, timestamp,
- rabbit_misc:now_ms() div 1000},
- {<<"exchange">>, longstr, Exchange#resource.name},
- {<<"routing-keys">>, array,
- [{longstr, Key} || Key <- RoutingKeys1]}]},
- Headers1 =
- case Headers of
- undefined ->
- [{<<"x-death">>, array, [DeathTable]}];
- _ ->
- case rabbit_misc:table_lookup(Headers, <<"x-death">>) of
- {array, Prior} ->
- rabbit_misc:set_table_value(
- Headers, <<"x-death">>, array,
- [DeathTable | Prior]);
- _ ->
- [{<<"x-death">>, array, [DeathTable]} | Headers]
- end
- end,
+ Info = [{<<"reason">>, longstr, list_to_binary(atom_to_list(Reason))},
+ {<<"queue">>, longstr, QName},
+ {<<"time">>, timestamp, rabbit_misc:now_ms() div 1000},
+ {<<"exchange">>, longstr, Exchange#resource.name},
+ {<<"routing-keys">>, array,
+ [{longstr, Key} || Key <- RoutingKeys1]}],
+ Headers1 = rabbit_basic:append_table_header(<<"x-death">>, Info, Headers),
{DeathRoutingKeys, Headers2} =
case DlxRoutingKey of
undefined -> {RoutingKeys, Headers1};
_ -> {[DlxRoutingKey],
lists:keydelete(<<"CC">>, 1, Headers1)}
end,
- Content2 =
- rabbit_binary_generator:clear_encoded_content(
- Content1#content{properties = Props#'P_basic'{headers = Headers2}}),
+ Content1 = rabbit_basic:replace_headers(Headers2, Content),
Msg#basic_message{exchange_name = DLX, id = rabbit_guid:gen(),
- routing_keys = DeathRoutingKeys, content = Content2}.
+ routing_keys = DeathRoutingKeys, content = Content1}.
now_micros() -> timer:now_diff(now(), {0,0,0}).
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 8fbb39832a..a7b13511fa 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -19,8 +19,8 @@
-include("rabbit_framing.hrl").
-export([publish/4, publish/6, publish/1,
- message/3, message/4, properties/1, delivery/4,
- header_routes/1]).
+ message/3, message/4, properties/1, append_table_header/3,
+ extract_headers/1, replace_headers/2, delivery/4, header_routes/1]).
-export([build_content/2, from_content/1]).
%%----------------------------------------------------------------------------
@@ -32,6 +32,7 @@
-type(publish_result() ::
({ok, rabbit_amqqueue:routing_result(), [pid()]}
| rabbit_types:error('not_found'))).
+-type(headers() :: rabbit_framing:amqp_table() | 'undefined').
-type(exchange_input() :: (rabbit_types:exchange() | rabbit_exchange:name())).
-type(body_input() :: (binary() | [binary()])).
@@ -56,6 +57,15 @@
rabbit_types:ok_or_error2(rabbit_types:message(), any())).
-spec(properties/1 ::
(properties_input()) -> rabbit_framing:amqp_property_record()).
+
+-spec(append_table_header/3 ::
+ (binary(), rabbit_framing:amqp_table(), headers()) -> headers()).
+
+-spec(extract_headers/1 :: (rabbit_types:content()) -> headers()).
+
+-spec(replace_headers/2 :: (headers(), rabbit_types:content())
+ -> rabbit_types:content()).
+
-spec(header_routes/1 ::
(undefined | rabbit_framing:amqp_table()) ->
[string()] | rabbit_types:error(any())).
@@ -170,6 +180,24 @@ properties(P) when is_list(P) ->
end
end, #'P_basic'{}, P).
+append_table_header(Name, Info, undefined) ->
+ append_table_header(Name, Info, []);
+append_table_header(Name, Info, Headers) ->
+ Prior = case rabbit_misc:table_lookup(Headers, Name) of
+ undefined -> [];
+ {array, Existing} -> Existing
+ end,
+ rabbit_misc:set_table_value(Headers, Name, array, [{table, Info} | Prior]).
+
+extract_headers(Content) ->
+ #content{properties = #'P_basic'{headers = Headers}} =
+ rabbit_binary_parser:ensure_content_decoded(Content),
+ Headers.
+
+replace_headers(Headers, Content = #content{properties = Props}) ->
+ rabbit_binary_generator:clear_encoded_content(
+ Content#content{properties = Props#'P_basic'{headers = Headers}}).
+
indexof(L, Element) -> indexof(L, Element, 1).
indexof([], _Element, _N) -> 0;