diff options
| author | Michael Klishin <michael@clojurewerkz.org> | 2021-12-26 01:46:29 +0300 |
|---|---|---|
| committer | Michael Klishin <michael@clojurewerkz.org> | 2021-12-26 01:46:29 +0300 |
| commit | 8a0ad561829aa2baf581e49d7be802be86b1aa5c (patch) | |
| tree | ff4331925254ef51d3604e31445d42942ed23916 | |
| parent | 40db6563b8ae7c07d86f0e7710fccde1433029ef (diff) | |
| parent | 6835a5b9a7717c49168564069e086fb963dd612e (diff) | |
| download | rabbitmq-server-git-8a0ad561829aa2baf581e49d7be802be86b1aa5c.tar.gz | |
Merge branch 'master' into delegate_opt
42 files changed, 997 insertions, 224 deletions
diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 9efb138102..ce66e26c73 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -13,6 +13,7 @@ on: - '*.bzl' - '*.bazel' - .github/workflows/test.yaml + pull_request: jobs: test: name: Test diff --git a/WORKSPACE.bazel b/WORKSPACE.bazel index b6c0d665f4..00e4d4bebd 100644 --- a/WORKSPACE.bazel +++ b/WORKSPACE.bazel @@ -34,13 +34,13 @@ buildbuddy( git_repository( name = "rbe_23", - commit = "d2b454dc5138a2a92de45a0a672241a4fbb5a1e5", + commit = "b21c066e426de48e526cc0f8c5158b7024d04e85", remote = "https://github.com/rabbitmq/rbe-erlang-platform.git", ) git_repository( name = "rbe_24", - commit = "a087892ef4202dc3245b64d36d5921491848315f", + commit = "c8cbf65e2facbe464ebbcee7b6cf6f7a2d422ded", remote = "https://github.com/rabbitmq/rbe-erlang-platform.git", ) diff --git a/deps/rabbit/BUILD.bazel b/deps/rabbit/BUILD.bazel index 01be7ae4e2..f607f38246 100644 --- a/deps/rabbit/BUILD.bazel +++ b/deps/rabbit/BUILD.bazel @@ -703,9 +703,6 @@ suites = [ additional_hdrs = [ "src/rabbit_fifo.hrl", ], - erlc_opts = [ - "-I deps/rabbit", # allow rabbit_fifo.hrl to be included at src/rabbit_fifo.hrl - ], runtime_deps = [ "@meck//:bazel_erlang_lib", "@ra//:bazel_erlang_lib", diff --git a/deps/rabbit/src/rabbit.erl b/deps/rabbit/src/rabbit.erl index 32e07d095b..48ef9ec439 100644 --- a/deps/rabbit/src/rabbit.erl +++ b/deps/rabbit/src/rabbit.erl @@ -500,7 +500,7 @@ stop_and_halt() -> %% init:stop() will be called regardless of any errors. try AppsLeft = [ A || {A, _, _} <- application:which_applications() ], - ?LOG_ERROR( + ?LOG_INFO( lists:flatten( ["Halting Erlang VM with the following applications:~n", [" ~p~n" || _ <- AppsLeft]]), diff --git a/deps/rabbit/src/rabbit_access_control.erl b/deps/rabbit/src/rabbit_access_control.erl index 5411969759..d9670d87ec 100644 --- a/deps/rabbit/src/rabbit_access_control.erl +++ b/deps/rabbit/src/rabbit_access_control.erl @@ -38,35 +38,42 @@ check_user_pass_login(Username, Password) -> check_user_login(Username, AuthProps) -> %% extra auth properties like MQTT client id are in AuthProps {ok, Modules} = application:get_env(rabbit, auth_backends), - R = lists:foldl( - fun (rabbit_auth_backend_cache=ModN, {refused, _, _, _}) -> - %% It is possible to specify authn/authz within the cache module settings, - %% so we have to do both auth steps here - %% See this rabbitmq-users discussion: - %% https://groups.google.com/d/topic/rabbitmq-users/ObqM7MQdA3I/discussion - try_authenticate_and_try_authorize(ModN, ModN, Username, AuthProps); - ({ModN, ModZs}, {refused, _, _, _}) -> - %% Different modules for authN vs authZ. So authenticate - %% with authN module, then if that succeeds do - %% passwordless (i.e pre-authenticated) login with authZ. - try_authenticate_and_try_authorize(ModN, ModZs, Username, AuthProps); - (Mod, {refused, _, _, _}) -> - %% Same module for authN and authZ. Just take the result - %% it gives us - case try_authenticate(Mod, Username, AuthProps) of - {ok, ModNUser = #auth_user{username = Username2, impl = Impl}} -> - rabbit_log:debug("User '~s' authenticated successfully by backend ~s", [Username2, Mod]), - user(ModNUser, {ok, [{Mod, Impl}], []}); - Else -> - rabbit_log:debug("User '~s' failed authenticatation by backend ~s", [Username, Mod]), - Else - end; - (_, {ok, User}) -> - %% We've successfully authenticated. Skip to the end... - {ok, User} - end, - {refused, Username, "No modules checked '~s'", [Username]}, Modules), - R. + try + lists:foldl( + fun (rabbit_auth_backend_cache=ModN, {refused, _, _, _}) -> + %% It is possible to specify authn/authz within the cache module settings, + %% so we have to do both auth steps here + %% See this rabbitmq-users discussion: + %% https://groups.google.com/d/topic/rabbitmq-users/ObqM7MQdA3I/discussion + try_authenticate_and_try_authorize(ModN, ModN, Username, AuthProps); + ({ModN, ModZs}, {refused, _, _, _}) -> + %% Different modules for authN vs authZ. So authenticate + %% with authN module, then if that succeeds do + %% passwordless (i.e pre-authenticated) login with authZ. + try_authenticate_and_try_authorize(ModN, ModZs, Username, AuthProps); + (Mod, {refused, _, _, _}) -> + %% Same module for authN and authZ. Just take the result + %% it gives us + case try_authenticate(Mod, Username, AuthProps) of + {ok, ModNUser = #auth_user{username = Username2, impl = Impl}} -> + rabbit_log:debug("User '~s' authenticated successfully by backend ~s", [Username2, Mod]), + user(ModNUser, {ok, [{Mod, Impl}], []}); + Else -> + rabbit_log:debug("User '~s' failed authenticatation by backend ~s", [Username, Mod]), + Else + end; + (_, {ok, User}) -> + %% We've successfully authenticated. Skip to the end... + {ok, User} + end, + {refused, Username, "No modules checked '~s'", [Username]}, Modules) + catch + Type:Error:Stacktrace -> + rabbit_log:debug("User '~s' authentication failed with ~s:~p:~n~p", [Username, Type, Error, Stacktrace]), + {refused, Username, "User '~s' authentication failed with internal error. " + "Enable debug logs to see the real error.", [Username]} + + end. try_authenticate_and_try_authorize(ModN, ModZs0, Username, AuthProps) -> ModZs = case ModZs0 of diff --git a/deps/rabbit/src/rabbit_auth_backend_internal.erl b/deps/rabbit/src/rabbit_auth_backend_internal.erl index a46beaaf83..9295b65a4c 100644 --- a/deps/rabbit/src/rabbit_auth_backend_internal.erl +++ b/deps/rabbit/src/rabbit_auth_backend_internal.erl @@ -14,12 +14,15 @@ -export([user_login_authentication/2, user_login_authorization/2, check_vhost_access/3, check_resource_access/4, check_topic_access/4]). --export([add_user/3, delete_user/2, lookup_user/1, exists/1, +-export([add_user/3, add_user/4, add_user/5, delete_user/2, lookup_user/1, exists/1, change_password/3, clear_password/2, hash_password/2, change_password_hash/2, change_password_hash/3, set_tags/3, set_permissions/6, clear_permissions/3, set_topic_permissions/6, clear_topic_permissions/3, clear_topic_permissions/4, - add_user_sans_validation/3, put_user/2, put_user/3]). + add_user_sans_validation/3, put_user/2, put_user/3, + update_user/5, + update_user_with_hash/5, + add_user_sans_validation/6]). -export([set_user_limits/3, clear_user_limits/3, is_over_connection_limit/1, is_over_channel_limit/1, get_user_limits/0, get_user_limits/1]). @@ -208,14 +211,56 @@ add_user(Username, Password, ActingUser) -> validate_and_alternate_credentials(Username, Password, ActingUser, fun add_user_sans_validation/3). +-spec add_user(rabbit_types:username(), rabbit_types:password(), + rabbit_types:username(), [atom()]) -> 'ok' | {'error', string()}. + +add_user(Username, Password, ActingUser, Tags) -> + add_user(Username, Password, ActingUser, undefined, Tags). + +add_user(Username, Password, ActingUser, Limits, Tags) -> + validate_and_alternate_credentials(Username, Password, ActingUser, + add_user_sans_validation(Limits, Tags)). + add_user_sans_validation(Username, Password, ActingUser) -> + add_user_sans_validation(Username, Password, ActingUser, undefined, []). + +add_user_sans_validation(Limits, Tags) -> + fun(Username, Password, ActingUser) -> + add_user_sans_validation(Username, Password, ActingUser, Limits, Tags) + end. + +add_user_sans_validation(Username, Password, ActingUser, Limits, Tags) -> rabbit_log:debug("Asked to create a new user '~s', password length in bytes: ~p", [Username, bit_size(Password)]), %% hash_password will pick the hashing function configured for us %% but we also need to store a hint as part of the record, so we %% retrieve it here one more time HashingMod = rabbit_password:hashing_mod(), PasswordHash = hash_password(HashingMod, Password), - User = internal_user:create_user(Username, PasswordHash, HashingMod), + User0 = internal_user:create_user(Username, PasswordHash, HashingMod), + ConvertedTags = [rabbit_data_coercion:to_atom(I) || I <- Tags], + User1 = internal_user:set_tags(User0, ConvertedTags), + User = case Limits of + undefined -> User1; + Term -> internal_user:update_limits(add, User1, Term) + end, + add_user_sans_validation_in(Username, User, ConvertedTags, Limits, ActingUser). + +add_user_sans_validation(Username, PasswordHash, HashingAlgorithm, Tags, Limits, ActingUser) -> + rabbit_log:debug("Asked to create a new user '~s' with password hash", [Username]), + ConvertedTags = [rabbit_data_coercion:to_atom(I) || I <- Tags], + HashingMod = rabbit_password:hashing_mod(), + User0 = internal_user:create_user(Username, PasswordHash, HashingMod), + User1 = internal_user:set_tags( + internal_user:set_password_hash(User0, + PasswordHash, HashingAlgorithm), + ConvertedTags), + User = case Limits of + undefined -> User1; + Term -> internal_user:update_limits(add, User1, Term) + end, + add_user_sans_validation_in(Username, User, ConvertedTags, Limits, ActingUser). + +add_user_sans_validation_in(Username, User, ConvertedTags, Limits, ActingUser) -> try R = rabbit_misc:execute_mnesia_transaction( fun () -> @@ -229,6 +274,14 @@ add_user_sans_validation(Username, Password, ActingUser) -> rabbit_log:info("Created user '~s'", [Username]), rabbit_event:notify(user_created, [{name, Username}, {user_who_performed_action, ActingUser}]), + case ConvertedTags of + [] -> ok; + _ -> notify_user_tags_set(Username, ConvertedTags, ActingUser) + end, + case Limits of + undefined -> ok; + _ -> notify_limit_set(Username, ActingUser, Limits) + end, R catch throw:{error, {user_already_exists, _}} = Error -> @@ -322,6 +375,42 @@ change_password_sans_validation(Username, Password, ActingUser) -> erlang:raise(Class, Error, Stacktrace) end. +update_user(Username, Password, Tags, Limits, ActingUser) -> + validate_and_alternate_credentials(Username, Password, ActingUser, + update_user_sans_validation(Tags, Limits)). + +update_user_sans_validation(Tags, Limits) -> + fun(Username, Password, ActingUser) -> + try + rabbit_log:debug("Asked to change password of user '~s', new password length in bytes: ~p", [Username, bit_size(Password)]), + HashingAlgorithm = rabbit_password:hashing_mod(), + + rabbit_log:debug("Asked to set user tags for user '~s' to ~p", [Username, Tags]), + + ConvertedTags = [rabbit_data_coercion:to_atom(I) || I <- Tags], + R = update_user_with_hash(Username, + hash_password(rabbit_password:hashing_mod(), + Password), + HashingAlgorithm, + ConvertedTags, + Limits), + rabbit_log:info("Successfully changed password for user '~s'", [Username]), + rabbit_event:notify(user_password_changed, + [{name, Username}, + {user_who_performed_action, ActingUser}]), + + notify_user_tags_set(Username, ConvertedTags, ActingUser), + R + catch + throw:{error, {no_such_user, _}} = Error -> + rabbit_log:warning("Failed to change password for user '~s': the user does not exist", [Username]), + throw(Error); + Class:Error:Stacktrace -> + rabbit_log:warning("Failed to change password for user '~s': ~p", [Username, Error]), + erlang:raise(Class, Error, Stacktrace) + end + end. + -spec clear_password(rabbit_types:username(), rabbit_types:username()) -> 'ok'. clear_password(Username, ActingUser) -> @@ -346,10 +435,22 @@ change_password_hash(Username, PasswordHash) -> change_password_hash(Username, PasswordHash, HashingAlgorithm) -> - update_user(Username, fun(User) -> - internal_user:set_password_hash(User, - PasswordHash, HashingAlgorithm) - end). + update_user_with_hash(Username, PasswordHash, HashingAlgorithm, [], undefined). + +update_user_with_hash(Username, PasswordHash, HashingAlgorithm, ConvertedTags, Limits) -> + update_user(Username, + fun(User0) -> + User1 = internal_user:set_password_hash(User0, + PasswordHash, HashingAlgorithm), + User2 = case Limits of + undefined -> User1; + _ -> internal_user:update_limits(add, User1, Limits) + end, + case ConvertedTags of + [] -> User2; + _ -> internal_user:set_tags(User2, ConvertedTags) + end + end). -spec set_tags(rabbit_types:username(), [atom()], rabbit_types:username()) -> 'ok'. @@ -360,9 +461,7 @@ set_tags(Username, Tags, ActingUser) -> R = update_user(Username, fun(User) -> internal_user:set_tags(User, ConvertedTags) end), - rabbit_log:info("Successfully set user tags for user '~s' to ~p", [Username, ConvertedTags]), - rabbit_event:notify(user_tags_set, [{name, Username}, {tags, ConvertedTags}, - {user_who_performed_action, ActingUser}]), + notify_user_tags_set(Username, ConvertedTags, ActingUser), R catch throw:{error, {no_such_user, _}} = Error -> @@ -373,6 +472,11 @@ set_tags(Username, Tags, ActingUser) -> erlang:raise(Class, Error, Stacktrace) end . +notify_user_tags_set(Username, ConvertedTags, ActingUser) -> + rabbit_log:info("Successfully set user tags for user '~s' to ~p", [Username, ConvertedTags]), + rabbit_event:notify(user_tags_set, [{name, Username}, {tags, ConvertedTags}, + {user_who_performed_action, ActingUser}]). + -spec set_permissions (rabbit_types:username(), rabbit_types:vhost(), regexp(), regexp(), regexp(), rabbit_types:username()) -> @@ -648,13 +752,27 @@ put_user(User, Version, ActingUser) -> rabbit_credential_validation:validate(Username, Password) =:= ok end, + Limits = case rabbit_feature_flags:is_enabled(user_limits) of + false -> + undefined; + true -> + case maps:get(limits, User, undefined) of + undefined -> + undefined; + Term -> + case validate_user_limits(Term) of + ok -> Term; + Error -> throw(Error) + end + end + end, case exists(Username) of true -> case {HasPassword, HasPasswordHash} of {true, false} -> - update_user_password(PassedCredentialValidation, Username, Password, Tags, ActingUser); + update_user_password(PassedCredentialValidation, Username, Password, Tags, Limits, ActingUser); {false, true} -> - update_user_password_hash(Username, PasswordHash, Tags, User, Version, ActingUser); + update_user_password_hash(Username, PasswordHash, Tags, Limits, User, Version); {true, true} -> throw({error, both_password_and_password_hash_are_provided}); %% clear password, update tags if needed @@ -665,63 +783,54 @@ put_user(User, Version, ActingUser) -> false -> case {HasPassword, HasPasswordHash} of {true, false} -> - create_user_with_password(PassedCredentialValidation, Username, Password, Tags, Permissions, ActingUser); + create_user_with_password(PassedCredentialValidation, Username, Password, Tags, Permissions, Limits, ActingUser); {false, true} -> - create_user_with_password_hash(Username, PasswordHash, Tags, User, Version, Permissions, ActingUser); + create_user_with_password_hash(Username, PasswordHash, Tags, User, Version, Permissions, Limits, ActingUser); {true, true} -> throw({error, both_password_and_password_hash_are_provided}); {false, false} -> %% this user won't be able to sign in using %% a username/password pair but can be used for x509 certificate authentication, %% with authn backends such as HTTP or LDAP and so on. - create_user_with_password(PassedCredentialValidation, Username, <<"">>, Tags, Permissions, ActingUser) + create_user_with_password(PassedCredentialValidation, Username, <<"">>, Tags, Permissions, Limits, ActingUser) end end. -update_user_password(_PassedCredentialValidation = true, Username, Password, Tags, ActingUser) -> - rabbit_auth_backend_internal:change_password(Username, Password, ActingUser), - rabbit_auth_backend_internal:set_tags(Username, Tags, ActingUser); -update_user_password(_PassedCredentialValidation = false, _Username, _Password, _Tags, _ActingUser) -> +update_user_password(_PassedCredentialValidation = true, Username, Password, Tags, Limits, ActingUser) -> + %% change_password, set_tags and limits + rabbit_auth_backend_internal:update_user(Username, Password, Tags, Limits, ActingUser); +update_user_password(_PassedCredentialValidation = false, _Username, _Password, _Tags, _Limits, _ActingUser) -> %% we don't log here because %% rabbit_auth_backend_internal will do it throw({error, credential_validation_failed}). -update_user_password_hash(Username, PasswordHash, Tags, User, Version, ActingUser) -> +update_user_password_hash(Username, PasswordHash, Tags, Limits, User, Version) -> %% when a hash this provided, credential validation %% is not applied HashingAlgorithm = hashing_algorithm(User, Version), Hash = rabbit_misc:b64decode_or_throw(PasswordHash), - rabbit_auth_backend_internal:change_password_hash( - Username, Hash, HashingAlgorithm), - rabbit_auth_backend_internal:set_tags(Username, Tags, ActingUser). - -create_user_with_password(_PassedCredentialValidation = true, Username, Password, Tags, undefined, ActingUser) -> - rabbit_auth_backend_internal:add_user(Username, Password, ActingUser), - rabbit_auth_backend_internal:set_tags(Username, Tags, ActingUser); -create_user_with_password(_PassedCredentialValidation = true, Username, Password, Tags, PreconfiguredPermissions, ActingUser) -> - rabbit_auth_backend_internal:add_user(Username, Password, ActingUser), - rabbit_auth_backend_internal:set_tags(Username, Tags, ActingUser), + ConvertedTags = [rabbit_data_coercion:to_atom(I) || I <- Tags], + rabbit_auth_backend_internal:update_user_with_hash( + Username, Hash, HashingAlgorithm, ConvertedTags, Limits). + +create_user_with_password(_PassedCredentialValidation = true, Username, Password, Tags, undefined, Limits, ActingUser) -> + rabbit_auth_backend_internal:add_user(Username, Password, ActingUser, Limits, Tags); +create_user_with_password(_PassedCredentialValidation = true, Username, Password, Tags, PreconfiguredPermissions, Limits, ActingUser) -> + rabbit_auth_backend_internal:add_user(Username, Password, ActingUser, Limits, Tags), preconfigure_permissions(Username, PreconfiguredPermissions, ActingUser); -create_user_with_password(_PassedCredentialValidation = false, _Username, _Password, _Tags, _, _) -> +create_user_with_password(_PassedCredentialValidation = false, _Username, _Password, _Tags, _, _, _) -> %% we don't log here because %% rabbit_auth_backend_internal will do it throw({error, credential_validation_failed}). -create_user_with_password_hash(Username, PasswordHash, Tags, User, Version, PreconfiguredPermissions, ActingUser) -> +create_user_with_password_hash(Username, PasswordHash, Tags, User, Version, PreconfiguredPermissions, Limits, ActingUser) -> %% when a hash this provided, credential validation %% is not applied HashingAlgorithm = hashing_algorithm(User, Version), Hash = rabbit_misc:b64decode_or_throw(PasswordHash), - %% first we create a user with dummy credentials and no - %% validation applied, then we update password hash - TmpPassword = rabbit_guid:binary(rabbit_guid:gen_secure(), "tmp"), - rabbit_auth_backend_internal:add_user_sans_validation(Username, TmpPassword, ActingUser), - - rabbit_auth_backend_internal:change_password_hash( - Username, Hash, HashingAlgorithm), - rabbit_auth_backend_internal:set_tags(Username, Tags, ActingUser), + rabbit_auth_backend_internal:add_user_sans_validation(Username, Hash, HashingAlgorithm, Tags, Limits, ActingUser), preconfigure_permissions(Username, PreconfiguredPermissions, ActingUser). preconfigure_permissions(_Username, undefined, _ActingUser) -> @@ -756,8 +865,7 @@ set_user_limits(Username, Definition, ActingUser) when is_map(Definition) -> end. validate_parameters_and_update_limit(Username, Term, ActingUser) -> - case flatten_errors(rabbit_parameter_validation:proplist( - <<"user-limits">>, user_limit_validation(), Term)) of + case validate_user_limits(Term) of ok -> update_user(Username, fun(User) -> internal_user:update_limits(add, User, Term) @@ -767,6 +875,10 @@ validate_parameters_and_update_limit(Username, Term, ActingUser) -> {error_string, rabbit_misc:format(Reason, Arguments)} end. +validate_user_limits(Term) -> + flatten_errors(rabbit_parameter_validation:proplist( + <<"user-limits">>, user_limit_validation(), Term)). + user_limit_validation() -> [{<<"max-connections">>, fun rabbit_parameter_validation:integer/2, optional}, {<<"max-channels">>, fun rabbit_parameter_validation:integer/2, optional}]. diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 7fee29f2c3..cd8ebe4446 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -1306,7 +1306,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, check_expiration_header(Props), DoConfirm = Tx =/= none orelse ConfirmEnabled, {MsgSeqNo, State1} = - case DoConfirm orelse Mandatory of + case DoConfirm of false -> {undefined, State0}; true -> rabbit_global_counters:messages_received_confirm(amqp091, 1), SeqNo = State0#ch.publish_seqno, diff --git a/deps/rabbit/src/rabbit_definitions.erl b/deps/rabbit/src/rabbit_definitions.erl index 8faf2f808a..152a6be5f2 100644 --- a/deps/rabbit/src/rabbit_definitions.erl +++ b/deps/rabbit/src/rabbit_definitions.erl @@ -464,6 +464,10 @@ add_policy(Param, Username) -> add_policy(VHost, Param, Username) -> Key = maps:get(name, Param, undefined), + case Key of + undefined -> exit(rabbit_misc:format("policy in virtual host '~s' has undefined name", [VHost])); + _ -> ok + end, case rabbit_policy:set( VHost, Key, maps:get(pattern, Param, undefined), case maps:get(definition, Param, undefined) of diff --git a/deps/rabbit/src/rabbit_disk_monitor.erl b/deps/rabbit/src/rabbit_disk_monitor.erl index 76edecf5d8..28b4cb3eba 100644 --- a/deps/rabbit/src/rabbit_disk_monitor.erl +++ b/deps/rabbit/src/rabbit_disk_monitor.erl @@ -33,6 +33,7 @@ get_disk_free/0, set_enabled/1]). -define(SERVER, ?MODULE). +-define(ETS_NAME, ?MODULE). -define(DEFAULT_MIN_DISK_CHECK_INTERVAL, 100). -define(DEFAULT_MAX_DISK_CHECK_INTERVAL, 10000). -define(DEFAULT_DISK_FREE_LIMIT, 50000000). @@ -73,51 +74,42 @@ %%---------------------------------------------------------------------------- -spec get_disk_free_limit() -> integer(). - get_disk_free_limit() -> - gen_server:call(?MODULE, get_disk_free_limit, infinity). + safe_ets_lookup(disk_free_limit, ?DEFAULT_DISK_FREE_LIMIT). -spec set_disk_free_limit(disk_free_limit()) -> 'ok'. - set_disk_free_limit(Limit) -> - gen_server:call(?MODULE, {set_disk_free_limit, Limit}, infinity). + gen_server:call(?MODULE, {set_disk_free_limit, Limit}). -spec get_min_check_interval() -> integer(). - get_min_check_interval() -> - gen_server:call(?MODULE, get_min_check_interval, infinity). + safe_ets_lookup(min_check_interval, ?DEFAULT_MIN_DISK_CHECK_INTERVAL). -spec set_min_check_interval(integer()) -> 'ok'. - set_min_check_interval(Interval) -> - gen_server:call(?MODULE, {set_min_check_interval, Interval}, infinity). + gen_server:call(?MODULE, {set_min_check_interval, Interval}). -spec get_max_check_interval() -> integer(). - get_max_check_interval() -> - gen_server:call(?MODULE, get_max_check_interval, infinity). + safe_ets_lookup(max_check_interval, ?DEFAULT_MAX_DISK_CHECK_INTERVAL). -spec set_max_check_interval(integer()) -> 'ok'. - set_max_check_interval(Interval) -> - gen_server:call(?MODULE, {set_max_check_interval, Interval}, infinity). + gen_server:call(?MODULE, {set_max_check_interval, Interval}). -spec get_disk_free() -> (integer() | 'unknown'). - get_disk_free() -> - gen_server:call(?MODULE, get_disk_free, infinity). + safe_ets_lookup(disk_free, unknown). -spec set_enabled(string()) -> 'ok'. - set_enabled(Enabled) -> - gen_server:call(?MODULE, {set_enabled, Enabled}, infinity). + gen_server:call(?MODULE, {set_enabled, Enabled}). %%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- -spec start_link(disk_free_limit()) -> rabbit_types:ok_pid_or_error(). - start_link(Args) -> gen_server:start_link({local, ?SERVER}, ?MODULE, [Args], []). @@ -125,18 +117,16 @@ init([Limit]) -> Dir = dir(), {ok, Retries} = application:get_env(rabbit, disk_monitor_failure_retries), {ok, Interval} = application:get_env(rabbit, disk_monitor_failure_retry_interval), - State = #state{dir = Dir, - min_interval = ?DEFAULT_MIN_DISK_CHECK_INTERVAL, - max_interval = ?DEFAULT_MAX_DISK_CHECK_INTERVAL, - alarmed = false, - enabled = true, - limit = Limit, - retries = Retries, - interval = Interval}, - {ok, enable(State)}. - -handle_call(get_disk_free_limit, _From, State = #state{limit = Limit}) -> - {reply, Limit, State}; + ?ETS_NAME = ets:new(?ETS_NAME, [protected, set, named_table]), + State0 = #state{dir = Dir, + alarmed = false, + enabled = true, + limit = Limit, + retries = Retries, + interval = Interval}, + State1 = set_min_check_interval(?DEFAULT_MIN_DISK_CHECK_INTERVAL, State0), + State2 = set_max_check_interval(?DEFAULT_MAX_DISK_CHECK_INTERVAL, State1), + {ok, enable(State2)}. handle_call({set_disk_free_limit, _}, _From, #state{enabled = false} = State) -> rabbit_log:info("Cannot set disk free limit: " @@ -146,20 +136,14 @@ handle_call({set_disk_free_limit, _}, _From, #state{enabled = false} = State) -> handle_call({set_disk_free_limit, Limit}, _From, State) -> {reply, ok, set_disk_limits(State, Limit)}; -handle_call(get_min_check_interval, _From, State) -> - {reply, State#state.min_interval, State}; - handle_call(get_max_check_interval, _From, State) -> {reply, State#state.max_interval, State}; handle_call({set_min_check_interval, MinInterval}, _From, State) -> - {reply, ok, State#state{min_interval = MinInterval}}; + {reply, ok, set_min_check_interval(MinInterval, State)}; handle_call({set_max_check_interval, MaxInterval}, _From, State) -> - {reply, ok, State#state{max_interval = MaxInterval}}; - -handle_call(get_disk_free, _From, State = #state { actual = Actual }) -> - {reply, Actual, State}; + {reply, ok, set_max_check_interval(MaxInterval, State)}; handle_call({set_enabled, _Enabled = true}, _From, State) -> start_timer(set_disk_limits(State, State#state.limit)), @@ -194,14 +178,36 @@ code_change(_OldVsn, State, _Extra) -> %% Server Internals %%---------------------------------------------------------------------------- +safe_ets_lookup(Key, Default) -> + try + case ets:lookup(?ETS_NAME, Key) of + [{Key, Value}] -> + Value; + [] -> + Default + end + catch + error:badarg -> + Default + end. + % the partition / drive containing this directory will be monitored dir() -> rabbit_mnesia:dir(). +set_min_check_interval(MinInterval, State) -> + ets:insert(?ETS_NAME, {min_check_interval, MinInterval}), + State#state{min_interval = MinInterval}. + +set_max_check_interval(MaxInterval, State) -> + ets:insert(?ETS_NAME, {max_check_interval, MaxInterval}), + State#state{max_interval = MaxInterval}. + set_disk_limits(State, Limit0) -> Limit = interpret_limit(Limit0), State1 = State#state { limit = Limit }, rabbit_log:info("Disk free limit set to ~pMB", [trunc(Limit / 1000000)]), + ets:insert(?ETS_NAME, {disk_free_limit, Limit}), internal_update(State1). internal_update(State = #state { limit = Limit, @@ -219,7 +225,8 @@ internal_update(State = #state { limit = Limit, _ -> ok end, - State #state {alarmed = NewAlarmed, actual = CurrentFree}. + ets:insert(?ETS_NAME, {disk_free, CurrentFree}), + State#state{alarmed = NewAlarmed, actual = CurrentFree}. get_disk_free(Dir) -> get_disk_free(Dir, os:type()). @@ -227,11 +234,89 @@ get_disk_free(Dir) -> get_disk_free(Dir, {unix, Sun}) when Sun =:= sunos; Sun =:= sunos4; Sun =:= solaris -> Df = os:find_executable("df"), - parse_free_unix(rabbit_misc:os_cmd(Df ++ " -k " ++ Dir)); + parse_free_unix(run_cmd(Df ++ " -k " ++ Dir)); get_disk_free(Dir, {unix, _}) -> Df = os:find_executable("df"), - parse_free_unix(rabbit_misc:os_cmd(Df ++ " -kP " ++ Dir)); + parse_free_unix(run_cmd(Df ++ " -kP " ++ Dir)); get_disk_free(Dir, {win32, _}) -> + % Dir: + % "c:/Users/username/AppData/Roaming/RabbitMQ/db/rabbit2@username-z01-mnesia" + case win32_get_drive_letter(Dir) of + error -> + rabbit_log:warning("Expected the mnesia directory absolute " + "path to start with a drive letter like " + "'C:'. The path is: '~p'", [Dir]), + case win32_get_disk_free_dir(Dir) of + {ok, Free} -> + Free; + _ -> exit(could_not_determine_disk_free) + end; + DriveLetter -> + case win32_get_disk_free_fsutil(DriveLetter) of + {ok, Free0} -> Free0; + error -> + case win32_get_disk_free_pwsh(DriveLetter) of + {ok, Free1} -> Free1; + _ -> exit(could_not_determine_disk_free) + end + end + end. + +parse_free_unix(Str) -> + case string:tokens(Str, "\n") of + [_, S | _] -> case string:tokens(S, " \t") of + [_, _, _, Free | _] -> list_to_integer(Free) * 1024; + _ -> exit({unparseable, Str}) + end; + _ -> exit({unparseable, Str}) + end. + +win32_get_drive_letter([DriveLetter, $:, $/ | _]) when + (DriveLetter >= $a andalso DriveLetter =< $z) orelse + (DriveLetter >= $A andalso DriveLetter =< $Z) -> + DriveLetter; +win32_get_drive_letter(_) -> + error. + +win32_get_disk_free_fsutil(DriveLetter) when + (DriveLetter >= $a andalso DriveLetter =< $z) orelse + (DriveLetter >= $A andalso DriveLetter =< $Z) -> + % DriveLetter $c + FsutilCmd = "fsutil.exe volume diskfree " ++ [DriveLetter] ++ ":", + + % C:\windows\system32>fsutil volume diskfree c: + % Total free bytes : 812,733,878,272 (756.9 GB) + % Total bytes : 1,013,310,287,872 (943.7 GB) + % Total quota free bytes : 812,733,878,272 (756.9 GB) + case run_cmd(FsutilCmd) of + {error, timeout} -> + error; + FsutilResult -> + case string:slice(FsutilResult, 0, 5) of + "Error" -> + error; + "Total" -> + FirstLine = hd(string:tokens(FsutilResult, "\r\n")), + {match, [FreeStr]} = re:run(FirstLine, "(\\d+,?)+", [{capture, first, list}]), + {ok, list_to_integer(lists:flatten(string:tokens(FreeStr, ",")))} + end + end. + +win32_get_disk_free_pwsh(DriveLetter) when + (DriveLetter >= $a andalso DriveLetter =< $z) orelse + (DriveLetter >= $A andalso DriveLetter =< $Z) -> + % DriveLetter $c + PoshCmd = "powershell.exe -NoLogo -NoProfile -NonInteractive -Command (Get-PSDrive " ++ [DriveLetter] ++ ").Free", + case run_cmd(PoshCmd) of + {error, timeout} -> + error; + PoshResultStr -> + % Note: remove \r\n + PoshResult = string:slice(PoshResultStr, 0, length(PoshResultStr) - 2), + {ok, list_to_integer(PoshResult)} + end. + +win32_get_disk_free_dir(Dir) -> %% On Windows, the Win32 API enforces a limit of 260 characters %% (MAX_PATH). If we call `dir` with a path longer than that, it %% fails with "File not found". Starting with Windows 10 version @@ -253,22 +338,11 @@ get_disk_free(Dir, {win32, _}) -> %% See the following page to learn more about this: %% https://ss64.com/nt/syntax-filenames.html RawDir = "\\\\?\\" ++ string:replace(Dir, "/", "\\", all), - parse_free_win32(rabbit_misc:os_cmd("dir /-C /W \"" ++ RawDir ++ "\"")). - -parse_free_unix(Str) -> - case string:tokens(Str, "\n") of - [_, S | _] -> case string:tokens(S, " \t") of - [_, _, _, Free | _] -> list_to_integer(Free) * 1024; - _ -> exit({unparseable, Str}) - end; - _ -> exit({unparseable, Str}) - end. - -parse_free_win32(CommandResult) -> + CommandResult = run_cmd("dir /-C /W \"" ++ RawDir ++ "\""), LastLine = lists:last(string:tokens(CommandResult, "\r\n")), {match, [Free]} = re:run(lists:reverse(LastLine), "(\\d+)", [{capture, all_but_first, list}]), - list_to_integer(lists:reverse(Free)). + {ok, list_to_integer(lists:reverse(Free))}. interpret_limit({mem_relative, Relative}) when is_number(Relative) -> @@ -318,3 +392,20 @@ enable(#state{dir = Dir, interval = Interval, limit = Limit, retries = Retries} erlang:send_after(Interval, self(), try_enable), State#state{enabled = false} end. + +run_cmd(Cmd) -> + Pid = self(), + Ref = make_ref(), + CmdFun = fun() -> + CmdResult = rabbit_misc:os_cmd(Cmd), + Pid ! {Pid, Ref, CmdResult} + end, + CmdPid = spawn(CmdFun), + receive + {Pid, Ref, CmdResult} -> + CmdResult + after 5000 -> + exit(CmdPid, kill), + rabbit_log:error("Command timed out: '~s'", [Cmd]), + {error, timeout} + end. diff --git a/deps/rabbit/src/rabbit_file.erl b/deps/rabbit/src/rabbit_file.erl index 925a5356d7..2060c354c5 100644 --- a/deps/rabbit/src/rabbit_file.erl +++ b/deps/rabbit/src/rabbit_file.erl @@ -40,7 +40,7 @@ is_file(File) -> is_dir(Dir) -> is_dir_internal(read_file_info(Dir)). -is_dir_no_handle(Dir) -> is_dir_internal(prim_file:read_file_info(Dir)). +is_dir_no_handle(Dir) -> is_dir_internal(file:read_file_info(Dir, [raw])). is_dir_internal({ok, #file_info{type=directory}}) -> true; is_dir_internal(_) -> false. @@ -83,14 +83,23 @@ wildcard(Pattern, Dir) -> list_dir(Dir) -> with_handle(fun () -> prim_file:list_dir(Dir) end). read_file_info(File) -> - with_handle(fun () -> prim_file:read_file_info(File) end). + with_handle(fun () -> file:read_file_info(File, [raw]) end). -spec read_term_file (file:filename()) -> {'ok', [any()]} | rabbit_types:error(any()). read_term_file(File) -> try - {ok, Data} = with_handle(fun () -> prim_file:read_file(File) end), + F = fun() -> + {ok, FInfo} = file:read_file_info(File, [raw]), + {ok, Fd} = file:open(File, [read, raw, binary]), + try + file:read(Fd, FInfo#file_info.size) + after + file:close(Fd) + end + end, + {ok, Data} = with_handle(F), {ok, Tokens, _} = erl_scan:string(binary_to_list(Data)), TokenGroups = group_tokens(Tokens), {ok, [begin diff --git a/deps/rabbit/src/rabbit_health_check.erl b/deps/rabbit/src/rabbit_health_check.erl index a454c252fd..b04c4f9853 100644 --- a/deps/rabbit/src/rabbit_health_check.erl +++ b/deps/rabbit/src/rabbit_health_check.erl @@ -64,7 +64,11 @@ node_health_check(rabbit_node_monitor) -> end; node_health_check(alarms) -> - case proplists:get_value(alarms, rabbit:status()) of + % Note: + % Removed call to rabbit:status/0 here due to a memory leak on win32, + % plus it uses an excessive amount of resources + % Alternative to https://github.com/rabbitmq/rabbitmq-server/pull/3893 + case rabbit:alarms() of [] -> ok; Alarms -> diff --git a/deps/rabbit/src/rabbit_osiris_metrics.erl b/deps/rabbit/src/rabbit_osiris_metrics.erl index e93d81dc47..710ce1b65e 100644 --- a/deps/rabbit/src/rabbit_osiris_metrics.erl +++ b/deps/rabbit/src/rabbit_osiris_metrics.erl @@ -78,6 +78,8 @@ handle_info(tick, #state{timeout = Timeout} = State) -> %% down `rabbit_sup` and the whole `rabbit` app. [] end, + + rabbit_core_metrics:queue_stats(QName, Infos), rabbit_event:notify(queue_stats, Infos ++ [{name, QName}, {messages, COffs}, diff --git a/deps/rabbit/src/rabbit_policies.erl b/deps/rabbit/src/rabbit_policies.erl index 062635c5b4..e23d12d81a 100644 --- a/deps/rabbit/src/rabbit_policies.erl +++ b/deps/rabbit/src/rabbit_policies.erl @@ -176,4 +176,6 @@ merge_policy_value(<<"max-length-bytes">>, Val, OpVal) -> min(Val, OpVal); merge_policy_value(<<"max-in-memory-length">>, Val, OpVal) -> min(Val, OpVal); merge_policy_value(<<"max-in-memory-bytes">>, Val, OpVal) -> min(Val, OpVal); merge_policy_value(<<"expires">>, Val, OpVal) -> min(Val, OpVal); -merge_policy_value(<<"delivery-limit">>, Val, OpVal) -> min(Val, OpVal). +merge_policy_value(<<"delivery-limit">>, Val, OpVal) -> min(Val, OpVal); +%% use operator policy value for booleans +merge_policy_value(_Key, Val, OpVal) when is_boolean(Val) andalso is_boolean(OpVal) -> OpVal. diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl index 5c23fbb51d..f88822a54f 100644 --- a/deps/rabbit/src/rabbit_stream_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_coordinator.erl @@ -846,18 +846,22 @@ phase_update_mnesia(StreamId, Args, #{reference := QName, %% This can happen during recovery %% we need to re-initialise the queue record %% if the stream id is a match - [Q] = mnesia:dirty_read(rabbit_durable_queue, QName), - case amqqueue:get_type_state(Q) of - #{name := S} when S == StreamId -> - rabbit_log:debug("~s: initializing queue record for stream id ~s", - [?MODULE, StreamId]), - _ = rabbit_amqqueue:ensure_rabbit_queue_record_is_initialized(Fun(Q)), + case mnesia:dirty_read(rabbit_durable_queue, QName) of + [] -> + %% queue not found at all, it must have been deleted ok; - _ -> - ok - end, - - send_self_command({mnesia_updated, StreamId, Args}); + [Q] -> + case amqqueue:get_type_state(Q) of + #{name := S} when S == StreamId -> + rabbit_log:debug("~s: initializing queue record for stream id ~s", + [?MODULE, StreamId]), + _ = rabbit_amqqueue:ensure_rabbit_queue_record_is_initialized(Fun(Q)), + ok; + _ -> + ok + end, + send_self_command({mnesia_updated, StreamId, Args}) + end; _ -> send_self_command({mnesia_updated, StreamId, Args}) catch _:E -> diff --git a/deps/rabbit/test/definition_import_SUITE.erl b/deps/rabbit/test/definition_import_SUITE.erl index d893862daf..ba3cb979d1 100644 --- a/deps/rabbit/test/definition_import_SUITE.erl +++ b/deps/rabbit/test/definition_import_SUITE.erl @@ -46,7 +46,10 @@ groups() -> import_case13, import_case14, import_case15, - import_case16 + import_case16, + import_case17, + import_case18, + import_case19 ]}, {boot_time_import_using_classic_source, [], [ @@ -236,6 +239,36 @@ import_case16(Config) -> {skip, "Should not run in mixed version environments"} end. +import_case17(Config) -> import_invalid_file_case(Config, "failing_case17"). + +import_case18(Config) -> + case rabbit_ct_helpers:is_mixed_versions() of + false -> + case rabbit_ct_broker_helpers:enable_feature_flag(Config, user_limits) of + ok -> + import_file_case(Config, "case18"), + User = <<"limited_guest">>, + UserIsImported = + fun () -> + case user_lookup(Config, User) of + {error, not_found} -> false; + _ -> true + end + end, + rabbit_ct_helpers:await_condition(UserIsImported, 20000), + {ok, UserRec} = user_lookup(Config, User), + ?assertEqual(#{<<"max-connections">> => 2}, internal_user:get_limits(UserRec)), + ok; + Skip -> + Skip + end; + _ -> + %% skip the test in mixed version mode + {skip, "Should not run in mixed version environments"} + end. + +import_case19(Config) -> import_invalid_file_case(Config, "failing_case19"). + export_import_round_trip_case1(Config) -> case rabbit_ct_helpers:is_mixed_versions() of false -> @@ -382,3 +415,6 @@ queue_lookup(Config, VHost, Name) -> vhost_lookup(Config, VHost) -> rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_vhost, lookup, [VHost]). + +user_lookup(Config, User) -> + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_auth_backend_internal, lookup_user, [User]). diff --git a/deps/rabbit/test/definition_import_SUITE_data/case18.json b/deps/rabbit/test/definition_import_SUITE_data/case18.json new file mode 100644 index 0000000000..9e0f755beb --- /dev/null +++ b/deps/rabbit/test/definition_import_SUITE_data/case18.json @@ -0,0 +1,46 @@ +{ + "bindings": [], + "exchanges": [], + "global_parameters": [ + { + "name": "cluster_name", + "value": "rabbitmq@localhost" + } + ], + "parameters": [], + "permissions": [ + { + "configure": ".*", + "read": ".*", + "user": "guest", + "vhost": "/", + "write": ".*" + } + ], + "policies": [], + "queues": [], + "rabbit_version": "3.9.1", + "rabbitmq_version": "3.9.1", + "topic_permissions": [], + "users": [ + { + "hashing_algorithm": "rabbit_password_hashing_sha256", + "limits": {"max-connections" : 2}, + "name": "limited_guest", + "password_hash": "wS4AT3B4Z5RpWlFn1FA30osf2C75D7WA3gem591ACDZ6saO6", + "tags": [ + "administrator" + ] + } + ], + "vhosts": [ + { + "limits": [], + "name": "/" + }, + { + "limits": [], + "name": "tagged" + } + ] +} diff --git a/deps/rabbit/test/definition_import_SUITE_data/failing_case17.json b/deps/rabbit/test/definition_import_SUITE_data/failing_case17.json new file mode 100644 index 0000000000..4776408833 --- /dev/null +++ b/deps/rabbit/test/definition_import_SUITE_data/failing_case17.json @@ -0,0 +1,19 @@ +{ + "vhosts": [ + { + "name": "\/" + } + ], + "policies": [ + { + "vhost": "\/", + "pattern": "^project-nd-ns-", + "apply-to": "queues", + "definition": { + "expires": 120000, + "max-length": 10000 + }, + "priority": 1 + } + ] +} diff --git a/deps/rabbit/test/definition_import_SUITE_data/failing_case19.json b/deps/rabbit/test/definition_import_SUITE_data/failing_case19.json new file mode 100644 index 0000000000..ab9d355538 --- /dev/null +++ b/deps/rabbit/test/definition_import_SUITE_data/failing_case19.json @@ -0,0 +1,46 @@ +{ + "bindings": [], + "exchanges": [], + "global_parameters": [ + { + "name": "cluster_name", + "value": "rabbitmq@localhost" + } + ], + "parameters": [], + "permissions": [ + { + "configure": ".*", + "read": ".*", + "user": "guest", + "vhost": "/", + "write": ".*" + } + ], + "policies": [], + "queues": [], + "rabbit_version": "3.9.1", + "rabbitmq_version": "3.9.1", + "topic_permissions": [], + "users": [ + { + "hashing_algorithm": "rabbit_password_hashing_sha256", + "limits": {"max-connections" : "twomincepies"}, + "name": "limited_guest", + "password_hash": "wS4AT3B4Z5RpWlFn1FA30osf2C75D7WA3gem591ACDZ6saO6", + "tags": [ + "administrator" + ] + } + ], + "vhosts": [ + { + "limits": [], + "name": "/" + }, + { + "limits": [], + "name": "tagged" + } + ] +} diff --git a/deps/rabbit/test/publisher_confirms_parallel_SUITE.erl b/deps/rabbit/test/publisher_confirms_parallel_SUITE.erl index 30bc5f8ba6..6d2e515da3 100644 --- a/deps/rabbit/test/publisher_confirms_parallel_SUITE.erl +++ b/deps/rabbit/test/publisher_confirms_parallel_SUITE.erl @@ -29,6 +29,7 @@ groups() -> confirm_nowait, confirm_ack, confirm_acks, + confirm_after_mandatory_bug, confirm_mandatory_unroutable, confirm_unroutable_message], [ @@ -187,6 +188,17 @@ confirm_acks(Config) -> publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>, <<"msg4">>]), receive_many(lists:seq(1, 4)). +confirm_after_mandatory_bug(Config) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + ok = amqp_channel:call(Ch, #'basic.publish'{routing_key = QName, + mandatory = true}, #amqp_msg{payload = <<"msg1">>}), + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + publish(Ch, QName, [<<"msg2">>]), + true = amqp_channel:wait_for_confirms(Ch, 1), + ok. + %% For unroutable messages, the broker will issue a confirm once the exchange verifies a message %% won't route to any queue (returns an empty list of queues). %% If the message is also published as mandatory, the basic.return is sent to the client before diff --git a/deps/rabbit/test/rabbit_fifo_SUITE.erl b/deps/rabbit/test/rabbit_fifo_SUITE.erl index 9cb401fc8e..cf853bca98 100644 --- a/deps/rabbit/test/rabbit_fifo_SUITE.erl +++ b/deps/rabbit/test/rabbit_fifo_SUITE.erl @@ -11,7 +11,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("rabbit_common/include/rabbit.hrl"). --include("src/rabbit_fifo.hrl"). +-include_lib("rabbit/src/rabbit_fifo.hrl"). %%%=================================================================== %%% Common Test callbacks diff --git a/deps/rabbit/test/unit_disk_monitor_SUITE.erl b/deps/rabbit/test/unit_disk_monitor_SUITE.erl index 71393d5d35..7a746349ca 100644 --- a/deps/rabbit/test/unit_disk_monitor_SUITE.erl +++ b/deps/rabbit/test/unit_disk_monitor_SUITE.erl @@ -67,6 +67,12 @@ set_disk_free_limit_command(Config) -> ?MODULE, set_disk_free_limit_command1, [Config]). set_disk_free_limit_command1(_Config) -> + F = fun () -> + DiskFree = rabbit_disk_monitor:get_disk_free(), + DiskFree =/= unknown + end, + rabbit_ct_helpers:await_condition(F), + %% Use an integer rabbit_disk_monitor:set_disk_free_limit({mem_relative, 1}), disk_free_limit_to_total_memory_ratio_is(1), @@ -84,7 +90,8 @@ set_disk_free_limit_command1(_Config) -> passed. disk_free_limit_to_total_memory_ratio_is(MemRatio) -> + DiskFreeLimit = rabbit_disk_monitor:get_disk_free_limit(), ExpectedLimit = MemRatio * vm_memory_monitor:get_total_memory(), % Total memory is unstable, so checking order - true = ExpectedLimit/rabbit_disk_monitor:get_disk_free_limit() < 1.2, - true = ExpectedLimit/rabbit_disk_monitor:get_disk_free_limit() > 0.98. + true = ExpectedLimit/DiskFreeLimit < 1.2, + true = ExpectedLimit/DiskFreeLimit > 0.98. diff --git a/deps/rabbit/test/unit_disk_monitor_mocks_SUITE.erl b/deps/rabbit/test/unit_disk_monitor_mocks_SUITE.erl index 80fa3d2e09..ae16cbb379 100644 --- a/deps/rabbit/test/unit_disk_monitor_mocks_SUITE.erl +++ b/deps/rabbit/test/unit_disk_monitor_mocks_SUITE.erl @@ -88,7 +88,7 @@ disk_monitor_enable1(_Config) -> application:set_env(rabbit, disk_monitor_failure_retry_interval, 100), ok = rabbit_sup:stop_child(rabbit_disk_monitor_sup), ok = rabbit_sup:start_delayed_restartable_child(rabbit_disk_monitor, [1000]), - undefined = rabbit_disk_monitor:get_disk_free(), + unknown = rabbit_disk_monitor:get_disk_free(), Cmd = case os:type() of {win32, _} -> " Le volume dans le lecteur C n’a pas de nom.\n" " Le numéro de série du volume est 707D-5BDC\n" diff --git a/deps/rabbit/test/unit_operator_policy_SUITE.erl b/deps/rabbit/test/unit_operator_policy_SUITE.erl index 804da2d1c9..dedd6c82af 100644 --- a/deps/rabbit/test/unit_operator_policy_SUITE.erl +++ b/deps/rabbit/test/unit_operator_policy_SUITE.erl @@ -21,7 +21,8 @@ all() -> groups() -> [ {parallel_tests, [parallel], [ - merge_operator_policy_definitions + merge_operator_policy_definitions, + conflict_resolution_for_booleans ]} ]. @@ -102,6 +103,54 @@ merge_operator_policy_definitions(_Config) -> [{definition, [ {<<"message-ttl">>, 3000} ]}]) - ), + ). + - passed. + conflict_resolution_for_booleans(_Config) -> + ?assertEqual( + [ + {<<"remote-dc-replicate">>, true} + ], + rabbit_policy:merge_operator_definitions( + #{definition => #{ + <<"remote-dc-replicate">> => true + }}, + [{definition, [ + {<<"remote-dc-replicate">>, true} + ]}])), + + ?assertEqual( + [ + {<<"remote-dc-replicate">>, false} + ], + rabbit_policy:merge_operator_definitions( + #{definition => #{ + <<"remote-dc-replicate">> => false + }}, + [{definition, [ + {<<"remote-dc-replicate">>, false} + ]}])), + + ?assertEqual( + [ + {<<"remote-dc-replicate">>, true} + ], + rabbit_policy:merge_operator_definitions( + #{definition => #{ + <<"remote-dc-replicate">> => false + }}, + [{definition, [ + {<<"remote-dc-replicate">>, true} + ]}])), + + ?assertEqual( + [ + {<<"remote-dc-replicate">>, false} + ], + rabbit_policy:merge_operator_definitions( + #{definition => #{ + <<"remote-dc-replicate">> => true + }}, + [{definition, [ + {<<"remote-dc-replicate">>, false} + ]}])).
\ No newline at end of file diff --git a/deps/rabbitmq_auth_backend_oauth2/README.md b/deps/rabbitmq_auth_backend_oauth2/README.md index bb698ecc09..b923a47fb6 100644 --- a/deps/rabbitmq_auth_backend_oauth2/README.md +++ b/deps/rabbitmq_auth_backend_oauth2/README.md @@ -139,6 +139,46 @@ In that case, the configuration will look like this: NOTE: `jwks_url` takes precedence over `signing_keys` if both are provided. +### Variables Configurable in rabbitmq.conf + +| Key | Documentation +|------------------------------------------|----------- +| `auth_oauth2.resource_server_id` | [The Resource Server ID](#resource-server-id-and-scope-prefixes) +| `auth_oauth2.additional_scopes_key` | Configure the plugin to also look in other fields (maps to `additional_rabbitmq_scopes` in the old format). +| `auth_oauth2.default_key` | ID of the default signing key. +| `auth_oauth2.signing_keys` | Paths to signing key files. +| `auth_oauth2.jwks_url` | The URL of key server. According to the [JWT Specification](https://datatracker.ietf.org/doc/html/rfc7515#section-4.1.2) key server URL must be https. +| `auth_oauth2.https.cacertfile` | Path to a file containing PEM-encoded CA certificates. The CA certificates are used during key server [peer verification](https://rabbitmq.com/ssl.html#peer-verification). +| `auth_oauth2.https.depth` | The maximum number of non-self-issued intermediate certificates that may follow the peer certificate in a valid [certification path](https://rabbitmq.com/ssl.html#peer-verification-depth). Default is 10. +| `auth_oauth2.https.peer_verification` | Should [peer verification](https://rabbitmq.com/ssl.html#peer-verification) be enabled. Available values: `verify_none`, `verify_peer`. Default is `verify_none`. It is recommended to configure `verify_peer`. Peer verification requires a certain amount of setup and is more secure. +| `auth_oauth2.https.fail_if_no_peer_cert` | Used together with `auth_oauth2.https.peer_verification = verify_peer`. When set to `true`, TLS connection will be rejected if client fails to provide a certificate. Default is `false`. +| `auth_oauth2.https.hostname_verification`| Enable wildcard-aware hostname verification for key server. Available values: `wildcard`, `none`. Default is `none`. +| `auth_oauth2.algorithms` | Restrict [the usable algorithms](https://github.com/potatosalad/erlang-jose#algorithm-support). + +For example: + +Configure with key files +``` +auth_oauth2.resource_server_id = new_resource_server_id +auth_oauth2.additional_scopes_key = my_custom_scope_key +auth_oauth2.default_key = id1 +auth_oauth2.signing_keys.id1 = test/config_schema_SUITE_data/certs/key.pem +auth_oauth2.signing_keys.id2 = test/config_schema_SUITE_data/certs/cert.pem +auth_oauth2.algorithms.1 = HS256 +auth_oauth2.algorithms.2 = RS256 +``` +Configure with key server +``` +auth_oauth2.resource_server_id = new_resource_server_id +auth_oauth2.jwks_url = https://my-jwt-issuer/jwks.json +auth_oauth2.https.cacertfile = test/config_schema_SUITE_data/certs/cacert.pem +auth_oauth2.https.peer_verification = verify_peer +auth_oauth2.https.depth = 5 +auth_oauth2.https.fail_if_no_peer_cert = true +auth_oauth2.https.hostname_verification = wildcard +auth_oauth2.algorithms.1 = HS256 +auth_oauth2.algorithms.2 = RS256 +``` ### Resource Server ID and Scope Prefixes OAuth 2.0 (and thus UAA-provided) tokens use scopes to communicate what set of permissions particular diff --git a/deps/rabbitmq_auth_backend_oauth2/priv/schema/rabbitmq_auth_backend_oauth2.schema b/deps/rabbitmq_auth_backend_oauth2/priv/schema/rabbitmq_auth_backend_oauth2.schema index 0feb73d9aa..1c8593e434 100644 --- a/deps/rabbitmq_auth_backend_oauth2/priv/schema/rabbitmq_auth_backend_oauth2.schema +++ b/deps/rabbitmq_auth_backend_oauth2/priv/schema/rabbitmq_auth_backend_oauth2.schema @@ -77,3 +77,52 @@ end, Settings), maps:from_list(SigningKeys) end}. + +{mapping, + "auth_oauth2.jwks_url", + "rabbitmq_auth_backend_oauth2.key_config.jwks_url", + [{datatype, string}, {validators, ["uri", "https_uri"]}]}. + +{mapping, + "auth_oauth2.https.peer_verification", + "rabbitmq_auth_backend_oauth2.key_config.peer_verification", + [{datatype, {enum, [verify_peer, verify_none]}}]}. + +{mapping, + "auth_oauth2.https.cacertfile", + "rabbitmq_auth_backend_oauth2.key_config.cacertfile", + [{datatype, file}, {validators, ["file_accessible"]}]}. + +{mapping, + "auth_oauth2.https.depth", + "rabbitmq_auth_backend_oauth2.key_config.depth", + [{datatype, integer}]}. + +{mapping, + "auth_oauth2.https.hostname_verification", + "rabbitmq_auth_backend_oauth2.key_config.hostname_verification", + [{datatype, {enum, [wildcard, none]}}]}. + +{mapping, + "auth_oauth2.https.crl_check", + "rabbitmq_auth_backend_oauth2.key_config.crl_check", + [{datatype, {enum, [true, false, peer, best_effort]}}]}. + +{mapping, + "auth_oauth2.https.fail_if_no_peer_cert", + "rabbitmq_auth_backend_oauth2.key_config.fail_if_no_peer_cert", + [{datatype, {enum, [true, false]}}]}. + +{validator, "https_uri", "According to the JWT Specification, Key Server URL must be https.", + fun(Uri) -> string:nth_lexeme(Uri, 1, "://") == "https" end}. + +{mapping, + "auth_oauth2.algorithms.$algorithm", + "rabbitmq_auth_backend_oauth2.key_config.algorithms", + [{datatype, string}]}. + +{translation, "rabbitmq_auth_backend_oauth2.key_config.algorithms", + fun(Conf) -> + Settings = cuttlefish_variable:filter_by_prefix("auth_oauth2.algorithms", Conf), + [list_to_binary(V) || {_, V} <- Settings] + end}. diff --git a/deps/rabbitmq_auth_backend_oauth2/src/uaa_jwks.erl b/deps/rabbitmq_auth_backend_oauth2/src/uaa_jwks.erl new file mode 100644 index 0000000000..d34d9d5d99 --- /dev/null +++ b/deps/rabbitmq_auth_backend_oauth2/src/uaa_jwks.erl @@ -0,0 +1,27 @@ +-module(uaa_jwks). +-export([get/1]). + +-spec get(string() | binary()) -> {ok, term()} | {error, term()}. +get(JwksUrl) -> + httpc:request(get, {JwksUrl, []}, [{ssl, ssl_options()}, {timeout, 60000}], []). + +-spec ssl_options() -> list(). +ssl_options() -> + UaaEnv = application:get_env(rabbitmq_auth_backend_oauth2, key_config, []), + PeerVerification = proplists:get_value(peer_verification, UaaEnv, verify_none), + CaCertFile = proplists:get_value(cacertfile, UaaEnv), + Depth = proplists:get_value(depth, UaaEnv, 10), + FailIfNoPeerCert = proplists:get_value(fail_if_no_peer_cert, UaaEnv, false), + CrlCheck = proplists:get_value(crl_check, UaaEnv, false), + SslOpts0 = [{verify, PeerVerification}, + {cacertfile, CaCertFile}, + {depth, Depth}, + {fail_if_no_peer_cert, FailIfNoPeerCert}, + {crl_check, CrlCheck}, + {crl_cache, {ssl_crl_cache, {internal, [{http, 10000}]}}}], + case proplists:get_value(hostname_verification, UaaEnv, none) of + wildcard -> + [{customize_hostname_check, [{match_fun, public_key:pkix_verify_hostname_match_fun(https)}]} | SslOpts0]; + none -> + SslOpts0 + end.
\ No newline at end of file diff --git a/deps/rabbitmq_auth_backend_oauth2/src/uaa_jwt.erl b/deps/rabbitmq_auth_backend_oauth2/src/uaa_jwt.erl index c004c6070c..6721bbf5e7 100644 --- a/deps/rabbitmq_auth_backend_oauth2/src/uaa_jwt.erl +++ b/deps/rabbitmq_auth_backend_oauth2/src/uaa_jwt.erl @@ -58,7 +58,7 @@ update_jwks_signing_keys() -> undefined -> {error, no_jwks_url}; JwksUrl -> - case httpc:request(JwksUrl) of + case uaa_jwks:get(JwksUrl) of {ok, {_, _, JwksBody}} -> KeyList = maps:get(<<"keys">>, jose:decode(erlang:iolist_to_binary(JwksBody)), []), Keys = maps:from_list(lists:map(fun(Key) -> {maps:get(<<"kid">>, Key, undefined), {json, Key}} end, KeyList)), diff --git a/deps/rabbitmq_auth_backend_oauth2/src/uaa_jwt_jwt.erl b/deps/rabbitmq_auth_backend_oauth2/src/uaa_jwt_jwt.erl index bb73cf9ae4..aa1cdd8241 100644 --- a/deps/rabbitmq_auth_backend_oauth2/src/uaa_jwt_jwt.erl +++ b/deps/rabbitmq_auth_backend_oauth2/src/uaa_jwt_jwt.erl @@ -24,7 +24,15 @@ decode(Token) -> end. decode_and_verify(Jwk, Token) -> - case jose_jwt:verify(Jwk, Token) of + UaaEnv = application:get_env(rabbitmq_auth_backend_oauth2, key_config, []), + Verify = + case proplists:get_value(algorithms, UaaEnv) of + undefined -> + jose_jwt:verify(Jwk, Token); + Algs -> + jose_jwt:verify_strict(Jwk, Algs, Token) + end, + case Verify of {true, #jose_jwt{fields = Fields}, _} -> {true, Fields}; {false, #jose_jwt{fields = Fields}, _} -> {false, Fields} end. diff --git a/deps/rabbitmq_auth_backend_oauth2/test/config_schema_SUITE_data/rabbitmq_auth_backend_oauth2.snippets b/deps/rabbitmq_auth_backend_oauth2/test/config_schema_SUITE_data/rabbitmq_auth_backend_oauth2.snippets index 2b7018fdd8..27976d3abc 100644 --- a/deps/rabbitmq_auth_backend_oauth2/test/config_schema_SUITE_data/rabbitmq_auth_backend_oauth2.snippets +++ b/deps/rabbitmq_auth_backend_oauth2/test/config_schema_SUITE_data/rabbitmq_auth_backend_oauth2.snippets @@ -4,7 +4,16 @@ auth_oauth2.additional_scopes_key = my_custom_scope_key auth_oauth2.default_key = id1 auth_oauth2.signing_keys.id1 = test/config_schema_SUITE_data/certs/key.pem - auth_oauth2.signing_keys.id2 = test/config_schema_SUITE_data/certs/cert.pem", + auth_oauth2.signing_keys.id2 = test/config_schema_SUITE_data/certs/cert.pem + auth_oauth2.jwks_url = https://my-jwt-issuer/jwks.json + auth_oauth2.https.cacertfile = test/config_schema_SUITE_data/certs/cacert.pem + auth_oauth2.https.peer_verification = verify_none + auth_oauth2.https.depth = 5 + auth_oauth2.https.fail_if_no_peer_cert = false + auth_oauth2.https.hostname_verification = wildcard + auth_oauth2.https.crl_check = true + auth_oauth2.algorithms.1 = HS256 + auth_oauth2.algorithms.2 = RS256", [ {rabbitmq_auth_backend_oauth2, [ {resource_server_id,<<"new_resource_server_id">>}, @@ -16,7 +25,15 @@ <<"id1">> => {pem, <<"I'm not a certificate">>}, <<"id2">> => {pem, <<"I'm not a certificate">>} } - } + }, + {jwks_url, "https://my-jwt-issuer/jwks.json"}, + {cacertfile, "test/config_schema_SUITE_data/certs/cacert.pem"}, + {peer_verification, verify_none}, + {depth, 5}, + {fail_if_no_peer_cert, false}, + {hostname_verification, wildcard}, + {crl_check, true}, + {algorithms, [<<"HS256">>, <<"RS256">>]} ] } ]} diff --git a/deps/rabbitmq_auth_backend_oauth2/test/jwks_SUITE.erl b/deps/rabbitmq_auth_backend_oauth2/test/jwks_SUITE.erl index 4823a68e27..2ae29a2761 100644 --- a/deps/rabbitmq_auth_backend_oauth2/test/jwks_SUITE.erl +++ b/deps/rabbitmq_auth_backend_oauth2/test/jwks_SUITE.erl @@ -21,7 +21,9 @@ all() -> [ {group, happy_path}, - {group, unhappy_path} + {group, unhappy_path}, + {group, unvalidated_jwks_server}, + {group, no_peer_verification} ]. groups() -> @@ -34,6 +36,7 @@ groups() -> test_successful_connection_with_complex_claim_as_a_list, test_successful_connection_with_complex_claim_as_a_binary, test_successful_connection_with_keycloak_token, + test_successful_connection_with_algorithm_restriction, test_successful_token_refresh ]}, {unhappy_path, [], [ @@ -41,9 +44,12 @@ groups() -> test_failed_connection_with_a_non_token, test_failed_connection_with_a_token_with_insufficient_vhost_permission, test_failed_connection_with_a_token_with_insufficient_resource_permission, + test_failed_connection_with_algorithm_restriction, test_failed_token_refresh_case1, test_failed_token_refresh_case2 - ]} + ]}, + {unvalidated_jwks_server, [], [test_failed_connection_with_unvalidated_jwks_server]}, + {no_peer_verification, [], [{group, happy_path}, {group, unhappy_path}]} ]. %% @@ -69,23 +75,35 @@ end_per_suite(Config) -> fun stop_jwks_server/1 ] ++ rabbit_ct_broker_helpers:teardown_steps()). +init_per_group(no_peer_verification, Config) -> + add_vhosts(Config), + KeyConfig = rabbit_ct_helpers:set_config(?config(key_config, Config), [{jwks_url, ?config(non_strict_jwks_url, Config)}, {peer_verification, verify_none}]), + ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env, [rabbitmq_auth_backend_oauth2, key_config, KeyConfig]), + rabbit_ct_helpers:set_config(Config, {key_config, KeyConfig}); init_per_group(_Group, Config) -> - %% The broker is managed by {init,end}_per_testcase(). - lists:foreach(fun(Value) -> - rabbit_ct_broker_helpers:add_vhost(Config, Value) - end, - [<<"vhost1">>, <<"vhost2">>, <<"vhost3">>, <<"vhost4">>]), + add_vhosts(Config), Config. +end_per_group(no_peer_verification, Config) -> + delete_vhosts(Config), + KeyConfig = rabbit_ct_helpers:set_config(?config(key_config, Config), [{jwks_url, ?config(strict_jwks_url, Config)}, {peer_verification, verify_peer}]), + ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env, [rabbitmq_auth_backend_oauth2, key_config, KeyConfig]), + rabbit_ct_helpers:set_config(Config, {key_config, KeyConfig}); + end_per_group(_Group, Config) -> - %% The broker is managed by {init,end}_per_testcase(). - lists:foreach(fun(Value) -> - rabbit_ct_broker_helpers:delete_vhost(Config, Value) - end, - [<<"vhost1">>, <<"vhost2">>, <<"vhost3">>, <<"vhost4">>]), + delete_vhosts(Config), Config. +add_vhosts(Config) -> + %% The broker is managed by {init,end}_per_testcase(). + lists:foreach(fun(Value) -> rabbit_ct_broker_helpers:add_vhost(Config, Value) end, + [<<"vhost1">>, <<"vhost2">>, <<"vhost3">>, <<"vhost4">>]). + +delete_vhosts(Config) -> + %% The broker is managed by {init,end}_per_testcase(). + lists:foreach(fun(Value) -> rabbit_ct_broker_helpers:delete_vhost(Config, Value) end, + [<<"vhost1">>, <<"vhost2">>, <<"vhost3">>, <<"vhost4">>]). init_per_testcase(Testcase, Config) when Testcase =:= test_successful_connection_with_a_full_permission_token_and_explicitly_configured_vhost orelse Testcase =:= test_successful_token_refresh -> @@ -107,6 +125,24 @@ init_per_testcase(Testcase, Config) when Testcase =:= test_successful_connection rabbit_ct_helpers:testcase_started(Config, Testcase), Config; +init_per_testcase(Testcase, Config) when Testcase =:= test_successful_connection_with_algorithm_restriction -> + KeyConfig = ?config(key_config, Config), + ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env, [rabbitmq_auth_backend_oauth2, key_config, [{algorithms, [<<"HS256">>]} | KeyConfig]]), + rabbit_ct_helpers:testcase_started(Config, Testcase), + Config; + +init_per_testcase(Testcase, Config) when Testcase =:= test_failed_connection_with_algorithm_restriction -> + KeyConfig = ?config(key_config, Config), + ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env, [rabbitmq_auth_backend_oauth2, key_config, [{algorithms, [<<"RS256">>]} | KeyConfig]]), + rabbit_ct_helpers:testcase_started(Config, Testcase), + Config; + +init_per_testcase(Testcase, Config) when Testcase =:= test_failed_connection_with_unvalidated_jwks_server -> + KeyConfig = rabbit_ct_helpers:set_config(?config(key_config, Config), {jwks_url, ?config(non_strict_jwks_url, Config)}), + ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env, [rabbitmq_auth_backend_oauth2, key_config, KeyConfig]), + rabbit_ct_helpers:testcase_started(Config, Testcase), + Config; + init_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_started(Config, Testcase), Config. @@ -126,6 +162,14 @@ end_per_testcase(Testcase, Config) when Testcase =:= test_successful_connection_ rabbit_ct_helpers:testcase_started(Config, Testcase), Config; +end_per_testcase(Testcase, Config) when Testcase =:= test_successful_connection_with_algorithm_restriction orelse + Testcase =:= test_failed_connection_with_algorithm_restriction orelse + Testcase =:= test_failed_connection_with_unvalidated_jwks_server -> + rabbit_ct_broker_helpers:delete_vhost(Config, <<"vhost1">>), + ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env, [rabbitmq_auth_backend_oauth2, key_config, ?config(key_config, Config)]), + rabbit_ct_helpers:testcase_finished(Config, Testcase), + Config; + end_per_testcase(Testcase, Config) -> rabbit_ct_broker_helpers:delete_vhost(Config, <<"vhost1">>), rabbit_ct_helpers:testcase_finished(Config, Testcase), @@ -143,13 +187,27 @@ start_jwks_server(Config) -> %% Assume we don't have more than 100 ports allocated for tests PortBase = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_ports_base), JwksServerPort = PortBase + 100, + + %% Both URLs direct to the same JWKS server + %% The NonStrictJwksUrl identity cannot be validated while StrictJwksUrl identity can be validated + NonStrictJwksUrl = "https://127.0.0.1:" ++ integer_to_list(JwksServerPort) ++ "/jwks", + StrictJwksUrl = "https://localhost:" ++ integer_to_list(JwksServerPort) ++ "/jwks", + ok = application:set_env(jwks_http, keys, [Jwk]), + {ok, _} = application:ensure_all_started(ssl), {ok, _} = application:ensure_all_started(cowboy), - ok = jwks_http_app:start(JwksServerPort), - KeyConfig = [{jwks_url, "http://127.0.0.1:" ++ integer_to_list(JwksServerPort) ++ "/jwks"}], + CertsDir = ?config(rmq_certsdir, Config), + ok = jwks_http_app:start(JwksServerPort, CertsDir), + KeyConfig = [{jwks_url, StrictJwksUrl}, + {peer_verification, verify_peer}, + {cacertfile, filename:join([CertsDir, "testca", "cacert.pem"])}], ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env, [rabbitmq_auth_backend_oauth2, key_config, KeyConfig]), - rabbit_ct_helpers:set_config(Config, {fixture_jwk, Jwk}). + rabbit_ct_helpers:set_config(Config, + [{non_strict_jwks_url, NonStrictJwksUrl}, + {strict_jwks_url, StrictJwksUrl}, + {key_config, KeyConfig}, + {fixture_jwk, Jwk}]). stop_jwks_server(Config) -> ok = jwks_http_app:stop(), @@ -305,7 +363,7 @@ test_successful_token_refresh(Config) -> Conn = open_unmanaged_connection(Config, 0, <<"vhost1">>, <<"username">>, Token), {ok, Ch} = amqp_connection:open_channel(Conn), - {_Algo, Token2} = generate_valid_token(Config, [<<"rabbitmq.configure:vhost1/*">>, + {_Algo2, Token2} = generate_valid_token(Config, [<<"rabbitmq.configure:vhost1/*">>, <<"rabbitmq.write:vhost1/*">>, <<"rabbitmq.read:vhost1/*">>]), ?UTIL_MOD:wait_for_token_to_expire(timer:seconds(Duration)), @@ -321,6 +379,13 @@ test_successful_token_refresh(Config) -> amqp_channel:close(Ch2), close_connection_and_channel(Conn, Ch). +test_successful_connection_with_algorithm_restriction(Config) -> + {_Algo, Token} = rabbit_ct_helpers:get_config(Config, fixture_jwt), + Conn = open_unmanaged_connection(Config, 0, <<"username">>, Token), + {ok, Ch} = amqp_connection:open_channel(Conn), + #'queue.declare_ok'{queue = _} = + amqp_channel:call(Ch, #'queue.declare'{exclusive = true}), + close_connection_and_channel(Conn, Ch). test_failed_connection_with_expired_token(Config) -> {_Algo, Token} = generate_expired_token(Config, [<<"rabbitmq.configure:vhost1/*">>, @@ -359,7 +424,7 @@ test_failed_token_refresh_case1(Config) -> #'queue.declare_ok'{queue = _} = amqp_channel:call(Ch, #'queue.declare'{exclusive = true}), - {_Algo, Token2} = generate_expired_token(Config, [<<"rabbitmq.configure:vhost4/*">>, + {_Algo2, Token2} = generate_expired_token(Config, [<<"rabbitmq.configure:vhost4/*">>, <<"rabbitmq.write:vhost4/*">>, <<"rabbitmq.read:vhost4/*">>]), %% the error is communicated asynchronously via a connection-level error @@ -387,3 +452,13 @@ test_failed_token_refresh_case2(Config) -> amqp_connection:open_channel(Conn)), close_connection(Conn). + +test_failed_connection_with_algorithm_restriction(Config) -> + {_Algo, Token} = rabbit_ct_helpers:get_config(Config, fixture_jwt), + ?assertMatch({error, {auth_failure, _}}, + open_unmanaged_connection(Config, 0, <<"username">>, Token)). + +test_failed_connection_with_unvalidated_jwks_server(Config) -> + {_Algo, Token} = rabbit_ct_helpers:get_config(Config, fixture_jwt), + ?assertMatch({error, {auth_failure, _}}, + open_unmanaged_connection(Config, 0, <<"username">>, Token)). diff --git a/deps/rabbitmq_auth_backend_oauth2/test/jwks_http_app.erl b/deps/rabbitmq_auth_backend_oauth2/test/jwks_http_app.erl index 16353e34f4..c745e436f6 100644 --- a/deps/rabbitmq_auth_backend_oauth2/test/jwks_http_app.erl +++ b/deps/rabbitmq_auth_backend_oauth2/test/jwks_http_app.erl @@ -1,8 +1,8 @@ -module(jwks_http_app). --export([start/1, stop/0]). +-export([start/2, stop/0]). -start(Port) -> +start(Port, CertsDir) -> Dispatch = cowboy_router:compile( [ @@ -11,8 +11,10 @@ start(Port) -> ]} ] ), - {ok, _} = cowboy:start_clear(jwks_http_listener, - [{port, Port}], + {ok, _} = cowboy:start_tls(jwks_http_listener, + [{port, Port}, + {certfile, filename:join([CertsDir, "server", "cert.pem"])}, + {keyfile, filename:join([CertsDir, "server", "key.pem"])}], #{env => #{dispatch => Dispatch}}), ok. diff --git a/deps/rabbitmq_auth_backend_oauth2/test/rabbit_auth_backend_oauth2_test_util.erl b/deps/rabbitmq_auth_backend_oauth2/test/rabbit_auth_backend_oauth2_test_util.erl index e586ffca8c..80b6648352 100644 --- a/deps/rabbitmq_auth_backend_oauth2/test/rabbit_auth_backend_oauth2_test_util.erl +++ b/deps/rabbitmq_auth_backend_oauth2/test/rabbit_auth_backend_oauth2_test_util.erl @@ -73,7 +73,7 @@ expired_token_with_scopes(Scopes) -> token_with_scopes_and_expiration(Scopes, os:system_time(seconds) - 10). fixture_token_with_scopes(Scopes) -> - token_with_scopes_and_expiration(Scopes, os:system_time(seconds) + 10). + token_with_scopes_and_expiration(Scopes, os:system_time(seconds) + 30). token_with_scopes_and_expiration(Scopes, Expiration) -> %% expiration is a timestamp with precision in seconds diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl index 3ff764a808..b8f4fdc923 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl @@ -87,7 +87,9 @@ delete_super_stream(VirtualHost, Name, Username) -> gen_server:call(?MODULE, {delete_super_stream, VirtualHost, Name, Username}). --spec lookup_leader(binary(), binary()) -> pid() | cluster_not_found. +-spec lookup_leader(binary(), binary()) -> + {ok, pid()} | {error, not_available} | + {error, not_found}. lookup_leader(VirtualHost, Stream) -> gen_server:call(?MODULE, {lookup_leader, VirtualHost, Stream}). @@ -294,20 +296,25 @@ handle_call({lookup_leader, VirtualHost, Stream}, _From, State) -> LeaderPid = amqqueue:get_pid(Q), case process_alive(LeaderPid) of true -> - LeaderPid; + {ok, LeaderPid}; false -> case leader_from_members(Q) of {ok, Pid} -> - Pid; + {ok, Pid}; _ -> - cluster_not_found + {error, not_available} end end; _ -> - cluster_not_found + {error, not_found} end; - _ -> - cluster_not_found + {error, not_found} -> + case rabbit_amqqueue:not_found_or_absent_dirty(Name) of + not_found -> + {error, not_found}; + _ -> + {error, not_available} + end end, {reply, Res, State}; handle_call({lookup_local_member, VirtualHost, Stream}, _From, diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 046d59463b..9c29090f4a 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -1494,7 +1494,7 @@ handle_frame_post_auth(Transport, of {false, false} -> case lookup_leader(Stream, Connection0) of - cluster_not_found -> + {error, not_found} -> response(Transport, Connection0, declare_publisher, @@ -1504,6 +1504,16 @@ handle_frame_post_auth(Transport, ?STREAM_DOES_NOT_EXIST, 1), {Connection0, State}; + {error, not_available} -> + response(Transport, + Connection0, + declare_publisher, + CorrelationId, + ?RESPONSE_CODE_STREAM_NOT_AVAILABLE), + rabbit_global_counters:increase_protocol_counter(stream, + ?STREAM_NOT_AVAILABLE, + 1), + {Connection0, State}; {ClusterLeader, #stream_connection{publishers = Publishers0, publisher_to_ids = RefIds0} = @@ -1960,9 +1970,9 @@ handle_frame_post_auth(_Transport, of ok -> case lookup_leader(Stream, Connection) of - cluster_not_found -> - rabbit_log:warning("Could not find leader to store offset on ~p", - [Stream]), + {error, Error} -> + rabbit_log:warning("Could not find leader to store offset on ~p: ~p", + [Stream, Error]), %% FIXME store offset is fire-and-forget, so no response even if error, change this? {Connection, State}; {ClusterLeader, Connection1} -> @@ -1992,11 +2002,16 @@ handle_frame_post_auth(Transport, of ok -> case lookup_leader(Stream, Connection0) of - cluster_not_found -> + {error, not_found} -> rabbit_global_counters:increase_protocol_counter(stream, ?STREAM_DOES_NOT_EXIST, 1), {?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, 0, Connection0}; + {error, not_available} -> + rabbit_global_counters:increase_protocol_counter(stream, + ?STREAM_NOT_AVAILABLE, + 1), + {?RESPONSE_CODE_STREAM_NOT_AVAILABLE, 0, Connection0}; {LeaderPid, C} -> {RC, O} = case osiris:read_tracking(LeaderPid, Reference) of @@ -2532,9 +2547,9 @@ lookup_leader(Stream, case maps:get(Stream, StreamLeaders, undefined) of undefined -> case lookup_leader_from_manager(VirtualHost, Stream) of - cluster_not_found -> - cluster_not_found; - LeaderPid -> + {error, Error} -> + {error, Error}; + {ok, LeaderPid} -> Connection1 = maybe_monitor_stream(LeaderPid, Stream, Connection), {LeaderPid, diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/pom.xml b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/pom.xml index 5d008958dd..7dcf3ea494 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/pom.xml +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/pom.xml @@ -27,9 +27,9 @@ <properties> <stream-client.version>[0.5.0-SNAPSHOT,1.0-SNAPSHOT)</stream-client.version> - <junit.jupiter.version>5.8.1</junit.jupiter.version> + <junit.jupiter.version>5.8.2</junit.jupiter.version> <assertj.version>3.21.0</assertj.version> - <logback.version>1.2.6</logback.version> + <logback.version>1.2.7</logback.version> <maven.compiler.plugin.version>3.8.1</maven.compiler.plugin.version> <maven-surefire-plugin.version>2.22.2</maven-surefire-plugin.version> <spotless.version>2.2.0</spotless.version> diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/ClusterSizeTest.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/ClusterSizeTest.java index 8903ee0c62..6ecf2b4ae4 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/ClusterSizeTest.java +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/ClusterSizeTest.java @@ -16,6 +16,9 @@ package com.rabbitmq.stream; +import static com.rabbitmq.stream.TestUtils.ResponseConditions.ko; +import static com.rabbitmq.stream.TestUtils.ResponseConditions.ok; +import static com.rabbitmq.stream.TestUtils.ResponseConditions.responseCode; import static org.assertj.core.api.Assertions.assertThat; import com.rabbitmq.stream.impl.Client; @@ -40,8 +43,7 @@ public class ClusterSizeTest { String s = UUID.randomUUID().toString(); Response response = client.create(s, Collections.singletonMap("initial-cluster-size", clusterSize)); - assertThat(response.isOk()).isFalse(); - assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_PRECONDITION_FAILED); + assertThat(response).is(ko()).has(responseCode(Constants.RESPONSE_CODE_PRECONDITION_FAILED)); } @ParameterizedTest @@ -53,7 +55,7 @@ public class ClusterSizeTest { try { Response response = client.create(s, Collections.singletonMap("initial-cluster-size", requestedClusterSize)); - assertThat(response.isOk()).isTrue(); + assertThat(response).is(ok()); StreamMetadata metadata = client.metadata(s).get(s); assertThat(metadata).isNotNull(); assertThat(metadata.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK); diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java index b806471207..bf47ad01ee 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java @@ -16,11 +16,16 @@ package com.rabbitmq.stream; +import static com.rabbitmq.stream.TestUtils.ResponseConditions.ok; +import static com.rabbitmq.stream.TestUtils.waitAtMost; +import static com.rabbitmq.stream.TestUtils.waitUntil; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.fail; import com.rabbitmq.stream.codec.WrapperMessageBuilder; import com.rabbitmq.stream.impl.Client; +import com.rabbitmq.stream.impl.Client.ClientParameters; +import com.rabbitmq.stream.impl.Client.Response; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.*; @@ -66,7 +71,7 @@ public class FailureTest { Client.StreamMetadata streamMetadata = metadata.get(stream); assertThat(streamMetadata).isNotNull(); - TestUtils.waitUntil(() -> client.metadata(stream).get(stream).getReplicas().size() == 2); + waitUntil(() -> client.metadata(stream).get(stream).getReplicas().size() == 2); streamMetadata = client.metadata(stream).get(stream); assertThat(streamMetadata.getLeader().getPort()).isEqualTo(TestUtils.streamPortNode1()); @@ -107,7 +112,7 @@ public class FailureTest { assertThat(metadataLatch.await(10, TimeUnit.SECONDS)).isTrue(); // wait until there's a new leader - TestUtils.waitAtMost( + waitAtMost( Duration.ofSeconds(10), () -> { Client.StreamMetadata m = publisher.metadata(stream).get(stream); @@ -133,7 +138,7 @@ public class FailureTest { } // wait until all the replicas are there - TestUtils.waitAtMost( + waitAtMost( Duration.ofSeconds(10), () -> { LOGGER.info("Getting metadata for {}", stream); @@ -164,7 +169,7 @@ public class FailureTest { consumeLatch.countDown(); })); - TestUtils.waitAtMost( + waitAtMost( Duration.ofSeconds(5), () -> { Client.Response response = @@ -219,7 +224,7 @@ public class FailureTest { cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode2())); // wait until there's a new leader try { - TestUtils.waitAtMost( + waitAtMost( Duration.ofSeconds(5), () -> { Client.StreamMetadata m = locator.metadata(stream).get(stream); @@ -314,7 +319,7 @@ public class FailureTest { Client metadataClient = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode2())); // wait until all the replicas are there - TestUtils.waitAtMost( + waitAtMost( Duration.ofSeconds(5), () -> { Client.StreamMetadata m = metadataClient.metadata(stream).get(stream); @@ -350,7 +355,7 @@ public class FailureTest { Client.Response response = consumer.subscribe((byte) 1, stream, OffsetSpecification.first(), 10); - assertThat(response.isOk()).isTrue(); + assertThat(response).is(ok()); assertThat(consumedLatch.await(5, TimeUnit.SECONDS)).isTrue(); assertThat(generations).hasSize(2).contains(0L, 1L); @@ -372,8 +377,7 @@ public class FailureTest { Client.StreamMetadata streamMetadata = metadata.get(stream); assertThat(streamMetadata).isNotNull(); - TestUtils.waitUntil( - () -> metadataClient.metadata(stream).get(stream).getReplicas().size() == 2); + waitUntil(() -> metadataClient.metadata(stream).get(stream).getReplicas().size() == 2); metadata = metadataClient.metadata(stream); streamMetadata = metadata.get(stream); @@ -497,7 +501,7 @@ public class FailureTest { Client.Response response = consumer.subscribe((byte) 1, stream, OffsetSpecification.first(), 10); - assertThat(response.isOk()).isTrue(); + assertThat(response).is(ok()); // let's publish for a bit of time Thread.sleep(2000); @@ -521,7 +525,7 @@ public class FailureTest { confirmedCount = confirmed.size(); // wait until all the replicas are there - TestUtils.waitAtMost( + waitAtMost( Duration.ofSeconds(10), () -> { Client.StreamMetadata m = metadataClient.metadata(stream).get(stream); @@ -535,9 +539,9 @@ public class FailureTest { keepPublishing.set(false); - assertThat(publishingLatch.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(publishingLatch.await(10, TimeUnit.SECONDS)).isTrue(); - TestUtils.waitAtMost(Duration.ofSeconds(5), () -> consumed.size() >= confirmed.size()); + waitAtMost(Duration.ofSeconds(10), () -> consumed.size() >= confirmed.size()); assertThat(generations).hasSize(2).contains(0L, 1L); assertThat(consumed).hasSizeGreaterThanOrEqualTo(confirmed.size()); @@ -551,4 +555,33 @@ public class FailureTest { confirmedIds.forEach(confirmedId -> assertThat(consumedIds).contains(confirmedId)); } + + @Test + void declarePublisherShouldNotReturnStreamDoesNotExistOnRestart() throws Exception { + try { + Host.rabbitmqctl("stop_app"); + } finally { + Host.rabbitmqctl("start_app"); + } + AtomicReference<Client> client = new AtomicReference<>(); + waitUntil( + () -> { + try { + client.set(cf.get(new ClientParameters().port(TestUtils.streamPortNode1()))); + } catch (Exception e) { + + } + return client.get() != null; + }); + Set<Short> responseCodes = ConcurrentHashMap.newKeySet(); + + waitUntil( + () -> { + Response response = client.get().declarePublisher((byte) 0, null, stream); + responseCodes.add(response.getResponseCode()); + return response.isOk(); + }); + + assertThat(responseCodes).doesNotContain(Constants.RESPONSE_CODE_STREAM_DOES_NOT_EXIST); + } } diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/LeaderLocatorTest.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/LeaderLocatorTest.java index 104f5772c1..7d387c09c2 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/LeaderLocatorTest.java +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/LeaderLocatorTest.java @@ -16,6 +16,9 @@ package com.rabbitmq.stream; +import static com.rabbitmq.stream.TestUtils.ResponseConditions.ko; +import static com.rabbitmq.stream.TestUtils.ResponseConditions.ok; +import static com.rabbitmq.stream.TestUtils.ResponseConditions.responseCode; import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; @@ -47,8 +50,7 @@ public class LeaderLocatorTest { Client client = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1())); String s = UUID.randomUUID().toString(); Response response = client.create(s, Collections.singletonMap("queue-leader-locator", "foo")); - assertThat(response.isOk()).isFalse(); - assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_PRECONDITION_FAILED); + assertThat(response).is(ko()).has(responseCode(Constants.RESPONSE_CODE_PRECONDITION_FAILED)); } @Test @@ -60,7 +62,7 @@ public class LeaderLocatorTest { try { Response response = client.create(s, Collections.singletonMap("queue-leader-locator", "client-local")); - assertThat(response.isOk()).isTrue(); + assertThat(response).is(ok()); StreamMetadata metadata = client.metadata(s).get(s); assertThat(metadata).isNotNull(); assertThat(metadata.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK); @@ -136,7 +138,7 @@ public class LeaderLocatorTest { Response response = client.create( s, Collections.singletonMap("queue-leader-locator", "least-leaders")); - assertThat(response.isOk()).isTrue(); + assertThat(response).is(ok()); createdStreams.add(s); }); diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java index e0a5afee67..03015a0c44 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java @@ -16,11 +16,13 @@ package com.rabbitmq.stream; +import static com.rabbitmq.stream.TestUtils.ResponseConditions.ok; import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.fail; import com.rabbitmq.stream.impl.Client; +import com.rabbitmq.stream.impl.Client.Response; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import java.lang.reflect.Field; @@ -30,6 +32,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.function.BooleanSupplier; +import org.assertj.core.api.Condition; import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.extension.*; @@ -106,7 +109,7 @@ public class TestUtils { .eventLoopGroup(eventLoopGroup(context)) .port(streamPortNode1())); Client.Response response = client.create(stream); - assertThat(response.isOk()).isTrue(); + assertThat(response).is(ok()); client.close(); store(context).put("testMethodStream", stream); } catch (NoSuchFieldException e) { @@ -136,7 +139,7 @@ public class TestUtils { .eventLoopGroup(eventLoopGroup(context)) .port(streamPortNode1())); Client.Response response = client.delete(stream); - assertThat(response.isOk()).isTrue(); + assertThat(response).is(ok()); client.close(); store(context).remove("testMethodStream"); } catch (NoSuchFieldException e) { @@ -197,4 +200,22 @@ public class TestUtils { } } } + + static class ResponseConditions { + + static Condition<Response> ok() { + return new Condition<>(Response::isOk, "Response should be OK"); + } + + static Condition<Response> ko() { + return new Condition<>(response -> !response.isOk(), "Response should be OK"); + } + + static Condition<Response> responseCode(short expectedResponse) { + return new Condition<>( + response -> response.getResponseCode() == expectedResponse, + "response code %d", + expectedResponse); + } + } } diff --git a/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl index f9402c2b4b..397b9f6d53 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl @@ -16,7 +16,7 @@ all() -> [{group, non_parallel_tests}]. groups() -> - [{non_parallel_tests, [], [manage_super_stream]}]. + [{non_parallel_tests, [], [manage_super_stream, lookup_leader]}]. %% ------------------------------------------------------------------- %% Testsuite setup/teardown. @@ -71,6 +71,17 @@ end_per_testcase(Testcase, Config) -> %% Testcases. %% ------------------------------------------------------------------- +lookup_leader(Config) -> + Stream = <<"stream_manager_lookup_leader_stream">>, + ?assertMatch({ok, _}, create_stream(Config, Stream)), + + {ok, Pid} = lookup_leader(Config, Stream), + ?assert(is_pid(Pid)), + + ?assertEqual({error, not_found}, lookup_leader(Config, <<"foo">>)), + + ?assertEqual({ok, deleted}, delete_stream(Config, Stream)). + manage_super_stream(Config) -> % create super stream ?assertEqual(ok, @@ -140,6 +151,20 @@ create_stream(Config, Name) -> create, [<<"/">>, Name, [], <<"guest">>]). +delete_stream(Config, Name) -> + rabbit_ct_broker_helpers:rpc(Config, + 0, + rabbit_stream_manager, + delete, + [<<"/">>, Name, <<"guest">>]). + +lookup_leader(Config, Name) -> + rabbit_ct_broker_helpers:rpc(Config, + 0, + rabbit_stream_manager, + lookup_leader, + [<<"/">>, Name]). + partitions(Config, Name) -> rabbit_ct_broker_helpers:rpc(Config, 0, diff --git a/deps/rabbitmq_stream_management/test/http_SUITE_data/pom.xml b/deps/rabbitmq_stream_management/test/http_SUITE_data/pom.xml index 8f7f3cb717..5ec43a9a0e 100644 --- a/deps/rabbitmq_stream_management/test/http_SUITE_data/pom.xml +++ b/deps/rabbitmq_stream_management/test/http_SUITE_data/pom.xml @@ -27,11 +27,11 @@ <properties> <stream-client.version>[0.5.0-SNAPSHOT,1.0-SNAPSHOT)</stream-client.version> - <junit.jupiter.version>5.8.1</junit.jupiter.version> + <junit.jupiter.version>5.8.2</junit.jupiter.version> <assertj.version>3.21.0</assertj.version> - <okhttp.version>4.9.2</okhttp.version> + <okhttp.version>4.9.3</okhttp.version> <gson.version>2.8.9</gson.version> - <logback.version>1.2.6</logback.version> + <logback.version>1.2.7</logback.version> <maven.compiler.plugin.version>3.8.1</maven.compiler.plugin.version> <maven-surefire-plugin.version>2.22.2</maven-surefire-plugin.version> <spotless.version>2.2.0</spotless.version> diff --git a/tools/erlang_ls.bzl b/tools/erlang_ls.bzl index adde90405d..073a83787a 100644 --- a/tools/erlang_ls.bzl +++ b/tools/erlang_ls.bzl @@ -15,7 +15,9 @@ deps_dirs: - bazel-bin/external/* include_dirs: - deps + - deps/* - deps/*/include + - deps/*/src - bazel-bin/external - bazel-bin/external/*/include plt_path: bazel-bin/deps/rabbit/.base_plt.plt |
