summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl33
1 files changed, 32 insertions, 1 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 6227a5bbfa..fa7279a8bd 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -44,9 +44,12 @@
-export([update/2, store_queue/1, update_decorators/1, policy_changed/2]).
-export([update_mirroring/1, sync_mirrors/1, cancel_sync_mirrors/1]).
-export([emit_unresponsive/6, emit_unresponsive_local/5, is_unresponsive/2]).
+-export([has_synchronised_mirrors_online/1]).
-export([is_replicated/1, is_dead_exclusive/1]). % Note: exported due to use in qlc expression.
-export([list_local_quorum_queues/0, list_local_quorum_queue_names/0,
- list_local_leaders/0, list_local_followers/0, get_quorum_nodes/1]).
+ list_local_leaders/0, list_local_followers/0, get_quorum_nodes/1,
+ list_local_mirrored_classic_without_synchronised_mirrors/0,
+ list_local_mirrored_classic_without_synchronised_mirrors_for_cli/0]).
-export([ensure_rabbit_queue_record_is_initialized/1]).
-export([format/1]).
-export([delete_immediately_by_resource/1]).
@@ -1035,6 +1038,27 @@ list_local_mirrored_classic_names() ->
is_local_to_node(amqqueue:get_pid(Q), node()),
is_replicated(Q)].
+-spec list_local_mirrored_classic_without_synchronised_mirrors() -> [amqqueue:amqqueue()].
+list_local_mirrored_classic_without_synchronised_mirrors() ->
+ [ Q || Q <- list(),
+ amqqueue:get_state(Q) =/= crashed,
+ amqqueue:is_classic(Q),
+ is_local_to_node(amqqueue:get_pid(Q), node()),
+ is_replicated(Q),
+ not has_synchronised_mirrors_online(Q)].
+
+-spec list_local_mirrored_classic_without_synchronised_mirrors_for_cli() -> [amqqueue:amqqueue()].
+list_local_mirrored_classic_without_synchronised_mirrors_for_cli() ->
+ QQs = list_local_mirrored_classic_without_synchronised_mirrors(),
+ [begin
+ #resource{name = Name} = amqqueue:get_name(Q),
+ #{
+ <<"readable_name">> => rabbit_misc:rs(amqqueue:get_name(Q)),
+ <<"name">> => Name,
+ <<"virtual_host">> => amqqueue:get_vhost(Q),
+ <<"type">> => <<"quorum">>
+ }
+ end || Q <- QQs].
is_local_to_node(QPid, Node) when ?IS_CLASSIC(QPid) ->
Node =:= node(QPid);
@@ -1882,6 +1906,13 @@ is_dead_exclusive(Q) when ?amqqueue_exclusive_owner_is_pid(Q) ->
Pid = amqqueue:get_pid(Q),
not rabbit_mnesia:is_process_alive(Pid).
+-spec has_synchronised_mirrors_online(amqqueue:amqqueue()) -> boolean().
+has_synchronised_mirrors_online(Q) ->
+ %% a queue with all mirrors down would have no mirror pids.
+ %% We treat these as in sync intentionally to avoid false positives.
+ MirrorPids = amqqueue:get_sync_slave_pids(Q),
+ MirrorPids =/= [] andalso lists:any(fun rabbit_misc:is_process_alive/1, MirrorPids).
+
-spec on_node_up(node()) -> 'ok'.
on_node_up(Node) ->