author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2021-09-30 10:43:29 +0200 |
---|---|---|
committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2021-10-11 16:50:03 +0200 |
commit | 6b9589bae42b0bec11d75ae906810260da14ca76 (patch) | |
tree | a8e182aa89da61399113c7f5a4f6396b04d4b3ad | |
parent | ecbd9698341406543efd195aff481c6704f6022b (diff) | |
download | rabbitmq-server-git-super-stream-cli.tar.gz | |
Handle stream arguments in add_super_stream command (branch: super-stream-cli)
Support stream arguments such as max-age and leader-locator when creating the partition streams.
6 files changed, 295 insertions, 11 deletions
diff --git a/deps/rabbit_common/src/rabbit_date_time.erl b/deps/rabbit_common/src/rabbit_date_time.erl
new file mode 100644
index 0000000000..e4a56ad783
--- /dev/null
+++ b/deps/rabbit_common/src/rabbit_date_time.erl
@@ -0,0 +1,48 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_date_time).
+
+-export([parse_duration/1]).
+
+-type datetime_plist() :: list({atom(), integer()}).
+
+% from https://github.com/erlsci/iso8601/blob/main/src/iso8601.erl
+-spec gi(string()) -> integer().
+gi(DS) ->
+    {Int, _Rest} = string:to_integer(DS),
+    case Int of
+        error ->
+            0;
+        _ ->
+            Int
+    end.
+
+-spec parse_duration(string()) -> datetime_plist().
+parse_duration(Bin)
+    when is_binary(Bin) -> %TODO extended format
+    parse_duration(binary_to_list(Bin));
+parse_duration(Str) ->
+    case re:run(Str,
+                "^(?<sign>-|\\+)?P(?:(?<years>[0-9]+)Y)?(?:(?<months>[0"
+                "-9]+)M)?(?:(?<days>[0-9]+)D)?(T(?:(?<hours>[0-9]+)H)?("
+                "?:(?<minutes>[0-9]+)M)?(?:(?<seconds>[0-9]+(?:\\.[0-9]"
+                "+)?)S)?)?$",
+                [{capture, [sign, years, months, days, hours, minutes, seconds],
+                  list}])
+    of
+        {match, [Sign, Years, Months, Days, Hours, Minutes, Seconds]} ->
+            {ok, [{sign, Sign},
+                  {years, gi(Years)},
+                  {months, gi(Months)},
+                  {days, gi(Days)},
+                  {hours, gi(Hours)},
+                  {minutes, gi(Minutes)},
+                  {seconds, gi(Seconds)}]};
+        nomatch ->
+            error
+    end.
diff --git a/deps/rabbit_common/test/unit_SUITE.erl b/deps/rabbit_common/test/unit_SUITE.erl
index 105488bed0..f00df8787b 100644
--- a/deps/rabbit_common/test/unit_SUITE.erl
+++ b/deps/rabbit_common/test/unit_SUITE.erl
@@ -44,7 +44,8 @@ groups() ->
         frame_encoding_does_not_fail_with_empty_binary_payload,
         amqp_table_conversion,
         name_type,
-        get_erl_path
+        get_erl_path,
+        date_time_parse_duration
     ]},
     {parse_mem_limit, [parallel], [
         parse_mem_limit_relative_exactly_max,
@@ -460,3 +461,23 @@ get_erl_path(_) ->
             ?assertNotMatch(nomatch, string:find(Exe, "erl"))
     end,
     ok.
+
+date_time_parse_duration(_) ->
+    ?assertEqual(
+        {ok, [{sign, "+"}, {years, 6}, {months, 3}, {days, 1}, {hours, 1}, {minutes, 1}, {seconds, 1}]},
+        rabbit_date_time:parse_duration("+P6Y3M1DT1H1M1.1S")
+    ),
+    ?assertEqual(
+        {ok, [{sign, []}, {years, 0}, {months, 0}, {days, 0}, {hours, 0}, {minutes, 6}, {seconds, 0}]},
+        rabbit_date_time:parse_duration("PT6M")
+    ),
+    ?assertEqual(
+        {ok, [{sign, []}, {years, 0}, {months, 0}, {days, 0}, {hours, 0}, {minutes, 10}, {seconds, 30}]},
+        rabbit_date_time:parse_duration("PT10M30S")
+    ),
+    ?assertEqual(
+        {ok, [{sign, []}, {years, 0}, {months, 0}, {days, 5}, {hours, 8}, {minutes, 0}, {seconds, 0}]},
+        rabbit_date_time:parse_duration("P5DT8H")
+    ),
+    ?assertEqual(error, rabbit_date_time:parse_duration("foo")),
+    ok.
\ No newline at end of file
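A quick sanity check of the new parser (hypothetical Erlang shell session; the second call mirrors a case in `unit_SUITE` above, the others follow from the regex and `gi/1`). Fields absent from the duration come back as `0`, and because `gi/1` relies on `string:to_integer/1`, a fractional seconds value such as `1.1` is truncated to its integer part:

```erlang
%% Hypothetical shell session against the new module; results inferred from the
%% parser above and the unit tests.
1> rabbit_date_time:parse_duration(<<"PT1H30M">>).
{ok,[{sign,[]},
     {years,0},
     {months,0},
     {days,0},
     {hours,1},
     {minutes,30},
     {seconds,0}]}
2> rabbit_date_time:parse_duration("+P6Y3M1DT1H1M1.1S").
{ok,[{sign,"+"},
     {years,6},
     {months,3},
     {days,1},
     {hours,1},
     {minutes,1},
     {seconds,1}]}
3> rabbit_date_time:parse_duration("10 minutes").
error
```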
diff --git a/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl b/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl
index 7843ed4895..4f2093a020 100644
--- a/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl
+++ b/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl
@@ -43,7 +43,13 @@ description() ->
     <<"Add a super stream (experimental feature)">>.
 
 switches() ->
-    [{partitions, integer}, {routing_keys, string}].
+    [{partitions, integer},
+     {routing_keys, string},
+     {max_length_bytes, string},
+     {max_age, string},
+     {stream_max_segment_size_bytes, string},
+     {leader_locator, string},
+     {initial_cluster_size, integer}].
 
 help_section() ->
     {plugin, stream}.
@@ -55,11 +61,73 @@ validate([_Name], #{partitions := _, routing_keys := _}) ->
     "Specify --partitions or routing-keys, not both."};
 validate([_Name], #{partitions := Partitions}) when Partitions < 1 ->
     {validation_failure, "The partition number must be greater than 0"};
-validate([_Name], _Opts) ->
-    ok;
+validate([_Name], Opts) ->
+    validate_stream_arguments(Opts);
 validate(_, _Opts) ->
     {validation_failure, too_many_args}.
 
+validate_stream_arguments(#{max_length_bytes := Value} = Opts) ->
+    case parse_information_unit(Value) of
+        error ->
+            {validation_failure,
+             "Invalid value for --max-length-bytes, valid example "
+             "values: 100gb, 50mb"};
+        _ ->
+            validate_stream_arguments(maps:remove(max_length_bytes, Opts))
+    end;
+validate_stream_arguments(#{max_age := Value} = Opts) ->
+    case rabbit_date_time:parse_duration(Value) of
+        {ok, _} ->
+            validate_stream_arguments(maps:remove(max_age, Opts));
+        error ->
+            {validation_failure,
+             "Invalid value for --max-age, the value must be an "
+             "ISO 8601 duration, e.g. PT10M30S for 10 "
+             "minutes 30 seconds, P5DT8H for 5 days 8 hours."}
+    end;
+validate_stream_arguments(#{stream_max_segment_size_bytes := Value} =
+                              Opts) ->
+    case parse_information_unit(Value) of
+        error ->
+            {validation_failure,
+             "Invalid value for --stream-max-segment-size-bytes, "
+             "valid example values: 100gb, 50mb"};
+        _ ->
+            validate_stream_arguments(maps:remove(stream_max_segment_size_bytes,
+                                                  Opts))
+    end;
+validate_stream_arguments(#{leader_locator := <<"client-local">>} =
+                              Opts) ->
+    validate_stream_arguments(maps:remove(leader_locator, Opts));
+validate_stream_arguments(#{leader_locator := <<"random">>} = Opts) ->
+    validate_stream_arguments(maps:remove(leader_locator, Opts));
+validate_stream_arguments(#{leader_locator := <<"least-leaders">>} =
+                              Opts) ->
+    validate_stream_arguments(maps:remove(leader_locator, Opts));
+validate_stream_arguments(#{leader_locator := _}) ->
+    {validation_failure,
+     "Invalid value for --leader-locator, valid values "
+     "are client-local, random, least-leaders."};
+validate_stream_arguments(#{initial_cluster_size := Value} = Opts) ->
+    try
+        case rabbit_data_coercion:to_integer(Value) of
+            S when S > 0 ->
+                validate_stream_arguments(maps:remove(initial_cluster_size,
+                                                      Opts));
+            _ ->
+                {validation_failure,
+                 "Invalid value for --initial-cluster-size, the "
+                 "value must be positive."}
+        end
+    catch
+        error:_ ->
+            {validation_failure,
+             "Invalid value for --initial-cluster-size, the "
+             "value must be a positive integer."}
+    end;
+validate_stream_arguments(_) ->
+    ok.
+
 merge_defaults(_Args, #{routing_keys := _V} = Opts) ->
     {_Args, maps:merge(#{vhost => <<"/">>}, Opts)};
 merge_defaults(_Args, Opts) ->
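Each `validate_stream_arguments/1` clause above checks one recognized option and then recurses on the map with that key removed, so the first invalid value short-circuits and a fully valid map falls through to the final catch-all clause. A minimal sketch of the expected behaviour (not part of the commit; the failure message is elided):

```erlang
%% Hedged sketch of the validate/2 callback defined above.
Cmd = 'Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand',

%% All recognized keys parse, so validation falls through to the catch-all:
ok = Cmd:validate([<<"invoices">>],
                  #{partitions => 3,
                    max_age => <<"P5DT8H">>,
                    leader_locator => <<"least-leaders">>}),

%% An unparsable value short-circuits with a descriptive failure:
{validation_failure, _Reason} =
    Cmd:validate([<<"invoices">>], #{max_age => <<"ten minutes">>}).
```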
@@ -77,7 +145,25 @@ usage_additional() ->
                "exclusive with --routing-keys."],
               ["--routing-keys <routing-keys>",
                "Comma-separated list of routing keys. Mutually "
-               "exclusive with --partitions."]].
+               "exclusive with --partitions."],
+              ["--max-length-bytes <max-length-bytes>",
+               "The maximum size of partition streams, example "
+               "values: 20gb, 500mb."],
+              ["--max-age <max-age>",
+               "The maximum age of partition stream segments, "
+               "using the ISO 8601 duration format, e.g. PT10M30S "
+               "for 10 minutes 30 seconds, P5DT8H for 5 days "
+               "8 hours."],
+              ["--stream-max-segment-size-bytes <stream-max-segment-si"
+               "ze-bytes>",
+               "The maximum size of partition stream segments, "
+               "example values: 500mb, 1gb."],
+              ["--leader-locator <leader-locator>",
+               "Leader locator strategy for partition streams, "
+               "possible values are client-local, least-leaders, "
+               "random."],
+              ["--initial-cluster-size <initial-cluster-size>",
+               "The initial cluster size of partition streams."]].
 
 usage_doc_guides() ->
     [?STREAM_GUIDE_URL].
@@ -86,7 +172,8 @@ run([SuperStream],
     #{node := NodeName,
       vhost := VHost,
       timeout := Timeout,
-      partitions := Partitions}) ->
+      partitions := Partitions} =
+        Opts) ->
     Streams =
         [list_to_binary(binary_to_list(SuperStream)
                         ++ "-"
@@ -99,12 +186,14 @@ run([SuperStream],
                         VHost,
                         SuperStream,
                         Streams,
+                        stream_arguments(Opts),
                         RoutingKeys);
 run([SuperStream],
     #{node := NodeName,
      vhost := VHost,
      timeout := Timeout,
-     routing_keys := RoutingKeysStr}) ->
+     routing_keys := RoutingKeysStr} =
+        Opts) ->
     RoutingKeys =
         [rabbit_data_coercion:to_binary(
             string:strip(K))
@@ -121,13 +210,56 @@ run([SuperStream],
                         VHost,
                         SuperStream,
                         Streams,
+                        stream_arguments(Opts),
                         RoutingKeys).
 
+stream_arguments(Opts) ->
+    stream_arguments(#{}, Opts).
+
+stream_arguments(Acc, Arguments) when map_size(Arguments) =:= 0 ->
+    Acc;
+stream_arguments(Acc, #{max_length_bytes := Value} = Arguments) ->
+    stream_arguments(maps:put(<<"max-length-bytes">>,
+                              parse_information_unit(Value), Acc),
+                     maps:remove(max_length_bytes, Arguments));
+stream_arguments(Acc, #{max_age := Value} = Arguments) ->
+    {ok, Duration} = rabbit_date_time:parse_duration(Value),
+    DurationInSeconds = duration_to_seconds(Duration),
+    stream_arguments(maps:put(<<"max-age">>,
+                              list_to_binary(integer_to_list(DurationInSeconds)
+                                             ++ "s"),
+                              Acc),
+                     maps:remove(max_age, Arguments));
+stream_arguments(Acc,
+                 #{stream_max_segment_size_bytes := Value} = Arguments) ->
+    stream_arguments(maps:put(<<"stream-max-segment-size-bytes">>,
+                              parse_information_unit(Value), Acc),
+                     maps:remove(stream_max_segment_size_bytes, Arguments));
+stream_arguments(Acc, #{initial_cluster_size := Value} = Arguments) ->
+    stream_arguments(maps:put(<<"initial-cluster-size">>,
+                              rabbit_data_coercion:to_binary(Value), Acc),
+                     maps:remove(initial_cluster_size, Arguments));
+stream_arguments(Acc, #{leader_locator := Value} = Arguments) ->
+    stream_arguments(maps:put(<<"queue-leader-locator">>, Value, Acc),
+                     maps:remove(leader_locator, Arguments));
+stream_arguments(ArgumentsAcc, _Arguments) ->
+    ArgumentsAcc.
+
+duration_to_seconds([{sign, _},
+                     {years, Y},
+                     {months, M},
+                     {days, D},
+                     {hours, H},
+                     {minutes, Mn},
+                     {seconds, S}]) ->
+    Y * 365 * 86400 + M * 30 * 86400 + D * 86400 + H * 3600 + Mn * 60 + S.
+
 create_super_stream(NodeName,
                     Timeout,
                     VHost,
                     SuperStream,
                     Streams,
+                    Arguments,
                     RoutingKeys) ->
     case rabbit_misc:rpc_call(NodeName,
                               rabbit_stream_manager,
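`duration_to_seconds/1` flattens the parsed duration using the usual approximations of 365 days per year and 30 days per month, and the `max_age` clause of `stream_arguments/2` re-encodes the total number of seconds as a binary with an "s" suffix. A worked example for the durations used in the tests further down:

```erlang
%% Worked arithmetic, grounded in the x-max-age assertion in commands_SUITE below.
{ok, D} = rabbit_date_time:parse_duration(<<"PT10M">>),
%% D = [{sign,[]},{years,0},{months,0},{days,0},{hours,0},{minutes,10},{seconds,0}]
%% duration_to_seconds(D) = 0 + 0 + 0 + 0 + 10 * 60 + 0 = 600,
%% so the stream argument becomes <<"max-age">> => <<"600s">>.
%% Likewise P5DT8H = 5 * 86400 + 8 * 3600 = 460800 seconds, and P1Y would count
%% as 365 * 86400 = 31536000 seconds (365-day-year approximation).
<<"600s">> = list_to_binary(integer_to_list(600) ++ "s").
```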
@@ -135,7 +267,7 @@ create_super_stream(NodeName,
                               [VHost,
                                SuperStream,
                                Streams,
-                               [],
+                               Arguments,
                                RoutingKeys,
                                cli_acting_user()],
                               Timeout)
@@ -149,7 +281,7 @@ create_super_stream(NodeName,
     end.
 
 banner(_, _) ->
-    <<"Adding a super stream ...">>.
+    <<"Adding a super stream (experimental feature)...">>.
 
 output({error, Msg}, _Opts) ->
     {error, 'Elixir.RabbitMQ.CLI.Core.ExitCodes':exit_software(), Msg};
@@ -158,3 +290,11 @@ output({ok, Msg}, _Opts) ->
 
 cli_acting_user() ->
     'Elixir.RabbitMQ.CLI.Core.Helpers':cli_acting_user().
+
+parse_information_unit(Value) ->
+    case rabbit_resource_monitor_misc:parse_information_unit(Value) of
+        {ok, R} ->
+            integer_to_binary(R);
+        {error, _} ->
+            error
+    end.
diff --git a/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteSuperStreamCommand.erl b/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteSuperStreamCommand.erl
index 90bf2db414..0a2f0f785e 100644
--- a/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteSuperStreamCommand.erl
+++ b/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteSuperStreamCommand.erl
@@ -86,7 +86,7 @@ delete_super_stream(NodeName, Timeout, VHost, SuperStream) ->
     end.
 
 banner(_, _) ->
-    <<"Deleting a super stream ...">>.
+    <<"Deleting a super stream (experimental feature)...">>.
 
 output({error, Msg}, _Opts) ->
     {error, 'Elixir.RabbitMQ.CLI.Core.ExitCodes':exit_software(), Msg};
diff --git a/deps/rabbitmq_stream/test/commands_SUITE.erl b/deps/rabbitmq_stream/test/commands_SUITE.erl
index 42109a100b..902bfee3c8 100644
--- a/deps/rabbitmq_stream/test/commands_SUITE.erl
+++ b/deps/rabbitmq_stream/test/commands_SUITE.erl
@@ -363,6 +363,39 @@ add_super_stream_validate(_Config) ->
                  ?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>],
                                                     #{routing_keys =>
                                                           <<"a,b,c">>})),
+
+    [case Expected of
+         ok ->
+             ?assertEqual(ok,
+                          ?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>], Opts));
+         error ->
+             ?assertMatch({validation_failure, _},
+                          ?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>], Opts))
+     end
+     || {Opts, Expected}
+            <- [{#{max_length_bytes => 1000}, ok},
+                {#{max_length_bytes => <<"1000">>}, ok},
+                {#{max_length_bytes => <<"100gb">>}, ok},
+                {#{max_length_bytes => <<"50mb">>}, ok},
+                {#{max_length_bytes => <<"50bm">>}, error},
+                {#{max_age => <<"PT10M">>}, ok},
+                {#{max_age => <<"P5DT8H">>}, ok},
+                {#{max_age => <<"foo">>}, error},
+                {#{stream_max_segment_size_bytes => 1000}, ok},
+                {#{stream_max_segment_size_bytes => <<"1000">>}, ok},
+                {#{stream_max_segment_size_bytes => <<"100gb">>}, ok},
+                {#{stream_max_segment_size_bytes => <<"50mb">>}, ok},
+                {#{stream_max_segment_size_bytes => <<"50bm">>}, error},
+                {#{leader_locator => <<"client-local">>}, ok},
+                {#{leader_locator => <<"least-leaders">>}, ok},
+                {#{leader_locator => <<"random">>}, ok},
+                {#{leader_locator => <<"foo">>}, error},
+                {#{initial_cluster_size => <<"1">>}, ok},
+                {#{initial_cluster_size => <<"2">>}, ok},
+                {#{initial_cluster_size => <<"3">>}, ok},
+                {#{initial_cluster_size => <<"0">>}, error},
+                {#{initial_cluster_size => <<"-1">>}, error},
+                {#{initial_cluster_size => <<"foo">>}, error}]],
     ok.
 
 delete_super_stream_merge_defaults(_Config) ->
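The size options go through `rabbit_resource_monitor_misc:parse_information_unit/1`, which the new `parse_information_unit/1` wrapper above turns into a binary byte count (or `error`). A hedged sketch of the expected conversions, with the byte counts taken from the integration test below (units treated as decimal):

```erlang
%% Assumed return values, inferred from the commands_SUITE assertions below.
{ok, 50000000} = rabbit_resource_monitor_misc:parse_information_unit(<<"50mb">>),
{ok, 1000000}  = rabbit_resource_monitor_misc:parse_information_unit(<<"1mb">>),
%% The wrapper then sends <<"50000000">> / <<"1000000">> as stream arguments;
%% an unrecognized unit such as "50bm" is rejected during validation instead.
ok.
```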
@@ -387,6 +420,7 @@ add_delete_super_stream_run(Config) ->
           timeout => 10000,
           vhost => <<"/">>},
 
+    % with number of partitions
     ?assertMatch({ok, _},
                  ?COMMAND_ADD_SUPER_STREAM:run([<<"invoices">>],
                                                maps:merge(#{partitions => 3},
@@ -399,6 +433,7 @@ add_delete_super_stream_run(Config) ->
     ?assertEqual({error, stream_not_found},
                  partitions(Config, <<"invoices">>)),
 
+    % with routing keys
     ?assertMatch({ok, _},
                  ?COMMAND_ADD_SUPER_STREAM:run([<<"invoices">>],
                                                maps:merge(#{routing_keys =>
@@ -413,6 +448,38 @@ add_delete_super_stream_run(Config) ->
     ?assertEqual({error, stream_not_found},
                  partitions(Config, <<"invoices">>)),
 
+    % with arguments
+    ExtraOptions =
+        #{partitions => 3,
+          max_length_bytes => <<"50mb">>,
+          max_age => <<"PT10M">>,
+          stream_max_segment_size_bytes => <<"1mb">>,
+          leader_locator => <<"random">>,
+          initial_cluster_size => <<"1">>},
+
+    ?assertMatch({ok, _},
+                 ?COMMAND_ADD_SUPER_STREAM:run([<<"invoices">>],
+                                               maps:merge(ExtraOptions, Opts))),
+
+    {ok, Q} = queue_lookup(Config, <<"invoices-0">>),
+    Args = amqqueue:get_arguments(Q),
+    ?assertMatch({_, <<"random">>},
+                 rabbit_misc:table_lookup(Args, <<"x-queue-leader-locator">>)),
+    ?assertMatch({_, 1},
+                 rabbit_misc:table_lookup(Args, <<"x-initial-cluster-size">>)),
+    ?assertMatch({_, 1000000},
+                 rabbit_misc:table_lookup(Args,
+                                          <<"x-stream-max-segment-size-bytes">>)),
+    ?assertMatch({_, <<"600s">>},
+                 rabbit_misc:table_lookup(Args, <<"x-max-age">>)),
+    ?assertMatch({_, 50000000},
+                 rabbit_misc:table_lookup(Args, <<"x-max-length-bytes">>)),
+    ?assertMatch({_, <<"stream">>},
+                 rabbit_misc:table_lookup(Args, <<"x-queue-type">>)),
+
+    ?assertMatch({ok, _},
+                 ?COMMAND_DELETE_SUPER_STREAM:run([<<"invoices">>], Opts)),
+
     ok.
 
 partitions(Config, Name) ->
@@ -497,3 +564,11 @@ amqp_params(network, _, Port) ->
     #amqp_params_network{port = Port};
 amqp_params(direct, Node, _) ->
     #amqp_params_direct{node = Node}.
+
+queue_lookup(Config, Q) ->
+    QueueName = rabbit_misc:r(<<"/">>, queue, Q),
+    rabbit_ct_broker_helpers:rpc(Config,
+                                 0,
+                                 rabbit_amqqueue,
+                                 lookup,
+                                 [QueueName]).
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl
index 3c036fc1e2..b47a954a95 100644
--- a/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl
+++ b/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl
@@ -116,7 +116,7 @@ create_super_stream(Config, Name, Partitions, RKs) ->
                                  [<<"/">>,
                                   Name,
                                   Partitions,
-                                  [],
+                                  #{},
                                   RKs,
                                   <<"guest">>]).
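Putting it together: for the `ExtraOptions` map used in `add_delete_super_stream_run/1`, `stream_arguments/1` is expected to hand the map below to `rabbit_stream_manager:create_super_stream/6`; the x-argument assertions on the `invoices-0` partition queue then see the corresponding values after the server has coerced types (for example, the cluster size shows up as the integer `1`). A hedged reconstruction, not captured output:

```erlang
%% Expected stream arguments for ExtraOptions above (reconstructed from the
%% assertions on the "invoices-0" partition queue).
#{<<"max-length-bytes">>              => <<"50000000">>,
  <<"max-age">>                       => <<"600s">>,
  <<"stream-max-segment-size-bytes">> => <<"1000000">>,
  <<"queue-leader-locator">>          => <<"random">>,
  <<"initial-cluster-size">>          => <<"1">>}.
```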