summaryrefslogtreecommitdiff
path: root/openstackclient/tests/network
diff options
context:
space:
mode:
Diffstat (limited to 'openstackclient/tests/network')
-rw-r--r--openstackclient/tests/network/__init__.py0
-rw-r--r--openstackclient/tests/network/test_common.py174
-rw-r--r--openstackclient/tests/network/v2/__init__.py0
-rw-r--r--openstackclient/tests/network/v2/fakes.py1100
-rw-r--r--openstackclient/tests/network/v2/test_address_scope.py399
-rw-r--r--openstackclient/tests/network/v2/test_floating_ip.py565
-rw-r--r--openstackclient/tests/network/v2/test_floating_ip_pool.py97
-rw-r--r--openstackclient/tests/network/v2/test_ip_availability.py172
-rw-r--r--openstackclient/tests/network/v2/test_network.py1059
-rw-r--r--openstackclient/tests/network/v2/test_network_agent.py294
-rw-r--r--openstackclient/tests/network/v2/test_network_rbac.py430
-rw-r--r--openstackclient/tests/network/v2/test_network_segment.py200
-rw-r--r--openstackclient/tests/network/v2/test_port.py696
-rw-r--r--openstackclient/tests/network/v2/test_router.py751
-rw-r--r--openstackclient/tests/network/v2/test_security_group.py770
-rw-r--r--openstackclient/tests/network/v2/test_security_group_rule.py1177
-rw-r--r--openstackclient/tests/network/v2/test_subnet.py978
-rw-r--r--openstackclient/tests/network/v2/test_subnet_pool.py722
18 files changed, 0 insertions, 9584 deletions
diff --git a/openstackclient/tests/network/__init__.py b/openstackclient/tests/network/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/openstackclient/tests/network/__init__.py
+++ /dev/null
diff --git a/openstackclient/tests/network/test_common.py b/openstackclient/tests/network/test_common.py
deleted file mode 100644
index 48608734..00000000
--- a/openstackclient/tests/network/test_common.py
+++ /dev/null
@@ -1,174 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-#
-
-import argparse
-import mock
-
-from openstackclient.network import common
-from openstackclient.tests import utils
-
-
-def _add_common_argument(parser):
- parser.add_argument(
- 'common',
- metavar='<common>',
- help='Common argument',
- )
- return parser
-
-
-def _add_network_argument(parser):
- parser.add_argument(
- 'network',
- metavar='<network>',
- help='Network argument',
- )
- return parser
-
-
-def _add_compute_argument(parser):
- parser.add_argument(
- 'compute',
- metavar='<compute>',
- help='Compute argument',
- )
- return parser
-
-
-class FakeNetworkAndComputeCommand(common.NetworkAndComputeCommand):
-
- def update_parser_common(self, parser):
- return _add_common_argument(parser)
-
- def update_parser_network(self, parser):
- return _add_network_argument(parser)
-
- def update_parser_compute(self, parser):
- return _add_compute_argument(parser)
-
- def take_action_network(self, client, parsed_args):
- return client.network_action(parsed_args)
-
- def take_action_compute(self, client, parsed_args):
- return client.compute_action(parsed_args)
-
-
-class FakeNetworkAndComputeLister(common.NetworkAndComputeLister):
-
- def update_parser_common(self, parser):
- return _add_common_argument(parser)
-
- def update_parser_network(self, parser):
- return _add_network_argument(parser)
-
- def update_parser_compute(self, parser):
- return _add_compute_argument(parser)
-
- def take_action_network(self, client, parsed_args):
- return client.network_action(parsed_args)
-
- def take_action_compute(self, client, parsed_args):
- return client.compute_action(parsed_args)
-
-
-class FakeNetworkAndComputeShowOne(common.NetworkAndComputeShowOne):
-
- def update_parser_common(self, parser):
- return _add_common_argument(parser)
-
- def update_parser_network(self, parser):
- return _add_network_argument(parser)
-
- def update_parser_compute(self, parser):
- return _add_compute_argument(parser)
-
- def take_action_network(self, client, parsed_args):
- return client.network_action(parsed_args)
-
- def take_action_compute(self, client, parsed_args):
- return client.compute_action(parsed_args)
-
-
-class TestNetworkAndCompute(utils.TestCommand):
-
- def setUp(self):
- super(TestNetworkAndCompute, self).setUp()
-
- self.namespace = argparse.Namespace()
-
- # Create network client mocks.
- self.app.client_manager.network = mock.Mock()
- self.network = self.app.client_manager.network
- self.network.network_action = mock.Mock(
- return_value='take_action_network')
-
- # Create compute client mocks.
- self.app.client_manager.compute = mock.Mock()
- self.compute = self.app.client_manager.compute
- self.compute.compute_action = mock.Mock(
- return_value='take_action_compute')
-
- # Subclasses can override the command object to test.
- self.cmd = FakeNetworkAndComputeCommand(self.app, self.namespace)
-
- def test_take_action_network(self):
- arglist = [
- 'common',
- 'network'
- ]
- verifylist = [
- ('common', 'common'),
- ('network', 'network')
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- result = self.cmd.take_action(parsed_args)
- self.network.network_action.assert_called_with(parsed_args)
- self.assertEqual('take_action_network', result)
-
- def test_take_action_compute(self):
- arglist = [
- 'common',
- 'compute'
- ]
- verifylist = [
- ('common', 'common'),
- ('compute', 'compute')
- ]
-
- self.app.client_manager.network_endpoint_enabled = False
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- result = self.cmd.take_action(parsed_args)
- self.compute.compute_action.assert_called_with(parsed_args)
- self.assertEqual('take_action_compute', result)
-
-
-class TestNetworkAndComputeCommand(TestNetworkAndCompute):
-
- def setUp(self):
- super(TestNetworkAndComputeCommand, self).setUp()
- self.cmd = FakeNetworkAndComputeCommand(self.app, self.namespace)
-
-
-class TestNetworkAndComputeLister(TestNetworkAndCompute):
-
- def setUp(self):
- super(TestNetworkAndComputeLister, self).setUp()
- self.cmd = FakeNetworkAndComputeLister(self.app, self.namespace)
-
-
-class TestNetworkAndComputeShowOne(TestNetworkAndCompute):
-
- def setUp(self):
- super(TestNetworkAndComputeShowOne, self).setUp()
- self.cmd = FakeNetworkAndComputeShowOne(self.app, self.namespace)
diff --git a/openstackclient/tests/network/v2/__init__.py b/openstackclient/tests/network/v2/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/openstackclient/tests/network/v2/__init__.py
+++ /dev/null
diff --git a/openstackclient/tests/network/v2/fakes.py b/openstackclient/tests/network/v2/fakes.py
deleted file mode 100644
index 33bc4017..00000000
--- a/openstackclient/tests/network/v2/fakes.py
+++ /dev/null
@@ -1,1100 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-#
-
-import argparse
-import copy
-import mock
-import uuid
-
-from openstackclient.tests import fakes
-from openstackclient.tests.identity.v3 import fakes as identity_fakes_v3
-from openstackclient.tests import utils
-
-
-QUOTA = {
- "subnet": 10,
- "network": 10,
- "floatingip": 50,
- "subnetpool": -1,
- "security_group_rule": 100,
- "security_group": 10,
- "router": 10,
- "rbac_policy": -1,
- "port": 50,
- "vip": 10,
- "member": 10,
- "health_monitor": 10,
-}
-
-
-class FakeNetworkV2Client(object):
-
- def __init__(self, **kwargs):
- self.extensions = mock.Mock()
- self.extensions.resource_class = fakes.FakeResource(None, {})
-
-
-class TestNetworkV2(utils.TestCommand):
-
- def setUp(self):
- super(TestNetworkV2, self).setUp()
-
- self.namespace = argparse.Namespace()
-
- self.app.client_manager.session = mock.Mock()
-
- self.app.client_manager.network = FakeNetworkV2Client(
- endpoint=fakes.AUTH_URL,
- token=fakes.AUTH_TOKEN,
- )
-
- self.app.client_manager.identity = (
- identity_fakes_v3.FakeIdentityv3Client(
- endpoint=fakes.AUTH_URL,
- token=fakes.AUTH_TOKEN,
- )
- )
-
-
-class FakeAddressScope(object):
- """Fake one or more address scopes."""
-
- @staticmethod
- def create_one_address_scope(attrs=None):
- """Create a fake address scope.
-
- :param Dictionary attrs:
- A dictionary with all attributes
- :return:
- A FakeResource object with name, id, etc.
- """
- attrs = attrs or {}
-
- # Set default attributes.
- address_scope_attrs = {
- 'name': 'address-scope-name-' + uuid.uuid4().hex,
- 'id': 'address-scope-id-' + uuid.uuid4().hex,
- 'tenant_id': 'project-id-' + uuid.uuid4().hex,
- 'shared': False,
- 'ip_version': 4,
- }
-
- # Overwrite default attributes.
- address_scope_attrs.update(attrs)
-
- address_scope = fakes.FakeResource(
- info=copy.deepcopy(address_scope_attrs),
- loaded=True)
-
- # Set attributes with special mapping in OpenStack SDK.
- address_scope.project_id = address_scope_attrs['tenant_id']
-
- return address_scope
-
- @staticmethod
- def create_address_scopes(attrs=None, count=2):
- """Create multiple fake address scopes.
-
- :param Dictionary attrs:
- A dictionary with all attributes
- :param int count:
- The number of address scopes to fake
- :return:
- A list of FakeResource objects faking the address scopes
- """
- address_scopes = []
- for i in range(0, count):
- address_scopes.append(
- FakeAddressScope.create_one_address_scope(attrs))
-
- return address_scopes
-
- @staticmethod
- def get_address_scopes(address_scopes=None, count=2):
- """Get an iterable MagicMock object with a list of faked address scopes.
-
- If address scopes list is provided, then initialize the Mock object
- with the list. Otherwise create one.
-
- :param List address scopes:
- A list of FakeResource objects faking address scopes
- :param int count:
- The number of address scopes to fake
- :return:
- An iterable Mock object with side_effect set to a list of faked
- address scopes
- """
- if address_scopes is None:
- address_scopes = FakeAddressScope.create_address_scopes(count)
- return mock.MagicMock(side_effect=address_scopes)
-
-
-class FakeAvailabilityZone(object):
- """Fake one or more network availability zones (AZs)."""
-
- @staticmethod
- def create_one_availability_zone(attrs=None):
- """Create a fake AZ.
-
- :param Dictionary attrs:
- A dictionary with all attributes
- :return:
- A FakeResource object with name, state, etc.
- """
- attrs = attrs or {}
-
- # Set default attributes.
- availability_zone = {
- 'name': uuid.uuid4().hex,
- 'state': 'available',
- 'resource': 'network',
- }
-
- # Overwrite default attributes.
- availability_zone.update(attrs)
-
- availability_zone = fakes.FakeResource(
- info=copy.deepcopy(availability_zone),
- loaded=True)
- return availability_zone
-
- @staticmethod
- def create_availability_zones(attrs=None, count=2):
- """Create multiple fake AZs.
-
- :param Dictionary attrs:
- A dictionary with all attributes
- :param int count:
- The number of AZs to fake
- :return:
- A list of FakeResource objects faking the AZs
- """
- availability_zones = []
- for i in range(0, count):
- availability_zone = \
- FakeAvailabilityZone.create_one_availability_zone(attrs)
- availability_zones.append(availability_zone)
-
- return availability_zones
-
-
-class FakeIPAvailability(object):
- """Fake one or more network ip availabilities."""
-
- @staticmethod
- def create_one_ip_availability():
- """Create a fake list with ip availability stats of a network.
-
- :return:
- A FakeResource object with network_name, network_id, etc.
- """
-
- # Set default attributes.
- network_ip_availability = {
- 'network_id': 'network-id-' + uuid.uuid4().hex,
- 'network_name': 'network-name-' + uuid.uuid4().hex,
- 'tenant_id': '',
- 'subnet_ip_availability': [],
- 'total_ips': 254,
- 'used_ips': 6,
- }
-
- network_ip_availability = fakes.FakeResource(
- info=copy.deepcopy(network_ip_availability),
- loaded=True)
- return network_ip_availability
-
- @staticmethod
- def create_ip_availability(count=2):
- """Create fake list of ip availability stats of multiple networks.
-
- :param int count:
- The number of networks to fake
- :return:
- A list of FakeResource objects faking network ip availability stats
- """
- network_ip_availabilities = []
- for i in range(0, count):
- network_ip_availability = \
- FakeIPAvailability.create_one_ip_availability()
- network_ip_availabilities.append(network_ip_availability)
-
- return network_ip_availabilities
-
-
-class FakeExtension(object):
- """Fake one or more extension."""
-
- @staticmethod
- def create_one_extension(attrs=None):
- """Create a fake extension.
-
- :param Dictionary attrs:
- A dictionary with all attributes
- :return:
- A FakeResource object with name, namespace, etc.
- """
- attrs = attrs or {}
-
- # Set default attributes.
- extension_info = {
- 'name': 'name-' + uuid.uuid4().hex,
- 'namespace': 'http://docs.openstack.org/network/',
- 'description': 'description-' + uuid.uuid4().hex,
- 'updated': '2013-07-09T12:00:0-00:00',
- 'alias': 'Dystopian',
- 'links': '[{"href":''"https://github.com/os/network", "type"}]',
- }
-
- # Overwrite default attributes.
- extension_info.update(attrs)
-
- extension = fakes.FakeResource(
- info=copy.deepcopy(extension_info),
- loaded=True)
- return extension
-
-
-class FakeNetwork(object):
- """Fake one or more networks."""
-
- @staticmethod
- def create_one_network(attrs=None):
- """Create a fake network.
-
- :param Dictionary attrs:
- A dictionary with all attributes
- :return:
- A FakeResource object, with id, name, etc.
- """
- attrs = attrs or {}
-
- # Set default attributes.
- network_attrs = {
- 'id': 'network-id-' + uuid.uuid4().hex,
- 'name': 'network-name-' + uuid.uuid4().hex,
- 'status': 'ACTIVE',
- 'tenant_id': 'project-id-' + uuid.uuid4().hex,
- 'admin_state_up': True,
- 'shared': False,
- 'subnets': ['a', 'b'],
- 'provider_network_type': 'vlan',
- 'router:external': True,
- 'availability_zones': [],
- 'availability_zone_hints': [],
- 'is_default': False,
- 'port_security_enabled': True,
- }
-
- # Overwrite default attributes.
- network_attrs.update(attrs)
-
- network = fakes.FakeResource(info=copy.deepcopy(network_attrs),
- loaded=True)
-
- # Set attributes with special mapping in OpenStack SDK.
- network.project_id = network_attrs['tenant_id']
- network.is_router_external = network_attrs['router:external']
- network.is_port_security_enabled = \
- network_attrs['port_security_enabled']
-
- return network
-
- @staticmethod
- def create_networks(attrs=None, count=2):
- """Create multiple fake networks.
-
- :param Dictionary attrs:
- A dictionary with all attributes
- :param int count:
- The number of networks to fake
- :return:
- A list of FakeResource objects faking the networks
- """
- networks = []
- for i in range(0, count):
- networks.append(FakeNetwork.create_one_network(attrs))
-
- return networks
-
- @staticmethod
- def get_networks(networks=None, count=2):
- """Get an iterable MagicMock object with a list of faked networks.
-
- If networks list is provided, then initialize the Mock object with the
- list. Otherwise create one.
-
- :param List networks:
- A list of FakeResource objects faking networks
- :param int count:
- The number of networks to fake
- :return:
- An iterable Mock object with side_effect set to a list of faked
- networks
- """
- if networks is None:
- networks = FakeNetwork.create_networks(count)
- return mock.MagicMock(side_effect=networks)
-
-
-class FakeNetworkSegment(object):
- """Fake one or more network segments."""
-
- @staticmethod
- def create_one_network_segment(attrs=None):
- """Create a fake network segment.
-
- :param Dictionary attrs:
- A dictionary with all attributes
- :return:
- A FakeResource object faking the network segment
- """
- attrs = attrs or {}
-
- # Set default attributes.
- network_segment_attrs = {
- 'id': 'network-segment-id-' + uuid.uuid4().hex,
- 'network_id': 'network-id-' + uuid.uuid4().hex,
- 'network_type': 'vlan',
- 'physical_network': 'physical-network-name-' + uuid.uuid4().hex,
- 'segmentation_id': 1024,
- }
-
- # Overwrite default attributes.
- network_segment_attrs.update(attrs)
-
- network_segment = fakes.FakeResource(
- info=copy.deepcopy(network_segment_attrs),
- loaded=True
- )
-
- return network_segment
-
- @staticmethod
- def create_network_segments(attrs=None, count=2):
- """Create multiple fake network segments.
-
- :param Dictionary attrs:
- A dictionary with all attributes
- :param int count:
- The number of network segments to fake
- :return:
- A list of FakeResource objects faking the network segments
- """
- network_segments = []
- for i in range(0, count):
- network_segments.append(
- FakeNetworkSegment.create_one_network_segment(attrs)
- )
- return network_segments
-
-
-class FakePort(object):
- """Fake one or more ports."""
-
- @staticmethod
- def create_one_port(attrs=None):
- """Create a fake port.
-
- :param Dictionary attrs:
- A dictionary with all attributes
- :return:
- A FakeResource object, with id, name, etc.
- """
- attrs = attrs or {}
-
- # Set default attributes.
- port_attrs = {
- 'admin_state_up': True,
- 'allowed_address_pairs': [{}],
- 'binding:host_id': 'binding-host-id-' + uuid.uuid4().hex,
- 'binding:profile': {},
- 'binding:vif_details': {},
- 'binding:vif_type': 'ovs',
- 'binding:vnic_type': 'normal',
- 'device_id': 'device-id-' + uuid.uuid4().hex,
- 'device_owner': 'compute:nova',
- 'dns_assignment': [{}],
- 'dns_name': 'dns-name-' + uuid.uuid4().hex,
- 'extra_dhcp_opts': [{}],
- 'fixed_ips': [{}],
- 'id': 'port-id-' + uuid.uuid4().hex,
- 'mac_address': 'fa:16:3e:a9:4e:72',
- 'name': 'port-name-' + uuid.uuid4().hex,
- 'network_id': 'network-id-' + uuid.uuid4().hex,
- 'port_security_enabled': True,
- 'security_groups': [],
- 'status': 'ACTIVE',
- 'tenant_id': 'project-id-' + uuid.uuid4().hex,
- }
-
- # Overwrite default attributes.
- port_attrs.update(attrs)
-
- port = fakes.FakeResource(info=copy.deepcopy(port_attrs),
- loaded=True)
-
- # Set attributes with special mappings in OpenStack SDK.
- port.project_id = port_attrs['tenant_id']
- port.binding_host_id = port_attrs['binding:host_id']
- port.binding_profile = port_attrs['binding:profile']
- port.binding_vif_details = port_attrs['binding:vif_details']
- port.binding_vif_type = port_attrs['binding:vif_type']
- port.binding_vnic_type = port_attrs['binding:vnic_type']
-
- return port
-
- @staticmethod
- def create_ports(attrs=None, count=2):
- """Create multiple fake ports.
-
- :param Dictionary attrs:
- A dictionary with all attributes
- :param int count:
- The number of ports to fake
- :return:
- A list of FakeResource objects faking the ports
- """
- ports = []
- for i in range(0, count):
- ports.append(FakePort.create_one_port(attrs))
-
- return ports
-
- @staticmethod
- def get_ports(ports=None, count=2):
- """Get an iterable MagicMock object with a list of faked ports.
-
- If ports list is provided, then initialize the Mock object with the
- list. Otherwise create one.
-
- :param List ports:
- A list of FakeResource objects faking ports
- :param int count:
- The number of ports to fake
- :return:
- An iterable Mock object with side_effect set to a list of faked
- ports
- """
- if ports is None:
- ports = FakePort.create_ports(count)
- return mock.MagicMock(side_effect=ports)
-
-
-class FakeNetworkAgent(object):
- """Fake one or more network agents."""
-
- @staticmethod
- def create_one_network_agent(attrs=None):
- """Create a fake network agent
-
- :param Dictionary attrs:
- A dictionary with all attributes
- :return:
- A FakeResource object, with id, agent_type, and so on.
- """
- attrs = attrs or {}
-
- # Set default attributes
- agent_attrs = {
- 'id': 'agent-id-' + uuid.uuid4().hex,
- 'agent_type': 'agent-type-' + uuid.uuid4().hex,
- 'host': 'host-' + uuid.uuid4().hex,
- 'availability_zone': 'zone-' + uuid.uuid4().hex,
- 'alive': True,
- 'admin_state_up': True,
- 'binary': 'binary-' + uuid.uuid4().hex,
- 'configurations': {'subnet': 2, 'networks': 1},
- }
- agent_attrs.update(attrs)
- agent = fakes.FakeResource(info=copy.deepcopy(agent_attrs),
- loaded=True)
- return agent
-
- @staticmethod
- def create_network_agents(attrs=None, count=2):
- """Create multiple fake network agents.
-
- :param Dictionary attrs:
- A dictionary with all attributes
- :param int count:
- The number of network agents to fake
- :return:
- A list of FakeResource objects faking the network agents
- """
- agents = []
- for i in range(0, count):
- agents.append(FakeNetworkAgent.create_one_network_agent(attrs))
-
- return agents
-
- @staticmethod
- def get_network_agents(agents=None, count=2):
- """Get an iterable MagicMock object with a list of faked network agents.
-
- If network agents list is provided, then initialize the Mock object
- with the list. Otherwise create one.
-
- :param List agents:
- A list of FakeResource objects faking network agents
- :param int count:
- The number of network agents to fake
- :return:
- An iterable Mock object with side_effect set to a list of faked
- network agents
- """
- if agents is None:
- agents = FakeNetworkAgent.create_network_agents(count)
- return mock.MagicMock(side_effect=agents)
-
-
-class FakeNetworkRBAC(object):
- """Fake one or more network rbac policies."""
-
- @staticmethod
- def create_one_network_rbac(attrs=None):
- """Create a fake network rbac
-
- :param Dictionary attrs:
- A dictionary with all attributes
- :return:
- A FakeResource object, with id, action, target_tenant,
- tenant_id, type
- """
- attrs = attrs or {}
-
- # Set default attributes
- rbac_attrs = {
- 'id': 'rbac-id-' + uuid.uuid4().hex,
- 'object_type': 'network',
- 'object_id': 'object-id-' + uuid.uuid4().hex,
- 'action': 'access_as_shared',
- 'target_tenant': 'target-tenant-' + uuid.uuid4().hex,
- 'tenant_id': 'tenant-id-' + uuid.uuid4().hex,
- }
- rbac_attrs.update(attrs)
- rbac = fakes.FakeResource(info=copy.deepcopy(rbac_attrs),
- loaded=True)
- # Set attributes with special mapping in OpenStack SDK.
- rbac.project_id = rbac_attrs['tenant_id']
- rbac.target_project_id = rbac_attrs['target_tenant']
- return rbac
-
- @staticmethod
- def create_network_rbacs(attrs=None, count=2):
- """Create multiple fake network rbac policies.
-
- :param Dictionary attrs:
- A dictionary with all attributes
- :param int count:
- The number of rbac policies to fake
- :return:
- A list of FakeResource objects faking the rbac policies
- """
- rbac_policies = []
- for i in range(0, count):
- rbac_policies.append(FakeNetworkRBAC.
- create_one_network_rbac(attrs))
-
- return rbac_policies
-
- @staticmethod
- def get_network_rbacs(rbac_policies=None, count=2):
- """Get an iterable MagicMock object with a list of faked rbac policies.
-
- If rbac policies list is provided, then initialize the Mock object
- with the list. Otherwise create one.
-
- :param List rbac_policies:
- A list of FakeResource objects faking rbac policies
- :param int count:
- The number of rbac policies to fake
- :return:
- An iterable Mock object with side_effect set to a list of faked
- rbac policies
- """
- if rbac_policies is None:
- rbac_policies = FakeNetworkRBAC.create_network_rbacs(count)
- return mock.MagicMock(side_effect=rbac_policies)
-
-
-class FakeRouter(object):
- """Fake one or more routers."""
-
- @staticmethod
- def create_one_router(attrs=None):
- """Create a fake router.
-
- :param Dictionary attrs:
- A dictionary with all attributes
- :return:
- A FakeResource object, with id, name, admin_state_up,
- status, tenant_id
- """
- attrs = attrs or {}
-
- # Set default attributes.
- router_attrs = {
- 'id': 'router-id-' + uuid.uuid4().hex,
- 'name': 'router-name-' + uuid.uuid4().hex,
- 'status': 'ACTIVE',
- 'admin_state_up': True,
- 'distributed': False,
- 'ha': False,
- 'tenant_id': 'project-id-' + uuid.uuid4().hex,
- 'routes': [],
- 'external_gateway_info': {},
- 'availability_zone_hints': [],
- 'availability_zones': [],
- }
-
- # Overwrite default attributes.
- router_attrs.update(attrs)
-
- router = fakes.FakeResource(info=copy.deepcopy(router_attrs),
- loaded=True)
-
- # Set attributes with special mapping in OpenStack SDK.
- router.project_id = router_attrs['tenant_id']
-
- return router
-
- @staticmethod
- def create_routers(attrs=None, count=2):
- """Create multiple fake routers.
-
- :param Dictionary attrs:
- A dictionary with all attributes
- :param int count:
- The number of routers to fake
- :return:
- A list of FakeResource objects faking the routers
- """
- routers = []
- for i in range(0, count):
- routers.append(FakeRouter.create_one_router(attrs))
-
- return routers
-
- @staticmethod
- def get_routers(routers=None, count=2):
- """Get an iterable MagicMock object with a list of faked routers.
-
- If routers list is provided, then initialize the Mock object with the
- list. Otherwise create one.
-
- :param List routers:
- A list of FakeResource objects faking routers
- :param int count:
- The number of routers to fake
- :return:
- An iterable Mock object with side_effect set to a list of faked
- routers
- """
- if routers is None:
- routers = FakeRouter.create_routers(count)
- return mock.MagicMock(side_effect=routers)
-
-
-class FakeSecurityGroup(object):
- """Fake one or more security groups."""
-
- @staticmethod
- def create_one_security_group(attrs=None):
- """Create a fake security group.
-
- :param Dictionary attrs:
- A dictionary with all attributes
- :return:
- A FakeResource object, with id, name, etc.
- """
- attrs = attrs or {}
-
- # Set default attributes.
- security_group_attrs = {
- 'id': 'security-group-id-' + uuid.uuid4().hex,
- 'name': 'security-group-name-' + uuid.uuid4().hex,
- 'description': 'security-group-description-' + uuid.uuid4().hex,
- 'tenant_id': 'project-id-' + uuid.uuid4().hex,
- 'security_group_rules': [],
- }
-
- # Overwrite default attributes.
- security_group_attrs.update(attrs)
-
- security_group = fakes.FakeResource(
- info=copy.deepcopy(security_group_attrs),
- loaded=True)
-
- # Set attributes with special mapping in OpenStack SDK.
- security_group.project_id = security_group_attrs['tenant_id']
-
- return security_group
-
- @staticmethod
- def create_security_groups(attrs=None, count=2):
- """Create multiple fake security groups.
-
- :param Dictionary attrs:
- A dictionary with all attributes
- :param int count:
- The number of security groups to fake
- :return:
- A list of FakeResource objects faking the security groups
- """
- security_groups = []
- for i in range(0, count):
- security_groups.append(
- FakeSecurityGroup.create_one_security_group(attrs))
-
- return security_groups
-
- @staticmethod
- def get_security_groups(security_groups=None, count=2):
- """Get an iterable MagicMock object with a list of faked security groups.
-
- If security groups list is provided, then initialize the Mock object
- with the list. Otherwise create one.
-
- :param List security groups:
- A list of FakeResource objects faking security groups
- :param int count:
- The number of security groups to fake
- :return:
- An iterable Mock object with side_effect set to a list of faked
- security groups
- """
- if security_groups is None:
- security_groups = FakeSecurityGroup.create_security_groups(count)
- return mock.MagicMock(side_effect=security_groups)
-
-
-class FakeSecurityGroupRule(object):
- """Fake one or more security group rules."""
-
- @staticmethod
- def create_one_security_group_rule(attrs=None):
- """Create a fake security group rule.
-
- :param Dictionary attrs:
- A dictionary with all attributes
- :return:
- A FakeResource object, with id, etc.
- """
- attrs = attrs or {}
-
- # Set default attributes.
- security_group_rule_attrs = {
- 'direction': 'ingress',
- 'ethertype': 'IPv4',
- 'id': 'security-group-rule-id-' + uuid.uuid4().hex,
- 'port_range_max': None,
- 'port_range_min': None,
- 'protocol': 'tcp',
- 'remote_group_id': None,
- 'remote_ip_prefix': '0.0.0.0/0',
- 'security_group_id': 'security-group-id-' + uuid.uuid4().hex,
- 'tenant_id': 'project-id-' + uuid.uuid4().hex,
- }
-
- # Overwrite default attributes.
- security_group_rule_attrs.update(attrs)
-
- security_group_rule = fakes.FakeResource(
- info=copy.deepcopy(security_group_rule_attrs),
- loaded=True)
-
- # Set attributes with special mapping in OpenStack SDK.
- security_group_rule.project_id = security_group_rule_attrs['tenant_id']
-
- return security_group_rule
-
- @staticmethod
- def create_security_group_rules(attrs=None, count=2):
- """Create multiple fake security group rules.
-
- :param Dictionary attrs:
- A dictionary with all attributes
- :param int count:
- The number of security group rules to fake
- :return:
- A list of FakeResource objects faking the security group rules
- """
- security_group_rules = []
- for i in range(0, count):
- security_group_rules.append(
- FakeSecurityGroupRule.create_one_security_group_rule(attrs))
-
- return security_group_rules
-
- @staticmethod
- def get_security_group_rules(security_group_rules=None, count=2):
- """Get an iterable MagicMock object with a list of faked security group rules.
-
- If security group rules list is provided, then initialize the Mock
- object with the list. Otherwise create one.
-
- :param List security group rules:
- A list of FakeResource objects faking security group rules
- :param int count:
- The number of security group rules to fake
- :return:
- An iterable Mock object with side_effect set to a list of faked
- security group rules
- """
- if security_group_rules is None:
- security_group_rules = (
- FakeSecurityGroupRule.create_security_group_rules(count))
- return mock.MagicMock(side_effect=security_group_rules)
-
-
-class FakeSubnet(object):
- """Fake one or more subnets."""
-
- @staticmethod
- def create_one_subnet(attrs=None):
- """Create a fake subnet.
-
- :param Dictionary attrs:
- A dictionary with all attributes
- :return:
- A FakeResource object faking the subnet
- """
- attrs = attrs or {}
-
- # Set default attributes.
- project_id = 'project-id-' + uuid.uuid4().hex
- subnet_attrs = {
- 'id': 'subnet-id-' + uuid.uuid4().hex,
- 'name': 'subnet-name-' + uuid.uuid4().hex,
- 'network_id': 'network-id-' + uuid.uuid4().hex,
- 'cidr': '10.10.10.0/24',
- 'tenant_id': project_id,
- 'enable_dhcp': True,
- 'dns_nameservers': [],
- 'allocation_pools': [],
- 'host_routes': [],
- 'ip_version': 4,
- 'gateway_ip': '10.10.10.1',
- 'ipv6_address_mode': None,
- 'ipv6_ra_mode': None,
- 'segment_id': None,
- 'service_types': [],
- 'subnetpool_id': None,
- }
-
- # Overwrite default attributes.
- subnet_attrs.update(attrs)
-
- subnet = fakes.FakeResource(info=copy.deepcopy(subnet_attrs),
- loaded=True)
-
- # Set attributes with special mappings in OpenStack SDK.
- subnet.project_id = subnet_attrs['tenant_id']
-
- return subnet
-
- @staticmethod
- def create_subnets(attrs=None, count=2):
- """Create multiple fake subnets.
-
- :param Dictionary attrs:
- A dictionary with all attributes
- :param int count:
- The number of subnets to fake
- :return:
- A list of FakeResource objects faking the subnets
- """
- subnets = []
- for i in range(0, count):
- subnets.append(FakeSubnet.create_one_subnet(attrs))
-
- return subnets
-
- @staticmethod
- def get_subnets(subnets=None, count=2):
- """Get an iterable MagicMock object with a list of faked subnets.
-
- If subnets list is provided, then initialize the Mock object
- with the list. Otherwise create one.
-
- :param List subnets:
- A list of FakeResource objects faking subnets
- :param int count:
- The number of subnets to fake
- :return:
- An iterable Mock object with side_effect set to a list of faked
- subnets
- """
- if subnets is None:
- subnets = FakeSubnet.create_subnets(count)
- return mock.MagicMock(side_effect=subnets)
-
-
-class FakeFloatingIP(object):
- """Fake one or more floating ip."""
-
- @staticmethod
- def create_one_floating_ip(attrs=None):
- """Create a fake floating ip.
-
- :param Dictionary attrs:
- A dictionary with all attributes
- :return:
- A FakeResource object, with id, ip, and so on
- """
- attrs = attrs or {}
-
- # Set default attributes.
- floating_ip_attrs = {
- 'id': 'floating-ip-id-' + uuid.uuid4().hex,
- 'floating_ip_address': '1.0.9.0',
- 'fixed_ip_address': '2.0.9.0',
- 'dns_domain': None,
- 'dns_name': None,
- 'status': 'DOWN',
- 'floating_network_id': 'network-id-' + uuid.uuid4().hex,
- 'router_id': 'router-id-' + uuid.uuid4().hex,
- 'port_id': 'port-id-' + uuid.uuid4().hex,
- 'tenant_id': 'project-id-' + uuid.uuid4().hex,
- }
-
- # Overwrite default attributes.
- floating_ip_attrs.update(attrs)
-
- floating_ip = fakes.FakeResource(
- info=copy.deepcopy(floating_ip_attrs),
- loaded=True
- )
-
- # Set attributes with special mappings in OpenStack SDK.
- floating_ip.project_id = floating_ip_attrs['tenant_id']
-
- return floating_ip
-
- @staticmethod
- def create_floating_ips(attrs=None, count=2):
- """Create multiple fake floating ips.
-
- :param Dictionary attrs:
- A dictionary with all attributes
- :param int count:
- The number of floating ips to fake
- :return:
- A list of FakeResource objects faking the floating ips
- """
- floating_ips = []
- for i in range(0, count):
- floating_ips.append(FakeFloatingIP.create_one_floating_ip(attrs))
- return floating_ips
-
- @staticmethod
- def get_floating_ips(floating_ips=None, count=2):
- """Get an iterable MagicMock object with a list of faked floating ips.
-
- If floating_ips list is provided, then initialize the Mock object
- with the list. Otherwise create one.
-
- :param List floating ips:
- A list of FakeResource objects faking floating ips
- :param int count:
- The number of floating ips to fake
- :return:
- An iterable Mock object with side_effect set to a list of faked
- floating ips
- """
- if floating_ips is None:
- floating_ips = FakeFloatingIP.create_floating_ips(count)
- return mock.MagicMock(side_effect=floating_ips)
-
-
-class FakeSubnetPool(object):
- """Fake one or more subnet pools."""
-
- @staticmethod
- def create_one_subnet_pool(attrs=None):
- """Create a fake subnet pool.
-
- :param Dictionary attrs:
- A dictionary with all attributes
- :return:
- A FakeResource object faking the subnet pool
- """
- attrs = attrs or {}
-
- # Set default attributes.
- subnet_pool_attrs = {
- 'id': 'subnet-pool-id-' + uuid.uuid4().hex,
- 'name': 'subnet-pool-name-' + uuid.uuid4().hex,
- 'prefixes': ['10.0.0.0/24', '10.1.0.0/24'],
- 'default_prefixlen': '8',
- 'address_scope_id': 'address-scope-id-' + uuid.uuid4().hex,
- 'tenant_id': 'project-id-' + uuid.uuid4().hex,
- 'is_default': False,
- 'shared': False,
- 'max_prefixlen': '32',
- 'min_prefixlen': '8',
- 'default_quota': None,
- 'ip_version': '4',
- }
-
- # Overwrite default attributes.
- subnet_pool_attrs.update(attrs)
-
- subnet_pool = fakes.FakeResource(
- info=copy.deepcopy(subnet_pool_attrs),
- loaded=True
- )
-
- # Set attributes with special mapping in OpenStack SDK.
- subnet_pool.project_id = subnet_pool_attrs['tenant_id']
-
- return subnet_pool
-
- @staticmethod
- def create_subnet_pools(attrs=None, count=2):
- """Create multiple fake subnet pools.
-
- :param Dictionary attrs:
- A dictionary with all attributes
- :param int count:
- The number of subnet pools to fake
- :return:
- A list of FakeResource objects faking the subnet pools
- """
- subnet_pools = []
- for i in range(0, count):
- subnet_pools.append(
- FakeSubnetPool.create_one_subnet_pool(attrs)
- )
-
- return subnet_pools
-
- @staticmethod
- def get_subnet_pools(subnet_pools=None, count=2):
- """Get an iterable MagicMock object with a list of faked subnet pools.
-
- If subnet_pools list is provided, then initialize the Mock object
- with the list. Otherwise create one.
-
- :param List subnet pools:
- A list of FakeResource objects faking subnet pools
- :param int count:
- The number of subnet pools to fake
- :return:
- An iterable Mock object with side_effect set to a list of faked
- subnet pools
- """
- if subnet_pools is None:
- subnet_pools = FakeSubnetPool.create_subnet_pools(count)
- return mock.MagicMock(side_effect=subnet_pools)
diff --git a/openstackclient/tests/network/v2/test_address_scope.py b/openstackclient/tests/network/v2/test_address_scope.py
deleted file mode 100644
index 342cf49b..00000000
--- a/openstackclient/tests/network/v2/test_address_scope.py
+++ /dev/null
@@ -1,399 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-#
-
-import mock
-from mock import call
-
-from osc_lib import exceptions
-
-from openstackclient.network.v2 import address_scope
-from openstackclient.tests.identity.v3 import fakes as identity_fakes_v3
-from openstackclient.tests.network.v2 import fakes as network_fakes
-from openstackclient.tests import utils as tests_utils
-
-
-class TestAddressScope(network_fakes.TestNetworkV2):
-
- def setUp(self):
- super(TestAddressScope, self).setUp()
-
- # Get a shortcut to the network client
- self.network = self.app.client_manager.network
- # Get a shortcut to the ProjectManager Mock
- self.projects_mock = self.app.client_manager.identity.projects
- # Get a shortcut to the DomainManager Mock
- self.domains_mock = self.app.client_manager.identity.domains
-
-
-class TestCreateAddressScope(TestAddressScope):
-
- project = identity_fakes_v3.FakeProject.create_one_project()
- domain = identity_fakes_v3.FakeDomain.create_one_domain()
- # The new address scope created.
- new_address_scope = (
- network_fakes.FakeAddressScope.create_one_address_scope(
- attrs={
- 'tenant_id': project.id,
- }
- ))
- columns = (
- 'id',
- 'ip_version',
- 'name',
- 'project_id',
- 'shared'
- )
- data = (
- new_address_scope.id,
- new_address_scope.ip_version,
- new_address_scope.name,
- new_address_scope.project_id,
- new_address_scope.shared,
- )
-
- def setUp(self):
- super(TestCreateAddressScope, self).setUp()
- self.network.create_address_scope = mock.Mock(
- return_value=self.new_address_scope)
-
- # Get the command object to test
- self.cmd = address_scope.CreateAddressScope(self.app, self.namespace)
-
- self.projects_mock.get.return_value = self.project
- self.domains_mock.get.return_value = self.domain
-
- def test_create_no_options(self):
- arglist = []
- verifylist = []
-
- # Missing required args should bail here
- self.assertRaises(tests_utils.ParserException, self.check_parser,
- self.cmd, arglist, verifylist)
-
- def test_create_default_options(self):
- arglist = [
- self.new_address_scope.name,
- ]
- verifylist = [
- ('project', None),
- ('ip_version', self.new_address_scope.ip_version),
- ('name', self.new_address_scope.name),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- columns, data = (self.cmd.take_action(parsed_args))
-
- self.network.create_address_scope.assert_called_once_with(**{
- 'ip_version': self.new_address_scope.ip_version,
- 'name': self.new_address_scope.name,
- })
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, data)
-
- def test_create_all_options(self):
- arglist = [
- '--ip-version', str(self.new_address_scope.ip_version),
- '--share',
- '--project', self.project.name,
- '--project-domain', self.domain.name,
- self.new_address_scope.name,
- ]
- verifylist = [
- ('ip_version', self.new_address_scope.ip_version),
- ('share', True),
- ('project', self.project.name),
- ('project_domain', self.domain.name),
- ('name', self.new_address_scope.name),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- columns, data = (self.cmd.take_action(parsed_args))
-
- self.network.create_address_scope.assert_called_once_with(**{
- 'ip_version': self.new_address_scope.ip_version,
- 'shared': True,
- 'tenant_id': self.project.id,
- 'name': self.new_address_scope.name,
- })
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, data)
-
- def test_create_no_share(self):
- arglist = [
- '--no-share',
- self.new_address_scope.name,
- ]
- verifylist = [
- ('no_share', True),
- ('name', self.new_address_scope.name),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.create_address_scope.assert_called_once_with(**{
- 'ip_version': self.new_address_scope.ip_version,
- 'shared': False,
- 'name': self.new_address_scope.name,
- })
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, data)
-
-
-class TestDeleteAddressScope(TestAddressScope):
-
- # The address scope to delete.
- _address_scopes = (
- network_fakes.FakeAddressScope.create_address_scopes(count=2))
-
- def setUp(self):
- super(TestDeleteAddressScope, self).setUp()
- self.network.delete_address_scope = mock.Mock(return_value=None)
- self.network.find_address_scope = (
- network_fakes.FakeAddressScope.get_address_scopes(
- address_scopes=self._address_scopes)
- )
-
- # Get the command object to test
- self.cmd = address_scope.DeleteAddressScope(self.app, self.namespace)
-
- def test_address_scope_delete(self):
- arglist = [
- self._address_scopes[0].name,
- ]
- verifylist = [
- ('address_scope', [self._address_scopes[0].name]),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- result = self.cmd.take_action(parsed_args)
- self.network.find_address_scope.assert_called_once_with(
- self._address_scopes[0].name, ignore_missing=False)
- self.network.delete_address_scope.assert_called_once_with(
- self._address_scopes[0])
- self.assertIsNone(result)
-
- def test_multi_address_scopes_delete(self):
- arglist = []
- verifylist = []
-
- for a in self._address_scopes:
- arglist.append(a.name)
- verifylist = [
- ('address_scope', arglist),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- result = self.cmd.take_action(parsed_args)
-
- calls = []
- for a in self._address_scopes:
- calls.append(call(a))
- self.network.delete_address_scope.assert_has_calls(calls)
- self.assertIsNone(result)
-
- def test_multi_address_scopes_delete_with_exception(self):
- arglist = [
- self._address_scopes[0].name,
- 'unexist_address_scope',
- ]
- verifylist = [
- ('address_scope',
- [self._address_scopes[0].name, 'unexist_address_scope']),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- find_mock_result = [self._address_scopes[0], exceptions.CommandError]
- self.network.find_address_scope = (
- mock.MagicMock(side_effect=find_mock_result)
- )
-
- try:
- self.cmd.take_action(parsed_args)
- self.fail('CommandError should be raised.')
- except exceptions.CommandError as e:
- self.assertEqual('1 of 2 address scopes failed to delete.', str(e))
-
- self.network.find_address_scope.assert_any_call(
- self._address_scopes[0].name, ignore_missing=False)
- self.network.find_address_scope.assert_any_call(
- 'unexist_address_scope', ignore_missing=False)
- self.network.delete_address_scope.assert_called_once_with(
- self._address_scopes[0]
- )
-
-
-class TestListAddressScope(TestAddressScope):
-
- # The address scopes to list up.
- address_scopes = (
- network_fakes.FakeAddressScope.create_address_scopes(count=3))
- columns = (
- 'ID',
- 'Name',
- 'IP Version',
- 'Shared',
- 'Project',
- )
- data = []
- for scope in address_scopes:
- data.append((
- scope.id,
- scope.name,
- scope.ip_version,
- scope.shared,
- scope.project_id,
- ))
-
- def setUp(self):
- super(TestListAddressScope, self).setUp()
- self.network.address_scopes = mock.Mock(
- return_value=self.address_scopes)
-
- # Get the command object to test
- self.cmd = address_scope.ListAddressScope(self.app, self.namespace)
-
- def test_address_scope_list(self):
- arglist = []
- verifylist = []
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.address_scopes.assert_called_once_with(**{})
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, list(data))
-
-
-class TestSetAddressScope(TestAddressScope):
-
- # The address scope to set.
- _address_scope = network_fakes.FakeAddressScope.create_one_address_scope()
-
- def setUp(self):
- super(TestSetAddressScope, self).setUp()
- self.network.update_address_scope = mock.Mock(return_value=None)
- self.network.find_address_scope = mock.Mock(
- return_value=self._address_scope)
-
- # Get the command object to test
- self.cmd = address_scope.SetAddressScope(self.app, self.namespace)
-
- def test_set_nothing(self):
- arglist = [self._address_scope.name, ]
- verifylist = [
- ('address_scope', self._address_scope.name),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- result = self.cmd.take_action(parsed_args)
-
- attrs = {}
- self.network.update_address_scope.assert_called_with(
- self._address_scope, **attrs)
- self.assertIsNone(result)
-
- def test_set_name_and_share(self):
- arglist = [
- '--name', 'new_address_scope',
- '--share',
- self._address_scope.name,
- ]
- verifylist = [
- ('name', 'new_address_scope'),
- ('share', True),
- ('address_scope', self._address_scope.name),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- result = self.cmd.take_action(parsed_args)
- attrs = {
- 'name': "new_address_scope",
- 'shared': True,
- }
- self.network.update_address_scope.assert_called_with(
- self._address_scope, **attrs)
- self.assertIsNone(result)
-
- def test_set_no_share(self):
- arglist = [
- '--no-share',
- self._address_scope.name,
- ]
- verifylist = [
- ('no_share', True),
- ('address_scope', self._address_scope.name),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- result = self.cmd.take_action(parsed_args)
- attrs = {
- 'shared': False,
- }
- self.network.update_address_scope.assert_called_with(
- self._address_scope, **attrs)
- self.assertIsNone(result)
-
-
-class TestShowAddressScope(TestAddressScope):
-
- # The address scope to show.
- _address_scope = (
- network_fakes.FakeAddressScope.create_one_address_scope())
- columns = (
- 'id',
- 'ip_version',
- 'name',
- 'project_id',
- 'shared',
- )
- data = (
- _address_scope.id,
- _address_scope.ip_version,
- _address_scope.name,
- _address_scope.project_id,
- _address_scope.shared,
- )
-
- def setUp(self):
- super(TestShowAddressScope, self).setUp()
- self.network.find_address_scope = mock.Mock(
- return_value=self._address_scope)
-
- # Get the command object to test
- self.cmd = address_scope.ShowAddressScope(self.app, self.namespace)
-
- def test_show_no_options(self):
- arglist = []
- verifylist = []
-
- # Missing required args should bail here
- self.assertRaises(tests_utils.ParserException, self.check_parser,
- self.cmd, arglist, verifylist)
-
- def test_show_all_options(self):
- arglist = [
- self._address_scope.name,
- ]
- verifylist = [
- ('address_scope', self._address_scope.name),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.find_address_scope.assert_called_once_with(
- self._address_scope.name, ignore_missing=False)
- self.assertEqual(self.columns, columns)
- self.assertEqual(list(self.data), list(data))
diff --git a/openstackclient/tests/network/v2/test_floating_ip.py b/openstackclient/tests/network/v2/test_floating_ip.py
deleted file mode 100644
index 234fe446..00000000
--- a/openstackclient/tests/network/v2/test_floating_ip.py
+++ /dev/null
@@ -1,565 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-#
-
-import mock
-from mock import call
-
-from osc_lib import exceptions
-
-from openstackclient.network.v2 import floating_ip
-from openstackclient.tests.compute.v2 import fakes as compute_fakes
-from openstackclient.tests.network.v2 import fakes as network_fakes
-from openstackclient.tests import utils as tests_utils
-
-
-# Tests for Neutron network
-#
-class TestFloatingIPNetwork(network_fakes.TestNetworkV2):
-
- def setUp(self):
- super(TestFloatingIPNetwork, self).setUp()
-
- # Get a shortcut to the network client
- self.network = self.app.client_manager.network
-
-
-class TestCreateFloatingIPNetwork(TestFloatingIPNetwork):
-
- # Fake data for option tests.
- floating_network = network_fakes.FakeNetwork.create_one_network()
- subnet = network_fakes.FakeSubnet.create_one_subnet()
- port = network_fakes.FakePort.create_one_port()
-
- # The floating ip to be deleted.
- floating_ip = network_fakes.FakeFloatingIP.create_one_floating_ip(
- attrs={
- 'floating_network_id': floating_network.id,
- 'port_id': port.id,
- }
- )
-
- columns = (
- 'dns_domain',
- 'dns_name',
- 'fixed_ip_address',
- 'floating_ip_address',
- 'floating_network_id',
- 'id',
- 'port_id',
- 'project_id',
- 'router_id',
- 'status',
- )
-
- data = (
- floating_ip.dns_domain,
- floating_ip.dns_name,
- floating_ip.fixed_ip_address,
- floating_ip.floating_ip_address,
- floating_ip.floating_network_id,
- floating_ip.id,
- floating_ip.port_id,
- floating_ip.project_id,
- floating_ip.router_id,
- floating_ip.status,
- )
-
- def setUp(self):
- super(TestCreateFloatingIPNetwork, self).setUp()
-
- self.network.create_ip = mock.Mock(return_value=self.floating_ip)
-
- self.network.find_network = mock.Mock(
- return_value=self.floating_network)
- self.network.find_subnet = mock.Mock(return_value=self.subnet)
- self.network.find_port = mock.Mock(return_value=self.port)
-
- # Get the command object to test
- self.cmd = floating_ip.CreateFloatingIP(self.app, self.namespace)
-
- def test_create_no_options(self):
- arglist = []
- verifylist = []
-
- self.assertRaises(tests_utils.ParserException, self.check_parser,
- self.cmd, arglist, verifylist)
-
- def test_create_default_options(self):
- arglist = [
- self.floating_ip.floating_network_id,
- ]
- verifylist = [
- ('network', self.floating_ip.floating_network_id),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.create_ip.assert_called_once_with(**{
- 'floating_network_id': self.floating_ip.floating_network_id,
- })
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, data)
-
- def test_create_all_options(self):
- arglist = [
- '--subnet', self.subnet.id,
- '--port', self.floating_ip.port_id,
- '--floating-ip-address', self.floating_ip.floating_ip_address,
- '--fixed-ip-address', self.floating_ip.fixed_ip_address,
- self.floating_ip.floating_network_id,
- ]
- verifylist = [
- ('subnet', self.subnet.id),
- ('port', self.floating_ip.port_id),
- ('floating_ip_address', self.floating_ip.floating_ip_address),
- ('fixed_ip_address', self.floating_ip.fixed_ip_address),
- ('network', self.floating_ip.floating_network_id),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.create_ip.assert_called_once_with(**{
- 'subnet_id': self.subnet.id,
- 'port_id': self.floating_ip.port_id,
- 'floating_ip_address': self.floating_ip.floating_ip_address,
- 'fixed_ip_address': self.floating_ip.fixed_ip_address,
- 'floating_network_id': self.floating_ip.floating_network_id,
- })
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, data)
-
-
-class TestDeleteFloatingIPNetwork(TestFloatingIPNetwork):
-
- # The floating ips to be deleted.
- floating_ips = network_fakes.FakeFloatingIP.create_floating_ips(count=2)
-
- def setUp(self):
- super(TestDeleteFloatingIPNetwork, self).setUp()
-
- self.network.delete_ip = mock.Mock(return_value=None)
- self.network.find_ip = (
- network_fakes.FakeFloatingIP.get_floating_ips(self.floating_ips))
-
- # Get the command object to test
- self.cmd = floating_ip.DeleteFloatingIP(self.app, self.namespace)
-
- def test_floating_ip_delete(self):
- arglist = [
- self.floating_ips[0].id,
- ]
- verifylist = [
- ('floating_ip', [self.floating_ips[0].id]),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- result = self.cmd.take_action(parsed_args)
-
- self.network.find_ip.assert_called_once_with(
- self.floating_ips[0].id, ignore_missing=False)
- self.network.delete_ip.assert_called_once_with(self.floating_ips[0])
- self.assertIsNone(result)
-
- def test_multi_floating_ips_delete(self):
- arglist = []
- verifylist = []
-
- for f in self.floating_ips:
- arglist.append(f.id)
- verifylist = [
- ('floating_ip', arglist),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- result = self.cmd.take_action(parsed_args)
-
- calls = []
- for f in self.floating_ips:
- calls.append(call(f))
- self.network.delete_ip.assert_has_calls(calls)
- self.assertIsNone(result)
-
- def test_multi_floating_ips_delete_with_exception(self):
- arglist = [
- self.floating_ips[0].id,
- 'unexist_floating_ip',
- ]
- verifylist = [
- ('floating_ip',
- [self.floating_ips[0].id, 'unexist_floating_ip']),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- find_mock_result = [self.floating_ips[0], exceptions.CommandError]
- self.network.find_ip = (
- mock.MagicMock(side_effect=find_mock_result)
- )
-
- try:
- self.cmd.take_action(parsed_args)
- self.fail('CommandError should be raised.')
- except exceptions.CommandError as e:
- self.assertEqual('1 of 2 floating_ips failed to delete.', str(e))
-
- self.network.find_ip.assert_any_call(
- self.floating_ips[0].id, ignore_missing=False)
- self.network.find_ip.assert_any_call(
- 'unexist_floating_ip', ignore_missing=False)
- self.network.delete_ip.assert_called_once_with(
- self.floating_ips[0]
- )
-
-
-class TestListFloatingIPNetwork(TestFloatingIPNetwork):
-
- # The floating ips to list up
- floating_ips = network_fakes.FakeFloatingIP.create_floating_ips(count=3)
-
- columns = (
- 'ID',
- 'Floating IP Address',
- 'Fixed IP Address',
- 'Port',
- )
-
- data = []
- for ip in floating_ips:
- data.append((
- ip.id,
- ip.floating_ip_address,
- ip.fixed_ip_address,
- ip.port_id,
- ))
-
- def setUp(self):
- super(TestListFloatingIPNetwork, self).setUp()
-
- self.network.ips = mock.Mock(return_value=self.floating_ips)
-
- # Get the command object to test
- self.cmd = floating_ip.ListFloatingIP(self.app, self.namespace)
-
- def test_floating_ip_list(self):
- arglist = []
- verifylist = []
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.ips.assert_called_once_with(**{})
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, list(data))
-
-
-class TestShowFloatingIPNetwork(TestFloatingIPNetwork):
-
- # The floating ip to display.
- floating_ip = network_fakes.FakeFloatingIP.create_one_floating_ip()
-
- columns = (
- 'dns_domain',
- 'dns_name',
- 'fixed_ip_address',
- 'floating_ip_address',
- 'floating_network_id',
- 'id',
- 'port_id',
- 'project_id',
- 'router_id',
- 'status',
- )
-
- data = (
- floating_ip.dns_domain,
- floating_ip.dns_name,
- floating_ip.fixed_ip_address,
- floating_ip.floating_ip_address,
- floating_ip.floating_network_id,
- floating_ip.id,
- floating_ip.port_id,
- floating_ip.tenant_id,
- floating_ip.router_id,
- floating_ip.status,
- )
-
- def setUp(self):
- super(TestShowFloatingIPNetwork, self).setUp()
-
- self.network.find_ip = mock.Mock(return_value=self.floating_ip)
-
- # Get the command object to test
- self.cmd = floating_ip.ShowFloatingIP(self.app, self.namespace)
-
- def test_floating_ip_show(self):
- arglist = [
- self.floating_ip.id,
- ]
- verifylist = [
- ('floating_ip', self.floating_ip.id),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.find_ip.assert_called_once_with(
- self.floating_ip.id,
- ignore_missing=False
- )
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, data)
-
-
-# Tests for Nova network
-#
-class TestFloatingIPCompute(compute_fakes.TestComputev2):
-
- def setUp(self):
- super(TestFloatingIPCompute, self).setUp()
-
- # Get a shortcut to the compute client
- self.compute = self.app.client_manager.compute
-
-
-class TestCreateFloatingIPCompute(TestFloatingIPCompute):
-
- # The floating ip to be deleted.
- floating_ip = compute_fakes.FakeFloatingIP.create_one_floating_ip()
-
- columns = (
- 'fixed_ip',
- 'id',
- 'instance_id',
- 'ip',
- 'pool',
- )
-
- data = (
- floating_ip.fixed_ip,
- floating_ip.id,
- floating_ip.instance_id,
- floating_ip.ip,
- floating_ip.pool,
- )
-
- def setUp(self):
- super(TestCreateFloatingIPCompute, self).setUp()
-
- self.app.client_manager.network_endpoint_enabled = False
-
- self.compute.floating_ips.create.return_value = self.floating_ip
-
- # Get the command object to test
- self.cmd = floating_ip.CreateFloatingIP(self.app, None)
-
- def test_create_no_options(self):
- arglist = []
- verifylist = []
-
- self.assertRaises(tests_utils.ParserException, self.check_parser,
- self.cmd, arglist, verifylist)
-
- def test_create_default_options(self):
- arglist = [
- self.floating_ip.pool,
- ]
- verifylist = [
- ('network', self.floating_ip.pool),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.compute.floating_ips.create.assert_called_once_with(
- self.floating_ip.pool)
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, data)
-
-
-class TestDeleteFloatingIPCompute(TestFloatingIPCompute):
-
- # The floating ips to be deleted.
- floating_ips = compute_fakes.FakeFloatingIP.create_floating_ips(count=2)
-
- def setUp(self):
- super(TestDeleteFloatingIPCompute, self).setUp()
-
- self.app.client_manager.network_endpoint_enabled = False
-
- self.compute.floating_ips.delete.return_value = None
-
- # Return value of utils.find_resource()
- self.compute.floating_ips.get = (
- compute_fakes.FakeFloatingIP.get_floating_ips(self.floating_ips))
-
- # Get the command object to test
- self.cmd = floating_ip.DeleteFloatingIP(self.app, None)
-
- def test_floating_ip_delete(self):
- arglist = [
- self.floating_ips[0].id,
- ]
- verifylist = [
- ('floating_ip', [self.floating_ips[0].id]),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- result = self.cmd.take_action(parsed_args)
-
- self.compute.floating_ips.delete.assert_called_once_with(
- self.floating_ips[0].id
- )
- self.assertIsNone(result)
-
- def test_multi_floating_ips_delete(self):
- arglist = []
- verifylist = []
-
- for f in self.floating_ips:
- arglist.append(f.id)
- verifylist = [
- ('floating_ip', arglist),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- result = self.cmd.take_action(parsed_args)
-
- calls = []
- for f in self.floating_ips:
- calls.append(call(f.id))
- self.compute.floating_ips.delete.assert_has_calls(calls)
- self.assertIsNone(result)
-
- def test_multi_floating_ips_delete_with_exception(self):
- arglist = [
- self.floating_ips[0].id,
- 'unexist_floating_ip',
- ]
- verifylist = [
- ('floating_ip',
- [self.floating_ips[0].id, 'unexist_floating_ip']),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- find_mock_result = [self.floating_ips[0], exceptions.CommandError]
- self.compute.floating_ips.get = (
- mock.MagicMock(side_effect=find_mock_result)
- )
- self.compute.floating_ips.find.side_effect = exceptions.NotFound(None)
-
- try:
- self.cmd.take_action(parsed_args)
- self.fail('CommandError should be raised.')
- except exceptions.CommandError as e:
- self.assertEqual('1 of 2 floating_ips failed to delete.', str(e))
-
- self.compute.floating_ips.get.assert_any_call(
- self.floating_ips[0].id)
- self.compute.floating_ips.get.assert_any_call(
- 'unexist_floating_ip')
- self.compute.floating_ips.delete.assert_called_once_with(
- self.floating_ips[0].id
- )
-
-
-class TestListFloatingIPCompute(TestFloatingIPCompute):
-
- # The floating ips to be list up
- floating_ips = compute_fakes.FakeFloatingIP.create_floating_ips(count=3)
-
- columns = (
- 'ID',
- 'Floating IP Address',
- 'Fixed IP Address',
- 'Server',
- 'Pool',
- )
-
- data = []
- for ip in floating_ips:
- data.append((
- ip.id,
- ip.ip,
- ip.fixed_ip,
- ip.instance_id,
- ip.pool,
- ))
-
- def setUp(self):
- super(TestListFloatingIPCompute, self).setUp()
-
- self.app.client_manager.network_endpoint_enabled = False
-
- self.compute.floating_ips.list.return_value = self.floating_ips
-
- # Get the command object to test
- self.cmd = floating_ip.ListFloatingIP(self.app, None)
-
- def test_floating_ip_list(self):
- arglist = []
- verifylist = []
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.compute.floating_ips.list.assert_called_once_with()
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, list(data))
-
-
-class TestShowFloatingIPCompute(TestFloatingIPCompute):
-
- # The floating ip to display.
- floating_ip = compute_fakes.FakeFloatingIP.create_one_floating_ip()
-
- columns = (
- 'fixed_ip',
- 'id',
- 'instance_id',
- 'ip',
- 'pool',
- )
-
- data = (
- floating_ip.fixed_ip,
- floating_ip.id,
- floating_ip.instance_id,
- floating_ip.ip,
- floating_ip.pool,
- )
-
- def setUp(self):
- super(TestShowFloatingIPCompute, self).setUp()
-
- self.app.client_manager.network_endpoint_enabled = False
-
- # Return value of utils.find_resource()
- self.compute.floating_ips.get.return_value = self.floating_ip
-
- # Get the command object to test
- self.cmd = floating_ip.ShowFloatingIP(self.app, None)
-
- def test_floating_ip_show(self):
- arglist = [
- self.floating_ip.id,
- ]
- verifylist = [
- ('floating_ip', self.floating_ip.id),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, data)
diff --git a/openstackclient/tests/network/v2/test_floating_ip_pool.py b/openstackclient/tests/network/v2/test_floating_ip_pool.py
deleted file mode 100644
index 22d20d20..00000000
--- a/openstackclient/tests/network/v2/test_floating_ip_pool.py
+++ /dev/null
@@ -1,97 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-#
-
-from osc_lib import exceptions
-
-from openstackclient.network.v2 import floating_ip_pool
-from openstackclient.tests.compute.v2 import fakes as compute_fakes
-from openstackclient.tests.network.v2 import fakes as network_fakes
-
-
-# Tests for Network API v2
-#
-class TestFloatingIPPoolNetwork(network_fakes.TestNetworkV2):
-
- def setUp(self):
- super(TestFloatingIPPoolNetwork, self).setUp()
-
- # Get a shortcut to the network client
- self.network = self.app.client_manager.network
-
-
-class TestListFloatingIPPoolNetwork(TestFloatingIPPoolNetwork):
-
- def setUp(self):
- super(TestListFloatingIPPoolNetwork, self).setUp()
-
- # Get the command object to test
- self.cmd = floating_ip_pool.ListFloatingIPPool(self.app,
- self.namespace)
-
- def test_floating_ip_list(self):
- arglist = []
- verifylist = []
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- self.assertRaises(exceptions.CommandError, self.cmd.take_action,
- parsed_args)
-
-
-# Tests for Compute network
-#
-class TestFloatingIPPoolCompute(compute_fakes.TestComputev2):
-
- def setUp(self):
- super(TestFloatingIPPoolCompute, self).setUp()
-
- # Get a shortcut to the compute client
- self.compute = self.app.client_manager.compute
-
-
-class TestListFloatingIPPoolCompute(TestFloatingIPPoolCompute):
-
- # The floating ip pools to list up
- floating_ip_pools = \
- compute_fakes.FakeFloatingIPPool.create_floating_ip_pools(count=3)
-
- columns = (
- 'Name',
- )
-
- data = []
- for pool in floating_ip_pools:
- data.append((
- pool.name,
- ))
-
- def setUp(self):
- super(TestListFloatingIPPoolCompute, self).setUp()
-
- self.app.client_manager.network_endpoint_enabled = False
-
- self.compute.floating_ip_pools.list.return_value = \
- self.floating_ip_pools
-
- # Get the command object to test
- self.cmd = floating_ip_pool.ListFloatingIPPool(self.app, None)
-
- def test_floating_ip_list(self):
- arglist = []
- verifylist = []
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.compute.floating_ip_pools.list.assert_called_once_with()
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, list(data))
diff --git a/openstackclient/tests/network/v2/test_ip_availability.py b/openstackclient/tests/network/v2/test_ip_availability.py
deleted file mode 100644
index f74bf8f7..00000000
--- a/openstackclient/tests/network/v2/test_ip_availability.py
+++ /dev/null
@@ -1,172 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-#
-
-import mock
-
-from osc_lib import utils as common_utils
-
-from openstackclient.network.v2 import ip_availability
-from openstackclient.tests.identity.v3 import fakes as identity_fakes
-from openstackclient.tests.network.v2 import fakes as network_fakes
-from openstackclient.tests import utils as tests_utils
-
-
-class TestIPAvailability(network_fakes.TestNetworkV2):
-
- def setUp(self):
- super(TestIPAvailability, self).setUp()
-
- # Get a shortcut to the network client
- self.network = self.app.client_manager.network
-
- # Get a shortcut to the ProjectManager Mock
- self.projects_mock = self.app.client_manager.identity.projects
-
- self.project = identity_fakes.FakeProject.create_one_project()
- self.projects_mock.get.return_value = self.project
-
-
-class TestListIPAvailability(TestIPAvailability):
-
- _ip_availability = \
- network_fakes.FakeIPAvailability.create_ip_availability(count=3)
- columns = (
- 'Network ID',
- 'Network Name',
- 'Total IPs',
- 'Used IPs',
- )
- data = []
- for net in _ip_availability:
- data.append((
- net.network_id,
- net.network_name,
- net.total_ips,
- net.used_ips,
- ))
-
- def setUp(self):
- super(TestListIPAvailability, self).setUp()
-
- self.cmd = ip_availability.ListIPAvailability(
- self.app, self.namespace)
- self.network.network_ip_availabilities = mock.Mock(
- return_value=self._ip_availability)
-
- def test_list_no_options(self):
- arglist = []
- verifylist = []
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
- filters = {'ip_version': 4}
-
- self.network.network_ip_availabilities.assert_called_once_with(
- **filters)
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, list(data))
-
- def test_list_ip_version(self):
- arglist = [
- '--ip-version', str(4),
- ]
- verifylist = [
- ('ip_version', 4)
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
- filters = {'ip_version': 4}
-
- self.network.network_ip_availabilities.assert_called_once_with(
- **filters)
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, list(data))
-
- def test_list_project(self):
- arglist = [
- '--project', self.project.name
- ]
- verifylist = [
- ('project', self.project.name)
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
- filters = {'tenant_id': self.project.id,
- 'ip_version': 4}
-
- self.network.network_ip_availabilities.assert_called_once_with(
- **filters)
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, list(data))
-
-
-class TestShowIPAvailability(TestIPAvailability):
-
- _ip_availability = \
- network_fakes.FakeIPAvailability.create_one_ip_availability()
-
- columns = (
- 'network_id',
- 'network_name',
- 'project_id',
- 'subnet_ip_availability',
- 'total_ips',
- 'used_ips',
- )
- data = (
- _ip_availability.network_id,
- _ip_availability.network_name,
- _ip_availability.tenant_id,
- common_utils.format_list(
- _ip_availability.subnet_ip_availability),
- _ip_availability.total_ips,
- _ip_availability.used_ips,
- )
-
- def setUp(self):
- super(TestShowIPAvailability, self).setUp()
-
- self.network.find_network_ip_availability = mock.Mock(
- return_value=self._ip_availability)
-
- # Get the command object to test
- self.cmd = ip_availability.ShowIPAvailability(
- self.app, self.namespace)
-
- def test_show_no_option(self):
- arglist = []
- verifylist = []
-
- self.assertRaises(tests_utils.ParserException,
- self.check_parser, self.cmd, arglist, verifylist)
-
- def test_show_all_options(self):
- arglist = [
- self._ip_availability.network_name,
- ]
- verifylist = [
- ('network', self._ip_availability.network_name)
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- columns, data = self.cmd.take_action(parsed_args)
- self.network.find_network_ip_availability.assert_called_once_with(
- self._ip_availability.network_name,
- ignore_missing=False)
-
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, data)
diff --git a/openstackclient/tests/network/v2/test_network.py b/openstackclient/tests/network/v2/test_network.py
deleted file mode 100644
index bb606819..00000000
--- a/openstackclient/tests/network/v2/test_network.py
+++ /dev/null
@@ -1,1059 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-#
-
-import mock
-from mock import call
-
-from osc_lib import exceptions
-from osc_lib import utils
-
-from openstackclient.network.v2 import network
-from openstackclient.tests.compute.v2 import fakes as compute_fakes
-from openstackclient.tests import fakes
-from openstackclient.tests.identity.v2_0 import fakes as identity_fakes_v2
-from openstackclient.tests.identity.v3 import fakes as identity_fakes_v3
-from openstackclient.tests.network.v2 import fakes as network_fakes
-from openstackclient.tests import utils as tests_utils
-
-
-# Tests for Neutron network
-#
-class TestNetwork(network_fakes.TestNetworkV2):
-
- def setUp(self):
- super(TestNetwork, self).setUp()
-
- # Get a shortcut to the network client
- self.network = self.app.client_manager.network
- # Get a shortcut to the ProjectManager Mock
- self.projects_mock = self.app.client_manager.identity.projects
- # Get a shortcut to the DomainManager Mock
- self.domains_mock = self.app.client_manager.identity.domains
-
-
-class TestCreateNetworkIdentityV3(TestNetwork):
-
- project = identity_fakes_v3.FakeProject.create_one_project()
- domain = identity_fakes_v3.FakeDomain.create_one_domain()
- # The new network created.
- _network = network_fakes.FakeNetwork.create_one_network(
- attrs={
- 'tenant_id': project.id,
- 'availability_zone_hints': ["nova"],
- }
- )
-
- columns = (
- 'admin_state_up',
- 'availability_zone_hints',
- 'availability_zones',
- 'id',
- 'is_default',
- 'name',
- 'port_security_enabled',
- 'project_id',
- 'provider_network_type',
- 'router:external',
- 'shared',
- 'status',
- 'subnets',
- )
-
- data = (
- network._format_admin_state(_network.admin_state_up),
- utils.format_list(_network.availability_zone_hints),
- utils.format_list(_network.availability_zones),
- _network.id,
- _network.is_default,
- _network.name,
- _network.is_port_security_enabled,
- _network.project_id,
- _network.provider_network_type,
- network._format_router_external(_network.is_router_external),
- _network.shared,
- _network.status,
- utils.format_list(_network.subnets),
- )
-
- def setUp(self):
- super(TestCreateNetworkIdentityV3, self).setUp()
-
- self.network.create_network = mock.Mock(return_value=self._network)
-
- # Get the command object to test
- self.cmd = network.CreateNetwork(self.app, self.namespace)
-
- self.projects_mock.get.return_value = self.project
- self.domains_mock.get.return_value = self.domain
-
- def test_create_no_options(self):
- arglist = []
- verifylist = []
-
- self.assertRaises(tests_utils.ParserException, self.check_parser,
- self.cmd, arglist, verifylist)
-
- def test_create_default_options(self):
- arglist = [
- self._network.name,
- ]
- verifylist = [
- ('name', self._network.name),
- ('enable', True),
- ('share', None),
- ('project', None),
- ('external', False),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.create_network.assert_called_once_with(**{
- 'admin_state_up': True,
- 'name': self._network.name,
- })
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, data)
-
- def test_create_all_options(self):
- arglist = [
- "--disable",
- "--share",
- "--project", self.project.name,
- "--project-domain", self.domain.name,
- "--availability-zone-hint", "nova",
- "--external", "--default",
- "--provider-network-type", "vlan",
- "--provider-physical-network", "physnet1",
- "--provider-segment", "400",
- "--transparent-vlan",
- "--enable-port-security",
- self._network.name,
- ]
- verifylist = [
- ('disable', True),
- ('share', True),
- ('project', self.project.name),
- ('project_domain', self.domain.name),
- ('availability_zone_hints', ["nova"]),
- ('external', True),
- ('default', True),
- ('provider_network_type', 'vlan'),
- ('physical_network', 'physnet1'),
- ('segmentation_id', '400'),
- ('transparent_vlan', True),
- ('enable_port_security', True),
- ('name', self._network.name),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- columns, data = (self.cmd.take_action(parsed_args))
-
- self.network.create_network.assert_called_once_with(**{
- 'admin_state_up': False,
- 'availability_zone_hints': ["nova"],
- 'name': self._network.name,
- 'shared': True,
- 'tenant_id': self.project.id,
- 'is_default': True,
- 'router:external': True,
- 'provider:network_type': 'vlan',
- 'provider:physical_network': 'physnet1',
- 'provider:segmentation_id': '400',
- 'vlan_transparent': True,
- 'port_security_enabled': True,
- })
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, data)
-
- def test_create_other_options(self):
- arglist = [
- "--enable",
- "--no-share",
- "--disable-port-security",
- self._network.name,
- ]
- verifylist = [
- ('enable', True),
- ('no_share', True),
- ('name', self._network.name),
- ('external', False),
- ('disable_port_security', True),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.create_network.assert_called_once_with(**{
- 'admin_state_up': True,
- 'name': self._network.name,
- 'shared': False,
- 'port_security_enabled': False,
- })
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, data)
-
-
-class TestCreateNetworkIdentityV2(TestNetwork):
-
- project = identity_fakes_v2.FakeProject.create_one_project()
- # The new network created.
- _network = network_fakes.FakeNetwork.create_one_network(
- attrs={'tenant_id': project.id}
- )
-
- columns = (
- 'admin_state_up',
- 'availability_zone_hints',
- 'availability_zones',
- 'id',
- 'is_default',
- 'name',
- 'port_security_enabled',
- 'project_id',
- 'provider_network_type',
- 'router:external',
- 'shared',
- 'status',
- 'subnets',
- )
-
- data = (
- network._format_admin_state(_network.admin_state_up),
- utils.format_list(_network.availability_zone_hints),
- utils.format_list(_network.availability_zones),
- _network.id,
- _network.is_default,
- _network.name,
- _network.is_port_security_enabled,
- _network.project_id,
- _network.provider_network_type,
- network._format_router_external(_network.is_router_external),
- _network.shared,
- _network.status,
- utils.format_list(_network.subnets),
- )
-
- def setUp(self):
- super(TestCreateNetworkIdentityV2, self).setUp()
-
- self.network.create_network = mock.Mock(return_value=self._network)
-
- # Get the command object to test
- self.cmd = network.CreateNetwork(self.app, self.namespace)
-
- # Set identity client v2. And get a shortcut to Identity client.
- identity_client = identity_fakes_v2.FakeIdentityv2Client(
- endpoint=fakes.AUTH_URL,
- token=fakes.AUTH_TOKEN,
- )
- self.app.client_manager.identity = identity_client
- self.identity = self.app.client_manager.identity
-
- # Get a shortcut to the ProjectManager Mock
- self.projects_mock = self.identity.tenants
- self.projects_mock.get.return_value = self.project
-
- # There is no DomainManager Mock in fake identity v2.
-
- def test_create_with_project_identityv2(self):
- arglist = [
- "--project", self.project.name,
- self._network.name,
- ]
- verifylist = [
- ('enable', True),
- ('share', None),
- ('name', self._network.name),
- ('project', self.project.name),
- ('external', False),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.create_network.assert_called_once_with(**{
- 'admin_state_up': True,
- 'name': self._network.name,
- 'tenant_id': self.project.id,
- })
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, data)
-
- def test_create_with_domain_identityv2(self):
- arglist = [
- "--project", self.project.name,
- "--project-domain", "domain-name",
- self._network.name,
- ]
- verifylist = [
- ('enable', True),
- ('share', None),
- ('project', self.project.name),
- ('project_domain', "domain-name"),
- ('name', self._network.name),
- ('external', False),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- self.assertRaises(
- AttributeError,
- self.cmd.take_action,
- parsed_args,
- )
-
-
-class TestDeleteNetwork(TestNetwork):
-
- def setUp(self):
- super(TestDeleteNetwork, self).setUp()
-
- # The networks to delete
- self._networks = network_fakes.FakeNetwork.create_networks(count=3)
-
- self.network.delete_network = mock.Mock(return_value=None)
-
- self.network.find_network = network_fakes.FakeNetwork.get_networks(
- networks=self._networks)
-
- # Get the command object to test
- self.cmd = network.DeleteNetwork(self.app, self.namespace)
-
- def test_delete_one_network(self):
- arglist = [
- self._networks[0].name,
- ]
- verifylist = [
- ('network', [self._networks[0].name]),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- result = self.cmd.take_action(parsed_args)
-
- self.network.delete_network.assert_called_once_with(self._networks[0])
- self.assertIsNone(result)
-
- def test_delete_multiple_networks(self):
- arglist = []
- for n in self._networks:
- arglist.append(n.id)
- verifylist = [
- ('network', arglist),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- result = self.cmd.take_action(parsed_args)
-
- calls = []
- for n in self._networks:
- calls.append(call(n))
- self.network.delete_network.assert_has_calls(calls)
- self.assertIsNone(result)
-
- def test_delete_multiple_networks_exception(self):
- arglist = [
- self._networks[0].id,
- 'xxxx-yyyy-zzzz',
- self._networks[1].id,
- ]
- verifylist = [
- ('network', arglist),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- # Fake exception in find_network()
- ret_find = [
- self._networks[0],
- exceptions.NotFound('404'),
- self._networks[1],
- ]
- self.network.find_network = mock.Mock(side_effect=ret_find)
-
- # Fake exception in delete_network()
- ret_delete = [
- None,
- exceptions.NotFound('404'),
- ]
- self.network.delete_network = mock.Mock(side_effect=ret_delete)
-
- self.assertRaises(exceptions.CommandError, self.cmd.take_action,
- parsed_args)
-
- # The second call of find_network() should fail. So delete_network()
- # was only called twice.
- calls = [
- call(self._networks[0]),
- call(self._networks[1]),
- ]
- self.network.delete_network.assert_has_calls(calls)
-
-
-class TestListNetwork(TestNetwork):
-
- # The networks going to be listed up.
- _network = network_fakes.FakeNetwork.create_networks(count=3)
-
- columns = (
- 'ID',
- 'Name',
- 'Subnets',
- )
- columns_long = (
- 'ID',
- 'Name',
- 'Status',
- 'Project',
- 'State',
- 'Shared',
- 'Subnets',
- 'Network Type',
- 'Router Type',
- 'Availability Zones',
- )
-
- data = []
- for net in _network:
- data.append((
- net.id,
- net.name,
- utils.format_list(net.subnets),
- ))
-
- data_long = []
- for net in _network:
- data_long.append((
- net.id,
- net.name,
- net.status,
- net.project_id,
- network._format_admin_state(net.admin_state_up),
- net.shared,
- utils.format_list(net.subnets),
- net.provider_network_type,
- network._format_router_external(net.is_router_external),
- utils.format_list(net.availability_zones),
- ))
-
- def setUp(self):
- super(TestListNetwork, self).setUp()
-
- # Get the command object to test
- self.cmd = network.ListNetwork(self.app, self.namespace)
-
- self.network.networks = mock.Mock(return_value=self._network)
-
- def test_network_list_no_options(self):
- arglist = []
- verifylist = [
- ('external', False),
- ('long', False),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- # In base command class Lister in cliff, abstract method take_action()
- # returns a tuple containing the column names and an iterable
- # containing the data to be listed.
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.networks.assert_called_once_with()
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, list(data))
-
- def test_list_external(self):
- arglist = [
- '--external',
- ]
- verifylist = [
- ('external', True),
- ('long', False),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- # In base command class Lister in cliff, abstract method take_action()
- # returns a tuple containing the column names and an iterable
- # containing the data to be listed.
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.networks.assert_called_once_with(
- **{'router:external': True}
- )
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, list(data))
-
- def test_network_list_long(self):
- arglist = [
- '--long',
- ]
- verifylist = [
- ('long', True),
- ('external', False),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- # In base command class Lister in cliff, abstract method take_action()
- # returns a tuple containing the column names and an iterable
- # containing the data to be listed.
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.networks.assert_called_once_with()
- self.assertEqual(self.columns_long, columns)
- self.assertEqual(self.data_long, list(data))
-
-
-class TestSetNetwork(TestNetwork):
-
- # The network to set.
- _network = network_fakes.FakeNetwork.create_one_network()
-
- def setUp(self):
- super(TestSetNetwork, self).setUp()
-
- self.network.update_network = mock.Mock(return_value=None)
-
- self.network.find_network = mock.Mock(return_value=self._network)
-
- # Get the command object to test
- self.cmd = network.SetNetwork(self.app, self.namespace)
-
- def test_set_this(self):
- arglist = [
- self._network.name,
- '--enable',
- '--name', 'noob',
- '--share',
- '--external',
- '--default',
- '--provider-network-type', 'vlan',
- '--provider-physical-network', 'physnet1',
- '--provider-segment', '400',
- '--no-transparent-vlan',
- '--enable-port-security',
- ]
- verifylist = [
- ('network', self._network.name),
- ('enable', True),
- ('name', 'noob'),
- ('share', True),
- ('external', True),
- ('default', True),
- ('provider_network_type', 'vlan'),
- ('physical_network', 'physnet1'),
- ('segmentation_id', '400'),
- ('no_transparent_vlan', True),
- ('enable_port_security', True),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- result = self.cmd.take_action(parsed_args)
-
- attrs = {
- 'name': 'noob',
- 'admin_state_up': True,
- 'shared': True,
- 'router:external': True,
- 'is_default': True,
- 'provider:network_type': 'vlan',
- 'provider:physical_network': 'physnet1',
- 'provider:segmentation_id': '400',
- 'vlan_transparent': False,
- 'port_security_enabled': True,
- }
- self.network.update_network.assert_called_once_with(
- self._network, **attrs)
- self.assertIsNone(result)
-
- def test_set_that(self):
- arglist = [
- self._network.name,
- '--disable',
- '--no-share',
- '--internal',
- '--disable-port-security',
- ]
- verifylist = [
- ('network', self._network.name),
- ('disable', True),
- ('no_share', True),
- ('internal', True),
- ('disable_port_security', True),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- result = self.cmd.take_action(parsed_args)
-
- attrs = {
- 'admin_state_up': False,
- 'shared': False,
- 'router:external': False,
- 'port_security_enabled': False,
- }
- self.network.update_network.assert_called_once_with(
- self._network, **attrs)
- self.assertIsNone(result)
-
- def test_set_nothing(self):
- arglist = [self._network.name, ]
- verifylist = [('network', self._network.name), ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- result = self.cmd.take_action(parsed_args)
-
- attrs = {}
- self.network.update_network.assert_called_once_with(
- self._network, **attrs)
- self.assertIsNone(result)
-
-
-class TestShowNetwork(TestNetwork):
-
- # The network to show.
- _network = network_fakes.FakeNetwork.create_one_network()
-
- columns = (
- 'admin_state_up',
- 'availability_zone_hints',
- 'availability_zones',
- 'id',
- 'is_default',
- 'name',
- 'port_security_enabled',
- 'project_id',
- 'provider_network_type',
- 'router:external',
- 'shared',
- 'status',
- 'subnets',
- )
-
- data = (
- network._format_admin_state(_network.admin_state_up),
- utils.format_list(_network.availability_zone_hints),
- utils.format_list(_network.availability_zones),
- _network.id,
- _network.is_default,
- _network.name,
- _network.is_port_security_enabled,
- _network.project_id,
- _network.provider_network_type,
- network._format_router_external(_network.is_router_external),
- _network.shared,
- _network.status,
- utils.format_list(_network.subnets),
- )
-
- def setUp(self):
- super(TestShowNetwork, self).setUp()
-
- self.network.find_network = mock.Mock(return_value=self._network)
-
- # Get the command object to test
- self.cmd = network.ShowNetwork(self.app, self.namespace)
-
- def test_show_no_options(self):
- arglist = []
- verifylist = []
-
- self.assertRaises(tests_utils.ParserException, self.check_parser,
- self.cmd, arglist, verifylist)
-
- def test_show_all_options(self):
- arglist = [
- self._network.name,
- ]
- verifylist = [
- ('network', self._network.name),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.find_network.assert_called_once_with(
- self._network.name, ignore_missing=False)
-
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, data)
-
-
-# Tests for Nova network
-#
-class TestNetworkCompute(compute_fakes.TestComputev2):
-
- def setUp(self):
- super(TestNetworkCompute, self).setUp()
-
- # Get a shortcut to the compute client
- self.compute = self.app.client_manager.compute
-
-
-class TestCreateNetworkCompute(TestNetworkCompute):
-
- # The network to create.
- _network = compute_fakes.FakeNetwork.create_one_network()
-
- columns = (
- 'bridge',
- 'bridge_interface',
- 'broadcast',
- 'cidr',
- 'cidr_v6',
- 'created_at',
- 'deleted',
- 'deleted_at',
- 'dhcp_server',
- 'dhcp_start',
- 'dns1',
- 'dns2',
- 'enable_dhcp',
- 'gateway',
- 'gateway_v6',
- 'host',
- 'id',
- 'injected',
- 'label',
- 'mtu',
- 'multi_host',
- 'netmask',
- 'netmask_v6',
- 'priority',
- 'project_id',
- 'rxtx_base',
- 'share_address',
- 'updated_at',
- 'vlan',
- 'vpn_private_address',
- 'vpn_public_address',
- 'vpn_public_port',
- )
-
- data = (
- _network.bridge,
- _network.bridge_interface,
- _network.broadcast,
- _network.cidr,
- _network.cidr_v6,
- _network.created_at,
- _network.deleted,
- _network.deleted_at,
- _network.dhcp_server,
- _network.dhcp_start,
- _network.dns1,
- _network.dns2,
- _network.enable_dhcp,
- _network.gateway,
- _network.gateway_v6,
- _network.host,
- _network.id,
- _network.injected,
- _network.label,
- _network.mtu,
- _network.multi_host,
- _network.netmask,
- _network.netmask_v6,
- _network.priority,
- _network.project_id,
- _network.rxtx_base,
- _network.share_address,
- _network.updated_at,
- _network.vlan,
- _network.vpn_private_address,
- _network.vpn_public_address,
- _network.vpn_public_port,
- )
-
- def setUp(self):
- super(TestCreateNetworkCompute, self).setUp()
-
- self.app.client_manager.network_endpoint_enabled = False
-
- self.compute.networks.create.return_value = self._network
-
- # Get the command object to test
- self.cmd = network.CreateNetwork(self.app, None)
-
- def test_create_no_options(self):
- arglist = []
- verifylist = []
-
- # Missing required args should raise exception here
- self.assertRaises(tests_utils.ParserException, self.check_parser,
- self.cmd, arglist, verifylist)
-
- def test_create_default_options(self):
- arglist = [
- "--subnet", self._network.cidr,
- self._network.label,
- ]
- verifylist = [
- ('subnet', self._network.cidr),
- ('name', self._network.label),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.compute.networks.create.assert_called_once_with(**{
- 'cidr': self._network.cidr,
- 'label': self._network.label,
- })
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, data)
-
-
-class TestDeleteNetworkCompute(TestNetworkCompute):
-
- def setUp(self):
- super(TestDeleteNetworkCompute, self).setUp()
-
- self.app.client_manager.network_endpoint_enabled = False
-
- # The networks to delete
- self._networks = compute_fakes.FakeNetwork.create_networks(count=3)
-
- self.compute.networks.delete.return_value = None
-
- # Return value of utils.find_resource()
- self.compute.networks.get = \
- compute_fakes.FakeNetwork.get_networks(networks=self._networks)
-
- # Get the command object to test
- self.cmd = network.DeleteNetwork(self.app, None)
-
- def test_delete_one_network(self):
- arglist = [
- self._networks[0].label,
- ]
- verifylist = [
- ('network', [self._networks[0].label]),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- result = self.cmd.take_action(parsed_args)
-
- self.compute.networks.delete.assert_called_once_with(
- self._networks[0].id)
- self.assertIsNone(result)
-
- def test_delete_multiple_networks(self):
- arglist = []
- for n in self._networks:
- arglist.append(n.label)
- verifylist = [
- ('network', arglist),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- result = self.cmd.take_action(parsed_args)
-
- calls = []
- for n in self._networks:
- calls.append(call(n.id))
- self.compute.networks.delete.assert_has_calls(calls)
- self.assertIsNone(result)
-
- def test_delete_multiple_networks_exception(self):
- arglist = [
- self._networks[0].id,
- 'xxxx-yyyy-zzzz',
- self._networks[1].id,
- ]
- verifylist = [
- ('network', arglist),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- # Fake exception in utils.find_resource()
- # In compute v2, we use utils.find_resource() to find a network.
- # It calls get() several times, but find() only one time. So we
- # choose to fake get() always raise exception, then pass through.
- # And fake find() to find the real network or not.
- self.compute.networks.get.side_effect = Exception()
- ret_find = [
- self._networks[0],
- Exception(),
- self._networks[1],
- ]
- self.compute.networks.find.side_effect = ret_find
-
- # Fake exception in delete()
- ret_delete = [
- None,
- Exception(),
- ]
- self.compute.networks.delete = mock.Mock(side_effect=ret_delete)
-
- self.assertRaises(exceptions.CommandError, self.cmd.take_action,
- parsed_args)
-
- # The second call of utils.find_resource() should fail. So delete()
- # was only called twice.
- calls = [
- call(self._networks[0].id),
- call(self._networks[1].id),
- ]
- self.compute.networks.delete.assert_has_calls(calls)
-
-
-class TestListNetworkCompute(TestNetworkCompute):
-
- # The networks going to be listed up.
- _networks = compute_fakes.FakeNetwork.create_networks(count=3)
-
- columns = (
- 'ID',
- 'Name',
- 'Subnet',
- )
-
- data = []
- for net in _networks:
- data.append((
- net.id,
- net.label,
- net.cidr,
- ))
-
- def setUp(self):
- super(TestListNetworkCompute, self).setUp()
-
- self.app.client_manager.network_endpoint_enabled = False
-
- self.compute.networks.list.return_value = self._networks
-
- # Get the command object to test
- self.cmd = network.ListNetwork(self.app, None)
-
- def test_network_list_no_options(self):
- arglist = []
- verifylist = [
- ('external', False),
- ('long', False),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- # In base command class Lister in cliff, abstract method take_action()
- # returns a tuple containing the column names and an iterable
- # containing the data to be listed.
- columns, data = self.cmd.take_action(parsed_args)
-
- self.compute.networks.list.assert_called_once_with()
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, list(data))
-
-
-class TestShowNetworkCompute(TestNetworkCompute):
-
- # The network to show.
- _network = compute_fakes.FakeNetwork.create_one_network()
-
- columns = (
- 'bridge',
- 'bridge_interface',
- 'broadcast',
- 'cidr',
- 'cidr_v6',
- 'created_at',
- 'deleted',
- 'deleted_at',
- 'dhcp_server',
- 'dhcp_start',
- 'dns1',
- 'dns2',
- 'enable_dhcp',
- 'gateway',
- 'gateway_v6',
- 'host',
- 'id',
- 'injected',
- 'label',
- 'mtu',
- 'multi_host',
- 'netmask',
- 'netmask_v6',
- 'priority',
- 'project_id',
- 'rxtx_base',
- 'share_address',
- 'updated_at',
- 'vlan',
- 'vpn_private_address',
- 'vpn_public_address',
- 'vpn_public_port',
- )
-
- data = (
- _network.bridge,
- _network.bridge_interface,
- _network.broadcast,
- _network.cidr,
- _network.cidr_v6,
- _network.created_at,
- _network.deleted,
- _network.deleted_at,
- _network.dhcp_server,
- _network.dhcp_start,
- _network.dns1,
- _network.dns2,
- _network.enable_dhcp,
- _network.gateway,
- _network.gateway_v6,
- _network.host,
- _network.id,
- _network.injected,
- _network.label,
- _network.mtu,
- _network.multi_host,
- _network.netmask,
- _network.netmask_v6,
- _network.priority,
- _network.project_id,
- _network.rxtx_base,
- _network.share_address,
- _network.updated_at,
- _network.vlan,
- _network.vpn_private_address,
- _network.vpn_public_address,
- _network.vpn_public_port,
- )
-
- def setUp(self):
- super(TestShowNetworkCompute, self).setUp()
-
- self.app.client_manager.network_endpoint_enabled = False
-
- # Return value of utils.find_resource()
- self.compute.networks.get.return_value = self._network
-
- # Get the command object to test
- self.cmd = network.ShowNetwork(self.app, None)
-
- def test_show_no_options(self):
- arglist = []
- verifylist = []
-
- self.assertRaises(tests_utils.ParserException, self.check_parser,
- self.cmd, arglist, verifylist)
-
- def test_show_all_options(self):
- arglist = [
- self._network.label,
- ]
- verifylist = [
- ('network', self._network.label),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- columns, data = self.cmd.take_action(parsed_args)
-
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, data)
diff --git a/openstackclient/tests/network/v2/test_network_agent.py b/openstackclient/tests/network/v2/test_network_agent.py
deleted file mode 100644
index 269d4e1d..00000000
--- a/openstackclient/tests/network/v2/test_network_agent.py
+++ /dev/null
@@ -1,294 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-#
-
-import mock
-from mock import call
-
-from osc_lib import exceptions
-from osc_lib import utils
-
-from openstackclient.network.v2 import network_agent
-from openstackclient.tests.network.v2 import fakes as network_fakes
-from openstackclient.tests import utils as tests_utils
-
-
-class TestNetworkAgent(network_fakes.TestNetworkV2):
-
- def setUp(self):
- super(TestNetworkAgent, self).setUp()
-
- # Get a shortcut to the network client
- self.network = self.app.client_manager.network
-
-
-class TestDeleteNetworkAgent(TestNetworkAgent):
-
- network_agents = (
- network_fakes.FakeNetworkAgent.create_network_agents(count=2))
-
- def setUp(self):
- super(TestDeleteNetworkAgent, self).setUp()
- self.network.delete_agent = mock.Mock(return_value=None)
- self.network.get_agent = (
- network_fakes.FakeNetworkAgent.get_network_agents(
- agents=self.network_agents)
- )
-
- # Get the command object to test
- self.cmd = network_agent.DeleteNetworkAgent(self.app, self.namespace)
-
- def test_network_agent_delete(self):
- arglist = [
- self.network_agents[0].id,
- ]
- verifylist = [
- ('network_agent', [self.network_agents[0].id]),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- result = self.cmd.take_action(parsed_args)
- self.network.get_agent.assert_called_once_with(
- self.network_agents[0].id, ignore_missing=False)
- self.network.delete_agent.assert_called_once_with(
- self.network_agents[0])
- self.assertIsNone(result)
-
- def test_multi_network_agents_delete(self):
- arglist = []
- verifylist = []
-
- for n in self.network_agents:
- arglist.append(n.id)
- verifylist = [
- ('network_agent', arglist),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- result = self.cmd.take_action(parsed_args)
-
- calls = []
- for n in self.network_agents:
- calls.append(call(n))
- self.network.delete_agent.assert_has_calls(calls)
- self.assertIsNone(result)
-
- def test_multi_network_agents_delete_with_exception(self):
- arglist = [
- self.network_agents[0].id,
- 'unexist_network_agent',
- ]
- verifylist = [
- ('network_agent',
- [self.network_agents[0].id, 'unexist_network_agent']),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- find_mock_result = [self.network_agents[0], exceptions.CommandError]
- self.network.get_agent = (
- mock.MagicMock(side_effect=find_mock_result)
- )
-
- try:
- self.cmd.take_action(parsed_args)
- self.fail('CommandError should be raised.')
- except exceptions.CommandError as e:
- self.assertEqual('1 of 2 network agents failed to delete.', str(e))
-
- self.network.get_agent.assert_any_call(
- self.network_agents[0].id, ignore_missing=False)
- self.network.get_agent.assert_any_call(
- 'unexist_network_agent', ignore_missing=False)
- self.network.delete_agent.assert_called_once_with(
- self.network_agents[0]
- )
-
-
-class TestListNetworkAgent(TestNetworkAgent):
-
- network_agents = (
- network_fakes.FakeNetworkAgent.create_network_agents(count=3))
-
- columns = (
- 'ID',
- 'Agent Type',
- 'Host',
- 'Availability Zone',
- 'Alive',
- 'State',
- 'Binary'
- )
- data = []
- for agent in network_agents:
- data.append((
- agent.id,
- agent.agent_type,
- agent.host,
- agent.availability_zone,
- agent.alive,
- network_agent._format_admin_state(agent.admin_state_up),
- agent.binary,
- ))
-
- def setUp(self):
- super(TestListNetworkAgent, self).setUp()
- self.network.agents = mock.Mock(
- return_value=self.network_agents)
-
- # Get the command object to test
- self.cmd = network_agent.ListNetworkAgent(self.app, self.namespace)
-
- def test_network_agents_list(self):
- arglist = []
- verifylist = []
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.agents.assert_called_once_with(**{})
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, list(data))
-
-
-class TestSetNetworkAgent(TestNetworkAgent):
-
- _network_agent = (
- network_fakes.FakeNetworkAgent.create_one_network_agent())
-
- def setUp(self):
- super(TestSetNetworkAgent, self).setUp()
- self.network.update_agent = mock.Mock(return_value=None)
- self.network.get_agent = mock.Mock(return_value=self._network_agent)
-
- # Get the command object to test
- self.cmd = network_agent.SetNetworkAgent(self.app, self.namespace)
-
- def test_set_nothing(self):
- arglist = [
- self._network_agent.id,
- ]
- verifylist = [
- ('network_agent', self._network_agent.id),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- result = self.cmd.take_action(parsed_args)
-
- attrs = {}
- self.network.update_agent.assert_called_once_with(
- self._network_agent, **attrs)
- self.assertIsNone(result)
-
- def test_set_all(self):
- arglist = [
- '--description', 'new_description',
- '--enable',
- self._network_agent.id,
- ]
- verifylist = [
- ('description', 'new_description'),
- ('enable', True),
- ('disable', False),
- ('network_agent', self._network_agent.id),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- result = self.cmd.take_action(parsed_args)
-
- attrs = {
- 'description': 'new_description',
- 'admin_state_up': True,
- }
- self.network.update_agent.assert_called_once_with(
- self._network_agent, **attrs)
- self.assertIsNone(result)
-
- def test_set_with_disable(self):
- arglist = [
- '--disable',
- self._network_agent.id,
- ]
- verifylist = [
- ('enable', False),
- ('disable', True),
- ('network_agent', self._network_agent.id),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- result = self.cmd.take_action(parsed_args)
-
- attrs = {
- 'admin_state_up': False,
- }
- self.network.update_agent.assert_called_once_with(
- self._network_agent, **attrs)
- self.assertIsNone(result)
-
-
-class TestShowNetworkAgent(TestNetworkAgent):
-
- _network_agent = (
- network_fakes.FakeNetworkAgent.create_one_network_agent())
-
- columns = (
- 'admin_state_up',
- 'agent_type',
- 'alive',
- 'availability_zone',
- 'binary',
- 'configurations',
- 'host',
- 'id',
- )
- data = (
- network_agent._format_admin_state(_network_agent.admin_state_up),
- _network_agent.agent_type,
- _network_agent.alive,
- _network_agent.availability_zone,
- _network_agent.binary,
- utils.format_dict(_network_agent.configurations),
- _network_agent.host,
- _network_agent.id,
- )
-
- def setUp(self):
- super(TestShowNetworkAgent, self).setUp()
- self.network.get_agent = mock.Mock(
- return_value=self._network_agent)
-
- # Get the command object to test
- self.cmd = network_agent.ShowNetworkAgent(self.app, self.namespace)
-
- def test_show_no_options(self):
- arglist = []
- verifylist = []
-
- # Missing required args should bail here
- self.assertRaises(tests_utils.ParserException, self.check_parser,
- self.cmd, arglist, verifylist)
-
- def test_show_all_options(self):
- arglist = [
- self._network_agent.id,
- ]
- verifylist = [
- ('network_agent', self._network_agent.id),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.get_agent.assert_called_once_with(
- self._network_agent.id, ignore_missing=False)
- self.assertEqual(self.columns, columns)
- self.assertEqual(list(self.data), list(data))
diff --git a/openstackclient/tests/network/v2/test_network_rbac.py b/openstackclient/tests/network/v2/test_network_rbac.py
deleted file mode 100644
index 9250e91b..00000000
--- a/openstackclient/tests/network/v2/test_network_rbac.py
+++ /dev/null
@@ -1,430 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-#
-
-import mock
-from mock import call
-
-from osc_lib import exceptions
-
-from openstackclient.network.v2 import network_rbac
-from openstackclient.tests.identity.v3 import fakes as identity_fakes_v3
-from openstackclient.tests.network.v2 import fakes as network_fakes
-from openstackclient.tests import utils as tests_utils
-
-
-class TestNetworkRBAC(network_fakes.TestNetworkV2):
-
- def setUp(self):
- super(TestNetworkRBAC, self).setUp()
-
- # Get a shortcut to the network client
- self.network = self.app.client_manager.network
- # Get a shortcut to the ProjectManager Mock
- self.projects_mock = self.app.client_manager.identity.projects
-
-
-class TestCreateNetworkRBAC(TestNetworkRBAC):
-
- network_object = network_fakes.FakeNetwork.create_one_network()
- project = identity_fakes_v3.FakeProject.create_one_project()
- rbac_policy = network_fakes.FakeNetworkRBAC.create_one_network_rbac(
- attrs={'tenant_id': project.id,
- 'target_tenant': project.id,
- 'object_id': network_object.id}
- )
-
- columns = (
- 'action',
- 'id',
- 'object_id',
- 'object_type',
- 'project_id',
- 'target_project_id',
- )
-
- data = [
- rbac_policy.action,
- rbac_policy.id,
- rbac_policy.object_id,
- rbac_policy.object_type,
- rbac_policy.tenant_id,
- rbac_policy.target_tenant,
- ]
-
- def setUp(self):
- super(TestCreateNetworkRBAC, self).setUp()
-
- # Get the command object to test
- self.cmd = network_rbac.CreateNetworkRBAC(self.app, self.namespace)
-
- self.network.create_rbac_policy = mock.Mock(
- return_value=self.rbac_policy)
- self.network.find_network = mock.Mock(
- return_value=self.network_object)
- self.projects_mock.get.return_value = self.project
-
- def test_network_rbac_create_no_type(self):
- arglist = [
- '--action', self.rbac_policy.action,
- self.rbac_policy.object_id,
- ]
- verifylist = [
- ('action', self.rbac_policy.action),
- ('rbac_policy', self.rbac_policy.id),
- ]
-
- self.assertRaises(tests_utils.ParserException, self.check_parser,
- self.cmd, arglist, verifylist)
-
- def test_network_rbac_create_no_action(self):
- arglist = [
- '--type', self.rbac_policy.object_type,
- self.rbac_policy.object_id,
- ]
- verifylist = [
- ('type', self.rbac_policy.object_type),
- ('rbac_policy', self.rbac_policy.id),
- ]
-
- self.assertRaises(tests_utils.ParserException, self.check_parser,
- self.cmd, arglist, verifylist)
-
- def test_network_rbac_create_invalid_type(self):
- arglist = [
- '--action', self.rbac_policy.action,
- '--type', 'invalid_type',
- '--target-project', self.rbac_policy.target_tenant,
- self.rbac_policy.object_id,
- ]
- verifylist = [
- ('action', self.rbac_policy.action),
- ('type', 'invalid_type'),
- ('target-project', self.rbac_policy.target_tenant),
- ('rbac_policy', self.rbac_policy.id),
- ]
-
- self.assertRaises(tests_utils.ParserException, self.check_parser,
- self.cmd, arglist, verifylist)
-
- def test_network_rbac_create_invalid_action(self):
- arglist = [
- '--type', self.rbac_policy.object_type,
- '--action', 'invalid_action',
- '--target-project', self.rbac_policy.target_tenant,
- self.rbac_policy.object_id,
- ]
- verifylist = [
- ('type', self.rbac_policy.object_type),
- ('action', 'invalid_action'),
- ('target-project', self.rbac_policy.target_tenant),
- ('rbac_policy', self.rbac_policy.id),
- ]
-
- self.assertRaises(tests_utils.ParserException, self.check_parser,
- self.cmd, arglist, verifylist)
-
- def test_network_rbac_create(self):
- arglist = [
- '--type', self.rbac_policy.object_type,
- '--action', self.rbac_policy.action,
- '--target-project', self.rbac_policy.target_tenant,
- self.rbac_policy.object_id,
- ]
- verifylist = [
- ('type', self.rbac_policy.object_type),
- ('action', self.rbac_policy.action),
- ('target_project', self.rbac_policy.target_tenant),
- ('rbac_object', self.rbac_policy.object_id),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- # DisplayCommandBase.take_action() returns two tuples
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.create_rbac_policy.assert_called_with(**{
- 'object_id': self.rbac_policy.object_id,
- 'object_type': self.rbac_policy.object_type,
- 'action': self.rbac_policy.action,
- 'target_tenant': self.rbac_policy.target_tenant,
- })
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, list(data))
-
- def test_network_rbac_create_all_options(self):
- arglist = [
- '--type', self.rbac_policy.object_type,
- '--action', self.rbac_policy.action,
- '--target-project', self.rbac_policy.target_tenant,
- '--project', self.rbac_policy.tenant_id,
- '--project-domain', self.project.domain_id,
- '--target-project-domain', self.project.domain_id,
- self.rbac_policy.object_id,
- ]
- verifylist = [
- ('type', self.rbac_policy.object_type),
- ('action', self.rbac_policy.action),
- ('target_project', self.rbac_policy.target_tenant),
- ('project', self.rbac_policy.tenant_id),
- ('project_domain', self.project.domain_id),
- ('target_project_domain', self.project.domain_id),
- ('rbac_object', self.rbac_policy.object_id),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- # DisplayCommandBase.take_action() returns two tuples
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.create_rbac_policy.assert_called_with(**{
- 'object_id': self.rbac_policy.object_id,
- 'object_type': self.rbac_policy.object_type,
- 'action': self.rbac_policy.action,
- 'target_tenant': self.rbac_policy.target_tenant,
- 'tenant_id': self.rbac_policy.tenant_id,
- })
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, list(data))
-
-
-class TestDeleteNetworkRBAC(TestNetworkRBAC):
-
- rbac_policies = network_fakes.FakeNetworkRBAC.create_network_rbacs(count=2)
-
- def setUp(self):
- super(TestDeleteNetworkRBAC, self).setUp()
- self.network.delete_rbac_policy = mock.Mock(return_value=None)
- self.network.find_rbac_policy = (
- network_fakes.FakeNetworkRBAC.get_network_rbacs(
- rbac_policies=self.rbac_policies)
- )
-
- # Get the command object to test
- self.cmd = network_rbac.DeleteNetworkRBAC(self.app, self.namespace)
-
- def test_network_rbac_delete(self):
- arglist = [
- self.rbac_policies[0].id,
- ]
- verifylist = [
- ('rbac_policy', [self.rbac_policies[0].id]),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- result = self.cmd.take_action(parsed_args)
- self.network.find_rbac_policy.assert_called_once_with(
- self.rbac_policies[0].id, ignore_missing=False)
- self.network.delete_rbac_policy.assert_called_once_with(
- self.rbac_policies[0])
- self.assertIsNone(result)
-
- def test_multi_network_rbacs_delete(self):
- arglist = []
- verifylist = []
-
- for r in self.rbac_policies:
- arglist.append(r.id)
- verifylist = [
- ('rbac_policy', arglist),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- result = self.cmd.take_action(parsed_args)
-
- calls = []
- for r in self.rbac_policies:
- calls.append(call(r))
- self.network.delete_rbac_policy.assert_has_calls(calls)
- self.assertIsNone(result)
-
- def test_multi_network_policies_delete_with_exception(self):
- arglist = [
- self.rbac_policies[0].id,
- 'unexist_rbac_policy',
- ]
- verifylist = [
- ('rbac_policy',
- [self.rbac_policies[0].id, 'unexist_rbac_policy']),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- find_mock_result = [self.rbac_policies[0], exceptions.CommandError]
- self.network.find_rbac_policy = (
- mock.MagicMock(side_effect=find_mock_result)
- )
-
- try:
- self.cmd.take_action(parsed_args)
- self.fail('CommandError should be raised.')
- except exceptions.CommandError as e:
- self.assertEqual('1 of 2 RBAC policies failed to delete.', str(e))
-
- self.network.find_rbac_policy.assert_any_call(
- self.rbac_policies[0].id, ignore_missing=False)
- self.network.find_rbac_policy.assert_any_call(
- 'unexist_rbac_policy', ignore_missing=False)
- self.network.delete_rbac_policy.assert_called_once_with(
- self.rbac_policies[0]
- )
-
-
-class TestListNetworkRABC(TestNetworkRBAC):
-
- # The network rbac policies going to be listed up.
- rbac_policies = network_fakes.FakeNetworkRBAC.create_network_rbacs(count=3)
-
- columns = (
- 'ID',
- 'Object Type',
- 'Object ID',
- )
-
- data = []
- for r in rbac_policies:
- data.append((
- r.id,
- r.object_type,
- r.object_id,
- ))
-
- def setUp(self):
- super(TestListNetworkRABC, self).setUp()
-
- # Get the command object to test
- self.cmd = network_rbac.ListNetworkRBAC(self.app, self.namespace)
-
- self.network.rbac_policies = mock.Mock(return_value=self.rbac_policies)
-
- def test_network_rbac_list(self):
- arglist = []
- verifylist = []
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- # DisplayCommandBase.take_action() returns two tuples
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.rbac_policies.assert_called_with()
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, list(data))
-
-
-class TestSetNetworkRBAC(TestNetworkRBAC):
-
- project = identity_fakes_v3.FakeProject.create_one_project()
- rbac_policy = network_fakes.FakeNetworkRBAC.create_one_network_rbac(
- attrs={'target_tenant': project.id})
-
- def setUp(self):
- super(TestSetNetworkRBAC, self).setUp()
-
- # Get the command object to test
- self.cmd = network_rbac.SetNetworkRBAC(self.app, self.namespace)
-
- self.network.find_rbac_policy = mock.Mock(
- return_value=self.rbac_policy)
- self.network.update_rbac_policy = mock.Mock(return_value=None)
- self.projects_mock.get.return_value = self.project
-
- def test_network_rbac_set_nothing(self):
- arglist = [
- self.rbac_policy.id,
- ]
- verifylist = [
- ('rbac_policy', self.rbac_policy.id),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- result = self.cmd.take_action(parsed_args)
- self.network.find_rbac_policy.assert_called_once_with(
- self.rbac_policy.id, ignore_missing=False
- )
- attrs = {}
- self.network.update_rbac_policy.assert_called_once_with(
- self.rbac_policy, **attrs)
- self.assertIsNone(result)
-
- def test_network_rbac_set(self):
- arglist = [
- '--target-project', self.project.id,
- self.rbac_policy.id,
- ]
- verifylist = [
- ('target_project', self.project.id),
- ('rbac_policy', self.rbac_policy.id),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- result = self.cmd.take_action(parsed_args)
- self.network.find_rbac_policy.assert_called_once_with(
- self.rbac_policy.id, ignore_missing=False
- )
- attrs = {'target_tenant': self.project.id}
- self.network.update_rbac_policy.assert_called_once_with(
- self.rbac_policy, **attrs)
- self.assertIsNone(result)
-
-
-class TestShowNetworkRBAC(TestNetworkRBAC):
-
- rbac_policy = network_fakes.FakeNetworkRBAC.create_one_network_rbac()
-
- columns = (
- 'action',
- 'id',
- 'object_id',
- 'object_type',
- 'project_id',
- 'target_project_id',
- )
-
- data = [
- rbac_policy.action,
- rbac_policy.id,
- rbac_policy.object_id,
- rbac_policy.object_type,
- rbac_policy.tenant_id,
- rbac_policy.target_tenant,
- ]
-
- def setUp(self):
- super(TestShowNetworkRBAC, self).setUp()
-
- # Get the command object to test
- self.cmd = network_rbac.ShowNetworkRBAC(self.app, self.namespace)
-
- self.network.find_rbac_policy = mock.Mock(
- return_value=self.rbac_policy)
-
- def test_show_no_options(self):
- arglist = []
- verifylist = []
-
- self.assertRaises(tests_utils.ParserException, self.check_parser,
- self.cmd, arglist, verifylist)
-
- def test_network_rbac_show_all_options(self):
- arglist = [
- self.rbac_policy.id,
- ]
- verifylist = [
- ('rbac_policy', self.rbac_policy.id),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- # DisplayCommandBase.take_action() returns two tuples
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.find_rbac_policy.assert_called_with(
- self.rbac_policy.id, ignore_missing=False
- )
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, list(data))
diff --git a/openstackclient/tests/network/v2/test_network_segment.py b/openstackclient/tests/network/v2/test_network_segment.py
deleted file mode 100644
index a635d845..00000000
--- a/openstackclient/tests/network/v2/test_network_segment.py
+++ /dev/null
@@ -1,200 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-#
-
-import mock
-
-from osc_lib import exceptions
-
-from openstackclient.network.v2 import network_segment
-from openstackclient.tests.network.v2 import fakes as network_fakes
-from openstackclient.tests import utils as tests_utils
-
-
-class TestNetworkSegment(network_fakes.TestNetworkV2):
-
- def setUp(self):
- super(TestNetworkSegment, self).setUp()
-
- # Enable beta commands.
- self.app.options.os_beta_command = True
-
- # Get a shortcut to the network client
- self.network = self.app.client_manager.network
-
-
-class TestListNetworkSegment(TestNetworkSegment):
- _network = network_fakes.FakeNetwork.create_one_network()
- _network_segments = \
- network_fakes.FakeNetworkSegment.create_network_segments(count=3)
-
- columns = (
- 'ID',
- 'Network',
- 'Network Type',
- 'Segment',
- )
- columns_long = columns + (
- 'Physical Network',
- )
-
- data = []
- for _network_segment in _network_segments:
- data.append((
- _network_segment.id,
- _network_segment.network_id,
- _network_segment.network_type,
- _network_segment.segmentation_id,
- ))
-
- data_long = []
- for _network_segment in _network_segments:
- data_long.append((
- _network_segment.id,
- _network_segment.network_id,
- _network_segment.network_type,
- _network_segment.segmentation_id,
- _network_segment.physical_network,
- ))
-
- def setUp(self):
- super(TestListNetworkSegment, self).setUp()
-
- # Get the command object to test
- self.cmd = network_segment.ListNetworkSegment(self.app, self.namespace)
-
- self.network.find_network = mock.Mock(return_value=self._network)
- self.network.segments = mock.Mock(return_value=self._network_segments)
-
- def test_list_no_option(self):
- arglist = []
- verifylist = [
- ('long', False),
- ('network', None),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.segments.assert_called_once_with()
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, list(data))
-
- def test_list_no_beta_commands(self):
- self.app.options.os_beta_command = False
- parsed_args = self.check_parser(self.cmd, [], [])
- self.assertRaises(exceptions.CommandError, self.cmd.take_action,
- parsed_args)
-
- def test_list_long(self):
- arglist = [
- '--long',
- ]
- verifylist = [
- ('long', True),
- ('network', None),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.segments.assert_called_once_with()
- self.assertEqual(self.columns_long, columns)
- self.assertEqual(self.data_long, list(data))
-
- def test_list_network(self):
- arglist = [
- '--network',
- self._network.id,
- ]
- verifylist = [
- ('long', False),
- ('network', self._network.id)
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.segments.assert_called_once_with(
- **{'network_id': self._network.id}
- )
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, list(data))
-
-
-class TestShowNetworkSegment(TestNetworkSegment):
-
- # The network segment to show.
- _network_segment = \
- network_fakes.FakeNetworkSegment.create_one_network_segment()
-
- columns = (
- 'id',
- 'network_id',
- 'network_type',
- 'physical_network',
- 'segmentation_id',
- )
-
- data = (
- _network_segment.id,
- _network_segment.network_id,
- _network_segment.network_type,
- _network_segment.physical_network,
- _network_segment.segmentation_id,
- )
-
- def setUp(self):
- super(TestShowNetworkSegment, self).setUp()
-
- self.network.find_segment = mock.Mock(
- return_value=self._network_segment
- )
-
- # Get the command object to test
- self.cmd = network_segment.ShowNetworkSegment(self.app, self.namespace)
-
- def test_show_no_options(self):
- # Missing required args should bail here
- self.assertRaises(tests_utils.ParserException, self.check_parser,
- self.cmd, [], [])
-
- def test_show_no_beta_commands(self):
- arglist = [
- self._network_segment.id,
- ]
- verifylist = [
- ('network_segment', self._network_segment.id),
- ]
- self.app.options.os_beta_command = False
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- self.assertRaises(exceptions.CommandError, self.cmd.take_action,
- parsed_args)
-
- def test_show_all_options(self):
- arglist = [
- self._network_segment.id,
- ]
- verifylist = [
- ('network_segment', self._network_segment.id),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.find_segment.assert_called_once_with(
- self._network_segment.id,
- ignore_missing=False
- )
-
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, data)
diff --git a/openstackclient/tests/network/v2/test_port.py b/openstackclient/tests/network/v2/test_port.py
deleted file mode 100644
index a1cecec8..00000000
--- a/openstackclient/tests/network/v2/test_port.py
+++ /dev/null
@@ -1,696 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-#
-
-import argparse
-import mock
-
-from mock import call
-from osc_lib import exceptions
-from osc_lib import utils
-
-from openstackclient.network.v2 import port
-from openstackclient.tests.network.v2 import fakes as network_fakes
-from openstackclient.tests import utils as tests_utils
-
-
-class TestPort(network_fakes.TestNetworkV2):
-
- def setUp(self):
- super(TestPort, self).setUp()
-
- # Get a shortcut to the network client
- self.network = self.app.client_manager.network
-
- def _get_common_cols_data(self, fake_port):
- columns = (
- 'admin_state_up',
- 'allowed_address_pairs',
- 'binding_host_id',
- 'binding_profile',
- 'binding_vif_details',
- 'binding_vif_type',
- 'binding_vnic_type',
- 'device_id',
- 'device_owner',
- 'dns_assignment',
- 'dns_name',
- 'extra_dhcp_opts',
- 'fixed_ips',
- 'id',
- 'mac_address',
- 'name',
- 'network_id',
- 'port_security_enabled',
- 'project_id',
- 'security_groups',
- 'status',
- )
-
- data = (
- port._format_admin_state(fake_port.admin_state_up),
- utils.format_list_of_dicts(fake_port.allowed_address_pairs),
- fake_port.binding_host_id,
- utils.format_dict(fake_port.binding_profile),
- utils.format_dict(fake_port.binding_vif_details),
- fake_port.binding_vif_type,
- fake_port.binding_vnic_type,
- fake_port.device_id,
- fake_port.device_owner,
- utils.format_list_of_dicts(fake_port.dns_assignment),
- fake_port.dns_name,
- utils.format_list_of_dicts(fake_port.extra_dhcp_opts),
- utils.format_list_of_dicts(fake_port.fixed_ips),
- fake_port.id,
- fake_port.mac_address,
- fake_port.name,
- fake_port.network_id,
- fake_port.port_security_enabled,
- fake_port.project_id,
- utils.format_list(fake_port.security_groups),
- fake_port.status,
- )
-
- return columns, data
-
-
-class TestCreatePort(TestPort):
-
- _port = network_fakes.FakePort.create_one_port()
-
- def setUp(self):
- super(TestCreatePort, self).setUp()
-
- self.network.create_port = mock.Mock(return_value=self._port)
- fake_net = network_fakes.FakeNetwork.create_one_network({
- 'id': self._port.network_id,
- })
- self.network.find_network = mock.Mock(return_value=fake_net)
- self.fake_subnet = network_fakes.FakeSubnet.create_one_subnet()
- self.network.find_subnet = mock.Mock(return_value=self.fake_subnet)
- # Get the command object to test
- self.cmd = port.CreatePort(self.app, self.namespace)
-
- def test_create_default_options(self):
- arglist = [
- '--network', self._port.network_id,
- 'test-port',
- ]
- verifylist = [
- ('network', self._port.network_id,),
- ('enable', True),
- ('name', 'test-port'),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = (self.cmd.take_action(parsed_args))
-
- self.network.create_port.assert_called_once_with(**{
- 'admin_state_up': True,
- 'network_id': self._port.network_id,
- 'name': 'test-port',
- })
-
- ref_columns, ref_data = self._get_common_cols_data(self._port)
- self.assertEqual(ref_columns, columns)
- self.assertEqual(ref_data, data)
-
- def test_create_full_options(self):
- arglist = [
- '--mac-address', 'aa:aa:aa:aa:aa:aa',
- '--fixed-ip', 'subnet=%s,ip-address=10.0.0.2'
- % self.fake_subnet.id,
- '--device', 'deviceid',
- '--device-owner', 'fakeowner',
- '--disable',
- '--vnic-type', 'macvtap',
- '--binding-profile', 'foo=bar',
- '--binding-profile', 'foo2=bar2',
- '--network', self._port.network_id,
- 'test-port',
-
- ]
- verifylist = [
- ('mac_address', 'aa:aa:aa:aa:aa:aa'),
- (
- 'fixed_ip',
- [{'subnet': self.fake_subnet.id, 'ip-address': '10.0.0.2'}]
- ),
- ('device', 'deviceid'),
- ('device_owner', 'fakeowner'),
- ('disable', True),
- ('vnic_type', 'macvtap'),
- ('binding_profile', {'foo': 'bar', 'foo2': 'bar2'}),
- ('network', self._port.network_id),
- ('name', 'test-port'),
-
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = (self.cmd.take_action(parsed_args))
-
- self.network.create_port.assert_called_once_with(**{
- 'mac_address': 'aa:aa:aa:aa:aa:aa',
- 'fixed_ips': [{'subnet_id': self.fake_subnet.id,
- 'ip_address': '10.0.0.2'}],
- 'device_id': 'deviceid',
- 'device_owner': 'fakeowner',
- 'admin_state_up': False,
- 'binding:vnic_type': 'macvtap',
- 'binding:profile': {'foo': 'bar', 'foo2': 'bar2'},
- 'network_id': self._port.network_id,
- 'name': 'test-port',
- })
-
- ref_columns, ref_data = self._get_common_cols_data(self._port)
- self.assertEqual(ref_columns, columns)
- self.assertEqual(ref_data, data)
-
- def test_create_invalid_json_binding_profile(self):
- arglist = [
- '--network', self._port.network_id,
- '--binding-profile', '{"parent_name":"fake_parent"',
- 'test-port',
- ]
- self.assertRaises(argparse.ArgumentTypeError,
- self.check_parser,
- self.cmd,
- arglist,
- None)
-
- def test_create_invalid_key_value_binding_profile(self):
- arglist = [
- '--network', self._port.network_id,
- '--binding-profile', 'key',
- 'test-port',
- ]
- self.assertRaises(argparse.ArgumentTypeError,
- self.check_parser,
- self.cmd,
- arglist,
- None)
-
- def test_create_json_binding_profile(self):
- arglist = [
- '--network', self._port.network_id,
- '--binding-profile', '{"parent_name":"fake_parent"}',
- '--binding-profile', '{"tag":42}',
- 'test-port',
- ]
- verifylist = [
- ('network', self._port.network_id,),
- ('enable', True),
- ('binding_profile', {'parent_name': 'fake_parent', 'tag': 42}),
- ('name', 'test-port'),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = (self.cmd.take_action(parsed_args))
-
- self.network.create_port.assert_called_once_with(**{
- 'admin_state_up': True,
- 'network_id': self._port.network_id,
- 'binding:profile': {'parent_name': 'fake_parent', 'tag': 42},
- 'name': 'test-port',
- })
-
- ref_columns, ref_data = self._get_common_cols_data(self._port)
- self.assertEqual(ref_columns, columns)
- self.assertEqual(ref_data, data)
-
-
-class TestDeletePort(TestPort):
-
- # Ports to delete.
- _ports = network_fakes.FakePort.create_ports(count=2)
-
- def setUp(self):
- super(TestDeletePort, self).setUp()
-
- self.network.delete_port = mock.Mock(return_value=None)
- self.network.find_port = network_fakes.FakePort.get_ports(
- ports=self._ports)
- # Get the command object to test
- self.cmd = port.DeletePort(self.app, self.namespace)
-
- def test_port_delete(self):
- arglist = [
- self._ports[0].name,
- ]
- verifylist = [
- ('port', [self._ports[0].name]),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- result = self.cmd.take_action(parsed_args)
- self.network.find_port.assert_called_once_with(
- self._ports[0].name, ignore_missing=False)
- self.network.delete_port.assert_called_once_with(self._ports[0])
- self.assertIsNone(result)
-
- def test_multi_ports_delete(self):
- arglist = []
- verifylist = []
-
- for p in self._ports:
- arglist.append(p.name)
- verifylist = [
- ('port', arglist),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- result = self.cmd.take_action(parsed_args)
-
- calls = []
- for p in self._ports:
- calls.append(call(p))
- self.network.delete_port.assert_has_calls(calls)
- self.assertIsNone(result)
-
- def test_multi_ports_delete_with_exception(self):
- arglist = [
- self._ports[0].name,
- 'unexist_port',
- ]
- verifylist = [
- ('port',
- [self._ports[0].name, 'unexist_port']),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- find_mock_result = [self._ports[0], exceptions.CommandError]
- self.network.find_port = (
- mock.MagicMock(side_effect=find_mock_result)
- )
-
- try:
- self.cmd.take_action(parsed_args)
- self.fail('CommandError should be raised.')
- except exceptions.CommandError as e:
- self.assertEqual('1 of 2 ports failed to delete.', str(e))
-
- self.network.find_port.assert_any_call(
- self._ports[0].name, ignore_missing=False)
- self.network.find_port.assert_any_call(
- 'unexist_port', ignore_missing=False)
- self.network.delete_port.assert_called_once_with(
- self._ports[0]
- )
-
-
-class TestListPort(TestPort):
-
- _ports = network_fakes.FakePort.create_ports(count=3)
-
- columns = (
- 'ID',
- 'Name',
- 'MAC Address',
- 'Fixed IP Addresses',
- )
-
- data = []
- for prt in _ports:
- data.append((
- prt.id,
- prt.name,
- prt.mac_address,
- utils.format_list_of_dicts(prt.fixed_ips),
- ))
-
- def setUp(self):
- super(TestListPort, self).setUp()
-
- # Get the command object to test
- self.cmd = port.ListPort(self.app, self.namespace)
- self.network.ports = mock.Mock(return_value=self._ports)
- fake_router = network_fakes.FakeRouter.create_one_router({
- 'id': 'fake-router-id',
- })
- self.network.find_router = mock.Mock(return_value=fake_router)
-
- def test_port_list_no_options(self):
- arglist = []
- verifylist = []
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.ports.assert_called_once_with()
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, list(data))
-
- def test_port_list_router_opt(self):
- arglist = [
- '--router', 'fake-router-name',
- ]
-
- verifylist = [
- ('router', 'fake-router-name')
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.ports.assert_called_once_with(**{
- 'device_id': 'fake-router-id'
- })
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, list(data))
-
- def test_port_list_device_owner_opt(self):
- arglist = [
- '--device-owner', self._ports[0].device_owner,
- ]
-
- verifylist = [
- ('device_owner', self._ports[0].device_owner)
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.ports.assert_called_once_with(**{
- 'device_owner': self._ports[0].device_owner
- })
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, list(data))
-
- def test_port_list_all_opt(self):
- arglist = [
- '--device-owner', self._ports[0].device_owner,
- '--router', 'fake-router-name',
- ]
-
- verifylist = [
- ('device_owner', self._ports[0].device_owner),
- ('router', 'fake-router-name')
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.ports.assert_called_once_with(**{
- 'device_owner': self._ports[0].device_owner,
- 'device_id': 'fake-router-id'
- })
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, list(data))
-
-
-class TestSetPort(TestPort):
-
- _port = network_fakes.FakePort.create_one_port()
-
- def setUp(self):
- super(TestSetPort, self).setUp()
- self.fake_subnet = network_fakes.FakeSubnet.create_one_subnet()
- self.network.find_subnet = mock.Mock(return_value=self.fake_subnet)
- self.network.find_port = mock.Mock(return_value=self._port)
- self.network.update_port = mock.Mock(return_value=None)
-
- # Get the command object to test
- self.cmd = port.SetPort(self.app, self.namespace)
-
- def test_set_fixed_ip(self):
- arglist = [
- '--fixed-ip', 'ip-address=10.0.0.11',
- self._port.name,
- ]
- verifylist = [
- ('fixed_ip', [{'ip-address': '10.0.0.11'}]),
- ('port', self._port.name),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- result = self.cmd.take_action(parsed_args)
-
- attrs = {
- 'fixed_ips': [{'ip_address': '10.0.0.11'}],
- }
- self.network.update_port.assert_called_once_with(self._port, **attrs)
- self.assertIsNone(result)
-
- def test_append_fixed_ip(self):
- _testport = network_fakes.FakePort.create_one_port(
- {'fixed_ips': [{'ip_address': '0.0.0.1'}]})
- self.network.find_port = mock.Mock(return_value=_testport)
- arglist = [
- '--fixed-ip', 'ip-address=10.0.0.12',
- _testport.name,
- ]
- verifylist = [
- ('fixed_ip', [{'ip-address': '10.0.0.12'}]),
- ('port', _testport.name),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- result = self.cmd.take_action(parsed_args)
- attrs = {
- 'fixed_ips': [
- {'ip_address': '10.0.0.12'}, {'ip_address': '0.0.0.1'}],
- }
- self.network.update_port.assert_called_once_with(_testport, **attrs)
- self.assertIsNone(result)
-
- def test_set_this(self):
- arglist = [
- '--disable',
- '--no-fixed-ip',
- '--no-binding-profile',
- self._port.name,
- ]
- verifylist = [
- ('disable', True),
- ('no_binding_profile', True),
- ('no_fixed_ip', True),
- ('port', self._port.name),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- result = self.cmd.take_action(parsed_args)
-
- attrs = {
- 'admin_state_up': False,
- 'binding:profile': {},
- 'fixed_ips': [],
- }
- self.network.update_port.assert_called_once_with(self._port, **attrs)
- self.assertIsNone(result)
-
- def test_set_that(self):
- arglist = [
- '--enable',
- '--vnic-type', 'macvtap',
- '--binding-profile', 'foo=bar',
- '--host', 'binding-host-id-xxxx',
- '--name', 'newName',
- self._port.name,
- ]
- verifylist = [
- ('enable', True),
- ('vnic_type', 'macvtap'),
- ('binding_profile', {'foo': 'bar'}),
- ('host', 'binding-host-id-xxxx'),
- ('name', 'newName'),
- ('port', self._port.name),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- result = self.cmd.take_action(parsed_args)
-
- attrs = {
- 'admin_state_up': True,
- 'binding:vnic_type': 'macvtap',
- 'binding:profile': {'foo': 'bar'},
- 'binding:host_id': 'binding-host-id-xxxx',
- 'name': 'newName',
- }
- self.network.update_port.assert_called_once_with(self._port, **attrs)
- self.assertIsNone(result)
-
- def test_set_nothing(self):
- arglist = [
- self._port.name,
- ]
- verifylist = [
- ('port', self._port.name),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- result = self.cmd.take_action(parsed_args)
-
- attrs = {}
- self.network.update_port.assert_called_once_with(self._port, **attrs)
- self.assertIsNone(result)
-
- def test_set_invalid_json_binding_profile(self):
- arglist = [
- '--binding-profile', '{"parent_name"}',
- 'test-port',
- ]
- self.assertRaises(argparse.ArgumentTypeError,
- self.check_parser,
- self.cmd,
- arglist,
- None)
-
- def test_set_invalid_key_value_binding_profile(self):
- arglist = [
- '--binding-profile', 'key',
- 'test-port',
- ]
- self.assertRaises(argparse.ArgumentTypeError,
- self.check_parser,
- self.cmd,
- arglist,
- None)
-
- def test_set_mixed_binding_profile(self):
- arglist = [
- '--binding-profile', 'foo=bar',
- '--binding-profile', '{"foo2": "bar2"}',
- self._port.name,
- ]
- verifylist = [
- ('binding_profile', {'foo': 'bar', 'foo2': 'bar2'}),
- ('port', self._port.name),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- result = self.cmd.take_action(parsed_args)
-
- attrs = {
- 'binding:profile': {'foo': 'bar', 'foo2': 'bar2'},
- }
- self.network.update_port.assert_called_once_with(self._port, **attrs)
- self.assertIsNone(result)
-
-
-class TestShowPort(TestPort):
-
- # The port to show.
- _port = network_fakes.FakePort.create_one_port()
-
- def setUp(self):
- super(TestShowPort, self).setUp()
-
- self.network.find_port = mock.Mock(return_value=self._port)
-
- # Get the command object to test
- self.cmd = port.ShowPort(self.app, self.namespace)
-
- def test_show_no_options(self):
- arglist = []
- verifylist = []
-
- self.assertRaises(tests_utils.ParserException,
- self.check_parser, self.cmd, arglist, verifylist)
-
- def test_show_all_options(self):
- arglist = [
- self._port.name,
- ]
- verifylist = [
- ('port', self._port.name),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.find_port.assert_called_once_with(
- self._port.name, ignore_missing=False)
-
- ref_columns, ref_data = self._get_common_cols_data(self._port)
- self.assertEqual(ref_columns, columns)
- self.assertEqual(ref_data, data)
-
-
-class TestUnsetPort(TestPort):
-
- def setUp(self):
- super(TestUnsetPort, self).setUp()
- self._testport = network_fakes.FakePort.create_one_port(
- {'fixed_ips': [{'subnet_id': '042eb10a-3a18-4658-ab-cf47c8d03152',
- 'ip_address': '0.0.0.1'},
- {'subnet_id': '042eb10a-3a18-4658-ab-cf47c8d03152',
- 'ip_address': '1.0.0.0'}],
- 'binding:profile': {'batman': 'Joker', 'Superman': 'LexLuthor'}})
- self.fake_subnet = network_fakes.FakeSubnet.create_one_subnet(
- {'id': '042eb10a-3a18-4658-ab-cf47c8d03152'})
- self.network.find_subnet = mock.Mock(return_value=self.fake_subnet)
- self.network.find_port = mock.Mock(return_value=self._testport)
- self.network.update_port = mock.Mock(return_value=None)
- # Get the command object to test
- self.cmd = port.UnsetPort(self.app, self.namespace)
-
- def test_unset_port_parameters(self):
- arglist = [
- '--fixed-ip',
- 'subnet=042eb10a-3a18-4658-ab-cf47c8d03152,ip-address=1.0.0.0',
- '--binding-profile', 'Superman',
- self._testport.name,
- ]
- verifylist = [
- ('fixed_ip', [{
- 'subnet': '042eb10a-3a18-4658-ab-cf47c8d03152',
- 'ip-address': '1.0.0.0'}]),
- ('binding_profile', ['Superman']),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- result = self.cmd.take_action(parsed_args)
-
- attrs = {
- 'fixed_ips': [{
- 'subnet_id': '042eb10a-3a18-4658-ab-cf47c8d03152',
- 'ip_address': '0.0.0.1'}],
- 'binding:profile': {'batman': 'Joker'}
- }
- self.network.update_port.assert_called_once_with(
- self._testport, **attrs)
- self.assertIsNone(result)
-
- def test_unset_port_fixed_ip_not_existent(self):
- arglist = [
- '--fixed-ip', 'ip-address=1.0.0.1',
- '--binding-profile', 'Superman',
- self._testport.name,
- ]
- verifylist = [
- ('fixed_ip', [{'ip-address': '1.0.0.1'}]),
- ('binding_profile', ['Superman']),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- self.assertRaises(exceptions.CommandError,
- self.cmd.take_action,
- parsed_args)
-
- def test_unset_port_binding_profile_not_existent(self):
- arglist = [
- '--fixed-ip', 'ip-address=1.0.0.0',
- '--binding-profile', 'Neo',
- self._testport.name,
- ]
- verifylist = [
- ('fixed_ip', [{'ip-address': '1.0.0.0'}]),
- ('binding_profile', ['Neo']),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- self.assertRaises(exceptions.CommandError,
- self.cmd.take_action,
- parsed_args)
diff --git a/openstackclient/tests/network/v2/test_router.py b/openstackclient/tests/network/v2/test_router.py
deleted file mode 100644
index 1ef4707b..00000000
--- a/openstackclient/tests/network/v2/test_router.py
+++ /dev/null
@@ -1,751 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-#
-
-import mock
-from mock import call
-
-from osc_lib import exceptions
-from osc_lib import utils as osc_utils
-
-from openstackclient.network.v2 import router
-from openstackclient.tests.network.v2 import fakes as network_fakes
-from openstackclient.tests import utils as tests_utils
-
-
-class TestRouter(network_fakes.TestNetworkV2):
-
- def setUp(self):
- super(TestRouter, self).setUp()
-
- # Get a shortcut to the network client
- self.network = self.app.client_manager.network
-
-
-class TestAddPortToRouter(TestRouter):
- '''Add port to Router '''
-
- _port = network_fakes.FakePort.create_one_port()
- _router = network_fakes.FakeRouter.create_one_router(
- attrs={'port': _port.id})
-
- def setUp(self):
- super(TestAddPortToRouter, self).setUp()
- self.network.router_add_interface = mock.Mock()
- self.cmd = router.AddPortToRouter(self.app, self.namespace)
- self.network.find_router = mock.Mock(return_value=self._router)
- self.network.find_port = mock.Mock(return_value=self._port)
-
- def test_add_port_no_option(self):
- arglist = []
- verifylist = []
-
- self.assertRaises(tests_utils.ParserException, self.check_parser,
- self.cmd, arglist, verifylist)
-
- def test_add_port_required_options(self):
- arglist = [
- self._router.id,
- self._router.port,
- ]
- verifylist = [
- ('router', self._router.id),
- ('port', self._router.port),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- result = self.cmd.take_action(parsed_args)
-
- self.network.router_add_interface.assert_called_with(self._router, **{
- 'port_id': self._router.port,
- })
- self.assertIsNone(result)
-
-
-class TestAddSubnetToRouter(TestRouter):
- '''Add subnet to Router '''
-
- _subnet = network_fakes.FakeSubnet.create_one_subnet()
- _router = network_fakes.FakeRouter.create_one_router(
- attrs={'subnet': _subnet.id})
-
- def setUp(self):
- super(TestAddSubnetToRouter, self).setUp()
- self.network.router_add_interface = mock.Mock()
- self.cmd = router.AddSubnetToRouter(self.app, self.namespace)
- self.network.find_router = mock.Mock(return_value=self._router)
- self.network.find_subnet = mock.Mock(return_value=self._subnet)
-
- def test_add_subnet_no_option(self):
- arglist = []
- verifylist = []
-
- self.assertRaises(tests_utils.ParserException, self.check_parser,
- self.cmd, arglist, verifylist)
-
- def test_add_subnet_required_options(self):
- arglist = [
- self._router.id,
- self._router.subnet,
- ]
- verifylist = [
- ('router', self._router.id),
- ('subnet', self._router.subnet),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- result = self.cmd.take_action(parsed_args)
- self.network.router_add_interface.assert_called_with(
- self._router, **{'subnet_id': self._router.subnet})
-
- self.assertIsNone(result)
-
-
-class TestCreateRouter(TestRouter):
-
- # The new router created.
- new_router = network_fakes.FakeRouter.create_one_router()
-
- columns = (
- 'admin_state_up',
- 'availability_zone_hints',
- 'availability_zones',
- 'distributed',
- 'external_gateway_info',
- 'ha',
- 'id',
- 'name',
- 'project_id',
- 'routes',
- 'status',
- )
- data = (
- router._format_admin_state(new_router.admin_state_up),
- osc_utils.format_list(new_router.availability_zone_hints),
- osc_utils.format_list(new_router.availability_zones),
- new_router.distributed,
- router._format_external_gateway_info(new_router.external_gateway_info),
- new_router.ha,
- new_router.id,
- new_router.name,
- new_router.tenant_id,
- router._format_routes(new_router.routes),
- new_router.status,
- )
-
- def setUp(self):
- super(TestCreateRouter, self).setUp()
-
- self.network.create_router = mock.Mock(return_value=self.new_router)
-
- # Get the command object to test
- self.cmd = router.CreateRouter(self.app, self.namespace)
-
- def test_create_no_options(self):
- arglist = []
- verifylist = []
-
- self.assertRaises(tests_utils.ParserException, self.check_parser,
- self.cmd, arglist, verifylist)
-
- def test_create_default_options(self):
- arglist = [
- self.new_router.name,
- ]
- verifylist = [
- ('name', self.new_router.name),
- ('enable', True),
- ('distributed', False),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = (self.cmd.take_action(parsed_args))
-
- self.network.create_router.assert_called_once_with(**{
- 'admin_state_up': True,
- 'name': self.new_router.name,
- })
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, data)
-
- def test_create_with_AZ_hints(self):
- arglist = [
- self.new_router.name,
- '--availability-zone-hint', 'fake-az',
- '--availability-zone-hint', 'fake-az2',
- ]
- verifylist = [
- ('name', self.new_router.name),
- ('availability_zone_hints', ['fake-az', 'fake-az2']),
- ('enable', True),
- ('distributed', False),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = (self.cmd.take_action(parsed_args))
- self.network.create_router.assert_called_once_with(**{
- 'admin_state_up': True,
- 'name': self.new_router.name,
- 'availability_zone_hints': ['fake-az', 'fake-az2'],
- })
-
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, data)
-
-
-class TestDeleteRouter(TestRouter):
-
- # The routers to delete.
- _routers = network_fakes.FakeRouter.create_routers(count=2)
-
- def setUp(self):
- super(TestDeleteRouter, self).setUp()
-
- self.network.delete_router = mock.Mock(return_value=None)
-
- self.network.find_router = (
- network_fakes.FakeRouter.get_routers(self._routers))
-
- # Get the command object to test
- self.cmd = router.DeleteRouter(self.app, self.namespace)
-
- def test_router_delete(self):
- arglist = [
- self._routers[0].name,
- ]
- verifylist = [
- ('router', [self._routers[0].name]),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- result = self.cmd.take_action(parsed_args)
- self.network.delete_router.assert_called_once_with(self._routers[0])
- self.assertIsNone(result)
-
- def test_multi_routers_delete(self):
- arglist = []
- verifylist = []
-
- for r in self._routers:
- arglist.append(r.name)
- verifylist = [
- ('router', arglist),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- result = self.cmd.take_action(parsed_args)
-
- calls = []
- for r in self._routers:
- calls.append(call(r))
- self.network.delete_router.assert_has_calls(calls)
- self.assertIsNone(result)
-
- def test_multi_routers_delete_with_exception(self):
- arglist = [
- self._routers[0].name,
- 'unexist_router',
- ]
- verifylist = [
- ('router',
- [self._routers[0].name, 'unexist_router']),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- find_mock_result = [self._routers[0], exceptions.CommandError]
- self.network.find_router = (
- mock.MagicMock(side_effect=find_mock_result)
- )
-
- try:
- self.cmd.take_action(parsed_args)
- self.fail('CommandError should be raised.')
- except exceptions.CommandError as e:
- self.assertEqual('1 of 2 routers failed to delete.', str(e))
-
- self.network.find_router.assert_any_call(
- self._routers[0].name, ignore_missing=False)
- self.network.find_router.assert_any_call(
- 'unexist_router', ignore_missing=False)
- self.network.delete_router.assert_called_once_with(
- self._routers[0]
- )
-
-
-class TestListRouter(TestRouter):
-
- # The routers going to be listed up.
- routers = network_fakes.FakeRouter.create_routers(count=3)
-
- columns = (
- 'ID',
- 'Name',
- 'Status',
- 'State',
- 'Distributed',
- 'HA',
- 'Project',
- )
- columns_long = columns + (
- 'Routes',
- 'External gateway info',
- 'Availability zones'
- )
-
- data = []
- for r in routers:
- data.append((
- r.id,
- r.name,
- r.status,
- router._format_admin_state(r.admin_state_up),
- r.distributed,
- r.ha,
- r.tenant_id,
- ))
- data_long = []
- for i in range(0, len(routers)):
- r = routers[i]
- data_long.append(
- data[i] + (
- router._format_routes(r.routes),
- router._format_external_gateway_info(r.external_gateway_info),
- osc_utils.format_list(r.availability_zones),
- )
- )
-
- def setUp(self):
- super(TestListRouter, self).setUp()
-
- # Get the command object to test
- self.cmd = router.ListRouter(self.app, self.namespace)
-
- self.network.routers = mock.Mock(return_value=self.routers)
-
- def test_router_list_no_options(self):
- arglist = []
- verifylist = [
- ('long', False),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- # In base command class Lister in cliff, abstract method take_action()
- # returns a tuple containing the column names and an iterable
- # containing the data to be listed.
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.routers.assert_called_once_with()
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, list(data))
-
- def test_router_list_long(self):
- arglist = [
- '--long',
- ]
- verifylist = [
- ('long', True),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- # In base command class Lister in cliff, abstract method take_action()
- # returns a tuple containing the column names and an iterable
- # containing the data to be listed.
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.routers.assert_called_once_with()
- self.assertEqual(self.columns_long, columns)
- self.assertEqual(self.data_long, list(data))
-
-
-class TestRemovePortFromRouter(TestRouter):
- '''Remove port from a Router '''
-
- _port = network_fakes.FakePort.create_one_port()
- _router = network_fakes.FakeRouter.create_one_router(
- attrs={'port': _port.id})
-
- def setUp(self):
- super(TestRemovePortFromRouter, self).setUp()
- self.network.router_remove_interface = mock.Mock()
- self.cmd = router.RemovePortFromRouter(self.app, self.namespace)
- self.network.find_router = mock.Mock(return_value=self._router)
- self.network.find_port = mock.Mock(return_value=self._port)
-
- def test_remove_port_no_option(self):
- arglist = []
- verifylist = []
-
- self.assertRaises(tests_utils.ParserException, self.check_parser,
- self.cmd, arglist, verifylist)
-
- def test_remove_port_required_options(self):
- arglist = [
- self._router.id,
- self._router.port,
- ]
- verifylist = [
- ('router', self._router.id),
- ('port', self._router.port),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- result = self.cmd.take_action(parsed_args)
-
- self.network.router_remove_interface.assert_called_with(
- self._router, **{'port_id': self._router.port})
- self.assertIsNone(result)
-
-
-class TestRemoveSubnetFromRouter(TestRouter):
- '''Remove subnet from Router '''
-
- _subnet = network_fakes.FakeSubnet.create_one_subnet()
- _router = network_fakes.FakeRouter.create_one_router(
- attrs={'subnet': _subnet.id})
-
- def setUp(self):
- super(TestRemoveSubnetFromRouter, self).setUp()
- self.network.router_remove_interface = mock.Mock()
- self.cmd = router.RemoveSubnetFromRouter(self.app, self.namespace)
- self.network.find_router = mock.Mock(return_value=self._router)
- self.network.find_subnet = mock.Mock(return_value=self._subnet)
-
- def test_remove_subnet_no_option(self):
- arglist = []
- verifylist = []
-
- self.assertRaises(tests_utils.ParserException, self.check_parser,
- self.cmd, arglist, verifylist)
-
- def test_remove_subnet_required_options(self):
- arglist = [
- self._router.id,
- self._router.subnet,
- ]
- verifylist = [
- ('subnet', self._router.subnet),
- ('router', self._router.id),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- result = self.cmd.take_action(parsed_args)
- self.network.router_remove_interface.assert_called_with(
- self._router, **{'subnet_id': self._router.subnet})
- self.assertIsNone(result)
-
-
-class TestSetRouter(TestRouter):
-
- # The router to set.
- _default_route = {'destination': '10.20.20.0/24', 'nexthop': '10.20.30.1'}
- _router = network_fakes.FakeRouter.create_one_router(
- attrs={'routes': [_default_route]}
- )
-
- def setUp(self):
- super(TestSetRouter, self).setUp()
-
- self.network.update_router = mock.Mock(return_value=None)
-
- self.network.find_router = mock.Mock(return_value=self._router)
-
- # Get the command object to test
- self.cmd = router.SetRouter(self.app, self.namespace)
-
- def test_set_this(self):
- arglist = [
- self._router.name,
- '--enable',
- '--distributed',
- '--name', 'noob',
- ]
- verifylist = [
- ('router', self._router.name),
- ('enable', True),
- ('distributed', True),
- ('name', 'noob'),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- result = self.cmd.take_action(parsed_args)
-
- attrs = {
- 'admin_state_up': True,
- 'distributed': True,
- 'name': 'noob',
- }
- self.network.update_router.assert_called_once_with(
- self._router, **attrs)
- self.assertIsNone(result)
-
- def test_set_that(self):
- arglist = [
- self._router.name,
- '--disable',
- '--centralized',
- ]
- verifylist = [
- ('router', self._router.name),
- ('disable', True),
- ('centralized', True),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- result = self.cmd.take_action(parsed_args)
-
- attrs = {
- 'admin_state_up': False,
- 'distributed': False,
- }
- self.network.update_router.assert_called_once_with(
- self._router, **attrs)
- self.assertIsNone(result)
-
- def test_set_distributed_centralized(self):
- arglist = [
- self._router.name,
- '--distributed',
- '--centralized',
- ]
- verifylist = [
- ('router', self._router.name),
- ('distributed', True),
- ('distributed', False),
- ]
-
- self.assertRaises(tests_utils.ParserException, self.check_parser,
- self.cmd, arglist, verifylist)
-
- def test_set_route(self):
- arglist = [
- self._router.name,
- '--route', 'destination=10.20.30.0/24,gateway=10.20.30.1',
- ]
- verifylist = [
- ('router', self._router.name),
- ('routes', [{'destination': '10.20.30.0/24',
- 'gateway': '10.20.30.1'}]),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- result = self.cmd.take_action(parsed_args)
-
- attrs = {
- 'routes': self._router.routes + [{'destination': '10.20.30.0/24',
- 'nexthop': '10.20.30.1'}],
- }
- self.network.update_router.assert_called_once_with(
- self._router, **attrs)
- self.assertIsNone(result)
-
- def test_set_no_route(self):
- arglist = [
- self._router.name,
- '--no-route',
- ]
- verifylist = [
- ('router', self._router.name),
- ('no_route', True),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- result = self.cmd.take_action(parsed_args)
-
- attrs = {
- 'routes': [],
- }
- self.network.update_router.assert_called_once_with(
- self._router, **attrs)
- self.assertIsNone(result)
-
- def test_set_route_no_route(self):
- arglist = [
- self._router.name,
- '--route', 'destination=10.20.30.0/24,gateway=10.20.30.1',
- '--no-route',
- ]
- verifylist = [
- ('router', self._router.name),
- ('routes', [{'destination': '10.20.30.0/24',
- 'gateway': '10.20.30.1'}]),
- ('no_route', True),
- ]
-
- self.assertRaises(tests_utils.ParserException, self.check_parser,
- self.cmd, arglist, verifylist)
-
- def test_set_clear_routes(self):
- arglist = [
- self._router.name,
- '--clear-routes',
- ]
- verifylist = [
- ('router', self._router.name),
- ('clear_routes', True),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- result = self.cmd.take_action(parsed_args)
-
- attrs = {
- 'routes': [],
- }
- self.network.update_router.assert_called_once_with(
- self._router, **attrs)
- self.assertIsNone(result)
-
- def test_set_route_clear_routes(self):
- arglist = [
- self._router.name,
- '--route', 'destination=10.20.30.0/24,gateway=10.20.30.1',
- '--clear-routes',
- ]
- verifylist = [
- ('router', self._router.name),
- ('routes', [{'destination': '10.20.30.0/24',
- 'gateway': '10.20.30.1'}]),
- ('clear_routes', True),
- ]
-
- self.assertRaises(tests_utils.ParserException, self.check_parser,
- self.cmd, arglist, verifylist)
-
- def test_set_nothing(self):
- arglist = [
- self._router.name,
- ]
- verifylist = [
- ('router', self._router.name),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- result = self.cmd.take_action(parsed_args)
-
- attrs = {}
- self.network.update_router.assert_called_once_with(
- self._router, **attrs)
- self.assertIsNone(result)
-
-
-class TestShowRouter(TestRouter):
-
- # The router to set.
- _router = network_fakes.FakeRouter.create_one_router()
-
- columns = (
- 'admin_state_up',
- 'availability_zone_hints',
- 'availability_zones',
- 'distributed',
- 'external_gateway_info',
- 'ha',
- 'id',
- 'name',
- 'project_id',
- 'routes',
- 'status',
- )
- data = (
- router._format_admin_state(_router.admin_state_up),
- osc_utils.format_list(_router.availability_zone_hints),
- osc_utils.format_list(_router.availability_zones),
- _router.distributed,
- router._format_external_gateway_info(_router.external_gateway_info),
- _router.ha,
- _router.id,
- _router.name,
- _router.tenant_id,
- router._format_routes(_router.routes),
- _router.status,
- )
-
- def setUp(self):
- super(TestShowRouter, self).setUp()
-
- self.network.find_router = mock.Mock(return_value=self._router)
-
- # Get the command object to test
- self.cmd = router.ShowRouter(self.app, self.namespace)
-
- def test_show_no_options(self):
- arglist = []
- verifylist = []
-
- self.assertRaises(tests_utils.ParserException, self.check_parser,
- self.cmd, arglist, verifylist)
-
- def test_show_all_options(self):
- arglist = [
- self._router.name,
- ]
- verifylist = [
- ('router', self._router.name),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.find_router.assert_called_once_with(
- self._router.name, ignore_missing=False)
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, data)
-
-
-class TestUnsetRouter(TestRouter):
-
- def setUp(self):
- super(TestUnsetRouter, self).setUp()
- self._testrouter = network_fakes.FakeRouter.create_one_router(
- {'routes': [{"destination": "192.168.101.1/24",
- "gateway": "172.24.4.3"},
- {"destination": "192.168.101.2/24",
- "gateway": "172.24.4.3"}], })
- self.fake_subnet = network_fakes.FakeSubnet.create_one_subnet()
- self.network.find_router = mock.Mock(return_value=self._testrouter)
- self.network.update_router = mock.Mock(return_value=None)
- # Get the command object to test
- self.cmd = router.UnsetRouter(self.app, self.namespace)
-
- def test_unset_router_params(self):
- arglist = [
- '--route', 'destination=192.168.101.1/24,gateway=172.24.4.3',
- self._testrouter.name,
- ]
- verifylist = [
- ('routes', [
- {"destination": "192.168.101.1/24", "gateway": "172.24.4.3"}]),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- result = self.cmd.take_action(parsed_args)
-
- attrs = {
- 'routes': [{"destination": "192.168.101.2/24",
- "nexthop": "172.24.4.3"}],
- }
- self.network.update_router.assert_called_once_with(
- self._testrouter, **attrs)
- self.assertIsNone(result)
-
- def test_unset_router_wrong_routes(self):
- arglist = [
- '--route', 'destination=192.168.101.1/24,gateway=172.24.4.2',
- self._testrouter.name,
- ]
- verifylist = [
- ('routes', [
- {"destination": "192.168.101.1/24", "gateway": "172.24.4.2"}]),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- self.assertRaises(exceptions.CommandError,
- self.cmd.take_action, parsed_args)
diff --git a/openstackclient/tests/network/v2/test_security_group.py b/openstackclient/tests/network/v2/test_security_group.py
deleted file mode 100644
index 15f4cffe..00000000
--- a/openstackclient/tests/network/v2/test_security_group.py
+++ /dev/null
@@ -1,770 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-#
-
-import mock
-from mock import call
-
-from osc_lib import exceptions
-
-from openstackclient.network.v2 import security_group
-from openstackclient.tests.compute.v2 import fakes as compute_fakes
-from openstackclient.tests.identity.v3 import fakes as identity_fakes
-from openstackclient.tests.network.v2 import fakes as network_fakes
-from openstackclient.tests import utils as tests_utils
-
-
-class TestSecurityGroupNetwork(network_fakes.TestNetworkV2):
-
- def setUp(self):
- super(TestSecurityGroupNetwork, self).setUp()
-
- # Get a shortcut to the network client
- self.network = self.app.client_manager.network
- # Get a shortcut to the ProjectManager Mock
- self.projects_mock = self.app.client_manager.identity.projects
- # Get a shortcut to the DomainManager Mock
- self.domains_mock = self.app.client_manager.identity.domains
-
-
-class TestSecurityGroupCompute(compute_fakes.TestComputev2):
-
- def setUp(self):
- super(TestSecurityGroupCompute, self).setUp()
-
- # Get a shortcut to the compute client
- self.compute = self.app.client_manager.compute
-
-
-class TestCreateSecurityGroupNetwork(TestSecurityGroupNetwork):
-
- project = identity_fakes.FakeProject.create_one_project()
- domain = identity_fakes.FakeDomain.create_one_domain()
- # The security group to be created.
- _security_group = \
- network_fakes.FakeSecurityGroup.create_one_security_group()
-
- columns = (
- 'description',
- 'id',
- 'name',
- 'project_id',
- 'rules',
- )
-
- data = (
- _security_group.description,
- _security_group.id,
- _security_group.name,
- _security_group.project_id,
- '',
- )
-
- def setUp(self):
- super(TestCreateSecurityGroupNetwork, self).setUp()
-
- self.network.create_security_group = mock.Mock(
- return_value=self._security_group)
-
- self.projects_mock.get.return_value = self.project
- self.domains_mock.get.return_value = self.domain
-
- # Get the command object to test
- self.cmd = security_group.CreateSecurityGroup(self.app, self.namespace)
-
- def test_create_no_options(self):
- self.assertRaises(tests_utils.ParserException,
- self.check_parser, self.cmd, [], [])
-
- def test_create_min_options(self):
- arglist = [
- self._security_group.name,
- ]
- verifylist = [
- ('name', self._security_group.name),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.create_security_group.assert_called_once_with(**{
- 'description': self._security_group.name,
- 'name': self._security_group.name,
- })
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, data)
-
- def test_create_all_options(self):
- arglist = [
- '--description', self._security_group.description,
- '--project', self.project.name,
- '--project-domain', self.domain.name,
- self._security_group.name,
- ]
- verifylist = [
- ('description', self._security_group.description),
- ('name', self._security_group.name),
- ('project', self.project.name),
- ('project_domain', self.domain.name),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.create_security_group.assert_called_once_with(**{
- 'description': self._security_group.description,
- 'name': self._security_group.name,
- 'tenant_id': self.project.id,
- })
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, data)
-
-
-class TestCreateSecurityGroupCompute(TestSecurityGroupCompute):
-
- project = identity_fakes.FakeProject.create_one_project()
- domain = identity_fakes.FakeDomain.create_one_domain()
- # The security group to be shown.
- _security_group = \
- compute_fakes.FakeSecurityGroup.create_one_security_group()
-
- columns = (
- 'description',
- 'id',
- 'name',
- 'project_id',
- 'rules',
- )
-
- data = (
- _security_group.description,
- _security_group.id,
- _security_group.name,
- _security_group.tenant_id,
- '',
- )
-
- def setUp(self):
- super(TestCreateSecurityGroupCompute, self).setUp()
-
- self.app.client_manager.network_endpoint_enabled = False
-
- self.compute.security_groups.create.return_value = self._security_group
-
- # Get the command object to test
- self.cmd = security_group.CreateSecurityGroup(self.app, None)
-
- def test_create_no_options(self):
- self.assertRaises(tests_utils.ParserException,
- self.check_parser, self.cmd, [], [])
-
- def test_create_network_options(self):
- arglist = [
- '--project', self.project.name,
- '--project-domain', self.domain.name,
- self._security_group.name,
- ]
- self.assertRaises(tests_utils.ParserException,
- self.check_parser, self.cmd, arglist, [])
-
- def test_create_min_options(self):
- arglist = [
- self._security_group.name,
- ]
- verifylist = [
- ('name', self._security_group.name),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.compute.security_groups.create.assert_called_once_with(
- self._security_group.name,
- self._security_group.name)
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, data)
-
- def test_create_all_options(self):
- arglist = [
- '--description', self._security_group.description,
- self._security_group.name,
- ]
- verifylist = [
- ('description', self._security_group.description),
- ('name', self._security_group.name),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.compute.security_groups.create.assert_called_once_with(
- self._security_group.name,
- self._security_group.description)
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, data)
-
-
-class TestDeleteSecurityGroupNetwork(TestSecurityGroupNetwork):
-
- # The security groups to be deleted.
- _security_groups = \
- network_fakes.FakeSecurityGroup.create_security_groups()
-
- def setUp(self):
- super(TestDeleteSecurityGroupNetwork, self).setUp()
-
- self.network.delete_security_group = mock.Mock(return_value=None)
-
- self.network.find_security_group = (
- network_fakes.FakeSecurityGroup.get_security_groups(
- self._security_groups)
- )
-
- # Get the command object to test
- self.cmd = security_group.DeleteSecurityGroup(self.app, self.namespace)
-
- def test_security_group_delete(self):
- arglist = [
- self._security_groups[0].name,
- ]
- verifylist = [
- ('group', [self._security_groups[0].name]),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- result = self.cmd.take_action(parsed_args)
-
- self.network.delete_security_group.assert_called_once_with(
- self._security_groups[0])
- self.assertIsNone(result)
-
- def test_multi_security_groups_delete(self):
- arglist = []
- verifylist = []
-
- for s in self._security_groups:
- arglist.append(s.name)
- verifylist = [
- ('group', arglist),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- result = self.cmd.take_action(parsed_args)
-
- calls = []
- for s in self._security_groups:
- calls.append(call(s))
- self.network.delete_security_group.assert_has_calls(calls)
- self.assertIsNone(result)
-
- def test_multi_security_groups_delete_with_exception(self):
- arglist = [
- self._security_groups[0].name,
- 'unexist_security_group',
- ]
- verifylist = [
- ('group',
- [self._security_groups[0].name, 'unexist_security_group']),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- find_mock_result = [self._security_groups[0], exceptions.CommandError]
- self.network.find_security_group = (
- mock.MagicMock(side_effect=find_mock_result)
- )
-
- try:
- self.cmd.take_action(parsed_args)
- self.fail('CommandError should be raised.')
- except exceptions.CommandError as e:
- self.assertEqual('1 of 2 groups failed to delete.', str(e))
-
- self.network.find_security_group.assert_any_call(
- self._security_groups[0].name, ignore_missing=False)
- self.network.find_security_group.assert_any_call(
- 'unexist_security_group', ignore_missing=False)
- self.network.delete_security_group.assert_called_once_with(
- self._security_groups[0]
- )
-
-
-class TestDeleteSecurityGroupCompute(TestSecurityGroupCompute):
-
- # The security groups to be deleted.
- _security_groups = \
- compute_fakes.FakeSecurityGroup.create_security_groups()
-
- def setUp(self):
- super(TestDeleteSecurityGroupCompute, self).setUp()
-
- self.app.client_manager.network_endpoint_enabled = False
-
- self.compute.security_groups.delete = mock.Mock(return_value=None)
-
- self.compute.security_groups.get = (
- compute_fakes.FakeSecurityGroup.get_security_groups(
- self._security_groups)
- )
-
- # Get the command object to test
- self.cmd = security_group.DeleteSecurityGroup(self.app, None)
-
- def test_security_group_delete(self):
- arglist = [
- self._security_groups[0].id,
- ]
- verifylist = [
- ('group', [self._security_groups[0].id]),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- result = self.cmd.take_action(parsed_args)
-
- self.compute.security_groups.delete.assert_called_once_with(
- self._security_groups[0].id)
- self.assertIsNone(result)
-
- def test_multi_security_groups_delete(self):
- arglist = []
- verifylist = []
-
- for s in self._security_groups:
- arglist.append(s.id)
- verifylist = [
- ('group', arglist),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- result = self.cmd.take_action(parsed_args)
-
- calls = []
- for s in self._security_groups:
- calls.append(call(s.id))
- self.compute.security_groups.delete.assert_has_calls(calls)
- self.assertIsNone(result)
-
- def test_multi_security_groups_delete_with_exception(self):
- arglist = [
- self._security_groups[0].id,
- 'unexist_security_group',
- ]
- verifylist = [
- ('group',
- [self._security_groups[0].id, 'unexist_security_group']),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- find_mock_result = [self._security_groups[0], exceptions.CommandError]
- self.compute.security_groups.get = (
- mock.MagicMock(side_effect=find_mock_result)
- )
- self.compute.security_groups.find.side_effect = (
- exceptions.NotFound(None))
-
- try:
- self.cmd.take_action(parsed_args)
- self.fail('CommandError should be raised.')
- except exceptions.CommandError as e:
- self.assertEqual('1 of 2 groups failed to delete.', str(e))
-
- self.compute.security_groups.get.assert_any_call(
- self._security_groups[0].id)
- self.compute.security_groups.get.assert_any_call(
- 'unexist_security_group')
- self.compute.security_groups.delete.assert_called_once_with(
- self._security_groups[0].id
- )
-
-
-class TestListSecurityGroupNetwork(TestSecurityGroupNetwork):
-
- # The security group to be listed.
- _security_groups = \
- network_fakes.FakeSecurityGroup.create_security_groups(count=3)
-
- columns = (
- 'ID',
- 'Name',
- 'Description',
- 'Project',
- )
-
- data = []
- for grp in _security_groups:
- data.append((
- grp.id,
- grp.name,
- grp.description,
- grp.tenant_id,
- ))
-
- def setUp(self):
- super(TestListSecurityGroupNetwork, self).setUp()
-
- self.network.security_groups = mock.Mock(
- return_value=self._security_groups)
-
- # Get the command object to test
- self.cmd = security_group.ListSecurityGroup(self.app, self.namespace)
-
- def test_security_group_list_no_options(self):
- arglist = []
- verifylist = [
- ('all_projects', False),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.security_groups.assert_called_once_with()
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, list(data))
-
- def test_security_group_list_all_projects(self):
- arglist = [
- '--all-projects',
- ]
- verifylist = [
- ('all_projects', True),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.security_groups.assert_called_once_with()
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, list(data))
-
-
-class TestListSecurityGroupCompute(TestSecurityGroupCompute):
-
- # The security group to be listed.
- _security_groups = \
- compute_fakes.FakeSecurityGroup.create_security_groups(count=3)
-
- columns = (
- 'ID',
- 'Name',
- 'Description',
- )
- columns_all_projects = (
- 'ID',
- 'Name',
- 'Description',
- 'Project',
- )
-
- data = []
- for grp in _security_groups:
- data.append((
- grp.id,
- grp.name,
- grp.description,
- ))
- data_all_projects = []
- for grp in _security_groups:
- data_all_projects.append((
- grp.id,
- grp.name,
- grp.description,
- grp.tenant_id,
- ))
-
- def setUp(self):
- super(TestListSecurityGroupCompute, self).setUp()
-
- self.app.client_manager.network_endpoint_enabled = False
- self.compute.security_groups.list.return_value = self._security_groups
-
- # Get the command object to test
- self.cmd = security_group.ListSecurityGroup(self.app, None)
-
- def test_security_group_list_no_options(self):
- arglist = []
- verifylist = [
- ('all_projects', False),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- kwargs = {'search_opts': {'all_tenants': False}}
- self.compute.security_groups.list.assert_called_once_with(**kwargs)
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, list(data))
-
- def test_security_group_list_all_projects(self):
- arglist = [
- '--all-projects',
- ]
- verifylist = [
- ('all_projects', True),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- kwargs = {'search_opts': {'all_tenants': True}}
- self.compute.security_groups.list.assert_called_once_with(**kwargs)
- self.assertEqual(self.columns_all_projects, columns)
- self.assertEqual(self.data_all_projects, list(data))
-
-
-class TestSetSecurityGroupNetwork(TestSecurityGroupNetwork):
-
- # The security group to be set.
- _security_group = \
- network_fakes.FakeSecurityGroup.create_one_security_group()
-
- def setUp(self):
- super(TestSetSecurityGroupNetwork, self).setUp()
-
- self.network.update_security_group = mock.Mock(return_value=None)
-
- self.network.find_security_group = mock.Mock(
- return_value=self._security_group)
-
- # Get the command object to test
- self.cmd = security_group.SetSecurityGroup(self.app, self.namespace)
-
- def test_set_no_options(self):
- self.assertRaises(tests_utils.ParserException,
- self.check_parser, self.cmd, [], [])
-
- def test_set_no_updates(self):
- arglist = [
- self._security_group.name,
- ]
- verifylist = [
- ('group', self._security_group.name),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- result = self.cmd.take_action(parsed_args)
-
- self.network.update_security_group.assert_called_once_with(
- self._security_group,
- **{}
- )
- self.assertIsNone(result)
-
- def test_set_all_options(self):
- new_name = 'new-' + self._security_group.name
- new_description = 'new-' + self._security_group.description
- arglist = [
- '--name', new_name,
- '--description', new_description,
- self._security_group.name,
- ]
- verifylist = [
- ('description', new_description),
- ('group', self._security_group.name),
- ('name', new_name),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- result = self.cmd.take_action(parsed_args)
-
- attrs = {
- 'description': new_description,
- 'name': new_name,
- }
- self.network.update_security_group.assert_called_once_with(
- self._security_group,
- **attrs
- )
- self.assertIsNone(result)
-
-
-class TestSetSecurityGroupCompute(TestSecurityGroupCompute):
-
- # The security group to be set.
- _security_group = \
- compute_fakes.FakeSecurityGroup.create_one_security_group()
-
- def setUp(self):
- super(TestSetSecurityGroupCompute, self).setUp()
-
- self.app.client_manager.network_endpoint_enabled = False
-
- self.compute.security_groups.update = mock.Mock(return_value=None)
-
- self.compute.security_groups.get = mock.Mock(
- return_value=self._security_group)
-
- # Get the command object to test
- self.cmd = security_group.SetSecurityGroup(self.app, None)
-
- def test_set_no_options(self):
- self.assertRaises(tests_utils.ParserException,
- self.check_parser, self.cmd, [], [])
-
- def test_set_no_updates(self):
- arglist = [
- self._security_group.name,
- ]
- verifylist = [
- ('group', self._security_group.name),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- result = self.cmd.take_action(parsed_args)
-
- self.compute.security_groups.update.assert_called_once_with(
- self._security_group,
- self._security_group.name,
- self._security_group.description
- )
- self.assertIsNone(result)
-
- def test_set_all_options(self):
- new_name = 'new-' + self._security_group.name
- new_description = 'new-' + self._security_group.description
- arglist = [
- '--name', new_name,
- '--description', new_description,
- self._security_group.name,
- ]
- verifylist = [
- ('description', new_description),
- ('group', self._security_group.name),
- ('name', new_name),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- result = self.cmd.take_action(parsed_args)
-
- self.compute.security_groups.update.assert_called_once_with(
- self._security_group,
- new_name,
- new_description
- )
- self.assertIsNone(result)
-
-
-class TestShowSecurityGroupNetwork(TestSecurityGroupNetwork):
-
- # The security group rule to be shown with the group.
- _security_group_rule = \
- network_fakes.FakeSecurityGroupRule.create_one_security_group_rule()
-
- # The security group to be shown.
- _security_group = \
- network_fakes.FakeSecurityGroup.create_one_security_group(
- attrs={'security_group_rules': [_security_group_rule._info]}
- )
-
- columns = (
- 'description',
- 'id',
- 'name',
- 'project_id',
- 'rules',
- )
-
- data = (
- _security_group.description,
- _security_group.id,
- _security_group.name,
- _security_group.project_id,
- security_group._format_network_security_group_rules(
- [_security_group_rule._info]),
- )
-
- def setUp(self):
- super(TestShowSecurityGroupNetwork, self).setUp()
-
- self.network.find_security_group = mock.Mock(
- return_value=self._security_group)
-
- # Get the command object to test
- self.cmd = security_group.ShowSecurityGroup(self.app, self.namespace)
-
- def test_show_no_options(self):
- self.assertRaises(tests_utils.ParserException,
- self.check_parser, self.cmd, [], [])
-
- def test_show_all_options(self):
- arglist = [
- self._security_group.id,
- ]
- verifylist = [
- ('group', self._security_group.id),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.find_security_group.assert_called_once_with(
- self._security_group.id, ignore_missing=False)
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, data)
-
-
-class TestShowSecurityGroupCompute(TestSecurityGroupCompute):
-
- # The security group rule to be shown with the group.
- _security_group_rule = \
- compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule()
-
- # The security group to be shown.
- _security_group = \
- compute_fakes.FakeSecurityGroup.create_one_security_group(
- attrs={'rules': [_security_group_rule._info]}
- )
-
- columns = (
- 'description',
- 'id',
- 'name',
- 'project_id',
- 'rules',
- )
-
- data = (
- _security_group.description,
- _security_group.id,
- _security_group.name,
- _security_group.tenant_id,
- security_group._format_compute_security_group_rules(
- [_security_group_rule._info]),
- )
-
- def setUp(self):
- super(TestShowSecurityGroupCompute, self).setUp()
-
- self.app.client_manager.network_endpoint_enabled = False
-
- self.compute.security_groups.get.return_value = self._security_group
-
- # Get the command object to test
- self.cmd = security_group.ShowSecurityGroup(self.app, None)
-
- def test_show_no_options(self):
- self.assertRaises(tests_utils.ParserException,
- self.check_parser, self.cmd, [], [])
-
- def test_show_all_options(self):
- arglist = [
- self._security_group.id,
- ]
- verifylist = [
- ('group', self._security_group.id),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.compute.security_groups.get.assert_called_once_with(
- self._security_group.id)
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, data)
diff --git a/openstackclient/tests/network/v2/test_security_group_rule.py b/openstackclient/tests/network/v2/test_security_group_rule.py
deleted file mode 100644
index 170989bf..00000000
--- a/openstackclient/tests/network/v2/test_security_group_rule.py
+++ /dev/null
@@ -1,1177 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-#
-
-import copy
-import mock
-from mock import call
-
-from osc_lib import exceptions
-
-from openstackclient.network import utils as network_utils
-from openstackclient.network.v2 import security_group_rule
-from openstackclient.tests.compute.v2 import fakes as compute_fakes
-from openstackclient.tests import fakes
-from openstackclient.tests.identity.v3 import fakes as identity_fakes
-from openstackclient.tests.network.v2 import fakes as network_fakes
-from openstackclient.tests import utils as tests_utils
-
-
-class TestSecurityGroupRuleNetwork(network_fakes.TestNetworkV2):
-
- def setUp(self):
- super(TestSecurityGroupRuleNetwork, self).setUp()
-
- # Get a shortcut to the network client
- self.network = self.app.client_manager.network
- # Get a shortcut to the ProjectManager Mock
- self.projects_mock = self.app.client_manager.identity.projects
- # Get a shortcut to the DomainManager Mock
- self.domains_mock = self.app.client_manager.identity.domains
-
-
-class TestSecurityGroupRuleCompute(compute_fakes.TestComputev2):
-
- def setUp(self):
- super(TestSecurityGroupRuleCompute, self).setUp()
-
- # Get a shortcut to the network client
- self.compute = self.app.client_manager.compute
-
-
-class TestCreateSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):
-
- project = identity_fakes.FakeProject.create_one_project()
- domain = identity_fakes.FakeDomain.create_one_domain()
- # The security group rule to be created.
- _security_group_rule = None
-
- # The security group that will contain the rule created.
- _security_group = \
- network_fakes.FakeSecurityGroup.create_one_security_group()
-
- expected_columns = (
- 'direction',
- 'ethertype',
- 'id',
- 'port_range_max',
- 'port_range_min',
- 'project_id',
- 'protocol',
- 'remote_group_id',
- 'remote_ip_prefix',
- 'security_group_id',
- )
-
- expected_data = None
-
- def _setup_security_group_rule(self, attrs=None):
- self._security_group_rule = \
- network_fakes.FakeSecurityGroupRule.create_one_security_group_rule(
- attrs)
- self.network.create_security_group_rule = mock.Mock(
- return_value=self._security_group_rule)
- self.expected_data = (
- self._security_group_rule.direction,
- self._security_group_rule.ethertype,
- self._security_group_rule.id,
- self._security_group_rule.port_range_max,
- self._security_group_rule.port_range_min,
- self._security_group_rule.project_id,
- self._security_group_rule.protocol,
- self._security_group_rule.remote_group_id,
- self._security_group_rule.remote_ip_prefix,
- self._security_group_rule.security_group_id,
- )
-
- def setUp(self):
- super(TestCreateSecurityGroupRuleNetwork, self).setUp()
-
- self.network.find_security_group = mock.Mock(
- return_value=self._security_group)
-
- self.projects_mock.get.return_value = self.project
- self.domains_mock.get.return_value = self.domain
-
- # Get the command object to test
- self.cmd = security_group_rule.CreateSecurityGroupRule(
- self.app, self.namespace)
-
- def test_create_no_options(self):
- self.assertRaises(tests_utils.ParserException,
- self.check_parser, self.cmd, [], [])
-
- def test_create_all_source_options(self):
- arglist = [
- '--src-ip', '10.10.0.0/24',
- '--src-group', self._security_group.id,
- self._security_group.id,
- ]
- self.assertRaises(tests_utils.ParserException,
- self.check_parser, self.cmd, arglist, [])
-
- def test_create_bad_ethertype(self):
- arglist = [
- '--ethertype', 'foo',
- self._security_group.id,
- ]
- self.assertRaises(tests_utils.ParserException,
- self.check_parser, self.cmd, arglist, [])
-
- def test_create_all_protocol_options(self):
- arglist = [
- '--protocol', 'tcp',
- '--proto', 'tcp',
- self._security_group.id,
- ]
- self.assertRaises(tests_utils.ParserException,
- self.check_parser, self.cmd, arglist, [])
-
- def test_create_all_port_range_options(self):
- arglist = [
- '--dst-port', '80:80',
- '--icmp-type', '3',
- '--icmp-code', '1',
- self._security_group.id,
- ]
- verifylist = [
- ('dst_port', (80, 80)),
- ('icmp_type', 3),
- ('icmp_code', 1),
- ('group', self._security_group.id),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- self.assertRaises(exceptions.CommandError, self.cmd.take_action,
- parsed_args)
-
- def test_create_default_rule(self):
- self._setup_security_group_rule({
- 'port_range_max': 443,
- 'port_range_min': 443,
- })
- arglist = [
- '--dst-port', str(self._security_group_rule.port_range_min),
- self._security_group.id,
- ]
- verifylist = [
- ('dst_port', (self._security_group_rule.port_range_min,
- self._security_group_rule.port_range_max)),
- ('group', self._security_group.id),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.create_security_group_rule.assert_called_once_with(**{
- 'direction': self._security_group_rule.direction,
- 'ethertype': self._security_group_rule.ethertype,
- 'port_range_max': self._security_group_rule.port_range_max,
- 'port_range_min': self._security_group_rule.port_range_min,
- 'protocol': self._security_group_rule.protocol,
- 'remote_ip_prefix': self._security_group_rule.remote_ip_prefix,
- 'security_group_id': self._security_group.id,
- })
- self.assertEqual(self.expected_columns, columns)
- self.assertEqual(self.expected_data, data)
-
- def test_create_proto_option(self):
- self._setup_security_group_rule({
- 'protocol': 'icmp',
- 'remote_ip_prefix': '10.0.2.0/24',
- })
- arglist = [
- '--proto', self._security_group_rule.protocol,
- '--src-ip', self._security_group_rule.remote_ip_prefix,
- self._security_group.id,
- ]
- verifylist = [
- ('proto', self._security_group_rule.protocol),
- ('protocol', None),
- ('src_ip', self._security_group_rule.remote_ip_prefix),
- ('group', self._security_group.id),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.create_security_group_rule.assert_called_once_with(**{
- 'direction': self._security_group_rule.direction,
- 'ethertype': self._security_group_rule.ethertype,
- 'protocol': self._security_group_rule.protocol,
- 'remote_ip_prefix': self._security_group_rule.remote_ip_prefix,
- 'security_group_id': self._security_group.id,
- })
- self.assertEqual(self.expected_columns, columns)
- self.assertEqual(self.expected_data, data)
-
- def test_create_source_group(self):
- self._setup_security_group_rule({
- 'port_range_max': 22,
- 'port_range_min': 22,
- 'remote_group_id': self._security_group.id,
- })
- arglist = [
- '--dst-port', str(self._security_group_rule.port_range_min),
- '--ingress',
- '--src-group', self._security_group.name,
- self._security_group.id,
- ]
- verifylist = [
- ('dst_port', (self._security_group_rule.port_range_min,
- self._security_group_rule.port_range_max)),
- ('ingress', True),
- ('src_group', self._security_group.name),
- ('group', self._security_group.id),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.create_security_group_rule.assert_called_once_with(**{
- 'direction': self._security_group_rule.direction,
- 'ethertype': self._security_group_rule.ethertype,
- 'port_range_max': self._security_group_rule.port_range_max,
- 'port_range_min': self._security_group_rule.port_range_min,
- 'protocol': self._security_group_rule.protocol,
- 'remote_group_id': self._security_group_rule.remote_group_id,
- 'security_group_id': self._security_group.id,
- })
- self.assertEqual(self.expected_columns, columns)
- self.assertEqual(self.expected_data, data)
-
- def test_create_source_ip(self):
- self._setup_security_group_rule({
- 'protocol': 'icmp',
- 'remote_ip_prefix': '10.0.2.0/24',
- })
- arglist = [
- '--protocol', self._security_group_rule.protocol,
- '--src-ip', self._security_group_rule.remote_ip_prefix,
- self._security_group.id,
- ]
- verifylist = [
- ('protocol', self._security_group_rule.protocol),
- ('src_ip', self._security_group_rule.remote_ip_prefix),
- ('group', self._security_group.id),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.create_security_group_rule.assert_called_once_with(**{
- 'direction': self._security_group_rule.direction,
- 'ethertype': self._security_group_rule.ethertype,
- 'protocol': self._security_group_rule.protocol,
- 'remote_ip_prefix': self._security_group_rule.remote_ip_prefix,
- 'security_group_id': self._security_group.id,
- })
- self.assertEqual(self.expected_columns, columns)
- self.assertEqual(self.expected_data, data)
-
- def test_create_network_options(self):
- self._setup_security_group_rule({
- 'direction': 'egress',
- 'ethertype': 'IPv6',
- 'port_range_max': 443,
- 'port_range_min': 443,
- 'protocol': '6',
- 'remote_group_id': None,
- 'remote_ip_prefix': None,
- })
- arglist = [
- '--dst-port', str(self._security_group_rule.port_range_min),
- '--egress',
- '--ethertype', self._security_group_rule.ethertype,
- '--project', self.project.name,
- '--project-domain', self.domain.name,
- '--protocol', self._security_group_rule.protocol,
- self._security_group.id,
- ]
- verifylist = [
- ('dst_port', (self._security_group_rule.port_range_min,
- self._security_group_rule.port_range_max)),
- ('egress', True),
- ('ethertype', self._security_group_rule.ethertype),
- ('project', self.project.name),
- ('project_domain', self.domain.name),
- ('protocol', self._security_group_rule.protocol),
- ('group', self._security_group.id),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.create_security_group_rule.assert_called_once_with(**{
- 'direction': self._security_group_rule.direction,
- 'ethertype': self._security_group_rule.ethertype,
- 'port_range_max': self._security_group_rule.port_range_max,
- 'port_range_min': self._security_group_rule.port_range_min,
- 'protocol': self._security_group_rule.protocol,
- 'security_group_id': self._security_group.id,
- 'tenant_id': self.project.id,
- })
- self.assertEqual(self.expected_columns, columns)
- self.assertEqual(self.expected_data, data)
-
- def test_create_tcp_with_icmp_type(self):
- arglist = [
- '--protocol', 'tcp',
- '--icmp-type', '15',
- self._security_group.id,
- ]
- verifylist = [
- ('protocol', 'tcp'),
- ('icmp_type', 15),
- ('group', self._security_group.id),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- self.assertRaises(exceptions.CommandError, self.cmd.take_action,
- parsed_args)
-
- def test_create_icmp_code(self):
- arglist = [
- '--protocol', '1',
- '--icmp-code', '1',
- self._security_group.id,
- ]
- verifylist = [
- ('protocol', '1'),
- ('icmp_code', 1),
- ('group', self._security_group.id),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- self.assertRaises(exceptions.CommandError, self.cmd.take_action,
- parsed_args)
-
- def test_create_icmp_type(self):
- self._setup_security_group_rule({
- 'port_range_min': 15,
- 'protocol': 'icmp',
- 'remote_ip_prefix': '0.0.0.0/0',
- })
- arglist = [
- '--icmp-type', str(self._security_group_rule.port_range_min),
- '--protocol', self._security_group_rule.protocol,
- self._security_group.id,
- ]
- verifylist = [
- ('dst_port', None),
- ('icmp_type', self._security_group_rule.port_range_min),
- ('icmp_code', None),
- ('protocol', self._security_group_rule.protocol),
- ('group', self._security_group.id),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.create_security_group_rule.assert_called_once_with(**{
- 'direction': self._security_group_rule.direction,
- 'ethertype': self._security_group_rule.ethertype,
- 'port_range_min': self._security_group_rule.port_range_min,
- 'protocol': self._security_group_rule.protocol,
- 'remote_ip_prefix': self._security_group_rule.remote_ip_prefix,
- 'security_group_id': self._security_group.id,
- })
- self.assertEqual(self.expected_columns, columns)
- self.assertEqual(self.expected_data, data)
-
- def test_create_ipv6_icmp_type_code(self):
- self._setup_security_group_rule({
- 'ethertype': 'IPv6',
- 'port_range_min': 139,
- 'port_range_max': 2,
- 'protocol': 'ipv6-icmp',
- })
- arglist = [
- '--icmp-type', str(self._security_group_rule.port_range_min),
- '--icmp-code', str(self._security_group_rule.port_range_max),
- '--protocol', self._security_group_rule.protocol,
- self._security_group.id,
- ]
- verifylist = [
- ('dst_port', None),
- ('icmp_type', self._security_group_rule.port_range_min),
- ('icmp_code', self._security_group_rule.port_range_max),
- ('protocol', self._security_group_rule.protocol),
- ('group', self._security_group.id),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.create_security_group_rule.assert_called_once_with(**{
- 'direction': self._security_group_rule.direction,
- 'ethertype': self._security_group_rule.ethertype,
- 'port_range_min': self._security_group_rule.port_range_min,
- 'port_range_max': self._security_group_rule.port_range_max,
- 'protocol': self._security_group_rule.protocol,
- 'security_group_id': self._security_group.id,
- })
- self.assertEqual(self.expected_columns, columns)
- self.assertEqual(self.expected_data, data)
-
- def test_create_icmpv6_type(self):
- self._setup_security_group_rule({
- 'ethertype': 'IPv6',
- 'port_range_min': 139,
- 'protocol': 'icmpv6',
- })
- arglist = [
- '--icmp-type', str(self._security_group_rule.port_range_min),
- '--protocol', self._security_group_rule.protocol,
- self._security_group.id,
- ]
- verifylist = [
- ('dst_port', None),
- ('icmp_type', self._security_group_rule.port_range_min),
- ('icmp_code', None),
- ('protocol', self._security_group_rule.protocol),
- ('group', self._security_group.id),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.create_security_group_rule.assert_called_once_with(**{
- 'direction': self._security_group_rule.direction,
- 'ethertype': self._security_group_rule.ethertype,
- 'port_range_min': self._security_group_rule.port_range_min,
- 'protocol': self._security_group_rule.protocol,
- 'security_group_id': self._security_group.id,
- })
- self.assertEqual(self.expected_columns, columns)
- self.assertEqual(self.expected_data, data)
-
-
-class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
-
- project = identity_fakes.FakeProject.create_one_project()
- domain = identity_fakes.FakeDomain.create_one_domain()
- # The security group rule to be created.
- _security_group_rule = None
-
- # The security group that will contain the rule created.
- _security_group = \
- compute_fakes.FakeSecurityGroup.create_one_security_group()
-
- def _setup_security_group_rule(self, attrs=None):
- self._security_group_rule = \
- compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule(
- attrs)
- self.compute.security_group_rules.create.return_value = \
- self._security_group_rule
- expected_columns, expected_data = \
- security_group_rule._format_security_group_rule_show(
- self._security_group_rule._info)
- return expected_columns, expected_data
-
- def setUp(self):
- super(TestCreateSecurityGroupRuleCompute, self).setUp()
-
- self.app.client_manager.network_endpoint_enabled = False
-
- self.compute.security_groups.get.return_value = self._security_group
-
- # Get the command object to test
- self.cmd = security_group_rule.CreateSecurityGroupRule(self.app, None)
-
- def test_create_no_options(self):
- self.assertRaises(tests_utils.ParserException,
- self.check_parser, self.cmd, [], [])
-
- def test_create_all_source_options(self):
- arglist = [
- '--src-ip', '10.10.0.0/24',
- '--src-group', self._security_group.id,
- self._security_group.id,
- ]
- self.assertRaises(tests_utils.ParserException,
- self.check_parser, self.cmd, arglist, [])
-
- def test_create_bad_protocol(self):
- arglist = [
- '--protocol', 'foo',
- self._security_group.id,
- ]
- self.assertRaises(tests_utils.ParserException,
- self.check_parser, self.cmd, arglist, [])
-
- def test_create_all_protocol_options(self):
- arglist = [
- '--protocol', 'tcp',
- '--proto', 'tcp',
- self._security_group.id,
- ]
- self.assertRaises(tests_utils.ParserException,
- self.check_parser, self.cmd, arglist, [])
-
- def test_create_network_options(self):
- arglist = [
- '--ingress',
- '--ethertype', 'IPv4',
- '--icmp-type', '3',
- '--icmp-code', '11',
- '--project', self.project.name,
- '--project-domain', self.domain.name,
- self._security_group.id,
- ]
- self.assertRaises(tests_utils.ParserException,
- self.check_parser, self.cmd, arglist, [])
-
- def test_create_default_rule(self):
- expected_columns, expected_data = self._setup_security_group_rule()
- dst_port = str(self._security_group_rule.from_port) + ':' + \
- str(self._security_group_rule.to_port)
- arglist = [
- '--dst-port', dst_port,
- self._security_group.id,
- ]
- verifylist = [
- ('dst_port', (self._security_group_rule.from_port,
- self._security_group_rule.to_port)),
- ('group', self._security_group.id),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.compute.security_group_rules.create.assert_called_once_with(
- self._security_group.id,
- self._security_group_rule.ip_protocol,
- self._security_group_rule.from_port,
- self._security_group_rule.to_port,
- self._security_group_rule.ip_range['cidr'],
- None,
- )
- self.assertEqual(expected_columns, columns)
- self.assertEqual(expected_data, data)
-
- def test_create_source_group(self):
- expected_columns, expected_data = self._setup_security_group_rule({
- 'from_port': 22,
- 'to_port': 22,
- 'group': {'name': self._security_group.name},
- })
- arglist = [
- '--dst-port', str(self._security_group_rule.from_port),
- '--src-group', self._security_group.name,
- self._security_group.id,
- ]
- verifylist = [
- ('dst_port', (self._security_group_rule.from_port,
- self._security_group_rule.to_port)),
- ('src_group', self._security_group.name),
- ('group', self._security_group.id),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.compute.security_group_rules.create.assert_called_once_with(
- self._security_group.id,
- self._security_group_rule.ip_protocol,
- self._security_group_rule.from_port,
- self._security_group_rule.to_port,
- self._security_group_rule.ip_range['cidr'],
- self._security_group.id,
- )
- self.assertEqual(expected_columns, columns)
- self.assertEqual(expected_data, data)
-
- def test_create_source_ip(self):
- expected_columns, expected_data = self._setup_security_group_rule({
- 'ip_protocol': 'icmp',
- 'from_port': -1,
- 'to_port': -1,
- 'ip_range': {'cidr': '10.0.2.0/24'},
- })
- arglist = [
- '--protocol', self._security_group_rule.ip_protocol,
- '--src-ip', self._security_group_rule.ip_range['cidr'],
- self._security_group.id,
- ]
- verifylist = [
- ('protocol', self._security_group_rule.ip_protocol),
- ('src_ip', self._security_group_rule.ip_range['cidr']),
- ('group', self._security_group.id),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.compute.security_group_rules.create.assert_called_once_with(
- self._security_group.id,
- self._security_group_rule.ip_protocol,
- self._security_group_rule.from_port,
- self._security_group_rule.to_port,
- self._security_group_rule.ip_range['cidr'],
- None,
- )
- self.assertEqual(expected_columns, columns)
- self.assertEqual(expected_data, data)
-
- def test_create_proto_option(self):
- expected_columns, expected_data = self._setup_security_group_rule({
- 'ip_protocol': 'icmp',
- 'from_port': -1,
- 'to_port': -1,
- 'ip_range': {'cidr': '10.0.2.0/24'},
- })
- arglist = [
- '--proto', self._security_group_rule.ip_protocol,
- '--src-ip', self._security_group_rule.ip_range['cidr'],
- self._security_group.id,
- ]
- verifylist = [
- ('proto', self._security_group_rule.ip_protocol),
- ('protocol', None),
- ('src_ip', self._security_group_rule.ip_range['cidr']),
- ('group', self._security_group.id),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.compute.security_group_rules.create.assert_called_once_with(
- self._security_group.id,
- self._security_group_rule.ip_protocol,
- self._security_group_rule.from_port,
- self._security_group_rule.to_port,
- self._security_group_rule.ip_range['cidr'],
- None,
- )
- self.assertEqual(expected_columns, columns)
- self.assertEqual(expected_data, data)
-
-
-class TestDeleteSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):
-
- # The security group rules to be deleted.
- _security_group_rules = \
- network_fakes.FakeSecurityGroupRule.create_security_group_rules(
- count=2)
-
- def setUp(self):
- super(TestDeleteSecurityGroupRuleNetwork, self).setUp()
-
- self.network.delete_security_group_rule = mock.Mock(return_value=None)
-
- self.network.find_security_group_rule = (
- network_fakes.FakeSecurityGroupRule.get_security_group_rules(
- self._security_group_rules)
- )
-
- # Get the command object to test
- self.cmd = security_group_rule.DeleteSecurityGroupRule(
- self.app, self.namespace)
-
- def test_security_group_rule_delete(self):
- arglist = [
- self._security_group_rules[0].id,
- ]
- verifylist = [
- ('rule', [self._security_group_rules[0].id]),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- result = self.cmd.take_action(parsed_args)
-
- self.network.delete_security_group_rule.assert_called_once_with(
- self._security_group_rules[0])
- self.assertIsNone(result)
-
- def test_multi_security_group_rules_delete(self):
- arglist = []
- verifylist = []
-
- for s in self._security_group_rules:
- arglist.append(s.id)
- verifylist = [
- ('rule', arglist),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- result = self.cmd.take_action(parsed_args)
-
- calls = []
- for s in self._security_group_rules:
- calls.append(call(s))
- self.network.delete_security_group_rule.assert_has_calls(calls)
- self.assertIsNone(result)
-
- def test_multi_security_group_rules_delete_with_exception(self):
- arglist = [
- self._security_group_rules[0].id,
- 'unexist_rule',
- ]
- verifylist = [
- ('rule',
- [self._security_group_rules[0].id, 'unexist_rule']),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- find_mock_result = [
- self._security_group_rules[0], exceptions.CommandError]
- self.network.find_security_group_rule = (
- mock.MagicMock(side_effect=find_mock_result)
- )
-
- try:
- self.cmd.take_action(parsed_args)
- self.fail('CommandError should be raised.')
- except exceptions.CommandError as e:
- self.assertEqual('1 of 2 rules failed to delete.', str(e))
-
- self.network.find_security_group_rule.assert_any_call(
- self._security_group_rules[0].id, ignore_missing=False)
- self.network.find_security_group_rule.assert_any_call(
- 'unexist_rule', ignore_missing=False)
- self.network.delete_security_group_rule.assert_called_once_with(
- self._security_group_rules[0]
- )
-
-
-class TestDeleteSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
-
- # The security group rule to be deleted.
- _security_group_rules = \
- compute_fakes.FakeSecurityGroupRule.create_security_group_rules(
- count=2)
-
- def setUp(self):
- super(TestDeleteSecurityGroupRuleCompute, self).setUp()
-
- self.app.client_manager.network_endpoint_enabled = False
-
- # Get the command object to test
- self.cmd = security_group_rule.DeleteSecurityGroupRule(self.app, None)
-
- def test_security_group_rule_delete(self):
- arglist = [
- self._security_group_rules[0].id,
- ]
- verifylist = [
- ('rule', [self._security_group_rules[0].id]),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- result = self.cmd.take_action(parsed_args)
-
- self.compute.security_group_rules.delete.assert_called_once_with(
- self._security_group_rules[0].id)
- self.assertIsNone(result)
-
- def test_multi_security_group_rules_delete(self):
- arglist = []
- verifylist = []
-
- for s in self._security_group_rules:
- arglist.append(s.id)
- verifylist = [
- ('rule', arglist),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- result = self.cmd.take_action(parsed_args)
-
- calls = []
- for s in self._security_group_rules:
- calls.append(call(s.id))
- self.compute.security_group_rules.delete.assert_has_calls(calls)
- self.assertIsNone(result)
-
- def test_multi_security_group_rules_delete_with_exception(self):
- arglist = [
- self._security_group_rules[0].id,
- 'unexist_rule',
- ]
- verifylist = [
- ('rule',
- [self._security_group_rules[0].id, 'unexist_rule']),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- find_mock_result = [None, exceptions.CommandError]
- self.compute.security_group_rules.delete = (
- mock.MagicMock(side_effect=find_mock_result)
- )
-
- try:
- self.cmd.take_action(parsed_args)
- self.fail('CommandError should be raised.')
- except exceptions.CommandError as e:
- self.assertEqual('1 of 2 rules failed to delete.', str(e))
-
- self.compute.security_group_rules.delete.assert_any_call(
- self._security_group_rules[0].id)
- self.compute.security_group_rules.delete.assert_any_call(
- 'unexist_rule')
-
-
-class TestListSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):
-
- # The security group to hold the rules.
- _security_group = \
- network_fakes.FakeSecurityGroup.create_one_security_group()
-
- # The security group rule to be listed.
- _security_group_rule_tcp = \
- network_fakes.FakeSecurityGroupRule.create_one_security_group_rule({
- 'protocol': 'tcp',
- 'port_range_max': 80,
- 'port_range_min': 80,
- 'security_group_id': _security_group.id,
- })
- _security_group_rule_icmp = \
- network_fakes.FakeSecurityGroupRule.create_one_security_group_rule({
- 'protocol': 'icmp',
- 'remote_ip_prefix': '10.0.2.0/24',
- 'security_group_id': _security_group.id,
- })
- _security_group.security_group_rules = [_security_group_rule_tcp._info,
- _security_group_rule_icmp._info]
- _security_group_rules = [_security_group_rule_tcp,
- _security_group_rule_icmp]
-
- expected_columns_with_group_and_long = (
- 'ID',
- 'IP Protocol',
- 'IP Range',
- 'Port Range',
- 'Direction',
- 'Ethertype',
- 'Remote Security Group',
- )
- expected_columns_no_group = (
- 'ID',
- 'IP Protocol',
- 'IP Range',
- 'Port Range',
- 'Remote Security Group',
- 'Security Group',
- )
-
- expected_data_with_group_and_long = []
- expected_data_no_group = []
- for _security_group_rule in _security_group_rules:
- expected_data_with_group_and_long.append((
- _security_group_rule.id,
- _security_group_rule.protocol,
- _security_group_rule.remote_ip_prefix,
- security_group_rule._format_network_port_range(
- _security_group_rule),
- _security_group_rule.direction,
- _security_group_rule.ethertype,
- _security_group_rule.remote_group_id,
- ))
- expected_data_no_group.append((
- _security_group_rule.id,
- _security_group_rule.protocol,
- _security_group_rule.remote_ip_prefix,
- security_group_rule._format_network_port_range(
- _security_group_rule),
- _security_group_rule.remote_group_id,
- _security_group_rule.security_group_id,
- ))
-
- def setUp(self):
- super(TestListSecurityGroupRuleNetwork, self).setUp()
-
- self.network.find_security_group = mock.Mock(
- return_value=self._security_group)
- self.network.security_group_rules = mock.Mock(
- return_value=self._security_group_rules)
-
- # Get the command object to test
- self.cmd = security_group_rule.ListSecurityGroupRule(
- self.app, self.namespace)
-
- def test_list_default(self):
- self._security_group_rule_tcp.port_range_min = 80
- parsed_args = self.check_parser(self.cmd, [], [])
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.security_group_rules.assert_called_once_with(**{})
- self.assertEqual(self.expected_columns_no_group, columns)
- self.assertEqual(self.expected_data_no_group, list(data))
-
- def test_list_with_group_and_long(self):
- self._security_group_rule_tcp.port_range_min = 80
- arglist = [
- '--long',
- self._security_group.id,
- ]
- verifylist = [
- ('long', True),
- ('group', self._security_group.id),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.security_group_rules.assert_called_once_with(**{
- 'security_group_id': self._security_group.id,
- })
- self.assertEqual(self.expected_columns_with_group_and_long, columns)
- self.assertEqual(self.expected_data_with_group_and_long, list(data))
-
- def test_list_with_ignored_options(self):
- self._security_group_rule_tcp.port_range_min = 80
- arglist = [
- '--all-projects',
- ]
- verifylist = [
- ('all_projects', True),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.security_group_rules.assert_called_once_with(**{})
- self.assertEqual(self.expected_columns_no_group, columns)
- self.assertEqual(self.expected_data_no_group, list(data))
-
-
-class TestListSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
-
- # The security group to hold the rules.
- _security_group = \
- compute_fakes.FakeSecurityGroup.create_one_security_group()
-
- # The security group rule to be listed.
- _security_group_rule_tcp = \
- compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule({
- 'ip_protocol': 'tcp',
- 'from_port': 80,
- 'to_port': 80,
- 'group': {'name': _security_group.name},
- })
- _security_group_rule_icmp = \
- compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule({
- 'ip_protocol': 'icmp',
- 'from_port': -1,
- 'to_port': -1,
- 'ip_range': {'cidr': '10.0.2.0/24'},
- 'group': {'name': _security_group.name},
- })
- _security_group.rules = [_security_group_rule_tcp._info,
- _security_group_rule_icmp._info]
-
- expected_columns_with_group = (
- 'ID',
- 'IP Protocol',
- 'IP Range',
- 'Port Range',
- 'Remote Security Group',
- )
- expected_columns_no_group = \
- expected_columns_with_group + ('Security Group',)
-
- expected_data_with_group = []
- expected_data_no_group = []
- for _security_group_rule in _security_group.rules:
- rule = network_utils.transform_compute_security_group_rule(
- _security_group_rule
- )
- expected_rule_with_group = (
- rule['id'],
- rule['ip_protocol'],
- rule['ip_range'],
- rule['port_range'],
- rule['remote_security_group'],
- )
- expected_rule_no_group = expected_rule_with_group + \
- (_security_group_rule['parent_group_id'],)
- expected_data_with_group.append(expected_rule_with_group)
- expected_data_no_group.append(expected_rule_no_group)
-
- def setUp(self):
- super(TestListSecurityGroupRuleCompute, self).setUp()
-
- self.app.client_manager.network_endpoint_enabled = False
-
- self.compute.security_groups.get.return_value = \
- self._security_group
- self.compute.security_groups.list.return_value = \
- [self._security_group]
-
- # Get the command object to test
- self.cmd = security_group_rule.ListSecurityGroupRule(self.app, None)
-
- def test_list_default(self):
- parsed_args = self.check_parser(self.cmd, [], [])
-
- columns, data = self.cmd.take_action(parsed_args)
- self.compute.security_groups.list.assert_called_once_with(
- search_opts={'all_tenants': False}
- )
- self.assertEqual(self.expected_columns_no_group, columns)
- self.assertEqual(self.expected_data_no_group, list(data))
-
- def test_list_with_group(self):
- arglist = [
- self._security_group.id,
- ]
- verifylist = [
- ('group', self._security_group.id),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
- self.compute.security_groups.get.assert_called_once_with(
- self._security_group.id
- )
- self.assertEqual(self.expected_columns_with_group, columns)
- self.assertEqual(self.expected_data_with_group, list(data))
-
- def test_list_all_projects(self):
- arglist = [
- '--all-projects',
- ]
- verifylist = [
- ('all_projects', True),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
- self.compute.security_groups.list.assert_called_once_with(
- search_opts={'all_tenants': True}
- )
- self.assertEqual(self.expected_columns_no_group, columns)
- self.assertEqual(self.expected_data_no_group, list(data))
-
- def test_list_with_ignored_options(self):
- arglist = [
- '--long',
- ]
- verifylist = [
- ('long', False),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
- self.compute.security_groups.list.assert_called_once_with(
- search_opts={'all_tenants': False}
- )
- self.assertEqual(self.expected_columns_no_group, columns)
- self.assertEqual(self.expected_data_no_group, list(data))
-
-
-class TestShowSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):
-
- # The security group rule to be shown.
- _security_group_rule = \
- network_fakes.FakeSecurityGroupRule.create_one_security_group_rule()
-
- columns = (
- 'direction',
- 'ethertype',
- 'id',
- 'port_range_max',
- 'port_range_min',
- 'project_id',
- 'protocol',
- 'remote_group_id',
- 'remote_ip_prefix',
- 'security_group_id',
- )
-
- data = (
- _security_group_rule.direction,
- _security_group_rule.ethertype,
- _security_group_rule.id,
- _security_group_rule.port_range_max,
- _security_group_rule.port_range_min,
- _security_group_rule.project_id,
- _security_group_rule.protocol,
- _security_group_rule.remote_group_id,
- _security_group_rule.remote_ip_prefix,
- _security_group_rule.security_group_id,
- )
-
- def setUp(self):
- super(TestShowSecurityGroupRuleNetwork, self).setUp()
-
- self.network.find_security_group_rule = mock.Mock(
- return_value=self._security_group_rule)
-
- # Get the command object to test
- self.cmd = security_group_rule.ShowSecurityGroupRule(
- self.app, self.namespace)
-
- def test_show_no_options(self):
- self.assertRaises(tests_utils.ParserException,
- self.check_parser, self.cmd, [], [])
-
- def test_show_all_options(self):
- arglist = [
- self._security_group_rule.id,
- ]
- verifylist = [
- ('rule', self._security_group_rule.id),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.find_security_group_rule.assert_called_once_with(
- self._security_group_rule.id, ignore_missing=False)
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, data)
-
-
-class TestShowSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
-
- # The security group rule to be shown.
- _security_group_rule = \
- compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule()
-
- columns, data = \
- security_group_rule._format_security_group_rule_show(
- _security_group_rule._info)
-
- def setUp(self):
- super(TestShowSecurityGroupRuleCompute, self).setUp()
-
- self.app.client_manager.network_endpoint_enabled = False
-
- # Build a security group fake customized for this test.
- security_group_rules = [self._security_group_rule._info]
- security_group = fakes.FakeResource(
- info=copy.deepcopy({'rules': security_group_rules}),
- loaded=True)
- security_group.rules = security_group_rules
- self.compute.security_groups.list.return_value = [security_group]
-
- # Get the command object to test
- self.cmd = security_group_rule.ShowSecurityGroupRule(self.app, None)
-
- def test_show_no_options(self):
- self.assertRaises(tests_utils.ParserException,
- self.check_parser, self.cmd, [], [])
-
- def test_show_all_options(self):
- arglist = [
- self._security_group_rule.id,
- ]
- verifylist = [
- ('rule', self._security_group_rule.id),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.compute.security_groups.list.assert_called_once_with()
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, data)
diff --git a/openstackclient/tests/network/v2/test_subnet.py b/openstackclient/tests/network/v2/test_subnet.py
deleted file mode 100644
index c117c6fd..00000000
--- a/openstackclient/tests/network/v2/test_subnet.py
+++ /dev/null
@@ -1,978 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-#
-
-import mock
-from mock import call
-
-from osc_lib import exceptions
-from osc_lib import utils
-
-from openstackclient.network.v2 import subnet as subnet_v2
-from openstackclient.tests.identity.v3 import fakes as identity_fakes_v3
-from openstackclient.tests.network.v2 import fakes as network_fakes
-from openstackclient.tests import utils as tests_utils
-
-
-class TestSubnet(network_fakes.TestNetworkV2):
-
- def setUp(self):
- super(TestSubnet, self).setUp()
-
- # Get a shortcut to the network client
- self.network = self.app.client_manager.network
- # Get a shortcut to the ProjectManager Mock
- self.projects_mock = self.app.client_manager.identity.projects
- # Get a shortcut to the DomainManager Mock
- self.domains_mock = self.app.client_manager.identity.domains
-
-
-class TestCreateSubnet(TestSubnet):
-
- project = identity_fakes_v3.FakeProject.create_one_project()
- domain = identity_fakes_v3.FakeDomain.create_one_domain()
- # An IPv4 subnet to be created with mostly default values
- _subnet = network_fakes.FakeSubnet.create_one_subnet(
- attrs={
- 'tenant_id': project.id,
- }
- )
-
- # Subnet pool to be used to create a subnet from a pool
- _subnet_pool = network_fakes.FakeSubnetPool.create_one_subnet_pool()
-
- # An IPv4 subnet to be created using a specific subnet pool
- _subnet_from_pool = network_fakes.FakeSubnet.create_one_subnet(
- attrs={
- 'tenant_id': project.id,
- 'subnetpool_id': _subnet_pool.id,
- 'dns_nameservers': ['8.8.8.8',
- '8.8.4.4'],
- 'host_routes': [{'destination': '10.20.20.0/24',
- 'nexthop': '10.20.20.1'},
- {'destination': '10.30.30.0/24',
- 'nexthop': '10.30.30.1'}],
- 'service_types': ['network:router_gateway',
- 'network:floatingip_agent_gateway'],
- }
- )
-
- # An IPv6 subnet to be created with most options specified
- _subnet_ipv6 = network_fakes.FakeSubnet.create_one_subnet(
- attrs={
- 'tenant_id': project.id,
- 'cidr': 'fe80:0:0:a00a::/64',
- 'enable_dhcp': True,
- 'dns_nameservers': ['fe80:27ff:a00a:f00f::ffff',
- 'fe80:37ff:a00a:f00f::ffff'],
- 'allocation_pools': [{'start': 'fe80::a00a:0:c0de:0:100',
- 'end': 'fe80::a00a:0:c0de:0:f000'},
- {'start': 'fe80::a00a:0:c0de:1:100',
- 'end': 'fe80::a00a:0:c0de:1:f000'}],
- 'host_routes': [{'destination': 'fe80:27ff:a00a:f00f::/64',
- 'nexthop': 'fe80:27ff:a00a:f00f::1'},
- {'destination': 'fe80:37ff:a00a:f00f::/64',
- 'nexthop': 'fe80:37ff:a00a:f00f::1'}],
- 'ip_version': 6,
- 'gateway_ip': 'fe80::a00a:0:c0de:0:1',
- 'ipv6_address_mode': 'slaac',
- 'ipv6_ra_mode': 'slaac',
- 'subnetpool_id': 'None',
- 'service_types': ['network:router_gateway',
- 'network:floatingip_agent_gateway'],
- }
- )
-
- # The network to be returned from find_network
- _network = network_fakes.FakeNetwork.create_one_network(
- attrs={
- 'id': _subnet.network_id,
- }
- )
-
- # The network segment to be returned from find_segment
- _network_segment = \
- network_fakes.FakeNetworkSegment.create_one_network_segment(
- attrs={
- 'network_id': _subnet.network_id,
- }
- )
-
- columns = (
- 'allocation_pools',
- 'cidr',
- 'dns_nameservers',
- 'enable_dhcp',
- 'gateway_ip',
- 'host_routes',
- 'id',
- 'ip_version',
- 'ipv6_address_mode',
- 'ipv6_ra_mode',
- 'name',
- 'network_id',
- 'project_id',
- 'segment_id',
- 'service_types',
- 'subnetpool_id',
- )
-
- data = (
- subnet_v2._format_allocation_pools(_subnet.allocation_pools),
- _subnet.cidr,
- utils.format_list(_subnet.dns_nameservers),
- _subnet.enable_dhcp,
- _subnet.gateway_ip,
- subnet_v2._format_host_routes(_subnet.host_routes),
- _subnet.id,
- _subnet.ip_version,
- _subnet.ipv6_address_mode,
- _subnet.ipv6_ra_mode,
- _subnet.name,
- _subnet.network_id,
- _subnet.project_id,
- _subnet.segment_id,
- utils.format_list(_subnet.service_types),
- _subnet.subnetpool_id,
- )
-
- data_subnet_pool = (
- subnet_v2._format_allocation_pools(_subnet_from_pool.allocation_pools),
- _subnet_from_pool.cidr,
- utils.format_list(_subnet_from_pool.dns_nameservers),
- _subnet_from_pool.enable_dhcp,
- _subnet_from_pool.gateway_ip,
- subnet_v2._format_host_routes(_subnet_from_pool.host_routes),
- _subnet_from_pool.id,
- _subnet_from_pool.ip_version,
- _subnet_from_pool.ipv6_address_mode,
- _subnet_from_pool.ipv6_ra_mode,
- _subnet_from_pool.name,
- _subnet_from_pool.network_id,
- _subnet_from_pool.project_id,
- _subnet_from_pool.segment_id,
- utils.format_list(_subnet_from_pool.service_types),
- _subnet_from_pool.subnetpool_id,
- )
-
- data_ipv6 = (
- subnet_v2._format_allocation_pools(_subnet_ipv6.allocation_pools),
- _subnet_ipv6.cidr,
- utils.format_list(_subnet_ipv6.dns_nameservers),
- _subnet_ipv6.enable_dhcp,
- _subnet_ipv6.gateway_ip,
- subnet_v2._format_host_routes(_subnet_ipv6.host_routes),
- _subnet_ipv6.id,
- _subnet_ipv6.ip_version,
- _subnet_ipv6.ipv6_address_mode,
- _subnet_ipv6.ipv6_ra_mode,
- _subnet_ipv6.name,
- _subnet_ipv6.network_id,
- _subnet_ipv6.project_id,
- _subnet_ipv6.segment_id,
- utils.format_list(_subnet_ipv6.service_types),
- _subnet_ipv6.subnetpool_id,
- )
-
- def setUp(self):
- super(TestCreateSubnet, self).setUp()
-
- # Get the command object to test
- self.cmd = subnet_v2.CreateSubnet(self.app, self.namespace)
-
- self.projects_mock.get.return_value = self.project
- self.domains_mock.get.return_value = self.domain
-
- # Mock SDK calls for all tests.
- self.network.find_network = mock.Mock(return_value=self._network)
- self.network.find_segment = mock.Mock(
- return_value=self._network_segment
- )
- self.network.find_subnet_pool = mock.Mock(
- return_value=self._subnet_pool
- )
-
- def test_create_no_options(self):
- arglist = []
- verifylist = []
-
- # Testing that a call without the required argument will fail and
- # throw a "ParserExecption"
- self.assertRaises(tests_utils.ParserException,
- self.check_parser, self.cmd, arglist, verifylist)
-
- def test_create_default_options(self):
- # Mock SDK calls for this test.
- self.network.create_subnet = mock.Mock(return_value=self._subnet)
- self._network.id = self._subnet.network_id
-
- arglist = [
- "--subnet-range", self._subnet.cidr,
- "--network", self._subnet.network_id,
- self._subnet.name,
- ]
- verifylist = [
- ('name', self._subnet.name),
- ('subnet_range', self._subnet.cidr),
- ('network', self._subnet.network_id),
- ('ip_version', self._subnet.ip_version),
- ('gateway', 'auto'),
-
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.create_subnet.assert_called_once_with(**{
- 'cidr': self._subnet.cidr,
- 'ip_version': self._subnet.ip_version,
- 'name': self._subnet.name,
- 'network_id': self._subnet.network_id,
- })
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, data)
-
- def test_create_from_subnet_pool_options(self):
- # Mock SDK calls for this test.
- self.network.create_subnet = \
- mock.Mock(return_value=self._subnet_from_pool)
- self._network.id = self._subnet_from_pool.network_id
-
- arglist = [
- self._subnet_from_pool.name,
- "--subnet-pool", self._subnet_from_pool.subnetpool_id,
- "--prefix-length", '24',
- "--network", self._subnet_from_pool.network_id,
- "--ip-version", str(self._subnet_from_pool.ip_version),
- "--gateway", self._subnet_from_pool.gateway_ip,
- "--dhcp",
- ]
-
- for dns_addr in self._subnet_from_pool.dns_nameservers:
- arglist.append('--dns-nameserver')
- arglist.append(dns_addr)
-
- for host_route in self._subnet_from_pool.host_routes:
- arglist.append('--host-route')
- value = 'gateway=' + host_route.get('nexthop', '') + \
- ',destination=' + host_route.get('destination', '')
- arglist.append(value)
-
- for service_type in self._subnet_from_pool.service_types:
- arglist.append('--service-type')
- arglist.append(service_type)
-
- verifylist = [
- ('name', self._subnet_from_pool.name),
- ('prefix_length', '24'),
- ('network', self._subnet_from_pool.network_id),
- ('ip_version', self._subnet_from_pool.ip_version),
- ('gateway', self._subnet_from_pool.gateway_ip),
- ('dns_nameservers', self._subnet_from_pool.dns_nameservers),
- ('dhcp', self._subnet_from_pool.enable_dhcp),
- ('host_routes', subnet_v2.convert_entries_to_gateway(
- self._subnet_from_pool.host_routes)),
- ('subnet_pool', self._subnet_from_pool.subnetpool_id),
- ('service_types', self._subnet_from_pool.service_types),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.create_subnet.assert_called_once_with(**{
- 'dns_nameservers': self._subnet_from_pool.dns_nameservers,
- 'enable_dhcp': self._subnet_from_pool.enable_dhcp,
- 'gateway_ip': self._subnet_from_pool.gateway_ip,
- 'host_routes': self._subnet_from_pool.host_routes,
- 'ip_version': self._subnet_from_pool.ip_version,
- 'name': self._subnet_from_pool.name,
- 'network_id': self._subnet_from_pool.network_id,
- 'prefixlen': '24',
- 'subnetpool_id': self._subnet_from_pool.subnetpool_id,
- 'service_types': self._subnet_from_pool.service_types,
- })
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data_subnet_pool, data)
-
- def test_create_options_subnet_range_ipv6(self):
- # Mock SDK calls for this test.
- self.network.create_subnet = mock.Mock(return_value=self._subnet_ipv6)
- self._network.id = self._subnet_ipv6.network_id
-
- arglist = [
- self._subnet_ipv6.name,
- "--subnet-range", self._subnet_ipv6.cidr,
- "--network", self._subnet_ipv6.network_id,
- "--ip-version", str(self._subnet_ipv6.ip_version),
- "--ipv6-ra-mode", self._subnet_ipv6.ipv6_ra_mode,
- "--ipv6-address-mode", self._subnet_ipv6.ipv6_address_mode,
- "--gateway", self._subnet_ipv6.gateway_ip,
- "--dhcp",
- ]
-
- for dns_addr in self._subnet_ipv6.dns_nameservers:
- arglist.append('--dns-nameserver')
- arglist.append(dns_addr)
-
- for host_route in self._subnet_ipv6.host_routes:
- arglist.append('--host-route')
- value = 'gateway=' + host_route.get('nexthop', '') + \
- ',destination=' + host_route.get('destination', '')
- arglist.append(value)
-
- for pool in self._subnet_ipv6.allocation_pools:
- arglist.append('--allocation-pool')
- value = 'start=' + pool.get('start', '') + \
- ',end=' + pool.get('end', '')
- arglist.append(value)
-
- for service_type in self._subnet_ipv6.service_types:
- arglist.append('--service-type')
- arglist.append(service_type)
-
- verifylist = [
- ('name', self._subnet_ipv6.name),
- ('subnet_range', self._subnet_ipv6.cidr),
- ('network', self._subnet_ipv6.network_id),
- ('ip_version', self._subnet_ipv6.ip_version),
- ('ipv6_ra_mode', self._subnet_ipv6.ipv6_ra_mode),
- ('ipv6_address_mode', self._subnet_ipv6.ipv6_address_mode),
- ('gateway', self._subnet_ipv6.gateway_ip),
- ('dns_nameservers', self._subnet_ipv6.dns_nameservers),
- ('dhcp', self._subnet_ipv6.enable_dhcp),
- ('host_routes', subnet_v2.convert_entries_to_gateway(
- self._subnet_ipv6.host_routes)),
- ('allocation_pools', self._subnet_ipv6.allocation_pools),
- ('service_types', self._subnet_ipv6.service_types),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.create_subnet.assert_called_once_with(**{
- 'cidr': self._subnet_ipv6.cidr,
- 'dns_nameservers': self._subnet_ipv6.dns_nameservers,
- 'enable_dhcp': self._subnet_ipv6.enable_dhcp,
- 'gateway_ip': self._subnet_ipv6.gateway_ip,
- 'host_routes': self._subnet_ipv6.host_routes,
- 'ip_version': self._subnet_ipv6.ip_version,
- 'ipv6_address_mode': self._subnet_ipv6.ipv6_address_mode,
- 'ipv6_ra_mode': self._subnet_ipv6.ipv6_ra_mode,
- 'name': self._subnet_ipv6.name,
- 'network_id': self._subnet_ipv6.network_id,
- 'allocation_pools': self._subnet_ipv6.allocation_pools,
- 'service_types': self._subnet_ipv6.service_types,
- })
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data_ipv6, data)
-
- def test_create_no_beta_command_options(self):
- arglist = [
- "--subnet-range", self._subnet.cidr,
- "--network-segment", self._network_segment.id,
- "--network", self._subnet.network_id,
- self._subnet.name,
- ]
- verifylist = [
- ('name', self._subnet.name),
- ('subnet_range', self._subnet.cidr),
- ('network-segment', self._network_segment.id),
- ('network', self._subnet.network_id),
- ]
- self.app.options.os_beta_command = False
- self.assertRaises(tests_utils.ParserException,
- self.check_parser, self.cmd, arglist, verifylist)
-
- def test_create_with_network_segment(self):
- # Mock SDK calls for this test.
- self.network.create_subnet = mock.Mock(return_value=self._subnet)
- self._network.id = self._subnet.network_id
-
- arglist = [
- "--subnet-range", self._subnet.cidr,
- "--network-segment", self._network_segment.id,
- "--network", self._subnet.network_id,
- self._subnet.name,
- ]
- verifylist = [
- ('name', self._subnet.name),
- ('subnet_range', self._subnet.cidr),
- ('network_segment', self._network_segment.id),
- ('network', self._subnet.network_id),
- ('ip_version', self._subnet.ip_version),
- ('gateway', 'auto'),
-
- ]
-
- self.app.options.os_beta_command = True
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.create_subnet.assert_called_once_with(**{
- 'cidr': self._subnet.cidr,
- 'ip_version': self._subnet.ip_version,
- 'name': self._subnet.name,
- 'network_id': self._subnet.network_id,
- 'segment_id': self._network_segment.id,
- })
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, data)
-
-
-class TestDeleteSubnet(TestSubnet):
-
- # The subnets to delete.
- _subnets = network_fakes.FakeSubnet.create_subnets(count=2)
-
- def setUp(self):
- super(TestDeleteSubnet, self).setUp()
-
- self.network.delete_subnet = mock.Mock(return_value=None)
-
- self.network.find_subnet = (
- network_fakes.FakeSubnet.get_subnets(self._subnets))
-
- # Get the command object to test
- self.cmd = subnet_v2.DeleteSubnet(self.app, self.namespace)
-
- def test_subnet_delete(self):
- arglist = [
- self._subnets[0].name,
- ]
- verifylist = [
- ('subnet', [self._subnets[0].name]),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- result = self.cmd.take_action(parsed_args)
- self.network.delete_subnet.assert_called_once_with(self._subnets[0])
- self.assertIsNone(result)
-
- def test_multi_subnets_delete(self):
- arglist = []
- verifylist = []
-
- for s in self._subnets:
- arglist.append(s.name)
- verifylist = [
- ('subnet', arglist),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- result = self.cmd.take_action(parsed_args)
-
- calls = []
- for s in self._subnets:
- calls.append(call(s))
- self.network.delete_subnet.assert_has_calls(calls)
- self.assertIsNone(result)
-
- def test_multi_subnets_delete_with_exception(self):
- arglist = [
- self._subnets[0].name,
- 'unexist_subnet',
- ]
- verifylist = [
- ('subnet',
- [self._subnets[0].name, 'unexist_subnet']),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- find_mock_result = [self._subnets[0], exceptions.CommandError]
- self.network.find_subnet = (
- mock.MagicMock(side_effect=find_mock_result)
- )
-
- try:
- self.cmd.take_action(parsed_args)
- self.fail('CommandError should be raised.')
- except exceptions.CommandError as e:
- self.assertEqual('1 of 2 subnets failed to delete.', str(e))
-
- self.network.find_subnet.assert_any_call(
- self._subnets[0].name, ignore_missing=False)
- self.network.find_subnet.assert_any_call(
- 'unexist_subnet', ignore_missing=False)
- self.network.delete_subnet.assert_called_once_with(
- self._subnets[0]
- )
-
-
-class TestListSubnet(TestSubnet):
- # The subnets going to be listed up.
- _subnet = network_fakes.FakeSubnet.create_subnets(count=3)
-
- columns = (
- 'ID',
- 'Name',
- 'Network',
- 'Subnet',
- )
- columns_long = columns + (
- 'Project',
- 'DHCP',
- 'Name Servers',
- 'Allocation Pools',
- 'Host Routes',
- 'IP Version',
- 'Gateway',
- 'Service Types',
- )
-
- data = []
- for subnet in _subnet:
- data.append((
- subnet.id,
- subnet.name,
- subnet.network_id,
- subnet.cidr,
- ))
-
- data_long = []
- for subnet in _subnet:
- data_long.append((
- subnet.id,
- subnet.name,
- subnet.network_id,
- subnet.cidr,
- subnet.tenant_id,
- subnet.enable_dhcp,
- utils.format_list(subnet.dns_nameservers),
- subnet_v2._format_allocation_pools(subnet.allocation_pools),
- utils.format_list(subnet.host_routes),
- subnet.ip_version,
- subnet.gateway_ip,
- utils.format_list(subnet.service_types),
- ))
-
- def setUp(self):
- super(TestListSubnet, self).setUp()
-
- # Get the command object to test
- self.cmd = subnet_v2.ListSubnet(self.app, self.namespace)
-
- self.network.subnets = mock.Mock(return_value=self._subnet)
-
- def test_subnet_list_no_options(self):
- arglist = []
- verifylist = [
- ('long', False),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.subnets.assert_called_once_with()
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, list(data))
-
- def test_subnet_list_long(self):
- arglist = [
- '--long',
- ]
- verifylist = [
- ('long', True),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.subnets.assert_called_once_with()
- self.assertEqual(self.columns_long, columns)
- self.assertEqual(self.data_long, list(data))
-
- def test_subnet_list_ip_version(self):
- arglist = [
- '--ip-version', str(4),
- ]
- verifylist = [
- ('ip_version', 4),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
- filters = {'ip_version': 4}
-
- self.network.subnets.assert_called_once_with(**filters)
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, list(data))
-
- def test_subnet_list_dhcp(self):
- arglist = [
- '--dhcp',
- ]
- verifylist = [
- ('dhcp', True),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
- filters = {'enable_dhcp': True}
-
- self.network.subnets.assert_called_once_with(**filters)
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, list(data))
-
- def test_subnet_list_no_dhcp(self):
- arglist = [
- '--no-dhcp',
- ]
- verifylist = [
- ('no_dhcp', True),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
- filters = {'enable_dhcp': False}
-
- self.network.subnets.assert_called_once_with(**filters)
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, list(data))
-
- def test_subnet_list_service_type(self):
- arglist = [
- '--service-type', 'network:router_gateway',
- ]
- verifylist = [
- ('service_types', ['network:router_gateway']),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
- filters = {'service_types': ['network:router_gateway']}
-
- self.network.subnets.assert_called_once_with(**filters)
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, list(data))
-
- def test_subnet_list_service_type_multiple(self):
- arglist = [
- '--service-type', 'network:router_gateway',
- '--service-type', 'network:floatingip_agent_gateway',
- ]
- verifylist = [
- ('service_types', ['network:router_gateway',
- 'network:floatingip_agent_gateway']),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
- filters = {'service_types': ['network:router_gateway',
- 'network:floatingip_agent_gateway']}
-
- self.network.subnets.assert_called_once_with(**filters)
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, list(data))
-
-
-class TestSetSubnet(TestSubnet):
-
- _subnet = network_fakes.FakeSubnet.create_one_subnet()
-
- def setUp(self):
- super(TestSetSubnet, self).setUp()
- self.network.update_subnet = mock.Mock(return_value=None)
- self.network.find_subnet = mock.Mock(return_value=self._subnet)
- self.cmd = subnet_v2.SetSubnet(self.app, self.namespace)
-
- def test_set_this(self):
- arglist = [
- "--name", "new_subnet",
- "--dhcp",
- "--gateway", self._subnet.gateway_ip,
- self._subnet.name,
- ]
- verifylist = [
- ('name', "new_subnet"),
- ('dhcp', True),
- ('gateway', self._subnet.gateway_ip),
- ('subnet', self._subnet.name),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- result = self.cmd.take_action(parsed_args)
- attrs = {
- 'enable_dhcp': True,
- 'gateway_ip': self._subnet.gateway_ip,
- 'name': "new_subnet",
- }
- self.network.update_subnet.assert_called_with(self._subnet, **attrs)
- self.assertIsNone(result)
-
- def test_set_that(self):
- arglist = [
- "--name", "new_subnet",
- "--no-dhcp",
- "--gateway", "none",
- self._subnet.name,
- ]
- verifylist = [
- ('name', "new_subnet"),
- ('no_dhcp', True),
- ('gateway', "none"),
- ('subnet', self._subnet.name),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- result = self.cmd.take_action(parsed_args)
- attrs = {
- 'enable_dhcp': False,
- 'gateway_ip': None,
- 'name': "new_subnet",
- }
- self.network.update_subnet.assert_called_with(self._subnet, **attrs)
- self.assertIsNone(result)
-
- def test_set_nothing(self):
- arglist = [self._subnet.name, ]
- verifylist = [('subnet', self._subnet.name)]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- result = self.cmd.take_action(parsed_args)
-
- attrs = {}
- self.network.update_subnet.assert_called_with(self._subnet, **attrs)
- self.assertIsNone(result)
-
- def test_append_options(self):
- _testsubnet = network_fakes.FakeSubnet.create_one_subnet(
- {'dns_nameservers': ["10.0.0.1"],
- 'service_types': ["network:router_gateway"]})
- self.network.find_subnet = mock.Mock(return_value=_testsubnet)
- arglist = [
- '--dns-nameserver', '10.0.0.2',
- '--service-type', 'network:floatingip_agent_gateway',
- _testsubnet.name,
- ]
- verifylist = [
- ('dns_nameservers', ['10.0.0.2']),
- ('service_types', ['network:floatingip_agent_gateway']),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- result = self.cmd.take_action(parsed_args)
- attrs = {
- 'dns_nameservers': ['10.0.0.2', '10.0.0.1'],
- 'service_types': ['network:floatingip_agent_gateway',
- 'network:router_gateway'],
- }
- self.network.update_subnet.assert_called_once_with(
- _testsubnet, **attrs)
- self.assertIsNone(result)
-
-
-class TestShowSubnet(TestSubnet):
- # The subnets to be shown
- _subnet = network_fakes.FakeSubnet.create_one_subnet()
-
- columns = (
- 'allocation_pools',
- 'cidr',
- 'dns_nameservers',
- 'enable_dhcp',
- 'gateway_ip',
- 'host_routes',
- 'id',
- 'ip_version',
- 'ipv6_address_mode',
- 'ipv6_ra_mode',
- 'name',
- 'network_id',
- 'project_id',
- 'segment_id',
- 'service_types',
- 'subnetpool_id',
- )
-
- data = (
- subnet_v2._format_allocation_pools(_subnet.allocation_pools),
- _subnet.cidr,
- utils.format_list(_subnet.dns_nameservers),
- _subnet.enable_dhcp,
- _subnet.gateway_ip,
- utils.format_list(_subnet.host_routes),
- _subnet.id,
- _subnet.ip_version,
- _subnet.ipv6_address_mode,
- _subnet.ipv6_ra_mode,
- _subnet.name,
- _subnet.network_id,
- _subnet.tenant_id,
- _subnet.segment_id,
- utils.format_list(_subnet.service_types),
- _subnet.subnetpool_id,
- )
-
- def setUp(self):
- super(TestShowSubnet, self).setUp()
-
- # Get the command object to test
- self.cmd = subnet_v2.ShowSubnet(self.app, self.namespace)
-
- self.network.find_subnet = mock.Mock(return_value=self._subnet)
-
- def test_show_no_options(self):
- arglist = []
- verifylist = []
-
- # Testing that a call without the required argument will fail and
- # throw a "ParserExecption"
- self.assertRaises(tests_utils.ParserException,
- self.check_parser, self.cmd, arglist, verifylist)
-
- def test_show_all_options(self):
- arglist = [
- self._subnet.name,
- ]
- verifylist = [
- ('subnet', self._subnet.name),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.find_subnet.assert_called_once_with(
- self._subnet.name, ignore_missing=False)
-
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, data)
-
-
-class TestUnsetSubnet(TestSubnet):
-
- def setUp(self):
- super(TestUnsetSubnet, self).setUp()
- self._testsubnet = network_fakes.FakeSubnet.create_one_subnet(
- {'dns_nameservers': ['8.8.8.8',
- '8.8.8.4'],
- 'host_routes': [{'destination': '10.20.20.0/24',
- 'nexthop': '10.20.20.1'},
- {'destination': '10.30.30.30/24',
- 'nexthop': '10.30.30.1'}],
- 'allocation_pools': [{'start': '8.8.8.100',
- 'end': '8.8.8.150'},
- {'start': '8.8.8.160',
- 'end': '8.8.8.170'}],
- 'service_types': ['network:router_gateway',
- 'network:floatingip_agent_gateway'], })
- self.network.find_subnet = mock.Mock(return_value=self._testsubnet)
- self.network.update_subnet = mock.Mock(return_value=None)
- # Get the command object to test
- self.cmd = subnet_v2.UnsetSubnet(self.app, self.namespace)
-
- def test_unset_subnet_params(self):
- arglist = [
- '--dns-nameserver', '8.8.8.8',
- '--host-route', 'destination=10.30.30.30/24,gateway=10.30.30.1',
- '--allocation-pool', 'start=8.8.8.100,end=8.8.8.150',
- '--service-type', 'network:router_gateway',
- self._testsubnet.name,
- ]
- verifylist = [
- ('dns_nameservers', ['8.8.8.8']),
- ('host_routes', [{
- "destination": "10.30.30.30/24", "gateway": "10.30.30.1"}]),
- ('allocation_pools', [{
- 'start': '8.8.8.100', 'end': '8.8.8.150'}]),
- ('service_types', ['network:router_gateway']),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- result = self.cmd.take_action(parsed_args)
-
- attrs = {
- 'dns_nameservers': ['8.8.8.4'],
- 'host_routes': [{
- "destination": "10.20.20.0/24", "nexthop": "10.20.20.1"}],
- 'allocation_pools': [{'start': '8.8.8.160', 'end': '8.8.8.170'}],
- 'service_types': ['network:floatingip_agent_gateway'],
- }
- self.network.update_subnet.assert_called_once_with(
- self._testsubnet, **attrs)
- self.assertIsNone(result)
-
- def test_unset_subnet_wrong_host_routes(self):
- arglist = [
- '--dns-nameserver', '8.8.8.8',
- '--host-route', 'destination=10.30.30.30/24,gateway=10.30.30.2',
- '--allocation-pool', 'start=8.8.8.100,end=8.8.8.150',
- self._testsubnet.name,
- ]
- verifylist = [
- ('dns_nameservers', ['8.8.8.8']),
- ('host_routes', [{
- "destination": "10.30.30.30/24", "gateway": "10.30.30.2"}]),
- ('allocation_pools', [{
- 'start': '8.8.8.100', 'end': '8.8.8.150'}]),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- self.assertRaises(exceptions.CommandError,
- self.cmd.take_action, parsed_args)
-
- def test_unset_subnet_wrong_allocation_pool(self):
- arglist = [
- '--dns-nameserver', '8.8.8.8',
- '--host-route', 'destination=10.30.30.30/24,gateway=10.30.30.1',
- '--allocation-pool', 'start=8.8.8.100,end=8.8.8.156',
- self._testsubnet.name,
- ]
- verifylist = [
- ('dns_nameservers', ['8.8.8.8']),
- ('host_routes', [{
- "destination": "10.30.30.30/24", "gateway": "10.30.30.1"}]),
- ('allocation_pools', [{
- 'start': '8.8.8.100', 'end': '8.8.8.156'}]),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- self.assertRaises(exceptions.CommandError,
- self.cmd.take_action, parsed_args)
-
- def test_unset_subnet_wrong_dns_nameservers(self):
- arglist = [
- '--dns-nameserver', '8.8.8.1',
- '--host-route', 'destination=10.30.30.30/24,gateway=10.30.30.1',
- '--allocation-pool', 'start=8.8.8.100,end=8.8.8.150',
- self._testsubnet.name,
- ]
- verifylist = [
- ('dns_nameservers', ['8.8.8.1']),
- ('host_routes', [{
- "destination": "10.30.30.30/24", "gateway": "10.30.30.1"}]),
- ('allocation_pools', [{
- 'start': '8.8.8.100', 'end': '8.8.8.150'}]),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- self.assertRaises(exceptions.CommandError,
- self.cmd.take_action, parsed_args)
-
- def test_unset_subnet_wrong_service_type(self):
- arglist = [
- '--dns-nameserver', '8.8.8.8',
- '--host-route', 'destination=10.30.30.30/24,gateway=10.30.30.1',
- '--allocation-pool', 'start=8.8.8.100,end=8.8.8.150',
- '--service-type', 'network:dhcp',
- self._testsubnet.name,
- ]
- verifylist = [
- ('dns_nameservers', ['8.8.8.8']),
- ('host_routes', [{
- "destination": "10.30.30.30/24", "gateway": "10.30.30.1"}]),
- ('allocation_pools', [{
- 'start': '8.8.8.100', 'end': '8.8.8.150'}]),
- ('service_types', ['network:dhcp']),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- self.assertRaises(exceptions.CommandError,
- self.cmd.take_action, parsed_args)
diff --git a/openstackclient/tests/network/v2/test_subnet_pool.py b/openstackclient/tests/network/v2/test_subnet_pool.py
deleted file mode 100644
index 4cfecef7..00000000
--- a/openstackclient/tests/network/v2/test_subnet_pool.py
+++ /dev/null
@@ -1,722 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-#
-
-import argparse
-import mock
-from mock import call
-
-from osc_lib import exceptions
-from osc_lib import utils
-
-from openstackclient.network.v2 import subnet_pool
-from openstackclient.tests.identity.v3 import fakes as identity_fakes_v3
-from openstackclient.tests.network.v2 import fakes as network_fakes
-from openstackclient.tests import utils as tests_utils
-
-
-class TestSubnetPool(network_fakes.TestNetworkV2):
-
- def setUp(self):
- super(TestSubnetPool, self).setUp()
-
- # Get a shortcut to the network client
- self.network = self.app.client_manager.network
- # Get a shortcut to the ProjectManager Mock
- self.projects_mock = self.app.client_manager.identity.projects
- # Get a shortcut to the DomainManager Mock
- self.domains_mock = self.app.client_manager.identity.domains
-
-
-class TestCreateSubnetPool(TestSubnetPool):
-
- project = identity_fakes_v3.FakeProject.create_one_project()
- domain = identity_fakes_v3.FakeDomain.create_one_domain()
- # The new subnet pool to create.
- _subnet_pool = network_fakes.FakeSubnetPool.create_one_subnet_pool()
-
- _address_scope = network_fakes.FakeAddressScope.create_one_address_scope()
-
- columns = (
- 'address_scope_id',
- 'default_prefixlen',
- 'default_quota',
- 'id',
- 'ip_version',
- 'is_default',
- 'max_prefixlen',
- 'min_prefixlen',
- 'name',
- 'prefixes',
- 'project_id',
- 'shared',
- )
- data = (
- _subnet_pool.address_scope_id,
- _subnet_pool.default_prefixlen,
- _subnet_pool.default_quota,
- _subnet_pool.id,
- _subnet_pool.ip_version,
- _subnet_pool.is_default,
- _subnet_pool.max_prefixlen,
- _subnet_pool.min_prefixlen,
- _subnet_pool.name,
- utils.format_list(_subnet_pool.prefixes),
- _subnet_pool.project_id,
- _subnet_pool.shared,
- )
-
- def setUp(self):
- super(TestCreateSubnetPool, self).setUp()
-
- self.network.create_subnet_pool = mock.Mock(
- return_value=self._subnet_pool)
-
- # Get the command object to test
- self.cmd = subnet_pool.CreateSubnetPool(self.app, self.namespace)
-
- self.network.find_address_scope = mock.Mock(
- return_value=self._address_scope)
-
- self.projects_mock.get.return_value = self.project
- self.domains_mock.get.return_value = self.domain
-
- def test_create_no_options(self):
- arglist = []
- verifylist = []
-
- self.assertRaises(tests_utils.ParserException, self.check_parser,
- self.cmd, arglist, verifylist)
-
- def test_create_no_pool_prefix(self):
- """Make sure --pool-prefix is a required argument"""
- arglist = [
- self._subnet_pool.name,
- ]
- verifylist = [
- ('name', self._subnet_pool.name),
- ]
- self.assertRaises(tests_utils.ParserException, self.check_parser,
- self.cmd, arglist, verifylist)
-
- def test_create_default_options(self):
- arglist = [
- '--pool-prefix', '10.0.10.0/24',
- self._subnet_pool.name,
- ]
- verifylist = [
- ('prefixes', ['10.0.10.0/24']),
- ('name', self._subnet_pool.name),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = (self.cmd.take_action(parsed_args))
-
- self.network.create_subnet_pool.assert_called_once_with(**{
- 'prefixes': ['10.0.10.0/24'],
- 'name': self._subnet_pool.name,
- })
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, data)
-
- def test_create_prefixlen_options(self):
- arglist = [
- '--default-prefix-length', self._subnet_pool.default_prefixlen,
- '--max-prefix-length', self._subnet_pool.max_prefixlen,
- '--min-prefix-length', self._subnet_pool.min_prefixlen,
- '--pool-prefix', '10.0.10.0/24',
- self._subnet_pool.name,
- ]
- verifylist = [
- ('default_prefix_length',
- int(self._subnet_pool.default_prefixlen)),
- ('max_prefix_length', int(self._subnet_pool.max_prefixlen)),
- ('min_prefix_length', int(self._subnet_pool.min_prefixlen)),
- ('name', self._subnet_pool.name),
- ('prefixes', ['10.0.10.0/24']),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = (self.cmd.take_action(parsed_args))
-
- self.network.create_subnet_pool.assert_called_once_with(**{
- 'default_prefixlen': int(self._subnet_pool.default_prefixlen),
- 'max_prefixlen': int(self._subnet_pool.max_prefixlen),
- 'min_prefixlen': int(self._subnet_pool.min_prefixlen),
- 'prefixes': ['10.0.10.0/24'],
- 'name': self._subnet_pool.name,
- })
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, data)
-
- def test_create_len_negative(self):
- arglist = [
- self._subnet_pool.name,
- '--min-prefix-length', '-16',
- ]
- verifylist = [
- ('subnet_pool', self._subnet_pool.name),
- ('min_prefix_length', '-16'),
- ]
-
- self.assertRaises(argparse.ArgumentTypeError, self.check_parser,
- self.cmd, arglist, verifylist)
-
- def test_create_project_domain(self):
- arglist = [
- '--pool-prefix', '10.0.10.0/24',
- "--project", self.project.name,
- "--project-domain", self.domain.name,
- self._subnet_pool.name,
- ]
- verifylist = [
- ('prefixes', ['10.0.10.0/24']),
- ('project', self.project.name),
- ('project_domain', self.domain.name),
- ('name', self._subnet_pool.name),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = (self.cmd.take_action(parsed_args))
-
- self.network.create_subnet_pool.assert_called_once_with(**{
- 'prefixes': ['10.0.10.0/24'],
- 'tenant_id': self.project.id,
- 'name': self._subnet_pool.name,
- })
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, data)
-
- def test_create_address_scope_option(self):
- arglist = [
- '--pool-prefix', '10.0.10.0/24',
- '--address-scope', self._address_scope.id,
- self._subnet_pool.name,
- ]
- verifylist = [
- ('prefixes', ['10.0.10.0/24']),
- ('address_scope', self._address_scope.id),
- ('name', self._subnet_pool.name),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = (self.cmd.take_action(parsed_args))
-
- self.network.create_subnet_pool.assert_called_once_with(**{
- 'prefixes': ['10.0.10.0/24'],
- 'address_scope_id': self._address_scope.id,
- 'name': self._subnet_pool.name,
- })
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, data)
-
- def test_create_default_and_shared_options(self):
- arglist = [
- '--pool-prefix', '10.0.10.0/24',
- '--default',
- '--share',
- self._subnet_pool.name,
- ]
- verifylist = [
- ('prefixes', ['10.0.10.0/24']),
- ('default', True),
- ('share', True),
- ('name', self._subnet_pool.name),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = (self.cmd.take_action(parsed_args))
-
- self.network.create_subnet_pool.assert_called_once_with(**{
- 'is_default': True,
- 'name': self._subnet_pool.name,
- 'prefixes': ['10.0.10.0/24'],
- 'shared': True,
- })
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, data)
-
-
-class TestDeleteSubnetPool(TestSubnetPool):
-
- # The subnet pools to delete.
- _subnet_pools = network_fakes.FakeSubnetPool.create_subnet_pools(count=2)
-
- def setUp(self):
- super(TestDeleteSubnetPool, self).setUp()
-
- self.network.delete_subnet_pool = mock.Mock(return_value=None)
-
- self.network.find_subnet_pool = (
- network_fakes.FakeSubnetPool.get_subnet_pools(self._subnet_pools)
- )
-
- # Get the command object to test
- self.cmd = subnet_pool.DeleteSubnetPool(self.app, self.namespace)
-
- def test_subnet_pool_delete(self):
- arglist = [
- self._subnet_pools[0].name,
- ]
- verifylist = [
- ('subnet_pool', [self._subnet_pools[0].name]),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- result = self.cmd.take_action(parsed_args)
-
- self.network.delete_subnet_pool.assert_called_once_with(
- self._subnet_pools[0])
- self.assertIsNone(result)
-
- def test_multi_subnet_pools_delete(self):
- arglist = []
- verifylist = []
-
- for s in self._subnet_pools:
- arglist.append(s.name)
- verifylist = [
- ('subnet_pool', arglist),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- result = self.cmd.take_action(parsed_args)
-
- calls = []
- for s in self._subnet_pools:
- calls.append(call(s))
- self.network.delete_subnet_pool.assert_has_calls(calls)
- self.assertIsNone(result)
-
- def test_multi_subnet_pools_delete_with_exception(self):
- arglist = [
- self._subnet_pools[0].name,
- 'unexist_subnet_pool',
- ]
- verifylist = [
- ('subnet_pool',
- [self._subnet_pools[0].name, 'unexist_subnet_pool']),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- find_mock_result = [self._subnet_pools[0], exceptions.CommandError]
- self.network.find_subnet_pool = (
- mock.MagicMock(side_effect=find_mock_result)
- )
-
- try:
- self.cmd.take_action(parsed_args)
- self.fail('CommandError should be raised.')
- except exceptions.CommandError as e:
- self.assertEqual('1 of 2 subnet pools failed to delete.', str(e))
-
- self.network.find_subnet_pool.assert_any_call(
- self._subnet_pools[0].name, ignore_missing=False)
- self.network.find_subnet_pool.assert_any_call(
- 'unexist_subnet_pool', ignore_missing=False)
- self.network.delete_subnet_pool.assert_called_once_with(
- self._subnet_pools[0]
- )
-
-
-class TestListSubnetPool(TestSubnetPool):
- # The subnet pools going to be listed up.
- _subnet_pools = network_fakes.FakeSubnetPool.create_subnet_pools(count=3)
-
- columns = (
- 'ID',
- 'Name',
- 'Prefixes',
- )
- columns_long = columns + (
- 'Default Prefix Length',
- 'Address Scope',
- 'Default Subnet Pool',
- 'Shared',
- )
-
- data = []
- for pool in _subnet_pools:
- data.append((
- pool.id,
- pool.name,
- utils.format_list(pool.prefixes),
- ))
-
- data_long = []
- for pool in _subnet_pools:
- data_long.append((
- pool.id,
- pool.name,
- utils.format_list(pool.prefixes),
- pool.default_prefixlen,
- pool.address_scope_id,
- pool.is_default,
- pool.shared,
- ))
-
- def setUp(self):
- super(TestListSubnetPool, self).setUp()
-
- # Get the command object to test
- self.cmd = subnet_pool.ListSubnetPool(self.app, self.namespace)
-
- self.network.subnet_pools = mock.Mock(return_value=self._subnet_pools)
-
- def test_subnet_pool_list_no_option(self):
- arglist = []
- verifylist = [
- ('long', False),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.subnet_pools.assert_called_once_with()
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, list(data))
-
- def test_subnet_pool_list_long(self):
- arglist = [
- '--long',
- ]
- verifylist = [
- ('long', True),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.subnet_pools.assert_called_once_with()
- self.assertEqual(self.columns_long, columns)
- self.assertEqual(self.data_long, list(data))
-
-
-class TestSetSubnetPool(TestSubnetPool):
-
- # The subnet_pool to set.
- _subnet_pool = network_fakes.FakeSubnetPool.create_one_subnet_pool()
-
- _address_scope = network_fakes.FakeAddressScope.create_one_address_scope()
-
- def setUp(self):
- super(TestSetSubnetPool, self).setUp()
-
- self.network.update_subnet_pool = mock.Mock(return_value=None)
-
- self.network.find_subnet_pool = mock.Mock(
- return_value=self._subnet_pool)
-
- self.network.find_address_scope = mock.Mock(
- return_value=self._address_scope)
-
- # Get the command object to test
- self.cmd = subnet_pool.SetSubnetPool(self.app, self.namespace)
-
- def test_set_this(self):
- arglist = [
- '--name', 'noob',
- '--default-prefix-length', '8',
- '--min-prefix-length', '8',
- self._subnet_pool.name,
- ]
- verifylist = [
- ('name', 'noob'),
- ('default_prefix_length', 8),
- ('min_prefix_length', 8),
- ('subnet_pool', self._subnet_pool.name),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- result = self.cmd.take_action(parsed_args)
-
- attrs = {
- 'name': 'noob',
- 'default_prefixlen': 8,
- 'min_prefixlen': 8,
- }
- self.network.update_subnet_pool.assert_called_once_with(
- self._subnet_pool, **attrs)
- self.assertIsNone(result)
-
- def test_set_that(self):
- arglist = [
- '--pool-prefix', '10.0.1.0/24',
- '--pool-prefix', '10.0.2.0/24',
- '--max-prefix-length', '16',
- self._subnet_pool.name,
- ]
- verifylist = [
- ('prefixes', ['10.0.1.0/24', '10.0.2.0/24']),
- ('max_prefix_length', 16),
- ('subnet_pool', self._subnet_pool.name),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- result = self.cmd.take_action(parsed_args)
-
- prefixes = ['10.0.1.0/24', '10.0.2.0/24']
- prefixes.extend(self._subnet_pool.prefixes)
- attrs = {
- 'prefixes': prefixes,
- 'max_prefixlen': 16,
- }
- self.network.update_subnet_pool.assert_called_once_with(
- self._subnet_pool, **attrs)
- self.assertIsNone(result)
-
- def test_set_nothing(self):
- arglist = [self._subnet_pool.name, ]
- verifylist = [('subnet_pool', self._subnet_pool.name), ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- result = self.cmd.take_action(parsed_args)
-
- attrs = {}
- self.network.update_subnet_pool.assert_called_once_with(
- self._subnet_pool, **attrs)
- self.assertIsNone(result)
-
- def test_set_len_negative(self):
- arglist = [
- '--max-prefix-length', '-16',
- self._subnet_pool.name,
- ]
- verifylist = [
- ('max_prefix_length', '-16'),
- ('subnet_pool', self._subnet_pool.name),
- ]
-
- self.assertRaises(argparse.ArgumentTypeError, self.check_parser,
- self.cmd, arglist, verifylist)
-
- def test_set_address_scope(self):
- arglist = [
- '--address-scope', self._address_scope.id,
- self._subnet_pool.name,
- ]
- verifylist = [
- ('address_scope', self._address_scope.id),
- ('subnet_pool', self._subnet_pool.name),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- result = self.cmd.take_action(parsed_args)
-
- attrs = {
- 'address_scope_id': self._address_scope.id,
- }
- self.network.update_subnet_pool.assert_called_once_with(
- self._subnet_pool, **attrs)
- self.assertIsNone(result)
-
- def test_set_no_address_scope(self):
- arglist = [
- '--no-address-scope',
- self._subnet_pool.name,
- ]
- verifylist = [
- ('no_address_scope', True),
- ('subnet_pool', self._subnet_pool.name),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- result = self.cmd.take_action(parsed_args)
-
- attrs = {
- 'address_scope_id': None,
- }
- self.network.update_subnet_pool.assert_called_once_with(
- self._subnet_pool, **attrs)
- self.assertIsNone(result)
-
- def test_set_no_address_scope_conflict(self):
- arglist = [
- '--address-scope', self._address_scope.id,
- '--no-address-scope',
- self._subnet_pool.name,
- ]
- verifylist = [
- ('address_scope', self._address_scope.id),
- ('no_address_scope', True),
- ('subnet_pool', self._subnet_pool.name),
- ]
-
- # Exclusive arguments will conflict here.
- self.assertRaises(tests_utils.ParserException, self.check_parser,
- self.cmd, arglist, verifylist)
-
- def test_set_default(self):
- arglist = [
- '--default',
- self._subnet_pool.name,
- ]
- verifylist = [
- ('default', True),
- ('subnet_pool', self._subnet_pool.name),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- result = self.cmd.take_action(parsed_args)
-
- attrs = {
- 'is_default': True
- }
- self.network.update_subnet_pool.assert_called_once_with(
- self._subnet_pool, **attrs)
- self.assertIsNone(result)
-
- def test_set_no_default(self):
- arglist = [
- '--no-default',
- self._subnet_pool.name,
- ]
- verifylist = [
- ('no_default', True),
- ('subnet_pool', self._subnet_pool.name),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- result = self.cmd.take_action(parsed_args)
-
- attrs = {
- 'is_default': False,
- }
- self.network.update_subnet_pool.assert_called_once_with(
- self._subnet_pool, **attrs)
- self.assertIsNone(result)
-
- def test_set_no_default_conflict(self):
- arglist = [
- '--default',
- '--no-default',
- self._subnet_pool.name,
- ]
- verifylist = [
- ('default', True),
- ('no_default', True),
- ('subnet_pool', self._subnet_pool.name),
- ]
-
- # Exclusive arguments will conflict here.
- self.assertRaises(tests_utils.ParserException, self.check_parser,
- self.cmd, arglist, verifylist)
-
-
-class TestShowSubnetPool(TestSubnetPool):
-
- # The subnet_pool to set.
- _subnet_pool = network_fakes.FakeSubnetPool.create_one_subnet_pool()
-
- columns = (
- 'address_scope_id',
- 'default_prefixlen',
- 'default_quota',
- 'id',
- 'ip_version',
- 'is_default',
- 'max_prefixlen',
- 'min_prefixlen',
- 'name',
- 'prefixes',
- 'project_id',
- 'shared',
- )
-
- data = (
- _subnet_pool.address_scope_id,
- _subnet_pool.default_prefixlen,
- _subnet_pool.default_quota,
- _subnet_pool.id,
- _subnet_pool.ip_version,
- _subnet_pool.is_default,
- _subnet_pool.max_prefixlen,
- _subnet_pool.min_prefixlen,
- _subnet_pool.name,
- utils.format_list(_subnet_pool.prefixes),
- _subnet_pool.tenant_id,
- _subnet_pool.shared,
- )
-
- def setUp(self):
- super(TestShowSubnetPool, self).setUp()
-
- self.network.find_subnet_pool = mock.Mock(
- return_value=self._subnet_pool
- )
-
- # Get the command object to test
- self.cmd = subnet_pool.ShowSubnetPool(self.app, self.namespace)
-
- def test_show_no_options(self):
- arglist = []
- verifylist = []
-
- self.assertRaises(tests_utils.ParserException, self.check_parser,
- self.cmd, arglist, verifylist)
-
- def test_show_all_options(self):
- arglist = [
- self._subnet_pool.name,
- ]
- verifylist = [
- ('subnet_pool', self._subnet_pool.name),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.network.find_subnet_pool.assert_called_once_with(
- self._subnet_pool.name,
- ignore_missing=False
- )
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, data)
-
-
-class TestUnsetSubnetPool(TestSubnetPool):
-
- def setUp(self):
- super(TestUnsetSubnetPool, self).setUp()
- self._subnetpool = network_fakes.FakeSubnetPool.create_one_subnet_pool(
- {'prefixes': ['10.0.10.0/24', '10.1.10.0/24',
- '10.2.10.0/24'], })
- self.network.find_subnet_pool = mock.Mock(
- return_value=self._subnetpool)
- self.network.update_subnet_pool = mock.Mock(return_value=None)
- # Get the command object to test
- self.cmd = subnet_pool.UnsetSubnetPool(self.app, self.namespace)
-
- def test_unset_subnet_pool(self):
- arglist = [
- '--pool-prefix', '10.0.10.0/24',
- '--pool-prefix', '10.1.10.0/24',
- self._subnetpool.name,
- ]
- verifylist = [('prefixes', ['10.0.10.0/24', '10.1.10.0/24'])]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- result = self.cmd.take_action(parsed_args)
- attrs = {'prefixes': ['10.2.10.0/24']}
- self.network.update_subnet_pool.assert_called_once_with(
- self._subnetpool, **attrs)
- self.assertIsNone(result)
-
- def test_unset_subnet_pool_prefix_not_existent(self):
- arglist = [
- '--pool-prefix', '10.100.1.1/25',
- self._subnetpool.name,
- ]
- verifylist = [('prefixes', ['10.100.1.1/25'])]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- self.assertRaises(exceptions.CommandError,
- self.cmd.take_action,
- parsed_args)