summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2019-07-01 11:42:13 +0200
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2019-07-01 11:42:13 +0200
commitfa685982d6c11ac6aa27177726480e18005e0258 (patch)
treec5fb8962fe940595460a4b97d917d047b9a9b314 /test
parent2c439dcbe22443d1f34b267e5870ec3f7249b67c (diff)
parent580896422de202857ab039e64c0cc7fcd5916c9c (diff)
downloadrabbitmq-server-git-fa685982d6c11ac6aa27177726480e18005e0258.tar.gz
Merge branch 'master' into oauth2-credential-expiration-support
Diffstat (limited to 'test')
-rw-r--r--test/simple_ha_SUITE.erl70
1 files changed, 69 insertions, 1 deletions
diff --git a/test/simple_ha_SUITE.erl b/test/simple_ha_SUITE.erl
index b2caff86a9..d2137a686b 100644
--- a/test/simple_ha_SUITE.erl
+++ b/test/simple_ha_SUITE.erl
@@ -18,6 +18,7 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
+-include_lib("eunit/include/eunit.hrl").
-compile(export_all).
@@ -39,7 +40,8 @@ groups() ->
{cluster_size_2, [], [
rapid_redeclare,
declare_synchrony,
- clean_up_exclusive_queues
+ clean_up_exclusive_queues,
+ clean_up_and_redeclare_exclusive_queues_on_other_nodes
]},
{cluster_size_3, [], [
consume_survives_stop,
@@ -160,6 +162,43 @@ clean_up_exclusive_queues(Config) ->
[[],[]] = rabbit_ct_broker_helpers:rpc_all(Config, rabbit_amqqueue, list, []),
ok.
+clean_up_and_redeclare_exclusive_queues_on_other_nodes(Config) ->
+ QueueCount = 10,
+ QueueNames = lists:map(fun(N) ->
+ NBin = erlang:integer_to_binary(N),
+ <<"exclusive-q-", NBin/binary>>
+ end, lists:seq(1, QueueCount)),
+ [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, A),
+ {ok, Ch} = amqp_connection:open_channel(Conn),
+
+ LocationMinMasters = [
+ {<<"x-queue-master-locator">>, longstr, <<"min-masters">>}
+ ],
+ lists:foreach(fun(QueueName) ->
+ declare_exclusive(Ch, QueueName, LocationMinMasters),
+ subscribe(Ch, QueueName)
+ end, QueueNames),
+
+ ok = rabbit_ct_broker_helpers:kill_node(Config, B),
+
+ Cancels = receive_cancels([]),
+ ?assert(length(Cancels) > 0),
+
+ RemaniningQueues = rabbit_ct_broker_helpers:rpc(Config, A, rabbit_amqqueue, list, []),
+
+ ?assertEqual(length(RemaniningQueues), QueueCount - length(Cancels)),
+
+ lists:foreach(fun(QueueName) ->
+ declare_exclusive(Ch, QueueName, LocationMinMasters),
+ true = rabbit_ct_client_helpers:publish(Ch, QueueName, 1),
+ subscribe(Ch, QueueName)
+ end, QueueNames),
+ Messages = receive_messages([]),
+ ?assertEqual(10, length(Messages)),
+ ok = rabbit_ct_client_helpers:close_connection(Conn).
+
+
consume_survives_stop(Cf) -> consume_survives(Cf, fun stop/2, true).
consume_survives_sigkill(Cf) -> consume_survives(Cf, fun sigkill/2, true).
consume_survives_policy(Cf) -> consume_survives(Cf, fun policy/2, true).
@@ -286,3 +325,32 @@ open_incapable_channel(NodePort) ->
client_properties = Props}),
{ok, Ch} = amqp_connection:open_channel(ConsConn),
Ch.
+
+declare_exclusive(Ch, QueueName, Args) ->
+ Declare = #'queue.declare'{queue = QueueName,
+ exclusive = true,
+ arguments = Args
+ },
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, Declare).
+
+subscribe(Ch, QueueName) ->
+ ConsumeOk = amqp_channel:call(Ch, #'basic.consume'{queue = QueueName,
+ no_ack = true}),
+ #'basic.consume_ok'{} = ConsumeOk,
+ receive ConsumeOk -> ok after ?DELAY -> throw(consume_ok_timeout) end.
+
+receive_cancels(Cancels) ->
+ receive
+ #'basic.cancel'{} = C ->
+ receive_cancels([C|Cancels])
+ after ?DELAY ->
+ Cancels
+ end.
+
+receive_messages(All) ->
+ receive
+ {#'basic.deliver'{}, Msg} ->
+ receive_messages([Msg|All])
+ after ?DELAY ->
+ lists:reverse(All)
+ end.