summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <mklishin@pivotal.io>2017-09-23 22:37:07 +0300
committerMichael Klishin <mklishin@pivotal.io>2017-09-23 22:37:07 +0300
commit065c849786dfa916f3b1d06513636398d06ecb47 (patch)
tree99827259390c2533e6bb6dbbcff42a672f453c30
parent2fd5c7e9f21011e61117282807247b7cf074f474 (diff)
parent1c81095486f56ca9dcfa19177594d6e5be1fbe0a (diff)
downloadrabbitmq-server-git-065c849786dfa916f3b1d06513636398d06ecb47.tar.gz
Merge branch 'stable'
Conflicts: src/rabbit_queue_location_random.erl test/queue_master_location_SUITE.erl
-rw-r--r--.gitignore1
-rw-r--r--src/rabbit_mirror_queue_misc.erl13
-rw-r--r--src/rabbit_queue_location_min_masters.erl4
-rw-r--r--src/rabbit_queue_location_random.erl4
-rw-r--r--src/rabbit_queue_master_location_misc.erl20
-rw-r--r--test/queue_master_location_SUITE.erl141
6 files changed, 149 insertions, 34 deletions
diff --git a/.gitignore b/.gitignore
index ecf5261f91..42cc7ad1cc 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,6 +3,7 @@
.*.sw?
*.beam
*.coverdata
+MnesiaCore.*
/.erlang.mk/
/cover/
/debug/
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index a6571defcb..82169af7cb 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -19,8 +19,9 @@
-export([remove_from_queue/3, on_vhost_up/1, add_mirrors/3,
report_deaths/4, store_updated_slaves/1,
- initial_queue_node/2, suggested_queue_nodes/1,
- is_mirrored/1, update_mirrors/2, update_mirrors/1, validate_policy/1,
+ initial_queue_node/2, suggested_queue_nodes/1, actual_queue_nodes/1,
+ is_mirrored/1, is_mirrored_ha_nodes/1,
+ update_mirrors/2, update_mirrors/1, validate_policy/1,
maybe_auto_sync/1, maybe_drop_master_after_sync/1,
sync_batch_size/1, log_info/3, log_warning/3]).
@@ -31,6 +32,8 @@
-include("rabbit.hrl").
+-define(HA_NODES_MODULE, rabbit_mirror_queue_mode_nodes).
+
-rabbit_boot_step(
{?MODULE,
[{description, "HA policy validation"},
@@ -370,6 +373,12 @@ is_mirrored(Q) ->
_ -> false
end.
+is_mirrored_ha_nodes(Q) ->
+ case module(Q) of
+ {ok, ?HA_NODES_MODULE} -> true;
+ _ -> false
+ end.
+
actual_queue_nodes(#amqqueue{pid = MPid,
slave_pids = SPids,
sync_slave_pids = SSPids}) ->
diff --git a/src/rabbit_queue_location_min_masters.erl b/src/rabbit_queue_location_min_masters.erl
index a1e73ee5d5..5d6f568d4a 100644
--- a/src/rabbit_queue_location_min_masters.erl
+++ b/src/rabbit_queue_location_min_masters.erl
@@ -37,8 +37,8 @@ description() ->
[{description,
<<"Locate queue master node from cluster node with least bound queues">>}].
-queue_master_location(#amqqueue{}) ->
- Cluster = rabbit_queue_master_location_misc:all_nodes(),
+queue_master_location(#amqqueue{} = Q) ->
+ Cluster = rabbit_queue_master_location_misc:all_nodes(Q),
VHosts = rabbit_vhost:list(),
BoundQueueMasters = get_bound_queue_masters_per_vhost(VHosts, []),
{_Count, MinMaster}= get_min_master(Cluster, BoundQueueMasters),
diff --git a/src/rabbit_queue_location_random.erl b/src/rabbit_queue_location_random.erl
index 5a24a00aa0..83fd8cd519 100644
--- a/src/rabbit_queue_location_random.erl
+++ b/src/rabbit_queue_location_random.erl
@@ -37,8 +37,8 @@ description() ->
[{description,
<<"Locate queue master node from cluster in a random manner">>}].
-queue_master_location(#amqqueue{}) ->
- Cluster = rabbit_queue_master_location_misc:all_nodes(),
+queue_master_location(#amqqueue{} = Q) ->
+ Cluster = rabbit_queue_master_location_misc:all_nodes(Q),
RandomPos = erlang:phash2(erlang:monotonic_time(), length(Cluster)),
MasterNode = lists:nth(RandomPos + 1, Cluster),
{ok, MasterNode}.
diff --git a/src/rabbit_queue_master_location_misc.erl b/src/rabbit_queue_master_location_misc.erl
index 0b9efdd6fc..c2cc489ec2 100644
--- a/src/rabbit_queue_master_location_misc.erl
+++ b/src/rabbit_queue_master_location_misc.erl
@@ -24,7 +24,7 @@
get_location_mod_by_config/1,
get_location_mod_by_args/1,
get_location_mod_by_policy/1,
- all_nodes/0]).
+ all_nodes/1]).
lookup_master(QueueNameBin, VHostPath) when is_binary(QueueNameBin),
is_binary(VHostPath) ->
@@ -92,4 +92,20 @@ get_location_mod_by_config(#amqqueue{}) ->
_ -> {error, "queue_master_locator undefined"}
end.
-all_nodes() -> rabbit_mnesia:cluster_nodes(running).
+all_nodes(Queue = #amqqueue{}) ->
+ handle_is_mirrored_ha_nodes(rabbit_mirror_queue_misc:is_mirrored_ha_nodes(Queue), Queue).
+
+handle_is_mirrored_ha_nodes(false, _Queue) ->
+ % Note: ha-mode is NOT 'nodes' - it is either exactly or all, which means
+ % that any node in the cluster is eligible to be the new queue master node
+ rabbit_nodes:all_running();
+handle_is_mirrored_ha_nodes(true, Queue) ->
+ % Note: ha-mode is 'nodes', which explicitly specifies allowed nodes.
+ % We must use suggested_queue_nodes to get that list of nodes as the
+ % starting point for finding the queue master location
+ handle_suggested_queue_nodes(rabbit_mirror_queue_misc:suggested_queue_nodes(Queue)).
+
+handle_suggested_queue_nodes({_MNode, []}) ->
+ rabbit_nodes:all_running();
+handle_suggested_queue_nodes({MNode, SNodes}) ->
+ [MNode | SNodes].
diff --git a/test/queue_master_location_SUITE.erl b/test/queue_master_location_SUITE.erl
index 8d056d3fe9..457cf47b0c 100644
--- a/test/queue_master_location_SUITE.erl
+++ b/test/queue_master_location_SUITE.erl
@@ -34,6 +34,7 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
+-include_lib("eunit/include/eunit.hrl").
-compile(export_all).
@@ -50,6 +51,9 @@ groups() ->
{cluster_size_3, [], [
declare_args,
declare_policy,
+ declare_policy_nodes,
+ declare_policy_all,
+ declare_policy_exactly,
declare_config,
calculate_min_master,
calculate_random,
@@ -111,7 +115,7 @@ end_per_testcase(Testcase, Config) ->
declare_args(Config) ->
setup_test_environment(Config),
unset_location_config(Config),
- QueueName = rabbit_misc:r(<<"/">>, queue, Q= <<"qm.test">>),
+ QueueName = rabbit_misc:r(<<"/">>, queue, Q = <<"qm.test">>),
Args = [{<<"x-queue-master-locator">>, longstr, <<"min-masters">>}],
declare(Config, QueueName, false, false, Args, none),
verify_min_master(Config, Q).
@@ -120,14 +124,75 @@ declare_policy(Config) ->
setup_test_environment(Config),
unset_location_config(Config),
set_location_policy(Config, ?POLICY, <<"min-masters">>),
- QueueName = rabbit_misc:r(<<"/">>, queue, Q= <<"qm.test">>),
+ QueueName = rabbit_misc:r(<<"/">>, queue, Q = <<"qm.test">>),
declare(Config, QueueName, false, false, _Args=[], none),
verify_min_master(Config, Q).
+declare_policy_nodes(Config) ->
+ setup_test_environment(Config),
+ unset_location_config(Config),
+ % Note:
+ % Node0 has 15 queues, Node1 has 8 and Node2 has 1
+ Node0Name = rabbit_data_coercion:to_binary(
+ rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename)),
+ Node1 = rabbit_ct_broker_helpers:get_node_config(Config, 1, nodename),
+ Node1Name = rabbit_data_coercion:to_binary(Node1),
+ Nodes = [Node1Name, Node0Name],
+ Policy = [{<<"queue-master-locator">>, <<"min-masters">>},
+ {<<"ha-mode">>, <<"nodes">>},
+ {<<"ha-params">>, Nodes}],
+ ok = rabbit_ct_broker_helpers:set_policy(Config, 0, ?POLICY,
+ <<".*">>, <<"queues">>, Policy),
+ QueueName = rabbit_misc:r(<<"/">>, queue, Q = <<"qm.test">>),
+ declare(Config, QueueName, false, false, _Args=[], none),
+ verify_min_master(Config, Q, Node1).
+
+declare_policy_all(Config) ->
+ setup_test_environment(Config),
+ unset_location_config(Config),
+ % Note:
+ % Node0 has 15 queues, Node1 has 8 and Node2 has 1
+ Policy = [{<<"queue-master-locator">>, <<"min-masters">>},
+ {<<"ha-mode">>, <<"all">>}],
+ ok = rabbit_ct_broker_helpers:set_policy(Config, 0, ?POLICY,
+ <<".*">>, <<"queues">>, Policy),
+ QueueName = rabbit_misc:r(<<"/">>, queue, Q = <<"qm.test">>),
+ declare(Config, QueueName, false, false, _Args=[], none),
+ verify_min_master(Config, Q).
+
+declare_policy_exactly(Config) ->
+ setup_test_environment(Config),
+ unset_location_config(Config),
+ Policy = [{<<"queue-master-locator">>, <<"min-masters">>},
+ {<<"ha-mode">>, <<"exactly">>},
+ {<<"ha-params">>, 2}],
+ ok = rabbit_ct_broker_helpers:set_policy(Config, 0, ?POLICY,
+ <<".*">>, <<"queues">>, Policy),
+ QueueRes = rabbit_misc:r(<<"/">>, queue, Q = <<"qm.test">>),
+ declare(Config, QueueRes, false, false, _Args=[], none),
+
+ Node0 = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ rabbit_ct_broker_helpers:control_action(sync_queue, Node0,
+ [binary_to_list(Q)], [{"-p", "/"}]),
+ wait_for_sync(Config, Node0, QueueRes, 1),
+
+ {ok, Queue} = rabbit_ct_broker_helpers:rpc(Config, Node0,
+ rabbit_amqqueue, lookup, [QueueRes]),
+ {MNode0, [SNode], [SSNode]} = rabbit_ct_broker_helpers:rpc(Config, Node0,
+ rabbit_mirror_queue_misc,
+ actual_queue_nodes, [Queue]),
+ ?assertEqual(SNode, SSNode),
+ {ok, MNode1} = rabbit_ct_broker_helpers:rpc(Config, 0,
+ rabbit_queue_master_location_misc,
+ lookup_master, [Q, ?DEFAULT_VHOST_PATH]),
+ ?assertEqual(MNode0, MNode1),
+ Node2 = rabbit_ct_broker_helpers:get_node_config(Config, 2, nodename),
+ ?assertEqual(MNode1, Node2).
+
declare_config(Config) ->
setup_test_environment(Config),
set_location_config(Config, <<"min-masters">>),
- QueueName = rabbit_misc:r(<<"/">>, queue, Q= <<"qm.test">>),
+ QueueName = rabbit_misc:r(<<"/">>, queue, Q = <<"qm.test">>),
declare(Config, QueueName, false, false, _Args=[], none),
verify_min_master(Config, Q),
unset_location_config(Config),
@@ -139,7 +204,7 @@ declare_config(Config) ->
calculate_min_master(Config) ->
setup_test_environment(Config),
- QueueName = rabbit_misc:r(<<"/">>, queue, Q= <<"qm.test">>),
+ QueueName = rabbit_misc:r(<<"/">>, queue, Q = <<"qm.test">>),
Args = [{<<"x-queue-master-locator">>, longstr, <<"min-masters">>}],
declare(Config, QueueName, false, false, Args, none),
verify_min_master(Config, Q),
@@ -147,7 +212,7 @@ calculate_min_master(Config) ->
calculate_random(Config) ->
setup_test_environment(Config),
- QueueName = rabbit_misc:r(<<"/">>, queue, Q= <<"qm.test">>),
+ QueueName = rabbit_misc:r(<<"/">>, queue, Q = <<"qm.test">>),
Args = [{<<"x-queue-master-locator">>, longstr, <<"random">>}],
declare(Config, QueueName, false, false, Args, none),
verify_random(Config, Q),
@@ -155,7 +220,7 @@ calculate_random(Config) ->
calculate_client_local(Config) ->
setup_test_environment(Config),
- QueueName = rabbit_misc:r(<<"/">>, queue, Q= <<"qm.test">>),
+ QueueName = rabbit_misc:r(<<"/">>, queue, Q = <<"qm.test">>),
Args = [{<<"x-queue-master-locator">>, longstr, <<"client-local">>}],
declare(Config, QueueName, false, false, Args, none),
verify_client_local(Config, Q),
@@ -232,42 +297,66 @@ min_master_node(Config) ->
set_location_config(Config, Strategy) ->
Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
- [ok = rpc:call(Node, application, set_env,
- [rabbit, queue_master_locator, Strategy]) || Node <- Nodes],
+ [ok = rabbit_ct_broker_helpers:rpc(Config, Node,
+ application, set_env,
+ [rabbit, queue_master_locator, Strategy]) || Node <- Nodes],
ok.
unset_location_config(Config) ->
Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
- [ok = rpc:call(Node, application, unset_env,
- [rabbit, queue_master_locator]) || Node <- Nodes],
+ [ok = rabbit_ct_broker_helpers:rpc(Config, Node,
+ application, unset_env,
+ [rabbit, queue_master_locator]) || Node <- Nodes],
ok.
-declare(Config, QueueName, Durable, AutoDelete, Args, Owner) ->
- Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
- {new, Queue} = rpc:call(Node, rabbit_amqqueue, declare,
- [QueueName, Durable, AutoDelete, Args, Owner,
- <<"acting-user">>]),
+declare(Config, QueueName, Durable, AutoDelete, Args0, Owner) ->
+ Args1 = [QueueName, Durable, AutoDelete, Args0, Owner, <<"acting-user">>],
+ {new, Queue} = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, declare, Args1),
Queue.
+verify_min_master(Config, Q, MinMasterNode) ->
+ Rpc = rabbit_ct_broker_helpers:rpc(Config, 0,
+ rabbit_queue_master_location_misc,
+ lookup_master, [Q, ?DEFAULT_VHOST_PATH]),
+ ?assertEqual({ok, MinMasterNode}, Rpc).
+
verify_min_master(Config, Q) ->
- Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
MinMaster = min_master_node(Config),
- ct:pal("Expecting min master ~p~n", [MinMaster]),
- {ok, MinMaster} = rpc:call(Node, rabbit_queue_master_location_misc,
- lookup_master, [Q, ?DEFAULT_VHOST_PATH]).
+ verify_min_master(Config, Q, MinMaster).
verify_random(Config, Q) ->
- [Node | _] = Nodes = rabbit_ct_broker_helpers:get_node_configs(Config,
- nodename),
- {ok, Master} = rpc:call(Node, rabbit_queue_master_location_misc,
- lookup_master, [Q, ?DEFAULT_VHOST_PATH]),
- true = lists:member(Master, Nodes).
+ [Node | _] = Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ {ok, Master} = rabbit_ct_broker_helpers:rpc(Config, Node,
+ rabbit_queue_master_location_misc,
+ lookup_master, [Q, ?DEFAULT_VHOST_PATH]),
+ ?assert(lists:member(Master, Nodes)).
verify_client_local(Config, Q) ->
Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
- {ok, Node} = rpc:call(Node, rabbit_queue_master_location_misc,
- lookup_master, [Q, ?DEFAULT_VHOST_PATH]).
+ Rpc = rabbit_ct_broker_helpers:rpc(Config, Node,
+ rabbit_queue_master_location_misc,
+ lookup_master, [Q, ?DEFAULT_VHOST_PATH]),
+ ?assertEqual({ok, Node}, Rpc).
set_location_policy(Config, Name, Strategy) ->
ok = rabbit_ct_broker_helpers:set_policy(Config, 0,
Name, <<".*">>, <<"queues">>, [{<<"queue-master-locator">>, Strategy}]).
+
+wait_for_sync(Config, Nodename, Q, ExpectedSSPidLen) ->
+ wait_for_sync(Config, Nodename, Q, ExpectedSSPidLen, 600).
+
+wait_for_sync(_, _, _, _, 0) ->
+ throw(sync_timeout);
+wait_for_sync(Config, Nodename, Q, ExpectedSSPidLen, N) ->
+ case synced(Config, Nodename, Q, ExpectedSSPidLen) of
+ true -> ok;
+ false -> timer:sleep(100),
+ wait_for_sync(Config, Nodename, Q, ExpectedSSPidLen, N-1)
+ end.
+
+synced(Config, Nodename, Q, ExpectedSSPidLen) ->
+ Args = [<<"/">>, [name, synchronised_slave_pids]],
+ Info = rabbit_ct_broker_helpers:rpc(Config, Nodename,
+ rabbit_amqqueue, info_all, Args),
+ [SSPids] = [Pids || [{name, Q1}, {synchronised_slave_pids, Pids}] <- Info, Q =:= Q1],
+ length(SSPids) =:= ExpectedSSPidLen.