diff options
| author | Michael Klishin <michael@clojurewerkz.org> | 2017-07-10 21:43:10 +0300 |
|---|---|---|
| committer | Michael Klishin <michael@clojurewerkz.org> | 2017-07-10 21:43:10 +0300 |
| commit | 976d4a3406511daa674d85a9af4a2d3b43fda82f (patch) | |
| tree | 873813f3332df4c39cc91fd4c4337430d407219b | |
| parent | a451a4b1158682d5931d59f00dfdfc411eadaef4 (diff) | |
| parent | e5fea8d1735f7de245afc9132f71f94ef03b8b13 (diff) | |
| download | rabbitmq-server-git-976d4a3406511daa674d85a9af4a2d3b43fda82f.tar.gz | |
Merge branch 'stable'
Conflicts:
src/rabbit_amqqueue.erl
| -rw-r--r-- | src/rabbit_amqqueue.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_core_metrics_gc.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_sync.erl | 2 | ||||
| -rw-r--r-- | test/rabbit_core_metrics_gc_SUITE.erl | 105 |
5 files changed, 91 insertions, 28 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 81eb5edf6e..d6775a4347 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -29,7 +29,8 @@ -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2, emit_info_all/5, list_local/1, info_local/1, emit_info_local/4, emit_info_down/4]). --export([list_down/1, count/1, list_names/0]). +-export([list_down/1, count/1, list_names/0, + list_names/0, list_local_names/0]). -export([force_event_refresh/1, notify_policy_changed/1]). -export([consumers/1, consumers_all/1, emit_consumers_all/4, consumer_info_keys/0]). -export([basic_get/4, basic_consume/11, basic_cancel/5, notify_decorators/1]). @@ -592,6 +593,11 @@ list() -> mnesia:dirty_match_object(rabbit_queue, #amqqueue{_ = '_'}). list_names() -> mnesia:dirty_all_keys(rabbit_queue). +list_local_names() -> + [ Q#amqqueue.name || #amqqueue{state = State, pid = QPid} = Q <- list(), + State =/= crashed, + node() =:= node(QPid) ]. + list(VHostPath) -> list(VHostPath, rabbit_queue). %% Not dirty_match_object since that would not be transactional when used in a diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index c52d329392..4e43104de2 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -266,11 +266,13 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, State3. terminate(shutdown = R, State = #q{backing_queue = BQ}) -> + rabbit_core_metrics:queue_deleted(qname(State)), terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State); terminate({shutdown, missing_owner} = Reason, State) -> %% if the owner was missing then there will be no queue, so don't emit stats terminate_shutdown(terminate_delete(false, Reason, State), State); terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) -> + rabbit_core_metrics:queue_deleted(qname(State)), terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State); terminate(normal, State) -> %% delete case terminate_shutdown(terminate_delete(true, normal, State), State); diff --git a/src/rabbit_core_metrics_gc.erl b/src/rabbit_core_metrics_gc.erl index 3141fdc301..3321f2b5de 100644 --- a/src/rabbit_core_metrics_gc.erl +++ b/src/rabbit_core_metrics_gc.erl @@ -70,7 +70,7 @@ gc_channels() -> ok. gc_queues() -> - Queues = rabbit_amqqueue:list_names(), + Queues = rabbit_amqqueue:list_local_names(), GbSet = gb_sets:from_list(Queues), gc_entity(queue_metrics, GbSet), gc_entity(queue_coarse_metrics, GbSet), diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl index 2994e8cbcf..fd64fd61b3 100644 --- a/src/rabbit_mirror_queue_sync.erl +++ b/src/rabbit_mirror_queue_sync.erl @@ -16,7 +16,7 @@ -module(rabbit_mirror_queue_sync). --include("rabbit.hrl"). +-include_lib("rabbit_common/include/rabbit.hrl"). -export([master_prepare/4, master_go/8, slave/7, conserve_resources/3]). diff --git a/test/rabbit_core_metrics_gc_SUITE.erl b/test/rabbit_core_metrics_gc_SUITE.erl index 0ef86717ac..d63c9ab2fb 100644 --- a/test/rabbit_core_metrics_gc_SUITE.erl +++ b/test/rabbit_core_metrics_gc_SUITE.erl @@ -24,7 +24,8 @@ all() -> [ - {group, non_parallel_tests} + {group, non_parallel_tests}, + {group, cluster_tests} ]. groups() -> @@ -37,7 +38,8 @@ groups() -> gen_server2_metrics, consumer_metrics ] - } + }, + {cluster_tests, [], [cluster_queue_metrics]} ]. %% ------------------------------------------------------------------- @@ -45,33 +47,27 @@ groups() -> %% ------------------------------------------------------------------- merge_app_env(Config) -> - rabbit_ct_helpers:merge_app_env(Config, - {rabbit, [ - {core_metrics_gc_interval, 6000000}, - {collect_statistics_interval, 100}, - {collect_statistics, fine} - ]}). - -init_per_suite(Config) -> + AppEnv = {rabbit, [{core_metrics_gc_interval, 6000000}, + {collect_statistics_interval, 100}, + {collect_statistics, fine}]}, + rabbit_ct_helpers:merge_app_env(Config, AppEnv). + +init_per_group(cluster_tests, Config) -> + rabbit_ct_helpers:log_environment(), + Conf = [{rmq_nodename_suffix, cluster_tests}, {rmq_nodes_count, 2}], + Config1 = rabbit_ct_helpers:set_config(Config, Conf), + rabbit_ct_helpers:run_setup_steps(Config1, setup_steps()); +init_per_group(non_parallel_tests, Config) -> rabbit_ct_helpers:log_environment(), - Config1 = rabbit_ct_helpers:set_config(Config, [ - {rmq_nodename_suffix, ?MODULE} - ]), - rabbit_ct_helpers:run_setup_steps( - Config1, - [ fun merge_app_env/1 ] ++ rabbit_ct_broker_helpers:setup_steps()). - -end_per_suite(Config) -> + Conf = [{rmq_nodename_suffix, non_parallel_tests}], + Config1 = rabbit_ct_helpers:set_config(Config, Conf), + rabbit_ct_helpers:run_setup_steps(Config1, setup_steps()). + +end_per_group(_, Config) -> rabbit_ct_helpers:run_teardown_steps( Config, rabbit_ct_broker_helpers:teardown_steps()). -init_per_group(_, Config) -> - Config. - -end_per_group(_, Config) -> - Config. - init_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_started(Config, Testcase), rabbit_ct_helpers:run_steps(Config, @@ -83,8 +79,11 @@ end_per_testcase(Testcase, Config) -> Config, rabbit_ct_client_helpers:teardown_steps()). +setup_steps() -> + [ fun merge_app_env/1 ] ++ rabbit_ct_broker_helpers:setup_steps(). + %% ------------------------------------------------------------------- -%% Testcases. +%% Single-node Testcases. %% ------------------------------------------------------------------- queue_metrics(Config) -> @@ -329,3 +328,59 @@ x(Name) -> #resource{ virtual_host = <<"/">>, kind = exchange, name = Name }. + +%% ------------------------------------------------------------------- +%% Cluster Testcases. +%% ------------------------------------------------------------------- + +cluster_queue_metrics(Config) -> + VHost = <<"/">>, + QueueName = <<"cluster_queue_metrics">>, + PolicyName = <<"ha-policy-1">>, + PolicyPattern = <<".*">>, + PolicyAppliesTo = <<"queues">>, + + Node0 = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + Node1 = rabbit_ct_broker_helpers:get_node_config(Config, 1, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Node0), + + Node0Name = rabbit_data_coercion:to_binary(Node0), + Definition0 = [{<<"ha-mode">>, <<"nodes">>}, {<<"ha-params">>, [Node0Name]}], + ok = rabbit_ct_broker_helpers:set_policy(Config, 0, + PolicyName, PolicyPattern, + PolicyAppliesTo, Definition0), + + amqp_channel:call(Ch, #'queue.declare'{queue = QueueName}), + amqp_channel:call(Ch, #'basic.publish'{routing_key = QueueName}, + #amqp_msg{payload = <<"hello">>}), + + % Update policy to point to other node + Node1Name = rabbit_data_coercion:to_binary(Node1), + Definition1 = [{<<"ha-mode">>, <<"nodes">>}, {<<"ha-params">>, [Node1Name]}], + ok = rabbit_ct_broker_helpers:set_policy(Config, 0, + PolicyName, PolicyPattern, + PolicyAppliesTo, Definition1), + + % Synchronize + Name = rabbit_misc:r(VHost, queue, QueueName), + [#amqqueue{pid = QPid}] = rabbit_ct_broker_helpers:rpc(Config, Node0, + ets, lookup, + [rabbit_queue, Name]), + ok = rabbit_ct_broker_helpers:rpc(Config, Node0, rabbit_amqqueue, + sync_mirrors, [QPid]), + + timer:sleep(1500), + + % Check ETS table for data + % rabbit_core_metrics:queue_stats + [] = rabbit_ct_broker_helpers:rpc(Config, Node0, ets, tab2list, + [queue_coarse_metrics]), + + [{Name, 1, 0, 1, _}] = rabbit_ct_broker_helpers:rpc(Config, Node1, ets, + tab2list, + [queue_coarse_metrics]), + + amqp_channel:call(Ch, #'queue.delete'{queue=QueueName}), + rabbit_ct_client_helpers:close_channel(Ch), + Config. |
