summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2020-06-24 21:57:04 +0300
committerMichael Klishin <michael@clojurewerkz.org>2020-07-14 03:50:31 +0300
commit017d41c7dec7685af7fa9c9648d5133b1431b1b6 (patch)
treeb730e7d9b6f50843d041dd351f4bac13baa43f62 /src
parente4af962641547d215e77c9dcb6e004ff4eb1d942 (diff)
downloadrabbitmq-server-git-017d41c7dec7685af7fa9c9648d5133b1431b1b6.tar.gz
Node entering maintenance will shut down its local quorum queue replicas
and restart them upon revival. Part of rabbitmq/rabbitmq-server#2321
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl7
-rw-r--r--src/rabbit_maintenance.erl59
2 files changed, 56 insertions, 10 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 60a0654340..cad08c9e19 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -910,10 +910,13 @@ do_run_postlaunch_phase() ->
Error -> throw(Error)
end
end, Plugins),
-
+
+ rabbit_log_prelaunch:info("Resetting node maintenance status"),
+ %% successful boot resets node maintenance state
+ rabbit_maintenance:unmark_as_being_drained(),
rabbit_log_prelaunch:debug("Marking ~s as running", [product_name()]),
rabbit_boot_state:set(ready),
-
+
ok = rabbit_lager:broker_is_started(),
ok = log_broker_started(
rabbit_plugins:strictly_plugins(rabbit_plugins:active())),
diff --git a/src/rabbit_maintenance.erl b/src/rabbit_maintenance.erl
index a863ce519b..817d65479d 100644
--- a/src/rabbit_maintenance.erl
+++ b/src/rabbit_maintenance.erl
@@ -53,11 +53,10 @@ drain() ->
rabbit_log:alert("Closed ~b local client connections", [NConnections]),
TransferCandidates = primary_replica_transfer_candidate_nodes(),
- ReadableCandidates = string:join(lists:map(fun rabbit_data_coercion:to_list/1, TransferCandidates), ","),
+ ReadableCandidates = readable_candidate_list(TransferCandidates),
rabbit_log:info("Node will transfer primary replicas of its queues to ~b peers: ~s",
[length(TransferCandidates), ReadableCandidates]),
transfer_leadership_of_classic_mirrored_queues(TransferCandidates),
- %% TODO: shut all Ra instances on this node down
transfer_leadership_of_quorum_queues(TransferCandidates),
rabbit_log:alert("Node is ready to be shut down for maintenance or upgrade"),
@@ -65,6 +64,8 @@ drain() ->
revive() ->
rabbit_log:alert("This node is being revived from maintenance (drain) mode"),
+ revive_local_quorum_queue_replicas(),
+ rabbit_log:alert("Resumed all listeners and will accept client connections again"),
resume_all_client_listeners(),
rabbit_log:alert("Resumed all listeners and will accept client connections again"),
unmark_as_being_drained(),
@@ -163,8 +164,27 @@ close_all_client_connections() ->
transfer_leadership_of_quorum_queues([]) ->
rabbit_log:warning("Skipping leadership transfer of quorum queues: no candidate "
"(online, not under maintenance) nodes to transfer to!");
-transfer_leadership_of_quorum_queues(TransferCandidates) ->
- TransferCandidates.
+transfer_leadership_of_quorum_queues(_TransferCandidates) ->
+ %% we only transfer leadership for QQs that have local leaders
+ Queues = rabbit_amqqueue:list_local_leaders(),
+ rabbit_log:info("Will transfer leadership of ~b quorum queues with current leader on this node",
+ [length(Queues)]),
+ [begin
+ Name = amqqueue:get_name(Q),
+ rabbit_log:debug("Will trigger a leader election for local quorum queue ~s",
+ [rabbit_misc:rs(Name)]),
+ %% we trigger an election and exclude this node from the list of candidates
+ %% by simply shutting its local QQ replica (Ra server)
+ RaLeader = amqqueue:get_pid(Q),
+ rabbit_log:debug("Will stop Ra server ~p", [RaLeader]),
+ case ra:stop_server(RaLeader) of
+ ok ->
+ rabbit_log:debug("Successfully stopped Ra server ~p", [RaLeader]);
+ {error, nodedown} ->
+ rabbit_log:error("Failed to stop Ra server ~p: target node was reported as down")
+ end
+ end || Q <- Queues],
+ rabbit_log:info("Leadership transfer for quorum queues hosted on this node has been initiated").
-spec transfer_leadership_of_classic_mirrored_queues([node()]) -> ok.
transfer_leadership_of_classic_mirrored_queues([]) ->
@@ -172,8 +192,8 @@ transfer_leadership_of_quorum_queues(TransferCandidates) ->
"(online, not under maintenance) nodes to transfer to!");
transfer_leadership_of_classic_mirrored_queues(TransferCandidates) ->
Queues = rabbit_amqqueue:list_local_mirrored_classic_queues(),
- ReadableCandidates = string:join(lists:map(fun rabbit_data_coercion:to_list/1, TransferCandidates), ", "),
- rabbit_log:info("Will transfer leadership of ~b classic mirrored queues to these nodes: ~s",
+ ReadableCandidates = readable_candidate_list(TransferCandidates),
+ rabbit_log:info("Will transfer leadership of ~b classic mirrored queues hosted on this node to these peer nodes: ~s",
[length(Queues), ReadableCandidates]),
[begin
@@ -197,7 +217,7 @@ transfer_leadership_of_classic_mirrored_queues(TransferCandidates) ->
end || Q <- Queues],
rabbit_log:info("Leadership transfer for local classic mirrored queues is complete").
--spec primary_replica_transfer_candidate_nodes() -> [node()].
+ -spec primary_replica_transfer_candidate_nodes() -> [node()].
primary_replica_transfer_candidate_nodes() ->
filter_out_drained_nodes_consistent_read(rabbit_nodes:all_running() -- [node()]).
@@ -208,7 +228,27 @@ random_primary_replica_transfer_candidate_node(Candidates) ->
Nth = erlang:phash2(erlang:monotonic_time(), length(Candidates)),
Candidate = lists:nth(Nth + 1, Candidates),
{ok, Candidate}.
-
+
+revive_local_quorum_queue_replicas() ->
+ Queues = rabbit_amqqueue:list_local_followers(),
+ [begin
+ Name = amqqueue:get_name(Q),
+ RaLeader = amqqueue:get_pid(Q),
+ rabbit_log:debug("Will trigger a leader election for local quorum queue ~s",
+ [rabbit_misc:rs(Name)]),
+ %% start local QQ replica (Ra server) of this queue
+ RaLeader = amqqueue:get_pid(Q),
+ rabbit_log:debug("Will start Ra server ~p", [RaLeader]),
+ case ra:restart_server(RaLeader) of
+ ok ->
+ rabbit_log:debug("Successfully restarted Ra server ~p", [RaLeader]);
+ {error, {already_started, _Pid}} ->
+ rabbit_log:debug("Ra server ~p is already running", [RaLeader]);
+ {error, nodedown} ->
+ rabbit_log:error("Failed to restart Ra server ~p: target node was reported as down")
+ end
+ end || Q <- Queues],
+ rabbit_log:info("Restart of local quorum queue replicas is complete").
%%
%% Implementation
@@ -226,3 +266,6 @@ ok_or_first_error(ok, Acc) ->
Acc;
ok_or_first_error({error, _} = Err, _Acc) ->
Err.
+
+readable_candidate_list(Nodes) ->
+ string:join(lists:map(fun rabbit_data_coercion:to_list/1, Nodes), ", ").