diff options
| author | Michael Klishin <mklishin@pivotal.io> | 2016-07-22 05:33:09 +0300 |
|---|---|---|
| committer | Michael Klishin <mklishin@pivotal.io> | 2016-07-22 05:33:09 +0300 |
| commit | 9adb6cd23905ce01d987ec3dfdef515768b4d208 (patch) | |
| tree | c337af63b4e2c882bc1f82213988b348341291bc /src | |
| parent | 6ad34a433978b1cb6eaf4db481ac55e4382b3ef4 (diff) | |
| download | rabbitmq-server-git-9adb6cd23905ce01d987ec3dfdef515768b4d208.tar.gz | |
Keep track of connections, introduce per-vhost limits
Fixes #500, #627.
Squashed commit of the following:
commit 88036dccbb28828ceed39d793b13a2d3d3b99b80
Author: Michael Klishin <mklishin@pivotal.io>
Date: Thu Jul 21 03:31:25 2016 +0300
Refactor
commit fc84b7a23735352da4cf95726b430fad984b837d
Merge: df745e2 df28c63
Author: Michael Klishin <mklishin@pivotal.io>
Date: Wed Jul 20 18:30:19 2016 +0300
Merge branch 'master' into rabbitmq-server-500
commit df745e2544824b882d174b99d1d4470d05ac78c8
Author: Michael Klishin <mklishin@pivotal.io>
Date: Wed Jul 20 18:04:59 2016 +0300
Force close connections when vhost is deleted
Fixes #627, related to #500.
commit 2167f8ffebe9473af482816822bb30a0694a1f3e
Author: Michael Klishin <mklishin@pivotal.io>
Date: Wed Jul 20 16:00:35 2016 +0300
Add tests for per-vhost connection limits
commit 2a032a3ac9cc3b01b07692456590e213e5d28806
Author: Michael Klishin <mklishin@pivotal.io>
Date: Wed Jul 20 01:53:07 2016 +0300
Rename a few tests
commit 86ce592db1516bb216d6dc45326ea80f55d14a30
Author: Michael Klishin <mklishin@pivotal.io>
Date: Wed Jul 20 01:44:10 2016 +0300
Tests for connection re-registration idempotency
commit a774c7bebe0d91c18af1a2035697c431dca28d89
Author: Michael Klishin <mklishin@pivotal.io>
Date: Tue Jul 19 04:05:20 2016 +0300
Ask nodes that come back to re-register their connections
Depending on the partition handling mode used there may or may not
be any clients still connected. We make sure that registration
and deregistration functions are idempotent and assume there
may be connections on the node that has come back.
Point of improvement: when a node comes back up, N-1 nodes
will tell it to re-register connections. It could be fewer
than N-1, ideally just 1.
commit 24e4c0e690f192d138e70000ada6335671275f0b
Author: Michael Klishin <mklishin@pivotal.io>
Date: Mon Jul 18 17:05:17 2016 +0300
Fix boot step
commit 62da3c6a73b96d4a07e69a9b1f5519ae708817a6
Author: Michael Klishin <mklishin@pivotal.io>
Date: Mon Jul 18 11:16:21 2016 +0300
Compile
commit b656f9e96bc4a55072c2092289c52e972c44f754
Merge: f2831e1 492406e
Author: Michael Klishin <mklishin@pivotal.io>
Date: Thu Jul 14 15:25:49 2016 +0300
Merge branch 'master' into rabbitmq-server-500
commit f2831e14cd12fc242b5280326ce1cf28a1fc9766
Merge: e5858e9 7b10a4e
Author: Michael Klishin <mklishin@pivotal.io>
Date: Thu Jul 7 13:45:31 2016 +0300
Merge branch 'master' into rabbitmq-server-500
commit e5858e971825d50032e41794f68692c3d9ffa381
Author: Michael Klishin <mklishin@pivotal.io>
Date: Wed Jul 6 12:32:56 2016 +0300
Towards working connection re-registration after (inter-node) network splits
commit 548df732f17d4e30268cef4c1b2046b8c03613ef
Author: Michael Klishin <mklishin@pivotal.io>
Date: Wed Jul 6 12:32:07 2016 +0300
Make network split simulation work as expected
commit 4028c660b96b31123f3a932d17d7b8a23b08cfb6
Author: Michael Klishin <mklishin@pivotal.io>
Date: Tue Jul 5 14:43:37 2016 +0300
Close connections using rabbit_ct_client_helpers
Per discussion with @dumbbell.
commit 26fecc97aa6c4368ecc7dba4464cca8f9ea08cfa
Author: Michael Klishin <mklishin@pivotal.io>
Date: Tue Jul 5 04:17:52 2016 +0300
Extract connection limit partition tests into a separate suite
commit 8a466f1b61e3cb07ba639d5f09d260223c0ff0a4
Author: Michael Klishin <mklishin@pivotal.io>
Date: Tue Jul 5 04:17:41 2016 +0300
Better logging
commit b06de9b26ee13742290f13b32e293a990c3d5192
Author: Michael Klishin <mklishin@pivotal.io>
Date: Mon Jul 4 02:54:54 2016 +0300
Modify a test so that it (expectedly) fails
commit 078a78ae00a88566e2b7068a63457e58e149e09e
Author: Michael Klishin <mklishin@pivotal.io>
Date: Mon Jul 4 02:44:58 2016 +0300
Towards covering node termination/unavailability in connection tracking
commit ab99361041fd58371ddd0c1a76ab2a37a3c47142
Author: Michael Klishin <mklishin@pivotal.io>
Date: Sun Jul 3 15:25:10 2016 +0300
These are moved to rabbit_ct_broker_helpers
commit 520b6ef2b268e263fe4a0d33fbc578343c7ebf83
Author: Michael Klishin <mklishin@pivotal.io>
Date: Sun Jul 3 03:54:52 2016 +0300
{allow,block}_traffic_between/2 are moved to rabbit_ct_broker_helpers
commit b842eaa616d55ca813012d6afc3e3d9d85acb46f
Merge: 26eb1fa d4f031e
Author: Michael Klishin <mklishin@pivotal.io>
Date: Sun Jul 3 03:14:27 2016 +0300
Merge branch 'master' into rabbitmq-server-500
commit 26eb1fa0ede083f67f7f7177064f0274ebcd8530
Author: Michael Klishin <mklishin@pivotal.io>
Date: Sun Jul 3 02:39:09 2016 +0300
dist_proxy helpers moved to rabbit_ct_broker_helpers
commit 3d741f445be053222eaa73c973257114c17aea1c
Author: Michael Klishin <mklishin@pivotal.io>
Date: Sun Jul 3 01:28:44 2016 +0300
Cluster node shutdown test
commit 57c7129edf583120d3f20702ea68a8c2a73cf136
Author: Michael Klishin <mklishin@pivotal.io>
Date: Sat Jul 2 23:01:46 2016 +0300
Refactor
commit b736b30724828027d77a34cddf9f4bcb17b1773d
Author: Michael Klishin <mklishin@pivotal.io>
Date: Sat Jul 2 22:49:42 2016 +0300
More tests
commit dc1cb5f0797cba5840fc1fa5e98a0d688383c713
Author: Michael Klishin <mklishin@pivotal.io>
Date: Sat Jul 2 22:27:16 2016 +0300
More tests
commit e94edfed7a19add20545f40cd9dc78562546e163
Author: Michael Klishin <mklishin@pivotal.io>
Date: Sat Jul 2 17:08:34 2016 +0300
Initial per-vhost connection limit tests
commit 15b7b4e271eedea9ce02bcbbd766609fa9fe970d
Author: Michael Klishin <mklishin@pivotal.io>
Date: Sat Jul 2 15:04:57 2016 +0300
Adapt to master, compile
commit dc7f3337a8a0a268c42857becdd92640deddb1a4
Merge: e4884ff bb1fa55
Author: Michael Klishin <mklishin@pivotal.io>
Date: Sat Jul 2 02:44:18 2016 +0300
Merge branch 'master' into rabbitmq-server-500
commit e4884ffb29452fcdbd11ace1ac7b4d1b7d506b03
Merge: 71e2710 f0f43f8
Author: Michael Klishin <mklishin@pivotal.io>
Date: Wed Jun 29 14:27:40 2016 +0300
Merge branch 'master' into rabbitmq-server-500
commit 71e2710948d3a531c5426b18d63367fddf98ff55
Merge: b1ec9f3 704a2b5
Author: Michael Klishin <michael@clojurewerkz.org>
Date: Thu Mar 31 01:55:29 2016 +0300
Merge branch 'master' into rabbitmq-server-500
Conflicts:
src/rabbit_control_main.erl
src/rabbit_types.erl
commit b1ec9f30c4b896f227d3e8800d60b2934996f39e
Author: Michael Klishin <mklishin@pivotal.io>
Date: Mon Feb 15 13:51:37 2016 +0300
Stub out event handlers for #627 and #628
commit f3cfb57e2e83436e51d7d065edd04cc9197b6539
Author: Michael Klishin <michael@clojurewerkz.org>
Date: Sat Feb 13 01:33:50 2016 +0300
Use a counter column to track number of connections per vhost
Limit query time is now 50-70 microseconds for
50M connections.
commit e9132f11253972ff6b0cc09529e9cd39ba67c140
Author: Michael Klishin <michael@clojurewerkz.org>
Date: Fri Feb 12 06:23:51 2016 +0300
Ignore ./debug
commit 976e3ae10ba7e81c3ead1d2bd1fc852b0f9e3004
Author: Michael Klishin <michael@clojurewerkz.org>
Date: Fri Feb 12 06:20:01 2016 +0300
Switch to ets:select_count/2
commit ec23cf15ad840da46c321e965081815173989197
Author: Michael Klishin <michael@clojurewerkz.org>
Date: Thu Feb 11 05:11:08 2016 +0300
Enforce max connection limit
Also introduce `rabbitmqctl clear_vhost_limits`
and fix rabbitmqctl(1).
commit ba20887832170f9c9b88e19cad06e0995f887b5b
Merge: 49a1886 8974581
Author: Michael Klishin <michael@clojurewerkz.org>
Date: Thu Feb 11 02:16:24 2016 +0300
Merge branch 'master' into rabbitmq-server-500
commit 49a18867faf6e38c08ad12635e716d7ff53b0529
Author: Michael Klishin <michael@clojurewerkz.org>
Date: Wed Feb 10 16:45:00 2016 +0300
Spelling
commit 723e6e4e412b81e47f3a9db5451a1342debcf79e
Author: Michael Klishin <michael@clojurewerkz.org>
Date: Wed Feb 10 16:31:34 2016 +0300
Create secondary indices on rabbit_tracked_connection.vhost and username
commit b468c0fa05ce51cebff19098b500ccc217542f9f
Merge: 6940d05 0120438
Author: Michael Klishin <michael@clojurewerkz.org>
Date: Wed Feb 10 12:23:38 2016 +0300
Merge branch 'master' into rabbitmq-server-500
commit 6940d059cf3f844b2aab9cfe00652f23f9a6ad06
Author: Michael Klishin <michael@clojurewerkz.org>
Date: Mon Feb 8 01:30:03 2016 +0300
Spam
commit 032c2a67f59889dbec6a3ca6c5af23920a0d7cb2
Merge: 46da39c 2374ae8
Author: Michael Klishin <michael@clojurewerkz.org>
Date: Fri Feb 5 23:48:38 2016 +0300
Merge branch 'master' into rabbitmq-server-500
commit 46da39c5f5e801939a61b476e577636d30eb6e54
Merge: 655e351 05361e6
Author: Michael Klishin <michael@clojurewerkz.org>
Date: Wed Feb 3 11:20:08 2016 +0300
Merge branch 'master' into rabbitmq-server-500
commit 655e3512a031c5f57c8c09c5432a5d7671acc6af
Author: Michael Klishin <michael@clojurewerkz.org>
Date: Wed Feb 3 11:19:23 2016 +0300
Store and delete tracked connections in a table
commit 4e849cf936f414a11bf5da3135e7a79c75413642
Author: Michael Klishin <michael@clojurewerkz.org>
Date: Tue Jan 19 17:56:14 2016 +0300
Compile
commit 504adde27cfbdb4f81ddd0e4e25abda23b5f138e
Author: Michael Klishin <michael@clojurewerkz.org>
Date: Tue Jan 19 17:55:08 2016 +0300
Switch to a handler for connection tracking (WIP)
commit 3e1d2b4f65608e2c853ad7be0580050ee38fa4e7
Author: Michael Klishin <michael@clojurewerkz.org>
Date: Tue Jan 19 14:46:09 2016 +0300
Migrations for virtual host limits and tracked connections
commit 7499020af6fa8f2685cbee37ad77f189af6ee1e6
Author: Michael Klishin <michael@clojurewerkz.org>
Date: Fri Jan 8 19:40:36 2016 +0300
Compile
commit f3a11012f05d64d6d62fd5ec45a38fafaef47c49
Author: Michael Klishin <michael@clojurewerkz.org>
Date: Fri Jan 8 19:14:12 2016 +0300
Switch rabbitmqctl set_vhost_limits to use JSON payload values
Just like policies do.
commit 7fc5f1a074ab5a34c33792a8ba25aa107eb0d993
Author: Michael Klishin <michael@clojurewerkz.org>
Date: Wed Jan 6 19:07:50 2016 +0300
Stub out set_vhost_limits in ctl
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_connection_tracker.erl | 99 | ||||
| -rw-r--r-- | src/rabbit_connection_tracking.erl | 229 | ||||
| -rw-r--r-- | src/rabbit_connection_tracking_handler.erl | 101 | ||||
| -rw-r--r-- | src/rabbit_control_main.erl | 15 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_node_monitor.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_table.erl | 32 | ||||
| -rw-r--r-- | src/rabbit_upgrade_functions.erl | 39 | ||||
| -rw-r--r-- | src/rabbit_vhost.erl | 31 | ||||
| -rw-r--r-- | src/rabbit_vhost_limit.erl | 98 |
11 files changed, 650 insertions, 7 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 2fa4cdee71..59ede3c802 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -178,6 +178,11 @@ {mfa, {rabbit_direct, boot, []}}, {requires, log_relay}]}). +-rabbit_boot_step({connection_tracker, + [{description, "helps track node-local connections"}, + {mfa, {rabbit_connection_tracker, boot, []}}, + {requires, log_relay}]}). + -rabbit_boot_step({networking, [{mfa, {rabbit_networking, boot, []}}, {requires, log_relay}]}). diff --git a/src/rabbit_connection_tracker.erl b/src/rabbit_connection_tracker.erl new file mode 100644 index 0000000000..0177627d83 --- /dev/null +++ b/src/rabbit_connection_tracker.erl @@ -0,0 +1,99 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. +%% + +-module(rabbit_connection_tracker). + +%% Abstracts away how tracked connection records are stored +%% and queried. +%% +%% See also: +%% +%% * rabbit_connection_tracking_handler +%% * rabbit_reader +%% * rabbit_event + +-behaviour(gen_server2). + +%% API +-export([boot/0, start_link/0, reregister/1]). + +%% gen_fsm callbacks +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). + +-define(SERVER, ?MODULE). + + +%%%=================================================================== +%%% API +%%%=================================================================== + +boot() -> + {ok, _} = start_link(), + ok. + +start_link() -> + gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []). + +reregister(Node) -> + rabbit_log:info("Telling node ~p to re-register tracked connections", [Node]), + gen_server2:cast({?SERVER, Node}, reregister). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +init([]) -> + {ok, {}}. + +handle_call(_Req, _From, State) -> + {noreply, State}. + +handle_cast(reregister, State) -> + Cs = rabbit_networking:connections_local(), + rabbit_log:info("Connection tracker: asked to re-register ~p client connections", [length(Cs)]), + case Cs of + [] -> ok; + Cs -> + [reregister_connection(C) || C <- Cs], + ok + end, + rabbit_log:info("Done re-registering client connections"), + {noreply, State}. + +handle_info(_Req, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +reregister_connection(Conn) -> + try + Conn ! reregister + catch _:Error -> + rabbit_log:error("Failed to re-register connection ~p after a network split: ~p", [Conn, Error]) + end. diff --git a/src/rabbit_connection_tracking.erl b/src/rabbit_connection_tracking.erl new file mode 100644 index 0000000000..c65023a2a8 --- /dev/null +++ b/src/rabbit_connection_tracking.erl @@ -0,0 +1,229 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. +%% + +-module(rabbit_connection_tracking). + +%% Abstracts away how tracked connection records are stored +%% and queried. +%% +%% See also: +%% +%% * rabbit_connection_tracking_handler +%% * rabbit_reader +%% * rabbit_event + +-export([register_connection/1, unregister_connection/1, + list/0, list/1, list_on_node/1, + tracked_connection_from_connection_created/1, + tracked_connection_from_connection_state/1, + is_over_connection_limit/1, count_connections_in/1, + on_node_down/1, on_node_up/1]). + +-include_lib("rabbit.hrl"). + +-define(TABLE, rabbit_tracked_connection). +-define(PER_VHOST_COUNTER_TABLE, rabbit_tracked_connection_per_vhost). +-define(SERVER, ?MODULE). + +%% +%% API +%% + +-spec register_connection(rabbit_types:tracked_connection()) -> ok. + +register_connection(#tracked_connection{vhost = VHost, id = ConnId} = Conn) -> + rabbit_misc:execute_mnesia_transaction( + fun() -> + %% upsert + case mnesia:dirty_read(?TABLE, ConnId) of + [] -> + mnesia:write(?TABLE, Conn, write), + mnesia:dirty_update_counter( + rabbit_tracked_connection_per_vhost, VHost, 1); + [_Row] -> + ok + end, + ok + end). + +-spec unregister_connection(rabbit_types:connection_name()) -> ok. + +unregister_connection(ConnId = {_Node, _Name}) -> + rabbit_misc:execute_mnesia_transaction( + fun() -> + case mnesia:dirty_read(?TABLE, ConnId) of + [] -> ok; + [Row] -> + mnesia:dirty_update_counter( + ?PER_VHOST_COUNTER_TABLE, + Row#tracked_connection.vhost, -1), + mnesia:delete({?TABLE, ConnId}) + end + end). + + +-spec list() -> [rabbit_types:tracked_connection()]. + +list() -> + mnesia:dirty_match_object(?TABLE, #tracked_connection{_ = '_'}). + + +-spec list(rabbit_types:vhost()) -> [rabbit_types:tracked_connection()]. + +list(VHost) -> + mnesia:dirty_match_object(?TABLE, #tracked_connection{vhost = VHost, _ = '_'}). + + +-spec list_on_node(node()) -> [rabbit_types:tracked_connection()]. + +list_on_node(Node) -> + mnesia:dirty_match_object(?TABLE, #tracked_connection{node = Node, _ = '_'}). + + +-spec on_node_down(node()) -> ok. + +on_node_down(Node) -> + case lists:member(Node, nodes()) of + false -> + Cs = list_on_node(Node), + rabbit_log:info( + "Node ~p is down, unregistering ~p client connections~n", + [Node, length(Cs)]), + [unregister_connection(Id) || #tracked_connection{id = Id} <- Cs], + ok; + true -> rabbit_log:info( + "Keeping ~s connections: the node is already back~n", [Node]) + end. + +-spec on_node_up(node()) -> ok. +on_node_up(Node) -> + rabbit_connection_tracker:reregister(Node), + ok. + +-spec is_over_connection_limit(rabbit_types:vhost()) -> boolean(). + +is_over_connection_limit(VirtualHost) -> + ConnectionCount = count_connections_in(VirtualHost), + case rabbit_vhost_limit:connection_limit(VirtualHost) of + undefined -> false; + {ok, Limit} -> case {ConnectionCount, ConnectionCount >= Limit} of + %% 0 = no limit + {0, _} -> false; + %% the limit hasn't been reached + {_, false} -> false; + {_N, true} -> {true, Limit} + end + end. + + +-spec count_connections_in(rabbit_types:vhost()) -> non_neg_integer(). + +count_connections_in(VirtualHost) -> + try + case mnesia:transaction( + fun() -> + case mnesia:dirty_read( + {?PER_VHOST_COUNTER_TABLE, + VirtualHost}) of + [] -> 0; + [Val] -> + Val#tracked_connection_per_vhost.connection_count + end + end) of + {atomic, Val} -> Val; + {aborted, _Reason} -> 0 + end + catch + _:Err -> + rabbit_log:error( + "Failed to fetch number of connections in vhost ~p:~n~p~n", + [VirtualHost, Err]), + 0 + end. + +%% Returns a #tracked_connection from connection_created +%% event details. +%% +%% @see rabbit_connection_tracking_handler. +tracked_connection_from_connection_created(EventDetails) -> + %% Example event: + %% + %% [{type,network}, + %% {pid,<0.329.0>}, + %% {name,<<"127.0.0.1:60998 -> 127.0.0.1:5672">>}, + %% {port,5672}, + %% {peer_port,60998}, + %% {host,{0,0,0,0,0,65535,32512,1}}, + %% {peer_host,{0,0,0,0,0,65535,32512,1}}, + %% {ssl,false}, + %% {peer_cert_subject,''}, + %% {peer_cert_issuer,''}, + %% {peer_cert_validity,''}, + %% {auth_mechanism,<<"PLAIN">>}, + %% {ssl_protocol,''}, + %% {ssl_key_exchange,''}, + %% {ssl_cipher,''}, + %% {ssl_hash,''}, + %% {protocol,{0,9,1}}, + %% {user,<<"guest">>}, + %% {vhost,<<"/">>}, + %% {timeout,14}, + %% {frame_max,131072}, + %% {channel_max,65535}, + %% {client_properties, + %% [{<<"capabilities">>,table, + %% [{<<"publisher_confirms">>,bool,true}, + %% {<<"consumer_cancel_notify">>,bool,true}, + %% {<<"exchange_exchange_bindings">>,bool,true}, + %% {<<"basic.nack">>,bool,true}, + %% {<<"connection.blocked">>,bool,true}, + %% {<<"authentication_failure_close">>,bool,true}]}, + %% {<<"product">>,longstr,<<"Bunny">>}, + %% {<<"platform">>,longstr, + %% <<"ruby 2.3.0p0 (2015-12-25 revision 53290) [x86_64-darwin15]">>}, + %% {<<"version">>,longstr,<<"2.3.0.pre">>}, + %% {<<"information">>,longstr, + %% <<"http://rubybunny.info">>}]}, + %% {connected_at,1453214290847}] + Name = proplists:get_value(name, EventDetails), + Node = proplists:get_value(node, EventDetails), + #tracked_connection{id = {Node, Name}, + name = Name, + node = Node, + vhost = proplists:get_value(vhost, EventDetails), + username = proplists:get_value(user, EventDetails), + connected_at = proplists:get_value(connected_at, EventDetails), + pid = proplists:get_value(pid, EventDetails), + peer_host = proplists:get_value(peer_host, EventDetails), + peer_port = proplists:get_value(peer_port, EventDetails)}. + +tracked_connection_from_connection_state(#connection{ + vhost = VHost, + connected_at = Ts, + peer_host = PeerHost, + peer_port = PeerPort, + user = Username, + name = Name + }) -> + tracked_connection_from_connection_created( + [{name, Name}, + {node, node()}, + {vhost, VHost}, + {user, Username}, + {connected_at, Ts}, + {pid, self()}, + {peer_port, PeerPort}, + {peer_host, PeerHost}]). diff --git a/src/rabbit_connection_tracking_handler.erl b/src/rabbit_connection_tracking_handler.erl new file mode 100644 index 0000000000..deaf9bc7f6 --- /dev/null +++ b/src/rabbit_connection_tracking_handler.erl @@ -0,0 +1,101 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. +%% + +-module(rabbit_connection_tracking_handler). + +%% This module keeps track of connection creation and termination events +%% on its local node. The primary goal here is to decouple connection +%% tracking from rabbit_reader in rabbit_common. +%% +%% Events from other nodes are ignored. + +%% This module keeps track of connection creation and termination events +%% on its local node. The primary goal here is to decouple connection +%% tracking from rabbit_reader in rabbit_common. +%% +%% Events from other nodes are ignored. + +-behaviour(gen_event). + +-export([init/1, handle_call/2, handle_event/2, handle_info/2, + terminate/2, code_change/3]). + +-include_lib("rabbit.hrl"). + +-rabbit_boot_step({?MODULE, + [{description, "connection tracking event handler"}, + {mfa, {gen_event, add_handler, + [rabbit_event, ?MODULE, []]}}, + {cleanup, {gen_event, delete_handler, + [rabbit_event, ?MODULE, []]}}, + {requires, [rabbit_event, rabbit_node_monitor]}, + {enables, recovery}]}). + + +%% +%% API +%% + +init([]) -> + {ok, []}. + +handle_event(#event{type = connection_created, props = Details}, State) -> + rabbit_connection_tracking:register_connection( + rabbit_connection_tracking:tracked_connection_from_connection_created(Details) + ), + {ok, State}; +%% see rabbit_reader +handle_event(#event{type = connection_reregistered, props = [{state, ConnState}]}, State) -> + rabbit_connection_tracking:register_connection( + rabbit_connection_tracking:tracked_connection_from_connection_state(ConnState) + ), + {ok, State}; +handle_event(#event{type = connection_closed, props = Details}, State) -> + %% [{name,<<"127.0.0.1:64078 -> 127.0.0.1:5672">>}, + %% {pid,<0.1774.0>}, + %% {node, rabbit@hostname}] + rabbit_connection_tracking:unregister_connection( + {proplists:get_value(node, Details), + proplists:get_value(name, Details)}), + {ok, State}; +handle_event(#event{type = vhost_deleted, props = Details}, State) -> + VHost = proplists:get_value(name, Details), + rabbit_log_connection:info("Closing all connections in vhost '~s' because it's being deleted", [VHost]), + case rabbit_connection_tracking:list(VHost) of + [] -> {ok, State}; + Cs -> + [rabbit_networking:close_connection(Pid, rabbit_misc:format("vhost '~s' is deleted", [VHost])) || #tracked_connection{pid = Pid} <- Cs], + {ok, State} + end; +handle_event(#event{type = user_deleted, props = Details}, State) -> + _Username = proplists:get_value(name, Details), + %% TODO: force close and unregister connections from + %% this user. Moved to rabbitmq/rabbitmq-server#628. + {ok, State}; +handle_event(_Event, State) -> + {ok, State}. + +handle_call(_Request, State) -> + {ok, not_understood, State}. + +handle_info(_Info, State) -> + {ok, State}. + +terminate(_Arg, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index 7f410ac752..c56e519fe5 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -74,6 +74,9 @@ {clear_policy, [?VHOST_DEF]}, {list_policies, [?VHOST_DEF]}, + {set_vhost_limits, [?VHOST_DEF]}, + {clear_vhost_limits, [?VHOST_DEF]}, + {list_queues, [?VHOST_DEF, ?OFFLINE_DEF, ?ONLINE_DEF]}, {list_exchanges, [?VHOST_DEF]}, {list_bindings, [?VHOST_DEF]}, @@ -544,6 +547,18 @@ action(clear_policy, Node, [Key], Opts, Inform) -> Inform("Clearing policy ~p", [Key]), rpc_call(Node, rabbit_policy, delete, [VHostArg, list_to_binary(Key)]); +action(set_vhost_limits, Node, [Defn], Opts, Inform) -> + Msg = "Setting vhost limits for vhost ~p", + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + Inform(Msg, [VHostArg]), + rpc_call(Node, rabbit_vhost_limit, parse_set, [VHostArg, Defn]), + ok; + +action(clear_vhost_limits, Node, [], Opts, Inform) -> + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + Inform("Clearing vhost ~p limits", [VHostArg]), + rpc_call(Node, rabbit_vhost_limit, clear, [VHostArg]); + action(report, Node, _Args, _Opts, Inform) -> Inform("Reporting server status on ~p~n~n", [erlang:universaltime()]), [begin ok = action(Action, N, [], [], Inform), io:nl() end || diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 26a864f0f5..43d268c374 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -466,12 +466,14 @@ init_db(ClusterNodes, NodeType, CheckOtherNodes) -> {[], true, disc} -> %% First disc node up maybe_force_load(), + ok = rabbit_table:ensure_secondary_indices(), ok; {[_ | _], _, _} -> %% Subsequent node in cluster, catch up maybe_force_load(), ok = rabbit_table:wait_for_replicated(), - ok = rabbit_table:create_local_copy(NodeType) + ok = rabbit_table:create_local_copy(NodeType), + ok = rabbit_table:ensure_secondary_indices() end, ensure_schema_integrity(), rabbit_node_monitor:update_cluster_status(), diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 0322aacfd1..9ed790c75d 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -732,6 +732,7 @@ handle_dead_rabbit(Node, State = #state{partitions = Partitions, ok = rabbit_amqqueue:on_node_down(Node), ok = rabbit_alarm:on_node_down(Node), ok = rabbit_mnesia:on_node_down(Node), + ok = rabbit_connection_tracking:on_node_down(Node), %% If we have been partitioned, and we are now in the only remaining %% partition, we no longer care about partitions - forget them. Note %% that we do not attempt to deal with individual (other) partitions @@ -760,7 +761,8 @@ ensure_keepalive_timer(State) -> handle_live_rabbit(Node) -> ok = rabbit_amqqueue:on_node_up(Node), ok = rabbit_alarm:on_node_up(Node), - ok = rabbit_mnesia:on_node_up(Node). + ok = rabbit_mnesia:on_node_up(Node), + ok = rabbit_connection_tracking:on_node_up(Node). maybe_autoheal(State = #state{partitions = []}) -> State; diff --git a/src/rabbit_table.erl b/src/rabbit_table.erl index 3909096964..60996c1539 100644 --- a/src/rabbit_table.erl +++ b/src/rabbit_table.erl @@ -18,7 +18,8 @@ -export([create/0, create_local_copy/1, wait_for_replicated/0, wait/1, force_load/0, is_present/0, is_empty/0, needs_default_data/0, - check_schema_integrity/0, clear_ram_only_tables/0, wait_timeout/0]). + check_schema_integrity/0, clear_ram_only_tables/0, wait_timeout/0, + ensure_secondary_indices/0, ensure_secondary_indices/2]). -include("rabbit.hrl"). @@ -50,6 +51,7 @@ create() -> Tab, TabDef1, Reason}}) end end, definitions()), + ok = rabbit_table:ensure_secondary_indices(), ok. %% The sequence in which we delete the schema and then the other @@ -63,6 +65,13 @@ create_local_copy(ram) -> create_local_copies(ram), create_local_copy(schema, ram_copies). +ensure_secondary_indices() -> + ensure_secondary_indices(rabbit_tracked_connection, [vhost, username]), + ok. + +ensure_secondary_indices(Tab, Fields) -> + [mnesia:add_table_index(Tab, Field) || Field <- Fields]. + wait_for_replicated() -> wait([Tab || {Tab, TabDef} <- definitions(), not lists:member({local_content, true}, TabDef)]). @@ -297,9 +306,24 @@ definitions() -> {rabbit_queue, [{record_name, amqqueue}, {attributes, record_info(fields, amqqueue)}, - {match, #amqqueue{name = queue_name_match(), _='_'}}]}] - ++ gm:table_definitions() - ++ mirrored_supervisor:table_definitions(). + {match, #amqqueue{name = queue_name_match(), _='_'}}]}, + + %% Used to track connections across virtual hosts + %% e.g. so that limits can be enforced. + %% + %% All data in this table is transient. + {rabbit_tracked_connection, + [{record_name, tracked_connection}, + {attributes, record_info(fields, tracked_connection)}, + {match, #tracked_connection{_ = '_'}}]}, + + {rabbit_tracked_connection_per_vhost, + [{record_name, tracked_connection_per_vhost}, + {attributes, record_info(fields, tracked_connection_per_vhost)}, + {match, #tracked_connection_per_vhost{_ = '_'}}]} + + ] ++ gm:table_definitions() + ++ mirrored_supervisor:table_definitions(). binding_match() -> #binding{source = exchange_name_match(), diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 3d624752ea..47053fafe7 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -55,6 +55,9 @@ -rabbit_upgrade({policy_version, mnesia, [recoverable_slaves]}). -rabbit_upgrade({slave_pids_pending_shutdown, mnesia, [policy_version]}). -rabbit_upgrade({user_password_hashing, mnesia, [hash_passwords]}). +-rabbit_upgrade({vhost_limits, mnesia, []}). +-rabbit_upgrade({tracked_connection, mnesia, [vhost_limits]}). +-rabbit_upgrade({tracked_connection_per_vhost, mnesia, [tracked_connection]}). %% ------------------------------------------------------------------- @@ -88,9 +91,36 @@ -spec queue_state() -> 'ok'. -spec recoverable_slaves() -> 'ok'. -spec user_password_hashing() -> 'ok'. +-spec vhost_limits() -> 'ok'. +-spec tracked_connection() -> 'ok'. +-spec tracked_connection_per_vhost() -> 'ok'. + %%-------------------------------------------------------------------- +tracked_connection() -> + create(rabbit_tracked_connection, [{record_name, tracked_connection}, + {attributes, [id, node, vhost, name, + pid, protocol, + peer_host, peer_port, + username, connected_at]}]). + +tracked_connection_per_vhost() -> + create(tracked_connection_per_vhost, [{record_name, tracked_connection_per_vhost}, + {attributes, [vhost, connection_count]}]). + +%% replaces vhost.dummy (used to avoid having a single-field record +%% which Mnesia doesn't like) with vhost.limits (which is actually +%% used) +vhost_limits() -> + io:format("vhost_limits vhost_limits vhost_limits~n"), + transform( + rabbit_vhost, + fun ({vhost, VHost, _Dummy}) -> + {vhost, VHost, undefined} + end, + [virtual_host, limits]). + %% It's a bad idea to use records or record_info here, even for the %% destination form. Because in the future, the destination form of %% your current transform may not match the record any more, and it @@ -510,6 +540,11 @@ create(Tab, TabDef) -> {atomic, ok} = mnesia:create_table(Tab, TabDef), ok. +create(Tab, TabDef, SecondaryIndices) -> + {atomic, ok} = mnesia:create_table(Tab, TabDef), + [mnesia:add_table_index(Tab, Idx) || Idx <- SecondaryIndices], + ok. + %% Dumb replacement for rabbit_exchange:declare that does not require %% the exchange type registry or worker pool to be running by dint of %% not validating anything and assuming the exchange type does not @@ -518,3 +553,7 @@ create(Tab, TabDef) -> declare_exchange(XName, Type) -> X = {exchange, XName, Type, true, false, false, []}, ok = mnesia:dirty_write(rabbit_durable_exchange, X). + +add_indices(Tab, FieldList) -> + [mnesia:add_table_index(Tab, Field) || Field <- FieldList], + ok. diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index df2f8423b4..01f1046fb8 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -20,11 +20,14 @@ %%---------------------------------------------------------------------------- --export([add/1, delete/1, exists/1, list/0, with/2, assert/1]). +-export([add/1, delete/1, exists/1, list/0, with/2, assert/1, update/2, + set_limits/2, limits_of/1]). -export([info/1, info/2, info_all/0, info_all/1, info_all/2, info_all/3]). + -spec add(rabbit_types:vhost()) -> 'ok'. -spec delete(rabbit_types:vhost()) -> 'ok'. +-spec update(rabbit_types:vhost(), rabbit_misc:thunk(A)) -> A. -spec exists(rabbit_types:vhost()) -> boolean(). -spec list() -> [rabbit_types:vhost()]. -spec with(rabbit_types:vhost(), rabbit_misc:thunk(A)) -> A. @@ -138,6 +141,32 @@ assert(VHostPath) -> case exists(VHostPath) of false -> throw({error, {no_such_vhost, VHostPath}}) end. +update(VHostPath, Fun) -> + case mnesia:read({rabbit_vhost, VHostPath}) of + [] -> + mnesia:abort({no_such_vhost, VHostPath}); + [V] -> + V1 = Fun(V), + ok = mnesia:write(rabbit_vhost, V1, write), + V1 + end. + +limits_of(VHostPath) when is_binary(VHostPath) -> + assert(VHostPath), + case mnesia:dirty_read({rabbit_vhost, VHostPath}) of + [] -> + mnesia:abort({no_such_vhost, VHostPath}); + [#vhost{limits = Limits}] -> + Limits + end; +limits_of(#vhost{virtual_host = Name}) -> + limits_of(Name). + +set_limits(VHost = #vhost{}, undefined) -> + VHost#vhost{limits = undefined}; +set_limits(VHost = #vhost{}, Limits) -> + VHost#vhost{limits = Limits}. + %%---------------------------------------------------------------------------- infos(Items, X) -> [{Item, i(Item, X)} || Item <- Items]. diff --git a/src/rabbit_vhost_limit.erl b/src/rabbit_vhost_limit.erl new file mode 100644 index 0000000000..dd7ce51089 --- /dev/null +++ b/src/rabbit_vhost_limit.erl @@ -0,0 +1,98 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. +%% + +-module(rabbit_vhost_limit). + +-behaviour(rabbit_runtime_parameter). + +-include("rabbit.hrl"). + +-export([register/0]). +-export([parse_set/2, clear/1]). +-export([validate/5, notify/4, notify_clear/3]). +-export([connection_limit/1]). + +-import(rabbit_misc, [pget/2]). + +-rabbit_boot_step({?MODULE, + [{description, "vhost limit parameters"}, + {mfa, {rabbit_vhost_limit, register, []}}, + {requires, rabbit_registry}, + {enables, recovery}]}). + +%%---------------------------------------------------------------------------- + +register() -> + rabbit_registry:register(runtime_parameter, <<"vhost-limits">>, ?MODULE). + +validate(_VHost, <<"vhost-limits">>, Name, Term, _User) -> + rabbit_parameter_validation:proplist( + Name, vhost_limit_validation(), Term). + +notify(VHost, <<"vhost-limits">>, <<"limits">>, Limits) -> + rabbit_event:notify(vhost_limits_set, [{name, <<"limits">>} | Limits]), + update_vhost(VHost, Limits). + +notify_clear(VHost, <<"vhost-limits">>, <<"limits">>) -> + rabbit_event:notify(vhost_limits_cleared, [{name, <<"limits">>}]), + update_vhost(VHost, undefined). + +connection_limit(VirtualHost) -> + get_limit(VirtualHost, <<"max-connections">>). + +%%---------------------------------------------------------------------------- + +parse_set(VHost, Defn) -> + case rabbit_misc:json_decode(Defn) of + {ok, JSON} -> + set(VHost, rabbit_misc:json_to_term(JSON)); + error -> + {error_string, "JSON decoding error"} + end. + +set(VHost, Defn) -> + rabbit_runtime_parameters:set_any(VHost, <<"vhost-limits">>, + <<"limits">>, Defn, none). + +clear(VHost) -> + rabbit_runtime_parameters:clear_any(VHost, <<"vhost-limits">>, + <<"limits">>). + +vhost_limit_validation() -> + [{<<"max-connections">>, fun rabbit_parameter_validation:number/2, mandatory}]. + +update_vhost(VHostName, Limits) -> + rabbit_misc:execute_mnesia_transaction( + fun() -> + rabbit_vhost:update(VHostName, + fun(VHost) -> + rabbit_vhost:set_limits(VHost, Limits) + end) + end), + ok. + +get_limit(VirtualHost, Limit) -> + case rabbit_runtime_parameters:list(VirtualHost, <<"vhost-limits">>) of + [] -> undefined; + [Param] -> case pget(value, Param) of + undefined -> undefined; + Val -> case pget(Limit, Val) of + undefined -> undefined; + N when N =< 0 -> undefined; + N when N > 0 -> {ok, N} + end + end + end. |
