diff options
| author | Michael Klishin <michael@clojurewerkz.org> | 2020-06-16 15:03:25 +0300 |
|---|---|---|
| committer | Michael Klishin <michael@clojurewerkz.org> | 2020-07-14 03:50:30 +0300 |
| commit | 4b76ccad878d32740777ee69c74e2dfd0f994831 (patch) | |
| tree | 35614328775ab766b8f1df05b6694ce358752769 | |
| parent | a30eb679c3e166156e85ebc8efa2fd5223b475c5 (diff) | |
| download | rabbitmq-server-git-4b76ccad878d32740777ee69c74e2dfd0f994831.tar.gz | |
Introduce rabbit_maintenance:{drain,revive}/0
Part of #2321.
| -rw-r--r-- | src/rabbit_maintenance.erl | 52 | ||||
| -rw-r--r-- | src/rabbit_networking.erl | 24 | ||||
| -rw-r--r-- | src/rabbit_queue_location_random.erl | 2 |
3 files changed, 65 insertions, 13 deletions
diff --git a/src/rabbit_maintenance.erl b/src/rabbit_maintenance.erl index 3361173e11..8f0a7e15f9 100644 --- a/src/rabbit_maintenance.erl +++ b/src/rabbit_maintenance.erl @@ -19,14 +19,18 @@ -include("rabbit.hrl"). -export([ + drain/0, + revive/0, mark_as_being_drained/0, unmark_as_being_drained/0, is_being_drained_local_read/1, is_being_drained_consistent_read/1, - filter_out_drained_nodes/1, + filter_out_drained_nodes_local_read/1, + filter_out_drained_nodes_consistent_read/1, suspend_all_client_listeners/0, resume_all_client_listeners/0, - close_all_client_connections/0]). + close_all_client_connections/0, + primary_replica_transfer_candidate_nodes/0]). -define(TABLE, rabbit_node_maintenance_states). -define(DEFAULT_STATUS, regular). @@ -36,6 +40,31 @@ %% API %% +drain() -> + rabbit_log:alert("This node is being put into maintenance (drain) mode"), + mark_as_being_drained(), + rabbit_log:info("Marked this node as undergoing maintenance"), + suspend_all_client_listeners(), + rabbit_log:alert("Suspended all listeners and will no longer accept client connections"), + {ok, NConnections} = close_all_client_connections(), + rabbit_log:alert("Closed ~b local client connections", [NConnections]), + + TransferCandidates = primary_replica_transfer_candidate_nodes(), + rabbit_log:info("Node will transfer primary replicas of its queues to ~b peers: ~s", + [length(TransferCandidates), string:join(TransferCandidates, ",")]), + %% TODO: transfer leadership of all queues hosted on this node + %% TODO: shut all Ra instances on this node down + + rabbit_log:alert("Node is ready to be shut down for maintenance or upgrade"), + + ok. + +revive() -> + resume_all_client_listeners(), + unmark_as_being_drained(), + + ok. + -spec mark_as_being_drained() -> boolean(). mark_as_being_drained() -> set_maintenance_state_status(?DRAINING_STATUS). @@ -86,10 +115,19 @@ is_being_drained_consistent_read(Node) -> {aborted, _Reason} -> false end. - -spec filter_out_drained_nodes([node()]) -> [node()]. -filter_out_drained_nodes(Nodes) -> + -spec filter_out_drained_nodes_local_read([node()]) -> [node()]. +filter_out_drained_nodes_local_read(Nodes) -> lists:filter(fun(N) -> not is_being_drained_local_read(N) end, Nodes). +-spec filter_out_drained_nodes_consistent_read([node()]) -> [node()]. +filter_out_drained_nodes_consistent_read(Nodes) -> + lists:filter(fun(N) -> not is_being_drained_consistent_read(N) end, Nodes). + +-spec primary_replica_transfer_candidate_nodes() -> [node()]. +primary_replica_transfer_candidate_nodes() -> + filter_out_drained_nodes_consistent_read(rabbit_nodes:all_running()). + + -spec suspend_all_client_listeners() -> rabbit_types:ok_or_error(any()). %% Pauses all listeners on the current node except for %% Erlang distribution (clustering and CLI tools). @@ -103,7 +141,6 @@ suspend_all_client_listeners() -> lists:foldl(fun ok_or_first_error/2, ok, Results). -spec resume_all_client_listeners() -> rabbit_types:ok_or_error(any()). - %% Resumes all listeners on the current node except for %% Erlang distribution (clustering and CLI tools). %% A resumed listener will accept new client connections. @@ -114,8 +151,11 @@ resume_all_client_listeners() -> Results = lists:foldl(local_listener_fold_fun(fun ranch:resume_listener/1), [], Listeners), lists:foldl(fun ok_or_first_error/2, ok, Results). + -spec close_all_client_connections() -> {'ok', non_neg_integer()}. close_all_client_connections() -> - ok. + Pids = rabbit_networking:local_connections(), + rabbit_networking:close_connections(Pids, "Node was put into maintenance mode"), + {ok, length(Pids)}. %% %% Implementation diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index f499e4bcfc..e492954a7e 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -27,8 +27,8 @@ connection_info/1, connection_info/2, connection_info_all/0, connection_info_all/1, emit_connection_info_all/4, emit_connection_info_local/3, - close_connection/2, force_connection_event_refresh/1, - handshake/2, tcp_host/1, ranch_ref/2]). + close_connection/2, close_connections/2, + force_connection_event_refresh/1, handshake/2, tcp_host/1, ranch_ref/2]). %% Used by TCP-based transports, e.g. STOMP adapter -export([tcp_listener_addresses/1, tcp_listener_spec/9, @@ -38,8 +38,11 @@ -deprecated([{force_connection_event_refresh, 1, eventually}]). -%% Internal --export([connections_local/0]). +-export([ + local_connections/0, + %% prefer local_connections/0 + connections_local/0 +]). -include("rabbit.hrl"). @@ -348,8 +351,13 @@ connections() -> rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:cluster_nodes(running), rabbit_networking, connections_local, []). --spec connections_local() -> [rabbit_types:connection()]. +-spec local_connections() -> [rabbit_types:connection()]. +%% @doc Returns pids of AMQP 0-9-1 and AMQP 1.0 connections local to this node. +local_connections() -> + connections_local(). +-spec connections_local() -> [rabbit_types:connection()]. +%% @deprecated Prefer {@link local_connections} connections_local() -> pg_local:get_members(rabbit_connections). -spec connection_info_keys() -> rabbit_types:info_keys(). @@ -399,8 +407,12 @@ close_connection(Pid, Explanation) -> ok end. --spec force_connection_event_refresh(reference()) -> 'ok'. +-spec close_connections([pid()], string()) -> 'ok'. +close_connections(Pids, Explanation) -> + [close_connection(Pid, Explanation) || Pid <- Pids], + ok. +-spec force_connection_event_refresh(reference()) -> 'ok'. force_connection_event_refresh(Ref) -> [rabbit_reader:force_event_refresh(C, Ref) || C <- connections()], ok. diff --git a/src/rabbit_queue_location_random.erl b/src/rabbit_queue_location_random.erl index 1c3ca7c3a6..7232fc6703 100644 --- a/src/rabbit_queue_location_random.erl +++ b/src/rabbit_queue_location_random.erl @@ -31,7 +31,7 @@ description() -> queue_master_location(Q) when ?is_amqqueue(Q) -> Cluster0 = rabbit_queue_master_location_misc:all_nodes(Q), - Cluster = rabbit_maintenance:filter_out_drained_nodes(Cluster0), + Cluster = rabbit_maintenance:filter_out_drained_nodes_local_read(Cluster0), case Cluster of [] -> undefined; |
