diff options
| author | Karl Nilsson <kjnilsson@gmail.com> | 2019-09-13 15:55:47 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2019-09-13 15:55:47 +0100 |
| commit | 2baea46d1521108fac8743bac375423aa7916d0c (patch) | |
| tree | 446adea18d9301b501b91e16b1945aa18fe3e45b /test | |
| parent | 27e3b3d3db47c45997dd4c7efe6eae4f71ec4bb5 (diff) | |
| parent | 432fb82ae9de41846fa53ffcb0ca9c41bc16056f (diff) | |
| download | rabbitmq-server-git-2baea46d1521108fac8743bac375423aa7916d0c.tar.gz | |
Merge pull request #2106 from rabbitmq/rebalance-all-queues
Rebalance all queues
Diffstat (limited to 'test')
| -rw-r--r-- | test/dynamic_ha_SUITE.erl | 117 | ||||
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 42 |
2 files changed, 157 insertions, 2 deletions
diff --git a/test/dynamic_ha_SUITE.erl b/test/dynamic_ha_SUITE.erl index 01fbc55e22..5eccfd2ad5 100644 --- a/test/dynamic_ha_SUITE.erl +++ b/test/dynamic_ha_SUITE.erl @@ -71,7 +71,10 @@ groups() -> rapid_change, nodes_policy_should_pick_master_from_its_params, promote_slave_after_standalone_restart, - queue_survive_adding_dead_vhost_mirror + queue_survive_adding_dead_vhost_mirror, + rebalance_all, + rebalance_exactly, + rebalance_nodes % FIXME: Re-enable those tests when the know issues are % fixed. % failing_random_policies, @@ -539,6 +542,118 @@ promote_slave_after_standalone_restart(Config) -> ok. +rebalance_all(Config) -> + [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + ACh = rabbit_ct_client_helpers:open_channel(Config, A), + + Q1 = <<"q1">>, + Q2 = <<"q2">>, + Q3 = <<"q3">>, + Q4 = <<"q4">>, + Q5 = <<"q5">>, + + amqp_channel:call(ACh, #'queue.declare'{queue = Q1}), + amqp_channel:call(ACh, #'queue.declare'{queue = Q2}), + amqp_channel:call(ACh, #'queue.declare'{queue = Q3}), + amqp_channel:call(ACh, #'queue.declare'{queue = Q4}), + amqp_channel:call(ACh, #'queue.declare'{queue = Q5}), + rabbit_ct_broker_helpers:set_ha_policy(Config, A, <<"q.*">>, <<"all">>), + timer:sleep(1000), + + rabbit_ct_client_helpers:publish(ACh, Q1, 5), + rabbit_ct_client_helpers:publish(ACh, Q2, 3), + assert_slaves(A, Q1, {A, [B, C]}, [{A, []}, {A, [B]}, {A, [C]}]), + assert_slaves(A, Q2, {A, [B, C]}, [{A, []}, {A, [B]}, {A, [C]}]), + assert_slaves(A, Q3, {A, [B, C]}, [{A, []}, {A, [B]}, {A, [C]}]), + assert_slaves(A, Q4, {A, [B, C]}, [{A, []}, {A, [B]}, {A, [C]}]), + assert_slaves(A, Q5, {A, [B, C]}, [{A, []}, {A, [B]}, {A, [C]}]), + + {ok, Summary} = rpc:call(A, rabbit_amqqueue, rebalance, [classic, ".*", ".*"]), + + %% Check that we have at most 2 queues per node + ?assert(lists:all(fun({_, V}) -> V =< 2 end, Summary)), + %% Check that Q1 and Q2 haven't moved + assert_slaves(A, Q1, {A, [B, C]}, [{A, []}, {A, [B]}, {A, [C]}]), + assert_slaves(A, Q2, {A, [B, C]}, [{A, []}, {A, [B]}, {A, [C]}]), + + ok. + +rebalance_exactly(Config) -> + [A, _, _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + ACh = rabbit_ct_client_helpers:open_channel(Config, A), + + Q1 = <<"q1">>, + Q2 = <<"q2">>, + Q3 = <<"q3">>, + Q4 = <<"q4">>, + Q5 = <<"q5">>, + + amqp_channel:call(ACh, #'queue.declare'{queue = Q1}), + amqp_channel:call(ACh, #'queue.declare'{queue = Q2}), + amqp_channel:call(ACh, #'queue.declare'{queue = Q3}), + amqp_channel:call(ACh, #'queue.declare'{queue = Q4}), + amqp_channel:call(ACh, #'queue.declare'{queue = Q5}), + rabbit_ct_broker_helpers:set_ha_policy(Config, A, <<"q.*">>, {<<"exactly">>, 2}), + timer:sleep(1000), + + rabbit_ct_client_helpers:publish(ACh, Q1, 5), + rabbit_ct_client_helpers:publish(ACh, Q2, 3), + + ?assertEqual(A, node(proplists:get_value(pid, find_queue(Q1, A)))), + ?assertEqual(A, node(proplists:get_value(pid, find_queue(Q2, A)))), + ?assertEqual(A, node(proplists:get_value(pid, find_queue(Q3, A)))), + ?assertEqual(A, node(proplists:get_value(pid, find_queue(Q4, A)))), + ?assertEqual(A, node(proplists:get_value(pid, find_queue(Q5, A)))), + + {ok, Summary} = rpc:call(A, rabbit_amqqueue, rebalance, [classic, ".*", ".*"]), + + %% Check that we have at most 2 queues per node + ?assert(lists:all(fun({_, V}) -> V =< 2 end, Summary)), + %% Check that Q1 and Q2 haven't moved + ?assertEqual(A, node(proplists:get_value(pid, find_queue(Q1, A)))), + ?assertEqual(A, node(proplists:get_value(pid, find_queue(Q2, A)))), + + ok. + +rebalance_nodes(Config) -> + [A, B, _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + ACh = rabbit_ct_client_helpers:open_channel(Config, A), + + Q1 = <<"q1">>, + Q2 = <<"q2">>, + Q3 = <<"q3">>, + Q4 = <<"q4">>, + Q5 = <<"q5">>, + + amqp_channel:call(ACh, #'queue.declare'{queue = Q1}), + amqp_channel:call(ACh, #'queue.declare'{queue = Q2}), + amqp_channel:call(ACh, #'queue.declare'{queue = Q3}), + amqp_channel:call(ACh, #'queue.declare'{queue = Q4}), + amqp_channel:call(ACh, #'queue.declare'{queue = Q5}), + rabbit_ct_broker_helpers:set_ha_policy( + Config, A, <<"q.*">>, + {<<"nodes">>, [rabbit_misc:atom_to_binary(A), rabbit_misc:atom_to_binary(B)]}), + timer:sleep(1000), + + rabbit_ct_client_helpers:publish(ACh, Q1, 5), + rabbit_ct_client_helpers:publish(ACh, Q2, 3), + + ?assertEqual(A, node(proplists:get_value(pid, find_queue(Q1, A)))), + ?assertEqual(A, node(proplists:get_value(pid, find_queue(Q2, A)))), + ?assertEqual(A, node(proplists:get_value(pid, find_queue(Q3, A)))), + ?assertEqual(A, node(proplists:get_value(pid, find_queue(Q4, A)))), + ?assertEqual(A, node(proplists:get_value(pid, find_queue(Q5, A)))), + + {ok, Summary} = rpc:call(A, rabbit_amqqueue, rebalance, [classic, ".*", ".*"]), + + %% Check that we have at most 3 queues per node + ?assert(lists:all(fun({_, V}) -> V =< 3 end, Summary)), + %% Check that Q1 and Q2 haven't moved + ?assertEqual(A, node(proplists:get_value(pid, find_queue(Q1, A)))), + ?assertEqual(A, node(proplists:get_value(pid, find_queue(Q2, A)))), + + ok. + %%---------------------------------------------------------------------------- assert_slaves(RPCNode, QName, Exp) -> diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 7ef38895eb..819f8f8761 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -71,7 +71,8 @@ groups() -> metrics_cleanup_on_leadership_takeover, metrics_cleanup_on_leader_crash, consume_in_minority, - shrink_all + shrink_all, + rebalance ]}, {cluster_size_5, [], [start_queue, start_queue_concurrent, @@ -667,7 +668,46 @@ shrink_all(Config) -> {_, {error, 1, last_node}}], Result2), ok. +rebalance(Config) -> + [Server0, _, _] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server0), + + Q1 = <<"q1">>, + Q2 = <<"q2">>, + Q3 = <<"q3">>, + Q4 = <<"q4">>, + Q5 = <<"q5">>, + + ?assertEqual({'queue.declare_ok', Q1, 0, 0}, + declare(Ch, Q1, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + ?assertEqual({'queue.declare_ok', Q2, 0, 0}, + declare(Ch, Q2, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + timer:sleep(1000), + + {ok, _, {_, Leader1}} = ra:members({ra_name(Q1), Server0}), + {ok, _, {_, Leader2}} = ra:members({ra_name(Q2), Server0}), + rabbit_ct_client_helpers:publish(Ch, Q1, 3), + rabbit_ct_client_helpers:publish(Ch, Q2, 2), + + ?assertEqual({'queue.declare_ok', Q3, 0, 0}, + declare(Ch, Q3, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + ?assertEqual({'queue.declare_ok', Q4, 0, 0}, + declare(Ch, Q4, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + ?assertEqual({'queue.declare_ok', Q5, 0, 0}, + declare(Ch, Q5, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + timer:sleep(500), + {ok, Summary} = rpc:call(Server0, rabbit_amqqueue, rebalance, [quorum, ".*", ".*"]), + %% Q1 and Q2 should not have moved leader, as these are the queues with more + %% log entries and we allow up to two queues per node (3 nodes, 5 queues) + ?assertMatch({ok, _, {_, Leader1}}, ra:members({ra_name(Q1), Server0})), + ?assertMatch({ok, _, {_, Leader2}}, ra:members({ra_name(Q2), Server0})), + + %% Check that we have at most 2 queues per node + ?assert(lists:all(fun({_, V}) -> V =< 2 end, Summary)), + ok. subscribe_should_fail_when_global_qos_true(Config) -> [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), |
