diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2012-01-09 18:17:51 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2012-01-09 18:17:51 +0000 |
| commit | e713a79bdbe526fd14eb3edec06defd70d2e05f9 (patch) | |
| tree | 9c2de35849bd3f1a03a548cbf3971d0dd6853be9 | |
| parent | 5a8226b644e71770e049517e7ecd143eabe8afae (diff) | |
| download | rabbitmq-server-git-e713a79bdbe526fd14eb3edec06defd70d2e05f9.tar.gz | |
Very hacky first pass at internal flow control between channel and reader.
| -rw-r--r-- | src/rabbit_channel.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_flow.erl | 58 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 21 |
3 files changed, 79 insertions, 4 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 9b2fe28ce8..ce0024163c 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -204,6 +204,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State1)), rabbit_event:if_enabled(State1, #ch.stats_timer, fun() -> emit_stats(State1) end), + rabbit_flow:issue_initial(ReaderPid), {ok, State1, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -244,7 +245,8 @@ handle_call(refresh_config, _From, State = #ch{virtual_host = VHost}) -> handle_call(_Request, _From, State) -> noreply(State). -handle_cast({method, Method, Content}, State) -> +handle_cast({method, Method, Content}, State = #ch{reader_pid = ReaderPid}) -> + rabbit_flow:maybe_issue(ReaderPid), try handle_method(Method, Content, State) of {reply, Reply, NewState} -> ok = rabbit_writer:send_command(NewState#ch.writer_pid, Reply), diff --git a/src/rabbit_flow.erl b/src/rabbit_flow.erl new file mode 100644 index 0000000000..28fc68b544 --- /dev/null +++ b/src/rabbit_flow.erl @@ -0,0 +1,58 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% + +-module(rabbit_flow). + +-define(MAX_CREDIT, 100). +-define(MORE_CREDIT_AT, 50). + +-export([issue_initial/1, maybe_issue/1, bump/1, blocked/0, consume/1]). + +issue_initial(To) -> + To ! {bump_credit, {self(), ?MAX_CREDIT}}, + put({credit_to, To}, ?MAX_CREDIT). + +maybe_issue(To) -> + Credit = + case get({credit_to, To}) - 1 of + ?MORE_CREDIT_AT -> + To ! {bump_credit, {self(), ?MAX_CREDIT - ?MORE_CREDIT_AT}}, + ?MAX_CREDIT; + C -> + C + end, + put({credit_to, To}, Credit). + +bump({From, NewCredit}) -> + Credit = case get({credit_from, From}) of + undefined -> NewCredit; + 0 -> erase(credit_blocked), + NewCredit; + C -> C + NewCredit + end, + put({credit_from, From}, Credit). + +blocked() -> get(credit_blocked) =:= true. + +consume(From) -> + case get({credit_from, From}) of + undefined -> ok; + Credit -> case Credit of + 1 -> put(credit_blocked, true); + _ -> ok + end, + put({credit_from, From}, Credit - 1) + end. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 045cc969af..4ac387c5d8 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -341,6 +341,8 @@ handle_other(emit_stats, Deb, State) -> mainloop(Deb, emit_stats(State)); handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) -> sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State); +handle_other({bump_credit, Msg}, Deb, State) -> + recvloop(Deb, internal_bump_credit(Msg, State)); handle_other(Other, _Deb, _State) -> %% internal error -> something worth dying for exit({unexpected_message, Other}). @@ -370,6 +372,16 @@ internal_conserve_memory(false, State = #v1{connection_state = blocked, internal_conserve_memory(_Conserve, State) -> State. +internal_bump_credit(Msg, State) -> + rabbit_flow:bump(Msg), + internal_conserve_memory(false, State). + +internal_check_credit(State) when ?IS_RUNNING(State) -> + case rabbit_flow:blocked() of + true -> internal_conserve_memory(true, State); + false -> State + end. + close_connection(State = #v1{queue_collector = Collector, connection = #connection{ timeout_sec = TimeoutSec}}) -> @@ -504,7 +516,8 @@ handle_frame(Type, Channel, Payload, AnalyzedFrame, self(), Channel, ChPid, FramingState), put({channel, Channel}, {ChPid, NewAState}), - post_process_frame(AnalyzedFrame, ChPid, State); + post_process_frame(AnalyzedFrame, ChPid, + internal_check_credit(State)); undefined -> case ?IS_RUNNING(State) of true -> send_to_new_channel( @@ -911,9 +924,11 @@ send_to_new_channel(Channel, AnalyzedFrame, State) -> process_channel_frame(Frame, ErrPid, Channel, ChPid, AState) -> case rabbit_command_assembler:process(Frame, AState) of {ok, NewAState} -> NewAState; - {ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method), + {ok, Method, NewAState} -> rabbit_flow:consume(ChPid), + rabbit_channel:do(ChPid, Method), NewAState; - {ok, Method, Content, NewAState} -> rabbit_channel:do(ChPid, + {ok, Method, Content, NewAState} -> rabbit_flow:consume(ChPid), + rabbit_channel:do(ChPid, Method, Content), NewAState; {error, Reason} -> ErrPid ! {channel_exit, Channel, |
