diff options
| author | Michael Klishin <michael@clojurewerkz.org> | 2016-09-03 01:24:06 +0300 |
|---|---|---|
| committer | Michael Klishin <michael@clojurewerkz.org> | 2016-09-03 01:24:06 +0300 |
| commit | f8371f775321ec5a0e15776bfa1d0072c9732c84 (patch) | |
| tree | 5696defc11a8c7268f787e374249796699909332 /test | |
| parent | 1e3c0b48edee2fdd4ed713faabf4ee5521ea7e59 (diff) | |
| parent | eabdd68997169ce7a5fc25adce43e05028cb1b49 (diff) | |
| download | rabbitmq-server-git-f8371f775321ec5a0e15776bfa1d0072c9732c84.tar.gz | |
Merge branch 'rabbitmq-server-803-stable' into stablerabbitmq_v3_6_6_milestone4
Diffstat (limited to 'test')
| -rw-r--r-- | test/dynamic_ha_SUITE.erl | 154 |
1 files changed, 152 insertions, 2 deletions
diff --git a/test/dynamic_ha_SUITE.erl b/test/dynamic_ha_SUITE.erl index 5872d97d4c..bba7fad707 100644 --- a/test/dynamic_ha_SUITE.erl +++ b/test/dynamic_ha_SUITE.erl @@ -31,6 +31,7 @@ %% The first two are change_policy, the last two are change_cluster -include_lib("common_test/include/ct.hrl"). +-include_lib("proper/include/proper.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). @@ -61,6 +62,10 @@ groups() -> {cluster_size_3, [], [ change_policy, rapid_change + % FIXME: Re-enable those tests when the know issues are + % fixed. + %failing_random_policies, + %random_policy ]} ]} ]. @@ -137,7 +142,7 @@ change_policy(Config) -> assert_slaves(A, ?QNAME, {A, [C]}, [{A, [B, C]}]), %% Clear the policy, and we go back to non-mirrored - rabbit_ct_broker_helpers:clear_policy(Config, A, ?POLICY), + ok = rabbit_ct_broker_helpers:clear_policy(Config, A, ?POLICY), assert_slaves(A, ?QNAME, {A, ''}), %% Test switching "away" from an unmirrored node @@ -206,7 +211,7 @@ rapid_loop(Config, Node, MRef) -> after 0 -> rabbit_ct_broker_helpers:set_ha_policy(Config, Node, ?POLICY, <<"all">>), - rabbit_ct_broker_helpers:clear_policy(Config, Node, ?POLICY), + ok = rabbit_ct_broker_helpers:clear_policy(Config, Node, ?POLICY), rapid_loop(Config, Node, MRef) end. @@ -253,6 +258,23 @@ promote_on_shutdown(Config) -> durable = true}), ok. +random_policy(Config) -> + run_proper(fun prop_random_policy/1, [Config]). + +failing_random_policies(Config) -> + [A, B | _] = Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, + nodename), + %% Those set of policies were found as failing by PropEr in the + %% `random_policy` test above. We add them explicitely here to make + %% sure they get tested. + ?assertEqual(true, test_random_policy(Config, Nodes, + [{nodes, [A, B]}, {nodes, [A]}])), + ?assertEqual(true, test_random_policy(Config, Nodes, + [{exactly, 3}, undefined, all, {nodes, [B]}])), + ?assertEqual(true, test_random_policy(Config, Nodes, + [all, undefined, {exactly, 2}, all, {exactly, 3}, {exactly, 3}, + undefined, {exactly, 3}, all])). + %%---------------------------------------------------------------------------- assert_slaves(RPCNode, QName, Exp) -> @@ -327,3 +349,131 @@ get_stacktrace() -> _:e -> erlang:get_stacktrace() end. + +%%---------------------------------------------------------------------------- +run_proper(Fun, Args) -> + ?assertEqual(true, + proper:counterexample(erlang:apply(Fun, Args), + [{numtests, 25}, + {on_output, fun(F, A) -> ct:pal(?LOW_IMPORTANCE, F, A) end}])). + +prop_random_policy(Config) -> + Nodes = rabbit_ct_broker_helpers:get_node_configs( + Config, nodename), + ?FORALL( + Policies, non_empty(list(policy_gen(Nodes))), + test_random_policy(Config, Nodes, Policies)). + +test_random_policy(Config, Nodes, Policies) -> + [NodeA | _] = Nodes, + Ch = rabbit_ct_client_helpers:open_channel(Config, NodeA), + amqp_channel:call(Ch, #'queue.declare'{queue = ?QNAME}), + %% Add some load so mirrors can be busy synchronising + rabbit_ct_client_helpers:publish(Ch, ?QNAME, 100000), + %% Apply policies in parallel on all nodes + apply_in_parallel(Config, Nodes, Policies), + %% Give it some time to generate all internal notifications + timer:sleep(2000), + %% Check the result + Result = wait_for_last_policy(?QNAME, NodeA, Policies, 30), + %% Cleanup + amqp_channel:call(Ch, #'queue.delete'{queue = ?QNAME}), + _ = rabbit_ct_broker_helpers:clear_policy(Config, NodeA, ?POLICY), + Result. + +apply_in_parallel(Config, Nodes, Policies) -> + Self = self(), + [spawn_link(fun() -> + [begin + apply_policy(Config, N, Policy) + end || Policy <- Policies], + Self ! parallel_task_done + end) || N <- Nodes], + [receive + parallel_task_done -> + ok + end || _ <- Nodes]. + +%% Proper generators +policy_gen(Nodes) -> + %% Stop mirroring needs to be called often to trigger rabbitmq-server#803 + frequency([{3, undefined}, + {1, all}, + {1, {nodes, nodes_gen(Nodes)}}, + {1, {exactly, choose(1, 3)}} + ]). + +nodes_gen(Nodes) -> + ?LET(List, non_empty(list(oneof(Nodes))), + sets:to_list(sets:from_list(List))). + +%% Checks +wait_for_last_policy(QueueName, NodeA, TestedPolicies, Tries) -> + %% Ensure the owner/master is able to process a call request, + %% which means that all pending casts have been processed. + %% Use the information returned by owner/master to verify the + %% test result + Info = find_queue(QueueName, NodeA), + Pid = proplists:get_value(pid, Info), + Node = node(Pid), + %% Gets owner/master + case rpc:call(Node, gen_server, call, [Pid, info], 5000) of + {badrpc, _} -> + %% The queue is probably being migrated to another node. + %% Let's wait a bit longer. + timer:sleep(1000), + wait_for_last_policy(QueueName, NodeA, TestedPolicies, Tries - 1); + FinalInfo -> + %% The last policy is the final state + LastPolicy = lists:last(TestedPolicies), + case verify_policy(LastPolicy, FinalInfo) of + true -> + true; + false when Tries =:= 1 -> + Policies = rpc:call(Node, rabbit_policy, list, [], 5000), + ct:pal( + "Last policy not applied:~n" + " Queue node: ~s (~p)~n" + " Queue info: ~p~n" + " Configured policies: ~p~n" + " Tested policies: ~p", + [Node, Pid, FinalInfo, Policies, TestedPolicies]), + false; + false -> + timer:sleep(1000), + wait_for_last_policy(QueueName, NodeA, TestedPolicies, + Tries - 1) + end + end. + +verify_policy(undefined, Info) -> + %% If the queue is not mirrored, it returns '' + '' == proplists:get_value(slave_pids, Info); +verify_policy(all, Info) -> + 2 == length(proplists:get_value(slave_pids, Info)); +verify_policy({exactly, 1}, Info) -> + %% If the queue is mirrored, it returns a list + [] == proplists:get_value(slave_pids, Info); +verify_policy({exactly, N}, Info) -> + (N - 1) == length(proplists:get_value(slave_pids, Info)); +verify_policy({nodes, Nodes}, Info) -> + Master = node(proplists:get_value(pid, Info)), + Slaves = [node(P) || P <- proplists:get_value(slave_pids, Info)], + lists:sort(Nodes) == lists:sort([Master | Slaves]). + +%% Policies +apply_policy(Config, N, undefined) -> + _ = rabbit_ct_broker_helpers:clear_policy(Config, N, ?POLICY); +apply_policy(Config, N, all) -> + rabbit_ct_broker_helpers:set_ha_policy( + Config, N, ?POLICY, <<"all">>, + [{<<"ha-sync-mode">>, <<"automatic">>}]); +apply_policy(Config, N, {nodes, Nodes}) -> + NNodes = [rabbit_misc:atom_to_binary(Node) || Node <- Nodes], + rabbit_ct_broker_helpers:set_ha_policy( + Config, N, ?POLICY, {<<"nodes">>, NNodes}, + [{<<"ha-sync-mode">>, <<"automatic">>}]); +apply_policy(Config, N, {exactly, Exactly}) -> + rabbit_ct_broker_helpers:set_ha_policy( + Config, N, ?POLICY, {<<"exactly">>, Exactly}, + [{<<"ha-sync-mode">>, <<"automatic">>}]). |
