summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue.erl13
-rw-r--r--src/rabbit_amqqueue_process.erl5
-rw-r--r--src/rabbit_policy.erl8
-rw-r--r--src/rabbit_quorum_queue.erl10
-rw-r--r--test/quorum_queue_SUITE.erl22
5 files changed, 56 insertions, 2 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 5ff2e09636..50f804bc14 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -60,6 +60,8 @@
-export([rebalance/3]).
-export([collect_info_all/2]).
+-export([is_policy_applicable/2]).
+
%% internal
-export([internal_declare/2, internal_delete/2, run_backing_queue/3,
set_ram_duration_target/2, set_maximum_since_use/2,
@@ -459,6 +461,17 @@ policy_changed(Q1, Q2) ->
%% mirroring-related has changed - the policy may have changed anyway.
notify_policy_changed(Q1).
+is_policy_applicable(QName, Policy) ->
+ case lookup(QName) of
+ {ok, Q} when ?amqqueue_is_quorum(Q) ->
+ rabbit_quorum_queue:is_policy_applicable(Q, Policy);
+ {ok, Q} when ?amqqueue_is_classic(Q) ->
+ rabbit_amqqueue_process:is_policy_applicable(Q, Policy);
+ _ ->
+ %% Defaults to previous behaviour. Apply always
+ true
+ end.
+
-spec lookup
(name()) ->
rabbit_types:ok(amqqueue:amqqueue()) |
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 7bf8308614..36eea86bb5 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -33,6 +33,7 @@
handle_info/2, handle_pre_hibernate/1, prioritise_call/4,
prioritise_cast/3, prioritise_info/3, format_message_queue/2]).
-export([format/1]).
+-export([is_policy_applicable/2]).
%% Queue's state
-record(q, {
@@ -1783,6 +1784,10 @@ format(Q) when ?is_amqqueue(Q) ->
{node, node(amqqueue:get_pid(Q))}]
end.
+-spec is_policy_applicable(amqqueue:amqqueue(), any()) -> boolean().
+is_policy_applicable(_Q, _Policy) ->
+ true.
+
log_delete_exclusive({ConPid, _ConRef}, State) ->
log_delete_exclusive(ConPid, State);
log_delete_exclusive(ConPid, #q{ q = Q }) ->
diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl
index 7b1438ee7a..91102fd91a 100644
--- a/src/rabbit_policy.erl
+++ b/src/rabbit_policy.erl
@@ -463,8 +463,9 @@ match(Name, Policies) ->
match_all(Name, Policies) ->
lists:sort(fun sort_pred/2, [P || P <- Policies, matches(Name, P)]).
-matches(#resource{name = Name, kind = Kind, virtual_host = VHost}, Policy) ->
+matches(#resource{name = Name, kind = Kind, virtual_host = VHost} = Resource, Policy) ->
matches_type(Kind, pget('apply-to', Policy)) andalso
+ is_applicable(Resource, pget(definition, Policy)) andalso
match =:= re:run(Name, pget(pattern, Policy), [{capture, none}]) andalso
VHost =:= pget(vhost, Policy).
@@ -476,6 +477,11 @@ matches_type(_, _) -> false.
sort_pred(A, B) -> pget(priority, A) >= pget(priority, B).
+is_applicable(#resource{kind = queue} = Resource, Policy) ->
+ rabbit_amqqueue:is_policy_applicable(Resource, Policy);
+is_applicable(_, _) ->
+ true.
+
%%----------------------------------------------------------------------------
operator_policy_validation() ->
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index e43e61db28..b8df6a3028 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -45,6 +45,7 @@
-export([list_with_minimum_quorum/0, list_with_minimum_quorum_for_cli/0,
filter_quorum_critical/1, filter_quorum_critical/2,
all_replica_states/0]).
+-export([is_policy_applicable/2]).
-include_lib("stdlib/include/qlc.hrl").
-include("rabbit.hrl").
@@ -299,6 +300,15 @@ filter_quorum_critical(Queues, ReplicaStates) ->
length(AllUp) =< MinQuorum
end, Queues).
+-spec is_policy_applicable(amqqueue:amqqueue(), any()) -> boolean().
+is_policy_applicable(_Q, Policy) ->
+ Applicable = [<<"max-length">>, <<"max-length-bytes">>, <<"max-in-memory-length">>,
+ <<"max-in-memory-bytes">>, <<"delivery-limit">>, <<"dead-letter-exchange">>,
+ <<"dead-letter-routing-key">>],
+ lists:all(fun({P, _}) ->
+ lists:member(P, Applicable)
+ end, Policy).
+
rpc_delete_metrics(QName) ->
ets:delete(queue_coarse_metrics, QName),
ets:delete(queue_metrics, QName),
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 3a872ff607..456bd31fb8 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -132,7 +132,8 @@ all_tests() ->
queue_length_in_memory_bytes_limit,
queue_length_in_memory_purge,
in_memory,
- consumer_metrics
+ consumer_metrics,
+ invalid_policy
].
memory_tests() ->
@@ -810,6 +811,25 @@ dead_letter_policy(Config) ->
ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"dlx">>),
test_dead_lettering(false, Config, Ch, Servers, RaName, QQ, CQ).
+invalid_policy(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ ok = rabbit_ct_broker_helpers:set_policy(
+ Config, 0, <<"ha">>, <<"invalid_policy.*">>, <<"queues">>,
+ [{<<"ha-mode">>, <<"all">>}]),
+ ok = rabbit_ct_broker_helpers:set_policy(
+ Config, 0, <<"ttl">>, <<"invalid_policy.*">>, <<"queues">>,
+ [{<<"message-ttl">>, 5}]),
+ Info = rpc:call(Server, rabbit_quorum_queue, infos,
+ [rabbit_misc:r(<<"/">>, queue, QQ)]),
+ ?assertEqual('', proplists:get_value(policy, Info)),
+ ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"ha">>),
+ ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"ttl">>).
+
dead_letter_to_quorum_queue(Config) ->
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),