summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--test/inet_proxy_dist.erl201
-rw-r--r--test/inet_tcp_proxy.erl134
-rw-r--r--test/inet_tcp_proxy_manager.erl107
3 files changed, 0 insertions, 442 deletions
diff --git a/test/inet_proxy_dist.erl b/test/inet_proxy_dist.erl
deleted file mode 100644
index 32b7641a79..0000000000
--- a/test/inet_proxy_dist.erl
+++ /dev/null
@@ -1,201 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
-%%
--module(inet_proxy_dist).
-
-%% A distribution plugin that uses the usual inet_tcp_dist but allows
-%% insertion of a proxy at the receiving end.
-
-%% inet_*_dist "behaviour"
--export([listen/1, accept/1, accept_connection/5,
- setup/5, close/1, select/1, is_node_name/1]).
-
-%% For copypasta from inet_tcp_dist
--export([do_setup/6]).
--import(error_logger,[error_msg/2]).
-
--define(REAL, inet_tcp_dist).
-
-%%----------------------------------------------------------------------------
-
-listen(Name) -> ?REAL:listen(Name).
-select(Node) -> ?REAL:select(Node).
-accept(Listen) -> ?REAL:accept(Listen).
-close(Socket) -> ?REAL:close(Socket).
-is_node_name(Node) -> ?REAL:is_node_name(Node).
-
-accept_connection(AcceptPid, Socket, MyNode, Allowed, SetupTime) ->
- ?REAL:accept_connection(AcceptPid, Socket, MyNode, Allowed, SetupTime).
-
-%% This is copied from inet_tcp_dist, in order to change the
-%% output of erl_epmd:port_please/2.
-
--include_lib("kernel/include/net_address.hrl").
--include_lib("kernel/include/dist_util.hrl").
-
-setup(Node, Type, MyNode, LongOrShortNames,SetupTime) ->
- spawn_opt(?MODULE, do_setup,
- [self(), Node, Type, MyNode, LongOrShortNames, SetupTime],
- [link, {priority, max}]).
-
-do_setup(Kernel, Node, Type, MyNode, LongOrShortNames,SetupTime) ->
- ?trace("~p~n",[{inet_tcp_dist,self(),setup,Node}]),
- [Name, Address] = splitnode(Node, LongOrShortNames),
- case inet:getaddr(Address, inet) of
- {ok, Ip} ->
- Timer = dist_util:start_timer(SetupTime),
- case erl_epmd:port_please(Name, Ip) of
- {port, TcpPort, Version} ->
- ?trace("port_please(~p) -> version ~p~n",
- [Node,Version]),
- dist_util:reset_timer(Timer),
- %% Modification START
- Ret = application:get_env(kernel,
- dist_and_proxy_ports_map),
- PortsMap = case Ret of
- {ok, M} -> M;
- undefined -> []
- end,
- ProxyPort = case inet_tcp_proxy:is_enabled() of
- true -> proplists:get_value(TcpPort, PortsMap, TcpPort);
- false -> TcpPort
- end,
- case inet_tcp:connect(Ip, ProxyPort,
- [{active, false},
- {packet,2}]) of
- {ok, Socket} ->
- {ok, {_, SrcPort}} = inet:sockname(Socket),
- ok = inet_tcp_proxy_manager:register(
- node(), Node, SrcPort, TcpPort, ProxyPort),
- %% Modification END
- HSData = #hs_data{
- kernel_pid = Kernel,
- other_node = Node,
- this_node = MyNode,
- socket = Socket,
- timer = Timer,
- this_flags = 0,
- other_version = Version,
- f_send = fun inet_tcp:send/2,
- f_recv = fun inet_tcp:recv/3,
- f_setopts_pre_nodeup =
- fun(S) ->
- inet:setopts
- (S,
- [{active, false},
- {packet, 4},
- nodelay()])
- end,
- f_setopts_post_nodeup =
- fun(S) ->
- inet:setopts
- (S,
- [{active, true},
- {deliver, port},
- {packet, 4},
- nodelay()])
- end,
- f_getll = fun inet:getll/1,
- f_address =
- fun(_,_) ->
- #net_address{
- address = {Ip,TcpPort},
- host = Address,
- protocol = tcp,
- family = inet}
- end,
- mf_tick = fun tick/1,
- mf_getstat = fun inet_tcp_dist:getstat/1,
- request_type = Type
- },
- dist_util:handshake_we_started(HSData);
- R ->
- io:format("~p failed! ~p~n", [node(), R]),
- %% Other Node may have closed since
- %% port_please !
- ?trace("other node (~p) "
- "closed since port_please.~n",
- [Node]),
- ?shutdown(Node)
- end;
- _ ->
- ?trace("port_please (~p) "
- "failed.~n", [Node]),
- ?shutdown(Node)
- end;
- _Other ->
- ?trace("inet_getaddr(~p) "
- "failed (~p).~n", [Node,_Other]),
- ?shutdown(Node)
- end.
-
-%% If Node is illegal terminate the connection setup!!
-splitnode(Node, LongOrShortNames) ->
- case split_node(atom_to_list(Node), $@, []) of
- [Name|Tail] when Tail =/= [] ->
- Host = lists:append(Tail),
- case split_node(Host, $., []) of
- [_] when LongOrShortNames =:= longnames ->
- error_msg("** System running to use "
- "fully qualified "
- "hostnames **~n"
- "** Hostname ~s is illegal **~n",
- [Host]),
- ?shutdown(Node);
- L when length(L) > 1, LongOrShortNames =:= shortnames ->
- error_msg("** System NOT running to use fully qualified "
- "hostnames **~n"
- "** Hostname ~s is illegal **~n",
- [Host]),
- ?shutdown(Node);
- _ ->
- [Name, Host]
- end;
- [_] ->
- error_msg("** Nodename ~p illegal, no '@' character **~n",
- [Node]),
- ?shutdown(Node);
- _ ->
- error_msg("** Nodename ~p illegal **~n", [Node]),
- ?shutdown(Node)
- end.
-
-split_node([Chr|T], Chr, Ack) -> [lists:reverse(Ack)|split_node(T, Chr, [])];
-split_node([H|T], Chr, Ack) -> split_node(T, Chr, [H|Ack]);
-split_node([], _, Ack) -> [lists:reverse(Ack)].
-
-%% we may not always want the nodelay behaviour
-%% for performance reasons
-
-nodelay() ->
- case application:get_env(kernel, dist_nodelay) of
- undefined ->
- {nodelay, true};
- {ok, true} ->
- {nodelay, true};
- {ok, false} ->
- {nodelay, false};
- _ ->
- {nodelay, true}
- end.
-
-tick(Socket) ->
- case inet_tcp:send(Socket, [], [force]) of
- {error, closed} ->
- self() ! {tcp_closed, Socket},
- {error, closed};
- R ->
- R
- end.
diff --git a/test/inet_tcp_proxy.erl b/test/inet_tcp_proxy.erl
deleted file mode 100644
index 4498b8f952..0000000000
--- a/test/inet_tcp_proxy.erl
+++ /dev/null
@@ -1,134 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
-%%
--module(inet_tcp_proxy).
-
-%% A TCP proxy for insertion into the Erlang distribution mechanism,
-%% which allows us to simulate network partitions.
-
--export([start/3, reconnect/1, is_enabled/0, allow/1, block/1]).
-
--define(TABLE, ?MODULE).
-
-%% This can't start_link because there's no supervision hierarchy we
-%% can easily fit it into (we need to survive all application
-%% restarts). So we have to do some horrible error handling.
-
-start(ManagerNode, DistPort, ProxyPort) ->
- application:set_env(kernel, inet_tcp_proxy_manager_node, ManagerNode),
- Parent = self(),
- Pid = spawn(error_handler(fun() -> go(Parent, DistPort, ProxyPort) end)),
- MRef = erlang:monitor(process, Pid),
- receive
- ready ->
- erlang:demonitor(MRef),
- ok;
- {'DOWN', MRef, _, _, Reason} ->
- {error, Reason}
- end.
-
-reconnect(Nodes) ->
- [erlang:disconnect_node(N) || N <- Nodes, N =/= node()],
- ok.
-
-is_enabled() ->
- lists:member(?TABLE, ets:all()).
-
-allow(Node) ->
- rabbit_log:info("(~s) Allowing distribution between ~s and ~s~n",
- [?MODULE, node(), Node]),
- ets:delete(?TABLE, Node).
-block(Node) ->
- rabbit_log:info("(~s) BLOCKING distribution between ~s and ~s~n",
- [?MODULE, node(), Node]),
- ets:insert(?TABLE, {Node, block}).
-
-%%----------------------------------------------------------------------------
-
-error_handler(Thunk) ->
- fun () ->
- try
- Thunk()
- catch _:{{nodedown, _}, _} ->
- %% The only other node we ever talk to is the test
- %% runner; if that's down then the test is nearly
- %% over; die quietly.
- ok;
- _:X ->
- io:format(user, "TCP proxy died with ~p~n At ~p~n",
- [X, erlang:get_stacktrace()]),
- erlang:halt(1)
- end
- end.
-
-go(Parent, Port, ProxyPort) ->
- ets:new(?TABLE, [public, named_table]),
- {ok, Sock} = gen_tcp:listen(ProxyPort, [inet,
- {reuseaddr, true}]),
- Parent ! ready,
- accept_loop(Sock, Port).
-
-accept_loop(ListenSock, Port) ->
- {ok, Sock} = gen_tcp:accept(ListenSock),
- Proxy = spawn(error_handler(fun() -> run_it(Sock, Port) end)),
- ok = gen_tcp:controlling_process(Sock, Proxy),
- accept_loop(ListenSock, Port).
-
-run_it(SockIn, Port) ->
- case {inet:peername(SockIn), inet:sockname(SockIn)} of
- {{ok, {_Addr, SrcPort}}, {ok, {Addr, _OtherPort}}} ->
- {ok, Remote, This} = inet_tcp_proxy_manager:lookup(SrcPort),
- case node() of
- This -> ok;
- _ -> exit({not_me, node(), This})
- end,
- {ok, SockOut} = gen_tcp:connect(Addr, Port, [inet]),
- run_loop({SockIn, SockOut}, Remote, []);
- _ ->
- ok
- end.
-
-run_loop(Sockets, RemoteNode, Buf0) ->
- Block = [{RemoteNode, block}] =:= ets:lookup(?TABLE, RemoteNode),
- receive
- {tcp, Sock, Data} ->
- Buf = [Data | Buf0],
- case {Block, get(dist_was_blocked)} of
- {true, false} ->
- put(dist_was_blocked, Block),
- rabbit_log:warning(
- "(~s) Distribution BLOCKED between ~s and ~s~n",
- [?MODULE, node(), RemoteNode]);
- {false, S} when S =:= true orelse S =:= undefined ->
- put(dist_was_blocked, Block),
- rabbit_log:warning(
- "(~s) Distribution allowed between ~s and ~s~n",
- [?MODULE, node(), RemoteNode]);
- _ ->
- ok
- end,
- case Block of
- false -> gen_tcp:send(other(Sock, Sockets), lists:reverse(Buf)),
- run_loop(Sockets, RemoteNode, []);
- true -> run_loop(Sockets, RemoteNode, Buf)
- end;
- {tcp_closed, Sock} ->
- gen_tcp:close(other(Sock, Sockets));
- X ->
- exit({weirdness, X})
- end.
-
-other(A, {A, B}) -> B;
-other(B, {A, B}) -> A.
diff --git a/test/inet_tcp_proxy_manager.erl b/test/inet_tcp_proxy_manager.erl
deleted file mode 100644
index 18255b8d48..0000000000
--- a/test/inet_tcp_proxy_manager.erl
+++ /dev/null
@@ -1,107 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
-%%
--module(inet_tcp_proxy_manager).
-
-%% The TCP proxies need to decide whether to block based on the node
-%% they're running on, and the node connecting to them. The trouble
-%% is, they don't have an easy way to determine the latter. Therefore
-%% when A connects to B we register the source port used by A here, so
-%% that B can later look it up and find out who A is without having to
-%% sniff the distribution protocol.
-%%
-%% That does unfortunately mean that we need a central control
-%% thing. We assume here it's running on the node called
-%% 'standalone_test' since that's where tests are orchestrated from.
-%%
-%% Yes, this leaks. For its intended lifecycle, that's fine.
-
--behaviour(gen_server).
-
--export([start/0, register/5, lookup/1]).
-
--export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
- code_change/3]).
-
--define(NODE, ct).
-
--record(state, {ports, pending}).
-
-start() ->
- gen_server:start({local, ?MODULE}, ?MODULE, [], []).
-
-register(_From, _To, _SrcPort, Port, Port) ->
- %% No proxy, don't register
- ok;
-register(From, To, SrcPort, _Port, _ProxyPort) ->
- gen_server:call(name(), {register, From, To, SrcPort}, infinity).
-
-lookup(SrcPort) ->
- gen_server:call(name(), {lookup, SrcPort}, infinity).
-
-controller_node() ->
- {ok, ManagerNode} = application:get_env(kernel,
- inet_tcp_proxy_manager_node),
- ManagerNode.
-
-name() ->
- {?MODULE, controller_node()}.
-
-%%----------------------------------------------------------------------------
-
-init([]) ->
- net_kernel:monitor_nodes(true),
- {ok, #state{ports = dict:new(),
- pending = []}}.
-
-handle_call({register, FromNode, ToNode, SrcPort}, _From,
- State = #state{ports = Ports,
- pending = Pending}) ->
- {Notify, Pending2} =
- lists:partition(fun ({P, _}) -> P =:= SrcPort end, Pending),
- [gen_server:reply(From, {ok, FromNode, ToNode}) || {_, From} <- Notify],
- {reply, ok,
- State#state{ports = dict:store(SrcPort, {FromNode, ToNode}, Ports),
- pending = Pending2}};
-
-handle_call({lookup, SrcPort}, From,
- State = #state{ports = Ports, pending = Pending}) ->
- case dict:find(SrcPort, Ports) of
- {ok, {FromNode, ToNode}} ->
- {reply, {ok, FromNode, ToNode}, State};
- error ->
- {noreply, State#state{pending = [{SrcPort, From} | Pending]}}
- end;
-
-handle_call(_Req, _From, State) ->
- {reply, unknown_request, State}.
-
-handle_cast(_C, State) ->
- {noreply, State}.
-
-handle_info({nodedown, Node}, State = #state{ports = Ports}) ->
- Ports1 = dict:filter(
- fun (_, {From, To}) ->
- Node =/= From andalso Node =/= To
- end, Ports),
- {noreply, State#state{ports = Ports1}};
-
-handle_info(_I, State) ->
- {noreply, State}.
-
-terminate(_Reason, _State) ->
- ok.
-
-code_change(_, State, _) -> {ok, State}.