summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <mklishin@pivotal.io>2020-03-07 21:31:01 +0300
committerGitHub <noreply@github.com>2020-03-07 21:31:01 +0300
commitf05114e8651ac1aebe5841d2d71bd64744391119 (patch)
tree11745476b5ad51a104c8a7f6568fdb7242de1865 /src
parent4406d8eab7334f07d5caef9a61ca6412a54a6e4c (diff)
parent462f963cd4b6546d378d69eed2dabec7fd30251e (diff)
downloadrabbitmq-server-git-f05114e8651ac1aebe5841d2d71bd64744391119.tar.gz
Merge pull request #2268 from rabbitmq/rabbitmq-management-782
Allow only one rebalance operation to happen at a time
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl36
1 files changed, 31 insertions, 5 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 864bce83a0..f81c36829b 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -514,9 +514,29 @@ not_found_or_absent_dirty(Name) ->
{ok, Q} -> {absent, Q, nodedown}
end.
+-spec get_rebalance_lock(pid()) ->
+ {true, {rebalance_queues, pid()}} | false.
+get_rebalance_lock(Pid) when is_pid(Pid) ->
+ Id = {rebalance_queues, Pid},
+ Nodes = [node()|nodes()],
+ %% Note that we're not re-trying. We want to immediately know
+ %% if a re-balance is taking place and stop accordingly.
+ case global:set_lock(Id, Nodes, 0) of
+ true ->
+ {true, Id};
+ false ->
+ false
+ end.
+
-spec rebalance('all' | 'quorum' | 'classic', binary(), binary()) ->
- {ok, [{node(), pos_integer()}]}.
+ {ok, [{node(), pos_integer()}]} | {error, term()}.
rebalance(Type, VhostSpec, QueueSpec) ->
+ %% We have not yet acquired the rebalance_queues global lock.
+ maybe_rebalance(get_rebalance_lock(self()), Type, VhostSpec, QueueSpec).
+
+maybe_rebalance({true, Id}, Type, VhostSpec, QueueSpec) ->
+ rabbit_log:info("Starting queue rebalance operation: '~s' for vhosts matching '~s' and queues matching '~s'",
+ [Type, VhostSpec, QueueSpec]),
Running = rabbit_mnesia:cluster_nodes(running),
NumRunning = length(Running),
ToRebalance = [Q || Q <- rabbit_amqqueue:list(),
@@ -527,11 +547,17 @@ rebalance(Type, VhostSpec, QueueSpec) ->
NumToRebalance = length(ToRebalance),
ByNode = group_by_node(ToRebalance),
Rem = case (NumToRebalance rem NumRunning) of
- 0 -> 0;
- _ -> 1
- end,
+ 0 -> 0;
+ _ -> 1
+ end,
MaxQueuesDesired = (NumToRebalance div NumRunning) + Rem,
- iterative_rebalance(ByNode, MaxQueuesDesired).
+ Result = iterative_rebalance(ByNode, MaxQueuesDesired),
+ global:del_lock(Id),
+ rabbit_log:info("Finished queue rebalance operation"),
+ Result;
+maybe_rebalance(false, _Type, _VhostSpec, _QueueSpec) ->
+ rabbit_log:warning("Queue rebalance operation is in progress, please wait."),
+ {error, rebalance_in_progress}.
filter_per_type(all, _) ->
true;