diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 77 | ||||
| -rw-r--r-- | src/rabbit_core_metrics_gc.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 69 |
3 files changed, 139 insertions, 9 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index b77e0118e1..b3ae071b7b 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -32,8 +32,9 @@ emit_info_local/4, emit_info_down/4]). -export([count/0]). -export([list_down/1, count/1, list_names/0, list_names/1, list_local_names/0, + list_local_mirrored_classic_names/0, list_local_names_down/0, list_with_possible_retry/1]). --export([list_by_type/1]). +-export([list_by_type/1, sample_local_queues/0, sample_n_by_name/2, sample_n/2]). -export([force_event_refresh/1, notify_policy_changed/1]). -export([consumers/1, consumers_all/1, emit_consumers_all/4, consumer_info_keys/0]). -export([basic_get/6, basic_consume/12, basic_cancel/6, notify_decorators/1]). @@ -44,8 +45,8 @@ -export([update_mirroring/1, sync_mirrors/1, cancel_sync_mirrors/1]). -export([emit_unresponsive/6, emit_unresponsive_local/5, is_unresponsive/2]). -export([is_replicated/1, is_dead_exclusive/1]). % Note: exported due to use in qlc expression. --export([list_local_followers/0, - get_quorum_nodes/1]). +-export([list_local_quorum_queues/0, list_local_quorum_queue_names/0, + list_local_leaders/0, list_local_followers/0, get_quorum_nodes/1]). -export([ensure_rabbit_queue_record_is_initialized/1]). -export([format/1]). -export([delete_immediately_by_resource/1]). @@ -955,8 +956,41 @@ is_down(Q) -> true end. + +-spec sample_local_queues() -> [amqqueue:amqqueue()]. +sample_local_queues() -> sample_n_by_name(list_local_names(), 300). + +-spec sample_n_by_name([rabbit_amqqueue:name()], pos_integer()) -> [amqqueue:amqqueue()]. +sample_n_by_name([], _N) -> + []; +sample_n_by_name(Names, N) when is_list(Names) andalso is_integer(N) andalso N > 0 -> + %% lists:nth/2 throws when position is > list length + M = erlang:min(N, length(Names)), + Ids = lists:foldl(fun( _, Acc) when length(Acc) >= 100 -> + Acc; + (_, Acc) -> + Pick = lists:nth(rand:uniform(M), Names), + [Pick | Acc] + end, + [], lists:seq(1, M)), + lists:map(fun (Id) -> + {ok, Q} = rabbit_amqqueue:lookup(Id), + Q + end, + lists:usort(Ids)). + +-spec sample_n([amqqueue:amqqueue()], pos_integer()) -> [amqqueue:amqqueue()]. +sample_n([], _N) -> + []; +sample_n(Queues, N) when is_list(Queues) andalso is_integer(N) andalso N > 0 -> + Names = [amqqueue:get_name(Q) || Q <- Queues], + sample_n_by_name(Names, N). + + -spec list_by_type(atom()) -> [amqqueue:amqqueue()]. +list_by_type(classic) -> list_by_type(rabbit_classic_queue); +list_by_type(quorum) -> list_by_type(rabbit_quorum_queue); list_by_type(Type) -> {atomic, Qs} = mnesia:sync_transaction( @@ -967,11 +1001,42 @@ list_by_type(Type) -> end), Qs. +-spec list_local_quorum_queue_names() -> [rabbit_amqqueue:name()]. + +list_local_quorum_queue_names() -> + [ amqqueue:get_name(Q) || Q <- list_by_type(quorum), + amqqueue:get_state(Q) =/= crashed, + lists:member(node(), get_quorum_nodes(Q)), + is_local_to_node(amqqueue:get_pid(Q), node())]. + +-spec list_local_quorum_queues() -> [amqqueue:amqqueue()]. +list_local_quorum_queues() -> + [ Q || Q <- list_by_type(quorum), + amqqueue:get_state(Q) =/= crashed, + lists:member(node(), get_quorum_nodes(Q)), + is_local_to_node(amqqueue:get_pid(Q), node())]. + +-spec list_local_leaders() -> [amqqueue:amqqueue()]. +list_local_leaders() -> + [ Q || Q <- list(), + amqqueue:is_quorum(Q), + amqqueue:get_state(Q) =/= crashed, amqqueue:get_leader(Q) =:= node()]. + +-spec list_local_followers() -> [amqqueue:amqqueue()]. list_local_followers() -> - [ amqqueue:get_name(Q) - || Q <- list(), + [ Q || Q <- list(), amqqueue:is_quorum(Q), - amqqueue:get_state(Q) =/= crashed, amqqueue:get_leader(Q) =/= node(), lists:member(node(), get_quorum_nodes(Q))]. + amqqueue:get_state(Q) =/= crashed, amqqueue:get_leader(Q) =/= node(), + lists:member(node(), get_quorum_nodes(Q))]. + +-spec list_local_mirrored_classic_names() -> [rabbit_amqqueue:name()]. +list_local_mirrored_classic_names() -> + [ amqqueue:get_name(Q) || Q <- list(), + amqqueue:get_state(Q) =/= crashed, + amqqueue:is_classic(Q), + is_local_to_node(amqqueue:get_pid(Q), node()), + is_replicated(Q)]. + is_local_to_node(QPid, Node) when ?IS_CLASSIC(QPid) -> Node =:= node(QPid); diff --git a/src/rabbit_core_metrics_gc.erl b/src/rabbit_core_metrics_gc.erl index 326804e342..06cc401d32 100644 --- a/src/rabbit_core_metrics_gc.erl +++ b/src/rabbit_core_metrics_gc.erl @@ -80,7 +80,7 @@ gc_local_queues() -> GbSetDown = gb_sets:from_list(QueuesDown), gc_queue_metrics(GbSet, GbSetDown), gc_entity(queue_coarse_metrics, GbSet), - Followers = gb_sets:from_list(rabbit_amqqueue:list_local_followers()), + Followers = gb_sets:from_list([amqqueue:get_name(Q) || Q <- rabbit_amqqueue:list_local_followers() ]), gc_leader_data(Followers). gc_leader_data(Followers) -> diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 867af6cb75..e27bbb67dc 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -42,10 +42,12 @@ -export([transfer_leadership/2, get_replicas/1, queue_length/1]). -export([file_handle_leader_reservation/1, file_handle_other_reservation/0]). -export([file_handle_release_reservation/0]). +-export([list_with_minimum_quorum/0, list_with_minimum_quorum_for_cli/0, + filter_quorum_critical/1, filter_quorum_critical/2, + all_replica_states/0]). -%%-include_lib("rabbit_common/include/rabbit.hrl"). --include_lib("rabbit.hrl"). -include_lib("stdlib/include/qlc.hrl"). +-include("rabbit.hrl"). -include("amqqueue.hrl"). -type msg_id() :: non_neg_integer(). @@ -234,6 +236,69 @@ become_leader(QName, Name) -> end end). +-spec all_replica_states() -> {node(), #{atom() => atom()}}. +all_replica_states() -> + Rows = ets:tab2list(ra_state), + {node(), maps:from_list(Rows)}. + +-spec list_with_minimum_quorum() -> [amqqueue:amqqueue()]. +list_with_minimum_quorum() -> + filter_quorum_critical(rabbit_amqqueue:list_local_quorum_queues()). + +-spec list_with_minimum_quorum_for_cli() -> [amqqueue:amqqueue()]. +list_with_minimum_quorum_for_cli() -> + QQs = list_with_minimum_quorum(), + [begin + #resource{name = Name} = amqqueue:get_name(Q), + #{ + <<"readable_name">> => rabbit_misc:rs(amqqueue:get_name(Q)), + <<"name">> => Name, + <<"virtual_host">> => amqqueue:get_vhost(Q), + <<"type">> => <<"quorum">> + } + end || Q <- QQs]. + +-spec filter_quorum_critical([amqqueue:amqqueue()]) -> [amqqueue:amqqueue()]. +filter_quorum_critical(Queues) -> + %% Example map of QQ replica states: + %% #{rabbit@warp10 => + %% #{'%2F_qq.636' => leader,'%2F_qq.243' => leader, + %% '%2F_qq.1939' => leader,'%2F_qq.1150' => leader, + %% '%2F_qq.1109' => leader,'%2F_qq.1654' => leader, + %% '%2F_qq.1679' => leader,'%2F_qq.1003' => leader, + %% '%2F_qq.1593' => leader,'%2F_qq.1765' => leader, + %% '%2F_qq.933' => leader,'%2F_qq.38' => leader, + %% '%2F_qq.1357' => leader,'%2F_qq.1345' => leader, + %% '%2F_qq.1694' => leader,'%2F_qq.994' => leader, + %% '%2F_qq.490' => leader,'%2F_qq.1704' => leader, + %% '%2F_qq.58' => leader,'%2F_qq.564' => leader, + %% '%2F_qq.683' => leader,'%2F_qq.386' => leader, + %% '%2F_qq.753' => leader,'%2F_qq.6' => leader, + %% '%2F_qq.1590' => leader,'%2F_qq.1363' => leader, + %% '%2F_qq.882' => leader,'%2F_qq.1161' => leader,...}} + ReplicaStates = maps:from_list( + rabbit_misc:append_rpc_all_nodes(rabbit_nodes:all_running(), + ?MODULE, all_replica_states, [])), + filter_quorum_critical(Queues, ReplicaStates). + +-spec filter_quorum_critical([amqqueue:amqqueue()], #{node() => #{atom() => atom()}}) -> [amqqueue:amqqueue()]. + +filter_quorum_critical(Queues, ReplicaStates) -> + lists:filter(fun (Q) -> + MemberNodes = rabbit_amqqueue:get_quorum_nodes(Q), + {Name, _Node} = amqqueue:get_pid(Q), + AllUp = lists:filter(fun (N) -> + {Name, _} = amqqueue:get_pid(Q), + case maps:get(N, ReplicaStates, undefined) of + #{Name := State} when State =:= follower orelse State =:= leader -> + true; + _ -> false + end + end, MemberNodes), + MinQuorum = length(MemberNodes) div 2 + 1, + length(AllUp) =< MinQuorum + end, Queues). + rpc_delete_metrics(QName) -> ets:delete(queue_coarse_metrics, QName), ets:delete(queue_metrics, QName), |
