diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_maintenance.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_queue_location_client_local.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_queue_location_min_masters.erl | 34 | ||||
| -rw-r--r-- | src/rabbit_queue_location_random.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_queue_master_location_misc.erl | 1 |
6 files changed, 48 insertions, 19 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 8855c8bbf7..532f8894d0 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -313,6 +313,7 @@ declare_classic_queue(Q, Node) -> _ -> case rabbit_queue_master_location_misc:get_location(Q) of {ok, Node0} -> Node0; + undefined -> Node; {error, _} -> Node end end, diff --git a/src/rabbit_maintenance.erl b/src/rabbit_maintenance.erl index 108381039d..3361173e11 100644 --- a/src/rabbit_maintenance.erl +++ b/src/rabbit_maintenance.erl @@ -23,6 +23,7 @@ unmark_as_being_drained/0, is_being_drained_local_read/1, is_being_drained_consistent_read/1, + filter_out_drained_nodes/1, suspend_all_client_listeners/0, resume_all_client_listeners/0, close_all_client_connections/0]). @@ -71,18 +72,24 @@ is_being_drained_local_read(Node) -> case mnesia:dirty_read(?TABLE, Node) of [] -> false; [#node_maintenance_state{node = Node, status = Status}] -> - Status =:= ?DRAINING_STATUS + Status =:= ?DRAINING_STATUS; + _ -> false end. -spec is_being_drained_consistent_read(node()) -> boolean(). is_being_drained_consistent_read(Node) -> case mnesia:transaction(fun() -> mnesia:read(?TABLE, Node) end) of - {atomic, []} -> false; + {atomic, []} -> false; {atomic, [#node_maintenance_state{node = Node, status = Status}]} -> Status =:= ?DRAINING_STATUS; + {atomic, _} -> false; {aborted, _Reason} -> false end. + -spec filter_out_drained_nodes([node()]) -> [node()]. +filter_out_drained_nodes(Nodes) -> + lists:filter(fun(N) -> not is_being_drained_local_read(N) end, Nodes). + -spec suspend_all_client_listeners() -> rabbit_types:ok_or_error(any()). %% Pauses all listeners on the current node except for %% Erlang distribution (clustering and CLI tools). diff --git a/src/rabbit_queue_location_client_local.erl b/src/rabbit_queue_location_client_local.erl index 3eb3d24217..2df1608534 100644 --- a/src/rabbit_queue_location_client_local.erl +++ b/src/rabbit_queue_location_client_local.erl @@ -30,4 +30,10 @@ description() -> [{description, <<"Locate queue master node as the client local node">>}]. queue_master_location(Q) when ?is_amqqueue(Q) -> + %% unlike with other locator strategies we do not check node maintenance + %% status for two reasons: + %% + %% * nodes in maintenance mode will drop their client connections + %% * with other strategies, if no nodes are available, the current node + %% is returned but this strategy already does just that {ok, node()}. diff --git a/src/rabbit_queue_location_min_masters.erl b/src/rabbit_queue_location_min_masters.erl index 94002cf580..6535f082fe 100644 --- a/src/rabbit_queue_location_min_masters.erl +++ b/src/rabbit_queue_location_min_masters.erl @@ -32,7 +32,7 @@ description() -> queue_master_location(Q) when ?is_amqqueue(Q) -> Cluster = rabbit_queue_master_location_misc:all_nodes(Q), QueueNames = rabbit_amqqueue:list_names(), - MastersPerNode = lists:foldl( + MastersPerNode0 = lists:foldl( fun(#resource{virtual_host = VHost, name = QueueName}, NodeMasters) -> case rabbit_queue_master_location_misc:lookup_master(QueueName, VHost) of {ok, Master} when is_atom(Master) -> @@ -47,16 +47,24 @@ queue_master_location(Q) when ?is_amqqueue(Q) -> end, maps:from_list([{N, 0} || N <- Cluster]), QueueNames), + + MastersPerNode = maps:filter(fun (Node, _N) -> + not rabbit_maintenance:is_being_drained_local_read(Node) + end, MastersPerNode0), - {MinNode, _NMasters} = maps:fold( - fun(Node, NMasters, init) -> - {Node, NMasters}; - (Node, NMasters, {MinNode, MinMasters}) -> - case NMasters < MinMasters of - true -> {Node, NMasters}; - false -> {MinNode, MinMasters} - end - end, - init, - MastersPerNode), - {ok, MinNode}. + case map_size(MastersPerNode) > 0 of + true -> + {MinNode, _NMasters} = maps:fold( + fun(Node, NMasters, init) -> + {Node, NMasters}; + (Node, NMasters, {MinNode, MinMasters}) -> + case NMasters < MinMasters of + true -> {Node, NMasters}; + false -> {MinNode, MinMasters} + end + end, + init, MastersPerNode), + {ok, MinNode}; + false -> + undefined + end. diff --git a/src/rabbit_queue_location_random.erl b/src/rabbit_queue_location_random.erl index 9f2445ceac..1c3ca7c3a6 100644 --- a/src/rabbit_queue_location_random.erl +++ b/src/rabbit_queue_location_random.erl @@ -30,7 +30,13 @@ description() -> <<"Locate queue master node from cluster in a random manner">>}]. queue_master_location(Q) when ?is_amqqueue(Q) -> - Cluster = rabbit_queue_master_location_misc:all_nodes(Q), - RandomPos = erlang:phash2(erlang:monotonic_time(), length(Cluster)), - MasterNode = lists:nth(RandomPos + 1, Cluster), - {ok, MasterNode}. + Cluster0 = rabbit_queue_master_location_misc:all_nodes(Q), + Cluster = rabbit_maintenance:filter_out_drained_nodes(Cluster0), + case Cluster of + [] -> + undefined; + Candidates when is_list(Candidates) -> + RandomPos = erlang:phash2(erlang:monotonic_time(), length(Candidates)), + MasterNode = lists:nth(RandomPos + 1, Candidates), + {ok, MasterNode} + end. diff --git a/src/rabbit_queue_master_location_misc.erl b/src/rabbit_queue_master_location_misc.erl index 7b65ef0d45..37698e184f 100644 --- a/src/rabbit_queue_master_location_misc.erl +++ b/src/rabbit_queue_master_location_misc.erl @@ -18,6 +18,7 @@ get_location_mod_by_policy/1, all_nodes/1]). +-spec lookup_master(binary(), binary()) -> {ok, node()} | {error, not_found}. lookup_master(QueueNameBin, VHostPath) when is_binary(QueueNameBin), is_binary(VHostPath) -> QueueR = rabbit_misc:r(VHostPath, queue, QueueNameBin), |
