diff options
| author | Michael Klishin <mklishin@pivotal.io> | 2020-01-15 04:06:52 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-01-15 04:06:52 +0300 |
| commit | 04700dde4a79ab6501b282d21b9bc3c876e732db (patch) | |
| tree | 2009d515f7d13460d1005386ad86ca2e84f5d36f | |
| parent | e4fdc7d6c657752b6e0df8f73d4c14159c292065 (diff) | |
| parent | f3b27c75a50bc1357f6eec7f8b1359083e9c51ad (diff) | |
| download | rabbitmq-server-git-04700dde4a79ab6501b282d21b9bc3c876e732db.tar.gz | |
Merge pull request #2210 from rabbitmq/filter-applicable-policies
Filter policies that cannot be applied to quorum queues
| -rw-r--r-- | src/rabbit_amqqueue.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_policy.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 10 | ||||
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 22 |
5 files changed, 56 insertions, 2 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 5ff2e09636..50f804bc14 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -60,6 +60,8 @@ -export([rebalance/3]). -export([collect_info_all/2]). +-export([is_policy_applicable/2]). + %% internal -export([internal_declare/2, internal_delete/2, run_backing_queue/3, set_ram_duration_target/2, set_maximum_since_use/2, @@ -459,6 +461,17 @@ policy_changed(Q1, Q2) -> %% mirroring-related has changed - the policy may have changed anyway. notify_policy_changed(Q1). +is_policy_applicable(QName, Policy) -> + case lookup(QName) of + {ok, Q} when ?amqqueue_is_quorum(Q) -> + rabbit_quorum_queue:is_policy_applicable(Q, Policy); + {ok, Q} when ?amqqueue_is_classic(Q) -> + rabbit_amqqueue_process:is_policy_applicable(Q, Policy); + _ -> + %% Defaults to previous behaviour. Apply always + true + end. + -spec lookup (name()) -> rabbit_types:ok(amqqueue:amqqueue()) | diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 7bf8308614..36eea86bb5 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -33,6 +33,7 @@ handle_info/2, handle_pre_hibernate/1, prioritise_call/4, prioritise_cast/3, prioritise_info/3, format_message_queue/2]). -export([format/1]). +-export([is_policy_applicable/2]). %% Queue's state -record(q, { @@ -1783,6 +1784,10 @@ format(Q) when ?is_amqqueue(Q) -> {node, node(amqqueue:get_pid(Q))}] end. +-spec is_policy_applicable(amqqueue:amqqueue(), any()) -> boolean(). +is_policy_applicable(_Q, _Policy) -> + true. + log_delete_exclusive({ConPid, _ConRef}, State) -> log_delete_exclusive(ConPid, State); log_delete_exclusive(ConPid, #q{ q = Q }) -> diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl index 7b1438ee7a..91102fd91a 100644 --- a/src/rabbit_policy.erl +++ b/src/rabbit_policy.erl @@ -463,8 +463,9 @@ match(Name, Policies) -> match_all(Name, Policies) -> lists:sort(fun sort_pred/2, [P || P <- Policies, matches(Name, P)]). -matches(#resource{name = Name, kind = Kind, virtual_host = VHost}, Policy) -> +matches(#resource{name = Name, kind = Kind, virtual_host = VHost} = Resource, Policy) -> matches_type(Kind, pget('apply-to', Policy)) andalso + is_applicable(Resource, pget(definition, Policy)) andalso match =:= re:run(Name, pget(pattern, Policy), [{capture, none}]) andalso VHost =:= pget(vhost, Policy). @@ -476,6 +477,11 @@ matches_type(_, _) -> false. sort_pred(A, B) -> pget(priority, A) >= pget(priority, B). +is_applicable(#resource{kind = queue} = Resource, Policy) -> + rabbit_amqqueue:is_policy_applicable(Resource, Policy); +is_applicable(_, _) -> + true. + %%---------------------------------------------------------------------------- operator_policy_validation() -> diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index e43e61db28..b8df6a3028 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -45,6 +45,7 @@ -export([list_with_minimum_quorum/0, list_with_minimum_quorum_for_cli/0, filter_quorum_critical/1, filter_quorum_critical/2, all_replica_states/0]). +-export([is_policy_applicable/2]). -include_lib("stdlib/include/qlc.hrl"). -include("rabbit.hrl"). @@ -299,6 +300,15 @@ filter_quorum_critical(Queues, ReplicaStates) -> length(AllUp) =< MinQuorum end, Queues). +-spec is_policy_applicable(amqqueue:amqqueue(), any()) -> boolean(). +is_policy_applicable(_Q, Policy) -> + Applicable = [<<"max-length">>, <<"max-length-bytes">>, <<"max-in-memory-length">>, + <<"max-in-memory-bytes">>, <<"delivery-limit">>, <<"dead-letter-exchange">>, + <<"dead-letter-routing-key">>], + lists:all(fun({P, _}) -> + lists:member(P, Applicable) + end, Policy). + rpc_delete_metrics(QName) -> ets:delete(queue_coarse_metrics, QName), ets:delete(queue_metrics, QName), diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 3a872ff607..456bd31fb8 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -132,7 +132,8 @@ all_tests() -> queue_length_in_memory_bytes_limit, queue_length_in_memory_purge, in_memory, - consumer_metrics + consumer_metrics, + invalid_policy ]. memory_tests() -> @@ -810,6 +811,25 @@ dead_letter_policy(Config) -> ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"dlx">>), test_dead_lettering(false, Config, Ch, Servers, RaName, QQ, CQ). +invalid_policy(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + ok = rabbit_ct_broker_helpers:set_policy( + Config, 0, <<"ha">>, <<"invalid_policy.*">>, <<"queues">>, + [{<<"ha-mode">>, <<"all">>}]), + ok = rabbit_ct_broker_helpers:set_policy( + Config, 0, <<"ttl">>, <<"invalid_policy.*">>, <<"queues">>, + [{<<"message-ttl">>, 5}]), + Info = rpc:call(Server, rabbit_quorum_queue, infos, + [rabbit_misc:r(<<"/">>, queue, QQ)]), + ?assertEqual('', proplists:get_value(policy, Info)), + ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"ha">>), + ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"ttl">>). + dead_letter_to_quorum_queue(Config) -> [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), |
