diff options
| author | Jean-Sébastien Pédron <jean-sebastien@rabbitmq.com> | 2016-09-27 11:20:29 +0200 |
|---|---|---|
| committer | Jean-Sébastien Pédron <jean-sebastien@rabbitmq.com> | 2016-09-27 11:20:29 +0200 |
| commit | 25de0db933521881673c3889865815abc82b75e7 (patch) | |
| tree | a182ea38e97faed13bc0ea94a9bbe641b3377dcf /test | |
| parent | e7ae83f5cf81b103f3f1e46a64a1e0053dc4ba74 (diff) | |
| parent | a40b517c4f20be94bc007e83541e18a3da85b5f5 (diff) | |
| download | rabbitmq-server-git-25de0db933521881673c3889865815abc82b75e7.tar.gz | |
Merge branch 'stable' into rabbitmq-management-236
Diffstat (limited to 'test')
| -rw-r--r-- | test/dynamic_ha_SUITE.erl | 154 | ||||
| -rw-r--r-- | test/gm_SUITE.erl | 47 | ||||
| -rw-r--r-- | test/health_check_SUITE.erl | 19 | ||||
| -rw-r--r-- | test/inet_proxy_dist.erl | 201 | ||||
| -rw-r--r-- | test/inet_tcp_proxy.erl | 134 | ||||
| -rw-r--r-- | test/inet_tcp_proxy_manager.erl | 107 | ||||
| -rw-r--r-- | test/partitions_SUITE.erl | 23 | ||||
| -rw-r--r-- | test/rabbitmqctl_integration_SUITE.erl | 146 | ||||
| -rw-r--r-- | test/unit_SUITE.erl | 45 | ||||
| -rw-r--r-- | test/unit_inbroker_SUITE.erl | 8 |
10 files changed, 420 insertions, 464 deletions
diff --git a/test/dynamic_ha_SUITE.erl b/test/dynamic_ha_SUITE.erl index 5872d97d4c..bba7fad707 100644 --- a/test/dynamic_ha_SUITE.erl +++ b/test/dynamic_ha_SUITE.erl @@ -31,6 +31,7 @@ %% The first two are change_policy, the last two are change_cluster -include_lib("common_test/include/ct.hrl"). +-include_lib("proper/include/proper.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). @@ -61,6 +62,10 @@ groups() -> {cluster_size_3, [], [ change_policy, rapid_change + % FIXME: Re-enable those tests when the know issues are + % fixed. + %failing_random_policies, + %random_policy ]} ]} ]. @@ -137,7 +142,7 @@ change_policy(Config) -> assert_slaves(A, ?QNAME, {A, [C]}, [{A, [B, C]}]), %% Clear the policy, and we go back to non-mirrored - rabbit_ct_broker_helpers:clear_policy(Config, A, ?POLICY), + ok = rabbit_ct_broker_helpers:clear_policy(Config, A, ?POLICY), assert_slaves(A, ?QNAME, {A, ''}), %% Test switching "away" from an unmirrored node @@ -206,7 +211,7 @@ rapid_loop(Config, Node, MRef) -> after 0 -> rabbit_ct_broker_helpers:set_ha_policy(Config, Node, ?POLICY, <<"all">>), - rabbit_ct_broker_helpers:clear_policy(Config, Node, ?POLICY), + ok = rabbit_ct_broker_helpers:clear_policy(Config, Node, ?POLICY), rapid_loop(Config, Node, MRef) end. @@ -253,6 +258,23 @@ promote_on_shutdown(Config) -> durable = true}), ok. +random_policy(Config) -> + run_proper(fun prop_random_policy/1, [Config]). + +failing_random_policies(Config) -> + [A, B | _] = Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, + nodename), + %% Those set of policies were found as failing by PropEr in the + %% `random_policy` test above. We add them explicitely here to make + %% sure they get tested. + ?assertEqual(true, test_random_policy(Config, Nodes, + [{nodes, [A, B]}, {nodes, [A]}])), + ?assertEqual(true, test_random_policy(Config, Nodes, + [{exactly, 3}, undefined, all, {nodes, [B]}])), + ?assertEqual(true, test_random_policy(Config, Nodes, + [all, undefined, {exactly, 2}, all, {exactly, 3}, {exactly, 3}, + undefined, {exactly, 3}, all])). + %%---------------------------------------------------------------------------- assert_slaves(RPCNode, QName, Exp) -> @@ -327,3 +349,131 @@ get_stacktrace() -> _:e -> erlang:get_stacktrace() end. + +%%---------------------------------------------------------------------------- +run_proper(Fun, Args) -> + ?assertEqual(true, + proper:counterexample(erlang:apply(Fun, Args), + [{numtests, 25}, + {on_output, fun(F, A) -> ct:pal(?LOW_IMPORTANCE, F, A) end}])). + +prop_random_policy(Config) -> + Nodes = rabbit_ct_broker_helpers:get_node_configs( + Config, nodename), + ?FORALL( + Policies, non_empty(list(policy_gen(Nodes))), + test_random_policy(Config, Nodes, Policies)). + +test_random_policy(Config, Nodes, Policies) -> + [NodeA | _] = Nodes, + Ch = rabbit_ct_client_helpers:open_channel(Config, NodeA), + amqp_channel:call(Ch, #'queue.declare'{queue = ?QNAME}), + %% Add some load so mirrors can be busy synchronising + rabbit_ct_client_helpers:publish(Ch, ?QNAME, 100000), + %% Apply policies in parallel on all nodes + apply_in_parallel(Config, Nodes, Policies), + %% Give it some time to generate all internal notifications + timer:sleep(2000), + %% Check the result + Result = wait_for_last_policy(?QNAME, NodeA, Policies, 30), + %% Cleanup + amqp_channel:call(Ch, #'queue.delete'{queue = ?QNAME}), + _ = rabbit_ct_broker_helpers:clear_policy(Config, NodeA, ?POLICY), + Result. + +apply_in_parallel(Config, Nodes, Policies) -> + Self = self(), + [spawn_link(fun() -> + [begin + apply_policy(Config, N, Policy) + end || Policy <- Policies], + Self ! parallel_task_done + end) || N <- Nodes], + [receive + parallel_task_done -> + ok + end || _ <- Nodes]. + +%% Proper generators +policy_gen(Nodes) -> + %% Stop mirroring needs to be called often to trigger rabbitmq-server#803 + frequency([{3, undefined}, + {1, all}, + {1, {nodes, nodes_gen(Nodes)}}, + {1, {exactly, choose(1, 3)}} + ]). + +nodes_gen(Nodes) -> + ?LET(List, non_empty(list(oneof(Nodes))), + sets:to_list(sets:from_list(List))). + +%% Checks +wait_for_last_policy(QueueName, NodeA, TestedPolicies, Tries) -> + %% Ensure the owner/master is able to process a call request, + %% which means that all pending casts have been processed. + %% Use the information returned by owner/master to verify the + %% test result + Info = find_queue(QueueName, NodeA), + Pid = proplists:get_value(pid, Info), + Node = node(Pid), + %% Gets owner/master + case rpc:call(Node, gen_server, call, [Pid, info], 5000) of + {badrpc, _} -> + %% The queue is probably being migrated to another node. + %% Let's wait a bit longer. + timer:sleep(1000), + wait_for_last_policy(QueueName, NodeA, TestedPolicies, Tries - 1); + FinalInfo -> + %% The last policy is the final state + LastPolicy = lists:last(TestedPolicies), + case verify_policy(LastPolicy, FinalInfo) of + true -> + true; + false when Tries =:= 1 -> + Policies = rpc:call(Node, rabbit_policy, list, [], 5000), + ct:pal( + "Last policy not applied:~n" + " Queue node: ~s (~p)~n" + " Queue info: ~p~n" + " Configured policies: ~p~n" + " Tested policies: ~p", + [Node, Pid, FinalInfo, Policies, TestedPolicies]), + false; + false -> + timer:sleep(1000), + wait_for_last_policy(QueueName, NodeA, TestedPolicies, + Tries - 1) + end + end. + +verify_policy(undefined, Info) -> + %% If the queue is not mirrored, it returns '' + '' == proplists:get_value(slave_pids, Info); +verify_policy(all, Info) -> + 2 == length(proplists:get_value(slave_pids, Info)); +verify_policy({exactly, 1}, Info) -> + %% If the queue is mirrored, it returns a list + [] == proplists:get_value(slave_pids, Info); +verify_policy({exactly, N}, Info) -> + (N - 1) == length(proplists:get_value(slave_pids, Info)); +verify_policy({nodes, Nodes}, Info) -> + Master = node(proplists:get_value(pid, Info)), + Slaves = [node(P) || P <- proplists:get_value(slave_pids, Info)], + lists:sort(Nodes) == lists:sort([Master | Slaves]). + +%% Policies +apply_policy(Config, N, undefined) -> + _ = rabbit_ct_broker_helpers:clear_policy(Config, N, ?POLICY); +apply_policy(Config, N, all) -> + rabbit_ct_broker_helpers:set_ha_policy( + Config, N, ?POLICY, <<"all">>, + [{<<"ha-sync-mode">>, <<"automatic">>}]); +apply_policy(Config, N, {nodes, Nodes}) -> + NNodes = [rabbit_misc:atom_to_binary(Node) || Node <- Nodes], + rabbit_ct_broker_helpers:set_ha_policy( + Config, N, ?POLICY, {<<"nodes">>, NNodes}, + [{<<"ha-sync-mode">>, <<"automatic">>}]); +apply_policy(Config, N, {exactly, Exactly}) -> + rabbit_ct_broker_helpers:set_ha_policy( + Config, N, ?POLICY, {<<"exactly">>, Exactly}, + [{<<"ha-sync-mode">>, <<"automatic">>}]). diff --git a/test/gm_SUITE.erl b/test/gm_SUITE.erl index f5ccf75b70..df73d8ac27 100644 --- a/test/gm_SUITE.erl +++ b/test/gm_SUITE.erl @@ -38,7 +38,9 @@ all() -> broadcast, confirmed_broadcast, member_death, - receive_in_order + receive_in_order, + unexpected_msg, + down_in_members_change ]. init_per_suite(Config) -> @@ -114,6 +116,49 @@ receive_in_order(_Config) -> passed end). +unexpected_msg(_Config) -> + passed = with_two_members( + fun(Pid, _) -> + Pid ! {make_ref(), old_gen_server_answer}, + true = erlang:is_process_alive(Pid), + passed + end). + +down_in_members_change(_Config) -> + %% Setup + ok = gm:create_tables(), + {ok, Pid} = gm:start_link(?MODULE, ?MODULE, self(), + fun rabbit_misc:execute_mnesia_transaction/1), + passed = receive_joined(Pid, [Pid], timeout_joining_gm_group_1), + {ok, Pid2} = gm:start_link(?MODULE, ?MODULE, self(), + fun rabbit_misc:execute_mnesia_transaction/1), + passed = receive_joined(Pid2, [Pid, Pid2], timeout_joining_gm_group_2), + passed = receive_birth(Pid, Pid2, timeout_waiting_for_birth_2), + + %% Test. Simulate that the gm group is deleted (forget_group) while + %% processing the 'DOWN' message from the neighbour + process_flag(trap_exit, true), + ok = meck:new(mnesia, [passthrough]), + ok = meck:expect(mnesia, read, fun({gm_group, ?MODULE}) -> + []; + (Key) -> + meck:passthrough([Key]) + end), + gm:leave(Pid2), + Passed = receive + {'EXIT', Pid, shutdown} -> + passed; + {'EXIT', Pid, _} -> + crashed + after 15000 -> + timeout + end, + %% Cleanup + meck:unload(mnesia), + process_flag(trap_exit, false), + passed = Passed. + + do_broadcast(Fun) -> with_two_members(broadcast_fun(Fun)). diff --git a/test/health_check_SUITE.erl b/test/health_check_SUITE.erl index 4d8f56e9d3..50abc97a02 100644 --- a/test/health_check_SUITE.erl +++ b/test/health_check_SUITE.erl @@ -33,6 +33,8 @@ ,ignores_remote_alarms/1 ,detects_local_alarm/1 ,honors_timeout_argument/1 + ,detects_stuck_local_node_monitor/1 + ,ignores_stuck_remote_node_monitor/1 ]). all() -> @@ -47,6 +49,8 @@ groups() -> ,ignores_remote_alarms ,detects_local_alarm ,honors_timeout_argument + ,detects_stuck_local_node_monitor + ,ignores_stuck_remote_node_monitor ]}]. init_per_suite(Config) -> @@ -123,6 +127,21 @@ detects_local_alarm(Config) -> {match, _} = re:run(Str, "resource alarm.*in effect"), ok. +detects_stuck_local_node_monitor(Config) -> + [A|_] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + rabbit_ct_broker_helpers:rpc(Config, A, sys, suspend, [rabbit_node_monitor]), + {error, 75, Str} = rabbit_ct_broker_helpers:rabbitmqctl(Config, A, ["-t", "5", "node_health_check"]), + {match, _} = re:run(Str, "operation node_health_check.*timed out"), + resume_sys_process(Config, A, rabbit_node_monitor), + ok. + +ignores_stuck_remote_node_monitor(Config) -> + [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + rabbit_ct_broker_helpers:rpc(Config, A, sys, suspend, [rabbit_node_monitor]), + {ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(Config, B, ["-t", "5", "node_health_check"]), + resume_sys_process(Config, A, rabbit_node_monitor), + ok. + honors_timeout_argument(Config) -> [A|_] = open_channel_and_declare_queue_everywhere(Config), QPid = suspend_single_queue(Config, A), diff --git a/test/inet_proxy_dist.erl b/test/inet_proxy_dist.erl deleted file mode 100644 index 32b7641a79..0000000000 --- a/test/inet_proxy_dist.erl +++ /dev/null @@ -1,201 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License -%% at http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. -%% --module(inet_proxy_dist). - -%% A distribution plugin that uses the usual inet_tcp_dist but allows -%% insertion of a proxy at the receiving end. - -%% inet_*_dist "behaviour" --export([listen/1, accept/1, accept_connection/5, - setup/5, close/1, select/1, is_node_name/1]). - -%% For copypasta from inet_tcp_dist --export([do_setup/6]). --import(error_logger,[error_msg/2]). - --define(REAL, inet_tcp_dist). - -%%---------------------------------------------------------------------------- - -listen(Name) -> ?REAL:listen(Name). -select(Node) -> ?REAL:select(Node). -accept(Listen) -> ?REAL:accept(Listen). -close(Socket) -> ?REAL:close(Socket). -is_node_name(Node) -> ?REAL:is_node_name(Node). - -accept_connection(AcceptPid, Socket, MyNode, Allowed, SetupTime) -> - ?REAL:accept_connection(AcceptPid, Socket, MyNode, Allowed, SetupTime). - -%% This is copied from inet_tcp_dist, in order to change the -%% output of erl_epmd:port_please/2. - --include_lib("kernel/include/net_address.hrl"). --include_lib("kernel/include/dist_util.hrl"). - -setup(Node, Type, MyNode, LongOrShortNames,SetupTime) -> - spawn_opt(?MODULE, do_setup, - [self(), Node, Type, MyNode, LongOrShortNames, SetupTime], - [link, {priority, max}]). - -do_setup(Kernel, Node, Type, MyNode, LongOrShortNames,SetupTime) -> - ?trace("~p~n",[{inet_tcp_dist,self(),setup,Node}]), - [Name, Address] = splitnode(Node, LongOrShortNames), - case inet:getaddr(Address, inet) of - {ok, Ip} -> - Timer = dist_util:start_timer(SetupTime), - case erl_epmd:port_please(Name, Ip) of - {port, TcpPort, Version} -> - ?trace("port_please(~p) -> version ~p~n", - [Node,Version]), - dist_util:reset_timer(Timer), - %% Modification START - Ret = application:get_env(kernel, - dist_and_proxy_ports_map), - PortsMap = case Ret of - {ok, M} -> M; - undefined -> [] - end, - ProxyPort = case inet_tcp_proxy:is_enabled() of - true -> proplists:get_value(TcpPort, PortsMap, TcpPort); - false -> TcpPort - end, - case inet_tcp:connect(Ip, ProxyPort, - [{active, false}, - {packet,2}]) of - {ok, Socket} -> - {ok, {_, SrcPort}} = inet:sockname(Socket), - ok = inet_tcp_proxy_manager:register( - node(), Node, SrcPort, TcpPort, ProxyPort), - %% Modification END - HSData = #hs_data{ - kernel_pid = Kernel, - other_node = Node, - this_node = MyNode, - socket = Socket, - timer = Timer, - this_flags = 0, - other_version = Version, - f_send = fun inet_tcp:send/2, - f_recv = fun inet_tcp:recv/3, - f_setopts_pre_nodeup = - fun(S) -> - inet:setopts - (S, - [{active, false}, - {packet, 4}, - nodelay()]) - end, - f_setopts_post_nodeup = - fun(S) -> - inet:setopts - (S, - [{active, true}, - {deliver, port}, - {packet, 4}, - nodelay()]) - end, - f_getll = fun inet:getll/1, - f_address = - fun(_,_) -> - #net_address{ - address = {Ip,TcpPort}, - host = Address, - protocol = tcp, - family = inet} - end, - mf_tick = fun tick/1, - mf_getstat = fun inet_tcp_dist:getstat/1, - request_type = Type - }, - dist_util:handshake_we_started(HSData); - R -> - io:format("~p failed! ~p~n", [node(), R]), - %% Other Node may have closed since - %% port_please ! - ?trace("other node (~p) " - "closed since port_please.~n", - [Node]), - ?shutdown(Node) - end; - _ -> - ?trace("port_please (~p) " - "failed.~n", [Node]), - ?shutdown(Node) - end; - _Other -> - ?trace("inet_getaddr(~p) " - "failed (~p).~n", [Node,_Other]), - ?shutdown(Node) - end. - -%% If Node is illegal terminate the connection setup!! -splitnode(Node, LongOrShortNames) -> - case split_node(atom_to_list(Node), $@, []) of - [Name|Tail] when Tail =/= [] -> - Host = lists:append(Tail), - case split_node(Host, $., []) of - [_] when LongOrShortNames =:= longnames -> - error_msg("** System running to use " - "fully qualified " - "hostnames **~n" - "** Hostname ~s is illegal **~n", - [Host]), - ?shutdown(Node); - L when length(L) > 1, LongOrShortNames =:= shortnames -> - error_msg("** System NOT running to use fully qualified " - "hostnames **~n" - "** Hostname ~s is illegal **~n", - [Host]), - ?shutdown(Node); - _ -> - [Name, Host] - end; - [_] -> - error_msg("** Nodename ~p illegal, no '@' character **~n", - [Node]), - ?shutdown(Node); - _ -> - error_msg("** Nodename ~p illegal **~n", [Node]), - ?shutdown(Node) - end. - -split_node([Chr|T], Chr, Ack) -> [lists:reverse(Ack)|split_node(T, Chr, [])]; -split_node([H|T], Chr, Ack) -> split_node(T, Chr, [H|Ack]); -split_node([], _, Ack) -> [lists:reverse(Ack)]. - -%% we may not always want the nodelay behaviour -%% for performance reasons - -nodelay() -> - case application:get_env(kernel, dist_nodelay) of - undefined -> - {nodelay, true}; - {ok, true} -> - {nodelay, true}; - {ok, false} -> - {nodelay, false}; - _ -> - {nodelay, true} - end. - -tick(Socket) -> - case inet_tcp:send(Socket, [], [force]) of - {error, closed} -> - self() ! {tcp_closed, Socket}, - {error, closed}; - R -> - R - end. diff --git a/test/inet_tcp_proxy.erl b/test/inet_tcp_proxy.erl deleted file mode 100644 index 4498b8f952..0000000000 --- a/test/inet_tcp_proxy.erl +++ /dev/null @@ -1,134 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License -%% at http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. -%% --module(inet_tcp_proxy). - -%% A TCP proxy for insertion into the Erlang distribution mechanism, -%% which allows us to simulate network partitions. - --export([start/3, reconnect/1, is_enabled/0, allow/1, block/1]). - --define(TABLE, ?MODULE). - -%% This can't start_link because there's no supervision hierarchy we -%% can easily fit it into (we need to survive all application -%% restarts). So we have to do some horrible error handling. - -start(ManagerNode, DistPort, ProxyPort) -> - application:set_env(kernel, inet_tcp_proxy_manager_node, ManagerNode), - Parent = self(), - Pid = spawn(error_handler(fun() -> go(Parent, DistPort, ProxyPort) end)), - MRef = erlang:monitor(process, Pid), - receive - ready -> - erlang:demonitor(MRef), - ok; - {'DOWN', MRef, _, _, Reason} -> - {error, Reason} - end. - -reconnect(Nodes) -> - [erlang:disconnect_node(N) || N <- Nodes, N =/= node()], - ok. - -is_enabled() -> - lists:member(?TABLE, ets:all()). - -allow(Node) -> - rabbit_log:info("(~s) Allowing distribution between ~s and ~s~n", - [?MODULE, node(), Node]), - ets:delete(?TABLE, Node). -block(Node) -> - rabbit_log:info("(~s) BLOCKING distribution between ~s and ~s~n", - [?MODULE, node(), Node]), - ets:insert(?TABLE, {Node, block}). - -%%---------------------------------------------------------------------------- - -error_handler(Thunk) -> - fun () -> - try - Thunk() - catch _:{{nodedown, _}, _} -> - %% The only other node we ever talk to is the test - %% runner; if that's down then the test is nearly - %% over; die quietly. - ok; - _:X -> - io:format(user, "TCP proxy died with ~p~n At ~p~n", - [X, erlang:get_stacktrace()]), - erlang:halt(1) - end - end. - -go(Parent, Port, ProxyPort) -> - ets:new(?TABLE, [public, named_table]), - {ok, Sock} = gen_tcp:listen(ProxyPort, [inet, - {reuseaddr, true}]), - Parent ! ready, - accept_loop(Sock, Port). - -accept_loop(ListenSock, Port) -> - {ok, Sock} = gen_tcp:accept(ListenSock), - Proxy = spawn(error_handler(fun() -> run_it(Sock, Port) end)), - ok = gen_tcp:controlling_process(Sock, Proxy), - accept_loop(ListenSock, Port). - -run_it(SockIn, Port) -> - case {inet:peername(SockIn), inet:sockname(SockIn)} of - {{ok, {_Addr, SrcPort}}, {ok, {Addr, _OtherPort}}} -> - {ok, Remote, This} = inet_tcp_proxy_manager:lookup(SrcPort), - case node() of - This -> ok; - _ -> exit({not_me, node(), This}) - end, - {ok, SockOut} = gen_tcp:connect(Addr, Port, [inet]), - run_loop({SockIn, SockOut}, Remote, []); - _ -> - ok - end. - -run_loop(Sockets, RemoteNode, Buf0) -> - Block = [{RemoteNode, block}] =:= ets:lookup(?TABLE, RemoteNode), - receive - {tcp, Sock, Data} -> - Buf = [Data | Buf0], - case {Block, get(dist_was_blocked)} of - {true, false} -> - put(dist_was_blocked, Block), - rabbit_log:warning( - "(~s) Distribution BLOCKED between ~s and ~s~n", - [?MODULE, node(), RemoteNode]); - {false, S} when S =:= true orelse S =:= undefined -> - put(dist_was_blocked, Block), - rabbit_log:warning( - "(~s) Distribution allowed between ~s and ~s~n", - [?MODULE, node(), RemoteNode]); - _ -> - ok - end, - case Block of - false -> gen_tcp:send(other(Sock, Sockets), lists:reverse(Buf)), - run_loop(Sockets, RemoteNode, []); - true -> run_loop(Sockets, RemoteNode, Buf) - end; - {tcp_closed, Sock} -> - gen_tcp:close(other(Sock, Sockets)); - X -> - exit({weirdness, X}) - end. - -other(A, {A, B}) -> B; -other(B, {A, B}) -> A. diff --git a/test/inet_tcp_proxy_manager.erl b/test/inet_tcp_proxy_manager.erl deleted file mode 100644 index 18255b8d48..0000000000 --- a/test/inet_tcp_proxy_manager.erl +++ /dev/null @@ -1,107 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License -%% at http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. -%% --module(inet_tcp_proxy_manager). - -%% The TCP proxies need to decide whether to block based on the node -%% they're running on, and the node connecting to them. The trouble -%% is, they don't have an easy way to determine the latter. Therefore -%% when A connects to B we register the source port used by A here, so -%% that B can later look it up and find out who A is without having to -%% sniff the distribution protocol. -%% -%% That does unfortunately mean that we need a central control -%% thing. We assume here it's running on the node called -%% 'standalone_test' since that's where tests are orchestrated from. -%% -%% Yes, this leaks. For its intended lifecycle, that's fine. - --behaviour(gen_server). - --export([start/0, register/5, lookup/1]). - --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). - --define(NODE, ct). - --record(state, {ports, pending}). - -start() -> - gen_server:start({local, ?MODULE}, ?MODULE, [], []). - -register(_From, _To, _SrcPort, Port, Port) -> - %% No proxy, don't register - ok; -register(From, To, SrcPort, _Port, _ProxyPort) -> - gen_server:call(name(), {register, From, To, SrcPort}, infinity). - -lookup(SrcPort) -> - gen_server:call(name(), {lookup, SrcPort}, infinity). - -controller_node() -> - {ok, ManagerNode} = application:get_env(kernel, - inet_tcp_proxy_manager_node), - ManagerNode. - -name() -> - {?MODULE, controller_node()}. - -%%---------------------------------------------------------------------------- - -init([]) -> - net_kernel:monitor_nodes(true), - {ok, #state{ports = dict:new(), - pending = []}}. - -handle_call({register, FromNode, ToNode, SrcPort}, _From, - State = #state{ports = Ports, - pending = Pending}) -> - {Notify, Pending2} = - lists:partition(fun ({P, _}) -> P =:= SrcPort end, Pending), - [gen_server:reply(From, {ok, FromNode, ToNode}) || {_, From} <- Notify], - {reply, ok, - State#state{ports = dict:store(SrcPort, {FromNode, ToNode}, Ports), - pending = Pending2}}; - -handle_call({lookup, SrcPort}, From, - State = #state{ports = Ports, pending = Pending}) -> - case dict:find(SrcPort, Ports) of - {ok, {FromNode, ToNode}} -> - {reply, {ok, FromNode, ToNode}, State}; - error -> - {noreply, State#state{pending = [{SrcPort, From} | Pending]}} - end; - -handle_call(_Req, _From, State) -> - {reply, unknown_request, State}. - -handle_cast(_C, State) -> - {noreply, State}. - -handle_info({nodedown, Node}, State = #state{ports = Ports}) -> - Ports1 = dict:filter( - fun (_, {From, To}) -> - Node =/= From andalso Node =/= To - end, Ports), - {noreply, State#state{ports = Ports1}}; - -handle_info(_I, State) -> - {noreply, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_, State, _) -> {ok, State}. diff --git a/test/partitions_SUITE.erl b/test/partitions_SUITE.erl index 1b901b5940..aa1c1df24f 100644 --- a/test/partitions_SUITE.erl +++ b/test/partitions_SUITE.erl @@ -45,6 +45,8 @@ groups() -> {cluster_size_3, [], [ autoheal, autoheal_after_pause_if_all_down, + autoheal_multiple_partial_partitions, + autoheal_unexpected_finish, ignore, pause_if_all_down_on_blocked, pause_if_all_down_on_down, @@ -307,6 +309,27 @@ do_autoheal(Config) -> Test([{A, B}, {A, C}, {B, C}]), ok. +autoheal_multiple_partial_partitions(Config) -> + set_mode(Config, autoheal), + [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + block_unblock([{A, B}]), + block_unblock([{A, C}]), + block_unblock([{A, B}]), + block_unblock([{A, C}]), + block_unblock([{A, B}]), + block_unblock([{A, C}]), + [await_listening(N, true) || N <- [A, B, C]], + [await_partitions(N, []) || N <- [A, B, C]], + ok. + +autoheal_unexpected_finish(Config) -> + set_mode(Config, autoheal), + [A, B, _C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Pid = rpc:call(A, erlang, whereis, [rabbit_node_monitor]), + Pid ! {autoheal_msg, {autoheal_finished, B}}, + Pid = rpc:call(A, erlang, whereis, [rabbit_node_monitor]), + ok. + partial_false_positive(Config) -> [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), block([{A, B}]), diff --git a/test/rabbitmqctl_integration_SUITE.erl b/test/rabbitmqctl_integration_SUITE.erl new file mode 100644 index 0000000000..9305781bda --- /dev/null +++ b/test/rabbitmqctl_integration_SUITE.erl @@ -0,0 +1,146 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2016 Pivotal Software, Inc. All rights reserved. +%% +-module(rabbitmqctl_integration_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). + +-export([all/0 + ,groups/0 + ,init_per_suite/1 + ,end_per_suite/1 + ,init_per_group/2 + ,end_per_group/2 + ,init_per_testcase/2 + ,end_per_testcase/2 + ]). + +-export([list_queues_local/1 + ,list_queues_offline/1 + ,list_queues_online/1 + ]). + +all() -> + [{group, list_queues}]. + +groups() -> + [{list_queues, [], + [list_queues_local + ,list_queues_online + ,list_queues_offline + ]}]. + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +init_per_group(list_queues, Config0) -> + NumNodes = 3, + Config = create_n_node_cluster(Config0, NumNodes), + Config1 = declare_some_queues(Config), + rabbit_ct_broker_helpers:stop_node(Config1, NumNodes - 1), + Config1; +init_per_group(_, Config) -> + Config. + +create_n_node_cluster(Config0, NumNodes) -> + Config1 = rabbit_ct_helpers:set_config( + Config0, [{rmq_nodes_count, NumNodes}, + {rmq_nodes_clustered, true}]), + rabbit_ct_helpers:run_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +declare_some_queues(Config) -> + Nodes = rabbit_ct_helpers:get_config(Config, rmq_nodes), + PerNodeQueues = [ declare_some_queues(Config, NodeNum) + || NodeNum <- lists:seq(0, length(Nodes)-1) ], + rabbit_ct_helpers:set_config(Config, {per_node_queues, PerNodeQueues}). + +declare_some_queues(Config, NodeNum) -> + {Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, NodeNum), + NumQueues = 5, + Queues = [ list_to_binary(io_lib:format("queue-~b-on-node-~b", [QueueNum, NodeNum])) + || QueueNum <- lists:seq(1, NumQueues) ], + lists:foreach(fun (QueueName) -> + #'queue.declare_ok'{} = amqp_channel:call(Chan, #'queue.declare'{queue = QueueName, durable = true}) + end, Queues), + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan), + Queues. + +end_per_group(list_queues, Config0) -> + Config1 = case rabbit_ct_helpers:get_config(Config0, save_config) of + undefined -> Config0; + C -> C + end, + rabbit_ct_helpers:run_steps(Config1, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()); +end_per_group(_, Config) -> + Config. + +init_per_testcase(Testcase, Config0) -> + rabbit_ct_helpers:testcase_started(Config0, Testcase). + +end_per_testcase(Testcase, Config0) -> + rabbit_ct_helpers:testcase_finished(Config0, Testcase). + +%%---------------------------------------------------------------------------- +%% Test cases +%%---------------------------------------------------------------------------- +list_queues_local(Config) -> + Node1Queues = lists:sort(lists:nth(1, ?config(per_node_queues, Config))), + Node2Queues = lists:sort(lists:nth(2, ?config(per_node_queues, Config))), + assert_ctl_queues(Config, 0, ["--local"], Node1Queues), + assert_ctl_queues(Config, 1, ["--local"], Node2Queues), + ok. + +list_queues_online(Config) -> + Node1Queues = lists:sort(lists:nth(1, ?config(per_node_queues, Config))), + Node2Queues = lists:sort(lists:nth(2, ?config(per_node_queues, Config))), + OnlineQueues = Node1Queues ++ Node2Queues, + assert_ctl_queues(Config, 0, ["--online"], OnlineQueues), + assert_ctl_queues(Config, 1, ["--online"], OnlineQueues), + ok. + +list_queues_offline(Config) -> + Node3Queues = lists:sort(lists:nth(3, ?config(per_node_queues, Config))), + OfflineQueues = Node3Queues, + assert_ctl_queues(Config, 0, ["--offline"], OfflineQueues), + assert_ctl_queues(Config, 1, ["--offline"], OfflineQueues), + ok. + +%%---------------------------------------------------------------------------- +%% Helpers +%%---------------------------------------------------------------------------- +assert_ctl_queues(Config, Node, Args, Expected0) -> + Expected = lists:sort(Expected0), + Got0 = run_list_queues(Config, Node, Args), + Got = lists:sort(lists:map(fun hd/1, Got0)), + case Got of + Expected -> + ok; + _ -> + ct:pal(error, "Listing queues on node ~p failed. Expected:~n~p~n~nGot:~n~p~n~n", + [Node, Expected, Got]), + exit({list_queues_unexpected_on, Node, Expected, Got}) + end. + +run_list_queues(Config, Node, Args) -> + rabbit_ct_broker_helpers:rabbitmqctl_list(Config, Node, ["list_queues"] ++ Args ++ ["name"]). diff --git a/test/unit_SUITE.erl b/test/unit_SUITE.erl index ba0f43f11e..43e812fa3d 100644 --- a/test/unit_SUITE.erl +++ b/test/unit_SUITE.erl @@ -31,7 +31,7 @@ groups() -> [ {parallel_tests, [parallel], [ arguments_parser, - filtering_flags_parsing, + mutually_exclusive_flags_parsing, {basic_header_handling, [parallel], [ write_table_with_invalid_existing_type, invalid_existing_headers, @@ -135,26 +135,41 @@ check_parse_arguments(ExpRes, Fun, As) -> true = SortRes(ExpRes) =:= SortRes(Fun(As)). -filtering_flags_parsing(_Config) -> - Cases = [{[], [], []} - ,{[{"--online", true}], ["--offline", "--online", "--third-option"], [false, true, false]} - ,{[{"--online", true}, {"--third-option", true}, {"--offline", true}], ["--offline", "--online", "--third-option"], [true, true, true]} - ,{[], ["--offline", "--online", "--third-option"], [true, true, true]} - ], - lists:foreach(fun({Vals, Opts, Expect}) -> - case rabbit_cli:filter_opts(Vals, Opts) of - Expect -> +mutually_exclusive_flags_parsing(_Config) -> + Matcher = fun ({ok, Value}, {ok, Value}) -> true; + ({error, Value}, {error, Pattern}) -> + case re:run(Value, Pattern) of + {match, _} -> true; + _ -> false + end; + (_, _) -> false + end, + Spec = [{"--online", online} + ,{"--offline", offline} + ,{"--local", local}], + Default = all, + Cases =[{["--online"], {ok, online}} + ,{[], {ok, Default}} + ,{["--offline"], {ok, offline}} + ,{["--local"], {ok, local}} + ,{["--offline", "--local"], {error, "mutually exclusive"}} + ,{["--offline", "--online"], {error, "mutually exclusive"}} + ,{["--offline", "--local", "--online"], {error, "mutually exclusive"}} + ], + lists:foreach(fun({Opts, Expected}) -> + ExpandedOpts = [ {Opt, true} || Opt <- Opts ], + Got = rabbit_cli:mutually_exclusive_flags(ExpandedOpts, all, Spec), + case Matcher(Got, Expected) of + true -> ok; - Got -> - exit({no_match, Got, Expect, {args, Vals, Opts}}) + false -> + exit({no_match, Got, Expected, {opts, Opts}}) end - end, - Cases). + end, Cases). %% ------------------------------------------------------------------- %% basic_header_handling. %% ------------------------------------------------------------------- - -define(XDEATH_TABLE, [{<<"reason">>, longstr, <<"blah">>}, {<<"queue">>, longstr, <<"foo.bar.baz">>}, diff --git a/test/unit_inbroker_SUITE.erl b/test/unit_inbroker_SUITE.erl index e9ecbf5444..98cd77ccbe 100644 --- a/test/unit_inbroker_SUITE.erl +++ b/test/unit_inbroker_SUITE.erl @@ -1684,15 +1684,15 @@ credit_flow_settings(Config) -> credit_flow_settings1(_Config) -> %% default values - passed = test_proc(200, 50), + passed = test_proc(200, 100), - application:set_env(rabbit, credit_flow_default_credit, {100, 20}), - passed = test_proc(100, 20), + application:set_env(rabbit, credit_flow_default_credit, {100, 300}), + passed = test_proc(100, 300), application:unset_env(rabbit, credit_flow_default_credit), % back to defaults - passed = test_proc(200, 50), + passed = test_proc(200, 100), passed. test_proc(InitialCredit, MoreCreditAfter) -> |
