summaryrefslogtreecommitdiff
path: root/openstackclient
diff options
context:
space:
mode:
authorLance Bragstad <lbragstad@gmail.com>2018-06-11 22:03:52 +0000
committerLance Bragstad <lbragstad@gmail.com>2018-07-23 21:18:50 +0000
commit641a4faac1d021518350925b4297677eeff98fde (patch)
tree55a2b551eff2dea6b9934827a90ccfd0265c6d2d /openstackclient
parent3493948d13aadd1a329b37eb8fafb731b1f5c6a7 (diff)
downloadpython-openstackclient-641a4faac1d021518350925b4297677eeff98fde.tar.gz
Implement support for registered limits
This commit adds support for users to manage registered limits via the command line. bp unified-limits Depends-On: https://review.openstack.org/#/c/574391/ Change-Id: Id8377363f7a3248b45aeeba21d2acc02684a0305
Diffstat (limited to 'openstackclient')
-rw-r--r--openstackclient/identity/v3/registered_limit.py260
-rw-r--r--openstackclient/tests/functional/identity/v3/common.py37
-rw-r--r--openstackclient/tests/functional/identity/v3/test_registered_limit.py184
-rw-r--r--openstackclient/tests/unit/identity/v3/fakes.py23
-rw-r--r--openstackclient/tests/unit/identity/v3/test_registered_limit.py510
5 files changed, 1014 insertions, 0 deletions
diff --git a/openstackclient/identity/v3/registered_limit.py b/openstackclient/identity/v3/registered_limit.py
new file mode 100644
index 00000000..72e07297
--- /dev/null
+++ b/openstackclient/identity/v3/registered_limit.py
@@ -0,0 +1,260 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+"""Registered limits action implementations."""
+
+import logging
+
+from osc_lib.command import command
+from osc_lib import exceptions
+from osc_lib import utils
+import six
+
+from openstackclient.i18n import _
+from openstackclient.identity import common as common_utils
+
+LOG = logging.getLogger(__name__)
+
+
+class CreateRegisteredLimit(command.ShowOne):
+ _description = _("Create a registered limit")
+
+ def get_parser(self, prog_name):
+ parser = super(CreateRegisteredLimit, self).get_parser(prog_name)
+ parser.add_argument(
+ '--description',
+ metavar='<description>',
+ help=_('Description of the registered limit'),
+ )
+ parser.add_argument(
+ '--region',
+ metavar='<region>',
+ help=_('Region for the registered limit to affect'),
+ )
+ parser.add_argument(
+ '--service',
+ metavar='<service>',
+ required=True,
+ help=_('Service responsible for the resource to limit (required)'),
+ )
+ parser.add_argument(
+ '--default-limit',
+ type=int,
+ metavar='<default-limit>',
+ required=True,
+ help=_('The default limit for the resources to assume (required)'),
+ )
+ parser.add_argument(
+ 'resource_name',
+ metavar='<resource-name>',
+ help=_('The name of the resource to limit'),
+ )
+ return parser
+
+ def take_action(self, parsed_args):
+ identity_client = self.app.client_manager.identity
+
+ service = utils.find_resource(
+ identity_client.services, parsed_args.service
+ )
+ region = None
+ if parsed_args.region:
+ region = utils.find_resource(
+ identity_client.regions, parsed_args.region
+ )
+
+ registered_limit = identity_client.registered_limits.create(
+ service,
+ parsed_args.resource_name,
+ parsed_args.default_limit,
+ description=parsed_args.description,
+ region=region
+ )
+
+ registered_limit._info.pop('links', None)
+ return zip(*sorted(six.iteritems(registered_limit._info)))
+
+
+class DeleteRegisteredLimit(command.Command):
+ _description = _("Delete a registered limit")
+
+ def get_parser(self, prog_name):
+ parser = super(DeleteRegisteredLimit, self).get_parser(prog_name)
+ parser.add_argument(
+ 'registered_limit_id',
+ metavar='<registered-limit-id>',
+ nargs="+",
+ help=_('Registered limit to delete (ID)'),
+ )
+ return parser
+
+ def take_action(self, parsed_args):
+ identity_client = self.app.client_manager.identity
+
+ errors = 0
+ for registered_limit_id in parsed_args.registered_limit_id:
+ try:
+ identity_client.registered_limits.delete(registered_limit_id)
+ except Exception as e:
+ errors += 1
+ from pprint import pprint
+ pprint(type(e))
+ LOG.error(_("Failed to delete registered limit with ID "
+ "'%(id)s': %(e)s"),
+ {'id': registered_limit_id, 'e': e})
+
+ if errors > 0:
+ total = len(parsed_args.registered_limit_id)
+ msg = (_("%(errors)s of %(total)s registered limits failed to "
+ "delete.") % {'errors': errors, 'total': total})
+ raise exceptions.CommandError(msg)
+
+
+class ListRegisteredLimit(command.Lister):
+ _description = _("List registered limits")
+
+ def get_parser(self, prog_name):
+ parser = super(ListRegisteredLimit, self).get_parser(prog_name)
+ parser.add_argument(
+ '--service',
+ metavar='<service>',
+ help=_('Service responsible for the resource to limit'),
+ )
+ parser.add_argument(
+ '--resource-name',
+ metavar='<resource-name>',
+ dest='resource_name',
+ help=_('The name of the resource to limit'),
+ )
+ parser.add_argument(
+ '--region',
+ metavar='<region>',
+ help=_('Region for the limit to affect.'),
+ )
+ return parser
+
+ def take_action(self, parsed_args):
+ identity_client = self.app.client_manager.identity
+
+ service = None
+ if parsed_args.service:
+ service = common_utils.find_service(
+ identity_client, parsed_args.service
+ )
+ region = None
+ if parsed_args.region:
+ region = utils.find_resource(
+ identity_client.regions, parsed_args.region
+ )
+
+ registered_limits = identity_client.registered_limits.list(
+ service=service,
+ resource_name=parsed_args.resource_name,
+ region=region
+ )
+
+ columns = (
+ 'ID', 'Service ID', 'Resource Name', 'Default Limit',
+ 'Description', 'Region ID'
+ )
+ return (
+ columns,
+ (utils.get_item_properties(s, columns) for s in registered_limits),
+ )
+
+
+class SetRegisteredLimit(command.ShowOne):
+ _description = _("Update information about a registered limit")
+
+ def get_parser(self, prog_name):
+ parser = super(SetRegisteredLimit, self).get_parser(prog_name)
+ parser.add_argument(
+ 'registered_limit_id',
+ metavar='<registered-limit-id>',
+ help=_('Registered limit to update (ID)'),
+ )
+ parser.add_argument(
+ '--service',
+ metavar='<service>',
+ help=_('Service responsible for the resource to limit'),
+ )
+ parser.add_argument(
+ '--resource-name',
+ metavar='<resource-name>',
+ help=_('The name of the resource to limit'),
+ )
+ parser.add_argument(
+ '--default-limit',
+ metavar='<default-limit>',
+ type=int,
+ help=_('The default limit for the resources to assume'),
+ )
+ parser.add_argument(
+ '--description',
+ metavar='<description>',
+ help=_('Description of the registered limit'),
+ )
+ parser.add_argument(
+ '--region',
+ metavar='<region>',
+ help=_('Region for the registered limit to affect.'),
+ )
+ return parser
+
+ def take_action(self, parsed_args):
+ identity_client = self.app.client_manager.identity
+
+ service = None
+ if parsed_args.service:
+ service = common_utils.find_service(
+ identity_client, parsed_args.service
+ )
+
+ region = None
+ if parsed_args.region:
+ region = utils.find_resource(
+ identity_client.regions, parsed_args.region
+ )
+
+ registered_limit = identity_client.registered_limits.update(
+ parsed_args.registered_limit_id,
+ service=service,
+ resource_name=parsed_args.resource_name,
+ default_limit=parsed_args.default_limit,
+ description=parsed_args.description,
+ region=region
+ )
+
+ registered_limit._info.pop('links', None)
+ return zip(*sorted(six.iteritems(registered_limit._info)))
+
+
+class ShowRegisteredLimit(command.ShowOne):
+ _description = _("Display registered limit details")
+
+ def get_parser(self, prog_name):
+ parser = super(ShowRegisteredLimit, self).get_parser(prog_name)
+ parser.add_argument(
+ 'registered_limit_id',
+ metavar='<registered-limit-id>',
+ help=_('Registered limit to display (ID)'),
+ )
+ return parser
+
+ def take_action(self, parsed_args):
+ identity_client = self.app.client_manager.identity
+ registered_limit = identity_client.registered_limits.get(
+ parsed_args.registered_limit_id
+ )
+ registered_limit._info.pop('links', None)
+ return zip(*sorted(six.iteritems(registered_limit._info)))
diff --git a/openstackclient/tests/functional/identity/v3/common.py b/openstackclient/tests/functional/identity/v3/common.py
index 54132be5..525a31a2 100644
--- a/openstackclient/tests/functional/identity/v3/common.py
+++ b/openstackclient/tests/functional/identity/v3/common.py
@@ -54,6 +54,11 @@ class IdentityTests(base.TestCase):
'Auth URL']
IMPLIED_ROLE_LIST_HEADERS = ['Prior Role ID', 'Prior Role Name',
'Implied Role ID', 'Implied Role Name']
+ REGISTERED_LIMIT_FIELDS = ['id', 'service_id', 'resource_name',
+ 'default_limit', 'description', 'region_id']
+ REGISTERED_LIMIT_LIST_HEADERS = ['ID', 'Service ID', 'Resource Name',
+ 'Default Limit', 'Description',
+ 'Region ID']
@classmethod
def setUpClass(cls):
@@ -319,3 +324,35 @@ class IdentityTests(base.TestCase):
items = self.parse_show(raw_output)
self.assert_show_fields(items, self.SERVICE_PROVIDER_FIELDS)
return service_provider
+
+ def _create_dummy_registered_limit(self, add_clean_up=True):
+ service_name = self._create_dummy_service()
+ resource_name = data_utils.rand_name('resource_name')
+ params = {
+ 'service_name': service_name,
+ 'default_limit': 10,
+ 'resource_name': resource_name
+ }
+ raw_output = self.openstack(
+ 'registered limit create'
+ ' --service %(service_name)s'
+ ' --default-limit %(default_limit)s'
+ ' %(resource_name)s' % params
+ )
+ items = self.parse_show(raw_output)
+ registered_limit_id = self._extract_value_from_items('id', items)
+
+ if add_clean_up:
+ self.addCleanup(
+ self.openstack,
+ 'registered limit delete %s' % registered_limit_id
+ )
+
+ self.assert_show_fields(items, self.REGISTERED_LIMIT_FIELDS)
+ return registered_limit_id
+
+ def _extract_value_from_items(self, key, items):
+ for d in items:
+ for k, v in d.iteritems():
+ if k == key:
+ return v
diff --git a/openstackclient/tests/functional/identity/v3/test_registered_limit.py b/openstackclient/tests/functional/identity/v3/test_registered_limit.py
new file mode 100644
index 00000000..09e90ce2
--- /dev/null
+++ b/openstackclient/tests/functional/identity/v3/test_registered_limit.py
@@ -0,0 +1,184 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from tempest.lib.common.utils import data_utils
+
+from openstackclient.tests.functional.identity.v3 import common
+
+
+class RegisteredLimitTestCase(common.IdentityTests):
+
+ def test_registered_limit_create_with_service_name(self):
+ self._create_dummy_registered_limit()
+
+ def test_registered_limit_create_with_service_id(self):
+ service_name = self._create_dummy_service()
+ raw_output = self.openstack(
+ 'service show'
+ ' %(service_name)s' % {'service_name': service_name}
+ )
+ service_items = self.parse_show(raw_output)
+ service_id = self._extract_value_from_items('id', service_items)
+
+ raw_output = self.openstack(
+ 'registered limit create'
+ ' --service %(service_id)s'
+ ' --default-limit %(default_limit)s'
+ ' %(resource_name)s' % {
+ 'service_id': service_id,
+ 'default_limit': 10,
+ 'resource_name': 'cores'
+ }
+ )
+ items = self.parse_show(raw_output)
+ registered_limit_id = self._extract_value_from_items('id', items)
+ self.addCleanup(
+ self.openstack,
+ 'registered limit delete'
+ ' %(registered_limit_id)s' % {
+ 'registered_limit_id': registered_limit_id
+ }
+ )
+
+ self.assert_show_fields(items, self.REGISTERED_LIMIT_FIELDS)
+
+ def test_registered_limit_create_with_options(self):
+ service_name = self._create_dummy_service()
+ region_id = self._create_dummy_region()
+ params = {
+ 'service_name': service_name,
+ 'resource_name': 'cores',
+ 'default_limit': 10,
+ 'description': 'default limit for cores',
+ 'region_id': region_id
+ }
+
+ raw_output = self.openstack(
+ 'registered limit create'
+ ' --description \'%(description)s\''
+ ' --region %(region_id)s'
+ ' --service %(service_name)s'
+ ' --default-limit %(default_limit)s'
+ ' %(resource_name)s' % params
+ )
+ items = self.parse_show(raw_output)
+ registered_limit_id = self._extract_value_from_items('id', items)
+ self.addCleanup(
+ self.openstack,
+ 'registered limit delete %(registered_limit_id)s' % {
+ 'registered_limit_id': registered_limit_id
+ }
+ )
+
+ self.assert_show_fields(items, self.REGISTERED_LIMIT_FIELDS)
+
+ def test_registered_limit_show(self):
+ registered_limit_id = self._create_dummy_registered_limit()
+ raw_output = self.openstack(
+ 'registered limit show %(registered_limit_id)s' % {
+ 'registered_limit_id': registered_limit_id
+ }
+ )
+ items = self.parse_show(raw_output)
+ self.assert_show_fields(items, self.REGISTERED_LIMIT_FIELDS)
+
+ def test_registered_limit_set_region_id(self):
+ region_id = self._create_dummy_region()
+ registered_limit_id = self._create_dummy_registered_limit()
+
+ params = {
+ 'registered_limit_id': registered_limit_id,
+ 'region_id': region_id
+ }
+ raw_output = self.openstack(
+ 'registered limit set'
+ ' %(registered_limit_id)s'
+ ' --region %(region_id)s' % params
+ )
+ items = self.parse_show(raw_output)
+ self.assert_show_fields(items, self.REGISTERED_LIMIT_FIELDS)
+
+ def test_registered_limit_set_description(self):
+ registered_limit_id = self._create_dummy_registered_limit()
+ params = {
+ 'registered_limit_id': registered_limit_id,
+ 'description': 'updated description'
+ }
+ raw_output = self.openstack(
+ 'registered limit set'
+ ' %(registered_limit_id)s'
+ ' --description \'%(description)s\'' % params
+ )
+ items = self.parse_show(raw_output)
+ self.assert_show_fields(items, self.REGISTERED_LIMIT_FIELDS)
+
+ def test_registered_limit_set_service(self):
+ registered_limit_id = self._create_dummy_registered_limit()
+ service_name = self._create_dummy_service()
+ params = {
+ 'registered_limit_id': registered_limit_id,
+ 'service': service_name
+ }
+ raw_output = self.openstack(
+ 'registered limit set'
+ ' %(registered_limit_id)s'
+ ' --service %(service)s' % params
+ )
+ items = self.parse_show(raw_output)
+ self.assert_show_fields(items, self.REGISTERED_LIMIT_FIELDS)
+
+ def test_registered_limit_set_default_limit(self):
+ registered_limit_id = self._create_dummy_registered_limit()
+ params = {
+ 'registered_limit_id': registered_limit_id,
+ 'default_limit': 20
+ }
+ raw_output = self.openstack(
+ 'registered limit set'
+ ' %(registered_limit_id)s'
+ ' --default-limit %(default_limit)s' % params
+ )
+ items = self.parse_show(raw_output)
+ self.assert_show_fields(items, self.REGISTERED_LIMIT_FIELDS)
+
+ def test_registered_limit_set_resource_name(self):
+ registered_limit_id = self._create_dummy_registered_limit()
+ resource_name = data_utils.rand_name('resource_name')
+ params = {
+ 'registered_limit_id': registered_limit_id,
+ 'resource_name': resource_name
+ }
+ raw_output = self.openstack(
+ 'registered limit set'
+ ' %(registered_limit_id)s'
+ ' --resource-name %(resource_name)s' % params
+ )
+ items = self.parse_show(raw_output)
+ self.assert_show_fields(items, self.REGISTERED_LIMIT_FIELDS)
+
+ def test_registered_limit_list(self):
+ self._create_dummy_registered_limit()
+ raw_output = self.openstack('registered limit list')
+ items = self.parse_listing(raw_output)
+ self.assert_table_structure(items, self.REGISTERED_LIMIT_LIST_HEADERS)
+
+ def test_registered_limit_delete(self):
+ registered_limit_id = self._create_dummy_registered_limit(
+ add_clean_up=False
+ )
+ raw_output = self.openstack(
+ 'registered limit delete'
+ ' %(registered_limit_id)s' % {
+ 'registered_limit_id': registered_limit_id
+ }
+ )
+ self.assertEqual(0, len(raw_output))
diff --git a/openstackclient/tests/unit/identity/v3/fakes.py b/openstackclient/tests/unit/identity/v3/fakes.py
index 7aa9cd7c..3cae4515 100644
--- a/openstackclient/tests/unit/identity/v3/fakes.py
+++ b/openstackclient/tests/unit/identity/v3/fakes.py
@@ -486,6 +486,27 @@ APP_CRED_OPTIONS = {
'secret': app_cred_secret
}
+registered_limit_id = 'registered-limit-id'
+registered_limit_default_limit = 10
+registered_limit_description = 'default limit of foobars'
+registered_limit_resource_name = 'foobars'
+REGISTERED_LIMIT = {
+ 'id': registered_limit_id,
+ 'default_limit': registered_limit_default_limit,
+ 'resource_name': registered_limit_resource_name,
+ 'service_id': service_id,
+ 'description': None,
+ 'region_id': None
+}
+REGISTERED_LIMIT_OPTIONS = {
+ 'id': registered_limit_id,
+ 'default_limit': registered_limit_default_limit,
+ 'resource_name': registered_limit_resource_name,
+ 'service_id': service_id,
+ 'description': registered_limit_description,
+ 'region_id': region_id
+}
+
def fake_auth_ref(fake_token, fake_service=None):
"""Create an auth_ref using keystoneauth's fixtures"""
@@ -578,6 +599,8 @@ class FakeIdentityv3Client(object):
{})
self.inference_rules = mock.Mock()
self.inference_rules.resource_class = fakes.FakeResource(None, {})
+ self.registered_limits = mock.Mock()
+ self.registered_limits.resource_class = fakes.FakeResource(None, {})
class FakeFederationManager(object):
diff --git a/openstackclient/tests/unit/identity/v3/test_registered_limit.py b/openstackclient/tests/unit/identity/v3/test_registered_limit.py
new file mode 100644
index 00000000..262ca4f9
--- /dev/null
+++ b/openstackclient/tests/unit/identity/v3/test_registered_limit.py
@@ -0,0 +1,510 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import copy
+
+from keystoneauth1.exceptions import http as ksa_exceptions
+from osc_lib import exceptions
+
+from openstackclient.identity.v3 import registered_limit
+from openstackclient.tests.unit import fakes
+from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes
+
+
+class TestRegisteredLimit(identity_fakes.TestIdentityv3):
+
+ def setUp(self):
+ super(TestRegisteredLimit, self).setUp()
+
+ identity_manager = self.app.client_manager.identity
+ self.registered_limit_mock = identity_manager.registered_limits
+
+ self.services_mock = identity_manager.services
+ self.services_mock.reset_mock()
+
+ self.regions_mock = identity_manager.regions
+ self.regions_mock.reset_mock()
+
+
+class TestRegisteredLimitCreate(TestRegisteredLimit):
+
+ def setUp(self):
+ super(TestRegisteredLimitCreate, self).setUp()
+
+ self.service = fakes.FakeResource(
+ None,
+ copy.deepcopy(identity_fakes.SERVICE),
+ loaded=True
+ )
+ self.services_mock.get.return_value = self.service
+
+ self.region = fakes.FakeResource(
+ None,
+ copy.deepcopy(identity_fakes.REGION),
+ loaded=True
+ )
+ self.regions_mock.get.return_value = self.region
+
+ self.cmd = registered_limit.CreateRegisteredLimit(self.app, None)
+
+ def test_registered_limit_create_without_options(self):
+ self.registered_limit_mock.create.return_value = fakes.FakeResource(
+ None,
+ copy.deepcopy(identity_fakes.REGISTERED_LIMIT),
+ loaded=True
+ )
+
+ resource_name = identity_fakes.registered_limit_resource_name
+ default_limit = identity_fakes.registered_limit_default_limit
+ arglist = [
+ '--service', identity_fakes.service_id,
+ '--default-limit', '10',
+ resource_name,
+ ]
+
+ verifylist = [
+ ('service', identity_fakes.service_id),
+ ('default_limit', default_limit),
+ ('resource_name', resource_name)
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ kwargs = {'description': None, 'region': None}
+ self.registered_limit_mock.create.assert_called_with(
+ self.service, resource_name, default_limit, **kwargs
+ )
+
+ collist = ('default_limit', 'description', 'id', 'region_id',
+ 'resource_name', 'service_id')
+
+ self.assertEqual(collist, columns)
+ datalist = (
+ identity_fakes.registered_limit_default_limit,
+ None,
+ identity_fakes.registered_limit_id,
+ None,
+ identity_fakes.registered_limit_resource_name,
+ identity_fakes.service_id
+ )
+ self.assertEqual(datalist, data)
+
+ def test_registered_limit_create_with_options(self):
+ self.registered_limit_mock.create.return_value = fakes.FakeResource(
+ None,
+ copy.deepcopy(identity_fakes.REGISTERED_LIMIT_OPTIONS),
+ loaded=True
+ )
+
+ resource_name = identity_fakes.registered_limit_resource_name
+ default_limit = identity_fakes.registered_limit_default_limit
+ description = identity_fakes.registered_limit_description
+ arglist = [
+ '--region', identity_fakes.region_id,
+ '--description', description,
+ '--service', identity_fakes.service_id,
+ '--default-limit', '10',
+ resource_name
+ ]
+
+ verifylist = [
+ ('region', identity_fakes.region_id),
+ ('description', description),
+ ('service', identity_fakes.service_id),
+ ('default_limit', default_limit),
+ ('resource_name', resource_name)
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ kwargs = {'description': description, 'region': self.region}
+ self.registered_limit_mock.create.assert_called_with(
+ self.service, resource_name, default_limit, **kwargs
+ )
+
+ collist = ('default_limit', 'description', 'id', 'region_id',
+ 'resource_name', 'service_id')
+
+ self.assertEqual(collist, columns)
+ datalist = (
+ identity_fakes.registered_limit_default_limit,
+ description,
+ identity_fakes.registered_limit_id,
+ identity_fakes.region_id,
+ identity_fakes.registered_limit_resource_name,
+ identity_fakes.service_id
+ )
+ self.assertEqual(datalist, data)
+
+
+class TestRegisteredLimitDelete(TestRegisteredLimit):
+
+ def setUp(self):
+ super(TestRegisteredLimitDelete, self).setUp()
+
+ self.cmd = registered_limit.DeleteRegisteredLimit(self.app, None)
+
+ def test_registered_limit_delete(self):
+ self.registered_limit_mock.delete.return_value = None
+
+ arglist = [identity_fakes.registered_limit_id]
+ verifylist = [
+ ('registered_limit_id', [identity_fakes.registered_limit_id])
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ self.registered_limit_mock.delete.assert_called_with(
+ identity_fakes.registered_limit_id
+ )
+ self.assertIsNone(result)
+
+ def test_registered_limit_delete_with_exception(self):
+ return_value = ksa_exceptions.NotFound()
+ self.registered_limit_mock.delete.side_effect = return_value
+
+ arglist = ['fake-registered-limit-id']
+ verifylist = [
+ ('registered_limit_id', ['fake-registered-limit-id'])
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ try:
+ self.cmd.take_action(parsed_args)
+ self.fail('CommandError should be raised.')
+ except exceptions.CommandError as e:
+ self.assertEqual(
+ '1 of 1 registered limits failed to delete.', str(e)
+ )
+
+
+class TestRegisteredLimitShow(TestRegisteredLimit):
+
+ def setUp(self):
+ super(TestRegisteredLimitShow, self).setUp()
+
+ self.registered_limit_mock.get.return_value = fakes.FakeResource(
+ None,
+ copy.deepcopy(identity_fakes.REGISTERED_LIMIT),
+ loaded=True
+ )
+
+ self.cmd = registered_limit.ShowRegisteredLimit(self.app, None)
+
+ def test_registered_limit_show(self):
+ arglist = [identity_fakes.registered_limit_id]
+ verifylist = [
+ ('registered_limit_id', identity_fakes.registered_limit_id)
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.registered_limit_mock.get.assert_called_with(
+ identity_fakes.registered_limit_id
+ )
+
+ collist = (
+ 'default_limit', 'description', 'id', 'region_id', 'resource_name',
+ 'service_id'
+ )
+ self.assertEqual(collist, columns)
+ datalist = (
+ identity_fakes.registered_limit_default_limit,
+ None,
+ identity_fakes.registered_limit_id,
+ None,
+ identity_fakes.registered_limit_resource_name,
+ identity_fakes.service_id
+ )
+ self.assertEqual(datalist, data)
+
+
+class TestRegisteredLimitSet(TestRegisteredLimit):
+
+ def setUp(self):
+ super(TestRegisteredLimitSet, self).setUp()
+ self.cmd = registered_limit.SetRegisteredLimit(self.app, None)
+
+ def test_registered_limit_set_description(self):
+ registered_limit = copy.deepcopy(identity_fakes.REGISTERED_LIMIT)
+ registered_limit['description'] = (
+ identity_fakes.registered_limit_description
+ )
+ self.registered_limit_mock.update.return_value = fakes.FakeResource(
+ None, registered_limit, loaded=True
+ )
+
+ arglist = [
+ '--description', identity_fakes.registered_limit_description,
+ identity_fakes.registered_limit_id
+ ]
+ verifylist = [
+ ('description', identity_fakes.registered_limit_description),
+ ('registered_limit_id', identity_fakes.registered_limit_id)
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.registered_limit_mock.update.assert_called_with(
+ identity_fakes.registered_limit_id,
+ service=None,
+ resource_name=None,
+ default_limit=None,
+ description=identity_fakes.registered_limit_description,
+ region=None
+ )
+
+ collist = (
+ 'default_limit', 'description', 'id', 'region_id', 'resource_name',
+ 'service_id'
+ )
+ self.assertEqual(collist, columns)
+ datalist = (
+ identity_fakes.registered_limit_default_limit,
+ identity_fakes.registered_limit_description,
+ identity_fakes.registered_limit_id,
+ None,
+ identity_fakes.registered_limit_resource_name,
+ identity_fakes.service_id
+ )
+ self.assertEqual(datalist, data)
+
+ def test_registered_limit_set_default_limit(self):
+ registered_limit = copy.deepcopy(identity_fakes.REGISTERED_LIMIT)
+ default_limit = 20
+ registered_limit['default_limit'] = default_limit
+ self.registered_limit_mock.update.return_value = fakes.FakeResource(
+ None, registered_limit, loaded=True
+ )
+
+ arglist = [
+ '--default-limit', str(default_limit),
+ identity_fakes.registered_limit_id
+ ]
+ verifylist = [
+ ('default_limit', default_limit),
+ ('registered_limit_id', identity_fakes.registered_limit_id)
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.registered_limit_mock.update.assert_called_with(
+ identity_fakes.registered_limit_id,
+ service=None,
+ resource_name=None,
+ default_limit=default_limit,
+ description=None,
+ region=None
+ )
+
+ collist = (
+ 'default_limit', 'description', 'id', 'region_id', 'resource_name',
+ 'service_id'
+ )
+ self.assertEqual(collist, columns)
+ datalist = (
+ default_limit,
+ None,
+ identity_fakes.registered_limit_id,
+ None,
+ identity_fakes.registered_limit_resource_name,
+ identity_fakes.service_id
+ )
+ self.assertEqual(datalist, data)
+
+ def test_registered_limit_set_resource_name(self):
+ registered_limit = copy.deepcopy(identity_fakes.REGISTERED_LIMIT)
+ resource_name = 'volumes'
+ registered_limit['resource_name'] = resource_name
+ self.registered_limit_mock.update.return_value = fakes.FakeResource(
+ None, registered_limit, loaded=True
+ )
+
+ arglist = [
+ '--resource-name', resource_name,
+ identity_fakes.registered_limit_id
+ ]
+ verifylist = [
+ ('resource_name', resource_name),
+ ('registered_limit_id', identity_fakes.registered_limit_id)
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.registered_limit_mock.update.assert_called_with(
+ identity_fakes.registered_limit_id,
+ service=None,
+ resource_name=resource_name,
+ default_limit=None,
+ description=None,
+ region=None
+ )
+
+ collist = (
+ 'default_limit', 'description', 'id', 'region_id', 'resource_name',
+ 'service_id'
+ )
+ self.assertEqual(collist, columns)
+ datalist = (
+ identity_fakes.registered_limit_default_limit,
+ None,
+ identity_fakes.registered_limit_id,
+ None,
+ resource_name,
+ identity_fakes.service_id
+ )
+ self.assertEqual(datalist, data)
+
+ def test_registered_limit_set_service(self):
+ registered_limit = copy.deepcopy(identity_fakes.REGISTERED_LIMIT)
+ service = identity_fakes.FakeService.create_one_service()
+ registered_limit['service_id'] = service.id
+ self.registered_limit_mock.update.return_value = fakes.FakeResource(
+ None, registered_limit, loaded=True
+ )
+ self.services_mock.get.return_value = service
+
+ arglist = [
+ '--service', service.id,
+ identity_fakes.registered_limit_id
+ ]
+ verifylist = [
+ ('service', service.id),
+ ('registered_limit_id', identity_fakes.registered_limit_id)
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.registered_limit_mock.update.assert_called_with(
+ identity_fakes.registered_limit_id,
+ service=service,
+ resource_name=None,
+ default_limit=None,
+ description=None,
+ region=None
+ )
+
+ collist = (
+ 'default_limit', 'description', 'id', 'region_id', 'resource_name',
+ 'service_id'
+ )
+ self.assertEqual(collist, columns)
+ datalist = (
+ identity_fakes.registered_limit_default_limit,
+ None,
+ identity_fakes.registered_limit_id,
+ None,
+ identity_fakes.registered_limit_resource_name,
+ service.id
+ )
+ self.assertEqual(datalist, data)
+
+ def test_registered_limit_set_region(self):
+ registered_limit = copy.deepcopy(identity_fakes.REGISTERED_LIMIT)
+ region = identity_fakes.REGION
+ region['id'] = 'RegionTwo'
+ region = fakes.FakeResource(
+ None,
+ copy.deepcopy(region),
+ loaded=True
+ )
+ registered_limit['region_id'] = region.id
+ self.registered_limit_mock.update.return_value = fakes.FakeResource(
+ None, registered_limit, loaded=True
+ )
+ self.regions_mock.get.return_value = region
+
+ arglist = [
+ '--region', region.id,
+ identity_fakes.registered_limit_id
+ ]
+ verifylist = [
+ ('region', region.id),
+ ('registered_limit_id', identity_fakes.registered_limit_id)
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.registered_limit_mock.update.assert_called_with(
+ identity_fakes.registered_limit_id,
+ service=None,
+ resource_name=None,
+ default_limit=None,
+ description=None,
+ region=region
+ )
+
+ collist = (
+ 'default_limit', 'description', 'id', 'region_id', 'resource_name',
+ 'service_id'
+ )
+ self.assertEqual(collist, columns)
+ datalist = (
+ identity_fakes.registered_limit_default_limit,
+ None,
+ identity_fakes.registered_limit_id,
+ region.id,
+ identity_fakes.registered_limit_resource_name,
+ identity_fakes.service_id
+ )
+ self.assertEqual(datalist, data)
+
+
+class TestRegisteredLimitList(TestRegisteredLimit):
+
+ def setUp(self):
+ super(TestRegisteredLimitList, self).setUp()
+
+ self.registered_limit_mock.get.return_value = fakes.FakeResource(
+ None,
+ copy.deepcopy(identity_fakes.REGISTERED_LIMIT),
+ loaded=True
+ )
+
+ self.cmd = registered_limit.ShowRegisteredLimit(self.app, None)
+
+ def test_limit_show(self):
+ arglist = [identity_fakes.registered_limit_id]
+ verifylist = [
+ ('registered_limit_id', identity_fakes.registered_limit_id)
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.registered_limit_mock.get.assert_called_with(
+ identity_fakes.registered_limit_id
+ )
+
+ collist = (
+ 'default_limit', 'description', 'id', 'region_id', 'resource_name',
+ 'service_id'
+ )
+ self.assertEqual(collist, columns)
+ datalist = (
+ identity_fakes.registered_limit_default_limit,
+ None,
+ identity_fakes.registered_limit_id,
+ None,
+ identity_fakes.registered_limit_resource_name,
+ identity_fakes.service_id
+ )
+ self.assertEqual(datalist, data)