summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2019-05-10 00:00:47 +0300
committerMichael Klishin <michael@clojurewerkz.org>2019-05-10 00:00:47 +0300
commita8deb7500646b2b73d15cedbe4d3ea4d3dd07b2b (patch)
treec8ec8589d05083fec0769a74fdb218fb4acf95e5 /test
parent4b281fcd28d335846acd6ef9bc7f8730ae071f59 (diff)
downloadrabbitmq-server-git-a8deb7500646b2b73d15cedbe4d3ea4d3dd07b2b.tar.gz
Compaction: more logging, a test
Diffstat (limited to 'test')
-rw-r--r--test/queue_parallel_SUITE.erl27
1 files changed, 25 insertions, 2 deletions
diff --git a/test/queue_parallel_SUITE.erl b/test/queue_parallel_SUITE.erl
index 632a314d21..f8a039dc65 100644
--- a/test/queue_parallel_SUITE.erl
+++ b/test/queue_parallel_SUITE.erl
@@ -60,8 +60,10 @@ groups() ->
[
{parallel_tests, [],
[
- {classic_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds]},
- {mirrored_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds]},
+ {classic_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds,
+ trigger_message_store_compaction]},
+ {mirrored_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds,
+ trigger_message_store_compaction]},
{quorum_queue, [parallel], AllTests ++ [delete_immediately_by_pid_fails]},
{quorum_queue_in_memory_limit, [parallel], AllTests ++ [delete_immediately_by_pid_fails]},
{quorum_queue_in_memory_bytes, [parallel], AllTests ++ [delete_immediately_by_pid_fails]}
@@ -327,6 +329,27 @@ subscribe_and_multiple_ack(Config) ->
rabbit_ct_client_helpers:close_channel(Ch),
ok.
+trigger_message_store_compaction(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ N = 12000,
+ [publish(Ch, QName, [binary:copy(<<"a">>, 5000)]) || _ <- lists:seq(1, N)],
+ wait_for_messages(Config, [[QName, <<"12000">>, <<"12000">>, <<"0">>]]),
+
+ AllDTags = rabbit_ct_client_helpers:consume_without_acknowledging(Ch, QName, N),
+ ToAck = lists:filter(fun (I) -> I > 500 andalso I < 11200 end, AllDTags),
+
+ [amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = Tag,
+ multiple = false}) || Tag <- ToAck],
+
+ %% give compaction a moment to start in and finish
+ timer:sleep(5000),
+ amqp_channel:cast(Ch, #'queue.purge'{queue = QName}),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
+
subscribe_and_requeue_multiple_nack(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
QName = ?config(queue_name, Config),