summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorJean-Sébastien Pédron <jean-sebastien.pedron@dumbbell.fr>2015-03-26 12:25:42 +0100
committerJean-Sébastien Pédron <jean-sebastien.pedron@dumbbell.fr>2015-03-26 12:25:42 +0100
commitc4c4dc00acf3375eee5dfdb572d27e0a91f75061 (patch)
tree6f65d2b31a259fdc51ff2e37b9b0b011e0597e99 /src
parente0cd6f11440b48f284494c2752c7a3fb081beebf (diff)
parent47ef081300486551ebc07954b21bb0e329a12058 (diff)
downloadrabbitmq-server-git-c4c4dc00acf3375eee5dfdb572d27e0a91f75061.tar.gz
Merge pull request #79 from rabbitmq/rabbitmq-server-78
Limit x-death growth to one entry per queue
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_basic.erl21
-rw-r--r--src/rabbit_dead_letter.erl40
2 files changed, 58 insertions, 3 deletions
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index cd2846c00b..1cb6bef4ab 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -21,7 +21,7 @@
-export([publish/4, publish/5, publish/1,
message/3, message/4, properties/1, prepend_table_header/3,
extract_headers/1, map_headers/2, delivery/4, header_routes/1,
- parse_expiration/1]).
+ parse_expiration/1, header/2, header/3]).
-export([build_content/2, from_content/1, msg_size/1, maybe_gc_large_msg/1]).
%%----------------------------------------------------------------------------
@@ -32,6 +32,7 @@
(rabbit_framing:amqp_property_record() | [{atom(), any()}])).
-type(publish_result() ::
({ok, [pid()]} | rabbit_types:error('not_found'))).
+-type(header() :: any()).
-type(headers() :: rabbit_framing:amqp_table() | 'undefined').
-type(exchange_input() :: (rabbit_types:exchange() | rabbit_exchange:name())).
@@ -61,6 +62,11 @@
-spec(prepend_table_header/3 ::
(binary(), rabbit_framing:amqp_table(), headers()) -> headers()).
+-spec(header/2 ::
+ (header(), headers()) -> 'undefined' | any()).
+-spec(header/3 ::
+ (header(), headers(), any()) -> 'undefined' | any()).
+
-spec(extract_headers/1 :: (rabbit_types:content()) -> headers()).
-spec(map_headers/2 :: (fun((headers()) -> headers()), rabbit_types:content())
@@ -225,6 +231,19 @@ update_invalid(Name, Value, ExistingHdr, Header) ->
NewHdr = rabbit_misc:set_table_value(ExistingHdr, Name, array, Values),
set_invalid(NewHdr, Header).
+header(_Header, undefined) ->
+ undefined;
+header(_Header, []) ->
+ undefined;
+header(Header, Headers) ->
+ header(Header, Headers, undefined).
+
+header(Header, Headers, Default) ->
+ case lists:keysearch(Header, 1, Headers) of
+ false -> Default;
+ {value, Val} -> Val
+ end.
+
extract_headers(Content) ->
#content{properties = #'P_basic'{headers = Headers}} =
rabbit_binary_parser:ensure_content_decoded(Content),
diff --git a/src/rabbit_dead_letter.erl b/src/rabbit_dead_letter.erl
index 728bc43117..010288024b 100644
--- a/src/rabbit_dead_letter.erl
+++ b/src/rabbit_dead_letter.erl
@@ -66,8 +66,7 @@ make_msg(Msg = #basic_message{content = Content,
{<<"time">>, timestamp, TimeSec},
{<<"exchange">>, longstr, Exchange#resource.name},
{<<"routing-keys">>, array, RKs1}] ++ PerMsgTTL,
- HeadersFun1(rabbit_basic:prepend_table_header(<<"x-death">>,
- Info, Headers))
+ HeadersFun1(update_x_death_header(Info, Headers))
end,
Content1 = #content{properties = Props} =
rabbit_basic:map_headers(HeadersFun2, Content),
@@ -78,6 +77,43 @@ make_msg(Msg = #basic_message{content = Content,
routing_keys = DeathRoutingKeys,
content = Content2}.
+
+x_death_event_key(Info, Key, KeyType) ->
+ case lists:keysearch(Key, 1, Info) of
+ false -> undefined;
+ {value, {Key, KeyType, Val}} -> Val
+ end.
+
+update_x_death_header(Info, Headers) ->
+ Q = x_death_event_key(Info, <<"queue">>, longstr),
+ R = x_death_event_key(Info, <<"reason">>, longstr),
+ case rabbit_basic:header(<<"x-death">>, Headers) of
+ undefined ->
+ rabbit_basic:prepend_table_header(<<"x-death">>,
+ [{<<"count">>, long, 1} | Info], Headers);
+ {<<"x-death">>, array, Tables} ->
+ {Matches, Others} = lists:partition(
+ fun ({table, Info0}) ->
+ x_death_event_key(Info0, <<"queue">>, longstr) =:= Q
+ andalso x_death_event_key(Info0, <<"reason">>, longstr) =:= R
+ end, Tables),
+ Info1 = case Matches of
+ [] ->
+ [{<<"count">>, long, 1} | Info];
+ [{table, M}] ->
+ case x_death_event_key(M, <<"count">>, long) of
+ undefined ->
+ [{<<"count">>, long, 1} | M];
+ N ->
+ lists:keyreplace(
+ <<"count">>, 1, M,
+ {<<"count">>, long, N + 1})
+ end
+ end,
+ rabbit_misc:set_table_value(Headers, <<"x-death">>, array,
+ [{table, rabbit_misc:sort_field_table(Info1)} | Others])
+ end.
+
per_msg_ttl_header(#'P_basic'{expiration = undefined}) ->
[];
per_msg_ttl_header(#'P_basic'{expiration = Expiration}) ->