summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorLuke Bakken <lbakken@pivotal.io>2020-03-05 19:52:43 +0000
committerLuke Bakken <lbakken@pivotal.io>2020-03-06 16:44:06 +0000
commit93179acf580a69eb3e4fd6f6f7412a6a8d85caa6 (patch)
treed995fc7fcaceca396a47793640638c676f3dbafd /src
parent4406d8eab7334f07d5caef9a61ca6412a54a6e4c (diff)
downloadrabbitmq-server-git-93179acf580a69eb3e4fd6f6f7412a6a8d85caa6.tar.gz
Add test that should fail
Add code to block multiple queue rebalance operations, fix test Allow acquiring the rebalance lock prior to calling rabbit_amqqueue:rebalance Simplify queue rebalance code to always acquire the lock using the current process
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl35
1 files changed, 30 insertions, 5 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 864bce83a0..6a5fd98775 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -514,9 +514,29 @@ not_found_or_absent_dirty(Name) ->
{ok, Q} -> {absent, Q, nodedown}
end.
+-spec get_rebalance_lock(pid()) ->
+ {true, {rebalance_queues, pid()}} | false.
+get_rebalance_lock(Pid) when is_pid(Pid) ->
+ Id = {rebalance_queues, Pid},
+ Nodes = [node()|nodes()],
+ %% Note that we're not re-trying. We want to immediately know
+ %% if a re-balance is taking place and stop accordingly.
+ case global:set_lock(Id, Nodes, 0) of
+ true ->
+ {true, Id};
+ false ->
+ false
+ end.
+
-spec rebalance('all' | 'quorum' | 'classic', binary(), binary()) ->
- {ok, [{node(), pos_integer()}]}.
+ {ok, [{node(), pos_integer()}]} | {error, term()}.
rebalance(Type, VhostSpec, QueueSpec) ->
+ %% We have not yet acquired the rebalance_queues global lock.
+ maybe_rebalance(get_rebalance_lock(self()), Type, VhostSpec, QueueSpec).
+
+maybe_rebalance({true, Id}, Type, VhostSpec, QueueSpec) ->
+ rabbit_log:info("Starting queue rebalance operation: '~s' for vhosts matching '~s' and queues matching '~s'",
+ [Type, VhostSpec, QueueSpec]),
Running = rabbit_mnesia:cluster_nodes(running),
NumRunning = length(Running),
ToRebalance = [Q || Q <- rabbit_amqqueue:list(),
@@ -527,11 +547,16 @@ rebalance(Type, VhostSpec, QueueSpec) ->
NumToRebalance = length(ToRebalance),
ByNode = group_by_node(ToRebalance),
Rem = case (NumToRebalance rem NumRunning) of
- 0 -> 0;
- _ -> 1
- end,
+ 0 -> 0;
+ _ -> 1
+ end,
MaxQueuesDesired = (NumToRebalance div NumRunning) + Rem,
- iterative_rebalance(ByNode, MaxQueuesDesired).
+ Result = iterative_rebalance(ByNode, MaxQueuesDesired),
+ global:del_lock(Id),
+ Result;
+maybe_rebalance(false, _Type, _VhostSpec, _QueueSpec) ->
+ rabbit_log:warning("Queue rebalance operation is in progress, please wait."),
+ {error, rebalance_in_progress}.
filter_per_type(all, _) ->
true;