diff options
| author | Emile Joubert <emile@rabbitmq.com> | 2010-09-17 16:52:51 +0100 |
|---|---|---|
| committer | Emile Joubert <emile@rabbitmq.com> | 2010-09-17 16:52:51 +0100 |
| commit | a9d4538cc7103f0b4516ad6b581a1e081bee98df (patch) | |
| tree | 632061c41e4e6080f4b15806b1d609dfc097e528 /src | |
| parent | a23aba3b2b534feeaaaad3aeef7d04eb7348b5b9 (diff) | |
| download | rabbitmq-server-git-a9d4538cc7103f0b4516ad6b581a1e081bee98df.tar.gz | |
rabbitmqctl reports channel blocked status
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 19 |
2 files changed, 18 insertions, 6 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index f19f98d2b4..b53511dde5 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -58,7 +58,8 @@ consumer_count, messages_unacknowledged, acks_uncommitted, - prefetch_count]). + prefetch_count, + blocked_channelflow]). -define(CREATION_EVENT_KEYS, [pid, @@ -1125,6 +1126,8 @@ i(acks_uncommitted, #ch{uncommitted_ack_q = UAQ}) -> queue:len(UAQ); i(prefetch_count, #ch{limiter_pid = LimiterPid}) -> rabbit_limiter:get_limit(LimiterPid); +i(blocked_channelflow, #ch{limiter_pid = LimiterPid}) -> + rabbit_limiter:is_blocked(LimiterPid); i(Item, _) -> throw({bad_argument, Item}). diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index c323d7cef0..177b0bb91b 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -37,7 +37,7 @@ handle_info/2, prioritise_call/3]). -export([start_link/2]). -export([limit/2, can_send/3, ack/2, register/2, unregister/2]). --export([get_limit/1, block/1, unblock/1]). +-export([get_limit/1, block/1, unblock/1, is_blocked/1]). %%---------------------------------------------------------------------------- @@ -55,6 +55,7 @@ -spec(get_limit/1 :: (maybe_pid()) -> non_neg_integer()). -spec(block/1 :: (maybe_pid()) -> 'ok'). -spec(unblock/1 :: (maybe_pid()) -> 'ok' | 'stopped'). +-spec(is_blocked/1 :: (maybe_pid()) -> boolean()). -endif. @@ -119,6 +120,11 @@ unblock(undefined) -> unblock(LimiterPid) -> gen_server2:call(LimiterPid, unblock, infinity). +is_blocked(undefined) -> + false; +is_blocked(LimiterPid) -> + gen_server2:call(LimiterPid, is_blocked, infinity). + %%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- @@ -157,7 +163,10 @@ handle_call(unblock, _From, State) -> case maybe_notify(State, State#lim{blocked = false}) of {cont, State1} -> {reply, ok, State1}; {stop, State1} -> {stop, normal, stopped, State1} - end. + end; + +handle_call(is_blocked, _From, State = #lim{blocked = Blocked}) -> + {reply, Blocked, State}. handle_cast({ack, Count}, State = #lim{volume = Volume}) -> NewVolume = if Volume == 0 -> 0; @@ -186,8 +195,8 @@ code_change(_, State, _) -> %%---------------------------------------------------------------------------- maybe_notify(OldState, NewState) -> - case (limit_reached(OldState) orelse is_blocked(OldState)) andalso - not (limit_reached(NewState) orelse is_blocked(NewState)) of + case (limit_reached(OldState) orelse internal_is_blocked(OldState)) andalso + not (limit_reached(NewState) orelse internal_is_blocked(NewState)) of true -> NewState1 = notify_queues(NewState), {case NewState1#lim.prefetch_count of 0 -> stop; @@ -199,7 +208,7 @@ maybe_notify(OldState, NewState) -> limit_reached(#lim{prefetch_count = Limit, volume = Volume}) -> Limit =/= 0 andalso Volume >= Limit. -is_blocked(#lim{blocked = Blocked}) -> Blocked. +internal_is_blocked(#lim{blocked = Blocked}) -> Blocked. remember_queue(QPid, State = #lim{queues = Queues}) -> case dict:is_key(QPid, Queues) of |
