diff options
| author | Vlad Alexandru Ionescu <vlad@rabbitmq.com> | 2011-01-19 21:27:03 +0000 |
|---|---|---|
| committer | Vlad Alexandru Ionescu <vlad@rabbitmq.com> | 2011-01-19 21:27:03 +0000 |
| commit | bd66179e0db81bb0703e521400470df89f6647d9 (patch) | |
| tree | b775bd82609a2f2774c85d138d08d76e752803e9 /src | |
| parent | fbd631733f2e25c8842292e48d956f09ca8040e1 (diff) | |
| parent | 23d7a9e6877fdd22337fd3ddbd333ac607788a9d (diff) | |
| download | rabbitmq-server-git-bd66179e0db81bb0703e521400470df89f6647d9.tar.gz | |
merging in from default
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_channel_sup.erl | 21 | ||||
| -rw-r--r-- | src/rabbit_channel_sup_sup.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_client_sup.erl (renamed from src/tcp_client_sup.erl) | 7 | ||||
| -rw-r--r-- | src/rabbit_direct.erl | 52 | ||||
| -rw-r--r-- | src/rabbit_networking.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 4 |
7 files changed, 90 insertions, 19 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index b041a6372c..9de6e7941d 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -145,11 +145,19 @@ {requires, routing_ready}, {enables, networking}]}). +-rabbit_boot_step({direct_client, + [{mfa, {rabbit_direct, boot, []}}, + {requires, log_relay}, + {enables, direct_listening}]}). + -rabbit_boot_step({networking, [{mfa, {rabbit_networking, boot, []}}, {requires, log_relay}, {enables, networking_listening}]}). +-rabbit_boot_step({direct_listening, + [{description, "direct connection listeners available"}]}). + -rabbit_boot_step({networking_listening, [{description, "network listeners available"}]}). diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index d426d55df5..3b00181bc6 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -18,7 +18,7 @@ -behaviour(supervisor2). --export([start_link/1]). +-export([start_link/2]). -export([init/1]). @@ -35,14 +35,15 @@ rabbit_channel:channel_number(), non_neg_integer(), pid(), rabbit_types:user(), rabbit_types:vhost(), pid()}). --spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), {pid(), any()}}). +-spec(start_link/2 :: (atom(), start_link_args()) -> + {'ok', pid(), {pid(), any()}}). -endif. %%---------------------------------------------------------------------------- -start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost, - Collector}) -> +start_link(tcp, {Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost, + Collector}) -> {ok, SupPid} = supervisor2:start_link(?MODULE, []), {ok, WriterPid} = supervisor2:start_child( @@ -58,7 +59,17 @@ start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost, Collector, start_limiter_fun(SupPid)]}, intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), {ok, AState} = rabbit_command_assembler:init(Protocol), - {ok, SupPid, {ChannelPid, AState}}. + {ok, SupPid, {ChannelPid, AState}}; +start_link(direct, {Channel, ClientChannelPid, User, VHost, Collector}) -> + {ok, SupPid} = supervisor2:start_link(?MODULE, []), + {ok, ChannelPid} = + supervisor2:start_child( + SupPid, + {channel, {rabbit_channel, start_link, + [Channel, ClientChannelPid, ClientChannelPid, + User, VHost, Collector, start_limiter_fun(SupPid)]}, + intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), + {ok, SupPid, {ChannelPid, none}}. %%---------------------------------------------------------------------------- diff --git a/src/rabbit_channel_sup_sup.erl b/src/rabbit_channel_sup_sup.erl index e2561c802e..9240ee3713 100644 --- a/src/rabbit_channel_sup_sup.erl +++ b/src/rabbit_channel_sup_sup.erl @@ -18,7 +18,7 @@ -behaviour(supervisor2). --export([start_link/0, start_channel/2]). +-export([start_link/0, start_channel/3]). -export([init/1]). @@ -27,8 +27,9 @@ -ifdef(use_specs). -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). --spec(start_channel/2 :: (pid(), rabbit_channel_sup:start_link_args()) -> - {'ok', pid(), {pid(), any()}}). +-spec(start_channel/3 :: (pid(), atom(), + rabbit_channel_sup:start_link_args()) -> + {'ok', pid(), {pid(), any()}}). -endif. @@ -37,8 +38,8 @@ start_link() -> supervisor2:start_link(?MODULE, []). -start_channel(Pid, Args) -> - supervisor2:start_child(Pid, [Args]). +start_channel(Pid, Type, Args) -> + supervisor2:start_child(Pid, [Type, Args]). %%---------------------------------------------------------------------------- diff --git a/src/tcp_client_sup.erl b/src/rabbit_client_sup.erl index 1c2bbb6548..336a8cd78a 100644 --- a/src/tcp_client_sup.erl +++ b/src/rabbit_client_sup.erl @@ -14,7 +14,7 @@ %% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. %% --module(tcp_client_sup). +-module(rabbit_client_sup). -behaviour(supervisor2). @@ -29,6 +29,5 @@ start_link(SupName, Callback) -> supervisor2:start_link(SupName, ?MODULE, Callback). init({M,F,A}) -> - {ok, {{simple_one_for_one_terminate, 10, 10}, - [{tcp_client, {M,F,A}, - temporary, infinity, supervisor, [M]}]}}. + {ok, {{simple_one_for_one_terminate, 0, 1}, + [{client, {M,F,A}, temporary, infinity, supervisor, [M]}]}}. diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl new file mode 100644 index 0000000000..b1c5f41597 --- /dev/null +++ b/src/rabbit_direct.erl @@ -0,0 +1,52 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_direct). + +-export([boot/0, start_channel/1]). + +%%---------------------------------------------------------------------------- + +boot() -> + {ok, _} = + supervisor2:start_child( + rabbit_sup, + {rabbit_direct_client_sup, + {rabbit_client_sup, start_link, + [{local, rabbit_direct_client_sup}, + {rabbit_channel_sup, start_link, [direct]}]}, + transient, infinity, supervisor, [rabbit_client_sup]}), + ok. + +start_channel(Args) -> + {ok, _, {ChannelPid, _}} = + supervisor2:start_child(rabbit_direct_client_sup, [Args]), + {ok, ChannelPid}. diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 9788c922bf..0a7d9dd7eb 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -108,13 +108,13 @@ boot_ssl() -> end. start() -> - {ok,_} = supervisor:start_child( + {ok,_} = supervisor2:start_child( rabbit_sup, {rabbit_tcp_client_sup, - {tcp_client_sup, start_link, + {rabbit_client_sup, start_link, [{local, rabbit_tcp_client_sup}, {rabbit_connection_sup,start_link,[]}]}, - transient, infinity, supervisor, [tcp_client_sup]}), + transient, infinity, supervisor, [rabbit_client_sup]}), ok. getaddr(Host) -> diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 696dc26588..c2245a13d5 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -940,8 +940,8 @@ send_to_new_channel(Channel, AnalyzedFrame, State) -> vhost = VHost}} = State, {ok, _ChSupPid, {ChPid, AState}} = rabbit_channel_sup_sup:start_channel( - ChanSupSup, {Protocol, Sock, Channel, FrameMax, - self(), User, VHost, Collector}), + ChanSupSup, tcp, {Protocol, Sock, Channel, FrameMax, + self(), User, VHost, Collector}), erlang:monitor(process, ChPid), NewAState = process_channel_frame(AnalyzedFrame, self(), Channel, ChPid, AState), |
