diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_connection_tracker.erl | 99 | ||||
| -rw-r--r-- | src/rabbit_connection_tracking.erl | 229 | ||||
| -rw-r--r-- | src/rabbit_connection_tracking_handler.erl | 101 | ||||
| -rw-r--r-- | src/rabbit_control_main.erl | 15 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_node_monitor.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_table.erl | 32 | ||||
| -rw-r--r-- | src/rabbit_upgrade_functions.erl | 39 | ||||
| -rw-r--r-- | src/rabbit_vhost.erl | 31 | ||||
| -rw-r--r-- | src/rabbit_vhost_limit.erl | 98 |
11 files changed, 650 insertions, 7 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 2fa4cdee71..59ede3c802 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -178,6 +178,11 @@ {mfa, {rabbit_direct, boot, []}}, {requires, log_relay}]}). +-rabbit_boot_step({connection_tracker, + [{description, "helps track node-local connections"}, + {mfa, {rabbit_connection_tracker, boot, []}}, + {requires, log_relay}]}). + -rabbit_boot_step({networking, [{mfa, {rabbit_networking, boot, []}}, {requires, log_relay}]}). diff --git a/src/rabbit_connection_tracker.erl b/src/rabbit_connection_tracker.erl new file mode 100644 index 0000000000..0177627d83 --- /dev/null +++ b/src/rabbit_connection_tracker.erl @@ -0,0 +1,99 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. +%% + +-module(rabbit_connection_tracker). + +%% Abstracts away how tracked connection records are stored +%% and queried. +%% +%% See also: +%% +%% * rabbit_connection_tracking_handler +%% * rabbit_reader +%% * rabbit_event + +-behaviour(gen_server2). + +%% API +-export([boot/0, start_link/0, reregister/1]). + +%% gen_fsm callbacks +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). + +-define(SERVER, ?MODULE). + + +%%%=================================================================== +%%% API +%%%=================================================================== + +boot() -> + {ok, _} = start_link(), + ok. + +start_link() -> + gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []). + +reregister(Node) -> + rabbit_log:info("Telling node ~p to re-register tracked connections", [Node]), + gen_server2:cast({?SERVER, Node}, reregister). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +init([]) -> + {ok, {}}. + +handle_call(_Req, _From, State) -> + {noreply, State}. + +handle_cast(reregister, State) -> + Cs = rabbit_networking:connections_local(), + rabbit_log:info("Connection tracker: asked to re-register ~p client connections", [length(Cs)]), + case Cs of + [] -> ok; + Cs -> + [reregister_connection(C) || C <- Cs], + ok + end, + rabbit_log:info("Done re-registering client connections"), + {noreply, State}. + +handle_info(_Req, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +reregister_connection(Conn) -> + try + Conn ! reregister + catch _:Error -> + rabbit_log:error("Failed to re-register connection ~p after a network split: ~p", [Conn, Error]) + end. diff --git a/src/rabbit_connection_tracking.erl b/src/rabbit_connection_tracking.erl new file mode 100644 index 0000000000..c65023a2a8 --- /dev/null +++ b/src/rabbit_connection_tracking.erl @@ -0,0 +1,229 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. +%% + +-module(rabbit_connection_tracking). + +%% Abstracts away how tracked connection records are stored +%% and queried. +%% +%% See also: +%% +%% * rabbit_connection_tracking_handler +%% * rabbit_reader +%% * rabbit_event + +-export([register_connection/1, unregister_connection/1, + list/0, list/1, list_on_node/1, + tracked_connection_from_connection_created/1, + tracked_connection_from_connection_state/1, + is_over_connection_limit/1, count_connections_in/1, + on_node_down/1, on_node_up/1]). + +-include_lib("rabbit.hrl"). + +-define(TABLE, rabbit_tracked_connection). +-define(PER_VHOST_COUNTER_TABLE, rabbit_tracked_connection_per_vhost). +-define(SERVER, ?MODULE). + +%% +%% API +%% + +-spec register_connection(rabbit_types:tracked_connection()) -> ok. + +register_connection(#tracked_connection{vhost = VHost, id = ConnId} = Conn) -> + rabbit_misc:execute_mnesia_transaction( + fun() -> + %% upsert + case mnesia:dirty_read(?TABLE, ConnId) of + [] -> + mnesia:write(?TABLE, Conn, write), + mnesia:dirty_update_counter( + rabbit_tracked_connection_per_vhost, VHost, 1); + [_Row] -> + ok + end, + ok + end). + +-spec unregister_connection(rabbit_types:connection_name()) -> ok. + +unregister_connection(ConnId = {_Node, _Name}) -> + rabbit_misc:execute_mnesia_transaction( + fun() -> + case mnesia:dirty_read(?TABLE, ConnId) of + [] -> ok; + [Row] -> + mnesia:dirty_update_counter( + ?PER_VHOST_COUNTER_TABLE, + Row#tracked_connection.vhost, -1), + mnesia:delete({?TABLE, ConnId}) + end + end). + + +-spec list() -> [rabbit_types:tracked_connection()]. + +list() -> + mnesia:dirty_match_object(?TABLE, #tracked_connection{_ = '_'}). + + +-spec list(rabbit_types:vhost()) -> [rabbit_types:tracked_connection()]. + +list(VHost) -> + mnesia:dirty_match_object(?TABLE, #tracked_connection{vhost = VHost, _ = '_'}). + + +-spec list_on_node(node()) -> [rabbit_types:tracked_connection()]. + +list_on_node(Node) -> + mnesia:dirty_match_object(?TABLE, #tracked_connection{node = Node, _ = '_'}). + + +-spec on_node_down(node()) -> ok. + +on_node_down(Node) -> + case lists:member(Node, nodes()) of + false -> + Cs = list_on_node(Node), + rabbit_log:info( + "Node ~p is down, unregistering ~p client connections~n", + [Node, length(Cs)]), + [unregister_connection(Id) || #tracked_connection{id = Id} <- Cs], + ok; + true -> rabbit_log:info( + "Keeping ~s connections: the node is already back~n", [Node]) + end. + +-spec on_node_up(node()) -> ok. +on_node_up(Node) -> + rabbit_connection_tracker:reregister(Node), + ok. + +-spec is_over_connection_limit(rabbit_types:vhost()) -> boolean(). + +is_over_connection_limit(VirtualHost) -> + ConnectionCount = count_connections_in(VirtualHost), + case rabbit_vhost_limit:connection_limit(VirtualHost) of + undefined -> false; + {ok, Limit} -> case {ConnectionCount, ConnectionCount >= Limit} of + %% 0 = no limit + {0, _} -> false; + %% the limit hasn't been reached + {_, false} -> false; + {_N, true} -> {true, Limit} + end + end. + + +-spec count_connections_in(rabbit_types:vhost()) -> non_neg_integer(). + +count_connections_in(VirtualHost) -> + try + case mnesia:transaction( + fun() -> + case mnesia:dirty_read( + {?PER_VHOST_COUNTER_TABLE, + VirtualHost}) of + [] -> 0; + [Val] -> + Val#tracked_connection_per_vhost.connection_count + end + end) of + {atomic, Val} -> Val; + {aborted, _Reason} -> 0 + end + catch + _:Err -> + rabbit_log:error( + "Failed to fetch number of connections in vhost ~p:~n~p~n", + [VirtualHost, Err]), + 0 + end. + +%% Returns a #tracked_connection from connection_created +%% event details. +%% +%% @see rabbit_connection_tracking_handler. +tracked_connection_from_connection_created(EventDetails) -> + %% Example event: + %% + %% [{type,network}, + %% {pid,<0.329.0>}, + %% {name,<<"127.0.0.1:60998 -> 127.0.0.1:5672">>}, + %% {port,5672}, + %% {peer_port,60998}, + %% {host,{0,0,0,0,0,65535,32512,1}}, + %% {peer_host,{0,0,0,0,0,65535,32512,1}}, + %% {ssl,false}, + %% {peer_cert_subject,''}, + %% {peer_cert_issuer,''}, + %% {peer_cert_validity,''}, + %% {auth_mechanism,<<"PLAIN">>}, + %% {ssl_protocol,''}, + %% {ssl_key_exchange,''}, + %% {ssl_cipher,''}, + %% {ssl_hash,''}, + %% {protocol,{0,9,1}}, + %% {user,<<"guest">>}, + %% {vhost,<<"/">>}, + %% {timeout,14}, + %% {frame_max,131072}, + %% {channel_max,65535}, + %% {client_properties, + %% [{<<"capabilities">>,table, + %% [{<<"publisher_confirms">>,bool,true}, + %% {<<"consumer_cancel_notify">>,bool,true}, + %% {<<"exchange_exchange_bindings">>,bool,true}, + %% {<<"basic.nack">>,bool,true}, + %% {<<"connection.blocked">>,bool,true}, + %% {<<"authentication_failure_close">>,bool,true}]}, + %% {<<"product">>,longstr,<<"Bunny">>}, + %% {<<"platform">>,longstr, + %% <<"ruby 2.3.0p0 (2015-12-25 revision 53290) [x86_64-darwin15]">>}, + %% {<<"version">>,longstr,<<"2.3.0.pre">>}, + %% {<<"information">>,longstr, + %% <<"http://rubybunny.info">>}]}, + %% {connected_at,1453214290847}] + Name = proplists:get_value(name, EventDetails), + Node = proplists:get_value(node, EventDetails), + #tracked_connection{id = {Node, Name}, + name = Name, + node = Node, + vhost = proplists:get_value(vhost, EventDetails), + username = proplists:get_value(user, EventDetails), + connected_at = proplists:get_value(connected_at, EventDetails), + pid = proplists:get_value(pid, EventDetails), + peer_host = proplists:get_value(peer_host, EventDetails), + peer_port = proplists:get_value(peer_port, EventDetails)}. + +tracked_connection_from_connection_state(#connection{ + vhost = VHost, + connected_at = Ts, + peer_host = PeerHost, + peer_port = PeerPort, + user = Username, + name = Name + }) -> + tracked_connection_from_connection_created( + [{name, Name}, + {node, node()}, + {vhost, VHost}, + {user, Username}, + {connected_at, Ts}, + {pid, self()}, + {peer_port, PeerPort}, + {peer_host, PeerHost}]). diff --git a/src/rabbit_connection_tracking_handler.erl b/src/rabbit_connection_tracking_handler.erl new file mode 100644 index 0000000000..deaf9bc7f6 --- /dev/null +++ b/src/rabbit_connection_tracking_handler.erl @@ -0,0 +1,101 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. +%% + +-module(rabbit_connection_tracking_handler). + +%% This module keeps track of connection creation and termination events +%% on its local node. The primary goal here is to decouple connection +%% tracking from rabbit_reader in rabbit_common. +%% +%% Events from other nodes are ignored. + +%% This module keeps track of connection creation and termination events +%% on its local node. The primary goal here is to decouple connection +%% tracking from rabbit_reader in rabbit_common. +%% +%% Events from other nodes are ignored. + +-behaviour(gen_event). + +-export([init/1, handle_call/2, handle_event/2, handle_info/2, + terminate/2, code_change/3]). + +-include_lib("rabbit.hrl"). + +-rabbit_boot_step({?MODULE, + [{description, "connection tracking event handler"}, + {mfa, {gen_event, add_handler, + [rabbit_event, ?MODULE, []]}}, + {cleanup, {gen_event, delete_handler, + [rabbit_event, ?MODULE, []]}}, + {requires, [rabbit_event, rabbit_node_monitor]}, + {enables, recovery}]}). + + +%% +%% API +%% + +init([]) -> + {ok, []}. + +handle_event(#event{type = connection_created, props = Details}, State) -> + rabbit_connection_tracking:register_connection( + rabbit_connection_tracking:tracked_connection_from_connection_created(Details) + ), + {ok, State}; +%% see rabbit_reader +handle_event(#event{type = connection_reregistered, props = [{state, ConnState}]}, State) -> + rabbit_connection_tracking:register_connection( + rabbit_connection_tracking:tracked_connection_from_connection_state(ConnState) + ), + {ok, State}; +handle_event(#event{type = connection_closed, props = Details}, State) -> + %% [{name,<<"127.0.0.1:64078 -> 127.0.0.1:5672">>}, + %% {pid,<0.1774.0>}, + %% {node, rabbit@hostname}] + rabbit_connection_tracking:unregister_connection( + {proplists:get_value(node, Details), + proplists:get_value(name, Details)}), + {ok, State}; +handle_event(#event{type = vhost_deleted, props = Details}, State) -> + VHost = proplists:get_value(name, Details), + rabbit_log_connection:info("Closing all connections in vhost '~s' because it's being deleted", [VHost]), + case rabbit_connection_tracking:list(VHost) of + [] -> {ok, State}; + Cs -> + [rabbit_networking:close_connection(Pid, rabbit_misc:format("vhost '~s' is deleted", [VHost])) || #tracked_connection{pid = Pid} <- Cs], + {ok, State} + end; +handle_event(#event{type = user_deleted, props = Details}, State) -> + _Username = proplists:get_value(name, Details), + %% TODO: force close and unregister connections from + %% this user. Moved to rabbitmq/rabbitmq-server#628. + {ok, State}; +handle_event(_Event, State) -> + {ok, State}. + +handle_call(_Request, State) -> + {ok, not_understood, State}. + +handle_info(_Info, State) -> + {ok, State}. + +terminate(_Arg, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index 7f410ac752..c56e519fe5 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -74,6 +74,9 @@ {clear_policy, [?VHOST_DEF]}, {list_policies, [?VHOST_DEF]}, + {set_vhost_limits, [?VHOST_DEF]}, + {clear_vhost_limits, [?VHOST_DEF]}, + {list_queues, [?VHOST_DEF, ?OFFLINE_DEF, ?ONLINE_DEF]}, {list_exchanges, [?VHOST_DEF]}, {list_bindings, [?VHOST_DEF]}, @@ -544,6 +547,18 @@ action(clear_policy, Node, [Key], Opts, Inform) -> Inform("Clearing policy ~p", [Key]), rpc_call(Node, rabbit_policy, delete, [VHostArg, list_to_binary(Key)]); +action(set_vhost_limits, Node, [Defn], Opts, Inform) -> + Msg = "Setting vhost limits for vhost ~p", + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + Inform(Msg, [VHostArg]), + rpc_call(Node, rabbit_vhost_limit, parse_set, [VHostArg, Defn]), + ok; + +action(clear_vhost_limits, Node, [], Opts, Inform) -> + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + Inform("Clearing vhost ~p limits", [VHostArg]), + rpc_call(Node, rabbit_vhost_limit, clear, [VHostArg]); + action(report, Node, _Args, _Opts, Inform) -> Inform("Reporting server status on ~p~n~n", [erlang:universaltime()]), [begin ok = action(Action, N, [], [], Inform), io:nl() end || diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 26a864f0f5..43d268c374 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -466,12 +466,14 @@ init_db(ClusterNodes, NodeType, CheckOtherNodes) -> {[], true, disc} -> %% First disc node up maybe_force_load(), + ok = rabbit_table:ensure_secondary_indices(), ok; {[_ | _], _, _} -> %% Subsequent node in cluster, catch up maybe_force_load(), ok = rabbit_table:wait_for_replicated(), - ok = rabbit_table:create_local_copy(NodeType) + ok = rabbit_table:create_local_copy(NodeType), + ok = rabbit_table:ensure_secondary_indices() end, ensure_schema_integrity(), rabbit_node_monitor:update_cluster_status(), diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 0322aacfd1..9ed790c75d 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -732,6 +732,7 @@ handle_dead_rabbit(Node, State = #state{partitions = Partitions, ok = rabbit_amqqueue:on_node_down(Node), ok = rabbit_alarm:on_node_down(Node), ok = rabbit_mnesia:on_node_down(Node), + ok = rabbit_connection_tracking:on_node_down(Node), %% If we have been partitioned, and we are now in the only remaining %% partition, we no longer care about partitions - forget them. Note %% that we do not attempt to deal with individual (other) partitions @@ -760,7 +761,8 @@ ensure_keepalive_timer(State) -> handle_live_rabbit(Node) -> ok = rabbit_amqqueue:on_node_up(Node), ok = rabbit_alarm:on_node_up(Node), - ok = rabbit_mnesia:on_node_up(Node). + ok = rabbit_mnesia:on_node_up(Node), + ok = rabbit_connection_tracking:on_node_up(Node). maybe_autoheal(State = #state{partitions = []}) -> State; diff --git a/src/rabbit_table.erl b/src/rabbit_table.erl index 3909096964..60996c1539 100644 --- a/src/rabbit_table.erl +++ b/src/rabbit_table.erl @@ -18,7 +18,8 @@ -export([create/0, create_local_copy/1, wait_for_replicated/0, wait/1, force_load/0, is_present/0, is_empty/0, needs_default_data/0, - check_schema_integrity/0, clear_ram_only_tables/0, wait_timeout/0]). + check_schema_integrity/0, clear_ram_only_tables/0, wait_timeout/0, + ensure_secondary_indices/0, ensure_secondary_indices/2]). -include("rabbit.hrl"). @@ -50,6 +51,7 @@ create() -> Tab, TabDef1, Reason}}) end end, definitions()), + ok = rabbit_table:ensure_secondary_indices(), ok. %% The sequence in which we delete the schema and then the other @@ -63,6 +65,13 @@ create_local_copy(ram) -> create_local_copies(ram), create_local_copy(schema, ram_copies). +ensure_secondary_indices() -> + ensure_secondary_indices(rabbit_tracked_connection, [vhost, username]), + ok. + +ensure_secondary_indices(Tab, Fields) -> + [mnesia:add_table_index(Tab, Field) || Field <- Fields]. + wait_for_replicated() -> wait([Tab || {Tab, TabDef} <- definitions(), not lists:member({local_content, true}, TabDef)]). @@ -297,9 +306,24 @@ definitions() -> {rabbit_queue, [{record_name, amqqueue}, {attributes, record_info(fields, amqqueue)}, - {match, #amqqueue{name = queue_name_match(), _='_'}}]}] - ++ gm:table_definitions() - ++ mirrored_supervisor:table_definitions(). + {match, #amqqueue{name = queue_name_match(), _='_'}}]}, + + %% Used to track connections across virtual hosts + %% e.g. so that limits can be enforced. + %% + %% All data in this table is transient. + {rabbit_tracked_connection, + [{record_name, tracked_connection}, + {attributes, record_info(fields, tracked_connection)}, + {match, #tracked_connection{_ = '_'}}]}, + + {rabbit_tracked_connection_per_vhost, + [{record_name, tracked_connection_per_vhost}, + {attributes, record_info(fields, tracked_connection_per_vhost)}, + {match, #tracked_connection_per_vhost{_ = '_'}}]} + + ] ++ gm:table_definitions() + ++ mirrored_supervisor:table_definitions(). binding_match() -> #binding{source = exchange_name_match(), diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 3d624752ea..47053fafe7 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -55,6 +55,9 @@ -rabbit_upgrade({policy_version, mnesia, [recoverable_slaves]}). -rabbit_upgrade({slave_pids_pending_shutdown, mnesia, [policy_version]}). -rabbit_upgrade({user_password_hashing, mnesia, [hash_passwords]}). +-rabbit_upgrade({vhost_limits, mnesia, []}). +-rabbit_upgrade({tracked_connection, mnesia, [vhost_limits]}). +-rabbit_upgrade({tracked_connection_per_vhost, mnesia, [tracked_connection]}). %% ------------------------------------------------------------------- @@ -88,9 +91,36 @@ -spec queue_state() -> 'ok'. -spec recoverable_slaves() -> 'ok'. -spec user_password_hashing() -> 'ok'. +-spec vhost_limits() -> 'ok'. +-spec tracked_connection() -> 'ok'. +-spec tracked_connection_per_vhost() -> 'ok'. + %%-------------------------------------------------------------------- +tracked_connection() -> + create(rabbit_tracked_connection, [{record_name, tracked_connection}, + {attributes, [id, node, vhost, name, + pid, protocol, + peer_host, peer_port, + username, connected_at]}]). + +tracked_connection_per_vhost() -> + create(tracked_connection_per_vhost, [{record_name, tracked_connection_per_vhost}, + {attributes, [vhost, connection_count]}]). + +%% replaces vhost.dummy (used to avoid having a single-field record +%% which Mnesia doesn't like) with vhost.limits (which is actually +%% used) +vhost_limits() -> + io:format("vhost_limits vhost_limits vhost_limits~n"), + transform( + rabbit_vhost, + fun ({vhost, VHost, _Dummy}) -> + {vhost, VHost, undefined} + end, + [virtual_host, limits]). + %% It's a bad idea to use records or record_info here, even for the %% destination form. Because in the future, the destination form of %% your current transform may not match the record any more, and it @@ -510,6 +540,11 @@ create(Tab, TabDef) -> {atomic, ok} = mnesia:create_table(Tab, TabDef), ok. +create(Tab, TabDef, SecondaryIndices) -> + {atomic, ok} = mnesia:create_table(Tab, TabDef), + [mnesia:add_table_index(Tab, Idx) || Idx <- SecondaryIndices], + ok. + %% Dumb replacement for rabbit_exchange:declare that does not require %% the exchange type registry or worker pool to be running by dint of %% not validating anything and assuming the exchange type does not @@ -518,3 +553,7 @@ create(Tab, TabDef) -> declare_exchange(XName, Type) -> X = {exchange, XName, Type, true, false, false, []}, ok = mnesia:dirty_write(rabbit_durable_exchange, X). + +add_indices(Tab, FieldList) -> + [mnesia:add_table_index(Tab, Field) || Field <- FieldList], + ok. diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index df2f8423b4..01f1046fb8 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -20,11 +20,14 @@ %%---------------------------------------------------------------------------- --export([add/1, delete/1, exists/1, list/0, with/2, assert/1]). +-export([add/1, delete/1, exists/1, list/0, with/2, assert/1, update/2, + set_limits/2, limits_of/1]). -export([info/1, info/2, info_all/0, info_all/1, info_all/2, info_all/3]). + -spec add(rabbit_types:vhost()) -> 'ok'. -spec delete(rabbit_types:vhost()) -> 'ok'. +-spec update(rabbit_types:vhost(), rabbit_misc:thunk(A)) -> A. -spec exists(rabbit_types:vhost()) -> boolean(). -spec list() -> [rabbit_types:vhost()]. -spec with(rabbit_types:vhost(), rabbit_misc:thunk(A)) -> A. @@ -138,6 +141,32 @@ assert(VHostPath) -> case exists(VHostPath) of false -> throw({error, {no_such_vhost, VHostPath}}) end. +update(VHostPath, Fun) -> + case mnesia:read({rabbit_vhost, VHostPath}) of + [] -> + mnesia:abort({no_such_vhost, VHostPath}); + [V] -> + V1 = Fun(V), + ok = mnesia:write(rabbit_vhost, V1, write), + V1 + end. + +limits_of(VHostPath) when is_binary(VHostPath) -> + assert(VHostPath), + case mnesia:dirty_read({rabbit_vhost, VHostPath}) of + [] -> + mnesia:abort({no_such_vhost, VHostPath}); + [#vhost{limits = Limits}] -> + Limits + end; +limits_of(#vhost{virtual_host = Name}) -> + limits_of(Name). + +set_limits(VHost = #vhost{}, undefined) -> + VHost#vhost{limits = undefined}; +set_limits(VHost = #vhost{}, Limits) -> + VHost#vhost{limits = Limits}. + %%---------------------------------------------------------------------------- infos(Items, X) -> [{Item, i(Item, X)} || Item <- Items]. diff --git a/src/rabbit_vhost_limit.erl b/src/rabbit_vhost_limit.erl new file mode 100644 index 0000000000..dd7ce51089 --- /dev/null +++ b/src/rabbit_vhost_limit.erl @@ -0,0 +1,98 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. +%% + +-module(rabbit_vhost_limit). + +-behaviour(rabbit_runtime_parameter). + +-include("rabbit.hrl"). + +-export([register/0]). +-export([parse_set/2, clear/1]). +-export([validate/5, notify/4, notify_clear/3]). +-export([connection_limit/1]). + +-import(rabbit_misc, [pget/2]). + +-rabbit_boot_step({?MODULE, + [{description, "vhost limit parameters"}, + {mfa, {rabbit_vhost_limit, register, []}}, + {requires, rabbit_registry}, + {enables, recovery}]}). + +%%---------------------------------------------------------------------------- + +register() -> + rabbit_registry:register(runtime_parameter, <<"vhost-limits">>, ?MODULE). + +validate(_VHost, <<"vhost-limits">>, Name, Term, _User) -> + rabbit_parameter_validation:proplist( + Name, vhost_limit_validation(), Term). + +notify(VHost, <<"vhost-limits">>, <<"limits">>, Limits) -> + rabbit_event:notify(vhost_limits_set, [{name, <<"limits">>} | Limits]), + update_vhost(VHost, Limits). + +notify_clear(VHost, <<"vhost-limits">>, <<"limits">>) -> + rabbit_event:notify(vhost_limits_cleared, [{name, <<"limits">>}]), + update_vhost(VHost, undefined). + +connection_limit(VirtualHost) -> + get_limit(VirtualHost, <<"max-connections">>). + +%%---------------------------------------------------------------------------- + +parse_set(VHost, Defn) -> + case rabbit_misc:json_decode(Defn) of + {ok, JSON} -> + set(VHost, rabbit_misc:json_to_term(JSON)); + error -> + {error_string, "JSON decoding error"} + end. + +set(VHost, Defn) -> + rabbit_runtime_parameters:set_any(VHost, <<"vhost-limits">>, + <<"limits">>, Defn, none). + +clear(VHost) -> + rabbit_runtime_parameters:clear_any(VHost, <<"vhost-limits">>, + <<"limits">>). + +vhost_limit_validation() -> + [{<<"max-connections">>, fun rabbit_parameter_validation:number/2, mandatory}]. + +update_vhost(VHostName, Limits) -> + rabbit_misc:execute_mnesia_transaction( + fun() -> + rabbit_vhost:update(VHostName, + fun(VHost) -> + rabbit_vhost:set_limits(VHost, Limits) + end) + end), + ok. + +get_limit(VirtualHost, Limit) -> + case rabbit_runtime_parameters:list(VirtualHost, <<"vhost-limits">>) of + [] -> undefined; + [Param] -> case pget(value, Param) of + undefined -> undefined; + Val -> case pget(Limit, Val) of + undefined -> undefined; + N when N =< 0 -> undefined; + N when N > 0 -> {ok, N} + end + end + end. |
