summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <mklishin@pivotal.io>2015-03-20 14:38:11 +0100
committerMichael Klishin <mklishin@pivotal.io>2015-03-20 14:38:11 +0100
commit8124e3ce7616eeff440750dd6c55b6b938c7e234 (patch)
treebc240d81a1cfa8729d906ad6ce1f0f830109a860 /src
parent8e2ae9df61da376607624a871ee09fb134746560 (diff)
downloadrabbitmq-server-git-8124e3ce7616eeff440750dd6c55b6b938c7e234.tar.gz
Limit x-death growth to one entry per queue
Otherwise the list can grow forever in some cases. Fixes #78.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_dead_letter.erl42
1 files changed, 39 insertions, 3 deletions
diff --git a/src/rabbit_dead_letter.erl b/src/rabbit_dead_letter.erl
index 728bc43117..debb718c9a 100644
--- a/src/rabbit_dead_letter.erl
+++ b/src/rabbit_dead_letter.erl
@@ -34,6 +34,8 @@
%%----------------------------------------------------------------------------
+-define(X_DEATH_HEADER, <<"x-death">>).
+
publish(Msg, Reason, X, RK, QName) ->
DLMsg = make_msg(Msg, Reason, X#exchange.name, RK, QName),
Delivery = rabbit_basic:delivery(false, false, DLMsg, undefined),
@@ -66,8 +68,7 @@ make_msg(Msg = #basic_message{content = Content,
{<<"time">>, timestamp, TimeSec},
{<<"exchange">>, longstr, Exchange#resource.name},
{<<"routing-keys">>, array, RKs1}] ++ PerMsgTTL,
- HeadersFun1(rabbit_basic:prepend_table_header(<<"x-death">>,
- Info, Headers))
+ HeadersFun1(update_x_death_header(Info, Headers))
end,
Content1 = #content{properties = Props} =
rabbit_basic:map_headers(HeadersFun2, Content),
@@ -78,6 +79,41 @@ make_msg(Msg = #basic_message{content = Content,
routing_keys = DeathRoutingKeys,
content = Content2}.
+x_death_header(undefined) ->
+ undefined;
+x_death_header([]) ->
+ undefined;
+x_death_header(Headers) ->
+ case lists:keysearch(?X_DEATH_HEADER, 1, Headers) of
+ false -> undefined;
+ {value, Val} -> Val
+ end.
+
+x_death_event_queue(Info) ->
+ case lists:keysearch(<<"queue">>, 1, Info) of
+ false -> undefined;
+ {value, {<<"queue">>, longstr, Val}} -> Val
+ end.
+
+x_death_not_for_queue({table, Info}, Queue) ->
+ x_death_event_queue(Info) =/= Queue.
+
+x_deaths_from_header({?X_DEATH_HEADER, array, Table}) ->
+ Table.
+
+update_x_death_header(Info, Headers) ->
+ Q = x_death_event_queue(Info),
+ case x_death_header(Headers) of
+ undefined ->
+ rabbit_basic:prepend_table_header(?X_DEATH_HEADER,
+ Info, Headers);
+ Array ->
+ XDeaths = rabbit_misc:sort_field_table([{table, Info} |
+ [XD || XD <- x_deaths_from_header(Array),
+ x_death_not_for_queue(XD, Q)]]),
+ rabbit_misc:set_table_value(Headers, ?X_DEATH_HEADER, array, XDeaths)
+ end.
+
per_msg_ttl_header(#'P_basic'{expiration = undefined}) ->
[];
per_msg_ttl_header(#'P_basic'{expiration = Expiration}) ->
@@ -96,7 +132,7 @@ detect_cycles(_Reason, #basic_message{content = Content}, Queues) ->
undefined ->
NoCycles;
_ ->
- case rabbit_misc:table_lookup(Headers, <<"x-death">>) of
+ case rabbit_misc:table_lookup(Headers, ?X_DEATH_HEADER) of
{array, Deaths} ->
{Cycling, NotCycling} =
lists:partition(fun (#resource{name = Queue}) ->