summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <mklishin@pivotal.io>2015-05-10 00:33:50 +0300
committerMichael Klishin <mklishin@pivotal.io>2015-05-10 00:33:50 +0300
commitdc9cda7f07ee4e29ba40f914900a6ff34579b3a7 (patch)
tree136773b28f00cd4d19a3d9753a45b95850129f4d /src
parent18fcb1a9c793ac2530fa3b73b63092499120d1f0 (diff)
downloadrabbitmq-server-git-dc9cda7f07ee4e29ba40f914900a6ff34579b3a7.tar.gz
Support x-death event proplists from before #78
We group x-death header values before processing them to make sure there's only one per {queue, reason}.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_dead_letter.erl77
1 files changed, 63 insertions, 14 deletions
diff --git a/src/rabbit_dead_letter.erl b/src/rabbit_dead_letter.erl
index 010288024b..b046db998e 100644
--- a/src/rabbit_dead_letter.erl
+++ b/src/rabbit_dead_letter.erl
@@ -84,36 +84,85 @@ x_death_event_key(Info, Key, KeyType) ->
{value, {Key, KeyType, Val}} -> Val
end.
+maybe_append_to_event_group(Info, _Queue, _Reason, []) ->
+ [Info];
+maybe_append_to_event_group(Info, Queue, Reason, Acc) ->
+ case lists:any(queue_and_reason_matcher(Queue, Reason), Acc) of
+ true -> Acc;
+ false -> [Info | Acc]
+ end.
+
+group_by_queue_and_reason(Tables) ->
+ Infos = [Info || {table, Info} <- Tables],
+ Grouped =
+ lists:foldl(fun (Info, Acc) ->
+ Q = x_death_event_key(Info, <<"queue">>, longstr),
+ R = x_death_event_key(Info, <<"reason">>, longstr),
+ Matcher = queue_and_reason_matcher(Q, R),
+ {Matches, _} = lists:partition(Matcher, Infos),
+ case Matches of
+ [X] ->
+ maybe_append_to_event_group(
+ ensure_xdeath_event_count(X), Q, R, Acc);
+ [X|_] = Xs when is_list(Xs) ->
+ maybe_append_to_event_group(
+ ensure_xdeath_event_count(X, length(Xs)), Q, R, Acc)
+ end
+ end, [], Infos),
+ [{table, Info} || Info <- Grouped].
+
update_x_death_header(Info, Headers) ->
Q = x_death_event_key(Info, <<"queue">>, longstr),
R = x_death_event_key(Info, <<"reason">>, longstr),
case rabbit_basic:header(<<"x-death">>, Headers) of
undefined ->
rabbit_basic:prepend_table_header(<<"x-death">>,
- [{<<"count">>, long, 1} | Info], Headers);
+ [{<<"count">>, long, 1} | Info], Headers);
{<<"x-death">>, array, Tables} ->
+ %% group existing x-death headers in case we have some from
+ %% before rabbitmq-server#78
+ GroupedTables = group_by_queue_and_reason(Tables),
{Matches, Others} = lists:partition(
- fun ({table, Info0}) ->
- x_death_event_key(Info0, <<"queue">>, longstr) =:= Q
- andalso x_death_event_key(Info0, <<"reason">>, longstr) =:= R
- end, Tables),
+ queue_and_reason_matcher(Q, R), GroupedTables),
Info1 = case Matches of
[] ->
[{<<"count">>, long, 1} | Info];
[{table, M}] ->
- case x_death_event_key(M, <<"count">>, long) of
- undefined ->
- [{<<"count">>, long, 1} | M];
- N ->
- lists:keyreplace(
- <<"count">>, 1, M,
- {<<"count">>, long, N + 1})
- end
+ increment_xdeath_event_count(M)
end,
rabbit_misc:set_table_value(Headers, <<"x-death">>, array,
- [{table, rabbit_misc:sort_field_table(Info1)} | Others])
+ [{table, rabbit_misc:sort_field_table(Info1)} | Others])
+ end.
+
+ensure_xdeath_event_count(Info) ->
+ ensure_xdeath_event_count(Info, 1).
+ensure_xdeath_event_count(Info, InitialVal) when InitialVal >= 1 ->
+ case x_death_event_key(Info, <<"count">>, long) of
+ undefined ->
+ [{<<"count">>, long, InitialVal} | Info];
+ _ ->
+ Info
+ end.
+
+increment_xdeath_event_count(Info) ->
+ case x_death_event_key(Info, <<"count">>, long) of
+ undefined ->
+ [{<<"count">>, long, 1} | Info];
+ N ->
+ lists:keyreplace(
+ <<"count">>, 1, Info,
+ {<<"count">>, long, N + 1})
end.
+queue_and_reason_matcher(Q, R) ->
+ fun({table, Info}) ->
+ x_death_event_key(Info, <<"queue">>, longstr) =:= Q
+ andalso x_death_event_key(Info, <<"reason">>, longstr) =:= R;
+ (Info) when is_list(Info) ->
+ x_death_event_key(Info, <<"queue">>, longstr) =:= Q
+ andalso x_death_event_key(Info, <<"reason">>, longstr) =:= R
+ end.
+
per_msg_ttl_header(#'P_basic'{expiration = undefined}) ->
[];
per_msg_ttl_header(#'P_basic'{expiration = Expiration}) ->