diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_core_metrics_gc.erl | 164 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 5 |
3 files changed, 174 insertions, 1 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 8498e54c09..1f8181dafd 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -213,6 +213,12 @@ [background_gc]}}, {enables, networking}]}). +-rabbit_boot_step({rabbit_core_metrics_gc, + [{description, "background core metrics garbage collection"}, + {mfa, {rabbit_sup, start_restartable_child, + [rabbit_core_metrics_gc]}}, + {enables, networking}]}). + %%--------------------------------------------------------------------------- -include("rabbit_framing.hrl"). diff --git a/src/rabbit_core_metrics_gc.erl b/src/rabbit_core_metrics_gc.erl new file mode 100644 index 0000000000..e7f848bbc3 --- /dev/null +++ b/src/rabbit_core_metrics_gc.erl @@ -0,0 +1,164 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved. +%% +-module(rabbit_core_metrics_gc). + +-record(state, {timer, + interval + }). + +-spec start_link() -> rabbit_types:ok_pid_or_error(). + +-export([start_link/0]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +init(_) -> + Interval = rabbit_misc:get_env(rabbit, core_metrics_gc_interval, 120000), + {ok, start_timer(#state{interval = Interval})}. + +handle_call(test, _From, State) -> + {reply, ok, State}. + +handle_cast(_Request, State) -> + {noreply, State}. + +handle_info(start_gc, State) -> + gc_connections(), + gc_channels(), + gc_queues(), + gc_exchanges(), + gc_nodes(), + gc_gen_server2(), + {noreply, start_timer(State)}. + +terminate(_Reason, #state{timer = TRef}) -> + erlang:cancel_timer(TRef), + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +start_timer(#state{interval = Interval} = St) -> + TRef = erlang:send_after(Interval, self(), start_gc), + St#state{timer = TRef}. + +gc_connections() -> + gc_process(connection_created), + gc_process(connection_metrics), + gc_process(connection_coarse_metrics). + +gc_channels() -> + %% TODO channel stats + gc_process(channel_created), + gc_process(channel_metrics), + gc_process(channel_process_metrics), + ok. + +gc_queues() -> + Queues = rabbit_amqqueue:list_names(), + GbSet = gb_sets:from_list(Queues), + gc_entity(queue_metrics, GbSet), + gc_entity(queue_coarse_metrics, GbSet), + gc_process_and_entity(channel_queue_metrics, GbSet), + gc_process_and_entity(consumer_created, GbSet), + ExchangeGbSet = gb_sets:from_list(rabbit_exchange:list_names()), + gc_process_and_entities(channel_queue_exchange_metrics, GbSet, ExchangeGbSet). + +gc_exchanges() -> + Exchanges = rabbit_exchange:list_names(), + GbSet = gb_sets:from_list(Exchanges), + gc_process_and_entity(channel_exchange_metrics, GbSet). + +gc_nodes() -> + Nodes = rabbit_mnesia:cluster_nodes(all), + GbSet = gb_sets:from_list(Nodes), + gc_entity(node_node_metrics, GbSet). + +gc_gen_server2() -> + gc_process(gen_server2_metrics). + +gc_process(Table) -> + ets:foldl(fun({Pid = Key, _}, none) -> + gc_process(Pid, Table, Key); + ({Pid = Key, _, _, _}, none) -> + gc_process(Pid, Table, Key) + end, none, Table). + +gc_process(Pid, Table, Key) -> + case erlang:is_process_alive(Pid) of + true -> + none; + false -> + %% TODO catch? + ets:delete(Table, Key), + none + end. + +gc_entity(Table, GbSet) -> + ets:foldl(fun({{_, Id} = Key, _}, none) -> + gc_entity(Id, Table, Key, GbSet); + ({Id = Key, _}, none) -> + gc_entity(Id, Table, Key, GbSet); + ({Id = Key, _, _, _, _}, none) -> + gc_entity(Id, Table, Key, GbSet) + end, none, Table). + +gc_entity(Id, Table, Key, GbSet) -> + case gb_sets:is_member(Id, GbSet) of + true -> + none; + false -> + %% TODO catch? + ets:delete(Table, Key), + none + end. + +gc_process_and_entity(Table, GbSet) -> + ets:foldl(fun({{Pid, Id} = Key, _, _, _, _, _, _}, none) + when Table == channel_queue_metrics -> + gc_entity(Id, Table, Key, GbSet), + gc_process(Pid, Table, Key); + ({{Pid, Id} = Key, _, _, _}, none) + when Table == channel_exchange_metrics -> + gc_entity(Id, Table, Key, GbSet), + gc_process(Pid, Table, Key); + ({{Id, Pid, _} = Key, _, _, _, _}, none) + when Table == consumer_created -> + gc_entity(Id, Table, Key, GbSet), + gc_process(Pid, Table, Key); + ({{{Pid, Id}, _} = Key, _, _, _, _}, none) -> + gc_process_and_entity(Id, Pid, Table, Key, GbSet) + end, none, Table). + +gc_process_and_entity(Id, Pid, Table, Key, GbSet) -> + case erlang:is_process_alive(Pid) orelse gb_sets:is_member(Id, GbSet) of + true -> + none; + false -> + %% TODO catch? + ets:delete(Table, Key), + none + end. + +gc_process_and_entities(Table, QueueGbSet, ExchangeGbSet) -> + ets:foldl(fun({{Pid, {Q, X}} = Key, _}, none) -> + gc_process(Pid, Table, Key), + gc_entity(Q, Table, Key, QueueGbSet), + gc_entity(X, Table, Key, ExchangeGbSet) + end, none, Table). diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 85f061a17b..7a3f194e01 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -23,7 +23,7 @@ lookup/1, lookup_or_die/1, list/0, list/1, lookup_scratch/2, update_scratch/3, update_decorators/1, immutable/1, info_keys/0, info/1, info/2, info_all/1, info_all/2, info_all/4, - route/2, delete/3, validate_binding/2]). +-export([list_names/0]). %% these must be run inside a mnesia tx -export([maybe_auto_delete/2, serial/1, peek_serial/1, update/2]). @@ -61,6 +61,7 @@ (name()) -> rabbit_types:exchange() | rabbit_types:channel_exit(). -spec list() -> [rabbit_types:exchange()]. +-spec list_names() -> [rabbit_exchange:name()]. -spec list(rabbit_types:vhost()) -> [rabbit_types:exchange()]. -spec lookup_scratch(name(), atom()) -> rabbit_types:ok(term()) | @@ -261,6 +262,8 @@ lookup_or_die(Name) -> list() -> mnesia:dirty_match_object(rabbit_exchange, #exchange{_ = '_'}). +list_names() -> mnesia:dirty_all_keys(rabbit_exchange). + %% Not dirty_match_object since that would not be transactional when used in a %% tx context list(VHostPath) -> |
