summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_msg_store.erl14
1 files changed, 9 insertions, 5 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 82110c7d1a..ef31efadcd 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -238,7 +238,10 @@ start_link(Dir, MsgRefDeltaGen, MsgRefDeltaGenInit) ->
[Dir, MsgRefDeltaGen, MsgRefDeltaGenInit],
[{timeout, infinity}]).
-write(MsgId, Msg) -> gen_server2:cast(?SERVER, {write, MsgId, Msg}).
+write(MsgId, Msg) ->
+ %% could fail if msg already in there
+ ets:insert_new(?CUR_FILE_CACHE_ETS_NAME, {MsgId, undefined, Msg}),
+ gen_server2:cast(?SERVER, {write, MsgId, Msg}).
read(MsgId, CState) ->
Defer = fun() ->
@@ -288,7 +291,7 @@ client_read1(#msg_location { msg_id = MsgId, ref_count = RefCount, file = File }
case ets:lookup(?CUR_FILE_CACHE_ETS_NAME, MsgId) of
[] ->
Defer(); %% may have rolled over
- [{MsgId, Msg}] ->
+ [{MsgId, _FileOrUndefined, Msg}] ->
ok = maybe_insert_into_cache(RefCount, MsgId, Msg),
{{ok, Msg}, CState}
end;
@@ -421,7 +424,8 @@ handle_cast({write, MsgId, Msg},
case index_lookup(MsgId, State) of
not_found ->
%% New message, lots to do
- true = ets:insert_new(?CUR_FILE_CACHE_ETS_NAME, {MsgId, Msg}),
+ true = ets:update_element(?CUR_FILE_CACHE_ETS_NAME, MsgId,
+ {2, CurFile}),
{ok, CurOffset} = file_handle_cache:current_virtual_offset(CurHdl),
{ok, TotalSize} = rabbit_msg_file:append(CurHdl, MsgId, Msg),
ok = index_insert(#msg_location {
@@ -630,7 +634,7 @@ read_message1(From, #msg_location { msg_id = MsgId, ref_count = RefCount,
false -> ok
end,
read_from_disk(MsgLoc, State);
- [{MsgId, Msg1}] ->
+ [{MsgId, File, Msg1}] ->
{Msg1, State}
end,
ok = maybe_insert_into_cache(RefCount, MsgId, Msg),
@@ -1102,7 +1106,7 @@ maybe_roll_to_new_file(Offset,
locked = false, readers = 0 }),
true = ets:update_element(?FILE_SUMMARY_ETS_NAME, CurFile,
{#file_summary.right, NextFile}),
- true = ets:delete_all_objects(?CUR_FILE_CACHE_ETS_NAME),
+ true = ets:match_delete(?CUR_FILE_CACHE_ETS_NAME, {'_', CurFile, '_'}),
State1 #msstate { current_file_handle = NextHdl,
current_file = NextFile };
maybe_roll_to_new_file(_, State) ->