diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-11-03 16:58:17 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-11-03 16:58:17 +0000 |
| commit | 3d8a846b69581cb4372903b9dd27361077fb8ad5 (patch) | |
| tree | f2546865e82a1b78840bbdeba37ed9f037aa114d /src | |
| parent | b10788b74928824690cddee5cfc8da930c7282ef (diff) | |
| download | rabbitmq-server-git-3d8a846b69581cb4372903b9dd27361077fb8ad5.tar.gz | |
Fixed a few bugs in fhc, and pushed fhc through to msg_store and msg_file. The API of fhc:position was changed to match that of the file module, and fhc was extended with a copy function. msg_store must now also trap exits so that it shuts down cleanly — especially important given that data to be written is now cached more aggressively. Removed stop from msg_store: since it is part of a supervisor, the correct way to stop it is via the supervisor.
Diffstat (limited to 'src')
| -rw-r--r-- | src/file_handle_cache.erl | 72 | ||||
| -rw-r--r-- | src/rabbit_file_handle_cache.erl | 128 | ||||
| -rw-r--r-- | src/rabbit_msg_file.erl | 17 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 226 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 4 |
5 files changed, 180 insertions, 267 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index fe4e90774d..38aa482040 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -32,7 +32,7 @@ -module(file_handle_cache). -export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1, - last_sync_offset/1, append_write_buffer/1]). + last_sync_offset/1, current_offset/1, append_write_buffer/1, copy/3]). %%---------------------------------------------------------------------------- @@ -76,10 +76,14 @@ ({'ok', ([char()]|binary())} | eof | error())). -spec(append/2 :: (ref(), iodata()) -> ok_or_error()). -spec(sync/1 :: (ref()) -> ok_or_error()). --spec(position/2 :: (ref(), position()) -> ok_or_error()). +-spec(position/2 :: (ref(), position()) -> + ({'ok', non_neg_integer()} | error())). -spec(truncate/1 :: (ref()) -> ok_or_error()). -spec(last_sync_offset/1 :: (ref()) -> ({'ok', integer()} | error())). --spec(append_write_buffer/1 :: (ref()) -> ok_or_error()). +-spec(current_offset/1 :: (ref()) -> ({'ok', integer()} | error())). +-spec(append_write_buffer/1 :: (ref()) -> ok_or_error()). +-spec(copy/3 :: (ref(), ref(), non_neg_integer()) -> + ({'ok', integer()} | error())). -endif. @@ -191,7 +195,7 @@ append(Ref, Data) -> {ok, Handle} -> {Result, Handle1} = case maybe_seek(eof, Handle) of - {ok, Handle2 = #handle { at_eof = true }} -> + {{ok, _Offset}, Handle2 = #handle { at_eof = true }} -> write_to_buffer(Data, Handle2); {{error, _} = Error, Handle2} -> {Error, Handle2} @@ -266,8 +270,17 @@ truncate(Ref) -> last_sync_offset(Ref) -> case get_or_reopen(Ref) of - {ok, #handle { trusted_offset = TrustedOffset }} -> - {ok, TrustedOffset}; + {ok, #handle { trusted_offset = TrustedOffset }} -> {ok, TrustedOffset}; + Error -> Error + end. 
+ +current_offset(Ref) -> + case get_or_reopen(Ref) of + {ok, #handle { at_eof = true, is_write = true, offset = Offset, + write_buffer_size = Size }} -> + {ok, Offset + Size}; + {ok, #handle { offset = Offset }} -> + {ok, Offset}; Error -> Error end. @@ -280,6 +293,45 @@ append_write_buffer(Ref) -> Error -> Error end. +copy(Src, Dest, Count) -> + case get_or_reopen(Src) of + {ok, SHandle = #handle { is_read = true }} -> + case get_or_reopen(Dest) of + {ok, DHandle = #handle { is_write = true }} -> + {Result, SHandle1, DHandle1} = + case write_buffer(SHandle) of + {ok, SHandle2 = #handle { hdl = SHdl, + offset = SOffset }} -> + case write_buffer(DHandle) of + {ok, + DHandle2 = #handle { hdl = DHdl, + offset = DOffset }} -> + Result1 = file:copy(SHdl, DHdl, Count), + case Result1 of + {ok, Count1} -> + {Result1, + SHandle2 #handle { + offset = SOffset + Count1 }, + DHandle2 #handle { + offset = DOffset + Count1 }}; + Error -> + {Error, SHandle2, DHandle2} + end; + Error -> {Error, SHandle2, DHandle} + end; + Error -> {Error, SHandle, DHandle} + end, + put({Src, fhc_handle}, SHandle1), + put({Dest, fhc_handle}, DHandle1), + Result; + {ok, _} -> {error, destination_not_open_for_writing}; + Error -> Error + end; + {ok, _} -> {error, source_not_open_for_reading}; + Error -> Error + end. + + %%---------------------------------------------------------------------------- %% Internal functions %%---------------------------------------------------------------------------- @@ -326,13 +378,15 @@ maybe_seek(NewOffset, Handle = #handle { hdl = Hdl, at_eof = AtEoF, end, case Result of {ok, Offset1} -> - {ok, Handle #handle { at_eof = AtEoF1, offset = Offset1 }}; + {Result, Handle #handle { at_eof = AtEoF1, offset = Offset1 }}; {error, _} = Error -> {Error, Handle} end. 
-write_to_buffer(Data, Handle = #handle { hdl = Hdl, +write_to_buffer(Data, Handle = #handle { hdl = Hdl, offset = Offset, write_buffer_size_limit = 0 }) -> - {file:write(Hdl, Data), Handle #handle { is_dirty = true }}; + Offset1 = Offset + iolist_size(Data), + {file:write(Hdl, Data), + Handle #handle { is_dirty = true, offset = Offset1 }}; write_to_buffer(Data, Handle = #handle { write_buffer = WriteBuffer, write_buffer_size = Size, diff --git a/src/rabbit_file_handle_cache.erl b/src/rabbit_file_handle_cache.erl deleted file mode 100644 index 85a5d6e942..0000000000 --- a/src/rabbit_file_handle_cache.erl +++ /dev/null @@ -1,128 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. 
-%% - --module(rabbit_file_handle_cache). - --export([init/2, close_all/1, close_file/2, with_file_handle_at/4]). - -%%---------------------------------------------------------------------------- - --include("rabbit.hrl"). - --record(hcstate, - { limit, %% how many file handles can we open? - handles, %% dict of the files to their handles, age and offset - ages, %% gb_tree of the files, keyed by age - mode %% the mode to open the files as - }). - -%%---------------------------------------------------------------------------- - --ifdef(use_specs). - --type(hcstate() :: #hcstate { limit :: non_neg_integer(), - handles :: dict(), - ages :: gb_tree(), - mode :: [file_open_mode()] - }). - --spec(init/2 :: (non_neg_integer(), [file_open_mode()]) -> hcstate()). --spec(close_all/1 :: (hcstate()) -> hcstate()). --spec(close_file/2 :: (file_path(), hcstate()) -> hcstate()). --spec(with_file_handle_at/4 :: (file_path(), non_neg_integer(), - fun ((io_device()) -> {non_neg_integer(), A}), - hcstate()) -> - {A, hcstate()}). --endif. - -%%---------------------------------------------------------------------------- - -init(Limit, OpenMode) -> - #hcstate { limit = Limit, - handles = dict:new(), - ages = gb_trees:empty(), - mode = OpenMode - }. - -close_all(State = #hcstate { handles = Handles }) -> - dict:fold(fun (_File, {Hdl, _Offset, _Then}, _Acc) -> - file:close(Hdl) - end, ok, Handles), - State #hcstate { handles = dict:new(), ages = gb_trees:empty() }. - -close_file(File, State = #hcstate { handles = Handles, - ages = Ages }) -> - case dict:find(File, Handles) of - error -> - State; - {ok, {Hdl, _Offset, Then}} -> - ok = file:close(Hdl), - State #hcstate { handles = dict:erase(File, Handles), - ages = gb_trees:delete(Then, Ages) - } - end. 
- -with_file_handle_at(File, Offset, Fun, State = #hcstate { handles = Handles, - ages = Ages, - limit = Limit, - mode = Mode }) -> - {FileHdl, OldOffset, Handles1, Ages1} = - case dict:find(File, Handles) of - error -> - {ok, Hdl} = file:open(File, Mode), - case dict:size(Handles) < Limit of - true -> - {Hdl, 0, Handles, Ages}; - false -> - {Then, OldFile, Ages2} = gb_trees:take_smallest(Ages), - {ok, {OldHdl, _Offset, Then}} = - dict:find(OldFile, Handles), - ok = file:close(OldHdl), - {Hdl, 0, dict:erase(OldFile, Handles), Ages2} - end; - {ok, {Hdl, OldOffset1, Then}} -> - {Hdl, OldOffset1, Handles, gb_trees:delete(Then, Ages)} - end, - SeekRes = case Offset == OldOffset of - true -> ok; - false -> case file:position(FileHdl, {bof, Offset}) of - {ok, Offset} -> ok; - KO -> KO - end - end, - {NewOffset, Result} = case SeekRes of - ok -> Fun(FileHdl); - KO1 -> {OldOffset, KO1} - end, - Now = now(), - Handles2 = dict:store(File, {FileHdl, NewOffset, Now}, Handles1), - Ages3 = gb_trees:enter(Now, File, Ages1), - {Result, State #hcstate { handles = Handles2, ages = Ages3 }}. diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl index 84dce90e8c..c08261591d 100644 --- a/src/rabbit_msg_file.erl +++ b/src/rabbit_msg_file.erl @@ -70,10 +70,11 @@ append(FileHdl, MsgId, MsgBody) MsgBodyBin = term_to_binary(MsgBody), MsgBodyBinSize = size(MsgBodyBin), Size = MsgBodyBinSize + ?MSG_ID_SIZE_BYTES, - case file:write(FileHdl, <<Size:?INTEGER_SIZE_BITS, - MsgId:?MSG_ID_SIZE_BYTES/binary, - MsgBodyBin:MsgBodyBinSize/binary, - ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>) of + case file_handle_cache:append(FileHdl, + <<Size:?INTEGER_SIZE_BITS, + MsgId:?MSG_ID_SIZE_BYTES/binary, + MsgBodyBin:MsgBodyBinSize/binary, + ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>) of ok -> {ok, Size + ?FILE_PACKING_ADJUSTMENT}; KO -> KO end. 
@@ -81,7 +82,7 @@ append(FileHdl, MsgId, MsgBody) read(FileHdl, TotalSize) -> Size = TotalSize - ?FILE_PACKING_ADJUSTMENT, BodyBinSize = Size - ?MSG_ID_SIZE_BYTES, - case file:read(FileHdl, TotalSize) of + case file_handle_cache:read(FileHdl, TotalSize) of {ok, <<Size:?INTEGER_SIZE_BITS, MsgId:?MSG_ID_SIZE_BYTES/binary, MsgBodyBin:BodyBinSize/binary, @@ -105,7 +106,7 @@ scan(FileHdl, Offset, Acc) -> end. read_next(FileHdl, Offset) -> - case file:read(FileHdl, ?SIZE_AND_MSG_ID_BYTES) of + case file_handle_cache:read(FileHdl, ?SIZE_AND_MSG_ID_BYTES) of %% Here we take option 5 from %% http://www.erlang.org/cgi-bin/ezmlm-cgi?2:mss:1569 in which %% we read the MsgId as a number, and then convert it back to @@ -116,11 +117,11 @@ read_next(FileHdl, Offset) -> _ -> TotalSize = Size + ?FILE_PACKING_ADJUSTMENT, ExpectedAbsPos = Offset + TotalSize - 1, - case file:position( + case file_handle_cache:position( FileHdl, {cur, Size - ?MSG_ID_SIZE_BYTES}) of {ok, ExpectedAbsPos} -> NextOffset = ExpectedAbsPos + 1, - case file:read(FileHdl, 1) of + case file_handle_cache:read(FileHdl, 1) of {ok, <<?WRITE_OK_MARKER: ?WRITE_OK_SIZE_BITS>>} -> <<MsgId:?MSG_ID_SIZE_BYTES/binary>> = diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index a492a0248d..89f13c6fd0 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -34,7 +34,7 @@ -behaviour(gen_server2). -export([start_link/3, write/2, read/1, peruse/2, contains/1, remove/1, - release/1, sync/2, stop/0]). + release/1, sync/2]). -export([sync/0]). %% internal @@ -46,6 +46,7 @@ -define(MAX_READ_FILE_HANDLES, 256). -define(FILE_SIZE_LIMIT, (256*1024*1024)). -define(SYNC_INTERVAL, 5). %% milliseconds +-define(HANDLE_CACHE_BUFFER_SIZE, 1048576). %% 1MB %%---------------------------------------------------------------------------- @@ -67,7 +68,6 @@ -spec(remove/1 :: ([msg_id()]) -> 'ok'). -spec(release/1 :: ([msg_id()]) -> 'ok'). -spec(sync/2 :: ([msg_id()], fun (() -> any())) -> 'ok'). 
--spec(stop/0 :: () -> 'ok'). -endif. @@ -79,12 +79,9 @@ file_summary, %% what's in the files? current_file, %% current file name as number current_file_handle, %% current file handle - current_offset, %% current offset within current file - current_dirty, %% has the current file been written to %% since the last fsync? file_size_limit, %% how big can our files get? - read_file_handle_cache, %% file handle cache for reading - last_sync_offset, %% current_offset at the last time we sync'd + file_handle_cache, %% file handle cache on_sync, %% pending sync requests sync_timer_ref, %% TRef for our interval timer message_cache %% ets message cache @@ -241,7 +238,6 @@ contains(MsgId) -> gen_server2:call(?SERVER, {contains, MsgId}, infinity). remove(MsgIds) -> gen_server2:cast(?SERVER, {remove, MsgIds}). release(MsgIds) -> gen_server2:cast(?SERVER, {release, MsgIds}). sync(MsgIds, K) -> gen_server2:cast(?SERVER, {sync, MsgIds, K}). -stop() -> gen_server2:call(?SERVER, stop, infinity). sync() -> gen_server2:pcast(?SERVER, 9, sync). %% internal %%---------------------------------------------------------------------------- @@ -249,15 +245,14 @@ sync() -> gen_server2:pcast(?SERVER, 9, sync). 
%% internal %%---------------------------------------------------------------------------- init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) -> - + process_flag(trap_exit, true), + ok = filelib:ensure_dir(filename:join(Dir, "nothing")), MsgLocations = ets:new(?MSG_LOC_NAME, [set, private, {keypos, #msg_location.msg_id}]), InitFile = 0, - HandleCache = rabbit_file_handle_cache:init(?MAX_READ_FILE_HANDLES, - ?BINARY_MODE ++ [read]), FileSummary = ets:new(?FILE_SUMMARY_ETS_NAME, [set, private, {keypos, #file_summary.file}]), MessageCache = ets:new(?CACHE_ETS_NAME, [set, private]), @@ -267,11 +262,8 @@ init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) -> file_summary = FileSummary, current_file = InitFile, current_file_handle = undefined, - current_offset = 0, - current_dirty = false, file_size_limit = ?FILE_SIZE_LIMIT, - read_file_handle_cache = HandleCache, - last_sync_offset = 0, + file_handle_cache = dict:new(), on_sync = [], sync_timer_ref = undefined, message_cache = MessageCache @@ -286,13 +278,14 @@ init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) -> %% There should be no more tmp files now, so go ahead and load the %% whole lot Files = [filename_to_num(FileName) || FileName <- FileNames], - State1 = #msstate { current_file = CurFile, current_offset = Offset } = + {Offset, State1 = #msstate { current_file = CurFile }} = build_index(Files, State), %% read is only needed so that we can seek {ok, FileHdl} = open_file(Dir, filenum_to_name(CurFile), - ?WRITE_MODE ++ [read]), - {ok, Offset} = file:position(FileHdl, Offset), + [read | ?WRITE_MODE]), + {ok, Offset} = file_handle_cache:position(FileHdl, Offset), + ok = file_handle_cache:truncate(FileHdl), {ok, State1 #msstate { current_file_handle = FileHdl }}. @@ -304,19 +297,16 @@ handle_call({contains, MsgId}, _From, State) -> reply(case index_lookup(MsgId, State) of not_found -> false; #msg_location {} -> true - end, State); - -handle_call(stop, _From, State) -> - {stop, normal, ok, State}. + end, State). 
handle_cast({write, MsgId, Msg}, State = #msstate { current_file_handle = CurHdl, current_file = CurFile, - current_offset = CurOffset, file_summary = FileSummary }) -> case index_lookup(MsgId, State) of not_found -> %% New message, lots to do + {ok, CurOffset} = file_handle_cache:current_offset(CurHdl), {ok, TotalSize} = rabbit_msg_file:append(CurHdl, MsgId, Msg), ok = index_insert(#msg_location { msg_id = MsgId, ref_count = 1, file = CurFile, @@ -336,10 +326,7 @@ handle_cast({write, MsgId, Msg}, valid_total_size = ValidTotalSize1, contiguous_top = ContiguousTop1 }), NextOffset = CurOffset + TotalSize, - noreply( - maybe_roll_to_new_file( - NextOffset, State #msstate {current_offset = NextOffset, - current_dirty = true})); + noreply(maybe_roll_to_new_file(NextOffset, State)); StoreEntry = #msg_location { ref_count = RefCount } -> %% We already know about it, just update counter ok = index_update(StoreEntry #msg_location { @@ -371,15 +358,11 @@ handle_cast({release, MsgIds}, State) -> lists:foreach(fun (MsgId) -> decrement_cache(MsgId, State) end, MsgIds), noreply(State); -handle_cast({sync, _MsgIds, K}, - State = #msstate { current_dirty = false }) -> - K(), - noreply(State); - handle_cast({sync, MsgIds, K}, - State = #msstate { current_file = CurFile, - last_sync_offset = SyncOffset, - on_sync = Syncs }) -> + State = #msstate { current_file = CurFile, + current_file_handle = CurHdl, + on_sync = Syncs }) -> + {ok, SyncOffset} = file_handle_cache:last_sync_offset(CurHdl), case lists:any(fun (MsgId) -> #msg_location { file = File, offset = Offset } = index_lookup(MsgId, State), @@ -398,22 +381,19 @@ handle_info(timeout, State) -> terminate(_Reason, State = #msstate { msg_locations = MsgLocations, file_summary = FileSummary, - current_file_handle = FileHdl, - read_file_handle_cache = HC }) -> + current_file_handle = FileHdl }) -> State1 = case FileHdl of undefined -> State; _ -> State2 = sync(State), - file:close(FileHdl), + file_handle_cache:close(FileHdl), State2 
end, - HC1 = rabbit_file_handle_cache:close_all(HC), + State3 = close_all_handles(State1), ets:delete(MsgLocations), ets:delete(FileSummary), - State1 #msstate { msg_locations = undefined, - file_summary = undefined, - current_file_handle = undefined, - current_dirty = false, - read_file_handle_cache = HC1 }. + State3 #msstate { msg_locations = undefined, + file_summary = undefined, + current_file_handle = undefined }. code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -455,50 +435,28 @@ filenum_to_name(File) -> integer_to_list(File) ++ ?FILE_EXTENSION. filename_to_num(FileName) -> list_to_integer(filename:rootname(FileName)). -open_file(Dir, FileName, Mode) -> - file:open(form_filename(Dir, FileName), ?BINARY_MODE ++ Mode). - sort_file_names(FileNames) -> lists:sort(fun (A, B) -> filename_to_num(A) < filename_to_num(B) end, FileNames). preallocate(Hdl, FileSizeLimit, FinalPos) -> - {ok, FileSizeLimit} = file:position(Hdl, FileSizeLimit), - ok = file:truncate(Hdl), - {ok, FinalPos} = file:position(Hdl, FinalPos), + {ok, FileSizeLimit} = file_handle_cache:position(Hdl, FileSizeLimit), + ok = file_handle_cache:truncate(Hdl), + {ok, FinalPos} = file_handle_cache:position(Hdl, FinalPos), ok. truncate_and_extend_file(FileHdl, Lowpoint, Highpoint) -> - {ok, Lowpoint} = file:position(FileHdl, Lowpoint), - ok = file:truncate(FileHdl), + {ok, Lowpoint} = file_handle_cache:position(FileHdl, Lowpoint), + ok = file_handle_cache:truncate(FileHdl), ok = preallocate(FileHdl, Highpoint, Lowpoint). -sync(State = #msstate { current_dirty = false }) -> - State; sync(State = #msstate { current_file_handle = CurHdl, - current_offset = CurOffset, on_sync = Syncs }) -> State1 = stop_sync_timer(State), - ok = file:sync(CurHdl), + %% we depend on this really calling sync, even if [] == Syncs + ok = file_handle_cache:sync(CurHdl), lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)), - State1 #msstate { current_dirty = false, - last_sync_offset = CurOffset, - on_sync = [] }. 
- -with_read_handle_at(File, Offset, Fun, - State = #msstate { dir = Dir, - read_file_handle_cache = HC, - current_file = CurFile, - current_dirty = IsDirty, - last_sync_offset = SyncOffset }) -> - State1 = if CurFile == File andalso IsDirty andalso Offset >= SyncOffset -> - sync(State); - true -> State - end, - FilePath = form_filename(Dir, filenum_to_name(File)), - {Result, HC1} = - rabbit_file_handle_cache:with_file_handle_at(FilePath, Offset, Fun, HC), - {Result, State1 #msstate { read_file_handle_cache = HC1 }}. + State1 #msstate { on_sync = [] }. remove_message(MsgId, State = #msstate { file_summary = FileSummary }) -> StoreEntry = #msg_location { ref_count = RefCount, file = File, @@ -524,7 +482,9 @@ remove_message(MsgId, State = #msstate { file_summary = FileSummary }) -> no_compact end. -internal_read_message(MsgId, State) -> +internal_read_message(MsgId, + State = #msstate { current_file = CurFile, + current_file_handle = CurHdl }) -> case index_lookup(MsgId, State) of not_found -> {not_found, State}; #msg_location { ref_count = RefCount, @@ -533,37 +493,70 @@ internal_read_message(MsgId, State) -> total_size = TotalSize } -> case fetch_and_increment_cache(MsgId, State) of not_found -> - {{ok, {MsgId, Msg}}, State1} = - with_read_handle_at( - File, Offset, - fun(Hdl) -> - Res = case rabbit_msg_file:read( - Hdl, TotalSize) of - {ok, {MsgId, _}} = Obj -> Obj; - {ok, Rest} -> - throw({error, - {misread, - [{old_state, State}, + {ok, SyncOffset} = file_handle_cache:last_sync_offset(CurHdl), + State1 = + case CurFile =:= File andalso Offset >= SyncOffset of + true -> sync(State); + false -> State + end, + {Hdl, State2} = get_read_handle(File, State1), + {ok, Offset} = file_handle_cache:position(Hdl, Offset), + {ok, {MsgId, Msg}} = + case rabbit_msg_file:read(Hdl, TotalSize) of + {ok, {MsgId, _}} = Obj -> Obj; + Rest -> + throw({error, {misread, [{old_state, State}, {file_num, File}, {offset, Offset}, {read, Rest}]}}) - end, - {Offset + TotalSize, Res} - end, 
State), + end, ok = if RefCount > 1 -> - insert_into_cache(MsgId, Msg, State1); + insert_into_cache(MsgId, Msg, State2); true -> ok %% it's not in the cache and we %% only have one reference to the %% message. So don't bother %% putting it in the cache. end, - {{ok, Msg}, State1}; + {{ok, Msg}, State2}; {Msg, _RefCount} -> {{ok, Msg}, State} end end. +close_handle(Key, State = #msstate { file_handle_cache = FHC }) -> + case dict:find(Key, FHC) of + {ok, Hdl} -> + ok = close_file(Hdl), + State #msstate { file_handle_cache = dict:erase(Key, FHC) }; + error -> State + end. + +close_all_handles(State = #msstate { file_handle_cache = FHC }) -> + ok = dict:fold(fun (_Key, Hdl, ok) -> + file_handle_cache:close(Hdl) + end, ok, FHC), + State #msstate { file_handle_cache = dict:new() }. + +get_read_handle(FileNum, State = #msstate { file_handle_cache = FHC }) -> + case dict:find(FileNum, FHC) of + {ok, Hdl} -> {Hdl, State}; + error -> new_handle(FileNum, filenum_to_name(FileNum), + [read | ?BINARY_MODE], State) + end. + +new_handle(Key, FileName, Mode, State = #msstate { file_handle_cache = FHC, + dir = Dir }) -> + {ok, Hdl} = open_file(Dir, FileName, Mode), + {Hdl, State #msstate { file_handle_cache = dict:store(Key, Hdl, FHC) }}. + +open_file(Dir, FileName, Mode) -> + file_handle_cache:open(form_filename(Dir, FileName), ?BINARY_MODE ++ Mode, + [{write_buffer, ?HANDLE_CACHE_BUFFER_SIZE}]). + +close_file(Hdl) -> + file_handle_cache:close(Hdl). + %%---------------------------------------------------------------------------- %% message cache helper functions %%---------------------------------------------------------------------------- @@ -732,7 +725,7 @@ recover_crashed_compactions1(Dir, FileNames, TmpFileName) -> true = is_disjoint(MsgIds1, MsgIdsTmp), %% must open with read flag, otherwise will stomp over contents {ok, MainHdl} = open_file(Dir, NonTmpRelatedFileName, - ?WRITE_MODE ++ [read]), + [read | ?WRITE_MODE]), %% Wipe out any rubbish at the end of the file. 
Remember %% the head of the list will be the highest entry in the %% file. @@ -743,10 +736,9 @@ recover_crashed_compactions1(Dir, FileNames, TmpFileName) -> %% fail, but we still aren't risking losing data ok = truncate_and_extend_file(MainHdl, Top, Top + TmpSize), {ok, TmpHdl} = open_file(Dir, TmpFileName, ?READ_AHEAD_MODE), - {ok, TmpSize} = file:copy(TmpHdl, MainHdl, TmpSize), - ok = file:sync(MainHdl), - ok = file:close(MainHdl), - ok = file:close(TmpHdl), + {ok, TmpSize} = file_handle_cache:copy(TmpHdl, MainHdl, TmpSize), + ok = file_handle_cache:close(MainHdl), + ok = file_handle_cache:close(TmpHdl), ok = file:delete(TmpPath), {ok, _MainMessages, MsgIdsMain} = @@ -775,7 +767,7 @@ scan_file_for_valid_messages(Dir, FileName) -> Valid = rabbit_msg_file:scan(Hdl), %% if something really bad's happened, the close could fail, %% but ignore - file:close(Hdl), + file_handle_cache:close(Hdl), Valid; {error, enoent} -> {ok, []}; {error, Reason} -> throw({error, @@ -812,8 +804,8 @@ build_index(Left, [], FilesToCompact, State) -> total_size = TotalSize } | _] -> MaxOffset + TotalSize end, - compact(FilesToCompact, %% this never includes the current file - State #msstate { current_file = Left, current_offset = Offset }); + {Offset, compact(FilesToCompact, %% this never includes the current file + State #msstate { current_file = Left })}; build_index(Left, [File|Files], FilesToCompact, State = #msstate { dir = Dir, file_summary = FileSummary }) -> {ok, Messages} = scan_file_for_valid_messages(Dir, filenum_to_name(File)), @@ -860,7 +852,7 @@ maybe_roll_to_new_file(Offset, file_summary = FileSummary }) when Offset >= FileSizeLimit -> State1 = sync(State), - ok = file:close(CurHdl), + ok = close_file(CurHdl), NextFile = CurFile + 1, {ok, NextHdl} = open_file(Dir, filenum_to_name(NextFile), ?WRITE_MODE), true = ets:update_element(FileSummary, CurFile, @@ -870,9 +862,7 @@ maybe_roll_to_new_file(Offset, file = NextFile, valid_total_size = 0, contiguous_top = 0, left = CurFile, 
right = undefined }), State2 = State1 #msstate { current_file_handle = NextHdl, - current_file = NextFile, - current_offset = 0, - last_sync_offset = 0 }, + current_file = NextFile }, compact([CurFile], State2); maybe_roll_to_new_file(_, State) -> State. @@ -957,9 +947,9 @@ combine_files(#file_summary { file = Source, contiguous_top = DestinationContiguousTop, right = Source }, State = #msstate { dir = Dir }) -> + State1 = close_handle(Source, close_handle(Destination, State)), SourceName = filenum_to_name(Source), DestinationName = filenum_to_name(Destination), - State1 = close_file(SourceName, close_file(DestinationName, State)), {ok, SourceHdl} = open_file(Dir, SourceName, ?READ_AHEAD_MODE), {ok, DestinationHdl} = open_file(Dir, DestinationName, ?READ_AHEAD_MODE ++ ?WRITE_MODE), @@ -998,21 +988,22 @@ combine_files(#file_summary { file = Source, %% Destination, and MsgLocationDets has been updated to %% reflect compaction of Destination so truncate %% Destination and copy from Tmp back to the end - {ok, 0} = file:position(TmpHdl, 0), + {ok, 0} = file_handle_cache:position(TmpHdl, 0), ok = truncate_and_extend_file( DestinationHdl, DestinationContiguousTop, ExpectedSize), - {ok, TmpSize} = file:copy(TmpHdl, DestinationHdl, TmpSize), + {ok, TmpSize} = + file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize), %% position in DestinationHdl should now be DestinationValid - ok = file:sync(DestinationHdl), - ok = file:close(TmpHdl), + ok = file_handle_cache:sync(DestinationHdl), + ok = close_file(TmpHdl), ok = file:delete(form_filename(Dir, Tmp)) end, SourceWorkList = index_search_by_file(Source, State1), ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize, SourceHdl, DestinationHdl, Destination, State1), %% tidy up - ok = file:close(SourceHdl), - ok = file:close(DestinationHdl), + ok = close_file(SourceHdl), + ok = close_file(DestinationHdl), ok = file:delete(form_filename(Dir, SourceName)), State1. 
@@ -1042,24 +1033,19 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, %% the previous block BSize = BlockEnd - BlockStart, {ok, BlockStart} = - file:position(SourceHdl, BlockStart), - {ok, BSize} = - file:copy(SourceHdl, DestinationHdl, BSize), + file_handle_cache:position(SourceHdl, BlockStart), + {ok, BSize} = file_handle_cache:copy( + SourceHdl, DestinationHdl, BSize), {NextOffset, Offset, Offset + TotalSize} end end, {InitOffset, undefined, undefined}, WorkList), %% do the last remaining block BSize1 = BlockEnd1 - BlockStart1, - {ok, BlockStart1} = file:position(SourceHdl, BlockStart1), - {ok, BSize1} = file:copy(SourceHdl, DestinationHdl, BSize1), - ok = file:sync(DestinationHdl), + {ok, BlockStart1} = file_handle_cache:position(SourceHdl, BlockStart1), + {ok, BSize1} = file_handle_cache:copy(SourceHdl, DestinationHdl, BSize1), + ok = file_handle_cache:sync(DestinationHdl), ok. -close_file(FileName, - State = #msstate { dir = Dir, read_file_handle_cache = HC }) -> - HC1 = rabbit_file_handle_cache:close_file(form_filename(Dir, FileName), HC), - State #msstate { read_file_handle_cache = HC1 }. 
- delete_file_if_empty(File, #msstate { dir = Dir, file_summary = FileSummary }) -> [#file_summary { valid_total_size = ValidData, diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 4b48df82dc..febf3217bd 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -251,7 +251,7 @@ flush_journal(State = #qistate { journal_ack_dict = JAckDict, if JCount1 == 0 -> {Hdl, State4} = get_journal_handle(State3), - ok = file_handle_cache:position(Hdl, bof), + {ok, 0} = file_handle_cache:position(Hdl, bof), ok = file_handle_cache:truncate(Hdl), ok = file_handle_cache:sync(Hdl), State4; @@ -705,7 +705,7 @@ load_segment(SegNum, State = #qistate { seg_num_handles = SegHdls, {Hdl, State1 = #qistate { journal_del_dict = JDelDict, journal_ack_dict = JAckDict }} = get_seg_handle(SegNum, State), - ok = file_handle_cache:position(Hdl, bof), + {ok, 0} = file_handle_cache:position(Hdl, bof), {SDict, PubCount, AckCount, HighRelSeq} = load_segment_entries(Hdl, dict:new(), 0, 0, 0), %% delete ack'd msgs first |
