diff options
| -rw-r--r-- | src/rabbit_msg_store.erl | 2 | ||||
| -rw-r--r-- | test/queue_parallel_SUITE.erl | 27 |
2 files changed, 27 insertions, 2 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 21b20d406d..e3b23cfbca 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -2068,6 +2068,8 @@ do_combine_files(SourceSummary, DestinationSummary, {#file_summary.file_size, TotalValidData}]), Reclaimed = SourceFileSize + DestinationFileSize - TotalValidData, + rabbit_log:debug("Combined segment files number ~p (source) and ~p (destination), reclaimed ~p bytes", + [Source, Destination, Reclaimed]), gen_server2:cast(Server, {combine_files, Source, Destination, Reclaimed}), safe_file_delete_fun(Source, Dir, FileHandlesEts). diff --git a/test/queue_parallel_SUITE.erl b/test/queue_parallel_SUITE.erl index 632a314d21..f8a039dc65 100644 --- a/test/queue_parallel_SUITE.erl +++ b/test/queue_parallel_SUITE.erl @@ -60,8 +60,10 @@ groups() -> [ {parallel_tests, [], [ - {classic_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds]}, - {mirrored_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds]}, + {classic_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds, + trigger_message_store_compaction]}, + {mirrored_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds, + trigger_message_store_compaction]}, {quorum_queue, [parallel], AllTests ++ [delete_immediately_by_pid_fails]}, {quorum_queue_in_memory_limit, [parallel], AllTests ++ [delete_immediately_by_pid_fails]}, {quorum_queue_in_memory_bytes, [parallel], AllTests ++ [delete_immediately_by_pid_fails]} @@ -327,6 +329,27 @@ subscribe_and_multiple_ack(Config) -> rabbit_ct_client_helpers:close_channel(Ch), ok. +trigger_message_store_compaction(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + N = 12000, + [publish(Ch, QName, [binary:copy(<<"a">>, 5000)]) || _ <- lists:seq(1, N)], + wait_for_messages(Config, [[QName, <<"12000">>, <<"12000">>, <<"0">>]]), + + AllDTags = rabbit_ct_client_helpers:consume_without_acknowledging(Ch, QName, N), + ToAck = lists:filter(fun (I) -> I > 500 andalso I < 11200 end, AllDTags), + + [amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = Tag, + multiple = false}) || Tag <- ToAck], + + %% give compaction a moment to start in and finish + timer:sleep(5000), + amqp_channel:cast(Ch, #'queue.purge'{queue = QName}), + rabbit_ct_client_helpers:close_channel(Ch), + ok. + subscribe_and_requeue_multiple_nack(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), QName = ?config(queue_name, Config), |
