summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2016-09-03 01:24:06 +0300
committerMichael Klishin <michael@clojurewerkz.org>2016-09-03 01:24:06 +0300
commitf8371f775321ec5a0e15776bfa1d0072c9732c84 (patch)
tree5696defc11a8c7268f787e374249796699909332 /test
parent1e3c0b48edee2fdd4ed713faabf4ee5521ea7e59 (diff)
parenteabdd68997169ce7a5fc25adce43e05028cb1b49 (diff)
downloadrabbitmq-server-git-f8371f775321ec5a0e15776bfa1d0072c9732c84.tar.gz
Merge branch 'rabbitmq-server-803-stable' into stablerabbitmq_v3_6_6_milestone4
Diffstat (limited to 'test')
-rw-r--r--test/dynamic_ha_SUITE.erl154
1 files changed, 152 insertions, 2 deletions
diff --git a/test/dynamic_ha_SUITE.erl b/test/dynamic_ha_SUITE.erl
index 5872d97d4c..bba7fad707 100644
--- a/test/dynamic_ha_SUITE.erl
+++ b/test/dynamic_ha_SUITE.erl
@@ -31,6 +31,7 @@
%% The first two are change_policy, the last two are change_cluster
-include_lib("common_test/include/ct.hrl").
+-include_lib("proper/include/proper.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
@@ -61,6 +62,10 @@ groups() ->
{cluster_size_3, [], [
change_policy,
rapid_change
+ % FIXME: Re-enable those tests when the know issues are
+ % fixed.
+ %failing_random_policies,
+ %random_policy
]}
]}
].
@@ -137,7 +142,7 @@ change_policy(Config) ->
assert_slaves(A, ?QNAME, {A, [C]}, [{A, [B, C]}]),
%% Clear the policy, and we go back to non-mirrored
- rabbit_ct_broker_helpers:clear_policy(Config, A, ?POLICY),
+ ok = rabbit_ct_broker_helpers:clear_policy(Config, A, ?POLICY),
assert_slaves(A, ?QNAME, {A, ''}),
%% Test switching "away" from an unmirrored node
@@ -206,7 +211,7 @@ rapid_loop(Config, Node, MRef) ->
after 0 ->
rabbit_ct_broker_helpers:set_ha_policy(Config, Node, ?POLICY,
<<"all">>),
- rabbit_ct_broker_helpers:clear_policy(Config, Node, ?POLICY),
+ ok = rabbit_ct_broker_helpers:clear_policy(Config, Node, ?POLICY),
rapid_loop(Config, Node, MRef)
end.
@@ -253,6 +258,23 @@ promote_on_shutdown(Config) ->
durable = true}),
ok.
+random_policy(Config) ->
+ run_proper(fun prop_random_policy/1, [Config]).
+
+failing_random_policies(Config) ->
+ [A, B | _] = Nodes = rabbit_ct_broker_helpers:get_node_configs(Config,
+ nodename),
+ %% Those set of policies were found as failing by PropEr in the
+ %% `random_policy` test above. We add them explicitely here to make
+ %% sure they get tested.
+ ?assertEqual(true, test_random_policy(Config, Nodes,
+ [{nodes, [A, B]}, {nodes, [A]}])),
+ ?assertEqual(true, test_random_policy(Config, Nodes,
+ [{exactly, 3}, undefined, all, {nodes, [B]}])),
+ ?assertEqual(true, test_random_policy(Config, Nodes,
+ [all, undefined, {exactly, 2}, all, {exactly, 3}, {exactly, 3},
+ undefined, {exactly, 3}, all])).
+
%%----------------------------------------------------------------------------
assert_slaves(RPCNode, QName, Exp) ->
@@ -327,3 +349,131 @@ get_stacktrace() ->
_:e ->
erlang:get_stacktrace()
end.
+
+%%----------------------------------------------------------------------------
+run_proper(Fun, Args) ->
+ ?assertEqual(true,
+ proper:counterexample(erlang:apply(Fun, Args),
+ [{numtests, 25},
+ {on_output, fun(F, A) -> ct:pal(?LOW_IMPORTANCE, F, A) end}])).
+
+prop_random_policy(Config) ->
+ Nodes = rabbit_ct_broker_helpers:get_node_configs(
+ Config, nodename),
+ ?FORALL(
+ Policies, non_empty(list(policy_gen(Nodes))),
+ test_random_policy(Config, Nodes, Policies)).
+
+test_random_policy(Config, Nodes, Policies) ->
+ [NodeA | _] = Nodes,
+ Ch = rabbit_ct_client_helpers:open_channel(Config, NodeA),
+ amqp_channel:call(Ch, #'queue.declare'{queue = ?QNAME}),
+ %% Add some load so mirrors can be busy synchronising
+ rabbit_ct_client_helpers:publish(Ch, ?QNAME, 100000),
+ %% Apply policies in parallel on all nodes
+ apply_in_parallel(Config, Nodes, Policies),
+ %% Give it some time to generate all internal notifications
+ timer:sleep(2000),
+ %% Check the result
+ Result = wait_for_last_policy(?QNAME, NodeA, Policies, 30),
+ %% Cleanup
+ amqp_channel:call(Ch, #'queue.delete'{queue = ?QNAME}),
+ _ = rabbit_ct_broker_helpers:clear_policy(Config, NodeA, ?POLICY),
+ Result.
+
+apply_in_parallel(Config, Nodes, Policies) ->
+ Self = self(),
+ [spawn_link(fun() ->
+ [begin
+ apply_policy(Config, N, Policy)
+ end || Policy <- Policies],
+ Self ! parallel_task_done
+ end) || N <- Nodes],
+ [receive
+ parallel_task_done ->
+ ok
+ end || _ <- Nodes].
+
+%% Proper generators
+policy_gen(Nodes) ->
+ %% Stop mirroring needs to be called often to trigger rabbitmq-server#803
+ frequency([{3, undefined},
+ {1, all},
+ {1, {nodes, nodes_gen(Nodes)}},
+ {1, {exactly, choose(1, 3)}}
+ ]).
+
+nodes_gen(Nodes) ->
+ ?LET(List, non_empty(list(oneof(Nodes))),
+ sets:to_list(sets:from_list(List))).
+
+%% Checks
+wait_for_last_policy(QueueName, NodeA, TestedPolicies, Tries) ->
+ %% Ensure the owner/master is able to process a call request,
+ %% which means that all pending casts have been processed.
+ %% Use the information returned by owner/master to verify the
+ %% test result
+ Info = find_queue(QueueName, NodeA),
+ Pid = proplists:get_value(pid, Info),
+ Node = node(Pid),
+ %% Gets owner/master
+ case rpc:call(Node, gen_server, call, [Pid, info], 5000) of
+ {badrpc, _} ->
+ %% The queue is probably being migrated to another node.
+ %% Let's wait a bit longer.
+ timer:sleep(1000),
+ wait_for_last_policy(QueueName, NodeA, TestedPolicies, Tries - 1);
+ FinalInfo ->
+ %% The last policy is the final state
+ LastPolicy = lists:last(TestedPolicies),
+ case verify_policy(LastPolicy, FinalInfo) of
+ true ->
+ true;
+ false when Tries =:= 1 ->
+ Policies = rpc:call(Node, rabbit_policy, list, [], 5000),
+ ct:pal(
+ "Last policy not applied:~n"
+ " Queue node: ~s (~p)~n"
+ " Queue info: ~p~n"
+ " Configured policies: ~p~n"
+ " Tested policies: ~p",
+ [Node, Pid, FinalInfo, Policies, TestedPolicies]),
+ false;
+ false ->
+ timer:sleep(1000),
+ wait_for_last_policy(QueueName, NodeA, TestedPolicies,
+ Tries - 1)
+ end
+ end.
+
+verify_policy(undefined, Info) ->
+ %% If the queue is not mirrored, it returns ''
+ '' == proplists:get_value(slave_pids, Info);
+verify_policy(all, Info) ->
+ 2 == length(proplists:get_value(slave_pids, Info));
+verify_policy({exactly, 1}, Info) ->
+ %% If the queue is mirrored, it returns a list
+ [] == proplists:get_value(slave_pids, Info);
+verify_policy({exactly, N}, Info) ->
+ (N - 1) == length(proplists:get_value(slave_pids, Info));
+verify_policy({nodes, Nodes}, Info) ->
+ Master = node(proplists:get_value(pid, Info)),
+ Slaves = [node(P) || P <- proplists:get_value(slave_pids, Info)],
+ lists:sort(Nodes) == lists:sort([Master | Slaves]).
+
+%% Policies
+apply_policy(Config, N, undefined) ->
+ _ = rabbit_ct_broker_helpers:clear_policy(Config, N, ?POLICY);
+apply_policy(Config, N, all) ->
+ rabbit_ct_broker_helpers:set_ha_policy(
+ Config, N, ?POLICY, <<"all">>,
+ [{<<"ha-sync-mode">>, <<"automatic">>}]);
+apply_policy(Config, N, {nodes, Nodes}) ->
+ NNodes = [rabbit_misc:atom_to_binary(Node) || Node <- Nodes],
+ rabbit_ct_broker_helpers:set_ha_policy(
+ Config, N, ?POLICY, {<<"nodes">>, NNodes},
+ [{<<"ha-sync-mode">>, <<"automatic">>}]);
+apply_policy(Config, N, {exactly, Exactly}) ->
+ rabbit_ct_broker_helpers:set_ha_policy(
+ Config, N, ?POLICY, {<<"exactly">>, Exactly},
+ [{<<"ha-sync-mode">>, <<"automatic">>}]).