diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2010-07-06 18:31:56 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-07-06 18:31:56 +0100 |
| commit | 54c3910fb015e5165b0417eaeb48898823fef41a (patch) | |
| tree | a3debc50e689579f9e81afbc512957ad2faae32e | |
| parent | 10ae11e5ebd14c39c38c5f2217d1f93e794373d2 (diff) | |
| download | rabbitmq-server-git-54c3910fb015e5165b0417eaeb48898823fef41a.tar.gz | |
And now the channel, writer, limiter and framing_channel are also all suitably supervisored
| -rw-r--r-- | src/gen_server2.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 79 | ||||
| -rw-r--r-- | src/rabbit_channel_sup.erl | 66 | ||||
| -rw-r--r-- | src/rabbit_channel_sup_sup.erl | 49 | ||||
| -rw-r--r-- | src/rabbit_connection_sup.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_framing_channel.erl | 20 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_networking.erl | 17 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_writer.erl | 20 | ||||
| -rw-r--r-- | src/supervisor2.erl | 6 |
11 files changed, 213 insertions, 78 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl index 547f0a42e2..32a8d0a147 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -164,7 +164,7 @@ cast/2, pcast/3, reply/2, abcast/2, abcast/3, multi_call/2, multi_call/3, multi_call/4, - enter_loop/3, enter_loop/4, enter_loop/5, wake_hib/7]). + enter_loop/3, enter_loop/4, enter_loop/5, enter_loop/6, wake_hib/7]). -export([behaviour_info/1]). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 692d7bacf1..b91391eb15 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -35,7 +35,7 @@ -behaviour(gen_server2). --export([start_link/6, do/2, do/3, shutdown/1]). +-export([start_link/5, do/2, do/3, shutdown/1]). -export([send_command/2, deliver/4, conserve_memory/2, flushed/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). @@ -44,7 +44,7 @@ -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1]). --record(ch, {state, channel, reader_pid, writer_pid, limiter_pid, +-record(ch, {state, channel, parent_pid, reader_pid, writer_pid, limiter_pid, transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, username, virtual_host, most_recently_declared_queue, @@ -73,8 +73,8 @@ -type(ref() :: any()). --spec(start_link/6 :: - (channel_number(), pid(), pid(), username(), vhost(), pid()) -> pid()). +-spec(start_link/5 :: + (channel_number(), pid(), username(), vhost(), pid()) -> pid()). -spec(do/2 :: (pid(), amqp_method_record()) -> 'ok'). -spec(do/3 :: (pid(), amqp_method_record(), maybe(content())) -> 'ok'). -spec(shutdown/1 :: (pid()) -> 'ok'). @@ -94,11 +94,18 @@ %%---------------------------------------------------------------------------- -start_link(Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid) -> - {ok, Pid} = gen_server2:start_link( - ?MODULE, [Channel, ReaderPid, WriterPid, - Username, VHost, CollectorPid], []), - Pid. +start_link(Channel, ReaderPid, Username, VHost, CollectorPid) -> + Parent = self(), + {ok, proc_lib:spawn_link( + fun () -> + WriterPid = rabbit_channel_sup:writer(Parent), + State = init([Channel, Parent, ReaderPid, WriterPid, + Username, VHost, CollectorPid]), + gen_server2:enter_loop( + ?MODULE, [], State, self(), hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, + ?DESIRED_HIBERNATE}) + end)}. do(Pid, Method) -> do(Pid, Method, none). @@ -146,30 +153,30 @@ info_all(Items) -> %%--------------------------------------------------------------------------- -init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) -> +init([Channel, ParentPid, ReaderPid, WriterPid, Username, VHost, + CollectorPid]) -> process_flag(trap_exit, true), link(WriterPid), ok = pg_local:join(rabbit_channels, self()), - {ok, #ch{state = starting, - channel = Channel, - reader_pid = ReaderPid, - writer_pid = WriterPid, - limiter_pid = undefined, - transaction_id = none, - tx_participants = sets:new(), - next_tag = 1, - uncommitted_ack_q = queue:new(), - unacked_message_q = queue:new(), - username = Username, - virtual_host = VHost, - most_recently_declared_queue = <<>>, - consumer_mapping = dict:new(), - blocking = dict:new(), - queue_collector_pid = CollectorPid, - flow = #flow{server = true, client = true, - pending = none}}, - hibernate, - {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + #ch{state = starting, + channel = Channel, + parent_pid = ParentPid, + reader_pid = ReaderPid, + writer_pid = WriterPid, + limiter_pid = undefined, + transaction_id = none, + tx_participants = sets:new(), + next_tag = 1, + uncommitted_ack_q = queue:new(), + unacked_message_q = queue:new(), + username = Username, + virtual_host = VHost, + most_recently_declared_queue = <<>>, + consumer_mapping = dict:new(), + blocking = dict:new(), + queue_collector_pid = CollectorPid, + flow = #flow{server = true, client = true, + pending = none}}. handle_call(info, _From, State) -> reply(infos(?INFO_KEYS, State), State); @@ -208,7 +215,8 @@ handle_cast({method, Method, Content}, State) -> handle_cast({flushed, QPid}, State) -> {noreply, queue_blocked(QPid, State)}; -handle_cast(terminate, State) -> +handle_cast(terminate, State = #ch{parent_pid = ParentPid}) -> + supervisor2:stop(ParentPid), {stop, shutdown, State}; handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) -> @@ -1021,8 +1029,13 @@ fold_per_queue(F, Acc0, UAQ) -> dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end, Acc0, D). -start_limiter(State = #ch{unacked_message_q = UAMQ}) -> - LPid = rabbit_limiter:start_link(self(), queue:len(UAMQ)), +start_limiter(State = #ch{unacked_message_q = UAMQ, parent_pid = ParentPid}) -> + Me = self(), + {ok, LPid} = + supervisor2:start_child( + ParentPid, + {limiter, {rabbit_limiter, start_link, [Me, queue:len(UAMQ)]}, + transient, ?MAX_WAIT, worker, [rabbit_limiter]}), ok = limit_queues(LPid, State), LPid. diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl new file mode 100644 index 0000000000..f8a7a7c6f4 --- /dev/null +++ b/src/rabbit_channel_sup.erl @@ -0,0 +1,66 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_channel_sup). + +-behaviour(supervisor2). + +-export([start_link/7, writer/1, framing_channel/1, channel/1]). + +-export([init/1]). + +-include("rabbit.hrl"). + +start_link(Sock, Channel, FrameMax, ReaderPid, Username, VHost, Collector) -> + supervisor2:start_link(?MODULE, [Sock, Channel, FrameMax, ReaderPid, + Username, VHost, Collector]). + +init([Sock, Channel, FrameMax, ReaderPid, Username, VHost, Collector]) -> + {ok, {{one_for_all, 0, 1}, + [{channel, {rabbit_channel, start_link, + [Channel, ReaderPid, Username, VHost, Collector]}, + permanent, ?MAX_WAIT, worker, [rabbit_channel]}, + {writer, {rabbit_writer, start_link, [Sock, Channel, FrameMax]}, + permanent, ?MAX_WAIT, worker, [rabbit_writer]}, + {framing_channel, {rabbit_framing_channel, start_link, []}, + permanent, ?MAX_WAIT, worker, [rabbit_framing_channel]} + ]}}. + +writer(Pid) -> + hd(supervisor2:find_child(Pid, writer, worker, [rabbit_writer])). + +channel(Pid) -> + hd(supervisor2:find_child(Pid, channel, worker, [rabbit_channel])). + +framing_channel(Pid) -> + hd(supervisor2:find_child(Pid, framing_channel, worker, + [rabbit_framing_channel])). + diff --git a/src/rabbit_channel_sup_sup.erl b/src/rabbit_channel_sup_sup.erl new file mode 100644 index 0000000000..42064709ca --- /dev/null +++ b/src/rabbit_channel_sup_sup.erl @@ -0,0 +1,49 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_channel_sup_sup). + +-behaviour(supervisor2). + +-export([start_link/0, start_channel/2]). + +-export([init/1]). + +start_link() -> + supervisor2:start_link(?MODULE, []). + +init([]) -> + {ok, {{simple_one_for_one_terminate, 0, 1}, + [{channel_sup, {rabbit_channel_sup, start_link, []}, + transient, infinity, supervisor, [rabbit_channel_sup]}]}}. + +start_channel(Pid, Args) -> + supervisor2:start_child(Pid, Args). diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl index f6c4de2a12..cafe9612b3 100644 --- a/src/rabbit_connection_sup.erl +++ b/src/rabbit_connection_sup.erl @@ -33,7 +33,7 @@ -behaviour(supervisor2). --export([start_link/0, stop/1]). +-export([start_link/0, stop/1, reader/1, channel_sup_sup/1]). -export([init/1]). @@ -50,5 +50,15 @@ init([]) -> [{reader, {rabbit_reader, start_link, []}, transient, ?MAX_WAIT, worker, [rabbit_reader]}, {collector, {rabbit_reader_queue_collector, start_link, []}, - transient, ?MAX_WAIT, worker, [rabbit_reader_queue_collector]} + transient, ?MAX_WAIT, worker, [rabbit_reader_queue_collector]}, + {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []}, + transient, infinity, supervisor, [rabbit_channel_sup_sup]} ]}}. + +reader(Pid) -> + hd(supervisor2:find_child(Pid, reader, worker, [rabbit_reader])). + +channel_sup_sup(Pid) -> + hd(supervisor2:find_child(Pid, channel_sup_sup, supervisor, + [rabbit_channel_sup_sup])). + diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl index bc1a2a0835..d07d871be9 100644 --- a/src/rabbit_framing_channel.erl +++ b/src/rabbit_framing_channel.erl @@ -32,21 +32,17 @@ -module(rabbit_framing_channel). -include("rabbit.hrl"). --export([start_link/2, process/2, shutdown/1]). +-export([start_link/0, process/2, shutdown/1]). %% internal -export([mainloop/1]). %%-------------------------------------------------------------------- -start_link(StartFun, StartArgs) -> - spawn_link( - fun () -> - %% we trap exits so that a normal termination of the - %% channel or reader process terminates us too. - process_flag(trap_exit, true), - mainloop(apply(StartFun, StartArgs)) - end). +start_link() -> + Parent = self(), + {ok, proc_lib:spawn_link( + fun () -> mainloop(rabbit_channel_sup:channel(Parent)) end)}. process(Pid, Frame) -> Pid ! {frame, Frame}, @@ -60,12 +56,6 @@ shutdown(Pid) -> read_frame(ChannelPid) -> receive - %% converting the exit signal into one of our own ensures that - %% the reader sees the right pid (i.e. ours) when a channel - %% exits. Similarly in the other direction, though it is not - %% really relevant there since the channel is not specifically - %% watching out for reader exit signals. - {'EXIT', _Pid, Reason} -> exit(Reason); {frame, Frame} -> Frame; terminate -> rabbit_channel:shutdown(ChannelPid), read_frame(ChannelPid); diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 878af02976..491ae7d6eb 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -45,7 +45,7 @@ -type(maybe_pid() :: pid() | 'undefined'). --spec(start_link/2 :: (pid(), non_neg_integer()) -> pid()). +-spec(start_link/2 :: (pid(), non_neg_integer()) -> {'ok', pid()}). -spec(shutdown/1 :: (maybe_pid()) -> 'ok'). -spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok' | 'stopped'). -spec(can_send/3 :: (maybe_pid(), pid(), boolean()) -> boolean()). @@ -74,8 +74,7 @@ %%---------------------------------------------------------------------------- start_link(ChPid, UnackedMsgCount) -> - {ok, Pid} = gen_server2:start_link(?MODULE, [ChPid, UnackedMsgCount], []), - Pid. + gen_server2:start_link(?MODULE, [ChPid, UnackedMsgCount], []). shutdown(undefined) -> ok; diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 6502c6d177..e70946404a 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -201,12 +201,10 @@ on_node_down(Node) -> start_client(Sock, SockTransform) -> {ok, Child} = supervisor:start_child(rabbit_tcp_client_sup, []), - hd([begin - ok = rabbit_net:controlling_process(Sock, Reader), - Reader ! {go, Sock, SockTransform}, - Reader - end || {reader, Reader, worker, [rabbit_reader]} - <- supervisor:which_children(Child)]). + Reader = rabbit_connection_sup:reader(Child), + ok = rabbit_net:controlling_process(Sock, Reader), + Reader ! {go, Sock, SockTransform}, + Reader. start_client(Sock) -> start_client(Sock, fun (S) -> {ok, S} end). @@ -229,10 +227,9 @@ start_ssl_client(SslOpts, Sock) -> end). connections() -> - [Pid || {_, ConnSup, supervisor, _} - <- supervisor:which_children(rabbit_tcp_client_sup), - {reader, Pid, worker, [rabbit_reader]} - <- supervisor:which_children(ConnSup)]. + [rabbit_connection_sup:reader(ConnSup) || + {_, ConnSup, supervisor, _} + <- supervisor:which_children(rabbit_tcp_client_sup)]. connection_info_keys() -> rabbit_reader:info_keys(). diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index d789c15ec2..16cf40eeb7 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -767,15 +767,18 @@ i(Item, #v1{}) -> %%-------------------------------------------------------------------------- send_to_new_channel(Channel, AnalyzedFrame, - State = #v1{queue_collector = Collector}) -> + State = #v1{queue_collector = Collector, parent = Parent}) -> #v1{sock = Sock, connection = #connection{ frame_max = FrameMax, user = #user{username = Username}, vhost = VHost}} = State, - WriterPid = rabbit_writer:start(Sock, Channel, FrameMax), - ChPid = rabbit_framing_channel:start_link( - fun rabbit_channel:start_link/6, - [Channel, self(), WriterPid, Username, VHost, Collector]), + ChanSupSup = rabbit_connection_sup:channel_sup_sup(Parent), + {ok, ChanSup} = rabbit_channel_sup_sup:start_channel( + ChanSupSup, + [Sock, Channel, FrameMax, self(), + Username, VHost, Collector]), + ChPid = rabbit_channel_sup:framing_channel(ChanSup), + link(ChPid), put({channel, Channel}, {chpid, ChPid}), put({chpid, ChPid}, {channel, Channel}), ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame). diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index bf6e9bdfbd..ff5ef9d127 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -48,8 +48,10 @@ -ifdef(use_specs). --spec(start/3 :: (socket(), channel_number(), non_neg_integer()) -> pid()). --spec(start_link/3 :: (socket(), channel_number(), non_neg_integer()) -> pid()). +-spec(start/3 :: (socket(), channel_number(), non_neg_integer()) -> + {'ok', pid()}). +-spec(start_link/3 :: (socket(), channel_number(), non_neg_integer()) -> + {'ok', pid()}). -spec(send_command/2 :: (pid(), amqp_method_record()) -> 'ok'). -spec(send_command/3 :: (pid(), amqp_method_record(), content()) -> 'ok'). -spec(send_command_and_signal_back/3 :: (pid(), amqp_method(), pid()) -> 'ok'). @@ -68,14 +70,16 @@ %%---------------------------------------------------------------------------- start(Sock, Channel, FrameMax) -> - spawn(?MODULE, mainloop, [#wstate{sock = Sock, - channel = Channel, - frame_max = FrameMax}]). + {ok, + proc_lib:spawn(?MODULE, mainloop, [#wstate{sock = Sock, + channel = Channel, + frame_max = FrameMax}])}. start_link(Sock, Channel, FrameMax) -> - spawn_link(?MODULE, mainloop, [#wstate{sock = Sock, - channel = Channel, - frame_max = FrameMax}]). + {ok, + proc_lib:spawn_link(?MODULE, mainloop, [#wstate{sock = Sock, + channel = Channel, + frame_max = FrameMax}])}. mainloop(State) -> receive diff --git a/src/supervisor2.erl b/src/supervisor2.erl index 5f1ec54c80..136a6533fd 100644 --- a/src/supervisor2.erl +++ b/src/supervisor2.erl @@ -35,7 +35,7 @@ -export([start_link/2,start_link/3, start_child/2, restart_child/2, delete_child/2, terminate_child/2, - which_children/1, + which_children/1, find_child/4, check_childspecs/1, stop/1]). -export([behaviour_info/1]). @@ -109,6 +109,10 @@ terminate_child(Supervisor, Name) -> which_children(Supervisor) -> call(Supervisor, which_children). +find_child(Supervisor, Name, Type, Modules) -> + [Pid || {Name1, Pid, Type1, Modules1} <- which_children(Supervisor), + Name1 =:= Name, Type1 =:= Type, Modules1 =:= Modules]. + call(Supervisor, Req) -> gen_server:call(Supervisor, Req, infinity). |
