summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-04-14 10:50:12 +0100
committerMatthew Sackman <matthew@lshift.net>2009-04-14 10:50:12 +0100
commit4c8ba43d6dc57b7f4574691648a3c587662bfe1d (patch)
tree4c1463b391f6dd57338bddfb5f00667055ad5936
parent341f47f909e23645b7d3af4643ae721bb027c5c6 (diff)
downloadrabbitmq-server-git-4c8ba43d6dc57b7f4574691648a3c587662bfe1d.tar.gz
GC file movement is done, but updating the accounting information is not done. So don't use it!
-rw-r--r--src/rabbit_disk_queue.erl145
1 files changed, 116 insertions, 29 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 371a5e4ba4..f70f31b472 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -167,6 +167,7 @@ terminate(_Reason, State) ->
shutdown(State = #dqstate { current_file_handle = FileHdl,
read_file_handles = {ReadHdls, _ReadHdlsAge}
}) ->
+ % deliberately ignoring return codes here
if FileHdl =:= undefined -> ok;
true -> file:sync(FileHdl),
file:close(FileHdl)
@@ -281,7 +282,7 @@ internal_tx_publish(MsgId, MsgBody, State = #dqstate { msg_location = MsgLocatio
true = ets:insert_new(MsgLocation, {MsgId, 1, CurName, CurOffset, TotalSize}),
[{CurName, ValidTotalSize, ContiguousTop, Left, undefined}]
= ets:lookup(FileSummary, CurName),
- true = ets:insert_new(FileDetail, {{CurName, CurOffset}, TotalSize}),
+ true = ets:insert_new(FileDetail, {{CurName, CurOffset}, TotalSize, MsgId}),
ValidTotalSize1 = ValidTotalSize + TotalSize + ?FILE_PACKING_ADJUSTMENT,
ContiguousTop1 = if CurOffset =:= ContiguousTop ->
ValidTotalSize; % can't be any holes in this file
@@ -363,35 +364,121 @@ combineFile(File, State = #dqstate { file_size_limit = FileSizeLimit,
file_summary = FileSummary,
current_file_name = CurName
}) ->
- [{File, ValidData, _ContiguousTop, Left, Right}] = ets:lookup(FileSummary, File),
- GoRight = fun() ->
- case Right of
- undefined -> State;
- _ when not(CurName =:= Right) ->
- [{Right, RightValidData, _RightContiguousTop, File, _Right}]
- = ets:lookup(FileSummary, Right),
- if FileSizeLimit >= (ValidData + RightValidData) ->
- combineFiles(Right, File, State);
- true -> State
- end;
- _ -> State
- end
- end,
- case Left of
- undefined ->
- GoRight();
- _ -> [{Left, LeftValidData, _LeftContiguousTop, _Left, File}]
- = ets:lookup(FileSummary, Left),
- if FileSizeLimit >= (ValidData + LeftValidData) ->
- combineFiles(File, Left, State);
- true ->
- GoRight()
- end
+ % DELIBERATE BADMATCH. This code is not ready yet
+ ko = io:format("uh oh~n", []),
+ % the file we're looking at may no longer exist as it may have been deleted
+ % within the current GC run
+ case ets:lookup(FileSummary, File) of
+ [] -> State;
+ [FileObj = {File, ValidData, _ContiguousTop, Left, Right}] ->
+ GoRight = fun() ->
+ case Right of
+ undefined -> State;
+ _ when not(CurName =:= Right) ->
+ [RightObj = {Right, RightValidData, _RightContiguousTop, File, RightRight}]
+ = ets:lookup(FileSummary, Right),
+ RightSumData = ValidData + RightValidData,
+ if FileSizeLimit >= RightSumData ->
+ % here, Right will be the source and so will be deleted,
+ % File will be the destination
+ State1 = combineFiles(RightObj, FileObj, State),
+ % this could fail if RightRight is undefined
+ ets:update_element(FileSummary, RightRight, {4, File}), % left is the 4th field
+ true = ets:insert(FileSummary, {File, RightSumData, RightSumData, Left, RightRight}),
+ true = ets:delete(FileSummary, Right),
+ State1;
+ true -> State
+ end;
+ _ -> State
+ end
+ end,
+ case Left of
+ undefined ->
+ GoRight();
+ _ -> [LeftObj = {Left, LeftValidData, _LeftContiguousTop, LeftLeft, File}]
+ = ets:lookup(FileSummary, Left),
+ LeftSumData = ValidData + LeftValidData,
+ if FileSizeLimit >= LeftSumData ->
+ % here, File will be the source and so will be deleted,
+ % Left will be the destination
+ State1 = combineFiles(FileObj, LeftObj, State),
+ % this could fail if Right is undefined
+ ets:update_element(FileSummary, Right, {4, Left}), % left is the 4th field
+ true = ets:insert(FileSummary, {Left, LeftSumData, LeftSumData, LeftLeft, Right}),
+ true = ets:delete(FileSummary, File),
+ State1;
+ true ->
+ GoRight()
+ end
+ end
end.
-combineFiles(Source, Destination, State) ->
+combineFiles({Source, SourceValid, _SourceContiguousTop, _SourceLeft, _SourceRight},
+ {Destination, DestinationValid, DestinationContiguousTop, _DestinationLeft, _DestinationRight},
+ State1) ->
+ (State = #dqstate { file_detail = FileDetail }) = closeFile(Source, closeFile(Destination, State1)),
+ {ok, SourceHdl} = file:open(form_filename(Source), [read, write, raw, binary, delayed_write, read_ahead]),
+ {ok, DestinationHdl} = file:open(form_filename(Destination), [read, write, raw, binary, delayed_write, read_ahead]),
+ ExpectedSize = SourceValid + DestinationValid,
+ % if DestinationValid =:= DestinationContiguousTop then we don't need a tmp file
+ % if they're not equal, then we need to write out everything past the DestinationContiguousTop to a tmp file
+ % then truncate, copy back in, and then copy over from Source
+ % otherwise we just truncate straight away and copy over from Source
+ if DestinationContiguousTop =:= DestinationValid ->
+ {ok, DestinationValid} = file:position(DestinationHdl, {bof, DestinationValid}),
+ ok = file:truncate(DestinationHdl),
+ {ok, ExpectedSize} = file:position(DestinationHdl, {cur, SourceValid}),
+ ok = file:truncate(DestinationHdl),
+ {ok, DestinationValid} = file:position(DestinationHdl, {bof, DestinationValid});
+ true ->
+ Tmp = filename:rootname(Destination) ++ ?FILE_EXTENSION_TMP,
+ {ok, TmpHdl} = file:open(form_filename(Tmp), [read, write, raw, binary, delayed_write, read_ahead]),
+ Worklist = lists:filter(fun ({{Destination2, Offset}, _TotalSize, _MsgId}) when Destination2 =:= Destination ->
+ Offset > DestinationContiguousTop
+ end, ets:match_object(FileDetail, {{Destination, '_'}, '_', '_'})),
+ % RevMapping :: [{TmpOffset, DestinationOrigOffset, TotalSize}]
+ {TmpSize, RevMapping} =
+ lists:foldl(fun ({{Destination2, Offset}, TotalSize}, {CurOffset, Acc}) when Destination2 =:= Destination ->
+ {ok, Offset} = file:position(DestinationHdl, {bof, Offset}),
+ Size = TotalSize + ?FILE_PACKING_ADJUSTMENT,
+ {ok, Size} = file:copy(DestinationHdl, TmpHdl, Size),
+ {CurOffset + Size, [{CurOffset, Offset, TotalSize}|Acc]}
+ end, {0, []}, Worklist),
+ % so now Tmp contains everything we need to salvage from Destination,
+ % so truncate Destination and copy from Tmp back to the end
+ {ok, 0} = file:position(TmpHdl, {bof, 0}),
+ {ok, DestinationContiguousTop} = file:position(DestinationHdl, {bof, DestinationContiguousTop}),
+ ok = file:truncate(DestinationHdl),
+ {ok, ExpectedSize} = file:position(DestinationHdl, {cur, SourceValid}),
+ ok = file:truncate(DestinationHdl),
+ {ok, DestinationContiguousTop} = file:position(DestinationHdl, {bof, DestinationContiguousTop}),
+ {ok, TmpSize} = file:copy(TmpHdl, DestinationHdl, TmpSize),
+ ok = file:sync(DestinationHdl),
+ ok = file:close(TmpHdl),
+ ok = file:delete(form_filename(Tmp))
+ end,
+ SourceWorkList = ets:match_object(FileDetail, {{Source, '_'}, '_', '_'}),
+ {ExpectedSize, RevMapping2} =
+ lists:foldl(fun ({{Source2, Offset}, TotalSize, _MsgId}, {CurOffset, Acc}) when Source2 =:= Source ->
+ {ok, Offset} = file:position(SourceHdl, {bof, Offset}),
+ Size = TotalSize + ?FILE_PACKING_ADJUSTMENT,
+ {ok, Size} = file:copy(SourceHdl, DestinationHdl, Size),
+ {CurOffset + Size, [{CurOffset, Offset, TotalSize}|Acc]}
+ end, {DestinationContiguousTop, []}, SourceWorkList),
+ ok = file:sync(DestinationHdl),
+ ok = file:close(SourceHdl),
+ ok = file:close(DestinationHdl),
+ ok = file:delete(form_filename(Source)),
State.
-
+
+closeFile(File, State = #dqstate { read_file_handles = {ReadHdls, ReadHdlsAge} }) ->
+ case dict:find(File, ReadHdls) of
+ error ->
+ State;
+ {ok, {Hdl, Then}} ->
+ ok = file:close(Hdl),
+ State #dqstate { read_file_handles = { dict:erase(File, ReadHdls), gb_trees:delete(Then, ReadHdlsAge) } }
+ end.
delete_empty_files(File, Acc, #dqstate { file_summary = FileSummary }) ->
[{File, ValidData, _ContiguousTop, Left, Right}] = ets:lookup(FileSummary, File),
@@ -430,7 +517,7 @@ load_messages(undefined, [], State = #dqstate { file_summary = FileSummary,
State;
load_messages(Left, [], State = #dqstate { file_detail = FileDetail }) ->
Num = list_to_integer(filename:rootname(Left)),
- Offset = case ets:match_object(FileDetail, {{Left, '_'}, '_'}) of
+ Offset = case ets:match_object(FileDetail, {{Left, '_'}, '_', '_'}) of
[] -> 0;
L -> {{Left, Offset1}, TotalSize} = lists:last(L),
Offset1 + TotalSize + ?FILE_PACKING_ADJUSTMENT
@@ -452,7 +539,7 @@ load_messages(Left, [File|Files],
0 -> {VMAcc, VTSAcc};
RefCount ->
true = ets:insert_new(MsgLocation, {MsgId, RefCount, File, Offset, TotalSize}),
- true = ets:insert_new(FileDetail, {{File, Offset}, TotalSize}),
+ true = ets:insert_new(FileDetail, {{File, Offset}, TotalSize, MsgId}),
{[{MsgId, TotalSize, Offset}|VMAcc],
VTSAcc + TotalSize + ?FILE_PACKING_ADJUSTMENT
}