summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_channel.erl8
-rw-r--r--src/rabbit_fifo_client.erl5
2 files changed, 13 insertions, 0 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 7c07aa07f3..c0f1ff5dc8 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -189,6 +189,7 @@
messages_unconfirmed,
messages_uncommitted,
acks_uncommitted,
+ pending_raft_commands,
prefetch_count,
global_prefetch_count,
state,
@@ -2362,6 +2363,8 @@ i(messages_uncommitted, #ch{tx = {Msgs, _Acks}}) -> ?QUEUE:len(Msgs);
i(messages_uncommitted, #ch{}) -> 0;
i(acks_uncommitted, #ch{tx = {_Msgs, Acks}}) -> ack_len(Acks);
i(acks_uncommitted, #ch{}) -> 0;
+i(pending_raft_commands, #ch{queue_states = QS}) ->
+ pending_raft_commands(QS);
i(state, #ch{cfg = #conf{state = running}}) -> credit_flow:state();
i(state, #ch{cfg = #conf{state = State}}) -> State;
i(prefetch_count, #ch{cfg = #conf{consumer_prefetch = C}}) -> C;
@@ -2377,6 +2380,11 @@ i(reductions, _State) ->
i(Item, _) ->
throw({bad_argument, Item}).
+pending_raft_commands(QStates) ->
+ maps:fold(fun (_, V, Acc) ->
+ Acc + rabbit_fifo_client:pending_size(V)
+ end, 0, QStates).
+
name(#ch{cfg = #conf{conn_name = ConnName, channel = Channel}}) ->
list_to_binary(rabbit_misc:format("~s (~p)", [ConnName, Channel])).
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl
index a3c241aff2..136800cc99 100644
--- a/src/rabbit_fifo_client.erl
+++ b/src/rabbit_fifo_client.erl
@@ -39,6 +39,7 @@
purge/1,
cluster_name/1,
update_machine_state/2,
+ pending_size/1,
stat/1
]).
@@ -409,6 +410,10 @@ purge(Node) ->
Err
end.
+-spec pending_size(state()) -> non_neg_integer().
+pending_size(#state{pending = Pend}) ->
+ maps:size(Pend).
+
-spec stat(ra_server_id()) ->
{ok, non_neg_integer(), non_neg_integer()}
| {error | timeout, term()}.