diff options
| author | Matthew Sackman <matthew@lshift.net> | 2010-04-18 11:58:35 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2010-04-18 11:58:35 +0100 |
| commit | 28c6ed4a02943e19ce9c65f9a856f463f27a79d6 (patch) | |
| tree | 290274a6f8780075213cbc626845fc919bec2c2d /src | |
| parent | 3c5fb288e4fb9533da6143dd30770c2f7194fb70 (diff) | |
| download | rabbitmq-server-git-28c6ed4a02943e19ce9c65f9a856f463f27a79d6.tar.gz | |
Rewrote scanning of message files to read in blocks of up to 4MB (hence bounded) while still coping with variable-sized files. The advantage here is a vastly reduced number of OS calls to position and read. The result is that, in tests, GC time is reduced from around 35 seconds to about 2. The code is also 7 lines shorter - and arguably simpler - the only tricky bit is reusing leftover data from one block with the next read block, hence the different read and scan offsets.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_msg_file.erl | 89 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 23 |
2 files changed, 54 insertions, 58 deletions
diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl index 792f0efaba..0391090231 100644 --- a/src/rabbit_msg_file.erl +++ b/src/rabbit_msg_file.erl @@ -31,7 +31,7 @@ -module(rabbit_msg_file). --export([append/3, read/2, scan/1]). +-export([append/3, read/2, scan/2]). %%---------------------------------------------------------------------------- @@ -43,6 +43,7 @@ -define(GUID_SIZE_BYTES, 16). -define(GUID_SIZE_BITS, (8 * ?GUID_SIZE_BYTES)). -define(SIZE_AND_GUID_BYTES, (?GUID_SIZE_BYTES + ?INTEGER_SIZE_BYTES)). +-define(FOUR_MEGA_BYTES, 4194304). %%---------------------------------------------------------------------------- @@ -52,12 +53,13 @@ -type(position() :: non_neg_integer()). -type(msg_size() :: non_neg_integer()). +-type(file_size() :: non_neg_integer()). -spec(append/3 :: (io_device(), guid(), msg()) -> ({'ok', msg_size()} | {'error', any()})). -spec(read/2 :: (io_device(), msg_size()) -> ({'ok', {guid(), msg()}} | {'error', any()})). --spec(scan/1 :: (io_device()) -> +-spec(scan/2 :: (io_device(), file_size()) -> {'ok', [{guid(), msg_size(), position()}], position()}). -endif. @@ -90,51 +92,42 @@ read(FileHdl, TotalSize) -> KO -> KO end. -scan(FileHdl) -> scan(FileHdl, 0, []). - -scan(FileHdl, Offset, Acc) -> - case read_next(FileHdl, Offset) of - eof -> {ok, Acc, Offset}; - {corrupted, NextOffset} -> - scan(FileHdl, NextOffset, Acc); - {ok, {Guid, TotalSize, NextOffset}} -> - scan(FileHdl, NextOffset, [{Guid, TotalSize, Offset} | Acc]); - _KO -> - %% bad message, but we may still have recovered some valid messages - {ok, Acc, Offset} +scan(FileHdl, FileSize) when FileSize >= 0 -> + scan(FileHdl, FileSize, <<>>, 0, [], 0). 
+ +scan(_FileHdl, FileSize, _Data, FileSize, Acc, ScanOffset) -> + {ok, Acc, ScanOffset}; +scan(FileHdl, FileSize, Data, ReadOffset, Acc, ScanOffset) -> + Read = lists:min([?FOUR_MEGA_BYTES, (FileSize - ReadOffset)]), + case file_handle_cache:read(FileHdl, Read) of + {ok, Data1} -> + {Acc1, ScanOffset1, Data2} = + scan(<<Data/binary, Data1/binary>>, Acc, ScanOffset), + scan(FileHdl, FileSize, Data2, ReadOffset + iolist_size(Data1), + Acc1, ScanOffset1); + _KO -> {ok, Acc, ScanOffset} end. -read_next(FileHdl, Offset) -> - case file_handle_cache:read(FileHdl, ?SIZE_AND_GUID_BYTES) of - %% Here we take option 5 from - %% http://www.erlang.org/cgi-bin/ezmlm-cgi?2:mss:1569 in which - %% we read the Guid as a number, and then convert it back to - %% a binary in order to work around bugs in Erlang's GC. - {ok, <<Size:?INTEGER_SIZE_BITS, GuidNum:?GUID_SIZE_BITS>>} -> - case Size of - 0 -> eof; %% Nothing we can do other than stop - _ -> - TotalSize = Size + ?FILE_PACKING_ADJUSTMENT, - ExpectedAbsPos = Offset + TotalSize - 1, - case file_handle_cache:position( - FileHdl, {cur, Size - ?GUID_SIZE_BYTES}) of - {ok, ExpectedAbsPos} -> - NextOffset = ExpectedAbsPos + 1, - case file_handle_cache:read(FileHdl, 1) of - {ok, - <<?WRITE_OK_MARKER: ?WRITE_OK_SIZE_BITS>>} -> - <<Guid:?GUID_SIZE_BYTES/binary>> = - <<GuidNum:?GUID_SIZE_BITS>>, - {ok, {Guid, TotalSize, NextOffset}}; - {ok, _SomeOtherData} -> - {corrupted, NextOffset}; - KO -> KO - end; - {ok, _SomeOtherPos} -> - %% seek failed, so give up - eof; - KO -> KO - end - end; - Other -> Other - end. +scan(<<>>, Acc, Offset) -> + {Acc, Offset, <<>>}; +scan(<<0:?INTEGER_SIZE_BITS, _Rest/binary>>, Acc, Offset) -> + {Acc, Offset, <<>>}; %% Nothing to do other than stop. 
+scan(<<Size:?INTEGER_SIZE_BITS, GuidAndMsg:Size/binary, + WriteMarker:?WRITE_OK_SIZE_BITS, Rest/binary>>, Acc, Offset) -> + TotalSize = Size + ?FILE_PACKING_ADJUSTMENT, + case WriteMarker of + ?WRITE_OK_MARKER -> + %% Here we take option 5 from + %% http://www.erlang.org/cgi-bin/ezmlm-cgi?2:mss:1569 in + %% which we read the Guid as a number, and then convert it + %% back to a binary in order to work around bugs in + %% Erlang's GC. + <<GuidNum:?GUID_SIZE_BITS, _Msg/binary>> = + <<GuidAndMsg:Size/binary>>, + <<Guid:?GUID_SIZE_BYTES/binary>> = <<GuidNum:?GUID_SIZE_BITS>>, + scan(Rest, [{Guid, TotalSize, Offset} | Acc], Offset + TotalSize); + _ -> + scan(Rest, Acc, Offset + TotalSize) + end; +scan(Data, Acc, Offset) -> + {Acc, Offset, Data}. diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 9048619265..eb3a5db0a5 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -1077,7 +1077,8 @@ filenum_to_name(File) -> integer_to_list(File) ++ ?FILE_EXTENSION. 
scan_file_for_valid_messages(Dir, FileName) -> case open_file(Dir, FileName, ?READ_MODE) of {ok, Hdl} -> - Valid = rabbit_msg_file:scan(Hdl), + Size = filelib:file_size(form_filename(Dir, FileName)), + Valid = rabbit_msg_file:scan(Hdl, Size), %% if something really bad's happened, the close could fail, %% but ignore file_handle_cache:close(Hdl), @@ -1442,7 +1443,7 @@ maybe_compact(State = #msstate { sum_valid_data = SumValid, gc_active = false, gc_pid = GCPid, file_summary_ets = FileSummaryEts }) - when SumValid > ?FILE_SIZE_LIMIT andalso + when SumFileSize > 3 * ?FILE_SIZE_LIMIT andalso (SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION -> First = ets:first(FileSummaryEts), N = rabbit_misc:ceil(math:log(1.0 - random:uniform()) / @@ -1543,7 +1544,6 @@ delete_file_if_empty(File, State = %%---------------------------------------------------------------------------- gc(SourceFile, DestFile, State = {FileSummaryEts, _Dir, _Index, _IndexState}) -> - [SourceObj = #file_summary { readers = SourceReaders, valid_total_size = SourceValidData, left = DestFile, @@ -1597,6 +1597,8 @@ combine_files(#file_summary { file = Source, ok = truncate_and_extend_file( DestinationHdl, DestinationValid, ExpectedSize); true -> + {DestinationWorkList, DestinationValid} = + find_unremoved_messages_in_file(Destination, State), Worklist = lists:dropwhile( fun (#msg_location { offset = Offset }) @@ -1610,8 +1612,7 @@ combine_files(#file_summary { file = Source, %% that the list should be naturally sorted %% as we require, however, we need to %% enforce it anyway - end, - find_unremoved_messages_in_file(Destination, State)), + end, DestinationWorkList), Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP, {ok, TmpHdl} = open_file( Dir, Tmp, ?READ_AHEAD_MODE ++ ?WRITE_MODE), @@ -1633,7 +1634,7 @@ combine_files(#file_summary { file = Source, ok = file_handle_cache:close(TmpHdl), ok = file:delete(form_filename(Dir, Tmp)) end, - SourceWorkList = 
find_unremoved_messages_in_file(Source, State), + {SourceWorkList, SourceValid} = find_unremoved_messages_in_file(Source, State), ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize, SourceHdl, DestinationHdl, Destination, State), %% tidy up @@ -1649,12 +1650,14 @@ find_unremoved_messages_in_file(File, scan_file_for_valid_messages(Dir, filenum_to_name(File)), %% foldl will reverse so will end up with msgs in ascending offset order lists:foldl( - fun ({Guid, _TotalSize, _Offset}, Acc) -> + fun ({Guid, TotalSize, _Offset}, Acc = {List, Size}) -> case Index:lookup(Guid, IndexState) of - Entry = #msg_location { file = File } -> [ Entry | Acc ]; - _ -> Acc + Entry = #msg_location { file = File } -> + {[ Entry | List ], TotalSize + Size}; + _ -> + Acc end - end, [], Messages). + end, {[], 0}, Messages). copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, Destination, {_FileSummaryEts, _Dir, Index, IndexState}) -> |
