summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordcorbacho <dparracorbacho@piotal.io>2020-10-08 16:26:28 +0100
committerdcorbacho <dparracorbacho@piotal.io>2020-10-09 14:33:08 +0100
commit60edb61e208df84350e36f3138b57833590cb432 (patch)
tree24c796a34034771ac084132af15b6a375d6b1407
parent424d3158fb0b176b1f3a018c0a7d779531616416 (diff)
downloadrabbitmq-server-git-60edb61e208df84350e36f3138b57833590cb432.tar.gz
Refactor: queue capabilities on queue types
Queue implementations can delegate the responsibility of validating policies, arguments and other capabilities to `rabbit_queue_type`, they only need to declare a 'capabilities' map that provides the list of supported capabilities.
-rw-r--r--src/rabbit_classic_queue.erl31
-rw-r--r--src/rabbit_queue_type.erl29
-rw-r--r--src/rabbit_queue_type_util.erl13
-rw-r--r--src/rabbit_quorum_queue.erl34
-rw-r--r--src/rabbit_stream_queue.erl23
5 files changed, 63 insertions, 67 deletions
diff --git a/src/rabbit_classic_queue.erl b/src/rabbit_classic_queue.erl
index 5ab32b8632..275a481af8 100644
--- a/src/rabbit_classic_queue.erl
+++ b/src/rabbit_classic_queue.erl
@@ -38,7 +38,7 @@
dequeue/4,
info/2,
state_info/1,
- is_policy_applicable/2
+ capabilities/1
]).
-export([delete_crashed/1,
@@ -438,19 +438,22 @@ recover_durable_queues(QueuesAndRecoveryTerms) ->
[Pid, Error]) || {Pid, Error} <- Failures],
[Q || {_, {new, Q}} <- Results].
--spec is_policy_applicable(amqqueue:amqqueue(), any()) -> boolean().
-is_policy_applicable(_Q, Policy) ->
- Applicable = [<<"expires">>, <<"message-ttl">>, <<"dead-letter-exchange">>,
- <<"dead-letter-routing-key">>, <<"max-length">>,
- <<"max-length-bytes">>, <<"max-in-memory-length">>, <<"max-in-memory-bytes">>,
- <<"max-priority">>, <<"overflow">>, <<"queue-mode">>,
- <<"single-active-consumer">>, <<"delivery-limit">>,
- <<"ha-mode">>, <<"ha-params">>, <<"ha-sync-mode">>,
- <<"ha-promote-on-shutdown">>, <<"ha-promote-on-failure">>,
- <<"queue-master-locator">>],
- lists:all(fun({P, _}) ->
- lists:member(P, Applicable)
- end, Policy).
+capabilities(_Q) ->
+ #{policies => [<<"expires">>, <<"message-ttl">>, <<"dead-letter-exchange">>,
+ <<"dead-letter-routing-key">>, <<"max-length">>,
+ <<"max-length-bytes">>, <<"max-in-memory-length">>, <<"max-in-memory-bytes">>,
+ <<"max-priority">>, <<"overflow">>, <<"queue-mode">>,
+ <<"single-active-consumer">>, <<"delivery-limit">>,
+ <<"ha-mode">>, <<"ha-params">>, <<"ha-sync-mode">>,
+ <<"ha-promote-on-shutdown">>, <<"ha-promote-on-failure">>,
+ <<"queue-master-locator">>],
+ queue_arguments => [<<"x-expires">>, <<"x-message-ttl">>, <<"x-dead-letter-exchange">>,
+ <<"x-dead-letter-routing-key">>, <<"x-max-length">>,
+ <<"x-max-length-bytes">>, <<"x-max-in-memory-length">>,
+ <<"x-max-in-memory-bytes">>, <<"x-max-priority">>,
+ <<"x-overflow">>, <<"x-queue-mode">>, <<"x-single-active-consumer">>,
+ <<"x-queue-type">>],
+ consumer_arguments => [<<"x-cancel-on-ha-failover">>]}.
reject_seq_no(SeqNo, U0) ->
reject_seq_no(SeqNo, U0, []).
diff --git a/src/rabbit_queue_type.erl b/src/rabbit_queue_type.erl
index 40028c8241..8eb16d1e27 100644
--- a/src/rabbit_queue_type.erl
+++ b/src/rabbit_queue_type.erl
@@ -188,8 +188,8 @@
-callback stat(amqqueue:amqqueue()) ->
{'ok', non_neg_integer(), non_neg_integer()}.
--callback is_policy_applicable(amqqueue:amqqueue(), any()) ->
- boolean().
+-callback capabilities(amqqueue:amqqueue()) ->
+ #{atom() := term()}.
%% TODO: this should be controlled by a registry that is populated on boot
discover(<<"quorum">>) ->
@@ -212,6 +212,9 @@ is_enabled(Type) ->
rabbit_types:channel_exit().
declare(Q, Node) ->
Mod = amqqueue:get_type(Q),
+ Capabilities = Mod:capabilities(Q),
+ ValidQueueArgs = maps:get(queue_arguments, Capabilities, []),
+ check_invalid_arguments(amqqueue:get_name(Q), amqqueue:get_arguments(Q), ValidQueueArgs),
Mod:declare(Q, Node).
-spec delete(amqqueue:amqqueue(), boolean(),
@@ -291,8 +294,12 @@ i_down(K, _Q, _DownReason) ->
is_policy_applicable(Q, Policy) ->
Mod = amqqueue:get_type(Q),
- Mod:is_policy_applicable(Q, Policy).
-
+ Capabilities = Mod:capabilities(Q),
+ Applicable = maps:get(policies, Capabilities, []),
+ lists:all(fun({P, _}) ->
+ lists:member(P, Applicable)
+ end, Policy).
+
-spec init() -> state().
init() ->
#?STATE{}.
@@ -316,6 +323,9 @@ new(Q, State) when ?is_amqqueue(Q) ->
consume(Q, Spec, State) ->
#ctx{state = State0} = Ctx = get_ctx(Q, State),
Mod = amqqueue:get_type(Q),
+ Capabilities = Mod:capabilities(Q),
+ ValidConsumerArgs = maps:get(consumer_arguments, Capabilities, []),
+ check_invalid_arguments(amqqueue:get_name(Q), maps:get(args, Spec), ValidConsumerArgs),
case Mod:consume(Q, Spec, State0) of
{ok, CtxState, Actions} ->
return_ok(set_ctx(Q, Ctx#ctx{state = CtxState}, State), Actions);
@@ -556,3 +566,14 @@ return_ok(State0, Actions0) ->
{S, [Act | A0]}
end, {State0, []}, Actions0),
{ok, State, lists:reverse(Actions)}.
+
+
+check_invalid_arguments(QueueName, Args, Keys) ->
+ [case lists:member(Arg, Keys) of
+ true -> ok;
+ false -> rabbit_misc:protocol_error(
+ precondition_failed,
+ "invalid arg '~s' for ~s",
+ [Arg, rabbit_misc:rs(QueueName)])
+ end || {Arg, _, _} <- Args],
+ ok.
diff --git a/src/rabbit_queue_type_util.erl b/src/rabbit_queue_type_util.erl
index 0ff7fb312d..68b21749c7 100644
--- a/src/rabbit_queue_type_util.erl
+++ b/src/rabbit_queue_type_util.erl
@@ -16,8 +16,7 @@
-module(rabbit_queue_type_util).
--export([check_invalid_arguments/3,
- args_policy_lookup/3,
+-export([args_policy_lookup/3,
qname_to_internal_name/1,
check_auto_delete/1,
check_exclusive/1,
@@ -26,16 +25,6 @@
-include("rabbit.hrl").
-include("amqqueue.hrl").
-check_invalid_arguments(QueueName, Args, Keys) ->
- [case rabbit_misc:table_lookup(Args, Key) of
- undefined -> ok;
- _TypeVal -> rabbit_misc:protocol_error(
- precondition_failed,
- "invalid arg '~s' for ~s",
- [Key, rabbit_misc:rs(QueueName)])
- end || Key <- Keys],
- ok.
-
args_policy_lookup(Name, Resolve, Q) when ?is_amqqueue(Q) ->
Args = amqqueue:get_arguments(Q),
AName = <<"x-", Name/binary>>,
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 1f333bdae1..d4c9dd1d9c 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -43,7 +43,7 @@
-export([list_with_minimum_quorum/0, list_with_minimum_quorum_for_cli/0,
filter_quorum_critical/1, filter_quorum_critical/2,
all_replica_states/0]).
--export([is_policy_applicable/2]).
+-export([capabilities/1]).
-export([repair_amqqueue_nodes/1,
repair_amqqueue_nodes/2
]).
@@ -134,7 +134,6 @@ declare(Q, _Node) when ?amqqueue_is_quorum(Q) ->
Arguments = amqqueue:get_arguments(Q),
Opts = amqqueue:get_options(Q),
ActingUser = maps:get(user, Opts, ?UNKNOWN_USER),
- check_invalid_arguments(QName, Arguments),
rabbit_queue_type_util:check_auto_delete(Q),
rabbit_queue_type_util:check_exclusive(Q),
rabbit_queue_type_util:check_non_durable(Q),
@@ -331,20 +330,17 @@ filter_quorum_critical(Queues, ReplicaStates) ->
length(AllUp) =< MinQuorum
end, Queues).
--spec is_policy_applicable(amqqueue:amqqueue(), any()) -> boolean().
-is_policy_applicable(_Q, Policy) ->
- Applicable = [<<"max-length">>,
- <<"max-length-bytes">>,
- <<"overflow">>,
- <<"expires">>,
- <<"max-in-memory-length">>,
- <<"max-in-memory-bytes">>,
- <<"delivery-limit">>,
- <<"dead-letter-exchange">>,
- <<"dead-letter-routing-key">>],
- lists:all(fun({P, _}) ->
- lists:member(P, Applicable)
- end, Policy).
+capabilities(_Q) ->
+ #{policies => [<<"max-length">>, <<"max-length-bytes">>, <<"overflow">>,
+ <<"expires">>, <<"max-in-memory-length">>, <<"max-in-memory-bytes">>,
+ <<"delivery-limit">>, <<"dead-letter-exchange">>, <<"dead-letter-routing-key">>],
+ queue_arguments => [<<"x-expires">>, <<"x-dead-letter-exchange">>,
+ <<"x-dead-letter-routing-key">>, <<"x-max-length">>,
+ <<"x-max-length-bytes">>, <<"x-max-in-memory-length">>,
+ <<"x-max-in-memory-bytes">>, <<"x-overflow">>,
+ <<"x-single-active-consumer">>, <<"x-queue-type">>,
+ <<"x-quorum-initial-group-size">>, <<"x-delivery-limit">>],
+ consumer_arguments => [<<"x-priority">>]}.
rpc_delete_metrics(QName) ->
ets:delete(queue_coarse_metrics, QName),
@@ -1416,12 +1412,6 @@ quorum_ctag(Other) ->
maybe_send_reply(_ChPid, undefined) -> ok;
maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
-check_invalid_arguments(QueueName, Args) ->
- Keys = [<<"x-message-ttl">>,
- <<"x-max-priority">>,
- <<"x-queue-mode">>],
- rabbit_queue_type_util:check_invalid_arguments(QueueName, Args, Keys).
-
queue_name(RaFifoState) ->
rabbit_fifo_client:cluster_name(RaFifoState).
diff --git a/src/rabbit_stream_queue.erl b/src/rabbit_stream_queue.erl
index 612bf81d00..e251fe0117 100644
--- a/src/rabbit_stream_queue.erl
+++ b/src/rabbit_stream_queue.erl
@@ -37,7 +37,7 @@
update/2,
state_info/1,
stat/1,
- is_policy_applicable/2]).
+ capabilities/1]).
-export([set_retention_policy/3]).
-export([add_replica/3,
@@ -80,7 +80,6 @@ is_enabled() ->
declare(Q0, Node) when ?amqqueue_is_stream(Q0) ->
Arguments = amqqueue:get_arguments(Q0),
QName = amqqueue:get_name(Q0),
- check_invalid_arguments(QName, Arguments),
rabbit_queue_type_util:check_auto_delete(Q0),
rabbit_queue_type_util:check_exclusive(Q0),
rabbit_queue_type_util:check_non_durable(Q0),
@@ -543,13 +542,6 @@ max_age(Age) ->
max_age(Age1, Age2) ->
min(rabbit_amqqueue:check_max_age(Age1), rabbit_amqqueue:check_max_age(Age2)).
-check_invalid_arguments(QueueName, Args) ->
- Keys = [<<"x-expires">>, <<"x-message-ttl">>,
- <<"x-max-priority">>, <<"x-queue-mode">>, <<"x-overflow">>,
- <<"x-max-in-memory-length">>, <<"x-max-in-memory-bytes">>,
- <<"x-quorum-initial-group-size">>, <<"x-cancel-on-ha-failover">>],
- rabbit_queue_type_util:check_invalid_arguments(QueueName, Args, Keys).
-
queue_name(#resource{virtual_host = VHost, name = Name}) ->
Timestamp = erlang:integer_to_binary(erlang:system_time()),
osiris_util:to_base64uri(erlang:binary_to_list(<<VHost/binary, "_", Name/binary, "_",
@@ -657,9 +649,10 @@ msg_to_iodata(#basic_message{exchange_name = #resource{name = Exchange},
<<"x-routing-key">> => {utf8, RKey}}, R0),
rabbit_msg_record:to_iodata(R).
--spec is_policy_applicable(amqqueue:amqqueue(), any()) -> boolean().
-is_policy_applicable(_Q, Policy) ->
- Applicable = [<<"max-length-bytes">>, <<"max-age">>, <<"max-segment-size">>],
- lists:all(fun({P, _}) ->
- lists:member(P, Applicable)
- end, Policy).
+capabilities(_Q) ->
+ #{policies => [<<"max-length-bytes">>, <<"max-age">>, <<"max-segment-size">>],
+ queue_arguments => [<<"x-dead-letter-exchange">>, <<"x-dead-letter-routing-key">>,
+ <<"x-max-length">>, <<"x-max-length-bytes">>,
+ <<"x-single-active-consumer">>, <<"x-queue-type">>,
+ <<"x-max-age">>, <<"x-max-segment-size">>],
+ consumer_arguments => [<<"x-stream-offset">>]}.