diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-04-14 10:50:12 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-04-14 10:50:12 +0100 |
| commit | 4c8ba43d6dc57b7f4574691648a3c587662bfe1d (patch) | |
| tree | 4c1463b391f6dd57338bddfb5f00667055ad5936 | |
| parent | 341f47f909e23645b7d3af4643ae721bb027c5c6 (diff) | |
| download | rabbitmq-server-git-4c8ba43d6dc57b7f4574691648a3c587662bfe1d.tar.gz | |
GC file movement is done, but updating the accounting information is not done. So don't use it!
| -rw-r--r-- | src/rabbit_disk_queue.erl | 145 |
1 files changed, 116 insertions, 29 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 371a5e4ba4..f70f31b472 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -167,6 +167,7 @@ terminate(_Reason, State) -> shutdown(State = #dqstate { current_file_handle = FileHdl, read_file_handles = {ReadHdls, _ReadHdlsAge} }) -> + % deliberately ignoring return codes here if FileHdl =:= undefined -> ok; true -> file:sync(FileHdl), file:close(FileHdl) @@ -281,7 +282,7 @@ internal_tx_publish(MsgId, MsgBody, State = #dqstate { msg_location = MsgLocatio true = ets:insert_new(MsgLocation, {MsgId, 1, CurName, CurOffset, TotalSize}), [{CurName, ValidTotalSize, ContiguousTop, Left, undefined}] = ets:lookup(FileSummary, CurName), - true = ets:insert_new(FileDetail, {{CurName, CurOffset}, TotalSize}), + true = ets:insert_new(FileDetail, {{CurName, CurOffset}, TotalSize, MsgId}), ValidTotalSize1 = ValidTotalSize + TotalSize + ?FILE_PACKING_ADJUSTMENT, ContiguousTop1 = if CurOffset =:= ContiguousTop -> ValidTotalSize; % can't be any holes in this file @@ -363,35 +364,121 @@ combineFile(File, State = #dqstate { file_size_limit = FileSizeLimit, file_summary = FileSummary, current_file_name = CurName }) -> - [{File, ValidData, _ContiguousTop, Left, Right}] = ets:lookup(FileSummary, File), - GoRight = fun() -> - case Right of - undefined -> State; - _ when not(CurName =:= Right) -> - [{Right, RightValidData, _RightContiguousTop, File, _Right}] - = ets:lookup(FileSummary, Right), - if FileSizeLimit >= (ValidData + RightValidData) -> - combineFiles(Right, File, State); - true -> State - end; - _ -> State - end - end, - case Left of - undefined -> - GoRight(); - _ -> [{Left, LeftValidData, _LeftContiguousTop, _Left, File}] - = ets:lookup(FileSummary, Left), - if FileSizeLimit >= (ValidData + LeftValidData) -> - combineFiles(File, Left, State); - true -> - GoRight() - end + % DELIBERATE BADMATCH. This code is not ready yet + ko = io:format("uh oh~n", []), + % the file we're looking at may no longer exist as it may have been deleted + % within the current GC run + case ets:lookup(FileSummary, File) of + [] -> State; + [FileObj = {File, ValidData, _ContiguousTop, Left, Right}] -> + GoRight = fun() -> + case Right of + undefined -> State; + _ when not(CurName =:= Right) -> + [RightObj = {Right, RightValidData, _RightContiguousTop, File, RightRight}] + = ets:lookup(FileSummary, Right), + RightSumData = ValidData + RightValidData, + if FileSizeLimit >= RightSumData -> + % here, Right will be the source and so will be deleted, + % File will be the destination + State1 = combineFiles(RightObj, FileObj, State), + % this could fail if RightRight is undefined + ets:update_element(FileSummary, RightRight, {4, File}), % left is the 4th field + true = ets:insert(FileSummary, {File, RightSumData, RightSumData, Left, RightRight}), + true = ets:delete(FileSummary, Right), + State1; + true -> State + end; + _ -> State + end + end, + case Left of + undefined -> + GoRight(); + _ -> [LeftObj = {Left, LeftValidData, _LeftContiguousTop, LeftLeft, File}] + = ets:lookup(FileSummary, Left), + LeftSumData = ValidData + LeftValidData, + if FileSizeLimit >= LeftSumData -> + % here, File will be the source and so will be deleted, + % Left will be the destination + State1 = combineFiles(FileObj, LeftObj, State), + % this could fail if Right is undefined + ets:update_element(FileSummary, Right, {4, Left}), % left is the 4th field + true = ets:insert(FileSummary, {Left, LeftSumData, LeftSumData, LeftLeft, Right}), + true = ets:delete(FileSummary, File), + State1; + true -> + GoRight() + end + end end. -combineFiles(Source, Destination, State) -> +combineFiles({Source, SourceValid, _SourceContiguousTop, _SourceLeft, _SourceRight}, + {Destination, DestinationValid, DestinationContiguousTop, _DestinationLeft, _DestinationRight}, + State1) -> + (State = #dqstate { file_detail = FileDetail }) = closeFile(Source, closeFile(Destination, State1)), + {ok, SourceHdl} = file:open(form_filename(Source), [read, write, raw, binary, delayed_write, read_ahead]), + {ok, DestinationHdl} = file:open(form_filename(Destination), [read, write, raw, binary, delayed_write, read_ahead]), + ExpectedSize = SourceValid + DestinationValid, + % if DestinationValid =:= DestinationContiguousTop then we don't need a tmp file + % if they're not equal, then we need to write out everything past the DestinationContiguousTop to a tmp file + % then truncate, copy back in, and then copy over from Source + % otherwise we just truncate straight away and copy over from Source + if DestinationContiguousTop =:= DestinationValid -> + {ok, DestinationValid} = file:position(DestinationHdl, {bof, DestinationValid}), + ok = file:truncate(DestinationHdl), + {ok, ExpectedSize} = file:position(DestinationHdl, {cur, SourceValid}), + ok = file:truncate(DestinationHdl), + {ok, DestinationValid} = file:position(DestinationHdl, {bof, DestinationValid}); + true -> + Tmp = filename:rootname(Destination) ++ ?FILE_EXTENSION_TMP, + {ok, TmpHdl} = file:open(form_filename(Tmp), [read, write, raw, binary, delayed_write, read_ahead]), + Worklist = lists:filter(fun ({{Destination2, Offset}, _TotalSize, _MsgId}) when Destination2 =:= Destination -> + Offset > DestinationContiguousTop + end, ets:match_object(FileDetail, {{Destination, '_'}, '_', '_'})), + % RevMapping :: [{TmpOffset, DestinationOrigOffset, TotalSize}] + {TmpSize, RevMapping} = + lists:foldl(fun ({{Destination2, Offset}, TotalSize}, {CurOffset, Acc}) when Destination2 =:= Destination -> + {ok, Offset} = file:position(DestinationHdl, {bof, Offset}), + Size = TotalSize + ?FILE_PACKING_ADJUSTMENT, + {ok, Size} = file:copy(DestinationHdl, TmpHdl, Size), + {CurOffset + Size, [{CurOffset, Offset, TotalSize}|Acc]} + end, {0, []}, Worklist), + % so now Tmp contains everything we need to salvage from Destination, + % so truncate Destination and copy from Tmp back to the end + {ok, 0} = file:position(TmpHdl, {bof, 0}), + {ok, DestinationContiguousTop} = file:position(DestinationHdl, {bof, DestinationContiguousTop}), + ok = file:truncate(DestinationHdl), + {ok, ExpectedSize} = file:position(DestinationHdl, {cur, SourceValid}), + ok = file:truncate(DestinationHdl), + {ok, DestinationContiguousTop} = file:position(DestinationHdl, {bof, DestinationContiguousTop}), + {ok, TmpSize} = file:copy(TmpHdl, DestinationHdl, TmpSize), + ok = file:sync(DestinationHdl), + ok = file:close(TmpHdl), + ok = file:delete(form_filename(Tmp)) + end, + SourceWorkList = ets:match_object(FileDetail, {{Source, '_'}, '_', '_'}), + {ExpectedSize, RevMapping2} = + lists:foldl(fun ({{Source2, Offset}, TotalSize, _MsgId}, {CurOffset, Acc}) when Source2 =:= Source -> + {ok, Offset} = file:position(SourceHdl, {bof, Offset}), + Size = TotalSize + ?FILE_PACKING_ADJUSTMENT, + {ok, Size} = file:copy(SourceHdl, DestinationHdl, Size), + {CurOffset + Size, [{CurOffset, Offset, TotalSize}|Acc]} + end, {DestinationContiguousTop, []}, SourceWorkList), + ok = file:sync(DestinationHdl), + ok = file:close(SourceHdl), + ok = file:close(DestinationHdl), + ok = file:delete(form_filename(Source)), State. - + +closeFile(File, State = #dqstate { read_file_handles = {ReadHdls, ReadHdlsAge} }) -> + case dict:find(File, ReadHdls) of + error -> + State; + {ok, {Hdl, Then}} -> + ok = file:close(Hdl), + State #dqstate { read_file_handles = { dict:erase(File, ReadHdls), gb_trees:delete(Then, ReadHdlsAge) } } + end. delete_empty_files(File, Acc, #dqstate { file_summary = FileSummary }) -> [{File, ValidData, _ContiguousTop, Left, Right}] = ets:lookup(FileSummary, File), @@ -430,7 +517,7 @@ load_messages(undefined, [], State = #dqstate { file_summary = FileSummary, State; load_messages(Left, [], State = #dqstate { file_detail = FileDetail }) -> Num = list_to_integer(filename:rootname(Left)), - Offset = case ets:match_object(FileDetail, {{Left, '_'}, '_'}) of + Offset = case ets:match_object(FileDetail, {{Left, '_'}, '_', '_'}) of [] -> 0; L -> {{Left, Offset1}, TotalSize} = lists:last(L), Offset1 + TotalSize + ?FILE_PACKING_ADJUSTMENT @@ -452,7 +539,7 @@ load_messages(Left, [File|Files], 0 -> {VMAcc, VTSAcc}; RefCount -> true = ets:insert_new(MsgLocation, {MsgId, RefCount, File, Offset, TotalSize}), - true = ets:insert_new(FileDetail, {{File, Offset}, TotalSize}), + true = ets:insert_new(FileDetail, {{File, Offset}, TotalSize, MsgId}), {[{MsgId, TotalSize, Offset}|VMAcc], VTSAcc + TotalSize + ?FILE_PACKING_ADJUSTMENT } |
