summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_msg_store.erl58
1 files changed, 38 insertions, 20 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 59b283cf3c..29e4972e33 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -241,9 +241,12 @@ start_link(Dir, MsgRefDeltaGen, MsgRefDeltaGenInit) ->
write(MsgId, Msg) -> gen_server2:cast(?SERVER, {write, MsgId, Msg}).
read(MsgId, CState) ->
+ Defer = fun() ->
+ {gen_server2:call(?SERVER, {read, MsgId}, infinity), CState}
+ end,
case index_lookup(MsgId, CState) of
not_found ->
- {gen_server2:call(?SERVER, {read, MsgId}, infinity), CState};
+ Defer();
#msg_location { ref_count = RefCount,
file = File,
offset = Offset,
@@ -254,27 +257,42 @@ read(MsgId, CState) ->
ets:lookup(?FILE_SUMMARY_ETS_NAME, File),
case Right =:= undefined orelse Locked =:= true of
true ->
- {gen_server2:call(?SERVER, {read, MsgId}, infinity),
- CState};
+ Defer();
false ->
ets:update_counter(?FILE_SUMMARY_ETS_NAME, File,
{#file_summary.readers, 1}),
- %% need to check again to see if we've
- %% been locked in the meantime
+ Release = fun() ->
+ ets:update_counter(
+ ?FILE_SUMMARY_ETS_NAME, File,
+ {#file_summary.readers, -1})
+ end,
+ %% If a GC hasn't already started, it
+ %% won't start now. Need to check again to
+ %% see if we've been locked in the
+ %% meantime, between lookup and
+ %% update_counter (thus GC actually in
+ %% progress).
[#file_summary { locked = Locked2 }] =
ets:lookup(?FILE_SUMMARY_ETS_NAME, File),
case Locked2 of
true ->
- {gen_server2:call(?SERVER, {read, MsgId},
- infinity), CState};
+ Release(),
+ Defer();
false ->
- %% ok, we're definitely safe to
+ %% Ok, we're definitely safe to
%% continue - a GC can't start up
%% now, and isn't running, so
%% nothing will tell us from now
%% on to close the handle if it's
- %% already open.
- %% this is fine to fail (already exists)
+ %% already open. (Well, a GC could
+ %% start, and could put close
+ %% entries into the ets table, but
+ %% the GC will wait until we're
+ %% done here before doing any real
+ %% work.)
+
+ %% This is fine to fail (already
+ %% exists)
ets:insert_new(?FILE_HANDLES_ETS_NAME,
{{self(), File}, open}),
CState1 = close_all_indicated(CState),
@@ -295,14 +313,12 @@ read(MsgId, CState) ->
{proc_dict, get()}
]}})
end,
- ets:update_counter(
- ?FILE_SUMMARY_ETS_NAME, File,
- {#file_summary.readers, -1}),
+ Release(),
ok = case RefCount > 1 of
true ->
insert_into_cache(MsgId, Msg);
false ->
- %% it's not in the
+ %% It's not in the
%% cache and we only
%% have one reference
%% to the message. So
@@ -497,6 +513,14 @@ handle_cast(sync, State) ->
handle_cast({gc_done, Reclaimed, Source, Dest},
State = #msstate { sum_file_size = SumFileSize,
gc_active = {Source, Dest} }) ->
+ %% GC done, so now ensure that any clients that have open fhs to
+ %% those files close them before using them again. This has to be
+ %% done here, and not when starting up the GC, because if done
+ %% when starting up the GC, the client could find the close, and
+ %% close and reopen the fh, whilst the GC is waiting for readers
+ %% to disappear, before it's actually done the GC.
+ true = mark_handle_to_close(Source),
+ true = mark_handle_to_close(Dest),
%% we always move data left, so Source has gone and was on the
%% right, so need to make dest = source.right.left, and also
%% dest.right = source.right
@@ -1100,12 +1124,6 @@ maybe_compact(State = #msstate { sum_valid_data = SumValid,
{#file_summary.locked, true}),
true = ets:update_element(?FILE_SUMMARY_ETS_NAME, Dest,
{#file_summary.locked, true}),
- %% now that they're locked, we know no queue will touch
- %% them (not even add to the ets table for these files),
- %% so now ensure that we ask the queues to close handles
- %% to these files
- true = mark_handle_to_close(Source),
- true = mark_handle_to_close(Dest),
ok = rabbit_msg_store_gc:gc(Source, Dest),
State1 #msstate { gc_active = {Source, Dest} }
end;