diff options
Diffstat (limited to 'deps/rabbit/src/rabbit_vm.erl')
-rw-r--r-- | deps/rabbit/src/rabbit_vm.erl | 427 |
1 files changed, 427 insertions, 0 deletions
diff --git a/deps/rabbit/src/rabbit_vm.erl b/deps/rabbit/src/rabbit_vm.erl new file mode 100644 index 0000000000..b014e090c5 --- /dev/null +++ b/deps/rabbit/src/rabbit_vm.erl @@ -0,0 +1,427 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_vm). + +-export([memory/0, binary/0, ets_tables_memory/1]). + +-define(MAGIC_PLUGINS, ["cowboy", "ranch", "sockjs"]). + +%%---------------------------------------------------------------------------- + +-spec memory() -> rabbit_types:infos(). + +memory() -> + All = interesting_sups(), + {Sums, _Other} = sum_processes( + lists:append(All), distinguishers(), [memory]), + + [Qs, QsSlave, Qqs, Ssqs, Srqs, SCoor, ConnsReader, ConnsWriter, ConnsChannel, + ConnsOther, MsgIndexProc, MgmtDbProc, Plugins] = + [aggregate(Names, Sums, memory, fun (X) -> X end) + || Names <- distinguished_interesting_sups()], + + MnesiaETS = mnesia_memory(), + MsgIndexETS = ets_memory(msg_stores()), + MetricsETS = ets_memory([rabbit_metrics]), + QuorumETS = ets_memory([ra_log_ets]), + MetricsProc = try + [{_, M}] = process_info(whereis(rabbit_metrics), [memory]), + M + catch + error:badarg -> + 0 + end, + MgmtDbETS = ets_memory([rabbit_mgmt_storage]), + [{total, ErlangTotal}, + {processes, Processes}, + {ets, ETS}, + {atom, Atom}, + {binary, Bin}, + {code, Code}, + {system, System}] = + erlang:memory([total, processes, ets, atom, binary, code, system]), + + Strategy = vm_memory_monitor:get_memory_calculation_strategy(), + Allocated = recon_alloc:memory(allocated), + Rss = vm_memory_monitor:get_rss_memory(), + + AllocatedUnused = max(Allocated - ErlangTotal, 0), + OSReserved = max(Rss - Allocated, 0), + + OtherProc = Processes + - ConnsReader - ConnsWriter - ConnsChannel - ConnsOther + - Qs - QsSlave - Qqs - Ssqs - Srqs - SCoor - MsgIndexProc - Plugins + - MgmtDbProc - MetricsProc, + + [ + %% Connections + {connection_readers, ConnsReader}, + {connection_writers, ConnsWriter}, + {connection_channels, ConnsChannel}, + {connection_other, ConnsOther}, + + %% Queues + {queue_procs, Qs}, + {queue_slave_procs, QsSlave}, + {quorum_queue_procs, Qqs}, + {stream_queue_procs, Ssqs}, + {stream_queue_replica_reader_procs, Srqs}, + {stream_queue_coordinator_procs, SCoor}, + + %% Processes + {plugins, Plugins}, + {other_proc, lists:max([0, OtherProc])}, %% [1] + + %% Metrics + {metrics, MetricsETS + MetricsProc}, + {mgmt_db, MgmtDbETS + MgmtDbProc}, + + %% ETS + {mnesia, MnesiaETS}, + {quorum_ets, QuorumETS}, + {other_ets, ETS - MnesiaETS - MetricsETS - MgmtDbETS - MsgIndexETS - QuorumETS}, + + %% Messages (mostly, some binaries are not messages) + {binary, Bin}, + {msg_index, MsgIndexETS + MsgIndexProc}, + + %% System + {code, Code}, + {atom, Atom}, + {other_system, System - ETS - Bin - Code - Atom}, + {allocated_unused, AllocatedUnused}, + {reserved_unallocated, OSReserved}, + {strategy, Strategy}, + {total, [{erlang, ErlangTotal}, + {rss, Rss}, + {allocated, Allocated}]} + ]. +%% [1] - erlang:memory(processes) can be less than the sum of its +%% parts. Rather than display something nonsensical, just silence any +%% claims about negative memory. See +%% http://erlang.org/pipermail/erlang-questions/2012-September/069320.html + +-spec binary() -> rabbit_types:infos(). + +binary() -> + All = interesting_sups(), + {Sums, Rest} = + sum_processes( + lists:append(All), + fun (binary, Info, Acc) -> + lists:foldl(fun ({Ptr, Sz, _RefCnt}, Acc0) -> + sets:add_element({Ptr, Sz}, Acc0) + end, Acc, Info) + end, distinguishers(), [{binary, sets:new()}]), + [Other, Qs, QsSlave, Qqs, Ssqs, Srqs, Scoor, ConnsReader, ConnsWriter, + ConnsChannel, ConnsOther, MsgIndexProc, MgmtDbProc, Plugins] = + [aggregate(Names, [{other, Rest} | Sums], binary, fun sum_binary/1) + || Names <- [[other] | distinguished_interesting_sups()]], + [{connection_readers, ConnsReader}, + {connection_writers, ConnsWriter}, + {connection_channels, ConnsChannel}, + {connection_other, ConnsOther}, + {queue_procs, Qs}, + {queue_slave_procs, QsSlave}, + {quorum_queue_procs, Qqs}, + {stream_queue_procs, Ssqs}, + {stream_queue_replica_reader_procs, Srqs}, + {stream_queue_coordinator_procs, Scoor}, + {plugins, Plugins}, + {mgmt_db, MgmtDbProc}, + {msg_index, MsgIndexProc}, + {other, Other}]. + +%%---------------------------------------------------------------------------- + +mnesia_memory() -> + case mnesia:system_info(is_running) of + yes -> lists:sum([bytes(mnesia:table_info(Tab, memory)) || + Tab <- mnesia:system_info(tables)]); + _ -> 0 + end. + +ets_memory(Owners) -> + lists:sum([V || {_K, V} <- ets_tables_memory(Owners)]). + +-spec ets_tables_memory(Owners) -> rabbit_types:infos() + when Owners :: all | OwnerProcessName | [OwnerProcessName], + OwnerProcessName :: atom(). + +ets_tables_memory(all) -> + [{ets:info(T, name), bytes(ets:info(T, memory))} + || T <- ets:all(), + is_atom(T)]; +ets_tables_memory(OwnerName) when is_atom(OwnerName) -> + ets_tables_memory([OwnerName]); +ets_tables_memory(Owners) when is_list(Owners) -> + OwnerPids = lists:map(fun(O) when is_pid(O) -> O; + (O) when is_atom(O) -> whereis(O) + end, + Owners), + [{ets:info(T, name), bytes(ets:info(T, memory))} + || T <- ets:all(), + lists:member(ets:info(T, owner), OwnerPids)]. + +bytes(Words) -> try + Words * erlang:system_info(wordsize) + catch + _:_ -> 0 + end. + +interesting_sups() -> + [queue_sups(), quorum_sups(), stream_server_sups(), stream_reader_sups(), + conn_sups() | interesting_sups0()]. + +queue_sups() -> + all_vhosts_children(rabbit_amqqueue_sup_sup). + +quorum_sups() -> + %% TODO: in the future not all ra servers may be queues and we needs + %% some way to filter this + case whereis(ra_server_sup_sup) of + undefined -> + []; + _ -> + [Pid || {_, Pid, _, _} <- + supervisor:which_children(ra_server_sup_sup)] + end. + +stream_server_sups() -> [osiris_server_sup]. +stream_reader_sups() -> [osiris_replica_reader_sup]. + +msg_stores() -> + all_vhosts_children(msg_store_transient) + ++ + all_vhosts_children(msg_store_persistent). + +all_vhosts_children(Name) -> + case whereis(rabbit_vhost_sup_sup) of + undefined -> []; + Pid when is_pid(Pid) -> + lists:filtermap( + fun({_, VHostSupWrapper, _, _}) -> + case supervisor2:find_child(VHostSupWrapper, + rabbit_vhost_sup) of + [] -> false; + [VHostSup] -> + case supervisor2:find_child(VHostSup, Name) of + [QSup] -> {true, QSup}; + [] -> false + end + end + end, + supervisor:which_children(rabbit_vhost_sup_sup)) + end. + +interesting_sups0() -> + MsgIndexProcs = msg_stores(), + MgmtDbProcs = [rabbit_mgmt_sup_sup], + PluginProcs = plugin_sups(), + [MsgIndexProcs, MgmtDbProcs, PluginProcs]. + +conn_sups() -> + Ranches = lists:flatten(ranch_server_sups()), + [amqp_sup|Ranches]. + +ranch_server_sups() -> + try + ets:match(ranch_server, {{conns_sup, '_'}, '$1'}) + catch + %% Ranch ETS table doesn't exist yet + error:badarg -> [] + end. + +with(Sups, With) -> [{Sup, With} || Sup <- Sups]. + +distinguishers() -> with(queue_sups(), fun queue_type/1) ++ + with(conn_sups(), fun conn_type/1) ++ + with(quorum_sups(), fun ra_type/1). + +distinguished_interesting_sups() -> + [ + with(queue_sups(), master), + with(queue_sups(), slave), + with(quorum_sups(), quorum), + stream_server_sups(), + stream_reader_sups(), + with(quorum_sups(), stream), + with(conn_sups(), reader), + with(conn_sups(), writer), + with(conn_sups(), channel), + with(conn_sups(), other)] + ++ interesting_sups0(). + +plugin_sups() -> + lists:append([plugin_sup(App) || + {App, _, _} <- rabbit_misc:which_applications(), + is_plugin(atom_to_list(App))]). + +plugin_sup(App) -> + case application_controller:get_master(App) of + undefined -> []; + Master -> case application_master:get_child(Master) of + {Pid, _} when is_pid(Pid) -> [process_name(Pid)]; + Pid when is_pid(Pid) -> [process_name(Pid)]; + _ -> [] + end + end. + +process_name(Pid) -> + case process_info(Pid, registered_name) of + {registered_name, Name} -> Name; + _ -> Pid + end. + +is_plugin("rabbitmq_" ++ _) -> true; +is_plugin(App) -> lists:member(App, ?MAGIC_PLUGINS). + +aggregate(Names, Sums, Key, Fun) -> + lists:sum([extract(Name, Sums, Key, Fun) || Name <- Names]). + +extract(Name, Sums, Key, Fun) -> + case keyfind(Name, Sums) of + {value, Accs} -> Fun(keyfetch(Key, Accs)); + false -> 0 + end. + +sum_binary(Set) -> + sets:fold(fun({_Pt, Sz}, Acc) -> Acc + Sz end, 0, Set). + +queue_type(PDict) -> + case keyfind(process_name, PDict) of + {value, {rabbit_mirror_queue_slave, _}} -> slave; + _ -> master + end. + +conn_type(PDict) -> + case keyfind(process_name, PDict) of + {value, {rabbit_reader, _}} -> reader; + {value, {rabbit_writer, _}} -> writer; + {value, {rabbit_channel, _}} -> channel; + _ -> other + end. + +ra_type(PDict) -> + case keyfind('$rabbit_vm_category', PDict) of + {value, rabbit_stream_coordinator} -> stream; + _ -> quorum + end. + +%%---------------------------------------------------------------------------- + +%% NB: this code is non-rabbit specific. + +-type process() :: pid() | atom(). +-type info_key() :: atom(). +-type info_value() :: any(). +-type info_item() :: {info_key(), info_value()}. +-type accumulate() :: fun ((info_key(), info_value(), info_value()) -> + info_value()). +-type distinguisher() :: fun (([{term(), term()}]) -> atom()). +-type distinguishers() :: [{info_key(), distinguisher()}]. +-spec sum_processes([process()], distinguishers(), [info_key()]) -> + {[{process(), [info_item()]}], [info_item()]}. +-spec sum_processes([process()], accumulate(), distinguishers(), + [info_item()]) -> + {[{process(), [info_item()]}], [info_item()]}. + +sum_processes(Names, Distinguishers, Items) -> + sum_processes(Names, fun (_, X, Y) -> X + Y end, Distinguishers, + [{Item, 0} || Item <- Items]). + +%% summarize the process_info of all processes based on their +%% '$ancestor' hierarchy, recorded in their process dictionary. +%% +%% The function takes +%% +%% 1) a list of names/pids of processes that are accumulation points +%% in the hierarchy. +%% +%% 2) a function that aggregates individual info items -taking the +%% info item key, value and accumulated value as the input and +%% producing a new accumulated value. +%% +%% 3) a list of info item key / initial accumulator value pairs. +%% +%% The process_info of a process is accumulated at the nearest of its +%% ancestors that is mentioned in the first argument, or, if no such +%% ancestor exists or the ancestor information is absent, in a special +%% 'other' bucket. +%% +%% The result is a pair consisting of +%% +%% 1) a k/v list, containing for each of the accumulation names/pids a +%% list of info items, containing the accumulated data, and +%% +%% 2) the 'other' bucket - a list of info items containing the +%% accumulated data of all processes with no matching ancestors +%% +%% Note that this function operates on names as well as pids, but +%% these must match whatever is contained in the '$ancestor' process +%% dictionary entry. Generally that means for all registered processes +%% the name should be used. +sum_processes(Names, Fun, Distinguishers, Acc0) -> + Items = [Item || {Item, _Blank0} <- Acc0], + {NameAccs, OtherAcc} = + lists:foldl( + fun (Pid, Acc) -> + InfoItems = [registered_name, dictionary | Items], + case process_info(Pid, InfoItems) of + undefined -> + Acc; + [{registered_name, RegName}, {dictionary, D} | Vals] -> + %% see docs for process_info/2 for the + %% special handling of 'registered_name' + %% info items + Extra = case RegName of + [] -> []; + N -> [N] + end, + Name0 = find_ancestor(Extra, D, Names), + Name = case keyfind(Name0, Distinguishers) of + {value, DistFun} -> {Name0, DistFun(D)}; + false -> Name0 + end, + accumulate( + Name, Fun, orddict:from_list(Vals), Acc, Acc0) + end + end, {orddict:new(), Acc0}, processes()), + %% these conversions aren't strictly necessary; we do them simply + %% for the sake of encapsulating the representation. + {[{Name, orddict:to_list(Accs)} || + {Name, Accs} <- orddict:to_list(NameAccs)], + orddict:to_list(OtherAcc)}. + +find_ancestor(Extra, D, Names) -> + Ancestors = case keyfind('$ancestors', D) of + {value, Ancs} -> Ancs; + false -> [] + end, + case lists:splitwith(fun (A) -> not lists:member(A, Names) end, + Extra ++ Ancestors) of + {_, []} -> undefined; + {_, [Name | _]} -> Name + end. + +accumulate(undefined, Fun, ValsDict, {NameAccs, OtherAcc}, _Acc0) -> + {NameAccs, orddict:merge(Fun, ValsDict, OtherAcc)}; +accumulate(Name, Fun, ValsDict, {NameAccs, OtherAcc}, Acc0) -> + F = fun (NameAcc) -> orddict:merge(Fun, ValsDict, NameAcc) end, + {case orddict:is_key(Name, NameAccs) of + true -> orddict:update(Name, F, NameAccs); + false -> orddict:store( Name, F(Acc0), NameAccs) + end, OtherAcc}. + +keyfetch(K, L) -> {value, {_, V}} = lists:keysearch(K, 1, L), + V. + +keyfind(K, L) -> case lists:keysearch(K, 1, L) of + {value, {_, V}} -> {value, V}; + false -> false + end. |