diff options
| -rw-r--r-- | Makefile | 4 | ||||
| -rw-r--r-- | rabbitmq-components.mk | 46 | ||||
| -rwxr-xr-x | scripts/rabbitmq-server-ha.ocf | 16 | ||||
| -rw-r--r-- | test/inet_proxy_dist.erl | 201 | ||||
| -rw-r--r-- | test/inet_tcp_proxy.erl | 134 | ||||
| -rw-r--r-- | test/inet_tcp_proxy_manager.erl | 107 |
6 files changed, 10 insertions, 498 deletions
@@ -4,8 +4,8 @@ VERSION ?= $(call get_app_version,src/$(PROJECT).app.src) # Release artifacts are put in $(PACKAGES_DIR). PACKAGES_DIR ?= $(abspath PACKAGES) -DEPS = ranch $(PLUGINS) -TEST_DEPS = amqp_client meck proper +DEPS = ranch rabbit_common +TEST_DEPS = rabbitmq_ct_helpers amqp_client meck proper define usage_xml_to_erl $(subst __,_,$(patsubst $(DOCS_DIR)/rabbitmq%.1.xml, src/rabbit_%_usage.erl, $(subst -,_,$(1)))) diff --git a/rabbitmq-components.mk b/rabbitmq-components.mk index dbad95d738..e0f4c91287 100644 --- a/rabbitmq-components.mk +++ b/rabbitmq-components.mk @@ -5,16 +5,6 @@ ifeq ($(.DEFAULT_GOAL),) .DEFAULT_GOAL = all endif -# Automatically add rabbitmq-common to the dependencies, at least for -# the Makefiles. -ifneq ($(PROJECT),rabbit_common) -ifneq ($(PROJECT),rabbitmq_public_umbrella) -ifeq ($(filter rabbit_common,$(DEPS)),) -DEPS += rabbit_common -endif -endif -endif - # -------------------------------------------------------------------- # RabbitMQ components. # -------------------------------------------------------------------- @@ -248,42 +238,6 @@ prepare-dist:: @: # -------------------------------------------------------------------- -# Run a RabbitMQ node (moved from rabbitmq-run.mk as a workaround). -# -------------------------------------------------------------------- - -# Add "rabbit" to the build dependencies when the user wants to start -# a broker or to the test dependencies when the user wants to test a -# project. -# -# NOTE: This should belong to rabbitmq-run.mk. Unfortunately, it is -# loaded *after* erlang.mk which is too late to add a dependency. That's -# why rabbitmq-components.mk knows the list of targets which start a -# broker and add "rabbit" to the dependencies in this case. - -ifneq ($(PROJECT),rabbit) -ifeq ($(filter rabbit,$(DEPS) $(BUILD_DEPS)),) -RUN_RMQ_TARGETS = run-broker \ - run-tls-broker \ - run-background-broker \ - run-node \ - run-background-node \ - start-background-node \ - start-background-broker \ - start-rabbit-on-node - -ifneq ($(filter $(RUN_RMQ_TARGETS),$(MAKECMDGOALS)),) -BUILD_DEPS += rabbit -endif -endif - -ifeq ($(filter rabbit,$(DEPS) $(BUILD_DEPS) $(TEST_DEPS)),) -ifneq ($(filter check tests,$(MAKECMDGOALS)),) -TEST_DEPS += rabbit -endif -endif -endif - -# -------------------------------------------------------------------- # rabbitmq-components.mk checks. # -------------------------------------------------------------------- diff --git a/scripts/rabbitmq-server-ha.ocf b/scripts/rabbitmq-server-ha.ocf index 8f711b3473..f2ba9a1519 100755 --- a/scripts/rabbitmq-server-ha.ocf +++ b/scripts/rabbitmq-server-ha.ocf @@ -1082,7 +1082,7 @@ stop_server_process() { if [ $rc -ne 0 ] ; then # Try to stop without known PID ocf_log err "${LH} RMQ-server process PIDFILE was not found!" - su_rabbit_cmd "${OCF_RESKEY_ctl} stop 2>&1 >> \"${OCF_RESKEY_log_dir}/shutdown_log\"" + su_rabbit_cmd "${OCF_RESKEY_ctl} stop >> \"${OCF_RESKEY_log_dir}/shutdown_log\" 2>&1" if [ $? -eq 0 ] ; then ocf_log info "${LH} RMQ-server process stopped succesfully, although there was no PIDFILE found." ocf_log info "${LH} grant a graceful termintation window ${OCF_RESKEY_stop_time} to end its beam" @@ -1093,7 +1093,7 @@ stop_server_process() { elif [ "${pid}" ] ; then # Try to stop gracefully by known PID ocf_log info "${LH} Execute stop with timeout: ${TIMEOUT_ARG}" - su_rabbit_cmd "${OCF_RESKEY_ctl} stop ${OCF_RESKEY_pid_file} 2>&1 >> \"${OCF_RESKEY_log_dir}/shutdown_log\"" + su_rabbit_cmd "${OCF_RESKEY_ctl} stop ${OCF_RESKEY_pid_file} >> \"${OCF_RESKEY_log_dir}/shutdown_log\" 2>&1" [ $? -eq 0 ] && ocf_log info "${LH} RMQ-server process (PID=${pid}) stopped succesfully." fi @@ -1128,7 +1128,7 @@ stop_rmq_server_app() { # stop the app ocf_log info "${LH} Execute stop_app with timeout: ${TIMEOUT_ARG}" - su_rabbit_cmd "${OCF_RESKEY_ctl} stop_app 2>&1 >> \"${OCF_RESKEY_log_dir}/shutdown_log\"" + su_rabbit_cmd "${OCF_RESKEY_ctl} stop_app >> \"${OCF_RESKEY_log_dir}/shutdown_log\" 2>&1" rc=$? if [ $rc -ne 0 ] ; then ocf_log err "${LH} RMQ-server app cannot be stopped." @@ -1161,7 +1161,7 @@ start_beam_process() { ocf_log warn "${LH} found old PID-file '${OCF_RESKEY_pid_file}'." pid=$(cat ${OCF_RESKEY_pid_file}) if [ "${pid}" -a -d "/proc/${pid}" ] ; then - ocf_run cat /proc/${pid}/cmdline | grep -c 'bin/beam' 2>&1 > /dev/null + ocf_run cat /proc/${pid}/cmdline | grep -c 'bin/beam' > /dev/null 2>&1 rc=$? if [ $rc -eq $OCF_SUCCESS ] ; then ocf_log warn "${LH} found beam process with PID=${pid}, killing...'." @@ -1404,7 +1404,7 @@ get_status() { # try to parse the which_applications() output only if it exited w/o errors if [ "${what}" -a $rc -eq 0 ] ; then rc=$OCF_NOT_RUNNING - echo "$body" | grep "\{${what}," 2>&1 > /dev/null && rc=$OCF_SUCCESS + echo "$body" | grep "\{${what}," > /dev/null 2>&1 && rc=$OCF_SUCCESS if [ $rc -ne $OCF_SUCCESS ] ; then ocf_log info "${LH} app ${what} was not found in command output: ${body}" @@ -1667,7 +1667,7 @@ node_health_check_local() { node_health_check_legacy() { local rc_alive local timeout_alive - su_rabbit_cmd "${OCF_RESKEY_ctl} list_channels 2>&1 > /dev/null" + su_rabbit_cmd "${OCF_RESKEY_ctl} list_channels > /dev/null 2>&1" rc_alive=$? [ $rc_alive -eq 137 -o $rc_alive -eq 124 ] && ocf_log err "${LH} 'rabbitmqctl list_channels' timed out, per-node explanation: $(enhanced_list_channels)" check_timeouts $rc_alive "rabbit_list_channels_timeouts" "list_channels" @@ -1706,11 +1706,11 @@ node_health_check_legacy() { rc=$OCF_ERR_GENERIC elif [ -n "${alarms}" ]; then - for node in "${alarms}"; do + for node in ${alarms}; do name=`echo ${node} | perl -n -e "m/memory,'(?<n>\S+)+'/ && print \"$+{n}\n\""` if [ "${name}" = "${RABBITMQ_NODENAME}" ] ; then ocf_log err "${LH} Found raised memory alarm. Erasing the alarm and restarting." - su_rabbit_cmd "${OCF_RESKEY_ctl} set_vm_memory_high_watermark 10 2>&1 > /dev/null" + su_rabbit_cmd "${OCF_RESKEY_ctl} set_vm_memory_high_watermark 10 > /dev/null 2>&1" rc=$OCF_ERR_GENERIC break fi diff --git a/test/inet_proxy_dist.erl b/test/inet_proxy_dist.erl deleted file mode 100644 index 32b7641a79..0000000000 --- a/test/inet_proxy_dist.erl +++ /dev/null @@ -1,201 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License -%% at http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. -%% --module(inet_proxy_dist). - -%% A distribution plugin that uses the usual inet_tcp_dist but allows -%% insertion of a proxy at the receiving end. - -%% inet_*_dist "behaviour" --export([listen/1, accept/1, accept_connection/5, - setup/5, close/1, select/1, is_node_name/1]). - -%% For copypasta from inet_tcp_dist --export([do_setup/6]). --import(error_logger,[error_msg/2]). - --define(REAL, inet_tcp_dist). - -%%---------------------------------------------------------------------------- - -listen(Name) -> ?REAL:listen(Name). -select(Node) -> ?REAL:select(Node). -accept(Listen) -> ?REAL:accept(Listen). -close(Socket) -> ?REAL:close(Socket). -is_node_name(Node) -> ?REAL:is_node_name(Node). - -accept_connection(AcceptPid, Socket, MyNode, Allowed, SetupTime) -> - ?REAL:accept_connection(AcceptPid, Socket, MyNode, Allowed, SetupTime). - -%% This is copied from inet_tcp_dist, in order to change the -%% output of erl_epmd:port_please/2. - --include_lib("kernel/include/net_address.hrl"). --include_lib("kernel/include/dist_util.hrl"). - -setup(Node, Type, MyNode, LongOrShortNames,SetupTime) -> - spawn_opt(?MODULE, do_setup, - [self(), Node, Type, MyNode, LongOrShortNames, SetupTime], - [link, {priority, max}]). - -do_setup(Kernel, Node, Type, MyNode, LongOrShortNames,SetupTime) -> - ?trace("~p~n",[{inet_tcp_dist,self(),setup,Node}]), - [Name, Address] = splitnode(Node, LongOrShortNames), - case inet:getaddr(Address, inet) of - {ok, Ip} -> - Timer = dist_util:start_timer(SetupTime), - case erl_epmd:port_please(Name, Ip) of - {port, TcpPort, Version} -> - ?trace("port_please(~p) -> version ~p~n", - [Node,Version]), - dist_util:reset_timer(Timer), - %% Modification START - Ret = application:get_env(kernel, - dist_and_proxy_ports_map), - PortsMap = case Ret of - {ok, M} -> M; - undefined -> [] - end, - ProxyPort = case inet_tcp_proxy:is_enabled() of - true -> proplists:get_value(TcpPort, PortsMap, TcpPort); - false -> TcpPort - end, - case inet_tcp:connect(Ip, ProxyPort, - [{active, false}, - {packet,2}]) of - {ok, Socket} -> - {ok, {_, SrcPort}} = inet:sockname(Socket), - ok = inet_tcp_proxy_manager:register( - node(), Node, SrcPort, TcpPort, ProxyPort), - %% Modification END - HSData = #hs_data{ - kernel_pid = Kernel, - other_node = Node, - this_node = MyNode, - socket = Socket, - timer = Timer, - this_flags = 0, - other_version = Version, - f_send = fun inet_tcp:send/2, - f_recv = fun inet_tcp:recv/3, - f_setopts_pre_nodeup = - fun(S) -> - inet:setopts - (S, - [{active, false}, - {packet, 4}, - nodelay()]) - end, - f_setopts_post_nodeup = - fun(S) -> - inet:setopts - (S, - [{active, true}, - {deliver, port}, - {packet, 4}, - nodelay()]) - end, - f_getll = fun inet:getll/1, - f_address = - fun(_,_) -> - #net_address{ - address = {Ip,TcpPort}, - host = Address, - protocol = tcp, - family = inet} - end, - mf_tick = fun tick/1, - mf_getstat = fun inet_tcp_dist:getstat/1, - request_type = Type - }, - dist_util:handshake_we_started(HSData); - R -> - io:format("~p failed! ~p~n", [node(), R]), - %% Other Node may have closed since - %% port_please ! - ?trace("other node (~p) " - "closed since port_please.~n", - [Node]), - ?shutdown(Node) - end; - _ -> - ?trace("port_please (~p) " - "failed.~n", [Node]), - ?shutdown(Node) - end; - _Other -> - ?trace("inet_getaddr(~p) " - "failed (~p).~n", [Node,_Other]), - ?shutdown(Node) - end. - -%% If Node is illegal terminate the connection setup!! -splitnode(Node, LongOrShortNames) -> - case split_node(atom_to_list(Node), $@, []) of - [Name|Tail] when Tail =/= [] -> - Host = lists:append(Tail), - case split_node(Host, $., []) of - [_] when LongOrShortNames =:= longnames -> - error_msg("** System running to use " - "fully qualified " - "hostnames **~n" - "** Hostname ~s is illegal **~n", - [Host]), - ?shutdown(Node); - L when length(L) > 1, LongOrShortNames =:= shortnames -> - error_msg("** System NOT running to use fully qualified " - "hostnames **~n" - "** Hostname ~s is illegal **~n", - [Host]), - ?shutdown(Node); - _ -> - [Name, Host] - end; - [_] -> - error_msg("** Nodename ~p illegal, no '@' character **~n", - [Node]), - ?shutdown(Node); - _ -> - error_msg("** Nodename ~p illegal **~n", [Node]), - ?shutdown(Node) - end. - -split_node([Chr|T], Chr, Ack) -> [lists:reverse(Ack)|split_node(T, Chr, [])]; -split_node([H|T], Chr, Ack) -> split_node(T, Chr, [H|Ack]); -split_node([], _, Ack) -> [lists:reverse(Ack)]. - -%% we may not always want the nodelay behaviour -%% for performance reasons - -nodelay() -> - case application:get_env(kernel, dist_nodelay) of - undefined -> - {nodelay, true}; - {ok, true} -> - {nodelay, true}; - {ok, false} -> - {nodelay, false}; - _ -> - {nodelay, true} - end. - -tick(Socket) -> - case inet_tcp:send(Socket, [], [force]) of - {error, closed} -> - self() ! {tcp_closed, Socket}, - {error, closed}; - R -> - R - end. diff --git a/test/inet_tcp_proxy.erl b/test/inet_tcp_proxy.erl deleted file mode 100644 index 4498b8f952..0000000000 --- a/test/inet_tcp_proxy.erl +++ /dev/null @@ -1,134 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License -%% at http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. -%% --module(inet_tcp_proxy). - -%% A TCP proxy for insertion into the Erlang distribution mechanism, -%% which allows us to simulate network partitions. - --export([start/3, reconnect/1, is_enabled/0, allow/1, block/1]). - --define(TABLE, ?MODULE). - -%% This can't start_link because there's no supervision hierarchy we -%% can easily fit it into (we need to survive all application -%% restarts). So we have to do some horrible error handling. - -start(ManagerNode, DistPort, ProxyPort) -> - application:set_env(kernel, inet_tcp_proxy_manager_node, ManagerNode), - Parent = self(), - Pid = spawn(error_handler(fun() -> go(Parent, DistPort, ProxyPort) end)), - MRef = erlang:monitor(process, Pid), - receive - ready -> - erlang:demonitor(MRef), - ok; - {'DOWN', MRef, _, _, Reason} -> - {error, Reason} - end. - -reconnect(Nodes) -> - [erlang:disconnect_node(N) || N <- Nodes, N =/= node()], - ok. - -is_enabled() -> - lists:member(?TABLE, ets:all()). - -allow(Node) -> - rabbit_log:info("(~s) Allowing distribution between ~s and ~s~n", - [?MODULE, node(), Node]), - ets:delete(?TABLE, Node). -block(Node) -> - rabbit_log:info("(~s) BLOCKING distribution between ~s and ~s~n", - [?MODULE, node(), Node]), - ets:insert(?TABLE, {Node, block}). - -%%---------------------------------------------------------------------------- - -error_handler(Thunk) -> - fun () -> - try - Thunk() - catch _:{{nodedown, _}, _} -> - %% The only other node we ever talk to is the test - %% runner; if that's down then the test is nearly - %% over; die quietly. - ok; - _:X -> - io:format(user, "TCP proxy died with ~p~n At ~p~n", - [X, erlang:get_stacktrace()]), - erlang:halt(1) - end - end. - -go(Parent, Port, ProxyPort) -> - ets:new(?TABLE, [public, named_table]), - {ok, Sock} = gen_tcp:listen(ProxyPort, [inet, - {reuseaddr, true}]), - Parent ! ready, - accept_loop(Sock, Port). - -accept_loop(ListenSock, Port) -> - {ok, Sock} = gen_tcp:accept(ListenSock), - Proxy = spawn(error_handler(fun() -> run_it(Sock, Port) end)), - ok = gen_tcp:controlling_process(Sock, Proxy), - accept_loop(ListenSock, Port). - -run_it(SockIn, Port) -> - case {inet:peername(SockIn), inet:sockname(SockIn)} of - {{ok, {_Addr, SrcPort}}, {ok, {Addr, _OtherPort}}} -> - {ok, Remote, This} = inet_tcp_proxy_manager:lookup(SrcPort), - case node() of - This -> ok; - _ -> exit({not_me, node(), This}) - end, - {ok, SockOut} = gen_tcp:connect(Addr, Port, [inet]), - run_loop({SockIn, SockOut}, Remote, []); - _ -> - ok - end. - -run_loop(Sockets, RemoteNode, Buf0) -> - Block = [{RemoteNode, block}] =:= ets:lookup(?TABLE, RemoteNode), - receive - {tcp, Sock, Data} -> - Buf = [Data | Buf0], - case {Block, get(dist_was_blocked)} of - {true, false} -> - put(dist_was_blocked, Block), - rabbit_log:warning( - "(~s) Distribution BLOCKED between ~s and ~s~n", - [?MODULE, node(), RemoteNode]); - {false, S} when S =:= true orelse S =:= undefined -> - put(dist_was_blocked, Block), - rabbit_log:warning( - "(~s) Distribution allowed between ~s and ~s~n", - [?MODULE, node(), RemoteNode]); - _ -> - ok - end, - case Block of - false -> gen_tcp:send(other(Sock, Sockets), lists:reverse(Buf)), - run_loop(Sockets, RemoteNode, []); - true -> run_loop(Sockets, RemoteNode, Buf) - end; - {tcp_closed, Sock} -> - gen_tcp:close(other(Sock, Sockets)); - X -> - exit({weirdness, X}) - end. - -other(A, {A, B}) -> B; -other(B, {A, B}) -> A. diff --git a/test/inet_tcp_proxy_manager.erl b/test/inet_tcp_proxy_manager.erl deleted file mode 100644 index 18255b8d48..0000000000 --- a/test/inet_tcp_proxy_manager.erl +++ /dev/null @@ -1,107 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License -%% at http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. -%% --module(inet_tcp_proxy_manager). - -%% The TCP proxies need to decide whether to block based on the node -%% they're running on, and the node connecting to them. The trouble -%% is, they don't have an easy way to determine the latter. Therefore -%% when A connects to B we register the source port used by A here, so -%% that B can later look it up and find out who A is without having to -%% sniff the distribution protocol. -%% -%% That does unfortunately mean that we need a central control -%% thing. We assume here it's running on the node called -%% 'standalone_test' since that's where tests are orchestrated from. -%% -%% Yes, this leaks. For its intended lifecycle, that's fine. - --behaviour(gen_server). - --export([start/0, register/5, lookup/1]). - --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). - --define(NODE, ct). - --record(state, {ports, pending}). - -start() -> - gen_server:start({local, ?MODULE}, ?MODULE, [], []). - -register(_From, _To, _SrcPort, Port, Port) -> - %% No proxy, don't register - ok; -register(From, To, SrcPort, _Port, _ProxyPort) -> - gen_server:call(name(), {register, From, To, SrcPort}, infinity). - -lookup(SrcPort) -> - gen_server:call(name(), {lookup, SrcPort}, infinity). - -controller_node() -> - {ok, ManagerNode} = application:get_env(kernel, - inet_tcp_proxy_manager_node), - ManagerNode. - -name() -> - {?MODULE, controller_node()}. - -%%---------------------------------------------------------------------------- - -init([]) -> - net_kernel:monitor_nodes(true), - {ok, #state{ports = dict:new(), - pending = []}}. - -handle_call({register, FromNode, ToNode, SrcPort}, _From, - State = #state{ports = Ports, - pending = Pending}) -> - {Notify, Pending2} = - lists:partition(fun ({P, _}) -> P =:= SrcPort end, Pending), - [gen_server:reply(From, {ok, FromNode, ToNode}) || {_, From} <- Notify], - {reply, ok, - State#state{ports = dict:store(SrcPort, {FromNode, ToNode}, Ports), - pending = Pending2}}; - -handle_call({lookup, SrcPort}, From, - State = #state{ports = Ports, pending = Pending}) -> - case dict:find(SrcPort, Ports) of - {ok, {FromNode, ToNode}} -> - {reply, {ok, FromNode, ToNode}, State}; - error -> - {noreply, State#state{pending = [{SrcPort, From} | Pending]}} - end; - -handle_call(_Req, _From, State) -> - {reply, unknown_request, State}. - -handle_cast(_C, State) -> - {noreply, State}. - -handle_info({nodedown, Node}, State = #state{ports = Ports}) -> - Ports1 = dict:filter( - fun (_, {From, To}) -> - Node =/= From andalso Node =/= To - end, Ports), - {noreply, State#state{ports = Ports1}}; - -handle_info(_I, State) -> - {noreply, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_, State, _) -> {ok, State}. |
