diff options
Diffstat (limited to 'deps/rabbit/src/rabbit_osiris_metrics.erl')
-rw-r--r-- | deps/rabbit/src/rabbit_osiris_metrics.erl | 103 |
1 files changed, 103 insertions, 0 deletions
diff --git a/deps/rabbit/src/rabbit_osiris_metrics.erl b/deps/rabbit/src/rabbit_osiris_metrics.erl new file mode 100644 index 0000000000..7b2574c7e1 --- /dev/null +++ b/deps/rabbit/src/rabbit_osiris_metrics.erl @@ -0,0 +1,103 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at https://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% Copyright (c) 2012-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_osiris_metrics). + +-behaviour(gen_server). + +-export([start_link/0]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +-define(TICK_TIMEOUT, 5000). +-define(SERVER, ?MODULE). + +-define(STATISTICS_KEYS, + [policy, + operator_policy, + effective_policy_definition, + state, + leader, + online, + members + ]). + +-record(state, {timeout :: non_neg_integer()}). + +%%---------------------------------------------------------------------------- +%% Starts the raw metrics storage and owns the ETS tables. +%%---------------------------------------------------------------------------- + +-spec start_link() -> rabbit_types:ok_pid_or_error(). + +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +init([]) -> + Timeout = application:get_env(rabbit, stream_tick_interval, + ?TICK_TIMEOUT), + erlang:send_after(Timeout, self(), tick), + {ok, #state{timeout = Timeout}}. + +handle_call(_Request, _From, State) -> + {noreply, State}. + +handle_cast(_Request, State) -> + {noreply, State}. + +handle_info(tick, #state{timeout = Timeout} = State) -> + Data = osiris_counters:overview(), + maps:map( + fun ({osiris_writer, QName}, #{offset := Offs, + first_offset := FstOffs}) -> + COffs = Offs + 1 - FstOffs, + rabbit_core_metrics:queue_stats(QName, COffs, 0, COffs, 0), + Infos = try + %% TODO complete stats! + case rabbit_amqqueue:lookup(QName) of + {ok, Q} -> + rabbit_stream_queue:info(Q, ?STATISTICS_KEYS); + _ -> + [] + end + catch + _:_ -> + %% It's possible that the writer has died but + %% it's still on the amqqueue record, so the + %% `erlang:process_info/2` calls will return + %% `undefined` and crash with a badmatch. + %% At least for now, skipping the metrics might + %% be the best option. Otherwise this brings + %% down `rabbit_sup` and the whole `rabbit` app. + [] + end, + rabbit_core_metrics:queue_stats(QName, Infos), + rabbit_event:notify(queue_stats, Infos ++ [{name, QName}, + {messages, COffs}, + {messages_ready, COffs}, + {messages_unacknowledged, 0}]), + ok; + (_, _V) -> + ok + end, Data), + erlang:send_after(Timeout, self(), tick), + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. |