diff options
| author | Steve Martinelli <stevemar@ca.ibm.com> | 2014-02-06 05:53:24 +0000 |
|---|---|---|
| committer | Steve Martinelli <stevemar@ca.ibm.com> | 2015-02-08 20:49:43 -0500 |
| commit | cf23fd5cf6671770566e3280ff66ef8c5522cb4b (patch) | |
| tree | 00b89f2d0bc23d3a685ad34f0b1bdb9ce39a9d5b /openstackclient | |
| parent | b02cce567855399794e654f45eb2619eee8cc357 (diff) | |
| download | python-openstackclient-cf23fd5cf6671770566e3280ff66ef8c5522cb4b.tar.gz | |
Implement trust in identity v3 api
Added new module in identity v3 api to handle create, read, and delete
operations of trust resources.
Co-Authored-By: Lance Bragstad <lbragstad@gmail.com>
Co-Authored-By: Steve Martinelli <stevemar@ca.ibm.com>
Closes-Bug: #1413718
Change-Id: I2b360b141ff70d4f396466abede859a3db6644f4
Diffstat (limited to 'openstackclient')
| -rw-r--r-- | openstackclient/identity/v3/trust.py | 228 | ||||
| -rw-r--r-- | openstackclient/tests/identity/v3/fakes.py | 20 | ||||
| -rw-r--r-- | openstackclient/tests/identity/v3/test_trust.py | 229 |
3 files changed, 476 insertions, 1 deletions
diff --git a/openstackclient/identity/v3/trust.py b/openstackclient/identity/v3/trust.py new file mode 100644 index 00000000..e67b02e7 --- /dev/null +++ b/openstackclient/identity/v3/trust.py @@ -0,0 +1,228 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +"""Identity v3 Trust action implementations""" + +import datetime +import logging +import six + +from cliff import command +from cliff import lister +from cliff import show + +from openstackclient.common import utils +from openstackclient.identity import common + + +class CreateTrust(show.ShowOne): + """Create new trust""" + + log = logging.getLogger(__name__ + '.CreateTrust') + + def get_parser(self, prog_name): + parser = super(CreateTrust, self).get_parser(prog_name) + parser.add_argument( + 'trustor', + metavar='<trustor-user>', + help='User that is delegating authorization (name or ID)', + ) + parser.add_argument( + 'trustee', + metavar='<trustee-user>', + help='User that is assuming authorization (name or ID)', + ) + parser.add_argument( + '--project', + metavar='<project>', + required=True, + help='Project being delegated (name or ID) (required)', + ) + parser.add_argument( + '--role', + metavar='<role>', + action='append', + default=[], + help='Roles to authorize (name or ID) ' + '(repeat to set multiple values) (required)', + required=True + ) + parser.add_argument( + '--impersonate', + dest='impersonate', + action='store_true', + default=False, + help='Tokens generated from the trust will represent <trustor>' + ' (defaults to False)', + ) + parser.add_argument( + '--expiration', + metavar='<expiration>', + help='Sets an expiration date for the trust' + ' (format of YYYY-mm-ddTHH:MM:SS)', + ) + parser.add_argument( + '--project-domain', + metavar='<domain>', + help='Domain that contains <project> (name or ID)', + ) + parser.add_argument( + '--trustor-domain', + metavar='<domain>', + help='Domain that contains <trustor> (name or ID)', + ) + parser.add_argument( + '--trustee-domain', + metavar='<domain>', + help='Domain that contains <trustee> (name or ID)', + ) + return parser + + def take_action(self, parsed_args): + self.log.debug('take_action(%s)' % parsed_args) + identity_client = self.app.client_manager.identity + + if parsed_args.project_domain: + project_domain = common.find_domain(identity_client, + parsed_args.project_domain).id + else: + project_domain = None + + if parsed_args.trustor_domain: + trustor_domain = common.find_domain(identity_client, + parsed_args.trustor_domain).id + else: + trustor_domain = None + + if parsed_args.trustee_domain: + trustee_domain = common.find_domain(identity_client, + parsed_args.trustee_domain).id + else: + trustee_domain = None + + # NOTE(stevemar): Find the two users, project and roles that + # are necessary for making a trust usable, the API dictates that + # trustee, project and role are optional, but that makes the trust + # pointless, and trusts are immutable, so let's enforce it at the + # client level. + trustor_id = utils.find_resource(identity_client.users, + parsed_args.trustor, + domain_id=trustor_domain).id + trustee_id = utils.find_resource(identity_client.users, + parsed_args.trustee, + domain_id=trustee_domain).id + project_id = utils.find_resource(identity_client.projects, + parsed_args.project, + domain_id=project_domain).id + + role_names = [] + for role in parsed_args.role: + role_name = utils.find_resource( + identity_client.roles, + role, + ).name + role_names.append(role_name) + + expires_at = None + if parsed_args.expiration: + expires_at = datetime.datetime.strptime(parsed_args.expiration, + '%Y-%m-%dT%H:%M:%S') + + trust = identity_client.trusts.create( + trustee_id, trustor_id, + impersonation=parsed_args.impersonate, + project=project_id, + role_names=role_names, + expires_at=expires_at, + ) + + trust._info.pop('roles_links', None) + trust._info.pop('links', None) + + # Format roles into something sensible + roles = trust._info.pop('roles') + msg = ''.join([r['name'] + ' ' for r in roles]) + trust._info['roles'] = msg + + return zip(*sorted(six.iteritems(trust._info))) + + +class DeleteTrust(command.Command): + """Delete trust(s)""" + + log = logging.getLogger(__name__ + '.DeleteTrust') + + def get_parser(self, prog_name): + parser = super(DeleteTrust, self).get_parser(prog_name) + parser.add_argument( + 'trust', + metavar='<trust>', + help='Trust(s) to delete', + nargs="+", + ) + return parser + + def take_action(self, parsed_args): + self.log.debug('take_action(%s)' % parsed_args) + identity_client = self.app.client_manager.identity + for t in parsed_args.trust: + trust_obj = utils.find_resource(identity_client.trusts, t) + identity_client.trusts.delete(trust_obj.id) + + +class ListTrust(lister.Lister): + """List trusts""" + + log = logging.getLogger(__name__ + '.ListTrust') + + def take_action(self, parsed_args): + self.log.debug('take_action(%s)' % parsed_args) + columns = ('ID', 'Expires At', 'Impersonation', 'Project ID', + 'Trustee User ID', 'Trustor User ID') + data = self.app.client_manager.identity.trusts.list() + return (columns, + (utils.get_item_properties( + s, columns, + formatters={}, + ) for s in data)) + + +class ShowTrust(show.ShowOne): + """Display trust details""" + + log = logging.getLogger(__name__ + '.ShowTrust') + + def get_parser(self, prog_name): + parser = super(ShowTrust, self).get_parser(prog_name) + parser.add_argument( + 'trust', + metavar='<trust>', + help='Trust to display', + ) + return parser + + def take_action(self, parsed_args): + self.log.debug('take_action(%s)' % parsed_args) + identity_client = self.app.client_manager.identity + trust = utils.find_resource(identity_client.trusts, + parsed_args.trust) + + trust._info.pop('roles_links', None) + trust._info.pop('links', None) + + # Format roles into something sensible + roles = trust._info.pop('roles') + msg = ''.join([r['name'] + ' ' for r in roles]) + trust._info['roles'] = msg + + return zip(*sorted(six.iteritems(trust._info))) diff --git a/openstackclient/tests/identity/v3/fakes.py b/openstackclient/tests/identity/v3/fakes.py index 68e67519..c868401a 100644 --- a/openstackclient/tests/identity/v3/fakes.py +++ b/openstackclient/tests/identity/v3/fakes.py @@ -181,7 +181,7 @@ ENDPOINT = { 'links': base_url + 'endpoints/' + endpoint_id, } -user_id = 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa' +user_id = 'bbbbbbb-aaaa-aaaa-aaaa-bbbbbbbaaaa' user_name = 'paul' user_description = 'Sir Paul' user_email = 'paul@applecorps.com' @@ -196,6 +196,22 @@ USER = { 'links': base_url + 'users/' + user_id, } +trust_id = 't-456' +trust_expires = None +trust_impersonation = False +trust_roles = {"id": role_id, "name": role_name}, + +TRUST = { + 'expires_at': trust_expires, + 'id': trust_id, + 'impersonation': trust_impersonation, + 'links': base_url + 'trusts/' + trust_id, + 'project_id': project_id, + 'roles': trust_roles, + 'trustee_user_id': user_id, + 'trustor_user_id': user_id, +} + token_expires = '2014-01-01T00:00:00Z' token_id = 'tttttttt-tttt-tttt-tttt-tttttttttttt' @@ -342,6 +358,8 @@ class FakeIdentityv3Client(object): self.session = mock.Mock() self.session.auth.auth_ref.service_catalog.resource_class = \ fakes.FakeResource(None, {}) + self.trusts = mock.Mock() + self.trusts.resource_class = fakes.FakeResource(None, {}) self.users = mock.Mock() self.users.resource_class = fakes.FakeResource(None, {}) self.role_assignments = mock.Mock() diff --git a/openstackclient/tests/identity/v3/test_trust.py b/openstackclient/tests/identity/v3/test_trust.py new file mode 100644 index 00000000..b3fbe7f0 --- /dev/null +++ b/openstackclient/tests/identity/v3/test_trust.py @@ -0,0 +1,229 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import copy + +from openstackclient.identity.v3 import trust +from openstackclient.tests import fakes +from openstackclient.tests.identity.v3 import fakes as identity_fakes + + +class TestTrust(identity_fakes.TestIdentityv3): + + def setUp(self): + super(TestTrust, self).setUp() + + self.trusts_mock = self.app.client_manager.identity.trusts + self.trusts_mock.reset_mock() + self.projects_mock = self.app.client_manager.identity.projects + self.projects_mock.reset_mock() + self.users_mock = self.app.client_manager.identity.users + self.users_mock.reset_mock() + self.roles_mock = self.app.client_manager.identity.roles + self.roles_mock.reset_mock() + + +class TestTrustCreate(TestTrust): + + def setUp(self): + super(TestTrustCreate, self).setUp() + + self.projects_mock.get.return_value = fakes.FakeResource( + None, + copy.deepcopy(identity_fakes.PROJECT), + loaded=True, + ) + + self.users_mock.get.return_value = fakes.FakeResource( + None, + copy.deepcopy(identity_fakes.USER), + loaded=True, + ) + + self.roles_mock.get.return_value = fakes.FakeResource( + None, + copy.deepcopy(identity_fakes.ROLE), + loaded=True, + ) + + self.trusts_mock.create.return_value = fakes.FakeResource( + None, + copy.deepcopy(identity_fakes.TRUST), + loaded=True, + ) + + # Get the command object to test + self.cmd = trust.CreateTrust(self.app, None) + + def test_trust_create_basic(self): + arglist = [ + '--project', identity_fakes.project_id, + '--role', identity_fakes.role_id, + identity_fakes.user_id, + identity_fakes.user_id + ] + verifylist = [ + ('project', identity_fakes.project_id), + ('impersonate', False), + ('role', [identity_fakes.role_id]), + ('trustor', identity_fakes.user_id), + ('trustee', identity_fakes.user_id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + # DisplayCommandBase.take_action() returns two tuples + columns, data = self.cmd.take_action(parsed_args) + + # Set expected values + kwargs = { + 'impersonation': False, + 'project': identity_fakes.project_id, + 'role_names': [identity_fakes.role_name], + 'expires_at': None, + } + # TrustManager.create(trustee_id, trustor_id, impersonation=, + # project=, role_names=, expires_at=) + self.trusts_mock.create.assert_called_with( + identity_fakes.user_id, + identity_fakes.user_id, + **kwargs + ) + + collist = ('expires_at', 'id', 'impersonation', 'project_id', + 'roles', 'trustee_user_id', 'trustor_user_id') + self.assertEqual(collist, columns) + datalist = ( + identity_fakes.trust_expires, + identity_fakes.trust_id, + identity_fakes.trust_impersonation, + identity_fakes.project_id, + identity_fakes.role_name + ' ', + identity_fakes.user_id, + identity_fakes.user_id + ) + self.assertEqual(datalist, data) + + +class TestTrustDelete(TestTrust): + + def setUp(self): + super(TestTrustDelete, self).setUp() + + # This is the return value for utils.find_resource() + self.trusts_mock.get.return_value = fakes.FakeResource( + None, + copy.deepcopy(identity_fakes.TRUST), + loaded=True, + ) + self.trusts_mock.delete.return_value = None + + # Get the command object to test + self.cmd = trust.DeleteTrust(self.app, None) + + def test_trust_delete(self): + arglist = [ + identity_fakes.trust_id, + ] + verifylist = [ + ('trust', [identity_fakes.trust_id]) + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + self.cmd.take_action(parsed_args) + + self.trusts_mock.delete.assert_called_with( + identity_fakes.trust_id, + ) + + +class TestTrustList(TestTrust): + + def setUp(self): + super(TestTrustList, self).setUp() + + self.trusts_mock.list.return_value = [ + fakes.FakeResource( + None, + copy.deepcopy(identity_fakes.TRUST), + loaded=True, + ), + ] + + # Get the command object to test + self.cmd = trust.ListTrust(self.app, None) + + def test_trust_list_no_options(self): + arglist = [] + verifylist = [] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + # DisplayCommandBase.take_action() returns two tuples + columns, data = self.cmd.take_action(parsed_args) + + self.trusts_mock.list.assert_called_with() + + collist = ('ID', 'Expires At', 'Impersonation', 'Project ID', + 'Trustee User ID', 'Trustor User ID') + self.assertEqual(collist, columns) + datalist = (( + identity_fakes.trust_id, + identity_fakes.trust_expires, + identity_fakes.trust_impersonation, + identity_fakes.project_id, + identity_fakes.user_id, + identity_fakes.user_id + ), ) + self.assertEqual(datalist, tuple(data)) + + +class TestTrustShow(TestTrust): + + def setUp(self): + super(TestTrustShow, self).setUp() + + self.trusts_mock.get.return_value = fakes.FakeResource( + None, + copy.deepcopy(identity_fakes.TRUST), + loaded=True, + ) + + # Get the command object to test + self.cmd = trust.ShowTrust(self.app, None) + + def test_trust_show(self): + arglist = [ + identity_fakes.trust_id, + ] + verifylist = [ + ('trust', identity_fakes.trust_id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + # DisplayCommandBase.take_action() returns two tuples + columns, data = self.cmd.take_action(parsed_args) + + self.trusts_mock.get.assert_called_with(identity_fakes.trust_id) + + collist = ('expires_at', 'id', 'impersonation', 'project_id', + 'roles', 'trustee_user_id', 'trustor_user_id') + self.assertEqual(collist, columns) + datalist = ( + identity_fakes.trust_expires, + identity_fakes.trust_id, + identity_fakes.trust_impersonation, + identity_fakes.project_id, + identity_fakes.role_name + ' ', + identity_fakes.user_id, + identity_fakes.user_id + ) + self.assertEqual(datalist, data) |
