diff options
| author | Diana Corbacho <diana@rabbitmq.com> | 2019-04-10 17:32:08 +0100 |
|---|---|---|
| committer | Diana Corbacho <diana@rabbitmq.com> | 2019-04-10 17:32:08 +0100 |
| commit | 1c182e755f35b0b65a9137cb9d0c4ea027d01e97 (patch) | |
| tree | 744a9ab3adc1b685da541c983b5c9ac0aff3c20f | |
| parent | 6d931d531567e4d51e3606ae8b5e2c35e9a59e05 (diff) | |
| download | rabbitmq-server-git-1c182e755f35b0b65a9137cb9d0c4ea027d01e97.tar.gz | |
Reset in-memory counts after purging
[#164735591]
| -rw-r--r-- | src/rabbit_fifo.erl | 6 | ||||
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 28 |
2 files changed, 32 insertions, 2 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 274a196bc7..f8f4e78943 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -126,7 +126,7 @@ init(#{name := Name, queue_resource := Resource} = Conf) -> update_config(Conf, #?MODULE{cfg = #cfg{name = Name, - resource = Resource}}). + resource = Resource}}). update_config(Conf, State) -> DLH = maps:get(dead_letter_handler, Conf, undefined), @@ -317,7 +317,9 @@ apply(#{index := RaftIdx}, #purge{}, returns = lqueue:new(), msg_bytes_enqueue = 0, prefix_msgs = {[], []}, - low_msg_num = undefined}, + low_msg_num = undefined, + msg_bytes_in_memory = 0, + msgs_ready_in_memory = 0}, []), %% as we're not checking out after a purge (no point) we have to %% reverse the effects ourselves diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 598765e4cb..61f9328855 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -123,6 +123,7 @@ all_tests() -> queue_length_in_memory_bytes_limit_basic_get, queue_length_in_memory_bytes_limit_subscribe, queue_length_in_memory_bytes_limit, + queue_length_in_memory_purge, in_memory ]. @@ -2082,6 +2083,33 @@ queue_length_in_memory_bytes_limit(Config) -> ?assertEqual([{2, byte_size(Msg2) + byte_size(Msg4)}], dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)). +queue_length_in_memory_purge(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-max-in-memory-length">>, long, 2}])), + + RaName = ra_name(QQ), + Msg1 = <<"msg1">>, + Msg2 = <<"msg11">>, + Msg3 = <<"msg111">>, + + publish(Ch, QQ, Msg1), + publish(Ch, QQ, Msg2), + publish(Ch, QQ, Msg3), + wait_for_messages(Config, [[QQ, <<"3">>, <<"3">>, <<"0">>]]), + + ?assertEqual([{2, byte_size(Msg1) + byte_size(Msg2)}], + dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)), + + {'queue.purge_ok', 3} = amqp_channel:call(Ch, #'queue.purge'{queue = QQ}), + + ?assertEqual([{0, 0}], + dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)). + in_memory(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), |
