diff options
| -rw-r--r-- | src/rabbit_channel.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_fifo_client.erl | 5 |
2 files changed, 13 insertions, 0 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 7c07aa07f3..c0f1ff5dc8 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -189,6 +189,7 @@ messages_unconfirmed, messages_uncommitted, acks_uncommitted, + pending_raft_commands, prefetch_count, global_prefetch_count, state, @@ -2362,6 +2363,8 @@ i(messages_uncommitted, #ch{tx = {Msgs, _Acks}}) -> ?QUEUE:len(Msgs); i(messages_uncommitted, #ch{}) -> 0; i(acks_uncommitted, #ch{tx = {_Msgs, Acks}}) -> ack_len(Acks); i(acks_uncommitted, #ch{}) -> 0; +i(pending_raft_commands, #ch{queue_states = QS}) -> + pending_raft_commands(QS); i(state, #ch{cfg = #conf{state = running}}) -> credit_flow:state(); i(state, #ch{cfg = #conf{state = State}}) -> State; i(prefetch_count, #ch{cfg = #conf{consumer_prefetch = C}}) -> C; @@ -2377,6 +2380,11 @@ i(reductions, _State) -> i(Item, _) -> throw({bad_argument, Item}). +pending_raft_commands(QStates) -> + maps:fold(fun (_, V, Acc) -> + Acc + rabbit_fifo_client:pending_size(V) + end, 0, QStates). + name(#ch{cfg = #conf{conn_name = ConnName, channel = Channel}}) -> list_to_binary(rabbit_misc:format("~s (~p)", [ConnName, Channel])). diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl index a3c241aff2..136800cc99 100644 --- a/src/rabbit_fifo_client.erl +++ b/src/rabbit_fifo_client.erl @@ -39,6 +39,7 @@ purge/1, cluster_name/1, update_machine_state/2, + pending_size/1, stat/1 ]). @@ -409,6 +410,10 @@ purge(Node) -> Err end. +-spec pending_size(state()) -> non_neg_integer(). +pending_size(#state{pending = Pend}) -> + maps:size(Pend). + -spec stat(ra_server_id()) -> {ok, non_neg_integer(), non_neg_integer()} | {error | timeout, term()}. |
