diff options
| author | Lajos Gerecs <lajos.gerecs@erlang-solutions.com> | 2019-06-17 14:54:13 +0200 |
|---|---|---|
| committer | Lajos Gerecs <lajos.gerecs@erlang-solutions.com> | 2019-06-21 14:40:44 +0200 |
| commit | 6d39f9efc4b3737e489ab67476925f4900852801 (patch) | |
| tree | a393a519e5cc78df0604bbfb0b10e51c5a6692cf /test | |
| parent | 9a371273abf8877b8cde4b6df5a2746fa035349c (diff) | |
| download | rabbitmq-server-git-6d39f9efc4b3737e489ab67476925f4900852801.tar.gz | |
add test for cleanup of exclusive queues in case of restart or partition
When using different than client-local queues exclusive ones can
be placed on a different host than the client. When there is a partition
or restart these queues are stuck because they are not cleaned up in
the nodes as the connection is still alive when the node goes offline,
however the queue is dead in the partition. These queues can not be
redeclared or consumed on 3.7.x.
Related: #2039
Diffstat (limited to 'test')
| -rw-r--r-- | test/simple_ha_SUITE.erl | 70 |
1 files changed, 69 insertions, 1 deletions
diff --git a/test/simple_ha_SUITE.erl b/test/simple_ha_SUITE.erl index b2caff86a9..d2137a686b 100644 --- a/test/simple_ha_SUITE.erl +++ b/test/simple_ha_SUITE.erl @@ -18,6 +18,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). +-include_lib("eunit/include/eunit.hrl"). -compile(export_all). @@ -39,7 +40,8 @@ groups() -> {cluster_size_2, [], [ rapid_redeclare, declare_synchrony, - clean_up_exclusive_queues + clean_up_exclusive_queues, + clean_up_and_redeclare_exclusive_queues_on_other_nodes ]}, {cluster_size_3, [], [ consume_survives_stop, @@ -160,6 +162,43 @@ clean_up_exclusive_queues(Config) -> [[],[]] = rabbit_ct_broker_helpers:rpc_all(Config, rabbit_amqqueue, list, []), ok. +clean_up_and_redeclare_exclusive_queues_on_other_nodes(Config) -> + QueueCount = 10, + QueueNames = lists:map(fun(N) -> + NBin = erlang:integer_to_binary(N), + <<"exclusive-q-", NBin/binary>> + end, lists:seq(1, QueueCount)), + [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, A), + {ok, Ch} = amqp_connection:open_channel(Conn), + + LocationMinMasters = [ + {<<"x-queue-master-locator">>, longstr, <<"min-masters">>} + ], + lists:foreach(fun(QueueName) -> + declare_exclusive(Ch, QueueName, LocationMinMasters), + subscribe(Ch, QueueName) + end, QueueNames), + + ok = rabbit_ct_broker_helpers:kill_node(Config, B), + + Cancels = receive_cancels([]), + ?assert(length(Cancels) > 0), + + RemaniningQueues = rabbit_ct_broker_helpers:rpc(Config, A, rabbit_amqqueue, list, []), + + ?assertEqual(length(RemaniningQueues), QueueCount - length(Cancels)), + + lists:foreach(fun(QueueName) -> + declare_exclusive(Ch, QueueName, LocationMinMasters), + true = rabbit_ct_client_helpers:publish(Ch, QueueName, 1), + subscribe(Ch, QueueName) + end, QueueNames), + Messages = receive_messages([]), + ?assertEqual(10, length(Messages)), + ok = rabbit_ct_client_helpers:close_connection(Conn). + + consume_survives_stop(Cf) -> consume_survives(Cf, fun stop/2, true). consume_survives_sigkill(Cf) -> consume_survives(Cf, fun sigkill/2, true). consume_survives_policy(Cf) -> consume_survives(Cf, fun policy/2, true). @@ -286,3 +325,32 @@ open_incapable_channel(NodePort) -> client_properties = Props}), {ok, Ch} = amqp_connection:open_channel(ConsConn), Ch. + +declare_exclusive(Ch, QueueName, Args) -> + Declare = #'queue.declare'{queue = QueueName, + exclusive = true, + arguments = Args + }, + #'queue.declare_ok'{} = amqp_channel:call(Ch, Declare). + +subscribe(Ch, QueueName) -> + ConsumeOk = amqp_channel:call(Ch, #'basic.consume'{queue = QueueName, + no_ack = true}), + #'basic.consume_ok'{} = ConsumeOk, + receive ConsumeOk -> ok after ?DELAY -> throw(consume_ok_timeout) end. + +receive_cancels(Cancels) -> + receive + #'basic.cancel'{} = C -> + receive_cancels([C|Cancels]) + after ?DELAY -> + Cancels + end. + +receive_messages(All) -> + receive + {#'basic.deliver'{}, Msg} -> + receive_messages([Msg|All]) + after ?DELAY -> + lists:reverse(All) + end. |
