summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl5
-rw-r--r--src/rabbit_connection_tracker.erl99
-rw-r--r--src/rabbit_connection_tracking.erl229
-rw-r--r--src/rabbit_connection_tracking_handler.erl101
-rw-r--r--src/rabbit_control_main.erl15
-rw-r--r--src/rabbit_mnesia.erl4
-rw-r--r--src/rabbit_node_monitor.erl4
-rw-r--r--src/rabbit_table.erl32
-rw-r--r--src/rabbit_upgrade_functions.erl39
-rw-r--r--src/rabbit_vhost.erl31
-rw-r--r--src/rabbit_vhost_limit.erl98
11 files changed, 650 insertions, 7 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 2fa4cdee71..59ede3c802 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -178,6 +178,11 @@
{mfa, {rabbit_direct, boot, []}},
{requires, log_relay}]}).
+-rabbit_boot_step({connection_tracker,
+ [{description, "helps track node-local connections"},
+ {mfa, {rabbit_connection_tracker, boot, []}},
+ {requires, log_relay}]}).
+
-rabbit_boot_step({networking,
[{mfa, {rabbit_networking, boot, []}},
{requires, log_relay}]}).
diff --git a/src/rabbit_connection_tracker.erl b/src/rabbit_connection_tracker.erl
new file mode 100644
index 0000000000..0177627d83
--- /dev/null
+++ b/src/rabbit_connection_tracker.erl
@@ -0,0 +1,99 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_connection_tracker).
+
+%% Abstracts away how tracked connection records are stored
+%% and queried.
+%%
+%% See also:
+%%
+%% * rabbit_connection_tracking_handler
+%% * rabbit_reader
+%% * rabbit_event
+
+-behaviour(gen_server2).
+
+%% API
+-export([boot/0, start_link/0, reregister/1]).
+
+%% gen_fsm callbacks
+-export([init/1,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ terminate/2,
+ code_change/3]).
+
+-define(SERVER, ?MODULE).
+
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+
+boot() ->
+ {ok, _} = start_link(),
+ ok.
+
+start_link() ->
+ gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []).
+
+reregister(Node) ->
+ rabbit_log:info("Telling node ~p to re-register tracked connections", [Node]),
+ gen_server2:cast({?SERVER, Node}, reregister).
+
+%%%===================================================================
+%%% gen_server callbacks
+%%%===================================================================
+
+init([]) ->
+ {ok, {}}.
+
+handle_call(_Req, _From, State) ->
+ {noreply, State}.
+
+handle_cast(reregister, State) ->
+ Cs = rabbit_networking:connections_local(),
+ rabbit_log:info("Connection tracker: asked to re-register ~p client connections", [length(Cs)]),
+ case Cs of
+ [] -> ok;
+ Cs ->
+ [reregister_connection(C) || C <- Cs],
+ ok
+ end,
+ rabbit_log:info("Done re-registering client connections"),
+ {noreply, State}.
+
+handle_info(_Req, State) ->
+ {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+
+reregister_connection(Conn) ->
+ try
+ Conn ! reregister
+ catch _:Error ->
+ rabbit_log:error("Failed to re-register connection ~p after a network split: ~p", [Conn, Error])
+ end.
diff --git a/src/rabbit_connection_tracking.erl b/src/rabbit_connection_tracking.erl
new file mode 100644
index 0000000000..c65023a2a8
--- /dev/null
+++ b/src/rabbit_connection_tracking.erl
@@ -0,0 +1,229 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_connection_tracking).
+
+%% Abstracts away how tracked connection records are stored
+%% and queried.
+%%
+%% See also:
+%%
+%% * rabbit_connection_tracking_handler
+%% * rabbit_reader
+%% * rabbit_event
+
+-export([register_connection/1, unregister_connection/1,
+ list/0, list/1, list_on_node/1,
+ tracked_connection_from_connection_created/1,
+ tracked_connection_from_connection_state/1,
+ is_over_connection_limit/1, count_connections_in/1,
+ on_node_down/1, on_node_up/1]).
+
+-include_lib("rabbit.hrl").
+
+-define(TABLE, rabbit_tracked_connection).
+-define(PER_VHOST_COUNTER_TABLE, rabbit_tracked_connection_per_vhost).
+-define(SERVER, ?MODULE).
+
+%%
+%% API
+%%
+
+-spec register_connection(rabbit_types:tracked_connection()) -> ok.
+
+register_connection(#tracked_connection{vhost = VHost, id = ConnId} = Conn) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ %% upsert
+ case mnesia:dirty_read(?TABLE, ConnId) of
+ [] ->
+ mnesia:write(?TABLE, Conn, write),
+ mnesia:dirty_update_counter(
+ rabbit_tracked_connection_per_vhost, VHost, 1);
+ [_Row] ->
+ ok
+ end,
+ ok
+ end).
+
+-spec unregister_connection(rabbit_types:connection_name()) -> ok.
+
+unregister_connection(ConnId = {_Node, _Name}) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ case mnesia:dirty_read(?TABLE, ConnId) of
+ [] -> ok;
+ [Row] ->
+ mnesia:dirty_update_counter(
+ ?PER_VHOST_COUNTER_TABLE,
+ Row#tracked_connection.vhost, -1),
+ mnesia:delete({?TABLE, ConnId})
+ end
+ end).
+
+
+-spec list() -> [rabbit_types:tracked_connection()].
+
+list() ->
+ mnesia:dirty_match_object(?TABLE, #tracked_connection{_ = '_'}).
+
+
+-spec list(rabbit_types:vhost()) -> [rabbit_types:tracked_connection()].
+
+list(VHost) ->
+ mnesia:dirty_match_object(?TABLE, #tracked_connection{vhost = VHost, _ = '_'}).
+
+
+-spec list_on_node(node()) -> [rabbit_types:tracked_connection()].
+
+list_on_node(Node) ->
+ mnesia:dirty_match_object(?TABLE, #tracked_connection{node = Node, _ = '_'}).
+
+
+-spec on_node_down(node()) -> ok.
+
+on_node_down(Node) ->
+ case lists:member(Node, nodes()) of
+ false ->
+ Cs = list_on_node(Node),
+ rabbit_log:info(
+ "Node ~p is down, unregistering ~p client connections~n",
+ [Node, length(Cs)]),
+ [unregister_connection(Id) || #tracked_connection{id = Id} <- Cs],
+ ok;
+ true -> rabbit_log:info(
+ "Keeping ~s connections: the node is already back~n", [Node])
+ end.
+
+-spec on_node_up(node()) -> ok.
+on_node_up(Node) ->
+ rabbit_connection_tracker:reregister(Node),
+ ok.
+
+-spec is_over_connection_limit(rabbit_types:vhost()) -> boolean().
+
+is_over_connection_limit(VirtualHost) ->
+ ConnectionCount = count_connections_in(VirtualHost),
+ case rabbit_vhost_limit:connection_limit(VirtualHost) of
+ undefined -> false;
+ {ok, Limit} -> case {ConnectionCount, ConnectionCount >= Limit} of
+ %% 0 = no limit
+ {0, _} -> false;
+ %% the limit hasn't been reached
+ {_, false} -> false;
+ {_N, true} -> {true, Limit}
+ end
+ end.
+
+
+-spec count_connections_in(rabbit_types:vhost()) -> non_neg_integer().
+
+count_connections_in(VirtualHost) ->
+ try
+ case mnesia:transaction(
+ fun() ->
+ case mnesia:dirty_read(
+ {?PER_VHOST_COUNTER_TABLE,
+ VirtualHost}) of
+ [] -> 0;
+ [Val] ->
+ Val#tracked_connection_per_vhost.connection_count
+ end
+ end) of
+ {atomic, Val} -> Val;
+ {aborted, _Reason} -> 0
+ end
+ catch
+ _:Err ->
+ rabbit_log:error(
+ "Failed to fetch number of connections in vhost ~p:~n~p~n",
+ [VirtualHost, Err]),
+ 0
+ end.
+
+%% Returns a #tracked_connection from connection_created
+%% event details.
+%%
+%% @see rabbit_connection_tracking_handler.
+tracked_connection_from_connection_created(EventDetails) ->
+ %% Example event:
+ %%
+ %% [{type,network},
+ %% {pid,<0.329.0>},
+ %% {name,<<"127.0.0.1:60998 -> 127.0.0.1:5672">>},
+ %% {port,5672},
+ %% {peer_port,60998},
+ %% {host,{0,0,0,0,0,65535,32512,1}},
+ %% {peer_host,{0,0,0,0,0,65535,32512,1}},
+ %% {ssl,false},
+ %% {peer_cert_subject,''},
+ %% {peer_cert_issuer,''},
+ %% {peer_cert_validity,''},
+ %% {auth_mechanism,<<"PLAIN">>},
+ %% {ssl_protocol,''},
+ %% {ssl_key_exchange,''},
+ %% {ssl_cipher,''},
+ %% {ssl_hash,''},
+ %% {protocol,{0,9,1}},
+ %% {user,<<"guest">>},
+ %% {vhost,<<"/">>},
+ %% {timeout,14},
+ %% {frame_max,131072},
+ %% {channel_max,65535},
+ %% {client_properties,
+ %% [{<<"capabilities">>,table,
+ %% [{<<"publisher_confirms">>,bool,true},
+ %% {<<"consumer_cancel_notify">>,bool,true},
+ %% {<<"exchange_exchange_bindings">>,bool,true},
+ %% {<<"basic.nack">>,bool,true},
+ %% {<<"connection.blocked">>,bool,true},
+ %% {<<"authentication_failure_close">>,bool,true}]},
+ %% {<<"product">>,longstr,<<"Bunny">>},
+ %% {<<"platform">>,longstr,
+ %% <<"ruby 2.3.0p0 (2015-12-25 revision 53290) [x86_64-darwin15]">>},
+ %% {<<"version">>,longstr,<<"2.3.0.pre">>},
+ %% {<<"information">>,longstr,
+ %% <<"http://rubybunny.info">>}]},
+ %% {connected_at,1453214290847}]
+ Name = proplists:get_value(name, EventDetails),
+ Node = proplists:get_value(node, EventDetails),
+ #tracked_connection{id = {Node, Name},
+ name = Name,
+ node = Node,
+ vhost = proplists:get_value(vhost, EventDetails),
+ username = proplists:get_value(user, EventDetails),
+ connected_at = proplists:get_value(connected_at, EventDetails),
+ pid = proplists:get_value(pid, EventDetails),
+ peer_host = proplists:get_value(peer_host, EventDetails),
+ peer_port = proplists:get_value(peer_port, EventDetails)}.
+
+tracked_connection_from_connection_state(#connection{
+ vhost = VHost,
+ connected_at = Ts,
+ peer_host = PeerHost,
+ peer_port = PeerPort,
+ user = Username,
+ name = Name
+ }) ->
+ tracked_connection_from_connection_created(
+ [{name, Name},
+ {node, node()},
+ {vhost, VHost},
+ {user, Username},
+ {connected_at, Ts},
+ {pid, self()},
+ {peer_port, PeerPort},
+ {peer_host, PeerHost}]).
diff --git a/src/rabbit_connection_tracking_handler.erl b/src/rabbit_connection_tracking_handler.erl
new file mode 100644
index 0000000000..deaf9bc7f6
--- /dev/null
+++ b/src/rabbit_connection_tracking_handler.erl
@@ -0,0 +1,101 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_connection_tracking_handler).
+
+%% This module keeps track of connection creation and termination events
+%% on its local node. The primary goal here is to decouple connection
+%% tracking from rabbit_reader in rabbit_common.
+%%
+%% Events from other nodes are ignored.
+
+%% This module keeps track of connection creation and termination events
+%% on its local node. The primary goal here is to decouple connection
+%% tracking from rabbit_reader in rabbit_common.
+%%
+%% Events from other nodes are ignored.
+
+-behaviour(gen_event).
+
+-export([init/1, handle_call/2, handle_event/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-include_lib("rabbit.hrl").
+
+-rabbit_boot_step({?MODULE,
+ [{description, "connection tracking event handler"},
+ {mfa, {gen_event, add_handler,
+ [rabbit_event, ?MODULE, []]}},
+ {cleanup, {gen_event, delete_handler,
+ [rabbit_event, ?MODULE, []]}},
+ {requires, [rabbit_event, rabbit_node_monitor]},
+ {enables, recovery}]}).
+
+
+%%
+%% API
+%%
+
+init([]) ->
+ {ok, []}.
+
+handle_event(#event{type = connection_created, props = Details}, State) ->
+ rabbit_connection_tracking:register_connection(
+ rabbit_connection_tracking:tracked_connection_from_connection_created(Details)
+ ),
+ {ok, State};
+%% see rabbit_reader
+handle_event(#event{type = connection_reregistered, props = [{state, ConnState}]}, State) ->
+ rabbit_connection_tracking:register_connection(
+ rabbit_connection_tracking:tracked_connection_from_connection_state(ConnState)
+ ),
+ {ok, State};
+handle_event(#event{type = connection_closed, props = Details}, State) ->
+ %% [{name,<<"127.0.0.1:64078 -> 127.0.0.1:5672">>},
+ %% {pid,<0.1774.0>},
+ %% {node, rabbit@hostname}]
+ rabbit_connection_tracking:unregister_connection(
+ {proplists:get_value(node, Details),
+ proplists:get_value(name, Details)}),
+ {ok, State};
+handle_event(#event{type = vhost_deleted, props = Details}, State) ->
+ VHost = proplists:get_value(name, Details),
+ rabbit_log_connection:info("Closing all connections in vhost '~s' because it's being deleted", [VHost]),
+ case rabbit_connection_tracking:list(VHost) of
+ [] -> {ok, State};
+ Cs ->
+ [rabbit_networking:close_connection(Pid, rabbit_misc:format("vhost '~s' is deleted", [VHost])) || #tracked_connection{pid = Pid} <- Cs],
+ {ok, State}
+ end;
+handle_event(#event{type = user_deleted, props = Details}, State) ->
+ _Username = proplists:get_value(name, Details),
+ %% TODO: force close and unregister connections from
+ %% this user. Moved to rabbitmq/rabbitmq-server#628.
+ {ok, State};
+handle_event(_Event, State) ->
+ {ok, State}.
+
+handle_call(_Request, State) ->
+ {ok, not_understood, State}.
+
+handle_info(_Info, State) ->
+ {ok, State}.
+
+terminate(_Arg, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index 7f410ac752..c56e519fe5 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -74,6 +74,9 @@
{clear_policy, [?VHOST_DEF]},
{list_policies, [?VHOST_DEF]},
+ {set_vhost_limits, [?VHOST_DEF]},
+ {clear_vhost_limits, [?VHOST_DEF]},
+
{list_queues, [?VHOST_DEF, ?OFFLINE_DEF, ?ONLINE_DEF]},
{list_exchanges, [?VHOST_DEF]},
{list_bindings, [?VHOST_DEF]},
@@ -544,6 +547,18 @@ action(clear_policy, Node, [Key], Opts, Inform) ->
Inform("Clearing policy ~p", [Key]),
rpc_call(Node, rabbit_policy, delete, [VHostArg, list_to_binary(Key)]);
+action(set_vhost_limits, Node, [Defn], Opts, Inform) ->
+ Msg = "Setting vhost limits for vhost ~p",
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
+ Inform(Msg, [VHostArg]),
+ rpc_call(Node, rabbit_vhost_limit, parse_set, [VHostArg, Defn]),
+ ok;
+
+action(clear_vhost_limits, Node, [], Opts, Inform) ->
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
+ Inform("Clearing vhost ~p limits", [VHostArg]),
+ rpc_call(Node, rabbit_vhost_limit, clear, [VHostArg]);
+
action(report, Node, _Args, _Opts, Inform) ->
Inform("Reporting server status on ~p~n~n", [erlang:universaltime()]),
[begin ok = action(Action, N, [], [], Inform), io:nl() end ||
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 26a864f0f5..43d268c374 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -466,12 +466,14 @@ init_db(ClusterNodes, NodeType, CheckOtherNodes) ->
{[], true, disc} ->
%% First disc node up
maybe_force_load(),
+ ok = rabbit_table:ensure_secondary_indices(),
ok;
{[_ | _], _, _} ->
%% Subsequent node in cluster, catch up
maybe_force_load(),
ok = rabbit_table:wait_for_replicated(),
- ok = rabbit_table:create_local_copy(NodeType)
+ ok = rabbit_table:create_local_copy(NodeType),
+ ok = rabbit_table:ensure_secondary_indices()
end,
ensure_schema_integrity(),
rabbit_node_monitor:update_cluster_status(),
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 0322aacfd1..9ed790c75d 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -732,6 +732,7 @@ handle_dead_rabbit(Node, State = #state{partitions = Partitions,
ok = rabbit_amqqueue:on_node_down(Node),
ok = rabbit_alarm:on_node_down(Node),
ok = rabbit_mnesia:on_node_down(Node),
+ ok = rabbit_connection_tracking:on_node_down(Node),
%% If we have been partitioned, and we are now in the only remaining
%% partition, we no longer care about partitions - forget them. Note
%% that we do not attempt to deal with individual (other) partitions
@@ -760,7 +761,8 @@ ensure_keepalive_timer(State) ->
handle_live_rabbit(Node) ->
ok = rabbit_amqqueue:on_node_up(Node),
ok = rabbit_alarm:on_node_up(Node),
- ok = rabbit_mnesia:on_node_up(Node).
+ ok = rabbit_mnesia:on_node_up(Node),
+ ok = rabbit_connection_tracking:on_node_up(Node).
maybe_autoheal(State = #state{partitions = []}) ->
State;
diff --git a/src/rabbit_table.erl b/src/rabbit_table.erl
index 3909096964..60996c1539 100644
--- a/src/rabbit_table.erl
+++ b/src/rabbit_table.erl
@@ -18,7 +18,8 @@
-export([create/0, create_local_copy/1, wait_for_replicated/0, wait/1,
force_load/0, is_present/0, is_empty/0, needs_default_data/0,
- check_schema_integrity/0, clear_ram_only_tables/0, wait_timeout/0]).
+ check_schema_integrity/0, clear_ram_only_tables/0, wait_timeout/0,
+ ensure_secondary_indices/0, ensure_secondary_indices/2]).
-include("rabbit.hrl").
@@ -50,6 +51,7 @@ create() ->
Tab, TabDef1, Reason}})
end
end, definitions()),
+ ok = rabbit_table:ensure_secondary_indices(),
ok.
%% The sequence in which we delete the schema and then the other
@@ -63,6 +65,13 @@ create_local_copy(ram) ->
create_local_copies(ram),
create_local_copy(schema, ram_copies).
+ensure_secondary_indices() ->
+ ensure_secondary_indices(rabbit_tracked_connection, [vhost, username]),
+ ok.
+
+ensure_secondary_indices(Tab, Fields) ->
+ [mnesia:add_table_index(Tab, Field) || Field <- Fields].
+
wait_for_replicated() ->
wait([Tab || {Tab, TabDef} <- definitions(),
not lists:member({local_content, true}, TabDef)]).
@@ -297,9 +306,24 @@ definitions() ->
{rabbit_queue,
[{record_name, amqqueue},
{attributes, record_info(fields, amqqueue)},
- {match, #amqqueue{name = queue_name_match(), _='_'}}]}]
- ++ gm:table_definitions()
- ++ mirrored_supervisor:table_definitions().
+ {match, #amqqueue{name = queue_name_match(), _='_'}}]},
+
+ %% Used to track connections across virtual hosts
+ %% e.g. so that limits can be enforced.
+ %%
+ %% All data in this table is transient.
+ {rabbit_tracked_connection,
+ [{record_name, tracked_connection},
+ {attributes, record_info(fields, tracked_connection)},
+ {match, #tracked_connection{_ = '_'}}]},
+
+ {rabbit_tracked_connection_per_vhost,
+ [{record_name, tracked_connection_per_vhost},
+ {attributes, record_info(fields, tracked_connection_per_vhost)},
+ {match, #tracked_connection_per_vhost{_ = '_'}}]}
+
+ ] ++ gm:table_definitions()
+ ++ mirrored_supervisor:table_definitions().
binding_match() ->
#binding{source = exchange_name_match(),
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 3d624752ea..47053fafe7 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -55,6 +55,9 @@
-rabbit_upgrade({policy_version, mnesia, [recoverable_slaves]}).
-rabbit_upgrade({slave_pids_pending_shutdown, mnesia, [policy_version]}).
-rabbit_upgrade({user_password_hashing, mnesia, [hash_passwords]}).
+-rabbit_upgrade({vhost_limits, mnesia, []}).
+-rabbit_upgrade({tracked_connection, mnesia, [vhost_limits]}).
+-rabbit_upgrade({tracked_connection_per_vhost, mnesia, [tracked_connection]}).
%% -------------------------------------------------------------------
@@ -88,9 +91,36 @@
-spec queue_state() -> 'ok'.
-spec recoverable_slaves() -> 'ok'.
-spec user_password_hashing() -> 'ok'.
+-spec vhost_limits() -> 'ok'.
+-spec tracked_connection() -> 'ok'.
+-spec tracked_connection_per_vhost() -> 'ok'.
+
%%--------------------------------------------------------------------
+tracked_connection() ->
+ create(rabbit_tracked_connection, [{record_name, tracked_connection},
+ {attributes, [id, node, vhost, name,
+ pid, protocol,
+ peer_host, peer_port,
+ username, connected_at]}]).
+
+tracked_connection_per_vhost() ->
+ create(tracked_connection_per_vhost, [{record_name, tracked_connection_per_vhost},
+ {attributes, [vhost, connection_count]}]).
+
+%% replaces vhost.dummy (used to avoid having a single-field record
+%% which Mnesia doesn't like) with vhost.limits (which is actually
+%% used)
+vhost_limits() ->
+ io:format("vhost_limits vhost_limits vhost_limits~n"),
+ transform(
+ rabbit_vhost,
+ fun ({vhost, VHost, _Dummy}) ->
+ {vhost, VHost, undefined}
+ end,
+ [virtual_host, limits]).
+
%% It's a bad idea to use records or record_info here, even for the
%% destination form. Because in the future, the destination form of
%% your current transform may not match the record any more, and it
@@ -510,6 +540,11 @@ create(Tab, TabDef) ->
{atomic, ok} = mnesia:create_table(Tab, TabDef),
ok.
+create(Tab, TabDef, SecondaryIndices) ->
+ {atomic, ok} = mnesia:create_table(Tab, TabDef),
+ [mnesia:add_table_index(Tab, Idx) || Idx <- SecondaryIndices],
+ ok.
+
%% Dumb replacement for rabbit_exchange:declare that does not require
%% the exchange type registry or worker pool to be running by dint of
%% not validating anything and assuming the exchange type does not
@@ -518,3 +553,7 @@ create(Tab, TabDef) ->
declare_exchange(XName, Type) ->
X = {exchange, XName, Type, true, false, false, []},
ok = mnesia:dirty_write(rabbit_durable_exchange, X).
+
+add_indices(Tab, FieldList) ->
+ [mnesia:add_table_index(Tab, Field) || Field <- FieldList],
+ ok.
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index df2f8423b4..01f1046fb8 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -20,11 +20,14 @@
%%----------------------------------------------------------------------------
--export([add/1, delete/1, exists/1, list/0, with/2, assert/1]).
+-export([add/1, delete/1, exists/1, list/0, with/2, assert/1, update/2,
+ set_limits/2, limits_of/1]).
-export([info/1, info/2, info_all/0, info_all/1, info_all/2, info_all/3]).
+
-spec add(rabbit_types:vhost()) -> 'ok'.
-spec delete(rabbit_types:vhost()) -> 'ok'.
+-spec update(rabbit_types:vhost(), rabbit_misc:thunk(A)) -> A.
-spec exists(rabbit_types:vhost()) -> boolean().
-spec list() -> [rabbit_types:vhost()].
-spec with(rabbit_types:vhost(), rabbit_misc:thunk(A)) -> A.
@@ -138,6 +141,32 @@ assert(VHostPath) -> case exists(VHostPath) of
false -> throw({error, {no_such_vhost, VHostPath}})
end.
+update(VHostPath, Fun) ->
+ case mnesia:read({rabbit_vhost, VHostPath}) of
+ [] ->
+ mnesia:abort({no_such_vhost, VHostPath});
+ [V] ->
+ V1 = Fun(V),
+ ok = mnesia:write(rabbit_vhost, V1, write),
+ V1
+ end.
+
+limits_of(VHostPath) when is_binary(VHostPath) ->
+ assert(VHostPath),
+ case mnesia:dirty_read({rabbit_vhost, VHostPath}) of
+ [] ->
+ mnesia:abort({no_such_vhost, VHostPath});
+ [#vhost{limits = Limits}] ->
+ Limits
+ end;
+limits_of(#vhost{virtual_host = Name}) ->
+ limits_of(Name).
+
+set_limits(VHost = #vhost{}, undefined) ->
+ VHost#vhost{limits = undefined};
+set_limits(VHost = #vhost{}, Limits) ->
+ VHost#vhost{limits = Limits}.
+
%%----------------------------------------------------------------------------
infos(Items, X) -> [{Item, i(Item, X)} || Item <- Items].
diff --git a/src/rabbit_vhost_limit.erl b/src/rabbit_vhost_limit.erl
new file mode 100644
index 0000000000..dd7ce51089
--- /dev/null
+++ b/src/rabbit_vhost_limit.erl
@@ -0,0 +1,98 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_vhost_limit).
+
+-behaviour(rabbit_runtime_parameter).
+
+-include("rabbit.hrl").
+
+-export([register/0]).
+-export([parse_set/2, clear/1]).
+-export([validate/5, notify/4, notify_clear/3]).
+-export([connection_limit/1]).
+
+-import(rabbit_misc, [pget/2]).
+
+-rabbit_boot_step({?MODULE,
+ [{description, "vhost limit parameters"},
+ {mfa, {rabbit_vhost_limit, register, []}},
+ {requires, rabbit_registry},
+ {enables, recovery}]}).
+
+%%----------------------------------------------------------------------------
+
+register() ->
+ rabbit_registry:register(runtime_parameter, <<"vhost-limits">>, ?MODULE).
+
+validate(_VHost, <<"vhost-limits">>, Name, Term, _User) ->
+ rabbit_parameter_validation:proplist(
+ Name, vhost_limit_validation(), Term).
+
+notify(VHost, <<"vhost-limits">>, <<"limits">>, Limits) ->
+ rabbit_event:notify(vhost_limits_set, [{name, <<"limits">>} | Limits]),
+ update_vhost(VHost, Limits).
+
+notify_clear(VHost, <<"vhost-limits">>, <<"limits">>) ->
+ rabbit_event:notify(vhost_limits_cleared, [{name, <<"limits">>}]),
+ update_vhost(VHost, undefined).
+
+connection_limit(VirtualHost) ->
+ get_limit(VirtualHost, <<"max-connections">>).
+
+%%----------------------------------------------------------------------------
+
+parse_set(VHost, Defn) ->
+ case rabbit_misc:json_decode(Defn) of
+ {ok, JSON} ->
+ set(VHost, rabbit_misc:json_to_term(JSON));
+ error ->
+ {error_string, "JSON decoding error"}
+ end.
+
+set(VHost, Defn) ->
+ rabbit_runtime_parameters:set_any(VHost, <<"vhost-limits">>,
+ <<"limits">>, Defn, none).
+
+clear(VHost) ->
+ rabbit_runtime_parameters:clear_any(VHost, <<"vhost-limits">>,
+ <<"limits">>).
+
+vhost_limit_validation() ->
+ [{<<"max-connections">>, fun rabbit_parameter_validation:number/2, mandatory}].
+
+update_vhost(VHostName, Limits) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ rabbit_vhost:update(VHostName,
+ fun(VHost) ->
+ rabbit_vhost:set_limits(VHost, Limits)
+ end)
+ end),
+ ok.
+
+get_limit(VirtualHost, Limit) ->
+ case rabbit_runtime_parameters:list(VirtualHost, <<"vhost-limits">>) of
+ [] -> undefined;
+ [Param] -> case pget(value, Param) of
+ undefined -> undefined;
+ Val -> case pget(Limit, Val) of
+ undefined -> undefined;
+ N when N =< 0 -> undefined;
+ N when N > 0 -> {ok, N}
+ end
+ end
+ end.