diff options
| -rw-r--r-- | src/rabbit_msg_store.erl | 128 |
1 files changed, 62 insertions, 66 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 2ea6d616c4..fc49b0e77e 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -756,30 +756,7 @@ handle_cast({write, CRef, MsgId}, case update_flying(-1, MsgId, CRef, State) of process -> [{MsgId, Msg, _PWC}] = ets:lookup(CurFileCacheEts, MsgId), - noreply( - case write_action( - should_mask_action(CRef, MsgId, State), MsgId, State) of - {write, State1} -> - write_message(CRef, MsgId, Msg, State1); - {ignore, CurFile, - State1 = #msstate { current_file = CurFile }} -> - State1; - {ignore, _File, State1} -> - true = ets:delete_object(CurFileCacheEts, - {MsgId, Msg, 0}), - State1; - {confirm, CurFile, - State1 = #msstate { current_file = CurFile }}-> - record_pending_confirm(CRef, MsgId, State1); - {confirm, _File, State1} -> - true = ets:delete_object(CurFileCacheEts, - {MsgId, Msg, 0}), - update_pending_confirms( - fun (MsgOnDiskFun, CTM) -> - MsgOnDiskFun(gb_sets:singleton(MsgId), written), - CTM - end, CRef, State1) - end); + noreply(write_message(MsgId, Msg, CRef, State)); ignore -> %% A 'remove' has already been issued and eliminated the %% 'write'. If all writes get eliminated, @@ -789,8 +766,8 @@ handle_cast({write, CRef, MsgId}, %% the message can continue to be done client side, from %% either the cache or the non-current files. If the %% message *is* in the current file then the cache entry - %% will be removed by the normal logic for that, - %% e.g. above and during file rolling. + %% will be removed by the normal logic for that in + %% write_message/4 and maybe_roll_to_new_file/2. case index_lookup(MsgId, State) of [#msg_location { file = File }] when File == State #msstate.current_file -> @@ -973,8 +950,65 @@ write_action({_Mask, #msg_location { ref_count = RefCount, file = File }}, %% field otherwise bad interaction with concurrent GC {confirm, File, State}. -write_message(CRef, MsgId, Msg, State) -> - write_message(MsgId, Msg, record_pending_confirm(CRef, MsgId, State)). +write_message(MsgId, Msg, CRef, + State = #msstate { cur_file_cache_ets = CurFileCacheEts }) -> + case write_action(should_mask_action(CRef, MsgId, State), MsgId, State) of + {write, State1} -> + write_message(MsgId, Msg, + record_pending_confirm(CRef, MsgId, State1)); + {ignore, CurFile, State1 = #msstate { current_file = CurFile }} -> + State1; + {ignore, _File, State1} -> + true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}), + State1; + {confirm, CurFile, State1 = #msstate { current_file = CurFile }}-> + record_pending_confirm(CRef, MsgId, State1); + {confirm, _File, State1} -> + true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}), + update_pending_confirms( + fun (MsgOnDiskFun, CTM) -> + MsgOnDiskFun(gb_sets:singleton(MsgId), written), + CTM + end, CRef, State1) + end. + +remove_message(MsgId, CRef, + State = #msstate { file_summary_ets = FileSummaryEts }) -> + case should_mask_action(CRef, MsgId, State) of + {true, _Location} -> + State; + {false_if_increment, #msg_location { ref_count = 0 }} -> + %% CRef has tried to both write and remove this msg whilst + %% it's being GC'd. + %% + %% ASSERTION: [#file_summary { locked = true }] = + %% ets:lookup(FileSummaryEts, File), + State; + {_Mask, #msg_location { ref_count = RefCount, file = File, + total_size = TotalSize }} + when RefCount > 0 -> + %% only update field, otherwise bad interaction with + %% concurrent GC + Dec = fun () -> index_update_ref_count( + MsgId, RefCount - 1, State) end, + case RefCount of + %% don't remove from cur_file_cache_ets here because + %% there may be further writes in the mailbox for the + %% same msg. + 1 -> case ets:lookup(FileSummaryEts, File) of + [#file_summary { locked = true }] -> + add_to_pending_gc_completion( + {remove, MsgId, CRef}, File, State); + [#file_summary {}] -> + ok = Dec(), + delete_file_if_empty( + File, adjust_valid_total_size( + File, -TotalSize, State)) + end; + _ -> ok = Dec(), + State + end + end. write_message(MsgId, Msg, State = #msstate { current_file_handle = CurHdl, @@ -1072,44 +1106,6 @@ contains_message(MsgId, From, end end. -remove_message(MsgId, CRef, - State = #msstate { file_summary_ets = FileSummaryEts }) -> - case should_mask_action(CRef, MsgId, State) of - {true, _Location} -> - State; - {false_if_increment, #msg_location { ref_count = 0 }} -> - %% CRef has tried to both write and remove this msg whilst - %% it's being GC'd. - %% - %% ASSERTION: [#file_summary { locked = true }] = - %% ets:lookup(FileSummaryEts, File), - State; - {_Mask, #msg_location { ref_count = RefCount, file = File, - total_size = TotalSize }} - when RefCount > 0 -> - %% only update field, otherwise bad interaction with - %% concurrent GC - Dec = fun () -> index_update_ref_count( - MsgId, RefCount - 1, State) end, - case RefCount of - %% don't remove from cur_file_cache_ets here because - %% there may be further writes in the mailbox for the - %% same msg. - 1 -> case ets:lookup(FileSummaryEts, File) of - [#file_summary { locked = true }] -> - add_to_pending_gc_completion( - {remove, MsgId, CRef}, File, State); - [#file_summary {}] -> - ok = Dec(), - delete_file_if_empty( - File, adjust_valid_total_size( - File, -TotalSize, State)) - end; - _ -> ok = Dec(), - State - end - end. - add_to_pending_gc_completion( Op, File, State = #msstate { pending_gc_completion = Pending }) -> State #msstate { pending_gc_completion = |
