diff options
| author | Jean-Sébastien Pédron <jean-sebastien.pedron@dumbbell.fr> | 2015-03-26 12:25:42 +0100 |
|---|---|---|
| committer | Jean-Sébastien Pédron <jean-sebastien.pedron@dumbbell.fr> | 2015-03-26 12:25:42 +0100 |
| commit | c4c4dc00acf3375eee5dfdb572d27e0a91f75061 (patch) | |
| tree | 6f65d2b31a259fdc51ff2e37b9b0b011e0597e99 /src | |
| parent | e0cd6f11440b48f284494c2752c7a3fb081beebf (diff) | |
| parent | 47ef081300486551ebc07954b21bb0e329a12058 (diff) | |
| download | rabbitmq-server-git-c4c4dc00acf3375eee5dfdb572d27e0a91f75061.tar.gz | |
Merge pull request #79 from rabbitmq/rabbitmq-server-78
Limit x-death growth to one entry per queue
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_basic.erl | 21 | ||||
| -rw-r--r-- | src/rabbit_dead_letter.erl | 40 |
2 files changed, 58 insertions, 3 deletions
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index cd2846c00b..1cb6bef4ab 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -21,7 +21,7 @@ -export([publish/4, publish/5, publish/1, message/3, message/4, properties/1, prepend_table_header/3, extract_headers/1, map_headers/2, delivery/4, header_routes/1, - parse_expiration/1]). + parse_expiration/1, header/2, header/3]). -export([build_content/2, from_content/1, msg_size/1, maybe_gc_large_msg/1]). %%---------------------------------------------------------------------------- @@ -32,6 +32,7 @@ (rabbit_framing:amqp_property_record() | [{atom(), any()}])). -type(publish_result() :: ({ok, [pid()]} | rabbit_types:error('not_found'))). +-type(header() :: any()). -type(headers() :: rabbit_framing:amqp_table() | 'undefined'). -type(exchange_input() :: (rabbit_types:exchange() | rabbit_exchange:name())). @@ -61,6 +62,11 @@ -spec(prepend_table_header/3 :: (binary(), rabbit_framing:amqp_table(), headers()) -> headers()). +-spec(header/2 :: + (header(), headers()) -> 'undefined' | any()). +-spec(header/3 :: + (header(), headers(), any()) -> 'undefined' | any()). + -spec(extract_headers/1 :: (rabbit_types:content()) -> headers()). -spec(map_headers/2 :: (fun((headers()) -> headers()), rabbit_types:content()) @@ -225,6 +231,19 @@ update_invalid(Name, Value, ExistingHdr, Header) -> NewHdr = rabbit_misc:set_table_value(ExistingHdr, Name, array, Values), set_invalid(NewHdr, Header). +header(_Header, undefined) -> + undefined; +header(_Header, []) -> + undefined; +header(Header, Headers) -> + header(Header, Headers, undefined). + +header(Header, Headers, Default) -> + case lists:keysearch(Header, 1, Headers) of + false -> Default; + {value, Val} -> Val + end. + extract_headers(Content) -> #content{properties = #'P_basic'{headers = Headers}} = rabbit_binary_parser:ensure_content_decoded(Content), diff --git a/src/rabbit_dead_letter.erl b/src/rabbit_dead_letter.erl index 728bc43117..010288024b 100644 --- a/src/rabbit_dead_letter.erl +++ b/src/rabbit_dead_letter.erl @@ -66,8 +66,7 @@ make_msg(Msg = #basic_message{content = Content, {<<"time">>, timestamp, TimeSec}, {<<"exchange">>, longstr, Exchange#resource.name}, {<<"routing-keys">>, array, RKs1}] ++ PerMsgTTL, - HeadersFun1(rabbit_basic:prepend_table_header(<<"x-death">>, - Info, Headers)) + HeadersFun1(update_x_death_header(Info, Headers)) end, Content1 = #content{properties = Props} = rabbit_basic:map_headers(HeadersFun2, Content), @@ -78,6 +77,43 @@ make_msg(Msg = #basic_message{content = Content, routing_keys = DeathRoutingKeys, content = Content2}. + +x_death_event_key(Info, Key, KeyType) -> + case lists:keysearch(Key, 1, Info) of + false -> undefined; + {value, {Key, KeyType, Val}} -> Val + end. + +update_x_death_header(Info, Headers) -> + Q = x_death_event_key(Info, <<"queue">>, longstr), + R = x_death_event_key(Info, <<"reason">>, longstr), + case rabbit_basic:header(<<"x-death">>, Headers) of + undefined -> + rabbit_basic:prepend_table_header(<<"x-death">>, + [{<<"count">>, long, 1} | Info], Headers); + {<<"x-death">>, array, Tables} -> + {Matches, Others} = lists:partition( + fun ({table, Info0}) -> + x_death_event_key(Info0, <<"queue">>, longstr) =:= Q + andalso x_death_event_key(Info0, <<"reason">>, longstr) =:= R + end, Tables), + Info1 = case Matches of + [] -> + [{<<"count">>, long, 1} | Info]; + [{table, M}] -> + case x_death_event_key(M, <<"count">>, long) of + undefined -> + [{<<"count">>, long, 1} | M]; + N -> + lists:keyreplace( + <<"count">>, 1, M, + {<<"count">>, long, N + 1}) + end + end, + rabbit_misc:set_table_value(Headers, <<"x-death">>, array, + [{table, rabbit_misc:sort_field_table(Info1)} | Others]) + end. + per_msg_ttl_header(#'P_basic'{expiration = undefined}) -> []; per_msg_ttl_header(#'P_basic'{expiration = Expiration}) -> |
