diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-04-14 16:32:58 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-04-14 16:32:58 +0100 |
| commit | c9c41fc670c97a57bc078374c285f5efdb08b7cd (patch) | |
| tree | db18b02126cf5424b3e28382eb2186c8079a7fe1 /src | |
| parent | d30830b7be0a2f6e1141beac06c934c05e2c4789 (diff) | |
| download | rabbitmq-server-git-c9c41fc670c97a57bc078374c285f5efdb08b7cd.tar.gz | |
Bugs fixed, and tests written. GC works. rabbit_disk_queue is functionally complete. However, fortunately, Matthias this morning changed its requirements so there are now several changes to make.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_disk_queue.erl | 64 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 29 |
2 files changed, 78 insertions, 15 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 94a444e2a1..4cd146edd9 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -285,7 +285,7 @@ internal_tx_publish(MsgId, MsgBody, State = #dqstate { msg_location = MsgLocatio true = ets:insert_new(FileDetail, {{CurName, CurOffset}, TotalSize, MsgId}), ValidTotalSize1 = ValidTotalSize + TotalSize + ?FILE_PACKING_ADJUSTMENT, ContiguousTop1 = if CurOffset =:= ContiguousTop -> - ValidTotalSize; % can't be any holes in this file + ValidTotalSize1; % can't be any holes in this file true -> ContiguousTop end, true = ets:insert(FileSummary, {CurName, ValidTotalSize1, ContiguousTop1, Left, undefined}), @@ -440,19 +440,37 @@ combineFiles({Source, SourceValid, _SourceContiguousTop, _SourceLeft, _SourceRig % extended by TotalSize Offset > DestinationContiguousTop end, ets:match_object(FileDetail, {{Destination, '_'}, '_', '_'})), - TmpSize = - lists:foldl(fun ({{Destination2, Offset}, TotalSize, MsgId}, CurOffset) when Destination2 =:= Destination -> - {ok, Offset} = file:position(DestinationHdl, {bof, Offset}), + TmpSize = DestinationValid - DestinationContiguousTop, + {TmpSize, BlockStart1, BlockEnd1} = + lists:foldl(fun ({{Destination2, Offset}, TotalSize, MsgId}, {CurOffset, BlockStart, BlockEnd}) when Destination2 =:= Destination -> + % CurOffset is in the TmpFile. + % Offset, BlockStart and BlockEnd are in the DestinationFile (which is currently the source!) Size = TotalSize + ?FILE_PACKING_ADJUSTMENT, - {ok, Size} = file:copy(DestinationHdl, TmpHdl, Size), % this message is going to end up back in Destination, at DestinationContiguousTop + CurOffset FinalOffset = DestinationContiguousTop + CurOffset, true = ets:update_element(MsgLocation, MsgId, {4, FinalOffset}), % sadly you can't use update_element to change the key: true = ets:delete(FileDetail, {Destination, Offset}), true = ets:insert_new(FileDetail, {{Destination, FinalOffset}, TotalSize, MsgId}), - CurOffset + Size - end, 0, Worklist), + NextOffset = CurOffset + Size, + if BlockStart =:= undefined -> + % base case, called only for the first list elem + {NextOffset, Offset, Offset + Size}; + Offset =:= BlockEnd -> + % extend the current block because the next msg follows straight on + {NextOffset, BlockStart, BlockEnd + Size}; + true -> + % found a gap, so actually do the work for the previous block + BSize = BlockEnd - BlockStart, + {ok, BlockStart} = file:position(DestinationHdl, {bof, BlockStart}), + {ok, BSize} = file:copy(DestinationHdl, TmpHdl, BSize), + {NextOffset, Offset, Offset + Size} + end + end, {0, undefined, undefined}, Worklist), + % do the last remaining block + BSize1 = BlockEnd1 - BlockStart1, + {ok, BlockStart1} = file:position(DestinationHdl, {bof, BlockStart1}), + {ok, BSize1} = file:copy(DestinationHdl, TmpHdl, BSize1), % so now Tmp contains everything we need to salvage from Destination, % and both FileDetail and MsgLocation have been updated to reflect compaction of Destination % so truncate Destination and copy from Tmp back to the end @@ -469,18 +487,36 @@ combineFiles({Source, SourceValid, _SourceContiguousTop, _SourceLeft, _SourceRig ok = file:delete(form_filename(Tmp)) end, SourceWorkList = ets:match_object(FileDetail, {{Source, '_'}, '_', '_'}), - ExpectedSize = - lists:foldl(fun ({{Source2, Offset}, TotalSize, MsgId}, CurOffset) when Source2 =:= Source -> - {ok, Offset} = file:position(SourceHdl, {bof, Offset}), + {ExpectedSize, BlockStart2, BlockEnd2} = + lists:foldl(fun ({{Source2, Offset}, TotalSize, MsgId}, {CurOffset, BlockStart, BlockEnd}) when Source2 =:= Source -> + % CurOffset is in the DestinationFile. + % Offset, BlockStart and BlockEnd are in the SourceFile Size = TotalSize + ?FILE_PACKING_ADJUSTMENT, - {ok, Size} = file:copy(SourceHdl, DestinationHdl, Size), % update MsgLocation to reflect change of file (3rd field) and offset (4th field) true = ets:update_element(MsgLocation, MsgId, [{3, Destination}, {4, CurOffset}]), % can't use update_element to change key: true = ets:delete(FileDetail, {Source, Offset}), true = ets:insert_new(FileDetail, {{Destination, CurOffset}, TotalSize, MsgId}), - CurOffset + Size - end, DestinationValid, SourceWorkList), + NextOffset = CurOffset + Size, + if BlockStart =:= undefined -> + % base case, called only for the first list elem + {NextOffset, Offset, Offset + Size}; + Offset =:= BlockEnd -> + % extend the current block because the next msg follows straight on + {NextOffset, BlockStart, BlockEnd + Size}; + true -> + % found a gap, so actually do the work for the previous block + BSize = BlockEnd - BlockStart, + {ok, BlockStart} = file:position(SourceHdl, {bof, BlockStart}), + {ok, BSize} = file:copy(SourceHdl, DestinationHdl, BSize), + {NextOffset, Offset, Offset + Size} + end + end, {DestinationValid, undefined, undefined}, SourceWorkList), + % do the last remaining block + BSize2 = BlockEnd2 - BlockStart2, + {ok, BlockStart2} = file:position(SourceHdl, {bof, BlockStart2}), + {ok, BSize2} = file:copy(SourceHdl, DestinationHdl, BSize2), + % tidy up ok = file:sync(DestinationHdl), ok = file:close(SourceHdl), ok = file:close(DestinationHdl), @@ -535,7 +571,7 @@ load_messages(Left, [], State = #dqstate { file_detail = FileDetail }) -> Num = list_to_integer(filename:rootname(Left)), Offset = case ets:match_object(FileDetail, {{Left, '_'}, '_', '_'}) of [] -> 0; - L -> {{Left, Offset1}, TotalSize} = lists:last(L), + L -> {{Left, Offset1}, TotalSize, _MsgId} = lists:last(L), Offset1 + TotalSize + ?FILE_PACKING_ADJUSTMENT end, State # dqstate { current_file_num = Num, current_file_name = Left, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 89c575bfbd..491950d212 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -632,6 +632,8 @@ test_disk_queue() -> MsgCount <- [1024, 4096, 16384] ], rdq_virgin(), + rdq_stress_gc(100), + rdq_stress_gc(1000), passed. rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) -> @@ -647,7 +649,7 @@ rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) -> io:format("Published ~p ~p-byte messages in ~p microseconds to ~p queues (~p microseconds/msg) (~p microseconds/byte)~n", [MsgCount, MsgSizeBytes, Micros, QCount, (Micros / (MsgCount * QCount)), (Micros / (MsgCount * QCount * MsgSizeBytes))]), {Micros2, ok} = timer:tc(?MODULE, rdq_time_commands, - [[fun() -> [begin [begin rabbit_disk_queue:deliver(Q, N), ok end || N <- List], + [[fun() -> [begin [begin {Msg, MsgSizeBytes, false} = rabbit_disk_queue:deliver(Q, N), ok end || N <- List], rabbit_disk_queue:ack(Q, List), rabbit_disk_queue:tx_commit(Q, []) end || Q <- Qs] @@ -655,6 +657,31 @@ rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) -> io:format("Delivered ~p ~p-byte messages in ~p microseconds from ~p queues (~p microseconds/msg) (~p microseconds/byte)~n", [MsgCount, MsgSizeBytes, Micros2, QCount, (Micros2 / (MsgCount * QCount)), (Micros2 / (MsgCount * QCount * MsgSizeBytes))]), rdq_stop(). +% we know each file is going to be 1024*1024*10 bytes in size (10MB), so make sure we have +% several files, and then keep punching holes in a reasonably sensible way. +rdq_stress_gc(MsgCount) -> + rdq_virgin(), + rdq_start(), + MsgSizeBytes = 1024*1024, + Msg = <<0:(8*MsgSizeBytes)>>, % 1MB + List = lists:seq(1, MsgCount), + [rabbit_disk_queue:tx_publish(N, Msg) || N <- List], + rabbit_disk_queue:tx_commit(q, List), + % this list generation is _very_ slow, as it's O(N^2) + AckList = + lists:reverse(lists:foldl(fun (E, Acc) -> case lists:member(E, Acc) of + true -> + Acc; + _False -> [E|Acc] + end + end, [], lists:flatten([lists:seq(N,MsgCount,N) || N <- lists:seq(4,MsgCount)]))) + ++ lists:seq(1, 3), + [begin {Msg, MsgSizeBytes, false} = rabbit_disk_queue:deliver(q, N), + rabbit_disk_queue:ack(q, [N]), + rabbit_disk_queue:tx_commit(q, []) + end || N <- AckList], + rdq_stop(). + rdq_time_commands(Funcs) -> lists:foreach(fun (F) -> F() end, Funcs). |
