summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-04-14 16:32:58 +0100
committerMatthew Sackman <matthew@lshift.net>2009-04-14 16:32:58 +0100
commitc9c41fc670c97a57bc078374c285f5efdb08b7cd (patch)
treedb18b02126cf5424b3e28382eb2186c8079a7fe1 /src
parentd30830b7be0a2f6e1141beac06c934c05e2c4789 (diff)
downloadrabbitmq-server-git-c9c41fc670c97a57bc078374c285f5efdb08b7cd.tar.gz
Bugs fixed, and tests written. GC works. rabbit_disk_queue is functionally complete. However, Matthias fortunately changed its requirements this morning, so there are now several changes to make.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_disk_queue.erl64
-rw-r--r--src/rabbit_tests.erl29
2 files changed, 78 insertions(+), 15 deletions(-)
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 94a444e2a1..4cd146edd9 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -285,7 +285,7 @@ internal_tx_publish(MsgId, MsgBody, State = #dqstate { msg_location = MsgLocatio
true = ets:insert_new(FileDetail, {{CurName, CurOffset}, TotalSize, MsgId}),
ValidTotalSize1 = ValidTotalSize + TotalSize + ?FILE_PACKING_ADJUSTMENT,
ContiguousTop1 = if CurOffset =:= ContiguousTop ->
- ValidTotalSize; % can't be any holes in this file
+ ValidTotalSize1; % can't be any holes in this file
true -> ContiguousTop
end,
true = ets:insert(FileSummary, {CurName, ValidTotalSize1, ContiguousTop1, Left, undefined}),
@@ -440,19 +440,37 @@ combineFiles({Source, SourceValid, _SourceContiguousTop, _SourceLeft, _SourceRig
% extended by TotalSize
Offset > DestinationContiguousTop
end, ets:match_object(FileDetail, {{Destination, '_'}, '_', '_'})),
- TmpSize =
- lists:foldl(fun ({{Destination2, Offset}, TotalSize, MsgId}, CurOffset) when Destination2 =:= Destination ->
- {ok, Offset} = file:position(DestinationHdl, {bof, Offset}),
+ TmpSize = DestinationValid - DestinationContiguousTop,
+ {TmpSize, BlockStart1, BlockEnd1} =
+ lists:foldl(fun ({{Destination2, Offset}, TotalSize, MsgId}, {CurOffset, BlockStart, BlockEnd}) when Destination2 =:= Destination ->
+ % CurOffset is in the TmpFile.
+ % Offset, BlockStart and BlockEnd are in the DestinationFile (which is currently the source!)
Size = TotalSize + ?FILE_PACKING_ADJUSTMENT,
- {ok, Size} = file:copy(DestinationHdl, TmpHdl, Size),
% this message is going to end up back in Destination, at DestinationContiguousTop + CurOffset
FinalOffset = DestinationContiguousTop + CurOffset,
true = ets:update_element(MsgLocation, MsgId, {4, FinalOffset}),
% sadly you can't use update_element to change the key:
true = ets:delete(FileDetail, {Destination, Offset}),
true = ets:insert_new(FileDetail, {{Destination, FinalOffset}, TotalSize, MsgId}),
- CurOffset + Size
- end, 0, Worklist),
+ NextOffset = CurOffset + Size,
+ if BlockStart =:= undefined ->
+ % base case, called only for the first list elem
+ {NextOffset, Offset, Offset + Size};
+ Offset =:= BlockEnd ->
+ % extend the current block because the next msg follows straight on
+ {NextOffset, BlockStart, BlockEnd + Size};
+ true ->
+ % found a gap, so actually do the work for the previous block
+ BSize = BlockEnd - BlockStart,
+ {ok, BlockStart} = file:position(DestinationHdl, {bof, BlockStart}),
+ {ok, BSize} = file:copy(DestinationHdl, TmpHdl, BSize),
+ {NextOffset, Offset, Offset + Size}
+ end
+ end, {0, undefined, undefined}, Worklist),
+ % do the last remaining block
+ BSize1 = BlockEnd1 - BlockStart1,
+ {ok, BlockStart1} = file:position(DestinationHdl, {bof, BlockStart1}),
+ {ok, BSize1} = file:copy(DestinationHdl, TmpHdl, BSize1),
% so now Tmp contains everything we need to salvage from Destination,
% and both FileDetail and MsgLocation have been updated to reflect compaction of Destination
% so truncate Destination and copy from Tmp back to the end
@@ -469,18 +487,36 @@ combineFiles({Source, SourceValid, _SourceContiguousTop, _SourceLeft, _SourceRig
ok = file:delete(form_filename(Tmp))
end,
SourceWorkList = ets:match_object(FileDetail, {{Source, '_'}, '_', '_'}),
- ExpectedSize =
- lists:foldl(fun ({{Source2, Offset}, TotalSize, MsgId}, CurOffset) when Source2 =:= Source ->
- {ok, Offset} = file:position(SourceHdl, {bof, Offset}),
+ {ExpectedSize, BlockStart2, BlockEnd2} =
+ lists:foldl(fun ({{Source2, Offset}, TotalSize, MsgId}, {CurOffset, BlockStart, BlockEnd}) when Source2 =:= Source ->
+ % CurOffset is in the DestinationFile.
+ % Offset, BlockStart and BlockEnd are in the SourceFile
Size = TotalSize + ?FILE_PACKING_ADJUSTMENT,
- {ok, Size} = file:copy(SourceHdl, DestinationHdl, Size),
% update MsgLocation to reflect change of file (3rd field) and offset (4th field)
true = ets:update_element(MsgLocation, MsgId, [{3, Destination}, {4, CurOffset}]),
% can't use update_element to change key:
true = ets:delete(FileDetail, {Source, Offset}),
true = ets:insert_new(FileDetail, {{Destination, CurOffset}, TotalSize, MsgId}),
- CurOffset + Size
- end, DestinationValid, SourceWorkList),
+ NextOffset = CurOffset + Size,
+ if BlockStart =:= undefined ->
+ % base case, called only for the first list elem
+ {NextOffset, Offset, Offset + Size};
+ Offset =:= BlockEnd ->
+ % extend the current block because the next msg follows straight on
+ {NextOffset, BlockStart, BlockEnd + Size};
+ true ->
+ % found a gap, so actually do the work for the previous block
+ BSize = BlockEnd - BlockStart,
+ {ok, BlockStart} = file:position(SourceHdl, {bof, BlockStart}),
+ {ok, BSize} = file:copy(SourceHdl, DestinationHdl, BSize),
+ {NextOffset, Offset, Offset + Size}
+ end
+ end, {DestinationValid, undefined, undefined}, SourceWorkList),
+ % do the last remaining block
+ BSize2 = BlockEnd2 - BlockStart2,
+ {ok, BlockStart2} = file:position(SourceHdl, {bof, BlockStart2}),
+ {ok, BSize2} = file:copy(SourceHdl, DestinationHdl, BSize2),
+ % tidy up
ok = file:sync(DestinationHdl),
ok = file:close(SourceHdl),
ok = file:close(DestinationHdl),
@@ -535,7 +571,7 @@ load_messages(Left, [], State = #dqstate { file_detail = FileDetail }) ->
Num = list_to_integer(filename:rootname(Left)),
Offset = case ets:match_object(FileDetail, {{Left, '_'}, '_', '_'}) of
[] -> 0;
- L -> {{Left, Offset1}, TotalSize} = lists:last(L),
+ L -> {{Left, Offset1}, TotalSize, _MsgId} = lists:last(L),
Offset1 + TotalSize + ?FILE_PACKING_ADJUSTMENT
end,
State # dqstate { current_file_num = Num, current_file_name = Left,
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 89c575bfbd..491950d212 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -632,6 +632,8 @@ test_disk_queue() ->
MsgCount <- [1024, 4096, 16384]
],
rdq_virgin(),
+ rdq_stress_gc(100),
+ rdq_stress_gc(1000),
passed.
rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) ->
@@ -647,7 +649,7 @@ rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) ->
io:format("Published ~p ~p-byte messages in ~p microseconds to ~p queues (~p microseconds/msg) (~p microseconds/byte)~n",
[MsgCount, MsgSizeBytes, Micros, QCount, (Micros / (MsgCount * QCount)), (Micros / (MsgCount * QCount * MsgSizeBytes))]),
{Micros2, ok} = timer:tc(?MODULE, rdq_time_commands,
- [[fun() -> [begin [begin rabbit_disk_queue:deliver(Q, N), ok end || N <- List],
+ [[fun() -> [begin [begin {Msg, MsgSizeBytes, false} = rabbit_disk_queue:deliver(Q, N), ok end || N <- List],
rabbit_disk_queue:ack(Q, List),
rabbit_disk_queue:tx_commit(Q, [])
end || Q <- Qs]
@@ -655,6 +657,31 @@ rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) ->
io:format("Delivered ~p ~p-byte messages in ~p microseconds from ~p queues (~p microseconds/msg) (~p microseconds/byte)~n", [MsgCount, MsgSizeBytes, Micros2, QCount, (Micros2 / (MsgCount * QCount)), (Micros2 / (MsgCount * QCount * MsgSizeBytes))]),
rdq_stop().
+% we know each file is going to be 1024*1024*10 bytes in size (10MB), so make sure we have
+% several files, and then keep punching holes in a reasonably sensible way.
+rdq_stress_gc(MsgCount) ->
+ rdq_virgin(),
+ rdq_start(),
+ MsgSizeBytes = 1024*1024,
+ Msg = <<0:(8*MsgSizeBytes)>>, % 1MB
+ List = lists:seq(1, MsgCount),
+ [rabbit_disk_queue:tx_publish(N, Msg) || N <- List],
+ rabbit_disk_queue:tx_commit(q, List),
+ % this list generation is _very_ slow, as it's O(N^2)
+ AckList =
+ lists:reverse(lists:foldl(fun (E, Acc) -> case lists:member(E, Acc) of
+ true ->
+ Acc;
+ _False -> [E|Acc]
+ end
+ end, [], lists:flatten([lists:seq(N,MsgCount,N) || N <- lists:seq(4,MsgCount)])))
+ ++ lists:seq(1, 3),
+ [begin {Msg, MsgSizeBytes, false} = rabbit_disk_queue:deliver(q, N),
+ rabbit_disk_queue:ack(q, [N]),
+ rabbit_disk_queue:tx_commit(q, [])
+ end || N <- AckList],
+ rdq_stop().
+
rdq_time_commands(Funcs) ->
lists:foreach(fun (F) -> F() end, Funcs).