diff options
| -rw-r--r-- | src/rabbit_msg_store.erl | 30 |
1 files changed, 18 insertions, 12 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 85f5526ec7..27fcbbd01d 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -270,8 +270,7 @@ start_link(Dir, MsgRefDeltaGen, MsgRefDeltaGenInit) -> [{timeout, infinity}]). write(MsgId, Msg) -> - %% could fail if msg already in there - ets:insert_new(?CUR_FILE_CACHE_ETS_NAME, {MsgId, undefined, Msg}), + ok = add_to_cache(MsgId, Msg), gen_server2:cast(?SERVER, {write, MsgId, Msg}). read(MsgId, CState) -> @@ -288,7 +287,7 @@ read(MsgId, CState) -> not_found -> Defer(); MsgLocation -> client_read1(MsgLocation, Defer, CState) end; - [{MsgId, _FileOrUndefined, Msg}] -> + [{MsgId, Msg, _CacheRefCount}] -> %% Although we've found it, we don't know the %% refcount, so can't insert into dedup cache {{ok, Msg}, CState} @@ -325,6 +324,18 @@ client_terminate(CState) -> %% Client-side-only helpers %%---------------------------------------------------------------------------- +add_to_cache(MsgId, Msg) -> + case ets:insert_new(?CUR_FILE_CACHE_ETS_NAME, {MsgId, Msg, 1}) of + true -> + ok; + false -> + try + ets:update_counter(?CUR_FILE_CACHE_ETS_NAME, MsgId, {3, +1}), + ok + catch error:badarg -> add_to_cache(MsgId, Msg) + end + end. + client_read1(MsgLocation = #msg_location { msg_id = MsgId, file = File }, Defer, CState) -> case ets:lookup(?FILE_SUMMARY_ETS_NAME, File) of @@ -339,7 +350,7 @@ client_read2(false, undefined, #msg_location { case ets:lookup(?CUR_FILE_CACHE_ETS_NAME, MsgId) of [] -> Defer(); %% may have rolled over - [{MsgId, _FileOrUndefined, Msg}] -> + [{MsgId, Msg, _CacheRefCount}] -> ok = maybe_insert_into_cache(RefCount, MsgId, Msg), {{ok, Msg}, CState} end; @@ -502,11 +513,10 @@ handle_cast({write, MsgId, Msg}, current_file = CurFile, sum_valid_data = SumValid, sum_file_size = SumFileSize }) -> + true = 0 =< ets:update_counter(?CUR_FILE_CACHE_ETS_NAME, MsgId, {3, -1}), case index_lookup(MsgId, State) of not_found -> %% New message, lots to do - true = ets:update_element(?CUR_FILE_CACHE_ETS_NAME, MsgId, - {2, CurFile}), {ok, CurOffset} = file_handle_cache:current_virtual_offset(CurHdl), {ok, TotalSize} = rabbit_msg_file:append(CurHdl, MsgId, Msg), ok = index_insert(#msg_location { @@ -543,10 +553,6 @@ handle_cast({write, MsgId, Msg}, ok = index_update_fields(MsgId, {#msg_location.ref_count, RefCount + 1}, State), - true = case File == CurFile of - true -> true; - false -> ets:delete(?CUR_FILE_CACHE_ETS_NAME, MsgId) - end, noreply(State) end; @@ -721,7 +727,7 @@ read_message1(From, #msg_location { msg_id = MsgId, ref_count = RefCount, false -> ok end, read_from_disk(MsgLoc, State); - [{MsgId, File, Msg1}] -> + [{MsgId, Msg1, _CacheRefCount}] -> {Msg1, State} end, ok = maybe_insert_into_cache(RefCount, MsgId, Msg), @@ -1193,7 +1199,7 @@ maybe_roll_to_new_file(Offset, locked = false, readers = 0 }), true = ets:update_element(?FILE_SUMMARY_ETS_NAME, CurFile, {#file_summary.right, NextFile}), - true = ets:match_delete(?CUR_FILE_CACHE_ETS_NAME, {'_', CurFile, '_'}), + true = ets:match_delete(?CUR_FILE_CACHE_ETS_NAME, {'_', '_', 0}), State1 #msstate { current_file_handle = NextHdl, current_file = NextFile }; maybe_roll_to_new_file(_, State) -> |
