diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2010-06-17 17:12:58 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-06-17 17:12:58 +0100 |
| commit | cb148d3f4da962a2e9a205b70c34d313d5f62482 (patch) | |
| tree | 829a6b4d9d54e7534cd953765a09d44c516b4999 | |
| parent | a976be13513623ede58ba24263b80917078b052d (diff) | |
| download | rabbitmq-server-git-cb148d3f4da962a2e9a205b70c34d313d5f62482.tar.gz | |
Fix msg_file scan block size at 4MB
| -rw-r--r-- | src/rabbit_msg_file.erl | 32 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 50 | ||||
| -rw-r--r-- | src/rabbit_msg_store_gc.erl | 18 |
3 files changed, 43 insertions, 57 deletions
diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl index dd0579e9d6..51d875acaf 100644 --- a/src/rabbit_msg_file.erl +++ b/src/rabbit_msg_file.erl @@ -31,7 +31,7 @@ -module(rabbit_msg_file). --export([append/3, read/2, scan/3]). +-export([append/3, read/2, scan/2]). %%---------------------------------------------------------------------------- @@ -44,7 +44,7 @@ -define(FILE_PACKING_ADJUSTMENT, (1 + ?INTEGER_SIZE_BYTES)). -define(GUID_SIZE_BYTES, 16). -define(GUID_SIZE_BITS, (8 * ?GUID_SIZE_BYTES)). --define(SCAN_BLOCK_SIZE(LIM), (LIM div 4)). +-define(SCAN_BLOCK_SIZE, 4194304). %% 4MB %%---------------------------------------------------------------------------- @@ -59,7 +59,7 @@ ({'ok', msg_size()} | {'error', any()})). -spec(read/2 :: (io_device(), msg_size()) -> ({'ok', {guid(), msg()}} | {'error', any()})). --spec(scan/3 :: (io_device(), file_size(), file_size()) -> +-spec(scan/2 :: (io_device(), file_size()) -> {'ok', [{guid(), msg_size(), position()}], position()}). -endif. @@ -92,29 +92,29 @@ read(FileHdl, TotalSize) -> KO -> KO end. -scan(FileHdl, FileSize, FileSizeLim) when FileSize >= 0 -> - scan(FileHdl, FileSize, FileSizeLim, <<>>, 0, [], 0). +scan(FileHdl, FileSize) when FileSize >= 0 -> + scan(FileHdl, FileSize, <<>>, 0, [], 0). -scan(_FileHdl, FileSize, _FileSizeLim, _Data, FileSize, Acc, ScanOffset) -> +scan(_FileHdl, FileSize, _Data, FileSize, Acc, ScanOffset) -> {ok, Acc, ScanOffset}; -scan(FileHdl, FileSize, FileSizeLim, Data, ReadOffset, Acc, ScanOffset) -> - Read = lists:min([?SCAN_BLOCK_SIZE(FileSizeLim), (FileSize - ReadOffset)]), +scan(FileHdl, FileSize, Data, ReadOffset, Acc, ScanOffset) -> + Read = lists:min([?SCAN_BLOCK_SIZE, (FileSize - ReadOffset)]), case file_handle_cache:read(FileHdl, Read) of {ok, Data1} -> {Data2, Acc1, ScanOffset1} = - scan1(<<Data/binary, Data1/binary>>, Acc, ScanOffset), + scan(<<Data/binary, Data1/binary>>, Acc, ScanOffset), ReadOffset1 = ReadOffset + size(Data1), - scan(FileHdl, FileSize, FileSizeLim, Data2, ReadOffset1, Acc1, + scan(FileHdl, FileSize, Data2, ReadOffset1, Acc1, ScanOffset1); _KO -> {ok, Acc, ScanOffset} end. -scan1(<<>>, Acc, Offset) -> +scan(<<>>, Acc, Offset) -> {<<>>, Acc, Offset}; -scan1(<<0:?INTEGER_SIZE_BITS, _Rest/binary>>, Acc, Offset) -> +scan(<<0:?INTEGER_SIZE_BITS, _Rest/binary>>, Acc, Offset) -> {<<>>, Acc, Offset}; %% Nothing to do other than stop. -scan1(<<Size:?INTEGER_SIZE_BITS, GuidAndMsg:Size/binary, +scan(<<Size:?INTEGER_SIZE_BITS, GuidAndMsg:Size/binary, WriteMarker:?WRITE_OK_SIZE_BITS, Rest/binary>>, Acc, Offset) -> TotalSize = Size + ?FILE_PACKING_ADJUSTMENT, case WriteMarker of @@ -127,9 +127,9 @@ scan1(<<Size:?INTEGER_SIZE_BITS, GuidAndMsg:Size/binary, <<GuidNum:?GUID_SIZE_BITS, _Msg/binary>> = <<GuidAndMsg:Size/binary>>, <<Guid:?GUID_SIZE_BYTES/binary>> = <<GuidNum:?GUID_SIZE_BITS>>, - scan1(Rest, [{Guid, TotalSize, Offset} | Acc], Offset + TotalSize); + scan(Rest, [{Guid, TotalSize, Offset} | Acc], Offset + TotalSize); _ -> - scan1(Rest, Acc, Offset + TotalSize) + scan(Rest, Acc, Offset + TotalSize) end; -scan1(Data, Acc, Offset) -> +scan(Data, Acc, Offset) -> {Data, Acc, Offset}. diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index c28302dda2..0f3f57a0f4 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -107,7 +107,6 @@ -type(server() :: pid() | atom()). -type(file_num() :: non_neg_integer()). --type(file_size() :: non_neg_integer()). -type(client_msstate() :: #client_msstate { file_handle_cache :: dict(), index_state :: any(), index_module :: atom(), @@ -141,7 +140,7 @@ -spec(successfully_recovered_state/1 :: (server()) -> boolean()). -spec(gc/3 :: (non_neg_integer(), non_neg_integer(), - {tid(), file_path(), atom(), any(), file_size()}) -> + {tid(), file_path(), atom(), any()}) -> 'concurrent_readers' | non_neg_integer()). -endif. @@ -553,8 +552,7 @@ init([Server, BaseDir, ClientRefs, {MsgRefDeltaGen, MsgRefDeltaGenInit}]) -> sort_file_names(filelib:wildcard("*" ++ ?FILE_EXTENSION, Dir)), TmpFileNames = sort_file_names(filelib:wildcard("*" ++ ?FILE_EXTENSION_TMP, Dir)), - ok = recover_crashed_compactions(Dir, FileNames, TmpFileNames, - FileSizeLimit), + ok = recover_crashed_compactions(Dir, FileNames, TmpFileNames), %% There should be no more tmp files now, so go ahead and load the %% whole lot @@ -569,7 +567,7 @@ init([Server, BaseDir, ClientRefs, {MsgRefDeltaGen, MsgRefDeltaGenInit}]) -> ok = file_handle_cache:truncate(CurHdl), {ok, GCPid} = rabbit_msg_store_gc:start_link(Dir, IndexState, IndexModule, - FileSummaryEts, FileSizeLimit), + FileSummaryEts), {ok, maybe_compact( State1 #msstate { current_file_handle = CurHdl, gc_pid = GCPid }), @@ -1213,24 +1211,22 @@ count_msg_refs(Gen, Seed, State) -> count_msg_refs(Gen, Next, State) end. -recover_crashed_compactions(Dir, FileNames, TmpFileNames, FileSizeLimit) -> +recover_crashed_compactions(Dir, FileNames, TmpFileNames) -> lists:foreach( fun (TmpFileName) -> NonTmpRelatedFileName = filename:rootname(TmpFileName) ++ ?FILE_EXTENSION, true = lists:member(NonTmpRelatedFileName, FileNames), ok = recover_crashed_compaction( - Dir, TmpFileName, NonTmpRelatedFileName, FileSizeLimit) + Dir, TmpFileName, NonTmpRelatedFileName) end, TmpFileNames), ok. -recover_crashed_compaction(Dir, TmpFileName, NonTmpRelatedFileName, - FileSizeLimit) -> +recover_crashed_compaction(Dir, TmpFileName, NonTmpRelatedFileName) -> {ok, UncorruptedMessagesTmp, GuidsTmp} = - scan_file_for_valid_messages_and_guids(Dir, TmpFileName, FileSizeLimit), + scan_file_for_valid_messages_and_guids(Dir, TmpFileName), {ok, UncorruptedMessages, Guids} = - scan_file_for_valid_messages_and_guids(Dir, NonTmpRelatedFileName, - FileSizeLimit), + scan_file_for_valid_messages_and_guids(Dir, NonTmpRelatedFileName), %% 1) It's possible that everything in the tmp file is also in the %% main file such that the main file is (prefix ++ %% tmpfile). This means that compaction failed immediately @@ -1307,7 +1303,7 @@ recover_crashed_compaction(Dir, TmpFileName, NonTmpRelatedFileName, {ok, _MainMessages, GuidsMain} = scan_file_for_valid_messages_and_guids( - Dir, NonTmpRelatedFileName, FileSizeLimit), + Dir, NonTmpRelatedFileName), %% check that everything in Guids1 is in GuidsMain true = is_sublist(Guids1, GuidsMain), %% check that everything in GuidsTmp is in GuidsMain @@ -1321,12 +1317,11 @@ is_sublist(SmallerL, BiggerL) -> is_disjoint(SmallerL, BiggerL) -> lists:all(fun (Item) -> not lists:member(Item, BiggerL) end, SmallerL). -scan_file_for_valid_messages(Dir, FileName, FileSizeLimit) -> +scan_file_for_valid_messages(Dir, FileName) -> case open_file(Dir, FileName, ?READ_MODE) of {ok, Hdl} -> Valid = rabbit_msg_file:scan( Hdl, filelib:file_size( - form_filename(Dir, FileName)), - FileSizeLimit), + form_filename(Dir, FileName))), %% if something really bad has happened, %% the close could fail, but ignore file_handle_cache:close(Hdl), @@ -1335,9 +1330,9 @@ scan_file_for_valid_messages(Dir, FileName, FileSizeLimit) -> {error, Reason} -> {error, {unable_to_scan_file, FileName, Reason}} end. -scan_file_for_valid_messages_and_guids(Dir, FileName, FileSizeLimit) -> +scan_file_for_valid_messages_and_guids(Dir, FileName) -> {ok, Messages, _FileSize} = - scan_file_for_valid_messages(Dir, FileName, FileSizeLimit), + scan_file_for_valid_messages(Dir, FileName), {ok, Messages, [Guid || {Guid, _TotalSize, _FileOffset} <- Messages]}. %% Takes the list in *ascending* order (i.e. eldest message @@ -1405,11 +1400,10 @@ build_index(Gatherer, Left, [File|Files], State) -> end), build_index(Gatherer, File, Files, State). -build_index_worker(Gatherer, State = #msstate { file_size_limit = FileSizeLimit, - dir = Dir }, +build_index_worker(Gatherer, State = #msstate { dir = Dir }, Left, File, Files) -> {ok, Messages, FileSize} = - scan_file_for_valid_messages(Dir, filenum_to_name(File), FileSizeLimit), + scan_file_for_valid_messages(Dir, filenum_to_name(File)), {ValidMessages, ValidTotalSize} = lists:foldl( fun (Obj = {Guid, TotalSize, Offset}, {VMAcc, VTSAcc}) -> @@ -1579,8 +1573,7 @@ delete_file_if_empty(File, State = #msstate { %% garbage collection / compaction / aggregation -- external %%---------------------------------------------------------------------------- -gc(SrcFile, DstFile, State = {FileSummaryEts, _Dir, _Index, _IndexState, - _FileSizeLimit}) -> +gc(SrcFile, DstFile, State = {FileSummaryEts, _Dir, _Index, _IndexState}) -> [SrcObj = #file_summary { readers = SrcReaders, left = DstFile, @@ -1612,8 +1605,7 @@ combine_files(#file_summary { file = Source, valid_total_size = DestinationValid, contiguous_top = DestinationContiguousTop, right = Source }, - State = {_FileSummaryEts, Dir, _Index, _IndexState, - _FileSizeLimit}) -> + State = {_FileSummaryEts, Dir, _Index, _IndexState}) -> SourceName = filenum_to_name(Source), DestinationName = filenum_to_name(Destination), {ok, SourceHdl} = open_file(Dir, SourceName, @@ -1672,11 +1664,11 @@ combine_files(#file_summary { file = Source, ok = file_handle_cache:delete(SourceHdl), ExpectedSize. -find_unremoved_messages_in_file( - File, {_FileSummaryEts, Dir, Index, IndexState, FileSizeLimit}) -> +find_unremoved_messages_in_file(File, + {_FileSummaryEts, Dir, Index, IndexState}) -> %% Messages here will be end-of-file at start-of-list {ok, Messages, _FileSize} = - scan_file_for_valid_messages(Dir, filenum_to_name(File), FileSizeLimit), + scan_file_for_valid_messages(Dir, filenum_to_name(File)), %% foldl will reverse so will end up with msgs in ascending offset order lists:foldl(fun ({Guid, TotalSize, _Offset}, Acc = {List, Size}) -> case Index:lookup(Guid, IndexState) of @@ -1689,7 +1681,7 @@ find_unremoved_messages_in_file( copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, Destination, - {_FileSummaryEts, _Dir, Index, IndexState, _FileSizeLimit}) -> + {_FileSummaryEts, _Dir, Index, IndexState}) -> Copy = fun ({BlockStart, BlockEnd}) -> BSize = BlockEnd - BlockStart, {ok, BlockStart} = diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl index f29bf1a485..56cd422b5b 100644 --- a/src/rabbit_msg_store_gc.erl +++ b/src/rabbit_msg_store_gc.erl @@ -33,7 +33,7 @@ -behaviour(gen_server2). --export([start_link/5, gc/3, no_readers/2, stop/1]). +-export([start_link/4, gc/3, no_readers/2, stop/1]). -export([set_maximum_since_use/2]). @@ -46,7 +46,6 @@ index_module, parent, file_summary_ets, - file_size_limit, scheduled }). @@ -56,9 +55,7 @@ -ifdef(use_specs). --type(file_size() :: non_neg_integer()). - --spec(start_link/5 :: (file_path(), any(), atom(), tid(), file_size()) -> +-spec(start_link/4 :: (file_path(), any(), atom(), tid()) -> {'ok', pid()} | 'ignore' | {'error', any()}). -spec(gc/3 :: (pid(), non_neg_integer(), non_neg_integer()) -> 'ok'). -spec(no_readers/2 :: (pid(), non_neg_integer()) -> 'ok'). @@ -69,10 +66,10 @@ %%---------------------------------------------------------------------------- -start_link(Dir, IndexState, IndexModule, FileSummaryEts, FileSizeLimit) -> +start_link(Dir, IndexState, IndexModule, FileSummaryEts) -> gen_server2:start_link( ?MODULE, - [self(), Dir, IndexState, IndexModule, FileSummaryEts, FileSizeLimit], + [self(), Dir, IndexState, IndexModule, FileSummaryEts], [{timeout, infinity}]). gc(Server, Source, Destination) -> @@ -89,7 +86,7 @@ set_maximum_since_use(Pid, Age) -> %%---------------------------------------------------------------------------- -init([Parent, Dir, IndexState, IndexModule, FileSummaryEts, FileSizeLimit]) -> +init([Parent, Dir, IndexState, IndexModule, FileSummaryEts]) -> ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use, [self()]), {ok, #gcstate { dir = Dir, @@ -97,7 +94,6 @@ init([Parent, Dir, IndexState, IndexModule, FileSummaryEts, FileSizeLimit]) -> index_module = IndexModule, parent = Parent, file_summary_ets = FileSummaryEts, - file_size_limit = FileSizeLimit, scheduled = undefined }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -136,11 +132,9 @@ attempt_gc(State = #gcstate { dir = Dir, index_module = Index, parent = Parent, file_summary_ets = FileSummaryEts, - file_size_limit = FileSizeLimit, scheduled = {Source, Destination} }) -> case rabbit_msg_store:gc(Source, Destination, - {FileSummaryEts, Dir, Index, IndexState, - FileSizeLimit}) of + {FileSummaryEts, Dir, Index, IndexState}) of concurrent_readers -> State; Reclaimed -> ok = rabbit_msg_store:gc_done( Parent, Reclaimed, Source, Destination), |
