summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-02-10 16:54:54 +0000
committerMatthew Sackman <matthew@lshift.net>2010-02-10 16:54:54 +0000
commit7a1b0abac9733eaaad3fe8db7591f4c956655ba2 (patch)
tree6c5421bf6bb0d3b77fa85550c7ff469822dd683f /src
parenteb1e4916c538481d6fdf8e1180f48aeb17598960 (diff)
downloadrabbitmq-server-git-7a1b0abac9733eaaad3fe8db7591f4c956655ba2.tar.gz
Refactoring of client concurrent read
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_msg_store.erl170
1 files changed, 79 insertions, 91 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 2bd3ac3d4b..b3d784b280 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -312,101 +312,89 @@ client_terminate(CState) ->
%% Client-side-only helpers
%%----------------------------------------------------------------------------
-client_read1(#msg_location { msg_id = MsgId, ref_count = RefCount,
- file = File }, Defer, CState) ->
+client_read1(MsgLocation = #msg_location { msg_id = MsgId, file = File }, Defer,
+ CState) ->
case ets:lookup(?FILE_SUMMARY_ETS_NAME, File) of
[] -> %% File has been GC'd and no longer exists. Go around again.
read(MsgId, CState);
[#file_summary { locked = Locked, right = Right }] ->
- case {Right, Locked} of
- {undefined, false} ->
- case ets:lookup(?CUR_FILE_CACHE_ETS_NAME, MsgId) of
- [] ->
- Defer(); %% may have rolled over
- [{MsgId, _FileOrUndefined, Msg}] ->
- ok = maybe_insert_into_cache(RefCount, MsgId, Msg),
- {{ok, Msg}, CState}
- end;
- {_, true} ->
- %% of course, in the mean time, the GC could have
- %% run and our msg is actually in a different
- %% file, unlocked. However, defering is the safest
- %% and simplest thing to do.
- Defer();
- _ ->
- %% It's entirely possible that everything we're
- %% doing from here on is for the wrong file, or a
- %% non-existent file, as a GC may have finished.
- try
- ets:update_counter(?FILE_SUMMARY_ETS_NAME, File,
- {#file_summary.readers, +1})
- catch error:badarg ->
- %% the File has been GC'd and deleted. Go around.
- read(MsgId, CState)
- end,
- Release = fun() -> ets:update_counter(
- ?FILE_SUMMARY_ETS_NAME, File,
- {#file_summary.readers, -1})
- end,
- %% If a GC hasn't already started, it won't start
- %% now. Need to check again to see if we've been
- %% locked in the meantime, between lookup and
- %% update_counter (thus GC started before our +1).
- [#file_summary { locked = Locked2 }] =
- ets:lookup(?FILE_SUMMARY_ETS_NAME, File),
- case Locked2 of
- true ->
- %% If we get a badarg here, then the GC
- %% has finished and deleted our file. Try
- %% going around again. Otherwise, just
- %% defer.
-
- %% badarg scenario:
- %% we lookup, msg_store locks, gc starts,
- %% gc ends, we +1 readers, msg_store
- %% ets:deletes (and unlocks the dest)
- try
- Release(),
- Defer()
- catch error:badarg -> read(MsgId, CState)
- end;
- false ->
- %% Ok, we're definitely safe to continue -
- %% a GC can't start up now, and isn't
- %% running, so nothing will tell us from
- %% now on to close the handle if it's
- %% already open. (Well, a GC could start,
- %% and could put close entries into the
- %% ets table, but the GC will wait until
- %% we're done here before doing any real
- %% work.)
-
- %% Finally, we need to recheck that the
- %% msg is still at the same place - it's
- %% possible an entire GC ran between us
- %% doing the lookup and the +1 on the
- %% readers. (Same as badarg scenario
- %% above, but we don't have a missing file
- %% - we just have the /wrong/ file).
-
- case index_lookup(MsgId, CState) of
- MsgLocation2 = #msg_location { file = File } ->
- %% Still the same file.
- %% This is fine to fail (already exists)
- ets:insert_new(?FILE_HANDLES_ETS_NAME,
- {{self(), File}, open}),
- CState1 = close_all_indicated(CState),
- {Msg, CState2} =
- read_from_disk(MsgLocation2, CState1),
- ok = maybe_insert_into_cache(
- RefCount, MsgId, Msg),
- Release(), %% this MUST NOT fail with badarg
- {{ok, Msg}, CState2};
- MsgLocation2 -> %% different file!
- Release(), %% this MUST NOT fail with badarg
- client_read1(MsgLocation2, Defer, CState)
- end
- end
+ client_read2(MsgLocation, Locked, Right, Defer, CState)
+ end.
+
+client_read2(#msg_location { msg_id = MsgId, ref_count = RefCount },
+ false, undefined, Defer, CState) ->
+ case ets:lookup(?CUR_FILE_CACHE_ETS_NAME, MsgId) of
+ [] ->
+ Defer(); %% may have rolled over
+ [{MsgId, _FileOrUndefined, Msg}] ->
+ ok = maybe_insert_into_cache(RefCount, MsgId, Msg),
+ {{ok, Msg}, CState}
+ end;
+client_read2(_MsgLocation, true, _Right, Defer, _CState) ->
+ %% Of course, in the mean time, the GC could have run and our msg
+ %% is actually in a different file, unlocked. However, defering is
+ %% the safest and simplest thing to do.
+ Defer();
+client_read2(#msg_location { msg_id = MsgId, ref_count = RefCount,
+ file = File }, false, _Right, Defer, CState) ->
+ %% It's entirely possible that everything we're doing from here on
+ %% is for the wrong file, or a non-existent file, as a GC may have
+ %% finished.
+ try ets:update_counter(?FILE_SUMMARY_ETS_NAME, File,
+ {#file_summary.readers, +1})
+ catch error:badarg -> %% the File has been GC'd and deleted. Go around.
+ read(MsgId, CState)
+ end,
+ Release = fun() -> ets:update_counter(?FILE_SUMMARY_ETS_NAME, File,
+ {#file_summary.readers, -1})
+ end,
+ %% If a GC hasn't already started, it won't start now. Need to
+ %% check again to see if we've been locked in the meantime,
+ %% between lookup and update_counter (thus GC started before our
+ %% +1).
+ [#file_summary { locked = Locked }] =
+ ets:lookup(?FILE_SUMMARY_ETS_NAME, File),
+ case Locked of
+ true ->
+ %% If we get a badarg here, then the GC has finished and
+ %% deleted our file. Try going around again. Otherwise,
+ %% just defer.
+
+ %% badarg scenario:
+ %% we lookup, msg_store locks, gc starts, gc ends, we +1
+ %% readers, msg_store ets:deletes (and unlocks the dest)
+ try Release(),
+ Defer()
+ catch error:badarg -> read(MsgId, CState)
+ end;
+ false ->
+ %% Ok, we're definitely safe to continue - a GC can't
+ %% start up now, and isn't running, so nothing will tell
+ %% us from now on to close the handle if it's already
+ %% open. (Well, a GC could start, and could put close
+ %% entries into the ets table, but the GC will wait until
+ %% we're done here before doing any real work.)
+
+ %% Finally, we need to recheck that the msg is still at
+ %% the same place - it's possible an entire GC ran between
+ %% us doing the lookup and the +1 on the readers. (Same as
+ %% badarg scenario above, but we don't have a missing file
+ %% - we just have the /wrong/ file).
+
+ case index_lookup(MsgId, CState) of
+ MsgLocation = #msg_location { file = File } ->
+ %% Still the same file.
+ %% This is fine to fail (already exists)
+ ets:insert_new(
+ ?FILE_HANDLES_ETS_NAME, {{self(), File}, open}),
+ CState1 = close_all_indicated(CState),
+ {Msg, CState2} = read_from_disk(MsgLocation, CState1),
+ ok = maybe_insert_into_cache(RefCount, MsgId, Msg),
+ Release(), %% this MUST NOT fail with badarg
+ {{ok, Msg}, CState2};
+ MsgLocation -> %% different file!
+ Release(), %% this MUST NOT fail with badarg
+ client_read1(MsgLocation, Defer, CState)
end
end.