diff options
| author | Diana Corbacho <diana.corbacho@erlang-solutions.com> | 2016-06-07 13:58:23 +0100 |
|---|---|---|
| committer | Diana Corbacho <diana.corbacho@erlang-solutions.com> | 2016-06-07 17:17:27 +0100 |
| commit | 7e1f0b0213560837c46f829b2b1cbdeb89f7d46a (patch) | |
| tree | 60481047d016c99e3d3df97c4646b3561597d04b /src | |
| parent | df7a84271155ecc3765c066cf35e3dccc786e31c (diff) | |
| download | rabbitmq-server-git-7e1f0b0213560837c46f829b2b1cbdeb89f7d46a.tar.gz | |
Performance improvements on file_handle_cache
Diffstat (limited to 'src')
| -rw-r--r-- | src/file_handle_cache.erl | 99 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 19 |
3 files changed, 73 insertions, 61 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index d5f0cbee6f..78b0095036 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -145,7 +145,8 @@ -export([register_callback/3]). -export([open/3, close/1, read/2, append/2, needs_sync/1, sync/1, position/2, truncate/1, current_virtual_offset/1, current_raw_offset/1, flush/1, - copy/3, set_maximum_since_use/1, delete/1, clear/1]). + copy/3, set_maximum_since_use/1, delete/1, clear/1, + open_with_absolute_path/3]). -export([obtain/0, obtain/1, release/0, release/1, transfer/1, transfer/2, set_limit/1, get_limit/0, info_keys/0, with_handle/1, with_handle/2, info/0, info/1, clear_read_cache/0]). @@ -249,6 +250,11 @@ [{'write_buffer', (non_neg_integer() | 'infinity' | 'unbuffered')} | {'read_buffer', (non_neg_integer() | 'unbuffered')}]) -> val_or_error(ref())). +-spec(open_with_absolute_path/3 :: + (file:filename(), [any()], + [{'write_buffer', (non_neg_integer() | 'infinity' | 'unbuffered')} | + {'read_buffer', (non_neg_integer() | 'unbuffered')}]) + -> val_or_error(ref())). -spec(close/1 :: (ref()) -> ok_or_error()). -spec(read/2 :: (ref(), non_neg_integer()) -> val_or_error([char()] | binary()) | 'eof'). @@ -300,9 +306,11 @@ register_callback(M, F, A) gen_server2:cast(?SERVER, {register_callback, self(), {M, F, A}}). open(Path, Mode, Options) -> - Path1 = filename:absname(Path), + open_with_absolute_path(filename:absname(Path), Mode, Options). + +open_with_absolute_path(Path, Mode, Options) -> File1 = #file { reader_count = RCount, has_writer = HasWriter } = - case get({Path1, fhc_file}) of + case get({Path, fhc_file}) of File = #file {} -> File; undefined -> #file { reader_count = 0, has_writer = false } @@ -311,7 +319,7 @@ open(Path, Mode, Options) -> IsWriter = is_writer(Mode1), case IsWriter andalso HasWriter of true -> {error, writer_exists}; - false -> {ok, Ref} = new_closed_handle(Path1, Mode1, Options), + false -> {ok, Ref} = new_closed_handle(Path, Mode1, Options), case get_or_reopen([{Ref, new}]) of {ok, [_Handle1]} -> RCount1 = case is_reader(Mode1) of @@ -319,7 +327,7 @@ open(Path, Mode, Options) -> false -> RCount end, HasWriter1 = HasWriter orelse IsWriter, - put({Path1, fhc_file}, + put({Path, fhc_file}, File1 #file { reader_count = RCount1, has_writer = HasWriter1 }), {ok, Ref}; @@ -375,7 +383,7 @@ read(Ref, Count) -> offset = Offset} = tune_read_buffer_limit(Handle0, Count), WantedCount = Count - BufRem, - case prim_file_read(Hdl, lists:max([BufSz, WantedCount])) of + case prim_file_read(Hdl, max(BufSz, WantedCount)) of {ok, Data} -> <<_:BufPos/binary, BufTl/binary>> = Buf, ReadCount = size(Data), @@ -1297,11 +1305,6 @@ pending_out({N, Queue}) -> pending_count({Count, _Queue}) -> Count. -pending_is_empty({0, _Queue}) -> - true; -pending_is_empty({_N, _Queue}) -> - false. - %%---------------------------------------------------------------------------- %% server helpers %%---------------------------------------------------------------------------- @@ -1348,17 +1351,24 @@ process_open(State = #fhc_state { limit = Limit, {Pending1, State1} = process_pending(Pending, Limit - used(State), State), State1 #fhc_state { open_pending = Pending1 }. -process_obtain(Type, State = #fhc_state { limit = Limit, - obtain_limit = ObtainLimit }) -> - ObtainCount = obtain_state(Type, count, State), - Pending = obtain_state(Type, pending, State), - Quota = case Type of - file -> Limit - (used(State)); - socket -> lists:min([ObtainLimit - ObtainCount, - Limit - (used(State))]) - end, +process_obtain(socket, State = #fhc_state { limit = Limit, + obtain_limit = ObtainLimit, + open_count = OpenCount, + obtain_count_socket = ObtainCount, + obtain_pending_socket = Pending, + obtain_count_file = ObtainCountF}) -> + Quota = min(ObtainLimit - ObtainCount, + Limit - (OpenCount + ObtainCount + ObtainCountF)), {Pending1, State1} = process_pending(Pending, Quota, State), - set_obtain_state(Type, pending, Pending1, State1). + State1#fhc_state{obtain_pending_socket = Pending1}; +process_obtain(file, State = #fhc_state { limit = Limit, + open_count = OpenCount, + obtain_count_socket = ObtainCountS, + obtain_count_file = ObtainCountF, + obtain_pending_file = Pending}) -> + Quota = Limit - (OpenCount + ObtainCountS + ObtainCountF), + {Pending1, State1} = process_pending(Pending, Quota, State), + State1#fhc_state{obtain_pending_file = Pending1}. process_pending(Pending, Quota, State) when Quota =< 0 -> {Pending, State}; @@ -1383,26 +1393,21 @@ run_pending_item(#pending { kind = Kind, true = ets:update_element(Clients, Pid, {#cstate.blocked, false}), update_counts(Kind, Pid, Requested, State). -update_counts(Kind, Pid, Delta, +update_counts(open, Pid, Delta, State = #fhc_state { open_count = OpenCount, - obtain_count_file = ObtainCountF, - obtain_count_socket = ObtainCountS, clients = Clients }) -> - {OpenDelta, ObtainDeltaF, ObtainDeltaS} = - update_counts1(Kind, Pid, Delta, Clients), - State #fhc_state { open_count = OpenCount + OpenDelta, - obtain_count_file = ObtainCountF + ObtainDeltaF, - obtain_count_socket = ObtainCountS + ObtainDeltaS }. - -update_counts1(open, Pid, Delta, Clients) -> ets:update_counter(Clients, Pid, {#cstate.opened, Delta}), - {Delta, 0, 0}; -update_counts1({obtain, file}, Pid, Delta, Clients) -> + State #fhc_state { open_count = OpenCount + Delta}; +update_counts({obtain, file}, Pid, Delta, + State = #fhc_state {obtain_count_file = ObtainCountF, + clients = Clients }) -> ets:update_counter(Clients, Pid, {#cstate.obtained_file, Delta}), - {0, Delta, 0}; -update_counts1({obtain, socket}, Pid, Delta, Clients) -> + State #fhc_state { obtain_count_file = ObtainCountF + Delta}; +update_counts({obtain, socket}, Pid, Delta, + State = #fhc_state {obtain_count_socket = ObtainCountS, + clients = Clients }) -> ets:update_counter(Clients, Pid, {#cstate.obtained_socket, Delta}), - {0, 0, Delta}. + State #fhc_state { obtain_count_socket = ObtainCountS + Delta}. maybe_reduce(State) -> case needs_reduce(State) of @@ -1410,18 +1415,20 @@ maybe_reduce(State) -> false -> State end. -needs_reduce(State = #fhc_state { limit = Limit, - open_pending = OpenPending, - obtain_limit = ObtainLimit, - obtain_count_socket = ObtainCountS, - obtain_pending_file = ObtainPendingF, - obtain_pending_socket = ObtainPendingS }) -> +needs_reduce(#fhc_state { limit = Limit, + open_count = OpenCount, + open_pending = {OpenPending, _}, + obtain_limit = ObtainLimit, + obtain_count_socket = ObtainCountS, + obtain_count_file = ObtainCountF, + obtain_pending_file = {ObtainPendingF, _}, + obtain_pending_socket = {ObtainPendingS, _} }) -> Limit =/= infinity - andalso ((used(State) > Limit) - orelse (not pending_is_empty(OpenPending)) - orelse (not pending_is_empty(ObtainPendingF)) + andalso (((OpenCount + ObtainCountS + ObtainCountF) > Limit) + orelse (OpenPending =/= 0) + orelse (ObtainPendingF =/= 0) orelse (ObtainCountS < ObtainLimit - andalso not pending_is_empty(ObtainPendingS))). + andalso (ObtainPendingS =/= 0))). reduce(State = #fhc_state { open_pending = OpenPending, obtain_pending_file = ObtainPendingFile, diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 6754c606bb..2300664687 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -1360,9 +1360,10 @@ should_mask_action(CRef, MsgId, %%---------------------------------------------------------------------------- open_file(Dir, FileName, Mode) -> - file_handle_cache:open(form_filename(Dir, FileName), ?BINARY_MODE ++ Mode, - [{write_buffer, ?HANDLE_CACHE_BUFFER_SIZE}, - {read_buffer, ?HANDLE_CACHE_BUFFER_SIZE}]). + file_handle_cache:open_with_absolute_path( + form_filename(Dir, FileName), ?BINARY_MODE ++ Mode, + [{write_buffer, ?HANDLE_CACHE_BUFFER_SIZE}, + {read_buffer, ?HANDLE_CACHE_BUFFER_SIZE}]). close_handle(Key, CState = #client_msstate { file_handle_cache = FHC }) -> CState #client_msstate { file_handle_cache = close_handle(Key, FHC) }; @@ -2112,10 +2113,11 @@ transform_dir(BaseDir, Store, TransformFun) -> transform_msg_file(FileOld, FileNew, TransformFun) -> ok = rabbit_file:ensure_parent_dirs_exist(FileNew), - {ok, RefOld} = file_handle_cache:open(FileOld, [raw, binary, read], []), - {ok, RefNew} = file_handle_cache:open(FileNew, [raw, binary, write], - [{write_buffer, - ?HANDLE_CACHE_BUFFER_SIZE}]), + {ok, RefOld} = file_handle_cache:open_with_absolute_path( + FileOld, [raw, binary, read], []), + {ok, RefNew} = file_handle_cache:open_with_absolute_path( + FileNew, [raw, binary, write], + [{write_buffer, ?HANDLE_CACHE_BUFFER_SIZE}]), {ok, _Acc, _IgnoreSize} = rabbit_msg_file:scan( RefOld, filelib:file_size(FileOld), diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 981d8e74ff..06b6961edb 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -816,8 +816,9 @@ append_journal_to_segment(#segment { journal_entries = JEntries, _ -> file_handle_cache_stats:update(queue_index_write), - {ok, Hdl} = file_handle_cache:open(Path, ?WRITE_MODE, - [{write_buffer, infinity}]), + {ok, Hdl} = file_handle_cache:open_with_absolute_path( + Path, ?WRITE_MODE, + [{write_buffer, infinity}]), %% the file_handle_cache also does a list reverse, so this %% might not be required here, but before we were doing a %% sparse_foldr, a lists:reverse/1 seems to be the correct @@ -832,8 +833,8 @@ get_journal_handle(State = #qistate { journal_handle = undefined, dir = Dir }) -> Path = filename:join(Dir, ?JOURNAL_FILENAME), ok = rabbit_file:ensure_dir(Path), - {ok, Hdl} = file_handle_cache:open(Path, ?WRITE_MODE, - [{write_buffer, infinity}]), + {ok, Hdl} = file_handle_cache:open_with_absolute_path( + Path, ?WRITE_MODE, [{write_buffer, infinity}]), {Hdl, State #qistate { journal_handle = Hdl }}; get_journal_handle(State = #qistate { journal_handle = Hdl }) -> {Hdl, State}. @@ -1058,7 +1059,8 @@ load_segment(KeepAcked, #segment { path = Path }) -> false -> Empty; true -> Size = rabbit_file:file_size(Path), file_handle_cache_stats:update(queue_index_read), - {ok, Hdl} = file_handle_cache:open(Path, ?READ_MODE, []), + {ok, Hdl} = file_handle_cache:open_with_absolute_path( + Path, ?READ_MODE, []), {ok, 0} = file_handle_cache:position(Hdl, bof), {ok, SegBin} = file_handle_cache:read(Hdl, Size), ok = file_handle_cache:close(Hdl), @@ -1383,10 +1385,11 @@ transform_file(Path, Fun) when is_function(Fun)-> case rabbit_file:file_size(Path) of 0 -> ok; Size -> {ok, PathTmpHdl} = - file_handle_cache:open(PathTmp, ?WRITE_MODE, - [{write_buffer, infinity}]), + file_handle_cache:open_with_absolute_path( + PathTmp, ?WRITE_MODE, + [{write_buffer, infinity}]), - {ok, PathHdl} = file_handle_cache:open( + {ok, PathHdl} = file_handle_cache:open_with_absolute_path( Path, ?READ_MODE, [{read_buffer, Size}]), {ok, Content} = file_handle_cache:read(PathHdl, Size), ok = file_handle_cache:close(PathHdl), |
