summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorJean-Sébastien Pédron <jean-sebastien@rabbitmq.com>2017-06-02 14:04:15 +0200
committerJean-Sébastien Pédron <jean-sebastien@rabbitmq.com>2017-06-26 13:30:46 +0200
commit7d470b84f004cbcddefa960df0c65bbcb95d9a83 (patch)
treeca9b1479eab062e312d68f42edd7c833f9491d6c /src
parent5f2c1152890f44e60af92b8ca97b922f0d0e9bed (diff)
downloadrabbitmq-server-git-7d470b84f004cbcddefa960df0c65bbcb95d9a83.tar.gz
Move rabbit_networking and rabbit_reader from rabbitmq-common
They are not used by rabbitmq-common or rabbitmq-erlang-client anymore. This solves a dependency of rabbitmq-common on rabbitmq-server. [#118490793]
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_networking.erl440
-rw-r--r--src/rabbit_reader.erl1574
2 files changed, 2014 insertions, 0 deletions
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
new file mode 100644
index 0000000000..a44b3fa334
--- /dev/null
+++ b/src/rabbit_networking.erl
@@ -0,0 +1,440 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_networking).
+
+%% This module contains various functions that deal with networking,
+%% TCP and TLS listeners, and connection information.
+%%
+%% It also contains a boot step — boot/0 — that starts networking machinery.
+%% This module primarily covers AMQP 0-9-1 but some bits are reused in
+%% plugins that provide protocol support, e.g. STOMP or MQTT.
+%%
+%% Functions in this module take care of normalising TCP listener options,
+%% including dual IP stack cases, and starting the AMQP 0-9-1 listener(s).
+%%
+%% See also tcp_listener_sup and tcp_listener.
+
+-export([boot/0, start_tcp_listener/2, start_ssl_listener/3,
+ stop_tcp_listener/1, on_node_down/1, active_listeners/0,
+ node_listeners/1, register_connection/1, unregister_connection/1,
+ connections/0, connection_info_keys/0,
+ connection_info/1, connection_info/2,
+ connection_info_all/0, connection_info_all/1, connection_info_all/3,
+ close_connection/2, force_connection_event_refresh/1, tcp_host/1]).
+
+%% Used by TCP-based transports, e.g. STOMP adapter
+-export([tcp_listener_addresses/1, tcp_listener_spec/9,
+ ensure_ssl/0, fix_ssl_options/1, poodle_check/1]).
+
+-export([tcp_listener_started/4, tcp_listener_stopped/4]).
+
+%% Internal
+-export([connections_local/0]).
+
+-include("rabbit.hrl").
+
+%% IANA-suggested ephemeral port range is 49152 to 65535
+-define(FIRST_TEST_BIND_PORT, 49152).
+
+%%----------------------------------------------------------------------------
+
+-export_type([ip_port/0, hostname/0]).
+
+-type hostname() :: rabbit_net:hostname().
+-type ip_port() :: rabbit_net:ip_port().
+
+-type family() :: atom().
+-type listener_config() :: ip_port() |
+ {hostname(), ip_port()} |
+ {hostname(), ip_port(), family()}.
+-type address() :: {inet:ip_address(), ip_port(), family()}.
+-type name_prefix() :: atom().
+-type protocol() :: atom().
+-type label() :: string().
+
+-spec start_tcp_listener(listener_config(), integer()) -> 'ok'.
+-spec start_ssl_listener
+ (listener_config(), rabbit_types:infos(), integer()) -> 'ok'.
+-spec stop_tcp_listener(listener_config()) -> 'ok'.
+-spec active_listeners() -> [rabbit_types:listener()].
+-spec node_listeners(node()) -> [rabbit_types:listener()].
+-spec register_connection(pid()) -> ok.
+-spec unregister_connection(pid()) -> ok.
+-spec connections() -> [rabbit_types:connection()].
+-spec connections_local() -> [rabbit_types:connection()].
+-spec connection_info_keys() -> rabbit_types:info_keys().
+-spec connection_info(rabbit_types:connection()) -> rabbit_types:infos().
+-spec connection_info(rabbit_types:connection(), rabbit_types:info_keys()) ->
+ rabbit_types:infos().
+-spec connection_info_all() -> [rabbit_types:infos()].
+-spec connection_info_all(rabbit_types:info_keys()) ->
+ [rabbit_types:infos()].
+-spec connection_info_all(rabbit_types:info_keys(), reference(), pid()) ->
+ 'ok'.
+-spec close_connection(pid(), string()) -> 'ok'.
+-spec force_connection_event_refresh(reference()) -> 'ok'.
+
+-spec on_node_down(node()) -> 'ok'.
+-spec tcp_listener_addresses(listener_config()) -> [address()].
+-spec tcp_listener_spec
+ (name_prefix(), address(), [gen_tcp:listen_option()], module(), module(),
+ protocol(), any(), non_neg_integer(), label()) ->
+ supervisor:child_spec().
+-spec ensure_ssl() -> rabbit_types:infos().
+-spec poodle_check(atom()) -> 'ok' | 'danger'.
+
+-spec boot() -> 'ok'.
+-spec tcp_listener_started
+ (_, _,
+ string() |
+ {byte(),byte(),byte(),byte()} |
+ {char(),char(),char(),char(),char(),char(),char(),char()}, _) ->
+ 'ok'.
+-spec tcp_listener_stopped
+ (_, _,
+ string() |
+ {byte(),byte(),byte(),byte()} |
+ {char(),char(),char(),char(),char(),char(),char(),char()},
+ _) ->
+ 'ok'.
+
+%%----------------------------------------------------------------------------
+
+boot() ->
+ ok = record_distribution_listener(),
+ _ = application:start(ranch),
+ ok = boot_tcp(application:get_env(rabbit, num_tcp_acceptors, 10)),
+ ok = boot_ssl(application:get_env(rabbit, num_ssl_acceptors, 1)).
+
+boot_tcp(NumAcceptors) ->
+ {ok, TcpListeners} = application:get_env(tcp_listeners),
+ [ok = start_tcp_listener(Listener, NumAcceptors) || Listener <- TcpListeners],
+ ok.
+
+boot_ssl(NumAcceptors) ->
+ case application:get_env(ssl_listeners) of
+ {ok, []} ->
+ ok;
+ {ok, SslListeners} ->
+ SslOpts = ensure_ssl(),
+ case poodle_check('AMQP') of
+ ok -> [start_ssl_listener(L, SslOpts, NumAcceptors) || L <- SslListeners];
+ danger -> ok
+ end,
+ ok
+ end.
+
+ensure_ssl() ->
+ {ok, SslAppsConfig} = application:get_env(rabbit, ssl_apps),
+ ok = app_utils:start_applications(SslAppsConfig),
+ {ok, SslOptsConfig} = application:get_env(rabbit, ssl_options),
+ rabbit_ssl_options:fix(SslOptsConfig).
+
+poodle_check(Context) ->
+ {ok, Vsn} = application:get_key(ssl, vsn),
+ case rabbit_misc:version_compare(Vsn, "5.3", gte) of %% R16B01
+ true -> ok;
+ false -> case application:get_env(rabbit, ssl_allow_poodle_attack) of
+ {ok, true} -> ok;
+ _ -> log_poodle_fail(Context),
+ danger
+ end
+ end.
+
+log_poodle_fail(Context) ->
+ rabbit_log:error(
+ "The installed version of Erlang (~s) contains the bug OTP-10905,~n"
+ "which makes it impossible to disable SSLv3. This makes the system~n"
+ "vulnerable to the POODLE attack. SSL listeners for ~s have therefore~n"
+ "been disabled.~n~n"
+ "You are advised to upgrade to a recent Erlang version; R16B01 is the~n"
+ "first version in which this bug is fixed, but later is usually~n"
+ "better.~n~n"
+ "If you cannot upgrade now and want to re-enable SSL listeners, you can~n"
+ "set the config item 'ssl_allow_poodle_attack' to 'true' in the~n"
+ "'rabbit' section of your configuration file.~n",
+ [rabbit_misc:otp_release(), Context]).
+
+fix_ssl_options(Config) ->
+ rabbit_ssl_options:fix(Config).
+
+tcp_listener_addresses(Port) when is_integer(Port) ->
+ tcp_listener_addresses_auto(Port);
+tcp_listener_addresses({"auto", Port}) ->
+ %% Variant to prevent lots of hacking around in bash and batch files
+ tcp_listener_addresses_auto(Port);
+tcp_listener_addresses({Host, Port}) ->
+ %% auto: determine family IPv4 / IPv6 after converting to IP address
+ tcp_listener_addresses({Host, Port, auto});
+tcp_listener_addresses({Host, Port, Family0})
+ when is_integer(Port) andalso (Port >= 0) andalso (Port =< 65535) ->
+ [{IPAddress, Port, Family} ||
+ {IPAddress, Family} <- getaddr(Host, Family0)];
+tcp_listener_addresses({_Host, Port, _Family0}) ->
+ rabbit_log:error("invalid port ~p - not 0..65535~n", [Port]),
+ throw({error, {invalid_port, Port}}).
+
+tcp_listener_addresses_auto(Port) ->
+ lists:append([tcp_listener_addresses(Listener) ||
+ Listener <- port_to_listeners(Port)]).
+
+tcp_listener_spec(NamePrefix, {IPAddress, Port, Family}, SocketOpts,
+ Transport, ProtoSup, ProtoOpts, Protocol, NumAcceptors, Label) ->
+ {rabbit_misc:tcp_name(NamePrefix, IPAddress, Port),
+ {tcp_listener_sup, start_link,
+ [IPAddress, Port, Transport, [Family | SocketOpts], ProtoSup, ProtoOpts,
+ {?MODULE, tcp_listener_started, [Protocol, SocketOpts]},
+ {?MODULE, tcp_listener_stopped, [Protocol, SocketOpts]},
+ NumAcceptors, Label]},
+ transient, infinity, supervisor, [tcp_listener_sup]}.
+
+start_tcp_listener(Listener, NumAcceptors) ->
+ start_listener(Listener, NumAcceptors, amqp, "TCP Listener", tcp_opts()).
+
+start_ssl_listener(Listener, SslOpts, NumAcceptors) ->
+ start_listener(Listener, NumAcceptors, 'amqp/ssl', "SSL Listener", tcp_opts() ++ SslOpts).
+
+start_listener(Listener, NumAcceptors, Protocol, Label, Opts) ->
+ [start_listener0(Address, NumAcceptors, Protocol, Label, Opts) ||
+ Address <- tcp_listener_addresses(Listener)],
+ ok.
+
+start_listener0(Address, NumAcceptors, Protocol, Label, Opts) ->
+ Transport = case Protocol of
+ amqp -> ranch_tcp;
+ 'amqp/ssl' -> ranch_ssl
+ end,
+ Spec = tcp_listener_spec(rabbit_tcp_listener_sup, Address, Opts,
+ Transport, rabbit_connection_sup, [], Protocol,
+ NumAcceptors, Label),
+ case supervisor:start_child(rabbit_sup, Spec) of
+ {ok, _} -> ok;
+ {error, {shutdown, _}} -> {IPAddress, Port, _Family} = Address,
+ exit({could_not_start_tcp_listener,
+ {rabbit_misc:ntoa(IPAddress), Port}})
+ end.
+
+stop_tcp_listener(Listener) ->
+ [stop_tcp_listener0(Address) ||
+ Address <- tcp_listener_addresses(Listener)],
+ ok.
+
+stop_tcp_listener0({IPAddress, Port, _Family}) ->
+ Name = rabbit_misc:tcp_name(rabbit_tcp_listener_sup, IPAddress, Port),
+ ok = supervisor:terminate_child(rabbit_sup, Name),
+ ok = supervisor:delete_child(rabbit_sup, Name).
+
+tcp_listener_started(Protocol, Opts, IPAddress, Port) ->
+ %% We need the ip to distinguish e.g. 0.0.0.0 and 127.0.0.1
+ %% We need the host so we can distinguish multiple instances of the above
+ %% in a cluster.
+ ok = mnesia:dirty_write(
+ rabbit_listener,
+ #listener{node = node(),
+ protocol = Protocol,
+ host = tcp_host(IPAddress),
+ ip_address = IPAddress,
+ port = Port,
+ opts = Opts}).
+
+tcp_listener_stopped(Protocol, Opts, IPAddress, Port) ->
+ ok = mnesia:dirty_delete_object(
+ rabbit_listener,
+ #listener{node = node(),
+ protocol = Protocol,
+ host = tcp_host(IPAddress),
+ ip_address = IPAddress,
+ port = Port,
+ opts = Opts}).
+
+record_distribution_listener() ->
+ {Name, Host} = rabbit_nodes:parts(node()),
+ {port, Port, _Version} = erl_epmd:port_please(Name, Host),
+ tcp_listener_started(clustering, [], {0,0,0,0,0,0,0,0}, Port).
+
+active_listeners() ->
+ rabbit_misc:dirty_read_all(rabbit_listener).
+
+node_listeners(Node) ->
+ mnesia:dirty_read(rabbit_listener, Node).
+
+on_node_down(Node) ->
+ case lists:member(Node, nodes()) of
+ false -> ok = mnesia:dirty_delete(rabbit_listener, Node);
+ true -> rabbit_log:info(
+ "Keep ~s listeners: the node is already back~n", [Node])
+ end.
+
+register_connection(Pid) -> pg_local:join(rabbit_connections, Pid).
+
+unregister_connection(Pid) -> pg_local:leave(rabbit_connections, Pid).
+
+connections() ->
+ rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:cluster_nodes(running),
+ rabbit_networking, connections_local, []).
+
+connections_local() -> pg_local:get_members(rabbit_connections).
+
+connection_info_keys() -> rabbit_reader:info_keys().
+
+connection_info(Pid) -> rabbit_reader:info(Pid).
+connection_info(Pid, Items) -> rabbit_reader:info(Pid, Items).
+
+connection_info_all() -> cmap(fun (Q) -> connection_info(Q) end).
+connection_info_all(Items) -> cmap(fun (Q) -> connection_info(Q, Items) end).
+
+connection_info_all(Items, Ref, AggregatorPid) ->
+ rabbit_control_misc:emitting_map_with_exit_handler(
+ AggregatorPid, Ref, fun(Q) -> connection_info(Q, Items) end,
+ connections()).
+
+close_connection(Pid, Explanation) ->
+ rabbit_log:info("Closing connection ~p because ~p~n", [Pid, Explanation]),
+ case lists:member(Pid, connections()) of
+ true -> rabbit_reader:shutdown(Pid, Explanation);
+ false -> throw({error, {not_a_connection_pid, Pid}})
+ end.
+
+force_connection_event_refresh(Ref) ->
+ [rabbit_reader:force_event_refresh(C, Ref) || C <- connections()],
+ ok.
+
+%%--------------------------------------------------------------------
+
+tcp_host(IPAddress) ->
+ rabbit_net:tcp_host(IPAddress).
+
+cmap(F) -> rabbit_misc:filter_exit_map(F, connections()).
+
+tcp_opts() ->
+ {ok, ConfigOpts} = application:get_env(rabbit, tcp_listen_options),
+ ConfigOpts.
+
+%% inet_parse:address takes care of ip string, like "0.0.0.0"
+%% inet:getaddr returns immediately for ip tuple {0,0,0,0},
+%% and runs 'inet_gethost' port process for dns lookups.
+%% On Windows inet:getaddr runs dns resolver for ip string, which may fail.
+getaddr(Host, Family) ->
+ case inet_parse:address(Host) of
+ {ok, IPAddress} -> [{IPAddress, resolve_family(IPAddress, Family)}];
+ {error, _} -> gethostaddr(Host, Family)
+ end.
+
+gethostaddr(Host, auto) ->
+ Lookups = [{Family, inet:getaddr(Host, Family)} || Family <- [inet, inet6]],
+ case [{IP, Family} || {Family, {ok, IP}} <- Lookups] of
+ [] -> host_lookup_error(Host, Lookups);
+ IPs -> IPs
+ end;
+
+gethostaddr(Host, Family) ->
+ case inet:getaddr(Host, Family) of
+ {ok, IPAddress} -> [{IPAddress, Family}];
+ {error, Reason} -> host_lookup_error(Host, Reason)
+ end.
+
+host_lookup_error(Host, Reason) ->
+ rabbit_log:error("invalid host ~p - ~p~n", [Host, Reason]),
+ throw({error, {invalid_host, Host, Reason}}).
+
+resolve_family({_,_,_,_}, auto) -> inet;
+resolve_family({_,_,_,_,_,_,_,_}, auto) -> inet6;
+resolve_family(IP, auto) -> throw({error, {strange_family, IP}});
+resolve_family(_, F) -> F.
+
+%%--------------------------------------------------------------------
+
+%% There are three kinds of machine (for our purposes).
+%%
+%% * Those which treat IPv4 addresses as a special kind of IPv6 address
+%% ("Single stack")
+%% - Linux by default, Windows Vista and later
+%% - We also treat any (hypothetical?) IPv6-only machine the same way
+%% * Those which consider IPv6 and IPv4 to be completely separate things
+%% ("Dual stack")
+%% - OpenBSD, Windows XP / 2003, Linux if so configured
+%% * Those which do not support IPv6.
+%% - Ancient/weird OSes, Linux if so configured
+%%
+%% How to reconfigure Linux to test this:
+%% Single stack (default):
+%% echo 0 > /proc/sys/net/ipv6/bindv6only
+%% Dual stack:
+%% echo 1 > /proc/sys/net/ipv6/bindv6only
+%% IPv4 only:
+%% add ipv6.disable=1 to GRUB_CMDLINE_LINUX_DEFAULT in /etc/default/grub then
+%% sudo update-grub && sudo reboot
+%%
+%% This matters in (and only in) the case where the sysadmin (or the
+%% app descriptor) has only supplied a port and we wish to bind to
+%% "all addresses". This means different things depending on whether
+%% we're single or dual stack. On single stack binding to "::"
+%% implicitly includes all IPv4 addresses, and subsequently attempting
+%% to bind to "0.0.0.0" will fail. On dual stack, binding to "::" will
+%% only bind to IPv6 addresses, and we need another listener bound to
+%% "0.0.0.0" for IPv4. Finally, on IPv4-only systems we of course only
+%% want to bind to "0.0.0.0".
+%%
+%% Unfortunately it seems there is no way to detect single vs dual stack
+%% apart from attempting to bind to the port.
+port_to_listeners(Port) ->
+ IPv4 = {"0.0.0.0", Port, inet},
+ IPv6 = {"::", Port, inet6},
+ case ipv6_status(?FIRST_TEST_BIND_PORT) of
+ single_stack -> [IPv6];
+ ipv6_only -> [IPv6];
+ dual_stack -> [IPv6, IPv4];
+ ipv4_only -> [IPv4]
+ end.
+
+ipv6_status(TestPort) ->
+ IPv4 = [inet, {ip, {0,0,0,0}}],
+ IPv6 = [inet6, {ip, {0,0,0,0,0,0,0,0}}],
+ case gen_tcp:listen(TestPort, IPv6) of
+ {ok, LSock6} ->
+ case gen_tcp:listen(TestPort, IPv4) of
+ {ok, LSock4} ->
+ %% Dual stack
+ gen_tcp:close(LSock6),
+ gen_tcp:close(LSock4),
+ dual_stack;
+ %% Checking the error here would only let us
+ %% distinguish single stack IPv6 / IPv4 vs IPv6 only,
+ %% which we figure out below anyway.
+ {error, _} ->
+ gen_tcp:close(LSock6),
+ case gen_tcp:listen(TestPort, IPv4) of
+ %% Single stack
+ {ok, LSock4} -> gen_tcp:close(LSock4),
+ single_stack;
+ %% IPv6-only machine. Welcome to the future.
+ {error, eafnosupport} -> ipv6_only; %% Linux
+ {error, eprotonosupport}-> ipv6_only; %% FreeBSD
+ %% Dual stack machine with something already
+ %% on IPv4.
+ {error, _} -> ipv6_status(TestPort + 1)
+ end
+ end;
+ %% IPv4-only machine. Welcome to the 90s.
+ {error, eafnosupport} -> %% Linux
+ ipv4_only;
+ {error, eprotonosupport} -> %% FreeBSD
+ ipv4_only;
+ %% Port in use
+ {error, _} ->
+ ipv6_status(TestPort + 1)
+ end.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
new file mode 100644
index 0000000000..b10e8b15aa
--- /dev/null
+++ b/src/rabbit_reader.erl
@@ -0,0 +1,1574 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_reader).
+
+%% This is an AMQP 0-9-1 connection implementation. If AMQP 1.0 plugin is enabled,
+%% this module passes control of incoming AMQP 1.0 connections to it.
+%%
+%% Every connection (as in, a process using this module)
+%% is a controlling process for a server socket.
+%%
+%% Connections have a number of responsibilities:
+%%
+%% * Performing protocol handshake
+%% * Parsing incoming data and dispatching protocol methods
+%% * Authenticating clients (with the help of authentication backends)
+%% * Enforcing TCP backpressure (throttling clients)
+%% * Enforcing connection limits, e.g. channel_max
+%% * Channel management
+%% * Setting up heartbeater and alarm notifications
+%% * Emitting connection and network activity metric events
+%% * Gracefully handling client disconnects, channel termination, etc
+%%
+%% and a few more.
+%%
+%% Every connection has
+%%
+%% * a queue collector which is responsible for keeping
+%% track of exclusive queues on the connection and their cleanup.
+%% * a heartbeater that's responsible for sending heartbeat frames to clients,
+%% keeping track of the incoming ones and notifying connection about
+%% heartbeat timeouts
+%% * Stats timer, a timer that is used to periodically emit metric events
+%%
+%% Some dependencies are started under a separate supervisor to avoid deadlocks
+%% during system shutdown. See rabbit_channel_sup:start_link/0 for details.
+%%
+%% Reader processes are special processes (in the OTP sense).
+
+-include("rabbit_framing.hrl").
+-include("rabbit.hrl").
+
+-export([start_link/3, info_keys/0, info/1, info/2, force_event_refresh/2,
+ shutdown/2]).
+
+-export([system_continue/3, system_terminate/4, system_code_change/4]).
+
+-export([init/4, mainloop/4, recvloop/4]).
+
+-export([conserve_resources/3, server_properties/1]).
+
+-define(NORMAL_TIMEOUT, 3).
+-define(CLOSING_TIMEOUT, 30).
+-define(CHANNEL_TERMINATION_TIMEOUT, 3).
+%% we wait for this many seconds before closing TCP connection
+%% with a client that failed to log in. Provides some relief
+%% from connection storms and DoS.
+-define(SILENT_CLOSE_DELAY, 3).
+-define(CHANNEL_MIN, 1).
+
+%%--------------------------------------------------------------------------
+
+-record(v1, {
+ %% parent process
+ parent,
+ %% socket
+ sock,
+ %% connection state, see connection record
+ connection,
+ callback,
+ recv_len,
+ pending_recv,
+ %% pre_init | securing | running | blocking | blocked | closing | closed | {become, F}
+ connection_state,
+ %% see comment in rabbit_connection_sup:start_link/0
+ helper_sup,
+ %% takes care of cleaning up exclusive queues,
+ %% see rabbit_queue_collector
+ queue_collector,
+ %% sends and receives heartbeat frames,
+ %% see rabbit_heartbeat
+ heartbeater,
+ %% timer used to emit statistics
+ stats_timer,
+ %% channel supervisor
+ channel_sup_sup_pid,
+ %% how many channels this connection has
+ channel_count,
+ %% throttling state, for both
+ %% credit- and resource-driven flow control
+ throttle}).
+
+-record(connection, {
+ %% e.g. <<"127.0.0.1:55054 -> 127.0.0.1:5672">>
+ name,
+ %% used for logging: same as `name`, but optionally
+ %% augmented with user-supplied name
+ log_name,
+ %% server host
+ host,
+ %% client host
+ peer_host,
+ %% server port
+ port,
+ %% client port
+ peer_port,
+ %% protocol framing implementation module,
+ %% e.g. rabbit_framing_amqp_0_9_1
+ protocol,
+ user,
+ %% heartbeat timeout value used, 0 means
+ %% heartbeats are disabled
+ timeout_sec,
+ %% maximum allowed frame size,
+ %% see frame_max in the AMQP 0-9-1 spec
+ frame_max,
+ %% greatest channel number allowed,
+ %% see channel_max in the AMQP 0-9-1 spec
+ channel_max,
+ vhost,
+ %% client name, version, platform, etc
+ client_properties,
+ %% what lists protocol extensions
+ %% does this client support?
+ capabilities,
+ %% authentication mechanism used
+ %% as a pair of {Name, Module}
+ auth_mechanism,
+ %% authentication mechanism state,
+ %% initialised by rabbit_auth_mechanism:init/1
+ %% implementations
+ auth_state,
+ %% time of connection
+ connected_at}).
+
+-record(throttle, {
+ %% list of active alarms
+ alarmed_by,
+ %% flow | resource
+ last_blocked_by,
+ %% never | timestamp()
+ last_blocked_at
+}).
+
+-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
+ send_pend, state, channels, reductions,
+ garbage_collection]).
+
+-define(SIMPLE_METRICS, [pid, recv_oct, send_oct, reductions]).
+-define(OTHER_METRICS, [recv_cnt, send_cnt, send_pend, state, channels,
+ garbage_collection]).
+
+-define(CREATION_EVENT_KEYS,
+ [pid, name, port, peer_port, host,
+ peer_host, ssl, peer_cert_subject, peer_cert_issuer,
+ peer_cert_validity, auth_mechanism, ssl_protocol,
+ ssl_key_exchange, ssl_cipher, ssl_hash, protocol, user, vhost,
+ timeout, frame_max, channel_max, client_properties, connected_at]).
+
+-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
+
+-define(AUTH_NOTIFICATION_INFO_KEYS,
+ [host, name, peer_host, peer_port, protocol, auth_mechanism,
+ ssl, ssl_protocol, ssl_cipher, peer_cert_issuer, peer_cert_subject,
+ peer_cert_validity]).
+
+-define(IS_RUNNING(State),
+ (State#v1.connection_state =:= running orelse
+ State#v1.connection_state =:= blocking orelse
+ State#v1.connection_state =:= blocked)).
+
+-define(IS_STOPPING(State),
+ (State#v1.connection_state =:= closing orelse
+ State#v1.connection_state =:= closed)).
+
+%%--------------------------------------------------------------------------
+
+-spec start_link(pid(), any(), rabbit_net:socket()) -> rabbit_types:ok(pid()).
+-spec info_keys() -> rabbit_types:info_keys().
+-spec info(pid()) -> rabbit_types:infos().
+-spec info(pid(), rabbit_types:info_keys()) -> rabbit_types:infos().
+-spec force_event_refresh(pid(), reference()) -> 'ok'.
+-spec shutdown(pid(), string()) -> 'ok'.
+-type resource_alert() :: {WasAlarmSetForNode :: boolean(),
+ IsThereAnyAlarmsWithSameSourceInTheCluster :: boolean(),
+ NodeForWhichAlarmWasSetOrCleared :: node()}.
+-spec conserve_resources(pid(), atom(), resource_alert()) -> 'ok'.
+-spec server_properties(rabbit_types:protocol()) ->
+ rabbit_framing:amqp_table().
+
+%% These specs only exists to add no_return() to keep dialyzer happy
+-spec init(pid(), pid(), any(), rabbit_net:socket()) -> no_return().
+-spec start_connection(pid(), pid(), any(), rabbit_net:socket()) ->
+ no_return().
+
+-spec mainloop(_,[binary()], non_neg_integer(), #v1{}) -> any().
+-spec system_code_change(_,_,_,_) -> {'ok',_}.
+-spec system_continue(_,_,{[binary()], non_neg_integer(), #v1{}}) -> any().
+-spec system_terminate(_,_,_,_) -> none().
+
+%%--------------------------------------------------------------------------
+
+start_link(HelperSup, Ref, Sock) ->
+ Pid = proc_lib:spawn_link(?MODULE, init, [self(), HelperSup, Ref, Sock]),
+
+ %% In the event that somebody floods us with connections, the
+ %% reader processes can spew log events at error_logger faster
+ %% than it can keep up, causing its mailbox to grow unbounded
+ %% until we eat all the memory available and crash. So here is a
+ %% meaningless synchronous call to the underlying gen_event
+ %% mechanism. When it returns the mailbox is drained, and we
+ %% return to our caller to accept more connections.
+ gen_event:which_handlers(error_logger),
+
+ {ok, Pid}.
+
+shutdown(Pid, Explanation) ->
+ gen_server:call(Pid, {shutdown, Explanation}, infinity).
+
+init(Parent, HelperSup, Ref, Sock) ->
+ rabbit_net:accept_ack(Ref, Sock),
+ Deb = sys:debug_options([]),
+ start_connection(Parent, HelperSup, Deb, Sock).
+
+system_continue(Parent, Deb, {Buf, BufLen, State}) ->
+ mainloop(Deb, Buf, BufLen, State#v1{parent = Parent}).
+
+system_terminate(Reason, _Parent, _Deb, _State) ->
+ exit(Reason).
+
+system_code_change(Misc, _Module, _OldVsn, _Extra) ->
+ {ok, Misc}.
+
+info_keys() -> ?INFO_KEYS.
+
+info(Pid) ->
+ gen_server:call(Pid, info, infinity).
+
+info(Pid, Items) ->
+ case gen_server:call(Pid, {info, Items}, infinity) of
+ {ok, Res} -> Res;
+ {error, Error} -> throw(Error)
+ end.
+
+force_event_refresh(Pid, Ref) ->
+ gen_server:cast(Pid, {force_event_refresh, Ref}).
+
+conserve_resources(Pid, Source, {_, Conserve, _}) ->
+ Pid ! {conserve_resources, Source, Conserve},
+ ok.
+
+server_properties(Protocol) ->
+ {ok, Product} = application:get_key(rabbit, description),
+ {ok, Version} = application:get_key(rabbit, vsn),
+
+ %% Get any configuration-specified server properties
+ {ok, RawConfigServerProps} = application:get_env(rabbit,
+ server_properties),
+
+ %% Normalize the simplifed (2-tuple) and unsimplified (3-tuple) forms
+ %% from the config and merge them with the generated built-in properties
+ NormalizedConfigServerProps =
+ [{<<"capabilities">>, table, server_capabilities(Protocol)} |
+ [case X of
+ {KeyAtom, Value} -> {list_to_binary(atom_to_list(KeyAtom)),
+ longstr,
+ maybe_list_to_binary(Value)};
+ {BinKey, Type, Value} -> {BinKey, Type, Value}
+ end || X <- RawConfigServerProps ++
+ [{product, Product},
+ {version, Version},
+ {cluster_name, rabbit_nodes:cluster_name()},
+ {platform, "Erlang/OTP"},
+ {copyright, ?COPYRIGHT_MESSAGE},
+ {information, ?INFORMATION_MESSAGE}]]],
+
+ %% Filter duplicated properties in favour of config file provided values
+ lists:usort(fun ({K1,_,_}, {K2,_,_}) -> K1 =< K2 end,
+ NormalizedConfigServerProps).
+
+maybe_list_to_binary(V) when is_binary(V) -> V;
+maybe_list_to_binary(V) when is_list(V) -> list_to_binary(V).
+
+server_capabilities(rabbit_framing_amqp_0_9_1) ->
+ [{<<"publisher_confirms">>, bool, true},
+ {<<"exchange_exchange_bindings">>, bool, true},
+ {<<"basic.nack">>, bool, true},
+ {<<"consumer_cancel_notify">>, bool, true},
+ {<<"connection.blocked">>, bool, true},
+ {<<"consumer_priorities">>, bool, true},
+ {<<"authentication_failure_close">>, bool, true},
+ {<<"per_consumer_qos">>, bool, true},
+ {<<"direct_reply_to">>, bool, true}];
+server_capabilities(_) ->
+ [].
+
+%%--------------------------------------------------------------------------
+
+log(Level, Fmt, Args) -> rabbit_log:log(connection, Level, Fmt, Args).
+
+socket_error(Reason) when is_atom(Reason) ->
+ log(error, "Error on AMQP connection ~p: ~s~n",
+ [self(), rabbit_misc:format_inet_error(Reason)]);
+socket_error(Reason) ->
+ Level =
+ case Reason of
+ {ssl_upgrade_error, closed} ->
+ %% The socket was closed while upgrading to SSL.
+ %% This is presumably a TCP healthcheck, so don't log
+ %% it unless specified otherwise.
+ debug;
+ _ ->
+ error
+ end,
+ log(Level, "Error on AMQP connection ~p:~n~p~n", [self(), Reason]).
+
+inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
+
+socket_op(Sock, Fun) ->
+ case Fun(Sock) of
+ {ok, Res} -> Res;
+ {error, Reason} -> socket_error(Reason),
+ rabbit_net:fast_close(Sock),
+ exit(normal)
+ end.
+
+start_connection(Parent, HelperSup, Deb, Sock) ->
+ process_flag(trap_exit, true),
+ Name = case rabbit_net:connection_string(Sock, inbound) of
+ {ok, Str} -> list_to_binary(Str);
+ {error, enotconn} -> rabbit_net:fast_close(Sock),
+ exit(normal);
+ {error, Reason} -> socket_error(Reason),
+ rabbit_net:fast_close(Sock),
+ exit(normal)
+ end,
+ {ok, HandshakeTimeout} = application:get_env(rabbit, handshake_timeout),
+ InitialFrameMax = application:get_env(rabbit, initial_frame_max, ?FRAME_MIN_SIZE),
+ erlang:send_after(HandshakeTimeout, self(), handshake_timeout),
+ {PeerHost, PeerPort, Host, Port} =
+ socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end),
+ ?store_proc_name(Name),
+ State = #v1{parent = Parent,
+ sock = Sock,
+ connection = #connection{
+ name = Name,
+ log_name = Name,
+ host = Host,
+ peer_host = PeerHost,
+ port = Port,
+ peer_port = PeerPort,
+ protocol = none,
+ user = none,
+ timeout_sec = (HandshakeTimeout / 1000),
+ frame_max = InitialFrameMax,
+ vhost = none,
+ client_properties = none,
+ capabilities = [],
+ auth_mechanism = none,
+ auth_state = none,
+ connected_at = time_compat:os_system_time(
+ milli_seconds)},
+ callback = uninitialized_callback,
+ recv_len = 0,
+ pending_recv = false,
+ connection_state = pre_init,
+ queue_collector = undefined, %% started on tune-ok
+ helper_sup = HelperSup,
+ heartbeater = none,
+ channel_sup_sup_pid = none,
+ channel_count = 0,
+ throttle = #throttle{
+ alarmed_by = [],
+ last_blocked_by = none,
+ last_blocked_at = never}},
+ try
+ case run({?MODULE, recvloop,
+ [Deb, [], 0, switch_callback(rabbit_event:init_stats_timer(
+ State, #v1.stats_timer),
+ handshake, 8)]}) of
+ %% connection was closed cleanly by the client
+ #v1{connection = #connection{user = #user{username = Username},
+ vhost = VHost}} ->
+ log(info, "closing AMQP connection ~p (~s, vhost: '~s', user: '~s')~n",
+ [self(), dynamic_connection_name(Name), VHost, Username]);
+ %% just to be more defensive
+ _ ->
+ log(info, "closing AMQP connection ~p (~s)~n",
+ [self(), dynamic_connection_name(Name)])
+ end
+ catch
+ Ex ->
+ log_connection_exception(dynamic_connection_name(Name), Ex)
+ after
+ %% We don't call gen_tcp:close/1 here since it waits for
+ %% pending output to be sent, which results in unnecessary
+ %% delays. We could just terminate - the reader is the
+ %% controlling process and hence its termination will close
+ %% the socket. However, to keep the file_handle_cache
+ %% accounting as accurate as possible we ought to close the
+ %% socket w/o delay before termination.
+ rabbit_net:fast_close(Sock),
+ rabbit_networking:unregister_connection(self()),
+ rabbit_core_metrics:connection_closed(self()),
+ rabbit_event:notify(connection_closed, [{pid, self()}])
+ end,
+ done.
+
+log_connection_exception(Name, Ex) ->
+ Severity = case Ex of
+ connection_closed_with_no_data_received -> debug;
+ {connection_closed_abruptly, _} -> warning;
+ connection_closed_abruptly -> warning;
+ _ -> error
+ end,
+ log_connection_exception(Severity, Name, Ex).
+
+log_connection_exception(Severity, Name, {heartbeat_timeout, TimeoutSec}) ->
+ %% Long line to avoid extra spaces and line breaks in log
+ log(Severity, "closing AMQP connection ~p (~s):~nmissed heartbeats from client, timeout: ~ps~n",
+ [self(), Name, TimeoutSec]);
+log_connection_exception(Severity, Name, {connection_closed_abruptly,
+ #v1{connection = #connection{user = #user{username = Username},
+ vhost = VHost}}}) ->
+ log(Severity, "closing AMQP connection ~p (~s, vhost: '~s', user: '~s'):~nclient unexpectedly closed TCP connection~n",
+ [self(), Name, VHost, Username]);
+%% when client abruptly closes connection before connection.open/authentication/authorization
+%% succeeded, don't log username and vhost as 'none'
+log_connection_exception(Severity, Name, {connection_closed_abruptly, _}) ->
+ log(Severity, "closing AMQP connection ~p (~s):~nclient unexpectedly closed TCP connection~n",
+ [self(), Name]);
+%% old exception structure
+log_connection_exception(Severity, Name, connection_closed_abruptly) ->
+ log(Severity, "closing AMQP connection ~p (~s):~nclient unexpectedly closed TCP connection~n",
+ [self(), Name]);
+log_connection_exception(Severity, Name, Ex) ->
+ log(Severity, "closing AMQP connection ~p (~s):~n~p~n",
+ [self(), Name, Ex]).
+
+run({M, F, A}) ->
+ try apply(M, F, A)
+ catch {become, MFA} -> run(MFA)
+ end.
+
+recvloop(Deb, Buf, BufLen, State = #v1{pending_recv = true}) ->
+ mainloop(Deb, Buf, BufLen, State);
+recvloop(Deb, Buf, BufLen, State = #v1{connection_state = blocked}) ->
+ mainloop(Deb, Buf, BufLen, State);
+recvloop(Deb, Buf, BufLen, State = #v1{connection_state = {become, F}}) ->
+ throw({become, F(Deb, Buf, BufLen, State)});
+recvloop(Deb, Buf, BufLen, State = #v1{sock = Sock, recv_len = RecvLen})
+ when BufLen < RecvLen ->
+ case rabbit_net:setopts(Sock, [{active, once}]) of
+ ok -> mainloop(Deb, Buf, BufLen,
+ State#v1{pending_recv = true});
+ {error, Reason} -> stop(Reason, State)
+ end;
+recvloop(Deb, [B], _BufLen, State) ->
+ {Rest, State1} = handle_input(State#v1.callback, B, State),
+ recvloop(Deb, [Rest], size(Rest), State1);
+recvloop(Deb, Buf, BufLen, State = #v1{recv_len = RecvLen}) ->
+ {DataLRev, RestLRev} = binlist_split(BufLen - RecvLen, Buf, []),
+ Data = list_to_binary(lists:reverse(DataLRev)),
+ {<<>>, State1} = handle_input(State#v1.callback, Data, State),
+ recvloop(Deb, lists:reverse(RestLRev), BufLen - RecvLen, State1).
+
+binlist_split(0, L, Acc) ->
+ {L, Acc};
+binlist_split(Len, L, [Acc0|Acc]) when Len < 0 ->
+ {H, T} = split_binary(Acc0, -Len),
+ {[H|L], [T|Acc]};
+binlist_split(Len, [H|T], Acc) ->
+ binlist_split(Len - size(H), T, [H|Acc]).
+
+mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock,
+ connection_state = CS,
+ connection = #connection{
+ name = ConnName}}) ->
+ Recv = rabbit_net:recv(Sock),
+ case CS of
+ pre_init when Buf =:= [] ->
+ %% We only log incoming connections when either the
+ %% first byte was received or there was an error (eg. a
+ %% timeout).
+ %%
+ %% The goal is to not log TCP healthchecks (a connection
+ %% with no data received) unless specified otherwise.
+ log(case Recv of
+ closed -> debug;
+ _ -> info
+ end, "accepting AMQP connection ~p (~s)~n",
+ [self(), ConnName]);
+ _ ->
+ ok
+ end,
+ case Recv of
+ {data, Data} ->
+ recvloop(Deb, [Data | Buf], BufLen + size(Data),
+ State#v1{pending_recv = false});
+ closed when State#v1.connection_state =:= closed ->
+ State;
+ closed when CS =:= pre_init andalso Buf =:= [] ->
+ stop(tcp_healthcheck, State);
+ closed ->
+ stop(closed, State);
+ {other, {heartbeat_send_error, Reason}} ->
+ %% The only portable way to detect disconnect on blocked
+ %% connection is to wait for heartbeat send failure.
+ stop(Reason, State);
+ {error, Reason} ->
+ stop(Reason, State);
+ {other, {system, From, Request}} ->
+ sys:handle_system_msg(Request, From, State#v1.parent,
+ ?MODULE, Deb, {Buf, BufLen, State});
+ {other, Other} ->
+ case handle_other(Other, State) of
+ stop -> State;
+ NewState -> recvloop(Deb, Buf, BufLen, NewState)
+ end
+ end.
+
+stop(tcp_healthcheck, State) ->
+ %% The connection was closed before any packet was received. It's
+ %% probably a load-balancer healthcheck: don't consider this a
+ %% failure.
+ maybe_emit_stats(State),
+ throw(connection_closed_with_no_data_received);
+stop(closed, State) ->
+ maybe_emit_stats(State),
+ throw({connection_closed_abruptly, State});
+stop(Reason, State) ->
+ maybe_emit_stats(State),
+ throw({inet_error, Reason}).
+
+handle_other({conserve_resources, Source, Conserve},
+ State = #v1{throttle = Throttle = #throttle{alarmed_by = CR}}) ->
+ CR1 = case Conserve of
+ true -> lists:usort([Source | CR]);
+ false -> CR -- [Source]
+ end,
+ State1 = control_throttle(
+ State#v1{throttle = Throttle#throttle{alarmed_by = CR1}}),
+ case {blocked_by_alarm(State), blocked_by_alarm(State1)} of
+ {false, true} -> ok = send_blocked(State1);
+ {true, false} -> ok = send_unblocked(State1);
+ {_, _} -> ok
+ end,
+ State1;
+handle_other({channel_closing, ChPid}, State) ->
+ ok = rabbit_channel:ready_for_close(ChPid),
+ {_, State1} = channel_cleanup(ChPid, State),
+ maybe_close(control_throttle(State1));
+handle_other({'EXIT', Parent, Reason}, State = #v1{parent = Parent}) ->
+ terminate(io_lib:format("broker forced connection closure "
+ "with reason '~w'", [Reason]), State),
+ %% this is what we are expected to do according to
+ %% http://www.erlang.org/doc/man/sys.html
+ %%
+ %% If we wanted to be *really* nice we should wait for a while for
+ %% clients to close the socket at their end, just as we do in the
+ %% ordinary error case. However, since this termination is
+ %% initiated by our parent it is probably more important to exit
+ %% quickly.
+ maybe_emit_stats(State),
+ exit(Reason);
+handle_other({channel_exit, _Channel, E = {writer, send_failed, _E}}, State) ->
+ maybe_emit_stats(State),
+ throw(E);
+handle_other({channel_exit, Channel, Reason}, State) ->
+ handle_exception(State, Channel, Reason);
+handle_other({'DOWN', _MRef, process, ChPid, Reason}, State) ->
+ handle_dependent_exit(ChPid, Reason, State);
+handle_other(terminate_connection, State) ->
+ maybe_emit_stats(State),
+ stop;
+handle_other(handshake_timeout, State)
+ when ?IS_RUNNING(State) orelse ?IS_STOPPING(State) ->
+ State;
+handle_other(handshake_timeout, State) ->
+ maybe_emit_stats(State),
+ throw({handshake_timeout, State#v1.callback});
+handle_other(heartbeat_timeout, State = #v1{connection_state = closed}) ->
+ State;
+handle_other(heartbeat_timeout,
+ State = #v1{connection = #connection{timeout_sec = T}}) ->
+ maybe_emit_stats(State),
+ throw({heartbeat_timeout, T});
+handle_other({'$gen_call', From, {shutdown, Explanation}}, State) ->
+ {ForceTermination, NewState} = terminate(Explanation, State),
+ gen_server:reply(From, ok),
+ case ForceTermination of
+ force -> stop;
+ normal -> NewState
+ end;
+handle_other({'$gen_call', From, info}, State) ->
+ gen_server:reply(From, infos(?INFO_KEYS, State)),
+ State;
+handle_other({'$gen_call', From, {info, Items}}, State) ->
+ gen_server:reply(From, try {ok, infos(Items, State)}
+ catch Error -> {error, Error}
+ end),
+ State;
+handle_other({'$gen_cast', {force_event_refresh, Ref}}, State)
+ when ?IS_RUNNING(State) ->
+ rabbit_event:notify(
+ connection_created,
+ [{type, network} | infos(?CREATION_EVENT_KEYS, State)], Ref),
+ rabbit_event:init_stats_timer(State, #v1.stats_timer);
+handle_other({'$gen_cast', {force_event_refresh, _Ref}}, State) ->
+ %% Ignore, we will emit a created event once we start running.
+ State;
+handle_other(ensure_stats, State) ->
+ ensure_stats_timer(State);
+handle_other(emit_stats, State) ->
+ emit_stats(State);
+handle_other({bump_credit, Msg}, State) ->
+ %% Here we are receiving credit by some channel process.
+ credit_flow:handle_bump_msg(Msg),
+ control_throttle(State);
+handle_other(Other, State) ->
+ %% internal error -> something worth dying for
+ maybe_emit_stats(State),
+ exit({unexpected_message, Other}).
+
+switch_callback(State, Callback, Length) ->
+ State#v1{callback = Callback, recv_len = Length}.
+
+terminate(Explanation, State) when ?IS_RUNNING(State) ->
+ {normal, handle_exception(State, 0,
+ rabbit_misc:amqp_error(
+ connection_forced, Explanation, [], none))};
+terminate(_Explanation, State) ->
+ {force, State}.
+
+control_throttle(State = #v1{connection_state = CS, throttle = Throttle}) ->
+ IsThrottled = ((Throttle#throttle.alarmed_by =/= []) orelse
+ credit_flow:blocked()),
+ case {CS, IsThrottled} of
+ {running, true} -> State#v1{connection_state = blocking};
+ {blocking, false} -> State#v1{connection_state = running};
+ {blocked, false} -> ok = rabbit_heartbeat:resume_monitor(
+ State#v1.heartbeater),
+ State#v1{connection_state = running};
+ {blocked, true} -> State#v1{throttle = update_last_blocked_by(
+ Throttle)};
+ {_, _} -> State
+ end.
+
+maybe_block(State = #v1{connection_state = blocking,
+ throttle = Throttle}) ->
+ ok = rabbit_heartbeat:pause_monitor(State#v1.heartbeater),
+ State1 = State#v1{connection_state = blocked,
+ throttle = update_last_blocked_by(
+ Throttle#throttle{
+ last_blocked_at =
+ time_compat:monotonic_time()})},
+ case {blocked_by_alarm(State), blocked_by_alarm(State1)} of
+ {false, true} -> ok = send_blocked(State1);
+ {_, _} -> ok
+ end,
+ State1;
+maybe_block(State) ->
+ State.
+
+
+blocked_by_alarm(#v1{connection_state = blocked,
+ throttle = #throttle{alarmed_by = CR}})
+ when CR =/= [] ->
+ true;
+blocked_by_alarm(#v1{}) ->
+ false.
+
+send_blocked(#v1{throttle = #throttle{alarmed_by = CR},
+ connection = #connection{protocol = Protocol,
+ capabilities = Capabilities},
+ sock = Sock}) ->
+ case rabbit_misc:table_lookup(Capabilities, <<"connection.blocked">>) of
+ {bool, true} ->
+ RStr = string:join([atom_to_list(A) || A <- CR], " & "),
+ Reason = list_to_binary(rabbit_misc:format("low on ~s", [RStr])),
+ ok = send_on_channel0(Sock, #'connection.blocked'{reason = Reason},
+ Protocol);
+ _ ->
+ ok
+ end.
+
+send_unblocked(#v1{connection = #connection{protocol = Protocol,
+ capabilities = Capabilities},
+ sock = Sock}) ->
+ case rabbit_misc:table_lookup(Capabilities, <<"connection.blocked">>) of
+ {bool, true} ->
+ ok = send_on_channel0(Sock, #'connection.unblocked'{}, Protocol);
+ _ ->
+ ok
+ end.
+
+update_last_blocked_by(Throttle = #throttle{alarmed_by = []}) ->
+ Throttle#throttle{last_blocked_by = flow};
+update_last_blocked_by(Throttle) ->
+ Throttle#throttle{last_blocked_by = resource}.
+
+%%--------------------------------------------------------------------------
+%% error handling / termination
+
+close_connection(State = #v1{queue_collector = Collector,
+ connection = #connection{
+ timeout_sec = TimeoutSec}}) ->
+ %% The spec says "Exclusive queues may only be accessed by the
+ %% current connection, and are deleted when that connection
+ %% closes." This does not strictly imply synchrony, but in
+ %% practice it seems to be what people assume.
+ clean_up_exclusive_queues(Collector),
+ %% We terminate the connection after the specified interval, but
+ %% no later than ?CLOSING_TIMEOUT seconds.
+ erlang:send_after((if TimeoutSec > 0 andalso
+ TimeoutSec < ?CLOSING_TIMEOUT -> TimeoutSec;
+ true -> ?CLOSING_TIMEOUT
+ end) * 1000, self(), terminate_connection),
+ State#v1{connection_state = closed}.
+
+%% queue collector will be undefined when connection
+%% tuning was never performed or didn't finish. In such cases
+%% there's also nothing to clean up.
+clean_up_exclusive_queues(undefined) ->
+ ok;
+
+clean_up_exclusive_queues(Collector) ->
+ rabbit_queue_collector:delete_all(Collector).
+
+handle_dependent_exit(ChPid, Reason, State) ->
+ {Channel, State1} = channel_cleanup(ChPid, State),
+ case {Channel, termination_kind(Reason)} of
+ {undefined, controlled} -> State1;
+ {undefined, uncontrolled} -> handle_uncontrolled_channel_close(ChPid),
+ exit({abnormal_dependent_exit,
+ ChPid, Reason});
+ {_, controlled} -> maybe_close(control_throttle(State1));
+ {_, uncontrolled} -> handle_uncontrolled_channel_close(ChPid),
+ State2 = handle_exception(
+ State1, Channel, Reason),
+ maybe_close(control_throttle(State2))
+ end.
+
+terminate_channels(#v1{channel_count = 0} = State) ->
+ State;
+terminate_channels(#v1{channel_count = ChannelCount} = State) ->
+ lists:foreach(fun rabbit_channel:shutdown/1, all_channels()),
+ Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * ChannelCount,
+ TimerRef = erlang:send_after(Timeout, self(), cancel_wait),
+ wait_for_channel_termination(ChannelCount, TimerRef, State).
+
+wait_for_channel_termination(0, TimerRef, State) ->
+ case erlang:cancel_timer(TimerRef) of
+ false -> receive
+ cancel_wait -> State
+ end;
+ _ -> State
+ end;
+wait_for_channel_termination(N, TimerRef,
+ State = #v1{connection_state = CS,
+ connection = #connection{
+ log_name = ConnName,
+ user = User,
+ vhost = VHost},
+ sock = Sock}) ->
+ receive
+ {'DOWN', _MRef, process, ChPid, Reason} ->
+ {Channel, State1} = channel_cleanup(ChPid, State),
+ case {Channel, termination_kind(Reason)} of
+ {undefined, _} ->
+ exit({abnormal_dependent_exit, ChPid, Reason});
+ {_, controlled} ->
+ wait_for_channel_termination(N-1, TimerRef, State1);
+ {_, uncontrolled} ->
+ log(error, "Error on AMQP connection ~p (~s, vhost: '~s',"
+ " user: '~s', state: ~p), channel ~p:"
+ "error while terminating:~n~p~n",
+ [self(), ConnName, VHost, User#user.username,
+ CS, Channel, Reason]),
+ handle_uncontrolled_channel_close(ChPid),
+ wait_for_channel_termination(N-1, TimerRef, State1)
+ end;
+ {'EXIT', Sock, _Reason} ->
+ [channel_cleanup(ChPid, State) || ChPid <- all_channels()],
+ exit(normal);
+ cancel_wait ->
+ exit(channel_termination_timeout)
+ end.
+
+maybe_close(State = #v1{connection_state = closing,
+ channel_count = 0,
+ connection = #connection{protocol = Protocol},
+ sock = Sock}) ->
+ NewState = close_connection(State),
+ ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol),
+ NewState;
+maybe_close(State) ->
+ State.
+
+termination_kind(normal) -> controlled;
+termination_kind(_) -> uncontrolled.
+
+format_hard_error(#amqp_error{name = N, explanation = E, method = M}) ->
+ io_lib:format("operation ~s caused a connection exception ~s: ~p", [M, N, E]);
+format_hard_error(Reason) ->
+ case io_lib:deep_char_list(Reason) of
+ true -> Reason;
+ false -> rabbit_misc:format("~p", [Reason])
+ end.
+
+log_hard_error(#v1{connection_state = CS,
+ connection = #connection{
+ log_name = ConnName,
+ user = User,
+ vhost = VHost}}, Channel, Reason) ->
+ log(error,
+ "Error on AMQP connection ~p (~s, vhost: '~s',"
+ " user: '~s', state: ~p), channel ~p:~n~s~n",
+ [self(), ConnName, VHost, User#user.username, CS, Channel, format_hard_error(Reason)]).
+
+handle_exception(State = #v1{connection_state = closed}, Channel, Reason) ->
+ log_hard_error(State, Channel, Reason),
+ State;
+handle_exception(State = #v1{connection = #connection{protocol = Protocol},
+ connection_state = CS},
+ Channel, Reason)
+ when ?IS_RUNNING(State) orelse CS =:= closing ->
+ respond_and_close(State, Channel, Protocol, Reason, Reason);
+%% authentication failure
+handle_exception(State = #v1{connection = #connection{protocol = Protocol,
+ log_name = ConnName,
+ capabilities = Capabilities},
+ connection_state = starting},
+ Channel, Reason = #amqp_error{name = access_refused,
+ explanation = ErrMsg}) ->
+ log(error,
+ "Error on AMQP connection ~p (~s, state: ~p):~n~s~n",
+ [self(), ConnName, starting, ErrMsg]),
+ %% respect authentication failure notification capability
+ case rabbit_misc:table_lookup(Capabilities,
+ <<"authentication_failure_close">>) of
+ {bool, true} ->
+ send_error_on_channel0_and_close(Channel, Protocol, Reason, State);
+ _ ->
+ close_connection(terminate_channels(State))
+ end;
+%% when loopback-only user tries to connect from a non-local host
+%% when user tries to access a vhost it has no permissions for
+handle_exception(State = #v1{connection = #connection{protocol = Protocol,
+ log_name = ConnName,
+ user = User},
+ connection_state = opening},
+ Channel, Reason = #amqp_error{name = not_allowed,
+ explanation = ErrMsg}) ->
+ log(error,
+ "Error on AMQP connection ~p (~s, user: '~s', state: ~p):~n~s~n",
+ [self(), ConnName, User#user.username, opening, ErrMsg]),
+ send_error_on_channel0_and_close(Channel, Protocol, Reason, State);
+handle_exception(State = #v1{connection = #connection{protocol = Protocol},
+ connection_state = CS = opening},
+ Channel, Reason = #amqp_error{}) ->
+ respond_and_close(State, Channel, Protocol, Reason,
+ {handshake_error, CS, Reason});
+%% when negotiation fails, e.g. due to channel_max being higher than the
+%% maxiumum allowed limit
+handle_exception(State = #v1{connection = #connection{protocol = Protocol,
+ log_name = ConnName,
+ user = User},
+ connection_state = tuning},
+ Channel, Reason = #amqp_error{name = not_allowed,
+ explanation = ErrMsg}) ->
+ log(error,
+ "Error on AMQP connection ~p (~s,"
+ " user: '~s', state: ~p):~n~s~n",
+ [self(), ConnName, User#user.username, tuning, ErrMsg]),
+ send_error_on_channel0_and_close(Channel, Protocol, Reason, State);
+handle_exception(State, Channel, Reason) ->
+ %% We don't trust the client at this point - force them to wait
+ %% for a bit so they can't DOS us with repeated failed logins etc.
+ timer:sleep(?SILENT_CLOSE_DELAY * 1000),
+ throw({handshake_error, State#v1.connection_state, Channel, Reason}).
+
+%% we've "lost sync" with the client and hence must not accept any
+%% more input
+fatal_frame_error(Error, Type, Channel, Payload, State) ->
+ frame_error(Error, Type, Channel, Payload, State),
+ %% grace period to allow transmission of error
+ timer:sleep(?SILENT_CLOSE_DELAY * 1000),
+ throw(fatal_frame_error).
+
+frame_error(Error, Type, Channel, Payload, State) ->
+ {Str, Bin} = payload_snippet(Payload),
+ handle_exception(State, Channel,
+ rabbit_misc:amqp_error(frame_error,
+ "type ~p, ~s octets = ~p: ~p",
+ [Type, Str, Bin, Error], none)).
+
+unexpected_frame(Type, Channel, Payload, State) ->
+ {Str, Bin} = payload_snippet(Payload),
+ handle_exception(State, Channel,
+ rabbit_misc:amqp_error(unexpected_frame,
+ "type ~p, ~s octets = ~p",
+ [Type, Str, Bin], none)).
+
+payload_snippet(Payload) when size(Payload) =< 16 ->
+ {"all", Payload};
+payload_snippet(<<Snippet:16/binary, _/binary>>) ->
+ {"first 16", Snippet}.
+
+%%--------------------------------------------------------------------------
+
+create_channel(_Channel,
+ #v1{channel_count = ChannelCount,
+ connection = #connection{channel_max = ChannelMax}})
+ when ChannelMax /= 0 andalso ChannelCount >= ChannelMax ->
+ {error, rabbit_misc:amqp_error(
+ not_allowed, "number of channels opened (~w) has reached the "
+ "negotiated channel_max (~w)",
+ [ChannelCount, ChannelMax], 'none')};
+create_channel(Channel,
+ #v1{sock = Sock,
+ queue_collector = Collector,
+ channel_sup_sup_pid = ChanSupSup,
+ channel_count = ChannelCount,
+ connection =
+ #connection{name = Name,
+ protocol = Protocol,
+ frame_max = FrameMax,
+ user = User,
+ vhost = VHost,
+ capabilities = Capabilities}} = State) ->
+ {ok, _ChSupPid, {ChPid, AState}} =
+ rabbit_channel_sup_sup:start_channel(
+ ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name,
+ Protocol, User, VHost, Capabilities, Collector}),
+ MRef = erlang:monitor(process, ChPid),
+ put({ch_pid, ChPid}, {Channel, MRef}),
+ put({channel, Channel}, {ChPid, AState}),
+ {ok, {ChPid, AState}, State#v1{channel_count = ChannelCount + 1}}.
+
+channel_cleanup(ChPid, State = #v1{channel_count = ChannelCount}) ->
+ case get({ch_pid, ChPid}) of
+ undefined -> {undefined, State};
+ {Channel, MRef} -> credit_flow:peer_down(ChPid),
+ erase({channel, Channel}),
+ erase({ch_pid, ChPid}),
+ erlang:demonitor(MRef, [flush]),
+ {Channel, State#v1{channel_count = ChannelCount - 1}}
+ end.
+
+all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()].
+
+%%--------------------------------------------------------------------------
+
+handle_frame(Type, 0, Payload,
+ State = #v1{connection = #connection{protocol = Protocol}})
+ when ?IS_STOPPING(State) ->
+ case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
+ {method, MethodName, FieldsBin} ->
+ handle_method0(MethodName, FieldsBin, State);
+ _Other -> State
+ end;
+handle_frame(Type, 0, Payload,
+ State = #v1{connection = #connection{protocol = Protocol}}) ->
+ case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
+ error -> frame_error(unknown_frame, Type, 0, Payload, State);
+ heartbeat -> State;
+ {method, MethodName, FieldsBin} ->
+ handle_method0(MethodName, FieldsBin, State);
+ _Other -> unexpected_frame(Type, 0, Payload, State)
+ end;
+handle_frame(Type, Channel, Payload,
+ State = #v1{connection = #connection{protocol = Protocol}})
+ when ?IS_RUNNING(State) ->
+ case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
+ error -> frame_error(unknown_frame, Type, Channel, Payload, State);
+ heartbeat -> unexpected_frame(Type, Channel, Payload, State);
+ Frame -> process_frame(Frame, Channel, State)
+ end;
+handle_frame(_Type, _Channel, _Payload, State) when ?IS_STOPPING(State) ->
+ State;
+handle_frame(Type, Channel, Payload, State) ->
+ unexpected_frame(Type, Channel, Payload, State).
+
+process_frame(Frame, Channel, State) ->
+ ChKey = {channel, Channel},
+ case (case get(ChKey) of
+ undefined -> create_channel(Channel, State);
+ Other -> {ok, Other, State}
+ end) of
+ {error, Error} ->
+ handle_exception(State, Channel, Error);
+ {ok, {ChPid, AState}, State1} ->
+ case rabbit_command_assembler:process(Frame, AState) of
+ {ok, NewAState} ->
+ put(ChKey, {ChPid, NewAState}),
+ post_process_frame(Frame, ChPid, State1);
+ {ok, Method, NewAState} ->
+ rabbit_channel:do(ChPid, Method),
+ put(ChKey, {ChPid, NewAState}),
+ post_process_frame(Frame, ChPid, State1);
+ {ok, Method, Content, NewAState} ->
+ rabbit_channel:do_flow(ChPid, Method, Content),
+ put(ChKey, {ChPid, NewAState}),
+ post_process_frame(Frame, ChPid, control_throttle(State1));
+ {error, Reason} ->
+ handle_exception(State1, Channel, Reason)
+ end
+ end.
+
+post_process_frame({method, 'channel.close_ok', _}, ChPid, State) ->
+ {_, State1} = channel_cleanup(ChPid, State),
+ %% This is not strictly necessary, but more obviously
+ %% correct. Also note that we do not need to call maybe_close/1
+ %% since we cannot possibly be in the 'closing' state.
+ control_throttle(State1);
+post_process_frame({content_header, _, _, _, _}, _ChPid, State) ->
+ maybe_block(State);
+post_process_frame({content_body, _}, _ChPid, State) ->
+ maybe_block(State);
+post_process_frame(_Frame, _ChPid, State) ->
+ State.
+
+%%--------------------------------------------------------------------------
+
+%% We allow clients to exceed the frame size a little bit since quite
+%% a few get it wrong - off-by 1 or 8 (empty frame size) are typical.
+-define(FRAME_SIZE_FUDGE, ?EMPTY_FRAME_SIZE).
+
+handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32, _/binary>>,
+ State = #v1{connection = #connection{frame_max = FrameMax}})
+ when FrameMax /= 0 andalso
+ PayloadSize > FrameMax - ?EMPTY_FRAME_SIZE + ?FRAME_SIZE_FUDGE ->
+ fatal_frame_error(
+ {frame_too_large, PayloadSize, FrameMax - ?EMPTY_FRAME_SIZE},
+ Type, Channel, <<>>, State);
+handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32,
+ Payload:PayloadSize/binary, ?FRAME_END,
+ Rest/binary>>,
+ State) ->
+ {Rest, ensure_stats_timer(handle_frame(Type, Channel, Payload, State))};
+handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32, Rest/binary>>,
+ State) ->
+ {Rest, ensure_stats_timer(
+ switch_callback(State,
+ {frame_payload, Type, Channel, PayloadSize},
+ PayloadSize + 1))};
+handle_input({frame_payload, Type, Channel, PayloadSize}, Data, State) ->
+ <<Payload:PayloadSize/binary, EndMarker, Rest/binary>> = Data,
+ case EndMarker of
+ ?FRAME_END -> State1 = handle_frame(Type, Channel, Payload, State),
+ {Rest, switch_callback(State1, frame_header, 7)};
+ _ -> fatal_frame_error({invalid_frame_end_marker, EndMarker},
+ Type, Channel, Payload, State)
+ end;
+handle_input(handshake, <<"AMQP", A, B, C, D, Rest/binary>>, State) ->
+ {Rest, handshake({A, B, C, D}, State)};
+handle_input(handshake, <<Other:8/binary, _/binary>>, #v1{sock = Sock}) ->
+ refuse_connection(Sock, {bad_header, Other});
+handle_input(Callback, Data, _State) ->
+ throw({bad_input, Callback, Data}).
+
+%% The two rules pertaining to version negotiation:
+%%
+%% * If the server cannot support the protocol specified in the
+%% protocol header, it MUST respond with a valid protocol header and
+%% then close the socket connection.
+%%
+%% * The server MUST provide a protocol version that is lower than or
+%% equal to that requested by the client in the protocol header.
+handshake({0, 0, 9, 1}, State) ->
+ start_connection({0, 9, 1}, rabbit_framing_amqp_0_9_1, State);
+
+%% This is the protocol header for 0-9, which we can safely treat as
+%% though it were 0-9-1.
+handshake({1, 1, 0, 9}, State) ->
+ start_connection({0, 9, 0}, rabbit_framing_amqp_0_9_1, State);
+
+%% This is what most clients send for 0-8. The 0-8 spec, confusingly,
+%% defines the version as 8-0.
+handshake({1, 1, 8, 0}, State) ->
+ start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, State);
+
+%% The 0-8 spec as on the AMQP web site actually has this as the
+%% protocol header; some libraries e.g., py-amqplib, send it when they
+%% want 0-8.
+handshake({1, 1, 9, 1}, State) ->
+ start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, State);
+
+%% ... and finally, the 1.0 spec is crystal clear!
+handshake({Id, 1, 0, 0}, State) ->
+ become_1_0(Id, State);
+
+handshake(Vsn, #v1{sock = Sock}) ->
+ refuse_connection(Sock, {bad_version, Vsn}).
+
+%% Offer a protocol version to the client. Connection.start only
+%% includes a major and minor version number, Luckily 0-9 and 0-9-1
+%% are similar enough that clients will be happy with either.
+start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision},
+ Protocol,
+ State = #v1{sock = Sock, connection = Connection}) ->
+ rabbit_networking:register_connection(self()),
+ Start = #'connection.start'{
+ version_major = ProtocolMajor,
+ version_minor = ProtocolMinor,
+ server_properties = server_properties(Protocol),
+ mechanisms = auth_mechanisms_binary(Sock),
+ locales = <<"en_US">> },
+ ok = send_on_channel0(Sock, Start, Protocol),
+ switch_callback(State#v1{connection = Connection#connection{
+ timeout_sec = ?NORMAL_TIMEOUT,
+ protocol = Protocol},
+ connection_state = starting},
+ frame_header, 7).
+
+refuse_connection(Sock, Exception, {A, B, C, D}) ->
+ ok = inet_op(fun () -> rabbit_net:send(Sock, <<"AMQP",A,B,C,D>>) end),
+ throw(Exception).
+
+-spec refuse_connection(rabbit_net:socket(), any()) -> no_return().
+
+refuse_connection(Sock, Exception) ->
+ refuse_connection(Sock, Exception, {0, 0, 9, 1}).
+
+ensure_stats_timer(State = #v1{connection_state = running}) ->
+ rabbit_event:ensure_stats_timer(State, #v1.stats_timer, emit_stats);
+ensure_stats_timer(State) ->
+ State.
+
+%%--------------------------------------------------------------------------
+
+handle_method0(MethodName, FieldsBin,
+ State = #v1{connection = #connection{protocol = Protocol}}) ->
+ try
+ handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin),
+ State)
+ catch throw:{inet_error, E} when E =:= closed; E =:= enotconn ->
+ maybe_emit_stats(State),
+ throw({connection_closed_abruptly, State});
+ exit:#amqp_error{method = none} = Reason ->
+ handle_exception(State, 0, Reason#amqp_error{method = MethodName});
+ Type:Reason ->
+ Stack = erlang:get_stacktrace(),
+ handle_exception(State, 0, {Type, Reason, MethodName, Stack})
+ end.
+
+handle_method0(#'connection.start_ok'{mechanism = Mechanism,
+ response = Response,
+ client_properties = ClientProperties},
+ State0 = #v1{connection_state = starting,
+ connection = Connection0,
+ sock = Sock}) ->
+ AuthMechanism = auth_mechanism_to_module(Mechanism, Sock),
+ Capabilities =
+ case rabbit_misc:table_lookup(ClientProperties, <<"capabilities">>) of
+ {table, Capabilities1} -> Capabilities1;
+ _ -> []
+ end,
+ Connection1 = Connection0#connection{
+ client_properties = ClientProperties,
+ capabilities = Capabilities,
+ auth_mechanism = {Mechanism, AuthMechanism},
+ auth_state = AuthMechanism:init(Sock)},
+ Connection2 = augment_connection_log_name(Connection1),
+ State = State0#v1{connection_state = securing,
+ connection = Connection2},
+ auth_phase(Response, State);
+
+handle_method0(#'connection.secure_ok'{response = Response},
+ State = #v1{connection_state = securing}) ->
+ auth_phase(Response, State);
+
+handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
+ channel_max = ChannelMax,
+ heartbeat = ClientHeartbeat},
+ State = #v1{connection_state = tuning,
+ connection = Connection,
+ helper_sup = SupPid,
+ sock = Sock}) ->
+ ok = validate_negotiated_integer_value(
+ frame_max, ?FRAME_MIN_SIZE, FrameMax),
+ ok = validate_negotiated_integer_value(
+ channel_max, ?CHANNEL_MIN, ChannelMax),
+ {ok, Collector} = rabbit_connection_helper_sup:start_queue_collector(
+ SupPid, Connection#connection.name),
+ Frame = rabbit_binary_generator:build_heartbeat_frame(),
+ Parent = self(),
+ SendFun =
+ fun() ->
+ case catch rabbit_net:send(Sock, Frame) of
+ ok ->
+ ok;
+ {error, Reason} ->
+ Parent ! {heartbeat_send_error, Reason};
+ Unexpected ->
+ Parent ! {heartbeat_send_error, Unexpected}
+ end,
+ ok
+ end,
+ ReceiveFun = fun() -> Parent ! heartbeat_timeout end,
+ Heartbeater = rabbit_heartbeat:start(
+ SupPid, Sock, Connection#connection.name,
+ ClientHeartbeat, SendFun, ClientHeartbeat, ReceiveFun),
+ State#v1{connection_state = opening,
+ connection = Connection#connection{
+ frame_max = FrameMax,
+ channel_max = ChannelMax,
+ timeout_sec = ClientHeartbeat},
+ queue_collector = Collector,
+ heartbeater = Heartbeater};
+
+handle_method0(#'connection.open'{virtual_host = VHost},
+ State = #v1{connection_state = opening,
+ connection = Connection = #connection{
+ log_name = ConnName,
+ user = User = #user{username = Username},
+ protocol = Protocol},
+ helper_sup = SupPid,
+ sock = Sock,
+ throttle = Throttle}) ->
+ ok = rabbit_access_control:check_vhost_access(User, VHost, Sock),
+ NewConnection = Connection#connection{vhost = VHost},
+ ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
+ Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
+ Throttle1 = Throttle#throttle{alarmed_by = Conserve},
+ {ok, ChannelSupSupPid} =
+ rabbit_connection_helper_sup:start_channel_sup_sup(SupPid),
+ State1 = control_throttle(
+ State#v1{connection_state = running,
+ connection = NewConnection,
+ channel_sup_sup_pid = ChannelSupSupPid,
+ throttle = Throttle1}),
+ Infos = [{type, network} | infos(?CREATION_EVENT_KEYS, State1)],
+ rabbit_core_metrics:connection_created(proplists:get_value(pid, Infos),
+ Infos),
+ rabbit_event:notify(connection_created, Infos),
+ maybe_emit_stats(State1),
+ log(info, "connection ~p (~s): user '~s' authenticated and granted access to vhost '~s'~n",
+ [self(), ConnName, Username, VHost]),
+ State1;
+handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) ->
+ lists:foreach(fun rabbit_channel:shutdown/1, all_channels()),
+ maybe_close(State#v1{connection_state = closing});
+handle_method0(#'connection.close'{},
+ State = #v1{connection = #connection{protocol = Protocol},
+ sock = Sock})
+ when ?IS_STOPPING(State) ->
+ %% We're already closed or closing, so we don't need to cleanup
+ %% anything.
+ ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol),
+ State;
+handle_method0(#'connection.close_ok'{},
+ State = #v1{connection_state = closed}) ->
+ self() ! terminate_connection,
+ State;
+handle_method0(_Method, State) when ?IS_STOPPING(State) ->
+ State;
+handle_method0(_Method, #v1{connection_state = S}) ->
+ rabbit_misc:protocol_error(
+ channel_error, "unexpected method in connection state ~w", [S]).
+
+validate_negotiated_integer_value(Field, Min, ClientValue) ->
+ ServerValue = get_env(Field),
+ if ClientValue /= 0 andalso ClientValue < Min ->
+ fail_negotiation(Field, min, Min, ClientValue);
+ ServerValue /= 0 andalso (ClientValue =:= 0 orelse
+ ClientValue > ServerValue) ->
+ fail_negotiation(Field, max, ServerValue, ClientValue);
+ true ->
+ ok
+ end.
+
+%% keep dialyzer happy
+-spec fail_negotiation(atom(), 'min' | 'max', integer(), integer()) ->
+ no_return().
+fail_negotiation(Field, MinOrMax, ServerValue, ClientValue) ->
+ {S1, S2} = case MinOrMax of
+ min -> {lower, minimum};
+ max -> {higher, maximum}
+ end,
+ rabbit_misc:protocol_error(
+ not_allowed, "negotiated ~w = ~w is ~w than the ~w allowed value (~w)",
+ [Field, ClientValue, S1, S2, ServerValue], 'connection.tune').
+
+get_env(Key) ->
+ {ok, Value} = application:get_env(rabbit, Key),
+ Value.
+
+send_on_channel0(Sock, Method, Protocol) ->
+ ok = rabbit_writer:internal_send_command(Sock, 0, Method, Protocol).
+
+auth_mechanism_to_module(TypeBin, Sock) ->
+ case rabbit_registry:binary_to_type(TypeBin) of
+ {error, not_found} ->
+ rabbit_misc:protocol_error(
+ command_invalid, "unknown authentication mechanism '~s'",
+ [TypeBin]);
+ T ->
+ case {lists:member(T, auth_mechanisms(Sock)),
+ rabbit_registry:lookup_module(auth_mechanism, T)} of
+ {true, {ok, Module}} ->
+ Module;
+ _ ->
+ rabbit_misc:protocol_error(
+ command_invalid,
+ "invalid authentication mechanism '~s'", [T])
+ end
+ end.
+
+auth_mechanisms(Sock) ->
+ {ok, Configured} = application:get_env(auth_mechanisms),
+ [Name || {Name, Module} <- rabbit_registry:lookup_all(auth_mechanism),
+ Module:should_offer(Sock), lists:member(Name, Configured)].
+
+auth_mechanisms_binary(Sock) ->
+ list_to_binary(
+ string:join([atom_to_list(A) || A <- auth_mechanisms(Sock)], " ")).
+
+auth_phase(Response,
+ State = #v1{connection = Connection =
+ #connection{protocol = Protocol,
+ auth_mechanism = {Name, AuthMechanism},
+ auth_state = AuthState},
+ sock = Sock}) ->
+ case AuthMechanism:handle_response(Response, AuthState) of
+ {refused, Username, Msg, Args} ->
+ auth_fail(Username, Msg, Args, Name, State);
+ {protocol_error, Msg, Args} ->
+ notify_auth_result(none, user_authentication_failure,
+ [{error, rabbit_misc:format(Msg, Args)}],
+ State),
+ rabbit_misc:protocol_error(syntax_error, Msg, Args);
+ {challenge, Challenge, AuthState1} ->
+ Secure = #'connection.secure'{challenge = Challenge},
+ ok = send_on_channel0(Sock, Secure, Protocol),
+ State#v1{connection = Connection#connection{
+ auth_state = AuthState1}};
+ {ok, User = #user{username = Username}} ->
+ case rabbit_access_control:check_user_loopback(Username, Sock) of
+ ok ->
+ notify_auth_result(Username, user_authentication_success,
+ [], State);
+ not_allowed ->
+ auth_fail(Username, "user '~s' can only connect via "
+ "localhost", [Username], Name, State)
+ end,
+ Tune = #'connection.tune'{frame_max = get_env(frame_max),
+ channel_max = get_env(channel_max),
+ heartbeat = get_env(heartbeat)},
+ ok = send_on_channel0(Sock, Tune, Protocol),
+ State#v1{connection_state = tuning,
+ connection = Connection#connection{user = User,
+ auth_state = none}}
+ end.
+
+-spec auth_fail
+ (rabbit_types:username() | none, string(), [any()], binary(), #v1{}) ->
+ no_return().
+
+auth_fail(Username, Msg, Args, AuthName,
+ State = #v1{connection = #connection{protocol = Protocol,
+ capabilities = Capabilities}}) ->
+ notify_auth_result(Username, user_authentication_failure,
+ [{error, rabbit_misc:format(Msg, Args)}], State),
+ AmqpError = rabbit_misc:amqp_error(
+ access_refused, "~s login refused: ~s",
+ [AuthName, io_lib:format(Msg, Args)], none),
+ case rabbit_misc:table_lookup(Capabilities,
+ <<"authentication_failure_close">>) of
+ {bool, true} ->
+ SafeMsg = io_lib:format(
+ "Login was refused using authentication "
+ "mechanism ~s. For details see the broker "
+ "logfile.", [AuthName]),
+ AmqpError1 = AmqpError#amqp_error{explanation = SafeMsg},
+ {0, CloseMethod} = rabbit_binary_generator:map_exception(
+ 0, AmqpError1, Protocol),
+ ok = send_on_channel0(State#v1.sock, CloseMethod, Protocol);
+ _ -> ok
+ end,
+ rabbit_misc:protocol_error(AmqpError).
+
+notify_auth_result(Username, AuthResult, ExtraProps, State) ->
+ EventProps = [{connection_type, network},
+ {name, case Username of none -> ''; _ -> Username end}] ++
+ [case Item of
+ name -> {connection_name, i(name, State)};
+ _ -> {Item, i(Item, State)}
+ end || Item <- ?AUTH_NOTIFICATION_INFO_KEYS] ++
+ ExtraProps,
+ rabbit_event:notify(AuthResult, [P || {_, V} = P <- EventProps, V =/= '']).
+
+%%--------------------------------------------------------------------------
+
+infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
+
+i(pid, #v1{}) -> self();
+i(SockStat, S) when SockStat =:= recv_oct;
+ SockStat =:= recv_cnt;
+ SockStat =:= send_oct;
+ SockStat =:= send_cnt;
+ SockStat =:= send_pend ->
+ socket_info(fun (Sock) -> rabbit_net:getstat(Sock, [SockStat]) end,
+ fun ([{_, I}]) -> I end, S);
+i(ssl, #v1{sock = Sock}) -> rabbit_net:is_ssl(Sock);
+i(ssl_protocol, S) -> ssl_info(fun ({P, _}) -> P end, S);
+i(ssl_key_exchange, S) -> ssl_info(fun ({_, {K, _, _}}) -> K end, S);
+i(ssl_cipher, S) -> ssl_info(fun ({_, {_, C, _}}) -> C end, S);
+i(ssl_hash, S) -> ssl_info(fun ({_, {_, _, H}}) -> H end, S);
+i(peer_cert_issuer, S) -> cert_info(fun rabbit_ssl:peer_cert_issuer/1, S);
+i(peer_cert_subject, S) -> cert_info(fun rabbit_ssl:peer_cert_subject/1, S);
+i(peer_cert_validity, S) -> cert_info(fun rabbit_ssl:peer_cert_validity/1, S);
+i(channels, #v1{channel_count = ChannelCount}) -> ChannelCount;
+i(state, #v1{connection_state = ConnectionState,
+ throttle = #throttle{alarmed_by = Alarms,
+ last_blocked_by = WasBlockedBy,
+ last_blocked_at = T}}) ->
+ case Alarms =:= [] andalso %% not throttled by resource alarms
+ (credit_flow:blocked() %% throttled by flow now
+ orelse %% throttled by flow recently
+ (WasBlockedBy =:= flow andalso T =/= never andalso
+ time_compat:convert_time_unit(time_compat:monotonic_time() - T,
+ native,
+ micro_seconds) < 5000000)) of
+ true -> flow;
+ false -> ConnectionState
+ end;
+i(garbage_collection, _State) ->
+ rabbit_misc:get_gc_info(self());
+i(reductions, _State) ->
+ {reductions, Reductions} = erlang:process_info(self(), reductions),
+ Reductions;
+i(Item, #v1{connection = Conn}) -> ic(Item, Conn).
+
+ic(name, #connection{name = Name}) -> Name;
+ic(host, #connection{host = Host}) -> Host;
+ic(peer_host, #connection{peer_host = PeerHost}) -> PeerHost;
+ic(port, #connection{port = Port}) -> Port;
+ic(peer_port, #connection{peer_port = PeerPort}) -> PeerPort;
+ic(protocol, #connection{protocol = none}) -> none;
+ic(protocol, #connection{protocol = P}) -> P:version();
+ic(user, #connection{user = none}) -> '';
+ic(user, #connection{user = U}) -> U#user.username;
+ic(vhost, #connection{vhost = VHost}) -> VHost;
+ic(timeout, #connection{timeout_sec = Timeout}) -> Timeout;
+ic(frame_max, #connection{frame_max = FrameMax}) -> FrameMax;
+ic(channel_max, #connection{channel_max = ChMax}) -> ChMax;
+ic(client_properties, #connection{client_properties = CP}) -> CP;
+ic(auth_mechanism, #connection{auth_mechanism = none}) -> none;
+ic(auth_mechanism, #connection{auth_mechanism = {Name, _Mod}}) -> Name;
+ic(connected_at, #connection{connected_at = T}) -> T;
+ic(Item, #connection{}) -> throw({bad_argument, Item}).
+
+socket_info(Get, Select, #v1{sock = Sock}) ->
+ case Get(Sock) of
+ {ok, T} -> case Select(T) of
+ N when is_number(N) -> N;
+ _ -> 0
+ end;
+ {error, _} -> 0
+ end.
+
+ssl_info(F, #v1{sock = Sock}) ->
+ case rabbit_net:ssl_info(Sock) of
+ nossl -> '';
+ {error, _} -> '';
+ {ok, Items} ->
+ P = proplists:get_value(protocol, Items),
+ CS = proplists:get_value(cipher_suite, Items),
+ %% The first form is R14.
+ %% The second is R13 - the extra term is exportability (by
+ %% inspection, the docs are wrong).
+ case CS of
+ {K, C, H} -> F({P, {K, C, H}});
+ {K, C, H, _} -> F({P, {K, C, H}})
+ end
+ end.
+
+cert_info(F, #v1{sock = Sock}) ->
+ case rabbit_net:peercert(Sock) of
+ nossl -> '';
+ {error, _} -> '';
+ {ok, Cert} -> list_to_binary(F(Cert))
+ end.
+
+maybe_emit_stats(State) ->
+ rabbit_event:if_enabled(State, #v1.stats_timer,
+ fun() -> emit_stats(State) end).
+
+emit_stats(State) ->
+ [{_, Pid}, {_, Recv_oct}, {_, Send_oct}, {_, Reductions}] = I
+ = infos(?SIMPLE_METRICS, State),
+ Infos = infos(?OTHER_METRICS, State),
+ rabbit_core_metrics:connection_stats(Pid, Infos),
+ rabbit_core_metrics:connection_stats(Pid, Recv_oct, Send_oct, Reductions),
+ rabbit_event:notify(connection_stats, Infos ++ I),
+ State1 = rabbit_event:reset_stats_timer(State, #v1.stats_timer),
+ ensure_stats_timer(State1).
+
+%% 1.0 stub
+-spec become_1_0(non_neg_integer(), #v1{}) -> no_return().
+
+become_1_0(Id, State = #v1{sock = Sock}) ->
+ case code:is_loaded(rabbit_amqp1_0_reader) of
+ false -> refuse_connection(Sock, amqp1_0_plugin_not_enabled);
+ _ -> Mode = case Id of
+ 0 -> amqp;
+ 3 -> sasl;
+ _ -> refuse_connection(
+ Sock, {unsupported_amqp1_0_protocol_id, Id},
+ {3, 1, 0, 0})
+ end,
+ F = fun (_Deb, Buf, BufLen, S) ->
+ {rabbit_amqp1_0_reader, init,
+ [Mode, pack_for_1_0(Buf, BufLen, S)]}
+ end,
+ State#v1{connection_state = {become, F}}
+ end.
+
+pack_for_1_0(Buf, BufLen, #v1{parent = Parent,
+ sock = Sock,
+ recv_len = RecvLen,
+ pending_recv = PendingRecv,
+ helper_sup = SupPid}) ->
+ {Parent, Sock, RecvLen, PendingRecv, SupPid, Buf, BufLen}.
+
+respond_and_close(State, Channel, Protocol, Reason, LogErr) ->
+ log_hard_error(State, Channel, LogErr),
+ send_error_on_channel0_and_close(Channel, Protocol, Reason, State).
+
+send_error_on_channel0_and_close(Channel, Protocol, Reason, State) ->
+ {0, CloseMethod} =
+ rabbit_binary_generator:map_exception(Channel, Reason, Protocol),
+ State1 = close_connection(terminate_channels(State)),
+ ok = send_on_channel0(State#v1.sock, CloseMethod, Protocol),
+ State1.
+
+augment_connection_log_name(#connection{client_properties = ClientProperties,
+ name = Name} = Connection) ->
+ case rabbit_misc:table_lookup(ClientProperties, <<"connection_name">>) of
+ {longstr, UserSpecifiedName} ->
+ LogName = <<Name/binary, " - ", UserSpecifiedName/binary>>,
+ log(info, "Connection ~p (~s) has a client-provided name: ~s~n", [self(), Name, UserSpecifiedName]),
+ ?store_proc_name(LogName),
+ Connection#connection{log_name = LogName};
+ _ ->
+ Connection
+ end.
+
+dynamic_connection_name(Default) ->
+ case rabbit_misc:get_proc_name() of
+ {ok, Name} ->
+ Name;
+ _ ->
+ Default
+ end.
+
+handle_uncontrolled_channel_close(ChPid) ->
+ rabbit_core_metrics:channel_closed(ChPid),
+ rabbit_event:notify(channel_closed, [{pid, ChPid}]).