summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDiana Corbacho <dparracorbacho@piotal.io>2019-09-12 11:20:33 +0100
committerDiana Corbacho <dparracorbacho@piotal.io>2019-09-13 11:24:00 +0100
commitb00e82dc2cdd1043fefb3590d6bdd839fde0a523 (patch)
tree867a95a3f8a2ab61b25c64d9eb8ee9f3cff9abaf /src
parentc80d8de2309afdf92f03fb3d3b6db86ad34c9aeb (diff)
downloadrabbitmq-server-git-b00e82dc2cdd1043fefb3590d6bdd839fde0a523.tar.gz
Refactor rebalance of mirrored and quorum queues
[#166480197]
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl116
-rw-r--r--src/rabbit_mirror_queue_misc.erl112
-rw-r--r--src/rabbit_quorum_queue.erl119
3 files changed, 142 insertions, 205 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index f740aa7c06..e4c262ad79 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -53,6 +53,8 @@
-export([pid_of/1, pid_of/2]).
-export([mark_local_durable_queues_stopped/1]).
+-export([rebalance/3]).
+
%% internal
-export([internal_declare/2, internal_delete/2, run_backing_queue/3,
set_ram_duration_target/2, set_maximum_since_use/2,
@@ -484,6 +486,120 @@ not_found_or_absent_dirty(Name) ->
{ok, Q} -> {absent, Q, nodedown}
end.
+-spec rebalance('all' | 'quorum' | 'classic', binary(), binary()) ->
+ {ok, [{node(), pos_integer()}]}.
+rebalance(Type, VhostSpec, QueueSpec) ->
+ Running = rabbit_mnesia:cluster_nodes(running),
+ NumRunning = length(Running),
+ ToRebalance = [Q || Q <- rabbit_amqqueue:list(),
+ filter_per_type(Type, Q),
+ is_replicated(Q),
+ is_match(amqqueue:get_vhost(Q), VhostSpec) andalso
+ is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec)],
+ NumToRebalance = length(ToRebalance),
+ ByNode = group_by_node(ToRebalance),
+ Rem = case (NumToRebalance rem NumRunning) of
+ 0 -> 0;
+ _ -> 1
+ end,
+ MaxQueuesDesired = (NumToRebalance div NumRunning) + Rem,
+ iterative_rebalance(ByNode, MaxQueuesDesired).
+
+filter_per_type(all, _) ->
+ true;
+filter_per_type(quorum, Q) ->
+ ?amqqueue_is_quorum(Q);
+filter_per_type(classic, Q) ->
+ ?amqqueue_is_classic(Q).
+
+rebalance_module(Q) when ?amqqueue_is_quorum(Q) ->
+ rabbit_quorum_queue;
+rebalance_module(Q) when ?amqqueue_is_classic(Q) ->
+ rabbit_mirror_queue_misc.
+
+get_resource_name(#resource{name = Name}) ->
+ Name.
+
+is_match(Subj, E) ->
+ nomatch /= re:run(Subj, E).
+
+iterative_rebalance(ByNode, MaxQueuesDesired) ->
+ case maybe_migrate(ByNode, MaxQueuesDesired) of
+ {ok, Summary} ->
+ rabbit_log:warning("Nothing to do, all balanced"),
+ {ok, Summary};
+ {migrated, Other} ->
+ iterative_rebalance(Other, MaxQueuesDesired);
+ {not_migrated, Other} ->
+ iterative_rebalance(Other, MaxQueuesDesired)
+ end.
+
+maybe_migrate(ByNode, MaxQueuesDesired) ->
+ maybe_migrate(ByNode, MaxQueuesDesired, maps:keys(ByNode)).
+
+maybe_migrate(ByNode, _, []) ->
+ {ok, maps:fold(fun(K, V, Acc) ->
+ [{K, length(V)} | Acc]
+ end, [], ByNode)};
+maybe_migrate(ByNode, MaxQueuesDesired, [N | Nodes]) ->
+ case maps:get(N, ByNode, []) of
+ [{_, Q, false} = Queue | Queues] = All when length(All) > MaxQueuesDesired ->
+ Name = amqqueue:get_name(Q),
+ Module = rebalance_module(Q),
+ OtherNodes = Module:get_replicas(Q) -- [N],
+ case OtherNodes of
+ [] ->
+ {not_migrated, update_not_migrated_queue(N, Queue, Queues, ByNode)};
+ _ ->
+ [{Length, Destination} | _] = sort_by_number_of_queues(OtherNodes, ByNode),
+ rabbit_log:warning("Migrating queue ~p from node ~p with ~p queues to node ~p with ~p queues",
+ [Name, N, length(All), Destination, Length]),
+ case Module:transfer_leadership(Q, Destination) of
+ {migrated, NewNode} ->
+ rabbit_log:warning("Queue ~p migrated to ~p", [Name, NewNode]),
+ {migrated, update_migrated_queue(Destination, N, Queue, Queues, ByNode)};
+ {not_migrated, Reason} ->
+ rabbit_log:warning("Error migrating queue ~p: ~p", [Name, Reason]),
+ {not_migrated, update_not_migrated_queue(N, Queue, Queues, ByNode)}
+ end
+ end;
+ [{_, _, true} | _] = All when length(All) > MaxQueuesDesired ->
+ rabbit_log:warning("Node ~p contains ~p queues, but all have already migrated. "
+ "Do nothing", [N, length(All)]),
+ maybe_migrate(ByNode, MaxQueuesDesired, Nodes);
+ All ->
+ rabbit_log:warning("Node ~p only contains ~p queues, do nothing",
+ [N, length(All)]),
+ maybe_migrate(ByNode, MaxQueuesDesired, Nodes)
+ end.
+
+update_not_migrated_queue(N, {Entries, Q, _}, Queues, ByNode) ->
+ maps:update(N, Queues ++ [{Entries, Q, true}], ByNode).
+
+update_migrated_queue(NewNode, OldNode, {Entries, Q, _}, Queues, ByNode) ->
+ maps:update_with(NewNode,
+ fun(L) -> L ++ [{Entries, Q, true}] end,
+ [{Entries, Q, true}], maps:update(OldNode, Queues, ByNode)).
+
+sort_by_number_of_queues(Nodes, ByNode) ->
+ lists:keysort(1,
+ lists:map(fun(Node) ->
+ {num_queues(Node, ByNode), Node}
+ end, Nodes)).
+
+num_queues(Node, ByNode) ->
+ length(maps:get(Node, ByNode, [])).
+
+group_by_node(Queues) ->
+ ByNode = lists:foldl(fun(Q, Acc) ->
+ Module = rebalance_module(Q),
+ Length = Module:queue_length(Q),
+ maps:update_with(amqqueue:qnode(Q),
+ fun(L) -> [{Length, Q, false} | L] end,
+ [{Length, Q, false}], Acc)
+ end, #{}, Queues),
+ maps:map(fun(_K, V) -> lists:keysort(1, V) end, ByNode).
+
-spec with(name(),
qfun(A),
fun((not_found_or_absent()) -> rabbit_types:channel_exit())) ->
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 17ea429337..85c8eab10d 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -28,7 +28,7 @@
-export([sync_queue/1, cancel_sync_queue/1]).
--export([rebalance/2]).
+-export([transfer_leadership/2, queue_length/1, get_replicas/1]).
%% for testing only
-export([module/1]).
@@ -537,110 +537,14 @@ update_mirrors(Q) when ?is_amqqueue(Q) ->
maybe_auto_sync(Q),
ok.
--spec rebalance(binary(), binary()) -> {ok, [{node(), pos_integer()}]}.
-rebalance(VhostSpec, QueueSpec) ->
- Running = rabbit_mnesia:cluster_nodes(running),
- NumRunning = length(Running),
- ToRebalance = [Q || Q <- rabbit_amqqueue:list(),
- ?amqqueue_is_classic(Q),
- is_match(amqqueue:get_vhost(Q), VhostSpec) andalso
- is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec)],
- NumToRebalance = length(ToRebalance),
- ByNode = group_by_node(ToRebalance),
- rabbit_log:warning("######## BYNODE ~p", [ByNode]),
- Rem = case (NumToRebalance rem NumRunning) of
- 0 -> 0;
- _ -> 1
- end,
- MaxQueuesDesired = (NumToRebalance div NumRunning) + Rem,
- iterative_rebalance(ByNode, MaxQueuesDesired).
-
-get_resource_name(#resource{name = Name}) ->
- Name.
-
-is_match(Subj, E) ->
- nomatch /= re:run(Subj, E).
-
-iterative_rebalance(ByNode, MaxQueuesDesired) ->
- case maybe_migrate(ByNode, MaxQueuesDesired) of
- {ok, Summary} ->
- rabbit_log:warning("Nothing to do, all balanced"),
- {ok, Summary};
- {migrated, Other} ->
- iterative_rebalance(Other, MaxQueuesDesired);
- {not_migrated, Other} ->
- iterative_rebalance(Other, MaxQueuesDesired)
- end.
-
-maybe_migrate(ByNode, MaxQueuesDesired) ->
- maybe_migrate(ByNode, MaxQueuesDesired, maps:keys(ByNode)).
-
-maybe_migrate(ByNode, _, []) ->
- {ok, maps:fold(fun(K, V, Acc) ->
- [{K, length(V)} | Acc]
- end, [], ByNode)};
-maybe_migrate(ByNode, MaxQueuesDesired, [N | Nodes]) ->
- case maps:get(N, ByNode, []) of
- [{_, Q, false} = Queue | Queues] = All when length(All) > MaxQueuesDesired ->
- Name = amqqueue:get_name(Q),
- OtherNodes = rabbit_mnesia:cluster_nodes(running) -- [N],
- case OtherNodes of
- [] ->
- {not_migrated, update_not_migrated_queue(N, Queue, Queues, ByNode)};
- _ ->
- [{Length, Destination} | _] = sort_by_number_of_queues(OtherNodes, ByNode),
- rabbit_log:warning("Migrating mirrored queue ~p from node ~p with ~p queues to node ~p with ~p queues",
- [Name, N, length(All), Destination, Length]),
- case transfer_master(Q, Destination) of
- migrated ->
- rabbit_log:warning("Mirrored queue ~p migrated to ~p", [Name, Destination]),
- {migrated, update_migrated_queue(Destination, N, Queue, Queues, ByNode)};
- not_migrated ->
- rabbit_log:warning("Error migrating mirrored queue ~p", [Name]),
- {not_migrated, update_not_migrated_queue(N, Queue, Queues, ByNode)}
- end
- end;
- [{_, _, true} | _] = All when length(All) > MaxQueuesDesired ->
- rabbit_log:warning("Node ~p contains ~p queues, but all have already migrated. "
- "Do nothing", [N, length(All)]),
- maybe_migrate(ByNode, MaxQueuesDesired, Nodes);
- All ->
- rabbit_log:warning("Node ~p only contains ~p queues, do nothing",
- [N, length(All)]),
- maybe_migrate(ByNode, MaxQueuesDesired, Nodes)
- end.
-
-update_not_migrated_queue(N, {Entries, Q, _}, Queues, ByNode) ->
- maps:update(N, Queues ++ [{Entries, Q, true}], ByNode).
-
-update_migrated_queue(NewNode, OldNode, {Entries, Q, _}, Queues, ByNode) ->
- maps:update_with(NewNode,
- fun(L) -> L ++ [{Entries, Q, true}] end,
- [{Entries, Q, true}], maps:update(OldNode, Queues, ByNode)).
-
-sort_by_number_of_queues(Nodes, ByNode) ->
- lists:keysort(1,
- lists:map(fun(Node) ->
- {num_queues(Node, ByNode), Node}
- end, Nodes)).
-
-num_queues(Node, ByNode) ->
- length(maps:get(Node, ByNode, [])).
-
-group_by_node(Queues) ->
- ByNode = lists:foldl(fun(Q, Acc) ->
- Messages = total_messages(Q),
- maps:update_with(amqqueue:qnode(Q),
- fun(L) -> [{Messages, Q, false} | L] end,
- [{Messages, Q, false}], Acc)
- end, #{}, Queues),
- maps:map(fun(_K, V) -> lists:keysort(1, V) end, ByNode).
-
-total_messages(Q) ->
+queue_length(Q) ->
[{messages, M}] = rabbit_amqqueue:info(Q, [messages]),
M.
-transfer_master(Q, Destination) ->
+get_replicas(Q) ->
+ rabbit_mnesia:cluster_nodes(running).
+
+transfer_leadership(Q, Destination) ->
QName = amqqueue:get_name(Q),
{OldMNode, OldSNodes, _} = actual_queue_nodes(Q),
OldNodes = [OldMNode | OldSNodes],
@@ -655,7 +559,7 @@ wait_for_new_master(QName, Destination) ->
wait_for_new_master(QName, _, 0) ->
{ok, Q} = rabbit_amqqueue:lookup(QName),
- {not_migrated, Q};
+ {{not_migrated, ""}, Q};
wait_for_new_master(QName, Destination, N) ->
{ok, Q} = rabbit_amqqueue:lookup(QName),
case amqqueue:get_pid(Q) of
@@ -665,7 +569,7 @@ wait_for_new_master(QName, Destination, N) ->
Pid ->
case node(Pid) of
Destination ->
- {migrated, Q};
+ {{migrated, Destination}, Q};
_ ->
timer:sleep(100),
wait_for_new_master(QName, Destination, N - 1)
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 4fe9c4cbf3..4f8c129291 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -39,7 +39,7 @@
-export([cleanup_data_dir/0]).
-export([shrink_all/1,
grow/4]).
--export([rebalance/2]).
+-export([transfer_leadership/2, get_replicas/1, queue_length/1]).
%%-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("rabbit.hrl").
@@ -848,114 +848,31 @@ grow(Node, VhostSpec, QueueSpec, Strategy) ->
is_match(amqqueue:get_vhost(Q), VhostSpec) andalso
is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec) ].
--spec rebalance(binary(), binary()) -> {ok, [{node(), pos_integer()}]}.
-rebalance(VhostSpec, QueueSpec) ->
- Running = rabbit_mnesia:cluster_nodes(running),
- NumRunning = length(Running),
- ToRebalance = [Q || Q <- rabbit_amqqueue:list(),
- amqqueue:get_type(Q) == ?MODULE,
- is_match(amqqueue:get_vhost(Q), VhostSpec) andalso
- is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec)],
- NumToRebalance = length(ToRebalance),
- ByNode = group_by_node(ToRebalance),
- Rem = case (NumToRebalance rem NumRunning) of
- 0 -> 0;
- _ -> 1
- end,
- MaxQueuesDesired = (NumToRebalance div NumRunning) + Rem,
- iterative_rebalance(ByNode, MaxQueuesDesired).
-
-iterative_rebalance(ByNode, MaxQueuesDesired) ->
- case maybe_migrate(ByNode, MaxQueuesDesired) of
- {ok, Summary} ->
- rabbit_log:warning("Nothing to do, all balanced"),
- {ok, Summary};
- {migrated, Other} ->
- iterative_rebalance(Other, MaxQueuesDesired);
- {not_migrated, Other} ->
- iterative_rebalance(Other, MaxQueuesDesired)
- end.
-
-maybe_migrate(ByNode, MaxQueuesDesired) ->
- maybe_migrate(ByNode, MaxQueuesDesired, maps:keys(ByNode)).
-
-maybe_migrate(ByNode, _, []) ->
- {ok, maps:fold(fun(K, V, Acc) ->
- [{K, length(V)} | Acc]
- end, [], ByNode)};
-maybe_migrate(ByNode, MaxQueuesDesired, [N | Nodes]) ->
- case maps:get(N, ByNode, []) of
- [{_, Q, false} = Queue | Queues] = All when length(All) > MaxQueuesDesired ->
- {RaName, _} = Pid = amqqueue:get_pid(Q),
- Name = amqqueue:get_name(Q),
- Members = get_nodes(Q) -- [N],
- case Members of
- [] ->
- {not_migrated, update_not_migrated_queue(N, Queue, Queues, ByNode)};
- _ ->
- [{Length, Destination} | _] = sort_by_number_of_queues(Members, ByNode),
- rabbit_log:warning("Migrating quorum queue ~p from node ~p with ~p queues to node ~p with ~p queues",
- [Name, N, length(All), Destination, Length]),
- case ra:transfer_leadership(Pid, {RaName, Destination}) of
- ok ->
- {_, _, {_, NewNode}} = ra:members(Pid),
- rabbit_log:warning("Quorum queue ~p migrated to ~p", [Name, NewNode]),
- {migrated, update_migrated_queue(NewNode, N, Queue, Queues, ByNode)};
- already_leader ->
- rabbit_log:warning("Quorum queue ~p in ~p is already a leader",
- [Name, Destination]),
- {not_migrated, update_not_migrated_queue(N, Queue, Queues, ByNode)};
- {error, Reason} ->
- rabbit_log:warning("Error migrating quorum queue ~p: ~p", [Name, Reason]),
- {not_migrated, update_not_migrated_queue(N, Queue, Queues, ByNode)};
- {timeout, _} ->
- %% TODO should we retry once?
- rabbit_log:warning("Timeout migrating quorum queue ~p: ~p", [Name]),
- {not_migrated, update_not_migrated_queue(N, Queue, Queues, ByNode)}
- end
- end;
- [{_, _, true} | _] = All when length(All) > MaxQueuesDesired ->
- rabbit_log:warning("Node ~p contains ~p queues, but all have already migrated. "
- "Do nothing", [N, length(All)]),
- maybe_migrate(ByNode, MaxQueuesDesired, Nodes);
- All ->
- rabbit_log:warning("Node ~p only contains ~p queues, do nothing",
- [N, length(All)]),
- maybe_migrate(ByNode, MaxQueuesDesired, Nodes)
+transfer_leadership(Q, Destination) ->
+ {RaName, _} = Pid = amqqueue:get_pid(Q),
+ case ra:transfer_leadership(Pid, {RaName, Destination}) of
+ ok ->
+ {_, _, {_, NewNode}} = ra:members(Pid),
+ {migrated, NewNode};
+ already_leader ->
+ {not_migrated, already_leader};
+ {error, Reason} ->
+ {not_migrated, Reason};
+ {timeout, _} ->
+ %% TODO should we retry once?
+ {not_migrated, timeout}
end.
-update_not_migrated_queue(N, {Entries, Q, _}, Queues, ByNode) ->
- maps:update(N, Queues ++ [{Entries, Q, true}], ByNode).
-
-update_migrated_queue(NewNode, OldNode, {Entries, Q, _}, Queues, ByNode) ->
- maps:update_with(NewNode,
- fun(L) -> L ++ [{Entries, Q, true}] end,
- [{Entries, Q, true}], maps:update(OldNode, Queues, ByNode)).
-
-sort_by_number_of_queues(Nodes, ByNode) ->
- lists:keysort(1,
- lists:map(fun(Node) ->
- {num_queues(Node, ByNode), Node}
- end, Nodes)).
-
-num_queues(Node, ByNode) ->
- length(maps:get(Node, ByNode, [])).
-
-group_by_node(Queues) ->
- ByNode = lists:foldl(fun(Q, Acc) ->
- maps:update_with(amqqueue:qnode(Q),
- fun(L) -> [{log_entries(Q), Q, false} | L] end,
- [{log_entries(Q), Q, false}], Acc)
- end, #{}, Queues),
- maps:map(fun(_K, V) -> lists:keysort(1, V) end, ByNode).
-
-log_entries(Q) ->
+queue_length(Q) ->
Name = amqqueue:get_name(Q),
case ets:lookup(ra_metrics, Name) of
[] -> 0;
[{_, _, SnapIdx, _, _, LastIdx, _}] -> LastIdx - SnapIdx
end.
+get_replicas(Q) ->
+ get_nodes(Q).
+
get_resource_name(#resource{name = Name}) ->
Name.