summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_msg_file.erl32
-rw-r--r--src/rabbit_msg_store.erl50
-rw-r--r--src/rabbit_msg_store_gc.erl18
3 files changed, 43 insertions, 57 deletions
diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl
index dd0579e9d6..51d875acaf 100644
--- a/src/rabbit_msg_file.erl
+++ b/src/rabbit_msg_file.erl
@@ -31,7 +31,7 @@
-module(rabbit_msg_file).
--export([append/3, read/2, scan/3]).
+-export([append/3, read/2, scan/2]).
%%----------------------------------------------------------------------------
@@ -44,7 +44,7 @@
-define(FILE_PACKING_ADJUSTMENT, (1 + ?INTEGER_SIZE_BYTES)).
-define(GUID_SIZE_BYTES, 16).
-define(GUID_SIZE_BITS, (8 * ?GUID_SIZE_BYTES)).
--define(SCAN_BLOCK_SIZE(LIM), (LIM div 4)).
+-define(SCAN_BLOCK_SIZE, 4194304). %% 4MB
%%----------------------------------------------------------------------------
@@ -59,7 +59,7 @@
({'ok', msg_size()} | {'error', any()})).
-spec(read/2 :: (io_device(), msg_size()) ->
({'ok', {guid(), msg()}} | {'error', any()})).
--spec(scan/3 :: (io_device(), file_size(), file_size()) ->
+-spec(scan/2 :: (io_device(), file_size()) ->
{'ok', [{guid(), msg_size(), position()}], position()}).
-endif.
@@ -92,29 +92,29 @@ read(FileHdl, TotalSize) ->
KO -> KO
end.
-scan(FileHdl, FileSize, FileSizeLim) when FileSize >= 0 ->
- scan(FileHdl, FileSize, FileSizeLim, <<>>, 0, [], 0).
+scan(FileHdl, FileSize) when FileSize >= 0 ->
+ scan(FileHdl, FileSize, <<>>, 0, [], 0).
-scan(_FileHdl, FileSize, _FileSizeLim, _Data, FileSize, Acc, ScanOffset) ->
+scan(_FileHdl, FileSize, _Data, FileSize, Acc, ScanOffset) ->
{ok, Acc, ScanOffset};
-scan(FileHdl, FileSize, FileSizeLim, Data, ReadOffset, Acc, ScanOffset) ->
- Read = lists:min([?SCAN_BLOCK_SIZE(FileSizeLim), (FileSize - ReadOffset)]),
+scan(FileHdl, FileSize, Data, ReadOffset, Acc, ScanOffset) ->
+ Read = lists:min([?SCAN_BLOCK_SIZE, (FileSize - ReadOffset)]),
case file_handle_cache:read(FileHdl, Read) of
{ok, Data1} ->
{Data2, Acc1, ScanOffset1} =
- scan1(<<Data/binary, Data1/binary>>, Acc, ScanOffset),
+ scan(<<Data/binary, Data1/binary>>, Acc, ScanOffset),
ReadOffset1 = ReadOffset + size(Data1),
- scan(FileHdl, FileSize, FileSizeLim, Data2, ReadOffset1, Acc1,
+ scan(FileHdl, FileSize, Data2, ReadOffset1, Acc1,
ScanOffset1);
_KO ->
{ok, Acc, ScanOffset}
end.
-scan1(<<>>, Acc, Offset) ->
+scan(<<>>, Acc, Offset) ->
{<<>>, Acc, Offset};
-scan1(<<0:?INTEGER_SIZE_BITS, _Rest/binary>>, Acc, Offset) ->
+scan(<<0:?INTEGER_SIZE_BITS, _Rest/binary>>, Acc, Offset) ->
{<<>>, Acc, Offset}; %% Nothing to do other than stop.
-scan1(<<Size:?INTEGER_SIZE_BITS, GuidAndMsg:Size/binary,
+scan(<<Size:?INTEGER_SIZE_BITS, GuidAndMsg:Size/binary,
WriteMarker:?WRITE_OK_SIZE_BITS, Rest/binary>>, Acc, Offset) ->
TotalSize = Size + ?FILE_PACKING_ADJUSTMENT,
case WriteMarker of
@@ -127,9 +127,9 @@ scan1(<<Size:?INTEGER_SIZE_BITS, GuidAndMsg:Size/binary,
<<GuidNum:?GUID_SIZE_BITS, _Msg/binary>> =
<<GuidAndMsg:Size/binary>>,
<<Guid:?GUID_SIZE_BYTES/binary>> = <<GuidNum:?GUID_SIZE_BITS>>,
- scan1(Rest, [{Guid, TotalSize, Offset} | Acc], Offset + TotalSize);
+ scan(Rest, [{Guid, TotalSize, Offset} | Acc], Offset + TotalSize);
_ ->
- scan1(Rest, Acc, Offset + TotalSize)
+ scan(Rest, Acc, Offset + TotalSize)
end;
-scan1(Data, Acc, Offset) ->
+scan(Data, Acc, Offset) ->
{Data, Acc, Offset}.
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index c28302dda2..0f3f57a0f4 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -107,7 +107,6 @@
-type(server() :: pid() | atom()).
-type(file_num() :: non_neg_integer()).
--type(file_size() :: non_neg_integer()).
-type(client_msstate() :: #client_msstate { file_handle_cache :: dict(),
index_state :: any(),
index_module :: atom(),
@@ -141,7 +140,7 @@
-spec(successfully_recovered_state/1 :: (server()) -> boolean()).
-spec(gc/3 :: (non_neg_integer(), non_neg_integer(),
- {tid(), file_path(), atom(), any(), file_size()}) ->
+ {tid(), file_path(), atom(), any()}) ->
'concurrent_readers' | non_neg_integer()).
-endif.
@@ -553,8 +552,7 @@ init([Server, BaseDir, ClientRefs, {MsgRefDeltaGen, MsgRefDeltaGenInit}]) ->
sort_file_names(filelib:wildcard("*" ++ ?FILE_EXTENSION, Dir)),
TmpFileNames =
sort_file_names(filelib:wildcard("*" ++ ?FILE_EXTENSION_TMP, Dir)),
- ok = recover_crashed_compactions(Dir, FileNames, TmpFileNames,
- FileSizeLimit),
+ ok = recover_crashed_compactions(Dir, FileNames, TmpFileNames),
%% There should be no more tmp files now, so go ahead and load the
%% whole lot
@@ -569,7 +567,7 @@ init([Server, BaseDir, ClientRefs, {MsgRefDeltaGen, MsgRefDeltaGenInit}]) ->
ok = file_handle_cache:truncate(CurHdl),
{ok, GCPid} = rabbit_msg_store_gc:start_link(Dir, IndexState, IndexModule,
- FileSummaryEts, FileSizeLimit),
+ FileSummaryEts),
{ok, maybe_compact(
State1 #msstate { current_file_handle = CurHdl, gc_pid = GCPid }),
@@ -1213,24 +1211,22 @@ count_msg_refs(Gen, Seed, State) ->
count_msg_refs(Gen, Next, State)
end.
-recover_crashed_compactions(Dir, FileNames, TmpFileNames, FileSizeLimit) ->
+recover_crashed_compactions(Dir, FileNames, TmpFileNames) ->
lists:foreach(
fun (TmpFileName) ->
NonTmpRelatedFileName =
filename:rootname(TmpFileName) ++ ?FILE_EXTENSION,
true = lists:member(NonTmpRelatedFileName, FileNames),
ok = recover_crashed_compaction(
- Dir, TmpFileName, NonTmpRelatedFileName, FileSizeLimit)
+ Dir, TmpFileName, NonTmpRelatedFileName)
end, TmpFileNames),
ok.
-recover_crashed_compaction(Dir, TmpFileName, NonTmpRelatedFileName,
- FileSizeLimit) ->
+recover_crashed_compaction(Dir, TmpFileName, NonTmpRelatedFileName) ->
{ok, UncorruptedMessagesTmp, GuidsTmp} =
- scan_file_for_valid_messages_and_guids(Dir, TmpFileName, FileSizeLimit),
+ scan_file_for_valid_messages_and_guids(Dir, TmpFileName),
{ok, UncorruptedMessages, Guids} =
- scan_file_for_valid_messages_and_guids(Dir, NonTmpRelatedFileName,
- FileSizeLimit),
+ scan_file_for_valid_messages_and_guids(Dir, NonTmpRelatedFileName),
%% 1) It's possible that everything in the tmp file is also in the
%% main file such that the main file is (prefix ++
%% tmpfile). This means that compaction failed immediately
@@ -1307,7 +1303,7 @@ recover_crashed_compaction(Dir, TmpFileName, NonTmpRelatedFileName,
{ok, _MainMessages, GuidsMain} =
scan_file_for_valid_messages_and_guids(
- Dir, NonTmpRelatedFileName, FileSizeLimit),
+ Dir, NonTmpRelatedFileName),
%% check that everything in Guids1 is in GuidsMain
true = is_sublist(Guids1, GuidsMain),
%% check that everything in GuidsTmp is in GuidsMain
@@ -1321,12 +1317,11 @@ is_sublist(SmallerL, BiggerL) ->
is_disjoint(SmallerL, BiggerL) ->
lists:all(fun (Item) -> not lists:member(Item, BiggerL) end, SmallerL).
-scan_file_for_valid_messages(Dir, FileName, FileSizeLimit) ->
+scan_file_for_valid_messages(Dir, FileName) ->
case open_file(Dir, FileName, ?READ_MODE) of
{ok, Hdl} -> Valid = rabbit_msg_file:scan(
Hdl, filelib:file_size(
- form_filename(Dir, FileName)),
- FileSizeLimit),
+ form_filename(Dir, FileName))),
%% if something really bad has happened,
%% the close could fail, but ignore
file_handle_cache:close(Hdl),
@@ -1335,9 +1330,9 @@ scan_file_for_valid_messages(Dir, FileName, FileSizeLimit) ->
{error, Reason} -> {error, {unable_to_scan_file, FileName, Reason}}
end.
-scan_file_for_valid_messages_and_guids(Dir, FileName, FileSizeLimit) ->
+scan_file_for_valid_messages_and_guids(Dir, FileName) ->
{ok, Messages, _FileSize} =
- scan_file_for_valid_messages(Dir, FileName, FileSizeLimit),
+ scan_file_for_valid_messages(Dir, FileName),
{ok, Messages, [Guid || {Guid, _TotalSize, _FileOffset} <- Messages]}.
%% Takes the list in *ascending* order (i.e. eldest message
@@ -1405,11 +1400,10 @@ build_index(Gatherer, Left, [File|Files], State) ->
end),
build_index(Gatherer, File, Files, State).
-build_index_worker(Gatherer, State = #msstate { file_size_limit = FileSizeLimit,
- dir = Dir },
+build_index_worker(Gatherer, State = #msstate { dir = Dir },
Left, File, Files) ->
{ok, Messages, FileSize} =
- scan_file_for_valid_messages(Dir, filenum_to_name(File), FileSizeLimit),
+ scan_file_for_valid_messages(Dir, filenum_to_name(File)),
{ValidMessages, ValidTotalSize} =
lists:foldl(
fun (Obj = {Guid, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
@@ -1579,8 +1573,7 @@ delete_file_if_empty(File, State = #msstate {
%% garbage collection / compaction / aggregation -- external
%%----------------------------------------------------------------------------
-gc(SrcFile, DstFile, State = {FileSummaryEts, _Dir, _Index, _IndexState,
- _FileSizeLimit}) ->
+gc(SrcFile, DstFile, State = {FileSummaryEts, _Dir, _Index, _IndexState}) ->
[SrcObj = #file_summary {
readers = SrcReaders,
left = DstFile,
@@ -1612,8 +1605,7 @@ combine_files(#file_summary { file = Source,
valid_total_size = DestinationValid,
contiguous_top = DestinationContiguousTop,
right = Source },
- State = {_FileSummaryEts, Dir, _Index, _IndexState,
- _FileSizeLimit}) ->
+ State = {_FileSummaryEts, Dir, _Index, _IndexState}) ->
SourceName = filenum_to_name(Source),
DestinationName = filenum_to_name(Destination),
{ok, SourceHdl} = open_file(Dir, SourceName,
@@ -1672,11 +1664,11 @@ combine_files(#file_summary { file = Source,
ok = file_handle_cache:delete(SourceHdl),
ExpectedSize.
-find_unremoved_messages_in_file(
- File, {_FileSummaryEts, Dir, Index, IndexState, FileSizeLimit}) ->
+find_unremoved_messages_in_file(File,
+ {_FileSummaryEts, Dir, Index, IndexState}) ->
%% Messages here will be end-of-file at start-of-list
{ok, Messages, _FileSize} =
- scan_file_for_valid_messages(Dir, filenum_to_name(File), FileSizeLimit),
+ scan_file_for_valid_messages(Dir, filenum_to_name(File)),
%% foldl will reverse so will end up with msgs in ascending offset order
lists:foldl(fun ({Guid, TotalSize, _Offset}, Acc = {List, Size}) ->
case Index:lookup(Guid, IndexState) of
@@ -1689,7 +1681,7 @@ find_unremoved_messages_in_file(
copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
Destination,
- {_FileSummaryEts, _Dir, Index, IndexState, _FileSizeLimit}) ->
+ {_FileSummaryEts, _Dir, Index, IndexState}) ->
Copy = fun ({BlockStart, BlockEnd}) ->
BSize = BlockEnd - BlockStart,
{ok, BlockStart} =
diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl
index f29bf1a485..56cd422b5b 100644
--- a/src/rabbit_msg_store_gc.erl
+++ b/src/rabbit_msg_store_gc.erl
@@ -33,7 +33,7 @@
-behaviour(gen_server2).
--export([start_link/5, gc/3, no_readers/2, stop/1]).
+-export([start_link/4, gc/3, no_readers/2, stop/1]).
-export([set_maximum_since_use/2]).
@@ -46,7 +46,6 @@
index_module,
parent,
file_summary_ets,
- file_size_limit,
scheduled
}).
@@ -56,9 +55,7 @@
-ifdef(use_specs).
--type(file_size() :: non_neg_integer()).
-
--spec(start_link/5 :: (file_path(), any(), atom(), tid(), file_size()) ->
+-spec(start_link/4 :: (file_path(), any(), atom(), tid()) ->
{'ok', pid()} | 'ignore' | {'error', any()}).
-spec(gc/3 :: (pid(), non_neg_integer(), non_neg_integer()) -> 'ok').
-spec(no_readers/2 :: (pid(), non_neg_integer()) -> 'ok').
@@ -69,10 +66,10 @@
%%----------------------------------------------------------------------------
-start_link(Dir, IndexState, IndexModule, FileSummaryEts, FileSizeLimit) ->
+start_link(Dir, IndexState, IndexModule, FileSummaryEts) ->
gen_server2:start_link(
?MODULE,
- [self(), Dir, IndexState, IndexModule, FileSummaryEts, FileSizeLimit],
+ [self(), Dir, IndexState, IndexModule, FileSummaryEts],
[{timeout, infinity}]).
gc(Server, Source, Destination) ->
@@ -89,7 +86,7 @@ set_maximum_since_use(Pid, Age) ->
%%----------------------------------------------------------------------------
-init([Parent, Dir, IndexState, IndexModule, FileSummaryEts, FileSizeLimit]) ->
+init([Parent, Dir, IndexState, IndexModule, FileSummaryEts]) ->
ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use,
[self()]),
{ok, #gcstate { dir = Dir,
@@ -97,7 +94,6 @@ init([Parent, Dir, IndexState, IndexModule, FileSummaryEts, FileSizeLimit]) ->
index_module = IndexModule,
parent = Parent,
file_summary_ets = FileSummaryEts,
- file_size_limit = FileSizeLimit,
scheduled = undefined },
hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -136,11 +132,9 @@ attempt_gc(State = #gcstate { dir = Dir,
index_module = Index,
parent = Parent,
file_summary_ets = FileSummaryEts,
- file_size_limit = FileSizeLimit,
scheduled = {Source, Destination} }) ->
case rabbit_msg_store:gc(Source, Destination,
- {FileSummaryEts, Dir, Index, IndexState,
- FileSizeLimit}) of
+ {FileSummaryEts, Dir, Index, IndexState}) of
concurrent_readers -> State;
Reclaimed -> ok = rabbit_msg_store:gc_done(
Parent, Reclaimed, Source, Destination),