summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorLajos Gerecs <lajos.gerecs@erlang-solutions.com>2019-06-17 14:54:13 +0200
committerLajos Gerecs <lajos.gerecs@erlang-solutions.com>2019-06-21 14:40:44 +0200
commit6d39f9efc4b3737e489ab67476925f4900852801 (patch)
treea393a519e5cc78df0604bbfb0b10e51c5a6692cf /test
parent9a371273abf8877b8cde4b6df5a2746fa035349c (diff)
downloadrabbitmq-server-git-6d39f9efc4b3737e489ab67476925f4900852801.tar.gz
add test for cleanup of exclusive queues in case of restart or partition
When using different than client-local queues exclusive ones can be placed on a different host than the client. When there is a partition or restart these queues are stuck because they are not cleaned up in the nodes as the connection is still alive when the node goes offline, however the queue is dead in the partition. These queues can not be redeclared or consumed on 3.7.x. Related: #2039
Diffstat (limited to 'test')
-rw-r--r--test/simple_ha_SUITE.erl70
1 files changed, 69 insertions, 1 deletions
diff --git a/test/simple_ha_SUITE.erl b/test/simple_ha_SUITE.erl
index b2caff86a9..d2137a686b 100644
--- a/test/simple_ha_SUITE.erl
+++ b/test/simple_ha_SUITE.erl
@@ -18,6 +18,7 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
+-include_lib("eunit/include/eunit.hrl").
-compile(export_all).
@@ -39,7 +40,8 @@ groups() ->
{cluster_size_2, [], [
rapid_redeclare,
declare_synchrony,
- clean_up_exclusive_queues
+ clean_up_exclusive_queues,
+ clean_up_and_redeclare_exclusive_queues_on_other_nodes
]},
{cluster_size_3, [], [
consume_survives_stop,
@@ -160,6 +162,43 @@ clean_up_exclusive_queues(Config) ->
[[],[]] = rabbit_ct_broker_helpers:rpc_all(Config, rabbit_amqqueue, list, []),
ok.
+clean_up_and_redeclare_exclusive_queues_on_other_nodes(Config) ->
+ QueueCount = 10,
+ QueueNames = lists:map(fun(N) ->
+ NBin = erlang:integer_to_binary(N),
+ <<"exclusive-q-", NBin/binary>>
+ end, lists:seq(1, QueueCount)),
+ [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, A),
+ {ok, Ch} = amqp_connection:open_channel(Conn),
+
+ LocationMinMasters = [
+ {<<"x-queue-master-locator">>, longstr, <<"min-masters">>}
+ ],
+ lists:foreach(fun(QueueName) ->
+ declare_exclusive(Ch, QueueName, LocationMinMasters),
+ subscribe(Ch, QueueName)
+ end, QueueNames),
+
+ ok = rabbit_ct_broker_helpers:kill_node(Config, B),
+
+ Cancels = receive_cancels([]),
+ ?assert(length(Cancels) > 0),
+
+ RemaniningQueues = rabbit_ct_broker_helpers:rpc(Config, A, rabbit_amqqueue, list, []),
+
+ ?assertEqual(length(RemaniningQueues), QueueCount - length(Cancels)),
+
+ lists:foreach(fun(QueueName) ->
+ declare_exclusive(Ch, QueueName, LocationMinMasters),
+ true = rabbit_ct_client_helpers:publish(Ch, QueueName, 1),
+ subscribe(Ch, QueueName)
+ end, QueueNames),
+ Messages = receive_messages([]),
+ ?assertEqual(10, length(Messages)),
+ ok = rabbit_ct_client_helpers:close_connection(Conn).
+
+
consume_survives_stop(Cf) -> consume_survives(Cf, fun stop/2, true).
consume_survives_sigkill(Cf) -> consume_survives(Cf, fun sigkill/2, true).
consume_survives_policy(Cf) -> consume_survives(Cf, fun policy/2, true).
@@ -286,3 +325,32 @@ open_incapable_channel(NodePort) ->
client_properties = Props}),
{ok, Ch} = amqp_connection:open_channel(ConsConn),
Ch.
+
+declare_exclusive(Ch, QueueName, Args) ->
+ Declare = #'queue.declare'{queue = QueueName,
+ exclusive = true,
+ arguments = Args
+ },
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, Declare).
+
+subscribe(Ch, QueueName) ->
+ ConsumeOk = amqp_channel:call(Ch, #'basic.consume'{queue = QueueName,
+ no_ack = true}),
+ #'basic.consume_ok'{} = ConsumeOk,
+ receive ConsumeOk -> ok after ?DELAY -> throw(consume_ok_timeout) end.
+
+receive_cancels(Cancels) ->
+ receive
+ #'basic.cancel'{} = C ->
+ receive_cancels([C|Cancels])
+ after ?DELAY ->
+ Cancels
+ end.
+
+receive_messages(All) ->
+ receive
+ {#'basic.deliver'{}, Msg} ->
+ receive_messages([Msg|All])
+ after ?DELAY ->
+ lists:reverse(All)
+ end.