diff options
-rw-r--r-- | deps/rabbit/src/rabbit_amqqueue.erl | 18 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_stream_queue.erl | 4 | ||||
-rw-r--r-- | deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs | 3 | ||||
-rw-r--r-- | deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl | 11 | ||||
-rw-r--r-- | deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl | 3 | ||||
-rw-r--r-- | deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl | 13 | ||||
-rw-r--r-- | deps/rabbitmq_shovel/test/dynamic_SUITE.erl | 26 | ||||
-rw-r--r-- | deps/rabbitmq_shovel_management/priv/www/js/shovel.js | 4 |
8 files changed, 71 insertions, 11 deletions
diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl index f7315f12eb..d6d3b552a5 100644 --- a/deps/rabbit/src/rabbit_amqqueue.erl +++ b/deps/rabbit/src/rabbit_amqqueue.erl @@ -55,7 +55,7 @@ -export([rebalance/3]). -export([collect_info_all/2]). --export([is_policy_applicable/2, declare_args/0]). +-export([is_policy_applicable/2, declare_args/0, consume_args/0]). -export([is_server_named_allowed/1]). -export([check_max_age/1]). @@ -787,7 +787,8 @@ declare_args() -> {<<"x-queue-leader-locator">>, fun check_queue_leader_locator_arg/2}]. consume_args() -> [{<<"x-priority">>, fun check_int_arg/2}, - {<<"x-cancel-on-ha-failover">>, fun check_bool_arg/2}]. + {<<"x-cancel-on-ha-failover">>, fun check_bool_arg/2}, + {<<"x-stream-offset">>, fun check_stream_offset_arg/2}]. check_int_arg({Type, _}, _) -> case lists:member(Type, ?INTEGER_ARG_TYPES) of @@ -969,6 +970,19 @@ check_queue_leader_locator_arg(Val, _Args) when is_binary(Val) -> check_queue_leader_locator_arg(_Val, _Args) -> {error, invalid_queue_locator_arg}. +-define(KNOWN_OFFSETS, [<<"first">>, <<"last">>, <<"next">>]). +check_stream_offset_arg({longstr, Val}, _Args) -> + case lists:member(Val, ?KNOWN_OFFSETS) of + true -> ok; + false -> {error, invalid_stream_offset_arg} + end; +check_stream_offset_arg({timestamp, Val}, _Args) when is_integer(Val) -> + ok; +check_stream_offset_arg({_, Val}, _Args) when is_integer(Val) -> + ok; +check_stream_offset_arg(_Val, _Args) -> + {error, invalid_stream_offset_arg}. + -define(KNOWN_QUEUE_MODES, [<<"default">>, <<"lazy">>]). check_queue_mode({longstr, Val}, _Args) -> case lists:member(Val, ?KNOWN_QUEUE_MODES) of diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index 05977c3a58..c0697c0ee5 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -155,8 +155,8 @@ policy_changed(Q) -> _ = rabbit_stream_coordinator:policy_changed(Q), ok. -stat(_) -> - {ok, 0, 0}. +stat(Q) -> + {ok, i(messages, Q), 0}. consume(Q, #{prefetch_count := 0}, _) when ?amqqueue_is_stream(Q) -> diff --git a/deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs b/deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs index e3ac572fde..773cea5b9e 100644 --- a/deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs +++ b/deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs @@ -369,6 +369,9 @@ <input type="hidden" name="dest-uri" value="amqp:///<%= esc(queue.vhost) %>"/> <input type="hidden" name="dest-add-forward-headers" value="false"/> <input type="hidden" name="ack-mode" value="on-confirm"/> + <% if (is_stream(queue)) { %> + <input type="hidden" name="src-consumer-args-stream-offset" value="first"/> + <% } %> <input type="hidden" name="redirect" value="#/queues"/> <table class="form"> diff --git a/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl b/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl index 29091c553d..1f50abd05c 100644 --- a/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl +++ b/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl @@ -41,12 +41,15 @@ parse(_Name, {source, Source}) -> ?DEFAULT_PREFETCH)), Queue = parse_parameter(queue, fun parse_binary/1, proplists:get_value(queue, Source)), + %% TODO parse + CArgs = proplists:get_value(consumer_args, Source, []), #{module => ?MODULE, uris => proplists:get_value(uris, Source), resource_decl => decl_fun(Source), queue => Queue, delete_after => proplists:get_value(delete_after, Source, never), - prefetch_count => Prefetch}; + prefetch_count => Prefetch, + consumer_args => CArgs}; parse(Name, {destination, Dest}) -> PubProp = proplists:get_value(publish_properties, Dest, []), PropsFun = try_make_parse_publish(publish_properties, PubProp), @@ -73,7 +76,8 @@ init_source(Conf = #{ack_mode := AckMode, source := #{queue := Queue, current := {Conn, Chan, _}, prefetch_count := Prefetch, - resource_decl := Decl} = Src}) -> + resource_decl := Decl, + consumer_args := Args} = Src}) -> Decl(Conn, Chan), NoAck = AckMode =:= no_ack, @@ -92,7 +96,8 @@ init_source(Conf = #{ack_mode := AckMode, end, #'basic.consume_ok'{} = amqp_channel:subscribe(Chan, #'basic.consume'{queue = Queue, - no_ack = NoAck}, self()), + no_ack = NoAck, + arguments = Args}, self()), Conf#{source => Src#{remaining => Remaining, remaining_unacked => Remaining}}. diff --git a/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl b/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl index 44e0ee4fe2..73b47ca809 100644 --- a/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl +++ b/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl @@ -66,7 +66,8 @@ parse(_Name, {source, Conf}) -> uris => Uris, prefetch_count => pget(prefetch_count, Conf, 1000), delete_after => pget(delete_after, Conf, never), - source_address => pget(source_address, Conf)}. + source_address => pget(source_address, Conf), + consumer_args => pget(consumer_args, Conf, [])}. -spec connect_source(state()) -> state(). connect_source(State = #{name := Name, diff --git a/deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl b/deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl index e4ea1add43..5471dc8478 100644 --- a/deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl +++ b/deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl @@ -121,6 +121,7 @@ amqp091_src_validation(_Def, User) -> {<<"src-exchange-key">>, fun rabbit_parameter_validation:binary/2, optional}, {<<"src-queue">>, fun rabbit_parameter_validation:binary/2, optional}, {<<"src-queue-args">>, fun validate_queue_args/2, optional}, + {<<"src-consumer-args">>, fun validate_consumer_args/2, optional}, {<<"prefetch-count">>, fun rabbit_parameter_validation:number/2, optional}, {<<"src-prefetch-count">>, fun rabbit_parameter_validation:number/2, optional}, %% a deprecated pre-3.7 setting @@ -213,6 +214,11 @@ validate_queue_args(Name, Term0) -> rabbit_parameter_validation:proplist(Name, rabbit_amqqueue:declare_args(), Term). +validate_consumer_args(Name, Term0) -> + Term = rabbit_data_coercion:to_proplist(Term0), + + rabbit_parameter_validation:proplist(Name, rabbit_amqqueue:consume_args(), Term). + validate_amqp10_map(Name, Terms0) -> Terms = rabbit_data_coercion:to_proplist(Terms0), Str = fun rabbit_parameter_validation:binary/2, @@ -376,7 +382,8 @@ parse_amqp10_source(Def) -> uris => Uris, source_address => Address, delete_after => opt_b2a(DeleteAfter), - prefetch_count => PrefetchCount}, Headers}. + prefetch_count => PrefetchCount, + consumer_args => []}, Headers}. parse_amqp091_source(Def) -> SrcURIs = get_uris(<<"src-uri">>, Def), @@ -384,6 +391,7 @@ parse_amqp091_source(Def) -> SrcXKey = pget(<<"src-exchange-key">>, Def, <<>>), %% [1] SrcQ = pget(<<"src-queue">>, Def, none), SrcQArgs = pget(<<"src-queue-args">>, Def, #{}), + SrcCArgs = rabbit_misc:to_amqp_table(pget(<<"src-consumer-args">>, Def, [])), {SrcDeclFun, Queue, DestHeaders} = case SrcQ of none -> {fun (_Conn, Ch) -> @@ -411,7 +419,8 @@ parse_amqp091_source(Def) -> resource_decl => SrcDeclFun, queue => Queue, delete_after => opt_b2a(DeleteAfter), - prefetch_count => PrefetchCount + prefetch_count => PrefetchCount, + consumer_args => SrcCArgs }, Details), DestHeaders}. get_uris(Key, Def) -> diff --git a/deps/rabbitmq_shovel/test/dynamic_SUITE.erl b/deps/rabbitmq_shovel/test/dynamic_SUITE.erl index aa5bf49fcb..fb09554785 100644 --- a/deps/rabbitmq_shovel/test/dynamic_SUITE.erl +++ b/deps/rabbitmq_shovel/test/dynamic_SUITE.erl @@ -15,7 +15,8 @@ all() -> [ {group, core_tests}, - {group, quorum_queue_tests} + {group, quorum_queue_tests}, + {group, stream_queue_tests} ]. groups() -> @@ -38,6 +39,10 @@ groups() -> {quorum_queue_tests, [], [ quorum_queues + ]}, + + {stream_queue_tests, [], [ + stream_queues ]} ]. @@ -64,6 +69,11 @@ init_per_group(quorum_queue_tests, Config) -> false -> Config; _ -> {skip, "quorum queue tests are skipped in mixed mode"} end; +init_per_group(stream_queue_tests, Config) -> + case os:getenv("SECONDARY_UMBRELLA") of + false -> Config; + _ -> {skip, "stream queue tests are skipped in mixed mode"} + end; init_per_group(_, Config) -> Config. @@ -104,6 +114,20 @@ quorum_queues(Config) -> publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hello">>) end). +stream_queues(Config) -> + with_ch(Config, + fun (Ch) -> + shovel_test_utils:set_param( + Config, + <<"test">>, [ + {<<"src-queue">>, <<"src">>}, + {<<"dest-queue">>, <<"dest">>}, + {<<"src-queue-args">>, #{<<"x-queue-type">> => <<"stream">>}}, + {<<"src-consumer-args">>, #{<<"x-stream-offset">> => <<"first">>}} + ]), + publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hello">>) + end). + set_properties_using_map(Config) -> with_ch(Config, fun (Ch) -> diff --git a/deps/rabbitmq_shovel_management/priv/www/js/shovel.js b/deps/rabbitmq_shovel_management/priv/www/js/shovel.js index a4f85cb988..7aa174bc21 100644 --- a/deps/rabbitmq_shovel_management/priv/www/js/shovel.js +++ b/deps/rabbitmq_shovel_management/priv/www/js/shovel.js @@ -22,6 +22,10 @@ dispatcher_add(function(sammy) { if (redirect !== undefined) { delete this.params['redirect']; } + var stream_offset = this.params['src-consumer-args-stream-offset']; + var src_consumer_args = {'x-stream-offset': stream_offset}; + delete this.params['src-consumer-args-stream-offset']; + this.params['src-consumer-args'] = src_consumer_args; put_parameter(this, [], num_keys, bool_keys, arrayable_keys); if (redirect !== undefined) { go_to(redirect); |