diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-03 13:11:27 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-03 13:11:27 +0100 |
| commit | d6e40e19762e63eab49f31113b81ba3808fad24d (patch) | |
| tree | e3ed1bdab9147e5609cfe18353e8ee35ba470310 /src | |
| parent | 215d6eb4451f3f26efb33612e1cb92af82be57ac (diff) | |
| download | rabbitmq-server-git-d6e40e19762e63eab49f31113b81ba3808fad24d.tar.gz | |
Avoid the unnecessary callbacks in the various new _sups as much as possible by breaking the declarative child specs but trying hard to keep them declarative as much as possible.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 66 | ||||
| -rw-r--r-- | src/rabbit_channel_sup.erl | 53 | ||||
| -rw-r--r-- | src/rabbit_connection_sup.erl | 25 | ||||
| -rw-r--r-- | src/rabbit_framing_channel.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 86 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 9 |
6 files changed, 117 insertions, 127 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index ee77198673..3478908f9b 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -82,11 +82,11 @@ -type(ref() :: any()). -type(channel_number() :: non_neg_integer()). --type(pid_fun() :: fun (() -> pid())). -spec(start_link/6 :: - (channel_number(), pid_fun(), pid_fun(), rabbit_access_control:username(), - rabbit_types:vhost(), pid()) -> rabbit_types:ok(pid())). + (channel_number(), pid(), pid(), rabbit_access_control:username(), + rabbit_types:vhost(), pid()) -> + 'ignore' | rabbit_types:ok_or_error2(pid(), any())). -spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(), rabbit_types:maybe(rabbit_types:content())) -> 'ok'). @@ -110,13 +110,9 @@ %%---------------------------------------------------------------------------- -start_link(Channel, GetReader, GetWriter, Username, VHost, CollectorPid) -> - Parent = self(), - {ok, proc_lib:spawn_link( - fun () -> - init_and_go([Channel, Parent, GetReader(), GetWriter(), - Username, VHost, CollectorPid]) - end)}. +start_link(Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid) -> + gen_server2:start_link(?MODULE, [Channel, self(), ReaderPid, WriterPid, + Username, VHost, CollectorPid], []). do(Pid, Method) -> do(Pid, Method, none). @@ -175,35 +171,31 @@ init([Channel, ParentPid, ReaderPid, WriterPid, Username, VHost, process_flag(trap_exit, true), link(WriterPid), ok = pg_local:join(rabbit_channels, self()), - #ch{state = starting, - channel = Channel, - parent_pid = ParentPid, - reader_pid = ReaderPid, - writer_pid = WriterPid, - limiter_pid = undefined, - transaction_id = none, - tx_participants = sets:new(), - next_tag = 1, - uncommitted_ack_q = queue:new(), - unacked_message_q = queue:new(), - username = Username, - virtual_host = VHost, - most_recently_declared_queue = <<>>, - consumer_mapping = dict:new(), - blocking = dict:new(), - queue_collector_pid = CollectorPid, - flow = #flow{server = true, client = true, - pending = none}, - stats_timer = rabbit_event:init_stats_timer()}. - -init_and_go(InitArgs) -> - State = init(InitArgs), + State = #ch{state = starting, + channel = Channel, + parent_pid = ParentPid, + reader_pid = ReaderPid, + writer_pid = WriterPid, + limiter_pid = undefined, + transaction_id = none, + tx_participants = sets:new(), + next_tag = 1, + uncommitted_ack_q = queue:new(), + unacked_message_q = queue:new(), + username = Username, + virtual_host = VHost, + most_recently_declared_queue = <<>>, + consumer_mapping = dict:new(), + blocking = dict:new(), + queue_collector_pid = CollectorPid, + flow = #flow{server = true, client = true, + pending = none}, + stats_timer = rabbit_event:init_stats_timer()}, rabbit_event:notify( channel_created, [{Item, i(Item, State)} || Item <- ?CREATION_EVENT_KEYS]), - gen_server2:enter_loop(?MODULE, [], State, self(), hibernate, - {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, - ?DESIRED_HIBERNATE}). + {ok, State, hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. handle_call(info, _From, State) -> reply(infos(?INFO_KEYS, State), State); @@ -1112,7 +1104,7 @@ fold_per_queue(F, Acc0, UAQ) -> start_limiter(State = #ch{unacked_message_q = UAMQ, parent_pid = ParentPid}) -> Me = self(), {ok, LPid} = - supervisor2:start_child( + supervisor:start_child( ParentPid, {limiter, {rabbit_limiter, start_link, [Me, queue:len(UAMQ)]}, transient, ?MAX_WAIT, worker, [rabbit_limiter]}), diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index 1d02d992d2..b565f23689 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -33,7 +33,7 @@ -behaviour(supervisor2). --export([start_link/8, writer/1, framing_channel/1, channel/1]). +-export([start_link/8]). -export([init/1]). @@ -47,7 +47,7 @@ (rabbit_types:protocol(), rabbit_net:socket(), rabbit_channel:channel_number(), non_neg_integer(), pid(), rabbit_access_control:username(), rabbit_types:vhost(), pid()) -> - ignore | rabbit_types:ok_or_error2(pid(), any())). + rabbit_types:ok({pid(), pid()})). -endif. @@ -55,32 +55,29 @@ start_link(Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost, Collector) -> - supervisor2:start_link(?MODULE, [Protocol, Sock, Channel, FrameMax, - ReaderPid, Username, VHost, Collector]). - -writer(Pid) -> - hd(supervisor2:find_child(Pid, writer)). - -channel(Pid) -> - hd(supervisor2:find_child(Pid, channel)). - -framing_channel(Pid) -> - hd(supervisor2:find_child(Pid, framing_channel)). + {ok, SupPid} = supervisor2:start_link(?MODULE, []), + {ok, WriterPid} = + supervisor2:start_child( + SupPid, + {writer, {rabbit_writer, start_link, + [Sock, Channel, FrameMax, Protocol]}, + permanent, ?MAX_WAIT, worker, [rabbit_writer]}), + {ok, ChannelPid} = + supervisor2:start_child( + SupPid, + {channel, {rabbit_channel, start_link, + [Channel, ReaderPid, WriterPid, Username, VHost, + Collector]}, + permanent, ?MAX_WAIT, worker, [rabbit_channel]}), + {ok, FramingChannelPid} = + supervisor2:start_child( + SupPid, + {framing_channel, {rabbit_framing_channel, start_link, + [ChannelPid, Protocol]}, + permanent, ?MAX_WAIT, worker, [rabbit_framing_channel]}), + {ok, {SupPid, FramingChannelPid}}. %%---------------------------------------------------------------------------- -init([Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost, - Collector]) -> - Me = self(), - {ok, {{one_for_all, 0, 1}, - [{framing_channel, {rabbit_framing_channel, start_link, - [fun () -> channel(Me) end, Protocol]}, - permanent, ?MAX_WAIT, worker, [rabbit_framing_channel]}, - {writer, {rabbit_writer, start_link, - [Sock, Channel, FrameMax, Protocol]}, - permanent, ?MAX_WAIT, worker, [rabbit_writer]}, - {channel, {rabbit_channel, start_link, - [Channel, fun () -> ReaderPid end, - fun () -> writer(Me) end, Username, VHost, Collector]}, - permanent, ?MAX_WAIT, worker, [rabbit_channel]} - ]}}. +init([]) -> + {ok, {{one_for_all, 0, 1}, []}}. diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl index 4ad9d3f04e..8d09961f89 100644 --- a/src/rabbit_connection_sup.erl +++ b/src/rabbit_connection_sup.erl @@ -33,28 +33,31 @@ -behaviour(supervisor2). --export([start_link/0, reader/1, channel_sup_sup/1]). +-export([start_link/0, reader/1]). -export([init/1]). -include("rabbit.hrl"). start_link() -> - supervisor2:start_link(?MODULE, []). + {ok, SupPid} = supervisor2:start_link(?MODULE, []), + {ok, ChannelSupSupPid} = + supervisor2:start_child( + SupPid, + {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []}, + permanent, infinity, supervisor, [rabbit_channel_sup_sup]}), + {ok, _ReaderPid} = + supervisor2:start_child( + SupPid, + {reader, {rabbit_reader, start_link, [ChannelSupSupPid]}, + permanent, ?MAX_WAIT, worker, [rabbit_reader]}), + {ok, SupPid}. init([]) -> {ok, {{one_for_all, 0, 1}, - [{channel_sup_sup, {rabbit_channel_sup_sup, start_link, []}, - permanent, infinity, supervisor, [rabbit_channel_sup_sup]}, - {reader, {rabbit_reader, start_link, []}, - permanent, ?MAX_WAIT, worker, [rabbit_reader]}, - {collector, {rabbit_queue_collector, start_link, []}, + [{collector, {rabbit_queue_collector, start_link, []}, permanent, ?MAX_WAIT, worker, [rabbit_queue_collector]} ]}}. reader(Pid) -> hd(supervisor2:find_child(Pid, reader)). - -channel_sup_sup(Pid) -> - hd(supervisor2:find_child(Pid, channel_sup_sup)). - diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl index 6ece343655..6ee5a5550b 100644 --- a/src/rabbit_framing_channel.erl +++ b/src/rabbit_framing_channel.erl @@ -39,9 +39,8 @@ %%-------------------------------------------------------------------- -start_link(GetChannelPid, Protocol) -> - {ok, proc_lib:spawn_link( - fun () -> mainloop(GetChannelPid(), Protocol) end)}. +start_link(ChannelPid, Protocol) -> + {ok, proc_lib:spawn_link(fun () -> mainloop(ChannelPid, Protocol) end)}. process(Pid, Frame) -> Pid ! {frame, Frame}, diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 24bde74d99..4b89db4783 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -33,11 +33,11 @@ -include("rabbit_framing.hrl"). -include("rabbit.hrl"). --export([start_link/0, info_keys/0, info/1, info/2, shutdown/2]). +-export([start_link/1, info_keys/0, info/1, info/2, shutdown/2]). -export([system_continue/3, system_terminate/4, system_code_change/4]). --export([init/1, mainloop/2]). +-export([init/2, mainloop/2]). -export([server_properties/0]). @@ -60,7 +60,7 @@ %--------------------------------------------------------------------------- -record(v1, {parent, sock, connection, callback, recv_ref, connection_state, - queue_collector, stats_timer}). + queue_collector, stats_timer, channel_sup_sup_pid}). -define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt, send_pend, state, channels]). @@ -144,6 +144,7 @@ -ifdef(use_specs). +-spec(start_link/1 :: (pid()) -> rabbit_types:ok(pid())). -spec(info_keys/0 :: () -> [rabbit_types:info_key()]). -spec(info/1 :: (pid()) -> [rabbit_types:info()]). -spec(info/2 :: (pid(), [rabbit_types:info_key()]) -> [rabbit_types:info()]). @@ -152,9 +153,9 @@ -spec(server_properties/0 :: () -> rabbit_framing:amqp_table()). %% These specs only exists to add no_return() to keep dialyzer happy --spec(init/1 :: (pid()) -> no_return()). --spec(start_connection/4 :: - (pid(), any(), rabbit_networking:socket(), +-spec(init/2 :: (pid(), pid()) -> no_return()). +-spec(start_connection/5 :: + (pid(), pid(), any(), rabbit_networking:socket(), fun ((rabbit_networking:socket()) -> rabbit_types:ok_or_error2( rabbit_networking:socket(), any()))) -> no_return()). @@ -163,17 +164,17 @@ %%-------------------------------------------------------------------------- -start_link() -> - {ok, proc_lib:spawn_link(?MODULE, init, [self()])}. +start_link(ChannelSupSupPid) -> + {ok, proc_lib:spawn_link(?MODULE, init, [self(), ChannelSupSupPid])}. shutdown(Pid, Explanation) -> gen_server:call(Pid, {shutdown, Explanation}, infinity). -init(Parent) -> +init(Parent, ChannelSupSupPid) -> Deb = sys:debug_options([]), receive {go, Sock, SockTransform} -> - start_connection(Parent, Deb, Sock, SockTransform) + start_connection(Parent, ChannelSupSupPid, Deb, Sock, SockTransform) end. system_continue(_Parent, Deb, State) -> @@ -248,7 +249,7 @@ socket_op(Sock, Fun) -> exit(shutdown) end. -start_connection(Parent, Deb, Sock, SockTransform) -> +start_connection(Parent, ChannelSupSupPid, Deb, Sock, SockTransform) -> process_flag(trap_exit, true), {PeerAddress, PeerPort} = socket_op(Sock, fun rabbit_net:peername/1), PeerAddressS = inet_parse:ntoa(PeerAddress), @@ -261,21 +262,23 @@ start_connection(Parent, Deb, Sock, SockTransform) -> [Collector] = supervisor2:find_child(Parent, collector), try mainloop(Deb, switch_callback( - #v1{parent = Parent, - sock = ClientSock, - connection = #connection{ - protocol = none, - user = none, - timeout_sec = ?HANDSHAKE_TIMEOUT, - frame_max = ?FRAME_MIN_SIZE, - vhost = none, - client_properties = none}, - callback = uninitialized_callback, - recv_ref = none, - connection_state = pre_init, - queue_collector = Collector, - stats_timer = - rabbit_event:init_stats_timer()}, + #v1{parent = Parent, + sock = ClientSock, + connection = #connection{ + protocol = none, + user = none, + timeout_sec = ?HANDSHAKE_TIMEOUT, + frame_max = ?FRAME_MIN_SIZE, + vhost = none, + client_properties = none}, + callback = uninitialized_callback, + recv_ref = none, + connection_state = pre_init, + queue_collector = Collector, + stats_timer = + rabbit_event:init_stats_timer(), + channel_sup_sup_pid = ChannelSupSupPid + }, handshake, 8)) catch Ex -> (if Ex == connection_closed_abruptly -> @@ -797,22 +800,21 @@ i(Item, #v1{}) -> %%-------------------------------------------------------------------------- -send_to_new_channel(Channel, AnalyzedFrame, State = - #v1{queue_collector = Collector, parent = Parent}) -> - #v1{sock = Sock, connection = #connection{ - protocol = Protocol, - frame_max = FrameMax, - user = #user{username = Username}, - vhost = VHost}} = State, - ChanSupSup = rabbit_connection_sup:channel_sup_sup(Parent), - {ok, ChanSup} = rabbit_channel_sup_sup:start_channel( - ChanSupSup, [Protocol, Sock, Channel, FrameMax, self(), - Username, VHost, Collector]), - ChPid = rabbit_channel_sup:framing_channel(ChanSup), - link(ChPid), - put({channel, Channel}, {chpid, ChPid}), - put({chpid, ChPid}, {channel, Channel}), - ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame). +send_to_new_channel(Channel, AnalyzedFrame, State) -> + #v1{sock = Sock, queue_collector = Collector, + channel_sup_sup_pid = ChanSupSup, + connection = #connection{protocol = Protocol, + frame_max = FrameMax, + user = #user{username = Username}, + vhost = VHost}} = State, + {ok, {_ChanSup, FrChPid}} = + rabbit_channel_sup_sup:start_channel( + ChanSupSup, [Protocol, Sock, Channel, FrameMax, + self(), Username, VHost, Collector]), + link(FrChPid), + put({channel, Channel}, {chpid, FrChPid}), + put({chpid, FrChPid}, {channel, Channel}), + ok = rabbit_framing_channel:process(FrChPid, AnalyzedFrame). log_channel_error(ConnectionState, Channel, Reason) -> rabbit_log:error("connection ~p (~p), channel ~p - error:~n~p~n", diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 79c8a31c43..59cfc06404 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -938,13 +938,10 @@ test_user_management() -> passed. -make_fun(Result) -> - fun () -> Result end. - test_server_status() -> %% create a few things so there is some useful information to list Writer = spawn(fun () -> receive shutdown -> ok end end), - {ok, Ch} = rabbit_channel:start_link(1, make_fun(self()), make_fun(Writer), + {ok, Ch} = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>, self()), [Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>], {new, Queue = #amqqueue{}} <- @@ -1083,7 +1080,7 @@ test_memory_pressure_spawn() -> test_spawn(Receiver) -> Me = self(), Writer = spawn(fun () -> Receiver(Me) end), - {ok, Ch} = rabbit_channel:start_link(1, make_fun(Me), make_fun(Writer), + {ok, Ch} = rabbit_channel:start_link(1, Me, Writer, <<"guest">>, <<"/">>, self()), ok = rabbit_channel:do(Ch, #'channel.open'{}), receive #'channel.open_ok'{} -> ok @@ -1157,7 +1154,7 @@ test_memory_pressure() -> alarm_handler:set_alarm({vm_memory_high_watermark, []}), Me = self(), Writer4 = spawn(fun () -> test_memory_pressure_receiver(Me) end), - {ok, Ch4} = rabbit_channel:start_link(1, make_fun(Me), make_fun(Writer4), + {ok, Ch4} = rabbit_channel:start_link(1, Me, Writer4, <<"user">>, <<"/">>, self()), ok = rabbit_channel:do(Ch4, #'channel.open'{}), ok = test_memory_pressure_flush(Writer4), |
