diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_connection_sup.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_networking.erl | 23 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 75 |
3 files changed, 74 insertions, 31 deletions
diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl index 12a532b6fc..d9a4735cf0 100644 --- a/src/rabbit_connection_sup.erl +++ b/src/rabbit_connection_sup.erl @@ -42,16 +42,11 @@ start_link() -> SupPid, {collector, {rabbit_queue_collector, start_link, []}, intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}), - {ok, ChannelSupSupPid} = - supervisor2:start_child( - SupPid, - {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []}, - intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}), {ok, ReaderPid} = supervisor2:start_child( SupPid, {reader, {rabbit_reader, start_link, - [ChannelSupSupPid, Collector, + [SupPid, Collector, rabbit_heartbeat:start_heartbeat_fun(SupPid)]}, intrinsic, ?MAX_WAIT, worker, [rabbit_reader]}), {ok, SupPid, ReaderPid}. diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 31eeef730c..ee430fb407 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -18,7 +18,8 @@ -export([boot/0, start/0, start_tcp_listener/1, start_ssl_listener/2, stop_tcp_listener/1, on_node_down/1, active_listeners/0, - node_listeners/1, connections/0, connection_info_keys/0, + node_listeners/1, register_connection/1, unregister_connection/1, + connections/0, connection_info_keys/0, connection_info/1, connection_info/2, connection_info_all/0, connection_info_all/1, close_connection/2, force_connection_event_refresh/0, tcp_host/1]). @@ -40,6 +41,8 @@ -define(FIRST_TEST_BIND_PORT, 10000). +-define(CONNECTION_TABLE, rabbit_connection). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -65,6 +68,8 @@ -spec(stop_tcp_listener/1 :: (listener_config()) -> 'ok'). -spec(active_listeners/0 :: () -> [rabbit_types:listener()]). -spec(node_listeners/1 :: (node()) -> [rabbit_types:listener()]). +-spec(register_connection/1 :: (pid()) -> ok). +-spec(unregister_connection/1 :: (pid()) -> ok). -spec(connections/0 :: () -> [rabbit_types:connection()]). -spec(connections_local/0 :: () -> [rabbit_types:connection()]). -spec(connection_info_keys/0 :: () -> rabbit_types:info_keys()). @@ -117,6 +122,7 @@ %%---------------------------------------------------------------------------- boot() -> + ets:new(?CONNECTION_TABLE, [public, named_table]), ok = start(), ok = boot_tcp(), ok = boot_ssl(). @@ -294,20 +300,15 @@ start_client(Sock) -> start_ssl_client(SslOpts, Sock) -> start_client(Sock, ssl_transform_fun(SslOpts)). +register_connection(Pid) -> ets:insert(?CONNECTION_TABLE, {Pid}), ok. + +unregister_connection(Pid) -> ets:delete(?CONNECTION_TABLE, Pid), ok. + connections() -> rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:cluster_nodes(running), rabbit_networking, connections_local, []). -connections_local() -> - [Reader || - {_, ConnSup, supervisor, _} - <- supervisor:which_children(rabbit_tcp_client_sup), - Reader <- [try - rabbit_connection_sup:reader(ConnSup) - catch exit:{noproc, _} -> - noproc - end], - Reader =/= noproc]. +connections_local() -> [P || {P} <- ets:tab2list(?CONNECTION_TABLE)]. connection_info_keys() -> rabbit_reader:info_keys(). diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 7a28c8a33a..13459350cf 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -23,7 +23,7 @@ -export([system_continue/3, system_terminate/4, system_code_change/4]). --export([init/4, mainloop/2]). +-export([init/4, mainloop/2, recvloop/2]). -export([conserve_resources/3, server_properties/1]). @@ -37,7 +37,8 @@ -record(v1, {parent, sock, connection, callback, recv_len, pending_recv, connection_state, queue_collector, heartbeater, stats_timer, - channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len, throttle}). + conn_sup_pid, channel_sup_sup_pid, start_heartbeat_fun, + buf, buf_len, throttle}). -record(connection, {name, host, peer_host, port, peer_port, protocol, user, timeout_sec, frame_max, vhost, @@ -109,12 +110,12 @@ start_link(ChannelSupSupPid, Collector, StartHeartbeatFun) -> shutdown(Pid, Explanation) -> gen_server:call(Pid, {shutdown, Explanation}, infinity). -init(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun) -> +init(Parent, ConnSupPid, Collector, StartHeartbeatFun) -> Deb = sys:debug_options([]), receive {go, Sock, SockTransform} -> start_connection( - Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, Sock, + Parent, ConnSupPid, Collector, StartHeartbeatFun, Deb, Sock, SockTransform) end. @@ -203,7 +204,7 @@ name(Sock) -> socket_ends(Sock) -> socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end). -start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, +start_connection(Parent, ConnSupPid, Collector, StartHeartbeatFun, Deb, Sock, SockTransform) -> process_flag(trap_exit, true), Name = name(Sock), @@ -234,7 +235,8 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, connection_state = pre_init, queue_collector = Collector, heartbeater = none, - channel_sup_sup_pid = ChannelSupSupPid, + conn_sup_pid = ConnSupPid, + channel_sup_sup_pid = none, start_heartbeat_fun = StartHeartbeatFun, buf = [], buf_len = 0, @@ -244,9 +246,10 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, last_blocked_at = never}}, try ok = inet_op(fun () -> rabbit_net:tune_buffer_size(ClientSock) end), - recvloop(Deb, switch_callback(rabbit_event:init_stats_timer( - State, #v1.stats_timer), - handshake, 8)), + run({?MODULE, recvloop, + [Deb, switch_callback(rabbit_event:init_stats_timer( + State, #v1.stats_timer), + handshake, 8)]}), log(info, "closing AMQP connection ~p (~s)~n", [self(), Name]) catch Ex -> log(case Ex of @@ -263,10 +266,16 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, %% accounting as accurate as possible we ought to close the %% socket w/o delay before termination. rabbit_net:fast_close(ClientSock), + rabbit_networking:unregister_connection(self()), rabbit_event:notify(connection_closed, [{pid, self()}]) end, done. +run({M, F, A}) -> + try apply(M, F, A) + catch {become, MFA} -> run(MFA) + end. + recvloop(Deb, State = #v1{pending_recv = true}) -> mainloop(Deb, State); recvloop(Deb, State = #v1{connection_state = blocked}) -> @@ -689,8 +698,17 @@ handle_input(handshake, <<"AMQP", 1, 1, 8, 0>>, State) -> handle_input(handshake, <<"AMQP", 1, 1, 9, 1>>, State) -> start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, State); +%% ... and finally, the 1.0 spec is crystal clear! Note that the +%% TLS uses a different protocol number, and would go here. +handle_input(handshake, <<"AMQP", 0, 1, 0, 0>>, State) -> + become_1_0(amqp, {0, 1, 0, 0}, State); + +%% 3 stands for "SASL" +handle_input(handshake, <<"AMQP", 3, 1, 0, 0>>, State) -> + become_1_0(sasl, {3, 1, 0, 0}, State); + handle_input(handshake, <<"AMQP", A, B, C, D>>, #v1{sock = Sock}) -> - refuse_connection(Sock, {bad_version, A, B, C, D}); + refuse_connection(Sock, {bad_version, {A, B, C, D}}); handle_input(handshake, Other, #v1{sock = Sock}) -> refuse_connection(Sock, {bad_header, Other}); @@ -704,6 +722,7 @@ handle_input(Callback, Data, _State) -> start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision}, Protocol, State = #v1{sock = Sock, connection = Connection}) -> + rabbit_networking:register_connection(self()), Start = #'connection.start'{ version_major = ProtocolMajor, version_minor = ProtocolMinor, @@ -799,17 +818,24 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, connection = Connection = #connection{ user = User, protocol = Protocol}, + conn_sup_pid = ConnSupPid, sock = Sock, throttle = Throttle}) -> ok = rabbit_access_control:check_vhost_access(User, VHostPath), NewConnection = Connection#connection{vhost = VHostPath}, ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol), Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}), + Throttle1 = Throttle#throttle{conserve_resources = Conserve}, + {ok, ChannelSupSupPid} = + supervisor2:start_child( + ConnSupPid, + {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []}, + intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}), State1 = control_throttle( - State#v1{connection_state = running, - connection = NewConnection, - throttle = Throttle#throttle{ - conserve_resources = Conserve}}), + State#v1{connection_state = running, + connection = NewConnection, + channel_sup_sup_pid = ChannelSupSupPid, + throttle = Throttle1}), rabbit_event:notify(connection_created, [{type, network} | infos(?CREATION_EVENT_KEYS, State1)]), @@ -979,3 +1005,24 @@ cert_info(F, #v1{sock = Sock}) -> emit_stats(State) -> rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)), rabbit_event:reset_stats_timer(State, #v1.stats_timer). + +%% 1.0 stub + +become_1_0(Mode, Version, State = #v1{sock = Sock}) -> + case code:is_loaded(rabbit_amqp1_0_reader) of + false -> refuse_connection(Sock, {bad_version, Version}); + _ -> throw({become, {rabbit_amqp1_0_reader, become, + [Mode, pack_for_1_0(State)]}}) + end. + +pack_for_1_0(#v1{parent = Parent, + sock = Sock, + recv_len = RecvLen, + pending_recv = PendingRecv, + queue_collector = QueueCollector, + conn_sup_pid = ConnSupPid, + start_heartbeat_fun = SHF, + buf = Buf, + buf_len = BufLen}) -> + {Parent, Sock, RecvLen, PendingRecv, QueueCollector, ConnSupPid, SHF, + Buf, BufLen}. |
