summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorDiana Corbacho <diana@rabbitmq.com>2019-07-04 13:01:57 +0100
committerDiana Corbacho <diana@rabbitmq.com>2019-07-04 13:01:57 +0100
commitf1fe6001f2fa0a28d59ebd5a6197c38753a7b4c6 (patch)
tree645e91bdacbe3f2345bbf4e2db965b9824675a91 /test
parent2170396194b3c0cbab6146d508f7bd5e669bdc39 (diff)
parent33a7f97c4a471541adf05368d92862af4087c4a2 (diff)
downloadrabbitmq-server-git-f1fe6001f2fa0a28d59ebd5a6197c38753a7b4c6.tar.gz
Merge branch 'master' into qq-sync-op-fixes
Diffstat (limited to 'test')
-rw-r--r--test/channel_source_SUITE.erl13
-rw-r--r--test/quorum_queue_SUITE.erl12
-rw-r--r--test/simple_ha_SUITE.erl70
-rw-r--r--test/unit_SUITE.erl8
4 files changed, 94 insertions, 9 deletions
diff --git a/test/channel_source_SUITE.erl b/test/channel_source_SUITE.erl
index 5211dd7510..d56e9fed0f 100644
--- a/test/channel_source_SUITE.erl
+++ b/test/channel_source_SUITE.erl
@@ -135,8 +135,19 @@ undefined_channel_source(Config) ->
undefined_channel_source1(_Config) ->
ExistingChannels = rabbit_channel:list(),
{_Writer, _Limiter, ServerCh} = rabbit_ct_broker_helpers:test_channel(),
- [ServerCh] = rabbit_channel:list() -- ExistingChannels,
+ wait_for_server_channel(ExistingChannels, ServerCh, 60),
[{source, undefined}] = rabbit_channel:info(ServerCh, [source]),
_ = rabbit_channel:source(ServerCh, ?MODULE),
[{source, ?MODULE}] = rabbit_channel:info(ServerCh, [source]),
passed.
+
+wait_for_server_channel(ExistingChannels, ServerCh, 0) ->
+ [ServerCh] = rabbit_channel:list() -- ExistingChannels;
+wait_for_server_channel(ExistingChannels, ServerCh, Attempts) ->
+ case rabbit_channel:list() -- ExistingChannels of
+ [ServerCh] ->
+ ok;
+ _ ->
+ timer:sleep(1000),
+ wait_for_server_channel(ExistingChannels, ServerCh, Attempts - 1)
+ end.
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 1d9789fe89..cc0e7aa75a 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -1222,7 +1222,7 @@ add_member_not_running(Config) ->
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
?assertEqual({error, node_not_running},
rpc:call(Server, rabbit_quorum_queue, add_member,
- [<<"/">>, QQ, 'rabbit@burrow'])).
+ [<<"/">>, QQ, 'rabbit@burrow', 5000])).
add_member_classic(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -1231,7 +1231,7 @@ add_member_classic(Config) ->
?assertEqual({'queue.declare_ok', CQ, 0, 0}, declare(Ch, CQ, [])),
?assertEqual({error, classic_queue_not_supported},
rpc:call(Server, rabbit_quorum_queue, add_member,
- [<<"/">>, CQ, Server])).
+ [<<"/">>, CQ, Server, 5000])).
add_member_already_a_member(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -1242,14 +1242,14 @@ add_member_already_a_member(Config) ->
%% idempotent by design
?assertEqual(ok,
rpc:call(Server, rabbit_quorum_queue, add_member,
- [<<"/">>, QQ, Server])).
+ [<<"/">>, QQ, Server, 5000])).
add_member_not_found(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
QQ = ?config(queue_name, Config),
?assertEqual({error, not_found},
rpc:call(Server, rabbit_quorum_queue, add_member,
- [<<"/">>, QQ, Server])).
+ [<<"/">>, QQ, Server, 5000])).
add_member(Config) ->
[Server0, Server1] = Servers0 =
@@ -1260,12 +1260,12 @@ add_member(Config) ->
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
?assertEqual({error, node_not_running},
rpc:call(Server0, rabbit_quorum_queue, add_member,
- [<<"/">>, QQ, Server1])),
+ [<<"/">>, QQ, Server1, 5000])),
ok = rabbit_control_helper:command(stop_app, Server1),
ok = rabbit_control_helper:command(join_cluster, Server1, [atom_to_list(Server0)], []),
rabbit_control_helper:command(start_app, Server1),
?assertEqual(ok, rpc:call(Server0, rabbit_quorum_queue, add_member,
- [<<"/">>, QQ, Server1])),
+ [<<"/">>, QQ, Server1, 5000])),
Info = rpc:call(Server0, rabbit_quorum_queue, infos,
[rabbit_misc:r(<<"/">>, queue, QQ)]),
Servers = lists:sort(Servers0),
diff --git a/test/simple_ha_SUITE.erl b/test/simple_ha_SUITE.erl
index b2caff86a9..d2137a686b 100644
--- a/test/simple_ha_SUITE.erl
+++ b/test/simple_ha_SUITE.erl
@@ -18,6 +18,7 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
+-include_lib("eunit/include/eunit.hrl").
-compile(export_all).
@@ -39,7 +40,8 @@ groups() ->
{cluster_size_2, [], [
rapid_redeclare,
declare_synchrony,
- clean_up_exclusive_queues
+ clean_up_exclusive_queues,
+ clean_up_and_redeclare_exclusive_queues_on_other_nodes
]},
{cluster_size_3, [], [
consume_survives_stop,
@@ -160,6 +162,43 @@ clean_up_exclusive_queues(Config) ->
[[],[]] = rabbit_ct_broker_helpers:rpc_all(Config, rabbit_amqqueue, list, []),
ok.
+clean_up_and_redeclare_exclusive_queues_on_other_nodes(Config) ->
+ QueueCount = 10,
+ QueueNames = lists:map(fun(N) ->
+ NBin = erlang:integer_to_binary(N),
+ <<"exclusive-q-", NBin/binary>>
+ end, lists:seq(1, QueueCount)),
+ [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, A),
+ {ok, Ch} = amqp_connection:open_channel(Conn),
+
+ LocationMinMasters = [
+ {<<"x-queue-master-locator">>, longstr, <<"min-masters">>}
+ ],
+ lists:foreach(fun(QueueName) ->
+ declare_exclusive(Ch, QueueName, LocationMinMasters),
+ subscribe(Ch, QueueName)
+ end, QueueNames),
+
+ ok = rabbit_ct_broker_helpers:kill_node(Config, B),
+
+ Cancels = receive_cancels([]),
+ ?assert(length(Cancels) > 0),
+
+ RemaniningQueues = rabbit_ct_broker_helpers:rpc(Config, A, rabbit_amqqueue, list, []),
+
+ ?assertEqual(length(RemaniningQueues), QueueCount - length(Cancels)),
+
+ lists:foreach(fun(QueueName) ->
+ declare_exclusive(Ch, QueueName, LocationMinMasters),
+ true = rabbit_ct_client_helpers:publish(Ch, QueueName, 1),
+ subscribe(Ch, QueueName)
+ end, QueueNames),
+ Messages = receive_messages([]),
+ ?assertEqual(10, length(Messages)),
+ ok = rabbit_ct_client_helpers:close_connection(Conn).
+
+
consume_survives_stop(Cf) -> consume_survives(Cf, fun stop/2, true).
consume_survives_sigkill(Cf) -> consume_survives(Cf, fun sigkill/2, true).
consume_survives_policy(Cf) -> consume_survives(Cf, fun policy/2, true).
@@ -286,3 +325,32 @@ open_incapable_channel(NodePort) ->
client_properties = Props}),
{ok, Ch} = amqp_connection:open_channel(ConsConn),
Ch.
+
+declare_exclusive(Ch, QueueName, Args) ->
+ Declare = #'queue.declare'{queue = QueueName,
+ exclusive = true,
+ arguments = Args
+ },
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, Declare).
+
+subscribe(Ch, QueueName) ->
+ ConsumeOk = amqp_channel:call(Ch, #'basic.consume'{queue = QueueName,
+ no_ack = true}),
+ #'basic.consume_ok'{} = ConsumeOk,
+ receive ConsumeOk -> ok after ?DELAY -> throw(consume_ok_timeout) end.
+
+receive_cancels(Cancels) ->
+ receive
+ #'basic.cancel'{} = C ->
+ receive_cancels([C|Cancels])
+ after ?DELAY ->
+ Cancels
+ end.
+
+receive_messages(All) ->
+ receive
+ {#'basic.deliver'{}, Msg} ->
+ receive_messages([Msg|All])
+ after ?DELAY ->
+ lists:reverse(All)
+ end.
diff --git a/test/unit_SUITE.erl b/test/unit_SUITE.erl
index b19289e818..6582c24b1e 100644
--- a/test/unit_SUITE.erl
+++ b/test/unit_SUITE.erl
@@ -32,7 +32,9 @@ all() ->
groups() ->
[
{parallel_tests, [parallel], [
- auth_backend_internal_expand_topic_permission,
+ {access_control, [parallel], [
+ auth_backend_internal_expand_topic_permission
+ ]},
{basic_header_handling, [parallel], [
write_table_with_invalid_existing_type,
invalid_existing_headers,
@@ -893,6 +895,10 @@ listing_plugins_from_multiple_directories(Config) ->
end,
ok.
+%%
+%% Access Control
+%%
+
auth_backend_internal_expand_topic_permission(_Config) ->
ExpandMap = #{<<"username">> => <<"guest">>, <<"vhost">> => <<"default">>},
%% simple case