summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2020-06-16 15:03:25 +0300
committerMichael Klishin <michael@clojurewerkz.org>2020-07-14 03:50:30 +0300
commit4b76ccad878d32740777ee69c74e2dfd0f994831 (patch)
tree35614328775ab766b8f1df05b6694ce358752769
parenta30eb679c3e166156e85ebc8efa2fd5223b475c5 (diff)
downloadrabbitmq-server-git-4b76ccad878d32740777ee69c74e2dfd0f994831.tar.gz
Introduce rabbit_maintenance:{drain,revive}/0
Part of #2321.
-rw-r--r--src/rabbit_maintenance.erl52
-rw-r--r--src/rabbit_networking.erl24
-rw-r--r--src/rabbit_queue_location_random.erl2
3 files changed, 65 insertions, 13 deletions
diff --git a/src/rabbit_maintenance.erl b/src/rabbit_maintenance.erl
index 3361173e11..8f0a7e15f9 100644
--- a/src/rabbit_maintenance.erl
+++ b/src/rabbit_maintenance.erl
@@ -19,14 +19,18 @@
-include("rabbit.hrl").
-export([
+ drain/0,
+ revive/0,
mark_as_being_drained/0,
unmark_as_being_drained/0,
is_being_drained_local_read/1,
is_being_drained_consistent_read/1,
- filter_out_drained_nodes/1,
+ filter_out_drained_nodes_local_read/1,
+ filter_out_drained_nodes_consistent_read/1,
suspend_all_client_listeners/0,
resume_all_client_listeners/0,
- close_all_client_connections/0]).
+ close_all_client_connections/0,
+ primary_replica_transfer_candidate_nodes/0]).
-define(TABLE, rabbit_node_maintenance_states).
-define(DEFAULT_STATUS, regular).
@@ -36,6 +40,31 @@
%% API
%%
+%% Puts this node into maintenance (drain) mode. In order: marks the node
+%% as being drained, suspends all client listeners, closes the local client
+%% connections and logs which peers are candidates for taking over primary
+%% replicas. Returns ok once all steps have been performed.
+-spec drain() -> ok.
+drain() ->
+    rabbit_log:alert("This node is being put into maintenance (drain) mode"),
+    mark_as_being_drained(),
+    rabbit_log:info("Marked this node as undergoing maintenance"),
+    suspend_all_client_listeners(),
+    rabbit_log:alert("Suspended all listeners and will no longer accept client connections"),
+    {ok, NConnections} = close_all_client_connections(),
+    rabbit_log:alert("Closed ~b local client connections", [NConnections]),
+
+    TransferCandidates = primary_replica_transfer_candidate_nodes(),
+    %% Node names are atoms, while string:join/2 only accepts strings.
+    %% Convert them first: joining the atoms directly fails with badarg
+    %% as soon as there is at least one candidate node.
+    CandidateNames = [atom_to_list(Node) || Node <- TransferCandidates],
+    rabbit_log:info("Node will transfer primary replicas of its queues to ~b peers: ~s",
+                    [length(TransferCandidates), string:join(CandidateNames, ",")]),
+    %% TODO: transfer leadership of all queues hosted on this node
+    %% TODO: shut all Ra instances on this node down
+
+    rabbit_log:alert("Node is ready to be shut down for maintenance or upgrade"),
+
+    ok.
+
+%% Counterpart of drain/0: resumes the client listeners first and then
+%% clears this node's "being drained" mark. The results of both steps are
+%% deliberately discarded; the function always answers ok.
+revive() ->
+    _ = resume_all_client_listeners(),
+    _ = unmark_as_being_drained(),
+
+    ok.
+
-spec mark_as_being_drained() -> boolean().
mark_as_being_drained() ->
set_maintenance_state_status(?DRAINING_STATUS).
@@ -86,10 +115,19 @@ is_being_drained_consistent_read(Node) ->
{aborted, _Reason} -> false
end.
- -spec filter_out_drained_nodes([node()]) -> [node()].
-filter_out_drained_nodes(Nodes) ->
+ -spec filter_out_drained_nodes_local_read([node()]) -> [node()].
+filter_out_drained_nodes_local_read(Nodes) ->
lists:filter(fun(N) -> not is_being_drained_local_read(N) end, Nodes).
+-spec filter_out_drained_nodes_consistent_read([node()]) -> [node()].
+%% Keeps, in their original order, only those nodes for which
+%% is_being_drained_consistent_read/1 answers false.
+filter_out_drained_nodes_consistent_read(Nodes) ->
+    [Node || Node <- Nodes, not is_being_drained_consistent_read(Node)].
+
+-spec primary_replica_transfer_candidate_nodes() -> [node()].
+%% Nodes that primary replicas could be moved to: every node reported by
+%% rabbit_nodes:all_running/0 that is not itself being drained, as
+%% determined with a consistent read.
+primary_replica_transfer_candidate_nodes() ->
+    RunningNodes = rabbit_nodes:all_running(),
+    filter_out_drained_nodes_consistent_read(RunningNodes).
+
+
-spec suspend_all_client_listeners() -> rabbit_types:ok_or_error(any()).
%% Pauses all listeners on the current node except for
%% Erlang distribution (clustering and CLI tools).
@@ -103,7 +141,6 @@ suspend_all_client_listeners() ->
lists:foldl(fun ok_or_first_error/2, ok, Results).
-spec resume_all_client_listeners() -> rabbit_types:ok_or_error(any()).
-
%% Resumes all listeners on the current node except for
%% Erlang distribution (clustering and CLI tools).
%% A resumed listener will accept new client connections.
@@ -114,8 +151,11 @@ resume_all_client_listeners() ->
Results = lists:foldl(local_listener_fold_fun(fun ranch:resume_listener/1), [], Listeners),
lists:foldl(fun ok_or_first_error/2, ok, Results).
+ -spec close_all_client_connections() -> {'ok', non_neg_integer()}.
+%% Asks rabbit_networking to close every connection local to this node and
+%% reports how many connections that request covered.
close_all_client_connections() ->
- ok.
+    LocalConnections = rabbit_networking:local_connections(),
+    Explanation = "Node was put into maintenance mode",
+    rabbit_networking:close_connections(LocalConnections, Explanation),
+    {ok, length(LocalConnections)}.
%%
%% Implementation
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index f499e4bcfc..e492954a7e 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -27,8 +27,8 @@
connection_info/1, connection_info/2,
connection_info_all/0, connection_info_all/1,
emit_connection_info_all/4, emit_connection_info_local/3,
- close_connection/2, force_connection_event_refresh/1,
- handshake/2, tcp_host/1, ranch_ref/2]).
+ close_connection/2, close_connections/2,
+ force_connection_event_refresh/1, handshake/2, tcp_host/1, ranch_ref/2]).
%% Used by TCP-based transports, e.g. STOMP adapter
-export([tcp_listener_addresses/1, tcp_listener_spec/9,
@@ -38,8 +38,11 @@
-deprecated([{force_connection_event_refresh, 1, eventually}]).
-%% Internal
--export([connections_local/0]).
+-export([
+ local_connections/0,
+ %% prefer local_connections/0
+ connections_local/0
+]).
-include("rabbit.hrl").
@@ -348,8 +351,13 @@ connections() ->
rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:cluster_nodes(running),
rabbit_networking, connections_local, []).
--spec connections_local() -> [rabbit_types:connection()].
+-spec local_connections() -> [rabbit_types:connection()].
+%% @doc Lists the AMQP 0-9-1 and AMQP 1.0 connections (as pids) that are
+%% local to this node. Delegates to {@link connections_local/0}, which is
+%% kept for backwards compatibility.
+local_connections() -> connections_local().
+-spec connections_local() -> [rabbit_types:connection()].
+%% @deprecated Prefer {@link local_connections}
connections_local() -> pg_local:get_members(rabbit_connections).
-spec connection_info_keys() -> rabbit_types:info_keys().
@@ -399,8 +407,12 @@ close_connection(Pid, Explanation) ->
ok
end.
--spec force_connection_event_refresh(reference()) -> 'ok'.
+-spec close_connections([pid()], string()) -> 'ok'.
+%% @doc Closes each connection in Pids, in list order, by calling
+%% close_connection/2 with the same Explanation. Individual results are
+%% ignored and the function always returns ok.
+close_connections(Pids, Explanation) ->
+    lists:foreach(fun(Pid) -> close_connection(Pid, Explanation) end, Pids).
+-spec force_connection_event_refresh(reference()) -> 'ok'.
force_connection_event_refresh(Ref) ->
[rabbit_reader:force_event_refresh(C, Ref) || C <- connections()],
ok.
diff --git a/src/rabbit_queue_location_random.erl b/src/rabbit_queue_location_random.erl
index 1c3ca7c3a6..7232fc6703 100644
--- a/src/rabbit_queue_location_random.erl
+++ b/src/rabbit_queue_location_random.erl
@@ -31,7 +31,7 @@ description() ->
queue_master_location(Q) when ?is_amqqueue(Q) ->
Cluster0 = rabbit_queue_master_location_misc:all_nodes(Q),
- Cluster = rabbit_maintenance:filter_out_drained_nodes(Cluster0),
+ Cluster = rabbit_maintenance:filter_out_drained_nodes_local_read(Cluster0),
case Cluster of
[] ->
undefined;