diff options
| author | Diana Corbacho <diana@rabbitmq.com> | 2018-12-10 12:36:56 +0000 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2018-12-14 13:52:09 +0000 |
| commit | 3676541e7a30b69333ea887450c8ddf6e3c3510d (patch) | |
| tree | 2822189583461010a9da9c04c3a24bd8e61be64f /test | |
| parent | 685f8ea30c0654f9264ec3b25a9ccceb1a2e9a3e (diff) | |
| download | rabbitmq-server-git-3676541e7a30b69333ea887450c8ddf6e3c3510d.tar.gz | |
Report message bytes in quorum queue stats
[#161505138]
Diffstat (limited to 'test')
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 82 |
1 files changed, 78 insertions, 4 deletions
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index c94fb9ddab..d698618494 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -120,6 +120,7 @@ all_tests() -> delete_immediately_by_resource, consume_redelivery_count, subscribe_redelivery_count, + message_bytes_metrics, memory_alarm_rolls_wal ]. @@ -2039,7 +2040,7 @@ consume_redelivery_count(Config) -> requeue = true}), %% wait for requeueing timer:sleep(500), - + {#'basic.get_ok'{delivery_tag = DeliveryTag1, redelivered = true}, #amqp_msg{props = #'P_basic'{headers = H1}}} = @@ -2058,14 +2059,71 @@ consume_redelivery_count(Config) -> ?assertMatch({DTag, _, 2}, rabbit_basic:header(DTag, H2)), amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag2, multiple = false, - requeue = true}). + requeue = true}), + ok. -memory_alarm_rolls_wal(Config) -> +message_bytes_metrics(Config) -> [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), QQ = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', QQ, 0, 0}, declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + RaName = ra_name(QQ), + {ok, _, {_, Leader}} = ra:members({RaName, Server}), + QRes = rabbit_misc:r(<<"/">>, queue, QQ), + + publish(Ch, QQ), + + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0), + wait_until(fun() -> + {3, 3, 0} == get_message_bytes(Leader, QRes) + end), + + subscribe(Ch, QQ, false), + + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 1), + wait_until(fun() -> + {3, 0, 3} == get_message_bytes(Leader, QRes) + end), + + receive + {#'basic.deliver'{delivery_tag = DeliveryTag, + redelivered = false}, _} -> + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = false, + requeue = false}), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 0), + wait_until(fun() -> + {0, 0, 0} == get_message_bytes(Leader, QRes) + end) + end, + + %% Let's publish and then close the consumer channel. Messages must be + %% returned to the queue + publish(Ch, QQ), + + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 1), + wait_until(fun() -> + {3, 0, 3} == get_message_bytes(Leader, QRes) + end), + + rabbit_ct_client_helpers:close_channel(Ch), + + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0), + wait_until(fun() -> + {3, 3, 0} == get_message_bytes(Leader, QRes) + end), + ok. + +memory_alarm_rolls_wal(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), WalDataDir = rpc:call(Server, ra_env, wal_data_dir, []), [Wal0] = filelib:wildcard(WalDataDir ++ "/*.wal"), ok = rpc:call(Server, rabbit_alarm, set_alarm, @@ -2079,7 +2137,11 @@ memory_alarm_rolls_wal(Config) -> [{{resource_limit, memory, Server}, []}]), timer:sleep(1000), [Wal2] = filelib:wildcard(WalDataDir ++ "/*.wal"), - ?assert(Wal1 == Wal2). + ?assert(Wal1 == Wal2), + ok = rpc:call(Server, rabbit_alarm, clear_alarm, + [{{resource_limit, memory, Server}, []}]), + timer:sleep(1000), + ok. %%---------------------------------------------------------------------------- @@ -2219,6 +2281,7 @@ dirty_query(Servers, QName, Fun) -> fun(N) -> case rpc:call(N, ra, local_query, [{QName, N}, Fun]) of {ok, {_, Msgs}, _} -> + ct:pal("Msgs ~w", [Msgs]), Msgs; _ -> undefined @@ -2258,3 +2321,14 @@ delete_queues() -> stop_node(Config, Server) -> rabbit_ct_broker_helpers:rabbitmqctl(Config, Server, ["stop"]). + +get_message_bytes(Leader, QRes) -> + case rpc:call(Leader, ets, lookup, [queue_metrics, QRes]) of + [{QRes, Props, _}] -> + {proplists:get_value(message_bytes, Props), + proplists:get_value(message_bytes_ready, Props), + proplists:get_value(message_bytes_unacknowledged, Props)}; + _ -> + [] + end. + |
