summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_credential_validation.erl53
-rw-r--r--src/rabbit_credential_validator.erl28
-rw-r--r--src/rabbit_credential_validator_accept_everything.erl32
-rw-r--r--src/rabbit_credential_validator_min_password_length.erl56
-rw-r--r--src/rabbit_credential_validator_password_regexp.erl51
-rw-r--r--src/rabbit_plugins.erl2
-rw-r--r--src/rabbit_ssl.erl228
-rw-r--r--src/rabbit_table.erl9
-rw-r--r--src/rabbit_variable_queue.erl53
9 files changed, 268 insertions, 244 deletions
diff --git a/src/rabbit_credential_validation.erl b/src/rabbit_credential_validation.erl
new file mode 100644
index 0000000000..4a629da14f
--- /dev/null
+++ b/src/rabbit_credential_validation.erl
@@ -0,0 +1,53 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_credential_validation).
+
+-include("rabbit.hrl").
+
+%% used for backwards compatibility
+-define(DEFAULT_BACKEND, rabbit_credential_validator_accept_everything).
+
+%%
+%% API
+%%
+
+-export([validate/2, backend/0]).
+
+-spec validate(rabbit_types:username(), rabbit_types:password()) -> 'ok' | {'error', string()}.
+
+%% Validates a username/password pair by delegating to the effective
+%% `rabbit_credential_validator`. Used by `rabbit_auth_backend_internal`.
+%% Note that some validators may choose to only validate passwords.
+%%
+%% Possible return values:
+%%
+%% * ok: provided credentials passed validation.
+%% * {error, Error, Args}: provided password password failed validation.
+
+validate(Username, Password) ->
+ Backend = backend(),
+ Backend:validate(Username, Password).
+
+-spec backend() -> atom().
+
+backend() ->
+ case application:get_env(rabbit, credential_validator) of
+ undefined ->
+ ?DEFAULT_BACKEND;
+ {ok, Proplist} ->
+ proplists:get_value(validation_backend, Proplist, ?DEFAULT_BACKEND)
+ end.
diff --git a/src/rabbit_credential_validator.erl b/src/rabbit_credential_validator.erl
new file mode 100644
index 0000000000..dd12f6d2d6
--- /dev/null
+++ b/src/rabbit_credential_validator.erl
@@ -0,0 +1,28 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_credential_validator).
+
+-include("rabbit.hrl").
+
+%% Validates a password. Used by `rabbit_auth_backend_internal`.
+%%
+%% Possible return values:
+%%
+%% * ok: provided password passed validation.
+%% * {error, Error, Args}: provided password password failed validation.
+
+-callback validate(rabbit_types:username(), rabbit_types:password()) -> 'ok' | {'error', string()}.
diff --git a/src/rabbit_credential_validator_accept_everything.erl b/src/rabbit_credential_validator_accept_everything.erl
new file mode 100644
index 0000000000..f572d67e7f
--- /dev/null
+++ b/src/rabbit_credential_validator_accept_everything.erl
@@ -0,0 +1,32 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_credential_validator_accept_everything).
+
+-include("rabbit.hrl").
+
+-behaviour(rabbit_credential_validator).
+
+%%
+%% API
+%%
+
+-export([validate/2]).
+
+-spec validate(rabbit_types:username(), rabbit_types:password()) -> 'ok' | {'error', string()}.
+
+validate(_Username, _Password) ->
+ ok.
diff --git a/src/rabbit_credential_validator_min_password_length.erl b/src/rabbit_credential_validator_min_password_length.erl
new file mode 100644
index 0000000000..78239fc71b
--- /dev/null
+++ b/src/rabbit_credential_validator_min_password_length.erl
@@ -0,0 +1,56 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_credential_validator_min_password_length).
+
+-include("rabbit.hrl").
+
+-behaviour(rabbit_credential_validator).
+
+%% accommodates default (localhost-only) user credentials,
+%% guest/guest
+-define(DEFAULT_MIN_LENGTH, 5).
+
+%%
+%% API
+%%
+
+-export([validate/2]).
+%% for tests
+-export([validate/3]).
+
+-spec validate(rabbit_types:username(), rabbit_types:password()) -> 'ok' | {'error', string()}.
+
+validate(Username, Password) ->
+ MinLength = case application:get_env(rabbit, credential_validator) of
+ undefined ->
+ ?DEFAULT_MIN_LENGTH;
+ {ok, Proplist} ->
+ case proplists:get_value(min_length, Proplist) of
+ undefined -> ?DEFAULT_MIN_LENGTH;
+ Value -> rabbit_data_coercion:to_integer(Value)
+ end
+ end,
+ validate(Username, Password, MinLength).
+
+
+-spec validate(rabbit_types:username(), rabbit_types:password(), integer()) -> 'ok' | {'error', string(), [any()]}.
+
+validate(_Username, Password, MinLength) ->
+ case size(Password) >= MinLength of
+ true -> ok;
+ false -> {error, rabbit_misc:format("minimum required password length is ~B", [MinLength])}
+ end.
diff --git a/src/rabbit_credential_validator_password_regexp.erl b/src/rabbit_credential_validator_password_regexp.erl
new file mode 100644
index 0000000000..b516175126
--- /dev/null
+++ b/src/rabbit_credential_validator_password_regexp.erl
@@ -0,0 +1,51 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
+%%
+
+
+%% A `rabbit_credential_validator` implementation that matches
+%% password against a pre-configured regular expression.
+-module(rabbit_credential_validator_password_regexp).
+
+-include("rabbit.hrl").
+
+-behaviour(rabbit_credential_validator).
+
+%%
+%% API
+%%
+
+-export([validate/2]).
+%% for tests
+-export([validate/3]).
+
+-spec validate(rabbit_types:username(), rabbit_types:password()) -> 'ok' | {'error', string()}.
+
+validate(Username, Password) ->
+ {ok, Proplist} = application:get_env(rabbit, credential_validator),
+ Regexp = case proplists:get_value(regexp, Proplist) of
+ undefined -> {error, "rabbit.credential_validator.regexp config key is undefined"};
+ Value -> rabbit_data_coercion:to_list(Value)
+ end,
+ validate(Username, Password, Regexp).
+
+
+-spec validate(rabbit_types:username(), rabbit_types:password(), string()) -> 'ok' | {'error', string(), [any()]}.
+
+validate(_Username, Password, Pattern) ->
+ case re:run(rabbit_data_coercion:to_list(Password), Pattern) of
+ {match, _} -> ok;
+ nomatch -> {error, "provided password does not match the validator regular expression"}
+ end.
diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl
index bde4ff5388..b408b7e613 100644
--- a/src/rabbit_plugins.erl
+++ b/src/rabbit_plugins.erl
@@ -15,7 +15,7 @@
%%
-module(rabbit_plugins).
--include("rabbit.hrl").
+-include_lib("rabbit_common/include/rabbit.hrl").
-export([setup/0, active/0, read_enabled/1, list/1, list/2, dependencies/3]).
-export([ensure/1]).
diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl
index ac9fb204d0..6a87d93a29 100644
--- a/src/rabbit_ssl.erl
+++ b/src/rabbit_ssl.erl
@@ -11,13 +11,11 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
+%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
%%
-module(rabbit_ssl).
--include("rabbit.hrl").
-
-include_lib("public_key/include/public_key.hrl").
-export([peer_cert_issuer/1, peer_cert_subject/1, peer_cert_validity/1]).
@@ -27,15 +25,7 @@
-export_type([certificate/0]).
--type certificate() :: binary().
-
--spec peer_cert_issuer(certificate()) -> string().
--spec peer_cert_subject(certificate()) -> string().
--spec peer_cert_validity(certificate()) -> string().
--spec peer_cert_subject_items
- (certificate(), tuple()) -> [string()] | 'not_found'.
--spec peer_cert_auth_name
- (certificate()) -> binary() | 'not_found' | 'unsafe'.
+-type certificate() :: rabbit_cert_info:certificate().
%%--------------------------------------------------------------------------
%% High-level functions used by reader
@@ -43,38 +33,24 @@
%% Return a string describing the certificate's issuer.
peer_cert_issuer(Cert) ->
- cert_info(fun(#'OTPCertificate' {
- tbsCertificate = #'OTPTBSCertificate' {
- issuer = Issuer }}) ->
- format_rdn_sequence(Issuer)
- end, Cert).
+ rabbit_cert_info:issuer(Cert).
%% Return a string describing the certificate's subject, as per RFC4514.
peer_cert_subject(Cert) ->
- cert_info(fun(#'OTPCertificate' {
- tbsCertificate = #'OTPTBSCertificate' {
- subject = Subject }}) ->
- format_rdn_sequence(Subject)
- end, Cert).
+ rabbit_cert_info:subject(Cert).
%% Return the parts of the certificate's subject.
peer_cert_subject_items(Cert, Type) ->
- cert_info(fun(#'OTPCertificate' {
- tbsCertificate = #'OTPTBSCertificate' {
- subject = Subject }}) ->
- find_by_type(Type, Subject)
- end, Cert).
+ rabbit_cert_info:subject_items(Cert, Type).
%% Return a string describing the certificate's validity.
peer_cert_validity(Cert) ->
- cert_info(fun(#'OTPCertificate' {
- tbsCertificate = #'OTPTBSCertificate' {
- validity = {'Validity', Start, End} }}) ->
- rabbit_misc:format("~s - ~s", [format_asn1_value(Start),
- format_asn1_value(End)])
- end, Cert).
+ rabbit_cert_info:validity(Cert).
%% Extract a username from the certificate
+-spec peer_cert_auth_name
+ (certificate()) -> binary() | 'not_found' | 'unsafe'.
+
peer_cert_auth_name(Cert) ->
{ok, Mode} = application:get_env(rabbit, ssl_cert_login_from),
peer_cert_auth_name(Mode, Cert).
@@ -106,189 +82,3 @@ auth_config_sane() ->
"disabled, verify=~p~n", [V]),
false
end.
-
-%%--------------------------------------------------------------------------
-
-cert_info(F, Cert) ->
- F(case public_key:pkix_decode_cert(Cert, otp) of
- {ok, DecCert} -> DecCert; %%pre R14B
- DecCert -> DecCert %%R14B onwards
- end).
-
-find_by_type(Type, {rdnSequence, RDNs}) ->
- case [V || #'AttributeTypeAndValue'{type = T, value = V}
- <- lists:flatten(RDNs),
- T == Type] of
- [] -> not_found;
- L -> [format_asn1_value(V) || V <- L]
- end.
-
-%%--------------------------------------------------------------------------
-%% Formatting functions
-%%--------------------------------------------------------------------------
-
-%% Format and rdnSequence as a RFC4514 subject string.
-format_rdn_sequence({rdnSequence, Seq}) ->
- string:join(lists:reverse([format_complex_rdn(RDN) || RDN <- Seq]), ",").
-
-%% Format an RDN set.
-format_complex_rdn(RDNs) ->
- string:join([format_rdn(RDN) || RDN <- RDNs], "+").
-
-%% Format an RDN. If the type name is unknown, use the dotted decimal
-%% representation. See RFC4514, section 2.3.
-format_rdn(#'AttributeTypeAndValue'{type = T, value = V}) ->
- FV = escape_rdn_value(format_asn1_value(V)),
- Fmts = [{?'id-at-surname' , "SN"},
- {?'id-at-givenName' , "GIVENNAME"},
- {?'id-at-initials' , "INITIALS"},
- {?'id-at-generationQualifier' , "GENERATIONQUALIFIER"},
- {?'id-at-commonName' , "CN"},
- {?'id-at-localityName' , "L"},
- {?'id-at-stateOrProvinceName' , "ST"},
- {?'id-at-organizationName' , "O"},
- {?'id-at-organizationalUnitName' , "OU"},
- {?'id-at-title' , "TITLE"},
- {?'id-at-countryName' , "C"},
- {?'id-at-serialNumber' , "SERIALNUMBER"},
- {?'id-at-pseudonym' , "PSEUDONYM"},
- {?'id-domainComponent' , "DC"},
- {?'id-emailAddress' , "EMAILADDRESS"},
- {?'street-address' , "STREET"},
- {{0,9,2342,19200300,100,1,1} , "UID"}], %% Not in public_key.hrl
- case proplists:lookup(T, Fmts) of
- {_, Fmt} ->
- rabbit_misc:format(Fmt ++ "=~s", [FV]);
- none when is_tuple(T) ->
- TypeL = [rabbit_misc:format("~w", [X]) || X <- tuple_to_list(T)],
- rabbit_misc:format("~s=~s", [string:join(TypeL, "."), FV]);
- none ->
- rabbit_misc:format("~p=~s", [T, FV])
- end.
-
-%% Escape a string as per RFC4514.
-escape_rdn_value(V) ->
- escape_rdn_value(V, start).
-
-escape_rdn_value([], _) ->
- [];
-escape_rdn_value([C | S], start) when C =:= $ ; C =:= $# ->
- [$\\, C | escape_rdn_value(S, middle)];
-escape_rdn_value(S, start) ->
- escape_rdn_value(S, middle);
-escape_rdn_value([$ ], middle) ->
- [$\\, $ ];
-escape_rdn_value([C | S], middle) when C =:= $"; C =:= $+; C =:= $,; C =:= $;;
- C =:= $<; C =:= $>; C =:= $\\ ->
- [$\\, C | escape_rdn_value(S, middle)];
-escape_rdn_value([C | S], middle) when C < 32 ; C >= 126 ->
- %% Of ASCII characters only U+0000 needs escaping, but for display
- %% purposes it's handy to escape all non-printable chars. All non-ASCII
- %% characters get converted to UTF-8 sequences and then escaped. We've
- %% already got a UTF-8 sequence here, so just escape it.
- rabbit_misc:format("\\~2.16.0B", [C]) ++ escape_rdn_value(S, middle);
-escape_rdn_value([C | S], middle) ->
- [C | escape_rdn_value(S, middle)].
-
-%% Get the string representation of an OTPCertificate field.
-format_asn1_value({ST, S}) when ST =:= teletexString; ST =:= printableString;
- ST =:= universalString; ST =:= utf8String;
- ST =:= bmpString ->
- format_directory_string(ST, S);
-format_asn1_value({utcTime, [Y1, Y2, M1, M2, D1, D2, H1, H2,
- Min1, Min2, S1, S2, $Z]}) ->
- rabbit_misc:format("20~c~c-~c~c-~c~cT~c~c:~c~c:~c~cZ",
- [Y1, Y2, M1, M2, D1, D2, H1, H2, Min1, Min2, S1, S2]);
-%% We appear to get an untagged value back for an ia5string
-%% (e.g. domainComponent).
-format_asn1_value(V) when is_list(V) ->
- V;
-format_asn1_value(V) when is_binary(V) ->
- %% OTP does not decode some values when combined with an unknown
- %% type. That's probably wrong, so as a last ditch effort let's
- %% try manually decoding. 'DirectoryString' is semi-arbitrary -
- %% but it is the type which covers the various string types we
- %% handle below.
- try
- {ST, S} = public_key:der_decode('DirectoryString', V),
- format_directory_string(ST, S)
- catch _:_ ->
- rabbit_misc:format("~p", [V])
- end;
-format_asn1_value(V) ->
- rabbit_misc:format("~p", [V]).
-
-%% DirectoryString { INTEGER : maxSize } ::= CHOICE {
-%% teletexString TeletexString (SIZE (1..maxSize)),
-%% printableString PrintableString (SIZE (1..maxSize)),
-%% bmpString BMPString (SIZE (1..maxSize)),
-%% universalString UniversalString (SIZE (1..maxSize)),
-%% uTF8String UTF8String (SIZE (1..maxSize)) }
-%%
-%% Precise definitions of printable / teletexString are hard to come
-%% by. This is what I reconstructed:
-%%
-%% printableString:
-%% "intended to represent the limited character sets available to
-%% mainframe input terminals"
-%% A-Z a-z 0-9 ' ( ) + , - . / : = ? [space]
-%% http://msdn.microsoft.com/en-us/library/bb540814(v=vs.85).aspx
-%%
-%% teletexString:
-%% "a sizable volume of software in the world treats TeletexString
-%% (T61String) as a simple 8-bit string with mostly Windows Latin 1
-%% (superset of iso-8859-1) encoding"
-%% http://www.mail-archive.com/asn1@asn1.org/msg00460.html
-%%
-%% (However according to that link X.680 actually defines
-%% TeletexString in some much more involved and crazy way. I suggest
-%% we treat it as ISO-8859-1 since Erlang does not support Windows
-%% Latin 1).
-%%
-%% bmpString:
-%% UCS-2 according to RFC 3641. Hence cannot represent Unicode
-%% characters above 65535 (outside the "Basic Multilingual Plane").
-%%
-%% universalString:
-%% UCS-4 according to RFC 3641.
-%%
-%% utf8String:
-%% UTF-8 according to RFC 3641.
-%%
-%% Within Rabbit we assume UTF-8 encoding. Since printableString is a
-%% subset of ASCII it is also a subset of UTF-8. The others need
-%% converting. Fortunately since the Erlang SSL library does the
-%% decoding for us (albeit into a weird format, see below), we just
-%% need to handle encoding into UTF-8. Note also that utf8Strings come
-%% back as binary.
-%%
-%% Note for testing: the default Ubuntu configuration for openssl will
-%% only create printableString or teletexString types no matter what
-%% you do. Edit string_mask in the [req] section of
-%% /etc/ssl/openssl.cnf to change this (see comments there). You
-%% probably also need to set utf8 = yes to get it to accept UTF-8 on
-%% the command line. Also note I could not get openssl to generate a
-%% universalString.
-
-format_directory_string(printableString, S) -> S;
-format_directory_string(teletexString, S) -> utf8_list_from(S);
-format_directory_string(bmpString, S) -> utf8_list_from(S);
-format_directory_string(universalString, S) -> utf8_list_from(S);
-format_directory_string(utf8String, S) -> binary_to_list(S).
-
-utf8_list_from(S) ->
- binary_to_list(
- unicode:characters_to_binary(flatten_ssl_list(S), utf32, utf8)).
-
-%% The Erlang SSL implementation invents its own representation for
-%% non-ascii strings - looking like [97,{0,0,3,187}] (that's LATIN
-%% SMALL LETTER A followed by GREEK SMALL LETTER LAMDA). We convert
-%% this into a list of unicode characters, which we can tell
-%% unicode:characters_to_binary is utf32.
-
-flatten_ssl_list(L) -> [flatten_ssl_list_item(I) || I <- L].
-
-flatten_ssl_list_item({A, B, C, D}) ->
- A * (1 bsl 24) + B * (1 bsl 16) + C * (1 bsl 8) + D;
-flatten_ssl_list_item(N) when is_number (N) ->
- N.
diff --git a/src/rabbit_table.erl b/src/rabbit_table.erl
index 43a744d315..4a9a02e492 100644
--- a/src/rabbit_table.erl
+++ b/src/rabbit_table.erl
@@ -18,7 +18,8 @@
-export([create/0, create_local_copy/1, wait_for_replicated/1, wait/1,
force_load/0, is_present/0, is_empty/0, needs_default_data/0,
- check_schema_integrity/1, clear_ram_only_tables/0, retry_timeout/0]).
+ check_schema_integrity/1, clear_ram_only_tables/0, retry_timeout/0,
+ wait_for_replicated/0]).
-include("rabbit.hrl").
@@ -28,6 +29,7 @@
-spec create() -> 'ok'.
-spec create_local_copy('disc' | 'ram') -> 'ok'.
-spec wait_for_replicated(retry()) -> 'ok'.
+-spec wait_for_replicated() -> 'ok'.
-spec wait([atom()]) -> 'ok'.
-spec retry_timeout() -> {non_neg_integer() | infinity, non_neg_integer()}.
-spec force_load() -> 'ok'.
@@ -64,6 +66,11 @@ create_local_copy(ram) ->
create_local_copies(ram),
create_local_copy(schema, ram_copies).
+%% This arity only exists for backwards compatibility with certain
+%% plugins. See https://github.com/rabbitmq/rabbitmq-clusterer/issues/19.
+wait_for_replicated() ->
+ wait_for_replicated(false).
+
wait_for_replicated(Retry) ->
wait([Tab || {Tab, TabDef} <- definitions(),
not lists:member({local_content, true}, TabDef)], Retry).
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 58f0230420..f7582a622a 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -436,21 +436,28 @@
%% rabbit_amqqueue_process need fairly fresh rates.
-define(MSGS_PER_RATE_CALC, 100).
-
%% we define the garbage collector threshold
-%% it needs to tune the GC calls inside `reduce_memory_use`
-%% see: rabbitmq-server-973 and `maybe_execute_gc` function
--define(DEFAULT_EXPLICIT_GC_RUN_OP_THRESHOLD, 250).
--define(EXPLICIT_GC_RUN_OP_THRESHOLD,
+%% it needs to tune the `reduce_memory_use` calls. Thus, the garbage collection.
+%% see: rabbitmq-server-973 and rabbitmq-server-964
+-define(DEFAULT_EXPLICIT_GC_RUN_OP_THRESHOLD, 1000).
+-define(EXPLICIT_GC_RUN_OP_THRESHOLD(Mode),
case get(explicit_gc_run_operation_threshold) of
undefined ->
- Val = rabbit_misc:get_env(rabbit, lazy_queue_explicit_gc_run_operation_threshold,
- ?DEFAULT_EXPLICIT_GC_RUN_OP_THRESHOLD),
+ Val = explicit_gc_run_operation_threshold_for_mode(Mode),
put(explicit_gc_run_operation_threshold, Val),
Val;
Val -> Val
end).
+explicit_gc_run_operation_threshold_for_mode(Mode) ->
+ {Key, Fallback} = case Mode of
+ lazy -> {lazy_queue_explicit_gc_run_operation_threshold,
+ ?DEFAULT_EXPLICIT_GC_RUN_OP_THRESHOLD};
+ _ -> {queue_explicit_gc_run_operation_threshold,
+ ?DEFAULT_EXPLICIT_GC_RUN_OP_THRESHOLD}
+ end,
+ rabbit_misc:get_env(rabbit, Key, Fallback).
+
%%----------------------------------------------------------------------------
%% Public API
%%----------------------------------------------------------------------------
@@ -590,27 +597,27 @@ publish(Msg, MsgProps, IsDelivered, ChPid, Flow, State) ->
publish1(Msg, MsgProps, IsDelivered, ChPid, Flow,
fun maybe_write_to_disk/4,
State),
- a(reduce_memory_use(maybe_update_rates(State1))).
+ a(maybe_reduce_memory_use(maybe_update_rates(State1))).
batch_publish(Publishes, ChPid, Flow, State) ->
{ChPid, Flow, State1} =
lists:foldl(fun batch_publish1/2, {ChPid, Flow, State}, Publishes),
State2 = ui(State1),
- a(reduce_memory_use(maybe_update_rates(State2))).
+ a(maybe_reduce_memory_use(maybe_update_rates(State2))).
publish_delivered(Msg, MsgProps, ChPid, Flow, State) ->
{SeqId, State1} =
publish_delivered1(Msg, MsgProps, ChPid, Flow,
fun maybe_write_to_disk/4,
State),
- {SeqId, a(reduce_memory_use(maybe_update_rates(State1)))}.
+ {SeqId, a(maybe_reduce_memory_use(maybe_update_rates(State1)))}.
batch_publish_delivered(Publishes, ChPid, Flow, State) ->
{ChPid, Flow, SeqIds, State1} =
lists:foldl(fun batch_publish_delivered1/2,
{ChPid, Flow, [], State}, Publishes),
State2 = ui(State1),
- {lists:reverse(SeqIds), a(reduce_memory_use(maybe_update_rates(State2)))}.
+ {lists:reverse(SeqIds), a(maybe_reduce_memory_use(maybe_update_rates(State2)))}.
discard(_MsgId, _ChPid, _Flow, State) -> State.
@@ -714,7 +721,7 @@ requeue(AckTags, #vqstate { mode = default,
{Delta1, MsgIds2, State3} = delta_merge(SeqIds1, Delta, MsgIds1,
State2),
MsgCount = length(MsgIds2),
- {MsgIds2, a(reduce_memory_use(
+ {MsgIds2, a(maybe_reduce_memory_use(
maybe_update_rates(ui(
State3 #vqstate { delta = Delta1,
q3 = Q3a,
@@ -732,7 +739,7 @@ requeue(AckTags, #vqstate { mode = lazy,
{Delta1, MsgIds1, State2} = delta_merge(SeqIds, Delta, MsgIds,
State1),
MsgCount = length(MsgIds1),
- {MsgIds1, a(reduce_memory_use(
+ {MsgIds1, a(maybe_reduce_memory_use(
maybe_update_rates(ui(
State2 #vqstate { delta = Delta1,
q3 = Q3a,
@@ -782,7 +789,7 @@ set_ram_duration_target(
(TargetRamCount =/= infinity andalso
TargetRamCount1 >= TargetRamCount) of
true -> State1;
- false -> reduce_memory_use(State1)
+ false -> maybe_reduce_memory_use(State1)
end).
maybe_update_rates(State = #vqstate{ in_counter = InCount,
@@ -864,7 +871,7 @@ timeout(State = #vqstate { index_state = IndexState }) ->
handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }.
-resume(State) -> a(reduce_memory_use(State)).
+resume(State) -> a(maybe_reduce_memory_use(State)).
msg_rates(#vqstate { rates = #rates { in = AvgIngressRate,
out = AvgEgressRate } }) ->
@@ -2336,12 +2343,12 @@ ifold(Fun, Acc, Its, State) ->
%% Phase changes
%%----------------------------------------------------------------------------
-maybe_execute_gc(State = #vqstate {memory_reduction_run_count = MRedRunCount}) ->
- case MRedRunCount >= ?EXPLICIT_GC_RUN_OP_THRESHOLD of
- true -> garbage_collect(),
- State#vqstate{memory_reduction_run_count = 0};
- false -> State#vqstate{memory_reduction_run_count = MRedRunCount + 1}
-
+maybe_reduce_memory_use(State = #vqstate {memory_reduction_run_count = MRedRunCount,
+ mode = Mode}) ->
+ case MRedRunCount >= ?EXPLICIT_GC_RUN_OP_THRESHOLD(Mode) of
+ true -> State1 = reduce_memory_use(State),
+ State1#vqstate{memory_reduction_run_count = 0};
+ false -> State#vqstate{memory_reduction_run_count = MRedRunCount + 1}
end.
reduce_memory_use(State = #vqstate { target_ram_count = infinity }) ->
@@ -2356,7 +2363,6 @@ reduce_memory_use(State = #vqstate {
out = AvgEgress,
ack_in = AvgAckIngress,
ack_out = AvgAckEgress } }) ->
-
State1 = #vqstate { q2 = Q2, q3 = Q3 } =
case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of
0 -> State;
@@ -2416,7 +2422,8 @@ reduce_memory_use(State = #vqstate {
S2 ->
push_betas_to_deltas(S2, State1)
end,
- maybe_execute_gc(State3).
+ garbage_collect(),
+ State3.
limit_ram_acks(0, State) ->
{0, ui(State)};