summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-09-04 16:02:09 +0100
committerSimon MacMullen <simon@rabbitmq.com>2013-09-04 16:02:09 +0100
commit0dba1f88d2b173c92525be4c4826a70cc6c15e68 (patch)
treecb17dea6c0b273510f4446e09b3795a93ef2f8cb
parenta0c79f5d1f5941c9cb855f90180b071b23f0ef90 (diff)
downloadrabbitmq-server-git-0dba1f88d2b173c92525be4c4826a70cc6c15e68.tar.gz
Don't page when the disk alarm has gone off.
-rw-r--r--src/rabbit_memory_monitor.erl116
1 files changed, 76 insertions, 40 deletions
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl
index b8d8023ed3..2e9a820cb2 100644
--- a/src/rabbit_memory_monitor.erl
+++ b/src/rabbit_memory_monitor.erl
@@ -25,7 +25,7 @@
-behaviour(gen_server2).
-export([start_link/0, register/2, deregister/1,
- report_ram_duration/2, stop/0]).
+ report_ram_duration/2, stop/0, conserve_resources/3]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -36,7 +36,8 @@
queue_durations, %% ets #process
queue_duration_sum, %% sum of all queue_durations
queue_duration_count, %% number of elements in sum
- desired_duration %% the desired queue duration
+ desired_duration, %% the desired queue duration
+ disk_alarm %% disable paging, disk alarm has fired
}).
-define(SERVER, ?MODULE).
@@ -86,6 +87,11 @@ report_ram_duration(Pid, QueueDuration) ->
stop() ->
gen_server2:cast(?SERVER, stop).
+conserve_resources(Pid, disk, Conserve) ->
+ gen_server2:cast(Pid, {disk_alarm, Conserve});
+conserve_resources(_Pid, _Source, _Conserve) ->
+ ok.
+
%%----------------------------------------------------------------------------
%% Gen_server callbacks
%%----------------------------------------------------------------------------
@@ -94,13 +100,14 @@ init([]) ->
{ok, TRef} = timer:send_interval(?DEFAULT_UPDATE_INTERVAL, update),
Ets = ets:new(?TABLE_NAME, [set, private, {keypos, #process.pid}]),
-
+ Alarms = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
{ok, internal_update(
#state { timer = TRef,
queue_durations = Ets,
queue_duration_sum = 0.0,
queue_duration_count = 0,
- desired_duration = infinity })}.
+ desired_duration = infinity,
+ disk_alarm = lists:member(disk, Alarms)})}.
handle_call({report_ram_duration, Pid, QueueDuration}, From,
State = #state { queue_duration_sum = Sum,
@@ -137,6 +144,12 @@ handle_call({register, Pid, MFA}, _From,
handle_call(_Request, _From, State) ->
{noreply, State}.
+handle_cast({disk_alarm, Alarm}, State = #state{disk_alarm = Alarm}) ->
+ {noreply, State};
+
+handle_cast({disk_alarm, Alarm}, State) ->
+ {noreply, internal_update(State#state{disk_alarm = Alarm})};
+
handle_cast({deregister, Pid}, State) ->
{noreply, internal_deregister(Pid, true, State)};
@@ -192,10 +205,18 @@ internal_deregister(Pid, Demonitor,
queue_duration_count = Count1 }
end.
-internal_update(State = #state { queue_durations = Durations,
- desired_duration = DesiredDurationAvg,
- queue_duration_sum = Sum,
- queue_duration_count = Count }) ->
+internal_update(State = #state{queue_durations = Durations,
+ desired_duration = DesiredDurationAvg,
+ disk_alarm = DiskAlarm}) ->
+ DesiredDurationAvg1 = desired_duration_average(State),
+ State1 = State#state{desired_duration = DesiredDurationAvg1},
+ maybe_inform_queues(
+ DiskAlarm, DesiredDurationAvg, DesiredDurationAvg1, Durations),
+ State1.
+
+desired_duration_average(#state{queue_duration_sum = Sum,
+ queue_duration_count = Count,
+ disk_alarm = DiskAlarm}) ->
{ok, LimitThreshold} =
application:get_env(rabbit, vm_memory_high_watermark_paging_ratio),
MemoryLimit = vm_memory_monitor:get_memory_limit(),
@@ -203,46 +224,61 @@ internal_update(State = #state { queue_durations = Durations,
true -> erlang:memory(total) / MemoryLimit;
false -> infinity
end,
- DesiredDurationAvg1 =
- if MemoryRatio =:= infinity ->
- 0.0;
- MemoryRatio < LimitThreshold orelse Count == 0 ->
- infinity;
- MemoryRatio < ?SUM_INC_THRESHOLD ->
- ((Sum + ?SUM_INC_AMOUNT) / Count) / MemoryRatio;
- true ->
- (Sum / Count) / MemoryRatio
- end,
- State1 = State #state { desired_duration = DesiredDurationAvg1 },
+ if DiskAlarm ->
+ infinity;
+ MemoryRatio =:= infinity ->
+ 0.0;
+ MemoryRatio < LimitThreshold orelse Count == 0 ->
+ infinity;
+ MemoryRatio < ?SUM_INC_THRESHOLD ->
+ ((Sum + ?SUM_INC_AMOUNT) / Count) / MemoryRatio;
+ true ->
+ (Sum / Count) / MemoryRatio
+ end.
+maybe_inform_queues(false, DesiredDurationAvg, DesiredDurationAvg1,
+ Durations) ->
%% only inform queues immediately if the desired duration has
%% decreased
case DesiredDurationAvg1 == infinity orelse
(DesiredDurationAvg /= infinity andalso
DesiredDurationAvg1 >= DesiredDurationAvg) of
- true ->
- ok;
- false ->
- true =
- ets:foldl(
- fun (Proc = #process { reported = QueueDuration,
- sent = PrevSendDuration,
- callback = {M, F, A} }, true) ->
- case should_send(QueueDuration, PrevSendDuration,
- DesiredDurationAvg1) of
- true -> ok = erlang:apply(
- M, F, A ++ [DesiredDurationAvg1]),
- ets:insert(
- Durations,
- Proc #process {
- sent = DesiredDurationAvg1});
- false -> true
- end
- end, true, Durations)
- end,
- State1.
+ true -> ok;
+ false -> inform_queues(DesiredDurationAvg1, Durations,
+ fun should_send/3)
+ end;
+maybe_inform_queues(true, DesiredDurationAvg, DesiredDurationAvg1,
+ Durations) ->
+ case DesiredDurationAvg1 == infinity andalso
+ DesiredDurationAvg /= infinity of
+ true -> inform_queues(DesiredDurationAvg1, Durations,
+ fun should_send_disk_alarm/3);
+ false -> ok
+ end.
+
+inform_queues(DesiredDurationAvg1, Durations, If) ->
+ true =
+ ets:foldl(
+ fun (Proc = #process{reported = QueueDuration,
+ sent = PrevSendDuration,
+ callback = {M, F, A}}, true) ->
+ case If(QueueDuration, PrevSendDuration,
+ DesiredDurationAvg1) of
+ true -> ok = erlang:apply(
+ M, F, A ++ [DesiredDurationAvg1]),
+ ets:insert(
+ Durations,
+ Proc#process{sent = DesiredDurationAvg1});
+ false -> true
+ end
+ end, true, Durations).
+
should_send(infinity, infinity, _) -> true;
should_send(infinity, D, DD) -> DD < D;
should_send(D, infinity, DD) -> DD < D;
should_send(D1, D2, DD) -> DD < lists:min([D1, D2]).
+
+should_send_disk_alarm(_, infinity, _) -> false;
+should_send_disk_alarm(_, _, infinity) -> true;
+should_send_disk_alarm(_, _, _) -> false.