diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2010-09-08 18:07:27 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-09-08 18:07:27 +0100 |
| commit | 6647baf5521f35cdde481c09e542ba0a7f51ffad (patch) | |
| tree | 8e2963872c656d89145224f2aca9370bd57b73c6 | |
| parent | 2969f6ad2a7d1192901dddbfa3bb1862a0295cd9 (diff) | |
| parent | 800bf156f535a81cb37e8b183b3d0a563fff3d0a (diff) | |
| download | rabbitmq-server-git-6647baf5521f35cdde481c09e542ba0a7f51ffad.tar.gz | |
Merging bug 23233 into bug 23133
| -rw-r--r-- | src/rabbit_msg_store.erl | 113 |
1 file changed, 50 insertions, 63 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index f9345c0b33..6f3f70e11e 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -98,8 +98,7 @@ }). -record(file_summary, - {file, valid_total_size, contiguous_top, left, right, file_size, - locked, readers}). + {file, valid_total_size, left, right, file_size, locked, readers}). %%---------------------------------------------------------------------------- @@ -159,8 +158,7 @@ %% {Guid, RefCount, File, Offset, TotalSize} %% By default, it's in ets, but it's also pluggable. %% FileSummary: this is an ets table which maps File to #file_summary{}: -%% {File, ValidTotalSize, ContiguousTop, Left, Right, -%% FileSize, Locked, Readers} +%% {File, ValidTotalSize, Left, Right, FileSize, Locked, Readers} %% %% The basic idea is that messages are appended to the current file up %% until that file becomes too big (> file_size_limit). At that point, @@ -176,9 +174,7 @@ %% %% As messages are removed from files, holes appear in these %% files. The field ValidTotalSize contains the total amount of useful -%% data left in the file, whilst ContiguousTop contains the amount of -%% valid data right at the start of each file. These are needed for -%% garbage collection. +%% data left in the file. This is needed for garbage collection. %% %% When we discover that a file is now empty, we delete it. When we %% discover that it can be combined with the useful data in either its @@ -224,9 +220,7 @@ %% above B (i.e. truncate to the limit of the good contiguous region %% at the start of the file), then write C and D on top and then write %% E, F and G from the right file on top. Thus contiguous blocks of -%% good data at the bottom of files are not rewritten (yes, this is -%% the data the size of which is tracked by the ContiguousTop -%% variable. Judicious use of a mirror is required). +%% good data at the bottom of files are not rewritten. 
%% %% +-------+ +-------+ +-------+ %% | X | | G | | G | @@ -619,8 +613,7 @@ handle_cast({write, Guid}, case index_lookup(Guid, State) of not_found -> write_message(Guid, Msg, 1, State); - #msg_location { ref_count = 0, file = File, offset = Offset, - total_size = TotalSize } -> + #msg_location { ref_count = 0, file = File, total_size = TotalSize } -> [#file_summary { locked = Locked, file_size = FileSize } = Summary] = ets:lookup(FileSummaryEts, File), @@ -631,8 +624,8 @@ handle_cast({write, Guid}, false -> ok = index_update_fields( Guid, {#msg_location.ref_count, 1}, State), - ok = add_to_file_summary(Summary, TotalSize, Offset, File, - FileSize, State), + ok = add_to_file_summary(Summary, TotalSize, File, FileSize, + State), noreply(State #msstate { sum_valid_data = SumValid + TotalSize }) end; @@ -810,27 +803,21 @@ write_message(Guid, Msg, RefCount, locked = false, file_size = FileSize } = Summary] = ets:lookup(FileSummaryEts, CurFile), - ok = add_to_file_summary(Summary, TotalSize, CurOffset, CurFile, - FileSize + TotalSize, State), + ok = add_to_file_summary(Summary, TotalSize, CurFile, FileSize + TotalSize, + State), NextOffset = CurOffset + TotalSize, noreply(maybe_roll_to_new_file( NextOffset, State #msstate { sum_valid_data = SumValid + TotalSize, sum_file_size = SumFileSize + TotalSize })). -add_to_file_summary(#file_summary { valid_total_size = ValidTotalSize, - contiguous_top = ContiguousTop }, - TotalSize, Offset, File, FileSize, +add_to_file_summary(#file_summary { valid_total_size = ValidTotalSize }, + TotalSize, File, FileSize, #msstate { file_summary_ets = FileSummaryEts }) -> ValidTotalSize1 = ValidTotalSize + TotalSize, - ContiguousTop1 = case ContiguousTop of - Offset -> ContiguousTop + TotalSize; - _ -> ContiguousTop - end, true = ets:update_element( FileSummaryEts, File, [{#file_summary.valid_total_size, ValidTotalSize1}, - {#file_summary.contiguous_top, ContiguousTop1}, {#file_summary.file_size, FileSize}]), ok. 
@@ -928,8 +915,7 @@ remove_message(Guid, State = #msstate { sum_valid_data = SumValid, file_summary_ets = FileSummaryEts, dedup_cache_ets = DedupCacheEts }) -> #msg_location { ref_count = RefCount, file = File, - offset = Offset, total_size = TotalSize } = - index_lookup(Guid, State), + total_size = TotalSize } = index_lookup(Guid, State), case RefCount of 1 -> %% don't remove from CUR_FILE_CACHE_ETS_NAME here because @@ -937,7 +923,6 @@ remove_message(Guid, State = #msstate { sum_valid_data = SumValid, %% msg. ok = remove_cache_entry(DedupCacheEts, Guid), [#file_summary { valid_total_size = ValidTotalSize, - contiguous_top = ContiguousTop, locked = Locked }] = ets:lookup(FileSummaryEts, File), case Locked of @@ -947,12 +932,11 @@ remove_message(Guid, State = #msstate { sum_valid_data = SumValid, ok = index_update_fields( Guid, {#msg_location.ref_count, RefCount - 1}, State), - ContiguousTop1 = lists:min([ContiguousTop, Offset]), ValidTotalSize1 = ValidTotalSize - TotalSize, - true = ets:update_element( - FileSummaryEts, File, - [{#file_summary.valid_total_size, ValidTotalSize1}, - {#file_summary.contiguous_top, ContiguousTop1}]), + true = + ets:update_element( + FileSummaryEts, File, + [{#file_summary.valid_total_size, ValidTotalSize1}]), State1 = delete_file_if_empty(File, State), State1 #msstate { sum_valid_data = SumValid - TotalSize } end; @@ -1305,16 +1289,17 @@ scan_file_for_valid_messages(Dir, FileName) -> %% Takes the list in *ascending* order (i.e. eldest message %% first). This is the opposite of what scan_file_for_valid_messages %% produces. The list of msgs that is produced is youngest first. -find_contiguous_block_prefix(L) -> find_contiguous_block_prefix(L, 0, []). +drop_contiguous_block_prefix(L) -> drop_contiguous_block_prefix(L, 0). 
-find_contiguous_block_prefix([], ExpectedOffset, Guids) -> - {ExpectedOffset, Guids}; -find_contiguous_block_prefix([{Guid, TotalSize, ExpectedOffset} | Tail], - ExpectedOffset, Guids) -> +drop_contiguous_block_prefix([], ExpectedOffset) -> + {ExpectedOffset, []}; +drop_contiguous_block_prefix([#msg_location { offset = ExpectedOffset, + total_size = TotalSize } | Tail], + ExpectedOffset) -> ExpectedOffset1 = ExpectedOffset + TotalSize, - find_contiguous_block_prefix(Tail, ExpectedOffset1, [Guid | Guids]); -find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, Guids) -> - {ExpectedOffset, Guids}. + drop_contiguous_block_prefix(Tail, ExpectedOffset1); +drop_contiguous_block_prefix(MsgsAfterGap, ExpectedOffset) -> + {ExpectedOffset, MsgsAfterGap}. build_index(true, _StartupFunState, State = #msstate { file_summary_ets = FileSummaryEts }) -> @@ -1390,9 +1375,6 @@ build_index_worker(Gatherer, State = #msstate { dir = Dir }, {VMAcc, VTSAcc} end end, {[], 0}, Messages), - %% foldl reverses lists, find_contiguous_block_prefix needs - %% msgs eldest first, so, ValidMessages is the right way round - {ContiguousTop, _} = find_contiguous_block_prefix(ValidMessages), {Right, FileSize1} = case Files of %% if it's the last file, we'll truncate to remove any @@ -1409,7 +1391,6 @@ build_index_worker(Gatherer, State = #msstate { dir = Dir }, ok = gatherer:in(Gatherer, #file_summary { file = File, valid_total_size = ValidTotalSize, - contiguous_top = ContiguousTop, left = Left, right = Right, file_size = FileSize1, @@ -1437,7 +1418,6 @@ maybe_roll_to_new_file( true = ets:insert_new(FileSummaryEts, #file_summary { file = NextFile, valid_total_size = 0, - contiguous_top = 0, left = CurFile, right = undefined, file_size = 0, @@ -1568,7 +1548,6 @@ gc(SrcFile, DstFile, State = {FileSummaryEts, _Dir, _Index, _IndexState}) -> true = ets:update_element( FileSummaryEts, DstFile, [{#file_summary.valid_total_size, TotalValidData}, - {#file_summary.contiguous_top, 
TotalValidData}, {#file_summary.file_size, TotalValidData}]), SrcFileSize + DstFileSize - TotalValidData; false -> concurrent_readers @@ -1596,23 +1575,31 @@ combine_files(#file_summary { file = Source, %% otherwise we just truncate straight away and copy over from Source {DestinationWorkList, DestinationValid} = find_unremoved_messages_in_file(Destination, State), - Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP, - {ok, TmpHdl} = open_file(Dir, Tmp, ?READ_AHEAD_MODE ++ ?WRITE_MODE), - ok = copy_messages( - DestinationWorkList, 0, DestinationValid, - DestinationHdl, TmpHdl, Destination, State), - %% so now Tmp contains everything we need to salvage from - %% Destination, and index_state has been updated to - %% reflect the compaction of Destination so truncate - %% Destination and copy from Tmp back to the end - {ok, 0} = file_handle_cache:position(TmpHdl, 0), - ok = truncate_and_extend_file( - DestinationHdl, 0, ExpectedSize), - {ok, DestinationValid} = - file_handle_cache:copy(TmpHdl, DestinationHdl, DestinationValid), - %% position in DestinationHdl should now be DestinationValid - ok = file_handle_cache:sync(DestinationHdl), - ok = file_handle_cache:delete(TmpHdl), + {DestinationContiguousTop, DestinationWorkListTail} = + drop_contiguous_block_prefix(DestinationWorkList), + case DestinationWorkListTail of + [] -> ok = truncate_and_extend_file( + DestinationHdl, DestinationContiguousTop, ExpectedSize); + _ -> Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP, + {ok, TmpHdl} = open_file(Dir, Tmp, ?READ_AHEAD_MODE++?WRITE_MODE), + ok = copy_messages( + DestinationWorkListTail, DestinationContiguousTop, + DestinationValid, DestinationHdl, TmpHdl, Destination, + State), + TmpSize = DestinationValid - DestinationContiguousTop, + %% so now Tmp contains everything we need to salvage + %% from Destination, and index_state has been updated to + %% reflect the compaction of Destination so truncate + %% Destination and copy from Tmp back to 
the end + {ok, 0} = file_handle_cache:position(TmpHdl, 0), + ok = truncate_and_extend_file( + DestinationHdl, DestinationContiguousTop, ExpectedSize), + {ok, TmpSize} = + file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize), + %% position in DestinationHdl should now be DestinationValid + ok = file_handle_cache:sync(DestinationHdl), + ok = file_handle_cache:delete(TmpHdl) + end, {SourceWorkList, SourceValid} = find_unremoved_messages_in_file(Source, State), ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize, |
