diff options
| author | Matthew Sackman <matthew@lshift.net> | 2010-01-18 11:40:08 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2010-01-18 11:40:08 +0000 |
| commit | 79b01c4b17db14eaefac52f1f6c7a9cc89676cd8 (patch) | |
| tree | 0c53825c30c1d28b97f5b53724bdef2b2f3eb1cf | |
| parent | 638f2c94e1fc201806710056641685d3ca169ae7 (diff) | |
| download | rabbitmq-server-git-79b01c4b17db14eaefac52f1f6c7a9cc89676cd8.tar.gz | |
Of course, there's no reason not to add into the current file cache immediately, thus allowing a write followed by a read to have no delay at all
| -rw-r--r-- | src/rabbit_msg_store.erl | 14 |
1 files changed, 9 insertions, 5 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 82110c7d1a..ef31efadcd 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -238,7 +238,10 @@ start_link(Dir, MsgRefDeltaGen, MsgRefDeltaGenInit) -> [Dir, MsgRefDeltaGen, MsgRefDeltaGenInit], [{timeout, infinity}]). -write(MsgId, Msg) -> gen_server2:cast(?SERVER, {write, MsgId, Msg}). +write(MsgId, Msg) -> + %% could fail if msg already in there + ets:insert_new(?CUR_FILE_CACHE_ETS_NAME, {MsgId, undefined, Msg}), + gen_server2:cast(?SERVER, {write, MsgId, Msg}). read(MsgId, CState) -> Defer = fun() -> @@ -288,7 +291,7 @@ client_read1(#msg_location { msg_id = MsgId, ref_count = RefCount, file = File } case ets:lookup(?CUR_FILE_CACHE_ETS_NAME, MsgId) of [] -> Defer(); %% may have rolled over - [{MsgId, Msg}] -> + [{MsgId, _FileOrUndefined, Msg}] -> ok = maybe_insert_into_cache(RefCount, MsgId, Msg), {{ok, Msg}, CState} end; @@ -421,7 +424,8 @@ handle_cast({write, MsgId, Msg}, case index_lookup(MsgId, State) of not_found -> %% New message, lots to do - true = ets:insert_new(?CUR_FILE_CACHE_ETS_NAME, {MsgId, Msg}), + true = ets:update_element(?CUR_FILE_CACHE_ETS_NAME, MsgId, + {2, CurFile}), {ok, CurOffset} = file_handle_cache:current_virtual_offset(CurHdl), {ok, TotalSize} = rabbit_msg_file:append(CurHdl, MsgId, Msg), ok = index_insert(#msg_location { @@ -630,7 +634,7 @@ read_message1(From, #msg_location { msg_id = MsgId, ref_count = RefCount, false -> ok end, read_from_disk(MsgLoc, State); - [{MsgId, Msg1}] -> + [{MsgId, File, Msg1}] -> {Msg1, State} end, ok = maybe_insert_into_cache(RefCount, MsgId, Msg), @@ -1102,7 +1106,7 @@ maybe_roll_to_new_file(Offset, locked = false, readers = 0 }), true = ets:update_element(?FILE_SUMMARY_ETS_NAME, CurFile, {#file_summary.right, NextFile}), - true = ets:delete_all_objects(?CUR_FILE_CACHE_ETS_NAME), + true = ets:match_delete(?CUR_FILE_CACHE_ETS_NAME, {'_', CurFile, '_'}), State1 #msstate { current_file_handle = NextHdl, current_file = NextFile }; maybe_roll_to_new_file(_, State) -> |
