summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorTim Watson <tim@rabbitmq.com>2013-10-09 16:45:39 +0100
committerTim Watson <tim@rabbitmq.com>2013-10-09 16:45:39 +0100
commitee72323127b5cec01b75de7b4ac4b846a52a43dc (patch)
tree7fec2b23453df09f9ffa14ab4a6d13e49a23213a /src
parent29ed81dc1972946abec0dc016d5427cbed3a7fcd (diff)
downloadrabbitmq-server-git-ee72323127b5cec01b75de7b4ac4b846a52a43dc.tar.gz
Defer starting a queue collector until we've received tune-ok
By waiting for the connection to be fully opened, we reduce resource usage for abandoned connections and DoS vectors. The collector is started as a child of the connection_helper_sup, thus avoiding a potential deadlock with the parent supervisor during shutdown.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_connection_helper_sup.erl10
-rw-r--r--src/rabbit_connection_sup.erl7
-rw-r--r--src/rabbit_reader.erl21
3 files changed, 24 insertions, 14 deletions
diff --git a/src/rabbit_connection_helper_sup.erl b/src/rabbit_connection_helper_sup.erl
index 580aa65f23..8f6c769806 100644
--- a/src/rabbit_connection_helper_sup.erl
+++ b/src/rabbit_connection_helper_sup.erl
@@ -19,12 +19,16 @@
-behaviour(supervisor2).
-export([start_link/0]).
+-export([start_queue_collector/1]).
-export([init/1]).
+-include("rabbit.hrl").
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
+-spec(start_queue_collector/1 :: (pid()) -> rabbit_types:ok_pid_or_error()).
-endif.
%%----------------------------------------------------------------------------
@@ -32,6 +36,12 @@
start_link() ->
supervisor2:start_link(?MODULE, []).
+start_queue_collector(SupPid) ->
+ supervisor2:start_child(
+ SupPid,
+ {collector, {rabbit_queue_collector, start_link, []},
+ intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}).
+
%%--------------------------------------------------------------------------
init([]) ->
diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl
index d59698732c..1f4ab19c5a 100644
--- a/src/rabbit_connection_sup.erl
+++ b/src/rabbit_connection_sup.erl
@@ -37,11 +37,6 @@
start_link() ->
{ok, SupPid} = supervisor2:start_link(?MODULE, []),
- {ok, Collector} =
- supervisor2:start_child(
- SupPid,
- {collector, {rabbit_queue_collector, start_link, []},
- intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}),
%% We need to get channels in the hierarchy here so they get shut
%% down after the reader, so the reader gets a chance to terminate
%% them cleanly. But for 1.0 readers we can't start the real
@@ -61,7 +56,7 @@ start_link() ->
supervisor2:start_child(
SupPid,
{reader, {rabbit_reader, start_link,
- [ChannelSup3Pid, Collector,
+ [ChannelSup3Pid, ConHelperSupPid,
rabbit_heartbeat:start_heartbeat_fun(ConHelperSupPid)]},
intrinsic, ?MAX_WAIT, worker, [rabbit_reader]}),
{ok, SupPid, ReaderPid}.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 157b827094..9e81511abf 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -36,8 +36,8 @@
%%--------------------------------------------------------------------------
-record(v1, {parent, sock, connection, callback, recv_len, pending_recv,
- connection_state, queue_collector, heartbeater, stats_timer,
- ch_sup3_pid, channel_sup_sup_pid, start_heartbeat_fun,
+ connection_state, helper_sup_pid, queue_collector, heartbeater,
+ stats_timer, ch_sup3_pid, channel_sup_sup_pid, start_heartbeat_fun,
buf, buf_len, throttle}).
-record(connection, {name, host, peer_host, port, peer_port,
@@ -104,19 +104,19 @@
%%--------------------------------------------------------------------------
-start_link(ChannelSup3Pid, Collector, StartHeartbeatFun) ->
+start_link(ChannelSup3Pid, HelperPid, StartHeartbeatFun) ->
{ok, proc_lib:spawn_link(?MODULE, init, [self(), ChannelSup3Pid,
- Collector, StartHeartbeatFun])}.
+ HelperPid, StartHeartbeatFun])}.
shutdown(Pid, Explanation) ->
gen_server:call(Pid, {shutdown, Explanation}, infinity).
-init(Parent, ChSup3Pid, Collector, StartHeartbeatFun) ->
+init(Parent, ChSup3Pid, HelperPid, StartHeartbeatFun) ->
Deb = sys:debug_options([]),
receive
{go, Sock, SockTransform} ->
start_connection(
- Parent, ChSup3Pid, Collector, StartHeartbeatFun, Deb, Sock,
+ Parent, ChSup3Pid, HelperPid, StartHeartbeatFun, Deb, Sock,
SockTransform)
end.
@@ -205,7 +205,7 @@ socket_op(Sock, Fun) ->
exit(normal)
end.
-start_connection(Parent, ChSup3Pid, Collector, StartHeartbeatFun, Deb,
+start_connection(Parent, ChSup3Pid, HelperPid, StartHeartbeatFun, Deb,
Sock, SockTransform) ->
process_flag(trap_exit, true),
Name = case rabbit_net:connection_string(Sock, inbound) of
@@ -242,7 +242,8 @@ start_connection(Parent, ChSup3Pid, Collector, StartHeartbeatFun, Deb,
recv_len = 0,
pending_recv = false,
connection_state = pre_init,
- queue_collector = Collector,
+ queue_collector = undefined, %% started on tune-ok
+ helper_sup_pid = HelperPid,
heartbeater = none,
ch_sup3_pid = ChSup3Pid,
channel_sup_sup_pid = none,
@@ -851,6 +852,7 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
heartbeat = ClientHeartbeat},
State = #v1{connection_state = tuning,
connection = Connection,
+ helper_sup_pid = HelperPid,
sock = Sock,
start_heartbeat_fun = SHF}) ->
ServerFrameMax = server_frame_max(),
@@ -863,6 +865,8 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
not_allowed, "frame_max=~w > ~w max size",
[FrameMax, ServerFrameMax]);
true ->
+ {ok, Collector} =
+ rabbit_connection_helper_sup:start_queue_collector(HelperPid),
Frame = rabbit_binary_generator:build_heartbeat_frame(),
SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end,
Parent = self(),
@@ -873,6 +877,7 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
connection = Connection#connection{
timeout_sec = ClientHeartbeat,
frame_max = FrameMax},
+ queue_collector = Collector,
heartbeater = Heartbeater}
end;