summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDiana Corbacho <diana@rabbitmq.com>2017-07-27 17:48:06 +0200
committerDiana Corbacho <diana@rabbitmq.com>2017-07-27 17:48:06 +0200
commit75228f4e000c9a0087f103b2e1e91e3b4b5b0a10 (patch)
treed2d8766a8d8d88e4e77ac860b679399dc1854a22
parentd15c03f1e1467198c7cb9649e0bd7ec900b744f0 (diff)
downloadrabbitmq-server-git-75228f4e000c9a0087f103b2e1e91e3b4b5b0a10.tar.gz
Functions to list unresponsive queues
rabbitmq-cli#207 [#149059849]
-rw-r--r--src/rabbit_amqqueue.erl27
1 files changed, 27 insertions, 0 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index ff57593374..122ba3e4ba 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -38,6 +38,7 @@
-export([on_node_up/1, on_node_down/1]).
-export([update/2, store_queue/1, update_decorators/1, policy_changed/2]).
-export([update_mirroring/1, sync_mirrors/1, cancel_sync_mirrors/1, is_mirrored/1]).
+-export([emit_unresponsive/5, emit_unresponsive_local/4, is_unresponsive/2]).
-export([pid_of/1, pid_of/2]).
@@ -641,6 +642,18 @@ info_keys() -> rabbit_amqqueue_process:info_keys().
map(Qs, F) -> rabbit_misc:filter_exit_map(F, Qs).
+is_unresponsive(#amqqueue{ state = crashed }, _Timeout) ->
+ false;
+is_unresponsive(#amqqueue{ pid = QPid }, Timeout) ->
+ try
+ delegate:invoke(QPid, {gen_server2, call, [{info, [name]}, Timeout]}),
+ false
+ catch
+ %% TODO catch any exit??
+ exit:{timeout, _} ->
+ true
+ end.
+
info(Q = #amqqueue{ state = crashed }) -> info_down(Q, crashed);
info(#amqqueue{ pid = QPid }) -> delegate:invoke(QPid, {gen_server2, call, [info, infinity]}).
@@ -692,6 +705,20 @@ emit_info_down(VHostPath, Items, Ref, AggregatorPid) ->
AggregatorPid, Ref, fun(Q) -> info_down(Q, Items, down) end,
list_down(VHostPath)).
+emit_unresponsive_local(VHostPath, Timeout, Ref, AggregatorPid) ->
+ rabbit_control_misc:emitting_map_with_exit_handler(
+ AggregatorPid, Ref, fun(Q) -> case is_unresponsive(Q, Timeout) of
+ true -> [{name, Q#amqqueue.name}];
+ false -> []
+ end
+ end, list_local(VHostPath)
+ ).
+
+emit_unresponsive(Nodes, VHostPath, Timeout, Ref, AggregatorPid) ->
+ Pids = [ spawn_link(Node, rabbit_amqqueue, emit_unresponsive_local,
+ [VHostPath, Timeout, Ref, AggregatorPid]) || Node <- Nodes ],
+ rabbit_control_misc:await_emitters_termination(Pids).
+
info_local(VHostPath) ->
map(list_local(VHostPath), fun (Q) -> info(Q, [name]) end).