diff options
| author | Karl Nilsson <kjnilsson@gmail.com> | 2019-09-13 15:55:47 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2019-09-13 15:55:47 +0100 |
| commit | 2baea46d1521108fac8743bac375423aa7916d0c (patch) | |
| tree | 446adea18d9301b501b91e16b1945aa18fe3e45b | |
| parent | 27e3b3d3db47c45997dd4c7efe6eae4f71ec4bb5 (diff) | |
| parent | 432fb82ae9de41846fa53ffcb0ca9c41bc16056f (diff) | |
| download | rabbitmq-server-git-2baea46d1521108fac8743bac375423aa7916d0c.tar.gz | |
Merge pull request #2106 from rabbitmq/rebalance-all-queues
Rebalance all queues
| -rw-r--r-- | src/rabbit_amqqueue.erl | 116 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 42 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 26 | ||||
| -rw-r--r-- | test/dynamic_ha_SUITE.erl | 117 | ||||
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 42 |
5 files changed, 341 insertions, 2 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index f740aa7c06..e4c262ad79 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -53,6 +53,8 @@ -export([pid_of/1, pid_of/2]). -export([mark_local_durable_queues_stopped/1]). +-export([rebalance/3]). + %% internal -export([internal_declare/2, internal_delete/2, run_backing_queue/3, set_ram_duration_target/2, set_maximum_since_use/2, @@ -484,6 +486,120 @@ not_found_or_absent_dirty(Name) -> {ok, Q} -> {absent, Q, nodedown} end. +-spec rebalance('all' | 'quorum' | 'classic', binary(), binary()) -> + {ok, [{node(), pos_integer()}]}. +rebalance(Type, VhostSpec, QueueSpec) -> + Running = rabbit_mnesia:cluster_nodes(running), + NumRunning = length(Running), + ToRebalance = [Q || Q <- rabbit_amqqueue:list(), + filter_per_type(Type, Q), + is_replicated(Q), + is_match(amqqueue:get_vhost(Q), VhostSpec) andalso + is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec)], + NumToRebalance = length(ToRebalance), + ByNode = group_by_node(ToRebalance), + Rem = case (NumToRebalance rem NumRunning) of + 0 -> 0; + _ -> 1 + end, + MaxQueuesDesired = (NumToRebalance div NumRunning) + Rem, + iterative_rebalance(ByNode, MaxQueuesDesired). + +filter_per_type(all, _) -> + true; +filter_per_type(quorum, Q) -> + ?amqqueue_is_quorum(Q); +filter_per_type(classic, Q) -> + ?amqqueue_is_classic(Q). + +rebalance_module(Q) when ?amqqueue_is_quorum(Q) -> + rabbit_quorum_queue; +rebalance_module(Q) when ?amqqueue_is_classic(Q) -> + rabbit_mirror_queue_misc. + +get_resource_name(#resource{name = Name}) -> + Name. + +is_match(Subj, E) -> + nomatch /= re:run(Subj, E). + +iterative_rebalance(ByNode, MaxQueuesDesired) -> + case maybe_migrate(ByNode, MaxQueuesDesired) of + {ok, Summary} -> + rabbit_log:warning("Nothing to do, all balanced"), + {ok, Summary}; + {migrated, Other} -> + iterative_rebalance(Other, MaxQueuesDesired); + {not_migrated, Other} -> + iterative_rebalance(Other, MaxQueuesDesired) + end. + +maybe_migrate(ByNode, MaxQueuesDesired) -> + maybe_migrate(ByNode, MaxQueuesDesired, maps:keys(ByNode)). + +maybe_migrate(ByNode, _, []) -> + {ok, maps:fold(fun(K, V, Acc) -> + [{K, length(V)} | Acc] + end, [], ByNode)}; +maybe_migrate(ByNode, MaxQueuesDesired, [N | Nodes]) -> + case maps:get(N, ByNode, []) of + [{_, Q, false} = Queue | Queues] = All when length(All) > MaxQueuesDesired -> + Name = amqqueue:get_name(Q), + Module = rebalance_module(Q), + OtherNodes = Module:get_replicas(Q) -- [N], + case OtherNodes of + [] -> + {not_migrated, update_not_migrated_queue(N, Queue, Queues, ByNode)}; + _ -> + [{Length, Destination} | _] = sort_by_number_of_queues(OtherNodes, ByNode), + rabbit_log:warning("Migrating queue ~p from node ~p with ~p queues to node ~p with ~p queues", + [Name, N, length(All), Destination, Length]), + case Module:transfer_leadership(Q, Destination) of + {migrated, NewNode} -> + rabbit_log:warning("Queue ~p migrated to ~p", [Name, NewNode]), + {migrated, update_migrated_queue(Destination, N, Queue, Queues, ByNode)}; + {not_migrated, Reason} -> + rabbit_log:warning("Error migrating queue ~p: ~p", [Name, Reason]), + {not_migrated, update_not_migrated_queue(N, Queue, Queues, ByNode)} + end + end; + [{_, _, true} | _] = All when length(All) > MaxQueuesDesired -> + rabbit_log:warning("Node ~p contains ~p queues, but all have already migrated. " + "Do nothing", [N, length(All)]), + maybe_migrate(ByNode, MaxQueuesDesired, Nodes); + All -> + rabbit_log:warning("Node ~p only contains ~p queues, do nothing", + [N, length(All)]), + maybe_migrate(ByNode, MaxQueuesDesired, Nodes) + end. + +update_not_migrated_queue(N, {Entries, Q, _}, Queues, ByNode) -> + maps:update(N, Queues ++ [{Entries, Q, true}], ByNode). + +update_migrated_queue(NewNode, OldNode, {Entries, Q, _}, Queues, ByNode) -> + maps:update_with(NewNode, + fun(L) -> L ++ [{Entries, Q, true}] end, + [{Entries, Q, true}], maps:update(OldNode, Queues, ByNode)). + +sort_by_number_of_queues(Nodes, ByNode) -> + lists:keysort(1, + lists:map(fun(Node) -> + {num_queues(Node, ByNode), Node} + end, Nodes)). + +num_queues(Node, ByNode) -> + length(maps:get(Node, ByNode, [])). + +group_by_node(Queues) -> + ByNode = lists:foldl(fun(Q, Acc) -> + Module = rebalance_module(Q), + Length = Module:queue_length(Q), + maps:update_with(amqqueue:qnode(Q), + fun(L) -> [{Length, Q, false} | L] end, + [{Length, Q, false}], Acc) + end, #{}, Queues), + maps:map(fun(_K, V) -> lists:keysort(1, V) end, ByNode). + -spec with(name(), qfun(A), fun((not_found_or_absent()) -> rabbit_types:channel_exit())) -> diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index f63f417b93..c1ae96b02f 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -28,6 +28,8 @@ -export([sync_queue/1, cancel_sync_queue/1]). +-export([transfer_leadership/2, queue_length/1, get_replicas/1]). + %% for testing only -export([module/1]). @@ -535,6 +537,46 @@ update_mirrors(Q) when ?is_amqqueue(Q) -> maybe_auto_sync(Q), ok. +queue_length(Q) -> + [{messages, M}] = rabbit_amqqueue:info(Q, [messages]), + M. + +get_replicas(Q) -> + {MNode, SNodes} = suggested_queue_nodes(Q), + [MNode] ++ SNodes. + +transfer_leadership(Q, Destination) -> + QName = amqqueue:get_name(Q), + {OldMNode, OldSNodes, _} = actual_queue_nodes(Q), + OldNodes = [OldMNode | OldSNodes], + add_mirrors(QName, [Destination] -- OldNodes, async), + drop_mirrors(QName, OldNodes -- [Destination]), + {Result, NewQ} = wait_for_new_master(QName, Destination), + update_mirrors(NewQ), + Result. + +wait_for_new_master(QName, Destination) -> + wait_for_new_master(QName, Destination, 100). + +wait_for_new_master(QName, _, 0) -> + {ok, Q} = rabbit_amqqueue:lookup(QName), + {{not_migrated, ""}, Q}; +wait_for_new_master(QName, Destination, N) -> + {ok, Q} = rabbit_amqqueue:lookup(QName), + case amqqueue:get_pid(Q) of + none -> + timer:sleep(100), + wait_for_new_master(QName, Destination, N - 1); + Pid -> + case node(Pid) of + Destination -> + {{migrated, Destination}, Q}; + _ -> + timer:sleep(100), + wait_for_new_master(QName, Destination, N - 1) + end + end. + %% The arrival of a newly synced slave may cause the master to die if %% the policy does not want the master but it has been kept alive %% because there were no synced slaves. diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index b4118b96ac..4f8c129291 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -39,6 +39,7 @@ -export([cleanup_data_dir/0]). -export([shrink_all/1, grow/4]). +-export([transfer_leadership/2, get_replicas/1, queue_length/1]). %%-include_lib("rabbit_common/include/rabbit.hrl"). -include_lib("rabbit.hrl"). @@ -847,6 +848,31 @@ grow(Node, VhostSpec, QueueSpec, Strategy) -> is_match(amqqueue:get_vhost(Q), VhostSpec) andalso is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec) ]. +transfer_leadership(Q, Destination) -> + {RaName, _} = Pid = amqqueue:get_pid(Q), + case ra:transfer_leadership(Pid, {RaName, Destination}) of + ok -> + {_, _, {_, NewNode}} = ra:members(Pid), + {migrated, NewNode}; + already_leader -> + {not_migrated, already_leader}; + {error, Reason} -> + {not_migrated, Reason}; + {timeout, _} -> + %% TODO should we retry once? + {not_migrated, timeout} + end. + +queue_length(Q) -> + Name = amqqueue:get_name(Q), + case ets:lookup(ra_metrics, Name) of + [] -> 0; + [{_, _, SnapIdx, _, _, LastIdx, _}] -> LastIdx - SnapIdx + end. + +get_replicas(Q) -> + get_nodes(Q). + get_resource_name(#resource{name = Name}) -> Name. diff --git a/test/dynamic_ha_SUITE.erl b/test/dynamic_ha_SUITE.erl index 01fbc55e22..5eccfd2ad5 100644 --- a/test/dynamic_ha_SUITE.erl +++ b/test/dynamic_ha_SUITE.erl @@ -71,7 +71,10 @@ groups() -> rapid_change, nodes_policy_should_pick_master_from_its_params, promote_slave_after_standalone_restart, - queue_survive_adding_dead_vhost_mirror + queue_survive_adding_dead_vhost_mirror, + rebalance_all, + rebalance_exactly, + rebalance_nodes % FIXME: Re-enable those tests when the know issues are % fixed. % failing_random_policies, @@ -539,6 +542,118 @@ promote_slave_after_standalone_restart(Config) -> ok. +rebalance_all(Config) -> + [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + ACh = rabbit_ct_client_helpers:open_channel(Config, A), + + Q1 = <<"q1">>, + Q2 = <<"q2">>, + Q3 = <<"q3">>, + Q4 = <<"q4">>, + Q5 = <<"q5">>, + + amqp_channel:call(ACh, #'queue.declare'{queue = Q1}), + amqp_channel:call(ACh, #'queue.declare'{queue = Q2}), + amqp_channel:call(ACh, #'queue.declare'{queue = Q3}), + amqp_channel:call(ACh, #'queue.declare'{queue = Q4}), + amqp_channel:call(ACh, #'queue.declare'{queue = Q5}), + rabbit_ct_broker_helpers:set_ha_policy(Config, A, <<"q.*">>, <<"all">>), + timer:sleep(1000), + + rabbit_ct_client_helpers:publish(ACh, Q1, 5), + rabbit_ct_client_helpers:publish(ACh, Q2, 3), + assert_slaves(A, Q1, {A, [B, C]}, [{A, []}, {A, [B]}, {A, [C]}]), + assert_slaves(A, Q2, {A, [B, C]}, [{A, []}, {A, [B]}, {A, [C]}]), + assert_slaves(A, Q3, {A, [B, C]}, [{A, []}, {A, [B]}, {A, [C]}]), + assert_slaves(A, Q4, {A, [B, C]}, [{A, []}, {A, [B]}, {A, [C]}]), + assert_slaves(A, Q5, {A, [B, C]}, [{A, []}, {A, [B]}, {A, [C]}]), + + {ok, Summary} = rpc:call(A, rabbit_amqqueue, rebalance, [classic, ".*", ".*"]), + + %% Check that we have at most 2 queues per node + ?assert(lists:all(fun({_, V}) -> V =< 2 end, Summary)), + %% Check that Q1 and Q2 haven't moved + assert_slaves(A, Q1, {A, [B, C]}, [{A, []}, {A, [B]}, {A, [C]}]), + assert_slaves(A, Q2, {A, [B, C]}, [{A, []}, {A, [B]}, {A, [C]}]), + + ok. + +rebalance_exactly(Config) -> + [A, _, _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + ACh = rabbit_ct_client_helpers:open_channel(Config, A), + + Q1 = <<"q1">>, + Q2 = <<"q2">>, + Q3 = <<"q3">>, + Q4 = <<"q4">>, + Q5 = <<"q5">>, + + amqp_channel:call(ACh, #'queue.declare'{queue = Q1}), + amqp_channel:call(ACh, #'queue.declare'{queue = Q2}), + amqp_channel:call(ACh, #'queue.declare'{queue = Q3}), + amqp_channel:call(ACh, #'queue.declare'{queue = Q4}), + amqp_channel:call(ACh, #'queue.declare'{queue = Q5}), + rabbit_ct_broker_helpers:set_ha_policy(Config, A, <<"q.*">>, {<<"exactly">>, 2}), + timer:sleep(1000), + + rabbit_ct_client_helpers:publish(ACh, Q1, 5), + rabbit_ct_client_helpers:publish(ACh, Q2, 3), + + ?assertEqual(A, node(proplists:get_value(pid, find_queue(Q1, A)))), + ?assertEqual(A, node(proplists:get_value(pid, find_queue(Q2, A)))), + ?assertEqual(A, node(proplists:get_value(pid, find_queue(Q3, A)))), + ?assertEqual(A, node(proplists:get_value(pid, find_queue(Q4, A)))), + ?assertEqual(A, node(proplists:get_value(pid, find_queue(Q5, A)))), + + {ok, Summary} = rpc:call(A, rabbit_amqqueue, rebalance, [classic, ".*", ".*"]), + + %% Check that we have at most 2 queues per node + ?assert(lists:all(fun({_, V}) -> V =< 2 end, Summary)), + %% Check that Q1 and Q2 haven't moved + ?assertEqual(A, node(proplists:get_value(pid, find_queue(Q1, A)))), + ?assertEqual(A, node(proplists:get_value(pid, find_queue(Q2, A)))), + + ok. + +rebalance_nodes(Config) -> + [A, B, _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + ACh = rabbit_ct_client_helpers:open_channel(Config, A), + + Q1 = <<"q1">>, + Q2 = <<"q2">>, + Q3 = <<"q3">>, + Q4 = <<"q4">>, + Q5 = <<"q5">>, + + amqp_channel:call(ACh, #'queue.declare'{queue = Q1}), + amqp_channel:call(ACh, #'queue.declare'{queue = Q2}), + amqp_channel:call(ACh, #'queue.declare'{queue = Q3}), + amqp_channel:call(ACh, #'queue.declare'{queue = Q4}), + amqp_channel:call(ACh, #'queue.declare'{queue = Q5}), + rabbit_ct_broker_helpers:set_ha_policy( + Config, A, <<"q.*">>, + {<<"nodes">>, [rabbit_misc:atom_to_binary(A), rabbit_misc:atom_to_binary(B)]}), + timer:sleep(1000), + + rabbit_ct_client_helpers:publish(ACh, Q1, 5), + rabbit_ct_client_helpers:publish(ACh, Q2, 3), + + ?assertEqual(A, node(proplists:get_value(pid, find_queue(Q1, A)))), + ?assertEqual(A, node(proplists:get_value(pid, find_queue(Q2, A)))), + ?assertEqual(A, node(proplists:get_value(pid, find_queue(Q3, A)))), + ?assertEqual(A, node(proplists:get_value(pid, find_queue(Q4, A)))), + ?assertEqual(A, node(proplists:get_value(pid, find_queue(Q5, A)))), + + {ok, Summary} = rpc:call(A, rabbit_amqqueue, rebalance, [classic, ".*", ".*"]), + + %% Check that we have at most 3 queues per node + ?assert(lists:all(fun({_, V}) -> V =< 3 end, Summary)), + %% Check that Q1 and Q2 haven't moved + ?assertEqual(A, node(proplists:get_value(pid, find_queue(Q1, A)))), + ?assertEqual(A, node(proplists:get_value(pid, find_queue(Q2, A)))), + + ok. + %%---------------------------------------------------------------------------- assert_slaves(RPCNode, QName, Exp) -> diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 7ef38895eb..819f8f8761 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -71,7 +71,8 @@ groups() -> metrics_cleanup_on_leadership_takeover, metrics_cleanup_on_leader_crash, consume_in_minority, - shrink_all + shrink_all, + rebalance ]}, {cluster_size_5, [], [start_queue, start_queue_concurrent, @@ -667,7 +668,46 @@ shrink_all(Config) -> {_, {error, 1, last_node}}], Result2), ok. +rebalance(Config) -> + [Server0, _, _] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server0), + + Q1 = <<"q1">>, + Q2 = <<"q2">>, + Q3 = <<"q3">>, + Q4 = <<"q4">>, + Q5 = <<"q5">>, + + ?assertEqual({'queue.declare_ok', Q1, 0, 0}, + declare(Ch, Q1, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + ?assertEqual({'queue.declare_ok', Q2, 0, 0}, + declare(Ch, Q2, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + timer:sleep(1000), + + {ok, _, {_, Leader1}} = ra:members({ra_name(Q1), Server0}), + {ok, _, {_, Leader2}} = ra:members({ra_name(Q2), Server0}), + rabbit_ct_client_helpers:publish(Ch, Q1, 3), + rabbit_ct_client_helpers:publish(Ch, Q2, 2), + + ?assertEqual({'queue.declare_ok', Q3, 0, 0}, + declare(Ch, Q3, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + ?assertEqual({'queue.declare_ok', Q4, 0, 0}, + declare(Ch, Q4, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + ?assertEqual({'queue.declare_ok', Q5, 0, 0}, + declare(Ch, Q5, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + timer:sleep(500), + {ok, Summary} = rpc:call(Server0, rabbit_amqqueue, rebalance, [quorum, ".*", ".*"]), + %% Q1 and Q2 should not have moved leader, as these are the queues with more + %% log entries and we allow up to two queues per node (3 nodes, 5 queues) + ?assertMatch({ok, _, {_, Leader1}}, ra:members({ra_name(Q1), Server0})), + ?assertMatch({ok, _, {_, Leader2}}, ra:members({ra_name(Q2), Server0})), + + %% Check that we have at most 2 queues per node + ?assert(lists:all(fun({_, V}) -> V =< 2 end, Summary)), + ok. subscribe_should_fail_when_global_qos_true(Config) -> [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), |
