diff options
| author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2018-12-14 18:23:21 +0100 |
|---|---|---|
| committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2018-12-14 18:23:21 +0100 |
| commit | 1f052db8900e388535249603c28e44ee3e40a607 (patch) | |
| tree | 99ea50e7b4323dcae15b1302cbe7c58aa8b014bf /test | |
| parent | b4d354546d44799f9cdab979c02eecb4245e67ad (diff) | |
| parent | bc662bc0a807f5b0ad0681fb2bcf49534f93729c (diff) | |
| download | rabbitmq-server-git-1f052db8900e388535249603c28e44ee3e40a607.tar.gz | |
Merge branch 'master' into rabbitmq-server-1799-single-active-consumer-in-qq
Conflicts:
src/rabbit_fifo.erl
src/rabbit_quorum_queue.erl
Diffstat (limited to 'test')
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 82 |
1 files changed, 78 insertions, 4 deletions
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index c94fb9ddab..d698618494 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -120,6 +120,7 @@ all_tests() -> delete_immediately_by_resource, consume_redelivery_count, subscribe_redelivery_count, + message_bytes_metrics, memory_alarm_rolls_wal ]. @@ -2039,7 +2040,7 @@ consume_redelivery_count(Config) -> requeue = true}), %% wait for requeueing timer:sleep(500), - + {#'basic.get_ok'{delivery_tag = DeliveryTag1, redelivered = true}, #amqp_msg{props = #'P_basic'{headers = H1}}} = @@ -2058,14 +2059,71 @@ consume_redelivery_count(Config) -> ?assertMatch({DTag, _, 2}, rabbit_basic:header(DTag, H2)), amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag2, multiple = false, - requeue = true}). + requeue = true}), + ok. -memory_alarm_rolls_wal(Config) -> +message_bytes_metrics(Config) -> [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), QQ = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', QQ, 0, 0}, declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + RaName = ra_name(QQ), + {ok, _, {_, Leader}} = ra:members({RaName, Server}), + QRes = rabbit_misc:r(<<"/">>, queue, QQ), + + publish(Ch, QQ), + + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0), + wait_until(fun() -> + {3, 3, 0} == get_message_bytes(Leader, QRes) + end), + + subscribe(Ch, QQ, false), + + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 1), + wait_until(fun() -> + {3, 0, 3} == get_message_bytes(Leader, QRes) + end), + + receive + {#'basic.deliver'{delivery_tag = DeliveryTag, + redelivered = false}, _} -> + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = false, + requeue = false}), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 0), + wait_until(fun() -> + {0, 0, 0} == get_message_bytes(Leader, QRes) + end) + end, + + %% Let's publish and then close the consumer channel. Messages must be + %% returned to the queue + publish(Ch, QQ), + + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 1), + wait_until(fun() -> + {3, 0, 3} == get_message_bytes(Leader, QRes) + end), + + rabbit_ct_client_helpers:close_channel(Ch), + + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0), + wait_until(fun() -> + {3, 3, 0} == get_message_bytes(Leader, QRes) + end), + ok. + +memory_alarm_rolls_wal(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), WalDataDir = rpc:call(Server, ra_env, wal_data_dir, []), [Wal0] = filelib:wildcard(WalDataDir ++ "/*.wal"), ok = rpc:call(Server, rabbit_alarm, set_alarm, @@ -2079,7 +2137,11 @@ memory_alarm_rolls_wal(Config) -> [{{resource_limit, memory, Server}, []}]), timer:sleep(1000), [Wal2] = filelib:wildcard(WalDataDir ++ "/*.wal"), - ?assert(Wal1 == Wal2). + ?assert(Wal1 == Wal2), + ok = rpc:call(Server, rabbit_alarm, clear_alarm, + [{{resource_limit, memory, Server}, []}]), + timer:sleep(1000), + ok. %%---------------------------------------------------------------------------- @@ -2219,6 +2281,7 @@ dirty_query(Servers, QName, Fun) -> fun(N) -> case rpc:call(N, ra, local_query, [{QName, N}, Fun]) of {ok, {_, Msgs}, _} -> + ct:pal("Msgs ~w", [Msgs]), Msgs; _ -> undefined @@ -2258,3 +2321,14 @@ delete_queues() -> stop_node(Config, Server) -> rabbit_ct_broker_helpers:rabbitmqctl(Config, Server, ["stop"]). + +get_message_bytes(Leader, QRes) -> + case rpc:call(Leader, ets, lookup, [queue_metrics, QRes]) of + [{QRes, Props, _}] -> + {proplists:get_value(message_bytes, Props), + proplists:get_value(message_bytes_ready, Props), + proplists:get_value(message_bytes_unacknowledged, Props)}; + _ -> + [] + end. + |
