diff options
| author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2019-07-01 11:42:13 +0200 |
|---|---|---|
| committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2019-07-01 11:42:13 +0200 |
| commit | fa685982d6c11ac6aa27177726480e18005e0258 (patch) | |
| tree | c5fb8962fe940595460a4b97d917d047b9a9b314 /test | |
| parent | 2c439dcbe22443d1f34b267e5870ec3f7249b67c (diff) | |
| parent | 580896422de202857ab039e64c0cc7fcd5916c9c (diff) | |
| download | rabbitmq-server-git-fa685982d6c11ac6aa27177726480e18005e0258.tar.gz | |
Merge branch 'master' into oauth2-credential-expiration-support
Diffstat (limited to 'test')
| -rw-r--r-- | test/simple_ha_SUITE.erl | 70 |
1 files changed, 69 insertions, 1 deletions
diff --git a/test/simple_ha_SUITE.erl b/test/simple_ha_SUITE.erl index b2caff86a9..d2137a686b 100644 --- a/test/simple_ha_SUITE.erl +++ b/test/simple_ha_SUITE.erl @@ -18,6 +18,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). +-include_lib("eunit/include/eunit.hrl"). -compile(export_all). @@ -39,7 +40,8 @@ groups() -> {cluster_size_2, [], [ rapid_redeclare, declare_synchrony, - clean_up_exclusive_queues + clean_up_exclusive_queues, + clean_up_and_redeclare_exclusive_queues_on_other_nodes ]}, {cluster_size_3, [], [ consume_survives_stop, @@ -160,6 +162,43 @@ clean_up_exclusive_queues(Config) -> [[],[]] = rabbit_ct_broker_helpers:rpc_all(Config, rabbit_amqqueue, list, []), ok. +clean_up_and_redeclare_exclusive_queues_on_other_nodes(Config) -> + QueueCount = 10, + QueueNames = lists:map(fun(N) -> + NBin = erlang:integer_to_binary(N), + <<"exclusive-q-", NBin/binary>> + end, lists:seq(1, QueueCount)), + [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, A), + {ok, Ch} = amqp_connection:open_channel(Conn), + + LocationMinMasters = [ + {<<"x-queue-master-locator">>, longstr, <<"min-masters">>} + ], + lists:foreach(fun(QueueName) -> + declare_exclusive(Ch, QueueName, LocationMinMasters), + subscribe(Ch, QueueName) + end, QueueNames), + + ok = rabbit_ct_broker_helpers:kill_node(Config, B), + + Cancels = receive_cancels([]), + ?assert(length(Cancels) > 0), + + RemaniningQueues = rabbit_ct_broker_helpers:rpc(Config, A, rabbit_amqqueue, list, []), + + ?assertEqual(length(RemaniningQueues), QueueCount - length(Cancels)), + + lists:foreach(fun(QueueName) -> + declare_exclusive(Ch, QueueName, LocationMinMasters), + true = rabbit_ct_client_helpers:publish(Ch, QueueName, 1), + subscribe(Ch, QueueName) + end, QueueNames), + Messages = receive_messages([]), + ?assertEqual(10, length(Messages)), + ok = rabbit_ct_client_helpers:close_connection(Conn). + + consume_survives_stop(Cf) -> consume_survives(Cf, fun stop/2, true). consume_survives_sigkill(Cf) -> consume_survives(Cf, fun sigkill/2, true). consume_survives_policy(Cf) -> consume_survives(Cf, fun policy/2, true). @@ -286,3 +325,32 @@ open_incapable_channel(NodePort) -> client_properties = Props}), {ok, Ch} = amqp_connection:open_channel(ConsConn), Ch. + +declare_exclusive(Ch, QueueName, Args) -> + Declare = #'queue.declare'{queue = QueueName, + exclusive = true, + arguments = Args + }, + #'queue.declare_ok'{} = amqp_channel:call(Ch, Declare). + +subscribe(Ch, QueueName) -> + ConsumeOk = amqp_channel:call(Ch, #'basic.consume'{queue = QueueName, + no_ack = true}), + #'basic.consume_ok'{} = ConsumeOk, + receive ConsumeOk -> ok after ?DELAY -> throw(consume_ok_timeout) end. + +receive_cancels(Cancels) -> + receive + #'basic.cancel'{} = C -> + receive_cancels([C|Cancels]) + after ?DELAY -> + Cancels + end. + +receive_messages(All) -> + receive + {#'basic.deliver'{}, Msg} -> + receive_messages([Msg|All]) + after ?DELAY -> + lists:reverse(All) + end. |
