summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue_process.erl1
-rw-r--r--src/rabbit_channel.erl33
-rw-r--r--src/rabbit_flow.erl50
-rw-r--r--src/rabbit_reader.erl66
-rw-r--r--src/rabbit_router.erl1
5 files changed, 103 insertions, 48 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index ba20b35524..cf46a33927 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1023,6 +1023,7 @@ handle_cast({run_backing_queue, Mod, Fun}, State) ->
handle_cast({deliver, Delivery}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
+ rabbit_flow:maybe_issue(Delivery#delivery.sender),
noreply(deliver_or_enqueue(Delivery, State));
handle_cast({ack, AckTags, ChPid}, State) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index ce0024163c..13625d6373 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -38,7 +38,8 @@
user, virtual_host, most_recently_declared_queue, queue_monitors,
consumer_mapping, blocking, queue_consumers, queue_collector_pid,
stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq,
- unconfirmed_qm, confirmed, capabilities, trace_state}).
+ unconfirmed_qm, confirmed, capabilities, trace_state,
+ block_reader}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -199,12 +200,12 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
unconfirmed_qm = gb_trees:empty(),
confirmed = [],
capabilities = Capabilities,
- trace_state = rabbit_trace:init(VHost)},
+ trace_state = rabbit_trace:init(VHost),
+ block_reader = false},
State1 = rabbit_event:init_stats_timer(State, #ch.stats_timer),
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State1)),
rabbit_event:if_enabled(State1, #ch.stats_timer,
fun() -> emit_stats(State1) end),
- rabbit_flow:issue_initial(ReaderPid),
{ok, State1, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -245,8 +246,19 @@ handle_call(refresh_config, _From, State = #ch{virtual_host = VHost}) ->
handle_call(_Request, _From, State) ->
noreply(State).
-handle_cast({method, Method, Content}, State = #ch{reader_pid = ReaderPid}) ->
- rabbit_flow:maybe_issue(ReaderPid),
+handle_cast({method, Method, Content},
+ State0 = #ch{reader_pid = Reader,
+ block_reader = BlockReader}) ->
+
+ rabbit_flow:maybe_issue(Reader),
+ State =
+ case {rabbit_flow:blocked(), BlockReader} of
+ {true, false} ->
+ rabbit_reader:conserve_memory(self(), Reader, true),
+ State0#ch{block_reader = true};
+ _ ->
+ State0
+ end,
try handle_method(Method, Content, State) of
{reply, Reply, NewState} ->
ok = rabbit_writer:send_command(NewState#ch.writer_pid, Reply),
@@ -317,6 +329,17 @@ handle_cast({confirm, MsgSeqNos, From}, State) ->
State1 = #ch{confirmed = C} = confirm(MsgSeqNos, From, State),
noreply([send_confirms], State1, case C of [] -> hibernate; _ -> 0 end).
+handle_info({bump_credit, Msg}, State = #ch{block_reader = BlockReader,
+ reader_pid = ReaderPid}) ->
+ State1 =
+ case {rabbit_flow:bump(Msg), BlockReader} of
+ {false, true} -> rabbit_reader:conserve_memory(
+ self(), ReaderPid, false),
+ State#ch{block_reader = false};
+ _ -> State
+ end,
+ noreply(State1);
+
handle_info(timeout, State) ->
noreply(State);
diff --git a/src/rabbit_flow.erl b/src/rabbit_flow.erl
index 28fc68b544..6fcf287e0a 100644
--- a/src/rabbit_flow.erl
+++ b/src/rabbit_flow.erl
@@ -19,40 +19,44 @@
-define(MAX_CREDIT, 100).
-define(MORE_CREDIT_AT, 50).
--export([issue_initial/1, maybe_issue/1, bump/1, blocked/0, consume/1]).
-
-issue_initial(To) ->
- To ! {bump_credit, {self(), ?MAX_CREDIT}},
- put({credit_to, To}, ?MAX_CREDIT).
+-export([maybe_issue/1, bump/1, blocked/0, consume/1]).
maybe_issue(To) ->
Credit =
- case get({credit_to, To}) - 1 of
- ?MORE_CREDIT_AT ->
+ case get({credit_to, To}) of
+ undefined ->
+ ?MAX_CREDIT;
+ ?MORE_CREDIT_AT + 1 ->
To ! {bump_credit, {self(), ?MAX_CREDIT - ?MORE_CREDIT_AT}},
?MAX_CREDIT;
C ->
- C
+ C - 1
end,
put({credit_to, To}, Credit).
-bump({From, NewCredit}) ->
+bump({From, MoreCredit}) ->
Credit = case get({credit_from, From}) of
- undefined -> NewCredit;
- 0 -> erase(credit_blocked),
- NewCredit;
- C -> C + NewCredit
+ undefined -> MoreCredit;
+ C -> C + MoreCredit
end,
- put({credit_from, From}, Credit).
+ put({credit_from, From}, Credit),
+ case Credit > 0 of
+ true -> erase(credit_blocked),
+ false;
+ false -> true
+ end.
-blocked() -> get(credit_blocked) =:= true.
+%% TODO we assume only one From can block at once. Is this true?
+blocked() ->
+ get(credit_blocked) =:= true.
consume(From) ->
- case get({credit_from, From}) of
- undefined -> ok;
- Credit -> case Credit of
- 1 -> put(credit_blocked, true);
- _ -> ok
- end,
- put({credit_from, From}, Credit - 1)
- end.
+ Credit = case get({credit_from, From}) of
+ undefined -> ?MAX_CREDIT;
+ C -> C
+ end - 1,
+ case Credit of
+ 0 -> put(credit_blocked, true);
+ _ -> ok
+ end,
+ put({credit_from, From}, Credit).
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 4ac387c5d8..9e3b58aabe 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -25,7 +25,7 @@
-export([init/4, mainloop/2]).
--export([conserve_memory/2, server_properties/1]).
+-export([conserve_memory/3, server_properties/1]).
-export([process_channel_frame/5]). %% used by erlang-client
@@ -40,7 +40,7 @@
-record(v1, {parent, sock, connection, callback, recv_len, pending_recv,
connection_state, queue_collector, heartbeater, stats_timer,
channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len,
- auth_mechanism, auth_state}).
+ auth_mechanism, auth_state, blockers}).
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
send_pend, state, channels]).
@@ -71,7 +71,7 @@
-spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()).
-spec(force_event_refresh/1 :: (pid()) -> 'ok').
-spec(shutdown/2 :: (pid(), string()) -> 'ok').
--spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
+-spec(conserve_memory/3 :: (pid() | atom(), pid(), boolean()) -> 'ok').
-spec(server_properties/1 :: (rabbit_types:protocol()) ->
rabbit_framing:amqp_table()).
@@ -137,8 +137,8 @@ info(Pid, Items) ->
force_event_refresh(Pid) ->
gen_server:cast(Pid, force_event_refresh).
-conserve_memory(Pid, Conserve) ->
- Pid ! {conserve_memory, Conserve},
+conserve_memory(Blocker, Pid, Conserve) ->
+ Pid ! {conserve_memory, Blocker, Conserve},
ok.
server_properties(Protocol) ->
@@ -220,7 +220,8 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
buf = [],
buf_len = 0,
auth_mechanism = none,
- auth_state = none},
+ auth_state = none,
+ blockers = sets:new()},
try
recvloop(Deb, switch_callback(rabbit_event:init_stats_timer(
State, #v1.stats_timer),
@@ -276,8 +277,8 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
{other, Other} -> handle_other(Other, Deb, State)
end.
-handle_other({conserve_memory, Conserve}, Deb, State) ->
- recvloop(Deb, internal_conserve_memory(Conserve, State));
+handle_other({conserve_memory, Blocker, Conserve}, Deb, State) ->
+ recvloop(Deb, internal_conserve_memory(Conserve, Blocker, State));
handle_other({channel_closing, ChPid}, Deb, State) ->
ok = rabbit_channel:ready_for_close(ChPid),
channel_cleanup(ChPid),
@@ -361,24 +362,47 @@ terminate(Explanation, State) when ?IS_RUNNING(State) ->
terminate(_Explanation, State) ->
{force, State}.
-internal_conserve_memory(true, State = #v1{connection_state = running}) ->
- State#v1{connection_state = blocking};
-internal_conserve_memory(false, State = #v1{connection_state = blocking}) ->
- State#v1{connection_state = running};
-internal_conserve_memory(false, State = #v1{connection_state = blocked,
- heartbeater = Heartbeater}) ->
- ok = rabbit_heartbeat:resume_monitor(Heartbeater),
- State#v1{connection_state = running};
-internal_conserve_memory(_Conserve, State) ->
+internal_conserve_memory(true, Blocker,
+ State = #v1{connection_state = running,
+ blockers = Blockers}) ->
+ 0 = sets:size(Blockers), %% ASSERT
+ State#v1{connection_state = blocking,
+ blockers = sets:add_element(Blocker, Blockers)};
+internal_conserve_memory(true, Blocker,
+ State = #v1{blockers = Blockers}) ->
+ State#v1{blockers = sets:add_element(Blocker, Blockers)};
+internal_conserve_memory(false, Blocker,
+ State = #v1{connection_state = blocking,
+ blockers = Blockers}) ->
+ NewBlockers = sets:del_element(Blocker, Blockers),
+ case sets:size(NewBlockers) of
+ 0 -> State#v1{connection_state = running,
+ blockers = NewBlockers};
+
+ _ -> State#v1{blockers = NewBlockers}
+ end;
+internal_conserve_memory(false, Blocker,
+ State = #v1{connection_state = blocked,
+ blockers = Blockers,
+ heartbeater = Heartbeater}) ->
+ NewBlockers = sets:del_element(Blocker, Blockers),
+ case sets:size(NewBlockers) of
+ 0 -> ok = rabbit_heartbeat:resume_monitor(Heartbeater),
+ State#v1{connection_state = running,
+ blockers = NewBlockers};
+
+ _ -> State#v1{blockers = NewBlockers}
+ end;
+internal_conserve_memory(_Conserve, _Blocker, State) ->
State.
internal_bump_credit(Msg, State) ->
rabbit_flow:bump(Msg),
- internal_conserve_memory(false, State).
+ internal_conserve_memory(false, self(), State).
internal_check_credit(State) when ?IS_RUNNING(State) ->
case rabbit_flow:blocked() of
- true -> internal_conserve_memory(true, State);
+ true -> internal_conserve_memory(true, self(), State);
false -> State
end.
@@ -713,7 +737,9 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
NewConnection = Connection#connection{vhost = VHostPath},
ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
State1 = internal_conserve_memory(
- rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
+ rabbit_alarm:register(self(), {?MODULE, conserve_memory,
+ [memory_alarm]}),
+ memory_alarm,
State#v1{connection_state = running,
connection = NewConnection}),
rabbit_event:notify(connection_created,
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index 31f5ad14ea..47030232d5 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -59,6 +59,7 @@ deliver(QNames, Delivery = #delivery{mandatory = false,
%% is preserved. This scales much better than the non-immediate
%% case below.
QPids = lookup_qpids(QNames),
+ [rabbit_flow:consume(QPid) || QPid <- QPids],
delegate:invoke_no_result(
QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end),
{routed, QPids};