summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-07-06 18:31:56 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-07-06 18:31:56 +0100
commit54c3910fb015e5165b0417eaeb48898823fef41a (patch)
treea3debc50e689579f9e81afbc512957ad2faae32e
parent10ae11e5ebd14c39c38c5f2217d1f93e794373d2 (diff)
downloadrabbitmq-server-git-54c3910fb015e5165b0417eaeb48898823fef41a.tar.gz
And now the channel, writer, limiter and framing_channel are also all suitably supervisored
-rw-r--r--src/gen_server2.erl2
-rw-r--r--src/rabbit_channel.erl79
-rw-r--r--src/rabbit_channel_sup.erl66
-rw-r--r--src/rabbit_channel_sup_sup.erl49
-rw-r--r--src/rabbit_connection_sup.erl14
-rw-r--r--src/rabbit_framing_channel.erl20
-rw-r--r--src/rabbit_limiter.erl5
-rw-r--r--src/rabbit_networking.erl17
-rw-r--r--src/rabbit_reader.erl13
-rw-r--r--src/rabbit_writer.erl20
-rw-r--r--src/supervisor2.erl6
11 files changed, 213 insertions, 78 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 547f0a42e2..32a8d0a147 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -164,7 +164,7 @@
cast/2, pcast/3, reply/2,
abcast/2, abcast/3,
multi_call/2, multi_call/3, multi_call/4,
- enter_loop/3, enter_loop/4, enter_loop/5, wake_hib/7]).
+ enter_loop/3, enter_loop/4, enter_loop/5, enter_loop/6, wake_hib/7]).
-export([behaviour_info/1]).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 692d7bacf1..b91391eb15 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -35,7 +35,7 @@
-behaviour(gen_server2).
--export([start_link/6, do/2, do/3, shutdown/1]).
+-export([start_link/5, do/2, do/3, shutdown/1]).
-export([send_command/2, deliver/4, conserve_memory/2, flushed/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
@@ -44,7 +44,7 @@
-export([init/1, terminate/2, code_change/3,
handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1]).
--record(ch, {state, channel, reader_pid, writer_pid, limiter_pid,
+-record(ch, {state, channel, parent_pid, reader_pid, writer_pid, limiter_pid,
transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
username, virtual_host, most_recently_declared_queue,
@@ -73,8 +73,8 @@
-type(ref() :: any()).
--spec(start_link/6 ::
- (channel_number(), pid(), pid(), username(), vhost(), pid()) -> pid()).
+-spec(start_link/5 ::
+ (channel_number(), pid(), username(), vhost(), pid()) -> pid()).
-spec(do/2 :: (pid(), amqp_method_record()) -> 'ok').
-spec(do/3 :: (pid(), amqp_method_record(), maybe(content())) -> 'ok').
-spec(shutdown/1 :: (pid()) -> 'ok').
@@ -94,11 +94,18 @@
%%----------------------------------------------------------------------------
-start_link(Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid) ->
- {ok, Pid} = gen_server2:start_link(
- ?MODULE, [Channel, ReaderPid, WriterPid,
- Username, VHost, CollectorPid], []),
- Pid.
+start_link(Channel, ReaderPid, Username, VHost, CollectorPid) ->
+ Parent = self(),
+ {ok, proc_lib:spawn_link(
+ fun () ->
+ WriterPid = rabbit_channel_sup:writer(Parent),
+ State = init([Channel, Parent, ReaderPid, WriterPid,
+ Username, VHost, CollectorPid]),
+ gen_server2:enter_loop(
+ ?MODULE, [], State, self(), hibernate,
+ {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN,
+ ?DESIRED_HIBERNATE})
+ end)}.
do(Pid, Method) ->
do(Pid, Method, none).
@@ -146,30 +153,30 @@ info_all(Items) ->
%%---------------------------------------------------------------------------
-init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) ->
+init([Channel, ParentPid, ReaderPid, WriterPid, Username, VHost,
+ CollectorPid]) ->
process_flag(trap_exit, true),
link(WriterPid),
ok = pg_local:join(rabbit_channels, self()),
- {ok, #ch{state = starting,
- channel = Channel,
- reader_pid = ReaderPid,
- writer_pid = WriterPid,
- limiter_pid = undefined,
- transaction_id = none,
- tx_participants = sets:new(),
- next_tag = 1,
- uncommitted_ack_q = queue:new(),
- unacked_message_q = queue:new(),
- username = Username,
- virtual_host = VHost,
- most_recently_declared_queue = <<>>,
- consumer_mapping = dict:new(),
- blocking = dict:new(),
- queue_collector_pid = CollectorPid,
- flow = #flow{server = true, client = true,
- pending = none}},
- hibernate,
- {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+ #ch{state = starting,
+ channel = Channel,
+ parent_pid = ParentPid,
+ reader_pid = ReaderPid,
+ writer_pid = WriterPid,
+ limiter_pid = undefined,
+ transaction_id = none,
+ tx_participants = sets:new(),
+ next_tag = 1,
+ uncommitted_ack_q = queue:new(),
+ unacked_message_q = queue:new(),
+ username = Username,
+ virtual_host = VHost,
+ most_recently_declared_queue = <<>>,
+ consumer_mapping = dict:new(),
+ blocking = dict:new(),
+ queue_collector_pid = CollectorPid,
+ flow = #flow{server = true, client = true,
+ pending = none}}.
handle_call(info, _From, State) ->
reply(infos(?INFO_KEYS, State), State);
@@ -208,7 +215,8 @@ handle_cast({method, Method, Content}, State) ->
handle_cast({flushed, QPid}, State) ->
{noreply, queue_blocked(QPid, State)};
-handle_cast(terminate, State) ->
+handle_cast(terminate, State = #ch{parent_pid = ParentPid}) ->
+ supervisor2:stop(ParentPid),
{stop, shutdown, State};
handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) ->
@@ -1021,8 +1029,13 @@ fold_per_queue(F, Acc0, UAQ) ->
dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end,
Acc0, D).
-start_limiter(State = #ch{unacked_message_q = UAMQ}) ->
- LPid = rabbit_limiter:start_link(self(), queue:len(UAMQ)),
+start_limiter(State = #ch{unacked_message_q = UAMQ, parent_pid = ParentPid}) ->
+ Me = self(),
+ {ok, LPid} =
+ supervisor2:start_child(
+ ParentPid,
+ {limiter, {rabbit_limiter, start_link, [Me, queue:len(UAMQ)]},
+ transient, ?MAX_WAIT, worker, [rabbit_limiter]}),
ok = limit_queues(LPid, State),
LPid.
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
new file mode 100644
index 0000000000..f8a7a7c6f4
--- /dev/null
+++ b/src/rabbit_channel_sup.erl
@@ -0,0 +1,66 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_channel_sup).
+
+-behaviour(supervisor2).
+
+-export([start_link/7, writer/1, framing_channel/1, channel/1]).
+
+-export([init/1]).
+
+-include("rabbit.hrl").
+
+start_link(Sock, Channel, FrameMax, ReaderPid, Username, VHost, Collector) ->
+ supervisor2:start_link(?MODULE, [Sock, Channel, FrameMax, ReaderPid,
+ Username, VHost, Collector]).
+
+init([Sock, Channel, FrameMax, ReaderPid, Username, VHost, Collector]) ->
+ {ok, {{one_for_all, 0, 1},
+ [{channel, {rabbit_channel, start_link,
+ [Channel, ReaderPid, Username, VHost, Collector]},
+ permanent, ?MAX_WAIT, worker, [rabbit_channel]},
+ {writer, {rabbit_writer, start_link, [Sock, Channel, FrameMax]},
+ permanent, ?MAX_WAIT, worker, [rabbit_writer]},
+ {framing_channel, {rabbit_framing_channel, start_link, []},
+ permanent, ?MAX_WAIT, worker, [rabbit_framing_channel]}
+ ]}}.
+
+writer(Pid) ->
+ hd(supervisor2:find_child(Pid, writer, worker, [rabbit_writer])).
+
+channel(Pid) ->
+ hd(supervisor2:find_child(Pid, channel, worker, [rabbit_channel])).
+
+framing_channel(Pid) ->
+ hd(supervisor2:find_child(Pid, framing_channel, worker,
+ [rabbit_framing_channel])).
+
diff --git a/src/rabbit_channel_sup_sup.erl b/src/rabbit_channel_sup_sup.erl
new file mode 100644
index 0000000000..42064709ca
--- /dev/null
+++ b/src/rabbit_channel_sup_sup.erl
@@ -0,0 +1,49 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_channel_sup_sup).
+
+-behaviour(supervisor2).
+
+-export([start_link/0, start_channel/2]).
+
+-export([init/1]).
+
+start_link() ->
+ supervisor2:start_link(?MODULE, []).
+
+init([]) ->
+ {ok, {{simple_one_for_one_terminate, 0, 1},
+ [{channel_sup, {rabbit_channel_sup, start_link, []},
+ transient, infinity, supervisor, [rabbit_channel_sup]}]}}.
+
+start_channel(Pid, Args) ->
+ supervisor2:start_child(Pid, Args).
diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl
index f6c4de2a12..cafe9612b3 100644
--- a/src/rabbit_connection_sup.erl
+++ b/src/rabbit_connection_sup.erl
@@ -33,7 +33,7 @@
-behaviour(supervisor2).
--export([start_link/0, stop/1]).
+-export([start_link/0, stop/1, reader/1, channel_sup_sup/1]).
-export([init/1]).
@@ -50,5 +50,15 @@ init([]) ->
[{reader, {rabbit_reader, start_link, []},
transient, ?MAX_WAIT, worker, [rabbit_reader]},
{collector, {rabbit_reader_queue_collector, start_link, []},
- transient, ?MAX_WAIT, worker, [rabbit_reader_queue_collector]}
+ transient, ?MAX_WAIT, worker, [rabbit_reader_queue_collector]},
+ {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []},
+ transient, infinity, supervisor, [rabbit_channel_sup_sup]}
]}}.
+
+reader(Pid) ->
+ hd(supervisor2:find_child(Pid, reader, worker, [rabbit_reader])).
+
+channel_sup_sup(Pid) ->
+ hd(supervisor2:find_child(Pid, channel_sup_sup, supervisor,
+ [rabbit_channel_sup_sup])).
+
diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl
index bc1a2a0835..d07d871be9 100644
--- a/src/rabbit_framing_channel.erl
+++ b/src/rabbit_framing_channel.erl
@@ -32,21 +32,17 @@
-module(rabbit_framing_channel).
-include("rabbit.hrl").
--export([start_link/2, process/2, shutdown/1]).
+-export([start_link/0, process/2, shutdown/1]).
%% internal
-export([mainloop/1]).
%%--------------------------------------------------------------------
-start_link(StartFun, StartArgs) ->
- spawn_link(
- fun () ->
- %% we trap exits so that a normal termination of the
- %% channel or reader process terminates us too.
- process_flag(trap_exit, true),
- mainloop(apply(StartFun, StartArgs))
- end).
+start_link() ->
+ Parent = self(),
+ {ok, proc_lib:spawn_link(
+ fun () -> mainloop(rabbit_channel_sup:channel(Parent)) end)}.
process(Pid, Frame) ->
Pid ! {frame, Frame},
@@ -60,12 +56,6 @@ shutdown(Pid) ->
read_frame(ChannelPid) ->
receive
- %% converting the exit signal into one of our own ensures that
- %% the reader sees the right pid (i.e. ours) when a channel
- %% exits. Similarly in the other direction, though it is not
- %% really relevant there since the channel is not specifically
- %% watching out for reader exit signals.
- {'EXIT', _Pid, Reason} -> exit(Reason);
{frame, Frame} -> Frame;
terminate -> rabbit_channel:shutdown(ChannelPid),
read_frame(ChannelPid);
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 878af02976..491ae7d6eb 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -45,7 +45,7 @@
-type(maybe_pid() :: pid() | 'undefined').
--spec(start_link/2 :: (pid(), non_neg_integer()) -> pid()).
+-spec(start_link/2 :: (pid(), non_neg_integer()) -> {'ok', pid()}).
-spec(shutdown/1 :: (maybe_pid()) -> 'ok').
-spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok' | 'stopped').
-spec(can_send/3 :: (maybe_pid(), pid(), boolean()) -> boolean()).
@@ -74,8 +74,7 @@
%%----------------------------------------------------------------------------
start_link(ChPid, UnackedMsgCount) ->
- {ok, Pid} = gen_server2:start_link(?MODULE, [ChPid, UnackedMsgCount], []),
- Pid.
+ gen_server2:start_link(?MODULE, [ChPid, UnackedMsgCount], []).
shutdown(undefined) ->
ok;
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 6502c6d177..e70946404a 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -201,12 +201,10 @@ on_node_down(Node) ->
start_client(Sock, SockTransform) ->
{ok, Child} = supervisor:start_child(rabbit_tcp_client_sup, []),
- hd([begin
- ok = rabbit_net:controlling_process(Sock, Reader),
- Reader ! {go, Sock, SockTransform},
- Reader
- end || {reader, Reader, worker, [rabbit_reader]}
- <- supervisor:which_children(Child)]).
+ Reader = rabbit_connection_sup:reader(Child),
+ ok = rabbit_net:controlling_process(Sock, Reader),
+ Reader ! {go, Sock, SockTransform},
+ Reader.
start_client(Sock) ->
start_client(Sock, fun (S) -> {ok, S} end).
@@ -229,10 +227,9 @@ start_ssl_client(SslOpts, Sock) ->
end).
connections() ->
- [Pid || {_, ConnSup, supervisor, _}
- <- supervisor:which_children(rabbit_tcp_client_sup),
- {reader, Pid, worker, [rabbit_reader]}
- <- supervisor:which_children(ConnSup)].
+ [rabbit_connection_sup:reader(ConnSup) ||
+ {_, ConnSup, supervisor, _}
+ <- supervisor:which_children(rabbit_tcp_client_sup)].
connection_info_keys() -> rabbit_reader:info_keys().
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index d789c15ec2..16cf40eeb7 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -767,15 +767,18 @@ i(Item, #v1{}) ->
%%--------------------------------------------------------------------------
send_to_new_channel(Channel, AnalyzedFrame,
- State = #v1{queue_collector = Collector}) ->
+ State = #v1{queue_collector = Collector, parent = Parent}) ->
#v1{sock = Sock, connection = #connection{
frame_max = FrameMax,
user = #user{username = Username},
vhost = VHost}} = State,
- WriterPid = rabbit_writer:start(Sock, Channel, FrameMax),
- ChPid = rabbit_framing_channel:start_link(
- fun rabbit_channel:start_link/6,
- [Channel, self(), WriterPid, Username, VHost, Collector]),
+ ChanSupSup = rabbit_connection_sup:channel_sup_sup(Parent),
+ {ok, ChanSup} = rabbit_channel_sup_sup:start_channel(
+ ChanSupSup,
+ [Sock, Channel, FrameMax, self(),
+ Username, VHost, Collector]),
+ ChPid = rabbit_channel_sup:framing_channel(ChanSup),
+ link(ChPid),
put({channel, Channel}, {chpid, ChPid}),
put({chpid, ChPid}, {channel, Channel}),
ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame).
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index bf6e9bdfbd..ff5ef9d127 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -48,8 +48,10 @@
-ifdef(use_specs).
--spec(start/3 :: (socket(), channel_number(), non_neg_integer()) -> pid()).
--spec(start_link/3 :: (socket(), channel_number(), non_neg_integer()) -> pid()).
+-spec(start/3 :: (socket(), channel_number(), non_neg_integer()) ->
+ {'ok', pid()}).
+-spec(start_link/3 :: (socket(), channel_number(), non_neg_integer()) ->
+ {'ok', pid()}).
-spec(send_command/2 :: (pid(), amqp_method_record()) -> 'ok').
-spec(send_command/3 :: (pid(), amqp_method_record(), content()) -> 'ok').
-spec(send_command_and_signal_back/3 :: (pid(), amqp_method(), pid()) -> 'ok').
@@ -68,14 +70,16 @@
%%----------------------------------------------------------------------------
start(Sock, Channel, FrameMax) ->
- spawn(?MODULE, mainloop, [#wstate{sock = Sock,
- channel = Channel,
- frame_max = FrameMax}]).
+ {ok,
+ proc_lib:spawn(?MODULE, mainloop, [#wstate{sock = Sock,
+ channel = Channel,
+ frame_max = FrameMax}])}.
start_link(Sock, Channel, FrameMax) ->
- spawn_link(?MODULE, mainloop, [#wstate{sock = Sock,
- channel = Channel,
- frame_max = FrameMax}]).
+ {ok,
+ proc_lib:spawn_link(?MODULE, mainloop, [#wstate{sock = Sock,
+ channel = Channel,
+ frame_max = FrameMax}])}.
mainloop(State) ->
receive
diff --git a/src/supervisor2.erl b/src/supervisor2.erl
index 5f1ec54c80..136a6533fd 100644
--- a/src/supervisor2.erl
+++ b/src/supervisor2.erl
@@ -35,7 +35,7 @@
-export([start_link/2,start_link/3,
start_child/2, restart_child/2,
delete_child/2, terminate_child/2,
- which_children/1,
+ which_children/1, find_child/4,
check_childspecs/1, stop/1]).
-export([behaviour_info/1]).
@@ -109,6 +109,10 @@ terminate_child(Supervisor, Name) ->
which_children(Supervisor) ->
call(Supervisor, which_children).
+find_child(Supervisor, Name, Type, Modules) ->
+ [Pid || {Name1, Pid, Type1, Modules1} <- which_children(Supervisor),
+ Name1 =:= Name, Type1 =:= Type, Modules1 =:= Modules].
+
call(Supervisor, Req) ->
gen_server:call(Supervisor, Req, infinity).