diff options
Diffstat (limited to 'openstackclient/tests/common')
| -rw-r--r-- | openstackclient/tests/common/test_clientmanager.py | 367 |
1 files changed, 27 insertions, 340 deletions
diff --git a/openstackclient/tests/common/test_clientmanager.py b/openstackclient/tests/common/test_clientmanager.py index 117c7184..625e175a 100644 --- a/openstackclient/tests/common/test_clientmanager.py +++ b/openstackclient/tests/common/test_clientmanager.py @@ -13,111 +13,34 @@ # under the License. # -import json as jsonutils -import mock +import copy -from keystoneauth1.access import service_catalog -from keystoneauth1.identity import v2 as auth_v2 from keystoneauth1 import token_endpoint -from osc_lib.api import auth -from osc_lib import exceptions as exc -from requests_mock.contrib import fixture +from osc_lib.tests import utils as osc_lib_test_utils from openstackclient.common import clientmanager from openstackclient.tests import fakes -from openstackclient.tests import utils -API_VERSION = {"identity": "2.0"} -AUTH_REF = {'version': 'v2.0'} -AUTH_REF.update(fakes.TEST_RESPONSE_DICT['access']) -SERVICE_CATALOG = service_catalog.ServiceCatalogV2(AUTH_REF) +class TestClientManager(osc_lib_test_utils.TestClientManager): - -# This is deferred in api.auth but we need it here... -auth.get_options_list() - - -class Container(object): - attr = clientmanager.ClientCache(lambda x: object()) - buggy_attr = clientmanager.ClientCache(lambda x: x.foo) - - def __init__(self): - pass - - -class FakeOptions(object): - - def __init__(self, **kwargs): - for option in auth.OPTIONS_LIST: - setattr(self, option.replace('-', '_'), None) - self.auth_type = None - self.identity_api_version = '2.0' - self.timing = None - self.region_name = None - self.interface = None - self.url = None - self.auth = {} - self.cert = None - self.key = None - self.default_domain = 'default' - self.__dict__.update(kwargs) - - -class TestClientCache(utils.TestCase): - - def test_singleton(self): - # NOTE(dtroyer): Verify that the ClientCache descriptor only invokes - # the factory one time and always returns the same value after that. - c = Container() - self.assertEqual(c.attr, c.attr) - - def test_attribute_error_propagates(self): - c = Container() - err = self.assertRaises(exc.PluginAttributeError, - getattr, c, 'buggy_attr') - self.assertNotIsInstance(err, AttributeError) - self.assertEqual("'Container' object has no attribute 'foo'", str(err)) - - -class TestClientManager(utils.TestCase): - - def setUp(self): - super(TestClientManager, self).setUp() - self.mock = mock.Mock() - self.requests = self.useFixture(fixture.Fixture()) - # fake v2password token retrieval - self.stub_auth(json=fakes.TEST_RESPONSE_DICT) - # fake token and token_endpoint retrieval - self.stub_auth(json=fakes.TEST_RESPONSE_DICT, - url='/'.join([fakes.AUTH_URL, 'v2.0/tokens'])) - # fake v3password token retrieval - self.stub_auth(json=fakes.TEST_RESPONSE_DICT_V3, - url='/'.join([fakes.AUTH_URL, 'auth/tokens'])) - # fake password version endpoint discovery - self.stub_auth(json=fakes.TEST_VERSIONS, - url=fakes.AUTH_URL, - verb='GET') + def _clientmanager_class(self): + """Allow subclasses to override the ClientManager class""" + return clientmanager.ClientManager def test_client_manager_token_endpoint(self): - - client_manager = clientmanager.ClientManager( - cli_options=FakeOptions( - auth_type='token_endpoint', - auth=dict( - token=fakes.AUTH_TOKEN, - url=fakes.AUTH_URL, - ), - ), - api_version=API_VERSION, - verify=True + token_auth = { + 'url': fakes.AUTH_URL, + 'token': fakes.AUTH_TOKEN, + } + client_manager = self._make_clientmanager( + auth_args=token_auth, + auth_plugin_name='token_endpoint', ) - client_manager.setup_auth() - client_manager.auth_ref self.assertEqual( fakes.AUTH_URL, - client_manager._url, + client_manager._cli_options.config['auth']['url'], ) self.assertEqual( fakes.AUTH_TOKEN, @@ -127,256 +50,20 @@ class TestClientManager(utils.TestCase): client_manager.auth, token_endpoint.Token, ) - self.assertFalse(client_manager._insecure) - self.assertTrue(client_manager._verify) - self.assertTrue(client_manager.is_network_endpoint_enabled()) - - def test_client_manager_token(self): - - client_manager = clientmanager.ClientManager( - cli_options=FakeOptions( - auth=dict( - token=fakes.AUTH_TOKEN, - auth_url=fakes.AUTH_URL, - ), - auth_type='v2token', - interface=fakes.INTERFACE, - region_name=fakes.REGION_NAME, - ), - api_version=API_VERSION, - verify=True - ) - client_manager.setup_auth() - client_manager.auth_ref - - self.assertEqual( - fakes.AUTH_URL, - client_manager._auth_url, - ) - self.assertIsInstance( - client_manager.auth, - auth_v2.Token, - ) - self.assertEqual( - fakes.INTERFACE, - client_manager._interface, - ) - self.assertEqual( - fakes.REGION_NAME, - client_manager._region_name, - ) - self.assertFalse(client_manager._insecure) - self.assertTrue(client_manager._verify) - self.assertTrue(client_manager.is_network_endpoint_enabled()) - - def test_client_manager_password(self): - - client_manager = clientmanager.ClientManager( - cli_options=FakeOptions( - auth=dict( - auth_url=fakes.AUTH_URL, - username=fakes.USERNAME, - password=fakes.PASSWORD, - project_name=fakes.PROJECT_NAME, - ), - ), - api_version=API_VERSION, - verify=False, - ) - client_manager.setup_auth() - client_manager.auth_ref - - self.assertEqual( - fakes.AUTH_URL, - client_manager._auth_url, - ) - self.assertEqual( - fakes.USERNAME, - client_manager._username, - ) - self.assertEqual( - fakes.PASSWORD, - client_manager._password, - ) - self.assertIsInstance( - client_manager.auth, - auth_v2.Password, - ) - self.assertTrue(client_manager._insecure) - self.assertFalse(client_manager._verify) - # These need to stick around until the old-style clients are gone - self.assertEqual( - AUTH_REF.pop('version'), - client_manager.auth_ref.version, - ) - self.assertEqual( - fakes.to_unicode_dict(AUTH_REF), - client_manager.auth_ref._data['access'], - ) - self.assertEqual( - dir(SERVICE_CATALOG), - dir(client_manager.auth_ref.service_catalog), - ) self.assertTrue(client_manager.is_network_endpoint_enabled()) def test_client_manager_network_endpoint_disabled(self): - - client_manager = clientmanager.ClientManager( - cli_options=FakeOptions( - auth=dict( - auth_url=fakes.AUTH_URL, - username=fakes.USERNAME, - password=fakes.PASSWORD, - project_name=fakes.PROJECT_NAME, - ), - auth_type='v3password', - ), - api_version={"identity": "3"}, - verify=False, - ) - client_manager.setup_auth() - client_manager.auth_ref - - # v3 fake doesn't have network endpoint. + auth_args = copy.deepcopy(self.default_password_auth) + auth_args.update({ + 'user_domain_name': 'default', + 'project_domain_name': 'default', + }) + # v3 fake doesn't have network endpoint + client_manager = self._make_clientmanager( + auth_args=auth_args, + identity_api_version='3', + auth_plugin_name='v3password', + ) + + self.assertFalse(client_manager.is_service_available('network')) self.assertFalse(client_manager.is_network_endpoint_enabled()) - - def stub_auth(self, json=None, url=None, verb=None, **kwargs): - subject_token = fakes.AUTH_TOKEN - base_url = fakes.AUTH_URL - if json: - text = jsonutils.dumps(json) - headers = {'X-Subject-Token': subject_token, - 'Content-Type': 'application/json'} - if not url: - url = '/'.join([base_url, 'tokens']) - url = url.replace("/?", "?") - if not verb: - verb = 'POST' - self.requests.register_uri(verb, - url, - headers=headers, - text=text) - - def test_client_manager_password_verify_ca(self): - - client_manager = clientmanager.ClientManager( - cli_options=FakeOptions( - auth=dict( - auth_url=fakes.AUTH_URL, - username=fakes.USERNAME, - password=fakes.PASSWORD, - project_name=fakes.PROJECT_NAME, - ), - auth_type='v2password', - ), - api_version=API_VERSION, - verify='cafile', - ) - client_manager.setup_auth() - client_manager.auth_ref - - self.assertFalse(client_manager._insecure) - self.assertTrue(client_manager._verify) - self.assertEqual('cafile', client_manager._cacert) - self.assertTrue(client_manager.is_network_endpoint_enabled()) - - def test_client_manager_password_no_cert(self): - client_manager = clientmanager.ClientManager( - cli_options=FakeOptions()) - self.assertIsNone(client_manager._cert) - - def test_client_manager_password_client_cert(self): - client_manager = clientmanager.ClientManager( - cli_options=FakeOptions(cert='cert')) - self.assertEqual('cert', client_manager._cert) - - def test_client_manager_password_client_cert_and_key(self): - client_manager = clientmanager.ClientManager( - cli_options=FakeOptions(cert='cert', key='key')) - self.assertEqual(('cert', 'key'), client_manager._cert) - - def _select_auth_plugin(self, auth_params, api_version, auth_plugin_name): - auth_params['auth_type'] = auth_plugin_name - auth_params['identity_api_version'] = api_version - - client_manager = clientmanager.ClientManager( - cli_options=FakeOptions(**auth_params), - api_version={"identity": api_version}, - verify=True - ) - client_manager.setup_auth() - client_manager.auth_ref - - self.assertEqual( - auth_plugin_name, - client_manager.auth_plugin_name, - ) - - def test_client_manager_select_auth_plugin(self): - # test token auth - params = dict( - auth=dict( - auth_url=fakes.AUTH_URL, - token=fakes.AUTH_TOKEN, - ), - ) - self._select_auth_plugin(params, '2.0', 'v2token') - self._select_auth_plugin(params, '3', 'v3token') - self._select_auth_plugin(params, 'XXX', 'token') - # test token/endpoint auth - params = dict( - auth_plugin='token_endpoint', - auth=dict( - url='test', - token=fakes.AUTH_TOKEN, - ), - ) - self._select_auth_plugin(params, 'XXX', 'token_endpoint') - # test password auth - params = dict( - auth=dict( - auth_url=fakes.AUTH_URL, - username=fakes.USERNAME, - password=fakes.PASSWORD, - project_name=fakes.PROJECT_NAME, - ), - ) - self._select_auth_plugin(params, '2.0', 'v2password') - self._select_auth_plugin(params, '3', 'v3password') - self._select_auth_plugin(params, 'XXX', 'password') - - def test_client_manager_select_auth_plugin_failure(self): - client_manager = clientmanager.ClientManager( - cli_options=FakeOptions(os_auth_plugin=''), - api_version=API_VERSION, - verify=True, - ) - self.assertRaises( - exc.CommandError, - client_manager.setup_auth, - ) - - @mock.patch('osc_lib.api.auth.check_valid_authentication_options') - def test_client_manager_auth_setup_once(self, check_authn_options_func): - client_manager = clientmanager.ClientManager( - cli_options=FakeOptions( - auth=dict( - auth_url=fakes.AUTH_URL, - username=fakes.USERNAME, - password=fakes.PASSWORD, - project_name=fakes.PROJECT_NAME, - ), - ), - api_version=API_VERSION, - verify=False, - ) - self.assertFalse(client_manager._auth_setup_completed) - client_manager.setup_auth() - self.assertTrue(check_authn_options_func.called) - self.assertTrue(client_manager._auth_setup_completed) - - # now make sure we don't do auth setup the second time around - # by checking whether check_valid_auth_options() gets called again - check_authn_options_func.reset_mock() - client_manager.auth_ref - check_authn_options_func.assert_not_called() |
