summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-04-14 12:31:59 +0100
committerMatthew Sackman <matthew@lshift.net>2009-04-14 12:31:59 +0100
commit9b399fc817b9ef8c5992e3b4ca495e19e04df04c (patch)
tree29b67c36216dc44e598aef3c89a8dae323436b46 /src
parent4c8ba43d6dc57b7f4574691648a3c587662bfe1d (diff)
downloadrabbitmq-server-git-9b399fc817b9ef8c5992e3b4ca495e19e04df04c.tar.gz
well, the GC is _written_ now. Not tested yet...
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_disk_queue.erl42
1 files changed, 29 insertions, 13 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index f70f31b472..22406b0b0c 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -364,8 +364,6 @@ combineFile(File, State = #dqstate { file_size_limit = FileSizeLimit,
file_summary = FileSummary,
current_file_name = CurName
}) ->
- % DELIBERATE BADMATCH. This code is not ready yet
- ko = io:format("uh oh~n", []),
% the file we're looking at may no longer exist as it may have been deleted
% within the current GC run
case ets:lookup(FileSummary, File) of
@@ -416,7 +414,8 @@ combineFile(File, State = #dqstate { file_size_limit = FileSizeLimit,
combineFiles({Source, SourceValid, _SourceContiguousTop, _SourceLeft, _SourceRight},
{Destination, DestinationValid, DestinationContiguousTop, _DestinationLeft, _DestinationRight},
State1) ->
- (State = #dqstate { file_detail = FileDetail }) = closeFile(Source, closeFile(Destination, State1)),
+ (State = #dqstate { file_detail = FileDetail, msg_location = MsgLocation })
+ = closeFile(Source, closeFile(Destination, State1)),
{ok, SourceHdl} = file:open(form_filename(Source), [read, write, raw, binary, delayed_write, read_ahead]),
{ok, DestinationHdl} = file:open(form_filename(Destination), [read, write, raw, binary, delayed_write, read_ahead]),
ExpectedSize = SourceValid + DestinationValid,
@@ -433,18 +432,29 @@ combineFiles({Source, SourceValid, _SourceContiguousTop, _SourceLeft, _SourceRig
true ->
Tmp = filename:rootname(Destination) ++ ?FILE_EXTENSION_TMP,
{ok, TmpHdl} = file:open(form_filename(Tmp), [read, write, raw, binary, delayed_write, read_ahead]),
- Worklist = lists:filter(fun ({{Destination2, Offset}, _TotalSize, _MsgId}) when Destination2 =:= Destination ->
+ % as FileDetail is an ordered_set, we should have the lowest offsets first
+ Worklist = lists:filter(fun ({{Destination2, Offset}, _TotalSize, _MsgId})
+ when Destination2 =:= Destination, Offset /= DestinationContiguousTop ->
+ % it cannot be that Offset == DestinationContiguousTop
+ % because if it was then DestinationContiguousTop would have been
+ % extended by TotalSize
Offset > DestinationContiguousTop
end, ets:match_object(FileDetail, {{Destination, '_'}, '_', '_'})),
- % RevMapping :: [{TmpOffset, DestinationOrigOffset, TotalSize}]
- {TmpSize, RevMapping} =
- lists:foldl(fun ({{Destination2, Offset}, TotalSize}, {CurOffset, Acc}) when Destination2 =:= Destination ->
+ TmpSize =
+ lists:foldl(fun ({{Destination2, Offset}, TotalSize, MsgId}, CurOffset) when Destination2 =:= Destination ->
{ok, Offset} = file:position(DestinationHdl, {bof, Offset}),
Size = TotalSize + ?FILE_PACKING_ADJUSTMENT,
{ok, Size} = file:copy(DestinationHdl, TmpHdl, Size),
- {CurOffset + Size, [{CurOffset, Offset, TotalSize}|Acc]}
- end, {0, []}, Worklist),
+ % this message is going to end up back in Destination, at DestinationContiguousTop + CurOffset
+ FinalOffset = DestinationContiguousTop + CurOffset,
+ true = ets:update_element(MsgLocation, MsgId, {4, FinalOffset}),
+ % sadly you can't use update_element to change the key:
+ true = ets:delete(FileDetail, {Destination, Offset}),
+ true = ets:insert_new(FileDetail, {{Destination, FinalOffset}, TotalSize, MsgId}),
+ CurOffset + Size
+ end, 0, Worklist),
% so now Tmp contains everything we need to salvage from Destination,
+ % and both FileDetail and MsgLocation have been updated to reflect compaction of Destination
% so truncate Destination and copy from Tmp back to the end
{ok, 0} = file:position(TmpHdl, {bof, 0}),
{ok, DestinationContiguousTop} = file:position(DestinationHdl, {bof, DestinationContiguousTop}),
@@ -453,18 +463,24 @@ combineFiles({Source, SourceValid, _SourceContiguousTop, _SourceLeft, _SourceRig
ok = file:truncate(DestinationHdl),
{ok, DestinationContiguousTop} = file:position(DestinationHdl, {bof, DestinationContiguousTop}),
{ok, TmpSize} = file:copy(TmpHdl, DestinationHdl, TmpSize),
+ % position in DestinationHdl should now be DestinationValid
ok = file:sync(DestinationHdl),
ok = file:close(TmpHdl),
ok = file:delete(form_filename(Tmp))
end,
SourceWorkList = ets:match_object(FileDetail, {{Source, '_'}, '_', '_'}),
- {ExpectedSize, RevMapping2} =
- lists:foldl(fun ({{Source2, Offset}, TotalSize, _MsgId}, {CurOffset, Acc}) when Source2 =:= Source ->
+ ExpectedSize =
+ lists:foldl(fun ({{Source2, Offset}, TotalSize, MsgId}, CurOffset) when Source2 =:= Source ->
{ok, Offset} = file:position(SourceHdl, {bof, Offset}),
Size = TotalSize + ?FILE_PACKING_ADJUSTMENT,
{ok, Size} = file:copy(SourceHdl, DestinationHdl, Size),
- {CurOffset + Size, [{CurOffset, Offset, TotalSize}|Acc]}
- end, {DestinationContiguousTop, []}, SourceWorkList),
+ % update MsgLocation to reflect change of file (3rd field) and offset (4th field)
+ true = ets:update_element(MsgLocation, MsgId, [{3, Destination}, {4, CurOffset}]),
+ % can't use update_element to change key:
+ true = ets:delete(FileDetail, {Source, Offset}),
+ true = ets:insert_new(FileDetail, {{Destination, CurOffset}, TotalSize, MsgId}),
+ CurOffset + Size
+ end, DestinationValid, SourceWorkList),
ok = file:sync(DestinationHdl),
ok = file:close(SourceHdl),
ok = file:close(DestinationHdl),