diff options
| author | Steve Martinelli <s.martinelli@gmail.com> | 2016-09-05 22:14:33 -0700 |
|---|---|---|
| committer | Steve Martinelli <s.martinelli@gmail.com> | 2016-09-08 15:19:50 -0700 |
| commit | 39839def2e356e8d145be89380c73a71423cf06d (patch) | |
| tree | f53c090b7ded46554866436e9d687492d2cef487 /openstackclient/tests/unit/common | |
| parent | 7d1a5d0854c732681a130255ddc6abb2e9721a7a (diff) | |
| download | python-openstackclient-39839def2e356e8d145be89380c73a71423cf06d.tar.gz | |
move unit tests to new "unit" test module
this will better isolate the unit tests from the functional tests.
unfortunately, the "integration" tests had to be lumped into the
"unit" tests since we need the separation in testr.conf
Change-Id: Ifd12198c1f90e4e3c951c73bfa1884ab300d8ded
Diffstat (limited to 'openstackclient/tests/unit/common')
| -rw-r--r-- | openstackclient/tests/unit/common/__init__.py | 0 | ||||
| -rw-r--r-- | openstackclient/tests/unit/common/test_availability_zone.py | 268 | ||||
| -rw-r--r-- | openstackclient/tests/unit/common/test_clientmanager.py | 69 | ||||
| -rw-r--r-- | openstackclient/tests/unit/common/test_command.py | 49 | ||||
| -rw-r--r-- | openstackclient/tests/unit/common/test_commandmanager.py | 107 | ||||
| -rw-r--r-- | openstackclient/tests/unit/common/test_configuration.py | 85 | ||||
| -rw-r--r-- | openstackclient/tests/unit/common/test_extension.py | 244 | ||||
| -rw-r--r-- | openstackclient/tests/unit/common/test_logs.py | 207 | ||||
| -rw-r--r-- | openstackclient/tests/unit/common/test_module.py | 130 | ||||
| -rw-r--r-- | openstackclient/tests/unit/common/test_parseractions.py | 221 | ||||
| -rw-r--r-- | openstackclient/tests/unit/common/test_quota.py | 381 |
11 files changed, 1761 insertions, 0 deletions
diff --git a/openstackclient/tests/unit/common/__init__.py b/openstackclient/tests/unit/common/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/openstackclient/tests/unit/common/__init__.py diff --git a/openstackclient/tests/unit/common/test_availability_zone.py b/openstackclient/tests/unit/common/test_availability_zone.py new file mode 100644 index 00000000..6c7adc43 --- /dev/null +++ b/openstackclient/tests/unit/common/test_availability_zone.py @@ -0,0 +1,268 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import mock + +import six + +from openstackclient.common import availability_zone +from openstackclient.tests.unit.compute.v2 import fakes as compute_fakes +from openstackclient.tests.unit import fakes +from openstackclient.tests.unit.network.v2 import fakes as network_fakes +from openstackclient.tests.unit import utils +from openstackclient.tests.unit.volume.v2 import fakes as volume_fakes + + +def _build_compute_az_datalist(compute_az, long_datalist=False): + datalist = () + if not long_datalist: + datalist = ( + compute_az.zoneName, + 'available', + ) + else: + for host, services in six.iteritems(compute_az.hosts): + for service, state in six.iteritems(services): + datalist += ( + compute_az.zoneName, + 'available', + '', + host, + service, + 'enabled :-) ' + state['updated_at'], + ) + return (datalist,) + + +def _build_volume_az_datalist(volume_az, long_datalist=False): + datalist = () + if not long_datalist: + datalist = ( + volume_az.zoneName, + 'available', + ) + else: + datalist = ( + volume_az.zoneName, + 'available', + '', '', '', '', + ) + return (datalist,) + + +def _build_network_az_datalist(network_az, long_datalist=False): + datalist = () + if not long_datalist: + datalist = ( + network_az.name, + network_az.state, + ) + else: + datalist = ( + network_az.name, + network_az.state, + network_az.resource, + '', '', '', + ) + return (datalist,) + + +class TestAvailabilityZone(utils.TestCommand): + + def setUp(self): + super(TestAvailabilityZone, self).setUp() + + compute_client = compute_fakes.FakeComputev2Client( + endpoint=fakes.AUTH_URL, + token=fakes.AUTH_TOKEN, + ) + self.app.client_manager.compute = compute_client + + self.compute_azs_mock = compute_client.availability_zones + self.compute_azs_mock.reset_mock() + + volume_client = volume_fakes.FakeVolumeClient( + endpoint=fakes.AUTH_URL, + token=fakes.AUTH_TOKEN, + ) + self.app.client_manager.volume = volume_client + + self.volume_azs_mock = volume_client.availability_zones + self.volume_azs_mock.reset_mock() + + network_client = network_fakes.FakeNetworkV2Client( + endpoint=fakes.AUTH_URL, + token=fakes.AUTH_TOKEN, + ) + self.app.client_manager.network = network_client + + network_client.availability_zones = mock.Mock() + network_client.find_extension = mock.Mock() + self.network_azs_mock = network_client.availability_zones + + +class TestAvailabilityZoneList(TestAvailabilityZone): + + compute_azs = \ + compute_fakes.FakeAvailabilityZone.create_availability_zones() + volume_azs = \ + volume_fakes.FakeAvailabilityZone.create_availability_zones(count=1) + network_azs = \ + network_fakes.FakeAvailabilityZone.create_availability_zones() + + short_columnslist = ('Zone Name', 'Zone Status') + long_columnslist = ( + 'Zone Name', + 'Zone Status', + 'Zone Resource', + 'Host Name', + 'Service Name', + 'Service Status', + ) + + def setUp(self): + super(TestAvailabilityZoneList, self).setUp() + + self.compute_azs_mock.list.return_value = self.compute_azs + self.volume_azs_mock.list.return_value = self.volume_azs + self.network_azs_mock.return_value = self.network_azs + + # Get the command object to test + self.cmd = availability_zone.ListAvailabilityZone(self.app, None) + + def test_availability_zone_list_no_options(self): + arglist = [] + verifylist = [] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + # In base command class Lister in cliff, abstract method take_action() + # returns a tuple containing the column names and an iterable + # containing the data to be listed. + columns, data = self.cmd.take_action(parsed_args) + + self.compute_azs_mock.list.assert_called_with() + self.volume_azs_mock.list.assert_called_with() + self.network_azs_mock.assert_called_with() + + self.assertEqual(self.short_columnslist, columns) + datalist = () + for compute_az in self.compute_azs: + datalist += _build_compute_az_datalist(compute_az) + for volume_az in self.volume_azs: + datalist += _build_volume_az_datalist(volume_az) + for network_az in self.network_azs: + datalist += _build_network_az_datalist(network_az) + self.assertEqual(datalist, tuple(data)) + + def test_availability_zone_list_long(self): + arglist = [ + '--long', + ] + verifylist = [ + ('long', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + # In base command class Lister in cliff, abstract method take_action() + # returns a tuple containing the column names and an iterable + # containing the data to be listed. + columns, data = self.cmd.take_action(parsed_args) + + self.compute_azs_mock.list.assert_called_with() + self.volume_azs_mock.list.assert_called_with() + self.network_azs_mock.assert_called_with() + + self.assertEqual(self.long_columnslist, columns) + datalist = () + for compute_az in self.compute_azs: + datalist += _build_compute_az_datalist(compute_az, + long_datalist=True) + for volume_az in self.volume_azs: + datalist += _build_volume_az_datalist(volume_az, + long_datalist=True) + for network_az in self.network_azs: + datalist += _build_network_az_datalist(network_az, + long_datalist=True) + self.assertEqual(datalist, tuple(data)) + + def test_availability_zone_list_compute(self): + arglist = [ + '--compute', + ] + verifylist = [ + ('compute', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + # In base command class Lister in cliff, abstract method take_action() + # returns a tuple containing the column names and an iterable + # containing the data to be listed. + columns, data = self.cmd.take_action(parsed_args) + + self.compute_azs_mock.list.assert_called_with() + self.volume_azs_mock.list.assert_not_called() + self.network_azs_mock.assert_not_called() + + self.assertEqual(self.short_columnslist, columns) + datalist = () + for compute_az in self.compute_azs: + datalist += _build_compute_az_datalist(compute_az) + self.assertEqual(datalist, tuple(data)) + + def test_availability_zone_list_volume(self): + arglist = [ + '--volume', + ] + verifylist = [ + ('volume', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + # In base command class Lister in cliff, abstract method take_action() + # returns a tuple containing the column names and an iterable + # containing the data to be listed. + columns, data = self.cmd.take_action(parsed_args) + + self.compute_azs_mock.list.assert_not_called() + self.volume_azs_mock.list.assert_called_with() + self.network_azs_mock.assert_not_called() + + self.assertEqual(self.short_columnslist, columns) + datalist = () + for volume_az in self.volume_azs: + datalist += _build_volume_az_datalist(volume_az) + self.assertEqual(datalist, tuple(data)) + + def test_availability_zone_list_network(self): + arglist = [ + '--network', + ] + verifylist = [ + ('network', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + # In base command class Lister in cliff, abstract method take_action() + # returns a tuple containing the column names and an iterable + # containing the data to be listed. + columns, data = self.cmd.take_action(parsed_args) + + self.compute_azs_mock.list.assert_not_called() + self.volume_azs_mock.list.assert_not_called() + self.network_azs_mock.assert_called_with() + + self.assertEqual(self.short_columnslist, columns) + datalist = () + for network_az in self.network_azs: + datalist += _build_network_az_datalist(network_az) + self.assertEqual(datalist, tuple(data)) diff --git a/openstackclient/tests/unit/common/test_clientmanager.py b/openstackclient/tests/unit/common/test_clientmanager.py new file mode 100644 index 00000000..7f82c35d --- /dev/null +++ b/openstackclient/tests/unit/common/test_clientmanager.py @@ -0,0 +1,69 @@ +# Copyright 2012-2013 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import copy + +from keystoneauth1 import token_endpoint +from osc_lib.tests import utils as osc_lib_test_utils + +from openstackclient.common import clientmanager +from openstackclient.tests.unit import fakes + + +class TestClientManager(osc_lib_test_utils.TestClientManager): + + def _clientmanager_class(self): + """Allow subclasses to override the ClientManager class""" + return clientmanager.ClientManager + + def test_client_manager_token_endpoint(self): + token_auth = { + 'url': fakes.AUTH_URL, + 'token': fakes.AUTH_TOKEN, + } + client_manager = self._make_clientmanager( + auth_args=token_auth, + auth_plugin_name='token_endpoint', + ) + + self.assertEqual( + fakes.AUTH_URL, + client_manager._cli_options.config['auth']['url'], + ) + self.assertEqual( + fakes.AUTH_TOKEN, + client_manager.auth.get_token(None), + ) + self.assertIsInstance( + client_manager.auth, + token_endpoint.Token, + ) + self.assertTrue(client_manager.is_network_endpoint_enabled()) + + def test_client_manager_network_endpoint_disabled(self): + auth_args = copy.deepcopy(self.default_password_auth) + auth_args.update({ + 'user_domain_name': 'default', + 'project_domain_name': 'default', + }) + # v3 fake doesn't have network endpoint + client_manager = self._make_clientmanager( + auth_args=auth_args, + identity_api_version='3', + auth_plugin_name='v3password', + ) + + self.assertFalse(client_manager.is_service_available('network')) + self.assertFalse(client_manager.is_network_endpoint_enabled()) diff --git a/openstackclient/tests/unit/common/test_command.py b/openstackclient/tests/unit/common/test_command.py new file mode 100644 index 00000000..f24b290b --- /dev/null +++ b/openstackclient/tests/unit/common/test_command.py @@ -0,0 +1,49 @@ +# Copyright 2016 NEC Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock + +from osc_lib import exceptions + +from openstackclient.common import command +from openstackclient.tests.unit import fakes as test_fakes +from openstackclient.tests.unit import utils as test_utils + + +class FakeCommand(command.Command): + + def take_action(self, parsed_args): + pass + + +class TestCommand(test_utils.TestCase): + + def test_command_has_logger(self): + cmd = FakeCommand(mock.Mock(), mock.Mock()) + self.assertTrue(hasattr(cmd, 'log')) + self.assertEqual('openstackclient.tests.unit.common.test_command.' + 'FakeCommand', cmd.log.name) + + def test_validate_os_beta_command_enabled(self): + cmd = FakeCommand(mock.Mock(), mock.Mock()) + cmd.app = mock.Mock() + cmd.app.options = test_fakes.FakeOptions() + + # No exception is raised when enabled. + cmd.app.options.os_beta_command = True + cmd.validate_os_beta_command_enabled() + + cmd.app.options.os_beta_command = False + self.assertRaises(exceptions.CommandError, + cmd.validate_os_beta_command_enabled) diff --git a/openstackclient/tests/unit/common/test_commandmanager.py b/openstackclient/tests/unit/common/test_commandmanager.py new file mode 100644 index 00000000..0c6c99c0 --- /dev/null +++ b/openstackclient/tests/unit/common/test_commandmanager.py @@ -0,0 +1,107 @@ +# Copyright 2012-2013 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import mock + +from openstackclient.common import commandmanager +from openstackclient.tests.unit import utils + + +class FakeCommand(object): + + @classmethod + def load(cls): + return cls + + def __init__(self): + return + +FAKE_CMD_ONE = FakeCommand +FAKE_CMD_TWO = FakeCommand +FAKE_CMD_ALPHA = FakeCommand +FAKE_CMD_BETA = FakeCommand + + +class FakeCommandManager(commandmanager.CommandManager): + commands = {} + + def load_commands(self, namespace): + if namespace == 'test': + self.commands['one'] = FAKE_CMD_ONE + self.commands['two'] = FAKE_CMD_TWO + self.group_list.append(namespace) + elif namespace == 'greek': + self.commands['alpha'] = FAKE_CMD_ALPHA + self.commands['beta'] = FAKE_CMD_BETA + self.group_list.append(namespace) + + +class TestCommandManager(utils.TestCase): + + def test_add_command_group(self): + mgr = FakeCommandManager('test') + + # Make sure add_command() still functions + mock_cmd_one = mock.Mock() + mgr.add_command('mock', mock_cmd_one) + cmd_mock, name, args = mgr.find_command(['mock']) + self.assertEqual(mock_cmd_one, cmd_mock) + + # Find a command added in initialization + cmd_one, name, args = mgr.find_command(['one']) + self.assertEqual(FAKE_CMD_ONE, cmd_one) + + # Load another command group + mgr.add_command_group('greek') + + # Find a new command + cmd_alpha, name, args = mgr.find_command(['alpha']) + self.assertEqual(FAKE_CMD_ALPHA, cmd_alpha) + + # Ensure that the original commands were not overwritten + cmd_two, name, args = mgr.find_command(['two']) + self.assertEqual(FAKE_CMD_TWO, cmd_two) + + def test_get_command_groups(self): + mgr = FakeCommandManager('test') + + # Make sure add_command() still functions + mock_cmd_one = mock.Mock() + mgr.add_command('mock', mock_cmd_one) + cmd_mock, name, args = mgr.find_command(['mock']) + self.assertEqual(mock_cmd_one, cmd_mock) + + # Load another command group + mgr.add_command_group('greek') + + gl = mgr.get_command_groups() + self.assertEqual(['test', 'greek'], gl) + + def test_get_command_names(self): + mock_cmd_one = mock.Mock() + mock_cmd_one.name = 'one' + mock_cmd_two = mock.Mock() + mock_cmd_two.name = 'cmd two' + mock_pkg_resources = mock.Mock( + return_value=[mock_cmd_one, mock_cmd_two], + ) + with mock.patch( + 'pkg_resources.iter_entry_points', + mock_pkg_resources, + ) as iter_entry_points: + mgr = commandmanager.CommandManager('test') + iter_entry_points.assert_called_once_with('test') + cmds = mgr.get_command_names('test') + self.assertEqual(['one', 'cmd two'], cmds) diff --git a/openstackclient/tests/unit/common/test_configuration.py b/openstackclient/tests/unit/common/test_configuration.py new file mode 100644 index 00000000..e10522b9 --- /dev/null +++ b/openstackclient/tests/unit/common/test_configuration.py @@ -0,0 +1,85 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import mock + +from openstackclient.common import configuration +from openstackclient.tests.unit import fakes +from openstackclient.tests.unit import utils + + +class TestConfiguration(utils.TestCommand): + + columns = ( + 'auth.password', + 'auth.token', + 'auth.username', + 'identity_api_version', + 'region', + ) + datalist = ( + configuration.REDACTED, + configuration.REDACTED, + fakes.USERNAME, + fakes.VERSION, + fakes.REGION_NAME, + ) + + opts = [mock.Mock(secret=True, dest="password"), + mock.Mock(secret=True, dest="token")] + + @mock.patch("keystoneauth1.loading.base.get_plugin_options", + return_value=opts) + def test_show(self, m_get_plugin_opts): + arglist = [] + verifylist = [('mask', True)] + cmd = configuration.ShowConfiguration(self.app, None) + parsed_args = self.check_parser(cmd, arglist, verifylist) + + columns, data = cmd.take_action(parsed_args) + + self.assertEqual(self.columns, columns) + self.assertEqual(self.datalist, data) + + @mock.patch("keystoneauth1.loading.base.get_plugin_options", + return_value=opts) + def test_show_unmask(self, m_get_plugin_opts): + arglist = ['--unmask'] + verifylist = [('mask', False)] + cmd = configuration.ShowConfiguration(self.app, None) + parsed_args = self.check_parser(cmd, arglist, verifylist) + + columns, data = cmd.take_action(parsed_args) + + self.assertEqual(self.columns, columns) + datalist = ( + fakes.PASSWORD, + fakes.AUTH_TOKEN, + fakes.USERNAME, + fakes.VERSION, + fakes.REGION_NAME, + ) + self.assertEqual(datalist, data) + + @mock.patch("keystoneauth1.loading.base.get_plugin_options", + return_value=opts) + def test_show_mask(self, m_get_plugin_opts): + arglist = ['--mask'] + verifylist = [('mask', True)] + cmd = configuration.ShowConfiguration(self.app, None) + parsed_args = self.check_parser(cmd, arglist, verifylist) + + columns, data = cmd.take_action(parsed_args) + + self.assertEqual(self.columns, columns) + self.assertEqual(self.datalist, data) diff --git a/openstackclient/tests/unit/common/test_extension.py b/openstackclient/tests/unit/common/test_extension.py new file mode 100644 index 00000000..bf856ed1 --- /dev/null +++ b/openstackclient/tests/unit/common/test_extension.py @@ -0,0 +1,244 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import mock + +from openstackclient.common import extension +from openstackclient.tests.unit.compute.v2 import fakes as compute_fakes +from openstackclient.tests.unit import fakes +from openstackclient.tests.unit.identity.v2_0 import fakes as identity_fakes +from openstackclient.tests.unit.network.v2 import fakes as network_fakes +from openstackclient.tests.unit import utils +from openstackclient.tests.unit.volume.v2 import fakes as volume_fakes + + +class TestExtension(utils.TestCommand): + + def setUp(self): + super(TestExtension, self).setUp() + + identity_client = identity_fakes.FakeIdentityv2Client( + endpoint=fakes.AUTH_URL, + token=fakes.AUTH_TOKEN, + ) + self.app.client_manager.identity = identity_client + self.identity_extensions_mock = identity_client.extensions + self.identity_extensions_mock.reset_mock() + + compute_client = compute_fakes.FakeComputev2Client( + endpoint=fakes.AUTH_URL, + token=fakes.AUTH_TOKEN, + ) + self.app.client_manager.compute = compute_client + compute_client.list_extensions = mock.Mock() + self.compute_extensions_mock = compute_client.list_extensions + self.compute_extensions_mock.reset_mock() + + volume_client = volume_fakes.FakeVolumeClient( + endpoint=fakes.AUTH_URL, + token=fakes.AUTH_TOKEN, + ) + self.app.client_manager.volume = volume_client + volume_client.list_extensions = mock.Mock() + self.volume_extensions_mock = volume_client.list_extensions + self.volume_extensions_mock.reset_mock() + + network_client = network_fakes.FakeNetworkV2Client( + endpoint=fakes.AUTH_URL, + token=fakes.AUTH_TOKEN, + ) + self.app.client_manager.network = network_client + network_client.extensions = mock.Mock() + self.network_extensions_mock = network_client.extensions + self.network_extensions_mock.reset_mock() + + +class TestExtensionList(TestExtension): + + columns = ('Name', 'Alias', 'Description') + long_columns = ('Name', 'Namespace', 'Description', 'Alias', 'Updated', + 'Links') + + volume_extension = volume_fakes.FakeExtension.create_one_extension() + identity_extension = identity_fakes.FakeExtension.create_one_extension() + compute_extension = compute_fakes.FakeExtension.create_one_extension() + network_extension = network_fakes.FakeExtension.create_one_extension() + + def setUp(self): + super(TestExtensionList, self).setUp() + + self.identity_extensions_mock.list.return_value = [ + self.identity_extension] + self.compute_extensions_mock.show_all.return_value = [ + self.compute_extension] + self.volume_extensions_mock.show_all.return_value = [ + self.volume_extension] + self.network_extensions_mock.return_value = [self.network_extension] + + # Get the command object to test + self.cmd = extension.ListExtension(self.app, None) + + def _test_extension_list_helper(self, arglist, verifylist, + expected_data, long=False): + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + # In base command class Lister in cliff, abstract method take_action() + # returns a tuple containing the column names and an iterable + # containing the data to be listed. + columns, data = self.cmd.take_action(parsed_args) + + if long: + self.assertEqual(self.long_columns, columns) + else: + self.assertEqual(self.columns, columns) + self.assertEqual(expected_data, tuple(data)) + + def test_extension_list_no_options(self): + arglist = [] + verifylist = [] + datalist = ( + ( + self.identity_extension.name, + self.identity_extension.alias, + self.identity_extension.description, + ), + ( + self.compute_extension.name, + self.compute_extension.alias, + self.compute_extension.description, + ), + ( + self.volume_extension.name, + self.volume_extension.alias, + self.volume_extension.description, + ), + ( + self.network_extension.name, + self.network_extension.alias, + self.network_extension.description, + ), + ) + self._test_extension_list_helper(arglist, verifylist, datalist) + self.identity_extensions_mock.list.assert_called_with() + self.compute_extensions_mock.show_all.assert_called_with() + self.volume_extensions_mock.show_all.assert_called_with() + self.network_extensions_mock.assert_called_with() + + def test_extension_list_long(self): + arglist = [ + '--long', + ] + verifylist = [ + ('long', True), + ] + datalist = ( + ( + self.identity_extension.name, + self.identity_extension.namespace, + self.identity_extension.description, + self.identity_extension.alias, + self.identity_extension.updated, + self.identity_extension.links, + ), + ( + self.compute_extension.name, + self.compute_extension.namespace, + self.compute_extension.description, + self.compute_extension.alias, + self.compute_extension.updated, + self.compute_extension.links, + ), + ( + self.volume_extension.name, + self.volume_extension.namespace, + self.volume_extension.description, + self.volume_extension.alias, + self.volume_extension.updated, + self.volume_extension.links, + ), + ( + self.network_extension.name, + self.network_extension.namespace, + self.network_extension.description, + self.network_extension.alias, + self.network_extension.updated, + self.network_extension.links, + ), + ) + self._test_extension_list_helper(arglist, verifylist, datalist, True) + self.identity_extensions_mock.list.assert_called_with() + self.compute_extensions_mock.show_all.assert_called_with() + self.volume_extensions_mock.show_all.assert_called_with() + self.network_extensions_mock.assert_called_with() + + def test_extension_list_identity(self): + arglist = [ + '--identity', + ] + verifylist = [ + ('identity', True), + ] + datalist = (( + self.identity_extension.name, + self.identity_extension.alias, + self.identity_extension.description, + ), ) + self._test_extension_list_helper(arglist, verifylist, datalist) + self.identity_extensions_mock.list.assert_called_with() + + def test_extension_list_network(self): + arglist = [ + '--network', + ] + verifylist = [ + ('network', True), + ] + datalist = ( + ( + self.network_extension.name, + self.network_extension.alias, + self.network_extension.description, + ), + ) + self._test_extension_list_helper(arglist, verifylist, datalist) + self.network_extensions_mock.assert_called_with() + + def test_extension_list_compute(self): + arglist = [ + '--compute', + ] + verifylist = [ + ('compute', True), + ] + datalist = (( + self.compute_extension.name, + self.compute_extension.alias, + self.compute_extension.description, + ), ) + self._test_extension_list_helper(arglist, verifylist, datalist) + self.compute_extensions_mock.show_all.assert_called_with() + + def test_extension_list_volume(self): + arglist = [ + '--volume', + ] + verifylist = [ + ('volume', True), + ] + datalist = (( + self.volume_extension.name, + self.volume_extension.alias, + self.volume_extension.description, + ), ) + self._test_extension_list_helper(arglist, verifylist, datalist) + self.volume_extensions_mock.show_all.assert_called_with() diff --git a/openstackclient/tests/unit/common/test_logs.py b/openstackclient/tests/unit/common/test_logs.py new file mode 100644 index 00000000..4842c8d4 --- /dev/null +++ b/openstackclient/tests/unit/common/test_logs.py @@ -0,0 +1,207 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +# NOTE(dtroyer): This file is deprecated in Jun 2016, remove after 4.x release +# or Jun 2017. + +import logging +import mock + +from openstackclient.common import logs +from openstackclient.tests.unit import utils + + +class TestContext(utils.TestCase): + + def test_log_level_from_options(self): + opts = mock.Mock() + opts.verbose_level = 0 + self.assertEqual(logging.ERROR, logs.log_level_from_options(opts)) + opts.verbose_level = 1 + self.assertEqual(logging.WARNING, logs.log_level_from_options(opts)) + opts.verbose_level = 2 + self.assertEqual(logging.INFO, logs.log_level_from_options(opts)) + opts.verbose_level = 3 + self.assertEqual(logging.DEBUG, logs.log_level_from_options(opts)) + + def test_log_level_from_config(self): + cfg = {'verbose_level': 0} + self.assertEqual(logging.ERROR, logs.log_level_from_config(cfg)) + cfg = {'verbose_level': 1} + self.assertEqual(logging.WARNING, logs.log_level_from_config(cfg)) + cfg = {'verbose_level': 2} + self.assertEqual(logging.INFO, logs.log_level_from_config(cfg)) + cfg = {'verbose_level': 3} + self.assertEqual(logging.DEBUG, logs.log_level_from_config(cfg)) + cfg = {'verbose_level': 1, 'log_level': 'critical'} + self.assertEqual(logging.CRITICAL, logs.log_level_from_config(cfg)) + cfg = {'verbose_level': 1, 'log_level': 'error'} + self.assertEqual(logging.ERROR, logs.log_level_from_config(cfg)) + cfg = {'verbose_level': 1, 'log_level': 'warning'} + self.assertEqual(logging.WARNING, logs.log_level_from_config(cfg)) + cfg = {'verbose_level': 1, 'log_level': 'info'} + self.assertEqual(logging.INFO, logs.log_level_from_config(cfg)) + cfg = {'verbose_level': 1, 'log_level': 'debug'} + self.assertEqual(logging.DEBUG, logs.log_level_from_config(cfg)) + cfg = {'verbose_level': 1, 'log_level': 'bogus'} + self.assertEqual(logging.WARNING, logs.log_level_from_config(cfg)) + cfg = {'verbose_level': 1, 'log_level': 'info', 'debug': True} + self.assertEqual(logging.DEBUG, logs.log_level_from_config(cfg)) + + @mock.patch('warnings.simplefilter') + def test_set_warning_filter(self, simplefilter): + logs.set_warning_filter(logging.ERROR) + simplefilter.assert_called_with("ignore") + logs.set_warning_filter(logging.WARNING) + simplefilter.assert_called_with("ignore") + logs.set_warning_filter(logging.INFO) + simplefilter.assert_called_with("once") + + +class TestFileFormatter(utils.TestCase): + + def test_nothing(self): + formatter = logs._FileFormatter() + self.assertEqual(('%(asctime)s.%(msecs)03d %(process)d %(levelname)s ' + '%(name)s %(message)s'), formatter.fmt) + + def test_options(self): + class Opts(object): + cloud = 'cloudy' + os_project_name = 'projecty' + username = 'usernamey' + options = Opts() + formatter = logs._FileFormatter(options=options) + self.assertEqual(('%(asctime)s.%(msecs)03d %(process)d %(levelname)s ' + '%(name)s [cloudy usernamey projecty] %(message)s'), + formatter.fmt) + + def test_config(self): + config = mock.Mock() + config.config = {'cloud': 'cloudy'} + config.auth = {'project_name': 'projecty', 'username': 'usernamey'} + formatter = logs._FileFormatter(config=config) + self.assertEqual(('%(asctime)s.%(msecs)03d %(process)d %(levelname)s ' + '%(name)s [cloudy usernamey projecty] %(message)s'), + formatter.fmt) + + +class TestLogConfigurator(utils.TestCase): + + def setUp(self): + super(TestLogConfigurator, self).setUp() + self.options = mock.Mock() + self.options.verbose_level = 1 + self.options.log_file = None + self.options.debug = False + self.root_logger = mock.Mock() + self.root_logger.setLevel = mock.Mock() + self.root_logger.addHandler = mock.Mock() + self.requests_log = mock.Mock() + self.requests_log.setLevel = mock.Mock() + self.cliff_log = mock.Mock() + self.cliff_log.setLevel = mock.Mock() + self.stevedore_log = mock.Mock() + self.stevedore_log.setLevel = mock.Mock() + self.iso8601_log = mock.Mock() + self.iso8601_log.setLevel = mock.Mock() + self.loggers = [ + self.root_logger, + self.requests_log, + self.cliff_log, + self.stevedore_log, + self.iso8601_log] + + @mock.patch('logging.StreamHandler') + @mock.patch('logging.getLogger') + @mock.patch('osc_lib.logs.set_warning_filter') + def test_init(self, warning_filter, getLogger, handle): + getLogger.side_effect = self.loggers + console_logger = mock.Mock() + console_logger.setFormatter = mock.Mock() + console_logger.setLevel = mock.Mock() + handle.return_value = console_logger + + configurator = logs.LogConfigurator(self.options) + + getLogger.assert_called_with('iso8601') # last call + warning_filter.assert_called_with(logging.WARNING) + self.root_logger.setLevel.assert_called_with(logging.DEBUG) + self.root_logger.addHandler.assert_called_with(console_logger) + self.requests_log.setLevel.assert_called_with(logging.ERROR) + self.cliff_log.setLevel.assert_called_with(logging.ERROR) + self.stevedore_log.setLevel.assert_called_with(logging.ERROR) + self.iso8601_log.setLevel.assert_called_with(logging.ERROR) + self.assertFalse(configurator.dump_trace) + + @mock.patch('logging.getLogger') + @mock.patch('osc_lib.logs.set_warning_filter') + def test_init_no_debug(self, warning_filter, getLogger): + getLogger.side_effect = self.loggers + self.options.debug = True + + configurator = logs.LogConfigurator(self.options) + + warning_filter.assert_called_with(logging.DEBUG) + self.requests_log.setLevel.assert_called_with(logging.DEBUG) + self.assertTrue(configurator.dump_trace) + + @mock.patch('logging.FileHandler') + @mock.patch('logging.getLogger') + @mock.patch('osc_lib.logs.set_warning_filter') + @mock.patch('osc_lib.logs._FileFormatter') + def test_init_log_file(self, formatter, warning_filter, getLogger, handle): + getLogger.side_effect = self.loggers + self.options.log_file = '/tmp/log_file' + file_logger = mock.Mock() + file_logger.setFormatter = mock.Mock() + file_logger.setLevel = mock.Mock() + handle.return_value = file_logger + mock_formatter = mock.Mock() + formatter.return_value = mock_formatter + + logs.LogConfigurator(self.options) + + handle.assert_called_with(filename=self.options.log_file) + self.root_logger.addHandler.assert_called_with(file_logger) + file_logger.setFormatter.assert_called_with(mock_formatter) + file_logger.setLevel.assert_called_with(logging.WARNING) + + @mock.patch('logging.FileHandler') + @mock.patch('logging.getLogger') + @mock.patch('osc_lib.logs.set_warning_filter') + @mock.patch('osc_lib.logs._FileFormatter') + def test_configure(self, formatter, warning_filter, getLogger, handle): + getLogger.side_effect = self.loggers + configurator = logs.LogConfigurator(self.options) + cloud_config = mock.Mock() + config_log = '/tmp/config_log' + cloud_config.config = { + 'log_file': config_log, + 'verbose_level': 1, + 'log_level': 'info'} + file_logger = mock.Mock() + file_logger.setFormatter = mock.Mock() + file_logger.setLevel = mock.Mock() + handle.return_value = file_logger + mock_formatter = mock.Mock() + formatter.return_value = mock_formatter + + configurator.configure(cloud_config) + + warning_filter.assert_called_with(logging.INFO) + handle.assert_called_with(filename=config_log) + self.root_logger.addHandler.assert_called_with(file_logger) + file_logger.setFormatter.assert_called_with(mock_formatter) + file_logger.setLevel.assert_called_with(logging.INFO) + self.assertFalse(configurator.dump_trace) diff --git a/openstackclient/tests/unit/common/test_module.py b/openstackclient/tests/unit/common/test_module.py new file mode 100644 index 00000000..eb54dbe0 --- /dev/null +++ b/openstackclient/tests/unit/common/test_module.py @@ -0,0 +1,130 @@ +# Copyright 2013 Nebula Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +"""Test module module""" + +import mock + +from openstackclient.common import module as osc_module +from openstackclient.tests.unit import fakes +from openstackclient.tests.unit import utils + + +# NOTE(dtroyer): module_1 must match the version list filter (not --all) +# currently == '*client*' +module_name_1 = 'fakeclient' +module_version_1 = '0.1.2' +MODULE_1 = { + '__version__': module_version_1, +} + +module_name_2 = 'zlib' +module_version_2 = '1.1' +MODULE_2 = { + '__version__': module_version_2, +} + +MODULES = { + module_name_1: fakes.FakeModule(module_name_1, module_version_1), + module_name_2: fakes.FakeModule(module_name_2, module_version_2), +} + + +class TestCommandList(utils.TestCommand): + + def setUp(self): + super(TestCommandList, self).setUp() + + self.app.command_manager = mock.Mock() + self.app.command_manager.get_command_groups.return_value = [ + 'openstack.common' + ] + self.app.command_manager.get_command_names.return_value = [ + 'limits show\nextension list' + ] + + # Get the command object to test + self.cmd = osc_module.ListCommand(self.app, None) + + def test_command_list_no_options(self): + arglist = [] + verifylist = [] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + # In base command class Lister in cliff, abstract method take_action() + # returns a tuple containing the column names and an iterable + # containing the data to be listed. + columns, data = self.cmd.take_action(parsed_args) + + # TODO(bapalm): Adjust this when cliff properly supports + # handling the detection rather than using the hard-code below. + collist = ('Command Group', 'Commands') + self.assertEqual(collist, columns) + datalist = (( + 'openstack.common', + 'limits show\nextension list' + ),) + + self.assertEqual(datalist, tuple(data)) + + +@mock.patch.dict( + 'openstackclient.common.module.sys.modules', + values=MODULES, + clear=True, +) +class TestModuleList(utils.TestCommand): + + def setUp(self): + super(TestModuleList, self).setUp() + + # Get the command object to test + self.cmd = osc_module.ListModule(self.app, None) + + def test_module_list_no_options(self): + arglist = [] + verifylist = [ + ('all', False), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + # In base command class Lister in cliff, abstract method take_action() + # returns a tuple containing the column names and an iterable + # containing the data to be listed. + columns, data = self.cmd.take_action(parsed_args) + + # Additional modules may be present, just check our additions + self.assertIn(module_name_1, columns) + self.assertIn(module_version_1, data) + + def test_module_list_all(self): + arglist = [ + '--all', + ] + verifylist = [ + ('all', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + # In base command class Lister in cliff, abstract method take_action() + # returns a tuple containing the column names and an iterable + # containing the data to be listed. + columns, data = self.cmd.take_action(parsed_args) + + # Additional modules may be present, just check our additions + self.assertIn(module_name_1, columns) + self.assertIn(module_name_2, columns) + self.assertIn(module_version_1, data) + self.assertIn(module_version_2, data) diff --git a/openstackclient/tests/unit/common/test_parseractions.py b/openstackclient/tests/unit/common/test_parseractions.py new file mode 100644 index 00000000..1212ad23 --- /dev/null +++ b/openstackclient/tests/unit/common/test_parseractions.py @@ -0,0 +1,221 @@ +# Copyright 2012-2013 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +# NOTE(dtroyer): This file is deprecated in Jun 2016, remove after 4.x release +# or Jun 2017. + +import argparse + +from openstackclient.common import parseractions +from openstackclient.tests.unit import utils + + +class TestKeyValueAction(utils.TestCase): + + def setUp(self): + super(TestKeyValueAction, self).setUp() + + self.parser = argparse.ArgumentParser() + + # Set up our typical usage + self.parser.add_argument( + '--property', + metavar='<key=value>', + action=parseractions.KeyValueAction, + default={'green': '20%', 'format': '#rgb'}, + help='Property to store for this volume ' + '(repeat option to set multiple properties)', + ) + + def test_good_values(self): + results = self.parser.parse_args([ + '--property', 'red=', + '--property', 'green=100%', + '--property', 'blue=50%', + ]) + + actual = getattr(results, 'property', {}) + # All should pass through unmolested + expect = {'red': '', 'green': '100%', 'blue': '50%', 'format': '#rgb'} + self.assertEqual(expect, actual) + + def test_error_values(self): + self.assertRaises( + argparse.ArgumentTypeError, + self.parser.parse_args, + [ + '--property', 'red', + ] + ) + + +class TestMultiKeyValueAction(utils.TestCase): + + def setUp(self): + super(TestMultiKeyValueAction, self).setUp() + + self.parser = argparse.ArgumentParser() + + # Set up our typical usage + self.parser.add_argument( + '--test', + metavar='req1=xxx,req2=yyy', + action=parseractions.MultiKeyValueAction, + dest='test', + default=None, + required_keys=['req1', 'req2'], + optional_keys=['opt1', 'opt2'], + help='Test' + ) + + def test_good_values(self): + results = self.parser.parse_args([ + '--test', 'req1=aaa,req2=bbb', + '--test', 'req1=,req2=', + ]) + + actual = getattr(results, 'test', []) + expect = [ + {'req1': 'aaa', 'req2': 'bbb'}, + {'req1': '', 'req2': ''}, + ] + self.assertItemsEqual(expect, actual) + + def test_empty_required_optional(self): + self.parser.add_argument( + '--test-empty', + metavar='req1=xxx,req2=yyy', + action=parseractions.MultiKeyValueAction, + dest='test_empty', + default=None, + required_keys=[], + optional_keys=[], + help='Test' + ) + + results = self.parser.parse_args([ + '--test-empty', 'req1=aaa,req2=bbb', + '--test-empty', 'req1=,req2=', + ]) + + actual = getattr(results, 'test_empty', []) + expect = [ + {'req1': 'aaa', 'req2': 'bbb'}, + {'req1': '', 'req2': ''}, + ] + self.assertItemsEqual(expect, actual) + + def test_error_values_with_comma(self): + self.assertRaises( + argparse.ArgumentTypeError, + self.parser.parse_args, + [ + '--test', 'mmm,nnn=zzz', + ] + ) + + def test_error_values_without_comma(self): + self.assertRaises( + argparse.ArgumentTypeError, + self.parser.parse_args, + [ + '--test', 'mmmnnn', + ] + ) + + def test_missing_key(self): + self.assertRaises( + argparse.ArgumentTypeError, + self.parser.parse_args, + [ + '--test', 'req2=ddd', + ] + ) + + def test_invalid_key(self): + self.assertRaises( + argparse.ArgumentTypeError, + self.parser.parse_args, + [ + '--test', 'req1=aaa,req2=bbb,aaa=req1', + ] + ) + + def test_required_keys_not_list(self): + self.assertRaises( + TypeError, + self.parser.add_argument, + '--test-required-dict', + metavar='req1=xxx,req2=yyy', + action=parseractions.MultiKeyValueAction, + dest='test_required_dict', + default=None, + required_keys={'aaa': 'bbb'}, + optional_keys=['opt1', 'opt2'], + help='Test' + ) + + def test_optional_keys_not_list(self): + self.assertRaises( + TypeError, + self.parser.add_argument, + '--test-optional-dict', + metavar='req1=xxx,req2=yyy', + action=parseractions.MultiKeyValueAction, + dest='test_optional_dict', + default=None, + required_keys=['req1', 'req2'], + optional_keys={'aaa': 'bbb'}, + help='Test' + ) + + +class TestNonNegativeAction(utils.TestCase): + + def setUp(self): + super(TestNonNegativeAction, self).setUp() + + self.parser = argparse.ArgumentParser() + + # Set up our typical usage + self.parser.add_argument( + '--foo', + metavar='<foo>', + type=int, + action=parseractions.NonNegativeAction, + ) + + def test_negative_values(self): + self.assertRaises( + argparse.ArgumentTypeError, + self.parser.parse_args, + "--foo -1".split() + ) + + def test_zero_values(self): + results = self.parser.parse_args( + '--foo 0'.split() + ) + + actual = getattr(results, 'foo', None) + self.assertEqual(actual, 0) + + def test_positive_values(self): + results = self.parser.parse_args( + '--foo 1'.split() + ) + + actual = getattr(results, 'foo', None) + self.assertEqual(actual, 1) diff --git a/openstackclient/tests/unit/common/test_quota.py b/openstackclient/tests/unit/common/test_quota.py new file mode 100644 index 00000000..4a80a2b2 --- /dev/null +++ b/openstackclient/tests/unit/common/test_quota.py @@ -0,0 +1,381 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import copy +import mock + +from openstackclient.common import quota +from openstackclient.tests.unit.compute.v2 import fakes as compute_fakes +from openstackclient.tests.unit import fakes +from openstackclient.tests.unit.identity.v2_0 import fakes as identity_fakes +from openstackclient.tests.unit.network.v2 import fakes as network_fakes + + +class FakeQuotaResource(fakes.FakeResource): + + _keys = {'property': 'value'} + + def set_keys(self, args): + self._keys.update(args) + + def unset_keys(self, keys): + for key in keys: + self._keys.pop(key, None) + + def get_keys(self): + return self._keys + + +class TestQuota(compute_fakes.TestComputev2): + + def setUp(self): + super(TestQuota, self).setUp() + self.quotas_mock = self.app.client_manager.compute.quotas + self.quotas_mock.reset_mock() + self.quotas_class_mock = self.app.client_manager.compute.quota_classes + self.quotas_class_mock.reset_mock() + volume_mock = mock.Mock() + volume_mock.quotas = mock.Mock() + self.app.client_manager.volume = volume_mock + self.volume_quotas_mock = volume_mock.quotas + self.volume_quotas_mock.reset_mock() + self.volume_quotas_class_mock = \ + self.app.client_manager.volume.quota_classes + self.volume_quotas_class_mock.reset_mock() + self.projects_mock = self.app.client_manager.identity.projects + self.projects_mock.reset_mock() + self.app.client_manager.auth_ref = mock.Mock() + self.app.client_manager.auth_ref.service_catalog = mock.Mock() + self.service_catalog_mock = \ + self.app.client_manager.auth_ref.service_catalog + self.service_catalog_mock.reset_mock() + self.app.client_manager.auth_ref.project_id = identity_fakes.project_id + + +class TestQuotaSet(TestQuota): + + def setUp(self): + super(TestQuotaSet, self).setUp() + + self.quotas_mock.find.return_value = FakeQuotaResource( + None, + copy.deepcopy(compute_fakes.QUOTA), + loaded=True, + ) + + self.quotas_mock.update.return_value = FakeQuotaResource( + None, + copy.deepcopy(compute_fakes.QUOTA), + loaded=True, + ) + + self.volume_quotas_mock.find.return_value = FakeQuotaResource( + None, + copy.deepcopy(compute_fakes.QUOTA), + loaded=True, + ) + + self.volume_quotas_mock.update.return_value = FakeQuotaResource( + None, + copy.deepcopy(compute_fakes.QUOTA), + loaded=True, + ) + + self.projects_mock.get.return_value = fakes.FakeResource( + None, + copy.deepcopy(identity_fakes.PROJECT), + loaded=True, + ) + + self.network_mock = self.app.client_manager.network + self.network_mock.update_quota = mock.Mock() + + self.cmd = quota.SetQuota(self.app, None) + + def test_quota_set(self): + arglist = [ + '--floating-ips', str(compute_fakes.floating_ip_num), + '--fixed-ips', str(compute_fakes.fix_ip_num), + '--injected-files', str(compute_fakes.injected_file_num), + '--injected-file-size', str(compute_fakes.injected_file_size_num), + '--injected-path-size', str(compute_fakes.injected_path_size_num), + '--key-pairs', str(compute_fakes.key_pair_num), + '--cores', str(compute_fakes.core_num), + '--ram', str(compute_fakes.ram_num), + '--instances', str(compute_fakes.instance_num), + '--properties', str(compute_fakes.property_num), + '--secgroup-rules', str(compute_fakes.secgroup_rule_num), + '--secgroups', str(compute_fakes.secgroup_num), + '--server-groups', str(compute_fakes.servgroup_num), + '--server-group-members', str(compute_fakes.servgroup_members_num), + identity_fakes.project_name, + ] + verifylist = [ + ('floating_ips', compute_fakes.floating_ip_num), + ('fixed_ips', compute_fakes.fix_ip_num), + ('injected_files', compute_fakes.injected_file_num), + ('injected_file_content_bytes', + compute_fakes.injected_file_size_num), + ('injected_file_path_bytes', compute_fakes.injected_path_size_num), + ('key_pairs', compute_fakes.key_pair_num), + ('cores', compute_fakes.core_num), + ('ram', compute_fakes.ram_num), + ('instances', compute_fakes.instance_num), + ('metadata_items', compute_fakes.property_num), + ('security_group_rules', compute_fakes.secgroup_rule_num), + ('security_groups', compute_fakes.secgroup_num), + ('server_groups', compute_fakes.servgroup_num), + ('server_group_members', compute_fakes.servgroup_members_num), + ('project', identity_fakes.project_name), + ] + + self.app.client_manager.network_endpoint_enabled = False + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + self.cmd.take_action(parsed_args) + + kwargs = { + 'floating_ips': compute_fakes.floating_ip_num, + 'fixed_ips': compute_fakes.fix_ip_num, + 'injected_files': compute_fakes.injected_file_num, + 'injected_file_content_bytes': + compute_fakes.injected_file_size_num, + 'injected_file_path_bytes': compute_fakes.injected_path_size_num, + 'key_pairs': compute_fakes.key_pair_num, + 'cores': compute_fakes.core_num, + 'ram': compute_fakes.ram_num, + 'instances': compute_fakes.instance_num, + 'metadata_items': compute_fakes.property_num, + 'security_group_rules': compute_fakes.secgroup_rule_num, + 'security_groups': compute_fakes.secgroup_num, + 'server_groups': compute_fakes.servgroup_num, + 'server_group_members': compute_fakes.servgroup_members_num, + } + + self.quotas_mock.update.assert_called_with( + identity_fakes.project_id, + **kwargs + ) + + def test_quota_set_volume(self): + arglist = [ + '--gigabytes', str(compute_fakes.floating_ip_num), + '--snapshots', str(compute_fakes.fix_ip_num), + '--volumes', str(compute_fakes.injected_file_num), + identity_fakes.project_name, + ] + verifylist = [ + ('gigabytes', compute_fakes.floating_ip_num), + ('snapshots', compute_fakes.fix_ip_num), + ('volumes', compute_fakes.injected_file_num), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + self.cmd.take_action(parsed_args) + + kwargs = { + 'gigabytes': compute_fakes.floating_ip_num, + 'snapshots': compute_fakes.fix_ip_num, + 'volumes': compute_fakes.injected_file_num, + } + + self.volume_quotas_mock.update.assert_called_with( + identity_fakes.project_id, + **kwargs + ) + + def test_quota_set_network(self): + arglist = [ + '--subnets', str(network_fakes.QUOTA['subnet']), + '--networks', str(network_fakes.QUOTA['network']), + '--floating-ips', str(network_fakes.QUOTA['floatingip']), + '--subnetpools', str(network_fakes.QUOTA['subnetpool']), + '--secgroup-rules', + str(network_fakes.QUOTA['security_group_rule']), + '--secgroups', str(network_fakes.QUOTA['security_group']), + '--routers', str(network_fakes.QUOTA['router']), + '--rbac-policies', str(network_fakes.QUOTA['rbac_policy']), + '--ports', str(network_fakes.QUOTA['port']), + '--vips', str(network_fakes.QUOTA['vip']), + '--members', str(network_fakes.QUOTA['member']), + '--health-monitors', str(network_fakes.QUOTA['health_monitor']), + identity_fakes.project_name, + ] + verifylist = [ + ('subnet', network_fakes.QUOTA['subnet']), + ('network', network_fakes.QUOTA['network']), + ('floatingip', network_fakes.QUOTA['floatingip']), + ('subnetpool', network_fakes.QUOTA['subnetpool']), + ('security_group_rule', + network_fakes.QUOTA['security_group_rule']), + ('security_group', network_fakes.QUOTA['security_group']), + ('router', network_fakes.QUOTA['router']), + ('rbac_policy', network_fakes.QUOTA['rbac_policy']), + ('port', network_fakes.QUOTA['port']), + ('vip', network_fakes.QUOTA['vip']), + ('member', network_fakes.QUOTA['member']), + ('health_monitor', network_fakes.QUOTA['health_monitor']), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + self.cmd.take_action(parsed_args) + kwargs = { + 'subnet': network_fakes.QUOTA['subnet'], + 'network': network_fakes.QUOTA['network'], + 'floatingip': network_fakes.QUOTA['floatingip'], + 'subnetpool': network_fakes.QUOTA['subnetpool'], + 'security_group_rule': + network_fakes.QUOTA['security_group_rule'], + 'security_group': network_fakes.QUOTA['security_group'], + 'router': network_fakes.QUOTA['router'], + 'rbac_policy': network_fakes.QUOTA['rbac_policy'], + 'port': network_fakes.QUOTA['port'], + 'vip': network_fakes.QUOTA['vip'], + 'member': network_fakes.QUOTA['member'], + 'health_monitor': network_fakes.QUOTA['health_monitor'], + } + self.network_mock.update_quota.assert_called_with( + identity_fakes.project_id, + **kwargs + ) + + +class TestQuotaShow(TestQuota): + + def setUp(self): + super(TestQuotaShow, self).setUp() + + self.quotas_mock.get.return_value = FakeQuotaResource( + None, + copy.deepcopy(compute_fakes.QUOTA), + loaded=True, + ) + + self.quotas_mock.defaults.return_value = FakeQuotaResource( + None, + copy.deepcopy(compute_fakes.QUOTA), + loaded=True, + ) + + self.volume_quotas_mock.get.return_value = FakeQuotaResource( + None, + copy.deepcopy(compute_fakes.QUOTA), + loaded=True, + ) + + self.volume_quotas_mock.defaults.return_value = FakeQuotaResource( + None, + copy.deepcopy(compute_fakes.QUOTA), + loaded=True, + ) + + fake_network_endpoint = fakes.FakeResource( + None, + copy.deepcopy(identity_fakes.ENDPOINT), + loaded=True, + ) + + self.service_catalog_mock.get_endpoints.return_value = { + 'network': fake_network_endpoint + } + + self.quotas_class_mock.get.return_value = FakeQuotaResource( + None, + copy.deepcopy(compute_fakes.QUOTA), + loaded=True, + ) + + self.volume_quotas_class_mock.get.return_value = FakeQuotaResource( + None, + copy.deepcopy(compute_fakes.QUOTA), + loaded=True, + ) + + self.projects_mock.get.return_value = fakes.FakeResource( + None, + copy.deepcopy(identity_fakes.PROJECT), + loaded=True, + ) + + self.app.client_manager.network = network_fakes.FakeNetworkV2Client( + endpoint=fakes.AUTH_URL, + token=fakes.AUTH_TOKEN, + ) + self.network = self.app.client_manager.network + self.network.get_quota = mock.Mock(return_value=network_fakes.QUOTA) + + self.cmd = quota.ShowQuota(self.app, None) + + def test_quota_show(self): + arglist = [ + identity_fakes.project_name, + ] + verifylist = [ + ('project', identity_fakes.project_name), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + self.cmd.take_action(parsed_args) + + self.quotas_mock.get.assert_called_with(identity_fakes.project_id) + self.volume_quotas_mock.get.assert_called_with( + identity_fakes.project_id) + self.network.get_quota.assert_called_with(identity_fakes.project_id) + + def test_quota_show_with_default(self): + arglist = [ + '--default', + identity_fakes.project_name, + ] + verifylist = [ + ('default', True), + ('project', identity_fakes.project_name), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + self.cmd.take_action(parsed_args) + + self.quotas_mock.defaults.assert_called_with(identity_fakes.project_id) + self.volume_quotas_mock.defaults.assert_called_with( + identity_fakes.project_id) + + def test_quota_show_with_class(self): + arglist = [ + '--class', + identity_fakes.project_name, + ] + verifylist = [ + ('quota_class', True), + ('project', identity_fakes.project_name), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + self.cmd.take_action(parsed_args) + + self.quotas_class_mock.get.assert_called_with( + identity_fakes.project_id) + self.volume_quotas_class_mock.get.assert_called_with( + identity_fakes.project_id) + + def test_quota_show_no_project(self): + parsed_args = self.check_parser(self.cmd, [], []) + + self.cmd.take_action(parsed_args) + + self.quotas_mock.get.assert_called_with(identity_fakes.project_id) + self.volume_quotas_mock.get.assert_called_with( + identity_fakes.project_id) + self.network.get_quota.assert_called_with(identity_fakes.project_id) |
