summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue_process.erl27
-rw-r--r--src/rabbit_misc.erl10
2 files changed, 28 insertions, 9 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index e0472b6442..019280f17a 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -54,7 +54,8 @@
unconfirmed,
blocked_op,
queue_monitors,
- dlx
+ dlx,
+ dlx_routing_key
}).
-record(consumer, {tag, ack_required}).
@@ -135,6 +136,7 @@ init(Q) ->
expiry_timer_ref = undefined,
ttl = undefined,
dlx = undefined,
+ dlx_routing_key = undefined,
publish_seqno = 1,
unconfirmed = gb_trees:empty(),
blocked_op = undefined,
@@ -233,7 +235,9 @@ process_args(State = #q{q = #amqqueue{arguments = Arguments}}) ->
end
end, State, [{<<"x-expires">>, fun init_expires/2},
{<<"x-message-ttl">>, fun init_ttl/2},
- {<<"x-dead-letter-exchange">>, fun init_dlx/2}]).
+ {<<"x-dead-letter-exchange">>, fun init_dlx/2},
+ {<<"x-dead-letter-routing-key">>,
+ fun init_dlx_routing_key/2}]).
init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}).
@@ -243,6 +247,9 @@ init_dlx(DLX, State = #q{q = #amqqueue{name = #resource{
virtual_host = VHostPath}}}) ->
State#q{dlx = rabbit_misc:r(VHostPath, exchange, DLX)}.
+init_dlx_routing_key(RoutingKey, State) ->
+ State#q{dlx_routing_key = RoutingKey}.
+
terminate_shutdown(Fun, State) ->
State1 = #q{backing_queue_state = BQS} =
stop_sync_timer(stop_rate_timer(State)),
@@ -830,7 +837,7 @@ make_dead_letter_msg(DLX, Reason,
Msg = #basic_message{content = Content,
exchange_name = Exchange,
routing_keys = RoutingKeys},
- State) ->
+ State = #q{dlx_routing_key = DlxRoutingKey}) ->
Content1 = #content{
properties = Props = #'P_basic'{headers = Headers}} =
rabbit_binary_parser:ensure_content_decoded(Content),
@@ -846,9 +853,8 @@ make_dead_letter_msg(DLX, Reason,
{<<"time">>, longstr,
list_to_binary(httpd_util:rfc1123_date())},
{<<"exchange">>, longstr, Exchange#resource.name},
- {<<"routing-key">>, array,
+ {<<"routing-keys">>, array,
[{longstr, Key} || Key <- RoutingKeys1]}]},
-
Headers1 =
case Headers of
undefined ->
@@ -862,12 +868,17 @@ make_dead_letter_msg(DLX, Reason,
Headers, <<"x-death">>, array, [DeathTable | Prior])
end
end,
+ {DeathRoutingKeys, Headers2} =
+ case DlxRoutingKey of
+ undefined -> {RoutingKeys, Headers1};
+ _ -> {[DlxRoutingKey],
+ rabbit_misc:remove_table_value(Headers1, <<"CC">>)}
+ end,
Content2 =
rabbit_binary_generator:clear_encoded_content(
- Content1#content{properties = Props#'P_basic'{headers = Headers1}}),
-
+ Content1#content{properties = Props#'P_basic'{headers = Headers2}}),
Msg#basic_message{exchange_name = DLX, id = rabbit_guid:guid(),
- content = Content2}.
+ routing_keys = DeathRoutingKeys, content = Content2}.
now_micros() -> timer:now_diff(now(), {0,0,0}).
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index d1006c0e0a..a9a9a4c418 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -23,7 +23,7 @@
protocol_error/3, protocol_error/4, protocol_error/1]).
-export([not_found/1, assert_args_equivalence/4]).
-export([dirty_read/1]).
--export([table_lookup/2, set_table_value/4]).
+-export([table_lookup/2, set_table_value/4, remove_table_value/2]).
-export([r/3, r/2, r_arg/4, rs/1]).
-export([enable_cover/0, report_cover/0]).
-export([enable_cover/1, report_cover/1]).
@@ -109,6 +109,8 @@
(rabbit_framing:amqp_table(), binary(),
rabbit_framing:amqp_field_type(), rabbit_framing:amqp_value())
-> rabbit_framing:amqp_table()).
+-spec(remove_table_value/2 ::
+ (rabbit_framing:amqp_table(), binary()) -> rabbit_framing:amqp_table()).
-spec(r/2 :: (rabbit_types:vhost(), K)
-> rabbit_types:r3(rabbit_types:vhost(), K, '_')
@@ -298,6 +300,12 @@ set_table_value(Table, Key, Type, Value) ->
sort_field_table(
lists:keystore(Key, 1, Table, {Key, Type, Value})).
+remove_table_value(Table, Key) ->
+ case lists:keytake(Key, 1, Table) of
+ false -> Table;
+ {value, _, Table2} -> Table2
+ end.
+
r(#resource{virtual_host = VHostPath}, Kind, Name)
when is_binary(Name) ->
#resource{virtual_host = VHostPath, kind = Kind, name = Name};