diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-06-19 11:28:11 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-06-19 11:28:11 +0100 |
| commit | a1a0c042f5ce52807960287b1f44d7da03223a44 (patch) | |
| tree | b7772a65e410992a09754a1704d96b1857759fb5 | |
| parent | 58ea63ab02b7bebd9c442e018bc32ed1955095ba (diff) | |
| download | rabbitmq-server-git-a1a0c042f5ce52807960287b1f44d7da03223a44.tar.gz | |
Reworked reporting of memory requests so that the queues are proactive about deciding when to report. This isn't quite good enough though because GC means the memory size fluctuates too much. Need to switch to just grabbing the size of the messages in the queue.
| -rw-r--r-- | src/rabbit_amqqueue.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 53 | ||||
| -rw-r--r-- | src/rabbit_queue_mode_manager.erl | 38 |
3 files changed, 60 insertions, 37 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 57269c5341..9d3cead654 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -42,7 +42,7 @@ -export([notify_sent/2, unblock/2]). -export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). --export([constrain_memory/2, report_desired_memory/1]). +-export([constrain_memory/2]). -import(mnesia). -import(gen_server2). @@ -109,7 +109,6 @@ -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). -spec(on_node_down/1 :: (erlang_node()) -> 'ok'). -spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()). --spec(report_desired_memory/1 :: (pid()) -> non_neg_integer()). -endif. @@ -357,9 +356,6 @@ pseudo_queue(QueueName, Pid) -> arguments = [], pid = Pid}. -report_desired_memory(QPid) -> - gen_server2:pcall(QPid, 9, report_desired_memory, infinity). - safe_pmap_ok(H, F, L) -> case [R || R <- rabbit_misc:upmap( fun (V) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 084529a4af..19f6f308cd 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -37,6 +37,7 @@ -define(UNSENT_MESSAGE_LIMIT, 100). -define(HIBERNATE_AFTER, 1000). +-define(MEMORY_REPORT_INTERVAL, 500). -export([start_link/1]). @@ -55,7 +56,10 @@ mixed_state, next_msg_id, active_consumers, - blocked_consumers}). + blocked_consumers, + memory_report_counter, + old_memory_report + }). -record(consumer, {tag, ack_required}). @@ -104,7 +108,10 @@ init(Q = #amqqueue { name = QName, durable = Durable }) -> mixed_state = MS, next_msg_id = 1, active_consumers = queue:new(), - blocked_consumers = queue:new()}, ?HIBERNATE_AFTER}. + blocked_consumers = queue:new(), + memory_report_counter = ?MEMORY_REPORT_INTERVAL, + old_memory_report = 1 + }, ?HIBERNATE_AFTER}. terminate(_Reason, State) -> %% FIXME: How do we cancel active subscriptions? @@ -121,9 +128,16 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- -reply(Reply, NewState) -> {reply, Reply, NewState, ?HIBERNATE_AFTER}. +reply(Reply, NewState = #q { memory_report_counter = 0 }) -> + {reply, Reply, report_memory(NewState), ?HIBERNATE_AFTER}; +reply(Reply, NewState = #q { memory_report_counter = C }) -> + {reply, Reply, NewState #q { memory_report_counter = C - 1 }, + ?HIBERNATE_AFTER}. -noreply(NewState) -> {noreply, NewState, ?HIBERNATE_AFTER}. +noreply(NewState = #q { memory_report_counter = 0}) -> + {noreply, report_memory(NewState), ?HIBERNATE_AFTER}; +noreply(NewState = #q { memory_report_counter = C}) -> + {noreply, NewState #q { memory_report_counter = C - 1 }, ?HIBERNATE_AFTER}. lookup_ch(ChPid) -> case get({ch, ChPid}) of @@ -524,6 +538,22 @@ i(memory, _) -> i(Item, _) -> throw({bad_argument, Item}). +report_memory(State = #q { old_memory_report = OldMem, + mixed_state = MS }) -> + MSize = rabbit_mixed_queue:estimate_extra_memory(MS), + {memory, PSize} = process_info(self(), memory), + NewMem = case MSize + PSize of + 0 -> 1; %% avoid / 0 + N -> N + end, + State1 = State #q { memory_report_counter = ?MEMORY_REPORT_INTERVAL }, + case (NewMem / OldMem) > 1.1 orelse (OldMem / NewMem) > 1.1 of + true -> + rabbit_queue_mode_manager:report_memory(self(), NewMem), + State1 #q { old_memory_report = NewMem }; + false -> State1 + end. + %--------------------------------------------------------------------------- handle_call(info, _From, State) -> @@ -720,12 +750,7 @@ handle_call({claim_queue, ReaderPid}, _From, reply(ok, State); _ -> reply(locked, State) - end; - -handle_call(report_desired_memory, _From, State = #q { mixed_state = MS }) -> - MSize = rabbit_mixed_queue:estimate_extra_memory(MS), - {memory, PSize} = process_info(self(), memory), - reply(PSize + MSize, State). + end. handle_cast({deliver, Txn, Message, ChPid}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. @@ -817,11 +842,17 @@ handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> handle_ch_down(DownPid, State); -handle_info(timeout, State) -> +handle_info(timeout, State = #q { memory_report_counter = Count }) + when Count == ?MEMORY_REPORT_INTERVAL -> + %% Have to do the +1 because the timeout below, with noreply, will -1 %% TODO: Once we drop support for R11B-5, we can change this to %% {noreply, State, hibernate}; proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State]); +handle_info(timeout, State) -> + State1 = report_memory(State), + noreply(State1 #q { memory_report_counter = 1 + ?MEMORY_REPORT_INTERVAL }); + handle_info(Info, State) -> ?LOGDEBUG("Info in queue: ~p~n", [Info]), {stop, {unhandled_info, Info}, State}. diff --git a/src/rabbit_queue_mode_manager.erl b/src/rabbit_queue_mode_manager.erl index e317fedaa3..0e59f7d226 100644 --- a/src/rabbit_queue_mode_manager.erl +++ b/src/rabbit_queue_mode_manager.erl @@ -40,7 +40,7 @@ -export([register/1, change_memory_footprint/2, reduce_memory_footprint/0, increase_memory_footprint/0, - gather_memory_estimates/0 + report_memory/2 ]). -define(SERVER, ?MODULE). @@ -55,6 +55,7 @@ -spec(change_memory_footprint/2 :: (pid(), bool()) -> 'ok'). -spec(reduce_memory_footprint/0 :: () -> 'ok'). -spec(increase_memory_footprint/0 :: () -> 'ok'). +-spec(report_memory/2 :: (pid(), non_neg_integer()) -> 'ok'). -endif. @@ -77,24 +78,24 @@ reduce_memory_footprint() -> increase_memory_footprint() -> gen_server2:cast(?SERVER, {change_memory_footprint, false}). -gather_memory_estimates() -> - gen_server2:cast(?SERVER, gather_memory_estimates). +report_memory(Pid, Memory) -> + gen_server2:cast(?SERVER, {report_memory, Pid, Memory}). init([]) -> process_flag(trap_exit, true), ok = rabbit_alarm:register(self(), {?MODULE, change_memory_footprint, []}), - {ok, _TRef} = timer:apply_interval(5000, ?MODULE, gather_memory_estimates, []), {ok, #state { mode = unlimited, - queues = [] + queues = dict:new() }}. handle_call({register, Pid}, _From, State = #state { queues = Qs, mode = Mode }) -> + _MRef = erlang:monitor(process, Pid), Result = case Mode of unlimited -> mixed; _ -> disk end, - {reply, {ok, Result}, State #state { queues = [Pid | Qs] }}. + {reply, {ok, Result}, State #state { queues = dict:store(Pid, 0, Qs) }}. handle_cast({change_memory_footprint, true}, State = #state { mode = disk_only }) -> @@ -120,10 +121,13 @@ handle_cast({change_memory_footprint, false}, constrain_queues(false, State #state.queues), {noreply, State #state { mode = ram_disk }}; -handle_cast(gather_memory_estimates, State) -> - State1 = internal_gather(State), - {noreply, State1}. +handle_cast({report_memory, Pid, Memory}, State = #state { queues = Qs }) -> + io:format("Queue ~w requested ~w bytes~n", [Pid, Memory]), + {noreply, State #state { queues = dict:store(Pid, Memory, Qs) }}. +handle_info({'DOWN', _MRef, process, Pid, _Reason}, + State = #state { queues = Qs }) -> + {noreply, State #state { queues = dict:erase(Pid, Qs) }}; handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; handle_info(_Info, State) -> @@ -136,15 +140,7 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. constrain_queues(Constrain, Qs) -> - lists:foreach( - fun (QPid) -> - ok = rabbit_amqqueue:constrain_memory(QPid, Constrain) - end, Qs). - -internal_gather(State = #state { queues = Qs }) -> - lists:foreach(fun(Q) -> - io:format("Queue memory request: ~w is ~w bytes~n", - [Q, rabbit_amqqueue:report_desired_memory(Q) - ]) - end, Qs), - State. + dict:fold( + fun (QPid, _Mem, ok) -> + rabbit_amqqueue:constrain_memory(QPid, Constrain) + end, ok, Qs). |
