summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2020-06-15 23:12:51 +0300
committerMichael Klishin <michael@clojurewerkz.org>2020-07-14 03:50:30 +0300
commita96d280855a1fb0c0b5e79501b4a9b4da9e838fe (patch)
treeda672365e24e011fb10c8a2d4dc0a2bc3dfcf69c /src
parent10dc8b29687e132cee6e8bc910f1b043969d5c1e (diff)
downloadrabbitmq-server-git-a96d280855a1fb0c0b5e79501b4a9b4da9e838fe.tar.gz
Integrate node maintenance information into queue master locators
For cases where no node is eligible we use the local node. This should only be possible during a brief window of time before a node that's been drained drops its connections. However, be benign and optimistically let the queue operation proceed. Part of rabbitmq/rabbitmq-server#2321
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl1
-rw-r--r--src/rabbit_maintenance.erl11
-rw-r--r--src/rabbit_queue_location_client_local.erl6
-rw-r--r--src/rabbit_queue_location_min_masters.erl34
-rw-r--r--src/rabbit_queue_location_random.erl14
-rw-r--r--src/rabbit_queue_master_location_misc.erl1
6 files changed, 48 insertions, 19 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 8855c8bbf7..532f8894d0 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -313,6 +313,7 @@ declare_classic_queue(Q, Node) ->
_ ->
case rabbit_queue_master_location_misc:get_location(Q) of
{ok, Node0} -> Node0;
+ undefined -> Node;
{error, _} -> Node
end
end,
diff --git a/src/rabbit_maintenance.erl b/src/rabbit_maintenance.erl
index 108381039d..3361173e11 100644
--- a/src/rabbit_maintenance.erl
+++ b/src/rabbit_maintenance.erl
@@ -23,6 +23,7 @@
unmark_as_being_drained/0,
is_being_drained_local_read/1,
is_being_drained_consistent_read/1,
+ filter_out_drained_nodes/1,
suspend_all_client_listeners/0,
resume_all_client_listeners/0,
close_all_client_connections/0]).
@@ -71,18 +72,24 @@ is_being_drained_local_read(Node) ->
case mnesia:dirty_read(?TABLE, Node) of
[] -> false;
[#node_maintenance_state{node = Node, status = Status}] ->
- Status =:= ?DRAINING_STATUS
+ Status =:= ?DRAINING_STATUS;
+ _ -> false
end.
-spec is_being_drained_consistent_read(node()) -> boolean().
is_being_drained_consistent_read(Node) ->
case mnesia:transaction(fun() -> mnesia:read(?TABLE, Node) end) of
- {atomic, []} -> false;
+ {atomic, []} -> false;
{atomic, [#node_maintenance_state{node = Node, status = Status}]} ->
Status =:= ?DRAINING_STATUS;
+ {atomic, _} -> false;
{aborted, _Reason} -> false
end.
+ -spec filter_out_drained_nodes([node()]) -> [node()].
+filter_out_drained_nodes(Nodes) ->
+ lists:filter(fun(N) -> not is_being_drained_local_read(N) end, Nodes).
+
-spec suspend_all_client_listeners() -> rabbit_types:ok_or_error(any()).
%% Pauses all listeners on the current node except for
%% Erlang distribution (clustering and CLI tools).
diff --git a/src/rabbit_queue_location_client_local.erl b/src/rabbit_queue_location_client_local.erl
index 3eb3d24217..2df1608534 100644
--- a/src/rabbit_queue_location_client_local.erl
+++ b/src/rabbit_queue_location_client_local.erl
@@ -30,4 +30,10 @@ description() ->
[{description, <<"Locate queue master node as the client local node">>}].
queue_master_location(Q) when ?is_amqqueue(Q) ->
+ %% unlike with other locator strategies we do not check node maintenance
+ %% status for two reasons:
+ %%
+ %% * nodes in maintenance mode will drop their client connections
+ %% * with other strategies, if no nodes are available, the current node
+ %% is returned but this strategy already does just that
{ok, node()}.
diff --git a/src/rabbit_queue_location_min_masters.erl b/src/rabbit_queue_location_min_masters.erl
index 94002cf580..6535f082fe 100644
--- a/src/rabbit_queue_location_min_masters.erl
+++ b/src/rabbit_queue_location_min_masters.erl
@@ -32,7 +32,7 @@ description() ->
queue_master_location(Q) when ?is_amqqueue(Q) ->
Cluster = rabbit_queue_master_location_misc:all_nodes(Q),
QueueNames = rabbit_amqqueue:list_names(),
- MastersPerNode = lists:foldl(
+ MastersPerNode0 = lists:foldl(
fun(#resource{virtual_host = VHost, name = QueueName}, NodeMasters) ->
case rabbit_queue_master_location_misc:lookup_master(QueueName, VHost) of
{ok, Master} when is_atom(Master) ->
@@ -47,16 +47,24 @@ queue_master_location(Q) when ?is_amqqueue(Q) ->
end,
maps:from_list([{N, 0} || N <- Cluster]),
QueueNames),
+
+ MastersPerNode = maps:filter(fun (Node, _N) ->
+ not rabbit_maintenance:is_being_drained_local_read(Node)
+ end, MastersPerNode0),
- {MinNode, _NMasters} = maps:fold(
- fun(Node, NMasters, init) ->
- {Node, NMasters};
- (Node, NMasters, {MinNode, MinMasters}) ->
- case NMasters < MinMasters of
- true -> {Node, NMasters};
- false -> {MinNode, MinMasters}
- end
- end,
- init,
- MastersPerNode),
- {ok, MinNode}.
+ case map_size(MastersPerNode) > 0 of
+ true ->
+ {MinNode, _NMasters} = maps:fold(
+ fun(Node, NMasters, init) ->
+ {Node, NMasters};
+ (Node, NMasters, {MinNode, MinMasters}) ->
+ case NMasters < MinMasters of
+ true -> {Node, NMasters};
+ false -> {MinNode, MinMasters}
+ end
+ end,
+ init, MastersPerNode),
+ {ok, MinNode};
+ false ->
+ undefined
+ end.
diff --git a/src/rabbit_queue_location_random.erl b/src/rabbit_queue_location_random.erl
index 9f2445ceac..1c3ca7c3a6 100644
--- a/src/rabbit_queue_location_random.erl
+++ b/src/rabbit_queue_location_random.erl
@@ -30,7 +30,13 @@ description() ->
<<"Locate queue master node from cluster in a random manner">>}].
queue_master_location(Q) when ?is_amqqueue(Q) ->
- Cluster = rabbit_queue_master_location_misc:all_nodes(Q),
- RandomPos = erlang:phash2(erlang:monotonic_time(), length(Cluster)),
- MasterNode = lists:nth(RandomPos + 1, Cluster),
- {ok, MasterNode}.
+ Cluster0 = rabbit_queue_master_location_misc:all_nodes(Q),
+ Cluster = rabbit_maintenance:filter_out_drained_nodes(Cluster0),
+ case Cluster of
+ [] ->
+ undefined;
+ Candidates when is_list(Candidates) ->
+ RandomPos = erlang:phash2(erlang:monotonic_time(), length(Candidates)),
+ MasterNode = lists:nth(RandomPos + 1, Candidates),
+ {ok, MasterNode}
+ end.
diff --git a/src/rabbit_queue_master_location_misc.erl b/src/rabbit_queue_master_location_misc.erl
index 7b65ef0d45..37698e184f 100644
--- a/src/rabbit_queue_master_location_misc.erl
+++ b/src/rabbit_queue_master_location_misc.erl
@@ -18,6 +18,7 @@
get_location_mod_by_policy/1,
all_nodes/1]).
+-spec lookup_master(binary(), binary()) -> {ok, node()} | {error, not_found}.
lookup_master(QueueNameBin, VHostPath) when is_binary(QueueNameBin),
is_binary(VHostPath) ->
QueueR = rabbit_misc:r(VHostPath, queue, QueueNameBin),