summaryrefslogtreecommitdiff
path: root/reddwarfclient/tests/test_auth.py
diff options
context:
space:
mode:
Diffstat (limited to 'reddwarfclient/tests/test_auth.py')
-rw-r--r--reddwarfclient/tests/test_auth.py414
1 files changed, 0 insertions, 414 deletions
diff --git a/reddwarfclient/tests/test_auth.py b/reddwarfclient/tests/test_auth.py
deleted file mode 100644
index f1c4b59..0000000
--- a/reddwarfclient/tests/test_auth.py
+++ /dev/null
@@ -1,414 +0,0 @@
-import contextlib
-
-from testtools import TestCase
-from reddwarfclient import auth
-from mock import Mock
-
-from reddwarfclient import exceptions
-
-"""
-Unit tests for the classes and functions in auth.py.
-"""
-
-
-def check_url_none(test_case, auth_class):
- # url is None, it must throw exception
- authObj = auth_class(url=None, type=auth_class, client=None,
- username=None, password=None, tenant=None)
- try:
- authObj.authenticate()
- test_case.fail("AuthUrlNotGiven exception expected")
- except exceptions.AuthUrlNotGiven:
- pass
-
-
-class AuthenticatorTest(TestCase):
-
- def setUp(self):
- super(AuthenticatorTest, self).setUp()
- self.orig_load = auth.ServiceCatalog._load
- self.orig__init = auth.ServiceCatalog.__init__
-
- def tearDown(self):
- super(AuthenticatorTest, self).tearDown()
- auth.ServiceCatalog._load = self.orig_load
- auth.ServiceCatalog.__init__ = self.orig__init
-
- def test_get_authenticator_cls(self):
- class_list = (auth.KeyStoneV2Authenticator,
- auth.RaxAuthenticator,
- auth.Auth1_1,
- auth.FakeAuth)
-
- for c in class_list:
- self.assertEqual(c, auth.get_authenticator_cls(c))
-
- class_names = {"keystone": auth.KeyStoneV2Authenticator,
- "rax": auth.RaxAuthenticator,
- "auth1.1": auth.Auth1_1,
- "fake": auth.FakeAuth}
-
- for cn in class_names.keys():
- self.assertEqual(class_names[cn], auth.get_authenticator_cls(cn))
-
- cls_or_name = "_unknown_"
- self.assertRaises(ValueError, auth.get_authenticator_cls, cls_or_name)
-
- def test__authenticate(self):
- authObj = auth.Authenticator(Mock(), auth.KeyStoneV2Authenticator,
- Mock(), Mock(), Mock(), Mock())
- # test response code 200
- resp = Mock()
- resp.status = 200
- body = "test_body"
-
- auth.ServiceCatalog._load = Mock(return_value=1)
- authObj.client._time_request = Mock(return_value=(resp, body))
-
- sc = authObj._authenticate(Mock(), Mock())
- self.assertEqual(body, sc.catalog)
-
- # test AmbiguousEndpoints exception
- auth.ServiceCatalog.__init__ = \
- Mock(side_effect=exceptions.AmbiguousEndpoints)
- self.assertRaises(exceptions.AmbiguousEndpoints,
- authObj._authenticate, Mock(), Mock())
-
- # test handling KeyError and raising AuthorizationFailure exception
- auth.ServiceCatalog.__init__ = Mock(side_effect=KeyError)
- self.assertRaises(exceptions.AuthorizationFailure,
- authObj._authenticate, Mock(), Mock())
-
- # test EndpointNotFound exception
- mock = Mock(side_effect=exceptions.EndpointNotFound)
- auth.ServiceCatalog.__init__ = mock
- self.assertRaises(exceptions.EndpointNotFound,
- authObj._authenticate, Mock(), Mock())
- mock.side_effect = None
-
- # test response code 305
- resp.__getitem__ = Mock(return_value='loc')
- resp.status = 305
- body = "test_body"
- authObj.client._time_request = Mock(return_value=(resp, body))
-
- l = authObj._authenticate(Mock(), Mock())
- self.assertEqual('loc', l)
-
- # test any response code other than 200 and 305
- resp.status = 404
- exceptions.from_response = Mock(side_effect=ValueError)
- self.assertRaises(ValueError, authObj._authenticate, Mock(), Mock())
-
- def test_authenticate(self):
- authObj = auth.Authenticator(Mock(), auth.KeyStoneV2Authenticator,
- Mock(), Mock(), Mock(), Mock())
- self.assertRaises(NotImplementedError, authObj.authenticate)
-
-
-class KeyStoneV2AuthenticatorTest(TestCase):
-
- def test_authenticate(self):
- # url is None
- check_url_none(self, auth.KeyStoneV2Authenticator)
-
- # url is not None, so it must not throw exception
- url = "test_url"
- cls_type = auth.KeyStoneV2Authenticator
- authObj = auth.KeyStoneV2Authenticator(url=url, type=cls_type,
- client=None, username=None,
- password=None, tenant=None)
-
- def side_effect_func(url):
- return url
-
- mock = Mock()
- mock.side_effect = side_effect_func
- authObj._v2_auth = mock
- r = authObj.authenticate()
- self.assertEqual(url, r)
-
- def test__v2_auth(self):
- username = "reddwarf_user"
- password = "reddwarf_password"
- tenant = "tenant"
- cls_type = auth.KeyStoneV2Authenticator
- authObj = auth.KeyStoneV2Authenticator(url=None, type=cls_type,
- client=None,
- username=username,
- password=password,
- tenant=tenant)
-
- def side_effect_func(url, body):
- return body
- mock = Mock()
- mock.side_effect = side_effect_func
- authObj._authenticate = mock
- body = authObj._v2_auth(Mock())
- self.assertEqual(username,
- body['auth']['passwordCredentials']['username'])
- self.assertEqual(password,
- body['auth']['passwordCredentials']['password'])
- self.assertEqual(tenant, body['auth']['tenantName'])
-
-
-class Auth1_1Test(TestCase):
-
- def test_authenticate(self):
- # handle when url is None
- check_url_none(self, auth.Auth1_1)
-
- # url is not none
- username = "reddwarf_user"
- password = "reddwarf_password"
- url = "test_url"
- authObj = auth.Auth1_1(url=url,
- type=auth.Auth1_1,
- client=None, username=username,
- password=password, tenant=None)
-
- def side_effect_func(auth_url, body, root_key):
- return auth_url, body, root_key
-
- mock = Mock()
- mock.side_effect = side_effect_func
- authObj._authenticate = mock
- auth_url, body, root_key = authObj.authenticate()
-
- self.assertEqual(username, body['credentials']['username'])
- self.assertEqual(password, body['credentials']['key'])
- self.assertEqual(auth_url, url)
- self.assertEqual('auth', root_key)
-
-
-class RaxAuthenticatorTest(TestCase):
-
- def test_authenticate(self):
- # url is None
- check_url_none(self, auth.RaxAuthenticator)
-
- # url is not None, so it must not throw exception
- url = "test_url"
- authObj = auth.RaxAuthenticator(url=url,
- type=auth.RaxAuthenticator,
- client=None, username=None,
- password=None, tenant=None)
-
- def side_effect_func(url):
- return url
-
- mock = Mock()
- mock.side_effect = side_effect_func
- authObj._rax_auth = mock
- r = authObj.authenticate()
- self.assertEqual(url, r)
-
- def test__rax_auth(self):
- username = "reddwarf_user"
- password = "reddwarf_password"
- tenant = "tenant"
- authObj = auth.RaxAuthenticator(url=None,
- type=auth.RaxAuthenticator,
- client=None, username=username,
- password=password, tenant=tenant)
-
- def side_effect_func(url, body):
- return body
-
- mock = Mock()
- mock.side_effect = side_effect_func
- authObj._authenticate = mock
- body = authObj._rax_auth(Mock())
-
- v = body['auth']['RAX-KSKEY:apiKeyCredentials']['username']
- self.assertEqual(username, v)
-
- v = body['auth']['RAX-KSKEY:apiKeyCredentials']['apiKey']
- self.assertEqual(password, v)
-
- v = body['auth']['RAX-KSKEY:apiKeyCredentials']['tenantName']
- self.assertEqual(tenant, v)
-
-
-class FakeAuthTest(TestCase):
-
- def test_authenticate(self):
- tenant = "tenant"
- authObj = auth.FakeAuth(url=None,
- type=auth.FakeAuth,
- client=None, username=None,
- password=None, tenant=tenant)
-
- fc = authObj.authenticate()
- public_url = "%s/%s" % ('http://localhost:8779/v1.0', tenant)
- self.assertEqual(public_url, fc.get_public_url())
- self.assertEqual(tenant, fc.get_token())
-
-
-class ServiceCatalogTest(TestCase):
-
- def setUp(self):
- super(ServiceCatalogTest, self).setUp()
- self.orig_url_for = auth.ServiceCatalog._url_for
- self.orig__init__ = auth.ServiceCatalog.__init__
- auth.ServiceCatalog.__init__ = Mock(return_value=None)
- self.test_url = "http://localhost:1234/test"
-
- def tearDown(self):
- super(ServiceCatalogTest, self).tearDown()
- auth.ServiceCatalog._url_for = self.orig_url_for
- auth.ServiceCatalog.__init__ = self.orig__init__
-
- def test__load(self):
- url = "random_url"
- auth.ServiceCatalog._url_for = Mock(return_value=url)
-
- # when service_url is None
- scObj = auth.ServiceCatalog()
- scObj.region = None
- scObj.service_url = None
- scObj._load()
- self.assertEqual(url, scObj.public_url)
- self.assertEqual(url, scObj.management_url)
-
- # service url is not None
- service_url = "service_url"
- scObj = auth.ServiceCatalog()
- scObj.region = None
- scObj.service_url = service_url
- scObj._load()
- self.assertEqual(service_url, scObj.public_url)
- self.assertEqual(service_url, scObj.management_url)
-
- def test_get_token(self):
- test_id = "test_id"
- scObj = auth.ServiceCatalog()
- scObj.root_key = "root_key"
- scObj.catalog = dict()
- scObj.catalog[scObj.root_key] = dict()
- scObj.catalog[scObj.root_key]['token'] = dict()
- scObj.catalog[scObj.root_key]['token']['id'] = test_id
- self.assertEqual(test_id, scObj.get_token())
-
- def test_get_management_url(self):
- test_mng_url = "test_management_url"
- scObj = auth.ServiceCatalog()
- scObj.management_url = test_mng_url
- self.assertEqual(test_mng_url, scObj.get_management_url())
-
- def test_get_public_url(self):
- test_public_url = "test_public_url"
- scObj = auth.ServiceCatalog()
- scObj.public_url = test_public_url
- self.assertEqual(test_public_url, scObj.get_public_url())
-
- def test__url_for(self):
- scObj = auth.ServiceCatalog()
-
- # case for no endpoint found
- self.case_no_endpoint_match(scObj)
-
- # case for empty service catalog
- self.case_endpoing_with_empty_catalog(scObj)
-
- # more than one matching endpoints
- self.case_ambiguous_endpoint(scObj)
-
- # happy case
- self.case_unique_endpoint(scObj)
-
- # testing if-statements in for-loop to iterate services in catalog
- self.case_iterating_services_in_catalog(scObj)
-
- def case_no_endpoint_match(self, scObj):
- # empty endpoint list
- scObj.catalog = dict()
- scObj.catalog['endpoints'] = list()
- self.assertRaises(exceptions.EndpointNotFound, scObj._url_for)
-
- def side_effect_func_ep(attr):
- return "test_attr_value"
-
- # simulating dict
- endpoint = Mock()
- mock = Mock()
- mock.side_effect = side_effect_func_ep
- endpoint.__getitem__ = mock
- scObj.catalog['endpoints'].append(endpoint)
-
- # not-empty list but not matching endpoint
- filter_value = "not_matching_value"
- self.assertRaises(exceptions.EndpointNotFound, scObj._url_for,
- attr="test_attr", filter_value=filter_value)
-
- filter_value = "test_attr_value" # so that we have an endpoint match
- scObj.root_key = "access"
- scObj.catalog[scObj.root_key] = dict()
- self.assertRaises(exceptions.EndpointNotFound, scObj._url_for,
- attr="test_attr", filter_value=filter_value)
-
- def case_endpoing_with_empty_catalog(self, scObj):
- # first, test with empty catalog, this should pass since
- # there is already enpoint added
- scObj.catalog[scObj.root_key]['serviceCatalog'] = list()
-
- endpoint = scObj.catalog['endpoints'][0]
- endpoint.get = Mock(return_value=self.test_url)
- r_url = scObj._url_for(attr="test_attr",
- filter_value="test_attr_value")
- self.assertEqual(self.test_url, r_url)
-
- def case_ambiguous_endpoint(self, scObj):
- scObj.service_type = "reddwarf"
- scObj.service_name = "test_service_name"
-
- def side_effect_func_service(key):
- if key == "type":
- return "reddwarf"
- elif key == "name":
- return "test_service_name"
- return None
-
- mock1 = Mock()
- mock1.side_effect = side_effect_func_service
- service1 = Mock()
- service1.get = mock1
-
- endpoint2 = {"test_attr": "test_attr_value"}
- service1.__getitem__ = Mock(return_value=[endpoint2])
- scObj.catalog[scObj.root_key]['serviceCatalog'] = [service1]
- self.assertRaises(exceptions.AmbiguousEndpoints, scObj._url_for,
- attr="test_attr", filter_value="test_attr_value")
-
- def case_unique_endpoint(self, scObj):
- # changing the endpoint2 attribute to pass the filter
- service1 = scObj.catalog[scObj.root_key]['serviceCatalog'][0]
- endpoint2 = service1[0][0]
- endpoint2["test_attr"] = "new value not matching filter"
- r_url = scObj._url_for(attr="test_attr",
- filter_value="test_attr_value")
- self.assertEqual(self.test_url, r_url)
-
- def case_iterating_services_in_catalog(self, scObj):
- service1 = scObj.catalog[scObj.root_key]['serviceCatalog'][0]
-
- scObj.catalog = dict()
- scObj.root_key = "access"
- scObj.catalog[scObj.root_key] = dict()
- scObj.service_type = "no_match"
-
- scObj.catalog[scObj.root_key]['serviceCatalog'] = [service1]
- self.assertRaises(exceptions.EndpointNotFound, scObj._url_for)
-
- scObj.service_type = "database"
- scObj.service_name = "no_match"
- self.assertRaises(exceptions.EndpointNotFound, scObj._url_for)
-
- # no endpoints and no 'serviceCatalog' in catalog => raise exception
- scObj = auth.ServiceCatalog()
- scObj.catalog = dict()
- scObj.root_key = "access"
- scObj.catalog[scObj.root_key] = dict()
- scObj.catalog[scObj.root_key]['serviceCatalog'] = []
- self.assertRaises(exceptions.EndpointNotFound, scObj._url_for,
- attr="test_attr", filter_value="test_attr_value")