diff options
| author | kjnilsson <knilsson@pivotal.io> | 2019-04-03 14:53:01 +0100 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2019-04-23 14:36:10 +0100 |
| commit | 04e4d0b5ba486abb16e873b4d3df6e8792b009e6 (patch) | |
| tree | e468d787ee460d549d38f57b80290d4d37f31975 /test | |
| parent | 9d898823c30ff0e56d97053764b2708a299c46eb (diff) | |
| download | rabbitmq-server-git-04e4d0b5ba486abb16e873b4d3df6e8792b009e6.tar.gz | |
Implement consumer channel timeouts
Such that if a consumer has a message awaiting ack for longer than this
timeout the consumer will either be cancelled (if the client supports
it) or the channel will be closed to ensure the message does not get
stuck permanently.
Diffstat (limited to 'test')
| -rw-r--r-- | test/queue_parallel_SUITE.erl | 50 |
1 files changed, 42 insertions, 8 deletions
diff --git a/test/queue_parallel_SUITE.erl b/test/queue_parallel_SUITE.erl index eba0965608..dc2678ba10 100644 --- a/test/queue_parallel_SUITE.erl +++ b/test/queue_parallel_SUITE.erl @@ -52,6 +52,7 @@ groups() -> consume_and_nack, consume_and_requeue_multiple_nack, consume_and_multiple_nack, + consumer_timeout, basic_cancel, purge, basic_recover, @@ -129,19 +130,21 @@ init_per_group(mirrored_queue, Config) -> {queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]}, {queue_durable, true}]), rabbit_ct_helpers:run_steps(Config1, []); -init_per_group(Group, Config) -> +init_per_group(Group, Config0) -> case lists:member({group, Group}, all()) of true -> ClusterSize = 2, - Config1 = rabbit_ct_helpers:set_config(Config, [ - {rmq_nodename_suffix, Group}, - {rmq_nodes_count, ClusterSize} - ]), + Config = rabbit_ct_helpers:merge_app_env( + Config0, {rabbit, [{consumer_timeout, 5000}]}), + Config1 = rabbit_ct_helpers:set_config( + Config, [ {rmq_nodename_suffix, Group}, + {rmq_nodes_count, ClusterSize} + ]), rabbit_ct_helpers:run_steps(Config1, - rabbit_ct_broker_helpers:setup_steps() ++ - rabbit_ct_client_helpers:setup_steps()); + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()); false -> - rabbit_ct_helpers:run_steps(Config, []) + rabbit_ct_helpers:run_steps(Config0, []) end. end_per_group(Group, Config) -> @@ -401,6 +404,29 @@ consume_and_multiple_nack(Config) -> requeue = false}), wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]). +consumer_timeout(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + subscribe(Ch, QName, false), + receive + {#'basic.deliver'{delivery_tag = _, + redelivered = false}, _} -> + %% do nothing with the delivery should trigger timeout + receive + #'basic.cancel'{ } -> + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + ok + after 20000 -> + flush(1), + exit(cancel_never_happened) + end + after 5000 -> + exit(deliver_timeout) + end. + subscribe_and_requeue_nack(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), QName = ?config(queue_name, Config), @@ -614,3 +640,11 @@ receive_basic_deliver(Redelivered) -> {#'basic.deliver'{redelivered = R}, _} when R == Redelivered -> ok end. + +flush(T) -> + receive X -> + ct:pal("flushed ~w", [X]), + flush(T) + after T -> + ok + end. |
