summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: D Corbacho <diana@rabbitmq.com> 2020-07-14 15:49:41 +0100
committer: GitHub <noreply@github.com> 2020-07-14 15:49:41 +0100
commit56ab7a16822a1f00a0ab452ddc2513425100b6da (patch)
tree1c778ecdb94cbfa094fb1dbbb37889588c58fafd
parent9c9301917b84f6ad6b3b844ff76523c7b3bd1903 (diff)
parentac714634bd300d306c1b460addc5bf3c978e91a1 (diff)
downloadrabbitmq-server-git-56ab7a16822a1f00a0ab452ddc2513425100b6da.tar.gz
Merge pull request #2349 from rabbitmq/rabbitmq-server-2321
Node drain (maintenance mode) operations
-rw-r--r--apps/rabbitmq_prelaunch/src/rabbit_prelaunch_errors.erl6
-rw-r--r--src/rabbit.erl4
-rw-r--r--src/rabbit_amqqueue.erl11
-rw-r--r--src/rabbit_core_ff.erl28
-rw-r--r--src/rabbit_maintenance.erl330
-rw-r--r--src/rabbit_mnesia.erl9
-rw-r--r--src/rabbit_networking.erl104
-rw-r--r--src/rabbit_queue_location_client_local.erl6
-rw-r--r--src/rabbit_queue_location_min_masters.erl34
-rw-r--r--src/rabbit_queue_location_random.erl14
-rw-r--r--src/rabbit_queue_master_location_misc.erl1
-rw-r--r--src/rabbit_table.erl78
-rw-r--r--src/rabbit_upgrade_functions.erl4
-rw-r--r--src/tcp_listener_sup.erl24
-rw-r--r--test/maintenance_mode_SUITE.erl227
-rw-r--r--test/queue_master_location_SUITE.erl91
16 files changed, 887 insertions, 84 deletions
diff --git a/apps/rabbitmq_prelaunch/src/rabbit_prelaunch_errors.erl b/apps/rabbitmq_prelaunch/src/rabbit_prelaunch_errors.erl
index 2a2eb2b7fd..f04f252784 100644
--- a/apps/rabbitmq_prelaunch/src/rabbit_prelaunch_errors.erl
+++ b/apps/rabbitmq_prelaunch/src/rabbit_prelaunch_errors.erl
@@ -19,7 +19,7 @@ log_error(Error) ->
format_error({error, {duplicate_node_name, NodeName, NodeHost}}) ->
rabbit_misc:format(
- "ERROR: node with name ~p already running on ~p",
+ "ERROR: node with name ~p is already running on host ~p",
[NodeName, NodeHost]);
format_error({error, {epmd_error, NodeHost, EpmdReason}}) ->
rabbit_misc:format(
@@ -30,11 +30,11 @@ format_error({error, {invalid_dist_port_range, DistTcpPort}}) ->
"Invalid Erlang distribution TCP port: ~b", [DistTcpPort]);
format_error({error, {dist_port_already_used, Port, not_erlang, Host}}) ->
rabbit_misc:format(
- "ERROR: distribution port ~b in use on ~s "
+ "ERROR: distribution port ~b in use on host ~s "
"(by non-Erlang process?)", [Port, Host]);
format_error({error, {dist_port_already_used, Port, Name, Host}}) ->
rabbit_misc:format(
- "ERROR: distribution port ~b in use by ~s@~s", [Port, Name, Host]);
+ "ERROR: distribution port ~b in use by another node: ~s@~s", [Port, Name, Host]);
format_error({error, {erlang_dist_running_with_unexpected_nodename,
Unexpected, Node}}) ->
rabbit_misc:format(
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 60a0654340..8298dfe79d 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -654,6 +654,7 @@ status() ->
{erlang_version, erlang:system_info(system_version)},
{memory, rabbit_vm:memory()},
{alarms, alarms()},
+ {is_under_maintenance, rabbit_maintenance:is_being_drained_local_read(node())},
{listeners, listeners()},
{vm_memory_calculation_strategy, vm_memory_monitor:get_memory_calculation_strategy()}],
S2 = rabbit_misc:filter_exit_map(
@@ -911,6 +912,9 @@ do_run_postlaunch_phase() ->
end
end, Plugins),
+ rabbit_log_prelaunch:info("Resetting node maintenance status"),
+ %% successful boot resets node maintenance state
+ rabbit_maintenance:unmark_as_being_drained(),
rabbit_log_prelaunch:debug("Marking ~s as running", [product_name()]),
rabbit_boot_state:set(ready),
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 8855c8bbf7..9818e689de 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -23,7 +23,6 @@
emit_info_local/4, emit_info_down/4]).
-export([count/0]).
-export([list_down/1, count/1, list_names/0, list_names/1, list_local_names/0,
- list_local_mirrored_classic_names/0,
list_local_names_down/0, list_with_possible_retry/1]).
-export([list_by_type/1, sample_local_queues/0, sample_n_by_name/2, sample_n/2]).
-export([force_event_refresh/1, notify_policy_changed/1]).
@@ -38,6 +37,7 @@
-export([has_synchronised_mirrors_online/1]).
-export([is_replicated/1, is_exclusive/1, is_not_exclusive/1, is_dead_exclusive/1]).
-export([list_local_quorum_queues/0, list_local_quorum_queue_names/0,
+ list_local_mirrored_classic_queues/0, list_local_mirrored_classic_names/0,
list_local_leaders/0, list_local_followers/0, get_quorum_nodes/1,
list_local_mirrored_classic_without_synchronised_mirrors/0,
list_local_mirrored_classic_without_synchronised_mirrors_for_cli/0]).
@@ -313,6 +313,7 @@ declare_classic_queue(Q, Node) ->
_ ->
case rabbit_queue_master_location_misc:get_location(Q) of
{ok, Node0} -> Node0;
+ undefined -> Node;
{error, _} -> Node
end
end,
@@ -1071,6 +1072,14 @@ list_local_followers() ->
amqqueue:get_state(Q) =/= crashed, amqqueue:get_leader(Q) =/= node(),
lists:member(node(), get_quorum_nodes(Q))].
+-spec list_local_mirrored_classic_queues() -> [amqqueue:amqqueue()].
+list_local_mirrored_classic_queues() ->
+ [ Q || Q <- list(),
+ amqqueue:get_state(Q) =/= crashed,
+ amqqueue:is_classic(Q),
+ is_local_to_node(amqqueue:get_pid(Q), node()),
+ is_replicated(Q)].
+
-spec list_local_mirrored_classic_names() -> [rabbit_amqqueue:name()].
list_local_mirrored_classic_names() ->
[ amqqueue:get_name(Q) || Q <- list(),
diff --git a/src/rabbit_core_ff.erl b/src/rabbit_core_ff.erl
index e4c01339a9..f97170f6ac 100644
--- a/src/rabbit_core_ff.erl
+++ b/src/rabbit_core_ff.erl
@@ -9,7 +9,8 @@
-export([quorum_queue_migration/3,
implicit_default_bindings_migration/3,
- virtual_host_metadata_migration/3]).
+ virtual_host_metadata_migration/3,
+ maintenance_mode_status_migration/3]).
-rabbit_feature_flag(
{quorum_queue,
@@ -34,6 +35,14 @@
migration_fun => {?MODULE, virtual_host_metadata_migration}
}}).
+-rabbit_feature_flag(
+ {maintenance_mode_status,
+ #{
+ desc => "Maintenance mode status",
+ stability => stable,
+ migration_fun => {?MODULE, maintenance_mode_status_migration}
+ }}).
+
%% -------------------------------------------------------------------
%% Quorum queues.
%% -------------------------------------------------------------------
@@ -112,3 +121,20 @@ virtual_host_metadata_migration(_FeatureName, _FeatureProps, enable) ->
end;
virtual_host_metadata_migration(_FeatureName, _FeatureProps, is_enabled) ->
mnesia:table_info(rabbit_vhost, attributes) =:= vhost:fields(vhost_v2).
+
+
+%%
+%% Maintenance mode
+%%
+
+maintenance_mode_status_migration(FeatureName, _FeatureProps, enable) ->
+ TableName = rabbit_maintenance:status_table_name(),
+ rabbit_log:info("Creating table ~s for feature flag `~s`", [TableName, FeatureName]),
+ try
+ _ = rabbit_table:create(TableName, rabbit_maintenance:status_table_definition()),
+ _ = rabbit_table:ensure_table_copy(TableName, node())
+ catch throw:Reason ->
+ rabbit_log:error("Failed to create maintenance status table: ~p", [Reason])
+ end;
+maintenance_mode_status_migration(_FeatureName, _FeatureProps, is_enabled) ->
+ rabbit_table:exists(rabbit_maintenance:status_table_name()).
diff --git a/src/rabbit_maintenance.erl b/src/rabbit_maintenance.erl
new file mode 100644
index 0000000000..7c8b39ad84
--- /dev/null
+++ b/src/rabbit_maintenance.erl
@@ -0,0 +1,330 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2018-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_maintenance).
+
+-include("rabbit.hrl").
+
+-export([
+ is_enabled/0,
+ drain/0,
+ revive/0,
+ mark_as_being_drained/0,
+ unmark_as_being_drained/0,
+ is_being_drained_local_read/1,
+ is_being_drained_consistent_read/1,
+ status_local_read/1,
+ status_consistent_read/1,
+ filter_out_drained_nodes_local_read/1,
+ filter_out_drained_nodes_consistent_read/1,
+ suspend_all_client_listeners/0,
+ resume_all_client_listeners/0,
+ close_all_client_connections/0,
+ primary_replica_transfer_candidate_nodes/0,
+ random_primary_replica_transfer_candidate_node/1,
+ transfer_leadership_of_quorum_queues/1,
+ transfer_leadership_of_classic_mirrored_queues/1,
+ status_table_name/0,
+ status_table_definition/0
+]).
+
+-define(TABLE, rabbit_node_maintenance_states).
+-define(FEATURE_FLAG, maintenance_mode_status).
+-define(DEFAULT_STATUS, regular).
+-define(DRAINING_STATUS, draining).
+
+-type maintenance_status() :: ?DEFAULT_STATUS | ?DRAINING_STATUS.
+
+-export_type([
+ maintenance_status/0
+]).
+
+%%
+%% API
+%%
+
+-spec status_table_name() -> mnesia:table().
+status_table_name() ->
+ ?TABLE.
+
+-spec status_table_definition() -> list().
+status_table_definition() ->
+ maps:to_list(#{
+ record_name => node_maintenance_state,
+ attributes => record_info(fields, node_maintenance_state)
+ }).
+
+-spec is_enabled() -> boolean().
+is_enabled() ->
+ rabbit_feature_flags:is_enabled(?FEATURE_FLAG).
+
+-spec drain() -> ok.
+drain() ->
+ case is_enabled() of
+ true -> do_drain();
+ false -> rabbit_log:warning("Feature flag `~s` is not enabled, draining is a no-op", [?FEATURE_FLAG])
+ end.
+
+-spec do_drain() -> ok.
+do_drain() ->
+ rabbit_log:alert("This node is being put into maintenance (drain) mode"),
+ mark_as_being_drained(),
+ rabbit_log:info("Marked this node as undergoing maintenance"),
+ suspend_all_client_listeners(),
+ rabbit_log:alert("Suspended all listeners and will no longer accept client connections"),
+ {ok, NConnections} = close_all_client_connections(),
+ %% allow plugins to react e.g. by closing their protocol connections
+ rabbit_event:notify(maintenance_connections_closed, #{
+ reason => <<"node is being put into maintenance">>
+ }),
+ rabbit_log:alert("Closed ~b local client connections", [NConnections]),
+
+ TransferCandidates = primary_replica_transfer_candidate_nodes(),
+ ReadableCandidates = readable_candidate_list(TransferCandidates),
+ rabbit_log:info("Node will transfer primary replicas of its queues to ~b peers: ~s",
+ [length(TransferCandidates), ReadableCandidates]),
+ transfer_leadership_of_classic_mirrored_queues(TransferCandidates),
+ transfer_leadership_of_quorum_queues(TransferCandidates),
+
+ %% allow plugins to react
+ rabbit_event:notify(maintenance_draining, #{
+ reason => <<"node is being put into maintenance">>
+ }),
+ rabbit_log:alert("Node is ready to be shut down for maintenance or upgrade"),
+
+ ok.
+
+-spec revive() -> ok.
+revive() ->
+ case is_enabled() of
+ true -> do_revive();
+ false -> rabbit_log:warning("Feature flag `~s` is not enabled, reviving is a no-op", [?FEATURE_FLAG])
+ end.
+
+-spec do_revive() -> ok.
+do_revive() ->
+ rabbit_log:alert("This node is being revived from maintenance (drain) mode"),
+ revive_local_quorum_queue_replicas(),
+    %% listeners are resumed below; log the resume only after it has actually happened
+ resume_all_client_listeners(),
+ rabbit_log:alert("Resumed all listeners and will accept client connections again"),
+ unmark_as_being_drained(),
+ rabbit_log:info("Marked this node as back from maintenance and ready to serve clients"),
+
+ %% allow plugins to react
+ rabbit_event:notify(maintenance_revived, #{}),
+
+ ok.
+
+-spec mark_as_being_drained() -> boolean().
+mark_as_being_drained() ->
+    rabbit_log:debug("Marking the node as undergoing maintenance"),
+    set_maintenance_status(?DRAINING_STATUS).
+
+-spec unmark_as_being_drained() -> boolean().
+unmark_as_being_drained() ->
+    rabbit_log:debug("Unmarking the node as undergoing maintenance"),
+    set_maintenance_status(?DEFAULT_STATUS).
+
+set_maintenance_status(Status) ->
+ Res = mnesia:transaction(fun () ->
+ case mnesia:wread({?TABLE, node()}) of
+ [] ->
+ Row = #node_maintenance_state{
+ node = node(),
+ status = Status
+ },
+ mnesia:write(?TABLE, Row, write);
+ [Row0] ->
+ Row = Row0#node_maintenance_state{
+ node = node(),
+ status = Status
+ },
+ mnesia:write(?TABLE, Row, write)
+ end
+ end),
+ case Res of
+ {atomic, ok} -> true;
+ _ -> false
+ end.
+
+
+-spec is_being_drained_local_read(node()) -> boolean().
+is_being_drained_local_read(Node) ->
+ Status = status_local_read(Node),
+ Status =:= ?DRAINING_STATUS.
+
+-spec is_being_drained_consistent_read(node()) -> boolean().
+is_being_drained_consistent_read(Node) ->
+ Status = status_consistent_read(Node),
+ Status =:= ?DRAINING_STATUS.
+
+-spec status_local_read(node()) -> maintenance_status().
+status_local_read(Node) ->
+ case mnesia:dirty_read(?TABLE, Node) of
+ [] -> ?DEFAULT_STATUS;
+ [#node_maintenance_state{node = Node, status = Status}] ->
+ Status;
+ _ -> ?DEFAULT_STATUS
+ end.
+
+-spec status_consistent_read(node()) -> maintenance_status().
+status_consistent_read(Node) ->
+ case mnesia:transaction(fun() -> mnesia:read(?TABLE, Node) end) of
+ {atomic, []} -> ?DEFAULT_STATUS;
+ {atomic, [#node_maintenance_state{node = Node, status = Status}]} ->
+ Status;
+ {atomic, _} -> ?DEFAULT_STATUS;
+ {aborted, _Reason} -> ?DEFAULT_STATUS
+ end.
+
+-spec filter_out_drained_nodes_local_read([node()]) -> [node()].
+filter_out_drained_nodes_local_read(Nodes) ->
+ lists:filter(fun(N) -> not is_being_drained_local_read(N) end, Nodes).
+
+-spec filter_out_drained_nodes_consistent_read([node()]) -> [node()].
+filter_out_drained_nodes_consistent_read(Nodes) ->
+ lists:filter(fun(N) -> not is_being_drained_consistent_read(N) end, Nodes).
+
+-spec suspend_all_client_listeners() -> rabbit_types:ok_or_error(any()).
+%% Pauses all listeners on the current node except for
+%% Erlang distribution (clustering and CLI tools).
+%% A suspended listener will not accept any new client connections
+%% but previously established connections won't be interrupted.
+suspend_all_client_listeners() ->
+ Listeners = rabbit_networking:node_client_listeners(node()),
+ rabbit_log:info("Asked to suspend ~b client connection listeners. "
+ "No new client connections will be accepted until these listeners are resumed!", [length(Listeners)]),
+ Results = lists:foldl(local_listener_fold_fun(fun ranch:suspend_listener/1), [], Listeners),
+ lists:foldl(fun ok_or_first_error/2, ok, Results).
+
+-spec resume_all_client_listeners() -> rabbit_types:ok_or_error(any()).
+%% Resumes all listeners on the current node except for
+%% Erlang distribution (clustering and CLI tools).
+%% A resumed listener will accept new client connections.
+resume_all_client_listeners() ->
+ Listeners = rabbit_networking:node_client_listeners(node()),
+ rabbit_log:info("Asked to resume ~b client connection listeners. "
+ "New client connections will be accepted from now on", [length(Listeners)]),
+ Results = lists:foldl(local_listener_fold_fun(fun ranch:resume_listener/1), [], Listeners),
+ lists:foldl(fun ok_or_first_error/2, ok, Results).
+
+-spec close_all_client_connections() -> {'ok', non_neg_integer()}.
+close_all_client_connections() ->
+ Pids = rabbit_networking:local_connections(),
+ rabbit_networking:close_connections(Pids, "Node was put into maintenance mode"),
+ {ok, length(Pids)}.
+
+-spec transfer_leadership_of_quorum_queues([node()]) -> ok.
+transfer_leadership_of_quorum_queues([]) ->
+ rabbit_log:warning("Skipping leadership transfer of quorum queues: no candidate "
+ "(online, not under maintenance) nodes to transfer to!");
+transfer_leadership_of_quorum_queues(_TransferCandidates) ->
+ %% we only transfer leadership for QQs that have local leaders
+ Queues = rabbit_amqqueue:list_local_leaders(),
+ rabbit_log:info("Will transfer leadership of ~b quorum queues with current leader on this node",
+ [length(Queues)]),
+ [begin
+ Name = amqqueue:get_name(Q),
+ rabbit_log:debug("Will trigger a leader election for local quorum queue ~s",
+ [rabbit_misc:rs(Name)]),
+ %% we trigger an election and exclude this node from the list of candidates
+ %% by simply shutting its local QQ replica (Ra server)
+ RaLeader = amqqueue:get_pid(Q),
+ rabbit_log:debug("Will stop Ra server ~p", [RaLeader]),
+ case ra:stop_server(RaLeader) of
+ ok ->
+ rabbit_log:debug("Successfully stopped Ra server ~p", [RaLeader]);
+ {error, nodedown} ->
+                rabbit_log:error("Failed to stop Ra server ~p: target node was reported as down", [RaLeader])
+ end
+ end || Q <- Queues],
+ rabbit_log:info("Leadership transfer for quorum queues hosted on this node has been initiated").
+
+-spec transfer_leadership_of_classic_mirrored_queues([node()]) -> ok.
+transfer_leadership_of_classic_mirrored_queues([]) ->
+ rabbit_log:warning("Skipping leadership transfer of classic mirrored queues: no candidate "
+ "(online, not under maintenance) nodes to transfer to!");
+transfer_leadership_of_classic_mirrored_queues(TransferCandidates) ->
+ Queues = rabbit_amqqueue:list_local_mirrored_classic_queues(),
+ ReadableCandidates = readable_candidate_list(TransferCandidates),
+ rabbit_log:info("Will transfer leadership of ~b classic mirrored queues hosted on this node to these peer nodes: ~s",
+ [length(Queues), ReadableCandidates]),
+
+ [begin
+ Name = amqqueue:get_name(Q),
+ case random_primary_replica_transfer_candidate_node(TransferCandidates) of
+ {ok, Pick} ->
+ rabbit_log:debug("Will transfer leadership of local queue ~s to node ~s",
+ [rabbit_misc:rs(Name), Pick]),
+ case rabbit_mirror_queue_misc:transfer_leadership(Q, Pick) of
+ {migrated, _} ->
+ rabbit_log:debug("Successfully transferred leadership of queue ~s to node ~s",
+ [rabbit_misc:rs(Name), Pick]);
+ Other ->
+ rabbit_log:warning("Could not transfer leadership of queue ~s to node ~s: ~p",
+ [rabbit_misc:rs(Name), Pick, Other])
+ end;
+ undefined ->
+ rabbit_log:warning("Could not transfer leadership of queue ~s: no suitable candidates?",
+ [Name])
+ end
+ end || Q <- Queues],
+ rabbit_log:info("Leadership transfer for local classic mirrored queues is complete").
+
+-spec primary_replica_transfer_candidate_nodes() -> [node()].
+primary_replica_transfer_candidate_nodes() ->
+ filter_out_drained_nodes_consistent_read(rabbit_nodes:all_running() -- [node()]).
+
+-spec random_primary_replica_transfer_candidate_node([node()]) -> {ok, node()} | undefined.
+random_primary_replica_transfer_candidate_node([]) ->
+ undefined;
+random_primary_replica_transfer_candidate_node(Candidates) ->
+ Nth = erlang:phash2(erlang:monotonic_time(), length(Candidates)),
+ Candidate = lists:nth(Nth + 1, Candidates),
+ {ok, Candidate}.
+
+revive_local_quorum_queue_replicas() ->
+ Queues = rabbit_amqqueue:list_local_followers(),
+ [begin
+ Name = amqqueue:get_name(Q),
+ rabbit_log:debug("Will trigger a leader election for local quorum queue ~s",
+ [rabbit_misc:rs(Name)]),
+ %% start local QQ replica (Ra server) of this queue
+ {Prefix, _Node} = amqqueue:get_pid(Q),
+ RaServer = {Prefix, node()},
+ rabbit_log:debug("Will start Ra server ~p", [RaServer]),
+ case ra:restart_server(RaServer) of
+ ok ->
+ rabbit_log:debug("Successfully restarted Ra server ~p", [RaServer]);
+ {error, {already_started, _Pid}} ->
+ rabbit_log:debug("Ra server ~p is already running", [RaServer]);
+ {error, nodedown} ->
+            rabbit_log:error("Failed to restart Ra server ~p: target node was reported as down", [RaServer])
+ end
+ end || Q <- Queues],
+ rabbit_log:info("Restart of local quorum queue replicas is complete").
+
+%%
+%% Implementation
+%%
+
+local_listener_fold_fun(Fun) ->
+ fun(#listener{node = Node, ip_address = Addr, port = Port}, Acc) when Node =:= node() ->
+ RanchRef = rabbit_networking:ranch_ref(Addr, Port),
+ [Fun(RanchRef) | Acc];
+ (_, Acc) ->
+ Acc
+ end.
+
+ok_or_first_error(ok, Acc) ->
+ Acc;
+ok_or_first_error({error, _} = Err, _Acc) ->
+ Err.
+
+readable_candidate_list(Nodes) ->
+ string:join(lists:map(fun rabbit_data_coercion:to_list/1, Nodes), ", ").
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index ba87617a74..070c6a8205 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -571,7 +571,7 @@ init_db(ClusterNodes, NodeType, CheckOtherNodes) ->
%% Subsequent node in cluster, catch up
maybe_force_load(),
ok = rabbit_table:wait_for_replicated(_Retry = true),
- ok = rabbit_table:create_local_copy(NodeType)
+ ok = rabbit_table:ensure_local_copies(NodeType)
end,
ensure_feature_flags_are_in_sync(Nodes, NodeIsVirgin),
ensure_schema_integrity(),
@@ -824,10 +824,17 @@ schema_ok_or_move() ->
%% up only
create_schema() ->
stop_mnesia(),
+ rabbit_log:debug("Will bootstrap a schema database..."),
rabbit_misc:ensure_ok(mnesia:create_schema([node()]), cannot_create_schema),
+    rabbit_log:debug("Bootstrapped a schema database successfully"),
start_mnesia(),
+
+ rabbit_log:debug("Will create schema database tables"),
ok = rabbit_table:create(),
+ rabbit_log:debug("Created schema database tables successfully"),
+ rabbit_log:debug("Will check schema database integrity..."),
ensure_schema_integrity(),
+ rabbit_log:debug("Schema database schema integrity check passed"),
ok = rabbit_version:record_desired().
move_db() ->
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index d31c1e825b..966601d222 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -21,13 +21,16 @@
-export([boot/0, start_tcp_listener/2, start_ssl_listener/3,
stop_tcp_listener/1, on_node_down/1, active_listeners/0,
- node_listeners/1, register_connection/1, unregister_connection/1,
+ node_listeners/1, node_client_listeners/1,
+ register_connection/1, unregister_connection/1,
connections/0, connection_info_keys/0,
connection_info/1, connection_info/2,
connection_info_all/0, connection_info_all/1,
emit_connection_info_all/4, emit_connection_info_local/3,
- close_connection/2, force_connection_event_refresh/1,
- handshake/2, tcp_host/1]).
+ close_connection/2, close_connections/2,
+ force_connection_event_refresh/1, handshake/2, tcp_host/1,
+ ranch_ref/1, ranch_ref/2, ranch_ref_of_protocol/1,
+ listener_of_protocol/1, stop_ranch_listener_of_protocol/1]).
%% Used by TCP-based transports, e.g. STOMP adapter
-export([tcp_listener_addresses/1, tcp_listener_spec/9,
@@ -37,8 +40,11 @@
-deprecated([{force_connection_event_refresh, 1, eventually}]).
-%% Internal
--export([connections_local/0]).
+-export([
+ local_connections/0,
+ %% prefer local_connections/0
+ connections_local/0
+]).
-include("rabbit.hrl").
@@ -179,13 +185,63 @@ tcp_listener_addresses_auto(Port) ->
tcp_listener_spec(NamePrefix, {IPAddress, Port, Family}, SocketOpts,
Transport, ProtoSup, ProtoOpts, Protocol, NumAcceptors, Label) ->
- {rabbit_misc:tcp_name(NamePrefix, IPAddress, Port),
- {tcp_listener_sup, start_link,
- [IPAddress, Port, Transport, [Family | SocketOpts], ProtoSup, ProtoOpts,
- {?MODULE, tcp_listener_started, [Protocol, SocketOpts]},
- {?MODULE, tcp_listener_stopped, [Protocol, SocketOpts]},
- NumAcceptors, Label]},
- transient, infinity, supervisor, [tcp_listener_sup]}.
+ Args = [IPAddress, Port, Transport, [Family | SocketOpts], ProtoSup, ProtoOpts,
+ {?MODULE, tcp_listener_started, [Protocol, SocketOpts]},
+ {?MODULE, tcp_listener_stopped, [Protocol, SocketOpts]},
+ NumAcceptors, Label],
+ #{
+ id => rabbit_misc:tcp_name(NamePrefix, IPAddress, Port),
+ start => {tcp_listener_sup, start_link, Args},
+ restart => transient,
+ shutdown => infinity,
+ type => supervisor,
+ modules => [tcp_listener_sup]
+ }.
+
+-spec ranch_ref(#listener{} | [{atom(), any()}] | 'undefined') -> ranch:ref() | undefined.
+ranch_ref(#listener{port = Port}) ->
+ [{IPAddress, Port, _Family} | _] = tcp_listener_addresses(Port),
+ {acceptor, IPAddress, Port};
+ranch_ref(Listener) when is_list(Listener) ->
+ Port = rabbit_misc:pget(port, Listener),
+ [{IPAddress, Port, _Family} | _] = tcp_listener_addresses(Port),
+ {acceptor, IPAddress, Port};
+ranch_ref(undefined) ->
+ undefined.
+
+-spec ranch_ref(inet:ip_address(), ip_port()) -> ranch:ref().
+
+%% Returns a reference that identifies a TCP listener in Ranch.
+ranch_ref(IPAddress, Port) ->
+ {acceptor, IPAddress, Port}.
+
+-spec ranch_ref_of_protocol(atom()) -> ranch:ref() | undefined.
+ranch_ref_of_protocol(Protocol) ->
+ ranch_ref(listener_of_protocol(Protocol)).
+
+-spec listener_of_protocol(atom()) -> #listener{}.
+listener_of_protocol(Protocol) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ MatchSpec = #listener{
+ node = node(),
+ protocol = Protocol,
+ _ = '_'
+ },
+ case mnesia:match_object(rabbit_listener, MatchSpec, read) of
+ [] -> undefined;
+ [Row] -> Row
+ end
+ end).
+
+-spec stop_ranch_listener_of_protocol(atom()) -> ok | {error, not_found}.
+stop_ranch_listener_of_protocol(Protocol) ->
+ case rabbit_networking:ranch_ref_of_protocol(Protocol) of
+ undefined -> ok;
+ Ref ->
+ rabbit_log:debug("Stopping Ranch listener for protocol ~s", [Protocol]),
+ ranch:stop_listener(Ref)
+ end.
-spec start_tcp_listener(
listener_config(), integer()) -> 'ok' | {'error', term()}.
@@ -297,6 +353,17 @@ active_listeners() ->
node_listeners(Node) ->
mnesia:dirty_read(rabbit_listener, Node).
+-spec node_client_listeners(node()) -> [rabbit_types:listener()].
+
+node_client_listeners(Node) ->
+ case node_listeners(Node) of
+ [] -> [];
+ Xs ->
+ lists:filter(fun (#listener{protocol = clustering}) -> false;
+ (_) -> true
+ end, Xs)
+ end.
+
-spec on_node_down(node()) -> 'ok'.
on_node_down(Node) ->
@@ -324,8 +391,13 @@ connections() ->
rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:cluster_nodes(running),
rabbit_networking, connections_local, []).
--spec connections_local() -> [rabbit_types:connection()].
+-spec local_connections() -> [rabbit_types:connection()].
+%% @doc Returns pids of AMQP 0-9-1 and AMQP 1.0 connections local to this node.
+local_connections() ->
+ connections_local().
+-spec connections_local() -> [rabbit_types:connection()].
+%% @deprecated Prefer {@link local_connections}
connections_local() -> pg_local:get_members(rabbit_connections).
-spec connection_info_keys() -> rabbit_types:info_keys().
@@ -375,8 +447,12 @@ close_connection(Pid, Explanation) ->
ok
end.
--spec force_connection_event_refresh(reference()) -> 'ok'.
+-spec close_connections([pid()], string()) -> 'ok'.
+close_connections(Pids, Explanation) ->
+ [close_connection(Pid, Explanation) || Pid <- Pids],
+ ok.
+-spec force_connection_event_refresh(reference()) -> 'ok'.
force_connection_event_refresh(Ref) ->
[rabbit_reader:force_event_refresh(C, Ref) || C <- connections()],
ok.
diff --git a/src/rabbit_queue_location_client_local.erl b/src/rabbit_queue_location_client_local.erl
index 3eb3d24217..2df1608534 100644
--- a/src/rabbit_queue_location_client_local.erl
+++ b/src/rabbit_queue_location_client_local.erl
@@ -30,4 +30,10 @@ description() ->
[{description, <<"Locate queue master node as the client local node">>}].
queue_master_location(Q) when ?is_amqqueue(Q) ->
+ %% unlike with other locator strategies we do not check node maintenance
+ %% status for two reasons:
+ %%
+ %% * nodes in maintenance mode will drop their client connections
+ %% * with other strategies, if no nodes are available, the current node
+ %% is returned but this strategy already does just that
{ok, node()}.
diff --git a/src/rabbit_queue_location_min_masters.erl b/src/rabbit_queue_location_min_masters.erl
index 94002cf580..6535f082fe 100644
--- a/src/rabbit_queue_location_min_masters.erl
+++ b/src/rabbit_queue_location_min_masters.erl
@@ -32,7 +32,7 @@ description() ->
queue_master_location(Q) when ?is_amqqueue(Q) ->
Cluster = rabbit_queue_master_location_misc:all_nodes(Q),
QueueNames = rabbit_amqqueue:list_names(),
- MastersPerNode = lists:foldl(
+ MastersPerNode0 = lists:foldl(
fun(#resource{virtual_host = VHost, name = QueueName}, NodeMasters) ->
case rabbit_queue_master_location_misc:lookup_master(QueueName, VHost) of
{ok, Master} when is_atom(Master) ->
@@ -47,16 +47,24 @@ queue_master_location(Q) when ?is_amqqueue(Q) ->
end,
maps:from_list([{N, 0} || N <- Cluster]),
QueueNames),
+
+ MastersPerNode = maps:filter(fun (Node, _N) ->
+ not rabbit_maintenance:is_being_drained_local_read(Node)
+ end, MastersPerNode0),
- {MinNode, _NMasters} = maps:fold(
- fun(Node, NMasters, init) ->
- {Node, NMasters};
- (Node, NMasters, {MinNode, MinMasters}) ->
- case NMasters < MinMasters of
- true -> {Node, NMasters};
- false -> {MinNode, MinMasters}
- end
- end,
- init,
- MastersPerNode),
- {ok, MinNode}.
+ case map_size(MastersPerNode) > 0 of
+ true ->
+ {MinNode, _NMasters} = maps:fold(
+ fun(Node, NMasters, init) ->
+ {Node, NMasters};
+ (Node, NMasters, {MinNode, MinMasters}) ->
+ case NMasters < MinMasters of
+ true -> {Node, NMasters};
+ false -> {MinNode, MinMasters}
+ end
+ end,
+ init, MastersPerNode),
+ {ok, MinNode};
+ false ->
+ undefined
+ end.
diff --git a/src/rabbit_queue_location_random.erl b/src/rabbit_queue_location_random.erl
index 9f2445ceac..7232fc6703 100644
--- a/src/rabbit_queue_location_random.erl
+++ b/src/rabbit_queue_location_random.erl
@@ -30,7 +30,13 @@ description() ->
<<"Locate queue master node from cluster in a random manner">>}].
queue_master_location(Q) when ?is_amqqueue(Q) ->
- Cluster = rabbit_queue_master_location_misc:all_nodes(Q),
- RandomPos = erlang:phash2(erlang:monotonic_time(), length(Cluster)),
- MasterNode = lists:nth(RandomPos + 1, Cluster),
- {ok, MasterNode}.
+ Cluster0 = rabbit_queue_master_location_misc:all_nodes(Q),
+ Cluster = rabbit_maintenance:filter_out_drained_nodes_local_read(Cluster0),
+ case Cluster of
+ [] ->
+ undefined;
+ Candidates when is_list(Candidates) ->
+ RandomPos = erlang:phash2(erlang:monotonic_time(), length(Candidates)),
+ MasterNode = lists:nth(RandomPos + 1, Candidates),
+ {ok, MasterNode}
+ end.
diff --git a/src/rabbit_queue_master_location_misc.erl b/src/rabbit_queue_master_location_misc.erl
index 7b65ef0d45..37698e184f 100644
--- a/src/rabbit_queue_master_location_misc.erl
+++ b/src/rabbit_queue_master_location_misc.erl
@@ -18,6 +18,7 @@
get_location_mod_by_policy/1,
all_nodes/1]).
+-spec lookup_master(binary(), binary()) -> {ok, node()} | {error, not_found}.
lookup_master(QueueNameBin, VHostPath) when is_binary(QueueNameBin),
is_binary(VHostPath) ->
QueueR = rabbit_misc:r(VHostPath, queue, QueueNameBin),
diff --git a/src/rabbit_table.erl b/src/rabbit_table.erl
index 3a7d2dd1bf..05f9a8d381 100644
--- a/src/rabbit_table.erl
+++ b/src/rabbit_table.erl
@@ -7,10 +7,12 @@
-module(rabbit_table).
--export([create/0, create_local_copy/1, wait_for_replicated/1, wait/1, wait/2,
- force_load/0, is_present/0, is_empty/0, needs_default_data/0,
- check_schema_integrity/1, clear_ram_only_tables/0, retry_timeout/0,
- wait_for_replicated/0]).
+-export([
+ create/0, create/2, ensure_local_copies/1, ensure_table_copy/2,
+ wait_for_replicated/1, wait/1, wait/2,
+ force_load/0, is_present/0, is_empty/0, needs_default_data/0,
+ check_schema_integrity/1, clear_ram_only_tables/0, retry_timeout/0,
+ wait_for_replicated/0, exists/1]).
%% for testing purposes
-export([definitions/0]).
@@ -28,18 +30,29 @@
-spec create() -> 'ok'.
create() ->
- lists:foreach(fun ({Tab, TabDef}) ->
- TabDef1 = proplists:delete(match, TabDef),
- case mnesia:create_table(Tab, TabDef1) of
- {atomic, ok} -> ok;
- {aborted, Reason} ->
- throw({error, {table_creation_failed,
- Tab, TabDef1, Reason}})
- end
- end, definitions()),
+ lists:foreach(
+ fun ({Table, Def}) -> create(Table, Def) end,
+ definitions()),
ensure_secondary_indexes(),
ok.
+-spec create(mnesia:table(), list()) -> rabbit_types:ok_or_error(any()).
+
+create(TableName, TableDefinition) ->
+ TableDefinition1 = proplists:delete(match, TableDefinition),
+ rabbit_log:debug("Will create a schema database table '~s'", [TableName]),
+ case mnesia:create_table(TableName, TableDefinition1) of
+ {atomic, ok} -> ok;
+ {aborted,{already_exists, TableName}} -> ok;
+ {aborted, {already_exists, TableName, _}} -> ok;
+ {aborted, Reason} ->
+ throw({error, {table_creation_failed, TableName, TableDefinition1, Reason}})
+ end.
+
+-spec exists(mnesia:table()) -> boolean().
+exists(Table) ->
+ lists:member(Table, mnesia:system_info(tables)).
+
%% Sets up secondary indexes in a blank node database.
ensure_secondary_indexes() ->
ensure_secondary_index(rabbit_queue, vhost),
@@ -51,19 +64,15 @@ ensure_secondary_index(Table, Field) ->
{aborted, {already_exists, Table, _}} -> ok
end.
-%% The sequence in which we delete the schema and then the other
-%% tables is important: if we delete the schema first when moving to
-%% RAM mnesia will loudly complain since it doesn't make much sense to
-%% do that. But when moving to disc, we need to move the schema first.
-
--spec create_local_copy('disc' | 'ram') -> 'ok'.
-
-create_local_copy(disc) ->
- create_local_copy(schema, disc_copies),
- create_local_copies(disc);
-create_local_copy(ram) ->
- create_local_copies(ram),
- create_local_copy(schema, ram_copies).
+-spec ensure_table_copy(mnesia:table(), node()) -> ok | {error, any()}.
+ensure_table_copy(TableName, Node) ->
+ rabbit_log:debug("Will add a local schema database copy for table '~s'", [TableName]),
+ case mnesia:add_table_copy(TableName, Node, disc_copies) of
+ {atomic, ok} -> ok;
+ {aborted,{already_exists, TableName}} -> ok;
+ {aborted, {already_exists, TableName, _}} -> ok;
+ {aborted, Reason} -> {error, Reason}
+ end.
%% This arity only exists for backwards compatibility with certain
%% plugins. See https://github.com/rabbitmq/rabbitmq-clusterer/issues/19.
@@ -180,6 +189,20 @@ clear_ram_only_tables() ->
end, names()),
ok.
+%% The sequence in which we delete the schema and then the other
+%% tables is important: if we delete the schema first when moving to
+%% RAM mnesia will loudly complain since it doesn't make much sense to
+%% do that. But when moving to disc, we need to move the schema first.
+
+-spec ensure_local_copies('disc' | 'ram') -> 'ok'.
+
+ensure_local_copies(disc) ->
+ create_local_copy(schema, disc_copies),
+ create_local_copies(disc);
+ensure_local_copies(ram) ->
+ create_local_copies(ram),
+ create_local_copy(schema, ram_copies).
+
%%--------------------------------------------------------------------
%% Internal helpers
%%--------------------------------------------------------------------
@@ -363,7 +386,8 @@ definitions() ->
{rabbit_queue,
[{record_name, amqqueue},
{attributes, amqqueue:fields()},
- {match, amqqueue:pattern_match_on_name(queue_name_match())}]}]
+ {match, amqqueue:pattern_match_on_name(queue_name_match())}]}
+ ]
++ gm:table_definitions()
++ mirrored_supervisor:table_definitions().
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index e554e46821..59417c72bb 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -48,7 +48,7 @@
-rabbit_upgrade({user_password_hashing, mnesia, [hash_passwords]}).
-rabbit_upgrade({operator_policies, mnesia, [slave_pids_pending_shutdown, internal_system_x]}).
-rabbit_upgrade({vhost_limits, mnesia, []}).
--rabbit_upgrade({queue_vhost_field, mnesia, [operator_policies]}).
+-rabbit_upgrade({queue_vhost_field, mnesia, [operator_policies]}).
-rabbit_upgrade({topic_permission, mnesia, []}).
-rabbit_upgrade({queue_options, mnesia, [queue_vhost_field]}).
-rabbit_upgrade({exchange_options, mnesia, [operator_policies]}).
@@ -610,6 +610,7 @@ user_password_hashing() ->
end,
[username, password_hash, tags, hashing_algorithm]).
+-spec topic_permission() -> 'ok'.
topic_permission() ->
create(rabbit_topic_permission,
[{record_name, topic_permission},
@@ -647,6 +648,7 @@ transform(TableName, Fun, FieldList, NewRecordName) ->
ok.
create(Tab, TabDef) ->
+ rabbit_log:debug("Will create a schema table named '~s'", [Tab]),
{atomic, ok} = mnesia:create_table(Tab, TabDef),
ok.
diff --git a/src/tcp_listener_sup.erl b/src/tcp_listener_sup.erl
index b0490a0f8d..76ff814ff4 100644
--- a/src/tcp_listener_sup.erl
+++ b/src/tcp_listener_sup.erl
@@ -17,11 +17,8 @@
-behaviour(supervisor).
-export([start_link/10]).
-
-export([init/1]).
-%%----------------------------------------------------------------------------
-
-type mfargs() :: {atom(), atom(), [any()]}.
-spec start_link
@@ -48,11 +45,16 @@ init({IPAddress, Port, Transport, SocketOpts, ProtoSup, ProtoOpts, OnStartup, On
{port, Port} |
SocketOpts]
},
- {ok, {{one_for_all, 10, 10}, [
- ranch:child_spec({acceptor, IPAddress, Port},
- Transport, RanchListenerOpts,
- ProtoSup, ProtoOpts),
- {tcp_listener, {tcp_listener, start_link,
- [IPAddress, Port,
- OnStartup, OnShutdown, Label]},
- transient, 16#ffffffff, worker, [tcp_listener]}]}}.
+ Flags = #{strategy => one_for_all, intensity => 10, period => 10},
+ OurChildSpec = #{
+ id => tcp_listener,
+ start => {tcp_listener, start_link, [IPAddress, Port, OnStartup, OnShutdown, Label]},
+ restart => transient,
+ shutdown => 16#ffffffff,
+ type => worker,
+ modules => [tcp_listener]
+ },
+ RanchChildSpec = ranch:child_spec(rabbit_networking:ranch_ref(IPAddress, Port),
+ Transport, RanchListenerOpts,
+ ProtoSup, ProtoOpts),
+ {ok, {Flags, [RanchChildSpec, OurChildSpec]}}.
diff --git a/test/maintenance_mode_SUITE.erl b/test/maintenance_mode_SUITE.erl
new file mode 100644
index 0000000000..6ec3f61ad5
--- /dev/null
+++ b/test/maintenance_mode_SUITE.erl
@@ -0,0 +1,227 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(maintenance_mode_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-compile(export_all).
+
+all() ->
+ [
+ {group, cluster_size_3}
+ ].
+
+groups() ->
+ [
+ {cluster_size_3, [], [
+ maintenance_mode_status,
+ listener_suspension_status,
+ client_connection_closure,
+ classic_mirrored_queue_leadership_transfer,
+ quorum_queue_leadership_transfer
+ ]}
+ ].
+
+%% -------------------------------------------------------------------
+%% Setup and teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_group(cluster_size_3, Config) ->
+ rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodes_count, 3}
+ ]).
+
+end_per_group(_, Config) ->
+ Config.
+
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase),
+ ClusterSize = ?config(rmq_nodes_count, Config),
+ TestNumber = rabbit_ct_helpers:testcase_number(Config, ?MODULE, Testcase),
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodes_clustered, true},
+ {rmq_nodename_suffix, Testcase},
+ {tcp_ports_base, {skip_n_nodes, TestNumber * ClusterSize}}
+ ]),
+ rabbit_ct_helpers:run_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps() ++ [
+ fun rabbit_ct_broker_helpers:set_ha_policy_all/1
+ ]).
+
+end_per_testcase(Testcase, Config) ->
+ Config1 = rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()),
+ rabbit_ct_helpers:testcase_finished(Config1, Testcase).
+
+%% -------------------------------------------------------------------
+%% Test Cases
+%% -------------------------------------------------------------------
+
+maintenance_mode_status(Config) ->
+ Nodes = [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ [begin
+ ?assertNot(rabbit_ct_broker_helpers:is_being_drained_local_read(Config, Node)),
+ ?assertNot(rabbit_ct_broker_helpers:is_being_drained_consistent_read(Config, Node))
+ end || Node <- Nodes],
+
+ [begin
+ [begin
+ ?assertNot(rabbit_ct_broker_helpers:is_being_drained_consistent_read(Config, TargetNode, NodeToCheck))
+ end || NodeToCheck <- Nodes]
+ end || TargetNode <- Nodes],
+
+ rabbit_ct_broker_helpers:mark_as_being_drained(Config, B),
+ rabbit_ct_helpers:await_condition(
+ fun () -> rabbit_ct_broker_helpers:is_being_drained_local_read(Config, B) end,
+ 10000),
+
+ [begin
+ ?assert(rabbit_ct_broker_helpers:is_being_drained_consistent_read(Config, TargetNode, B))
+ end || TargetNode <- Nodes],
+
+ ?assertEqual(
+ lists:usort([A, C]),
+ lists:usort(rabbit_ct_broker_helpers:rpc(Config, B,
+ rabbit_maintenance, primary_replica_transfer_candidate_nodes, []))),
+
+ rabbit_ct_broker_helpers:unmark_as_being_drained(Config, B),
+ rabbit_ct_helpers:await_condition(
+ fun () -> not rabbit_ct_broker_helpers:is_being_drained_local_read(Config, B) end,
+ 10000),
+
+ [begin
+ ?assertNot(rabbit_ct_broker_helpers:is_being_drained_local_read(Config, TargetNode, B)),
+ ?assertNot(rabbit_ct_broker_helpers:is_being_drained_consistent_read(Config, TargetNode, B))
+ end || TargetNode <- Nodes],
+
+ ?assertEqual(
+ lists:usort([A, C]),
+ lists:usort(rabbit_ct_broker_helpers:rpc(Config, B,
+ rabbit_maintenance, primary_replica_transfer_candidate_nodes, []))),
+
+ ok.
+
+
+listener_suspension_status(Config) ->
+ Nodes = [A | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ ct:pal("Picked node ~s for maintenance tests...", [A]),
+
+ rabbit_ct_helpers:await_condition(
+ fun () -> not rabbit_ct_broker_helpers:is_being_drained_local_read(Config, A) end, 10000),
+
+ [begin
+ ?assertNot(rabbit_ct_broker_helpers:is_being_drained_consistent_read(Config, Node))
+ end || Node <- Nodes],
+
+ Conn1 = rabbit_ct_client_helpers:open_connection(Config, A),
+ ?assert(is_pid(Conn1)),
+ rabbit_ct_client_helpers:close_connection(Conn1),
+
+ rabbit_ct_broker_helpers:drain_node(Config, A),
+ rabbit_ct_helpers:await_condition(
+ fun () -> rabbit_ct_broker_helpers:is_being_drained_local_read(Config, A) end, 10000),
+
+ ?assertEqual({error, econnrefused}, rabbit_ct_client_helpers:open_unmanaged_connection(Config, A)),
+
+ rabbit_ct_broker_helpers:revive_node(Config, A),
+ rabbit_ct_helpers:await_condition(
+ fun () -> not rabbit_ct_broker_helpers:is_being_drained_local_read(Config, A) end, 10000),
+
+ Conn3 = rabbit_ct_client_helpers:open_connection(Config, A),
+ ?assert(is_pid(Conn3)),
+ rabbit_ct_client_helpers:close_connection(Conn3),
+
+ ok.
+
+
+client_connection_closure(Config) ->
+ [A | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ ct:pal("Picked node ~s for maintenance tests...", [A]),
+
+ rabbit_ct_helpers:await_condition(
+ fun () -> not rabbit_ct_broker_helpers:is_being_drained_local_read(Config, A) end, 10000),
+
+ Conn1 = rabbit_ct_client_helpers:open_connection(Config, A),
+ ?assert(is_pid(Conn1)),
+ ?assertEqual(1, length(rabbit_ct_broker_helpers:rpc(Config, A, rabbit_networking, local_connections, []))),
+
+ rabbit_ct_broker_helpers:drain_node(Config, A),
+ ?assertEqual(0, length(rabbit_ct_broker_helpers:rpc(Config, A, rabbit_networking, local_connections, []))),
+
+ rabbit_ct_broker_helpers:revive_node(Config, A).
+
+
+classic_mirrored_queue_leadership_transfer(Config) ->
+ [A | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ ct:pal("Picked node ~s for maintenance tests...", [A]),
+
+ rabbit_ct_helpers:await_condition(
+ fun () -> not rabbit_ct_broker_helpers:is_being_drained_local_read(Config, A) end, 10000),
+
+ PolicyPattern = <<"^cq.mirrored">>,
+ rabbit_ct_broker_helpers:set_ha_policy(Config, A, PolicyPattern, <<"all">>),
+
+ Conn = rabbit_ct_client_helpers:open_connection(Config, A),
+ {ok, Ch} = amqp_connection:open_channel(Conn),
+ QName = <<"cq.mirrored.1">>,
+ amqp_channel:call(Ch, #'queue.declare'{queue = QName, durable = true}),
+
+ ?assertEqual(1, length(rabbit_ct_broker_helpers:rpc(Config, A, rabbit_amqqueue, list_local, [<<"/">>]))),
+
+ rabbit_ct_broker_helpers:drain_node(Config, A),
+ rabbit_ct_helpers:await_condition(
+ fun () -> rabbit_ct_broker_helpers:is_being_drained_local_read(Config, A) end, 10000),
+
+ ?assertEqual(0, length(rabbit_ct_broker_helpers:rpc(Config, A, rabbit_amqqueue, list_local, [<<"/">>]))),
+
+ rabbit_ct_broker_helpers:revive_node(Config, A),
+    %% rabbit_ct_broker_helpers:set_ha_policy/4 uses the pattern as the policy name
+ rabbit_ct_broker_helpers:clear_policy(Config, A, PolicyPattern).
+
+quorum_queue_leadership_transfer(Config) ->
+ [A | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ ct:pal("Picked node ~s for maintenance tests...", [A]),
+
+ rabbit_ct_helpers:await_condition(
+ fun () -> not rabbit_ct_broker_helpers:is_being_drained_local_read(Config, A) end, 10000),
+
+ Conn = rabbit_ct_client_helpers:open_connection(Config, A),
+ {ok, Ch} = amqp_connection:open_channel(Conn),
+ QName = <<"qq.1">>,
+ amqp_channel:call(Ch, #'queue.declare'{queue = QName, durable = true, arguments = [
+ {<<"x-queue-type">>, longstr, <<"quorum">>}
+ ]}),
+
+    %% we cannot assert on the number of local leaders here: declaring a quorum queue on node A
+ %% does not guarantee that the leader will be hosted on node A
+
+ rabbit_ct_broker_helpers:drain_node(Config, A),
+ rabbit_ct_helpers:await_condition(
+ fun () -> rabbit_ct_broker_helpers:is_being_drained_local_read(Config, A) end, 10000),
+
+ %% quorum queue leader election is asynchronous
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ LocalLeaders = rabbit_ct_broker_helpers:rpc(Config, A,
+ rabbit_amqqueue, list_local_leaders, []),
+ length(LocalLeaders) =:= 0
+ end, 20000),
+
+ rabbit_ct_broker_helpers:revive_node(Config, A).
diff --git a/test/queue_master_location_SUITE.erl b/test/queue_master_location_SUITE.erl
index c8ea773682..6a346c5618 100644
--- a/test/queue_master_location_SUITE.erl
+++ b/test/queue_master_location_SUITE.erl
@@ -34,7 +34,8 @@
all() ->
[
- {group, cluster_size_3}
+ {group, cluster_size_3},
+ {group, maintenance_mode}
].
groups() ->
@@ -51,11 +52,19 @@ groups() ->
calculate_min_master_with_bindings,
calculate_random,
calculate_client_local
- ]}
+ ]},
+
+ {maintenance_mode, [], [
+ declare_with_min_masters_and_some_nodes_under_maintenance,
+ declare_with_min_masters_and_all_nodes_under_maintenance,
+
+ declare_with_random_and_some_nodes_under_maintenance,
+ declare_with_random_and_all_nodes_under_maintenance
+ ]}
].
%% -------------------------------------------------------------------
-%% Testsuite setup/teardown.
+%% Test suite setup/teardown
%% -------------------------------------------------------------------
init_per_suite(Config) ->
@@ -67,7 +76,12 @@ end_per_suite(Config) ->
init_per_group(cluster_size_3, Config) ->
rabbit_ct_helpers:set_config(Config, [
- {rmq_nodes_count, 3} %% Replaced with a list of node names later.
+ %% Replaced with a list of node names later
+ {rmq_nodes_count, 3}
+ ]);
+init_per_group(maintenance_mode, Config) ->
+ rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodes_count, 3}
]).
end_per_group(_, Config) ->
@@ -98,7 +112,7 @@ end_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_finished(Config1, Testcase).
%% -------------------------------------------------------------------
-%% Testcases.
+%% Test cases
%% -------------------------------------------------------------------
%%
@@ -199,12 +213,71 @@ declare_config(Config) ->
setup_test_environment(Config),
set_location_config(Config, <<"min-masters">>),
QueueName = rabbit_misc:r(<<"/">>, queue, Q = <<"qm.test">>),
- declare(Config, QueueName, false, false, _Args=[], none),
+ declare(Config, QueueName, false, false, _Args = [], none),
verify_min_master(Config, Q),
unset_location_config(Config),
ok.
%%
+%% Maintenance mode effects
+%%
+
+declare_with_min_masters_and_some_nodes_under_maintenance(Config) ->
+ set_location_policy(Config, ?POLICY, <<"min-masters">>),
+ rabbit_ct_broker_helpers:mark_as_being_drained(Config, 0),
+ rabbit_ct_broker_helpers:mark_as_being_drained(Config, 1),
+
+ QName = <<"qm.tests.min_masters.maintenance.case1">>,
+ Resource = rabbit_misc:r(<<"/">>, queue, QName),
+ Record = declare(Config, Resource, false, false, _Args = [], none),
+ %% the only node that's not being drained
+ ?assertEqual(rabbit_ct_broker_helpers:get_node_config(Config, 2, nodename),
+ node(amqqueue:get_pid(Record))),
+
+ rabbit_ct_broker_helpers:unmark_as_being_drained(Config, 0),
+ rabbit_ct_broker_helpers:unmark_as_being_drained(Config, 1).
+
+declare_with_min_masters_and_all_nodes_under_maintenance(Config) ->
+ declare_with_all_nodes_under_maintenance(Config, <<"min-masters">>).
+
+declare_with_random_and_some_nodes_under_maintenance(Config) ->
+ set_location_policy(Config, ?POLICY, <<"random">>),
+ rabbit_ct_broker_helpers:mark_as_being_drained(Config, 0),
+ rabbit_ct_broker_helpers:mark_as_being_drained(Config, 2),
+
+ QName = <<"qm.tests.random.maintenance.case1">>,
+ Resource = rabbit_misc:r(<<"/">>, queue, QName),
+ Record = declare(Config, Resource, false, false, _Args = [], none),
+ %% the only node that's not being drained
+ ?assertEqual(rabbit_ct_broker_helpers:get_node_config(Config, 1, nodename),
+ node(amqqueue:get_pid(Record))),
+
+ rabbit_ct_broker_helpers:unmark_as_being_drained(Config, 0),
+ rabbit_ct_broker_helpers:unmark_as_being_drained(Config, 2).
+
+declare_with_random_and_all_nodes_under_maintenance(Config) ->
+ declare_with_all_nodes_under_maintenance(Config, <<"random">>).
+
+declare_with_all_nodes_under_maintenance(Config, Locator) ->
+ set_location_policy(Config, ?POLICY, Locator),
+ rabbit_ct_broker_helpers:mark_as_being_drained(Config, 0),
+ rabbit_ct_broker_helpers:mark_as_being_drained(Config, 1),
+ rabbit_ct_broker_helpers:mark_as_being_drained(Config, 2),
+
+ QName = rabbit_data_coercion:to_binary(
+ rabbit_misc:format("qm.tests.~s.maintenance.case2", [Locator])),
+ Resource = rabbit_misc:r(<<"/">>, queue, QName),
+ Record = declare(Config, Resource, false, false, _Args = [], none),
+ %% when queue master locator returns no node, the node that handles
+ %% the declaration method will be used as a fallback
+ ?assertEqual(rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ node(amqqueue:get_pid(Record))),
+
+ rabbit_ct_broker_helpers:unmark_as_being_drained(Config, 0),
+ rabbit_ct_broker_helpers:unmark_as_being_drained(Config, 1),
+ rabbit_ct_broker_helpers:unmark_as_being_drained(Config, 2).
+
+%%
%% Test 'calculations'
%%
@@ -333,8 +406,10 @@ unset_location_config(Config) ->
declare(Config, QueueName, Durable, AutoDelete, Args0, Owner) ->
Args1 = [QueueName, Durable, AutoDelete, Args0, Owner, <<"acting-user">>],
- {new, Queue} = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, declare, Args1),
- Queue.
+ case rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, declare, Args1) of
+ {new, Queue} -> Queue;
+ Other -> Other
+ end.
bind(Config, QueueName, RoutingKey) ->
ExchangeName = rabbit_misc:r(QueueName, exchange, <<"amq.direct">>),