summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordcorbacho <dparracorbacho@piotal.io>2020-01-14 15:19:28 +0100
committerdcorbacho <dparracorbacho@piotal.io>2020-01-14 15:50:35 +0100
commitf3b27c75a50bc1357f6eec7f8b1359083e9c51ad (patch)
tree9a899d2612fa52623257c9b050d4b0b87ed07e4e
parent27212242463cfc3327aafd3010421f769afa7be8 (diff)
downloadrabbitmq-server-git-f3b27c75a50bc1357f6eec7f8b1359083e9c51ad.tar.gz
Filter policies that cannot be applied to quorum queues
Some policies, such as highly available, do not apply to all types of queues. Even though quorum queues ignores some policies, they're still listed as an applied policy on this type of queue. This commit ignores filters these policies when applied, so they'll never be listed on the wrong type of queue. [#169811193]
-rw-r--r--src/rabbit_amqqueue.erl13
-rw-r--r--src/rabbit_amqqueue_process.erl5
-rw-r--r--src/rabbit_policy.erl8
-rw-r--r--src/rabbit_quorum_queue.erl10
-rw-r--r--test/quorum_queue_SUITE.erl22
5 files changed, 56 insertions, 2 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 5ff2e09636..50f804bc14 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -60,6 +60,8 @@
-export([rebalance/3]).
-export([collect_info_all/2]).
+-export([is_policy_applicable/2]).
+
%% internal
-export([internal_declare/2, internal_delete/2, run_backing_queue/3,
set_ram_duration_target/2, set_maximum_since_use/2,
@@ -459,6 +461,17 @@ policy_changed(Q1, Q2) ->
%% mirroring-related has changed - the policy may have changed anyway.
notify_policy_changed(Q1).
+is_policy_applicable(QName, Policy) ->
+ case lookup(QName) of
+ {ok, Q} when ?amqqueue_is_quorum(Q) ->
+ rabbit_quorum_queue:is_policy_applicable(Q, Policy);
+ {ok, Q} when ?amqqueue_is_classic(Q) ->
+ rabbit_amqqueue_process:is_policy_applicable(Q, Policy);
+ _ ->
+ %% Defaults to previous behaviour. Apply always
+ true
+ end.
+
-spec lookup
(name()) ->
rabbit_types:ok(amqqueue:amqqueue()) |
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 7bf8308614..36eea86bb5 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -33,6 +33,7 @@
handle_info/2, handle_pre_hibernate/1, prioritise_call/4,
prioritise_cast/3, prioritise_info/3, format_message_queue/2]).
-export([format/1]).
+-export([is_policy_applicable/2]).
%% Queue's state
-record(q, {
@@ -1783,6 +1784,10 @@ format(Q) when ?is_amqqueue(Q) ->
{node, node(amqqueue:get_pid(Q))}]
end.
+-spec is_policy_applicable(amqqueue:amqqueue(), any()) -> boolean().
+is_policy_applicable(_Q, _Policy) ->
+ true.
+
log_delete_exclusive({ConPid, _ConRef}, State) ->
log_delete_exclusive(ConPid, State);
log_delete_exclusive(ConPid, #q{ q = Q }) ->
diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl
index 7b1438ee7a..91102fd91a 100644
--- a/src/rabbit_policy.erl
+++ b/src/rabbit_policy.erl
@@ -463,8 +463,9 @@ match(Name, Policies) ->
match_all(Name, Policies) ->
lists:sort(fun sort_pred/2, [P || P <- Policies, matches(Name, P)]).
-matches(#resource{name = Name, kind = Kind, virtual_host = VHost}, Policy) ->
+matches(#resource{name = Name, kind = Kind, virtual_host = VHost} = Resource, Policy) ->
matches_type(Kind, pget('apply-to', Policy)) andalso
+ is_applicable(Resource, pget(definition, Policy)) andalso
match =:= re:run(Name, pget(pattern, Policy), [{capture, none}]) andalso
VHost =:= pget(vhost, Policy).
@@ -476,6 +477,11 @@ matches_type(_, _) -> false.
sort_pred(A, B) -> pget(priority, A) >= pget(priority, B).
+is_applicable(#resource{kind = queue} = Resource, Policy) ->
+ rabbit_amqqueue:is_policy_applicable(Resource, Policy);
+is_applicable(_, _) ->
+ true.
+
%%----------------------------------------------------------------------------
operator_policy_validation() ->
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index e43e61db28..b8df6a3028 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -45,6 +45,7 @@
-export([list_with_minimum_quorum/0, list_with_minimum_quorum_for_cli/0,
filter_quorum_critical/1, filter_quorum_critical/2,
all_replica_states/0]).
+-export([is_policy_applicable/2]).
-include_lib("stdlib/include/qlc.hrl").
-include("rabbit.hrl").
@@ -299,6 +300,15 @@ filter_quorum_critical(Queues, ReplicaStates) ->
length(AllUp) =< MinQuorum
end, Queues).
+-spec is_policy_applicable(amqqueue:amqqueue(), any()) -> boolean().
+is_policy_applicable(_Q, Policy) ->
+ Applicable = [<<"max-length">>, <<"max-length-bytes">>, <<"max-in-memory-length">>,
+ <<"max-in-memory-bytes">>, <<"delivery-limit">>, <<"dead-letter-exchange">>,
+ <<"dead-letter-routing-key">>],
+ lists:all(fun({P, _}) ->
+ lists:member(P, Applicable)
+ end, Policy).
+
rpc_delete_metrics(QName) ->
ets:delete(queue_coarse_metrics, QName),
ets:delete(queue_metrics, QName),
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 3a872ff607..456bd31fb8 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -132,7 +132,8 @@ all_tests() ->
queue_length_in_memory_bytes_limit,
queue_length_in_memory_purge,
in_memory,
- consumer_metrics
+ consumer_metrics,
+ invalid_policy
].
memory_tests() ->
@@ -810,6 +811,25 @@ dead_letter_policy(Config) ->
ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"dlx">>),
test_dead_lettering(false, Config, Ch, Servers, RaName, QQ, CQ).
+invalid_policy(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ ok = rabbit_ct_broker_helpers:set_policy(
+ Config, 0, <<"ha">>, <<"invalid_policy.*">>, <<"queues">>,
+ [{<<"ha-mode">>, <<"all">>}]),
+ ok = rabbit_ct_broker_helpers:set_policy(
+ Config, 0, <<"ttl">>, <<"invalid_policy.*">>, <<"queues">>,
+ [{<<"message-ttl">>, 5}]),
+ Info = rpc:call(Server, rabbit_quorum_queue, infos,
+ [rabbit_misc:r(<<"/">>, queue, QQ)]),
+ ?assertEqual('', proplists:get_value(policy, Info)),
+ ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"ha">>),
+ ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"ttl">>).
+
dead_letter_to_quorum_queue(Config) ->
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),