summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue.erl77
-rw-r--r--src/rabbit_core_metrics_gc.erl2
-rw-r--r--src/rabbit_quorum_queue.erl69
3 files changed, 139 insertions, 9 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index b77e0118e1..b3ae071b7b 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -32,8 +32,9 @@
emit_info_local/4, emit_info_down/4]).
-export([count/0]).
-export([list_down/1, count/1, list_names/0, list_names/1, list_local_names/0,
+ list_local_mirrored_classic_names/0,
list_local_names_down/0, list_with_possible_retry/1]).
--export([list_by_type/1]).
+-export([list_by_type/1, sample_local_queues/0, sample_n_by_name/2, sample_n/2]).
-export([force_event_refresh/1, notify_policy_changed/1]).
-export([consumers/1, consumers_all/1, emit_consumers_all/4, consumer_info_keys/0]).
-export([basic_get/6, basic_consume/12, basic_cancel/6, notify_decorators/1]).
@@ -44,8 +45,8 @@
-export([update_mirroring/1, sync_mirrors/1, cancel_sync_mirrors/1]).
-export([emit_unresponsive/6, emit_unresponsive_local/5, is_unresponsive/2]).
-export([is_replicated/1, is_dead_exclusive/1]). % Note: exported due to use in qlc expression.
--export([list_local_followers/0,
- get_quorum_nodes/1]).
+-export([list_local_quorum_queues/0, list_local_quorum_queue_names/0,
+ list_local_leaders/0, list_local_followers/0, get_quorum_nodes/1]).
-export([ensure_rabbit_queue_record_is_initialized/1]).
-export([format/1]).
-export([delete_immediately_by_resource/1]).
@@ -955,8 +956,41 @@ is_down(Q) ->
true
end.
+
+-spec sample_local_queues() -> [amqqueue:amqqueue()].
+sample_local_queues() -> sample_n_by_name(list_local_names(), 300).
+
+-spec sample_n_by_name([rabbit_amqqueue:name()], pos_integer()) -> [amqqueue:amqqueue()].
+sample_n_by_name([], _N) ->
+ [];
+sample_n_by_name(Names, N) when is_list(Names) andalso is_integer(N) andalso N > 0 ->
+ %% lists:nth/2 throws when position is > list length
+ M = erlang:min(N, length(Names)),
+ Ids = lists:foldl(fun( _, Acc) when length(Acc) >= 100 ->
+ Acc;
+ (_, Acc) ->
+ Pick = lists:nth(rand:uniform(M), Names),
+ [Pick | Acc]
+ end,
+ [], lists:seq(1, M)),
+ lists:map(fun (Id) ->
+ {ok, Q} = rabbit_amqqueue:lookup(Id),
+ Q
+ end,
+ lists:usort(Ids)).
+
+-spec sample_n([amqqueue:amqqueue()], pos_integer()) -> [amqqueue:amqqueue()].
+sample_n([], _N) ->
+ [];
+sample_n(Queues, N) when is_list(Queues) andalso is_integer(N) andalso N > 0 ->
+ Names = [amqqueue:get_name(Q) || Q <- Queues],
+ sample_n_by_name(Names, N).
+
+
-spec list_by_type(atom()) -> [amqqueue:amqqueue()].
+list_by_type(classic) -> list_by_type(rabbit_classic_queue);
+list_by_type(quorum) -> list_by_type(rabbit_quorum_queue);
list_by_type(Type) ->
{atomic, Qs} =
mnesia:sync_transaction(
@@ -967,11 +1001,42 @@ list_by_type(Type) ->
end),
Qs.
+-spec list_local_quorum_queue_names() -> [rabbit_amqqueue:name()].
+
+list_local_quorum_queue_names() ->
+ [ amqqueue:get_name(Q) || Q <- list_by_type(quorum),
+ amqqueue:get_state(Q) =/= crashed,
+ lists:member(node(), get_quorum_nodes(Q)),
+ is_local_to_node(amqqueue:get_pid(Q), node())].
+
+-spec list_local_quorum_queues() -> [amqqueue:amqqueue()].
+list_local_quorum_queues() ->
+ [ Q || Q <- list_by_type(quorum),
+ amqqueue:get_state(Q) =/= crashed,
+ lists:member(node(), get_quorum_nodes(Q)),
+ is_local_to_node(amqqueue:get_pid(Q), node())].
+
+-spec list_local_leaders() -> [amqqueue:amqqueue()].
+list_local_leaders() ->
+ [ Q || Q <- list(),
+ amqqueue:is_quorum(Q),
+ amqqueue:get_state(Q) =/= crashed, amqqueue:get_leader(Q) =:= node()].
+
+-spec list_local_followers() -> [amqqueue:amqqueue()].
list_local_followers() ->
- [ amqqueue:get_name(Q)
- || Q <- list(),
+ [ Q || Q <- list(),
amqqueue:is_quorum(Q),
- amqqueue:get_state(Q) =/= crashed, amqqueue:get_leader(Q) =/= node(), lists:member(node(), get_quorum_nodes(Q))].
+ amqqueue:get_state(Q) =/= crashed, amqqueue:get_leader(Q) =/= node(),
+ lists:member(node(), get_quorum_nodes(Q))].
+
+-spec list_local_mirrored_classic_names() -> [rabbit_amqqueue:name()].
+list_local_mirrored_classic_names() ->
+ [ amqqueue:get_name(Q) || Q <- list(),
+ amqqueue:get_state(Q) =/= crashed,
+ amqqueue:is_classic(Q),
+ is_local_to_node(amqqueue:get_pid(Q), node()),
+ is_replicated(Q)].
+
is_local_to_node(QPid, Node) when ?IS_CLASSIC(QPid) ->
Node =:= node(QPid);
diff --git a/src/rabbit_core_metrics_gc.erl b/src/rabbit_core_metrics_gc.erl
index 326804e342..06cc401d32 100644
--- a/src/rabbit_core_metrics_gc.erl
+++ b/src/rabbit_core_metrics_gc.erl
@@ -80,7 +80,7 @@ gc_local_queues() ->
GbSetDown = gb_sets:from_list(QueuesDown),
gc_queue_metrics(GbSet, GbSetDown),
gc_entity(queue_coarse_metrics, GbSet),
- Followers = gb_sets:from_list(rabbit_amqqueue:list_local_followers()),
+ Followers = gb_sets:from_list([amqqueue:get_name(Q) || Q <- rabbit_amqqueue:list_local_followers() ]),
gc_leader_data(Followers).
gc_leader_data(Followers) ->
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 867af6cb75..e27bbb67dc 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -42,10 +42,12 @@
-export([transfer_leadership/2, get_replicas/1, queue_length/1]).
-export([file_handle_leader_reservation/1, file_handle_other_reservation/0]).
-export([file_handle_release_reservation/0]).
+-export([list_with_minimum_quorum/0, list_with_minimum_quorum_for_cli/0,
+ filter_quorum_critical/1, filter_quorum_critical/2,
+ all_replica_states/0]).
-%%-include_lib("rabbit_common/include/rabbit.hrl").
--include_lib("rabbit.hrl").
-include_lib("stdlib/include/qlc.hrl").
+-include("rabbit.hrl").
-include("amqqueue.hrl").
-type msg_id() :: non_neg_integer().
@@ -234,6 +236,69 @@ become_leader(QName, Name) ->
end
end).
+-spec all_replica_states() -> {node(), #{atom() => atom()}}.
+all_replica_states() ->
+ Rows = ets:tab2list(ra_state),
+ {node(), maps:from_list(Rows)}.
+
+-spec list_with_minimum_quorum() -> [amqqueue:amqqueue()].
+list_with_minimum_quorum() ->
+ filter_quorum_critical(rabbit_amqqueue:list_local_quorum_queues()).
+
+-spec list_with_minimum_quorum_for_cli() -> [amqqueue:amqqueue()].
+list_with_minimum_quorum_for_cli() ->
+ QQs = list_with_minimum_quorum(),
+ [begin
+ #resource{name = Name} = amqqueue:get_name(Q),
+ #{
+ <<"readable_name">> => rabbit_misc:rs(amqqueue:get_name(Q)),
+ <<"name">> => Name,
+ <<"virtual_host">> => amqqueue:get_vhost(Q),
+ <<"type">> => <<"quorum">>
+ }
+ end || Q <- QQs].
+
+-spec filter_quorum_critical([amqqueue:amqqueue()]) -> [amqqueue:amqqueue()].
+filter_quorum_critical(Queues) ->
+ %% Example map of QQ replica states:
+ %% #{rabbit@warp10 =>
+ %% #{'%2F_qq.636' => leader,'%2F_qq.243' => leader,
+ %% '%2F_qq.1939' => leader,'%2F_qq.1150' => leader,
+ %% '%2F_qq.1109' => leader,'%2F_qq.1654' => leader,
+ %% '%2F_qq.1679' => leader,'%2F_qq.1003' => leader,
+ %% '%2F_qq.1593' => leader,'%2F_qq.1765' => leader,
+ %% '%2F_qq.933' => leader,'%2F_qq.38' => leader,
+ %% '%2F_qq.1357' => leader,'%2F_qq.1345' => leader,
+ %% '%2F_qq.1694' => leader,'%2F_qq.994' => leader,
+ %% '%2F_qq.490' => leader,'%2F_qq.1704' => leader,
+ %% '%2F_qq.58' => leader,'%2F_qq.564' => leader,
+ %% '%2F_qq.683' => leader,'%2F_qq.386' => leader,
+ %% '%2F_qq.753' => leader,'%2F_qq.6' => leader,
+ %% '%2F_qq.1590' => leader,'%2F_qq.1363' => leader,
+ %% '%2F_qq.882' => leader,'%2F_qq.1161' => leader,...}}
+ ReplicaStates = maps:from_list(
+ rabbit_misc:append_rpc_all_nodes(rabbit_nodes:all_running(),
+ ?MODULE, all_replica_states, [])),
+ filter_quorum_critical(Queues, ReplicaStates).
+
+-spec filter_quorum_critical([amqqueue:amqqueue()], #{node() => #{atom() => atom()}}) -> [amqqueue:amqqueue()].
+
+filter_quorum_critical(Queues, ReplicaStates) ->
+ lists:filter(fun (Q) ->
+ MemberNodes = rabbit_amqqueue:get_quorum_nodes(Q),
+ {Name, _Node} = amqqueue:get_pid(Q),
+ AllUp = lists:filter(fun (N) ->
+ {Name, _} = amqqueue:get_pid(Q),
+ case maps:get(N, ReplicaStates, undefined) of
+ #{Name := State} when State =:= follower orelse State =:= leader ->
+ true;
+ _ -> false
+ end
+ end, MemberNodes),
+ MinQuorum = length(MemberNodes) div 2 + 1,
+ length(AllUp) =< MinQuorum
+ end, Queues).
+
rpc_delete_metrics(QName) ->
ets:delete(queue_coarse_metrics, QName),
ets:delete(queue_metrics, QName),