summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-02-10 17:41:53 +0000
committerMatthew Sackman <matthew@lshift.net>2010-02-10 17:41:53 +0000
commit7b42cc024a23c2570a3548ac00006dafa0a74df6 (patch)
treea6fc0177c103bdb725c6123c671085d9a63bf5b4 /src
parent7a1b0abac9733eaaad3fe8db7591f4c956655ba2 (diff)
downloadrabbitmq-server-git-7b42cc024a23c2570a3548ac00006dafa0a74df6.tar.gz
Agressively check caches before risking deferring to the msg_store. This often is very beneficial because in practise, the last messages to be pushed to disk under memory pressure are the messages at the head of the queue, not the tail. Thus in the event of the msg_store being overloaded, due to pushing to disk due to memory pressure, checking all the caches first is very effective at keeping the queues moving. The downside is that when we read from the cur_file cache, we don't get the ref count (at this point, the write is maybe only in the mailbox of the msg_store, so it may not even have an index entry, thus we have no chance of knowing an accurate refcount). The result is that the message may end up existing having been read from the cur file cache, then pushed to disk, forgotten, and read in again, only to be added to the dedup cache. However, this is as bad as it can get - there could at most be two binary copies of a message in memory. This actually holds, even though there's a slightly amazing interaction on ack: on ack, the copy read from the cur_file cache will decrement the dedup cache, possibly evicting the entry. At this point there would be no copies in the dedup cache, maybe one copy in the cur_file cache, and maybe one copy in RAM, which is the copy read from the dedup cache. The copy read from the cur_file cache has been ack'd, so has gone. The next read, if there is no copy in the cur_file cache may add to the dedup cache, which will then be emptied on the ack from the first dedup read. I.e. you can achieve a chain in which no more than 2 copies are held in RAM at any one time, but each read really goes to disk. It is left as an exercise to the reader to come up with a test case that exhibits this behaviour.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_msg_store.erl47
1 files changed, 29 insertions, 18 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index b3d784b280..2a4eadc91f 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -274,17 +274,27 @@ write(MsgId, Msg) ->
gen_server2:cast(?SERVER, {write, MsgId, Msg}).
read(MsgId, CState) ->
- Defer = fun() ->
- {gen_server2:call(?SERVER, {read, MsgId}, infinity), CState}
- end,
- case index_lookup(MsgId, CState) of
+ %% 1. Check the dedup cache
+ case fetch_and_increment_cache(MsgId) of
not_found ->
- Defer();
- MsgLocation ->
- case fetch_and_increment_cache(MsgId) of
- not_found -> client_read1(MsgLocation, Defer, CState);
- Msg -> {{ok, Msg}, CState}
- end
+ %% 2. Check the cur file cache
+ case ets:lookup(?CUR_FILE_CACHE_ETS_NAME, MsgId) of
+ [] ->
+ Defer =
+ fun() -> {gen_server2:call(
+ ?SERVER, {read, MsgId}, infinity), CState}
+ end,
+ case index_lookup(MsgId, CState) of
+ not_found -> Defer();
+ MsgLocation -> client_read1(MsgLocation, Defer, CState)
+ end;
+ [{MsgId, _FileOrUndefined, Msg}] ->
+ %% Although we've found it, we don't know the
+ %% refcount, so can't insert into dedup cache
+ {{ok, Msg}, CState}
+ end;
+ Msg ->
+ {{ok, Msg}, CState}
end.
contains(MsgId) -> gen_server2:call(?SERVER, {contains, MsgId}, infinity).
@@ -318,11 +328,11 @@ client_read1(MsgLocation = #msg_location { msg_id = MsgId, file = File }, Defer,
[] -> %% File has been GC'd and no longer exists. Go around again.
read(MsgId, CState);
[#file_summary { locked = Locked, right = Right }] ->
- client_read2(MsgLocation, Locked, Right, Defer, CState)
+ client_read2(Locked, Right, MsgLocation, Defer, CState)
end.
-client_read2(#msg_location { msg_id = MsgId, ref_count = RefCount },
- false, undefined, Defer, CState) ->
+client_read2(false, undefined, #msg_location {
+ msg_id = MsgId, ref_count = RefCount }, Defer, CState) ->
case ets:lookup(?CUR_FILE_CACHE_ETS_NAME, MsgId) of
[] ->
Defer(); %% may have rolled over
@@ -330,13 +340,14 @@ client_read2(#msg_location { msg_id = MsgId, ref_count = RefCount },
ok = maybe_insert_into_cache(RefCount, MsgId, Msg),
{{ok, Msg}, CState}
end;
-client_read2(_MsgLocation, true, _Right, Defer, _CState) ->
+client_read2(true, _Right, _MsgLocation, Defer, _CState) ->
%% Of course, in the mean time, the GC could have run and our msg
%% is actually in a different file, unlocked. However, defering is
%% the safest and simplest thing to do.
Defer();
-client_read2(#msg_location { msg_id = MsgId, ref_count = RefCount,
- file = File }, false, _Right, Defer, CState) ->
+client_read2(false, _Right, #msg_location {
+ msg_id = MsgId, ref_count = RefCount, file = File },
+ Defer, CState) ->
%% It's entirely possible that everything we're doing from here on
%% is for the wrong file, or a non-existent file, as a GC may have
%% finished.
@@ -892,12 +903,12 @@ fetch_and_increment_cache(MsgId) ->
decrement_cache(MsgId) ->
true = try case ets:update_counter(?CACHE_ETS_NAME, MsgId, {3, -1}) of
N when N =< 0 -> true = ets:delete(?CACHE_ETS_NAME, MsgId);
- _N -> true
+ _N -> true
end
catch error:badarg ->
%% MsgId is not in there because although it's been
%% delivered, it's never actually been read (think:
- %% persistent message in mixed queue)
+ %% persistent message held in RAM)
true
end,
ok.