diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit.app.src | 6 | ||||
| -rw-r--r-- | src/rabbit_connection_sup.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_vm.erl | 6 | ||||
| -rw-r--r-- | src/tcp_acceptor.erl | 105 | ||||
| -rw-r--r-- | src/tcp_acceptor_sup.erl | 43 | ||||
| -rw-r--r-- | src/tcp_listener.erl | 53 | ||||
| -rw-r--r-- | src/tcp_listener_sup.erl | 60 |
7 files changed, 55 insertions, 227 deletions
diff --git a/src/rabbit.app.src b/src/rabbit.app.src index fe9b1635ea..ab25c9e089 100644 --- a/src/rabbit.app.src +++ b/src/rabbit.app.src @@ -8,7 +8,6 @@ rabbit_node_monitor, rabbit_router, rabbit_sup, - rabbit_tcp_client_sup, rabbit_direct_client_sup]}, {applications, [kernel, stdlib, sasl, mnesia, os_mon, xmerl]}, %% we also depend on crypto, public_key and ssl but they shouldn't be @@ -57,10 +56,7 @@ {reverse_dns_lookups, false}, {cluster_partition_handling, ignore}, {cluster_keepalive_interval, 10000}, - {tcp_listen_options, [binary, - {packet, raw}, - {reuseaddr, true}, - {backlog, 128}, + {tcp_listen_options, [{backlog, 128}, {nodelay, true}, {linger, {true, 0}}, {exit_on_close, false}]}, diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl index 982608556a..a64a2217da 100644 --- a/src/rabbit_connection_sup.erl +++ b/src/rabbit_connection_sup.erl @@ -17,8 +17,9 @@ -module(rabbit_connection_sup). -behaviour(supervisor2). +-behaviour(ranch_protocol). --export([start_link/0, reader/1]). +-export([start_link/4, reader/1]). -export([init/1]). @@ -28,14 +29,14 @@ -ifdef(use_specs). --spec(start_link/0 :: () -> {'ok', pid(), pid()}). +-spec(start_link/4 :: (any(), rabbit_net:socket(), module(), any()) -> {'ok', pid(), pid()}). -spec(reader/1 :: (pid()) -> pid()). -endif. %%-------------------------------------------------------------------------- -start_link() -> +start_link(Ref, Sock, _Transport, _Opts) -> {ok, SupPid} = supervisor2:start_link(?MODULE, []), %% We need to get channels in the hierarchy here so they get shut %% down after the reader, so the reader gets a chance to terminate @@ -55,7 +56,7 @@ start_link() -> {ok, ReaderPid} = supervisor2:start_child( SupPid, - {reader, {rabbit_reader, start_link, [HelperSup]}, + {reader, {rabbit_reader, start_link, [HelperSup, Ref, Sock]}, intrinsic, ?MAX_WAIT, worker, [rabbit_reader]}), {ok, SupPid, ReaderPid}. diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl index 534a8883e1..4146eeb447 100644 --- a/src/rabbit_vm.erl +++ b/src/rabbit_vm.erl @@ -134,7 +134,11 @@ interesting_sups0() -> PluginProcs = plugin_sups(), [MsgIndexProcs, MgmtDbProcs, PluginProcs]. -conn_sups() -> [rabbit_tcp_client_sup, ssl_connection_sup, amqp_sup]. +%% @todo I have doubts about this ssl_connection_sup and the +%% amqp_sup. They don't seem to exist anywhere. +%% @todo We probably need to put the equivalent process here +%% (the one our Ranch supervisor is under). +conn_sups() -> [ssl_connection_sup, amqp_sup]. conn_sups(With) -> [{Sup, With} || Sup <- conn_sups()]. distinguishers() -> [{rabbit_amqqueue_sup_sup, fun queue_type/1} | diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl deleted file mode 100644 index 75f216c3dd..0000000000 --- a/src/tcp_acceptor.erl +++ /dev/null @@ -1,105 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License -%% at http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved. -%% - --module(tcp_acceptor). - --behaviour(gen_server). - --export([start_link/2]). - --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - --record(state, {callback, sock, ref}). - -%%-------------------------------------------------------------------- - -start_link(Callback, LSock) -> - gen_server:start_link(?MODULE, {Callback, LSock}, []). - -%%-------------------------------------------------------------------- - -init({Callback, LSock}) -> - gen_server:cast(self(), accept), - {ok, #state{callback=Callback, sock=LSock}}. - -handle_call(_Request, _From, State) -> - {noreply, State}. - -handle_cast(accept, State) -> - ok = file_handle_cache:obtain(), - accept(State); - -handle_cast(_Msg, State) -> - {noreply, State}. - -handle_info({inet_async, LSock, Ref, {ok, Sock}}, - State = #state{callback={M,F,A}, sock=LSock, ref=Ref}) -> - - %% patch up the socket so it looks like one we got from - %% gen_tcp:accept/1 - {ok, Mod} = inet_db:lookup_socket(LSock), - inet_db:register_socket(Sock, Mod), - - %% handle - case tune_buffer_size(Sock) of - ok -> file_handle_cache:transfer( - apply(M, F, A ++ [Sock])), - ok = file_handle_cache:obtain(); - {error, enotconn} -> catch port_close(Sock); - {error, Err} -> {ok, {IPAddress, Port}} = inet:sockname(LSock), - error_logger:error_msg( - "failed to tune buffer size of " - "connection accepted on ~s:~p - ~s~n", - [rabbit_misc:ntoab(IPAddress), Port, - rabbit_misc:format_inet_error(Err)]), - catch port_close(Sock) - end, - - %% accept more - accept(State); - -handle_info({inet_async, LSock, Ref, {error, Reason}}, - State=#state{sock=LSock, ref=Ref}) -> - case Reason of - closed -> {stop, normal, State}; %% listening socket closed - econnaborted -> accept(State); %% client sent RST before we accepted - _ -> {stop, {accept_failed, Reason}, State} - end; - -handle_info(_Info, State) -> - {noreply, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%-------------------------------------------------------------------- - -accept(State = #state{sock=LSock}) -> - case prim_inet:async_accept(LSock, -1) of - {ok, Ref} -> {noreply, State#state{ref=Ref}}; - Error -> {stop, {cannot_accept, Error}, State} - end. - -tune_buffer_size(Sock) -> - case inet:getopts(Sock, [sndbuf, recbuf, buffer]) of - {ok, BufSizes} -> BufSz = lists:max([Sz || {_Opt, Sz} <- BufSizes]), - inet:setopts(Sock, [{buffer, BufSz}]); - Error -> Error - end. diff --git a/src/tcp_acceptor_sup.erl b/src/tcp_acceptor_sup.erl deleted file mode 100644 index 22c886e0ab..0000000000 --- a/src/tcp_acceptor_sup.erl +++ /dev/null @@ -1,43 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License -%% at http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved. -%% - --module(tcp_acceptor_sup). - --behaviour(supervisor). - --export([start_link/2]). - --export([init/1]). - -%%---------------------------------------------------------------------------- - --ifdef(use_specs). - --type(mfargs() :: {atom(), atom(), [any()]}). - --spec(start_link/2 :: (atom(), mfargs()) -> rabbit_types:ok_pid_or_error()). - --endif. - -%%---------------------------------------------------------------------------- - -start_link(Name, Callback) -> - supervisor:start_link({local,Name}, ?MODULE, Callback). - -init(Callback) -> - {ok, {{simple_one_for_one, 10, 10}, - [{tcp_acceptor, {tcp_acceptor, start_link, [Callback]}, - transient, brutal_kill, worker, [tcp_acceptor]}]}}. diff --git a/src/tcp_listener.erl b/src/tcp_listener.erl index 307249af09..571622c80d 100644 --- a/src/tcp_listener.erl +++ b/src/tcp_listener.erl @@ -18,12 +18,12 @@ -behaviour(gen_server). --export([start_link/8]). +-export([start_link/5]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {sock, on_startup, on_shutdown, label}). +-record(state, {on_startup, on_shutdown, label, ip, port}). %%---------------------------------------------------------------------------- @@ -31,52 +31,31 @@ -type(mfargs() :: {atom(), atom(), [any()]}). --spec(start_link/8 :: - (inet:ip_address(), inet:port_number(), [gen_tcp:listen_option()], - integer(), atom(), mfargs(), mfargs(), string()) -> +-spec(start_link/5 :: + (inet:ip_address(), inet:port_number(), + mfargs(), mfargs(), string()) -> rabbit_types:ok_pid_or_error()). -endif. %%-------------------------------------------------------------------- -start_link(IPAddress, Port, SocketOpts, - ConcurrentAcceptorCount, AcceptorSup, +start_link(IPAddress, Port, OnStartup, OnShutdown, Label) -> gen_server:start_link( - ?MODULE, {IPAddress, Port, SocketOpts, - ConcurrentAcceptorCount, AcceptorSup, + ?MODULE, {IPAddress, Port, OnStartup, OnShutdown, Label}, []). %%-------------------------------------------------------------------- -init({IPAddress, Port, SocketOpts, - ConcurrentAcceptorCount, AcceptorSup, - {M,F,A} = OnStartup, OnShutdown, Label}) -> +init({IPAddress, Port, {M,F,A} = OnStartup, OnShutdown, Label}) -> process_flag(trap_exit, true), - case gen_tcp:listen(Port, SocketOpts ++ [{ip, IPAddress}, - {active, false}]) of - {ok, LSock} -> - lists:foreach(fun (_) -> - {ok, _APid} = supervisor:start_child( - AcceptorSup, [LSock]) - end, - lists:duplicate(ConcurrentAcceptorCount, dummy)), - {ok, {LIPAddress, LPort}} = inet:sockname(LSock), - error_logger:info_msg( - "started ~s on ~s:~p~n", - [Label, rabbit_misc:ntoab(LIPAddress), LPort]), - apply(M, F, A ++ [IPAddress, Port]), - {ok, #state{sock = LSock, - on_startup = OnStartup, on_shutdown = OnShutdown, - label = Label}}; - {error, Reason} -> - error_logger:error_msg( - "failed to start ~s on ~s:~p - ~p (~s)~n", - [Label, rabbit_misc:ntoab(IPAddress), Port, - Reason, inet:format_error(Reason)]), - {stop, {cannot_listen, IPAddress, Port, Reason}} - end. + error_logger:info_msg( + "started ~s on ~s:~p~n", + [Label, rabbit_misc:ntoab(IPAddress), Port]), + apply(M, F, A ++ [IPAddress, Port]), + {ok, #state{on_startup = OnStartup, on_shutdown = OnShutdown, + label = Label, ip=IPAddress, port=Port}}. handle_call(_Request, _From, State) -> {noreply, State}. @@ -87,9 +66,7 @@ handle_cast(_Msg, State) -> handle_info(_Info, State) -> {noreply, State}. -terminate(_Reason, #state{sock=LSock, on_shutdown = {M,F,A}, label=Label}) -> - {ok, {IPAddress, Port}} = inet:sockname(LSock), - gen_tcp:close(LSock), +terminate(_Reason, #state{on_shutdown = {M,F,A}, label=Label, ip=IPAddress, port=Port}) -> error_logger:info_msg("stopped ~s on ~s:~p~n", [Label, rabbit_misc:ntoab(IPAddress), Port]), apply(M, F, A ++ [IPAddress, Port]). diff --git a/src/tcp_listener_sup.erl b/src/tcp_listener_sup.erl index 94bdecc28c..54da154d8d 100644 --- a/src/tcp_listener_sup.erl +++ b/src/tcp_listener_sup.erl @@ -18,7 +18,7 @@ -behaviour(supervisor). --export([start_link/7, start_link/8]). +-export([start_link/9, start_link/10]). -export([init/1]). @@ -28,43 +28,41 @@ -type(mfargs() :: {atom(), atom(), [any()]}). --spec(start_link/7 :: - (inet:ip_address(), inet:port_number(), [gen_tcp:listen_option()], - mfargs(), mfargs(), mfargs(), string()) -> +-spec(start_link/9 :: + (inet:ip_address(), inet:port_number(), module(), [gen_tcp:listen_option()], + module(), any(), mfargs(), mfargs(), string()) -> rabbit_types:ok_pid_or_error()). --spec(start_link/8 :: - (inet:ip_address(), inet:port_number(), [gen_tcp:listen_option()], - mfargs(), mfargs(), mfargs(), integer(), string()) -> +-spec(start_link/10 :: + (inet:ip_address(), inet:port_number(), module(), [gen_tcp:listen_option()], + module(), any(), mfargs(), mfargs(), integer(), string()) -> rabbit_types:ok_pid_or_error()). -endif. %%---------------------------------------------------------------------------- -start_link(IPAddress, Port, SocketOpts, OnStartup, OnShutdown, - AcceptCallback, Label) -> - start_link(IPAddress, Port, SocketOpts, OnStartup, OnShutdown, - AcceptCallback, 1, Label). +start_link(IPAddress, Port, Transport, SocketOpts, ProtoSup, ProtoOpts, OnStartup, OnShutdown, + Label) -> + start_link(IPAddress, Port, Transport, SocketOpts, ProtoSup, ProtoOpts, OnStartup, OnShutdown, + 1, Label). -start_link(IPAddress, Port, SocketOpts, OnStartup, OnShutdown, - AcceptCallback, ConcurrentAcceptorCount, Label) -> +start_link(IPAddress, Port, Transport, SocketOpts, ProtoSup, ProtoOpts, OnStartup, OnShutdown, + ConcurrentAcceptorCount, Label) -> supervisor:start_link( - ?MODULE, {IPAddress, Port, SocketOpts, OnStartup, OnShutdown, - AcceptCallback, ConcurrentAcceptorCount, Label}). + ?MODULE, {IPAddress, Port, Transport, SocketOpts, ProtoSup, ProtoOpts, OnStartup, OnShutdown, + ConcurrentAcceptorCount, Label}). -init({IPAddress, Port, SocketOpts, OnStartup, OnShutdown, - AcceptCallback, ConcurrentAcceptorCount, Label}) -> - %% This is gross. The tcp_listener needs to know about the - %% tcp_acceptor_sup, and the only way I can think of accomplishing - %% that without jumping through hoops is to register the - %% tcp_acceptor_sup. - Name = rabbit_misc:tcp_name(tcp_acceptor_sup, IPAddress, Port), - {ok, {{one_for_all, 10, 10}, - [{tcp_acceptor_sup, {tcp_acceptor_sup, start_link, - [Name, AcceptCallback]}, - transient, infinity, supervisor, [tcp_acceptor_sup]}, - {tcp_listener, {tcp_listener, start_link, - [IPAddress, Port, SocketOpts, - ConcurrentAcceptorCount, Name, - OnStartup, OnShutdown, Label]}, - transient, 16#ffffffff, worker, [tcp_listener]}]}}. +init({IPAddress, Port, Transport, SocketOpts, ProtoSup, ProtoOpts, OnStartup, OnShutdown, + ConcurrentAcceptorCount, Label}) -> + {ok, AckTimeout} = application:get_env(rabbit, ssl_handshake_timeout), + {ok, {{one_for_all, 10, 10}, [ + ranch:child_spec({acceptor, IPAddress, Port}, ConcurrentAcceptorCount, + Transport, [{port, Port}, {ip, IPAddress}, + {max_connections, infinity}, + {ack_timeout, AckTimeout}, + {connection_type, supervisor}|SocketOpts], + ProtoSup, ProtoOpts), + {tcp_listener, {tcp_listener, start_link, + [IPAddress, Port, + OnStartup, OnShutdown, Label]}, + transient, 16#ffffffff, worker, [tcp_listener]}]}}. |
