summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-12-02 14:57:08 +0000
committerSimon MacMullen <simon@rabbitmq.com>2013-12-02 14:57:08 +0000
commit2e2d170fa382c0eea224e4e7c7bbd74f4451dcaa (patch)
tree9a7352a4e0b1bac49ab0c6683b6f34faeea1da98
parentaf6926d4371aedfc892a317392935ca6d92c7e0b (diff)
downloadrabbitmq-server-git-2e2d170fa382c0eea224e4e7c7bbd74f4451dcaa.tar.gz
Announce credit flow state for channels and queues.
-rw-r--r--src/credit_flow.erl21
-rw-r--r--src/rabbit_amqqueue_process.erl6
-rw-r--r--src/rabbit_channel.erl11
3 files changed, 31 insertions, 7 deletions
diff --git a/src/credit_flow.erl b/src/credit_flow.erl
index d48d649ef3..39a257aca4 100644
--- a/src/credit_flow.erl
+++ b/src/credit_flow.erl
@@ -30,7 +30,7 @@
-define(DEFAULT_CREDIT, {200, 50}).
--export([send/1, send/2, ack/1, ack/2, handle_bump_msg/1, blocked/0]).
+-export([send/1, send/2, ack/1, ack/2, handle_bump_msg/1, blocked/0, state/0]).
-export([peer_down/1]).
%%----------------------------------------------------------------------------
@@ -110,6 +110,18 @@ blocked() -> case get(credit_blocked) of
_ -> true
end.
+state() -> case blocked() of
+ true -> flow;
+ false -> case get(credit_blocked_at) of
+ undefined -> running;
+ B -> Diff = timer:now_diff(erlang:now(), B),
+ case Diff < 5000000 of
+ true -> flow;
+ false -> running
+ end
+ end
+ end.
+
peer_down(Peer) ->
%% In theory we could also remove it from credit_deferred here, but it
%% doesn't really matter; at some point later we will drain
@@ -128,7 +140,12 @@ grant(To, Quantity) ->
true -> ?UPDATE(credit_deferred, [], Deferred, [{To, Msg} | Deferred])
end.
-block(From) -> ?UPDATE(credit_blocked, [], Blocks, [From | Blocks]).
+block(From) ->
+ case blocked() of
+ false -> put(credit_blocked_at, erlang:now());
+ true -> ok
+ end,
+ ?UPDATE(credit_blocked, [], Blocks, [From | Blocks]).
unblock(From) ->
?UPDATE(credit_blocked, [], Blocks, Blocks -- [From]),
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 65ab15c09a..7002fd367c 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -101,7 +101,7 @@
slave_pids,
synchronised_slave_pids,
backing_queue_status,
- status
+ state
]).
-define(CREATION_EVENT_KEYS,
@@ -1091,8 +1091,8 @@ i(synchronised_slave_pids, #q{q = #amqqueue{name = Name}}) ->
false -> '';
true -> SSPids
end;
-i(status, #q{status = Status}) ->
- Status;
+i(state, #q{status = running}) -> credit_flow:state();
+i(state, #q{status = State}) -> State;
i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
BQ:status(BQS);
i(Item, _) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 6aa888981b..4d778f946f 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -53,7 +53,8 @@
messages_uncommitted,
acks_uncommitted,
prefetch_count,
- client_flow_blocked]).
+ client_flow_blocked,
+ state]).
-define(CREATION_EVENT_KEYS,
[pid,
@@ -600,7 +601,11 @@ confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) ->
record_confirms(MXs, State#ch{unconfirmed = UC1}).
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
- {reply, #'channel.open_ok'{}, State#ch{state = running}};
+ %% Don't leave "starting" as the state for 5s. TODO is this TRTTD?
+ State1 = State#ch{state = running},
+ rabbit_event:if_enabled(State1, #ch.stats_timer,
+ fun() -> emit_stats(State1) end),
+ {reply, #'channel.open_ok'{}, State1};
handle_method(#'channel.open'{}, _, _State) ->
rabbit_misc:protocol_error(
@@ -1624,6 +1629,8 @@ i(messages_uncommitted, #ch{tx = {Msgs, _Acks}}) -> queue:len(Msgs);
i(messages_uncommitted, #ch{}) -> 0;
i(acks_uncommitted, #ch{tx = {_Msgs, Acks}}) -> ack_len(Acks);
i(acks_uncommitted, #ch{}) -> 0;
+i(state, #ch{state = running}) -> credit_flow:state();
+i(state, #ch{state = State}) -> State;
i(prefetch_count, #ch{limiter = Limiter}) ->
rabbit_limiter:get_prefetch_limit(Limiter);
i(client_flow_blocked, #ch{limiter = Limiter}) ->