diff options
| -rw-r--r-- | src/rabbit.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_core_metrics_gc.erl | 164 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 5 | ||||
| -rw-r--r-- | test/rabbit_core_metrics_gc_SUITE.erl | 301 |
4 files changed, 475 insertions, 1 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index f66f26c4c8..9e6bcc3940 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -194,6 +194,12 @@ [background_gc]}}, {enables, networking}]}). +-rabbit_boot_step({rabbit_core_metrics_gc, + [{description, "background core metrics garbage collection"}, + {mfa, {rabbit_sup, start_restartable_child, + [rabbit_core_metrics_gc]}}, + {enables, networking}]}). + %%--------------------------------------------------------------------------- -include("rabbit_framing.hrl"). diff --git a/src/rabbit_core_metrics_gc.erl b/src/rabbit_core_metrics_gc.erl new file mode 100644 index 0000000000..e7f848bbc3 --- /dev/null +++ b/src/rabbit_core_metrics_gc.erl @@ -0,0 +1,164 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved. +%% +-module(rabbit_core_metrics_gc). + +-record(state, {timer, + interval + }). + +-spec start_link() -> rabbit_types:ok_pid_or_error(). + +-export([start_link/0]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +init(_) -> + Interval = rabbit_misc:get_env(rabbit, core_metrics_gc_interval, 120000), + {ok, start_timer(#state{interval = Interval})}. + +handle_call(test, _From, State) -> + {reply, ok, State}. + +handle_cast(_Request, State) -> + {noreply, State}. + +handle_info(start_gc, State) -> + gc_connections(), + gc_channels(), + gc_queues(), + gc_exchanges(), + gc_nodes(), + gc_gen_server2(), + {noreply, start_timer(State)}. + +terminate(_Reason, #state{timer = TRef}) -> + erlang:cancel_timer(TRef), + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +start_timer(#state{interval = Interval} = St) -> + TRef = erlang:send_after(Interval, self(), start_gc), + St#state{timer = TRef}. + +gc_connections() -> + gc_process(connection_created), + gc_process(connection_metrics), + gc_process(connection_coarse_metrics). + +gc_channels() -> + %% TODO channel stats + gc_process(channel_created), + gc_process(channel_metrics), + gc_process(channel_process_metrics), + ok. + +gc_queues() -> + Queues = rabbit_amqqueue:list_names(), + GbSet = gb_sets:from_list(Queues), + gc_entity(queue_metrics, GbSet), + gc_entity(queue_coarse_metrics, GbSet), + gc_process_and_entity(channel_queue_metrics, GbSet), + gc_process_and_entity(consumer_created, GbSet), + ExchangeGbSet = gb_sets:from_list(rabbit_exchange:list_names()), + gc_process_and_entities(channel_queue_exchange_metrics, GbSet, ExchangeGbSet). + +gc_exchanges() -> + Exchanges = rabbit_exchange:list_names(), + GbSet = gb_sets:from_list(Exchanges), + gc_process_and_entity(channel_exchange_metrics, GbSet). + +gc_nodes() -> + Nodes = rabbit_mnesia:cluster_nodes(all), + GbSet = gb_sets:from_list(Nodes), + gc_entity(node_node_metrics, GbSet). + +gc_gen_server2() -> + gc_process(gen_server2_metrics). + +gc_process(Table) -> + ets:foldl(fun({Pid = Key, _}, none) -> + gc_process(Pid, Table, Key); + ({Pid = Key, _, _, _}, none) -> + gc_process(Pid, Table, Key) + end, none, Table). + +gc_process(Pid, Table, Key) -> + case erlang:is_process_alive(Pid) of + true -> + none; + false -> + %% TODO catch? + ets:delete(Table, Key), + none + end. + +gc_entity(Table, GbSet) -> + ets:foldl(fun({{_, Id} = Key, _}, none) -> + gc_entity(Id, Table, Key, GbSet); + ({Id = Key, _}, none) -> + gc_entity(Id, Table, Key, GbSet); + ({Id = Key, _, _, _, _}, none) -> + gc_entity(Id, Table, Key, GbSet) + end, none, Table). + +gc_entity(Id, Table, Key, GbSet) -> + case gb_sets:is_member(Id, GbSet) of + true -> + none; + false -> + %% TODO catch? + ets:delete(Table, Key), + none + end. + +gc_process_and_entity(Table, GbSet) -> + ets:foldl(fun({{Pid, Id} = Key, _, _, _, _, _, _}, none) + when Table == channel_queue_metrics -> + gc_entity(Id, Table, Key, GbSet), + gc_process(Pid, Table, Key); + ({{Pid, Id} = Key, _, _, _}, none) + when Table == channel_exchange_metrics -> + gc_entity(Id, Table, Key, GbSet), + gc_process(Pid, Table, Key); + ({{Id, Pid, _} = Key, _, _, _, _}, none) + when Table == consumer_created -> + gc_entity(Id, Table, Key, GbSet), + gc_process(Pid, Table, Key); + ({{{Pid, Id}, _} = Key, _, _, _, _}, none) -> + gc_process_and_entity(Id, Pid, Table, Key, GbSet) + end, none, Table). + +gc_process_and_entity(Id, Pid, Table, Key, GbSet) -> + case erlang:is_process_alive(Pid) orelse gb_sets:is_member(Id, GbSet) of + true -> + none; + false -> + %% TODO catch? + ets:delete(Table, Key), + none + end. + +gc_process_and_entities(Table, QueueGbSet, ExchangeGbSet) -> + ets:foldl(fun({{Pid, {Q, X}} = Key, _}, none) -> + gc_process(Pid, Table, Key), + gc_entity(Q, Table, Key, QueueGbSet), + gc_entity(X, Table, Key, ExchangeGbSet) + end, none, Table). diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index aaea27f91a..7697621c1a 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -23,7 +23,7 @@ lookup/1, lookup_or_die/1, list/0, list/1, lookup_scratch/2, update_scratch/3, update_decorators/1, immutable/1, info_keys/0, info/1, info/2, info_all/1, info_all/2, info_all/4, - route/2, delete/2, validate_binding/2]). + route/2, delete/2, validate_binding/2, list_names/0]). %% these must be run inside a mnesia tx -export([maybe_auto_delete/2, serial/1, peek_serial/1, update/2]). @@ -61,6 +61,7 @@ (name()) -> rabbit_types:exchange() | rabbit_types:channel_exit(). -spec list() -> [rabbit_types:exchange()]. +-spec list_names() -> [rabbit_exchange:name()]. -spec list(rabbit_types:vhost()) -> [rabbit_types:exchange()]. -spec lookup_scratch(name(), atom()) -> rabbit_types:ok(term()) | @@ -258,6 +259,8 @@ lookup_or_die(Name) -> list() -> mnesia:dirty_match_object(rabbit_exchange, #exchange{_ = '_'}). +list_names() -> mnesia:dirty_all_keys(rabbit_exchange). + %% Not dirty_match_object since that would not be transactional when used in a %% tx context list(VHostPath) -> diff --git a/test/rabbit_core_metrics_gc_SUITE.erl b/test/rabbit_core_metrics_gc_SUITE.erl new file mode 100644 index 0000000000..b7311463d5 --- /dev/null +++ b/test/rabbit_core_metrics_gc_SUITE.erl @@ -0,0 +1,301 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. +%% + +-module(rabbit_core_metrics_gc_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). + +-compile(export_all). + +all() -> + [ + {group, non_parallel_tests} + ]. + +groups() -> + [ + {non_parallel_tests, [], + [ queue_metrics, + connection_metrics, + channel_metrics, + node_metrics, + gen_server2_metrics + ] + } + ]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +merge_app_env(Config) -> + rabbit_ct_helpers:merge_app_env(Config, + {rabbit, [ + {core_metrics_gc_interval, 6000000}, + {collect_statistics_interval, 100}, + {collect_statistics, fine} + ]}). + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodename_suffix, ?MODULE} + ]), + rabbit_ct_helpers:run_setup_steps( + Config1, + [ fun merge_app_env/1 ] ++ rabbit_ct_broker_helpers:setup_steps()). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps( + Config, + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_group(_, Config) -> + Config. + +end_per_group(_, Config) -> + Config. + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase), + rabbit_ct_helpers:run_steps(Config, + rabbit_ct_client_helpers:setup_steps()). + +end_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_finished(Config, Testcase), + rabbit_ct_helpers:run_teardown_steps( + Config, + rabbit_ct_client_helpers:teardown_steps()). + +%% ------------------------------------------------------------------- +%% Testcases. +%% ------------------------------------------------------------------- + +queue_metrics(Config) -> + A = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, A), + + amqp_channel:call(Ch, #'queue.declare'{queue = <<"queue_metrics">>}), + amqp_channel:cast(Ch, #'basic.publish'{routing_key = <<"queue_metrics">>}, + #amqp_msg{payload = <<"hello">>}), + timer:sleep(150), + + Q = q(<<"myqueue">>), + + rabbit_ct_broker_helpers:rpc(Config, A, rabbit_core_metrics, queue_stats, + [Q, infos]), + rabbit_ct_broker_helpers:rpc(Config, A, rabbit_core_metrics, queue_stats, + [Q, 1, 1, 1, 1]), + + [_] = rabbit_ct_broker_helpers:rpc(Config, A, ets, lookup, + [queue_metrics, Q]), + [_] = rabbit_ct_broker_helpers:rpc(Config, A, ets, lookup, + [queue_coarse_metrics, Q]), + + %% Trigger gc. When the gen_server:call returns, the gc has already finished. + rabbit_ct_broker_helpers:rpc(Config, A, erlang, send, [rabbit_core_metrics_gc, start_gc]), + rabbit_ct_broker_helpers:rpc(Config, A, gen_server, call, [rabbit_core_metrics_gc, test]), + + [_|_] = rabbit_ct_broker_helpers:rpc(Config, A, ets, tab2list, + [queue_metrics]), + [_|_] = rabbit_ct_broker_helpers:rpc(Config, A, ets, tab2list, + [queue_coarse_metrics]), + + [] = rabbit_ct_broker_helpers:rpc(Config, A, ets, lookup, + [queue_metrics, Q]), + [] = rabbit_ct_broker_helpers:rpc(Config, A, ets, lookup, + [queue_coarse_metrics, Q]), + + amqp_channel:call(Ch, #'queue.delete'{queue = <<"queue_metrics">>}), + rabbit_ct_client_helpers:close_channel(Ch), + + ok. + +connection_metrics(Config) -> + A = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, A), + + amqp_channel:call(Ch, #'queue.declare'{queue = <<"queue_metrics">>}), + amqp_channel:cast(Ch, #'basic.publish'{routing_key = <<"queue_metrics">>}, + #amqp_msg{payload = <<"hello">>}), + timer:sleep(200), + + DeadPid = rabbit_ct_broker_helpers:rpc(Config, A, ?MODULE, dead_pid, []), + + rabbit_ct_broker_helpers:rpc(Config, A, rabbit_core_metrics, + connection_created, [DeadPid, infos]), + rabbit_ct_broker_helpers:rpc(Config, A, rabbit_core_metrics, + connection_stats, [DeadPid, infos]), + rabbit_ct_broker_helpers:rpc(Config, A, rabbit_core_metrics, + connection_stats, [DeadPid, 1, 1, 1]), + + [_] = rabbit_ct_broker_helpers:rpc(Config, A, ets, lookup, + [connection_created, DeadPid]), + [_] = rabbit_ct_broker_helpers:rpc(Config, A, ets, lookup, + [connection_metrics, DeadPid]), + [_] = rabbit_ct_broker_helpers:rpc(Config, A, ets, lookup, + [connection_coarse_metrics, DeadPid]), + + %% Trigger gc. When the gen_server:call returns, the gc has already finished. + rabbit_ct_broker_helpers:rpc(Config, A, erlang, send, [rabbit_core_metrics_gc, start_gc]), + rabbit_ct_broker_helpers:rpc(Config, A, gen_server, call, [rabbit_core_metrics_gc, test]), + + [] = rabbit_ct_broker_helpers:rpc(Config, A, ets, lookup, + [connection_created, DeadPid]), + [] = rabbit_ct_broker_helpers:rpc(Config, A, ets, lookup, + [connection_metrics, DeadPid]), + [] = rabbit_ct_broker_helpers:rpc(Config, A, ets, lookup, + [connection_coarse_metrics, DeadPid]), + + [_|_] = rabbit_ct_broker_helpers:rpc(Config, A, ets, tab2list, [connection_created]), + [_|_] = rabbit_ct_broker_helpers:rpc(Config, A, ets, tab2list, [connection_metrics]), + [_|_] = rabbit_ct_broker_helpers:rpc(Config, A, ets, tab2list, [connection_coarse_metrics]), + + amqp_channel:call(Ch, #'queue.delete'{queue = <<"queue_metrics">>}), + rabbit_ct_client_helpers:close_channel(Ch), + + ok. + +channel_metrics(Config) -> + A = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, A), + + amqp_channel:call(Ch, #'queue.declare'{queue = <<"queue_metrics">>}), + amqp_channel:cast(Ch, #'basic.publish'{routing_key = <<"queue_metrics">>}, + #amqp_msg{payload = <<"hello">>}), + {#'basic.get_ok'{}, _} = amqp_channel:call(Ch, #'basic.get'{queue = <<"queue_metrics">>, + no_ack=true}), + timer:sleep(150), + + DeadPid = rabbit_ct_broker_helpers:rpc(Config, A, ?MODULE, dead_pid, []), + + Q = q(<<"myqueue">>), + X = x(<<"myexchange">>), + + + rabbit_ct_broker_helpers:rpc(Config, A, rabbit_core_metrics, + channel_created, [DeadPid, infos]), + rabbit_ct_broker_helpers:rpc(Config, A, rabbit_core_metrics, + channel_stats, [DeadPid, infos]), + rabbit_ct_broker_helpers:rpc(Config, A, rabbit_core_metrics, + channel_stats, [reductions, DeadPid, 1]), + rabbit_ct_broker_helpers:rpc(Config, A, rabbit_core_metrics, + channel_stats, [exchange_stats, publish, + {DeadPid, X}, 1]), + rabbit_ct_broker_helpers:rpc(Config, A, rabbit_core_metrics, + channel_stats, [queue_stats, get, + {DeadPid, Q}, 1]), + rabbit_ct_broker_helpers:rpc(Config, A, rabbit_core_metrics, + channel_stats, [queue_exchange_stats, publish, + {DeadPid, {Q, X}}, 1]), + + [_] = rabbit_ct_broker_helpers:rpc(Config, A, ets, lookup, + [channel_created, DeadPid]), + [_] = rabbit_ct_broker_helpers:rpc(Config, A, ets, lookup, + [channel_metrics, DeadPid]), + [_] = rabbit_ct_broker_helpers:rpc(Config, A, ets, lookup, + [channel_process_metrics, DeadPid]), + [_] = rabbit_ct_broker_helpers:rpc(Config, A, ets, lookup, + [channel_exchange_metrics, {DeadPid, X}]), + [_] = rabbit_ct_broker_helpers:rpc(Config, A, ets, lookup, + [channel_queue_metrics, {DeadPid, Q}]), + [_] = rabbit_ct_broker_helpers:rpc(Config, A, ets, lookup, + [channel_queue_exchange_metrics, {DeadPid, {Q, X}}]), + + %% Trigger gc. When the gen_server:call returns, the gc has already finished. + rabbit_ct_broker_helpers:rpc(Config, A, erlang, send, [rabbit_core_metrics_gc, start_gc]), + rabbit_ct_broker_helpers:rpc(Config, A, gen_server, call, [rabbit_core_metrics_gc, test]), + + + [_|_] = rabbit_ct_broker_helpers:rpc(Config, A, ets, tab2list, [channel_created]), + [_|_] = rabbit_ct_broker_helpers:rpc(Config, A, ets, tab2list, [channel_metrics]), + [_|_] = rabbit_ct_broker_helpers:rpc(Config, A, ets, tab2list, [channel_process_metrics]), + [_|_] = rabbit_ct_broker_helpers:rpc(Config, A, ets, tab2list, [channel_exchange_metrics]), + [_|_] = rabbit_ct_broker_helpers:rpc(Config, A, ets, tab2list, [channel_queue_metrics]), + [_|_] = rabbit_ct_broker_helpers:rpc(Config, A, ets, tab2list, [channel_queue_exchange_metrics]), + + [] = rabbit_ct_broker_helpers:rpc(Config, A, ets, lookup, + [channel_created, DeadPid]), + [] = rabbit_ct_broker_helpers:rpc(Config, A, ets, lookup, + [channel_metrics, DeadPid]), + [] = rabbit_ct_broker_helpers:rpc(Config, A, ets, lookup, + [channel_process_metrics, DeadPid]), + [] = rabbit_ct_broker_helpers:rpc(Config, A, ets, lookup, + [channel_exchange_metrics, {DeadPid, X}]), + [] = rabbit_ct_broker_helpers:rpc(Config, A, ets, lookup, + [channel_queue_metrics, {DeadPid, Q}]), + [] = rabbit_ct_broker_helpers:rpc(Config, A, ets, lookup, + [channel_queue_exchange_metrics, {DeadPid, {Q, X}}]), + + amqp_channel:call(Ch, #'queue.delete'{queue = <<"queue_metrics">>}), + rabbit_ct_client_helpers:close_channel(Ch), + + ok. + +node_metrics(Config) -> + A = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + + rabbit_ct_broker_helpers:rpc(Config, A, rabbit_core_metrics, node_node_stats, + [{node(), 'deer@localhost'}, infos]), + + [_] = rabbit_ct_broker_helpers:rpc(Config, A, ets, lookup, + [node_node_metrics, {node(), 'deer@localhost'}]), + + %% Trigger gc. When the gen_server:call returns, the gc has already finished. + rabbit_ct_broker_helpers:rpc(Config, A, erlang, send, [rabbit_core_metrics_gc, start_gc]), + rabbit_ct_broker_helpers:rpc(Config, A, gen_server, call, [rabbit_core_metrics_gc, test]), + + [] = rabbit_ct_broker_helpers:rpc(Config, A, ets, lookup, + [node_node_metrics, {node(), 'deer@localhost'}]), + + ok. + +gen_server2_metrics(Config) -> + A = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + + DeadPid = rabbit_ct_broker_helpers:rpc(Config, A, ?MODULE, dead_pid, []), + + rabbit_ct_broker_helpers:rpc(Config, A, rabbit_core_metrics, gen_server2_stats, + [DeadPid, 1]), + + [_] = rabbit_ct_broker_helpers:rpc(Config, A, ets, lookup, + [gen_server2_metrics, DeadPid]), + + %% Trigger gc. When the gen_server:call returns, the gc has already finished. + rabbit_ct_broker_helpers:rpc(Config, A, erlang, send, [rabbit_core_metrics_gc, start_gc]), + rabbit_ct_broker_helpers:rpc(Config, A, gen_server, call, [rabbit_core_metrics_gc, test]), + + [_|_] = rabbit_ct_broker_helpers:rpc(Config, A, ets, tab2list, [gen_server2_metrics]), + + [] = rabbit_ct_broker_helpers:rpc(Config, A, ets, lookup, + [gen_server2_metrics, DeadPid]), + + ok. + +dead_pid() -> + spawn(fun() -> ok end). + +q(Name) -> + #resource{ virtual_host = <<"/">>, + kind = queue, + name = Name }. + +x(Name) -> + #resource{ virtual_host = <<"/">>, + kind = exchange, + name = Name }. |
