diff options
| author | Matthias Radestock <matthias@lshift.net> | 2010-05-15 08:57:22 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@lshift.net> | 2010-05-15 08:57:22 +0100 |
| commit | c8b86f5de876cb5ed859798d8628d67f3946d723 (patch) | |
| tree | de28b3bd4b57ebcd54f26d5b19d8229894774596 /src | |
| parent | 73d0ee7112079d2c8df0f97eb0873d7d1647e130 (diff) | |
| download | rabbitmq-server-git-c8b86f5de876cb5ed859798d8628d67f3946d723.tar.gz | |
minor refactoring and some cosmetic changes
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_msg_store.erl | 140 | ||||
| -rw-r--r-- | src/rabbit_msg_store_gc.erl | 8 |
2 files changed, 71 insertions, 77 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 913cd65f82..21f1505804 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -460,8 +460,8 @@ client_read3(Server, #msg_location { guid = Guid, file = File }, Defer, case index_lookup(Guid, CState) of #msg_location { file = File } = MsgLocation -> %% Still the same file. - %% This is fine to fail (already exists) - ets:insert_new(FileHandlesEts, {{self(), File}, open}), + mark_handle_open(FileHandlesEts, File), + CState1 = close_all_indicated(CState), {Msg, CState2} = %% This will never be the current file read_from_disk(MsgLocation, CState1, DedupCacheEts), @@ -473,14 +473,6 @@ client_read3(Server, #msg_location { guid = Guid, file = File }, Defer, end end. -close_all_indicated(#client_msstate { file_handles_ets = FileHandlesEts } = - CState) -> - Objs = ets:match_object(FileHandlesEts, {{self(), '_'}, close}), - lists:foldl(fun ({Key = {_Self, File}, close}, CStateM) -> - true = ets:delete(FileHandlesEts, Key), - close_handle(File, CStateM) - end, CState, Objs). - %%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- @@ -958,6 +950,24 @@ close_handle(Key, FHC) -> error -> FHC end. +mark_handle_open(FileHandlesEts, File) -> + %% This is fine to fail (already exists) + ets:insert_new(FileHandlesEts, {{self(), File}, open}), + true. + +mark_handle_to_close(FileHandlesEts, File) -> + [ ets:update_element(FileHandlesEts, Key, {2, close}) + || {Key, open} <- ets:match_object(FileHandlesEts, {{'_', File}, open}) ], + true. + +close_all_indicated(#client_msstate { file_handles_ets = FileHandlesEts } = + CState) -> + Objs = ets:match_object(FileHandlesEts, {{self(), '_'}, close}), + lists:foldl(fun ({Key = {_Self, File}, close}, CStateM) -> + true = ets:delete(FileHandlesEts, Key), + close_handle(File, CStateM) + end, CState, Objs). + close_all_handles(CState = #client_msstate { file_handles_ets = FileHandlesEts, file_handle_cache = FHC }) -> Self = self(), @@ -1426,9 +1436,7 @@ maybe_roll_to_new_file( State1 = internal_sync(State), ok = file_handle_cache:close(CurHdl), NextFile = CurFile + 1, - {ok, NextHdl} = open_file( - Dir, filenum_to_name(NextFile), - ?WRITE_MODE), + {ok, NextHdl} = open_file(Dir, filenum_to_name(NextFile), ?WRITE_MODE), true = ets:insert_new(FileSummaryEts, #file_summary { file = NextFile, valid_total_size = 0, @@ -1476,11 +1484,6 @@ maybe_compact(State = #msstate { sum_valid_data = SumValid, maybe_compact(State) -> State. -mark_handle_to_close(FileHandlesEts, File) -> - [ ets:update_element(FileHandlesEts, Key, {2, close}) - || {Key, open} <- ets:match_object(FileHandlesEts, {{'_', File}, open}) ], - true. - find_files_to_gc(FileSummaryEts, [#file_summary { file = Dst, valid_total_size = DstValid, @@ -1495,13 +1498,11 @@ find_files_to_gc(FileSummaryEts, right = SrcRight }] = Next = ets:lookup(FileSummaryEts, Src), case SrcRight of - undefined -> - not_found; - _ -> - case DstValid + SrcValid =< ?FILE_SIZE_LIMIT of - true -> {Src, Dst}; - false -> find_files_to_gc(FileSummaryEts, Next) - end + undefined -> not_found; + _ -> case DstValid + SrcValid =< ?FILE_SIZE_LIMIT of + true -> {Src, Dst}; + false -> find_files_to_gc(FileSummaryEts, Next) + end end end. @@ -1536,9 +1537,7 @@ delete_file_if_empty(File, State = #msstate { true = mark_handle_to_close(FileHandlesEts, File), true = ets:delete(FileSummaryEts, File), State1 = close_handle(File, State), - ok = file:delete(form_filename( - Dir, - filenum_to_name(File))), + ok = file:delete(form_filename(Dir, filenum_to_name(File))), State1 #msstate { sum_file_size = SumFileSize - FileSize }; _ -> State end. @@ -1550,20 +1549,17 @@ delete_file_if_empty(File, State = #msstate { gc(SrcFile, DstFile, State = {FileSummaryEts, _Dir, _Index, _IndexState}) -> [SrcObj = #file_summary { readers = SrcReaders, - valid_total_size = SrcValidData, left = DstFile, file_size = SrcFileSize, locked = true }] = ets:lookup(FileSummaryEts, SrcFile), [DstObj = #file_summary { readers = DstReaders, - valid_total_size = DstValidData, right = SrcFile, file_size = DstFileSize, locked = true }] = ets:lookup(FileSummaryEts, DstFile), case SrcReaders =:= 0 andalso DstReaders =:= 0 of - true -> TotalValidData = DstValidData + SrcValidData, - ok = combine_files(SrcObj, DstObj, State), + true -> TotalValidData = combine_files(SrcObj, DstObj, State), %% don't update dest.right, because it could be %% changing at the same time true = ets:update_element( @@ -1584,12 +1580,12 @@ combine_files(#file_summary { file = Source, contiguous_top = DestinationContiguousTop, right = Source }, State = {_FileSummaryEts, Dir, _Index, _IndexState}) -> - SourceName = filenum_to_name(Source), + SourceName = filenum_to_name(Source), DestinationName = filenum_to_name(Destination), - {ok, SourceHdl} = - open_file(Dir, SourceName, ?READ_AHEAD_MODE), - {ok, DestinationHdl} = - open_file(Dir, DestinationName, ?READ_AHEAD_MODE ++ ?WRITE_MODE), + {ok, SourceHdl} = open_file(Dir, SourceName, + ?READ_AHEAD_MODE), + {ok, DestinationHdl} = open_file(Dir, DestinationName, + ?READ_AHEAD_MODE ++ ?WRITE_MODE), ExpectedSize = SourceValid + DestinationValid, %% if DestinationValid =:= DestinationContiguousTop then we don't %% need a tmp file @@ -1597,10 +1593,11 @@ combine_files(#file_summary { file = Source, %% the DestinationContiguousTop to a tmp file then truncate, %% copy back in, and then copy over from Source %% otherwise we just truncate straight away and copy over from Source - if DestinationContiguousTop =:= DestinationValid -> + case DestinationContiguousTop =:= DestinationValid of + true -> ok = truncate_and_extend_file( DestinationHdl, DestinationValid, ExpectedSize); - true -> + false -> {DestinationWorkList, DestinationValid} = find_unremoved_messages_in_file(Destination, State), Worklist = @@ -1618,8 +1615,7 @@ combine_files(#file_summary { file = Source, %% enforce it anyway end, DestinationWorkList), Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP, - {ok, TmpHdl} = open_file( - Dir, Tmp, ?READ_AHEAD_MODE ++ ?WRITE_MODE), + {ok, TmpHdl} = open_file(Dir, Tmp, ?READ_AHEAD_MODE ++ ?WRITE_MODE), ok = copy_messages( Worklist, DestinationContiguousTop, DestinationValid, DestinationHdl, TmpHdl, Destination, State), @@ -1644,11 +1640,11 @@ combine_files(#file_summary { file = Source, %% tidy up ok = file_handle_cache:close(DestinationHdl), ok = file_handle_cache:delete(SourceHdl), - ok. + ExpectedSize. find_unremoved_messages_in_file(File, {_FileSummaryEts, Dir, Index, IndexState}) -> - %% Msgs here will be end-of-file at start-of-list + %% Messages here will be end-of-file at start-of-list {ok, Messages, _FileSize} = scan_file_for_valid_messages(Dir, filenum_to_name(File)), %% foldl will reverse so will end up with msgs in ascending offset order @@ -1663,11 +1659,18 @@ find_unremoved_messages_in_file(File, copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, Destination, {_FileSummaryEts, _Dir, Index, IndexState}) -> + Copy = fun ({BlockStart, BlockEnd}) -> + BSize = BlockEnd - BlockStart, + {ok, BlockStart} = + file_handle_cache:position(SourceHdl, BlockStart), + {ok, BSize} = + file_handle_cache:copy(SourceHdl, DestinationHdl, BSize) + end, case lists:foldl( fun (#msg_location { guid = Guid, offset = Offset, total_size = TotalSize }, - {CurOffset, BlockStart, BlockEnd}) -> + {CurOffset, Block = {BlockStart, BlockEnd}}) -> %% CurOffset is in the DestinationFile. %% Offset, BlockStart and BlockEnd are in the SourceFile %% update MsgLocation to reflect change of file and offset @@ -1675,40 +1678,29 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, [{#msg_location.file, Destination}, {#msg_location.offset, CurOffset}], IndexState), - {BlockStart2, BlockEnd2} = - if BlockStart =:= undefined -> - %% base case, called only for the first list elem - {Offset, Offset + TotalSize}; - Offset =:= BlockEnd -> - %% extend the current block because the - %% next msg follows straight on - {BlockStart, BlockEnd + TotalSize}; - true -> - %% found a gap, so actually do the work - %% for the previous block - BSize = BlockEnd - BlockStart, - {ok, BlockStart} = - file_handle_cache:position(SourceHdl, - BlockStart), - {ok, BSize} = file_handle_cache:copy( - SourceHdl, DestinationHdl, BSize), - {Offset, Offset + TotalSize} - end, - {CurOffset + TotalSize, BlockStart2, BlockEnd2} - end, {InitOffset, undefined, undefined}, WorkList) of - {FinalOffset, BlockStart1, BlockEnd1} -> + {CurOffset + TotalSize, + case BlockEnd of + undefined -> + %% base case, called only for the first list elem + {Offset, Offset + TotalSize}; + Offset -> + %% extend the current block because the + %% next msg follows straight on + {BlockStart, BlockEnd + TotalSize}; + _ -> + %% found a gap, so actually do the work for + %% the previous block + Copy(Block), + {Offset, Offset + TotalSize} + end} + end, {InitOffset, {undefined, undefined}}, WorkList) of + {FinalOffset, Block} -> case WorkList of [] -> ok; - %% do the last remaining block - _ -> BSize1 = BlockEnd1 - BlockStart1, - {ok, BlockStart1} = - file_handle_cache:position(SourceHdl, BlockStart1), - {ok, BSize1} = - file_handle_cache:copy(SourceHdl, DestinationHdl, - BSize1), + _ -> Copy(Block), %% do the last remaining block ok = file_handle_cache:sync(DestinationHdl) end; - {FinalOffsetZ, _BlockStart1, _BlockEnd1} -> + {FinalOffsetZ, _Block} -> {gc_error, [{expected, FinalOffset}, {got, FinalOffsetZ}, {destination, Destination}]} diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl index 8a275c39d9..038d51c484 100644 --- a/src/rabbit_msg_store_gc.erl +++ b/src/rabbit_msg_store_gc.erl @@ -71,9 +71,11 @@ set_maximum_since_use(Pid, Age) -> init([Parent, Dir, IndexState, IndexModule, FileSummaryEts]) -> ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use, [self()]), - {ok, #gcstate { dir = Dir, index_state = IndexState, - index_module = IndexModule, parent = Parent, - file_summary_ets = FileSummaryEts}, + {ok, #gcstate { dir = Dir, + index_state = IndexState, + index_module = IndexModule, + parent = Parent, + file_summary_ets = FileSummaryEts }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. |
