summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2020-08-30 18:14:32 +0300
committerMichael Klishin <michael@clojurewerkz.org>2020-09-02 04:30:04 +0300
commitc6a5fb6ae5c61275ac994d0c57377e386d88228a (patch)
tree92016038da30ef1931d1bd67ca1ce812002e5d66
parentb80de17a87ba3c354a367e9b664f38ed3c30828d (diff)
downloadrabbitmq-server-git-c6a5fb6ae5c61275ac994d0c57377e386d88228a.tar.gz
Refer to running nodes using rabbit_nodes:all_running/0
to simplify the move to Mnevis in the future.
-rw-r--r--src/rabbit_amqqueue.erl6
-rw-r--r--src/rabbit_channel.erl5
-rw-r--r--src/rabbit_channel_tracking.erl2
-rw-r--r--src/rabbit_connection_tracking.erl6
-rw-r--r--src/rabbit_direct.erl5
-rw-r--r--src/rabbit_mirror_queue_misc.erl12
-rw-r--r--src/rabbit_mnesia_rename.erl2
-rw-r--r--src/rabbit_networking.erl5
-rw-r--r--src/rabbit_node_monitor.erl16
-rw-r--r--src/rabbit_quorum_queue.erl4
-rw-r--r--src/rabbit_tracking.erl6
-rw-r--r--src/rabbit_upgrade.erl2
12 files changed, 34 insertions, 37 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 9818e689de..4b796b9d66 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -529,7 +529,7 @@ rebalance(Type, VhostSpec, QueueSpec) ->
maybe_rebalance({true, Id}, Type, VhostSpec, QueueSpec) ->
rabbit_log:info("Starting queue rebalance operation: '~s' for vhosts matching '~s' and queues matching '~s'",
[Type, VhostSpec, QueueSpec]),
- Running = rabbit_mnesia:cluster_nodes(running),
+ Running = rabbit_nodes:all_running(),
NumRunning = length(Running),
ToRebalance = [Q || Q <- rabbit_amqqueue:list(),
filter_per_type(Type, Q),
@@ -1308,7 +1308,7 @@ emit_info_all(Nodes, VHostPath, Items, Ref, AggregatorPid) ->
rabbit_control_misc:await_emitters_termination(Pids).
collect_info_all(VHostPath, Items) ->
- Nodes = rabbit_mnesia:cluster_nodes(running),
+ Nodes = rabbit_nodes:all_running(),
Ref = make_ref(),
Pids = [ spawn_link(Node, rabbit_amqqueue, emit_info_local, [VHostPath, Items, Ref, self()]) || Node <- Nodes ],
rabbit_control_misc:await_emitters_termination(Pids),
@@ -1896,7 +1896,7 @@ node_permits_offline_promotion(Node) ->
case node() of
Node -> not rabbit:is_running(); %% [1]
_ -> All = rabbit_mnesia:cluster_nodes(all),
- Running = rabbit_mnesia:cluster_nodes(running),
+ Running = rabbit_nodes:all_running(),
lists:member(Node, All) andalso
not lists:member(Node, Running) %% [2]
end.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 9a56e70db8..74d400950e 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -354,9 +354,8 @@ send_drained(Pid, CTagCredit) ->
-spec list() -> [pid()].
list() ->
- Running = rabbit_mnesia:cluster_nodes(running),
- rabbit_misc:append_rpc_all_nodes(Running,
- rabbit_channel, list_local, [], ?RPC_TIMEOUT).
+ Nodes = rabbit_nodes:all_running(),
+ rabbit_misc:append_rpc_all_nodes(Nodes, rabbit_channel, list_local, [], ?RPC_TIMEOUT).
-spec list_local() -> [pid()].
diff --git a/src/rabbit_channel_tracking.erl b/src/rabbit_channel_tracking.erl
index d75ff6dfb8..2dbc4d0cf3 100644
--- a/src/rabbit_channel_tracking.erl
+++ b/src/rabbit_channel_tracking.erl
@@ -178,7 +178,7 @@ list() ->
fun (Node, Acc) ->
Tab = tracked_channel_table_name_for(Node),
Acc ++ mnesia:dirty_match_object(Tab, #tracked_channel{_ = '_'})
- end, [], rabbit_mnesia:cluster_nodes(running)).
+ end, [], rabbit_nodes:all_running()).
-spec list_of_user(rabbit_types:username()) -> [rabbit_types:tracked_channel()].
diff --git a/src/rabbit_connection_tracking.erl b/src/rabbit_connection_tracking.erl
index 85ce317bba..33938a90cc 100644
--- a/src/rabbit_connection_tracking.erl
+++ b/src/rabbit_connection_tracking.erl
@@ -331,7 +331,7 @@ get_all_tracked_connection_table_names_for_node(Node) ->
-spec lookup(rabbit_types:connection_name()) -> rabbit_types:tracked_connection() | 'not_found'.
lookup(Name) ->
- Nodes = rabbit_mnesia:cluster_nodes(running),
+ Nodes = rabbit_nodes:all_running(),
lookup(Name, Nodes).
lookup(_, []) ->
@@ -350,7 +350,7 @@ list() ->
fun (Node, Acc) ->
Tab = tracked_connection_table_name_for(Node),
Acc ++ mnesia:dirty_match_object(Tab, #tracked_connection{_ = '_'})
- end, [], rabbit_mnesia:cluster_nodes(running)).
+ end, [], rabbit_nodes:all_running()).
-spec count() -> non_neg_integer().
@@ -359,7 +359,7 @@ count() ->
fun (Node, Acc) ->
Tab = tracked_connection_table_name_for(Node),
Acc + mnesia:table_info(Tab, size)
- end, 0, rabbit_mnesia:cluster_nodes(running)).
+ end, 0, rabbit_nodes:all_running()).
-spec list(rabbit_types:vhost()) -> [rabbit_types:tracked_connection()].
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index 8a76c8074c..3fc2d75908 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -44,9 +44,8 @@ list_local() ->
-spec list() -> [pid()].
list() ->
- Running = rabbit_mnesia:cluster_nodes(running),
- rabbit_misc:append_rpc_all_nodes(Running,
- rabbit_direct, list_local, [], ?RPC_TIMEOUT).
+ Nodes = rabbit_nodes:all_running(),
+ rabbit_misc:append_rpc_all_nodes(Nodes, rabbit_direct, list_local, [], ?RPC_TIMEOUT).
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 9676ec3716..02f590e2fb 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -152,7 +152,7 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
slaves_to_start_on_failure(Q, DeadGMPids) ->
%% In case Mnesia has not caught up yet, filter out nodes we know
%% to be dead..
- ClusterNodes = rabbit_mnesia:cluster_nodes(running) --
+ ClusterNodes = rabbit_nodes:all_running() --
[node(P) || P <- DeadGMPids],
{_, OldNodes, _} = actual_queue_nodes(Q),
{_, NewNodes} = suggested_queue_nodes(Q, ClusterNodes),
@@ -234,16 +234,16 @@ add_mirror(QName, MirrorNode, SyncMode) ->
#resource{virtual_host = VHost} = amqqueue:get_name(Q),
case rabbit_vhost_sup_sup:get_vhost_sup(VHost, MirrorNode) of
{ok, _} ->
- try
+ try
SPid = rabbit_amqqueue_sup_sup:start_queue_process(
MirrorNode, Q, slave),
log_info(QName, "Adding mirror on node ~p: ~p~n",
[MirrorNode, SPid]),
rabbit_mirror_queue_slave:go(SPid, SyncMode)
- of
+ of
_ -> ok
catch
- error:QError ->
+ error:QError ->
log_warning(QName,
"Unable to start queue mirror on node '~p'. "
"Target queue supervisor is not running: ~p~n",
@@ -320,7 +320,7 @@ store_updated_slaves(Q0) when ?is_amqqueue(Q0) ->
%% a long time without being removed.
update_recoverable(SPids, RS) ->
SNodes = [node(SPid) || SPid <- SPids],
- RunningNodes = rabbit_mnesia:cluster_nodes(running),
+ RunningNodes = rabbit_nodes:all_running(),
AddNodes = SNodes -- RS,
DelNodes = RunningNodes -- SNodes, %% i.e. running with no slave
(RS -- DelNodes) ++ AddNodes.
@@ -384,7 +384,7 @@ suggested_queue_nodes(Q) -> suggested_queue_nodes(Q, rabbit_nodes:all_runni
suggested_queue_nodes(Q, All) -> suggested_queue_nodes(Q, node(), All).
%% The third argument exists so we can pull a call to
-%% rabbit_mnesia:cluster_nodes(running) out of a loop or transaction
+%% rabbit_nodes:all_running() out of a loop or transaction
%% or both.
suggested_queue_nodes(Q, DefNode, All) when ?is_amqqueue(Q) ->
Owner = amqqueue:get_exclusive_owner(Q),
diff --git a/src/rabbit_mnesia_rename.erl b/src/rabbit_mnesia_rename.erl
index 341e639f45..e0d88c0f5e 100644
--- a/src/rabbit_mnesia_rename.erl
+++ b/src/rabbit_mnesia_rename.erl
@@ -233,7 +233,7 @@ update_term(_NodeMap, Term) ->
rename_in_running_mnesia(FromNode, ToNode) ->
All = rabbit_mnesia:cluster_nodes(all),
- Running = rabbit_mnesia:cluster_nodes(running),
+ Running = rabbit_nodes:all_running(),
case {lists:member(FromNode, Running), lists:member(ToNode, All)} of
{false, true} -> ok;
{true, _} -> exit({old_node_running, FromNode});
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 9c5096746d..30f4554cbf 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -390,9 +390,8 @@ unregister_connection(Pid) -> pg_local:leave(rabbit_connections, Pid).
-spec connections() -> [rabbit_types:connection()].
connections() ->
- Running = rabbit_mnesia:cluster_nodes(running),
- rabbit_misc:append_rpc_all_nodes(Running,
- rabbit_networking, connections_local, [], ?RPC_TIMEOUT).
+ Nodes = rabbit_nodes:all_running(),
+ rabbit_misc:append_rpc_all_nodes(Nodes, rabbit_networking, connections_local, [], ?RPC_TIMEOUT).
-spec local_connections() -> [rabbit_types:connection()].
%% @doc Returns pids of AMQP 0-9-1 and AMQP 1.0 connections local to this node.
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 3408ea5a51..b56180c54c 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -162,7 +162,7 @@ notify_node_up() ->
-spec notify_joined_cluster() -> 'ok'.
notify_joined_cluster() ->
- Nodes = rabbit_mnesia:cluster_nodes(running) -- [node()],
+ Nodes = rabbit_nodes:all_running() -- [node()],
gen_server:abcast(Nodes, ?SERVER,
{joined_cluster, node(), rabbit_mnesia:node_type()}),
ok.
@@ -170,7 +170,7 @@ notify_joined_cluster() ->
-spec notify_left_cluster(node()) -> 'ok'.
notify_left_cluster(Node) ->
- Nodes = rabbit_mnesia:cluster_nodes(running),
+ Nodes = rabbit_nodes:all_running(),
gen_server:abcast(Nodes, ?SERVER, {left_cluster, Node}),
ok.
@@ -375,7 +375,7 @@ handle_call(_Request, _From, State) ->
{noreply, State}.
handle_cast(notify_node_up, State = #state{guid = GUID}) ->
- Nodes = rabbit_mnesia:cluster_nodes(running) -- [node()],
+ Nodes = rabbit_nodes:all_running() -- [node()],
gen_server:abcast(Nodes, ?SERVER,
{node_up, node(), rabbit_mnesia:node_type(), GUID}),
%% register other active rabbits with this rabbit
@@ -425,7 +425,7 @@ handle_cast({announce_guid, Node, GUID}, State = #state{node_guids = GUIDs}) ->
handle_cast({check_partial_partition, Node, Rep, NodeGUID, MyGUID, RepGUID},
State = #state{guid = MyGUID,
node_guids = GUIDs}) ->
- case lists:member(Node, rabbit_mnesia:cluster_nodes(running)) andalso
+ case lists:member(Node, rabbit_nodes:all_running()) andalso
maps:find(Node, GUIDs) =:= {ok, NodeGUID} of
true -> spawn_link( %%[1]
fun () ->
@@ -572,7 +572,7 @@ handle_info({nodedown, Node, Info}, State = #state{guid = MyGUID,
Node, node(), DownGUID, CheckGUID, MyGUID})
end,
case maps:find(Node, GUIDs) of
- {ok, DownGUID} -> Alive = rabbit_mnesia:cluster_nodes(running)
+ {ok, DownGUID} -> Alive = rabbit_nodes:all_running()
-- [node(), Node],
[case maps:find(N, GUIDs) of
{ok, CheckGUID} -> Check(N, CheckGUID, DownGUID);
@@ -771,7 +771,7 @@ handle_dead_rabbit(Node, State = #state{partitions = Partitions,
%% going away. It's only safe to forget anything about partitions when
%% there are no partitions.
Down = Partitions -- alive_rabbit_nodes(),
- NoLongerPartitioned = rabbit_mnesia:cluster_nodes(running),
+ NoLongerPartitioned = rabbit_nodes:all_running(),
Partitions1 = case Partitions -- Down -- NoLongerPartitioned of
[] -> [];
_ -> Partitions
@@ -853,7 +853,7 @@ disconnect(Node) ->
%%--------------------------------------------------------------------
%% mnesia:system_info(db_nodes) (and hence
-%% rabbit_mnesia:cluster_nodes(running)) does not return all nodes
+%% rabbit_nodes:all_running()) does not return all nodes
%% when partitioned, just those that we are sharing Mnesia state
%% with. So we have a small set of replacement functions
%% here. "rabbit" in a function's name implies we test if the rabbit
@@ -917,7 +917,7 @@ ping_all() ->
ok.
possibly_partitioned_nodes() ->
- alive_rabbit_nodes() -- rabbit_mnesia:cluster_nodes(running).
+ alive_rabbit_nodes() -- rabbit_nodes:all_running().
startup_log([]) ->
rabbit_log:info("Starting rabbit_node_monitor~n", []);
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 6eff110ca3..915cf9d527 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -862,7 +862,7 @@ add_member(VHost, Name, Node, Timeout) ->
{error, classic_queue_not_supported};
{ok, Q} when ?amqqueue_is_quorum(Q) ->
QNodes = get_nodes(Q),
- case lists:member(Node, rabbit_mnesia:cluster_nodes(running)) of
+ case lists:member(Node, rabbit_nodes:all_running()) of
false ->
{error, node_not_running};
true ->
@@ -993,7 +993,7 @@ shrink_all(Node) ->
[{rabbit_amqqueue:name(),
{ok, pos_integer()} | {error, pos_integer(), term()}}].
grow(Node, VhostSpec, QueueSpec, Strategy) ->
- Running = rabbit_mnesia:cluster_nodes(running),
+ Running = rabbit_nodes:all_running(),
[begin
Size = length(get_nodes(Q)),
QName = amqqueue:get_name(Q),
diff --git a/src/rabbit_tracking.erl b/src/rabbit_tracking.erl
index f17778d987..a124d20226 100644
--- a/src/rabbit_tracking.erl
+++ b/src/rabbit_tracking.erl
@@ -56,7 +56,7 @@ count_tracked_items(TableNameFun, CountRecPosition, Key, ContextMsg) ->
[ContextMsg, Key, Node, Err]),
Acc
end
- end, 0, rabbit_mnesia:cluster_nodes(running)).
+ end, 0, rabbit_nodes:all_running()).
-spec match_tracked_items(function(), tuple()) -> term().
@@ -67,7 +67,7 @@ match_tracked_items(TableNameFun, MatchSpec) ->
Acc ++ mnesia:dirty_match_object(
Tab,
MatchSpec)
- end, [], rabbit_mnesia:cluster_nodes(running)).
+ end, [], rabbit_nodes:all_running()).
-spec clear_tracking_table(atom()) -> ok.
@@ -92,7 +92,7 @@ delete_tracking_table(TableName, Node, ContextMsg) ->
-spec delete_tracked_entry({atom(), atom(), list()}, function(), term()) -> ok.
delete_tracked_entry(_ExistsCheckSpec = {M, F, A}, TableNameFun, Key) ->
- ClusterNodes = rabbit_mnesia:cluster_nodes(running),
+ ClusterNodes = rabbit_nodes:all_running(),
ExistsInCluster =
lists:any(fun(Node) -> rpc:call(Node, M, F, A) end, ClusterNodes),
case ExistsInCluster of
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
index ba72ecaebc..b1b128fecc 100644
--- a/src/rabbit_upgrade.erl
+++ b/src/rabbit_upgrade.erl
@@ -145,7 +145,7 @@ run_mnesia_upgrades(Upgrades, AllNodes) ->
upgrade_mode(AllNodes) ->
case nodes_running(AllNodes) of
[] ->
- AfterUs = rabbit_mnesia:cluster_nodes(running) -- [node()],
+ AfterUs = rabbit_nodes:all_running() -- [node()],
case {node_type_legacy(), AfterUs} of
{disc, []} ->
primary;