diff options
Diffstat (limited to 'deps/rabbitmq_mqtt/test')
32 files changed, 4066 insertions, 0 deletions
diff --git a/deps/rabbitmq_mqtt/test/auth_SUITE.erl b/deps/rabbitmq_mqtt/test/auth_SUITE.erl new file mode 100644 index 0000000000..7368139d95 --- /dev/null +++ b/deps/rabbitmq_mqtt/test/auth_SUITE.erl @@ -0,0 +1,493 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% +-module(auth_SUITE). +-compile([export_all]). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-define(CONNECT_TIMEOUT, 10000). + +all() -> + [{group, anonymous_no_ssl_user}, + {group, anonymous_ssl_user}, + {group, no_ssl_user}, + {group, ssl_user}, + {group, client_id_propagation}]. + +groups() -> + [{anonymous_ssl_user, [], + [anonymous_auth_success, + user_credentials_auth, + ssl_user_auth_success, + ssl_user_vhost_not_allowed, + ssl_user_vhost_parameter_mapping_success, + ssl_user_vhost_parameter_mapping_not_allowed, + ssl_user_vhost_parameter_mapping_vhost_does_not_exist, + ssl_user_port_vhost_mapping_takes_precedence_over_cert_vhost_mapping + ]}, + {anonymous_no_ssl_user, [], + [anonymous_auth_success, + user_credentials_auth, + port_vhost_mapping_success, + port_vhost_mapping_success_no_mapping, + port_vhost_mapping_not_allowed, + port_vhost_mapping_vhost_does_not_exist + %% SSL auth will succeed, because we cannot ignore anonymous + ]}, + {ssl_user, [], + [anonymous_auth_failure, + user_credentials_auth, + ssl_user_auth_success, + ssl_user_vhost_not_allowed, + ssl_user_vhost_parameter_mapping_success, + ssl_user_vhost_parameter_mapping_not_allowed, + ssl_user_vhost_parameter_mapping_vhost_does_not_exist, + ssl_user_port_vhost_mapping_takes_precedence_over_cert_vhost_mapping + ]}, + {no_ssl_user, [], + [anonymous_auth_failure, + user_credentials_auth, + ssl_user_auth_failure, + port_vhost_mapping_success, + port_vhost_mapping_success_no_mapping, + port_vhost_mapping_not_allowed, + port_vhost_mapping_vhost_does_not_exist + ]}, + {client_id_propagation, [], + [client_id_propagation] + } + ]. + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + Config. + +end_per_suite(Config) -> + Config. + +init_per_group(Group, Config) -> + Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"), + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodename_suffix, Suffix}, + {rmq_certspwd, "bunnychow"} + ]), + MqttConfig = mqtt_config(Group), + AuthConfig = auth_config(Group), + rabbit_ct_helpers:run_setup_steps(Config1, + [ fun(Conf) -> merge_app_env(MqttConfig, Conf) end ] ++ + [ fun(Conf) -> case AuthConfig of + undefined -> Conf; + _ -> merge_app_env(AuthConfig, Conf) + end + end ] ++ + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +end_per_group(_, Config) -> + rabbit_ct_helpers:run_teardown_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). + +merge_app_env(MqttConfig, Config) -> + rabbit_ct_helpers:merge_app_env(Config, MqttConfig). + +mqtt_config(anonymous_ssl_user) -> + {rabbitmq_mqtt, [{ssl_cert_login, true}, + {allow_anonymous, true}]}; +mqtt_config(anonymous_no_ssl_user) -> + {rabbitmq_mqtt, [{ssl_cert_login, false}, + {allow_anonymous, true}]}; +mqtt_config(ssl_user) -> + {rabbitmq_mqtt, [{ssl_cert_login, true}, + {allow_anonymous, false}]}; +mqtt_config(no_ssl_user) -> + {rabbitmq_mqtt, [{ssl_cert_login, false}, + {allow_anonymous, false}]}; +mqtt_config(client_id_propagation) -> + {rabbitmq_mqtt, [{ssl_cert_login, true}, + {allow_anonymous, true}]}. + +auth_config(client_id_propagation) -> + {rabbit, [ + {auth_backends, [rabbit_auth_backend_mqtt_mock]} + ] + }; +auth_config(_) -> + undefined. + +init_per_testcase(Testcase, Config) when Testcase == ssl_user_auth_success; + Testcase == ssl_user_auth_failure -> + Config1 = set_cert_user_on_default_vhost(Config), + rabbit_ct_helpers:testcase_started(Config1, Testcase); +init_per_testcase(ssl_user_vhost_parameter_mapping_success, Config) -> + Config1 = set_cert_user_on_default_vhost(Config), + User = ?config(temp_ssl_user, Config1), + ok = rabbit_ct_broker_helpers:clear_permissions(Config1, User, <<"/">>), + Config2 = set_vhost_for_cert_user(Config1, User), + rabbit_ct_helpers:testcase_started(Config2, ssl_user_vhost_parameter_mapping_success); +init_per_testcase(ssl_user_vhost_parameter_mapping_not_allowed, Config) -> + Config1 = set_cert_user_on_default_vhost(Config), + User = ?config(temp_ssl_user, Config1), + Config2 = set_vhost_for_cert_user(Config1, User), + VhostForCertUser = ?config(temp_vhost_for_ssl_user, Config2), + ok = rabbit_ct_broker_helpers:clear_permissions(Config2, User, VhostForCertUser), + rabbit_ct_helpers:testcase_started(Config2, ssl_user_vhost_parameter_mapping_not_allowed); +init_per_testcase(user_credentials_auth, Config) -> + User = <<"new-user">>, + Pass = <<"new-user-pass">>, + ok = rabbit_ct_broker_helpers:add_user(Config, 0, User, Pass), + ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, <<"/">>), + Config1 = rabbit_ct_helpers:set_config(Config, [{new_user, User}, + {new_user_pass, Pass}]), + rabbit_ct_helpers:testcase_started(Config1, user_credentials_auth); +init_per_testcase(ssl_user_vhost_not_allowed, Config) -> + Config1 = set_cert_user_on_default_vhost(Config), + User = ?config(temp_ssl_user, Config1), + ok = rabbit_ct_broker_helpers:clear_permissions(Config1, User, <<"/">>), + rabbit_ct_helpers:testcase_started(Config1, ssl_user_vhost_not_allowed); +init_per_testcase(ssl_user_vhost_parameter_mapping_vhost_does_not_exist, Config) -> + Config1 = set_cert_user_on_default_vhost(Config), + User = ?config(temp_ssl_user, Config1), + Config2 = set_vhost_for_cert_user(Config1, User), + VhostForCertUser = ?config(temp_vhost_for_ssl_user, Config2), + ok = rabbit_ct_broker_helpers:delete_vhost(Config, VhostForCertUser), + rabbit_ct_helpers:testcase_started(Config1, ssl_user_vhost_parameter_mapping_vhost_does_not_exist); +init_per_testcase(port_vhost_mapping_success, Config) -> + User = <<"guest">>, + Config1 = set_vhost_for_port_vhost_mapping_user(Config, User), + rabbit_ct_broker_helpers:clear_permissions(Config1, User, <<"/">>), + rabbit_ct_helpers:testcase_started(Config1, port_vhost_mapping_success); +init_per_testcase(port_vhost_mapping_success_no_mapping, Config) -> + User = <<"guest">>, + Config1 = set_vhost_for_port_vhost_mapping_user(Config, User), + PortToVHostMappingParameter = [ + {<<"1">>, <<"unlikely to exist">>}, + {<<"2">>, <<"unlikely to exist">>}], + ok = rabbit_ct_broker_helpers:set_global_parameter(Config, mqtt_port_to_vhost_mapping, PortToVHostMappingParameter), + VHost = ?config(temp_vhost_for_port_mapping, Config1), + rabbit_ct_broker_helpers:clear_permissions(Config1, User, VHost), + rabbit_ct_helpers:testcase_started(Config1, port_vhost_mapping_success_no_mapping); +init_per_testcase(port_vhost_mapping_not_allowed, Config) -> + User = <<"guest">>, + Config1 = set_vhost_for_port_vhost_mapping_user(Config, User), + rabbit_ct_broker_helpers:clear_permissions(Config1, User, <<"/">>), + VHost = ?config(temp_vhost_for_port_mapping, Config1), + rabbit_ct_broker_helpers:clear_permissions(Config1, User, VHost), + rabbit_ct_helpers:testcase_started(Config1, port_vhost_mapping_not_allowed); +init_per_testcase(port_vhost_mapping_vhost_does_not_exist, Config) -> + User = <<"guest">>, + Config1 = set_vhost_for_port_vhost_mapping_user(Config, User), + rabbit_ct_broker_helpers:clear_permissions(Config1, User, <<"/">>), + VHost = ?config(temp_vhost_for_port_mapping, Config1), + rabbit_ct_broker_helpers:delete_vhost(Config1, VHost), + rabbit_ct_helpers:testcase_started(Config1, port_vhost_mapping_vhost_does_not_exist); +init_per_testcase(ssl_user_port_vhost_mapping_takes_precedence_over_cert_vhost_mapping, Config) -> + Config1 = set_cert_user_on_default_vhost(Config), + User = ?config(temp_ssl_user, Config1), + Config2 = set_vhost_for_cert_user(Config1, User), + + Config3 = set_vhost_for_port_vhost_mapping_user(Config2, User), + VhostForPortMapping = ?config(mqtt_port_to_vhost_mapping, Config2), + rabbit_ct_broker_helpers:clear_permissions(Config3, User, VhostForPortMapping), + + rabbit_ct_broker_helpers:clear_permissions(Config3, User, <<"/">>), + rabbit_ct_helpers:testcase_started(Config3, ssl_user_port_vhost_mapping_takes_precedence_over_cert_vhost_mapping); +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase). + +set_cert_user_on_default_vhost(Config) -> + CertsDir = ?config(rmq_certsdir, Config), + CertFile = filename:join([CertsDir, "client", "cert.pem"]), + {ok, CertBin} = file:read_file(CertFile), + [{'Certificate', Cert, not_encrypted}] = public_key:pem_decode(CertBin), + UserBin = rabbit_ct_broker_helpers:rpc(Config, 0, + rabbit_ssl, + peer_cert_auth_name, + [Cert]), + User = binary_to_list(UserBin), + ok = rabbit_ct_broker_helpers:add_user(Config, 0, User, ""), + ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, <<"/">>), + rabbit_ct_helpers:set_config(Config, [{temp_ssl_user, User}]). + +set_vhost_for_cert_user(Config, User) -> + VhostForCertUser = <<"vhost_for_cert_user">>, + UserToVHostMappingParameter = [ + {rabbit_data_coercion:to_binary(User), VhostForCertUser}, + {<<"O=client,CN=unlikelytoexistuser">>, <<"vhost2">>} + ], + ok = rabbit_ct_broker_helpers:add_vhost(Config, VhostForCertUser), + ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, VhostForCertUser), + ok = rabbit_ct_broker_helpers:set_global_parameter(Config, mqtt_default_vhosts, UserToVHostMappingParameter), + rabbit_ct_helpers:set_config(Config, [{temp_vhost_for_ssl_user, VhostForCertUser}]). + +set_vhost_for_port_vhost_mapping_user(Config, User) -> + VhostForPortMapping = <<"vhost_for_port_vhost_mapping">>, + Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt), + TlsPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt_tls), + PortToVHostMappingParameter = [ + {integer_to_binary(Port), VhostForPortMapping}, + {<<"1884">>, <<"vhost2">>}, + {integer_to_binary(TlsPort), VhostForPortMapping}, + {<<"8884">>, <<"vhost2">>} + + ], + ok = rabbit_ct_broker_helpers:add_vhost(Config, VhostForPortMapping), + ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, VhostForPortMapping), + ok = rabbit_ct_broker_helpers:set_global_parameter(Config, mqtt_port_to_vhost_mapping, PortToVHostMappingParameter), + rabbit_ct_helpers:set_config(Config, [{temp_vhost_for_port_mapping, VhostForPortMapping}]). + +end_per_testcase(Testcase, Config) when Testcase == ssl_user_auth_success; + Testcase == ssl_user_auth_failure; + Testcase == ssl_user_vhost_not_allowed -> + delete_cert_user(Config), + rabbit_ct_helpers:testcase_finished(Config, Testcase); +end_per_testcase(TestCase, Config) when TestCase == ssl_user_vhost_parameter_mapping_success; + TestCase == ssl_user_vhost_parameter_mapping_not_allowed -> + delete_cert_user(Config), + VhostForCertUser = ?config(temp_vhost_for_ssl_user, Config), + ok = rabbit_ct_broker_helpers:delete_vhost(Config, VhostForCertUser), + ok = rabbit_ct_broker_helpers:clear_global_parameter(Config, mqtt_default_vhosts), + rabbit_ct_helpers:testcase_finished(Config, TestCase); +end_per_testcase(user_credentials_auth, Config) -> + User = ?config(new_user, Config), + {ok,_} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["delete_user", User]), + rabbit_ct_helpers:testcase_finished(Config, user_credentials_auth); +end_per_testcase(ssl_user_vhost_parameter_mapping_vhost_does_not_exist, Config) -> + delete_cert_user(Config), + ok = rabbit_ct_broker_helpers:clear_global_parameter(Config, mqtt_default_vhosts), + rabbit_ct_helpers:testcase_finished(Config, ssl_user_vhost_parameter_mapping_vhost_does_not_exist); +end_per_testcase(Testcase, Config) when Testcase == port_vhost_mapping_success; + Testcase == port_vhost_mapping_not_allowed; + Testcase == port_vhost_mapping_success_no_mapping -> + User = <<"guest">>, + rabbit_ct_broker_helpers:set_full_permissions(Config, User, <<"/">>), + VHost = ?config(temp_vhost_for_port_mapping, Config), + ok = rabbit_ct_broker_helpers:delete_vhost(Config, VHost), + ok = rabbit_ct_broker_helpers:clear_global_parameter(Config, mqtt_port_to_vhost_mapping), + rabbit_ct_helpers:testcase_finished(Config, Testcase); +end_per_testcase(port_vhost_mapping_vhost_does_not_exist, Config) -> + User = <<"guest">>, + ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, <<"/">>), + ok = rabbit_ct_broker_helpers:clear_global_parameter(Config, mqtt_port_to_vhost_mapping), + rabbit_ct_helpers:testcase_finished(Config, port_vhost_mapping_vhost_does_not_exist); +end_per_testcase(ssl_user_port_vhost_mapping_takes_precedence_over_cert_vhost_mapping, Config) -> + delete_cert_user(Config), + VhostForCertUser = ?config(temp_vhost_for_ssl_user, Config), + ok = rabbit_ct_broker_helpers:delete_vhost(Config, VhostForCertUser), + ok = rabbit_ct_broker_helpers:clear_global_parameter(Config, mqtt_default_vhosts), + + VHostForPortVHostMapping = ?config(temp_vhost_for_port_mapping, Config), + ok = rabbit_ct_broker_helpers:delete_vhost(Config, VHostForPortVHostMapping), + ok = rabbit_ct_broker_helpers:clear_global_parameter(Config, mqtt_port_to_vhost_mapping), + rabbit_ct_helpers:testcase_finished(Config, ssl_user_port_vhost_mapping_takes_precedence_over_cert_vhost_mapping); +end_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +delete_cert_user(Config) -> + User = ?config(temp_ssl_user, Config), + {ok,_} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["delete_user", User]). + +anonymous_auth_success(Config) -> + expect_successful_connection(fun connect_anonymous/1, Config). + +anonymous_auth_failure(Config) -> + expect_authentication_failure(fun connect_anonymous/1, Config). + + +ssl_user_auth_success(Config) -> + expect_successful_connection(fun connect_ssl/1, Config). + +ssl_user_auth_failure(Config) -> + expect_authentication_failure(fun connect_ssl/1, Config). + +user_credentials_auth(Config) -> + NewUser = ?config(new_user, Config), + NewUserPass = ?config(new_user_pass, Config), + + expect_successful_connection( + fun(Conf) -> connect_user(NewUser, NewUserPass, Conf) end, + Config), + + expect_successful_connection( + fun(Conf) -> connect_user(<<"guest">>, <<"guest">>, Conf) end, + Config), + + expect_successful_connection( + fun(Conf) -> connect_user(<<"/:guest">>, <<"guest">>, Conf) end, + Config), + + expect_authentication_failure( + fun(Conf) -> connect_user(NewUser, <<"invalid_pass">>, Conf) end, + Config), + + expect_authentication_failure( + fun(Conf) -> connect_user(undefined, <<"pass">>, Conf) end, + Config), + + expect_authentication_failure( + fun(Conf) -> connect_user(NewUser, undefined, Conf) end, + Config), + + expect_authentication_failure( + fun(Conf) -> connect_user(<<"non-existing-vhost:guest">>, <<"guest">>, Conf) end, + Config). + +ssl_user_vhost_parameter_mapping_success(Config) -> + expect_successful_connection(fun connect_ssl/1, Config). + +ssl_user_vhost_parameter_mapping_not_allowed(Config) -> + expect_authentication_failure(fun connect_ssl/1, Config). + +ssl_user_vhost_not_allowed(Config) -> + expect_authentication_failure(fun connect_ssl/1, Config). + +ssl_user_vhost_parameter_mapping_vhost_does_not_exist(Config) -> + expect_authentication_failure(fun connect_ssl/1, Config). + +port_vhost_mapping_success(Config) -> + expect_successful_connection( + fun(Conf) -> connect_user(<<"guest">>, <<"guest">>, Conf) end, + Config). + +port_vhost_mapping_success_no_mapping(Config) -> + %% no vhost mapping for the port, falling back to default vhost + %% where the user can connect + expect_successful_connection( + fun(Conf) -> connect_user(<<"guest">>, <<"guest">>, Conf) end, + Config + ). + +port_vhost_mapping_not_allowed(Config) -> + expect_authentication_failure( + fun(Conf) -> connect_user(<<"guest">>, <<"guest">>, Conf) end, + Config + ). + +port_vhost_mapping_vhost_does_not_exist(Config) -> + expect_authentication_failure( + fun(Conf) -> connect_user(<<"guest">>, <<"guest">>, Conf) end, + Config + ). + +ssl_user_port_vhost_mapping_takes_precedence_over_cert_vhost_mapping(Config) -> + expect_successful_connection(fun connect_ssl/1, Config). + +connect_anonymous(Config) -> + P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt), + emqttc:start_link([{host, "localhost"}, + {port, P}, + {client_id, <<"simpleClient">>}, + {proto_ver, 3}, + {logger, info}]). + +connect_ssl(Config) -> + CertsDir = ?config(rmq_certsdir, Config), + SSLConfig = [{cacertfile, filename:join([CertsDir, "testca", "cacert.pem"])}, + {certfile, filename:join([CertsDir, "client", "cert.pem"])}, + {keyfile, filename:join([CertsDir, "client", "key.pem"])}], + P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt_tls), + emqttc:start_link([{host, "localhost"}, + {port, P}, + {client_id, <<"simpleClient">>}, + {proto_ver, 3}, + {logger, info}, + {ssl, SSLConfig}]). + +client_id_propagation(Config) -> + ok = rabbit_ct_broker_helpers:add_code_path_to_all_nodes(Config, + rabbit_auth_backend_mqtt_mock), + ClientId = <<"client-id-propagation">>, + {ok, C} = connect_user(<<"client-id-propagation">>, <<"client-id-propagation">>, + Config, ClientId), + receive {mqttc, C, connected} -> ok + after ?CONNECT_TIMEOUT -> exit(emqttc_connection_timeout) + end, + emqttc:subscribe(C, <<"TopicA">>, qos0), + [{authentication, AuthProps}] = rabbit_ct_broker_helpers:rpc(Config, 0, + rabbit_auth_backend_mqtt_mock, + get, + [authentication]), + ?assertEqual(ClientId, proplists:get_value(client_id, AuthProps)), + + [{vhost_access, AuthzData}] = rabbit_ct_broker_helpers:rpc(Config, 0, + rabbit_auth_backend_mqtt_mock, + get, + [vhost_access]), + ?assertEqual(ClientId, maps:get(<<"client_id">>, AuthzData)), + + [{resource_access, AuthzContext}] = rabbit_ct_broker_helpers:rpc(Config, 0, + rabbit_auth_backend_mqtt_mock, + get, + [resource_access]), + ?assertEqual(true, maps:size(AuthzContext) > 0), + ?assertEqual(ClientId, maps:get(<<"client_id">>, AuthzContext)), + + [{topic_access, TopicContext}] = rabbit_ct_broker_helpers:rpc(Config, 0, + rabbit_auth_backend_mqtt_mock, + get, + [topic_access]), + VariableMap = maps:get(variable_map, TopicContext), + ?assertEqual(ClientId, maps:get(<<"client_id">>, VariableMap)), + + emqttc:disconnect(C). + +connect_user(User, Pass, Config) -> + connect_user(User, Pass, Config, User). +connect_user(User, Pass, Config, ClientID) -> + Creds = case User of + undefined -> []; + _ -> [{username, User}] + end ++ case Pass of + undefined -> []; + _ -> [{password, Pass}] + end, + P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt), + emqttc:start_link([{host, "localhost"}, + {port, P}, + {client_id, ClientID}, + {proto_ver, 3}, + {logger, info}] ++ Creds). + +expect_successful_connection(ConnectFun, Config) -> + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_core_metrics, reset_auth_attempt_metrics, []), + {ok, C} = ConnectFun(Config), + receive {mqttc, C, connected} -> emqttc:disconnect(C) + after ?CONNECT_TIMEOUT -> exit(emqttc_connection_timeout) + end, + [Attempt] = + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_core_metrics, get_auth_attempts, []), + ?assertEqual(false, proplists:is_defined(remote_address, Attempt)), + ?assertEqual(false, proplists:is_defined(username, Attempt)), + ?assertEqual(proplists:get_value(protocol, Attempt), <<"mqtt">>), + ?assertEqual(proplists:get_value(auth_attempts, Attempt), 1), + ?assertEqual(proplists:get_value(auth_attempts_failed, Attempt), 0), + ?assertEqual(proplists:get_value(auth_attempts_succeeded, Attempt), 1). + +expect_authentication_failure(ConnectFun, Config) -> + process_flag(trap_exit, true), + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_core_metrics, reset_auth_attempt_metrics, []), + {ok, C} = ConnectFun(Config), + Result = receive + {mqttc, C, connected} -> {error, unexpected_anonymous_connection}; + {'EXIT', C, {shutdown,{connack_error,'CONNACK_AUTH'}}} -> ok; + {'EXIT', C, {shutdown,{connack_error,'CONNACK_CREDENTIALS'}}} -> ok + after + ?CONNECT_TIMEOUT -> {error, emqttc_connection_timeout} + end, + [Attempt] = + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_core_metrics, get_auth_attempts, []), + ?assertEqual(false, proplists:is_defined(remote_address, Attempt), <<>>), + ?assertEqual(false, proplists:is_defined(username, Attempt)), + ?assertEqual(proplists:get_value(protocol, Attempt), <<"mqtt">>), + ?assertEqual(proplists:get_value(auth_attempts, Attempt), 1), + ?assertEqual(proplists:get_value(auth_attempts_failed, Attempt), 1), + ?assertEqual(proplists:get_value(auth_attempts_succeeded, Attempt), 0), + process_flag(trap_exit, false), + case Result of + ok -> ok; + {error, Err} -> exit(Err) + end. diff --git a/deps/rabbitmq_mqtt/test/cluster_SUITE.erl b/deps/rabbitmq_mqtt/test/cluster_SUITE.erl new file mode 100644 index 0000000000..941b195ced --- /dev/null +++ b/deps/rabbitmq_mqtt/test/cluster_SUITE.erl @@ -0,0 +1,188 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% +-module(cluster_SUITE). +-compile([export_all]). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +all() -> + [ + {group, non_parallel_tests} + ]. + +groups() -> + [ + {non_parallel_tests, [], [ + connection_id_tracking, + connection_id_tracking_on_nodedown, + connection_id_tracking_with_decommissioned_node + ]} + ]. + +suite() -> + [{timetrap, {minutes, 5}}]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +merge_app_env(Config) -> + rabbit_ct_helpers:merge_app_env(Config, + {rabbit, [ + {collect_statistics, basic}, + {collect_statistics_interval, 100} + ]}). + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +init_per_group(_, Config) -> + Config. + +end_per_group(_, Config) -> + Config. + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase), + rabbit_ct_helpers:log_environment(), + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodename_suffix, Testcase}, + {rmq_extra_tcp_ports, [tcp_port_mqtt_extra, + tcp_port_mqtt_tls_extra]}, + {rmq_nodes_clustered, true}, + {rmq_nodes_count, 5} + ]), + rabbit_ct_helpers:run_setup_steps(Config1, + [ fun merge_app_env/1 ] ++ + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +end_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:run_teardown_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()), + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +%% ------------------------------------------------------------------- +%% Test cases +%% ------------------------------------------------------------------- + +%% Note about running this testsuite in a mixed-versions cluster: +%% All even-numbered nodes will use the same code base when using a +%% secondary Umbrella. Odd-numbered nodes might use an incompatible code +%% base. When cluster-wide client ID tracking was introduced, it was not +%% put behind a feature flag because there was no need for one. Here, we +%% don't have a way to ensure that all nodes participate in client ID +%% tracking. However, those using the same code should. That's why we +%% limit our RPC calls to those nodes. +%% +%% That's also the reason why we use a 5-node cluster: with node 2 and +%% 4 which might not participate, it leaves nodes 1, 3 and 5: thus 3 +%% nodes, the minimum to use Ra in proper conditions. + +connection_id_tracking(Config) -> + ID = <<"duplicate-id">>, + {ok, MRef1, C1} = connect_to_node(Config, 0, ID), + emqttc:subscribe(C1, <<"TopicA">>, qos0), + emqttc:publish(C1, <<"TopicA">>, <<"Payload">>), + expect_publishes(<<"TopicA">>, [<<"Payload">>]), + + %% there's one connection + assert_connection_count(Config, 10, 2, 1), + + %% connect to the same node (A or 0) + {ok, MRef2, _C2} = connect_to_node(Config, 0, ID), + + %% C1 is disconnected + await_disconnection(MRef1), + + %% connect to a different node (C or 2) + {ok, _, C3} = connect_to_node(Config, 2, ID), + assert_connection_count(Config, 10, 2, 1), + + %% C2 is disconnected + await_disconnection(MRef2), + + emqttc:disconnect(C3). + +connection_id_tracking_on_nodedown(Config) -> + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + {ok, MRef, C} = connect_to_node(Config, 0, <<"simpleClient">>), + emqttc:subscribe(C, <<"TopicA">>, qos0), + emqttc:publish(C, <<"TopicA">>, <<"Payload">>), + expect_publishes(<<"TopicA">>, [<<"Payload">>]), + assert_connection_count(Config, 10, 2, 1), + ok = rabbit_ct_broker_helpers:stop_node(Config, Server), + await_disconnection(MRef), + assert_connection_count(Config, 10, 2, 0), + ok. + +connection_id_tracking_with_decommissioned_node(Config) -> + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + {ok, MRef, C} = connect_to_node(Config, 0, <<"simpleClient">>), + emqttc:subscribe(C, <<"TopicA">>, qos0), + emqttc:publish(C, <<"TopicA">>, <<"Payload">>), + expect_publishes(<<"TopicA">>, [<<"Payload">>]), + + assert_connection_count(Config, 10, 2, 1), + {ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["decommission_mqtt_node", Server]), + await_disconnection(MRef), + assert_connection_count(Config, 10, 2, 0), + ok. + +%% +%% Helpers +%% + +assert_connection_count(_Config, 0, _, _) -> + ct:fail("failed to complete rabbit_mqtt_collector:list/0"); +assert_connection_count(Config, Retries, NodeId, NumElements) -> + List = rabbit_ct_broker_helpers:rpc(Config, NodeId, rabbit_mqtt_collector, list, []), + case length(List) == NumElements of + true -> + ok; + false -> + timer:sleep(200), + assert_connection_count(Config, Retries-1, NodeId, NumElements) + end. + + + +connect_to_node(Config, Node, ClientID) -> + Port = rabbit_ct_broker_helpers:get_node_config(Config, Node, tcp_port_mqtt), + {ok, C} = connect(Port, ClientID), + MRef = erlang:monitor(process, C), + {ok, MRef, C}. + +connect(Port, ClientID) -> + {ok, C} = emqttc:start_link([{host, "localhost"}, + {port, Port}, + {client_id, ClientID}, + {proto_ver, 3}, + {logger, info}, + {puback_timeout, 1}]), + unlink(C), + {ok, C}. + +await_disconnection(Ref) -> + receive + {'DOWN', Ref, _, _, _} -> ok + after 30000 -> exit(missing_down_message) + end. + +expect_publishes(_Topic, []) -> ok; +expect_publishes(Topic, [Payload|Rest]) -> + receive + {publish, Topic, Payload} -> expect_publishes(Topic, Rest) + after 5000 -> + throw({publish_not_delivered, Payload}) + end. diff --git a/deps/rabbitmq_mqtt/test/command_SUITE.erl b/deps/rabbitmq_mqtt/test/command_SUITE.erl new file mode 100644 index 0000000000..a15c3789f7 --- /dev/null +++ b/deps/rabbitmq_mqtt/test/command_SUITE.erl @@ -0,0 +1,158 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. + + +-module(command_SUITE). +-compile([export_all]). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). +-include("rabbit_mqtt.hrl"). + + +-define(COMMAND, 'Elixir.RabbitMQ.CLI.Ctl.Commands.ListMqttConnectionsCommand'). + +all() -> + [ + {group, non_parallel_tests} + ]. + +groups() -> + [ + {non_parallel_tests, [], [ + merge_defaults, + run + ]} + ]. + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodename_suffix, ?MODULE}, + {rmq_extra_tcp_ports, [tcp_port_mqtt_extra, + tcp_port_mqtt_tls_extra]}, + {rmq_nodes_clustered, true}, + {rmq_nodes_count, 3} + ]), + rabbit_ct_helpers:run_setup_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_group(_, Config) -> + Config. + +end_per_group(_, Config) -> + Config. + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase). + +end_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +merge_defaults(_Config) -> + {[<<"client_id">>, <<"conn_name">>], #{verbose := false}} = + ?COMMAND:merge_defaults([], #{}), + + {[<<"other_key">>], #{verbose := true}} = + ?COMMAND:merge_defaults([<<"other_key">>], #{verbose => true}), + + {[<<"other_key">>], #{verbose := false}} = + ?COMMAND:merge_defaults([<<"other_key">>], #{verbose => false}). + + +run(Config) -> + + Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + Opts = #{node => Node, timeout => 10000, verbose => false}, + + %% No connections + [] = 'Elixir.Enum':to_list(?COMMAND:run([], Opts)), + + P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt), + {ok, _} = emqttc:start_link([{host, "localhost"}, + {port, P}, + {client_id, <<"simpleClient">>}, + {proto_ver, 3}, + {logger, info}, + {puback_timeout, 1}]), + ct:sleep(100), + + [[{client_id, <<"simpleClient">>}]] = + 'Elixir.Enum':to_list(?COMMAND:run([<<"client_id">>], Opts)), + + + {ok, _} = emqttc:start_link([{host, "localhost"}, + {port, P}, + {client_id, <<"simpleClient1">>}, + {proto_ver, 3}, + {logger, info}, + {username, <<"guest">>}, + {password, <<"guest">>}, + {puback_timeout, 1}]), + ct:sleep(200), + + [[{client_id, <<"simpleClient">>}, {user, <<"guest">>}], + [{client_id, <<"simpleClient1">>}, {user, <<"guest">>}]] = + lists:sort( + 'Elixir.Enum':to_list(?COMMAND:run([<<"client_id">>, <<"user">>], + Opts))), + + Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), + start_amqp_connection(network, Node, Port), + + %% There are still just two connections + [[{client_id, <<"simpleClient">>}], + [{client_id, <<"simpleClient1">>}]] = + lists:sort('Elixir.Enum':to_list(?COMMAND:run([<<"client_id">>], Opts))), + + start_amqp_connection(direct, Node, Port), + ct:sleep(200), + + %% Still two MQTT connections, one direct AMQP 0-9-1 connection + [[{client_id, <<"simpleClient">>}], + [{client_id, <<"simpleClient1">>}]] = + lists:sort('Elixir.Enum':to_list(?COMMAND:run([<<"client_id">>], Opts))), + + %% Verbose returns all keys + Infos = lists:map(fun(El) -> atom_to_binary(El, utf8) end, ?INFO_ITEMS), + AllKeys1 = 'Elixir.Enum':to_list(?COMMAND:run(Infos, Opts)), + AllKeys2 = 'Elixir.Enum':to_list(?COMMAND:run([], Opts#{verbose => true})), + + %% There are two connections + [FirstPL, _] = AllKeys1, + [SecondPL, _] = AllKeys2, + + First = maps:from_list(lists:usort(FirstPL)), + Second = maps:from_list(lists:usort(SecondPL)), + + %% Keys are INFO_ITEMS + KeysCount = length(?INFO_ITEMS), + ?assert(KeysCount =:= maps:size(First)), + ?assert(KeysCount =:= maps:size(Second)), + + Keys = maps:keys(First), + + [] = Keys -- ?INFO_ITEMS, + [] = ?INFO_ITEMS -- Keys. + + +start_amqp_connection(Type, Node, Port) -> + amqp_connection:start(amqp_params(Type, Node, Port)). + +amqp_params(network, _, Port) -> + #amqp_params_network{port = Port}; +amqp_params(direct, Node, _) -> + #amqp_params_direct{node = Node}. + + + diff --git a/deps/rabbitmq_mqtt/test/config_schema_SUITE.erl b/deps/rabbitmq_mqtt/test/config_schema_SUITE.erl new file mode 100644 index 0000000000..c760148cad --- /dev/null +++ b/deps/rabbitmq_mqtt/test/config_schema_SUITE.erl @@ -0,0 +1,55 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2016-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(config_schema_SUITE). + +-compile(export_all). + +all() -> + [ + run_snippets + ]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + Config1 = rabbit_ct_helpers:run_setup_steps(Config), + rabbit_ct_config_schema:init_schemas(rabbitmq_mqtt, Config1). + + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase), + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodename_suffix, Testcase} + ]), + rabbit_ct_helpers:run_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +end_per_testcase(Testcase, Config) -> + Config1 = rabbit_ct_helpers:run_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()), + rabbit_ct_helpers:testcase_finished(Config1, Testcase). + +%% ------------------------------------------------------------------- +%% Testcases. +%% ------------------------------------------------------------------- + +run_snippets(Config) -> + ok = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, run_snippets1, [Config]). + +run_snippets1(Config) -> + rabbit_ct_config_schema:run_snippets(Config). + diff --git a/deps/rabbitmq_mqtt/test/config_schema_SUITE_data/certs/cacert.pem b/deps/rabbitmq_mqtt/test/config_schema_SUITE_data/certs/cacert.pem new file mode 100644 index 0000000000..eaf6b67806 --- /dev/null +++ b/deps/rabbitmq_mqtt/test/config_schema_SUITE_data/certs/cacert.pem @@ -0,0 +1 @@ +I'm not a certificate diff --git a/deps/rabbitmq_mqtt/test/config_schema_SUITE_data/certs/cert.pem b/deps/rabbitmq_mqtt/test/config_schema_SUITE_data/certs/cert.pem new file mode 100644 index 0000000000..eaf6b67806 --- /dev/null +++ b/deps/rabbitmq_mqtt/test/config_schema_SUITE_data/certs/cert.pem @@ -0,0 +1 @@ +I'm not a certificate diff --git a/deps/rabbitmq_mqtt/test/config_schema_SUITE_data/certs/key.pem b/deps/rabbitmq_mqtt/test/config_schema_SUITE_data/certs/key.pem new file mode 100644 index 0000000000..eaf6b67806 --- /dev/null +++ b/deps/rabbitmq_mqtt/test/config_schema_SUITE_data/certs/key.pem @@ -0,0 +1 @@ +I'm not a certificate diff --git a/deps/rabbitmq_mqtt/test/config_schema_SUITE_data/rabbitmq_mqtt.snippets b/deps/rabbitmq_mqtt/test/config_schema_SUITE_data/rabbitmq_mqtt.snippets new file mode 100644 index 0000000000..032cce01f9 --- /dev/null +++ b/deps/rabbitmq_mqtt/test/config_schema_SUITE_data/rabbitmq_mqtt.snippets @@ -0,0 +1,144 @@ +[{defaults, + "listeners.tcp.default = 5672 + mqtt.default_user = guest + mqtt.default_pass = guest + mqtt.allow_anonymous = true + mqtt.vhost = / + mqtt.exchange = amq.topic + mqtt.subscription_ttl = 1800000 + mqtt.prefetch = 10 + mqtt.sparkplug = true + mqtt.listeners.ssl = none +## Default MQTT with TLS port is 8883 +# mqtt.listeners.ssl.default = 8883 + mqtt.listeners.tcp.default = 1883 + mqtt.tcp_listen_options.backlog = 128 + mqtt.tcp_listen_options.nodelay = true + mqtt.proxy_protocol = false", + [{rabbit,[{tcp_listeners,[5672]}]}, + {rabbitmq_mqtt, + [{default_user,<<"guest">>}, + {default_pass,<<"guest">>}, + {allow_anonymous,true}, + {vhost,<<"/">>}, + {exchange,<<"amq.topic">>}, + {subscription_ttl,1800000}, + {prefetch,10}, + {sparkplug,true}, + {ssl_listeners,[]}, + {tcp_listeners,[1883]}, + {tcp_listen_options,[{backlog,128},{nodelay,true}]}, + {proxy_protocol,false}]}], + [rabbitmq_mqtt]}, + + {listener_tcp_options, + "mqtt.listeners.tcp.1 = 127.0.0.1:61613 + mqtt.listeners.tcp.2 = ::1:61613 + + mqtt.tcp_listen_options.backlog = 2048 + mqtt.tcp_listen_options.recbuf = 8192 + mqtt.tcp_listen_options.sndbuf = 8192 + + mqtt.tcp_listen_options.keepalive = true + mqtt.tcp_listen_options.nodelay = true + + mqtt.tcp_listen_options.exit_on_close = true + + mqtt.tcp_listen_options.send_timeout = 120 +", + [{rabbitmq_mqtt,[ + {tcp_listeners,[ + {"127.0.0.1",61613}, + {"::1",61613} + ]} + , {tcp_listen_options, [ + {backlog, 2048}, + {exit_on_close, true}, + + {recbuf, 8192}, + {sndbuf, 8192}, + + {send_timeout, 120}, + + {keepalive, true}, + {nodelay, true} + ]} + ]}], + [rabbitmq_mqtt]}, + + + {ssl, + "ssl_options.cacertfile = test/config_schema_SUITE_data/certs/cacert.pem + ssl_options.certfile = test/config_schema_SUITE_data/certs/cert.pem + ssl_options.keyfile = test/config_schema_SUITE_data/certs/key.pem + ssl_options.verify = verify_peer + ssl_options.fail_if_no_peer_cert = true + + mqtt.listeners.ssl.default = 8883 + mqtt.listeners.tcp.default = 1883", + [{rabbit, + [{ssl_options, + [{cacertfile,"test/config_schema_SUITE_data/certs/cacert.pem"}, + {certfile,"test/config_schema_SUITE_data/certs/cert.pem"}, + {keyfile,"test/config_schema_SUITE_data/certs/key.pem"}, + {verify,verify_peer}, + {fail_if_no_peer_cert,true}]}]}, + {rabbitmq_mqtt,[{ssl_listeners,[8883]},{tcp_listeners,[1883]}]}], + [rabbitmq_mqtt]}, + {ssl_cert_login, + "mqtt.ssl_cert_login = true", + [{rabbitmq_mqtt,[{ssl_cert_login,true}]}], + [rabbitmq_mqtt]}, + {ssl_cert_login_from, + "ssl_cert_login_from = common_name", + [{rabbit,[{ssl_cert_login_from,common_name}]}], + [rabbitmq_mqtt]}, + {proxy_protocol, + "listeners.tcp.default = 5672 + mqtt.default_user = guest + mqtt.default_pass = guest + mqtt.allow_anonymous = true + mqtt.vhost = / + mqtt.exchange = amq.topic + mqtt.subscription_ttl = undefined + mqtt.prefetch = 10 + mqtt.proxy_protocol = true", + [{rabbit,[{tcp_listeners,[5672]}]}, + {rabbitmq_mqtt, + [{default_user,<<"guest">>}, + {default_pass,<<"guest">>}, + {allow_anonymous,true}, + {vhost,<<"/">>}, + {exchange,<<"amq.topic">>}, + {subscription_ttl,undefined}, + {prefetch,10}, + {proxy_protocol,true}]}], + [rabbitmq_mqtt]}, + {prefetch_retained_msg_store, + "mqtt.default_user = guest + mqtt.default_pass = guest + mqtt.allow_anonymous = true + mqtt.vhost = / + mqtt.exchange = amq.topic + mqtt.subscription_ttl = 1800000 + mqtt.prefetch = 10 +## use DETS (disk-based) store for retained messages + mqtt.retained_message_store = rabbit_mqtt_retained_msg_store_dets +## only used by DETS store + mqtt.retained_message_store_dets_sync_interval = 2000 + + mqtt.listeners.ssl = none + mqtt.listeners.tcp.default = 1883", + [{rabbitmq_mqtt, + [{default_user,<<"guest">>}, + {default_pass,<<"guest">>}, + {allow_anonymous,true}, + {vhost,<<"/">>}, + {exchange,<<"amq.topic">>}, + {subscription_ttl,1800000}, + {prefetch,10}, + {retained_message_store,rabbit_mqtt_retained_msg_store_dets}, + {retained_message_store_dets_sync_interval,2000}, + {ssl_listeners,[]}, + {tcp_listeners,[1883]}]}], + [rabbitmq_mqtt]}]. diff --git a/deps/rabbitmq_mqtt/test/java_SUITE.erl b/deps/rabbitmq_mqtt/test/java_SUITE.erl new file mode 100644 index 0000000000..34ec8dac19 --- /dev/null +++ b/deps/rabbitmq_mqtt/test/java_SUITE.erl @@ -0,0 +1,127 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(java_SUITE). +-compile([export_all]). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-define(BASE_CONF_RABBIT, {rabbit, [{ssl_options, [{fail_if_no_peer_cert, false}]}]}). +-define(BASE_CONF_MQTT, + {rabbitmq_mqtt, [ + {ssl_cert_login, true}, + {allow_anonymous, false}, + {sparkplug, true}, + {tcp_listeners, []}, + {ssl_listeners, []} + ]}). + +all() -> + [ + {group, non_parallel_tests} + ]. + +groups() -> + [ + {non_parallel_tests, [], [ + java + ]} + ]. + +suite() -> + [{timetrap, {seconds, 600}}]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +merge_app_env(Config) -> + {ok, Ssl} = q(Config, [erlang_node_config, rabbit, ssl_options]), + Ssl1 = lists:keyreplace(fail_if_no_peer_cert, 1, Ssl, {fail_if_no_peer_cert, false}), + Config1 = rabbit_ct_helpers:merge_app_env(Config, {rabbit, [{ssl_options, Ssl1}]}), + rabbit_ct_helpers:merge_app_env(Config1, ?BASE_CONF_MQTT). + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodename_suffix, ?MODULE}, + {rmq_certspwd, "bunnychow"}, + {rmq_nodes_clustered, true}, + {rmq_nodes_count, 3} + ]), + rabbit_ct_helpers:run_setup_steps(Config1, + [ fun merge_app_env/1 ] ++ + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_group(_, Config) -> + Config. + +end_per_group(_, Config) -> + Config. + +init_per_testcase(Testcase, Config) -> + CertsDir = ?config(rmq_certsdir, Config), + CertFile = filename:join([CertsDir, "client", "cert.pem"]), + {ok, CertBin} = file:read_file(CertFile), + [{'Certificate', Cert, not_encrypted}] = public_key:pem_decode(CertBin), + UserBin = rabbit_ct_broker_helpers:rpc(Config, 0, + rabbit_ssl, + peer_cert_auth_name, + [Cert]), + User = binary_to_list(UserBin), + {ok,_} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["add_user", User, ""]), + {ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["set_permissions", "-p", "/", User, ".*", ".*", ".*"]), + {ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, + ["set_topic_permissions", "-p", "/", "guest", "amq.topic", + % Write permission + "test-topic|test-retained-topic|{username}.{client_id}.a|^sp[AB]v\\d+___\\d+", + % Read permission + "test-topic|test-retained-topic|last-will|{username}.{client_id}.a|^sp[AB]v\\d+___\\d+"]), + rabbit_ct_helpers:testcase_started(Config, Testcase). + +end_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_finished(Config, Testcase). + + +%% ------------------------------------------------------------------- +%% Testsuite cases +%% ------------------------------------------------------------------- + +java(Config) -> + CertsDir = rabbit_ct_helpers:get_config(Config, rmq_certsdir), + MqttPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt), + MqttPort2 = rabbit_ct_broker_helpers:get_node_config(Config, 1, tcp_port_mqtt), + MqttPort3 = rabbit_ct_broker_helpers:get_node_config(Config, 2, tcp_port_mqtt), + MqttSslPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt_tls), + AmqpPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), + os:putenv("SSL_CERTS_DIR", CertsDir), + os:putenv("MQTT_SSL_PORT", erlang:integer_to_list(MqttSslPort)), + os:putenv("MQTT_PORT", erlang:integer_to_list(MqttPort)), + os:putenv("MQTT_PORT_2", erlang:integer_to_list(MqttPort2)), + os:putenv("MQTT_PORT_3", erlang:integer_to_list(MqttPort3)), + os:putenv("AMQP_PORT", erlang:integer_to_list(AmqpPort)), + DataDir = rabbit_ct_helpers:get_config(Config, data_dir), + MakeResult = rabbit_ct_helpers:make(Config, DataDir, ["tests"]), + {ok, _} = MakeResult. + +rpc(Config, M, F, A) -> + rabbit_ct_broker_helpers:rpc(Config, 0, M, F, A). + +q(P, [K | Rem]) -> + case proplists:get_value(K, P) of + undefined -> undefined; + V -> q(V, Rem) + end; +q(P, []) -> {ok, P}. + diff --git a/deps/rabbitmq_mqtt/test/java_SUITE_data/.gitignore b/deps/rabbitmq_mqtt/test/java_SUITE_data/.gitignore new file mode 100644 index 0000000000..4c70cdb707 --- /dev/null +++ b/deps/rabbitmq_mqtt/test/java_SUITE_data/.gitignore @@ -0,0 +1,3 @@ +/build/ +/lib/ +/target/ diff --git a/deps/rabbitmq_mqtt/test/java_SUITE_data/.mvn/wrapper/MavenWrapperDownloader.java b/deps/rabbitmq_mqtt/test/java_SUITE_data/.mvn/wrapper/MavenWrapperDownloader.java new file mode 100755 index 0000000000..2e394d5b34 --- /dev/null +++ b/deps/rabbitmq_mqtt/test/java_SUITE_data/.mvn/wrapper/MavenWrapperDownloader.java @@ -0,0 +1,110 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +import java.net.*; +import java.io.*; +import java.nio.channels.*; +import java.util.Properties; + +public class MavenWrapperDownloader { + + /** + * Default URL to download the maven-wrapper.jar from, if no 'downloadUrl' is provided. + */ + private static final String DEFAULT_DOWNLOAD_URL = + "https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.4.2/maven-wrapper-0.4.2.jar"; + + /** + * Path to the maven-wrapper.properties file, which might contain a downloadUrl property to + * use instead of the default one. + */ + private static final String MAVEN_WRAPPER_PROPERTIES_PATH = + ".mvn/wrapper/maven-wrapper.properties"; + + /** + * Path where the maven-wrapper.jar will be saved to. + */ + private static final String MAVEN_WRAPPER_JAR_PATH = + ".mvn/wrapper/maven-wrapper.jar"; + + /** + * Name of the property which should be used to override the default download url for the wrapper. + */ + private static final String PROPERTY_NAME_WRAPPER_URL = "wrapperUrl"; + + public static void main(String args[]) { + System.out.println("- Downloader started"); + File baseDirectory = new File(args[0]); + System.out.println("- Using base directory: " + baseDirectory.getAbsolutePath()); + + // If the maven-wrapper.properties exists, read it and check if it contains a custom + // wrapperUrl parameter. + File mavenWrapperPropertyFile = new File(baseDirectory, MAVEN_WRAPPER_PROPERTIES_PATH); + String url = DEFAULT_DOWNLOAD_URL; + if(mavenWrapperPropertyFile.exists()) { + FileInputStream mavenWrapperPropertyFileInputStream = null; + try { + mavenWrapperPropertyFileInputStream = new FileInputStream(mavenWrapperPropertyFile); + Properties mavenWrapperProperties = new Properties(); + mavenWrapperProperties.load(mavenWrapperPropertyFileInputStream); + url = mavenWrapperProperties.getProperty(PROPERTY_NAME_WRAPPER_URL, url); + } catch (IOException e) { + System.out.println("- ERROR loading '" + MAVEN_WRAPPER_PROPERTIES_PATH + "'"); + } finally { + try { + if(mavenWrapperPropertyFileInputStream != null) { + mavenWrapperPropertyFileInputStream.close(); + } + } catch (IOException e) { + // Ignore ... + } + } + } + System.out.println("- Downloading from: : " + url); + + File outputFile = new File(baseDirectory.getAbsolutePath(), MAVEN_WRAPPER_JAR_PATH); + if(!outputFile.getParentFile().exists()) { + if(!outputFile.getParentFile().mkdirs()) { + System.out.println( + "- ERROR creating output direcrory '" + outputFile.getParentFile().getAbsolutePath() + "'"); + } + } + System.out.println("- Downloading to: " + outputFile.getAbsolutePath()); + try { + downloadFileFromURL(url, outputFile); + System.out.println("Done"); + System.exit(0); + } catch (Throwable e) { + System.out.println("- Error downloading"); + e.printStackTrace(); + System.exit(1); + } + } + + private static void downloadFileFromURL(String urlString, File destination) throws Exception { + URL website = new URL(urlString); + ReadableByteChannel rbc; + rbc = Channels.newChannel(website.openStream()); + FileOutputStream fos = new FileOutputStream(destination); + fos.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE); + fos.close(); + rbc.close(); + } + +} diff --git a/deps/rabbitmq_mqtt/test/java_SUITE_data/.mvn/wrapper/maven-wrapper.jar b/deps/rabbitmq_mqtt/test/java_SUITE_data/.mvn/wrapper/maven-wrapper.jar Binary files differnew file mode 100755 index 0000000000..01e6799737 --- /dev/null +++ b/deps/rabbitmq_mqtt/test/java_SUITE_data/.mvn/wrapper/maven-wrapper.jar diff --git a/deps/rabbitmq_mqtt/test/java_SUITE_data/.mvn/wrapper/maven-wrapper.properties b/deps/rabbitmq_mqtt/test/java_SUITE_data/.mvn/wrapper/maven-wrapper.properties new file mode 100755 index 0000000000..00d32aab1d --- /dev/null +++ b/deps/rabbitmq_mqtt/test/java_SUITE_data/.mvn/wrapper/maven-wrapper.properties @@ -0,0 +1 @@ +distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.5.4/apache-maven-3.5.4-bin.zip
\ No newline at end of file diff --git a/deps/rabbitmq_mqtt/test/java_SUITE_data/Makefile b/deps/rabbitmq_mqtt/test/java_SUITE_data/Makefile new file mode 100644 index 0000000000..e2f9748eb2 --- /dev/null +++ b/deps/rabbitmq_mqtt/test/java_SUITE_data/Makefile @@ -0,0 +1,27 @@ +export PATH :=$(CURDIR):$(PATH) +HOSTNAME := $(shell hostname) +MVN_FLAGS += -Ddeps.dir="$(abspath $(DEPS_DIR))" \ + -Dhostname=$(HOSTNAME) \ + -Dcerts.dir=$(SSL_CERTS_DIR) \ + -Dmqtt.ssl.port=$(MQTT_SSL_PORT) \ + -Dmqtt.port=$(MQTT_PORT) \ + -Dmqtt.port.2=$(MQTT_PORT_2) \ + -Dmqtt.port.3=$(MQTT_PORT_3) \ + -Damqp.port=$(AMQP_PORT) + +.PHONY: deps tests clean distclean + +deps: + mkdir -p lib + @mvnw dependency:copy-dependencies -DoutputDirectory=lib + +tests: + # Note: to run a single test + # @mvnw -q $(MVN_FLAGS) -Dtest=MqttTest#subscribeMultiple test + @mvnw -q $(MVN_FLAGS) test + +clean: + @mvnw clean + +distclean: clean + rm -f lib/*.jar diff --git a/deps/rabbitmq_mqtt/test/java_SUITE_data/mvnw b/deps/rabbitmq_mqtt/test/java_SUITE_data/mvnw new file mode 100755 index 0000000000..8b9da3b8b6 --- /dev/null +++ b/deps/rabbitmq_mqtt/test/java_SUITE_data/mvnw @@ -0,0 +1,286 @@ +#!/bin/sh +# ---------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# ---------------------------------------------------------------------------- + +# ---------------------------------------------------------------------------- +# Maven2 Start Up Batch script +# +# Required ENV vars: +# ------------------ +# JAVA_HOME - location of a JDK home dir +# +# Optional ENV vars +# ----------------- +# M2_HOME - location of maven2's installed home dir +# MAVEN_OPTS - parameters passed to the Java VM when running Maven +# e.g. to debug Maven itself, use +# set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000 +# MAVEN_SKIP_RC - flag to disable loading of mavenrc files +# ---------------------------------------------------------------------------- + +if [ -z "$MAVEN_SKIP_RC" ] ; then + + if [ -f /etc/mavenrc ] ; then + . /etc/mavenrc + fi + + if [ -f "$HOME/.mavenrc" ] ; then + . "$HOME/.mavenrc" + fi + +fi + +# OS specific support. $var _must_ be set to either true or false. +cygwin=false; +darwin=false; +mingw=false +case "`uname`" in + CYGWIN*) cygwin=true ;; + MINGW*) mingw=true;; + Darwin*) darwin=true + # Use /usr/libexec/java_home if available, otherwise fall back to /Library/Java/Home + # See https://developer.apple.com/library/mac/qa/qa1170/_index.html + if [ -z "$JAVA_HOME" ]; then + if [ -x "/usr/libexec/java_home" ]; then + export JAVA_HOME="`/usr/libexec/java_home`" + else + export JAVA_HOME="/Library/Java/Home" + fi + fi + ;; +esac + +if [ -z "$JAVA_HOME" ] ; then + if [ -r /etc/gentoo-release ] ; then + JAVA_HOME=`java-config --jre-home` + fi +fi + +if [ -z "$M2_HOME" ] ; then + ## resolve links - $0 may be a link to maven's home + PRG="$0" + + # need this for relative symlinks + while [ -h "$PRG" ] ; do + ls=`ls -ld "$PRG"` + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG="`dirname "$PRG"`/$link" + fi + done + + saveddir=`pwd` + + M2_HOME=`dirname "$PRG"`/.. + + # make it fully qualified + M2_HOME=`cd "$M2_HOME" && pwd` + + cd "$saveddir" + # echo Using m2 at $M2_HOME +fi + +# For Cygwin, ensure paths are in UNIX format before anything is touched +if $cygwin ; then + [ -n "$M2_HOME" ] && + M2_HOME=`cygpath --unix "$M2_HOME"` + [ -n "$JAVA_HOME" ] && + JAVA_HOME=`cygpath --unix "$JAVA_HOME"` + [ -n "$CLASSPATH" ] && + CLASSPATH=`cygpath --path --unix "$CLASSPATH"` +fi + +# For Mingw, ensure paths are in UNIX format before anything is touched +if $mingw ; then + [ -n "$M2_HOME" ] && + M2_HOME="`(cd "$M2_HOME"; pwd)`" + [ -n "$JAVA_HOME" ] && + JAVA_HOME="`(cd "$JAVA_HOME"; pwd)`" + # TODO classpath? +fi + +if [ -z "$JAVA_HOME" ]; then + javaExecutable="`which javac`" + if [ -n "$javaExecutable" ] && ! [ "`expr \"$javaExecutable\" : '\([^ ]*\)'`" = "no" ]; then + # readlink(1) is not available as standard on Solaris 10. + readLink=`which readlink` + if [ ! `expr "$readLink" : '\([^ ]*\)'` = "no" ]; then + if $darwin ; then + javaHome="`dirname \"$javaExecutable\"`" + javaExecutable="`cd \"$javaHome\" && pwd -P`/javac" + else + javaExecutable="`readlink -f \"$javaExecutable\"`" + fi + javaHome="`dirname \"$javaExecutable\"`" + javaHome=`expr "$javaHome" : '\(.*\)/bin'` + JAVA_HOME="$javaHome" + export JAVA_HOME + fi + fi +fi + +if [ -z "$JAVACMD" ] ; then + if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD="$JAVA_HOME/jre/sh/java" + else + JAVACMD="$JAVA_HOME/bin/java" + fi + else + JAVACMD="`which java`" + fi +fi + +if [ ! -x "$JAVACMD" ] ; then + echo "Error: JAVA_HOME is not defined correctly." >&2 + echo " We cannot execute $JAVACMD" >&2 + exit 1 +fi + +if [ -z "$JAVA_HOME" ] ; then + echo "Warning: JAVA_HOME environment variable is not set." +fi + +CLASSWORLDS_LAUNCHER=org.codehaus.plexus.classworlds.launcher.Launcher + +# traverses directory structure from process work directory to filesystem root +# first directory with .mvn subdirectory is considered project base directory +find_maven_basedir() { + + if [ -z "$1" ] + then + echo "Path not specified to find_maven_basedir" + return 1 + fi + + basedir="$1" + wdir="$1" + while [ "$wdir" != '/' ] ; do + if [ -d "$wdir"/.mvn ] ; then + basedir=$wdir + break + fi + # workaround for JBEAP-8937 (on Solaris 10/Sparc) + if [ -d "${wdir}" ]; then + wdir=`cd "$wdir/.."; pwd` + fi + # end of workaround + done + echo "${basedir}" +} + +# concatenates all lines of a file +concat_lines() { + if [ -f "$1" ]; then + echo "$(tr -s '\n' ' ' < "$1")" + fi +} + +BASE_DIR=`find_maven_basedir "$(pwd)"` +if [ -z "$BASE_DIR" ]; then + exit 1; +fi + +########################################################################################## +# Extension to allow automatically downloading the maven-wrapper.jar from Maven-central +# This allows using the maven wrapper in projects that prohibit checking in binary data. +########################################################################################## +if [ -r "$BASE_DIR/.mvn/wrapper/maven-wrapper.jar" ]; then + if [ "$MVNW_VERBOSE" = true ]; then + echo "Found .mvn/wrapper/maven-wrapper.jar" + fi +else + if [ "$MVNW_VERBOSE" = true ]; then + echo "Couldn't find .mvn/wrapper/maven-wrapper.jar, downloading it ..." + fi + jarUrl="https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.4.2/maven-wrapper-0.4.2.jar" + while IFS="=" read key value; do + case "$key" in (wrapperUrl) jarUrl="$value"; break ;; + esac + done < "$BASE_DIR/.mvn/wrapper/maven-wrapper.properties" + if [ "$MVNW_VERBOSE" = true ]; then + echo "Downloading from: $jarUrl" + fi + wrapperJarPath="$BASE_DIR/.mvn/wrapper/maven-wrapper.jar" + + if command -v wget > /dev/null; then + if [ "$MVNW_VERBOSE" = true ]; then + echo "Found wget ... using wget" + fi + wget "$jarUrl" -O "$wrapperJarPath" + elif command -v curl > /dev/null; then + if [ "$MVNW_VERBOSE" = true ]; then + echo "Found curl ... using curl" + fi + curl -o "$wrapperJarPath" "$jarUrl" + else + if [ "$MVNW_VERBOSE" = true ]; then + echo "Falling back to using Java to download" + fi + javaClass="$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.java" + if [ -e "$javaClass" ]; then + if [ ! -e "$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.class" ]; then + if [ "$MVNW_VERBOSE" = true ]; then + echo " - Compiling MavenWrapperDownloader.java ..." + fi + # Compiling the Java class + ("$JAVA_HOME/bin/javac" "$javaClass") + fi + if [ -e "$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.class" ]; then + # Running the downloader + if [ "$MVNW_VERBOSE" = true ]; then + echo " - Running MavenWrapperDownloader.java ..." + fi + ("$JAVA_HOME/bin/java" -cp .mvn/wrapper MavenWrapperDownloader "$MAVEN_PROJECTBASEDIR") + fi + fi + fi +fi +########################################################################################## +# End of extension +########################################################################################## + +export MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-"$BASE_DIR"} +if [ "$MVNW_VERBOSE" = true ]; then + echo $MAVEN_PROJECTBASEDIR +fi +MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS" + +# For Cygwin, switch paths to Windows format before running java +if $cygwin; then + [ -n "$M2_HOME" ] && + M2_HOME=`cygpath --path --windows "$M2_HOME"` + [ -n "$JAVA_HOME" ] && + JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"` + [ -n "$CLASSPATH" ] && + CLASSPATH=`cygpath --path --windows "$CLASSPATH"` + [ -n "$MAVEN_PROJECTBASEDIR" ] && + MAVEN_PROJECTBASEDIR=`cygpath --path --windows "$MAVEN_PROJECTBASEDIR"` +fi + +WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain + +exec "$JAVACMD" \ + $MAVEN_OPTS \ + -classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \ + "-Dmaven.home=${M2_HOME}" "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \ + ${WRAPPER_LAUNCHER} $MAVEN_CONFIG "$@" diff --git a/deps/rabbitmq_mqtt/test/java_SUITE_data/mvnw.cmd b/deps/rabbitmq_mqtt/test/java_SUITE_data/mvnw.cmd new file mode 100755 index 0000000000..a5284c7939 --- /dev/null +++ b/deps/rabbitmq_mqtt/test/java_SUITE_data/mvnw.cmd @@ -0,0 +1,161 @@ +@REM ----------------------------------------------------------------------------
+@REM Licensed to the Apache Software Foundation (ASF) under one
+@REM or more contributor license agreements. See the NOTICE file
+@REM distributed with this work for additional information
+@REM regarding copyright ownership. The ASF licenses this file
+@REM to you under the Apache License, Version 2.0 (the
+@REM "License"); you may not use this file except in compliance
+@REM with the License. You may obtain a copy of the License at
+@REM
+@REM https://www.apache.org/licenses/LICENSE-2.0
+@REM
+@REM Unless required by applicable law or agreed to in writing,
+@REM software distributed under the License is distributed on an
+@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+@REM KIND, either express or implied. See the License for the
+@REM specific language governing permissions and limitations
+@REM under the License.
+@REM ----------------------------------------------------------------------------
+
+@REM ----------------------------------------------------------------------------
+@REM Maven2 Start Up Batch script
+@REM
+@REM Required ENV vars:
+@REM JAVA_HOME - location of a JDK home dir
+@REM
+@REM Optional ENV vars
+@REM M2_HOME - location of maven2's installed home dir
+@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands
+@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a key stroke before ending
+@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven
+@REM e.g. to debug Maven itself, use
+@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
+@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files
+@REM ----------------------------------------------------------------------------
+
+@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on'
+@echo off
+@REM set title of command window
+title %0
+@REM enable echoing my setting MAVEN_BATCH_ECHO to 'on'
+@if "%MAVEN_BATCH_ECHO%" == "on" echo %MAVEN_BATCH_ECHO%
+
+@REM set %HOME% to equivalent of $HOME
+if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%")
+
+@REM Execute a user defined script before this one
+if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre
+@REM check for pre script, once with legacy .bat ending and once with .cmd ending
+if exist "%HOME%\mavenrc_pre.bat" call "%HOME%\mavenrc_pre.bat"
+if exist "%HOME%\mavenrc_pre.cmd" call "%HOME%\mavenrc_pre.cmd"
+:skipRcPre
+
+@setlocal
+
+set ERROR_CODE=0
+
+@REM To isolate internal variables from possible post scripts, we use another setlocal
+@setlocal
+
+@REM ==== START VALIDATION ====
+if not "%JAVA_HOME%" == "" goto OkJHome
+
+echo.
+echo Error: JAVA_HOME not found in your environment. >&2
+echo Please set the JAVA_HOME variable in your environment to match the >&2
+echo location of your Java installation. >&2
+echo.
+goto error
+
+:OkJHome
+if exist "%JAVA_HOME%\bin\java.exe" goto init
+
+echo.
+echo Error: JAVA_HOME is set to an invalid directory. >&2
+echo JAVA_HOME = "%JAVA_HOME%" >&2
+echo Please set the JAVA_HOME variable in your environment to match the >&2
+echo location of your Java installation. >&2
+echo.
+goto error
+
+@REM ==== END VALIDATION ====
+
+:init
+
+@REM Find the project base dir, i.e. the directory that contains the folder ".mvn".
+@REM Fallback to current working directory if not found.
+
+set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR%
+IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir
+
+set EXEC_DIR=%CD%
+set WDIR=%EXEC_DIR%
+:findBaseDir
+IF EXIST "%WDIR%"\.mvn goto baseDirFound
+cd ..
+IF "%WDIR%"=="%CD%" goto baseDirNotFound
+set WDIR=%CD%
+goto findBaseDir
+
+:baseDirFound
+set MAVEN_PROJECTBASEDIR=%WDIR%
+cd "%EXEC_DIR%"
+goto endDetectBaseDir
+
+:baseDirNotFound
+set MAVEN_PROJECTBASEDIR=%EXEC_DIR%
+cd "%EXEC_DIR%"
+
+:endDetectBaseDir
+
+IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig
+
+@setlocal EnableExtensions EnableDelayedExpansion
+for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! %%a
+@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS%
+
+:endReadAdditionalConfig
+
+SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe"
+set WRAPPER_JAR="%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.jar"
+set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
+
+set DOWNLOAD_URL="https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.4.2/maven-wrapper-0.4.2.jar"
+FOR /F "tokens=1,2 delims==" %%A IN (%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.properties) DO (
+ IF "%%A"=="wrapperUrl" SET DOWNLOAD_URL=%%B
+)
+
+@REM Extension to allow automatically downloading the maven-wrapper.jar from Maven-central
+@REM This allows using the maven wrapper in projects that prohibit checking in binary data.
+if exist %WRAPPER_JAR% (
+ echo Found %WRAPPER_JAR%
+) else (
+ echo Couldn't find %WRAPPER_JAR%, downloading it ...
+ echo Downloading from: %DOWNLOAD_URL%
+ powershell -Command "(New-Object Net.WebClient).DownloadFile('%DOWNLOAD_URL%', '%WRAPPER_JAR%')"
+ echo Finished downloading %WRAPPER_JAR%
+)
+@REM End of extension
+
+%MAVEN_JAVA_EXE% %JVM_CONFIG_MAVEN_PROPS% %MAVEN_OPTS% %MAVEN_DEBUG_OPTS% -classpath %WRAPPER_JAR% "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" %WRAPPER_LAUNCHER% %MAVEN_CONFIG% %*
+if ERRORLEVEL 1 goto error
+goto end
+
+:error
+set ERROR_CODE=1
+
+:end
+@endlocal & set ERROR_CODE=%ERROR_CODE%
+
+if not "%MAVEN_SKIP_RC%" == "" goto skipRcPost
+@REM check for post script, once with legacy .bat ending and once with .cmd ending
+if exist "%HOME%\mavenrc_post.bat" call "%HOME%\mavenrc_post.bat"
+if exist "%HOME%\mavenrc_post.cmd" call "%HOME%\mavenrc_post.cmd"
+:skipRcPost
+
+@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on'
+if "%MAVEN_BATCH_PAUSE%" == "on" pause
+
+if "%MAVEN_TERMINATE_CMD%" == "on" exit %ERROR_CODE%
+
+exit /B %ERROR_CODE%
diff --git a/deps/rabbitmq_mqtt/test/java_SUITE_data/pom.xml b/deps/rabbitmq_mqtt/test/java_SUITE_data/pom.xml new file mode 100644 index 0000000000..b27b58c172 --- /dev/null +++ b/deps/rabbitmq_mqtt/test/java_SUITE_data/pom.xml @@ -0,0 +1,137 @@ +<?xml version="1.0"?> +<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd" + xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> + <modelVersion>4.0.0</modelVersion> + + <groupId>com.rabbitmq</groupId> + <artifactId>amqp-client-mqtt</artifactId> + <version>3.8.0-SNAPSHOT</version> + <packaging>jar</packaging> + + <name>RabbitMQ MQTT plugin dependencies list</name> + <description>Fetches test dependencies only.</description> + <url>https://www.rabbitmq.com</url> + + <dependencies> + <dependency> + <groupId>org.eclipse.paho</groupId> + <artifactId>org.eclipse.paho.client.mqttv3</artifactId> + <version>[1.2.1,)</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.rabbitmq</groupId> + <artifactId>amqp-client</artifactId> + <version>5.7.3</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter</artifactId> + <version>5.5.2</version> + <scope>test</scope> + </dependency> + </dependencies> + + <properties> + <test-keystore.ca>${project.build.directory}/ca.keystore</test-keystore.ca> + <test-keystore.password>bunnychow</test-keystore.password> + <groovy-scripts.dir>${basedir}/src/test/scripts</groovy-scripts.dir> + </properties> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <version>2.22.2</version> + <configuration> + <environmentVariables> + <DEPS_DIR>${deps.dir}</DEPS_DIR> + </environmentVariables> + <systemPropertyVariables> + <hostname>${hostname}</hostname> + <certs.dir>${certs.dir}</certs.dir> + <mqtt.ssl.port>${mqtt.ssl.port}</mqtt.ssl.port> + <mqtt.port>${mqtt.port}</mqtt.port> + <mqtt.port.2>${mqtt.port.2}</mqtt.port.2> + <amqp.port>${amqp.port}</amqp.port> + + <test-keystore.ca>${test-keystore.ca}</test-keystore.ca> + <test-keystore.password>${test-keystore.password}</test-keystore.password> + <test-client-cert.path>${certs.dir}/client/keycert.p12</test-client-cert.path> + <test-client-cert.password>bunnychow</test-client-cert.password> + + </systemPropertyVariables> + <!-- + needed because of bug in OpenJDK 8 u181 on Debian distros + see https://stackoverflow.com/questions/53010200/maven-surefire-could-not-find-forkedbooter-class + --> + <argLine>-Djdk.net.URLClassPath.disableClassPathURLCheck=true</argLine> + </configuration> + </plugin> + <plugin> + <groupId>org.codehaus.gmaven</groupId> + <artifactId>groovy-maven-plugin</artifactId> + <version>2.1.1</version> + <dependencies> + <dependency> + <groupId>org.codehaus.groovy</groupId> + <artifactId>groovy-all</artifactId> + <version>2.4.17</version> + </dependency> + </dependencies> + <executions> + <execution> + <phase>generate-test-resources</phase> + <id>remove-old-test-keystores</id> + <goals> + <goal>execute</goal> + </goals> + <configuration> + <source> + ${groovy-scripts.dir}/remove_old_test_keystores.groovy + </source> + </configuration> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>keytool-maven-plugin</artifactId> + <version>1.5</version> + <executions> + <execution> + <id>generate-test-ca-keystore</id> + <phase>generate-test-resources</phase> + <goals> + <goal>importCertificate</goal> + </goals> + <configuration> + <file>${certs.dir}/testca/cacert.pem</file> + <keystore>${test-keystore.ca}</keystore> + <storepass>${test-keystore.password}</storepass> + <noprompt>true</noprompt> + <alias>server1</alias> + </configuration> + </execution> + </executions> + </plugin> + + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.8.1</version> + <configuration> + <source>1.8</source> + <target>1.8</target> + <compilerArgs> + <arg>-Xlint:deprecation</arg> + <arg>-Xlint:unchecked</arg> + </compilerArgs> + </configuration> + </plugin> + + </plugins> + </build> +</project> diff --git a/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test.config b/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test.config new file mode 100644 index 0000000000..3d6bafff86 --- /dev/null +++ b/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test.config @@ -0,0 +1,14 @@ +[{rabbitmq_mqtt, [ + {ssl_cert_login, true}, + {allow_anonymous, true}, + {tcp_listeners, [1883]}, + {ssl_listeners, [8883]} + ]}, + {rabbit, [{ssl_options, [{cacertfile,"%%CERTS_DIR%%/testca/cacert.pem"}, + {certfile,"%%CERTS_DIR%%/server/cert.pem"}, + {keyfile,"%%CERTS_DIR%%/server/key.pem"}, + {verify,verify_peer}, + {fail_if_no_peer_cert,false} + ]} + ]} +]. diff --git a/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/MqttTest.java b/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/MqttTest.java new file mode 100644 index 0000000000..24c4a0be14 --- /dev/null +++ b/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/MqttTest.java @@ -0,0 +1,1030 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +// + +package com.rabbitmq.mqtt.test; + +import com.rabbitmq.client.*; +import org.eclipse.paho.client.mqttv3.*; +import org.eclipse.paho.client.mqttv3.internal.NetworkModule; +import org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule; +import org.eclipse.paho.client.mqttv3.internal.wire.MqttPingReq; +import org.eclipse.paho.client.mqttv3.internal.wire.MqttWireMessage; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; + +import javax.net.SocketFactory; +import java.io.*; +import java.net.InetAddress; +import java.net.Socket; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.*; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BooleanSupplier; + +import static org.junit.jupiter.api.Assertions.*; + +/*** + * MQTT v3.1 tests + * + */ + +public class MqttTest implements MqttCallback { + + private static final Duration EXPECT_TIMEOUT = Duration.ofSeconds(10); + + private final String host = "localhost"; + private final String brokerUrl = "tcp://" + host + ":" + getPort(); + private final String brokerThreeUrl = "tcp://" + host + ":" + getThirdPort(); + private volatile List<MqttMessage> receivedMessages; + + private final byte[] payload = "payload".getBytes(); + private final String topic = "test-topic"; + private final String retainedTopic = "test-retained-topic"; + private int testDelay = 2000; + + private volatile long lastReceipt; + private volatile boolean expectConnectionFailure; + private volatile boolean failOnDelivery = false; + + private Connection conn; + private Channel ch; + + private static int getPort() { + Object port = System.getProperty("mqtt.port", "1883"); + assertNotNull(port); + return Integer.parseInt(port.toString()); + } + + private static int getThirdPort() { + Object port = System.getProperty("mqtt.port.3", "1883"); + assertNotNull(port); + return Integer.parseInt(port.toString()); + } + + private static int getAmqpPort() { + Object port = System.getProperty("amqp.port", "5672"); + assertNotNull(port); + return Integer.parseInt(port.toString()); + } + + // override the 10s limit + private class TestMqttConnectOptions extends MqttConnectOptions { + private int keepAliveInterval = 60; + private final String user_name = "guest"; + private final String password = "guest"; + + public TestMqttConnectOptions() { + super.setUserName(user_name); + super.setPassword(password.toCharArray()); + super.setCleanSession(true); + super.setKeepAliveInterval(60); + // PublishMultiple overwhelms Paho defaults + super.setMaxInflight(15000); + } + + @Override + public void setKeepAliveInterval(int keepAliveInterval) { + this.keepAliveInterval = keepAliveInterval; + } + + @Override + public int getKeepAliveInterval() { + return this.keepAliveInterval; + } + } + + private MqttClient newClient(TestInfo testInfo) throws MqttException { + return newClient(clientId(testInfo)); + } + + private MqttClient newClient(String client_id) throws MqttException { + return newClient(brokerUrl, client_id); + } + + private MqttClient newClient(String uri, TestInfo testInfo) throws MqttException { + return newClient(uri, clientId(testInfo)); + } + + private MqttClient newClient(String uri, String client_id) throws MqttException { + return new MqttClient(uri, client_id, null); + } + + private MqttClient newConnectedClient(TestInfo testInfo, MqttConnectOptions conOpt) throws MqttException { + return newConnectedClient(clientId(testInfo), conOpt); + } + + private MqttClient newConnectedClient(String client_id, MqttConnectOptions conOpt) throws MqttException { + MqttClient client = newClient(brokerUrl, client_id); + client.connect(conOpt); + return client; + } + + private static String clientId(TestInfo info) { + return "test-" + info.getTestMethod().get().getName(); + } + + private void disconnect(MqttClient client) { + try { + if (client.isConnected()) { + client.disconnect(5000); + } + } catch (Exception ignored) {} + } + + @BeforeEach + public void setUp() { + receivedMessages = Collections.synchronizedList(new ArrayList<>()); + expectConnectionFailure = false; + } + + @AfterEach + public void tearDown() { + // clean any sticky sessions + receivedMessages.clear(); + } + + private void setUpAmqp() throws IOException, TimeoutException { + int port = getAmqpPort(); + ConnectionFactory cf = new ConnectionFactory(); + cf.setHost(host); + cf.setPort(port); + conn = cf.newConnection(); + ch = conn.createChannel(); + } + + private void tearDownAmqp() throws IOException { + if (conn.isOpen()) { + conn.close(); + } + } + + @Test + public void connectFirst() throws MqttException, IOException { + NetworkModule networkModule = new TCPNetworkModule(SocketFactory.getDefault(), host, getPort(), ""); + networkModule.start(); + DataInputStream in = new DataInputStream(networkModule.getInputStream()); + OutputStream out = networkModule.getOutputStream(); + + MqttWireMessage message = new MqttPingReq(); + + try { + // ---8<--- + // Copy/pasted from write() in MqttOutputStream.java. + byte[] bytes = message.getHeader(); + byte[] pl = message.getPayload(); + out.write(bytes,0,bytes.length); + + int offset = 0; + int chunckSize = 1024; + while (offset < pl.length) { + int length = Math.min(chunckSize, pl.length - offset); + out.write(pl, offset, length); + offset += chunckSize; + } + // ---8<--- + + // ---8<--- + // Copy/pasted from flush() in MqttOutputStream.java. + out.flush(); + // ---8<--- + + // ---8<--- + // Copy/pasted from readMqttWireMessage() in MqttInputStream.java. + ByteArrayOutputStream bais = new ByteArrayOutputStream(); + byte first = in.readByte(); + // ---8<--- + + fail("Error expected if CONNECT is not first packet"); + } catch (IOException ignored) {} + } + + @Test public void invalidUser(TestInfo info) throws MqttException { + MqttConnectOptions client_opts = new TestMqttConnectOptions(); + client_opts.setUserName("invalid-user"); + MqttClient client = newClient(info); + try { + client.connect(client_opts); + fail("Authentication failure expected"); + } catch (MqttException ex) { + assertEquals(MqttException.REASON_CODE_FAILED_AUTHENTICATION, ex.getReasonCode()); + } finally { + if (client.isConnected()) { + disconnect(client); + } + } + } + + // rabbitmq/rabbitmq-mqtt#37: QoS 1, clean session = false + @Test public void qos1AndCleanSessionUnset() + throws MqttException, IOException, TimeoutException, InterruptedException { + testQueuePropertiesWithCleanSessionUnset("qos1-no-clean-session", 1, true, false); + } + + protected void testQueuePropertiesWithCleanSessionSet(String cid, int qos, boolean durable, boolean autoDelete) + throws IOException, MqttException, TimeoutException, InterruptedException { + testQueuePropertiesWithCleanSession(true, cid, qos, durable, autoDelete); + } + + protected void testQueuePropertiesWithCleanSessionUnset(String cid, int qos, boolean durable, boolean autoDelete) + throws IOException, MqttException, TimeoutException, InterruptedException { + testQueuePropertiesWithCleanSession(false, cid, qos, durable, autoDelete); + } + + protected void testQueuePropertiesWithCleanSession(boolean cleanSession, String cid, int qos, + boolean durable, boolean autoDelete) + throws MqttException, IOException, TimeoutException { + MqttClient c = newClient(brokerUrl, cid); + MqttConnectOptions opts = new TestMqttConnectOptions(); + opts.setUserName("guest"); + opts.setPassword("guest".toCharArray()); + opts.setCleanSession(cleanSession); + c.connect(opts); + + setUpAmqp(); + Channel tmpCh = conn.createChannel(); + + String q = "mqtt-subscription-" + cid + "qos" + qos; + + c.subscribe(topic, qos); + // there is no server-sent notification about subscription + // success so we inject a delay + waitForTestDelay(); + + // ensure the queue is declared with the arguments we expect + // e.g. mqtt-subscription-client-3aqos0 + try { + // first ensure the queue exists + tmpCh.queueDeclarePassive(q); + // then assert on properties + Map<String, Object> args = new HashMap<>(); + args.put("x-expires", 86400000); + tmpCh.queueDeclare(q, durable, autoDelete, false, args); + } finally { + if (c.isConnected()) { + c.disconnect(3000); + } + + Channel tmpCh2 = conn.createChannel(); + tmpCh2.queueDelete(q); + tmpCh2.close(); + tearDownAmqp(); + } + } + + @Test public void invalidPassword(TestInfo info) throws MqttException { + MqttConnectOptions client_opts = new TestMqttConnectOptions(); + client_opts.setUserName("invalid-user"); + client_opts.setPassword("invalid-password".toCharArray()); + MqttClient client = newClient(info); + try { + client.connect(client_opts); + fail("Authentication failure expected"); + } catch (MqttException ex) { + assertEquals(MqttException.REASON_CODE_FAILED_AUTHENTICATION, ex.getReasonCode()); + } finally { + if (client.isConnected()) { + disconnect(client); + } + } + } + + @Test public void emptyPassword(TestInfo info) throws MqttException { + MqttConnectOptions client_opts = new TestMqttConnectOptions(); + client_opts.setPassword("".toCharArray()); + + MqttClient client = newClient(info); + try { + client.connect(client_opts); + fail("Authentication failure expected"); + } catch (MqttException ex) { + assertEquals(MqttException.REASON_CODE_FAILED_AUTHENTICATION, ex.getReasonCode()); + } + } + + + @Test public void subscribeQos0(TestInfo info) throws MqttException, InterruptedException { + MqttConnectOptions client_opts = new TestMqttConnectOptions(); + MqttClient client = newClient(info); + client.connect(client_opts); + client.setCallback(this); + client.subscribe(topic, 0); + + publish(client, topic, 0, payload); + waitAtMost(() -> receivedMessagesSize() == 1); + assertArrayEquals(receivedMessages.get(0).getPayload(), payload); + assertEquals(0, receivedMessages.get(0).getQos()); + disconnect(client); + } + + @Test public void subscribeUnsubscribe(TestInfo info) throws MqttException, InterruptedException { + MqttConnectOptions client_opts = new TestMqttConnectOptions(); + MqttClient client = newClient(info); + client.connect(client_opts); + client.setCallback(this); + client.subscribe(topic, 0); + + publish(client, topic, 1, payload); + + waitAtMost(() -> receivedMessagesSize() == 1); + assertArrayEquals(receivedMessages.get(0).getPayload(), payload); + assertEquals(0, receivedMessages.get(0).getQos()); + + client.unsubscribe(topic); + publish(client, topic, 0, payload); + waitAtMost(() -> receivedMessagesSize() == 1); + disconnect(client); + } + + @Test public void subscribeQos1(TestInfo info) throws MqttException, InterruptedException { + MqttConnectOptions client_opts = new TestMqttConnectOptions(); + MqttClient client = newClient(info); + client.connect(client_opts); + client.setCallback(this); + client.subscribe(topic, 1); + + publish(client, topic, 0, payload); + publish(client, topic, 1, payload); + publish(client, topic, 2, payload); + + waitAtMost(() -> receivedMessagesSize() == 3); + + MqttMessage msg1 = receivedMessages.get(0); + MqttMessage msg2 = receivedMessages.get(1); + MqttMessage msg3 = receivedMessages.get(1); + + assertArrayEquals(msg1.getPayload(), payload); + assertEquals(0, msg1.getQos()); + + assertArrayEquals(msg2.getPayload(), payload); + assertEquals(1, msg2.getQos()); + + // Downgraded QoS 2 to QoS 1 + assertArrayEquals(msg3.getPayload(), payload); + assertEquals(1, msg3.getQos()); + + disconnect(client); + } + + @Test public void subscribeReceivesRetainedMessagesWithMatchingQoS(TestInfo info) + throws MqttException, InterruptedException { + MqttConnectOptions client_opts = new TestMqttConnectOptions(); + MqttClient client = newClient(info); + client.connect(client_opts); + client.setCallback(this); + clearRetained(client, retainedTopic); + client.subscribe(retainedTopic, 1); + + publishRetained(client, retainedTopic, 1, "retain 1".getBytes(StandardCharsets.UTF_8)); + publishRetained(client, retainedTopic, 1, "retain 2".getBytes(StandardCharsets.UTF_8)); + + waitAtMost(() -> receivedMessagesSize() == 2); + MqttMessage lastMsg = receivedMessages.get(1); + + client.unsubscribe(retainedTopic); + receivedMessages.clear(); + client.subscribe(retainedTopic, 1); + waitAtMost(() -> receivedMessagesSize() == 1); + final MqttMessage retainedMsg = receivedMessages.get(0); + assertEquals(new String(lastMsg.getPayload()), + new String(retainedMsg.getPayload())); + + disconnect(client); + } + + @Test public void subscribeReceivesRetainedMessagesWithDowngradedQoS(TestInfo info) + throws MqttException, InterruptedException { + MqttConnectOptions clientOpts = new TestMqttConnectOptions(); + MqttClient client = newConnectedClient(info, clientOpts); + client.setCallback(this); + clearRetained(client, retainedTopic); + client.subscribe(retainedTopic, 1); + + publishRetained(client, retainedTopic, 1, "retain 1".getBytes(StandardCharsets.UTF_8)); + + waitAtMost(() -> receivedMessagesSize() == 1); + MqttMessage lastMsg = receivedMessages.get(0); + + client.unsubscribe(retainedTopic); + receivedMessages.clear(); + final int subscribeQoS = 0; + client.subscribe(retainedTopic, subscribeQoS); + + waitAtMost(() -> receivedMessagesSize() == 1); + final MqttMessage retainedMsg = receivedMessages.get(0); + assertEquals(new String(lastMsg.getPayload()), + new String(retainedMsg.getPayload())); + assertEquals(subscribeQoS, retainedMsg.getQos()); + + disconnect(client); + } + + @Test public void publishWithEmptyMessageClearsRetained(TestInfo info) + throws MqttException, InterruptedException { + MqttConnectOptions client_opts = new TestMqttConnectOptions(); + MqttClient client = newConnectedClient(info, client_opts); + client.setCallback(this); + clearRetained(client, retainedTopic); + client.subscribe(retainedTopic, 1); + + publishRetained(client, retainedTopic, 1, "retain 1".getBytes(StandardCharsets.UTF_8)); + publishRetained(client, retainedTopic, 1, "retain 2".getBytes(StandardCharsets.UTF_8)); + + waitAtMost(() -> receivedMessagesSize() == 2); + client.unsubscribe(retainedTopic); + receivedMessages.clear(); + + clearRetained(client, retainedTopic); + client.subscribe(retainedTopic, 1); + waitAtMost(() -> receivedMessagesSize() == 0); + + disconnect(client); + } + + @Test public void topics(TestInfo info) throws MqttException, InterruptedException { + MqttConnectOptions client_opts = new TestMqttConnectOptions(); + MqttClient client = newConnectedClient(info, client_opts); + client.setCallback(this); + client.subscribe("/+/test-topic/#"); + String[] cases = new String[]{"/pre/test-topic2", "/test-topic", "/a/test-topic/b/c/d", "/frob/test-topic"}; + List<String> expected = Arrays.asList("/a/test-topic/b/c/d", "/frob/test-topic"); + for(String example : cases){ + publish(client, example, 0, example.getBytes()); + } + waitAtMost(() -> receivedMessagesSize() == expected.size()); + for (MqttMessage m : receivedMessages){ + expected.contains(new String(m.getPayload())); + } + disconnect(client); + } + + @Test public void sparkplugTopics(TestInfo info) throws MqttException, IOException, InterruptedException, TimeoutException { + final String amqp091Topic = "spBv1___0.MACLab.DDATA.Opto22.CLX"; + final String sparkplugTopic = "spBv1.0/MACLab/+/Opto22/CLX"; + + MqttConnectOptions client_opts = new TestMqttConnectOptions(); + MqttClient client = newConnectedClient(info, client_opts); + client.setCallback(this); + client.subscribe(sparkplugTopic); + + setUpAmqp(); + ch.basicPublish("amq.topic", amqp091Topic, MessageProperties.MINIMAL_BASIC, payload); + tearDownAmqp(); + + waitAtMost(() -> receivedMessagesSize() == 1); + disconnect(client); + } + + @Test public void nonCleanSession(TestInfo info) throws MqttException, InterruptedException { + String clientIdBase = clientId(info); + MqttConnectOptions client_opts = new TestMqttConnectOptions(); + client_opts.setCleanSession(false); + MqttClient client = newConnectedClient(clientIdBase + "-1", client_opts); + client.subscribe(topic, 1); + client.disconnect(); + + MqttClient client2 = newConnectedClient(clientIdBase + "-2", client_opts); + publish(client2, topic, 1, payload); + client2.disconnect(); + + client.setCallback(this); + client.connect(client_opts); + + waitAtMost(() -> receivedMessagesSize() == 1); + assertArrayEquals(receivedMessages.get(0).getPayload(), payload); + disconnect(client); + } + + @Test public void sessionRedelivery(TestInfo info) throws MqttException, InterruptedException { + MqttConnectOptions client_opts = new TestMqttConnectOptions(); + client_opts.setCleanSession(false); + MqttClient client = newConnectedClient(info, client_opts); + client.subscribe(topic, 1); + disconnect(client); + + MqttClient client2 = newConnectedClient(info, client_opts); + publish(client2, topic, 1, payload); + disconnect(client2); + + failOnDelivery = true; + + // Connection should fail. Messages will be redelivered. + client.setCallback(this); + client.connect(client_opts); + + // Message has been delivered but connection has failed. + waitAtMost(() -> receivedMessagesSize() == 1); + assertArrayEquals(receivedMessages.get(0).getPayload(), payload); + + assertFalse(client.isConnected()); + + receivedMessages.clear(); + failOnDelivery = false; + + client.setCallback(this); + client.connect(client_opts); + + // Message has been redelivered after session resume + waitAtMost(() -> receivedMessagesSize() == 1); + assertArrayEquals(receivedMessages.get(0).getPayload(), payload); + assertTrue(client.isConnected()); + disconnect(client); + + receivedMessages.clear(); + + client.setCallback(this); + waitAtMost(() -> client.isConnected() == false); + client.connect(client_opts); + + // This time messaage are acknowledged and won't be redelivered + waitAtMost(() -> receivedMessagesSize() == 0); + assertEquals(0, receivedMessages.size()); + + disconnect(client); + } + + @Test public void cleanSession(TestInfo info) throws MqttException, InterruptedException { + MqttConnectOptions client_opts = new TestMqttConnectOptions(); + client_opts.setCleanSession(false); + MqttClient client = newConnectedClient(info, client_opts); + client.subscribe(topic, 1); + client.disconnect(); + + MqttClient client2 = newConnectedClient(info, client_opts); + publish(client2, topic, 1, payload); + disconnect(client2); + + client_opts.setCleanSession(true); + client.connect(client_opts); + client.setCallback(this); + client.subscribe(topic, 1); + + waitAtMost(() -> receivedMessagesSize() == 0); + client.unsubscribe(topic); + disconnect(client); + } + + @Test public void multipleClientIds(TestInfo info) throws MqttException, InterruptedException { + MqttConnectOptions client_opts = new TestMqttConnectOptions(); + MqttClient client = newConnectedClient(info, client_opts); + // uses duplicate client ID + MqttClient client2 = newConnectedClient(info, client_opts); + // the older connection with this client ID will be closed + waitAtMost(() -> client.isConnected() == false); + disconnect(client2); + } + + @Test public void multipleClusterClientIds(TestInfo info) throws MqttException, InterruptedException { + MqttConnectOptions client_opts = new TestMqttConnectOptions(); + MqttClient client = newConnectedClient(info, client_opts); + MqttClient client3 = newClient(brokerThreeUrl, info); + client3.connect(client_opts); + waitAtMost(() -> client.isConnected() == false); + disconnect(client3); + } + + @Test public void ping(TestInfo info) throws MqttException, InterruptedException { + MqttConnectOptions client_opts = new TestMqttConnectOptions(); + client_opts.setKeepAliveInterval(1); + MqttClient client = newConnectedClient(info, client_opts); + waitAtMost(() -> client.isConnected()); + disconnect(client); + } + + @Test public void will(TestInfo info) throws MqttException, InterruptedException, IOException { + String clientIdBase = clientId(info); + MqttConnectOptions client_opts = new TestMqttConnectOptions(); + MqttClient client2 = newConnectedClient(clientIdBase + "-2", client_opts); + client2.subscribe(topic); + client2.setCallback(this); + + final SocketFactory factory = SocketFactory.getDefault(); + final ArrayList<Socket> sockets = new ArrayList<>(); + SocketFactory testFactory = new SocketFactory() { + public Socket createSocket(String s, int i) throws IOException { + Socket sock = factory.createSocket(s, i); + sockets.add(sock); + return sock; + } + public Socket createSocket(String s, int i, InetAddress a, int i1) { + return null; + } + public Socket createSocket(InetAddress a, int i) { + return null; + } + public Socket createSocket(InetAddress a, int i, InetAddress a1, int i1) { + return null; + } + @Override + public Socket createSocket() { + Socket sock = new Socket(); + sockets.add(sock); + return sock; + } + }; + + MqttClient client = newClient(clientIdBase + "-1"); + MqttTopic willTopic = client.getTopic(topic); + + MqttConnectOptions opts = new TestMqttConnectOptions(); + opts.setSocketFactory(testFactory); + opts.setWill(willTopic, payload, 0, false); + opts.setCleanSession(false); + + client.connect(opts); + + assertTrue(sockets.size() >= 1); + expectConnectionFailure = true; + sockets.get(0).close(); + + waitAtMost(() -> receivedMessagesSize() == 1); + assertArrayEquals(receivedMessages.get(0).getPayload(), payload); + client2.unsubscribe(topic); + disconnect(client2); + } + + @Test public void willIsRetained(TestInfo info) throws MqttException, InterruptedException, IOException { + String clientIdBase = clientId(info); + MqttConnectOptions client2_opts = new TestMqttConnectOptions(); + client2_opts.setCleanSession(true); + MqttClient client2 = newConnectedClient(clientIdBase + "-2", client2_opts); + client2.setCallback(this); + + clearRetained(client2, retainedTopic); + client2.subscribe(retainedTopic, 1); + disconnect(client2); + + final SocketFactory factory = SocketFactory.getDefault(); + final ArrayList<Socket> sockets = new ArrayList<>(); + SocketFactory testFactory = new SocketFactory() { + public Socket createSocket(String s, int i) throws IOException { + Socket sock = factory.createSocket(s, i); + sockets.add(sock); + return sock; + } + public Socket createSocket(String s, int i, InetAddress a, int i1) { + return null; + } + public Socket createSocket(InetAddress a, int i) { + return null; + } + public Socket createSocket(InetAddress a, int i, InetAddress a1, int i1) { + return null; + } + @Override + public Socket createSocket() { + Socket sock = new Socket(); + sockets.add(sock); + return sock; + } + }; + + MqttConnectOptions client_opts = new TestMqttConnectOptions(); + + MqttClient client = newClient(clientIdBase + "-1"); + MqttTopic willTopic = client.getTopic(retainedTopic); + byte[] willPayload = "willpayload".getBytes(); + + client_opts.setSocketFactory(testFactory); + client_opts.setWill(willTopic, willPayload, 1, true); + + client.connect(client_opts); + + assertEquals(1, sockets.size()); + sockets.get(0).close(); + + // let last will propagate after disconnection + waitForTestDelay(); + + client2.connect(client2_opts); + client2.setCallback(this); + client2.subscribe(retainedTopic, 1); + + waitAtMost(() -> receivedMessagesSize() == 1); + assertArrayEquals(receivedMessages.get(0).getPayload(), willPayload); + client2.unsubscribe(topic); + disconnect(client2); + } + + @Test public void subscribeMultiple(TestInfo info) throws MqttException { + String clientIdBase = clientId(info); + MqttConnectOptions client_opts = new TestMqttConnectOptions(); + MqttClient client = newConnectedClient(clientIdBase + "-1", client_opts); + publish(client, "/test-topic/1", 1, "msq1-qos1".getBytes()); + + MqttClient client2 = newConnectedClient(clientIdBase + "-2", client_opts); + client2.setCallback(this); + client2.subscribe("/test-topic/#"); + client2.subscribe("/test-topic/#"); + + publish(client, "/test-topic/2", 0, "msq2-qos0".getBytes()); + publish(client, "/test-topic/3", 1, "msq3-qos1".getBytes()); + publish(client, "/test-topic/4", 2, "msq3-qos2".getBytes()); + publish(client, topic, 0, "msq4-qos0".getBytes()); + publish(client, topic, 1, "msq4-qos1".getBytes()); + + + assertEquals(3, receivedMessages.size()); + disconnect(client); + disconnect(client2); + } + + @Test public void publishMultiple() throws MqttException, InterruptedException { + int pubCount = 50; + for (int subQos=0; subQos <= 2; subQos++){ + for (int pubQos=0; pubQos <= 2; pubQos++){ + // avoid reusing the client in this test as a shared + // client cannot handle connection churn very well. MK. + String cid = "test-sub-qos-" + subQos + "-pub-qos-" + pubQos; + MqttConnectOptions client_opts = new TestMqttConnectOptions(); + MqttClient client = newClient(brokerUrl, cid); + client.connect(client_opts); + client.subscribe(topic, subQos); + client.setCallback(this); + long start = System.currentTimeMillis(); + for (int i=0; i<pubCount; i++){ + publish(client, topic, pubQos, payload); + } + + waitAtMost(() -> receivedMessagesSize() == pubCount); + System.out.println("publish QOS" + pubQos + " subscribe QOS" + subQos + + ", " + pubCount + " msgs took " + + (lastReceipt - start)/1000.0 + "sec"); + client.disconnect(5000); + receivedMessages.clear(); + } + } + } + + @Test public void topicAuthorisationPublish(TestInfo info) throws Exception { + MqttConnectOptions client_opts = new TestMqttConnectOptions(); + MqttClient client = newConnectedClient(info, client_opts); + client.setCallback(this); + client.subscribe("some/test-topic"); + publish(client, "some/test-topic", 1, "content".getBytes()); + waitAtMost(() -> receivedMessagesSize() == 1); + assertTrue(client.isConnected()); + try { + publish(client, "forbidden-topic", 1, "content".getBytes()); + fail("Publishing on a forbidden topic, an exception should have been thrown"); + client.disconnect(); + } catch(Exception e) { + // OK + } + } + + @Test public void topicAuthorisationSubscribe(TestInfo info) throws Exception { + MqttConnectOptions client_opts = new TestMqttConnectOptions(); + MqttClient client = newConnectedClient(info, client_opts); + client.setCallback(this); + client.subscribe("some/test-topic"); + try { + client.subscribe("forbidden-topic"); + fail("Subscribing to a forbidden topic, an exception should have been thrown"); + client.disconnect(); + } catch(Exception e) { + // OK + e.printStackTrace(); + } + } + + @Test public void lastWillDowngradesQoS2(TestInfo info) throws Exception { + String lastWillTopic = "test-topic-will-downgrades-qos"; + + MqttConnectOptions client2Opts = new TestMqttConnectOptions(); + MqttClient client2 = newConnectedClient(info, client2Opts); + client2.subscribe(lastWillTopic); + client2.setCallback(this); + + final SocketFactory factory = SocketFactory.getDefault(); + final ArrayList<Socket> sockets = new ArrayList<>(); + SocketFactory testFactory = new SocketFactory() { + public Socket createSocket(String s, int i) throws IOException { + Socket sock = factory.createSocket(s, i); + sockets.add(sock); + return sock; + } + public Socket createSocket(String s, int i, InetAddress a, int i1) { + return null; + } + public Socket createSocket(InetAddress a, int i) { + return null; + } + public Socket createSocket(InetAddress a, int i, InetAddress a1, int i1) { + return null; + } + @Override + public Socket createSocket() { + Socket sock = new Socket(); + sockets.add(sock); + return sock; + } + }; + + MqttConnectOptions clientOpts = new TestMqttConnectOptions(); + + MqttClient client = newClient("test-topic-will-downgrades-qos"); + clientOpts.setSocketFactory(testFactory); + MqttTopic willTopic = client.getTopic(lastWillTopic); + clientOpts.setWill(willTopic, payload, 2, false); + clientOpts.setCleanSession(false); + client.connect(clientOpts); + + waitAtMost(() -> sockets.size() == 1); + expectConnectionFailure = true; + sockets.get(0).close(); + + // let some time after disconnection + waitAtMost(() -> receivedMessagesSize() == 1); + assertEquals(1, receivedMessages.size()); + disconnect(client2); + } + + @Test public void lastWillNotSentOnRestrictedTopic(TestInfo info) throws Exception { + MqttConnectOptions client2_opts = new TestMqttConnectOptions(); + + MqttClient client2 = newConnectedClient(info, client2_opts); + // topic authorized for subscription, restricted for publishing + String lastWillTopic = "last-will"; + client2.subscribe(lastWillTopic); + client2.setCallback(this); + + final SocketFactory factory = SocketFactory.getDefault(); + final ArrayList<Socket> sockets = new ArrayList<>(); + SocketFactory testFactory = new SocketFactory() { + public Socket createSocket(String s, int i) throws IOException { + Socket sock = factory.createSocket(s, i); + sockets.add(sock); + return sock; + } + public Socket createSocket(String s, int i, InetAddress a, int i1) { + return null; + } + public Socket createSocket(InetAddress a, int i) { + return null; + } + public Socket createSocket(InetAddress a, int i, InetAddress a1, int i1) { + return null; + } + @Override + public Socket createSocket() { + Socket sock = new Socket(); + sockets.add(sock); + return sock; + } + }; + + MqttConnectOptions client_opts = new TestMqttConnectOptions(); + + MqttClient client = newClient("last-will-not-sent-on-restricted-topic"); + client_opts.setSocketFactory(testFactory); + MqttTopic willTopic = client.getTopic(lastWillTopic); + client_opts.setWill(willTopic, payload, 0, false); + client_opts.setCleanSession(false); + client.connect(client_opts); + + assertEquals(1, sockets.size()); + expectConnectionFailure = true; + sockets.get(0).close(); + + // let some time after disconnection + waitForTestDelay(); + assertEquals(0, receivedMessages.size()); + disconnect(client2); + } + + @Test public void topicAuthorisationVariableExpansion(TestInfo info) throws Exception { + final String client_id = clientId(info); + MqttConnectOptions client_opts = new TestMqttConnectOptions(); + MqttClient client = newConnectedClient(client_id, client_opts); + client.setCallback(this); + String topicWithExpandedVariables = "guest/" + client_id + "/a"; + client.subscribe(topicWithExpandedVariables); + publish(client, topicWithExpandedVariables, 1, "content".getBytes()); + waitAtMost(() -> receivedMessagesSize() == 1); + assertTrue(client.isConnected()); + try { + publish(client, "guest/WrongClientId/a", 1, "content".getBytes()); + fail("Publishing on a forbidden topic, an exception should have been thrown"); + client.disconnect(); + } catch(Exception e) { + // OK + } + } + + @Test public void interopM2A(TestInfo info) throws MqttException, IOException, InterruptedException, TimeoutException { + setUpAmqp(); + String queue = ch.queueDeclare().getQueue(); + ch.queueBind(queue, "amq.topic", topic); + + byte[] interopPayload = "interop-body".getBytes(); + MqttConnectOptions client_opts = new TestMqttConnectOptions(); + MqttClient client = newConnectedClient(info, client_opts); + publish(client, topic, 1, interopPayload); + disconnect(client); + + final CountDownLatch latch = new CountDownLatch(1); + final AtomicReference<byte[]> messageBody = new AtomicReference<>(); + ch.basicConsume(queue, true, new DefaultConsumer(ch) { + @Override + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { + messageBody.set(body); + latch.countDown(); + } + }); + assertTrue(latch.await(EXPECT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)); + assertEquals(new String(interopPayload), new String(messageBody.get())); + assertNull(ch.basicGet(queue, true)); + tearDownAmqp(); + } + + @Test public void interopA2M(TestInfo info) throws MqttException, IOException, InterruptedException, TimeoutException { + MqttConnectOptions client_opts = new TestMqttConnectOptions(); + MqttClient client = newConnectedClient(info, client_opts); + client.setCallback(this); + client.subscribe(topic, 1); + + setUpAmqp(); + ch.basicPublish("amq.topic", topic, MessageProperties.MINIMAL_BASIC, payload); + tearDownAmqp(); + + waitAtMost(() -> receivedMessagesSize() == 1); + client.disconnect(); + } + + private void publish(MqttClient client, String topicName, int qos, byte[] payload) throws MqttException { + publish(client, topicName, qos, payload, false); + } + + private void publish(MqttClient client, String topicName, int qos, byte[] payload, boolean retained) throws MqttException { + MqttTopic topic = client.getTopic(topicName); + MqttMessage message = new MqttMessage(payload); + message.setQos(qos); + message.setRetained(retained); + MqttDeliveryToken token = topic.publish(message); + token.waitForCompletion(); + } + + private void publishRetained(MqttClient client, String topicName, int qos, byte[] payload) throws MqttException { + publish(client, topicName, qos, payload, true); + } + + private void clearRetained(MqttClient client, String topicName) throws MqttException { + publishRetained(client, topicName, 1, "".getBytes()); + } + + public void connectionLost(Throwable cause) { + if (!expectConnectionFailure) + fail("Connection unexpectedly lost"); + } + + public void messageArrived(String topic, MqttMessage message) throws Exception { + lastReceipt = System.currentTimeMillis(); + receivedMessages.add(message); + if (failOnDelivery) { + throw new Exception("unexpected delivery on topic " + topic); + } + } + + public void deliveryComplete(IMqttDeliveryToken token) { + } + + private Integer receivedMessagesSize() { + return receivedMessages.size(); + } + + private void waitForTestDelay() { + try { + Thread.sleep(testDelay); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + public static void waitAtMost(BooleanSupplier condition) throws InterruptedException { + if (condition.getAsBoolean()) { + return; + } + int waitTime = 100; + int waitedTime = 0; + long timeoutInMs = EXPECT_TIMEOUT.toMillis(); + while (waitedTime <= timeoutInMs) { + Thread.sleep(waitTime); + if (condition.getAsBoolean()) { + return; + } + waitedTime += waitTime; + } + fail("Waited " + EXPECT_TIMEOUT.get(ChronoUnit.SECONDS) + " second(s), condition never got true"); + } +} diff --git a/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/rabbit-test.sh b/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/rabbit-test.sh new file mode 100755 index 0000000000..cba9bcd493 --- /dev/null +++ b/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/rabbit-test.sh @@ -0,0 +1,8 @@ +#!/bin/sh +CTL=$1 +USER="O=client,CN=$(hostname)" + +# Test direct connections +$CTL add_user "$USER" '' +$CTL set_permissions -p / "$USER" ".*" ".*" ".*" +$CTL set_topic_permissions -p / "$USER" "amq.topic" "test-topic|test-retained-topic|.*topic.*" "test-topic|test-retained-topic|.*topic.*|last-will" diff --git a/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/setup-rabbit-test.sh b/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/setup-rabbit-test.sh new file mode 100644 index 0000000000..2e2282ee07 --- /dev/null +++ b/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/setup-rabbit-test.sh @@ -0,0 +1,2 @@ +#!/bin/sh -e +sh -e `dirname $0`/rabbit-test.sh "$DEPS_DIR/rabbit/scripts/rabbitmqctl -n $RABBITMQ_NODENAME" diff --git a/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/tls/MqttSSLTest.java b/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/tls/MqttSSLTest.java new file mode 100644 index 0000000000..2ea4c7a638 --- /dev/null +++ b/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/tls/MqttSSLTest.java @@ -0,0 +1,157 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +// + +package com.rabbitmq.mqtt.test.tls; + +import org.eclipse.paho.client.mqttv3.*; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.*; + + +/** + * MQTT v3.1 tests + * + */ + +public class MqttSSLTest implements MqttCallback { + + private final String brokerUrl = "ssl://" + getHost() + ":" + getPort(); + private String clientId; + private String clientId2; + private MqttClient client; + private MqttClient client2; + private MqttConnectOptions conOpt; + + private volatile List<MqttMessage> receivedMessages; + private volatile boolean expectConnectionFailure; + + private static String getPort() { + Object port = System.getProperty("mqtt.ssl.port"); + assertNotNull(port); + return port.toString(); + } + + private static String getHost() { + Object host = System.getProperty("hostname"); + assertNotNull(host); + return host.toString(); + } + + // override 10s limit + private class MyConnOpts extends MqttConnectOptions { + private int keepAliveInterval = 60; + + @Override + public void setKeepAliveInterval(int keepAliveInterval) { + this.keepAliveInterval = keepAliveInterval; + } + + @Override + public int getKeepAliveInterval() { + return keepAliveInterval; + } + } + + + @BeforeEach + public void setUp() throws MqttException, IOException { + clientId = getClass().getSimpleName() + ((int) (10000 * Math.random())); + clientId2 = clientId + "-2"; + client = new MqttClient(brokerUrl, clientId, null); + client2 = new MqttClient(brokerUrl, clientId2, null); + conOpt = new MyConnOpts(); + conOpt.setSocketFactory(MutualAuth.getSSLContextWithoutCert().getSocketFactory()); + setConOpts(conOpt); + receivedMessages = Collections.synchronizedList(new ArrayList<MqttMessage>()); + expectConnectionFailure = false; + } + + @AfterEach + public void tearDown() throws MqttException { + // clean any sticky sessions + setConOpts(conOpt); + client = new MqttClient(brokerUrl, clientId, null); + try { + client.connect(conOpt); + client.disconnect(); + } catch (Exception ignored) { + } + + client2 = new MqttClient(brokerUrl, clientId2, null); + try { + client2.connect(conOpt); + client2.disconnect(); + } catch (Exception ignored) { + } + } + + + private void setConOpts(MqttConnectOptions conOpts) { + conOpts.setCleanSession(true); + conOpts.setKeepAliveInterval(60); + } + + @Test + public void certLogin() throws MqttException { + try { + conOpt.setSocketFactory(MutualAuth.getSSLContextWithClientCert().getSocketFactory()); + client.connect(conOpt); + } catch (Exception e) { + e.printStackTrace(); + fail("Exception: " + e.getMessage()); + } + } + + + @Test public void invalidUser() throws MqttException { + conOpt.setUserName("invalid-user"); + try { + client.connect(conOpt); + fail("Authentication failure expected"); + } catch (MqttException ex) { + assertEquals(MqttException.REASON_CODE_FAILED_AUTHENTICATION, ex.getReasonCode()); + } catch (Exception e) { + e.printStackTrace(); + fail("Exception: " + e.getMessage()); + } + } + + @Test public void invalidPassword() throws MqttException { + conOpt.setUserName("invalid-user"); + conOpt.setPassword("invalid-password".toCharArray()); + try { + client.connect(conOpt); + fail("Authentication failure expected"); + } catch (MqttException ex) { + assertEquals(MqttException.REASON_CODE_FAILED_AUTHENTICATION, ex.getReasonCode()); + } catch (Exception e) { + e.printStackTrace(); + fail("Exception: " + e.getMessage()); + } + } + + + public void connectionLost(Throwable cause) { + if (!expectConnectionFailure) + fail("Connection unexpectedly lost"); + } + + public void messageArrived(String topic, MqttMessage message) throws Exception { + receivedMessages.add(message); + } + + public void deliveryComplete(IMqttDeliveryToken token) { + } +} diff --git a/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/tls/MutualAuth.java b/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/tls/MutualAuth.java new file mode 100644 index 0000000000..081cae4052 --- /dev/null +++ b/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/tls/MutualAuth.java @@ -0,0 +1,89 @@ +package com.rabbitmq.mqtt.test.tls; + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManagerFactory; +import java.io.IOException; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.cert.CertificateException; +import java.util.Arrays; +import java.util.List; +import java.io.FileInputStream; + + +public class MutualAuth { + + private MutualAuth() { + + } + + private static String getStringProperty(String propertyName) throws IllegalArgumentException { + Object value = System.getProperty(propertyName); + if (value == null) throw new IllegalArgumentException("Property: " + propertyName + " not found"); + return value.toString(); + } + + private static TrustManagerFactory getServerTrustManagerFactory() throws NoSuchAlgorithmException, CertificateException, IOException, KeyStoreException { + String keystorePath = System.getProperty("test-keystore.ca"); + char[] trustPhrase = getStringProperty("test-keystore.password").toCharArray(); + MutualAuth dummy = new MutualAuth(); + + // Server TrustStore + KeyStore tks = KeyStore.getInstance("JKS"); + tks.load(new FileInputStream(keystorePath), trustPhrase); + + TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509"); + tmf.init(tks); + + return tmf; + } + + public static SSLContext getSSLContextWithClientCert() throws IOException { + + char[] clientPhrase = getStringProperty("test-client-cert.password").toCharArray(); + + String p12Path = System.getProperty("test-client-cert.path"); + + MutualAuth dummy = new MutualAuth(); + try { + SSLContext sslContext = getVanillaSSLContext(); + // Client Keystore + KeyStore ks = KeyStore.getInstance("PKCS12"); + ks.load(new FileInputStream(p12Path), clientPhrase); + KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509"); + kmf.init(ks, clientPhrase); + + sslContext.init(kmf.getKeyManagers(), getServerTrustManagerFactory().getTrustManagers(), null); + return sslContext; + } catch (Exception e) { + throw new IOException(e); + } + + } + + private static SSLContext getVanillaSSLContext() throws NoSuchAlgorithmException { + SSLContext result = null; + List<String> xs = Arrays.asList("TLSv1.2", "TLSv1.1", "TLSv1"); + for(String x : xs) { + try { + return SSLContext.getInstance(x); + } catch (NoSuchAlgorithmException nae) { + // keep trying + } + } + throw new NoSuchAlgorithmException("Could not obtain an SSLContext for TLS 1.0-1.2"); + } + + public static SSLContext getSSLContextWithoutCert() throws IOException { + try { + SSLContext sslContext = getVanillaSSLContext(); + sslContext.init(null, getServerTrustManagerFactory().getTrustManagers(), null); + return sslContext; + } catch (Exception e) { + throw new IOException(e); + } + } + +} diff --git a/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/scripts/remove_old_test_keystores.groovy b/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/scripts/remove_old_test_keystores.groovy new file mode 100644 index 0000000000..6864a41e29 --- /dev/null +++ b/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/scripts/remove_old_test_keystores.groovy @@ -0,0 +1,10 @@ +def dir = new File(project.build.directory) + +dir.mkdir() + +// This pattern starts with `.*`. This is normally useless and even +// inefficient but the matching doesn't work without it... +def pattern = ~/.*\.keystore$/ +dir.eachFileMatch(pattern) { file -> + file.delete() +} diff --git a/deps/rabbitmq_mqtt/test/mqtt_machine_SUITE.erl b/deps/rabbitmq_mqtt/test/mqtt_machine_SUITE.erl new file mode 100644 index 0000000000..abdc3506dc --- /dev/null +++ b/deps/rabbitmq_mqtt/test/mqtt_machine_SUITE.erl @@ -0,0 +1,73 @@ +-module(mqtt_machine_SUITE). + +-compile(export_all). + +-export([ + ]). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include("mqtt_machine.hrl"). + +%%%=================================================================== +%%% Common Test callbacks +%%%=================================================================== + +all() -> + [ + {group, tests} + ]. + + +all_tests() -> + [ + basics + ]. + +groups() -> + [ + {tests, [], all_tests()} + ]. + +init_per_suite(Config) -> + Config. + +end_per_suite(_Config) -> + ok. + +init_per_group(_Group, Config) -> + Config. + +end_per_group(_Group, _Config) -> + ok. + +init_per_testcase(_TestCase, Config) -> + Config. + +end_per_testcase(_TestCase, _Config) -> + ok. + +%%%=================================================================== +%%% Test cases +%%%=================================================================== + +basics(_Config) -> + S0 = mqtt_machine:init(#{}), + ClientId = <<"id1">>, + {S1, ok, _} = mqtt_machine:apply(meta(1), {register, ClientId, self()}, S0), + ?assertMatch(#machine_state{client_ids = Ids} when map_size(Ids) == 1, S1), + {S2, ok, _} = mqtt_machine:apply(meta(2), {register, ClientId, self()}, S1), + ?assertMatch(#machine_state{client_ids = Ids} when map_size(Ids) == 1, S2), + {S3, ok, _} = mqtt_machine:apply(meta(3), {down, self(), noproc}, S2), + ?assertMatch(#machine_state{client_ids = Ids} when map_size(Ids) == 0, S3), + {S4, ok, _} = mqtt_machine:apply(meta(3), {unregister, ClientId, self()}, S2), + ?assertMatch(#machine_state{client_ids = Ids} when map_size(Ids) == 0, S4), + + ok. + +%% Utility + +meta(Idx) -> + #{index => Idx, + term => 1, + ts => erlang:system_time(millisecond)}. diff --git a/deps/rabbitmq_mqtt/test/processor_SUITE.erl b/deps/rabbitmq_mqtt/test/processor_SUITE.erl new file mode 100644 index 0000000000..e38a1d5318 --- /dev/null +++ b/deps/rabbitmq_mqtt/test/processor_SUITE.erl @@ -0,0 +1,211 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. + + +-module(processor_SUITE). +-compile([export_all]). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). + +all() -> + [ + {group, non_parallel_tests} + ]. + +groups() -> + [ + {non_parallel_tests, [], [ + ignores_colons_in_username_if_option_set, + interprets_colons_in_username_if_option_not_set, + get_vhosts_from_global_runtime_parameter, + get_vhost, + add_client_id_to_adapter_info + ]} + ]. + +suite() -> + [{timetrap, {seconds, 60}}]. + +init_per_suite(Config) -> + ok = application:load(rabbitmq_mqtt), + Config. +end_per_suite(Config) -> + ok = application:unload(rabbitmq_mqtt), + Config. +init_per_group(_, Config) -> Config. +end_per_group(_, Config) -> Config. +init_per_testcase(get_vhost, Config) -> + mnesia:start(), + mnesia:create_table(rabbit_runtime_parameters, [ + {attributes, record_info(fields, runtime_parameters)}, + {record_name, runtime_parameters}]), + Config; +init_per_testcase(_, Config) -> Config. +end_per_testcase(get_vhost, Config) -> + mnesia:stop(), + Config; +end_per_testcase(_, Config) -> Config. + +ignore_colons(B) -> application:set_env(rabbitmq_mqtt, ignore_colons_in_username, B). + +ignores_colons_in_username_if_option_set(_Config) -> + ignore_colons(true), + ?assertEqual({rabbit_mqtt_util:env(vhost), <<"a:b:c">>}, + rabbit_mqtt_processor:get_vhost_username(<<"a:b:c">>)). + +interprets_colons_in_username_if_option_not_set(_Config) -> + ignore_colons(false), + ?assertEqual({<<"a:b">>, <<"c">>}, + rabbit_mqtt_processor:get_vhost_username(<<"a:b:c">>)). + +get_vhosts_from_global_runtime_parameter(_Config) -> + MappingParameter = [ + {<<"O=client,CN=dummy1">>, <<"vhost1">>}, + {<<"O=client,CN=dummy2">>, <<"vhost2">>} + ], + <<"vhost1">> = rabbit_mqtt_processor:get_vhost_from_user_mapping(<<"O=client,CN=dummy1">>, MappingParameter), + <<"vhost2">> = rabbit_mqtt_processor:get_vhost_from_user_mapping(<<"O=client,CN=dummy2">>, MappingParameter), + undefined = rabbit_mqtt_processor:get_vhost_from_user_mapping(<<"O=client,CN=dummy3">>, MappingParameter), + undefined = rabbit_mqtt_processor:get_vhost_from_user_mapping(<<"O=client,CN=dummy3">>, not_found). + +get_vhost(_Config) -> + clear_vhost_global_parameters(), + + %% not a certificate user, no cert/vhost mapping, no vhost in user + %% should use default vhost + {_, {<<"/">>, <<"guest">>}} = rabbit_mqtt_processor:get_vhost(<<"guest">>, none, 1883), + {_, {<<"/">>, <<"guest">>}} = rabbit_mqtt_processor:get_vhost(<<"guest">>, undefined, 1883), + clear_vhost_global_parameters(), + + %% not a certificate user, no cert/vhost mapping, vhost in user + %% should use vhost in user + {_, {<<"somevhost">>, <<"guest">>}} = rabbit_mqtt_processor:get_vhost(<<"somevhost:guest">>, none, 1883), + clear_vhost_global_parameters(), + + %% certificate user, no cert/vhost mapping + %% should use default vhost + {_, {<<"/">>, <<"guest">>}} = rabbit_mqtt_processor:get_vhost(<<"guest">>, <<"O=client,CN=dummy">>, 1883), + clear_vhost_global_parameters(), + + %% certificate user, cert/vhost mapping with global runtime parameter + %% should use mapping + set_global_parameter(mqtt_default_vhosts, [ + {<<"O=client,CN=dummy">>, <<"somevhost">>}, + {<<"O=client,CN=otheruser">>, <<"othervhost">>} + ]), + {_, {<<"somevhost">>, <<"guest">>}} = rabbit_mqtt_processor:get_vhost(<<"guest">>, <<"O=client,CN=dummy">>, 1883), + clear_vhost_global_parameters(), + + %% certificate user, cert/vhost mapping with global runtime parameter, but no key for the user + %% should use default vhost + set_global_parameter(mqtt_default_vhosts, [{<<"O=client,CN=otheruser">>, <<"somevhost">>}]), + {_, {<<"/">>, <<"guest">>}} = rabbit_mqtt_processor:get_vhost(<<"guest">>, <<"O=client,CN=dummy">>, 1883), + clear_vhost_global_parameters(), + + %% not a certificate user, port/vhost mapping + %% should use mapping + set_global_parameter(mqtt_port_to_vhost_mapping, [ + {<<"1883">>, <<"somevhost">>}, + {<<"1884">>, <<"othervhost">>} + ]), + {_, {<<"somevhost">>, <<"guest">>}} = rabbit_mqtt_processor:get_vhost(<<"guest">>, none, 1883), + clear_vhost_global_parameters(), + + %% not a certificate user, port/vhost mapping, but vhost in username + %% vhost in username should take precedence + set_global_parameter(mqtt_port_to_vhost_mapping, [ + {<<"1883">>, <<"somevhost">>}, + {<<"1884">>, <<"othervhost">>} + ]), + {_, {<<"vhostinusername">>, <<"guest">>}} = rabbit_mqtt_processor:get_vhost(<<"vhostinusername:guest">>, none, 1883), + clear_vhost_global_parameters(), + + %% not a certificate user, port/vhost mapping, but no mapping for this port + %% should use default vhost + set_global_parameter(mqtt_port_to_vhost_mapping, [ + {<<"1884">>, <<"othervhost">>} + ]), + {_, {<<"/">>, <<"guest">>}} = rabbit_mqtt_processor:get_vhost(<<"guest">>, none, 1883), + clear_vhost_global_parameters(), + + %% certificate user, port/vhost parameter, mapping, no cert/vhost mapping + %% should use port/vhost mapping + set_global_parameter(mqtt_port_to_vhost_mapping, [ + {<<"1883">>, <<"somevhost">>}, + {<<"1884">>, <<"othervhost">>} + ]), + {_, {<<"somevhost">>, <<"guest">>}} = rabbit_mqtt_processor:get_vhost(<<"guest">>, <<"O=client,CN=dummy">>, 1883), + clear_vhost_global_parameters(), + + %% certificate user, port/vhost parameter but no mapping, cert/vhost mapping + %% should use cert/vhost mapping + set_global_parameter(mqtt_default_vhosts, [ + {<<"O=client,CN=dummy">>, <<"somevhost">>}, + {<<"O=client,CN=otheruser">>, <<"othervhost">>} + ]), + set_global_parameter(mqtt_port_to_vhost_mapping, [ + {<<"1884">>, <<"othervhost">>} + ]), + {_, {<<"somevhost">>, <<"guest">>}} = rabbit_mqtt_processor:get_vhost(<<"guest">>, <<"O=client,CN=dummy">>, 1883), + clear_vhost_global_parameters(), + + %% certificate user, port/vhost parameter, cert/vhost parameter + %% cert/vhost parameter takes precedence + set_global_parameter(mqtt_default_vhosts, [ + {<<"O=client,CN=dummy">>, <<"cert-somevhost">>}, + {<<"O=client,CN=otheruser">>, <<"othervhost">>} + ]), + set_global_parameter(mqtt_port_to_vhost_mapping, [ + {<<"1883">>, <<"port-vhost">>}, + {<<"1884">>, <<"othervhost">>} + ]), + {_, {<<"cert-somevhost">>, <<"guest">>}} = rabbit_mqtt_processor:get_vhost(<<"guest">>, <<"O=client,CN=dummy">>, 1883), + clear_vhost_global_parameters(), + + %% certificate user, no port/vhost or cert/vhost mapping, vhost in username + %% should use vhost in username + {_, {<<"vhostinusername">>, <<"guest">>}} = rabbit_mqtt_processor:get_vhost(<<"vhostinusername:guest">>, <<"O=client,CN=dummy">>, 1883), + + %% not a certificate user, port/vhost parameter, cert/vhost parameter + %% port/vhost mapping is used, as cert/vhost should not be used + set_global_parameter(mqtt_default_vhosts, [ + {<<"O=cert">>, <<"cert-somevhost">>}, + {<<"O=client,CN=otheruser">>, <<"othervhost">>} + ]), + set_global_parameter(mqtt_port_to_vhost_mapping, [ + {<<"1883">>, <<"port-vhost">>}, + {<<"1884">>, <<"othervhost">>} + ]), + {_, {<<"port-vhost">>, <<"guest">>}} = rabbit_mqtt_processor:get_vhost(<<"guest">>, none, 1883), + clear_vhost_global_parameters(), + ok. + +add_client_id_to_adapter_info(_Config) -> + TestFun = fun(AdapterInfo) -> + Info0 = rabbit_mqtt_processor:add_client_id_to_adapter_info(<<"my-client-id">>, AdapterInfo), + AdditionalInfo0 = Info0#amqp_adapter_info.additional_info, + ?assertEqual(#{<<"client_id">> => <<"my-client-id">>}, proplists:get_value(variable_map, AdditionalInfo0)), + ClientProperties = proplists:get_value(client_properties, AdditionalInfo0), + ?assertEqual([{client_id,longstr,<<"my-client-id">>}], ClientProperties) + end, + lists:foreach(TestFun, [#amqp_adapter_info{}, #amqp_adapter_info{additional_info = [{client_properties, []}]}]), + ok. + +set_global_parameter(Key, Term) -> + InsertParameterFun = fun () -> + mnesia:write(rabbit_runtime_parameters, #runtime_parameters{key = Key, value = Term}, write) + end, + + {atomic, ok} = mnesia:transaction(InsertParameterFun). + +clear_vhost_global_parameters() -> + DeleteParameterFun = fun () -> + ok = mnesia:delete(rabbit_runtime_parameters, mqtt_default_vhosts, write), + ok = mnesia:delete(rabbit_runtime_parameters, mqtt_port_to_vhost_mapping, write) + end, + {atomic, ok} = mnesia:transaction(DeleteParameterFun). diff --git a/deps/rabbitmq_mqtt/test/proxy_protocol_SUITE.erl b/deps/rabbitmq_mqtt/test/proxy_protocol_SUITE.erl new file mode 100644 index 0000000000..5403de23d3 --- /dev/null +++ b/deps/rabbitmq_mqtt/test/proxy_protocol_SUITE.erl @@ -0,0 +1,125 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% +-module(proxy_protocol_SUITE). +-compile([export_all]). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-define(TIMEOUT, 5000). + +all() -> + [ + {group, non_parallel_tests} + ]. + +groups() -> + [ + {non_parallel_tests, [], [ + proxy_protocol, + proxy_protocol_tls + ]} + ]. + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"), + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodename_suffix, Suffix}, + {rmq_certspwd, "bunnychow"}, + {rabbitmq_ct_tls_verify, verify_none} + ]), + MqttConfig = mqtt_config(), + rabbit_ct_helpers:run_setup_steps(Config1, + [ fun(Conf) -> merge_app_env(MqttConfig, Conf) end ] ++ + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +mqtt_config() -> + {rabbitmq_mqtt, [ + {proxy_protocol, true}, + {ssl_cert_login, true}, + {allow_anonymous, true}]}. + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_group(_, Config) -> Config. +end_per_group(_, Config) -> Config. + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase). + +end_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +proxy_protocol(Config) -> + Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt), + {ok, Socket} = gen_tcp:connect({127,0,0,1}, Port, + [binary, {active, false}, {packet, raw}]), + ok = inet:send(Socket, "PROXY TCP4 192.168.1.1 192.168.1.2 80 81\r\n"), + ok = inet:send(Socket, mqtt_3_1_1_connect_frame()), + {ok, _Packet} = gen_tcp:recv(Socket, 0, ?TIMEOUT), + ConnectionName = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, connection_name, []), + match = re:run(ConnectionName, <<"^192.168.1.1:80 ">>, [{capture, none}]), + gen_tcp:close(Socket), + ok. + +proxy_protocol_tls(Config) -> + app_utils:start_applications([asn1, crypto, public_key, ssl]), + Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt_tls), + {ok, Socket} = gen_tcp:connect({127,0,0,1}, Port, + [binary, {active, false}, {packet, raw}]), + ok = inet:send(Socket, "PROXY TCP4 192.168.1.1 192.168.1.2 80 81\r\n"), + {ok, SslSocket} = ssl:connect(Socket, [], ?TIMEOUT), + ok = ssl:send(SslSocket, mqtt_3_1_1_connect_frame()), + {ok, _Packet} = ssl:recv(SslSocket, 0, ?TIMEOUT), + ConnectionName = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, connection_name, []), + match = re:run(ConnectionName, <<"^192.168.1.1:80 ">>, [{capture, none}]), + gen_tcp:close(Socket), + ok. + +connection_name() -> + Connections = ets:tab2list(connection_created), + {_Key, Values} = lists:nth(1, Connections), + {_, Name} = lists:keyfind(name, 1, Values), + Name. + +merge_app_env(MqttConfig, Config) -> + rabbit_ct_helpers:merge_app_env(Config, MqttConfig). + +mqtt_3_1_1_connect_frame() -> + <<16, + 24, + 0, + 4, + 77, + 81, + 84, + 84, + 4, + 2, + 0, + 60, + 0, + 12, + 84, + 101, + 115, + 116, + 67, + 111, + 110, + 115, + 117, + 109, + 101, + 114>>. diff --git a/deps/rabbitmq_mqtt/test/rabbit_auth_backend_mqtt_mock.erl b/deps/rabbitmq_mqtt/test/rabbit_auth_backend_mqtt_mock.erl new file mode 100644 index 0000000000..5272138c6b --- /dev/null +++ b/deps/rabbitmq_mqtt/test/rabbit_auth_backend_mqtt_mock.erl @@ -0,0 +1,45 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2019-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +%% A mock authn/authz that records information during calls. For testing purposes only. + +-module(rabbit_auth_backend_mqtt_mock). +-include_lib("rabbit_common/include/rabbit.hrl"). + +-behaviour(rabbit_authn_backend). +-behaviour(rabbit_authz_backend). + +-export([user_login_authentication/2, user_login_authorization/2, + check_vhost_access/3, check_resource_access/4, check_topic_access/4, + state_can_expire/0, + get/1]). + +user_login_authentication(_, AuthProps) -> + ets:new(?MODULE, [set, public, named_table]), + ets:insert(?MODULE, {authentication, AuthProps}), + {ok, #auth_user{username = <<"dummy">>, + tags = [], + impl = none}}. + +user_login_authorization(_, _) -> + io:format("login authorization"), + {ok, does_not_matter}. + +check_vhost_access(#auth_user{}, _VHostPath, AuthzData) -> + ets:insert(?MODULE, {vhost_access, AuthzData}), + true. +check_resource_access(#auth_user{}, #resource{}, _Permission, AuthzContext) -> + ets:insert(?MODULE, {resource_access, AuthzContext}), + true. +check_topic_access(#auth_user{}, #resource{}, _Permission, TopicContext) -> + ets:insert(?MODULE, {topic_access, TopicContext}), + true. + +state_can_expire() -> false. + +get(K) -> + ets:lookup(?MODULE, K). diff --git a/deps/rabbitmq_mqtt/test/rabbitmq_mqtt.app b/deps/rabbitmq_mqtt/test/rabbitmq_mqtt.app new file mode 100644 index 0000000000..c4083ec5fc --- /dev/null +++ b/deps/rabbitmq_mqtt/test/rabbitmq_mqtt.app @@ -0,0 +1,19 @@ +{application, rabbitmq_mqtt, + [{description, "RabbitMQ MQTT Adapter"}, + {vsn, "%%VSN%%"}, + {modules, []}, + {registered, []}, + {mod, {rabbit_mqtt, []}}, + {env, [{default_user, "guest_user"}, + {default_pass, "guest_pass"}, + {ssl_cert_login,false}, + {allow_anonymous, true}, + {vhost, "/"}, + {exchange, "amq.topic"}, + {subscription_ttl, 1800000}, % 30 min + {prefetch, 10}, + {ssl_listeners, []}, + {tcp_listeners, [1883]}, + {tcp_listen_options, [{backlog, 128}, + {nodelay, true}]}]}, + {applications, [kernel, stdlib, rabbit, amqp_client]}]}. diff --git a/deps/rabbitmq_mqtt/test/reader_SUITE.erl b/deps/rabbitmq_mqtt/test/reader_SUITE.erl new file mode 100644 index 0000000000..b94fdb5920 --- /dev/null +++ b/deps/rabbitmq_mqtt/test/reader_SUITE.erl @@ -0,0 +1,166 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% +-module(reader_SUITE). +-compile([export_all]). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +all() -> + [ + {group, non_parallel_tests} + ]. + +groups() -> + [ + {non_parallel_tests, [], [ + block, + handle_invalid_frames, + stats + ]} + ]. + +suite() -> + [{timetrap, {seconds, 60}}]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +merge_app_env(Config) -> + rabbit_ct_helpers:merge_app_env(Config, + {rabbit, [ + {collect_statistics, basic}, + {collect_statistics_interval, 100} + ]}). + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodename_suffix, ?MODULE}, + {rmq_extra_tcp_ports, [tcp_port_mqtt_extra, + tcp_port_mqtt_tls_extra]} + ]), + rabbit_ct_helpers:run_setup_steps(Config1, + [ fun merge_app_env/1 ] ++ + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_group(_, Config) -> + Config. + +end_per_group(_, Config) -> + Config. + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase). + +end_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_finished(Config, Testcase). + + +%% ------------------------------------------------------------------- +%% Testsuite cases +%% ------------------------------------------------------------------- + +block(Config) -> + P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt), + {ok, C} = emqttc:start_link([{host, "localhost"}, + {port, P}, + {client_id, <<"simpleClient">>}, + {proto_ver, 3}, + {logger, info}, + {puback_timeout, 1}]), + %% Only here to ensure the connection is really up + emqttc:subscribe(C, <<"TopicA">>, qos0), + emqttc:publish(C, <<"TopicA">>, <<"Payload">>), + expect_publishes(<<"TopicA">>, [<<"Payload">>]), + emqttc:unsubscribe(C, [<<"TopicA">>]), + + emqttc:subscribe(C, <<"Topic1">>, qos0), + + %% Not blocked + {ok, _} = emqttc:sync_publish(C, <<"Topic1">>, <<"Not blocked yet">>, + [{qos, 1}]), + + ok = rpc(Config, vm_memory_monitor, set_vm_memory_high_watermark, [0.00000001]), + ok = rpc(Config, rabbit_alarm, set_alarm, [{{resource_limit, memory, node()}, []}]), + + %% Let it block + timer:sleep(100), + %% Blocked, but still will publish + {error, ack_timeout} = emqttc:sync_publish(C, <<"Topic1">>, <<"Now blocked">>, + [{qos, 1}]), + + %% Blocked + {error, ack_timeout} = emqttc:sync_publish(C, <<"Topic1">>, + <<"Blocked">>, [{qos, 1}]), + + rpc(Config, vm_memory_monitor, set_vm_memory_high_watermark, [0.4]), + rpc(Config, rabbit_alarm, clear_alarm, [{resource_limit, memory, node()}]), + + %% Let alarms clear + timer:sleep(1000), + + expect_publishes(<<"Topic1">>, [<<"Not blocked yet">>, + <<"Now blocked">>, + <<"Blocked">>]), + + emqttc:disconnect(C). + +handle_invalid_frames(Config) -> + N = rpc(Config, ets, info, [connection_metrics, size]), + P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt), + {ok, C} = gen_tcp:connect("localhost", P, []), + Bin = <<"GET / HTTP/1.1\r\nHost: www.rabbitmq.com\r\nUser-Agent: curl/7.43.0\r\nAccept: */*">>, + gen_tcp:send(C, Bin), + gen_tcp:close(C), + %% No new stats entries should be inserted as connection never got to initialize + N = rpc(Config, ets, info, [connection_metrics, size]). + +stats(Config) -> + P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt), + %% CMN = rpc(Config, ets, info, [connection_metrics, size]), + %% CCMN = rpc(Config, ets, info, [connection_coarse_metrics, size]), + {ok, C} = emqttc:start_link([{host, "localhost"}, + {port, P}, + {client_id, <<"simpleClient">>}, + {proto_ver, 3}, + {logger, info}, + {puback_timeout, 1}]), + %% Ensure that there are some stats + emqttc:subscribe(C, <<"TopicA">>, qos0), + emqttc:publish(C, <<"TopicA">>, <<"Payload">>), + expect_publishes(<<"TopicA">>, [<<"Payload">>]), + emqttc:unsubscribe(C, [<<"TopicA">>]), + timer:sleep(1000), %% Wait for stats to be emitted, which it does every 100ms + %% Retrieve the connection Pid + [{_, Reader}] = rpc(Config, rabbit_mqtt_collector, list, []), + [{_, Pid}] = rpc(Config, rabbit_mqtt_reader, info, [Reader, [connection]]), + %% Verify the content of the metrics, garbage_collection must be present + [{Pid, Props}] = rpc(Config, ets, lookup, [connection_metrics, Pid]), + true = proplists:is_defined(garbage_collection, Props), + %% If the coarse entry is present, stats were successfully emitted + [{Pid, _, _, _, _}] = rpc(Config, ets, lookup, + [connection_coarse_metrics, Pid]), + emqttc:disconnect(C). + +expect_publishes(_Topic, []) -> ok; +expect_publishes(Topic, [Payload|Rest]) -> + receive + {publish, Topic, Payload} -> expect_publishes(Topic, Rest) + after 5000 -> + throw({publish_not_delivered, Payload}) + end. + +rpc(Config, M, F, A) -> + rabbit_ct_broker_helpers:rpc(Config, 0, M, F, A). diff --git a/deps/rabbitmq_mqtt/test/retainer_SUITE.erl b/deps/rabbitmq_mqtt/test/retainer_SUITE.erl new file mode 100644 index 0000000000..22b72a8d87 --- /dev/null +++ b/deps/rabbitmq_mqtt/test/retainer_SUITE.erl @@ -0,0 +1,144 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% +-module(retainer_SUITE). +-compile([export_all]). + +-include_lib("common_test/include/ct.hrl"). + +all() -> + [ + {group, non_parallel_tests} + ]. + +groups() -> + [ + {non_parallel_tests, [], [ + coerce_configuration_data, + should_translate_amqp2mqtt_on_publish, + should_translate_amqp2mqtt_on_retention, + should_translate_amqp2mqtt_on_retention_search + ]} + ]. + +suite() -> + [{timetrap, {seconds, 600}}]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodename_suffix, ?MODULE}, + {rmq_extra_tcp_ports, [tcp_port_mqtt_extra, + tcp_port_mqtt_tls_extra]} + ]), + % see https://github.com/rabbitmq/rabbitmq-mqtt/issues/86 + RabbitConfig = {rabbit, [ + {default_user, "guest"}, + {default_pass, "guest"}, + {default_vhost, "/"}, + {default_permissions, [".*", ".*", ".*"]} + ]}, + rabbit_ct_helpers:run_setup_steps(Config1, + [ fun(Conf) -> merge_app_env(RabbitConfig, Conf) end ] ++ + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +merge_app_env(MqttConfig, Config) -> + rabbit_ct_helpers:merge_app_env(Config, MqttConfig). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_group(_, Config) -> + Config. + +end_per_group(_, Config) -> + Config. + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase). + +end_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_finished(Config, Testcase). + + +%% ------------------------------------------------------------------- +%% Testsuite cases +%% ------------------------------------------------------------------- + +coerce_configuration_data(Config) -> + P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt), + {ok, C} = emqttc:start_link(connection_opts(P)), + + emqttc:subscribe(C, <<"TopicA">>, qos0), + emqttc:publish(C, <<"TopicA">>, <<"Payload">>), + expect_publishes(<<"TopicA">>, [<<"Payload">>]), + + emqttc:disconnect(C), + ok. + +%% ------------------------------------------------------------------- +%% When a client is subscribed to TopicA/Device.Field and another +%% client publishes to TopicA/Device.Field the client should be +%% sent messages for the translated topic (TopicA/Device/Field) +%% ------------------------------------------------------------------- +should_translate_amqp2mqtt_on_publish(Config) -> + P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt), + {ok, C} = emqttc:start_link(connection_opts(P)), + %% there's an active consumer + emqttc:subscribe(C, <<"TopicA/Device.Field">>, qos1), + emqttc:publish(C, <<"TopicA/Device.Field">>, <<"Payload">>, [{retain, true}]), + expect_publishes(<<"TopicA/Device/Field">>, [<<"Payload">>]), + emqttc:disconnect(C). + +%% ------------------------------------------------------------------- +%% If a client is publishes a retained message to TopicA/Device.Field and another +%% client subscribes to TopicA/Device.Field the client should be +%% sent the retained message for the translated topic (TopicA/Device/Field) +%% ------------------------------------------------------------------- +should_translate_amqp2mqtt_on_retention(Config) -> + P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt), + {ok, C} = emqttc:start_link(connection_opts(P)), + %% publish with retain = true before a consumer comes around + emqttc:publish(C, <<"TopicA/Device.Field">>, <<"Payload">>, [{retain, true}]), + emqttc:subscribe(C, <<"TopicA/Device.Field">>, qos1), + expect_publishes(<<"TopicA/Device/Field">>, [<<"Payload">>]), + emqttc:disconnect(C). + +%% ------------------------------------------------------------------- +%% If a client is publishes a retained message to TopicA/Device.Field and another +%% client subscribes to TopicA/Device/Field the client should be +%% sent retained message for the translated topic (TopicA/Device/Field) +%% ------------------------------------------------------------------- +should_translate_amqp2mqtt_on_retention_search(Config) -> + P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt), + {ok, C} = emqttc:start_link(connection_opts(P)), + emqttc:publish(C, <<"TopicA/Device.Field">>, <<"Payload">>, [{retain, true}]), + emqttc:subscribe(C, <<"TopicA/Device/Field">>, qos1), + expect_publishes(<<"TopicA/Device/Field">>, [<<"Payload">>]), + emqttc:disconnect(C). + +connection_opts(Port) -> + [{host, "localhost"}, + {port, Port}, + {client_id, <<"simpleClientRetainer">>}, + {proto_ver,3}, + {logger, info}, + {puback_timeout, 1}]. + + expect_publishes(_Topic, []) -> ok; + expect_publishes(Topic, [Payload | Rest]) -> + receive + {publish, Topic, Payload} -> expect_publishes(Topic, Rest) + after 1500 -> + throw({publish_not_delivered, Payload}) + end. diff --git a/deps/rabbitmq_mqtt/test/util_SUITE.erl b/deps/rabbitmq_mqtt/test/util_SUITE.erl new file mode 100644 index 0000000000..6694498595 --- /dev/null +++ b/deps/rabbitmq_mqtt/test/util_SUITE.erl @@ -0,0 +1,80 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. + +-module(util_SUITE). +-compile([export_all]). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +all() -> + [ + {group, util_tests} + ]. + +groups() -> + [ + {util_tests, [parallel], [ + coerce_exchange, + coerce_vhost, + coerce_default_user, + coerce_default_pass, + mqtt_amqp_topic_translation + ] + } + ]. + +suite() -> + [{timetrap, {seconds, 60}}]. + +init_per_suite(Config) -> + ok = application:load(rabbitmq_mqtt), + Config. +end_per_suite(Config) -> + ok = application:unload(rabbitmq_mqtt), + Config. +init_per_group(_, Config) -> Config. +end_per_group(_, Config) -> Config. +init_per_testcase(_, Config) -> Config. +end_per_testcase(_, Config) -> Config. + +coerce_exchange(_) -> + ?assertEqual(<<"amq.topic">>, rabbit_mqtt_util:env(exchange)). + +coerce_vhost(_) -> + ?assertEqual(<<"/">>, rabbit_mqtt_util:env(vhost)). + +coerce_default_user(_) -> + ?assertEqual(<<"guest_user">>, rabbit_mqtt_util:env(default_user)). + +coerce_default_pass(_) -> + ?assertEqual(<<"guest_pass">>, rabbit_mqtt_util:env(default_pass)). + +mqtt_amqp_topic_translation(_) -> + ok = application:set_env(rabbitmq_mqtt, sparkplug, true), + {ok, {mqtt2amqp_fun, Mqtt2AmqpFun}, {amqp2mqtt_fun, Amqp2MqttFun}} = + rabbit_mqtt_util:get_topic_translation_funs(), + + T0 = "/foo/bar/+/baz", + T0_As_Amqp = <<".foo.bar.*.baz">>, + T0_As_Mqtt = <<"/foo/bar/+/baz">>, + ?assertEqual(T0_As_Amqp, Mqtt2AmqpFun(T0)), + ?assertEqual(T0_As_Mqtt, Amqp2MqttFun(T0_As_Amqp)), + + T1 = "spAv1.0/foo/bar/+/baz", + T1_As_Amqp = <<"spAv1___0.foo.bar.*.baz">>, + T1_As_Mqtt = <<"spAv1.0/foo/bar/+/baz">>, + ?assertEqual(T1_As_Amqp, Mqtt2AmqpFun(T1)), + ?assertEqual(T1_As_Mqtt, Amqp2MqttFun(T1_As_Amqp)), + + T2 = "spBv2.90/foo/bar/+/baz", + T2_As_Amqp = <<"spBv2___90.foo.bar.*.baz">>, + T2_As_Mqtt = <<"spBv2.90/foo/bar/+/baz">>, + ?assertEqual(T2_As_Amqp, Mqtt2AmqpFun(T2)), + ?assertEqual(T2_As_Mqtt, Amqp2MqttFun(T2_As_Amqp)), + + ok = application:unset_env(rabbitmq_mqtt, sparkplug), + ok. |