diff options
| author | dcorbacho <dparracorbacho@piotal.io> | 2020-01-14 15:19:28 +0100 |
|---|---|---|
| committer | dcorbacho <dparracorbacho@piotal.io> | 2020-01-14 15:50:35 +0100 |
| commit | f3b27c75a50bc1357f6eec7f8b1359083e9c51ad (patch) | |
| tree | 9a899d2612fa52623257c9b050d4b0b87ed07e4e | |
| parent | 27212242463cfc3327aafd3010421f769afa7be8 (diff) | |
| download | rabbitmq-server-git-f3b27c75a50bc1357f6eec7f8b1359083e9c51ad.tar.gz | |
Filter policies that cannot be applied to quorum queues
Some policies, such as highly available, do not apply to all types of queues.
Even though quorum queues ignores some policies, they're still listed as
an applied policy on this type of queue. This commit ignores filters these
policies when applied, so they'll never be listed on the wrong type of queue.
[#169811193]
| -rw-r--r-- | src/rabbit_amqqueue.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_policy.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 10 | ||||
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 22 |
5 files changed, 56 insertions, 2 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 5ff2e09636..50f804bc14 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -60,6 +60,8 @@ -export([rebalance/3]). -export([collect_info_all/2]). +-export([is_policy_applicable/2]). + %% internal -export([internal_declare/2, internal_delete/2, run_backing_queue/3, set_ram_duration_target/2, set_maximum_since_use/2, @@ -459,6 +461,17 @@ policy_changed(Q1, Q2) -> %% mirroring-related has changed - the policy may have changed anyway. notify_policy_changed(Q1). +is_policy_applicable(QName, Policy) -> + case lookup(QName) of + {ok, Q} when ?amqqueue_is_quorum(Q) -> + rabbit_quorum_queue:is_policy_applicable(Q, Policy); + {ok, Q} when ?amqqueue_is_classic(Q) -> + rabbit_amqqueue_process:is_policy_applicable(Q, Policy); + _ -> + %% Defaults to previous behaviour. Apply always + true + end. + -spec lookup (name()) -> rabbit_types:ok(amqqueue:amqqueue()) | diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 7bf8308614..36eea86bb5 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -33,6 +33,7 @@ handle_info/2, handle_pre_hibernate/1, prioritise_call/4, prioritise_cast/3, prioritise_info/3, format_message_queue/2]). -export([format/1]). +-export([is_policy_applicable/2]). %% Queue's state -record(q, { @@ -1783,6 +1784,10 @@ format(Q) when ?is_amqqueue(Q) -> {node, node(amqqueue:get_pid(Q))}] end. +-spec is_policy_applicable(amqqueue:amqqueue(), any()) -> boolean(). +is_policy_applicable(_Q, _Policy) -> + true. + log_delete_exclusive({ConPid, _ConRef}, State) -> log_delete_exclusive(ConPid, State); log_delete_exclusive(ConPid, #q{ q = Q }) -> diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl index 7b1438ee7a..91102fd91a 100644 --- a/src/rabbit_policy.erl +++ b/src/rabbit_policy.erl @@ -463,8 +463,9 @@ match(Name, Policies) -> match_all(Name, Policies) -> lists:sort(fun sort_pred/2, [P || P <- Policies, matches(Name, P)]). -matches(#resource{name = Name, kind = Kind, virtual_host = VHost}, Policy) -> +matches(#resource{name = Name, kind = Kind, virtual_host = VHost} = Resource, Policy) -> matches_type(Kind, pget('apply-to', Policy)) andalso + is_applicable(Resource, pget(definition, Policy)) andalso match =:= re:run(Name, pget(pattern, Policy), [{capture, none}]) andalso VHost =:= pget(vhost, Policy). @@ -476,6 +477,11 @@ matches_type(_, _) -> false. sort_pred(A, B) -> pget(priority, A) >= pget(priority, B). +is_applicable(#resource{kind = queue} = Resource, Policy) -> + rabbit_amqqueue:is_policy_applicable(Resource, Policy); +is_applicable(_, _) -> + true. + %%---------------------------------------------------------------------------- operator_policy_validation() -> diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index e43e61db28..b8df6a3028 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -45,6 +45,7 @@ -export([list_with_minimum_quorum/0, list_with_minimum_quorum_for_cli/0, filter_quorum_critical/1, filter_quorum_critical/2, all_replica_states/0]). +-export([is_policy_applicable/2]). -include_lib("stdlib/include/qlc.hrl"). -include("rabbit.hrl"). @@ -299,6 +300,15 @@ filter_quorum_critical(Queues, ReplicaStates) -> length(AllUp) =< MinQuorum end, Queues). +-spec is_policy_applicable(amqqueue:amqqueue(), any()) -> boolean(). +is_policy_applicable(_Q, Policy) -> + Applicable = [<<"max-length">>, <<"max-length-bytes">>, <<"max-in-memory-length">>, + <<"max-in-memory-bytes">>, <<"delivery-limit">>, <<"dead-letter-exchange">>, + <<"dead-letter-routing-key">>], + lists:all(fun({P, _}) -> + lists:member(P, Applicable) + end, Policy). + rpc_delete_metrics(QName) -> ets:delete(queue_coarse_metrics, QName), ets:delete(queue_metrics, QName), diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 3a872ff607..456bd31fb8 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -132,7 +132,8 @@ all_tests() -> queue_length_in_memory_bytes_limit, queue_length_in_memory_purge, in_memory, - consumer_metrics + consumer_metrics, + invalid_policy ]. memory_tests() -> @@ -810,6 +811,25 @@ dead_letter_policy(Config) -> ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"dlx">>), test_dead_lettering(false, Config, Ch, Servers, RaName, QQ, CQ). +invalid_policy(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + ok = rabbit_ct_broker_helpers:set_policy( + Config, 0, <<"ha">>, <<"invalid_policy.*">>, <<"queues">>, + [{<<"ha-mode">>, <<"all">>}]), + ok = rabbit_ct_broker_helpers:set_policy( + Config, 0, <<"ttl">>, <<"invalid_policy.*">>, <<"queues">>, + [{<<"message-ttl">>, 5}]), + Info = rpc:call(Server, rabbit_quorum_queue, infos, + [rabbit_misc:r(<<"/">>, queue, QQ)]), + ?assertEqual('', proplists:get_value(policy, Info)), + ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"ha">>), + ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"ttl">>). + dead_letter_to_quorum_queue(Config) -> [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), |
