summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniil Fedotov <hairyhum@gmail.com>2019-06-13 15:54:56 -0400
committerDaniil Fedotov <hairyhum@gmail.com>2019-06-13 15:54:56 -0400
commit6ed9b4f02fd0e2e5ecf4d9a02ac74d047028be47 (patch)
tree39dd44d7c454976fbbde232cf6a4d11fc85bf9f0
parent4edbdfc4ea276cf276608f987dc30b3311560832 (diff)
downloadrabbitmq-server-git-confirm_on_ack.tar.gz
Only support confirm-on as a policy. Make it conflict with ha-mode policyconfirm_on_ack
A new registry class: `policy_conflicts` with a new validator: `validate_policy_conflicts` This validator checks multiple policy keys to see if they conflict after all of them were validated independently. Currently only implement for confirm-on and ha-mode.
-rw-r--r--src/rabbit_amqqueue.erl11
-rw-r--r--src/rabbit_amqqueue_process.erl25
-rw-r--r--src/rabbit_policies.erl20
-rw-r--r--src/rabbit_policy.erl30
-rw-r--r--test/confirms_rejects_SUITE.erl46
5 files changed, 112 insertions, 20 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index d975d4d9a9..2743b005a1 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -694,7 +694,7 @@ declare_args() ->
{<<"x-single-active-consumer">>, fun check_single_active_consumer_arg/2},
{<<"x-queue-type">>, fun check_queue_type/2},
{<<"x-quorum-initial-group-size">>, fun check_default_quorum_initial_group_size_arg/2},
- {<<"x-confirm-on">>, fun check_confirm_on/2}].
+ {<<"x-confirm-on">>, fun confirm_on_not_supported/2}].
consume_args() -> [{<<"x-priority">>, fun check_int_arg/2},
{<<"x-cancel-on-ha-failover">>, fun check_bool_arg/2}].
@@ -787,13 +787,8 @@ check_queue_type({longstr, Val}, _Args) ->
check_queue_type({Type, _}, _Args) ->
{error, {unacceptable_type, Type}}.
-check_confirm_on({longstr, Val}, _Args) ->
- case lists:member(Val, [<<"enqueue">>, <<"ack">>]) of
- true -> ok;
- false -> {error, invalid_confirm_on}
- end;
-check_confirm_on({Type, _}, _Args) ->
- {error, {unacceptable_type, Type}}.
+confirm_on_not_supported(_, _Args) ->
+ {error, can_only_be_set_as_policy}.
-spec list() -> [amqqueue:amqqueue()].
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index e2d857b507..60ca888c6a 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -425,9 +425,10 @@ bq_init(BQ, Q, Recover) ->
rabbit_amqqueue:run_backing_queue(Self, Mod, Fun)
end).
-process_args_policy(State = #q{q = Q,
+process_args_policy(State0 = #q{q = Q,
args_policy_version = N}) ->
- ArgsTable =
+
+ ArgsTable =
[{<<"expires">>, fun res_min/2, fun init_exp/2},
{<<"dead-letter-exchange">>, fun res_arg/2, fun init_dlx/2},
{<<"dead-letter-routing-key">>, fun res_arg/2, fun init_dlx_rkey/2},
@@ -435,12 +436,20 @@ process_args_policy(State = #q{q = Q,
{<<"max-length">>, fun res_min/2, fun init_max_length/2},
{<<"max-length-bytes">>, fun res_min/2, fun init_max_bytes/2},
{<<"overflow">>, fun res_arg/2, fun init_overflow/2},
- {<<"queue-mode">>, fun res_arg/2, fun init_queue_mode/2},
- {<<"confirm-on">>, fun res_arg/2, fun init_confirm_on/2}],
- drop_expired_msgs(
- lists:foldl(fun({Name, Resolve, Fun}, StateN) ->
- Fun(args_policy_lookup(Name, Resolve, Q), StateN)
- end, State#q{args_policy_version = N + 1}, ArgsTable)).
+ {<<"queue-mode">>, fun res_arg/2, fun init_queue_mode/2}],
+ State1 = lists:foldl(fun({Name, Resolve, Fun}, StateN) ->
+ Fun(args_policy_lookup(Name, Resolve, Q), StateN)
+ end,
+ State0#q{args_policy_version = N + 1},
+ ArgsTable),
+ %% Policies which cannot be set as x- args
+ PoliciesTable = [{<<"confirm-on">>, fun init_confirm_on/2}],
+ State2 = lists:foldl(fun({Name, Fun}, StateN) ->
+ Fun(rabbit_policy:get(Name, Q), StateN)
+ end,
+ State1,
+ PoliciesTable),
+ drop_expired_msgs(State2).
args_policy_lookup(Name, Resolve, Q) ->
Args = amqqueue:get_arguments(Q),
diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl
index 5dcfc4f980..329d489ad1 100644
--- a/src/rabbit_policies.erl
+++ b/src/rabbit_policies.erl
@@ -20,11 +20,15 @@
%% validation functions.
-behaviour(rabbit_policy_validator).
+-behaviour(rabbit_policy_conflict_validator).
-behaviour(rabbit_policy_merge_strategy).
-include("rabbit.hrl").
--export([register/0, validate_policy/1, merge_policy_value/3]).
+-export([register/0,
+ validate_policy/1,
+ merge_policy_value/3,
+ validate_policy_conflicts/1]).
-rabbit_boot_step({?MODULE,
[{description, "internal policies"},
@@ -47,6 +51,8 @@ register() ->
{policy_validator, <<"overflow">>},
{policy_validator, <<"delivery-limit">>},
{policy_validator, <<"confirm-on">>},
+ {policy_conflicts, <<"confirm-on">>},
+ {policy_conflicts, <<"ha-mode">>},
{operator_policy_validator, <<"expires">>},
{operator_policy_validator, <<"message-ttl">>},
{operator_policy_validator, <<"max-length">>},
@@ -68,6 +74,18 @@ validate_policy(Terms) ->
(_, Error) -> Error
end, ok, Terms).
+validate_policy_conflicts(Terms) ->
+ ConfirmOn = proplists:get_value(<<"confirm-on">>, Terms, undefined),
+ HaMode = proplists:get_value(<<"ha-mode">>, Terms, undefined),
+ case {ConfirmOn, HaMode} of
+ {<<"ack">>, undefined} ->
+ ok;
+ {<<"ack">>, _} ->
+ {error, "Confirm on ack is incompatible with HA mode", []};
+ _ ->
+ ok
+ end.
+
validate_policy0(<<"alternate-exchange">>, Value)
when is_binary(Value) ->
ok;
diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl
index ff07bfa8ee..00c4d9d900 100644
--- a/src/rabbit_policy.erl
+++ b/src/rabbit_policy.erl
@@ -494,7 +494,35 @@ validation_op(Name, Terms) ->
validation(Name, Terms, operator_policy_validator).
validation(Name, Terms) ->
- validation(Name, Terms, policy_validator).
+ conflict_validation(Terms, policy_conflicts,
+ validation(Name, Terms, policy_validator)).
+
+conflict_validation(Terms, Validator, ok) ->
+ Registry = rabbit_registry:lookup_all(Validator),
+ {RegistryKeys, _} = lists:unzip(Registry),
+ [] = dups(RegistryKeys), %% ASSERTION
+
+ ModuleToKeys = lists:foldl(
+ fun({Key, Module}, ModuleToKeys) ->
+ maps:update_with(Module,
+ fun(Keys) -> [Key | Keys] end,
+ [Key],
+ ModuleToKeys)
+ end,
+ #{},
+ Registry),
+
+ maps:fold(
+ fun (Module, Keys, ok) ->
+ Policies = [Term
+ || Term = {Key, _} <- Terms,
+ lists:member(binary_to_atom(Key, utf8), Keys)],
+ Module:validate_policy_conflicts(Policies);
+ (_Module, _Keys, Error) ->
+ Error
+ end,
+ ok,
+ ModuleToKeys).
validation(_Name, [], _Validator) ->
{error, "no policy provided", []};
diff --git a/test/confirms_rejects_SUITE.erl b/test/confirms_rejects_SUITE.erl
index 0842490fde..d02a2a5624 100644
--- a/test/confirms_rejects_SUITE.erl
+++ b/test/confirms_rejects_SUITE.erl
@@ -22,7 +22,9 @@ groups() ->
dead_queue_rejects,
mixed_dead_alive_queues_reject,
{confirm_on_ack, [],
- [coa_policy_resets_to_enqueue,
+ [coa_argument_not_supported,
+ coa_ha_mode_policies_conflict,
+ coa_policy_resets_to_enqueue,
coa_confirms_on_ack,
coa_rejects_on_nack_without_requeue,
coa_confirms_on_no_ack,
@@ -72,6 +74,7 @@ end_per_group(_Group, Config) ->
init_per_testcase(Testcase, Config)
when Testcase == policy_resets_to_default;
+ Testcase == coa_argument_not_supported;
Testcase == coa_policy_resets_to_enqueue;
Testcase == coa_confirms_on_no_ack;
Testcase == coa_confirms_on_ack;
@@ -83,6 +86,8 @@ init_per_testcase(Testcase, Config)
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
rabbit_ct_helpers:testcase_started(
rabbit_ct_helpers:set_config(Config, [{conn, Conn}]), Testcase);
+init_per_testcase(coa_ha_mode_policies_conflict = Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase);
init_per_testcase(Testcase, Config)
when Testcase == confirms_rejects_conflict;
Testcase == dead_queue_rejects;
@@ -108,7 +113,8 @@ end_per_testcase(coa_rejects_on_dead_letter = Testcase, Config) ->
clean_acks_mailbox(),
rabbit_ct_helpers:testcase_finished(Config, Testcase);
end_per_testcase(Testcase, Config)
- when Testcase == coa_confirms_on_ack;
+ when Testcase == coa_argument_not_supported;
+ Testcase == coa_confirms_on_ack;
Testcase == coa_confirms_on_no_ack;
Testcase == coa_policy_resets_to_enqueue;
Testcase == coa_rejects_on_nack_without_requeue;
@@ -126,6 +132,11 @@ end_per_testcase(Testcase, Config)
clean_consume_mailbox(),
clean_acks_mailbox(),
rabbit_ct_helpers:testcase_finished(Config, Testcase);
+end_per_testcase(coa_ha_mode_policies_conflict = Testcase, Config) ->
+ TestNameBin = <<"coa_ha_mode_policies_conflict">>,
+ rabbit_ct_broker_helpers:clear_policy(Config, 0, <<TestNameBin/binary, "_invalid_policy">>),
+ rabbit_ct_broker_helpers:clear_policy(Config, 0, <<TestNameBin/binary, "_valid_policy">>),
+ rabbit_ct_helpers:testcase_finished(Config, Testcase);
end_per_testcase(policy_resets_to_default = Testcase, Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
XOverflow = ?config(overflow, Config),
@@ -367,6 +378,37 @@ policy_resets_to_default(Config) ->
_ -> ok
end.
+coa_argument_not_supported(Config) ->
+ Conn = ?config(conn, Config),
+
+ {ok, Ch} = amqp_connection:open_channel(Conn),
+
+ QueueName = <<"coa_policy_resets_to_enqueue">>,
+ Declare = #'queue.declare'{queue = QueueName,
+ durable = true,
+ arguments = [{<<"x-confirm-on">>, longstr, <<"ack">>}]},
+ try amqp_channel:call(Ch, Declare) of
+ _ -> exit(expected_to_exit)
+ catch
+ exit:{{shutdown, {server_initiated_close, Code, _}},_} ->
+ ?PRECONDITION_FAILED = Code
+ end.
+
+coa_ha_mode_policies_conflict(Config) ->
+ TestName = <<"coa_ha_mode_policies_conflict">>,
+ InvalidPolicy = [{<<"ha-mode">>, <<"all">>},
+ {<<"confirm-on">>, <<"ack">>}],
+ {error_string, _} = rabbit_ct_broker_helpers:rpc(
+ Config, 0, rabbit_policy, set,
+ [<<"/">>, <<TestName/binary, "_invalid_policy">>, TestName,
+ InvalidPolicy, 0, <<"queues">>, <<"acting-user">>]),
+ ValidPolicy = [{<<"confirm-on">>, <<"ack">>}],
+ ok = rabbit_ct_broker_helpers:rpc(
+ Config, 0, rabbit_policy, set,
+ [<<"/">>, <<TestName/binary, "_valid_policy">>, TestName,
+ ValidPolicy, 0, <<"queues">>, <<"acting-user">>]).
+
+
coa_policy_resets_to_enqueue(Config) ->
Conn = ?config(conn, Config),