diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-06-04 15:14:18 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-06-04 15:14:18 +0100 |
| commit | c384ec140a7c32631f2df37d39d6ad1bf50dc020 (patch) | |
| tree | 625b5e89a3d6929c52096920d150da9a9f7c5e46 /src | |
| parent | 4ff65bec96c96a3a7b2fa5a13f0902a714c31e4a (diff) | |
| download | rabbitmq-server-git-c384ec140a7c32631f2df37d39d6ad1bf50dc020.tar.gz | |
sorted out the disk_queue tests which had been left behind with the last set of API changes. By fixing them, discovered a bug in the disk queue. Also made the tests a little more rigorous, and discovered a the rdq_stress_gc test was not doing anything like what I'd wanted. Fixed.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_disk_queue.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 119 |
2 files changed, 64 insertions, 57 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 8fb5b90505..da25e52495 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -1247,7 +1247,7 @@ extract_sequence_numbers(State = #dqstate { sequences = Sequences }) -> case ets:lookup(Sequences, Q) of [] -> true = ets:insert_new(Sequences, - {Q, SeqId, NextWrite}); + {Q, SeqId, NextWrite, -1}); [Orig = {Q, Read, Write, Length}] -> Repl = {Q, lists:min([Read, SeqId]), %% Length is wrong here, but diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 597b0a76e6..46c641fc93 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -689,6 +689,7 @@ delete_log_handlers(Handlers) -> ok. test_disk_queue() -> + rdq_stop(), % unicode chars are supported properly from r13 onwards io:format("Msg Count\t| Msg Size\t| Queue Count\t| Startup mu s\t| Publish mu s\t| Pub mu s/msg\t| Pub mu s/byte\t| Deliver mu s\t| Del mu s/msg\t| Del mu s/byte~n", []), [begin rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSize), @@ -707,7 +708,6 @@ test_disk_queue() -> rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) -> Startup = rdq_virgin(), rdq_start(), - rabbit_disk_queue:to_ram_disk_mode(), QCount = length(Qs), Msg = <<0:(8*MsgSizeBytes)>>, List = lists:seq(1, MsgCount), @@ -721,9 +721,12 @@ rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) -> {Deliver, ok} = timer:tc(?MODULE, rdq_time_commands, [[fun() -> [begin SeqIds = - [begin {N, Msg, MsgSizeBytes, false, SeqId} = - rabbit_disk_queue:deliver(Q), SeqId end - || N <- List], + [begin + Remaining = MsgCount - N, + {N, Msg, MsgSizeBytes, false, SeqId, Remaining} = + rabbit_disk_queue:deliver(Q), + SeqId + end || N <- List], ok = rabbit_disk_queue:tx_commit(Q, [], SeqIds) end || Q <- Qs] end]]), @@ -747,53 +750,36 @@ rdq_stress_gc(MsgCount) -> rabbit_disk_queue:tx_commit(q, List, []), StartChunk = round(MsgCount / 20), % 5% AckList = - lists:reverse( - lists:foldl( - fun (E, Acc) -> - case Acc of - [] -> [E]; - [F|_Fs] -> - case E rem F of - 0 -> Acc; - _ -> [E|Acc] - end - end - end, [], lists:flatten([lists:seq(N,MsgCount,N) - || N <- lists:seq(StartChunk,MsgCount)]))) ++ - lists:seq(1, (StartChunk - 1)), + lists:foldl( + fun (E, Acc) -> + case lists:member(E, Acc) of + true -> Acc; + false -> [E|Acc] + end + end, [], lists:flatten( + lists:reverse( + [ lists:seq(N, MsgCount, N) + || N <- lists:seq(1, round(MsgCount / 2), 1) + ]))), + {Start, End} = lists:split(StartChunk, AckList), + AckList2 = End ++ Start, MsgIdToSeqDict = lists:foldl( - fun (_, Acc) -> - {MsgId, Msg, MsgSizeBytes, false, SeqId} = + fun (MsgId, Acc) -> + Remaining = MsgCount - MsgId, + {MsgId, Msg, MsgSizeBytes, false, SeqId, Remaining} = rabbit_disk_queue:deliver(q), dict:store(MsgId, SeqId, Acc) end, dict:new(), List), %% we really do want to ack each of this individually [begin {ok, SeqId} = dict:find(MsgId, MsgIdToSeqDict), - rabbit_disk_queue:ack(q, [SeqId]) end - || MsgId <- AckList], + rabbit_disk_queue:ack(q, [SeqId]) + end || MsgId <- AckList2], rabbit_disk_queue:tx_commit(q, [], []), + empty = rabbit_disk_queue:deliver(q), rdq_stop(), passed. -rdq_time_insane_startup() -> - rdq_virgin(), - OneGig = 1024*1024*1024, - rabbit_disk_queue:start_link(OneGig), - rabbit_disk_queue:to_ram_disk_mode(), - Msg = <<>>, - Count = 100000, - List = lists:seq(1, Count), - %% 1M empty messages, at say, 100B per message, should all fit - %% within 1GB and thus in a single file - io:format("Publishing ~p empty messages...~n",[Count]), - [rabbit_disk_queue:tx_publish(N, Msg) || N <- List], - rabbit_disk_queue:tx_commit(q, List, []), - io:format("...done. Timing restart...~n", []), - rdq_stop(), - Micros = rdq_virgin(), - io:format("...startup took ~w microseconds.~n", [Micros]). - rdq_test_startup_with_queue_gaps() -> rdq_virgin(), rdq_start(), @@ -805,8 +791,10 @@ rdq_test_startup_with_queue_gaps() -> rabbit_disk_queue:tx_commit(q, All, []), io:format("Publish done~n", []), %% deliver first half - Seqs = [begin {N, Msg, 256, false, SeqId} = rabbit_disk_queue:deliver(q), SeqId end - || N <- lists:seq(1,Half)], + Seqs = [begin + Remaining = Total - N, + {N, Msg, 256, false, SeqId, Remaining} = rabbit_disk_queue:deliver(q), SeqId + end || N <- lists:seq(1,Half)], io:format("Deliver first half done~n", []), %% ack every other message we have delivered (starting at the _first_) lists:foldl(fun (SeqId2, true) -> @@ -821,13 +809,19 @@ rdq_test_startup_with_queue_gaps() -> rdq_start(), io:format("Startup (with shuffle) done~n", []), %% should have shuffled up. So we should now get lists:seq(2,500,2) already delivered - Seqs2 = [begin {N, Msg, 256, true, SeqId} = rabbit_disk_queue:deliver(q), SeqId end - || N <- lists:seq(2,Half,2)], + Seqs2 = [begin + Remaining = round(Total - ((Half + N)/2)), + {N, Msg, 256, true, SeqId, Remaining} = rabbit_disk_queue:deliver(q), + SeqId + end || N <- lists:seq(2,Half,2)], rabbit_disk_queue:tx_commit(q, [], Seqs2), io:format("Reread non-acked messages done~n", []), %% and now fetch the rest - Seqs3 = [begin {N, Msg, 256, false, SeqId} = rabbit_disk_queue:deliver(q), SeqId end - || N <- lists:seq(1 + Half,Total)], + Seqs3 = [begin + Remaining = Total - N, + {N, Msg, 256, false, SeqId, Remaining} = rabbit_disk_queue:deliver(q), + SeqId + end || N <- lists:seq(1 + Half,Total)], rabbit_disk_queue:tx_commit(q, [], Seqs3), io:format("Read second half done~n", []), empty = rabbit_disk_queue:deliver(q), @@ -845,8 +839,11 @@ rdq_test_redeliver() -> rabbit_disk_queue:tx_commit(q, All, []), io:format("Publish done~n", []), %% deliver first half - Seqs = [begin {N, Msg, 256, false, SeqId} = rabbit_disk_queue:deliver(q), SeqId end - || N <- lists:seq(1,Half)], + Seqs = [begin + Remaining = Total - N, + {N, Msg, 256, false, SeqId, Remaining} = rabbit_disk_queue:deliver(q), + SeqId + end || N <- lists:seq(1,Half)], io:format("Deliver first half done~n", []), %% now requeue every other message (starting at the _first_) %% and ack the other ones @@ -860,11 +857,17 @@ rdq_test_redeliver() -> rabbit_disk_queue:tx_commit(q, [], []), io:format("Redeliver and acking done~n", []), %% we should now get the 2nd half in order, followed by every-other-from-the-first-half - Seqs2 = [begin {N, Msg, 256, false, SeqId} = rabbit_disk_queue:deliver(q), SeqId end - || N <- lists:seq(1+Half, Total)], + Seqs2 = [begin + Remaining = round(Total - N + (Half/2)), + {N, Msg, 256, false, SeqId, Remaining} = rabbit_disk_queue:deliver(q), + SeqId + end || N <- lists:seq(1+Half, Total)], rabbit_disk_queue:tx_commit(q, [], Seqs2), - Seqs3 = [begin {N, Msg, 256, true, SeqId} = rabbit_disk_queue:deliver(q), SeqId end - || N <- lists:seq(1, Half, 2)], + Seqs3 = [begin + Remaining = round((Half - N) / 2) - 1, + {N, Msg, 256, true, SeqId, Remaining} = rabbit_disk_queue:deliver(q), + SeqId + end || N <- lists:seq(1, Half, 2)], rabbit_disk_queue:tx_commit(q, [], Seqs3), empty = rabbit_disk_queue:deliver(q), rdq_stop(), @@ -881,8 +884,11 @@ rdq_test_purge() -> rabbit_disk_queue:tx_commit(q, All, []), io:format("Publish done~n", []), %% deliver first half - Seqs = [begin {N, Msg, 256, false, SeqId} = rabbit_disk_queue:deliver(q), SeqId end - || N <- lists:seq(1,Half)], + Seqs = [begin + Remaining = Total - N, + {N, Msg, 256, false, SeqId, Remaining} = rabbit_disk_queue:deliver(q), + SeqId + end || N <- lists:seq(1,Half)], io:format("Deliver first half done~n", []), rabbit_disk_queue:purge(q), io:format("Purge done~n", []), @@ -897,13 +903,14 @@ rdq_time_commands(Funcs) -> rdq_virgin() -> {Micros, {ok, _}} = - timer:tc(rabbit_disk_queue, start_link, [1024*1024]), + timer:tc(rabbit_disk_queue, start_link, []), ok = rabbit_disk_queue:stop_and_obliterate(), timer:sleep(1000), Micros. rdq_start() -> - {ok, _} = rabbit_disk_queue:start_link(1024*1024). + {ok, _} = rabbit_disk_queue:start_link(), + rabbit_disk_queue:to_ram_disk_mode(). rdq_stop() -> rabbit_disk_queue:stop(), |
