diff options
| -rw-r--r-- | src/rabbit_msg_store.erl | 70 |
1 files changed, 27 insertions, 43 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index d0adfdcbd3..09c285670e 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -619,7 +619,7 @@ handle_cast({write, Guid, Msg}, true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}), case index_lookup(Guid, State) of not_found -> - write_message(Guid, Msg, RefCount, State); + write_message(Guid, Msg, 1, State); #msg_location { ref_count = 0, file = File, offset = Offset, total_size = TotalSize } -> [#file_summary { locked = Locked, @@ -628,7 +628,7 @@ handle_cast({write, Guid, Msg}, case Locked of true -> ok = index_delete(Guid, State), - write_message(Guid, Msg, RefCount, State); + write_message(Guid, Msg, 1, State); false -> ok = index_update_fields( Guid, {#msg_location.ref_count, 1}, State), @@ -703,7 +703,6 @@ handle_cast({gc_done, Reclaimed, Src, Dst}, [{#file_summary.locked, false}, {#file_summary.right, SrcRight}]), true = ets:delete(FileSummaryEts, Src), - ok = index_delete_by_file(Src, State), noreply( maybe_compact(run_pending( State #msstate { sum_file_size = SumFileSize - Reclaimed, @@ -1573,7 +1572,6 @@ combine_files(#file_summary { file = Source, left = Destination }, #file_summary { file = Destination, valid_total_size = DestinationValid, - contiguous_top = DestinationContiguousTop, right = Source }, State = {_FileSummaryEts, Dir, _Index, _IndexState}) -> SourceName = filenum_to_name(Source), @@ -1589,42 +1587,25 @@ combine_files(#file_summary { file = Source, %% the DestinationContiguousTop to a tmp file then truncate, %% copy back in, and then copy over from Source %% otherwise we just truncate straight away and copy over from Source - case DestinationContiguousTop =:= DestinationValid of - true -> - ok = truncate_and_extend_file( - DestinationHdl, DestinationContiguousTop, ExpectedSize); - false -> - {DestinationWorkList, DestinationValid} = - find_unremoved_messages_in_file(Destination, State), - Worklist = - lists:dropwhile( - fun (#msg_location { offset = Offset }) - when Offset =/= DestinationContiguousTop -> - %% it cannot be that Offset =:= - %% DestinationContiguousTop because if it - %% was then DestinationContiguousTop would - %% have been extended by TotalSize - Offset < DestinationContiguousTop - end, DestinationWorkList), - Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP, - {ok, TmpHdl} = open_file(Dir, Tmp, ?READ_AHEAD_MODE ++ ?WRITE_MODE), - ok = copy_messages( - Worklist, DestinationContiguousTop, DestinationValid, - DestinationHdl, TmpHdl, Destination, State), - TmpSize = DestinationValid - DestinationContiguousTop, - %% so now Tmp contains everything we need to salvage from - %% Destination, and index_state has been updated to - %% reflect the compaction of Destination so truncate - %% Destination and copy from Tmp back to the end - {ok, 0} = file_handle_cache:position(TmpHdl, 0), - ok = truncate_and_extend_file( - DestinationHdl, DestinationContiguousTop, ExpectedSize), - {ok, TmpSize} = - file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize), - %% position in DestinationHdl should now be DestinationValid - ok = file_handle_cache:sync(DestinationHdl), - ok = file_handle_cache:delete(TmpHdl) - end, + {DestinationWorkList, DestinationValid} = + find_unremoved_messages_in_file(Destination, State), + Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP, + {ok, TmpHdl} = open_file(Dir, Tmp, ?READ_AHEAD_MODE ++ ?WRITE_MODE), + ok = copy_messages( + DestinationWorkList, 0, DestinationValid, + DestinationHdl, TmpHdl, Destination, State), + %% so now Tmp contains everything we need to salvage from + %% Destination, and index_state has been updated to + %% reflect the compaction of Destination so truncate + %% Destination and copy from Tmp back to the end + {ok, 0} = file_handle_cache:position(TmpHdl, 0), + ok = truncate_and_extend_file( + DestinationHdl, 0, ExpectedSize), + {ok, DestinationValid} = + file_handle_cache:copy(TmpHdl, DestinationHdl, DestinationValid), + %% position in DestinationHdl should now be DestinationValid + ok = file_handle_cache:sync(DestinationHdl), + ok = file_handle_cache:delete(TmpHdl), {SourceWorkList, SourceValid} = find_unremoved_messages_in_file(Source, State), ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize, @@ -1643,9 +1624,12 @@ find_unremoved_messages_in_file(File, lists:foldl(fun ({Guid, TotalSize, Offset}, Acc = {List, Size}) -> case Index:lookup(Guid, IndexState) of #msg_location { file = File, total_size = TotalSize, - ref_count = RefCount, - offset = Offset } = Entry - when RefCount > 0 -> + ref_count = 0, + offset = Offset } -> + ok = Index:delete(Guid, IndexState), + Acc; + #msg_location { file = File, total_size = TotalSize, + offset = Offset } = Entry -> {[ Entry | List ], TotalSize + Size}; _ -> Acc |
