summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorJean-Sébastien Pédron <jean-sebastien@rabbitmq.com>2016-09-27 11:20:29 +0200
committerJean-Sébastien Pédron <jean-sebastien@rabbitmq.com>2016-09-27 11:20:29 +0200
commit25de0db933521881673c3889865815abc82b75e7 (patch)
treea182ea38e97faed13bc0ea94a9bbe641b3377dcf /test
parente7ae83f5cf81b103f3f1e46a64a1e0053dc4ba74 (diff)
parenta40b517c4f20be94bc007e83541e18a3da85b5f5 (diff)
downloadrabbitmq-server-git-25de0db933521881673c3889865815abc82b75e7.tar.gz
Merge branch 'stable' into rabbitmq-management-236
Diffstat (limited to 'test')
-rw-r--r--test/dynamic_ha_SUITE.erl154
-rw-r--r--test/gm_SUITE.erl47
-rw-r--r--test/health_check_SUITE.erl19
-rw-r--r--test/inet_proxy_dist.erl201
-rw-r--r--test/inet_tcp_proxy.erl134
-rw-r--r--test/inet_tcp_proxy_manager.erl107
-rw-r--r--test/partitions_SUITE.erl23
-rw-r--r--test/rabbitmqctl_integration_SUITE.erl146
-rw-r--r--test/unit_SUITE.erl45
-rw-r--r--test/unit_inbroker_SUITE.erl8
10 files changed, 420 insertions, 464 deletions
diff --git a/test/dynamic_ha_SUITE.erl b/test/dynamic_ha_SUITE.erl
index 5872d97d4c..bba7fad707 100644
--- a/test/dynamic_ha_SUITE.erl
+++ b/test/dynamic_ha_SUITE.erl
@@ -31,6 +31,7 @@
%% The first two are change_policy, the last two are change_cluster
-include_lib("common_test/include/ct.hrl").
+-include_lib("proper/include/proper.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
@@ -61,6 +62,10 @@ groups() ->
{cluster_size_3, [], [
change_policy,
rapid_change
+ % FIXME: Re-enable those tests when the know issues are
+ % fixed.
+ %failing_random_policies,
+ %random_policy
]}
]}
].
@@ -137,7 +142,7 @@ change_policy(Config) ->
assert_slaves(A, ?QNAME, {A, [C]}, [{A, [B, C]}]),
%% Clear the policy, and we go back to non-mirrored
- rabbit_ct_broker_helpers:clear_policy(Config, A, ?POLICY),
+ ok = rabbit_ct_broker_helpers:clear_policy(Config, A, ?POLICY),
assert_slaves(A, ?QNAME, {A, ''}),
%% Test switching "away" from an unmirrored node
@@ -206,7 +211,7 @@ rapid_loop(Config, Node, MRef) ->
after 0 ->
rabbit_ct_broker_helpers:set_ha_policy(Config, Node, ?POLICY,
<<"all">>),
- rabbit_ct_broker_helpers:clear_policy(Config, Node, ?POLICY),
+ ok = rabbit_ct_broker_helpers:clear_policy(Config, Node, ?POLICY),
rapid_loop(Config, Node, MRef)
end.
@@ -253,6 +258,23 @@ promote_on_shutdown(Config) ->
durable = true}),
ok.
+random_policy(Config) ->
+ run_proper(fun prop_random_policy/1, [Config]).
+
+failing_random_policies(Config) ->
+ [A, B | _] = Nodes = rabbit_ct_broker_helpers:get_node_configs(Config,
+ nodename),
+ %% Those set of policies were found as failing by PropEr in the
+ %% `random_policy` test above. We add them explicitely here to make
+ %% sure they get tested.
+ ?assertEqual(true, test_random_policy(Config, Nodes,
+ [{nodes, [A, B]}, {nodes, [A]}])),
+ ?assertEqual(true, test_random_policy(Config, Nodes,
+ [{exactly, 3}, undefined, all, {nodes, [B]}])),
+ ?assertEqual(true, test_random_policy(Config, Nodes,
+ [all, undefined, {exactly, 2}, all, {exactly, 3}, {exactly, 3},
+ undefined, {exactly, 3}, all])).
+
%%----------------------------------------------------------------------------
assert_slaves(RPCNode, QName, Exp) ->
@@ -327,3 +349,131 @@ get_stacktrace() ->
_:e ->
erlang:get_stacktrace()
end.
+
+%%----------------------------------------------------------------------------
+run_proper(Fun, Args) ->
+ ?assertEqual(true,
+ proper:counterexample(erlang:apply(Fun, Args),
+ [{numtests, 25},
+ {on_output, fun(F, A) -> ct:pal(?LOW_IMPORTANCE, F, A) end}])).
+
+prop_random_policy(Config) ->
+ Nodes = rabbit_ct_broker_helpers:get_node_configs(
+ Config, nodename),
+ ?FORALL(
+ Policies, non_empty(list(policy_gen(Nodes))),
+ test_random_policy(Config, Nodes, Policies)).
+
+test_random_policy(Config, Nodes, Policies) ->
+ [NodeA | _] = Nodes,
+ Ch = rabbit_ct_client_helpers:open_channel(Config, NodeA),
+ amqp_channel:call(Ch, #'queue.declare'{queue = ?QNAME}),
+ %% Add some load so mirrors can be busy synchronising
+ rabbit_ct_client_helpers:publish(Ch, ?QNAME, 100000),
+ %% Apply policies in parallel on all nodes
+ apply_in_parallel(Config, Nodes, Policies),
+ %% Give it some time to generate all internal notifications
+ timer:sleep(2000),
+ %% Check the result
+ Result = wait_for_last_policy(?QNAME, NodeA, Policies, 30),
+ %% Cleanup
+ amqp_channel:call(Ch, #'queue.delete'{queue = ?QNAME}),
+ _ = rabbit_ct_broker_helpers:clear_policy(Config, NodeA, ?POLICY),
+ Result.
+
+apply_in_parallel(Config, Nodes, Policies) ->
+ Self = self(),
+ [spawn_link(fun() ->
+ [begin
+ apply_policy(Config, N, Policy)
+ end || Policy <- Policies],
+ Self ! parallel_task_done
+ end) || N <- Nodes],
+ [receive
+ parallel_task_done ->
+ ok
+ end || _ <- Nodes].
+
+%% Proper generators
+policy_gen(Nodes) ->
+ %% Stop mirroring needs to be called often to trigger rabbitmq-server#803
+ frequency([{3, undefined},
+ {1, all},
+ {1, {nodes, nodes_gen(Nodes)}},
+ {1, {exactly, choose(1, 3)}}
+ ]).
+
+nodes_gen(Nodes) ->
+ ?LET(List, non_empty(list(oneof(Nodes))),
+ sets:to_list(sets:from_list(List))).
+
+%% Checks
+wait_for_last_policy(QueueName, NodeA, TestedPolicies, Tries) ->
+ %% Ensure the owner/master is able to process a call request,
+ %% which means that all pending casts have been processed.
+ %% Use the information returned by owner/master to verify the
+ %% test result
+ Info = find_queue(QueueName, NodeA),
+ Pid = proplists:get_value(pid, Info),
+ Node = node(Pid),
+ %% Gets owner/master
+ case rpc:call(Node, gen_server, call, [Pid, info], 5000) of
+ {badrpc, _} ->
+ %% The queue is probably being migrated to another node.
+ %% Let's wait a bit longer.
+ timer:sleep(1000),
+ wait_for_last_policy(QueueName, NodeA, TestedPolicies, Tries - 1);
+ FinalInfo ->
+ %% The last policy is the final state
+ LastPolicy = lists:last(TestedPolicies),
+ case verify_policy(LastPolicy, FinalInfo) of
+ true ->
+ true;
+ false when Tries =:= 1 ->
+ Policies = rpc:call(Node, rabbit_policy, list, [], 5000),
+ ct:pal(
+ "Last policy not applied:~n"
+ " Queue node: ~s (~p)~n"
+ " Queue info: ~p~n"
+ " Configured policies: ~p~n"
+ " Tested policies: ~p",
+ [Node, Pid, FinalInfo, Policies, TestedPolicies]),
+ false;
+ false ->
+ timer:sleep(1000),
+ wait_for_last_policy(QueueName, NodeA, TestedPolicies,
+ Tries - 1)
+ end
+ end.
+
+verify_policy(undefined, Info) ->
+ %% If the queue is not mirrored, it returns ''
+ '' == proplists:get_value(slave_pids, Info);
+verify_policy(all, Info) ->
+ 2 == length(proplists:get_value(slave_pids, Info));
+verify_policy({exactly, 1}, Info) ->
+ %% If the queue is mirrored, it returns a list
+ [] == proplists:get_value(slave_pids, Info);
+verify_policy({exactly, N}, Info) ->
+ (N - 1) == length(proplists:get_value(slave_pids, Info));
+verify_policy({nodes, Nodes}, Info) ->
+ Master = node(proplists:get_value(pid, Info)),
+ Slaves = [node(P) || P <- proplists:get_value(slave_pids, Info)],
+ lists:sort(Nodes) == lists:sort([Master | Slaves]).
+
+%% Policies
+apply_policy(Config, N, undefined) ->
+ _ = rabbit_ct_broker_helpers:clear_policy(Config, N, ?POLICY);
+apply_policy(Config, N, all) ->
+ rabbit_ct_broker_helpers:set_ha_policy(
+ Config, N, ?POLICY, <<"all">>,
+ [{<<"ha-sync-mode">>, <<"automatic">>}]);
+apply_policy(Config, N, {nodes, Nodes}) ->
+ NNodes = [rabbit_misc:atom_to_binary(Node) || Node <- Nodes],
+ rabbit_ct_broker_helpers:set_ha_policy(
+ Config, N, ?POLICY, {<<"nodes">>, NNodes},
+ [{<<"ha-sync-mode">>, <<"automatic">>}]);
+apply_policy(Config, N, {exactly, Exactly}) ->
+ rabbit_ct_broker_helpers:set_ha_policy(
+ Config, N, ?POLICY, {<<"exactly">>, Exactly},
+ [{<<"ha-sync-mode">>, <<"automatic">>}]).
diff --git a/test/gm_SUITE.erl b/test/gm_SUITE.erl
index f5ccf75b70..df73d8ac27 100644
--- a/test/gm_SUITE.erl
+++ b/test/gm_SUITE.erl
@@ -38,7 +38,9 @@ all() ->
broadcast,
confirmed_broadcast,
member_death,
- receive_in_order
+ receive_in_order,
+ unexpected_msg,
+ down_in_members_change
].
init_per_suite(Config) ->
@@ -114,6 +116,49 @@ receive_in_order(_Config) ->
passed
end).
+unexpected_msg(_Config) ->
+ passed = with_two_members(
+ fun(Pid, _) ->
+ Pid ! {make_ref(), old_gen_server_answer},
+ true = erlang:is_process_alive(Pid),
+ passed
+ end).
+
+down_in_members_change(_Config) ->
+ %% Setup
+ ok = gm:create_tables(),
+ {ok, Pid} = gm:start_link(?MODULE, ?MODULE, self(),
+ fun rabbit_misc:execute_mnesia_transaction/1),
+ passed = receive_joined(Pid, [Pid], timeout_joining_gm_group_1),
+ {ok, Pid2} = gm:start_link(?MODULE, ?MODULE, self(),
+ fun rabbit_misc:execute_mnesia_transaction/1),
+ passed = receive_joined(Pid2, [Pid, Pid2], timeout_joining_gm_group_2),
+ passed = receive_birth(Pid, Pid2, timeout_waiting_for_birth_2),
+
+ %% Test. Simulate that the gm group is deleted (forget_group) while
+ %% processing the 'DOWN' message from the neighbour
+ process_flag(trap_exit, true),
+ ok = meck:new(mnesia, [passthrough]),
+ ok = meck:expect(mnesia, read, fun({gm_group, ?MODULE}) ->
+ [];
+ (Key) ->
+ meck:passthrough([Key])
+ end),
+ gm:leave(Pid2),
+ Passed = receive
+ {'EXIT', Pid, shutdown} ->
+ passed;
+ {'EXIT', Pid, _} ->
+ crashed
+ after 15000 ->
+ timeout
+ end,
+ %% Cleanup
+ meck:unload(mnesia),
+ process_flag(trap_exit, false),
+ passed = Passed.
+
+
do_broadcast(Fun) ->
with_two_members(broadcast_fun(Fun)).
diff --git a/test/health_check_SUITE.erl b/test/health_check_SUITE.erl
index 4d8f56e9d3..50abc97a02 100644
--- a/test/health_check_SUITE.erl
+++ b/test/health_check_SUITE.erl
@@ -33,6 +33,8 @@
,ignores_remote_alarms/1
,detects_local_alarm/1
,honors_timeout_argument/1
+ ,detects_stuck_local_node_monitor/1
+ ,ignores_stuck_remote_node_monitor/1
]).
all() ->
@@ -47,6 +49,8 @@ groups() ->
,ignores_remote_alarms
,detects_local_alarm
,honors_timeout_argument
+ ,detects_stuck_local_node_monitor
+ ,ignores_stuck_remote_node_monitor
]}].
init_per_suite(Config) ->
@@ -123,6 +127,21 @@ detects_local_alarm(Config) ->
{match, _} = re:run(Str, "resource alarm.*in effect"),
ok.
+detects_stuck_local_node_monitor(Config) ->
+ [A|_] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ rabbit_ct_broker_helpers:rpc(Config, A, sys, suspend, [rabbit_node_monitor]),
+ {error, 75, Str} = rabbit_ct_broker_helpers:rabbitmqctl(Config, A, ["-t", "5", "node_health_check"]),
+ {match, _} = re:run(Str, "operation node_health_check.*timed out"),
+ resume_sys_process(Config, A, rabbit_node_monitor),
+ ok.
+
+ignores_stuck_remote_node_monitor(Config) ->
+ [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ rabbit_ct_broker_helpers:rpc(Config, A, sys, suspend, [rabbit_node_monitor]),
+ {ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(Config, B, ["-t", "5", "node_health_check"]),
+ resume_sys_process(Config, A, rabbit_node_monitor),
+ ok.
+
honors_timeout_argument(Config) ->
[A|_] = open_channel_and_declare_queue_everywhere(Config),
QPid = suspend_single_queue(Config, A),
diff --git a/test/inet_proxy_dist.erl b/test/inet_proxy_dist.erl
deleted file mode 100644
index 32b7641a79..0000000000
--- a/test/inet_proxy_dist.erl
+++ /dev/null
@@ -1,201 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
-%%
--module(inet_proxy_dist).
-
-%% A distribution plugin that uses the usual inet_tcp_dist but allows
-%% insertion of a proxy at the receiving end.
-
-%% inet_*_dist "behaviour"
--export([listen/1, accept/1, accept_connection/5,
- setup/5, close/1, select/1, is_node_name/1]).
-
-%% For copypasta from inet_tcp_dist
--export([do_setup/6]).
--import(error_logger,[error_msg/2]).
-
--define(REAL, inet_tcp_dist).
-
-%%----------------------------------------------------------------------------
-
-listen(Name) -> ?REAL:listen(Name).
-select(Node) -> ?REAL:select(Node).
-accept(Listen) -> ?REAL:accept(Listen).
-close(Socket) -> ?REAL:close(Socket).
-is_node_name(Node) -> ?REAL:is_node_name(Node).
-
-accept_connection(AcceptPid, Socket, MyNode, Allowed, SetupTime) ->
- ?REAL:accept_connection(AcceptPid, Socket, MyNode, Allowed, SetupTime).
-
-%% This is copied from inet_tcp_dist, in order to change the
-%% output of erl_epmd:port_please/2.
-
--include_lib("kernel/include/net_address.hrl").
--include_lib("kernel/include/dist_util.hrl").
-
-setup(Node, Type, MyNode, LongOrShortNames,SetupTime) ->
- spawn_opt(?MODULE, do_setup,
- [self(), Node, Type, MyNode, LongOrShortNames, SetupTime],
- [link, {priority, max}]).
-
-do_setup(Kernel, Node, Type, MyNode, LongOrShortNames,SetupTime) ->
- ?trace("~p~n",[{inet_tcp_dist,self(),setup,Node}]),
- [Name, Address] = splitnode(Node, LongOrShortNames),
- case inet:getaddr(Address, inet) of
- {ok, Ip} ->
- Timer = dist_util:start_timer(SetupTime),
- case erl_epmd:port_please(Name, Ip) of
- {port, TcpPort, Version} ->
- ?trace("port_please(~p) -> version ~p~n",
- [Node,Version]),
- dist_util:reset_timer(Timer),
- %% Modification START
- Ret = application:get_env(kernel,
- dist_and_proxy_ports_map),
- PortsMap = case Ret of
- {ok, M} -> M;
- undefined -> []
- end,
- ProxyPort = case inet_tcp_proxy:is_enabled() of
- true -> proplists:get_value(TcpPort, PortsMap, TcpPort);
- false -> TcpPort
- end,
- case inet_tcp:connect(Ip, ProxyPort,
- [{active, false},
- {packet,2}]) of
- {ok, Socket} ->
- {ok, {_, SrcPort}} = inet:sockname(Socket),
- ok = inet_tcp_proxy_manager:register(
- node(), Node, SrcPort, TcpPort, ProxyPort),
- %% Modification END
- HSData = #hs_data{
- kernel_pid = Kernel,
- other_node = Node,
- this_node = MyNode,
- socket = Socket,
- timer = Timer,
- this_flags = 0,
- other_version = Version,
- f_send = fun inet_tcp:send/2,
- f_recv = fun inet_tcp:recv/3,
- f_setopts_pre_nodeup =
- fun(S) ->
- inet:setopts
- (S,
- [{active, false},
- {packet, 4},
- nodelay()])
- end,
- f_setopts_post_nodeup =
- fun(S) ->
- inet:setopts
- (S,
- [{active, true},
- {deliver, port},
- {packet, 4},
- nodelay()])
- end,
- f_getll = fun inet:getll/1,
- f_address =
- fun(_,_) ->
- #net_address{
- address = {Ip,TcpPort},
- host = Address,
- protocol = tcp,
- family = inet}
- end,
- mf_tick = fun tick/1,
- mf_getstat = fun inet_tcp_dist:getstat/1,
- request_type = Type
- },
- dist_util:handshake_we_started(HSData);
- R ->
- io:format("~p failed! ~p~n", [node(), R]),
- %% Other Node may have closed since
- %% port_please !
- ?trace("other node (~p) "
- "closed since port_please.~n",
- [Node]),
- ?shutdown(Node)
- end;
- _ ->
- ?trace("port_please (~p) "
- "failed.~n", [Node]),
- ?shutdown(Node)
- end;
- _Other ->
- ?trace("inet_getaddr(~p) "
- "failed (~p).~n", [Node,_Other]),
- ?shutdown(Node)
- end.
-
-%% If Node is illegal terminate the connection setup!!
-splitnode(Node, LongOrShortNames) ->
- case split_node(atom_to_list(Node), $@, []) of
- [Name|Tail] when Tail =/= [] ->
- Host = lists:append(Tail),
- case split_node(Host, $., []) of
- [_] when LongOrShortNames =:= longnames ->
- error_msg("** System running to use "
- "fully qualified "
- "hostnames **~n"
- "** Hostname ~s is illegal **~n",
- [Host]),
- ?shutdown(Node);
- L when length(L) > 1, LongOrShortNames =:= shortnames ->
- error_msg("** System NOT running to use fully qualified "
- "hostnames **~n"
- "** Hostname ~s is illegal **~n",
- [Host]),
- ?shutdown(Node);
- _ ->
- [Name, Host]
- end;
- [_] ->
- error_msg("** Nodename ~p illegal, no '@' character **~n",
- [Node]),
- ?shutdown(Node);
- _ ->
- error_msg("** Nodename ~p illegal **~n", [Node]),
- ?shutdown(Node)
- end.
-
-split_node([Chr|T], Chr, Ack) -> [lists:reverse(Ack)|split_node(T, Chr, [])];
-split_node([H|T], Chr, Ack) -> split_node(T, Chr, [H|Ack]);
-split_node([], _, Ack) -> [lists:reverse(Ack)].
-
-%% we may not always want the nodelay behaviour
-%% for performance reasons
-
-nodelay() ->
- case application:get_env(kernel, dist_nodelay) of
- undefined ->
- {nodelay, true};
- {ok, true} ->
- {nodelay, true};
- {ok, false} ->
- {nodelay, false};
- _ ->
- {nodelay, true}
- end.
-
-tick(Socket) ->
- case inet_tcp:send(Socket, [], [force]) of
- {error, closed} ->
- self() ! {tcp_closed, Socket},
- {error, closed};
- R ->
- R
- end.
diff --git a/test/inet_tcp_proxy.erl b/test/inet_tcp_proxy.erl
deleted file mode 100644
index 4498b8f952..0000000000
--- a/test/inet_tcp_proxy.erl
+++ /dev/null
@@ -1,134 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
-%%
--module(inet_tcp_proxy).
-
-%% A TCP proxy for insertion into the Erlang distribution mechanism,
-%% which allows us to simulate network partitions.
-
--export([start/3, reconnect/1, is_enabled/0, allow/1, block/1]).
-
--define(TABLE, ?MODULE).
-
-%% This can't start_link because there's no supervision hierarchy we
-%% can easily fit it into (we need to survive all application
-%% restarts). So we have to do some horrible error handling.
-
-start(ManagerNode, DistPort, ProxyPort) ->
- application:set_env(kernel, inet_tcp_proxy_manager_node, ManagerNode),
- Parent = self(),
- Pid = spawn(error_handler(fun() -> go(Parent, DistPort, ProxyPort) end)),
- MRef = erlang:monitor(process, Pid),
- receive
- ready ->
- erlang:demonitor(MRef),
- ok;
- {'DOWN', MRef, _, _, Reason} ->
- {error, Reason}
- end.
-
-reconnect(Nodes) ->
- [erlang:disconnect_node(N) || N <- Nodes, N =/= node()],
- ok.
-
-is_enabled() ->
- lists:member(?TABLE, ets:all()).
-
-allow(Node) ->
- rabbit_log:info("(~s) Allowing distribution between ~s and ~s~n",
- [?MODULE, node(), Node]),
- ets:delete(?TABLE, Node).
-block(Node) ->
- rabbit_log:info("(~s) BLOCKING distribution between ~s and ~s~n",
- [?MODULE, node(), Node]),
- ets:insert(?TABLE, {Node, block}).
-
-%%----------------------------------------------------------------------------
-
-error_handler(Thunk) ->
- fun () ->
- try
- Thunk()
- catch _:{{nodedown, _}, _} ->
- %% The only other node we ever talk to is the test
- %% runner; if that's down then the test is nearly
- %% over; die quietly.
- ok;
- _:X ->
- io:format(user, "TCP proxy died with ~p~n At ~p~n",
- [X, erlang:get_stacktrace()]),
- erlang:halt(1)
- end
- end.
-
-go(Parent, Port, ProxyPort) ->
- ets:new(?TABLE, [public, named_table]),
- {ok, Sock} = gen_tcp:listen(ProxyPort, [inet,
- {reuseaddr, true}]),
- Parent ! ready,
- accept_loop(Sock, Port).
-
-accept_loop(ListenSock, Port) ->
- {ok, Sock} = gen_tcp:accept(ListenSock),
- Proxy = spawn(error_handler(fun() -> run_it(Sock, Port) end)),
- ok = gen_tcp:controlling_process(Sock, Proxy),
- accept_loop(ListenSock, Port).
-
-run_it(SockIn, Port) ->
- case {inet:peername(SockIn), inet:sockname(SockIn)} of
- {{ok, {_Addr, SrcPort}}, {ok, {Addr, _OtherPort}}} ->
- {ok, Remote, This} = inet_tcp_proxy_manager:lookup(SrcPort),
- case node() of
- This -> ok;
- _ -> exit({not_me, node(), This})
- end,
- {ok, SockOut} = gen_tcp:connect(Addr, Port, [inet]),
- run_loop({SockIn, SockOut}, Remote, []);
- _ ->
- ok
- end.
-
-run_loop(Sockets, RemoteNode, Buf0) ->
- Block = [{RemoteNode, block}] =:= ets:lookup(?TABLE, RemoteNode),
- receive
- {tcp, Sock, Data} ->
- Buf = [Data | Buf0],
- case {Block, get(dist_was_blocked)} of
- {true, false} ->
- put(dist_was_blocked, Block),
- rabbit_log:warning(
- "(~s) Distribution BLOCKED between ~s and ~s~n",
- [?MODULE, node(), RemoteNode]);
- {false, S} when S =:= true orelse S =:= undefined ->
- put(dist_was_blocked, Block),
- rabbit_log:warning(
- "(~s) Distribution allowed between ~s and ~s~n",
- [?MODULE, node(), RemoteNode]);
- _ ->
- ok
- end,
- case Block of
- false -> gen_tcp:send(other(Sock, Sockets), lists:reverse(Buf)),
- run_loop(Sockets, RemoteNode, []);
- true -> run_loop(Sockets, RemoteNode, Buf)
- end;
- {tcp_closed, Sock} ->
- gen_tcp:close(other(Sock, Sockets));
- X ->
- exit({weirdness, X})
- end.
-
-other(A, {A, B}) -> B;
-other(B, {A, B}) -> A.
diff --git a/test/inet_tcp_proxy_manager.erl b/test/inet_tcp_proxy_manager.erl
deleted file mode 100644
index 18255b8d48..0000000000
--- a/test/inet_tcp_proxy_manager.erl
+++ /dev/null
@@ -1,107 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
-%%
--module(inet_tcp_proxy_manager).
-
-%% The TCP proxies need to decide whether to block based on the node
-%% they're running on, and the node connecting to them. The trouble
-%% is, they don't have an easy way to determine the latter. Therefore
-%% when A connects to B we register the source port used by A here, so
-%% that B can later look it up and find out who A is without having to
-%% sniff the distribution protocol.
-%%
-%% That does unfortunately mean that we need a central control
-%% thing. We assume here it's running on the node called
-%% 'standalone_test' since that's where tests are orchestrated from.
-%%
-%% Yes, this leaks. For its intended lifecycle, that's fine.
-
--behaviour(gen_server).
-
--export([start/0, register/5, lookup/1]).
-
--export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
- code_change/3]).
-
--define(NODE, ct).
-
--record(state, {ports, pending}).
-
-start() ->
- gen_server:start({local, ?MODULE}, ?MODULE, [], []).
-
-register(_From, _To, _SrcPort, Port, Port) ->
- %% No proxy, don't register
- ok;
-register(From, To, SrcPort, _Port, _ProxyPort) ->
- gen_server:call(name(), {register, From, To, SrcPort}, infinity).
-
-lookup(SrcPort) ->
- gen_server:call(name(), {lookup, SrcPort}, infinity).
-
-controller_node() ->
- {ok, ManagerNode} = application:get_env(kernel,
- inet_tcp_proxy_manager_node),
- ManagerNode.
-
-name() ->
- {?MODULE, controller_node()}.
-
-%%----------------------------------------------------------------------------
-
-init([]) ->
- net_kernel:monitor_nodes(true),
- {ok, #state{ports = dict:new(),
- pending = []}}.
-
-handle_call({register, FromNode, ToNode, SrcPort}, _From,
- State = #state{ports = Ports,
- pending = Pending}) ->
- {Notify, Pending2} =
- lists:partition(fun ({P, _}) -> P =:= SrcPort end, Pending),
- [gen_server:reply(From, {ok, FromNode, ToNode}) || {_, From} <- Notify],
- {reply, ok,
- State#state{ports = dict:store(SrcPort, {FromNode, ToNode}, Ports),
- pending = Pending2}};
-
-handle_call({lookup, SrcPort}, From,
- State = #state{ports = Ports, pending = Pending}) ->
- case dict:find(SrcPort, Ports) of
- {ok, {FromNode, ToNode}} ->
- {reply, {ok, FromNode, ToNode}, State};
- error ->
- {noreply, State#state{pending = [{SrcPort, From} | Pending]}}
- end;
-
-handle_call(_Req, _From, State) ->
- {reply, unknown_request, State}.
-
-handle_cast(_C, State) ->
- {noreply, State}.
-
-handle_info({nodedown, Node}, State = #state{ports = Ports}) ->
- Ports1 = dict:filter(
- fun (_, {From, To}) ->
- Node =/= From andalso Node =/= To
- end, Ports),
- {noreply, State#state{ports = Ports1}};
-
-handle_info(_I, State) ->
- {noreply, State}.
-
-terminate(_Reason, _State) ->
- ok.
-
-code_change(_, State, _) -> {ok, State}.
diff --git a/test/partitions_SUITE.erl b/test/partitions_SUITE.erl
index 1b901b5940..aa1c1df24f 100644
--- a/test/partitions_SUITE.erl
+++ b/test/partitions_SUITE.erl
@@ -45,6 +45,8 @@ groups() ->
{cluster_size_3, [], [
autoheal,
autoheal_after_pause_if_all_down,
+ autoheal_multiple_partial_partitions,
+ autoheal_unexpected_finish,
ignore,
pause_if_all_down_on_blocked,
pause_if_all_down_on_down,
@@ -307,6 +309,27 @@ do_autoheal(Config) ->
Test([{A, B}, {A, C}, {B, C}]),
ok.
+autoheal_multiple_partial_partitions(Config) ->
+ set_mode(Config, autoheal),
+ [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ block_unblock([{A, B}]),
+ block_unblock([{A, C}]),
+ block_unblock([{A, B}]),
+ block_unblock([{A, C}]),
+ block_unblock([{A, B}]),
+ block_unblock([{A, C}]),
+ [await_listening(N, true) || N <- [A, B, C]],
+ [await_partitions(N, []) || N <- [A, B, C]],
+ ok.
+
+autoheal_unexpected_finish(Config) ->
+ set_mode(Config, autoheal),
+ [A, B, _C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Pid = rpc:call(A, erlang, whereis, [rabbit_node_monitor]),
+ Pid ! {autoheal_msg, {autoheal_finished, B}},
+ Pid = rpc:call(A, erlang, whereis, [rabbit_node_monitor]),
+ ok.
+
partial_false_positive(Config) ->
[A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
block([{A, B}]),
diff --git a/test/rabbitmqctl_integration_SUITE.erl b/test/rabbitmqctl_integration_SUITE.erl
new file mode 100644
index 0000000000..9305781bda
--- /dev/null
+++ b/test/rabbitmqctl_integration_SUITE.erl
@@ -0,0 +1,146 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2016 Pivotal Software, Inc. All rights reserved.
+%%
+-module(rabbitmqctl_integration_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+-export([all/0
+ ,groups/0
+ ,init_per_suite/1
+ ,end_per_suite/1
+ ,init_per_group/2
+ ,end_per_group/2
+ ,init_per_testcase/2
+ ,end_per_testcase/2
+ ]).
+
+-export([list_queues_local/1
+ ,list_queues_offline/1
+ ,list_queues_online/1
+ ]).
+
+all() ->
+ [{group, list_queues}].
+
+groups() ->
+ [{list_queues, [],
+ [list_queues_local
+ ,list_queues_online
+ ,list_queues_offline
+ ]}].
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_group(list_queues, Config0) ->
+ NumNodes = 3,
+ Config = create_n_node_cluster(Config0, NumNodes),
+ Config1 = declare_some_queues(Config),
+ rabbit_ct_broker_helpers:stop_node(Config1, NumNodes - 1),
+ Config1;
+init_per_group(_, Config) ->
+ Config.
+
+create_n_node_cluster(Config0, NumNodes) ->
+ Config1 = rabbit_ct_helpers:set_config(
+ Config0, [{rmq_nodes_count, NumNodes},
+ {rmq_nodes_clustered, true}]),
+ rabbit_ct_helpers:run_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()).
+
+declare_some_queues(Config) ->
+ Nodes = rabbit_ct_helpers:get_config(Config, rmq_nodes),
+ PerNodeQueues = [ declare_some_queues(Config, NodeNum)
+ || NodeNum <- lists:seq(0, length(Nodes)-1) ],
+ rabbit_ct_helpers:set_config(Config, {per_node_queues, PerNodeQueues}).
+
+declare_some_queues(Config, NodeNum) ->
+ {Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, NodeNum),
+ NumQueues = 5,
+ Queues = [ list_to_binary(io_lib:format("queue-~b-on-node-~b", [QueueNum, NodeNum]))
+ || QueueNum <- lists:seq(1, NumQueues) ],
+ lists:foreach(fun (QueueName) ->
+ #'queue.declare_ok'{} = amqp_channel:call(Chan, #'queue.declare'{queue = QueueName, durable = true})
+ end, Queues),
+ rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan),
+ Queues.
+
+end_per_group(list_queues, Config0) ->
+ Config1 = case rabbit_ct_helpers:get_config(Config0, save_config) of
+ undefined -> Config0;
+ C -> C
+ end,
+ rabbit_ct_helpers:run_steps(Config1,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps());
+end_per_group(_, Config) ->
+ Config.
+
+init_per_testcase(Testcase, Config0) ->
+ rabbit_ct_helpers:testcase_started(Config0, Testcase).
+
+end_per_testcase(Testcase, Config0) ->
+ rabbit_ct_helpers:testcase_finished(Config0, Testcase).
+
+%%----------------------------------------------------------------------------
+%% Test cases
+%%----------------------------------------------------------------------------
+list_queues_local(Config) ->
+ Node1Queues = lists:sort(lists:nth(1, ?config(per_node_queues, Config))),
+ Node2Queues = lists:sort(lists:nth(2, ?config(per_node_queues, Config))),
+ assert_ctl_queues(Config, 0, ["--local"], Node1Queues),
+ assert_ctl_queues(Config, 1, ["--local"], Node2Queues),
+ ok.
+
+list_queues_online(Config) ->
+ Node1Queues = lists:sort(lists:nth(1, ?config(per_node_queues, Config))),
+ Node2Queues = lists:sort(lists:nth(2, ?config(per_node_queues, Config))),
+ OnlineQueues = Node1Queues ++ Node2Queues,
+ assert_ctl_queues(Config, 0, ["--online"], OnlineQueues),
+ assert_ctl_queues(Config, 1, ["--online"], OnlineQueues),
+ ok.
+
+list_queues_offline(Config) ->
+ Node3Queues = lists:sort(lists:nth(3, ?config(per_node_queues, Config))),
+ OfflineQueues = Node3Queues,
+ assert_ctl_queues(Config, 0, ["--offline"], OfflineQueues),
+ assert_ctl_queues(Config, 1, ["--offline"], OfflineQueues),
+ ok.
+
+%%----------------------------------------------------------------------------
+%% Helpers
+%%----------------------------------------------------------------------------
+assert_ctl_queues(Config, Node, Args, Expected0) ->
+ Expected = lists:sort(Expected0),
+ Got0 = run_list_queues(Config, Node, Args),
+ Got = lists:sort(lists:map(fun hd/1, Got0)),
+ case Got of
+ Expected ->
+ ok;
+ _ ->
+ ct:pal(error, "Listing queues on node ~p failed. Expected:~n~p~n~nGot:~n~p~n~n",
+ [Node, Expected, Got]),
+ exit({list_queues_unexpected_on, Node, Expected, Got})
+ end.
+
+run_list_queues(Config, Node, Args) ->
+ rabbit_ct_broker_helpers:rabbitmqctl_list(Config, Node, ["list_queues"] ++ Args ++ ["name"]).
diff --git a/test/unit_SUITE.erl b/test/unit_SUITE.erl
index ba0f43f11e..43e812fa3d 100644
--- a/test/unit_SUITE.erl
+++ b/test/unit_SUITE.erl
@@ -31,7 +31,7 @@ groups() ->
[
{parallel_tests, [parallel], [
arguments_parser,
- filtering_flags_parsing,
+ mutually_exclusive_flags_parsing,
{basic_header_handling, [parallel], [
write_table_with_invalid_existing_type,
invalid_existing_headers,
@@ -135,26 +135,41 @@ check_parse_arguments(ExpRes, Fun, As) ->
true = SortRes(ExpRes) =:= SortRes(Fun(As)).
-filtering_flags_parsing(_Config) ->
- Cases = [{[], [], []}
- ,{[{"--online", true}], ["--offline", "--online", "--third-option"], [false, true, false]}
- ,{[{"--online", true}, {"--third-option", true}, {"--offline", true}], ["--offline", "--online", "--third-option"], [true, true, true]}
- ,{[], ["--offline", "--online", "--third-option"], [true, true, true]}
- ],
- lists:foreach(fun({Vals, Opts, Expect}) ->
- case rabbit_cli:filter_opts(Vals, Opts) of
- Expect ->
+mutually_exclusive_flags_parsing(_Config) ->
+ Matcher = fun ({ok, Value}, {ok, Value}) -> true;
+ ({error, Value}, {error, Pattern}) ->
+ case re:run(Value, Pattern) of
+ {match, _} -> true;
+ _ -> false
+ end;
+ (_, _) -> false
+ end,
+ Spec = [{"--online", online}
+ ,{"--offline", offline}
+ ,{"--local", local}],
+ Default = all,
+ Cases =[{["--online"], {ok, online}}
+ ,{[], {ok, Default}}
+ ,{["--offline"], {ok, offline}}
+ ,{["--local"], {ok, local}}
+ ,{["--offline", "--local"], {error, "mutually exclusive"}}
+ ,{["--offline", "--online"], {error, "mutually exclusive"}}
+ ,{["--offline", "--local", "--online"], {error, "mutually exclusive"}}
+ ],
+ lists:foreach(fun({Opts, Expected}) ->
+ ExpandedOpts = [ {Opt, true} || Opt <- Opts ],
+ Got = rabbit_cli:mutually_exclusive_flags(ExpandedOpts, all, Spec),
+ case Matcher(Got, Expected) of
+ true ->
ok;
- Got ->
- exit({no_match, Got, Expect, {args, Vals, Opts}})
+ false ->
+ exit({no_match, Got, Expected, {opts, Opts}})
end
- end,
- Cases).
+ end, Cases).
%% -------------------------------------------------------------------
%% basic_header_handling.
%% -------------------------------------------------------------------
-
-define(XDEATH_TABLE,
[{<<"reason">>, longstr, <<"blah">>},
{<<"queue">>, longstr, <<"foo.bar.baz">>},
diff --git a/test/unit_inbroker_SUITE.erl b/test/unit_inbroker_SUITE.erl
index e9ecbf5444..98cd77ccbe 100644
--- a/test/unit_inbroker_SUITE.erl
+++ b/test/unit_inbroker_SUITE.erl
@@ -1684,15 +1684,15 @@ credit_flow_settings(Config) ->
credit_flow_settings1(_Config) ->
%% default values
- passed = test_proc(200, 50),
+ passed = test_proc(200, 100),
- application:set_env(rabbit, credit_flow_default_credit, {100, 20}),
- passed = test_proc(100, 20),
+ application:set_env(rabbit, credit_flow_default_credit, {100, 300}),
+ passed = test_proc(100, 300),
application:unset_env(rabbit, credit_flow_default_credit),
% back to defaults
- passed = test_proc(200, 50),
+ passed = test_proc(200, 100),
passed.
test_proc(InitialCredit, MoreCreditAfter) ->