diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2013-09-23 16:25:50 +0100 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2013-09-23 16:25:50 +0100 |
| commit | e6b81396e4c6deed2c2fc59f056544dabae77a09 (patch) | |
| tree | 606ab3a2bcc609b63dbfab22a030a9d475b5ecbb /src | |
| parent | 657093d39de55913ef256c3cfe33547e3feb66e5 (diff) | |
| parent | 7b704ec3f85fc78405cc561ef12c71a7732404e1 (diff) | |
| download | rabbitmq-server-git-e6b81396e4c6deed2c2fc59f056544dabae77a09.tar.gz | |
Merge in default
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_disk_monitor.erl | 77 | ||||
| -rw-r--r-- | src/rabbit_memory_monitor.erl | 116 |
2 files changed, 128 insertions, 65 deletions
diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl index 5aaa1b2df4..f153641ef3 100644 --- a/src/rabbit_disk_monitor.erl +++ b/src/rabbit_disk_monitor.erl @@ -23,16 +23,22 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([get_disk_free_limit/0, set_disk_free_limit/1, get_check_interval/0, - set_check_interval/1, get_disk_free/0]). +-export([get_disk_free_limit/0, set_disk_free_limit/1, + get_min_check_interval/0, set_min_check_interval/1, + get_max_check_interval/0, set_max_check_interval/1, + get_disk_free/0]). -define(SERVER, ?MODULE). --define(DEFAULT_DISK_CHECK_INTERVAL, 10000). +-define(DEFAULT_MIN_DISK_CHECK_INTERVAL, 100). +-define(DEFAULT_MAX_DISK_CHECK_INTERVAL, 10000). +%% 250MB/s i.e. 250kB/ms +-define(FAST_RATE, (250 * 1000)). -record(state, {dir, limit, actual, - timeout, + min_interval, + max_interval, timer, alarmed }). @@ -45,8 +51,10 @@ -spec(start_link/1 :: (disk_free_limit()) -> rabbit_types:ok_pid_or_error()). -spec(get_disk_free_limit/0 :: () -> integer()). -spec(set_disk_free_limit/1 :: (disk_free_limit()) -> 'ok'). --spec(get_check_interval/0 :: () -> integer()). --spec(set_check_interval/1 :: (integer()) -> 'ok'). +-spec(get_min_check_interval/0 :: () -> integer()). +-spec(set_min_check_interval/1 :: (integer()) -> 'ok'). +-spec(get_max_check_interval/0 :: () -> integer()). +-spec(set_max_check_interval/1 :: (integer()) -> 'ok'). -spec(get_disk_free/0 :: () -> (integer() | 'unknown')). -endif. @@ -61,11 +69,17 @@ get_disk_free_limit() -> set_disk_free_limit(Limit) -> gen_server:call(?MODULE, {set_disk_free_limit, Limit}, infinity). -get_check_interval() -> - gen_server:call(?MODULE, get_check_interval, infinity). +get_min_check_interval() -> + gen_server:call(?MODULE, get_min_check_interval, infinity). -set_check_interval(Interval) -> - gen_server:call(?MODULE, {set_check_interval, Interval}, infinity). +set_min_check_interval(Interval) -> + gen_server:call(?MODULE, {set_min_check_interval, Interval}, infinity). + +get_max_check_interval() -> + gen_server:call(?MODULE, get_max_check_interval, infinity). + +set_max_check_interval(Interval) -> + gen_server:call(?MODULE, {set_max_check_interval, Interval}, infinity). get_disk_free() -> gen_server:call(?MODULE, get_disk_free, infinity). @@ -78,16 +92,15 @@ start_link(Args) -> gen_server:start_link({local, ?SERVER}, ?MODULE, [Args], []). init([Limit]) -> - TRef = start_timer(?DEFAULT_DISK_CHECK_INTERVAL), Dir = dir(), - State = #state { dir = Dir, - timeout = ?DEFAULT_DISK_CHECK_INTERVAL, - timer = TRef, - alarmed = false}, + State = #state{dir = Dir, + min_interval = ?DEFAULT_MIN_DISK_CHECK_INTERVAL, + max_interval = ?DEFAULT_MAX_DISK_CHECK_INTERVAL, + alarmed = false}, case {catch get_disk_free(Dir), vm_memory_monitor:get_total_memory()} of {N1, N2} when is_integer(N1), is_integer(N2) -> - {ok, set_disk_limits(State, Limit)}; + {ok, start_timer(set_disk_limits(State, Limit))}; Err -> rabbit_log:info("Disabling disk free space monitoring " "on unsupported platform: ~p~n", [Err]), @@ -100,12 +113,17 @@ handle_call(get_disk_free_limit, _From, State) -> handle_call({set_disk_free_limit, Limit}, _From, State) -> {reply, ok, set_disk_limits(State, Limit)}; -handle_call(get_check_interval, _From, State) -> - {reply, State#state.timeout, State}; +handle_call(get_min_check_interval, _From, State) -> + {reply, State#state.min_interval, State}; + +handle_call(get_max_check_interval, _From, State) -> + {reply, State#state.max_interval, State}; + +handle_call({set_min_check_interval, MinInterval}, _From, State) -> + {reply, ok, State#state{min_interval = MinInterval}}; -handle_call({set_check_interval, Timeout}, _From, State) -> - {ok, cancel} = timer:cancel(State#state.timer), - {reply, ok, State#state{timeout = Timeout, timer = start_timer(Timeout)}}; +handle_call({set_max_check_interval, MaxInterval}, _From, State) -> + {reply, ok, State#state{max_interval = MaxInterval}}; handle_call(get_disk_free, _From, State = #state { actual = Actual }) -> {reply, Actual, State}; @@ -117,7 +135,7 @@ handle_cast(_Request, State) -> {noreply, State}. handle_info(update, State) -> - {noreply, internal_update(State)}; + {noreply, start_timer(internal_update(State))}; handle_info(_Info, State) -> {noreply, State}. @@ -193,6 +211,15 @@ emit_update_info(StateStr, CurrentFree, Limit) -> "Disk free space ~s. Free bytes:~p Limit:~p~n", [StateStr, CurrentFree, Limit]). -start_timer(Timeout) -> - {ok, TRef} = timer:send_interval(Timeout, update), - TRef. +start_timer(State) -> + State#state{timer = erlang:send_after(interval(State), self(), update)}. + +interval(#state{alarmed = true, + max_interval = MaxInterval}) -> + MaxInterval; +interval(#state{limit = Limit, + actual = Actual, + min_interval = MinInterval, + max_interval = MaxInterval}) -> + IdealInterval = 2 * (Actual - Limit) / ?FAST_RATE, + trunc(erlang:max(MinInterval, erlang:min(MaxInterval, IdealInterval))). diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl index b8d8023ed3..2e9a820cb2 100644 --- a/src/rabbit_memory_monitor.erl +++ b/src/rabbit_memory_monitor.erl @@ -25,7 +25,7 @@ -behaviour(gen_server2). -export([start_link/0, register/2, deregister/1, - report_ram_duration/2, stop/0]). + report_ram_duration/2, stop/0, conserve_resources/3]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -36,7 +36,8 @@ queue_durations, %% ets #process queue_duration_sum, %% sum of all queue_durations queue_duration_count, %% number of elements in sum - desired_duration %% the desired queue duration + desired_duration, %% the desired queue duration + disk_alarm %% disable paging, disk alarm has fired }). -define(SERVER, ?MODULE). @@ -86,6 +87,11 @@ report_ram_duration(Pid, QueueDuration) -> stop() -> gen_server2:cast(?SERVER, stop). +conserve_resources(Pid, disk, Conserve) -> + gen_server2:cast(Pid, {disk_alarm, Conserve}); +conserve_resources(_Pid, _Source, _Conserve) -> + ok. + %%---------------------------------------------------------------------------- %% Gen_server callbacks %%---------------------------------------------------------------------------- @@ -94,13 +100,14 @@ init([]) -> {ok, TRef} = timer:send_interval(?DEFAULT_UPDATE_INTERVAL, update), Ets = ets:new(?TABLE_NAME, [set, private, {keypos, #process.pid}]), - + Alarms = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}), {ok, internal_update( #state { timer = TRef, queue_durations = Ets, queue_duration_sum = 0.0, queue_duration_count = 0, - desired_duration = infinity })}. + desired_duration = infinity, + disk_alarm = lists:member(disk, Alarms)})}. handle_call({report_ram_duration, Pid, QueueDuration}, From, State = #state { queue_duration_sum = Sum, @@ -137,6 +144,12 @@ handle_call({register, Pid, MFA}, _From, handle_call(_Request, _From, State) -> {noreply, State}. +handle_cast({disk_alarm, Alarm}, State = #state{disk_alarm = Alarm}) -> + {noreply, State}; + +handle_cast({disk_alarm, Alarm}, State) -> + {noreply, internal_update(State#state{disk_alarm = Alarm})}; + handle_cast({deregister, Pid}, State) -> {noreply, internal_deregister(Pid, true, State)}; @@ -192,10 +205,18 @@ internal_deregister(Pid, Demonitor, queue_duration_count = Count1 } end. -internal_update(State = #state { queue_durations = Durations, - desired_duration = DesiredDurationAvg, - queue_duration_sum = Sum, - queue_duration_count = Count }) -> +internal_update(State = #state{queue_durations = Durations, + desired_duration = DesiredDurationAvg, + disk_alarm = DiskAlarm}) -> + DesiredDurationAvg1 = desired_duration_average(State), + State1 = State#state{desired_duration = DesiredDurationAvg1}, + maybe_inform_queues( + DiskAlarm, DesiredDurationAvg, DesiredDurationAvg1, Durations), + State1. + +desired_duration_average(#state{queue_duration_sum = Sum, + queue_duration_count = Count, + disk_alarm = DiskAlarm}) -> {ok, LimitThreshold} = application:get_env(rabbit, vm_memory_high_watermark_paging_ratio), MemoryLimit = vm_memory_monitor:get_memory_limit(), @@ -203,46 +224,61 @@ internal_update(State = #state { queue_durations = Durations, true -> erlang:memory(total) / MemoryLimit; false -> infinity end, - DesiredDurationAvg1 = - if MemoryRatio =:= infinity -> - 0.0; - MemoryRatio < LimitThreshold orelse Count == 0 -> - infinity; - MemoryRatio < ?SUM_INC_THRESHOLD -> - ((Sum + ?SUM_INC_AMOUNT) / Count) / MemoryRatio; - true -> - (Sum / Count) / MemoryRatio - end, - State1 = State #state { desired_duration = DesiredDurationAvg1 }, + if DiskAlarm -> + infinity; + MemoryRatio =:= infinity -> + 0.0; + MemoryRatio < LimitThreshold orelse Count == 0 -> + infinity; + MemoryRatio < ?SUM_INC_THRESHOLD -> + ((Sum + ?SUM_INC_AMOUNT) / Count) / MemoryRatio; + true -> + (Sum / Count) / MemoryRatio + end. +maybe_inform_queues(false, DesiredDurationAvg, DesiredDurationAvg1, + Durations) -> %% only inform queues immediately if the desired duration has %% decreased case DesiredDurationAvg1 == infinity orelse (DesiredDurationAvg /= infinity andalso DesiredDurationAvg1 >= DesiredDurationAvg) of - true -> - ok; - false -> - true = - ets:foldl( - fun (Proc = #process { reported = QueueDuration, - sent = PrevSendDuration, - callback = {M, F, A} }, true) -> - case should_send(QueueDuration, PrevSendDuration, - DesiredDurationAvg1) of - true -> ok = erlang:apply( - M, F, A ++ [DesiredDurationAvg1]), - ets:insert( - Durations, - Proc #process { - sent = DesiredDurationAvg1}); - false -> true - end - end, true, Durations) - end, - State1. + true -> ok; + false -> inform_queues(DesiredDurationAvg1, Durations, + fun should_send/3) + end; +maybe_inform_queues(true, DesiredDurationAvg, DesiredDurationAvg1, + Durations) -> + case DesiredDurationAvg1 == infinity andalso + DesiredDurationAvg /= infinity of + true -> inform_queues(DesiredDurationAvg1, Durations, + fun should_send_disk_alarm/3); + false -> ok + end. + +inform_queues(DesiredDurationAvg1, Durations, If) -> + true = + ets:foldl( + fun (Proc = #process{reported = QueueDuration, + sent = PrevSendDuration, + callback = {M, F, A}}, true) -> + case If(QueueDuration, PrevSendDuration, + DesiredDurationAvg1) of + true -> ok = erlang:apply( + M, F, A ++ [DesiredDurationAvg1]), + ets:insert( + Durations, + Proc#process{sent = DesiredDurationAvg1}); + false -> true + end + end, true, Durations). + should_send(infinity, infinity, _) -> true; should_send(infinity, D, DD) -> DD < D; should_send(D, infinity, DD) -> DD < D; should_send(D1, D2, DD) -> DD < lists:min([D1, D2]). + +should_send_disk_alarm(_, infinity, _) -> false; +should_send_disk_alarm(_, _, infinity) -> true; +should_send_disk_alarm(_, _, _) -> false. |
