summaryrefslogtreecommitdiff
path: root/deps/rabbit/src/rabbit_osiris_metrics.erl
diff options
context:
space:
mode:
Diffstat (limited to 'deps/rabbit/src/rabbit_osiris_metrics.erl')
-rw-r--r--deps/rabbit/src/rabbit_osiris_metrics.erl103
1 files changed, 103 insertions, 0 deletions
diff --git a/deps/rabbit/src/rabbit_osiris_metrics.erl b/deps/rabbit/src/rabbit_osiris_metrics.erl
new file mode 100644
index 0000000000..7b2574c7e1
--- /dev/null
+++ b/deps/rabbit/src/rabbit_osiris_metrics.erl
@@ -0,0 +1,103 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at https://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% Copyright (c) 2012-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_osiris_metrics).
+
+-behaviour(gen_server).
+
+-export([start_link/0]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
+
+-define(TICK_TIMEOUT, 5000).
+-define(SERVER, ?MODULE).
+
+-define(STATISTICS_KEYS,
+ [policy,
+ operator_policy,
+ effective_policy_definition,
+ state,
+ leader,
+ online,
+ members
+ ]).
+
+-record(state, {timeout :: non_neg_integer()}).
+
+%%----------------------------------------------------------------------------
+%% Starts the raw metrics storage and owns the ETS tables.
+%%----------------------------------------------------------------------------
+
+-spec start_link() -> rabbit_types:ok_pid_or_error().
+
+start_link() ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+
+init([]) ->
+ Timeout = application:get_env(rabbit, stream_tick_interval,
+ ?TICK_TIMEOUT),
+ erlang:send_after(Timeout, self(), tick),
+ {ok, #state{timeout = Timeout}}.
+
+handle_call(_Request, _From, State) ->
+ {noreply, State}.
+
+handle_cast(_Request, State) ->
+ {noreply, State}.
+
+handle_info(tick, #state{timeout = Timeout} = State) ->
+ Data = osiris_counters:overview(),
+ maps:map(
+ fun ({osiris_writer, QName}, #{offset := Offs,
+ first_offset := FstOffs}) ->
+ COffs = Offs + 1 - FstOffs,
+ rabbit_core_metrics:queue_stats(QName, COffs, 0, COffs, 0),
+ Infos = try
+ %% TODO complete stats!
+ case rabbit_amqqueue:lookup(QName) of
+ {ok, Q} ->
+ rabbit_stream_queue:info(Q, ?STATISTICS_KEYS);
+ _ ->
+ []
+ end
+ catch
+ _:_ ->
+ %% It's possible that the writer has died but
+ %% it's still on the amqqueue record, so the
+ %% `erlang:process_info/2` calls will return
+ %% `undefined` and crash with a badmatch.
+ %% At least for now, skipping the metrics might
+ %% be the best option. Otherwise this brings
+ %% down `rabbit_sup` and the whole `rabbit` app.
+ []
+ end,
+ rabbit_core_metrics:queue_stats(QName, Infos),
+ rabbit_event:notify(queue_stats, Infos ++ [{name, QName},
+ {messages, COffs},
+ {messages_ready, COffs},
+ {messages_unacknowledged, 0}]),
+ ok;
+ (_, _V) ->
+ ok
+ end, Data),
+ erlang:send_after(Timeout, self(), tick),
+ {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.