diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_fifo.erl | 23 | ||||
| -rw-r--r-- | src/rabbit_fifo_client.erl | 20 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 10 |
3 files changed, 48 insertions, 5 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index e89f4a9e22..37847fa3e5 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -57,7 +57,8 @@ make_discard/2, make_credit/4, make_purge/0, - make_update_config/1 + make_update_config/1, + make_stat/0 ]). -type raw_msg() :: term(). @@ -133,6 +134,7 @@ drain :: boolean()}). -record(purge, {}). -record(update_config, {config :: config()}). +-record(stat, {}). @@ -144,7 +146,8 @@ #discard{} | #credit{} | #purge{} | - #update_config{}. + #update_config{} | + #stat{}. -type command() :: protocol() | ra_machine:builtin_command(). %% all the command types suppored by ra fifo @@ -431,6 +434,19 @@ apply(#{index := RaftIdx}, #purge{}, %% reverse the effects ourselves {State, {purge, Total}, lists:reverse([garbage_collection | Effects])}; +apply(_, #stat{}, #state{name = Name, + messages = Messages, + ra_indexes = Indexes, + consumers = Cons, + msg_bytes_enqueue = EnqueueBytes, + msg_bytes_checkout = CheckoutBytes} = State) -> + Metrics = {maps:size(Messages), % Ready + num_checked_out(State), % checked out + rabbit_fifo_index:size(Indexes), %% Total + maps:size(Cons), % Consumers + EnqueueBytes, + CheckoutBytes}, + {State, {stat, Metrics}, []}; apply(_, {down, ConsumerPid, noconnection}, #state{consumers = Cons0, enqueuers = Enqs0} = State0) -> @@ -1181,6 +1197,9 @@ make_purge() -> #purge{}. make_update_config(Config) -> #update_config{config = Config}. +-spec make_stat() -> protocol(). +make_stat() -> #stat{}. + add_bytes_enqueue(Msg, #state{msg_bytes_enqueue = Enqueue} = State) -> Bytes = message_size(Msg), State#state{msg_bytes_enqueue = Enqueue + Bytes}. diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl index 955c0e4d9d..7e21ba222a 100644 --- a/src/rabbit_fifo_client.erl +++ b/src/rabbit_fifo_client.erl @@ -38,7 +38,8 @@ untracked_enqueue/2, purge/1, cluster_name/1, - update_machine_state/2 + update_machine_state/2, + stat/1 ]). -include_lib("ra/include/ra.hrl"). @@ -398,6 +399,23 @@ purge(Node) -> Err end. +-spec stat(ra_server_id()) -> {ok, {non_neg_integer(), non_neg_integer(), + non_neg_integer(), non_neg_integer(), + non_neg_integer(), non_neg_integer()}} + | {error | timeout, term()}. +stat(Servers) -> + try_process_stat(Servers, rabbit_fifo:make_stat()). + +try_process_stat([Server | Rem], Cmd) -> + case ra:process_command(Server, Cmd, 30000) of + {ok, {stat, Reply}, _} -> + {ok, Reply}; + Err when length(Rem) =:= 0 -> + Err; + _ -> + try_process_stat(Rem, Cmd) + end. + %% @doc returns the cluster name -spec cluster_name(state()) -> cluster_name(). cluster_name(#state{cluster_name = ClusterName}) -> diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index c43ef4dfd4..540d130243 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -424,8 +424,14 @@ infos(QName) -> info(Q, Items) -> [{Item, i(Item, Q)} || Item <- Items]. -stat(_Q) -> - {ok, 0, 0}. %% TODO length, consumers count +stat(#amqqueue{pid = {Name, _}, quorum_nodes = Nodes}) -> + case rabbit_fifo_client:stat([{Name, N} || N <- Nodes]) of + {ok, {Ready, _, _, Consumers, _, _}} -> + {ok, Ready, Consumers}; + _ -> + %% Leader is not available, cluster might be in minority + {ok, 0, 0} + end. purge(Node) -> rabbit_fifo_client:purge(Node). |
