summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_fifo.erl23
-rw-r--r--src/rabbit_fifo_client.erl20
-rw-r--r--src/rabbit_quorum_queue.erl10
3 files changed, 48 insertions, 5 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index e89f4a9e22..37847fa3e5 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -57,7 +57,8 @@
make_discard/2,
make_credit/4,
make_purge/0,
- make_update_config/1
+ make_update_config/1,
+ make_stat/0
]).
-type raw_msg() :: term().
@@ -133,6 +134,7 @@
drain :: boolean()}).
-record(purge, {}).
-record(update_config, {config :: config()}).
+-record(stat, {}).
@@ -144,7 +146,8 @@
#discard{} |
#credit{} |
#purge{} |
- #update_config{}.
+ #update_config{} |
+ #stat{}.
-type command() :: protocol() | ra_machine:builtin_command().
%% all the command types suppored by ra fifo
@@ -431,6 +434,19 @@ apply(#{index := RaftIdx}, #purge{},
%% reverse the effects ourselves
{State, {purge, Total},
lists:reverse([garbage_collection | Effects])};
+apply(_, #stat{}, #state{name = Name,
+ messages = Messages,
+ ra_indexes = Indexes,
+ consumers = Cons,
+ msg_bytes_enqueue = EnqueueBytes,
+ msg_bytes_checkout = CheckoutBytes} = State) ->
+ Metrics = {maps:size(Messages), % Ready
+ num_checked_out(State), % checked out
+ rabbit_fifo_index:size(Indexes), %% Total
+ maps:size(Cons), % Consumers
+ EnqueueBytes,
+ CheckoutBytes},
+ {State, {stat, Metrics}, []};
apply(_, {down, ConsumerPid, noconnection},
#state{consumers = Cons0,
enqueuers = Enqs0} = State0) ->
@@ -1181,6 +1197,9 @@ make_purge() -> #purge{}.
make_update_config(Config) ->
#update_config{config = Config}.
+-spec make_stat() -> protocol().
+make_stat() -> #stat{}.
+
add_bytes_enqueue(Msg, #state{msg_bytes_enqueue = Enqueue} = State) ->
Bytes = message_size(Msg),
State#state{msg_bytes_enqueue = Enqueue + Bytes}.
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl
index 955c0e4d9d..7e21ba222a 100644
--- a/src/rabbit_fifo_client.erl
+++ b/src/rabbit_fifo_client.erl
@@ -38,7 +38,8 @@
untracked_enqueue/2,
purge/1,
cluster_name/1,
- update_machine_state/2
+ update_machine_state/2,
+ stat/1
]).
-include_lib("ra/include/ra.hrl").
@@ -398,6 +399,23 @@ purge(Node) ->
Err
end.
+-spec stat(ra_server_id()) -> {ok, {non_neg_integer(), non_neg_integer(),
+ non_neg_integer(), non_neg_integer(),
+ non_neg_integer(), non_neg_integer()}}
+ | {error | timeout, term()}.
+stat(Servers) ->
+ try_process_stat(Servers, rabbit_fifo:make_stat()).
+
+try_process_stat([Server | Rem], Cmd) ->
+ case ra:process_command(Server, Cmd, 30000) of
+ {ok, {stat, Reply}, _} ->
+ {ok, Reply};
+ Err when length(Rem) =:= 0 ->
+ Err;
+ _ ->
+ try_process_stat(Rem, Cmd)
+ end.
+
%% @doc returns the cluster name
-spec cluster_name(state()) -> cluster_name().
cluster_name(#state{cluster_name = ClusterName}) ->
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index c43ef4dfd4..540d130243 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -424,8 +424,14 @@ infos(QName) ->
info(Q, Items) ->
[{Item, i(Item, Q)} || Item <- Items].
-stat(_Q) ->
- {ok, 0, 0}. %% TODO length, consumers count
+stat(#amqqueue{pid = {Name, _}, quorum_nodes = Nodes}) ->
+ case rabbit_fifo_client:stat([{Name, N} || N <- Nodes]) of
+ {ok, {Ready, _, _, Consumers, _, _}} ->
+ {ok, Ready, Consumers};
+ _ ->
+ %% Leader is not available, cluster might be in minority
+ {ok, 0, 0}
+ end.
purge(Node) ->
rabbit_fifo_client:purge(Node).