summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_msg_file.erl89
-rw-r--r--src/rabbit_msg_store.erl23
2 files changed, 54 insertions, 58 deletions
diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl
index 792f0efaba..0391090231 100644
--- a/src/rabbit_msg_file.erl
+++ b/src/rabbit_msg_file.erl
@@ -31,7 +31,7 @@
-module(rabbit_msg_file).
--export([append/3, read/2, scan/1]).
+-export([append/3, read/2, scan/2]).
%%----------------------------------------------------------------------------
@@ -43,6 +43,7 @@
-define(GUID_SIZE_BYTES, 16).
-define(GUID_SIZE_BITS, (8 * ?GUID_SIZE_BYTES)).
-define(SIZE_AND_GUID_BYTES, (?GUID_SIZE_BYTES + ?INTEGER_SIZE_BYTES)).
+-define(FOUR_MEGA_BYTES, 4194304).
%%----------------------------------------------------------------------------
@@ -52,12 +53,13 @@
-type(position() :: non_neg_integer()).
-type(msg_size() :: non_neg_integer()).
+-type(file_size() :: non_neg_integer()).
-spec(append/3 :: (io_device(), guid(), msg()) ->
({'ok', msg_size()} | {'error', any()})).
-spec(read/2 :: (io_device(), msg_size()) ->
({'ok', {guid(), msg()}} | {'error', any()})).
--spec(scan/1 :: (io_device()) ->
+-spec(scan/2 :: (io_device(), file_size()) ->
{'ok', [{guid(), msg_size(), position()}], position()}).
-endif.
@@ -90,51 +92,42 @@ read(FileHdl, TotalSize) ->
KO -> KO
end.
-scan(FileHdl) -> scan(FileHdl, 0, []).
-
-scan(FileHdl, Offset, Acc) ->
- case read_next(FileHdl, Offset) of
- eof -> {ok, Acc, Offset};
- {corrupted, NextOffset} ->
- scan(FileHdl, NextOffset, Acc);
- {ok, {Guid, TotalSize, NextOffset}} ->
- scan(FileHdl, NextOffset, [{Guid, TotalSize, Offset} | Acc]);
- _KO ->
- %% bad message, but we may still have recovered some valid messages
- {ok, Acc, Offset}
+scan(FileHdl, FileSize) when FileSize >= 0 ->
+ scan(FileHdl, FileSize, <<>>, 0, [], 0).
+
+scan(_FileHdl, FileSize, _Data, FileSize, Acc, ScanOffset) ->
+ {ok, Acc, ScanOffset};
+scan(FileHdl, FileSize, Data, ReadOffset, Acc, ScanOffset) ->
+ Read = lists:min([?FOUR_MEGA_BYTES, (FileSize - ReadOffset)]),
+ case file_handle_cache:read(FileHdl, Read) of
+ {ok, Data1} ->
+ {Acc1, ScanOffset1, Data2} =
+ scan(<<Data/binary, Data1/binary>>, Acc, ScanOffset),
+ scan(FileHdl, FileSize, Data2, ReadOffset + iolist_size(Data1),
+ Acc1, ScanOffset1);
+ _KO -> {ok, Acc, ScanOffset}
end.
-read_next(FileHdl, Offset) ->
- case file_handle_cache:read(FileHdl, ?SIZE_AND_GUID_BYTES) of
- %% Here we take option 5 from
- %% http://www.erlang.org/cgi-bin/ezmlm-cgi?2:mss:1569 in which
- %% we read the Guid as a number, and then convert it back to
- %% a binary in order to work around bugs in Erlang's GC.
- {ok, <<Size:?INTEGER_SIZE_BITS, GuidNum:?GUID_SIZE_BITS>>} ->
- case Size of
- 0 -> eof; %% Nothing we can do other than stop
- _ ->
- TotalSize = Size + ?FILE_PACKING_ADJUSTMENT,
- ExpectedAbsPos = Offset + TotalSize - 1,
- case file_handle_cache:position(
- FileHdl, {cur, Size - ?GUID_SIZE_BYTES}) of
- {ok, ExpectedAbsPos} ->
- NextOffset = ExpectedAbsPos + 1,
- case file_handle_cache:read(FileHdl, 1) of
- {ok,
- <<?WRITE_OK_MARKER: ?WRITE_OK_SIZE_BITS>>} ->
- <<Guid:?GUID_SIZE_BYTES/binary>> =
- <<GuidNum:?GUID_SIZE_BITS>>,
- {ok, {Guid, TotalSize, NextOffset}};
- {ok, _SomeOtherData} ->
- {corrupted, NextOffset};
- KO -> KO
- end;
- {ok, _SomeOtherPos} ->
- %% seek failed, so give up
- eof;
- KO -> KO
- end
- end;
- Other -> Other
- end.
+scan(<<>>, Acc, Offset) ->
+ {Acc, Offset, <<>>};
+scan(<<0:?INTEGER_SIZE_BITS, _Rest/binary>>, Acc, Offset) ->
+ {Acc, Offset, <<>>}; %% Nothing to do other than stop.
+scan(<<Size:?INTEGER_SIZE_BITS, GuidAndMsg:Size/binary,
+ WriteMarker:?WRITE_OK_SIZE_BITS, Rest/binary>>, Acc, Offset) ->
+ TotalSize = Size + ?FILE_PACKING_ADJUSTMENT,
+ case WriteMarker of
+ ?WRITE_OK_MARKER ->
+ %% Here we take option 5 from
+ %% http://www.erlang.org/cgi-bin/ezmlm-cgi?2:mss:1569 in
+ %% which we read the Guid as a number, and then convert it
+ %% back to a binary in order to work around bugs in
+ %% Erlang's GC.
+ <<GuidNum:?GUID_SIZE_BITS, _Msg/binary>> =
+ <<GuidAndMsg:Size/binary>>,
+ <<Guid:?GUID_SIZE_BYTES/binary>> = <<GuidNum:?GUID_SIZE_BITS>>,
+ scan(Rest, [{Guid, TotalSize, Offset} | Acc], Offset + TotalSize);
+ _ ->
+ scan(Rest, Acc, Offset + TotalSize)
+ end;
+scan(Data, Acc, Offset) ->
+ {Acc, Offset, Data}.
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 9048619265..eb3a5db0a5 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -1077,7 +1077,8 @@ filenum_to_name(File) -> integer_to_list(File) ++ ?FILE_EXTENSION.
scan_file_for_valid_messages(Dir, FileName) ->
case open_file(Dir, FileName, ?READ_MODE) of
{ok, Hdl} ->
- Valid = rabbit_msg_file:scan(Hdl),
+ Size = filelib:file_size(form_filename(Dir, FileName)),
+ Valid = rabbit_msg_file:scan(Hdl, Size),
%% if something really bad's happened, the close could fail,
%% but ignore
file_handle_cache:close(Hdl),
@@ -1442,7 +1443,7 @@ maybe_compact(State = #msstate { sum_valid_data = SumValid,
gc_active = false,
gc_pid = GCPid,
file_summary_ets = FileSummaryEts })
- when SumValid > ?FILE_SIZE_LIMIT andalso
+ when SumFileSize > 3 * ?FILE_SIZE_LIMIT andalso
(SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION ->
First = ets:first(FileSummaryEts),
N = rabbit_misc:ceil(math:log(1.0 - random:uniform()) /
@@ -1543,7 +1544,6 @@ delete_file_if_empty(File, State =
%%----------------------------------------------------------------------------
gc(SourceFile, DestFile, State = {FileSummaryEts, _Dir, _Index, _IndexState}) ->
-
[SourceObj = #file_summary {
readers = SourceReaders,
valid_total_size = SourceValidData, left = DestFile,
@@ -1597,6 +1597,8 @@ combine_files(#file_summary { file = Source,
ok = truncate_and_extend_file(
DestinationHdl, DestinationValid, ExpectedSize);
true ->
+ {DestinationWorkList, DestinationValid} =
+ find_unremoved_messages_in_file(Destination, State),
Worklist =
lists:dropwhile(
fun (#msg_location { offset = Offset })
@@ -1610,8 +1612,7 @@ combine_files(#file_summary { file = Source,
%% that the list should be naturally sorted
%% as we require, however, we need to
%% enforce it anyway
- end,
- find_unremoved_messages_in_file(Destination, State)),
+ end, DestinationWorkList),
Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP,
{ok, TmpHdl} = open_file(
Dir, Tmp, ?READ_AHEAD_MODE ++ ?WRITE_MODE),
@@ -1633,7 +1634,7 @@ combine_files(#file_summary { file = Source,
ok = file_handle_cache:close(TmpHdl),
ok = file:delete(form_filename(Dir, Tmp))
end,
- SourceWorkList = find_unremoved_messages_in_file(Source, State),
+ {SourceWorkList, SourceValid} = find_unremoved_messages_in_file(Source, State),
ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize,
SourceHdl, DestinationHdl, Destination, State),
%% tidy up
@@ -1649,12 +1650,14 @@ find_unremoved_messages_in_file(File,
scan_file_for_valid_messages(Dir, filenum_to_name(File)),
%% foldl will reverse so will end up with msgs in ascending offset order
lists:foldl(
- fun ({Guid, _TotalSize, _Offset}, Acc) ->
+ fun ({Guid, TotalSize, _Offset}, Acc = {List, Size}) ->
case Index:lookup(Guid, IndexState) of
- Entry = #msg_location { file = File } -> [ Entry | Acc ];
- _ -> Acc
+ Entry = #msg_location { file = File } ->
+ {[ Entry | List ], TotalSize + Size};
+ _ ->
+ Acc
end
- end, [], Messages).
+ end, {[], 0}, Messages).
copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
Destination, {_FileSummaryEts, _Dir, Index, IndexState}) ->