summaryrefslogtreecommitdiff
path: root/deps/rabbitmq_mqtt/test
diff options
context:
space:
mode:
Diffstat (limited to 'deps/rabbitmq_mqtt/test')
-rw-r--r--deps/rabbitmq_mqtt/test/auth_SUITE.erl493
-rw-r--r--deps/rabbitmq_mqtt/test/cluster_SUITE.erl188
-rw-r--r--deps/rabbitmq_mqtt/test/command_SUITE.erl158
-rw-r--r--deps/rabbitmq_mqtt/test/config_schema_SUITE.erl55
-rw-r--r--deps/rabbitmq_mqtt/test/config_schema_SUITE_data/certs/cacert.pem1
-rw-r--r--deps/rabbitmq_mqtt/test/config_schema_SUITE_data/certs/cert.pem1
-rw-r--r--deps/rabbitmq_mqtt/test/config_schema_SUITE_data/certs/key.pem1
-rw-r--r--deps/rabbitmq_mqtt/test/config_schema_SUITE_data/rabbitmq_mqtt.snippets144
-rw-r--r--deps/rabbitmq_mqtt/test/java_SUITE.erl127
-rw-r--r--deps/rabbitmq_mqtt/test/java_SUITE_data/.gitignore3
-rwxr-xr-xdeps/rabbitmq_mqtt/test/java_SUITE_data/.mvn/wrapper/MavenWrapperDownloader.java110
-rwxr-xr-xdeps/rabbitmq_mqtt/test/java_SUITE_data/.mvn/wrapper/maven-wrapper.jarbin0 -> 48337 bytes
-rwxr-xr-xdeps/rabbitmq_mqtt/test/java_SUITE_data/.mvn/wrapper/maven-wrapper.properties1
-rw-r--r--deps/rabbitmq_mqtt/test/java_SUITE_data/Makefile27
-rwxr-xr-xdeps/rabbitmq_mqtt/test/java_SUITE_data/mvnw286
-rwxr-xr-xdeps/rabbitmq_mqtt/test/java_SUITE_data/mvnw.cmd161
-rw-r--r--deps/rabbitmq_mqtt/test/java_SUITE_data/pom.xml137
-rw-r--r--deps/rabbitmq_mqtt/test/java_SUITE_data/src/test.config14
-rw-r--r--deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/MqttTest.java1030
-rwxr-xr-xdeps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/rabbit-test.sh8
-rw-r--r--deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/setup-rabbit-test.sh2
-rw-r--r--deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/tls/MqttSSLTest.java157
-rw-r--r--deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/tls/MutualAuth.java89
-rw-r--r--deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/scripts/remove_old_test_keystores.groovy10
-rw-r--r--deps/rabbitmq_mqtt/test/mqtt_machine_SUITE.erl73
-rw-r--r--deps/rabbitmq_mqtt/test/processor_SUITE.erl211
-rw-r--r--deps/rabbitmq_mqtt/test/proxy_protocol_SUITE.erl125
-rw-r--r--deps/rabbitmq_mqtt/test/rabbit_auth_backend_mqtt_mock.erl45
-rw-r--r--deps/rabbitmq_mqtt/test/rabbitmq_mqtt.app19
-rw-r--r--deps/rabbitmq_mqtt/test/reader_SUITE.erl166
-rw-r--r--deps/rabbitmq_mqtt/test/retainer_SUITE.erl144
-rw-r--r--deps/rabbitmq_mqtt/test/util_SUITE.erl80
32 files changed, 4066 insertions, 0 deletions
diff --git a/deps/rabbitmq_mqtt/test/auth_SUITE.erl b/deps/rabbitmq_mqtt/test/auth_SUITE.erl
new file mode 100644
index 0000000000..7368139d95
--- /dev/null
+++ b/deps/rabbitmq_mqtt/test/auth_SUITE.erl
@@ -0,0 +1,493 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+-module(auth_SUITE).
+-compile([export_all]).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-define(CONNECT_TIMEOUT, 10000).
+
+all() ->
+ [{group, anonymous_no_ssl_user},
+ {group, anonymous_ssl_user},
+ {group, no_ssl_user},
+ {group, ssl_user},
+ {group, client_id_propagation}].
+
+groups() ->
+ [{anonymous_ssl_user, [],
+ [anonymous_auth_success,
+ user_credentials_auth,
+ ssl_user_auth_success,
+ ssl_user_vhost_not_allowed,
+ ssl_user_vhost_parameter_mapping_success,
+ ssl_user_vhost_parameter_mapping_not_allowed,
+ ssl_user_vhost_parameter_mapping_vhost_does_not_exist,
+ ssl_user_port_vhost_mapping_takes_precedence_over_cert_vhost_mapping
+ ]},
+ {anonymous_no_ssl_user, [],
+ [anonymous_auth_success,
+ user_credentials_auth,
+ port_vhost_mapping_success,
+ port_vhost_mapping_success_no_mapping,
+ port_vhost_mapping_not_allowed,
+ port_vhost_mapping_vhost_does_not_exist
+ %% SSL auth will succeed, because we cannot ignore anonymous
+ ]},
+ {ssl_user, [],
+ [anonymous_auth_failure,
+ user_credentials_auth,
+ ssl_user_auth_success,
+ ssl_user_vhost_not_allowed,
+ ssl_user_vhost_parameter_mapping_success,
+ ssl_user_vhost_parameter_mapping_not_allowed,
+ ssl_user_vhost_parameter_mapping_vhost_does_not_exist,
+ ssl_user_port_vhost_mapping_takes_precedence_over_cert_vhost_mapping
+ ]},
+ {no_ssl_user, [],
+ [anonymous_auth_failure,
+ user_credentials_auth,
+ ssl_user_auth_failure,
+ port_vhost_mapping_success,
+ port_vhost_mapping_success_no_mapping,
+ port_vhost_mapping_not_allowed,
+ port_vhost_mapping_vhost_does_not_exist
+ ]},
+ {client_id_propagation, [],
+ [client_id_propagation]
+ }
+ ].
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ Config.
+
+end_per_suite(Config) ->
+ Config.
+
+init_per_group(Group, Config) ->
+ Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"),
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodename_suffix, Suffix},
+ {rmq_certspwd, "bunnychow"}
+ ]),
+ MqttConfig = mqtt_config(Group),
+ AuthConfig = auth_config(Group),
+ rabbit_ct_helpers:run_setup_steps(Config1,
+ [ fun(Conf) -> merge_app_env(MqttConfig, Conf) end ] ++
+ [ fun(Conf) -> case AuthConfig of
+ undefined -> Conf;
+ _ -> merge_app_env(AuthConfig, Conf)
+ end
+ end ] ++
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()).
+
+end_per_group(_, Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+merge_app_env(MqttConfig, Config) ->
+ rabbit_ct_helpers:merge_app_env(Config, MqttConfig).
+
+mqtt_config(anonymous_ssl_user) ->
+ {rabbitmq_mqtt, [{ssl_cert_login, true},
+ {allow_anonymous, true}]};
+mqtt_config(anonymous_no_ssl_user) ->
+ {rabbitmq_mqtt, [{ssl_cert_login, false},
+ {allow_anonymous, true}]};
+mqtt_config(ssl_user) ->
+ {rabbitmq_mqtt, [{ssl_cert_login, true},
+ {allow_anonymous, false}]};
+mqtt_config(no_ssl_user) ->
+ {rabbitmq_mqtt, [{ssl_cert_login, false},
+ {allow_anonymous, false}]};
+mqtt_config(client_id_propagation) ->
+ {rabbitmq_mqtt, [{ssl_cert_login, true},
+ {allow_anonymous, true}]}.
+
+auth_config(client_id_propagation) ->
+ {rabbit, [
+ {auth_backends, [rabbit_auth_backend_mqtt_mock]}
+ ]
+ };
+auth_config(_) ->
+ undefined.
+
+init_per_testcase(Testcase, Config) when Testcase == ssl_user_auth_success;
+ Testcase == ssl_user_auth_failure ->
+ Config1 = set_cert_user_on_default_vhost(Config),
+ rabbit_ct_helpers:testcase_started(Config1, Testcase);
+init_per_testcase(ssl_user_vhost_parameter_mapping_success, Config) ->
+ Config1 = set_cert_user_on_default_vhost(Config),
+ User = ?config(temp_ssl_user, Config1),
+ ok = rabbit_ct_broker_helpers:clear_permissions(Config1, User, <<"/">>),
+ Config2 = set_vhost_for_cert_user(Config1, User),
+ rabbit_ct_helpers:testcase_started(Config2, ssl_user_vhost_parameter_mapping_success);
+init_per_testcase(ssl_user_vhost_parameter_mapping_not_allowed, Config) ->
+ Config1 = set_cert_user_on_default_vhost(Config),
+ User = ?config(temp_ssl_user, Config1),
+ Config2 = set_vhost_for_cert_user(Config1, User),
+ VhostForCertUser = ?config(temp_vhost_for_ssl_user, Config2),
+ ok = rabbit_ct_broker_helpers:clear_permissions(Config2, User, VhostForCertUser),
+ rabbit_ct_helpers:testcase_started(Config2, ssl_user_vhost_parameter_mapping_not_allowed);
+init_per_testcase(user_credentials_auth, Config) ->
+ User = <<"new-user">>,
+ Pass = <<"new-user-pass">>,
+ ok = rabbit_ct_broker_helpers:add_user(Config, 0, User, Pass),
+ ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, <<"/">>),
+ Config1 = rabbit_ct_helpers:set_config(Config, [{new_user, User},
+ {new_user_pass, Pass}]),
+ rabbit_ct_helpers:testcase_started(Config1, user_credentials_auth);
+init_per_testcase(ssl_user_vhost_not_allowed, Config) ->
+ Config1 = set_cert_user_on_default_vhost(Config),
+ User = ?config(temp_ssl_user, Config1),
+ ok = rabbit_ct_broker_helpers:clear_permissions(Config1, User, <<"/">>),
+ rabbit_ct_helpers:testcase_started(Config1, ssl_user_vhost_not_allowed);
+init_per_testcase(ssl_user_vhost_parameter_mapping_vhost_does_not_exist, Config) ->
+ Config1 = set_cert_user_on_default_vhost(Config),
+ User = ?config(temp_ssl_user, Config1),
+ Config2 = set_vhost_for_cert_user(Config1, User),
+ VhostForCertUser = ?config(temp_vhost_for_ssl_user, Config2),
+ ok = rabbit_ct_broker_helpers:delete_vhost(Config, VhostForCertUser),
+ rabbit_ct_helpers:testcase_started(Config1, ssl_user_vhost_parameter_mapping_vhost_does_not_exist);
+init_per_testcase(port_vhost_mapping_success, Config) ->
+ User = <<"guest">>,
+ Config1 = set_vhost_for_port_vhost_mapping_user(Config, User),
+ rabbit_ct_broker_helpers:clear_permissions(Config1, User, <<"/">>),
+ rabbit_ct_helpers:testcase_started(Config1, port_vhost_mapping_success);
+init_per_testcase(port_vhost_mapping_success_no_mapping, Config) ->
+ User = <<"guest">>,
+ Config1 = set_vhost_for_port_vhost_mapping_user(Config, User),
+ PortToVHostMappingParameter = [
+ {<<"1">>, <<"unlikely to exist">>},
+ {<<"2">>, <<"unlikely to exist">>}],
+ ok = rabbit_ct_broker_helpers:set_global_parameter(Config, mqtt_port_to_vhost_mapping, PortToVHostMappingParameter),
+ VHost = ?config(temp_vhost_for_port_mapping, Config1),
+ rabbit_ct_broker_helpers:clear_permissions(Config1, User, VHost),
+ rabbit_ct_helpers:testcase_started(Config1, port_vhost_mapping_success_no_mapping);
+init_per_testcase(port_vhost_mapping_not_allowed, Config) ->
+ User = <<"guest">>,
+ Config1 = set_vhost_for_port_vhost_mapping_user(Config, User),
+ rabbit_ct_broker_helpers:clear_permissions(Config1, User, <<"/">>),
+ VHost = ?config(temp_vhost_for_port_mapping, Config1),
+ rabbit_ct_broker_helpers:clear_permissions(Config1, User, VHost),
+ rabbit_ct_helpers:testcase_started(Config1, port_vhost_mapping_not_allowed);
+init_per_testcase(port_vhost_mapping_vhost_does_not_exist, Config) ->
+ User = <<"guest">>,
+ Config1 = set_vhost_for_port_vhost_mapping_user(Config, User),
+ rabbit_ct_broker_helpers:clear_permissions(Config1, User, <<"/">>),
+ VHost = ?config(temp_vhost_for_port_mapping, Config1),
+ rabbit_ct_broker_helpers:delete_vhost(Config1, VHost),
+ rabbit_ct_helpers:testcase_started(Config1, port_vhost_mapping_vhost_does_not_exist);
+init_per_testcase(ssl_user_port_vhost_mapping_takes_precedence_over_cert_vhost_mapping, Config) ->
+ Config1 = set_cert_user_on_default_vhost(Config),
+ User = ?config(temp_ssl_user, Config1),
+ Config2 = set_vhost_for_cert_user(Config1, User),
+
+ Config3 = set_vhost_for_port_vhost_mapping_user(Config2, User),
+ VhostForPortMapping = ?config(mqtt_port_to_vhost_mapping, Config2),
+ rabbit_ct_broker_helpers:clear_permissions(Config3, User, VhostForPortMapping),
+
+ rabbit_ct_broker_helpers:clear_permissions(Config3, User, <<"/">>),
+ rabbit_ct_helpers:testcase_started(Config3, ssl_user_port_vhost_mapping_takes_precedence_over_cert_vhost_mapping);
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase).
+
+set_cert_user_on_default_vhost(Config) ->
+ CertsDir = ?config(rmq_certsdir, Config),
+ CertFile = filename:join([CertsDir, "client", "cert.pem"]),
+ {ok, CertBin} = file:read_file(CertFile),
+ [{'Certificate', Cert, not_encrypted}] = public_key:pem_decode(CertBin),
+ UserBin = rabbit_ct_broker_helpers:rpc(Config, 0,
+ rabbit_ssl,
+ peer_cert_auth_name,
+ [Cert]),
+ User = binary_to_list(UserBin),
+ ok = rabbit_ct_broker_helpers:add_user(Config, 0, User, ""),
+ ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, <<"/">>),
+ rabbit_ct_helpers:set_config(Config, [{temp_ssl_user, User}]).
+
+set_vhost_for_cert_user(Config, User) ->
+ VhostForCertUser = <<"vhost_for_cert_user">>,
+ UserToVHostMappingParameter = [
+ {rabbit_data_coercion:to_binary(User), VhostForCertUser},
+ {<<"O=client,CN=unlikelytoexistuser">>, <<"vhost2">>}
+ ],
+ ok = rabbit_ct_broker_helpers:add_vhost(Config, VhostForCertUser),
+ ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, VhostForCertUser),
+ ok = rabbit_ct_broker_helpers:set_global_parameter(Config, mqtt_default_vhosts, UserToVHostMappingParameter),
+ rabbit_ct_helpers:set_config(Config, [{temp_vhost_for_ssl_user, VhostForCertUser}]).
+
+set_vhost_for_port_vhost_mapping_user(Config, User) ->
+ VhostForPortMapping = <<"vhost_for_port_vhost_mapping">>,
+ Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt),
+ TlsPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt_tls),
+ PortToVHostMappingParameter = [
+ {integer_to_binary(Port), VhostForPortMapping},
+ {<<"1884">>, <<"vhost2">>},
+ {integer_to_binary(TlsPort), VhostForPortMapping},
+ {<<"8884">>, <<"vhost2">>}
+
+ ],
+ ok = rabbit_ct_broker_helpers:add_vhost(Config, VhostForPortMapping),
+ ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, VhostForPortMapping),
+ ok = rabbit_ct_broker_helpers:set_global_parameter(Config, mqtt_port_to_vhost_mapping, PortToVHostMappingParameter),
+ rabbit_ct_helpers:set_config(Config, [{temp_vhost_for_port_mapping, VhostForPortMapping}]).
+
+end_per_testcase(Testcase, Config) when Testcase == ssl_user_auth_success;
+ Testcase == ssl_user_auth_failure;
+ Testcase == ssl_user_vhost_not_allowed ->
+ delete_cert_user(Config),
+ rabbit_ct_helpers:testcase_finished(Config, Testcase);
+end_per_testcase(TestCase, Config) when TestCase == ssl_user_vhost_parameter_mapping_success;
+ TestCase == ssl_user_vhost_parameter_mapping_not_allowed ->
+ delete_cert_user(Config),
+ VhostForCertUser = ?config(temp_vhost_for_ssl_user, Config),
+ ok = rabbit_ct_broker_helpers:delete_vhost(Config, VhostForCertUser),
+ ok = rabbit_ct_broker_helpers:clear_global_parameter(Config, mqtt_default_vhosts),
+ rabbit_ct_helpers:testcase_finished(Config, TestCase);
+end_per_testcase(user_credentials_auth, Config) ->
+ User = ?config(new_user, Config),
+ {ok,_} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["delete_user", User]),
+ rabbit_ct_helpers:testcase_finished(Config, user_credentials_auth);
+end_per_testcase(ssl_user_vhost_parameter_mapping_vhost_does_not_exist, Config) ->
+ delete_cert_user(Config),
+ ok = rabbit_ct_broker_helpers:clear_global_parameter(Config, mqtt_default_vhosts),
+ rabbit_ct_helpers:testcase_finished(Config, ssl_user_vhost_parameter_mapping_vhost_does_not_exist);
+end_per_testcase(Testcase, Config) when Testcase == port_vhost_mapping_success;
+ Testcase == port_vhost_mapping_not_allowed;
+ Testcase == port_vhost_mapping_success_no_mapping ->
+ User = <<"guest">>,
+ rabbit_ct_broker_helpers:set_full_permissions(Config, User, <<"/">>),
+ VHost = ?config(temp_vhost_for_port_mapping, Config),
+ ok = rabbit_ct_broker_helpers:delete_vhost(Config, VHost),
+ ok = rabbit_ct_broker_helpers:clear_global_parameter(Config, mqtt_port_to_vhost_mapping),
+ rabbit_ct_helpers:testcase_finished(Config, Testcase);
+end_per_testcase(port_vhost_mapping_vhost_does_not_exist, Config) ->
+ User = <<"guest">>,
+ ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, <<"/">>),
+ ok = rabbit_ct_broker_helpers:clear_global_parameter(Config, mqtt_port_to_vhost_mapping),
+ rabbit_ct_helpers:testcase_finished(Config, port_vhost_mapping_vhost_does_not_exist);
+end_per_testcase(ssl_user_port_vhost_mapping_takes_precedence_over_cert_vhost_mapping, Config) ->
+ delete_cert_user(Config),
+ VhostForCertUser = ?config(temp_vhost_for_ssl_user, Config),
+ ok = rabbit_ct_broker_helpers:delete_vhost(Config, VhostForCertUser),
+ ok = rabbit_ct_broker_helpers:clear_global_parameter(Config, mqtt_default_vhosts),
+
+ VHostForPortVHostMapping = ?config(temp_vhost_for_port_mapping, Config),
+ ok = rabbit_ct_broker_helpers:delete_vhost(Config, VHostForPortVHostMapping),
+ ok = rabbit_ct_broker_helpers:clear_global_parameter(Config, mqtt_port_to_vhost_mapping),
+ rabbit_ct_helpers:testcase_finished(Config, ssl_user_port_vhost_mapping_takes_precedence_over_cert_vhost_mapping);
+end_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+delete_cert_user(Config) ->
+ User = ?config(temp_ssl_user, Config),
+ {ok,_} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["delete_user", User]).
+
+anonymous_auth_success(Config) ->
+ expect_successful_connection(fun connect_anonymous/1, Config).
+
+anonymous_auth_failure(Config) ->
+ expect_authentication_failure(fun connect_anonymous/1, Config).
+
+
+ssl_user_auth_success(Config) ->
+ expect_successful_connection(fun connect_ssl/1, Config).
+
+ssl_user_auth_failure(Config) ->
+ expect_authentication_failure(fun connect_ssl/1, Config).
+
+user_credentials_auth(Config) ->
+ NewUser = ?config(new_user, Config),
+ NewUserPass = ?config(new_user_pass, Config),
+
+ expect_successful_connection(
+ fun(Conf) -> connect_user(NewUser, NewUserPass, Conf) end,
+ Config),
+
+ expect_successful_connection(
+ fun(Conf) -> connect_user(<<"guest">>, <<"guest">>, Conf) end,
+ Config),
+
+ expect_successful_connection(
+ fun(Conf) -> connect_user(<<"/:guest">>, <<"guest">>, Conf) end,
+ Config),
+
+ expect_authentication_failure(
+ fun(Conf) -> connect_user(NewUser, <<"invalid_pass">>, Conf) end,
+ Config),
+
+ expect_authentication_failure(
+ fun(Conf) -> connect_user(undefined, <<"pass">>, Conf) end,
+ Config),
+
+ expect_authentication_failure(
+ fun(Conf) -> connect_user(NewUser, undefined, Conf) end,
+ Config),
+
+ expect_authentication_failure(
+ fun(Conf) -> connect_user(<<"non-existing-vhost:guest">>, <<"guest">>, Conf) end,
+ Config).
+
+ssl_user_vhost_parameter_mapping_success(Config) ->
+ expect_successful_connection(fun connect_ssl/1, Config).
+
+ssl_user_vhost_parameter_mapping_not_allowed(Config) ->
+ expect_authentication_failure(fun connect_ssl/1, Config).
+
+ssl_user_vhost_not_allowed(Config) ->
+ expect_authentication_failure(fun connect_ssl/1, Config).
+
+ssl_user_vhost_parameter_mapping_vhost_does_not_exist(Config) ->
+ expect_authentication_failure(fun connect_ssl/1, Config).
+
+port_vhost_mapping_success(Config) ->
+ expect_successful_connection(
+ fun(Conf) -> connect_user(<<"guest">>, <<"guest">>, Conf) end,
+ Config).
+
+port_vhost_mapping_success_no_mapping(Config) ->
+ %% no vhost mapping for the port, falling back to default vhost
+ %% where the user can connect
+ expect_successful_connection(
+ fun(Conf) -> connect_user(<<"guest">>, <<"guest">>, Conf) end,
+ Config
+ ).
+
+port_vhost_mapping_not_allowed(Config) ->
+ expect_authentication_failure(
+ fun(Conf) -> connect_user(<<"guest">>, <<"guest">>, Conf) end,
+ Config
+ ).
+
+port_vhost_mapping_vhost_does_not_exist(Config) ->
+ expect_authentication_failure(
+ fun(Conf) -> connect_user(<<"guest">>, <<"guest">>, Conf) end,
+ Config
+ ).
+
+ssl_user_port_vhost_mapping_takes_precedence_over_cert_vhost_mapping(Config) ->
+ expect_successful_connection(fun connect_ssl/1, Config).
+
+connect_anonymous(Config) ->
+ P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt),
+ emqttc:start_link([{host, "localhost"},
+ {port, P},
+ {client_id, <<"simpleClient">>},
+ {proto_ver, 3},
+ {logger, info}]).
+
+connect_ssl(Config) ->
+ CertsDir = ?config(rmq_certsdir, Config),
+ SSLConfig = [{cacertfile, filename:join([CertsDir, "testca", "cacert.pem"])},
+ {certfile, filename:join([CertsDir, "client", "cert.pem"])},
+ {keyfile, filename:join([CertsDir, "client", "key.pem"])}],
+ P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt_tls),
+ emqttc:start_link([{host, "localhost"},
+ {port, P},
+ {client_id, <<"simpleClient">>},
+ {proto_ver, 3},
+ {logger, info},
+ {ssl, SSLConfig}]).
+
+client_id_propagation(Config) ->
+ ok = rabbit_ct_broker_helpers:add_code_path_to_all_nodes(Config,
+ rabbit_auth_backend_mqtt_mock),
+ ClientId = <<"client-id-propagation">>,
+ {ok, C} = connect_user(<<"client-id-propagation">>, <<"client-id-propagation">>,
+ Config, ClientId),
+ receive {mqttc, C, connected} -> ok
+ after ?CONNECT_TIMEOUT -> exit(emqttc_connection_timeout)
+ end,
+ emqttc:subscribe(C, <<"TopicA">>, qos0),
+ [{authentication, AuthProps}] = rabbit_ct_broker_helpers:rpc(Config, 0,
+ rabbit_auth_backend_mqtt_mock,
+ get,
+ [authentication]),
+ ?assertEqual(ClientId, proplists:get_value(client_id, AuthProps)),
+
+ [{vhost_access, AuthzData}] = rabbit_ct_broker_helpers:rpc(Config, 0,
+ rabbit_auth_backend_mqtt_mock,
+ get,
+ [vhost_access]),
+ ?assertEqual(ClientId, maps:get(<<"client_id">>, AuthzData)),
+
+ [{resource_access, AuthzContext}] = rabbit_ct_broker_helpers:rpc(Config, 0,
+ rabbit_auth_backend_mqtt_mock,
+ get,
+ [resource_access]),
+ ?assertEqual(true, maps:size(AuthzContext) > 0),
+ ?assertEqual(ClientId, maps:get(<<"client_id">>, AuthzContext)),
+
+ [{topic_access, TopicContext}] = rabbit_ct_broker_helpers:rpc(Config, 0,
+ rabbit_auth_backend_mqtt_mock,
+ get,
+ [topic_access]),
+ VariableMap = maps:get(variable_map, TopicContext),
+ ?assertEqual(ClientId, maps:get(<<"client_id">>, VariableMap)),
+
+ emqttc:disconnect(C).
+
+connect_user(User, Pass, Config) ->
+ connect_user(User, Pass, Config, User).
+connect_user(User, Pass, Config, ClientID) ->
+ Creds = case User of
+ undefined -> [];
+ _ -> [{username, User}]
+ end ++ case Pass of
+ undefined -> [];
+ _ -> [{password, Pass}]
+ end,
+ P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt),
+ emqttc:start_link([{host, "localhost"},
+ {port, P},
+ {client_id, ClientID},
+ {proto_ver, 3},
+ {logger, info}] ++ Creds).
+
+expect_successful_connection(ConnectFun, Config) ->
+ rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_core_metrics, reset_auth_attempt_metrics, []),
+ {ok, C} = ConnectFun(Config),
+ receive {mqttc, C, connected} -> emqttc:disconnect(C)
+ after ?CONNECT_TIMEOUT -> exit(emqttc_connection_timeout)
+ end,
+ [Attempt] =
+ rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_core_metrics, get_auth_attempts, []),
+ ?assertEqual(false, proplists:is_defined(remote_address, Attempt)),
+ ?assertEqual(false, proplists:is_defined(username, Attempt)),
+ ?assertEqual(proplists:get_value(protocol, Attempt), <<"mqtt">>),
+ ?assertEqual(proplists:get_value(auth_attempts, Attempt), 1),
+ ?assertEqual(proplists:get_value(auth_attempts_failed, Attempt), 0),
+ ?assertEqual(proplists:get_value(auth_attempts_succeeded, Attempt), 1).
+
+expect_authentication_failure(ConnectFun, Config) ->
+ process_flag(trap_exit, true),
+ rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_core_metrics, reset_auth_attempt_metrics, []),
+ {ok, C} = ConnectFun(Config),
+ Result = receive
+ {mqttc, C, connected} -> {error, unexpected_anonymous_connection};
+ {'EXIT', C, {shutdown,{connack_error,'CONNACK_AUTH'}}} -> ok;
+ {'EXIT', C, {shutdown,{connack_error,'CONNACK_CREDENTIALS'}}} -> ok
+ after
+ ?CONNECT_TIMEOUT -> {error, emqttc_connection_timeout}
+ end,
+ [Attempt] =
+ rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_core_metrics, get_auth_attempts, []),
+ ?assertEqual(false, proplists:is_defined(remote_address, Attempt), <<>>),
+ ?assertEqual(false, proplists:is_defined(username, Attempt)),
+ ?assertEqual(proplists:get_value(protocol, Attempt), <<"mqtt">>),
+ ?assertEqual(proplists:get_value(auth_attempts, Attempt), 1),
+ ?assertEqual(proplists:get_value(auth_attempts_failed, Attempt), 1),
+ ?assertEqual(proplists:get_value(auth_attempts_succeeded, Attempt), 0),
+ process_flag(trap_exit, false),
+ case Result of
+ ok -> ok;
+ {error, Err} -> exit(Err)
+ end.
diff --git a/deps/rabbitmq_mqtt/test/cluster_SUITE.erl b/deps/rabbitmq_mqtt/test/cluster_SUITE.erl
new file mode 100644
index 0000000000..941b195ced
--- /dev/null
+++ b/deps/rabbitmq_mqtt/test/cluster_SUITE.erl
@@ -0,0 +1,188 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+-module(cluster_SUITE).
+-compile([export_all]).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+all() ->
+ [
+ {group, non_parallel_tests}
+ ].
+
+groups() ->
+ [
+ {non_parallel_tests, [], [
+ connection_id_tracking,
+ connection_id_tracking_on_nodedown,
+ connection_id_tracking_with_decommissioned_node
+ ]}
+ ].
+
+suite() ->
+ [{timetrap, {minutes, 5}}].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+merge_app_env(Config) ->
+ rabbit_ct_helpers:merge_app_env(Config,
+ {rabbit, [
+ {collect_statistics, basic},
+ {collect_statistics_interval, 100}
+ ]}).
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_group(_, Config) ->
+ Config.
+
+end_per_group(_, Config) ->
+ Config.
+
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase),
+ rabbit_ct_helpers:log_environment(),
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodename_suffix, Testcase},
+ {rmq_extra_tcp_ports, [tcp_port_mqtt_extra,
+ tcp_port_mqtt_tls_extra]},
+ {rmq_nodes_clustered, true},
+ {rmq_nodes_count, 5}
+ ]),
+ rabbit_ct_helpers:run_setup_steps(Config1,
+ [ fun merge_app_env/1 ] ++
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()).
+
+end_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()),
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+%% -------------------------------------------------------------------
+%% Test cases
+%% -------------------------------------------------------------------
+
+%% Note about running this testsuite in a mixed-versions cluster:
+%% All even-numbered nodes will use the same code base when using a
+%% secondary Umbrella. Odd-numbered nodes might use an incompatible code
+%% base. When cluster-wide client ID tracking was introduced, it was not
+%% put behind a feature flag because there was no need for one. Here, we
+%% don't have a way to ensure that all nodes participate in client ID
+%% tracking. However, those using the same code should. That's why we
+%% limit our RPC calls to those nodes.
+%%
+%% That's also the reason why we use a 5-node cluster: with node 2 and
+%% 4 which might not participate, it leaves nodes 1, 3 and 5: thus 3
+%% nodes, the minimum to use Ra in proper conditions.
+
+connection_id_tracking(Config) ->
+ ID = <<"duplicate-id">>,
+ {ok, MRef1, C1} = connect_to_node(Config, 0, ID),
+ emqttc:subscribe(C1, <<"TopicA">>, qos0),
+ emqttc:publish(C1, <<"TopicA">>, <<"Payload">>),
+ expect_publishes(<<"TopicA">>, [<<"Payload">>]),
+
+ %% there's one connection
+ assert_connection_count(Config, 10, 2, 1),
+
+ %% connect to the same node (A or 0)
+ {ok, MRef2, _C2} = connect_to_node(Config, 0, ID),
+
+ %% C1 is disconnected
+ await_disconnection(MRef1),
+
+ %% connect to a different node (C or 2)
+ {ok, _, C3} = connect_to_node(Config, 2, ID),
+ assert_connection_count(Config, 10, 2, 1),
+
+ %% C2 is disconnected
+ await_disconnection(MRef2),
+
+ emqttc:disconnect(C3).
+
+connection_id_tracking_on_nodedown(Config) ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ {ok, MRef, C} = connect_to_node(Config, 0, <<"simpleClient">>),
+ emqttc:subscribe(C, <<"TopicA">>, qos0),
+ emqttc:publish(C, <<"TopicA">>, <<"Payload">>),
+ expect_publishes(<<"TopicA">>, [<<"Payload">>]),
+ assert_connection_count(Config, 10, 2, 1),
+ ok = rabbit_ct_broker_helpers:stop_node(Config, Server),
+ await_disconnection(MRef),
+ assert_connection_count(Config, 10, 2, 0),
+ ok.
+
+connection_id_tracking_with_decommissioned_node(Config) ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ {ok, MRef, C} = connect_to_node(Config, 0, <<"simpleClient">>),
+ emqttc:subscribe(C, <<"TopicA">>, qos0),
+ emqttc:publish(C, <<"TopicA">>, <<"Payload">>),
+ expect_publishes(<<"TopicA">>, [<<"Payload">>]),
+
+ assert_connection_count(Config, 10, 2, 1),
+ {ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["decommission_mqtt_node", Server]),
+ await_disconnection(MRef),
+ assert_connection_count(Config, 10, 2, 0),
+ ok.
+
+%%
+%% Helpers
+%%
+
+assert_connection_count(_Config, 0, _, _) ->
+ ct:fail("failed to complete rabbit_mqtt_collector:list/0");
+assert_connection_count(Config, Retries, NodeId, NumElements) ->
+ List = rabbit_ct_broker_helpers:rpc(Config, NodeId, rabbit_mqtt_collector, list, []),
+ case length(List) == NumElements of
+ true ->
+ ok;
+ false ->
+ timer:sleep(200),
+ assert_connection_count(Config, Retries-1, NodeId, NumElements)
+ end.
+
+
+
+connect_to_node(Config, Node, ClientID) ->
+ Port = rabbit_ct_broker_helpers:get_node_config(Config, Node, tcp_port_mqtt),
+ {ok, C} = connect(Port, ClientID),
+ MRef = erlang:monitor(process, C),
+ {ok, MRef, C}.
+
+connect(Port, ClientID) ->
+ {ok, C} = emqttc:start_link([{host, "localhost"},
+ {port, Port},
+ {client_id, ClientID},
+ {proto_ver, 3},
+ {logger, info},
+ {puback_timeout, 1}]),
+ unlink(C),
+ {ok, C}.
+
+await_disconnection(Ref) ->
+ receive
+ {'DOWN', Ref, _, _, _} -> ok
+ after 30000 -> exit(missing_down_message)
+ end.
+
+expect_publishes(_Topic, []) -> ok;
+expect_publishes(Topic, [Payload|Rest]) ->
+ receive
+ {publish, Topic, Payload} -> expect_publishes(Topic, Rest)
+ after 5000 ->
+ throw({publish_not_delivered, Payload})
+ end.
diff --git a/deps/rabbitmq_mqtt/test/command_SUITE.erl b/deps/rabbitmq_mqtt/test/command_SUITE.erl
new file mode 100644
index 0000000000..a15c3789f7
--- /dev/null
+++ b/deps/rabbitmq_mqtt/test/command_SUITE.erl
@@ -0,0 +1,158 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+
+
+-module(command_SUITE).
+-compile([export_all]).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+-include("rabbit_mqtt.hrl").
+
+
+-define(COMMAND, 'Elixir.RabbitMQ.CLI.Ctl.Commands.ListMqttConnectionsCommand').
+
+all() ->
+ [
+ {group, non_parallel_tests}
+ ].
+
+groups() ->
+ [
+ {non_parallel_tests, [], [
+ merge_defaults,
+ run
+ ]}
+ ].
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodename_suffix, ?MODULE},
+ {rmq_extra_tcp_ports, [tcp_port_mqtt_extra,
+ tcp_port_mqtt_tls_extra]},
+ {rmq_nodes_clustered, true},
+ {rmq_nodes_count, 3}
+ ]),
+ rabbit_ct_helpers:run_setup_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_group(_, Config) ->
+ Config.
+
+end_per_group(_, Config) ->
+ Config.
+
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase).
+
+end_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+merge_defaults(_Config) ->
+ {[<<"client_id">>, <<"conn_name">>], #{verbose := false}} =
+ ?COMMAND:merge_defaults([], #{}),
+
+ {[<<"other_key">>], #{verbose := true}} =
+ ?COMMAND:merge_defaults([<<"other_key">>], #{verbose => true}),
+
+ {[<<"other_key">>], #{verbose := false}} =
+ ?COMMAND:merge_defaults([<<"other_key">>], #{verbose => false}).
+
+
+run(Config) ->
+
+ Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ Opts = #{node => Node, timeout => 10000, verbose => false},
+
+ %% No connections
+ [] = 'Elixir.Enum':to_list(?COMMAND:run([], Opts)),
+
+ P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt),
+ {ok, _} = emqttc:start_link([{host, "localhost"},
+ {port, P},
+ {client_id, <<"simpleClient">>},
+ {proto_ver, 3},
+ {logger, info},
+ {puback_timeout, 1}]),
+ ct:sleep(100),
+
+ [[{client_id, <<"simpleClient">>}]] =
+ 'Elixir.Enum':to_list(?COMMAND:run([<<"client_id">>], Opts)),
+
+
+ {ok, _} = emqttc:start_link([{host, "localhost"},
+ {port, P},
+ {client_id, <<"simpleClient1">>},
+ {proto_ver, 3},
+ {logger, info},
+ {username, <<"guest">>},
+ {password, <<"guest">>},
+ {puback_timeout, 1}]),
+ ct:sleep(200),
+
+ [[{client_id, <<"simpleClient">>}, {user, <<"guest">>}],
+ [{client_id, <<"simpleClient1">>}, {user, <<"guest">>}]] =
+ lists:sort(
+ 'Elixir.Enum':to_list(?COMMAND:run([<<"client_id">>, <<"user">>],
+ Opts))),
+
+ Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
+ start_amqp_connection(network, Node, Port),
+
+ %% There are still just two connections
+ [[{client_id, <<"simpleClient">>}],
+ [{client_id, <<"simpleClient1">>}]] =
+ lists:sort('Elixir.Enum':to_list(?COMMAND:run([<<"client_id">>], Opts))),
+
+ start_amqp_connection(direct, Node, Port),
+ ct:sleep(200),
+
+ %% Still two MQTT connections, one direct AMQP 0-9-1 connection
+ [[{client_id, <<"simpleClient">>}],
+ [{client_id, <<"simpleClient1">>}]] =
+ lists:sort('Elixir.Enum':to_list(?COMMAND:run([<<"client_id">>], Opts))),
+
+ %% Verbose returns all keys
+ Infos = lists:map(fun(El) -> atom_to_binary(El, utf8) end, ?INFO_ITEMS),
+ AllKeys1 = 'Elixir.Enum':to_list(?COMMAND:run(Infos, Opts)),
+ AllKeys2 = 'Elixir.Enum':to_list(?COMMAND:run([], Opts#{verbose => true})),
+
+ %% There are two connections
+ [FirstPL, _] = AllKeys1,
+ [SecondPL, _] = AllKeys2,
+
+ First = maps:from_list(lists:usort(FirstPL)),
+ Second = maps:from_list(lists:usort(SecondPL)),
+
+ %% Keys are INFO_ITEMS
+ KeysCount = length(?INFO_ITEMS),
+ ?assert(KeysCount =:= maps:size(First)),
+ ?assert(KeysCount =:= maps:size(Second)),
+
+ Keys = maps:keys(First),
+
+ [] = Keys -- ?INFO_ITEMS,
+ [] = ?INFO_ITEMS -- Keys.
+
+
+start_amqp_connection(Type, Node, Port) ->
+ amqp_connection:start(amqp_params(Type, Node, Port)).
+
+amqp_params(network, _, Port) ->
+ #amqp_params_network{port = Port};
+amqp_params(direct, Node, _) ->
+ #amqp_params_direct{node = Node}.
+
+
+
diff --git a/deps/rabbitmq_mqtt/test/config_schema_SUITE.erl b/deps/rabbitmq_mqtt/test/config_schema_SUITE.erl
new file mode 100644
index 0000000000..c760148cad
--- /dev/null
+++ b/deps/rabbitmq_mqtt/test/config_schema_SUITE.erl
@@ -0,0 +1,55 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2016-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(config_schema_SUITE).
+
+-compile(export_all).
+
+all() ->
+ [
+ run_snippets
+ ].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ Config1 = rabbit_ct_helpers:run_setup_steps(Config),
+ rabbit_ct_config_schema:init_schemas(rabbitmq_mqtt, Config1).
+
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase),
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodename_suffix, Testcase}
+ ]),
+ rabbit_ct_helpers:run_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()).
+
+end_per_testcase(Testcase, Config) ->
+ Config1 = rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()),
+ rabbit_ct_helpers:testcase_finished(Config1, Testcase).
+
+%% -------------------------------------------------------------------
+%% Testcases.
+%% -------------------------------------------------------------------
+
+run_snippets(Config) ->
+ ok = rabbit_ct_broker_helpers:rpc(Config, 0,
+ ?MODULE, run_snippets1, [Config]).
+
+run_snippets1(Config) ->
+ rabbit_ct_config_schema:run_snippets(Config).
+
diff --git a/deps/rabbitmq_mqtt/test/config_schema_SUITE_data/certs/cacert.pem b/deps/rabbitmq_mqtt/test/config_schema_SUITE_data/certs/cacert.pem
new file mode 100644
index 0000000000..eaf6b67806
--- /dev/null
+++ b/deps/rabbitmq_mqtt/test/config_schema_SUITE_data/certs/cacert.pem
@@ -0,0 +1 @@
+I'm not a certificate
diff --git a/deps/rabbitmq_mqtt/test/config_schema_SUITE_data/certs/cert.pem b/deps/rabbitmq_mqtt/test/config_schema_SUITE_data/certs/cert.pem
new file mode 100644
index 0000000000..eaf6b67806
--- /dev/null
+++ b/deps/rabbitmq_mqtt/test/config_schema_SUITE_data/certs/cert.pem
@@ -0,0 +1 @@
+I'm not a certificate
diff --git a/deps/rabbitmq_mqtt/test/config_schema_SUITE_data/certs/key.pem b/deps/rabbitmq_mqtt/test/config_schema_SUITE_data/certs/key.pem
new file mode 100644
index 0000000000..eaf6b67806
--- /dev/null
+++ b/deps/rabbitmq_mqtt/test/config_schema_SUITE_data/certs/key.pem
@@ -0,0 +1 @@
+I'm not a certificate
diff --git a/deps/rabbitmq_mqtt/test/config_schema_SUITE_data/rabbitmq_mqtt.snippets b/deps/rabbitmq_mqtt/test/config_schema_SUITE_data/rabbitmq_mqtt.snippets
new file mode 100644
index 0000000000..032cce01f9
--- /dev/null
+++ b/deps/rabbitmq_mqtt/test/config_schema_SUITE_data/rabbitmq_mqtt.snippets
@@ -0,0 +1,144 @@
+[{defaults,
+ "listeners.tcp.default = 5672
+ mqtt.default_user = guest
+ mqtt.default_pass = guest
+ mqtt.allow_anonymous = true
+ mqtt.vhost = /
+ mqtt.exchange = amq.topic
+ mqtt.subscription_ttl = 1800000
+ mqtt.prefetch = 10
+ mqtt.sparkplug = true
+ mqtt.listeners.ssl = none
+## Default MQTT with TLS port is 8883
+# mqtt.listeners.ssl.default = 8883
+ mqtt.listeners.tcp.default = 1883
+ mqtt.tcp_listen_options.backlog = 128
+ mqtt.tcp_listen_options.nodelay = true
+ mqtt.proxy_protocol = false",
+ [{rabbit,[{tcp_listeners,[5672]}]},
+ {rabbitmq_mqtt,
+ [{default_user,<<"guest">>},
+ {default_pass,<<"guest">>},
+ {allow_anonymous,true},
+ {vhost,<<"/">>},
+ {exchange,<<"amq.topic">>},
+ {subscription_ttl,1800000},
+ {prefetch,10},
+ {sparkplug,true},
+ {ssl_listeners,[]},
+ {tcp_listeners,[1883]},
+ {tcp_listen_options,[{backlog,128},{nodelay,true}]},
+ {proxy_protocol,false}]}],
+ [rabbitmq_mqtt]},
+
+ {listener_tcp_options,
+ "mqtt.listeners.tcp.1 = 127.0.0.1:61613
+ mqtt.listeners.tcp.2 = ::1:61613
+
+ mqtt.tcp_listen_options.backlog = 2048
+ mqtt.tcp_listen_options.recbuf = 8192
+ mqtt.tcp_listen_options.sndbuf = 8192
+
+ mqtt.tcp_listen_options.keepalive = true
+ mqtt.tcp_listen_options.nodelay = true
+
+ mqtt.tcp_listen_options.exit_on_close = true
+
+ mqtt.tcp_listen_options.send_timeout = 120
+",
+ [{rabbitmq_mqtt,[
+ {tcp_listeners,[
+ {"127.0.0.1",61613},
+ {"::1",61613}
+ ]}
+ , {tcp_listen_options, [
+ {backlog, 2048},
+ {exit_on_close, true},
+
+ {recbuf, 8192},
+ {sndbuf, 8192},
+
+ {send_timeout, 120},
+
+ {keepalive, true},
+ {nodelay, true}
+ ]}
+ ]}],
+ [rabbitmq_mqtt]},
+
+
+ {ssl,
+ "ssl_options.cacertfile = test/config_schema_SUITE_data/certs/cacert.pem
+ ssl_options.certfile = test/config_schema_SUITE_data/certs/cert.pem
+ ssl_options.keyfile = test/config_schema_SUITE_data/certs/key.pem
+ ssl_options.verify = verify_peer
+ ssl_options.fail_if_no_peer_cert = true
+
+ mqtt.listeners.ssl.default = 8883
+ mqtt.listeners.tcp.default = 1883",
+ [{rabbit,
+ [{ssl_options,
+ [{cacertfile,"test/config_schema_SUITE_data/certs/cacert.pem"},
+ {certfile,"test/config_schema_SUITE_data/certs/cert.pem"},
+ {keyfile,"test/config_schema_SUITE_data/certs/key.pem"},
+ {verify,verify_peer},
+ {fail_if_no_peer_cert,true}]}]},
+ {rabbitmq_mqtt,[{ssl_listeners,[8883]},{tcp_listeners,[1883]}]}],
+ [rabbitmq_mqtt]},
+ {ssl_cert_login,
+ "mqtt.ssl_cert_login = true",
+ [{rabbitmq_mqtt,[{ssl_cert_login,true}]}],
+ [rabbitmq_mqtt]},
+ {ssl_cert_login_from,
+ "ssl_cert_login_from = common_name",
+ [{rabbit,[{ssl_cert_login_from,common_name}]}],
+ [rabbitmq_mqtt]},
+ {proxy_protocol,
+ "listeners.tcp.default = 5672
+ mqtt.default_user = guest
+ mqtt.default_pass = guest
+ mqtt.allow_anonymous = true
+ mqtt.vhost = /
+ mqtt.exchange = amq.topic
+ mqtt.subscription_ttl = undefined
+ mqtt.prefetch = 10
+ mqtt.proxy_protocol = true",
+ [{rabbit,[{tcp_listeners,[5672]}]},
+ {rabbitmq_mqtt,
+ [{default_user,<<"guest">>},
+ {default_pass,<<"guest">>},
+ {allow_anonymous,true},
+ {vhost,<<"/">>},
+ {exchange,<<"amq.topic">>},
+ {subscription_ttl,undefined},
+ {prefetch,10},
+ {proxy_protocol,true}]}],
+ [rabbitmq_mqtt]},
+ {prefetch_retained_msg_store,
+ "mqtt.default_user = guest
+ mqtt.default_pass = guest
+ mqtt.allow_anonymous = true
+ mqtt.vhost = /
+ mqtt.exchange = amq.topic
+ mqtt.subscription_ttl = 1800000
+ mqtt.prefetch = 10
+## use DETS (disk-based) store for retained messages
+ mqtt.retained_message_store = rabbit_mqtt_retained_msg_store_dets
+## only used by DETS store
+ mqtt.retained_message_store_dets_sync_interval = 2000
+
+ mqtt.listeners.ssl = none
+ mqtt.listeners.tcp.default = 1883",
+ [{rabbitmq_mqtt,
+ [{default_user,<<"guest">>},
+ {default_pass,<<"guest">>},
+ {allow_anonymous,true},
+ {vhost,<<"/">>},
+ {exchange,<<"amq.topic">>},
+ {subscription_ttl,1800000},
+ {prefetch,10},
+ {retained_message_store,rabbit_mqtt_retained_msg_store_dets},
+ {retained_message_store_dets_sync_interval,2000},
+ {ssl_listeners,[]},
+ {tcp_listeners,[1883]}]}],
+ [rabbitmq_mqtt]}].
diff --git a/deps/rabbitmq_mqtt/test/java_SUITE.erl b/deps/rabbitmq_mqtt/test/java_SUITE.erl
new file mode 100644
index 0000000000..34ec8dac19
--- /dev/null
+++ b/deps/rabbitmq_mqtt/test/java_SUITE.erl
@@ -0,0 +1,127 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(java_SUITE).
+-compile([export_all]).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-define(BASE_CONF_RABBIT, {rabbit, [{ssl_options, [{fail_if_no_peer_cert, false}]}]}).
+-define(BASE_CONF_MQTT,
+ {rabbitmq_mqtt, [
+ {ssl_cert_login, true},
+ {allow_anonymous, false},
+ {sparkplug, true},
+ {tcp_listeners, []},
+ {ssl_listeners, []}
+ ]}).
+
+all() ->
+ [
+ {group, non_parallel_tests}
+ ].
+
+groups() ->
+ [
+ {non_parallel_tests, [], [
+ java
+ ]}
+ ].
+
+suite() ->
+ [{timetrap, {seconds, 600}}].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+merge_app_env(Config) ->
+ {ok, Ssl} = q(Config, [erlang_node_config, rabbit, ssl_options]),
+ Ssl1 = lists:keyreplace(fail_if_no_peer_cert, 1, Ssl, {fail_if_no_peer_cert, false}),
+ Config1 = rabbit_ct_helpers:merge_app_env(Config, {rabbit, [{ssl_options, Ssl1}]}),
+ rabbit_ct_helpers:merge_app_env(Config1, ?BASE_CONF_MQTT).
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodename_suffix, ?MODULE},
+ {rmq_certspwd, "bunnychow"},
+ {rmq_nodes_clustered, true},
+ {rmq_nodes_count, 3}
+ ]),
+ rabbit_ct_helpers:run_setup_steps(Config1,
+ [ fun merge_app_env/1 ] ++
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_group(_, Config) ->
+ Config.
+
+end_per_group(_, Config) ->
+ Config.
+
+init_per_testcase(Testcase, Config) ->
+ CertsDir = ?config(rmq_certsdir, Config),
+ CertFile = filename:join([CertsDir, "client", "cert.pem"]),
+ {ok, CertBin} = file:read_file(CertFile),
+ [{'Certificate', Cert, not_encrypted}] = public_key:pem_decode(CertBin),
+ UserBin = rabbit_ct_broker_helpers:rpc(Config, 0,
+ rabbit_ssl,
+ peer_cert_auth_name,
+ [Cert]),
+ User = binary_to_list(UserBin),
+ {ok,_} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["add_user", User, ""]),
+ {ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["set_permissions", "-p", "/", User, ".*", ".*", ".*"]),
+ {ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0,
+ ["set_topic_permissions", "-p", "/", "guest", "amq.topic",
+ % Write permission
+ "test-topic|test-retained-topic|{username}.{client_id}.a|^sp[AB]v\\d+___\\d+",
+ % Read permission
+ "test-topic|test-retained-topic|last-will|{username}.{client_id}.a|^sp[AB]v\\d+___\\d+"]),
+ rabbit_ct_helpers:testcase_started(Config, Testcase).
+
+end_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+
+%% -------------------------------------------------------------------
+%% Testsuite cases
+%% -------------------------------------------------------------------
+
+java(Config) ->
+ CertsDir = rabbit_ct_helpers:get_config(Config, rmq_certsdir),
+ MqttPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt),
+ MqttPort2 = rabbit_ct_broker_helpers:get_node_config(Config, 1, tcp_port_mqtt),
+ MqttPort3 = rabbit_ct_broker_helpers:get_node_config(Config, 2, tcp_port_mqtt),
+ MqttSslPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt_tls),
+ AmqpPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
+ os:putenv("SSL_CERTS_DIR", CertsDir),
+ os:putenv("MQTT_SSL_PORT", erlang:integer_to_list(MqttSslPort)),
+ os:putenv("MQTT_PORT", erlang:integer_to_list(MqttPort)),
+ os:putenv("MQTT_PORT_2", erlang:integer_to_list(MqttPort2)),
+ os:putenv("MQTT_PORT_3", erlang:integer_to_list(MqttPort3)),
+ os:putenv("AMQP_PORT", erlang:integer_to_list(AmqpPort)),
+ DataDir = rabbit_ct_helpers:get_config(Config, data_dir),
+ MakeResult = rabbit_ct_helpers:make(Config, DataDir, ["tests"]),
+ {ok, _} = MakeResult.
+
+rpc(Config, M, F, A) ->
+ rabbit_ct_broker_helpers:rpc(Config, 0, M, F, A).
+
+q(P, [K | Rem]) ->
+ case proplists:get_value(K, P) of
+ undefined -> undefined;
+ V -> q(V, Rem)
+ end;
+q(P, []) -> {ok, P}.
+
diff --git a/deps/rabbitmq_mqtt/test/java_SUITE_data/.gitignore b/deps/rabbitmq_mqtt/test/java_SUITE_data/.gitignore
new file mode 100644
index 0000000000..4c70cdb707
--- /dev/null
+++ b/deps/rabbitmq_mqtt/test/java_SUITE_data/.gitignore
@@ -0,0 +1,3 @@
+/build/
+/lib/
+/target/
diff --git a/deps/rabbitmq_mqtt/test/java_SUITE_data/.mvn/wrapper/MavenWrapperDownloader.java b/deps/rabbitmq_mqtt/test/java_SUITE_data/.mvn/wrapper/MavenWrapperDownloader.java
new file mode 100755
index 0000000000..2e394d5b34
--- /dev/null
+++ b/deps/rabbitmq_mqtt/test/java_SUITE_data/.mvn/wrapper/MavenWrapperDownloader.java
@@ -0,0 +1,110 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ https://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+import java.net.*;
+import java.io.*;
+import java.nio.channels.*;
+import java.util.Properties;
+
+public class MavenWrapperDownloader {
+
+ /**
+ * Default URL to download the maven-wrapper.jar from, if no 'downloadUrl' is provided.
+ */
+ private static final String DEFAULT_DOWNLOAD_URL =
+ "https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.4.2/maven-wrapper-0.4.2.jar";
+
+ /**
+ * Path to the maven-wrapper.properties file, which might contain a downloadUrl property to
+ * use instead of the default one.
+ */
+ private static final String MAVEN_WRAPPER_PROPERTIES_PATH =
+ ".mvn/wrapper/maven-wrapper.properties";
+
+ /**
+ * Path where the maven-wrapper.jar will be saved to.
+ */
+ private static final String MAVEN_WRAPPER_JAR_PATH =
+ ".mvn/wrapper/maven-wrapper.jar";
+
+ /**
+ * Name of the property which should be used to override the default download url for the wrapper.
+ */
+ private static final String PROPERTY_NAME_WRAPPER_URL = "wrapperUrl";
+
+ public static void main(String args[]) {
+ System.out.println("- Downloader started");
+ File baseDirectory = new File(args[0]);
+ System.out.println("- Using base directory: " + baseDirectory.getAbsolutePath());
+
+ // If the maven-wrapper.properties exists, read it and check if it contains a custom
+ // wrapperUrl parameter.
+ File mavenWrapperPropertyFile = new File(baseDirectory, MAVEN_WRAPPER_PROPERTIES_PATH);
+ String url = DEFAULT_DOWNLOAD_URL;
+ if(mavenWrapperPropertyFile.exists()) {
+ FileInputStream mavenWrapperPropertyFileInputStream = null;
+ try {
+ mavenWrapperPropertyFileInputStream = new FileInputStream(mavenWrapperPropertyFile);
+ Properties mavenWrapperProperties = new Properties();
+ mavenWrapperProperties.load(mavenWrapperPropertyFileInputStream);
+ url = mavenWrapperProperties.getProperty(PROPERTY_NAME_WRAPPER_URL, url);
+ } catch (IOException e) {
+ System.out.println("- ERROR loading '" + MAVEN_WRAPPER_PROPERTIES_PATH + "'");
+ } finally {
+ try {
+ if(mavenWrapperPropertyFileInputStream != null) {
+ mavenWrapperPropertyFileInputStream.close();
+ }
+ } catch (IOException e) {
+ // Ignore ...
+ }
+ }
+ }
+ System.out.println("- Downloading from: : " + url);
+
+ File outputFile = new File(baseDirectory.getAbsolutePath(), MAVEN_WRAPPER_JAR_PATH);
+ if(!outputFile.getParentFile().exists()) {
+ if(!outputFile.getParentFile().mkdirs()) {
+ System.out.println(
+ "- ERROR creating output direcrory '" + outputFile.getParentFile().getAbsolutePath() + "'");
+ }
+ }
+ System.out.println("- Downloading to: " + outputFile.getAbsolutePath());
+ try {
+ downloadFileFromURL(url, outputFile);
+ System.out.println("Done");
+ System.exit(0);
+ } catch (Throwable e) {
+ System.out.println("- Error downloading");
+ e.printStackTrace();
+ System.exit(1);
+ }
+ }
+
+ private static void downloadFileFromURL(String urlString, File destination) throws Exception {
+ URL website = new URL(urlString);
+ ReadableByteChannel rbc;
+ rbc = Channels.newChannel(website.openStream());
+ FileOutputStream fos = new FileOutputStream(destination);
+ fos.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE);
+ fos.close();
+ rbc.close();
+ }
+
+}
diff --git a/deps/rabbitmq_mqtt/test/java_SUITE_data/.mvn/wrapper/maven-wrapper.jar b/deps/rabbitmq_mqtt/test/java_SUITE_data/.mvn/wrapper/maven-wrapper.jar
new file mode 100755
index 0000000000..01e6799737
--- /dev/null
+++ b/deps/rabbitmq_mqtt/test/java_SUITE_data/.mvn/wrapper/maven-wrapper.jar
Binary files differ
diff --git a/deps/rabbitmq_mqtt/test/java_SUITE_data/.mvn/wrapper/maven-wrapper.properties b/deps/rabbitmq_mqtt/test/java_SUITE_data/.mvn/wrapper/maven-wrapper.properties
new file mode 100755
index 0000000000..00d32aab1d
--- /dev/null
+++ b/deps/rabbitmq_mqtt/test/java_SUITE_data/.mvn/wrapper/maven-wrapper.properties
@@ -0,0 +1 @@
+distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.5.4/apache-maven-3.5.4-bin.zip \ No newline at end of file
diff --git a/deps/rabbitmq_mqtt/test/java_SUITE_data/Makefile b/deps/rabbitmq_mqtt/test/java_SUITE_data/Makefile
new file mode 100644
index 0000000000..e2f9748eb2
--- /dev/null
+++ b/deps/rabbitmq_mqtt/test/java_SUITE_data/Makefile
@@ -0,0 +1,27 @@
+export PATH :=$(CURDIR):$(PATH)
+HOSTNAME := $(shell hostname)
+MVN_FLAGS += -Ddeps.dir="$(abspath $(DEPS_DIR))" \
+ -Dhostname=$(HOSTNAME) \
+ -Dcerts.dir=$(SSL_CERTS_DIR) \
+ -Dmqtt.ssl.port=$(MQTT_SSL_PORT) \
+ -Dmqtt.port=$(MQTT_PORT) \
+ -Dmqtt.port.2=$(MQTT_PORT_2) \
+ -Dmqtt.port.3=$(MQTT_PORT_3) \
+ -Damqp.port=$(AMQP_PORT)
+
+.PHONY: deps tests clean distclean
+
+deps:
+ mkdir -p lib
+ @mvnw dependency:copy-dependencies -DoutputDirectory=lib
+
+tests:
+ # Note: to run a single test
+ # @mvnw -q $(MVN_FLAGS) -Dtest=MqttTest#subscribeMultiple test
+ @mvnw -q $(MVN_FLAGS) test
+
+clean:
+ @mvnw clean
+
+distclean: clean
+ rm -f lib/*.jar
diff --git a/deps/rabbitmq_mqtt/test/java_SUITE_data/mvnw b/deps/rabbitmq_mqtt/test/java_SUITE_data/mvnw
new file mode 100755
index 0000000000..8b9da3b8b6
--- /dev/null
+++ b/deps/rabbitmq_mqtt/test/java_SUITE_data/mvnw
@@ -0,0 +1,286 @@
+#!/bin/sh
+# ----------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# ----------------------------------------------------------------------------
+
+# ----------------------------------------------------------------------------
+# Maven2 Start Up Batch script
+#
+# Required ENV vars:
+# ------------------
+# JAVA_HOME - location of a JDK home dir
+#
+# Optional ENV vars
+# -----------------
+# M2_HOME - location of maven2's installed home dir
+# MAVEN_OPTS - parameters passed to the Java VM when running Maven
+# e.g. to debug Maven itself, use
+# set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
+# MAVEN_SKIP_RC - flag to disable loading of mavenrc files
+# ----------------------------------------------------------------------------
+
+if [ -z "$MAVEN_SKIP_RC" ] ; then
+
+ if [ -f /etc/mavenrc ] ; then
+ . /etc/mavenrc
+ fi
+
+ if [ -f "$HOME/.mavenrc" ] ; then
+ . "$HOME/.mavenrc"
+ fi
+
+fi
+
+# OS specific support. $var _must_ be set to either true or false.
+cygwin=false;
+darwin=false;
+mingw=false
+case "`uname`" in
+ CYGWIN*) cygwin=true ;;
+ MINGW*) mingw=true;;
+ Darwin*) darwin=true
+ # Use /usr/libexec/java_home if available, otherwise fall back to /Library/Java/Home
+ # See https://developer.apple.com/library/mac/qa/qa1170/_index.html
+ if [ -z "$JAVA_HOME" ]; then
+ if [ -x "/usr/libexec/java_home" ]; then
+ export JAVA_HOME="`/usr/libexec/java_home`"
+ else
+ export JAVA_HOME="/Library/Java/Home"
+ fi
+ fi
+ ;;
+esac
+
+if [ -z "$JAVA_HOME" ] ; then
+ if [ -r /etc/gentoo-release ] ; then
+ JAVA_HOME=`java-config --jre-home`
+ fi
+fi
+
+if [ -z "$M2_HOME" ] ; then
+ ## resolve links - $0 may be a link to maven's home
+ PRG="$0"
+
+ # need this for relative symlinks
+ while [ -h "$PRG" ] ; do
+ ls=`ls -ld "$PRG"`
+ link=`expr "$ls" : '.*-> \(.*\)$'`
+ if expr "$link" : '/.*' > /dev/null; then
+ PRG="$link"
+ else
+ PRG="`dirname "$PRG"`/$link"
+ fi
+ done
+
+ saveddir=`pwd`
+
+ M2_HOME=`dirname "$PRG"`/..
+
+ # make it fully qualified
+ M2_HOME=`cd "$M2_HOME" && pwd`
+
+ cd "$saveddir"
+ # echo Using m2 at $M2_HOME
+fi
+
+# For Cygwin, ensure paths are in UNIX format before anything is touched
+if $cygwin ; then
+ [ -n "$M2_HOME" ] &&
+ M2_HOME=`cygpath --unix "$M2_HOME"`
+ [ -n "$JAVA_HOME" ] &&
+ JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
+ [ -n "$CLASSPATH" ] &&
+ CLASSPATH=`cygpath --path --unix "$CLASSPATH"`
+fi
+
+# For Mingw, ensure paths are in UNIX format before anything is touched
+if $mingw ; then
+ [ -n "$M2_HOME" ] &&
+ M2_HOME="`(cd "$M2_HOME"; pwd)`"
+ [ -n "$JAVA_HOME" ] &&
+ JAVA_HOME="`(cd "$JAVA_HOME"; pwd)`"
+ # TODO classpath?
+fi
+
+if [ -z "$JAVA_HOME" ]; then
+ javaExecutable="`which javac`"
+ if [ -n "$javaExecutable" ] && ! [ "`expr \"$javaExecutable\" : '\([^ ]*\)'`" = "no" ]; then
+ # readlink(1) is not available as standard on Solaris 10.
+ readLink=`which readlink`
+ if [ ! `expr "$readLink" : '\([^ ]*\)'` = "no" ]; then
+ if $darwin ; then
+ javaHome="`dirname \"$javaExecutable\"`"
+ javaExecutable="`cd \"$javaHome\" && pwd -P`/javac"
+ else
+ javaExecutable="`readlink -f \"$javaExecutable\"`"
+ fi
+ javaHome="`dirname \"$javaExecutable\"`"
+ javaHome=`expr "$javaHome" : '\(.*\)/bin'`
+ JAVA_HOME="$javaHome"
+ export JAVA_HOME
+ fi
+ fi
+fi
+
+if [ -z "$JAVACMD" ] ; then
+ if [ -n "$JAVA_HOME" ] ; then
+ if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
+ # IBM's JDK on AIX uses strange locations for the executables
+ JAVACMD="$JAVA_HOME/jre/sh/java"
+ else
+ JAVACMD="$JAVA_HOME/bin/java"
+ fi
+ else
+ JAVACMD="`which java`"
+ fi
+fi
+
+if [ ! -x "$JAVACMD" ] ; then
+ echo "Error: JAVA_HOME is not defined correctly." >&2
+ echo " We cannot execute $JAVACMD" >&2
+ exit 1
+fi
+
+if [ -z "$JAVA_HOME" ] ; then
+ echo "Warning: JAVA_HOME environment variable is not set."
+fi
+
+CLASSWORLDS_LAUNCHER=org.codehaus.plexus.classworlds.launcher.Launcher
+
+# traverses directory structure from process work directory to filesystem root
+# first directory with .mvn subdirectory is considered project base directory
+find_maven_basedir() {
+
+ if [ -z "$1" ]
+ then
+ echo "Path not specified to find_maven_basedir"
+ return 1
+ fi
+
+ basedir="$1"
+ wdir="$1"
+ while [ "$wdir" != '/' ] ; do
+ if [ -d "$wdir"/.mvn ] ; then
+ basedir=$wdir
+ break
+ fi
+ # workaround for JBEAP-8937 (on Solaris 10/Sparc)
+ if [ -d "${wdir}" ]; then
+ wdir=`cd "$wdir/.."; pwd`
+ fi
+ # end of workaround
+ done
+ echo "${basedir}"
+}
+
+# concatenates all lines of a file
+concat_lines() {
+ if [ -f "$1" ]; then
+ echo "$(tr -s '\n' ' ' < "$1")"
+ fi
+}
+
+BASE_DIR=`find_maven_basedir "$(pwd)"`
+if [ -z "$BASE_DIR" ]; then
+ exit 1;
+fi
+
+##########################################################################################
+# Extension to allow automatically downloading the maven-wrapper.jar from Maven-central
+# This allows using the maven wrapper in projects that prohibit checking in binary data.
+##########################################################################################
+if [ -r "$BASE_DIR/.mvn/wrapper/maven-wrapper.jar" ]; then
+ if [ "$MVNW_VERBOSE" = true ]; then
+ echo "Found .mvn/wrapper/maven-wrapper.jar"
+ fi
+else
+ if [ "$MVNW_VERBOSE" = true ]; then
+ echo "Couldn't find .mvn/wrapper/maven-wrapper.jar, downloading it ..."
+ fi
+ jarUrl="https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.4.2/maven-wrapper-0.4.2.jar"
+ while IFS="=" read key value; do
+ case "$key" in (wrapperUrl) jarUrl="$value"; break ;;
+ esac
+ done < "$BASE_DIR/.mvn/wrapper/maven-wrapper.properties"
+ if [ "$MVNW_VERBOSE" = true ]; then
+ echo "Downloading from: $jarUrl"
+ fi
+ wrapperJarPath="$BASE_DIR/.mvn/wrapper/maven-wrapper.jar"
+
+ if command -v wget > /dev/null; then
+ if [ "$MVNW_VERBOSE" = true ]; then
+ echo "Found wget ... using wget"
+ fi
+ wget "$jarUrl" -O "$wrapperJarPath"
+ elif command -v curl > /dev/null; then
+ if [ "$MVNW_VERBOSE" = true ]; then
+ echo "Found curl ... using curl"
+ fi
+ curl -o "$wrapperJarPath" "$jarUrl"
+ else
+ if [ "$MVNW_VERBOSE" = true ]; then
+ echo "Falling back to using Java to download"
+ fi
+ javaClass="$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.java"
+ if [ -e "$javaClass" ]; then
+ if [ ! -e "$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.class" ]; then
+ if [ "$MVNW_VERBOSE" = true ]; then
+ echo " - Compiling MavenWrapperDownloader.java ..."
+ fi
+ # Compiling the Java class
+ ("$JAVA_HOME/bin/javac" "$javaClass")
+ fi
+ if [ -e "$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.class" ]; then
+ # Running the downloader
+ if [ "$MVNW_VERBOSE" = true ]; then
+ echo " - Running MavenWrapperDownloader.java ..."
+ fi
+ ("$JAVA_HOME/bin/java" -cp .mvn/wrapper MavenWrapperDownloader "$MAVEN_PROJECTBASEDIR")
+ fi
+ fi
+ fi
+fi
+##########################################################################################
+# End of extension
+##########################################################################################
+
+export MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-"$BASE_DIR"}
+if [ "$MVNW_VERBOSE" = true ]; then
+ echo $MAVEN_PROJECTBASEDIR
+fi
+MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS"
+
+# For Cygwin, switch paths to Windows format before running java
+if $cygwin; then
+ [ -n "$M2_HOME" ] &&
+ M2_HOME=`cygpath --path --windows "$M2_HOME"`
+ [ -n "$JAVA_HOME" ] &&
+ JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"`
+ [ -n "$CLASSPATH" ] &&
+ CLASSPATH=`cygpath --path --windows "$CLASSPATH"`
+ [ -n "$MAVEN_PROJECTBASEDIR" ] &&
+ MAVEN_PROJECTBASEDIR=`cygpath --path --windows "$MAVEN_PROJECTBASEDIR"`
+fi
+
+WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
+
+exec "$JAVACMD" \
+ $MAVEN_OPTS \
+ -classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \
+ "-Dmaven.home=${M2_HOME}" "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \
+ ${WRAPPER_LAUNCHER} $MAVEN_CONFIG "$@"
diff --git a/deps/rabbitmq_mqtt/test/java_SUITE_data/mvnw.cmd b/deps/rabbitmq_mqtt/test/java_SUITE_data/mvnw.cmd
new file mode 100755
index 0000000000..a5284c7939
--- /dev/null
+++ b/deps/rabbitmq_mqtt/test/java_SUITE_data/mvnw.cmd
@@ -0,0 +1,161 @@
+@REM ----------------------------------------------------------------------------
+@REM Licensed to the Apache Software Foundation (ASF) under one
+@REM or more contributor license agreements. See the NOTICE file
+@REM distributed with this work for additional information
+@REM regarding copyright ownership. The ASF licenses this file
+@REM to you under the Apache License, Version 2.0 (the
+@REM "License"); you may not use this file except in compliance
+@REM with the License. You may obtain a copy of the License at
+@REM
+@REM https://www.apache.org/licenses/LICENSE-2.0
+@REM
+@REM Unless required by applicable law or agreed to in writing,
+@REM software distributed under the License is distributed on an
+@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+@REM KIND, either express or implied. See the License for the
+@REM specific language governing permissions and limitations
+@REM under the License.
+@REM ----------------------------------------------------------------------------
+
+@REM ----------------------------------------------------------------------------
+@REM Maven2 Start Up Batch script
+@REM
+@REM Required ENV vars:
+@REM JAVA_HOME - location of a JDK home dir
+@REM
+@REM Optional ENV vars
+@REM M2_HOME - location of maven2's installed home dir
+@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands
+@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a key stroke before ending
+@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven
+@REM e.g. to debug Maven itself, use
+@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
+@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files
+@REM ----------------------------------------------------------------------------
+
+@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on'
+@echo off
+@REM set title of command window
+title %0
+@REM enable echoing my setting MAVEN_BATCH_ECHO to 'on'
+@if "%MAVEN_BATCH_ECHO%" == "on" echo %MAVEN_BATCH_ECHO%
+
+@REM set %HOME% to equivalent of $HOME
+if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%")
+
+@REM Execute a user defined script before this one
+if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre
+@REM check for pre script, once with legacy .bat ending and once with .cmd ending
+if exist "%HOME%\mavenrc_pre.bat" call "%HOME%\mavenrc_pre.bat"
+if exist "%HOME%\mavenrc_pre.cmd" call "%HOME%\mavenrc_pre.cmd"
+:skipRcPre
+
+@setlocal
+
+set ERROR_CODE=0
+
+@REM To isolate internal variables from possible post scripts, we use another setlocal
+@setlocal
+
+@REM ==== START VALIDATION ====
+if not "%JAVA_HOME%" == "" goto OkJHome
+
+echo.
+echo Error: JAVA_HOME not found in your environment. >&2
+echo Please set the JAVA_HOME variable in your environment to match the >&2
+echo location of your Java installation. >&2
+echo.
+goto error
+
+:OkJHome
+if exist "%JAVA_HOME%\bin\java.exe" goto init
+
+echo.
+echo Error: JAVA_HOME is set to an invalid directory. >&2
+echo JAVA_HOME = "%JAVA_HOME%" >&2
+echo Please set the JAVA_HOME variable in your environment to match the >&2
+echo location of your Java installation. >&2
+echo.
+goto error
+
+@REM ==== END VALIDATION ====
+
+:init
+
+@REM Find the project base dir, i.e. the directory that contains the folder ".mvn".
+@REM Fallback to current working directory if not found.
+
+set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR%
+IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir
+
+set EXEC_DIR=%CD%
+set WDIR=%EXEC_DIR%
+:findBaseDir
+IF EXIST "%WDIR%"\.mvn goto baseDirFound
+cd ..
+IF "%WDIR%"=="%CD%" goto baseDirNotFound
+set WDIR=%CD%
+goto findBaseDir
+
+:baseDirFound
+set MAVEN_PROJECTBASEDIR=%WDIR%
+cd "%EXEC_DIR%"
+goto endDetectBaseDir
+
+:baseDirNotFound
+set MAVEN_PROJECTBASEDIR=%EXEC_DIR%
+cd "%EXEC_DIR%"
+
+:endDetectBaseDir
+
+IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig
+
+@setlocal EnableExtensions EnableDelayedExpansion
+for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! %%a
+@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS%
+
+:endReadAdditionalConfig
+
+SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe"
+set WRAPPER_JAR="%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.jar"
+set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
+
+set DOWNLOAD_URL="https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.4.2/maven-wrapper-0.4.2.jar"
+FOR /F "tokens=1,2 delims==" %%A IN (%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.properties) DO (
+ IF "%%A"=="wrapperUrl" SET DOWNLOAD_URL=%%B
+)
+
+@REM Extension to allow automatically downloading the maven-wrapper.jar from Maven-central
+@REM This allows using the maven wrapper in projects that prohibit checking in binary data.
+if exist %WRAPPER_JAR% (
+ echo Found %WRAPPER_JAR%
+) else (
+ echo Couldn't find %WRAPPER_JAR%, downloading it ...
+ echo Downloading from: %DOWNLOAD_URL%
+ powershell -Command "(New-Object Net.WebClient).DownloadFile('%DOWNLOAD_URL%', '%WRAPPER_JAR%')"
+ echo Finished downloading %WRAPPER_JAR%
+)
+@REM End of extension
+
+%MAVEN_JAVA_EXE% %JVM_CONFIG_MAVEN_PROPS% %MAVEN_OPTS% %MAVEN_DEBUG_OPTS% -classpath %WRAPPER_JAR% "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" %WRAPPER_LAUNCHER% %MAVEN_CONFIG% %*
+if ERRORLEVEL 1 goto error
+goto end
+
+:error
+set ERROR_CODE=1
+
+:end
+@endlocal & set ERROR_CODE=%ERROR_CODE%
+
+if not "%MAVEN_SKIP_RC%" == "" goto skipRcPost
+@REM check for post script, once with legacy .bat ending and once with .cmd ending
+if exist "%HOME%\mavenrc_post.bat" call "%HOME%\mavenrc_post.bat"
+if exist "%HOME%\mavenrc_post.cmd" call "%HOME%\mavenrc_post.cmd"
+:skipRcPost
+
+@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on'
+if "%MAVEN_BATCH_PAUSE%" == "on" pause
+
+if "%MAVEN_TERMINATE_CMD%" == "on" exit %ERROR_CODE%
+
+exit /B %ERROR_CODE%
diff --git a/deps/rabbitmq_mqtt/test/java_SUITE_data/pom.xml b/deps/rabbitmq_mqtt/test/java_SUITE_data/pom.xml
new file mode 100644
index 0000000000..b27b58c172
--- /dev/null
+++ b/deps/rabbitmq_mqtt/test/java_SUITE_data/pom.xml
@@ -0,0 +1,137 @@
+<?xml version="1.0"?>
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"
+ xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>com.rabbitmq</groupId>
+ <artifactId>amqp-client-mqtt</artifactId>
+ <version>3.8.0-SNAPSHOT</version>
+ <packaging>jar</packaging>
+
+ <name>RabbitMQ MQTT plugin dependencies list</name>
+ <description>Fetches test dependencies only.</description>
+ <url>https://www.rabbitmq.com</url>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.eclipse.paho</groupId>
+ <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
+ <version>[1.2.1,)</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.rabbitmq</groupId>
+ <artifactId>amqp-client</artifactId>
+ <version>5.7.3</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter</artifactId>
+ <version>5.5.2</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <properties>
+ <test-keystore.ca>${project.build.directory}/ca.keystore</test-keystore.ca>
+ <test-keystore.password>bunnychow</test-keystore.password>
+ <groovy-scripts.dir>${basedir}/src/test/scripts</groovy-scripts.dir>
+ </properties>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.22.2</version>
+ <configuration>
+ <environmentVariables>
+ <DEPS_DIR>${deps.dir}</DEPS_DIR>
+ </environmentVariables>
+ <systemPropertyVariables>
+ <hostname>${hostname}</hostname>
+ <certs.dir>${certs.dir}</certs.dir>
+ <mqtt.ssl.port>${mqtt.ssl.port}</mqtt.ssl.port>
+ <mqtt.port>${mqtt.port}</mqtt.port>
+ <mqtt.port.2>${mqtt.port.2}</mqtt.port.2>
+ <amqp.port>${amqp.port}</amqp.port>
+
+ <test-keystore.ca>${test-keystore.ca}</test-keystore.ca>
+ <test-keystore.password>${test-keystore.password}</test-keystore.password>
+ <test-client-cert.path>${certs.dir}/client/keycert.p12</test-client-cert.path>
+ <test-client-cert.password>bunnychow</test-client-cert.password>
+
+ </systemPropertyVariables>
+ <!--
+ needed because of bug in OpenJDK 8 u181 on Debian distros
+ see https://stackoverflow.com/questions/53010200/maven-surefire-could-not-find-forkedbooter-class
+ -->
+ <argLine>-Djdk.net.URLClassPath.disableClassPathURLCheck=true</argLine>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.gmaven</groupId>
+ <artifactId>groovy-maven-plugin</artifactId>
+ <version>2.1.1</version>
+ <dependencies>
+ <dependency>
+ <groupId>org.codehaus.groovy</groupId>
+ <artifactId>groovy-all</artifactId>
+ <version>2.4.17</version>
+ </dependency>
+ </dependencies>
+ <executions>
+ <execution>
+ <phase>generate-test-resources</phase>
+ <id>remove-old-test-keystores</id>
+ <goals>
+ <goal>execute</goal>
+ </goals>
+ <configuration>
+ <source>
+ ${groovy-scripts.dir}/remove_old_test_keystores.groovy
+ </source>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>keytool-maven-plugin</artifactId>
+ <version>1.5</version>
+ <executions>
+ <execution>
+ <id>generate-test-ca-keystore</id>
+ <phase>generate-test-resources</phase>
+ <goals>
+ <goal>importCertificate</goal>
+ </goals>
+ <configuration>
+ <file>${certs.dir}/testca/cacert.pem</file>
+ <keystore>${test-keystore.ca}</keystore>
+ <storepass>${test-keystore.password}</storepass>
+ <noprompt>true</noprompt>
+ <alias>server1</alias>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.8.1</version>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ <compilerArgs>
+ <arg>-Xlint:deprecation</arg>
+ <arg>-Xlint:unchecked</arg>
+ </compilerArgs>
+ </configuration>
+ </plugin>
+
+ </plugins>
+ </build>
+</project>
diff --git a/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test.config b/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test.config
new file mode 100644
index 0000000000..3d6bafff86
--- /dev/null
+++ b/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test.config
@@ -0,0 +1,14 @@
+[{rabbitmq_mqtt, [
+ {ssl_cert_login, true},
+ {allow_anonymous, true},
+ {tcp_listeners, [1883]},
+ {ssl_listeners, [8883]}
+ ]},
+ {rabbit, [{ssl_options, [{cacertfile,"%%CERTS_DIR%%/testca/cacert.pem"},
+ {certfile,"%%CERTS_DIR%%/server/cert.pem"},
+ {keyfile,"%%CERTS_DIR%%/server/key.pem"},
+ {verify,verify_peer},
+ {fail_if_no_peer_cert,false}
+ ]}
+ ]}
+].
diff --git a/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/MqttTest.java b/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/MqttTest.java
new file mode 100644
index 0000000000..24c4a0be14
--- /dev/null
+++ b/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/MqttTest.java
@@ -0,0 +1,1030 @@
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at https://mozilla.org/MPL/2.0/.
+//
+// Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+//
+
+package com.rabbitmq.mqtt.test;
+
+import com.rabbitmq.client.*;
+import org.eclipse.paho.client.mqttv3.*;
+import org.eclipse.paho.client.mqttv3.internal.NetworkModule;
+import org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule;
+import org.eclipse.paho.client.mqttv3.internal.wire.MqttPingReq;
+import org.eclipse.paho.client.mqttv3.internal.wire.MqttWireMessage;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+
+import javax.net.SocketFactory;
+import java.io.*;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BooleanSupplier;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+/***
+ * MQTT v3.1 tests
+ *
+ */
+
+public class MqttTest implements MqttCallback {
+
+ private static final Duration EXPECT_TIMEOUT = Duration.ofSeconds(10);
+
+ private final String host = "localhost";
+ private final String brokerUrl = "tcp://" + host + ":" + getPort();
+ private final String brokerThreeUrl = "tcp://" + host + ":" + getThirdPort();
+ private volatile List<MqttMessage> receivedMessages;
+
+ private final byte[] payload = "payload".getBytes();
+ private final String topic = "test-topic";
+ private final String retainedTopic = "test-retained-topic";
+ private int testDelay = 2000;
+
+ private volatile long lastReceipt;
+ private volatile boolean expectConnectionFailure;
+ private volatile boolean failOnDelivery = false;
+
+ private Connection conn;
+ private Channel ch;
+
+ private static int getPort() {
+ Object port = System.getProperty("mqtt.port", "1883");
+ assertNotNull(port);
+ return Integer.parseInt(port.toString());
+ }
+
+ private static int getThirdPort() {
+ Object port = System.getProperty("mqtt.port.3", "1883");
+ assertNotNull(port);
+ return Integer.parseInt(port.toString());
+ }
+
+ private static int getAmqpPort() {
+ Object port = System.getProperty("amqp.port", "5672");
+ assertNotNull(port);
+ return Integer.parseInt(port.toString());
+ }
+
+ // override the 10s limit
+ private class TestMqttConnectOptions extends MqttConnectOptions {
+ private int keepAliveInterval = 60;
+ private final String user_name = "guest";
+ private final String password = "guest";
+
+ public TestMqttConnectOptions() {
+ super.setUserName(user_name);
+ super.setPassword(password.toCharArray());
+ super.setCleanSession(true);
+ super.setKeepAliveInterval(60);
+ // PublishMultiple overwhelms Paho defaults
+ super.setMaxInflight(15000);
+ }
+
+ @Override
+ public void setKeepAliveInterval(int keepAliveInterval) {
+ this.keepAliveInterval = keepAliveInterval;
+ }
+
+ @Override
+ public int getKeepAliveInterval() {
+ return this.keepAliveInterval;
+ }
+ }
+
+ private MqttClient newClient(TestInfo testInfo) throws MqttException {
+ return newClient(clientId(testInfo));
+ }
+
+ private MqttClient newClient(String client_id) throws MqttException {
+ return newClient(brokerUrl, client_id);
+ }
+
+ private MqttClient newClient(String uri, TestInfo testInfo) throws MqttException {
+ return newClient(uri, clientId(testInfo));
+ }
+
+ private MqttClient newClient(String uri, String client_id) throws MqttException {
+ return new MqttClient(uri, client_id, null);
+ }
+
+ private MqttClient newConnectedClient(TestInfo testInfo, MqttConnectOptions conOpt) throws MqttException {
+ return newConnectedClient(clientId(testInfo), conOpt);
+ }
+
+ private MqttClient newConnectedClient(String client_id, MqttConnectOptions conOpt) throws MqttException {
+ MqttClient client = newClient(brokerUrl, client_id);
+ client.connect(conOpt);
+ return client;
+ }
+
+ private static String clientId(TestInfo info) {
+ return "test-" + info.getTestMethod().get().getName();
+ }
+
+ private void disconnect(MqttClient client) {
+ try {
+ if (client.isConnected()) {
+ client.disconnect(5000);
+ }
+ } catch (Exception ignored) {}
+ }
+
+ @BeforeEach
+ public void setUp() {
+ receivedMessages = Collections.synchronizedList(new ArrayList<>());
+ expectConnectionFailure = false;
+ }
+
+ @AfterEach
+ public void tearDown() {
+ // clean any sticky sessions
+ receivedMessages.clear();
+ }
+
+ private void setUpAmqp() throws IOException, TimeoutException {
+ int port = getAmqpPort();
+ ConnectionFactory cf = new ConnectionFactory();
+ cf.setHost(host);
+ cf.setPort(port);
+ conn = cf.newConnection();
+ ch = conn.createChannel();
+ }
+
+ private void tearDownAmqp() throws IOException {
+ if (conn.isOpen()) {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void connectFirst() throws MqttException, IOException {
+ NetworkModule networkModule = new TCPNetworkModule(SocketFactory.getDefault(), host, getPort(), "");
+ networkModule.start();
+ DataInputStream in = new DataInputStream(networkModule.getInputStream());
+ OutputStream out = networkModule.getOutputStream();
+
+ MqttWireMessage message = new MqttPingReq();
+
+ try {
+ // ---8<---
+ // Copy/pasted from write() in MqttOutputStream.java.
+ byte[] bytes = message.getHeader();
+ byte[] pl = message.getPayload();
+ out.write(bytes,0,bytes.length);
+
+ int offset = 0;
+ int chunckSize = 1024;
+ while (offset < pl.length) {
+ int length = Math.min(chunckSize, pl.length - offset);
+ out.write(pl, offset, length);
+ offset += chunckSize;
+ }
+ // ---8<---
+
+ // ---8<---
+ // Copy/pasted from flush() in MqttOutputStream.java.
+ out.flush();
+ // ---8<---
+
+ // ---8<---
+ // Copy/pasted from readMqttWireMessage() in MqttInputStream.java.
+ ByteArrayOutputStream bais = new ByteArrayOutputStream();
+ byte first = in.readByte();
+ // ---8<---
+
+ fail("Error expected if CONNECT is not first packet");
+ } catch (IOException ignored) {}
+ }
+
+ @Test public void invalidUser(TestInfo info) throws MqttException {
+ MqttConnectOptions client_opts = new TestMqttConnectOptions();
+ client_opts.setUserName("invalid-user");
+ MqttClient client = newClient(info);
+ try {
+ client.connect(client_opts);
+ fail("Authentication failure expected");
+ } catch (MqttException ex) {
+ assertEquals(MqttException.REASON_CODE_FAILED_AUTHENTICATION, ex.getReasonCode());
+ } finally {
+ if (client.isConnected()) {
+ disconnect(client);
+ }
+ }
+ }
+
+ // rabbitmq/rabbitmq-mqtt#37: QoS 1, clean session = false
+ @Test public void qos1AndCleanSessionUnset()
+ throws MqttException, IOException, TimeoutException, InterruptedException {
+ testQueuePropertiesWithCleanSessionUnset("qos1-no-clean-session", 1, true, false);
+ }
+
+ protected void testQueuePropertiesWithCleanSessionSet(String cid, int qos, boolean durable, boolean autoDelete)
+ throws IOException, MqttException, TimeoutException, InterruptedException {
+ testQueuePropertiesWithCleanSession(true, cid, qos, durable, autoDelete);
+ }
+
+ protected void testQueuePropertiesWithCleanSessionUnset(String cid, int qos, boolean durable, boolean autoDelete)
+ throws IOException, MqttException, TimeoutException, InterruptedException {
+ testQueuePropertiesWithCleanSession(false, cid, qos, durable, autoDelete);
+ }
+
+ protected void testQueuePropertiesWithCleanSession(boolean cleanSession, String cid, int qos,
+ boolean durable, boolean autoDelete)
+ throws MqttException, IOException, TimeoutException {
+ MqttClient c = newClient(brokerUrl, cid);
+ MqttConnectOptions opts = new TestMqttConnectOptions();
+ opts.setUserName("guest");
+ opts.setPassword("guest".toCharArray());
+ opts.setCleanSession(cleanSession);
+ c.connect(opts);
+
+ setUpAmqp();
+ Channel tmpCh = conn.createChannel();
+
+ String q = "mqtt-subscription-" + cid + "qos" + qos;
+
+ c.subscribe(topic, qos);
+ // there is no server-sent notification about subscription
+ // success so we inject a delay
+ waitForTestDelay();
+
+ // ensure the queue is declared with the arguments we expect
+ // e.g. mqtt-subscription-client-3aqos0
+ try {
+ // first ensure the queue exists
+ tmpCh.queueDeclarePassive(q);
+ // then assert on properties
+ Map<String, Object> args = new HashMap<>();
+ args.put("x-expires", 86400000);
+ tmpCh.queueDeclare(q, durable, autoDelete, false, args);
+ } finally {
+ if (c.isConnected()) {
+ c.disconnect(3000);
+ }
+
+ Channel tmpCh2 = conn.createChannel();
+ tmpCh2.queueDelete(q);
+ tmpCh2.close();
+ tearDownAmqp();
+ }
+ }
+
+ @Test public void invalidPassword(TestInfo info) throws MqttException {
+ MqttConnectOptions client_opts = new TestMqttConnectOptions();
+ client_opts.setUserName("invalid-user");
+ client_opts.setPassword("invalid-password".toCharArray());
+ MqttClient client = newClient(info);
+ try {
+ client.connect(client_opts);
+ fail("Authentication failure expected");
+ } catch (MqttException ex) {
+ assertEquals(MqttException.REASON_CODE_FAILED_AUTHENTICATION, ex.getReasonCode());
+ } finally {
+ if (client.isConnected()) {
+ disconnect(client);
+ }
+ }
+ }
+
+ @Test public void emptyPassword(TestInfo info) throws MqttException {
+ MqttConnectOptions client_opts = new TestMqttConnectOptions();
+ client_opts.setPassword("".toCharArray());
+
+ MqttClient client = newClient(info);
+ try {
+ client.connect(client_opts);
+ fail("Authentication failure expected");
+ } catch (MqttException ex) {
+ assertEquals(MqttException.REASON_CODE_FAILED_AUTHENTICATION, ex.getReasonCode());
+ }
+ }
+
+
+ @Test public void subscribeQos0(TestInfo info) throws MqttException, InterruptedException {
+ MqttConnectOptions client_opts = new TestMqttConnectOptions();
+ MqttClient client = newClient(info);
+ client.connect(client_opts);
+ client.setCallback(this);
+ client.subscribe(topic, 0);
+
+ publish(client, topic, 0, payload);
+ waitAtMost(() -> receivedMessagesSize() == 1);
+ assertArrayEquals(receivedMessages.get(0).getPayload(), payload);
+ assertEquals(0, receivedMessages.get(0).getQos());
+ disconnect(client);
+ }
+
+ @Test public void subscribeUnsubscribe(TestInfo info) throws MqttException, InterruptedException {
+ MqttConnectOptions client_opts = new TestMqttConnectOptions();
+ MqttClient client = newClient(info);
+ client.connect(client_opts);
+ client.setCallback(this);
+ client.subscribe(topic, 0);
+
+ publish(client, topic, 1, payload);
+
+ waitAtMost(() -> receivedMessagesSize() == 1);
+ assertArrayEquals(receivedMessages.get(0).getPayload(), payload);
+ assertEquals(0, receivedMessages.get(0).getQos());
+
+ client.unsubscribe(topic);
+ publish(client, topic, 0, payload);
+ waitAtMost(() -> receivedMessagesSize() == 1);
+ disconnect(client);
+ }
+
+ @Test public void subscribeQos1(TestInfo info) throws MqttException, InterruptedException {
+ MqttConnectOptions client_opts = new TestMqttConnectOptions();
+ MqttClient client = newClient(info);
+ client.connect(client_opts);
+ client.setCallback(this);
+ client.subscribe(topic, 1);
+
+ publish(client, topic, 0, payload);
+ publish(client, topic, 1, payload);
+ publish(client, topic, 2, payload);
+
+ waitAtMost(() -> receivedMessagesSize() == 3);
+
+ MqttMessage msg1 = receivedMessages.get(0);
+ MqttMessage msg2 = receivedMessages.get(1);
+ MqttMessage msg3 = receivedMessages.get(1);
+
+ assertArrayEquals(msg1.getPayload(), payload);
+ assertEquals(0, msg1.getQos());
+
+ assertArrayEquals(msg2.getPayload(), payload);
+ assertEquals(1, msg2.getQos());
+
+ // Downgraded QoS 2 to QoS 1
+ assertArrayEquals(msg3.getPayload(), payload);
+ assertEquals(1, msg3.getQos());
+
+ disconnect(client);
+ }
+
+ @Test public void subscribeReceivesRetainedMessagesWithMatchingQoS(TestInfo info)
+ throws MqttException, InterruptedException {
+ MqttConnectOptions client_opts = new TestMqttConnectOptions();
+ MqttClient client = newClient(info);
+ client.connect(client_opts);
+ client.setCallback(this);
+ clearRetained(client, retainedTopic);
+ client.subscribe(retainedTopic, 1);
+
+ publishRetained(client, retainedTopic, 1, "retain 1".getBytes(StandardCharsets.UTF_8));
+ publishRetained(client, retainedTopic, 1, "retain 2".getBytes(StandardCharsets.UTF_8));
+
+ waitAtMost(() -> receivedMessagesSize() == 2);
+ MqttMessage lastMsg = receivedMessages.get(1);
+
+ client.unsubscribe(retainedTopic);
+ receivedMessages.clear();
+ client.subscribe(retainedTopic, 1);
+ waitAtMost(() -> receivedMessagesSize() == 1);
+ final MqttMessage retainedMsg = receivedMessages.get(0);
+ assertEquals(new String(lastMsg.getPayload()),
+ new String(retainedMsg.getPayload()));
+
+ disconnect(client);
+ }
+
+ @Test public void subscribeReceivesRetainedMessagesWithDowngradedQoS(TestInfo info)
+ throws MqttException, InterruptedException {
+ MqttConnectOptions clientOpts = new TestMqttConnectOptions();
+ MqttClient client = newConnectedClient(info, clientOpts);
+ client.setCallback(this);
+ clearRetained(client, retainedTopic);
+ client.subscribe(retainedTopic, 1);
+
+ publishRetained(client, retainedTopic, 1, "retain 1".getBytes(StandardCharsets.UTF_8));
+
+ waitAtMost(() -> receivedMessagesSize() == 1);
+ MqttMessage lastMsg = receivedMessages.get(0);
+
+ client.unsubscribe(retainedTopic);
+ receivedMessages.clear();
+ final int subscribeQoS = 0;
+ client.subscribe(retainedTopic, subscribeQoS);
+
+ waitAtMost(() -> receivedMessagesSize() == 1);
+ final MqttMessage retainedMsg = receivedMessages.get(0);
+ assertEquals(new String(lastMsg.getPayload()),
+ new String(retainedMsg.getPayload()));
+ assertEquals(subscribeQoS, retainedMsg.getQos());
+
+ disconnect(client);
+ }
+
+ @Test public void publishWithEmptyMessageClearsRetained(TestInfo info)
+ throws MqttException, InterruptedException {
+ MqttConnectOptions client_opts = new TestMqttConnectOptions();
+ MqttClient client = newConnectedClient(info, client_opts);
+ client.setCallback(this);
+ clearRetained(client, retainedTopic);
+ client.subscribe(retainedTopic, 1);
+
+ publishRetained(client, retainedTopic, 1, "retain 1".getBytes(StandardCharsets.UTF_8));
+ publishRetained(client, retainedTopic, 1, "retain 2".getBytes(StandardCharsets.UTF_8));
+
+ waitAtMost(() -> receivedMessagesSize() == 2);
+ client.unsubscribe(retainedTopic);
+ receivedMessages.clear();
+
+ clearRetained(client, retainedTopic);
+ client.subscribe(retainedTopic, 1);
+ waitAtMost(() -> receivedMessagesSize() == 0);
+
+ disconnect(client);
+ }
+
+ @Test public void topics(TestInfo info) throws MqttException, InterruptedException {
+ MqttConnectOptions client_opts = new TestMqttConnectOptions();
+ MqttClient client = newConnectedClient(info, client_opts);
+ client.setCallback(this);
+ client.subscribe("/+/test-topic/#");
+ String[] cases = new String[]{"/pre/test-topic2", "/test-topic", "/a/test-topic/b/c/d", "/frob/test-topic"};
+ List<String> expected = Arrays.asList("/a/test-topic/b/c/d", "/frob/test-topic");
+ for(String example : cases){
+ publish(client, example, 0, example.getBytes());
+ }
+ waitAtMost(() -> receivedMessagesSize() == expected.size());
+ for (MqttMessage m : receivedMessages){
+ expected.contains(new String(m.getPayload()));
+ }
+ disconnect(client);
+ }
+
+ @Test public void sparkplugTopics(TestInfo info) throws MqttException, IOException, InterruptedException, TimeoutException {
+ final String amqp091Topic = "spBv1___0.MACLab.DDATA.Opto22.CLX";
+ final String sparkplugTopic = "spBv1.0/MACLab/+/Opto22/CLX";
+
+ MqttConnectOptions client_opts = new TestMqttConnectOptions();
+ MqttClient client = newConnectedClient(info, client_opts);
+ client.setCallback(this);
+ client.subscribe(sparkplugTopic);
+
+ setUpAmqp();
+ ch.basicPublish("amq.topic", amqp091Topic, MessageProperties.MINIMAL_BASIC, payload);
+ tearDownAmqp();
+
+ waitAtMost(() -> receivedMessagesSize() == 1);
+ disconnect(client);
+ }
+
+ @Test public void nonCleanSession(TestInfo info) throws MqttException, InterruptedException {
+ String clientIdBase = clientId(info);
+ MqttConnectOptions client_opts = new TestMqttConnectOptions();
+ client_opts.setCleanSession(false);
+ MqttClient client = newConnectedClient(clientIdBase + "-1", client_opts);
+ client.subscribe(topic, 1);
+ client.disconnect();
+
+ MqttClient client2 = newConnectedClient(clientIdBase + "-2", client_opts);
+ publish(client2, topic, 1, payload);
+ client2.disconnect();
+
+ client.setCallback(this);
+ client.connect(client_opts);
+
+ waitAtMost(() -> receivedMessagesSize() == 1);
+ assertArrayEquals(receivedMessages.get(0).getPayload(), payload);
+ disconnect(client);
+ }
+
+ @Test public void sessionRedelivery(TestInfo info) throws MqttException, InterruptedException {
+ MqttConnectOptions client_opts = new TestMqttConnectOptions();
+ client_opts.setCleanSession(false);
+ MqttClient client = newConnectedClient(info, client_opts);
+ client.subscribe(topic, 1);
+ disconnect(client);
+
+ MqttClient client2 = newConnectedClient(info, client_opts);
+ publish(client2, topic, 1, payload);
+ disconnect(client2);
+
+ failOnDelivery = true;
+
+ // Connection should fail. Messages will be redelivered.
+ client.setCallback(this);
+ client.connect(client_opts);
+
+ // Message has been delivered but connection has failed.
+ waitAtMost(() -> receivedMessagesSize() == 1);
+ assertArrayEquals(receivedMessages.get(0).getPayload(), payload);
+
+ assertFalse(client.isConnected());
+
+ receivedMessages.clear();
+ failOnDelivery = false;
+
+ client.setCallback(this);
+ client.connect(client_opts);
+
+ // Message has been redelivered after session resume
+ waitAtMost(() -> receivedMessagesSize() == 1);
+ assertArrayEquals(receivedMessages.get(0).getPayload(), payload);
+ assertTrue(client.isConnected());
+ disconnect(client);
+
+ receivedMessages.clear();
+
+ client.setCallback(this);
+ waitAtMost(() -> client.isConnected() == false);
+ client.connect(client_opts);
+
+ // This time messaage are acknowledged and won't be redelivered
+ waitAtMost(() -> receivedMessagesSize() == 0);
+ assertEquals(0, receivedMessages.size());
+
+ disconnect(client);
+ }
+
+ @Test public void cleanSession(TestInfo info) throws MqttException, InterruptedException {
+ MqttConnectOptions client_opts = new TestMqttConnectOptions();
+ client_opts.setCleanSession(false);
+ MqttClient client = newConnectedClient(info, client_opts);
+ client.subscribe(topic, 1);
+ client.disconnect();
+
+ MqttClient client2 = newConnectedClient(info, client_opts);
+ publish(client2, topic, 1, payload);
+ disconnect(client2);
+
+ client_opts.setCleanSession(true);
+ client.connect(client_opts);
+ client.setCallback(this);
+ client.subscribe(topic, 1);
+
+ waitAtMost(() -> receivedMessagesSize() == 0);
+ client.unsubscribe(topic);
+ disconnect(client);
+ }
+
+ @Test public void multipleClientIds(TestInfo info) throws MqttException, InterruptedException {
+ MqttConnectOptions client_opts = new TestMqttConnectOptions();
+ MqttClient client = newConnectedClient(info, client_opts);
+ // uses duplicate client ID
+ MqttClient client2 = newConnectedClient(info, client_opts);
+ // the older connection with this client ID will be closed
+ waitAtMost(() -> client.isConnected() == false);
+ disconnect(client2);
+ }
+
+ @Test public void multipleClusterClientIds(TestInfo info) throws MqttException, InterruptedException {
+ MqttConnectOptions client_opts = new TestMqttConnectOptions();
+ MqttClient client = newConnectedClient(info, client_opts);
+ MqttClient client3 = newClient(brokerThreeUrl, info);
+ client3.connect(client_opts);
+ waitAtMost(() -> client.isConnected() == false);
+ disconnect(client3);
+ }
+
+ @Test public void ping(TestInfo info) throws MqttException, InterruptedException {
+ MqttConnectOptions client_opts = new TestMqttConnectOptions();
+ client_opts.setKeepAliveInterval(1);
+ MqttClient client = newConnectedClient(info, client_opts);
+ waitAtMost(() -> client.isConnected());
+ disconnect(client);
+ }
+
+ @Test public void will(TestInfo info) throws MqttException, InterruptedException, IOException {
+ String clientIdBase = clientId(info);
+ MqttConnectOptions client_opts = new TestMqttConnectOptions();
+ MqttClient client2 = newConnectedClient(clientIdBase + "-2", client_opts);
+ client2.subscribe(topic);
+ client2.setCallback(this);
+
+ final SocketFactory factory = SocketFactory.getDefault();
+ final ArrayList<Socket> sockets = new ArrayList<>();
+ SocketFactory testFactory = new SocketFactory() {
+ public Socket createSocket(String s, int i) throws IOException {
+ Socket sock = factory.createSocket(s, i);
+ sockets.add(sock);
+ return sock;
+ }
+ public Socket createSocket(String s, int i, InetAddress a, int i1) {
+ return null;
+ }
+ public Socket createSocket(InetAddress a, int i) {
+ return null;
+ }
+ public Socket createSocket(InetAddress a, int i, InetAddress a1, int i1) {
+ return null;
+ }
+ @Override
+ public Socket createSocket() {
+ Socket sock = new Socket();
+ sockets.add(sock);
+ return sock;
+ }
+ };
+
+ MqttClient client = newClient(clientIdBase + "-1");
+ MqttTopic willTopic = client.getTopic(topic);
+
+ MqttConnectOptions opts = new TestMqttConnectOptions();
+ opts.setSocketFactory(testFactory);
+ opts.setWill(willTopic, payload, 0, false);
+ opts.setCleanSession(false);
+
+ client.connect(opts);
+
+ assertTrue(sockets.size() >= 1);
+ expectConnectionFailure = true;
+ sockets.get(0).close();
+
+ waitAtMost(() -> receivedMessagesSize() == 1);
+ assertArrayEquals(receivedMessages.get(0).getPayload(), payload);
+ client2.unsubscribe(topic);
+ disconnect(client2);
+ }
+
+ @Test public void willIsRetained(TestInfo info) throws MqttException, InterruptedException, IOException {
+ String clientIdBase = clientId(info);
+ MqttConnectOptions client2_opts = new TestMqttConnectOptions();
+ client2_opts.setCleanSession(true);
+ MqttClient client2 = newConnectedClient(clientIdBase + "-2", client2_opts);
+ client2.setCallback(this);
+
+ clearRetained(client2, retainedTopic);
+ client2.subscribe(retainedTopic, 1);
+ disconnect(client2);
+
+ final SocketFactory factory = SocketFactory.getDefault();
+ final ArrayList<Socket> sockets = new ArrayList<>();
+ SocketFactory testFactory = new SocketFactory() {
+ public Socket createSocket(String s, int i) throws IOException {
+ Socket sock = factory.createSocket(s, i);
+ sockets.add(sock);
+ return sock;
+ }
+ public Socket createSocket(String s, int i, InetAddress a, int i1) {
+ return null;
+ }
+ public Socket createSocket(InetAddress a, int i) {
+ return null;
+ }
+ public Socket createSocket(InetAddress a, int i, InetAddress a1, int i1) {
+ return null;
+ }
+ @Override
+ public Socket createSocket() {
+ Socket sock = new Socket();
+ sockets.add(sock);
+ return sock;
+ }
+ };
+
+ MqttConnectOptions client_opts = new TestMqttConnectOptions();
+
+ MqttClient client = newClient(clientIdBase + "-1");
+ MqttTopic willTopic = client.getTopic(retainedTopic);
+ byte[] willPayload = "willpayload".getBytes();
+
+ client_opts.setSocketFactory(testFactory);
+ client_opts.setWill(willTopic, willPayload, 1, true);
+
+ client.connect(client_opts);
+
+ assertEquals(1, sockets.size());
+ sockets.get(0).close();
+
+ // let last will propagate after disconnection
+ waitForTestDelay();
+
+ client2.connect(client2_opts);
+ client2.setCallback(this);
+ client2.subscribe(retainedTopic, 1);
+
+ waitAtMost(() -> receivedMessagesSize() == 1);
+ assertArrayEquals(receivedMessages.get(0).getPayload(), willPayload);
+ client2.unsubscribe(topic);
+ disconnect(client2);
+ }
+
+ @Test public void subscribeMultiple(TestInfo info) throws MqttException {
+ String clientIdBase = clientId(info);
+ MqttConnectOptions client_opts = new TestMqttConnectOptions();
+ MqttClient client = newConnectedClient(clientIdBase + "-1", client_opts);
+ publish(client, "/test-topic/1", 1, "msq1-qos1".getBytes());
+
+ MqttClient client2 = newConnectedClient(clientIdBase + "-2", client_opts);
+ client2.setCallback(this);
+ client2.subscribe("/test-topic/#");
+ client2.subscribe("/test-topic/#");
+
+ publish(client, "/test-topic/2", 0, "msq2-qos0".getBytes());
+ publish(client, "/test-topic/3", 1, "msq3-qos1".getBytes());
+ publish(client, "/test-topic/4", 2, "msq3-qos2".getBytes());
+ publish(client, topic, 0, "msq4-qos0".getBytes());
+ publish(client, topic, 1, "msq4-qos1".getBytes());
+
+
+ assertEquals(3, receivedMessages.size());
+ disconnect(client);
+ disconnect(client2);
+ }
+
+ @Test public void publishMultiple() throws MqttException, InterruptedException {
+ int pubCount = 50;
+ for (int subQos=0; subQos <= 2; subQos++){
+ for (int pubQos=0; pubQos <= 2; pubQos++){
+ // avoid reusing the client in this test as a shared
+ // client cannot handle connection churn very well. MK.
+ String cid = "test-sub-qos-" + subQos + "-pub-qos-" + pubQos;
+ MqttConnectOptions client_opts = new TestMqttConnectOptions();
+ MqttClient client = newClient(brokerUrl, cid);
+ client.connect(client_opts);
+ client.subscribe(topic, subQos);
+ client.setCallback(this);
+ long start = System.currentTimeMillis();
+ for (int i=0; i<pubCount; i++){
+ publish(client, topic, pubQos, payload);
+ }
+
+ waitAtMost(() -> receivedMessagesSize() == pubCount);
+ System.out.println("publish QOS" + pubQos + " subscribe QOS" + subQos +
+ ", " + pubCount + " msgs took " +
+ (lastReceipt - start)/1000.0 + "sec");
+ client.disconnect(5000);
+ receivedMessages.clear();
+ }
+ }
+ }
+
+ @Test public void topicAuthorisationPublish(TestInfo info) throws Exception {
+ MqttConnectOptions client_opts = new TestMqttConnectOptions();
+ MqttClient client = newConnectedClient(info, client_opts);
+ client.setCallback(this);
+ client.subscribe("some/test-topic");
+ publish(client, "some/test-topic", 1, "content".getBytes());
+ waitAtMost(() -> receivedMessagesSize() == 1);
+ assertTrue(client.isConnected());
+ try {
+ publish(client, "forbidden-topic", 1, "content".getBytes());
+ fail("Publishing on a forbidden topic, an exception should have been thrown");
+ client.disconnect();
+ } catch(Exception e) {
+ // OK
+ }
+ }
+
+ @Test public void topicAuthorisationSubscribe(TestInfo info) throws Exception {
+ MqttConnectOptions client_opts = new TestMqttConnectOptions();
+ MqttClient client = newConnectedClient(info, client_opts);
+ client.setCallback(this);
+ client.subscribe("some/test-topic");
+ try {
+ client.subscribe("forbidden-topic");
+ fail("Subscribing to a forbidden topic, an exception should have been thrown");
+ client.disconnect();
+ } catch(Exception e) {
+ // OK
+ e.printStackTrace();
+ }
+ }
+
+ @Test public void lastWillDowngradesQoS2(TestInfo info) throws Exception {
+ String lastWillTopic = "test-topic-will-downgrades-qos";
+
+ MqttConnectOptions client2Opts = new TestMqttConnectOptions();
+ MqttClient client2 = newConnectedClient(info, client2Opts);
+ client2.subscribe(lastWillTopic);
+ client2.setCallback(this);
+
+ final SocketFactory factory = SocketFactory.getDefault();
+ final ArrayList<Socket> sockets = new ArrayList<>();
+ SocketFactory testFactory = new SocketFactory() {
+ public Socket createSocket(String s, int i) throws IOException {
+ Socket sock = factory.createSocket(s, i);
+ sockets.add(sock);
+ return sock;
+ }
+ public Socket createSocket(String s, int i, InetAddress a, int i1) {
+ return null;
+ }
+ public Socket createSocket(InetAddress a, int i) {
+ return null;
+ }
+ public Socket createSocket(InetAddress a, int i, InetAddress a1, int i1) {
+ return null;
+ }
+ @Override
+ public Socket createSocket() {
+ Socket sock = new Socket();
+ sockets.add(sock);
+ return sock;
+ }
+ };
+
+ MqttConnectOptions clientOpts = new TestMqttConnectOptions();
+
+ MqttClient client = newClient("test-topic-will-downgrades-qos");
+ clientOpts.setSocketFactory(testFactory);
+ MqttTopic willTopic = client.getTopic(lastWillTopic);
+ clientOpts.setWill(willTopic, payload, 2, false);
+ clientOpts.setCleanSession(false);
+ client.connect(clientOpts);
+
+ waitAtMost(() -> sockets.size() == 1);
+ expectConnectionFailure = true;
+ sockets.get(0).close();
+
+ // let some time after disconnection
+ waitAtMost(() -> receivedMessagesSize() == 1);
+ assertEquals(1, receivedMessages.size());
+ disconnect(client2);
+ }
+
+ @Test public void lastWillNotSentOnRestrictedTopic(TestInfo info) throws Exception {
+ MqttConnectOptions client2_opts = new TestMqttConnectOptions();
+
+ MqttClient client2 = newConnectedClient(info, client2_opts);
+ // topic authorized for subscription, restricted for publishing
+ String lastWillTopic = "last-will";
+ client2.subscribe(lastWillTopic);
+ client2.setCallback(this);
+
+ final SocketFactory factory = SocketFactory.getDefault();
+ final ArrayList<Socket> sockets = new ArrayList<>();
+ SocketFactory testFactory = new SocketFactory() {
+ public Socket createSocket(String s, int i) throws IOException {
+ Socket sock = factory.createSocket(s, i);
+ sockets.add(sock);
+ return sock;
+ }
+ public Socket createSocket(String s, int i, InetAddress a, int i1) {
+ return null;
+ }
+ public Socket createSocket(InetAddress a, int i) {
+ return null;
+ }
+ public Socket createSocket(InetAddress a, int i, InetAddress a1, int i1) {
+ return null;
+ }
+ @Override
+ public Socket createSocket() {
+ Socket sock = new Socket();
+ sockets.add(sock);
+ return sock;
+ }
+ };
+
+ MqttConnectOptions client_opts = new TestMqttConnectOptions();
+
+ MqttClient client = newClient("last-will-not-sent-on-restricted-topic");
+ client_opts.setSocketFactory(testFactory);
+ MqttTopic willTopic = client.getTopic(lastWillTopic);
+ client_opts.setWill(willTopic, payload, 0, false);
+ client_opts.setCleanSession(false);
+ client.connect(client_opts);
+
+ assertEquals(1, sockets.size());
+ expectConnectionFailure = true;
+ sockets.get(0).close();
+
+ // let some time after disconnection
+ waitForTestDelay();
+ assertEquals(0, receivedMessages.size());
+ disconnect(client2);
+ }
+
+ @Test public void topicAuthorisationVariableExpansion(TestInfo info) throws Exception {
+ final String client_id = clientId(info);
+ MqttConnectOptions client_opts = new TestMqttConnectOptions();
+ MqttClient client = newConnectedClient(client_id, client_opts);
+ client.setCallback(this);
+ String topicWithExpandedVariables = "guest/" + client_id + "/a";
+ client.subscribe(topicWithExpandedVariables);
+ publish(client, topicWithExpandedVariables, 1, "content".getBytes());
+ waitAtMost(() -> receivedMessagesSize() == 1);
+ assertTrue(client.isConnected());
+ try {
+ publish(client, "guest/WrongClientId/a", 1, "content".getBytes());
+ fail("Publishing on a forbidden topic, an exception should have been thrown");
+ client.disconnect();
+ } catch(Exception e) {
+ // OK
+ }
+ }
+
+ @Test public void interopM2A(TestInfo info) throws MqttException, IOException, InterruptedException, TimeoutException {
+ setUpAmqp();
+ String queue = ch.queueDeclare().getQueue();
+ ch.queueBind(queue, "amq.topic", topic);
+
+ byte[] interopPayload = "interop-body".getBytes();
+ MqttConnectOptions client_opts = new TestMqttConnectOptions();
+ MqttClient client = newConnectedClient(info, client_opts);
+ publish(client, topic, 1, interopPayload);
+ disconnect(client);
+
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AtomicReference<byte[]> messageBody = new AtomicReference<>();
+ ch.basicConsume(queue, true, new DefaultConsumer(ch) {
+ @Override
+ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
+ messageBody.set(body);
+ latch.countDown();
+ }
+ });
+ assertTrue(latch.await(EXPECT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS));
+ assertEquals(new String(interopPayload), new String(messageBody.get()));
+ assertNull(ch.basicGet(queue, true));
+ tearDownAmqp();
+ }
+
+ @Test public void interopA2M(TestInfo info) throws MqttException, IOException, InterruptedException, TimeoutException {
+ MqttConnectOptions client_opts = new TestMqttConnectOptions();
+ MqttClient client = newConnectedClient(info, client_opts);
+ client.setCallback(this);
+ client.subscribe(topic, 1);
+
+ setUpAmqp();
+ ch.basicPublish("amq.topic", topic, MessageProperties.MINIMAL_BASIC, payload);
+ tearDownAmqp();
+
+ waitAtMost(() -> receivedMessagesSize() == 1);
+ client.disconnect();
+ }
+
+ private void publish(MqttClient client, String topicName, int qos, byte[] payload) throws MqttException {
+ publish(client, topicName, qos, payload, false);
+ }
+
+ private void publish(MqttClient client, String topicName, int qos, byte[] payload, boolean retained) throws MqttException {
+ MqttTopic topic = client.getTopic(topicName);
+ MqttMessage message = new MqttMessage(payload);
+ message.setQos(qos);
+ message.setRetained(retained);
+ MqttDeliveryToken token = topic.publish(message);
+ token.waitForCompletion();
+ }
+
+ private void publishRetained(MqttClient client, String topicName, int qos, byte[] payload) throws MqttException {
+ publish(client, topicName, qos, payload, true);
+ }
+
+ private void clearRetained(MqttClient client, String topicName) throws MqttException {
+ publishRetained(client, topicName, 1, "".getBytes());
+ }
+
+ public void connectionLost(Throwable cause) {
+ if (!expectConnectionFailure)
+ fail("Connection unexpectedly lost");
+ }
+
+ public void messageArrived(String topic, MqttMessage message) throws Exception {
+ lastReceipt = System.currentTimeMillis();
+ receivedMessages.add(message);
+ if (failOnDelivery) {
+ throw new Exception("unexpected delivery on topic " + topic);
+ }
+ }
+
+ public void deliveryComplete(IMqttDeliveryToken token) {
+ }
+
+ private Integer receivedMessagesSize() {
+ return receivedMessages.size();
+ }
+
+ private void waitForTestDelay() {
+ try {
+ Thread.sleep(testDelay);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static void waitAtMost(BooleanSupplier condition) throws InterruptedException {
+ if (condition.getAsBoolean()) {
+ return;
+ }
+ int waitTime = 100;
+ int waitedTime = 0;
+ long timeoutInMs = EXPECT_TIMEOUT.toMillis();
+ while (waitedTime <= timeoutInMs) {
+ Thread.sleep(waitTime);
+ if (condition.getAsBoolean()) {
+ return;
+ }
+ waitedTime += waitTime;
+ }
+ fail("Waited " + EXPECT_TIMEOUT.get(ChronoUnit.SECONDS) + " second(s), condition never got true");
+ }
+}
diff --git a/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/rabbit-test.sh b/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/rabbit-test.sh
new file mode 100755
index 0000000000..cba9bcd493
--- /dev/null
+++ b/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/rabbit-test.sh
@@ -0,0 +1,8 @@
+#!/bin/sh
+CTL=$1
+USER="O=client,CN=$(hostname)"
+
+# Test direct connections
+$CTL add_user "$USER" ''
+$CTL set_permissions -p / "$USER" ".*" ".*" ".*"
+$CTL set_topic_permissions -p / "$USER" "amq.topic" "test-topic|test-retained-topic|.*topic.*" "test-topic|test-retained-topic|.*topic.*|last-will"
diff --git a/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/setup-rabbit-test.sh b/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/setup-rabbit-test.sh
new file mode 100644
index 0000000000..2e2282ee07
--- /dev/null
+++ b/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/setup-rabbit-test.sh
@@ -0,0 +1,2 @@
+#!/bin/sh -e
+sh -e `dirname $0`/rabbit-test.sh "$DEPS_DIR/rabbit/scripts/rabbitmqctl -n $RABBITMQ_NODENAME"
diff --git a/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/tls/MqttSSLTest.java b/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/tls/MqttSSLTest.java
new file mode 100644
index 0000000000..2ea4c7a638
--- /dev/null
+++ b/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/tls/MqttSSLTest.java
@@ -0,0 +1,157 @@
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at https://mozilla.org/MPL/2.0/.
+//
+// Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+//
+
+package com.rabbitmq.mqtt.test.tls;
+
+import org.eclipse.paho.client.mqttv3.*;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+
+/**
+ * MQTT v3.1 tests
+ *
+ */
+
+public class MqttSSLTest implements MqttCallback {
+
+ private final String brokerUrl = "ssl://" + getHost() + ":" + getPort();
+ private String clientId;
+ private String clientId2;
+ private MqttClient client;
+ private MqttClient client2;
+ private MqttConnectOptions conOpt;
+
+ private volatile List<MqttMessage> receivedMessages;
+ private volatile boolean expectConnectionFailure;
+
+ private static String getPort() {
+ Object port = System.getProperty("mqtt.ssl.port");
+ assertNotNull(port);
+ return port.toString();
+ }
+
+ private static String getHost() {
+ Object host = System.getProperty("hostname");
+ assertNotNull(host);
+ return host.toString();
+ }
+
+ // override 10s limit
+ private class MyConnOpts extends MqttConnectOptions {
+ private int keepAliveInterval = 60;
+
+ @Override
+ public void setKeepAliveInterval(int keepAliveInterval) {
+ this.keepAliveInterval = keepAliveInterval;
+ }
+
+ @Override
+ public int getKeepAliveInterval() {
+ return keepAliveInterval;
+ }
+ }
+
+
+ @BeforeEach
+ public void setUp() throws MqttException, IOException {
+ clientId = getClass().getSimpleName() + ((int) (10000 * Math.random()));
+ clientId2 = clientId + "-2";
+ client = new MqttClient(brokerUrl, clientId, null);
+ client2 = new MqttClient(brokerUrl, clientId2, null);
+ conOpt = new MyConnOpts();
+ conOpt.setSocketFactory(MutualAuth.getSSLContextWithoutCert().getSocketFactory());
+ setConOpts(conOpt);
+ receivedMessages = Collections.synchronizedList(new ArrayList<MqttMessage>());
+ expectConnectionFailure = false;
+ }
+
+ @AfterEach
+ public void tearDown() throws MqttException {
+ // clean any sticky sessions
+ setConOpts(conOpt);
+ client = new MqttClient(brokerUrl, clientId, null);
+ try {
+ client.connect(conOpt);
+ client.disconnect();
+ } catch (Exception ignored) {
+ }
+
+ client2 = new MqttClient(brokerUrl, clientId2, null);
+ try {
+ client2.connect(conOpt);
+ client2.disconnect();
+ } catch (Exception ignored) {
+ }
+ }
+
+
+ private void setConOpts(MqttConnectOptions conOpts) {
+ conOpts.setCleanSession(true);
+ conOpts.setKeepAliveInterval(60);
+ }
+
+ @Test
+ public void certLogin() throws MqttException {
+ try {
+ conOpt.setSocketFactory(MutualAuth.getSSLContextWithClientCert().getSocketFactory());
+ client.connect(conOpt);
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail("Exception: " + e.getMessage());
+ }
+ }
+
+
+ @Test public void invalidUser() throws MqttException {
+ conOpt.setUserName("invalid-user");
+ try {
+ client.connect(conOpt);
+ fail("Authentication failure expected");
+ } catch (MqttException ex) {
+ assertEquals(MqttException.REASON_CODE_FAILED_AUTHENTICATION, ex.getReasonCode());
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail("Exception: " + e.getMessage());
+ }
+ }
+
+ @Test public void invalidPassword() throws MqttException {
+ conOpt.setUserName("invalid-user");
+ conOpt.setPassword("invalid-password".toCharArray());
+ try {
+ client.connect(conOpt);
+ fail("Authentication failure expected");
+ } catch (MqttException ex) {
+ assertEquals(MqttException.REASON_CODE_FAILED_AUTHENTICATION, ex.getReasonCode());
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail("Exception: " + e.getMessage());
+ }
+ }
+
+
+ public void connectionLost(Throwable cause) {
+ if (!expectConnectionFailure)
+ fail("Connection unexpectedly lost");
+ }
+
+ public void messageArrived(String topic, MqttMessage message) throws Exception {
+ receivedMessages.add(message);
+ }
+
+ public void deliveryComplete(IMqttDeliveryToken token) {
+ }
+}
diff --git a/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/tls/MutualAuth.java b/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/tls/MutualAuth.java
new file mode 100644
index 0000000000..081cae4052
--- /dev/null
+++ b/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/tls/MutualAuth.java
@@ -0,0 +1,89 @@
+package com.rabbitmq.mqtt.test.tls;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
+import java.io.IOException;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.CertificateException;
+import java.util.Arrays;
+import java.util.List;
+import java.io.FileInputStream;
+
+
+public class MutualAuth {
+
+ private MutualAuth() {
+
+ }
+
+ private static String getStringProperty(String propertyName) throws IllegalArgumentException {
+ Object value = System.getProperty(propertyName);
+ if (value == null) throw new IllegalArgumentException("Property: " + propertyName + " not found");
+ return value.toString();
+ }
+
+ private static TrustManagerFactory getServerTrustManagerFactory() throws NoSuchAlgorithmException, CertificateException, IOException, KeyStoreException {
+ String keystorePath = System.getProperty("test-keystore.ca");
+ char[] trustPhrase = getStringProperty("test-keystore.password").toCharArray();
+ MutualAuth dummy = new MutualAuth();
+
+ // Server TrustStore
+ KeyStore tks = KeyStore.getInstance("JKS");
+ tks.load(new FileInputStream(keystorePath), trustPhrase);
+
+ TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");
+ tmf.init(tks);
+
+ return tmf;
+ }
+
+ public static SSLContext getSSLContextWithClientCert() throws IOException {
+
+ char[] clientPhrase = getStringProperty("test-client-cert.password").toCharArray();
+
+ String p12Path = System.getProperty("test-client-cert.path");
+
+ MutualAuth dummy = new MutualAuth();
+ try {
+ SSLContext sslContext = getVanillaSSLContext();
+ // Client Keystore
+ KeyStore ks = KeyStore.getInstance("PKCS12");
+ ks.load(new FileInputStream(p12Path), clientPhrase);
+ KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
+ kmf.init(ks, clientPhrase);
+
+ sslContext.init(kmf.getKeyManagers(), getServerTrustManagerFactory().getTrustManagers(), null);
+ return sslContext;
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+
+ }
+
+ private static SSLContext getVanillaSSLContext() throws NoSuchAlgorithmException {
+ SSLContext result = null;
+ List<String> xs = Arrays.asList("TLSv1.2", "TLSv1.1", "TLSv1");
+ for(String x : xs) {
+ try {
+ return SSLContext.getInstance(x);
+ } catch (NoSuchAlgorithmException nae) {
+ // keep trying
+ }
+ }
+ throw new NoSuchAlgorithmException("Could not obtain an SSLContext for TLS 1.0-1.2");
+ }
+
+ public static SSLContext getSSLContextWithoutCert() throws IOException {
+ try {
+ SSLContext sslContext = getVanillaSSLContext();
+ sslContext.init(null, getServerTrustManagerFactory().getTrustManagers(), null);
+ return sslContext;
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+}
diff --git a/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/scripts/remove_old_test_keystores.groovy b/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/scripts/remove_old_test_keystores.groovy
new file mode 100644
index 0000000000..6864a41e29
--- /dev/null
+++ b/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/scripts/remove_old_test_keystores.groovy
@@ -0,0 +1,10 @@
+def dir = new File(project.build.directory)
+
+dir.mkdir()
+
+// This pattern starts with `.*`. This is normally useless and even
+// inefficient but the matching doesn't work without it...
+def pattern = ~/.*\.keystore$/
+dir.eachFileMatch(pattern) { file ->
+ file.delete()
+}
diff --git a/deps/rabbitmq_mqtt/test/mqtt_machine_SUITE.erl b/deps/rabbitmq_mqtt/test/mqtt_machine_SUITE.erl
new file mode 100644
index 0000000000..abdc3506dc
--- /dev/null
+++ b/deps/rabbitmq_mqtt/test/mqtt_machine_SUITE.erl
@@ -0,0 +1,73 @@
+-module(mqtt_machine_SUITE).
+
+-compile(export_all).
+
+-export([
+ ]).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include("mqtt_machine.hrl").
+
+%%%===================================================================
+%%% Common Test callbacks
+%%%===================================================================
+
+all() ->
+ [
+ {group, tests}
+ ].
+
+
+all_tests() ->
+ [
+ basics
+ ].
+
+groups() ->
+ [
+ {tests, [], all_tests()}
+ ].
+
+init_per_suite(Config) ->
+ Config.
+
+end_per_suite(_Config) ->
+ ok.
+
+init_per_group(_Group, Config) ->
+ Config.
+
+end_per_group(_Group, _Config) ->
+ ok.
+
+init_per_testcase(_TestCase, Config) ->
+ Config.
+
+end_per_testcase(_TestCase, _Config) ->
+ ok.
+
+%%%===================================================================
+%%% Test cases
+%%%===================================================================
+
+basics(_Config) ->
+ S0 = mqtt_machine:init(#{}),
+ ClientId = <<"id1">>,
+ {S1, ok, _} = mqtt_machine:apply(meta(1), {register, ClientId, self()}, S0),
+ ?assertMatch(#machine_state{client_ids = Ids} when map_size(Ids) == 1, S1),
+ {S2, ok, _} = mqtt_machine:apply(meta(2), {register, ClientId, self()}, S1),
+ ?assertMatch(#machine_state{client_ids = Ids} when map_size(Ids) == 1, S2),
+ {S3, ok, _} = mqtt_machine:apply(meta(3), {down, self(), noproc}, S2),
+ ?assertMatch(#machine_state{client_ids = Ids} when map_size(Ids) == 0, S3),
+ {S4, ok, _} = mqtt_machine:apply(meta(3), {unregister, ClientId, self()}, S2),
+ ?assertMatch(#machine_state{client_ids = Ids} when map_size(Ids) == 0, S4),
+
+ ok.
+
+%% Utility
+
+meta(Idx) ->
+ #{index => Idx,
+ term => 1,
+ ts => erlang:system_time(millisecond)}.
diff --git a/deps/rabbitmq_mqtt/test/processor_SUITE.erl b/deps/rabbitmq_mqtt/test/processor_SUITE.erl
new file mode 100644
index 0000000000..e38a1d5318
--- /dev/null
+++ b/deps/rabbitmq_mqtt/test/processor_SUITE.erl
@@ -0,0 +1,211 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+
+
+-module(processor_SUITE).
+-compile([export_all]).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+all() ->
+ [
+ {group, non_parallel_tests}
+ ].
+
+groups() ->
+ [
+ {non_parallel_tests, [], [
+ ignores_colons_in_username_if_option_set,
+ interprets_colons_in_username_if_option_not_set,
+ get_vhosts_from_global_runtime_parameter,
+ get_vhost,
+ add_client_id_to_adapter_info
+ ]}
+ ].
+
+suite() ->
+ [{timetrap, {seconds, 60}}].
+
+init_per_suite(Config) ->
+ ok = application:load(rabbitmq_mqtt),
+ Config.
+end_per_suite(Config) ->
+ ok = application:unload(rabbitmq_mqtt),
+ Config.
+init_per_group(_, Config) -> Config.
+end_per_group(_, Config) -> Config.
+init_per_testcase(get_vhost, Config) ->
+ mnesia:start(),
+ mnesia:create_table(rabbit_runtime_parameters, [
+ {attributes, record_info(fields, runtime_parameters)},
+ {record_name, runtime_parameters}]),
+ Config;
+init_per_testcase(_, Config) -> Config.
+end_per_testcase(get_vhost, Config) ->
+ mnesia:stop(),
+ Config;
+end_per_testcase(_, Config) -> Config.
+
+ignore_colons(B) -> application:set_env(rabbitmq_mqtt, ignore_colons_in_username, B).
+
+ignores_colons_in_username_if_option_set(_Config) ->
+ ignore_colons(true),
+ ?assertEqual({rabbit_mqtt_util:env(vhost), <<"a:b:c">>},
+ rabbit_mqtt_processor:get_vhost_username(<<"a:b:c">>)).
+
+interprets_colons_in_username_if_option_not_set(_Config) ->
+ ignore_colons(false),
+ ?assertEqual({<<"a:b">>, <<"c">>},
+ rabbit_mqtt_processor:get_vhost_username(<<"a:b:c">>)).
+
+get_vhosts_from_global_runtime_parameter(_Config) ->
+ MappingParameter = [
+ {<<"O=client,CN=dummy1">>, <<"vhost1">>},
+ {<<"O=client,CN=dummy2">>, <<"vhost2">>}
+ ],
+ <<"vhost1">> = rabbit_mqtt_processor:get_vhost_from_user_mapping(<<"O=client,CN=dummy1">>, MappingParameter),
+ <<"vhost2">> = rabbit_mqtt_processor:get_vhost_from_user_mapping(<<"O=client,CN=dummy2">>, MappingParameter),
+ undefined = rabbit_mqtt_processor:get_vhost_from_user_mapping(<<"O=client,CN=dummy3">>, MappingParameter),
+ undefined = rabbit_mqtt_processor:get_vhost_from_user_mapping(<<"O=client,CN=dummy3">>, not_found).
+
+get_vhost(_Config) ->
+ clear_vhost_global_parameters(),
+
+ %% not a certificate user, no cert/vhost mapping, no vhost in user
+ %% should use default vhost
+ {_, {<<"/">>, <<"guest">>}} = rabbit_mqtt_processor:get_vhost(<<"guest">>, none, 1883),
+ {_, {<<"/">>, <<"guest">>}} = rabbit_mqtt_processor:get_vhost(<<"guest">>, undefined, 1883),
+ clear_vhost_global_parameters(),
+
+ %% not a certificate user, no cert/vhost mapping, vhost in user
+ %% should use vhost in user
+ {_, {<<"somevhost">>, <<"guest">>}} = rabbit_mqtt_processor:get_vhost(<<"somevhost:guest">>, none, 1883),
+ clear_vhost_global_parameters(),
+
+ %% certificate user, no cert/vhost mapping
+ %% should use default vhost
+ {_, {<<"/">>, <<"guest">>}} = rabbit_mqtt_processor:get_vhost(<<"guest">>, <<"O=client,CN=dummy">>, 1883),
+ clear_vhost_global_parameters(),
+
+ %% certificate user, cert/vhost mapping with global runtime parameter
+ %% should use mapping
+ set_global_parameter(mqtt_default_vhosts, [
+ {<<"O=client,CN=dummy">>, <<"somevhost">>},
+ {<<"O=client,CN=otheruser">>, <<"othervhost">>}
+ ]),
+ {_, {<<"somevhost">>, <<"guest">>}} = rabbit_mqtt_processor:get_vhost(<<"guest">>, <<"O=client,CN=dummy">>, 1883),
+ clear_vhost_global_parameters(),
+
+ %% certificate user, cert/vhost mapping with global runtime parameter, but no key for the user
+ %% should use default vhost
+ set_global_parameter(mqtt_default_vhosts, [{<<"O=client,CN=otheruser">>, <<"somevhost">>}]),
+ {_, {<<"/">>, <<"guest">>}} = rabbit_mqtt_processor:get_vhost(<<"guest">>, <<"O=client,CN=dummy">>, 1883),
+ clear_vhost_global_parameters(),
+
+ %% not a certificate user, port/vhost mapping
+ %% should use mapping
+ set_global_parameter(mqtt_port_to_vhost_mapping, [
+ {<<"1883">>, <<"somevhost">>},
+ {<<"1884">>, <<"othervhost">>}
+ ]),
+ {_, {<<"somevhost">>, <<"guest">>}} = rabbit_mqtt_processor:get_vhost(<<"guest">>, none, 1883),
+ clear_vhost_global_parameters(),
+
+ %% not a certificate user, port/vhost mapping, but vhost in username
+ %% vhost in username should take precedence
+ set_global_parameter(mqtt_port_to_vhost_mapping, [
+ {<<"1883">>, <<"somevhost">>},
+ {<<"1884">>, <<"othervhost">>}
+ ]),
+ {_, {<<"vhostinusername">>, <<"guest">>}} = rabbit_mqtt_processor:get_vhost(<<"vhostinusername:guest">>, none, 1883),
+ clear_vhost_global_parameters(),
+
+ %% not a certificate user, port/vhost mapping, but no mapping for this port
+ %% should use default vhost
+ set_global_parameter(mqtt_port_to_vhost_mapping, [
+ {<<"1884">>, <<"othervhost">>}
+ ]),
+ {_, {<<"/">>, <<"guest">>}} = rabbit_mqtt_processor:get_vhost(<<"guest">>, none, 1883),
+ clear_vhost_global_parameters(),
+
+ %% certificate user, port/vhost parameter, mapping, no cert/vhost mapping
+ %% should use port/vhost mapping
+ set_global_parameter(mqtt_port_to_vhost_mapping, [
+ {<<"1883">>, <<"somevhost">>},
+ {<<"1884">>, <<"othervhost">>}
+ ]),
+ {_, {<<"somevhost">>, <<"guest">>}} = rabbit_mqtt_processor:get_vhost(<<"guest">>, <<"O=client,CN=dummy">>, 1883),
+ clear_vhost_global_parameters(),
+
+ %% certificate user, port/vhost parameter but no mapping, cert/vhost mapping
+ %% should use cert/vhost mapping
+ set_global_parameter(mqtt_default_vhosts, [
+ {<<"O=client,CN=dummy">>, <<"somevhost">>},
+ {<<"O=client,CN=otheruser">>, <<"othervhost">>}
+ ]),
+ set_global_parameter(mqtt_port_to_vhost_mapping, [
+ {<<"1884">>, <<"othervhost">>}
+ ]),
+ {_, {<<"somevhost">>, <<"guest">>}} = rabbit_mqtt_processor:get_vhost(<<"guest">>, <<"O=client,CN=dummy">>, 1883),
+ clear_vhost_global_parameters(),
+
+ %% certificate user, port/vhost parameter, cert/vhost parameter
+ %% cert/vhost parameter takes precedence
+ set_global_parameter(mqtt_default_vhosts, [
+ {<<"O=client,CN=dummy">>, <<"cert-somevhost">>},
+ {<<"O=client,CN=otheruser">>, <<"othervhost">>}
+ ]),
+ set_global_parameter(mqtt_port_to_vhost_mapping, [
+ {<<"1883">>, <<"port-vhost">>},
+ {<<"1884">>, <<"othervhost">>}
+ ]),
+ {_, {<<"cert-somevhost">>, <<"guest">>}} = rabbit_mqtt_processor:get_vhost(<<"guest">>, <<"O=client,CN=dummy">>, 1883),
+ clear_vhost_global_parameters(),
+
+ %% certificate user, no port/vhost or cert/vhost mapping, vhost in username
+ %% should use vhost in username
+ {_, {<<"vhostinusername">>, <<"guest">>}} = rabbit_mqtt_processor:get_vhost(<<"vhostinusername:guest">>, <<"O=client,CN=dummy">>, 1883),
+
+ %% not a certificate user, port/vhost parameter, cert/vhost parameter
+ %% port/vhost mapping is used, as cert/vhost should not be used
+ set_global_parameter(mqtt_default_vhosts, [
+ {<<"O=cert">>, <<"cert-somevhost">>},
+ {<<"O=client,CN=otheruser">>, <<"othervhost">>}
+ ]),
+ set_global_parameter(mqtt_port_to_vhost_mapping, [
+ {<<"1883">>, <<"port-vhost">>},
+ {<<"1884">>, <<"othervhost">>}
+ ]),
+ {_, {<<"port-vhost">>, <<"guest">>}} = rabbit_mqtt_processor:get_vhost(<<"guest">>, none, 1883),
+ clear_vhost_global_parameters(),
+ ok.
+
+add_client_id_to_adapter_info(_Config) ->
+ TestFun = fun(AdapterInfo) ->
+ Info0 = rabbit_mqtt_processor:add_client_id_to_adapter_info(<<"my-client-id">>, AdapterInfo),
+ AdditionalInfo0 = Info0#amqp_adapter_info.additional_info,
+ ?assertEqual(#{<<"client_id">> => <<"my-client-id">>}, proplists:get_value(variable_map, AdditionalInfo0)),
+ ClientProperties = proplists:get_value(client_properties, AdditionalInfo0),
+ ?assertEqual([{client_id,longstr,<<"my-client-id">>}], ClientProperties)
+ end,
+ lists:foreach(TestFun, [#amqp_adapter_info{}, #amqp_adapter_info{additional_info = [{client_properties, []}]}]),
+ ok.
+
+set_global_parameter(Key, Term) ->
+ InsertParameterFun = fun () ->
+ mnesia:write(rabbit_runtime_parameters, #runtime_parameters{key = Key, value = Term}, write)
+ end,
+
+ {atomic, ok} = mnesia:transaction(InsertParameterFun).
+
+clear_vhost_global_parameters() ->
+ DeleteParameterFun = fun () ->
+ ok = mnesia:delete(rabbit_runtime_parameters, mqtt_default_vhosts, write),
+ ok = mnesia:delete(rabbit_runtime_parameters, mqtt_port_to_vhost_mapping, write)
+ end,
+ {atomic, ok} = mnesia:transaction(DeleteParameterFun).
diff --git a/deps/rabbitmq_mqtt/test/proxy_protocol_SUITE.erl b/deps/rabbitmq_mqtt/test/proxy_protocol_SUITE.erl
new file mode 100644
index 0000000000..5403de23d3
--- /dev/null
+++ b/deps/rabbitmq_mqtt/test/proxy_protocol_SUITE.erl
@@ -0,0 +1,125 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+-module(proxy_protocol_SUITE).
+-compile([export_all]).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-define(TIMEOUT, 5000).
+
+all() ->
+ [
+ {group, non_parallel_tests}
+ ].
+
+groups() ->
+ [
+ {non_parallel_tests, [], [
+ proxy_protocol,
+ proxy_protocol_tls
+ ]}
+ ].
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"),
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodename_suffix, Suffix},
+ {rmq_certspwd, "bunnychow"},
+ {rabbitmq_ct_tls_verify, verify_none}
+ ]),
+ MqttConfig = mqtt_config(),
+ rabbit_ct_helpers:run_setup_steps(Config1,
+ [ fun(Conf) -> merge_app_env(MqttConfig, Conf) end ] ++
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()).
+
+mqtt_config() ->
+ {rabbitmq_mqtt, [
+ {proxy_protocol, true},
+ {ssl_cert_login, true},
+ {allow_anonymous, true}]}.
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_group(_, Config) -> Config.
+end_per_group(_, Config) -> Config.
+
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase).
+
+end_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+proxy_protocol(Config) ->
+ Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt),
+ {ok, Socket} = gen_tcp:connect({127,0,0,1}, Port,
+ [binary, {active, false}, {packet, raw}]),
+ ok = inet:send(Socket, "PROXY TCP4 192.168.1.1 192.168.1.2 80 81\r\n"),
+ ok = inet:send(Socket, mqtt_3_1_1_connect_frame()),
+ {ok, _Packet} = gen_tcp:recv(Socket, 0, ?TIMEOUT),
+ ConnectionName = rabbit_ct_broker_helpers:rpc(Config, 0,
+ ?MODULE, connection_name, []),
+ match = re:run(ConnectionName, <<"^192.168.1.1:80 ">>, [{capture, none}]),
+ gen_tcp:close(Socket),
+ ok.
+
+proxy_protocol_tls(Config) ->
+ app_utils:start_applications([asn1, crypto, public_key, ssl]),
+ Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt_tls),
+ {ok, Socket} = gen_tcp:connect({127,0,0,1}, Port,
+ [binary, {active, false}, {packet, raw}]),
+ ok = inet:send(Socket, "PROXY TCP4 192.168.1.1 192.168.1.2 80 81\r\n"),
+ {ok, SslSocket} = ssl:connect(Socket, [], ?TIMEOUT),
+ ok = ssl:send(SslSocket, mqtt_3_1_1_connect_frame()),
+ {ok, _Packet} = ssl:recv(SslSocket, 0, ?TIMEOUT),
+ ConnectionName = rabbit_ct_broker_helpers:rpc(Config, 0,
+ ?MODULE, connection_name, []),
+ match = re:run(ConnectionName, <<"^192.168.1.1:80 ">>, [{capture, none}]),
+ gen_tcp:close(Socket),
+ ok.
+
+connection_name() ->
+ Connections = ets:tab2list(connection_created),
+ {_Key, Values} = lists:nth(1, Connections),
+ {_, Name} = lists:keyfind(name, 1, Values),
+ Name.
+
+merge_app_env(MqttConfig, Config) ->
+ rabbit_ct_helpers:merge_app_env(Config, MqttConfig).
+
+mqtt_3_1_1_connect_frame() ->
+ <<16,
+ 24,
+ 0,
+ 4,
+ 77,
+ 81,
+ 84,
+ 84,
+ 4,
+ 2,
+ 0,
+ 60,
+ 0,
+ 12,
+ 84,
+ 101,
+ 115,
+ 116,
+ 67,
+ 111,
+ 110,
+ 115,
+ 117,
+ 109,
+ 101,
+ 114>>.
diff --git a/deps/rabbitmq_mqtt/test/rabbit_auth_backend_mqtt_mock.erl b/deps/rabbitmq_mqtt/test/rabbit_auth_backend_mqtt_mock.erl
new file mode 100644
index 0000000000..5272138c6b
--- /dev/null
+++ b/deps/rabbitmq_mqtt/test/rabbit_auth_backend_mqtt_mock.erl
@@ -0,0 +1,45 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2019-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+%% A mock authn/authz that records information during calls. For testing purposes only.
+
+-module(rabbit_auth_backend_mqtt_mock).
+-include_lib("rabbit_common/include/rabbit.hrl").
+
+-behaviour(rabbit_authn_backend).
+-behaviour(rabbit_authz_backend).
+
+-export([user_login_authentication/2, user_login_authorization/2,
+ check_vhost_access/3, check_resource_access/4, check_topic_access/4,
+ state_can_expire/0,
+ get/1]).
+
+user_login_authentication(_, AuthProps) ->
+ ets:new(?MODULE, [set, public, named_table]),
+ ets:insert(?MODULE, {authentication, AuthProps}),
+ {ok, #auth_user{username = <<"dummy">>,
+ tags = [],
+ impl = none}}.
+
+user_login_authorization(_, _) ->
+ io:format("login authorization"),
+ {ok, does_not_matter}.
+
+check_vhost_access(#auth_user{}, _VHostPath, AuthzData) ->
+ ets:insert(?MODULE, {vhost_access, AuthzData}),
+ true.
+check_resource_access(#auth_user{}, #resource{}, _Permission, AuthzContext) ->
+ ets:insert(?MODULE, {resource_access, AuthzContext}),
+ true.
+check_topic_access(#auth_user{}, #resource{}, _Permission, TopicContext) ->
+ ets:insert(?MODULE, {topic_access, TopicContext}),
+ true.
+
+state_can_expire() -> false.
+
+get(K) ->
+ ets:lookup(?MODULE, K).
diff --git a/deps/rabbitmq_mqtt/test/rabbitmq_mqtt.app b/deps/rabbitmq_mqtt/test/rabbitmq_mqtt.app
new file mode 100644
index 0000000000..c4083ec5fc
--- /dev/null
+++ b/deps/rabbitmq_mqtt/test/rabbitmq_mqtt.app
@@ -0,0 +1,19 @@
+{application, rabbitmq_mqtt,
+ [{description, "RabbitMQ MQTT Adapter"},
+ {vsn, "%%VSN%%"},
+ {modules, []},
+ {registered, []},
+ {mod, {rabbit_mqtt, []}},
+ {env, [{default_user, "guest_user"},
+ {default_pass, "guest_pass"},
+ {ssl_cert_login,false},
+ {allow_anonymous, true},
+ {vhost, "/"},
+ {exchange, "amq.topic"},
+ {subscription_ttl, 1800000}, % 30 min
+ {prefetch, 10},
+ {ssl_listeners, []},
+ {tcp_listeners, [1883]},
+ {tcp_listen_options, [{backlog, 128},
+ {nodelay, true}]}]},
+ {applications, [kernel, stdlib, rabbit, amqp_client]}]}.
diff --git a/deps/rabbitmq_mqtt/test/reader_SUITE.erl b/deps/rabbitmq_mqtt/test/reader_SUITE.erl
new file mode 100644
index 0000000000..b94fdb5920
--- /dev/null
+++ b/deps/rabbitmq_mqtt/test/reader_SUITE.erl
@@ -0,0 +1,166 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+-module(reader_SUITE).
+-compile([export_all]).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+all() ->
+ [
+ {group, non_parallel_tests}
+ ].
+
+groups() ->
+ [
+ {non_parallel_tests, [], [
+ block,
+ handle_invalid_frames,
+ stats
+ ]}
+ ].
+
+suite() ->
+ [{timetrap, {seconds, 60}}].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+merge_app_env(Config) ->
+ rabbit_ct_helpers:merge_app_env(Config,
+ {rabbit, [
+ {collect_statistics, basic},
+ {collect_statistics_interval, 100}
+ ]}).
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodename_suffix, ?MODULE},
+ {rmq_extra_tcp_ports, [tcp_port_mqtt_extra,
+ tcp_port_mqtt_tls_extra]}
+ ]),
+ rabbit_ct_helpers:run_setup_steps(Config1,
+ [ fun merge_app_env/1 ] ++
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_group(_, Config) ->
+ Config.
+
+end_per_group(_, Config) ->
+ Config.
+
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase).
+
+end_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+
+%% -------------------------------------------------------------------
+%% Testsuite cases
+%% -------------------------------------------------------------------
+
+block(Config) ->
+ P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt),
+ {ok, C} = emqttc:start_link([{host, "localhost"},
+ {port, P},
+ {client_id, <<"simpleClient">>},
+ {proto_ver, 3},
+ {logger, info},
+ {puback_timeout, 1}]),
+ %% Only here to ensure the connection is really up
+ emqttc:subscribe(C, <<"TopicA">>, qos0),
+ emqttc:publish(C, <<"TopicA">>, <<"Payload">>),
+ expect_publishes(<<"TopicA">>, [<<"Payload">>]),
+ emqttc:unsubscribe(C, [<<"TopicA">>]),
+
+ emqttc:subscribe(C, <<"Topic1">>, qos0),
+
+ %% Not blocked
+ {ok, _} = emqttc:sync_publish(C, <<"Topic1">>, <<"Not blocked yet">>,
+ [{qos, 1}]),
+
+ ok = rpc(Config, vm_memory_monitor, set_vm_memory_high_watermark, [0.00000001]),
+ ok = rpc(Config, rabbit_alarm, set_alarm, [{{resource_limit, memory, node()}, []}]),
+
+ %% Let it block
+ timer:sleep(100),
+ %% Blocked, but still will publish
+ {error, ack_timeout} = emqttc:sync_publish(C, <<"Topic1">>, <<"Now blocked">>,
+ [{qos, 1}]),
+
+ %% Blocked
+ {error, ack_timeout} = emqttc:sync_publish(C, <<"Topic1">>,
+ <<"Blocked">>, [{qos, 1}]),
+
+ rpc(Config, vm_memory_monitor, set_vm_memory_high_watermark, [0.4]),
+ rpc(Config, rabbit_alarm, clear_alarm, [{resource_limit, memory, node()}]),
+
+ %% Let alarms clear
+ timer:sleep(1000),
+
+ expect_publishes(<<"Topic1">>, [<<"Not blocked yet">>,
+ <<"Now blocked">>,
+ <<"Blocked">>]),
+
+ emqttc:disconnect(C).
+
+handle_invalid_frames(Config) ->
+ N = rpc(Config, ets, info, [connection_metrics, size]),
+ P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt),
+ {ok, C} = gen_tcp:connect("localhost", P, []),
+ Bin = <<"GET / HTTP/1.1\r\nHost: www.rabbitmq.com\r\nUser-Agent: curl/7.43.0\r\nAccept: */*">>,
+ gen_tcp:send(C, Bin),
+ gen_tcp:close(C),
+ %% No new stats entries should be inserted as connection never got to initialize
+ N = rpc(Config, ets, info, [connection_metrics, size]).
+
+stats(Config) ->
+ P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt),
+ %% CMN = rpc(Config, ets, info, [connection_metrics, size]),
+ %% CCMN = rpc(Config, ets, info, [connection_coarse_metrics, size]),
+ {ok, C} = emqttc:start_link([{host, "localhost"},
+ {port, P},
+ {client_id, <<"simpleClient">>},
+ {proto_ver, 3},
+ {logger, info},
+ {puback_timeout, 1}]),
+ %% Ensure that there are some stats
+ emqttc:subscribe(C, <<"TopicA">>, qos0),
+ emqttc:publish(C, <<"TopicA">>, <<"Payload">>),
+ expect_publishes(<<"TopicA">>, [<<"Payload">>]),
+ emqttc:unsubscribe(C, [<<"TopicA">>]),
+ timer:sleep(1000), %% Wait for stats to be emitted, which it does every 100ms
+ %% Retrieve the connection Pid
+ [{_, Reader}] = rpc(Config, rabbit_mqtt_collector, list, []),
+ [{_, Pid}] = rpc(Config, rabbit_mqtt_reader, info, [Reader, [connection]]),
+ %% Verify the content of the metrics, garbage_collection must be present
+ [{Pid, Props}] = rpc(Config, ets, lookup, [connection_metrics, Pid]),
+ true = proplists:is_defined(garbage_collection, Props),
+ %% If the coarse entry is present, stats were successfully emitted
+ [{Pid, _, _, _, _}] = rpc(Config, ets, lookup,
+ [connection_coarse_metrics, Pid]),
+ emqttc:disconnect(C).
+
+expect_publishes(_Topic, []) -> ok;
+expect_publishes(Topic, [Payload|Rest]) ->
+ receive
+ {publish, Topic, Payload} -> expect_publishes(Topic, Rest)
+ after 5000 ->
+ throw({publish_not_delivered, Payload})
+ end.
+
+rpc(Config, M, F, A) ->
+ rabbit_ct_broker_helpers:rpc(Config, 0, M, F, A).
diff --git a/deps/rabbitmq_mqtt/test/retainer_SUITE.erl b/deps/rabbitmq_mqtt/test/retainer_SUITE.erl
new file mode 100644
index 0000000000..22b72a8d87
--- /dev/null
+++ b/deps/rabbitmq_mqtt/test/retainer_SUITE.erl
@@ -0,0 +1,144 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+-module(retainer_SUITE).
+-compile([export_all]).
+
+-include_lib("common_test/include/ct.hrl").
+
+all() ->
+ [
+ {group, non_parallel_tests}
+ ].
+
+groups() ->
+ [
+ {non_parallel_tests, [], [
+ coerce_configuration_data,
+ should_translate_amqp2mqtt_on_publish,
+ should_translate_amqp2mqtt_on_retention,
+ should_translate_amqp2mqtt_on_retention_search
+ ]}
+ ].
+
+suite() ->
+ [{timetrap, {seconds, 600}}].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodename_suffix, ?MODULE},
+ {rmq_extra_tcp_ports, [tcp_port_mqtt_extra,
+ tcp_port_mqtt_tls_extra]}
+ ]),
+ % see https://github.com/rabbitmq/rabbitmq-mqtt/issues/86
+ RabbitConfig = {rabbit, [
+ {default_user, "guest"},
+ {default_pass, "guest"},
+ {default_vhost, "/"},
+ {default_permissions, [".*", ".*", ".*"]}
+ ]},
+ rabbit_ct_helpers:run_setup_steps(Config1,
+ [ fun(Conf) -> merge_app_env(RabbitConfig, Conf) end ] ++
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()).
+
+merge_app_env(MqttConfig, Config) ->
+ rabbit_ct_helpers:merge_app_env(Config, MqttConfig).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_group(_, Config) ->
+ Config.
+
+end_per_group(_, Config) ->
+ Config.
+
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase).
+
+end_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+
+%% -------------------------------------------------------------------
+%% Testsuite cases
+%% -------------------------------------------------------------------
+
+coerce_configuration_data(Config) ->
+ P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt),
+ {ok, C} = emqttc:start_link(connection_opts(P)),
+
+ emqttc:subscribe(C, <<"TopicA">>, qos0),
+ emqttc:publish(C, <<"TopicA">>, <<"Payload">>),
+ expect_publishes(<<"TopicA">>, [<<"Payload">>]),
+
+ emqttc:disconnect(C),
+ ok.
+
+%% -------------------------------------------------------------------
+%% When a client is subscribed to TopicA/Device.Field and another
+%% client publishes to TopicA/Device.Field the client should be
+%% sent messages for the translated topic (TopicA/Device/Field)
+%% -------------------------------------------------------------------
+should_translate_amqp2mqtt_on_publish(Config) ->
+ P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt),
+ {ok, C} = emqttc:start_link(connection_opts(P)),
+ %% there's an active consumer
+ emqttc:subscribe(C, <<"TopicA/Device.Field">>, qos1),
+ emqttc:publish(C, <<"TopicA/Device.Field">>, <<"Payload">>, [{retain, true}]),
+ expect_publishes(<<"TopicA/Device/Field">>, [<<"Payload">>]),
+ emqttc:disconnect(C).
+
+%% -------------------------------------------------------------------
+%% If a client is publishes a retained message to TopicA/Device.Field and another
+%% client subscribes to TopicA/Device.Field the client should be
+%% sent the retained message for the translated topic (TopicA/Device/Field)
+%% -------------------------------------------------------------------
+should_translate_amqp2mqtt_on_retention(Config) ->
+ P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt),
+ {ok, C} = emqttc:start_link(connection_opts(P)),
+ %% publish with retain = true before a consumer comes around
+ emqttc:publish(C, <<"TopicA/Device.Field">>, <<"Payload">>, [{retain, true}]),
+ emqttc:subscribe(C, <<"TopicA/Device.Field">>, qos1),
+ expect_publishes(<<"TopicA/Device/Field">>, [<<"Payload">>]),
+ emqttc:disconnect(C).
+
+%% -------------------------------------------------------------------
+%% If a client is publishes a retained message to TopicA/Device.Field and another
+%% client subscribes to TopicA/Device/Field the client should be
+%% sent retained message for the translated topic (TopicA/Device/Field)
+%% -------------------------------------------------------------------
+should_translate_amqp2mqtt_on_retention_search(Config) ->
+ P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt),
+ {ok, C} = emqttc:start_link(connection_opts(P)),
+ emqttc:publish(C, <<"TopicA/Device.Field">>, <<"Payload">>, [{retain, true}]),
+ emqttc:subscribe(C, <<"TopicA/Device/Field">>, qos1),
+ expect_publishes(<<"TopicA/Device/Field">>, [<<"Payload">>]),
+ emqttc:disconnect(C).
+
+connection_opts(Port) ->
+ [{host, "localhost"},
+ {port, Port},
+ {client_id, <<"simpleClientRetainer">>},
+ {proto_ver,3},
+ {logger, info},
+ {puback_timeout, 1}].
+
+ expect_publishes(_Topic, []) -> ok;
+ expect_publishes(Topic, [Payload | Rest]) ->
+ receive
+ {publish, Topic, Payload} -> expect_publishes(Topic, Rest)
+ after 1500 ->
+ throw({publish_not_delivered, Payload})
+ end.
diff --git a/deps/rabbitmq_mqtt/test/util_SUITE.erl b/deps/rabbitmq_mqtt/test/util_SUITE.erl
new file mode 100644
index 0000000000..6694498595
--- /dev/null
+++ b/deps/rabbitmq_mqtt/test/util_SUITE.erl
@@ -0,0 +1,80 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+
+-module(util_SUITE).
+-compile([export_all]).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+all() ->
+ [
+ {group, util_tests}
+ ].
+
+groups() ->
+ [
+ {util_tests, [parallel], [
+ coerce_exchange,
+ coerce_vhost,
+ coerce_default_user,
+ coerce_default_pass,
+ mqtt_amqp_topic_translation
+ ]
+ }
+ ].
+
+suite() ->
+ [{timetrap, {seconds, 60}}].
+
+init_per_suite(Config) ->
+ ok = application:load(rabbitmq_mqtt),
+ Config.
+end_per_suite(Config) ->
+ ok = application:unload(rabbitmq_mqtt),
+ Config.
+init_per_group(_, Config) -> Config.
+end_per_group(_, Config) -> Config.
+init_per_testcase(_, Config) -> Config.
+end_per_testcase(_, Config) -> Config.
+
+coerce_exchange(_) ->
+ ?assertEqual(<<"amq.topic">>, rabbit_mqtt_util:env(exchange)).
+
+coerce_vhost(_) ->
+ ?assertEqual(<<"/">>, rabbit_mqtt_util:env(vhost)).
+
+coerce_default_user(_) ->
+ ?assertEqual(<<"guest_user">>, rabbit_mqtt_util:env(default_user)).
+
+coerce_default_pass(_) ->
+ ?assertEqual(<<"guest_pass">>, rabbit_mqtt_util:env(default_pass)).
+
+mqtt_amqp_topic_translation(_) ->
+ ok = application:set_env(rabbitmq_mqtt, sparkplug, true),
+ {ok, {mqtt2amqp_fun, Mqtt2AmqpFun}, {amqp2mqtt_fun, Amqp2MqttFun}} =
+ rabbit_mqtt_util:get_topic_translation_funs(),
+
+ T0 = "/foo/bar/+/baz",
+ T0_As_Amqp = <<".foo.bar.*.baz">>,
+ T0_As_Mqtt = <<"/foo/bar/+/baz">>,
+ ?assertEqual(T0_As_Amqp, Mqtt2AmqpFun(T0)),
+ ?assertEqual(T0_As_Mqtt, Amqp2MqttFun(T0_As_Amqp)),
+
+ T1 = "spAv1.0/foo/bar/+/baz",
+ T1_As_Amqp = <<"spAv1___0.foo.bar.*.baz">>,
+ T1_As_Mqtt = <<"spAv1.0/foo/bar/+/baz">>,
+ ?assertEqual(T1_As_Amqp, Mqtt2AmqpFun(T1)),
+ ?assertEqual(T1_As_Mqtt, Amqp2MqttFun(T1_As_Amqp)),
+
+ T2 = "spBv2.90/foo/bar/+/baz",
+ T2_As_Amqp = <<"spBv2___90.foo.bar.*.baz">>,
+ T2_As_Mqtt = <<"spBv2.90/foo/bar/+/baz">>,
+ ?assertEqual(T2_As_Amqp, Mqtt2AmqpFun(T2)),
+ ?assertEqual(T2_As_Mqtt, Amqp2MqttFun(T2_As_Amqp)),
+
+ ok = application:unset_env(rabbitmq_mqtt, sparkplug),
+ ok.