summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-09-23 16:25:50 +0100
committerSimon MacMullen <simon@rabbitmq.com>2013-09-23 16:25:50 +0100
commite6b81396e4c6deed2c2fc59f056544dabae77a09 (patch)
tree606ab3a2bcc609b63dbfab22a030a9d475b5ecbb
parent657093d39de55913ef256c3cfe33547e3feb66e5 (diff)
parent7b704ec3f85fc78405cc561ef12c71a7732404e1 (diff)
downloadrabbitmq-server-git-e6b81396e4c6deed2c2fc59f056544dabae77a09.tar.gz
Merge in default
-rw-r--r--ebin/rabbit_app.in2
-rw-r--r--src/rabbit_disk_monitor.erl77
-rw-r--r--src/rabbit_memory_monitor.erl116
3 files changed, 129 insertions, 66 deletions
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 7d0766f898..6ee0115b67 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -19,7 +19,7 @@
{ssl_options, []},
{vm_memory_high_watermark, 0.4},
{vm_memory_high_watermark_paging_ratio, 0.5},
- {disk_free_limit, 1000000000}, %% 1GB
+ {disk_free_limit, 50000000}, %% 50MB
{msg_store_index_module, rabbit_msg_store_ets_index},
{backing_queue_module, rabbit_variable_queue},
%% 0 ("no limit") would make a better default, but that
diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl
index 5aaa1b2df4..f153641ef3 100644
--- a/src/rabbit_disk_monitor.erl
+++ b/src/rabbit_disk_monitor.erl
@@ -23,16 +23,22 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--export([get_disk_free_limit/0, set_disk_free_limit/1, get_check_interval/0,
- set_check_interval/1, get_disk_free/0]).
+-export([get_disk_free_limit/0, set_disk_free_limit/1,
+ get_min_check_interval/0, set_min_check_interval/1,
+ get_max_check_interval/0, set_max_check_interval/1,
+ get_disk_free/0]).
-define(SERVER, ?MODULE).
--define(DEFAULT_DISK_CHECK_INTERVAL, 10000).
+-define(DEFAULT_MIN_DISK_CHECK_INTERVAL, 100).
+-define(DEFAULT_MAX_DISK_CHECK_INTERVAL, 10000).
+%% 250MB/s i.e. 250kB/ms
+-define(FAST_RATE, (250 * 1000)).
-record(state, {dir,
limit,
actual,
- timeout,
+ min_interval,
+ max_interval,
timer,
alarmed
}).
@@ -45,8 +51,10 @@
-spec(start_link/1 :: (disk_free_limit()) -> rabbit_types:ok_pid_or_error()).
-spec(get_disk_free_limit/0 :: () -> integer()).
-spec(set_disk_free_limit/1 :: (disk_free_limit()) -> 'ok').
--spec(get_check_interval/0 :: () -> integer()).
--spec(set_check_interval/1 :: (integer()) -> 'ok').
+-spec(get_min_check_interval/0 :: () -> integer()).
+-spec(set_min_check_interval/1 :: (integer()) -> 'ok').
+-spec(get_max_check_interval/0 :: () -> integer()).
+-spec(set_max_check_interval/1 :: (integer()) -> 'ok').
-spec(get_disk_free/0 :: () -> (integer() | 'unknown')).
-endif.
@@ -61,11 +69,17 @@ get_disk_free_limit() ->
set_disk_free_limit(Limit) ->
gen_server:call(?MODULE, {set_disk_free_limit, Limit}, infinity).
-get_check_interval() ->
- gen_server:call(?MODULE, get_check_interval, infinity).
+get_min_check_interval() ->
+ gen_server:call(?MODULE, get_min_check_interval, infinity).
-set_check_interval(Interval) ->
- gen_server:call(?MODULE, {set_check_interval, Interval}, infinity).
+set_min_check_interval(Interval) ->
+ gen_server:call(?MODULE, {set_min_check_interval, Interval}, infinity).
+
+get_max_check_interval() ->
+ gen_server:call(?MODULE, get_max_check_interval, infinity).
+
+set_max_check_interval(Interval) ->
+ gen_server:call(?MODULE, {set_max_check_interval, Interval}, infinity).
get_disk_free() ->
gen_server:call(?MODULE, get_disk_free, infinity).
@@ -78,16 +92,15 @@ start_link(Args) ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [Args], []).
init([Limit]) ->
- TRef = start_timer(?DEFAULT_DISK_CHECK_INTERVAL),
Dir = dir(),
- State = #state { dir = Dir,
- timeout = ?DEFAULT_DISK_CHECK_INTERVAL,
- timer = TRef,
- alarmed = false},
+ State = #state{dir = Dir,
+ min_interval = ?DEFAULT_MIN_DISK_CHECK_INTERVAL,
+ max_interval = ?DEFAULT_MAX_DISK_CHECK_INTERVAL,
+ alarmed = false},
case {catch get_disk_free(Dir),
vm_memory_monitor:get_total_memory()} of
{N1, N2} when is_integer(N1), is_integer(N2) ->
- {ok, set_disk_limits(State, Limit)};
+ {ok, start_timer(set_disk_limits(State, Limit))};
Err ->
rabbit_log:info("Disabling disk free space monitoring "
"on unsupported platform: ~p~n", [Err]),
@@ -100,12 +113,17 @@ handle_call(get_disk_free_limit, _From, State) ->
handle_call({set_disk_free_limit, Limit}, _From, State) ->
{reply, ok, set_disk_limits(State, Limit)};
-handle_call(get_check_interval, _From, State) ->
- {reply, State#state.timeout, State};
+handle_call(get_min_check_interval, _From, State) ->
+ {reply, State#state.min_interval, State};
+
+handle_call(get_max_check_interval, _From, State) ->
+ {reply, State#state.max_interval, State};
+
+handle_call({set_min_check_interval, MinInterval}, _From, State) ->
+ {reply, ok, State#state{min_interval = MinInterval}};
-handle_call({set_check_interval, Timeout}, _From, State) ->
- {ok, cancel} = timer:cancel(State#state.timer),
- {reply, ok, State#state{timeout = Timeout, timer = start_timer(Timeout)}};
+handle_call({set_max_check_interval, MaxInterval}, _From, State) ->
+ {reply, ok, State#state{max_interval = MaxInterval}};
handle_call(get_disk_free, _From, State = #state { actual = Actual }) ->
{reply, Actual, State};
@@ -117,7 +135,7 @@ handle_cast(_Request, State) ->
{noreply, State}.
handle_info(update, State) ->
- {noreply, internal_update(State)};
+ {noreply, start_timer(internal_update(State))};
handle_info(_Info, State) ->
{noreply, State}.
@@ -193,6 +211,15 @@ emit_update_info(StateStr, CurrentFree, Limit) ->
"Disk free space ~s. Free bytes:~p Limit:~p~n",
[StateStr, CurrentFree, Limit]).
-start_timer(Timeout) ->
- {ok, TRef} = timer:send_interval(Timeout, update),
- TRef.
+start_timer(State) ->
+ State#state{timer = erlang:send_after(interval(State), self(), update)}.
+
+interval(#state{alarmed = true,
+ max_interval = MaxInterval}) ->
+ MaxInterval;
+interval(#state{limit = Limit,
+ actual = Actual,
+ min_interval = MinInterval,
+ max_interval = MaxInterval}) ->
+ IdealInterval = 2 * (Actual - Limit) / ?FAST_RATE,
+ trunc(erlang:max(MinInterval, erlang:min(MaxInterval, IdealInterval))).
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl
index b8d8023ed3..2e9a820cb2 100644
--- a/src/rabbit_memory_monitor.erl
+++ b/src/rabbit_memory_monitor.erl
@@ -25,7 +25,7 @@
-behaviour(gen_server2).
-export([start_link/0, register/2, deregister/1,
- report_ram_duration/2, stop/0]).
+ report_ram_duration/2, stop/0, conserve_resources/3]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -36,7 +36,8 @@
queue_durations, %% ets #process
queue_duration_sum, %% sum of all queue_durations
queue_duration_count, %% number of elements in sum
- desired_duration %% the desired queue duration
+ desired_duration, %% the desired queue duration
+ disk_alarm %% disable paging, disk alarm has fired
}).
-define(SERVER, ?MODULE).
@@ -86,6 +87,11 @@ report_ram_duration(Pid, QueueDuration) ->
stop() ->
gen_server2:cast(?SERVER, stop).
+conserve_resources(Pid, disk, Conserve) ->
+ gen_server2:cast(Pid, {disk_alarm, Conserve});
+conserve_resources(_Pid, _Source, _Conserve) ->
+ ok.
+
%%----------------------------------------------------------------------------
%% Gen_server callbacks
%%----------------------------------------------------------------------------
@@ -94,13 +100,14 @@ init([]) ->
{ok, TRef} = timer:send_interval(?DEFAULT_UPDATE_INTERVAL, update),
Ets = ets:new(?TABLE_NAME, [set, private, {keypos, #process.pid}]),
-
+ Alarms = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
{ok, internal_update(
#state { timer = TRef,
queue_durations = Ets,
queue_duration_sum = 0.0,
queue_duration_count = 0,
- desired_duration = infinity })}.
+ desired_duration = infinity,
+ disk_alarm = lists:member(disk, Alarms)})}.
handle_call({report_ram_duration, Pid, QueueDuration}, From,
State = #state { queue_duration_sum = Sum,
@@ -137,6 +144,12 @@ handle_call({register, Pid, MFA}, _From,
handle_call(_Request, _From, State) ->
{noreply, State}.
+handle_cast({disk_alarm, Alarm}, State = #state{disk_alarm = Alarm}) ->
+ {noreply, State};
+
+handle_cast({disk_alarm, Alarm}, State) ->
+ {noreply, internal_update(State#state{disk_alarm = Alarm})};
+
handle_cast({deregister, Pid}, State) ->
{noreply, internal_deregister(Pid, true, State)};
@@ -192,10 +205,18 @@ internal_deregister(Pid, Demonitor,
queue_duration_count = Count1 }
end.
-internal_update(State = #state { queue_durations = Durations,
- desired_duration = DesiredDurationAvg,
- queue_duration_sum = Sum,
- queue_duration_count = Count }) ->
+internal_update(State = #state{queue_durations = Durations,
+ desired_duration = DesiredDurationAvg,
+ disk_alarm = DiskAlarm}) ->
+ DesiredDurationAvg1 = desired_duration_average(State),
+ State1 = State#state{desired_duration = DesiredDurationAvg1},
+ maybe_inform_queues(
+ DiskAlarm, DesiredDurationAvg, DesiredDurationAvg1, Durations),
+ State1.
+
+desired_duration_average(#state{queue_duration_sum = Sum,
+ queue_duration_count = Count,
+ disk_alarm = DiskAlarm}) ->
{ok, LimitThreshold} =
application:get_env(rabbit, vm_memory_high_watermark_paging_ratio),
MemoryLimit = vm_memory_monitor:get_memory_limit(),
@@ -203,46 +224,61 @@ internal_update(State = #state { queue_durations = Durations,
true -> erlang:memory(total) / MemoryLimit;
false -> infinity
end,
- DesiredDurationAvg1 =
- if MemoryRatio =:= infinity ->
- 0.0;
- MemoryRatio < LimitThreshold orelse Count == 0 ->
- infinity;
- MemoryRatio < ?SUM_INC_THRESHOLD ->
- ((Sum + ?SUM_INC_AMOUNT) / Count) / MemoryRatio;
- true ->
- (Sum / Count) / MemoryRatio
- end,
- State1 = State #state { desired_duration = DesiredDurationAvg1 },
+ if DiskAlarm ->
+ infinity;
+ MemoryRatio =:= infinity ->
+ 0.0;
+ MemoryRatio < LimitThreshold orelse Count == 0 ->
+ infinity;
+ MemoryRatio < ?SUM_INC_THRESHOLD ->
+ ((Sum + ?SUM_INC_AMOUNT) / Count) / MemoryRatio;
+ true ->
+ (Sum / Count) / MemoryRatio
+ end.
+maybe_inform_queues(false, DesiredDurationAvg, DesiredDurationAvg1,
+ Durations) ->
%% only inform queues immediately if the desired duration has
%% decreased
case DesiredDurationAvg1 == infinity orelse
(DesiredDurationAvg /= infinity andalso
DesiredDurationAvg1 >= DesiredDurationAvg) of
- true ->
- ok;
- false ->
- true =
- ets:foldl(
- fun (Proc = #process { reported = QueueDuration,
- sent = PrevSendDuration,
- callback = {M, F, A} }, true) ->
- case should_send(QueueDuration, PrevSendDuration,
- DesiredDurationAvg1) of
- true -> ok = erlang:apply(
- M, F, A ++ [DesiredDurationAvg1]),
- ets:insert(
- Durations,
- Proc #process {
- sent = DesiredDurationAvg1});
- false -> true
- end
- end, true, Durations)
- end,
- State1.
+ true -> ok;
+ false -> inform_queues(DesiredDurationAvg1, Durations,
+ fun should_send/3)
+ end;
+maybe_inform_queues(true, DesiredDurationAvg, DesiredDurationAvg1,
+ Durations) ->
+ case DesiredDurationAvg1 == infinity andalso
+ DesiredDurationAvg /= infinity of
+ true -> inform_queues(DesiredDurationAvg1, Durations,
+ fun should_send_disk_alarm/3);
+ false -> ok
+ end.
+
+inform_queues(DesiredDurationAvg1, Durations, If) ->
+ true =
+ ets:foldl(
+ fun (Proc = #process{reported = QueueDuration,
+ sent = PrevSendDuration,
+ callback = {M, F, A}}, true) ->
+ case If(QueueDuration, PrevSendDuration,
+ DesiredDurationAvg1) of
+ true -> ok = erlang:apply(
+ M, F, A ++ [DesiredDurationAvg1]),
+ ets:insert(
+ Durations,
+ Proc#process{sent = DesiredDurationAvg1});
+ false -> true
+ end
+ end, true, Durations).
+
should_send(infinity, infinity, _) -> true;
should_send(infinity, D, DD) -> DD < D;
should_send(D, infinity, DD) -> DD < D;
should_send(D1, D2, DD) -> DD < lists:min([D1, D2]).
+
+should_send_disk_alarm(_, infinity, _) -> false;
+should_send_disk_alarm(_, _, infinity) -> true;
+should_send_disk_alarm(_, _, _) -> false.