summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDiana Corbacho <diana@rabbitmq.com>2016-09-21 10:09:46 +0100
committerDiana Corbacho <diana@rabbitmq.com>2016-09-21 10:09:46 +0100
commit8ebdbc60a597613ee7b9ed1cc1a9460951913e77 (patch)
tree12df38aefa885bf431e533d499576dc5c69e9443
parent7a53030deadc9109ad14113e81a0e18cbe9bf3db (diff)
parentbe9a3f7f84d3f6c680d3019360c5458f946aa462 (diff)
downloadrabbitmq-server-git-8ebdbc60a597613ee7b9ed1cc1a9460951913e77.tar.gz
Merge branch 'stable'
-rw-r--r--Makefile4
-rw-r--r--rabbitmq-components.mk46
-rw-r--r--src/gm.erl34
-rw-r--r--src/rabbit_amqqueue_process.erl24
-rw-r--r--src/rabbit_mirror_queue_master.erl66
-rw-r--r--test/gm_SUITE.erl2
-rw-r--r--test/inet_proxy_dist.erl201
-rw-r--r--test/inet_tcp_proxy.erl134
-rw-r--r--test/inet_tcp_proxy_manager.erl107
9 files changed, 81 insertions, 537 deletions
diff --git a/Makefile b/Makefile
index fa6b82d02c..854ce7aced 100644
--- a/Makefile
+++ b/Makefile
@@ -4,8 +4,8 @@ VERSION ?= $(call get_app_version,src/$(PROJECT).app.src)
# Release artifacts are put in $(PACKAGES_DIR).
PACKAGES_DIR ?= $(abspath PACKAGES)
-DEPS = ranch lager $(PLUGINS)
-TEST_DEPS = amqp_client meck proper
+DEPS = ranch lager rabbit_common
+TEST_DEPS = rabbitmq_ct_helpers amqp_client meck proper
define usage_xml_to_erl
$(subst __,_,$(patsubst $(DOCS_DIR)/rabbitmq%.1.xml, src/rabbit_%_usage.erl, $(subst -,_,$(1))))
diff --git a/rabbitmq-components.mk b/rabbitmq-components.mk
index dbad95d738..e0f4c91287 100644
--- a/rabbitmq-components.mk
+++ b/rabbitmq-components.mk
@@ -5,16 +5,6 @@ ifeq ($(.DEFAULT_GOAL),)
.DEFAULT_GOAL = all
endif
-# Automatically add rabbitmq-common to the dependencies, at least for
-# the Makefiles.
-ifneq ($(PROJECT),rabbit_common)
-ifneq ($(PROJECT),rabbitmq_public_umbrella)
-ifeq ($(filter rabbit_common,$(DEPS)),)
-DEPS += rabbit_common
-endif
-endif
-endif
-
# --------------------------------------------------------------------
# RabbitMQ components.
# --------------------------------------------------------------------
@@ -248,42 +238,6 @@ prepare-dist::
@:
# --------------------------------------------------------------------
-# Run a RabbitMQ node (moved from rabbitmq-run.mk as a workaround).
-# --------------------------------------------------------------------
-
-# Add "rabbit" to the build dependencies when the user wants to start
-# a broker or to the test dependencies when the user wants to test a
-# project.
-#
-# NOTE: This should belong to rabbitmq-run.mk. Unfortunately, it is
-# loaded *after* erlang.mk which is too late to add a dependency. That's
-# why rabbitmq-components.mk knows the list of targets which start a
-# broker and add "rabbit" to the dependencies in this case.
-
-ifneq ($(PROJECT),rabbit)
-ifeq ($(filter rabbit,$(DEPS) $(BUILD_DEPS)),)
-RUN_RMQ_TARGETS = run-broker \
- run-tls-broker \
- run-background-broker \
- run-node \
- run-background-node \
- start-background-node \
- start-background-broker \
- start-rabbit-on-node
-
-ifneq ($(filter $(RUN_RMQ_TARGETS),$(MAKECMDGOALS)),)
-BUILD_DEPS += rabbit
-endif
-endif
-
-ifeq ($(filter rabbit,$(DEPS) $(BUILD_DEPS) $(TEST_DEPS)),)
-ifneq ($(filter check tests,$(MAKECMDGOALS)),)
-TEST_DEPS += rabbit
-endif
-endif
-endif
-
-# --------------------------------------------------------------------
# rabbitmq-components.mk checks.
# --------------------------------------------------------------------
diff --git a/src/gm.erl b/src/gm.erl
index aef23c7269..aa4ffcf511 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -611,7 +611,7 @@ handle_call({add_on_right, NewMember}, _From,
handle_callback_result({Result, {ok, Group}, State1})
catch
lost_membership ->
- {stop, normal, State}
+ {stop, shutdown, State}
end.
%% add_on_right causes a catchup to be sent immediately from the left,
@@ -646,7 +646,7 @@ handle_cast({?TAG, ReqVer, Msg},
Result, fun handle_msg_true/3, fun handle_msg_false/3, Msg, State1))
catch
lost_membership ->
- {stop, normal, State}
+ {stop, shutdown, State}
end;
handle_cast({broadcast, _Msg, _SizeHint},
@@ -675,16 +675,21 @@ handle_cast(join, State = #state { self = Self,
module = Module,
callback_args = Args,
txn_executor = TxnFun }) ->
- View = join_group(Self, GroupName, TxnFun),
- MembersState =
- case alive_view_members(View) of
- [Self] -> blank_member_state();
- _ -> undefined
- end,
- State1 = check_neighbours(State #state { view = View,
- members_state = MembersState }),
- handle_callback_result(
- {Module:joined(Args, get_pids(all_known_members(View))), State1});
+ try
+ View = join_group(Self, GroupName, TxnFun),
+ MembersState =
+ case alive_view_members(View) of
+ [Self] -> blank_member_state();
+ _ -> undefined
+ end,
+ State1 = check_neighbours(State #state { view = View,
+ members_state = MembersState }),
+ handle_callback_result(
+ {Module:joined(Args, get_pids(all_known_members(View))), State1})
+ catch
+ lost_membership ->
+ {stop, shutdown, State}
+ end;
handle_cast({validate_members, OldMembers},
State = #state { view = View,
@@ -756,7 +761,7 @@ handle_info({'DOWN', MRef, process, _Pid, Reason},
end
catch
lost_membership ->
- {stop, normal, State}
+ {stop, shutdown, State}
end;
handle_info(_, State) ->
%% Discard any unexpected messages, such as late replies from neighbour_call/2
@@ -768,7 +773,6 @@ handle_info(_, State) ->
terminate(Reason, #state { module = Module, callback_args = Args }) ->
Module:handle_terminate(Args, Reason).
-
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -871,7 +875,7 @@ handle_msg({activity, Left, Activity},
Result, fun activity_true/3, fun activity_false/3, Activity3, State2)
catch
lost_membership ->
- {{stop, normal}, State}
+ {{stop, shutdown}, State}
end;
handle_msg({activity, _NotLeft, _Activity}, State) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 2c42e348ca..3e6d961d5f 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1037,7 +1037,17 @@ prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
end.
handle_call({init, Recover}, From, State) ->
- init_it(Recover, From, State);
+ try
+ init_it(Recover, From, State)
+ catch
+ {coordinator_not_started, Reason} ->
+ %% The GM can shutdown before the coordinator has started up
+ %% (lost membership or missing group), thus the start_link of
+ %% the coordinator returns {error, shutdown} as rabbit_amqqueue_process
+ %% is trapping exists. The master captures this return value and
+ %% throws the current exception.
+ {stop, Reason, State}
+ end;
handle_call(info, _From, State) ->
reply(infos(info_keys(), State), State);
@@ -1189,7 +1199,17 @@ handle_call(cancel_sync_mirrors, _From, State) ->
reply({ok, not_syncing}, State).
handle_cast(init, State) ->
- init_it({no_barrier, non_clean_shutdown}, none, State);
+ try
+ init_it({no_barrier, non_clean_shutdown}, none, State)
+ catch
+ {coordinator_not_started, Reason} ->
+ %% The GM can shutdown before the coordinator has started up
+ %% (lost membership or missing group), thus the start_link of
+ %% the coordinator returns {error, shutdown} as rabbit_amqqueue_process
+ %% is trapping exists. The master captures this return value and
+ %% throws the current exception.
+ {stop, Reason, State}
+ end;
handle_cast({run_backing_queue, Mod, Fun},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 78d7341584..b006e37eb2 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -101,35 +101,43 @@ init(Q, Recover, AsyncCallback) ->
State.
init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) ->
- {ok, CPid} = rabbit_mirror_queue_coordinator:start_link(
- Q, undefined, sender_death_fun(), depth_fun()),
- GM = rabbit_mirror_queue_coordinator:get_gm(CPid),
- Self = self(),
- ok = rabbit_misc:execute_mnesia_transaction(
- fun () ->
- [Q1 = #amqqueue{gm_pids = GMPids}]
- = mnesia:read({rabbit_queue, QName}),
- ok = rabbit_amqqueue:store_queue(
- Q1#amqqueue{gm_pids = [{GM, Self} | GMPids],
- state = live})
- end),
- {_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q),
- %% We need synchronous add here (i.e. do not return until the
- %% slave is running) so that when queue declaration is finished
- %% all slaves are up; we don't want to end up with unsynced slaves
- %% just by declaring a new queue. But add can't be synchronous all
- %% the time as it can be called by slaves and that's
- %% deadlock-prone.
- rabbit_mirror_queue_misc:add_mirrors(QName, SNodes, sync),
- #state { name = QName,
- gm = GM,
- coordinator = CPid,
- backing_queue = BQ,
- backing_queue_state = BQS,
- seen_status = dict:new(),
- confirmed = [],
- known_senders = sets:new(),
- wait_timeout = rabbit_misc:get_env(rabbit, slave_wait_timeout, 15000) }.
+ case rabbit_mirror_queue_coordinator:start_link(
+ Q, undefined, sender_death_fun(), depth_fun()) of
+ {ok, CPid} ->
+ GM = rabbit_mirror_queue_coordinator:get_gm(CPid),
+ Self = self(),
+ ok = rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ [Q1 = #amqqueue{gm_pids = GMPids}]
+ = mnesia:read({rabbit_queue, QName}),
+ ok = rabbit_amqqueue:store_queue(
+ Q1#amqqueue{gm_pids = [{GM, Self} | GMPids],
+ state = live})
+ end),
+ {_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q),
+ %% We need synchronous add here (i.e. do not return until the
+ %% slave is running) so that when queue declaration is finished
+ %% all slaves are up; we don't want to end up with unsynced slaves
+ %% just by declaring a new queue. But add can't be synchronous all
+ %% the time as it can be called by slaves and that's
+ %% deadlock-prone.
+ rabbit_mirror_queue_misc:add_mirrors(QName, SNodes, sync),
+ #state { name = QName,
+ gm = GM,
+ coordinator = CPid,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ seen_status = dict:new(),
+ confirmed = [],
+ known_senders = sets:new(),
+ wait_timeout = rabbit_misc:get_env(rabbit, slave_wait_timeout, 15000) };
+ {error, Reason} ->
+ %% The GM can shutdown before the coordinator has started up
+ %% (lost membership or missing group), thus the start_link of
+ %% the coordinator returns {error, shutdown} as rabbit_amqqueue_process
+ % is trapping exists
+ throw({coordinator_not_started, Reason})
+ end.
stop_mirroring(State = #state { coordinator = CPid,
backing_queue = BQ,
diff --git a/test/gm_SUITE.erl b/test/gm_SUITE.erl
index 8b07c9efad..df73d8ac27 100644
--- a/test/gm_SUITE.erl
+++ b/test/gm_SUITE.erl
@@ -146,7 +146,7 @@ down_in_members_change(_Config) ->
end),
gm:leave(Pid2),
Passed = receive
- {'EXIT', Pid, normal} ->
+ {'EXIT', Pid, shutdown} ->
passed;
{'EXIT', Pid, _} ->
crashed
diff --git a/test/inet_proxy_dist.erl b/test/inet_proxy_dist.erl
deleted file mode 100644
index 32b7641a79..0000000000
--- a/test/inet_proxy_dist.erl
+++ /dev/null
@@ -1,201 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
-%%
--module(inet_proxy_dist).
-
-%% A distribution plugin that uses the usual inet_tcp_dist but allows
-%% insertion of a proxy at the receiving end.
-
-%% inet_*_dist "behaviour"
--export([listen/1, accept/1, accept_connection/5,
- setup/5, close/1, select/1, is_node_name/1]).
-
-%% For copypasta from inet_tcp_dist
--export([do_setup/6]).
--import(error_logger,[error_msg/2]).
-
--define(REAL, inet_tcp_dist).
-
-%%----------------------------------------------------------------------------
-
-listen(Name) -> ?REAL:listen(Name).
-select(Node) -> ?REAL:select(Node).
-accept(Listen) -> ?REAL:accept(Listen).
-close(Socket) -> ?REAL:close(Socket).
-is_node_name(Node) -> ?REAL:is_node_name(Node).
-
-accept_connection(AcceptPid, Socket, MyNode, Allowed, SetupTime) ->
- ?REAL:accept_connection(AcceptPid, Socket, MyNode, Allowed, SetupTime).
-
-%% This is copied from inet_tcp_dist, in order to change the
-%% output of erl_epmd:port_please/2.
-
--include_lib("kernel/include/net_address.hrl").
--include_lib("kernel/include/dist_util.hrl").
-
-setup(Node, Type, MyNode, LongOrShortNames,SetupTime) ->
- spawn_opt(?MODULE, do_setup,
- [self(), Node, Type, MyNode, LongOrShortNames, SetupTime],
- [link, {priority, max}]).
-
-do_setup(Kernel, Node, Type, MyNode, LongOrShortNames,SetupTime) ->
- ?trace("~p~n",[{inet_tcp_dist,self(),setup,Node}]),
- [Name, Address] = splitnode(Node, LongOrShortNames),
- case inet:getaddr(Address, inet) of
- {ok, Ip} ->
- Timer = dist_util:start_timer(SetupTime),
- case erl_epmd:port_please(Name, Ip) of
- {port, TcpPort, Version} ->
- ?trace("port_please(~p) -> version ~p~n",
- [Node,Version]),
- dist_util:reset_timer(Timer),
- %% Modification START
- Ret = application:get_env(kernel,
- dist_and_proxy_ports_map),
- PortsMap = case Ret of
- {ok, M} -> M;
- undefined -> []
- end,
- ProxyPort = case inet_tcp_proxy:is_enabled() of
- true -> proplists:get_value(TcpPort, PortsMap, TcpPort);
- false -> TcpPort
- end,
- case inet_tcp:connect(Ip, ProxyPort,
- [{active, false},
- {packet,2}]) of
- {ok, Socket} ->
- {ok, {_, SrcPort}} = inet:sockname(Socket),
- ok = inet_tcp_proxy_manager:register(
- node(), Node, SrcPort, TcpPort, ProxyPort),
- %% Modification END
- HSData = #hs_data{
- kernel_pid = Kernel,
- other_node = Node,
- this_node = MyNode,
- socket = Socket,
- timer = Timer,
- this_flags = 0,
- other_version = Version,
- f_send = fun inet_tcp:send/2,
- f_recv = fun inet_tcp:recv/3,
- f_setopts_pre_nodeup =
- fun(S) ->
- inet:setopts
- (S,
- [{active, false},
- {packet, 4},
- nodelay()])
- end,
- f_setopts_post_nodeup =
- fun(S) ->
- inet:setopts
- (S,
- [{active, true},
- {deliver, port},
- {packet, 4},
- nodelay()])
- end,
- f_getll = fun inet:getll/1,
- f_address =
- fun(_,_) ->
- #net_address{
- address = {Ip,TcpPort},
- host = Address,
- protocol = tcp,
- family = inet}
- end,
- mf_tick = fun tick/1,
- mf_getstat = fun inet_tcp_dist:getstat/1,
- request_type = Type
- },
- dist_util:handshake_we_started(HSData);
- R ->
- io:format("~p failed! ~p~n", [node(), R]),
- %% Other Node may have closed since
- %% port_please !
- ?trace("other node (~p) "
- "closed since port_please.~n",
- [Node]),
- ?shutdown(Node)
- end;
- _ ->
- ?trace("port_please (~p) "
- "failed.~n", [Node]),
- ?shutdown(Node)
- end;
- _Other ->
- ?trace("inet_getaddr(~p) "
- "failed (~p).~n", [Node,_Other]),
- ?shutdown(Node)
- end.
-
-%% If Node is illegal terminate the connection setup!!
-splitnode(Node, LongOrShortNames) ->
- case split_node(atom_to_list(Node), $@, []) of
- [Name|Tail] when Tail =/= [] ->
- Host = lists:append(Tail),
- case split_node(Host, $., []) of
- [_] when LongOrShortNames =:= longnames ->
- error_msg("** System running to use "
- "fully qualified "
- "hostnames **~n"
- "** Hostname ~s is illegal **~n",
- [Host]),
- ?shutdown(Node);
- L when length(L) > 1, LongOrShortNames =:= shortnames ->
- error_msg("** System NOT running to use fully qualified "
- "hostnames **~n"
- "** Hostname ~s is illegal **~n",
- [Host]),
- ?shutdown(Node);
- _ ->
- [Name, Host]
- end;
- [_] ->
- error_msg("** Nodename ~p illegal, no '@' character **~n",
- [Node]),
- ?shutdown(Node);
- _ ->
- error_msg("** Nodename ~p illegal **~n", [Node]),
- ?shutdown(Node)
- end.
-
-split_node([Chr|T], Chr, Ack) -> [lists:reverse(Ack)|split_node(T, Chr, [])];
-split_node([H|T], Chr, Ack) -> split_node(T, Chr, [H|Ack]);
-split_node([], _, Ack) -> [lists:reverse(Ack)].
-
-%% we may not always want the nodelay behaviour
-%% for performance reasons
-
-nodelay() ->
- case application:get_env(kernel, dist_nodelay) of
- undefined ->
- {nodelay, true};
- {ok, true} ->
- {nodelay, true};
- {ok, false} ->
- {nodelay, false};
- _ ->
- {nodelay, true}
- end.
-
-tick(Socket) ->
- case inet_tcp:send(Socket, [], [force]) of
- {error, closed} ->
- self() ! {tcp_closed, Socket},
- {error, closed};
- R ->
- R
- end.
diff --git a/test/inet_tcp_proxy.erl b/test/inet_tcp_proxy.erl
deleted file mode 100644
index 4498b8f952..0000000000
--- a/test/inet_tcp_proxy.erl
+++ /dev/null
@@ -1,134 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
-%%
--module(inet_tcp_proxy).
-
-%% A TCP proxy for insertion into the Erlang distribution mechanism,
-%% which allows us to simulate network partitions.
-
--export([start/3, reconnect/1, is_enabled/0, allow/1, block/1]).
-
--define(TABLE, ?MODULE).
-
-%% This can't start_link because there's no supervision hierarchy we
-%% can easily fit it into (we need to survive all application
-%% restarts). So we have to do some horrible error handling.
-
-start(ManagerNode, DistPort, ProxyPort) ->
- application:set_env(kernel, inet_tcp_proxy_manager_node, ManagerNode),
- Parent = self(),
- Pid = spawn(error_handler(fun() -> go(Parent, DistPort, ProxyPort) end)),
- MRef = erlang:monitor(process, Pid),
- receive
- ready ->
- erlang:demonitor(MRef),
- ok;
- {'DOWN', MRef, _, _, Reason} ->
- {error, Reason}
- end.
-
-reconnect(Nodes) ->
- [erlang:disconnect_node(N) || N <- Nodes, N =/= node()],
- ok.
-
-is_enabled() ->
- lists:member(?TABLE, ets:all()).
-
-allow(Node) ->
- rabbit_log:info("(~s) Allowing distribution between ~s and ~s~n",
- [?MODULE, node(), Node]),
- ets:delete(?TABLE, Node).
-block(Node) ->
- rabbit_log:info("(~s) BLOCKING distribution between ~s and ~s~n",
- [?MODULE, node(), Node]),
- ets:insert(?TABLE, {Node, block}).
-
-%%----------------------------------------------------------------------------
-
-error_handler(Thunk) ->
- fun () ->
- try
- Thunk()
- catch _:{{nodedown, _}, _} ->
- %% The only other node we ever talk to is the test
- %% runner; if that's down then the test is nearly
- %% over; die quietly.
- ok;
- _:X ->
- io:format(user, "TCP proxy died with ~p~n At ~p~n",
- [X, erlang:get_stacktrace()]),
- erlang:halt(1)
- end
- end.
-
-go(Parent, Port, ProxyPort) ->
- ets:new(?TABLE, [public, named_table]),
- {ok, Sock} = gen_tcp:listen(ProxyPort, [inet,
- {reuseaddr, true}]),
- Parent ! ready,
- accept_loop(Sock, Port).
-
-accept_loop(ListenSock, Port) ->
- {ok, Sock} = gen_tcp:accept(ListenSock),
- Proxy = spawn(error_handler(fun() -> run_it(Sock, Port) end)),
- ok = gen_tcp:controlling_process(Sock, Proxy),
- accept_loop(ListenSock, Port).
-
-run_it(SockIn, Port) ->
- case {inet:peername(SockIn), inet:sockname(SockIn)} of
- {{ok, {_Addr, SrcPort}}, {ok, {Addr, _OtherPort}}} ->
- {ok, Remote, This} = inet_tcp_proxy_manager:lookup(SrcPort),
- case node() of
- This -> ok;
- _ -> exit({not_me, node(), This})
- end,
- {ok, SockOut} = gen_tcp:connect(Addr, Port, [inet]),
- run_loop({SockIn, SockOut}, Remote, []);
- _ ->
- ok
- end.
-
-run_loop(Sockets, RemoteNode, Buf0) ->
- Block = [{RemoteNode, block}] =:= ets:lookup(?TABLE, RemoteNode),
- receive
- {tcp, Sock, Data} ->
- Buf = [Data | Buf0],
- case {Block, get(dist_was_blocked)} of
- {true, false} ->
- put(dist_was_blocked, Block),
- rabbit_log:warning(
- "(~s) Distribution BLOCKED between ~s and ~s~n",
- [?MODULE, node(), RemoteNode]);
- {false, S} when S =:= true orelse S =:= undefined ->
- put(dist_was_blocked, Block),
- rabbit_log:warning(
- "(~s) Distribution allowed between ~s and ~s~n",
- [?MODULE, node(), RemoteNode]);
- _ ->
- ok
- end,
- case Block of
- false -> gen_tcp:send(other(Sock, Sockets), lists:reverse(Buf)),
- run_loop(Sockets, RemoteNode, []);
- true -> run_loop(Sockets, RemoteNode, Buf)
- end;
- {tcp_closed, Sock} ->
- gen_tcp:close(other(Sock, Sockets));
- X ->
- exit({weirdness, X})
- end.
-
-other(A, {A, B}) -> B;
-other(B, {A, B}) -> A.
diff --git a/test/inet_tcp_proxy_manager.erl b/test/inet_tcp_proxy_manager.erl
deleted file mode 100644
index 18255b8d48..0000000000
--- a/test/inet_tcp_proxy_manager.erl
+++ /dev/null
@@ -1,107 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
-%%
--module(inet_tcp_proxy_manager).
-
-%% The TCP proxies need to decide whether to block based on the node
-%% they're running on, and the node connecting to them. The trouble
-%% is, they don't have an easy way to determine the latter. Therefore
-%% when A connects to B we register the source port used by A here, so
-%% that B can later look it up and find out who A is without having to
-%% sniff the distribution protocol.
-%%
-%% That does unfortunately mean that we need a central control
-%% thing. We assume here it's running on the node called
-%% 'standalone_test' since that's where tests are orchestrated from.
-%%
-%% Yes, this leaks. For its intended lifecycle, that's fine.
-
--behaviour(gen_server).
-
--export([start/0, register/5, lookup/1]).
-
--export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
- code_change/3]).
-
--define(NODE, ct).
-
--record(state, {ports, pending}).
-
-start() ->
- gen_server:start({local, ?MODULE}, ?MODULE, [], []).
-
-register(_From, _To, _SrcPort, Port, Port) ->
- %% No proxy, don't register
- ok;
-register(From, To, SrcPort, _Port, _ProxyPort) ->
- gen_server:call(name(), {register, From, To, SrcPort}, infinity).
-
-lookup(SrcPort) ->
- gen_server:call(name(), {lookup, SrcPort}, infinity).
-
-controller_node() ->
- {ok, ManagerNode} = application:get_env(kernel,
- inet_tcp_proxy_manager_node),
- ManagerNode.
-
-name() ->
- {?MODULE, controller_node()}.
-
-%%----------------------------------------------------------------------------
-
-init([]) ->
- net_kernel:monitor_nodes(true),
- {ok, #state{ports = dict:new(),
- pending = []}}.
-
-handle_call({register, FromNode, ToNode, SrcPort}, _From,
- State = #state{ports = Ports,
- pending = Pending}) ->
- {Notify, Pending2} =
- lists:partition(fun ({P, _}) -> P =:= SrcPort end, Pending),
- [gen_server:reply(From, {ok, FromNode, ToNode}) || {_, From} <- Notify],
- {reply, ok,
- State#state{ports = dict:store(SrcPort, {FromNode, ToNode}, Ports),
- pending = Pending2}};
-
-handle_call({lookup, SrcPort}, From,
- State = #state{ports = Ports, pending = Pending}) ->
- case dict:find(SrcPort, Ports) of
- {ok, {FromNode, ToNode}} ->
- {reply, {ok, FromNode, ToNode}, State};
- error ->
- {noreply, State#state{pending = [{SrcPort, From} | Pending]}}
- end;
-
-handle_call(_Req, _From, State) ->
- {reply, unknown_request, State}.
-
-handle_cast(_C, State) ->
- {noreply, State}.
-
-handle_info({nodedown, Node}, State = #state{ports = Ports}) ->
- Ports1 = dict:filter(
- fun (_, {From, To}) ->
- Node =/= From andalso Node =/= To
- end, Ports),
- {noreply, State#state{ports = Ports1}};
-
-handle_info(_I, State) ->
- {noreply, State}.
-
-terminate(_Reason, _State) ->
- ok.
-
-code_change(_, State, _) -> {ok, State}.