summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-08-03 13:11:27 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-08-03 13:11:27 +0100
commitd6e40e19762e63eab49f31113b81ba3808fad24d (patch)
treee3ed1bdab9147e5609cfe18353e8ee35ba470310 /src
parent215d6eb4451f3f26efb33612e1cb92af82be57ac (diff)
downloadrabbitmq-server-git-d6e40e19762e63eab49f31113b81ba3808fad24d.tar.gz
Avoid the unnecessary callbacks in the various new _sups as much as possible by breaking the declarative child specs but trying hard to keep them declarative as much as possible.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel.erl66
-rw-r--r--src/rabbit_channel_sup.erl53
-rw-r--r--src/rabbit_connection_sup.erl25
-rw-r--r--src/rabbit_framing_channel.erl5
-rw-r--r--src/rabbit_reader.erl86
-rw-r--r--src/rabbit_tests.erl9
6 files changed, 117 insertions, 127 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index ee77198673..3478908f9b 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -82,11 +82,11 @@
-type(ref() :: any()).
-type(channel_number() :: non_neg_integer()).
--type(pid_fun() :: fun (() -> pid())).
-spec(start_link/6 ::
- (channel_number(), pid_fun(), pid_fun(), rabbit_access_control:username(),
- rabbit_types:vhost(), pid()) -> rabbit_types:ok(pid())).
+ (channel_number(), pid(), pid(), rabbit_access_control:username(),
+ rabbit_types:vhost(), pid()) ->
+ 'ignore' | rabbit_types:ok_or_error2(pid(), any())).
-spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(),
rabbit_types:maybe(rabbit_types:content())) -> 'ok').
@@ -110,13 +110,9 @@
%%----------------------------------------------------------------------------
-start_link(Channel, GetReader, GetWriter, Username, VHost, CollectorPid) ->
- Parent = self(),
- {ok, proc_lib:spawn_link(
- fun () ->
- init_and_go([Channel, Parent, GetReader(), GetWriter(),
- Username, VHost, CollectorPid])
- end)}.
+start_link(Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid) ->
+ gen_server2:start_link(?MODULE, [Channel, self(), ReaderPid, WriterPid,
+ Username, VHost, CollectorPid], []).
do(Pid, Method) ->
do(Pid, Method, none).
@@ -175,35 +171,31 @@ init([Channel, ParentPid, ReaderPid, WriterPid, Username, VHost,
process_flag(trap_exit, true),
link(WriterPid),
ok = pg_local:join(rabbit_channels, self()),
- #ch{state = starting,
- channel = Channel,
- parent_pid = ParentPid,
- reader_pid = ReaderPid,
- writer_pid = WriterPid,
- limiter_pid = undefined,
- transaction_id = none,
- tx_participants = sets:new(),
- next_tag = 1,
- uncommitted_ack_q = queue:new(),
- unacked_message_q = queue:new(),
- username = Username,
- virtual_host = VHost,
- most_recently_declared_queue = <<>>,
- consumer_mapping = dict:new(),
- blocking = dict:new(),
- queue_collector_pid = CollectorPid,
- flow = #flow{server = true, client = true,
- pending = none},
- stats_timer = rabbit_event:init_stats_timer()}.
-
-init_and_go(InitArgs) ->
- State = init(InitArgs),
+ State = #ch{state = starting,
+ channel = Channel,
+ parent_pid = ParentPid,
+ reader_pid = ReaderPid,
+ writer_pid = WriterPid,
+ limiter_pid = undefined,
+ transaction_id = none,
+ tx_participants = sets:new(),
+ next_tag = 1,
+ uncommitted_ack_q = queue:new(),
+ unacked_message_q = queue:new(),
+ username = Username,
+ virtual_host = VHost,
+ most_recently_declared_queue = <<>>,
+ consumer_mapping = dict:new(),
+ blocking = dict:new(),
+ queue_collector_pid = CollectorPid,
+ flow = #flow{server = true, client = true,
+ pending = none},
+ stats_timer = rabbit_event:init_stats_timer()},
rabbit_event:notify(
channel_created,
[{Item, i(Item, State)} || Item <- ?CREATION_EVENT_KEYS]),
- gen_server2:enter_loop(?MODULE, [], State, self(), hibernate,
- {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN,
- ?DESIRED_HIBERNATE}).
+ {ok, State, hibernate,
+ {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
handle_call(info, _From, State) ->
reply(infos(?INFO_KEYS, State), State);
@@ -1112,7 +1104,7 @@ fold_per_queue(F, Acc0, UAQ) ->
start_limiter(State = #ch{unacked_message_q = UAMQ, parent_pid = ParentPid}) ->
Me = self(),
{ok, LPid} =
- supervisor2:start_child(
+ supervisor:start_child(
ParentPid,
{limiter, {rabbit_limiter, start_link, [Me, queue:len(UAMQ)]},
transient, ?MAX_WAIT, worker, [rabbit_limiter]}),
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
index 1d02d992d2..b565f23689 100644
--- a/src/rabbit_channel_sup.erl
+++ b/src/rabbit_channel_sup.erl
@@ -33,7 +33,7 @@
-behaviour(supervisor2).
--export([start_link/8, writer/1, framing_channel/1, channel/1]).
+-export([start_link/8]).
-export([init/1]).
@@ -47,7 +47,7 @@
(rabbit_types:protocol(), rabbit_net:socket(),
rabbit_channel:channel_number(), non_neg_integer(), pid(),
rabbit_access_control:username(), rabbit_types:vhost(), pid()) ->
- ignore | rabbit_types:ok_or_error2(pid(), any())).
+ rabbit_types:ok({pid(), pid()})).
-endif.
@@ -55,32 +55,29 @@
start_link(Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost,
Collector) ->
- supervisor2:start_link(?MODULE, [Protocol, Sock, Channel, FrameMax,
- ReaderPid, Username, VHost, Collector]).
-
-writer(Pid) ->
- hd(supervisor2:find_child(Pid, writer)).
-
-channel(Pid) ->
- hd(supervisor2:find_child(Pid, channel)).
-
-framing_channel(Pid) ->
- hd(supervisor2:find_child(Pid, framing_channel)).
+ {ok, SupPid} = supervisor2:start_link(?MODULE, []),
+ {ok, WriterPid} =
+ supervisor2:start_child(
+ SupPid,
+ {writer, {rabbit_writer, start_link,
+ [Sock, Channel, FrameMax, Protocol]},
+ permanent, ?MAX_WAIT, worker, [rabbit_writer]}),
+ {ok, ChannelPid} =
+ supervisor2:start_child(
+ SupPid,
+ {channel, {rabbit_channel, start_link,
+ [Channel, ReaderPid, WriterPid, Username, VHost,
+ Collector]},
+ permanent, ?MAX_WAIT, worker, [rabbit_channel]}),
+ {ok, FramingChannelPid} =
+ supervisor2:start_child(
+ SupPid,
+ {framing_channel, {rabbit_framing_channel, start_link,
+ [ChannelPid, Protocol]},
+ permanent, ?MAX_WAIT, worker, [rabbit_framing_channel]}),
+ {ok, {SupPid, FramingChannelPid}}.
%%----------------------------------------------------------------------------
-init([Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost,
- Collector]) ->
- Me = self(),
- {ok, {{one_for_all, 0, 1},
- [{framing_channel, {rabbit_framing_channel, start_link,
- [fun () -> channel(Me) end, Protocol]},
- permanent, ?MAX_WAIT, worker, [rabbit_framing_channel]},
- {writer, {rabbit_writer, start_link,
- [Sock, Channel, FrameMax, Protocol]},
- permanent, ?MAX_WAIT, worker, [rabbit_writer]},
- {channel, {rabbit_channel, start_link,
- [Channel, fun () -> ReaderPid end,
- fun () -> writer(Me) end, Username, VHost, Collector]},
- permanent, ?MAX_WAIT, worker, [rabbit_channel]}
- ]}}.
+init([]) ->
+ {ok, {{one_for_all, 0, 1}, []}}.
diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl
index 4ad9d3f04e..8d09961f89 100644
--- a/src/rabbit_connection_sup.erl
+++ b/src/rabbit_connection_sup.erl
@@ -33,28 +33,31 @@
-behaviour(supervisor2).
--export([start_link/0, reader/1, channel_sup_sup/1]).
+-export([start_link/0, reader/1]).
-export([init/1]).
-include("rabbit.hrl").
start_link() ->
- supervisor2:start_link(?MODULE, []).
+ {ok, SupPid} = supervisor2:start_link(?MODULE, []),
+ {ok, ChannelSupSupPid} =
+ supervisor2:start_child(
+ SupPid,
+ {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []},
+ permanent, infinity, supervisor, [rabbit_channel_sup_sup]}),
+ {ok, _ReaderPid} =
+ supervisor2:start_child(
+ SupPid,
+ {reader, {rabbit_reader, start_link, [ChannelSupSupPid]},
+ permanent, ?MAX_WAIT, worker, [rabbit_reader]}),
+ {ok, SupPid}.
init([]) ->
{ok, {{one_for_all, 0, 1},
- [{channel_sup_sup, {rabbit_channel_sup_sup, start_link, []},
- permanent, infinity, supervisor, [rabbit_channel_sup_sup]},
- {reader, {rabbit_reader, start_link, []},
- permanent, ?MAX_WAIT, worker, [rabbit_reader]},
- {collector, {rabbit_queue_collector, start_link, []},
+ [{collector, {rabbit_queue_collector, start_link, []},
permanent, ?MAX_WAIT, worker, [rabbit_queue_collector]}
]}}.
reader(Pid) ->
hd(supervisor2:find_child(Pid, reader)).
-
-channel_sup_sup(Pid) ->
- hd(supervisor2:find_child(Pid, channel_sup_sup)).
-
diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl
index 6ece343655..6ee5a5550b 100644
--- a/src/rabbit_framing_channel.erl
+++ b/src/rabbit_framing_channel.erl
@@ -39,9 +39,8 @@
%%--------------------------------------------------------------------
-start_link(GetChannelPid, Protocol) ->
- {ok, proc_lib:spawn_link(
- fun () -> mainloop(GetChannelPid(), Protocol) end)}.
+start_link(ChannelPid, Protocol) ->
+ {ok, proc_lib:spawn_link(fun () -> mainloop(ChannelPid, Protocol) end)}.
process(Pid, Frame) ->
Pid ! {frame, Frame},
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 24bde74d99..4b89db4783 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -33,11 +33,11 @@
-include("rabbit_framing.hrl").
-include("rabbit.hrl").
--export([start_link/0, info_keys/0, info/1, info/2, shutdown/2]).
+-export([start_link/1, info_keys/0, info/1, info/2, shutdown/2]).
-export([system_continue/3, system_terminate/4, system_code_change/4]).
--export([init/1, mainloop/2]).
+-export([init/2, mainloop/2]).
-export([server_properties/0]).
@@ -60,7 +60,7 @@
%---------------------------------------------------------------------------
-record(v1, {parent, sock, connection, callback, recv_ref, connection_state,
- queue_collector, stats_timer}).
+ queue_collector, stats_timer, channel_sup_sup_pid}).
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
send_pend, state, channels]).
@@ -144,6 +144,7 @@
-ifdef(use_specs).
+-spec(start_link/1 :: (pid()) -> rabbit_types:ok(pid())).
-spec(info_keys/0 :: () -> [rabbit_types:info_key()]).
-spec(info/1 :: (pid()) -> [rabbit_types:info()]).
-spec(info/2 :: (pid(), [rabbit_types:info_key()]) -> [rabbit_types:info()]).
@@ -152,9 +153,9 @@
-spec(server_properties/0 :: () -> rabbit_framing:amqp_table()).
%% These specs only exists to add no_return() to keep dialyzer happy
--spec(init/1 :: (pid()) -> no_return()).
--spec(start_connection/4 ::
- (pid(), any(), rabbit_networking:socket(),
+-spec(init/2 :: (pid(), pid()) -> no_return()).
+-spec(start_connection/5 ::
+ (pid(), pid(), any(), rabbit_networking:socket(),
fun ((rabbit_networking:socket()) ->
rabbit_types:ok_or_error2(
rabbit_networking:socket(), any()))) -> no_return()).
@@ -163,17 +164,17 @@
%%--------------------------------------------------------------------------
-start_link() ->
- {ok, proc_lib:spawn_link(?MODULE, init, [self()])}.
+start_link(ChannelSupSupPid) ->
+ {ok, proc_lib:spawn_link(?MODULE, init, [self(), ChannelSupSupPid])}.
shutdown(Pid, Explanation) ->
gen_server:call(Pid, {shutdown, Explanation}, infinity).
-init(Parent) ->
+init(Parent, ChannelSupSupPid) ->
Deb = sys:debug_options([]),
receive
{go, Sock, SockTransform} ->
- start_connection(Parent, Deb, Sock, SockTransform)
+ start_connection(Parent, ChannelSupSupPid, Deb, Sock, SockTransform)
end.
system_continue(_Parent, Deb, State) ->
@@ -248,7 +249,7 @@ socket_op(Sock, Fun) ->
exit(shutdown)
end.
-start_connection(Parent, Deb, Sock, SockTransform) ->
+start_connection(Parent, ChannelSupSupPid, Deb, Sock, SockTransform) ->
process_flag(trap_exit, true),
{PeerAddress, PeerPort} = socket_op(Sock, fun rabbit_net:peername/1),
PeerAddressS = inet_parse:ntoa(PeerAddress),
@@ -261,21 +262,23 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
[Collector] = supervisor2:find_child(Parent, collector),
try
mainloop(Deb, switch_callback(
- #v1{parent = Parent,
- sock = ClientSock,
- connection = #connection{
- protocol = none,
- user = none,
- timeout_sec = ?HANDSHAKE_TIMEOUT,
- frame_max = ?FRAME_MIN_SIZE,
- vhost = none,
- client_properties = none},
- callback = uninitialized_callback,
- recv_ref = none,
- connection_state = pre_init,
- queue_collector = Collector,
- stats_timer =
- rabbit_event:init_stats_timer()},
+ #v1{parent = Parent,
+ sock = ClientSock,
+ connection = #connection{
+ protocol = none,
+ user = none,
+ timeout_sec = ?HANDSHAKE_TIMEOUT,
+ frame_max = ?FRAME_MIN_SIZE,
+ vhost = none,
+ client_properties = none},
+ callback = uninitialized_callback,
+ recv_ref = none,
+ connection_state = pre_init,
+ queue_collector = Collector,
+ stats_timer =
+ rabbit_event:init_stats_timer(),
+ channel_sup_sup_pid = ChannelSupSupPid
+ },
handshake, 8))
catch
Ex -> (if Ex == connection_closed_abruptly ->
@@ -797,22 +800,21 @@ i(Item, #v1{}) ->
%%--------------------------------------------------------------------------
-send_to_new_channel(Channel, AnalyzedFrame, State =
- #v1{queue_collector = Collector, parent = Parent}) ->
- #v1{sock = Sock, connection = #connection{
- protocol = Protocol,
- frame_max = FrameMax,
- user = #user{username = Username},
- vhost = VHost}} = State,
- ChanSupSup = rabbit_connection_sup:channel_sup_sup(Parent),
- {ok, ChanSup} = rabbit_channel_sup_sup:start_channel(
- ChanSupSup, [Protocol, Sock, Channel, FrameMax, self(),
- Username, VHost, Collector]),
- ChPid = rabbit_channel_sup:framing_channel(ChanSup),
- link(ChPid),
- put({channel, Channel}, {chpid, ChPid}),
- put({chpid, ChPid}, {channel, Channel}),
- ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame).
+send_to_new_channel(Channel, AnalyzedFrame, State) ->
+ #v1{sock = Sock, queue_collector = Collector,
+ channel_sup_sup_pid = ChanSupSup,
+ connection = #connection{protocol = Protocol,
+ frame_max = FrameMax,
+ user = #user{username = Username},
+ vhost = VHost}} = State,
+ {ok, {_ChanSup, FrChPid}} =
+ rabbit_channel_sup_sup:start_channel(
+ ChanSupSup, [Protocol, Sock, Channel, FrameMax,
+ self(), Username, VHost, Collector]),
+ link(FrChPid),
+ put({channel, Channel}, {chpid, FrChPid}),
+ put({chpid, FrChPid}, {channel, Channel}),
+ ok = rabbit_framing_channel:process(FrChPid, AnalyzedFrame).
log_channel_error(ConnectionState, Channel, Reason) ->
rabbit_log:error("connection ~p (~p), channel ~p - error:~n~p~n",
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 79c8a31c43..59cfc06404 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -938,13 +938,10 @@ test_user_management() ->
passed.
-make_fun(Result) ->
- fun () -> Result end.
-
test_server_status() ->
%% create a few things so there is some useful information to list
Writer = spawn(fun () -> receive shutdown -> ok end end),
- {ok, Ch} = rabbit_channel:start_link(1, make_fun(self()), make_fun(Writer),
+ {ok, Ch} = rabbit_channel:start_link(1, self(), Writer,
<<"user">>, <<"/">>, self()),
[Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>],
{new, Queue = #amqqueue{}} <-
@@ -1083,7 +1080,7 @@ test_memory_pressure_spawn() ->
test_spawn(Receiver) ->
Me = self(),
Writer = spawn(fun () -> Receiver(Me) end),
- {ok, Ch} = rabbit_channel:start_link(1, make_fun(Me), make_fun(Writer),
+ {ok, Ch} = rabbit_channel:start_link(1, Me, Writer,
<<"guest">>, <<"/">>, self()),
ok = rabbit_channel:do(Ch, #'channel.open'{}),
receive #'channel.open_ok'{} -> ok
@@ -1157,7 +1154,7 @@ test_memory_pressure() ->
alarm_handler:set_alarm({vm_memory_high_watermark, []}),
Me = self(),
Writer4 = spawn(fun () -> test_memory_pressure_receiver(Me) end),
- {ok, Ch4} = rabbit_channel:start_link(1, make_fun(Me), make_fun(Writer4),
+ {ok, Ch4} = rabbit_channel:start_link(1, Me, Writer4,
<<"user">>, <<"/">>, self()),
ok = rabbit_channel:do(Ch4, #'channel.open'{}),
ok = test_memory_pressure_flush(Writer4),