summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <mklishin@pivotal.io>2016-07-22 05:33:09 +0300
committerMichael Klishin <mklishin@pivotal.io>2016-07-22 05:33:09 +0300
commit9adb6cd23905ce01d987ec3dfdef515768b4d208 (patch)
treec337af63b4e2c882bc1f82213988b348341291bc /src
parent6ad34a433978b1cb6eaf4db481ac55e4382b3ef4 (diff)
downloadrabbitmq-server-git-9adb6cd23905ce01d987ec3dfdef515768b4d208.tar.gz
Keep track of connections, introduce per-vhost limits
Fixes #500, #627. Squashed commit of the following: commit 88036dccbb28828ceed39d793b13a2d3d3b99b80 Author: Michael Klishin <mklishin@pivotal.io> Date: Thu Jul 21 03:31:25 2016 +0300 Refactor commit fc84b7a23735352da4cf95726b430fad984b837d Merge: df745e2 df28c63 Author: Michael Klishin <mklishin@pivotal.io> Date: Wed Jul 20 18:30:19 2016 +0300 Merge branch 'master' into rabbitmq-server-500 commit df745e2544824b882d174b99d1d4470d05ac78c8 Author: Michael Klishin <mklishin@pivotal.io> Date: Wed Jul 20 18:04:59 2016 +0300 Force close connections when vhost is deleted Fixes #627, related to #500. commit 2167f8ffebe9473af482816822bb30a0694a1f3e Author: Michael Klishin <mklishin@pivotal.io> Date: Wed Jul 20 16:00:35 2016 +0300 Add tests for per-vhost connection limits commit 2a032a3ac9cc3b01b07692456590e213e5d28806 Author: Michael Klishin <mklishin@pivotal.io> Date: Wed Jul 20 01:53:07 2016 +0300 Rename a few tests commit 86ce592db1516bb216d6dc45326ea80f55d14a30 Author: Michael Klishin <mklishin@pivotal.io> Date: Wed Jul 20 01:44:10 2016 +0300 Tests for connection re-registration idempotency commit a774c7bebe0d91c18af1a2035697c431dca28d89 Author: Michael Klishin <mklishin@pivotal.io> Date: Tue Jul 19 04:05:20 2016 +0300 Ask nodes that come back to re-register their connections Depending on the partition handling mode used there may or may not be any clients still connected. We make sure that registration and deregistration functions are idempotent and assume there may be connections on the node that has come back. Point of improvement: when a node comes back up, N-1 nodes will tell it to re-register connections. It could be fewer than N-1, ideally just 1. commit 24e4c0e690f192d138e70000ada6335671275f0b Author: Michael Klishin <mklishin@pivotal.io> Date: Mon Jul 18 17:05:17 2016 +0300 Fix boot step commit 62da3c6a73b96d4a07e69a9b1f5519ae708817a6 Author: Michael Klishin <mklishin@pivotal.io> Date: Mon Jul 18 11:16:21 2016 +0300 Compile commit b656f9e96bc4a55072c2092289c52e972c44f754 Merge: f2831e1 492406e Author: Michael Klishin <mklishin@pivotal.io> Date: Thu Jul 14 15:25:49 2016 +0300 Merge branch 'master' into rabbitmq-server-500 commit f2831e14cd12fc242b5280326ce1cf28a1fc9766 Merge: e5858e9 7b10a4e Author: Michael Klishin <mklishin@pivotal.io> Date: Thu Jul 7 13:45:31 2016 +0300 Merge branch 'master' into rabbitmq-server-500 commit e5858e971825d50032e41794f68692c3d9ffa381 Author: Michael Klishin <mklishin@pivotal.io> Date: Wed Jul 6 12:32:56 2016 +0300 Towards working connection re-registration after (inter-node) network splits commit 548df732f17d4e30268cef4c1b2046b8c03613ef Author: Michael Klishin <mklishin@pivotal.io> Date: Wed Jul 6 12:32:07 2016 +0300 Make network split simulation work as expected commit 4028c660b96b31123f3a932d17d7b8a23b08cfb6 Author: Michael Klishin <mklishin@pivotal.io> Date: Tue Jul 5 14:43:37 2016 +0300 Close connections using rabbit_ct_client_helpers Per discussion with @dumbbell. commit 26fecc97aa6c4368ecc7dba4464cca8f9ea08cfa Author: Michael Klishin <mklishin@pivotal.io> Date: Tue Jul 5 04:17:52 2016 +0300 Extract connection limit partition tests into a separate suite commit 8a466f1b61e3cb07ba639d5f09d260223c0ff0a4 Author: Michael Klishin <mklishin@pivotal.io> Date: Tue Jul 5 04:17:41 2016 +0300 Better logging commit b06de9b26ee13742290f13b32e293a990c3d5192 Author: Michael Klishin <mklishin@pivotal.io> Date: Mon Jul 4 02:54:54 2016 +0300 Modify a test so that it (expectedly) fails commit 078a78ae00a88566e2b7068a63457e58e149e09e Author: Michael Klishin <mklishin@pivotal.io> Date: Mon Jul 4 02:44:58 2016 +0300 Towards covering node termination/unavailability in connection tracking commit ab99361041fd58371ddd0c1a76ab2a37a3c47142 Author: Michael Klishin <mklishin@pivotal.io> Date: Sun Jul 3 15:25:10 2016 +0300 These are moved to rabbit_ct_broker_helpers commit 520b6ef2b268e263fe4a0d33fbc578343c7ebf83 Author: Michael Klishin <mklishin@pivotal.io> Date: Sun Jul 3 03:54:52 2016 +0300 {allow,block}_traffic_between/2 are moved to rabbit_ct_broker_helpers commit b842eaa616d55ca813012d6afc3e3d9d85acb46f Merge: 26eb1fa d4f031e Author: Michael Klishin <mklishin@pivotal.io> Date: Sun Jul 3 03:14:27 2016 +0300 Merge branch 'master' into rabbitmq-server-500 commit 26eb1fa0ede083f67f7f7177064f0274ebcd8530 Author: Michael Klishin <mklishin@pivotal.io> Date: Sun Jul 3 02:39:09 2016 +0300 dist_proxy helpers moved to rabbit_ct_broker_helpers commit 3d741f445be053222eaa73c973257114c17aea1c Author: Michael Klishin <mklishin@pivotal.io> Date: Sun Jul 3 01:28:44 2016 +0300 Cluster node shutdown test commit 57c7129edf583120d3f20702ea68a8c2a73cf136 Author: Michael Klishin <mklishin@pivotal.io> Date: Sat Jul 2 23:01:46 2016 +0300 Refactor commit b736b30724828027d77a34cddf9f4bcb17b1773d Author: Michael Klishin <mklishin@pivotal.io> Date: Sat Jul 2 22:49:42 2016 +0300 More tests commit dc1cb5f0797cba5840fc1fa5e98a0d688383c713 Author: Michael Klishin <mklishin@pivotal.io> Date: Sat Jul 2 22:27:16 2016 +0300 More tests commit e94edfed7a19add20545f40cd9dc78562546e163 Author: Michael Klishin <mklishin@pivotal.io> Date: Sat Jul 2 17:08:34 2016 +0300 Initial per-vhost connection limit tests commit 15b7b4e271eedea9ce02bcbbd766609fa9fe970d Author: Michael Klishin <mklishin@pivotal.io> Date: Sat Jul 2 15:04:57 2016 +0300 Adapt to master, compile commit dc7f3337a8a0a268c42857becdd92640deddb1a4 Merge: e4884ff bb1fa55 Author: Michael Klishin <mklishin@pivotal.io> Date: Sat Jul 2 02:44:18 2016 +0300 Merge branch 'master' into rabbitmq-server-500 commit e4884ffb29452fcdbd11ace1ac7b4d1b7d506b03 Merge: 71e2710 f0f43f8 Author: Michael Klishin <mklishin@pivotal.io> Date: Wed Jun 29 14:27:40 2016 +0300 Merge branch 'master' into rabbitmq-server-500 commit 71e2710948d3a531c5426b18d63367fddf98ff55 Merge: b1ec9f3 704a2b5 Author: Michael Klishin <michael@clojurewerkz.org> Date: Thu Mar 31 01:55:29 2016 +0300 Merge branch 'master' into rabbitmq-server-500 Conflicts: src/rabbit_control_main.erl src/rabbit_types.erl commit b1ec9f30c4b896f227d3e8800d60b2934996f39e Author: Michael Klishin <mklishin@pivotal.io> Date: Mon Feb 15 13:51:37 2016 +0300 Stub out event handlers for #627 and #628 commit f3cfb57e2e83436e51d7d065edd04cc9197b6539 Author: Michael Klishin <michael@clojurewerkz.org> Date: Sat Feb 13 01:33:50 2016 +0300 Use a counter column to track number of connections per vhost Limit query time is now 50-70 microseconds for 50M connections. commit e9132f11253972ff6b0cc09529e9cd39ba67c140 Author: Michael Klishin <michael@clojurewerkz.org> Date: Fri Feb 12 06:23:51 2016 +0300 Ignore ./debug commit 976e3ae10ba7e81c3ead1d2bd1fc852b0f9e3004 Author: Michael Klishin <michael@clojurewerkz.org> Date: Fri Feb 12 06:20:01 2016 +0300 Switch to ets:select_count/2 commit ec23cf15ad840da46c321e965081815173989197 Author: Michael Klishin <michael@clojurewerkz.org> Date: Thu Feb 11 05:11:08 2016 +0300 Enforce max connection limit Also introduce `rabbitmqctl clear_vhost_limits` and fix rabbitmqctl(1). commit ba20887832170f9c9b88e19cad06e0995f887b5b Merge: 49a1886 8974581 Author: Michael Klishin <michael@clojurewerkz.org> Date: Thu Feb 11 02:16:24 2016 +0300 Merge branch 'master' into rabbitmq-server-500 commit 49a18867faf6e38c08ad12635e716d7ff53b0529 Author: Michael Klishin <michael@clojurewerkz.org> Date: Wed Feb 10 16:45:00 2016 +0300 Spelling commit 723e6e4e412b81e47f3a9db5451a1342debcf79e Author: Michael Klishin <michael@clojurewerkz.org> Date: Wed Feb 10 16:31:34 2016 +0300 Create secondary indices on rabbit_tracked_connection.vhost and username commit b468c0fa05ce51cebff19098b500ccc217542f9f Merge: 6940d05 0120438 Author: Michael Klishin <michael@clojurewerkz.org> Date: Wed Feb 10 12:23:38 2016 +0300 Merge branch 'master' into rabbitmq-server-500 commit 6940d059cf3f844b2aab9cfe00652f23f9a6ad06 Author: Michael Klishin <michael@clojurewerkz.org> Date: Mon Feb 8 01:30:03 2016 +0300 Spam commit 032c2a67f59889dbec6a3ca6c5af23920a0d7cb2 Merge: 46da39c 2374ae8 Author: Michael Klishin <michael@clojurewerkz.org> Date: Fri Feb 5 23:48:38 2016 +0300 Merge branch 'master' into rabbitmq-server-500 commit 46da39c5f5e801939a61b476e577636d30eb6e54 Merge: 655e351 05361e6 Author: Michael Klishin <michael@clojurewerkz.org> Date: Wed Feb 3 11:20:08 2016 +0300 Merge branch 'master' into rabbitmq-server-500 commit 655e3512a031c5f57c8c09c5432a5d7671acc6af Author: Michael Klishin <michael@clojurewerkz.org> Date: Wed Feb 3 11:19:23 2016 +0300 Store and delete tracked connections in a table commit 4e849cf936f414a11bf5da3135e7a79c75413642 Author: Michael Klishin <michael@clojurewerkz.org> Date: Tue Jan 19 17:56:14 2016 +0300 Compile commit 504adde27cfbdb4f81ddd0e4e25abda23b5f138e Author: Michael Klishin <michael@clojurewerkz.org> Date: Tue Jan 19 17:55:08 2016 +0300 Switch to a handler for connection tracking (WIP) commit 3e1d2b4f65608e2c853ad7be0580050ee38fa4e7 Author: Michael Klishin <michael@clojurewerkz.org> Date: Tue Jan 19 14:46:09 2016 +0300 Migrations for virtual host limits and tracked connections commit 7499020af6fa8f2685cbee37ad77f189af6ee1e6 Author: Michael Klishin <michael@clojurewerkz.org> Date: Fri Jan 8 19:40:36 2016 +0300 Compile commit f3a11012f05d64d6d62fd5ec45a38fafaef47c49 Author: Michael Klishin <michael@clojurewerkz.org> Date: Fri Jan 8 19:14:12 2016 +0300 Switch rabbitmqctl set_vhost_limits to use JSON payload values Just like policies do. commit 7fc5f1a074ab5a34c33792a8ba25aa107eb0d993 Author: Michael Klishin <michael@clojurewerkz.org> Date: Wed Jan 6 19:07:50 2016 +0300 Stub out set_vhost_limits in ctl
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl5
-rw-r--r--src/rabbit_connection_tracker.erl99
-rw-r--r--src/rabbit_connection_tracking.erl229
-rw-r--r--src/rabbit_connection_tracking_handler.erl101
-rw-r--r--src/rabbit_control_main.erl15
-rw-r--r--src/rabbit_mnesia.erl4
-rw-r--r--src/rabbit_node_monitor.erl4
-rw-r--r--src/rabbit_table.erl32
-rw-r--r--src/rabbit_upgrade_functions.erl39
-rw-r--r--src/rabbit_vhost.erl31
-rw-r--r--src/rabbit_vhost_limit.erl98
11 files changed, 650 insertions, 7 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 2fa4cdee71..59ede3c802 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -178,6 +178,11 @@
{mfa, {rabbit_direct, boot, []}},
{requires, log_relay}]}).
+-rabbit_boot_step({connection_tracker,
+ [{description, "helps track node-local connections"},
+ {mfa, {rabbit_connection_tracker, boot, []}},
+ {requires, log_relay}]}).
+
-rabbit_boot_step({networking,
[{mfa, {rabbit_networking, boot, []}},
{requires, log_relay}]}).
diff --git a/src/rabbit_connection_tracker.erl b/src/rabbit_connection_tracker.erl
new file mode 100644
index 0000000000..0177627d83
--- /dev/null
+++ b/src/rabbit_connection_tracker.erl
@@ -0,0 +1,99 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_connection_tracker).
+
+%% Abstracts away how tracked connection records are stored
+%% and queried.
+%%
+%% See also:
+%%
+%% * rabbit_connection_tracking_handler
+%% * rabbit_reader
+%% * rabbit_event
+
+-behaviour(gen_server2).
+
+%% API
+-export([boot/0, start_link/0, reregister/1]).
+
+%% gen_fsm callbacks
+-export([init/1,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ terminate/2,
+ code_change/3]).
+
+-define(SERVER, ?MODULE).
+
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+
+boot() ->
+ {ok, _} = start_link(),
+ ok.
+
+start_link() ->
+ gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []).
+
+reregister(Node) ->
+ rabbit_log:info("Telling node ~p to re-register tracked connections", [Node]),
+ gen_server2:cast({?SERVER, Node}, reregister).
+
+%%%===================================================================
+%%% gen_server callbacks
+%%%===================================================================
+
+init([]) ->
+ {ok, {}}.
+
+handle_call(_Req, _From, State) ->
+ {noreply, State}.
+
+handle_cast(reregister, State) ->
+ Cs = rabbit_networking:connections_local(),
+ rabbit_log:info("Connection tracker: asked to re-register ~p client connections", [length(Cs)]),
+ case Cs of
+ [] -> ok;
+ Cs ->
+ [reregister_connection(C) || C <- Cs],
+ ok
+ end,
+ rabbit_log:info("Done re-registering client connections"),
+ {noreply, State}.
+
+handle_info(_Req, State) ->
+ {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+
+reregister_connection(Conn) ->
+ try
+ Conn ! reregister
+ catch _:Error ->
+ rabbit_log:error("Failed to re-register connection ~p after a network split: ~p", [Conn, Error])
+ end.
diff --git a/src/rabbit_connection_tracking.erl b/src/rabbit_connection_tracking.erl
new file mode 100644
index 0000000000..c65023a2a8
--- /dev/null
+++ b/src/rabbit_connection_tracking.erl
@@ -0,0 +1,229 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_connection_tracking).
+
+%% Abstracts away how tracked connection records are stored
+%% and queried.
+%%
+%% See also:
+%%
+%% * rabbit_connection_tracking_handler
+%% * rabbit_reader
+%% * rabbit_event
+
+-export([register_connection/1, unregister_connection/1,
+ list/0, list/1, list_on_node/1,
+ tracked_connection_from_connection_created/1,
+ tracked_connection_from_connection_state/1,
+ is_over_connection_limit/1, count_connections_in/1,
+ on_node_down/1, on_node_up/1]).
+
+-include_lib("rabbit.hrl").
+
+-define(TABLE, rabbit_tracked_connection).
+-define(PER_VHOST_COUNTER_TABLE, rabbit_tracked_connection_per_vhost).
+-define(SERVER, ?MODULE).
+
+%%
+%% API
+%%
+
+-spec register_connection(rabbit_types:tracked_connection()) -> ok.
+
+register_connection(#tracked_connection{vhost = VHost, id = ConnId} = Conn) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ %% upsert
+ case mnesia:dirty_read(?TABLE, ConnId) of
+ [] ->
+ mnesia:write(?TABLE, Conn, write),
+ mnesia:dirty_update_counter(
+ rabbit_tracked_connection_per_vhost, VHost, 1);
+ [_Row] ->
+ ok
+ end,
+ ok
+ end).
+
+-spec unregister_connection(rabbit_types:connection_name()) -> ok.
+
+unregister_connection(ConnId = {_Node, _Name}) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ case mnesia:dirty_read(?TABLE, ConnId) of
+ [] -> ok;
+ [Row] ->
+ mnesia:dirty_update_counter(
+ ?PER_VHOST_COUNTER_TABLE,
+ Row#tracked_connection.vhost, -1),
+ mnesia:delete({?TABLE, ConnId})
+ end
+ end).
+
+
+-spec list() -> [rabbit_types:tracked_connection()].
+
+list() ->
+ mnesia:dirty_match_object(?TABLE, #tracked_connection{_ = '_'}).
+
+
+-spec list(rabbit_types:vhost()) -> [rabbit_types:tracked_connection()].
+
+list(VHost) ->
+ mnesia:dirty_match_object(?TABLE, #tracked_connection{vhost = VHost, _ = '_'}).
+
+
+-spec list_on_node(node()) -> [rabbit_types:tracked_connection()].
+
+list_on_node(Node) ->
+ mnesia:dirty_match_object(?TABLE, #tracked_connection{node = Node, _ = '_'}).
+
+
+-spec on_node_down(node()) -> ok.
+
+on_node_down(Node) ->
+ case lists:member(Node, nodes()) of
+ false ->
+ Cs = list_on_node(Node),
+ rabbit_log:info(
+ "Node ~p is down, unregistering ~p client connections~n",
+ [Node, length(Cs)]),
+ [unregister_connection(Id) || #tracked_connection{id = Id} <- Cs],
+ ok;
+ true -> rabbit_log:info(
+ "Keeping ~s connections: the node is already back~n", [Node])
+ end.
+
+-spec on_node_up(node()) -> ok.
+on_node_up(Node) ->
+ rabbit_connection_tracker:reregister(Node),
+ ok.
+
+-spec is_over_connection_limit(rabbit_types:vhost()) -> boolean().
+
+is_over_connection_limit(VirtualHost) ->
+ ConnectionCount = count_connections_in(VirtualHost),
+ case rabbit_vhost_limit:connection_limit(VirtualHost) of
+ undefined -> false;
+ {ok, Limit} -> case {ConnectionCount, ConnectionCount >= Limit} of
+ %% 0 = no limit
+ {0, _} -> false;
+ %% the limit hasn't been reached
+ {_, false} -> false;
+ {_N, true} -> {true, Limit}
+ end
+ end.
+
+
+-spec count_connections_in(rabbit_types:vhost()) -> non_neg_integer().
+
+count_connections_in(VirtualHost) ->
+ try
+ case mnesia:transaction(
+ fun() ->
+ case mnesia:dirty_read(
+ {?PER_VHOST_COUNTER_TABLE,
+ VirtualHost}) of
+ [] -> 0;
+ [Val] ->
+ Val#tracked_connection_per_vhost.connection_count
+ end
+ end) of
+ {atomic, Val} -> Val;
+ {aborted, _Reason} -> 0
+ end
+ catch
+ _:Err ->
+ rabbit_log:error(
+ "Failed to fetch number of connections in vhost ~p:~n~p~n",
+ [VirtualHost, Err]),
+ 0
+ end.
+
+%% Returns a #tracked_connection from connection_created
+%% event details.
+%%
+%% @see rabbit_connection_tracking_handler.
+tracked_connection_from_connection_created(EventDetails) ->
+ %% Example event:
+ %%
+ %% [{type,network},
+ %% {pid,<0.329.0>},
+ %% {name,<<"127.0.0.1:60998 -> 127.0.0.1:5672">>},
+ %% {port,5672},
+ %% {peer_port,60998},
+ %% {host,{0,0,0,0,0,65535,32512,1}},
+ %% {peer_host,{0,0,0,0,0,65535,32512,1}},
+ %% {ssl,false},
+ %% {peer_cert_subject,''},
+ %% {peer_cert_issuer,''},
+ %% {peer_cert_validity,''},
+ %% {auth_mechanism,<<"PLAIN">>},
+ %% {ssl_protocol,''},
+ %% {ssl_key_exchange,''},
+ %% {ssl_cipher,''},
+ %% {ssl_hash,''},
+ %% {protocol,{0,9,1}},
+ %% {user,<<"guest">>},
+ %% {vhost,<<"/">>},
+ %% {timeout,14},
+ %% {frame_max,131072},
+ %% {channel_max,65535},
+ %% {client_properties,
+ %% [{<<"capabilities">>,table,
+ %% [{<<"publisher_confirms">>,bool,true},
+ %% {<<"consumer_cancel_notify">>,bool,true},
+ %% {<<"exchange_exchange_bindings">>,bool,true},
+ %% {<<"basic.nack">>,bool,true},
+ %% {<<"connection.blocked">>,bool,true},
+ %% {<<"authentication_failure_close">>,bool,true}]},
+ %% {<<"product">>,longstr,<<"Bunny">>},
+ %% {<<"platform">>,longstr,
+ %% <<"ruby 2.3.0p0 (2015-12-25 revision 53290) [x86_64-darwin15]">>},
+ %% {<<"version">>,longstr,<<"2.3.0.pre">>},
+ %% {<<"information">>,longstr,
+ %% <<"http://rubybunny.info">>}]},
+ %% {connected_at,1453214290847}]
+ Name = proplists:get_value(name, EventDetails),
+ Node = proplists:get_value(node, EventDetails),
+ #tracked_connection{id = {Node, Name},
+ name = Name,
+ node = Node,
+ vhost = proplists:get_value(vhost, EventDetails),
+ username = proplists:get_value(user, EventDetails),
+ connected_at = proplists:get_value(connected_at, EventDetails),
+ pid = proplists:get_value(pid, EventDetails),
+ peer_host = proplists:get_value(peer_host, EventDetails),
+ peer_port = proplists:get_value(peer_port, EventDetails)}.
+
+tracked_connection_from_connection_state(#connection{
+ vhost = VHost,
+ connected_at = Ts,
+ peer_host = PeerHost,
+ peer_port = PeerPort,
+ user = Username,
+ name = Name
+ }) ->
+ tracked_connection_from_connection_created(
+ [{name, Name},
+ {node, node()},
+ {vhost, VHost},
+ {user, Username},
+ {connected_at, Ts},
+ {pid, self()},
+ {peer_port, PeerPort},
+ {peer_host, PeerHost}]).
diff --git a/src/rabbit_connection_tracking_handler.erl b/src/rabbit_connection_tracking_handler.erl
new file mode 100644
index 0000000000..deaf9bc7f6
--- /dev/null
+++ b/src/rabbit_connection_tracking_handler.erl
@@ -0,0 +1,101 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_connection_tracking_handler).
+
+%% This module keeps track of connection creation and termination events
+%% on its local node. The primary goal here is to decouple connection
+%% tracking from rabbit_reader in rabbit_common.
+%%
+%% Events from other nodes are ignored.
+
+%% This module keeps track of connection creation and termination events
+%% on its local node. The primary goal here is to decouple connection
+%% tracking from rabbit_reader in rabbit_common.
+%%
+%% Events from other nodes are ignored.
+
+-behaviour(gen_event).
+
+-export([init/1, handle_call/2, handle_event/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-include_lib("rabbit.hrl").
+
+-rabbit_boot_step({?MODULE,
+ [{description, "connection tracking event handler"},
+ {mfa, {gen_event, add_handler,
+ [rabbit_event, ?MODULE, []]}},
+ {cleanup, {gen_event, delete_handler,
+ [rabbit_event, ?MODULE, []]}},
+ {requires, [rabbit_event, rabbit_node_monitor]},
+ {enables, recovery}]}).
+
+
+%%
+%% API
+%%
+
+init([]) ->
+ {ok, []}.
+
+handle_event(#event{type = connection_created, props = Details}, State) ->
+ rabbit_connection_tracking:register_connection(
+ rabbit_connection_tracking:tracked_connection_from_connection_created(Details)
+ ),
+ {ok, State};
+%% see rabbit_reader
+handle_event(#event{type = connection_reregistered, props = [{state, ConnState}]}, State) ->
+ rabbit_connection_tracking:register_connection(
+ rabbit_connection_tracking:tracked_connection_from_connection_state(ConnState)
+ ),
+ {ok, State};
+handle_event(#event{type = connection_closed, props = Details}, State) ->
+ %% [{name,<<"127.0.0.1:64078 -> 127.0.0.1:5672">>},
+ %% {pid,<0.1774.0>},
+ %% {node, rabbit@hostname}]
+ rabbit_connection_tracking:unregister_connection(
+ {proplists:get_value(node, Details),
+ proplists:get_value(name, Details)}),
+ {ok, State};
+handle_event(#event{type = vhost_deleted, props = Details}, State) ->
+ VHost = proplists:get_value(name, Details),
+ rabbit_log_connection:info("Closing all connections in vhost '~s' because it's being deleted", [VHost]),
+ case rabbit_connection_tracking:list(VHost) of
+ [] -> {ok, State};
+ Cs ->
+ [rabbit_networking:close_connection(Pid, rabbit_misc:format("vhost '~s' is deleted", [VHost])) || #tracked_connection{pid = Pid} <- Cs],
+ {ok, State}
+ end;
+handle_event(#event{type = user_deleted, props = Details}, State) ->
+ _Username = proplists:get_value(name, Details),
+ %% TODO: force close and unregister connections from
+ %% this user. Moved to rabbitmq/rabbitmq-server#628.
+ {ok, State};
+handle_event(_Event, State) ->
+ {ok, State}.
+
+handle_call(_Request, State) ->
+ {ok, not_understood, State}.
+
+handle_info(_Info, State) ->
+ {ok, State}.
+
+terminate(_Arg, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index 7f410ac752..c56e519fe5 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -74,6 +74,9 @@
{clear_policy, [?VHOST_DEF]},
{list_policies, [?VHOST_DEF]},
+ {set_vhost_limits, [?VHOST_DEF]},
+ {clear_vhost_limits, [?VHOST_DEF]},
+
{list_queues, [?VHOST_DEF, ?OFFLINE_DEF, ?ONLINE_DEF]},
{list_exchanges, [?VHOST_DEF]},
{list_bindings, [?VHOST_DEF]},
@@ -544,6 +547,18 @@ action(clear_policy, Node, [Key], Opts, Inform) ->
Inform("Clearing policy ~p", [Key]),
rpc_call(Node, rabbit_policy, delete, [VHostArg, list_to_binary(Key)]);
+action(set_vhost_limits, Node, [Defn], Opts, Inform) ->
+ Msg = "Setting vhost limits for vhost ~p",
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
+ Inform(Msg, [VHostArg]),
+ rpc_call(Node, rabbit_vhost_limit, parse_set, [VHostArg, Defn]),
+ ok;
+
+action(clear_vhost_limits, Node, [], Opts, Inform) ->
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
+ Inform("Clearing vhost ~p limits", [VHostArg]),
+ rpc_call(Node, rabbit_vhost_limit, clear, [VHostArg]);
+
action(report, Node, _Args, _Opts, Inform) ->
Inform("Reporting server status on ~p~n~n", [erlang:universaltime()]),
[begin ok = action(Action, N, [], [], Inform), io:nl() end ||
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 26a864f0f5..43d268c374 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -466,12 +466,14 @@ init_db(ClusterNodes, NodeType, CheckOtherNodes) ->
{[], true, disc} ->
%% First disc node up
maybe_force_load(),
+ ok = rabbit_table:ensure_secondary_indices(),
ok;
{[_ | _], _, _} ->
%% Subsequent node in cluster, catch up
maybe_force_load(),
ok = rabbit_table:wait_for_replicated(),
- ok = rabbit_table:create_local_copy(NodeType)
+ ok = rabbit_table:create_local_copy(NodeType),
+ ok = rabbit_table:ensure_secondary_indices()
end,
ensure_schema_integrity(),
rabbit_node_monitor:update_cluster_status(),
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 0322aacfd1..9ed790c75d 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -732,6 +732,7 @@ handle_dead_rabbit(Node, State = #state{partitions = Partitions,
ok = rabbit_amqqueue:on_node_down(Node),
ok = rabbit_alarm:on_node_down(Node),
ok = rabbit_mnesia:on_node_down(Node),
+ ok = rabbit_connection_tracking:on_node_down(Node),
%% If we have been partitioned, and we are now in the only remaining
%% partition, we no longer care about partitions - forget them. Note
%% that we do not attempt to deal with individual (other) partitions
@@ -760,7 +761,8 @@ ensure_keepalive_timer(State) ->
handle_live_rabbit(Node) ->
ok = rabbit_amqqueue:on_node_up(Node),
ok = rabbit_alarm:on_node_up(Node),
- ok = rabbit_mnesia:on_node_up(Node).
+ ok = rabbit_mnesia:on_node_up(Node),
+ ok = rabbit_connection_tracking:on_node_up(Node).
maybe_autoheal(State = #state{partitions = []}) ->
State;
diff --git a/src/rabbit_table.erl b/src/rabbit_table.erl
index 3909096964..60996c1539 100644
--- a/src/rabbit_table.erl
+++ b/src/rabbit_table.erl
@@ -18,7 +18,8 @@
-export([create/0, create_local_copy/1, wait_for_replicated/0, wait/1,
force_load/0, is_present/0, is_empty/0, needs_default_data/0,
- check_schema_integrity/0, clear_ram_only_tables/0, wait_timeout/0]).
+ check_schema_integrity/0, clear_ram_only_tables/0, wait_timeout/0,
+ ensure_secondary_indices/0, ensure_secondary_indices/2]).
-include("rabbit.hrl").
@@ -50,6 +51,7 @@ create() ->
Tab, TabDef1, Reason}})
end
end, definitions()),
+ ok = rabbit_table:ensure_secondary_indices(),
ok.
%% The sequence in which we delete the schema and then the other
@@ -63,6 +65,13 @@ create_local_copy(ram) ->
create_local_copies(ram),
create_local_copy(schema, ram_copies).
+ensure_secondary_indices() ->
+ ensure_secondary_indices(rabbit_tracked_connection, [vhost, username]),
+ ok.
+
+ensure_secondary_indices(Tab, Fields) ->
+ [mnesia:add_table_index(Tab, Field) || Field <- Fields].
+
wait_for_replicated() ->
wait([Tab || {Tab, TabDef} <- definitions(),
not lists:member({local_content, true}, TabDef)]).
@@ -297,9 +306,24 @@ definitions() ->
{rabbit_queue,
[{record_name, amqqueue},
{attributes, record_info(fields, amqqueue)},
- {match, #amqqueue{name = queue_name_match(), _='_'}}]}]
- ++ gm:table_definitions()
- ++ mirrored_supervisor:table_definitions().
+ {match, #amqqueue{name = queue_name_match(), _='_'}}]},
+
+ %% Used to track connections across virtual hosts
+ %% e.g. so that limits can be enforced.
+ %%
+ %% All data in this table is transient.
+ {rabbit_tracked_connection,
+ [{record_name, tracked_connection},
+ {attributes, record_info(fields, tracked_connection)},
+ {match, #tracked_connection{_ = '_'}}]},
+
+ {rabbit_tracked_connection_per_vhost,
+ [{record_name, tracked_connection_per_vhost},
+ {attributes, record_info(fields, tracked_connection_per_vhost)},
+ {match, #tracked_connection_per_vhost{_ = '_'}}]}
+
+ ] ++ gm:table_definitions()
+ ++ mirrored_supervisor:table_definitions().
binding_match() ->
#binding{source = exchange_name_match(),
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 3d624752ea..47053fafe7 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -55,6 +55,9 @@
-rabbit_upgrade({policy_version, mnesia, [recoverable_slaves]}).
-rabbit_upgrade({slave_pids_pending_shutdown, mnesia, [policy_version]}).
-rabbit_upgrade({user_password_hashing, mnesia, [hash_passwords]}).
+-rabbit_upgrade({vhost_limits, mnesia, []}).
+-rabbit_upgrade({tracked_connection, mnesia, [vhost_limits]}).
+-rabbit_upgrade({tracked_connection_per_vhost, mnesia, [tracked_connection]}).
%% -------------------------------------------------------------------
@@ -88,9 +91,36 @@
-spec queue_state() -> 'ok'.
-spec recoverable_slaves() -> 'ok'.
-spec user_password_hashing() -> 'ok'.
+-spec vhost_limits() -> 'ok'.
+-spec tracked_connection() -> 'ok'.
+-spec tracked_connection_per_vhost() -> 'ok'.
+
%%--------------------------------------------------------------------
+tracked_connection() ->
+ create(rabbit_tracked_connection, [{record_name, tracked_connection},
+ {attributes, [id, node, vhost, name,
+ pid, protocol,
+ peer_host, peer_port,
+ username, connected_at]}]).
+
+tracked_connection_per_vhost() ->
+ create(tracked_connection_per_vhost, [{record_name, tracked_connection_per_vhost},
+ {attributes, [vhost, connection_count]}]).
+
+%% replaces vhost.dummy (used to avoid having a single-field record
+%% which Mnesia doesn't like) with vhost.limits (which is actually
+%% used)
+vhost_limits() ->
+ io:format("vhost_limits vhost_limits vhost_limits~n"),
+ transform(
+ rabbit_vhost,
+ fun ({vhost, VHost, _Dummy}) ->
+ {vhost, VHost, undefined}
+ end,
+ [virtual_host, limits]).
+
%% It's a bad idea to use records or record_info here, even for the
%% destination form. Because in the future, the destination form of
%% your current transform may not match the record any more, and it
@@ -510,6 +540,11 @@ create(Tab, TabDef) ->
{atomic, ok} = mnesia:create_table(Tab, TabDef),
ok.
+create(Tab, TabDef, SecondaryIndices) ->
+ {atomic, ok} = mnesia:create_table(Tab, TabDef),
+ [mnesia:add_table_index(Tab, Idx) || Idx <- SecondaryIndices],
+ ok.
+
%% Dumb replacement for rabbit_exchange:declare that does not require
%% the exchange type registry or worker pool to be running by dint of
%% not validating anything and assuming the exchange type does not
@@ -518,3 +553,7 @@ create(Tab, TabDef) ->
declare_exchange(XName, Type) ->
X = {exchange, XName, Type, true, false, false, []},
ok = mnesia:dirty_write(rabbit_durable_exchange, X).
+
+add_indices(Tab, FieldList) ->
+ [mnesia:add_table_index(Tab, Field) || Field <- FieldList],
+ ok.
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index df2f8423b4..01f1046fb8 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -20,11 +20,14 @@
%%----------------------------------------------------------------------------
--export([add/1, delete/1, exists/1, list/0, with/2, assert/1]).
+-export([add/1, delete/1, exists/1, list/0, with/2, assert/1, update/2,
+ set_limits/2, limits_of/1]).
-export([info/1, info/2, info_all/0, info_all/1, info_all/2, info_all/3]).
+
-spec add(rabbit_types:vhost()) -> 'ok'.
-spec delete(rabbit_types:vhost()) -> 'ok'.
+-spec update(rabbit_types:vhost(), rabbit_misc:thunk(A)) -> A.
-spec exists(rabbit_types:vhost()) -> boolean().
-spec list() -> [rabbit_types:vhost()].
-spec with(rabbit_types:vhost(), rabbit_misc:thunk(A)) -> A.
@@ -138,6 +141,32 @@ assert(VHostPath) -> case exists(VHostPath) of
false -> throw({error, {no_such_vhost, VHostPath}})
end.
+update(VHostPath, Fun) ->
+ case mnesia:read({rabbit_vhost, VHostPath}) of
+ [] ->
+ mnesia:abort({no_such_vhost, VHostPath});
+ [V] ->
+ V1 = Fun(V),
+ ok = mnesia:write(rabbit_vhost, V1, write),
+ V1
+ end.
+
+limits_of(VHostPath) when is_binary(VHostPath) ->
+ assert(VHostPath),
+ case mnesia:dirty_read({rabbit_vhost, VHostPath}) of
+ [] ->
+ mnesia:abort({no_such_vhost, VHostPath});
+ [#vhost{limits = Limits}] ->
+ Limits
+ end;
+limits_of(#vhost{virtual_host = Name}) ->
+ limits_of(Name).
+
+set_limits(VHost = #vhost{}, undefined) ->
+ VHost#vhost{limits = undefined};
+set_limits(VHost = #vhost{}, Limits) ->
+ VHost#vhost{limits = Limits}.
+
%%----------------------------------------------------------------------------
infos(Items, X) -> [{Item, i(Item, X)} || Item <- Items].
diff --git a/src/rabbit_vhost_limit.erl b/src/rabbit_vhost_limit.erl
new file mode 100644
index 0000000000..dd7ce51089
--- /dev/null
+++ b/src/rabbit_vhost_limit.erl
@@ -0,0 +1,98 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_vhost_limit).
+
+-behaviour(rabbit_runtime_parameter).
+
+-include("rabbit.hrl").
+
+-export([register/0]).
+-export([parse_set/2, clear/1]).
+-export([validate/5, notify/4, notify_clear/3]).
+-export([connection_limit/1]).
+
+-import(rabbit_misc, [pget/2]).
+
+-rabbit_boot_step({?MODULE,
+ [{description, "vhost limit parameters"},
+ {mfa, {rabbit_vhost_limit, register, []}},
+ {requires, rabbit_registry},
+ {enables, recovery}]}).
+
+%%----------------------------------------------------------------------------
+
+register() ->
+ rabbit_registry:register(runtime_parameter, <<"vhost-limits">>, ?MODULE).
+
+validate(_VHost, <<"vhost-limits">>, Name, Term, _User) ->
+ rabbit_parameter_validation:proplist(
+ Name, vhost_limit_validation(), Term).
+
+notify(VHost, <<"vhost-limits">>, <<"limits">>, Limits) ->
+ rabbit_event:notify(vhost_limits_set, [{name, <<"limits">>} | Limits]),
+ update_vhost(VHost, Limits).
+
+notify_clear(VHost, <<"vhost-limits">>, <<"limits">>) ->
+ rabbit_event:notify(vhost_limits_cleared, [{name, <<"limits">>}]),
+ update_vhost(VHost, undefined).
+
+connection_limit(VirtualHost) ->
+ get_limit(VirtualHost, <<"max-connections">>).
+
+%%----------------------------------------------------------------------------
+
+parse_set(VHost, Defn) ->
+ case rabbit_misc:json_decode(Defn) of
+ {ok, JSON} ->
+ set(VHost, rabbit_misc:json_to_term(JSON));
+ error ->
+ {error_string, "JSON decoding error"}
+ end.
+
+set(VHost, Defn) ->
+ rabbit_runtime_parameters:set_any(VHost, <<"vhost-limits">>,
+ <<"limits">>, Defn, none).
+
+clear(VHost) ->
+ rabbit_runtime_parameters:clear_any(VHost, <<"vhost-limits">>,
+ <<"limits">>).
+
+vhost_limit_validation() ->
+ [{<<"max-connections">>, fun rabbit_parameter_validation:number/2, mandatory}].
+
+update_vhost(VHostName, Limits) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ rabbit_vhost:update(VHostName,
+ fun(VHost) ->
+ rabbit_vhost:set_limits(VHost, Limits)
+ end)
+ end),
+ ok.
+
+get_limit(VirtualHost, Limit) ->
+ case rabbit_runtime_parameters:list(VirtualHost, <<"vhost-limits">>) of
+ [] -> undefined;
+ [Param] -> case pget(value, Param) of
+ undefined -> undefined;
+ Val -> case pget(Limit, Val) of
+ undefined -> undefined;
+ N when N =< 0 -> undefined;
+ N when N > 0 -> {ok, N}
+ end
+ end
+ end.