summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorMichael Klishin <michael@novemberain.com>2016-05-30 00:27:45 +0300
committerMichael Klishin <michael@novemberain.com>2016-05-30 00:27:45 +0300
commit455644498e97f91c7005bed85d2519db5f506030 (patch)
treeebf93b11a5ff4f83b37f17a0d9a6c8de4b91307f /test
parente0210397f3d8328f598d5829d3aa5ba7374f72c9 (diff)
parentf14ae2212e649d3d1017e0aa3a3ae93b1778cfa8 (diff)
downloadrabbitmq-server-git-455644498e97f91c7005bed85d2519db5f506030.tar.gz
Merge pull request #814 from rabbitmq/rabbitmq-server-803
Handle concurrent application of HA policy
Diffstat (limited to 'test')
-rw-r--r--test/dynamic_ha_SUITE.erl115
1 files changed, 112 insertions, 3 deletions
diff --git a/test/dynamic_ha_SUITE.erl b/test/dynamic_ha_SUITE.erl
index 5872d97d4c..c54e4c2994 100644
--- a/test/dynamic_ha_SUITE.erl
+++ b/test/dynamic_ha_SUITE.erl
@@ -31,6 +31,7 @@
%% The first two are change_policy, the last two are change_cluster
-include_lib("common_test/include/ct.hrl").
+-include_lib("proper/include/proper.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
@@ -60,7 +61,8 @@ groups() ->
]},
{cluster_size_3, [], [
change_policy,
- rapid_change
+ rapid_change,
+ random_policy
]}
]}
].
@@ -137,7 +139,7 @@ change_policy(Config) ->
assert_slaves(A, ?QNAME, {A, [C]}, [{A, [B, C]}]),
%% Clear the policy, and we go back to non-mirrored
- rabbit_ct_broker_helpers:clear_policy(Config, A, ?POLICY),
+ ok = rabbit_ct_broker_helpers:clear_policy(Config, A, ?POLICY),
assert_slaves(A, ?QNAME, {A, ''}),
%% Test switching "away" from an unmirrored node
@@ -206,7 +208,7 @@ rapid_loop(Config, Node, MRef) ->
after 0 ->
rabbit_ct_broker_helpers:set_ha_policy(Config, Node, ?POLICY,
<<"all">>),
- rabbit_ct_broker_helpers:clear_policy(Config, Node, ?POLICY),
+ ok = rabbit_ct_broker_helpers:clear_policy(Config, Node, ?POLICY),
rapid_loop(Config, Node, MRef)
end.
@@ -253,6 +255,9 @@ promote_on_shutdown(Config) ->
durable = true}),
ok.
+random_policy(Config) ->
+ run_proper(fun prop_random_policy/1, [Config]).
+
%%----------------------------------------------------------------------------
assert_slaves(RPCNode, QName, Exp) ->
@@ -327,3 +332,107 @@ get_stacktrace() ->
_:e ->
erlang:get_stacktrace()
end.
+
+%%----------------------------------------------------------------------------
+run_proper(Fun, Args) ->
+ case proper:counterexample(erlang:apply(Fun, Args),
+ [{numtests, 25},
+ {on_output, fun(F, A) ->
+ io:format(user, F, A)
+ end}]) of
+ true ->
+ true;
+ Value ->
+ exit(Value)
+ end.
+
+prop_random_policy(Config) ->
+ [NodeA, _, _] = Nodes = rabbit_ct_broker_helpers:get_node_configs(
+ Config, nodename),
+ ?FORALL(
+ Policies, non_empty(list(policy_gen(Nodes))),
+ begin
+ Ch = rabbit_ct_client_helpers:open_channel(Config, NodeA),
+ amqp_channel:call(Ch, #'queue.declare'{queue = ?QNAME}),
+ %% Add some load so mirrors can be busy synchronising
+ rabbit_ct_client_helpers:publish(Ch, ?QNAME, 100000),
+ %% Apply policies in parallel on all nodes
+ apply_in_parallel(Config, Nodes, Policies),
+ %% The last policy is the final state
+ Last = lists:last(Policies),
+ %% Give it some time to generate all internal notifications
+ timer:sleep(2000),
+ %% Ensure the owner/master is able to process a call request,
+ %% which means that all pending casts have been processed.
+ %% Use the information returned by owner/master to verify the
+ %% test result
+ Info = find_queue(?QNAME, NodeA),
+ %% Gets owner/master
+ Pid = proplists:get_value(pid, Info),
+ FinalInfo = rpc:call(node(Pid), gen_server, call, [Pid, info], 5000),
+ %% Check the result
+ Result = verify_policy(Last, FinalInfo),
+ %% Cleanup
+ amqp_channel:call(Ch, #'queue.delete'{queue = ?QNAME}),
+ _ = rabbit_ct_broker_helpers:clear_policy(Config, NodeA, ?POLICY),
+ Result
+ end).
+
+apply_in_parallel(Config, Nodes, Policies) ->
+ Self = self(),
+ [spawn_link(fun() ->
+ [begin
+ apply_policy(Config, N, Policy)
+ end || Policy <- Policies],
+ Self ! parallel_task_done
+ end) || N <- Nodes],
+ [receive
+ parallel_task_done ->
+ ok
+ end || _ <- Nodes].
+
+%% Proper generators
+policy_gen(Nodes) ->
+ %% Stop mirroring needs to be called often to trigger rabbitmq-server#803
+ frequency([{3, undefined},
+ {1, all},
+ {1, {nodes, nodes_gen(Nodes)}},
+ {1, {exactly, choose(1, 3)}}
+ ]).
+
+nodes_gen(Nodes) ->
+ ?LET(List, non_empty(list(oneof(Nodes))),
+ sets:to_list(sets:from_list(List))).
+
+%% Checks
+verify_policy(undefined, Info) ->
+ %% If the queue is not mirrored, it returns ''
+ '' == proplists:get_value(slave_pids, Info);
+verify_policy(all, Info) ->
+ 2 == length(proplists:get_value(slave_pids, Info));
+verify_policy({exactly, 1}, Info) ->
+ %% If the queue is mirrored, it returns a list
+ [] == proplists:get_value(slave_pids, Info);
+verify_policy({exactly, N}, Info) ->
+ (N - 1) == length(proplists:get_value(slave_pids, Info));
+verify_policy({nodes, Nodes}, Info) ->
+ Master = node(proplists:get_value(pid, Info)),
+ Slaves = [node(P) || P <- proplists:get_value(slave_pids, Info)],
+ lists:sort(Nodes) == lists:sort([Master | Slaves]).
+
+%% Policies
+apply_policy(Config, N, undefined) ->
+ _ = rabbit_ct_broker_helpers:clear_policy(Config, N, ?POLICY);
+apply_policy(Config, N, all) ->
+ rabbit_ct_broker_helpers:set_ha_policy(
+ Config, N, ?POLICY, <<"all">>,
+ [{<<"ha-sync-mode">>, <<"automatic">>}]);
+apply_policy(Config, N, {nodes, Nodes}) ->
+ NNodes = [rabbit_misc:atom_to_binary(Node) || Node <- Nodes],
+ rabbit_ct_broker_helpers:set_ha_policy(
+ Config, N, ?POLICY, {<<"nodes">>, NNodes},
+ [{<<"ha-sync-mode">>, <<"automatic">>}]);
+apply_policy(Config, N, {exactly, Exactly}) ->
+ rabbit_ct_broker_helpers:set_ha_policy(
+ Config, N, ?POLICY, {<<"exactly">>, Exactly},
+ [{<<"ha-sync-mode">>, <<"automatic">>}]).