diff options
| author | Michael Klishin <mklishin@pivotal.io> | 2015-03-20 14:38:11 +0100 |
|---|---|---|
| committer | Michael Klishin <mklishin@pivotal.io> | 2015-03-20 14:38:11 +0100 |
| commit | 8124e3ce7616eeff440750dd6c55b6b938c7e234 (patch) | |
| tree | bc240d81a1cfa8729d906ad6ce1f0f830109a860 /src | |
| parent | 8e2ae9df61da376607624a871ee09fb134746560 (diff) | |
| download | rabbitmq-server-git-8124e3ce7616eeff440750dd6c55b6b938c7e234.tar.gz | |
Limit x-death growth to one entry per queue
Otherwise the list can grow forever in some cases.
Fixes #78.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_dead_letter.erl | 42 |
1 files changed, 39 insertions, 3 deletions
diff --git a/src/rabbit_dead_letter.erl b/src/rabbit_dead_letter.erl index 728bc43117..debb718c9a 100644 --- a/src/rabbit_dead_letter.erl +++ b/src/rabbit_dead_letter.erl @@ -34,6 +34,8 @@ %%---------------------------------------------------------------------------- +-define(X_DEATH_HEADER, <<"x-death">>). + publish(Msg, Reason, X, RK, QName) -> DLMsg = make_msg(Msg, Reason, X#exchange.name, RK, QName), Delivery = rabbit_basic:delivery(false, false, DLMsg, undefined), @@ -66,8 +68,7 @@ make_msg(Msg = #basic_message{content = Content, {<<"time">>, timestamp, TimeSec}, {<<"exchange">>, longstr, Exchange#resource.name}, {<<"routing-keys">>, array, RKs1}] ++ PerMsgTTL, - HeadersFun1(rabbit_basic:prepend_table_header(<<"x-death">>, - Info, Headers)) + HeadersFun1(update_x_death_header(Info, Headers)) end, Content1 = #content{properties = Props} = rabbit_basic:map_headers(HeadersFun2, Content), @@ -78,6 +79,41 @@ make_msg(Msg = #basic_message{content = Content, routing_keys = DeathRoutingKeys, content = Content2}. +x_death_header(undefined) -> + undefined; +x_death_header([]) -> + undefined; +x_death_header(Headers) -> + case lists:keysearch(?X_DEATH_HEADER, 1, Headers) of + false -> undefined; + {value, Val} -> Val + end. + +x_death_event_queue(Info) -> + case lists:keysearch(<<"queue">>, 1, Info) of + false -> undefined; + {value, {<<"queue">>, longstr, Val}} -> Val + end. + +x_death_not_for_queue({table, Info}, Queue) -> + x_death_event_queue(Info) =/= Queue. + +x_deaths_from_header({?X_DEATH_HEADER, array, Table}) -> + Table. + +update_x_death_header(Info, Headers) -> + Q = x_death_event_queue(Info), + case x_death_header(Headers) of + undefined -> + rabbit_basic:prepend_table_header(?X_DEATH_HEADER, + Info, Headers); + Array -> + XDeaths = rabbit_misc:sort_field_table([{table, Info} | + [XD || XD <- x_deaths_from_header(Array), + x_death_not_for_queue(XD, Q)]]), + rabbit_misc:set_table_value(Headers, ?X_DEATH_HEADER, array, XDeaths) + end. + per_msg_ttl_header(#'P_basic'{expiration = undefined}) -> []; per_msg_ttl_header(#'P_basic'{expiration = Expiration}) -> @@ -96,7 +132,7 @@ detect_cycles(_Reason, #basic_message{content = Content}, Queues) -> undefined -> NoCycles; _ -> - case rabbit_misc:table_lookup(Headers, <<"x-death">>) of + case rabbit_misc:table_lookup(Headers, ?X_DEATH_HEADER) of {array, Deaths} -> {Cycling, NotCycling} = lists:partition(fun (#resource{name = Queue}) -> |
