diff options
| -rw-r--r-- | docs/rabbitmqctl.1.xml | 4 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 19 |
3 files changed, 22 insertions, 6 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 5179eb253c..57fb0827ab 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -1101,6 +1101,10 @@ <term>prefetch_count</term> <listitem><para>QoS prefetch count limit in force, 0 if unlimited.</para></listitem> </varlistentry> + <varlistentry> + <term>blocked_channelflow</term> + <listitem><para>True if the consuming client blocked the channel, false otherwise.</para></listitem> + </varlistentry> </variablelist> <para> If no <command>channelinfoitem</command>s are specified then pid, diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index f19f98d2b4..b53511dde5 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -58,7 +58,8 @@ consumer_count, messages_unacknowledged, acks_uncommitted, - prefetch_count]). + prefetch_count, + blocked_channelflow]). -define(CREATION_EVENT_KEYS, [pid, @@ -1125,6 +1126,8 @@ i(acks_uncommitted, #ch{uncommitted_ack_q = UAQ}) -> queue:len(UAQ); i(prefetch_count, #ch{limiter_pid = LimiterPid}) -> rabbit_limiter:get_limit(LimiterPid); +i(blocked_channelflow, #ch{limiter_pid = LimiterPid}) -> + rabbit_limiter:is_blocked(LimiterPid); i(Item, _) -> throw({bad_argument, Item}). diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index c323d7cef0..177b0bb91b 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -37,7 +37,7 @@ handle_info/2, prioritise_call/3]). -export([start_link/2]). -export([limit/2, can_send/3, ack/2, register/2, unregister/2]). --export([get_limit/1, block/1, unblock/1]). +-export([get_limit/1, block/1, unblock/1, is_blocked/1]). %%---------------------------------------------------------------------------- @@ -55,6 +55,7 @@ -spec(get_limit/1 :: (maybe_pid()) -> non_neg_integer()). -spec(block/1 :: (maybe_pid()) -> 'ok'). -spec(unblock/1 :: (maybe_pid()) -> 'ok' | 'stopped'). +-spec(is_blocked/1 :: (maybe_pid()) -> boolean()). -endif. @@ -119,6 +120,11 @@ unblock(undefined) -> unblock(LimiterPid) -> gen_server2:call(LimiterPid, unblock, infinity). +is_blocked(undefined) -> + false; +is_blocked(LimiterPid) -> + gen_server2:call(LimiterPid, is_blocked, infinity). + %%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- @@ -157,7 +163,10 @@ handle_call(unblock, _From, State) -> case maybe_notify(State, State#lim{blocked = false}) of {cont, State1} -> {reply, ok, State1}; {stop, State1} -> {stop, normal, stopped, State1} - end. + end; + +handle_call(is_blocked, _From, State = #lim{blocked = Blocked}) -> + {reply, Blocked, State}. handle_cast({ack, Count}, State = #lim{volume = Volume}) -> NewVolume = if Volume == 0 -> 0; @@ -186,8 +195,8 @@ code_change(_, State, _) -> %%---------------------------------------------------------------------------- maybe_notify(OldState, NewState) -> - case (limit_reached(OldState) orelse is_blocked(OldState)) andalso - not (limit_reached(NewState) orelse is_blocked(NewState)) of + case (limit_reached(OldState) orelse internal_is_blocked(OldState)) andalso + not (limit_reached(NewState) orelse internal_is_blocked(NewState)) of true -> NewState1 = notify_queues(NewState), {case NewState1#lim.prefetch_count of 0 -> stop; @@ -199,7 +208,7 @@ maybe_notify(OldState, NewState) -> limit_reached(#lim{prefetch_count = Limit, volume = Volume}) -> Limit =/= 0 andalso Volume >= Limit. -is_blocked(#lim{blocked = Blocked}) -> Blocked. +internal_is_blocked(#lim{blocked = Blocked}) -> Blocked. remember_queue(QPid, State = #lim{queues = Queues}) -> case dict:is_key(QPid, Queues) of |
