diff options
| author | Michael Klishin <michael@clojurewerkz.org> | 2020-06-17 18:12:15 +0300 |
|---|---|---|
| committer | Michael Klishin <michael@clojurewerkz.org> | 2020-07-14 03:50:30 +0300 |
| commit | ac461e9e8ecb504c42405892fea583f037645cce (patch) | |
| tree | 205ba50bb6dfd67e1e213f1c308bfd66a8c4bccd | |
| parent | 4b76ccad878d32740777ee69c74e2dfd0f994831 (diff) | |
| download | rabbitmq-server-git-ac461e9e8ecb504c42405892fea583f037645cce.tar.gz | |
Maintenance mode: transfer leadership (primary replicas) of local CMQs
Part of rabbitmq/rabbitmq#2321
| -rw-r--r-- | src/rabbit_amqqueue.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_maintenance.erl | 68 |
2 files changed, 68 insertions, 10 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 532f8894d0..9818e689de 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -23,7 +23,6 @@ emit_info_local/4, emit_info_down/4]). -export([count/0]). -export([list_down/1, count/1, list_names/0, list_names/1, list_local_names/0, - list_local_mirrored_classic_names/0, list_local_names_down/0, list_with_possible_retry/1]). -export([list_by_type/1, sample_local_queues/0, sample_n_by_name/2, sample_n/2]). -export([force_event_refresh/1, notify_policy_changed/1]). @@ -38,6 +37,7 @@ -export([has_synchronised_mirrors_online/1]). -export([is_replicated/1, is_exclusive/1, is_not_exclusive/1, is_dead_exclusive/1]). -export([list_local_quorum_queues/0, list_local_quorum_queue_names/0, + list_local_mirrored_classic_queues/0, list_local_mirrored_classic_names/0, list_local_leaders/0, list_local_followers/0, get_quorum_nodes/1, list_local_mirrored_classic_without_synchronised_mirrors/0, list_local_mirrored_classic_without_synchronised_mirrors_for_cli/0]). @@ -1072,6 +1072,14 @@ list_local_followers() -> amqqueue:get_state(Q) =/= crashed, amqqueue:get_leader(Q) =/= node(), lists:member(node(), get_quorum_nodes(Q))]. +-spec list_local_mirrored_classic_queues() -> [amqqueue:amqqueue()]. +list_local_mirrored_classic_queues() -> + [ Q || Q <- list(), + amqqueue:get_state(Q) =/= crashed, + amqqueue:is_classic(Q), + is_local_to_node(amqqueue:get_pid(Q), node()), + is_replicated(Q)]. + -spec list_local_mirrored_classic_names() -> [rabbit_amqqueue:name()]. list_local_mirrored_classic_names() -> [ amqqueue:get_name(Q) || Q <- list(), diff --git a/src/rabbit_maintenance.erl b/src/rabbit_maintenance.erl index 8f0a7e15f9..c5e35e30dd 100644 --- a/src/rabbit_maintenance.erl +++ b/src/rabbit_maintenance.erl @@ -30,7 +30,10 @@ suspend_all_client_listeners/0, resume_all_client_listeners/0, close_all_client_connections/0, - primary_replica_transfer_candidate_nodes/0]). 
+ primary_replica_transfer_candidate_nodes/0, + random_primary_replica_transfer_candidate_node/1, + transfer_leadership_of_quorum_queues/1, + transfer_leadership_of_classic_mirrored_queues/1]). -define(TABLE, rabbit_node_maintenance_states). -define(DEFAULT_STATUS, regular). @@ -50,11 +53,12 @@ drain() -> rabbit_log:alert("Closed ~b local client connections", [NConnections]), TransferCandidates = primary_replica_transfer_candidate_nodes(), + ReadableCandidates = string:join(lists:map(fun rabbit_data_coercion:to_list/1, TransferCandidates), ","), rabbit_log:info("Node will transfer primary replicas of its queues to ~b peers: ~s", - [length(TransferCandidates), string:join(TransferCandidates, ",")]), - %% TODO: transfer leadership of all queues hosted on this node + [length(TransferCandidates), ReadableCandidates]), + transfer_leadership_of_classic_mirrored_queues(TransferCandidates), %% TODO: shut all Ra instances on this node down - + transfer_leadership_of_quorum_queues(TransferCandidates), rabbit_log:alert("Node is ready to be shut down for maintenance or upgrade"), ok. @@ -122,11 +126,6 @@ filter_out_drained_nodes_local_read(Nodes) -> -spec filter_out_drained_nodes_consistent_read([node()]) -> [node()]. filter_out_drained_nodes_consistent_read(Nodes) -> lists:filter(fun(N) -> not is_being_drained_consistent_read(N) end, Nodes). - --spec primary_replica_transfer_candidate_nodes() -> [node()]. -primary_replica_transfer_candidate_nodes() -> - filter_out_drained_nodes_consistent_read(rabbit_nodes:all_running()). - -spec suspend_all_client_listeners() -> rabbit_types:ok_or_error(any()). %% Pauses all listeners on the current node except for @@ -157,6 +156,57 @@ close_all_client_connections() -> rabbit_networking:close_connections(Pids, "Node was put into maintenance mode"), {ok, length(Pids)}. +-spec transfer_leadership_of_quorum_queues([node()]) -> ok. 
%% Quorum queue counterpart of transfer_leadership_of_classic_mirrored_queues/1.
%%
%% With an empty candidate list it only logs a warning. With a non-empty list
%% it is currently a no-op: this function does not perform any transfer yet.
%% The non-empty clause returns `ok` so that it agrees with the declared spec.
transfer_leadership_of_quorum_queues([]) ->
    rabbit_log:warning("Skipping leadership transfer of quorum queues: no candidate "
                       "(online, not under maintenance) nodes to transfer to!");
