summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-01-11 13:01:50 +0000
committerSimon MacMullen <simon@rabbitmq.com>2012-01-11 13:01:50 +0000
commit35c0ab39abdf6c493bac6a129d37149d7a453179 (patch)
treef92e27219e126e0ca53e218d69f08cef9d8237d4
parent00576790d80c37ac47e3f2d67e22bcbe980ff95a (diff)
downloadrabbitmq-server-git-35c0ab39abdf6c493bac6a129d37149d7a453179.tar.gz
Get the channel to "block" by not issuing any more credit, rather than calling reader:conserve_memory. This allows us to abstract much more into rabbit_flow, and is more general. Also rename some functions.
-rw-r--r--src/rabbit_amqqueue_process.erl2
-rw-r--r--src/rabbit_channel.erl33
-rw-r--r--src/rabbit_flow.erl48
-rw-r--r--src/rabbit_reader.erl16
-rw-r--r--src/rabbit_router.erl2
5 files changed, 54 insertions, 47 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index cf46a33927..b15334dfe3 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1023,7 +1023,7 @@ handle_cast({run_backing_queue, Mod, Fun}, State) ->
handle_cast({deliver, Delivery}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
- rabbit_flow:maybe_issue(Delivery#delivery.sender),
+ rabbit_flow:ack(Delivery#delivery.sender),
noreply(deliver_or_enqueue(Delivery, State));
handle_cast({ack, AckTags, ChPid}, State) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index f914aaf654..4222a0e745 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -38,8 +38,7 @@
user, virtual_host, most_recently_declared_queue, queue_monitors,
consumer_mapping, blocking, queue_consumers, queue_collector_pid,
stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq,
- unconfirmed_qm, confirmed, capabilities, trace_state,
- block_reader}).
+ unconfirmed_qm, confirmed, capabilities, trace_state}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -200,8 +199,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
unconfirmed_qm = gb_trees:empty(),
confirmed = [],
capabilities = Capabilities,
- trace_state = rabbit_trace:init(VHost),
- block_reader = false},
+ trace_state = rabbit_trace:init(VHost)},
State1 = rabbit_event:init_stats_timer(State, #ch.stats_timer),
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State1)),
rabbit_event:if_enabled(State1, #ch.stats_timer,
@@ -246,19 +244,8 @@ handle_call(refresh_config, _From, State = #ch{virtual_host = VHost}) ->
handle_call(_Request, _From, State) ->
noreply(State).
-handle_cast({method, Method, Content},
- State0 = #ch{reader_pid = Reader,
- block_reader = BlockReader}) ->
-
- rabbit_flow:maybe_issue(Reader),
- State =
- case {rabbit_flow:blocked(), BlockReader} of
- {true, false} ->
- rabbit_reader:conserve_memory(self(), Reader, true),
- State0#ch{block_reader = true};
- _ ->
- State0
- end,
+handle_cast({method, Method, Content}, State = #ch{reader_pid = Reader}) ->
+ rabbit_flow:ack(Reader),
try handle_method(Method, Content, State) of
{reply, Reply, NewState} ->
ok = rabbit_writer:send_command(NewState#ch.writer_pid, Reply),
@@ -329,15 +316,9 @@ handle_cast({confirm, MsgSeqNos, From}, State) ->
State1 = #ch{confirmed = C} = confirm(MsgSeqNos, From, State),
noreply([send_confirms], State1, case C of [] -> hibernate; _ -> 0 end).
-handle_info({bump_credit, Msg}, State = #ch{block_reader = BlockReader,
- reader_pid = ReaderPid}) ->
- State1 = case {rabbit_flow:bump(Msg), BlockReader} of
- {false, true} -> rabbit_reader:conserve_memory(
- self(), ReaderPid, false),
- State#ch{block_reader = false};
- _ -> State
- end,
- noreply(State1);
+handle_info({bump_credit, Msg}, State) ->
+ rabbit_flow:bump(Msg),
+ noreply(State);
handle_info(timeout, State) ->
noreply(State);
diff --git a/src/rabbit_flow.erl b/src/rabbit_flow.erl
index 6fcf287e0a..ea1260036c 100644
--- a/src/rabbit_flow.erl
+++ b/src/rabbit_flow.erl
@@ -19,18 +19,23 @@
-define(MAX_CREDIT, 100).
-define(MORE_CREDIT_AT, 50).
--export([maybe_issue/1, bump/1, blocked/0, consume/1]).
+-export([ack/1, bump/1, blocked/0, send/1]).
-maybe_issue(To) ->
+%% There are two "flows" here; of messages and of credit, going in
+%% opposite directions. The variable names "From" and "To" refer to
+%% the flow of credit, but the function names refer to the flow of
+%% messages. This is the clearest I can make it (since the function
+%% names form the API and want to make sense externally, while the
+%% variable names are used in credit bookkeeping and want to make
+%% sense internally).
+
+ack(To) ->
Credit =
case get({credit_to, To}) of
- undefined ->
- ?MAX_CREDIT;
- ?MORE_CREDIT_AT + 1 ->
- To ! {bump_credit, {self(), ?MAX_CREDIT - ?MORE_CREDIT_AT}},
- ?MAX_CREDIT;
- C ->
- C - 1
+ undefined -> ?MAX_CREDIT;
+ ?MORE_CREDIT_AT + 1 -> grant(To, ?MAX_CREDIT - ?MORE_CREDIT_AT),
+ ?MAX_CREDIT;
+ C -> C - 1
end,
put({credit_to, To}, Credit).
@@ -41,7 +46,7 @@ bump({From, MoreCredit}) ->
end,
put({credit_from, From}, Credit),
case Credit > 0 of
- true -> erase(credit_blocked),
+ true -> unblock(),
false;
false -> true
end.
@@ -50,7 +55,7 @@ bump({From, MoreCredit}) ->
blocked() ->
get(credit_blocked) =:= true.
-consume(From) ->
+send(From) ->
Credit = case get({credit_from, From}) of
undefined -> ?MAX_CREDIT;
C -> C
@@ -60,3 +65,24 @@ consume(From) ->
_ -> ok
end,
put({credit_from, From}, Credit).
+
+%% --------------------------------------------------------------------------
+
+grant(To, Quantity) ->
+ Msg = {bump_credit, {self(), Quantity}},
+ case blocked() of
+ false -> To ! Msg;
+ true -> Deferred = case get(credit_deferred) of
+ undefined -> [];
+ L -> L
+ end,
+ put(credit_deferred, [{To, Msg} | Deferred])
+ end.
+
+unblock() ->
+ erase(credit_blocked),
+ case get(credit_deferred) of
+ undefined -> ok;
+ Deferred -> [To ! Msg || {To, Msg} <- Deferred],
+ erase(credit_deferred)
+ end.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 94eb88ad0b..b463dc338a 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -25,7 +25,7 @@
-export([init/4, mainloop/2]).
--export([conserve_memory/3, server_properties/1]).
+-export([conserve_memory/2, server_properties/1]).
-export([process_channel_frame/5]). %% used by erlang-client
@@ -71,7 +71,7 @@
-spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()).
-spec(force_event_refresh/1 :: (pid()) -> 'ok').
-spec(shutdown/2 :: (pid(), string()) -> 'ok').
--spec(conserve_memory/3 :: (pid() | atom(), pid(), boolean()) -> 'ok').
+-spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
-spec(server_properties/1 :: (rabbit_types:protocol()) ->
rabbit_framing:amqp_table()).
@@ -137,8 +137,8 @@ info(Pid, Items) ->
force_event_refresh(Pid) ->
gen_server:cast(Pid, force_event_refresh).
-conserve_memory(Blocker, Pid, Conserve) ->
- Pid ! {conserve_memory, Blocker, Conserve},
+conserve_memory(Pid, Conserve) ->
+ Pid ! {conserve_memory, Conserve},
ok.
server_properties(Protocol) ->
@@ -277,8 +277,8 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
{other, Other} -> handle_other(Other, Deb, State)
end.
-handle_other({conserve_memory, Blocker, Conserve}, Deb, State) ->
- recvloop(Deb, update_blockers(Conserve, Blocker, State));
+handle_other({conserve_memory, Conserve}, Deb, State) ->
+ recvloop(Deb, update_blockers(Conserve, mem, State));
handle_other({channel_closing, ChPid}, Deb, State) ->
ok = rabbit_channel:ready_for_close(ChPid),
channel_cleanup(ChPid),
@@ -937,10 +937,10 @@ send_to_new_channel(Channel, AnalyzedFrame, State) ->
process_channel_frame(Frame, ErrPid, Channel, ChPid, AState) ->
case rabbit_command_assembler:process(Frame, AState) of
{ok, NewAState} -> NewAState;
- {ok, Method, NewAState} -> rabbit_flow:consume(ChPid),
+ {ok, Method, NewAState} -> rabbit_flow:send(ChPid),
rabbit_channel:do(ChPid, Method),
NewAState;
- {ok, Method, Content, NewAState} -> rabbit_flow:consume(ChPid),
+ {ok, Method, Content, NewAState} -> rabbit_flow:send(ChPid),
rabbit_channel:do(ChPid,
Method, Content),
NewAState;
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index 47030232d5..fd6b1265ac 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -59,7 +59,7 @@ deliver(QNames, Delivery = #delivery{mandatory = false,
%% is preserved. This scales much better than the non-immediate
%% case below.
QPids = lookup_qpids(QNames),
- [rabbit_flow:consume(QPid) || QPid <- QPids],
+ [rabbit_flow:send(QPid) || QPid <- QPids],
delegate:invoke_no_result(
QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end),
{routed, QPids};