summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2021-12-26 01:46:29 +0300
committerMichael Klishin <michael@clojurewerkz.org>2021-12-26 01:46:29 +0300
commit8a0ad561829aa2baf581e49d7be802be86b1aa5c (patch)
treeff4331925254ef51d3604e31445d42942ed23916
parent40db6563b8ae7c07d86f0e7710fccde1433029ef (diff)
parent6835a5b9a7717c49168564069e086fb963dd612e (diff)
downloadrabbitmq-server-git-8a0ad561829aa2baf581e49d7be802be86b1aa5c.tar.gz
Merge branch 'master' into delegate_opt
-rw-r--r--.github/workflows/test.yaml1
-rw-r--r--WORKSPACE.bazel4
-rw-r--r--deps/rabbit/BUILD.bazel3
-rw-r--r--deps/rabbit/src/rabbit.erl2
-rw-r--r--deps/rabbit/src/rabbit_access_control.erl65
-rw-r--r--deps/rabbit/src/rabbit_auth_backend_internal.erl196
-rw-r--r--deps/rabbit/src/rabbit_channel.erl2
-rw-r--r--deps/rabbit/src/rabbit_definitions.erl4
-rw-r--r--deps/rabbit/src/rabbit_disk_monitor.erl197
-rw-r--r--deps/rabbit/src/rabbit_file.erl15
-rw-r--r--deps/rabbit/src/rabbit_health_check.erl6
-rw-r--r--deps/rabbit/src/rabbit_osiris_metrics.erl2
-rw-r--r--deps/rabbit/src/rabbit_policies.erl4
-rw-r--r--deps/rabbit/src/rabbit_stream_coordinator.erl26
-rw-r--r--deps/rabbit/test/definition_import_SUITE.erl38
-rw-r--r--deps/rabbit/test/definition_import_SUITE_data/case18.json46
-rw-r--r--deps/rabbit/test/definition_import_SUITE_data/failing_case17.json19
-rw-r--r--deps/rabbit/test/definition_import_SUITE_data/failing_case19.json46
-rw-r--r--deps/rabbit/test/publisher_confirms_parallel_SUITE.erl12
-rw-r--r--deps/rabbit/test/rabbit_fifo_SUITE.erl2
-rw-r--r--deps/rabbit/test/unit_disk_monitor_SUITE.erl11
-rw-r--r--deps/rabbit/test/unit_disk_monitor_mocks_SUITE.erl2
-rw-r--r--deps/rabbit/test/unit_operator_policy_SUITE.erl55
-rw-r--r--deps/rabbitmq_auth_backend_oauth2/README.md40
-rw-r--r--deps/rabbitmq_auth_backend_oauth2/priv/schema/rabbitmq_auth_backend_oauth2.schema49
-rw-r--r--deps/rabbitmq_auth_backend_oauth2/src/uaa_jwks.erl27
-rw-r--r--deps/rabbitmq_auth_backend_oauth2/src/uaa_jwt.erl2
-rw-r--r--deps/rabbitmq_auth_backend_oauth2/src/uaa_jwt_jwt.erl10
-rw-r--r--deps/rabbitmq_auth_backend_oauth2/test/config_schema_SUITE_data/rabbitmq_auth_backend_oauth2.snippets21
-rw-r--r--deps/rabbitmq_auth_backend_oauth2/test/jwks_SUITE.erl109
-rw-r--r--deps/rabbitmq_auth_backend_oauth2/test/jwks_http_app.erl10
-rw-r--r--deps/rabbitmq_auth_backend_oauth2/test/rabbit_auth_backend_oauth2_test_util.erl2
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_manager.erl21
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_reader.erl31
-rw-r--r--deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/pom.xml4
-rw-r--r--deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/ClusterSizeTest.java8
-rw-r--r--deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java59
-rw-r--r--deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/LeaderLocatorTest.java10
-rw-r--r--deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java25
-rw-r--r--deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl27
-rw-r--r--deps/rabbitmq_stream_management/test/http_SUITE_data/pom.xml6
-rw-r--r--tools/erlang_ls.bzl2
42 files changed, 997 insertions, 224 deletions
diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml
index 9efb138102..ce66e26c73 100644
--- a/.github/workflows/test.yaml
+++ b/.github/workflows/test.yaml
@@ -13,6 +13,7 @@ on:
- '*.bzl'
- '*.bazel'
- .github/workflows/test.yaml
+ pull_request:
jobs:
test:
name: Test
diff --git a/WORKSPACE.bazel b/WORKSPACE.bazel
index b6c0d665f4..00e4d4bebd 100644
--- a/WORKSPACE.bazel
+++ b/WORKSPACE.bazel
@@ -34,13 +34,13 @@ buildbuddy(
git_repository(
name = "rbe_23",
- commit = "d2b454dc5138a2a92de45a0a672241a4fbb5a1e5",
+ commit = "b21c066e426de48e526cc0f8c5158b7024d04e85",
remote = "https://github.com/rabbitmq/rbe-erlang-platform.git",
)
git_repository(
name = "rbe_24",
- commit = "a087892ef4202dc3245b64d36d5921491848315f",
+ commit = "c8cbf65e2facbe464ebbcee7b6cf6f7a2d422ded",
remote = "https://github.com/rabbitmq/rbe-erlang-platform.git",
)
diff --git a/deps/rabbit/BUILD.bazel b/deps/rabbit/BUILD.bazel
index 01be7ae4e2..f607f38246 100644
--- a/deps/rabbit/BUILD.bazel
+++ b/deps/rabbit/BUILD.bazel
@@ -703,9 +703,6 @@ suites = [
additional_hdrs = [
"src/rabbit_fifo.hrl",
],
- erlc_opts = [
- "-I deps/rabbit", # allow rabbit_fifo.hrl to be included at src/rabbit_fifo.hrl
- ],
runtime_deps = [
"@meck//:bazel_erlang_lib",
"@ra//:bazel_erlang_lib",
diff --git a/deps/rabbit/src/rabbit.erl b/deps/rabbit/src/rabbit.erl
index 32e07d095b..48ef9ec439 100644
--- a/deps/rabbit/src/rabbit.erl
+++ b/deps/rabbit/src/rabbit.erl
@@ -500,7 +500,7 @@ stop_and_halt() ->
%% init:stop() will be called regardless of any errors.
try
AppsLeft = [ A || {A, _, _} <- application:which_applications() ],
- ?LOG_ERROR(
+ ?LOG_INFO(
lists:flatten(
["Halting Erlang VM with the following applications:~n",
[" ~p~n" || _ <- AppsLeft]]),
diff --git a/deps/rabbit/src/rabbit_access_control.erl b/deps/rabbit/src/rabbit_access_control.erl
index 5411969759..d9670d87ec 100644
--- a/deps/rabbit/src/rabbit_access_control.erl
+++ b/deps/rabbit/src/rabbit_access_control.erl
@@ -38,35 +38,42 @@ check_user_pass_login(Username, Password) ->
check_user_login(Username, AuthProps) ->
%% extra auth properties like MQTT client id are in AuthProps
{ok, Modules} = application:get_env(rabbit, auth_backends),
- R = lists:foldl(
- fun (rabbit_auth_backend_cache=ModN, {refused, _, _, _}) ->
- %% It is possible to specify authn/authz within the cache module settings,
- %% so we have to do both auth steps here
- %% See this rabbitmq-users discussion:
- %% https://groups.google.com/d/topic/rabbitmq-users/ObqM7MQdA3I/discussion
- try_authenticate_and_try_authorize(ModN, ModN, Username, AuthProps);
- ({ModN, ModZs}, {refused, _, _, _}) ->
- %% Different modules for authN vs authZ. So authenticate
- %% with authN module, then if that succeeds do
- %% passwordless (i.e pre-authenticated) login with authZ.
- try_authenticate_and_try_authorize(ModN, ModZs, Username, AuthProps);
- (Mod, {refused, _, _, _}) ->
- %% Same module for authN and authZ. Just take the result
- %% it gives us
- case try_authenticate(Mod, Username, AuthProps) of
- {ok, ModNUser = #auth_user{username = Username2, impl = Impl}} ->
- rabbit_log:debug("User '~s' authenticated successfully by backend ~s", [Username2, Mod]),
- user(ModNUser, {ok, [{Mod, Impl}], []});
- Else ->
- rabbit_log:debug("User '~s' failed authenticatation by backend ~s", [Username, Mod]),
- Else
- end;
- (_, {ok, User}) ->
- %% We've successfully authenticated. Skip to the end...
- {ok, User}
- end,
- {refused, Username, "No modules checked '~s'", [Username]}, Modules),
- R.
+ try
+ lists:foldl(
+ fun (rabbit_auth_backend_cache=ModN, {refused, _, _, _}) ->
+ %% It is possible to specify authn/authz within the cache module settings,
+ %% so we have to do both auth steps here
+ %% See this rabbitmq-users discussion:
+ %% https://groups.google.com/d/topic/rabbitmq-users/ObqM7MQdA3I/discussion
+ try_authenticate_and_try_authorize(ModN, ModN, Username, AuthProps);
+ ({ModN, ModZs}, {refused, _, _, _}) ->
+ %% Different modules for authN vs authZ. So authenticate
+ %% with authN module, then if that succeeds do
+ %% passwordless (i.e pre-authenticated) login with authZ.
+ try_authenticate_and_try_authorize(ModN, ModZs, Username, AuthProps);
+ (Mod, {refused, _, _, _}) ->
+ %% Same module for authN and authZ. Just take the result
+ %% it gives us
+ case try_authenticate(Mod, Username, AuthProps) of
+ {ok, ModNUser = #auth_user{username = Username2, impl = Impl}} ->
+ rabbit_log:debug("User '~s' authenticated successfully by backend ~s", [Username2, Mod]),
+ user(ModNUser, {ok, [{Mod, Impl}], []});
+ Else ->
+ rabbit_log:debug("User '~s' failed authenticatation by backend ~s", [Username, Mod]),
+ Else
+ end;
+ (_, {ok, User}) ->
+ %% We've successfully authenticated. Skip to the end...
+ {ok, User}
+ end,
+ {refused, Username, "No modules checked '~s'", [Username]}, Modules)
+ catch
+ Type:Error:Stacktrace ->
+ rabbit_log:debug("User '~s' authentication failed with ~s:~p:~n~p", [Username, Type, Error, Stacktrace]),
+ {refused, Username, "User '~s' authentication failed with internal error. "
+ "Enable debug logs to see the real error.", [Username]}
+
+ end.
try_authenticate_and_try_authorize(ModN, ModZs0, Username, AuthProps) ->
ModZs = case ModZs0 of
diff --git a/deps/rabbit/src/rabbit_auth_backend_internal.erl b/deps/rabbit/src/rabbit_auth_backend_internal.erl
index a46beaaf83..9295b65a4c 100644
--- a/deps/rabbit/src/rabbit_auth_backend_internal.erl
+++ b/deps/rabbit/src/rabbit_auth_backend_internal.erl
@@ -14,12 +14,15 @@
-export([user_login_authentication/2, user_login_authorization/2,
check_vhost_access/3, check_resource_access/4, check_topic_access/4]).
--export([add_user/3, delete_user/2, lookup_user/1, exists/1,
+-export([add_user/3, add_user/4, add_user/5, delete_user/2, lookup_user/1, exists/1,
change_password/3, clear_password/2,
hash_password/2, change_password_hash/2, change_password_hash/3,
set_tags/3, set_permissions/6, clear_permissions/3,
set_topic_permissions/6, clear_topic_permissions/3, clear_topic_permissions/4,
- add_user_sans_validation/3, put_user/2, put_user/3]).
+ add_user_sans_validation/3, put_user/2, put_user/3,
+ update_user/5,
+ update_user_with_hash/5,
+ add_user_sans_validation/6]).
-export([set_user_limits/3, clear_user_limits/3, is_over_connection_limit/1,
is_over_channel_limit/1, get_user_limits/0, get_user_limits/1]).
@@ -208,14 +211,56 @@ add_user(Username, Password, ActingUser) ->
validate_and_alternate_credentials(Username, Password, ActingUser,
fun add_user_sans_validation/3).
+-spec add_user(rabbit_types:username(), rabbit_types:password(),
+ rabbit_types:username(), [atom()]) -> 'ok' | {'error', string()}.
+
+add_user(Username, Password, ActingUser, Tags) ->
+ add_user(Username, Password, ActingUser, undefined, Tags).
+
+add_user(Username, Password, ActingUser, Limits, Tags) ->
+ validate_and_alternate_credentials(Username, Password, ActingUser,
+ add_user_sans_validation(Limits, Tags)).
+
add_user_sans_validation(Username, Password, ActingUser) ->
+ add_user_sans_validation(Username, Password, ActingUser, undefined, []).
+
+add_user_sans_validation(Limits, Tags) ->
+ fun(Username, Password, ActingUser) ->
+ add_user_sans_validation(Username, Password, ActingUser, Limits, Tags)
+ end.
+
+add_user_sans_validation(Username, Password, ActingUser, Limits, Tags) ->
rabbit_log:debug("Asked to create a new user '~s', password length in bytes: ~p", [Username, bit_size(Password)]),
%% hash_password will pick the hashing function configured for us
%% but we also need to store a hint as part of the record, so we
%% retrieve it here one more time
HashingMod = rabbit_password:hashing_mod(),
PasswordHash = hash_password(HashingMod, Password),
- User = internal_user:create_user(Username, PasswordHash, HashingMod),
+ User0 = internal_user:create_user(Username, PasswordHash, HashingMod),
+ ConvertedTags = [rabbit_data_coercion:to_atom(I) || I <- Tags],
+ User1 = internal_user:set_tags(User0, ConvertedTags),
+ User = case Limits of
+ undefined -> User1;
+ Term -> internal_user:update_limits(add, User1, Term)
+ end,
+ add_user_sans_validation_in(Username, User, ConvertedTags, Limits, ActingUser).
+
+add_user_sans_validation(Username, PasswordHash, HashingAlgorithm, Tags, Limits, ActingUser) ->
+ rabbit_log:debug("Asked to create a new user '~s' with password hash", [Username]),
+ ConvertedTags = [rabbit_data_coercion:to_atom(I) || I <- Tags],
+ HashingMod = rabbit_password:hashing_mod(),
+ User0 = internal_user:create_user(Username, PasswordHash, HashingMod),
+ User1 = internal_user:set_tags(
+ internal_user:set_password_hash(User0,
+ PasswordHash, HashingAlgorithm),
+ ConvertedTags),
+ User = case Limits of
+ undefined -> User1;
+ Term -> internal_user:update_limits(add, User1, Term)
+ end,
+ add_user_sans_validation_in(Username, User, ConvertedTags, Limits, ActingUser).
+
+add_user_sans_validation_in(Username, User, ConvertedTags, Limits, ActingUser) ->
try
R = rabbit_misc:execute_mnesia_transaction(
fun () ->
@@ -229,6 +274,14 @@ add_user_sans_validation(Username, Password, ActingUser) ->
rabbit_log:info("Created user '~s'", [Username]),
rabbit_event:notify(user_created, [{name, Username},
{user_who_performed_action, ActingUser}]),
+ case ConvertedTags of
+ [] -> ok;
+ _ -> notify_user_tags_set(Username, ConvertedTags, ActingUser)
+ end,
+ case Limits of
+ undefined -> ok;
+ _ -> notify_limit_set(Username, ActingUser, Limits)
+ end,
R
catch
throw:{error, {user_already_exists, _}} = Error ->
@@ -322,6 +375,42 @@ change_password_sans_validation(Username, Password, ActingUser) ->
erlang:raise(Class, Error, Stacktrace)
end.
+update_user(Username, Password, Tags, Limits, ActingUser) ->
+ validate_and_alternate_credentials(Username, Password, ActingUser,
+ update_user_sans_validation(Tags, Limits)).
+
+update_user_sans_validation(Tags, Limits) ->
+ fun(Username, Password, ActingUser) ->
+ try
+ rabbit_log:debug("Asked to change password of user '~s', new password length in bytes: ~p", [Username, bit_size(Password)]),
+ HashingAlgorithm = rabbit_password:hashing_mod(),
+
+ rabbit_log:debug("Asked to set user tags for user '~s' to ~p", [Username, Tags]),
+
+ ConvertedTags = [rabbit_data_coercion:to_atom(I) || I <- Tags],
+ R = update_user_with_hash(Username,
+ hash_password(rabbit_password:hashing_mod(),
+ Password),
+ HashingAlgorithm,
+ ConvertedTags,
+ Limits),
+ rabbit_log:info("Successfully changed password for user '~s'", [Username]),
+ rabbit_event:notify(user_password_changed,
+ [{name, Username},
+ {user_who_performed_action, ActingUser}]),
+
+ notify_user_tags_set(Username, ConvertedTags, ActingUser),
+ R
+ catch
+ throw:{error, {no_such_user, _}} = Error ->
+ rabbit_log:warning("Failed to change password for user '~s': the user does not exist", [Username]),
+ throw(Error);
+ Class:Error:Stacktrace ->
+ rabbit_log:warning("Failed to change password for user '~s': ~p", [Username, Error]),
+ erlang:raise(Class, Error, Stacktrace)
+ end
+ end.
+
-spec clear_password(rabbit_types:username(), rabbit_types:username()) -> 'ok'.
clear_password(Username, ActingUser) ->
@@ -346,10 +435,22 @@ change_password_hash(Username, PasswordHash) ->
change_password_hash(Username, PasswordHash, HashingAlgorithm) ->
- update_user(Username, fun(User) ->
- internal_user:set_password_hash(User,
- PasswordHash, HashingAlgorithm)
- end).
+ update_user_with_hash(Username, PasswordHash, HashingAlgorithm, [], undefined).
+
+update_user_with_hash(Username, PasswordHash, HashingAlgorithm, ConvertedTags, Limits) ->
+ update_user(Username,
+ fun(User0) ->
+ User1 = internal_user:set_password_hash(User0,
+ PasswordHash, HashingAlgorithm),
+ User2 = case Limits of
+ undefined -> User1;
+ _ -> internal_user:update_limits(add, User1, Limits)
+ end,
+ case ConvertedTags of
+ [] -> User2;
+ _ -> internal_user:set_tags(User2, ConvertedTags)
+ end
+ end).
-spec set_tags(rabbit_types:username(), [atom()], rabbit_types:username()) -> 'ok'.
@@ -360,9 +461,7 @@ set_tags(Username, Tags, ActingUser) ->
R = update_user(Username, fun(User) ->
internal_user:set_tags(User, ConvertedTags)
end),
- rabbit_log:info("Successfully set user tags for user '~s' to ~p", [Username, ConvertedTags]),
- rabbit_event:notify(user_tags_set, [{name, Username}, {tags, ConvertedTags},
- {user_who_performed_action, ActingUser}]),
+ notify_user_tags_set(Username, ConvertedTags, ActingUser),
R
catch
throw:{error, {no_such_user, _}} = Error ->
@@ -373,6 +472,11 @@ set_tags(Username, Tags, ActingUser) ->
erlang:raise(Class, Error, Stacktrace)
end .
+notify_user_tags_set(Username, ConvertedTags, ActingUser) ->
+ rabbit_log:info("Successfully set user tags for user '~s' to ~p", [Username, ConvertedTags]),
+ rabbit_event:notify(user_tags_set, [{name, Username}, {tags, ConvertedTags},
+ {user_who_performed_action, ActingUser}]).
+
-spec set_permissions
(rabbit_types:username(), rabbit_types:vhost(), regexp(), regexp(),
regexp(), rabbit_types:username()) ->
@@ -648,13 +752,27 @@ put_user(User, Version, ActingUser) ->
rabbit_credential_validation:validate(Username, Password) =:= ok
end,
+ Limits = case rabbit_feature_flags:is_enabled(user_limits) of
+ false ->
+ undefined;
+ true ->
+ case maps:get(limits, User, undefined) of
+ undefined ->
+ undefined;
+ Term ->
+ case validate_user_limits(Term) of
+ ok -> Term;
+ Error -> throw(Error)
+ end
+ end
+ end,
case exists(Username) of
true ->
case {HasPassword, HasPasswordHash} of
{true, false} ->
- update_user_password(PassedCredentialValidation, Username, Password, Tags, ActingUser);
+ update_user_password(PassedCredentialValidation, Username, Password, Tags, Limits, ActingUser);
{false, true} ->
- update_user_password_hash(Username, PasswordHash, Tags, User, Version, ActingUser);
+ update_user_password_hash(Username, PasswordHash, Tags, Limits, User, Version);
{true, true} ->
throw({error, both_password_and_password_hash_are_provided});
%% clear password, update tags if needed
@@ -665,63 +783,54 @@ put_user(User, Version, ActingUser) ->
false ->
case {HasPassword, HasPasswordHash} of
{true, false} ->
- create_user_with_password(PassedCredentialValidation, Username, Password, Tags, Permissions, ActingUser);
+ create_user_with_password(PassedCredentialValidation, Username, Password, Tags, Permissions, Limits, ActingUser);
{false, true} ->
- create_user_with_password_hash(Username, PasswordHash, Tags, User, Version, Permissions, ActingUser);
+ create_user_with_password_hash(Username, PasswordHash, Tags, User, Version, Permissions, Limits, ActingUser);
{true, true} ->
throw({error, both_password_and_password_hash_are_provided});
{false, false} ->
%% this user won't be able to sign in using
%% a username/password pair but can be used for x509 certificate authentication,
%% with authn backends such as HTTP or LDAP and so on.
- create_user_with_password(PassedCredentialValidation, Username, <<"">>, Tags, Permissions, ActingUser)
+ create_user_with_password(PassedCredentialValidation, Username, <<"">>, Tags, Permissions, Limits, ActingUser)
end
end.
-update_user_password(_PassedCredentialValidation = true, Username, Password, Tags, ActingUser) ->
- rabbit_auth_backend_internal:change_password(Username, Password, ActingUser),
- rabbit_auth_backend_internal:set_tags(Username, Tags, ActingUser);
-update_user_password(_PassedCredentialValidation = false, _Username, _Password, _Tags, _ActingUser) ->
+update_user_password(_PassedCredentialValidation = true, Username, Password, Tags, Limits, ActingUser) ->
+ %% change_password, set_tags and limits
+ rabbit_auth_backend_internal:update_user(Username, Password, Tags, Limits, ActingUser);
+update_user_password(_PassedCredentialValidation = false, _Username, _Password, _Tags, _Limits, _ActingUser) ->
%% we don't log here because
%% rabbit_auth_backend_internal will do it
throw({error, credential_validation_failed}).
-update_user_password_hash(Username, PasswordHash, Tags, User, Version, ActingUser) ->
+update_user_password_hash(Username, PasswordHash, Tags, Limits, User, Version) ->
%% when a hash this provided, credential validation
%% is not applied
HashingAlgorithm = hashing_algorithm(User, Version),
Hash = rabbit_misc:b64decode_or_throw(PasswordHash),
- rabbit_auth_backend_internal:change_password_hash(
- Username, Hash, HashingAlgorithm),
- rabbit_auth_backend_internal:set_tags(Username, Tags, ActingUser).
-
-create_user_with_password(_PassedCredentialValidation = true, Username, Password, Tags, undefined, ActingUser) ->
- rabbit_auth_backend_internal:add_user(Username, Password, ActingUser),
- rabbit_auth_backend_internal:set_tags(Username, Tags, ActingUser);
-create_user_with_password(_PassedCredentialValidation = true, Username, Password, Tags, PreconfiguredPermissions, ActingUser) ->
- rabbit_auth_backend_internal:add_user(Username, Password, ActingUser),
- rabbit_auth_backend_internal:set_tags(Username, Tags, ActingUser),
+ ConvertedTags = [rabbit_data_coercion:to_atom(I) || I <- Tags],
+ rabbit_auth_backend_internal:update_user_with_hash(
+ Username, Hash, HashingAlgorithm, ConvertedTags, Limits).
+
+create_user_with_password(_PassedCredentialValidation = true, Username, Password, Tags, undefined, Limits, ActingUser) ->
+ rabbit_auth_backend_internal:add_user(Username, Password, ActingUser, Limits, Tags);
+create_user_with_password(_PassedCredentialValidation = true, Username, Password, Tags, PreconfiguredPermissions, Limits, ActingUser) ->
+ rabbit_auth_backend_internal:add_user(Username, Password, ActingUser, Limits, Tags),
preconfigure_permissions(Username, PreconfiguredPermissions, ActingUser);
-create_user_with_password(_PassedCredentialValidation = false, _Username, _Password, _Tags, _, _) ->
+create_user_with_password(_PassedCredentialValidation = false, _Username, _Password, _Tags, _, _, _) ->
%% we don't log here because
%% rabbit_auth_backend_internal will do it
throw({error, credential_validation_failed}).
-create_user_with_password_hash(Username, PasswordHash, Tags, User, Version, PreconfiguredPermissions, ActingUser) ->
+create_user_with_password_hash(Username, PasswordHash, Tags, User, Version, PreconfiguredPermissions, Limits, ActingUser) ->
%% when a hash this provided, credential validation
%% is not applied
HashingAlgorithm = hashing_algorithm(User, Version),
Hash = rabbit_misc:b64decode_or_throw(PasswordHash),
- %% first we create a user with dummy credentials and no
- %% validation applied, then we update password hash
- TmpPassword = rabbit_guid:binary(rabbit_guid:gen_secure(), "tmp"),
- rabbit_auth_backend_internal:add_user_sans_validation(Username, TmpPassword, ActingUser),
-
- rabbit_auth_backend_internal:change_password_hash(
- Username, Hash, HashingAlgorithm),
- rabbit_auth_backend_internal:set_tags(Username, Tags, ActingUser),
+ rabbit_auth_backend_internal:add_user_sans_validation(Username, Hash, HashingAlgorithm, Tags, Limits, ActingUser),
preconfigure_permissions(Username, PreconfiguredPermissions, ActingUser).
preconfigure_permissions(_Username, undefined, _ActingUser) ->
@@ -756,8 +865,7 @@ set_user_limits(Username, Definition, ActingUser) when is_map(Definition) ->
end.
validate_parameters_and_update_limit(Username, Term, ActingUser) ->
- case flatten_errors(rabbit_parameter_validation:proplist(
- <<"user-limits">>, user_limit_validation(), Term)) of
+ case validate_user_limits(Term) of
ok ->
update_user(Username, fun(User) ->
internal_user:update_limits(add, User, Term)
@@ -767,6 +875,10 @@ validate_parameters_and_update_limit(Username, Term, ActingUser) ->
{error_string, rabbit_misc:format(Reason, Arguments)}
end.
+validate_user_limits(Term) ->
+ flatten_errors(rabbit_parameter_validation:proplist(
+ <<"user-limits">>, user_limit_validation(), Term)).
+
user_limit_validation() ->
[{<<"max-connections">>, fun rabbit_parameter_validation:integer/2, optional},
{<<"max-channels">>, fun rabbit_parameter_validation:integer/2, optional}].
diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl
index 7fee29f2c3..cd8ebe4446 100644
--- a/deps/rabbit/src/rabbit_channel.erl
+++ b/deps/rabbit/src/rabbit_channel.erl
@@ -1306,7 +1306,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
check_expiration_header(Props),
DoConfirm = Tx =/= none orelse ConfirmEnabled,
{MsgSeqNo, State1} =
- case DoConfirm orelse Mandatory of
+ case DoConfirm of
false -> {undefined, State0};
true -> rabbit_global_counters:messages_received_confirm(amqp091, 1),
SeqNo = State0#ch.publish_seqno,
diff --git a/deps/rabbit/src/rabbit_definitions.erl b/deps/rabbit/src/rabbit_definitions.erl
index 8faf2f808a..152a6be5f2 100644
--- a/deps/rabbit/src/rabbit_definitions.erl
+++ b/deps/rabbit/src/rabbit_definitions.erl
@@ -464,6 +464,10 @@ add_policy(Param, Username) ->
add_policy(VHost, Param, Username) ->
Key = maps:get(name, Param, undefined),
+ case Key of
+ undefined -> exit(rabbit_misc:format("policy in virtual host '~s' has undefined name", [VHost]));
+ _ -> ok
+ end,
case rabbit_policy:set(
VHost, Key, maps:get(pattern, Param, undefined),
case maps:get(definition, Param, undefined) of
diff --git a/deps/rabbit/src/rabbit_disk_monitor.erl b/deps/rabbit/src/rabbit_disk_monitor.erl
index 76edecf5d8..28b4cb3eba 100644
--- a/deps/rabbit/src/rabbit_disk_monitor.erl
+++ b/deps/rabbit/src/rabbit_disk_monitor.erl
@@ -33,6 +33,7 @@
get_disk_free/0, set_enabled/1]).
-define(SERVER, ?MODULE).
+-define(ETS_NAME, ?MODULE).
-define(DEFAULT_MIN_DISK_CHECK_INTERVAL, 100).
-define(DEFAULT_MAX_DISK_CHECK_INTERVAL, 10000).
-define(DEFAULT_DISK_FREE_LIMIT, 50000000).
@@ -73,51 +74,42 @@
%%----------------------------------------------------------------------------
-spec get_disk_free_limit() -> integer().
-
get_disk_free_limit() ->
- gen_server:call(?MODULE, get_disk_free_limit, infinity).
+ safe_ets_lookup(disk_free_limit, ?DEFAULT_DISK_FREE_LIMIT).
-spec set_disk_free_limit(disk_free_limit()) -> 'ok'.
-
set_disk_free_limit(Limit) ->
- gen_server:call(?MODULE, {set_disk_free_limit, Limit}, infinity).
+ gen_server:call(?MODULE, {set_disk_free_limit, Limit}).
-spec get_min_check_interval() -> integer().
-
get_min_check_interval() ->
- gen_server:call(?MODULE, get_min_check_interval, infinity).
+ safe_ets_lookup(min_check_interval, ?DEFAULT_MIN_DISK_CHECK_INTERVAL).
-spec set_min_check_interval(integer()) -> 'ok'.
-
set_min_check_interval(Interval) ->
- gen_server:call(?MODULE, {set_min_check_interval, Interval}, infinity).
+ gen_server:call(?MODULE, {set_min_check_interval, Interval}).
-spec get_max_check_interval() -> integer().
-
get_max_check_interval() ->
- gen_server:call(?MODULE, get_max_check_interval, infinity).
+ safe_ets_lookup(max_check_interval, ?DEFAULT_MAX_DISK_CHECK_INTERVAL).
-spec set_max_check_interval(integer()) -> 'ok'.
-
set_max_check_interval(Interval) ->
- gen_server:call(?MODULE, {set_max_check_interval, Interval}, infinity).
+ gen_server:call(?MODULE, {set_max_check_interval, Interval}).
-spec get_disk_free() -> (integer() | 'unknown').
-
get_disk_free() ->
- gen_server:call(?MODULE, get_disk_free, infinity).
+ safe_ets_lookup(disk_free, unknown).
-spec set_enabled(string()) -> 'ok'.
-
set_enabled(Enabled) ->
- gen_server:call(?MODULE, {set_enabled, Enabled}, infinity).
+ gen_server:call(?MODULE, {set_enabled, Enabled}).
%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------
-spec start_link(disk_free_limit()) -> rabbit_types:ok_pid_or_error().
-
start_link(Args) ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [Args], []).
@@ -125,18 +117,16 @@ init([Limit]) ->
Dir = dir(),
{ok, Retries} = application:get_env(rabbit, disk_monitor_failure_retries),
{ok, Interval} = application:get_env(rabbit, disk_monitor_failure_retry_interval),
- State = #state{dir = Dir,
- min_interval = ?DEFAULT_MIN_DISK_CHECK_INTERVAL,
- max_interval = ?DEFAULT_MAX_DISK_CHECK_INTERVAL,
- alarmed = false,
- enabled = true,
- limit = Limit,
- retries = Retries,
- interval = Interval},
- {ok, enable(State)}.
-
-handle_call(get_disk_free_limit, _From, State = #state{limit = Limit}) ->
- {reply, Limit, State};
+ ?ETS_NAME = ets:new(?ETS_NAME, [protected, set, named_table]),
+ State0 = #state{dir = Dir,
+ alarmed = false,
+ enabled = true,
+ limit = Limit,
+ retries = Retries,
+ interval = Interval},
+ State1 = set_min_check_interval(?DEFAULT_MIN_DISK_CHECK_INTERVAL, State0),
+ State2 = set_max_check_interval(?DEFAULT_MAX_DISK_CHECK_INTERVAL, State1),
+ {ok, enable(State2)}.
handle_call({set_disk_free_limit, _}, _From, #state{enabled = false} = State) ->
rabbit_log:info("Cannot set disk free limit: "
@@ -146,20 +136,14 @@ handle_call({set_disk_free_limit, _}, _From, #state{enabled = false} = State) ->
handle_call({set_disk_free_limit, Limit}, _From, State) ->
{reply, ok, set_disk_limits(State, Limit)};
-handle_call(get_min_check_interval, _From, State) ->
- {reply, State#state.min_interval, State};
-
handle_call(get_max_check_interval, _From, State) ->
{reply, State#state.max_interval, State};
handle_call({set_min_check_interval, MinInterval}, _From, State) ->
- {reply, ok, State#state{min_interval = MinInterval}};
+ {reply, ok, set_min_check_interval(MinInterval, State)};
handle_call({set_max_check_interval, MaxInterval}, _From, State) ->
- {reply, ok, State#state{max_interval = MaxInterval}};
-
-handle_call(get_disk_free, _From, State = #state { actual = Actual }) ->
- {reply, Actual, State};
+ {reply, ok, set_max_check_interval(MaxInterval, State)};
handle_call({set_enabled, _Enabled = true}, _From, State) ->
start_timer(set_disk_limits(State, State#state.limit)),
@@ -194,14 +178,36 @@ code_change(_OldVsn, State, _Extra) ->
%% Server Internals
%%----------------------------------------------------------------------------
+safe_ets_lookup(Key, Default) ->
+ try
+ case ets:lookup(?ETS_NAME, Key) of
+ [{Key, Value}] ->
+ Value;
+ [] ->
+ Default
+ end
+ catch
+ error:badarg ->
+ Default
+ end.
+
% the partition / drive containing this directory will be monitored
dir() -> rabbit_mnesia:dir().
+set_min_check_interval(MinInterval, State) ->
+ ets:insert(?ETS_NAME, {min_check_interval, MinInterval}),
+ State#state{min_interval = MinInterval}.
+
+set_max_check_interval(MaxInterval, State) ->
+ ets:insert(?ETS_NAME, {max_check_interval, MaxInterval}),
+ State#state{max_interval = MaxInterval}.
+
set_disk_limits(State, Limit0) ->
Limit = interpret_limit(Limit0),
State1 = State#state { limit = Limit },
rabbit_log:info("Disk free limit set to ~pMB",
[trunc(Limit / 1000000)]),
+ ets:insert(?ETS_NAME, {disk_free_limit, Limit}),
internal_update(State1).
internal_update(State = #state { limit = Limit,
@@ -219,7 +225,8 @@ internal_update(State = #state { limit = Limit,
_ ->
ok
end,
- State #state {alarmed = NewAlarmed, actual = CurrentFree}.
+ ets:insert(?ETS_NAME, {disk_free, CurrentFree}),
+ State#state{alarmed = NewAlarmed, actual = CurrentFree}.
get_disk_free(Dir) ->
get_disk_free(Dir, os:type()).
@@ -227,11 +234,89 @@ get_disk_free(Dir) ->
get_disk_free(Dir, {unix, Sun})
when Sun =:= sunos; Sun =:= sunos4; Sun =:= solaris ->
Df = os:find_executable("df"),
- parse_free_unix(rabbit_misc:os_cmd(Df ++ " -k " ++ Dir));
+ parse_free_unix(run_cmd(Df ++ " -k " ++ Dir));
get_disk_free(Dir, {unix, _}) ->
Df = os:find_executable("df"),
- parse_free_unix(rabbit_misc:os_cmd(Df ++ " -kP " ++ Dir));
+ parse_free_unix(run_cmd(Df ++ " -kP " ++ Dir));
get_disk_free(Dir, {win32, _}) ->
+ % Dir:
+ % "c:/Users/username/AppData/Roaming/RabbitMQ/db/rabbit2@username-z01-mnesia"
+ case win32_get_drive_letter(Dir) of
+ error ->
+ rabbit_log:warning("Expected the mnesia directory absolute "
+ "path to start with a drive letter like "
+ "'C:'. The path is: '~p'", [Dir]),
+ case win32_get_disk_free_dir(Dir) of
+ {ok, Free} ->
+ Free;
+ _ -> exit(could_not_determine_disk_free)
+ end;
+ DriveLetter ->
+ case win32_get_disk_free_fsutil(DriveLetter) of
+ {ok, Free0} -> Free0;
+ error ->
+ case win32_get_disk_free_pwsh(DriveLetter) of
+ {ok, Free1} -> Free1;
+ _ -> exit(could_not_determine_disk_free)
+ end
+ end
+ end.
+
+parse_free_unix(Str) ->
+ case string:tokens(Str, "\n") of
+ [_, S | _] -> case string:tokens(S, " \t") of
+ [_, _, _, Free | _] -> list_to_integer(Free) * 1024;
+ _ -> exit({unparseable, Str})
+ end;
+ _ -> exit({unparseable, Str})
+ end.
+
+win32_get_drive_letter([DriveLetter, $:, $/ | _]) when
+ (DriveLetter >= $a andalso DriveLetter =< $z) orelse
+ (DriveLetter >= $A andalso DriveLetter =< $Z) ->
+ DriveLetter;
+win32_get_drive_letter(_) ->
+ error.
+
+win32_get_disk_free_fsutil(DriveLetter) when
+ (DriveLetter >= $a andalso DriveLetter =< $z) orelse
+ (DriveLetter >= $A andalso DriveLetter =< $Z) ->
+ % DriveLetter $c
+ FsutilCmd = "fsutil.exe volume diskfree " ++ [DriveLetter] ++ ":",
+
+ % C:\windows\system32>fsutil volume diskfree c:
+ % Total free bytes : 812,733,878,272 (756.9 GB)
+ % Total bytes : 1,013,310,287,872 (943.7 GB)
+ % Total quota free bytes : 812,733,878,272 (756.9 GB)
+ case run_cmd(FsutilCmd) of
+ {error, timeout} ->
+ error;
+ FsutilResult ->
+ case string:slice(FsutilResult, 0, 5) of
+ "Error" ->
+ error;
+ "Total" ->
+ FirstLine = hd(string:tokens(FsutilResult, "\r\n")),
+ {match, [FreeStr]} = re:run(FirstLine, "(\\d+,?)+", [{capture, first, list}]),
+ {ok, list_to_integer(lists:flatten(string:tokens(FreeStr, ",")))}
+ end
+ end.
+
+win32_get_disk_free_pwsh(DriveLetter) when
+ (DriveLetter >= $a andalso DriveLetter =< $z) orelse
+ (DriveLetter >= $A andalso DriveLetter =< $Z) ->
+ % DriveLetter $c
+ PoshCmd = "powershell.exe -NoLogo -NoProfile -NonInteractive -Command (Get-PSDrive " ++ [DriveLetter] ++ ").Free",
+ case run_cmd(PoshCmd) of
+ {error, timeout} ->
+ error;
+ PoshResultStr ->
+ % Note: remove \r\n
+ PoshResult = string:slice(PoshResultStr, 0, length(PoshResultStr) - 2),
+ {ok, list_to_integer(PoshResult)}
+ end.
+
+win32_get_disk_free_dir(Dir) ->
%% On Windows, the Win32 API enforces a limit of 260 characters
%% (MAX_PATH). If we call `dir` with a path longer than that, it
%% fails with "File not found". Starting with Windows 10 version
@@ -253,22 +338,11 @@ get_disk_free(Dir, {win32, _}) ->
%% See the following page to learn more about this:
%% https://ss64.com/nt/syntax-filenames.html
RawDir = "\\\\?\\" ++ string:replace(Dir, "/", "\\", all),
- parse_free_win32(rabbit_misc:os_cmd("dir /-C /W \"" ++ RawDir ++ "\"")).
-
-parse_free_unix(Str) ->
- case string:tokens(Str, "\n") of
- [_, S | _] -> case string:tokens(S, " \t") of
- [_, _, _, Free | _] -> list_to_integer(Free) * 1024;
- _ -> exit({unparseable, Str})
- end;
- _ -> exit({unparseable, Str})
- end.
-
-parse_free_win32(CommandResult) ->
+ CommandResult = run_cmd("dir /-C /W \"" ++ RawDir ++ "\""),
LastLine = lists:last(string:tokens(CommandResult, "\r\n")),
{match, [Free]} = re:run(lists:reverse(LastLine), "(\\d+)",
[{capture, all_but_first, list}]),
- list_to_integer(lists:reverse(Free)).
+ {ok, list_to_integer(lists:reverse(Free))}.
interpret_limit({mem_relative, Relative})
when is_number(Relative) ->
@@ -318,3 +392,20 @@ enable(#state{dir = Dir, interval = Interval, limit = Limit, retries = Retries}
erlang:send_after(Interval, self(), try_enable),
State#state{enabled = false}
end.
+
+run_cmd(Cmd) ->
+ Pid = self(),
+ Ref = make_ref(),
+ CmdFun = fun() ->
+ CmdResult = rabbit_misc:os_cmd(Cmd),
+ Pid ! {Pid, Ref, CmdResult}
+ end,
+ CmdPid = spawn(CmdFun),
+ receive
+ {Pid, Ref, CmdResult} ->
+ CmdResult
+ after 5000 ->
+ exit(CmdPid, kill),
+ rabbit_log:error("Command timed out: '~s'", [Cmd]),
+ {error, timeout}
+ end.
diff --git a/deps/rabbit/src/rabbit_file.erl b/deps/rabbit/src/rabbit_file.erl
index 925a5356d7..2060c354c5 100644
--- a/deps/rabbit/src/rabbit_file.erl
+++ b/deps/rabbit/src/rabbit_file.erl
@@ -40,7 +40,7 @@ is_file(File) ->
is_dir(Dir) -> is_dir_internal(read_file_info(Dir)).
-is_dir_no_handle(Dir) -> is_dir_internal(prim_file:read_file_info(Dir)).
+is_dir_no_handle(Dir) -> is_dir_internal(file:read_file_info(Dir, [raw])).
is_dir_internal({ok, #file_info{type=directory}}) -> true;
is_dir_internal(_) -> false.
@@ -83,14 +83,23 @@ wildcard(Pattern, Dir) ->
list_dir(Dir) -> with_handle(fun () -> prim_file:list_dir(Dir) end).
read_file_info(File) ->
- with_handle(fun () -> prim_file:read_file_info(File) end).
+ with_handle(fun () -> file:read_file_info(File, [raw]) end).
-spec read_term_file
(file:filename()) -> {'ok', [any()]} | rabbit_types:error(any()).
read_term_file(File) ->
try
- {ok, Data} = with_handle(fun () -> prim_file:read_file(File) end),
+ F = fun() ->
+ {ok, FInfo} = file:read_file_info(File, [raw]),
+ {ok, Fd} = file:open(File, [read, raw, binary]),
+ try
+ file:read(Fd, FInfo#file_info.size)
+ after
+ file:close(Fd)
+ end
+ end,
+ {ok, Data} = with_handle(F),
{ok, Tokens, _} = erl_scan:string(binary_to_list(Data)),
TokenGroups = group_tokens(Tokens),
{ok, [begin
diff --git a/deps/rabbit/src/rabbit_health_check.erl b/deps/rabbit/src/rabbit_health_check.erl
index a454c252fd..b04c4f9853 100644
--- a/deps/rabbit/src/rabbit_health_check.erl
+++ b/deps/rabbit/src/rabbit_health_check.erl
@@ -64,7 +64,11 @@ node_health_check(rabbit_node_monitor) ->
end;
node_health_check(alarms) ->
- case proplists:get_value(alarms, rabbit:status()) of
+ % Note:
+ % Removed call to rabbit:status/0 here due to a memory leak on win32,
+ % plus it uses an excessive amount of resources
+ % Alternative to https://github.com/rabbitmq/rabbitmq-server/pull/3893
+ case rabbit:alarms() of
[] ->
ok;
Alarms ->
diff --git a/deps/rabbit/src/rabbit_osiris_metrics.erl b/deps/rabbit/src/rabbit_osiris_metrics.erl
index e93d81dc47..710ce1b65e 100644
--- a/deps/rabbit/src/rabbit_osiris_metrics.erl
+++ b/deps/rabbit/src/rabbit_osiris_metrics.erl
@@ -78,6 +78,8 @@ handle_info(tick, #state{timeout = Timeout} = State) ->
%% down `rabbit_sup` and the whole `rabbit` app.
[]
end,
+
+
rabbit_core_metrics:queue_stats(QName, Infos),
rabbit_event:notify(queue_stats, Infos ++ [{name, QName},
{messages, COffs},
diff --git a/deps/rabbit/src/rabbit_policies.erl b/deps/rabbit/src/rabbit_policies.erl
index 062635c5b4..e23d12d81a 100644
--- a/deps/rabbit/src/rabbit_policies.erl
+++ b/deps/rabbit/src/rabbit_policies.erl
@@ -176,4 +176,6 @@ merge_policy_value(<<"max-length-bytes">>, Val, OpVal) -> min(Val, OpVal);
merge_policy_value(<<"max-in-memory-length">>, Val, OpVal) -> min(Val, OpVal);
merge_policy_value(<<"max-in-memory-bytes">>, Val, OpVal) -> min(Val, OpVal);
merge_policy_value(<<"expires">>, Val, OpVal) -> min(Val, OpVal);
-merge_policy_value(<<"delivery-limit">>, Val, OpVal) -> min(Val, OpVal).
+merge_policy_value(<<"delivery-limit">>, Val, OpVal) -> min(Val, OpVal);
+%% use operator policy value for booleans
+merge_policy_value(_Key, Val, OpVal) when is_boolean(Val) andalso is_boolean(OpVal) -> OpVal.
diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl
index 5c23fbb51d..f88822a54f 100644
--- a/deps/rabbit/src/rabbit_stream_coordinator.erl
+++ b/deps/rabbit/src/rabbit_stream_coordinator.erl
@@ -846,18 +846,22 @@ phase_update_mnesia(StreamId, Args, #{reference := QName,
%% This can happen during recovery
%% we need to re-initialise the queue record
%% if the stream id is a match
- [Q] = mnesia:dirty_read(rabbit_durable_queue, QName),
- case amqqueue:get_type_state(Q) of
- #{name := S} when S == StreamId ->
- rabbit_log:debug("~s: initializing queue record for stream id ~s",
- [?MODULE, StreamId]),
- _ = rabbit_amqqueue:ensure_rabbit_queue_record_is_initialized(Fun(Q)),
+ case mnesia:dirty_read(rabbit_durable_queue, QName) of
+ [] ->
+ %% queue not found at all, it must have been deleted
ok;
- _ ->
- ok
- end,
-
- send_self_command({mnesia_updated, StreamId, Args});
+ [Q] ->
+ case amqqueue:get_type_state(Q) of
+ #{name := S} when S == StreamId ->
+ rabbit_log:debug("~s: initializing queue record for stream id ~s",
+ [?MODULE, StreamId]),
+ _ = rabbit_amqqueue:ensure_rabbit_queue_record_is_initialized(Fun(Q)),
+ ok;
+ _ ->
+ ok
+ end,
+ send_self_command({mnesia_updated, StreamId, Args})
+ end;
_ ->
send_self_command({mnesia_updated, StreamId, Args})
catch _:E ->
diff --git a/deps/rabbit/test/definition_import_SUITE.erl b/deps/rabbit/test/definition_import_SUITE.erl
index d893862daf..ba3cb979d1 100644
--- a/deps/rabbit/test/definition_import_SUITE.erl
+++ b/deps/rabbit/test/definition_import_SUITE.erl
@@ -46,7 +46,10 @@ groups() ->
import_case13,
import_case14,
import_case15,
- import_case16
+ import_case16,
+ import_case17,
+ import_case18,
+ import_case19
]},
{boot_time_import_using_classic_source, [], [
@@ -236,6 +239,36 @@ import_case16(Config) ->
{skip, "Should not run in mixed version environments"}
end.
+import_case17(Config) -> import_invalid_file_case(Config, "failing_case17").
+
+import_case18(Config) ->
+ case rabbit_ct_helpers:is_mixed_versions() of
+ false ->
+ case rabbit_ct_broker_helpers:enable_feature_flag(Config, user_limits) of
+ ok ->
+ import_file_case(Config, "case18"),
+ User = <<"limited_guest">>,
+ UserIsImported =
+ fun () ->
+ case user_lookup(Config, User) of
+ {error, not_found} -> false;
+ _ -> true
+ end
+ end,
+ rabbit_ct_helpers:await_condition(UserIsImported, 20000),
+ {ok, UserRec} = user_lookup(Config, User),
+ ?assertEqual(#{<<"max-connections">> => 2}, internal_user:get_limits(UserRec)),
+ ok;
+ Skip ->
+ Skip
+ end;
+ _ ->
+ %% skip the test in mixed version mode
+ {skip, "Should not run in mixed version environments"}
+ end.
+
+import_case19(Config) -> import_invalid_file_case(Config, "failing_case19").
+
export_import_round_trip_case1(Config) ->
case rabbit_ct_helpers:is_mixed_versions() of
false ->
@@ -382,3 +415,6 @@ queue_lookup(Config, VHost, Name) ->
vhost_lookup(Config, VHost) ->
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_vhost, lookup, [VHost]).
+
+user_lookup(Config, User) ->
+ rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_auth_backend_internal, lookup_user, [User]).
diff --git a/deps/rabbit/test/definition_import_SUITE_data/case18.json b/deps/rabbit/test/definition_import_SUITE_data/case18.json
new file mode 100644
index 0000000000..9e0f755beb
--- /dev/null
+++ b/deps/rabbit/test/definition_import_SUITE_data/case18.json
@@ -0,0 +1,46 @@
+{
+ "bindings": [],
+ "exchanges": [],
+ "global_parameters": [
+ {
+ "name": "cluster_name",
+ "value": "rabbitmq@localhost"
+ }
+ ],
+ "parameters": [],
+ "permissions": [
+ {
+ "configure": ".*",
+ "read": ".*",
+ "user": "guest",
+ "vhost": "/",
+ "write": ".*"
+ }
+ ],
+ "policies": [],
+ "queues": [],
+ "rabbit_version": "3.9.1",
+ "rabbitmq_version": "3.9.1",
+ "topic_permissions": [],
+ "users": [
+ {
+ "hashing_algorithm": "rabbit_password_hashing_sha256",
+ "limits": {"max-connections" : 2},
+ "name": "limited_guest",
+ "password_hash": "wS4AT3B4Z5RpWlFn1FA30osf2C75D7WA3gem591ACDZ6saO6",
+ "tags": [
+ "administrator"
+ ]
+ }
+ ],
+ "vhosts": [
+ {
+ "limits": [],
+ "name": "/"
+ },
+ {
+ "limits": [],
+ "name": "tagged"
+ }
+ ]
+}
diff --git a/deps/rabbit/test/definition_import_SUITE_data/failing_case17.json b/deps/rabbit/test/definition_import_SUITE_data/failing_case17.json
new file mode 100644
index 0000000000..4776408833
--- /dev/null
+++ b/deps/rabbit/test/definition_import_SUITE_data/failing_case17.json
@@ -0,0 +1,19 @@
+{
+ "vhosts": [
+ {
+ "name": "\/"
+ }
+ ],
+ "policies": [
+ {
+ "vhost": "\/",
+ "pattern": "^project-nd-ns-",
+ "apply-to": "queues",
+ "definition": {
+ "expires": 120000,
+ "max-length": 10000
+ },
+ "priority": 1
+ }
+ ]
+}
diff --git a/deps/rabbit/test/definition_import_SUITE_data/failing_case19.json b/deps/rabbit/test/definition_import_SUITE_data/failing_case19.json
new file mode 100644
index 0000000000..ab9d355538
--- /dev/null
+++ b/deps/rabbit/test/definition_import_SUITE_data/failing_case19.json
@@ -0,0 +1,46 @@
+{
+ "bindings": [],
+ "exchanges": [],
+ "global_parameters": [
+ {
+ "name": "cluster_name",
+ "value": "rabbitmq@localhost"
+ }
+ ],
+ "parameters": [],
+ "permissions": [
+ {
+ "configure": ".*",
+ "read": ".*",
+ "user": "guest",
+ "vhost": "/",
+ "write": ".*"
+ }
+ ],
+ "policies": [],
+ "queues": [],
+ "rabbit_version": "3.9.1",
+ "rabbitmq_version": "3.9.1",
+ "topic_permissions": [],
+ "users": [
+ {
+ "hashing_algorithm": "rabbit_password_hashing_sha256",
+ "limits": {"max-connections" : "twomincepies"},
+ "name": "limited_guest",
+ "password_hash": "wS4AT3B4Z5RpWlFn1FA30osf2C75D7WA3gem591ACDZ6saO6",
+ "tags": [
+ "administrator"
+ ]
+ }
+ ],
+ "vhosts": [
+ {
+ "limits": [],
+ "name": "/"
+ },
+ {
+ "limits": [],
+ "name": "tagged"
+ }
+ ]
+}
diff --git a/deps/rabbit/test/publisher_confirms_parallel_SUITE.erl b/deps/rabbit/test/publisher_confirms_parallel_SUITE.erl
index 30bc5f8ba6..6d2e515da3 100644
--- a/deps/rabbit/test/publisher_confirms_parallel_SUITE.erl
+++ b/deps/rabbit/test/publisher_confirms_parallel_SUITE.erl
@@ -29,6 +29,7 @@ groups() ->
confirm_nowait,
confirm_ack,
confirm_acks,
+ confirm_after_mandatory_bug,
confirm_mandatory_unroutable,
confirm_unroutable_message],
[
@@ -187,6 +188,17 @@ confirm_acks(Config) ->
publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>, <<"msg4">>]),
receive_many(lists:seq(1, 4)).
+confirm_after_mandatory_bug(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+ ok = amqp_channel:call(Ch, #'basic.publish'{routing_key = QName,
+ mandatory = true}, #amqp_msg{payload = <<"msg1">>}),
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ publish(Ch, QName, [<<"msg2">>]),
+ true = amqp_channel:wait_for_confirms(Ch, 1),
+ ok.
+
%% For unroutable messages, the broker will issue a confirm once the exchange verifies a message
%% won't route to any queue (returns an empty list of queues).
%% If the message is also published as mandatory, the basic.return is sent to the client before
diff --git a/deps/rabbit/test/rabbit_fifo_SUITE.erl b/deps/rabbit/test/rabbit_fifo_SUITE.erl
index 9cb401fc8e..cf853bca98 100644
--- a/deps/rabbit/test/rabbit_fifo_SUITE.erl
+++ b/deps/rabbit/test/rabbit_fifo_SUITE.erl
@@ -11,7 +11,7 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
--include("src/rabbit_fifo.hrl").
+-include_lib("rabbit/src/rabbit_fifo.hrl").
%%%===================================================================
%%% Common Test callbacks
diff --git a/deps/rabbit/test/unit_disk_monitor_SUITE.erl b/deps/rabbit/test/unit_disk_monitor_SUITE.erl
index 71393d5d35..7a746349ca 100644
--- a/deps/rabbit/test/unit_disk_monitor_SUITE.erl
+++ b/deps/rabbit/test/unit_disk_monitor_SUITE.erl
@@ -67,6 +67,12 @@ set_disk_free_limit_command(Config) ->
?MODULE, set_disk_free_limit_command1, [Config]).
set_disk_free_limit_command1(_Config) ->
+ F = fun () ->
+ DiskFree = rabbit_disk_monitor:get_disk_free(),
+ DiskFree =/= unknown
+ end,
+ rabbit_ct_helpers:await_condition(F),
+
%% Use an integer
rabbit_disk_monitor:set_disk_free_limit({mem_relative, 1}),
disk_free_limit_to_total_memory_ratio_is(1),
@@ -84,7 +90,8 @@ set_disk_free_limit_command1(_Config) ->
passed.
disk_free_limit_to_total_memory_ratio_is(MemRatio) ->
+ DiskFreeLimit = rabbit_disk_monitor:get_disk_free_limit(),
ExpectedLimit = MemRatio * vm_memory_monitor:get_total_memory(),
% Total memory is unstable, so checking order
- true = ExpectedLimit/rabbit_disk_monitor:get_disk_free_limit() < 1.2,
- true = ExpectedLimit/rabbit_disk_monitor:get_disk_free_limit() > 0.98.
+ true = ExpectedLimit/DiskFreeLimit < 1.2,
+ true = ExpectedLimit/DiskFreeLimit > 0.98.
diff --git a/deps/rabbit/test/unit_disk_monitor_mocks_SUITE.erl b/deps/rabbit/test/unit_disk_monitor_mocks_SUITE.erl
index 80fa3d2e09..ae16cbb379 100644
--- a/deps/rabbit/test/unit_disk_monitor_mocks_SUITE.erl
+++ b/deps/rabbit/test/unit_disk_monitor_mocks_SUITE.erl
@@ -88,7 +88,7 @@ disk_monitor_enable1(_Config) ->
application:set_env(rabbit, disk_monitor_failure_retry_interval, 100),
ok = rabbit_sup:stop_child(rabbit_disk_monitor_sup),
ok = rabbit_sup:start_delayed_restartable_child(rabbit_disk_monitor, [1000]),
- undefined = rabbit_disk_monitor:get_disk_free(),
+ unknown = rabbit_disk_monitor:get_disk_free(),
Cmd = case os:type() of
{win32, _} -> " Le volume dans le lecteur C n’a pas de nom.\n"
" Le numéro de série du volume est 707D-5BDC\n"
diff --git a/deps/rabbit/test/unit_operator_policy_SUITE.erl b/deps/rabbit/test/unit_operator_policy_SUITE.erl
index 804da2d1c9..dedd6c82af 100644
--- a/deps/rabbit/test/unit_operator_policy_SUITE.erl
+++ b/deps/rabbit/test/unit_operator_policy_SUITE.erl
@@ -21,7 +21,8 @@ all() ->
groups() ->
[
{parallel_tests, [parallel], [
- merge_operator_policy_definitions
+ merge_operator_policy_definitions,
+ conflict_resolution_for_booleans
]}
].
@@ -102,6 +103,54 @@ merge_operator_policy_definitions(_Config) ->
[{definition, [
{<<"message-ttl">>, 3000}
]}])
- ),
+ ).
+
- passed.
+ conflict_resolution_for_booleans(_Config) ->
+ ?assertEqual(
+ [
+ {<<"remote-dc-replicate">>, true}
+ ],
+ rabbit_policy:merge_operator_definitions(
+ #{definition => #{
+ <<"remote-dc-replicate">> => true
+ }},
+ [{definition, [
+ {<<"remote-dc-replicate">>, true}
+ ]}])),
+
+ ?assertEqual(
+ [
+ {<<"remote-dc-replicate">>, false}
+ ],
+ rabbit_policy:merge_operator_definitions(
+ #{definition => #{
+ <<"remote-dc-replicate">> => false
+ }},
+ [{definition, [
+ {<<"remote-dc-replicate">>, false}
+ ]}])),
+
+ ?assertEqual(
+ [
+ {<<"remote-dc-replicate">>, true}
+ ],
+ rabbit_policy:merge_operator_definitions(
+ #{definition => #{
+ <<"remote-dc-replicate">> => false
+ }},
+ [{definition, [
+ {<<"remote-dc-replicate">>, true}
+ ]}])),
+
+ ?assertEqual(
+ [
+ {<<"remote-dc-replicate">>, false}
+ ],
+ rabbit_policy:merge_operator_definitions(
+ #{definition => #{
+ <<"remote-dc-replicate">> => true
+ }},
+ [{definition, [
+ {<<"remote-dc-replicate">>, false}
+ ]}])). \ No newline at end of file
diff --git a/deps/rabbitmq_auth_backend_oauth2/README.md b/deps/rabbitmq_auth_backend_oauth2/README.md
index bb698ecc09..b923a47fb6 100644
--- a/deps/rabbitmq_auth_backend_oauth2/README.md
+++ b/deps/rabbitmq_auth_backend_oauth2/README.md
@@ -139,6 +139,46 @@ In that case, the configuration will look like this:
NOTE: `jwks_url` takes precedence over `signing_keys` if both are provided.
+### Variables Configurable in rabbitmq.conf
+
+| Key | Documentation
+|------------------------------------------|-----------
+| `auth_oauth2.resource_server_id` | [The Resource Server ID](#resource-server-id-and-scope-prefixes)
+| `auth_oauth2.additional_scopes_key` | Configure the plugin to also look in other fields (maps to `additional_rabbitmq_scopes` in the old format).
+| `auth_oauth2.default_key` | ID of the default signing key.
+| `auth_oauth2.signing_keys` | Paths to signing key files.
+| `auth_oauth2.jwks_url` | The URL of key server. According to the [JWT Specification](https://datatracker.ietf.org/doc/html/rfc7515#section-4.1.2) key server URL must be https.
+| `auth_oauth2.https.cacertfile` | Path to a file containing PEM-encoded CA certificates. The CA certificates are used during key server [peer verification](https://rabbitmq.com/ssl.html#peer-verification).
+| `auth_oauth2.https.depth` | The maximum number of non-self-issued intermediate certificates that may follow the peer certificate in a valid [certification path](https://rabbitmq.com/ssl.html#peer-verification-depth). Default is 10.
+| `auth_oauth2.https.peer_verification` | Should [peer verification](https://rabbitmq.com/ssl.html#peer-verification) be enabled. Available values: `verify_none`, `verify_peer`. Default is `verify_none`. It is recommended to configure `verify_peer`. Peer verification requires a certain amount of setup and is more secure.
+| `auth_oauth2.https.fail_if_no_peer_cert` | Used together with `auth_oauth2.https.peer_verification = verify_peer`. When set to `true`, TLS connection will be rejected if client fails to provide a certificate. Default is `false`.
+| `auth_oauth2.https.hostname_verification`| Enable wildcard-aware hostname verification for key server. Available values: `wildcard`, `none`. Default is `none`.
+| `auth_oauth2.algorithms` | Restrict [the usable algorithms](https://github.com/potatosalad/erlang-jose#algorithm-support).
+
+For example:
+
+Configure with key files
+```
+auth_oauth2.resource_server_id = new_resource_server_id
+auth_oauth2.additional_scopes_key = my_custom_scope_key
+auth_oauth2.default_key = id1
+auth_oauth2.signing_keys.id1 = test/config_schema_SUITE_data/certs/key.pem
+auth_oauth2.signing_keys.id2 = test/config_schema_SUITE_data/certs/cert.pem
+auth_oauth2.algorithms.1 = HS256
+auth_oauth2.algorithms.2 = RS256
+```
+Configure with key server
+```
+auth_oauth2.resource_server_id = new_resource_server_id
+auth_oauth2.jwks_url = https://my-jwt-issuer/jwks.json
+auth_oauth2.https.cacertfile = test/config_schema_SUITE_data/certs/cacert.pem
+auth_oauth2.https.peer_verification = verify_peer
+auth_oauth2.https.depth = 5
+auth_oauth2.https.fail_if_no_peer_cert = true
+auth_oauth2.https.hostname_verification = wildcard
+auth_oauth2.algorithms.1 = HS256
+auth_oauth2.algorithms.2 = RS256
+```
### Resource Server ID and Scope Prefixes
OAuth 2.0 (and thus UAA-provided) tokens use scopes to communicate what set of permissions particular
diff --git a/deps/rabbitmq_auth_backend_oauth2/priv/schema/rabbitmq_auth_backend_oauth2.schema b/deps/rabbitmq_auth_backend_oauth2/priv/schema/rabbitmq_auth_backend_oauth2.schema
index 0feb73d9aa..1c8593e434 100644
--- a/deps/rabbitmq_auth_backend_oauth2/priv/schema/rabbitmq_auth_backend_oauth2.schema
+++ b/deps/rabbitmq_auth_backend_oauth2/priv/schema/rabbitmq_auth_backend_oauth2.schema
@@ -77,3 +77,52 @@
end, Settings),
maps:from_list(SigningKeys)
end}.
+
+{mapping,
+ "auth_oauth2.jwks_url",
+ "rabbitmq_auth_backend_oauth2.key_config.jwks_url",
+ [{datatype, string}, {validators, ["uri", "https_uri"]}]}.
+
+{mapping,
+ "auth_oauth2.https.peer_verification",
+ "rabbitmq_auth_backend_oauth2.key_config.peer_verification",
+ [{datatype, {enum, [verify_peer, verify_none]}}]}.
+
+{mapping,
+ "auth_oauth2.https.cacertfile",
+ "rabbitmq_auth_backend_oauth2.key_config.cacertfile",
+ [{datatype, file}, {validators, ["file_accessible"]}]}.
+
+{mapping,
+ "auth_oauth2.https.depth",
+ "rabbitmq_auth_backend_oauth2.key_config.depth",
+ [{datatype, integer}]}.
+
+{mapping,
+ "auth_oauth2.https.hostname_verification",
+ "rabbitmq_auth_backend_oauth2.key_config.hostname_verification",
+ [{datatype, {enum, [wildcard, none]}}]}.
+
+{mapping,
+ "auth_oauth2.https.crl_check",
+ "rabbitmq_auth_backend_oauth2.key_config.crl_check",
+ [{datatype, {enum, [true, false, peer, best_effort]}}]}.
+
+{mapping,
+ "auth_oauth2.https.fail_if_no_peer_cert",
+ "rabbitmq_auth_backend_oauth2.key_config.fail_if_no_peer_cert",
+ [{datatype, {enum, [true, false]}}]}.
+
+{validator, "https_uri", "According to the JWT Specification, Key Server URL must be https.",
+ fun(Uri) -> string:nth_lexeme(Uri, 1, "://") == "https" end}.
+
+{mapping,
+ "auth_oauth2.algorithms.$algorithm",
+ "rabbitmq_auth_backend_oauth2.key_config.algorithms",
+ [{datatype, string}]}.
+
+{translation, "rabbitmq_auth_backend_oauth2.key_config.algorithms",
+ fun(Conf) ->
+ Settings = cuttlefish_variable:filter_by_prefix("auth_oauth2.algorithms", Conf),
+ [list_to_binary(V) || {_, V} <- Settings]
+ end}.
diff --git a/deps/rabbitmq_auth_backend_oauth2/src/uaa_jwks.erl b/deps/rabbitmq_auth_backend_oauth2/src/uaa_jwks.erl
new file mode 100644
index 0000000000..d34d9d5d99
--- /dev/null
+++ b/deps/rabbitmq_auth_backend_oauth2/src/uaa_jwks.erl
@@ -0,0 +1,27 @@
+-module(uaa_jwks).
+-export([get/1]).
+
+-spec get(string() | binary()) -> {ok, term()} | {error, term()}.
+get(JwksUrl) ->
+ httpc:request(get, {JwksUrl, []}, [{ssl, ssl_options()}, {timeout, 60000}], []).
+
+-spec ssl_options() -> list().
+ssl_options() ->
+ UaaEnv = application:get_env(rabbitmq_auth_backend_oauth2, key_config, []),
+ PeerVerification = proplists:get_value(peer_verification, UaaEnv, verify_none),
+ CaCertFile = proplists:get_value(cacertfile, UaaEnv),
+ Depth = proplists:get_value(depth, UaaEnv, 10),
+ FailIfNoPeerCert = proplists:get_value(fail_if_no_peer_cert, UaaEnv, false),
+ CrlCheck = proplists:get_value(crl_check, UaaEnv, false),
+ SslOpts0 = [{verify, PeerVerification},
+ {cacertfile, CaCertFile},
+ {depth, Depth},
+ {fail_if_no_peer_cert, FailIfNoPeerCert},
+ {crl_check, CrlCheck},
+ {crl_cache, {ssl_crl_cache, {internal, [{http, 10000}]}}}],
+ case proplists:get_value(hostname_verification, UaaEnv, none) of
+ wildcard ->
+ [{customize_hostname_check, [{match_fun, public_key:pkix_verify_hostname_match_fun(https)}]} | SslOpts0];
+ none ->
+ SslOpts0
+ end. \ No newline at end of file
diff --git a/deps/rabbitmq_auth_backend_oauth2/src/uaa_jwt.erl b/deps/rabbitmq_auth_backend_oauth2/src/uaa_jwt.erl
index c004c6070c..6721bbf5e7 100644
--- a/deps/rabbitmq_auth_backend_oauth2/src/uaa_jwt.erl
+++ b/deps/rabbitmq_auth_backend_oauth2/src/uaa_jwt.erl
@@ -58,7 +58,7 @@ update_jwks_signing_keys() ->
undefined ->
{error, no_jwks_url};
JwksUrl ->
- case httpc:request(JwksUrl) of
+ case uaa_jwks:get(JwksUrl) of
{ok, {_, _, JwksBody}} ->
KeyList = maps:get(<<"keys">>, jose:decode(erlang:iolist_to_binary(JwksBody)), []),
Keys = maps:from_list(lists:map(fun(Key) -> {maps:get(<<"kid">>, Key, undefined), {json, Key}} end, KeyList)),
diff --git a/deps/rabbitmq_auth_backend_oauth2/src/uaa_jwt_jwt.erl b/deps/rabbitmq_auth_backend_oauth2/src/uaa_jwt_jwt.erl
index bb73cf9ae4..aa1cdd8241 100644
--- a/deps/rabbitmq_auth_backend_oauth2/src/uaa_jwt_jwt.erl
+++ b/deps/rabbitmq_auth_backend_oauth2/src/uaa_jwt_jwt.erl
@@ -24,7 +24,15 @@ decode(Token) ->
end.
decode_and_verify(Jwk, Token) ->
- case jose_jwt:verify(Jwk, Token) of
+ UaaEnv = application:get_env(rabbitmq_auth_backend_oauth2, key_config, []),
+ Verify =
+ case proplists:get_value(algorithms, UaaEnv) of
+ undefined ->
+ jose_jwt:verify(Jwk, Token);
+ Algs ->
+ jose_jwt:verify_strict(Jwk, Algs, Token)
+ end,
+ case Verify of
{true, #jose_jwt{fields = Fields}, _} -> {true, Fields};
{false, #jose_jwt{fields = Fields}, _} -> {false, Fields}
end.
diff --git a/deps/rabbitmq_auth_backend_oauth2/test/config_schema_SUITE_data/rabbitmq_auth_backend_oauth2.snippets b/deps/rabbitmq_auth_backend_oauth2/test/config_schema_SUITE_data/rabbitmq_auth_backend_oauth2.snippets
index 2b7018fdd8..27976d3abc 100644
--- a/deps/rabbitmq_auth_backend_oauth2/test/config_schema_SUITE_data/rabbitmq_auth_backend_oauth2.snippets
+++ b/deps/rabbitmq_auth_backend_oauth2/test/config_schema_SUITE_data/rabbitmq_auth_backend_oauth2.snippets
@@ -4,7 +4,16 @@
auth_oauth2.additional_scopes_key = my_custom_scope_key
auth_oauth2.default_key = id1
auth_oauth2.signing_keys.id1 = test/config_schema_SUITE_data/certs/key.pem
- auth_oauth2.signing_keys.id2 = test/config_schema_SUITE_data/certs/cert.pem",
+ auth_oauth2.signing_keys.id2 = test/config_schema_SUITE_data/certs/cert.pem
+ auth_oauth2.jwks_url = https://my-jwt-issuer/jwks.json
+ auth_oauth2.https.cacertfile = test/config_schema_SUITE_data/certs/cacert.pem
+ auth_oauth2.https.peer_verification = verify_none
+ auth_oauth2.https.depth = 5
+ auth_oauth2.https.fail_if_no_peer_cert = false
+ auth_oauth2.https.hostname_verification = wildcard
+ auth_oauth2.https.crl_check = true
+ auth_oauth2.algorithms.1 = HS256
+ auth_oauth2.algorithms.2 = RS256",
[
{rabbitmq_auth_backend_oauth2, [
{resource_server_id,<<"new_resource_server_id">>},
@@ -16,7 +25,15 @@
<<"id1">> => {pem, <<"I'm not a certificate">>},
<<"id2">> => {pem, <<"I'm not a certificate">>}
}
- }
+ },
+ {jwks_url, "https://my-jwt-issuer/jwks.json"},
+ {cacertfile, "test/config_schema_SUITE_data/certs/cacert.pem"},
+ {peer_verification, verify_none},
+ {depth, 5},
+ {fail_if_no_peer_cert, false},
+ {hostname_verification, wildcard},
+ {crl_check, true},
+ {algorithms, [<<"HS256">>, <<"RS256">>]}
]
}
]}
diff --git a/deps/rabbitmq_auth_backend_oauth2/test/jwks_SUITE.erl b/deps/rabbitmq_auth_backend_oauth2/test/jwks_SUITE.erl
index 4823a68e27..2ae29a2761 100644
--- a/deps/rabbitmq_auth_backend_oauth2/test/jwks_SUITE.erl
+++ b/deps/rabbitmq_auth_backend_oauth2/test/jwks_SUITE.erl
@@ -21,7 +21,9 @@
all() ->
[
{group, happy_path},
- {group, unhappy_path}
+ {group, unhappy_path},
+ {group, unvalidated_jwks_server},
+ {group, no_peer_verification}
].
groups() ->
@@ -34,6 +36,7 @@ groups() ->
test_successful_connection_with_complex_claim_as_a_list,
test_successful_connection_with_complex_claim_as_a_binary,
test_successful_connection_with_keycloak_token,
+ test_successful_connection_with_algorithm_restriction,
test_successful_token_refresh
]},
{unhappy_path, [], [
@@ -41,9 +44,12 @@ groups() ->
test_failed_connection_with_a_non_token,
test_failed_connection_with_a_token_with_insufficient_vhost_permission,
test_failed_connection_with_a_token_with_insufficient_resource_permission,
+ test_failed_connection_with_algorithm_restriction,
test_failed_token_refresh_case1,
test_failed_token_refresh_case2
- ]}
+ ]},
+ {unvalidated_jwks_server, [], [test_failed_connection_with_unvalidated_jwks_server]},
+ {no_peer_verification, [], [{group, happy_path}, {group, unhappy_path}]}
].
%%
@@ -69,23 +75,35 @@ end_per_suite(Config) ->
fun stop_jwks_server/1
] ++ rabbit_ct_broker_helpers:teardown_steps()).
+init_per_group(no_peer_verification, Config) ->
+ add_vhosts(Config),
+ KeyConfig = rabbit_ct_helpers:set_config(?config(key_config, Config), [{jwks_url, ?config(non_strict_jwks_url, Config)}, {peer_verification, verify_none}]),
+ ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env, [rabbitmq_auth_backend_oauth2, key_config, KeyConfig]),
+ rabbit_ct_helpers:set_config(Config, {key_config, KeyConfig});
init_per_group(_Group, Config) ->
- %% The broker is managed by {init,end}_per_testcase().
- lists:foreach(fun(Value) ->
- rabbit_ct_broker_helpers:add_vhost(Config, Value)
- end,
- [<<"vhost1">>, <<"vhost2">>, <<"vhost3">>, <<"vhost4">>]),
+ add_vhosts(Config),
Config.
+end_per_group(no_peer_verification, Config) ->
+ delete_vhosts(Config),
+ KeyConfig = rabbit_ct_helpers:set_config(?config(key_config, Config), [{jwks_url, ?config(strict_jwks_url, Config)}, {peer_verification, verify_peer}]),
+ ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env, [rabbitmq_auth_backend_oauth2, key_config, KeyConfig]),
+ rabbit_ct_helpers:set_config(Config, {key_config, KeyConfig});
+
end_per_group(_Group, Config) ->
- %% The broker is managed by {init,end}_per_testcase().
- lists:foreach(fun(Value) ->
- rabbit_ct_broker_helpers:delete_vhost(Config, Value)
- end,
- [<<"vhost1">>, <<"vhost2">>, <<"vhost3">>, <<"vhost4">>]),
+ delete_vhosts(Config),
Config.
+add_vhosts(Config) ->
+ %% The broker is managed by {init,end}_per_testcase().
+ lists:foreach(fun(Value) -> rabbit_ct_broker_helpers:add_vhost(Config, Value) end,
+ [<<"vhost1">>, <<"vhost2">>, <<"vhost3">>, <<"vhost4">>]).
+
+delete_vhosts(Config) ->
+ %% The broker is managed by {init,end}_per_testcase().
+ lists:foreach(fun(Value) -> rabbit_ct_broker_helpers:delete_vhost(Config, Value) end,
+ [<<"vhost1">>, <<"vhost2">>, <<"vhost3">>, <<"vhost4">>]).
init_per_testcase(Testcase, Config) when Testcase =:= test_successful_connection_with_a_full_permission_token_and_explicitly_configured_vhost orelse
Testcase =:= test_successful_token_refresh ->
@@ -107,6 +125,24 @@ init_per_testcase(Testcase, Config) when Testcase =:= test_successful_connection
rabbit_ct_helpers:testcase_started(Config, Testcase),
Config;
+init_per_testcase(Testcase, Config) when Testcase =:= test_successful_connection_with_algorithm_restriction ->
+ KeyConfig = ?config(key_config, Config),
+ ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env, [rabbitmq_auth_backend_oauth2, key_config, [{algorithms, [<<"HS256">>]} | KeyConfig]]),
+ rabbit_ct_helpers:testcase_started(Config, Testcase),
+ Config;
+
+init_per_testcase(Testcase, Config) when Testcase =:= test_failed_connection_with_algorithm_restriction ->
+ KeyConfig = ?config(key_config, Config),
+ ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env, [rabbitmq_auth_backend_oauth2, key_config, [{algorithms, [<<"RS256">>]} | KeyConfig]]),
+ rabbit_ct_helpers:testcase_started(Config, Testcase),
+ Config;
+
+init_per_testcase(Testcase, Config) when Testcase =:= test_failed_connection_with_unvalidated_jwks_server ->
+ KeyConfig = rabbit_ct_helpers:set_config(?config(key_config, Config), {jwks_url, ?config(non_strict_jwks_url, Config)}),
+ ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env, [rabbitmq_auth_backend_oauth2, key_config, KeyConfig]),
+ rabbit_ct_helpers:testcase_started(Config, Testcase),
+ Config;
+
init_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_started(Config, Testcase),
Config.
@@ -126,6 +162,14 @@ end_per_testcase(Testcase, Config) when Testcase =:= test_successful_connection_
rabbit_ct_helpers:testcase_started(Config, Testcase),
Config;
+end_per_testcase(Testcase, Config) when Testcase =:= test_successful_connection_with_algorithm_restriction orelse
+ Testcase =:= test_failed_connection_with_algorithm_restriction orelse
+ Testcase =:= test_failed_connection_with_unvalidated_jwks_server ->
+ rabbit_ct_broker_helpers:delete_vhost(Config, <<"vhost1">>),
+ ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env, [rabbitmq_auth_backend_oauth2, key_config, ?config(key_config, Config)]),
+ rabbit_ct_helpers:testcase_finished(Config, Testcase),
+ Config;
+
end_per_testcase(Testcase, Config) ->
rabbit_ct_broker_helpers:delete_vhost(Config, <<"vhost1">>),
rabbit_ct_helpers:testcase_finished(Config, Testcase),
@@ -143,13 +187,27 @@ start_jwks_server(Config) ->
%% Assume we don't have more than 100 ports allocated for tests
PortBase = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_ports_base),
JwksServerPort = PortBase + 100,
+
+ %% Both URLs direct to the same JWKS server
+ %% The NonStrictJwksUrl identity cannot be validated while StrictJwksUrl identity can be validated
+ NonStrictJwksUrl = "https://127.0.0.1:" ++ integer_to_list(JwksServerPort) ++ "/jwks",
+ StrictJwksUrl = "https://localhost:" ++ integer_to_list(JwksServerPort) ++ "/jwks",
+
ok = application:set_env(jwks_http, keys, [Jwk]),
+ {ok, _} = application:ensure_all_started(ssl),
{ok, _} = application:ensure_all_started(cowboy),
- ok = jwks_http_app:start(JwksServerPort),
- KeyConfig = [{jwks_url, "http://127.0.0.1:" ++ integer_to_list(JwksServerPort) ++ "/jwks"}],
+ CertsDir = ?config(rmq_certsdir, Config),
+ ok = jwks_http_app:start(JwksServerPort, CertsDir),
+ KeyConfig = [{jwks_url, StrictJwksUrl},
+ {peer_verification, verify_peer},
+ {cacertfile, filename:join([CertsDir, "testca", "cacert.pem"])}],
ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env,
[rabbitmq_auth_backend_oauth2, key_config, KeyConfig]),
- rabbit_ct_helpers:set_config(Config, {fixture_jwk, Jwk}).
+ rabbit_ct_helpers:set_config(Config,
+ [{non_strict_jwks_url, NonStrictJwksUrl},
+ {strict_jwks_url, StrictJwksUrl},
+ {key_config, KeyConfig},
+ {fixture_jwk, Jwk}]).
stop_jwks_server(Config) ->
ok = jwks_http_app:stop(),
@@ -305,7 +363,7 @@ test_successful_token_refresh(Config) ->
Conn = open_unmanaged_connection(Config, 0, <<"vhost1">>, <<"username">>, Token),
{ok, Ch} = amqp_connection:open_channel(Conn),
- {_Algo, Token2} = generate_valid_token(Config, [<<"rabbitmq.configure:vhost1/*">>,
+ {_Algo2, Token2} = generate_valid_token(Config, [<<"rabbitmq.configure:vhost1/*">>,
<<"rabbitmq.write:vhost1/*">>,
<<"rabbitmq.read:vhost1/*">>]),
?UTIL_MOD:wait_for_token_to_expire(timer:seconds(Duration)),
@@ -321,6 +379,13 @@ test_successful_token_refresh(Config) ->
amqp_channel:close(Ch2),
close_connection_and_channel(Conn, Ch).
+test_successful_connection_with_algorithm_restriction(Config) ->
+ {_Algo, Token} = rabbit_ct_helpers:get_config(Config, fixture_jwt),
+ Conn = open_unmanaged_connection(Config, 0, <<"username">>, Token),
+ {ok, Ch} = amqp_connection:open_channel(Conn),
+ #'queue.declare_ok'{queue = _} =
+ amqp_channel:call(Ch, #'queue.declare'{exclusive = true}),
+ close_connection_and_channel(Conn, Ch).
test_failed_connection_with_expired_token(Config) ->
{_Algo, Token} = generate_expired_token(Config, [<<"rabbitmq.configure:vhost1/*">>,
@@ -359,7 +424,7 @@ test_failed_token_refresh_case1(Config) ->
#'queue.declare_ok'{queue = _} =
amqp_channel:call(Ch, #'queue.declare'{exclusive = true}),
- {_Algo, Token2} = generate_expired_token(Config, [<<"rabbitmq.configure:vhost4/*">>,
+ {_Algo2, Token2} = generate_expired_token(Config, [<<"rabbitmq.configure:vhost4/*">>,
<<"rabbitmq.write:vhost4/*">>,
<<"rabbitmq.read:vhost4/*">>]),
%% the error is communicated asynchronously via a connection-level error
@@ -387,3 +452,13 @@ test_failed_token_refresh_case2(Config) ->
amqp_connection:open_channel(Conn)),
close_connection(Conn).
+
+test_failed_connection_with_algorithm_restriction(Config) ->
+ {_Algo, Token} = rabbit_ct_helpers:get_config(Config, fixture_jwt),
+ ?assertMatch({error, {auth_failure, _}},
+ open_unmanaged_connection(Config, 0, <<"username">>, Token)).
+
+test_failed_connection_with_unvalidated_jwks_server(Config) ->
+ {_Algo, Token} = rabbit_ct_helpers:get_config(Config, fixture_jwt),
+ ?assertMatch({error, {auth_failure, _}},
+ open_unmanaged_connection(Config, 0, <<"username">>, Token)).
diff --git a/deps/rabbitmq_auth_backend_oauth2/test/jwks_http_app.erl b/deps/rabbitmq_auth_backend_oauth2/test/jwks_http_app.erl
index 16353e34f4..c745e436f6 100644
--- a/deps/rabbitmq_auth_backend_oauth2/test/jwks_http_app.erl
+++ b/deps/rabbitmq_auth_backend_oauth2/test/jwks_http_app.erl
@@ -1,8 +1,8 @@
-module(jwks_http_app).
--export([start/1, stop/0]).
+-export([start/2, stop/0]).
-start(Port) ->
+start(Port, CertsDir) ->
Dispatch =
cowboy_router:compile(
[
@@ -11,8 +11,10 @@ start(Port) ->
]}
]
),
- {ok, _} = cowboy:start_clear(jwks_http_listener,
- [{port, Port}],
+ {ok, _} = cowboy:start_tls(jwks_http_listener,
+ [{port, Port},
+ {certfile, filename:join([CertsDir, "server", "cert.pem"])},
+ {keyfile, filename:join([CertsDir, "server", "key.pem"])}],
#{env => #{dispatch => Dispatch}}),
ok.
diff --git a/deps/rabbitmq_auth_backend_oauth2/test/rabbit_auth_backend_oauth2_test_util.erl b/deps/rabbitmq_auth_backend_oauth2/test/rabbit_auth_backend_oauth2_test_util.erl
index e586ffca8c..80b6648352 100644
--- a/deps/rabbitmq_auth_backend_oauth2/test/rabbit_auth_backend_oauth2_test_util.erl
+++ b/deps/rabbitmq_auth_backend_oauth2/test/rabbit_auth_backend_oauth2_test_util.erl
@@ -73,7 +73,7 @@ expired_token_with_scopes(Scopes) ->
token_with_scopes_and_expiration(Scopes, os:system_time(seconds) - 10).
fixture_token_with_scopes(Scopes) ->
- token_with_scopes_and_expiration(Scopes, os:system_time(seconds) + 10).
+ token_with_scopes_and_expiration(Scopes, os:system_time(seconds) + 30).
token_with_scopes_and_expiration(Scopes, Expiration) ->
%% expiration is a timestamp with precision in seconds
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
index 3ff764a808..b8f4fdc923 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
@@ -87,7 +87,9 @@ delete_super_stream(VirtualHost, Name, Username) ->
gen_server:call(?MODULE,
{delete_super_stream, VirtualHost, Name, Username}).
--spec lookup_leader(binary(), binary()) -> pid() | cluster_not_found.
+-spec lookup_leader(binary(), binary()) ->
+ {ok, pid()} | {error, not_available} |
+ {error, not_found}.
lookup_leader(VirtualHost, Stream) ->
gen_server:call(?MODULE, {lookup_leader, VirtualHost, Stream}).
@@ -294,20 +296,25 @@ handle_call({lookup_leader, VirtualHost, Stream}, _From, State) ->
LeaderPid = amqqueue:get_pid(Q),
case process_alive(LeaderPid) of
true ->
- LeaderPid;
+ {ok, LeaderPid};
false ->
case leader_from_members(Q) of
{ok, Pid} ->
- Pid;
+ {ok, Pid};
_ ->
- cluster_not_found
+ {error, not_available}
end
end;
_ ->
- cluster_not_found
+ {error, not_found}
end;
- _ ->
- cluster_not_found
+ {error, not_found} ->
+ case rabbit_amqqueue:not_found_or_absent_dirty(Name) of
+ not_found ->
+ {error, not_found};
+ _ ->
+ {error, not_available}
+ end
end,
{reply, Res, State};
handle_call({lookup_local_member, VirtualHost, Stream}, _From,
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
index 046d59463b..9c29090f4a 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
@@ -1494,7 +1494,7 @@ handle_frame_post_auth(Transport,
of
{false, false} ->
case lookup_leader(Stream, Connection0) of
- cluster_not_found ->
+ {error, not_found} ->
response(Transport,
Connection0,
declare_publisher,
@@ -1504,6 +1504,16 @@ handle_frame_post_auth(Transport,
?STREAM_DOES_NOT_EXIST,
1),
{Connection0, State};
+ {error, not_available} ->
+ response(Transport,
+ Connection0,
+ declare_publisher,
+ CorrelationId,
+ ?RESPONSE_CODE_STREAM_NOT_AVAILABLE),
+ rabbit_global_counters:increase_protocol_counter(stream,
+ ?STREAM_NOT_AVAILABLE,
+ 1),
+ {Connection0, State};
{ClusterLeader,
#stream_connection{publishers = Publishers0,
publisher_to_ids = RefIds0} =
@@ -1960,9 +1970,9 @@ handle_frame_post_auth(_Transport,
of
ok ->
case lookup_leader(Stream, Connection) of
- cluster_not_found ->
- rabbit_log:warning("Could not find leader to store offset on ~p",
- [Stream]),
+ {error, Error} ->
+ rabbit_log:warning("Could not find leader to store offset on ~p: ~p",
+ [Stream, Error]),
%% FIXME store offset is fire-and-forget, so no response even if error, change this?
{Connection, State};
{ClusterLeader, Connection1} ->
@@ -1992,11 +2002,16 @@ handle_frame_post_auth(Transport,
of
ok ->
case lookup_leader(Stream, Connection0) of
- cluster_not_found ->
+ {error, not_found} ->
rabbit_global_counters:increase_protocol_counter(stream,
?STREAM_DOES_NOT_EXIST,
1),
{?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, 0, Connection0};
+ {error, not_available} ->
+ rabbit_global_counters:increase_protocol_counter(stream,
+ ?STREAM_NOT_AVAILABLE,
+ 1),
+ {?RESPONSE_CODE_STREAM_NOT_AVAILABLE, 0, Connection0};
{LeaderPid, C} ->
{RC, O} =
case osiris:read_tracking(LeaderPid, Reference) of
@@ -2532,9 +2547,9 @@ lookup_leader(Stream,
case maps:get(Stream, StreamLeaders, undefined) of
undefined ->
case lookup_leader_from_manager(VirtualHost, Stream) of
- cluster_not_found ->
- cluster_not_found;
- LeaderPid ->
+ {error, Error} ->
+ {error, Error};
+ {ok, LeaderPid} ->
Connection1 =
maybe_monitor_stream(LeaderPid, Stream, Connection),
{LeaderPid,
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/pom.xml b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/pom.xml
index 5d008958dd..7dcf3ea494 100644
--- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/pom.xml
+++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/pom.xml
@@ -27,9 +27,9 @@
<properties>
<stream-client.version>[0.5.0-SNAPSHOT,1.0-SNAPSHOT)</stream-client.version>
- <junit.jupiter.version>5.8.1</junit.jupiter.version>
+ <junit.jupiter.version>5.8.2</junit.jupiter.version>
<assertj.version>3.21.0</assertj.version>
- <logback.version>1.2.6</logback.version>
+ <logback.version>1.2.7</logback.version>
<maven.compiler.plugin.version>3.8.1</maven.compiler.plugin.version>
<maven-surefire-plugin.version>2.22.2</maven-surefire-plugin.version>
<spotless.version>2.2.0</spotless.version>
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/ClusterSizeTest.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/ClusterSizeTest.java
index 8903ee0c62..6ecf2b4ae4 100644
--- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/ClusterSizeTest.java
+++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/ClusterSizeTest.java
@@ -16,6 +16,9 @@
package com.rabbitmq.stream;
+import static com.rabbitmq.stream.TestUtils.ResponseConditions.ko;
+import static com.rabbitmq.stream.TestUtils.ResponseConditions.ok;
+import static com.rabbitmq.stream.TestUtils.ResponseConditions.responseCode;
import static org.assertj.core.api.Assertions.assertThat;
import com.rabbitmq.stream.impl.Client;
@@ -40,8 +43,7 @@ public class ClusterSizeTest {
String s = UUID.randomUUID().toString();
Response response =
client.create(s, Collections.singletonMap("initial-cluster-size", clusterSize));
- assertThat(response.isOk()).isFalse();
- assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_PRECONDITION_FAILED);
+ assertThat(response).is(ko()).has(responseCode(Constants.RESPONSE_CODE_PRECONDITION_FAILED));
}
@ParameterizedTest
@@ -53,7 +55,7 @@ public class ClusterSizeTest {
try {
Response response =
client.create(s, Collections.singletonMap("initial-cluster-size", requestedClusterSize));
- assertThat(response.isOk()).isTrue();
+ assertThat(response).is(ok());
StreamMetadata metadata = client.metadata(s).get(s);
assertThat(metadata).isNotNull();
assertThat(metadata.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK);
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java
index b806471207..bf47ad01ee 100644
--- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java
+++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java
@@ -16,11 +16,16 @@
package com.rabbitmq.stream;
+import static com.rabbitmq.stream.TestUtils.ResponseConditions.ok;
+import static com.rabbitmq.stream.TestUtils.waitAtMost;
+import static com.rabbitmq.stream.TestUtils.waitUntil;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import com.rabbitmq.stream.codec.WrapperMessageBuilder;
import com.rabbitmq.stream.impl.Client;
+import com.rabbitmq.stream.impl.Client.ClientParameters;
+import com.rabbitmq.stream.impl.Client.Response;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.*;
@@ -66,7 +71,7 @@ public class FailureTest {
Client.StreamMetadata streamMetadata = metadata.get(stream);
assertThat(streamMetadata).isNotNull();
- TestUtils.waitUntil(() -> client.metadata(stream).get(stream).getReplicas().size() == 2);
+ waitUntil(() -> client.metadata(stream).get(stream).getReplicas().size() == 2);
streamMetadata = client.metadata(stream).get(stream);
assertThat(streamMetadata.getLeader().getPort()).isEqualTo(TestUtils.streamPortNode1());
@@ -107,7 +112,7 @@ public class FailureTest {
assertThat(metadataLatch.await(10, TimeUnit.SECONDS)).isTrue();
// wait until there's a new leader
- TestUtils.waitAtMost(
+ waitAtMost(
Duration.ofSeconds(10),
() -> {
Client.StreamMetadata m = publisher.metadata(stream).get(stream);
@@ -133,7 +138,7 @@ public class FailureTest {
}
// wait until all the replicas are there
- TestUtils.waitAtMost(
+ waitAtMost(
Duration.ofSeconds(10),
() -> {
LOGGER.info("Getting metadata for {}", stream);
@@ -164,7 +169,7 @@ public class FailureTest {
consumeLatch.countDown();
}));
- TestUtils.waitAtMost(
+ waitAtMost(
Duration.ofSeconds(5),
() -> {
Client.Response response =
@@ -219,7 +224,7 @@ public class FailureTest {
cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode2()));
// wait until there's a new leader
try {
- TestUtils.waitAtMost(
+ waitAtMost(
Duration.ofSeconds(5),
() -> {
Client.StreamMetadata m = locator.metadata(stream).get(stream);
@@ -314,7 +319,7 @@ public class FailureTest {
Client metadataClient = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode2()));
// wait until all the replicas are there
- TestUtils.waitAtMost(
+ waitAtMost(
Duration.ofSeconds(5),
() -> {
Client.StreamMetadata m = metadataClient.metadata(stream).get(stream);
@@ -350,7 +355,7 @@ public class FailureTest {
Client.Response response =
consumer.subscribe((byte) 1, stream, OffsetSpecification.first(), 10);
- assertThat(response.isOk()).isTrue();
+ assertThat(response).is(ok());
assertThat(consumedLatch.await(5, TimeUnit.SECONDS)).isTrue();
assertThat(generations).hasSize(2).contains(0L, 1L);
@@ -372,8 +377,7 @@ public class FailureTest {
Client.StreamMetadata streamMetadata = metadata.get(stream);
assertThat(streamMetadata).isNotNull();
- TestUtils.waitUntil(
- () -> metadataClient.metadata(stream).get(stream).getReplicas().size() == 2);
+ waitUntil(() -> metadataClient.metadata(stream).get(stream).getReplicas().size() == 2);
metadata = metadataClient.metadata(stream);
streamMetadata = metadata.get(stream);
@@ -497,7 +501,7 @@ public class FailureTest {
Client.Response response =
consumer.subscribe((byte) 1, stream, OffsetSpecification.first(), 10);
- assertThat(response.isOk()).isTrue();
+ assertThat(response).is(ok());
// let's publish for a bit of time
Thread.sleep(2000);
@@ -521,7 +525,7 @@ public class FailureTest {
confirmedCount = confirmed.size();
// wait until all the replicas are there
- TestUtils.waitAtMost(
+ waitAtMost(
Duration.ofSeconds(10),
() -> {
Client.StreamMetadata m = metadataClient.metadata(stream).get(stream);
@@ -535,9 +539,9 @@ public class FailureTest {
keepPublishing.set(false);
- assertThat(publishingLatch.await(5, TimeUnit.SECONDS)).isTrue();
+ assertThat(publishingLatch.await(10, TimeUnit.SECONDS)).isTrue();
- TestUtils.waitAtMost(Duration.ofSeconds(5), () -> consumed.size() >= confirmed.size());
+ waitAtMost(Duration.ofSeconds(10), () -> consumed.size() >= confirmed.size());
assertThat(generations).hasSize(2).contains(0L, 1L);
assertThat(consumed).hasSizeGreaterThanOrEqualTo(confirmed.size());
@@ -551,4 +555,33 @@ public class FailureTest {
confirmedIds.forEach(confirmedId -> assertThat(consumedIds).contains(confirmedId));
}
+
+ @Test
+ void declarePublisherShouldNotReturnStreamDoesNotExistOnRestart() throws Exception {
+ try {
+ Host.rabbitmqctl("stop_app");
+ } finally {
+ Host.rabbitmqctl("start_app");
+ }
+ AtomicReference<Client> client = new AtomicReference<>();
+ waitUntil(
+ () -> {
+ try {
+ client.set(cf.get(new ClientParameters().port(TestUtils.streamPortNode1())));
+ } catch (Exception e) {
+
+ }
+ return client.get() != null;
+ });
+ Set<Short> responseCodes = ConcurrentHashMap.newKeySet();
+
+ waitUntil(
+ () -> {
+ Response response = client.get().declarePublisher((byte) 0, null, stream);
+ responseCodes.add(response.getResponseCode());
+ return response.isOk();
+ });
+
+ assertThat(responseCodes).doesNotContain(Constants.RESPONSE_CODE_STREAM_DOES_NOT_EXIST);
+ }
}
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/LeaderLocatorTest.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/LeaderLocatorTest.java
index 104f5772c1..7d387c09c2 100644
--- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/LeaderLocatorTest.java
+++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/LeaderLocatorTest.java
@@ -16,6 +16,9 @@
package com.rabbitmq.stream;
+import static com.rabbitmq.stream.TestUtils.ResponseConditions.ko;
+import static com.rabbitmq.stream.TestUtils.ResponseConditions.ok;
+import static com.rabbitmq.stream.TestUtils.ResponseConditions.responseCode;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
@@ -47,8 +50,7 @@ public class LeaderLocatorTest {
Client client = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1()));
String s = UUID.randomUUID().toString();
Response response = client.create(s, Collections.singletonMap("queue-leader-locator", "foo"));
- assertThat(response.isOk()).isFalse();
- assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_PRECONDITION_FAILED);
+ assertThat(response).is(ko()).has(responseCode(Constants.RESPONSE_CODE_PRECONDITION_FAILED));
}
@Test
@@ -60,7 +62,7 @@ public class LeaderLocatorTest {
try {
Response response =
client.create(s, Collections.singletonMap("queue-leader-locator", "client-local"));
- assertThat(response.isOk()).isTrue();
+ assertThat(response).is(ok());
StreamMetadata metadata = client.metadata(s).get(s);
assertThat(metadata).isNotNull();
assertThat(metadata.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK);
@@ -136,7 +138,7 @@ public class LeaderLocatorTest {
Response response =
client.create(
s, Collections.singletonMap("queue-leader-locator", "least-leaders"));
- assertThat(response.isOk()).isTrue();
+ assertThat(response).is(ok());
createdStreams.add(s);
});
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java
index e0a5afee67..03015a0c44 100644
--- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java
+++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java
@@ -16,11 +16,13 @@
package com.rabbitmq.stream;
+import static com.rabbitmq.stream.TestUtils.ResponseConditions.ok;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.fail;
import com.rabbitmq.stream.impl.Client;
+import com.rabbitmq.stream.impl.Client.Response;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import java.lang.reflect.Field;
@@ -30,6 +32,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BooleanSupplier;
+import org.assertj.core.api.Condition;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.*;
@@ -106,7 +109,7 @@ public class TestUtils {
.eventLoopGroup(eventLoopGroup(context))
.port(streamPortNode1()));
Client.Response response = client.create(stream);
- assertThat(response.isOk()).isTrue();
+ assertThat(response).is(ok());
client.close();
store(context).put("testMethodStream", stream);
} catch (NoSuchFieldException e) {
@@ -136,7 +139,7 @@ public class TestUtils {
.eventLoopGroup(eventLoopGroup(context))
.port(streamPortNode1()));
Client.Response response = client.delete(stream);
- assertThat(response.isOk()).isTrue();
+ assertThat(response).is(ok());
client.close();
store(context).remove("testMethodStream");
} catch (NoSuchFieldException e) {
@@ -197,4 +200,22 @@ public class TestUtils {
}
}
}
+
+ static class ResponseConditions {
+
+ static Condition<Response> ok() {
+ return new Condition<>(Response::isOk, "Response should be OK");
+ }
+
+ static Condition<Response> ko() {
+ return new Condition<>(response -> !response.isOk(), "Response should be OK");
+ }
+
+ static Condition<Response> responseCode(short expectedResponse) {
+ return new Condition<>(
+ response -> response.getResponseCode() == expectedResponse,
+ "response code %d",
+ expectedResponse);
+ }
+ }
}
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl
index f9402c2b4b..397b9f6d53 100644
--- a/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl
+++ b/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl
@@ -16,7 +16,7 @@ all() ->
[{group, non_parallel_tests}].
groups() ->
- [{non_parallel_tests, [], [manage_super_stream]}].
+ [{non_parallel_tests, [], [manage_super_stream, lookup_leader]}].
%% -------------------------------------------------------------------
%% Testsuite setup/teardown.
@@ -71,6 +71,17 @@ end_per_testcase(Testcase, Config) ->
%% Testcases.
%% -------------------------------------------------------------------
+lookup_leader(Config) ->
+ Stream = <<"stream_manager_lookup_leader_stream">>,
+ ?assertMatch({ok, _}, create_stream(Config, Stream)),
+
+ {ok, Pid} = lookup_leader(Config, Stream),
+ ?assert(is_pid(Pid)),
+
+ ?assertEqual({error, not_found}, lookup_leader(Config, <<"foo">>)),
+
+ ?assertEqual({ok, deleted}, delete_stream(Config, Stream)).
+
manage_super_stream(Config) ->
% create super stream
?assertEqual(ok,
@@ -140,6 +151,20 @@ create_stream(Config, Name) ->
create,
[<<"/">>, Name, [], <<"guest">>]).
+delete_stream(Config, Name) ->
+ rabbit_ct_broker_helpers:rpc(Config,
+ 0,
+ rabbit_stream_manager,
+ delete,
+ [<<"/">>, Name, <<"guest">>]).
+
+lookup_leader(Config, Name) ->
+ rabbit_ct_broker_helpers:rpc(Config,
+ 0,
+ rabbit_stream_manager,
+ lookup_leader,
+ [<<"/">>, Name]).
+
partitions(Config, Name) ->
rabbit_ct_broker_helpers:rpc(Config,
0,
diff --git a/deps/rabbitmq_stream_management/test/http_SUITE_data/pom.xml b/deps/rabbitmq_stream_management/test/http_SUITE_data/pom.xml
index 8f7f3cb717..5ec43a9a0e 100644
--- a/deps/rabbitmq_stream_management/test/http_SUITE_data/pom.xml
+++ b/deps/rabbitmq_stream_management/test/http_SUITE_data/pom.xml
@@ -27,11 +27,11 @@
<properties>
<stream-client.version>[0.5.0-SNAPSHOT,1.0-SNAPSHOT)</stream-client.version>
- <junit.jupiter.version>5.8.1</junit.jupiter.version>
+ <junit.jupiter.version>5.8.2</junit.jupiter.version>
<assertj.version>3.21.0</assertj.version>
- <okhttp.version>4.9.2</okhttp.version>
+ <okhttp.version>4.9.3</okhttp.version>
<gson.version>2.8.9</gson.version>
- <logback.version>1.2.6</logback.version>
+ <logback.version>1.2.7</logback.version>
<maven.compiler.plugin.version>3.8.1</maven.compiler.plugin.version>
<maven-surefire-plugin.version>2.22.2</maven-surefire-plugin.version>
<spotless.version>2.2.0</spotless.version>
diff --git a/tools/erlang_ls.bzl b/tools/erlang_ls.bzl
index adde90405d..073a83787a 100644
--- a/tools/erlang_ls.bzl
+++ b/tools/erlang_ls.bzl
@@ -15,7 +15,9 @@ deps_dirs:
- bazel-bin/external/*
include_dirs:
- deps
+ - deps/*
- deps/*/include
+ - deps/*/src
- bazel-bin/external
- bazel-bin/external/*/include
plt_path: bazel-bin/deps/rabbit/.base_plt.plt