summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--deps/rabbit/src/rabbit_amqqueue.erl18
-rw-r--r--deps/rabbit/src/rabbit_stream_queue.erl4
-rw-r--r--deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs3
-rw-r--r--deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl11
-rw-r--r--deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl3
-rw-r--r--deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl13
-rw-r--r--deps/rabbitmq_shovel/test/dynamic_SUITE.erl26
-rw-r--r--deps/rabbitmq_shovel_management/priv/www/js/shovel.js4
8 files changed, 71 insertions, 11 deletions
diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl
index f7315f12eb..d6d3b552a5 100644
--- a/deps/rabbit/src/rabbit_amqqueue.erl
+++ b/deps/rabbit/src/rabbit_amqqueue.erl
@@ -55,7 +55,7 @@
-export([rebalance/3]).
-export([collect_info_all/2]).
--export([is_policy_applicable/2, declare_args/0]).
+-export([is_policy_applicable/2, declare_args/0, consume_args/0]).
-export([is_server_named_allowed/1]).
-export([check_max_age/1]).
@@ -787,7 +787,8 @@ declare_args() ->
{<<"x-queue-leader-locator">>, fun check_queue_leader_locator_arg/2}].
consume_args() -> [{<<"x-priority">>, fun check_int_arg/2},
- {<<"x-cancel-on-ha-failover">>, fun check_bool_arg/2}].
+ {<<"x-cancel-on-ha-failover">>, fun check_bool_arg/2},
+ {<<"x-stream-offset">>, fun check_stream_offset_arg/2}].
check_int_arg({Type, _}, _) ->
case lists:member(Type, ?INTEGER_ARG_TYPES) of
@@ -969,6 +970,19 @@ check_queue_leader_locator_arg(Val, _Args) when is_binary(Val) ->
check_queue_leader_locator_arg(_Val, _Args) ->
{error, invalid_queue_locator_arg}.
+-define(KNOWN_OFFSETS, [<<"first">>, <<"last">>, <<"next">>]).
+check_stream_offset_arg({longstr, Val}, _Args) ->
+ case lists:member(Val, ?KNOWN_OFFSETS) of
+ true -> ok;
+ false -> {error, invalid_stream_offset_arg}
+ end;
+check_stream_offset_arg({timestamp, Val}, _Args) when is_integer(Val) ->
+ ok;
+check_stream_offset_arg({_, Val}, _Args) when is_integer(Val) ->
+ ok;
+check_stream_offset_arg(_Val, _Args) ->
+ {error, invalid_stream_offset_arg}.
+
-define(KNOWN_QUEUE_MODES, [<<"default">>, <<"lazy">>]).
check_queue_mode({longstr, Val}, _Args) ->
case lists:member(Val, ?KNOWN_QUEUE_MODES) of
diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl
index 05977c3a58..c0697c0ee5 100644
--- a/deps/rabbit/src/rabbit_stream_queue.erl
+++ b/deps/rabbit/src/rabbit_stream_queue.erl
@@ -155,8 +155,8 @@ policy_changed(Q) ->
_ = rabbit_stream_coordinator:policy_changed(Q),
ok.
-stat(_) ->
- {ok, 0, 0}.
+stat(Q) ->
+ {ok, i(messages, Q), 0}.
consume(Q, #{prefetch_count := 0}, _)
when ?amqqueue_is_stream(Q) ->
diff --git a/deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs b/deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs
index e3ac572fde..773cea5b9e 100644
--- a/deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs
+++ b/deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs
@@ -369,6 +369,9 @@
<input type="hidden" name="dest-uri" value="amqp:///<%= esc(queue.vhost) %>"/>
<input type="hidden" name="dest-add-forward-headers" value="false"/>
<input type="hidden" name="ack-mode" value="on-confirm"/>
+ <% if (is_stream(queue)) { %>
+ <input type="hidden" name="src-consumer-args-stream-offset" value="first"/>
+ <% } %>
<input type="hidden" name="redirect" value="#/queues"/>
<table class="form">
diff --git a/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl b/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl
index 29091c553d..1f50abd05c 100644
--- a/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl
+++ b/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl
@@ -41,12 +41,15 @@ parse(_Name, {source, Source}) ->
?DEFAULT_PREFETCH)),
Queue = parse_parameter(queue, fun parse_binary/1,
proplists:get_value(queue, Source)),
+ %% TODO parse
+ CArgs = proplists:get_value(consumer_args, Source, []),
#{module => ?MODULE,
uris => proplists:get_value(uris, Source),
resource_decl => decl_fun(Source),
queue => Queue,
delete_after => proplists:get_value(delete_after, Source, never),
- prefetch_count => Prefetch};
+ prefetch_count => Prefetch,
+ consumer_args => CArgs};
parse(Name, {destination, Dest}) ->
PubProp = proplists:get_value(publish_properties, Dest, []),
PropsFun = try_make_parse_publish(publish_properties, PubProp),
@@ -73,7 +76,8 @@ init_source(Conf = #{ack_mode := AckMode,
source := #{queue := Queue,
current := {Conn, Chan, _},
prefetch_count := Prefetch,
- resource_decl := Decl} = Src}) ->
+ resource_decl := Decl,
+ consumer_args := Args} = Src}) ->
Decl(Conn, Chan),
NoAck = AckMode =:= no_ack,
@@ -92,7 +96,8 @@ init_source(Conf = #{ack_mode := AckMode,
end,
#'basic.consume_ok'{} =
amqp_channel:subscribe(Chan, #'basic.consume'{queue = Queue,
- no_ack = NoAck}, self()),
+ no_ack = NoAck,
+ arguments = Args}, self()),
Conf#{source => Src#{remaining => Remaining,
remaining_unacked => Remaining}}.
diff --git a/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl b/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl
index 44e0ee4fe2..73b47ca809 100644
--- a/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl
+++ b/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl
@@ -66,7 +66,8 @@ parse(_Name, {source, Conf}) ->
uris => Uris,
prefetch_count => pget(prefetch_count, Conf, 1000),
delete_after => pget(delete_after, Conf, never),
- source_address => pget(source_address, Conf)}.
+ source_address => pget(source_address, Conf),
+ consumer_args => pget(consumer_args, Conf, [])}.
-spec connect_source(state()) -> state().
connect_source(State = #{name := Name,
diff --git a/deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl b/deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl
index e4ea1add43..5471dc8478 100644
--- a/deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl
+++ b/deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl
@@ -121,6 +121,7 @@ amqp091_src_validation(_Def, User) ->
{<<"src-exchange-key">>, fun rabbit_parameter_validation:binary/2, optional},
{<<"src-queue">>, fun rabbit_parameter_validation:binary/2, optional},
{<<"src-queue-args">>, fun validate_queue_args/2, optional},
+ {<<"src-consumer-args">>, fun validate_consumer_args/2, optional},
{<<"prefetch-count">>, fun rabbit_parameter_validation:number/2, optional},
{<<"src-prefetch-count">>, fun rabbit_parameter_validation:number/2, optional},
%% a deprecated pre-3.7 setting
@@ -213,6 +214,11 @@ validate_queue_args(Name, Term0) ->
rabbit_parameter_validation:proplist(Name, rabbit_amqqueue:declare_args(), Term).
+validate_consumer_args(Name, Term0) ->
+ Term = rabbit_data_coercion:to_proplist(Term0),
+
+ rabbit_parameter_validation:proplist(Name, rabbit_amqqueue:consume_args(), Term).
+
validate_amqp10_map(Name, Terms0) ->
Terms = rabbit_data_coercion:to_proplist(Terms0),
Str = fun rabbit_parameter_validation:binary/2,
@@ -376,7 +382,8 @@ parse_amqp10_source(Def) ->
uris => Uris,
source_address => Address,
delete_after => opt_b2a(DeleteAfter),
- prefetch_count => PrefetchCount}, Headers}.
+ prefetch_count => PrefetchCount,
+ consumer_args => []}, Headers}.
parse_amqp091_source(Def) ->
SrcURIs = get_uris(<<"src-uri">>, Def),
@@ -384,6 +391,7 @@ parse_amqp091_source(Def) ->
SrcXKey = pget(<<"src-exchange-key">>, Def, <<>>), %% [1]
SrcQ = pget(<<"src-queue">>, Def, none),
SrcQArgs = pget(<<"src-queue-args">>, Def, #{}),
+ SrcCArgs = rabbit_misc:to_amqp_table(pget(<<"src-consumer-args">>, Def, [])),
{SrcDeclFun, Queue, DestHeaders} =
case SrcQ of
none -> {fun (_Conn, Ch) ->
@@ -411,7 +419,8 @@ parse_amqp091_source(Def) ->
resource_decl => SrcDeclFun,
queue => Queue,
delete_after => opt_b2a(DeleteAfter),
- prefetch_count => PrefetchCount
+ prefetch_count => PrefetchCount,
+ consumer_args => SrcCArgs
}, Details), DestHeaders}.
get_uris(Key, Def) ->
diff --git a/deps/rabbitmq_shovel/test/dynamic_SUITE.erl b/deps/rabbitmq_shovel/test/dynamic_SUITE.erl
index aa5bf49fcb..fb09554785 100644
--- a/deps/rabbitmq_shovel/test/dynamic_SUITE.erl
+++ b/deps/rabbitmq_shovel/test/dynamic_SUITE.erl
@@ -15,7 +15,8 @@
all() ->
[
{group, core_tests},
- {group, quorum_queue_tests}
+ {group, quorum_queue_tests},
+ {group, stream_queue_tests}
].
groups() ->
@@ -38,6 +39,10 @@ groups() ->
{quorum_queue_tests, [], [
quorum_queues
+ ]},
+
+ {stream_queue_tests, [], [
+ stream_queues
]}
].
@@ -64,6 +69,11 @@ init_per_group(quorum_queue_tests, Config) ->
false -> Config;
_ -> {skip, "quorum queue tests are skipped in mixed mode"}
end;
+init_per_group(stream_queue_tests, Config) ->
+ case os:getenv("SECONDARY_UMBRELLA") of
+ false -> Config;
+ _ -> {skip, "stream queue tests are skipped in mixed mode"}
+ end;
init_per_group(_, Config) ->
Config.
@@ -104,6 +114,20 @@ quorum_queues(Config) ->
publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hello">>)
end).
+stream_queues(Config) ->
+ with_ch(Config,
+ fun (Ch) ->
+ shovel_test_utils:set_param(
+ Config,
+ <<"test">>, [
+ {<<"src-queue">>, <<"src">>},
+ {<<"dest-queue">>, <<"dest">>},
+ {<<"src-queue-args">>, #{<<"x-queue-type">> => <<"stream">>}},
+ {<<"src-consumer-args">>, #{<<"x-stream-offset">> => <<"first">>}}
+ ]),
+ publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hello">>)
+ end).
+
set_properties_using_map(Config) ->
with_ch(Config,
fun (Ch) ->
diff --git a/deps/rabbitmq_shovel_management/priv/www/js/shovel.js b/deps/rabbitmq_shovel_management/priv/www/js/shovel.js
index a4f85cb988..7aa174bc21 100644
--- a/deps/rabbitmq_shovel_management/priv/www/js/shovel.js
+++ b/deps/rabbitmq_shovel_management/priv/www/js/shovel.js
@@ -22,6 +22,10 @@ dispatcher_add(function(sammy) {
if (redirect !== undefined) {
delete this.params['redirect'];
}
+ var stream_offset = this.params['src-consumer-args-stream-offset'];
+ var src_consumer_args = {'x-stream-offset': stream_offset};
+ delete this.params['src-consumer-args-stream-offset'];
+ this.params['src-consumer-args'] = src_consumer_args;
put_parameter(this, [], num_keys, bool_keys, arrayable_keys);
if (redirect !== undefined) {
go_to(redirect);