summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniil Fedotov <hairyhum@gmail.com>2017-08-09 17:52:53 +0400
committerGitHub <noreply@github.com>2017-08-09 17:52:53 +0400
commitb9a09c712baa07ff85a9c954ffb4e36b95a1fc37 (patch)
tree9b7ff7b39d4c6ff93fd65b31f4296eb84028db6a
parent70fabf3eae712ee5e13f5131b5d8443b7147203c (diff)
parentad9f941f95718e5cef9ee544d20a66e8c3b0b88c (diff)
downloadrabbitmq-server-git-b9a09c712baa07ff85a9c954ffb4e36b95a1fc37.tar.gz
Merge pull request #1313 from rabbitmq/rabbitmq-cli-207
Functions to list unresponsive queues
-rw-r--r--src/rabbit_amqqueue.erl27
1 files changed, 27 insertions, 0 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index a5480f707e..2d85a9f04b 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -38,6 +38,7 @@
-export([on_node_up/1, on_node_down/1]).
-export([update/2, store_queue/1, update_decorators/1, policy_changed/2]).
-export([update_mirroring/1, sync_mirrors/1, cancel_sync_mirrors/1, is_mirrored/1]).
+-export([emit_unresponsive/6, emit_unresponsive_local/5, is_unresponsive/2]).
-export([pid_of/1, pid_of/2]).
-export([mark_local_durable_queues_stopped/1]).
@@ -682,6 +683,18 @@ info_keys() -> rabbit_amqqueue_process:info_keys().
map(Qs, F) -> rabbit_misc:filter_exit_map(F, Qs).
+is_unresponsive(#amqqueue{ state = crashed }, _Timeout) ->
+ false;
+is_unresponsive(#amqqueue{ pid = QPid }, Timeout) ->
+ try
+ delegate:invoke(QPid, {gen_server2, call, [{info, [name]}, Timeout]}),
+ false
+ catch
+ %% TODO catch any exit??
+ exit:{timeout, _} ->
+ true
+ end.
+
info(Q = #amqqueue{ state = crashed }) -> info_down(Q, crashed);
info(Q = #amqqueue{ state = stopped }) -> info_down(Q, stopped);
info(#amqqueue{ pid = QPid }) -> delegate:invoke(QPid, {gen_server2, call, [info, infinity]}).
@@ -736,6 +749,20 @@ emit_info_down(VHostPath, Items, Ref, AggregatorPid) ->
AggregatorPid, Ref, fun(Q) -> info_down(Q, Items, down) end,
list_down(VHostPath)).
+emit_unresponsive_local(VHostPath, Items, Timeout, Ref, AggregatorPid) ->
+ rabbit_control_misc:emitting_map_with_exit_handler(
+ AggregatorPid, Ref, fun(Q) -> case is_unresponsive(Q, Timeout) of
+ true -> info_down(Q, Items, unresponsive);
+ false -> []
+ end
+ end, list_local(VHostPath)
+ ).
+
+emit_unresponsive(Nodes, VHostPath, Items, Timeout, Ref, AggregatorPid) ->
+ Pids = [ spawn_link(Node, rabbit_amqqueue, emit_unresponsive_local,
+ [VHostPath, Items, Timeout, Ref, AggregatorPid]) || Node <- Nodes ],
+ rabbit_control_misc:await_emitters_termination(Pids).
+
info_local(VHostPath) ->
map(list_local(VHostPath), fun (Q) -> info(Q, [name]) end).