diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2010-06-17 16:34:19 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-06-17 16:34:19 +0100 |
| commit | a976be13513623ede58ba24263b80917078b052d (patch) | |
| tree | 061c546257d8680ecd8803f6f1f1e6e06eaf4e46 /src | |
| parent | 3d4e3704f0547bc90ac0561418899dd98bdd2148 (diff) | |
| download | rabbitmq-server-git-a976be13513623ede58ba24263b80917078b052d.tar.gz | |
Expose the msg_store file size limit to configuration. Also some suitable derivations in tests.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_msg_file.erl | 33 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 79 | ||||
| -rw-r--r-- | src/rabbit_msg_store_gc.erl | 19 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 17 |
4 files changed, 88 insertions, 60 deletions
diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl index 3dbf8eadff..dd0579e9d6 100644 --- a/src/rabbit_msg_file.erl +++ b/src/rabbit_msg_file.erl @@ -31,7 +31,7 @@ -module(rabbit_msg_file). --export([append/3, read/2, scan/2]). +-export([append/3, read/2, scan/3]). %%---------------------------------------------------------------------------- @@ -44,7 +44,7 @@ -define(FILE_PACKING_ADJUSTMENT, (1 + ?INTEGER_SIZE_BYTES)). -define(GUID_SIZE_BYTES, 16). -define(GUID_SIZE_BITS, (8 * ?GUID_SIZE_BYTES)). --define(SCAN_BLOCK_SIZE, ?FILE_SIZE_LIMIT div 4). +-define(SCAN_BLOCK_SIZE(LIM), (LIM div 4)). %%---------------------------------------------------------------------------- @@ -59,7 +59,7 @@ ({'ok', msg_size()} | {'error', any()})). -spec(read/2 :: (io_device(), msg_size()) -> ({'ok', {guid(), msg()}} | {'error', any()})). --spec(scan/2 :: (io_device(), file_size()) -> +-spec(scan/3 :: (io_device(), file_size(), file_size()) -> {'ok', [{guid(), msg_size(), position()}], position()}). -endif. @@ -92,28 +92,29 @@ read(FileHdl, TotalSize) -> KO -> KO end. -scan(FileHdl, FileSize) when FileSize >= 0 -> - scan(FileHdl, FileSize, <<>>, 0, [], 0). +scan(FileHdl, FileSize, FileSizeLim) when FileSize >= 0 -> + scan(FileHdl, FileSize, FileSizeLim, <<>>, 0, [], 0). 
-scan(_FileHdl, FileSize, _Data, FileSize, Acc, ScanOffset) -> +scan(_FileHdl, FileSize, _FileSizeLim, _Data, FileSize, Acc, ScanOffset) -> {ok, Acc, ScanOffset}; -scan(FileHdl, FileSize, Data, ReadOffset, Acc, ScanOffset) -> - Read = lists:min([?SCAN_BLOCK_SIZE, (FileSize - ReadOffset)]), +scan(FileHdl, FileSize, FileSizeLim, Data, ReadOffset, Acc, ScanOffset) -> + Read = lists:min([?SCAN_BLOCK_SIZE(FileSizeLim), (FileSize - ReadOffset)]), case file_handle_cache:read(FileHdl, Read) of {ok, Data1} -> {Data2, Acc1, ScanOffset1} = - scan(<<Data/binary, Data1/binary>>, Acc, ScanOffset), + scan1(<<Data/binary, Data1/binary>>, Acc, ScanOffset), ReadOffset1 = ReadOffset + size(Data1), - scan(FileHdl, FileSize, Data2, ReadOffset1, Acc1, ScanOffset1); + scan(FileHdl, FileSize, FileSizeLim, Data2, ReadOffset1, Acc1, + ScanOffset1); _KO -> {ok, Acc, ScanOffset} end. -scan(<<>>, Acc, Offset) -> +scan1(<<>>, Acc, Offset) -> {<<>>, Acc, Offset}; -scan(<<0:?INTEGER_SIZE_BITS, _Rest/binary>>, Acc, Offset) -> +scan1(<<0:?INTEGER_SIZE_BITS, _Rest/binary>>, Acc, Offset) -> {<<>>, Acc, Offset}; %% Nothing to do other than stop. -scan(<<Size:?INTEGER_SIZE_BITS, GuidAndMsg:Size/binary, +scan1(<<Size:?INTEGER_SIZE_BITS, GuidAndMsg:Size/binary, WriteMarker:?WRITE_OK_SIZE_BITS, Rest/binary>>, Acc, Offset) -> TotalSize = Size + ?FILE_PACKING_ADJUSTMENT, case WriteMarker of @@ -126,9 +127,9 @@ scan(<<Size:?INTEGER_SIZE_BITS, GuidAndMsg:Size/binary, <<GuidNum:?GUID_SIZE_BITS, _Msg/binary>> = <<GuidAndMsg:Size/binary>>, <<Guid:?GUID_SIZE_BYTES/binary>> = <<GuidNum:?GUID_SIZE_BITS>>, - scan(Rest, [{Guid, TotalSize, Offset} | Acc], Offset + TotalSize); + scan1(Rest, [{Guid, TotalSize, Offset} | Acc], Offset + TotalSize); _ -> - scan(Rest, Acc, Offset + TotalSize) + scan1(Rest, Acc, Offset + TotalSize) end; -scan(Data, Acc, Offset) -> +scan1(Data, Acc, Offset) -> {Data, Acc, Offset}. 
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 51ad2926b2..c28302dda2 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -81,7 +81,8 @@ dedup_cache_ets, %% tid of dedup cache table cur_file_cache_ets, %% tid of current file cache table client_refs, %% set of references of all registered clients - successfully_recovered %% boolean: did we recover state? + successfully_recovered, %% boolean: did we recover state? + file_size_limit %% how big are our files allowed to get? }). -record(client_msstate, @@ -106,6 +107,7 @@ -type(server() :: pid() | atom()). -type(file_num() :: non_neg_integer()). +-type(file_size() :: non_neg_integer()). -type(client_msstate() :: #client_msstate { file_handle_cache :: dict(), index_state :: any(), index_module :: atom(), @@ -139,7 +141,7 @@ -spec(successfully_recovered_state/1 :: (server()) -> boolean()). -spec(gc/3 :: (non_neg_integer(), non_neg_integer(), - {tid(), file_path(), atom(), any()}) -> + {tid(), file_path(), atom(), any(), file_size()}) -> 'concurrent_readers' | non_neg_integer()). -endif. 
@@ -518,6 +520,8 @@ init([Server, BaseDir, ClientRefs, {MsgRefDeltaGen, MsgRefDeltaGenInit}]) -> [ordered_set, public]), CurFileCacheEts = ets:new(rabbit_msg_store_cur_file, [set, public]), + {ok, FileSizeLimit} = application:get_env(msg_store_file_size_limit), + State = #msstate { dir = Dir, index_module = IndexModule, index_state = IndexState, @@ -536,7 +540,8 @@ init([Server, BaseDir, ClientRefs, {MsgRefDeltaGen, MsgRefDeltaGenInit}]) -> dedup_cache_ets = DedupCacheEts, cur_file_cache_ets = CurFileCacheEts, client_refs = ClientRefs1, - successfully_recovered = AllCleanShutdown + successfully_recovered = AllCleanShutdown, + file_size_limit = FileSizeLimit }, ok = case AllCleanShutdown of @@ -548,7 +553,8 @@ init([Server, BaseDir, ClientRefs, {MsgRefDeltaGen, MsgRefDeltaGenInit}]) -> sort_file_names(filelib:wildcard("*" ++ ?FILE_EXTENSION, Dir)), TmpFileNames = sort_file_names(filelib:wildcard("*" ++ ?FILE_EXTENSION_TMP, Dir)), - ok = recover_crashed_compactions(Dir, FileNames, TmpFileNames), + ok = recover_crashed_compactions(Dir, FileNames, TmpFileNames, + FileSizeLimit), %% There should be no more tmp files now, so go ahead and load the %% whole lot @@ -563,7 +569,7 @@ init([Server, BaseDir, ClientRefs, {MsgRefDeltaGen, MsgRefDeltaGenInit}]) -> ok = file_handle_cache:truncate(CurHdl), {ok, GCPid} = rabbit_msg_store_gc:start_link(Dir, IndexState, IndexModule, - FileSummaryEts), + FileSummaryEts, FileSizeLimit), {ok, maybe_compact( State1 #msstate { current_file_handle = CurHdl, gc_pid = GCPid }), @@ -1207,22 +1213,24 @@ count_msg_refs(Gen, Seed, State) -> count_msg_refs(Gen, Next, State) end. 
-recover_crashed_compactions(Dir, FileNames, TmpFileNames) -> +recover_crashed_compactions(Dir, FileNames, TmpFileNames, FileSizeLimit) -> lists:foreach( fun (TmpFileName) -> NonTmpRelatedFileName = filename:rootname(TmpFileName) ++ ?FILE_EXTENSION, true = lists:member(NonTmpRelatedFileName, FileNames), ok = recover_crashed_compaction( - Dir, TmpFileName, NonTmpRelatedFileName) + Dir, TmpFileName, NonTmpRelatedFileName, FileSizeLimit) end, TmpFileNames), ok. -recover_crashed_compaction(Dir, TmpFileName, NonTmpRelatedFileName) -> +recover_crashed_compaction(Dir, TmpFileName, NonTmpRelatedFileName, + FileSizeLimit) -> {ok, UncorruptedMessagesTmp, GuidsTmp} = - scan_file_for_valid_messages_and_guids(Dir, TmpFileName), + scan_file_for_valid_messages_and_guids(Dir, TmpFileName, FileSizeLimit), {ok, UncorruptedMessages, Guids} = - scan_file_for_valid_messages_and_guids(Dir, NonTmpRelatedFileName), + scan_file_for_valid_messages_and_guids(Dir, NonTmpRelatedFileName, + FileSizeLimit), %% 1) It's possible that everything in the tmp file is also in the %% main file such that the main file is (prefix ++ %% tmpfile). This means that compaction failed immediately @@ -1299,7 +1307,7 @@ recover_crashed_compaction(Dir, TmpFileName, NonTmpRelatedFileName) -> {ok, _MainMessages, GuidsMain} = scan_file_for_valid_messages_and_guids( - Dir, NonTmpRelatedFileName), + Dir, NonTmpRelatedFileName, FileSizeLimit), %% check that everything in Guids1 is in GuidsMain true = is_sublist(Guids1, GuidsMain), %% check that everything in GuidsTmp is in GuidsMain @@ -1313,11 +1321,12 @@ is_sublist(SmallerL, BiggerL) -> is_disjoint(SmallerL, BiggerL) -> lists:all(fun (Item) -> not lists:member(Item, BiggerL) end, SmallerL). 
-scan_file_for_valid_messages(Dir, FileName) -> +scan_file_for_valid_messages(Dir, FileName, FileSizeLimit) -> case open_file(Dir, FileName, ?READ_MODE) of {ok, Hdl} -> Valid = rabbit_msg_file:scan( Hdl, filelib:file_size( - form_filename(Dir, FileName))), + form_filename(Dir, FileName)), + FileSizeLimit), %% if something really bad has happened, %% the close could fail, but ignore file_handle_cache:close(Hdl), @@ -1326,8 +1335,9 @@ scan_file_for_valid_messages(Dir, FileName) -> {error, Reason} -> {error, {unable_to_scan_file, FileName, Reason}} end. -scan_file_for_valid_messages_and_guids(Dir, FileName) -> - {ok, Messages, _FileSize} = scan_file_for_valid_messages(Dir, FileName), +scan_file_for_valid_messages_and_guids(Dir, FileName, FileSizeLimit) -> + {ok, Messages, _FileSize} = + scan_file_for_valid_messages(Dir, FileName, FileSizeLimit), {ok, Messages, [Guid || {Guid, _TotalSize, _FileOffset} <- Messages]}. %% Takes the list in *ascending* order (i.e. eldest message @@ -1395,10 +1405,11 @@ build_index(Gatherer, Left, [File|Files], State) -> end), build_index(Gatherer, File, Files, State). 
-build_index_worker(Gatherer, State = #msstate { dir = Dir }, +build_index_worker(Gatherer, State = #msstate { file_size_limit = FileSizeLimit, + dir = Dir }, Left, File, Files) -> {ok, Messages, FileSize} = - scan_file_for_valid_messages(Dir, filenum_to_name(File)), + scan_file_for_valid_messages(Dir, filenum_to_name(File), FileSizeLimit), {ValidMessages, ValidTotalSize} = lists:foldl( fun (Obj = {Guid, TotalSize, Offset}, {VMAcc, VTSAcc}) -> @@ -1450,8 +1461,9 @@ maybe_roll_to_new_file( current_file_handle = CurHdl, current_file = CurFile, file_summary_ets = FileSummaryEts, - cur_file_cache_ets = CurFileCacheEts }) - when Offset >= ?FILE_SIZE_LIMIT -> + cur_file_cache_ets = CurFileCacheEts, + file_size_limit = FileSizeLimit }) + when Offset >= FileSizeLimit -> State1 = internal_sync(State), ok = file_handle_cache:close(CurHdl), NextFile = CurFile + 1, @@ -1477,8 +1489,9 @@ maybe_compact(State = #msstate { sum_valid_data = SumValid, sum_file_size = SumFileSize, gc_active = false, gc_pid = GCPid, - file_summary_ets = FileSummaryEts }) - when (SumFileSize > 2 * ?FILE_SIZE_LIMIT andalso + file_summary_ets = FileSummaryEts, + file_size_limit = FileSizeLimit }) + when (SumFileSize > 2 * FileSizeLimit andalso (SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION) -> %% TODO: the algorithm here is sub-optimal - it may result in a %% complete traversal of FileSummaryEts. @@ -1486,7 +1499,7 @@ maybe_compact(State = #msstate { sum_valid_data = SumValid, '$end_of_table' -> State; First -> - case find_files_to_gc(FileSummaryEts, + case find_files_to_gc(FileSummaryEts, FileSizeLimit, ets:lookup(FileSummaryEts, First)) of not_found -> State; @@ -1503,7 +1516,7 @@ maybe_compact(State = #msstate { sum_valid_data = SumValid, maybe_compact(State) -> State. 
-find_files_to_gc(FileSummaryEts, +find_files_to_gc(FileSummaryEts, FileSizeLimit, [#file_summary { file = Dst, valid_total_size = DstValid, right = Src }]) -> @@ -1518,9 +1531,10 @@ find_files_to_gc(FileSummaryEts, ets:lookup(FileSummaryEts, Src), case SrcRight of undefined -> not_found; - _ -> case DstValid + SrcValid =< ?FILE_SIZE_LIMIT of + _ -> case DstValid + SrcValid =< FileSizeLimit of true -> {Src, Dst}; - false -> find_files_to_gc(FileSummaryEts, Next) + false -> find_files_to_gc( + FileSummaryEts, FileSizeLimit, Next) end end end. @@ -1565,7 +1579,8 @@ delete_file_if_empty(File, State = #msstate { %% garbage collection / compaction / aggregation -- external %%---------------------------------------------------------------------------- -gc(SrcFile, DstFile, State = {FileSummaryEts, _Dir, _Index, _IndexState}) -> +gc(SrcFile, DstFile, State = {FileSummaryEts, _Dir, _Index, _IndexState, + _FileSizeLimit}) -> [SrcObj = #file_summary { readers = SrcReaders, left = DstFile, @@ -1597,7 +1612,8 @@ combine_files(#file_summary { file = Source, valid_total_size = DestinationValid, contiguous_top = DestinationContiguousTop, right = Source }, - State = {_FileSummaryEts, Dir, _Index, _IndexState}) -> + State = {_FileSummaryEts, Dir, _Index, _IndexState, + _FileSizeLimit}) -> SourceName = filenum_to_name(Source), DestinationName = filenum_to_name(Destination), {ok, SourceHdl} = open_file(Dir, SourceName, @@ -1656,11 +1672,11 @@ combine_files(#file_summary { file = Source, ok = file_handle_cache:delete(SourceHdl), ExpectedSize. 
-find_unremoved_messages_in_file(File, - {_FileSummaryEts, Dir, Index, IndexState}) -> +find_unremoved_messages_in_file( + File, {_FileSummaryEts, Dir, Index, IndexState, FileSizeLimit}) -> %% Messages here will be end-of-file at start-of-list {ok, Messages, _FileSize} = - scan_file_for_valid_messages(Dir, filenum_to_name(File)), + scan_file_for_valid_messages(Dir, filenum_to_name(File), FileSizeLimit), %% foldl will reverse so will end up with msgs in ascending offset order lists:foldl(fun ({Guid, TotalSize, _Offset}, Acc = {List, Size}) -> case Index:lookup(Guid, IndexState) of @@ -1672,7 +1688,8 @@ find_unremoved_messages_in_file(File, end, {[], 0}, Messages). copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, - Destination, {_FileSummaryEts, _Dir, Index, IndexState}) -> + Destination, + {_FileSummaryEts, _Dir, Index, IndexState, _FileSizeLimit}) -> Copy = fun ({BlockStart, BlockEnd}) -> BSize = BlockEnd - BlockStart, {ok, BlockStart} = diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl index 4b80d088d7..f29bf1a485 100644 --- a/src/rabbit_msg_store_gc.erl +++ b/src/rabbit_msg_store_gc.erl @@ -33,7 +33,7 @@ -behaviour(gen_server2). --export([start_link/4, gc/3, no_readers/2, stop/1]). +-export([start_link/5, gc/3, no_readers/2, stop/1]). -export([set_maximum_since_use/2]). @@ -46,6 +46,7 @@ index_module, parent, file_summary_ets, + file_size_limit, scheduled }). @@ -55,7 +56,9 @@ -ifdef(use_specs). --spec(start_link/4 :: (file_path(), any(), atom(), tid()) -> +-type(file_size() :: non_neg_integer()). + +-spec(start_link/5 :: (file_path(), any(), atom(), tid(), file_size()) -> {'ok', pid()} | 'ignore' | {'error', any()}). -spec(gc/3 :: (pid(), non_neg_integer(), non_neg_integer()) -> 'ok'). -spec(no_readers/2 :: (pid(), non_neg_integer()) -> 'ok'). 
@@ -66,9 +69,10 @@ %%---------------------------------------------------------------------------- -start_link(Dir, IndexState, IndexModule, FileSummaryEts) -> +start_link(Dir, IndexState, IndexModule, FileSummaryEts, FileSizeLimit) -> gen_server2:start_link( - ?MODULE, [self(), Dir, IndexState, IndexModule, FileSummaryEts], + ?MODULE, + [self(), Dir, IndexState, IndexModule, FileSummaryEts, FileSizeLimit], [{timeout, infinity}]). gc(Server, Source, Destination) -> @@ -85,7 +89,7 @@ set_maximum_since_use(Pid, Age) -> %%---------------------------------------------------------------------------- -init([Parent, Dir, IndexState, IndexModule, FileSummaryEts]) -> +init([Parent, Dir, IndexState, IndexModule, FileSummaryEts, FileSizeLimit]) -> ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use, [self()]), {ok, #gcstate { dir = Dir, @@ -93,6 +97,7 @@ init([Parent, Dir, IndexState, IndexModule, FileSummaryEts]) -> index_module = IndexModule, parent = Parent, file_summary_ets = FileSummaryEts, + file_size_limit = FileSizeLimit, scheduled = undefined }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -131,9 +136,11 @@ attempt_gc(State = #gcstate { dir = Dir, index_module = Index, parent = Parent, file_summary_ets = FileSummaryEts, + file_size_limit = FileSizeLimit, scheduled = {Source, Destination} }) -> case rabbit_msg_store:gc(Source, Destination, - {FileSummaryEts, Dir, Index, IndexState}) of + {FileSummaryEts, Dir, Index, IndexState, + FileSizeLimit}) of concurrent_readers -> State; Reclaimed -> ok = rabbit_msg_store:gc_done( Parent, Reclaimed, Source, Destination), diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 3597efe3a6..36fa855ac3 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1475,10 +1475,12 @@ test_msg_store() -> %% restart empty ok = stop_msg_store(), ok = start_msg_store_empty(), %% now safe to reuse guids - %% push a lot of msgs in... 
- BigCount = 100000, + %% push a lot of msgs in... at least 100 files worth + {ok, FileSize} = application:get_env(rabbit, msg_store_file_size_limit), + PayloadSizeBits = 65536, + BigCount = trunc(100 * FileSize / (PayloadSizeBits div 8)), GuidsBig = [guid_bin(X) || X <- lists:seq(1, BigCount)], - Payload = << 0:65536 >>, + Payload = << 0:PayloadSizeBits >>, ok = rabbit_msg_store:client_terminate( lists:foldl( fun (Guid, MSCStateN) -> @@ -1569,10 +1571,11 @@ test_queue_init() -> test_queue_index() -> SegmentSize = rabbit_queue_index:next_segment_boundary(0), TwoSegs = SegmentSize + SegmentSize, + MostOfASegment = trunc(SegmentSize*0.75), stop_msg_store(), ok = empty_test_queue(), - SeqIdsA = lists:seq(0,9999), - SeqIdsB = lists:seq(10000,19999), + SeqIdsA = lists:seq(0,MostOfASegment-1), + SeqIdsB = lists:seq(MostOfASegment, 2*MostOfASegment), {0, _Terms, Qi0} = test_queue_init(), {0, 0, Qi1} = rabbit_queue_index:bounds(Qi0), {Qi2, SeqIdsGuidsA} = queue_index_publish(SeqIdsA, false, Qi1), @@ -1594,7 +1597,7 @@ test_queue_index() -> _Qi11 = rabbit_queue_index:terminate([], Qi10), ok = stop_msg_store(), ok = rabbit_variable_queue:start([test_queue()]), - %% should get length back as 10000 + %% should get length back as MostOfASegment LenB = length(SeqIdsB), {LenB, _Terms2, Qi12} = test_queue_init(), {0, TwoSegs, Qi13} = rabbit_queue_index:bounds(Qi12), @@ -1834,7 +1837,7 @@ test_variable_queue_partial_segments_delta_thing() -> test_queue_recover() -> Count = 2*rabbit_queue_index:next_segment_boundary(0), TxID = rabbit_guid:guid(), - #amqqueue { pid = QPid, name = QName } = Q = + #amqqueue { pid = QPid, name = QName } = rabbit_amqqueue:declare(test_queue(), true, false, [], none), Msg = fun() -> rabbit_basic:message( rabbit_misc:r(<<>>, exchange, <<>>), |
