summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-05-05 14:41:03 +0100
committerMatthew Sackman <matthew@lshift.net>2010-05-05 14:41:03 +0100
commit26dc9bbc43a3c50fdef18a505fe35a348ffeb55b (patch)
tree4d000cb3211ed0ed59624f7a5c81a254a9e759bf
parent9d71a0d9038c86dda92689f90515df9caea168c3 (diff)
downloadrabbitmq-server-git-26dc9bbc43a3c50fdef18a505fe35a348ffeb55b.tar.gz
Various fixes and improvements to client reading
-rw-r--r--src/rabbit_msg_store.erl82
1 files changed, 45 insertions, 37 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 5868204639..6e4c1b5634 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -408,63 +408,71 @@ client_read1(Server,
client_read2(Server, Locked, Right, MsgLocation, Defer, CState)
end.
-client_read2(_Server, false, undefined,
- #msg_location { guid = Guid, ref_count = RefCount },
- Defer,
- CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts,
- dedup_cache_ets = DedupCacheEts }) ->
- case ets:lookup(CurFileCacheEts, Guid) of
- [] ->
- Defer(); %% may have rolled over
- [{Guid, Msg, _CacheRefCount}] ->
- ok = maybe_insert_into_cache(DedupCacheEts, RefCount, Guid, Msg),
- {{ok, Msg}, CState}
- end;
+client_read2(_Server, false, undefined, _MsgLocation, Defer, _CState) ->
+ %% Although we've already checked both caches and not found the
+ %% message there, the message is apparently in the
+ %% current_file. We can only arrive here if we are trying to read
+ %% a message which we have not written, which is very odd, so just
+ %% defer.
+
+ %% OR, on startup, the cur_file_cache is not populated with the
+ %% contents of the current file, thus reads from the current file
+ %% will end up here and will need to be deferred.
+ Defer();
client_read2(_Server, true, _Right, _MsgLocation, Defer, _CState) ->
%% Of course, in the mean time, the GC could have run and our msg
%% is actually in a different file, unlocked. However, defering is
%% the safest and simplest thing to do.
Defer();
client_read2(Server, false, _Right,
- #msg_location { guid = Guid, ref_count = RefCount, file = File },
+ MsgLocation = #msg_location { guid = Guid, file = File },
Defer,
- CState = #client_msstate { file_handles_ets = FileHandlesEts,
- file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts }) ->
+ CState = #client_msstate { file_summary_ets = FileSummaryEts }) ->
%% It's entirely possible that everything we're doing from here on
%% is for the wrong file, or a non-existent file, as a GC may have
%% finished.
- safe_ets_update_counter_ok(
+ safe_ets_update_counter(
FileSummaryEts, File, {#file_summary.readers, +1},
- fun () -> read(Server, Guid, CState) end),
+ fun (_) -> client_read3(Server, Guid, MsgLocation, Defer, CState) end,
+ fun () -> read(Server, Guid, CState) end).
+
+client_read3(Server, Guid,
+ #msg_location { guid = Guid, ref_count = RefCount, file = File },
+ Defer,
+ CState = #client_msstate { file_handles_ets = FileHandlesEts,
+ file_summary_ets = FileSummaryEts,
+ dedup_cache_ets = DedupCacheEts }) ->
Release = fun() -> ets:update_counter(FileSummaryEts, File,
{#file_summary.readers, -1})
end,
+ ReleaseDefer =
+ fun () ->
+ %% If we get a badarg here, then the GC has finished
+ %% and deleted our file. Try going around
+ %% again. Otherwise, just defer.
+
+ %% badarg scenario: we lookup, msg_store locks, gc
+ %% starts, gc ends, we +1 readers, msg_store
+ %% ets:deletes (and unlocks the dest)
+ try Release(),
+ Defer()
+ catch error:badarg -> read(Server, Guid, CState)
+ end
+ end,
%% If a GC hasn't already started, it won't start now. Need to
%% check again to see if we've been locked in the meantime,
%% between lookup and update_counter (thus GC started before our
- %% +1).
- [#file_summary { locked = Locked }] = ets:lookup(FileSummaryEts, File),
- case Locked of
- true ->
- %% If we get a badarg here, then the GC has finished and
- %% deleted our file. Try going around again. Otherwise,
- %% just defer.
-
- %% badarg scenario:
- %% we lookup, msg_store locks, gc starts, gc ends, we +1
- %% readers, msg_store ets:deletes (and unlocks the dest)
- try Release(),
- Defer()
- catch error:badarg -> read(Server, Guid, CState)
- end;
- false ->
+ %% +1. In fact, it could have finished by now too).
+ case ets:lookup(FileSummaryEts, File) of
+ [] -> %% GC has deleted our file
+ ReleaseDefer();
+ [{#file_summary { locked = true }}] ->
+ ReleaseDefer();
+ _ ->
%% Ok, we're definitely safe to continue - a GC can't
%% start up now, and isn't running, so nothing will tell
%% us from now on to close the handle if it's already
- %% open. (Well, a GC could start, and could put close
- %% entries into the ets table, but the GC will wait until
- %% we're done here before doing any real work.)
+ %% open.
%% Finally, we need to recheck that the msg is still at
%% the same place - it's possible an entire GC ran between