diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 27 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 10 |
2 files changed, 28 insertions, 9 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index e0472b6442..019280f17a 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -54,7 +54,8 @@ unconfirmed, blocked_op, queue_monitors, - dlx + dlx, + dlx_routing_key }). -record(consumer, {tag, ack_required}). @@ -135,6 +136,7 @@ init(Q) -> expiry_timer_ref = undefined, ttl = undefined, dlx = undefined, + dlx_routing_key = undefined, publish_seqno = 1, unconfirmed = gb_trees:empty(), blocked_op = undefined, @@ -233,7 +235,9 @@ process_args(State = #q{q = #amqqueue{arguments = Arguments}}) -> end end, State, [{<<"x-expires">>, fun init_expires/2}, {<<"x-message-ttl">>, fun init_ttl/2}, - {<<"x-dead-letter-exchange">>, fun init_dlx/2}]). + {<<"x-dead-letter-exchange">>, fun init_dlx/2}, + {<<"x-dead-letter-routing-key">>, + fun init_dlx_routing_key/2}]). init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}). @@ -243,6 +247,9 @@ init_dlx(DLX, State = #q{q = #amqqueue{name = #resource{ virtual_host = VHostPath}}}) -> State#q{dlx = rabbit_misc:r(VHostPath, exchange, DLX)}. +init_dlx_routing_key(RoutingKey, State) -> + State#q{dlx_routing_key = RoutingKey}. + terminate_shutdown(Fun, State) -> State1 = #q{backing_queue_state = BQS} = stop_sync_timer(stop_rate_timer(State)), @@ -830,7 +837,7 @@ make_dead_letter_msg(DLX, Reason, Msg = #basic_message{content = Content, exchange_name = Exchange, routing_keys = RoutingKeys}, - State) -> + State = #q{dlx_routing_key = DlxRoutingKey}) -> Content1 = #content{ properties = Props = #'P_basic'{headers = Headers}} = rabbit_binary_parser:ensure_content_decoded(Content), @@ -846,9 +853,8 @@ make_dead_letter_msg(DLX, Reason, {<<"time">>, longstr, list_to_binary(httpd_util:rfc1123_date())}, {<<"exchange">>, longstr, Exchange#resource.name}, - {<<"routing-key">>, array, + {<<"routing-keys">>, array, [{longstr, Key} || Key <- RoutingKeys1]}]}, - Headers1 = case Headers of undefined -> @@ -862,12 +868,17 @@ make_dead_letter_msg(DLX, Reason, Headers, <<"x-death">>, array, [DeathTable | Prior]) end end, + {DeathRoutingKeys, Headers2} = + case DlxRoutingKey of + undefined -> {RoutingKeys, Headers1}; + _ -> {[DlxRoutingKey], + rabbit_misc:remove_table_value(Headers1, <<"CC">>)} + end, Content2 = rabbit_binary_generator:clear_encoded_content( - Content1#content{properties = Props#'P_basic'{headers = Headers1}}), - + Content1#content{properties = Props#'P_basic'{headers = Headers2}}), Msg#basic_message{exchange_name = DLX, id = rabbit_guid:guid(), - content = Content2}. + routing_keys = DeathRoutingKeys, content = Content2}. now_micros() -> timer:now_diff(now(), {0,0,0}). diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index d1006c0e0a..a9a9a4c418 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -23,7 +23,7 @@ protocol_error/3, protocol_error/4, protocol_error/1]). -export([not_found/1, assert_args_equivalence/4]). -export([dirty_read/1]). --export([table_lookup/2, set_table_value/4]). +-export([table_lookup/2, set_table_value/4, remove_table_value/2]). -export([r/3, r/2, r_arg/4, rs/1]). -export([enable_cover/0, report_cover/0]). -export([enable_cover/1, report_cover/1]). @@ -109,6 +109,8 @@ (rabbit_framing:amqp_table(), binary(), rabbit_framing:amqp_field_type(), rabbit_framing:amqp_value()) -> rabbit_framing:amqp_table()). +-spec(remove_table_value/2 :: + (rabbit_framing:amqp_table(), binary()) -> rabbit_framing:amqp_table()). -spec(r/2 :: (rabbit_types:vhost(), K) -> rabbit_types:r3(rabbit_types:vhost(), K, '_') @@ -298,6 +300,12 @@ set_table_value(Table, Key, Type, Value) -> sort_field_table( lists:keystore(Key, 1, Table, {Key, Type, Value})). +remove_table_value(Table, Key) -> + case lists:keytake(Key, 1, Table) of + false -> Table; + {value, _, Table2} -> Table2 + end. + r(#resource{virtual_host = VHostPath}, Kind, Name) when is_binary(Name) -> #resource{virtual_host = VHostPath, kind = Kind, name = Name}; |
