diff options
| author | Michael Klishin <michael@clojurewerkz.org> | 2020-08-30 18:14:32 +0300 |
|---|---|---|
| committer | Michael Klishin <michael@clojurewerkz.org> | 2020-09-02 04:30:04 +0300 |
| commit | c6a5fb6ae5c61275ac994d0c57377e386d88228a (patch) | |
| tree | 92016038da30ef1931d1bd67ca1ce812002e5d66 | |
| parent | b80de17a87ba3c354a367e9b664f38ed3c30828d (diff) | |
| download | rabbitmq-server-git-c6a5fb6ae5c61275ac994d0c57377e386d88228a.tar.gz | |
Refer to running nodes using rabbit_nodes:all_running/0
to simplify the move to Mnevis in the future.
| -rw-r--r-- | src/rabbit_amqqueue.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_channel_tracking.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_connection_tracking.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_direct.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_mnesia_rename.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_networking.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_node_monitor.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_tracking.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_upgrade.erl | 2 |
12 files changed, 34 insertions, 37 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 9818e689de..4b796b9d66 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -529,7 +529,7 @@ rebalance(Type, VhostSpec, QueueSpec) -> maybe_rebalance({true, Id}, Type, VhostSpec, QueueSpec) -> rabbit_log:info("Starting queue rebalance operation: '~s' for vhosts matching '~s' and queues matching '~s'", [Type, VhostSpec, QueueSpec]), - Running = rabbit_mnesia:cluster_nodes(running), + Running = rabbit_nodes:all_running(), NumRunning = length(Running), ToRebalance = [Q || Q <- rabbit_amqqueue:list(), filter_per_type(Type, Q), @@ -1308,7 +1308,7 @@ emit_info_all(Nodes, VHostPath, Items, Ref, AggregatorPid) -> rabbit_control_misc:await_emitters_termination(Pids). collect_info_all(VHostPath, Items) -> - Nodes = rabbit_mnesia:cluster_nodes(running), + Nodes = rabbit_nodes:all_running(), Ref = make_ref(), Pids = [ spawn_link(Node, rabbit_amqqueue, emit_info_local, [VHostPath, Items, Ref, self()]) || Node <- Nodes ], rabbit_control_misc:await_emitters_termination(Pids), @@ -1896,7 +1896,7 @@ node_permits_offline_promotion(Node) -> case node() of Node -> not rabbit:is_running(); %% [1] _ -> All = rabbit_mnesia:cluster_nodes(all), - Running = rabbit_mnesia:cluster_nodes(running), + Running = rabbit_nodes:all_running(), lists:member(Node, All) andalso not lists:member(Node, Running) %% [2] end. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 9a56e70db8..74d400950e 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -354,9 +354,8 @@ send_drained(Pid, CTagCredit) -> -spec list() -> [pid()]. list() -> - Running = rabbit_mnesia:cluster_nodes(running), - rabbit_misc:append_rpc_all_nodes(Running, - rabbit_channel, list_local, [], ?RPC_TIMEOUT). + Nodes = rabbit_nodes:all_running(), + rabbit_misc:append_rpc_all_nodes(Nodes, rabbit_channel, list_local, [], ?RPC_TIMEOUT). -spec list_local() -> [pid()]. diff --git a/src/rabbit_channel_tracking.erl b/src/rabbit_channel_tracking.erl index d75ff6dfb8..2dbc4d0cf3 100644 --- a/src/rabbit_channel_tracking.erl +++ b/src/rabbit_channel_tracking.erl @@ -178,7 +178,7 @@ list() -> fun (Node, Acc) -> Tab = tracked_channel_table_name_for(Node), Acc ++ mnesia:dirty_match_object(Tab, #tracked_channel{_ = '_'}) - end, [], rabbit_mnesia:cluster_nodes(running)). + end, [], rabbit_nodes:all_running()). -spec list_of_user(rabbit_types:username()) -> [rabbit_types:tracked_channel()]. diff --git a/src/rabbit_connection_tracking.erl b/src/rabbit_connection_tracking.erl index 85ce317bba..33938a90cc 100644 --- a/src/rabbit_connection_tracking.erl +++ b/src/rabbit_connection_tracking.erl @@ -331,7 +331,7 @@ get_all_tracked_connection_table_names_for_node(Node) -> -spec lookup(rabbit_types:connection_name()) -> rabbit_types:tracked_connection() | 'not_found'. lookup(Name) -> - Nodes = rabbit_mnesia:cluster_nodes(running), + Nodes = rabbit_nodes:all_running(), lookup(Name, Nodes). lookup(_, []) -> @@ -350,7 +350,7 @@ list() -> fun (Node, Acc) -> Tab = tracked_connection_table_name_for(Node), Acc ++ mnesia:dirty_match_object(Tab, #tracked_connection{_ = '_'}) - end, [], rabbit_mnesia:cluster_nodes(running)). + end, [], rabbit_nodes:all_running()). -spec count() -> non_neg_integer(). @@ -359,7 +359,7 @@ count() -> fun (Node, Acc) -> Tab = tracked_connection_table_name_for(Node), Acc + mnesia:table_info(Tab, size) - end, 0, rabbit_mnesia:cluster_nodes(running)). + end, 0, rabbit_nodes:all_running()). -spec list(rabbit_types:vhost()) -> [rabbit_types:tracked_connection()]. diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index 8a76c8074c..3fc2d75908 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -44,9 +44,8 @@ list_local() -> -spec list() -> [pid()]. list() -> - Running = rabbit_mnesia:cluster_nodes(running), - rabbit_misc:append_rpc_all_nodes(Running, - rabbit_direct, list_local, [], ?RPC_TIMEOUT). + Nodes = rabbit_nodes:all_running(), + rabbit_misc:append_rpc_all_nodes(Nodes, rabbit_direct, list_local, [], ?RPC_TIMEOUT). %%---------------------------------------------------------------------------- diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 9676ec3716..02f590e2fb 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -152,7 +152,7 @@ remove_from_queue(QueueName, Self, DeadGMPids) -> slaves_to_start_on_failure(Q, DeadGMPids) -> %% In case Mnesia has not caught up yet, filter out nodes we know %% to be dead.. - ClusterNodes = rabbit_mnesia:cluster_nodes(running) -- + ClusterNodes = rabbit_nodes:all_running() -- [node(P) || P <- DeadGMPids], {_, OldNodes, _} = actual_queue_nodes(Q), {_, NewNodes} = suggested_queue_nodes(Q, ClusterNodes), @@ -234,16 +234,16 @@ add_mirror(QName, MirrorNode, SyncMode) -> #resource{virtual_host = VHost} = amqqueue:get_name(Q), case rabbit_vhost_sup_sup:get_vhost_sup(VHost, MirrorNode) of {ok, _} -> - try + try SPid = rabbit_amqqueue_sup_sup:start_queue_process( MirrorNode, Q, slave), log_info(QName, "Adding mirror on node ~p: ~p~n", [MirrorNode, SPid]), rabbit_mirror_queue_slave:go(SPid, SyncMode) - of + of _ -> ok catch - error:QError -> + error:QError -> log_warning(QName, "Unable to start queue mirror on node '~p'. " "Target queue supervisor is not running: ~p~n", @@ -320,7 +320,7 @@ store_updated_slaves(Q0) when ?is_amqqueue(Q0) -> %% a long time without being removed. update_recoverable(SPids, RS) -> SNodes = [node(SPid) || SPid <- SPids], - RunningNodes = rabbit_mnesia:cluster_nodes(running), + RunningNodes = rabbit_nodes:all_running(), AddNodes = SNodes -- RS, DelNodes = RunningNodes -- SNodes, %% i.e. running with no slave (RS -- DelNodes) ++ AddNodes. @@ -384,7 +384,7 @@ suggested_queue_nodes(Q) -> suggested_queue_nodes(Q, rabbit_nodes:all_runni suggested_queue_nodes(Q, All) -> suggested_queue_nodes(Q, node(), All). %% The third argument exists so we can pull a call to -%% rabbit_mnesia:cluster_nodes(running) out of a loop or transaction +%% rabbit_nodes:all_running() out of a loop or transaction %% or both. suggested_queue_nodes(Q, DefNode, All) when ?is_amqqueue(Q) -> Owner = amqqueue:get_exclusive_owner(Q), diff --git a/src/rabbit_mnesia_rename.erl b/src/rabbit_mnesia_rename.erl index 341e639f45..e0d88c0f5e 100644 --- a/src/rabbit_mnesia_rename.erl +++ b/src/rabbit_mnesia_rename.erl @@ -233,7 +233,7 @@ update_term(_NodeMap, Term) -> rename_in_running_mnesia(FromNode, ToNode) -> All = rabbit_mnesia:cluster_nodes(all), - Running = rabbit_mnesia:cluster_nodes(running), + Running = rabbit_nodes:all_running(), case {lists:member(FromNode, Running), lists:member(ToNode, All)} of {false, true} -> ok; {true, _} -> exit({old_node_running, FromNode}); diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 9c5096746d..30f4554cbf 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -390,9 +390,8 @@ unregister_connection(Pid) -> pg_local:leave(rabbit_connections, Pid). -spec connections() -> [rabbit_types:connection()]. connections() -> - Running = rabbit_mnesia:cluster_nodes(running), - rabbit_misc:append_rpc_all_nodes(Running, - rabbit_networking, connections_local, [], ?RPC_TIMEOUT). + Nodes = rabbit_nodes:all_running(), + rabbit_misc:append_rpc_all_nodes(Nodes, rabbit_networking, connections_local, [], ?RPC_TIMEOUT). -spec local_connections() -> [rabbit_types:connection()]. %% @doc Returns pids of AMQP 0-9-1 and AMQP 1.0 connections local to this node. diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 3408ea5a51..b56180c54c 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -162,7 +162,7 @@ notify_node_up() -> -spec notify_joined_cluster() -> 'ok'. notify_joined_cluster() -> - Nodes = rabbit_mnesia:cluster_nodes(running) -- [node()], + Nodes = rabbit_nodes:all_running() -- [node()], gen_server:abcast(Nodes, ?SERVER, {joined_cluster, node(), rabbit_mnesia:node_type()}), ok. @@ -170,7 +170,7 @@ notify_joined_cluster() -> -spec notify_left_cluster(node()) -> 'ok'. notify_left_cluster(Node) -> - Nodes = rabbit_mnesia:cluster_nodes(running), + Nodes = rabbit_nodes:all_running(), gen_server:abcast(Nodes, ?SERVER, {left_cluster, Node}), ok. @@ -375,7 +375,7 @@ handle_call(_Request, _From, State) -> {noreply, State}. handle_cast(notify_node_up, State = #state{guid = GUID}) -> - Nodes = rabbit_mnesia:cluster_nodes(running) -- [node()], + Nodes = rabbit_nodes:all_running() -- [node()], gen_server:abcast(Nodes, ?SERVER, {node_up, node(), rabbit_mnesia:node_type(), GUID}), %% register other active rabbits with this rabbit @@ -425,7 +425,7 @@ handle_cast({announce_guid, Node, GUID}, State = #state{node_guids = GUIDs}) -> handle_cast({check_partial_partition, Node, Rep, NodeGUID, MyGUID, RepGUID}, State = #state{guid = MyGUID, node_guids = GUIDs}) -> - case lists:member(Node, rabbit_mnesia:cluster_nodes(running)) andalso + case lists:member(Node, rabbit_nodes:all_running()) andalso maps:find(Node, GUIDs) =:= {ok, NodeGUID} of true -> spawn_link( %%[1] fun () -> @@ -572,7 +572,7 @@ handle_info({nodedown, Node, Info}, State = #state{guid = MyGUID, Node, node(), DownGUID, CheckGUID, MyGUID}) end, case maps:find(Node, GUIDs) of - {ok, DownGUID} -> Alive = rabbit_mnesia:cluster_nodes(running) + {ok, DownGUID} -> Alive = rabbit_nodes:all_running() -- [node(), Node], [case maps:find(N, GUIDs) of {ok, CheckGUID} -> Check(N, CheckGUID, DownGUID); @@ -771,7 +771,7 @@ handle_dead_rabbit(Node, State = #state{partitions = Partitions, %% going away. It's only safe to forget anything about partitions when %% there are no partitions. Down = Partitions -- alive_rabbit_nodes(), - NoLongerPartitioned = rabbit_mnesia:cluster_nodes(running), + NoLongerPartitioned = rabbit_nodes:all_running(), Partitions1 = case Partitions -- Down -- NoLongerPartitioned of [] -> []; _ -> Partitions @@ -853,7 +853,7 @@ disconnect(Node) -> %%-------------------------------------------------------------------- %% mnesia:system_info(db_nodes) (and hence -%% rabbit_mnesia:cluster_nodes(running)) does not return all nodes +%% rabbit_nodes:all_running()) does not return all nodes %% when partitioned, just those that we are sharing Mnesia state %% with. So we have a small set of replacement functions %% here. "rabbit" in a function's name implies we test if the rabbit @@ -917,7 +917,7 @@ ping_all() -> ok. possibly_partitioned_nodes() -> - alive_rabbit_nodes() -- rabbit_mnesia:cluster_nodes(running). + alive_rabbit_nodes() -- rabbit_nodes:all_running(). startup_log([]) -> rabbit_log:info("Starting rabbit_node_monitor~n", []); diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 6eff110ca3..915cf9d527 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -862,7 +862,7 @@ add_member(VHost, Name, Node, Timeout) -> {error, classic_queue_not_supported}; {ok, Q} when ?amqqueue_is_quorum(Q) -> QNodes = get_nodes(Q), - case lists:member(Node, rabbit_mnesia:cluster_nodes(running)) of + case lists:member(Node, rabbit_nodes:all_running()) of false -> {error, node_not_running}; true -> @@ -993,7 +993,7 @@ shrink_all(Node) -> [{rabbit_amqqueue:name(), {ok, pos_integer()} | {error, pos_integer(), term()}}]. grow(Node, VhostSpec, QueueSpec, Strategy) -> - Running = rabbit_mnesia:cluster_nodes(running), + Running = rabbit_nodes:all_running(), [begin Size = length(get_nodes(Q)), QName = amqqueue:get_name(Q), diff --git a/src/rabbit_tracking.erl b/src/rabbit_tracking.erl index f17778d987..a124d20226 100644 --- a/src/rabbit_tracking.erl +++ b/src/rabbit_tracking.erl @@ -56,7 +56,7 @@ count_tracked_items(TableNameFun, CountRecPosition, Key, ContextMsg) -> [ContextMsg, Key, Node, Err]), Acc end - end, 0, rabbit_mnesia:cluster_nodes(running)). + end, 0, rabbit_nodes:all_running()). -spec match_tracked_items(function(), tuple()) -> term(). @@ -67,7 +67,7 @@ match_tracked_items(TableNameFun, MatchSpec) -> Acc ++ mnesia:dirty_match_object( Tab, MatchSpec) - end, [], rabbit_mnesia:cluster_nodes(running)). + end, [], rabbit_nodes:all_running()). -spec clear_tracking_table(atom()) -> ok. @@ -92,7 +92,7 @@ delete_tracking_table(TableName, Node, ContextMsg) -> -spec delete_tracked_entry({atom(), atom(), list()}, function(), term()) -> ok. delete_tracked_entry(_ExistsCheckSpec = {M, F, A}, TableNameFun, Key) -> - ClusterNodes = rabbit_mnesia:cluster_nodes(running), + ClusterNodes = rabbit_nodes:all_running(), ExistsInCluster = lists:any(fun(Node) -> rpc:call(Node, M, F, A) end, ClusterNodes), case ExistsInCluster of diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index ba72ecaebc..b1b128fecc 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -145,7 +145,7 @@ run_mnesia_upgrades(Upgrades, AllNodes) -> upgrade_mode(AllNodes) -> case nodes_running(AllNodes) of [] -> - AfterUs = rabbit_mnesia:cluster_nodes(running) -- [node()], + AfterUs = rabbit_nodes:all_running() -- [node()], case {node_type_legacy(), AfterUs} of {disc, []} -> primary; |
