summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue.erl6
-rw-r--r--src/rabbit_amqqueue_process.erl53
-rw-r--r--src/rabbit_queue_mode_manager.erl38
3 files changed, 60 insertions, 37 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 57269c5341..9d3cead654 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -42,7 +42,7 @@
-export([notify_sent/2, unblock/2]).
-export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
--export([constrain_memory/2, report_desired_memory/1]).
+-export([constrain_memory/2]).
-import(mnesia).
-import(gen_server2).
@@ -109,7 +109,6 @@
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
-spec(on_node_down/1 :: (erlang_node()) -> 'ok').
-spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()).
--spec(report_desired_memory/1 :: (pid()) -> non_neg_integer()).
-endif.
@@ -357,9 +356,6 @@ pseudo_queue(QueueName, Pid) ->
arguments = [],
pid = Pid}.
-report_desired_memory(QPid) ->
- gen_server2:pcall(QPid, 9, report_desired_memory, infinity).
-
safe_pmap_ok(H, F, L) ->
case [R || R <- rabbit_misc:upmap(
fun (V) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 084529a4af..19f6f308cd 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -37,6 +37,7 @@
-define(UNSENT_MESSAGE_LIMIT, 100).
-define(HIBERNATE_AFTER, 1000).
+-define(MEMORY_REPORT_INTERVAL, 500).
-export([start_link/1]).
@@ -55,7 +56,10 @@
mixed_state,
next_msg_id,
active_consumers,
- blocked_consumers}).
+ blocked_consumers,
+ memory_report_counter,
+ old_memory_report
+ }).
-record(consumer, {tag, ack_required}).
@@ -104,7 +108,10 @@ init(Q = #amqqueue { name = QName, durable = Durable }) ->
mixed_state = MS,
next_msg_id = 1,
active_consumers = queue:new(),
- blocked_consumers = queue:new()}, ?HIBERNATE_AFTER}.
+ blocked_consumers = queue:new(),
+ memory_report_counter = ?MEMORY_REPORT_INTERVAL,
+ old_memory_report = 1
+ }, ?HIBERNATE_AFTER}.
terminate(_Reason, State) ->
%% FIXME: How do we cancel active subscriptions?
@@ -121,9 +128,16 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
-reply(Reply, NewState) -> {reply, Reply, NewState, ?HIBERNATE_AFTER}.
+reply(Reply, NewState = #q { memory_report_counter = 0 }) ->
+ {reply, Reply, report_memory(NewState), ?HIBERNATE_AFTER};
+reply(Reply, NewState = #q { memory_report_counter = C }) ->
+ {reply, Reply, NewState #q { memory_report_counter = C - 1 },
+ ?HIBERNATE_AFTER}.
-noreply(NewState) -> {noreply, NewState, ?HIBERNATE_AFTER}.
+noreply(NewState = #q { memory_report_counter = 0}) ->
+ {noreply, report_memory(NewState), ?HIBERNATE_AFTER};
+noreply(NewState = #q { memory_report_counter = C}) ->
+ {noreply, NewState #q { memory_report_counter = C - 1 }, ?HIBERNATE_AFTER}.
lookup_ch(ChPid) ->
case get({ch, ChPid}) of
@@ -524,6 +538,22 @@ i(memory, _) ->
i(Item, _) ->
throw({bad_argument, Item}).
+report_memory(State = #q { old_memory_report = OldMem,
+ mixed_state = MS }) ->
+ MSize = rabbit_mixed_queue:estimate_extra_memory(MS),
+ {memory, PSize} = process_info(self(), memory),
+ NewMem = case MSize + PSize of
+ 0 -> 1; %% avoid / 0
+ N -> N
+ end,
+ State1 = State #q { memory_report_counter = ?MEMORY_REPORT_INTERVAL },
+ case (NewMem / OldMem) > 1.1 orelse (OldMem / NewMem) > 1.1 of
+ true ->
+ rabbit_queue_mode_manager:report_memory(self(), NewMem),
+ State1 #q { old_memory_report = NewMem };
+ false -> State1
+ end.
+
%---------------------------------------------------------------------------
handle_call(info, _From, State) ->
@@ -720,12 +750,7 @@ handle_call({claim_queue, ReaderPid}, _From,
reply(ok, State);
_ ->
reply(locked, State)
- end;
-
-handle_call(report_desired_memory, _From, State = #q { mixed_state = MS }) ->
- MSize = rabbit_mixed_queue:estimate_extra_memory(MS),
- {memory, PSize} = process_info(self(), memory),
- reply(PSize + MSize, State).
+ end.
handle_cast({deliver, Txn, Message, ChPid}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
@@ -817,11 +842,17 @@ handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
handle_ch_down(DownPid, State);
-handle_info(timeout, State) ->
+handle_info(timeout, State = #q { memory_report_counter = Count })
+ when Count == ?MEMORY_REPORT_INTERVAL ->
+ %% Have to do the +1 because the timeout below, with noreply, will -1
%% TODO: Once we drop support for R11B-5, we can change this to
%% {noreply, State, hibernate};
proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State]);
+handle_info(timeout, State) ->
+ State1 = report_memory(State),
+ noreply(State1 #q { memory_report_counter = 1 + ?MEMORY_REPORT_INTERVAL });
+
handle_info(Info, State) ->
?LOGDEBUG("Info in queue: ~p~n", [Info]),
{stop, {unhandled_info, Info}, State}.
diff --git a/src/rabbit_queue_mode_manager.erl b/src/rabbit_queue_mode_manager.erl
index e317fedaa3..0e59f7d226 100644
--- a/src/rabbit_queue_mode_manager.erl
+++ b/src/rabbit_queue_mode_manager.erl
@@ -40,7 +40,7 @@
-export([register/1, change_memory_footprint/2,
reduce_memory_footprint/0, increase_memory_footprint/0,
- gather_memory_estimates/0
+ report_memory/2
]).
-define(SERVER, ?MODULE).
@@ -55,6 +55,7 @@
-spec(change_memory_footprint/2 :: (pid(), bool()) -> 'ok').
-spec(reduce_memory_footprint/0 :: () -> 'ok').
-spec(increase_memory_footprint/0 :: () -> 'ok').
+-spec(report_memory/2 :: (pid(), non_neg_integer()) -> 'ok').
-endif.
@@ -77,24 +78,24 @@ reduce_memory_footprint() ->
increase_memory_footprint() ->
gen_server2:cast(?SERVER, {change_memory_footprint, false}).
-gather_memory_estimates() ->
- gen_server2:cast(?SERVER, gather_memory_estimates).
+report_memory(Pid, Memory) ->
+ gen_server2:cast(?SERVER, {report_memory, Pid, Memory}).
init([]) ->
process_flag(trap_exit, true),
ok = rabbit_alarm:register(self(), {?MODULE, change_memory_footprint, []}),
- {ok, _TRef} = timer:apply_interval(5000, ?MODULE, gather_memory_estimates, []),
{ok, #state { mode = unlimited,
- queues = []
+ queues = dict:new()
}}.
handle_call({register, Pid}, _From,
State = #state { queues = Qs, mode = Mode }) ->
+ _MRef = erlang:monitor(process, Pid),
Result = case Mode of
unlimited -> mixed;
_ -> disk
end,
- {reply, {ok, Result}, State #state { queues = [Pid | Qs] }}.
+ {reply, {ok, Result}, State #state { queues = dict:store(Pid, 0, Qs) }}.
handle_cast({change_memory_footprint, true},
State = #state { mode = disk_only }) ->
@@ -120,10 +121,13 @@ handle_cast({change_memory_footprint, false},
constrain_queues(false, State #state.queues),
{noreply, State #state { mode = ram_disk }};
-handle_cast(gather_memory_estimates, State) ->
- State1 = internal_gather(State),
- {noreply, State1}.
+handle_cast({report_memory, Pid, Memory}, State = #state { queues = Qs }) ->
+ io:format("Queue ~w requested ~w bytes~n", [Pid, Memory]),
+ {noreply, State #state { queues = dict:store(Pid, Memory, Qs) }}.
+handle_info({'DOWN', _MRef, process, Pid, _Reason},
+ State = #state { queues = Qs }) ->
+ {noreply, State #state { queues = dict:erase(Pid, Qs) }};
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
handle_info(_Info, State) ->
@@ -136,15 +140,7 @@ code_change(_OldVsn, State, _Extra) ->
{ok, State}.
constrain_queues(Constrain, Qs) ->
- lists:foreach(
- fun (QPid) ->
- ok = rabbit_amqqueue:constrain_memory(QPid, Constrain)
- end, Qs).
-
-internal_gather(State = #state { queues = Qs }) ->
- lists:foreach(fun(Q) ->
- io:format("Queue memory request: ~w is ~w bytes~n",
- [Q, rabbit_amqqueue:report_desired_memory(Q)
- ])
- end, Qs),
- State.
+ dict:fold(
+ fun (QPid, _Mem, ok) ->
+ rabbit_amqqueue:constrain_memory(QPid, Constrain)
+ end, ok, Qs).