diff options
| author | dcorbacho <dparracorbacho@piotal.io> | 2020-10-08 16:26:28 +0100 |
|---|---|---|
| committer | dcorbacho <dparracorbacho@piotal.io> | 2020-10-09 14:33:08 +0100 |
| commit | 60edb61e208df84350e36f3138b57833590cb432 (patch) | |
| tree | 24c796a34034771ac084132af15b6a375d6b1407 | |
| parent | 424d3158fb0b176b1f3a018c0a7d779531616416 (diff) | |
| download | rabbitmq-server-git-60edb61e208df84350e36f3138b57833590cb432.tar.gz | |
Refactor: queue capabilities on queue types
Queue implementations can delegate the responsibility of validating
policies, arguments and other capabilities to `rabbit_queue_type`,
they only need to declare a 'capabilities' map that provides the
list of supported capabilities.
| -rw-r--r-- | src/rabbit_classic_queue.erl | 31 | ||||
| -rw-r--r-- | src/rabbit_queue_type.erl | 29 | ||||
| -rw-r--r-- | src/rabbit_queue_type_util.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 34 | ||||
| -rw-r--r-- | src/rabbit_stream_queue.erl | 23 |
5 files changed, 63 insertions, 67 deletions
diff --git a/src/rabbit_classic_queue.erl b/src/rabbit_classic_queue.erl index 5ab32b8632..275a481af8 100644 --- a/src/rabbit_classic_queue.erl +++ b/src/rabbit_classic_queue.erl @@ -38,7 +38,7 @@ dequeue/4, info/2, state_info/1, - is_policy_applicable/2 + capabilities/1 ]). -export([delete_crashed/1, @@ -438,19 +438,22 @@ recover_durable_queues(QueuesAndRecoveryTerms) -> [Pid, Error]) || {Pid, Error} <- Failures], [Q || {_, {new, Q}} <- Results]. --spec is_policy_applicable(amqqueue:amqqueue(), any()) -> boolean(). -is_policy_applicable(_Q, Policy) -> - Applicable = [<<"expires">>, <<"message-ttl">>, <<"dead-letter-exchange">>, - <<"dead-letter-routing-key">>, <<"max-length">>, - <<"max-length-bytes">>, <<"max-in-memory-length">>, <<"max-in-memory-bytes">>, - <<"max-priority">>, <<"overflow">>, <<"queue-mode">>, - <<"single-active-consumer">>, <<"delivery-limit">>, - <<"ha-mode">>, <<"ha-params">>, <<"ha-sync-mode">>, - <<"ha-promote-on-shutdown">>, <<"ha-promote-on-failure">>, - <<"queue-master-locator">>], - lists:all(fun({P, _}) -> - lists:member(P, Applicable) - end, Policy). +capabilities(_Q) -> + #{policies => [<<"expires">>, <<"message-ttl">>, <<"dead-letter-exchange">>, + <<"dead-letter-routing-key">>, <<"max-length">>, + <<"max-length-bytes">>, <<"max-in-memory-length">>, <<"max-in-memory-bytes">>, + <<"max-priority">>, <<"overflow">>, <<"queue-mode">>, + <<"single-active-consumer">>, <<"delivery-limit">>, + <<"ha-mode">>, <<"ha-params">>, <<"ha-sync-mode">>, + <<"ha-promote-on-shutdown">>, <<"ha-promote-on-failure">>, + <<"queue-master-locator">>], + queue_arguments => [<<"x-expires">>, <<"x-message-ttl">>, <<"x-dead-letter-exchange">>, + <<"x-dead-letter-routing-key">>, <<"x-max-length">>, + <<"x-max-length-bytes">>, <<"x-max-in-memory-length">>, + <<"x-max-in-memory-bytes">>, <<"x-max-priority">>, + <<"x-overflow">>, <<"x-queue-mode">>, <<"x-single-active-consumer">>, + <<"x-queue-type">>], + consumer_arguments => [<<"x-cancel-on-ha-failover">>]}. reject_seq_no(SeqNo, U0) -> reject_seq_no(SeqNo, U0, []). diff --git a/src/rabbit_queue_type.erl b/src/rabbit_queue_type.erl index 40028c8241..8eb16d1e27 100644 --- a/src/rabbit_queue_type.erl +++ b/src/rabbit_queue_type.erl @@ -188,8 +188,8 @@ -callback stat(amqqueue:amqqueue()) -> {'ok', non_neg_integer(), non_neg_integer()}. --callback is_policy_applicable(amqqueue:amqqueue(), any()) -> - boolean(). +-callback capabilities(amqqueue:amqqueue()) -> + #{atom() := term()}. %% TODO: this should be controlled by a registry that is populated on boot discover(<<"quorum">>) -> @@ -212,6 +212,9 @@ is_enabled(Type) -> rabbit_types:channel_exit(). declare(Q, Node) -> Mod = amqqueue:get_type(Q), + Capabilities = Mod:capabilities(Q), + ValidQueueArgs = maps:get(queue_arguments, Capabilities, []), + check_invalid_arguments(amqqueue:get_name(Q), amqqueue:get_arguments(Q), ValidQueueArgs), Mod:declare(Q, Node). -spec delete(amqqueue:amqqueue(), boolean(), @@ -291,8 +294,12 @@ i_down(K, _Q, _DownReason) -> is_policy_applicable(Q, Policy) -> Mod = amqqueue:get_type(Q), - Mod:is_policy_applicable(Q, Policy). - + Capabilities = Mod:capabilities(Q), + Applicable = maps:get(policies, Capabilities, []), + lists:all(fun({P, _}) -> + lists:member(P, Applicable) + end, Policy). + -spec init() -> state(). init() -> #?STATE{}. @@ -316,6 +323,9 @@ new(Q, State) when ?is_amqqueue(Q) -> consume(Q, Spec, State) -> #ctx{state = State0} = Ctx = get_ctx(Q, State), Mod = amqqueue:get_type(Q), + Capabilities = Mod:capabilities(Q), + ValidConsumerArgs = maps:get(consumer_arguments, Capabilities, []), + check_invalid_arguments(amqqueue:get_name(Q), maps:get(args, Spec), ValidConsumerArgs), case Mod:consume(Q, Spec, State0) of {ok, CtxState, Actions} -> return_ok(set_ctx(Q, Ctx#ctx{state = CtxState}, State), Actions); @@ -556,3 +566,14 @@ return_ok(State0, Actions0) -> {S, [Act | A0]} end, {State0, []}, Actions0), {ok, State, lists:reverse(Actions)}. + + +check_invalid_arguments(QueueName, Args, Keys) -> + [case lists:member(Arg, Keys) of + true -> ok; + false -> rabbit_misc:protocol_error( + precondition_failed, + "invalid arg '~s' for ~s", + [Arg, rabbit_misc:rs(QueueName)]) + end || {Arg, _, _} <- Args], + ok. diff --git a/src/rabbit_queue_type_util.erl b/src/rabbit_queue_type_util.erl index 0ff7fb312d..68b21749c7 100644 --- a/src/rabbit_queue_type_util.erl +++ b/src/rabbit_queue_type_util.erl @@ -16,8 +16,7 @@ -module(rabbit_queue_type_util). --export([check_invalid_arguments/3, - args_policy_lookup/3, +-export([args_policy_lookup/3, qname_to_internal_name/1, check_auto_delete/1, check_exclusive/1, @@ -26,16 +25,6 @@ -include("rabbit.hrl"). -include("amqqueue.hrl"). -check_invalid_arguments(QueueName, Args, Keys) -> - [case rabbit_misc:table_lookup(Args, Key) of - undefined -> ok; - _TypeVal -> rabbit_misc:protocol_error( - precondition_failed, - "invalid arg '~s' for ~s", - [Key, rabbit_misc:rs(QueueName)]) - end || Key <- Keys], - ok. - args_policy_lookup(Name, Resolve, Q) when ?is_amqqueue(Q) -> Args = amqqueue:get_arguments(Q), AName = <<"x-", Name/binary>>, diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 1f333bdae1..d4c9dd1d9c 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -43,7 +43,7 @@ -export([list_with_minimum_quorum/0, list_with_minimum_quorum_for_cli/0, filter_quorum_critical/1, filter_quorum_critical/2, all_replica_states/0]). --export([is_policy_applicable/2]). +-export([capabilities/1]). -export([repair_amqqueue_nodes/1, repair_amqqueue_nodes/2 ]). @@ -134,7 +134,6 @@ declare(Q, _Node) when ?amqqueue_is_quorum(Q) -> Arguments = amqqueue:get_arguments(Q), Opts = amqqueue:get_options(Q), ActingUser = maps:get(user, Opts, ?UNKNOWN_USER), - check_invalid_arguments(QName, Arguments), rabbit_queue_type_util:check_auto_delete(Q), rabbit_queue_type_util:check_exclusive(Q), rabbit_queue_type_util:check_non_durable(Q), @@ -331,20 +330,17 @@ filter_quorum_critical(Queues, ReplicaStates) -> length(AllUp) =< MinQuorum end, Queues). --spec is_policy_applicable(amqqueue:amqqueue(), any()) -> boolean(). -is_policy_applicable(_Q, Policy) -> - Applicable = [<<"max-length">>, - <<"max-length-bytes">>, - <<"overflow">>, - <<"expires">>, - <<"max-in-memory-length">>, - <<"max-in-memory-bytes">>, - <<"delivery-limit">>, - <<"dead-letter-exchange">>, - <<"dead-letter-routing-key">>], - lists:all(fun({P, _}) -> - lists:member(P, Applicable) - end, Policy). +capabilities(_Q) -> + #{policies => [<<"max-length">>, <<"max-length-bytes">>, <<"overflow">>, + <<"expires">>, <<"max-in-memory-length">>, <<"max-in-memory-bytes">>, + <<"delivery-limit">>, <<"dead-letter-exchange">>, <<"dead-letter-routing-key">>], + queue_arguments => [<<"x-expires">>, <<"x-dead-letter-exchange">>, + <<"x-dead-letter-routing-key">>, <<"x-max-length">>, + <<"x-max-length-bytes">>, <<"x-max-in-memory-length">>, + <<"x-max-in-memory-bytes">>, <<"x-overflow">>, + <<"x-single-active-consumer">>, <<"x-queue-type">>, + <<"x-quorum-initial-group-size">>, <<"x-delivery-limit">>], + consumer_arguments => [<<"x-priority">>]}. rpc_delete_metrics(QName) -> ets:delete(queue_coarse_metrics, QName), @@ -1416,12 +1412,6 @@ quorum_ctag(Other) -> maybe_send_reply(_ChPid, undefined) -> ok; maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). -check_invalid_arguments(QueueName, Args) -> - Keys = [<<"x-message-ttl">>, - <<"x-max-priority">>, - <<"x-queue-mode">>], - rabbit_queue_type_util:check_invalid_arguments(QueueName, Args, Keys). - queue_name(RaFifoState) -> rabbit_fifo_client:cluster_name(RaFifoState). diff --git a/src/rabbit_stream_queue.erl b/src/rabbit_stream_queue.erl index 612bf81d00..e251fe0117 100644 --- a/src/rabbit_stream_queue.erl +++ b/src/rabbit_stream_queue.erl @@ -37,7 +37,7 @@ update/2, state_info/1, stat/1, - is_policy_applicable/2]). + capabilities/1]). -export([set_retention_policy/3]). -export([add_replica/3, @@ -80,7 +80,6 @@ is_enabled() -> declare(Q0, Node) when ?amqqueue_is_stream(Q0) -> Arguments = amqqueue:get_arguments(Q0), QName = amqqueue:get_name(Q0), - check_invalid_arguments(QName, Arguments), rabbit_queue_type_util:check_auto_delete(Q0), rabbit_queue_type_util:check_exclusive(Q0), rabbit_queue_type_util:check_non_durable(Q0), @@ -543,13 +542,6 @@ max_age(Age) -> max_age(Age1, Age2) -> min(rabbit_amqqueue:check_max_age(Age1), rabbit_amqqueue:check_max_age(Age2)). -check_invalid_arguments(QueueName, Args) -> - Keys = [<<"x-expires">>, <<"x-message-ttl">>, - <<"x-max-priority">>, <<"x-queue-mode">>, <<"x-overflow">>, - <<"x-max-in-memory-length">>, <<"x-max-in-memory-bytes">>, - <<"x-quorum-initial-group-size">>, <<"x-cancel-on-ha-failover">>], - rabbit_queue_type_util:check_invalid_arguments(QueueName, Args, Keys). - queue_name(#resource{virtual_host = VHost, name = Name}) -> Timestamp = erlang:integer_to_binary(erlang:system_time()), osiris_util:to_base64uri(erlang:binary_to_list(<<VHost/binary, "_", Name/binary, "_", @@ -657,9 +649,10 @@ msg_to_iodata(#basic_message{exchange_name = #resource{name = Exchange}, <<"x-routing-key">> => {utf8, RKey}}, R0), rabbit_msg_record:to_iodata(R). --spec is_policy_applicable(amqqueue:amqqueue(), any()) -> boolean(). -is_policy_applicable(_Q, Policy) -> - Applicable = [<<"max-length-bytes">>, <<"max-age">>, <<"max-segment-size">>], - lists:all(fun({P, _}) -> - lists:member(P, Applicable) - end, Policy). +capabilities(_Q) -> + #{policies => [<<"max-length-bytes">>, <<"max-age">>, <<"max-segment-size">>], + queue_arguments => [<<"x-dead-letter-exchange">>, <<"x-dead-letter-routing-key">>, + <<"x-max-length">>, <<"x-max-length-bytes">>, + <<"x-single-active-consumer">>, <<"x-queue-type">>, + <<"x-max-age">>, <<"x-max-segment-size">>], + consumer_arguments => [<<"x-stream-offset">>]}. |
