summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-06-19 11:28:11 +0100
committerMatthew Sackman <matthew@lshift.net>2009-06-19 11:28:11 +0100
commita1a0c042f5ce52807960287b1f44d7da03223a44 (patch)
treeb7772a65e410992a09754a1704d96b1857759fb5
parent58ea63ab02b7bebd9c442e018bc32ed1955095ba (diff)
downloadrabbitmq-server-git-a1a0c042f5ce52807960287b1f44d7da03223a44.tar.gz
Reworked reporting of memory requests so that the queues are proactive about deciding when to report. This isn't quite good enough though because GC means the memory size fluctuates too much. Need to switch to just grabbing the size of the messages in the queue.
-rw-r--r--src/rabbit_amqqueue.erl6
-rw-r--r--src/rabbit_amqqueue_process.erl53
-rw-r--r--src/rabbit_queue_mode_manager.erl38
3 files changed, 60 insertions, 37 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 57269c5341..9d3cead654 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -42,7 +42,7 @@
-export([notify_sent/2, unblock/2]).
-export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
--export([constrain_memory/2, report_desired_memory/1]).
+-export([constrain_memory/2]).
-import(mnesia).
-import(gen_server2).
@@ -109,7 +109,6 @@
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
-spec(on_node_down/1 :: (erlang_node()) -> 'ok').
-spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()).
--spec(report_desired_memory/1 :: (pid()) -> non_neg_integer()).
-endif.
@@ -357,9 +356,6 @@ pseudo_queue(QueueName, Pid) ->
arguments = [],
pid = Pid}.
-report_desired_memory(QPid) ->
- gen_server2:pcall(QPid, 9, report_desired_memory, infinity).
-
safe_pmap_ok(H, F, L) ->
case [R || R <- rabbit_misc:upmap(
fun (V) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 084529a4af..19f6f308cd 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -37,6 +37,7 @@
-define(UNSENT_MESSAGE_LIMIT, 100).
-define(HIBERNATE_AFTER, 1000).
+-define(MEMORY_REPORT_INTERVAL, 500).
-export([start_link/1]).
@@ -55,7 +56,10 @@
mixed_state,
next_msg_id,
active_consumers,
- blocked_consumers}).
+ blocked_consumers,
+ memory_report_counter,
+ old_memory_report
+ }).
-record(consumer, {tag, ack_required}).
@@ -104,7 +108,10 @@ init(Q = #amqqueue { name = QName, durable = Durable }) ->
mixed_state = MS,
next_msg_id = 1,
active_consumers = queue:new(),
- blocked_consumers = queue:new()}, ?HIBERNATE_AFTER}.
+ blocked_consumers = queue:new(),
+ memory_report_counter = ?MEMORY_REPORT_INTERVAL,
+ old_memory_report = 1
+ }, ?HIBERNATE_AFTER}.
terminate(_Reason, State) ->
%% FIXME: How do we cancel active subscriptions?
@@ -121,9 +128,16 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
-reply(Reply, NewState) -> {reply, Reply, NewState, ?HIBERNATE_AFTER}.
+reply(Reply, NewState = #q { memory_report_counter = 0 }) ->
+ {reply, Reply, report_memory(NewState), ?HIBERNATE_AFTER};
+reply(Reply, NewState = #q { memory_report_counter = C }) ->
+ {reply, Reply, NewState #q { memory_report_counter = C - 1 },
+ ?HIBERNATE_AFTER}.
-noreply(NewState) -> {noreply, NewState, ?HIBERNATE_AFTER}.
+noreply(NewState = #q { memory_report_counter = 0}) ->
+ {noreply, report_memory(NewState), ?HIBERNATE_AFTER};
+noreply(NewState = #q { memory_report_counter = C}) ->
+ {noreply, NewState #q { memory_report_counter = C - 1 }, ?HIBERNATE_AFTER}.
lookup_ch(ChPid) ->
case get({ch, ChPid}) of
@@ -524,6 +538,22 @@ i(memory, _) ->
i(Item, _) ->
throw({bad_argument, Item}).
+report_memory(State = #q { old_memory_report = OldMem,
+ mixed_state = MS }) ->
+ MSize = rabbit_mixed_queue:estimate_extra_memory(MS),
+ {memory, PSize} = process_info(self(), memory),
+ NewMem = case MSize + PSize of
+ 0 -> 1; %% avoid / 0
+ N -> N
+ end,
+ State1 = State #q { memory_report_counter = ?MEMORY_REPORT_INTERVAL },
+ case (NewMem / OldMem) > 1.1 orelse (OldMem / NewMem) > 1.1 of
+ true ->
+ rabbit_queue_mode_manager:report_memory(self(), NewMem),
+ State1 #q { old_memory_report = NewMem };
+ false -> State1
+ end.
+
%---------------------------------------------------------------------------
handle_call(info, _From, State) ->
@@ -720,12 +750,7 @@ handle_call({claim_queue, ReaderPid}, _From,
reply(ok, State);
_ ->
reply(locked, State)
- end;
-
-handle_call(report_desired_memory, _From, State = #q { mixed_state = MS }) ->
- MSize = rabbit_mixed_queue:estimate_extra_memory(MS),
- {memory, PSize} = process_info(self(), memory),
- reply(PSize + MSize, State).
+ end.
handle_cast({deliver, Txn, Message, ChPid}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
@@ -817,11 +842,17 @@ handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
handle_ch_down(DownPid, State);
-handle_info(timeout, State) ->
+handle_info(timeout, State = #q { memory_report_counter = Count })
+ when Count == ?MEMORY_REPORT_INTERVAL ->
+ %% Have to do the +1 because the timeout below, with noreply, will -1
%% TODO: Once we drop support for R11B-5, we can change this to
%% {noreply, State, hibernate};
proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State]);
+handle_info(timeout, State) ->
+ State1 = report_memory(State),
+ noreply(State1 #q { memory_report_counter = 1 + ?MEMORY_REPORT_INTERVAL });
+
handle_info(Info, State) ->
?LOGDEBUG("Info in queue: ~p~n", [Info]),
{stop, {unhandled_info, Info}, State}.
diff --git a/src/rabbit_queue_mode_manager.erl b/src/rabbit_queue_mode_manager.erl
index e317fedaa3..0e59f7d226 100644
--- a/src/rabbit_queue_mode_manager.erl
+++ b/src/rabbit_queue_mode_manager.erl
@@ -40,7 +40,7 @@
-export([register/1, change_memory_footprint/2,
reduce_memory_footprint/0, increase_memory_footprint/0,
- gather_memory_estimates/0
+ report_memory/2
]).
-define(SERVER, ?MODULE).
@@ -55,6 +55,7 @@
-spec(change_memory_footprint/2 :: (pid(), bool()) -> 'ok').
-spec(reduce_memory_footprint/0 :: () -> 'ok').
-spec(increase_memory_footprint/0 :: () -> 'ok').
+-spec(report_memory/2 :: (pid(), non_neg_integer()) -> 'ok').
-endif.
@@ -77,24 +78,24 @@ reduce_memory_footprint() ->
increase_memory_footprint() ->
gen_server2:cast(?SERVER, {change_memory_footprint, false}).
-gather_memory_estimates() ->
- gen_server2:cast(?SERVER, gather_memory_estimates).
+report_memory(Pid, Memory) ->
+ gen_server2:cast(?SERVER, {report_memory, Pid, Memory}).
init([]) ->
process_flag(trap_exit, true),
ok = rabbit_alarm:register(self(), {?MODULE, change_memory_footprint, []}),
- {ok, _TRef} = timer:apply_interval(5000, ?MODULE, gather_memory_estimates, []),
{ok, #state { mode = unlimited,
- queues = []
+ queues = dict:new()
}}.
handle_call({register, Pid}, _From,
State = #state { queues = Qs, mode = Mode }) ->
+ _MRef = erlang:monitor(process, Pid),
Result = case Mode of
unlimited -> mixed;
_ -> disk
end,
- {reply, {ok, Result}, State #state { queues = [Pid | Qs] }}.
+ {reply, {ok, Result}, State #state { queues = dict:store(Pid, 0, Qs) }}.
handle_cast({change_memory_footprint, true},
State = #state { mode = disk_only }) ->
@@ -120,10 +121,13 @@ handle_cast({change_memory_footprint, false},
constrain_queues(false, State #state.queues),
{noreply, State #state { mode = ram_disk }};
-handle_cast(gather_memory_estimates, State) ->
- State1 = internal_gather(State),
- {noreply, State1}.
+handle_cast({report_memory, Pid, Memory}, State = #state { queues = Qs }) ->
+ io:format("Queue ~w requested ~w bytes~n", [Pid, Memory]),
+ {noreply, State #state { queues = dict:store(Pid, Memory, Qs) }}.
+handle_info({'DOWN', _MRef, process, Pid, _Reason},
+ State = #state { queues = Qs }) ->
+ {noreply, State #state { queues = dict:erase(Pid, Qs) }};
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
handle_info(_Info, State) ->
@@ -136,15 +140,7 @@ code_change(_OldVsn, State, _Extra) ->
{ok, State}.
constrain_queues(Constrain, Qs) ->
- lists:foreach(
- fun (QPid) ->
- ok = rabbit_amqqueue:constrain_memory(QPid, Constrain)
- end, Qs).
-
-internal_gather(State = #state { queues = Qs }) ->
- lists:foreach(fun(Q) ->
- io:format("Queue memory request: ~w is ~w bytes~n",
- [Q, rabbit_amqqueue:report_desired_memory(Q)
- ])
- end, Qs),
- State.
+ dict:fold(
+ fun (QPid, _Mem, ok) ->
+ rabbit_amqqueue:constrain_memory(QPid, Constrain)
+ end, ok, Qs).