diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2012-01-11 13:01:50 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2012-01-11 13:01:50 +0000 |
| commit | 35c0ab39abdf6c493bac6a129d37149d7a453179 (patch) | |
| tree | f92e27219e126e0ca53e218d69f08cef9d8237d4 | |
| parent | 00576790d80c37ac47e3f2d67e22bcbe980ff95a (diff) | |
| download | rabbitmq-server-git-35c0ab39abdf6c493bac6a129d37149d7a453179.tar.gz | |
Get the channel to "block" by not issuing any more credit, rather than calling reader:conserve_memory. This allows us to abstract much more into rabbit_flow, and is more general. Also rename some functions.
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 33 | ||||
| -rw-r--r-- | src/rabbit_flow.erl | 48 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_router.erl | 2 |
5 files changed, 54 insertions, 47 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index cf46a33927..b15334dfe3 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1023,7 +1023,7 @@ handle_cast({run_backing_queue, Mod, Fun}, State) -> handle_cast({deliver, Delivery}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. - rabbit_flow:maybe_issue(Delivery#delivery.sender), + rabbit_flow:ack(Delivery#delivery.sender), noreply(deliver_or_enqueue(Delivery, State)); handle_cast({ack, AckTags, ChPid}, State) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index f914aaf654..4222a0e745 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -38,8 +38,7 @@ user, virtual_host, most_recently_declared_queue, queue_monitors, consumer_mapping, blocking, queue_consumers, queue_collector_pid, stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq, - unconfirmed_qm, confirmed, capabilities, trace_state, - block_reader}). + unconfirmed_qm, confirmed, capabilities, trace_state}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -200,8 +199,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, unconfirmed_qm = gb_trees:empty(), confirmed = [], capabilities = Capabilities, - trace_state = rabbit_trace:init(VHost), - block_reader = false}, + trace_state = rabbit_trace:init(VHost)}, State1 = rabbit_event:init_stats_timer(State, #ch.stats_timer), rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State1)), rabbit_event:if_enabled(State1, #ch.stats_timer, @@ -246,19 +244,8 @@ handle_call(refresh_config, _From, State = #ch{virtual_host = VHost}) -> handle_call(_Request, _From, State) -> noreply(State). -handle_cast({method, Method, Content}, - State0 = #ch{reader_pid = Reader, - block_reader = BlockReader}) -> - - rabbit_flow:maybe_issue(Reader), - State = - case {rabbit_flow:blocked(), BlockReader} of - {true, false} -> - rabbit_reader:conserve_memory(self(), Reader, true), - State0#ch{block_reader = true}; - _ -> - State0 - end, +handle_cast({method, Method, Content}, State = #ch{reader_pid = Reader}) -> + rabbit_flow:ack(Reader), try handle_method(Method, Content, State) of {reply, Reply, NewState} -> ok = rabbit_writer:send_command(NewState#ch.writer_pid, Reply), @@ -329,15 +316,9 @@ handle_cast({confirm, MsgSeqNos, From}, State) -> State1 = #ch{confirmed = C} = confirm(MsgSeqNos, From, State), noreply([send_confirms], State1, case C of [] -> hibernate; _ -> 0 end). -handle_info({bump_credit, Msg}, State = #ch{block_reader = BlockReader, - reader_pid = ReaderPid}) -> - State1 = case {rabbit_flow:bump(Msg), BlockReader} of - {false, true} -> rabbit_reader:conserve_memory( - self(), ReaderPid, false), - State#ch{block_reader = false}; - _ -> State - end, - noreply(State1); +handle_info({bump_credit, Msg}, State) -> + rabbit_flow:bump(Msg), + noreply(State); handle_info(timeout, State) -> noreply(State); diff --git a/src/rabbit_flow.erl b/src/rabbit_flow.erl index 6fcf287e0a..ea1260036c 100644 --- a/src/rabbit_flow.erl +++ b/src/rabbit_flow.erl @@ -19,18 +19,23 @@ -define(MAX_CREDIT, 100). -define(MORE_CREDIT_AT, 50). --export([maybe_issue/1, bump/1, blocked/0, consume/1]). +-export([ack/1, bump/1, blocked/0, send/1]). -maybe_issue(To) -> +%% There are two "flows" here; of messages and of credit, going in +%% opposite directions. The variable names "From" and "To" refer to +%% the flow of credit, but the function names refer to the flow of +%% messages. This is the clearest I can make it (since the function +%% names form the API and want to make sense externally, while the +%% variable names are used in credit bookkeeping and want to make +%% sense internally). + +ack(To) -> Credit = case get({credit_to, To}) of - undefined -> - ?MAX_CREDIT; - ?MORE_CREDIT_AT + 1 -> - To ! {bump_credit, {self(), ?MAX_CREDIT - ?MORE_CREDIT_AT}}, - ?MAX_CREDIT; - C -> - C - 1 + undefined -> ?MAX_CREDIT; + ?MORE_CREDIT_AT + 1 -> grant(To, ?MAX_CREDIT - ?MORE_CREDIT_AT), + ?MAX_CREDIT; + C -> C - 1 end, put({credit_to, To}, Credit). @@ -41,7 +46,7 @@ bump({From, MoreCredit}) -> end, put({credit_from, From}, Credit), case Credit > 0 of - true -> erase(credit_blocked), + true -> unblock(), false; false -> true end. @@ -50,7 +55,7 @@ bump({From, MoreCredit}) -> blocked() -> get(credit_blocked) =:= true. -consume(From) -> +send(From) -> Credit = case get({credit_from, From}) of undefined -> ?MAX_CREDIT; C -> C @@ -60,3 +65,24 @@ consume(From) -> _ -> ok end, put({credit_from, From}, Credit). + +%% -------------------------------------------------------------------------- + +grant(To, Quantity) -> + Msg = {bump_credit, {self(), Quantity}}, + case blocked() of + false -> To ! Msg; + true -> Deferred = case get(credit_deferred) of + undefined -> []; + L -> L + end, + put(credit_deferred, [{To, Msg} | Deferred]) + end. + +unblock() -> + erase(credit_blocked), + case get(credit_deferred) of + undefined -> ok; + Deferred -> [To ! Msg || {To, Msg} <- Deferred], + erase(credit_deferred) + end. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 94eb88ad0b..b463dc338a 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -25,7 +25,7 @@ -export([init/4, mainloop/2]). --export([conserve_memory/3, server_properties/1]). +-export([conserve_memory/2, server_properties/1]). -export([process_channel_frame/5]). %% used by erlang-client @@ -71,7 +71,7 @@ -spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()). -spec(force_event_refresh/1 :: (pid()) -> 'ok'). -spec(shutdown/2 :: (pid(), string()) -> 'ok'). --spec(conserve_memory/3 :: (pid() | atom(), pid(), boolean()) -> 'ok'). +-spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok'). -spec(server_properties/1 :: (rabbit_types:protocol()) -> rabbit_framing:amqp_table()). @@ -137,8 +137,8 @@ info(Pid, Items) -> force_event_refresh(Pid) -> gen_server:cast(Pid, force_event_refresh). -conserve_memory(Blocker, Pid, Conserve) -> - Pid ! {conserve_memory, Blocker, Conserve}, +conserve_memory(Pid, Conserve) -> + Pid ! {conserve_memory, Conserve}, ok. server_properties(Protocol) -> @@ -277,8 +277,8 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) -> {other, Other} -> handle_other(Other, Deb, State) end. -handle_other({conserve_memory, Blocker, Conserve}, Deb, State) -> - recvloop(Deb, update_blockers(Conserve, Blocker, State)); +handle_other({conserve_memory, Conserve}, Deb, State) -> + recvloop(Deb, update_blockers(Conserve, mem, State)); handle_other({channel_closing, ChPid}, Deb, State) -> ok = rabbit_channel:ready_for_close(ChPid), channel_cleanup(ChPid), @@ -937,10 +937,10 @@ send_to_new_channel(Channel, AnalyzedFrame, State) -> process_channel_frame(Frame, ErrPid, Channel, ChPid, AState) -> case rabbit_command_assembler:process(Frame, AState) of {ok, NewAState} -> NewAState; - {ok, Method, NewAState} -> rabbit_flow:consume(ChPid), + {ok, Method, NewAState} -> rabbit_flow:send(ChPid), rabbit_channel:do(ChPid, Method), NewAState; - {ok, Method, Content, NewAState} -> rabbit_flow:consume(ChPid), + {ok, Method, Content, NewAState} -> rabbit_flow:send(ChPid), rabbit_channel:do(ChPid, Method, Content), NewAState; diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 47030232d5..fd6b1265ac 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -59,7 +59,7 @@ deliver(QNames, Delivery = #delivery{mandatory = false, %% is preserved. This scales much better than the non-immediate %% case below. QPids = lookup_qpids(QNames), - [rabbit_flow:consume(QPid) || QPid <- QPids], + [rabbit_flow:send(QPid) || QPid <- QPids], delegate:invoke_no_result( QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end), {routed, QPids}; |
