diff options
| author | Andrew Bruce <me@andrewbruce.net> | 2016-10-05 15:27:00 +0100 |
|---|---|---|
| committer | Andrew Bruce <me@andrewbruce.net> | 2016-10-05 15:27:00 +0100 |
| commit | 157cdaa0d0c9b7753fbfcb2dba37cd48b8101407 (patch) | |
| tree | 4f5fb822ed7e359a264920650e0522ebc2be6013 | |
| parent | 75795f977c8b96fa580d58cd9bda049e312b2044 (diff) | |
| parent | c1c56a29b098b7c5ad0915a340a2a039e61a22a3 (diff) | |
| download | rabbitmq-server-git-157cdaa0d0c9b7753fbfcb2dba37cd48b8101407.tar.gz | |
Merge branch 'rabbitmq-server-990' into stablerabbitmq_v3_6_6_milestone5
| -rw-r--r-- | src/rabbit_mirror_queue_mode_nodes.erl | 36 | ||||
| -rw-r--r-- | test/dynamic_ha_SUITE.erl | 55 |
2 files changed, 73 insertions, 18 deletions
diff --git a/src/rabbit_mirror_queue_mode_nodes.erl b/src/rabbit_mirror_queue_mode_nodes.erl index e63f340373..31c55722a5 100644 --- a/src/rabbit_mirror_queue_mode_nodes.erl +++ b/src/rabbit_mirror_queue_mode_nodes.erl @@ -32,29 +32,37 @@ description() -> [{description, <<"Mirror queue to specified nodes">>}]. -suggested_queue_nodes(Nodes0, MNode, _SNodes, SSNodes, Poss) -> - Nodes1 = [list_to_atom(binary_to_list(Node)) || Node <- Nodes0], +suggested_queue_nodes(PolicyNodes0, CurrentMaster, _SNodes, SSNodes, NodesRunningRabbitMQ) -> + PolicyNodes1 = [list_to_atom(binary_to_list(Node)) || Node <- PolicyNodes0], %% If the current master is not in the nodes specified, then what we want %% to do depends on whether there are any synchronised slaves. If there %% are then we can just kill the current master - the admin has asked for %% a migration and we should give it to them. If there are not however %% then we must keep the master around so as not to lose messages. - Nodes = case SSNodes of - [] -> lists:usort([MNode | Nodes1]); - _ -> Nodes1 - end, - Unavailable = Nodes -- Poss, - Available = Nodes -- Unavailable, - case Available of + + PolicyNodes = case SSNodes of + [] -> lists:usort([CurrentMaster | PolicyNodes1]); + _ -> PolicyNodes1 + end, + Unavailable = PolicyNodes -- NodesRunningRabbitMQ, + AvailablePolicyNodes = PolicyNodes -- Unavailable, + case AvailablePolicyNodes of [] -> %% We have never heard of anything? Not much we can do but %% keep the master alive. - {MNode, []}; - _ -> case lists:member(MNode, Available) of - true -> {MNode, Available -- [MNode]}; + {CurrentMaster, []}; + _ -> case lists:member(CurrentMaster, AvailablePolicyNodes) of + true -> {CurrentMaster, + AvailablePolicyNodes -- [CurrentMaster]}; false -> %% Make sure the new master is synced! In order to %% get here SSNodes must not be empty. - [NewMNode | _] = SSNodes, - {NewMNode, Available -- [NewMNode]} + SyncPolicyNodes = [Node || + Node <- AvailablePolicyNodes, + lists:member(Node, SSNodes)], + NewMaster = case SyncPolicyNodes of + [Node | _] -> Node; + [] -> erlang:hd(SSNodes) + end, + {NewMaster, AvailablePolicyNodes -- [NewMaster]} end end. diff --git a/test/dynamic_ha_SUITE.erl b/test/dynamic_ha_SUITE.erl index bba7fad707..502e3a7e86 100644 --- a/test/dynamic_ha_SUITE.erl +++ b/test/dynamic_ha_SUITE.erl @@ -61,7 +61,8 @@ groups() -> ]}, {cluster_size_3, [], [ change_policy, - rapid_change + rapid_change, + nodes_policy_should_pick_master_from_its_params % FIXME: Re-enable those tests when the know issues are % fixed. %failing_random_policies, @@ -258,6 +259,48 @@ promote_on_shutdown(Config) -> durable = true}), ok. +nodes_policy_should_pick_master_from_its_params(Config) -> + [A | _] = rabbit_ct_broker_helpers:get_node_configs(Config, + nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, A), + ?assertEqual(true, apply_policy_to_declared_queue(Config, Ch, [A], + [all])), + %% --> Master: A + %% Slaves: [B, C] or [C, B] + Info = find_queue(?QNAME, A), + SSPids = proplists:get_value(synchronised_slave_pids, Info), + + %% Choose slave that isn't the first sync slave. Cover a bug that always + %% chose the first, even if it was not part of the policy + LastSlave = node(lists:last(SSPids)), + ?assertEqual(true, apply_policy_to_declared_queue(Config, Ch, [A], + [{nodes, [LastSlave]}])), + %% --> Master: B or C (depends on the order of current slaves) + %% Slaves: [] + + %% Now choose a new master that isn't synchronised. The previous + %% policy made sure that the queue only runs on one node (the last + %% from the initial synchronised list). Thus, by taking the first + %% node from this list, we know it is not synchronised. + %% + %% Because the policy doesn't cover any synchronised slave, RabbitMQ + %% should instead use an existing synchronised slave as the new master, + %% even though that isn't in the policy. + ?assertEqual(true, apply_policy_to_declared_queue(Config, Ch, [A], + [{nodes, [LastSlave, A]}])), + %% --> Master: B or C (same as previous policy) + %% Slaves: [A] + + NewMaster = node(erlang:hd(SSPids)), + ?assertEqual(true, apply_policy_to_declared_queue(Config, Ch, [A], + [{nodes, [NewMaster]}])), + %% --> Master: B or C (the other one compared to previous policy) + %% Slaves: [] + + amqp_channel:call(Ch, #'queue.delete'{queue = ?QNAME}), + _ = rabbit_ct_broker_helpers:clear_policy(Config, A, ?POLICY). + random_policy(Config) -> run_proper(fun prop_random_policy/1, [Config]). @@ -364,9 +407,8 @@ prop_random_policy(Config) -> Policies, non_empty(list(policy_gen(Nodes))), test_random_policy(Config, Nodes, Policies)). -test_random_policy(Config, Nodes, Policies) -> +apply_policy_to_declared_queue(Config, Ch, Nodes, Policies) -> [NodeA | _] = Nodes, - Ch = rabbit_ct_client_helpers:open_channel(Config, NodeA), amqp_channel:call(Ch, #'queue.declare'{queue = ?QNAME}), %% Add some load so mirrors can be busy synchronising rabbit_ct_client_helpers:publish(Ch, ?QNAME, 100000), @@ -375,7 +417,12 @@ test_random_policy(Config, Nodes, Policies) -> %% Give it some time to generate all internal notifications timer:sleep(2000), %% Check the result - Result = wait_for_last_policy(?QNAME, NodeA, Policies, 30), + wait_for_last_policy(?QNAME, NodeA, Policies, 30). + +test_random_policy(Config, Nodes, Policies) -> + [NodeA | _] = Nodes, + Ch = rabbit_ct_client_helpers:open_channel(Config, NodeA), + Result = apply_policy_to_declared_queue(Config, Ch, Nodes, Policies), %% Cleanup amqp_channel:call(Ch, #'queue.delete'{queue = ?QNAME}), _ = rabbit_ct_broker_helpers:clear_policy(Config, NodeA, ?POLICY), |
