diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2013-09-04 16:02:09 +0100 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2013-09-04 16:02:09 +0100 |
| commit | 0dba1f88d2b173c92525be4c4826a70cc6c15e68 (patch) | |
| tree | cb17dea6c0b273510f4446e09b3795a93ef2f8cb | |
| parent | a0c79f5d1f5941c9cb855f90180b071b23f0ef90 (diff) | |
| download | rabbitmq-server-git-0dba1f88d2b173c92525be4c4826a70cc6c15e68.tar.gz | |
Don't page when the disk alarm has gone off.
| -rw-r--r-- | src/rabbit_memory_monitor.erl | 116 |
1 files changed, 76 insertions, 40 deletions
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl index b8d8023ed3..2e9a820cb2 100644 --- a/src/rabbit_memory_monitor.erl +++ b/src/rabbit_memory_monitor.erl @@ -25,7 +25,7 @@ -behaviour(gen_server2). -export([start_link/0, register/2, deregister/1, - report_ram_duration/2, stop/0]). + report_ram_duration/2, stop/0, conserve_resources/3]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -36,7 +36,8 @@ queue_durations, %% ets #process queue_duration_sum, %% sum of all queue_durations queue_duration_count, %% number of elements in sum - desired_duration %% the desired queue duration + desired_duration, %% the desired queue duration + disk_alarm %% disable paging, disk alarm has fired }). -define(SERVER, ?MODULE). @@ -86,6 +87,11 @@ report_ram_duration(Pid, QueueDuration) -> stop() -> gen_server2:cast(?SERVER, stop). +conserve_resources(Pid, disk, Conserve) -> + gen_server2:cast(Pid, {disk_alarm, Conserve}); +conserve_resources(_Pid, _Source, _Conserve) -> + ok. + %%---------------------------------------------------------------------------- %% Gen_server callbacks %%---------------------------------------------------------------------------- @@ -94,13 +100,14 @@ init([]) -> {ok, TRef} = timer:send_interval(?DEFAULT_UPDATE_INTERVAL, update), Ets = ets:new(?TABLE_NAME, [set, private, {keypos, #process.pid}]), - + Alarms = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}), {ok, internal_update( #state { timer = TRef, queue_durations = Ets, queue_duration_sum = 0.0, queue_duration_count = 0, - desired_duration = infinity })}. + desired_duration = infinity, + disk_alarm = lists:member(disk, Alarms)})}. handle_call({report_ram_duration, Pid, QueueDuration}, From, State = #state { queue_duration_sum = Sum, @@ -137,6 +144,12 @@ handle_call({register, Pid, MFA}, _From, handle_call(_Request, _From, State) -> {noreply, State}. +handle_cast({disk_alarm, Alarm}, State = #state{disk_alarm = Alarm}) -> + {noreply, State}; + +handle_cast({disk_alarm, Alarm}, State) -> + {noreply, internal_update(State#state{disk_alarm = Alarm})}; + handle_cast({deregister, Pid}, State) -> {noreply, internal_deregister(Pid, true, State)}; @@ -192,10 +205,18 @@ internal_deregister(Pid, Demonitor, queue_duration_count = Count1 } end. -internal_update(State = #state { queue_durations = Durations, - desired_duration = DesiredDurationAvg, - queue_duration_sum = Sum, - queue_duration_count = Count }) -> +internal_update(State = #state{queue_durations = Durations, + desired_duration = DesiredDurationAvg, + disk_alarm = DiskAlarm}) -> + DesiredDurationAvg1 = desired_duration_average(State), + State1 = State#state{desired_duration = DesiredDurationAvg1}, + maybe_inform_queues( + DiskAlarm, DesiredDurationAvg, DesiredDurationAvg1, Durations), + State1. + +desired_duration_average(#state{queue_duration_sum = Sum, + queue_duration_count = Count, + disk_alarm = DiskAlarm}) -> {ok, LimitThreshold} = application:get_env(rabbit, vm_memory_high_watermark_paging_ratio), MemoryLimit = vm_memory_monitor:get_memory_limit(), @@ -203,46 +224,61 @@ internal_update(State = #state { queue_durations = Durations, true -> erlang:memory(total) / MemoryLimit; false -> infinity end, - DesiredDurationAvg1 = - if MemoryRatio =:= infinity -> - 0.0; - MemoryRatio < LimitThreshold orelse Count == 0 -> - infinity; - MemoryRatio < ?SUM_INC_THRESHOLD -> - ((Sum + ?SUM_INC_AMOUNT) / Count) / MemoryRatio; - true -> - (Sum / Count) / MemoryRatio - end, - State1 = State #state { desired_duration = DesiredDurationAvg1 }, + if DiskAlarm -> + infinity; + MemoryRatio =:= infinity -> + 0.0; + MemoryRatio < LimitThreshold orelse Count == 0 -> + infinity; + MemoryRatio < ?SUM_INC_THRESHOLD -> + ((Sum + ?SUM_INC_AMOUNT) / Count) / MemoryRatio; + true -> + (Sum / Count) / MemoryRatio + end. +maybe_inform_queues(false, DesiredDurationAvg, DesiredDurationAvg1, + Durations) -> %% only inform queues immediately if the desired duration has %% decreased case DesiredDurationAvg1 == infinity orelse (DesiredDurationAvg /= infinity andalso DesiredDurationAvg1 >= DesiredDurationAvg) of - true -> - ok; - false -> - true = - ets:foldl( - fun (Proc = #process { reported = QueueDuration, - sent = PrevSendDuration, - callback = {M, F, A} }, true) -> - case should_send(QueueDuration, PrevSendDuration, - DesiredDurationAvg1) of - true -> ok = erlang:apply( - M, F, A ++ [DesiredDurationAvg1]), - ets:insert( - Durations, - Proc #process { - sent = DesiredDurationAvg1}); - false -> true - end - end, true, Durations) - end, - State1. + true -> ok; + false -> inform_queues(DesiredDurationAvg1, Durations, + fun should_send/3) + end; +maybe_inform_queues(true, DesiredDurationAvg, DesiredDurationAvg1, + Durations) -> + case DesiredDurationAvg1 == infinity andalso + DesiredDurationAvg /= infinity of + true -> inform_queues(DesiredDurationAvg1, Durations, + fun should_send_disk_alarm/3); + false -> ok + end. + +inform_queues(DesiredDurationAvg1, Durations, If) -> + true = + ets:foldl( + fun (Proc = #process{reported = QueueDuration, + sent = PrevSendDuration, + callback = {M, F, A}}, true) -> + case If(QueueDuration, PrevSendDuration, + DesiredDurationAvg1) of + true -> ok = erlang:apply( + M, F, A ++ [DesiredDurationAvg1]), + ets:insert( + Durations, + Proc#process{sent = DesiredDurationAvg1}); + false -> true + end + end, true, Durations). + should_send(infinity, infinity, _) -> true; should_send(infinity, D, DD) -> DD < D; should_send(D, infinity, DD) -> DD < D; should_send(D1, D2, DD) -> DD < lists:min([D1, D2]). + +should_send_disk_alarm(_, infinity, _) -> false; +should_send_disk_alarm(_, _, infinity) -> true; +should_send_disk_alarm(_, _, _) -> false. |
