diff options
| author | Matthew Sackman <matthew@lshift.net> | 2010-01-09 19:03:56 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2010-01-09 19:03:56 +0000 |
| commit | 872a13632237b4c9440320a819afd413e9231685 (patch) | |
| tree | 1c8208e6deedecff368dba0e6e220b0a7fca4f30 | |
| parent | e2ef3eda466a5de318ffc87f8c40b42b8b5aae74 (diff) | |
| download | rabbitmq-server-git-872a13632237b4c9440320a819afd413e9231685.tar.gz | |
some minor refactoring (more to do), and spotted and fixed a race condition
| -rw-r--r-- | src/rabbit_msg_store.erl | 58 |
1 files changed, 38 insertions, 20 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 59b283cf3c..29e4972e33 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -241,9 +241,12 @@ start_link(Dir, MsgRefDeltaGen, MsgRefDeltaGenInit) -> write(MsgId, Msg) -> gen_server2:cast(?SERVER, {write, MsgId, Msg}). read(MsgId, CState) -> + Defer = fun() -> + {gen_server2:call(?SERVER, {read, MsgId}, infinity), CState} + end, case index_lookup(MsgId, CState) of not_found -> - {gen_server2:call(?SERVER, {read, MsgId}, infinity), CState}; + Defer(); #msg_location { ref_count = RefCount, file = File, offset = Offset, @@ -254,27 +257,42 @@ read(MsgId, CState) -> ets:lookup(?FILE_SUMMARY_ETS_NAME, File), case Right =:= undefined orelse Locked =:= true of true -> - {gen_server2:call(?SERVER, {read, MsgId}, infinity), - CState}; + Defer(); false -> ets:update_counter(?FILE_SUMMARY_ETS_NAME, File, {#file_summary.readers, 1}), - %% need to check again to see if we've - %% been locked in the meantime + Release = fun() -> + ets:update_counter( + ?FILE_SUMMARY_ETS_NAME, File, + {#file_summary.readers, -1}) + end, + %% If a GC hasn't already started, it + %% won't start now. Need to check again to + %% see if we've been locked in the + %% meantime, between lookup and + %% update_counter (thus GC actually in + %% progress). [#file_summary { locked = Locked2 }] = ets:lookup(?FILE_SUMMARY_ETS_NAME, File), case Locked2 of true -> - {gen_server2:call(?SERVER, {read, MsgId}, - infinity), CState}; + Release(), + Defer(); false -> - %% ok, we're definitely safe to + %% Ok, we're definitely safe to %% continue - a GC can't start up %% now, and isn't running, so %% nothing will tell us from now %% on to close the handle if it's - %% already open. - %% this is fine to fail (already exists) + %% already open. (Well, a GC could + %% start, and could put close + %% entries into the ets table, but + %% the GC will wait until we're + %% done here before doing any real + %% work.) + + %% This is fine to fail (already + %% exists) ets:insert_new(?FILE_HANDLES_ETS_NAME, {{self(), File}, open}), CState1 = close_all_indicated(CState), @@ -295,14 +313,12 @@ read(MsgId, CState) -> {proc_dict, get()} ]}}) end, - ets:update_counter( - ?FILE_SUMMARY_ETS_NAME, File, - {#file_summary.readers, -1}), + Release(), ok = case RefCount > 1 of true -> insert_into_cache(MsgId, Msg); false -> - %% it's not in the + %% It's not in the %% cache and we only %% have one reference %% to the message. So @@ -497,6 +513,14 @@ handle_cast(sync, State) -> handle_cast({gc_done, Reclaimed, Source, Dest}, State = #msstate { sum_file_size = SumFileSize, gc_active = {Source, Dest} }) -> + %% GC done, so now ensure that any clients that have open fhs to + %% those files close them before using them again. This has to be + %% done here, and not when starting up the GC, because if done + %% when starting up the GC, the client could find the close, and + %% close and reopen the fh, whilst the GC is waiting for readers + %% to disappear, before it's actually done the GC. + true = mark_handle_to_close(Source), + true = mark_handle_to_close(Dest), %% we always move data left, so Source has gone and was on the %% right, so need to make dest = source.right.left, and also %% dest.right = source.right @@ -1100,12 +1124,6 @@ maybe_compact(State = #msstate { sum_valid_data = SumValid, {#file_summary.locked, true}), true = ets:update_element(?FILE_SUMMARY_ETS_NAME, Dest, {#file_summary.locked, true}), - %% now that they're locked, we know no queue will touch - %% them (not even add to the ets table for these files), - %% so now ensure that we ask the queues to close handles - %% to these files - true = mark_handle_to_close(Source), - true = mark_handle_to_close(Dest), ok = rabbit_msg_store_gc:gc(Source, Dest), State1 #msstate { gc_active = {Source, Dest} } end; |
