summaryrefslogtreecommitdiff
path: root/openstackclient
diff options
context:
space:
mode:
authorSteve Martinelli <stevemar@ca.ibm.com>2014-02-06 05:53:24 +0000
committerSteve Martinelli <stevemar@ca.ibm.com>2015-02-08 20:49:43 -0500
commitcf23fd5cf6671770566e3280ff66ef8c5522cb4b (patch)
tree00b89f2d0bc23d3a685ad34f0b1bdb9ce39a9d5b /openstackclient
parentb02cce567855399794e654f45eb2619eee8cc357 (diff)
downloadpython-openstackclient-cf23fd5cf6671770566e3280ff66ef8c5522cb4b.tar.gz
Implement trust in identity v3 api
Added new module in identity v3 api to handle create, read, and delete operations of trust resources. Co-Authored-By: Lance Bragstad <lbragstad@gmail.com> Co-Authored-By: Steve Martinelli <stevemar@ca.ibm.com> Closes-Bug: #1413718 Change-Id: I2b360b141ff70d4f396466abede859a3db6644f4
Diffstat (limited to 'openstackclient')
-rw-r--r--openstackclient/identity/v3/trust.py228
-rw-r--r--openstackclient/tests/identity/v3/fakes.py20
-rw-r--r--openstackclient/tests/identity/v3/test_trust.py229
3 files changed, 476 insertions, 1 deletions
diff --git a/openstackclient/identity/v3/trust.py b/openstackclient/identity/v3/trust.py
new file mode 100644
index 00000000..e67b02e7
--- /dev/null
+++ b/openstackclient/identity/v3/trust.py
@@ -0,0 +1,228 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+"""Identity v3 Trust action implementations"""
+
+import datetime
+import logging
+import six
+
+from cliff import command
+from cliff import lister
+from cliff import show
+
+from openstackclient.common import utils
+from openstackclient.identity import common
+
+
+class CreateTrust(show.ShowOne):
+ """Create new trust"""
+
+ log = logging.getLogger(__name__ + '.CreateTrust')
+
+ def get_parser(self, prog_name):
+ parser = super(CreateTrust, self).get_parser(prog_name)
+ parser.add_argument(
+ 'trustor',
+ metavar='<trustor-user>',
+ help='User that is delegating authorization (name or ID)',
+ )
+ parser.add_argument(
+ 'trustee',
+ metavar='<trustee-user>',
+ help='User that is assuming authorization (name or ID)',
+ )
+ parser.add_argument(
+ '--project',
+ metavar='<project>',
+ required=True,
+ help='Project being delegated (name or ID) (required)',
+ )
+ parser.add_argument(
+ '--role',
+ metavar='<role>',
+ action='append',
+ default=[],
+ help='Roles to authorize (name or ID) '
+ '(repeat to set multiple values) (required)',
+ required=True
+ )
+ parser.add_argument(
+ '--impersonate',
+ dest='impersonate',
+ action='store_true',
+ default=False,
+ help='Tokens generated from the trust will represent <trustor>'
+ ' (defaults to False)',
+ )
+ parser.add_argument(
+ '--expiration',
+ metavar='<expiration>',
+ help='Sets an expiration date for the trust'
+ ' (format of YYYY-mm-ddTHH:MM:SS)',
+ )
+ parser.add_argument(
+ '--project-domain',
+ metavar='<domain>',
+ help='Domain that contains <project> (name or ID)',
+ )
+ parser.add_argument(
+ '--trustor-domain',
+ metavar='<domain>',
+ help='Domain that contains <trustor> (name or ID)',
+ )
+ parser.add_argument(
+ '--trustee-domain',
+ metavar='<domain>',
+ help='Domain that contains <trustee> (name or ID)',
+ )
+ return parser
+
+ def take_action(self, parsed_args):
+ self.log.debug('take_action(%s)' % parsed_args)
+ identity_client = self.app.client_manager.identity
+
+ if parsed_args.project_domain:
+ project_domain = common.find_domain(identity_client,
+ parsed_args.project_domain).id
+ else:
+ project_domain = None
+
+ if parsed_args.trustor_domain:
+ trustor_domain = common.find_domain(identity_client,
+ parsed_args.trustor_domain).id
+ else:
+ trustor_domain = None
+
+ if parsed_args.trustee_domain:
+ trustee_domain = common.find_domain(identity_client,
+ parsed_args.trustee_domain).id
+ else:
+ trustee_domain = None
+
+ # NOTE(stevemar): Find the two users, project and roles that
+ # are necessary for making a trust usable, the API dictates that
+ # trustee, project and role are optional, but that makes the trust
+ # pointless, and trusts are immutable, so let's enforce it at the
+ # client level.
+ trustor_id = utils.find_resource(identity_client.users,
+ parsed_args.trustor,
+ domain_id=trustor_domain).id
+ trustee_id = utils.find_resource(identity_client.users,
+ parsed_args.trustee,
+ domain_id=trustee_domain).id
+ project_id = utils.find_resource(identity_client.projects,
+ parsed_args.project,
+ domain_id=project_domain).id
+
+ role_names = []
+ for role in parsed_args.role:
+ role_name = utils.find_resource(
+ identity_client.roles,
+ role,
+ ).name
+ role_names.append(role_name)
+
+ expires_at = None
+ if parsed_args.expiration:
+ expires_at = datetime.datetime.strptime(parsed_args.expiration,
+ '%Y-%m-%dT%H:%M:%S')
+
+ trust = identity_client.trusts.create(
+ trustee_id, trustor_id,
+ impersonation=parsed_args.impersonate,
+ project=project_id,
+ role_names=role_names,
+ expires_at=expires_at,
+ )
+
+ trust._info.pop('roles_links', None)
+ trust._info.pop('links', None)
+
+ # Format roles into something sensible
+ roles = trust._info.pop('roles')
+ msg = ''.join([r['name'] + ' ' for r in roles])
+ trust._info['roles'] = msg
+
+ return zip(*sorted(six.iteritems(trust._info)))
+
+
+class DeleteTrust(command.Command):
+ """Delete trust(s)"""
+
+ log = logging.getLogger(__name__ + '.DeleteTrust')
+
+ def get_parser(self, prog_name):
+ parser = super(DeleteTrust, self).get_parser(prog_name)
+ parser.add_argument(
+ 'trust',
+ metavar='<trust>',
+ help='Trust(s) to delete',
+ nargs="+",
+ )
+ return parser
+
+ def take_action(self, parsed_args):
+ self.log.debug('take_action(%s)' % parsed_args)
+ identity_client = self.app.client_manager.identity
+ for t in parsed_args.trust:
+ trust_obj = utils.find_resource(identity_client.trusts, t)
+ identity_client.trusts.delete(trust_obj.id)
+
+
+class ListTrust(lister.Lister):
+ """List trusts"""
+
+ log = logging.getLogger(__name__ + '.ListTrust')
+
+ def take_action(self, parsed_args):
+ self.log.debug('take_action(%s)' % parsed_args)
+ columns = ('ID', 'Expires At', 'Impersonation', 'Project ID',
+ 'Trustee User ID', 'Trustor User ID')
+ data = self.app.client_manager.identity.trusts.list()
+ return (columns,
+ (utils.get_item_properties(
+ s, columns,
+ formatters={},
+ ) for s in data))
+
+
+class ShowTrust(show.ShowOne):
+ """Display trust details"""
+
+ log = logging.getLogger(__name__ + '.ShowTrust')
+
+ def get_parser(self, prog_name):
+ parser = super(ShowTrust, self).get_parser(prog_name)
+ parser.add_argument(
+ 'trust',
+ metavar='<trust>',
+ help='Trust to display',
+ )
+ return parser
+
+ def take_action(self, parsed_args):
+ self.log.debug('take_action(%s)' % parsed_args)
+ identity_client = self.app.client_manager.identity
+ trust = utils.find_resource(identity_client.trusts,
+ parsed_args.trust)
+
+ trust._info.pop('roles_links', None)
+ trust._info.pop('links', None)
+
+ # Format roles into something sensible
+ roles = trust._info.pop('roles')
+ msg = ''.join([r['name'] + ' ' for r in roles])
+ trust._info['roles'] = msg
+
+ return zip(*sorted(six.iteritems(trust._info)))
diff --git a/openstackclient/tests/identity/v3/fakes.py b/openstackclient/tests/identity/v3/fakes.py
index 68e67519..c868401a 100644
--- a/openstackclient/tests/identity/v3/fakes.py
+++ b/openstackclient/tests/identity/v3/fakes.py
@@ -181,7 +181,7 @@ ENDPOINT = {
'links': base_url + 'endpoints/' + endpoint_id,
}
-user_id = 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa'
+user_id = 'bbbbbbb-aaaa-aaaa-aaaa-bbbbbbbaaaa'
user_name = 'paul'
user_description = 'Sir Paul'
user_email = 'paul@applecorps.com'
@@ -196,6 +196,22 @@ USER = {
'links': base_url + 'users/' + user_id,
}
+trust_id = 't-456'
+trust_expires = None
+trust_impersonation = False
+trust_roles = {"id": role_id, "name": role_name},
+
+TRUST = {
+ 'expires_at': trust_expires,
+ 'id': trust_id,
+ 'impersonation': trust_impersonation,
+ 'links': base_url + 'trusts/' + trust_id,
+ 'project_id': project_id,
+ 'roles': trust_roles,
+ 'trustee_user_id': user_id,
+ 'trustor_user_id': user_id,
+}
+
token_expires = '2014-01-01T00:00:00Z'
token_id = 'tttttttt-tttt-tttt-tttt-tttttttttttt'
@@ -342,6 +358,8 @@ class FakeIdentityv3Client(object):
self.session = mock.Mock()
self.session.auth.auth_ref.service_catalog.resource_class = \
fakes.FakeResource(None, {})
+ self.trusts = mock.Mock()
+ self.trusts.resource_class = fakes.FakeResource(None, {})
self.users = mock.Mock()
self.users.resource_class = fakes.FakeResource(None, {})
self.role_assignments = mock.Mock()
diff --git a/openstackclient/tests/identity/v3/test_trust.py b/openstackclient/tests/identity/v3/test_trust.py
new file mode 100644
index 00000000..b3fbe7f0
--- /dev/null
+++ b/openstackclient/tests/identity/v3/test_trust.py
@@ -0,0 +1,229 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+import copy
+
+from openstackclient.identity.v3 import trust
+from openstackclient.tests import fakes
+from openstackclient.tests.identity.v3 import fakes as identity_fakes
+
+
+class TestTrust(identity_fakes.TestIdentityv3):
+
+ def setUp(self):
+ super(TestTrust, self).setUp()
+
+ self.trusts_mock = self.app.client_manager.identity.trusts
+ self.trusts_mock.reset_mock()
+ self.projects_mock = self.app.client_manager.identity.projects
+ self.projects_mock.reset_mock()
+ self.users_mock = self.app.client_manager.identity.users
+ self.users_mock.reset_mock()
+ self.roles_mock = self.app.client_manager.identity.roles
+ self.roles_mock.reset_mock()
+
+
+class TestTrustCreate(TestTrust):
+
+ def setUp(self):
+ super(TestTrustCreate, self).setUp()
+
+ self.projects_mock.get.return_value = fakes.FakeResource(
+ None,
+ copy.deepcopy(identity_fakes.PROJECT),
+ loaded=True,
+ )
+
+ self.users_mock.get.return_value = fakes.FakeResource(
+ None,
+ copy.deepcopy(identity_fakes.USER),
+ loaded=True,
+ )
+
+ self.roles_mock.get.return_value = fakes.FakeResource(
+ None,
+ copy.deepcopy(identity_fakes.ROLE),
+ loaded=True,
+ )
+
+ self.trusts_mock.create.return_value = fakes.FakeResource(
+ None,
+ copy.deepcopy(identity_fakes.TRUST),
+ loaded=True,
+ )
+
+ # Get the command object to test
+ self.cmd = trust.CreateTrust(self.app, None)
+
+ def test_trust_create_basic(self):
+ arglist = [
+ '--project', identity_fakes.project_id,
+ '--role', identity_fakes.role_id,
+ identity_fakes.user_id,
+ identity_fakes.user_id
+ ]
+ verifylist = [
+ ('project', identity_fakes.project_id),
+ ('impersonate', False),
+ ('role', [identity_fakes.role_id]),
+ ('trustor', identity_fakes.user_id),
+ ('trustee', identity_fakes.user_id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ # DisplayCommandBase.take_action() returns two tuples
+ columns, data = self.cmd.take_action(parsed_args)
+
+ # Set expected values
+ kwargs = {
+ 'impersonation': False,
+ 'project': identity_fakes.project_id,
+ 'role_names': [identity_fakes.role_name],
+ 'expires_at': None,
+ }
+ # TrustManager.create(trustee_id, trustor_id, impersonation=,
+ # project=, role_names=, expires_at=)
+ self.trusts_mock.create.assert_called_with(
+ identity_fakes.user_id,
+ identity_fakes.user_id,
+ **kwargs
+ )
+
+ collist = ('expires_at', 'id', 'impersonation', 'project_id',
+ 'roles', 'trustee_user_id', 'trustor_user_id')
+ self.assertEqual(collist, columns)
+ datalist = (
+ identity_fakes.trust_expires,
+ identity_fakes.trust_id,
+ identity_fakes.trust_impersonation,
+ identity_fakes.project_id,
+ identity_fakes.role_name + ' ',
+ identity_fakes.user_id,
+ identity_fakes.user_id
+ )
+ self.assertEqual(datalist, data)
+
+
+class TestTrustDelete(TestTrust):
+
+ def setUp(self):
+ super(TestTrustDelete, self).setUp()
+
+ # This is the return value for utils.find_resource()
+ self.trusts_mock.get.return_value = fakes.FakeResource(
+ None,
+ copy.deepcopy(identity_fakes.TRUST),
+ loaded=True,
+ )
+ self.trusts_mock.delete.return_value = None
+
+ # Get the command object to test
+ self.cmd = trust.DeleteTrust(self.app, None)
+
+ def test_trust_delete(self):
+ arglist = [
+ identity_fakes.trust_id,
+ ]
+ verifylist = [
+ ('trust', [identity_fakes.trust_id])
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ self.cmd.take_action(parsed_args)
+
+ self.trusts_mock.delete.assert_called_with(
+ identity_fakes.trust_id,
+ )
+
+
+class TestTrustList(TestTrust):
+
+ def setUp(self):
+ super(TestTrustList, self).setUp()
+
+ self.trusts_mock.list.return_value = [
+ fakes.FakeResource(
+ None,
+ copy.deepcopy(identity_fakes.TRUST),
+ loaded=True,
+ ),
+ ]
+
+ # Get the command object to test
+ self.cmd = trust.ListTrust(self.app, None)
+
+ def test_trust_list_no_options(self):
+ arglist = []
+ verifylist = []
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ # DisplayCommandBase.take_action() returns two tuples
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.trusts_mock.list.assert_called_with()
+
+ collist = ('ID', 'Expires At', 'Impersonation', 'Project ID',
+ 'Trustee User ID', 'Trustor User ID')
+ self.assertEqual(collist, columns)
+ datalist = ((
+ identity_fakes.trust_id,
+ identity_fakes.trust_expires,
+ identity_fakes.trust_impersonation,
+ identity_fakes.project_id,
+ identity_fakes.user_id,
+ identity_fakes.user_id
+ ), )
+ self.assertEqual(datalist, tuple(data))
+
+
+class TestTrustShow(TestTrust):
+
+ def setUp(self):
+ super(TestTrustShow, self).setUp()
+
+ self.trusts_mock.get.return_value = fakes.FakeResource(
+ None,
+ copy.deepcopy(identity_fakes.TRUST),
+ loaded=True,
+ )
+
+ # Get the command object to test
+ self.cmd = trust.ShowTrust(self.app, None)
+
+ def test_trust_show(self):
+ arglist = [
+ identity_fakes.trust_id,
+ ]
+ verifylist = [
+ ('trust', identity_fakes.trust_id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ # DisplayCommandBase.take_action() returns two tuples
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.trusts_mock.get.assert_called_with(identity_fakes.trust_id)
+
+ collist = ('expires_at', 'id', 'impersonation', 'project_id',
+ 'roles', 'trustee_user_id', 'trustor_user_id')
+ self.assertEqual(collist, columns)
+ datalist = (
+ identity_fakes.trust_expires,
+ identity_fakes.trust_id,
+ identity_fakes.trust_impersonation,
+ identity_fakes.project_id,
+ identity_fakes.role_name + ' ',
+ identity_fakes.user_id,
+ identity_fakes.user_id
+ )
+ self.assertEqual(datalist, data)