summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2021-09-30 10:43:29 +0200
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2021-10-11 16:50:03 +0200
commit6b9589bae42b0bec11d75ae906810260da14ca76 (patch)
treea8e182aa89da61399113c7f5a4f6396b04d4b3ad
parentecbd9698341406543efd195aff481c6704f6022b (diff)
downloadrabbitmq-server-git-super-stream-cli.tar.gz
Handle stream arguments in add_super_stream commandsuper-stream-cli
max-age, leader-locator, etc.
-rw-r--r--deps/rabbit_common/src/rabbit_date_time.erl48
-rw-r--r--deps/rabbit_common/test/unit_SUITE.erl23
-rw-r--r--deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl156
-rw-r--r--deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteSuperStreamCommand.erl2
-rw-r--r--deps/rabbitmq_stream/test/commands_SUITE.erl75
-rw-r--r--deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl2
6 files changed, 295 insertions, 11 deletions
diff --git a/deps/rabbit_common/src/rabbit_date_time.erl b/deps/rabbit_common/src/rabbit_date_time.erl
new file mode 100644
index 0000000000..e4a56ad783
--- /dev/null
+++ b/deps/rabbit_common/src/rabbit_date_time.erl
@@ -0,0 +1,48 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_date_time).
+
+-export([parse_duration/1]).
+
+-type datetime_plist() :: list({atom(), integer()}).
+
+% from https://github.com/erlsci/iso8601/blob/main/src/iso8601.erl
+-spec gi(string()) -> integer().
+gi(DS) ->
+ {Int, _Rest} = string:to_integer(DS),
+ case Int of
+ error ->
+ 0;
+ _ ->
+ Int
+ end.
+
+-spec parse_duration(string()) -> datetime_plist().
+parse_duration(Bin)
+ when is_binary(Bin) -> %TODO extended format
+ parse_duration(binary_to_list(Bin));
+parse_duration(Str) ->
+ case re:run(Str,
+ "^(?<sign>-|\\+)?P(?:(?<years>[0-9]+)Y)?(?:(?<months>[0"
+ "-9]+)M)?(?:(?<days>[0-9]+)D)?(T(?:(?<hours>[0-9]+)H)?("
+ "?:(?<minutes>[0-9]+)M)?(?:(?<seconds>[0-9]+(?:\\.[0-9]"
+ "+)?)S)?)?$",
+ [{capture, [sign, years, months, days, hours, minutes, seconds],
+ list}])
+ of
+ {match, [Sign, Years, Months, Days, Hours, Minutes, Seconds]} ->
+ {ok, [{sign, Sign},
+ {years, gi(Years)},
+ {months, gi(Months)},
+ {days, gi(Days)},
+ {hours, gi(Hours)},
+ {minutes, gi(Minutes)},
+ {seconds, gi(Seconds)}]};
+ nomatch ->
+ error
+ end.
diff --git a/deps/rabbit_common/test/unit_SUITE.erl b/deps/rabbit_common/test/unit_SUITE.erl
index 105488bed0..f00df8787b 100644
--- a/deps/rabbit_common/test/unit_SUITE.erl
+++ b/deps/rabbit_common/test/unit_SUITE.erl
@@ -44,7 +44,8 @@ groups() ->
frame_encoding_does_not_fail_with_empty_binary_payload,
amqp_table_conversion,
name_type,
- get_erl_path
+ get_erl_path,
+ date_time_parse_duration
]},
{parse_mem_limit, [parallel], [
parse_mem_limit_relative_exactly_max,
@@ -460,3 +461,23 @@ get_erl_path(_) ->
?assertNotMatch(nomatch, string:find(Exe, "erl"))
end,
ok.
+
+date_time_parse_duration(_) ->
+ ?assertEqual(
+ {ok, [{sign, "+"}, {years, 6}, {months, 3}, {days, 1}, {hours, 1}, {minutes, 1}, {seconds, 1}]},
+ rabbit_date_time:parse_duration("+P6Y3M1DT1H1M1.1S")
+ ),
+ ?assertEqual(
+ {ok, [{sign, []}, {years, 0}, {months, 0}, {days, 0}, {hours, 0}, {minutes, 6}, {seconds, 0}]},
+ rabbit_date_time:parse_duration("PT6M")
+ ),
+ ?assertEqual(
+ {ok, [{sign, []}, {years, 0}, {months, 0}, {days, 0}, {hours, 0}, {minutes, 10}, {seconds, 30}]},
+ rabbit_date_time:parse_duration("PT10M30S")
+ ),
+ ?assertEqual(
+ {ok, [{sign, []}, {years, 0}, {months, 0}, {days, 5}, {hours, 8}, {minutes, 0}, {seconds, 0}]},
+ rabbit_date_time:parse_duration("P5DT8H")
+ ),
+ ?assertEqual(error, rabbit_date_time:parse_duration("foo")),
+ ok. \ No newline at end of file
diff --git a/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl b/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl
index 7843ed4895..4f2093a020 100644
--- a/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl
+++ b/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl
@@ -43,7 +43,13 @@ description() ->
<<"Add a super stream (experimental feature)">>.
switches() ->
- [{partitions, integer}, {routing_keys, string}].
+ [{partitions, integer},
+ {routing_keys, string},
+ {max_length_bytes, string},
+ {max_age, string},
+ {stream_max_segment_size_bytes, string},
+ {leader_locator, string},
+ {initial_cluster_size, integer}].
help_section() ->
{plugin, stream}.
@@ -55,11 +61,73 @@ validate([_Name], #{partitions := _, routing_keys := _}) ->
"Specify --partitions or routing-keys, not both."};
validate([_Name], #{partitions := Partitions}) when Partitions < 1 ->
{validation_failure, "The partition number must be greater than 0"};
-validate([_Name], _Opts) ->
- ok;
+validate([_Name], Opts) ->
+ validate_stream_arguments(Opts);
validate(_, _Opts) ->
{validation_failure, too_many_args}.
+validate_stream_arguments(#{max_length_bytes := Value} = Opts) ->
+ case parse_information_unit(Value) of
+ error ->
+ {validation_failure,
+ "Invalid value for --max-length-bytes, valid example "
+ "values: 100gb, 50mb"};
+ _ ->
+ validate_stream_arguments(maps:remove(max_length_bytes, Opts))
+ end;
+validate_stream_arguments(#{max_age := Value} = Opts) ->
+ case rabbit_date_time:parse_duration(Value) of
+ {ok, _} ->
+ validate_stream_arguments(maps:remove(max_age, Opts));
+ error ->
+ {validation_failure,
+ "Invalid value for --max-age, the value must a "
+ "ISO 8601 duration, e.g. e.g. PT10M30S for 10 "
+ "minutes 30 seconds, P5DT8H for 5 days 8 hours."}
+ end;
+validate_stream_arguments(#{stream_max_segment_size_bytes := Value} =
+ Opts) ->
+ case parse_information_unit(Value) of
+ error ->
+ {validation_failure,
+ "Invalid value for --stream-max-segment-size-bytes, "
+ "valid example values: 100gb, 50mb"};
+ _ ->
+ validate_stream_arguments(maps:remove(stream_max_segment_size_bytes,
+ Opts))
+ end;
+validate_stream_arguments(#{leader_locator := <<"client-local">>} =
+ Opts) ->
+ validate_stream_arguments(maps:remove(leader_locator, Opts));
+validate_stream_arguments(#{leader_locator := <<"random">>} = Opts) ->
+ validate_stream_arguments(maps:remove(leader_locator, Opts));
+validate_stream_arguments(#{leader_locator := <<"least-leaders">>} =
+ Opts) ->
+ validate_stream_arguments(maps:remove(leader_locator, Opts));
+validate_stream_arguments(#{leader_locator := _}) ->
+ {validation_failure,
+ "Invalid value for --leader-locator, valid values "
+ "are client-local, random, least-leaders."};
+validate_stream_arguments(#{initial_cluster_size := Value} = Opts) ->
+ try
+ case rabbit_data_coercion:to_integer(Value) of
+ S when S > 0 ->
+ validate_stream_arguments(maps:remove(initial_cluster_size,
+ Opts));
+ _ ->
+ {validation_failure,
+ "Invalid value for --initial-cluster-size, the "
+ "value must be positive."}
+ end
+ catch
+ error:_ ->
+ {validation_failure,
+ "Invalid value for --initial-cluster-size, the "
+ "value must be a positive integer."}
+ end;
+validate_stream_arguments(_) ->
+ ok.
+
merge_defaults(_Args, #{routing_keys := _V} = Opts) ->
{_Args, maps:merge(#{vhost => <<"/">>}, Opts)};
merge_defaults(_Args, Opts) ->
@@ -77,7 +145,25 @@ usage_additional() ->
"exclusive with --routing-keys."],
["--routing-keys <routing-keys>",
"Comma-separated list of routing keys. Mutually "
- "exclusive with --partitions."]].
+ "exclusive with --partitions."],
+ ["--max-length-bytes <max-length-bytes>",
+ "The maximum size of partition streams, example "
+ "values: 20gb, 500mb."],
+ ["--max-age <max-age>",
+ "The maximum age of partition stream segments, "
+ "using the ISO 8601 duration format, e.g. PT10M30S "
+ "for 10 minutes 30 seconds, P5DT8H for 5 days "
+ "8 hours."],
+ ["--stream-max-segment-size-bytes <stream-max-segment-si"
+ "ze-bytes>",
+ "The maximum size of partition stream segments, "
+ "example values: 500mb, 1gb."],
+ ["--leader-locator <leader-locator>",
+ "Leader locator strategy for partition streams, "
+ "possible values are client-local, least-leaders, "
+ "random."],
+ ["--initial-cluster-size <initial-cluster-size>",
+ "The initial cluster size of partition streams."]].
usage_doc_guides() ->
[?STREAM_GUIDE_URL].
@@ -86,7 +172,8 @@ run([SuperStream],
#{node := NodeName,
vhost := VHost,
timeout := Timeout,
- partitions := Partitions}) ->
+ partitions := Partitions} =
+ Opts) ->
Streams =
[list_to_binary(binary_to_list(SuperStream)
++ "-"
@@ -99,12 +186,14 @@ run([SuperStream],
VHost,
SuperStream,
Streams,
+ stream_arguments(Opts),
RoutingKeys);
run([SuperStream],
#{node := NodeName,
vhost := VHost,
timeout := Timeout,
- routing_keys := RoutingKeysStr}) ->
+ routing_keys := RoutingKeysStr} =
+ Opts) ->
RoutingKeys =
[rabbit_data_coercion:to_binary(
string:strip(K))
@@ -121,13 +210,56 @@ run([SuperStream],
VHost,
SuperStream,
Streams,
+ stream_arguments(Opts),
RoutingKeys).
+stream_arguments(Opts) ->
+ stream_arguments(#{}, Opts).
+
+stream_arguments(Acc, Arguments) when map_size(Arguments) =:= 0 ->
+ Acc;
+stream_arguments(Acc, #{max_length_bytes := Value} = Arguments) ->
+ stream_arguments(maps:put(<<"max-length-bytes">>,
+ parse_information_unit(Value), Acc),
+ maps:remove(max_length_bytes, Arguments));
+stream_arguments(Acc, #{max_age := Value} = Arguments) ->
+ {ok, Duration} = rabbit_date_time:parse_duration(Value),
+ DurationInSeconds = duration_to_seconds(Duration),
+ stream_arguments(maps:put(<<"max-age">>,
+ list_to_binary(integer_to_list(DurationInSeconds)
+ ++ "s"),
+ Acc),
+ maps:remove(max_age, Arguments));
+stream_arguments(Acc,
+ #{stream_max_segment_size_bytes := Value} = Arguments) ->
+ stream_arguments(maps:put(<<"stream-max-segment-size-bytes">>,
+ parse_information_unit(Value), Acc),
+ maps:remove(stream_max_segment_size_bytes, Arguments));
+stream_arguments(Acc, #{initial_cluster_size := Value} = Arguments) ->
+ stream_arguments(maps:put(<<"initial-cluster-size">>,
+ rabbit_data_coercion:to_binary(Value), Acc),
+ maps:remove(initial_cluster_size, Arguments));
+stream_arguments(Acc, #{leader_locator := Value} = Arguments) ->
+ stream_arguments(maps:put(<<"queue-leader-locator">>, Value, Acc),
+ maps:remove(leader_locator, Arguments));
+stream_arguments(ArgumentsAcc, _Arguments) ->
+ ArgumentsAcc.
+
+duration_to_seconds([{sign, _},
+ {years, Y},
+ {months, M},
+ {days, D},
+ {hours, H},
+ {minutes, Mn},
+ {seconds, S}]) ->
+ Y * 365 * 86400 + M * 30 * 86400 + D * 86400 + H * 3600 + Mn * 60 + S.
+
create_super_stream(NodeName,
Timeout,
VHost,
SuperStream,
Streams,
+ Arguments,
RoutingKeys) ->
case rabbit_misc:rpc_call(NodeName,
rabbit_stream_manager,
@@ -135,7 +267,7 @@ create_super_stream(NodeName,
[VHost,
SuperStream,
Streams,
- [],
+ Arguments,
RoutingKeys,
cli_acting_user()],
Timeout)
@@ -149,7 +281,7 @@ create_super_stream(NodeName,
end.
banner(_, _) ->
- <<"Adding a super stream ...">>.
+ <<"Adding a super stream (experimental feature)...">>.
output({error, Msg}, _Opts) ->
{error, 'Elixir.RabbitMQ.CLI.Core.ExitCodes':exit_software(), Msg};
@@ -158,3 +290,11 @@ output({ok, Msg}, _Opts) ->
cli_acting_user() ->
'Elixir.RabbitMQ.CLI.Core.Helpers':cli_acting_user().
+
+parse_information_unit(Value) ->
+ case rabbit_resource_monitor_misc:parse_information_unit(Value) of
+ {ok, R} ->
+ integer_to_binary(R);
+ {error, _} ->
+ error
+ end.
diff --git a/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteSuperStreamCommand.erl b/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteSuperStreamCommand.erl
index 90bf2db414..0a2f0f785e 100644
--- a/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteSuperStreamCommand.erl
+++ b/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteSuperStreamCommand.erl
@@ -86,7 +86,7 @@ delete_super_stream(NodeName, Timeout, VHost, SuperStream) ->
end.
banner(_, _) ->
- <<"Deleting a super stream ...">>.
+ <<"Deleting a super stream (experimental feature)...">>.
output({error, Msg}, _Opts) ->
{error, 'Elixir.RabbitMQ.CLI.Core.ExitCodes':exit_software(), Msg};
diff --git a/deps/rabbitmq_stream/test/commands_SUITE.erl b/deps/rabbitmq_stream/test/commands_SUITE.erl
index 42109a100b..902bfee3c8 100644
--- a/deps/rabbitmq_stream/test/commands_SUITE.erl
+++ b/deps/rabbitmq_stream/test/commands_SUITE.erl
@@ -363,6 +363,39 @@ add_super_stream_validate(_Config) ->
?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>],
#{routing_keys =>
<<"a,b,c">>})),
+
+ [case Expected of
+ ok ->
+ ?assertEqual(ok,
+ ?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>], Opts));
+ error ->
+ ?assertMatch({validation_failure, _},
+ ?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>], Opts))
+ end
+ || {Opts, Expected}
+ <- [{#{max_length_bytes => 1000}, ok},
+ {#{max_length_bytes => <<"1000">>}, ok},
+ {#{max_length_bytes => <<"100gb">>}, ok},
+ {#{max_length_bytes => <<"50mb">>}, ok},
+ {#{max_length_bytes => <<"50bm">>}, error},
+ {#{max_age => <<"PT10M">>}, ok},
+ {#{max_age => <<"P5DT8H">>}, ok},
+ {#{max_age => <<"foo">>}, error},
+ {#{stream_max_segment_size_bytes => 1000}, ok},
+ {#{stream_max_segment_size_bytes => <<"1000">>}, ok},
+ {#{stream_max_segment_size_bytes => <<"100gb">>}, ok},
+ {#{stream_max_segment_size_bytes => <<"50mb">>}, ok},
+ {#{stream_max_segment_size_bytes => <<"50bm">>}, error},
+ {#{leader_locator => <<"client-local">>}, ok},
+ {#{leader_locator => <<"least-leaders">>}, ok},
+ {#{leader_locator => <<"random">>}, ok},
+ {#{leader_locator => <<"foo">>}, error},
+ {#{initial_cluster_size => <<"1">>}, ok},
+ {#{initial_cluster_size => <<"2">>}, ok},
+ {#{initial_cluster_size => <<"3">>}, ok},
+ {#{initial_cluster_size => <<"0">>}, error},
+ {#{initial_cluster_size => <<"-1">>}, error},
+ {#{initial_cluster_size => <<"foo">>}, error}]],
ok.
delete_super_stream_merge_defaults(_Config) ->
@@ -387,6 +420,7 @@ add_delete_super_stream_run(Config) ->
timeout => 10000,
vhost => <<"/">>},
+ % with number of partitions
?assertMatch({ok, _},
?COMMAND_ADD_SUPER_STREAM:run([<<"invoices">>],
maps:merge(#{partitions => 3},
@@ -399,6 +433,7 @@ add_delete_super_stream_run(Config) ->
?assertEqual({error, stream_not_found},
partitions(Config, <<"invoices">>)),
+ % with routing keys
?assertMatch({ok, _},
?COMMAND_ADD_SUPER_STREAM:run([<<"invoices">>],
maps:merge(#{routing_keys =>
@@ -413,6 +448,38 @@ add_delete_super_stream_run(Config) ->
?assertEqual({error, stream_not_found},
partitions(Config, <<"invoices">>)),
+ % with arguments
+ ExtraOptions =
+ #{partitions => 3,
+ max_length_bytes => <<"50mb">>,
+ max_age => <<"PT10M">>,
+ stream_max_segment_size_bytes => <<"1mb">>,
+ leader_locator => <<"random">>,
+ initial_cluster_size => <<"1">>},
+
+ ?assertMatch({ok, _},
+ ?COMMAND_ADD_SUPER_STREAM:run([<<"invoices">>],
+ maps:merge(ExtraOptions, Opts))),
+
+ {ok, Q} = queue_lookup(Config, <<"invoices-0">>),
+ Args = amqqueue:get_arguments(Q),
+ ?assertMatch({_, <<"random">>},
+ rabbit_misc:table_lookup(Args, <<"x-queue-leader-locator">>)),
+ ?assertMatch({_, 1},
+ rabbit_misc:table_lookup(Args, <<"x-initial-cluster-size">>)),
+ ?assertMatch({_, 1000000},
+ rabbit_misc:table_lookup(Args,
+ <<"x-stream-max-segment-size-bytes">>)),
+ ?assertMatch({_, <<"600s">>},
+ rabbit_misc:table_lookup(Args, <<"x-max-age">>)),
+ ?assertMatch({_, 50000000},
+ rabbit_misc:table_lookup(Args, <<"x-max-length-bytes">>)),
+ ?assertMatch({_, <<"stream">>},
+ rabbit_misc:table_lookup(Args, <<"x-queue-type">>)),
+
+ ?assertMatch({ok, _},
+ ?COMMAND_DELETE_SUPER_STREAM:run([<<"invoices">>], Opts)),
+
ok.
partitions(Config, Name) ->
@@ -497,3 +564,11 @@ amqp_params(network, _, Port) ->
#amqp_params_network{port = Port};
amqp_params(direct, Node, _) ->
#amqp_params_direct{node = Node}.
+
+queue_lookup(Config, Q) ->
+ QueueName = rabbit_misc:r(<<"/">>, queue, Q),
+ rabbit_ct_broker_helpers:rpc(Config,
+ 0,
+ rabbit_amqqueue,
+ lookup,
+ [QueueName]).
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl
index 3c036fc1e2..b47a954a95 100644
--- a/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl
+++ b/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl
@@ -116,7 +116,7 @@ create_super_stream(Config, Name, Partitions, RKs) ->
[<<"/">>,
Name,
Partitions,
- [],
+ #{},
RKs,
<<"guest">>]).