diff options
| author | Matthew Sackman <matthew@lshift.net> | 2010-05-17 18:34:38 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2010-05-17 18:34:38 +0100 |
| commit | 94186864554fabb2a85884263550d3018fc4a8c1 (patch) | |
| tree | aa9f2a3e0f8ca419b1058cc737316f2931a916e9 | |
| parent | 15d267347b9833e4b8ddcc661705af6b218d39fe (diff) | |
| download | rabbitmq-server-git-94186864554fabb2a85884263550d3018fc4a8c1.tar.gz | |
Change how we handle a gc finding concurrent readers on the same file. Previously we were just waiting and trying again. Now we get the readers to signal to the GC when they're done. This itself introduces a race and so has to carefully deal with such notifications arriving after the GC has completed, but it removes a magic number. Also fixed a bug which mean readers were reading from locked files. Whoops.
| -rw-r--r-- | src/rabbit_msg_store.erl | 38 | ||||
| -rw-r--r-- | src/rabbit_msg_store_gc.erl | 45 |
2 files changed, 60 insertions, 23 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index af0f8de52a..ddb53a24db 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -89,6 +89,7 @@ index_state, index_module, dir, + gc_pid, file_handles_ets, file_summary_ets, dedup_cache_ets, @@ -109,6 +110,7 @@ index_state :: any(), index_module :: atom(), dir :: file_path(), + gc_pid :: pid(), file_handles_ets :: tid(), file_summary_ets :: tid(), dedup_cache_ets :: tid(), @@ -137,7 +139,8 @@ -spec(successfully_recovered_state/1 :: (server()) -> boolean()). -spec(gc/3 :: (non_neg_integer(), non_neg_integer(), - {tid(), file_path(), atom(), any()}) -> non_neg_integer()). + {tid(), file_path(), atom(), any()}) -> + 'concurrent_readers' | non_neg_integer()). -endif. @@ -351,13 +354,14 @@ set_maximum_since_use(Server, Age) -> gen_server2:pcast(Server, 8, {set_maximum_since_use, Age}). client_init(Server, Ref) -> - {IState, IModule, Dir, + {IState, IModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} = gen_server2:call(Server, {new_client_state, Ref}, infinity), #client_msstate { file_handle_cache = dict:new(), index_state = IState, index_module = IModule, dir = Dir, + gc_pid = GCPid, file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, dedup_cache_ets = DedupCacheEts, @@ -423,10 +427,20 @@ client_read2(Server, false, _Right, client_read3(Server, #msg_location { guid = Guid, file = File }, Defer, CState = #client_msstate { file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, - dedup_cache_ets = DedupCacheEts }) -> - Release = fun() -> ets:update_counter(FileSummaryEts, File, - {#file_summary.readers, -1}) - end, + dedup_cache_ets = DedupCacheEts, + gc_pid = GCPid }) -> + Release = + fun() -> ok = case ets:update_counter(FileSummaryEts, File, + {#file_summary.readers, -1}) of + 0 -> case ets:lookup(FileSummaryEts, File) of + [#file_summary { locked = true }] -> + rabbit_msg_store_gc:no_readers( + GCPid, File); + _ -> ok + end; + _ -> ok + end + end, %% If a GC involving the file hasn't already started, it won't %% start now. Need to check again to see if we've been locked in %% the meantime, between lookup and update_counter (thus GC @@ -435,7 +449,7 @@ client_read3(Server, #msg_location { guid = Guid, file = File }, Defer, case ets:lookup(FileSummaryEts, File) of [] -> %% GC has deleted our file, just go round again. read(Server, Guid, CState); - [{#file_summary { locked = true }}] -> + [#file_summary { locked = true }] -> %% If we get a badarg here, then the GC has finished and %% deleted our file. Try going around again. Otherwise, %% just defer. @@ -447,7 +461,7 @@ client_read3(Server, #msg_location { guid = Guid, file = File }, Defer, Defer() catch error:badarg -> read(Server, Guid, CState) end; - _ -> + [#file_summary { locked = false }] -> %% Ok, we're definitely safe to continue - a GC involving %% the file cannot start up now, and isn't running, so %% nothing will tell us from now on to close the handle if @@ -569,8 +583,9 @@ handle_call({new_client_state, CRef}, _From, file_summary_ets = FileSummaryEts, dedup_cache_ets = DedupCacheEts, cur_file_cache_ets = CurFileCacheEts, - client_refs = ClientRefs }) -> - reply({IndexState, IndexModule, Dir, + client_refs = ClientRefs, + gc_pid = GCPid }) -> + reply({IndexState, IndexModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts}, State #msstate { client_refs = sets:add_element(CRef, ClientRefs) }); @@ -1569,8 +1584,7 @@ gc(SrcFile, DstFile, State = {FileSummaryEts, _Dir, _Index, _IndexState}) -> {#file_summary.contiguous_top, TotalValidData}, {#file_summary.file_size, TotalValidData}]), SrcFileSize + DstFileSize - TotalValidData; - false -> timer:sleep(100), - gc(SrcFile, DstFile, State) + false -> concurrent_readers end. combine_files(#file_summary { file = Source, diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl index 038d51c484..96280e103f 100644 --- a/src/rabbit_msg_store_gc.erl +++ b/src/rabbit_msg_store_gc.erl @@ -33,7 +33,7 @@ -behaviour(gen_server2). --export([start_link/4, gc/3, stop/1]). +-export([start_link/4, gc/3, no_readers/2, stop/1]). -export([set_maximum_since_use/2]). @@ -45,7 +45,8 @@ index_state, index_module, parent, - file_summary_ets + file_summary_ets, + scheduled }). -include("rabbit.hrl"). @@ -60,6 +61,9 @@ start_link(Dir, IndexState, IndexModule, FileSummaryEts) -> gc(Server, Source, Destination) -> gen_server2:cast(Server, {gc, Source, Destination}). +no_readers(Server, File) -> + gen_server2:cast(Server, {no_readers, File}). + stop(Server) -> gen_server2:call(Server, stop, infinity). @@ -75,7 +79,8 @@ init([Parent, Dir, IndexState, IndexModule, FileSummaryEts]) -> index_state = IndexState, index_module = IndexModule, parent = Parent, - file_summary_ets = FileSummaryEts }, + file_summary_ets = FileSummaryEts, + scheduled = undefined }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -83,14 +88,16 @@ handle_call(stop, _From, State) -> {stop, normal, ok, State}. handle_cast({gc, Source, Destination}, - State = #gcstate { dir = Dir, - index_state = IndexState, - index_module = Index, - parent = Parent, - file_summary_ets = FileSummaryEts }) -> - Reclaimed = rabbit_msg_store:gc(Source, Destination, - {FileSummaryEts, Dir, Index, IndexState}), - ok = rabbit_msg_store:gc_done(Parent, Reclaimed, Source, Destination), + State = #gcstate { scheduled = undefined }) -> + {noreply, attempt_gc(State #gcstate { scheduled = {Source, Destination} }), + hibernate}; + +handle_cast({no_readers, File}, + State = #gcstate { scheduled = {Source, Destination} }) + when File =:= Source orelse File =:= Destination -> + {noreply, attempt_gc(State), hibernate}; + +handle_cast({no_readers, _File}, State) -> {noreply, State, hibernate}; handle_cast({set_maximum_since_use, Age}, State) -> @@ -105,3 +112,19 @@ terminate(_Reason, State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. + +attempt_gc(State = #gcstate { dir = Dir, + index_state = IndexState, + index_module = Index, + parent = Parent, + file_summary_ets = FileSummaryEts, + scheduled = {Source, Destination} }) -> + case rabbit_msg_store:gc(Source, Destination, + {FileSummaryEts, Dir, Index, IndexState}) of + concurrent_readers -> + State; + Reclaimed -> + ok = rabbit_msg_store:gc_done(Parent, Reclaimed, Source, + Destination), + State #gcstate { scheduled = undefined } + end. |
