diff options
| author | dcorbacho <dparracorbacho@piotal.io> | 2019-11-17 21:50:31 +0000 |
|---|---|---|
| committer | dcorbacho <dparracorbacho@piotal.io> | 2019-11-17 22:41:21 +0000 |
| commit | 3fb366bbbb8fe446995261cf2f4bf2b2753ffe3a (patch) | |
| tree | 09131ad610209f1b35e4177687e2c098f3f6e79f | |
| parent | 880965aa67009dc3fb8dc161b8b1b6df6a71ed9d (diff) | |
| download | rabbitmq-server-git-3fb366bbbb8fe446995261cf2f4bf2b2753ffe3a.tar.gz | |
Add format/1 for classic queues and totals/type_specific info for all types
The functions report the necessary data for mgmt-less UI:
* replica information for individual formatting of classic queues
* queue totals/replica information for all queues
[#169802101]
| -rw-r--r-- | src/rabbit_amqqueue.erl | 25 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 24 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 21 |
3 files changed, 66 insertions, 4 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 936b18c53a..b77e0118e1 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -54,6 +54,7 @@ -export([mark_local_durable_queues_stopped/1]). -export([rebalance/3]). +-export([collect_info_all/2]). %% internal -export([internal_declare/2, internal_delete/2, run_backing_queue/3, @@ -1097,7 +1098,7 @@ is_unresponsive(Q, Timeout) when ?amqqueue_is_quorum(Q) -> end. format(Q) when ?amqqueue_is_quorum(Q) -> rabbit_quorum_queue:format(Q); -format(_) -> []. +format(Q) -> rabbit_amqqueue_process:format(Q). -spec info(amqqueue:amqqueue()) -> rabbit_types:infos(). @@ -1128,7 +1129,7 @@ info_down(Q, DownReason) -> info_down(Q, rabbit_amqqueue_process:info_keys(), DownReason). info_down(Q, Items, DownReason) -> - [{Item, i_down(Item, Q, DownReason)} || Item <- Items]. + [{Item, i_down(Item, Q, DownReason)} || Item <- Items, Item =/= totals, Item =/= type_specific]. i_down(name, Q, _) -> amqqueue:get_name(Q); i_down(durable, Q, _) -> amqqueue:is_durable(Q); @@ -1165,6 +1166,26 @@ emit_info_all(Nodes, VHostPath, Items, Ref, AggregatorPid) -> Pids = [ spawn_link(Node, rabbit_amqqueue, emit_info_local, [VHostPath, Items, Ref, AggregatorPid]) || Node <- Nodes ], rabbit_control_misc:await_emitters_termination(Pids). +collect_info_all(VHostPath, Items) -> + Nodes = rabbit_mnesia:cluster_nodes(running), + Ref = make_ref(), + Pids = [ spawn_link(Node, rabbit_amqqueue, emit_info_local, [VHostPath, Items, Ref, self()]) || Node <- Nodes ], + rabbit_control_misc:await_emitters_termination(Pids), + wait_for_queues(Ref, length(Pids), []). + +wait_for_queues(Ref, N, Acc) -> + receive + {Ref, finished} when N == 1 -> + Acc; + {Ref, finished} -> + wait_for_queues(Ref, N - 1, Acc); + {Ref, Items, continue} -> + wait_for_queues(Ref, N, [Items | Acc]) + after + 1000 -> + Acc + end. + emit_info_down(VHostPath, Items, Ref, AggregatorPid) -> rabbit_control_misc:emitting_map_with_exit_handler( AggregatorPid, Ref, fun(Q) -> info_down(Q, Items, down) end, diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 46db98ba5a..622e6652e9 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -32,6 +32,7 @@ -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/4, prioritise_cast/3, prioritise_info/3, format_message_queue/2]). +-export([format/1]). %% Queue's state -record(q, { @@ -1066,7 +1067,16 @@ stop(State) -> stop(noreply, State). stop(noreply, State) -> {stop, normal, State}; stop(Reply, State) -> {stop, normal, Reply, State}. -infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. +infos(Items, #q{q = Q} = State) -> + lists:foldr(fun(totals, Acc) -> + [{messages_ready, i(messages_ready, State)}, + {messages, i(messages, State)}, + {messages_unacknowledged, i(messages_unacknowledged, State)}] ++ Acc; + (type_specific, Acc) -> + format(Q) ++ Acc; + (Item, Acc) -> + [{Item, i(Item, State)} | Acc] + end, [], Items). i(name, #q{q = Q}) -> amqqueue:get_name(Q); i(durable, #q{q = Q}) -> amqqueue:is_durable(Q); @@ -1761,6 +1771,18 @@ handle_pre_hibernate(State = #q{backing_queue = BQ, format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ). +format(Q) when ?is_amqqueue(Q) -> + case rabbit_mirror_queue_misc:is_mirrored(Q) of + false -> + [{node, node(amqqueue:get_pid(Q))}]; + true -> + Slaves = amqqueue:get_slave_pids(Q), + SSlaves = amqqueue:get_sync_slave_pids(Q), + [{slave_nodes, [node(S) || S <- Slaves]}, + {synchronised_slave_nodes, [node(S) || S <- SSlaves]}, + {node, node(amqqueue:get_pid(Q))}] + end. + log_delete_exclusive({ConPid, _ConRef}, State) -> log_delete_exclusive(ConPid, State); log_delete_exclusive(ConPid, #q{ q = Q }) -> diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 75857f81a8..fc8a38be84 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -587,7 +587,13 @@ infos(QName) -> -spec info(amqqueue:amqqueue(), rabbit_types:info_keys()) -> rabbit_types:infos(). info(Q, Items) -> - [{Item, i(Item, Q)} || Item <- Items]. + lists:foldr(fun(totals, Acc) -> + i_totals(Q) ++ Acc; + (type_specific, Acc) -> + format(Q) ++ Acc; + (Item, Acc) -> + [{Item, i(Item, Q)} | Acc] + end, [], Items). -spec stat(amqqueue:amqqueue()) -> {'ok', non_neg_integer(), non_neg_integer()}. @@ -968,6 +974,19 @@ find_quorum_queues(VHost) -> amqqueue:qnode(Q) == Node])) end). +i_totals(Q) when ?is_amqqueue(Q) -> + QName = amqqueue:get_name(Q), + case ets:lookup(queue_coarse_metrics, QName) of + [{_, MR, MU, M, _}] -> + [{messages_ready, MR}, + {messages_unacknowledged, MU}, + {messages, M}]; + [] -> + [{messages_ready, 0}, + {messages_unacknowledged, 0}, + {messages, 0}] + end. + i(name, Q) when ?is_amqqueue(Q) -> amqqueue:get_name(Q); i(durable, Q) when ?is_amqqueue(Q) -> amqqueue:is_durable(Q); i(auto_delete, Q) when ?is_amqqueue(Q) -> amqqueue:is_auto_delete(Q); |
