diff options
| author | Matthew Sackman <matthew@lshift.net> | 2010-02-10 13:22:53 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2010-02-10 13:22:53 +0000 |
| commit | d1d700d34f31d4585823c5eb0d8b0cd7e5341089 (patch) | |
| tree | 827fd6e9278fd8953fcff0860e2cfbde9cf37c4a /src | |
| parent | 7a3aa2a1fd9edeb5089f8e103f9bf91b37488541 (diff) | |
| download | rabbitmq-server-git-d1d700d34f31d4585823c5eb0d8b0cd7e5341089.tar.gz | |
Fix all sorts of concurrency races in the concurrent readers versus GC
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_msg_store.erl | 142 |
1 files changed, 94 insertions, 48 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 50114a96f3..c605a6a20c 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -312,55 +312,101 @@ client_terminate(CState) -> %% Client-side-only helpers %%---------------------------------------------------------------------------- -client_read1(#msg_location { msg_id = MsgId, ref_count = RefCount, file = File } - = MsgLocation, Defer, CState) -> - [#file_summary { locked = Locked, right = Right }] = - ets:lookup(?FILE_SUMMARY_ETS_NAME, File), - case {Right, Locked} of - {undefined, false} -> - case ets:lookup(?CUR_FILE_CACHE_ETS_NAME, MsgId) of - [] -> - Defer(); %% may have rolled over - [{MsgId, _FileOrUndefined, Msg}] -> - ok = maybe_insert_into_cache(RefCount, MsgId, Msg), - {{ok, Msg}, CState} - end; - {_, true} -> - Defer(); - _ -> - ets:update_counter(?FILE_SUMMARY_ETS_NAME, File, - {#file_summary.readers, +1}), - Release = fun() -> - ets:update_counter(?FILE_SUMMARY_ETS_NAME, File, - {#file_summary.readers, -1}) - end, - %% If a GC hasn't already started, it won't start - %% now. Need to check again to see if we've been locked in - %% the meantime, between lookup and update_counter (thus - %% GC actually in progress). - [#file_summary { locked = Locked2 }] = - ets:lookup(?FILE_SUMMARY_ETS_NAME, File), - case Locked2 of - true -> - Release(), +client_read1(#msg_location { msg_id = MsgId, ref_count = RefCount, + file = File }, Defer, CState) -> + case ets:lookup(?FILE_SUMMARY_ETS_NAME, File) of + [] -> %% File has been GC'd and no longer exists. Go around again. + read(MsgId, CState); + [#file_summary { locked = Locked, right = Right }] -> + case {Right, Locked} of + {undefined, false} -> + case ets:lookup(?CUR_FILE_CACHE_ETS_NAME, MsgId) of + [] -> + Defer(); %% may have rolled over + [{MsgId, _FileOrUndefined, Msg}] -> + ok = maybe_insert_into_cache(RefCount, MsgId, Msg), + {{ok, Msg}, CState} + end; + {_, true} -> + %% of course, in the mean time, the GC could have + %% run and our msg is actually in a different + %% file, unlocked. However, defering is the safest + %% and simplest thing to do. Defer(); - false -> - %% Ok, we're definitely safe to continue - a GC - %% can't start up now, and isn't running, so - %% nothing will tell us from now on to close the - %% handle if it's already open. (Well, a GC could - %% start, and could put close entries into the ets - %% table, but the GC will wait until we're done - %% here before doing any real work.) - - %% This is fine to fail (already exists) - ets:insert_new(?FILE_HANDLES_ETS_NAME, - {{self(), File}, open}), - CState1 = close_all_indicated(CState), - {Msg, CState2} = read_from_disk(MsgLocation, CState1), - ok = maybe_insert_into_cache(RefCount, MsgId, Msg), - Release(), - {{ok, Msg}, CState2} + _ -> + %% It's entirely possible that everything we're + %% doing from here on is for the wrong file, or a + %% non-existent file, as a GC may have finished. + try + ets:update_counter(?FILE_SUMMARY_ETS_NAME, File, + {#file_summary.readers, +1}) + catch error:badarg -> + %% the File has been GC'd and deleted. Go around. + read(MsgId, CState) + end, + Release = fun() -> ets:update_counter( + ?FILE_SUMMARY_ETS_NAME, File, + {#file_summary.readers, -1}) + end, + %% If a GC hasn't already started, it won't start + %% now. Need to check again to see if we've been + %% locked in the meantime, between lookup and + %% update_counter (thus GC started before our +1). + [#file_summary { locked = Locked2 }] = + ets:lookup(?FILE_SUMMARY_ETS_NAME, File), + case Locked2 of + true -> + %% If we get a badarg here, then the GC + %% has finished and deleted our file. Try + %% going around again. Otherwise, just + %% defer. + + %% badarg scenario: + %% we lookup, msg_store locks, gc starts, + %% gc ends, we +1 readers, msg_store + %% ets:deletes (and unlocks the dest) + try + Release(), + Defer() + catch error:badarg -> read(MsgId, CState) + end; + false -> + %% Ok, we're definitely safe to continue - + %% a GC can't start up now, and isn't + %% running, so nothing will tell us from + %% now on to close the handle if it's + %% already open. (Well, a GC could start, + %% and could put close entries into the + %% ets table, but the GC will wait until + %% we're done here before doing any real + %% work.) + + %% Finally, we need to recheck that the + %% msg is still at the same place - it's + %% possible an entire GC ran between us + %% doing the lookup and the +1 on the + %% readers. (Same as badarg scenario + %% above, but we don't have a missing file + %% - we just have the /wrong/ file). + + case index_lookup(MsgId, CState) of + MsgLocation2 = #msg_location { file = File } -> + %% Still the same file. + %% This is fine to fail (already exists) + ets:insert_new(?FILE_HANDLES_ETS_NAME, + {{self(), File}, open}), + CState1 = close_all_indicated(CState), + {Msg, CState2} = + read_from_disk(MsgLocation2, CState1), + ok = maybe_insert_into_cache( + RefCount, MsgId, Msg), + Release(), %% this MUST NOT fail with badarg + {{ok, Msg}, CState2}; + MsgLocation2 -> %% different file! + Release(), %% this MUST NOT fail with badarg + client_read1(MsgLocation2, Defer, CState) + end + end end end. |
