diff options
author | Diana Corbacho <dparracorbacho@piotal.io> | 2019-09-10 15:15:48 +0100 |
---|---|---|
committer | Diana Corbacho <dparracorbacho@piotal.io> | 2019-09-10 15:15:48 +0100 |
commit | e5dcb76f412ec71cc5dfe89aa9938793ea0806de (patch) | |
tree | 5013401533b590b466d2c1dd32d234cebdc7de03 | |
parent | 97a08d2ec95d0536d2f256746f3fa6d54450272e (diff) | |
download | rabbitmq-server-git-transfer-ha-master.tar.gz |
Rebalance mirrored queuestransfer-ha-master
[#166480197]
-rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 137 |
1 files changed, 137 insertions, 0 deletions
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index f63f417b93..17ea429337 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -28,6 +28,8 @@ -export([sync_queue/1, cancel_sync_queue/1]). +-export([rebalance/2]). + %% for testing only -export([module/1]). @@ -535,6 +537,141 @@ update_mirrors(Q) when ?is_amqqueue(Q) -> maybe_auto_sync(Q), ok. +-spec rebalance(binary(), binary()) -> {ok, [{node(), pos_integer()}]}. +rebalance(VhostSpec, QueueSpec) -> + Running = rabbit_mnesia:cluster_nodes(running), + NumRunning = length(Running), + ToRebalance = [Q || Q <- rabbit_amqqueue:list(), + ?amqqueue_is_classic(Q), + is_match(amqqueue:get_vhost(Q), VhostSpec) andalso + is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec)], + NumToRebalance = length(ToRebalance), + ByNode = group_by_node(ToRebalance), + rabbit_log:warning("######## BYNODE ~p", [ByNode]), + Rem = case (NumToRebalance rem NumRunning) of + 0 -> 0; + _ -> 1 + end, + MaxQueuesDesired = (NumToRebalance div NumRunning) + Rem, + iterative_rebalance(ByNode, MaxQueuesDesired). + +get_resource_name(#resource{name = Name}) -> + Name. + +is_match(Subj, E) -> + nomatch /= re:run(Subj, E). + +iterative_rebalance(ByNode, MaxQueuesDesired) -> + case maybe_migrate(ByNode, MaxQueuesDesired) of + {ok, Summary} -> + rabbit_log:warning("Nothing to do, all balanced"), + {ok, Summary}; + {migrated, Other} -> + iterative_rebalance(Other, MaxQueuesDesired); + {not_migrated, Other} -> + iterative_rebalance(Other, MaxQueuesDesired) + end. + +maybe_migrate(ByNode, MaxQueuesDesired) -> + maybe_migrate(ByNode, MaxQueuesDesired, maps:keys(ByNode)). + +maybe_migrate(ByNode, _, []) -> + {ok, maps:fold(fun(K, V, Acc) -> + [{K, length(V)} | Acc] + end, [], ByNode)}; +maybe_migrate(ByNode, MaxQueuesDesired, [N | Nodes]) -> + case maps:get(N, ByNode, []) of + [{_, Q, false} = Queue | Queues] = All when length(All) > MaxQueuesDesired -> + Name = amqqueue:get_name(Q), + OtherNodes = rabbit_mnesia:cluster_nodes(running) -- [N], + case OtherNodes of + [] -> + {not_migrated, update_not_migrated_queue(N, Queue, Queues, ByNode)}; + _ -> + [{Length, Destination} | _] = sort_by_number_of_queues(OtherNodes, ByNode), + rabbit_log:warning("Migrating mirrored queue ~p from node ~p with ~p queues to node ~p with ~p queues", + [Name, N, length(All), Destination, Length]), + case transfer_master(Q, Destination) of + migrated -> + rabbit_log:warning("Mirrored queue ~p migrated to ~p", [Name, Destination]), + {migrated, update_migrated_queue(Destination, N, Queue, Queues, ByNode)}; + not_migrated -> + rabbit_log:warning("Error migrating mirrored queue ~p", [Name]), + {not_migrated, update_not_migrated_queue(N, Queue, Queues, ByNode)} + end + end; + [{_, _, true} | _] = All when length(All) > MaxQueuesDesired -> + rabbit_log:warning("Node ~p contains ~p queues, but all have already migrated. " + "Do nothing", [N, length(All)]), + maybe_migrate(ByNode, MaxQueuesDesired, Nodes); + All -> + rabbit_log:warning("Node ~p only contains ~p queues, do nothing", + [N, length(All)]), + maybe_migrate(ByNode, MaxQueuesDesired, Nodes) + end. + +update_not_migrated_queue(N, {Entries, Q, _}, Queues, ByNode) -> + maps:update(N, Queues ++ [{Entries, Q, true}], ByNode). + +update_migrated_queue(NewNode, OldNode, {Entries, Q, _}, Queues, ByNode) -> + maps:update_with(NewNode, + fun(L) -> L ++ [{Entries, Q, true}] end, + [{Entries, Q, true}], maps:update(OldNode, Queues, ByNode)). + +sort_by_number_of_queues(Nodes, ByNode) -> + lists:keysort(1, + lists:map(fun(Node) -> + {num_queues(Node, ByNode), Node} + end, Nodes)). + +num_queues(Node, ByNode) -> + length(maps:get(Node, ByNode, [])). + +group_by_node(Queues) -> + ByNode = lists:foldl(fun(Q, Acc) -> + Messages = total_messages(Q), + maps:update_with(amqqueue:qnode(Q), + fun(L) -> [{Messages, Q, false} | L] end, + [{Messages, Q, false}], Acc) + end, #{}, Queues), + maps:map(fun(_K, V) -> lists:keysort(1, V) end, ByNode). + +total_messages(Q) -> + [{messages, M}] = rabbit_amqqueue:info(Q, [messages]), + M. + +transfer_master(Q, Destination) -> + QName = amqqueue:get_name(Q), + {OldMNode, OldSNodes, _} = actual_queue_nodes(Q), + OldNodes = [OldMNode | OldSNodes], + add_mirrors(QName, [Destination] -- OldNodes, async), + drop_mirrors(QName, OldNodes -- [Destination]), + {Result, NewQ} = wait_for_new_master(QName, Destination), + update_mirrors(NewQ), + Result. + +wait_for_new_master(QName, Destination) -> + wait_for_new_master(QName, Destination, 100). + +wait_for_new_master(QName, _, 0) -> + {ok, Q} = rabbit_amqqueue:lookup(QName), + {not_migrated, Q}; +wait_for_new_master(QName, Destination, N) -> + {ok, Q} = rabbit_amqqueue:lookup(QName), + case amqqueue:get_pid(Q) of + none -> + timer:sleep(100), + wait_for_new_master(QName, Destination, N - 1); + Pid -> + case node(Pid) of + Destination -> + {migrated, Q}; + _ -> + timer:sleep(100), + wait_for_new_master(QName, Destination, N - 1) + end + end. + %% The arrival of a newly synced slave may cause the master to die if %% the policy does not want the master but it has been kept alive %% because there were no synced slaves. |