diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2012-01-10 17:11:19 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2012-01-10 17:11:19 +0000 |
| commit | e099c912c60c7ed4c51f2754227809b490bb279b (patch) | |
| tree | 5d97bb868d70e6fcbbee5403c5b0c007a267aee9 | |
| parent | e713a79bdbe526fd14eb3edec06defd70d2e05f9 (diff) | |
| download | rabbitmq-server-git-e099c912c60c7ed4c51f2754227809b490bb279b.tar.gz | |
Flow control channel -> queue.
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 33 | ||||
| -rw-r--r-- | src/rabbit_flow.erl | 50 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 66 | ||||
| -rw-r--r-- | src/rabbit_router.erl | 1 |
5 files changed, 103 insertions, 48 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index ba20b35524..cf46a33927 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1023,6 +1023,7 @@ handle_cast({run_backing_queue, Mod, Fun}, State) -> handle_cast({deliver, Delivery}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. + rabbit_flow:maybe_issue(Delivery#delivery.sender), noreply(deliver_or_enqueue(Delivery, State)); handle_cast({ack, AckTags, ChPid}, State) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index ce0024163c..13625d6373 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -38,7 +38,8 @@ user, virtual_host, most_recently_declared_queue, queue_monitors, consumer_mapping, blocking, queue_consumers, queue_collector_pid, stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq, - unconfirmed_qm, confirmed, capabilities, trace_state}). + unconfirmed_qm, confirmed, capabilities, trace_state, + block_reader}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -199,12 +200,12 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, unconfirmed_qm = gb_trees:empty(), confirmed = [], capabilities = Capabilities, - trace_state = rabbit_trace:init(VHost)}, + trace_state = rabbit_trace:init(VHost), + block_reader = false}, State1 = rabbit_event:init_stats_timer(State, #ch.stats_timer), rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State1)), rabbit_event:if_enabled(State1, #ch.stats_timer, fun() -> emit_stats(State1) end), - rabbit_flow:issue_initial(ReaderPid), {ok, State1, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -245,8 +246,19 @@ handle_call(refresh_config, _From, State = #ch{virtual_host = VHost}) -> handle_call(_Request, _From, State) -> noreply(State). -handle_cast({method, Method, Content}, State = #ch{reader_pid = ReaderPid}) -> - rabbit_flow:maybe_issue(ReaderPid), +handle_cast({method, Method, Content}, + State0 = #ch{reader_pid = Reader, + block_reader = BlockReader}) -> + + rabbit_flow:maybe_issue(Reader), + State = + case {rabbit_flow:blocked(), BlockReader} of + {true, false} -> + rabbit_reader:conserve_memory(self(), Reader, true), + State0#ch{block_reader = true}; + _ -> + State0 + end, try handle_method(Method, Content, State) of {reply, Reply, NewState} -> ok = rabbit_writer:send_command(NewState#ch.writer_pid, Reply), @@ -317,6 +329,17 @@ handle_cast({confirm, MsgSeqNos, From}, State) -> State1 = #ch{confirmed = C} = confirm(MsgSeqNos, From, State), noreply([send_confirms], State1, case C of [] -> hibernate; _ -> 0 end). +handle_info({bump_credit, Msg}, State = #ch{block_reader = BlockReader, + reader_pid = ReaderPid}) -> + State1 = + case {rabbit_flow:bump(Msg), BlockReader} of + {false, true} -> rabbit_reader:conserve_memory( + self(), ReaderPid, false), + State#ch{block_reader = false}; + _ -> State + end, + noreply(State1); + handle_info(timeout, State) -> noreply(State); diff --git a/src/rabbit_flow.erl b/src/rabbit_flow.erl index 28fc68b544..6fcf287e0a 100644 --- a/src/rabbit_flow.erl +++ b/src/rabbit_flow.erl @@ -19,40 +19,44 @@ -define(MAX_CREDIT, 100). -define(MORE_CREDIT_AT, 50). --export([issue_initial/1, maybe_issue/1, bump/1, blocked/0, consume/1]). - -issue_initial(To) -> - To ! {bump_credit, {self(), ?MAX_CREDIT}}, - put({credit_to, To}, ?MAX_CREDIT). +-export([maybe_issue/1, bump/1, blocked/0, consume/1]). maybe_issue(To) -> Credit = - case get({credit_to, To}) - 1 of - ?MORE_CREDIT_AT -> + case get({credit_to, To}) of + undefined -> + ?MAX_CREDIT; + ?MORE_CREDIT_AT + 1 -> To ! {bump_credit, {self(), ?MAX_CREDIT - ?MORE_CREDIT_AT}}, ?MAX_CREDIT; C -> - C + C - 1 end, put({credit_to, To}, Credit). -bump({From, NewCredit}) -> +bump({From, MoreCredit}) -> Credit = case get({credit_from, From}) of - undefined -> NewCredit; - 0 -> erase(credit_blocked), - NewCredit; - C -> C + NewCredit + undefined -> MoreCredit; + C -> C + MoreCredit end, - put({credit_from, From}, Credit). + put({credit_from, From}, Credit), + case Credit > 0 of + true -> erase(credit_blocked), + false; + false -> true + end. -blocked() -> get(credit_blocked) =:= true. +%% TODO we assume only one From can block at once. Is this true? +blocked() -> + get(credit_blocked) =:= true. consume(From) -> - case get({credit_from, From}) of - undefined -> ok; - Credit -> case Credit of - 1 -> put(credit_blocked, true); - _ -> ok - end, - put({credit_from, From}, Credit - 1) - end. + Credit = case get({credit_from, From}) of + undefined -> ?MAX_CREDIT; + C -> C + end - 1, + case Credit of + 0 -> put(credit_blocked, true); + _ -> ok + end, + put({credit_from, From}, Credit). diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 4ac387c5d8..9e3b58aabe 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -25,7 +25,7 @@ -export([init/4, mainloop/2]). --export([conserve_memory/2, server_properties/1]). +-export([conserve_memory/3, server_properties/1]). -export([process_channel_frame/5]). %% used by erlang-client @@ -40,7 +40,7 @@ -record(v1, {parent, sock, connection, callback, recv_len, pending_recv, connection_state, queue_collector, heartbeater, stats_timer, channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len, - auth_mechanism, auth_state}). + auth_mechanism, auth_state, blockers}). -define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt, send_pend, state, channels]). @@ -71,7 +71,7 @@ -spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()). -spec(force_event_refresh/1 :: (pid()) -> 'ok'). -spec(shutdown/2 :: (pid(), string()) -> 'ok'). --spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok'). +-spec(conserve_memory/3 :: (pid() | atom(), pid(), boolean()) -> 'ok'). -spec(server_properties/1 :: (rabbit_types:protocol()) -> rabbit_framing:amqp_table()). @@ -137,8 +137,8 @@ info(Pid, Items) -> force_event_refresh(Pid) -> gen_server:cast(Pid, force_event_refresh). -conserve_memory(Pid, Conserve) -> - Pid ! {conserve_memory, Conserve}, +conserve_memory(Blocker, Pid, Conserve) -> + Pid ! {conserve_memory, Blocker, Conserve}, ok. server_properties(Protocol) -> @@ -220,7 +220,8 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, buf = [], buf_len = 0, auth_mechanism = none, - auth_state = none}, + auth_state = none, + blockers = sets:new()}, try recvloop(Deb, switch_callback(rabbit_event:init_stats_timer( State, #v1.stats_timer), @@ -276,8 +277,8 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) -> {other, Other} -> handle_other(Other, Deb, State) end. -handle_other({conserve_memory, Conserve}, Deb, State) -> - recvloop(Deb, internal_conserve_memory(Conserve, State)); +handle_other({conserve_memory, Blocker, Conserve}, Deb, State) -> + recvloop(Deb, internal_conserve_memory(Conserve, Blocker, State)); handle_other({channel_closing, ChPid}, Deb, State) -> ok = rabbit_channel:ready_for_close(ChPid), channel_cleanup(ChPid), @@ -361,24 +362,47 @@ terminate(Explanation, State) when ?IS_RUNNING(State) -> terminate(_Explanation, State) -> {force, State}. -internal_conserve_memory(true, State = #v1{connection_state = running}) -> - State#v1{connection_state = blocking}; -internal_conserve_memory(false, State = #v1{connection_state = blocking}) -> - State#v1{connection_state = running}; -internal_conserve_memory(false, State = #v1{connection_state = blocked, - heartbeater = Heartbeater}) -> - ok = rabbit_heartbeat:resume_monitor(Heartbeater), - State#v1{connection_state = running}; -internal_conserve_memory(_Conserve, State) -> +internal_conserve_memory(true, Blocker, + State = #v1{connection_state = running, + blockers = Blockers}) -> + 0 = sets:size(Blockers), %% ASSERT + State#v1{connection_state = blocking, + blockers = sets:add_element(Blocker, Blockers)}; +internal_conserve_memory(true, Blocker, + State = #v1{blockers = Blockers}) -> + State#v1{blockers = sets:add_element(Blocker, Blockers)}; +internal_conserve_memory(false, Blocker, + State = #v1{connection_state = blocking, + blockers = Blockers}) -> + NewBlockers = sets:del_element(Blocker, Blockers), + case sets:size(NewBlockers) of + 0 -> State#v1{connection_state = running, + blockers = NewBlockers}; + + _ -> State#v1{blockers = NewBlockers} + end; +internal_conserve_memory(false, Blocker, + State = #v1{connection_state = blocked, + blockers = Blockers, + heartbeater = Heartbeater}) -> + NewBlockers = sets:del_element(Blocker, Blockers), + case sets:size(NewBlockers) of + 0 -> ok = rabbit_heartbeat:resume_monitor(Heartbeater), + State#v1{connection_state = running, + blockers = NewBlockers}; + + _ -> State#v1{blockers = NewBlockers} + end; +internal_conserve_memory(_Conserve, _Blocker, State) -> State. internal_bump_credit(Msg, State) -> rabbit_flow:bump(Msg), - internal_conserve_memory(false, State). + internal_conserve_memory(false, self(), State). internal_check_credit(State) when ?IS_RUNNING(State) -> case rabbit_flow:blocked() of - true -> internal_conserve_memory(true, State); + true -> internal_conserve_memory(true, self(), State); false -> State end. @@ -713,7 +737,9 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, NewConnection = Connection#connection{vhost = VHostPath}, ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol), State1 = internal_conserve_memory( - rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), + rabbit_alarm:register(self(), {?MODULE, conserve_memory, + [memory_alarm]}), + memory_alarm, State#v1{connection_state = running, connection = NewConnection}), rabbit_event:notify(connection_created, diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 31f5ad14ea..47030232d5 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -59,6 +59,7 @@ deliver(QNames, Delivery = #delivery{mandatory = false, %% is preserved. This scales much better than the non-immediate %% case below. QPids = lookup_qpids(QNames), + [rabbit_flow:consume(QPid) || QPid <- QPids], delegate:invoke_no_result( QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end), {routed, QPids}; |
