diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_msg_file.erl | 89 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 23 |
2 files changed, 54 insertions, 58 deletions
diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl index 792f0efaba..0391090231 100644 --- a/src/rabbit_msg_file.erl +++ b/src/rabbit_msg_file.erl @@ -31,7 +31,7 @@ -module(rabbit_msg_file). --export([append/3, read/2, scan/1]). +-export([append/3, read/2, scan/2]). %%---------------------------------------------------------------------------- @@ -43,6 +43,7 @@ -define(GUID_SIZE_BYTES, 16). -define(GUID_SIZE_BITS, (8 * ?GUID_SIZE_BYTES)). -define(SIZE_AND_GUID_BYTES, (?GUID_SIZE_BYTES + ?INTEGER_SIZE_BYTES)). +-define(FOUR_MEGA_BYTES, 4194304). %%---------------------------------------------------------------------------- @@ -52,12 +53,13 @@ -type(position() :: non_neg_integer()). -type(msg_size() :: non_neg_integer()). +-type(file_size() :: non_neg_integer()). -spec(append/3 :: (io_device(), guid(), msg()) -> ({'ok', msg_size()} | {'error', any()})). -spec(read/2 :: (io_device(), msg_size()) -> ({'ok', {guid(), msg()}} | {'error', any()})). --spec(scan/1 :: (io_device()) -> +-spec(scan/2 :: (io_device(), file_size()) -> {'ok', [{guid(), msg_size(), position()}], position()}). -endif. @@ -90,51 +92,42 @@ read(FileHdl, TotalSize) -> KO -> KO end. -scan(FileHdl) -> scan(FileHdl, 0, []). - -scan(FileHdl, Offset, Acc) -> - case read_next(FileHdl, Offset) of - eof -> {ok, Acc, Offset}; - {corrupted, NextOffset} -> - scan(FileHdl, NextOffset, Acc); - {ok, {Guid, TotalSize, NextOffset}} -> - scan(FileHdl, NextOffset, [{Guid, TotalSize, Offset} | Acc]); - _KO -> - %% bad message, but we may still have recovered some valid messages - {ok, Acc, Offset} +scan(FileHdl, FileSize) when FileSize >= 0 -> + scan(FileHdl, FileSize, <<>>, 0, [], 0). + +scan(_FileHdl, FileSize, _Data, FileSize, Acc, ScanOffset) -> + {ok, Acc, ScanOffset}; +scan(FileHdl, FileSize, Data, ReadOffset, Acc, ScanOffset) -> + Read = lists:min([?FOUR_MEGA_BYTES, (FileSize - ReadOffset)]), + case file_handle_cache:read(FileHdl, Read) of + {ok, Data1} -> + {Acc1, ScanOffset1, Data2} = + scan(<<Data/binary, Data1/binary>>, Acc, ScanOffset), + scan(FileHdl, FileSize, Data2, ReadOffset + iolist_size(Data1), + Acc1, ScanOffset1); + _KO -> {ok, Acc, ScanOffset} end. -read_next(FileHdl, Offset) -> - case file_handle_cache:read(FileHdl, ?SIZE_AND_GUID_BYTES) of - %% Here we take option 5 from - %% http://www.erlang.org/cgi-bin/ezmlm-cgi?2:mss:1569 in which - %% we read the Guid as a number, and then convert it back to - %% a binary in order to work around bugs in Erlang's GC. - {ok, <<Size:?INTEGER_SIZE_BITS, GuidNum:?GUID_SIZE_BITS>>} -> - case Size of - 0 -> eof; %% Nothing we can do other than stop - _ -> - TotalSize = Size + ?FILE_PACKING_ADJUSTMENT, - ExpectedAbsPos = Offset + TotalSize - 1, - case file_handle_cache:position( - FileHdl, {cur, Size - ?GUID_SIZE_BYTES}) of - {ok, ExpectedAbsPos} -> - NextOffset = ExpectedAbsPos + 1, - case file_handle_cache:read(FileHdl, 1) of - {ok, - <<?WRITE_OK_MARKER: ?WRITE_OK_SIZE_BITS>>} -> - <<Guid:?GUID_SIZE_BYTES/binary>> = - <<GuidNum:?GUID_SIZE_BITS>>, - {ok, {Guid, TotalSize, NextOffset}}; - {ok, _SomeOtherData} -> - {corrupted, NextOffset}; - KO -> KO - end; - {ok, _SomeOtherPos} -> - %% seek failed, so give up - eof; - KO -> KO - end - end; - Other -> Other - end. +scan(<<>>, Acc, Offset) -> + {Acc, Offset, <<>>}; +scan(<<0:?INTEGER_SIZE_BITS, _Rest/binary>>, Acc, Offset) -> + {Acc, Offset, <<>>}; %% Nothing to do other than stop. +scan(<<Size:?INTEGER_SIZE_BITS, GuidAndMsg:Size/binary, + WriteMarker:?WRITE_OK_SIZE_BITS, Rest/binary>>, Acc, Offset) -> + TotalSize = Size + ?FILE_PACKING_ADJUSTMENT, + case WriteMarker of + ?WRITE_OK_MARKER -> + %% Here we take option 5 from + %% http://www.erlang.org/cgi-bin/ezmlm-cgi?2:mss:1569 in + %% which we read the Guid as a number, and then convert it + %% back to a binary in order to work around bugs in + %% Erlang's GC. + <<GuidNum:?GUID_SIZE_BITS, _Msg/binary>> = + <<GuidAndMsg:Size/binary>>, + <<Guid:?GUID_SIZE_BYTES/binary>> = <<GuidNum:?GUID_SIZE_BITS>>, + scan(Rest, [{Guid, TotalSize, Offset} | Acc], Offset + TotalSize); + _ -> + scan(Rest, Acc, Offset + TotalSize) + end; +scan(Data, Acc, Offset) -> + {Acc, Offset, Data}. diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 9048619265..eb3a5db0a5 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -1077,7 +1077,8 @@ filenum_to_name(File) -> integer_to_list(File) ++ ?FILE_EXTENSION. scan_file_for_valid_messages(Dir, FileName) -> case open_file(Dir, FileName, ?READ_MODE) of {ok, Hdl} -> - Valid = rabbit_msg_file:scan(Hdl), + Size = filelib:file_size(form_filename(Dir, FileName)), + Valid = rabbit_msg_file:scan(Hdl, Size), %% if something really bad's happened, the close could fail, %% but ignore file_handle_cache:close(Hdl), @@ -1442,7 +1443,7 @@ maybe_compact(State = #msstate { sum_valid_data = SumValid, gc_active = false, gc_pid = GCPid, file_summary_ets = FileSummaryEts }) - when SumValid > ?FILE_SIZE_LIMIT andalso + when SumFileSize > 3 * ?FILE_SIZE_LIMIT andalso (SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION -> First = ets:first(FileSummaryEts), N = rabbit_misc:ceil(math:log(1.0 - random:uniform()) / @@ -1543,7 +1544,6 @@ delete_file_if_empty(File, State = %%---------------------------------------------------------------------------- gc(SourceFile, DestFile, State = {FileSummaryEts, _Dir, _Index, _IndexState}) -> - [SourceObj = #file_summary { readers = SourceReaders, valid_total_size = SourceValidData, left = DestFile, @@ -1597,6 +1597,8 @@ combine_files(#file_summary { file = Source, ok = truncate_and_extend_file( DestinationHdl, DestinationValid, ExpectedSize); true -> + {DestinationWorkList, DestinationValid} = + find_unremoved_messages_in_file(Destination, State), Worklist = lists:dropwhile( fun (#msg_location { offset = Offset }) @@ -1610,8 +1612,7 @@ combine_files(#file_summary { file = Source, %% that the list should be naturally sorted %% as we require, however, we need to %% enforce it anyway - end, - find_unremoved_messages_in_file(Destination, State)), + end, DestinationWorkList), Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP, {ok, TmpHdl} = open_file( Dir, Tmp, ?READ_AHEAD_MODE ++ ?WRITE_MODE), @@ -1633,7 +1634,7 @@ combine_files(#file_summary { file = Source, ok = file_handle_cache:close(TmpHdl), ok = file:delete(form_filename(Dir, Tmp)) end, - SourceWorkList = find_unremoved_messages_in_file(Source, State), + {SourceWorkList, SourceValid} = find_unremoved_messages_in_file(Source, State), ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize, SourceHdl, DestinationHdl, Destination, State), %% tidy up @@ -1649,12 +1650,14 @@ find_unremoved_messages_in_file(File, scan_file_for_valid_messages(Dir, filenum_to_name(File)), %% foldl will reverse so will end up with msgs in ascending offset order lists:foldl( - fun ({Guid, _TotalSize, _Offset}, Acc) -> + fun ({Guid, TotalSize, _Offset}, Acc = {List, Size}) -> case Index:lookup(Guid, IndexState) of - Entry = #msg_location { file = File } -> [ Entry | Acc ]; - _ -> Acc + Entry = #msg_location { file = File } -> + {[ Entry | List ], TotalSize + Size}; + _ -> + Acc end - end, [], Messages). + end, {[], 0}, Messages). copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, Destination, {_FileSummaryEts, _Dir, Index, IndexState}) -> |
