diff options
| -rw-r--r-- | src/file_handle_cache.erl | 104 | ||||
| -rw-r--r-- | src/rabbit.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 1 |
3 files changed, 92 insertions, 14 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 5c1c5a83d4..634cf0165e 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -31,10 +31,17 @@ -module(file_handle_cache). +-behaviour(gen_server2). + -export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1, last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1, append_write_buffer/1, copy/3]). +-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + %%---------------------------------------------------------------------------- -record(file, @@ -62,6 +69,7 @@ %%---------------------------------------------------------------------------- %% Specs +%%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -91,6 +99,10 @@ %%---------------------------------------------------------------------------- %% Public API +%%---------------------------------------------------------------------------- + +start_link() -> + gen_server2:start_link({local, ?SERVER}, ?MODULE, [], [{timeout, infinity}]). open(Path, Mode, Options) -> case is_appender(Mode) of @@ -137,14 +149,31 @@ close(Ref) -> Handle -> case write_buffer(Handle) of {ok, #handle { hdl = Hdl, global_key = GRef, is_dirty = IsDirty, - is_read = IsReader, is_write = IsWriter }} -> + is_read = IsReader, is_write = IsWriter, + last_used_at = Then }} -> case Hdl of closed -> ok; _ -> ok = case IsDirty of true -> file:sync(Hdl); false -> ok end, - ok = file:close(Hdl) + ok = file:close(Hdl), + with_age_tree( + fun (Tree) -> + Tree1 = gb_trees:delete(Then, Tree), + Oldest = + case gb_trees:is_empty(Tree1) of + true -> + undefined; + false -> + {Oldest1, _Ref} = + gb_trees:smallest(Tree1), + Oldest1 + end, + gen_server2:cast( + ?SERVER, {self(), close, Oldest}), + Tree1 + end) end, #file { reader_count = RCount, has_writer = HasWriter, path = Path } = File = get({GRef, fhc_file}), @@ -162,7 +191,7 @@ close(Ref) -> end, ok; {Error, Handle1} -> - put({Ref, fhc_handle}, Handle1), + put_handle(Ref, Handle1), Error end end. @@ -185,7 +214,7 @@ read(Ref, Count) -> end; {Error, Handle2} -> {Error, Handle2} end, - put({Ref, fhc_handle}, Handle1), + put_handle(Ref, Handle1), Result; Error -> Error end. @@ -202,7 +231,7 @@ append(Ref, Data) -> {{error, _} = Error, Handle2} -> {Error, Handle2} end, - put({Ref, fhc_handle}, Handle1), + put_handle(Ref, Handle1), Result; Error -> Error end. @@ -225,7 +254,7 @@ sync(Ref) -> end; Error -> {Error, Handle} end, - put({Ref, fhc_handle}, Handle1), + put_handle(Ref, Handle1), Result; Error -> Error end. @@ -238,7 +267,7 @@ position(Ref, NewOffset) -> {ok, Handle2} -> maybe_seek(NewOffset, Handle2); {Error, Handle2} -> {Error, Handle2} end, - put({Ref, fhc_handle}, Handle1), + put_handle(Ref, Handle1), Result; Error -> Error end. @@ -265,7 +294,7 @@ truncate(Ref) -> end; {Error, Handle2} -> {Error, Handle2} end, - put({Ref, fhc_handle}, Handle1), + put_handle(Ref, Handle1), Result; Error -> Error end. @@ -295,7 +324,7 @@ append_write_buffer(Ref) -> case get_or_reopen(Ref) of {ok, Handle} -> {Result, Handle1} = write_buffer(Handle), - put({Ref, fhc_handle}, Handle1), + put_handle(Ref, Handle1), Result; Error -> Error end. @@ -328,8 +357,8 @@ copy(Src, Dest, Count) -> end; Error -> {Error, SHandle, DHandle} end, - put({Src, fhc_handle}, SHandle1), - put({Dest, fhc_handle}, DHandle1), + put_handle(Src, SHandle1), + put_handle(Dest, DHandle1), Result; {ok, _} -> {error, destination_not_open_for_writing}; Error -> Error @@ -350,9 +379,27 @@ get_or_reopen(Ref) -> options = Options } -> #file { path = Path } = get({GRef, fhc_file}), open1(Path, Mode, Options, Ref, GRef); - Handle -> {ok, Handle #handle { last_used_at = now() }} + Handle -> + {ok, Handle} end. +get_or_create_age_tree() -> + case get(fhc_age_tree) of + undefined -> gb_trees:empty(); + AgeTree -> AgeTree + end. + +with_age_tree(Fun) -> + put(fhc_age_tree, Fun(get_or_create_age_tree())). + +put_handle(Ref, Handle = #handle { last_used_at = Then }) -> + Now = now(), + with_age_tree( + fun (Tree) -> + gb_trees:insert(Now, Ref, gb_trees:delete(Then, Tree)) + end), + put({Ref, fhc_handle}, Handle #handle { last_used_at = Now }). + open1(Path, Mode, Options, Ref, GRef) -> case file:open(Path, Mode) of {ok, Hdl} -> @@ -362,15 +409,23 @@ open1(Path, Mode, Options, Ref, GRef) -> infinity -> infinity; N when is_integer(N) -> N end, + Now = now(), Handle = #handle { hdl = Hdl, offset = 0, trusted_offset = 0, write_buffer_size = 0, options = Options, write_buffer_size_limit = WriteBufferSize, write_buffer = [], at_eof = false, mode = Mode, is_write = is_writer(Mode), is_read = is_reader(Mode), - global_key = GRef, last_used_at = now(), + global_key = GRef, last_used_at = Now, is_dirty = false }, put({Ref, fhc_handle}, Handle), + with_age_tree(fun (Tree) -> + Tree1 = gb_trees:insert(Now, Ref, Tree), + {Oldest, _Ref} = gb_trees:smallest(Tree1), + gen_server2:cast(?SERVER, + {self(), open, Oldest}), + Tree1 + end), {ok, Handle}; {error, Reason} -> {error, Reason} @@ -453,3 +508,26 @@ needs_seek(true, CurOffset, DesiredOffset) %% same as {bof, DO} %% because we can't really track size, we could well end up at EoF and not know needs_seek(_AtEoF, _CurOffset, _DesiredOffset) -> {false, true}. + +%%---------------------------------------------------------------------------- +%% gen_server +%%---------------------------------------------------------------------------- + +init([]) -> + {ok, state}. + +handle_call(_Msg, _From, State) -> + {reply, message_not_understood, State}. + +handle_cast(Msg, State) -> + io:format("~p~n", [Msg]), + {noreply, State}. + +handle_info(_Msg, State) -> + {noreply, State}. + +terminate(_Reason, State) -> + State. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/src/rabbit.erl b/src/rabbit.erl index 2e6810bcca..405d170bea 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -142,6 +142,7 @@ start(normal, []) -> check_empty_content_body_frame_size(), ok = rabbit_alarm:start(), + ok = start_child(file_handle_cache), {ok, MemoryWatermark} = application:get_env(vm_memory_high_watermark), diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 2ddb18262e..e9f47d3669 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -43,7 +43,6 @@ -define(SERVER, ?MODULE). --define(MAX_READ_FILE_HANDLES, 256). -define(FILE_SIZE_LIMIT, (256*1024*1024)). -define(SYNC_INTERVAL, 5). %% milliseconds -define(HANDLE_CACHE_BUFFER_SIZE, 1048576). %% 1MB |
