path: root/deps/rabbitmq_stream/test
author     dcorbacho <dparracorbacho@piotal.io>  2020-11-18 14:27:41 +0000
committer  dcorbacho <dparracorbacho@piotal.io>  2020-11-18 14:27:41 +0000
commit     f23a51261d9502ec39df0f8db47ba6b22aa7659f (patch)
tree       53dcdf46e7dc2c14e81ee960bce8793879b488d3 /deps/rabbitmq_stream/test
parent     afa2c2bf6c7e0e9b63f4fb53dc931c70388e1c82 (diff)
parent     9f6d64ec4a4b1eeac24d7846c5c64fd96798d892 (diff)
download   rabbitmq-server-git-stream-timestamp-offset.tar.gz

Merge remote-tracking branch 'origin/master' into stream-timestamp-offset
Diffstat (limited to 'deps/rabbitmq_stream/test')
-rw-r--r--  deps/rabbitmq_stream/test/command_SUITE.erl                                                              | 136
-rw-r--r--  deps/rabbitmq_stream/test/config_schema_SUITE.erl                                                        |  53
-rw-r--r--  deps/rabbitmq_stream/test/config_schema_SUITE_data/rabbitmq_stream.snippets                              |  73
-rw-r--r--  deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl                                                        | 266
-rw-r--r--  deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/.gitignore                                            |   3
-rw-r--r--  deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/.mvn/wrapper/MavenWrapperDownloader.java              | 117
-rw-r--r--  deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/.mvn/wrapper/maven-wrapper.properties                 |   2
-rw-r--r--  deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/Makefile                                              |  18
-rwxr-xr-x  deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/mvnw                                                  | 310
-rw-r--r--  deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/mvnw.cmd                                              | 182
-rw-r--r--  deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/pom.xml                                               | 143
-rw-r--r--  deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/ClusterSizeTest.java |  65
-rw-r--r--  deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java     | 541
-rw-r--r--  deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/Host.java            | 117
-rw-r--r--  deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/LeaderLocatorTest.java | 170
-rw-r--r--  deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java      | 173
-rw-r--r--  deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java       | 179
-rw-r--r--  deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/resources/logback-test.xml                    |  13
18 files changed, 2561 insertions, 0 deletions
diff --git a/deps/rabbitmq_stream/test/command_SUITE.erl b/deps/rabbitmq_stream/test/command_SUITE.erl
new file mode 100644
index 0000000000..41ab5904ff
--- /dev/null
+++ b/deps/rabbitmq_stream/test/command_SUITE.erl
@@ -0,0 +1,136 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(command_SUITE).
+-compile([export_all]).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+-include("rabbit_stream.hrl").
+
+
+-define(COMMAND, 'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConnectionsCommand').
+
+all() ->
+ [
+ {group, non_parallel_tests}
+ ].
+
+groups() ->
+ [
+ {non_parallel_tests, [], [
+ merge_defaults,
+ run
+ ]}
+ ].
+
+init_per_suite(Config) ->
+ Config1 = rabbit_ct_helpers:set_config(Config,
+ [{rmq_nodename_suffix, ?MODULE}]),
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps()).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config,
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_group(_, Config) ->
+ Config.
+
+end_per_group(_, Config) ->
+ Config.
+
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase).
+
+end_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+merge_defaults(_Config) ->
+ {[<<"conn_name">>], #{verbose := false}} =
+ ?COMMAND:merge_defaults([], #{}),
+
+ {[<<"other_key">>], #{verbose := true}} =
+ ?COMMAND:merge_defaults([<<"other_key">>], #{verbose => true}),
+
+ {[<<"other_key">>], #{verbose := false}} =
+ ?COMMAND:merge_defaults([<<"other_key">>], #{verbose => false}).
+
+
+run(Config) ->
+
+ Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ Opts = #{node => Node, timeout => 10000, verbose => false},
+
+ %% No connections
+ [] = 'Elixir.Enum':to_list(?COMMAND:run([], Opts)),
+
+ StreamPort = rabbit_stream_SUITE:get_stream_port(Config),
+
+ S1 = start_stream_connection(StreamPort),
+ ct:sleep(100),
+
+ [[{conn_name, _}]] =
+ 'Elixir.Enum':to_list(?COMMAND:run([<<"conn_name">>], Opts)),
+
+ S2 = start_stream_connection(StreamPort),
+ ct:sleep(100),
+
+ [[{conn_name, _}], [{conn_name, _}]] =
+ 'Elixir.Enum':to_list(?COMMAND:run([<<"conn_name">>], Opts)),
+
+ Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
+ start_amqp_connection(network, Node, Port),
+
+ %% There are still just two connections
+ [[{conn_name, _}], [{conn_name, _}]] =
+ 'Elixir.Enum':to_list(?COMMAND:run([<<"conn_name">>], Opts)),
+
+ start_amqp_connection(direct, Node, Port),
+
+ %% Still two stream connections, one direct AMQP 0-9-1 connection
+ [[{conn_name, _}], [{conn_name, _}]] =
+ 'Elixir.Enum':to_list(?COMMAND:run([<<"conn_name">>], Opts)),
+
+ %% Verbose returns all keys
+ Infos = lists:map(fun(El) -> atom_to_binary(El, utf8) end, ?INFO_ITEMS),
+ AllKeys = 'Elixir.Enum':to_list(?COMMAND:run(Infos, Opts)),
+ AllKeys = 'Elixir.Enum':to_list(?COMMAND:run([], Opts#{verbose => true})),
+
+ %% There are two connections
+ [First, _Second] = AllKeys,
+
+ %% Keys are INFO_ITEMS
+ KeysCount = length(?INFO_ITEMS),
+ KeysCount = length(First),
+
+ {Keys, _} = lists:unzip(First),
+
+ [] = Keys -- ?INFO_ITEMS,
+ [] = ?INFO_ITEMS -- Keys,
+
+ rabbit_stream_SUITE:test_close(S1),
+ rabbit_stream_SUITE:test_close(S2),
+ ok.
+
+start_stream_connection(Port) ->
+ {ok, S} = gen_tcp:connect("localhost", Port, [{active, false},
+ {mode, binary}]),
+ rabbit_stream_SUITE:test_peer_properties(S),
+ rabbit_stream_SUITE:test_authenticate(S),
+ S.
+
+start_amqp_connection(Type, Node, Port) ->
+ Params = amqp_params(Type, Node, Port),
+ {ok, _Connection} = amqp_connection:start(Params).
+
+amqp_params(network, _, Port) ->
+ #amqp_params_network{port = Port};
+amqp_params(direct, Node, _) ->
+ #amqp_params_direct{node = Node}.
diff --git a/deps/rabbitmq_stream/test/config_schema_SUITE.erl b/deps/rabbitmq_stream/test/config_schema_SUITE.erl
new file mode 100644
index 0000000000..a298811541
--- /dev/null
+++ b/deps/rabbitmq_stream/test/config_schema_SUITE.erl
@@ -0,0 +1,53 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(config_schema_SUITE).
+
+-compile(export_all).
+
+all() ->
+ [
+ run_snippets
+ ].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ Config1 = rabbit_ct_helpers:run_setup_steps(Config),
+ rabbit_ct_config_schema:init_schemas(rabbitmq_stream, Config1).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase),
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodename_suffix, Testcase}
+ ]),
+ rabbit_ct_helpers:run_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()).
+
+end_per_testcase(Testcase, Config) ->
+ Config1 = rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()),
+ rabbit_ct_helpers:testcase_finished(Config1, Testcase).
+
+%% -------------------------------------------------------------------
+%% Testcases.
+%% -------------------------------------------------------------------
+
+run_snippets(Config) ->
+ ok = rabbit_ct_broker_helpers:rpc(Config, 0,
+ ?MODULE, run_snippets1, [Config]).
+
+run_snippets1(Config) ->
+ rabbit_ct_config_schema:run_snippets(Config).
diff --git a/deps/rabbitmq_stream/test/config_schema_SUITE_data/rabbitmq_stream.snippets b/deps/rabbitmq_stream/test/config_schema_SUITE_data/rabbitmq_stream.snippets
new file mode 100644
index 0000000000..8f60ef9710
--- /dev/null
+++ b/deps/rabbitmq_stream/test/config_schema_SUITE_data/rabbitmq_stream.snippets
@@ -0,0 +1,73 @@
+[{listener_port,
+ "stream.listeners.tcp.1 = 12345",
+ [{rabbitmq_stream,[{tcp_listeners,[12345]}]}],
+ [rabbitmq_stream]},
+ {listeners_ip,
+ "stream.listeners.tcp.1 = 127.0.0.1:5555
+ stream.listeners.tcp.2 = ::1:5555",
+ [{rabbitmq_stream,[{tcp_listeners,[{"127.0.0.1",5555},{"::1",5555}]}]}],
+ [rabbitmq_stream]},
+
+ {listener_tcp_options,
+ "stream.listeners.tcp.1 = 127.0.0.1:5555
+ stream.listeners.tcp.2 = ::1:5555
+
+ stream.tcp_listen_options.backlog = 2048
+ stream.tcp_listen_options.recbuf = 8192
+ stream.tcp_listen_options.sndbuf = 8192
+
+ stream.tcp_listen_options.keepalive = true
+ stream.tcp_listen_options.nodelay = true
+
+ stream.tcp_listen_options.exit_on_close = true
+
+ stream.tcp_listen_options.send_timeout = 120
+",
+ [{rabbitmq_stream,[
+ {tcp_listeners,[
+ {"127.0.0.1",5555},
+ {"::1",5555}
+ ]}
+ , {tcp_listen_options, [
+ {backlog, 2048},
+ {exit_on_close, true},
+
+ {recbuf, 8192},
+ {sndbuf, 8192},
+
+ {send_timeout, 120},
+
+ {keepalive, true},
+ {nodelay, true}
+ ]}
+ ]}],
+ [rabbitmq_stream]},
+ {defaults,
+ "stream.frame_max = 1048576
+ stream.heartbeat = 60
+ stream.initial_credits = 50000
+ stream.credits_required_for_unblocking = 12500",
+ [{rabbitmq_stream,[{initial_credits, 50000},
+ {credits_required_for_unblocking, 12500},
+ {frame_max, 1048576},
+ {heartbeat, 60}]}],
+ [rabbitmq_stream]},
+ {advertised_host_port,
+ "stream.advertised_host = some-host
+ stream.advertised_port = 5556",
+ [{rabbitmq_stream,[{advertised_host, <<"some-host">>},
+ {advertised_port, 5556}]}],
+ [rabbitmq_stream]},
+ {credits,
+ "stream.frame_max = 2097152
+ stream.heartbeat = 120",
+ [{rabbitmq_stream,[{frame_max, 2097152},
+ {heartbeat, 120}]}],
+ [rabbitmq_stream]},
+ {protocol,
+ "stream.initial_credits = 100000
+ stream.credits_required_for_unblocking = 25000",
+ [{rabbitmq_stream,[{initial_credits, 100000},
+ {credits_required_for_unblocking, 25000}]}],
+ [rabbitmq_stream]}
+].
\ No newline at end of file
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl
new file mode 100644
index 0000000000..4197b1de71
--- /dev/null
+++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl
@@ -0,0 +1,266 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 2.0 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at https://www.mozilla.org/en-US/MPL/2.0/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is Pivotal Software, Inc.
+%% Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_stream_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include("rabbit_stream.hrl").
+
+-compile(export_all).
+
+all() ->
+ [
+ {group, single_node},
+ {group, cluster}
+ ].
+
+groups() ->
+ [
+ {single_node, [], [test_stream]},
+ {cluster, [], [test_stream, java]}
+ ].
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ Config.
+
+end_per_suite(Config) ->
+ Config.
+
+init_per_group(single_node, Config) ->
+ Config1 = rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, false}]),
+ rabbit_ct_helpers:run_setup_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps());
+init_per_group(cluster = Group, Config) ->
+ Config1 = rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, true}]),
+ Config2 = rabbit_ct_helpers:set_config(Config1,
+ [{rmq_nodes_count, 3},
+ {rmq_nodename_suffix, Group},
+ {tcp_ports_base}]),
+ rabbit_ct_helpers:run_setup_steps(Config2,
+ [fun(StepConfig) ->
+ rabbit_ct_helpers:merge_app_env(StepConfig,
+ {aten, [{poll_interval, 1000}]})
+ end] ++
+ rabbit_ct_broker_helpers:setup_steps());
+init_per_group(_, Config) ->
+ rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_group(java, Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config);
+end_per_group(_, Config) ->
+ rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_testcase(_TestCase, Config) ->
+ Config.
+
+end_per_testcase(_Test, _Config) ->
+ ok.
+
+test_stream(Config) ->
+ Port = get_stream_port(Config),
+ test_server(Port),
+ ok.
+
+java(Config) ->
+ StreamPortNode1 = get_stream_port(Config, 0),
+ StreamPortNode2 = get_stream_port(Config, 1),
+ Node1Name = get_node_name(Config, 0),
+ Node2Name = get_node_name(Config, 1),
+ RabbitMqCtl = get_rabbitmqctl(Config),
+ DataDir = rabbit_ct_helpers:get_config(Config, data_dir),
+ MakeResult = rabbit_ct_helpers:make(Config, DataDir, ["tests",
+ {"NODE1_STREAM_PORT=~b", [StreamPortNode1]},
+ {"NODE1_NAME=~p", [Node1Name]},
+ {"NODE2_NAME=~p", [Node2Name]},
+ {"NODE2_STREAM_PORT=~b", [StreamPortNode2]},
+ {"RABBITMQCTL=~p", [RabbitMqCtl]}
+ ]),
+ {ok, _} = MakeResult.
+
+get_rabbitmqctl(Config) ->
+ rabbit_ct_helpers:get_config(Config, rabbitmqctl_cmd).
+
+get_stream_port(Config) ->
+ get_stream_port(Config, 0).
+
+get_stream_port(Config, Node) ->
+ rabbit_ct_broker_helpers:get_node_config(Config, Node, tcp_port_stream).
+
+get_node_name(Config) ->
+ get_node_name(Config, 0).
+
+get_node_name(Config, Node) ->
+ rabbit_ct_broker_helpers:get_node_config(Config, Node, nodename).
+
+test_server(Port) ->
+ {ok, S} = gen_tcp:connect("localhost", Port, [{active, false},
+ {mode, binary}]),
+ test_peer_properties(S),
+ test_authenticate(S),
+ Stream = <<"stream1">>,
+ test_create_stream(S, Stream),
+ Body = <<"hello">>,
+ test_publish_confirm(S, Stream, Body),
+ SubscriptionId = 42,
+ Rest = test_subscribe(S, SubscriptionId, Stream),
+ test_deliver(S, Rest, SubscriptionId, Body),
+ test_delete_stream(S, Stream),
+ test_metadata_update_stream_deleted(S, Stream),
+ test_close(S),
+ closed = wait_for_socket_close(S, 10),
+ ok.
+
+test_peer_properties(S) ->
+ PeerPropertiesFrame = <<?COMMAND_PEER_PROPERTIES:16, ?VERSION_0:16, 1:32, 0:32>>,
+ PeerPropertiesFrameSize = byte_size(PeerPropertiesFrame),
+ gen_tcp:send(S, <<PeerPropertiesFrameSize:32, PeerPropertiesFrame/binary>>),
+ {ok, <<_Size:32, ?COMMAND_PEER_PROPERTIES:16, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16, _Rest/binary>>} = gen_tcp:recv(S, 0, 5000).
+
+test_authenticate(S) ->
+ SaslHandshakeFrame = <<?COMMAND_SASL_HANDSHAKE:16, ?VERSION_0:16, 1:32>>,
+ SaslHandshakeFrameSize = byte_size(SaslHandshakeFrame),
+ gen_tcp:send(S, <<SaslHandshakeFrameSize:32, SaslHandshakeFrame/binary>>),
+ Plain = <<"PLAIN">>,
+ AmqPlain = <<"AMQPLAIN">>,
+ {ok, SaslAvailable} = gen_tcp:recv(S, 0, 5000),
+ %% mechanisms order is not deterministic, so checking both orders
+ ok = case SaslAvailable of
+ <<31:32, ?COMMAND_SASL_HANDSHAKE:16, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16, 2:32,
+ 5:16, Plain:5/binary, 8:16, AmqPlain:8/binary>> ->
+ ok;
+ <<31:32, ?COMMAND_SASL_HANDSHAKE:16, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16, 2:32,
+ 8:16, AmqPlain:8/binary, 5:16, Plain:5/binary>> ->
+ ok;
+ _ ->
+ failed
+ end,
+
+ Username = <<"guest">>,
+ Password = <<"guest">>,
+ Null = 0,
+ PlainSasl = <<Null:8, Username/binary, Null:8, Password/binary>>,
+ PlainSaslSize = byte_size(PlainSasl),
+
+ SaslAuthenticateFrame = <<?COMMAND_SASL_AUTHENTICATE:16, ?VERSION_0:16, 2:32,
+ 5:16, Plain/binary, PlainSaslSize:32, PlainSasl/binary>>,
+
+ SaslAuthenticateFrameSize = byte_size(SaslAuthenticateFrame),
+
+ gen_tcp:send(S, <<SaslAuthenticateFrameSize:32, SaslAuthenticateFrame/binary>>),
+
+ {ok, <<10:32, ?COMMAND_SASL_AUTHENTICATE:16, ?VERSION_0:16, 2:32, ?RESPONSE_CODE_OK:16, RestTune/binary>>} = gen_tcp:recv(S, 0, 5000),
+
+ TuneExpected = <<12:32, ?COMMAND_TUNE:16, ?VERSION_0:16, ?DEFAULT_FRAME_MAX:32, ?DEFAULT_HEARTBEAT:32>>,
+ case RestTune of
+ <<>> ->
+ {ok, TuneExpected} = gen_tcp:recv(S, 0, 5000);
+ TuneReceived ->
+ TuneExpected = TuneReceived
+ end,
+
+ TuneFrame = <<?COMMAND_TUNE:16, ?VERSION_0:16, ?DEFAULT_FRAME_MAX:32, 0:32>>,
+ TuneFrameSize = byte_size(TuneFrame),
+ gen_tcp:send(S, <<TuneFrameSize:32, TuneFrame/binary>>),
+
+ VirtualHost = <<"/">>,
+ VirtualHostLength = byte_size(VirtualHost),
+ OpenFrame = <<?COMMAND_OPEN:16, ?VERSION_0:16, 3:32, VirtualHostLength:16, VirtualHost/binary>>,
+ OpenFrameSize = byte_size(OpenFrame),
+ gen_tcp:send(S, <<OpenFrameSize:32, OpenFrame/binary>>),
+ {ok, <<10:32, ?COMMAND_OPEN:16, ?VERSION_0:16, 3:32, ?RESPONSE_CODE_OK:16>>} = gen_tcp:recv(S, 0, 5000).
+
+
+test_create_stream(S, Stream) ->
+ StreamSize = byte_size(Stream),
+ CreateStreamFrame = <<?COMMAND_CREATE_STREAM:16, ?VERSION_0:16, 1:32, StreamSize:16, Stream:StreamSize/binary, 0:32>>,
+ FrameSize = byte_size(CreateStreamFrame),
+ gen_tcp:send(S, <<FrameSize:32, CreateStreamFrame/binary>>),
+ {ok, <<_Size:32, ?COMMAND_CREATE_STREAM:16, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16>>} = gen_tcp:recv(S, 0, 5000).
+
+test_delete_stream(S, Stream) ->
+ StreamSize = byte_size(Stream),
+ DeleteStreamFrame = <<?COMMAND_DELETE_STREAM:16, ?VERSION_0:16, 1:32, StreamSize:16, Stream:StreamSize/binary>>,
+ FrameSize = byte_size(DeleteStreamFrame),
+ gen_tcp:send(S, <<FrameSize:32, DeleteStreamFrame/binary>>),
+ ResponseFrameSize = 10,
+ {ok, <<ResponseFrameSize:32, ?COMMAND_DELETE_STREAM:16, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16>>} = gen_tcp:recv(S, 4 + 10, 5000).
+
+test_publish_confirm(S, Stream, Body) ->
+ BodySize = byte_size(Body),
+ StreamSize = byte_size(Stream),
+ PublishFrame = <<?COMMAND_PUBLISH:16, ?VERSION_0:16, StreamSize:16, Stream:StreamSize/binary, 42:8, 1:32, 1:64, BodySize:32, Body:BodySize/binary>>,
+ FrameSize = byte_size(PublishFrame),
+ gen_tcp:send(S, <<FrameSize:32, PublishFrame/binary>>),
+ {ok, <<_Size:32, ?COMMAND_PUBLISH_CONFIRM:16, ?VERSION_0:16, 42:8, 1:32, 1:64>>} = gen_tcp:recv(S, 0, 5000).
+
+test_subscribe(S, SubscriptionId, Stream) ->
+ StreamSize = byte_size(Stream),
+ SubscribeFrame = <<?COMMAND_SUBSCRIBE:16, ?VERSION_0:16, 1:32, SubscriptionId:8, StreamSize:16, Stream:StreamSize/binary,
+ ?OFFSET_TYPE_OFFSET:16, 0:64, 10:16>>,
+ FrameSize = byte_size(SubscribeFrame),
+ gen_tcp:send(S, <<FrameSize:32, SubscribeFrame/binary>>),
+ Res = gen_tcp:recv(S, 0, 5000),
+ {ok, <<_Size:32, ?COMMAND_SUBSCRIBE:16, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16, Rest/binary>>} = Res,
+ Rest.
+
+test_deliver(S, Rest, SubscriptionId, Body) ->
+ BodySize = byte_size(Body),
+ Frame = read_frame(S, Rest),
+ <<54:32, ?COMMAND_DELIVER:16, ?VERSION_0:16, SubscriptionId:8, 5:4/unsigned, 0:4/unsigned, 0:8,
+ 1:16, 1:32,
+ _Timestamp:64, _Epoch:64, 0:64, _Crc:32, _DataLength:32,
+ 0:1, BodySize:31/unsigned, Body/binary>> = Frame.
+
+test_metadata_update_stream_deleted(S, Stream) ->
+ StreamSize = byte_size(Stream),
+ {ok, <<15:32, ?COMMAND_METADATA_UPDATE:16, ?VERSION_0:16, ?RESPONSE_CODE_STREAM_NOT_AVAILABLE:16, StreamSize:16, Stream/binary>>} = gen_tcp:recv(S, 0, 5000).
+
+test_close(S) ->
+ CloseReason = <<"OK">>,
+ CloseReasonSize = byte_size(CloseReason),
+ CloseFrame = <<?COMMAND_CLOSE:16, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16, CloseReasonSize:16, CloseReason/binary>>,
+ CloseFrameSize = byte_size(CloseFrame),
+ gen_tcp:send(S, <<CloseFrameSize:32, CloseFrame/binary>>),
+ {ok, <<10:32, ?COMMAND_CLOSE:16, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16>>} = gen_tcp:recv(S, 0, 5000).
+
+wait_for_socket_close(_S, 0) ->
+ not_closed;
+wait_for_socket_close(S, Attempt) ->
+ case gen_tcp:recv(S, 0, 1000) of
+ {error, timeout} ->
+ wait_for_socket_close(S, Attempt - 1);
+ {error, closed} ->
+ closed
+ end.
+
+read_frame(S, Buffer) ->
+ inet:setopts(S, [{active, once}]),
+ receive
+ {tcp, S, Received} ->
+ Data = <<Buffer/binary, Received/binary>>,
+ case Data of
+ <<Size:32, _Body:Size/binary>> ->
+ Data;
+ _ ->
+ read_frame(S, Data)
+ end
+ after
+ 1000 ->
+ inet:setopts(S, [{active, false}]),
+ Buffer
+ end.
\ No newline at end of file
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/.gitignore b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/.gitignore
new file mode 100644
index 0000000000..4c70cdb707
--- /dev/null
+++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/.gitignore
@@ -0,0 +1,3 @@
+/build/
+/lib/
+/target/
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/.mvn/wrapper/MavenWrapperDownloader.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/.mvn/wrapper/MavenWrapperDownloader.java
new file mode 100644
index 0000000000..b901097f2d
--- /dev/null
+++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/.mvn/wrapper/MavenWrapperDownloader.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2007-present the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.net.*;
+import java.io.*;
+import java.nio.channels.*;
+import java.util.Properties;
+
+public class MavenWrapperDownloader {
+
+ private static final String WRAPPER_VERSION = "0.5.6";
+ /**
+ * Default URL to download the maven-wrapper.jar from, if no 'downloadUrl' is provided.
+ */
+ private static final String DEFAULT_DOWNLOAD_URL = "https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/"
+ + WRAPPER_VERSION + "/maven-wrapper-" + WRAPPER_VERSION + ".jar";
+
+ /**
+ * Path to the maven-wrapper.properties file, which might contain a downloadUrl property to
+ * use instead of the default one.
+ */
+ private static final String MAVEN_WRAPPER_PROPERTIES_PATH =
+ ".mvn/wrapper/maven-wrapper.properties";
+
+ /**
+ * Path where the maven-wrapper.jar will be saved to.
+ */
+ private static final String MAVEN_WRAPPER_JAR_PATH =
+ ".mvn/wrapper/maven-wrapper.jar";
+
+ /**
+ * Name of the property which should be used to override the default download url for the wrapper.
+ */
+ private static final String PROPERTY_NAME_WRAPPER_URL = "wrapperUrl";
+
+ public static void main(String args[]) {
+ System.out.println("- Downloader started");
+ File baseDirectory = new File(args[0]);
+ System.out.println("- Using base directory: " + baseDirectory.getAbsolutePath());
+
+ // If the maven-wrapper.properties exists, read it and check if it contains a custom
+ // wrapperUrl parameter.
+ File mavenWrapperPropertyFile = new File(baseDirectory, MAVEN_WRAPPER_PROPERTIES_PATH);
+ String url = DEFAULT_DOWNLOAD_URL;
+ if(mavenWrapperPropertyFile.exists()) {
+ FileInputStream mavenWrapperPropertyFileInputStream = null;
+ try {
+ mavenWrapperPropertyFileInputStream = new FileInputStream(mavenWrapperPropertyFile);
+ Properties mavenWrapperProperties = new Properties();
+ mavenWrapperProperties.load(mavenWrapperPropertyFileInputStream);
+ url = mavenWrapperProperties.getProperty(PROPERTY_NAME_WRAPPER_URL, url);
+ } catch (IOException e) {
+ System.out.println("- ERROR loading '" + MAVEN_WRAPPER_PROPERTIES_PATH + "'");
+ } finally {
+ try {
+ if(mavenWrapperPropertyFileInputStream != null) {
+ mavenWrapperPropertyFileInputStream.close();
+ }
+ } catch (IOException e) {
+ // Ignore ...
+ }
+ }
+ }
+ System.out.println("- Downloading from: " + url);
+
+ File outputFile = new File(baseDirectory.getAbsolutePath(), MAVEN_WRAPPER_JAR_PATH);
+ if(!outputFile.getParentFile().exists()) {
+ if(!outputFile.getParentFile().mkdirs()) {
+ System.out.println(
+ "- ERROR creating output directory '" + outputFile.getParentFile().getAbsolutePath() + "'");
+ }
+ }
+ System.out.println("- Downloading to: " + outputFile.getAbsolutePath());
+ try {
+ downloadFileFromURL(url, outputFile);
+ System.out.println("Done");
+ System.exit(0);
+ } catch (Throwable e) {
+ System.out.println("- Error downloading");
+ e.printStackTrace();
+ System.exit(1);
+ }
+ }
+
+ private static void downloadFileFromURL(String urlString, File destination) throws Exception {
+ if (System.getenv("MVNW_USERNAME") != null && System.getenv("MVNW_PASSWORD") != null) {
+ String username = System.getenv("MVNW_USERNAME");
+ char[] password = System.getenv("MVNW_PASSWORD").toCharArray();
+ Authenticator.setDefault(new Authenticator() {
+ @Override
+ protected PasswordAuthentication getPasswordAuthentication() {
+ return new PasswordAuthentication(username, password);
+ }
+ });
+ }
+ URL website = new URL(urlString);
+ ReadableByteChannel rbc;
+ rbc = Channels.newChannel(website.openStream());
+ FileOutputStream fos = new FileOutputStream(destination);
+ fos.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE);
+ fos.close();
+ rbc.close();
+ }
+
+}
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/.mvn/wrapper/maven-wrapper.properties b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/.mvn/wrapper/maven-wrapper.properties
new file mode 100644
index 0000000000..642d572ce9
--- /dev/null
+++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/.mvn/wrapper/maven-wrapper.properties
@@ -0,0 +1,2 @@
+distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.6.3/apache-maven-3.6.3-bin.zip
+wrapperUrl=https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/Makefile b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/Makefile
new file mode 100644
index 0000000000..89be00931c
--- /dev/null
+++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/Makefile
@@ -0,0 +1,18 @@
+export PATH :=$(CURDIR):$(PATH)
+HOSTNAME := $(shell hostname)
+MVN_FLAGS += -Dhostname=$(HOSTNAME) \
+ -Dnode1.stream.port=$(NODE1_STREAM_PORT) \
+ -Dnode1.name=$(NODE1_NAME) \
+ -Dnode2.name=$(NODE2_NAME) \
+ -Dnode2.stream.port=$(NODE2_STREAM_PORT) \
+ -Drabbitmqctl.bin=$(RABBITMQCTL)
+
+.PHONY: tests clean
+
+tests:
+ # Note: to run a single test
+ # @mvnw -q $(MVN_FLAGS) -Dtest=StreamTest#metadataOnClusterShouldReturnLeaderAndReplicas test
+ @mvnw $(MVN_FLAGS) test
+
+clean:
+ @mvnw clean
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/mvnw b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/mvnw
new file mode 100755
index 0000000000..41c0f0c23d
--- /dev/null
+++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/mvnw
@@ -0,0 +1,310 @@
+#!/bin/sh
+# ----------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# ----------------------------------------------------------------------------
+
+# ----------------------------------------------------------------------------
+# Maven Start Up Batch script
+#
+# Required ENV vars:
+# ------------------
+# JAVA_HOME - location of a JDK home dir
+#
+# Optional ENV vars
+# -----------------
+# M2_HOME - location of maven2's installed home dir
+# MAVEN_OPTS - parameters passed to the Java VM when running Maven
+# e.g. to debug Maven itself, use
+# set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
+# MAVEN_SKIP_RC - flag to disable loading of mavenrc files
+# ----------------------------------------------------------------------------
+
+if [ -z "$MAVEN_SKIP_RC" ] ; then
+
+ if [ -f /etc/mavenrc ] ; then
+ . /etc/mavenrc
+ fi
+
+ if [ -f "$HOME/.mavenrc" ] ; then
+ . "$HOME/.mavenrc"
+ fi
+
+fi
+
+# OS specific support. $var _must_ be set to either true or false.
+cygwin=false;
+darwin=false;
+mingw=false
+case "`uname`" in
+ CYGWIN*) cygwin=true ;;
+ MINGW*) mingw=true;;
+ Darwin*) darwin=true
+ # Use /usr/libexec/java_home if available, otherwise fall back to /Library/Java/Home
+ # See https://developer.apple.com/library/mac/qa/qa1170/_index.html
+ if [ -z "$JAVA_HOME" ]; then
+ if [ -x "/usr/libexec/java_home" ]; then
+ export JAVA_HOME="`/usr/libexec/java_home`"
+ else
+ export JAVA_HOME="/Library/Java/Home"
+ fi
+ fi
+ ;;
+esac
+
+if [ -z "$JAVA_HOME" ] ; then
+ if [ -r /etc/gentoo-release ] ; then
+ JAVA_HOME=`java-config --jre-home`
+ fi
+fi
+
+if [ -z "$M2_HOME" ] ; then
+ ## resolve links - $0 may be a link to maven's home
+ PRG="$0"
+
+ # need this for relative symlinks
+ while [ -h "$PRG" ] ; do
+ ls=`ls -ld "$PRG"`
+ link=`expr "$ls" : '.*-> \(.*\)$'`
+ if expr "$link" : '/.*' > /dev/null; then
+ PRG="$link"
+ else
+ PRG="`dirname "$PRG"`/$link"
+ fi
+ done
+
+ saveddir=`pwd`
+
+ M2_HOME=`dirname "$PRG"`/..
+
+ # make it fully qualified
+ M2_HOME=`cd "$M2_HOME" && pwd`
+
+ cd "$saveddir"
+ # echo Using m2 at $M2_HOME
+fi
+
+# For Cygwin, ensure paths are in UNIX format before anything is touched
+if $cygwin ; then
+ [ -n "$M2_HOME" ] &&
+ M2_HOME=`cygpath --unix "$M2_HOME"`
+ [ -n "$JAVA_HOME" ] &&
+ JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
+ [ -n "$CLASSPATH" ] &&
+ CLASSPATH=`cygpath --path --unix "$CLASSPATH"`
+fi
+
+# For Mingw, ensure paths are in UNIX format before anything is touched
+if $mingw ; then
+ [ -n "$M2_HOME" ] &&
+ M2_HOME="`(cd "$M2_HOME"; pwd)`"
+ [ -n "$JAVA_HOME" ] &&
+ JAVA_HOME="`(cd "$JAVA_HOME"; pwd)`"
+fi
+
+if [ -z "$JAVA_HOME" ]; then
+ javaExecutable="`which javac`"
+ if [ -n "$javaExecutable" ] && ! [ "`expr \"$javaExecutable\" : '\([^ ]*\)'`" = "no" ]; then
+ # readlink(1) is not available as standard on Solaris 10.
+ readLink=`which readlink`
+ if [ ! `expr "$readLink" : '\([^ ]*\)'` = "no" ]; then
+ if $darwin ; then
+ javaHome="`dirname \"$javaExecutable\"`"
+ javaExecutable="`cd \"$javaHome\" && pwd -P`/javac"
+ else
+ javaExecutable="`readlink -f \"$javaExecutable\"`"
+ fi
+ javaHome="`dirname \"$javaExecutable\"`"
+ javaHome=`expr "$javaHome" : '\(.*\)/bin'`
+ JAVA_HOME="$javaHome"
+ export JAVA_HOME
+ fi
+ fi
+fi
+
+if [ -z "$JAVACMD" ] ; then
+ if [ -n "$JAVA_HOME" ] ; then
+ if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
+ # IBM's JDK on AIX uses strange locations for the executables
+ JAVACMD="$JAVA_HOME/jre/sh/java"
+ else
+ JAVACMD="$JAVA_HOME/bin/java"
+ fi
+ else
+ JAVACMD="`which java`"
+ fi
+fi
+
+if [ ! -x "$JAVACMD" ] ; then
+ echo "Error: JAVA_HOME is not defined correctly." >&2
+ echo " We cannot execute $JAVACMD" >&2
+ exit 1
+fi
+
+if [ -z "$JAVA_HOME" ] ; then
+ echo "Warning: JAVA_HOME environment variable is not set."
+fi
+
+CLASSWORLDS_LAUNCHER=org.codehaus.plexus.classworlds.launcher.Launcher
+
+# traverses directory structure from process work directory to filesystem root
+# first directory with .mvn subdirectory is considered project base directory
+find_maven_basedir() {
+
+ if [ -z "$1" ]
+ then
+ echo "Path not specified to find_maven_basedir"
+ return 1
+ fi
+
+ basedir="$1"
+ wdir="$1"
+ while [ "$wdir" != '/' ] ; do
+ if [ -d "$wdir"/.mvn ] ; then
+ basedir=$wdir
+ break
+ fi
+ # workaround for JBEAP-8937 (on Solaris 10/Sparc)
+ if [ -d "${wdir}" ]; then
+ wdir=`cd "$wdir/.."; pwd`
+ fi
+ # end of workaround
+ done
+ echo "${basedir}"
+}
+
+# concatenates all lines of a file
+concat_lines() {
+ if [ -f "$1" ]; then
+ echo "$(tr -s '\n' ' ' < "$1")"
+ fi
+}
+
+BASE_DIR=`find_maven_basedir "$(pwd)"`
+if [ -z "$BASE_DIR" ]; then
+ exit 1;
+fi
+
+##########################################################################################
+# Extension to allow automatically downloading the maven-wrapper.jar from Maven-central
+# This allows using the maven wrapper in projects that prohibit checking in binary data.
+##########################################################################################
+if [ -r "$BASE_DIR/.mvn/wrapper/maven-wrapper.jar" ]; then
+ if [ "$MVNW_VERBOSE" = true ]; then
+ echo "Found .mvn/wrapper/maven-wrapper.jar"
+ fi
+else
+ if [ "$MVNW_VERBOSE" = true ]; then
+ echo "Couldn't find .mvn/wrapper/maven-wrapper.jar, downloading it ..."
+ fi
+ if [ -n "$MVNW_REPOURL" ]; then
+ jarUrl="$MVNW_REPOURL/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar"
+ else
+ jarUrl="https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar"
+ fi
+ while IFS="=" read key value; do
+ case "$key" in (wrapperUrl) jarUrl="$value"; break ;;
+ esac
+ done < "$BASE_DIR/.mvn/wrapper/maven-wrapper.properties"
+ if [ "$MVNW_VERBOSE" = true ]; then
+ echo "Downloading from: $jarUrl"
+ fi
+ wrapperJarPath="$BASE_DIR/.mvn/wrapper/maven-wrapper.jar"
+ if $cygwin; then
+ wrapperJarPath=`cygpath --path --windows "$wrapperJarPath"`
+ fi
+
+ if command -v wget > /dev/null; then
+ if [ "$MVNW_VERBOSE" = true ]; then
+ echo "Found wget ... using wget"
+ fi
+ if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then
+ wget "$jarUrl" -O "$wrapperJarPath"
+ else
+ wget --http-user=$MVNW_USERNAME --http-password=$MVNW_PASSWORD "$jarUrl" -O "$wrapperJarPath"
+ fi
+ elif command -v curl > /dev/null; then
+ if [ "$MVNW_VERBOSE" = true ]; then
+ echo "Found curl ... using curl"
+ fi
+ if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then
+ curl -o "$wrapperJarPath" "$jarUrl" -f
+ else
+ curl --user $MVNW_USERNAME:$MVNW_PASSWORD -o "$wrapperJarPath" "$jarUrl" -f
+ fi
+
+ else
+ if [ "$MVNW_VERBOSE" = true ]; then
+ echo "Falling back to using Java to download"
+ fi
+ javaClass="$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.java"
+ # For Cygwin, switch paths to Windows format before running javac
+ if $cygwin; then
+ javaClass=`cygpath --path --windows "$javaClass"`
+ fi
+ if [ -e "$javaClass" ]; then
+ if [ ! -e "$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.class" ]; then
+ if [ "$MVNW_VERBOSE" = true ]; then
+ echo " - Compiling MavenWrapperDownloader.java ..."
+ fi
+ # Compiling the Java class
+ ("$JAVA_HOME/bin/javac" "$javaClass")
+ fi
+ if [ -e "$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.class" ]; then
+ # Running the downloader
+ if [ "$MVNW_VERBOSE" = true ]; then
+ echo " - Running MavenWrapperDownloader.java ..."
+ fi
+ ("$JAVA_HOME/bin/java" -cp .mvn/wrapper MavenWrapperDownloader "$MAVEN_PROJECTBASEDIR")
+ fi
+ fi
+ fi
+fi
+##########################################################################################
+# End of extension
+##########################################################################################
+
+export MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-"$BASE_DIR"}
+if [ "$MVNW_VERBOSE" = true ]; then
+ echo $MAVEN_PROJECTBASEDIR
+fi
+MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS"
+
+# For Cygwin, switch paths to Windows format before running java
+if $cygwin; then
+ [ -n "$M2_HOME" ] &&
+ M2_HOME=`cygpath --path --windows "$M2_HOME"`
+ [ -n "$JAVA_HOME" ] &&
+ JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"`
+ [ -n "$CLASSPATH" ] &&
+ CLASSPATH=`cygpath --path --windows "$CLASSPATH"`
+ [ -n "$MAVEN_PROJECTBASEDIR" ] &&
+ MAVEN_PROJECTBASEDIR=`cygpath --path --windows "$MAVEN_PROJECTBASEDIR"`
+fi
+
+# Provide a "standardized" way to retrieve the CLI args that will
+# work with both Windows and non-Windows executions.
+MAVEN_CMD_LINE_ARGS="$MAVEN_CONFIG $@"
+export MAVEN_CMD_LINE_ARGS
+
+WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
+
+exec "$JAVACMD" \
+ $MAVEN_OPTS \
+ -classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \
+ "-Dmaven.home=${M2_HOME}" "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \
+ ${WRAPPER_LAUNCHER} $MAVEN_CONFIG "$@"
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/mvnw.cmd b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/mvnw.cmd
new file mode 100644
index 0000000000..86115719e5
--- /dev/null
+++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/mvnw.cmd
@@ -0,0 +1,182 @@
+@REM ----------------------------------------------------------------------------
+@REM Licensed to the Apache Software Foundation (ASF) under one
+@REM or more contributor license agreements. See the NOTICE file
+@REM distributed with this work for additional information
+@REM regarding copyright ownership. The ASF licenses this file
+@REM to you under the Apache License, Version 2.0 (the
+@REM "License"); you may not use this file except in compliance
+@REM with the License. You may obtain a copy of the License at
+@REM
+@REM http://www.apache.org/licenses/LICENSE-2.0
+@REM
+@REM Unless required by applicable law or agreed to in writing,
+@REM software distributed under the License is distributed on an
+@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+@REM KIND, either express or implied. See the License for the
+@REM specific language governing permissions and limitations
+@REM under the License.
+@REM ----------------------------------------------------------------------------
+
+@REM ----------------------------------------------------------------------------
+@REM Maven Start Up Batch script
+@REM
+@REM Required ENV vars:
+@REM JAVA_HOME - location of a JDK home dir
+@REM
+@REM Optional ENV vars
+@REM M2_HOME - location of maven2's installed home dir
+@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands
+@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a keystroke before ending
+@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven
+@REM e.g. to debug Maven itself, use
+@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
+@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files
+@REM ----------------------------------------------------------------------------
+
+@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on'
+@echo off
+@REM set title of command window
+title %0
+@REM enable echoing by setting MAVEN_BATCH_ECHO to 'on'
+@if "%MAVEN_BATCH_ECHO%" == "on" echo %MAVEN_BATCH_ECHO%
+
+@REM set %HOME% to equivalent of $HOME
+if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%")
+
+@REM Execute a user defined script before this one
+if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre
+@REM check for pre script, once with legacy .bat ending and once with .cmd ending
+if exist "%HOME%\mavenrc_pre.bat" call "%HOME%\mavenrc_pre.bat"
+if exist "%HOME%\mavenrc_pre.cmd" call "%HOME%\mavenrc_pre.cmd"
+:skipRcPre
+
+@setlocal
+
+set ERROR_CODE=0
+
+@REM To isolate internal variables from possible post scripts, we use another setlocal
+@setlocal
+
+@REM ==== START VALIDATION ====
+if not "%JAVA_HOME%" == "" goto OkJHome
+
+echo.
+echo Error: JAVA_HOME not found in your environment. >&2
+echo Please set the JAVA_HOME variable in your environment to match the >&2
+echo location of your Java installation. >&2
+echo.
+goto error
+
+:OkJHome
+if exist "%JAVA_HOME%\bin\java.exe" goto init
+
+echo.
+echo Error: JAVA_HOME is set to an invalid directory. >&2
+echo JAVA_HOME = "%JAVA_HOME%" >&2
+echo Please set the JAVA_HOME variable in your environment to match the >&2
+echo location of your Java installation. >&2
+echo.
+goto error
+
+@REM ==== END VALIDATION ====
+
+:init
+
+@REM Find the project base dir, i.e. the directory that contains the folder ".mvn".
+@REM Fallback to current working directory if not found.
+
+set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR%
+IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir
+
+set EXEC_DIR=%CD%
+set WDIR=%EXEC_DIR%
+:findBaseDir
+IF EXIST "%WDIR%"\.mvn goto baseDirFound
+cd ..
+IF "%WDIR%"=="%CD%" goto baseDirNotFound
+set WDIR=%CD%
+goto findBaseDir
+
+:baseDirFound
+set MAVEN_PROJECTBASEDIR=%WDIR%
+cd "%EXEC_DIR%"
+goto endDetectBaseDir
+
+:baseDirNotFound
+set MAVEN_PROJECTBASEDIR=%EXEC_DIR%
+cd "%EXEC_DIR%"
+
+:endDetectBaseDir
+
+IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig
+
+@setlocal EnableExtensions EnableDelayedExpansion
+for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! %%a
+@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS%
+
+:endReadAdditionalConfig
+
+SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe"
+set WRAPPER_JAR="%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.jar"
+set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
+
+set DOWNLOAD_URL="https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar"
+
+FOR /F "tokens=1,2 delims==" %%A IN ("%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.properties") DO (
+ IF "%%A"=="wrapperUrl" SET DOWNLOAD_URL=%%B
+)
+
+@REM Extension to allow automatically downloading the maven-wrapper.jar from Maven-central
+@REM This allows using the maven wrapper in projects that prohibit checking in binary data.
+if exist %WRAPPER_JAR% (
+ if "%MVNW_VERBOSE%" == "true" (
+ echo Found %WRAPPER_JAR%
+ )
+) else (
+ if not "%MVNW_REPOURL%" == "" (
+ SET DOWNLOAD_URL="%MVNW_REPOURL%/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar"
+ )
+ if "%MVNW_VERBOSE%" == "true" (
+ echo Couldn't find %WRAPPER_JAR%, downloading it ...
+ echo Downloading from: %DOWNLOAD_URL%
+ )
+
+ powershell -Command "&{"^
+ "$webclient = new-object System.Net.WebClient;"^
+ "if (-not ([string]::IsNullOrEmpty('%MVNW_USERNAME%') -and [string]::IsNullOrEmpty('%MVNW_PASSWORD%'))) {"^
+ "$webclient.Credentials = new-object System.Net.NetworkCredential('%MVNW_USERNAME%', '%MVNW_PASSWORD%');"^
+ "}"^
+ "[Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]::Tls12; $webclient.DownloadFile('%DOWNLOAD_URL%', '%WRAPPER_JAR%')"^
+ "}"
+ if "%MVNW_VERBOSE%" == "true" (
+ echo Finished downloading %WRAPPER_JAR%
+ )
+)
+@REM End of extension
+
+@REM Provide a "standardized" way to retrieve the CLI args that will
+@REM work with both Windows and non-Windows executions.
+set MAVEN_CMD_LINE_ARGS=%*
+
+%MAVEN_JAVA_EXE% %JVM_CONFIG_MAVEN_PROPS% %MAVEN_OPTS% %MAVEN_DEBUG_OPTS% -classpath %WRAPPER_JAR% "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" %WRAPPER_LAUNCHER% %MAVEN_CONFIG% %*
+if ERRORLEVEL 1 goto error
+goto end
+
+:error
+set ERROR_CODE=1
+
+:end
+@endlocal & set ERROR_CODE=%ERROR_CODE%
+
+if not "%MAVEN_SKIP_RC%" == "" goto skipRcPost
+@REM check for post script, once with legacy .bat ending and once with .cmd ending
+if exist "%HOME%\mavenrc_post.bat" call "%HOME%\mavenrc_post.bat"
+if exist "%HOME%\mavenrc_post.cmd" call "%HOME%\mavenrc_post.cmd"
+:skipRcPost
+
+@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on'
+if "%MAVEN_BATCH_PAUSE%" == "on" pause
+
+if "%MAVEN_TERMINATE_CMD%" == "on" exit %ERROR_CODE%
+
+exit /B %ERROR_CODE%
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/pom.xml b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/pom.xml
new file mode 100644
index 0000000000..aa27c29baf
--- /dev/null
+++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/pom.xml
@@ -0,0 +1,143 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>com.rabbitmq.stream</groupId>
+ <artifactId>rabbitmq-stream-tests</artifactId>
+ <version>1.0-SNAPSHOT</version>
+
+ <licenses>
+ <license>
+ <name>MPL 2.0</name>
+ <url>https://www.mozilla.org/en-US/MPL/2.0/</url>
+ <distribution>repo</distribution>
+ </license>
+ </licenses>
+
+ <developers>
+ <developer>
+ <email>info@rabbitmq.com</email>
+ <name>Team RabbitMQ</name>
+ <organization>VMware, Inc. or its affiliates.</organization>
+ <organizationUrl>https://rabbitmq.com</organizationUrl>
+ </developer>
+ </developers>
+
+ <properties>
+ <stream-client.version>0.1.0-SNAPSHOT</stream-client.version>
+ <proton-j.version>0.33.6</proton-j.version>
+ <junit.jupiter.version>5.7.0</junit.jupiter.version>
+ <assertj.version>3.17.2</assertj.version>
+ <mockito.version>3.5.11</mockito.version>
+ <logback.version>1.2.3</logback.version>
+ <maven.compiler.plugin.version>3.8.1</maven.compiler.plugin.version>
+ <maven-surefire-plugin.version>2.22.2</maven-surefire-plugin.version>
+ <spotless.version>2.2.0</spotless.version>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>com.rabbitmq</groupId>
+ <artifactId>stream-client</artifactId>
+ <version>${stream-client.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>proton-j</artifactId>
+ <version>${proton-j.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-engine</artifactId>
+ <version>${junit.jupiter.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-params</artifactId>
+ <version>${junit.jupiter.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <version>${assertj.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>${mockito.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <version>${logback.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+
+ <plugins>
+
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>${maven.compiler.plugin.version}</version>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ <compilerArgs>
+ <arg>-Xlint:deprecation</arg>
+ <arg>-Xlint:unchecked</arg>
+ </compilerArgs>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>${maven-surefire-plugin.version}</version>
+ </plugin>
+
+ <plugin>
+ <groupId>com.diffplug.spotless</groupId>
+ <artifactId>spotless-maven-plugin</artifactId>
+ <version>${spotless.version}</version>
+ <configuration>
+ <java>
+ <googleJavaFormat>
+ <version>1.9</version>
+ <style>GOOGLE</style>
+ </googleJavaFormat>
+ </java>
+ </configuration>
+ </plugin>
+
+ </plugins>
+
+ </build>
+
+ <repositories>
+
+ <repository>
+ <id>ossrh</id>
+ <url>https://oss.sonatype.org/content/repositories/snapshots</url>
+ <snapshots><enabled>true</enabled></snapshots>
+ <releases><enabled>false</enabled></releases>
+ </repository>
+
+ </repositories>
+
+</project>
\ No newline at end of file
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/ClusterSizeTest.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/ClusterSizeTest.java
new file mode 100644
index 0000000000..993c19b852
--- /dev/null
+++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/ClusterSizeTest.java
@@ -0,0 +1,65 @@
+// The contents of this file are subject to the Mozilla Public License
+// Version 2.0 (the "License"); you may not use this file except in
+// compliance with the License. You may obtain a copy of the License
+// at https://www.mozilla.org/en-US/MPL/2.0/
+//
+// Software distributed under the License is distributed on an "AS IS"
+// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+// the License for the specific language governing rights and
+// limitations under the License.
+//
+// The Original Code is RabbitMQ.
+//
+// The Initial Developer of the Original Code is Pivotal Software, Inc.
+// Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved.
+//
+
+package com.rabbitmq.stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.rabbitmq.stream.impl.Client;
+import com.rabbitmq.stream.impl.Client.Response;
+import com.rabbitmq.stream.impl.Client.StreamMetadata;
+import java.util.Collections;
+import java.util.UUID;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
+public class ClusterSizeTest {
+
+ TestUtils.ClientFactory cf;
+
+ @ParameterizedTest
+ @ValueSource(strings = {"-1", "0"})
+ void clusterSizeZeroShouldReturnError(String clusterSize) {
+ Client client = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1()));
+ String s = UUID.randomUUID().toString();
+ Response response =
+ client.create(s, Collections.singletonMap("initial-cluster-size", clusterSize));
+ assertThat(response.isOk()).isFalse();
+ assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_PRECONDITION_FAILED);
+ }
+
+ @ParameterizedTest
+ @CsvSource({"1,1", "2,2", "3,3", "5,3"})
+ void clusterSizeShouldReflectOnMetadata(String requestedClusterSize, int expectedClusterSize) {
+ Client client = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1()));
+ String s = UUID.randomUUID().toString();
+ try {
+ Response response =
+ client.create(s, Collections.singletonMap("initial-cluster-size", requestedClusterSize));
+ assertThat(response.isOk()).isTrue();
+ StreamMetadata metadata = client.metadata(s).get(s);
+ assertThat(metadata).isNotNull();
+ assertThat(metadata.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK);
+ int actualClusterSize = metadata.getLeader() == null ? 0 : 1 + metadata.getReplicas().size();
+ assertThat(actualClusterSize).isEqualTo(expectedClusterSize);
+ } finally {
+ client.delete(s);
+ }
+ }
+}
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java
new file mode 100644
index 0000000000..c7a390f00d
--- /dev/null
+++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java
@@ -0,0 +1,541 @@
+// The contents of this file are subject to the Mozilla Public License
+// Version 2.0 (the "License"); you may not use this file except in
+// compliance with the License. You may obtain a copy of the License
+// at https://www.mozilla.org/en-US/MPL/2.0/
+//
+// Software distributed under the License is distributed on an "AS IS"
+// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+// the License for the specific language governing rights and
+// limitations under the License.
+//
+// The Original Code is RabbitMQ.
+//
+// The Initial Developer of the Original Code is Pivotal Software, Inc.
+// Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved.
+//
+
+package com.rabbitmq.stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
+
+import com.rabbitmq.stream.codec.WrapperMessageBuilder;
+import com.rabbitmq.stream.impl.Client;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
+public class FailureTest {
+
+ TestUtils.ClientFactory cf;
+ String stream;
+ ExecutorService executorService;
+
+ static void wait(Duration duration) {
+ try {
+ Thread.sleep(duration.toMillis());
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @AfterEach
+ void tearDown() {
+ if (executorService != null) {
+ executorService.shutdownNow();
+ }
+ }
+
+ @Test
+ void leaderFailureWhenPublisherConnectedToReplica() throws Exception {
+ Set<String> messages = new HashSet<>();
+ Client client = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1()));
+ Map<String, Client.StreamMetadata> metadata = client.metadata(stream);
+ Client.StreamMetadata streamMetadata = metadata.get(stream);
+ assertThat(streamMetadata).isNotNull();
+
+ assertThat(streamMetadata.getLeader().getPort()).isEqualTo(TestUtils.streamPortNode1());
+ assertThat(streamMetadata.getReplicas()).isNotEmpty();
+ Client.Broker replica = streamMetadata.getReplicas().get(0);
+ assertThat(replica.getPort()).isNotEqualTo(TestUtils.streamPortNode1());
+
+ AtomicReference<CountDownLatch> confirmLatch = new AtomicReference<>(new CountDownLatch(1));
+
+ CountDownLatch metadataLatch = new CountDownLatch(1);
+ Client publisher =
+ cf.get(
+ new Client.ClientParameters()
+ .port(replica.getPort())
+ .metadataListener((stream, code) -> metadataLatch.countDown())
+ .publishConfirmListener(
+ (publisherId, publishingId) -> confirmLatch.get().countDown()));
+ String message = "all nodes available";
+ messages.add(message);
+ publisher.publish(
+ stream,
+ (byte) 1,
+ Collections.singletonList(
+ publisher.messageBuilder().addData(message.getBytes(StandardCharsets.UTF_8)).build()));
+ assertThat(confirmLatch.get().await(10, TimeUnit.SECONDS)).isTrue();
+ confirmLatch.set(null);
+
+ try {
+ Host.rabbitmqctl("stop_app");
+ try {
+ cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1()));
+ fail("Node app stopped, connecting should not be possible");
+ } catch (Exception e) {
+ // OK
+ }
+
+ assertThat(metadataLatch.await(10, TimeUnit.SECONDS)).isTrue();
+
+ // wait until there's a new leader
+ TestUtils.waitAtMost(
+ Duration.ofSeconds(10),
+ () -> {
+ Client.StreamMetadata m = publisher.metadata(stream).get(stream);
+ return m.getLeader() != null && m.getLeader().getPort() != TestUtils.streamPortNode1();
+ });
+
+ confirmLatch.set(new CountDownLatch(1));
+ message = "2 nodes available";
+ messages.add(message);
+ publisher.publish(
+ stream,
+ (byte) 1,
+ Collections.singletonList(
+ publisher
+ .messageBuilder()
+ .addData(message.getBytes(StandardCharsets.UTF_8))
+ .build()));
+ assertThat(confirmLatch.get().await(10, TimeUnit.SECONDS)).isTrue();
+ confirmLatch.set(null);
+ } finally {
+ Host.rabbitmqctl("start_app");
+ }
+
+ // wait until all the replicas are there
+ TestUtils.waitAtMost(
+ Duration.ofSeconds(5),
+ () -> {
+ Client.StreamMetadata m = publisher.metadata(stream).get(stream);
+ return m.getReplicas().size() == 2;
+ });
+
+ confirmLatch.set(new CountDownLatch(1));
+ message = "all nodes are back";
+ messages.add(message);
+ publisher.publish(
+ stream,
+ (byte) 1,
+ Collections.singletonList(
+ publisher.messageBuilder().addData(message.getBytes(StandardCharsets.UTF_8)).build()));
+ assertThat(confirmLatch.get().await(10, TimeUnit.SECONDS)).isTrue();
+ confirmLatch.set(null);
+
+ CountDownLatch consumeLatch = new CountDownLatch(2);
+ Set<String> bodies = ConcurrentHashMap.newKeySet();
+ Client consumer =
+ cf.get(
+ new Client.ClientParameters()
+ .port(TestUtils.streamPortNode1())
+ .messageListener(
+ (subscriptionId, offset, msg) -> {
+ bodies.add(new String(msg.getBodyAsBinary(), StandardCharsets.UTF_8));
+ consumeLatch.countDown();
+ }));
+
+ TestUtils.waitAtMost(
+ Duration.ofSeconds(5),
+ () -> {
+ Client.Response response =
+ consumer.subscribe((byte) 1, stream, OffsetSpecification.first(), 10);
+ return response.isOk();
+ });
+ assertThat(consumeLatch.await(10, TimeUnit.SECONDS)).isTrue();
+ assertThat(bodies)
+ .hasSize(3)
+ .contains("all nodes available", "2 nodes available", "all nodes are back");
+ }
+
+ @Test
+ void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception {
+ executorService = Executors.newCachedThreadPool();
+ Client client = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1()));
+ Map<String, Client.StreamMetadata> metadata = client.metadata(stream);
+ Client.StreamMetadata streamMetadata = metadata.get(stream);
+ assertThat(streamMetadata).isNotNull();
+
+ assertThat(streamMetadata.getLeader()).isNotNull();
+ assertThat(streamMetadata.getLeader().getPort()).isEqualTo(TestUtils.streamPortNode1());
+
+ Map<Long, Message> published = new ConcurrentHashMap<>();
+ Set<Message> confirmed = ConcurrentHashMap.newKeySet();
+
+ Client.PublishConfirmListener publishConfirmListener =
+ (publisherId, publishingId) -> {
+ Message confirmedMessage;
+ int attempts = 0;
+ while ((confirmedMessage = published.remove(publishingId)) == null && attempts < 10) {
+ wait(Duration.ofMillis(5));
+ attempts++;
+ }
+ confirmed.add(confirmedMessage);
+ };
+
+ AtomicLong generation = new AtomicLong(0);
+ AtomicLong sequence = new AtomicLong(0);
+ AtomicBoolean connected = new AtomicBoolean(true);
+ AtomicReference<Client> publisher = new AtomicReference<>();
+ CountDownLatch reconnectionLatch = new CountDownLatch(1);
+ AtomicReference<Client.ShutdownListener> shutdownListenerReference = new AtomicReference<>();
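+    // on an unexpected disconnection, look up the new leader from node 2, re-create the publisher there,
+    // bump the generation and resume publishing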
+ Client.ShutdownListener shutdownListener =
+ shutdownContext -> {
+ if (shutdownContext.getShutdownReason()
+ == Client.ShutdownContext.ShutdownReason.UNKNOWN) {
+ // avoid long-running task in the IO thread
+ executorService.submit(
+ () -> {
+ connected.set(false);
+
+ Client locator =
+ cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode2()));
+ // wait until there's a new leader
+ try {
+ TestUtils.waitAtMost(
+ Duration.ofSeconds(5),
+ () -> {
+ Client.StreamMetadata m = locator.metadata(stream).get(stream);
+ return m.getLeader() != null
+ && m.getLeader().getPort() != TestUtils.streamPortNode1();
+ });
+ } catch (Throwable e) {
+ reconnectionLatch.countDown();
+ return;
+ }
+
+ int newLeaderPort = locator.metadata(stream).get(stream).getLeader().getPort();
+ Client newPublisher =
+ cf.get(
+ new Client.ClientParameters()
+ .port(newLeaderPort)
+ .shutdownListener(shutdownListenerReference.get())
+ .publishConfirmListener(publishConfirmListener));
+
+ generation.incrementAndGet();
+ published.clear();
+ publisher.set(newPublisher);
+ connected.set(true);
+
+ reconnectionLatch.countDown();
+ });
+ }
+ };
+ shutdownListenerReference.set(shutdownListener);
+
+ client =
+ cf.get(
+ new Client.ClientParameters()
+ .port(streamMetadata.getLeader().getPort())
+ .shutdownListener(shutdownListener)
+ .publishConfirmListener(publishConfirmListener));
+
+ publisher.set(client);
+
+ AtomicBoolean keepPublishing = new AtomicBoolean(true);
+
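+    // publish continuously, tagging each message with a sequence id and the current generation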
+ executorService.submit(
+ () -> {
+ while (keepPublishing.get()) {
+ if (connected.get()) {
+ Message message =
+ publisher
+ .get()
+ .messageBuilder()
+ .properties()
+ .messageId(sequence.getAndIncrement())
+ .messageBuilder()
+ .applicationProperties()
+ .entry("generation", generation.get())
+ .messageBuilder()
+ .build();
+ try {
+ long publishingId =
+ publisher
+ .get()
+ .publish(stream, (byte) 1, Collections.singletonList(message))
+ .get(0);
+ published.put(publishingId, message);
+ } catch (Exception e) {
+ // keep going
+ }
+ wait(Duration.ofMillis(10));
+ } else {
+ wait(Duration.ofSeconds(1));
+ }
+ }
+ });
+
+ // let's publish for a bit of time
+ Thread.sleep(2000);
+
+ assertThat(confirmed).isNotEmpty();
+ int confirmedCount = confirmed.size();
+
+ try {
+ Host.rabbitmqctl("stop_app");
+
+ assertThat(reconnectionLatch.await(10, TimeUnit.SECONDS)).isTrue();
+
+ // let's publish for a bit of time
+ Thread.sleep(2000);
+
+ } finally {
+ Host.rabbitmqctl("start_app");
+ }
+ assertThat(confirmed).hasSizeGreaterThan(confirmedCount);
+ confirmedCount = confirmed.size();
+
+ Client metadataClient = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode2()));
+ // wait until all the replicas are there
+ TestUtils.waitAtMost(
+ Duration.ofSeconds(5),
+ () -> {
+ Client.StreamMetadata m = metadataClient.metadata(stream).get(stream);
+ return m.getReplicas().size() == 2;
+ });
+
+ // let's publish for a bit of time
+ Thread.sleep(2000);
+
+ assertThat(confirmed).hasSizeGreaterThan(confirmedCount);
+
+ keepPublishing.set(false);
+
+ Queue<Message> consumed = new ConcurrentLinkedQueue<>();
+ Set<Long> generations = ConcurrentHashMap.newKeySet();
+ CountDownLatch consumedLatch = new CountDownLatch(1);
+ Client.StreamMetadata m = metadataClient.metadata(stream).get(stream);
+ Client consumer =
+ cf.get(
+ new Client.ClientParameters()
+ .port(m.getReplicas().get(0).getPort())
+ .chunkListener(
+ (client1, subscriptionId, offset, messageCount, dataSize) ->
+ client1.credit(subscriptionId, 1))
+ .messageListener(
+ (subscriptionId, offset, message) -> {
+ consumed.add(message);
+ generations.add((Long) message.getApplicationProperties().get("generation"));
+ if (consumed.size() == confirmed.size()) {
+ consumedLatch.countDown();
+ }
+ }));
+
+ Client.Response response =
+ consumer.subscribe((byte) 1, stream, OffsetSpecification.first(), 10);
+ assertThat(response.isOk()).isTrue();
+
+ assertThat(consumedLatch.await(5, TimeUnit.SECONDS)).isTrue();
+ assertThat(generations).hasSize(2).contains(0L, 1L);
+ assertThat(consumed).hasSizeGreaterThanOrEqualTo(confirmed.size());
+ long lastMessageId = -1;
+ for (Message message : consumed) {
+ long messageId = message.getProperties().getMessageIdAsLong();
+ assertThat(messageId).isGreaterThanOrEqualTo(lastMessageId);
+ lastMessageId = messageId;
+ }
+ assertThat(lastMessageId).isPositive().isLessThanOrEqualTo(sequence.get());
+ }
+
+ @Test
+ void consumerReattachesToOtherReplicaWhenReplicaGoesAway() throws Exception {
+ executorService = Executors.newCachedThreadPool();
+ Client metadataClient = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1()));
+ Map<String, Client.StreamMetadata> metadata = metadataClient.metadata(stream);
+ Client.StreamMetadata streamMetadata = metadata.get(stream);
+ assertThat(streamMetadata).isNotNull();
+
+ assertThat(streamMetadata.getLeader()).isNotNull();
+ assertThat(streamMetadata.getLeader().getPort()).isEqualTo(TestUtils.streamPortNode1());
+
+ Map<Long, Message> published = new ConcurrentHashMap<>();
+ Set<Message> confirmed = ConcurrentHashMap.newKeySet();
+ Set<Long> confirmedIds = ConcurrentHashMap.newKeySet();
+ Client.PublishConfirmListener publishConfirmListener =
+ (publisherId, publishingId) -> {
+ Message confirmedMessage;
+ int attempts = 0;
+ while ((confirmedMessage = published.remove(publishingId)) == null && attempts < 10) {
+ wait(Duration.ofMillis(5));
+ attempts++;
+ }
+ confirmed.add(confirmedMessage);
+ confirmedIds.add(confirmedMessage.getProperties().getMessageIdAsLong());
+ };
+
+ Client publisher =
+ cf.get(
+ new Client.ClientParameters()
+ .port(streamMetadata.getLeader().getPort())
+ .publishConfirmListener(publishConfirmListener));
+
+ AtomicLong generation = new AtomicLong(0);
+ AtomicLong sequence = new AtomicLong(0);
+ AtomicBoolean keepPublishing = new AtomicBoolean(true);
+ CountDownLatch publishingLatch = new CountDownLatch(1);
+
+ executorService.submit(
+ () -> {
+ while (keepPublishing.get()) {
+ Message message =
+ new WrapperMessageBuilder()
+ .properties()
+ .messageId(sequence.getAndIncrement())
+ .messageBuilder()
+ .applicationProperties()
+ .entry("generation", generation.get())
+ .messageBuilder()
+ .build();
+ try {
+ long publishingId =
+ publisher.publish(stream, (byte) 1, Collections.singletonList(message)).get(0);
+ published.put(publishingId, message);
+ } catch (Exception e) {
+ // keep going
+ }
+ wait(Duration.ofMillis(10));
+ }
+ publishingLatch.countDown();
+ });
+
+ Queue<Message> consumed = new ConcurrentLinkedQueue<>();
+
+ Client.Broker replica =
+ streamMetadata.getReplicas().stream()
+ .filter(broker -> broker.getPort() == TestUtils.streamPortNode2())
+ .findFirst()
+ .orElseThrow(() -> new NoSuchElementException());
+
+ AtomicLong lastProcessedOffset = new AtomicLong(-1);
+ Set<Long> generations = ConcurrentHashMap.newKeySet();
+ Set<Long> consumedIds = ConcurrentHashMap.newKeySet();
+ Client.MessageListener messageListener =
+ (subscriptionId, offset, message) -> {
+ consumed.add(message);
+ generations.add((Long) message.getApplicationProperties().get("generation"));
+ consumedIds.add(message.getProperties().getMessageIdAsLong());
+ lastProcessedOffset.set(offset);
+ };
+
+ CountDownLatch reconnectionLatch = new CountDownLatch(1);
+ AtomicReference<Client.ShutdownListener> shutdownListenerReference = new AtomicReference<>();
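+    // on an unexpected disconnection, re-attach to another replica and subscribe just after the last processed offset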
+ Client.ShutdownListener shutdownListener =
+ shutdownContext -> {
+ if (shutdownContext.getShutdownReason()
+ == Client.ShutdownContext.ShutdownReason.UNKNOWN) {
+ // avoid long-running task in the IO thread
+ executorService.submit(
+ () -> {
+ Client.StreamMetadata m = metadataClient.metadata(stream).get(stream);
+ int newReplicaPort = m.getReplicas().get(0).getPort();
+
+ Client newConsumer =
+ cf.get(
+ new Client.ClientParameters()
+ .port(newReplicaPort)
+ .shutdownListener(shutdownListenerReference.get())
+ .chunkListener(
+ (client1, subscriptionId, offset, messageCount, dataSize) ->
+ client1.credit(subscriptionId, 1))
+ .messageListener(messageListener));
+
+ newConsumer.subscribe(
+ (byte) 1,
+ stream,
+ OffsetSpecification.offset(lastProcessedOffset.get() + 1),
+ 10);
+
+ generation.incrementAndGet();
+ reconnectionLatch.countDown();
+ });
+ }
+ };
+ shutdownListenerReference.set(shutdownListener);
+
+ Client consumer =
+ cf.get(
+ new Client.ClientParameters()
+ .port(replica.getPort())
+ .shutdownListener(shutdownListener)
+ .chunkListener(
+ (client1, subscriptionId, offset, messageCount, dataSize) ->
+ client1.credit(subscriptionId, 1))
+ .messageListener(messageListener));
+
+ Client.Response response =
+ consumer.subscribe((byte) 1, stream, OffsetSpecification.first(), 10);
+ assertThat(response.isOk()).isTrue();
+
+ // let's publish for a bit of time
+ Thread.sleep(2000);
+
+ assertThat(confirmed).isNotEmpty();
+ assertThat(consumed).isNotEmpty();
+ int confirmedCount = confirmed.size();
+
+ try {
+ Host.rabbitmqctl("stop_app", Host.node2name());
+
+ assertThat(reconnectionLatch.await(10, TimeUnit.SECONDS)).isTrue();
+
+ // let's publish for a bit of time
+ Thread.sleep(2000);
+
+ } finally {
+ Host.rabbitmqctl("start_app", Host.node2name());
+ }
+ assertThat(confirmed).hasSizeGreaterThan(confirmedCount);
+ confirmedCount = confirmed.size();
+
+ // wait until all the replicas are there
+ TestUtils.waitAtMost(
+ Duration.ofSeconds(5),
+ () -> {
+ Client.StreamMetadata m = metadataClient.metadata(stream).get(stream);
+ return m.getReplicas().size() == 2;
+ });
+
+ // let's publish for a bit of time
+ Thread.sleep(2000);
+
+ assertThat(confirmed).hasSizeGreaterThan(confirmedCount);
+
+ keepPublishing.set(false);
+
+ assertThat(publishingLatch.await(5, TimeUnit.SECONDS)).isTrue();
+
+ TestUtils.waitAtMost(Duration.ofSeconds(5), () -> consumed.size() >= confirmed.size());
+
+ assertThat(generations).hasSize(2).contains(0L, 1L);
+ assertThat(consumed).hasSizeGreaterThanOrEqualTo(confirmed.size());
+ long lastMessageId = -1;
+ for (Message message : consumed) {
+ long messageId = message.getProperties().getMessageIdAsLong();
+ assertThat(messageId).isGreaterThanOrEqualTo(lastMessageId);
+ lastMessageId = messageId;
+ }
+ assertThat(lastMessageId).isPositive().isLessThanOrEqualTo(sequence.get());
+
+ confirmedIds.forEach(confirmedId -> assertThat(consumedIds).contains(confirmedId));
+ }
+}
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/Host.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/Host.java
new file mode 100644
index 0000000000..0134038a8b
--- /dev/null
+++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/Host.java
@@ -0,0 +1,117 @@
+// The contents of this file are subject to the Mozilla Public License
+// Version 2.0 (the "License"); you may not use this file except in
+// compliance with the License. You may obtain a copy of the License
+// at https://www.mozilla.org/en-US/MPL/2.0/
+//
+// Software distributed under the License is distributed on an "AS IS"
+// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+// the License for the specific language governing rights and
+// limitations under the License.
+//
+// The Original Code is RabbitMQ.
+//
+// The Initial Developer of the Original Code is Pivotal Software, Inc.
+// Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved.
+//
+
+package com.rabbitmq.stream;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+public class Host {
+
+ private static String capture(InputStream is) throws IOException {
+ BufferedReader br = new BufferedReader(new InputStreamReader(is));
+ String line;
+ StringBuilder buff = new StringBuilder();
+ while ((line = br.readLine()) != null) {
+ buff.append(line).append("\n");
+ }
+ return buff.toString();
+ }
+
+ private static Process executeCommand(String command) throws IOException {
+ Process pr = executeCommandProcess(command);
+
+ int ev = waitForExitValue(pr);
+ if (ev != 0) {
+ String stdout = capture(pr.getInputStream());
+ String stderr = capture(pr.getErrorStream());
+ throw new IOException(
+ "unexpected command exit value: "
+ + ev
+ + "\ncommand: "
+ + command
+ + "\n"
+ + "\nstdout:\n"
+ + stdout
+ + "\nstderr:\n"
+ + stderr
+ + "\n");
+ }
+ return pr;
+ }
+
+ private static int waitForExitValue(Process pr) {
+ while (true) {
+ try {
+ pr.waitFor();
+ break;
+ } catch (InterruptedException ignored) {
+ }
+ }
+ return pr.exitValue();
+ }
+
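+  // runs the command through a platform shell: cmd.exe on Windows, /bin/sh elsewhere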
+ private static Process executeCommandProcess(String command) throws IOException {
+ String[] finalCommand;
+ if (System.getProperty("os.name").toLowerCase().contains("windows")) {
+ finalCommand = new String[4];
+ finalCommand[0] = "C:\\winnt\\system32\\cmd.exe";
+ finalCommand[1] = "/y";
+ finalCommand[2] = "/c";
+ finalCommand[3] = command;
+ } else {
+ finalCommand = new String[3];
+ finalCommand[0] = "/bin/sh";
+ finalCommand[1] = "-c";
+ finalCommand[2] = command;
+ }
+ return Runtime.getRuntime().exec(finalCommand);
+ }
+
+ public static Process rabbitmqctl(String command) throws IOException {
+ return rabbitmqctl(command, node1name());
+ }
+
+ public static Process rabbitmqctl(String command, String nodename) throws IOException {
+ return executeCommand(rabbitmqctlCommand() + " -n '" + nodename + "'" + " " + command);
+ }
+
+ public static String node1name() {
+ try {
+ return System.getProperty(
+ "node1.name", "rabbit-1@" + InetAddress.getLocalHost().getHostName());
+ } catch (UnknownHostException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static String node2name() {
+ try {
+ return System.getProperty(
+ "node2.name", "rabbit-2@" + InetAddress.getLocalHost().getHostName());
+ } catch (UnknownHostException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ static String rabbitmqctlCommand() {
+ return System.getProperty("rabbitmqctl.bin");
+ }
+}
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/LeaderLocatorTest.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/LeaderLocatorTest.java
new file mode 100644
index 0000000000..5dc2256643
--- /dev/null
+++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/LeaderLocatorTest.java
@@ -0,0 +1,170 @@
+// The contents of this file are subject to the Mozilla Public License
+// Version 2.0 (the "License"); you may not use this file except in
+// compliance with the License. You may obtain a copy of the License
+// at https://www.mozilla.org/en-US/MPL/2.0/
+//
+// Software distributed under the License is distributed on an "AS IS"
+// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+// the License for the specific language governing rights and
+// limitations under the License.
+//
+// The Original Code is RabbitMQ.
+//
+// The Initial Developer of the Original Code is Pivotal Software, Inc.
+// Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved.
+//
+
+package com.rabbitmq.stream;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.rabbitmq.stream.impl.Client;
+import com.rabbitmq.stream.impl.Client.Broker;
+import com.rabbitmq.stream.impl.Client.ClientParameters;
+import com.rabbitmq.stream.impl.Client.Response;
+import com.rabbitmq.stream.impl.Client.StreamMetadata;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.LoggerFactory;
+
+@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
+public class LeaderLocatorTest {
+
+ TestUtils.ClientFactory cf;
+
+ @Test
+ void invalidLocatorShouldReturnError() {
+ Client client = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1()));
+ String s = UUID.randomUUID().toString();
+ Response response = client.create(s, Collections.singletonMap("queue-leader-locator", "foo"));
+ assertThat(response.isOk()).isFalse();
+ assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_PRECONDITION_FAILED);
+ }
+
+ @Test
+ void clientLocalLocatorShouldMakeLeaderOnConnectedNode() {
+ int[] ports = new int[] {TestUtils.streamPortNode1(), TestUtils.streamPortNode2()};
+ for (int port : ports) {
+ Client client = cf.get(new Client.ClientParameters().port(port));
+ String s = UUID.randomUUID().toString();
+ try {
+ Response response =
+ client.create(s, Collections.singletonMap("queue-leader-locator", "client-local"));
+ assertThat(response.isOk()).isTrue();
+ StreamMetadata metadata = client.metadata(s).get(s);
+ assertThat(metadata).isNotNull();
+ assertThat(metadata.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK);
+ assertThat(metadata.getLeader()).isNotNull().extracting(b -> b.getPort()).isEqualTo(port);
+ } finally {
+ client.delete(s);
+ }
+ }
+ }
+
+ @Test
+ void randomLocatorShouldCreateOnAllNodesAfterSomeTime() throws Exception {
+ int clusterSize = 3;
+ Set<String> createdStreams = ConcurrentHashMap.newKeySet();
+ Set<Broker> leaderNodes = ConcurrentHashMap.newKeySet(clusterSize);
+ CountDownLatch latch = new CountDownLatch(1);
+ Client client = cf.get(new ClientParameters().port(TestUtils.streamPortNode1()));
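+    // keep creating streams with the "random" locator until a leader has been observed on every node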
+ Runnable runnable =
+ () -> {
+ while (leaderNodes.size() < clusterSize && !Thread.interrupted()) {
+ String s = UUID.randomUUID().toString();
+ Response response =
+ client.create(s, Collections.singletonMap("queue-leader-locator", "random"));
+ if (!response.isOk()) {
+ break;
+ }
+ createdStreams.add(s);
+ StreamMetadata metadata = client.metadata(s).get(s);
+ if (metadata == null || !metadata.isResponseOk() || metadata.getLeader() == null) {
+ break;
+ }
+ leaderNodes.add(metadata.getLeader());
+ }
+ latch.countDown();
+ };
+
+ Thread worker = new Thread(runnable);
+ worker.start();
+
+ try {
+ assertThat(latch.await(10, SECONDS)).isTrue();
+ assertThat(leaderNodes).hasSize(clusterSize);
+      // check the ports as well, in case Broker equality does not tell the nodes apart
+ assertThat(leaderNodes.stream().map(b -> b.getPort()).collect(Collectors.toSet()))
+ .hasSize(clusterSize);
+ } finally {
+ if (worker.isAlive()) {
+ worker.interrupt();
+ }
+ createdStreams.forEach(
+ s -> {
+ Response response = client.delete(s);
+ if (!response.isOk()) {
+ LoggerFactory.getLogger(LeaderLocatorTest.class).warn("Error while deleting stream");
+ }
+ });
+ }
+ }
+
+ @Test
+  void leastLeadersShouldSpreadLeadersOnTheCluster() {
+ int clusterSize = 3;
+ int streamsByNode = 5;
+ int streamCount = clusterSize * streamsByNode;
+ Set<String> createdStreams = ConcurrentHashMap.newKeySet();
+ Client client = cf.get(new ClientParameters().port(TestUtils.streamPortNode1()));
+
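+    // with "least-leaders", the leaders should end up evenly spread: streamsByNode leaders per node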
+ try {
+ IntStream.range(0, streamCount)
+ .forEach(
+ i -> {
+ String s = UUID.randomUUID().toString();
+ Response response =
+ client.create(
+ s, Collections.singletonMap("queue-leader-locator", "least-leaders"));
+ assertThat(response.isOk()).isTrue();
+ createdStreams.add(s);
+ });
+
+ Map<Integer, Integer> leaderCount = new HashMap<>();
+ Map<String, StreamMetadata> metadata =
+ client.metadata(createdStreams.toArray(new String[] {}));
+ assertThat(metadata).hasSize(streamCount);
+
+ metadata
+ .values()
+ .forEach(
+ streamMetadata -> {
+ assertThat(streamMetadata.isResponseOk()).isTrue();
+ assertThat(streamMetadata.getLeader()).isNotNull();
+ leaderCount.compute(
+ streamMetadata.getLeader().getPort(),
+ (port, value) -> value == null ? 1 : ++value);
+ });
+ assertThat(leaderCount).hasSize(clusterSize);
+ leaderCount.values().forEach(count -> assertThat(count).isEqualTo(streamsByNode));
+ } finally {
+ createdStreams.forEach(
+ s -> {
+ Response response = client.delete(s);
+ if (!response.isOk()) {
+ LoggerFactory.getLogger(LeaderLocatorTest.class).warn("Error while deleting stream");
+ }
+ });
+ }
+ }
+}
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java
new file mode 100644
index 0000000000..08024a12bf
--- /dev/null
+++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java
@@ -0,0 +1,173 @@
+// The contents of this file are subject to the Mozilla Public License
+// Version 2.0 (the "License"); you may not use this file except in
+// compliance with the License. You may obtain a copy of the License
+// at https://www.mozilla.org/en-US/MPL/2.0/
+//
+// Software distributed under the License is distributed on an "AS IS"
+// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+// the License for the specific language governing rights and
+// limitations under the License.
+//
+// The Original Code is RabbitMQ.
+//
+// The Initial Developer of the Original Code is Pivotal Software, Inc.
+// Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved.
+//
+
+package com.rabbitmq.stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.rabbitmq.stream.impl.Client;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
+public class StreamTest {
+
+ String stream;
+ TestUtils.ClientFactory cf;
+
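+  // each argument pair picks the broker to publish to and the broker to consume from (leader or replica)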
+ static Stream<Arguments> shouldBePossibleToPublishFromAnyNodeAndConsumeFromAnyMember() {
+ return Stream.of(
+ brokers(
+ "leader", metadata -> metadata.getLeader(), "leader", metadata -> metadata.getLeader()),
+ brokers(
+ "leader",
+ metadata -> metadata.getLeader(),
+ "replica",
+ metadata -> metadata.getReplicas().iterator().next()),
+ brokers(
+ "replica",
+ metadata -> metadata.getReplicas().iterator().next(),
+ "leader",
+ metadata -> metadata.getLeader()),
+ brokers(
+ "replica",
+ metadata -> new ArrayList<>(metadata.getReplicas()).get(0),
+ "replica",
+ metadata -> new ArrayList<>(metadata.getReplicas()).get(1)));
+ }
+
+ static Arguments brokers(
+ String dp,
+ Function<Client.StreamMetadata, Client.Broker> publisherBroker,
+ String dc,
+ Function<Client.StreamMetadata, Client.Broker> consumerBroker) {
+ return Arguments.of(
+ new FunctionWithToString<>(dp, publisherBroker),
+ new FunctionWithToString<>(dc, consumerBroker));
+ }
+
+ @ParameterizedTest
+ @MethodSource
+ void shouldBePossibleToPublishFromAnyNodeAndConsumeFromAnyMember(
+ Function<Client.StreamMetadata, Client.Broker> publisherBroker,
+ Function<Client.StreamMetadata, Client.Broker> consumerBroker)
+ throws Exception {
+
+ int messageCount = 10_000;
+ Client client = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1()));
+ Map<String, Client.StreamMetadata> metadata = client.metadata(stream);
+ assertThat(metadata).hasSize(1).containsKey(stream);
+ Client.StreamMetadata streamMetadata = metadata.get(stream);
+
+ CountDownLatch publishingLatch = new CountDownLatch(messageCount);
+ Client publisher =
+ cf.get(
+ new Client.ClientParameters()
+ .port(publisherBroker.apply(streamMetadata).getPort())
+ .publishConfirmListener(
+ (publisherId, publishingId) -> publishingLatch.countDown()));
+
+ IntStream.range(0, messageCount)
+ .forEach(
+ i ->
+ publisher.publish(
+ stream,
+ (byte) 1,
+ Collections.singletonList(
+ publisher
+ .messageBuilder()
+ .addData(("hello " + i).getBytes(StandardCharsets.UTF_8))
+ .build())));
+
+ assertThat(publishingLatch.await(10, TimeUnit.SECONDS)).isTrue();
+
+ CountDownLatch consumingLatch = new CountDownLatch(messageCount);
+ Set<String> bodies = ConcurrentHashMap.newKeySet(messageCount);
+ Client consumer =
+ cf.get(
+ new Client.ClientParameters()
+ .port(consumerBroker.apply(streamMetadata).getPort())
+ .chunkListener(
+ (client1, subscriptionId, offset, messageCount1, dataSize) ->
+ client1.credit(subscriptionId, 10))
+ .messageListener(
+ (subscriptionId, offset, message) -> {
+ bodies.add(new String(message.getBodyAsBinary(), StandardCharsets.UTF_8));
+ consumingLatch.countDown();
+ }));
+
+ consumer.subscribe((byte) 1, stream, OffsetSpecification.first(), 10);
+
+ assertThat(consumingLatch.await(10, TimeUnit.SECONDS)).isTrue();
+ assertThat(bodies).hasSize(messageCount);
+    IntStream.range(0, messageCount).forEach(i -> assertThat(bodies).contains("hello " + i));
+ }
+
+ @Test
+ void metadataOnClusterShouldReturnLeaderAndReplicas() {
+ Client client = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1()));
+ Map<String, Client.StreamMetadata> metadata = client.metadata(stream);
+ assertThat(metadata).hasSize(1).containsKey(stream);
+ Client.StreamMetadata streamMetadata = metadata.get(stream);
+ assertThat(streamMetadata.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK);
+ assertThat(streamMetadata.getReplicas()).hasSize(2);
+
+ BiConsumer<Client.Broker, Client.Broker> assertNodesAreDifferent =
+ (node, anotherNode) -> {
+ assertThat(node.getHost()).isEqualTo(anotherNode.getHost());
+ assertThat(node.getPort()).isNotEqualTo(anotherNode.getPort());
+ };
+
+ streamMetadata
+ .getReplicas()
+ .forEach(replica -> assertNodesAreDifferent.accept(replica, streamMetadata.getLeader()));
+ List<Client.Broker> replicas = new ArrayList<>(streamMetadata.getReplicas());
+ assertNodesAreDifferent.accept(replicas.get(0), replicas.get(1));
+ }
+
+ static class FunctionWithToString<T, R> implements Function<T, R> {
+
+ final String toString;
+ final Function<T, R> delegate;
+
+ FunctionWithToString(String toString, Function<T, R> delegate) {
+ this.toString = toString;
+ this.delegate = delegate;
+ }
+
+ @Override
+ public R apply(T t) {
+ return delegate.apply(t);
+ }
+
+ @Override
+ public String toString() {
+ return toString;
+ }
+ }
+}
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java
new file mode 100644
index 0000000000..c49a8d5832
--- /dev/null
+++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java
@@ -0,0 +1,179 @@
+// The contents of this file are subject to the Mozilla Public License
+// Version 2.0 (the "License"); you may not use this file except in
+// compliance with the License. You may obtain a copy of the License
+// at https://www.mozilla.org/en-US/MPL/2.0/
+//
+// Software distributed under the License is distributed on an "AS IS"
+// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+// the License for the specific language governing rights and
+// limitations under the License.
+//
+// The Original Code is RabbitMQ.
+//
+// The Initial Developer of the Original Code is Pivotal Software, Inc.
+// Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved.
+//
+
+package com.rabbitmq.stream;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import com.rabbitmq.stream.impl.Client;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import java.lang.reflect.Field;
+import java.time.Duration;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BooleanSupplier;
+import org.junit.jupiter.api.extension.*;
+
+public class TestUtils {
+
+ static int streamPortNode1() {
+ String port = System.getProperty("node1.stream.port", "5555");
+ return Integer.valueOf(port);
+ }
+
+ static int streamPortNode2() {
+ String port = System.getProperty("node2.stream.port", "5556");
+ return Integer.valueOf(port);
+ }
+
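+  // polls the condition every 100 ms until it holds or the duration elapses, then fails the test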
+ static void waitAtMost(Duration duration, BooleanSupplier condition) throws InterruptedException {
+ if (condition.getAsBoolean()) {
+ return;
+ }
+ int waitTime = 100;
+ int waitedTime = 0;
+ long timeoutInMs = duration.toMillis();
+ while (waitedTime <= timeoutInMs) {
+ Thread.sleep(waitTime);
+ if (condition.getAsBoolean()) {
+ return;
+ }
+ waitedTime += waitTime;
+ }
+    fail("Waited " + duration.getSeconds() + " second(s), condition never became true");
+ }
+
+ static class StreamTestInfrastructureExtension
+ implements BeforeAllCallback, AfterAllCallback, BeforeEachCallback, AfterEachCallback {
+
+ private static final ExtensionContext.Namespace NAMESPACE =
+ ExtensionContext.Namespace.create(StreamTestInfrastructureExtension.class);
+
+ private static ExtensionContext.Store store(ExtensionContext extensionContext) {
+ return extensionContext.getRoot().getStore(NAMESPACE);
+ }
+
+ private static EventLoopGroup eventLoopGroup(ExtensionContext context) {
+ return (EventLoopGroup) store(context).get("nettyEventLoopGroup");
+ }
+
+ @Override
+ public void beforeAll(ExtensionContext context) {
+ store(context).put("nettyEventLoopGroup", new NioEventLoopGroup());
+ }
+
+ @Override
+ public void beforeEach(ExtensionContext context) throws Exception {
+ try {
+ Field streamField =
+ context.getTestInstance().get().getClass().getDeclaredField("eventLoopGroup");
+ streamField.setAccessible(true);
+ streamField.set(context.getTestInstance().get(), eventLoopGroup(context));
+ } catch (NoSuchFieldException e) {
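+        // the test class has no eventLoopGroup field, nothing to inject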
+
+ }
+ try {
+ Field streamField = context.getTestInstance().get().getClass().getDeclaredField("stream");
+ streamField.setAccessible(true);
+ String stream = UUID.randomUUID().toString();
+ streamField.set(context.getTestInstance().get(), stream);
+ Client client =
+ new Client(
+ new Client.ClientParameters()
+ .eventLoopGroup(eventLoopGroup(context))
+ .port(streamPortNode1()));
+ Client.Response response = client.create(stream);
+ assertThat(response.isOk()).isTrue();
+ client.close();
+ store(context).put("testMethodStream", stream);
+ } catch (NoSuchFieldException e) {
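+        // the test class has no stream field, no stream to create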
+
+ }
+
+ for (Field declaredField : context.getTestInstance().get().getClass().getDeclaredFields()) {
+ if (declaredField.getType().equals(ClientFactory.class)) {
+ declaredField.setAccessible(true);
+ ClientFactory clientFactory = new ClientFactory(eventLoopGroup(context));
+ declaredField.set(context.getTestInstance().get(), clientFactory);
+ store(context).put("testClientFactory", clientFactory);
+ break;
+ }
+ }
+ }
+
+ @Override
+ public void afterEach(ExtensionContext context) throws Exception {
+ try {
+ Field streamField = context.getTestInstance().get().getClass().getDeclaredField("stream");
+ streamField.setAccessible(true);
+ String stream = (String) streamField.get(context.getTestInstance().get());
+ Client client =
+ new Client(
+ new Client.ClientParameters()
+ .eventLoopGroup(eventLoopGroup(context))
+ .port(streamPortNode1()));
+ Client.Response response = client.delete(stream);
+ assertThat(response.isOk()).isTrue();
+ client.close();
+ store(context).remove("testMethodStream");
+ } catch (NoSuchFieldException e) {
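+        // the test class has no stream field, no stream to delete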
+
+ }
+
+ ClientFactory clientFactory = (ClientFactory) store(context).get("testClientFactory");
+ if (clientFactory != null) {
+ clientFactory.close();
+ }
+ }
+
+ @Override
+ public void afterAll(ExtensionContext context) throws Exception {
+ EventLoopGroup eventLoopGroup = eventLoopGroup(context);
+ eventLoopGroup.shutdownGracefully(1, 10, SECONDS).get(10, SECONDS);
+ }
+ }
+
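+  // hands out Client instances bound to the shared Netty event loop and closes them after each test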
+ static class ClientFactory {
+
+ private final EventLoopGroup eventLoopGroup;
+ private final Set<Client> clients = ConcurrentHashMap.newKeySet();
+
+ public ClientFactory(EventLoopGroup eventLoopGroup) {
+ this.eventLoopGroup = eventLoopGroup;
+ }
+
+ public Client get() {
+ return get(new Client.ClientParameters());
+ }
+
+ public Client get(Client.ClientParameters parameters) {
+ // don't set the port, it would override the caller's port setting
+ Client client = new Client(parameters.eventLoopGroup(eventLoopGroup));
+ clients.add(client);
+ return client;
+ }
+
+ private void close() {
+ for (Client c : clients) {
+ c.close();
+ }
+ }
+ }
+}
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/resources/logback-test.xml b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/resources/logback-test.xml
new file mode 100644
index 0000000000..45d598991d
--- /dev/null
+++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/resources/logback-test.xml
@@ -0,0 +1,13 @@
+<configuration>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <logger name="com.rabbitmq.stream" level="info" />
+
+ <root level="info">
+ <appender-ref ref="STDOUT" />
+ </root>
+</configuration> \ No newline at end of file