summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_connection_tracking.erl13
-rw-r--r--src/rabbit_connection_tracking_handler.erl24
-rw-r--r--src/rabbit_credential_validation.erl53
-rw-r--r--src/rabbit_credential_validator.erl28
-rw-r--r--src/rabbit_credential_validator_accept_everything.erl32
-rw-r--r--src/rabbit_credential_validator_min_password_length.erl56
-rw-r--r--src/rabbit_credential_validator_password_regexp.erl51
-rw-r--r--src/rabbit_plugins.erl2
-rw-r--r--src/rabbit_table.erl9
-rw-r--r--src/rabbit_variable_queue.erl53
10 files changed, 290 insertions, 31 deletions
diff --git a/src/rabbit_connection_tracking.erl b/src/rabbit_connection_tracking.erl
index 460bf964a0..38684482ef 100644
--- a/src/rabbit_connection_tracking.erl
+++ b/src/rabbit_connection_tracking.erl
@@ -34,7 +34,7 @@
delete_tracked_connections_table_for_node/1, delete_per_vhost_tracked_connections_table_for_node/1,
clear_tracked_connection_tables_for_this_node/0,
register_connection/1, unregister_connection/1,
- list/0, list/1, list_on_node/1,
+ list/0, list/1, list_on_node/1, list_of_user/1,
tracked_connection_from_connection_created/1,
tracked_connection_from_connection_state/1,
count_connections_in/1]).
@@ -217,6 +217,17 @@ list_on_node(Node) ->
catch exit:{aborted, {no_exists, _}} -> []
end.
+-spec list_of_user(rabbit_types:username()) -> [rabbit_types:tracked_connection()].
+
+list_of_user(Username) ->
+ lists:foldl(
+ fun (Node, Acc) ->
+ Tab = tracked_connection_table_name_for(Node),
+ Acc ++ mnesia:dirty_match_object(
+ Tab,
+ #tracked_connection{username = Username, _ = '_'})
+ end, [], rabbit_mnesia:cluster_nodes(running)).
+
-spec count_connections_in(rabbit_types:vhost()) -> non_neg_integer().
count_connections_in(VirtualHost) ->
diff --git a/src/rabbit_connection_tracking_handler.erl b/src/rabbit_connection_tracking_handler.erl
index 598fe686c3..739b8049c0 100644
--- a/src/rabbit_connection_tracking_handler.erl
+++ b/src/rabbit_connection_tracking_handler.erl
@@ -27,6 +27,8 @@
-export([init/1, handle_call/2, handle_event/2, handle_info/2,
terminate/2, code_change/3]).
+-export([close_connections/3]).
+
-include_lib("rabbit.hrl").
-import(rabbit_misc, [pget/2]).
@@ -77,13 +79,14 @@ handle_event(#event{type = connection_closed, props = Details}, State) ->
handle_event(#event{type = vhost_deleted, props = Details}, State) ->
VHost = pget(name, Details),
rabbit_log_connection:info("Closing all connections in vhost '~s' because it's being deleted", [VHost]),
- [close_connection(Conn, rabbit_misc:format("vhost '~s' is deleted", [VHost]))
- || Conn <- rabbit_connection_tracking:list(VHost)],
+ close_connections(rabbit_connection_tracking:list(VHost),
+ rabbit_misc:format("vhost '~s' is deleted", [VHost])),
{ok, State};
handle_event(#event{type = user_deleted, props = Details}, State) ->
- _Username = pget(name, Details),
- %% TODO: force close and unregister connections from
- %% this user. Moved to rabbitmq/rabbitmq-server#628.
+ Username = pget(name, Details),
+ rabbit_log_connection:info("Closing all connections from user '~s' because it's being deleted", [Username]),
+ close_connections(rabbit_connection_tracking:list_of_user(Username),
+ rabbit_misc:format("user '~s' is deleted", [Username])),
{ok, State};
%% A node had been deleted from the cluster.
handle_event(#event{type = node_deleted, props = Details}, State) ->
@@ -107,6 +110,17 @@ terminate(_Arg, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
+
+close_connections(Tracked, Message) ->
+ close_connections(Tracked, Message, 0).
+
+close_connections(Tracked, Message, Delay) ->
+ [begin
+ close_connection(Conn, Message),
+ timer:sleep(Delay)
+ end || Conn <- Tracked],
+ ok.
+
close_connection(#tracked_connection{pid = Pid, type = network}, Message) ->
rabbit_networking:close_connection(Pid, Message);
close_connection(#tracked_connection{pid = Pid, type = direct}, Message) ->
diff --git a/src/rabbit_credential_validation.erl b/src/rabbit_credential_validation.erl
new file mode 100644
index 0000000000..4a629da14f
--- /dev/null
+++ b/src/rabbit_credential_validation.erl
@@ -0,0 +1,53 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_credential_validation).
+
+-include("rabbit.hrl").
+
+%% used for backwards compatibility
+-define(DEFAULT_BACKEND, rabbit_credential_validator_accept_everything).
+
+%%
+%% API
+%%
+
+-export([validate/2, backend/0]).
+
+-spec validate(rabbit_types:username(), rabbit_types:password()) -> 'ok' | {'error', string()}.
+
+%% Validates a username/password pair by delegating to the effective
+%% `rabbit_credential_validator`. Used by `rabbit_auth_backend_internal`.
+%% Note that some validators may choose to only validate passwords.
+%%
+%% Possible return values:
+%%
+%% * ok: provided credentials passed validation.
+%% * {error, Error, Args}: provided password password failed validation.
+
+validate(Username, Password) ->
+ Backend = backend(),
+ Backend:validate(Username, Password).
+
+-spec backend() -> atom().
+
+backend() ->
+ case application:get_env(rabbit, credential_validator) of
+ undefined ->
+ ?DEFAULT_BACKEND;
+ {ok, Proplist} ->
+ proplists:get_value(validation_backend, Proplist, ?DEFAULT_BACKEND)
+ end.
diff --git a/src/rabbit_credential_validator.erl b/src/rabbit_credential_validator.erl
new file mode 100644
index 0000000000..dd12f6d2d6
--- /dev/null
+++ b/src/rabbit_credential_validator.erl
@@ -0,0 +1,28 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_credential_validator).
+
+-include("rabbit.hrl").
+
+%% Validates a password. Used by `rabbit_auth_backend_internal`.
+%%
+%% Possible return values:
+%%
+%% * ok: provided password passed validation.
+%% * {error, Error, Args}: provided password password failed validation.
+
+-callback validate(rabbit_types:username(), rabbit_types:password()) -> 'ok' | {'error', string()}.
diff --git a/src/rabbit_credential_validator_accept_everything.erl b/src/rabbit_credential_validator_accept_everything.erl
new file mode 100644
index 0000000000..f572d67e7f
--- /dev/null
+++ b/src/rabbit_credential_validator_accept_everything.erl
@@ -0,0 +1,32 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_credential_validator_accept_everything).
+
+-include("rabbit.hrl").
+
+-behaviour(rabbit_credential_validator).
+
+%%
+%% API
+%%
+
+-export([validate/2]).
+
+-spec validate(rabbit_types:username(), rabbit_types:password()) -> 'ok' | {'error', string()}.
+
+validate(_Username, _Password) ->
+ ok.
diff --git a/src/rabbit_credential_validator_min_password_length.erl b/src/rabbit_credential_validator_min_password_length.erl
new file mode 100644
index 0000000000..78239fc71b
--- /dev/null
+++ b/src/rabbit_credential_validator_min_password_length.erl
@@ -0,0 +1,56 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_credential_validator_min_password_length).
+
+-include("rabbit.hrl").
+
+-behaviour(rabbit_credential_validator).
+
+%% accommodates default (localhost-only) user credentials,
+%% guest/guest
+-define(DEFAULT_MIN_LENGTH, 5).
+
+%%
+%% API
+%%
+
+-export([validate/2]).
+%% for tests
+-export([validate/3]).
+
+-spec validate(rabbit_types:username(), rabbit_types:password()) -> 'ok' | {'error', string()}.
+
+validate(Username, Password) ->
+ MinLength = case application:get_env(rabbit, credential_validator) of
+ undefined ->
+ ?DEFAULT_MIN_LENGTH;
+ {ok, Proplist} ->
+ case proplists:get_value(min_length, Proplist) of
+ undefined -> ?DEFAULT_MIN_LENGTH;
+ Value -> rabbit_data_coercion:to_integer(Value)
+ end
+ end,
+ validate(Username, Password, MinLength).
+
+
+-spec validate(rabbit_types:username(), rabbit_types:password(), integer()) -> 'ok' | {'error', string(), [any()]}.
+
+validate(_Username, Password, MinLength) ->
+ case size(Password) >= MinLength of
+ true -> ok;
+ false -> {error, rabbit_misc:format("minimum required password length is ~B", [MinLength])}
+ end.
diff --git a/src/rabbit_credential_validator_password_regexp.erl b/src/rabbit_credential_validator_password_regexp.erl
new file mode 100644
index 0000000000..b516175126
--- /dev/null
+++ b/src/rabbit_credential_validator_password_regexp.erl
@@ -0,0 +1,51 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
+%%
+
+
+%% A `rabbit_credential_validator` implementation that matches
+%% password against a pre-configured regular expression.
+-module(rabbit_credential_validator_password_regexp).
+
+-include("rabbit.hrl").
+
+-behaviour(rabbit_credential_validator).
+
+%%
+%% API
+%%
+
+-export([validate/2]).
+%% for tests
+-export([validate/3]).
+
+-spec validate(rabbit_types:username(), rabbit_types:password()) -> 'ok' | {'error', string()}.
+
+validate(Username, Password) ->
+ {ok, Proplist} = application:get_env(rabbit, credential_validator),
+ Regexp = case proplists:get_value(regexp, Proplist) of
+ undefined -> {error, "rabbit.credential_validator.regexp config key is undefined"};
+ Value -> rabbit_data_coercion:to_list(Value)
+ end,
+ validate(Username, Password, Regexp).
+
+
+-spec validate(rabbit_types:username(), rabbit_types:password(), string()) -> 'ok' | {'error', string(), [any()]}.
+
+validate(_Username, Password, Pattern) ->
+ case re:run(rabbit_data_coercion:to_list(Password), Pattern) of
+ {match, _} -> ok;
+ nomatch -> {error, "provided password does not match the validator regular expression"}
+ end.
diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl
index 54f4180244..262f20913f 100644
--- a/src/rabbit_plugins.erl
+++ b/src/rabbit_plugins.erl
@@ -15,7 +15,7 @@
%%
-module(rabbit_plugins).
--include("rabbit.hrl").
+-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("stdlib/include/zip.hrl").
-export([setup/0, active/0, read_enabled/1, list/1, list/2, dependencies/3, running_plugins/0]).
diff --git a/src/rabbit_table.erl b/src/rabbit_table.erl
index 5154f91494..040075ea87 100644
--- a/src/rabbit_table.erl
+++ b/src/rabbit_table.erl
@@ -18,7 +18,8 @@
-export([create/0, create_local_copy/1, wait_for_replicated/1, wait/1,
force_load/0, is_present/0, is_empty/0, needs_default_data/0,
- check_schema_integrity/1, clear_ram_only_tables/0, retry_timeout/0]).
+ check_schema_integrity/1, clear_ram_only_tables/0, retry_timeout/0,
+ wait_for_replicated/0]).
%% for testing purposes
-export([definitions/0]).
@@ -31,6 +32,7 @@
-spec create() -> 'ok'.
-spec create_local_copy('disc' | 'ram') -> 'ok'.
-spec wait_for_replicated(retry()) -> 'ok'.
+-spec wait_for_replicated() -> 'ok'.
-spec wait([atom()]) -> 'ok'.
-spec retry_timeout() -> {non_neg_integer() | infinity, non_neg_integer()}.
-spec force_load() -> 'ok'.
@@ -79,6 +81,11 @@ create_local_copy(ram) ->
create_local_copies(ram),
create_local_copy(schema, ram_copies).
+%% This arity only exists for backwards compatibility with certain
+%% plugins. See https://github.com/rabbitmq/rabbitmq-clusterer/issues/19.
+wait_for_replicated() ->
+ wait_for_replicated(false).
+
wait_for_replicated(Retry) ->
wait([Tab || {Tab, TabDef} <- definitions(),
not lists:member({local_content, true}, TabDef)], Retry).
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 9dbd4fdbf2..5581143e69 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -447,21 +447,28 @@
%% rabbit_amqqueue_process need fairly fresh rates.
-define(MSGS_PER_RATE_CALC, 100).
-
%% we define the garbage collector threshold
-%% it needs to tune the GC calls inside `reduce_memory_use`
-%% see: rabbitmq-server-973 and `maybe_execute_gc` function
--define(DEFAULT_EXPLICIT_GC_RUN_OP_THRESHOLD, 250).
--define(EXPLICIT_GC_RUN_OP_THRESHOLD,
+%% it needs to tune the `reduce_memory_use` calls. Thus, the garbage collection.
+%% see: rabbitmq-server-973 and rabbitmq-server-964
+-define(DEFAULT_EXPLICIT_GC_RUN_OP_THRESHOLD, 1000).
+-define(EXPLICIT_GC_RUN_OP_THRESHOLD(Mode),
case get(explicit_gc_run_operation_threshold) of
undefined ->
- Val = rabbit_misc:get_env(rabbit, lazy_queue_explicit_gc_run_operation_threshold,
- ?DEFAULT_EXPLICIT_GC_RUN_OP_THRESHOLD),
+ Val = explicit_gc_run_operation_threshold_for_mode(Mode),
put(explicit_gc_run_operation_threshold, Val),
Val;
Val -> Val
end).
+explicit_gc_run_operation_threshold_for_mode(Mode) ->
+ {Key, Fallback} = case Mode of
+ lazy -> {lazy_queue_explicit_gc_run_operation_threshold,
+ ?DEFAULT_EXPLICIT_GC_RUN_OP_THRESHOLD};
+ _ -> {queue_explicit_gc_run_operation_threshold,
+ ?DEFAULT_EXPLICIT_GC_RUN_OP_THRESHOLD}
+ end,
+ rabbit_misc:get_env(rabbit, Key, Fallback).
+
%%----------------------------------------------------------------------------
%% Public API
%%----------------------------------------------------------------------------
@@ -637,27 +644,27 @@ publish(Msg, MsgProps, IsDelivered, ChPid, Flow, State) ->
publish1(Msg, MsgProps, IsDelivered, ChPid, Flow,
fun maybe_write_to_disk/4,
State),
- a(reduce_memory_use(maybe_update_rates(State1))).
+ a(maybe_reduce_memory_use(maybe_update_rates(State1))).
batch_publish(Publishes, ChPid, Flow, State) ->
{ChPid, Flow, State1} =
lists:foldl(fun batch_publish1/2, {ChPid, Flow, State}, Publishes),
State2 = ui(State1),
- a(reduce_memory_use(maybe_update_rates(State2))).
+ a(maybe_reduce_memory_use(maybe_update_rates(State2))).
publish_delivered(Msg, MsgProps, ChPid, Flow, State) ->
{SeqId, State1} =
publish_delivered1(Msg, MsgProps, ChPid, Flow,
fun maybe_write_to_disk/4,
State),
- {SeqId, a(reduce_memory_use(maybe_update_rates(State1)))}.
+ {SeqId, a(maybe_reduce_memory_use(maybe_update_rates(State1)))}.
batch_publish_delivered(Publishes, ChPid, Flow, State) ->
{ChPid, Flow, SeqIds, State1} =
lists:foldl(fun batch_publish_delivered1/2,
{ChPid, Flow, [], State}, Publishes),
State2 = ui(State1),
- {lists:reverse(SeqIds), a(reduce_memory_use(maybe_update_rates(State2)))}.
+ {lists:reverse(SeqIds), a(maybe_reduce_memory_use(maybe_update_rates(State2)))}.
discard(_MsgId, _ChPid, _Flow, State) -> State.
@@ -761,7 +768,7 @@ requeue(AckTags, #vqstate { mode = default,
{Delta1, MsgIds2, State3} = delta_merge(SeqIds1, Delta, MsgIds1,
State2),
MsgCount = length(MsgIds2),
- {MsgIds2, a(reduce_memory_use(
+ {MsgIds2, a(maybe_reduce_memory_use(
maybe_update_rates(ui(
State3 #vqstate { delta = Delta1,
q3 = Q3a,
@@ -779,7 +786,7 @@ requeue(AckTags, #vqstate { mode = lazy,
{Delta1, MsgIds1, State2} = delta_merge(SeqIds, Delta, MsgIds,
State1),
MsgCount = length(MsgIds1),
- {MsgIds1, a(reduce_memory_use(
+ {MsgIds1, a(maybe_reduce_memory_use(
maybe_update_rates(ui(
State2 #vqstate { delta = Delta1,
q3 = Q3a,
@@ -829,7 +836,7 @@ set_ram_duration_target(
(TargetRamCount =/= infinity andalso
TargetRamCount1 >= TargetRamCount) of
true -> State1;
- false -> reduce_memory_use(State1)
+ false -> maybe_reduce_memory_use(State1)
end).
maybe_update_rates(State = #vqstate{ in_counter = InCount,
@@ -911,7 +918,7 @@ timeout(State = #vqstate { index_state = IndexState }) ->
handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }.
-resume(State) -> a(reduce_memory_use(State)).
+resume(State) -> a(maybe_reduce_memory_use(State)).
msg_rates(#vqstate { rates = #rates { in = AvgIngressRate,
out = AvgEgressRate } }) ->
@@ -2364,12 +2371,12 @@ ifold(Fun, Acc, Its, State) ->
%% Phase changes
%%----------------------------------------------------------------------------
-maybe_execute_gc(State = #vqstate {memory_reduction_run_count = MRedRunCount}) ->
- case MRedRunCount >= ?EXPLICIT_GC_RUN_OP_THRESHOLD of
- true -> garbage_collect(),
- State#vqstate{memory_reduction_run_count = 0};
- false -> State#vqstate{memory_reduction_run_count = MRedRunCount + 1}
-
+maybe_reduce_memory_use(State = #vqstate {memory_reduction_run_count = MRedRunCount,
+ mode = Mode}) ->
+ case MRedRunCount >= ?EXPLICIT_GC_RUN_OP_THRESHOLD(Mode) of
+ true -> State1 = reduce_memory_use(State),
+ State1#vqstate{memory_reduction_run_count = 0};
+ false -> State#vqstate{memory_reduction_run_count = MRedRunCount + 1}
end.
reduce_memory_use(State = #vqstate { target_ram_count = infinity }) ->
@@ -2384,7 +2391,6 @@ reduce_memory_use(State = #vqstate {
out = AvgEgress,
ack_in = AvgAckIngress,
ack_out = AvgAckEgress } }) ->
-
State1 = #vqstate { q2 = Q2, q3 = Q3 } =
case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of
0 -> State;
@@ -2444,7 +2450,8 @@ reduce_memory_use(State = #vqstate {
S2 ->
push_betas_to_deltas(S2, State1)
end,
- maybe_execute_gc(State3).
+ garbage_collect(),
+ State3.
limit_ram_acks(0, State) ->
{0, ui(State)};