summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2019-04-05 17:43:32 +0100
committerkjnilsson <knilsson@pivotal.io>2019-04-23 14:36:10 +0100
commitd389d045df3f60bb19511d3240f3cee7c1de534c (patch)
tree77370a074a006a4bbdbec92c8e93a9aadefa2b5d /test
parent04e4d0b5ba486abb16e873b4d3df6e8792b009e6 (diff)
downloadrabbitmq-server-git-d389d045df3f60bb19511d3240f3cee7c1de534c.tar.gz
Refactor channel state
To put static fields into a nested record to avoid some copying on update. Also exit the channel when a basic.get times out. [#164212469]
Diffstat (limited to 'test')
-rw-r--r--test/queue_parallel_SUITE.erl123
1 files changed, 96 insertions, 27 deletions
diff --git a/test/queue_parallel_SUITE.erl b/test/queue_parallel_SUITE.erl
index dc2678ba10..00d597c75a 100644
--- a/test/queue_parallel_SUITE.erl
+++ b/test/queue_parallel_SUITE.erl
@@ -53,13 +53,14 @@ groups() ->
consume_and_requeue_multiple_nack,
consume_and_multiple_nack,
consumer_timeout,
+ consumer_timeout_basic_get,
basic_cancel,
purge,
basic_recover,
delete_immediately_by_resource
],
[
- {parallel_tests, [],
+ {parallel_tests, [],
[
{classic_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds]},
{mirrored_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds]},
@@ -135,7 +136,9 @@ init_per_group(Group, Config0) ->
true ->
ClusterSize = 2,
Config = rabbit_ct_helpers:merge_app_env(
- Config0, {rabbit, [{consumer_timeout, 5000}]}),
+ Config0, {rabbit, [{channel_queue_cleanup_interval, 1000},
+ {quorum_tick_interval, 1000},
+ {consumer_timeout, 15000}]}),
Config1 = rabbit_ct_helpers:set_config(
Config, [ {rmq_nodename_suffix, Group},
{rmq_nodes_count, ClusterSize}
@@ -196,7 +199,7 @@ consume_first_empty(Config) ->
consume_empty(Ch, QName),
publish(Ch, QName, [<<"msg1">>]),
wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
- consume(Ch, QName, [<<"msg1">>]),
+ consume(Ch, QName, true, [<<"msg1">>]),
rabbit_ct_client_helpers:close_channel(Ch).
consume_from_empty_queue(Config) ->
@@ -271,7 +274,9 @@ consume_and_ack(Config) ->
[DeliveryTag] = consume(Ch, QName, [<<"msg1">>]),
wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}),
- wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]).
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
consume_and_multiple_ack(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
@@ -284,7 +289,9 @@ consume_and_multiple_ack(Config) ->
wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]),
amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
multiple = true}),
- wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]).
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
subscribe_and_ack(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
@@ -299,7 +306,9 @@ subscribe_and_ack(Config) ->
wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}),
wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]])
- end.
+ end,
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
subscribe_and_multiple_ack(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
@@ -317,7 +326,9 @@ subscribe_and_multiple_ack(Config) ->
amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
multiple = true}),
wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]])
- end.
+ end,
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
subscribe_and_requeue_multiple_nack(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
@@ -346,7 +357,9 @@ subscribe_and_requeue_multiple_nack(Config) ->
multiple = true}),
wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]])
end
- end.
+ end,
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
consume_and_requeue_nack(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
@@ -360,7 +373,9 @@ consume_and_requeue_nack(Config) ->
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
multiple = false,
requeue = true}),
- wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]).
+ wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
consume_and_nack(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
@@ -374,7 +389,9 @@ consume_and_nack(Config) ->
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
multiple = false,
requeue = false}),
- wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]).
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
consume_and_requeue_multiple_nack(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
@@ -388,7 +405,9 @@ consume_and_requeue_multiple_nack(Config) ->
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
multiple = true,
requeue = true}),
- wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]).
+ wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
consume_and_multiple_nack(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
@@ -402,7 +421,9 @@ consume_and_multiple_nack(Config) ->
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
multiple = true,
requeue = false}),
- wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]).
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
consumer_timeout(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
@@ -419,13 +440,39 @@ consumer_timeout(Config) ->
#'basic.cancel'{ } ->
wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
ok
- after 20000 ->
+ after 30000 ->
flush(1),
exit(cancel_never_happened)
end
after 5000 ->
exit(deliver_timeout)
- end.
+ end,
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
+
+consumer_timeout_basic_get(Config) ->
+ {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+ publish(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ [_DelTag] = consume(Ch, QName, [<<"msg1">>]),
+ erlang:monitor(process, Conn),
+ erlang:monitor(process, Ch),
+ receive
+ {'DOWN', _, process, Ch, _} -> ok
+ after 30000 ->
+ flush(1),
+ exit(channel_exit_expected)
+ end,
+ receive
+ {'DOWN', _, process, Conn, _} ->
+ flush(1),
+ exit(unexpected_connection_exit)
+ after 2000 ->
+ ok
+ end,
+ ok.
subscribe_and_requeue_nack(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
@@ -449,7 +496,9 @@ subscribe_and_requeue_nack(Config) ->
amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1}),
wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]])
end
- end.
+ end,
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
subscribe_and_nack(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
@@ -467,7 +516,9 @@ subscribe_and_nack(Config) ->
multiple = false,
requeue = false}),
wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]])
- end.
+ end,
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
subscribe_and_multiple_nack(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
@@ -487,7 +538,9 @@ subscribe_and_multiple_nack(Config) ->
multiple = true,
requeue = false}),
wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]])
- end.
+ end,
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
%% TODO test with single active
basic_cancel(Config) ->
@@ -498,11 +551,12 @@ basic_cancel(Config) ->
publish(Ch, QName, [<<"msg1">>]),
wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
- subscribe(Ch, QName, false),
+ CTag = atom_to_binary(?FUNCTION_NAME, utf8),
+ subscribe(Ch, QName, false, CTag),
receive
{#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
- amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = <<"ctag">>}),
+ amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag}),
Consumers = rpc:call(Server, rabbit_amqqueue, consumers_all, [<<"/">>]),
wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
?assertEqual([], lists:filter(fun(Props) ->
@@ -515,7 +569,9 @@ basic_cancel(Config) ->
wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]])
after 5000 ->
exit(basic_deliver_timeout)
- end.
+ end,
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
purge(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
@@ -527,7 +583,9 @@ purge(Config) ->
[_] = consume(Ch, QName, [<<"msg1">>]),
wait_for_messages(Config, [[QName, <<"2">>, <<"1">>, <<"1">>]]),
{'queue.purge_ok', 1} = amqp_channel:call(Ch, #'queue.purge'{queue = QName}),
- wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]).
+ wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
basic_recover(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
@@ -539,7 +597,9 @@ basic_recover(Config) ->
[_] = consume(Ch, QName, [<<"msg1">>]),
wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
amqp_channel:cast(Ch, #'basic.recover'{requeue = true}),
- wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]).
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
delete_immediately_by_pid_fails(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
@@ -557,7 +617,9 @@ delete_immediately_by_pid_fails(Config) ->
durable = Durable,
passive = true,
auto_delete = false,
- arguments = Args})).
+ arguments = Args})),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
delete_immediately_by_pid_succeeds(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
@@ -576,7 +638,9 @@ delete_immediately_by_pid_succeeds(Config) ->
durable = Durable,
passive = true,
auto_delete = false,
- arguments = Args})).
+ arguments = Args})),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
delete_immediately_by_resource(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
@@ -594,7 +658,9 @@ delete_immediately_by_resource(Config) ->
durable = Durable,
passive = true,
auto_delete = false,
- arguments = Args})).
+ arguments = Args})),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
%%%%%%%%%%%%%%%%%%%%%%%%
%% Test helpers
@@ -626,12 +692,15 @@ consume_empty(Ch, QName) ->
amqp_channel:call(Ch, #'basic.get'{queue = QName})).
subscribe(Ch, Queue, NoAck) ->
+ subscribe(Ch, Queue, NoAck, <<"ctag">>).
+
+subscribe(Ch, Queue, NoAck, Ctag) ->
amqp_channel:subscribe(Ch, #'basic.consume'{queue = Queue,
no_ack = NoAck,
- consumer_tag = <<"ctag">>},
+ consumer_tag = Ctag},
self()),
receive
- #'basic.consume_ok'{consumer_tag = <<"ctag">>} ->
+ #'basic.consume_ok'{consumer_tag = Ctag} ->
ok
end.