summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-01-09 18:17:51 +0000
committerSimon MacMullen <simon@rabbitmq.com>2012-01-09 18:17:51 +0000
commite713a79bdbe526fd14eb3edec06defd70d2e05f9 (patch)
tree9c2de35849bd3f1a03a548cbf3971d0dd6853be9
parent5a8226b644e71770e049517e7ecd143eabe8afae (diff)
downloadrabbitmq-server-git-e713a79bdbe526fd14eb3edec06defd70d2e05f9.tar.gz
Very hacky first pass at internal flow control between channel and reader.
-rw-r--r--src/rabbit_channel.erl4
-rw-r--r--src/rabbit_flow.erl58
-rw-r--r--src/rabbit_reader.erl21
3 files changed, 79 insertions, 4 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 9b2fe28ce8..ce0024163c 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -204,6 +204,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State1)),
rabbit_event:if_enabled(State1, #ch.stats_timer,
fun() -> emit_stats(State1) end),
+ rabbit_flow:issue_initial(ReaderPid),
{ok, State1, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -244,7 +245,8 @@ handle_call(refresh_config, _From, State = #ch{virtual_host = VHost}) ->
handle_call(_Request, _From, State) ->
noreply(State).
-handle_cast({method, Method, Content}, State) ->
+handle_cast({method, Method, Content}, State = #ch{reader_pid = ReaderPid}) ->
+ rabbit_flow:maybe_issue(ReaderPid),
try handle_method(Method, Content, State) of
{reply, Reply, NewState} ->
ok = rabbit_writer:send_command(NewState#ch.writer_pid, Reply),
diff --git a/src/rabbit_flow.erl b/src/rabbit_flow.erl
new file mode 100644
index 0000000000..28fc68b544
--- /dev/null
+++ b/src/rabbit_flow.erl
@@ -0,0 +1,58 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_flow).
+
+-define(MAX_CREDIT, 100).
+-define(MORE_CREDIT_AT, 50).
+
+-export([issue_initial/1, maybe_issue/1, bump/1, blocked/0, consume/1]).
+
+issue_initial(To) ->
+ To ! {bump_credit, {self(), ?MAX_CREDIT}},
+ put({credit_to, To}, ?MAX_CREDIT).
+
+maybe_issue(To) ->
+ Credit =
+ case get({credit_to, To}) - 1 of
+ ?MORE_CREDIT_AT ->
+ To ! {bump_credit, {self(), ?MAX_CREDIT - ?MORE_CREDIT_AT}},
+ ?MAX_CREDIT;
+ C ->
+ C
+ end,
+ put({credit_to, To}, Credit).
+
+bump({From, NewCredit}) ->
+ Credit = case get({credit_from, From}) of
+ undefined -> NewCredit;
+ 0 -> erase(credit_blocked),
+ NewCredit;
+ C -> C + NewCredit
+ end,
+ put({credit_from, From}, Credit).
+
+blocked() -> get(credit_blocked) =:= true.
+
+consume(From) ->
+ case get({credit_from, From}) of
+ undefined -> ok;
+ Credit -> case Credit of
+ 1 -> put(credit_blocked, true);
+ _ -> ok
+ end,
+ put({credit_from, From}, Credit - 1)
+ end.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 045cc969af..4ac387c5d8 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -341,6 +341,8 @@ handle_other(emit_stats, Deb, State) ->
mainloop(Deb, emit_stats(State));
handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State);
+handle_other({bump_credit, Msg}, Deb, State) ->
+ recvloop(Deb, internal_bump_credit(Msg, State));
handle_other(Other, _Deb, _State) ->
%% internal error -> something worth dying for
exit({unexpected_message, Other}).
@@ -370,6 +372,16 @@ internal_conserve_memory(false, State = #v1{connection_state = blocked,
internal_conserve_memory(_Conserve, State) ->
State.
+internal_bump_credit(Msg, State) ->
+ rabbit_flow:bump(Msg),
+ internal_conserve_memory(false, State).
+
+internal_check_credit(State) when ?IS_RUNNING(State) ->
+ case rabbit_flow:blocked() of
+ true -> internal_conserve_memory(true, State);
+ false -> State
+ end.
+
close_connection(State = #v1{queue_collector = Collector,
connection = #connection{
timeout_sec = TimeoutSec}}) ->
@@ -504,7 +516,8 @@ handle_frame(Type, Channel, Payload,
AnalyzedFrame, self(),
Channel, ChPid, FramingState),
put({channel, Channel}, {ChPid, NewAState}),
- post_process_frame(AnalyzedFrame, ChPid, State);
+ post_process_frame(AnalyzedFrame, ChPid,
+ internal_check_credit(State));
undefined ->
case ?IS_RUNNING(State) of
true -> send_to_new_channel(
@@ -911,9 +924,11 @@ send_to_new_channel(Channel, AnalyzedFrame, State) ->
process_channel_frame(Frame, ErrPid, Channel, ChPid, AState) ->
case rabbit_command_assembler:process(Frame, AState) of
{ok, NewAState} -> NewAState;
- {ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method),
+ {ok, Method, NewAState} -> rabbit_flow:consume(ChPid),
+ rabbit_channel:do(ChPid, Method),
NewAState;
- {ok, Method, Content, NewAState} -> rabbit_channel:do(ChPid,
+ {ok, Method, Content, NewAState} -> rabbit_flow:consume(ChPid),
+ rabbit_channel:do(ChPid,
Method, Content),
NewAState;
{error, Reason} -> ErrPid ! {channel_exit, Channel,