diff options
| author | Daniil Fedotov <hairyhum@gmail.com> | 2017-08-09 17:52:53 +0400 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2017-08-09 17:52:53 +0400 |
| commit | b9a09c712baa07ff85a9c954ffb4e36b95a1fc37 (patch) | |
| tree | 9b7ff7b39d4c6ff93fd65b31f4296eb84028db6a | |
| parent | 70fabf3eae712ee5e13f5131b5d8443b7147203c (diff) | |
| parent | ad9f941f95718e5cef9ee544d20a66e8c3b0b88c (diff) | |
| download | rabbitmq-server-git-b9a09c712baa07ff85a9c954ffb4e36b95a1fc37.tar.gz | |
Merge pull request #1313 from rabbitmq/rabbitmq-cli-207
Functions to list unresponsive queues
| -rw-r--r-- | src/rabbit_amqqueue.erl | 27 |
1 files changed, 27 insertions, 0 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index a5480f707e..2d85a9f04b 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -38,6 +38,7 @@ -export([on_node_up/1, on_node_down/1]). -export([update/2, store_queue/1, update_decorators/1, policy_changed/2]). -export([update_mirroring/1, sync_mirrors/1, cancel_sync_mirrors/1, is_mirrored/1]). +-export([emit_unresponsive/6, emit_unresponsive_local/5, is_unresponsive/2]). -export([pid_of/1, pid_of/2]). -export([mark_local_durable_queues_stopped/1]). @@ -682,6 +683,18 @@ info_keys() -> rabbit_amqqueue_process:info_keys(). map(Qs, F) -> rabbit_misc:filter_exit_map(F, Qs). +is_unresponsive(#amqqueue{ state = crashed }, _Timeout) -> + false; +is_unresponsive(#amqqueue{ pid = QPid }, Timeout) -> + try + delegate:invoke(QPid, {gen_server2, call, [{info, [name]}, Timeout]}), + false + catch + %% TODO catch any exit?? + exit:{timeout, _} -> + true + end. + info(Q = #amqqueue{ state = crashed }) -> info_down(Q, crashed); info(Q = #amqqueue{ state = stopped }) -> info_down(Q, stopped); info(#amqqueue{ pid = QPid }) -> delegate:invoke(QPid, {gen_server2, call, [info, infinity]}). @@ -736,6 +749,20 @@ emit_info_down(VHostPath, Items, Ref, AggregatorPid) -> AggregatorPid, Ref, fun(Q) -> info_down(Q, Items, down) end, list_down(VHostPath)). +emit_unresponsive_local(VHostPath, Items, Timeout, Ref, AggregatorPid) -> + rabbit_control_misc:emitting_map_with_exit_handler( + AggregatorPid, Ref, fun(Q) -> case is_unresponsive(Q, Timeout) of + true -> info_down(Q, Items, unresponsive); + false -> [] + end + end, list_local(VHostPath) + ). + +emit_unresponsive(Nodes, VHostPath, Items, Timeout, Ref, AggregatorPid) -> + Pids = [ spawn_link(Node, rabbit_amqqueue, emit_unresponsive_local, + [VHostPath, Items, Timeout, Ref, AggregatorPid]) || Node <- Nodes ], + rabbit_control_misc:await_emitters_termination(Pids). + info_local(VHostPath) -> map(list_local(VHostPath), fun (Q) -> info(Q, [name]) end). |
