diff options
| author | Diana Corbacho <dparracorbacho@piotal.io> | 2019-09-12 11:20:33 +0100 |
|---|---|---|
| committer | Diana Corbacho <dparracorbacho@piotal.io> | 2019-09-13 11:24:00 +0100 |
| commit | b00e82dc2cdd1043fefb3590d6bdd839fde0a523 (patch) | |
| tree | 867a95a3f8a2ab61b25c64d9eb8ee9f3cff9abaf /src | |
| parent | c80d8de2309afdf92f03fb3d3b6db86ad34c9aeb (diff) | |
| download | rabbitmq-server-git-b00e82dc2cdd1043fefb3590d6bdd839fde0a523.tar.gz | |
Refactor rebalance of mirrored and quorum queues
[#166480197]
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 116 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 112 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 119 |
3 files changed, 142 insertions, 205 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index f740aa7c06..e4c262ad79 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -53,6 +53,8 @@ -export([pid_of/1, pid_of/2]). -export([mark_local_durable_queues_stopped/1]). +-export([rebalance/3]). + %% internal -export([internal_declare/2, internal_delete/2, run_backing_queue/3, set_ram_duration_target/2, set_maximum_since_use/2, @@ -484,6 +486,120 @@ not_found_or_absent_dirty(Name) -> {ok, Q} -> {absent, Q, nodedown} end. +-spec rebalance('all' | 'quorum' | 'classic', binary(), binary()) -> + {ok, [{node(), pos_integer()}]}. +rebalance(Type, VhostSpec, QueueSpec) -> + Running = rabbit_mnesia:cluster_nodes(running), + NumRunning = length(Running), + ToRebalance = [Q || Q <- rabbit_amqqueue:list(), + filter_per_type(Type, Q), + is_replicated(Q), + is_match(amqqueue:get_vhost(Q), VhostSpec) andalso + is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec)], + NumToRebalance = length(ToRebalance), + ByNode = group_by_node(ToRebalance), + Rem = case (NumToRebalance rem NumRunning) of + 0 -> 0; + _ -> 1 + end, + MaxQueuesDesired = (NumToRebalance div NumRunning) + Rem, + iterative_rebalance(ByNode, MaxQueuesDesired). + +filter_per_type(all, _) -> + true; +filter_per_type(quorum, Q) -> + ?amqqueue_is_quorum(Q); +filter_per_type(classic, Q) -> + ?amqqueue_is_classic(Q). + +rebalance_module(Q) when ?amqqueue_is_quorum(Q) -> + rabbit_quorum_queue; +rebalance_module(Q) when ?amqqueue_is_classic(Q) -> + rabbit_mirror_queue_misc. + +get_resource_name(#resource{name = Name}) -> + Name. + +is_match(Subj, E) -> + nomatch /= re:run(Subj, E). + +iterative_rebalance(ByNode, MaxQueuesDesired) -> + case maybe_migrate(ByNode, MaxQueuesDesired) of + {ok, Summary} -> + rabbit_log:warning("Nothing to do, all balanced"), + {ok, Summary}; + {migrated, Other} -> + iterative_rebalance(Other, MaxQueuesDesired); + {not_migrated, Other} -> + iterative_rebalance(Other, MaxQueuesDesired) + end. + +maybe_migrate(ByNode, MaxQueuesDesired) -> + maybe_migrate(ByNode, MaxQueuesDesired, maps:keys(ByNode)). + +maybe_migrate(ByNode, _, []) -> + {ok, maps:fold(fun(K, V, Acc) -> + [{K, length(V)} | Acc] + end, [], ByNode)}; +maybe_migrate(ByNode, MaxQueuesDesired, [N | Nodes]) -> + case maps:get(N, ByNode, []) of + [{_, Q, false} = Queue | Queues] = All when length(All) > MaxQueuesDesired -> + Name = amqqueue:get_name(Q), + Module = rebalance_module(Q), + OtherNodes = Module:get_replicas(Q) -- [N], + case OtherNodes of + [] -> + {not_migrated, update_not_migrated_queue(N, Queue, Queues, ByNode)}; + _ -> + [{Length, Destination} | _] = sort_by_number_of_queues(OtherNodes, ByNode), + rabbit_log:warning("Migrating queue ~p from node ~p with ~p queues to node ~p with ~p queues", + [Name, N, length(All), Destination, Length]), + case Module:transfer_leadership(Q, Destination) of + {migrated, NewNode} -> + rabbit_log:warning("Queue ~p migrated to ~p", [Name, NewNode]), + {migrated, update_migrated_queue(Destination, N, Queue, Queues, ByNode)}; + {not_migrated, Reason} -> + rabbit_log:warning("Error migrating queue ~p: ~p", [Name, Reason]), + {not_migrated, update_not_migrated_queue(N, Queue, Queues, ByNode)} + end + end; + [{_, _, true} | _] = All when length(All) > MaxQueuesDesired -> + rabbit_log:warning("Node ~p contains ~p queues, but all have already migrated. " + "Do nothing", [N, length(All)]), + maybe_migrate(ByNode, MaxQueuesDesired, Nodes); + All -> + rabbit_log:warning("Node ~p only contains ~p queues, do nothing", + [N, length(All)]), + maybe_migrate(ByNode, MaxQueuesDesired, Nodes) + end. + +update_not_migrated_queue(N, {Entries, Q, _}, Queues, ByNode) -> + maps:update(N, Queues ++ [{Entries, Q, true}], ByNode). + +update_migrated_queue(NewNode, OldNode, {Entries, Q, _}, Queues, ByNode) -> + maps:update_with(NewNode, + fun(L) -> L ++ [{Entries, Q, true}] end, + [{Entries, Q, true}], maps:update(OldNode, Queues, ByNode)). + +sort_by_number_of_queues(Nodes, ByNode) -> + lists:keysort(1, + lists:map(fun(Node) -> + {num_queues(Node, ByNode), Node} + end, Nodes)). + +num_queues(Node, ByNode) -> + length(maps:get(Node, ByNode, [])). + +group_by_node(Queues) -> + ByNode = lists:foldl(fun(Q, Acc) -> + Module = rebalance_module(Q), + Length = Module:queue_length(Q), + maps:update_with(amqqueue:qnode(Q), + fun(L) -> [{Length, Q, false} | L] end, + [{Length, Q, false}], Acc) + end, #{}, Queues), + maps:map(fun(_K, V) -> lists:keysort(1, V) end, ByNode). + -spec with(name(), qfun(A), fun((not_found_or_absent()) -> rabbit_types:channel_exit())) -> diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 17ea429337..85c8eab10d 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -28,7 +28,7 @@ -export([sync_queue/1, cancel_sync_queue/1]). --export([rebalance/2]). +-export([transfer_leadership/2, queue_length/1, get_replicas/1]). %% for testing only -export([module/1]). @@ -537,110 +537,14 @@ update_mirrors(Q) when ?is_amqqueue(Q) -> maybe_auto_sync(Q), ok. --spec rebalance(binary(), binary()) -> {ok, [{node(), pos_integer()}]}. -rebalance(VhostSpec, QueueSpec) -> - Running = rabbit_mnesia:cluster_nodes(running), - NumRunning = length(Running), - ToRebalance = [Q || Q <- rabbit_amqqueue:list(), - ?amqqueue_is_classic(Q), - is_match(amqqueue:get_vhost(Q), VhostSpec) andalso - is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec)], - NumToRebalance = length(ToRebalance), - ByNode = group_by_node(ToRebalance), - rabbit_log:warning("######## BYNODE ~p", [ByNode]), - Rem = case (NumToRebalance rem NumRunning) of - 0 -> 0; - _ -> 1 - end, - MaxQueuesDesired = (NumToRebalance div NumRunning) + Rem, - iterative_rebalance(ByNode, MaxQueuesDesired). - -get_resource_name(#resource{name = Name}) -> - Name. - -is_match(Subj, E) -> - nomatch /= re:run(Subj, E). - -iterative_rebalance(ByNode, MaxQueuesDesired) -> - case maybe_migrate(ByNode, MaxQueuesDesired) of - {ok, Summary} -> - rabbit_log:warning("Nothing to do, all balanced"), - {ok, Summary}; - {migrated, Other} -> - iterative_rebalance(Other, MaxQueuesDesired); - {not_migrated, Other} -> - iterative_rebalance(Other, MaxQueuesDesired) - end. - -maybe_migrate(ByNode, MaxQueuesDesired) -> - maybe_migrate(ByNode, MaxQueuesDesired, maps:keys(ByNode)). - -maybe_migrate(ByNode, _, []) -> - {ok, maps:fold(fun(K, V, Acc) -> - [{K, length(V)} | Acc] - end, [], ByNode)}; -maybe_migrate(ByNode, MaxQueuesDesired, [N | Nodes]) -> - case maps:get(N, ByNode, []) of - [{_, Q, false} = Queue | Queues] = All when length(All) > MaxQueuesDesired -> - Name = amqqueue:get_name(Q), - OtherNodes = rabbit_mnesia:cluster_nodes(running) -- [N], - case OtherNodes of - [] -> - {not_migrated, update_not_migrated_queue(N, Queue, Queues, ByNode)}; - _ -> - [{Length, Destination} | _] = sort_by_number_of_queues(OtherNodes, ByNode), - rabbit_log:warning("Migrating mirrored queue ~p from node ~p with ~p queues to node ~p with ~p queues", - [Name, N, length(All), Destination, Length]), - case transfer_master(Q, Destination) of - migrated -> - rabbit_log:warning("Mirrored queue ~p migrated to ~p", [Name, Destination]), - {migrated, update_migrated_queue(Destination, N, Queue, Queues, ByNode)}; - not_migrated -> - rabbit_log:warning("Error migrating mirrored queue ~p", [Name]), - {not_migrated, update_not_migrated_queue(N, Queue, Queues, ByNode)} - end - end; - [{_, _, true} | _] = All when length(All) > MaxQueuesDesired -> - rabbit_log:warning("Node ~p contains ~p queues, but all have already migrated. " - "Do nothing", [N, length(All)]), - maybe_migrate(ByNode, MaxQueuesDesired, Nodes); - All -> - rabbit_log:warning("Node ~p only contains ~p queues, do nothing", - [N, length(All)]), - maybe_migrate(ByNode, MaxQueuesDesired, Nodes) - end. - -update_not_migrated_queue(N, {Entries, Q, _}, Queues, ByNode) -> - maps:update(N, Queues ++ [{Entries, Q, true}], ByNode). - -update_migrated_queue(NewNode, OldNode, {Entries, Q, _}, Queues, ByNode) -> - maps:update_with(NewNode, - fun(L) -> L ++ [{Entries, Q, true}] end, - [{Entries, Q, true}], maps:update(OldNode, Queues, ByNode)). - -sort_by_number_of_queues(Nodes, ByNode) -> - lists:keysort(1, - lists:map(fun(Node) -> - {num_queues(Node, ByNode), Node} - end, Nodes)). - -num_queues(Node, ByNode) -> - length(maps:get(Node, ByNode, [])). - -group_by_node(Queues) -> - ByNode = lists:foldl(fun(Q, Acc) -> - Messages = total_messages(Q), - maps:update_with(amqqueue:qnode(Q), - fun(L) -> [{Messages, Q, false} | L] end, - [{Messages, Q, false}], Acc) - end, #{}, Queues), - maps:map(fun(_K, V) -> lists:keysort(1, V) end, ByNode). - -total_messages(Q) -> +queue_length(Q) -> [{messages, M}] = rabbit_amqqueue:info(Q, [messages]), M. -transfer_master(Q, Destination) -> +get_replicas(Q) -> + rabbit_mnesia:cluster_nodes(running). + +transfer_leadership(Q, Destination) -> QName = amqqueue:get_name(Q), {OldMNode, OldSNodes, _} = actual_queue_nodes(Q), OldNodes = [OldMNode | OldSNodes], @@ -655,7 +559,7 @@ wait_for_new_master(QName, Destination) -> wait_for_new_master(QName, _, 0) -> {ok, Q} = rabbit_amqqueue:lookup(QName), - {not_migrated, Q}; + {{not_migrated, ""}, Q}; wait_for_new_master(QName, Destination, N) -> {ok, Q} = rabbit_amqqueue:lookup(QName), case amqqueue:get_pid(Q) of @@ -665,7 +569,7 @@ wait_for_new_master(QName, Destination, N) -> Pid -> case node(Pid) of Destination -> - {migrated, Q}; + {{migrated, Destination}, Q}; _ -> timer:sleep(100), wait_for_new_master(QName, Destination, N - 1) diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 4fe9c4cbf3..4f8c129291 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -39,7 +39,7 @@ -export([cleanup_data_dir/0]). -export([shrink_all/1, grow/4]). --export([rebalance/2]). +-export([transfer_leadership/2, get_replicas/1, queue_length/1]). %%-include_lib("rabbit_common/include/rabbit.hrl"). -include_lib("rabbit.hrl"). @@ -848,114 +848,31 @@ grow(Node, VhostSpec, QueueSpec, Strategy) -> is_match(amqqueue:get_vhost(Q), VhostSpec) andalso is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec) ]. --spec rebalance(binary(), binary()) -> {ok, [{node(), pos_integer()}]}. -rebalance(VhostSpec, QueueSpec) -> - Running = rabbit_mnesia:cluster_nodes(running), - NumRunning = length(Running), - ToRebalance = [Q || Q <- rabbit_amqqueue:list(), - amqqueue:get_type(Q) == ?MODULE, - is_match(amqqueue:get_vhost(Q), VhostSpec) andalso - is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec)], - NumToRebalance = length(ToRebalance), - ByNode = group_by_node(ToRebalance), - Rem = case (NumToRebalance rem NumRunning) of - 0 -> 0; - _ -> 1 - end, - MaxQueuesDesired = (NumToRebalance div NumRunning) + Rem, - iterative_rebalance(ByNode, MaxQueuesDesired). - -iterative_rebalance(ByNode, MaxQueuesDesired) -> - case maybe_migrate(ByNode, MaxQueuesDesired) of - {ok, Summary} -> - rabbit_log:warning("Nothing to do, all balanced"), - {ok, Summary}; - {migrated, Other} -> - iterative_rebalance(Other, MaxQueuesDesired); - {not_migrated, Other} -> - iterative_rebalance(Other, MaxQueuesDesired) - end. - -maybe_migrate(ByNode, MaxQueuesDesired) -> - maybe_migrate(ByNode, MaxQueuesDesired, maps:keys(ByNode)). - -maybe_migrate(ByNode, _, []) -> - {ok, maps:fold(fun(K, V, Acc) -> - [{K, length(V)} | Acc] - end, [], ByNode)}; -maybe_migrate(ByNode, MaxQueuesDesired, [N | Nodes]) -> - case maps:get(N, ByNode, []) of - [{_, Q, false} = Queue | Queues] = All when length(All) > MaxQueuesDesired -> - {RaName, _} = Pid = amqqueue:get_pid(Q), - Name = amqqueue:get_name(Q), - Members = get_nodes(Q) -- [N], - case Members of - [] -> - {not_migrated, update_not_migrated_queue(N, Queue, Queues, ByNode)}; - _ -> - [{Length, Destination} | _] = sort_by_number_of_queues(Members, ByNode), - rabbit_log:warning("Migrating quorum queue ~p from node ~p with ~p queues to node ~p with ~p queues", - [Name, N, length(All), Destination, Length]), - case ra:transfer_leadership(Pid, {RaName, Destination}) of - ok -> - {_, _, {_, NewNode}} = ra:members(Pid), - rabbit_log:warning("Quorum queue ~p migrated to ~p", [Name, NewNode]), - {migrated, update_migrated_queue(NewNode, N, Queue, Queues, ByNode)}; - already_leader -> - rabbit_log:warning("Quorum queue ~p in ~p is already a leader", - [Name, Destination]), - {not_migrated, update_not_migrated_queue(N, Queue, Queues, ByNode)}; - {error, Reason} -> - rabbit_log:warning("Error migrating quorum queue ~p: ~p", [Name, Reason]), - {not_migrated, update_not_migrated_queue(N, Queue, Queues, ByNode)}; - {timeout, _} -> - %% TODO should we retry once? - rabbit_log:warning("Timeout migrating quorum queue ~p: ~p", [Name]), - {not_migrated, update_not_migrated_queue(N, Queue, Queues, ByNode)} - end - end; - [{_, _, true} | _] = All when length(All) > MaxQueuesDesired -> - rabbit_log:warning("Node ~p contains ~p queues, but all have already migrated. " - "Do nothing", [N, length(All)]), - maybe_migrate(ByNode, MaxQueuesDesired, Nodes); - All -> - rabbit_log:warning("Node ~p only contains ~p queues, do nothing", - [N, length(All)]), - maybe_migrate(ByNode, MaxQueuesDesired, Nodes) +transfer_leadership(Q, Destination) -> + {RaName, _} = Pid = amqqueue:get_pid(Q), + case ra:transfer_leadership(Pid, {RaName, Destination}) of + ok -> + {_, _, {_, NewNode}} = ra:members(Pid), + {migrated, NewNode}; + already_leader -> + {not_migrated, already_leader}; + {error, Reason} -> + {not_migrated, Reason}; + {timeout, _} -> + %% TODO should we retry once? + {not_migrated, timeout} end. -update_not_migrated_queue(N, {Entries, Q, _}, Queues, ByNode) -> - maps:update(N, Queues ++ [{Entries, Q, true}], ByNode). - -update_migrated_queue(NewNode, OldNode, {Entries, Q, _}, Queues, ByNode) -> - maps:update_with(NewNode, - fun(L) -> L ++ [{Entries, Q, true}] end, - [{Entries, Q, true}], maps:update(OldNode, Queues, ByNode)). - -sort_by_number_of_queues(Nodes, ByNode) -> - lists:keysort(1, - lists:map(fun(Node) -> - {num_queues(Node, ByNode), Node} - end, Nodes)). - -num_queues(Node, ByNode) -> - length(maps:get(Node, ByNode, [])). - -group_by_node(Queues) -> - ByNode = lists:foldl(fun(Q, Acc) -> - maps:update_with(amqqueue:qnode(Q), - fun(L) -> [{log_entries(Q), Q, false} | L] end, - [{log_entries(Q), Q, false}], Acc) - end, #{}, Queues), - maps:map(fun(_K, V) -> lists:keysort(1, V) end, ByNode). - -log_entries(Q) -> +queue_length(Q) -> Name = amqqueue:get_name(Q), case ets:lookup(ra_metrics, Name) of [] -> 0; [{_, _, SnapIdx, _, _, LastIdx, _}] -> LastIdx - SnapIdx end. +get_replicas(Q) -> + get_nodes(Q). + get_resource_name(#resource{name = Name}) -> Name. |
