summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue.erl1
-rw-r--r--src/rabbit_maintenance.erl11
-rw-r--r--src/rabbit_queue_location_client_local.erl6
-rw-r--r--src/rabbit_queue_location_min_masters.erl34
-rw-r--r--src/rabbit_queue_location_random.erl14
-rw-r--r--src/rabbit_queue_master_location_misc.erl1
-rw-r--r--test/queue_master_location_SUITE.erl90
7 files changed, 130 insertions, 27 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 8855c8bbf7..532f8894d0 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -313,6 +313,7 @@ declare_classic_queue(Q, Node) ->
_ ->
case rabbit_queue_master_location_misc:get_location(Q) of
{ok, Node0} -> Node0;
+ undefined -> Node;
{error, _} -> Node
end
end,
diff --git a/src/rabbit_maintenance.erl b/src/rabbit_maintenance.erl
index 108381039d..3361173e11 100644
--- a/src/rabbit_maintenance.erl
+++ b/src/rabbit_maintenance.erl
@@ -23,6 +23,7 @@
unmark_as_being_drained/0,
is_being_drained_local_read/1,
is_being_drained_consistent_read/1,
+ filter_out_drained_nodes/1,
suspend_all_client_listeners/0,
resume_all_client_listeners/0,
close_all_client_connections/0]).
@@ -71,18 +72,24 @@ is_being_drained_local_read(Node) ->
case mnesia:dirty_read(?TABLE, Node) of
[] -> false;
[#node_maintenance_state{node = Node, status = Status}] ->
- Status =:= ?DRAINING_STATUS
+ Status =:= ?DRAINING_STATUS;
+ _ -> false
end.
-spec is_being_drained_consistent_read(node()) -> boolean().
is_being_drained_consistent_read(Node) ->
case mnesia:transaction(fun() -> mnesia:read(?TABLE, Node) end) of
- {atomic, []} -> false;
+ {atomic, []} -> false;
{atomic, [#node_maintenance_state{node = Node, status = Status}]} ->
Status =:= ?DRAINING_STATUS;
+ {atomic, _} -> false;
{aborted, _Reason} -> false
end.
+ -spec filter_out_drained_nodes([node()]) -> [node()].
+filter_out_drained_nodes(Nodes) ->
+ lists:filter(fun(N) -> not is_being_drained_local_read(N) end, Nodes).
+
-spec suspend_all_client_listeners() -> rabbit_types:ok_or_error(any()).
%% Pauses all listeners on the current node except for
%% Erlang distribution (clustering and CLI tools).
diff --git a/src/rabbit_queue_location_client_local.erl b/src/rabbit_queue_location_client_local.erl
index 3eb3d24217..2df1608534 100644
--- a/src/rabbit_queue_location_client_local.erl
+++ b/src/rabbit_queue_location_client_local.erl
@@ -30,4 +30,10 @@ description() ->
[{description, <<"Locate queue master node as the client local node">>}].
queue_master_location(Q) when ?is_amqqueue(Q) ->
+ %% unlike with other locator strategies we do not check node maintenance
+ %% status for two reasons:
+ %%
+ %% * nodes in maintenance mode will drop their client connections
+ %% * with other strategies, if no nodes are available, the current node
+ %% is returned but this strategy already does just that
{ok, node()}.
diff --git a/src/rabbit_queue_location_min_masters.erl b/src/rabbit_queue_location_min_masters.erl
index 94002cf580..6535f082fe 100644
--- a/src/rabbit_queue_location_min_masters.erl
+++ b/src/rabbit_queue_location_min_masters.erl
@@ -32,7 +32,7 @@ description() ->
queue_master_location(Q) when ?is_amqqueue(Q) ->
Cluster = rabbit_queue_master_location_misc:all_nodes(Q),
QueueNames = rabbit_amqqueue:list_names(),
- MastersPerNode = lists:foldl(
+ MastersPerNode0 = lists:foldl(
fun(#resource{virtual_host = VHost, name = QueueName}, NodeMasters) ->
case rabbit_queue_master_location_misc:lookup_master(QueueName, VHost) of
{ok, Master} when is_atom(Master) ->
@@ -47,16 +47,24 @@ queue_master_location(Q) when ?is_amqqueue(Q) ->
end,
maps:from_list([{N, 0} || N <- Cluster]),
QueueNames),
+
+ MastersPerNode = maps:filter(fun (Node, _N) ->
+ not rabbit_maintenance:is_being_drained_local_read(Node)
+ end, MastersPerNode0),
- {MinNode, _NMasters} = maps:fold(
- fun(Node, NMasters, init) ->
- {Node, NMasters};
- (Node, NMasters, {MinNode, MinMasters}) ->
- case NMasters < MinMasters of
- true -> {Node, NMasters};
- false -> {MinNode, MinMasters}
- end
- end,
- init,
- MastersPerNode),
- {ok, MinNode}.
+ case map_size(MastersPerNode) > 0 of
+ true ->
+ {MinNode, _NMasters} = maps:fold(
+ fun(Node, NMasters, init) ->
+ {Node, NMasters};
+ (Node, NMasters, {MinNode, MinMasters}) ->
+ case NMasters < MinMasters of
+ true -> {Node, NMasters};
+ false -> {MinNode, MinMasters}
+ end
+ end,
+ init, MastersPerNode),
+ {ok, MinNode};
+ false ->
+ undefined
+ end.
diff --git a/src/rabbit_queue_location_random.erl b/src/rabbit_queue_location_random.erl
index 9f2445ceac..1c3ca7c3a6 100644
--- a/src/rabbit_queue_location_random.erl
+++ b/src/rabbit_queue_location_random.erl
@@ -30,7 +30,13 @@ description() ->
<<"Locate queue master node from cluster in a random manner">>}].
queue_master_location(Q) when ?is_amqqueue(Q) ->
- Cluster = rabbit_queue_master_location_misc:all_nodes(Q),
- RandomPos = erlang:phash2(erlang:monotonic_time(), length(Cluster)),
- MasterNode = lists:nth(RandomPos + 1, Cluster),
- {ok, MasterNode}.
+ Cluster0 = rabbit_queue_master_location_misc:all_nodes(Q),
+ Cluster = rabbit_maintenance:filter_out_drained_nodes(Cluster0),
+ case Cluster of
+ [] ->
+ undefined;
+ Candidates when is_list(Candidates) ->
+ RandomPos = erlang:phash2(erlang:monotonic_time(), length(Candidates)),
+ MasterNode = lists:nth(RandomPos + 1, Candidates),
+ {ok, MasterNode}
+ end.
diff --git a/src/rabbit_queue_master_location_misc.erl b/src/rabbit_queue_master_location_misc.erl
index 7b65ef0d45..37698e184f 100644
--- a/src/rabbit_queue_master_location_misc.erl
+++ b/src/rabbit_queue_master_location_misc.erl
@@ -18,6 +18,7 @@
get_location_mod_by_policy/1,
all_nodes/1]).
+-spec lookup_master(binary(), binary()) -> {ok, node()} | {error, not_found}.
lookup_master(QueueNameBin, VHostPath) when is_binary(QueueNameBin),
is_binary(VHostPath) ->
QueueR = rabbit_misc:r(VHostPath, queue, QueueNameBin),
diff --git a/test/queue_master_location_SUITE.erl b/test/queue_master_location_SUITE.erl
index c8ea773682..d7ce088632 100644
--- a/test/queue_master_location_SUITE.erl
+++ b/test/queue_master_location_SUITE.erl
@@ -34,7 +34,8 @@
all() ->
[
- {group, cluster_size_3}
+ {group, cluster_size_3},
+ {group, maintenance_mode}
].
groups() ->
@@ -51,11 +52,19 @@ groups() ->
calculate_min_master_with_bindings,
calculate_random,
calculate_client_local
- ]}
+ ]},
+
+ {maintenance_mode, [], [
+ declare_with_min_masters_and_some_nodes_under_maintenance,
+ declare_with_min_masters_and_all_nodes_under_maintenance,
+
+ declare_with_random_and_some_nodes_under_maintenance,
+ declare_with_random_and_all_nodes_under_maintenance
+ ]}
].
%% -------------------------------------------------------------------
-%% Testsuite setup/teardown.
+%% Test suite setup/teardown
%% -------------------------------------------------------------------
init_per_suite(Config) ->
@@ -67,7 +76,12 @@ end_per_suite(Config) ->
init_per_group(cluster_size_3, Config) ->
rabbit_ct_helpers:set_config(Config, [
- {rmq_nodes_count, 3} %% Replaced with a list of node names later.
+ %% Replaced with a list of node names later
+ {rmq_nodes_count, 3}
+ ]);
+init_per_group(maintenance_mode, Config) ->
+ rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodes_count, 3}
]).
end_per_group(_, Config) ->
@@ -98,7 +112,7 @@ end_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_finished(Config1, Testcase).
%% -------------------------------------------------------------------
-%% Testcases.
+%% Test cases
%% -------------------------------------------------------------------
%%
@@ -199,12 +213,70 @@ declare_config(Config) ->
setup_test_environment(Config),
set_location_config(Config, <<"min-masters">>),
QueueName = rabbit_misc:r(<<"/">>, queue, Q = <<"qm.test">>),
- declare(Config, QueueName, false, false, _Args=[], none),
+ declare(Config, QueueName, false, false, _Args = [], none),
verify_min_master(Config, Q),
unset_location_config(Config),
ok.
%%
+%% Maintenance mode effects
+%%
+
+declare_with_min_masters_and_some_nodes_under_maintenance(Config) ->
+ set_location_policy(Config, ?POLICY, <<"min-masters">>),
+ rabbit_ct_broker_helpers:mark_as_being_drained(Config, 0),
+ rabbit_ct_broker_helpers:mark_as_being_drained(Config, 1),
+
+ QName = <<"qm.tests.min_masters.maintenance.case1">>,
+ Resource = rabbit_misc:r(<<"/">>, queue, QName),
+ Record = declare(Config, Resource, false, false, _Args = [], none),
+ %% the only node that's not being drained
+ ?assertEqual(rabbit_ct_broker_helpers:get_node_config(Config, 2, nodename),
+ node(amqqueue:get_pid(Record))),
+
+ rabbit_ct_broker_helpers:unmark_as_being_drained(Config, 0),
+ rabbit_ct_broker_helpers:unmark_as_being_drained(Config, 1).
+
+declare_with_min_masters_and_all_nodes_under_maintenance(Config) ->
+ declare_with_all_nodes_under_maintenance(Config, <<"min-masters">>).
+
+declare_with_random_and_some_nodes_under_maintenance(Config) ->
+ set_location_policy(Config, ?POLICY, <<"random">>),
+ rabbit_ct_broker_helpers:mark_as_being_drained(Config, 0),
+ rabbit_ct_broker_helpers:mark_as_being_drained(Config, 2),
+
+ QName = <<"qm.tests.min_masters.maintenance.case1">>,
+ Resource = rabbit_misc:r(<<"/">>, queue, QName),
+ Record = declare(Config, Resource, false, false, _Args = [], none),
+ %% the only node that's not being drained
+ ?assertEqual(rabbit_ct_broker_helpers:get_node_config(Config, 1, nodename),
+ node(amqqueue:get_pid(Record))),
+
+ rabbit_ct_broker_helpers:unmark_as_being_drained(Config, 0),
+ rabbit_ct_broker_helpers:unmark_as_being_drained(Config, 2).
+
+declare_with_random_and_all_nodes_under_maintenance(Config) ->
+ declare_with_all_nodes_under_maintenance(Config, <<"random">>).
+
+declare_with_all_nodes_under_maintenance(Config, Locator) ->
+ set_location_policy(Config, ?POLICY, Locator),
+ rabbit_ct_broker_helpers:mark_as_being_drained(Config, 0),
+ rabbit_ct_broker_helpers:mark_as_being_drained(Config, 1),
+ rabbit_ct_broker_helpers:mark_as_being_drained(Config, 2),
+
+ QName = <<"qm.tests.min_masters.maintenance.case2">>,
+ Resource = rabbit_misc:r(<<"/">>, queue, QName),
+ Record = declare(Config, Resource, false, false, _Args = [], none),
+ %% when queue master locator returns no node, the node that handles
+ %% the declaration method will be used as a fallback
+ ?assertEqual(rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ node(amqqueue:get_pid(Record))),
+
+ rabbit_ct_broker_helpers:unmark_as_being_drained(Config, 0),
+ rabbit_ct_broker_helpers:unmark_as_being_drained(Config, 1),
+ rabbit_ct_broker_helpers:unmark_as_being_drained(Config, 2).
+
+%%
%% Test 'calculations'
%%
@@ -333,8 +405,10 @@ unset_location_config(Config) ->
declare(Config, QueueName, Durable, AutoDelete, Args0, Owner) ->
Args1 = [QueueName, Durable, AutoDelete, Args0, Owner, <<"acting-user">>],
- {new, Queue} = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, declare, Args1),
- Queue.
+ case rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, declare, Args1) of
+ {new, Queue} -> Queue;
+ Other -> Other
+ end.
bind(Config, QueueName, RoutingKey) ->
ExchangeName = rabbit_misc:r(QueueName, exchange, <<"amq.direct">>),