summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2020-06-17 18:12:15 +0300
committerMichael Klishin <michael@clojurewerkz.org>2020-07-14 03:50:30 +0300
commitac461e9e8ecb504c42405892fea583f037645cce (patch)
tree205ba50bb6dfd67e1e213f1c308bfd66a8c4bccd
parent4b76ccad878d32740777ee69c74e2dfd0f994831 (diff)
downloadrabbitmq-server-git-ac461e9e8ecb504c42405892fea583f037645cce.tar.gz
Maintenance mode: transfer leadership (primary replicas) of local CMQs
Part of rabbitmq/rabbitmq#2321
-rw-r--r--src/rabbit_amqqueue.erl10
-rw-r--r--src/rabbit_maintenance.erl68
2 files changed, 68 insertions, 10 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 532f8894d0..9818e689de 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -23,7 +23,6 @@
emit_info_local/4, emit_info_down/4]).
-export([count/0]).
-export([list_down/1, count/1, list_names/0, list_names/1, list_local_names/0,
- list_local_mirrored_classic_names/0,
list_local_names_down/0, list_with_possible_retry/1]).
-export([list_by_type/1, sample_local_queues/0, sample_n_by_name/2, sample_n/2]).
-export([force_event_refresh/1, notify_policy_changed/1]).
@@ -38,6 +37,7 @@
-export([has_synchronised_mirrors_online/1]).
-export([is_replicated/1, is_exclusive/1, is_not_exclusive/1, is_dead_exclusive/1]).
-export([list_local_quorum_queues/0, list_local_quorum_queue_names/0,
+ list_local_mirrored_classic_queues/0, list_local_mirrored_classic_names/0,
list_local_leaders/0, list_local_followers/0, get_quorum_nodes/1,
list_local_mirrored_classic_without_synchronised_mirrors/0,
list_local_mirrored_classic_without_synchronised_mirrors_for_cli/0]).
@@ -1072,6 +1072,14 @@ list_local_followers() ->
amqqueue:get_state(Q) =/= crashed, amqqueue:get_leader(Q) =/= node(),
lists:member(node(), get_quorum_nodes(Q))].
+-spec list_local_mirrored_classic_queues() -> [amqqueue:amqqueue()].
+list_local_mirrored_classic_queues() ->
+ [ Q || Q <- list(),
+ amqqueue:get_state(Q) =/= crashed,
+ amqqueue:is_classic(Q),
+ is_local_to_node(amqqueue:get_pid(Q), node()),
+ is_replicated(Q)].
+
-spec list_local_mirrored_classic_names() -> [rabbit_amqqueue:name()].
list_local_mirrored_classic_names() ->
[ amqqueue:get_name(Q) || Q <- list(),
diff --git a/src/rabbit_maintenance.erl b/src/rabbit_maintenance.erl
index 8f0a7e15f9..c5e35e30dd 100644
--- a/src/rabbit_maintenance.erl
+++ b/src/rabbit_maintenance.erl
@@ -30,7 +30,10 @@
suspend_all_client_listeners/0,
resume_all_client_listeners/0,
close_all_client_connections/0,
- primary_replica_transfer_candidate_nodes/0]).
+ primary_replica_transfer_candidate_nodes/0,
+ random_primary_replica_transfer_candidate_node/1,
+ transfer_leadership_of_quorum_queues/1,
+ transfer_leadership_of_classic_mirrored_queues/1]).
-define(TABLE, rabbit_node_maintenance_states).
-define(DEFAULT_STATUS, regular).
@@ -50,11 +53,12 @@ drain() ->
rabbit_log:alert("Closed ~b local client connections", [NConnections]),
TransferCandidates = primary_replica_transfer_candidate_nodes(),
+ ReadableCandidates = string:join(lists:map(fun rabbit_data_coercion:to_list/1, TransferCandidates), ","),
rabbit_log:info("Node will transfer primary replicas of its queues to ~b peers: ~s",
- [length(TransferCandidates), string:join(TransferCandidates, ",")]),
- %% TODO: transfer leadership of all queues hosted on this node
+ [length(TransferCandidates), ReadableCandidates]),
+ transfer_leadership_of_classic_mirrored_queues(TransferCandidates),
%% TODO: shut all Ra instances on this node down
-
+ transfer_leadership_of_quorum_queues(TransferCandidates),
rabbit_log:alert("Node is ready to be shut down for maintenance or upgrade"),
ok.
@@ -122,11 +126,6 @@ filter_out_drained_nodes_local_read(Nodes) ->
-spec filter_out_drained_nodes_consistent_read([node()]) -> [node()].
filter_out_drained_nodes_consistent_read(Nodes) ->
lists:filter(fun(N) -> not is_being_drained_consistent_read(N) end, Nodes).
-
--spec primary_replica_transfer_candidate_nodes() -> [node()].
-primary_replica_transfer_candidate_nodes() ->
- filter_out_drained_nodes_consistent_read(rabbit_nodes:all_running()).
-
-spec suspend_all_client_listeners() -> rabbit_types:ok_or_error(any()).
%% Pauses all listeners on the current node except for
@@ -157,6 +156,57 @@ close_all_client_connections() ->
rabbit_networking:close_connections(Pids, "Node was put into maintenance mode"),
{ok, length(Pids)}.
+-spec transfer_leadership_of_quorum_queues([node()]) -> ok.
+transfer_leadership_of_quorum_queues([]) ->
+ rabbit_log:warning("Skipping leadership transfer of quorum queues: no candidate "
+ "(online, not under maintenance) nodes to transfer to!");
+transfer_leadership_of_quorum_queues(TransferCandidates) ->
+ TransferCandidates.
+
+-spec transfer_leadership_of_classic_mirrored_queues([node()]) -> ok.
+ transfer_leadership_of_classic_mirrored_queues([]) ->
+ rabbit_log:warning("Skipping leadership transfer of classic mirrored queues: no candidate "
+ "(online, not under maintenance) nodes to transfer to!");
+transfer_leadership_of_classic_mirrored_queues(TransferCandidates) ->
+ Queues = rabbit_amqqueue:list_local_mirrored_classic_queues(),
+ ReadableCandidates = string:join(lists:map(fun rabbit_data_coercion:to_list/1, TransferCandidates), ", "),
+ rabbit_log:info("Will transfer leadership of ~b classic mirrored queues to these nodes: ~s",
+ [length(Queues), ReadableCandidates]),
+
+ [begin
+ Name = amqqueue:get_name(Q),
+ case random_primary_replica_transfer_candidate_node(TransferCandidates) of
+ {ok, Pick} ->
+ rabbit_log:debug("Will transfer leadership of local queue ~s to node ~s",
+ [rabbit_misc:rs(Name), Pick]),
+ case rabbit_mirror_queue_misc:transfer_leadership(Q, Pick) of
+ {migrated, _} ->
+ rabbit_log:debug("Successfully transferred leadership of queue ~s to node ~s",
+ [rabbit_misc:rs(Name), Pick]);
+ Other ->
+ rabbit_log:warning("Could not transfer leadership of queue ~s to node ~s: ~p",
+ [rabbit_misc:rs(Name), Pick, Other])
+ end;
+ undefined ->
+ rabbit_log:warning("Could not transfer leadership of queue ~s: no suitable candidates?",
+ [Name])
+ end
+ end || Q <- Queues],
+ rabbit_log:info("Leadership transfer for local classic mirrored queues is complete").
+
+-spec primary_replica_transfer_candidate_nodes() -> [node()].
+primary_replica_transfer_candidate_nodes() ->
+ filter_out_drained_nodes_consistent_read(rabbit_nodes:all_running() -- [node()]).
+
+-spec random_primary_replica_transfer_candidate_node([node()]) -> {ok, node()} | undefined.
+random_primary_replica_transfer_candidate_node([]) ->
+ undefined;
+random_primary_replica_transfer_candidate_node(Candidates) ->
+ Nth = erlang:phash2(erlang:monotonic_time(), length(Candidates)),
+ Candidate = lists:nth(Nth + 1, Candidates),
+ {ok, Candidate}.
+
+
%%
%% Implementation
%%