summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-01-18 13:59:14 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-01-18 13:59:14 +0000
commit90fa7f36263fe8d86b037628d65a87915f1a3c91 (patch)
tree17b615c8610667f102fc8b41bbbbb42d2e905535 /src
parentd8ebb900ad0477e737e5d20c4337e1837508348a (diff)
downloadrabbitmq-server-git-90fa7f36263fe8d86b037628d65a87915f1a3c91.tar.gz
various reader related changes for AMQP 1.0
- mechanism for the reader to 'become' a different reader. - become the 1.0 reader if an AMQP 1.0 header is presented by a client and the reader is present. That way we can support 1.0 on the same port as 0-{8,9,9-1}. - defer starting of the channel_sup_sup and do that in the reader. This allows the AMQP 1.0 reader to start its own versio of the sup. It also makes aborted connections less costly. - track connections in an ets table rather than implicitly via the supervisor. That way AMQP 1.0 connections can exclude themselves, since they are already tracked via their direct connections.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_connection_sup.erl7
-rw-r--r--src/rabbit_networking.erl23
-rw-r--r--src/rabbit_reader.erl75
3 files changed, 74 insertions, 31 deletions
diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl
index 12a532b6fc..d9a4735cf0 100644
--- a/src/rabbit_connection_sup.erl
+++ b/src/rabbit_connection_sup.erl
@@ -42,16 +42,11 @@ start_link() ->
SupPid,
{collector, {rabbit_queue_collector, start_link, []},
intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}),
- {ok, ChannelSupSupPid} =
- supervisor2:start_child(
- SupPid,
- {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []},
- intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}),
{ok, ReaderPid} =
supervisor2:start_child(
SupPid,
{reader, {rabbit_reader, start_link,
- [ChannelSupSupPid, Collector,
+ [SupPid, Collector,
rabbit_heartbeat:start_heartbeat_fun(SupPid)]},
intrinsic, ?MAX_WAIT, worker, [rabbit_reader]}),
{ok, SupPid, ReaderPid}.
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 31eeef730c..ee430fb407 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -18,7 +18,8 @@
-export([boot/0, start/0, start_tcp_listener/1, start_ssl_listener/2,
stop_tcp_listener/1, on_node_down/1, active_listeners/0,
- node_listeners/1, connections/0, connection_info_keys/0,
+ node_listeners/1, register_connection/1, unregister_connection/1,
+ connections/0, connection_info_keys/0,
connection_info/1, connection_info/2,
connection_info_all/0, connection_info_all/1,
close_connection/2, force_connection_event_refresh/0, tcp_host/1]).
@@ -40,6 +41,8 @@
-define(FIRST_TEST_BIND_PORT, 10000).
+-define(CONNECTION_TABLE, rabbit_connection).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -65,6 +68,8 @@
-spec(stop_tcp_listener/1 :: (listener_config()) -> 'ok').
-spec(active_listeners/0 :: () -> [rabbit_types:listener()]).
-spec(node_listeners/1 :: (node()) -> [rabbit_types:listener()]).
+-spec(register_connection/1 :: (pid()) -> ok).
+-spec(unregister_connection/1 :: (pid()) -> ok).
-spec(connections/0 :: () -> [rabbit_types:connection()]).
-spec(connections_local/0 :: () -> [rabbit_types:connection()]).
-spec(connection_info_keys/0 :: () -> rabbit_types:info_keys()).
@@ -117,6 +122,7 @@
%%----------------------------------------------------------------------------
boot() ->
+ ets:new(?CONNECTION_TABLE, [public, named_table]),
ok = start(),
ok = boot_tcp(),
ok = boot_ssl().
@@ -294,20 +300,15 @@ start_client(Sock) ->
start_ssl_client(SslOpts, Sock) ->
start_client(Sock, ssl_transform_fun(SslOpts)).
+register_connection(Pid) -> ets:insert(?CONNECTION_TABLE, {Pid}), ok.
+
+unregister_connection(Pid) -> ets:delete(?CONNECTION_TABLE, Pid), ok.
+
connections() ->
rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:cluster_nodes(running),
rabbit_networking, connections_local, []).
-connections_local() ->
- [Reader ||
- {_, ConnSup, supervisor, _}
- <- supervisor:which_children(rabbit_tcp_client_sup),
- Reader <- [try
- rabbit_connection_sup:reader(ConnSup)
- catch exit:{noproc, _} ->
- noproc
- end],
- Reader =/= noproc].
+connections_local() -> [P || {P} <- ets:tab2list(?CONNECTION_TABLE)].
connection_info_keys() -> rabbit_reader:info_keys().
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 7a28c8a33a..13459350cf 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -23,7 +23,7 @@
-export([system_continue/3, system_terminate/4, system_code_change/4]).
--export([init/4, mainloop/2]).
+-export([init/4, mainloop/2, recvloop/2]).
-export([conserve_resources/3, server_properties/1]).
@@ -37,7 +37,8 @@
-record(v1, {parent, sock, connection, callback, recv_len, pending_recv,
connection_state, queue_collector, heartbeater, stats_timer,
- channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len, throttle}).
+ conn_sup_pid, channel_sup_sup_pid, start_heartbeat_fun,
+ buf, buf_len, throttle}).
-record(connection, {name, host, peer_host, port, peer_port,
protocol, user, timeout_sec, frame_max, vhost,
@@ -109,12 +110,12 @@ start_link(ChannelSupSupPid, Collector, StartHeartbeatFun) ->
shutdown(Pid, Explanation) ->
gen_server:call(Pid, {shutdown, Explanation}, infinity).
-init(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun) ->
+init(Parent, ConnSupPid, Collector, StartHeartbeatFun) ->
Deb = sys:debug_options([]),
receive
{go, Sock, SockTransform} ->
start_connection(
- Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, Sock,
+ Parent, ConnSupPid, Collector, StartHeartbeatFun, Deb, Sock,
SockTransform)
end.
@@ -203,7 +204,7 @@ name(Sock) ->
socket_ends(Sock) ->
socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end).
-start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
+start_connection(Parent, ConnSupPid, Collector, StartHeartbeatFun, Deb,
Sock, SockTransform) ->
process_flag(trap_exit, true),
Name = name(Sock),
@@ -234,7 +235,8 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
connection_state = pre_init,
queue_collector = Collector,
heartbeater = none,
- channel_sup_sup_pid = ChannelSupSupPid,
+ conn_sup_pid = ConnSupPid,
+ channel_sup_sup_pid = none,
start_heartbeat_fun = StartHeartbeatFun,
buf = [],
buf_len = 0,
@@ -244,9 +246,10 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
last_blocked_at = never}},
try
ok = inet_op(fun () -> rabbit_net:tune_buffer_size(ClientSock) end),
- recvloop(Deb, switch_callback(rabbit_event:init_stats_timer(
- State, #v1.stats_timer),
- handshake, 8)),
+ run({?MODULE, recvloop,
+ [Deb, switch_callback(rabbit_event:init_stats_timer(
+ State, #v1.stats_timer),
+ handshake, 8)]}),
log(info, "closing AMQP connection ~p (~s)~n", [self(), Name])
catch
Ex -> log(case Ex of
@@ -263,10 +266,16 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
%% accounting as accurate as possible we ought to close the
%% socket w/o delay before termination.
rabbit_net:fast_close(ClientSock),
+ rabbit_networking:unregister_connection(self()),
rabbit_event:notify(connection_closed, [{pid, self()}])
end,
done.
+run({M, F, A}) ->
+ try apply(M, F, A)
+ catch {become, MFA} -> run(MFA)
+ end.
+
recvloop(Deb, State = #v1{pending_recv = true}) ->
mainloop(Deb, State);
recvloop(Deb, State = #v1{connection_state = blocked}) ->
@@ -689,8 +698,17 @@ handle_input(handshake, <<"AMQP", 1, 1, 8, 0>>, State) ->
handle_input(handshake, <<"AMQP", 1, 1, 9, 1>>, State) ->
start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, State);
+%% ... and finally, the 1.0 spec is crystal clear! Note that the
+%% TLS uses a different protocol number, and would go here.
+handle_input(handshake, <<"AMQP", 0, 1, 0, 0>>, State) ->
+ become_1_0(amqp, {0, 1, 0, 0}, State);
+
+%% 3 stands for "SASL"
+handle_input(handshake, <<"AMQP", 3, 1, 0, 0>>, State) ->
+ become_1_0(sasl, {3, 1, 0, 0}, State);
+
handle_input(handshake, <<"AMQP", A, B, C, D>>, #v1{sock = Sock}) ->
- refuse_connection(Sock, {bad_version, A, B, C, D});
+ refuse_connection(Sock, {bad_version, {A, B, C, D}});
handle_input(handshake, Other, #v1{sock = Sock}) ->
refuse_connection(Sock, {bad_header, Other});
@@ -704,6 +722,7 @@ handle_input(Callback, Data, _State) ->
start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision},
Protocol,
State = #v1{sock = Sock, connection = Connection}) ->
+ rabbit_networking:register_connection(self()),
Start = #'connection.start'{
version_major = ProtocolMajor,
version_minor = ProtocolMinor,
@@ -799,17 +818,24 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
connection = Connection = #connection{
user = User,
protocol = Protocol},
+ conn_sup_pid = ConnSupPid,
sock = Sock,
throttle = Throttle}) ->
ok = rabbit_access_control:check_vhost_access(User, VHostPath),
NewConnection = Connection#connection{vhost = VHostPath},
ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
+ Throttle1 = Throttle#throttle{conserve_resources = Conserve},
+ {ok, ChannelSupSupPid} =
+ supervisor2:start_child(
+ ConnSupPid,
+ {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []},
+ intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}),
State1 = control_throttle(
- State#v1{connection_state = running,
- connection = NewConnection,
- throttle = Throttle#throttle{
- conserve_resources = Conserve}}),
+ State#v1{connection_state = running,
+ connection = NewConnection,
+ channel_sup_sup_pid = ChannelSupSupPid,
+ throttle = Throttle1}),
rabbit_event:notify(connection_created,
[{type, network} |
infos(?CREATION_EVENT_KEYS, State1)]),
@@ -979,3 +1005,24 @@ cert_info(F, #v1{sock = Sock}) ->
emit_stats(State) ->
rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)),
rabbit_event:reset_stats_timer(State, #v1.stats_timer).
+
+%% 1.0 stub
+
+become_1_0(Mode, Version, State = #v1{sock = Sock}) ->
+ case code:is_loaded(rabbit_amqp1_0_reader) of
+ false -> refuse_connection(Sock, {bad_version, Version});
+ _ -> throw({become, {rabbit_amqp1_0_reader, become,
+ [Mode, pack_for_1_0(State)]}})
+ end.
+
+pack_for_1_0(#v1{parent = Parent,
+ sock = Sock,
+ recv_len = RecvLen,
+ pending_recv = PendingRecv,
+ queue_collector = QueueCollector,
+ conn_sup_pid = ConnSupPid,
+ start_heartbeat_fun = SHF,
+ buf = Buf,
+ buf_len = BufLen}) ->
+ {Parent, Sock, RecvLen, PendingRecv, QueueCollector, ConnSupPid, SHF,
+ Buf, BufLen}.