diff options
| author | kjnilsson <knilsson@pivotal.io> | 2019-04-05 17:43:32 +0100 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2019-04-23 14:36:10 +0100 |
| commit | d389d045df3f60bb19511d3240f3cee7c1de534c (patch) | |
| tree | 77370a074a006a4bbdbec92c8e93a9aadefa2b5d /test | |
| parent | 04e4d0b5ba486abb16e873b4d3df6e8792b009e6 (diff) | |
| download | rabbitmq-server-git-d389d045df3f60bb19511d3240f3cee7c1de534c.tar.gz | |
Refactor channel state
To put static fields into a nested record to avoid some copying on
update.
Also exit the channel when a basic.get times out.
[#164212469]
Diffstat (limited to 'test')
| -rw-r--r-- | test/queue_parallel_SUITE.erl | 123 |
1 files changed, 96 insertions, 27 deletions
diff --git a/test/queue_parallel_SUITE.erl b/test/queue_parallel_SUITE.erl index dc2678ba10..00d597c75a 100644 --- a/test/queue_parallel_SUITE.erl +++ b/test/queue_parallel_SUITE.erl @@ -53,13 +53,14 @@ groups() -> consume_and_requeue_multiple_nack, consume_and_multiple_nack, consumer_timeout, + consumer_timeout_basic_get, basic_cancel, purge, basic_recover, delete_immediately_by_resource ], [ - {parallel_tests, [], + {parallel_tests, [], [ {classic_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds]}, {mirrored_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds]}, @@ -135,7 +136,9 @@ init_per_group(Group, Config0) -> true -> ClusterSize = 2, Config = rabbit_ct_helpers:merge_app_env( - Config0, {rabbit, [{consumer_timeout, 5000}]}), + Config0, {rabbit, [{channel_queue_cleanup_interval, 1000}, + {quorum_tick_interval, 1000}, + {consumer_timeout, 15000}]}), Config1 = rabbit_ct_helpers:set_config( Config, [ {rmq_nodename_suffix, Group}, {rmq_nodes_count, ClusterSize} @@ -196,7 +199,7 @@ consume_first_empty(Config) -> consume_empty(Ch, QName), publish(Ch, QName, [<<"msg1">>]), wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), - consume(Ch, QName, [<<"msg1">>]), + consume(Ch, QName, true, [<<"msg1">>]), rabbit_ct_client_helpers:close_channel(Ch). consume_from_empty_queue(Config) -> @@ -271,7 +274,9 @@ consume_and_ack(Config) -> [DeliveryTag] = consume(Ch, QName, [<<"msg1">>]), wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]). + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]), + rabbit_ct_client_helpers:close_channel(Ch), + ok. consume_and_multiple_ack(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -284,7 +289,9 @@ consume_and_multiple_ack(Config) -> wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]), amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, multiple = true}), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]). + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]), + rabbit_ct_client_helpers:close_channel(Ch), + ok. subscribe_and_ack(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -299,7 +306,9 @@ subscribe_and_ack(Config) -> wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}), wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) - end. + end, + rabbit_ct_client_helpers:close_channel(Ch), + ok. subscribe_and_multiple_ack(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -317,7 +326,9 @@ subscribe_and_multiple_ack(Config) -> amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, multiple = true}), wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) - end. + end, + rabbit_ct_client_helpers:close_channel(Ch), + ok. subscribe_and_requeue_multiple_nack(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -346,7 +357,9 @@ subscribe_and_requeue_multiple_nack(Config) -> multiple = true}), wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) end - end. + end, + rabbit_ct_client_helpers:close_channel(Ch), + ok. consume_and_requeue_nack(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -360,7 +373,9 @@ consume_and_requeue_nack(Config) -> amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, multiple = false, requeue = true}), - wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]). + wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]), + rabbit_ct_client_helpers:close_channel(Ch), + ok. consume_and_nack(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -374,7 +389,9 @@ consume_and_nack(Config) -> amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, multiple = false, requeue = false}), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]). + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]), + rabbit_ct_client_helpers:close_channel(Ch), + ok. consume_and_requeue_multiple_nack(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -388,7 +405,9 @@ consume_and_requeue_multiple_nack(Config) -> amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, multiple = true, requeue = true}), - wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]). + wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + rabbit_ct_client_helpers:close_channel(Ch), + ok. consume_and_multiple_nack(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -402,7 +421,9 @@ consume_and_multiple_nack(Config) -> amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, multiple = true, requeue = false}), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]). + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]), + rabbit_ct_client_helpers:close_channel(Ch), + ok. consumer_timeout(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -419,13 +440,39 @@ consumer_timeout(Config) -> #'basic.cancel'{ } -> wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), ok - after 20000 -> + after 30000 -> flush(1), exit(cancel_never_happened) end after 5000 -> exit(deliver_timeout) - end. + end, + rabbit_ct_client_helpers:close_channel(Ch), + ok. + +consumer_timeout_basic_get(Config) -> + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + [_DelTag] = consume(Ch, QName, [<<"msg1">>]), + erlang:monitor(process, Conn), + erlang:monitor(process, Ch), + receive + {'DOWN', _, process, Ch, _} -> ok + after 30000 -> + flush(1), + exit(channel_exit_expected) + end, + receive + {'DOWN', _, process, Conn, _} -> + flush(1), + exit(unexpected_connection_exit) + after 2000 -> + ok + end, + ok. subscribe_and_requeue_nack(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -449,7 +496,9 @@ subscribe_and_requeue_nack(Config) -> amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1}), wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) end - end. + end, + rabbit_ct_client_helpers:close_channel(Ch), + ok. subscribe_and_nack(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -467,7 +516,9 @@ subscribe_and_nack(Config) -> multiple = false, requeue = false}), wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) - end. + end, + rabbit_ct_client_helpers:close_channel(Ch), + ok. subscribe_and_multiple_nack(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -487,7 +538,9 @@ subscribe_and_multiple_nack(Config) -> multiple = true, requeue = false}), wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) - end. + end, + rabbit_ct_client_helpers:close_channel(Ch), + ok. %% TODO test with single active basic_cancel(Config) -> @@ -498,11 +551,12 @@ basic_cancel(Config) -> publish(Ch, QName, [<<"msg1">>]), wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), - subscribe(Ch, QName, false), + CTag = atom_to_binary(?FUNCTION_NAME, utf8), + subscribe(Ch, QName, false, CTag), receive {#'basic.deliver'{delivery_tag = DeliveryTag}, _} -> wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), - amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = <<"ctag">>}), + amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag}), Consumers = rpc:call(Server, rabbit_amqqueue, consumers_all, [<<"/">>]), wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), ?assertEqual([], lists:filter(fun(Props) -> @@ -515,7 +569,9 @@ basic_cancel(Config) -> wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]) after 5000 -> exit(basic_deliver_timeout) - end. + end, + rabbit_ct_client_helpers:close_channel(Ch), + ok. purge(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -527,7 +583,9 @@ purge(Config) -> [_] = consume(Ch, QName, [<<"msg1">>]), wait_for_messages(Config, [[QName, <<"2">>, <<"1">>, <<"1">>]]), {'queue.purge_ok', 1} = amqp_channel:call(Ch, #'queue.purge'{queue = QName}), - wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]). + wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + rabbit_ct_client_helpers:close_channel(Ch), + ok. basic_recover(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -539,7 +597,9 @@ basic_recover(Config) -> [_] = consume(Ch, QName, [<<"msg1">>]), wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), amqp_channel:cast(Ch, #'basic.recover'{requeue = true}), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]). + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + rabbit_ct_client_helpers:close_channel(Ch), + ok. delete_immediately_by_pid_fails(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -557,7 +617,9 @@ delete_immediately_by_pid_fails(Config) -> durable = Durable, passive = true, auto_delete = false, - arguments = Args})). + arguments = Args})), + rabbit_ct_client_helpers:close_channel(Ch), + ok. delete_immediately_by_pid_succeeds(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -576,7 +638,9 @@ delete_immediately_by_pid_succeeds(Config) -> durable = Durable, passive = true, auto_delete = false, - arguments = Args})). + arguments = Args})), + rabbit_ct_client_helpers:close_channel(Ch), + ok. delete_immediately_by_resource(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -594,7 +658,9 @@ delete_immediately_by_resource(Config) -> durable = Durable, passive = true, auto_delete = false, - arguments = Args})). + arguments = Args})), + rabbit_ct_client_helpers:close_channel(Ch), + ok. %%%%%%%%%%%%%%%%%%%%%%%% %% Test helpers @@ -626,12 +692,15 @@ consume_empty(Ch, QName) -> amqp_channel:call(Ch, #'basic.get'{queue = QName})). subscribe(Ch, Queue, NoAck) -> + subscribe(Ch, Queue, NoAck, <<"ctag">>). + +subscribe(Ch, Queue, NoAck, Ctag) -> amqp_channel:subscribe(Ch, #'basic.consume'{queue = Queue, no_ack = NoAck, - consumer_tag = <<"ctag">>}, + consumer_tag = Ctag}, self()), receive - #'basic.consume_ok'{consumer_tag = <<"ctag">>} -> + #'basic.consume_ok'{consumer_tag = Ctag} -> ok end. |
