summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-05-17 18:34:38 +0100
committerMatthew Sackman <matthew@lshift.net>2010-05-17 18:34:38 +0100
commit94186864554fabb2a85884263550d3018fc4a8c1 (patch)
treeaa9f2a3e0f8ca419b1058cc737316f2931a916e9
parent15d267347b9833e4b8ddcc661705af6b218d39fe (diff)
downloadrabbitmq-server-git-94186864554fabb2a85884263550d3018fc4a8c1.tar.gz
Change how we handle a gc finding concurrent readers on the same file. Previously we were just waiting and trying again. Now we get the readers to signal to the GC when they're done. This itself introduces a race and so has to carefully deal with such notifications arriving after the GC has completed, but it removes a magic number. Also fixed a bug which mean readers were reading from locked files. Whoops.
-rw-r--r--src/rabbit_msg_store.erl38
-rw-r--r--src/rabbit_msg_store_gc.erl45
2 files changed, 60 insertions, 23 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index af0f8de52a..ddb53a24db 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -89,6 +89,7 @@
index_state,
index_module,
dir,
+ gc_pid,
file_handles_ets,
file_summary_ets,
dedup_cache_ets,
@@ -109,6 +110,7 @@
index_state :: any(),
index_module :: atom(),
dir :: file_path(),
+ gc_pid :: pid(),
file_handles_ets :: tid(),
file_summary_ets :: tid(),
dedup_cache_ets :: tid(),
@@ -137,7 +139,8 @@
-spec(successfully_recovered_state/1 :: (server()) -> boolean()).
-spec(gc/3 :: (non_neg_integer(), non_neg_integer(),
- {tid(), file_path(), atom(), any()}) -> non_neg_integer()).
+ {tid(), file_path(), atom(), any()}) ->
+ 'concurrent_readers' | non_neg_integer()).
-endif.
@@ -351,13 +354,14 @@ set_maximum_since_use(Server, Age) ->
gen_server2:pcast(Server, 8, {set_maximum_since_use, Age}).
client_init(Server, Ref) ->
- {IState, IModule, Dir,
+ {IState, IModule, Dir, GCPid,
FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} =
gen_server2:call(Server, {new_client_state, Ref}, infinity),
#client_msstate { file_handle_cache = dict:new(),
index_state = IState,
index_module = IModule,
dir = Dir,
+ gc_pid = GCPid,
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
dedup_cache_ets = DedupCacheEts,
@@ -423,10 +427,20 @@ client_read2(Server, false, _Right,
client_read3(Server, #msg_location { guid = Guid, file = File }, Defer,
CState = #client_msstate { file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts }) ->
- Release = fun() -> ets:update_counter(FileSummaryEts, File,
- {#file_summary.readers, -1})
- end,
+ dedup_cache_ets = DedupCacheEts,
+ gc_pid = GCPid }) ->
+ Release =
+ fun() -> ok = case ets:update_counter(FileSummaryEts, File,
+ {#file_summary.readers, -1}) of
+ 0 -> case ets:lookup(FileSummaryEts, File) of
+ [#file_summary { locked = true }] ->
+ rabbit_msg_store_gc:no_readers(
+ GCPid, File);
+ _ -> ok
+ end;
+ _ -> ok
+ end
+ end,
%% If a GC involving the file hasn't already started, it won't
%% start now. Need to check again to see if we've been locked in
%% the meantime, between lookup and update_counter (thus GC
@@ -435,7 +449,7 @@ client_read3(Server, #msg_location { guid = Guid, file = File }, Defer,
case ets:lookup(FileSummaryEts, File) of
[] -> %% GC has deleted our file, just go round again.
read(Server, Guid, CState);
- [{#file_summary { locked = true }}] ->
+ [#file_summary { locked = true }] ->
%% If we get a badarg here, then the GC has finished and
%% deleted our file. Try going around again. Otherwise,
%% just defer.
@@ -447,7 +461,7 @@ client_read3(Server, #msg_location { guid = Guid, file = File }, Defer,
Defer()
catch error:badarg -> read(Server, Guid, CState)
end;
- _ ->
+ [#file_summary { locked = false }] ->
%% Ok, we're definitely safe to continue - a GC involving
%% the file cannot start up now, and isn't running, so
%% nothing will tell us from now on to close the handle if
@@ -569,8 +583,9 @@ handle_call({new_client_state, CRef}, _From,
file_summary_ets = FileSummaryEts,
dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts,
- client_refs = ClientRefs }) ->
- reply({IndexState, IndexModule, Dir,
+ client_refs = ClientRefs,
+ gc_pid = GCPid }) ->
+ reply({IndexState, IndexModule, Dir, GCPid,
FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts},
State #msstate { client_refs = sets:add_element(CRef, ClientRefs) });
@@ -1569,8 +1584,7 @@ gc(SrcFile, DstFile, State = {FileSummaryEts, _Dir, _Index, _IndexState}) ->
{#file_summary.contiguous_top, TotalValidData},
{#file_summary.file_size, TotalValidData}]),
SrcFileSize + DstFileSize - TotalValidData;
- false -> timer:sleep(100),
- gc(SrcFile, DstFile, State)
+ false -> concurrent_readers
end.
combine_files(#file_summary { file = Source,
diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl
index 038d51c484..96280e103f 100644
--- a/src/rabbit_msg_store_gc.erl
+++ b/src/rabbit_msg_store_gc.erl
@@ -33,7 +33,7 @@
-behaviour(gen_server2).
--export([start_link/4, gc/3, stop/1]).
+-export([start_link/4, gc/3, no_readers/2, stop/1]).
-export([set_maximum_since_use/2]).
@@ -45,7 +45,8 @@
index_state,
index_module,
parent,
- file_summary_ets
+ file_summary_ets,
+ scheduled
}).
-include("rabbit.hrl").
@@ -60,6 +61,9 @@ start_link(Dir, IndexState, IndexModule, FileSummaryEts) ->
gc(Server, Source, Destination) ->
gen_server2:cast(Server, {gc, Source, Destination}).
+no_readers(Server, File) ->
+ gen_server2:cast(Server, {no_readers, File}).
+
stop(Server) ->
gen_server2:call(Server, stop, infinity).
@@ -75,7 +79,8 @@ init([Parent, Dir, IndexState, IndexModule, FileSummaryEts]) ->
index_state = IndexState,
index_module = IndexModule,
parent = Parent,
- file_summary_ets = FileSummaryEts },
+ file_summary_ets = FileSummaryEts,
+ scheduled = undefined },
hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -83,14 +88,16 @@ handle_call(stop, _From, State) ->
{stop, normal, ok, State}.
handle_cast({gc, Source, Destination},
- State = #gcstate { dir = Dir,
- index_state = IndexState,
- index_module = Index,
- parent = Parent,
- file_summary_ets = FileSummaryEts }) ->
- Reclaimed = rabbit_msg_store:gc(Source, Destination,
- {FileSummaryEts, Dir, Index, IndexState}),
- ok = rabbit_msg_store:gc_done(Parent, Reclaimed, Source, Destination),
+ State = #gcstate { scheduled = undefined }) ->
+ {noreply, attempt_gc(State #gcstate { scheduled = {Source, Destination} }),
+ hibernate};
+
+handle_cast({no_readers, File},
+ State = #gcstate { scheduled = {Source, Destination} })
+ when File =:= Source orelse File =:= Destination ->
+ {noreply, attempt_gc(State), hibernate};
+
+handle_cast({no_readers, _File}, State) ->
{noreply, State, hibernate};
handle_cast({set_maximum_since_use, Age}, State) ->
@@ -105,3 +112,19 @@ terminate(_Reason, State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
+
+attempt_gc(State = #gcstate { dir = Dir,
+ index_state = IndexState,
+ index_module = Index,
+ parent = Parent,
+ file_summary_ets = FileSummaryEts,
+ scheduled = {Source, Destination} }) ->
+ case rabbit_msg_store:gc(Source, Destination,
+ {FileSummaryEts, Dir, Index, IndexState}) of
+ concurrent_readers ->
+ State;
+ Reclaimed ->
+ ok = rabbit_msg_store:gc_done(Parent, Reclaimed, Source,
+ Destination),
+ State #gcstate { scheduled = undefined }
+ end.