diff options
| author | Matthew Sackman <matthew@lshift.net> | 2010-02-10 16:54:54 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2010-02-10 16:54:54 +0000 |
| commit | 7a1b0abac9733eaaad3fe8db7591f4c956655ba2 (patch) | |
| tree | 6c5421bf6bb0d3b77fa85550c7ff469822dd683f /src | |
| parent | eb1e4916c538481d6fdf8e1180f48aeb17598960 (diff) | |
| download | rabbitmq-server-git-7a1b0abac9733eaaad3fe8db7591f4c956655ba2.tar.gz | |
Refactoring of client concurrent read
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_msg_store.erl | 170 |
1 files changed, 79 insertions, 91 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 2bd3ac3d4b..b3d784b280 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -312,101 +312,89 @@ client_terminate(CState) -> %% Client-side-only helpers %%---------------------------------------------------------------------------- -client_read1(#msg_location { msg_id = MsgId, ref_count = RefCount, - file = File }, Defer, CState) -> +client_read1(MsgLocation = #msg_location { msg_id = MsgId, file = File }, Defer, + CState) -> case ets:lookup(?FILE_SUMMARY_ETS_NAME, File) of [] -> %% File has been GC'd and no longer exists. Go around again. read(MsgId, CState); [#file_summary { locked = Locked, right = Right }] -> - case {Right, Locked} of - {undefined, false} -> - case ets:lookup(?CUR_FILE_CACHE_ETS_NAME, MsgId) of - [] -> - Defer(); %% may have rolled over - [{MsgId, _FileOrUndefined, Msg}] -> - ok = maybe_insert_into_cache(RefCount, MsgId, Msg), - {{ok, Msg}, CState} - end; - {_, true} -> - %% of course, in the mean time, the GC could have - %% run and our msg is actually in a different - %% file, unlocked. However, defering is the safest - %% and simplest thing to do. - Defer(); - _ -> - %% It's entirely possible that everything we're - %% doing from here on is for the wrong file, or a - %% non-existent file, as a GC may have finished. - try - ets:update_counter(?FILE_SUMMARY_ETS_NAME, File, - {#file_summary.readers, +1}) - catch error:badarg -> - %% the File has been GC'd and deleted. Go around. - read(MsgId, CState) - end, - Release = fun() -> ets:update_counter( - ?FILE_SUMMARY_ETS_NAME, File, - {#file_summary.readers, -1}) - end, - %% If a GC hasn't already started, it won't start - %% now. Need to check again to see if we've been - %% locked in the meantime, between lookup and - %% update_counter (thus GC started before our +1). - [#file_summary { locked = Locked2 }] = - ets:lookup(?FILE_SUMMARY_ETS_NAME, File), - case Locked2 of - true -> - %% If we get a badarg here, then the GC - %% has finished and deleted our file. Try - %% going around again. Otherwise, just - %% defer. - - %% badarg scenario: - %% we lookup, msg_store locks, gc starts, - %% gc ends, we +1 readers, msg_store - %% ets:deletes (and unlocks the dest) - try - Release(), - Defer() - catch error:badarg -> read(MsgId, CState) - end; - false -> - %% Ok, we're definitely safe to continue - - %% a GC can't start up now, and isn't - %% running, so nothing will tell us from - %% now on to close the handle if it's - %% already open. (Well, a GC could start, - %% and could put close entries into the - %% ets table, but the GC will wait until - %% we're done here before doing any real - %% work.) - - %% Finally, we need to recheck that the - %% msg is still at the same place - it's - %% possible an entire GC ran between us - %% doing the lookup and the +1 on the - %% readers. (Same as badarg scenario - %% above, but we don't have a missing file - %% - we just have the /wrong/ file). - - case index_lookup(MsgId, CState) of - MsgLocation2 = #msg_location { file = File } -> - %% Still the same file. - %% This is fine to fail (already exists) - ets:insert_new(?FILE_HANDLES_ETS_NAME, - {{self(), File}, open}), - CState1 = close_all_indicated(CState), - {Msg, CState2} = - read_from_disk(MsgLocation2, CState1), - ok = maybe_insert_into_cache( - RefCount, MsgId, Msg), - Release(), %% this MUST NOT fail with badarg - {{ok, Msg}, CState2}; - MsgLocation2 -> %% different file! - Release(), %% this MUST NOT fail with badarg - client_read1(MsgLocation2, Defer, CState) - end - end + client_read2(MsgLocation, Locked, Right, Defer, CState) + end. + +client_read2(#msg_location { msg_id = MsgId, ref_count = RefCount }, + false, undefined, Defer, CState) -> + case ets:lookup(?CUR_FILE_CACHE_ETS_NAME, MsgId) of + [] -> + Defer(); %% may have rolled over + [{MsgId, _FileOrUndefined, Msg}] -> + ok = maybe_insert_into_cache(RefCount, MsgId, Msg), + {{ok, Msg}, CState} + end; +client_read2(_MsgLocation, true, _Right, Defer, _CState) -> + %% Of course, in the mean time, the GC could have run and our msg + %% is actually in a different file, unlocked. However, defering is + %% the safest and simplest thing to do. + Defer(); +client_read2(#msg_location { msg_id = MsgId, ref_count = RefCount, + file = File }, false, _Right, Defer, CState) -> + %% It's entirely possible that everything we're doing from here on + %% is for the wrong file, or a non-existent file, as a GC may have + %% finished. + try ets:update_counter(?FILE_SUMMARY_ETS_NAME, File, + {#file_summary.readers, +1}) + catch error:badarg -> %% the File has been GC'd and deleted. Go around. + read(MsgId, CState) + end, + Release = fun() -> ets:update_counter(?FILE_SUMMARY_ETS_NAME, File, + {#file_summary.readers, -1}) + end, + %% If a GC hasn't already started, it won't start now. Need to + %% check again to see if we've been locked in the meantime, + %% between lookup and update_counter (thus GC started before our + %% +1). + [#file_summary { locked = Locked }] = + ets:lookup(?FILE_SUMMARY_ETS_NAME, File), + case Locked of + true -> + %% If we get a badarg here, then the GC has finished and + %% deleted our file. Try going around again. Otherwise, + %% just defer. + + %% badarg scenario: + %% we lookup, msg_store locks, gc starts, gc ends, we +1 + %% readers, msg_store ets:deletes (and unlocks the dest) + try Release(), + Defer() + catch error:badarg -> read(MsgId, CState) + end; + false -> + %% Ok, we're definitely safe to continue - a GC can't + %% start up now, and isn't running, so nothing will tell + %% us from now on to close the handle if it's already + %% open. (Well, a GC could start, and could put close + %% entries into the ets table, but the GC will wait until + %% we're done here before doing any real work.) + + %% Finally, we need to recheck that the msg is still at + %% the same place - it's possible an entire GC ran between + %% us doing the lookup and the +1 on the readers. (Same as + %% badarg scenario above, but we don't have a missing file + %% - we just have the /wrong/ file). + + case index_lookup(MsgId, CState) of + MsgLocation = #msg_location { file = File } -> + %% Still the same file. + %% This is fine to fail (already exists) + ets:insert_new( + ?FILE_HANDLES_ETS_NAME, {{self(), File}, open}), + CState1 = close_all_indicated(CState), + {Msg, CState2} = read_from_disk(MsgLocation, CState1), + ok = maybe_insert_into_cache(RefCount, MsgId, Msg), + Release(), %% this MUST NOT fail with badarg + {{ok, Msg}, CState2}; + MsgLocation -> %% different file! + Release(), %% this MUST NOT fail with badarg + client_read1(MsgLocation, Defer, CState) end end. |
