summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_msg_store.erl159
1 files changed, 72 insertions, 87 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index c8e32665d0..af711a6038 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -47,7 +47,6 @@
-include("rabbit_msg_store.hrl").
-define(SYNC_INTERVAL, 5). %% milliseconds
--define(GEOMETRIC_P, 0.3). %% parameter to geometric distribution rng
-define(CLEAN_FILENAME, "clean.dot").
-define(FILE_SUMMARY_FILENAME, "file_summary.ets").
@@ -694,9 +693,9 @@ handle_cast({sync, Guids, K},
handle_cast(sync, State) ->
noreply(internal_sync(State));
-handle_cast({gc_done, Reclaimed, Source, Dest},
+handle_cast({gc_done, Reclaimed, Src, Dst},
State = #msstate { sum_file_size = SumFileSize,
- gc_active = {Source, Dest},
+ gc_active = {Src, Dst},
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts }) ->
%% GC done, so now ensure that any clients that have open fhs to
@@ -706,22 +705,21 @@ handle_cast({gc_done, Reclaimed, Source, Dest},
%% up the GC, the client could find the close, and close and
%% reopen the fh, whilst the GC is waiting for readers to
%% disappear, before it's actually done the GC.
- true = mark_handle_to_close(FileHandlesEts, Source),
- true = mark_handle_to_close(FileHandlesEts, Dest),
- %% we always move data left, so Source has gone and was on the
+ true = mark_handle_to_close(FileHandlesEts, Src),
+ true = mark_handle_to_close(FileHandlesEts, Dst),
+ %% we always move data left, so Src has gone and was on the
%% right, so need to make dest = source.right.left, and also
%% dest.right = source.right
- [#file_summary { left = Dest,
- right = SourceRight,
+ [#file_summary { left = Dst,
+ right = SrcRight,
locked = true,
- readers = 0 }] = ets:lookup(FileSummaryEts, Source),
- %% this could fail if SourceRight == undefined
- ets:update_element(FileSummaryEts, SourceRight,
- {#file_summary.left, Dest}),
- true = ets:update_element(FileSummaryEts, Dest,
+ readers = 0 }] = ets:lookup(FileSummaryEts, Src),
+ %% this could fail if SrcRight == undefined
+ ets:update_element(FileSummaryEts, SrcRight, {#file_summary.left, Dst}),
+ true = ets:update_element(FileSummaryEts, Dst,
[{#file_summary.locked, false},
- {#file_summary.right, SourceRight}]),
- true = ets:delete(FileSummaryEts, Source),
+ {#file_summary.right, SrcRight}]),
+ true = ets:delete(FileSummaryEts, Src),
noreply(
maybe_compact(run_pending(
State #msstate { sum_file_size = SumFileSize - Reclaimed,
@@ -1442,69 +1440,56 @@ maybe_compact(State = #msstate { sum_valid_data = SumValid,
file_summary_ets = FileSummaryEts })
when SumFileSize > 3 * ?FILE_SIZE_LIMIT andalso
(SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION ->
- First = ets:first(FileSummaryEts),
- N = rabbit_misc:ceil(math:log(1.0 - random:uniform()) /
- math:log(1.0 - ?GEOMETRIC_P)),
- case find_files_to_gc(FileSummaryEts, N, First) of
- undefined ->
+ case ets:first(FileSummaryEts) of
+ '$end_of_table' ->
State;
- {Source, Dest} ->
- State1 = close_handle(Source, close_handle(Dest, State)),
- true = ets:update_element(FileSummaryEts, Source,
- {#file_summary.locked, true}),
- true = ets:update_element(FileSummaryEts, Dest,
- {#file_summary.locked, true}),
- ok = rabbit_msg_store_gc:gc(GCPid, Source, Dest),
- State1 #msstate { gc_active = {Source, Dest} }
+ First ->
+ case find_files_to_gc(FileSummaryEts,
+ ets:lookup(FileSummaryEts, First)) of
+ not_found ->
+ State;
+ {Src, Dst} ->
+ State1 = close_handle(Src, close_handle(Dst, State)),
+ true = ets:update_element(FileSummaryEts, Src,
+ {#file_summary.locked, true}),
+ true = ets:update_element(FileSummaryEts, Dst,
+ {#file_summary.locked, true}),
+ ok = rabbit_msg_store_gc:gc(GCPid, Src, Dst),
+ State1 #msstate { gc_active = {Src, Dst} }
+ end
end;
maybe_compact(State) ->
State.
mark_handle_to_close(FileHandlesEts, File) ->
[ ets:update_element(FileHandlesEts, Key, {2, close})
- || {Key, open} <- ets:match_object(FileHandlesEts,
- {{'_', File}, open}) ],
+ || {Key, open} <- ets:match_object(FileHandlesEts, {{'_', File}, open}) ],
true.
-find_files_to_gc(_FileSummaryEts, _N, '$end_of_table') ->
- undefined;
-find_files_to_gc(FileSummaryEts, N, First) ->
- [FirstObj = #file_summary { right = Right }] =
- ets:lookup(FileSummaryEts, First),
- Pairs = find_files_to_gc(FileSummaryEts, N, FirstObj,
- ets:lookup(FileSummaryEts, Right), []),
- case Pairs of
- [] -> undefined;
- [Pair] -> Pair;
- _ -> Len = length(Pairs), %% The list is the wrong way
- M = Len - (N rem Len), %% around, so subtract our N
- lists:nth(M, Pairs) %% from its length
+find_files_to_gc(FileSummaryEts,
+ [#file_summary { file = Dst,
+ valid_total_size = DstValid,
+ right = Src }]) ->
+ case Src of
+ undefined ->
+ not_found;
+ _ ->
+ [#file_summary { file = Src,
+ valid_total_size = SrcValid,
+ left = Dst,
+ right = SrcRight }] = Next =
+ ets:lookup(FileSummaryEts, Src),
+ case SrcRight of
+ undefined ->
+ not_found;
+ _ ->
+ case DstValid + SrcValid =< ?FILE_SIZE_LIMIT of
+ true -> {Src, Dst};
+ false -> find_files_to_gc(FileSummaryEts, Next)
+ end
+ end
end.
-find_files_to_gc(_FileSummaryEts, _N, #file_summary {}, [], Pairs) ->
- lists:reverse(Pairs);
-find_files_to_gc(FileSummaryEts, N,
- #file_summary { right = Source,
- file = Dest,
- valid_total_size = DestValid },
- [SourceObj = #file_summary { left = Dest,
- right = SourceRight,
- valid_total_size = SourceValid,
- file = Source }],
- Pairs) when DestValid + SourceValid =< ?FILE_SIZE_LIMIT andalso
- SourceRight =/= undefined ->
- Pair = {Source, Dest},
- case N == 1 of
- true -> [Pair];
- false -> find_files_to_gc(FileSummaryEts, (N - 1), SourceObj,
- ets:lookup(FileSummaryEts, SourceRight),
- [Pair | Pairs])
- end;
-find_files_to_gc(FileSummaryEts, N, _Left,
- [Right = #file_summary { right = RightRight }], Pairs) ->
- find_files_to_gc(
- FileSummaryEts, N, Right, ets:lookup(FileSummaryEts, RightRight), Pairs).
-
delete_file_if_empty(File, State = #msstate { current_file = File }) ->
State;
delete_file_if_empty(File, State = #msstate {
@@ -1547,33 +1532,33 @@ delete_file_if_empty(File, State = #msstate {
%% garbage collection / compaction / aggregation -- external
%%----------------------------------------------------------------------------
-gc(SourceFile, DestFile, State = {FileSummaryEts, _Dir, _Index, _IndexState}) ->
- [SourceObj = #file_summary {
- readers = SourceReaders,
- valid_total_size = SourceValidData,
- left = DestFile,
- file_size = SourceFileSize,
- locked = true }] = ets:lookup(FileSummaryEts, SourceFile),
- [DestObj = #file_summary {
- readers = DestReaders,
- valid_total_size = DestValidData,
- right = SourceFile,
- file_size = DestFileSize,
- locked = true }] = ets:lookup(FileSummaryEts, DestFile),
-
- case SourceReaders =:= 0 andalso DestReaders =:= 0 of
- true -> TotalValidData = DestValidData + SourceValidData,
- ok = combine_files(SourceObj, DestObj, State),
+gc(SrcFile, DstFile, State = {FileSummaryEts, _Dir, _Index, _IndexState}) ->
+ [SrcObj = #file_summary {
+ readers = SrcReaders,
+ valid_total_size = SrcValidData,
+ left = DstFile,
+ file_size = SrcFileSize,
+ locked = true }] = ets:lookup(FileSummaryEts, SrcFile),
+ [DstObj = #file_summary {
+ readers = DstReaders,
+ valid_total_size = DstValidData,
+ right = SrcFile,
+ file_size = DstFileSize,
+ locked = true }] = ets:lookup(FileSummaryEts, DstFile),
+
+ case SrcReaders =:= 0 andalso DstReaders =:= 0 of
+ true -> TotalValidData = DstValidData + SrcValidData,
+ ok = combine_files(SrcObj, DstObj, State),
%% don't update dest.right, because it could be
%% changing at the same time
true = ets:update_element(
- FileSummaryEts, DestFile,
+ FileSummaryEts, DstFile,
[{#file_summary.valid_total_size, TotalValidData},
{#file_summary.contiguous_top, TotalValidData},
{#file_summary.file_size, TotalValidData}]),
- SourceFileSize + DestFileSize - TotalValidData;
+ SrcFileSize + DstFileSize - TotalValidData;
false -> timer:sleep(100),
- gc(SourceFile, DestFile, State)
+ gc(SrcFile, DstFile, State)
end.
combine_files(#file_summary { file = Source,