diff options
| author | Michael Klishin <mklishin@pivotal.io> | 2016-08-31 03:31:08 +0300 |
|---|---|---|
| committer | Michael Klishin <mklishin@pivotal.io> | 2016-08-31 03:31:08 +0300 |
| commit | 8765f2c939aefe864633e697f69878cea6802612 (patch) | |
| tree | 06ef8eec55eaff902ea2b39daa0f21cb90cb503d /src | |
| parent | c12d5a6c9a77a670614b04a64df73263d64789f7 (diff) | |
| parent | 6d9d506f453e38e1134235c57d365c91af5b6cc4 (diff) | |
| download | rabbitmq-server-git-8765f2c939aefe864633e697f69878cea6802612.tar.gz | |
Merge branch 'master' into rabbitmq-server-930
Conflicts:
src/rabbit_control_main.erl
src/rabbit_upgrade_functions.erl
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_connection_tracking.erl | 337 | ||||
| -rw-r--r-- | src/rabbit_connection_tracking_handler.erl | 108 | ||||
| -rw-r--r-- | src/rabbit_control_main.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_mnesia_rename.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_parameter_validation.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_upgrade_functions.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_vhost.erl | 31 | ||||
| -rw-r--r-- | src/rabbit_vhost_limit.erl | 99 |
10 files changed, 631 insertions, 4 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 2fa4cdee71..5db2c40b66 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -178,6 +178,11 @@ {mfa, {rabbit_direct, boot, []}}, {requires, log_relay}]}). +-rabbit_boot_step({connection_tracking, + [{description, "sets up internal storage for node-local connections"}, + {mfa, {rabbit_connection_tracking, boot, []}}, + {requires, log_relay}]}). + -rabbit_boot_step({networking, [{mfa, {rabbit_networking, boot, []}}, {requires, log_relay}]}). diff --git a/src/rabbit_connection_tracking.erl b/src/rabbit_connection_tracking.erl new file mode 100644 index 0000000000..ab945abc2f --- /dev/null +++ b/src/rabbit_connection_tracking.erl @@ -0,0 +1,337 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. +%% + +-module(rabbit_connection_tracking). + +%% Abstracts away how tracked connection records are stored +%% and queried. +%% +%% See also: +%% +%% * rabbit_connection_tracking_handler +%% * rabbit_reader +%% * rabbit_event + +-export([boot/0, + ensure_tracked_connections_table_for_node/1, + ensure_per_vhost_tracked_connections_table_for_node/1, + ensure_tracked_connections_table_for_this_node/0, + ensure_per_vhost_tracked_connections_table_for_this_node/0, + tracked_connection_table_name_for/1, tracked_connection_per_vhost_table_name_for/1, + delete_tracked_connections_table_for_node/1, delete_per_vhost_tracked_connections_table_for_node/1, + clear_tracked_connection_tables_for_this_node/0, + register_connection/1, unregister_connection/1, + list/0, list/1, list_on_node/1, + tracked_connection_from_connection_created/1, + tracked_connection_from_connection_state/1, + is_over_connection_limit/1, count_connections_in/1]). + +-include_lib("rabbit.hrl"). + +-import(rabbit_misc, [pget/2]). + +%% +%% API +%% + +-spec boot() -> ok. + +%% Sets up and resets connection tracking tables for this +%% node. +boot() -> + ensure_tracked_connections_table_for_this_node(), + rabbit_log:info("Setting up a table for connection tracking on this node: ~p", + [tracked_connection_table_name_for(node())]), + ensure_per_vhost_tracked_connections_table_for_this_node(), + rabbit_log:info("Setting up a table for per-vhost connection counting on this node: ~p", + [tracked_connection_per_vhost_table_name_for(node())]), + clear_tracked_connection_tables_for_this_node(), + ok. + + +-spec ensure_tracked_connections_table_for_this_node() -> ok. + +ensure_tracked_connections_table_for_this_node() -> + ensure_tracked_connections_table_for_node(node()). + + +-spec ensure_per_vhost_tracked_connections_table_for_this_node() -> ok. + +ensure_per_vhost_tracked_connections_table_for_this_node() -> + ensure_per_vhost_tracked_connections_table_for_node(node()). + + +-spec ensure_tracked_connections_table_for_node(node()) -> ok. + +ensure_tracked_connections_table_for_node(Node) -> + TableName = tracked_connection_table_name_for(Node), + case mnesia:create_table(TableName, [{record_name, tracked_connection}, + {attributes, record_info(fields, tracked_connection)}]) of + {atomic, ok} -> ok; + {aborted, Error} -> + rabbit_log:error("Failed to create a tracked connection table for node ~p: ~p", [Node, Error]), + ok + end. + + +-spec ensure_per_vhost_tracked_connections_table_for_node(node()) -> ok. + +ensure_per_vhost_tracked_connections_table_for_node(Node) -> + TableName = tracked_connection_per_vhost_table_name_for(Node), + case mnesia:create_table(TableName, [{record_name, tracked_connection_per_vhost}, + {attributes, record_info(fields, tracked_connection_per_vhost)}]) of + {atomic, ok} -> ok; + {aborted, _} -> ok + %% TODO: propagate errors + end. + + +-spec clear_tracked_connection_tables_for_this_node() -> ok. + +clear_tracked_connection_tables_for_this_node() -> + case mnesia:clear_table(tracked_connection_table_name_for(node())) of + {atomic, ok} -> ok; + {aborted, _} -> ok + end, + case mnesia:clear_table(tracked_connection_per_vhost_table_name_for(node())) of + {atomic, ok} -> ok; + {aborted, _} -> ok + end. + + +-spec delete_tracked_connections_table_for_node(node()) -> ok. + +delete_tracked_connections_table_for_node(Node) -> + TableName = tracked_connection_table_name_for(Node), + case mnesia:delete_table(TableName) of + {atomic, ok} -> ok; + {aborted, {no_exists, _}} -> ok; + {aborted, Error} -> + rabbit_log:error("Failed to delete a tracked connection table for node ~p: ~p", [Node, Error]), + ok + end. + + +-spec delete_per_vhost_tracked_connections_table_for_node(node()) -> ok. + +delete_per_vhost_tracked_connections_table_for_node(Node) -> + TableName = tracked_connection_per_vhost_table_name_for(Node), + case mnesia:delete_table(TableName) of + {atomic, ok} -> ok; + {aborted, {no_exists, _}} -> ok; + {aborted, Error} -> + rabbit_log:error("Failed to delete a per-vhost tracked connection table for node ~p: ~p", [Node, Error]), + ok + end. + + +-spec tracked_connection_table_name_for(node()) -> atom(). + +tracked_connection_table_name_for(Node) -> + list_to_atom(rabbit_misc:format("tracked_connection_on_node_~s", [Node])). + +-spec tracked_connection_per_vhost_table_name_for(node()) -> atom(). + +tracked_connection_per_vhost_table_name_for(Node) -> + list_to_atom(rabbit_misc:format("tracked_connection_per_vhost_on_node_~s", [Node])). + + +-spec register_connection(rabbit_types:tracked_connection()) -> ok. + +register_connection(#tracked_connection{vhost = VHost, id = ConnId, node = Node} = Conn) when Node =:= node() -> + TableName = tracked_connection_table_name_for(Node), + PerVhostTableName = tracked_connection_per_vhost_table_name_for(Node), + rabbit_misc:execute_mnesia_transaction( + fun() -> + %% upsert + case mnesia:dirty_read(TableName, ConnId) of + [] -> + mnesia:write(TableName, Conn, write), + mnesia:dirty_update_counter( + PerVhostTableName, VHost, 1); + [_Row] -> + ok + end, + ok + end). + +-spec unregister_connection(rabbit_types:connection_name()) -> ok. + +unregister_connection(ConnId = {Node, _Name}) when Node =:= node() -> + TableName = tracked_connection_table_name_for(Node), + PerVhostTableName = tracked_connection_per_vhost_table_name_for(Node), + rabbit_misc:execute_mnesia_transaction( + fun() -> + case mnesia:dirty_read(TableName, ConnId) of + [] -> ok; + [Row] -> + mnesia:dirty_update_counter( + PerVhostTableName, Row#tracked_connection.vhost, -1), + mnesia:delete({TableName, ConnId}) + end + end). + + +-spec list() -> [rabbit_types:tracked_connection()]. + +list() -> + lists:foldl( + fun (Node, Acc) -> + Tab = tracked_connection_table_name_for(Node), + Acc ++ mnesia:dirty_match_object(Tab, #tracked_connection{_ = '_'}) + end, [], rabbit_mnesia:cluster_nodes(running)). + + +-spec list(rabbit_types:vhost()) -> [rabbit_types:tracked_connection()]. + +list(VHost) -> + lists:foldl( + fun (Node, Acc) -> + Tab = tracked_connection_table_name_for(Node), + Acc ++ mnesia:dirty_match_object(Tab, #tracked_connection{vhost = VHost, _ = '_'}) + end, [], rabbit_mnesia:cluster_nodes(running)). + + +-spec list_on_node(node()) -> [rabbit_types:tracked_connection()]. + +list_on_node(Node) -> + try mnesia:dirty_match_object( + tracked_connection_table_name_for(Node), + #tracked_connection{_ = '_'}) + catch exit:{aborted, {no_exists, _}} -> [] + end. + +-spec is_over_connection_limit(rabbit_types:vhost()) -> {true, non_neg_integer()} | false. + +is_over_connection_limit(VirtualHost) -> + case rabbit_vhost_limit:connection_limit(VirtualHost) of + %% no limit configured + undefined -> false; + %% with limit = 0, no connections are allowed + {ok, 0} -> {true, 0}; + {ok, Limit} when is_integer(Limit) andalso Limit > 0 -> + ConnectionCount = count_connections_in(VirtualHost), + case ConnectionCount >= Limit of + false -> false; + true -> {true, Limit} + end; + %% any negative value means "no limit". Note that parameter validation + %% will replace negative integers with 'undefined', so this is to be + %% explicit and extra defensive + {ok, Limit} when is_integer(Limit) andalso Limit < 0 -> false; + %% ignore non-integer limits + {ok, _Limit} -> false + end. + + +-spec count_connections_in(rabbit_types:vhost()) -> non_neg_integer(). + +count_connections_in(VirtualHost) -> + lists:foldl(fun (Node, Acc) -> + Tab = tracked_connection_per_vhost_table_name_for(Node), + try + N = case mnesia:transaction( + fun() -> + case mnesia:dirty_read({Tab, VirtualHost}) of + [] -> 0; + [Val] -> Val#tracked_connection_per_vhost.connection_count + end + end) of + {atomic, Val} -> Val; + {aborted, _Reason} -> 0 + end, + Acc + N + catch _:Err -> + rabbit_log:error( + "Failed to fetch number of connections in vhost ~p on node ~p:~n~p~n", + [VirtualHost, Err, Node]), + Acc + end + end, 0, rabbit_mnesia:cluster_nodes(running)). + +%% Returns a #tracked_connection from connection_created +%% event details. +%% +%% @see rabbit_connection_tracking_handler. +tracked_connection_from_connection_created(EventDetails) -> + %% Example event: + %% + %% [{type,network}, + %% {pid,<0.329.0>}, + %% {name,<<"127.0.0.1:60998 -> 127.0.0.1:5672">>}, + %% {port,5672}, + %% {peer_port,60998}, + %% {host,{0,0,0,0,0,65535,32512,1}}, + %% {peer_host,{0,0,0,0,0,65535,32512,1}}, + %% {ssl,false}, + %% {peer_cert_subject,''}, + %% {peer_cert_issuer,''}, + %% {peer_cert_validity,''}, + %% {auth_mechanism,<<"PLAIN">>}, + %% {ssl_protocol,''}, + %% {ssl_key_exchange,''}, + %% {ssl_cipher,''}, + %% {ssl_hash,''}, + %% {protocol,{0,9,1}}, + %% {user,<<"guest">>}, + %% {vhost,<<"/">>}, + %% {timeout,14}, + %% {frame_max,131072}, + %% {channel_max,65535}, + %% {client_properties, + %% [{<<"capabilities">>,table, + %% [{<<"publisher_confirms">>,bool,true}, + %% {<<"consumer_cancel_notify">>,bool,true}, + %% {<<"exchange_exchange_bindings">>,bool,true}, + %% {<<"basic.nack">>,bool,true}, + %% {<<"connection.blocked">>,bool,true}, + %% {<<"authentication_failure_close">>,bool,true}]}, + %% {<<"product">>,longstr,<<"Bunny">>}, + %% {<<"platform">>,longstr, + %% <<"ruby 2.3.0p0 (2015-12-25 revision 53290) [x86_64-darwin15]">>}, + %% {<<"version">>,longstr,<<"2.3.0.pre">>}, + %% {<<"information">>,longstr, + %% <<"http://rubybunny.info">>}]}, + %% {connected_at,1453214290847}] + Name = pget(name, EventDetails), + Node = pget(node, EventDetails), + #tracked_connection{id = {Node, Name}, + name = Name, + node = Node, + vhost = pget(vhost, EventDetails), + username = pget(user, EventDetails), + connected_at = pget(connected_at, EventDetails), + pid = pget(pid, EventDetails), + peer_host = pget(peer_host, EventDetails), + peer_port = pget(peer_port, EventDetails)}. + +tracked_connection_from_connection_state(#connection{ + vhost = VHost, + connected_at = Ts, + peer_host = PeerHost, + peer_port = PeerPort, + user = Username, + name = Name + }) -> + tracked_connection_from_connection_created( + [{name, Name}, + {node, node()}, + {vhost, VHost}, + {user, Username}, + {connected_at, Ts}, + {pid, self()}, + {peer_port, PeerPort}, + {peer_host, PeerHost}]). diff --git a/src/rabbit_connection_tracking_handler.erl b/src/rabbit_connection_tracking_handler.erl new file mode 100644 index 0000000000..fd1df8c88a --- /dev/null +++ b/src/rabbit_connection_tracking_handler.erl @@ -0,0 +1,108 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. +%% + +-module(rabbit_connection_tracking_handler). + +%% This module keeps track of connection creation and termination events +%% on its local node. The primary goal here is to decouple connection +%% tracking from rabbit_reader in rabbit_common. +%% +%% Events from other nodes are ignored. + +-behaviour(gen_event). + +-export([init/1, handle_call/2, handle_event/2, handle_info/2, + terminate/2, code_change/3]). + +-include_lib("rabbit.hrl"). +-import(rabbit_misc, [pget/2]). + +-rabbit_boot_step({?MODULE, + [{description, "connection tracking event handler"}, + {mfa, {gen_event, add_handler, + [rabbit_event, ?MODULE, []]}}, + {cleanup, {gen_event, delete_handler, + [rabbit_event, ?MODULE, []]}}, + {requires, [rabbit_event, rabbit_node_monitor]}, + {enables, recovery}]}). + + +%% +%% API +%% + +init([]) -> + {ok, []}. + +handle_event(#event{type = connection_created, props = Details}, State) -> + ThisNode = node(), + case pget(node, Details) of + ThisNode -> + rabbit_connection_tracking:register_connection( + rabbit_connection_tracking:tracked_connection_from_connection_created(Details) + ); + _OtherNode -> + %% ignore + ok + end, + {ok, State}; +handle_event(#event{type = connection_closed, props = Details}, State) -> + ThisNode = node(), + case pget(node, Details) of + ThisNode -> + %% [{name,<<"127.0.0.1:64078 -> 127.0.0.1:5672">>}, + %% {pid,<0.1774.0>}, + %% {node, rabbit@hostname}] + rabbit_connection_tracking:unregister_connection( + {pget(node, Details), + pget(name, Details)}); + _OtherNode -> + %% ignore + ok + end, + {ok, State}; +handle_event(#event{type = vhost_deleted, props = Details}, State) -> + VHost = pget(name, Details), + rabbit_log_connection:info("Closing all connections in vhost '~s' because it's being deleted", [VHost]), + [rabbit_networking:close_connection(Pid, rabbit_misc:format("vhost '~s' is deleted", [VHost])) || + #tracked_connection{pid = Pid} <- rabbit_connection_tracking:list(VHost)], + {ok, State}; +handle_event(#event{type = user_deleted, props = Details}, State) -> + _Username = pget(name, Details), + %% TODO: force close and unregister connections from + %% this user. Moved to rabbitmq/rabbitmq-server#628. + {ok, State}; +%% A node had been deleted from the cluster. +handle_event(#event{type = node_deleted, props = Details}, State) -> + Node = pget(node, Details), + rabbit_log_connection:info("Node '~s' was removed from the cluster, deleting its connection tracking tables...", [Node]), + rabbit_connection_tracking:delete_tracked_connections_table_for_node(Node), + rabbit_connection_tracking:delete_per_vhost_tracked_connections_table_for_node(Node), + {ok, State}; +handle_event(_Event, State) -> + {ok, State}. + +handle_call(_Request, State) -> + {ok, not_understood, State}. + +handle_info(_Info, State) -> + {ok, State}. + +terminate(_Arg, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index 1ef2518691..f48f0349aa 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -77,6 +77,8 @@ {list_policies, [?VHOST_DEF]}, {list_operator_policies, [?VHOST_DEF]}, + {set_vhost_limits, [?VHOST_DEF]}, + {clear_vhost_limits, [?VHOST_DEF]}, {list_queues, [?VHOST_DEF, ?OFFLINE_DEF, ?ONLINE_DEF, ?LOCAL_DEF]}, {list_exchanges, [?VHOST_DEF]}, {list_bindings, [?VHOST_DEF]}, @@ -568,6 +570,16 @@ action(clear_operator_policy, Node, [Key], Opts, Inform) -> Inform("Clearing operator policy ~p", [Key]), rpc_call(Node, rabbit_policy, delete_op, [VHostArg, list_to_binary(Key)]); +action(set_vhost_limits, Node, [Defn], Opts, Inform) -> + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + Inform("Setting vhost limits for vhost ~p", [VHostArg]), + rpc_call(Node, rabbit_vhost_limit, parse_set, [VHostArg, Defn]), + ok; + +action(clear_vhost_limits, Node, [], Opts, Inform) -> + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + Inform("Clearing vhost ~p limits", [VHostArg]), + rpc_call(Node, rabbit_vhost_limit, clear, [VHostArg]); action(report, Node, _Args, _Opts, Inform) -> Inform("Reporting server status on ~p~n~n", [erlang:universaltime()]), diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 596eb62b03..43dd2c3bb8 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -148,6 +148,7 @@ auto_cluster(TryNodes, NodeType) -> rabbit_log:info("Node '~p' selected for auto-clustering~n", [Node]), {ok, {_, DiscNodes, _}} = discover_cluster0(Node), init_db_and_upgrade(DiscNodes, NodeType, true), + rabbit_connection_tracking:boot(), rabbit_node_monitor:notify_joined_cluster(); none -> rabbit_log:warning( @@ -194,6 +195,7 @@ join_cluster(DiscoveryNode, NodeType) -> [ClusterNodes, NodeType]), ok = init_db_with_mnesia(ClusterNodes, NodeType, true, true), + rabbit_connection_tracking:boot(), rabbit_node_monitor:notify_joined_cluster(), ok; {error, Reason} -> @@ -295,6 +297,9 @@ update_cluster_nodes(DiscoveryNode) -> %% the last or second to last after the node we're removing to go %% down forget_cluster_node(Node, RemoveWhenOffline) -> + forget_cluster_node(Node, RemoveWhenOffline, true). + +forget_cluster_node(Node, RemoveWhenOffline, EmitNodeDeletedEvent) -> case lists:member(Node, cluster_nodes(all)) of true -> ok; false -> e(not_a_cluster_node) @@ -306,6 +311,9 @@ forget_cluster_node(Node, RemoveWhenOffline) -> {false, true} -> rabbit_log:info( "Removing node ~p from cluster~n", [Node]), case remove_node_if_mnesia_running(Node) of + ok when EmitNodeDeletedEvent -> + rabbit_event:notify(node_deleted, [{node, Node}]), + ok; ok -> ok; {error, _} = Err -> throw(Err) end @@ -326,7 +334,10 @@ remove_node_offline_node(Node) -> %% they are loaded. rabbit_table:force_load(), rabbit_table:wait_for_replicated(), - forget_cluster_node(Node, false), + %% We skip the 'node_deleted' event because the + %% application is stopped and thus, rabbit_event is not + %% enabled. + forget_cluster_node(Node, false, false), force_load_next_boot() after stop_mnesia() diff --git a/src/rabbit_mnesia_rename.erl b/src/rabbit_mnesia_rename.erl index 0945e31522..0c3e7c2366 100644 --- a/src/rabbit_mnesia_rename.erl +++ b/src/rabbit_mnesia_rename.erl @@ -124,7 +124,13 @@ prepare(Node, NodeMapList) -> take_backup(Backup) -> start_mnesia(), - ok = mnesia:backup(Backup), + %% We backup only local tables: in particular, this excludes the + %% connection tracking tables which have no local replica. + LocalTables = mnesia:system_info(local_tables), + {ok, Name, _Nodes} = mnesia:activate_checkpoint([ + {max, LocalTables} + ]), + ok = mnesia:backup_checkpoint(Name, Backup), stop_mnesia(). restore_backup(Backup) -> diff --git a/src/rabbit_parameter_validation.erl b/src/rabbit_parameter_validation.erl index 90ab1d5286..00e83d2757 100644 --- a/src/rabbit_parameter_validation.erl +++ b/src/rabbit_parameter_validation.erl @@ -16,7 +16,7 @@ -module(rabbit_parameter_validation). --export([number/2, binary/2, boolean/2, list/2, regex/2, proplist/3, enum/1]). +-export([number/2, integer/2, binary/2, boolean/2, list/2, regex/2, proplist/3, enum/1]). number(_Name, Term) when is_number(Term) -> ok; @@ -24,6 +24,12 @@ number(_Name, Term) when is_number(Term) -> number(Name, Term) -> {error, "~s should be number, actually was ~p", [Name, Term]}. +integer(_Name, Term) when is_integer(Term) -> + ok; + +integer(Name, Term) -> + {error, "~s should be number, actually was ~p", [Name, Term]}. + binary(_Name, Term) when is_binary(Term) -> ok; diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 1a2d3fab21..1b78173ade 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -56,6 +56,7 @@ -rabbit_upgrade({slave_pids_pending_shutdown, mnesia, [policy_version]}). -rabbit_upgrade({user_password_hashing, mnesia, [hash_passwords]}). -rabbit_upgrade({operator_policies, mnesia, [slave_pids_pending_shutdown, internal_system_x]}). +-rabbit_upgrade({vhost_limits, mnesia, []}). %% ------------------------------------------------------------------- @@ -89,9 +90,22 @@ -spec queue_state() -> 'ok'. -spec recoverable_slaves() -> 'ok'. -spec user_password_hashing() -> 'ok'. +-spec vhost_limits() -> 'ok'. + %%-------------------------------------------------------------------- +%% replaces vhost.dummy (used to avoid having a single-field record +%% which Mnesia doesn't like) with vhost.limits (which is actually +%% used) +vhost_limits() -> + transform( + rabbit_vhost, + fun ({vhost, VHost, _Dummy}) -> + {vhost, VHost, undefined} + end, + [virtual_host, limits]). + %% It's a bad idea to use records or record_info here, even for the %% destination form. Because in the future, the destination form of %% your current transform may not match the record any more, and it diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index df2f8423b4..01f1046fb8 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -20,11 +20,14 @@ %%---------------------------------------------------------------------------- --export([add/1, delete/1, exists/1, list/0, with/2, assert/1]). +-export([add/1, delete/1, exists/1, list/0, with/2, assert/1, update/2, + set_limits/2, limits_of/1]). -export([info/1, info/2, info_all/0, info_all/1, info_all/2, info_all/3]). + -spec add(rabbit_types:vhost()) -> 'ok'. -spec delete(rabbit_types:vhost()) -> 'ok'. +-spec update(rabbit_types:vhost(), rabbit_misc:thunk(A)) -> A. -spec exists(rabbit_types:vhost()) -> boolean(). -spec list() -> [rabbit_types:vhost()]. -spec with(rabbit_types:vhost(), rabbit_misc:thunk(A)) -> A. @@ -138,6 +141,32 @@ assert(VHostPath) -> case exists(VHostPath) of false -> throw({error, {no_such_vhost, VHostPath}}) end. +update(VHostPath, Fun) -> + case mnesia:read({rabbit_vhost, VHostPath}) of + [] -> + mnesia:abort({no_such_vhost, VHostPath}); + [V] -> + V1 = Fun(V), + ok = mnesia:write(rabbit_vhost, V1, write), + V1 + end. + +limits_of(VHostPath) when is_binary(VHostPath) -> + assert(VHostPath), + case mnesia:dirty_read({rabbit_vhost, VHostPath}) of + [] -> + mnesia:abort({no_such_vhost, VHostPath}); + [#vhost{limits = Limits}] -> + Limits + end; +limits_of(#vhost{virtual_host = Name}) -> + limits_of(Name). + +set_limits(VHost = #vhost{}, undefined) -> + VHost#vhost{limits = undefined}; +set_limits(VHost = #vhost{}, Limits) -> + VHost#vhost{limits = Limits}. + %%---------------------------------------------------------------------------- infos(Items, X) -> [{Item, i(Item, X)} || Item <- Items]. diff --git a/src/rabbit_vhost_limit.erl b/src/rabbit_vhost_limit.erl new file mode 100644 index 0000000000..2d9a2f075e --- /dev/null +++ b/src/rabbit_vhost_limit.erl @@ -0,0 +1,99 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. +%% + +-module(rabbit_vhost_limit). + +-behaviour(rabbit_runtime_parameter). + +-include("rabbit.hrl"). + +-export([register/0]). +-export([parse_set/2, clear/1]). +-export([validate/5, notify/4, notify_clear/3]). +-export([connection_limit/1]). + +-import(rabbit_misc, [pget/2]). + +-rabbit_boot_step({?MODULE, + [{description, "vhost limit parameters"}, + {mfa, {rabbit_vhost_limit, register, []}}, + {requires, rabbit_registry}, + {enables, recovery}]}). + +%%---------------------------------------------------------------------------- + +register() -> + rabbit_registry:register(runtime_parameter, <<"vhost-limits">>, ?MODULE). + +validate(_VHost, <<"vhost-limits">>, Name, Term, _User) -> + rabbit_parameter_validation:proplist( + Name, vhost_limit_validation(), Term). + +notify(VHost, <<"vhost-limits">>, <<"limits">>, Limits) -> + rabbit_event:notify(vhost_limits_set, [{name, <<"limits">>} | Limits]), + update_vhost(VHost, Limits). + +notify_clear(VHost, <<"vhost-limits">>, <<"limits">>) -> + rabbit_event:notify(vhost_limits_cleared, [{name, <<"limits">>}]), + update_vhost(VHost, undefined). + +connection_limit(VirtualHost) -> + get_limit(VirtualHost, <<"max-connections">>). + +%%---------------------------------------------------------------------------- + +parse_set(VHost, Defn) -> + case rabbit_misc:json_decode(Defn) of + {ok, JSON} -> + set(VHost, rabbit_misc:json_to_term(JSON)); + error -> + {error_string, "JSON decoding error"} + end. + +set(VHost, Defn) -> + rabbit_runtime_parameters:set_any(VHost, <<"vhost-limits">>, + <<"limits">>, Defn, none). + +clear(VHost) -> + rabbit_runtime_parameters:clear_any(VHost, <<"vhost-limits">>, + <<"limits">>). + +vhost_limit_validation() -> + [{<<"max-connections">>, fun rabbit_parameter_validation:integer/2, mandatory}]. + +update_vhost(VHostName, Limits) -> + rabbit_misc:execute_mnesia_transaction( + fun() -> + rabbit_vhost:update(VHostName, + fun(VHost) -> + rabbit_vhost:set_limits(VHost, Limits) + end) + end), + ok. + +get_limit(VirtualHost, Limit) -> + case rabbit_runtime_parameters:list(VirtualHost, <<"vhost-limits">>) of + [] -> undefined; + [Param] -> case pget(value, Param) of + undefined -> undefined; + Val -> case pget(Limit, Val) of + undefined -> undefined; + %% no limit + N when N < 0 -> undefined; + N when N >= 0 -> {ok, N} + end + end + end. |
