diff options
| author | Michael Klishin <mklishin@pivotal.io> | 2017-09-23 22:37:07 +0300 |
|---|---|---|
| committer | Michael Klishin <mklishin@pivotal.io> | 2017-09-23 22:37:07 +0300 |
| commit | 065c849786dfa916f3b1d06513636398d06ecb47 (patch) | |
| tree | 99827259390c2533e6bb6dbbcff42a672f453c30 | |
| parent | 2fd5c7e9f21011e61117282807247b7cf074f474 (diff) | |
| parent | 1c81095486f56ca9dcfa19177594d6e5be1fbe0a (diff) | |
| download | rabbitmq-server-git-065c849786dfa916f3b1d06513636398d06ecb47.tar.gz | |
Merge branch 'stable'
Conflicts:
src/rabbit_queue_location_random.erl
test/queue_master_location_SUITE.erl
| -rw-r--r-- | .gitignore | 1 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_queue_location_min_masters.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_queue_location_random.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_queue_master_location_misc.erl | 20 | ||||
| -rw-r--r-- | test/queue_master_location_SUITE.erl | 141 |
6 files changed, 149 insertions, 34 deletions
diff --git a/.gitignore b/.gitignore index ecf5261f91..42cc7ad1cc 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ .*.sw? *.beam *.coverdata +MnesiaCore.* /.erlang.mk/ /cover/ /debug/ diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index a6571defcb..82169af7cb 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -19,8 +19,9 @@ -export([remove_from_queue/3, on_vhost_up/1, add_mirrors/3, report_deaths/4, store_updated_slaves/1, - initial_queue_node/2, suggested_queue_nodes/1, - is_mirrored/1, update_mirrors/2, update_mirrors/1, validate_policy/1, + initial_queue_node/2, suggested_queue_nodes/1, actual_queue_nodes/1, + is_mirrored/1, is_mirrored_ha_nodes/1, + update_mirrors/2, update_mirrors/1, validate_policy/1, maybe_auto_sync/1, maybe_drop_master_after_sync/1, sync_batch_size/1, log_info/3, log_warning/3]). @@ -31,6 +32,8 @@ -include("rabbit.hrl"). +-define(HA_NODES_MODULE, rabbit_mirror_queue_mode_nodes). + -rabbit_boot_step( {?MODULE, [{description, "HA policy validation"}, @@ -370,6 +373,12 @@ is_mirrored(Q) -> _ -> false end. +is_mirrored_ha_nodes(Q) -> + case module(Q) of + {ok, ?HA_NODES_MODULE} -> true; + _ -> false + end. + actual_queue_nodes(#amqqueue{pid = MPid, slave_pids = SPids, sync_slave_pids = SSPids}) -> diff --git a/src/rabbit_queue_location_min_masters.erl b/src/rabbit_queue_location_min_masters.erl index a1e73ee5d5..5d6f568d4a 100644 --- a/src/rabbit_queue_location_min_masters.erl +++ b/src/rabbit_queue_location_min_masters.erl @@ -37,8 +37,8 @@ description() -> [{description, <<"Locate queue master node from cluster node with least bound queues">>}]. -queue_master_location(#amqqueue{}) -> - Cluster = rabbit_queue_master_location_misc:all_nodes(), +queue_master_location(#amqqueue{} = Q) -> + Cluster = rabbit_queue_master_location_misc:all_nodes(Q), VHosts = rabbit_vhost:list(), BoundQueueMasters = get_bound_queue_masters_per_vhost(VHosts, []), {_Count, MinMaster}= get_min_master(Cluster, BoundQueueMasters), diff --git a/src/rabbit_queue_location_random.erl b/src/rabbit_queue_location_random.erl index 5a24a00aa0..83fd8cd519 100644 --- a/src/rabbit_queue_location_random.erl +++ b/src/rabbit_queue_location_random.erl @@ -37,8 +37,8 @@ description() -> [{description, <<"Locate queue master node from cluster in a random manner">>}]. -queue_master_location(#amqqueue{}) -> - Cluster = rabbit_queue_master_location_misc:all_nodes(), +queue_master_location(#amqqueue{} = Q) -> + Cluster = rabbit_queue_master_location_misc:all_nodes(Q), RandomPos = erlang:phash2(erlang:monotonic_time(), length(Cluster)), MasterNode = lists:nth(RandomPos + 1, Cluster), {ok, MasterNode}. diff --git a/src/rabbit_queue_master_location_misc.erl b/src/rabbit_queue_master_location_misc.erl index 0b9efdd6fc..c2cc489ec2 100644 --- a/src/rabbit_queue_master_location_misc.erl +++ b/src/rabbit_queue_master_location_misc.erl @@ -24,7 +24,7 @@ get_location_mod_by_config/1, get_location_mod_by_args/1, get_location_mod_by_policy/1, - all_nodes/0]). + all_nodes/1]). lookup_master(QueueNameBin, VHostPath) when is_binary(QueueNameBin), is_binary(VHostPath) -> @@ -92,4 +92,20 @@ get_location_mod_by_config(#amqqueue{}) -> _ -> {error, "queue_master_locator undefined"} end. -all_nodes() -> rabbit_mnesia:cluster_nodes(running). +all_nodes(Queue = #amqqueue{}) -> + handle_is_mirrored_ha_nodes(rabbit_mirror_queue_misc:is_mirrored_ha_nodes(Queue), Queue). + +handle_is_mirrored_ha_nodes(false, _Queue) -> + % Note: ha-mode is NOT 'nodes' - it is either exactly or all, which means + % that any node in the cluster is eligible to be the new queue master node + rabbit_nodes:all_running(); +handle_is_mirrored_ha_nodes(true, Queue) -> + % Note: ha-mode is 'nodes', which explicitly specifies allowed nodes. + % We must use suggested_queue_nodes to get that list of nodes as the + % starting point for finding the queue master location + handle_suggested_queue_nodes(rabbit_mirror_queue_misc:suggested_queue_nodes(Queue)). + +handle_suggested_queue_nodes({_MNode, []}) -> + rabbit_nodes:all_running(); +handle_suggested_queue_nodes({MNode, SNodes}) -> + [MNode | SNodes]. diff --git a/test/queue_master_location_SUITE.erl b/test/queue_master_location_SUITE.erl index 8d056d3fe9..457cf47b0c 100644 --- a/test/queue_master_location_SUITE.erl +++ b/test/queue_master_location_SUITE.erl @@ -34,6 +34,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). +-include_lib("eunit/include/eunit.hrl"). -compile(export_all). @@ -50,6 +51,9 @@ groups() -> {cluster_size_3, [], [ declare_args, declare_policy, + declare_policy_nodes, + declare_policy_all, + declare_policy_exactly, declare_config, calculate_min_master, calculate_random, @@ -111,7 +115,7 @@ end_per_testcase(Testcase, Config) -> declare_args(Config) -> setup_test_environment(Config), unset_location_config(Config), - QueueName = rabbit_misc:r(<<"/">>, queue, Q= <<"qm.test">>), + QueueName = rabbit_misc:r(<<"/">>, queue, Q = <<"qm.test">>), Args = [{<<"x-queue-master-locator">>, longstr, <<"min-masters">>}], declare(Config, QueueName, false, false, Args, none), verify_min_master(Config, Q). @@ -120,14 +124,75 @@ declare_policy(Config) -> setup_test_environment(Config), unset_location_config(Config), set_location_policy(Config, ?POLICY, <<"min-masters">>), - QueueName = rabbit_misc:r(<<"/">>, queue, Q= <<"qm.test">>), + QueueName = rabbit_misc:r(<<"/">>, queue, Q = <<"qm.test">>), declare(Config, QueueName, false, false, _Args=[], none), verify_min_master(Config, Q). +declare_policy_nodes(Config) -> + setup_test_environment(Config), + unset_location_config(Config), + % Note: + % Node0 has 15 queues, Node1 has 8 and Node2 has 1 + Node0Name = rabbit_data_coercion:to_binary( + rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename)), + Node1 = rabbit_ct_broker_helpers:get_node_config(Config, 1, nodename), + Node1Name = rabbit_data_coercion:to_binary(Node1), + Nodes = [Node1Name, Node0Name], + Policy = [{<<"queue-master-locator">>, <<"min-masters">>}, + {<<"ha-mode">>, <<"nodes">>}, + {<<"ha-params">>, Nodes}], + ok = rabbit_ct_broker_helpers:set_policy(Config, 0, ?POLICY, + <<".*">>, <<"queues">>, Policy), + QueueName = rabbit_misc:r(<<"/">>, queue, Q = <<"qm.test">>), + declare(Config, QueueName, false, false, _Args=[], none), + verify_min_master(Config, Q, Node1). + +declare_policy_all(Config) -> + setup_test_environment(Config), + unset_location_config(Config), + % Note: + % Node0 has 15 queues, Node1 has 8 and Node2 has 1 + Policy = [{<<"queue-master-locator">>, <<"min-masters">>}, + {<<"ha-mode">>, <<"all">>}], + ok = rabbit_ct_broker_helpers:set_policy(Config, 0, ?POLICY, + <<".*">>, <<"queues">>, Policy), + QueueName = rabbit_misc:r(<<"/">>, queue, Q = <<"qm.test">>), + declare(Config, QueueName, false, false, _Args=[], none), + verify_min_master(Config, Q). + +declare_policy_exactly(Config) -> + setup_test_environment(Config), + unset_location_config(Config), + Policy = [{<<"queue-master-locator">>, <<"min-masters">>}, + {<<"ha-mode">>, <<"exactly">>}, + {<<"ha-params">>, 2}], + ok = rabbit_ct_broker_helpers:set_policy(Config, 0, ?POLICY, + <<".*">>, <<"queues">>, Policy), + QueueRes = rabbit_misc:r(<<"/">>, queue, Q = <<"qm.test">>), + declare(Config, QueueRes, false, false, _Args=[], none), + + Node0 = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + rabbit_ct_broker_helpers:control_action(sync_queue, Node0, + [binary_to_list(Q)], [{"-p", "/"}]), + wait_for_sync(Config, Node0, QueueRes, 1), + + {ok, Queue} = rabbit_ct_broker_helpers:rpc(Config, Node0, + rabbit_amqqueue, lookup, [QueueRes]), + {MNode0, [SNode], [SSNode]} = rabbit_ct_broker_helpers:rpc(Config, Node0, + rabbit_mirror_queue_misc, + actual_queue_nodes, [Queue]), + ?assertEqual(SNode, SSNode), + {ok, MNode1} = rabbit_ct_broker_helpers:rpc(Config, 0, + rabbit_queue_master_location_misc, + lookup_master, [Q, ?DEFAULT_VHOST_PATH]), + ?assertEqual(MNode0, MNode1), + Node2 = rabbit_ct_broker_helpers:get_node_config(Config, 2, nodename), + ?assertEqual(MNode1, Node2). + declare_config(Config) -> setup_test_environment(Config), set_location_config(Config, <<"min-masters">>), - QueueName = rabbit_misc:r(<<"/">>, queue, Q= <<"qm.test">>), + QueueName = rabbit_misc:r(<<"/">>, queue, Q = <<"qm.test">>), declare(Config, QueueName, false, false, _Args=[], none), verify_min_master(Config, Q), unset_location_config(Config), @@ -139,7 +204,7 @@ declare_config(Config) -> calculate_min_master(Config) -> setup_test_environment(Config), - QueueName = rabbit_misc:r(<<"/">>, queue, Q= <<"qm.test">>), + QueueName = rabbit_misc:r(<<"/">>, queue, Q = <<"qm.test">>), Args = [{<<"x-queue-master-locator">>, longstr, <<"min-masters">>}], declare(Config, QueueName, false, false, Args, none), verify_min_master(Config, Q), @@ -147,7 +212,7 @@ calculate_min_master(Config) -> calculate_random(Config) -> setup_test_environment(Config), - QueueName = rabbit_misc:r(<<"/">>, queue, Q= <<"qm.test">>), + QueueName = rabbit_misc:r(<<"/">>, queue, Q = <<"qm.test">>), Args = [{<<"x-queue-master-locator">>, longstr, <<"random">>}], declare(Config, QueueName, false, false, Args, none), verify_random(Config, Q), @@ -155,7 +220,7 @@ calculate_random(Config) -> calculate_client_local(Config) -> setup_test_environment(Config), - QueueName = rabbit_misc:r(<<"/">>, queue, Q= <<"qm.test">>), + QueueName = rabbit_misc:r(<<"/">>, queue, Q = <<"qm.test">>), Args = [{<<"x-queue-master-locator">>, longstr, <<"client-local">>}], declare(Config, QueueName, false, false, Args, none), verify_client_local(Config, Q), @@ -232,42 +297,66 @@ min_master_node(Config) -> set_location_config(Config, Strategy) -> Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - [ok = rpc:call(Node, application, set_env, - [rabbit, queue_master_locator, Strategy]) || Node <- Nodes], + [ok = rabbit_ct_broker_helpers:rpc(Config, Node, + application, set_env, + [rabbit, queue_master_locator, Strategy]) || Node <- Nodes], ok. unset_location_config(Config) -> Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - [ok = rpc:call(Node, application, unset_env, - [rabbit, queue_master_locator]) || Node <- Nodes], + [ok = rabbit_ct_broker_helpers:rpc(Config, Node, + application, unset_env, + [rabbit, queue_master_locator]) || Node <- Nodes], ok. -declare(Config, QueueName, Durable, AutoDelete, Args, Owner) -> - Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), - {new, Queue} = rpc:call(Node, rabbit_amqqueue, declare, - [QueueName, Durable, AutoDelete, Args, Owner, - <<"acting-user">>]), +declare(Config, QueueName, Durable, AutoDelete, Args0, Owner) -> + Args1 = [QueueName, Durable, AutoDelete, Args0, Owner, <<"acting-user">>], + {new, Queue} = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, declare, Args1), Queue. +verify_min_master(Config, Q, MinMasterNode) -> + Rpc = rabbit_ct_broker_helpers:rpc(Config, 0, + rabbit_queue_master_location_misc, + lookup_master, [Q, ?DEFAULT_VHOST_PATH]), + ?assertEqual({ok, MinMasterNode}, Rpc). + verify_min_master(Config, Q) -> - Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), MinMaster = min_master_node(Config), - ct:pal("Expecting min master ~p~n", [MinMaster]), - {ok, MinMaster} = rpc:call(Node, rabbit_queue_master_location_misc, - lookup_master, [Q, ?DEFAULT_VHOST_PATH]). + verify_min_master(Config, Q, MinMaster). verify_random(Config, Q) -> - [Node | _] = Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, - nodename), - {ok, Master} = rpc:call(Node, rabbit_queue_master_location_misc, - lookup_master, [Q, ?DEFAULT_VHOST_PATH]), - true = lists:member(Master, Nodes). + [Node | _] = Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + {ok, Master} = rabbit_ct_broker_helpers:rpc(Config, Node, + rabbit_queue_master_location_misc, + lookup_master, [Q, ?DEFAULT_VHOST_PATH]), + ?assert(lists:member(Master, Nodes)). verify_client_local(Config, Q) -> Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), - {ok, Node} = rpc:call(Node, rabbit_queue_master_location_misc, - lookup_master, [Q, ?DEFAULT_VHOST_PATH]). + Rpc = rabbit_ct_broker_helpers:rpc(Config, Node, + rabbit_queue_master_location_misc, + lookup_master, [Q, ?DEFAULT_VHOST_PATH]), + ?assertEqual({ok, Node}, Rpc). set_location_policy(Config, Name, Strategy) -> ok = rabbit_ct_broker_helpers:set_policy(Config, 0, Name, <<".*">>, <<"queues">>, [{<<"queue-master-locator">>, Strategy}]). + +wait_for_sync(Config, Nodename, Q, ExpectedSSPidLen) -> + wait_for_sync(Config, Nodename, Q, ExpectedSSPidLen, 600). + +wait_for_sync(_, _, _, _, 0) -> + throw(sync_timeout); +wait_for_sync(Config, Nodename, Q, ExpectedSSPidLen, N) -> + case synced(Config, Nodename, Q, ExpectedSSPidLen) of + true -> ok; + false -> timer:sleep(100), + wait_for_sync(Config, Nodename, Q, ExpectedSSPidLen, N-1) + end. + +synced(Config, Nodename, Q, ExpectedSSPidLen) -> + Args = [<<"/">>, [name, synchronised_slave_pids]], + Info = rabbit_ct_broker_helpers:rpc(Config, Nodename, + rabbit_amqqueue, info_all, Args), + [SSPids] = [Pids || [{name, Q1}, {synchronised_slave_pids, Pids}] <- Info, Q =:= Q1], + length(SSPids) =:= ExpectedSSPidLen. |
