diff options
author | dcorbacho <dparracorbacho@piotal.io> | 2020-11-18 14:27:41 +0000 |
---|---|---|
committer | dcorbacho <dparracorbacho@piotal.io> | 2020-11-18 14:27:41 +0000 |
commit | f23a51261d9502ec39df0f8db47ba6b22aa7659f (patch) | |
tree | 53dcdf46e7dc2c14e81ee960bce8793879b488d3 /deps/rabbitmq_stream/test | |
parent | afa2c2bf6c7e0e9b63f4fb53dc931c70388e1c82 (diff) | |
parent | 9f6d64ec4a4b1eeac24d7846c5c64fd96798d892 (diff) | |
download | rabbitmq-server-git-stream-timestamp-offset.tar.gz |
Merge remote-tracking branch 'origin/master' into stream-timestamp-offsetstream-timestamp-offset
Diffstat (limited to 'deps/rabbitmq_stream/test')
18 files changed, 2561 insertions, 0 deletions
diff --git a/deps/rabbitmq_stream/test/command_SUITE.erl b/deps/rabbitmq_stream/test/command_SUITE.erl new file mode 100644 index 0000000000..41ab5904ff --- /dev/null +++ b/deps/rabbitmq_stream/test/command_SUITE.erl @@ -0,0 +1,136 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(command_SUITE). +-compile([export_all]). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). +-include("rabbit_stream.hrl"). + + +-define(COMMAND, 'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConnectionsCommand'). + +all() -> + [ + {group, non_parallel_tests} + ]. + +groups() -> + [ + {non_parallel_tests, [], [ + merge_defaults, + run + ]} + ]. + +init_per_suite(Config) -> + Config1 = rabbit_ct_helpers:set_config(Config, + [{rmq_nodename_suffix, ?MODULE}]), + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config1, + rabbit_ct_broker_helpers:setup_steps()). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config, + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_group(_, Config) -> + Config. + +end_per_group(_, Config) -> + Config. + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase). + +end_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +merge_defaults(_Config) -> + {[<<"conn_name">>], #{verbose := false}} = + ?COMMAND:merge_defaults([], #{}), + + {[<<"other_key">>], #{verbose := true}} = + ?COMMAND:merge_defaults([<<"other_key">>], #{verbose => true}), + + {[<<"other_key">>], #{verbose := false}} = + ?COMMAND:merge_defaults([<<"other_key">>], #{verbose => false}). + + +run(Config) -> + + Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + Opts = #{node => Node, timeout => 10000, verbose => false}, + + %% No connections + [] = 'Elixir.Enum':to_list(?COMMAND:run([], Opts)), + + StreamPort = rabbit_stream_SUITE:get_stream_port(Config), + + S1 = start_stream_connection(StreamPort), + ct:sleep(100), + + [[{conn_name, _}]] = + 'Elixir.Enum':to_list(?COMMAND:run([<<"conn_name">>], Opts)), + + S2 = start_stream_connection(StreamPort), + ct:sleep(100), + + [[{conn_name, _}], [{conn_name, _}]] = + 'Elixir.Enum':to_list(?COMMAND:run([<<"conn_name">>], Opts)), + + Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), + start_amqp_connection(network, Node, Port), + + %% There are still just two connections + [[{conn_name, _}], [{conn_name, _}]] = + 'Elixir.Enum':to_list(?COMMAND:run([<<"conn_name">>], Opts)), + + start_amqp_connection(direct, Node, Port), + + %% Still two MQTT connections, one direct AMQP 0-9-1 connection + [[{conn_name, _}], [{conn_name, _}]] = + 'Elixir.Enum':to_list(?COMMAND:run([<<"conn_name">>], Opts)), + + %% Verbose returns all keys + Infos = lists:map(fun(El) -> atom_to_binary(El, utf8) end, ?INFO_ITEMS), + AllKeys = 'Elixir.Enum':to_list(?COMMAND:run(Infos, Opts)), + AllKeys = 'Elixir.Enum':to_list(?COMMAND:run([], Opts#{verbose => true})), + + %% There are two connections + [First, _Second] = AllKeys, + + %% Keys are INFO_ITEMS + KeysCount = length(?INFO_ITEMS), + KeysCount = length(First), + + {Keys, _} = lists:unzip(First), + + [] = Keys -- ?INFO_ITEMS, + [] = ?INFO_ITEMS -- Keys, + + rabbit_stream_SUITE:test_close(S1), + rabbit_stream_SUITE:test_close(S2), + ok. + +start_stream_connection(Port) -> + {ok, S} = gen_tcp:connect("localhost", Port, [{active, false}, + {mode, binary}]), + rabbit_stream_SUITE:test_peer_properties(S), + rabbit_stream_SUITE:test_authenticate(S), + S. + +start_amqp_connection(Type, Node, Port) -> + Params = amqp_params(Type, Node, Port), + {ok, _Connection} = amqp_connection:start(Params). + +amqp_params(network, _, Port) -> + #amqp_params_network{port = Port}; +amqp_params(direct, Node, _) -> + #amqp_params_direct{node = Node}. diff --git a/deps/rabbitmq_stream/test/config_schema_SUITE.erl b/deps/rabbitmq_stream/test/config_schema_SUITE.erl new file mode 100644 index 0000000000..a298811541 --- /dev/null +++ b/deps/rabbitmq_stream/test/config_schema_SUITE.erl @@ -0,0 +1,53 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(config_schema_SUITE). + +-compile(export_all). + +all() -> + [ + run_snippets + ]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + Config1 = rabbit_ct_helpers:run_setup_steps(Config), + rabbit_ct_config_schema:init_schemas(rabbitmq_stream, Config1). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase), + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodename_suffix, Testcase} + ]), + rabbit_ct_helpers:run_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +end_per_testcase(Testcase, Config) -> + Config1 = rabbit_ct_helpers:run_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()), + rabbit_ct_helpers:testcase_finished(Config1, Testcase). + +%% ------------------------------------------------------------------- +%% Testcases. +%% ------------------------------------------------------------------- + +run_snippets(Config) -> + ok = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, run_snippets1, [Config]). + +run_snippets1(Config) -> + rabbit_ct_config_schema:run_snippets(Config). diff --git a/deps/rabbitmq_stream/test/config_schema_SUITE_data/rabbitmq_stream.snippets b/deps/rabbitmq_stream/test/config_schema_SUITE_data/rabbitmq_stream.snippets new file mode 100644 index 0000000000..8f60ef9710 --- /dev/null +++ b/deps/rabbitmq_stream/test/config_schema_SUITE_data/rabbitmq_stream.snippets @@ -0,0 +1,73 @@ +[{listener_port, + "stream.listeners.tcp.1 = 12345", + [{rabbitmq_stream,[{tcp_listeners,[12345]}]}], + [rabbitmq_stream]}, + {listeners_ip, + "stream.listeners.tcp.1 = 127.0.0.1:5555 + stream.listeners.tcp.2 = ::1:5555", + [{rabbitmq_stream,[{tcp_listeners,[{"127.0.0.1",5555},{"::1",5555}]}]}], + [rabbitmq_stream]}, + + {listener_tcp_options, + "stream.listeners.tcp.1 = 127.0.0.1:5555 + stream.listeners.tcp.2 = ::1:5555 + + stream.tcp_listen_options.backlog = 2048 + stream.tcp_listen_options.recbuf = 8192 + stream.tcp_listen_options.sndbuf = 8192 + + stream.tcp_listen_options.keepalive = true + stream.tcp_listen_options.nodelay = true + + stream.tcp_listen_options.exit_on_close = true + + stream.tcp_listen_options.send_timeout = 120 +", + [{rabbitmq_stream,[ + {tcp_listeners,[ + {"127.0.0.1",5555}, + {"::1",5555} + ]} + , {tcp_listen_options, [ + {backlog, 2048}, + {exit_on_close, true}, + + {recbuf, 8192}, + {sndbuf, 8192}, + + {send_timeout, 120}, + + {keepalive, true}, + {nodelay, true} + ]} + ]}], + [rabbitmq_stream]}, + {defaults, + "stream.frame_max = 1048576 + stream.heartbeat = 60 + stream.initial_credits = 50000 + stream.credits_required_for_unblocking = 12500", + [{rabbitmq_stream,[{initial_credits, 50000}, + {credits_required_for_unblocking, 12500}, + {frame_max, 1048576}, + {heartbeat, 60}]}], + [rabbitmq_stream]}, + {advertised_host_port, + "stream.advertised_host = some-host + stream.advertised_port = 5556", + [{rabbitmq_stream,[{advertised_host, <<"some-host">>}, + {advertised_port, 5556}]}], + [rabbitmq_stream]}, + {credits, + "stream.frame_max = 2097152 + stream.heartbeat = 120", + [{rabbitmq_stream,[{frame_max, 2097152}, + {heartbeat, 120}]}], + [rabbitmq_stream]}, + {protocol, + "stream.initial_credits = 100000 + stream.credits_required_for_unblocking = 25000", + [{rabbitmq_stream,[{initial_credits, 100000}, + {credits_required_for_unblocking, 25000}]}], + [rabbitmq_stream]} +].
\ No newline at end of file diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl new file mode 100644 index 0000000000..4197b1de71 --- /dev/null +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl @@ -0,0 +1,266 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 2.0 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at https://www.mozilla.org/en-US/MPL/2.0/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is Pivotal Software, Inc. +%% Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_stream_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include("rabbit_stream.hrl"). + +-compile(export_all). + +all() -> + [ + {group, single_node}, + {group, cluster} + ]. + +groups() -> + [ + {single_node, [], [test_stream]}, + {cluster, [], [test_stream, java]} + ]. + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + Config. + +end_per_suite(Config) -> + Config. + +init_per_group(single_node, Config) -> + Config1 = rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, false}]), + rabbit_ct_helpers:run_setup_steps(Config1, + rabbit_ct_broker_helpers:setup_steps()); +init_per_group(cluster = Group, Config) -> + Config1 = rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, true}]), + Config2 = rabbit_ct_helpers:set_config(Config1, + [{rmq_nodes_count, 3}, + {rmq_nodename_suffix, Group}, + {tcp_ports_base}]), + rabbit_ct_helpers:run_setup_steps(Config2, + [fun(StepConfig) -> + rabbit_ct_helpers:merge_app_env(StepConfig, + {aten, [{poll_interval, 1000}]}) + end] ++ + rabbit_ct_broker_helpers:setup_steps()); +init_per_group(_, Config) -> + rabbit_ct_helpers:run_setup_steps(Config). + +end_per_group(java, Config) -> + rabbit_ct_helpers:run_teardown_steps(Config); +end_per_group(_, Config) -> + rabbit_ct_helpers:run_steps(Config, + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_testcase(_TestCase, Config) -> + Config. + +end_per_testcase(_Test, _Config) -> + ok. + +test_stream(Config) -> + Port = get_stream_port(Config), + test_server(Port), + ok. + +java(Config) -> + StreamPortNode1 = get_stream_port(Config, 0), + StreamPortNode2 = get_stream_port(Config, 1), + Node1Name = get_node_name(Config, 0), + Node2Name = get_node_name(Config, 1), + RabbitMqCtl = get_rabbitmqctl(Config), + DataDir = rabbit_ct_helpers:get_config(Config, data_dir), + MakeResult = rabbit_ct_helpers:make(Config, DataDir, ["tests", + {"NODE1_STREAM_PORT=~b", [StreamPortNode1]}, + {"NODE1_NAME=~p", [Node1Name]}, + {"NODE2_NAME=~p", [Node2Name]}, + {"NODE2_STREAM_PORT=~b", [StreamPortNode2]}, + {"RABBITMQCTL=~p", [RabbitMqCtl]} + ]), + {ok, _} = MakeResult. + +get_rabbitmqctl(Config) -> + rabbit_ct_helpers:get_config(Config, rabbitmqctl_cmd). + +get_stream_port(Config) -> + get_stream_port(Config, 0). + +get_stream_port(Config, Node) -> + rabbit_ct_broker_helpers:get_node_config(Config, Node, tcp_port_stream). + +get_node_name(Config) -> + get_node_name(Config, 0). + +get_node_name(Config, Node) -> + rabbit_ct_broker_helpers:get_node_config(Config, Node, nodename). + +test_server(Port) -> + {ok, S} = gen_tcp:connect("localhost", Port, [{active, false}, + {mode, binary}]), + test_peer_properties(S), + test_authenticate(S), + Stream = <<"stream1">>, + test_create_stream(S, Stream), + Body = <<"hello">>, + test_publish_confirm(S, Stream, Body), + SubscriptionId = 42, + Rest = test_subscribe(S, SubscriptionId, Stream), + test_deliver(S, Rest, SubscriptionId, Body), + test_delete_stream(S, Stream), + test_metadata_update_stream_deleted(S, Stream), + test_close(S), + closed = wait_for_socket_close(S, 10), + ok. + +test_peer_properties(S) -> + PeerPropertiesFrame = <<?COMMAND_PEER_PROPERTIES:16, ?VERSION_0:16, 1:32, 0:32>>, + PeerPropertiesFrameSize = byte_size(PeerPropertiesFrame), + gen_tcp:send(S, <<PeerPropertiesFrameSize:32, PeerPropertiesFrame/binary>>), + {ok, <<_Size:32, ?COMMAND_PEER_PROPERTIES:16, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16, _Rest/binary>>} = gen_tcp:recv(S, 0, 5000). + +test_authenticate(S) -> + SaslHandshakeFrame = <<?COMMAND_SASL_HANDSHAKE:16, ?VERSION_0:16, 1:32>>, + SaslHandshakeFrameSize = byte_size(SaslHandshakeFrame), + gen_tcp:send(S, <<SaslHandshakeFrameSize:32, SaslHandshakeFrame/binary>>), + Plain = <<"PLAIN">>, + AmqPlain = <<"AMQPLAIN">>, + {ok, SaslAvailable} = gen_tcp:recv(S, 0, 5000), + %% mechanisms order is not deterministic, so checking both orders + ok = case SaslAvailable of + <<31:32, ?COMMAND_SASL_HANDSHAKE:16, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16, 2:32, + 5:16, Plain:5/binary, 8:16, AmqPlain:8/binary>> -> + ok; + <<31:32, ?COMMAND_SASL_HANDSHAKE:16, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16, 2:32, + 8:16, AmqPlain:8/binary, 5:16, Plain:5/binary>> -> + ok; + _ -> + failed + end, + + Username = <<"guest">>, + Password = <<"guest">>, + Null = 0, + PlainSasl = <<Null:8, Username/binary, Null:8, Password/binary>>, + PlainSaslSize = byte_size(PlainSasl), + + SaslAuthenticateFrame = <<?COMMAND_SASL_AUTHENTICATE:16, ?VERSION_0:16, 2:32, + 5:16, Plain/binary, PlainSaslSize:32, PlainSasl/binary>>, + + SaslAuthenticateFrameSize = byte_size(SaslAuthenticateFrame), + + gen_tcp:send(S, <<SaslAuthenticateFrameSize:32, SaslAuthenticateFrame/binary>>), + + {ok, <<10:32, ?COMMAND_SASL_AUTHENTICATE:16, ?VERSION_0:16, 2:32, ?RESPONSE_CODE_OK:16, RestTune/binary>>} = gen_tcp:recv(S, 0, 5000), + + TuneExpected = <<12:32, ?COMMAND_TUNE:16, ?VERSION_0:16, ?DEFAULT_FRAME_MAX:32, ?DEFAULT_HEARTBEAT:32>>, + case RestTune of + <<>> -> + {ok, TuneExpected} = gen_tcp:recv(S, 0, 5000); + TuneReceived -> + TuneExpected = TuneReceived + end, + + TuneFrame = <<?COMMAND_TUNE:16, ?VERSION_0:16, ?DEFAULT_FRAME_MAX:32, 0:32>>, + TuneFrameSize = byte_size(TuneFrame), + gen_tcp:send(S, <<TuneFrameSize:32, TuneFrame/binary>>), + + VirtualHost = <<"/">>, + VirtualHostLength = byte_size(VirtualHost), + OpenFrame = <<?COMMAND_OPEN:16, ?VERSION_0:16, 3:32, VirtualHostLength:16, VirtualHost/binary>>, + OpenFrameSize = byte_size(OpenFrame), + gen_tcp:send(S, <<OpenFrameSize:32, OpenFrame/binary>>), + {ok, <<10:32, ?COMMAND_OPEN:16, ?VERSION_0:16, 3:32, ?RESPONSE_CODE_OK:16>>} = gen_tcp:recv(S, 0, 5000). + + +test_create_stream(S, Stream) -> + StreamSize = byte_size(Stream), + CreateStreamFrame = <<?COMMAND_CREATE_STREAM:16, ?VERSION_0:16, 1:32, StreamSize:16, Stream:StreamSize/binary, 0:32>>, + FrameSize = byte_size(CreateStreamFrame), + gen_tcp:send(S, <<FrameSize:32, CreateStreamFrame/binary>>), + {ok, <<_Size:32, ?COMMAND_CREATE_STREAM:16, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16>>} = gen_tcp:recv(S, 0, 5000). + +test_delete_stream(S, Stream) -> + StreamSize = byte_size(Stream), + DeleteStreamFrame = <<?COMMAND_DELETE_STREAM:16, ?VERSION_0:16, 1:32, StreamSize:16, Stream:StreamSize/binary>>, + FrameSize = byte_size(DeleteStreamFrame), + gen_tcp:send(S, <<FrameSize:32, DeleteStreamFrame/binary>>), + ResponseFrameSize = 10, + {ok, <<ResponseFrameSize:32, ?COMMAND_DELETE_STREAM:16, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16>>} = gen_tcp:recv(S, 4 + 10, 5000). + +test_publish_confirm(S, Stream, Body) -> + BodySize = byte_size(Body), + StreamSize = byte_size(Stream), + PublishFrame = <<?COMMAND_PUBLISH:16, ?VERSION_0:16, StreamSize:16, Stream:StreamSize/binary, 42:8, 1:32, 1:64, BodySize:32, Body:BodySize/binary>>, + FrameSize = byte_size(PublishFrame), + gen_tcp:send(S, <<FrameSize:32, PublishFrame/binary>>), + {ok, <<_Size:32, ?COMMAND_PUBLISH_CONFIRM:16, ?VERSION_0:16, 42:8, 1:32, 1:64>>} = gen_tcp:recv(S, 0, 5000). + +test_subscribe(S, SubscriptionId, Stream) -> + StreamSize = byte_size(Stream), + SubscribeFrame = <<?COMMAND_SUBSCRIBE:16, ?VERSION_0:16, 1:32, SubscriptionId:8, StreamSize:16, Stream:StreamSize/binary, + ?OFFSET_TYPE_OFFSET:16, 0:64, 10:16>>, + FrameSize = byte_size(SubscribeFrame), + gen_tcp:send(S, <<FrameSize:32, SubscribeFrame/binary>>), + Res = gen_tcp:recv(S, 0, 5000), + {ok, <<_Size:32, ?COMMAND_SUBSCRIBE:16, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16, Rest/binary>>} = Res, + Rest. + +test_deliver(S, Rest, SubscriptionId, Body) -> + BodySize = byte_size(Body), + Frame = read_frame(S, Rest), + <<54:32, ?COMMAND_DELIVER:16, ?VERSION_0:16, SubscriptionId:8, 5:4/unsigned, 0:4/unsigned, 0:8, + 1:16, 1:32, + _Timestamp:64, _Epoch:64, 0:64, _Crc:32, _DataLength:32, + 0:1, BodySize:31/unsigned, Body/binary>> = Frame. + +test_metadata_update_stream_deleted(S, Stream) -> + StreamSize = byte_size(Stream), + {ok, <<15:32, ?COMMAND_METADATA_UPDATE:16, ?VERSION_0:16, ?RESPONSE_CODE_STREAM_NOT_AVAILABLE:16, StreamSize:16, Stream/binary>>} = gen_tcp:recv(S, 0, 5000). + +test_close(S) -> + CloseReason = <<"OK">>, + CloseReasonSize = byte_size(CloseReason), + CloseFrame = <<?COMMAND_CLOSE:16, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16, CloseReasonSize:16, CloseReason/binary>>, + CloseFrameSize = byte_size(CloseFrame), + gen_tcp:send(S, <<CloseFrameSize:32, CloseFrame/binary>>), + {ok, <<10:32, ?COMMAND_CLOSE:16, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16>>} = gen_tcp:recv(S, 0, 5000). + +wait_for_socket_close(_S, 0) -> + not_closed; +wait_for_socket_close(S, Attempt) -> + case gen_tcp:recv(S, 0, 1000) of + {error, timeout} -> + wait_for_socket_close(S, Attempt - 1); + {error, closed} -> + closed + end. + +read_frame(S, Buffer) -> + inet:setopts(S, [{active, once}]), + receive + {tcp, S, Received} -> + Data = <<Buffer/binary, Received/binary>>, + case Data of + <<Size:32, _Body:Size/binary>> -> + Data; + _ -> + read_frame(S, Data) + end + after + 1000 -> + inet:setopts(S, [{active, false}]), + Buffer + end.
\ No newline at end of file diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/.gitignore b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/.gitignore new file mode 100644 index 0000000000..4c70cdb707 --- /dev/null +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/.gitignore @@ -0,0 +1,3 @@ +/build/ +/lib/ +/target/ diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/.mvn/wrapper/MavenWrapperDownloader.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/.mvn/wrapper/MavenWrapperDownloader.java new file mode 100644 index 0000000000..b901097f2d --- /dev/null +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/.mvn/wrapper/MavenWrapperDownloader.java @@ -0,0 +1,117 @@ +/* + * Copyright 2007-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import java.net.*; +import java.io.*; +import java.nio.channels.*; +import java.util.Properties; + +public class MavenWrapperDownloader { + + private static final String WRAPPER_VERSION = "0.5.6"; + /** + * Default URL to download the maven-wrapper.jar from, if no 'downloadUrl' is provided. + */ + private static final String DEFAULT_DOWNLOAD_URL = "https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/" + + WRAPPER_VERSION + "/maven-wrapper-" + WRAPPER_VERSION + ".jar"; + + /** + * Path to the maven-wrapper.properties file, which might contain a downloadUrl property to + * use instead of the default one. + */ + private static final String MAVEN_WRAPPER_PROPERTIES_PATH = + ".mvn/wrapper/maven-wrapper.properties"; + + /** + * Path where the maven-wrapper.jar will be saved to. + */ + private static final String MAVEN_WRAPPER_JAR_PATH = + ".mvn/wrapper/maven-wrapper.jar"; + + /** + * Name of the property which should be used to override the default download url for the wrapper. + */ + private static final String PROPERTY_NAME_WRAPPER_URL = "wrapperUrl"; + + public static void main(String args[]) { + System.out.println("- Downloader started"); + File baseDirectory = new File(args[0]); + System.out.println("- Using base directory: " + baseDirectory.getAbsolutePath()); + + // If the maven-wrapper.properties exists, read it and check if it contains a custom + // wrapperUrl parameter. + File mavenWrapperPropertyFile = new File(baseDirectory, MAVEN_WRAPPER_PROPERTIES_PATH); + String url = DEFAULT_DOWNLOAD_URL; + if(mavenWrapperPropertyFile.exists()) { + FileInputStream mavenWrapperPropertyFileInputStream = null; + try { + mavenWrapperPropertyFileInputStream = new FileInputStream(mavenWrapperPropertyFile); + Properties mavenWrapperProperties = new Properties(); + mavenWrapperProperties.load(mavenWrapperPropertyFileInputStream); + url = mavenWrapperProperties.getProperty(PROPERTY_NAME_WRAPPER_URL, url); + } catch (IOException e) { + System.out.println("- ERROR loading '" + MAVEN_WRAPPER_PROPERTIES_PATH + "'"); + } finally { + try { + if(mavenWrapperPropertyFileInputStream != null) { + mavenWrapperPropertyFileInputStream.close(); + } + } catch (IOException e) { + // Ignore ... + } + } + } + System.out.println("- Downloading from: " + url); + + File outputFile = new File(baseDirectory.getAbsolutePath(), MAVEN_WRAPPER_JAR_PATH); + if(!outputFile.getParentFile().exists()) { + if(!outputFile.getParentFile().mkdirs()) { + System.out.println( + "- ERROR creating output directory '" + outputFile.getParentFile().getAbsolutePath() + "'"); + } + } + System.out.println("- Downloading to: " + outputFile.getAbsolutePath()); + try { + downloadFileFromURL(url, outputFile); + System.out.println("Done"); + System.exit(0); + } catch (Throwable e) { + System.out.println("- Error downloading"); + e.printStackTrace(); + System.exit(1); + } + } + + private static void downloadFileFromURL(String urlString, File destination) throws Exception { + if (System.getenv("MVNW_USERNAME") != null && System.getenv("MVNW_PASSWORD") != null) { + String username = System.getenv("MVNW_USERNAME"); + char[] password = System.getenv("MVNW_PASSWORD").toCharArray(); + Authenticator.setDefault(new Authenticator() { + @Override + protected PasswordAuthentication getPasswordAuthentication() { + return new PasswordAuthentication(username, password); + } + }); + } + URL website = new URL(urlString); + ReadableByteChannel rbc; + rbc = Channels.newChannel(website.openStream()); + FileOutputStream fos = new FileOutputStream(destination); + fos.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE); + fos.close(); + rbc.close(); + } + +} diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/.mvn/wrapper/maven-wrapper.properties b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/.mvn/wrapper/maven-wrapper.properties new file mode 100644 index 0000000000..642d572ce9 --- /dev/null +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/.mvn/wrapper/maven-wrapper.properties @@ -0,0 +1,2 @@ +distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.6.3/apache-maven-3.6.3-bin.zip +wrapperUrl=https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/Makefile b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/Makefile new file mode 100644 index 0000000000..89be00931c --- /dev/null +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/Makefile @@ -0,0 +1,18 @@ +export PATH :=$(CURDIR):$(PATH) +HOSTNAME := $(shell hostname) +MVN_FLAGS += -Dhostname=$(HOSTNAME) \ + -Dnode1.stream.port=$(NODE1_STREAM_PORT) \ + -Dnode1.name=$(NODE1_NAME) \ + -Dnode2.name=$(NODE2_NAME) \ + -Dnode2.stream.port=$(NODE2_STREAM_PORT) \ + -Drabbitmqctl.bin=$(RABBITMQCTL) + +.PHONY: tests clean + +tests: + # Note: to run a single test + # @mvnw -q $(MVN_FLAGS) -Dtest=StreamTest#metadataOnClusterShouldReturnLeaderAndReplicas test + @mvnw $(MVN_FLAGS) test + +clean: + @mvnw clean diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/mvnw b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/mvnw new file mode 100755 index 0000000000..41c0f0c23d --- /dev/null +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/mvnw @@ -0,0 +1,310 @@ +#!/bin/sh +# ---------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# ---------------------------------------------------------------------------- + +# ---------------------------------------------------------------------------- +# Maven Start Up Batch script +# +# Required ENV vars: +# ------------------ +# JAVA_HOME - location of a JDK home dir +# +# Optional ENV vars +# ----------------- +# M2_HOME - location of maven2's installed home dir +# MAVEN_OPTS - parameters passed to the Java VM when running Maven +# e.g. to debug Maven itself, use +# set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000 +# MAVEN_SKIP_RC - flag to disable loading of mavenrc files +# ---------------------------------------------------------------------------- + +if [ -z "$MAVEN_SKIP_RC" ] ; then + + if [ -f /etc/mavenrc ] ; then + . /etc/mavenrc + fi + + if [ -f "$HOME/.mavenrc" ] ; then + . "$HOME/.mavenrc" + fi + +fi + +# OS specific support. $var _must_ be set to either true or false. +cygwin=false; +darwin=false; +mingw=false +case "`uname`" in + CYGWIN*) cygwin=true ;; + MINGW*) mingw=true;; + Darwin*) darwin=true + # Use /usr/libexec/java_home if available, otherwise fall back to /Library/Java/Home + # See https://developer.apple.com/library/mac/qa/qa1170/_index.html + if [ -z "$JAVA_HOME" ]; then + if [ -x "/usr/libexec/java_home" ]; then + export JAVA_HOME="`/usr/libexec/java_home`" + else + export JAVA_HOME="/Library/Java/Home" + fi + fi + ;; +esac + +if [ -z "$JAVA_HOME" ] ; then + if [ -r /etc/gentoo-release ] ; then + JAVA_HOME=`java-config --jre-home` + fi +fi + +if [ -z "$M2_HOME" ] ; then + ## resolve links - $0 may be a link to maven's home + PRG="$0" + + # need this for relative symlinks + while [ -h "$PRG" ] ; do + ls=`ls -ld "$PRG"` + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG="`dirname "$PRG"`/$link" + fi + done + + saveddir=`pwd` + + M2_HOME=`dirname "$PRG"`/.. + + # make it fully qualified + M2_HOME=`cd "$M2_HOME" && pwd` + + cd "$saveddir" + # echo Using m2 at $M2_HOME +fi + +# For Cygwin, ensure paths are in UNIX format before anything is touched +if $cygwin ; then + [ -n "$M2_HOME" ] && + M2_HOME=`cygpath --unix "$M2_HOME"` + [ -n "$JAVA_HOME" ] && + JAVA_HOME=`cygpath --unix "$JAVA_HOME"` + [ -n "$CLASSPATH" ] && + CLASSPATH=`cygpath --path --unix "$CLASSPATH"` +fi + +# For Mingw, ensure paths are in UNIX format before anything is touched +if $mingw ; then + [ -n "$M2_HOME" ] && + M2_HOME="`(cd "$M2_HOME"; pwd)`" + [ -n "$JAVA_HOME" ] && + JAVA_HOME="`(cd "$JAVA_HOME"; pwd)`" +fi + +if [ -z "$JAVA_HOME" ]; then + javaExecutable="`which javac`" + if [ -n "$javaExecutable" ] && ! [ "`expr \"$javaExecutable\" : '\([^ ]*\)'`" = "no" ]; then + # readlink(1) is not available as standard on Solaris 10. + readLink=`which readlink` + if [ ! `expr "$readLink" : '\([^ ]*\)'` = "no" ]; then + if $darwin ; then + javaHome="`dirname \"$javaExecutable\"`" + javaExecutable="`cd \"$javaHome\" && pwd -P`/javac" + else + javaExecutable="`readlink -f \"$javaExecutable\"`" + fi + javaHome="`dirname \"$javaExecutable\"`" + javaHome=`expr "$javaHome" : '\(.*\)/bin'` + JAVA_HOME="$javaHome" + export JAVA_HOME + fi + fi +fi + +if [ -z "$JAVACMD" ] ; then + if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD="$JAVA_HOME/jre/sh/java" + else + JAVACMD="$JAVA_HOME/bin/java" + fi + else + JAVACMD="`which java`" + fi +fi + +if [ ! -x "$JAVACMD" ] ; then + echo "Error: JAVA_HOME is not defined correctly." >&2 + echo " We cannot execute $JAVACMD" >&2 + exit 1 +fi + +if [ -z "$JAVA_HOME" ] ; then + echo "Warning: JAVA_HOME environment variable is not set." +fi + +CLASSWORLDS_LAUNCHER=org.codehaus.plexus.classworlds.launcher.Launcher + +# traverses directory structure from process work directory to filesystem root +# first directory with .mvn subdirectory is considered project base directory +find_maven_basedir() { + + if [ -z "$1" ] + then + echo "Path not specified to find_maven_basedir" + return 1 + fi + + basedir="$1" + wdir="$1" + while [ "$wdir" != '/' ] ; do + if [ -d "$wdir"/.mvn ] ; then + basedir=$wdir + break + fi + # workaround for JBEAP-8937 (on Solaris 10/Sparc) + if [ -d "${wdir}" ]; then + wdir=`cd "$wdir/.."; pwd` + fi + # end of workaround + done + echo "${basedir}" +} + +# concatenates all lines of a file +concat_lines() { + if [ -f "$1" ]; then + echo "$(tr -s '\n' ' ' < "$1")" + fi +} + +BASE_DIR=`find_maven_basedir "$(pwd)"` +if [ -z "$BASE_DIR" ]; then + exit 1; +fi + +########################################################################################## +# Extension to allow automatically downloading the maven-wrapper.jar from Maven-central +# This allows using the maven wrapper in projects that prohibit checking in binary data. +########################################################################################## +if [ -r "$BASE_DIR/.mvn/wrapper/maven-wrapper.jar" ]; then + if [ "$MVNW_VERBOSE" = true ]; then + echo "Found .mvn/wrapper/maven-wrapper.jar" + fi +else + if [ "$MVNW_VERBOSE" = true ]; then + echo "Couldn't find .mvn/wrapper/maven-wrapper.jar, downloading it ..." + fi + if [ -n "$MVNW_REPOURL" ]; then + jarUrl="$MVNW_REPOURL/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar" + else + jarUrl="https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar" + fi + while IFS="=" read key value; do + case "$key" in (wrapperUrl) jarUrl="$value"; break ;; + esac + done < "$BASE_DIR/.mvn/wrapper/maven-wrapper.properties" + if [ "$MVNW_VERBOSE" = true ]; then + echo "Downloading from: $jarUrl" + fi + wrapperJarPath="$BASE_DIR/.mvn/wrapper/maven-wrapper.jar" + if $cygwin; then + wrapperJarPath=`cygpath --path --windows "$wrapperJarPath"` + fi + + if command -v wget > /dev/null; then + if [ "$MVNW_VERBOSE" = true ]; then + echo "Found wget ... using wget" + fi + if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then + wget "$jarUrl" -O "$wrapperJarPath" + else + wget --http-user=$MVNW_USERNAME --http-password=$MVNW_PASSWORD "$jarUrl" -O "$wrapperJarPath" + fi + elif command -v curl > /dev/null; then + if [ "$MVNW_VERBOSE" = true ]; then + echo "Found curl ... using curl" + fi + if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then + curl -o "$wrapperJarPath" "$jarUrl" -f + else + curl --user $MVNW_USERNAME:$MVNW_PASSWORD -o "$wrapperJarPath" "$jarUrl" -f + fi + + else + if [ "$MVNW_VERBOSE" = true ]; then + echo "Falling back to using Java to download" + fi + javaClass="$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.java" + # For Cygwin, switch paths to Windows format before running javac + if $cygwin; then + javaClass=`cygpath --path --windows "$javaClass"` + fi + if [ -e "$javaClass" ]; then + if [ ! -e "$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.class" ]; then + if [ "$MVNW_VERBOSE" = true ]; then + echo " - Compiling MavenWrapperDownloader.java ..." + fi + # Compiling the Java class + ("$JAVA_HOME/bin/javac" "$javaClass") + fi + if [ -e "$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.class" ]; then + # Running the downloader + if [ "$MVNW_VERBOSE" = true ]; then + echo " - Running MavenWrapperDownloader.java ..." + fi + ("$JAVA_HOME/bin/java" -cp .mvn/wrapper MavenWrapperDownloader "$MAVEN_PROJECTBASEDIR") + fi + fi + fi +fi +########################################################################################## +# End of extension +########################################################################################## + +export MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-"$BASE_DIR"} +if [ "$MVNW_VERBOSE" = true ]; then + echo $MAVEN_PROJECTBASEDIR +fi +MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS" + +# For Cygwin, switch paths to Windows format before running java +if $cygwin; then + [ -n "$M2_HOME" ] && + M2_HOME=`cygpath --path --windows "$M2_HOME"` + [ -n "$JAVA_HOME" ] && + JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"` + [ -n "$CLASSPATH" ] && + CLASSPATH=`cygpath --path --windows "$CLASSPATH"` + [ -n "$MAVEN_PROJECTBASEDIR" ] && + MAVEN_PROJECTBASEDIR=`cygpath --path --windows "$MAVEN_PROJECTBASEDIR"` +fi + +# Provide a "standardized" way to retrieve the CLI args that will +# work with both Windows and non-Windows executions. +MAVEN_CMD_LINE_ARGS="$MAVEN_CONFIG $@" +export MAVEN_CMD_LINE_ARGS + +WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain + +exec "$JAVACMD" \ + $MAVEN_OPTS \ + -classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \ + "-Dmaven.home=${M2_HOME}" "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \ + ${WRAPPER_LAUNCHER} $MAVEN_CONFIG "$@" diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/mvnw.cmd b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/mvnw.cmd new file mode 100644 index 0000000000..86115719e5 --- /dev/null +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/mvnw.cmd @@ -0,0 +1,182 @@ +@REM ---------------------------------------------------------------------------- +@REM Licensed to the Apache Software Foundation (ASF) under one +@REM or more contributor license agreements. See the NOTICE file +@REM distributed with this work for additional information +@REM regarding copyright ownership. The ASF licenses this file +@REM to you under the Apache License, Version 2.0 (the +@REM "License"); you may not use this file except in compliance +@REM with the License. You may obtain a copy of the License at +@REM +@REM http://www.apache.org/licenses/LICENSE-2.0 +@REM +@REM Unless required by applicable law or agreed to in writing, +@REM software distributed under the License is distributed on an +@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +@REM KIND, either express or implied. See the License for the +@REM specific language governing permissions and limitations +@REM under the License. +@REM ---------------------------------------------------------------------------- + +@REM ---------------------------------------------------------------------------- +@REM Maven Start Up Batch script +@REM +@REM Required ENV vars: +@REM JAVA_HOME - location of a JDK home dir +@REM +@REM Optional ENV vars +@REM M2_HOME - location of maven2's installed home dir +@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands +@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a keystroke before ending +@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven +@REM e.g. to debug Maven itself, use +@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000 +@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files +@REM ---------------------------------------------------------------------------- + +@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on' +@echo off +@REM set title of command window +title %0 +@REM enable echoing by setting MAVEN_BATCH_ECHO to 'on' +@if "%MAVEN_BATCH_ECHO%" == "on" echo %MAVEN_BATCH_ECHO% + +@REM set %HOME% to equivalent of $HOME +if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%") + +@REM Execute a user defined script before this one +if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre +@REM check for pre script, once with legacy .bat ending and once with .cmd ending +if exist "%HOME%\mavenrc_pre.bat" call "%HOME%\mavenrc_pre.bat" +if exist "%HOME%\mavenrc_pre.cmd" call "%HOME%\mavenrc_pre.cmd" +:skipRcPre + +@setlocal + +set ERROR_CODE=0 + +@REM To isolate internal variables from possible post scripts, we use another setlocal +@setlocal + +@REM ==== START VALIDATION ==== +if not "%JAVA_HOME%" == "" goto OkJHome + +echo. +echo Error: JAVA_HOME not found in your environment. >&2 +echo Please set the JAVA_HOME variable in your environment to match the >&2 +echo location of your Java installation. >&2 +echo. +goto error + +:OkJHome +if exist "%JAVA_HOME%\bin\java.exe" goto init + +echo. +echo Error: JAVA_HOME is set to an invalid directory. >&2 +echo JAVA_HOME = "%JAVA_HOME%" >&2 +echo Please set the JAVA_HOME variable in your environment to match the >&2 +echo location of your Java installation. >&2 +echo. +goto error + +@REM ==== END VALIDATION ==== + +:init + +@REM Find the project base dir, i.e. the directory that contains the folder ".mvn". +@REM Fallback to current working directory if not found. + +set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR% +IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir + +set EXEC_DIR=%CD% +set WDIR=%EXEC_DIR% +:findBaseDir +IF EXIST "%WDIR%"\.mvn goto baseDirFound +cd .. +IF "%WDIR%"=="%CD%" goto baseDirNotFound +set WDIR=%CD% +goto findBaseDir + +:baseDirFound +set MAVEN_PROJECTBASEDIR=%WDIR% +cd "%EXEC_DIR%" +goto endDetectBaseDir + +:baseDirNotFound +set MAVEN_PROJECTBASEDIR=%EXEC_DIR% +cd "%EXEC_DIR%" + +:endDetectBaseDir + +IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig + +@setlocal EnableExtensions EnableDelayedExpansion +for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! %%a +@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS% + +:endReadAdditionalConfig + +SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe" +set WRAPPER_JAR="%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.jar" +set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain + +set DOWNLOAD_URL="https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar" + +FOR /F "tokens=1,2 delims==" %%A IN ("%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.properties") DO ( + IF "%%A"=="wrapperUrl" SET DOWNLOAD_URL=%%B +) + +@REM Extension to allow automatically downloading the maven-wrapper.jar from Maven-central +@REM This allows using the maven wrapper in projects that prohibit checking in binary data. +if exist %WRAPPER_JAR% ( + if "%MVNW_VERBOSE%" == "true" ( + echo Found %WRAPPER_JAR% + ) +) else ( + if not "%MVNW_REPOURL%" == "" ( + SET DOWNLOAD_URL="%MVNW_REPOURL%/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar" + ) + if "%MVNW_VERBOSE%" == "true" ( + echo Couldn't find %WRAPPER_JAR%, downloading it ... + echo Downloading from: %DOWNLOAD_URL% + ) + + powershell -Command "&{"^ + "$webclient = new-object System.Net.WebClient;"^ + "if (-not ([string]::IsNullOrEmpty('%MVNW_USERNAME%') -and [string]::IsNullOrEmpty('%MVNW_PASSWORD%'))) {"^ + "$webclient.Credentials = new-object System.Net.NetworkCredential('%MVNW_USERNAME%', '%MVNW_PASSWORD%');"^ + "}"^ + "[Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]::Tls12; $webclient.DownloadFile('%DOWNLOAD_URL%', '%WRAPPER_JAR%')"^ + "}" + if "%MVNW_VERBOSE%" == "true" ( + echo Finished downloading %WRAPPER_JAR% + ) +) +@REM End of extension + +@REM Provide a "standardized" way to retrieve the CLI args that will +@REM work with both Windows and non-Windows executions. +set MAVEN_CMD_LINE_ARGS=%* + +%MAVEN_JAVA_EXE% %JVM_CONFIG_MAVEN_PROPS% %MAVEN_OPTS% %MAVEN_DEBUG_OPTS% -classpath %WRAPPER_JAR% "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" %WRAPPER_LAUNCHER% %MAVEN_CONFIG% %* +if ERRORLEVEL 1 goto error +goto end + +:error +set ERROR_CODE=1 + +:end +@endlocal & set ERROR_CODE=%ERROR_CODE% + +if not "%MAVEN_SKIP_RC%" == "" goto skipRcPost +@REM check for post script, once with legacy .bat ending and once with .cmd ending +if exist "%HOME%\mavenrc_post.bat" call "%HOME%\mavenrc_post.bat" +if exist "%HOME%\mavenrc_post.cmd" call "%HOME%\mavenrc_post.cmd" +:skipRcPost + +@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on' +if "%MAVEN_BATCH_PAUSE%" == "on" pause + +if "%MAVEN_TERMINATE_CMD%" == "on" exit %ERROR_CODE% + +exit /B %ERROR_CODE% diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/pom.xml b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/pom.xml new file mode 100644 index 0000000000..aa27c29baf --- /dev/null +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/pom.xml @@ -0,0 +1,143 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <groupId>com.rabbitmq.stream</groupId> + <artifactId>rabbitmq-stream-tests</artifactId> + <version>1.0-SNAPSHOT</version> + + <licenses> + <license> + <name>MPL 2.0</name> + <url>https://www.mozilla.org/en-US/MPL/2.0/</url> + <distribution>repo</distribution> + </license> + </licenses> + + <developers> + <developer> + <email>info@rabbitmq.com</email> + <name>Team RabbitMQ</name> + <organization>VMware, Inc. or its affiliates.</organization> + <organizationUrl>https://rabbitmq.com</organizationUrl> + </developer> + </developers> + + <properties> + <stream-client.version>0.1.0-SNAPSHOT</stream-client.version> + <proton-j.version>0.33.6</proton-j.version> + <junit.jupiter.version>5.7.0</junit.jupiter.version> + <assertj.version>3.17.2</assertj.version> + <mockito.version>3.5.11</mockito.version> + <logback.version>1.2.3</logback.version> + <maven.compiler.plugin.version>3.8.1</maven.compiler.plugin.version> + <maven-surefire-plugin.version>2.22.2</maven-surefire-plugin.version> + <spotless.version>2.2.0</spotless.version> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + </properties> + + <dependencies> + + <dependency> + <groupId>com.rabbitmq</groupId> + <artifactId>stream-client</artifactId> + <version>${stream-client.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>proton-j</artifactId> + <version>${proton-j.version}</version> + </dependency> + + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-engine</artifactId> + <version>${junit.jupiter.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-params</artifactId> + <version>${junit.jupiter.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.assertj</groupId> + <artifactId>assertj-core</artifactId> + <version>${assertj.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <version>${mockito.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + <version>${logback.version}</version> + <scope>test</scope> + </dependency> + + </dependencies> + + <build> + + <plugins> + + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <version>${maven.compiler.plugin.version}</version> + <configuration> + <source>1.8</source> + <target>1.8</target> + <compilerArgs> + <arg>-Xlint:deprecation</arg> + <arg>-Xlint:unchecked</arg> + </compilerArgs> + </configuration> + </plugin> + + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <version>${maven-surefire-plugin.version}</version> + </plugin> + + <plugin> + <groupId>com.diffplug.spotless</groupId> + <artifactId>spotless-maven-plugin</artifactId> + <version>${spotless.version}</version> + <configuration> + <java> + <googleJavaFormat> + <version>1.9</version> + <style>GOOGLE</style> + </googleJavaFormat> + </java> + </configuration> + </plugin> + + </plugins> + + </build> + + <repositories> + + <repository> + <id>ossrh</id> + <url>https://oss.sonatype.org/content/repositories/snapshots</url> + <snapshots><enabled>true</enabled></snapshots> + <releases><enabled>false</enabled></releases> + </repository> + + </repositories> + +</project>
\ No newline at end of file diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/ClusterSizeTest.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/ClusterSizeTest.java new file mode 100644 index 0000000000..993c19b852 --- /dev/null +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/ClusterSizeTest.java @@ -0,0 +1,65 @@ +// The contents of this file are subject to the Mozilla Public License +// Version 2.0 (the "License"); you may not use this file except in +// compliance with the License. You may obtain a copy of the License +// at https://www.mozilla.org/en-US/MPL/2.0/ +// +// Software distributed under the License is distributed on an "AS IS" +// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +// the License for the specific language governing rights and +// limitations under the License. +// +// The Original Code is RabbitMQ. +// +// The Initial Developer of the Original Code is Pivotal Software, Inc. +// Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved. +// + +package com.rabbitmq.stream; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.rabbitmq.stream.impl.Client; +import com.rabbitmq.stream.impl.Client.Response; +import com.rabbitmq.stream.impl.Client.StreamMetadata; +import java.util.Collections; +import java.util.UUID; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.ValueSource; + +@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class) +public class ClusterSizeTest { + + TestUtils.ClientFactory cf; + + @ParameterizedTest + @ValueSource(strings = {"-1", "0"}) + void clusterSizeZeroShouldReturnError(String clusterSize) { + Client client = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1())); + String s = UUID.randomUUID().toString(); + Response response = + client.create(s, Collections.singletonMap("initial-cluster-size", clusterSize)); + assertThat(response.isOk()).isFalse(); + assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_PRECONDITION_FAILED); + } + + @ParameterizedTest + @CsvSource({"1,1", "2,2", "3,3", "5,3"}) + void clusterSizeShouldReflectOnMetadata(String requestedClusterSize, int expectedClusterSize) { + Client client = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1())); + String s = UUID.randomUUID().toString(); + try { + Response response = + client.create(s, Collections.singletonMap("initial-cluster-size", requestedClusterSize)); + assertThat(response.isOk()).isTrue(); + StreamMetadata metadata = client.metadata(s).get(s); + assertThat(metadata).isNotNull(); + assertThat(metadata.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK); + int actualClusterSize = metadata.getLeader() == null ? 0 : 1 + metadata.getReplicas().size(); + assertThat(actualClusterSize).isEqualTo(expectedClusterSize); + } finally { + client.delete(s); + } + } +} diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java new file mode 100644 index 0000000000..c7a390f00d --- /dev/null +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java @@ -0,0 +1,541 @@ +// The contents of this file are subject to the Mozilla Public License +// Version 2.0 (the "License"); you may not use this file except in +// compliance with the License. You may obtain a copy of the License +// at https://www.mozilla.org/en-US/MPL/2.0/ +// +// Software distributed under the License is distributed on an "AS IS" +// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +// the License for the specific language governing rights and +// limitations under the License. +// +// The Original Code is RabbitMQ. +// +// The Initial Developer of the Original Code is Pivotal Software, Inc. +// Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved. +// + +package com.rabbitmq.stream; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; + +import com.rabbitmq.stream.codec.WrapperMessageBuilder; +import com.rabbitmq.stream.impl.Client; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class) +public class FailureTest { + + TestUtils.ClientFactory cf; + String stream; + ExecutorService executorService; + + static void wait(Duration duration) { + try { + Thread.sleep(duration.toMillis()); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + @AfterEach + void tearDown() { + if (executorService != null) { + executorService.shutdownNow(); + } + } + + @Test + void leaderFailureWhenPublisherConnectedToReplica() throws Exception { + Set<String> messages = new HashSet<>(); + Client client = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1())); + Map<String, Client.StreamMetadata> metadata = client.metadata(stream); + Client.StreamMetadata streamMetadata = metadata.get(stream); + assertThat(streamMetadata).isNotNull(); + + assertThat(streamMetadata.getLeader().getPort()).isEqualTo(TestUtils.streamPortNode1()); + assertThat(streamMetadata.getReplicas()).isNotEmpty(); + Client.Broker replica = streamMetadata.getReplicas().get(0); + assertThat(replica.getPort()).isNotEqualTo(TestUtils.streamPortNode1()); + + AtomicReference<CountDownLatch> confirmLatch = new AtomicReference<>(new CountDownLatch(1)); + + CountDownLatch metadataLatch = new CountDownLatch(1); + Client publisher = + cf.get( + new Client.ClientParameters() + .port(replica.getPort()) + .metadataListener((stream, code) -> metadataLatch.countDown()) + .publishConfirmListener( + (publisherId, publishingId) -> confirmLatch.get().countDown())); + String message = "all nodes available"; + messages.add(message); + publisher.publish( + stream, + (byte) 1, + Collections.singletonList( + publisher.messageBuilder().addData(message.getBytes(StandardCharsets.UTF_8)).build())); + assertThat(confirmLatch.get().await(10, TimeUnit.SECONDS)).isTrue(); + confirmLatch.set(null); + + try { + Host.rabbitmqctl("stop_app"); + try { + cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1())); + fail("Node app stopped, connecting should not be possible"); + } catch (Exception e) { + // OK + } + + assertThat(metadataLatch.await(10, TimeUnit.SECONDS)).isTrue(); + + // wait until there's a new leader + TestUtils.waitAtMost( + Duration.ofSeconds(10), + () -> { + Client.StreamMetadata m = publisher.metadata(stream).get(stream); + return m.getLeader() != null && m.getLeader().getPort() != TestUtils.streamPortNode1(); + }); + + confirmLatch.set(new CountDownLatch(1)); + message = "2 nodes available"; + messages.add(message); + publisher.publish( + stream, + (byte) 1, + Collections.singletonList( + publisher + .messageBuilder() + .addData(message.getBytes(StandardCharsets.UTF_8)) + .build())); + assertThat(confirmLatch.get().await(10, TimeUnit.SECONDS)).isTrue(); + confirmLatch.set(null); + } finally { + Host.rabbitmqctl("start_app"); + } + + // wait until all the replicas are there + TestUtils.waitAtMost( + Duration.ofSeconds(5), + () -> { + Client.StreamMetadata m = publisher.metadata(stream).get(stream); + return m.getReplicas().size() == 2; + }); + + confirmLatch.set(new CountDownLatch(1)); + message = "all nodes are back"; + messages.add(message); + publisher.publish( + stream, + (byte) 1, + Collections.singletonList( + publisher.messageBuilder().addData(message.getBytes(StandardCharsets.UTF_8)).build())); + assertThat(confirmLatch.get().await(10, TimeUnit.SECONDS)).isTrue(); + confirmLatch.set(null); + + CountDownLatch consumeLatch = new CountDownLatch(2); + Set<String> bodies = ConcurrentHashMap.newKeySet(); + Client consumer = + cf.get( + new Client.ClientParameters() + .port(TestUtils.streamPortNode1()) + .messageListener( + (subscriptionId, offset, msg) -> { + bodies.add(new String(msg.getBodyAsBinary(), StandardCharsets.UTF_8)); + consumeLatch.countDown(); + })); + + TestUtils.waitAtMost( + Duration.ofSeconds(5), + () -> { + Client.Response response = + consumer.subscribe((byte) 1, stream, OffsetSpecification.first(), 10); + return response.isOk(); + }); + assertThat(consumeLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(bodies) + .hasSize(3) + .contains("all nodes available", "2 nodes available", "all nodes are back"); + } + + @Test + void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception { + executorService = Executors.newCachedThreadPool(); + Client client = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1())); + Map<String, Client.StreamMetadata> metadata = client.metadata(stream); + Client.StreamMetadata streamMetadata = metadata.get(stream); + assertThat(streamMetadata).isNotNull(); + + assertThat(streamMetadata.getLeader()).isNotNull(); + assertThat(streamMetadata.getLeader().getPort()).isEqualTo(TestUtils.streamPortNode1()); + + Map<Long, Message> published = new ConcurrentHashMap<>(); + Set<Message> confirmed = ConcurrentHashMap.newKeySet(); + + Client.PublishConfirmListener publishConfirmListener = + (publisherId, publishingId) -> { + Message confirmedMessage; + int attempts = 0; + while ((confirmedMessage = published.remove(publishingId)) == null && attempts < 10) { + wait(Duration.ofMillis(5)); + attempts++; + } + confirmed.add(confirmedMessage); + }; + + AtomicLong generation = new AtomicLong(0); + AtomicLong sequence = new AtomicLong(0); + AtomicBoolean connected = new AtomicBoolean(true); + AtomicReference<Client> publisher = new AtomicReference<>(); + CountDownLatch reconnectionLatch = new CountDownLatch(1); + AtomicReference<Client.ShutdownListener> shutdownListenerReference = new AtomicReference<>(); + Client.ShutdownListener shutdownListener = + shutdownContext -> { + if (shutdownContext.getShutdownReason() + == Client.ShutdownContext.ShutdownReason.UNKNOWN) { + // avoid long-running task in the IO thread + executorService.submit( + () -> { + connected.set(false); + + Client locator = + cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode2())); + // wait until there's a new leader + try { + TestUtils.waitAtMost( + Duration.ofSeconds(5), + () -> { + Client.StreamMetadata m = locator.metadata(stream).get(stream); + return m.getLeader() != null + && m.getLeader().getPort() != TestUtils.streamPortNode1(); + }); + } catch (Throwable e) { + reconnectionLatch.countDown(); + return; + } + + int newLeaderPort = locator.metadata(stream).get(stream).getLeader().getPort(); + Client newPublisher = + cf.get( + new Client.ClientParameters() + .port(newLeaderPort) + .shutdownListener(shutdownListenerReference.get()) + .publishConfirmListener(publishConfirmListener)); + + generation.incrementAndGet(); + published.clear(); + publisher.set(newPublisher); + connected.set(true); + + reconnectionLatch.countDown(); + }); + } + }; + shutdownListenerReference.set(shutdownListener); + + client = + cf.get( + new Client.ClientParameters() + .port(streamMetadata.getLeader().getPort()) + .shutdownListener(shutdownListener) + .publishConfirmListener(publishConfirmListener)); + + publisher.set(client); + + AtomicBoolean keepPublishing = new AtomicBoolean(true); + + executorService.submit( + () -> { + while (keepPublishing.get()) { + if (connected.get()) { + Message message = + publisher + .get() + .messageBuilder() + .properties() + .messageId(sequence.getAndIncrement()) + .messageBuilder() + .applicationProperties() + .entry("generation", generation.get()) + .messageBuilder() + .build(); + try { + long publishingId = + publisher + .get() + .publish(stream, (byte) 1, Collections.singletonList(message)) + .get(0); + published.put(publishingId, message); + } catch (Exception e) { + // keep going + } + wait(Duration.ofMillis(10)); + } else { + wait(Duration.ofSeconds(1)); + } + } + }); + + // let's publish for a bit of time + Thread.sleep(2000); + + assertThat(confirmed).isNotEmpty(); + int confirmedCount = confirmed.size(); + + try { + Host.rabbitmqctl("stop_app"); + + assertThat(reconnectionLatch.await(10, TimeUnit.SECONDS)).isTrue(); + + // let's publish for a bit of time + Thread.sleep(2000); + + } finally { + Host.rabbitmqctl("start_app"); + } + assertThat(confirmed).hasSizeGreaterThan(confirmedCount); + confirmedCount = confirmed.size(); + + Client metadataClient = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode2())); + // wait until all the replicas are there + TestUtils.waitAtMost( + Duration.ofSeconds(5), + () -> { + Client.StreamMetadata m = metadataClient.metadata(stream).get(stream); + return m.getReplicas().size() == 2; + }); + + // let's publish for a bit of time + Thread.sleep(2000); + + assertThat(confirmed).hasSizeGreaterThan(confirmedCount); + + keepPublishing.set(false); + + Queue<Message> consumed = new ConcurrentLinkedQueue<>(); + Set<Long> generations = ConcurrentHashMap.newKeySet(); + CountDownLatch consumedLatch = new CountDownLatch(1); + Client.StreamMetadata m = metadataClient.metadata(stream).get(stream); + Client consumer = + cf.get( + new Client.ClientParameters() + .port(m.getReplicas().get(0).getPort()) + .chunkListener( + (client1, subscriptionId, offset, messageCount, dataSize) -> + client1.credit(subscriptionId, 1)) + .messageListener( + (subscriptionId, offset, message) -> { + consumed.add(message); + generations.add((Long) message.getApplicationProperties().get("generation")); + if (consumed.size() == confirmed.size()) { + consumedLatch.countDown(); + } + })); + + Client.Response response = + consumer.subscribe((byte) 1, stream, OffsetSpecification.first(), 10); + assertThat(response.isOk()).isTrue(); + + assertThat(consumedLatch.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(generations).hasSize(2).contains(0L, 1L); + assertThat(consumed).hasSizeGreaterThanOrEqualTo(confirmed.size()); + long lastMessageId = -1; + for (Message message : consumed) { + long messageId = message.getProperties().getMessageIdAsLong(); + assertThat(messageId).isGreaterThanOrEqualTo(lastMessageId); + lastMessageId = messageId; + } + assertThat(lastMessageId).isPositive().isLessThanOrEqualTo(sequence.get()); + } + + @Test + void consumerReattachesToOtherReplicaWhenReplicaGoesAway() throws Exception { + executorService = Executors.newCachedThreadPool(); + Client metadataClient = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1())); + Map<String, Client.StreamMetadata> metadata = metadataClient.metadata(stream); + Client.StreamMetadata streamMetadata = metadata.get(stream); + assertThat(streamMetadata).isNotNull(); + + assertThat(streamMetadata.getLeader()).isNotNull(); + assertThat(streamMetadata.getLeader().getPort()).isEqualTo(TestUtils.streamPortNode1()); + + Map<Long, Message> published = new ConcurrentHashMap<>(); + Set<Message> confirmed = ConcurrentHashMap.newKeySet(); + Set<Long> confirmedIds = ConcurrentHashMap.newKeySet(); + Client.PublishConfirmListener publishConfirmListener = + (publisherId, publishingId) -> { + Message confirmedMessage; + int attempts = 0; + while ((confirmedMessage = published.remove(publishingId)) == null && attempts < 10) { + wait(Duration.ofMillis(5)); + attempts++; + } + confirmed.add(confirmedMessage); + confirmedIds.add(confirmedMessage.getProperties().getMessageIdAsLong()); + }; + + Client publisher = + cf.get( + new Client.ClientParameters() + .port(streamMetadata.getLeader().getPort()) + .publishConfirmListener(publishConfirmListener)); + + AtomicLong generation = new AtomicLong(0); + AtomicLong sequence = new AtomicLong(0); + AtomicBoolean keepPublishing = new AtomicBoolean(true); + CountDownLatch publishingLatch = new CountDownLatch(1); + + executorService.submit( + () -> { + while (keepPublishing.get()) { + Message message = + new WrapperMessageBuilder() + .properties() + .messageId(sequence.getAndIncrement()) + .messageBuilder() + .applicationProperties() + .entry("generation", generation.get()) + .messageBuilder() + .build(); + try { + long publishingId = + publisher.publish(stream, (byte) 1, Collections.singletonList(message)).get(0); + published.put(publishingId, message); + } catch (Exception e) { + // keep going + } + wait(Duration.ofMillis(10)); + } + publishingLatch.countDown(); + }); + + Queue<Message> consumed = new ConcurrentLinkedQueue<>(); + + Client.Broker replica = + streamMetadata.getReplicas().stream() + .filter(broker -> broker.getPort() == TestUtils.streamPortNode2()) + .findFirst() + .orElseThrow(() -> new NoSuchElementException()); + + AtomicLong lastProcessedOffset = new AtomicLong(-1); + Set<Long> generations = ConcurrentHashMap.newKeySet(); + Set<Long> consumedIds = ConcurrentHashMap.newKeySet(); + Client.MessageListener messageListener = + (subscriptionId, offset, message) -> { + consumed.add(message); + generations.add((Long) message.getApplicationProperties().get("generation")); + consumedIds.add(message.getProperties().getMessageIdAsLong()); + lastProcessedOffset.set(offset); + }; + + CountDownLatch reconnectionLatch = new CountDownLatch(1); + AtomicReference<Client.ShutdownListener> shutdownListenerReference = new AtomicReference<>(); + Client.ShutdownListener shutdownListener = + shutdownContext -> { + if (shutdownContext.getShutdownReason() + == Client.ShutdownContext.ShutdownReason.UNKNOWN) { + // avoid long-running task in the IO thread + executorService.submit( + () -> { + Client.StreamMetadata m = metadataClient.metadata(stream).get(stream); + int newReplicaPort = m.getReplicas().get(0).getPort(); + + Client newConsumer = + cf.get( + new Client.ClientParameters() + .port(newReplicaPort) + .shutdownListener(shutdownListenerReference.get()) + .chunkListener( + (client1, subscriptionId, offset, messageCount, dataSize) -> + client1.credit(subscriptionId, 1)) + .messageListener(messageListener)); + + newConsumer.subscribe( + (byte) 1, + stream, + OffsetSpecification.offset(lastProcessedOffset.get() + 1), + 10); + + generation.incrementAndGet(); + reconnectionLatch.countDown(); + }); + } + }; + shutdownListenerReference.set(shutdownListener); + + Client consumer = + cf.get( + new Client.ClientParameters() + .port(replica.getPort()) + .shutdownListener(shutdownListener) + .chunkListener( + (client1, subscriptionId, offset, messageCount, dataSize) -> + client1.credit(subscriptionId, 1)) + .messageListener(messageListener)); + + Client.Response response = + consumer.subscribe((byte) 1, stream, OffsetSpecification.first(), 10); + assertThat(response.isOk()).isTrue(); + + // let's publish for a bit of time + Thread.sleep(2000); + + assertThat(confirmed).isNotEmpty(); + assertThat(consumed).isNotEmpty(); + int confirmedCount = confirmed.size(); + + try { + Host.rabbitmqctl("stop_app", Host.node2name()); + + assertThat(reconnectionLatch.await(10, TimeUnit.SECONDS)).isTrue(); + + // let's publish for a bit of time + Thread.sleep(2000); + + } finally { + Host.rabbitmqctl("start_app", Host.node2name()); + } + assertThat(confirmed).hasSizeGreaterThan(confirmedCount); + confirmedCount = confirmed.size(); + + // wait until all the replicas are there + TestUtils.waitAtMost( + Duration.ofSeconds(5), + () -> { + Client.StreamMetadata m = metadataClient.metadata(stream).get(stream); + return m.getReplicas().size() == 2; + }); + + // let's publish for a bit of time + Thread.sleep(2000); + + assertThat(confirmed).hasSizeGreaterThan(confirmedCount); + + keepPublishing.set(false); + + assertThat(publishingLatch.await(5, TimeUnit.SECONDS)).isTrue(); + + TestUtils.waitAtMost(Duration.ofSeconds(5), () -> consumed.size() >= confirmed.size()); + + assertThat(generations).hasSize(2).contains(0L, 1L); + assertThat(consumed).hasSizeGreaterThanOrEqualTo(confirmed.size()); + long lastMessageId = -1; + for (Message message : consumed) { + long messageId = message.getProperties().getMessageIdAsLong(); + assertThat(messageId).isGreaterThanOrEqualTo(lastMessageId); + lastMessageId = messageId; + } + assertThat(lastMessageId).isPositive().isLessThanOrEqualTo(sequence.get()); + + confirmedIds.forEach(confirmedId -> assertThat(consumedIds).contains(confirmedId)); + } +} diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/Host.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/Host.java new file mode 100644 index 0000000000..0134038a8b --- /dev/null +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/Host.java @@ -0,0 +1,117 @@ +// The contents of this file are subject to the Mozilla Public License +// Version 2.0 (the "License"); you may not use this file except in +// compliance with the License. You may obtain a copy of the License +// at https://www.mozilla.org/en-US/MPL/2.0/ +// +// Software distributed under the License is distributed on an "AS IS" +// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +// the License for the specific language governing rights and +// limitations under the License. +// +// The Original Code is RabbitMQ. +// +// The Initial Developer of the Original Code is Pivotal Software, Inc. +// Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved. +// + +package com.rabbitmq.stream; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.InetAddress; +import java.net.UnknownHostException; + +public class Host { + + private static String capture(InputStream is) throws IOException { + BufferedReader br = new BufferedReader(new InputStreamReader(is)); + String line; + StringBuilder buff = new StringBuilder(); + while ((line = br.readLine()) != null) { + buff.append(line).append("\n"); + } + return buff.toString(); + } + + private static Process executeCommand(String command) throws IOException { + Process pr = executeCommandProcess(command); + + int ev = waitForExitValue(pr); + if (ev != 0) { + String stdout = capture(pr.getInputStream()); + String stderr = capture(pr.getErrorStream()); + throw new IOException( + "unexpected command exit value: " + + ev + + "\ncommand: " + + command + + "\n" + + "\nstdout:\n" + + stdout + + "\nstderr:\n" + + stderr + + "\n"); + } + return pr; + } + + private static int waitForExitValue(Process pr) { + while (true) { + try { + pr.waitFor(); + break; + } catch (InterruptedException ignored) { + } + } + return pr.exitValue(); + } + + private static Process executeCommandProcess(String command) throws IOException { + String[] finalCommand; + if (System.getProperty("os.name").toLowerCase().contains("windows")) { + finalCommand = new String[4]; + finalCommand[0] = "C:\\winnt\\system32\\cmd.exe"; + finalCommand[1] = "/y"; + finalCommand[2] = "/c"; + finalCommand[3] = command; + } else { + finalCommand = new String[3]; + finalCommand[0] = "/bin/sh"; + finalCommand[1] = "-c"; + finalCommand[2] = command; + } + return Runtime.getRuntime().exec(finalCommand); + } + + public static Process rabbitmqctl(String command) throws IOException { + return rabbitmqctl(command, node1name()); + } + + public static Process rabbitmqctl(String command, String nodename) throws IOException { + return executeCommand(rabbitmqctlCommand() + " -n '" + nodename + "'" + " " + command); + } + + public static String node1name() { + try { + return System.getProperty( + "node1.name", "rabbit-1@" + InetAddress.getLocalHost().getHostName()); + } catch (UnknownHostException e) { + throw new RuntimeException(e); + } + } + + public static String node2name() { + try { + return System.getProperty( + "node2.name", "rabbit-2@" + InetAddress.getLocalHost().getHostName()); + } catch (UnknownHostException e) { + throw new RuntimeException(e); + } + } + + static String rabbitmqctlCommand() { + return System.getProperty("rabbitmqctl.bin"); + } +} diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/LeaderLocatorTest.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/LeaderLocatorTest.java new file mode 100644 index 0000000000..5dc2256643 --- /dev/null +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/LeaderLocatorTest.java @@ -0,0 +1,170 @@ +// The contents of this file are subject to the Mozilla Public License +// Version 2.0 (the "License"); you may not use this file except in +// compliance with the License. You may obtain a copy of the License +// at https://www.mozilla.org/en-US/MPL/2.0/ +// +// Software distributed under the License is distributed on an "AS IS" +// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +// the License for the specific language governing rights and +// limitations under the License. +// +// The Original Code is RabbitMQ. +// +// The Initial Developer of the Original Code is Pivotal Software, Inc. +// Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved. +// + +package com.rabbitmq.stream; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.assertj.core.api.Assertions.assertThat; + +import com.rabbitmq.stream.impl.Client; +import com.rabbitmq.stream.impl.Client.Broker; +import com.rabbitmq.stream.impl.Client.ClientParameters; +import com.rabbitmq.stream.impl.Client.Response; +import com.rabbitmq.stream.impl.Client.StreamMetadata; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.slf4j.LoggerFactory; + +@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class) +public class LeaderLocatorTest { + + TestUtils.ClientFactory cf; + + @Test + void invalidLocatorShouldReturnError() { + Client client = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1())); + String s = UUID.randomUUID().toString(); + Response response = client.create(s, Collections.singletonMap("queue-leader-locator", "foo")); + assertThat(response.isOk()).isFalse(); + assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_PRECONDITION_FAILED); + } + + @Test + void clientLocalLocatorShouldMakeLeaderOnConnectedNode() { + int[] ports = new int[] {TestUtils.streamPortNode1(), TestUtils.streamPortNode2()}; + for (int port : ports) { + Client client = cf.get(new Client.ClientParameters().port(port)); + String s = UUID.randomUUID().toString(); + try { + Response response = + client.create(s, Collections.singletonMap("queue-leader-locator", "client-local")); + assertThat(response.isOk()).isTrue(); + StreamMetadata metadata = client.metadata(s).get(s); + assertThat(metadata).isNotNull(); + assertThat(metadata.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK); + assertThat(metadata.getLeader()).isNotNull().extracting(b -> b.getPort()).isEqualTo(port); + } finally { + client.delete(s); + } + } + } + + @Test + void randomLocatorShouldCreateOnAllNodesAfterSomeTime() throws Exception { + int clusterSize = 3; + Set<String> createdStreams = ConcurrentHashMap.newKeySet(); + Set<Broker> leaderNodes = ConcurrentHashMap.newKeySet(clusterSize); + CountDownLatch latch = new CountDownLatch(1); + Client client = cf.get(new ClientParameters().port(TestUtils.streamPortNode1())); + Runnable runnable = + () -> { + while (leaderNodes.size() < clusterSize && !Thread.interrupted()) { + String s = UUID.randomUUID().toString(); + Response response = + client.create(s, Collections.singletonMap("queue-leader-locator", "random")); + if (!response.isOk()) { + break; + } + createdStreams.add(s); + StreamMetadata metadata = client.metadata(s).get(s); + if (metadata == null || !metadata.isResponseOk() || metadata.getLeader() == null) { + break; + } + leaderNodes.add(metadata.getLeader()); + } + latch.countDown(); + }; + + Thread worker = new Thread(runnable); + worker.start(); + + try { + assertThat(latch.await(10, SECONDS)).isTrue(); + assertThat(leaderNodes).hasSize(clusterSize); + // in case Broker class is broken + assertThat(leaderNodes.stream().map(b -> b.getPort()).collect(Collectors.toSet())) + .hasSize(clusterSize); + } finally { + if (worker.isAlive()) { + worker.interrupt(); + } + createdStreams.forEach( + s -> { + Response response = client.delete(s); + if (!response.isOk()) { + LoggerFactory.getLogger(LeaderLocatorTest.class).warn("Error while deleting stream"); + } + }); + } + } + + @Test + void leastLeadersShouldStreamLeadersOnTheCluster() { + int clusterSize = 3; + int streamsByNode = 5; + int streamCount = clusterSize * streamsByNode; + Set<String> createdStreams = ConcurrentHashMap.newKeySet(); + Client client = cf.get(new ClientParameters().port(TestUtils.streamPortNode1())); + + try { + IntStream.range(0, streamCount) + .forEach( + i -> { + String s = UUID.randomUUID().toString(); + Response response = + client.create( + s, Collections.singletonMap("queue-leader-locator", "least-leaders")); + assertThat(response.isOk()).isTrue(); + createdStreams.add(s); + }); + + Map<Integer, Integer> leaderCount = new HashMap<>(); + Map<String, StreamMetadata> metadata = + client.metadata(createdStreams.toArray(new String[] {})); + assertThat(metadata).hasSize(streamCount); + + metadata + .values() + .forEach( + streamMetadata -> { + assertThat(streamMetadata.isResponseOk()).isTrue(); + assertThat(streamMetadata.getLeader()).isNotNull(); + leaderCount.compute( + streamMetadata.getLeader().getPort(), + (port, value) -> value == null ? 1 : ++value); + }); + assertThat(leaderCount).hasSize(clusterSize); + leaderCount.values().forEach(count -> assertThat(count).isEqualTo(streamsByNode)); + } finally { + createdStreams.forEach( + s -> { + Response response = client.delete(s); + if (!response.isOk()) { + LoggerFactory.getLogger(LeaderLocatorTest.class).warn("Error while deleting stream"); + } + }); + } + } +} diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java new file mode 100644 index 0000000000..08024a12bf --- /dev/null +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java @@ -0,0 +1,173 @@ +// The contents of this file are subject to the Mozilla Public License +// Version 2.0 (the "License"); you may not use this file except in +// compliance with the License. You may obtain a copy of the License +// at https://www.mozilla.org/en-US/MPL/2.0/ +// +// Software distributed under the License is distributed on an "AS IS" +// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +// the License for the specific language governing rights and +// limitations under the License. +// +// The Original Code is RabbitMQ. +// +// The Initial Developer of the Original Code is Pivotal Software, Inc. +// Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved. +// + +package com.rabbitmq.stream; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.rabbitmq.stream.impl.Client; +import java.nio.charset.StandardCharsets; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; +import java.util.function.Function; +import java.util.stream.IntStream; +import java.util.stream.Stream; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class) +public class StreamTest { + + String stream; + TestUtils.ClientFactory cf; + + static Stream<Arguments> shouldBePossibleToPublishFromAnyNodeAndConsumeFromAnyMember() { + return Stream.of( + brokers( + "leader", metadata -> metadata.getLeader(), "leader", metadata -> metadata.getLeader()), + brokers( + "leader", + metadata -> metadata.getLeader(), + "replica", + metadata -> metadata.getReplicas().iterator().next()), + brokers( + "replica", + metadata -> metadata.getReplicas().iterator().next(), + "leader", + metadata -> metadata.getLeader()), + brokers( + "replica", + metadata -> new ArrayList<>(metadata.getReplicas()).get(0), + "replica", + metadata -> new ArrayList<>(metadata.getReplicas()).get(1))); + } + + static Arguments brokers( + String dp, + Function<Client.StreamMetadata, Client.Broker> publisherBroker, + String dc, + Function<Client.StreamMetadata, Client.Broker> consumerBroker) { + return Arguments.of( + new FunctionWithToString<>(dp, publisherBroker), + new FunctionWithToString<>(dc, consumerBroker)); + } + + @ParameterizedTest + @MethodSource + void shouldBePossibleToPublishFromAnyNodeAndConsumeFromAnyMember( + Function<Client.StreamMetadata, Client.Broker> publisherBroker, + Function<Client.StreamMetadata, Client.Broker> consumerBroker) + throws Exception { + + int messageCount = 10_000; + Client client = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1())); + Map<String, Client.StreamMetadata> metadata = client.metadata(stream); + assertThat(metadata).hasSize(1).containsKey(stream); + Client.StreamMetadata streamMetadata = metadata.get(stream); + + CountDownLatch publishingLatch = new CountDownLatch(messageCount); + Client publisher = + cf.get( + new Client.ClientParameters() + .port(publisherBroker.apply(streamMetadata).getPort()) + .publishConfirmListener( + (publisherId, publishingId) -> publishingLatch.countDown())); + + IntStream.range(0, messageCount) + .forEach( + i -> + publisher.publish( + stream, + (byte) 1, + Collections.singletonList( + publisher + .messageBuilder() + .addData(("hello " + i).getBytes(StandardCharsets.UTF_8)) + .build()))); + + assertThat(publishingLatch.await(10, TimeUnit.SECONDS)).isTrue(); + + CountDownLatch consumingLatch = new CountDownLatch(messageCount); + Set<String> bodies = ConcurrentHashMap.newKeySet(messageCount); + Client consumer = + cf.get( + new Client.ClientParameters() + .port(consumerBroker.apply(streamMetadata).getPort()) + .chunkListener( + (client1, subscriptionId, offset, messageCount1, dataSize) -> + client1.credit(subscriptionId, 10)) + .messageListener( + (subscriptionId, offset, message) -> { + bodies.add(new String(message.getBodyAsBinary(), StandardCharsets.UTF_8)); + consumingLatch.countDown(); + })); + + consumer.subscribe((byte) 1, stream, OffsetSpecification.first(), 10); + + assertThat(consumingLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(bodies).hasSize(messageCount); + IntStream.range(0, messageCount).forEach(i -> assertThat(bodies.contains("hello " + i))); + } + + @Test + void metadataOnClusterShouldReturnLeaderAndReplicas() { + Client client = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1())); + Map<String, Client.StreamMetadata> metadata = client.metadata(stream); + assertThat(metadata).hasSize(1).containsKey(stream); + Client.StreamMetadata streamMetadata = metadata.get(stream); + assertThat(streamMetadata.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK); + assertThat(streamMetadata.getReplicas()).hasSize(2); + + BiConsumer<Client.Broker, Client.Broker> assertNodesAreDifferent = + (node, anotherNode) -> { + assertThat(node.getHost()).isEqualTo(anotherNode.getHost()); + assertThat(node.getPort()).isNotEqualTo(anotherNode.getPort()); + }; + + streamMetadata + .getReplicas() + .forEach(replica -> assertNodesAreDifferent.accept(replica, streamMetadata.getLeader())); + List<Client.Broker> replicas = new ArrayList<>(streamMetadata.getReplicas()); + assertNodesAreDifferent.accept(replicas.get(0), replicas.get(1)); + } + + static class FunctionWithToString<T, R> implements Function<T, R> { + + final String toString; + final Function<T, R> delegate; + + FunctionWithToString(String toString, Function<T, R> delegate) { + this.toString = toString; + this.delegate = delegate; + } + + @Override + public R apply(T t) { + return delegate.apply(t); + } + + @Override + public String toString() { + return toString; + } + } +} diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java new file mode 100644 index 0000000000..c49a8d5832 --- /dev/null +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java @@ -0,0 +1,179 @@ +// The contents of this file are subject to the Mozilla Public License +// Version 2.0 (the "License"); you may not use this file except in +// compliance with the License. You may obtain a copy of the License +// at https://www.mozilla.org/en-US/MPL/2.0/ +// +// Software distributed under the License is distributed on an "AS IS" +// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +// the License for the specific language governing rights and +// limitations under the License. +// +// The Original Code is RabbitMQ. +// +// The Initial Developer of the Original Code is Pivotal Software, Inc. +// Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved. +// + +package com.rabbitmq.stream; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.fail; + +import com.rabbitmq.stream.impl.Client; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import java.lang.reflect.Field; +import java.time.Duration; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.BooleanSupplier; +import org.junit.jupiter.api.extension.*; + +public class TestUtils { + + static int streamPortNode1() { + String port = System.getProperty("node1.stream.port", "5555"); + return Integer.valueOf(port); + } + + static int streamPortNode2() { + String port = System.getProperty("node2.stream.port", "5556"); + return Integer.valueOf(port); + } + + static void waitAtMost(Duration duration, BooleanSupplier condition) throws InterruptedException { + if (condition.getAsBoolean()) { + return; + } + int waitTime = 100; + int waitedTime = 0; + long timeoutInMs = duration.toMillis(); + while (waitedTime <= timeoutInMs) { + Thread.sleep(waitTime); + if (condition.getAsBoolean()) { + return; + } + waitedTime += waitTime; + } + fail("Waited " + duration.getSeconds() + " second(s), condition never got true"); + } + + static class StreamTestInfrastructureExtension + implements BeforeAllCallback, AfterAllCallback, BeforeEachCallback, AfterEachCallback { + + private static final ExtensionContext.Namespace NAMESPACE = + ExtensionContext.Namespace.create(StreamTestInfrastructureExtension.class); + + private static ExtensionContext.Store store(ExtensionContext extensionContext) { + return extensionContext.getRoot().getStore(NAMESPACE); + } + + private static EventLoopGroup eventLoopGroup(ExtensionContext context) { + return (EventLoopGroup) store(context).get("nettyEventLoopGroup"); + } + + @Override + public void beforeAll(ExtensionContext context) { + store(context).put("nettyEventLoopGroup", new NioEventLoopGroup()); + } + + @Override + public void beforeEach(ExtensionContext context) throws Exception { + try { + Field streamField = + context.getTestInstance().get().getClass().getDeclaredField("eventLoopGroup"); + streamField.setAccessible(true); + streamField.set(context.getTestInstance().get(), eventLoopGroup(context)); + } catch (NoSuchFieldException e) { + + } + try { + Field streamField = context.getTestInstance().get().getClass().getDeclaredField("stream"); + streamField.setAccessible(true); + String stream = UUID.randomUUID().toString(); + streamField.set(context.getTestInstance().get(), stream); + Client client = + new Client( + new Client.ClientParameters() + .eventLoopGroup(eventLoopGroup(context)) + .port(streamPortNode1())); + Client.Response response = client.create(stream); + assertThat(response.isOk()).isTrue(); + client.close(); + store(context).put("testMethodStream", stream); + } catch (NoSuchFieldException e) { + + } + + for (Field declaredField : context.getTestInstance().get().getClass().getDeclaredFields()) { + if (declaredField.getType().equals(ClientFactory.class)) { + declaredField.setAccessible(true); + ClientFactory clientFactory = new ClientFactory(eventLoopGroup(context)); + declaredField.set(context.getTestInstance().get(), clientFactory); + store(context).put("testClientFactory", clientFactory); + break; + } + } + } + + @Override + public void afterEach(ExtensionContext context) throws Exception { + try { + Field streamField = context.getTestInstance().get().getClass().getDeclaredField("stream"); + streamField.setAccessible(true); + String stream = (String) streamField.get(context.getTestInstance().get()); + Client client = + new Client( + new Client.ClientParameters() + .eventLoopGroup(eventLoopGroup(context)) + .port(streamPortNode1())); + Client.Response response = client.delete(stream); + assertThat(response.isOk()).isTrue(); + client.close(); + store(context).remove("testMethodStream"); + } catch (NoSuchFieldException e) { + + } + + ClientFactory clientFactory = (ClientFactory) store(context).get("testClientFactory"); + if (clientFactory != null) { + clientFactory.close(); + } + } + + @Override + public void afterAll(ExtensionContext context) throws Exception { + EventLoopGroup eventLoopGroup = eventLoopGroup(context); + eventLoopGroup.shutdownGracefully(1, 10, SECONDS).get(10, SECONDS); + } + } + + static class ClientFactory { + + private final EventLoopGroup eventLoopGroup; + private final Set<Client> clients = ConcurrentHashMap.newKeySet(); + + public ClientFactory(EventLoopGroup eventLoopGroup) { + this.eventLoopGroup = eventLoopGroup; + } + + public Client get() { + return get(new Client.ClientParameters()); + } + + public Client get(Client.ClientParameters parameters) { + // don't set the port, it would override the caller's port setting + Client client = new Client(parameters.eventLoopGroup(eventLoopGroup)); + clients.add(client); + return client; + } + + private void close() { + for (Client c : clients) { + c.close(); + } + } + } +} diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/resources/logback-test.xml b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/resources/logback-test.xml new file mode 100644 index 0000000000..45d598991d --- /dev/null +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/resources/logback-test.xml @@ -0,0 +1,13 @@ +<configuration> + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern> + </encoder> + </appender> + + <logger name="com.rabbitmq.stream" level="info" /> + + <root level="info"> + <appender-ref ref="STDOUT" /> + </root> +</configuration>
\ No newline at end of file |