summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-11-19 16:43:17 +0000
committerMatthew Sackman <matthew@lshift.net>2009-11-19 16:43:17 +0000
commit847ca84878fe7c61a04601f6119220134c959b71 (patch)
tree6a302514aa896953319834ca8865766face8d30d
parentb0f200b3d34872eac03121cde58073e1aeec63c8 (diff)
downloadrabbitmq-server-git-847ca84878fe7c61a04601f6119220134c959b71.tar.gz
Finished the file handle cache. It works as follows:
1) Every client keeps a gb_tree of timestamp-when-fd-was-last-used => fd_ref. This is updated for each action. 2) When a client opens a file or closes a file, it sends a suitable msg to the server, including the smallest timestamp-when-fd-was-last-used (i.e. least recently used fd) 3) The server counts how many fds have been used 4) When too many fds have been used, it finds the average age of the least-recently-used-fds and tells all clients to close anything older than that 5) This is likely to have no effect, because the clients may have since used the fds, thus the ages will be wrong. Regardless of whether any fds have been closed at this point, all the clients send back to the server their current smallest timestamp-when-fd-was-last-used 6) 2 seconds later, the server checks to see if the situation has improved, and if not, using the now updated information (thus the average age will be lower) may choose to further ask all clients to kill off fhs. This will repeat, albeit not that fast until enough fds have been closed.
-rw-r--r--src/file_handle_cache.erl267
-rw-r--r--src/rabbit_amqqueue_process.erl4
-rw-r--r--src/rabbit_misc.erl10
-rw-r--r--src/rabbit_msg_store.erl6
-rw-r--r--src/rabbit_reader.erl2
-rw-r--r--src/vm_memory_monitor.erl12
6 files changed, 227 insertions, 74 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 634cf0165e..53ed95d4e7 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -35,12 +35,18 @@
-export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1,
last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1,
- append_write_buffer/1, copy/3]).
+ append_write_buffer/1, copy/3, set_maximum_since_use/1]).
-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
+-export([decrement/0, increment/0]).
+
-define(SERVER, ?MODULE).
+-define(RESERVED_FOR_OTHERS, 50).
+-define(FILE_HANDLES_LIMIT_WINDOWS, 10000000).
+-define(FILE_HANDLES_LIMIT_OTHER, 1024).
+-define(FILE_HANDLES_CHECK_INTERVAL, 2000).
%%----------------------------------------------------------------------------
@@ -67,6 +73,12 @@
last_used_at
}).
+-record(fhc_state,
+ { elders,
+ limit,
+ count
+ }).
+
%%----------------------------------------------------------------------------
%% Specs
%%----------------------------------------------------------------------------
@@ -94,6 +106,7 @@
-spec(append_write_buffer/1 :: (ref()) -> ok_or_error()).
-spec(copy/3 :: (ref(), ref(), non_neg_integer()) ->
({'ok', integer()} | error())).
+-spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok').
-endif.
@@ -128,7 +141,7 @@ open(Path, Mode, Options) ->
reader_count = RCount1,
has_writer = HasWriter orelse IsWriter }),
Ref = make_ref(),
- case open1(Path1, Mode1, Options, Ref, GRef) of
+ case open1(Path1, Mode1, Options, Ref, GRef, bof) of
{ok, _Handle} -> {ok, Ref};
Error -> Error
end
@@ -146,54 +159,7 @@ open(Path, Mode, Options) ->
close(Ref) ->
case erase({Ref, fhc_handle}) of
undefined -> ok;
- Handle ->
- case write_buffer(Handle) of
- {ok, #handle { hdl = Hdl, global_key = GRef, is_dirty = IsDirty,
- is_read = IsReader, is_write = IsWriter,
- last_used_at = Then }} ->
- case Hdl of
- closed -> ok;
- _ -> ok = case IsDirty of
- true -> file:sync(Hdl);
- false -> ok
- end,
- ok = file:close(Hdl),
- with_age_tree(
- fun (Tree) ->
- Tree1 = gb_trees:delete(Then, Tree),
- Oldest =
- case gb_trees:is_empty(Tree1) of
- true ->
- undefined;
- false ->
- {Oldest1, _Ref} =
- gb_trees:smallest(Tree1),
- Oldest1
- end,
- gen_server2:cast(
- ?SERVER, {self(), close, Oldest}),
- Tree1
- end)
- end,
- #file { reader_count = RCount, has_writer = HasWriter,
- path = Path } = File = get({GRef, fhc_file}),
- RCount1 = case IsReader of
- true -> RCount - 1;
- false -> RCount
- end,
- HasWriter1 = HasWriter andalso not IsWriter,
- case RCount1 =:= 0 andalso not HasWriter1 of
- true -> erase({GRef, fhc_file}),
- erase({Path, fhc_path});
- false -> put({GRef, fhc_file},
- File #file { reader_count = RCount1,
- has_writer = HasWriter1 })
- end,
- ok;
- {Error, Handle1} ->
- put_handle(Ref, Handle1),
- Error
- end
+ Handle -> close1(Ref, Handle, hard)
end.
read(Ref, Count) ->
@@ -367,18 +333,54 @@ copy(Src, Dest, Count) ->
Error -> Error
end.
+set_maximum_since_use(MaximumAge) ->
+ Now = now(),
+ lists:foreach(
+ fun ({{Ref, fhc_handle}, Handle =
+ #handle { hdl = Hdl, last_used_at = Then }}) ->
+ Age = timer:now_diff(Now, Then),
+ case Hdl /= closed andalso Age >= MaximumAge of
+ true ->
+ case close1(Ref, Handle, soft) of
+ {ok, Handle1} ->
+ put({Ref, fhc_handle}, Handle1);
+ _ -> ok
+ end;
+ false -> ok
+ end;
+ (_KeyValuePair) -> ok
+ end, get()),
+ report_eldest().
+
+decrement() ->
+ gen_server2:cast(?SERVER, decrement).
+
+increment() ->
+ gen_server2:cast(?SERVER, increment).
%%----------------------------------------------------------------------------
%% Internal functions
%%----------------------------------------------------------------------------
+report_eldest() ->
+ with_age_tree(
+ fun (Tree) ->
+ case gb_trees:is_empty(Tree) of
+ true -> Tree;
+ false -> {Oldest, _Ref} = gb_trees:smallest(Tree),
+ gen_server2:cast(?SERVER, {self(), update, Oldest})
+ end,
+ Tree
+ end),
+ ok.
+
get_or_reopen(Ref) ->
case get({Ref, fhc_handle}) of
undefined -> {error, not_open, Ref};
#handle { hdl = closed, mode = Mode, global_key = GRef,
- options = Options } ->
+ options = Options, offset = Offset } ->
#file { path = Path } = get({GRef, fhc_file}),
- open1(Path, Mode, Options, Ref, GRef);
+ open1(Path, Mode, Options, Ref, GRef, Offset);
Handle ->
{ok, Handle}
end.
@@ -395,12 +397,10 @@ with_age_tree(Fun) ->
put_handle(Ref, Handle = #handle { last_used_at = Then }) ->
Now = now(),
with_age_tree(
- fun (Tree) ->
- gb_trees:insert(Now, Ref, gb_trees:delete(Then, Tree))
- end),
+ fun (Tree) -> gb_trees:insert(Now, Ref, gb_trees:delete(Then, Tree)) end),
put({Ref, fhc_handle}, Handle #handle { last_used_at = Now }).
-open1(Path, Mode, Options, Ref, GRef) ->
+open1(Path, Mode, Options, Ref, GRef, Offset) ->
case file:open(Path, Mode) of
{ok, Hdl} ->
WriteBufferSize =
@@ -411,14 +411,15 @@ open1(Path, Mode, Options, Ref, GRef) ->
end,
Now = now(),
Handle =
- #handle { hdl = Hdl, offset = 0, trusted_offset = 0,
+ #handle { hdl = Hdl, offset = 0, trusted_offset = Offset,
write_buffer_size = 0, options = Options,
write_buffer_size_limit = WriteBufferSize,
write_buffer = [], at_eof = false, mode = Mode,
is_write = is_writer(Mode), is_read = is_reader(Mode),
global_key = GRef, last_used_at = Now,
is_dirty = false },
- put({Ref, fhc_handle}, Handle),
+ {{ok, _Offset}, Handle1} = maybe_seek(Offset, Handle),
+ put({Ref, fhc_handle}, Handle1),
with_age_tree(fun (Tree) ->
Tree1 = gb_trees:insert(Now, Ref, Tree),
{Oldest, _Ref} = gb_trees:smallest(Tree1),
@@ -426,11 +427,64 @@ open1(Path, Mode, Options, Ref, GRef) ->
{self(), open, Oldest}),
Tree1
end),
- {ok, Handle};
+ {ok, Handle1};
{error, Reason} ->
{error, Reason}
end.
+close1(Ref, Handle, SoftOrHard) ->
+ case write_buffer(Handle) of
+ {ok, #handle { hdl = Hdl, global_key = GRef, is_dirty = IsDirty,
+ is_read = IsReader, is_write = IsWriter,
+ last_used_at = Then } = Handle1 } ->
+ case Hdl of
+ closed -> ok;
+ _ -> ok = case IsDirty of
+ true -> file:sync(Hdl);
+ false -> ok
+ end,
+ ok = file:close(Hdl),
+ with_age_tree(
+ fun (Tree) ->
+ Tree1 = gb_trees:delete(Then, Tree),
+ Oldest =
+ case gb_trees:is_empty(Tree1) of
+ true -> undefined;
+ false ->
+ {Oldest1, _Ref} =
+ gb_trees:smallest(Tree1),
+ Oldest1
+ end,
+ gen_server2:cast(
+ ?SERVER, {self(), close, Oldest}),
+ Tree1
+ end)
+ end,
+ case SoftOrHard of
+ hard ->
+ #file { reader_count = RCount, has_writer = HasWriter,
+ path = Path } = File = get({GRef, fhc_file}),
+ RCount1 = case IsReader of
+ true -> RCount - 1;
+ false -> RCount
+ end,
+ HasWriter1 = HasWriter andalso not IsWriter,
+ case RCount1 =:= 0 andalso not HasWriter1 of
+ true -> erase({GRef, fhc_file}),
+ erase({Path, fhc_path});
+ false -> put({GRef, fhc_file},
+ File #file { reader_count = RCount1,
+ has_writer = HasWriter1 })
+ end,
+ ok;
+ soft ->
+ {ok, Handle1 #handle { hdl = closed }}
+ end;
+ {Error, Handle1} ->
+ put_handle(Ref, Handle1),
+ Error
+ end.
+
maybe_seek(NewOffset, Handle = #handle { hdl = Hdl, at_eof = AtEoF,
offset = Offset }) ->
{AtEoF1, NeedsSeek} = needs_seek(AtEoF, Offset, NewOffset),
@@ -514,14 +568,46 @@ needs_seek(_AtEoF, _CurOffset, _DesiredOffset) ->
%%----------------------------------------------------------------------------
init([]) ->
- {ok, state}.
+ Limit = case application:get_env(file_handles_high_watermark) of
+ {ok, Watermark}
+ when is_integer(Watermark) andalso Watermark > 0 -> Watermark;
+ _ -> ulimit()
+ end,
+ rabbit_log:info("Limiting to approx ~p file handles~n", [Limit]),
+ {ok, #fhc_state { elders = dict:new(), limit = Limit, count = 0}}.
handle_call(_Msg, _From, State) ->
{reply, message_not_understood, State}.
-handle_cast(Msg, State) ->
- io:format("~p~n", [Msg]),
- {noreply, State}.
+handle_cast({Pid, open, EldestUnusedSince}, State =
+ #fhc_state { elders = Elders, count = Count }) ->
+ Elders1 = dict:store(Pid, EldestUnusedSince, Elders),
+ {noreply, maybe_reduce(State #fhc_state { elders = Elders1,
+ count = Count + 1 })};
+
+handle_cast({Pid, update, EldestUnusedSince}, State =
+ #fhc_state { elders = Elders }) ->
+ Elders1 = dict:store(Pid, EldestUnusedSince, Elders),
+ %% don't call maybe_reduce from here otherwise we can create a
+ %% storm of messages
+ {noreply, State #fhc_state { elders = Elders1 }};
+
+handle_cast({Pid, close, EldestUnusedSince}, State =
+ #fhc_state { elders = Elders, count = Count }) ->
+ Elders1 = case EldestUnusedSince of
+ undefined -> dict:erase(Pid, Elders);
+ _ -> dict:store(Pid, EldestUnusedSince, Elders)
+ end,
+ {noreply, State #fhc_state { elders = Elders1, count = Count - 1 }};
+
+handle_cast(increment, State = #fhc_state { count = Count }) ->
+ {noreply, maybe_reduce(State #fhc_state { count = Count + 1 })};
+
+handle_cast(decrement, State = #fhc_state { count = Count }) ->
+ {noreply, State #fhc_state { count = Count - 1 }};
+
+handle_cast(check_counts, State) ->
+ {noreply, maybe_reduce(State)}.
handle_info(_Msg, State) ->
{noreply, State}.
@@ -531,3 +617,58 @@ terminate(_Reason, State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
+
+%%----------------------------------------------------------------------------
+%% server helpers
+%%----------------------------------------------------------------------------
+
+maybe_reduce(State = #fhc_state { limit = Limit, count = Count,
+ elders = Elders })
+ when Limit /= infinity andalso Count >= Limit ->
+ Now = now(),
+ {Pids, Sum, ClientCount} =
+ dict:fold(fun (_Pid, undefined, Accs) ->
+ Accs;
+ (Pid, Eldest, {PidsAcc, SumAcc, CountAcc}) ->
+ {[Pid|PidsAcc], SumAcc + timer:now_diff(Now, Eldest),
+ CountAcc + 1}
+ end, {[], 0, 0}, Elders),
+ %% ClientCount can't be 0.
+ AverageAge = Sum / ClientCount,
+ lists:foreach(fun (Pid) ->
+ Pid ! {?MODULE, maximum_eldest_since_use, AverageAge}
+ end, Pids),
+ {ok, _TRef} = timer:apply_after(?FILE_HANDLES_CHECK_INTERVAL, gen_server2,
+ cast, [?SERVER, check_counts]),
+ State;
+maybe_reduce(State) ->
+ State.
+
+%% Googling around suggests that Windows has a limit somewhere around 16M.
+%% eg http://blogs.technet.com/markrussinovich/archive/2009/09/29/3283844.aspx
+%% For everything else, assume ulimit exists. Further googling suggests that
+%% BSDs (incl OS X), solaris and linux all agree that ulimit -n is file handles
+ulimit() ->
+ try
+ %% under Linux, Solaris and FreeBSD, ulimit is a shell
+ %% builtin, not a command. In OS X, it's a command, but it's
+ %% still safe to call it this way:
+ case rabbit_misc:cmd("sh -c \"ulimit -n\"") of
+ "unlimited" -> infinity;
+ String = [C|_] when $0 =< C andalso C =< $9 ->
+ Num = list_to_integer(
+ lists:takewhile(fun (D) -> $0 =< D andalso D =< $9 end,
+ String)) - ?RESERVED_FOR_OTHERS,
+ lists:max([1, Num]);
+ String ->
+ rabbit_log:warning("Unexpected result of \"ulimit -n\": ~p~n",
+ [String]),
+ throw({unexpected_result, String})
+ end
+ catch
+ throw:_ ->
+ case os:type() of
+ {win32, _OsName} -> ?FILE_HANDLES_LIMIT_WINDOWS;
+ _ -> ?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS
+ end
+ end.
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index bb951b4073..183bf26d41 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -914,6 +914,10 @@ handle_info(timeout, State = #q{variable_queue_state = VQS}) ->
State#q{variable_queue_state =
rabbit_variable_queue:tx_commit_from_vq(VQS)}));
+handle_info({file_handle_cache, maximum_eldest_since_use, Age}, State) ->
+ ok = file_handle_cache:set_maximum_since_use(Age),
+ noreply(State);
+
handle_info(Info, State) ->
?LOGDEBUG("Info in queue: ~p~n", [Info]),
{stop, {unhandled_info, Info}, State}.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 21764fce6d..391efb1df9 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -55,7 +55,7 @@
-export([append_file/2, ensure_parent_dirs_exist/1]).
-export([format_stderr/2]).
-export([start_applications/1, stop_applications/1]).
--export([unfold/2, ceil/1]).
+-export([unfold/2, ceil/1, cmd/1]).
-import(mnesia).
-import(lists).
@@ -126,6 +126,7 @@
-spec(stop_applications/1 :: ([atom()]) -> 'ok').
-spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}).
-spec(ceil/1 :: (number()) -> number()).
+-spec(cmd/1 :: (string()) -> string()).
-endif.
@@ -489,3 +490,10 @@ ceil(N) ->
0 -> N;
_ -> 1 + T
end.
+
+cmd(Command) ->
+ Exec = hd(string:tokens(Command, " ")),
+ case os:find_executable(Exec) of
+ false -> throw({command_not_found, Exec});
+ _ -> os:cmd(Command)
+ end.
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index e9f47d3669..3d38f72198 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -414,7 +414,11 @@ handle_cast(sync, State) ->
noreply(sync(State)).
handle_info(timeout, State) ->
- noreply(sync(State)).
+ noreply(sync(State));
+
+handle_info({file_handle_cache, maximum_eldest_since_use, Age}, State) ->
+ ok = file_handle_cache:set_maximum_since_use(Age),
+ noreply(State).
terminate(_Reason, State = #msstate { msg_locations = MsgLocations,
file_summary = FileSummary,
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index e21485b517..bf57ca5f6a 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -209,6 +209,7 @@ start_connection(Parent, Deb, ClientSock) ->
{PeerAddressS, PeerPort} = peername(ClientSock),
ProfilingValue = setup_profiling(),
try
+ file_handle_cache:increment(),
rabbit_log:info("starting TCP connection ~p from ~s:~p~n",
[self(), PeerAddressS, PeerPort]),
erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
@@ -232,6 +233,7 @@ start_connection(Parent, Deb, ClientSock) ->
end)("exception on TCP connection ~p from ~s:~p~n~p~n",
[self(), PeerAddressS, PeerPort, Ex])
after
+ file_handle_cache:decrement(),
rabbit_log:info("closing TCP connection ~p from ~s:~p~n",
[self(), PeerAddressS, PeerPort]),
%% We don't close the socket explicitly. The reader is the
diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl
index 65d4a45103..9eee0c0b8f 100644
--- a/src/vm_memory_monitor.erl
+++ b/src/vm_memory_monitor.erl
@@ -223,19 +223,13 @@ get_mem_limit(MemFraction, TotalMemory) ->
%%----------------------------------------------------------------------------
%% Internal Helpers
%%----------------------------------------------------------------------------
-cmd(Command) ->
- Exec = hd(string:tokens(Command, " ")),
- case os:find_executable(Exec) of
- false -> throw({command_not_found, Exec});
- _ -> os:cmd(Command)
- end.
%% get_total_memory(OS) -> Total
%% Windows and Freebsd code based on: memsup:get_memory_usage/1
%% Original code was part of OTP and released under "Erlang Public License".
get_total_memory({unix,darwin}) ->
- File = cmd("/usr/bin/vm_stat"),
+ File = rabbit_misc:cmd("/usr/bin/vm_stat"),
Lines = string:tokens(File, "\n"),
Dict = dict:from_list(lists:map(fun parse_line_mach/1, Lines)),
[PageSize, Inactive, Active, Free, Wired] =
@@ -263,7 +257,7 @@ get_total_memory({unix, linux}) ->
dict:fetch('MemTotal', Dict);
get_total_memory({unix, sunos}) ->
- File = cmd("/usr/sbin/prtconf"),
+ File = rabbit_misc:cmd("/usr/sbin/prtconf"),
Lines = string:tokens(File, "\n"),
Dict = dict:from_list(lists:map(fun parse_line_sunos/1, Lines)),
dict:fetch('Memory size', Dict);
@@ -314,7 +308,7 @@ parse_line_sunos(Line) ->
end.
freebsd_sysctl(Def) ->
- list_to_integer(cmd("/sbin/sysctl -n " ++ Def) -- "\n").
+ list_to_integer(rabbit_misc:cmd("/sbin/sysctl -n " ++ Def) -- "\n").
%% file:read_file does not work on files in /proc as it seems to get
%% the size of the file first and then read that many bytes. But files