summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/file_handle_cache.erl104
-rw-r--r--src/rabbit.erl1
-rw-r--r--src/rabbit_msg_store.erl1
3 files changed, 92 insertions, 14 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 5c1c5a83d4..634cf0165e 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -31,10 +31,17 @@
-module(file_handle_cache).
+-behaviour(gen_server2).
+
-export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1,
last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1,
append_write_buffer/1, copy/3]).
+-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-define(SERVER, ?MODULE).
+
%%----------------------------------------------------------------------------
-record(file,
@@ -62,6 +69,7 @@
%%----------------------------------------------------------------------------
%% Specs
+%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -91,6 +99,10 @@
%%----------------------------------------------------------------------------
%% Public API
+%%----------------------------------------------------------------------------
+
+start_link() ->
+ gen_server2:start_link({local, ?SERVER}, ?MODULE, [], [{timeout, infinity}]).
open(Path, Mode, Options) ->
case is_appender(Mode) of
@@ -137,14 +149,31 @@ close(Ref) ->
Handle ->
case write_buffer(Handle) of
{ok, #handle { hdl = Hdl, global_key = GRef, is_dirty = IsDirty,
- is_read = IsReader, is_write = IsWriter }} ->
+ is_read = IsReader, is_write = IsWriter,
+ last_used_at = Then }} ->
case Hdl of
closed -> ok;
_ -> ok = case IsDirty of
true -> file:sync(Hdl);
false -> ok
end,
- ok = file:close(Hdl)
+ ok = file:close(Hdl),
+ with_age_tree(
+ fun (Tree) ->
+ Tree1 = gb_trees:delete(Then, Tree),
+ Oldest =
+ case gb_trees:is_empty(Tree1) of
+ true ->
+ undefined;
+ false ->
+ {Oldest1, _Ref} =
+ gb_trees:smallest(Tree1),
+ Oldest1
+ end,
+ gen_server2:cast(
+ ?SERVER, {self(), close, Oldest}),
+ Tree1
+ end)
end,
#file { reader_count = RCount, has_writer = HasWriter,
path = Path } = File = get({GRef, fhc_file}),
@@ -162,7 +191,7 @@ close(Ref) ->
end,
ok;
{Error, Handle1} ->
- put({Ref, fhc_handle}, Handle1),
+ put_handle(Ref, Handle1),
Error
end
end.
@@ -185,7 +214,7 @@ read(Ref, Count) ->
end;
{Error, Handle2} -> {Error, Handle2}
end,
- put({Ref, fhc_handle}, Handle1),
+ put_handle(Ref, Handle1),
Result;
Error -> Error
end.
@@ -202,7 +231,7 @@ append(Ref, Data) ->
{{error, _} = Error, Handle2} ->
{Error, Handle2}
end,
- put({Ref, fhc_handle}, Handle1),
+ put_handle(Ref, Handle1),
Result;
Error -> Error
end.
@@ -225,7 +254,7 @@ sync(Ref) ->
end;
Error -> {Error, Handle}
end,
- put({Ref, fhc_handle}, Handle1),
+ put_handle(Ref, Handle1),
Result;
Error -> Error
end.
@@ -238,7 +267,7 @@ position(Ref, NewOffset) ->
{ok, Handle2} -> maybe_seek(NewOffset, Handle2);
{Error, Handle2} -> {Error, Handle2}
end,
- put({Ref, fhc_handle}, Handle1),
+ put_handle(Ref, Handle1),
Result;
Error -> Error
end.
@@ -265,7 +294,7 @@ truncate(Ref) ->
end;
{Error, Handle2} -> {Error, Handle2}
end,
- put({Ref, fhc_handle}, Handle1),
+ put_handle(Ref, Handle1),
Result;
Error -> Error
end.
@@ -295,7 +324,7 @@ append_write_buffer(Ref) ->
case get_or_reopen(Ref) of
{ok, Handle} ->
{Result, Handle1} = write_buffer(Handle),
- put({Ref, fhc_handle}, Handle1),
+ put_handle(Ref, Handle1),
Result;
Error -> Error
end.
@@ -328,8 +357,8 @@ copy(Src, Dest, Count) ->
end;
Error -> {Error, SHandle, DHandle}
end,
- put({Src, fhc_handle}, SHandle1),
- put({Dest, fhc_handle}, DHandle1),
+ put_handle(Src, SHandle1),
+ put_handle(Dest, DHandle1),
Result;
{ok, _} -> {error, destination_not_open_for_writing};
Error -> Error
@@ -350,9 +379,27 @@ get_or_reopen(Ref) ->
options = Options } ->
#file { path = Path } = get({GRef, fhc_file}),
open1(Path, Mode, Options, Ref, GRef);
- Handle -> {ok, Handle #handle { last_used_at = now() }}
+ Handle ->
+ {ok, Handle}
end.
+get_or_create_age_tree() ->
+ case get(fhc_age_tree) of
+ undefined -> gb_trees:empty();
+ AgeTree -> AgeTree
+ end.
+
+with_age_tree(Fun) ->
+ put(fhc_age_tree, Fun(get_or_create_age_tree())).
+
+put_handle(Ref, Handle = #handle { last_used_at = Then }) ->
+ Now = now(),
+ with_age_tree(
+ fun (Tree) ->
+ gb_trees:insert(Now, Ref, gb_trees:delete(Then, Tree))
+ end),
+ put({Ref, fhc_handle}, Handle #handle { last_used_at = Now }).
+
open1(Path, Mode, Options, Ref, GRef) ->
case file:open(Path, Mode) of
{ok, Hdl} ->
@@ -362,15 +409,23 @@ open1(Path, Mode, Options, Ref, GRef) ->
infinity -> infinity;
N when is_integer(N) -> N
end,
+ Now = now(),
Handle =
#handle { hdl = Hdl, offset = 0, trusted_offset = 0,
write_buffer_size = 0, options = Options,
write_buffer_size_limit = WriteBufferSize,
write_buffer = [], at_eof = false, mode = Mode,
is_write = is_writer(Mode), is_read = is_reader(Mode),
- global_key = GRef, last_used_at = now(),
+ global_key = GRef, last_used_at = Now,
is_dirty = false },
put({Ref, fhc_handle}, Handle),
+ with_age_tree(fun (Tree) ->
+ Tree1 = gb_trees:insert(Now, Ref, Tree),
+ {Oldest, _Ref} = gb_trees:smallest(Tree1),
+ gen_server2:cast(?SERVER,
+ {self(), open, Oldest}),
+ Tree1
+ end),
{ok, Handle};
{error, Reason} ->
{error, Reason}
@@ -453,3 +508,26 @@ needs_seek(true, CurOffset, DesiredOffset) %% same as {bof, DO}
%% because we can't really track size, we could well end up at EoF and not know
needs_seek(_AtEoF, _CurOffset, _DesiredOffset) ->
{false, true}.
+
+%%----------------------------------------------------------------------------
+%% gen_server
+%%----------------------------------------------------------------------------
+
+init([]) ->
+ {ok, state}.
+
+handle_call(_Msg, _From, State) ->
+ {reply, message_not_understood, State}.
+
+handle_cast(Msg, State) ->
+ io:format("~p~n", [Msg]),
+ {noreply, State}.
+
+handle_info(_Msg, State) ->
+ {noreply, State}.
+
+terminate(_Reason, State) ->
+ State.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 2e6810bcca..405d170bea 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -142,6 +142,7 @@ start(normal, []) ->
check_empty_content_body_frame_size(),
ok = rabbit_alarm:start(),
+ ok = start_child(file_handle_cache),
{ok, MemoryWatermark} =
application:get_env(vm_memory_high_watermark),
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 2ddb18262e..e9f47d3669 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -43,7 +43,6 @@
-define(SERVER, ?MODULE).
--define(MAX_READ_FILE_HANDLES, 256).
-define(FILE_SIZE_LIMIT, (256*1024*1024)).
-define(SYNC_INTERVAL, 5). %% milliseconds
-define(HANDLE_CACHE_BUFFER_SIZE, 1048576). %% 1MB