summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlvaro Videla <videlalvaro@gmail.com>2015-05-11 17:52:08 +0200
committerAlvaro Videla <videlalvaro@gmail.com>2015-05-11 17:52:08 +0200
commitac896b70ca69c3025c0e847e4afa8d3ed92b1522 (patch)
tree98179693af264c48daa404a77f3dffa1a4ddd3a0
parent6fc2e9ea182b0c55d01b2271d9ba9a21e95a4a26 (diff)
parent46cb4eaf14fcf7eff67f4bee0362e2b84ea620f7 (diff)
downloadrabbitmq-server-git-ac896b70ca69c3025c0e847e4afa8d3ed92b1522.tar.gz
Merge pull request #153 from rabbitmq/rabbitmq-server-152
Support x-death event values from before #78
-rw-r--r--src/rabbit_dead_letter.erl89
1 files changed, 74 insertions, 15 deletions
diff --git a/src/rabbit_dead_letter.erl b/src/rabbit_dead_letter.erl
index 010288024b..dbf38e41e7 100644
--- a/src/rabbit_dead_letter.erl
+++ b/src/rabbit_dead_letter.erl
@@ -84,36 +84,95 @@ x_death_event_key(Info, Key, KeyType) ->
{value, {Key, KeyType, Val}} -> Val
end.
+maybe_append_to_event_group(Table, _Key, _SeenKeys, []) ->
+ [Table];
+maybe_append_to_event_group(Table, {_Queue, _Reason} = Key, SeenKeys, Acc) ->
+ case sets:is_element(Key, SeenKeys) of
+ true -> Acc;
+ false -> [Table | Acc]
+ end.
+
+group_by_queue_and_reason([]) ->
+ [];
+group_by_queue_and_reason([Table]) ->
+ [Table];
+group_by_queue_and_reason(Tables) ->
+ {_, Grouped} =
+ lists:foldl(
+ fun ({table, Info}, {SeenKeys, Acc}) ->
+ Q = x_death_event_key(Info, <<"queue">>, longstr),
+ R = x_death_event_key(Info, <<"reason">>, longstr),
+ Matcher = queue_and_reason_matcher(Q, R),
+ {Matches, _} = lists:partition(Matcher, Tables),
+ {Augmented, N} = case Matches of
+ [X] -> {X, 1};
+ [X|_] = Xs -> {X, length(Xs)}
+ end,
+ Key = {Q, R},
+ Acc1 = maybe_append_to_event_group(
+ ensure_xdeath_event_count(Augmented, N),
+ Key, SeenKeys, Acc),
+ {sets:add_element(Key, SeenKeys), Acc1}
+ end, {sets:new(), []}, Tables),
+ Grouped.
+
update_x_death_header(Info, Headers) ->
Q = x_death_event_key(Info, <<"queue">>, longstr),
R = x_death_event_key(Info, <<"reason">>, longstr),
case rabbit_basic:header(<<"x-death">>, Headers) of
undefined ->
- rabbit_basic:prepend_table_header(<<"x-death">>,
+ rabbit_basic:prepend_table_header(
+ <<"x-death">>,
[{<<"count">>, long, 1} | Info], Headers);
{<<"x-death">>, array, Tables} ->
+ %% group existing x-death headers in case we have some from
+ %% before rabbitmq-server#78
+ GroupedTables = group_by_queue_and_reason(Tables),
{Matches, Others} = lists:partition(
- fun ({table, Info0}) ->
- x_death_event_key(Info0, <<"queue">>, longstr) =:= Q
- andalso x_death_event_key(Info0, <<"reason">>, longstr) =:= R
- end, Tables),
+ queue_and_reason_matcher(Q, R),
+ GroupedTables),
Info1 = case Matches of
[] ->
[{<<"count">>, long, 1} | Info];
[{table, M}] ->
- case x_death_event_key(M, <<"count">>, long) of
- undefined ->
- [{<<"count">>, long, 1} | M];
- N ->
- lists:keyreplace(
- <<"count">>, 1, M,
- {<<"count">>, long, N + 1})
- end
+ increment_xdeath_event_count(M)
end,
- rabbit_misc:set_table_value(Headers, <<"x-death">>, array,
+ rabbit_misc:set_table_value(
+ Headers, <<"x-death">>, array,
[{table, rabbit_misc:sort_field_table(Info1)} | Others])
end.
+ensure_xdeath_event_count({table, Info}, InitialVal) when InitialVal >= 1 ->
+ {table, ensure_xdeath_event_count(Info, InitialVal)};
+ensure_xdeath_event_count(Info, InitialVal) when InitialVal >= 1 ->
+ case x_death_event_key(Info, <<"count">>, long) of
+ undefined ->
+ [{<<"count">>, long, InitialVal} | Info];
+ _ ->
+ Info
+ end.
+
+increment_xdeath_event_count(Info) ->
+ case x_death_event_key(Info, <<"count">>, long) of
+ undefined ->
+ [{<<"count">>, long, 1} | Info];
+ N ->
+ lists:keyreplace(
+ <<"count">>, 1, Info,
+ {<<"count">>, long, N + 1})
+ end.
+
+queue_and_reason_matcher(Q, R) ->
+ F = fun(Info) ->
+ x_death_event_key(Info, <<"queue">>, longstr) =:= Q
+ andalso x_death_event_key(Info, <<"reason">>, longstr) =:= R
+ end,
+ fun({table, Info}) ->
+ F(Info);
+ (Info) when is_list(Info) ->
+ F(Info)
+ end.
+
per_msg_ttl_header(#'P_basic'{expiration = undefined}) ->
[];
per_msg_ttl_header(#'P_basic'{expiration = Expiration}) ->
@@ -178,7 +237,7 @@ log_cycle_once(Queues) ->
true -> ok;
undefined -> rabbit_log:warning(
"Message dropped. Dead-letter queues cycle detected" ++
- ": ~p~nThis cycle will NOT be reported again.~n",
+ ": ~p~nThis cycle will NOT be reported again.~n",
[Queues]),
put(Key, true)
end.