summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_connection_sup.erl7
-rw-r--r--src/rabbit_networking.erl23
-rw-r--r--src/rabbit_reader.erl75
3 files changed, 74 insertions, 31 deletions
diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl
index 12a532b6fc..d9a4735cf0 100644
--- a/src/rabbit_connection_sup.erl
+++ b/src/rabbit_connection_sup.erl
@@ -42,16 +42,11 @@ start_link() ->
SupPid,
{collector, {rabbit_queue_collector, start_link, []},
intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}),
- {ok, ChannelSupSupPid} =
- supervisor2:start_child(
- SupPid,
- {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []},
- intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}),
{ok, ReaderPid} =
supervisor2:start_child(
SupPid,
{reader, {rabbit_reader, start_link,
- [ChannelSupSupPid, Collector,
+ [SupPid, Collector,
rabbit_heartbeat:start_heartbeat_fun(SupPid)]},
intrinsic, ?MAX_WAIT, worker, [rabbit_reader]}),
{ok, SupPid, ReaderPid}.
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 31eeef730c..ee430fb407 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -18,7 +18,8 @@
-export([boot/0, start/0, start_tcp_listener/1, start_ssl_listener/2,
stop_tcp_listener/1, on_node_down/1, active_listeners/0,
- node_listeners/1, connections/0, connection_info_keys/0,
+ node_listeners/1, register_connection/1, unregister_connection/1,
+ connections/0, connection_info_keys/0,
connection_info/1, connection_info/2,
connection_info_all/0, connection_info_all/1,
close_connection/2, force_connection_event_refresh/0, tcp_host/1]).
@@ -40,6 +41,8 @@
-define(FIRST_TEST_BIND_PORT, 10000).
+-define(CONNECTION_TABLE, rabbit_connection).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -65,6 +68,8 @@
-spec(stop_tcp_listener/1 :: (listener_config()) -> 'ok').
-spec(active_listeners/0 :: () -> [rabbit_types:listener()]).
-spec(node_listeners/1 :: (node()) -> [rabbit_types:listener()]).
+-spec(register_connection/1 :: (pid()) -> ok).
+-spec(unregister_connection/1 :: (pid()) -> ok).
-spec(connections/0 :: () -> [rabbit_types:connection()]).
-spec(connections_local/0 :: () -> [rabbit_types:connection()]).
-spec(connection_info_keys/0 :: () -> rabbit_types:info_keys()).
@@ -117,6 +122,7 @@
%%----------------------------------------------------------------------------
boot() ->
+ ets:new(?CONNECTION_TABLE, [public, named_table]),
ok = start(),
ok = boot_tcp(),
ok = boot_ssl().
@@ -294,20 +300,15 @@ start_client(Sock) ->
start_ssl_client(SslOpts, Sock) ->
start_client(Sock, ssl_transform_fun(SslOpts)).
+register_connection(Pid) -> ets:insert(?CONNECTION_TABLE, {Pid}), ok.
+
+unregister_connection(Pid) -> ets:delete(?CONNECTION_TABLE, Pid), ok.
+
connections() ->
rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:cluster_nodes(running),
rabbit_networking, connections_local, []).
-connections_local() ->
- [Reader ||
- {_, ConnSup, supervisor, _}
- <- supervisor:which_children(rabbit_tcp_client_sup),
- Reader <- [try
- rabbit_connection_sup:reader(ConnSup)
- catch exit:{noproc, _} ->
- noproc
- end],
- Reader =/= noproc].
+connections_local() -> [P || {P} <- ets:tab2list(?CONNECTION_TABLE)].
connection_info_keys() -> rabbit_reader:info_keys().
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 7a28c8a33a..13459350cf 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -23,7 +23,7 @@
-export([system_continue/3, system_terminate/4, system_code_change/4]).
--export([init/4, mainloop/2]).
+-export([init/4, mainloop/2, recvloop/2]).
-export([conserve_resources/3, server_properties/1]).
@@ -37,7 +37,8 @@
-record(v1, {parent, sock, connection, callback, recv_len, pending_recv,
connection_state, queue_collector, heartbeater, stats_timer,
- channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len, throttle}).
+ conn_sup_pid, channel_sup_sup_pid, start_heartbeat_fun,
+ buf, buf_len, throttle}).
-record(connection, {name, host, peer_host, port, peer_port,
protocol, user, timeout_sec, frame_max, vhost,
@@ -109,12 +110,12 @@ start_link(ChannelSupSupPid, Collector, StartHeartbeatFun) ->
shutdown(Pid, Explanation) ->
gen_server:call(Pid, {shutdown, Explanation}, infinity).
-init(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun) ->
+init(Parent, ConnSupPid, Collector, StartHeartbeatFun) ->
Deb = sys:debug_options([]),
receive
{go, Sock, SockTransform} ->
start_connection(
- Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, Sock,
+ Parent, ConnSupPid, Collector, StartHeartbeatFun, Deb, Sock,
SockTransform)
end.
@@ -203,7 +204,7 @@ name(Sock) ->
socket_ends(Sock) ->
socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end).
-start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
+start_connection(Parent, ConnSupPid, Collector, StartHeartbeatFun, Deb,
Sock, SockTransform) ->
process_flag(trap_exit, true),
Name = name(Sock),
@@ -234,7 +235,8 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
connection_state = pre_init,
queue_collector = Collector,
heartbeater = none,
- channel_sup_sup_pid = ChannelSupSupPid,
+ conn_sup_pid = ConnSupPid,
+ channel_sup_sup_pid = none,
start_heartbeat_fun = StartHeartbeatFun,
buf = [],
buf_len = 0,
@@ -244,9 +246,10 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
last_blocked_at = never}},
try
ok = inet_op(fun () -> rabbit_net:tune_buffer_size(ClientSock) end),
- recvloop(Deb, switch_callback(rabbit_event:init_stats_timer(
- State, #v1.stats_timer),
- handshake, 8)),
+ run({?MODULE, recvloop,
+ [Deb, switch_callback(rabbit_event:init_stats_timer(
+ State, #v1.stats_timer),
+ handshake, 8)]}),
log(info, "closing AMQP connection ~p (~s)~n", [self(), Name])
catch
Ex -> log(case Ex of
@@ -263,10 +266,16 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
%% accounting as accurate as possible we ought to close the
%% socket w/o delay before termination.
rabbit_net:fast_close(ClientSock),
+ rabbit_networking:unregister_connection(self()),
rabbit_event:notify(connection_closed, [{pid, self()}])
end,
done.
+run({M, F, A}) ->
+ try apply(M, F, A)
+ catch {become, MFA} -> run(MFA)
+ end.
+
recvloop(Deb, State = #v1{pending_recv = true}) ->
mainloop(Deb, State);
recvloop(Deb, State = #v1{connection_state = blocked}) ->
@@ -689,8 +698,17 @@ handle_input(handshake, <<"AMQP", 1, 1, 8, 0>>, State) ->
handle_input(handshake, <<"AMQP", 1, 1, 9, 1>>, State) ->
start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, State);
+%% ... and finally, the 1.0 spec is crystal clear! Note that the
+%% TLS uses a different protocol number, and would go here.
+handle_input(handshake, <<"AMQP", 0, 1, 0, 0>>, State) ->
+ become_1_0(amqp, {0, 1, 0, 0}, State);
+
+%% 3 stands for "SASL"
+handle_input(handshake, <<"AMQP", 3, 1, 0, 0>>, State) ->
+ become_1_0(sasl, {3, 1, 0, 0}, State);
+
handle_input(handshake, <<"AMQP", A, B, C, D>>, #v1{sock = Sock}) ->
- refuse_connection(Sock, {bad_version, A, B, C, D});
+ refuse_connection(Sock, {bad_version, {A, B, C, D}});
handle_input(handshake, Other, #v1{sock = Sock}) ->
refuse_connection(Sock, {bad_header, Other});
@@ -704,6 +722,7 @@ handle_input(Callback, Data, _State) ->
start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision},
Protocol,
State = #v1{sock = Sock, connection = Connection}) ->
+ rabbit_networking:register_connection(self()),
Start = #'connection.start'{
version_major = ProtocolMajor,
version_minor = ProtocolMinor,
@@ -799,17 +818,24 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
connection = Connection = #connection{
user = User,
protocol = Protocol},
+ conn_sup_pid = ConnSupPid,
sock = Sock,
throttle = Throttle}) ->
ok = rabbit_access_control:check_vhost_access(User, VHostPath),
NewConnection = Connection#connection{vhost = VHostPath},
ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
+ Throttle1 = Throttle#throttle{conserve_resources = Conserve},
+ {ok, ChannelSupSupPid} =
+ supervisor2:start_child(
+ ConnSupPid,
+ {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []},
+ intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}),
State1 = control_throttle(
- State#v1{connection_state = running,
- connection = NewConnection,
- throttle = Throttle#throttle{
- conserve_resources = Conserve}}),
+ State#v1{connection_state = running,
+ connection = NewConnection,
+ channel_sup_sup_pid = ChannelSupSupPid,
+ throttle = Throttle1}),
rabbit_event:notify(connection_created,
[{type, network} |
infos(?CREATION_EVENT_KEYS, State1)]),
@@ -979,3 +1005,24 @@ cert_info(F, #v1{sock = Sock}) ->
emit_stats(State) ->
rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)),
rabbit_event:reset_stats_timer(State, #v1.stats_timer).
+
+%% 1.0 stub
+
+become_1_0(Mode, Version, State = #v1{sock = Sock}) ->
+ case code:is_loaded(rabbit_amqp1_0_reader) of
+ false -> refuse_connection(Sock, {bad_version, Version});
+ _ -> throw({become, {rabbit_amqp1_0_reader, become,
+ [Mode, pack_for_1_0(State)]}})
+ end.
+
+pack_for_1_0(#v1{parent = Parent,
+ sock = Sock,
+ recv_len = RecvLen,
+ pending_recv = PendingRecv,
+ queue_collector = QueueCollector,
+ conn_sup_pid = ConnSupPid,
+ start_heartbeat_fun = SHF,
+ buf = Buf,
+ buf_len = BufLen}) ->
+ {Parent, Sock, RecvLen, PendingRecv, QueueCollector, ConnSupPid, SHF,
+ Buf, BufLen}.