diff options
| author | Tim Watson <tim@rabbitmq.com> | 2013-10-09 16:45:39 +0100 |
|---|---|---|
| committer | Tim Watson <tim@rabbitmq.com> | 2013-10-09 16:45:39 +0100 |
| commit | ee72323127b5cec01b75de7b4ac4b846a52a43dc (patch) | |
| tree | 7fec2b23453df09f9ffa14ab4a6d13e49a23213a /src | |
| parent | 29ed81dc1972946abec0dc016d5427cbed3a7fcd (diff) | |
| download | rabbitmq-server-git-ee72323127b5cec01b75de7b4ac4b846a52a43dc.tar.gz | |
Defer starting a queue collector until we've received tune-ok
By waiting for the connection to be fully opened, we reduce resource
usage for abandoned connections and DoS vectors. The collector is
started as a child of the connection_helper_sup, thus avoiding a
potential deadlock with the parent supervisor during shutdown.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_connection_helper_sup.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_connection_sup.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 21 |
3 files changed, 24 insertions, 14 deletions
diff --git a/src/rabbit_connection_helper_sup.erl b/src/rabbit_connection_helper_sup.erl index 580aa65f23..8f6c769806 100644 --- a/src/rabbit_connection_helper_sup.erl +++ b/src/rabbit_connection_helper_sup.erl @@ -19,12 +19,16 @@ -behaviour(supervisor2). -export([start_link/0]). +-export([start_queue_collector/1]). -export([init/1]). +-include("rabbit.hrl"). + %%---------------------------------------------------------------------------- -ifdef(use_specs). -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). +-spec(start_queue_collector/1 :: (pid()) -> rabbit_types:ok_pid_or_error()). -endif. %%---------------------------------------------------------------------------- @@ -32,6 +36,12 @@ start_link() -> supervisor2:start_link(?MODULE, []). +start_queue_collector(SupPid) -> + supervisor2:start_child( + SupPid, + {collector, {rabbit_queue_collector, start_link, []}, + intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}). + %%-------------------------------------------------------------------------- init([]) -> diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl index d59698732c..1f4ab19c5a 100644 --- a/src/rabbit_connection_sup.erl +++ b/src/rabbit_connection_sup.erl @@ -37,11 +37,6 @@ start_link() -> {ok, SupPid} = supervisor2:start_link(?MODULE, []), - {ok, Collector} = - supervisor2:start_child( - SupPid, - {collector, {rabbit_queue_collector, start_link, []}, - intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}), %% We need to get channels in the hierarchy here so they get shut %% down after the reader, so the reader gets a chance to terminate %% them cleanly. But for 1.0 readers we can't start the real @@ -61,7 +56,7 @@ start_link() -> supervisor2:start_child( SupPid, {reader, {rabbit_reader, start_link, - [ChannelSup3Pid, Collector, + [ChannelSup3Pid, ConHelperSupPid, rabbit_heartbeat:start_heartbeat_fun(ConHelperSupPid)]}, intrinsic, ?MAX_WAIT, worker, [rabbit_reader]}), {ok, SupPid, ReaderPid}. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 157b827094..9e81511abf 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -36,8 +36,8 @@ %%-------------------------------------------------------------------------- -record(v1, {parent, sock, connection, callback, recv_len, pending_recv, - connection_state, queue_collector, heartbeater, stats_timer, - ch_sup3_pid, channel_sup_sup_pid, start_heartbeat_fun, + connection_state, helper_sup_pid, queue_collector, heartbeater, + stats_timer, ch_sup3_pid, channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len, throttle}). -record(connection, {name, host, peer_host, port, peer_port, @@ -104,19 +104,19 @@ %%-------------------------------------------------------------------------- -start_link(ChannelSup3Pid, Collector, StartHeartbeatFun) -> +start_link(ChannelSup3Pid, HelperPid, StartHeartbeatFun) -> {ok, proc_lib:spawn_link(?MODULE, init, [self(), ChannelSup3Pid, - Collector, StartHeartbeatFun])}. + HelperPid, StartHeartbeatFun])}. shutdown(Pid, Explanation) -> gen_server:call(Pid, {shutdown, Explanation}, infinity). -init(Parent, ChSup3Pid, Collector, StartHeartbeatFun) -> +init(Parent, ChSup3Pid, HelperPid, StartHeartbeatFun) -> Deb = sys:debug_options([]), receive {go, Sock, SockTransform} -> start_connection( - Parent, ChSup3Pid, Collector, StartHeartbeatFun, Deb, Sock, + Parent, ChSup3Pid, HelperPid, StartHeartbeatFun, Deb, Sock, SockTransform) end. @@ -205,7 +205,7 @@ socket_op(Sock, Fun) -> exit(normal) end. -start_connection(Parent, ChSup3Pid, Collector, StartHeartbeatFun, Deb, +start_connection(Parent, ChSup3Pid, HelperPid, StartHeartbeatFun, Deb, Sock, SockTransform) -> process_flag(trap_exit, true), Name = case rabbit_net:connection_string(Sock, inbound) of @@ -242,7 +242,8 @@ start_connection(Parent, ChSup3Pid, Collector, StartHeartbeatFun, Deb, recv_len = 0, pending_recv = false, connection_state = pre_init, - queue_collector = Collector, + queue_collector = undefined, %% started on tune-ok + helper_sup_pid = HelperPid, heartbeater = none, ch_sup3_pid = ChSup3Pid, channel_sup_sup_pid = none, @@ -851,6 +852,7 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, heartbeat = ClientHeartbeat}, State = #v1{connection_state = tuning, connection = Connection, + helper_sup_pid = HelperPid, sock = Sock, start_heartbeat_fun = SHF}) -> ServerFrameMax = server_frame_max(), @@ -863,6 +865,8 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, not_allowed, "frame_max=~w > ~w max size", [FrameMax, ServerFrameMax]); true -> + {ok, Collector} = + rabbit_connection_helper_sup:start_queue_collector(HelperPid), Frame = rabbit_binary_generator:build_heartbeat_frame(), SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end, Parent = self(), @@ -873,6 +877,7 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, connection = Connection#connection{ timeout_sec = ClientHeartbeat, frame_max = FrameMax}, + queue_collector = Collector, heartbeater = Heartbeater} end; |
