summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue.erl25
-rw-r--r--src/rabbit_amqqueue_process.erl24
-rw-r--r--src/rabbit_quorum_queue.erl21
3 files changed, 66 insertions, 4 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 936b18c53a..b77e0118e1 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -54,6 +54,7 @@
-export([mark_local_durable_queues_stopped/1]).
-export([rebalance/3]).
+-export([collect_info_all/2]).
%% internal
-export([internal_declare/2, internal_delete/2, run_backing_queue/3,
@@ -1097,7 +1098,7 @@ is_unresponsive(Q, Timeout) when ?amqqueue_is_quorum(Q) ->
end.
format(Q) when ?amqqueue_is_quorum(Q) -> rabbit_quorum_queue:format(Q);
-format(_) -> [].
+format(Q) -> rabbit_amqqueue_process:format(Q).
-spec info(amqqueue:amqqueue()) -> rabbit_types:infos().
@@ -1128,7 +1129,7 @@ info_down(Q, DownReason) ->
info_down(Q, rabbit_amqqueue_process:info_keys(), DownReason).
info_down(Q, Items, DownReason) ->
- [{Item, i_down(Item, Q, DownReason)} || Item <- Items].
+ [{Item, i_down(Item, Q, DownReason)} || Item <- Items, Item =/= totals, Item =/= type_specific].
i_down(name, Q, _) -> amqqueue:get_name(Q);
i_down(durable, Q, _) -> amqqueue:is_durable(Q);
@@ -1165,6 +1166,26 @@ emit_info_all(Nodes, VHostPath, Items, Ref, AggregatorPid) ->
Pids = [ spawn_link(Node, rabbit_amqqueue, emit_info_local, [VHostPath, Items, Ref, AggregatorPid]) || Node <- Nodes ],
rabbit_control_misc:await_emitters_termination(Pids).
+collect_info_all(VHostPath, Items) ->
+ Nodes = rabbit_mnesia:cluster_nodes(running),
+ Ref = make_ref(),
+ Pids = [ spawn_link(Node, rabbit_amqqueue, emit_info_local, [VHostPath, Items, Ref, self()]) || Node <- Nodes ],
+ rabbit_control_misc:await_emitters_termination(Pids),
+ wait_for_queues(Ref, length(Pids), []).
+
+wait_for_queues(Ref, N, Acc) ->
+ receive
+ {Ref, finished} when N == 1 ->
+ Acc;
+ {Ref, finished} ->
+ wait_for_queues(Ref, N - 1, Acc);
+ {Ref, Items, continue} ->
+ wait_for_queues(Ref, N, [Items | Acc])
+ after
+ 1000 ->
+ Acc
+ end.
+
emit_info_down(VHostPath, Items, Ref, AggregatorPid) ->
rabbit_control_misc:emitting_map_with_exit_handler(
AggregatorPid, Ref, fun(Q) -> info_down(Q, Items, down) end,
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 46db98ba5a..622e6652e9 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -32,6 +32,7 @@
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1, prioritise_call/4,
prioritise_cast/3, prioritise_info/3, format_message_queue/2]).
+-export([format/1]).
%% Queue's state
-record(q, {
@@ -1066,7 +1067,16 @@ stop(State) -> stop(noreply, State).
stop(noreply, State) -> {stop, normal, State};
stop(Reply, State) -> {stop, normal, Reply, State}.
-infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
+infos(Items, #q{q = Q} = State) ->
+ lists:foldr(fun(totals, Acc) ->
+ [{messages_ready, i(messages_ready, State)},
+ {messages, i(messages, State)},
+ {messages_unacknowledged, i(messages_unacknowledged, State)}] ++ Acc;
+ (type_specific, Acc) ->
+ format(Q) ++ Acc;
+ (Item, Acc) ->
+ [{Item, i(Item, State)} | Acc]
+ end, [], Items).
i(name, #q{q = Q}) -> amqqueue:get_name(Q);
i(durable, #q{q = Q}) -> amqqueue:is_durable(Q);
@@ -1761,6 +1771,18 @@ handle_pre_hibernate(State = #q{backing_queue = BQ,
format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
+format(Q) when ?is_amqqueue(Q) ->
+ case rabbit_mirror_queue_misc:is_mirrored(Q) of
+ false ->
+ [{node, node(amqqueue:get_pid(Q))}];
+ true ->
+ Slaves = amqqueue:get_slave_pids(Q),
+ SSlaves = amqqueue:get_sync_slave_pids(Q),
+ [{slave_nodes, [node(S) || S <- Slaves]},
+ {synchronised_slave_nodes, [node(S) || S <- SSlaves]},
+ {node, node(amqqueue:get_pid(Q))}]
+ end.
+
log_delete_exclusive({ConPid, _ConRef}, State) ->
log_delete_exclusive(ConPid, State);
log_delete_exclusive(ConPid, #q{ q = Q }) ->
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 75857f81a8..fc8a38be84 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -587,7 +587,13 @@ infos(QName) ->
-spec info(amqqueue:amqqueue(), rabbit_types:info_keys()) -> rabbit_types:infos().
info(Q, Items) ->
- [{Item, i(Item, Q)} || Item <- Items].
+ lists:foldr(fun(totals, Acc) ->
+ i_totals(Q) ++ Acc;
+ (type_specific, Acc) ->
+ format(Q) ++ Acc;
+ (Item, Acc) ->
+ [{Item, i(Item, Q)} | Acc]
+ end, [], Items).
-spec stat(amqqueue:amqqueue()) -> {'ok', non_neg_integer(), non_neg_integer()}.
@@ -968,6 +974,19 @@ find_quorum_queues(VHost) ->
amqqueue:qnode(Q) == Node]))
end).
+i_totals(Q) when ?is_amqqueue(Q) ->
+ QName = amqqueue:get_name(Q),
+ case ets:lookup(queue_coarse_metrics, QName) of
+ [{_, MR, MU, M, _}] ->
+ [{messages_ready, MR},
+ {messages_unacknowledged, MU},
+ {messages, M}];
+ [] ->
+ [{messages_ready, 0},
+ {messages_unacknowledged, 0},
+ {messages, 0}]
+ end.
+
i(name, Q) when ?is_amqqueue(Q) -> amqqueue:get_name(Q);
i(durable, Q) when ?is_amqqueue(Q) -> amqqueue:is_durable(Q);
i(auto_delete, Q) when ?is_amqqueue(Q) -> amqqueue:is_auto_delete(Q);