summaryrefslogtreecommitdiff
path: root/deps/rabbit/src/rabbit_vm.erl
diff options
context:
space:
mode:
Diffstat (limited to 'deps/rabbit/src/rabbit_vm.erl')
-rw-r--r--deps/rabbit/src/rabbit_vm.erl427
1 files changed, 427 insertions, 0 deletions
diff --git a/deps/rabbit/src/rabbit_vm.erl b/deps/rabbit/src/rabbit_vm.erl
new file mode 100644
index 0000000000..b014e090c5
--- /dev/null
+++ b/deps/rabbit/src/rabbit_vm.erl
@@ -0,0 +1,427 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_vm).
+
+-export([memory/0, binary/0, ets_tables_memory/1]).
+
+-define(MAGIC_PLUGINS, ["cowboy", "ranch", "sockjs"]).
+
+%%----------------------------------------------------------------------------
+
+-spec memory() -> rabbit_types:infos().
+
+memory() ->
+ All = interesting_sups(),
+ {Sums, _Other} = sum_processes(
+ lists:append(All), distinguishers(), [memory]),
+
+ [Qs, QsSlave, Qqs, Ssqs, Srqs, SCoor, ConnsReader, ConnsWriter, ConnsChannel,
+ ConnsOther, MsgIndexProc, MgmtDbProc, Plugins] =
+ [aggregate(Names, Sums, memory, fun (X) -> X end)
+ || Names <- distinguished_interesting_sups()],
+
+ MnesiaETS = mnesia_memory(),
+ MsgIndexETS = ets_memory(msg_stores()),
+ MetricsETS = ets_memory([rabbit_metrics]),
+ QuorumETS = ets_memory([ra_log_ets]),
+ MetricsProc = try
+ [{_, M}] = process_info(whereis(rabbit_metrics), [memory]),
+ M
+ catch
+ error:badarg ->
+ 0
+ end,
+ MgmtDbETS = ets_memory([rabbit_mgmt_storage]),
+ [{total, ErlangTotal},
+ {processes, Processes},
+ {ets, ETS},
+ {atom, Atom},
+ {binary, Bin},
+ {code, Code},
+ {system, System}] =
+ erlang:memory([total, processes, ets, atom, binary, code, system]),
+
+ Strategy = vm_memory_monitor:get_memory_calculation_strategy(),
+ Allocated = recon_alloc:memory(allocated),
+ Rss = vm_memory_monitor:get_rss_memory(),
+
+ AllocatedUnused = max(Allocated - ErlangTotal, 0),
+ OSReserved = max(Rss - Allocated, 0),
+
+ OtherProc = Processes
+ - ConnsReader - ConnsWriter - ConnsChannel - ConnsOther
+ - Qs - QsSlave - Qqs - Ssqs - Srqs - SCoor - MsgIndexProc - Plugins
+ - MgmtDbProc - MetricsProc,
+
+ [
+ %% Connections
+ {connection_readers, ConnsReader},
+ {connection_writers, ConnsWriter},
+ {connection_channels, ConnsChannel},
+ {connection_other, ConnsOther},
+
+ %% Queues
+ {queue_procs, Qs},
+ {queue_slave_procs, QsSlave},
+ {quorum_queue_procs, Qqs},
+ {stream_queue_procs, Ssqs},
+ {stream_queue_replica_reader_procs, Srqs},
+ {stream_queue_coordinator_procs, SCoor},
+
+ %% Processes
+ {plugins, Plugins},
+ {other_proc, lists:max([0, OtherProc])}, %% [1]
+
+ %% Metrics
+ {metrics, MetricsETS + MetricsProc},
+ {mgmt_db, MgmtDbETS + MgmtDbProc},
+
+ %% ETS
+ {mnesia, MnesiaETS},
+ {quorum_ets, QuorumETS},
+ {other_ets, ETS - MnesiaETS - MetricsETS - MgmtDbETS - MsgIndexETS - QuorumETS},
+
+ %% Messages (mostly, some binaries are not messages)
+ {binary, Bin},
+ {msg_index, MsgIndexETS + MsgIndexProc},
+
+ %% System
+ {code, Code},
+ {atom, Atom},
+ {other_system, System - ETS - Bin - Code - Atom},
+ {allocated_unused, AllocatedUnused},
+ {reserved_unallocated, OSReserved},
+ {strategy, Strategy},
+ {total, [{erlang, ErlangTotal},
+ {rss, Rss},
+ {allocated, Allocated}]}
+ ].
+%% [1] - erlang:memory(processes) can be less than the sum of its
+%% parts. Rather than display something nonsensical, just silence any
+%% claims about negative memory. See
+%% http://erlang.org/pipermail/erlang-questions/2012-September/069320.html
+
+-spec binary() -> rabbit_types:infos().
+
+binary() ->
+ All = interesting_sups(),
+ {Sums, Rest} =
+ sum_processes(
+ lists:append(All),
+ fun (binary, Info, Acc) ->
+ lists:foldl(fun ({Ptr, Sz, _RefCnt}, Acc0) ->
+ sets:add_element({Ptr, Sz}, Acc0)
+ end, Acc, Info)
+ end, distinguishers(), [{binary, sets:new()}]),
+ [Other, Qs, QsSlave, Qqs, Ssqs, Srqs, Scoor, ConnsReader, ConnsWriter,
+ ConnsChannel, ConnsOther, MsgIndexProc, MgmtDbProc, Plugins] =
+ [aggregate(Names, [{other, Rest} | Sums], binary, fun sum_binary/1)
+ || Names <- [[other] | distinguished_interesting_sups()]],
+ [{connection_readers, ConnsReader},
+ {connection_writers, ConnsWriter},
+ {connection_channels, ConnsChannel},
+ {connection_other, ConnsOther},
+ {queue_procs, Qs},
+ {queue_slave_procs, QsSlave},
+ {quorum_queue_procs, Qqs},
+ {stream_queue_procs, Ssqs},
+ {stream_queue_replica_reader_procs, Srqs},
+ {stream_queue_coordinator_procs, Scoor},
+ {plugins, Plugins},
+ {mgmt_db, MgmtDbProc},
+ {msg_index, MsgIndexProc},
+ {other, Other}].
+
+%%----------------------------------------------------------------------------
+
+mnesia_memory() ->
+ case mnesia:system_info(is_running) of
+ yes -> lists:sum([bytes(mnesia:table_info(Tab, memory)) ||
+ Tab <- mnesia:system_info(tables)]);
+ _ -> 0
+ end.
+
+ets_memory(Owners) ->
+ lists:sum([V || {_K, V} <- ets_tables_memory(Owners)]).
+
+-spec ets_tables_memory(Owners) -> rabbit_types:infos()
+ when Owners :: all | OwnerProcessName | [OwnerProcessName],
+ OwnerProcessName :: atom().
+
+ets_tables_memory(all) ->
+ [{ets:info(T, name), bytes(ets:info(T, memory))}
+ || T <- ets:all(),
+ is_atom(T)];
+ets_tables_memory(OwnerName) when is_atom(OwnerName) ->
+ ets_tables_memory([OwnerName]);
+ets_tables_memory(Owners) when is_list(Owners) ->
+ OwnerPids = lists:map(fun(O) when is_pid(O) -> O;
+ (O) when is_atom(O) -> whereis(O)
+ end,
+ Owners),
+ [{ets:info(T, name), bytes(ets:info(T, memory))}
+ || T <- ets:all(),
+ lists:member(ets:info(T, owner), OwnerPids)].
+
+bytes(Words) -> try
+ Words * erlang:system_info(wordsize)
+ catch
+ _:_ -> 0
+ end.
+
+interesting_sups() ->
+ [queue_sups(), quorum_sups(), stream_server_sups(), stream_reader_sups(),
+ conn_sups() | interesting_sups0()].
+
+queue_sups() ->
+ all_vhosts_children(rabbit_amqqueue_sup_sup).
+
+quorum_sups() ->
+ %% TODO: in the future not all ra servers may be queues and we needs
+ %% some way to filter this
+ case whereis(ra_server_sup_sup) of
+ undefined ->
+ [];
+ _ ->
+ [Pid || {_, Pid, _, _} <-
+ supervisor:which_children(ra_server_sup_sup)]
+ end.
+
+stream_server_sups() -> [osiris_server_sup].
+stream_reader_sups() -> [osiris_replica_reader_sup].
+
+msg_stores() ->
+ all_vhosts_children(msg_store_transient)
+ ++
+ all_vhosts_children(msg_store_persistent).
+
+all_vhosts_children(Name) ->
+ case whereis(rabbit_vhost_sup_sup) of
+ undefined -> [];
+ Pid when is_pid(Pid) ->
+ lists:filtermap(
+ fun({_, VHostSupWrapper, _, _}) ->
+ case supervisor2:find_child(VHostSupWrapper,
+ rabbit_vhost_sup) of
+ [] -> false;
+ [VHostSup] ->
+ case supervisor2:find_child(VHostSup, Name) of
+ [QSup] -> {true, QSup};
+ [] -> false
+ end
+ end
+ end,
+ supervisor:which_children(rabbit_vhost_sup_sup))
+ end.
+
+interesting_sups0() ->
+ MsgIndexProcs = msg_stores(),
+ MgmtDbProcs = [rabbit_mgmt_sup_sup],
+ PluginProcs = plugin_sups(),
+ [MsgIndexProcs, MgmtDbProcs, PluginProcs].
+
+conn_sups() ->
+ Ranches = lists:flatten(ranch_server_sups()),
+ [amqp_sup|Ranches].
+
+ranch_server_sups() ->
+ try
+ ets:match(ranch_server, {{conns_sup, '_'}, '$1'})
+ catch
+ %% Ranch ETS table doesn't exist yet
+ error:badarg -> []
+ end.
+
+with(Sups, With) -> [{Sup, With} || Sup <- Sups].
+
+distinguishers() -> with(queue_sups(), fun queue_type/1) ++
+ with(conn_sups(), fun conn_type/1) ++
+ with(quorum_sups(), fun ra_type/1).
+
+distinguished_interesting_sups() ->
+ [
+ with(queue_sups(), master),
+ with(queue_sups(), slave),
+ with(quorum_sups(), quorum),
+ stream_server_sups(),
+ stream_reader_sups(),
+ with(quorum_sups(), stream),
+ with(conn_sups(), reader),
+ with(conn_sups(), writer),
+ with(conn_sups(), channel),
+ with(conn_sups(), other)]
+ ++ interesting_sups0().
+
+plugin_sups() ->
+ lists:append([plugin_sup(App) ||
+ {App, _, _} <- rabbit_misc:which_applications(),
+ is_plugin(atom_to_list(App))]).
+
+plugin_sup(App) ->
+ case application_controller:get_master(App) of
+ undefined -> [];
+ Master -> case application_master:get_child(Master) of
+ {Pid, _} when is_pid(Pid) -> [process_name(Pid)];
+ Pid when is_pid(Pid) -> [process_name(Pid)];
+ _ -> []
+ end
+ end.
+
+process_name(Pid) ->
+ case process_info(Pid, registered_name) of
+ {registered_name, Name} -> Name;
+ _ -> Pid
+ end.
+
+is_plugin("rabbitmq_" ++ _) -> true;
+is_plugin(App) -> lists:member(App, ?MAGIC_PLUGINS).
+
+aggregate(Names, Sums, Key, Fun) ->
+ lists:sum([extract(Name, Sums, Key, Fun) || Name <- Names]).
+
+extract(Name, Sums, Key, Fun) ->
+ case keyfind(Name, Sums) of
+ {value, Accs} -> Fun(keyfetch(Key, Accs));
+ false -> 0
+ end.
+
+sum_binary(Set) ->
+ sets:fold(fun({_Pt, Sz}, Acc) -> Acc + Sz end, 0, Set).
+
+queue_type(PDict) ->
+ case keyfind(process_name, PDict) of
+ {value, {rabbit_mirror_queue_slave, _}} -> slave;
+ _ -> master
+ end.
+
+conn_type(PDict) ->
+ case keyfind(process_name, PDict) of
+ {value, {rabbit_reader, _}} -> reader;
+ {value, {rabbit_writer, _}} -> writer;
+ {value, {rabbit_channel, _}} -> channel;
+ _ -> other
+ end.
+
+ra_type(PDict) ->
+ case keyfind('$rabbit_vm_category', PDict) of
+ {value, rabbit_stream_coordinator} -> stream;
+ _ -> quorum
+ end.
+
+%%----------------------------------------------------------------------------
+
+%% NB: this code is non-rabbit specific.
+
+-type process() :: pid() | atom().
+-type info_key() :: atom().
+-type info_value() :: any().
+-type info_item() :: {info_key(), info_value()}.
+-type accumulate() :: fun ((info_key(), info_value(), info_value()) ->
+ info_value()).
+-type distinguisher() :: fun (([{term(), term()}]) -> atom()).
+-type distinguishers() :: [{info_key(), distinguisher()}].
+-spec sum_processes([process()], distinguishers(), [info_key()]) ->
+ {[{process(), [info_item()]}], [info_item()]}.
+-spec sum_processes([process()], accumulate(), distinguishers(),
+ [info_item()]) ->
+ {[{process(), [info_item()]}], [info_item()]}.
+
+sum_processes(Names, Distinguishers, Items) ->
+ sum_processes(Names, fun (_, X, Y) -> X + Y end, Distinguishers,
+ [{Item, 0} || Item <- Items]).
+
+%% summarize the process_info of all processes based on their
+%% '$ancestor' hierarchy, recorded in their process dictionary.
+%%
+%% The function takes
+%%
+%% 1) a list of names/pids of processes that are accumulation points
+%% in the hierarchy.
+%%
+%% 2) a function that aggregates individual info items -taking the
+%% info item key, value and accumulated value as the input and
+%% producing a new accumulated value.
+%%
+%% 3) a list of info item key / initial accumulator value pairs.
+%%
+%% The process_info of a process is accumulated at the nearest of its
+%% ancestors that is mentioned in the first argument, or, if no such
+%% ancestor exists or the ancestor information is absent, in a special
+%% 'other' bucket.
+%%
+%% The result is a pair consisting of
+%%
+%% 1) a k/v list, containing for each of the accumulation names/pids a
+%% list of info items, containing the accumulated data, and
+%%
+%% 2) the 'other' bucket - a list of info items containing the
+%% accumulated data of all processes with no matching ancestors
+%%
+%% Note that this function operates on names as well as pids, but
+%% these must match whatever is contained in the '$ancestor' process
+%% dictionary entry. Generally that means for all registered processes
+%% the name should be used.
+sum_processes(Names, Fun, Distinguishers, Acc0) ->
+ Items = [Item || {Item, _Blank0} <- Acc0],
+ {NameAccs, OtherAcc} =
+ lists:foldl(
+ fun (Pid, Acc) ->
+ InfoItems = [registered_name, dictionary | Items],
+ case process_info(Pid, InfoItems) of
+ undefined ->
+ Acc;
+ [{registered_name, RegName}, {dictionary, D} | Vals] ->
+ %% see docs for process_info/2 for the
+ %% special handling of 'registered_name'
+ %% info items
+ Extra = case RegName of
+ [] -> [];
+ N -> [N]
+ end,
+ Name0 = find_ancestor(Extra, D, Names),
+ Name = case keyfind(Name0, Distinguishers) of
+ {value, DistFun} -> {Name0, DistFun(D)};
+ false -> Name0
+ end,
+ accumulate(
+ Name, Fun, orddict:from_list(Vals), Acc, Acc0)
+ end
+ end, {orddict:new(), Acc0}, processes()),
+ %% these conversions aren't strictly necessary; we do them simply
+ %% for the sake of encapsulating the representation.
+ {[{Name, orddict:to_list(Accs)} ||
+ {Name, Accs} <- orddict:to_list(NameAccs)],
+ orddict:to_list(OtherAcc)}.
+
+find_ancestor(Extra, D, Names) ->
+ Ancestors = case keyfind('$ancestors', D) of
+ {value, Ancs} -> Ancs;
+ false -> []
+ end,
+ case lists:splitwith(fun (A) -> not lists:member(A, Names) end,
+ Extra ++ Ancestors) of
+ {_, []} -> undefined;
+ {_, [Name | _]} -> Name
+ end.
+
+accumulate(undefined, Fun, ValsDict, {NameAccs, OtherAcc}, _Acc0) ->
+ {NameAccs, orddict:merge(Fun, ValsDict, OtherAcc)};
+accumulate(Name, Fun, ValsDict, {NameAccs, OtherAcc}, Acc0) ->
+ F = fun (NameAcc) -> orddict:merge(Fun, ValsDict, NameAcc) end,
+ {case orddict:is_key(Name, NameAccs) of
+ true -> orddict:update(Name, F, NameAccs);
+ false -> orddict:store( Name, F(Acc0), NameAccs)
+ end, OtherAcc}.
+
+keyfetch(K, L) -> {value, {_, V}} = lists:keysearch(K, 1, L),
+ V.
+
+keyfind(K, L) -> case lists:keysearch(K, 1, L) of
+ {value, {_, V}} -> {value, V};
+ false -> false
+ end.