diff options
| author | Vlad Alexandru Ionescu <vlad@rabbitmq.com> | 2011-02-07 13:32:11 +0000 |
|---|---|---|
| committer | Vlad Alexandru Ionescu <vlad@rabbitmq.com> | 2011-02-07 13:32:11 +0000 |
| commit | d018108294d6519a501a973c45d34cc4c9a4623a (patch) | |
| tree | a53db5650c76ced96fe6bdc3a9a11b3346323907 | |
| parent | 8614e2ba638d2c3b8fa6c2e3d647458c40b12190 (diff) | |
| parent | bdaad0a3695966511c0f8e282705945ca7427647 (diff) | |
| download | rabbitmq-server-git-d018108294d6519a501a973c45d34cc4c9a4623a.tar.gz | |
merging in from default
| -rw-r--r-- | Makefile | 9 | ||||
| -rw-r--r-- | ebin/rabbit_app.in | 3 | ||||
| -rw-r--r-- | packaging/RPMS/Fedora/rabbitmq-server.spec | 6 | ||||
| -rw-r--r-- | packaging/debs/Debian/debian/changelog | 12 | ||||
| -rw-r--r-- | src/delegate.erl | 27 | ||||
| -rw-r--r-- | src/delegate_sup.erl | 2 | ||||
| -rw-r--r-- | src/rabbit.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 15 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 53 | ||||
| -rw-r--r-- | src/rabbit_channel_sup.erl | 20 | ||||
| -rw-r--r-- | src/rabbit_client_sup.erl (renamed from src/tcp_client_sup.erl) | 22 | ||||
| -rw-r--r-- | src/rabbit_direct.erl | 75 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 15 | ||||
| -rw-r--r-- | src/rabbit_networking.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 17 |
15 files changed, 228 insertions, 64 deletions
@@ -41,14 +41,12 @@ RABBIT_PLT=rabbit.plt ifndef USE_SPECS # our type specs rely on features and bug fixes in dialyzer that are -# only available in R14A upwards (R13B04 is erts 5.7.5) -# -# NB: the test assumes that version number will only contain single digits -USE_SPECS=$(shell if [ $$(erl -noshell -eval 'io:format(erlang:system_info(version)), halt().') \> "5.7.5" ]; then echo "true"; else echo "false"; fi) +# only available in R14A upwards (R14A is erts 5.8) +USE_SPECS:=$(shell erl -noshell -eval 'io:format([list_to_integer(X) || X <- string:tokens(erlang:system_info(version), ".")] >= [5,8]), halt().') endif #other args: +native +"{hipe,[o3,verbose]}" -Ddebug=true +debug_info +no_strict_record_tests -ERLC_OPTS=-I $(INCLUDE_DIR) -o $(EBIN_DIR) -Wall -v +debug_info $(shell [ $(USE_SPECS) = "true" ] && echo "-Duse_specs") +ERLC_OPTS=-I $(INCLUDE_DIR) -o $(EBIN_DIR) -Wall -v +debug_info $(if $(filter true,$(USE_SPECS)),-Duse_specs) VERSION=0.0.0 TARBALL_NAME=rabbitmq-server-$(VERSION) @@ -314,3 +312,4 @@ endif ifneq "$(strip $(patsubst clean%,,$(patsubst %clean,,$(TESTABLEGOALS))))" "" -include $(DEPS_FILE) endif + diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index d3e9d84b4b..cc7221d63d 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -8,7 +8,8 @@ rabbit_node_monitor, rabbit_router, rabbit_sup, - rabbit_tcp_client_sup]}, + rabbit_tcp_client_sup, + rabbit_direct_client_sup]}, {applications, [kernel, stdlib, sasl, mnesia, os_mon]}, %% we also depend on crypto, public_key and ssl but they shouldn't be %% in here as we don't actually want to start it diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index b37f7ab1fa..473168644a 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -124,6 +124,12 @@ done rm -rf %{buildroot} %changelog +* Thu Feb 3 2011 simon@rabbitmq.com 2.3.1-1 +- New Upstream Release + +* Tue Feb 1 2011 simon@rabbitmq.com 2.3.0-1 +- New Upstream Release + * Mon Nov 29 2010 rob@rabbitmq.com 2.2.0-1 - New Upstream Release diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog index a60e691d15..12165dc0ac 100644 --- a/packaging/debs/Debian/debian/changelog +++ b/packaging/debs/Debian/debian/changelog @@ -1,3 +1,15 @@ +rabbitmq-server (2.3.1-1) lucid; urgency=low + + * New Upstream Release + + -- Simon MacMullen <simon@rabbitmq.com> Thu, 03 Feb 2011 12:43:56 +0000 + +rabbitmq-server (2.3.0-1) lucid; urgency=low + + * New Upstream Release + + -- Simon MacMullen <simon@rabbitmq.com> Tue, 01 Feb 2011 12:52:16 +0000 + rabbitmq-server (2.2.0-1) lucid; urgency=low * New Upstream Release diff --git a/src/delegate.erl b/src/delegate.erl index ff55a15b40..46bd8245b3 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -18,7 +18,7 @@ -behaviour(gen_server2). --export([start_link/1, invoke_no_result/2, invoke/2, delegate_count/0]). +-export([start_link/1, invoke_no_result/2, invoke/2, delegate_count/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -36,7 +36,7 @@ ([pid()], fun ((pid()) -> A)) -> {[{pid(), A}], [{pid(), term()}]}). --spec(delegate_count/0 :: () -> non_neg_integer()). +-spec(delegate_count/1 :: ([node()]) -> non_neg_integer()). -endif. @@ -68,9 +68,9 @@ invoke(Pids, Fun) when is_list(Pids) -> {Replies, BadNodes} = case orddict:fetch_keys(Grouped) of [] -> {[], []}; - RemoteNodes -> gen_server2:multi_call(RemoteNodes, delegate(), - {invoke, Fun, Grouped}, - infinity) + RemoteNodes -> gen_server2:multi_call( + RemoteNodes, delegate(RemoteNodes), + {invoke, Fun, Grouped}, infinity) end, BadPids = [{Pid, {exit, {nodedown, BadNode}, []}} || BadNode <- BadNodes, @@ -92,7 +92,7 @@ invoke_no_result(Pids, Fun) when is_list(Pids) -> {LocalPids, Grouped} = group_pids_by_node(Pids), case orddict:fetch_keys(Grouped) of [] -> ok; - RemoteNodes -> gen_server2:abcast(RemoteNodes, delegate(), + RemoteNodes -> gen_server2:abcast(RemoteNodes, delegate(RemoteNodes), {invoke, Fun, Grouped}) end, safe_invoke(LocalPids, Fun), %% must not die @@ -111,17 +111,22 @@ group_pids_by_node(Pids) -> node(Pid), fun (List) -> [Pid | List] end, [Pid], Remote)} end, {[], orddict:new()}, Pids). -delegate_count() -> - {ok, Count} = application:get_env(rabbit, delegate_count), +delegate_count([RemoteNode | _]) -> + {ok, Count} = case application:get_env(rabbit, delegate_count) of + undefined -> rpc:call(RemoteNode, application, get_env, + [rabbit, delegate_count]); + Result -> Result + end, Count. delegate_name(Hash) -> list_to_atom("delegate_" ++ integer_to_list(Hash)). -delegate() -> +delegate(RemoteNodes) -> case get(delegate) of - undefined -> Name = delegate_name( - erlang:phash2(self(), delegate_count())), + undefined -> Name = + delegate_name(erlang:phash2( + self(), delegate_count(RemoteNodes))), put(delegate, Name), Name; Name -> Name diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl index 5274722145..e0ffa7c8c1 100644 --- a/src/delegate_sup.erl +++ b/src/delegate_sup.erl @@ -40,7 +40,7 @@ start_link() -> %%---------------------------------------------------------------------------- init(_Args) -> - DCount = delegate:delegate_count(), + DCount = delegate:delegate_count([node()]), {ok, {{one_for_one, 10, 10}, [{Num, {delegate, start_link, [Num]}, transient, 16#ffffffff, worker, [delegate]} || diff --git a/src/rabbit.erl b/src/rabbit.erl index b041a6372c..c6661d390a 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -145,13 +145,13 @@ {requires, routing_ready}, {enables, networking}]}). +-rabbit_boot_step({direct_client, + [{mfa, {rabbit_direct, boot, []}}, + {requires, log_relay}]}). + -rabbit_boot_step({networking, [{mfa, {rabbit_networking, boot, []}}, - {requires, log_relay}, - {enables, networking_listening}]}). - --rabbit_boot_step({networking_listening, - [{description, "network listeners available"}]}). + {requires, log_relay}]}). %%--------------------------------------------------------------------------- diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 3418c663f4..7c7e28fe9e 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -617,6 +617,10 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). qname(#q{q = #amqqueue{name = QName}}) -> QName. +backing_queue_idle_timeout(State = #q{backing_queue = BQ}) -> + maybe_run_queue_via_backing_queue( + fun (BQS) -> {[], BQ:idle_timeout(BQS)} end, State). + maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> {Guids, BQS1} = Fun(BQS), run_message_queue( @@ -996,10 +1000,8 @@ handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) -> handle_cast({maybe_run_queue_via_backing_queue, Fun}, State) -> noreply(maybe_run_queue_via_backing_queue(Fun, State)); -handle_cast(sync_timeout, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - noreply(State#q{backing_queue_state = BQ:idle_timeout(BQS), - sync_timer_ref = undefined}); +handle_cast(sync_timeout, State) -> + noreply(backing_queue_idle_timeout(State#q{sync_timer_ref = undefined})); handle_cast({deliver, Delivery}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. @@ -1133,9 +1135,8 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> {stop, NewState} -> {stop, normal, NewState} end; -handle_info(timeout, State = #q{backing_queue = BQ}) -> - noreply(maybe_run_queue_via_backing_queue( - fun (BQS) -> {[], BQ:idle_timeout(BQS)} end, State)); +handle_info(timeout, State) -> + noreply(backing_queue_idle_timeout(State)); handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index cc3cfbf4f4..a82e5eff3e 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -277,7 +277,7 @@ handle_cast({confirm, MsgSeqNos, From}, State) -> handle_info(timeout, State) -> noreply(State); -handle_info({'DOWN', _MRef, process, QPid, _Reason}, +handle_info({'DOWN', _MRef, process, QPid, Reason}, State = #ch{unconfirmed = UC}) -> %% TODO: this does a complete scan and partial rebuild of the %% tree, which is quite efficient. To do better we'd need to @@ -286,8 +286,11 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason}, gb_trees:next(gb_trees:iterator(UC)), QPid, {[], UC}, State), erase_queue_stats(QPid), - noreply( - queue_blocked(QPid, record_confirms(MXs, State#ch{unconfirmed = UC1}))). + State1 = case Reason of + normal -> record_confirms(MXs, State#ch{unconfirmed = UC1}); + _ -> send_nacks(MXs, State#ch{unconfirmed = UC1}) + end, + noreply(queue_blocked(QPid, State1)). handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> ok = clear_permission_cache(), @@ -505,6 +508,8 @@ confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) -> remove_qmsg(MsgSeqNo, QPid, {XName, Qs}, {MXs, UC}, State) -> Qs1 = sets:del_element(QPid, Qs), + %% these confirms will be emitted even when a queue dies, but that + %% should be fine, since the queue stats get erased immediately maybe_incr_stats([{{QPid, XName}, 1}], confirm, State), case sets:size(Qs1) of 0 -> {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UC)}; @@ -735,7 +740,8 @@ handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, {reply, #'basic.qos_ok'{}, State#ch{limiter_pid = LimiterPid2}}; handle_method(#'basic.recover_async'{requeue = true}, - _, State = #ch{unacked_message_q = UAMQ}) -> + _, State = #ch{unacked_message_q = UAMQ, + limiter_pid = LimiterPid}) -> OkFun = fun () -> ok end, ok = fold_per_queue( fun (QPid, MsgIds, ok) -> @@ -749,6 +755,7 @@ handle_method(#'basic.recover_async'{requeue = true}, QPid, lists:reverse(MsgIds), self()) end) end, ok, UAMQ), + ok = notify_limiter(LimiterPid, UAMQ), %% No answer required - basic.recover is the newer, synchronous %% variant of this method {noreply, State#ch{unacked_message_q = queue:new()}}; @@ -1253,6 +1260,16 @@ lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> lock_message(false, _MsgStruct, State) -> State. +send_nacks([], State) -> + State; +send_nacks(MXs, State) -> + MsgSeqNos = [ MsgSeqNo || {MsgSeqNo, _} <- MXs ], + coalesce_and_send(MsgSeqNos, + fun(MsgSeqNo, Multiple) -> + #'basic.nack'{delivery_tag = MsgSeqNo, + multiple = Multiple} + end, State). + send_confirms(State = #ch{confirmed = C}) -> C1 = lists:append(C), MsgSeqNos = [ begin maybe_incr_stats([{ExchangeName, 1}], confirm, State), @@ -1262,28 +1279,32 @@ send_confirms(State = #ch{confirmed = C}) -> send_confirms([], State) -> State; send_confirms([MsgSeqNo], State = #ch{writer_pid = WriterPid}) -> - send_confirm(MsgSeqNo, WriterPid), + ok = rabbit_writer:send_command(WriterPid, + #'basic.ack'{delivery_tag = MsgSeqNo}), State; -send_confirms(Cs, State = #ch{writer_pid = WriterPid, unconfirmed = UC}) -> - SCs = lists:usort(Cs), +send_confirms(Cs, State) -> + coalesce_and_send(Cs, fun(MsgSeqNo, Multiple) -> + #'basic.ack'{delivery_tag = MsgSeqNo, + multiple = Multiple} + end, State). + +coalesce_and_send(MsgSeqNos, MkMsgFun, + State = #ch{writer_pid = WriterPid, unconfirmed = UC}) -> + SMsgSeqNos = lists:usort(MsgSeqNos), CutOff = case gb_trees:is_empty(UC) of - true -> lists:last(SCs) + 1; + true -> lists:last(SMsgSeqNos) + 1; false -> {SeqNo, _XQ} = gb_trees:smallest(UC), SeqNo end, - {Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SCs), + {Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SMsgSeqNos), case Ms of [] -> ok; _ -> ok = rabbit_writer:send_command( - WriterPid, #'basic.ack'{delivery_tag = lists:last(Ms), - multiple = true}) + WriterPid, MkMsgFun(lists:last(Ms), true)) end, - [ok = send_confirm(SeqNo, WriterPid) || SeqNo <- Ss], + [ok = rabbit_writer:send_command( + WriterPid, MkMsgFun(SeqNo, false)) || SeqNo <- Ss], State. -send_confirm(SeqNo, WriterPid) -> - ok = rabbit_writer:send_command(WriterPid, - #'basic.ack'{delivery_tag = SeqNo}). - terminate(_State) -> pg_local:leave(rabbit_channels, self()), rabbit_event:notify(channel_closed, [{pid, self()}]). diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index d426d55df5..d21cfdb7fb 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -31,9 +31,11 @@ -export_type([start_link_args/0]). -type(start_link_args() :: - {rabbit_types:protocol(), rabbit_net:socket(), + {'tcp', rabbit_types:protocol(), rabbit_net:socket(), rabbit_channel:channel_number(), non_neg_integer(), pid(), - rabbit_types:user(), rabbit_types:vhost(), pid()}). + rabbit_types:user(), rabbit_types:vhost(), pid()} | + {'direct', rabbit_channel:channel_number(), pid(), rabbit_types:user(), + rabbit_types:vhost(), pid()}). -spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), {pid(), any()}}). @@ -41,7 +43,7 @@ %%---------------------------------------------------------------------------- -start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost, +start_link({tcp, Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost, Collector}) -> {ok, SupPid} = supervisor2:start_link(?MODULE, []), {ok, WriterPid} = @@ -58,7 +60,17 @@ start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost, Collector, start_limiter_fun(SupPid)]}, intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), {ok, AState} = rabbit_command_assembler:init(Protocol), - {ok, SupPid, {ChannelPid, AState}}. + {ok, SupPid, {ChannelPid, AState}}; +start_link({direct, Channel, ClientChannelPid, User, VHost, Collector}) -> + {ok, SupPid} = supervisor2:start_link(?MODULE, []), + {ok, ChannelPid} = + supervisor2:start_child( + SupPid, + {channel, {rabbit_channel, start_link, + [Channel, ClientChannelPid, ClientChannelPid, + User, VHost, Collector, start_limiter_fun(SupPid)]}, + intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), + {ok, SupPid, {ChannelPid, none}}. %%---------------------------------------------------------------------------- diff --git a/src/tcp_client_sup.erl b/src/rabbit_client_sup.erl index 1c2bbb6548..dbdc6cd429 100644 --- a/src/tcp_client_sup.erl +++ b/src/rabbit_client_sup.erl @@ -14,7 +14,7 @@ %% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. %% --module(tcp_client_sup). +-module(rabbit_client_sup). -behaviour(supervisor2). @@ -22,6 +22,21 @@ -export([init/1]). +-include("rabbit.hrl"). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/1 :: (mfa()) -> + rabbit_types:ok_pid_or_error()). +-spec(start_link/2 :: ({'local', atom()}, mfa()) -> + rabbit_types:ok_pid_or_error()). + +-endif. + +%%---------------------------------------------------------------------------- + start_link(Callback) -> supervisor2:start_link(?MODULE, Callback). @@ -29,6 +44,5 @@ start_link(SupName, Callback) -> supervisor2:start_link(SupName, ?MODULE, Callback). init({M,F,A}) -> - {ok, {{simple_one_for_one_terminate, 10, 10}, - [{tcp_client, {M,F,A}, - temporary, infinity, supervisor, [M]}]}}. + {ok, {{simple_one_for_one_terminate, 0, 1}, + [{client, {M,F,A}, temporary, infinity, supervisor, [M]}]}}. diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl new file mode 100644 index 0000000000..3b8c9fba39 --- /dev/null +++ b/src/rabbit_direct.erl @@ -0,0 +1,75 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% + +-module(rabbit_direct). + +-export([boot/0, connect/3, start_channel/5]). + +-include("rabbit.hrl"). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(boot/0 :: () -> 'ok'). +-spec(connect/3 :: (binary(), binary(), binary()) -> + {'ok', {rabbit_types:user(), + rabbit_framing:amqp_table()}}). +-spec(start_channel/5 :: (rabbit_channel:channel_number(), pid(), + rabbit_types:user(), rabbit_types:vhost(), pid()) -> + {'ok', pid()}). + +-endif. + +%%---------------------------------------------------------------------------- + +boot() -> + {ok, _} = + supervisor2:start_child( + rabbit_sup, + {rabbit_direct_client_sup, + {rabbit_client_sup, start_link, + [{local, rabbit_direct_client_sup}, + {rabbit_channel_sup, start_link, []}]}, + transient, infinity, supervisor, [rabbit_client_sup]}), + ok. + +%%---------------------------------------------------------------------------- + +connect(Username, Password, VHost) -> + case lists:keymember(rabbit, 1, application:which_applications()) of + true -> + try rabbit_access_control:user_pass_login(Username, Password) of + #user{} = User -> + try rabbit_access_control:check_vhost_access(User, VHost) of + ok -> {ok, {User, rabbit_reader:server_properties()}} + catch + exit:#amqp_error{name = access_refused} -> + {error, access_refused} + end + catch + exit:#amqp_error{name = access_refused} -> {error, auth_failure} + end; + false -> + {error, broker_not_found_on_node} + end. + +start_channel(Number, ClientChannelPid, User, VHost, Collector) -> + {ok, _, {ChannelPid, _}} = + supervisor2:start_child( + rabbit_direct_client_sup, + [{direct, Number, ClientChannelPid, User, VHost, Collector}]), + {ok, ChannelPid}. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 3a4fb024fe..7d9167973d 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -240,11 +240,20 @@ assert_args_equivalence1(Orig, New, Name, Key) -> {Same, Same} -> ok; {Orig1, New1} -> protocol_error( precondition_failed, - "inequivalent arg '~s' for ~s: " - "required ~w, received ~w", - [Key, rabbit_misc:rs(Name), New1, Orig1]) + "inequivalent arg '~s' for ~s: " + "received ~s but current is ~s", + [Key, rs(Name), val(New1), val(Orig1)]) end. +val(undefined) -> + "none"; +val({Type, Value}) -> + Fmt = case is_binary(Value) of + true -> "the value '~s' of type '~s'"; + false -> "the value '~w' of type '~s'" + end, + lists:flatten(io_lib:format(Fmt, [Value, Type])). + dirty_read(ReadSpec) -> case mnesia:dirty_read(ReadSpec) of [Result] -> {ok, Result}; diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 031d4f181d..283d25c716 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -115,13 +115,13 @@ boot_ssl() -> end. start() -> - {ok,_} = supervisor:start_child( + {ok,_} = supervisor2:start_child( rabbit_sup, {rabbit_tcp_client_sup, - {tcp_client_sup, start_link, + {rabbit_client_sup, start_link, [{local, rabbit_tcp_client_sup}, {rabbit_connection_sup,start_link,[]}]}, - transient, infinity, supervisor, [tcp_client_sup]}), + transient, infinity, supervisor, [rabbit_client_sup]}), ok. %% inet_parse:address takes care of ip string, like "0.0.0.0" diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 52e0bb9dd5..4bb87f19d5 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -72,7 +72,13 @@ %% pre-init: %% receive protocol header -> send connection.start, *starting* %% starting: -%% receive connection.start_ok -> send connection.tune, *tuning* +%% receive connection.start_ok -> *securing* +%% securing: +%% check authentication credentials +%% if authentication success -> send connection.tune, *tuning* +%% if more challenge needed -> send connection.secure, +%% receive connection.secure_ok *securing* +%% otherwise send close, *exit* %% tuning: %% receive connection.tune_ok -> start heartbeats, *opening* %% opening: @@ -351,7 +357,10 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) -> throw({handshake_timeout, State#v1.callback}) end; timeout -> - throw({timeout, State#v1.connection_state}); + case State#v1.connection_state of + closed -> mainloop(Deb, State); + S -> throw({timeout, S}) + end; {'$gen_call', From, {shutdown, Explanation}} -> {ForceTermination, NewState} = terminate(Explanation, State), gen_server:reply(From, ok), @@ -940,8 +949,8 @@ send_to_new_channel(Channel, AnalyzedFrame, State) -> vhost = VHost}} = State, {ok, _ChSupPid, {ChPid, AState}} = rabbit_channel_sup_sup:start_channel( - ChanSupSup, {Protocol, Sock, Channel, FrameMax, - self(), User, VHost, Collector}), + ChanSupSup, {tcp, Protocol, Sock, Channel, FrameMax, self(), User, + VHost, Collector}), erlang:monitor(process, ChPid), NewAState = process_channel_frame(AnalyzedFrame, self(), Channel, ChPid, AState), |
