summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDiana Corbacho <diana@rabbitmq.com>2019-04-10 17:32:08 +0100
committerDiana Corbacho <diana@rabbitmq.com>2019-04-10 17:32:08 +0100
commit1c182e755f35b0b65a9137cb9d0c4ea027d01e97 (patch)
tree744a9ab3adc1b685da541c983b5c9ac0aff3c20f
parent6d931d531567e4d51e3606ae8b5e2c35e9a59e05 (diff)
downloadrabbitmq-server-git-1c182e755f35b0b65a9137cb9d0c4ea027d01e97.tar.gz
Reset in-memory counts after purging
[#164735591]
-rw-r--r--src/rabbit_fifo.erl6
-rw-r--r--test/quorum_queue_SUITE.erl28
2 files changed, 32 insertions, 2 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 274a196bc7..f8f4e78943 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -126,7 +126,7 @@
init(#{name := Name,
queue_resource := Resource} = Conf) ->
update_config(Conf, #?MODULE{cfg = #cfg{name = Name,
- resource = Resource}}).
+ resource = Resource}}).
update_config(Conf, State) ->
DLH = maps:get(dead_letter_handler, Conf, undefined),
@@ -317,7 +317,9 @@ apply(#{index := RaftIdx}, #purge{},
returns = lqueue:new(),
msg_bytes_enqueue = 0,
prefix_msgs = {[], []},
- low_msg_num = undefined},
+ low_msg_num = undefined,
+ msg_bytes_in_memory = 0,
+ msgs_ready_in_memory = 0},
[]),
%% as we're not checking out after a purge (no point) we have to
%% reverse the effects ourselves
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 598765e4cb..61f9328855 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -123,6 +123,7 @@ all_tests() ->
queue_length_in_memory_bytes_limit_basic_get,
queue_length_in_memory_bytes_limit_subscribe,
queue_length_in_memory_bytes_limit,
+ queue_length_in_memory_purge,
in_memory
].
@@ -2082,6 +2083,33 @@ queue_length_in_memory_bytes_limit(Config) ->
?assertEqual([{2, byte_size(Msg2) + byte_size(Msg4)}],
dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)).
+queue_length_in_memory_purge(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-max-in-memory-length">>, long, 2}])),
+
+ RaName = ra_name(QQ),
+ Msg1 = <<"msg1">>,
+ Msg2 = <<"msg11">>,
+ Msg3 = <<"msg111">>,
+
+ publish(Ch, QQ, Msg1),
+ publish(Ch, QQ, Msg2),
+ publish(Ch, QQ, Msg3),
+ wait_for_messages(Config, [[QQ, <<"3">>, <<"3">>, <<"0">>]]),
+
+ ?assertEqual([{2, byte_size(Msg1) + byte_size(Msg2)}],
+ dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)),
+
+ {'queue.purge_ok', 3} = amqp_channel:call(Ch, #'queue.purge'{queue = QQ}),
+
+ ?assertEqual([{0, 0}],
+ dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)).
+
in_memory(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),