summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_msg_file.erl33
-rw-r--r--src/rabbit_msg_store.erl79
-rw-r--r--src/rabbit_msg_store_gc.erl19
-rw-r--r--src/rabbit_tests.erl17
4 files changed, 88 insertions, 60 deletions
diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl
index 3dbf8eadff..dd0579e9d6 100644
--- a/src/rabbit_msg_file.erl
+++ b/src/rabbit_msg_file.erl
@@ -31,7 +31,7 @@
-module(rabbit_msg_file).
--export([append/3, read/2, scan/2]).
+-export([append/3, read/2, scan/3]).
%%----------------------------------------------------------------------------
@@ -44,7 +44,7 @@
-define(FILE_PACKING_ADJUSTMENT, (1 + ?INTEGER_SIZE_BYTES)).
-define(GUID_SIZE_BYTES, 16).
-define(GUID_SIZE_BITS, (8 * ?GUID_SIZE_BYTES)).
--define(SCAN_BLOCK_SIZE, ?FILE_SIZE_LIMIT div 4).
+-define(SCAN_BLOCK_SIZE(LIM), (LIM div 4)).
%%----------------------------------------------------------------------------
@@ -59,7 +59,7 @@
({'ok', msg_size()} | {'error', any()})).
-spec(read/2 :: (io_device(), msg_size()) ->
({'ok', {guid(), msg()}} | {'error', any()})).
--spec(scan/2 :: (io_device(), file_size()) ->
+-spec(scan/3 :: (io_device(), file_size(), file_size()) ->
{'ok', [{guid(), msg_size(), position()}], position()}).
-endif.
@@ -92,28 +92,29 @@ read(FileHdl, TotalSize) ->
KO -> KO
end.
-scan(FileHdl, FileSize) when FileSize >= 0 ->
- scan(FileHdl, FileSize, <<>>, 0, [], 0).
+scan(FileHdl, FileSize, FileSizeLim) when FileSize >= 0 ->
+ scan(FileHdl, FileSize, FileSizeLim, <<>>, 0, [], 0).
-scan(_FileHdl, FileSize, _Data, FileSize, Acc, ScanOffset) ->
+scan(_FileHdl, FileSize, _FileSizeLim, _Data, FileSize, Acc, ScanOffset) ->
{ok, Acc, ScanOffset};
-scan(FileHdl, FileSize, Data, ReadOffset, Acc, ScanOffset) ->
- Read = lists:min([?SCAN_BLOCK_SIZE, (FileSize - ReadOffset)]),
+scan(FileHdl, FileSize, FileSizeLim, Data, ReadOffset, Acc, ScanOffset) ->
+ Read = lists:min([?SCAN_BLOCK_SIZE(FileSizeLim), (FileSize - ReadOffset)]),
case file_handle_cache:read(FileHdl, Read) of
{ok, Data1} ->
{Data2, Acc1, ScanOffset1} =
- scan(<<Data/binary, Data1/binary>>, Acc, ScanOffset),
+ scan1(<<Data/binary, Data1/binary>>, Acc, ScanOffset),
ReadOffset1 = ReadOffset + size(Data1),
- scan(FileHdl, FileSize, Data2, ReadOffset1, Acc1, ScanOffset1);
+ scan(FileHdl, FileSize, FileSizeLim, Data2, ReadOffset1, Acc1,
+ ScanOffset1);
_KO ->
{ok, Acc, ScanOffset}
end.
-scan(<<>>, Acc, Offset) ->
+scan1(<<>>, Acc, Offset) ->
{<<>>, Acc, Offset};
-scan(<<0:?INTEGER_SIZE_BITS, _Rest/binary>>, Acc, Offset) ->
+scan1(<<0:?INTEGER_SIZE_BITS, _Rest/binary>>, Acc, Offset) ->
{<<>>, Acc, Offset}; %% Nothing to do other than stop.
-scan(<<Size:?INTEGER_SIZE_BITS, GuidAndMsg:Size/binary,
+scan1(<<Size:?INTEGER_SIZE_BITS, GuidAndMsg:Size/binary,
WriteMarker:?WRITE_OK_SIZE_BITS, Rest/binary>>, Acc, Offset) ->
TotalSize = Size + ?FILE_PACKING_ADJUSTMENT,
case WriteMarker of
@@ -126,9 +127,9 @@ scan(<<Size:?INTEGER_SIZE_BITS, GuidAndMsg:Size/binary,
<<GuidNum:?GUID_SIZE_BITS, _Msg/binary>> =
<<GuidAndMsg:Size/binary>>,
<<Guid:?GUID_SIZE_BYTES/binary>> = <<GuidNum:?GUID_SIZE_BITS>>,
- scan(Rest, [{Guid, TotalSize, Offset} | Acc], Offset + TotalSize);
+ scan1(Rest, [{Guid, TotalSize, Offset} | Acc], Offset + TotalSize);
_ ->
- scan(Rest, Acc, Offset + TotalSize)
+ scan1(Rest, Acc, Offset + TotalSize)
end;
-scan(Data, Acc, Offset) ->
+scan1(Data, Acc, Offset) ->
{Data, Acc, Offset}.
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 51ad2926b2..c28302dda2 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -81,7 +81,8 @@
dedup_cache_ets, %% tid of dedup cache table
cur_file_cache_ets, %% tid of current file cache table
client_refs, %% set of references of all registered clients
- successfully_recovered %% boolean: did we recover state?
+ successfully_recovered, %% boolean: did we recover state?
+ file_size_limit %% how big are our files allowed to get?
}).
-record(client_msstate,
@@ -106,6 +107,7 @@
-type(server() :: pid() | atom()).
-type(file_num() :: non_neg_integer()).
+-type(file_size() :: non_neg_integer()).
-type(client_msstate() :: #client_msstate { file_handle_cache :: dict(),
index_state :: any(),
index_module :: atom(),
@@ -139,7 +141,7 @@
-spec(successfully_recovered_state/1 :: (server()) -> boolean()).
-spec(gc/3 :: (non_neg_integer(), non_neg_integer(),
- {tid(), file_path(), atom(), any()}) ->
+ {tid(), file_path(), atom(), any(), file_size()}) ->
'concurrent_readers' | non_neg_integer()).
-endif.
@@ -518,6 +520,8 @@ init([Server, BaseDir, ClientRefs, {MsgRefDeltaGen, MsgRefDeltaGenInit}]) ->
[ordered_set, public]),
CurFileCacheEts = ets:new(rabbit_msg_store_cur_file, [set, public]),
+ {ok, FileSizeLimit} = application:get_env(msg_store_file_size_limit),
+
State = #msstate { dir = Dir,
index_module = IndexModule,
index_state = IndexState,
@@ -536,7 +540,8 @@ init([Server, BaseDir, ClientRefs, {MsgRefDeltaGen, MsgRefDeltaGenInit}]) ->
dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts,
client_refs = ClientRefs1,
- successfully_recovered = AllCleanShutdown
+ successfully_recovered = AllCleanShutdown,
+ file_size_limit = FileSizeLimit
},
ok = case AllCleanShutdown of
@@ -548,7 +553,8 @@ init([Server, BaseDir, ClientRefs, {MsgRefDeltaGen, MsgRefDeltaGenInit}]) ->
sort_file_names(filelib:wildcard("*" ++ ?FILE_EXTENSION, Dir)),
TmpFileNames =
sort_file_names(filelib:wildcard("*" ++ ?FILE_EXTENSION_TMP, Dir)),
- ok = recover_crashed_compactions(Dir, FileNames, TmpFileNames),
+ ok = recover_crashed_compactions(Dir, FileNames, TmpFileNames,
+ FileSizeLimit),
%% There should be no more tmp files now, so go ahead and load the
%% whole lot
@@ -563,7 +569,7 @@ init([Server, BaseDir, ClientRefs, {MsgRefDeltaGen, MsgRefDeltaGenInit}]) ->
ok = file_handle_cache:truncate(CurHdl),
{ok, GCPid} = rabbit_msg_store_gc:start_link(Dir, IndexState, IndexModule,
- FileSummaryEts),
+ FileSummaryEts, FileSizeLimit),
{ok, maybe_compact(
State1 #msstate { current_file_handle = CurHdl, gc_pid = GCPid }),
@@ -1207,22 +1213,24 @@ count_msg_refs(Gen, Seed, State) ->
count_msg_refs(Gen, Next, State)
end.
-recover_crashed_compactions(Dir, FileNames, TmpFileNames) ->
+recover_crashed_compactions(Dir, FileNames, TmpFileNames, FileSizeLimit) ->
lists:foreach(
fun (TmpFileName) ->
NonTmpRelatedFileName =
filename:rootname(TmpFileName) ++ ?FILE_EXTENSION,
true = lists:member(NonTmpRelatedFileName, FileNames),
ok = recover_crashed_compaction(
- Dir, TmpFileName, NonTmpRelatedFileName)
+ Dir, TmpFileName, NonTmpRelatedFileName, FileSizeLimit)
end, TmpFileNames),
ok.
-recover_crashed_compaction(Dir, TmpFileName, NonTmpRelatedFileName) ->
+recover_crashed_compaction(Dir, TmpFileName, NonTmpRelatedFileName,
+ FileSizeLimit) ->
{ok, UncorruptedMessagesTmp, GuidsTmp} =
- scan_file_for_valid_messages_and_guids(Dir, TmpFileName),
+ scan_file_for_valid_messages_and_guids(Dir, TmpFileName, FileSizeLimit),
{ok, UncorruptedMessages, Guids} =
- scan_file_for_valid_messages_and_guids(Dir, NonTmpRelatedFileName),
+ scan_file_for_valid_messages_and_guids(Dir, NonTmpRelatedFileName,
+ FileSizeLimit),
%% 1) It's possible that everything in the tmp file is also in the
%% main file such that the main file is (prefix ++
%% tmpfile). This means that compaction failed immediately
@@ -1299,7 +1307,7 @@ recover_crashed_compaction(Dir, TmpFileName, NonTmpRelatedFileName) ->
{ok, _MainMessages, GuidsMain} =
scan_file_for_valid_messages_and_guids(
- Dir, NonTmpRelatedFileName),
+ Dir, NonTmpRelatedFileName, FileSizeLimit),
%% check that everything in Guids1 is in GuidsMain
true = is_sublist(Guids1, GuidsMain),
%% check that everything in GuidsTmp is in GuidsMain
@@ -1313,11 +1321,12 @@ is_sublist(SmallerL, BiggerL) ->
is_disjoint(SmallerL, BiggerL) ->
lists:all(fun (Item) -> not lists:member(Item, BiggerL) end, SmallerL).
-scan_file_for_valid_messages(Dir, FileName) ->
+scan_file_for_valid_messages(Dir, FileName, FileSizeLimit) ->
case open_file(Dir, FileName, ?READ_MODE) of
{ok, Hdl} -> Valid = rabbit_msg_file:scan(
Hdl, filelib:file_size(
- form_filename(Dir, FileName))),
+ form_filename(Dir, FileName)),
+ FileSizeLimit),
%% if something really bad has happened,
%% the close could fail, but ignore
file_handle_cache:close(Hdl),
@@ -1326,8 +1335,9 @@ scan_file_for_valid_messages(Dir, FileName) ->
{error, Reason} -> {error, {unable_to_scan_file, FileName, Reason}}
end.
-scan_file_for_valid_messages_and_guids(Dir, FileName) ->
- {ok, Messages, _FileSize} = scan_file_for_valid_messages(Dir, FileName),
+scan_file_for_valid_messages_and_guids(Dir, FileName, FileSizeLimit) ->
+ {ok, Messages, _FileSize} =
+ scan_file_for_valid_messages(Dir, FileName, FileSizeLimit),
{ok, Messages, [Guid || {Guid, _TotalSize, _FileOffset} <- Messages]}.
%% Takes the list in *ascending* order (i.e. eldest message
@@ -1395,10 +1405,11 @@ build_index(Gatherer, Left, [File|Files], State) ->
end),
build_index(Gatherer, File, Files, State).
-build_index_worker(Gatherer, State = #msstate { dir = Dir },
+build_index_worker(Gatherer, State = #msstate { file_size_limit = FileSizeLimit,
+ dir = Dir },
Left, File, Files) ->
{ok, Messages, FileSize} =
- scan_file_for_valid_messages(Dir, filenum_to_name(File)),
+ scan_file_for_valid_messages(Dir, filenum_to_name(File), FileSizeLimit),
{ValidMessages, ValidTotalSize} =
lists:foldl(
fun (Obj = {Guid, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
@@ -1450,8 +1461,9 @@ maybe_roll_to_new_file(
current_file_handle = CurHdl,
current_file = CurFile,
file_summary_ets = FileSummaryEts,
- cur_file_cache_ets = CurFileCacheEts })
- when Offset >= ?FILE_SIZE_LIMIT ->
+ cur_file_cache_ets = CurFileCacheEts,
+ file_size_limit = FileSizeLimit })
+ when Offset >= FileSizeLimit ->
State1 = internal_sync(State),
ok = file_handle_cache:close(CurHdl),
NextFile = CurFile + 1,
@@ -1477,8 +1489,9 @@ maybe_compact(State = #msstate { sum_valid_data = SumValid,
sum_file_size = SumFileSize,
gc_active = false,
gc_pid = GCPid,
- file_summary_ets = FileSummaryEts })
- when (SumFileSize > 2 * ?FILE_SIZE_LIMIT andalso
+ file_summary_ets = FileSummaryEts,
+ file_size_limit = FileSizeLimit })
+ when (SumFileSize > 2 * FileSizeLimit andalso
(SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION) ->
%% TODO: the algorithm here is sub-optimal - it may result in a
%% complete traversal of FileSummaryEts.
@@ -1486,7 +1499,7 @@ maybe_compact(State = #msstate { sum_valid_data = SumValid,
'$end_of_table' ->
State;
First ->
- case find_files_to_gc(FileSummaryEts,
+ case find_files_to_gc(FileSummaryEts, FileSizeLimit,
ets:lookup(FileSummaryEts, First)) of
not_found ->
State;
@@ -1503,7 +1516,7 @@ maybe_compact(State = #msstate { sum_valid_data = SumValid,
maybe_compact(State) ->
State.
-find_files_to_gc(FileSummaryEts,
+find_files_to_gc(FileSummaryEts, FileSizeLimit,
[#file_summary { file = Dst,
valid_total_size = DstValid,
right = Src }]) ->
@@ -1518,9 +1531,10 @@ find_files_to_gc(FileSummaryEts,
ets:lookup(FileSummaryEts, Src),
case SrcRight of
undefined -> not_found;
- _ -> case DstValid + SrcValid =< ?FILE_SIZE_LIMIT of
+ _ -> case DstValid + SrcValid =< FileSizeLimit of
true -> {Src, Dst};
- false -> find_files_to_gc(FileSummaryEts, Next)
+ false -> find_files_to_gc(
+ FileSummaryEts, FileSizeLimit, Next)
end
end
end.
@@ -1565,7 +1579,8 @@ delete_file_if_empty(File, State = #msstate {
%% garbage collection / compaction / aggregation -- external
%%----------------------------------------------------------------------------
-gc(SrcFile, DstFile, State = {FileSummaryEts, _Dir, _Index, _IndexState}) ->
+gc(SrcFile, DstFile, State = {FileSummaryEts, _Dir, _Index, _IndexState,
+ _FileSizeLimit}) ->
[SrcObj = #file_summary {
readers = SrcReaders,
left = DstFile,
@@ -1597,7 +1612,8 @@ combine_files(#file_summary { file = Source,
valid_total_size = DestinationValid,
contiguous_top = DestinationContiguousTop,
right = Source },
- State = {_FileSummaryEts, Dir, _Index, _IndexState}) ->
+ State = {_FileSummaryEts, Dir, _Index, _IndexState,
+ _FileSizeLimit}) ->
SourceName = filenum_to_name(Source),
DestinationName = filenum_to_name(Destination),
{ok, SourceHdl} = open_file(Dir, SourceName,
@@ -1656,11 +1672,11 @@ combine_files(#file_summary { file = Source,
ok = file_handle_cache:delete(SourceHdl),
ExpectedSize.
-find_unremoved_messages_in_file(File,
- {_FileSummaryEts, Dir, Index, IndexState}) ->
+find_unremoved_messages_in_file(
+ File, {_FileSummaryEts, Dir, Index, IndexState, FileSizeLimit}) ->
%% Messages here will be end-of-file at start-of-list
{ok, Messages, _FileSize} =
- scan_file_for_valid_messages(Dir, filenum_to_name(File)),
+ scan_file_for_valid_messages(Dir, filenum_to_name(File), FileSizeLimit),
%% foldl will reverse so will end up with msgs in ascending offset order
lists:foldl(fun ({Guid, TotalSize, _Offset}, Acc = {List, Size}) ->
case Index:lookup(Guid, IndexState) of
@@ -1672,7 +1688,8 @@ find_unremoved_messages_in_file(File,
end, {[], 0}, Messages).
copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
- Destination, {_FileSummaryEts, _Dir, Index, IndexState}) ->
+ Destination,
+ {_FileSummaryEts, _Dir, Index, IndexState, _FileSizeLimit}) ->
Copy = fun ({BlockStart, BlockEnd}) ->
BSize = BlockEnd - BlockStart,
{ok, BlockStart} =
diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl
index 4b80d088d7..f29bf1a485 100644
--- a/src/rabbit_msg_store_gc.erl
+++ b/src/rabbit_msg_store_gc.erl
@@ -33,7 +33,7 @@
-behaviour(gen_server2).
--export([start_link/4, gc/3, no_readers/2, stop/1]).
+-export([start_link/5, gc/3, no_readers/2, stop/1]).
-export([set_maximum_since_use/2]).
@@ -46,6 +46,7 @@
index_module,
parent,
file_summary_ets,
+ file_size_limit,
scheduled
}).
@@ -55,7 +56,9 @@
-ifdef(use_specs).
--spec(start_link/4 :: (file_path(), any(), atom(), tid()) ->
+-type(file_size() :: non_neg_integer()).
+
+-spec(start_link/5 :: (file_path(), any(), atom(), tid(), file_size()) ->
{'ok', pid()} | 'ignore' | {'error', any()}).
-spec(gc/3 :: (pid(), non_neg_integer(), non_neg_integer()) -> 'ok').
-spec(no_readers/2 :: (pid(), non_neg_integer()) -> 'ok').
@@ -66,9 +69,10 @@
%%----------------------------------------------------------------------------
-start_link(Dir, IndexState, IndexModule, FileSummaryEts) ->
+start_link(Dir, IndexState, IndexModule, FileSummaryEts, FileSizeLimit) ->
gen_server2:start_link(
- ?MODULE, [self(), Dir, IndexState, IndexModule, FileSummaryEts],
+ ?MODULE,
+ [self(), Dir, IndexState, IndexModule, FileSummaryEts, FileSizeLimit],
[{timeout, infinity}]).
gc(Server, Source, Destination) ->
@@ -85,7 +89,7 @@ set_maximum_since_use(Pid, Age) ->
%%----------------------------------------------------------------------------
-init([Parent, Dir, IndexState, IndexModule, FileSummaryEts]) ->
+init([Parent, Dir, IndexState, IndexModule, FileSummaryEts, FileSizeLimit]) ->
ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use,
[self()]),
{ok, #gcstate { dir = Dir,
@@ -93,6 +97,7 @@ init([Parent, Dir, IndexState, IndexModule, FileSummaryEts]) ->
index_module = IndexModule,
parent = Parent,
file_summary_ets = FileSummaryEts,
+ file_size_limit = FileSizeLimit,
scheduled = undefined },
hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -131,9 +136,11 @@ attempt_gc(State = #gcstate { dir = Dir,
index_module = Index,
parent = Parent,
file_summary_ets = FileSummaryEts,
+ file_size_limit = FileSizeLimit,
scheduled = {Source, Destination} }) ->
case rabbit_msg_store:gc(Source, Destination,
- {FileSummaryEts, Dir, Index, IndexState}) of
+ {FileSummaryEts, Dir, Index, IndexState,
+ FileSizeLimit}) of
concurrent_readers -> State;
Reclaimed -> ok = rabbit_msg_store:gc_done(
Parent, Reclaimed, Source, Destination),
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 3597efe3a6..36fa855ac3 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1475,10 +1475,12 @@ test_msg_store() ->
%% restart empty
ok = stop_msg_store(),
ok = start_msg_store_empty(), %% now safe to reuse guids
- %% push a lot of msgs in...
- BigCount = 100000,
+ %% push a lot of msgs in... at least 100 files worth
+ {ok, FileSize} = application:get_env(rabbit, msg_store_file_size_limit),
+ PayloadSizeBits = 65536,
+ BigCount = trunc(100 * FileSize / (PayloadSizeBits div 8)),
GuidsBig = [guid_bin(X) || X <- lists:seq(1, BigCount)],
- Payload = << 0:65536 >>,
+ Payload = << 0:PayloadSizeBits >>,
ok = rabbit_msg_store:client_terminate(
lists:foldl(
fun (Guid, MSCStateN) ->
@@ -1569,10 +1571,11 @@ test_queue_init() ->
test_queue_index() ->
SegmentSize = rabbit_queue_index:next_segment_boundary(0),
TwoSegs = SegmentSize + SegmentSize,
+ MostOfASegment = trunc(SegmentSize*0.75),
stop_msg_store(),
ok = empty_test_queue(),
- SeqIdsA = lists:seq(0,9999),
- SeqIdsB = lists:seq(10000,19999),
+ SeqIdsA = lists:seq(0,MostOfASegment-1),
+ SeqIdsB = lists:seq(MostOfASegment, 2*MostOfASegment),
{0, _Terms, Qi0} = test_queue_init(),
{0, 0, Qi1} = rabbit_queue_index:bounds(Qi0),
{Qi2, SeqIdsGuidsA} = queue_index_publish(SeqIdsA, false, Qi1),
@@ -1594,7 +1597,7 @@ test_queue_index() ->
_Qi11 = rabbit_queue_index:terminate([], Qi10),
ok = stop_msg_store(),
ok = rabbit_variable_queue:start([test_queue()]),
- %% should get length back as 10000
+ %% should get length back as MostOfASegment
LenB = length(SeqIdsB),
{LenB, _Terms2, Qi12} = test_queue_init(),
{0, TwoSegs, Qi13} = rabbit_queue_index:bounds(Qi12),
@@ -1834,7 +1837,7 @@ test_variable_queue_partial_segments_delta_thing() ->
test_queue_recover() ->
Count = 2*rabbit_queue_index:next_segment_boundary(0),
TxID = rabbit_guid:guid(),
- #amqqueue { pid = QPid, name = QName } = Q =
+ #amqqueue { pid = QPid, name = QName } =
rabbit_amqqueue:declare(test_queue(), true, false, [], none),
Msg = fun() -> rabbit_basic:message(
rabbit_misc:r(<<>>, exchange, <<>>),