diff options
-rw-r--r-- | src/rabbit_amqqueue.erl | 11 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 25 | ||||
-rw-r--r-- | src/rabbit_policies.erl | 20 | ||||
-rw-r--r-- | src/rabbit_policy.erl | 30 | ||||
-rw-r--r-- | test/confirms_rejects_SUITE.erl | 46 |
5 files changed, 112 insertions, 20 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index d975d4d9a9..2743b005a1 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -694,7 +694,7 @@ declare_args() -> {<<"x-single-active-consumer">>, fun check_single_active_consumer_arg/2}, {<<"x-queue-type">>, fun check_queue_type/2}, {<<"x-quorum-initial-group-size">>, fun check_default_quorum_initial_group_size_arg/2}, - {<<"x-confirm-on">>, fun check_confirm_on/2}]. + {<<"x-confirm-on">>, fun confirm_on_not_supported/2}]. consume_args() -> [{<<"x-priority">>, fun check_int_arg/2}, {<<"x-cancel-on-ha-failover">>, fun check_bool_arg/2}]. @@ -787,13 +787,8 @@ check_queue_type({longstr, Val}, _Args) -> check_queue_type({Type, _}, _Args) -> {error, {unacceptable_type, Type}}. -check_confirm_on({longstr, Val}, _Args) -> - case lists:member(Val, [<<"enqueue">>, <<"ack">>]) of - true -> ok; - false -> {error, invalid_confirm_on} - end; -check_confirm_on({Type, _}, _Args) -> - {error, {unacceptable_type, Type}}. +confirm_on_not_supported(_, _Args) -> + {error, can_only_be_set_as_policy}. -spec list() -> [amqqueue:amqqueue()]. diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index e2d857b507..60ca888c6a 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -425,9 +425,10 @@ bq_init(BQ, Q, Recover) -> rabbit_amqqueue:run_backing_queue(Self, Mod, Fun) end). -process_args_policy(State = #q{q = Q, +process_args_policy(State0 = #q{q = Q, args_policy_version = N}) -> - ArgsTable = + + ArgsTable = [{<<"expires">>, fun res_min/2, fun init_exp/2}, {<<"dead-letter-exchange">>, fun res_arg/2, fun init_dlx/2}, {<<"dead-letter-routing-key">>, fun res_arg/2, fun init_dlx_rkey/2}, @@ -435,12 +436,20 @@ process_args_policy(State = #q{q = Q, {<<"max-length">>, fun res_min/2, fun init_max_length/2}, {<<"max-length-bytes">>, fun res_min/2, fun init_max_bytes/2}, {<<"overflow">>, fun res_arg/2, fun init_overflow/2}, - {<<"queue-mode">>, fun res_arg/2, fun init_queue_mode/2}, - {<<"confirm-on">>, fun res_arg/2, fun init_confirm_on/2}], - drop_expired_msgs( - lists:foldl(fun({Name, Resolve, Fun}, StateN) -> - Fun(args_policy_lookup(Name, Resolve, Q), StateN) - end, State#q{args_policy_version = N + 1}, ArgsTable)). + {<<"queue-mode">>, fun res_arg/2, fun init_queue_mode/2}], + State1 = lists:foldl(fun({Name, Resolve, Fun}, StateN) -> + Fun(args_policy_lookup(Name, Resolve, Q), StateN) + end, + State0#q{args_policy_version = N + 1}, + ArgsTable), + %% Policies which cannot be set as x- args + PoliciesTable = [{<<"confirm-on">>, fun init_confirm_on/2}], + State2 = lists:foldl(fun({Name, Fun}, StateN) -> + Fun(rabbit_policy:get(Name, Q), StateN) + end, + State1, + PoliciesTable), + drop_expired_msgs(State2). args_policy_lookup(Name, Resolve, Q) -> Args = amqqueue:get_arguments(Q), diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl index 5dcfc4f980..329d489ad1 100644 --- a/src/rabbit_policies.erl +++ b/src/rabbit_policies.erl @@ -20,11 +20,15 @@ %% validation functions. -behaviour(rabbit_policy_validator). +-behaviour(rabbit_policy_conflict_validator). -behaviour(rabbit_policy_merge_strategy). -include("rabbit.hrl"). --export([register/0, validate_policy/1, merge_policy_value/3]). +-export([register/0, + validate_policy/1, + merge_policy_value/3, + validate_policy_conflicts/1]). -rabbit_boot_step({?MODULE, [{description, "internal policies"}, @@ -47,6 +51,8 @@ register() -> {policy_validator, <<"overflow">>}, {policy_validator, <<"delivery-limit">>}, {policy_validator, <<"confirm-on">>}, + {policy_conflicts, <<"confirm-on">>}, + {policy_conflicts, <<"ha-mode">>}, {operator_policy_validator, <<"expires">>}, {operator_policy_validator, <<"message-ttl">>}, {operator_policy_validator, <<"max-length">>}, @@ -68,6 +74,18 @@ validate_policy(Terms) -> (_, Error) -> Error end, ok, Terms). +validate_policy_conflicts(Terms) -> + ConfirmOn = proplists:get_value(<<"confirm-on">>, Terms, undefined), + HaMode = proplists:get_value(<<"ha-mode">>, Terms, undefined), + case {ConfirmOn, HaMode} of + {<<"ack">>, undefined} -> + ok; + {<<"ack">>, _} -> + {error, "Confirm on ack is incompatible with HA mode", []}; + _ -> + ok + end. + validate_policy0(<<"alternate-exchange">>, Value) when is_binary(Value) -> ok; diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl index ff07bfa8ee..00c4d9d900 100644 --- a/src/rabbit_policy.erl +++ b/src/rabbit_policy.erl @@ -494,7 +494,35 @@ validation_op(Name, Terms) -> validation(Name, Terms, operator_policy_validator). validation(Name, Terms) -> - validation(Name, Terms, policy_validator). + conflict_validation(Terms, policy_conflicts, + validation(Name, Terms, policy_validator)). + +conflict_validation(Terms, Validator, ok) -> + Registry = rabbit_registry:lookup_all(Validator), + {RegistryKeys, _} = lists:unzip(Registry), + [] = dups(RegistryKeys), %% ASSERTION + + ModuleToKeys = lists:foldl( + fun({Key, Module}, ModuleToKeys) -> + maps:update_with(Module, + fun(Keys) -> [Key | Keys] end, + [Key], + ModuleToKeys) + end, + #{}, + Registry), + + maps:fold( + fun (Module, Keys, ok) -> + Policies = [Term + || Term = {Key, _} <- Terms, + lists:member(binary_to_atom(Key, utf8), Keys)], + Module:validate_policy_conflicts(Policies); + (_Module, _Keys, Error) -> + Error + end, + ok, + ModuleToKeys). validation(_Name, [], _Validator) -> {error, "no policy provided", []}; diff --git a/test/confirms_rejects_SUITE.erl b/test/confirms_rejects_SUITE.erl index 0842490fde..d02a2a5624 100644 --- a/test/confirms_rejects_SUITE.erl +++ b/test/confirms_rejects_SUITE.erl @@ -22,7 +22,9 @@ groups() -> dead_queue_rejects, mixed_dead_alive_queues_reject, {confirm_on_ack, [], - [coa_policy_resets_to_enqueue, + [coa_argument_not_supported, + coa_ha_mode_policies_conflict, + coa_policy_resets_to_enqueue, coa_confirms_on_ack, coa_rejects_on_nack_without_requeue, coa_confirms_on_no_ack, @@ -72,6 +74,7 @@ end_per_group(_Group, Config) -> init_per_testcase(Testcase, Config) when Testcase == policy_resets_to_default; + Testcase == coa_argument_not_supported; Testcase == coa_policy_resets_to_enqueue; Testcase == coa_confirms_on_no_ack; Testcase == coa_confirms_on_ack; @@ -83,6 +86,8 @@ init_per_testcase(Testcase, Config) Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), rabbit_ct_helpers:testcase_started( rabbit_ct_helpers:set_config(Config, [{conn, Conn}]), Testcase); +init_per_testcase(coa_ha_mode_policies_conflict = Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase); init_per_testcase(Testcase, Config) when Testcase == confirms_rejects_conflict; Testcase == dead_queue_rejects; @@ -108,7 +113,8 @@ end_per_testcase(coa_rejects_on_dead_letter = Testcase, Config) -> clean_acks_mailbox(), rabbit_ct_helpers:testcase_finished(Config, Testcase); end_per_testcase(Testcase, Config) - when Testcase == coa_confirms_on_ack; + when Testcase == coa_argument_not_supported; + Testcase == coa_confirms_on_ack; Testcase == coa_confirms_on_no_ack; Testcase == coa_policy_resets_to_enqueue; Testcase == coa_rejects_on_nack_without_requeue; @@ -126,6 +132,11 @@ end_per_testcase(Testcase, Config) clean_consume_mailbox(), clean_acks_mailbox(), rabbit_ct_helpers:testcase_finished(Config, Testcase); +end_per_testcase(coa_ha_mode_policies_conflict = Testcase, Config) -> + TestNameBin = <<"coa_ha_mode_policies_conflict">>, + rabbit_ct_broker_helpers:clear_policy(Config, 0, <<TestNameBin/binary, "_invalid_policy">>), + rabbit_ct_broker_helpers:clear_policy(Config, 0, <<TestNameBin/binary, "_valid_policy">>), + rabbit_ct_helpers:testcase_finished(Config, Testcase); end_per_testcase(policy_resets_to_default = Testcase, Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), XOverflow = ?config(overflow, Config), @@ -367,6 +378,37 @@ policy_resets_to_default(Config) -> _ -> ok end. +coa_argument_not_supported(Config) -> + Conn = ?config(conn, Config), + + {ok, Ch} = amqp_connection:open_channel(Conn), + + QueueName = <<"coa_policy_resets_to_enqueue">>, + Declare = #'queue.declare'{queue = QueueName, + durable = true, + arguments = [{<<"x-confirm-on">>, longstr, <<"ack">>}]}, + try amqp_channel:call(Ch, Declare) of + _ -> exit(expected_to_exit) + catch + exit:{{shutdown, {server_initiated_close, Code, _}},_} -> + ?PRECONDITION_FAILED = Code + end. + +coa_ha_mode_policies_conflict(Config) -> + TestName = <<"coa_ha_mode_policies_conflict">>, + InvalidPolicy = [{<<"ha-mode">>, <<"all">>}, + {<<"confirm-on">>, <<"ack">>}], + {error_string, _} = rabbit_ct_broker_helpers:rpc( + Config, 0, rabbit_policy, set, + [<<"/">>, <<TestName/binary, "_invalid_policy">>, TestName, + InvalidPolicy, 0, <<"queues">>, <<"acting-user">>]), + ValidPolicy = [{<<"confirm-on">>, <<"ack">>}], + ok = rabbit_ct_broker_helpers:rpc( + Config, 0, rabbit_policy, set, + [<<"/">>, <<TestName/binary, "_valid_policy">>, TestName, + ValidPolicy, 0, <<"queues">>, <<"acting-user">>]). + + coa_policy_resets_to_enqueue(Config) -> Conn = ?config(conn, Config), |