diff options
| -rw-r--r-- | src/rabbit.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_core_metrics_gc.erl | 163 |
2 files changed, 169 insertions, 0 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index f66f26c4c8..9e6bcc3940 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -194,6 +194,12 @@ [background_gc]}}, {enables, networking}]}). +-rabbit_boot_step({rabbit_core_metrics_gc, + [{description, "background core metrics garbage collection"}, + {mfa, {rabbit_sup, start_restartable_child, + [rabbit_core_metrics_gc]}}, + {enables, networking}]}). + %%--------------------------------------------------------------------------- -include("rabbit_framing.hrl"). diff --git a/src/rabbit_core_metrics_gc.erl b/src/rabbit_core_metrics_gc.erl new file mode 100644 index 0000000000..7c5e370805 --- /dev/null +++ b/src/rabbit_core_metrics_gc.erl @@ -0,0 +1,163 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved. +%% +-module(rabbit_core_metrics_gc). + +-record(state, {timer, + interval + }). + +-spec start_link() -> rabbit_types:ok_pid_or_error(). + +-export([start_link/0]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +init(_) -> + Interval = rabbit_misc:get_env(rabbit, core_metrics_gc_interval, 30000), + {ok, start_timer(#state{interval = Interval})}. + +handle_call(_Request, _From, State) -> + {noreply, State}. + +handle_cast(_Request, State) -> + {noreply, State}. + +handle_info(start_gc, State) -> + gc_connections(), + gc_channels(), + gc_queues(), + gc_exchanges(), + gc_nodes(), + {noreply, start_timer(State)}. + +terminate(_Reason, #state{timer = TRef}) -> + erlang:cancel_timer(TRef), + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +start_timer(#state{interval = Interval} = St) -> + TRef = erlang:send_after(Interval, self(), start_gc), + St#state{timer = TRef}. + +gc_connections() -> + gc_process(connection_created), + gc_process(connection_metrics), + gc_process(connection_coarse_metrics). + +gc_channels() -> + %% TODO channel stats + gc_process(channel_created), + gc_process(channel_metrics), + gc_process(channel_process_metrics), + ok. + +gc_queues() -> + Queues = rabbit_amqqueue:list_names(), + GbSet = gb_sets:from_list(Queues), + gc_entity(queue_metrics, GbSet), + gc_entity(queue_coarse_metrics, GbSet), + gc_process_and_entity(channel_queue_metrics, GbSet), + gc_process_and_entity(consumer_created, GbSet), + ExchangeGbSet = gb_sets:from_list(rabbit_amqqueue:list_names()), + gc_process_and_entities(channel_queue_exchange_metrics, GbSet, ExchangeGbSet). + +gc_exchanges() -> + Exchanges = rabbit_exchange:list_names(), + GbSet = gb_sets:from_list(Exchanges), + gc_process_and_entity(channel_exchange_metrics, GbSet). + +gc_nodes() -> + Nodes = rabbit_mnesia:cluster_nodes(all), + GbSet = gb_sets:from_list(Nodes), + gc_entity(node_persister_metrics, GbSet), + gc_entity(node_coarse_metrics, GbSet), + gc_entity(node_metrics, GbSet), + gc_entity(node_node_metrics, GbSet). + +gc_process(Table) -> + ets:foldl(fun({Pid = Key, _}, none) -> + gc_process(Pid, Table, Key); + ({Pid = Key, _, _, _}, none) -> + gc_process(Pid, Table, Key) + end, none, Table). + +gc_process(Pid, Table, Key) -> + case erlang:is_process_alive(Pid) of + true -> + none; + false -> + %% TODO catch? + ets:delete(Table, Key), + none + end. + +gc_entity(Table, GbSet) -> + ets:foldl(fun({{_, Id} = Key, _}, none) -> + gc_entity(Id, Table, Key, GbSet); + ({Id = Key, _}, none) -> + gc_entity(Id, Table, Key, GbSet); + ({Id = Key, _, _, _, _}, none) -> + gc_entity(Id, Table, Key, GbSet) + end, none, Table). + +gc_entity(Id, Table, Key, GbSet) -> + case gb_sets:is_member(Id, GbSet) of + true -> + none; + false -> + %% TODO catch? + ets:delete(Table, Key), + none + end. + +gc_process_and_entity(Table, GbSet) -> + ets:foldl(fun({{Pid, Id} = Key, _, _, _, _, _, _}, none) + when Table == channel_queue_metrics -> + gc_entity(Id, Table, Key, GbSet), + gc_process(Pid, Table, Key); + ({{Pid, Id} = Key, _, _, _}, none) + when Table == channel_exchange_metrics -> + gc_entity(Id, Table, Key, GbSet), + gc_process(Pid, Table, Key); + ({{Id, Pid, _} = Key, _, _, _, _}, none) + when Table == consumer_created -> + gc_entity(Id, Table, Key, GbSet), + gc_process(Pid, Table, Key); + ({{{Pid, Id}, _} = Key, _, _, _, _}, none) -> + gc_process_and_entity(Id, Pid, Table, Key, GbSet) + end, none, Table). + +gc_process_and_entity(Id, Pid, Table, Key, GbSet) -> + case erlang:is_process_alive(Pid) orelse gb_sets:is_member(Id, GbSet) of + true -> + none; + false -> + %% TODO catch? + ets:delete(Table, Key), + none + end. + +gc_process_and_entities(Table, QueueGbSet, ExchangeGbSet) -> + ets:foldl(fun({{Pid, {Q, X}} = Key, _}, none) -> + gc_process(Pid, Table, Key), + gc_entity(Q, Table, Key, QueueGbSet), + gc_entity(X, Table, Key, ExchangeGbSet) + end, none, Table). |