transfer_leadership_of_quorum_queues(_TransferCandidates) ->
    ok.

%% Attempts to move the leader (primary replica) of every classic mirrored
%% queue returned by rabbit_amqqueue:list_local_mirrored_classic_queues/0 to a
%% node picked at random from TransferCandidates.
%%
%% With an empty candidate list nothing is transferred and a warning is logged.
%% A failed transfer of one queue is logged and does not stop the remaining
%% queues from being processed.
-spec transfer_leadership_of_classic_mirrored_queues([node()]) -> ok.
transfer_leadership_of_classic_mirrored_queues([]) ->
    rabbit_log:warning("Skipping leadership transfer of classic mirrored queues: no candidate "
                       "(online, not under maintenance) nodes to transfer to!");
transfer_leadership_of_classic_mirrored_queues(TransferCandidates) ->
    Queues = rabbit_amqqueue:list_local_mirrored_classic_queues(),
    ReadableCandidates = string:join(lists:map(fun rabbit_data_coercion:to_list/1, TransferCandidates), ", "),
    rabbit_log:info("Will transfer leadership of ~b classic mirrored queues to these nodes: ~s",
                    [length(Queues), ReadableCandidates]),
    %% Executed purely for side effects, hence foreach rather than a comprehension.
    lists:foreach(fun(Q) ->
                          transfer_classic_mirrored_queue_leadership(Q, TransferCandidates)
                  end, Queues),
    rabbit_log:info("Leadership transfer for local classic mirrored queues is complete").

%% Picks a random candidate node for a single queue, asks
%% rabbit_mirror_queue_misc:transfer_leadership/2 to move the leader there
%% and logs the outcome. Any result other than {migrated, _} is reported
%% as a warning.
-spec transfer_classic_mirrored_queue_leadership(amqqueue:amqqueue(), [node()]) -> term().
transfer_classic_mirrored_queue_leadership(Q, TransferCandidates) ->
    Name = amqqueue:get_name(Q),
    case random_primary_replica_transfer_candidate_node(TransferCandidates) of
        {ok, Pick} ->
            rabbit_log:debug("Will transfer leadership of local queue ~s to node ~s",
                             [rabbit_misc:rs(Name), Pick]),
            case rabbit_mirror_queue_misc:transfer_leadership(Q, Pick) of
                {migrated, _} ->
                    rabbit_log:debug("Successfully transferred leadership of queue ~s to node ~s",
                                     [rabbit_misc:rs(Name), Pick]);
                Other ->
                    rabbit_log:warning("Could not transfer leadership of queue ~s to node ~s: ~p",
                                       [rabbit_misc:rs(Name), Pick, Other])
            end;
        undefined ->
            %% The queue name is not chardata, so it has to be rendered with
            %% rabbit_misc:rs/1 before it can be used with ~s, exactly as the
            %% other log calls above do.
            rabbit_log:warning("Could not transfer leadership of queue ~s: no suitable candidates?",
                               [rabbit_misc:rs(Name)])
    end.

%% Returns the nodes that primary replicas may be transferred to: all running
%% nodes except the local one, minus those that
%% filter_out_drained_nodes_consistent_read/1 filters out.
-spec primary_replica_transfer_candidate_nodes() -> [node()].
primary_replica_transfer_candidate_nodes() ->
    filter_out_drained_nodes_consistent_read(rabbit_nodes:all_running() -- [node()]).

%% Picks one element of Candidates uniformly at random.
%%
%% Returns {ok, Node} for a non-empty list and `undefined` for an empty one.
%% rand:uniform/1 yields an integer in 1..N, which is directly usable as a
%% 1-based index for lists:nth/2.
-spec random_primary_replica_transfer_candidate_node([node()]) -> {ok, node()} | undefined.
random_primary_replica_transfer_candidate_node([]) ->
    undefined;
random_primary_replica_transfer_candidate_node(Candidates) ->
    Nth = rand:uniform(length(Candidates)),
    {ok, lists:nth(Nth, Candidates)}.


%%
%% Implementation
%%
