diff options
Diffstat (limited to 'test')
| -rw-r--r-- | test/dynamic_ha_SUITE.erl | 111 |
1 files changed, 110 insertions, 1 deletions
diff --git a/test/dynamic_ha_SUITE.erl b/test/dynamic_ha_SUITE.erl index 5872d97d4c..8abfa68626 100644 --- a/test/dynamic_ha_SUITE.erl +++ b/test/dynamic_ha_SUITE.erl @@ -31,6 +31,7 @@ %% The first two are change_policy, the last two are change_cluster -include_lib("common_test/include/ct.hrl"). +-include_lib("proper/include/proper.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). @@ -60,7 +61,8 @@ groups() -> ]}, {cluster_size_3, [], [ change_policy, - rapid_change + rapid_change, + random_policy ]} ]} ]. @@ -253,6 +255,9 @@ promote_on_shutdown(Config) -> durable = true}), ok. +random_policy(Config) -> + run_proper(fun prop_random_policy/1, [Config]). + %%---------------------------------------------------------------------------- assert_slaves(RPCNode, QName, Exp) -> @@ -327,3 +332,107 @@ get_stacktrace() -> _:e -> erlang:get_stacktrace() end. + +%%---------------------------------------------------------------------------- +run_proper(Fun, Args) -> + case proper:counterexample(erlang:apply(Fun, Args), + [{numtests, 25}, + {on_output, fun(F, A) -> + io:format(user, F, A) + end}]) of + true -> + true; + Value -> + exit(Value) + end. + +prop_random_policy(Config) -> + [NodeA, _, _] = Nodes = rabbit_ct_broker_helpers:get_node_configs( + Config, nodename), + ?FORALL( + Policies, non_empty(list(policy_gen(Nodes))), + begin + Ch = rabbit_ct_client_helpers:open_channel(Config, NodeA), + amqp_channel:call(Ch, #'queue.declare'{queue = ?QNAME}), + %% Add some load so mirrors can be busy synchronising + rabbit_ct_client_helpers:publish(Ch, ?QNAME, 100000), + %% Apply policies in parallel on all nodes + apply_in_parallel(Config, Nodes, Policies), + %% The last policy is the final state + Last = lists:last(Policies), + %% Give it some time to generate all internal notifications + timer:sleep(2000), + %% Ensure the owner/master is able to process a call request, + %% which means that all pending casts have been processed. + %% Use the information returned by owner/master to verify the + %% test result + Info = find_queue(?QNAME, NodeA), + %% Gets owner/master + Pid = proplists:get_value(pid, Info), + FinalInfo = rpc:call(node(Pid), gen_server, call, [Pid, info], 5000), + %% Check the result + Result = verify_policy(Last, FinalInfo), + %% Cleanup + amqp_channel:call(Ch, #'queue.delete'{queue = ?QNAME}), + (catch rabbit_ct_broker_helpers:clear_policy(Config, NodeA, ?POLICY)), + Result + end). + +apply_in_parallel(Config, Nodes, Policies) -> + Self = self(), + [spawn_link(fun() -> + [begin + apply_policy(Config, N, Policy) + end || Policy <- Policies], + Self ! parallel_task_done + end) || N <- Nodes], + [receive + parallel_task_done -> + ok + end || _ <- Nodes]. + +%% Proper generators +policy_gen(Nodes) -> + %% Stop mirroring needs to be called often to trigger rabbitmq-server#803 + frequency([{3, undefined}, + {1, all}, + {1, {nodes, nodes_gen(Nodes)}}, + {1, {exactly, choose(1, 3)}} + ]). + +nodes_gen(Nodes) -> + ?LET(List, non_empty(list(oneof(Nodes))), + sets:to_list(sets:from_list(List))). + +%% Checks +verify_policy(undefined, Info) -> + %% If the queue is not mirrored, it returns '' + '' == proplists:get_value(slave_pids, Info); +verify_policy(all, Info) -> + 2 == length(proplists:get_value(slave_pids, Info)); +verify_policy({exactly, 1}, Info) -> + %% If the queue is mirrored, it returns a list + [] == proplists:get_value(slave_pids, Info); +verify_policy({exactly, N}, Info) -> + (N - 1) == length(proplists:get_value(slave_pids, Info)); +verify_policy({nodes, Nodes}, Info) -> + Master = node(proplists:get_value(pid, Info)), + Slaves = [node(P) || P <- proplists:get_value(slave_pids, Info)], + lists:sort(Nodes) == lists:sort([Master | Slaves]). + +%% Policies +apply_policy(Config, N, undefined) -> + (catch rabbit_ct_broker_helpers:clear_policy(Config, N, ?POLICY)); +apply_policy(Config, N, all) -> + rabbit_ct_broker_helpers:set_ha_policy( + Config, N, ?POLICY, <<"all">>, + [{<<"ha-sync-mode">>, <<"automatic">>}]); +apply_policy(Config, N, {nodes, Nodes}) -> + NNodes = [rabbit_misc:atom_to_binary(Node) || Node <- Nodes], + rabbit_ct_broker_helpers:set_ha_policy( + Config, N, ?POLICY, {<<"nodes">>, NNodes}, + [{<<"ha-sync-mode">>, <<"automatic">>}]); +apply_policy(Config, N, {exactly, Exactly}) -> + rabbit_ct_broker_helpers:set_ha_policy( + Config, N, ?POLICY, {<<"exactly">>, Exactly}, + [{<<"ha-sync-mode">>, <<"automatic">>}]). |
