diff options
| author | Luke Bakken <lbakken@pivotal.io> | 2020-03-05 19:52:43 +0000 |
|---|---|---|
| committer | Luke Bakken <lbakken@pivotal.io> | 2020-03-06 16:44:06 +0000 |
| commit | 93179acf580a69eb3e4fd6f6f7412a6a8d85caa6 (patch) | |
| tree | d995fc7fcaceca396a47793640638c676f3dbafd /src | |
| parent | 4406d8eab7334f07d5caef9a61ca6412a54a6e4c (diff) | |
| download | rabbitmq-server-git-93179acf580a69eb3e4fd6f6f7412a6a8d85caa6.tar.gz | |
Add test that should fail
Add code to block multiple queue rebalance operations, fix test
Allow acquiring the rebalance lock prior to calling rabbit_amqqueue:rebalance
Simplify queue rebalance code to always acquire the lock using the current process
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 35 |
1 files changed, 30 insertions, 5 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 864bce83a0..6a5fd98775 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -514,9 +514,29 @@ not_found_or_absent_dirty(Name) -> {ok, Q} -> {absent, Q, nodedown} end. +-spec get_rebalance_lock(pid()) -> + {true, {rebalance_queues, pid()}} | false. +get_rebalance_lock(Pid) when is_pid(Pid) -> + Id = {rebalance_queues, Pid}, + Nodes = [node()|nodes()], + %% Note that we're not re-trying. We want to immediately know + %% if a re-balance is taking place and stop accordingly. + case global:set_lock(Id, Nodes, 0) of + true -> + {true, Id}; + false -> + false + end. + -spec rebalance('all' | 'quorum' | 'classic', binary(), binary()) -> - {ok, [{node(), pos_integer()}]}. + {ok, [{node(), pos_integer()}]} | {error, term()}. rebalance(Type, VhostSpec, QueueSpec) -> + %% We have not yet acquired the rebalance_queues global lock. + maybe_rebalance(get_rebalance_lock(self()), Type, VhostSpec, QueueSpec). + +maybe_rebalance({true, Id}, Type, VhostSpec, QueueSpec) -> + rabbit_log:info("Starting queue rebalance operation: '~s' for vhosts matching '~s' and queues matching '~s'", + [Type, VhostSpec, QueueSpec]), Running = rabbit_mnesia:cluster_nodes(running), NumRunning = length(Running), ToRebalance = [Q || Q <- rabbit_amqqueue:list(), @@ -527,11 +547,16 @@ rebalance(Type, VhostSpec, QueueSpec) -> NumToRebalance = length(ToRebalance), ByNode = group_by_node(ToRebalance), Rem = case (NumToRebalance rem NumRunning) of - 0 -> 0; - _ -> 1 - end, + 0 -> 0; + _ -> 1 + end, MaxQueuesDesired = (NumToRebalance div NumRunning) + Rem, - iterative_rebalance(ByNode, MaxQueuesDesired). + Result = iterative_rebalance(ByNode, MaxQueuesDesired), + global:del_lock(Id), + Result; +maybe_rebalance(false, _Type, _VhostSpec, _QueueSpec) -> + rabbit_log:warning("Queue rebalance operation is in progress, please wait."), + {error, rebalance_in_progress}. filter_per_type(all, _) -> true; |
