diff options
| author | D Corbacho <diana@rabbitmq.com> | 2020-07-14 15:49:41 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-07-14 15:49:41 +0100 |
| commit | 56ab7a16822a1f00a0ab452ddc2513425100b6da (patch) | |
| tree | 1c778ecdb94cbfa094fb1dbbb37889588c58fafd | |
| parent | 9c9301917b84f6ad6b3b844ff76523c7b3bd1903 (diff) | |
| parent | ac714634bd300d306c1b460addc5bf3c978e91a1 (diff) | |
| download | rabbitmq-server-git-56ab7a16822a1f00a0ab452ddc2513425100b6da.tar.gz | |
Merge pull request #2349 from rabbitmq/rabbitmq-server-2321
Node drain (maintenance mode) operations
| -rw-r--r-- | apps/rabbitmq_prelaunch/src/rabbit_prelaunch_errors.erl | 6 | ||||
| -rw-r--r-- | src/rabbit.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_core_ff.erl | 28 | ||||
| -rw-r--r-- | src/rabbit_maintenance.erl | 330 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_networking.erl | 104 | ||||
| -rw-r--r-- | src/rabbit_queue_location_client_local.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_queue_location_min_masters.erl | 34 | ||||
| -rw-r--r-- | src/rabbit_queue_location_random.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_queue_master_location_misc.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_table.erl | 78 | ||||
| -rw-r--r-- | src/rabbit_upgrade_functions.erl | 4 | ||||
| -rw-r--r-- | src/tcp_listener_sup.erl | 24 | ||||
| -rw-r--r-- | test/maintenance_mode_SUITE.erl | 227 | ||||
| -rw-r--r-- | test/queue_master_location_SUITE.erl | 91 |
16 files changed, 887 insertions, 84 deletions
diff --git a/apps/rabbitmq_prelaunch/src/rabbit_prelaunch_errors.erl b/apps/rabbitmq_prelaunch/src/rabbit_prelaunch_errors.erl index 2a2eb2b7fd..f04f252784 100644 --- a/apps/rabbitmq_prelaunch/src/rabbit_prelaunch_errors.erl +++ b/apps/rabbitmq_prelaunch/src/rabbit_prelaunch_errors.erl @@ -19,7 +19,7 @@ log_error(Error) -> format_error({error, {duplicate_node_name, NodeName, NodeHost}}) -> rabbit_misc:format( - "ERROR: node with name ~p already running on ~p", + "ERROR: node with name ~p is already running on host ~p", [NodeName, NodeHost]); format_error({error, {epmd_error, NodeHost, EpmdReason}}) -> rabbit_misc:format( @@ -30,11 +30,11 @@ format_error({error, {invalid_dist_port_range, DistTcpPort}}) -> "Invalid Erlang distribution TCP port: ~b", [DistTcpPort]); format_error({error, {dist_port_already_used, Port, not_erlang, Host}}) -> rabbit_misc:format( - "ERROR: distribution port ~b in use on ~s " + "ERROR: distribution port ~b in use on host ~s " "(by non-Erlang process?)", [Port, Host]); format_error({error, {dist_port_already_used, Port, Name, Host}}) -> rabbit_misc:format( - "ERROR: distribution port ~b in use by ~s@~s", [Port, Name, Host]); + "ERROR: distribution port ~b in use by another node: ~s@~s", [Port, Name, Host]); format_error({error, {erlang_dist_running_with_unexpected_nodename, Unexpected, Node}}) -> rabbit_misc:format( diff --git a/src/rabbit.erl b/src/rabbit.erl index 60a0654340..8298dfe79d 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -654,6 +654,7 @@ status() -> {erlang_version, erlang:system_info(system_version)}, {memory, rabbit_vm:memory()}, {alarms, alarms()}, + {is_under_maintenance, rabbit_maintenance:is_being_drained_local_read(node())}, {listeners, listeners()}, {vm_memory_calculation_strategy, vm_memory_monitor:get_memory_calculation_strategy()}], S2 = rabbit_misc:filter_exit_map( @@ -911,6 +912,9 @@ do_run_postlaunch_phase() -> end end, Plugins), + rabbit_log_prelaunch:info("Resetting node maintenance status"), + %% 
successful boot resets node maintenance state + rabbit_maintenance:unmark_as_being_drained(), rabbit_log_prelaunch:debug("Marking ~s as running", [product_name()]), rabbit_boot_state:set(ready), diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 8855c8bbf7..9818e689de 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -23,7 +23,6 @@ emit_info_local/4, emit_info_down/4]). -export([count/0]). -export([list_down/1, count/1, list_names/0, list_names/1, list_local_names/0, - list_local_mirrored_classic_names/0, list_local_names_down/0, list_with_possible_retry/1]). -export([list_by_type/1, sample_local_queues/0, sample_n_by_name/2, sample_n/2]). -export([force_event_refresh/1, notify_policy_changed/1]). @@ -38,6 +37,7 @@ -export([has_synchronised_mirrors_online/1]). -export([is_replicated/1, is_exclusive/1, is_not_exclusive/1, is_dead_exclusive/1]). -export([list_local_quorum_queues/0, list_local_quorum_queue_names/0, + list_local_mirrored_classic_queues/0, list_local_mirrored_classic_names/0, list_local_leaders/0, list_local_followers/0, get_quorum_nodes/1, list_local_mirrored_classic_without_synchronised_mirrors/0, list_local_mirrored_classic_without_synchronised_mirrors_for_cli/0]). @@ -313,6 +313,7 @@ declare_classic_queue(Q, Node) -> _ -> case rabbit_queue_master_location_misc:get_location(Q) of {ok, Node0} -> Node0; + undefined -> Node; {error, _} -> Node end end, @@ -1071,6 +1072,14 @@ list_local_followers() -> amqqueue:get_state(Q) =/= crashed, amqqueue:get_leader(Q) =/= node(), lists:member(node(), get_quorum_nodes(Q))]. +-spec list_local_mirrored_classic_queues() -> [amqqueue:amqqueue()]. +list_local_mirrored_classic_queues() -> + [ Q || Q <- list(), + amqqueue:get_state(Q) =/= crashed, + amqqueue:is_classic(Q), + is_local_to_node(amqqueue:get_pid(Q), node()), + is_replicated(Q)]. + -spec list_local_mirrored_classic_names() -> [rabbit_amqqueue:name()]. 
list_local_mirrored_classic_names() -> [ amqqueue:get_name(Q) || Q <- list(), diff --git a/src/rabbit_core_ff.erl b/src/rabbit_core_ff.erl index e4c01339a9..f97170f6ac 100644 --- a/src/rabbit_core_ff.erl +++ b/src/rabbit_core_ff.erl @@ -9,7 +9,8 @@ -export([quorum_queue_migration/3, implicit_default_bindings_migration/3, - virtual_host_metadata_migration/3]). + virtual_host_metadata_migration/3, + maintenance_mode_status_migration/3]). -rabbit_feature_flag( {quorum_queue, @@ -34,6 +35,14 @@ migration_fun => {?MODULE, virtual_host_metadata_migration} }}). +-rabbit_feature_flag( + {maintenance_mode_status, + #{ + desc => "Maintenance mode status", + stability => stable, + migration_fun => {?MODULE, maintenance_mode_status_migration} + }}). + %% ------------------------------------------------------------------- %% Quorum queues. %% ------------------------------------------------------------------- @@ -112,3 +121,20 @@ virtual_host_metadata_migration(_FeatureName, _FeatureProps, enable) -> end; virtual_host_metadata_migration(_FeatureName, _FeatureProps, is_enabled) -> mnesia:table_info(rabbit_vhost, attributes) =:= vhost:fields(vhost_v2). + + +%% +%% Maintenance mode +%% + +maintenance_mode_status_migration(FeatureName, _FeatureProps, enable) -> + TableName = rabbit_maintenance:status_table_name(), + rabbit_log:info("Creating table ~s for feature flag `~s`", [TableName, FeatureName]), + try + _ = rabbit_table:create(TableName, rabbit_maintenance:status_table_definition()), + _ = rabbit_table:ensure_table_copy(TableName, node()) + catch throw:Reason -> + rabbit_log:error("Failed to create maintenance status table: ~p", [Reason]) + end; +maintenance_mode_status_migration(_FeatureName, _FeatureProps, is_enabled) -> + rabbit_table:exists(rabbit_maintenance:status_table_name()). 
diff --git a/src/rabbit_maintenance.erl b/src/rabbit_maintenance.erl new file mode 100644 index 0000000000..7c8b39ad84 --- /dev/null +++ b/src/rabbit_maintenance.erl @@ -0,0 +1,330 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2018-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_maintenance). + +-include("rabbit.hrl"). + +-export([ + is_enabled/0, + drain/0, + revive/0, + mark_as_being_drained/0, + unmark_as_being_drained/0, + is_being_drained_local_read/1, + is_being_drained_consistent_read/1, + status_local_read/1, + status_consistent_read/1, + filter_out_drained_nodes_local_read/1, + filter_out_drained_nodes_consistent_read/1, + suspend_all_client_listeners/0, + resume_all_client_listeners/0, + close_all_client_connections/0, + primary_replica_transfer_candidate_nodes/0, + random_primary_replica_transfer_candidate_node/1, + transfer_leadership_of_quorum_queues/1, + transfer_leadership_of_classic_mirrored_queues/1, + status_table_name/0, + status_table_definition/0 +]). + +-define(TABLE, rabbit_node_maintenance_states). +-define(FEATURE_FLAG, maintenance_mode_status). +-define(DEFAULT_STATUS, regular). +-define(DRAINING_STATUS, draining). + +-type maintenance_status() :: ?DEFAULT_STATUS | ?DRAINING_STATUS. + +-export_type([ + maintenance_status/0 +]). + +%% +%% API +%% + +-spec status_table_name() -> mnesia:table(). +status_table_name() -> + ?TABLE. + +-spec status_table_definition() -> list(). +status_table_definition() -> + maps:to_list(#{ + record_name => node_maintenance_state, + attributes => record_info(fields, node_maintenance_state) + }). + +-spec is_enabled() -> boolean(). +is_enabled() -> + rabbit_feature_flags:is_enabled(?FEATURE_FLAG). + +-spec drain() -> ok. 
+drain() -> + case is_enabled() of + true -> do_drain(); + false -> rabbit_log:warning("Feature flag `~s` is not enabled, draining is a no-op", [?FEATURE_FLAG]) + end. + +-spec do_drain() -> ok. +do_drain() -> + rabbit_log:alert("This node is being put into maintenance (drain) mode"), + mark_as_being_drained(), + rabbit_log:info("Marked this node as undergoing maintenance"), + suspend_all_client_listeners(), + rabbit_log:alert("Suspended all listeners and will no longer accept client connections"), + {ok, NConnections} = close_all_client_connections(), + %% allow plugins to react e.g. by closing their protocol connections + rabbit_event:notify(maintenance_connections_closed, #{ + reason => <<"node is being put into maintenance">> + }), + rabbit_log:alert("Closed ~b local client connections", [NConnections]), + + TransferCandidates = primary_replica_transfer_candidate_nodes(), + ReadableCandidates = readable_candidate_list(TransferCandidates), + rabbit_log:info("Node will transfer primary replicas of its queues to ~b peers: ~s", + [length(TransferCandidates), ReadableCandidates]), + transfer_leadership_of_classic_mirrored_queues(TransferCandidates), + transfer_leadership_of_quorum_queues(TransferCandidates), + + %% allow plugins to react + rabbit_event:notify(maintenance_draining, #{ + reason => <<"node is being put into maintenance">> + }), + rabbit_log:alert("Node is ready to be shut down for maintenance or upgrade"), + + ok. + +-spec revive() -> ok. +revive() -> + case is_enabled() of + true -> do_revive(); + false -> rabbit_log:warning("Feature flag `~s` is not enabled, reviving is a no-op", [?FEATURE_FLAG]) + end. + +-spec do_revive() -> ok. 
+do_revive() ->
+    rabbit_log:alert("This node is being revived from maintenance (drain) mode"),
+    revive_local_quorum_queue_replicas(),
+    resume_all_client_listeners(),
+    rabbit_log:alert("Resumed all listeners and will accept client connections again"),
+    unmark_as_being_drained(),
+    rabbit_log:info("Marked this node as back from maintenance and ready to serve clients"),
+
+    %% allow plugins to react
+    rabbit_event:notify(maintenance_revived, #{}),
+
+    ok.
+
+-spec mark_as_being_drained() -> boolean().
+mark_as_being_drained() ->
+    rabbit_log:debug("Marking the node as undergoing maintenance"),
+    set_maintenance_status_status(?DRAINING_STATUS).
+
+-spec unmark_as_being_drained() -> boolean().
+unmark_as_being_drained() ->
+    rabbit_log:debug("Unmarking the node as undergoing maintenance"),
+    set_maintenance_status_status(?DEFAULT_STATUS).
+
+set_maintenance_status_status(Status) ->
+    Res = mnesia:transaction(fun () ->
+        case mnesia:wread({?TABLE, node()}) of
+            [] ->
+                Row = #node_maintenance_state{
+                    node = node(),
+                    status = Status
+                },
+                mnesia:write(?TABLE, Row, write);
+            [Row0] ->
+                Row = Row0#node_maintenance_state{
+                    node = node(),
+                    status = Status
+                },
+                mnesia:write(?TABLE, Row, write)
+        end
+    end),
+    case Res of
+        {atomic, ok} -> true;
+        _ -> false
+    end.
+
+
+-spec is_being_drained_local_read(node()) -> boolean().
+is_being_drained_local_read(Node) ->
+    Status = status_local_read(Node),
+    Status =:= ?DRAINING_STATUS.
+
+-spec is_being_drained_consistent_read(node()) -> boolean().
+is_being_drained_consistent_read(Node) ->
+    Status = status_consistent_read(Node),
+    Status =:= ?DRAINING_STATUS.
+
+-spec status_local_read(node()) -> maintenance_status().
+status_local_read(Node) ->
+    case mnesia:dirty_read(?TABLE, Node) of
+        [] -> ?DEFAULT_STATUS;
+        [#node_maintenance_state{node = Node, status = Status}] ->
+            Status;
+        _ -> ?DEFAULT_STATUS
+    end. 
+
+-spec status_consistent_read(node()) -> maintenance_status().
+status_consistent_read(Node) ->
+    case mnesia:transaction(fun() -> mnesia:read(?TABLE, Node) end) of
+        {atomic, []} -> ?DEFAULT_STATUS;
+        {atomic, [#node_maintenance_state{node = Node, status = Status}]} ->
+            Status;
+        {atomic, _} -> ?DEFAULT_STATUS;
+        {aborted, _Reason} -> ?DEFAULT_STATUS
+    end.
+
+ -spec filter_out_drained_nodes_local_read([node()]) -> [node()].
+filter_out_drained_nodes_local_read(Nodes) ->
+    lists:filter(fun(N) -> not is_being_drained_local_read(N) end, Nodes).
+
+-spec filter_out_drained_nodes_consistent_read([node()]) -> [node()].
+filter_out_drained_nodes_consistent_read(Nodes) ->
+    lists:filter(fun(N) -> not is_being_drained_consistent_read(N) end, Nodes).
+
+-spec suspend_all_client_listeners() -> rabbit_types:ok_or_error(any()).
+ %% Pauses all listeners on the current node except for
+ %% Erlang distribution (clustering and CLI tools).
+ %% A suspended listener will not accept any new client connections
+ %% but previously established connections won't be interrupted.
+suspend_all_client_listeners() ->
+    Listeners = rabbit_networking:node_client_listeners(node()),
+    rabbit_log:info("Asked to suspend ~b client connection listeners. "
+                    "No new client connections will be accepted until these listeners are resumed!", [length(Listeners)]),
+    Results = lists:foldl(local_listener_fold_fun(fun ranch:suspend_listener/1), [], Listeners),
+    lists:foldl(fun ok_or_first_error/2, ok, Results).
+
+ -spec resume_all_client_listeners() -> rabbit_types:ok_or_error(any()).
+ %% Resumes all listeners on the current node except for
+ %% Erlang distribution (clustering and CLI tools).
+ %% A resumed listener will accept new client connections.
+resume_all_client_listeners() ->
+    Listeners = rabbit_networking:node_client_listeners(node()),
+    rabbit_log:info("Asked to resume ~b client connection listeners. 
"
+                    "New client connections will be accepted from now on", [length(Listeners)]),
+    Results = lists:foldl(local_listener_fold_fun(fun ranch:resume_listener/1), [], Listeners),
+    lists:foldl(fun ok_or_first_error/2, ok, Results).
+
+ -spec close_all_client_connections() -> {'ok', non_neg_integer()}.
+close_all_client_connections() ->
+    Pids = rabbit_networking:local_connections(),
+    rabbit_networking:close_connections(Pids, "Node was put into maintenance mode"),
+    {ok, length(Pids)}.
+
+-spec transfer_leadership_of_quorum_queues([node()]) -> ok.
+transfer_leadership_of_quorum_queues([]) ->
+    rabbit_log:warning("Skipping leadership transfer of quorum queues: no candidate "
+                       "(online, not under maintenance) nodes to transfer to!");
+transfer_leadership_of_quorum_queues(_TransferCandidates) ->
+    %% we only transfer leadership for QQs that have local leaders
+    Queues = rabbit_amqqueue:list_local_leaders(),
+    rabbit_log:info("Will transfer leadership of ~b quorum queues with current leader on this node",
+                    [length(Queues)]),
+    [begin
+        Name = amqqueue:get_name(Q),
+        rabbit_log:debug("Will trigger a leader election for local quorum queue ~s",
+                         [rabbit_misc:rs(Name)]),
+        %% we trigger an election and exclude this node from the list of candidates
+        %% by simply shutting its local QQ replica (Ra server)
+        RaLeader = amqqueue:get_pid(Q),
+        rabbit_log:debug("Will stop Ra server ~p", [RaLeader]),
+        case ra:stop_server(RaLeader) of
+            ok ->
+                rabbit_log:debug("Successfully stopped Ra server ~p", [RaLeader]);
+            {error, nodedown} ->
+                rabbit_log:error("Failed to stop Ra server ~p: target node was reported as down", [RaLeader])
+        end
+    end || Q <- Queues],
+    rabbit_log:info("Leadership transfer for quorum queues hosted on this node has been initiated").
+
+-spec transfer_leadership_of_classic_mirrored_queues([node()]) -> ok. 
+ transfer_leadership_of_classic_mirrored_queues([]) -> + rabbit_log:warning("Skipping leadership transfer of classic mirrored queues: no candidate " + "(online, not under maintenance) nodes to transfer to!"); +transfer_leadership_of_classic_mirrored_queues(TransferCandidates) -> + Queues = rabbit_amqqueue:list_local_mirrored_classic_queues(), + ReadableCandidates = readable_candidate_list(TransferCandidates), + rabbit_log:info("Will transfer leadership of ~b classic mirrored queues hosted on this node to these peer nodes: ~s", + [length(Queues), ReadableCandidates]), + + [begin + Name = amqqueue:get_name(Q), + case random_primary_replica_transfer_candidate_node(TransferCandidates) of + {ok, Pick} -> + rabbit_log:debug("Will transfer leadership of local queue ~s to node ~s", + [rabbit_misc:rs(Name), Pick]), + case rabbit_mirror_queue_misc:transfer_leadership(Q, Pick) of + {migrated, _} -> + rabbit_log:debug("Successfully transferred leadership of queue ~s to node ~s", + [rabbit_misc:rs(Name), Pick]); + Other -> + rabbit_log:warning("Could not transfer leadership of queue ~s to node ~s: ~p", + [rabbit_misc:rs(Name), Pick, Other]) + end; + undefined -> + rabbit_log:warning("Could not transfer leadership of queue ~s: no suitable candidates?", + [Name]) + end + end || Q <- Queues], + rabbit_log:info("Leadership transfer for local classic mirrored queues is complete"). + + -spec primary_replica_transfer_candidate_nodes() -> [node()]. +primary_replica_transfer_candidate_nodes() -> + filter_out_drained_nodes_consistent_read(rabbit_nodes:all_running() -- [node()]). + +-spec random_primary_replica_transfer_candidate_node([node()]) -> {ok, node()} | undefined. +random_primary_replica_transfer_candidate_node([]) -> + undefined; +random_primary_replica_transfer_candidate_node(Candidates) -> + Nth = erlang:phash2(erlang:monotonic_time(), length(Candidates)), + Candidate = lists:nth(Nth + 1, Candidates), + {ok, Candidate}. 
+
+revive_local_quorum_queue_replicas() ->
+    Queues = rabbit_amqqueue:list_local_followers(),
+    [begin
+        Name = amqqueue:get_name(Q),
+        rabbit_log:debug("Will trigger a leader election for local quorum queue ~s",
+                         [rabbit_misc:rs(Name)]),
+        %% start local QQ replica (Ra server) of this queue
+        {Prefix, _Node} = amqqueue:get_pid(Q),
+        RaServer = {Prefix, node()},
+        rabbit_log:debug("Will start Ra server ~p", [RaServer]),
+        case ra:restart_server(RaServer) of
+            ok ->
+                rabbit_log:debug("Successfully restarted Ra server ~p", [RaServer]);
+            {error, {already_started, _Pid}} ->
+                rabbit_log:debug("Ra server ~p is already running", [RaServer]);
+            {error, nodedown} ->
+                rabbit_log:error("Failed to restart Ra server ~p: target node was reported as down", [RaServer])
+        end
+    end || Q <- Queues],
+    rabbit_log:info("Restart of local quorum queue replicas is complete").
+
+%%
+%% Implementation
+%%
+
+local_listener_fold_fun(Fun) ->
+    fun(#listener{node = Node, ip_address = Addr, port = Port}, Acc) when Node =:= node() ->
+        RanchRef = rabbit_networking:ranch_ref(Addr, Port),
+        [Fun(RanchRef) | Acc];
+       (_, Acc) ->
+        Acc
+    end.
+
+ok_or_first_error(ok, Acc) ->
+    Acc;
+ok_or_first_error({error, _} = Err, _Acc) ->
+    Err.
+
+readable_candidate_list(Nodes) ->
+    string:join(lists:map(fun rabbit_data_coercion:to_list/1, Nodes), ", "). 
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index ba87617a74..070c6a8205 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -571,7 +571,7 @@ init_db(ClusterNodes, NodeType, CheckOtherNodes) -> %% Subsequent node in cluster, catch up maybe_force_load(), ok = rabbit_table:wait_for_replicated(_Retry = true), - ok = rabbit_table:create_local_copy(NodeType) + ok = rabbit_table:ensure_local_copies(NodeType) end, ensure_feature_flags_are_in_sync(Nodes, NodeIsVirgin), ensure_schema_integrity(), @@ -824,10 +824,17 @@ schema_ok_or_move() -> %% up only create_schema() -> stop_mnesia(), + rabbit_log:debug("Will bootstrap a schema database..."), rabbit_misc:ensure_ok(mnesia:create_schema([node()]), cannot_create_schema), + rabbit_log:debug("Bootstraped a schema database successfully"), start_mnesia(), + + rabbit_log:debug("Will create schema database tables"), ok = rabbit_table:create(), + rabbit_log:debug("Created schema database tables successfully"), + rabbit_log:debug("Will check schema database integrity..."), ensure_schema_integrity(), + rabbit_log:debug("Schema database schema integrity check passed"), ok = rabbit_version:record_desired(). move_db() -> diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index d31c1e825b..966601d222 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -21,13 +21,16 @@ -export([boot/0, start_tcp_listener/2, start_ssl_listener/3, stop_tcp_listener/1, on_node_down/1, active_listeners/0, - node_listeners/1, register_connection/1, unregister_connection/1, + node_listeners/1, node_client_listeners/1, + register_connection/1, unregister_connection/1, connections/0, connection_info_keys/0, connection_info/1, connection_info/2, connection_info_all/0, connection_info_all/1, emit_connection_info_all/4, emit_connection_info_local/3, - close_connection/2, force_connection_event_refresh/1, - handshake/2, tcp_host/1]). 
+ close_connection/2, close_connections/2, + force_connection_event_refresh/1, handshake/2, tcp_host/1, + ranch_ref/1, ranch_ref/2, ranch_ref_of_protocol/1, + listener_of_protocol/1, stop_ranch_listener_of_protocol/1]). %% Used by TCP-based transports, e.g. STOMP adapter -export([tcp_listener_addresses/1, tcp_listener_spec/9, @@ -37,8 +40,11 @@ -deprecated([{force_connection_event_refresh, 1, eventually}]). -%% Internal --export([connections_local/0]). +-export([ + local_connections/0, + %% prefer local_connections/0 + connections_local/0 +]). -include("rabbit.hrl"). @@ -179,13 +185,63 @@ tcp_listener_addresses_auto(Port) -> tcp_listener_spec(NamePrefix, {IPAddress, Port, Family}, SocketOpts, Transport, ProtoSup, ProtoOpts, Protocol, NumAcceptors, Label) -> - {rabbit_misc:tcp_name(NamePrefix, IPAddress, Port), - {tcp_listener_sup, start_link, - [IPAddress, Port, Transport, [Family | SocketOpts], ProtoSup, ProtoOpts, - {?MODULE, tcp_listener_started, [Protocol, SocketOpts]}, - {?MODULE, tcp_listener_stopped, [Protocol, SocketOpts]}, - NumAcceptors, Label]}, - transient, infinity, supervisor, [tcp_listener_sup]}. + Args = [IPAddress, Port, Transport, [Family | SocketOpts], ProtoSup, ProtoOpts, + {?MODULE, tcp_listener_started, [Protocol, SocketOpts]}, + {?MODULE, tcp_listener_stopped, [Protocol, SocketOpts]}, + NumAcceptors, Label], + #{ + id => rabbit_misc:tcp_name(NamePrefix, IPAddress, Port), + start => {tcp_listener_sup, start_link, Args}, + restart => transient, + shutdown => infinity, + type => supervisor, + modules => [tcp_listener_sup] + }. + +-spec ranch_ref(#listener{} | [{atom(), any()}] | 'undefined') -> ranch:ref() | undefined. 
+ranch_ref(#listener{port = Port}) -> + [{IPAddress, Port, _Family} | _] = tcp_listener_addresses(Port), + {acceptor, IPAddress, Port}; +ranch_ref(Listener) when is_list(Listener) -> + Port = rabbit_misc:pget(port, Listener), + [{IPAddress, Port, _Family} | _] = tcp_listener_addresses(Port), + {acceptor, IPAddress, Port}; +ranch_ref(undefined) -> + undefined. + +-spec ranch_ref(inet:ip_address(), ip_port()) -> ranch:ref(). + +%% Returns a reference that identifies a TCP listener in Ranch. +ranch_ref(IPAddress, Port) -> + {acceptor, IPAddress, Port}. + +-spec ranch_ref_of_protocol(atom()) -> ranch:ref() | undefined. +ranch_ref_of_protocol(Protocol) -> + ranch_ref(listener_of_protocol(Protocol)). + +-spec listener_of_protocol(atom()) -> #listener{}. +listener_of_protocol(Protocol) -> + rabbit_misc:execute_mnesia_transaction( + fun() -> + MatchSpec = #listener{ + node = node(), + protocol = Protocol, + _ = '_' + }, + case mnesia:match_object(rabbit_listener, MatchSpec, read) of + [] -> undefined; + [Row] -> Row + end + end). + +-spec stop_ranch_listener_of_protocol(atom()) -> ok | {error, not_found}. +stop_ranch_listener_of_protocol(Protocol) -> + case rabbit_networking:ranch_ref_of_protocol(Protocol) of + undefined -> ok; + Ref -> + rabbit_log:debug("Stopping Ranch listener for protocol ~s", [Protocol]), + ranch:stop_listener(Ref) + end. -spec start_tcp_listener( listener_config(), integer()) -> 'ok' | {'error', term()}. @@ -297,6 +353,17 @@ active_listeners() -> node_listeners(Node) -> mnesia:dirty_read(rabbit_listener, Node). +-spec node_client_listeners(node()) -> [rabbit_types:listener()]. + +node_client_listeners(Node) -> + case node_listeners(Node) of + [] -> []; + Xs -> + lists:filter(fun (#listener{protocol = clustering}) -> false; + (_) -> true + end, Xs) + end. + -spec on_node_down(node()) -> 'ok'. 
on_node_down(Node) -> @@ -324,8 +391,13 @@ connections() -> rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:cluster_nodes(running), rabbit_networking, connections_local, []). --spec connections_local() -> [rabbit_types:connection()]. +-spec local_connections() -> [rabbit_types:connection()]. +%% @doc Returns pids of AMQP 0-9-1 and AMQP 1.0 connections local to this node. +local_connections() -> + connections_local(). +-spec connections_local() -> [rabbit_types:connection()]. +%% @deprecated Prefer {@link local_connections} connections_local() -> pg_local:get_members(rabbit_connections). -spec connection_info_keys() -> rabbit_types:info_keys(). @@ -375,8 +447,12 @@ close_connection(Pid, Explanation) -> ok end. --spec force_connection_event_refresh(reference()) -> 'ok'. +-spec close_connections([pid()], string()) -> 'ok'. +close_connections(Pids, Explanation) -> + [close_connection(Pid, Explanation) || Pid <- Pids], + ok. +-spec force_connection_event_refresh(reference()) -> 'ok'. force_connection_event_refresh(Ref) -> [rabbit_reader:force_event_refresh(C, Ref) || C <- connections()], ok. diff --git a/src/rabbit_queue_location_client_local.erl b/src/rabbit_queue_location_client_local.erl index 3eb3d24217..2df1608534 100644 --- a/src/rabbit_queue_location_client_local.erl +++ b/src/rabbit_queue_location_client_local.erl @@ -30,4 +30,10 @@ description() -> [{description, <<"Locate queue master node as the client local node">>}]. queue_master_location(Q) when ?is_amqqueue(Q) -> + %% unlike with other locator strategies we do not check node maintenance + %% status for two reasons: + %% + %% * nodes in maintenance mode will drop their client connections + %% * with other strategies, if no nodes are available, the current node + %% is returned but this strategy already does just that {ok, node()}. 
diff --git a/src/rabbit_queue_location_min_masters.erl b/src/rabbit_queue_location_min_masters.erl index 94002cf580..6535f082fe 100644 --- a/src/rabbit_queue_location_min_masters.erl +++ b/src/rabbit_queue_location_min_masters.erl @@ -32,7 +32,7 @@ description() -> queue_master_location(Q) when ?is_amqqueue(Q) -> Cluster = rabbit_queue_master_location_misc:all_nodes(Q), QueueNames = rabbit_amqqueue:list_names(), - MastersPerNode = lists:foldl( + MastersPerNode0 = lists:foldl( fun(#resource{virtual_host = VHost, name = QueueName}, NodeMasters) -> case rabbit_queue_master_location_misc:lookup_master(QueueName, VHost) of {ok, Master} when is_atom(Master) -> @@ -47,16 +47,24 @@ queue_master_location(Q) when ?is_amqqueue(Q) -> end, maps:from_list([{N, 0} || N <- Cluster]), QueueNames), + + MastersPerNode = maps:filter(fun (Node, _N) -> + not rabbit_maintenance:is_being_drained_local_read(Node) + end, MastersPerNode0), - {MinNode, _NMasters} = maps:fold( - fun(Node, NMasters, init) -> - {Node, NMasters}; - (Node, NMasters, {MinNode, MinMasters}) -> - case NMasters < MinMasters of - true -> {Node, NMasters}; - false -> {MinNode, MinMasters} - end - end, - init, - MastersPerNode), - {ok, MinNode}. + case map_size(MastersPerNode) > 0 of + true -> + {MinNode, _NMasters} = maps:fold( + fun(Node, NMasters, init) -> + {Node, NMasters}; + (Node, NMasters, {MinNode, MinMasters}) -> + case NMasters < MinMasters of + true -> {Node, NMasters}; + false -> {MinNode, MinMasters} + end + end, + init, MastersPerNode), + {ok, MinNode}; + false -> + undefined + end. diff --git a/src/rabbit_queue_location_random.erl b/src/rabbit_queue_location_random.erl index 9f2445ceac..7232fc6703 100644 --- a/src/rabbit_queue_location_random.erl +++ b/src/rabbit_queue_location_random.erl @@ -30,7 +30,13 @@ description() -> <<"Locate queue master node from cluster in a random manner">>}]. 
queue_master_location(Q) when ?is_amqqueue(Q) -> - Cluster = rabbit_queue_master_location_misc:all_nodes(Q), - RandomPos = erlang:phash2(erlang:monotonic_time(), length(Cluster)), - MasterNode = lists:nth(RandomPos + 1, Cluster), - {ok, MasterNode}. + Cluster0 = rabbit_queue_master_location_misc:all_nodes(Q), + Cluster = rabbit_maintenance:filter_out_drained_nodes_local_read(Cluster0), + case Cluster of + [] -> + undefined; + Candidates when is_list(Candidates) -> + RandomPos = erlang:phash2(erlang:monotonic_time(), length(Candidates)), + MasterNode = lists:nth(RandomPos + 1, Candidates), + {ok, MasterNode} + end. diff --git a/src/rabbit_queue_master_location_misc.erl b/src/rabbit_queue_master_location_misc.erl index 7b65ef0d45..37698e184f 100644 --- a/src/rabbit_queue_master_location_misc.erl +++ b/src/rabbit_queue_master_location_misc.erl @@ -18,6 +18,7 @@ get_location_mod_by_policy/1, all_nodes/1]). +-spec lookup_master(binary(), binary()) -> {ok, node()} | {error, not_found}. lookup_master(QueueNameBin, VHostPath) when is_binary(QueueNameBin), is_binary(VHostPath) -> QueueR = rabbit_misc:r(VHostPath, queue, QueueNameBin), diff --git a/src/rabbit_table.erl b/src/rabbit_table.erl index 3a7d2dd1bf..05f9a8d381 100644 --- a/src/rabbit_table.erl +++ b/src/rabbit_table.erl @@ -7,10 +7,12 @@ -module(rabbit_table). --export([create/0, create_local_copy/1, wait_for_replicated/1, wait/1, wait/2, - force_load/0, is_present/0, is_empty/0, needs_default_data/0, - check_schema_integrity/1, clear_ram_only_tables/0, retry_timeout/0, - wait_for_replicated/0]). +-export([ + create/0, create/2, ensure_local_copies/1, ensure_table_copy/2, + wait_for_replicated/1, wait/1, wait/2, + force_load/0, is_present/0, is_empty/0, needs_default_data/0, + check_schema_integrity/1, clear_ram_only_tables/0, retry_timeout/0, + wait_for_replicated/0, exists/1]). %% for testing purposes -export([definitions/0]). @@ -28,18 +30,29 @@ -spec create() -> 'ok'. 
create() -> - lists:foreach(fun ({Tab, TabDef}) -> - TabDef1 = proplists:delete(match, TabDef), - case mnesia:create_table(Tab, TabDef1) of - {atomic, ok} -> ok; - {aborted, Reason} -> - throw({error, {table_creation_failed, - Tab, TabDef1, Reason}}) - end - end, definitions()), + lists:foreach( + fun ({Table, Def}) -> create(Table, Def) end, + definitions()), ensure_secondary_indexes(), ok. +-spec create(mnesia:table(), list()) -> rabbit_types:ok_or_error(any()). + +create(TableName, TableDefinition) -> + TableDefinition1 = proplists:delete(match, TableDefinition), + rabbit_log:debug("Will create a schema database table '~s'", [TableName]), + case mnesia:create_table(TableName, TableDefinition1) of + {atomic, ok} -> ok; + {aborted,{already_exists, TableName}} -> ok; + {aborted, {already_exists, TableName, _}} -> ok; + {aborted, Reason} -> + throw({error, {table_creation_failed, TableName, TableDefinition1, Reason}}) + end. + +-spec exists(mnesia:table()) -> boolean(). +exists(Table) -> + lists:member(Table, mnesia:system_info(tables)). + %% Sets up secondary indexes in a blank node database. ensure_secondary_indexes() -> ensure_secondary_index(rabbit_queue, vhost), @@ -51,19 +64,15 @@ ensure_secondary_index(Table, Field) -> {aborted, {already_exists, Table, _}} -> ok end. -%% The sequence in which we delete the schema and then the other -%% tables is important: if we delete the schema first when moving to -%% RAM mnesia will loudly complain since it doesn't make much sense to -%% do that. But when moving to disc, we need to move the schema first. - --spec create_local_copy('disc' | 'ram') -> 'ok'. - -create_local_copy(disc) -> - create_local_copy(schema, disc_copies), - create_local_copies(disc); -create_local_copy(ram) -> - create_local_copies(ram), - create_local_copy(schema, ram_copies). +-spec ensure_table_copy(mnesia:table(), node()) -> ok | {error, any()}. 
+ensure_table_copy(TableName, Node) -> + rabbit_log:debug("Will add a local schema database copy for table '~s'", [TableName]), + case mnesia:add_table_copy(TableName, Node, disc_copies) of + {atomic, ok} -> ok; + {aborted,{already_exists, TableName}} -> ok; + {aborted, {already_exists, TableName, _}} -> ok; + {aborted, Reason} -> {error, Reason} + end. %% This arity only exists for backwards compatibility with certain %% plugins. See https://github.com/rabbitmq/rabbitmq-clusterer/issues/19. @@ -180,6 +189,20 @@ clear_ram_only_tables() -> end, names()), ok. +%% The sequence in which we delete the schema and then the other +%% tables is important: if we delete the schema first when moving to +%% RAM mnesia will loudly complain since it doesn't make much sense to +%% do that. But when moving to disc, we need to move the schema first. + +-spec ensure_local_copies('disc' | 'ram') -> 'ok'. + +ensure_local_copies(disc) -> + create_local_copy(schema, disc_copies), + create_local_copies(disc); +ensure_local_copies(ram) -> + create_local_copies(ram), + create_local_copy(schema, ram_copies). + %%-------------------------------------------------------------------- %% Internal helpers %%-------------------------------------------------------------------- @@ -363,7 +386,8 @@ definitions() -> {rabbit_queue, [{record_name, amqqueue}, {attributes, amqqueue:fields()}, - {match, amqqueue:pattern_match_on_name(queue_name_match())}]}] + {match, amqqueue:pattern_match_on_name(queue_name_match())}]} + ] ++ gm:table_definitions() ++ mirrored_supervisor:table_definitions(). diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index e554e46821..59417c72bb 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -48,7 +48,7 @@ -rabbit_upgrade({user_password_hashing, mnesia, [hash_passwords]}). -rabbit_upgrade({operator_policies, mnesia, [slave_pids_pending_shutdown, internal_system_x]}). -rabbit_upgrade({vhost_limits, mnesia, []}). 
--rabbit_upgrade({queue_vhost_field, mnesia, [operator_policies]}). +-rabbit_upgrade({queue_vhost_field, mnesia, [operator_policies]}). -rabbit_upgrade({topic_permission, mnesia, []}). -rabbit_upgrade({queue_options, mnesia, [queue_vhost_field]}). -rabbit_upgrade({exchange_options, mnesia, [operator_policies]}). @@ -610,6 +610,7 @@ user_password_hashing() -> end, [username, password_hash, tags, hashing_algorithm]). +-spec topic_permission() -> 'ok'. topic_permission() -> create(rabbit_topic_permission, [{record_name, topic_permission}, @@ -647,6 +648,7 @@ transform(TableName, Fun, FieldList, NewRecordName) -> ok. create(Tab, TabDef) -> + rabbit_log:debug("Will create a schema table named '~s'", [Tab]), {atomic, ok} = mnesia:create_table(Tab, TabDef), ok. diff --git a/src/tcp_listener_sup.erl b/src/tcp_listener_sup.erl index b0490a0f8d..76ff814ff4 100644 --- a/src/tcp_listener_sup.erl +++ b/src/tcp_listener_sup.erl @@ -17,11 +17,8 @@ -behaviour(supervisor). -export([start_link/10]). - -export([init/1]). -%%---------------------------------------------------------------------------- - -type mfargs() :: {atom(), atom(), [any()]}. -spec start_link @@ -48,11 +45,16 @@ init({IPAddress, Port, Transport, SocketOpts, ProtoSup, ProtoOpts, OnStartup, On {port, Port} | SocketOpts] }, - {ok, {{one_for_all, 10, 10}, [ - ranch:child_spec({acceptor, IPAddress, Port}, - Transport, RanchListenerOpts, - ProtoSup, ProtoOpts), - {tcp_listener, {tcp_listener, start_link, - [IPAddress, Port, - OnStartup, OnShutdown, Label]}, - transient, 16#ffffffff, worker, [tcp_listener]}]}}. 
+ Flags = #{strategy => one_for_all, intensity => 10, period => 10}, + OurChildSpec = #{ + id => tcp_listener, + start => {tcp_listener, start_link, [IPAddress, Port, OnStartup, OnShutdown, Label]}, + restart => transient, + shutdown => 16#ffffffff, + type => worker, + modules => [tcp_listener] + }, + RanchChildSpec = ranch:child_spec(rabbit_networking:ranch_ref(IPAddress, Port), + Transport, RanchListenerOpts, + ProtoSup, ProtoOpts), + {ok, {Flags, [RanchChildSpec, OurChildSpec]}}. diff --git a/test/maintenance_mode_SUITE.erl b/test/maintenance_mode_SUITE.erl new file mode 100644 index 0000000000..6ec3f61ad5 --- /dev/null +++ b/test/maintenance_mode_SUITE.erl @@ -0,0 +1,227 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(maintenance_mode_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-compile(export_all). + +all() -> + [ + {group, cluster_size_3} + ]. + +groups() -> + [ + {cluster_size_3, [], [ + maintenance_mode_status, + listener_suspension_status, + client_connection_closure, + classic_mirrored_queue_leadership_transfer, + quorum_queue_leadership_transfer + ]} + ]. + +%% ------------------------------------------------------------------- +%% Setup and teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +init_per_group(cluster_size_3, Config) -> + rabbit_ct_helpers:set_config(Config, [ + {rmq_nodes_count, 3} + ]). + +end_per_group(_, Config) -> + Config. 
+ +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase), + ClusterSize = ?config(rmq_nodes_count, Config), + TestNumber = rabbit_ct_helpers:testcase_number(Config, ?MODULE, Testcase), + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodes_clustered, true}, + {rmq_nodename_suffix, Testcase}, + {tcp_ports_base, {skip_n_nodes, TestNumber * ClusterSize}} + ]), + rabbit_ct_helpers:run_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps() ++ [ + fun rabbit_ct_broker_helpers:set_ha_policy_all/1 + ]). + +end_per_testcase(Testcase, Config) -> + Config1 = rabbit_ct_helpers:run_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()), + rabbit_ct_helpers:testcase_finished(Config1, Testcase). + +%% ------------------------------------------------------------------- +%% Test Cases +%% ------------------------------------------------------------------- + +maintenance_mode_status(Config) -> + Nodes = [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + [begin + ?assertNot(rabbit_ct_broker_helpers:is_being_drained_local_read(Config, Node)), + ?assertNot(rabbit_ct_broker_helpers:is_being_drained_consistent_read(Config, Node)) + end || Node <- Nodes], + + [begin + [begin + ?assertNot(rabbit_ct_broker_helpers:is_being_drained_consistent_read(Config, TargetNode, NodeToCheck)) + end || NodeToCheck <- Nodes] + end || TargetNode <- Nodes], + + rabbit_ct_broker_helpers:mark_as_being_drained(Config, B), + rabbit_ct_helpers:await_condition( + fun () -> rabbit_ct_broker_helpers:is_being_drained_local_read(Config, B) end, + 10000), + + [begin + ?assert(rabbit_ct_broker_helpers:is_being_drained_consistent_read(Config, TargetNode, B)) + end || TargetNode <- Nodes], + + ?assertEqual( + lists:usort([A, C]), + lists:usort(rabbit_ct_broker_helpers:rpc(Config, B, + rabbit_maintenance, primary_replica_transfer_candidate_nodes, []))), 
+ + rabbit_ct_broker_helpers:unmark_as_being_drained(Config, B), + rabbit_ct_helpers:await_condition( + fun () -> not rabbit_ct_broker_helpers:is_being_drained_local_read(Config, B) end, + 10000), + + [begin + ?assertNot(rabbit_ct_broker_helpers:is_being_drained_local_read(Config, TargetNode, B)), + ?assertNot(rabbit_ct_broker_helpers:is_being_drained_consistent_read(Config, TargetNode, B)) + end || TargetNode <- Nodes], + + ?assertEqual( + lists:usort([A, C]), + lists:usort(rabbit_ct_broker_helpers:rpc(Config, B, + rabbit_maintenance, primary_replica_transfer_candidate_nodes, []))), + + ok. + + +listener_suspension_status(Config) -> + Nodes = [A | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + ct:pal("Picked node ~s for maintenance tests...", [A]), + + rabbit_ct_helpers:await_condition( + fun () -> not rabbit_ct_broker_helpers:is_being_drained_local_read(Config, A) end, 10000), + + [begin + ?assertNot(rabbit_ct_broker_helpers:is_being_drained_consistent_read(Config, Node)) + end || Node <- Nodes], + + Conn1 = rabbit_ct_client_helpers:open_connection(Config, A), + ?assert(is_pid(Conn1)), + rabbit_ct_client_helpers:close_connection(Conn1), + + rabbit_ct_broker_helpers:drain_node(Config, A), + rabbit_ct_helpers:await_condition( + fun () -> rabbit_ct_broker_helpers:is_being_drained_local_read(Config, A) end, 10000), + + ?assertEqual({error, econnrefused}, rabbit_ct_client_helpers:open_unmanaged_connection(Config, A)), + + rabbit_ct_broker_helpers:revive_node(Config, A), + rabbit_ct_helpers:await_condition( + fun () -> not rabbit_ct_broker_helpers:is_being_drained_local_read(Config, A) end, 10000), + + Conn3 = rabbit_ct_client_helpers:open_connection(Config, A), + ?assert(is_pid(Conn3)), + rabbit_ct_client_helpers:close_connection(Conn3), + + ok. 
+ + +client_connection_closure(Config) -> + [A | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + ct:pal("Picked node ~s for maintenance tests...", [A]), + + rabbit_ct_helpers:await_condition( + fun () -> not rabbit_ct_broker_helpers:is_being_drained_local_read(Config, A) end, 10000), + + Conn1 = rabbit_ct_client_helpers:open_connection(Config, A), + ?assert(is_pid(Conn1)), + ?assertEqual(1, length(rabbit_ct_broker_helpers:rpc(Config, A, rabbit_networking, local_connections, []))), + + rabbit_ct_broker_helpers:drain_node(Config, A), + ?assertEqual(0, length(rabbit_ct_broker_helpers:rpc(Config, A, rabbit_networking, local_connections, []))), + + rabbit_ct_broker_helpers:revive_node(Config, A). + + +classic_mirrored_queue_leadership_transfer(Config) -> + [A | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + ct:pal("Picked node ~s for maintenance tests...", [A]), + + rabbit_ct_helpers:await_condition( + fun () -> not rabbit_ct_broker_helpers:is_being_drained_local_read(Config, A) end, 10000), + + PolicyPattern = <<"^cq.mirrored">>, + rabbit_ct_broker_helpers:set_ha_policy(Config, A, PolicyPattern, <<"all">>), + + Conn = rabbit_ct_client_helpers:open_connection(Config, A), + {ok, Ch} = amqp_connection:open_channel(Conn), + QName = <<"cq.mirrored.1">>, + amqp_channel:call(Ch, #'queue.declare'{queue = QName, durable = true}), + + ?assertEqual(1, length(rabbit_ct_broker_helpers:rpc(Config, A, rabbit_amqqueue, list_local, [<<"/">>]))), + + rabbit_ct_broker_helpers:drain_node(Config, A), + rabbit_ct_helpers:await_condition( + fun () -> rabbit_ct_broker_helpers:is_being_drained_local_read(Config, A) end, 10000), + + ?assertEqual(0, length(rabbit_ct_broker_helpers:rpc(Config, A, rabbit_amqqueue, list_local, [<<"/">>]))), + + rabbit_ct_broker_helpers:revive_node(Config, A), + %% rabbit_ct_broker_helpers:set_ha_policy/4 uses pattern for policy name + rabbit_ct_broker_helpers:clear_policy(Config, A, PolicyPattern). 
+ +quorum_queue_leadership_transfer(Config) -> + [A | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + ct:pal("Picked node ~s for maintenance tests...", [A]), + + rabbit_ct_helpers:await_condition( + fun () -> not rabbit_ct_broker_helpers:is_being_drained_local_read(Config, A) end, 10000), + + Conn = rabbit_ct_client_helpers:open_connection(Config, A), + {ok, Ch} = amqp_connection:open_channel(Conn), + QName = <<"qq.1">>, + amqp_channel:call(Ch, #'queue.declare'{queue = QName, durable = true, arguments = [ + {<<"x-queue-type">>, longstr, <<"quorum">>} + ]}), + + %% we cannot assert on the number of local leaders here: declaring a QQ on node A + %% does not guarantee that the leader will be hosted on node A + + rabbit_ct_broker_helpers:drain_node(Config, A), + rabbit_ct_helpers:await_condition( + fun () -> rabbit_ct_broker_helpers:is_being_drained_local_read(Config, A) end, 10000), + + %% quorum queue leader election is asynchronous + rabbit_ct_helpers:await_condition( + fun () -> + LocalLeaders = rabbit_ct_broker_helpers:rpc(Config, A, + rabbit_amqqueue, list_local_leaders, []), + length(LocalLeaders) =:= 0 + end, 20000), + + rabbit_ct_broker_helpers:revive_node(Config, A). diff --git a/test/queue_master_location_SUITE.erl b/test/queue_master_location_SUITE.erl index c8ea773682..6a346c5618 100644 --- a/test/queue_master_location_SUITE.erl +++ b/test/queue_master_location_SUITE.erl @@ -34,7 +34,8 @@ all() -> [ - {group, cluster_size_3} + {group, cluster_size_3}, + {group, maintenance_mode} ]. groups() -> @@ -51,11 +52,19 @@ groups() -> calculate_min_master_with_bindings, calculate_random, calculate_client_local - ]} + ]}, + + {maintenance_mode, [], [ + declare_with_min_masters_and_some_nodes_under_maintenance, + declare_with_min_masters_and_all_nodes_under_maintenance, + + declare_with_random_and_some_nodes_under_maintenance, + declare_with_random_and_all_nodes_under_maintenance + ]} ]. 
%% ------------------------------------------------------------------- -%% Testsuite setup/teardown. +%% Test suite setup/teardown %% ------------------------------------------------------------------- init_per_suite(Config) -> @@ -67,7 +76,12 @@ end_per_suite(Config) -> init_per_group(cluster_size_3, Config) -> rabbit_ct_helpers:set_config(Config, [ - {rmq_nodes_count, 3} %% Replaced with a list of node names later. + %% Replaced with a list of node names later + {rmq_nodes_count, 3} + ]); +init_per_group(maintenance_mode, Config) -> + rabbit_ct_helpers:set_config(Config, [ + {rmq_nodes_count, 3} ]). end_per_group(_, Config) -> @@ -98,7 +112,7 @@ end_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_finished(Config1, Testcase). %% ------------------------------------------------------------------- -%% Testcases. +%% Test cases %% ------------------------------------------------------------------- %% @@ -199,12 +213,71 @@ declare_config(Config) -> setup_test_environment(Config), set_location_config(Config, <<"min-masters">>), QueueName = rabbit_misc:r(<<"/">>, queue, Q = <<"qm.test">>), - declare(Config, QueueName, false, false, _Args=[], none), + declare(Config, QueueName, false, false, _Args = [], none), verify_min_master(Config, Q), unset_location_config(Config), ok. 
%% +%% Maintenance mode effects +%% + +declare_with_min_masters_and_some_nodes_under_maintenance(Config) -> + set_location_policy(Config, ?POLICY, <<"min-masters">>), + rabbit_ct_broker_helpers:mark_as_being_drained(Config, 0), + rabbit_ct_broker_helpers:mark_as_being_drained(Config, 1), + + QName = <<"qm.tests.min_masters.maintenance.case1">>, + Resource = rabbit_misc:r(<<"/">>, queue, QName), + Record = declare(Config, Resource, false, false, _Args = [], none), + %% the only node that's not being drained + ?assertEqual(rabbit_ct_broker_helpers:get_node_config(Config, 2, nodename), + node(amqqueue:get_pid(Record))), + + rabbit_ct_broker_helpers:unmark_as_being_drained(Config, 0), + rabbit_ct_broker_helpers:unmark_as_being_drained(Config, 1). + +declare_with_min_masters_and_all_nodes_under_maintenance(Config) -> + declare_with_all_nodes_under_maintenance(Config, <<"min-masters">>). + +declare_with_random_and_some_nodes_under_maintenance(Config) -> + set_location_policy(Config, ?POLICY, <<"random">>), + rabbit_ct_broker_helpers:mark_as_being_drained(Config, 0), + rabbit_ct_broker_helpers:mark_as_being_drained(Config, 2), + + QName = <<"qm.tests.random.maintenance.case1">>, + Resource = rabbit_misc:r(<<"/">>, queue, QName), + Record = declare(Config, Resource, false, false, _Args = [], none), + %% the only node that's not being drained + ?assertEqual(rabbit_ct_broker_helpers:get_node_config(Config, 1, nodename), + node(amqqueue:get_pid(Record))), + + rabbit_ct_broker_helpers:unmark_as_being_drained(Config, 0), + rabbit_ct_broker_helpers:unmark_as_being_drained(Config, 2). + +declare_with_random_and_all_nodes_under_maintenance(Config) -> + declare_with_all_nodes_under_maintenance(Config, <<"random">>). 
+ +declare_with_all_nodes_under_maintenance(Config, Locator) -> + set_location_policy(Config, ?POLICY, Locator), + rabbit_ct_broker_helpers:mark_as_being_drained(Config, 0), + rabbit_ct_broker_helpers:mark_as_being_drained(Config, 1), + rabbit_ct_broker_helpers:mark_as_being_drained(Config, 2), + + QName = rabbit_data_coercion:to_binary( + rabbit_misc:format("qm.tests.~s.maintenance.case2", [Locator])), + Resource = rabbit_misc:r(<<"/">>, queue, QName), + Record = declare(Config, Resource, false, false, _Args = [], none), + %% when queue master locator returns no node, the node that handles + %% the declaration method will be used as a fallback + ?assertEqual(rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + node(amqqueue:get_pid(Record))), + + rabbit_ct_broker_helpers:unmark_as_being_drained(Config, 0), + rabbit_ct_broker_helpers:unmark_as_being_drained(Config, 1), + rabbit_ct_broker_helpers:unmark_as_being_drained(Config, 2). + +%% %% Test 'calculations' %% @@ -333,8 +406,10 @@ unset_location_config(Config) -> declare(Config, QueueName, Durable, AutoDelete, Args0, Owner) -> Args1 = [QueueName, Durable, AutoDelete, Args0, Owner, <<"acting-user">>], - {new, Queue} = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, declare, Args1), - Queue. + case rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, declare, Args1) of + {new, Queue} -> Queue; + Other -> Other + end. bind(Config, QueueName, RoutingKey) -> ExchangeName = rabbit_misc:r(QueueName, exchange, <<"amq.direct">>), |
