diff options
| -rw-r--r-- | src/rabbit_msg_store.erl | 38 | ||||
| -rw-r--r-- | src/rabbit_msg_store_gc.erl | 45 |
2 files changed, 60 insertions, 23 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index af0f8de52a..ddb53a24db 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -89,6 +89,7 @@ index_state, index_module, dir, + gc_pid, file_handles_ets, file_summary_ets, dedup_cache_ets, @@ -109,6 +110,7 @@ index_state :: any(), index_module :: atom(), dir :: file_path(), + gc_pid :: pid(), file_handles_ets :: tid(), file_summary_ets :: tid(), dedup_cache_ets :: tid(), @@ -137,7 +139,8 @@ -spec(successfully_recovered_state/1 :: (server()) -> boolean()). -spec(gc/3 :: (non_neg_integer(), non_neg_integer(), - {tid(), file_path(), atom(), any()}) -> non_neg_integer()). + {tid(), file_path(), atom(), any()}) -> + 'concurrent_readers' | non_neg_integer()). -endif. @@ -351,13 +354,14 @@ set_maximum_since_use(Server, Age) -> gen_server2:pcast(Server, 8, {set_maximum_since_use, Age}). client_init(Server, Ref) -> - {IState, IModule, Dir, + {IState, IModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} = gen_server2:call(Server, {new_client_state, Ref}, infinity), #client_msstate { file_handle_cache = dict:new(), index_state = IState, index_module = IModule, dir = Dir, + gc_pid = GCPid, file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, dedup_cache_ets = DedupCacheEts, @@ -423,10 +427,20 @@ client_read2(Server, false, _Right, client_read3(Server, #msg_location { guid = Guid, file = File }, Defer, CState = #client_msstate { file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, - dedup_cache_ets = DedupCacheEts }) -> - Release = fun() -> ets:update_counter(FileSummaryEts, File, - {#file_summary.readers, -1}) - end, + dedup_cache_ets = DedupCacheEts, + gc_pid = GCPid }) -> + Release = + fun() -> ok = case ets:update_counter(FileSummaryEts, File, + {#file_summary.readers, -1}) of + 0 -> case ets:lookup(FileSummaryEts, File) of + [#file_summary { locked = true }] -> + rabbit_msg_store_gc:no_readers( + GCPid, File); + _ -> ok + end; + _ -> ok + end + end, %% If a GC involving the file hasn't already started, it won't %% start now. Need to check again to see if we've been locked in %% the meantime, between lookup and update_counter (thus GC @@ -435,7 +449,7 @@ client_read3(Server, #msg_location { guid = Guid, file = File }, Defer, case ets:lookup(FileSummaryEts, File) of [] -> %% GC has deleted our file, just go round again. read(Server, Guid, CState); - [{#file_summary { locked = true }}] -> + [#file_summary { locked = true }] -> %% If we get a badarg here, then the GC has finished and %% deleted our file. Try going around again. Otherwise, %% just defer. @@ -447,7 +461,7 @@ client_read3(Server, #msg_location { guid = Guid, file = File }, Defer, Defer() catch error:badarg -> read(Server, Guid, CState) end; - _ -> + [#file_summary { locked = false }] -> %% Ok, we're definitely safe to continue - a GC involving %% the file cannot start up now, and isn't running, so %% nothing will tell us from now on to close the handle if @@ -569,8 +583,9 @@ handle_call({new_client_state, CRef}, _From, file_summary_ets = FileSummaryEts, dedup_cache_ets = DedupCacheEts, cur_file_cache_ets = CurFileCacheEts, - client_refs = ClientRefs }) -> - reply({IndexState, IndexModule, Dir, + client_refs = ClientRefs, + gc_pid = GCPid }) -> + reply({IndexState, IndexModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts}, State #msstate { client_refs = sets:add_element(CRef, ClientRefs) }); @@ -1569,8 +1584,7 @@ gc(SrcFile, DstFile, State = {FileSummaryEts, _Dir, _Index, _IndexState}) -> {#file_summary.contiguous_top, TotalValidData}, {#file_summary.file_size, TotalValidData}]), SrcFileSize + DstFileSize - TotalValidData; - false -> timer:sleep(100), - gc(SrcFile, DstFile, State) + false -> concurrent_readers end. combine_files(#file_summary { file = Source, diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl index 038d51c484..96280e103f 100644 --- a/src/rabbit_msg_store_gc.erl +++ b/src/rabbit_msg_store_gc.erl @@ -33,7 +33,7 @@ -behaviour(gen_server2). --export([start_link/4, gc/3, stop/1]). +-export([start_link/4, gc/3, no_readers/2, stop/1]). -export([set_maximum_since_use/2]). @@ -45,7 +45,8 @@ index_state, index_module, parent, - file_summary_ets + file_summary_ets, + scheduled }). -include("rabbit.hrl"). @@ -60,6 +61,9 @@ start_link(Dir, IndexState, IndexModule, FileSummaryEts) -> gc(Server, Source, Destination) -> gen_server2:cast(Server, {gc, Source, Destination}). +no_readers(Server, File) -> + gen_server2:cast(Server, {no_readers, File}). + stop(Server) -> gen_server2:call(Server, stop, infinity). @@ -75,7 +79,8 @@ init([Parent, Dir, IndexState, IndexModule, FileSummaryEts]) -> index_state = IndexState, index_module = IndexModule, parent = Parent, - file_summary_ets = FileSummaryEts }, + file_summary_ets = FileSummaryEts, + scheduled = undefined }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -83,14 +88,16 @@ handle_call(stop, _From, State) -> {stop, normal, ok, State}. handle_cast({gc, Source, Destination}, - State = #gcstate { dir = Dir, - index_state = IndexState, - index_module = Index, - parent = Parent, - file_summary_ets = FileSummaryEts }) -> - Reclaimed = rabbit_msg_store:gc(Source, Destination, - {FileSummaryEts, Dir, Index, IndexState}), - ok = rabbit_msg_store:gc_done(Parent, Reclaimed, Source, Destination), + State = #gcstate { scheduled = undefined }) -> + {noreply, attempt_gc(State #gcstate { scheduled = {Source, Destination} }), + hibernate}; + +handle_cast({no_readers, File}, + State = #gcstate { scheduled = {Source, Destination} }) + when File =:= Source orelse File =:= Destination -> + {noreply, attempt_gc(State), hibernate}; + +handle_cast({no_readers, _File}, State) -> {noreply, State, hibernate}; handle_cast({set_maximum_since_use, Age}, State) -> @@ -105,3 +112,19 @@ terminate(_Reason, State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. + +attempt_gc(State = #gcstate { dir = Dir, + index_state = IndexState, + index_module = Index, + parent = Parent, + file_summary_ets = FileSummaryEts, + scheduled = {Source, Destination} }) -> + case rabbit_msg_store:gc(Source, Destination, + {FileSummaryEts, Dir, Index, IndexState}) of + concurrent_readers -> + State; + Reclaimed -> + ok = rabbit_msg_store:gc_done(Parent, Reclaimed, Source, + Destination), + State #gcstate { scheduled = undefined } + end. |
