summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/queue_parallel_SUITE.erl50
1 files changed, 42 insertions, 8 deletions
diff --git a/test/queue_parallel_SUITE.erl b/test/queue_parallel_SUITE.erl
index eba0965608..dc2678ba10 100644
--- a/test/queue_parallel_SUITE.erl
+++ b/test/queue_parallel_SUITE.erl
@@ -52,6 +52,7 @@ groups() ->
consume_and_nack,
consume_and_requeue_multiple_nack,
consume_and_multiple_nack,
+ consumer_timeout,
basic_cancel,
purge,
basic_recover,
@@ -129,19 +130,21 @@ init_per_group(mirrored_queue, Config) ->
{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
{queue_durable, true}]),
rabbit_ct_helpers:run_steps(Config1, []);
-init_per_group(Group, Config) ->
+init_per_group(Group, Config0) ->
case lists:member({group, Group}, all()) of
true ->
ClusterSize = 2,
- Config1 = rabbit_ct_helpers:set_config(Config, [
- {rmq_nodename_suffix, Group},
- {rmq_nodes_count, ClusterSize}
- ]),
+ Config = rabbit_ct_helpers:merge_app_env(
+ Config0, {rabbit, [{consumer_timeout, 5000}]}),
+ Config1 = rabbit_ct_helpers:set_config(
+ Config, [ {rmq_nodename_suffix, Group},
+ {rmq_nodes_count, ClusterSize}
+ ]),
rabbit_ct_helpers:run_steps(Config1,
- rabbit_ct_broker_helpers:setup_steps() ++
- rabbit_ct_client_helpers:setup_steps());
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps());
false ->
- rabbit_ct_helpers:run_steps(Config, [])
+ rabbit_ct_helpers:run_steps(Config0, [])
end.
end_per_group(Group, Config) ->
@@ -401,6 +404,29 @@ consume_and_multiple_nack(Config) ->
requeue = false}),
wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]).
+consumer_timeout(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+ publish(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ subscribe(Ch, QName, false),
+ receive
+ {#'basic.deliver'{delivery_tag = _,
+ redelivered = false}, _} ->
+ %% do nothing with the delivery should trigger timeout
+ receive
+ #'basic.cancel'{ } ->
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ ok
+ after 20000 ->
+ flush(1),
+ exit(cancel_never_happened)
+ end
+ after 5000 ->
+ exit(deliver_timeout)
+ end.
+
subscribe_and_requeue_nack(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
QName = ?config(queue_name, Config),
@@ -614,3 +640,11 @@ receive_basic_deliver(Redelivered) ->
{#'basic.deliver'{redelivered = R}, _} when R == Redelivered ->
ok
end.
+
+flush(T) ->
+ receive X ->
+ ct:pal("flushed ~w", [X]),
+ flush(T)
+ after T ->
+ ok
+ end.