diff options
| -rw-r--r-- | src/credit_flow.erl | 21 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 11 |
3 files changed, 31 insertions, 7 deletions
diff --git a/src/credit_flow.erl b/src/credit_flow.erl index d48d649ef3..39a257aca4 100644 --- a/src/credit_flow.erl +++ b/src/credit_flow.erl @@ -30,7 +30,7 @@ -define(DEFAULT_CREDIT, {200, 50}). --export([send/1, send/2, ack/1, ack/2, handle_bump_msg/1, blocked/0]). +-export([send/1, send/2, ack/1, ack/2, handle_bump_msg/1, blocked/0, state/0]). -export([peer_down/1]). %%---------------------------------------------------------------------------- @@ -110,6 +110,18 @@ blocked() -> case get(credit_blocked) of _ -> true end. +state() -> case blocked() of + true -> flow; + false -> case get(credit_blocked_at) of + undefined -> running; + B -> Diff = timer:now_diff(erlang:now(), B), + case Diff < 5000000 of + true -> flow; + false -> running + end + end + end. + peer_down(Peer) -> %% In theory we could also remove it from credit_deferred here, but it %% doesn't really matter; at some point later we will drain @@ -128,7 +140,12 @@ grant(To, Quantity) -> true -> ?UPDATE(credit_deferred, [], Deferred, [{To, Msg} | Deferred]) end. -block(From) -> ?UPDATE(credit_blocked, [], Blocks, [From | Blocks]). +block(From) -> + case blocked() of + false -> put(credit_blocked_at, erlang:now()); + true -> ok + end, + ?UPDATE(credit_blocked, [], Blocks, [From | Blocks]). unblock(From) -> ?UPDATE(credit_blocked, [], Blocks, Blocks -- [From]), diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 65ab15c09a..7002fd367c 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -101,7 +101,7 @@ slave_pids, synchronised_slave_pids, backing_queue_status, - status + state ]). -define(CREATION_EVENT_KEYS, @@ -1091,8 +1091,8 @@ i(synchronised_slave_pids, #q{q = #amqqueue{name = Name}}) -> false -> ''; true -> SSPids end; -i(status, #q{status = Status}) -> - Status; +i(state, #q{status = running}) -> credit_flow:state(); +i(state, #q{status = State}) -> State; i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> BQ:status(BQS); i(Item, _) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 6aa888981b..4d778f946f 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -53,7 +53,8 @@ messages_uncommitted, acks_uncommitted, prefetch_count, - client_flow_blocked]). + client_flow_blocked, + state]). -define(CREATION_EVENT_KEYS, [pid, @@ -600,7 +601,11 @@ confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) -> record_confirms(MXs, State#ch{unconfirmed = UC1}). handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> - {reply, #'channel.open_ok'{}, State#ch{state = running}}; + %% Don't leave "starting" as the state for 5s. TODO is this TRTTD? + State1 = State#ch{state = running}, + rabbit_event:if_enabled(State1, #ch.stats_timer, + fun() -> emit_stats(State1) end), + {reply, #'channel.open_ok'{}, State1}; handle_method(#'channel.open'{}, _, _State) -> rabbit_misc:protocol_error( @@ -1624,6 +1629,8 @@ i(messages_uncommitted, #ch{tx = {Msgs, _Acks}}) -> queue:len(Msgs); i(messages_uncommitted, #ch{}) -> 0; i(acks_uncommitted, #ch{tx = {_Msgs, Acks}}) -> ack_len(Acks); i(acks_uncommitted, #ch{}) -> 0; +i(state, #ch{state = running}) -> credit_flow:state(); +i(state, #ch{state = State}) -> State; i(prefetch_count, #ch{limiter = Limiter}) -> rabbit_limiter:get_prefetch_limit(Limiter); i(client_flow_blocked, #ch{limiter = Limiter}) -> |
