summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_msg_store.erl2
-rw-r--r--test/queue_parallel_SUITE.erl27
2 files changed, 27 insertions, 2 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 21b20d406d..e3b23cfbca 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -2068,6 +2068,8 @@ do_combine_files(SourceSummary, DestinationSummary,
{#file_summary.file_size, TotalValidData}]),
Reclaimed = SourceFileSize + DestinationFileSize - TotalValidData,
+ rabbit_log:debug("Combined segment files number ~p (source) and ~p (destination), reclaimed ~p bytes",
+ [Source, Destination, Reclaimed]),
gen_server2:cast(Server, {combine_files, Source, Destination, Reclaimed}),
safe_file_delete_fun(Source, Dir, FileHandlesEts).
diff --git a/test/queue_parallel_SUITE.erl b/test/queue_parallel_SUITE.erl
index 632a314d21..f8a039dc65 100644
--- a/test/queue_parallel_SUITE.erl
+++ b/test/queue_parallel_SUITE.erl
@@ -60,8 +60,10 @@ groups() ->
[
{parallel_tests, [],
[
- {classic_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds]},
- {mirrored_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds]},
+ {classic_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds,
+ trigger_message_store_compaction]},
+ {mirrored_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds,
+ trigger_message_store_compaction]},
{quorum_queue, [parallel], AllTests ++ [delete_immediately_by_pid_fails]},
{quorum_queue_in_memory_limit, [parallel], AllTests ++ [delete_immediately_by_pid_fails]},
{quorum_queue_in_memory_bytes, [parallel], AllTests ++ [delete_immediately_by_pid_fails]}
@@ -327,6 +329,27 @@ subscribe_and_multiple_ack(Config) ->
rabbit_ct_client_helpers:close_channel(Ch),
ok.
+trigger_message_store_compaction(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ N = 12000,
+ [publish(Ch, QName, [binary:copy(<<"a">>, 5000)]) || _ <- lists:seq(1, N)],
+ wait_for_messages(Config, [[QName, <<"12000">>, <<"12000">>, <<"0">>]]),
+
+ AllDTags = rabbit_ct_client_helpers:consume_without_acknowledging(Ch, QName, N),
+ ToAck = lists:filter(fun (I) -> I > 500 andalso I < 11200 end, AllDTags),
+
+ [amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = Tag,
+ multiple = false}) || Tag <- ToAck],
+
+ %% give compaction a moment to start in and finish
+ timer:sleep(5000),
+ amqp_channel:cast(Ch, #'queue.purge'{queue = QName}),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
+
subscribe_and_requeue_multiple_nack(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
QName = ?config(queue_name, Config),