diff options
| author | Matthew Sackman <matthew@lshift.net> | 2010-05-05 14:41:03 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2010-05-05 14:41:03 +0100 |
| commit | 26dc9bbc43a3c50fdef18a505fe35a348ffeb55b (patch) | |
| tree | 4d000cb3211ed0ed59624f7a5c81a254a9e759bf | |
| parent | 9d71a0d9038c86dda92689f90515df9caea168c3 (diff) | |
| download | rabbitmq-server-git-26dc9bbc43a3c50fdef18a505fe35a348ffeb55b.tar.gz | |
Various fixes and improvements to client reading
| -rw-r--r-- | src/rabbit_msg_store.erl | 82 |
1 files changed, 45 insertions, 37 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 5868204639..6e4c1b5634 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -408,63 +408,71 @@ client_read1(Server, client_read2(Server, Locked, Right, MsgLocation, Defer, CState) end. -client_read2(_Server, false, undefined, - #msg_location { guid = Guid, ref_count = RefCount }, - Defer, - CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts, - dedup_cache_ets = DedupCacheEts }) -> - case ets:lookup(CurFileCacheEts, Guid) of - [] -> - Defer(); %% may have rolled over - [{Guid, Msg, _CacheRefCount}] -> - ok = maybe_insert_into_cache(DedupCacheEts, RefCount, Guid, Msg), - {{ok, Msg}, CState} - end; +client_read2(_Server, false, undefined, _MsgLocation, Defer, _CState) -> + %% Although we've already checked both caches and not found the + %% message there, the message is apparently in the + %% current_file. We can only arrive here if we are trying to read + %% a message which we have not written, which is very odd, so just + %% defer. + + %% OR, on startup, the cur_file_cache is not populated with the + %% contents of the current file, thus reads from the current file + %% will end up here and will need to be deferred. + Defer(); client_read2(_Server, true, _Right, _MsgLocation, Defer, _CState) -> %% Of course, in the mean time, the GC could have run and our msg %% is actually in a different file, unlocked. However, defering is %% the safest and simplest thing to do. Defer(); client_read2(Server, false, _Right, - #msg_location { guid = Guid, ref_count = RefCount, file = File }, + MsgLocation = #msg_location { guid = Guid, file = File }, Defer, - CState = #client_msstate { file_handles_ets = FileHandlesEts, - file_summary_ets = FileSummaryEts, - dedup_cache_ets = DedupCacheEts }) -> + CState = #client_msstate { file_summary_ets = FileSummaryEts }) -> %% It's entirely possible that everything we're doing from here on %% is for the wrong file, or a non-existent file, as a GC may have %% finished. - safe_ets_update_counter_ok( + safe_ets_update_counter( FileSummaryEts, File, {#file_summary.readers, +1}, - fun () -> read(Server, Guid, CState) end), + fun (_) -> client_read3(Server, Guid, MsgLocation, Defer, CState) end, + fun () -> read(Server, Guid, CState) end). + +client_read3(Server, Guid, + #msg_location { guid = Guid, ref_count = RefCount, file = File }, + Defer, + CState = #client_msstate { file_handles_ets = FileHandlesEts, + file_summary_ets = FileSummaryEts, + dedup_cache_ets = DedupCacheEts }) -> Release = fun() -> ets:update_counter(FileSummaryEts, File, {#file_summary.readers, -1}) end, + ReleaseDefer = + fun () -> + %% If we get a badarg here, then the GC has finished + %% and deleted our file. Try going around + %% again. Otherwise, just defer. + + %% badarg scenario: we lookup, msg_store locks, gc + %% starts, gc ends, we +1 readers, msg_store + %% ets:deletes (and unlocks the dest) + try Release(), + Defer() + catch error:badarg -> read(Server, Guid, CState) + end + end, %% If a GC hasn't already started, it won't start now. Need to %% check again to see if we've been locked in the meantime, %% between lookup and update_counter (thus GC started before our - %% +1). - [#file_summary { locked = Locked }] = ets:lookup(FileSummaryEts, File), - case Locked of - true -> - %% If we get a badarg here, then the GC has finished and - %% deleted our file. Try going around again. Otherwise, - %% just defer. - - %% badarg scenario: - %% we lookup, msg_store locks, gc starts, gc ends, we +1 - %% readers, msg_store ets:deletes (and unlocks the dest) - try Release(), - Defer() - catch error:badarg -> read(Server, Guid, CState) - end; - false -> + %% +1. In fact, it could have finished by now too). + case ets:lookup(FileSummaryEts, File) of + [] -> %% GC has deleted our file + ReleaseDefer(); + [{#file_summary { locked = true }}] -> + ReleaseDefer(); + _ -> %% Ok, we're definitely safe to continue - a GC can't %% start up now, and isn't running, so nothing will tell %% us from now on to close the handle if it's already - %% open. (Well, a GC could start, and could put close - %% entries into the ets table, but the GC will wait until - %% we're done here before doing any real work.) + %% open. %% Finally, we need to recheck that the msg is still at %% the same place - it's possible an entire GC ran between |
