summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-01-18 11:40:08 +0000
committerMatthew Sackman <matthew@lshift.net>2010-01-18 11:40:08 +0000
commit79b01c4b17db14eaefac52f1f6c7a9cc89676cd8 (patch)
tree0c53825c30c1d28b97f5b53724bdef2b2f3eb1cf
parent638f2c94e1fc201806710056641685d3ca169ae7 (diff)
downloadrabbitmq-server-git-79b01c4b17db14eaefac52f1f6c7a9cc89676cd8.tar.gz
Of course, there's no reason not to add into the current file cache immediately, thus allowing a write followed by a read to have no delay at all
-rw-r--r--src/rabbit_msg_store.erl14
1 files changed, 9 insertions, 5 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 82110c7d1a..ef31efadcd 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -238,7 +238,10 @@ start_link(Dir, MsgRefDeltaGen, MsgRefDeltaGenInit) ->
[Dir, MsgRefDeltaGen, MsgRefDeltaGenInit],
[{timeout, infinity}]).
-write(MsgId, Msg) -> gen_server2:cast(?SERVER, {write, MsgId, Msg}).
+write(MsgId, Msg) ->
+ %% could fail if msg already in there
+ ets:insert_new(?CUR_FILE_CACHE_ETS_NAME, {MsgId, undefined, Msg}),
+ gen_server2:cast(?SERVER, {write, MsgId, Msg}).
read(MsgId, CState) ->
Defer = fun() ->
@@ -288,7 +291,7 @@ client_read1(#msg_location { msg_id = MsgId, ref_count = RefCount, file = File }
case ets:lookup(?CUR_FILE_CACHE_ETS_NAME, MsgId) of
[] ->
Defer(); %% may have rolled over
- [{MsgId, Msg}] ->
+ [{MsgId, _FileOrUndefined, Msg}] ->
ok = maybe_insert_into_cache(RefCount, MsgId, Msg),
{{ok, Msg}, CState}
end;
@@ -421,7 +424,8 @@ handle_cast({write, MsgId, Msg},
case index_lookup(MsgId, State) of
not_found ->
%% New message, lots to do
- true = ets:insert_new(?CUR_FILE_CACHE_ETS_NAME, {MsgId, Msg}),
+ true = ets:update_element(?CUR_FILE_CACHE_ETS_NAME, MsgId,
+ {2, CurFile}),
{ok, CurOffset} = file_handle_cache:current_virtual_offset(CurHdl),
{ok, TotalSize} = rabbit_msg_file:append(CurHdl, MsgId, Msg),
ok = index_insert(#msg_location {
@@ -630,7 +634,7 @@ read_message1(From, #msg_location { msg_id = MsgId, ref_count = RefCount,
false -> ok
end,
read_from_disk(MsgLoc, State);
- [{MsgId, Msg1}] ->
+ [{MsgId, File, Msg1}] ->
{Msg1, State}
end,
ok = maybe_insert_into_cache(RefCount, MsgId, Msg),
@@ -1102,7 +1106,7 @@ maybe_roll_to_new_file(Offset,
locked = false, readers = 0 }),
true = ets:update_element(?FILE_SUMMARY_ETS_NAME, CurFile,
{#file_summary.right, NextFile}),
- true = ets:delete_all_objects(?CUR_FILE_CACHE_ETS_NAME),
+ true = ets:match_delete(?CUR_FILE_CACHE_ETS_NAME, {'_', CurFile, '_'}),
State1 #msstate { current_file_handle = NextHdl,
current_file = NextFile };
maybe_roll_to_new_file(_, State) ->