summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Diana Corbacho <dparracorbacho@piotal.io> 2019-09-10 15:15:48 +0100
committer Diana Corbacho <dparracorbacho@piotal.io> 2019-09-10 15:15:48 +0100
commit e5dcb76f412ec71cc5dfe89aa9938793ea0806de (patch)
tree 5013401533b590b466d2c1dd32d234cebdc7de03
parent 97a08d2ec95d0536d2f256746f3fa6d54450272e (diff)
download rabbitmq-server-git-transfer-ha-master.tar.gz
Rebalance mirrored queues [transfer-ha-master]
[#166480197]
-rw-r--r-- src/rabbit_mirror_queue_misc.erl 137
1 files changed, 137 insertions, 0 deletions
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index f63f417b93..17ea429337 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -28,6 +28,8 @@
-export([sync_queue/1, cancel_sync_queue/1]).
+-export([rebalance/2]).
+
%% for testing only
-export([module/1]).
@@ -535,6 +537,141 @@ update_mirrors(Q) when ?is_amqqueue(Q) ->
maybe_auto_sync(Q),
ok.
+%% @doc Rebalances the masters of classic queues across the running
+%% cluster nodes.
+%%
+%% `VhostSpec' and `QueueSpec' are patterns handed to re:run/2; a queue is
+%% selected when its virtual host matches `VhostSpec' and its resource
+%% name matches `QueueSpec'. The selected queues are grouped by the node
+%% reported by amqqueue:qnode/1 and migrated one at a time until no node
+%% holds more than the per-node ceiling (see iterative_rebalance/2).
+%%
+%% Returns `{ok, [{Node, NumQueues}]}' describing the final distribution.
+-spec rebalance(binary(), binary()) -> {ok, [{node(), pos_integer()}]}.
+rebalance(VhostSpec, QueueSpec) ->
+    Running = rabbit_mnesia:cluster_nodes(running),
+    NumRunning = length(Running),
+    ToRebalance = [Q || Q <- rabbit_amqqueue:list(),
+                        ?amqqueue_is_classic(Q),
+                        is_match(amqqueue:get_vhost(Q), VhostSpec) andalso
+                            is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec)],
+    NumToRebalance = length(ToRebalance),
+    ByNode = group_by_node(ToRebalance),
+    %% Ceiling of NumToRebalance / NumRunning: when the queues do not
+    %% divide evenly, each node is allowed one extra queue.
+    Rem = case (NumToRebalance rem NumRunning) of
+              0 -> 0;
+              _ -> 1
+          end,
+    MaxQueuesDesired = (NumToRebalance div NumRunning) + Rem,
+    iterative_rebalance(ByNode, MaxQueuesDesired).
+
+%% Returns the `name' field of a #resource{} record.
+get_resource_name(#resource{} = Resource) ->
+    Resource#resource.name.
+
+%% Returns true when the pattern E matches anywhere in Subj (re:run/2),
+%% false when re:run/2 reports `nomatch'.
+is_match(Subj, E) ->
+    case re:run(Subj, E) of
+        nomatch -> false;
+        _Matched -> true
+    end.
+
+%% Runs migration passes until maybe_migrate/2 reports that no node has
+%% anything left to move, then returns that pass's `{ok, Summary}'.
+%% A pass that attempted a migration (successful or not) yields an updated
+%% node map, which is fed into the next pass.
+iterative_rebalance(ByNode, MaxQueuesDesired) ->
+    case maybe_migrate(ByNode, MaxQueuesDesired) of
+        {ok, _Summary} = Done ->
+            rabbit_log:warning("Nothing to do, all balanced"),
+            Done;
+        {Outcome, NextByNode} when Outcome =:= migrated;
+                                   Outcome =:= not_migrated ->
+            iterative_rebalance(NextByNode, MaxQueuesDesired)
+    end.
+
+%% Starts a migration pass over every node that is a key of ByNode.
+maybe_migrate(ByNode, MaxQueuesDesired) ->
+    Candidates = maps:keys(ByNode),
+    maybe_migrate(ByNode, MaxQueuesDesired, Candidates).
+
+%% Performs at most one migration attempt, walking the remaining node list.
+%%
+%% ByNode maps a node to a list of {Messages, Queue, Attempted} entries.
+%% Returns:
+%%   {ok, [{Node, Count}]}      - every node was inspected and none had an
+%%                                un-attempted queue above the limit;
+%%   {migrated, NewByNode}      - one queue master was moved;
+%%   {not_migrated, NewByNode}  - one attempt was made and failed (or there
+%%                                was no other running node to move to).
+%% In the last two cases the attempted entry is flagged `true' so that it
+%% is not picked again.
+maybe_migrate(ByNode, _, []) ->
+ {ok, maps:fold(fun(K, V, Acc) ->
+ [{K, length(V)} | Acc]
+ end, [], ByNode)};
+maybe_migrate(ByNode, MaxQueuesDesired, [N | Nodes]) ->
+ case maps:get(N, ByNode, []) of
+ %% Node is over the limit and its head entry has not been attempted yet.
+ [{_, Q, false} = Queue | Queues] = All when length(All) > MaxQueuesDesired ->
+ Name = amqqueue:get_name(Q),
+ OtherNodes = rabbit_mnesia:cluster_nodes(running) -- [N],
+ case OtherNodes of
+ [] ->
+ {not_migrated, update_not_migrated_queue(N, Queue, Queues, ByNode)};
+ _ ->
+ %% Pick the other running node with the fewest entries in ByNode.
+ [{Length, Destination} | _] = sort_by_number_of_queues(OtherNodes, ByNode),
+ rabbit_log:warning("Migrating mirrored queue ~p from node ~p with ~p queues to node ~p with ~p queues",
+ [Name, N, length(All), Destination, Length]),
+ case transfer_master(Q, Destination) of
+ migrated ->
+ rabbit_log:warning("Mirrored queue ~p migrated to ~p", [Name, Destination]),
+ {migrated, update_migrated_queue(Destination, N, Queue, Queues, ByNode)};
+ not_migrated ->
+ rabbit_log:warning("Error migrating mirrored queue ~p", [Name]),
+ {not_migrated, update_not_migrated_queue(N, Queue, Queues, ByNode)}
+ end
+ end;
+ %% Over the limit, but the head entry was already attempted. The update_*
+ %% helpers only ever append attempted entries at the tail, so an attempted
+ %% head means no un-attempted entry remains on this node.
+ [{_, _, true} | _] = All when length(All) > MaxQueuesDesired ->
+ rabbit_log:warning("Node ~p contains ~p queues, but all have already migrated. "
+ "Do nothing", [N, length(All)]),
+ maybe_migrate(ByNode, MaxQueuesDesired, Nodes);
+ %% At or below the limit (including a node with no entries): move on.
+ All ->
+ rabbit_log:warning("Node ~p only contains ~p queues, do nothing",
+ [N, length(All)]),
+ maybe_migrate(ByNode, MaxQueuesDesired, Nodes)
+ end.
+
+%% Records a failed migration attempt: node N keeps the queue, but its
+%% entry is moved to the tail of N's list and flagged as attempted (`true').
+%% N must already be a key of ByNode.
+update_not_migrated_queue(N, {Entries, Q, _}, Queues, ByNode) ->
+    Attempted = {Entries, Q, true},
+    ByNode#{N := Queues ++ [Attempted]}.
+
+%% Records a successful migration: OldNode's list becomes Queues (the
+%% remaining entries), and the moved entry, flagged `true', is appended to
+%% NewNode's list (created if NewNode has no list yet).
+%% OldNode must already be a key of ByNode.
+update_migrated_queue(NewNode, OldNode, {Entries, Q, _}, Queues, ByNode) ->
+    Moved = {Entries, Q, true},
+    WithoutMoved = maps:update(OldNode, Queues, ByNode),
+    maps:update_with(NewNode,
+                     fun(Existing) -> Existing ++ [Moved] end,
+                     [Moved],
+                     WithoutMoved).
+
+%% Returns [{NumQueues, Node}] for the given nodes, sorted by ascending
+%% number of entries each node has in ByNode.
+sort_by_number_of_queues(Nodes, ByNode) ->
+    Counted = [{num_queues(Node, ByNode), Node} || Node <- Nodes],
+    lists:keysort(1, Counted).
+
+%% Number of entries recorded for Node in ByNode; 0 when Node is absent.
+num_queues(Node, ByNode) ->
+    case maps:find(Node, ByNode) of
+        {ok, Entries} -> length(Entries);
+        error -> 0
+    end.
+
+%% Groups queues by the node returned by amqqueue:qnode/1.
+%% Each queue becomes a {Messages, Queue, false} entry, where Messages comes
+%% from total_messages/1 and `false' marks the entry as not yet attempted.
+%% Every node's list is sorted by ascending message count.
+group_by_node(Queues) ->
+    AddQueue = fun(Q, Grouped) ->
+                   Entry = {total_messages(Q), Q, false},
+                   maps:update_with(amqqueue:qnode(Q),
+                                    fun(Existing) -> [Entry | Existing] end,
+                                    [Entry], Grouped)
+               end,
+    Unsorted = lists:foldl(AddQueue, #{}, Queues),
+    maps:map(fun(_Node, Entries) -> lists:keysort(1, Entries) end, Unsorted).
+
+%% Returns the `messages' info item of queue Q as reported by
+%% rabbit_amqqueue:info/2. Fails with badmatch if the reply is anything
+%% other than a single {messages, Count} pair.
+total_messages(Q) ->
+    [{messages, Count}] = rabbit_amqqueue:info(Q, [messages]),
+    Count.
+
+%% Attempts to move the master of queue Q to node Destination.
+%%
+%% Returns `migrated' or `not_migrated', as reported by
+%% wait_for_new_master/2.
+transfer_master(Q, Destination) ->
+ QName = amqqueue:get_name(Q),
+ %% NOTE(review): judging by the variable names, actual_queue_nodes/1
+ %% yields the current master node and the slave nodes - confirm.
+ {OldMNode, OldSNodes, _} = actual_queue_nodes(Q),
+ OldNodes = [OldMNode | OldSNodes],
+ %% Add a mirror on Destination only if it does not already host one.
+ add_mirrors(QName, [Destination] -- OldNodes, async),
+ %% Drop the queue from every previous node except Destination.
+ drop_mirrors(QName, OldNodes -- [Destination]),
+ {Result, NewQ} = wait_for_new_master(QName, Destination),
+ %% Runs whether or not the migration succeeded, on the freshly looked-up
+ %% queue record.
+ update_mirrors(NewQ),
+ Result.
+
+%% Polls the queue record until its pid lives on Destination.
+%% Makes up to 100 checks, sleeping 100 ms after each unsuccessful one.
+%% Returns {migrated, Q} or {not_migrated, Q}, where Q is the most recently
+%% looked-up queue record.
+wait_for_new_master(QName, Destination) ->
+    wait_for_new_master(QName, Destination, 100).
+
+%% When the retry counter reaches 0 the queue is looked up one last time
+%% and reported as not migrated without inspecting its pid.
+wait_for_new_master(QName, _Destination, 0) ->
+    {ok, Q} = rabbit_amqqueue:lookup(QName),
+    {not_migrated, Q};
+wait_for_new_master(QName, Destination, Retries) ->
+    {ok, Q} = rabbit_amqqueue:lookup(QName),
+    MasterPid = amqqueue:get_pid(Q),
+    %% A pid of `none' means no master is recorded yet; keep waiting.
+    case MasterPid =/= none andalso node(MasterPid) =:= Destination of
+        true ->
+            {migrated, Q};
+        false ->
+            timer:sleep(100),
+            wait_for_new_master(QName, Destination, Retries - 1)
+    end.
+
%% The arrival of a newly synced slave may cause the master to die if
%% the policy does not want the master but it has been kept alive
%% because there were no synced slaves.