diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_dead_letter.erl | 77 |
1 files changed, 63 insertions, 14 deletions
diff --git a/src/rabbit_dead_letter.erl b/src/rabbit_dead_letter.erl index 010288024b..b046db998e 100644 --- a/src/rabbit_dead_letter.erl +++ b/src/rabbit_dead_letter.erl @@ -84,36 +84,85 @@ x_death_event_key(Info, Key, KeyType) -> {value, {Key, KeyType, Val}} -> Val end. +maybe_append_to_event_group(Info, _Queue, _Reason, []) -> + [Info]; +maybe_append_to_event_group(Info, Queue, Reason, Acc) -> + case lists:any(queue_and_reason_matcher(Queue, Reason), Acc) of + true -> Acc; + false -> [Info | Acc] + end. + +group_by_queue_and_reason(Tables) -> + Infos = [Info || {table, Info} <- Tables], + Grouped = + lists:foldl(fun (Info, Acc) -> + Q = x_death_event_key(Info, <<"queue">>, longstr), + R = x_death_event_key(Info, <<"reason">>, longstr), + Matcher = queue_and_reason_matcher(Q, R), + {Matches, _} = lists:partition(Matcher, Infos), + case Matches of + [X] -> + maybe_append_to_event_group( + ensure_xdeath_event_count(X), Q, R, Acc); + [X|_] = Xs when is_list(Xs) -> + maybe_append_to_event_group( + ensure_xdeath_event_count(X, length(Xs)), Q, R, Acc) + end + end, [], Infos), + [{table, Info} || Info <- Grouped]. + update_x_death_header(Info, Headers) -> Q = x_death_event_key(Info, <<"queue">>, longstr), R = x_death_event_key(Info, <<"reason">>, longstr), case rabbit_basic:header(<<"x-death">>, Headers) of undefined -> rabbit_basic:prepend_table_header(<<"x-death">>, - [{<<"count">>, long, 1} | Info], Headers); + [{<<"count">>, long, 1} | Info], Headers); {<<"x-death">>, array, Tables} -> + %% group existing x-death headers in case we have some from + %% before rabbitmq-server#78 + GroupedTables = group_by_queue_and_reason(Tables), {Matches, Others} = lists:partition( - fun ({table, Info0}) -> - x_death_event_key(Info0, <<"queue">>, longstr) =:= Q - andalso x_death_event_key(Info0, <<"reason">>, longstr) =:= R - end, Tables), + queue_and_reason_matcher(Q, R), GroupedTables), Info1 = case Matches of [] -> [{<<"count">>, long, 1} | Info]; [{table, M}] -> - case x_death_event_key(M, <<"count">>, long) of - undefined -> - [{<<"count">>, long, 1} | M]; - N -> - lists:keyreplace( - <<"count">>, 1, M, - {<<"count">>, long, N + 1}) - end + increment_xdeath_event_count(M) end, rabbit_misc:set_table_value(Headers, <<"x-death">>, array, - [{table, rabbit_misc:sort_field_table(Info1)} | Others]) + [{table, rabbit_misc:sort_field_table(Info1)} | Others]) + end. + +ensure_xdeath_event_count(Info) -> + ensure_xdeath_event_count(Info, 1). +ensure_xdeath_event_count(Info, InitialVal) when InitialVal >= 1 -> + case x_death_event_key(Info, <<"count">>, long) of + undefined -> + [{<<"count">>, long, InitialVal} | Info]; + _ -> + Info + end. + +increment_xdeath_event_count(Info) -> + case x_death_event_key(Info, <<"count">>, long) of + undefined -> + [{<<"count">>, long, 1} | Info]; + N -> + lists:keyreplace( + <<"count">>, 1, Info, + {<<"count">>, long, N + 1}) end. +queue_and_reason_matcher(Q, R) -> + fun({table, Info}) -> + x_death_event_key(Info, <<"queue">>, longstr) =:= Q + andalso x_death_event_key(Info, <<"reason">>, longstr) =:= R; + (Info) when is_list(Info) -> + x_death_event_key(Info, <<"queue">>, longstr) =:= Q + andalso x_death_event_key(Info, <<"reason">>, longstr) =:= R + end. + per_msg_ttl_header(#'P_basic'{expiration = undefined}) -> []; per_msg_ttl_header(#'P_basic'{expiration = Expiration}) -> |
