summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-03-05 16:21:38 +0000
committerMatthew Sackman <matthew@lshift.net>2010-03-05 16:21:38 +0000
commitc66b64de5431d49755ce5c452d7014813164169b (patch)
treedc3b5d7ac6ed50667ca114dea96a9e3935460b3f
parent95ed9fa5e8de814ae6ba491ecad314f1e0a3f7f5 (diff)
downloadrabbitmq-server-git-c66b64de5431d49755ce5c452d7014813164169b.tar.gz
Change to cur file cache. Rather than bother storing the file in there, just give a ref count which in inc'd when the client puts it in, and dec'd when the process actually writes it. On roll, delete everything with a ref count of 0.
-rw-r--r--src/rabbit_msg_store.erl30
1 files changed, 18 insertions, 12 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 85f5526ec7..27fcbbd01d 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -270,8 +270,7 @@ start_link(Dir, MsgRefDeltaGen, MsgRefDeltaGenInit) ->
[{timeout, infinity}]).
write(MsgId, Msg) ->
- %% could fail if msg already in there
- ets:insert_new(?CUR_FILE_CACHE_ETS_NAME, {MsgId, undefined, Msg}),
+ ok = add_to_cache(MsgId, Msg),
gen_server2:cast(?SERVER, {write, MsgId, Msg}).
read(MsgId, CState) ->
@@ -288,7 +287,7 @@ read(MsgId, CState) ->
not_found -> Defer();
MsgLocation -> client_read1(MsgLocation, Defer, CState)
end;
- [{MsgId, _FileOrUndefined, Msg}] ->
+ [{MsgId, Msg, _CacheRefCount}] ->
%% Although we've found it, we don't know the
%% refcount, so can't insert into dedup cache
{{ok, Msg}, CState}
@@ -325,6 +324,18 @@ client_terminate(CState) ->
%% Client-side-only helpers
%%----------------------------------------------------------------------------
+add_to_cache(MsgId, Msg) ->
+ case ets:insert_new(?CUR_FILE_CACHE_ETS_NAME, {MsgId, Msg, 1}) of
+ true ->
+ ok;
+ false ->
+ try
+ ets:update_counter(?CUR_FILE_CACHE_ETS_NAME, MsgId, {3, +1}),
+ ok
+ catch error:badarg -> add_to_cache(MsgId, Msg)
+ end
+ end.
+
client_read1(MsgLocation = #msg_location { msg_id = MsgId, file = File }, Defer,
CState) ->
case ets:lookup(?FILE_SUMMARY_ETS_NAME, File) of
@@ -339,7 +350,7 @@ client_read2(false, undefined, #msg_location {
case ets:lookup(?CUR_FILE_CACHE_ETS_NAME, MsgId) of
[] ->
Defer(); %% may have rolled over
- [{MsgId, _FileOrUndefined, Msg}] ->
+ [{MsgId, Msg, _CacheRefCount}] ->
ok = maybe_insert_into_cache(RefCount, MsgId, Msg),
{{ok, Msg}, CState}
end;
@@ -502,11 +513,10 @@ handle_cast({write, MsgId, Msg},
current_file = CurFile,
sum_valid_data = SumValid,
sum_file_size = SumFileSize }) ->
+ true = 0 =< ets:update_counter(?CUR_FILE_CACHE_ETS_NAME, MsgId, {3, -1}),
case index_lookup(MsgId, State) of
not_found ->
%% New message, lots to do
- true = ets:update_element(?CUR_FILE_CACHE_ETS_NAME, MsgId,
- {2, CurFile}),
{ok, CurOffset} = file_handle_cache:current_virtual_offset(CurHdl),
{ok, TotalSize} = rabbit_msg_file:append(CurHdl, MsgId, Msg),
ok = index_insert(#msg_location {
@@ -543,10 +553,6 @@ handle_cast({write, MsgId, Msg},
ok = index_update_fields(MsgId,
{#msg_location.ref_count, RefCount + 1},
State),
- true = case File == CurFile of
- true -> true;
- false -> ets:delete(?CUR_FILE_CACHE_ETS_NAME, MsgId)
- end,
noreply(State)
end;
@@ -721,7 +727,7 @@ read_message1(From, #msg_location { msg_id = MsgId, ref_count = RefCount,
false -> ok
end,
read_from_disk(MsgLoc, State);
- [{MsgId, File, Msg1}] ->
+ [{MsgId, Msg1, _CacheRefCount}] ->
{Msg1, State}
end,
ok = maybe_insert_into_cache(RefCount, MsgId, Msg),
@@ -1193,7 +1199,7 @@ maybe_roll_to_new_file(Offset,
locked = false, readers = 0 }),
true = ets:update_element(?FILE_SUMMARY_ETS_NAME, CurFile,
{#file_summary.right, NextFile}),
- true = ets:match_delete(?CUR_FILE_CACHE_ETS_NAME, {'_', CurFile, '_'}),
+ true = ets:match_delete(?CUR_FILE_CACHE_ETS_NAME, {'_', '_', 0}),
State1 #msstate { current_file_handle = NextHdl,
current_file = NextFile };
maybe_roll_to_new_file(_, State) ->