summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_disk_queue.erl1
-rw-r--r--src/rabbit_tests.erl46
2 files changed, 46 insertions, 1 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 3a520ecdca..4d00bc3a29 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -271,6 +271,7 @@
-spec(cache_info/0 :: () -> [{atom(), term()}]).
-spec(report_memory/0 :: () -> 'ok').
-spec(set_mode/1 :: ('disk' | 'mixed') -> 'ok').
+-spec(prefetch/2 :: (queue_name(), non_neg_integer()) -> 'ok').
-endif.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 6d76d23fe9..221279f7ed 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -742,12 +742,13 @@ delete_log_handlers(Handlers) ->
test_disk_queue() ->
rdq_stop(),
rdq_virgin(),
- passed = rdq_stress_gc(1000),
+ passed = rdq_stress_gc(5000),
passed = rdq_test_startup_with_queue_gaps(),
passed = rdq_test_redeliver(),
passed = rdq_test_purge(),
passed = rdq_test_mixed_queue_modes(),
passed = rdq_test_mode_conversion_mid_txn(),
+ passed = rdq_test_disk_queue_modes(),
rdq_virgin(),
passed.
@@ -1151,6 +1152,49 @@ rdq_tx_publish_mixed_alter_commit_get(MS0, MsgsA, MsgsB, ChangeFun, CommitOrCanc
rabbit_mixed_queue:ack(AckTags, MS8)
end,
0 = rabbit_mixed_queue:length(MS9),
+ Msg = rdq_message(0, <<0:256>>),
+ {ok, AckTag, MS10} = rabbit_mixed_queue:publish_delivered(Msg, MS9),
+ {ok,MS11} = rabbit_mixed_queue:ack([{Msg, AckTag}], MS10),
+ 0 = rabbit_mixed_queue:length(MS11),
+ passed.
+
+rdq_test_disk_queue_modes() ->
+ rdq_virgin(),
+ rdq_start(),
+ Msg = <<0:(8*256)>>,
+ Total = 1000,
+ Half1 = lists:seq(1,round(Total/2)),
+ Half2 = lists:seq(1 + round(Total/2), Total),
+ [rabbit_disk_queue:tx_publish(rdq_message(N, Msg)) || N <- Half1],
+ ok = rabbit_disk_queue:tx_commit(q, Half1, []),
+ io:format("Publish done~n", []),
+ ok = rabbit_disk_queue:to_disk_only_mode(),
+ io:format("To Disk Only done~n", []),
+ [rabbit_disk_queue:tx_publish(rdq_message(N, Msg)) || N <- Half2],
+ ok = rabbit_disk_queue:tx_commit(q, Half2, []),
+ Seqs = [begin
+ Remaining = Total - N,
+ {Message, _TSize, false, SeqId, Remaining} =
+ rabbit_disk_queue:deliver(q),
+ ok = rdq_match_message(Message, N, Msg, 256),
+ SeqId
+ end || N <- Half1],
+ io:format("Deliver first half done~n", []),
+ ok = rabbit_disk_queue:to_ram_disk_mode(),
+ io:format("To RAM Disk done~n", []),
+ Seqs2 = [begin
+ Remaining = Total - N,
+ {Message, _TSize, false, SeqId, Remaining} =
+ rabbit_disk_queue:deliver(q),
+ ok = rdq_match_message(Message, N, Msg, 256),
+ SeqId
+ end || N <- Half2],
+ io:format("Deliver second half done~n", []),
+ ok = rabbit_disk_queue:tx_commit(q, [], Seqs),
+ ok = rabbit_disk_queue:to_disk_only_mode(),
+ ok = rabbit_disk_queue:tx_commit(q, [], Seqs2),
+ empty = rabbit_disk_queue:deliver(q),
+ rdq_stop(),
passed.
rdq_time_commands(Funcs) ->