summary | refs | log | tree | commit | diff
path: root/src
diff options
context:
space:
mode:
author: Matthew Sackman <matthew@lshift.net> 2010-04-18 11:58:35 +0100
committer: Matthew Sackman <matthew@lshift.net> 2010-04-18 11:58:35 +0100
commit: 28c6ed4a02943e19ce9c65f9a856f463f27a79d6 (patch)
tree: 290274a6f8780075213cbc626845fc919bec2c2d /src
parent: 3c5fb288e4fb9533da6143dd30770c2f7194fb70 (diff)
download: rabbitmq-server-git-28c6ed4a02943e19ce9c65f9a856f463f27a79d6.tar.gz
Rewrote scanning of message files to read in blocks of up to 4MB (hence bounded) while still coping with variable-sized files. The advantage here is a vastly reduced number of OS calls to position and read. The result is that, in tests, GC time is reduced from around 35 seconds to about 2. The code is also 7 lines shorter - and arguably simpler - the only tricky bit is reusing leftover data from one block with the next read block, hence the different read and scan offsets.
Diffstat (limited to 'src')
-rw-r--r-- src/rabbit_msg_file.erl 89
-rw-r--r-- src/rabbit_msg_store.erl 23
2 files changed, 54 insertions, 58 deletions
diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl
index 792f0efaba..0391090231 100644
--- a/src/rabbit_msg_file.erl
+++ b/src/rabbit_msg_file.erl
@@ -31,7 +31,7 @@
-module(rabbit_msg_file).
--export([append/3, read/2, scan/1]).
+-export([append/3, read/2, scan/2]).
%%----------------------------------------------------------------------------
@@ -43,6 +43,7 @@
-define(GUID_SIZE_BYTES, 16).
-define(GUID_SIZE_BITS, (8 * ?GUID_SIZE_BYTES)).
-define(SIZE_AND_GUID_BYTES, (?GUID_SIZE_BYTES + ?INTEGER_SIZE_BYTES)).
+-define(FOUR_MEGA_BYTES, 4194304).
%%----------------------------------------------------------------------------
@@ -52,12 +53,13 @@
-type(position() :: non_neg_integer()).
-type(msg_size() :: non_neg_integer()).
+-type(file_size() :: non_neg_integer()).
-spec(append/3 :: (io_device(), guid(), msg()) ->
({'ok', msg_size()} | {'error', any()})).
-spec(read/2 :: (io_device(), msg_size()) ->
({'ok', {guid(), msg()}} | {'error', any()})).
--spec(scan/1 :: (io_device()) ->
+-spec(scan/2 :: (io_device(), file_size()) ->
{'ok', [{guid(), msg_size(), position()}], position()}).
-endif.
@@ -90,51 +92,42 @@ read(FileHdl, TotalSize) ->
KO -> KO
end.
-scan(FileHdl) -> scan(FileHdl, 0, []).
-
-scan(FileHdl, Offset, Acc) ->
- case read_next(FileHdl, Offset) of
- eof -> {ok, Acc, Offset};
- {corrupted, NextOffset} ->
- scan(FileHdl, NextOffset, Acc);
- {ok, {Guid, TotalSize, NextOffset}} ->
- scan(FileHdl, NextOffset, [{Guid, TotalSize, Offset} | Acc]);
- _KO ->
- %% bad message, but we may still have recovered some valid messages
- {ok, Acc, Offset}
+scan(FileHdl, FileSize) when FileSize >= 0 ->
+ scan(FileHdl, FileSize, <<>>, 0, [], 0).
+
+scan(_FileHdl, FileSize, _Data, FileSize, Acc, ScanOffset) ->
+ {ok, Acc, ScanOffset};
+scan(FileHdl, FileSize, Data, ReadOffset, Acc, ScanOffset) ->
+ Read = lists:min([?FOUR_MEGA_BYTES, (FileSize - ReadOffset)]),
+ case file_handle_cache:read(FileHdl, Read) of
+ {ok, Data1} ->
+ {Acc1, ScanOffset1, Data2} =
+ scan(<<Data/binary, Data1/binary>>, Acc, ScanOffset),
+ scan(FileHdl, FileSize, Data2, ReadOffset + iolist_size(Data1),
+ Acc1, ScanOffset1);
+ _KO -> {ok, Acc, ScanOffset}
end.
-read_next(FileHdl, Offset) ->
- case file_handle_cache:read(FileHdl, ?SIZE_AND_GUID_BYTES) of
- %% Here we take option 5 from
- %% http://www.erlang.org/cgi-bin/ezmlm-cgi?2:mss:1569 in which
- %% we read the Guid as a number, and then convert it back to
- %% a binary in order to work around bugs in Erlang's GC.
- {ok, <<Size:?INTEGER_SIZE_BITS, GuidNum:?GUID_SIZE_BITS>>} ->
- case Size of
- 0 -> eof; %% Nothing we can do other than stop
- _ ->
- TotalSize = Size + ?FILE_PACKING_ADJUSTMENT,
- ExpectedAbsPos = Offset + TotalSize - 1,
- case file_handle_cache:position(
- FileHdl, {cur, Size - ?GUID_SIZE_BYTES}) of
- {ok, ExpectedAbsPos} ->
- NextOffset = ExpectedAbsPos + 1,
- case file_handle_cache:read(FileHdl, 1) of
- {ok,
- <<?WRITE_OK_MARKER: ?WRITE_OK_SIZE_BITS>>} ->
- <<Guid:?GUID_SIZE_BYTES/binary>> =
- <<GuidNum:?GUID_SIZE_BITS>>,
- {ok, {Guid, TotalSize, NextOffset}};
- {ok, _SomeOtherData} ->
- {corrupted, NextOffset};
- KO -> KO
- end;
- {ok, _SomeOtherPos} ->
- %% seek failed, so give up
- eof;
- KO -> KO
- end
- end;
- Other -> Other
- end.
+scan(<<>>, Acc, Offset) ->
+ {Acc, Offset, <<>>};
+scan(<<0:?INTEGER_SIZE_BITS, _Rest/binary>>, Acc, Offset) ->
+ {Acc, Offset, <<>>}; %% Nothing to do other than stop.
+scan(<<Size:?INTEGER_SIZE_BITS, GuidAndMsg:Size/binary,
+ WriteMarker:?WRITE_OK_SIZE_BITS, Rest/binary>>, Acc, Offset) ->
+ TotalSize = Size + ?FILE_PACKING_ADJUSTMENT,
+ case WriteMarker of
+ ?WRITE_OK_MARKER ->
+ %% Here we take option 5 from
+ %% http://www.erlang.org/cgi-bin/ezmlm-cgi?2:mss:1569 in
+ %% which we read the Guid as a number, and then convert it
+ %% back to a binary in order to work around bugs in
+ %% Erlang's GC.
+ <<GuidNum:?GUID_SIZE_BITS, _Msg/binary>> =
+ <<GuidAndMsg:Size/binary>>,
+ <<Guid:?GUID_SIZE_BYTES/binary>> = <<GuidNum:?GUID_SIZE_BITS>>,
+ scan(Rest, [{Guid, TotalSize, Offset} | Acc], Offset + TotalSize);
+ _ ->
+ scan(Rest, Acc, Offset + TotalSize)
+ end;
+scan(Data, Acc, Offset) ->
+ {Acc, Offset, Data}.
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 9048619265..eb3a5db0a5 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -1077,7 +1077,8 @@ filenum_to_name(File) -> integer_to_list(File) ++ ?FILE_EXTENSION.
scan_file_for_valid_messages(Dir, FileName) ->
case open_file(Dir, FileName, ?READ_MODE) of
{ok, Hdl} ->
- Valid = rabbit_msg_file:scan(Hdl),
+ Size = filelib:file_size(form_filename(Dir, FileName)),
+ Valid = rabbit_msg_file:scan(Hdl, Size),
%% if something really bad's happened, the close could fail,
%% but ignore
file_handle_cache:close(Hdl),
@@ -1442,7 +1443,7 @@ maybe_compact(State = #msstate { sum_valid_data = SumValid,
gc_active = false,
gc_pid = GCPid,
file_summary_ets = FileSummaryEts })
- when SumValid > ?FILE_SIZE_LIMIT andalso
+ when SumFileSize > 3 * ?FILE_SIZE_LIMIT andalso
(SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION ->
First = ets:first(FileSummaryEts),
N = rabbit_misc:ceil(math:log(1.0 - random:uniform()) /
@@ -1543,7 +1544,6 @@ delete_file_if_empty(File, State =
%%----------------------------------------------------------------------------
gc(SourceFile, DestFile, State = {FileSummaryEts, _Dir, _Index, _IndexState}) ->
-
[SourceObj = #file_summary {
readers = SourceReaders,
valid_total_size = SourceValidData, left = DestFile,
@@ -1597,6 +1597,8 @@ combine_files(#file_summary { file = Source,
ok = truncate_and_extend_file(
DestinationHdl, DestinationValid, ExpectedSize);
true ->
+ {DestinationWorkList, DestinationValid} =
+ find_unremoved_messages_in_file(Destination, State),
Worklist =
lists:dropwhile(
fun (#msg_location { offset = Offset })
@@ -1610,8 +1612,7 @@ combine_files(#file_summary { file = Source,
%% that the list should be naturally sorted
%% as we require, however, we need to
%% enforce it anyway
- end,
- find_unremoved_messages_in_file(Destination, State)),
+ end, DestinationWorkList),
Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP,
{ok, TmpHdl} = open_file(
Dir, Tmp, ?READ_AHEAD_MODE ++ ?WRITE_MODE),
@@ -1633,7 +1634,7 @@ combine_files(#file_summary { file = Source,
ok = file_handle_cache:close(TmpHdl),
ok = file:delete(form_filename(Dir, Tmp))
end,
- SourceWorkList = find_unremoved_messages_in_file(Source, State),
+ {SourceWorkList, SourceValid} = find_unremoved_messages_in_file(Source, State),
ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize,
SourceHdl, DestinationHdl, Destination, State),
%% tidy up
@@ -1649,12 +1650,14 @@ find_unremoved_messages_in_file(File,
scan_file_for_valid_messages(Dir, filenum_to_name(File)),
%% foldl will reverse so will end up with msgs in ascending offset order
lists:foldl(
- fun ({Guid, _TotalSize, _Offset}, Acc) ->
+ fun ({Guid, TotalSize, _Offset}, Acc = {List, Size}) ->
case Index:lookup(Guid, IndexState) of
- Entry = #msg_location { file = File } -> [ Entry | Acc ];
- _ -> Acc
+ Entry = #msg_location { file = File } ->
+ {[ Entry | List ], TotalSize + Size};
+ _ ->
+ Acc
end
- end, [], Messages).
+ end, {[], 0}, Messages).
copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
Destination, {_FileSummaryEts, _Dir, Index, IndexState}) ->