diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 18 |
2 files changed, 19 insertions, 4 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 9899964709..2035d5611c 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -58,7 +58,8 @@ vhost, transactional, consumer_count, - messages_unacknowledged]). + messages_unacknowledged, + prefetch_count]). %%---------------------------------------------------------------------------- @@ -1010,5 +1011,7 @@ i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) -> dict:size(ConsumerMapping); i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) -> queue:len(UAMQ); +i(prefetch_count, #ch{limiter_pid = LimiterPid}) -> + rabbit_limiter:get_limit(LimiterPid); i(Item, _) -> throw({bad_argument, Item}). diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 087a9f64d9..6bd803a27b 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -31,12 +31,13 @@ -module(rabbit_limiter). --behaviour(gen_server). +-behaviour(gen_server2). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). -export([start_link/1, shutdown/1]). -export([limit/2, can_send/3, ack/2, register/2, unregister/2]). +-export([get_limit/1]). %%---------------------------------------------------------------------------- @@ -51,6 +52,7 @@ -spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). -spec(register/2 :: (maybe_pid(), pid()) -> 'ok'). -spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok'). +-spec(get_limit/1 :: (maybe_pid()) -> non_neg_integer()). -endif. @@ -69,7 +71,7 @@ %%---------------------------------------------------------------------------- start_link(ChPid) -> - {ok, Pid} = gen_server:start_link(?MODULE, [ChPid], []), + {ok, Pid} = gen_server2:start_link(?MODULE, [ChPid], []), Pid. shutdown(undefined) -> @@ -104,6 +106,13 @@ register(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {register, QPid}). unregister(undefined, _QPid) -> ok; unregister(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {unregister, QPid}). +get_limit(undefined) -> + 0; +get_limit(Pid) -> + rabbit_misc:with_exit_handler( + fun () -> 0 end, + fun () -> gen_server2:pcall(Pid, 9, get_limit, infinity) end). + %%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- @@ -118,7 +127,10 @@ handle_call({can_send, QPid, AckRequired}, _From, false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1; true -> Volume end}} - end. + end; + +handle_call(get_limit, _From, State = #lim{prefetch_count = PrefetchCount}) -> + {reply, PrefetchCount, State}. handle_cast(shutdown, State) -> {stop, normal, State}; |
