diff options
| author | Diana Corbacho <diana@rabbitmq.com> | 2017-07-27 17:48:06 +0200 |
|---|---|---|
| committer | Diana Corbacho <diana@rabbitmq.com> | 2017-07-27 17:48:06 +0200 |
| commit | 75228f4e000c9a0087f103b2e1e91e3b4b5b0a10 (patch) | |
| tree | d2d8766a8d8d88e4e77ac860b679399dc1854a22 | |
| parent | d15c03f1e1467198c7cb9649e0bd7ec900b744f0 (diff) | |
| download | rabbitmq-server-git-75228f4e000c9a0087f103b2e1e91e3b4b5b0a10.tar.gz | |
Functions to list unresponsive queues
rabbitmq-cli#207
[#149059849]
| -rw-r--r-- | src/rabbit_amqqueue.erl | 27 |
1 files changed, 27 insertions, 0 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index ff57593374..122ba3e4ba 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -38,6 +38,7 @@ -export([on_node_up/1, on_node_down/1]). -export([update/2, store_queue/1, update_decorators/1, policy_changed/2]). -export([update_mirroring/1, sync_mirrors/1, cancel_sync_mirrors/1, is_mirrored/1]). +-export([emit_unresponsive/5, emit_unresponsive_local/4, is_unresponsive/2]). -export([pid_of/1, pid_of/2]). @@ -641,6 +642,18 @@ info_keys() -> rabbit_amqqueue_process:info_keys(). map(Qs, F) -> rabbit_misc:filter_exit_map(F, Qs). +is_unresponsive(#amqqueue{ state = crashed }, _Timeout) -> + false; +is_unresponsive(#amqqueue{ pid = QPid }, Timeout) -> + try + delegate:invoke(QPid, {gen_server2, call, [{info, [name]}, Timeout]}), + false + catch + %% TODO catch any exit?? + exit:{timeout, _} -> + true + end. + info(Q = #amqqueue{ state = crashed }) -> info_down(Q, crashed); info(#amqqueue{ pid = QPid }) -> delegate:invoke(QPid, {gen_server2, call, [info, infinity]}). @@ -692,6 +705,20 @@ emit_info_down(VHostPath, Items, Ref, AggregatorPid) -> AggregatorPid, Ref, fun(Q) -> info_down(Q, Items, down) end, list_down(VHostPath)). +emit_unresponsive_local(VHostPath, Timeout, Ref, AggregatorPid) -> + rabbit_control_misc:emitting_map_with_exit_handler( + AggregatorPid, Ref, fun(Q) -> case is_unresponsive(Q, Timeout) of + true -> [{name, Q#amqqueue.name}]; + false -> [] + end + end, list_local(VHostPath) + ). + +emit_unresponsive(Nodes, VHostPath, Timeout, Ref, AggregatorPid) -> + Pids = [ spawn_link(Node, rabbit_amqqueue, emit_unresponsive_local, + [VHostPath, Timeout, Ref, AggregatorPid]) || Node <- Nodes ], + rabbit_control_misc:await_emitters_termination(Pids). + info_local(VHostPath) -> map(list_local(VHostPath), fun (Q) -> info(Q, [name]) end). |
