diff options
| author | Diana Corbacho <diana@rabbitmq.com> | 2019-09-03 16:14:30 +0100 |
|---|---|---|
| committer | Diana Corbacho <dparracorbacho@piotal.io> | 2019-09-13 11:23:36 +0100 |
| commit | 97e237592efde035c0951c241708a26d7d238673 (patch) | |
| tree | 8e5458038491ad28c159dc875e853b5d48fde898 /src | |
| parent | 27e3b3d3db47c45997dd4c7efe6eae4f71ec4bb5 (diff) | |
| download | rabbitmq-server-git-97e237592efde035c0951c241708a26d7d238673.tar.gz | |
Rebalance quorum queues
[#166551605]
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 109 |
1 files changed, 109 insertions, 0 deletions
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index b4118b96ac..4fe9c4cbf3 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -39,6 +39,7 @@ -export([cleanup_data_dir/0]). -export([shrink_all/1, grow/4]). +-export([rebalance/2]). %%-include_lib("rabbit_common/include/rabbit.hrl"). -include_lib("rabbit.hrl"). @@ -847,6 +848,114 @@ grow(Node, VhostSpec, QueueSpec, Strategy) -> is_match(amqqueue:get_vhost(Q), VhostSpec) andalso is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec) ].
+%% Spreads the quorum queues that match VhostSpec and QueueSpec across nodes
+%% by transferring leadership (see maybe_migrate/3).
+%%
+%% A queue is selected when its type is this module and is_match/2 accepts
+%% both its vhost (against VhostSpec) and its resource name (against
+%% QueueSpec). The selected queues are grouped with group_by_node/1, and the
+%% per-node target is the number of selected queues divided by the number of
+%% nodes reported by rabbit_mnesia:cluster_nodes(running), rounded up.
+%%
+%% Returns {ok, Summary}, where Summary lists {Node, QueueCount} for every
+%% node present in the grouping once no further migration is attempted.
+-spec rebalance(binary(), binary()) -> {ok, [{node(), pos_integer()}]}.
+rebalance(VhostSpec, QueueSpec) ->
+    NodeCount = length(rabbit_mnesia:cluster_nodes(running)),
+    Selected = lists:filter(
+                 fun(Q) ->
+                         amqqueue:get_type(Q) == ?MODULE andalso
+                             is_match(amqqueue:get_vhost(Q), VhostSpec) andalso
+                             is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec)
+                 end, rabbit_amqqueue:list()),
+    Grouped = group_by_node(Selected),
+    %% Integer ceiling of length(Selected) / NodeCount.
+    Ceiling = (length(Selected) + NodeCount - 1) div NodeCount,
+    iterative_rebalance(Grouped, Ceiling).
+
+%% Calls maybe_migrate/2 repeatedly, feeding each returned grouping into the
+%% next round, until it answers {ok, Summary}. Both the migrated and the
+%% not_migrated outcomes simply trigger another round with the updated map.
+iterative_rebalance(ByNode, MaxQueuesDesired) ->
+    case maybe_migrate(ByNode, MaxQueuesDesired) of
+        {ok, _Summary} = Done ->
+            rabbit_log:warning("Nothing to do, all balanced"),
+            Done;
+        {Outcome, Regrouped} when Outcome =:= migrated;
+                                  Outcome =:= not_migrated ->
+            iterative_rebalance(Regrouped, MaxQueuesDesired)
+    end.
+
+%% Runs one round of maybe_migrate/3 over every node key currently present
+%% in ByNode.
+maybe_migrate(ByNode, MaxQueuesDesired) ->
+    NodesToVisit = maps:keys(ByNode),
+    maybe_migrate(ByNode, MaxQueuesDesired, NodesToVisit).
+
+%% Walks the given nodes and attempts at most one leadership transfer.
+%%
+%% ByNode maps a node to a list of {LogEntries, Queue, Attempted} tuples (as
+%% built by group_by_node/1). Attempted is true once the queue has been
+%% handled in an earlier round; handled queues are always appended to the
+%% tail of a node's list, so a flagged head means the node has no
+%% unprocessed queue left.
+%%
+%% Returns:
+%%   {ok, Summary}         - every node was visited without attempting a
+%%                           transfer; Summary is [{Node, QueueCount}].
+%%   {migrated, ByNode1}   - the head queue of an overloaded node changed
+%%                           leader; it is now listed under its new node.
+%%   {not_migrated, ByNode1} - the head queue could not be moved (no other
+%%                           member, already leader, error or timeout); it
+%%                           is flagged and moved to the tail of its node.
+maybe_migrate(ByNode, _, []) ->
+    {ok, maps:fold(fun(K, V, Acc) ->
+                           [{K, length(V)} | Acc]
+                   end, [], ByNode)};
+maybe_migrate(ByNode, MaxQueuesDesired, [N | Nodes]) ->
+    case maps:get(N, ByNode, []) of
+        [{_, Q, false} = Queue | Queues] = All when length(All) > MaxQueuesDesired ->
+            {RaName, _} = Pid = amqqueue:get_pid(Q),
+            Name = amqqueue:get_name(Q),
+            %% Candidate destinations: every member node except the current one.
+            Members = get_nodes(Q) -- [N],
+            case Members of
+                [] ->
+                    {not_migrated, update_not_migrated_queue(N, Queue, Queues, ByNode)};
+                _ ->
+                    %% Pick the candidate that currently holds the fewest queues.
+                    [{Length, Destination} | _] = sort_by_number_of_queues(Members, ByNode),
+                    rabbit_log:warning("Migrating quorum queue ~p from node ~p with ~p queues to node ~p with ~p queues",
+                                       [Name, N, length(All), Destination, Length]),
+                    case ra:transfer_leadership(Pid, {RaName, Destination}) of
+                        ok ->
+                            %% Ask ra for the leader it now reports and file the
+                            %% queue under that node rather than assuming
+                            %% Destination.
+                            {_, _, {_, NewNode}} = ra:members(Pid),
+                            rabbit_log:warning("Quorum queue ~p migrated to ~p", [Name, NewNode]),
+                            {migrated, update_migrated_queue(NewNode, N, Queue, Queues, ByNode)};
+                        already_leader ->
+                            rabbit_log:warning("Quorum queue ~p in ~p is already a leader",
+                                               [Name, Destination]),
+                            {not_migrated, update_not_migrated_queue(N, Queue, Queues, ByNode)};
+                        {error, Reason} ->
+                            rabbit_log:warning("Error migrating quorum queue ~p: ~p", [Name, Reason]),
+                            {not_migrated, update_not_migrated_queue(N, Queue, Queues, ByNode)};
+                        {timeout, TimedOut} ->
+                            %% TODO should we retry once?
+                            %% The format string has two ~p directives, so the
+                            %% second element of the timeout tuple is passed as
+                            %% well; a single argument made this call fail to
+                            %% format.
+                            rabbit_log:warning("Timeout migrating quorum queue ~p: ~p",
+                                               [Name, TimedOut]),
+                            {not_migrated, update_not_migrated_queue(N, Queue, Queues, ByNode)}
+                    end
+            end;
+        [{_, _, true} | _] = All when length(All) > MaxQueuesDesired ->
+            rabbit_log:warning("Node ~p contains ~p queues, but all have already migrated. "
+                               "Do nothing", [N, length(All)]),
+            maybe_migrate(ByNode, MaxQueuesDesired, Nodes);
+        All ->
+            rabbit_log:warning("Node ~p only contains ~p queues, do nothing",
+                               [N, length(All)]),
+            maybe_migrate(ByNode, MaxQueuesDesired, Nodes)
+    end.
+
+%% Flags the queue as attempted and re-queues it at the tail of node N's
+%% list. N must already be a key of ByNode.
+update_not_migrated_queue(N, {Entries, Q, _}, Queues, ByNode) ->
+    Flagged = {Entries, Q, true},
+    ByNode#{N := Queues ++ [Flagged]}.
+
+%% Removes the queue from OldNode (whose list becomes Queues) and appends it,
+%% flagged as attempted, to NewNode's list, creating that list if NewNode has
+%% no entry yet.
+update_migrated_queue(NewNode, OldNode, {Entries, Q, _}, Queues, ByNode) ->
+    Flagged = {Entries, Q, true},
+    WithoutSource = maps:update(OldNode, Queues, ByNode),
+    maps:update_with(NewNode,
+                     fun(Existing) -> Existing ++ [Flagged] end,
+                     [Flagged],
+                     WithoutSource).
+
+%% Returns [{QueueCount, Node}] for the given nodes, ordered by ascending
+%% queue count.
+sort_by_number_of_queues(Nodes, ByNode) ->
+    Counted = [{num_queues(Node, ByNode), Node} || Node <- Nodes],
+    lists:keysort(1, Counted).
+
+%% Number of queues listed under Node in ByNode; 0 when Node has no entry.
+num_queues(Node, ByNode) ->
+    Listed = maps:get(Node, ByNode, []),
+    length(Listed).
+
+%% Builds a map from amqqueue:qnode(Q) to a list of {LogEntries, Q, false}
+%% tuples, each list sorted by ascending LogEntries. The false flag marks the
+%% queue as not yet attempted.
+group_by_node(Queues) ->
+    Collect = fun(Q, Acc) ->
+                      Entry = {log_entries(Q), Q, false},
+                      maps:update_with(amqqueue:qnode(Q),
+                                       fun(Existing) -> [Entry | Existing] end,
+                                       [Entry],
+                                       Acc)
+              end,
+    Unsorted = lists:foldl(Collect, #{}, Queues),
+    maps:map(fun(_Node, Entries) -> lists:keysort(1, Entries) end, Unsorted).
+
+%% Looks the queue name up in the ra_metrics ETS table and returns the
+%% difference between the sixth and third fields of the row, or 0 when no
+%% row exists.
+%% NOTE(review): the field names suggest last log index minus snapshot index;
+%% confirm against the ra_metrics row layout.
+log_entries(Q) ->
+    case ets:lookup(ra_metrics, amqqueue:get_name(Q)) of
+        [{_, _, SnapshotIdx, _, _, LastIndex, _}] -> LastIndex - SnapshotIdx;
+        [] -> 0
+    end.
+
 get_resource_name(#resource{name = Name}) -> Name. |
