diff options
| -rw-r--r-- | src/rabbit_amqqueue.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_maintenance.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_queue_location_client_local.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_queue_location_min_masters.erl | 34 | ||||
| -rw-r--r-- | src/rabbit_queue_location_random.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_queue_master_location_misc.erl | 1 | ||||
| -rw-r--r-- | test/queue_master_location_SUITE.erl | 90 |
7 files changed, 130 insertions, 27 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 8855c8bbf7..532f8894d0 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -313,6 +313,7 @@ declare_classic_queue(Q, Node) -> _ -> case rabbit_queue_master_location_misc:get_location(Q) of {ok, Node0} -> Node0; + undefined -> Node; {error, _} -> Node end end, diff --git a/src/rabbit_maintenance.erl b/src/rabbit_maintenance.erl index 108381039d..3361173e11 100644 --- a/src/rabbit_maintenance.erl +++ b/src/rabbit_maintenance.erl @@ -23,6 +23,7 @@ unmark_as_being_drained/0, is_being_drained_local_read/1, is_being_drained_consistent_read/1, + filter_out_drained_nodes/1, suspend_all_client_listeners/0, resume_all_client_listeners/0, close_all_client_connections/0]). @@ -71,18 +72,24 @@ is_being_drained_local_read(Node) -> case mnesia:dirty_read(?TABLE, Node) of [] -> false; [#node_maintenance_state{node = Node, status = Status}] -> - Status =:= ?DRAINING_STATUS + Status =:= ?DRAINING_STATUS; + _ -> false end. -spec is_being_drained_consistent_read(node()) -> boolean(). is_being_drained_consistent_read(Node) -> case mnesia:transaction(fun() -> mnesia:read(?TABLE, Node) end) of - {atomic, []} -> false; + {atomic, []} -> false; {atomic, [#node_maintenance_state{node = Node, status = Status}]} -> Status =:= ?DRAINING_STATUS; + {atomic, _} -> false; {aborted, _Reason} -> false end. + -spec filter_out_drained_nodes([node()]) -> [node()]. +filter_out_drained_nodes(Nodes) -> + lists:filter(fun(N) -> not is_being_drained_local_read(N) end, Nodes). + -spec suspend_all_client_listeners() -> rabbit_types:ok_or_error(any()). %% Pauses all listeners on the current node except for %% Erlang distribution (clustering and CLI tools). diff --git a/src/rabbit_queue_location_client_local.erl b/src/rabbit_queue_location_client_local.erl index 3eb3d24217..2df1608534 100644 --- a/src/rabbit_queue_location_client_local.erl +++ b/src/rabbit_queue_location_client_local.erl @@ -30,4 +30,10 @@ description() -> [{description, <<"Locate queue master node as the client local node">>}]. queue_master_location(Q) when ?is_amqqueue(Q) -> + %% unlike with other locator strategies we do not check node maintenance + %% status for two reasons: + %% + %% * nodes in maintenance mode will drop their client connections + %% * with other strategies, if no nodes are available, the current node + %% is returned but this strategy already does just that {ok, node()}. diff --git a/src/rabbit_queue_location_min_masters.erl b/src/rabbit_queue_location_min_masters.erl index 94002cf580..6535f082fe 100644 --- a/src/rabbit_queue_location_min_masters.erl +++ b/src/rabbit_queue_location_min_masters.erl @@ -32,7 +32,7 @@ description() -> queue_master_location(Q) when ?is_amqqueue(Q) -> Cluster = rabbit_queue_master_location_misc:all_nodes(Q), QueueNames = rabbit_amqqueue:list_names(), - MastersPerNode = lists:foldl( + MastersPerNode0 = lists:foldl( fun(#resource{virtual_host = VHost, name = QueueName}, NodeMasters) -> case rabbit_queue_master_location_misc:lookup_master(QueueName, VHost) of {ok, Master} when is_atom(Master) -> @@ -47,16 +47,24 @@ queue_master_location(Q) when ?is_amqqueue(Q) -> end, maps:from_list([{N, 0} || N <- Cluster]), QueueNames), + + MastersPerNode = maps:filter(fun (Node, _N) -> + not rabbit_maintenance:is_being_drained_local_read(Node) + end, MastersPerNode0), - {MinNode, _NMasters} = maps:fold( - fun(Node, NMasters, init) -> - {Node, NMasters}; - (Node, NMasters, {MinNode, MinMasters}) -> - case NMasters < MinMasters of - true -> {Node, NMasters}; - false -> {MinNode, MinMasters} - end - end, - init, - MastersPerNode), - {ok, MinNode}. + case map_size(MastersPerNode) > 0 of + true -> + {MinNode, _NMasters} = maps:fold( + fun(Node, NMasters, init) -> + {Node, NMasters}; + (Node, NMasters, {MinNode, MinMasters}) -> + case NMasters < MinMasters of + true -> {Node, NMasters}; + false -> {MinNode, MinMasters} + end + end, + init, MastersPerNode), + {ok, MinNode}; + false -> + undefined + end. diff --git a/src/rabbit_queue_location_random.erl b/src/rabbit_queue_location_random.erl index 9f2445ceac..1c3ca7c3a6 100644 --- a/src/rabbit_queue_location_random.erl +++ b/src/rabbit_queue_location_random.erl @@ -30,7 +30,13 @@ description() -> <<"Locate queue master node from cluster in a random manner">>}]. queue_master_location(Q) when ?is_amqqueue(Q) -> - Cluster = rabbit_queue_master_location_misc:all_nodes(Q), - RandomPos = erlang:phash2(erlang:monotonic_time(), length(Cluster)), - MasterNode = lists:nth(RandomPos + 1, Cluster), - {ok, MasterNode}. + Cluster0 = rabbit_queue_master_location_misc:all_nodes(Q), + Cluster = rabbit_maintenance:filter_out_drained_nodes(Cluster0), + case Cluster of + [] -> + undefined; + Candidates when is_list(Candidates) -> + RandomPos = erlang:phash2(erlang:monotonic_time(), length(Candidates)), + MasterNode = lists:nth(RandomPos + 1, Candidates), + {ok, MasterNode} + end. diff --git a/src/rabbit_queue_master_location_misc.erl b/src/rabbit_queue_master_location_misc.erl index 7b65ef0d45..37698e184f 100644 --- a/src/rabbit_queue_master_location_misc.erl +++ b/src/rabbit_queue_master_location_misc.erl @@ -18,6 +18,7 @@ get_location_mod_by_policy/1, all_nodes/1]). +-spec lookup_master(binary(), binary()) -> {ok, node()} | {error, not_found}. lookup_master(QueueNameBin, VHostPath) when is_binary(QueueNameBin), is_binary(VHostPath) -> QueueR = rabbit_misc:r(VHostPath, queue, QueueNameBin), diff --git a/test/queue_master_location_SUITE.erl b/test/queue_master_location_SUITE.erl index c8ea773682..d7ce088632 100644 --- a/test/queue_master_location_SUITE.erl +++ b/test/queue_master_location_SUITE.erl @@ -34,7 +34,8 @@ all() -> [ - {group, cluster_size_3} + {group, cluster_size_3}, + {group, maintenance_mode} ]. groups() -> @@ -51,11 +52,19 @@ groups() -> calculate_min_master_with_bindings, calculate_random, calculate_client_local - ]} + ]}, + + {maintenance_mode, [], [ + declare_with_min_masters_and_some_nodes_under_maintenance, + declare_with_min_masters_and_all_nodes_under_maintenance, + + declare_with_random_and_some_nodes_under_maintenance, + declare_with_random_and_all_nodes_under_maintenance + ]} ]. %% ------------------------------------------------------------------- -%% Testsuite setup/teardown. +%% Test suite setup/teardown %% ------------------------------------------------------------------- init_per_suite(Config) -> @@ -67,7 +76,12 @@ end_per_suite(Config) -> init_per_group(cluster_size_3, Config) -> rabbit_ct_helpers:set_config(Config, [ - {rmq_nodes_count, 3} %% Replaced with a list of node names later. + %% Replaced with a list of node names later + {rmq_nodes_count, 3} + ]); +init_per_group(maintenance_mode, Config) -> + rabbit_ct_helpers:set_config(Config, [ + {rmq_nodes_count, 3} ]). end_per_group(_, Config) -> @@ -98,7 +112,7 @@ end_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_finished(Config1, Testcase). %% ------------------------------------------------------------------- -%% Testcases. +%% Test cases %% ------------------------------------------------------------------- %% @@ -199,12 +213,70 @@ declare_config(Config) -> setup_test_environment(Config), set_location_config(Config, <<"min-masters">>), QueueName = rabbit_misc:r(<<"/">>, queue, Q = <<"qm.test">>), - declare(Config, QueueName, false, false, _Args=[], none), + declare(Config, QueueName, false, false, _Args = [], none), verify_min_master(Config, Q), unset_location_config(Config), ok. %% +%% Maintenance mode effects +%% + +declare_with_min_masters_and_some_nodes_under_maintenance(Config) -> + set_location_policy(Config, ?POLICY, <<"min-masters">>), + rabbit_ct_broker_helpers:mark_as_being_drained(Config, 0), + rabbit_ct_broker_helpers:mark_as_being_drained(Config, 1), + + QName = <<"qm.tests.min_masters.maintenance.case1">>, + Resource = rabbit_misc:r(<<"/">>, queue, QName), + Record = declare(Config, Resource, false, false, _Args = [], none), + %% the only node that's not being drained + ?assertEqual(rabbit_ct_broker_helpers:get_node_config(Config, 2, nodename), + node(amqqueue:get_pid(Record))), + + rabbit_ct_broker_helpers:unmark_as_being_drained(Config, 0), + rabbit_ct_broker_helpers:unmark_as_being_drained(Config, 1). + +declare_with_min_masters_and_all_nodes_under_maintenance(Config) -> + declare_with_all_nodes_under_maintenance(Config, <<"min-masters">>). + +declare_with_random_and_some_nodes_under_maintenance(Config) -> + set_location_policy(Config, ?POLICY, <<"random">>), + rabbit_ct_broker_helpers:mark_as_being_drained(Config, 0), + rabbit_ct_broker_helpers:mark_as_being_drained(Config, 2), + + QName = <<"qm.tests.min_masters.maintenance.case1">>, + Resource = rabbit_misc:r(<<"/">>, queue, QName), + Record = declare(Config, Resource, false, false, _Args = [], none), + %% the only node that's not being drained + ?assertEqual(rabbit_ct_broker_helpers:get_node_config(Config, 1, nodename), + node(amqqueue:get_pid(Record))), + + rabbit_ct_broker_helpers:unmark_as_being_drained(Config, 0), + rabbit_ct_broker_helpers:unmark_as_being_drained(Config, 2). + +declare_with_random_and_all_nodes_under_maintenance(Config) -> + declare_with_all_nodes_under_maintenance(Config, <<"random">>). + +declare_with_all_nodes_under_maintenance(Config, Locator) -> + set_location_policy(Config, ?POLICY, Locator), + rabbit_ct_broker_helpers:mark_as_being_drained(Config, 0), + rabbit_ct_broker_helpers:mark_as_being_drained(Config, 1), + rabbit_ct_broker_helpers:mark_as_being_drained(Config, 2), + + QName = <<"qm.tests.min_masters.maintenance.case2">>, + Resource = rabbit_misc:r(<<"/">>, queue, QName), + Record = declare(Config, Resource, false, false, _Args = [], none), + %% when queue master locator returns no node, the node that handles + %% the declaration method will be used as a fallback + ?assertEqual(rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + node(amqqueue:get_pid(Record))), + + rabbit_ct_broker_helpers:unmark_as_being_drained(Config, 0), + rabbit_ct_broker_helpers:unmark_as_being_drained(Config, 1), + rabbit_ct_broker_helpers:unmark_as_being_drained(Config, 2). + +%% %% Test 'calculations' %% @@ -333,8 +405,10 @@ unset_location_config(Config) -> declare(Config, QueueName, Durable, AutoDelete, Args0, Owner) -> Args1 = [QueueName, Durable, AutoDelete, Args0, Owner, <<"acting-user">>], - {new, Queue} = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, declare, Args1), - Queue. + case rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, declare, Args1) of + {new, Queue} -> Queue; + Other -> Other + end. bind(Config, QueueName, RoutingKey) -> ExchangeName = rabbit_misc:r(QueueName, exchange, <<"amq.direct">>), |
