diff options
| author | Michael Klishin <michael@novemberain.com> | 2017-07-10 21:38:49 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2017-07-10 21:38:49 +0300 |
| commit | e5fea8d1735f7de245afc9132f71f94ef03b8b13 (patch) | |
| tree | 8f360734ff670d2a68fe340fe001b750d4a54533 | |
| parent | 39c1bd19965dc31688413afad8cb13c454346a57 (diff) | |
| parent | 6a77b6dbebd49c0c4205d3debaa14eb8db0debb2 (diff) | |
| download | rabbitmq-server-git-e5fea8d1735f7de245afc9132f71f94ef03b8b13.tar.gz | |
Merge pull request #1278 from rabbitmq/rabbitmq-management-427
Only preserve stats for local queues
| -rw-r--r-- | src/rabbit_amqqueue.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_core_metrics_gc.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_sync.erl | 2 | ||||
| -rw-r--r-- | test/rabbit_core_metrics_gc_SUITE.erl | 105 |
5 files changed, 90 insertions, 28 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 3eaf7613ee..fa7491104b 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -25,7 +25,7 @@ check_exclusive_access/2, with_exclusive_access_or_die/3, stat/1, deliver/2, requeue/3, ack/3, reject/4]). -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2, - info_all/5, info_local/1, list_names/0]). + info_all/5, info_local/1, list_names/0, list_local_names/0]). -export([list_down/1]). -export([force_event_refresh/1, notify_policy_changed/1]). -export([consumers/1, consumers_all/1, consumers_all/3, consumer_info_keys/0]). @@ -571,6 +571,11 @@ list() -> mnesia:dirty_match_object(rabbit_queue, #amqqueue{_ = '_'}). list_names() -> mnesia:dirty_all_keys(rabbit_queue). +list_local_names() -> + [ Q#amqqueue.name || #amqqueue{state = State, pid = QPid} = Q <- list(), + State =/= crashed, + node() =:= node(QPid) ]. + list(VHostPath) -> list(VHostPath, rabbit_queue). %% Not dirty_match_object since that would not be transactional when used in a diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index cfae4cbaf4..16a5a70e13 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -258,11 +258,13 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, State3. terminate(shutdown = R, State = #q{backing_queue = BQ}) -> + rabbit_core_metrics:queue_deleted(qname(State)), terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State); terminate({shutdown, missing_owner} = Reason, State) -> %% if the owner was missing then there will be no queue, so don't emit stats terminate_shutdown(terminate_delete(false, Reason, State), State); terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) -> + rabbit_core_metrics:queue_deleted(qname(State)), terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State); terminate(normal, State) -> %% delete case terminate_shutdown(terminate_delete(true, normal, State), State); diff --git a/src/rabbit_core_metrics_gc.erl b/src/rabbit_core_metrics_gc.erl index 3141fdc301..3321f2b5de 100644 --- a/src/rabbit_core_metrics_gc.erl +++ b/src/rabbit_core_metrics_gc.erl @@ -70,7 +70,7 @@ gc_channels() -> ok. gc_queues() -> - Queues = rabbit_amqqueue:list_names(), + Queues = rabbit_amqqueue:list_local_names(), GbSet = gb_sets:from_list(Queues), gc_entity(queue_metrics, GbSet), gc_entity(queue_coarse_metrics, GbSet), diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl index c5acd852e7..9e6182488e 100644 --- a/src/rabbit_mirror_queue_sync.erl +++ b/src/rabbit_mirror_queue_sync.erl @@ -16,7 +16,7 @@ -module(rabbit_mirror_queue_sync). --include("rabbit.hrl"). +-include_lib("rabbit_common/include/rabbit.hrl"). -export([master_prepare/4, master_go/8, slave/7, conserve_resources/3]). diff --git a/test/rabbit_core_metrics_gc_SUITE.erl b/test/rabbit_core_metrics_gc_SUITE.erl index 0ef86717ac..d63c9ab2fb 100644 --- a/test/rabbit_core_metrics_gc_SUITE.erl +++ b/test/rabbit_core_metrics_gc_SUITE.erl @@ -24,7 +24,8 @@ all() -> [ - {group, non_parallel_tests} + {group, non_parallel_tests}, + {group, cluster_tests} ]. groups() -> @@ -37,7 +38,8 @@ groups() -> gen_server2_metrics, consumer_metrics ] - } + }, + {cluster_tests, [], [cluster_queue_metrics]} ]. %% ------------------------------------------------------------------- @@ -45,33 +47,27 @@ groups() -> %% ------------------------------------------------------------------- merge_app_env(Config) -> - rabbit_ct_helpers:merge_app_env(Config, - {rabbit, [ - {core_metrics_gc_interval, 6000000}, - {collect_statistics_interval, 100}, - {collect_statistics, fine} - ]}). - -init_per_suite(Config) -> + AppEnv = {rabbit, [{core_metrics_gc_interval, 6000000}, + {collect_statistics_interval, 100}, + {collect_statistics, fine}]}, + rabbit_ct_helpers:merge_app_env(Config, AppEnv). + +init_per_group(cluster_tests, Config) -> + rabbit_ct_helpers:log_environment(), + Conf = [{rmq_nodename_suffix, cluster_tests}, {rmq_nodes_count, 2}], + Config1 = rabbit_ct_helpers:set_config(Config, Conf), + rabbit_ct_helpers:run_setup_steps(Config1, setup_steps()); +init_per_group(non_parallel_tests, Config) -> rabbit_ct_helpers:log_environment(), - Config1 = rabbit_ct_helpers:set_config(Config, [ - {rmq_nodename_suffix, ?MODULE} - ]), - rabbit_ct_helpers:run_setup_steps( - Config1, - [ fun merge_app_env/1 ] ++ rabbit_ct_broker_helpers:setup_steps()). - -end_per_suite(Config) -> + Conf = [{rmq_nodename_suffix, non_parallel_tests}], + Config1 = rabbit_ct_helpers:set_config(Config, Conf), + rabbit_ct_helpers:run_setup_steps(Config1, setup_steps()). + +end_per_group(_, Config) -> rabbit_ct_helpers:run_teardown_steps( Config, rabbit_ct_broker_helpers:teardown_steps()). -init_per_group(_, Config) -> - Config. - -end_per_group(_, Config) -> - Config. - init_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_started(Config, Testcase), rabbit_ct_helpers:run_steps(Config, @@ -83,8 +79,11 @@ end_per_testcase(Testcase, Config) -> Config, rabbit_ct_client_helpers:teardown_steps()). +setup_steps() -> + [ fun merge_app_env/1 ] ++ rabbit_ct_broker_helpers:setup_steps(). + %% ------------------------------------------------------------------- -%% Testcases. +%% Single-node Testcases. %% ------------------------------------------------------------------- queue_metrics(Config) -> @@ -329,3 +328,59 @@ x(Name) -> #resource{ virtual_host = <<"/">>, kind = exchange, name = Name }. + +%% ------------------------------------------------------------------- +%% Cluster Testcases. +%% ------------------------------------------------------------------- + +cluster_queue_metrics(Config) -> + VHost = <<"/">>, + QueueName = <<"cluster_queue_metrics">>, + PolicyName = <<"ha-policy-1">>, + PolicyPattern = <<".*">>, + PolicyAppliesTo = <<"queues">>, + + Node0 = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + Node1 = rabbit_ct_broker_helpers:get_node_config(Config, 1, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Node0), + + Node0Name = rabbit_data_coercion:to_binary(Node0), + Definition0 = [{<<"ha-mode">>, <<"nodes">>}, {<<"ha-params">>, [Node0Name]}], + ok = rabbit_ct_broker_helpers:set_policy(Config, 0, + PolicyName, PolicyPattern, + PolicyAppliesTo, Definition0), + + amqp_channel:call(Ch, #'queue.declare'{queue = QueueName}), + amqp_channel:call(Ch, #'basic.publish'{routing_key = QueueName}, + #amqp_msg{payload = <<"hello">>}), + + % Update policy to point to other node + Node1Name = rabbit_data_coercion:to_binary(Node1), + Definition1 = [{<<"ha-mode">>, <<"nodes">>}, {<<"ha-params">>, [Node1Name]}], + ok = rabbit_ct_broker_helpers:set_policy(Config, 0, + PolicyName, PolicyPattern, + PolicyAppliesTo, Definition1), + + % Synchronize + Name = rabbit_misc:r(VHost, queue, QueueName), + [#amqqueue{pid = QPid}] = rabbit_ct_broker_helpers:rpc(Config, Node0, + ets, lookup, + [rabbit_queue, Name]), + ok = rabbit_ct_broker_helpers:rpc(Config, Node0, rabbit_amqqueue, + sync_mirrors, [QPid]), + + timer:sleep(1500), + + % Check ETS table for data + % rabbit_core_metrics:queue_stats + [] = rabbit_ct_broker_helpers:rpc(Config, Node0, ets, tab2list, + [queue_coarse_metrics]), + + [{Name, 1, 0, 1, _}] = rabbit_ct_broker_helpers:rpc(Config, Node1, ets, + tab2list, + [queue_coarse_metrics]), + + amqp_channel:call(Ch, #'queue.delete'{queue=QueueName}), + rabbit_ct_client_helpers:close_channel(Ch), + Config. |
