summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r-- src/rabbit_msg_store.erl 142
1 files changed, 94 insertions, 48 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 50114a96f3..c605a6a20c 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -312,55 +312,101 @@ client_terminate(CState) ->
%% Client-side-only helpers
%%----------------------------------------------------------------------------
-client_read1(#msg_location { msg_id = MsgId, ref_count = RefCount, file = File }
- = MsgLocation, Defer, CState) ->
- [#file_summary { locked = Locked, right = Right }] =
- ets:lookup(?FILE_SUMMARY_ETS_NAME, File),
- case {Right, Locked} of
- {undefined, false} ->
- case ets:lookup(?CUR_FILE_CACHE_ETS_NAME, MsgId) of
- [] ->
- Defer(); %% may have rolled over
- [{MsgId, _FileOrUndefined, Msg}] ->
- ok = maybe_insert_into_cache(RefCount, MsgId, Msg),
- {{ok, Msg}, CState}
- end;
- {_, true} ->
- Defer();
- _ ->
- ets:update_counter(?FILE_SUMMARY_ETS_NAME, File,
- {#file_summary.readers, +1}),
- Release = fun() ->
- ets:update_counter(?FILE_SUMMARY_ETS_NAME, File,
- {#file_summary.readers, -1})
- end,
- %% If a GC hasn't already started, it won't start
- %% now. Need to check again to see if we've been locked in
- %% the meantime, between lookup and update_counter (thus
- %% GC actually in progress).
- [#file_summary { locked = Locked2 }] =
- ets:lookup(?FILE_SUMMARY_ETS_NAME, File),
- case Locked2 of
- true ->
- Release(),
+client_read1(#msg_location { msg_id = MsgId, ref_count = RefCount,
+ file = File }, Defer, CState) ->
+ case ets:lookup(?FILE_SUMMARY_ETS_NAME, File) of
+ [] -> %% File has been GC'd and no longer exists. Go around again.
+ read(MsgId, CState);
+ [#file_summary { locked = Locked, right = Right }] ->
+ case {Right, Locked} of
+ {undefined, false} ->
+ case ets:lookup(?CUR_FILE_CACHE_ETS_NAME, MsgId) of
+ [] ->
+ Defer(); %% may have rolled over
+ [{MsgId, _FileOrUndefined, Msg}] ->
+ ok = maybe_insert_into_cache(RefCount, MsgId, Msg),
+ {{ok, Msg}, CState}
+ end;
+ {_, true} ->
+ %% Of course, in the meantime, the GC could have
+ %% run and our msg is actually in a different
+ %% file, unlocked. However, deferring is the safest
+ %% and simplest thing to do.
Defer();
- false ->
- %% Ok, we're definitely safe to continue - a GC
- %% can't start up now, and isn't running, so
- %% nothing will tell us from now on to close the
- %% handle if it's already open. (Well, a GC could
- %% start, and could put close entries into the ets
- %% table, but the GC will wait until we're done
- %% here before doing any real work.)
-
- %% This is fine to fail (already exists)
- ets:insert_new(?FILE_HANDLES_ETS_NAME,
- {{self(), File}, open}),
- CState1 = close_all_indicated(CState),
- {Msg, CState2} = read_from_disk(MsgLocation, CState1),
- ok = maybe_insert_into_cache(RefCount, MsgId, Msg),
- Release(),
- {{ok, Msg}, CState2}
+ _ ->
+ %% It's entirely possible that everything we're
+ %% doing from here on is for the wrong file, or a
+ %% non-existent file, as a GC may have finished.
+ try
+ ets:update_counter(?FILE_SUMMARY_ETS_NAME, File,
+ {#file_summary.readers, +1})
+ catch error:badarg ->
+ %% the File has been GC'd and deleted. Go around.
+ read(MsgId, CState)
+ end,
+ Release = fun() -> ets:update_counter(
+ ?FILE_SUMMARY_ETS_NAME, File,
+ {#file_summary.readers, -1})
+ end,
+ %% If a GC hasn't already started, it won't start
+ %% now. Need to check again to see if we've been
+ %% locked in the meantime, between lookup and
+ %% update_counter (thus GC started before our +1).
+ [#file_summary { locked = Locked2 }] =
+ ets:lookup(?FILE_SUMMARY_ETS_NAME, File),
+ case Locked2 of
+ true ->
+ %% If we get a badarg here, then the GC
+ %% has finished and deleted our file. Try
+ %% going around again. Otherwise, just
+ %% defer.
+
+ %% badarg scenario:
+ %% we lookup, msg_store locks, gc starts,
+ %% gc ends, we +1 readers, msg_store
+ %% ets:deletes (and unlocks the dest)
+ try
+ Release(),
+ Defer()
+ catch error:badarg -> read(MsgId, CState)
+ end;
+ false ->
+ %% Ok, we're definitely safe to continue -
+ %% a GC can't start up now, and isn't
+ %% running, so nothing will tell us from
+ %% now on to close the handle if it's
+ %% already open. (Well, a GC could start,
+ %% and could put close entries into the
+ %% ets table, but the GC will wait until
+ %% we're done here before doing any real
+ %% work.)
+
+ %% Finally, we need to recheck that the
+ %% msg is still at the same place - it's
+ %% possible an entire GC ran between us
+ %% doing the lookup and the +1 on the
+ %% readers. (Same as badarg scenario
+ %% above, but we don't have a missing file
+ %% - we just have the /wrong/ file).
+
+ case index_lookup(MsgId, CState) of
+ MsgLocation2 = #msg_location { file = File } ->
+ %% Still the same file.
+ %% This is fine to fail (already exists)
+ ets:insert_new(?FILE_HANDLES_ETS_NAME,
+ {{self(), File}, open}),
+ CState1 = close_all_indicated(CState),
+ {Msg, CState2} =
+ read_from_disk(MsgLocation2, CState1),
+ ok = maybe_insert_into_cache(
+ RefCount, MsgId, Msg),
+ Release(), %% this MUST NOT fail with badarg
+ {{ok, Msg}, CState2};
+ MsgLocation2 -> %% different file!
+ Release(), %% this MUST NOT fail with badarg
+ client_read1(MsgLocation2, Defer, CState)
+ end
+ end
end
end.