summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-06-04 15:14:18 +0100
committerMatthew Sackman <matthew@lshift.net>2009-06-04 15:14:18 +0100
commitc384ec140a7c32631f2df37d39d6ad1bf50dc020 (patch)
tree625b5e89a3d6929c52096920d150da9a9f7c5e46 /src
parent4ff65bec96c96a3a7b2fa5a13f0902a714c31e4a (diff)
downloadrabbitmq-server-git-c384ec140a7c32631f2df37d39d6ad1bf50dc020.tar.gz
sorted out the disk_queue tests which had been left behind with the last set of API changes. By fixing them, discovered a bug in the disk queue. Also made the tests a little more rigorous, and discovered a the rdq_stress_gc test was not doing anything like what I'd wanted. Fixed.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_disk_queue.erl2
-rw-r--r--src/rabbit_tests.erl119
2 files changed, 64 insertions, 57 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 8fb5b90505..da25e52495 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -1247,7 +1247,7 @@ extract_sequence_numbers(State = #dqstate { sequences = Sequences }) ->
case ets:lookup(Sequences, Q) of
[] ->
true = ets:insert_new(Sequences,
- {Q, SeqId, NextWrite});
+ {Q, SeqId, NextWrite, -1});
[Orig = {Q, Read, Write, Length}] ->
Repl = {Q, lists:min([Read, SeqId]),
%% Length is wrong here, but
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 597b0a76e6..46c641fc93 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -689,6 +689,7 @@ delete_log_handlers(Handlers) ->
ok.
test_disk_queue() ->
+ rdq_stop(),
% unicode chars are supported properly from r13 onwards
io:format("Msg Count\t| Msg Size\t| Queue Count\t| Startup mu s\t| Publish mu s\t| Pub mu s/msg\t| Pub mu s/byte\t| Deliver mu s\t| Del mu s/msg\t| Del mu s/byte~n", []),
[begin rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSize),
@@ -707,7 +708,6 @@ test_disk_queue() ->
rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) ->
Startup = rdq_virgin(),
rdq_start(),
- rabbit_disk_queue:to_ram_disk_mode(),
QCount = length(Qs),
Msg = <<0:(8*MsgSizeBytes)>>,
List = lists:seq(1, MsgCount),
@@ -721,9 +721,12 @@ rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) ->
{Deliver, ok} =
timer:tc(?MODULE, rdq_time_commands,
[[fun() -> [begin SeqIds =
- [begin {N, Msg, MsgSizeBytes, false, SeqId} =
- rabbit_disk_queue:deliver(Q), SeqId end
- || N <- List],
+ [begin
+ Remaining = MsgCount - N,
+ {N, Msg, MsgSizeBytes, false, SeqId, Remaining} =
+ rabbit_disk_queue:deliver(Q),
+ SeqId
+ end || N <- List],
ok = rabbit_disk_queue:tx_commit(Q, [], SeqIds)
end || Q <- Qs]
end]]),
@@ -747,53 +750,36 @@ rdq_stress_gc(MsgCount) ->
rabbit_disk_queue:tx_commit(q, List, []),
StartChunk = round(MsgCount / 20), % 5%
AckList =
- lists:reverse(
- lists:foldl(
- fun (E, Acc) ->
- case Acc of
- [] -> [E];
- [F|_Fs] ->
- case E rem F of
- 0 -> Acc;
- _ -> [E|Acc]
- end
- end
- end, [], lists:flatten([lists:seq(N,MsgCount,N)
- || N <- lists:seq(StartChunk,MsgCount)]))) ++
- lists:seq(1, (StartChunk - 1)),
+ lists:foldl(
+ fun (E, Acc) ->
+ case lists:member(E, Acc) of
+ true -> Acc;
+ false -> [E|Acc]
+ end
+ end, [], lists:flatten(
+ lists:reverse(
+ [ lists:seq(N, MsgCount, N)
+ || N <- lists:seq(1, round(MsgCount / 2), 1)
+ ]))),
+ {Start, End} = lists:split(StartChunk, AckList),
+ AckList2 = End ++ Start,
MsgIdToSeqDict =
lists:foldl(
- fun (_, Acc) ->
- {MsgId, Msg, MsgSizeBytes, false, SeqId} =
+ fun (MsgId, Acc) ->
+ Remaining = MsgCount - MsgId,
+ {MsgId, Msg, MsgSizeBytes, false, SeqId, Remaining} =
rabbit_disk_queue:deliver(q),
dict:store(MsgId, SeqId, Acc)
end, dict:new(), List),
%% we really do want to ack each of this individually
[begin {ok, SeqId} = dict:find(MsgId, MsgIdToSeqDict),
- rabbit_disk_queue:ack(q, [SeqId]) end
- || MsgId <- AckList],
+ rabbit_disk_queue:ack(q, [SeqId])
+ end || MsgId <- AckList2],
rabbit_disk_queue:tx_commit(q, [], []),
+ empty = rabbit_disk_queue:deliver(q),
rdq_stop(),
passed.
-rdq_time_insane_startup() ->
- rdq_virgin(),
- OneGig = 1024*1024*1024,
- rabbit_disk_queue:start_link(OneGig),
- rabbit_disk_queue:to_ram_disk_mode(),
- Msg = <<>>,
- Count = 100000,
- List = lists:seq(1, Count),
- %% 1M empty messages, at say, 100B per message, should all fit
- %% within 1GB and thus in a single file
- io:format("Publishing ~p empty messages...~n",[Count]),
- [rabbit_disk_queue:tx_publish(N, Msg) || N <- List],
- rabbit_disk_queue:tx_commit(q, List, []),
- io:format("...done. Timing restart...~n", []),
- rdq_stop(),
- Micros = rdq_virgin(),
- io:format("...startup took ~w microseconds.~n", [Micros]).
-
rdq_test_startup_with_queue_gaps() ->
rdq_virgin(),
rdq_start(),
@@ -805,8 +791,10 @@ rdq_test_startup_with_queue_gaps() ->
rabbit_disk_queue:tx_commit(q, All, []),
io:format("Publish done~n", []),
%% deliver first half
- Seqs = [begin {N, Msg, 256, false, SeqId} = rabbit_disk_queue:deliver(q), SeqId end
- || N <- lists:seq(1,Half)],
+ Seqs = [begin
+ Remaining = Total - N,
+ {N, Msg, 256, false, SeqId, Remaining} = rabbit_disk_queue:deliver(q), SeqId
+ end || N <- lists:seq(1,Half)],
io:format("Deliver first half done~n", []),
%% ack every other message we have delivered (starting at the _first_)
lists:foldl(fun (SeqId2, true) ->
@@ -821,13 +809,19 @@ rdq_test_startup_with_queue_gaps() ->
rdq_start(),
io:format("Startup (with shuffle) done~n", []),
%% should have shuffled up. So we should now get lists:seq(2,500,2) already delivered
- Seqs2 = [begin {N, Msg, 256, true, SeqId} = rabbit_disk_queue:deliver(q), SeqId end
- || N <- lists:seq(2,Half,2)],
+ Seqs2 = [begin
+ Remaining = round(Total - ((Half + N)/2)),
+ {N, Msg, 256, true, SeqId, Remaining} = rabbit_disk_queue:deliver(q),
+ SeqId
+ end || N <- lists:seq(2,Half,2)],
rabbit_disk_queue:tx_commit(q, [], Seqs2),
io:format("Reread non-acked messages done~n", []),
%% and now fetch the rest
- Seqs3 = [begin {N, Msg, 256, false, SeqId} = rabbit_disk_queue:deliver(q), SeqId end
- || N <- lists:seq(1 + Half,Total)],
+ Seqs3 = [begin
+ Remaining = Total - N,
+ {N, Msg, 256, false, SeqId, Remaining} = rabbit_disk_queue:deliver(q),
+ SeqId
+ end || N <- lists:seq(1 + Half,Total)],
rabbit_disk_queue:tx_commit(q, [], Seqs3),
io:format("Read second half done~n", []),
empty = rabbit_disk_queue:deliver(q),
@@ -845,8 +839,11 @@ rdq_test_redeliver() ->
rabbit_disk_queue:tx_commit(q, All, []),
io:format("Publish done~n", []),
%% deliver first half
- Seqs = [begin {N, Msg, 256, false, SeqId} = rabbit_disk_queue:deliver(q), SeqId end
- || N <- lists:seq(1,Half)],
+ Seqs = [begin
+ Remaining = Total - N,
+ {N, Msg, 256, false, SeqId, Remaining} = rabbit_disk_queue:deliver(q),
+ SeqId
+ end || N <- lists:seq(1,Half)],
io:format("Deliver first half done~n", []),
%% now requeue every other message (starting at the _first_)
%% and ack the other ones
@@ -860,11 +857,17 @@ rdq_test_redeliver() ->
rabbit_disk_queue:tx_commit(q, [], []),
io:format("Redeliver and acking done~n", []),
%% we should now get the 2nd half in order, followed by every-other-from-the-first-half
- Seqs2 = [begin {N, Msg, 256, false, SeqId} = rabbit_disk_queue:deliver(q), SeqId end
- || N <- lists:seq(1+Half, Total)],
+ Seqs2 = [begin
+ Remaining = round(Total - N + (Half/2)),
+ {N, Msg, 256, false, SeqId, Remaining} = rabbit_disk_queue:deliver(q),
+ SeqId
+ end || N <- lists:seq(1+Half, Total)],
rabbit_disk_queue:tx_commit(q, [], Seqs2),
- Seqs3 = [begin {N, Msg, 256, true, SeqId} = rabbit_disk_queue:deliver(q), SeqId end
- || N <- lists:seq(1, Half, 2)],
+ Seqs3 = [begin
+ Remaining = round((Half - N) / 2) - 1,
+ {N, Msg, 256, true, SeqId, Remaining} = rabbit_disk_queue:deliver(q),
+ SeqId
+ end || N <- lists:seq(1, Half, 2)],
rabbit_disk_queue:tx_commit(q, [], Seqs3),
empty = rabbit_disk_queue:deliver(q),
rdq_stop(),
@@ -881,8 +884,11 @@ rdq_test_purge() ->
rabbit_disk_queue:tx_commit(q, All, []),
io:format("Publish done~n", []),
%% deliver first half
- Seqs = [begin {N, Msg, 256, false, SeqId} = rabbit_disk_queue:deliver(q), SeqId end
- || N <- lists:seq(1,Half)],
+ Seqs = [begin
+ Remaining = Total - N,
+ {N, Msg, 256, false, SeqId, Remaining} = rabbit_disk_queue:deliver(q),
+ SeqId
+ end || N <- lists:seq(1,Half)],
io:format("Deliver first half done~n", []),
rabbit_disk_queue:purge(q),
io:format("Purge done~n", []),
@@ -897,13 +903,14 @@ rdq_time_commands(Funcs) ->
rdq_virgin() ->
{Micros, {ok, _}} =
- timer:tc(rabbit_disk_queue, start_link, [1024*1024]),
+ timer:tc(rabbit_disk_queue, start_link, []),
ok = rabbit_disk_queue:stop_and_obliterate(),
timer:sleep(1000),
Micros.
rdq_start() ->
- {ok, _} = rabbit_disk_queue:start_link(1024*1024).
+ {ok, _} = rabbit_disk_queue:start_link(),
+ rabbit_disk_queue:to_ram_disk_mode().
rdq_stop() ->
rabbit_disk_queue:stop(),