summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_msg_store.erl38
-rw-r--r--src/rabbit_msg_store_gc.erl45
2 files changed, 60 insertions, 23 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index af0f8de52a..ddb53a24db 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -89,6 +89,7 @@
index_state,
index_module,
dir,
+ gc_pid,
file_handles_ets,
file_summary_ets,
dedup_cache_ets,
@@ -109,6 +110,7 @@
index_state :: any(),
index_module :: atom(),
dir :: file_path(),
+ gc_pid :: pid(),
file_handles_ets :: tid(),
file_summary_ets :: tid(),
dedup_cache_ets :: tid(),
@@ -137,7 +139,8 @@
-spec(successfully_recovered_state/1 :: (server()) -> boolean()).
-spec(gc/3 :: (non_neg_integer(), non_neg_integer(),
- {tid(), file_path(), atom(), any()}) -> non_neg_integer()).
+ {tid(), file_path(), atom(), any()}) ->
+ 'concurrent_readers' | non_neg_integer()).
-endif.
@@ -351,13 +354,14 @@ set_maximum_since_use(Server, Age) ->
gen_server2:pcast(Server, 8, {set_maximum_since_use, Age}).
client_init(Server, Ref) ->
- {IState, IModule, Dir,
+ {IState, IModule, Dir, GCPid,
FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} =
gen_server2:call(Server, {new_client_state, Ref}, infinity),
#client_msstate { file_handle_cache = dict:new(),
index_state = IState,
index_module = IModule,
dir = Dir,
+ gc_pid = GCPid,
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
dedup_cache_ets = DedupCacheEts,
@@ -423,10 +427,20 @@ client_read2(Server, false, _Right,
client_read3(Server, #msg_location { guid = Guid, file = File }, Defer,
CState = #client_msstate { file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts }) ->
- Release = fun() -> ets:update_counter(FileSummaryEts, File,
- {#file_summary.readers, -1})
- end,
+ dedup_cache_ets = DedupCacheEts,
+ gc_pid = GCPid }) ->
+ Release =
+ fun() -> ok = case ets:update_counter(FileSummaryEts, File,
+ {#file_summary.readers, -1}) of
+ 0 -> case ets:lookup(FileSummaryEts, File) of
+ [#file_summary { locked = true }] ->
+ rabbit_msg_store_gc:no_readers(
+ GCPid, File);
+ _ -> ok
+ end;
+ _ -> ok
+ end
+ end,
%% If a GC involving the file hasn't already started, it won't
%% start now. Need to check again to see if we've been locked in
%% the meantime, between lookup and update_counter (thus GC
@@ -435,7 +449,7 @@ client_read3(Server, #msg_location { guid = Guid, file = File }, Defer,
case ets:lookup(FileSummaryEts, File) of
[] -> %% GC has deleted our file, just go round again.
read(Server, Guid, CState);
- [{#file_summary { locked = true }}] ->
+ [#file_summary { locked = true }] ->
%% If we get a badarg here, then the GC has finished and
%% deleted our file. Try going around again. Otherwise,
%% just defer.
@@ -447,7 +461,7 @@ client_read3(Server, #msg_location { guid = Guid, file = File }, Defer,
Defer()
catch error:badarg -> read(Server, Guid, CState)
end;
- _ ->
+ [#file_summary { locked = false }] ->
%% Ok, we're definitely safe to continue - a GC involving
%% the file cannot start up now, and isn't running, so
%% nothing will tell us from now on to close the handle if
@@ -569,8 +583,9 @@ handle_call({new_client_state, CRef}, _From,
file_summary_ets = FileSummaryEts,
dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts,
- client_refs = ClientRefs }) ->
- reply({IndexState, IndexModule, Dir,
+ client_refs = ClientRefs,
+ gc_pid = GCPid }) ->
+ reply({IndexState, IndexModule, Dir, GCPid,
FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts},
State #msstate { client_refs = sets:add_element(CRef, ClientRefs) });
@@ -1569,8 +1584,7 @@ gc(SrcFile, DstFile, State = {FileSummaryEts, _Dir, _Index, _IndexState}) ->
{#file_summary.contiguous_top, TotalValidData},
{#file_summary.file_size, TotalValidData}]),
SrcFileSize + DstFileSize - TotalValidData;
- false -> timer:sleep(100),
- gc(SrcFile, DstFile, State)
+ false -> concurrent_readers
end.
combine_files(#file_summary { file = Source,
diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl
index 038d51c484..96280e103f 100644
--- a/src/rabbit_msg_store_gc.erl
+++ b/src/rabbit_msg_store_gc.erl
@@ -33,7 +33,7 @@
-behaviour(gen_server2).
--export([start_link/4, gc/3, stop/1]).
+-export([start_link/4, gc/3, no_readers/2, stop/1]).
-export([set_maximum_since_use/2]).
@@ -45,7 +45,8 @@
index_state,
index_module,
parent,
- file_summary_ets
+ file_summary_ets,
+ scheduled
}).
-include("rabbit.hrl").
@@ -60,6 +61,9 @@ start_link(Dir, IndexState, IndexModule, FileSummaryEts) ->
gc(Server, Source, Destination) ->
gen_server2:cast(Server, {gc, Source, Destination}).
+no_readers(Server, File) ->
+ gen_server2:cast(Server, {no_readers, File}).
+
stop(Server) ->
gen_server2:call(Server, stop, infinity).
@@ -75,7 +79,8 @@ init([Parent, Dir, IndexState, IndexModule, FileSummaryEts]) ->
index_state = IndexState,
index_module = IndexModule,
parent = Parent,
- file_summary_ets = FileSummaryEts },
+ file_summary_ets = FileSummaryEts,
+ scheduled = undefined },
hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -83,14 +88,16 @@ handle_call(stop, _From, State) ->
{stop, normal, ok, State}.
handle_cast({gc, Source, Destination},
- State = #gcstate { dir = Dir,
- index_state = IndexState,
- index_module = Index,
- parent = Parent,
- file_summary_ets = FileSummaryEts }) ->
- Reclaimed = rabbit_msg_store:gc(Source, Destination,
- {FileSummaryEts, Dir, Index, IndexState}),
- ok = rabbit_msg_store:gc_done(Parent, Reclaimed, Source, Destination),
+ State = #gcstate { scheduled = undefined }) ->
+ {noreply, attempt_gc(State #gcstate { scheduled = {Source, Destination} }),
+ hibernate};
+
+handle_cast({no_readers, File},
+ State = #gcstate { scheduled = {Source, Destination} })
+ when File =:= Source orelse File =:= Destination ->
+ {noreply, attempt_gc(State), hibernate};
+
+handle_cast({no_readers, _File}, State) ->
{noreply, State, hibernate};
handle_cast({set_maximum_since_use, Age}, State) ->
@@ -105,3 +112,19 @@ terminate(_Reason, State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
+
+attempt_gc(State = #gcstate { dir = Dir,
+ index_state = IndexState,
+ index_module = Index,
+ parent = Parent,
+ file_summary_ets = FileSummaryEts,
+ scheduled = {Source, Destination} }) ->
+ case rabbit_msg_store:gc(Source, Destination,
+ {FileSummaryEts, Dir, Index, IndexState}) of
+ concurrent_readers ->
+ State;
+ Reclaimed ->
+ ok = rabbit_msg_store:gc_done(Parent, Reclaimed, Source,
+ Destination),
+ State #gcstate { scheduled = undefined }
+ end.