diff options
| author | Michael Klishin <michael@clojurewerkz.org> | 2020-06-24 21:57:04 +0300 |
|---|---|---|
| committer | Michael Klishin <michael@clojurewerkz.org> | 2020-07-14 03:50:31 +0300 |
| commit | 017d41c7dec7685af7fa9c9648d5133b1431b1b6 (patch) | |
| tree | b730e7d9b6f50843d041dd351f4bac13baa43f62 /src | |
| parent | e4af962641547d215e77c9dcb6e004ff4eb1d942 (diff) | |
| download | rabbitmq-server-git-017d41c7dec7685af7fa9c9648d5133b1431b1b6.tar.gz | |
Node entering maintenance will shut down its local quorum queue replicas
and restart them upon revival.
Part of rabbitmq/rabbitmq-server#2321
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_maintenance.erl | 59 |
2 files changed, 56 insertions, 10 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 60a0654340..cad08c9e19 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -910,10 +910,13 @@ do_run_postlaunch_phase() -> Error -> throw(Error) end end, Plugins), - + + rabbit_log_prelaunch:info("Resetting node maintenance status"), + %% successful boot resets node maintenance state + rabbit_maintenance:unmark_as_being_drained(), rabbit_log_prelaunch:debug("Marking ~s as running", [product_name()]), rabbit_boot_state:set(ready), - + ok = rabbit_lager:broker_is_started(), ok = log_broker_started( rabbit_plugins:strictly_plugins(rabbit_plugins:active())), diff --git a/src/rabbit_maintenance.erl b/src/rabbit_maintenance.erl index a863ce519b..817d65479d 100644 --- a/src/rabbit_maintenance.erl +++ b/src/rabbit_maintenance.erl @@ -53,11 +53,10 @@ drain() -> rabbit_log:alert("Closed ~b local client connections", [NConnections]), TransferCandidates = primary_replica_transfer_candidate_nodes(), - ReadableCandidates = string:join(lists:map(fun rabbit_data_coercion:to_list/1, TransferCandidates), ","), + ReadableCandidates = readable_candidate_list(TransferCandidates), rabbit_log:info("Node will transfer primary replicas of its queues to ~b peers: ~s", [length(TransferCandidates), ReadableCandidates]), transfer_leadership_of_classic_mirrored_queues(TransferCandidates), - %% TODO: shut all Ra instances on this node down transfer_leadership_of_quorum_queues(TransferCandidates), rabbit_log:alert("Node is ready to be shut down for maintenance or upgrade"), @@ -65,6 +64,8 @@ drain() -> revive() -> rabbit_log:alert("This node is being revived from maintenance (drain) mode"), + revive_local_quorum_queue_replicas(), + rabbit_log:alert("Resumed all listeners and will accept client connections again"), resume_all_client_listeners(), rabbit_log:alert("Resumed all listeners and will accept client connections again"), unmark_as_being_drained(), @@ -163,8 +164,27 @@ close_all_client_connections() -> transfer_leadership_of_quorum_queues([]) -> rabbit_log:warning("Skipping leadership transfer of quorum queues: no candidate " "(online, not under maintenance) nodes to transfer to!"); -transfer_leadership_of_quorum_queues(TransferCandidates) -> - TransferCandidates. +transfer_leadership_of_quorum_queues(_TransferCandidates) -> + %% we only transfer leadership for QQs that have local leaders + Queues = rabbit_amqqueue:list_local_leaders(), + rabbit_log:info("Will transfer leadership of ~b quorum queues with current leader on this node", + [length(Queues)]), + [begin + Name = amqqueue:get_name(Q), + rabbit_log:debug("Will trigger a leader election for local quorum queue ~s", + [rabbit_misc:rs(Name)]), + %% we trigger an election and exclude this node from the list of candidates + %% by simply shutting its local QQ replica (Ra server) + RaLeader = amqqueue:get_pid(Q), + rabbit_log:debug("Will stop Ra server ~p", [RaLeader]), + case ra:stop_server(RaLeader) of + ok -> + rabbit_log:debug("Successfully stopped Ra server ~p", [RaLeader]); + {error, nodedown} -> + rabbit_log:error("Failed to stop Ra server ~p: target node was reported as down") + end + end || Q <- Queues], + rabbit_log:info("Leadership transfer for quorum queues hosted on this node has been initiated"). -spec transfer_leadership_of_classic_mirrored_queues([node()]) -> ok. transfer_leadership_of_classic_mirrored_queues([]) -> @@ -172,8 +192,8 @@ transfer_leadership_of_quorum_queues(TransferCandidates) -> "(online, not under maintenance) nodes to transfer to!"); transfer_leadership_of_classic_mirrored_queues(TransferCandidates) -> Queues = rabbit_amqqueue:list_local_mirrored_classic_queues(), - ReadableCandidates = string:join(lists:map(fun rabbit_data_coercion:to_list/1, TransferCandidates), ", "), - rabbit_log:info("Will transfer leadership of ~b classic mirrored queues to these nodes: ~s", + ReadableCandidates = readable_candidate_list(TransferCandidates), + rabbit_log:info("Will transfer leadership of ~b classic mirrored queues hosted on this node to these peer nodes: ~s", [length(Queues), ReadableCandidates]), [begin @@ -197,7 +217,7 @@ transfer_leadership_of_classic_mirrored_queues(TransferCandidates) -> end || Q <- Queues], rabbit_log:info("Leadership transfer for local classic mirrored queues is complete"). --spec primary_replica_transfer_candidate_nodes() -> [node()]. + -spec primary_replica_transfer_candidate_nodes() -> [node()]. primary_replica_transfer_candidate_nodes() -> filter_out_drained_nodes_consistent_read(rabbit_nodes:all_running() -- [node()]). @@ -208,7 +228,27 @@ random_primary_replica_transfer_candidate_node(Candidates) -> Nth = erlang:phash2(erlang:monotonic_time(), length(Candidates)), Candidate = lists:nth(Nth + 1, Candidates), {ok, Candidate}. - + +revive_local_quorum_queue_replicas() -> + Queues = rabbit_amqqueue:list_local_followers(), + [begin + Name = amqqueue:get_name(Q), + RaLeader = amqqueue:get_pid(Q), + rabbit_log:debug("Will trigger a leader election for local quorum queue ~s", + [rabbit_misc:rs(Name)]), + %% start local QQ replica (Ra server) of this queue + RaLeader = amqqueue:get_pid(Q), + rabbit_log:debug("Will start Ra server ~p", [RaLeader]), + case ra:restart_server(RaLeader) of + ok -> + rabbit_log:debug("Successfully restarted Ra server ~p", [RaLeader]); + {error, {already_started, _Pid}} -> + rabbit_log:debug("Ra server ~p is already running", [RaLeader]); + {error, nodedown} -> + rabbit_log:error("Failed to restart Ra server ~p: target node was reported as down") + end + end || Q <- Queues], + rabbit_log:info("Restart of local quorum queue replicas is complete"). %% %% Implementation @@ -226,3 +266,6 @@ ok_or_first_error(ok, Acc) -> Acc; ok_or_first_error({error, _} = Err, _Acc) -> Err. + +readable_candidate_list(Nodes) -> + string:join(lists:map(fun rabbit_data_coercion:to_list/1, Nodes), ", "). |
