summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <michael@novemberain.com>2015-11-19 19:37:12 +0300
committerMichael Klishin <michael@novemberain.com>2015-11-19 19:37:12 +0300
commit1676d53aba201590665124973178105d31ee6b65 (patch)
tree1c9c184f29d70798fa6dec8c938658ab2cc64ab4
parent7c9f0454700d4748ff53e4e1097d0aa600d997aa (diff)
parented42f6c4d1d028b8055868f5a2543264e5a3b0b3 (diff)
downloadrabbitmq-server-git-1676d53aba201590665124973178105d31ee6b65.tar.gz
Merge pull request #414 from rabbitmq/rabbitmq-server-260
Use Ranch for TCP acceptor pool
-rw-r--r--Makefile3
-rw-r--r--src/rabbit.app.src6
-rw-r--r--src/rabbit_connection_sup.erl9
-rw-r--r--src/rabbit_vm.erl6
-rw-r--r--src/tcp_acceptor.erl105
-rw-r--r--src/tcp_acceptor_sup.erl43
-rw-r--r--src/tcp_listener.erl53
-rw-r--r--src/tcp_listener_sup.erl60
8 files changed, 57 insertions, 228 deletions
diff --git a/Makefile b/Makefile
index 2c7e7b7867..5ce7af4dcd 100644
--- a/Makefile
+++ b/Makefile
@@ -4,7 +4,8 @@ VERSION ?= $(call get_app_version,src/$(PROJECT).app.src)
# Release artifacts are put in $(PACKAGES_DIR).
PACKAGES_DIR ?= $(abspath PACKAGES)
-DEPS = $(PLUGINS)
+DEPS = ranch $(PLUGINS)
+dep_ranch = git https://github.com/ninenines/ranch 1.2.0
define usage_xml_to_erl
$(subst __,_,$(patsubst $(DOCS_DIR)/rabbitmq%.1.xml, src/rabbit_%_usage.erl, $(subst -,_,$(1))))
diff --git a/src/rabbit.app.src b/src/rabbit.app.src
index fe9b1635ea..ab25c9e089 100644
--- a/src/rabbit.app.src
+++ b/src/rabbit.app.src
@@ -8,7 +8,6 @@
rabbit_node_monitor,
rabbit_router,
rabbit_sup,
- rabbit_tcp_client_sup,
rabbit_direct_client_sup]},
{applications, [kernel, stdlib, sasl, mnesia, os_mon, xmerl]},
%% we also depend on crypto, public_key and ssl but they shouldn't be
@@ -57,10 +56,7 @@
{reverse_dns_lookups, false},
{cluster_partition_handling, ignore},
{cluster_keepalive_interval, 10000},
- {tcp_listen_options, [binary,
- {packet, raw},
- {reuseaddr, true},
- {backlog, 128},
+ {tcp_listen_options, [{backlog, 128},
{nodelay, true},
{linger, {true, 0}},
{exit_on_close, false}]},
diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl
index 982608556a..a64a2217da 100644
--- a/src/rabbit_connection_sup.erl
+++ b/src/rabbit_connection_sup.erl
@@ -17,8 +17,9 @@
-module(rabbit_connection_sup).
-behaviour(supervisor2).
+-behaviour(ranch_protocol).
--export([start_link/0, reader/1]).
+-export([start_link/4, reader/1]).
-export([init/1]).
@@ -28,14 +29,14 @@
-ifdef(use_specs).
--spec(start_link/0 :: () -> {'ok', pid(), pid()}).
+-spec(start_link/4 :: (any(), rabbit_net:socket(), module(), any()) -> {'ok', pid(), pid()}).
-spec(reader/1 :: (pid()) -> pid()).
-endif.
%%--------------------------------------------------------------------------
-start_link() ->
+start_link(Ref, Sock, _Transport, _Opts) ->
{ok, SupPid} = supervisor2:start_link(?MODULE, []),
%% We need to get channels in the hierarchy here so they get shut
%% down after the reader, so the reader gets a chance to terminate
@@ -55,7 +56,7 @@ start_link() ->
{ok, ReaderPid} =
supervisor2:start_child(
SupPid,
- {reader, {rabbit_reader, start_link, [HelperSup]},
+ {reader, {rabbit_reader, start_link, [HelperSup, Ref, Sock]},
intrinsic, ?MAX_WAIT, worker, [rabbit_reader]}),
{ok, SupPid, ReaderPid}.
diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl
index 534a8883e1..4146eeb447 100644
--- a/src/rabbit_vm.erl
+++ b/src/rabbit_vm.erl
@@ -134,7 +134,11 @@ interesting_sups0() ->
PluginProcs = plugin_sups(),
[MsgIndexProcs, MgmtDbProcs, PluginProcs].
-conn_sups() -> [rabbit_tcp_client_sup, ssl_connection_sup, amqp_sup].
+%% @todo I have doubts about this ssl_connection_sup and the
+%% amqp_sup. They don't seem to exist anywhere.
+%% @todo We probably need to put the equivalent process here
+%% (the one our Ranch supervisor is under).
+conn_sups() -> [ssl_connection_sup, amqp_sup].
conn_sups(With) -> [{Sup, With} || Sup <- conn_sups()].
distinguishers() -> [{rabbit_amqqueue_sup_sup, fun queue_type/1} |
diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl
deleted file mode 100644
index 75f216c3dd..0000000000
--- a/src/tcp_acceptor.erl
+++ /dev/null
@@ -1,105 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved.
-%%
-
--module(tcp_acceptor).
-
--behaviour(gen_server).
-
--export([start_link/2]).
-
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
-
--record(state, {callback, sock, ref}).
-
-%%--------------------------------------------------------------------
-
-start_link(Callback, LSock) ->
- gen_server:start_link(?MODULE, {Callback, LSock}, []).
-
-%%--------------------------------------------------------------------
-
-init({Callback, LSock}) ->
- gen_server:cast(self(), accept),
- {ok, #state{callback=Callback, sock=LSock}}.
-
-handle_call(_Request, _From, State) ->
- {noreply, State}.
-
-handle_cast(accept, State) ->
- ok = file_handle_cache:obtain(),
- accept(State);
-
-handle_cast(_Msg, State) ->
- {noreply, State}.
-
-handle_info({inet_async, LSock, Ref, {ok, Sock}},
- State = #state{callback={M,F,A}, sock=LSock, ref=Ref}) ->
-
- %% patch up the socket so it looks like one we got from
- %% gen_tcp:accept/1
- {ok, Mod} = inet_db:lookup_socket(LSock),
- inet_db:register_socket(Sock, Mod),
-
- %% handle
- case tune_buffer_size(Sock) of
- ok -> file_handle_cache:transfer(
- apply(M, F, A ++ [Sock])),
- ok = file_handle_cache:obtain();
- {error, enotconn} -> catch port_close(Sock);
- {error, Err} -> {ok, {IPAddress, Port}} = inet:sockname(LSock),
- error_logger:error_msg(
- "failed to tune buffer size of "
- "connection accepted on ~s:~p - ~s~n",
- [rabbit_misc:ntoab(IPAddress), Port,
- rabbit_misc:format_inet_error(Err)]),
- catch port_close(Sock)
- end,
-
- %% accept more
- accept(State);
-
-handle_info({inet_async, LSock, Ref, {error, Reason}},
- State=#state{sock=LSock, ref=Ref}) ->
- case Reason of
- closed -> {stop, normal, State}; %% listening socket closed
- econnaborted -> accept(State); %% client sent RST before we accepted
- _ -> {stop, {accept_failed, Reason}, State}
- end;
-
-handle_info(_Info, State) ->
- {noreply, State}.
-
-terminate(_Reason, _State) ->
- ok.
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-%%--------------------------------------------------------------------
-
-accept(State = #state{sock=LSock}) ->
- case prim_inet:async_accept(LSock, -1) of
- {ok, Ref} -> {noreply, State#state{ref=Ref}};
- Error -> {stop, {cannot_accept, Error}, State}
- end.
-
-tune_buffer_size(Sock) ->
- case inet:getopts(Sock, [sndbuf, recbuf, buffer]) of
- {ok, BufSizes} -> BufSz = lists:max([Sz || {_Opt, Sz} <- BufSizes]),
- inet:setopts(Sock, [{buffer, BufSz}]);
- Error -> Error
- end.
diff --git a/src/tcp_acceptor_sup.erl b/src/tcp_acceptor_sup.erl
deleted file mode 100644
index 22c886e0ab..0000000000
--- a/src/tcp_acceptor_sup.erl
+++ /dev/null
@@ -1,43 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved.
-%%
-
--module(tcp_acceptor_sup).
-
--behaviour(supervisor).
-
--export([start_link/2]).
-
--export([init/1]).
-
-%%----------------------------------------------------------------------------
-
--ifdef(use_specs).
-
--type(mfargs() :: {atom(), atom(), [any()]}).
-
--spec(start_link/2 :: (atom(), mfargs()) -> rabbit_types:ok_pid_or_error()).
-
--endif.
-
-%%----------------------------------------------------------------------------
-
-start_link(Name, Callback) ->
- supervisor:start_link({local,Name}, ?MODULE, Callback).
-
-init(Callback) ->
- {ok, {{simple_one_for_one, 10, 10},
- [{tcp_acceptor, {tcp_acceptor, start_link, [Callback]},
- transient, brutal_kill, worker, [tcp_acceptor]}]}}.
diff --git a/src/tcp_listener.erl b/src/tcp_listener.erl
index 307249af09..571622c80d 100644
--- a/src/tcp_listener.erl
+++ b/src/tcp_listener.erl
@@ -18,12 +18,12 @@
-behaviour(gen_server).
--export([start_link/8]).
+-export([start_link/5]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--record(state, {sock, on_startup, on_shutdown, label}).
+-record(state, {on_startup, on_shutdown, label, ip, port}).
%%----------------------------------------------------------------------------
@@ -31,52 +31,31 @@
-type(mfargs() :: {atom(), atom(), [any()]}).
--spec(start_link/8 ::
- (inet:ip_address(), inet:port_number(), [gen_tcp:listen_option()],
- integer(), atom(), mfargs(), mfargs(), string()) ->
+-spec(start_link/5 ::
+ (inet:ip_address(), inet:port_number(),
+ mfargs(), mfargs(), string()) ->
rabbit_types:ok_pid_or_error()).
-endif.
%%--------------------------------------------------------------------
-start_link(IPAddress, Port, SocketOpts,
- ConcurrentAcceptorCount, AcceptorSup,
+start_link(IPAddress, Port,
OnStartup, OnShutdown, Label) ->
gen_server:start_link(
- ?MODULE, {IPAddress, Port, SocketOpts,
- ConcurrentAcceptorCount, AcceptorSup,
+ ?MODULE, {IPAddress, Port,
OnStartup, OnShutdown, Label}, []).
%%--------------------------------------------------------------------
-init({IPAddress, Port, SocketOpts,
- ConcurrentAcceptorCount, AcceptorSup,
- {M,F,A} = OnStartup, OnShutdown, Label}) ->
+init({IPAddress, Port, {M,F,A} = OnStartup, OnShutdown, Label}) ->
process_flag(trap_exit, true),
- case gen_tcp:listen(Port, SocketOpts ++ [{ip, IPAddress},
- {active, false}]) of
- {ok, LSock} ->
- lists:foreach(fun (_) ->
- {ok, _APid} = supervisor:start_child(
- AcceptorSup, [LSock])
- end,
- lists:duplicate(ConcurrentAcceptorCount, dummy)),
- {ok, {LIPAddress, LPort}} = inet:sockname(LSock),
- error_logger:info_msg(
- "started ~s on ~s:~p~n",
- [Label, rabbit_misc:ntoab(LIPAddress), LPort]),
- apply(M, F, A ++ [IPAddress, Port]),
- {ok, #state{sock = LSock,
- on_startup = OnStartup, on_shutdown = OnShutdown,
- label = Label}};
- {error, Reason} ->
- error_logger:error_msg(
- "failed to start ~s on ~s:~p - ~p (~s)~n",
- [Label, rabbit_misc:ntoab(IPAddress), Port,
- Reason, inet:format_error(Reason)]),
- {stop, {cannot_listen, IPAddress, Port, Reason}}
- end.
+ error_logger:info_msg(
+ "started ~s on ~s:~p~n",
+ [Label, rabbit_misc:ntoab(IPAddress), Port]),
+ apply(M, F, A ++ [IPAddress, Port]),
+ {ok, #state{on_startup = OnStartup, on_shutdown = OnShutdown,
+ label = Label, ip=IPAddress, port=Port}}.
handle_call(_Request, _From, State) ->
{noreply, State}.
@@ -87,9 +66,7 @@ handle_cast(_Msg, State) ->
handle_info(_Info, State) ->
{noreply, State}.
-terminate(_Reason, #state{sock=LSock, on_shutdown = {M,F,A}, label=Label}) ->
- {ok, {IPAddress, Port}} = inet:sockname(LSock),
- gen_tcp:close(LSock),
+terminate(_Reason, #state{on_shutdown = {M,F,A}, label=Label, ip=IPAddress, port=Port}) ->
error_logger:info_msg("stopped ~s on ~s:~p~n",
[Label, rabbit_misc:ntoab(IPAddress), Port]),
apply(M, F, A ++ [IPAddress, Port]).
diff --git a/src/tcp_listener_sup.erl b/src/tcp_listener_sup.erl
index 94bdecc28c..54da154d8d 100644
--- a/src/tcp_listener_sup.erl
+++ b/src/tcp_listener_sup.erl
@@ -18,7 +18,7 @@
-behaviour(supervisor).
--export([start_link/7, start_link/8]).
+-export([start_link/9, start_link/10]).
-export([init/1]).
@@ -28,43 +28,41 @@
-type(mfargs() :: {atom(), atom(), [any()]}).
--spec(start_link/7 ::
- (inet:ip_address(), inet:port_number(), [gen_tcp:listen_option()],
- mfargs(), mfargs(), mfargs(), string()) ->
+-spec(start_link/9 ::
+ (inet:ip_address(), inet:port_number(), module(), [gen_tcp:listen_option()],
+ module(), any(), mfargs(), mfargs(), string()) ->
rabbit_types:ok_pid_or_error()).
--spec(start_link/8 ::
- (inet:ip_address(), inet:port_number(), [gen_tcp:listen_option()],
- mfargs(), mfargs(), mfargs(), integer(), string()) ->
+-spec(start_link/10 ::
+ (inet:ip_address(), inet:port_number(), module(), [gen_tcp:listen_option()],
+ module(), any(), mfargs(), mfargs(), integer(), string()) ->
rabbit_types:ok_pid_or_error()).
-endif.
%%----------------------------------------------------------------------------
-start_link(IPAddress, Port, SocketOpts, OnStartup, OnShutdown,
- AcceptCallback, Label) ->
- start_link(IPAddress, Port, SocketOpts, OnStartup, OnShutdown,
- AcceptCallback, 1, Label).
+start_link(IPAddress, Port, Transport, SocketOpts, ProtoSup, ProtoOpts, OnStartup, OnShutdown,
+ Label) ->
+ start_link(IPAddress, Port, Transport, SocketOpts, ProtoSup, ProtoOpts, OnStartup, OnShutdown,
+ 1, Label).
-start_link(IPAddress, Port, SocketOpts, OnStartup, OnShutdown,
- AcceptCallback, ConcurrentAcceptorCount, Label) ->
+start_link(IPAddress, Port, Transport, SocketOpts, ProtoSup, ProtoOpts, OnStartup, OnShutdown,
+ ConcurrentAcceptorCount, Label) ->
supervisor:start_link(
- ?MODULE, {IPAddress, Port, SocketOpts, OnStartup, OnShutdown,
- AcceptCallback, ConcurrentAcceptorCount, Label}).
+ ?MODULE, {IPAddress, Port, Transport, SocketOpts, ProtoSup, ProtoOpts, OnStartup, OnShutdown,
+ ConcurrentAcceptorCount, Label}).
-init({IPAddress, Port, SocketOpts, OnStartup, OnShutdown,
- AcceptCallback, ConcurrentAcceptorCount, Label}) ->
- %% This is gross. The tcp_listener needs to know about the
- %% tcp_acceptor_sup, and the only way I can think of accomplishing
- %% that without jumping through hoops is to register the
- %% tcp_acceptor_sup.
- Name = rabbit_misc:tcp_name(tcp_acceptor_sup, IPAddress, Port),
- {ok, {{one_for_all, 10, 10},
- [{tcp_acceptor_sup, {tcp_acceptor_sup, start_link,
- [Name, AcceptCallback]},
- transient, infinity, supervisor, [tcp_acceptor_sup]},
- {tcp_listener, {tcp_listener, start_link,
- [IPAddress, Port, SocketOpts,
- ConcurrentAcceptorCount, Name,
- OnStartup, OnShutdown, Label]},
- transient, 16#ffffffff, worker, [tcp_listener]}]}}.
+init({IPAddress, Port, Transport, SocketOpts, ProtoSup, ProtoOpts, OnStartup, OnShutdown,
+ ConcurrentAcceptorCount, Label}) ->
+ {ok, AckTimeout} = application:get_env(rabbit, ssl_handshake_timeout),
+ {ok, {{one_for_all, 10, 10}, [
+ ranch:child_spec({acceptor, IPAddress, Port}, ConcurrentAcceptorCount,
+ Transport, [{port, Port}, {ip, IPAddress},
+ {max_connections, infinity},
+ {ack_timeout, AckTimeout},
+ {connection_type, supervisor}|SocketOpts],
+ ProtoSup, ProtoOpts),
+ {tcp_listener, {tcp_listener, start_link,
+ [IPAddress, Port,
+ OnStartup, OnShutdown, Label]},
+ transient, 16#ffffffff, worker, [tcp_listener]}]}}.