diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 36 |
1 files changed, 31 insertions, 5 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 864bce83a0..f81c36829b 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -514,9 +514,29 @@ not_found_or_absent_dirty(Name) -> {ok, Q} -> {absent, Q, nodedown} end. +-spec get_rebalance_lock(pid()) -> + {true, {rebalance_queues, pid()}} | false. +get_rebalance_lock(Pid) when is_pid(Pid) -> + Id = {rebalance_queues, Pid}, + Nodes = [node()|nodes()], + %% Note that we're not re-trying. We want to immediately know + %% if a re-balance is taking place and stop accordingly. + case global:set_lock(Id, Nodes, 0) of + true -> + {true, Id}; + false -> + false + end. + -spec rebalance('all' | 'quorum' | 'classic', binary(), binary()) -> - {ok, [{node(), pos_integer()}]}. + {ok, [{node(), pos_integer()}]} | {error, term()}. rebalance(Type, VhostSpec, QueueSpec) -> + %% We have not yet acquired the rebalance_queues global lock. + maybe_rebalance(get_rebalance_lock(self()), Type, VhostSpec, QueueSpec). + +maybe_rebalance({true, Id}, Type, VhostSpec, QueueSpec) -> + rabbit_log:info("Starting queue rebalance operation: '~s' for vhosts matching '~s' and queues matching '~s'", + [Type, VhostSpec, QueueSpec]), Running = rabbit_mnesia:cluster_nodes(running), NumRunning = length(Running), ToRebalance = [Q || Q <- rabbit_amqqueue:list(), @@ -527,11 +547,17 @@ rebalance(Type, VhostSpec, QueueSpec) -> NumToRebalance = length(ToRebalance), ByNode = group_by_node(ToRebalance), Rem = case (NumToRebalance rem NumRunning) of - 0 -> 0; - _ -> 1 - end, + 0 -> 0; + _ -> 1 + end, MaxQueuesDesired = (NumToRebalance div NumRunning) + Rem, - iterative_rebalance(ByNode, MaxQueuesDesired). + Result = iterative_rebalance(ByNode, MaxQueuesDesired), + global:del_lock(Id), + rabbit_log:info("Finished queue rebalance operation"), + Result; +maybe_rebalance(false, _Type, _VhostSpec, _QueueSpec) -> + rabbit_log:warning("Queue rebalance operation is in progress, please wait."), + {error, rebalance_in_progress}. filter_per_type(all, _) -> true; |
