diff options
author | dcorbacho <dparracorbacho@piotal.io> | 2020-11-18 14:27:41 +0000 |
---|---|---|
committer | dcorbacho <dparracorbacho@piotal.io> | 2020-11-18 14:27:41 +0000 |
commit | f23a51261d9502ec39df0f8db47ba6b22aa7659f (patch) | |
tree | 53dcdf46e7dc2c14e81ee960bce8793879b488d3 /deps/rabbitmq_management_agent/src | |
parent | afa2c2bf6c7e0e9b63f4fb53dc931c70388e1c82 (diff) | |
parent | 9f6d64ec4a4b1eeac24d7846c5c64fd96798d892 (diff) | |
download | rabbitmq-server-git-stream-timestamp-offset.tar.gz |
Merge remote-tracking branch 'origin/master' into stream-timestamp-offsetstream-timestamp-offset
Diffstat (limited to 'deps/rabbitmq_management_agent/src')
16 files changed, 3732 insertions, 0 deletions
diff --git a/deps/rabbitmq_management_agent/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ResetStatsDbCommand.erl b/deps/rabbitmq_management_agent/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ResetStatsDbCommand.erl new file mode 100644 index 0000000000..bc6bdbdc25 --- /dev/null +++ b/deps/rabbitmq_management_agent/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ResetStatsDbCommand.erl @@ -0,0 +1,54 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module('Elixir.RabbitMQ.CLI.Ctl.Commands.ResetStatsDbCommand'). + +-behaviour('Elixir.RabbitMQ.CLI.CommandBehaviour'). + +-export([ + usage/0, + validate/2, + merge_defaults/2, + banner/2, + run/2, + output/2, + switches/0, + description/0 + ]). + + +%%---------------------------------------------------------------------------- +%% Callbacks +%%---------------------------------------------------------------------------- +usage() -> + <<"reset_stats_db [--all]">>. + +validate(_, _) -> + ok. + +merge_defaults(A, Opts) -> + {A, maps:merge(#{all => false}, Opts)}. + +switches() -> + [{all, boolean}]. + +run(_Args, #{node := Node, all := true}) -> + rabbit_misc:rpc_call(Node, rabbit_mgmt_storage, reset_all, []); +run(_Args, #{node := Node, all := false}) -> + rabbit_misc:rpc_call(Node, rabbit_mgmt_storage, reset, []). + +output(Output, _Opts) -> + 'Elixir.RabbitMQ.CLI.DefaultOutput':output(Output). + +banner(_, #{all := true}) -> + <<"Resetting statistics database in all nodes">>; +banner(_, #{node := Node}) -> + erlang:iolist_to_binary([<<"Resetting statistics database on node ">>, + atom_to_binary(Node, utf8)]). + +description() -> + <<"Resets statistics database. This will remove all metrics data!">>. diff --git a/deps/rabbitmq_management_agent/src/exometer_slide.erl b/deps/rabbitmq_management_agent/src/exometer_slide.erl new file mode 100644 index 0000000000..2c4e4c6d35 --- /dev/null +++ b/deps/rabbitmq_management_agent/src/exometer_slide.erl @@ -0,0 +1,551 @@ +%% This file is a copy of exometer_slide.erl from https://github.com/Feuerlabs/exometer_core, +%% with the following modifications: +%% +%% 1) The elements are tuples of numbers +%% +%% 2) Only one element for each expected interval point is added, intermediate values +%% are discarded. Thus, if we have a window of 60s and interval of 5s, at max 12 elements +%% are stored. +%% +%% 3) Additions can be provided as increments to the last value stored +%% +%% 4) sum/1 implements the sum of several slides, generating a new timestamp sequence based +%% on the given intervals. Elements on each window are added to the closest interval point. +%% +%% Original commit: https://github.com/Feuerlabs/exometer_core/commit/2759edc804211b5245867b32c9a20c8fe1d93441 +%% +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved. +%% +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% ------------------------------------------------------------------- +%% +%% @author Tony Rogvall <tony@rogvall.se> +%% @author Ulf Wiger <ulf@feuerlabs.com> +%% @author Magnus Feuer <magnus@feuerlabs.com> +%% +%% @doc Efficient sliding-window buffer +%% +%% Initial implementation: 29 Sep 2009 by Tony Rogvall +%% +%% This module implements an efficient sliding window, maintaining +%% two lists - a primary and a secondary. Values are paired with a +%% timestamp (millisecond resolution, see `timestamp/0') +%% and prepended to the primary list. When the time span between the oldest +%% and the newest entry in the primary list exceeds the given window size, +%% the primary list is shifted into the secondary list position, and the +%% new entry is added to a new (empty) primary list. +%% +%% The window can be converted to a list using `to_list/1'. +%% @end +%% +%% +%% All modifications are (C) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% The Initial Developer of the Original Code is Basho Technologies, Inc. + +-module(exometer_slide). + +-export([new/2, new/3, + reset/1, + add_element/3, + to_list/2, + to_list/3, + foldl/5, + map/2, + to_normalized_list/5]). + +-export([timestamp/0, + last_two/1, + last/1]). + +-export([sum/1, + sum/2, + sum/5, + optimize/1]). + +%% For testing +-export([buffer/1]). + +-compile(inline). +-compile(inline_list_funcs). + + +-type value() :: tuple(). +-type internal_value() :: tuple() | drop. +-type timestamp() :: non_neg_integer(). + +-type fold_acc() :: any(). +-type fold_fun() :: fun(({timestamp(), internal_value()}, fold_acc()) -> fold_acc()). + +%% Fixed size event buffer +-record(slide, {size = 0 :: integer(), % ms window + n = 0 :: integer(), % number of elements in buf1 + max_n :: infinity | integer(), % max no of elements + incremental = false :: boolean(), + interval :: integer(), + last = 0 :: integer(), % millisecond timestamp + first = undefined :: undefined | integer(), % millisecond timestamp + buf1 = [] :: [internal_value()], + buf2 = [] :: [internal_value()], + total :: undefined | value()}). + +-opaque slide() :: #slide{}. + +-export_type([slide/0, timestamp/0]). + +-spec timestamp() -> timestamp(). +%% @doc Generate a millisecond-resolution timestamp. +%% +%% This timestamp format is used e.g. by the `exometer_slide' and +%% `exometer_histogram' implementations. +%% @end +timestamp() -> + os:system_time(milli_seconds). + +-spec new(_Size::integer(), _Options::list()) -> slide(). +%% @doc Create a new sliding-window buffer. +%% +%% `Size' determines the size in milliseconds of the sliding window. +%% The implementation prepends values into a primary list until the oldest +%% element in the list is `Size' ms older than the current value. It then +%% swaps the primary list into a secondary list, and starts prepending to +%% a new primary list. This means that more data than fits inside the window +%% will be kept - upwards of twice as much. On the other hand, updating the +%% buffer is very cheap. +%% @end +new(Size, Opts) -> new(timestamp(), Size, Opts). + +-spec new(Timestamp :: timestamp(), Size::integer(), Options::list()) -> slide(). +new(TS, Size, Opts) -> + #slide{size = Size, + max_n = proplists:get_value(max_n, Opts, infinity), + interval = proplists:get_value(interval, Opts, infinity), + last = TS, + first = undefined, + incremental = proplists:get_value(incremental, Opts, false), + buf1 = [], + buf2 = []}. + +-spec reset(slide()) -> slide(). + +%% @doc Empty the buffer +%% +reset(Slide) -> + Slide#slide{n = 0, buf1 = [], buf2 = [], last = 0, first = undefined}. + +%% @doc Add an element to the buffer, tagged with the given timestamp. +%% +%% Apart from the specified timestamp, this function works just like +%% {@link add_element/2}. +%% @end +-spec add_element(timestamp(), value(), slide()) -> slide(). +add_element(_TS, _Evt, Slide) when Slide#slide.size == 0 -> + Slide; +add_element(TS, Evt, #slide{last = Last, interval = Interval, total = Total0, + incremental = true} = Slide) + when (TS - Last) < Interval -> + Total = add_to_total(Evt, Total0), + Slide#slide{total = Total}; +add_element(TS, Evt, #slide{last = Last, interval = Interval} = Slide) + when (TS - Last) < Interval -> + Slide#slide{total = Evt}; +add_element(TS, Evt, #slide{last = Last, size = Sz, incremental = true, + n = N, max_n = MaxN, total = Total0, + buf1 = Buf1} = Slide) -> + N1 = N+1, + Total = add_to_total(Evt, Total0), + %% Total could be the same as the last sample, by adding and substracting + %% the same amout to the totals. That is not strictly a drop, but should + %% generate new samples. + %% I.e. 0, 0, -14, 14 (total = 0, samples = 14, -14, 0, drop) + case {is_zeros(Evt), Buf1} of + {_, []} -> + Slide#slide{n = N1, first = TS, buf1 = [{TS, Total} | Buf1], + last = TS, total = Total}; + _ when TS - Last > Sz; N1 > MaxN -> + %% swap + Slide#slide{last = TS, n = 1, buf1 = [{TS, Total}], + buf2 = Buf1, total = Total}; + {true, [{_, Total}, {_, drop} = Drop | Tail]} -> + %% Memory optimisation + Slide#slide{buf1 = [{TS, Total}, Drop | Tail], + n = N1, last = TS}; + {true, [{DropTS, Total} | Tail]} -> + %% Memory optimisation + Slide#slide{buf1 = [{TS, Total}, {DropTS, drop} | Tail], + n = N1, last = TS}; + _ -> + Slide#slide{n = N1, buf1 = [{TS, Total} | Buf1], + last = TS, total = Total} + end; +add_element(TS, Evt, #slide{last = Last, size = Sz, n = N, max_n = MaxN, + buf1 = Buf1} = Slide) + when TS - Last > Sz; N + 1 > MaxN -> + Slide#slide{last = TS, n = 1, buf1 = [{TS, Evt}], + buf2 = Buf1, total = Evt}; +add_element(TS, Evt, #slide{buf1 = [{_, Evt}, {_, drop} = Drop | Tail], + n = N} = Slide) -> + %% Memory optimisation + Slide#slide{buf1 = [{TS, Evt}, Drop | Tail], n = N + 1, last = TS}; +add_element(TS, Evt, #slide{buf1 = [{DropTS, Evt} | Tail], n = N} = Slide) -> + %% Memory optimisation + Slide#slide{buf1 = [{TS, Evt}, {DropTS, drop} | Tail], + n = N + 1, last = TS}; +add_element(TS, Evt, #slide{n = N, buf1 = Buf1} = Slide) -> + N1 = N+1, + case Buf1 of + [] -> + Slide#slide{n = N1, buf1 = [{TS, Evt} | Buf1], + last = TS, first = TS, total = Evt}; + _ -> + Slide#slide{n = N1, buf1 = [{TS, Evt} | Buf1], + last = TS, total = Evt} + end. + +add_to_total(Evt, undefined) -> + Evt; +add_to_total({A0}, {B0}) -> + {B0 + A0}; +add_to_total({A0, A1}, {B0, B1}) -> + {B0 + A0, B1 + A1}; +add_to_total({A0, A1, A2}, {B0, B1, B2}) -> + {B0 + A0, B1 + A1, B2 + A2}; +add_to_total({A0, A1, A2, A3}, {B0, B1, B2, B3}) -> + {B0 + A0, B1 + A1, B2 + A2, B3 + A3}; +add_to_total({A0, A1, A2, A3, A4}, {B0, B1, B2, B3, B4}) -> + {B0 + A0, B1 + A1, B2 + A2, B3 + A3, B4 + A4}; +add_to_total({A0, A1, A2, A3, A4, A5}, {B0, B1, B2, B3, B4, B5}) -> + {B0 + A0, B1 + A1, B2 + A2, B3 + A3, B4 + A4, B5 + A5}; +add_to_total({A0, A1, A2, A3, A4, A5, A6}, {B0, B1, B2, B3, B4, B5, B6}) -> + {B0 + A0, B1 + A1, B2 + A2, B3 + A3, B4 + A4, B5 + A5, B6 + A6}; +add_to_total({A0, A1, A2, A3, A4, A5, A6, A7}, {B0, B1, B2, B3, B4, B5, B6, B7}) -> + {B0 + A0, B1 + A1, B2 + A2, B3 + A3, B4 + A4, B5 + A5, B6 + A6, B7 + A7}; +add_to_total({A0, A1, A2, A3, A4, A5, A6, A7, A8}, {B0, B1, B2, B3, B4, B5, B6, B7, B8}) -> + {B0 + A0, B1 + A1, B2 + A2, B3 + A3, B4 + A4, B5 + A5, B6 + A6, B7 + A7, B8 + A8}; +add_to_total({A0, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, + A15, A16, A17, A18, A19}, + {B0, B1, B2, B3, B4, B5, B6, B7, B8, B9, B10, B11, B12, B13, B14, + B15, B16, B17, B18, B19}) -> + {B0 + A0, B1 + A1, B2 + A2, B3 + A3, B4 + A4, B5 + A5, B6 + A6, B7 + A7, B8 + A8, + B9 + A9, B10 + A10, B11 + A11, B12 + A12, B13 + A13, B14 + A14, B15 + A15, B16 + A16, + B17 + A17, B18 + A18, B19 + A19}. + +is_zeros({0}) -> + true; +is_zeros({0, 0}) -> + true; +is_zeros({0, 0, 0}) -> + true; +is_zeros({0, 0, 0, 0}) -> + true; +is_zeros({0, 0, 0, 0, 0}) -> + true; +is_zeros({0, 0, 0, 0, 0, 0}) -> + true; +is_zeros({0, 0, 0, 0, 0, 0, 0}) -> + true; +is_zeros({0, 0, 0, 0, 0, 0, 0, 0, 0}) -> + true; +is_zeros({0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}) -> + true; +is_zeros(_) -> + false. + +-spec optimize(slide()) -> slide(). +optimize(#slide{buf2 = []} = Slide) -> + Slide; +optimize(#slide{buf1 = Buf1, buf2 = Buf2, max_n = MaxN, n = N} = Slide) + when is_integer(MaxN) andalso length(Buf1) < MaxN -> + Slide#slide{buf1 = Buf1, + buf2 = lists:sublist(Buf2, n_diff(MaxN, N) + 1)}; +optimize(Slide) -> Slide. + +snd(T) when is_tuple(T) -> + element(2, T). + + +-spec to_list(timestamp(), slide()) -> [{timestamp(), value()}]. +%% @doc Convert the sliding window into a list of timestamped values. +%% @end +to_list(_Now, #slide{size = Sz}) when Sz == 0 -> + []; +to_list(Now, #slide{size = Sz} = Slide) -> + snd(to_list_from(Now, Now - Sz, Slide)). + +to_list(Now, Start, Slide) -> + snd(to_list_from(Now, Start, Slide)). + +to_list_from(Now, Start0, #slide{max_n = MaxN, buf2 = Buf2, first = FirstTS, + interval = Interval} = Slide) -> + + {NewN, Buf1} = maybe_add_last_sample(Now, Slide), + Start = first_max(FirstTS, Start0), + {Prev0, Buf1_1} = take_since(Buf1, Now, Start, first_max(MaxN, NewN), [], Interval), + case take_since(Buf2, Now, Start, first_max(MaxN, NewN), Buf1_1, Interval) of + {undefined, Buf1_1} -> + {Prev0, Buf1_1}; + {_Prev, Buf1_1} = Res -> + case Prev0 of + undefined -> + Res; + _ -> + %% If take_since returns the same buffer, that means we don't + %% need Buf2 at all. We might be returning a too old sample + %% in previous, so we must use the one from Buf1 + {Prev0, Buf1_1} + end; + Res -> + Res + end. + +first_max(F, X) when is_integer(F) -> max(F, X); +first_max(_, X) -> X. + +-spec last_two(slide()) -> [{timestamp(), value()}]. +%% @doc Returns the newest 2 elements on the sample +last_two(#slide{buf1 = [{TS, Evt} = H1, {_, drop} | _], interval = Interval}) -> + [H1, {TS - Interval, Evt}]; +last_two(#slide{buf1 = [H1, H2_0 | _], interval = Interval}) -> + H2 = adjust_timestamp(H1, H2_0, Interval), + [H1, H2]; +last_two(#slide{buf1 = [H1], buf2 = [H2_0 | _], + interval = Interval}) -> + H2 = adjust_timestamp(H1, H2_0, Interval), + [H1, H2]; +last_two(#slide{buf1 = [H1], buf2 = []}) -> + [H1]; +last_two(_) -> + []. + +adjust_timestamp({TS1, _}, {TS2, V2}, Interval) -> + case TS1 - TS2 > Interval of + true -> {TS1 - Interval, V2}; + false -> {TS2, V2} + end. + +-spec last(slide()) -> value() | undefined. +last(#slide{total = T}) when T =/= undefined -> + T; +last(#slide{buf1 = [{_TS, T} | _]}) -> + T; +last(#slide{buf2 = [{_TS, T} | _]}) -> + T; +last(_) -> + undefined. + +-spec foldl(timestamp(), timestamp(), fold_fun(), fold_acc(), slide()) -> fold_acc(). +%% @doc Fold over the sliding window, starting from `Timestamp'. +%% Now provides a reference point to evaluate whether to include +%% partial, unrealised sample values in the sequence. Unrealised values will be +%% appended to the sequence when Now >= LastTS + Interval +%% +%% The fun should as `fun({Timestamp, Value}, Acc) -> NewAcc'. +%% The values are processed in order from oldest to newest. +%% @end +foldl(_Now, _Timestamp, _Fun, _Acc, #slide{size = Sz}) when Sz == 0 -> + []; +foldl(Now, Start0, Fun, Acc, #slide{max_n = _MaxN, buf2 = _Buf2, + interval = _Interval} = Slide) -> + lists:foldl(Fun, Acc, element(2, to_list_from(Now, Start0, Slide)) ++ [last]). + +map(Fun, #slide{buf1 = Buf1, buf2 = Buf2, total = Total} = Slide) -> + BufFun = fun({Timestamp, Value}) -> + {Timestamp, Fun(Value)} + end, + MappedBuf1 = lists:map(BufFun, Buf1), + MappedBuf2 = lists:map(BufFun, Buf2), + MappedTotal = Fun(Total), + Slide#slide{buf1 = MappedBuf1, buf2 = MappedBuf2, total = MappedTotal}. + +maybe_add_last_sample(_Now, #slide{total = T, n = N, + buf1 = [{_, T} | _] = Buf1}) -> + {N, Buf1}; +maybe_add_last_sample(Now, #slide{total = T, + n = N, + last = Last, + interval = I, + buf1 = Buf1}) + when T =/= undefined andalso Now >= Last + I -> + {N + 1, [{Last + I, T} | Buf1]}; +maybe_add_last_sample(_Now, #slide{buf1 = Buf1, n = N}) -> + {N, Buf1}. + + +create_normalized_lookup(Start, Interval, RoundFun, Samples) -> + lists:foldl(fun({TS, Value}, Acc) when TS - Start >= 0 -> + NewTS = map_timestamp(TS, Start, Interval, RoundFun), + maps:update_with(NewTS, + fun({T, V}) when T > TS -> + {T, V}; + (_) -> + {TS, Value} + end, {TS, Value}, Acc); + (_, Acc) -> + Acc + end, #{}, Samples). + +-spec to_normalized_list(timestamp(), timestamp(), integer(), slide(), + no_pad | tuple()) -> [tuple()]. +to_normalized_list(Now, Start, Interval, Slide, Empty) -> + to_normalized_list(Now, Start, Interval, Slide, Empty, fun ceil/1). + +to_normalized_list(Now, Start, Interval, #slide{first = FirstTS0, + total = Total} = Slide, + Empty, RoundFun) -> + + RoundTSFun = fun (TS) -> map_timestamp(TS, Start, Interval, RoundFun) end, + + % add interval as we don't want to miss a sample due to rounding + {Prev, Samples} = to_list_from(Now + Interval, Start, Slide), + Lookup = create_normalized_lookup(Start, Interval, RoundFun, Samples), + + NowRound = RoundTSFun(Now), + + Pad = case Samples of + _ when Empty =:= no_pad -> + []; + [{TS, _} | _] when Prev =/= undefined, Start =< TS -> + [{T, snd(Prev)} + || T <- lists:seq(RoundTSFun(TS) - Interval, Start, + -Interval)]; + [{TS, _} | _] when is_number(FirstTS0) andalso Start < FirstTS0 -> + % only if we know there is nothing in the past can we + % generate a 0 pad + [{T, Empty} || T <- lists:seq(RoundTSFun(TS) - Interval, Start, + -Interval)]; + _ when FirstTS0 =:= undefined andalso Total =:= undefined -> + [{T, Empty} || T <- lists:seq(NowRound, Start, -Interval)]; + [] -> % samples have been seen, use the total to pad + [{T, Total} || T <- lists:seq(NowRound, Start, -Interval)]; + _ -> [] + end, + + {_, Res1} = lists:foldl( + fun(T, {Last, Acc}) -> + case maps:find(T, Lookup) of + {ok, {_, V}} -> + {V, [{T, V} | Acc]}; + error when Last =:= undefined -> + {Last, Acc}; + error -> % this pads the last value into the future + {Last, [{T, Last} | Acc]} + end + end, {undefined, []}, + lists:seq(Start, NowRound, Interval)), + Res1 ++ Pad. + + +%% @doc Sums a list of slides +%% +%% Takes the last known timestamp and creates an template version of the +%% sliding window. Timestamps are then truncated and summed with the value +%% in the template slide. +%% @end +-spec sum([slide()]) -> slide(). +sum(Slides) -> sum(Slides, no_pad). + +sum([#slide{size = Size, interval = Interval} | _] = Slides, Pad) -> + % take the freshest timestamp as reference point for summing operation + Now = lists:max([Last || #slide{last = Last} <- Slides]), + Start = Now - Size, + sum(Now, Start, Interval, Slides, Pad). + + +sum(Now, Start, Interval, [Slide | _ ] = All, Pad) -> + Fun = fun({TS, Value}, Acc) -> + maps:update_with(TS, fun(V) -> add_to_total(V, Value) end, + Value, Acc) + end, + {Total, Dict} = + lists:foldl(fun(#slide{total = T} = S, {Tot, Acc}) -> + Samples = to_normalized_list(Now, Start, Interval, S, + Pad, fun ceil/1), + Total = add_to_total(T, Tot), + Folded = lists:foldl(Fun, Acc, Samples), + {Total, Folded} + end, {undefined, #{}}, All), + + {First, Buffer} = case lists:sort(maps:to_list(Dict)) of + [] -> + F = case [TS || #slide{first = TS} <- All, + is_integer(TS)] of + [] -> undefined; + FS -> lists:min(FS) + end, + {F, []}; + [{F, _} | _ ] = B -> + {F, lists:reverse(B)} + end, + Slide#slide{buf1 = Buffer, buf2 = [], total = Total, n = length(Buffer), + first = First, last = Now}. + + +truncated_seq(_First, _Last, _Incr, 0) -> + []; +truncated_seq(TS, TS, _Incr, MaxN) when MaxN > 0 -> + [TS]; +truncated_seq(First, Last, Incr, MaxN) when First =< Last andalso MaxN > 0 -> + End = min(Last, First + (MaxN * Incr) - Incr), + lists:seq(First, End, Incr); +truncated_seq(First, Last, Incr, MaxN) -> + End = max(Last, First + (MaxN * Incr) - Incr), + lists:seq(First, End, Incr). + + +take_since([{DropTS, drop} | T], Now, Start, N, [{TS, Evt} | _] = Acc, + Interval) -> + case T of + [] -> + Fill = [{TS0, Evt} || TS0 <- truncated_seq(TS - Interval, + max(DropTS, Start), + -Interval, N)], + {undefined, lists:reverse(Fill) ++ Acc}; + [{TS0, _} = E | Rest] when TS0 >= Start, N > 0 -> + Fill = [{TS1, Evt} || TS1 <- truncated_seq(TS0 + Interval, + max(TS0 + Interval, TS - Interval), + Interval, N)], + take_since(Rest, Now, Start, decr(N), [E | Fill ++ Acc], Interval); + [Prev | _] -> % next sample is out of range so needs to be filled from Start + Fill = [{TS1, Evt} || TS1 <- truncated_seq(Start, max(Start, TS - Interval), + Interval, N)], + {Prev, Fill ++ Acc} + end; +take_since([{TS, V} = H | T], Now, Start, N, Acc, Interval) when TS >= Start, + N > 0, + TS =< Now, + is_tuple(V) -> + take_since(T, Now, Start, decr(N), [H|Acc], Interval); +take_since([{TS,_} | T], Now, Start, N, Acc, Interval) when TS >= Start, N > 0 -> + take_since(T, Now, Start, decr(N), Acc, Interval); +take_since([Prev | _], _, _, _, Acc, _) -> + {Prev, Acc}; +take_since(_, _, _, _, Acc, _) -> + %% Don't reverse; already the wanted order. + {undefined, Acc}. + +decr(N) when is_integer(N) -> + N-1; +decr(N) -> N. + +n_diff(A, B) when is_integer(A) -> + A - B. + +ceil(X) when X < 0 -> + trunc(X); +ceil(X) -> + T = trunc(X), + case X - T == 0 of + true -> T; + false -> T + 1 + end. + +map_timestamp(TS, Start, Interval, Round) -> + Factor = Round((TS - Start) / Interval), + Start + Interval * Factor. + +buffer(#slide{buf1 = Buf1, buf2 = Buf2}) -> + Buf1 ++ Buf2. diff --git a/deps/rabbitmq_management_agent/src/rabbit_mgmt_agent_app.erl b/deps/rabbitmq_management_agent/src/rabbit_mgmt_agent_app.erl new file mode 100644 index 0000000000..e889815c2f --- /dev/null +++ b/deps/rabbitmq_management_agent/src/rabbit_mgmt_agent_app.erl @@ -0,0 +1,17 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_mgmt_agent_app). + +-behaviour(application). +-export([start/2, stop/1]). + +start(_Type, _StartArgs) -> + rabbit_mgmt_agent_sup_sup:start_link(). + +stop(_State) -> + ok. diff --git a/deps/rabbitmq_management_agent/src/rabbit_mgmt_agent_config.erl b/deps/rabbitmq_management_agent/src/rabbit_mgmt_agent_config.erl new file mode 100644 index 0000000000..e8d074e891 --- /dev/null +++ b/deps/rabbitmq_management_agent/src/rabbit_mgmt_agent_config.erl @@ -0,0 +1,22 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% +-module(rabbit_mgmt_agent_config). + +-export([get_env/1, get_env/2]). + +%% some people have reasons to only run with the agent enabled: +%% make it possible for them to configure key management app +%% settings such as rates_mode. +get_env(Key) -> + rabbit_misc:get_env(rabbitmq_management, Key, + rabbit_misc:get_env(rabbitmq_management_agent, Key, + undefined)). + +get_env(Key, Default) -> + rabbit_misc:get_env(rabbitmq_management, Key, + rabbit_misc:get_env(rabbitmq_management_agent, Key, + Default)). diff --git a/deps/rabbitmq_management_agent/src/rabbit_mgmt_agent_sup.erl b/deps/rabbitmq_management_agent/src/rabbit_mgmt_agent_sup.erl new file mode 100644 index 0000000000..0c4a5465e9 --- /dev/null +++ b/deps/rabbitmq_management_agent/src/rabbit_mgmt_agent_sup.erl @@ -0,0 +1,55 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_mgmt_agent_sup). + +%% pg2 is deprecated in OTP 23. +-compile(nowarn_deprecated_function). + +-behaviour(supervisor). + +-include_lib("rabbit_common/include/rabbit.hrl"). +-include_lib("rabbit_common/include/rabbit_core_metrics.hrl"). +-include("rabbit_mgmt_metrics.hrl"). + +-export([init/1]). +-export([start_link/0]). + +init([]) -> + MCs = maybe_enable_metrics_collector(), + ExternalStats = {rabbit_mgmt_external_stats, + {rabbit_mgmt_external_stats, start_link, []}, + permanent, 5000, worker, [rabbit_mgmt_external_stats]}, + {ok, {{one_for_one, 100, 10}, [ExternalStats] ++ MCs}}. + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + + +maybe_enable_metrics_collector() -> + case application:get_env(rabbitmq_management_agent, disable_metrics_collector, false) of + false -> + pg2:create(management_db), + ok = pg2:join(management_db, self()), + ST = {rabbit_mgmt_storage, {rabbit_mgmt_storage, start_link, []}, + permanent, ?WORKER_WAIT, worker, [rabbit_mgmt_storage]}, + MD = {delegate_management_sup, {delegate_sup, start_link, [5, ?DELEGATE_PREFIX]}, + permanent, ?SUPERVISOR_WAIT, supervisor, [delegate_sup]}, + MC = [{rabbit_mgmt_metrics_collector:name(Table), + {rabbit_mgmt_metrics_collector, start_link, [Table]}, + permanent, ?WORKER_WAIT, worker, [rabbit_mgmt_metrics_collector]} + || {Table, _} <- ?CORE_TABLES], + MGC = [{rabbit_mgmt_metrics_gc:name(Event), + {rabbit_mgmt_metrics_gc, start_link, [Event]}, + permanent, ?WORKER_WAIT, worker, [rabbit_mgmt_metrics_gc]} + || Event <- ?GC_EVENTS], + GC = {rabbit_mgmt_gc, {rabbit_mgmt_gc, start_link, []}, + permanent, ?WORKER_WAIT, worker, [rabbit_mgmt_gc]}, + [ST, MD, GC | MC ++ MGC]; + true -> + [] + end. diff --git a/deps/rabbitmq_management_agent/src/rabbit_mgmt_agent_sup_sup.erl b/deps/rabbitmq_management_agent/src/rabbit_mgmt_agent_sup_sup.erl new file mode 100644 index 0000000000..17ffa35307 --- /dev/null +++ b/deps/rabbitmq_management_agent/src/rabbit_mgmt_agent_sup_sup.erl @@ -0,0 +1,28 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_mgmt_agent_sup_sup). + +-behaviour(supervisor2). + +-export([init/1]). +-export([start_link/0, start_child/0]). + +-include_lib("rabbit_common/include/rabbit.hrl"). + +start_child() -> + supervisor2:start_child(?MODULE, sup()). + +sup() -> + {rabbit_mgmt_agent_sup, {rabbit_mgmt_agent_sup, start_link, []}, + temporary, ?SUPERVISOR_WAIT, supervisor, [rabbit_mgmt_agent_sup]}. + +init([]) -> + {ok, {{one_for_one, 0, 1}, [sup()]}}. + +start_link() -> + supervisor2:start_link({local, ?MODULE}, ?MODULE, []). diff --git a/deps/rabbitmq_management_agent/src/rabbit_mgmt_data.erl b/deps/rabbitmq_management_agent/src/rabbit_mgmt_data.erl new file mode 100644 index 0000000000..d73c8a3819 --- /dev/null +++ b/deps/rabbitmq_management_agent/src/rabbit_mgmt_data.erl @@ -0,0 +1,572 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2016-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_mgmt_data). + +-include("rabbit_mgmt_records.hrl"). +-include("rabbit_mgmt_metrics.hrl"). +-include_lib("rabbit_common/include/rabbit.hrl"). +-include_lib("rabbit_common/include/rabbit_core_metrics.hrl"). + +-export([empty/2, pick_range/2]). + +% delegate api +-export([overview_data/4, + consumer_data/2, + all_list_queue_data/3, + all_detail_queue_data/3, + all_exchange_data/3, + all_connection_data/3, + all_list_channel_data/3, + all_detail_channel_data/3, + all_vhost_data/3, + all_node_data/3, + augmented_created_stats/2, + augmented_created_stats/3, + augment_channel_pids/2, + augment_details/2, + lookup_element/2, + lookup_element/3 + ]). + +-import(rabbit_misc, [pget/2]). + +-type maybe_slide() :: exometer_slide:slide() | not_found. +-type ranges() :: {maybe_range(), maybe_range(), maybe_range(), maybe_range()}. +-type maybe_range() :: no_range | #range{}. + +%%---------------------------------------------------------------------------- +%% Internal, query-time - node-local operations +%%---------------------------------------------------------------------------- + +created_stats(Name, Type) -> + case ets:select(Type, [{{'_', '$2', '$3'}, [{'==', Name, '$2'}], ['$3']}]) of + [] -> not_found; + [Elem] -> Elem + end. + +created_stats(Type) -> + %% TODO better tab2list? + ets:select(Type, [{{'_', '_', '$3'}, [], ['$3']}]). + +-spec all_detail_queue_data(pid(), [any()], ranges()) -> #{atom() => any()}. +all_detail_queue_data(_Pid, Ids, Ranges) -> + lists:foldl(fun (Id, Acc) -> + Data = detail_queue_data(Ranges, Id), + maps:put(Id, Data, Acc) + end, #{}, Ids). + +all_list_queue_data(_Pid, Ids, Ranges) -> + lists:foldl(fun (Id, Acc) -> + Data = list_queue_data(Ranges, Id), + maps:put(Id, Data, Acc) + end, #{}, Ids). + +all_detail_channel_data(_Pid, Ids, Ranges) -> + lists:foldl(fun (Id, Acc) -> + Data = detail_channel_data(Ranges, Id), + maps:put(Id, Data, Acc) + end, #{}, Ids). + +all_list_channel_data(_Pid, Ids, Ranges) -> + lists:foldl(fun (Id, Acc) -> + Data = list_channel_data(Ranges, Id), + maps:put(Id, Data, Acc) + end, #{}, Ids). + +connection_data(Ranges, Id) -> + maps:from_list([raw_message_data(connection_stats_coarse_conn_stats, + pick_range(coarse_conn_stats, Ranges), Id), + {connection_stats, lookup_element(connection_stats, Id)}]). + +exchange_data(Ranges, Id) -> + maps:from_list( + exchange_raw_detail_stats_data(Ranges, Id) ++ + [raw_message_data(exchange_stats_publish_out, + pick_range(fine_stats, Ranges), Id), + raw_message_data(exchange_stats_publish_in, + pick_range(fine_stats, Ranges), Id)]). + +vhost_data(Ranges, Id) -> + maps:from_list([raw_message_data(vhost_stats_coarse_conn_stats, + pick_range(coarse_conn_stats, Ranges), Id), + raw_message_data(vhost_msg_stats, + pick_range(queue_msg_rates, Ranges), Id), + raw_message_data(vhost_stats_fine_stats, + pick_range(fine_stats, Ranges), Id), + raw_message_data(vhost_stats_deliver_stats, + pick_range(deliver_get, Ranges), Id)]). + +node_data(Ranges, Id) -> + maps:from_list( + [{mgmt_stats, mgmt_queue_length_stats(Id)}] ++ + [{node_node_metrics, node_node_metrics()}] ++ + node_raw_detail_stats_data(Ranges, Id) ++ + [raw_message_data(node_coarse_stats, + pick_range(coarse_node_stats, Ranges), Id), + raw_message_data(node_persister_stats, + pick_range(coarse_node_stats, Ranges), Id), + {node_stats, lookup_element(node_stats, Id)}] ++ + node_connection_churn_rates_data(Ranges, Id)). + +overview_data(_Pid, User, Ranges, VHosts) -> + Raw = [raw_all_message_data(vhost_msg_stats, pick_range(queue_msg_counts, Ranges), VHosts), + raw_all_message_data(vhost_stats_fine_stats, pick_range(fine_stats, Ranges), VHosts), + raw_all_message_data(vhost_msg_rates, pick_range(queue_msg_rates, Ranges), VHosts), + raw_all_message_data(vhost_stats_deliver_stats, pick_range(deliver_get, Ranges), VHosts), + raw_message_data(connection_churn_rates, pick_range(queue_msg_rates, Ranges), node())], + maps:from_list(Raw ++ + [{connections_count, count_created_stats(connection_created_stats, User)}, + {channels_count, count_created_stats(channel_created_stats, User)}, + {consumers_count, ets:info(consumer_stats, size)}]). + +consumer_data(_Pid, VHost) -> + maps:from_list( + [{C, augment_msg_stats(augment_consumer(C))} + || C <- consumers_by_vhost(VHost)]). + +all_connection_data(_Pid, Ids, Ranges) -> + maps:from_list([{Id, connection_data(Ranges, Id)} || Id <- Ids]). + +all_exchange_data(_Pid, Ids, Ranges) -> + maps:from_list([{Id, exchange_data(Ranges, Id)} || Id <- Ids]). + +all_vhost_data(_Pid, Ids, Ranges) -> + maps:from_list([{Id, vhost_data(Ranges, Id)} || Id <- Ids]). + +all_node_data(_Pid, Ids, Ranges) -> + maps:from_list([{Id, node_data(Ranges, Id)} || Id <- Ids]). + +channel_raw_message_data(Ranges, Id) -> + [raw_message_data(channel_stats_fine_stats, pick_range(fine_stats, Ranges), Id), + raw_message_data(channel_stats_deliver_stats, pick_range(deliver_get, Ranges), Id), + raw_message_data(channel_process_stats, pick_range(process_stats, Ranges), Id)]. + +queue_raw_message_data(Ranges, Id) -> + [raw_message_data(queue_stats_publish, pick_range(fine_stats, Ranges), Id), + raw_message_data(queue_stats_deliver_stats, pick_range(deliver_get, Ranges), Id), + raw_message_data(queue_process_stats, pick_range(process_stats, Ranges), Id), + raw_message_data(queue_msg_stats, pick_range(queue_msg_counts, Ranges), Id)]. + +queue_raw_deliver_stats_data(Ranges, Id) -> + [raw_message_data2(channel_queue_stats_deliver_stats, + pick_range(deliver_get, Ranges), Key) + || Key <- get_table_keys(channel_queue_stats_deliver_stats, second(Id))] ++ + [raw_message_data2(queue_exchange_stats_publish, + pick_range(fine_stats, Ranges), Key) + || Key <- get_table_keys(queue_exchange_stats_publish, first(Id))]. + +node_raw_detail_stats_data(Ranges, Id) -> + [raw_message_data2(node_node_coarse_stats, + pick_range(coarse_node_node_stats, Ranges), Key) + || Key <- get_table_keys(node_node_coarse_stats, first(Id))]. + +node_connection_churn_rates_data(Ranges, Id) -> + [raw_message_data(connection_churn_rates, + pick_range(churn_rates, Ranges), Id)]. + +exchange_raw_detail_stats_data(Ranges, Id) -> + [raw_message_data2(channel_exchange_stats_fine_stats, + pick_range(fine_stats, Ranges), Key) + || Key <- get_table_keys(channel_exchange_stats_fine_stats, second(Id))] ++ + [raw_message_data2(queue_exchange_stats_publish, + pick_range(fine_stats, Ranges), Key) + || Key <- get_table_keys(queue_exchange_stats_publish, second(Id))]. + +channel_raw_detail_stats_data(Ranges, Id) -> + [raw_message_data2(channel_exchange_stats_fine_stats, + pick_range(fine_stats, Ranges), Key) + || Key <- get_table_keys(channel_exchange_stats_fine_stats, first(Id))] ++ + [raw_message_data2(channel_queue_stats_deliver_stats, + pick_range(fine_stats, Ranges), Key) + || Key <- get_table_keys(channel_queue_stats_deliver_stats, first(Id))]. + +raw_message_data2(Table, no_range, Id) -> + SmallSample = lookup_smaller_sample(Table, Id), + {{Table, Id}, {SmallSample, not_found}}; +raw_message_data2(Table, Range, Id) -> + SmallSample = lookup_smaller_sample(Table, Id), + Samples = lookup_samples(Table, Id, Range), + {{Table, Id}, {SmallSample, Samples}}. + +detail_queue_data(Ranges, Id) -> + maps:from_list(queue_raw_message_data(Ranges, Id) ++ + queue_raw_deliver_stats_data(Ranges, Id) ++ + [{queue_stats, lookup_element(queue_stats, Id)}, + {consumer_stats, get_queue_consumer_stats(Id)}]). + +list_queue_data(Ranges, Id) -> + maps:from_list(queue_raw_message_data(Ranges, Id) ++ + queue_raw_deliver_stats_data(Ranges, Id) ++ + [{queue_stats, lookup_element(queue_stats, Id)}]). + +detail_channel_data(Ranges, Id) -> + maps:from_list(channel_raw_message_data(Ranges, Id) ++ + channel_raw_detail_stats_data(Ranges, Id) ++ + [{channel_stats, lookup_element(channel_stats, Id)}, + {consumer_stats, get_consumer_stats(Id)}]). + +list_channel_data(Ranges, Id) -> + maps:from_list(channel_raw_message_data(Ranges, Id) ++ + channel_raw_detail_stats_data(Ranges, Id) ++ + [{channel_stats, lookup_element(channel_stats, Id)}]). + +-spec raw_message_data(atom(), maybe_range(), any()) -> + {atom(), {maybe_slide(), maybe_slide()}}. +raw_message_data(Table, no_range, Id) -> + SmallSample = lookup_smaller_sample(Table, Id), + {Table, {SmallSample, not_found}}; +raw_message_data(Table, Range, Id) -> + SmallSample = lookup_smaller_sample(Table, Id), + Samples = lookup_samples(Table, Id, Range), + {Table, {SmallSample, Samples}}. + +raw_all_message_data(Table, Range, VHosts) -> + SmallSample = lookup_all(Table, VHosts, select_smaller_sample(Table)), + RangeSample = case Range of + no_range -> not_found; + _ -> + lookup_all(Table, VHosts, select_range_sample(Table, + Range)) + end, + {Table, {SmallSample, RangeSample}}. + +get_queue_consumer_stats(Id) -> + Consumers = ets:select(consumer_stats, match_queue_consumer_spec(Id)), + [augment_consumer(C) || C <- Consumers]. + +get_consumer_stats(Id) -> + Consumers = ets:select(consumer_stats, match_consumer_spec(Id)), + [augment_consumer(C) || C <- Consumers]. + +count_created_stats(Type, all) -> + ets:info(Type, size); +count_created_stats(Type, User) -> + length(filter_user(created_stats(Type), User)). + +augment_consumer({{Q, Ch, CTag}, Props}) -> + [{queue, format_resource(Q)}, + {channel_details, augment_channel_pid(Ch)}, + {channel_pid, Ch}, + {consumer_tag, CTag} | Props]. + +consumers_by_vhost(VHost) -> + ets:select(consumer_stats, + [{{{#resource{virtual_host = '$1', _ = '_'}, '_', '_'}, '_'}, + [{'orelse', {'==', 'all', VHost}, {'==', VHost, '$1'}}], + ['$_']}]). + +augment_msg_stats(Props) -> + augment_details(Props, []) ++ Props. + +augment_details([{_, none} | T], Acc) -> + augment_details(T, Acc); +augment_details([{_, unknown} | T], Acc) -> + augment_details(T, Acc); +augment_details([{connection, Value} | T], Acc) -> + augment_details(T, [{connection_details, augment_connection_pid(Value)} | Acc]); +augment_details([{channel, Value} | T], Acc) -> + augment_details(T, [{channel_details, augment_channel_pid(Value)} | Acc]); +augment_details([{owner_pid, Value} | T], Acc) -> + augment_details(T, [{owner_pid_details, augment_connection_pid(Value)} | Acc]); +augment_details([_ | T], Acc) -> + augment_details(T, Acc); +augment_details([], Acc) -> + Acc. + +augment_channel_pids(_Pid, ChPids) -> + lists:map(fun (ChPid) -> augment_channel_pid(ChPid) end, ChPids). + +augment_channel_pid(Pid) -> + Ch = lookup_element(channel_created_stats, Pid, 3), + Conn = lookup_element(connection_created_stats, pget(connection, Ch), 3), + case Conn of + [] -> %% If the connection has just been opened, we might not yet have the data + []; + _ -> + [{name, pget(name, Ch)}, + {pid, pget(pid, Ch)}, + {number, pget(number, Ch)}, + {user, pget(user, Ch)}, + {connection_name, pget(name, Conn)}, + {peer_port, pget(peer_port, Conn)}, + {peer_host, pget(peer_host, Conn)}] + end. + +augment_connection_pid(Pid) -> + Conn = lookup_element(connection_created_stats, Pid, 3), + case Conn of + [] -> %% If the connection has just been opened, we might not yet have the data + []; + _ -> + [{name, pget(name, Conn)}, + {peer_port, pget(peer_port, Conn)}, + {peer_host, pget(peer_host, Conn)}] + end. + +augmented_created_stats(_Pid, Key, Type) -> + case created_stats(Key, Type) of + not_found -> not_found; + S -> augment_msg_stats(S) + end. + +augmented_created_stats(_Pid, Type) -> + [ augment_msg_stats(S) || S <- created_stats(Type) ]. + +match_consumer_spec(Id) -> + [{{{'_', '$1', '_'}, '_'}, [{'==', Id, '$1'}], ['$_']}]. + +match_queue_consumer_spec(Id) -> + [{{{'$1', '_', '_'}, '_'}, [{'==', {Id}, '$1'}], ['$_']}]. + +lookup_element(Table, Key) -> lookup_element(Table, Key, 2). + +lookup_element(Table, Key, Pos) -> + try ets:lookup_element(Table, Key, Pos) + catch error:badarg -> [] + end. + +-spec lookup_smaller_sample(atom(), any()) -> maybe_slide(). +lookup_smaller_sample(Table, Id) -> + case ets:lookup(Table, {Id, select_smaller_sample(Table)}) of + [] -> + not_found; + [{_, Slide}] -> + Slide1 = exometer_slide:optimize(Slide), + maybe_convert_for_compatibility(Table, Slide1) + end. + +-spec lookup_samples(atom(), any(), #range{}) -> maybe_slide(). +lookup_samples(Table, Id, Range) -> + case ets:lookup(Table, {Id, select_range_sample(Table, Range)}) of + [] -> + not_found; + [{_, Slide}] -> + Slide1 = exometer_slide:optimize(Slide), + maybe_convert_for_compatibility(Table, Slide1) + end. + +lookup_all(Table, Ids, SecondKey) -> + Slides = lists:foldl(fun(Id, Acc) -> + case ets:lookup(Table, {Id, SecondKey}) of + [] -> + Acc; + [{_, Slide}] -> + [Slide | Acc] + end + end, [], Ids), + case Slides of + [] -> + not_found; + _ -> + Slide = exometer_slide:sum(Slides, empty(Table, 0)), + maybe_convert_for_compatibility(Table, Slide) + end. + +maybe_convert_for_compatibility(Table, Slide) + when Table =:= channel_stats_fine_stats orelse + Table =:= channel_exchange_stats_fine_stats orelse + Table =:= vhost_stats_fine_stats -> + ConversionNeeded = rabbit_feature_flags:is_disabled( + drop_unroutable_metric), + case ConversionNeeded of + false -> + Slide; + true -> + %% drop_drop because the metric is named "drop_unroutable" + rabbit_mgmt_data_compat:drop_drop_unroutable_metric(Slide) + end; +maybe_convert_for_compatibility(Table, Slide) + when Table =:= channel_queue_stats_deliver_stats orelse + Table =:= channel_stats_deliver_stats orelse + Table =:= queue_stats_deliver_stats orelse + Table =:= vhost_stats_deliver_stats -> + ConversionNeeded = rabbit_feature_flags:is_disabled( + empty_basic_get_metric), + case ConversionNeeded of + false -> + Slide; + true -> + rabbit_mgmt_data_compat:drop_get_empty_queue_metric(Slide) + end; +maybe_convert_for_compatibility(_, Slide) -> + Slide. + +get_table_keys(Table, Id0) -> + ets:select(Table, match_spec_keys(Id0)). + +match_spec_keys(Id) -> + MatchCondition = to_match_condition(Id), + MatchHead = {{{'$1', '$2'}, '_'}, '_'}, + [{MatchHead, [MatchCondition], [{{'$1', '$2'}}]}]. + +to_match_condition({'_', Id1}) when is_tuple(Id1) -> + {'==', {Id1}, '$2'}; +to_match_condition({'_', Id1}) -> + {'==', Id1, '$2'}; +to_match_condition({Id0, '_'}) when is_tuple(Id0) -> + {'==', {Id0}, '$1'}; +to_match_condition({Id0, '_'}) -> + {'==', Id0, '$1'}. + +mgmt_queue_length_stats(Id) when Id =:= node() -> + GCsQueueLengths = lists:map(fun (T) -> + case whereis(rabbit_mgmt_metrics_gc:name(T)) of + P when is_pid(P) -> + {message_queue_len, Len} = + erlang:process_info(P, message_queue_len), + {T, Len}; + _ -> {T, 0} + end + end, + ?GC_EVENTS), + [{metrics_gc_queue_length, GCsQueueLengths}]; +mgmt_queue_length_stats(_Id) -> + % if it isn't for the current node just return an empty list + []. + +node_node_metrics() -> + maps:from_list(ets:tab2list(node_node_metrics)). + +select_range_sample(Table, #range{first = First, last = Last}) -> + Range = Last - First, + Policies = rabbit_mgmt_agent_config:get_env(sample_retention_policies), + Policy = retention_policy(Table), + [T | _] = TablePolicies = lists:sort(proplists:get_value(Policy, Policies)), + {_, Sample} = select_smallest_above(T, TablePolicies, Range), + Sample. + +select_smaller_sample(Table) -> + Policies = rabbit_mgmt_agent_config:get_env(sample_retention_policies), + Policy = retention_policy(Table), + TablePolicies = proplists:get_value(Policy, Policies), + [V | _] = lists:sort([I || {_, I} <- TablePolicies]), + V. + +select_smallest_above(V, [], _) -> + V; +select_smallest_above(_, [{H, _} = S | _T], Interval) when (H * 1000) > Interval -> + S; +select_smallest_above(_, [H | T], Interval) -> + select_smallest_above(H, T, Interval). + +pick_range(queue_msg_counts, {RangeL, _RangeM, _RangeD, _RangeN}) -> + RangeL; +pick_range(K, {_RangeL, RangeM, _RangeD, _RangeN}) when K == fine_stats; + K == deliver_get; + K == queue_msg_rates -> + RangeM; +pick_range(K, {_RangeL, _RangeM, RangeD, _RangeN}) when K == coarse_conn_stats; + K == process_stats -> + RangeD; +pick_range(K, {_RangeL, _RangeM, _RangeD, RangeN}) + when K == coarse_node_stats; + K == coarse_node_node_stats; + K == churn_rates -> + RangeN. + +first(Id) -> + {Id, '_'}. + +second(Id) -> + {'_', Id}. + +empty(Type, V) when Type =:= connection_stats_coarse_conn_stats; + Type =:= queue_msg_stats; + Type =:= vhost_msg_stats -> + {V, V, V}; +empty(Type, V) when Type =:= channel_stats_fine_stats; + Type =:= channel_exchange_stats_fine_stats; + Type =:= vhost_stats_fine_stats -> + {V, V, V, V}; +empty(Type, V) when Type =:= channel_queue_stats_deliver_stats; + Type =:= queue_stats_deliver_stats; + Type =:= vhost_stats_deliver_stats; + Type =:= channel_stats_deliver_stats -> + {V, V, V, V, V, V, V, V}; +empty(Type, V) when Type =:= channel_process_stats; + Type =:= queue_process_stats; + Type =:= queue_stats_publish; + Type =:= queue_exchange_stats_publish; + Type =:= exchange_stats_publish_out; + Type =:= exchange_stats_publish_in -> + {V}; +empty(node_coarse_stats, V) -> + {V, V, V, V, V, V, V, V}; +empty(node_persister_stats, V) -> + {V, V, V, V, V, V, V, V, V, V, V, V, V, V, V, V, V, V, V, V}; +empty(Type, V) when Type =:= node_node_coarse_stats; + Type =:= vhost_stats_coarse_conn_stats; + Type =:= queue_msg_rates; + Type =:= vhost_msg_rates -> + {V, V}; +empty(connection_churn_rates, V) -> + {V, V, V, V, V, V, V}. + +retention_policy(connection_stats_coarse_conn_stats) -> + basic; +retention_policy(channel_stats_fine_stats) -> + basic; +retention_policy(channel_queue_stats_deliver_stats) -> + detailed; +retention_policy(channel_exchange_stats_fine_stats) -> + detailed; +retention_policy(channel_process_stats) -> + basic; +retention_policy(vhost_stats_fine_stats) -> + global; +retention_policy(vhost_stats_deliver_stats) -> + global; +retention_policy(vhost_stats_coarse_conn_stats) -> + global; +retention_policy(vhost_msg_rates) -> + global; +retention_policy(channel_stats_deliver_stats) -> + basic; +retention_policy(queue_stats_deliver_stats) -> + basic; +retention_policy(queue_stats_publish) -> + basic; +retention_policy(queue_exchange_stats_publish) -> + basic; +retention_policy(exchange_stats_publish_out) -> + basic; +retention_policy(exchange_stats_publish_in) -> + basic; +retention_policy(queue_process_stats) -> + basic; +retention_policy(queue_msg_stats) -> + basic; +retention_policy(queue_msg_rates) -> + basic; +retention_policy(vhost_msg_stats) -> + global; +retention_policy(node_coarse_stats) -> + global; +retention_policy(node_persister_stats) -> + global; +retention_policy(node_node_coarse_stats) -> + global; +retention_policy(connection_churn_rates) -> + global. + +format_resource(unknown) -> unknown; +format_resource(Res) -> format_resource(name, Res). + +format_resource(_, unknown) -> + unknown; +format_resource(NameAs, #resource{name = Name, virtual_host = VHost}) -> + [{NameAs, Name}, {vhost, VHost}]. + +filter_user(List, #user{username = Username, tags = Tags}) -> + case is_monitor(Tags) of + true -> List; + false -> [I || I <- List, pget(user, I) == Username] + end. + +is_monitor(T) -> intersects(T, [administrator, monitoring]). +intersects(A, B) -> lists:any(fun(I) -> lists:member(I, B) end, A). diff --git a/deps/rabbitmq_management_agent/src/rabbit_mgmt_data_compat.erl b/deps/rabbitmq_management_agent/src/rabbit_mgmt_data_compat.erl new file mode 100644 index 0000000000..9fd127aff5 --- /dev/null +++ b/deps/rabbitmq_management_agent/src/rabbit_mgmt_data_compat.erl @@ -0,0 +1,80 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2018-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_mgmt_data_compat). + +-export([fill_get_empty_queue_metric/1, + drop_get_empty_queue_metric/1, + fill_consumer_active_fields/1, + fill_drop_unroutable_metric/1, + drop_drop_unroutable_metric/1]). + +fill_get_empty_queue_metric(Slide) -> + exometer_slide:map( + fun + (Value) when is_tuple(Value) andalso size(Value) =:= 8 -> + Value; + (Value) when is_tuple(Value) andalso size(Value) =:= 7 -> + %% Inject a 0 for the new metric + list_to_tuple( + tuple_to_list(Value) ++ [0]); + (Value) -> + Value + end, Slide). + +drop_get_empty_queue_metric(Slide) -> + exometer_slide:map( + fun + (Value) when is_tuple(Value) andalso size(Value) =:= 8 -> + %% We want to remove the last element, which is + %% the count of basic.get on empty queues. + list_to_tuple( + lists:sublist( + tuple_to_list(Value), size(Value) - 1)); + (Value) when is_tuple(Value) andalso size(Value) =:= 7 -> + Value; + (Value) -> + Value + end, Slide). + +fill_drop_unroutable_metric(Slide) -> + exometer_slide:map( + fun + (Value) when is_tuple(Value) andalso size(Value) =:= 4 -> + Value; + (Value) when is_tuple(Value) andalso size(Value) =:= 3 -> + %% Inject a 0 + list_to_tuple( + tuple_to_list(Value) ++ [0]); + (Value) -> + Value + end, Slide). + +drop_drop_unroutable_metric(Slide) -> + exometer_slide:map( + fun + (Value) when is_tuple(Value) andalso size(Value) =:= 4 -> + %% Remove the last element. + list_to_tuple( + lists:sublist( + tuple_to_list(Value), size(Value) - 1)); + (Value) when is_tuple(Value) andalso size(Value) =:= 3 -> + Value; + (Value) -> + Value + end, Slide). + +fill_consumer_active_fields(ConsumersStats) -> + [case proplists:get_value(active, ConsumerStats) of + undefined -> + [{active, true}, + {activity_status, up} + | ConsumerStats]; + _ -> + ConsumerStats + end + || ConsumerStats <- ConsumersStats]. diff --git a/deps/rabbitmq_management_agent/src/rabbit_mgmt_db_handler.erl b/deps/rabbitmq_management_agent/src/rabbit_mgmt_db_handler.erl new file mode 100644 index 0000000000..c1e43223d7 --- /dev/null +++ b/deps/rabbitmq_management_agent/src/rabbit_mgmt_db_handler.erl @@ -0,0 +1,99 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_mgmt_db_handler). + +-include_lib("rabbit_common/include/rabbit.hrl"). + +%% Make sure our database is hooked in *before* listening on the network or +%% recovering queues (i.e. so there can't be any events fired before it starts). +-rabbit_boot_step({rabbit_mgmt_db_handler, + [{description, "management agent"}, + {mfa, {?MODULE, add_handler, []}}, + {cleanup, {gen_event, delete_handler, + [rabbit_event, ?MODULE, []]}}, + {requires, rabbit_event}, + {enables, recovery}]}). + +-behaviour(gen_event). + +-export([add_handler/0, gc/0, rates_mode/0]). + +-export([init/1, handle_call/2, handle_event/2, handle_info/2, + terminate/2, code_change/3]). + +%%---------------------------------------------------------------------------- + +add_handler() -> + ok = ensure_statistics_enabled(), + gen_event:add_handler(rabbit_event, ?MODULE, []). + +gc() -> + erlang:garbage_collect(whereis(rabbit_event)). + +rates_mode() -> + case rabbit_mgmt_agent_config:get_env(rates_mode) of + undefined -> basic; + Mode -> Mode + end. + +handle_force_fine_statistics() -> + case rabbit_mgmt_agent_config:get_env(force_fine_statistics) of + undefined -> + ok; + X -> + rabbit_log:warning( + "force_fine_statistics set to ~p; ignored.~n" + "Replaced by {rates_mode, none} in the rabbitmq_management " + "application.~n", [X]) + end. + +%%---------------------------------------------------------------------------- + +ensure_statistics_enabled() -> + ForceStats = rates_mode() =/= none, + handle_force_fine_statistics(), + {ok, StatsLevel} = application:get_env(rabbit, collect_statistics), + rabbit_log:info("Management plugin: using rates mode '~p'~n", [rates_mode()]), + case {ForceStats, StatsLevel} of + {true, fine} -> + ok; + {true, _} -> + application:set_env(rabbit, collect_statistics, fine); + {false, none} -> + application:set_env(rabbit, collect_statistics, coarse); + {_, _} -> + ok + end, + ok = rabbit:force_event_refresh(erlang:make_ref()). + +%%---------------------------------------------------------------------------- + +init([]) -> + {ok, []}. + +handle_call(_Request, State) -> + {ok, not_understood, State}. + +handle_event(#event{type = Type} = Event, State) + when Type == connection_closed; Type == channel_closed; Type == queue_deleted; + Type == exchange_deleted; Type == vhost_deleted; + Type == consumer_deleted; Type == node_node_deleted; + Type == channel_consumer_deleted -> + gen_server:cast(rabbit_mgmt_metrics_gc:name(Type), {event, Event}), + {ok, State}; +handle_event(_, State) -> + {ok, State}. + +handle_info(_Info, State) -> + {ok, State}. + +terminate(_Arg, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/deps/rabbitmq_management_agent/src/rabbit_mgmt_external_stats.erl b/deps/rabbitmq_management_agent/src/rabbit_mgmt_external_stats.erl new file mode 100644 index 0000000000..5e92d8394c --- /dev/null +++ b/deps/rabbitmq_management_agent/src/rabbit_mgmt_external_stats.erl @@ -0,0 +1,501 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_mgmt_external_stats). + +%% Transitional step until we can require Erlang/OTP 21 and +%% use the now recommended try/catch syntax for obtaining the stack trace. +-compile(nowarn_deprecated_function). + +-behaviour(gen_server). + +-export([start_link/0]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +-export([list_registry_plugins/1]). + +-import(rabbit_misc, [pget/2]). + +-include_lib("rabbit_common/include/rabbit.hrl"). + +-define(METRICS_KEYS, [fd_used, sockets_used, mem_used, disk_free, proc_used, gc_num, + gc_bytes_reclaimed, context_switches]). + +-define(PERSISTER_KEYS, [persister_stats]). + +-define(OTHER_KEYS, [name, partitions, os_pid, fd_total, sockets_total, mem_limit, + mem_alarm, disk_free_limit, disk_free_alarm, proc_total, + rates_mode, uptime, run_queue, processors, exchange_types, + auth_mechanisms, applications, contexts, log_files, + db_dir, config_files, net_ticktime, enabled_plugins, + mem_calculation_strategy, ra_open_file_metrics]). + +-define(TEN_MINUTES_AS_SECONDS, 600). + +%%-------------------------------------------------------------------- + +-record(state, { + fd_total, + fhc_stats, + node_owners, + last_ts, + interval, + error_logged_time +}). + +%%-------------------------------------------------------------------- + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +%%-------------------------------------------------------------------- + +get_used_fd(State0) -> + try + case get_used_fd(os:type(), State0) of + {State1, UsedFd} when is_number(UsedFd) -> + {State1, UsedFd}; + {State1, _Other} -> + %% Defaults to 0 if data is not available + {State1, 0} + end + catch + _:Error -> + State2 = log_fd_error("Could not infer the number of file handles used: ~p~n", [Error], State0), + {State2, 0} + end. + +get_used_fd({unix, linux}, State0) -> + case file:list_dir("/proc/" ++ os:getpid() ++ "/fd") of + {ok, Files} -> + {State0, length(Files)}; + {error, _} -> + get_used_fd({unix, generic}, State0) + end; + +get_used_fd({unix, BSD}, State0) + when BSD == openbsd; BSD == freebsd; BSD == netbsd -> + IsDigit = fun (D) -> lists:member(D, "0123456789*") end, + Output = os:cmd("fstat -p " ++ os:getpid()), + try + F = fun (Line) -> + lists:all(IsDigit, lists:nth(4, string:tokens(Line, " "))) + end, + UsedFd = length(lists:filter(F, string:tokens(Output, "\n"))), + {State0, UsedFd} + catch _:Error:Stacktrace -> + State1 = log_fd_error("Could not parse fstat output:~n~s~n~p~n", + [Output, {Error, Stacktrace}], State0), + {State1, 0} + end; + +get_used_fd({unix, _}, State0) -> + Cmd = rabbit_misc:format( + "lsof -d \"0-9999999\" -lna -p ~s || echo failed", [os:getpid()]), + Res = os:cmd(Cmd), + case string:right(Res, 7) of + "failed\n" -> + State1 = log_fd_error("Could not obtain lsof output~n", [], State0), + {State1, 0}; + _ -> + UsedFd = string:words(Res, $\n) - 1, + {State0, UsedFd} + end; + +%% handle.exe can be obtained from +%% https://technet.microsoft.com/en-us/sysinternals/bb896655.aspx + +%% Output looks like: + +%% Handle v3.42 +%% Copyright (C) 1997-2008 Mark Russinovich +%% Sysinternals - www.sysinternals.com +%% +%% Handle type summary: +%% ALPC Port : 2 +%% Desktop : 1 +%% Directory : 1 +%% Event : 108 +%% File : 25 +%% IoCompletion : 3 +%% Key : 7 +%% KeyedEvent : 1 +%% Mutant : 1 +%% Process : 3 +%% Process : 38 +%% Thread : 41 +%% Timer : 3 +%% TpWorkerFactory : 2 +%% WindowStation : 2 +%% Total handles: 238 + +%% Nthandle v4.22 - Handle viewer +%% Copyright (C) 1997-2019 Mark Russinovich +%% Sysinternals - www.sysinternals.com +%% +%% Handle type summary: +%% <Unknown type> : 1 +%% <Unknown type> : 166 +%% ALPC Port : 11 +%% Desktop : 1 +%% Directory : 2 +%% Event : 226 +%% File : 122 +%% IoCompletion : 8 +%% IRTimer : 6 +%% Key : 42 +%% Mutant : 7 +%% Process : 3 +%% Section : 2 +%% Semaphore : 43 +%% Thread : 36 +%% TpWorkerFactory : 3 +%% WaitCompletionPacket: 25 +%% WindowStation : 2 +%% Total handles: 706 + +%% Note that the "File" number appears to include network sockets too; I assume +%% that's the number we care about. Note also that if you omit "-s" you will +%% see a list of file handles *without* network sockets. If you then add "-a" +%% you will see a list of handles of various types, including network sockets +%% shown as file handles to \Device\Afd. + +get_used_fd({win32, _}, State0) -> + Handle = rabbit_misc:os_cmd( + "handle.exe /accepteula -s -p " ++ os:getpid() ++ " 2> nul"), + case Handle of + [] -> + State1 = log_fd_error("Could not find handle.exe, please install from sysinternals~n", [], State0), + {State1, 0}; + _ -> + case find_files_line(string:tokens(Handle, "\r\n")) of + unknown -> + State1 = log_fd_error("handle.exe output did not contain " + "a line beginning with ' File ', unable " + "to determine used file descriptor " + "count: ~p~n", [Handle], State0), + {State1, 0}; + UsedFd -> + {State0, UsedFd} + end + end. + +find_files_line([]) -> + unknown; +find_files_line([" File " ++ Rest | _T]) -> + [Files] = string:tokens(Rest, ": "), + list_to_integer(Files); +find_files_line([_H | T]) -> + find_files_line(T). + +-define(SAFE_CALL(Fun, NoProcFailResult), + try + Fun + catch exit:{noproc, _} -> NoProcFailResult + end). + +get_disk_free_limit() -> ?SAFE_CALL(rabbit_disk_monitor:get_disk_free_limit(), + disk_free_monitoring_disabled). + +get_disk_free() -> ?SAFE_CALL(rabbit_disk_monitor:get_disk_free(), + disk_free_monitoring_disabled). + +log_fd_error(Fmt, Args, #state{error_logged_time = undefined}=State) -> + % rabbitmq/rabbitmq-management#90 + % no errors have been logged, so log it and make a note of when + Now = erlang:monotonic_time(second), + ok = rabbit_log:error(Fmt, Args), + State#state{error_logged_time = Now}; +log_fd_error(Fmt, Args, #state{error_logged_time = Time}=State) -> + Now = erlang:monotonic_time(second), + case Now >= Time + ?TEN_MINUTES_AS_SECONDS of + true -> + % rabbitmq/rabbitmq-management#90 + % it has been longer than 10 minutes, + % re-log the error + ok = rabbit_log:error(Fmt, Args), + State#state{error_logged_time = Now}; + _ -> + % 10 minutes have not yet passed + State + end. +%%-------------------------------------------------------------------- + +infos([], Acc, State) -> + {State, lists:reverse(Acc)}; +infos([Item|T], Acc0, State0) -> + {State1, Infos} = i(Item, State0), + Acc1 = [{Item, Infos}|Acc0], + infos(T, Acc1, State1). + +i(name, State) -> + {State, node()}; +i(partitions, State) -> + {State, rabbit_node_monitor:partitions()}; +i(fd_used, State) -> + get_used_fd(State); +i(fd_total, #state{fd_total = FdTotal}=State) -> + {State, FdTotal}; +i(sockets_used, State) -> + {State, proplists:get_value(sockets_used, file_handle_cache:info([sockets_used]))}; +i(sockets_total, State) -> + {State, proplists:get_value(sockets_limit, file_handle_cache:info([sockets_limit]))}; +i(os_pid, State) -> + {State, list_to_binary(os:getpid())}; +i(mem_used, State) -> + {State, vm_memory_monitor:get_process_memory()}; +i(mem_calculation_strategy, State) -> + {State, vm_memory_monitor:get_memory_calculation_strategy()}; +i(mem_limit, State) -> + {State, vm_memory_monitor:get_memory_limit()}; +i(mem_alarm, State) -> + {State, resource_alarm_set(memory)}; +i(proc_used, State) -> + {State, erlang:system_info(process_count)}; +i(proc_total, State) -> + {State, erlang:system_info(process_limit)}; +i(run_queue, State) -> + {State, erlang:statistics(run_queue)}; +i(processors, State) -> + {State, erlang:system_info(logical_processors)}; +i(disk_free_limit, State) -> + {State, get_disk_free_limit()}; +i(disk_free, State) -> + {State, get_disk_free()}; +i(disk_free_alarm, State) -> + {State, resource_alarm_set(disk)}; +i(contexts, State) -> + {State, rabbit_web_dispatch_contexts()}; +i(uptime, State) -> + {Total, _} = erlang:statistics(wall_clock), + {State, Total}; +i(rates_mode, State) -> + {State, rabbit_mgmt_db_handler:rates_mode()}; +i(exchange_types, State) -> + {State, list_registry_plugins(exchange)}; +i(log_files, State) -> + {State, [list_to_binary(F) || F <- rabbit:log_locations()]}; +i(db_dir, State) -> + {State, list_to_binary(rabbit_mnesia:dir())}; +i(config_files, State) -> + {State, [list_to_binary(F) || F <- rabbit:config_files()]}; +i(net_ticktime, State) -> + {State, net_kernel:get_net_ticktime()}; +i(persister_stats, State) -> + {State, persister_stats(State)}; +i(enabled_plugins, State) -> + {ok, Dir} = application:get_env(rabbit, enabled_plugins_file), + {State, rabbit_plugins:read_enabled(Dir)}; +i(auth_mechanisms, State) -> + {ok, Mechanisms} = application:get_env(rabbit, auth_mechanisms), + F = fun (N) -> + lists:member(list_to_atom(binary_to_list(N)), Mechanisms) + end, + {State, list_registry_plugins(auth_mechanism, F)}; +i(applications, State) -> + {State, [format_application(A) || A <- lists:keysort(1, rabbit_misc:which_applications())]}; +i(gc_num, State) -> + {GCs, _, _} = erlang:statistics(garbage_collection), + {State, GCs}; +i(gc_bytes_reclaimed, State) -> + {_, Words, _} = erlang:statistics(garbage_collection), + {State, Words * erlang:system_info(wordsize)}; +i(context_switches, State) -> + {Sw, 0} = erlang:statistics(context_switches), + {State, Sw}; +i(ra_open_file_metrics, State) -> + {State, [{ra_log_wal, ra_metrics(ra_log_wal)}, + {ra_log_segment_writer, ra_metrics(ra_log_segment_writer)}]}. + +ra_metrics(K) -> + try + case ets:lookup(ra_open_file_metrics, whereis(K)) of + [] -> 0; + [{_, C}] -> C + end + catch + error:badarg -> + %% On startup the mgmt might start before ra does + 0 + end. + +resource_alarm_set(Source) -> + lists:member({{resource_limit, Source, node()},[]}, + rabbit_alarm:get_alarms()). + +list_registry_plugins(Type) -> + list_registry_plugins(Type, fun(_) -> true end). + +list_registry_plugins(Type, Fun) -> + [registry_plugin_enabled(set_plugin_name(Name, Module), Fun) || + {Name, Module} <- rabbit_registry:lookup_all(Type)]. + +registry_plugin_enabled(Desc, Fun) -> + Desc ++ [{enabled, Fun(proplists:get_value(name, Desc))}]. + +format_application({Application, Description, Version}) -> + [{name, Application}, + {description, list_to_binary(Description)}, + {version, list_to_binary(Version)}]. + +set_plugin_name(Name, Module) -> + [{name, list_to_binary(atom_to_list(Name))} | + proplists:delete(name, Module:description())]. + +persister_stats(#state{fhc_stats = FHC}) -> + [{flatten_key(K), V} || {{_Op, _Type} = K, V} <- FHC]. + +flatten_key({A, B}) -> + list_to_atom(atom_to_list(A) ++ "_" ++ atom_to_list(B)). + +cluster_links() -> + {ok, Items} = net_kernel:nodes_info(), + [Link || Item <- Items, + Link <- [format_nodes_info(Item)], Link =/= undefined]. + +format_nodes_info({Node, Info}) -> + Owner = proplists:get_value(owner, Info), + case catch process_info(Owner, links) of + {links, Links} -> + case [Link || Link <- Links, is_port(Link)] of + [Port] -> + {Node, Owner, format_nodes_info1(Port)}; + _ -> + undefined + end; + _ -> + undefined + end. + +format_nodes_info1(Port) -> + case {rabbit_net:socket_ends(Port, inbound), + rabbit_net:getstat(Port, [recv_oct, send_oct])} of + {{ok, {PeerAddr, PeerPort, SockAddr, SockPort}}, {ok, Stats}} -> + [{peer_addr, maybe_ntoab(PeerAddr)}, + {peer_port, PeerPort}, + {sock_addr, maybe_ntoab(SockAddr)}, + {sock_port, SockPort}, + {recv_bytes, pget(recv_oct, Stats)}, + {send_bytes, pget(send_oct, Stats)}]; + _ -> + [] + end. + +maybe_ntoab(A) when is_tuple(A) -> list_to_binary(rabbit_misc:ntoab(A)); +maybe_ntoab(H) -> H. + +%%-------------------------------------------------------------------- + +%% This is slightly icky in that we introduce knowledge of +%% rabbit_web_dispatch, which is not a dependency. But the last thing I +%% want to do is create a rabbitmq_mochiweb_management_agent plugin. +rabbit_web_dispatch_contexts() -> + [format_context(C) || C <- rabbit_web_dispatch_registry_list_all()]. + +%% For similar reasons we don't declare a dependency on +%% rabbitmq_mochiweb - so at startup there's no guarantee it will be +%% running. So we have to catch this noproc. +rabbit_web_dispatch_registry_list_all() -> + case code:is_loaded(rabbit_web_dispatch_registry) of + false -> []; + _ -> try + M = rabbit_web_dispatch_registry, %% Fool xref + M:list_all() + catch exit:{noproc, _} -> + [] + end + end. + +format_context({Path, Description, Rest}) -> + [{description, list_to_binary(Description)}, + {path, list_to_binary("/" ++ Path)} | + format_mochiweb_option_list(Rest)]. + +format_mochiweb_option_list(C) -> + [{K, format_mochiweb_option(K, V)} || {K, V} <- C]. + +format_mochiweb_option(ssl_opts, V) -> + format_mochiweb_option_list(V); +format_mochiweb_option(_K, V) -> + case io_lib:printable_list(V) of + true -> list_to_binary(V); + false -> list_to_binary(rabbit_misc:format("~w", [V])) + end. + +%%-------------------------------------------------------------------- + +init([]) -> + {ok, Interval} = application:get_env(rabbit, collect_statistics_interval), + State = #state{fd_total = file_handle_cache:ulimit(), + fhc_stats = get_fhc_stats(), + node_owners = sets:new(), + interval = Interval}, + %% We can update stats straight away as they need to be available + %% when the mgmt plugin starts a collector + {ok, emit_update(State)}. + +handle_call(_Req, _From, State) -> + {reply, unknown_request, State}. + +handle_cast(_C, State) -> + {noreply, State}. + +handle_info(emit_update, State) -> + {noreply, emit_update(State)}; + +handle_info(_I, State) -> + {noreply, State}. + +terminate(_, _) -> ok. + +code_change(_, State, _) -> {ok, State}. + +%%-------------------------------------------------------------------- + +emit_update(State0) -> + State1 = update_state(State0), + {State2, MStats} = infos(?METRICS_KEYS, [], State1), + {State3, PStats} = infos(?PERSISTER_KEYS, [], State2), + {State4, OStats} = infos(?OTHER_KEYS, [], State3), + [{persister_stats, PStats0}] = PStats, + [{name, _Name} | OStats0] = OStats, + rabbit_core_metrics:node_stats(persister_metrics, PStats0), + rabbit_core_metrics:node_stats(coarse_metrics, MStats), + rabbit_core_metrics:node_stats(node_metrics, OStats0), + rabbit_event:notify(node_stats, PStats ++ MStats ++ OStats), + erlang:send_after(State4#state.interval, self(), emit_update), + emit_node_node_stats(State4). + +emit_node_node_stats(State = #state{node_owners = Owners}) -> + Links = cluster_links(), + NewOwners = sets:from_list([{Node, Owner} || {Node, Owner, _} <- Links]), + Dead = sets:to_list(sets:subtract(Owners, NewOwners)), + [rabbit_event:notify( + node_node_deleted, [{route, Route}]) || {Node, _Owner} <- Dead, + Route <- [{node(), Node}, + {Node, node()}]], + [begin + rabbit_core_metrics:node_node_stats({node(), Node}, Stats), + rabbit_event:notify( + node_node_stats, [{route, {node(), Node}} | Stats]) + end || {Node, _Owner, Stats} <- Links], + State#state{node_owners = NewOwners}. + +update_state(State0) -> + %% Store raw data, the average operation time is calculated during querying + %% from the accumulated total + FHC = get_fhc_stats(), + State0#state{fhc_stats = FHC}. + +get_fhc_stats() -> + dict:to_list(dict:merge(fun(_, V1, V2) -> V1 + V2 end, + dict:from_list(file_handle_cache_stats:get()), + dict:from_list(get_ra_io_metrics()))). + +get_ra_io_metrics() -> + lists:sort(ets:tab2list(ra_io_metrics)). diff --git a/deps/rabbitmq_management_agent/src/rabbit_mgmt_ff.erl b/deps/rabbitmq_management_agent/src/rabbit_mgmt_ff.erl new file mode 100644 index 0000000000..c8173c1244 --- /dev/null +++ b/deps/rabbitmq_management_agent/src/rabbit_mgmt_ff.erl @@ -0,0 +1,20 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2018-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_mgmt_ff). + +-rabbit_feature_flag( + {empty_basic_get_metric, + #{desc => "Count AMQP `basic.get` on empty queues in stats", + stability => stable + }}). + +-rabbit_feature_flag( + {drop_unroutable_metric, + #{desc => "Count unroutable publishes to be dropped in stats", + stability => stable + }}). diff --git a/deps/rabbitmq_management_agent/src/rabbit_mgmt_format.erl b/deps/rabbitmq_management_agent/src/rabbit_mgmt_format.erl new file mode 100644 index 0000000000..4c9e8c189f --- /dev/null +++ b/deps/rabbitmq_management_agent/src/rabbit_mgmt_format.erl @@ -0,0 +1,559 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_mgmt_format). + +-export([format/2, ip/1, ipb/1, amqp_table/1, tuple/1]). +-export([parameter/1, now_to_str/1, now_to_str_ms/1, strip_pids/1]). +-export([protocol/1, resource/1, queue/1, queue_state/1, queue_info/1]). +-export([exchange/1, user/1, internal_user/1, binding/1, url/2]). +-export([pack_binding_props/2, tokenise/1]). +-export([to_amqp_table/1, listener/1, web_context/1, properties/1, basic_properties/1]). +-export([record/2, to_basic_properties/1]). +-export([addr/1, port/1]). +-export([format_nulls/1, escape_html_tags/1]). +-export([print/2, print/1]). + +-export([format_queue_stats/1, format_channel_stats/1, + format_consumer_arguments/1, + format_connection_created/1, + format_accept_content/1, format_args/1]). + +-export([strip_queue_pids/1]). + +-export([clean_consumer_details/1, clean_channel_details/1]). + +-export([args_hash/1]). + +-import(rabbit_misc, [pget/2, pget/3, pset/3]). + +-include_lib("rabbit_common/include/rabbit.hrl"). +-include_lib("rabbit_common/include/rabbit_framing.hrl"). +-include_lib("rabbit/include/amqqueue.hrl"). + +%%-------------------------------------------------------------------- + +format(Stats, {[], _}) -> + [Stat || {_Name, Value} = Stat <- Stats, Value =/= unknown]; +format(Stats, {Fs, true}) -> + [Fs(Stat) || {_Name, Value} = Stat <- Stats, Value =/= unknown]; +format(Stats, {Fs, false}) -> + lists:concat([Fs(Stat) || {_Name, Value} = Stat <- Stats, + Value =/= unknown]). + +format_queue_stats({reductions, _}) -> + []; +format_queue_stats({exclusive_consumer_pid, _}) -> + []; +format_queue_stats({single_active_consumer_pid, _}) -> + []; +format_queue_stats({slave_pids, ''}) -> + []; +format_queue_stats({slave_pids, Pids}) -> + [{slave_nodes, [node(Pid) || Pid <- Pids]}]; +format_queue_stats({leader, Leader}) -> + [{node, Leader}]; +format_queue_stats({synchronised_slave_pids, ''}) -> + []; +format_queue_stats({effective_policy_definition, []}) -> + [{effective_policy_definition, #{}}]; +format_queue_stats({synchronised_slave_pids, Pids}) -> + [{synchronised_slave_nodes, [node(Pid) || Pid <- Pids]}]; +format_queue_stats({backing_queue_status, Value}) -> + [{backing_queue_status, properties(Value)}]; +format_queue_stats({idle_since, Value}) -> + [{idle_since, now_to_str(Value)}]; +format_queue_stats({state, Value}) -> + queue_state(Value); +format_queue_stats({disk_reads, _}) -> + []; +format_queue_stats({disk_writes, _}) -> + []; +format_queue_stats(Stat) -> + [Stat]. + +format_channel_stats([{idle_since, Value} | Rest]) -> + [{idle_since, now_to_str(Value)} | Rest]; +format_channel_stats(Stats) -> + Stats. + +%% Conerts an HTTP API request payload value +%% to AMQP 0-9-1 arguments table +format_args({arguments, []}) -> + {arguments, []}; +format_args({arguments, Value}) -> + {arguments, to_amqp_table(Value)}; +format_args(Stat) -> + Stat. + +format_connection_created({host, Value}) -> + {host, addr(Value)}; +format_connection_created({peer_host, Value}) -> + {peer_host, addr(Value)}; +format_connection_created({port, Value}) -> + {port, port(Value)}; +format_connection_created({peer_port, Value}) -> + {peer_port, port(Value)}; +format_connection_created({protocol, Value}) -> + {protocol, protocol(Value)}; +format_connection_created({client_properties, Value}) -> + {client_properties, amqp_table(Value)}; +format_connection_created(Stat) -> + Stat. + +format_exchange_and_queue({policy, Value}) -> + policy(Value); +format_exchange_and_queue({arguments, Value}) -> + [{arguments, amqp_table(Value)}]; +format_exchange_and_queue({name, Value}) -> + resource(Value); +format_exchange_and_queue(Stat) -> + [Stat]. + +format_binding({source, Value}) -> + resource(source, Value); +format_binding({arguments, Value}) -> + [{arguments, amqp_table(Value)}]; +format_binding(Stat) -> + [Stat]. + +format_basic_properties({headers, Value}) -> + {headers, amqp_table(Value)}; +format_basic_properties(Stat) -> + Stat. + +format_accept_content({durable, Value}) -> + {durable, parse_bool(Value)}; +format_accept_content({auto_delete, Value}) -> + {auto_delete, parse_bool(Value)}; +format_accept_content({internal, Value}) -> + {internal, parse_bool(Value)}; +format_accept_content(Stat) -> + Stat. + +print(Fmt, Val) when is_list(Val) -> + list_to_binary(lists:flatten(io_lib:format(Fmt, Val))); +print(Fmt, Val) -> + print(Fmt, [Val]). + +print(Val) when is_list(Val) -> + list_to_binary(lists:flatten(Val)); +print(Val) -> + Val. + +%% TODO - can we remove all these "unknown" cases? Coverage never hits them. + +ip(unknown) -> unknown; +ip(IP) -> list_to_binary(rabbit_misc:ntoa(IP)). + +ipb(unknown) -> unknown; +ipb(IP) -> list_to_binary(rabbit_misc:ntoab(IP)). + +addr(S) when is_list(S); is_atom(S); is_binary(S) -> print("~s", S); +addr(Addr) when is_tuple(Addr) -> ip(Addr). + +port(Port) when is_number(Port) -> Port; +port(Port) -> print("~w", Port). + +properties(unknown) -> unknown; +properties(Table) -> maps:from_list([{Name, tuple(Value)} || + {Name, Value} <- Table]). + +amqp_table(Value) -> rabbit_misc:amqp_table(Value). + +parameter(P) -> pset(value, pget(value, P), P). + +tuple(unknown) -> unknown; +tuple(Tuple) when is_tuple(Tuple) -> [tuple(E) || E <- tuple_to_list(Tuple)]; +tuple(Term) -> Term. + +protocol(unknown) -> + unknown; +protocol(Version = {_Major, _Minor, _Revision}) -> + protocol({'AMQP', Version}); +protocol({Family, Version}) -> + print("~s ~s", [Family, protocol_version(Version)]). + +protocol_version(Arbitrary) + when is_list(Arbitrary) -> Arbitrary; +protocol_version({Major, Minor}) -> io_lib:format("~B-~B", [Major, Minor]); +protocol_version({Major, Minor, 0}) -> protocol_version({Major, Minor}); +protocol_version({Major, Minor, Revision}) -> io_lib:format("~B-~B-~B", + [Major, Minor, Revision]). + +now_to_str(unknown) -> + unknown; +now_to_str(MilliSeconds) -> + BaseDate = calendar:datetime_to_gregorian_seconds({{1970, 1, 1}, + {0, 0, 0}}), + Seconds = BaseDate + (MilliSeconds div 1000), + {{Y, M, D}, {H, Min, S}} = calendar:gregorian_seconds_to_datetime(Seconds), + print("~w-~2.2.0w-~2.2.0w ~w:~2.2.0w:~2.2.0w", [Y, M, D, H, Min, S]). + +now_to_str_ms(unknown) -> + unknown; +now_to_str_ms(MilliSeconds) -> + print("~s:~3.3.0w", [now_to_str(MilliSeconds), MilliSeconds rem 1000]). + +resource(unknown) -> unknown; +resource(Res) -> resource(name, Res). + +resource(_, unknown) -> + unknown; +resource(NameAs, #resource{name = Name, virtual_host = VHost}) -> + [{NameAs, Name}, {vhost, VHost}]. + +policy('') -> []; +policy(Policy) -> [{policy, Policy}]. + +internal_user(User) -> + [{name, internal_user:get_username(User)}, + {password_hash, base64:encode(internal_user:get_password_hash(User))}, + {hashing_algorithm, rabbit_auth_backend_internal:hashing_module_for_user( + User)}, + {tags, tags(internal_user:get_tags(User))}, + {limits, internal_user:get_limits(User)}]. + +user(User) -> + [{name, User#user.username}, + {tags, tags(User#user.tags)}]. + +tags(Tags) -> + list_to_binary(string:join([atom_to_list(T) || T <- Tags], ",")). + +listener(#listener{node = Node, protocol = Protocol, + ip_address = IPAddress, port = Port, opts=Opts}) -> + [{node, Node}, + {protocol, Protocol}, + {ip_address, ip(IPAddress)}, + {port, Port}, + {socket_opts, format_socket_opts(Opts)}]. + +web_context(Props0) -> + SslOpts = pget(ssl_opts, Props0, []), + Props = proplists:delete(ssl_opts, Props0), + [{ssl_opts, format_socket_opts(SslOpts)} | Props]. + +format_socket_opts(Opts) -> + format_socket_opts(Opts, []). + +format_socket_opts([], Acc) -> + lists:reverse(Acc); +%% for HTTP API listeners this will be included into +%% socket_opts +format_socket_opts([{ssl_opts, Value} | Tail], Acc) -> + format_socket_opts(Tail, [{ssl_opts, format_socket_opts(Value)} | Acc]); +%% exclude options that have values that are nested +%% data structures or may include functions. They are fairly +%% obscure and not worth reporting via HTTP API. +format_socket_opts([{verify_fun, _Value} | Tail], Acc) -> + format_socket_opts(Tail, Acc); +format_socket_opts([{crl_cache, _Value} | Tail], Acc) -> + format_socket_opts(Tail, Acc); +format_socket_opts([{partial_chain, _Value} | Tail], Acc) -> + format_socket_opts(Tail, Acc); +format_socket_opts([{user_lookup_fun, _Value} | Tail], Acc) -> + format_socket_opts(Tail, Acc); +format_socket_opts([{sni_fun, _Value} | Tail], Acc) -> + format_socket_opts(Tail, Acc); +%% we do not report SNI host details in the UI, +%% so skip this option and avoid some recursive formatting +%% complexity +format_socket_opts([{sni_hosts, _Value} | Tail], Acc) -> + format_socket_opts(Tail, Acc); +format_socket_opts([{reuse_session, _Value} | Tail], Acc) -> + format_socket_opts(Tail, Acc); +%% we do not want to report configured cipher suites, even +%% though formatting them is straightforward +format_socket_opts([{ciphers, _Value} | Tail], Acc) -> + format_socket_opts(Tail, Acc); +%% single atom options, e.g. `binary` +format_socket_opts([Head | Tail], Acc) when is_atom(Head) -> + format_socket_opts(Tail, [{Head, true} | Acc]); +%% verify_fun value is a tuple that includes a function +format_socket_opts([_Head = {verify_fun, _Value} | Tail], Acc) -> + format_socket_opts(Tail, Acc); +format_socket_opts([Head = {Name, Value} | Tail], Acc) when is_list(Value) -> + case io_lib:printable_unicode_list(Value) of + true -> format_socket_opts(Tail, [{Name, unicode:characters_to_binary(Value)} | Acc]); + false -> format_socket_opts(Tail, [Head | Acc]) + end; +format_socket_opts([{Name, Value} | Tail], Acc) when is_tuple(Value) -> + format_socket_opts(Tail, [{Name, tuple_to_list(Value)} | Acc]); +%% exclude functions from JSON encoding +format_socket_opts([_Head = {_Name, Value} | Tail], Acc) when is_function(Value) -> + format_socket_opts(Tail, Acc); +format_socket_opts([Head | Tail], Acc) -> + format_socket_opts(Tail, [Head | Acc]). + +pack_binding_props(<<"">>, []) -> + <<"~">>; +pack_binding_props(Key, []) -> + list_to_binary(quote_binding(Key)); +pack_binding_props(Key, Args) -> + ArgsEnc = args_hash(Args), + list_to_binary(quote_binding(Key) ++ "~" ++ quote_binding(ArgsEnc)). + +quote_binding(Name) -> + re:replace(rabbit_http_util:quote_plus(Name), "~", "%7E", [global]). + +%% Unfortunately string:tokens("foo~~bar", "~"). -> ["foo","bar"], we lose +%% the fact that there's a double ~. +tokenise("") -> + []; +tokenise(Str) -> + Count = string:cspan(Str, "~"), + case length(Str) of + Count -> [Str]; + _ -> [string:sub_string(Str, 1, Count) | + tokenise(string:sub_string(Str, Count + 2))] + end. + +to_amqp_table(V) -> rabbit_misc:to_amqp_table(V). + +url(Fmt, Vals) -> + print(Fmt, [rabbit_http_util:quote_plus(V) || V <- Vals]). + +exchange(X) -> + format(X, {fun format_exchange_and_queue/1, false}). + +%% We get queues using rabbit_amqqueue:list/1 rather than :info_all/1 since +%% the latter wakes up each queue. Therefore we have a record rather than a +%% proplist to deal with. +queue(Q) when ?is_amqqueue(Q) -> + Name = amqqueue:get_name(Q), + Durable = amqqueue:is_durable(Q), + AutoDelete = amqqueue:is_auto_delete(Q), + ExclusiveOwner = amqqueue:get_exclusive_owner(Q), + Arguments = amqqueue:get_arguments(Q), + Pid = amqqueue:get_pid(Q), + State = amqqueue:get_state(Q), + %% TODO: in the future queue types should be registered with their + %% full and short names and this hard-coded translation should not be + %% necessary + Type = case amqqueue:get_type(Q) of + rabbit_classic_queue -> classic; + rabbit_quorum_queue -> quorum; + T -> T + end, + format( + [{name, Name}, + {durable, Durable}, + {auto_delete, AutoDelete}, + {exclusive, is_pid(ExclusiveOwner)}, + {owner_pid, ExclusiveOwner}, + {arguments, Arguments}, + {pid, Pid}, + {type, Type}, + {state, State}] ++ rabbit_amqqueue:format(Q), + {fun format_exchange_and_queue/1, false}). + +queue_info(List) -> + format(List, {fun format_exchange_and_queue/1, false}). + +queue_state({syncing, Msgs}) -> [{state, syncing}, + {sync_messages, Msgs}]; +queue_state({terminated_by, Name}) -> + [{state, terminated}, + {terminated_by, Name}]; +queue_state(Status) -> [{state, Status}]. + +%% We get bindings using rabbit_binding:list_*/1 rather than :info_all/1 since +%% there are no per-exchange / queue / etc variants for the latter. Therefore +%% we have a record rather than a proplist to deal with. +binding(#binding{source = S, + key = Key, + destination = D, + args = Args}) -> + format( + [{source, S}, + {destination, D#resource.name}, + {destination_type, D#resource.kind}, + {routing_key, Key}, + {arguments, Args}, + {properties_key, pack_binding_props(Key, Args)}], + {fun format_binding/1, false}). + +basic_properties(Props = #'P_basic'{}) -> + Res = record(Props, record_info(fields, 'P_basic')), + format(Res, {fun format_basic_properties/1, true}). + +record(Record, Fields) -> + {Res, _Ix} = lists:foldl(fun (K, {L, Ix}) -> + {case element(Ix, Record) of + undefined -> L; + V -> [{K, V}|L] + end, Ix + 1} + end, {[], 2}, Fields), + Res. + +to_basic_properties(Props) when is_map(Props) -> + E = fun err/2, + Fmt = fun (headers, H) -> to_amqp_table(H); + (delivery_mode, V) when is_integer(V) -> V; + (delivery_mode, _V) -> E(not_int,delivery_mode); + (priority, V) when is_integer(V) -> V; + (priority, _V) -> E(not_int, priority); + (timestamp, V) when is_integer(V) -> V; + (timestamp, _V) -> E(not_int, timestamp); + (_, V) when is_binary(V) -> V; + (K, _V) -> E(not_string, K) + end, + {Res, _Ix} = lists:foldl( + fun (K, {P, Ix}) -> + {case maps:get(a2b(K), Props, undefined) of + undefined -> P; + V -> setelement(Ix, P, Fmt(K, V)) + end, Ix + 1} + end, {#'P_basic'{}, 2}, + record_info(fields, 'P_basic')), + Res. + +-spec err(term(), term()) -> no_return(). +err(A, B) -> + throw({error, {A, B}}). + +a2b(A) -> + list_to_binary(atom_to_list(A)). + +strip_queue_pids(Item) -> + strip_queue_pids(Item, []). + +strip_queue_pids([{_, unknown} | T], Acc) -> + strip_queue_pids(T, Acc); +strip_queue_pids([{pid, Pid} | T], Acc0) when is_pid(Pid) -> + Acc = case proplists:is_defined(node, Acc0) of + false -> [{node, node(Pid)} | Acc0]; + true -> Acc0 + end, + strip_queue_pids(T, Acc); +strip_queue_pids([{pid, _} | T], Acc) -> + strip_queue_pids(T, Acc); +strip_queue_pids([{owner_pid, _} | T], Acc) -> + strip_queue_pids(T, Acc); +strip_queue_pids([Any | T], Acc) -> + strip_queue_pids(T, [Any | Acc]); +strip_queue_pids([], Acc) -> + Acc. + +%% Items can be connections, channels, consumers or queues, hence remove takes +%% various items. +strip_pids(Item = [T | _]) when is_tuple(T) -> + lists:usort(strip_pids(Item, [])); + +strip_pids(Items) -> [lists:usort(strip_pids(I)) || I <- Items]. + +strip_pids([{_, unknown} | T], Acc) -> + strip_pids(T, Acc); +strip_pids([{pid, Pid} | T], Acc) when is_pid(Pid) -> + strip_pids(T, [{node, node(Pid)} | Acc]); +strip_pids([{pid, _} | T], Acc) -> + strip_pids(T, Acc); +strip_pids([{connection, _} | T], Acc) -> + strip_pids(T, Acc); +strip_pids([{owner_pid, _} | T], Acc) -> + strip_pids(T, Acc); +strip_pids([{channel, _} | T], Acc) -> + strip_pids(T, Acc); +strip_pids([{channel_pid, _} | T], Acc) -> + strip_pids(T, Acc); +strip_pids([{exclusive_consumer_pid, _} | T], Acc) -> + strip_pids(T, Acc); +strip_pids([{slave_pids, ''} | T], Acc) -> + strip_pids(T, Acc); +strip_pids([{slave_pids, Pids} | T], Acc) -> + strip_pids(T, [{slave_nodes, [node(Pid) || Pid <- Pids]} | Acc]); +strip_pids([{synchronised_slave_pids, ''} | T], Acc) -> + strip_pids(T, Acc); +strip_pids([{synchronised_slave_pids, Pids} | T], Acc) -> + strip_pids(T, [{synchronised_slave_nodes, [node(Pid) || Pid <- Pids]} | Acc]); +strip_pids([{K, [P|_] = Nested} | T], Acc) when is_tuple(P) -> % recurse + strip_pids(T, [{K, strip_pids(Nested)} | Acc]); +strip_pids([{K, [L|_] = Nested} | T], Acc) when is_list(L) -> % recurse + strip_pids(T, [{K, strip_pids(Nested)} | Acc]); +strip_pids([Any | T], Acc) -> + strip_pids(T, [Any | Acc]); +strip_pids([], Acc) -> + Acc. + +%% Format for JSON replies. Transforms '' into null +format_nulls(Items) when is_list(Items) -> + [format_null_item(Pair) || Pair <- Items]; +format_nulls(Item) -> + format_null_item(Item). + +format_null_item({Key, ''}) -> + {Key, null}; +format_null_item({Key, Value}) when is_list(Value) -> + {Key, format_nulls(Value)}; +format_null_item({Key, Value}) -> + {Key, Value}; +format_null_item([{_K, _V} | _T] = L) -> + format_nulls(L); +format_null_item(Value) -> + Value. + + +-spec escape_html_tags(string()) -> binary(). + +escape_html_tags(S) -> + escape_html_tags(rabbit_data_coercion:to_list(S), []). + + +-spec escape_html_tags(string(), string()) -> binary(). + +escape_html_tags([], Acc) -> + rabbit_data_coercion:to_binary(lists:reverse(Acc)); +escape_html_tags("<" ++ Rest, Acc) -> + escape_html_tags(Rest, lists:reverse("<", Acc)); +escape_html_tags(">" ++ Rest, Acc) -> + escape_html_tags(Rest, lists:reverse(">", Acc)); +escape_html_tags("&" ++ Rest, Acc) -> + escape_html_tags(Rest, lists:reverse("&", Acc)); +escape_html_tags([C | Rest], Acc) -> + escape_html_tags(Rest, [C | Acc]). + + +-spec clean_consumer_details(proplists:proplist()) -> proplists:proplist(). +clean_consumer_details(Obj) -> + case pget(consumer_details, Obj) of + undefined -> Obj; + Cds -> + Cons = [format_consumer_arguments(clean_channel_details(Con)) || Con <- Cds], + pset(consumer_details, Cons, Obj) + end. + +-spec clean_channel_details(proplists:proplist()) -> proplists:proplist(). +clean_channel_details(Obj) -> + Obj0 = lists:keydelete(channel_pid, 1, Obj), + case pget(channel_details, Obj0) of + undefined -> Obj0; + Chd -> + pset(channel_details, + lists:keydelete(pid, 1, Chd), + Obj0) + end. + +-spec format_consumer_arguments(proplists:proplist()) -> proplists:proplist(). +format_consumer_arguments(Obj) -> + case pget(arguments, Obj) of + undefined -> Obj; + #{} -> Obj; + [] -> pset(arguments, #{}, Obj); + Args -> pset(arguments, amqp_table(Args), Obj) + end. + + +parse_bool(<<"true">>) -> true; +parse_bool(<<"false">>) -> false; +parse_bool(true) -> true; +parse_bool(false) -> false; +parse_bool(undefined) -> undefined; +parse_bool(V) -> throw({error, {not_boolean, V}}). + +args_hash(Args) -> + list_to_binary(rabbit_misc:base64url(<<(erlang:phash2(Args, 1 bsl 32)):32>>)). diff --git a/deps/rabbitmq_management_agent/src/rabbit_mgmt_gc.erl b/deps/rabbitmq_management_agent/src/rabbit_mgmt_gc.erl new file mode 100644 index 0000000000..99ddc89a8e --- /dev/null +++ b/deps/rabbitmq_management_agent/src/rabbit_mgmt_gc.erl @@ -0,0 +1,230 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% +-module(rabbit_mgmt_gc). + +-include_lib("rabbit_common/include/rabbit.hrl"). + +-record(state, {timer, + interval + }). + +-spec start_link() -> rabbit_types:ok_pid_or_error(). + +-export([start_link/0]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +init(_) -> + Interval = rabbit_misc:get_env(rabbitmq_management_agent, metrics_gc_interval, 120000), + {ok, start_timer(#state{interval = Interval})}. + +handle_call(test, _From, State) -> + {reply, ok, State}. + +handle_cast(_Request, State) -> + {noreply, State}. + +handle_info(start_gc, State) -> + gc_connections(), + gc_vhosts(), + gc_channels(), + gc_queues(), + gc_exchanges(), + gc_nodes(), + {noreply, start_timer(State)}. + +terminate(_Reason, #state{timer = TRef}) -> + _ = erlang:cancel_timer(TRef), + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +start_timer(#state{interval = Interval} = St) -> + TRef = erlang:send_after(Interval, self(), start_gc), + St#state{timer = TRef}. + +gc_connections() -> + gc_process(connection_stats_coarse_conn_stats), + gc_process(connection_created_stats), + gc_process(connection_stats). + +gc_vhosts() -> + VHosts = rabbit_vhost:list(), + GbSet = gb_sets:from_list(VHosts), + gc_entity(vhost_stats_coarse_conn_stats, GbSet), + gc_entity(vhost_stats_fine_stats, GbSet), + gc_entity(vhost_msg_stats, GbSet), + gc_entity(vhost_msg_rates, GbSet), + gc_entity(vhost_stats_deliver_stats, GbSet). + +gc_channels() -> + gc_process(channel_created_stats), + gc_process(channel_stats), + gc_process(channel_stats_fine_stats), + gc_process(channel_process_stats), + gc_process(channel_stats_deliver_stats), + ok. + +gc_queues() -> + Queues = rabbit_amqqueue:list_names(), + GbSet = gb_sets:from_list(Queues), + LocalQueues = rabbit_amqqueue:list_local_names(), + LocalGbSet = gb_sets:from_list(LocalQueues), + gc_entity(queue_stats_publish, GbSet), + gc_entity(queue_stats, LocalGbSet), + gc_entity(queue_msg_stats, LocalGbSet), + gc_entity(queue_process_stats, LocalGbSet), + gc_entity(queue_msg_rates, LocalGbSet), + gc_entity(queue_stats_deliver_stats, GbSet), + gc_process_and_entity(channel_queue_stats_deliver_stats_queue_index, GbSet), + gc_process_and_entity(consumer_stats_queue_index, GbSet), + gc_process_and_entity(consumer_stats_channel_index, GbSet), + gc_process_and_entity(consumer_stats, GbSet), + gc_process_and_entity(channel_exchange_stats_fine_stats_channel_index, GbSet), + gc_process_and_entity(channel_queue_stats_deliver_stats, GbSet), + gc_process_and_entity(channel_queue_stats_deliver_stats_channel_index, GbSet), + ExchangeGbSet = gb_sets:from_list(rabbit_exchange:list_names()), + gc_entities(queue_exchange_stats_publish, GbSet, ExchangeGbSet), + gc_entities(queue_exchange_stats_publish_queue_index, GbSet, ExchangeGbSet), + gc_entities(queue_exchange_stats_publish_exchange_index, GbSet, ExchangeGbSet). + +gc_exchanges() -> + Exchanges = rabbit_exchange:list_names(), + GbSet = gb_sets:from_list(Exchanges), + gc_entity(exchange_stats_publish_in, GbSet), + gc_entity(exchange_stats_publish_out, GbSet), + gc_entity(channel_exchange_stats_fine_stats_exchange_index, GbSet), + gc_process_and_entity(channel_exchange_stats_fine_stats, GbSet). + +gc_nodes() -> + Nodes = rabbit_mnesia:cluster_nodes(all), + GbSet = gb_sets:from_list(Nodes), + gc_entity(node_stats, GbSet), + gc_entity(node_coarse_stats, GbSet), + gc_entity(node_persister_stats, GbSet), + gc_entity(node_node_coarse_stats_node_index, GbSet), + gc_entity(node_node_stats, GbSet), + gc_entity(node_node_coarse_stats, GbSet). + +gc_process(Table) -> + ets:foldl(fun({{Pid, _} = Key, _}, none) -> + gc_process(Pid, Table, Key); + ({Pid = Key, _}, none) -> + gc_process(Pid, Table, Key); + ({Pid = Key, _, _}, none) -> + gc_process(Pid, Table, Key); + ({{Pid, _} = Key, _, _, _, _}, none) -> + gc_process(Pid, Table, Key) + end, none, Table). + +gc_process(Pid, Table, Key) -> + case rabbit_misc:is_process_alive(Pid) of + true -> + none; + false -> + ets:delete(Table, Key), + none + end. + +gc_entity(Table, GbSet) -> + ets:foldl(fun({{_, Id} = Key, _}, none) when Table == node_node_stats -> + gc_entity(Id, Table, Key, GbSet); + ({{{_, Id}, _} = Key, _}, none) when Table == node_node_coarse_stats -> + gc_entity(Id, Table, Key, GbSet); + ({{Id, _} = Key, _}, none) -> + gc_entity(Id, Table, Key, GbSet); + ({Id = Key, _}, none) -> + gc_entity(Id, Table, Key, GbSet); + ({{Id, _} = Key, _}, none) -> + gc_entity(Id, Table, Key, GbSet) + end, none, Table). + +gc_entity(Id, Table, Key, GbSet) -> + case gb_sets:is_member(Id, GbSet) of + true -> + none; + false -> + ets:delete(Table, Key), + none + end. + +gc_process_and_entity(Table, GbSet) -> + ets:foldl(fun({{Id, Pid, _} = Key, _}, none) when Table == consumer_stats -> + gc_process_and_entity(Id, Pid, Table, Key, GbSet); + ({Id = Key, {_, Pid, _}} = Object, none) + when Table == consumer_stats_queue_index -> + gc_object(Pid, Table, Object), + gc_entity(Id, Table, Key, GbSet); + ({Pid = Key, {Id, _, _}} = Object, none) + when Table == consumer_stats_channel_index -> + gc_object(Id, Table, Object, GbSet), + gc_process(Pid, Table, Key); + ({Id = Key, {{Pid, _}, _}} = Object, none) + when Table == channel_exchange_stats_fine_stats_exchange_index; + Table == channel_queue_stats_deliver_stats_queue_index -> + gc_object(Pid, Table, Object), + gc_entity(Id, Table, Key, GbSet); + ({Pid = Key, {{_, Id}, _}} = Object, none) + when Table == channel_exchange_stats_fine_stats_channel_index; + Table == channel_queue_stats_deliver_stats_channel_index -> + gc_object(Id, Table, Object, GbSet), + gc_process(Pid, Table, Key); + ({{{Pid, Id}, _} = Key, _}, none) + when Table == channel_queue_stats_deliver_stats; + Table == channel_exchange_stats_fine_stats -> + gc_process_and_entity(Id, Pid, Table, Key, GbSet); + ({{{Pid, Id}, _} = Key, _, _, _, _, _, _, _, _}, none) -> + gc_process_and_entity(Id, Pid, Table, Key, GbSet); + ({{{Pid, Id}, _} = Key, _, _, _, _}, none) -> + gc_process_and_entity(Id, Pid, Table, Key, GbSet) + end, none, Table). + +gc_process_and_entity(Id, Pid, Table, Key, GbSet) -> + case rabbit_misc:is_process_alive(Pid) andalso gb_sets:is_member(Id, GbSet) of + true -> + none; + false -> + ets:delete(Table, Key), + none + end. + +gc_object(Pid, Table, Object) -> + case rabbit_misc:is_process_alive(Pid) of + true -> + none; + false -> + ets:delete_object(Table, Object), + none + end. + +gc_object(Id, Table, Object, GbSet) -> + case gb_sets:is_member(Id, GbSet) of + true -> + none; + false -> + ets:delete_object(Table, Object), + none + end. + +gc_entities(Table, QueueGbSet, ExchangeGbSet) -> + ets:foldl(fun({{{Q, X}, _} = Key, _}, none) + when Table == queue_exchange_stats_publish -> + gc_entity(Q, Table, Key, QueueGbSet), + gc_entity(X, Table, Key, ExchangeGbSet); + ({Q, {{_, X}, _}} = Object, none) + when Table == queue_exchange_stats_publish_queue_index -> + gc_object(X, Table, Object, ExchangeGbSet), + gc_entity(Q, Table, Q, QueueGbSet); + ({X, {{Q, _}, _}} = Object, none) + when Table == queue_exchange_stats_publish_exchange_index -> + gc_object(Q, Table, Object, QueueGbSet), + gc_entity(X, Table, X, ExchangeGbSet) + end, none, Table). diff --git a/deps/rabbitmq_management_agent/src/rabbit_mgmt_metrics_collector.erl b/deps/rabbitmq_management_agent/src/rabbit_mgmt_metrics_collector.erl new file mode 100644 index 0000000000..298f17a18d --- /dev/null +++ b/deps/rabbitmq_management_agent/src/rabbit_mgmt_metrics_collector.erl @@ -0,0 +1,712 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% +-module(rabbit_mgmt_metrics_collector). + +-include_lib("rabbit_common/include/rabbit.hrl"). +-include_lib("rabbit_common/include/rabbit_core_metrics.hrl"). +-include("rabbit_mgmt_metrics.hrl"). + +-behaviour(gen_server). +-compile({no_auto_import, [ceil/1]}). + +-spec start_link(atom()) -> rabbit_types:ok_pid_or_error(). + +-export([name/1]). +-export([start_link/1]). +-export([override_lookups/2, reset_lookups/1]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). +-export([index_table/2]). +-export([reset_all/0]). + +-import(rabbit_mgmt_data, [lookup_element/3]). + +-record(state, {table, interval, policies, rates_mode, lookup_queue, + lookup_exchange, old_aggr_stats}). + +%% Data is stored in ETS tables: +%% * One ETS table per metric (queue_stats, channel_stats_deliver_stats...) +%% (see ?TABLES in rabbit_mgmt_metrics.hrl) +%% * Stats are stored as key value pairs where the key is a tuple of +%% some value (such as a channel pid) and the retention interval. +%% The value is an instance of an exometer_slide providing a sliding window +%% of samples for some {Object, Interval}. +%% * Each slide can store multiple stats. See stats_per_table in +%% rabbit_mgmt_metrics.hrl for a map of which stats are recorded in which +%% table. + +reset_all() -> + [reset(Table) || {Table, _} <- ?CORE_TABLES]. + +reset(Table) -> + gen_server:cast(name(Table), reset). + +name(Table) -> + list_to_atom((atom_to_list(Table) ++ "_metrics_collector")). + + +start_link(Table) -> + gen_server:start_link({local, name(Table)}, ?MODULE, [Table], []). + +override_lookups(Table, Lookups) -> + gen_server:call(name(Table), {override_lookups, Lookups}, infinity). + +reset_lookups(Table) -> + gen_server:call(name(Table), reset_lookups, infinity). + +init([Table]) -> + {RatesMode, Policies} = load_config(), + Policy = retention_policy(Table), + Interval = take_smaller(proplists:get_value(Policy, Policies, [])) * 1000, + erlang:send_after(Interval, self(), collect_metrics), + {ok, #state{table = Table, interval = Interval, + policies = {proplists:get_value(basic, Policies), + proplists:get_value(detailed, Policies), + proplists:get_value(global, Policies)}, + rates_mode = RatesMode, + old_aggr_stats = #{}, + lookup_queue = fun queue_exists/1, + lookup_exchange = fun exchange_exists/1}}. + +handle_call(reset_lookups, _From, State) -> + {reply, ok, State#state{lookup_queue = fun queue_exists/1, + lookup_exchange = fun exchange_exists/1}}; +handle_call({override_lookups, Lookups}, _From, State) -> + {reply, ok, State#state{lookup_queue = pget(queue, Lookups), + lookup_exchange = pget(exchange, Lookups)}}; +handle_call({submit, Fun}, _From, State) -> + {reply, Fun(), State}; +handle_call(_Request, _From, State) -> + {noreply, State}. + +handle_cast(reset, State) -> + {noreply, State#state{old_aggr_stats = #{}}}; +handle_cast(_Msg, State) -> + {noreply, State}. + + +handle_info(collect_metrics, #state{interval = Interval} = State0) -> + Timestamp = exometer_slide:timestamp(), + State = aggregate_metrics(Timestamp, State0), + erlang:send_after(Interval, self(), collect_metrics), + {noreply, State}; +handle_info(purge_old_stats, State) -> + {noreply, State#state{old_aggr_stats = #{}}}; +handle_info(_Msg, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +retention_policy(connection_created) -> basic; %% really nothing +retention_policy(connection_metrics) -> basic; +retention_policy(connection_coarse_metrics) -> basic; +retention_policy(channel_created) -> basic; +retention_policy(channel_metrics) -> basic; +retention_policy(channel_queue_exchange_metrics) -> detailed; +retention_policy(channel_exchange_metrics) -> detailed; +retention_policy(channel_queue_metrics) -> detailed; +retention_policy(channel_process_metrics) -> basic; +retention_policy(consumer_created) -> basic; +retention_policy(queue_metrics) -> basic; +retention_policy(queue_coarse_metrics) -> basic; +retention_policy(node_persister_metrics) -> global; +retention_policy(node_coarse_metrics) -> global; +retention_policy(node_metrics) -> basic; +retention_policy(node_node_metrics) -> global; +retention_policy(connection_churn_metrics) -> basic. + +take_smaller(Policies) -> + Intervals = [I || {_, I} <- Policies], + case Intervals of + [] -> throw(missing_sample_retention_policies); + _ -> lists:min(Intervals) + end. + +insert_old_aggr_stats(NextStats, Id, Stat) -> + NextStats#{Id => Stat}. + +handle_deleted_queues(queue_coarse_metrics, Remainders, + #state{policies = {BPolicies, _, GPolicies}}) -> + TS = exometer_slide:timestamp(), + lists:foreach(fun ({Queue, {R, U, M}}) -> + NegStats = ?vhost_msg_stats(-R, -U, -M), + [insert_entry(vhost_msg_stats, vhost(Queue), TS, + NegStats, Size, Interval, true) + || {Size, Interval} <- GPolicies], + % zero out msg stats to avoid duplicating msg + % stats when a master queue is migrated + QNegStats = ?queue_msg_stats(0, 0, 0), + [insert_entry(queue_msg_stats, Queue, TS, + QNegStats, Size, Interval, false) + || {Size, Interval} <- BPolicies], + ets:delete(queue_stats, Queue), + ets:delete(queue_process_stats, Queue) + end, maps:to_list(Remainders)); +handle_deleted_queues(_T, _R, _P) -> ok. + +aggregate_metrics(Timestamp, #state{table = Table, + policies = {_, _, _GPolicies}} = State0) -> + Table = State0#state.table, + {Next, Ops, #state{old_aggr_stats = Remainders}} = + ets:foldl(fun(R, {NextStats, O, State}) -> + aggregate_entry(R, NextStats, O, State) + end, {#{}, #{}, State0}, Table), + maps:fold(fun(Tbl, TblOps, Acc) -> + _ = exec_table_ops(Tbl, Timestamp, TblOps), + Acc + end, no_acc, Ops), + + handle_deleted_queues(Table, Remainders, State0), + State0#state{old_aggr_stats = Next}. + +exec_table_ops(Table, Timestamp, TableOps) -> + maps:fold(fun(_Id, {insert, Entry}, A) -> + ets:insert(Table, Entry), + A; + (Id, {insert_with_index, Entry}, A) -> + insert_with_index(Table, Id, Entry), + A; + ({Id, Size, Interval, Incremental}, + {insert_entry, Entry}, A) -> + insert_entry(Table, Id, Timestamp, Entry, Size, + Interval, Incremental), + A + end, no_acc, TableOps). + + +aggregate_entry({Id, Metrics}, NextStats, Ops0, + #state{table = connection_created} = State) -> + case ets:lookup(connection_created_stats, Id) of + [] -> + Ftd = rabbit_mgmt_format:format( + Metrics, + {fun rabbit_mgmt_format:format_connection_created/1, true}), + Entry = ?connection_created_stats(Id, pget(name, Ftd, unknown), Ftd), + Ops = insert_op(connection_created_stats, Id, Entry, Ops0), + {NextStats, Ops, State}; + _ -> + {NextStats, Ops0, State} + end; +aggregate_entry({Id, Metrics}, NextStats, Ops0, + #state{table = connection_metrics} = State) -> + Entry = ?connection_stats(Id, Metrics), + Ops = insert_op(connection_stats, Id, Entry, Ops0), + {NextStats, Ops, State}; +aggregate_entry({Id, RecvOct, SendOct, Reductions, 0}, NextStats, Ops0, + #state{table = connection_coarse_metrics, + policies = {BPolicies, _, GPolicies}} = State) -> + Stats = ?vhost_stats_coarse_conn_stats(RecvOct, SendOct), + Diff = get_difference(Id, Stats, State), + + Ops1 = insert_entry_ops(vhost_stats_coarse_conn_stats, + vhost({connection_created, Id}), true, Diff, Ops0, + GPolicies), + + Entry = ?connection_stats_coarse_conn_stats(RecvOct, SendOct, Reductions), + Ops2 = insert_entry_ops(connection_stats_coarse_conn_stats, Id, false, Entry, + Ops1, BPolicies), + {insert_old_aggr_stats(NextStats, Id, Stats), Ops2, State}; +aggregate_entry({Id, RecvOct, SendOct, _Reductions, 1}, NextStats, Ops0, + #state{table = connection_coarse_metrics, + policies = {_BPolicies, _, GPolicies}} = State) -> + Stats = ?vhost_stats_coarse_conn_stats(RecvOct, SendOct), + Diff = get_difference(Id, Stats, State), + Ops1 = insert_entry_ops(vhost_stats_coarse_conn_stats, + vhost({connection_created, Id}), true, Diff, Ops0, + GPolicies), + rabbit_core_metrics:delete(connection_coarse_metrics, Id), + {NextStats, Ops1, State}; +aggregate_entry({Id, Metrics}, NextStats, Ops0, + #state{table = channel_created} = State) -> + case ets:lookup(channel_created_stats, Id) of + [] -> + Ftd = rabbit_mgmt_format:format(Metrics, {[], false}), + Entry = ?channel_created_stats(Id, pget(name, Ftd, unknown), Ftd), + Ops = insert_op(channel_created_stats, Id, Entry, Ops0), + {NextStats, Ops, State}; + _ -> + {NextStats, Ops0, State} + end; +aggregate_entry({Id, Metrics}, NextStats, Ops0, + #state{table = channel_metrics} = State) -> + %% First metric must be `idle_since` (if available), as expected by + %% `rabbit_mgmt_format:format_channel_stats`. This is a performance + %% optimisation that avoids traversing the whole list when only + %% one element has to be formatted. + Ftd = rabbit_mgmt_format:format_channel_stats(Metrics), + Entry = ?channel_stats(Id, Ftd), + Ops = insert_op(channel_stats, Id, Entry, Ops0), + {NextStats, Ops, State}; +aggregate_entry({{Ch, X} = Id, Publish0, Confirm, ReturnUnroutable, DropUnroutable, 0}, + NextStats, Ops0, + #state{table = channel_exchange_metrics, + policies = {BPolicies, DPolicies, GPolicies}, + rates_mode = RatesMode, + lookup_exchange = ExchangeFun} = State) -> + Stats = ?channel_stats_fine_stats(Publish0, Confirm, ReturnUnroutable, DropUnroutable), + {Publish, _, _, _} = Diff = get_difference(Id, Stats, State), + + Ops1 = insert_entry_ops(channel_stats_fine_stats, Ch, true, Diff, Ops0, + BPolicies), + Ops2 = insert_entry_ops(vhost_stats_fine_stats, vhost(X), true, Diff, Ops1, + GPolicies), + Ops3 = case {ExchangeFun(X), RatesMode} of + {true, basic} -> + Entry = ?exchange_stats_publish_in(Publish), + insert_entry_ops(exchange_stats_publish_in, X, true, Entry, + Ops2, DPolicies); + {true, _} -> + Entry = ?exchange_stats_publish_in(Publish), + O = insert_entry_ops(exchange_stats_publish_in, X, true, + Entry, Ops2, DPolicies), + insert_entry_ops(channel_exchange_stats_fine_stats, Id, + false, Stats, O, DPolicies); + _ -> + Ops2 + end, + {insert_old_aggr_stats(NextStats, Id, Stats), Ops3, State}; +aggregate_entry({{_Ch, X} = Id, Publish0, Confirm, ReturnUnroutable, DropUnroutable, 1}, + NextStats, Ops0, + #state{table = channel_exchange_metrics, + policies = {_BPolicies, DPolicies, GPolicies}, + lookup_exchange = ExchangeFun} = State) -> + Stats = ?channel_stats_fine_stats(Publish0, Confirm, ReturnUnroutable, DropUnroutable), + {Publish, _, _, _} = Diff = get_difference(Id, Stats, State), + Ops1 = insert_entry_ops(vhost_stats_fine_stats, vhost(X), true, Diff, Ops0, + GPolicies), + Ops2 = case ExchangeFun(X) of + true -> + Entry = ?exchange_stats_publish_in(Publish), + insert_entry_ops(exchange_stats_publish_in, X, true, Entry, + Ops1, DPolicies); + _ -> + Ops1 + end, + rabbit_core_metrics:delete(channel_exchange_metrics, Id), + {NextStats, Ops2, State}; +aggregate_entry({{Ch, Q} = Id, Get, GetNoAck, Deliver, DeliverNoAck, + Redeliver, Ack, GetEmpty, 0}, NextStats, Ops0, + #state{table = channel_queue_metrics, + policies = {BPolicies, DPolicies, GPolicies}, + rates_mode = RatesMode, + lookup_queue = QueueFun} = State) -> + Stats = ?vhost_stats_deliver_stats(Get, GetNoAck, Deliver, DeliverNoAck, + Redeliver, Ack, + Deliver + DeliverNoAck + Get + GetNoAck, + GetEmpty), + Diff = get_difference(Id, Stats, State), + + Ops1 = insert_entry_ops(vhost_stats_deliver_stats, vhost(Q), true, Diff, + Ops0, GPolicies), + + Ops2 = insert_entry_ops(channel_stats_deliver_stats, Ch, true, Diff, Ops1, + BPolicies), + + Ops3 = case {QueueFun(Q), RatesMode} of + {true, basic} -> + insert_entry_ops(queue_stats_deliver_stats, Q, true, Diff, + Ops2, BPolicies); + {true, _} -> + O = insert_entry_ops(queue_stats_deliver_stats, Q, true, + Diff, Ops2, BPolicies), + insert_entry_ops(channel_queue_stats_deliver_stats, Id, + false, Stats, O, DPolicies); + _ -> + Ops2 + end, + {insert_old_aggr_stats(NextStats, Id, Stats), Ops3, State}; +aggregate_entry({{_, Q} = Id, Get, GetNoAck, Deliver, DeliverNoAck, + Redeliver, Ack, GetEmpty, 1}, NextStats, Ops0, + #state{table = channel_queue_metrics, + policies = {BPolicies, _, GPolicies}, + lookup_queue = QueueFun} = State) -> + Stats = ?vhost_stats_deliver_stats(Get, GetNoAck, Deliver, DeliverNoAck, + Redeliver, Ack, + Deliver + DeliverNoAck + Get + GetNoAck, + GetEmpty), + Diff = get_difference(Id, Stats, State), + + Ops1 = insert_entry_ops(vhost_stats_deliver_stats, vhost(Q), true, Diff, + Ops0, GPolicies), + Ops2 = case QueueFun(Q) of + true -> + insert_entry_ops(queue_stats_deliver_stats, Q, true, Diff, + Ops1, BPolicies); + _ -> + Ops1 + end, + rabbit_core_metrics:delete(channel_queue_metrics, Id), + {NextStats, Ops2, State}; +aggregate_entry({{_Ch, {Q, X}} = Id, Publish, ToDelete}, NextStats, Ops0, + #state{table = channel_queue_exchange_metrics, + policies = {BPolicies, _, _}, + rates_mode = RatesMode, + lookup_queue = QueueFun, + lookup_exchange = ExchangeFun} = State) -> + Stats = ?queue_stats_publish(Publish), + Diff = get_difference(Id, Stats, State), + Ops1 = case {QueueFun(Q), ExchangeFun(X), RatesMode, ToDelete} of + {true, false, _, _} -> + insert_entry_ops(queue_stats_publish, Q, true, Diff, + Ops0, BPolicies); + {false, true, _, _} -> + insert_entry_ops(exchange_stats_publish_out, X, true, Diff, + Ops0, BPolicies); + {true, true, basic, _} -> + O = insert_entry_ops(queue_stats_publish, Q, true, Diff, + Ops0, BPolicies), + insert_entry_ops(exchange_stats_publish_out, X, true, Diff, + O, BPolicies); + {true, true, _, 0} -> + O1 = insert_entry_ops(queue_stats_publish, Q, true, Diff, + Ops0, BPolicies), + O2 = insert_entry_ops(exchange_stats_publish_out, X, true, + Diff, O1, BPolicies), + insert_entry_ops(queue_exchange_stats_publish, {Q, X}, + true, Diff, O2, BPolicies); + {true, true, _, 1} -> + O = insert_entry_ops(queue_stats_publish, Q, true, Diff, + Ops0, BPolicies), + insert_entry_ops(exchange_stats_publish_out, X, true, + Diff, O, BPolicies); + _ -> + Ops0 + end, + case ToDelete of + 0 -> + {insert_old_aggr_stats(NextStats, Id, Stats), Ops1, State}; + 1 -> + rabbit_core_metrics:delete(channel_queue_exchange_metrics, Id), + {NextStats, Ops1, State} + end; +aggregate_entry({Id, Reductions}, NextStats, Ops0, + #state{table = channel_process_metrics, + policies = {BPolicies, _, _}} = State) -> + Entry = ?channel_process_stats(Reductions), + Ops = insert_entry_ops(channel_process_stats, Id, false, + Entry, Ops0, BPolicies), + {NextStats, Ops, State}; +aggregate_entry({Id, Exclusive, AckRequired, PrefetchCount, + Active, ActivityStatus, Args}, + NextStats, Ops0, + #state{table = consumer_created} = State) -> + case ets:lookup(consumer_stats, Id) of + [] -> + Fmt = rabbit_mgmt_format:format([{exclusive, Exclusive}, + {ack_required, AckRequired}, + {prefetch_count, PrefetchCount}, + {active, Active}, + {activity_status, ActivityStatus}, + {arguments, Args}], {[], false}), + Entry = ?consumer_stats(Id, Fmt), + Ops = insert_with_index_op(consumer_stats, Id, Entry, Ops0), + {NextStats, Ops , State}; + [{_K, V}] -> + CurrentActive = proplists:get_value(active, V, undefined), + case Active =:= CurrentActive of + false -> + Fmt = rabbit_mgmt_format:format([{exclusive, Exclusive}, + {ack_required, AckRequired}, + {prefetch_count, PrefetchCount}, + {active, Active}, + {activity_status, ActivityStatus}, + {arguments, Args}], {[], false}), + Entry = ?consumer_stats(Id, Fmt), + Ops = insert_with_index_op(consumer_stats, Id, Entry, Ops0), + {NextStats, Ops , State}; + _ -> + {NextStats, Ops0, State} + end; + _ -> + {NextStats, Ops0, State} + end; +aggregate_entry({Id, Metrics, 0}, NextStats, Ops0, + #state{table = queue_metrics, + policies = {BPolicies, _, GPolicies}, + lookup_queue = QueueFun} = State) -> + Stats = ?queue_msg_rates(pget(disk_reads, Metrics, 0), + pget(disk_writes, Metrics, 0)), + Diff = get_difference(Id, Stats, State), + Ops1 = insert_entry_ops(vhost_msg_rates, vhost(Id), true, Diff, Ops0, + GPolicies), + Ops2 = case QueueFun(Id) of + true -> + O = insert_entry_ops(queue_msg_rates, Id, false, Stats, Ops1, + BPolicies), + Fmt = rabbit_mgmt_format:format( + Metrics, + {fun rabbit_mgmt_format:format_queue_stats/1, false}), + insert_op(queue_stats, Id, ?queue_stats(Id, Fmt), O); + false -> + Ops1 + end, + {insert_old_aggr_stats(NextStats, Id, Stats), Ops2, State}; +aggregate_entry({Id, Metrics, 1}, NextStats, Ops0, + #state{table = queue_metrics, + policies = {_, _, GPolicies}} = State) -> + Stats = ?queue_msg_rates(pget(disk_reads, Metrics, 0), + pget(disk_writes, Metrics, 0)), + Diff = get_difference(Id, Stats, State), + Ops = insert_entry_ops(vhost_msg_rates, vhost(Id), true, Diff, Ops0, + GPolicies), + rabbit_core_metrics:delete(queue_metrics, Id), + {NextStats, Ops, State}; +aggregate_entry({Name, Ready, Unack, Msgs, Red}, NextStats, Ops0, + #state{table = queue_coarse_metrics, + old_aggr_stats = Old, + policies = {BPolicies, _, GPolicies}, + lookup_queue = QueueFun} = State) -> + Stats = ?vhost_msg_stats(Ready, Unack, Msgs), + Diff = get_difference(Name, Stats, State), + Ops1 = insert_entry_ops(vhost_msg_stats, vhost(Name), true, Diff, Ops0, + GPolicies), + Ops2 = case QueueFun(Name) of + true -> + QPS =?queue_process_stats(Red), + O1 = insert_entry_ops(queue_process_stats, Name, false, QPS, + Ops1, BPolicies), + QMS = ?queue_msg_stats(Ready, Unack, Msgs), + insert_entry_ops(queue_msg_stats, Name, false, QMS, + O1, BPolicies); + _ -> + Ops1 + end, + State1 = State#state{old_aggr_stats = maps:remove(Name, Old)}, + {insert_old_aggr_stats(NextStats, Name, Stats), Ops2, State1}; +aggregate_entry({Id, Metrics}, NextStats, Ops0, + #state{table = node_metrics} = State) -> + Ops = insert_op(node_stats, Id, {Id, Metrics}, Ops0), + {NextStats, Ops, State}; +aggregate_entry({Id, Metrics}, NextStats, Ops0, + #state{table = node_coarse_metrics, + policies = {_, _, GPolicies}} = State) -> + Stats = ?node_coarse_stats( + pget(fd_used, Metrics, 0), pget(sockets_used, Metrics, 0), + pget(mem_used, Metrics, 0), pget(disk_free, Metrics, 0), + pget(proc_used, Metrics, 0), pget(gc_num, Metrics, 0), + pget(gc_bytes_reclaimed, Metrics, 0), + pget(context_switches, Metrics, 0)), + Ops = insert_entry_ops(node_coarse_stats, Id, false, Stats, Ops0, + GPolicies), + {NextStats, Ops, State}; +aggregate_entry({Id, Metrics}, NextStats, Ops0, + #state{table = node_persister_metrics, + policies = {_, _, GPolicies}} = State) -> + Stats = ?node_persister_stats( + pget(io_read_count, Metrics, 0), pget(io_read_bytes, Metrics, 0), + pget(io_read_time, Metrics, 0), pget(io_write_count, Metrics, 0), + pget(io_write_bytes, Metrics, 0), pget(io_write_time, Metrics, 0), + pget(io_sync_count, Metrics, 0), pget(io_sync_time, Metrics, 0), + pget(io_seek_count, Metrics, 0), pget(io_seek_time, Metrics, 0), + pget(io_reopen_count, Metrics, 0), pget(mnesia_ram_tx_count, Metrics, 0), + pget(mnesia_disk_tx_count, Metrics, 0), pget(msg_store_read_count, Metrics, 0), + pget(msg_store_write_count, Metrics, 0), + pget(queue_index_journal_write_count, Metrics, 0), + pget(queue_index_write_count, Metrics, 0), pget(queue_index_read_count, Metrics, 0), + pget(io_file_handle_open_attempt_count, Metrics, 0), + pget(io_file_handle_open_attempt_time, Metrics, 0)), + Ops = insert_entry_ops(node_persister_stats, Id, false, Stats, Ops0, + GPolicies), + {NextStats, Ops, State}; +aggregate_entry({Id, Metrics}, NextStats, Ops0, + #state{table = node_node_metrics, + policies = {_, _, GPolicies}} = State) -> + Stats = ?node_node_coarse_stats(pget(send_bytes, Metrics, 0), + pget(recv_bytes, Metrics, 0)), + CleanMetrics = lists:keydelete(recv_bytes, 1, + lists:keydelete(send_bytes, 1, Metrics)), + Ops1 = insert_op(node_node_stats, Id, ?node_node_stats(Id, CleanMetrics), + Ops0), + Ops = insert_entry_ops(node_node_coarse_stats, Id, false, Stats, Ops1, + GPolicies), + {NextStats, Ops, State}; +aggregate_entry({Id, ConnCreated, ConnClosed, ChCreated, ChClosed, + QueueDeclared, QueueCreated, QueueDeleted}, NextStats, Ops0, + #state{table = connection_churn_metrics, + policies = {_, _, GPolicies}} = State) -> + %% Id is the local node. There is only one entry on every ETS table. + Stats = ?connection_churn_rates(ConnCreated, ConnClosed, ChCreated, ChClosed, + QueueDeclared, QueueCreated, QueueDeleted), + Diff = get_difference(Id, Stats, State), + Ops = insert_entry_ops(connection_churn_rates, Id, true, Diff, Ops0, + GPolicies), + {insert_old_aggr_stats(NextStats, Id, Stats), Ops, State}. + +insert_entry(Table, Id, TS, Entry, Size, Interval0, Incremental) -> + Key = {Id, Interval0}, + Slide = + case ets:lookup(Table, Key) of + [{Key, S}] -> + S; + [] -> + IntervalMs = Interval0 * 1000, + % add some margin to Size and max_n to reduce chances of off-by-one errors + exometer_slide:new(TS - IntervalMs, (Size + Interval0) * 1000, + [{interval, IntervalMs}, + {max_n, ceil(Size / Interval0) + 1}, + {incremental, Incremental}]) + end, + insert_with_index(Table, Key, {Key, exometer_slide:add_element(TS, Entry, + Slide)}). + +update_op(Table, Key, Op, Ops) -> + TableOps = case maps:find(Table, Ops) of + {ok, Inner} -> + maps:put(Key, Op, Inner); + error -> + Inner = #{}, + maps:put(Key, Op, Inner) + end, + maps:put(Table, TableOps, Ops). + +insert_with_index_op(Table, Key, Entry, Ops) -> + update_op(Table, Key, {insert_with_index, Entry}, Ops). + +insert_op(Table, Key, Entry, Ops) -> + update_op(Table, Key, {insert, Entry}, Ops). + +insert_entry_op(Table, Key, Entry, Ops) -> + TableOps0 = case maps:find(Table, Ops) of + {ok, Inner} -> Inner; + error -> #{} + end, + TableOps = maps:update_with(Key, fun({insert_entry, Entry0}) -> + {insert_entry, sum_entry(Entry0, Entry)} + end, {insert_entry, Entry}, TableOps0), + maps:put(Table, TableOps, Ops). + +insert_entry_ops(Table, Id, Incr, Entry, Ops, Policies) -> + lists:foldl(fun({Size, Interval}, Acc) -> + Key = {Id, Size, Interval, Incr}, + insert_entry_op(Table, Key, Entry, Acc) + end, Ops, Policies). + +get_difference(Id, Stats, #state{old_aggr_stats = OldStats}) -> + case maps:find(Id, OldStats) of + error -> + Stats; + {ok, OldStat} -> + difference(OldStat, Stats) + end. + +sum_entry({A0}, {B0}) -> + {B0 + A0}; +sum_entry({A0, A1}, {B0, B1}) -> + {B0 + A0, B1 + A1}; +sum_entry({A0, A1, A2}, {B0, B1, B2}) -> + {B0 + A0, B1 + A1, B2 + A2}; +sum_entry({A0, A1, A2, A3}, {B0, B1, B2, B3}) -> + {B0 + A0, B1 + A1, B2 + A2, B3 + A3}; +sum_entry({A0, A1, A2, A3, A4}, {B0, B1, B2, B3, B4}) -> + {B0 + A0, B1 + A1, B2 + A2, B3 + A3, B4 + A4}; +sum_entry({A0, A1, A2, A3, A4, A5}, {B0, B1, B2, B3, B4, B5}) -> + {B0 + A0, B1 + A1, B2 + A2, B3 + A3, B4 + A4, B5 + A5}; +sum_entry({A0, A1, A2, A3, A4, A5, A6}, {B0, B1, B2, B3, B4, B5, B6}) -> + {B0 + A0, B1 + A1, B2 + A2, B3 + A3, B4 + A4, B5 + A5, B6 + A6}; +sum_entry({A0, A1, A2, A3, A4, A5, A6, A7}, {B0, B1, B2, B3, B4, B5, B6, B7}) -> + {B0 + A0, B1 + A1, B2 + A2, B3 + A3, B4 + A4, B5 + A5, B6 + A6, B7 + A7}. + +difference({A0}, {B0}) -> + {B0 - A0}; +difference({A0, A1}, {B0, B1}) -> + {B0 - A0, B1 - A1}; +difference({A0, A1, A2}, {B0, B1, B2}) -> + {B0 - A0, B1 - A1, B2 - A2}; +difference({A0, A1, A2, A3}, {B0, B1, B2, B3}) -> + {B0 - A0, B1 - A1, B2 - A2, B3 - A3}; +difference({A0, A1, A2, A3, A4}, {B0, B1, B2, B3, B4}) -> + {B0 - A0, B1 - A1, B2 - A2, B3 - A3, B4 - A4}; +difference({A0, A1, A2, A3, A4, A5}, {B0, B1, B2, B3, B4, B5}) -> + {B0 - A0, B1 - A1, B2 - A2, B3 - A3, B4 - A4, B5 - A5}; +difference({A0, A1, A2, A3, A4, A5, A6}, {B0, B1, B2, B3, B4, B5, B6}) -> + {B0 - A0, B1 - A1, B2 - A2, B3 - A3, B4 - A4, B5 - A5, B6 - A6}; +difference({A0, A1, A2, A3, A4, A5, A6, A7}, {B0, B1, B2, B3, B4, B5, B6, B7}) -> + {B0 - A0, B1 - A1, B2 - A2, B3 - A3, B4 - A4, B5 - A5, B6 - A6, B7 - A7}. + +vhost(#resource{virtual_host = VHost}) -> + VHost; +vhost({queue_stats, #resource{virtual_host = VHost}}) -> + VHost; +vhost({TName, Pid}) -> + pget(vhost, lookup_element(TName, Pid, 2)). + +exchange_exists(Name) -> + case rabbit_exchange:lookup(Name) of + {ok, _} -> + true; + _ -> + false + end. + +queue_exists(Name) -> + case rabbit_amqqueue:lookup(Name) of + {ok, _} -> + true; + _ -> + false + end. + +insert_with_index(Table, Key, Tuple) -> + Insert = ets:insert(Table, Tuple), + insert_index(Table, Key), + Insert. + +insert_index(consumer_stats, {Q, Ch, _} = Key) -> + ets:insert(index_table(consumer_stats, queue), {Q, Key}), + ets:insert(index_table(consumer_stats, channel), {Ch, Key}); +insert_index(channel_exchange_stats_fine_stats, {{Ch, Ex}, _} = Key) -> + ets:insert(index_table(channel_exchange_stats_fine_stats, exchange), {Ex, Key}), + ets:insert(index_table(channel_exchange_stats_fine_stats, channel), {Ch, Key}); +insert_index(channel_queue_stats_deliver_stats, {{Ch, Q}, _} = Key) -> + ets:insert(index_table(channel_queue_stats_deliver_stats, queue), {Q, Key}), + ets:insert(index_table(channel_queue_stats_deliver_stats, channel), {Ch, Key}); +insert_index(queue_exchange_stats_publish, {{Q, Ex}, _} = Key) -> + ets:insert(index_table(queue_exchange_stats_publish, queue), {Q, Key}), + ets:insert(index_table(queue_exchange_stats_publish, exchange), {Ex, Key}); +insert_index(node_node_coarse_stats, {{_, Node}, _} = Key) -> + ets:insert(index_table(node_node_coarse_stats, node), {Node, Key}); +insert_index(_, _) -> ok. + +index_table(consumer_stats, queue) -> consumer_stats_queue_index; +index_table(consumer_stats, channel) -> consumer_stats_channel_index; +index_table(channel_exchange_stats_fine_stats, exchange) -> channel_exchange_stats_fine_stats_exchange_index; +index_table(channel_exchange_stats_fine_stats, channel) -> channel_exchange_stats_fine_stats_channel_index; +index_table(channel_queue_stats_deliver_stats, queue) -> channel_queue_stats_deliver_stats_queue_index; +index_table(channel_queue_stats_deliver_stats, channel) -> channel_queue_stats_deliver_stats_channel_index; +index_table(queue_exchange_stats_publish, queue) -> queue_exchange_stats_publish_queue_index; +index_table(queue_exchange_stats_publish, exchange) -> queue_exchange_stats_publish_exchange_index; +index_table(node_node_coarse_stats, node) -> node_node_coarse_stats_node_index. + +load_config() -> + RatesMode = rabbit_mgmt_agent_config:get_env(rates_mode), + Policies = rabbit_mgmt_agent_config:get_env(sample_retention_policies, []), + {RatesMode, Policies}. + +ceil(X) when X < 0 -> + trunc(X); +ceil(X) -> + T = trunc(X), + case X - T == 0 of + true -> T; + false -> T + 1 + end. + +pget(Key, List) -> pget(Key, List, unknown). + +pget(Key, List, Default) when is_number(Default) -> + case rabbit_misc:pget(Key, List) of + Number when is_number(Number) -> + Number; + _Other -> + Default + end; +pget(Key, List, Default) -> + rabbit_misc:pget(Key, List, Default). diff --git a/deps/rabbitmq_management_agent/src/rabbit_mgmt_metrics_gc.erl b/deps/rabbitmq_management_agent/src/rabbit_mgmt_metrics_gc.erl new file mode 100644 index 0000000000..f1ae48e0e4 --- /dev/null +++ b/deps/rabbitmq_management_agent/src/rabbit_mgmt_metrics_gc.erl @@ -0,0 +1,175 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% +-module(rabbit_mgmt_metrics_gc). + +-record(state, {basic_i, + detailed_i, + global_i}). + +-include_lib("rabbit_common/include/rabbit.hrl"). + +-spec start_link(atom()) -> rabbit_types:ok_pid_or_error(). + +-export([name/1]). +-export([start_link/1]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +name(EventType) -> + list_to_atom((atom_to_list(EventType) ++ "_metrics_gc")). + +start_link(EventType) -> + gen_server:start_link({local, name(EventType)}, ?MODULE, [], []). + +init(_) -> + Policies = rabbit_mgmt_agent_config:get_env(sample_retention_policies), + {ok, #state{basic_i = intervals(basic, Policies), + global_i = intervals(global, Policies), + detailed_i = intervals(detailed, Policies)}}. + +handle_call(_Request, _From, State) -> + {noreply, State}. + +handle_cast({event, #event{type = connection_closed, props = Props}}, + State = #state{basic_i = BIntervals}) -> + Pid = pget(pid, Props), + remove_connection(Pid, BIntervals), + {noreply, State}; +handle_cast({event, #event{type = channel_closed, props = Props}}, + State = #state{basic_i = BIntervals}) -> + Pid = pget(pid, Props), + remove_channel(Pid, BIntervals), + {noreply, State}; +handle_cast({event, #event{type = consumer_deleted, props = Props}}, State) -> + remove_consumer(Props), + {noreply, State}; +handle_cast({event, #event{type = exchange_deleted, props = Props}}, + State = #state{basic_i = BIntervals}) -> + Name = pget(name, Props), + remove_exchange(Name, BIntervals), + {noreply, State}; +handle_cast({event, #event{type = queue_deleted, props = Props}}, + State = #state{basic_i = BIntervals}) -> + Name = pget(name, Props), + remove_queue(Name, BIntervals), + {noreply, State}; +handle_cast({event, #event{type = vhost_deleted, props = Props}}, + State = #state{global_i = GIntervals}) -> + Name = pget(name, Props), + remove_vhost(Name, GIntervals), + {noreply, State}; +handle_cast({event, #event{type = node_node_deleted, props = Props}}, State) -> + Name = pget(route, Props), + remove_node_node(Name), + {noreply, State}. + +handle_info(_Msg, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +remove_connection(Id, BIntervals) -> + ets:delete(connection_created_stats, Id), + ets:delete(connection_stats, Id), + delete_samples(connection_stats_coarse_conn_stats, Id, BIntervals), + ok. + +remove_channel(Id, BIntervals) -> + ets:delete(channel_created_stats, Id), + ets:delete(channel_stats, Id), + delete_samples(channel_process_stats, Id, BIntervals), + delete_samples(channel_stats_fine_stats, Id, BIntervals), + delete_samples(channel_stats_deliver_stats, Id, BIntervals), + index_delete(consumer_stats, channel, Id), + index_delete(channel_exchange_stats_fine_stats, channel, Id), + index_delete(channel_queue_stats_deliver_stats, channel, Id), + ok. + +remove_consumer(Props) -> + Id = {pget(queue, Props), pget(channel, Props), pget(consumer_tag, Props)}, + ets:delete(consumer_stats, Id), + cleanup_index(consumer_stats, Id), + ok. + +remove_exchange(Name, BIntervals) -> + delete_samples(exchange_stats_publish_out, Name, BIntervals), + delete_samples(exchange_stats_publish_in, Name, BIntervals), + index_delete(queue_exchange_stats_publish, exchange, Name), + index_delete(channel_exchange_stats_fine_stats, exchange, Name), + ok. + +remove_queue(Name, BIntervals) -> + ets:delete(queue_stats, Name), + delete_samples(queue_stats_publish, Name, BIntervals), + delete_samples(queue_stats_deliver_stats, Name, BIntervals), + delete_samples(queue_process_stats, Name, BIntervals), + delete_samples(queue_msg_stats, Name, BIntervals), + delete_samples(queue_msg_rates, Name, BIntervals), + index_delete(channel_queue_stats_deliver_stats, queue, Name), + index_delete(queue_exchange_stats_publish, queue, Name), + index_delete(consumer_stats, queue, Name), + + ok. + +remove_vhost(Name, GIntervals) -> + delete_samples(vhost_stats_coarse_conn_stats, Name, GIntervals), + delete_samples(vhost_stats_fine_stats, Name, GIntervals), + delete_samples(vhost_stats_deliver_stats, Name, GIntervals), + ok. + +remove_node_node(Name) -> + index_delete(node_node_coarse_stats, node, Name), + ok. + +intervals(Type, Policies) -> + [I || {_, I} <- proplists:get_value(Type, Policies)]. + +delete_samples(Table, Id, Intervals) -> + [ets:delete(Table, {Id, I}) || I <- Intervals], + ok. + +index_delete(Table, Type, Id) -> + IndexTable = rabbit_mgmt_metrics_collector:index_table(Table, Type), + Keys = ets:lookup(IndexTable, Id), + [ begin + ets:delete(Table, Key), + cleanup_index(Table, Key) + end + || {_Index, Key} <- Keys ], + ets:delete(IndexTable, Id), + ok. + +cleanup_index(consumer_stats, {Q, Ch, _} = Key) -> + delete_index(consumer_stats, queue, {Q, Key}), + delete_index(consumer_stats, channel, {Ch, Key}), + ok; +cleanup_index(channel_exchange_stats_fine_stats, {{Ch, Ex}, _} = Key) -> + delete_index(channel_exchange_stats_fine_stats, exchange, {Ex, Key}), + delete_index(channel_exchange_stats_fine_stats, channel, {Ch, Key}), + ok; +cleanup_index(channel_queue_stats_deliver_stats, {{Ch, Q}, _} = Key) -> + delete_index(channel_queue_stats_deliver_stats, queue, {Q, Key}), + delete_index(channel_queue_stats_deliver_stats, channel, {Ch, Key}), + ok; +cleanup_index(queue_exchange_stats_publish, {{Q, Ex}, _} = Key) -> + delete_index(queue_exchange_stats_publish, queue, {Q, Key}), + delete_index(queue_exchange_stats_publish, exchange, {Ex, Key}), + ok; +cleanup_index(node_node_coarse_stats, {{_, Node}, _} = Key) -> + delete_index(node_node_coarse_stats, node, {Node, Key}), + ok; +cleanup_index(_, _) -> ok. + +delete_index(Table, Index, Obj) -> + ets:delete_object(rabbit_mgmt_metrics_collector:index_table(Table, Index), + Obj). + +pget(Key, List) -> rabbit_misc:pget(Key, List, unknown). diff --git a/deps/rabbitmq_management_agent/src/rabbit_mgmt_storage.erl b/deps/rabbitmq_management_agent/src/rabbit_mgmt_storage.erl new file mode 100644 index 0000000000..4c5c8c18ef --- /dev/null +++ b/deps/rabbitmq_management_agent/src/rabbit_mgmt_storage.erl @@ -0,0 +1,57 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% +-module(rabbit_mgmt_storage). +-behaviour(gen_server2). +-record(state, {}). + +-spec start_link() -> rabbit_types:ok_pid_or_error(). + +-export([start_link/0]). +-export([reset/0, reset_all/0]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +-include("rabbit_mgmt_metrics.hrl"). + +%% ETS owner +start_link() -> + gen_server2:start_link({local, ?MODULE}, ?MODULE, [], []). + +reset() -> + rabbit_log:warning("Resetting RabbitMQ management storage"), + [ets:delete_all_objects(IndexTable) || IndexTable <- ?INDEX_TABLES], + [ets:delete_all_objects(Table) || {Table, _} <- ?TABLES], + _ = rabbit_mgmt_metrics_collector:reset_all(), + ok. + +reset_all() -> + _ = [rpc:call(Node, rabbit_mgmt_storage, reset, []) + || Node <- rabbit_nodes:all_running()], + ok. + +init(_) -> + _ = [ets:new(IndexTable, [public, bag, named_table]) + || IndexTable <- ?INDEX_TABLES], + _ = [ets:new(Table, [public, Type, named_table]) + || {Table, Type} <- ?TABLES], + _ = ets:new(rabbit_mgmt_db_cache, [public, set, named_table]), + {ok, #state{}}. + +handle_call(_Request, _From, State) -> + {noreply, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info(_Msg, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. |