diff options
| author | Michael Klishin <mklishin@pivotal.io> | 2015-05-10 00:33:50 +0300 |
|---|---|---|
| committer | Michael Klishin <mklishin@pivotal.io> | 2015-05-10 00:33:50 +0300 |
| commit | dc9cda7f07ee4e29ba40f914900a6ff34579b3a7 (patch) | |
| tree | 136773b28f00cd4d19a3d9753a45b95850129f4d /src | |
| parent | 18fcb1a9c793ac2530fa3b73b63092499120d1f0 (diff) | |
| download | rabbitmq-server-git-dc9cda7f07ee4e29ba40f914900a6ff34579b3a7.tar.gz | |
Support x-death event proplists from before #78
We group x-death header values before processing them
to make sure there's only one per {queue, reason}.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_dead_letter.erl | 77 |
1 files changed, 63 insertions, 14 deletions
diff --git a/src/rabbit_dead_letter.erl b/src/rabbit_dead_letter.erl index 010288024b..b046db998e 100644 --- a/src/rabbit_dead_letter.erl +++ b/src/rabbit_dead_letter.erl @@ -84,36 +84,85 @@ x_death_event_key(Info, Key, KeyType) -> {value, {Key, KeyType, Val}} -> Val end. +maybe_append_to_event_group(Info, _Queue, _Reason, []) -> + [Info]; +maybe_append_to_event_group(Info, Queue, Reason, Acc) -> + case lists:any(queue_and_reason_matcher(Queue, Reason), Acc) of + true -> Acc; + false -> [Info | Acc] + end. + +group_by_queue_and_reason(Tables) -> + Infos = [Info || {table, Info} <- Tables], + Grouped = + lists:foldl(fun (Info, Acc) -> + Q = x_death_event_key(Info, <<"queue">>, longstr), + R = x_death_event_key(Info, <<"reason">>, longstr), + Matcher = queue_and_reason_matcher(Q, R), + {Matches, _} = lists:partition(Matcher, Infos), + case Matches of + [X] -> + maybe_append_to_event_group( + ensure_xdeath_event_count(X), Q, R, Acc); + [X|_] = Xs when is_list(Xs) -> + maybe_append_to_event_group( + ensure_xdeath_event_count(X, length(Xs)), Q, R, Acc) + end + end, [], Infos), + [{table, Info} || Info <- Grouped]. + update_x_death_header(Info, Headers) -> Q = x_death_event_key(Info, <<"queue">>, longstr), R = x_death_event_key(Info, <<"reason">>, longstr), case rabbit_basic:header(<<"x-death">>, Headers) of undefined -> rabbit_basic:prepend_table_header(<<"x-death">>, - [{<<"count">>, long, 1} | Info], Headers); + [{<<"count">>, long, 1} | Info], Headers); {<<"x-death">>, array, Tables} -> + %% group existing x-death headers in case we have some from + %% before rabbitmq-server#78 + GroupedTables = group_by_queue_and_reason(Tables), {Matches, Others} = lists:partition( - fun ({table, Info0}) -> - x_death_event_key(Info0, <<"queue">>, longstr) =:= Q - andalso x_death_event_key(Info0, <<"reason">>, longstr) =:= R - end, Tables), + queue_and_reason_matcher(Q, R), GroupedTables), Info1 = case Matches of [] -> [{<<"count">>, long, 1} | Info]; [{table, M}] -> - case x_death_event_key(M, <<"count">>, long) of - undefined -> - [{<<"count">>, long, 1} | M]; - N -> - lists:keyreplace( - <<"count">>, 1, M, - {<<"count">>, long, N + 1}) - end + increment_xdeath_event_count(M) end, rabbit_misc:set_table_value(Headers, <<"x-death">>, array, - [{table, rabbit_misc:sort_field_table(Info1)} | Others]) + [{table, rabbit_misc:sort_field_table(Info1)} | Others]) + end. + +ensure_xdeath_event_count(Info) -> + ensure_xdeath_event_count(Info, 1). +ensure_xdeath_event_count(Info, InitialVal) when InitialVal >= 1 -> + case x_death_event_key(Info, <<"count">>, long) of + undefined -> + [{<<"count">>, long, InitialVal} | Info]; + _ -> + Info + end. + +increment_xdeath_event_count(Info) -> + case x_death_event_key(Info, <<"count">>, long) of + undefined -> + [{<<"count">>, long, 1} | Info]; + N -> + lists:keyreplace( + <<"count">>, 1, Info, + {<<"count">>, long, N + 1}) end. +queue_and_reason_matcher(Q, R) -> + fun({table, Info}) -> + x_death_event_key(Info, <<"queue">>, longstr) =:= Q + andalso x_death_event_key(Info, <<"reason">>, longstr) =:= R; + (Info) when is_list(Info) -> + x_death_event_key(Info, <<"queue">>, longstr) =:= Q + andalso x_death_event_key(Info, <<"reason">>, longstr) =:= R + end. + per_msg_ttl_header(#'P_basic'{expiration = undefined}) -> []; per_msg_ttl_header(#'P_basic'{expiration = Expiration}) -> |
