diff options
| author | Steve Martinelli <s.martinelli@gmail.com> | 2016-09-05 22:14:33 -0700 |
|---|---|---|
| committer | Steve Martinelli <s.martinelli@gmail.com> | 2016-09-08 15:19:50 -0700 |
| commit | 39839def2e356e8d145be89380c73a71423cf06d (patch) | |
| tree | f53c090b7ded46554866436e9d687492d2cef487 /openstackclient/tests/network | |
| parent | 7d1a5d0854c732681a130255ddc6abb2e9721a7a (diff) | |
| download | python-openstackclient-39839def2e356e8d145be89380c73a71423cf06d.tar.gz | |
move unit tests to new "unit" test module
this will better isolate the unit tests from the functional tests.
unfortunately, the "integration" tests had to be lumped into the
"unit" tests since we need the separation in testr.conf
Change-Id: Ifd12198c1f90e4e3c951c73bfa1884ab300d8ded
Diffstat (limited to 'openstackclient/tests/network')
18 files changed, 0 insertions, 9584 deletions
diff --git a/openstackclient/tests/network/__init__.py b/openstackclient/tests/network/__init__.py deleted file mode 100644 index e69de29b..00000000 --- a/openstackclient/tests/network/__init__.py +++ /dev/null diff --git a/openstackclient/tests/network/test_common.py b/openstackclient/tests/network/test_common.py deleted file mode 100644 index 48608734..00000000 --- a/openstackclient/tests/network/test_common.py +++ /dev/null @@ -1,174 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# - -import argparse -import mock - -from openstackclient.network import common -from openstackclient.tests import utils - - -def _add_common_argument(parser): - parser.add_argument( - 'common', - metavar='<common>', - help='Common argument', - ) - return parser - - -def _add_network_argument(parser): - parser.add_argument( - 'network', - metavar='<network>', - help='Network argument', - ) - return parser - - -def _add_compute_argument(parser): - parser.add_argument( - 'compute', - metavar='<compute>', - help='Compute argument', - ) - return parser - - -class FakeNetworkAndComputeCommand(common.NetworkAndComputeCommand): - - def update_parser_common(self, parser): - return _add_common_argument(parser) - - def update_parser_network(self, parser): - return _add_network_argument(parser) - - def update_parser_compute(self, parser): - return _add_compute_argument(parser) - - def take_action_network(self, client, parsed_args): - return client.network_action(parsed_args) - - def take_action_compute(self, client, parsed_args): - return client.compute_action(parsed_args) - - -class FakeNetworkAndComputeLister(common.NetworkAndComputeLister): - - def update_parser_common(self, parser): - return _add_common_argument(parser) - - def update_parser_network(self, parser): - return _add_network_argument(parser) - - def update_parser_compute(self, parser): - return _add_compute_argument(parser) - - def take_action_network(self, client, parsed_args): - return client.network_action(parsed_args) - - def take_action_compute(self, client, parsed_args): - return client.compute_action(parsed_args) - - -class FakeNetworkAndComputeShowOne(common.NetworkAndComputeShowOne): - - def update_parser_common(self, parser): - return _add_common_argument(parser) - - def update_parser_network(self, parser): - return _add_network_argument(parser) - - def update_parser_compute(self, parser): - return _add_compute_argument(parser) - - def take_action_network(self, client, parsed_args): - return client.network_action(parsed_args) - - def take_action_compute(self, client, parsed_args): - return client.compute_action(parsed_args) - - -class TestNetworkAndCompute(utils.TestCommand): - - def setUp(self): - super(TestNetworkAndCompute, self).setUp() - - self.namespace = argparse.Namespace() - - # Create network client mocks. - self.app.client_manager.network = mock.Mock() - self.network = self.app.client_manager.network - self.network.network_action = mock.Mock( - return_value='take_action_network') - - # Create compute client mocks. - self.app.client_manager.compute = mock.Mock() - self.compute = self.app.client_manager.compute - self.compute.compute_action = mock.Mock( - return_value='take_action_compute') - - # Subclasses can override the command object to test. - self.cmd = FakeNetworkAndComputeCommand(self.app, self.namespace) - - def test_take_action_network(self): - arglist = [ - 'common', - 'network' - ] - verifylist = [ - ('common', 'common'), - ('network', 'network') - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - self.network.network_action.assert_called_with(parsed_args) - self.assertEqual('take_action_network', result) - - def test_take_action_compute(self): - arglist = [ - 'common', - 'compute' - ] - verifylist = [ - ('common', 'common'), - ('compute', 'compute') - ] - - self.app.client_manager.network_endpoint_enabled = False - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - self.compute.compute_action.assert_called_with(parsed_args) - self.assertEqual('take_action_compute', result) - - -class TestNetworkAndComputeCommand(TestNetworkAndCompute): - - def setUp(self): - super(TestNetworkAndComputeCommand, self).setUp() - self.cmd = FakeNetworkAndComputeCommand(self.app, self.namespace) - - -class TestNetworkAndComputeLister(TestNetworkAndCompute): - - def setUp(self): - super(TestNetworkAndComputeLister, self).setUp() - self.cmd = FakeNetworkAndComputeLister(self.app, self.namespace) - - -class TestNetworkAndComputeShowOne(TestNetworkAndCompute): - - def setUp(self): - super(TestNetworkAndComputeShowOne, self).setUp() - self.cmd = FakeNetworkAndComputeShowOne(self.app, self.namespace) diff --git a/openstackclient/tests/network/v2/__init__.py b/openstackclient/tests/network/v2/__init__.py deleted file mode 100644 index e69de29b..00000000 --- a/openstackclient/tests/network/v2/__init__.py +++ /dev/null diff --git a/openstackclient/tests/network/v2/fakes.py b/openstackclient/tests/network/v2/fakes.py deleted file mode 100644 index 33bc4017..00000000 --- a/openstackclient/tests/network/v2/fakes.py +++ /dev/null @@ -1,1100 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# - -import argparse -import copy -import mock -import uuid - -from openstackclient.tests import fakes -from openstackclient.tests.identity.v3 import fakes as identity_fakes_v3 -from openstackclient.tests import utils - - -QUOTA = { - "subnet": 10, - "network": 10, - "floatingip": 50, - "subnetpool": -1, - "security_group_rule": 100, - "security_group": 10, - "router": 10, - "rbac_policy": -1, - "port": 50, - "vip": 10, - "member": 10, - "health_monitor": 10, -} - - -class FakeNetworkV2Client(object): - - def __init__(self, **kwargs): - self.extensions = mock.Mock() - self.extensions.resource_class = fakes.FakeResource(None, {}) - - -class TestNetworkV2(utils.TestCommand): - - def setUp(self): - super(TestNetworkV2, self).setUp() - - self.namespace = argparse.Namespace() - - self.app.client_manager.session = mock.Mock() - - self.app.client_manager.network = FakeNetworkV2Client( - endpoint=fakes.AUTH_URL, - token=fakes.AUTH_TOKEN, - ) - - self.app.client_manager.identity = ( - identity_fakes_v3.FakeIdentityv3Client( - endpoint=fakes.AUTH_URL, - token=fakes.AUTH_TOKEN, - ) - ) - - -class FakeAddressScope(object): - """Fake one or more address scopes.""" - - @staticmethod - def create_one_address_scope(attrs=None): - """Create a fake address scope. - - :param Dictionary attrs: - A dictionary with all attributes - :return: - A FakeResource object with name, id, etc. - """ - attrs = attrs or {} - - # Set default attributes. - address_scope_attrs = { - 'name': 'address-scope-name-' + uuid.uuid4().hex, - 'id': 'address-scope-id-' + uuid.uuid4().hex, - 'tenant_id': 'project-id-' + uuid.uuid4().hex, - 'shared': False, - 'ip_version': 4, - } - - # Overwrite default attributes. - address_scope_attrs.update(attrs) - - address_scope = fakes.FakeResource( - info=copy.deepcopy(address_scope_attrs), - loaded=True) - - # Set attributes with special mapping in OpenStack SDK. - address_scope.project_id = address_scope_attrs['tenant_id'] - - return address_scope - - @staticmethod - def create_address_scopes(attrs=None, count=2): - """Create multiple fake address scopes. - - :param Dictionary attrs: - A dictionary with all attributes - :param int count: - The number of address scopes to fake - :return: - A list of FakeResource objects faking the address scopes - """ - address_scopes = [] - for i in range(0, count): - address_scopes.append( - FakeAddressScope.create_one_address_scope(attrs)) - - return address_scopes - - @staticmethod - def get_address_scopes(address_scopes=None, count=2): - """Get an iterable MagicMock object with a list of faked address scopes. - - If address scopes list is provided, then initialize the Mock object - with the list. Otherwise create one. - - :param List address scopes: - A list of FakeResource objects faking address scopes - :param int count: - The number of address scopes to fake - :return: - An iterable Mock object with side_effect set to a list of faked - address scopes - """ - if address_scopes is None: - address_scopes = FakeAddressScope.create_address_scopes(count) - return mock.MagicMock(side_effect=address_scopes) - - -class FakeAvailabilityZone(object): - """Fake one or more network availability zones (AZs).""" - - @staticmethod - def create_one_availability_zone(attrs=None): - """Create a fake AZ. - - :param Dictionary attrs: - A dictionary with all attributes - :return: - A FakeResource object with name, state, etc. - """ - attrs = attrs or {} - - # Set default attributes. - availability_zone = { - 'name': uuid.uuid4().hex, - 'state': 'available', - 'resource': 'network', - } - - # Overwrite default attributes. - availability_zone.update(attrs) - - availability_zone = fakes.FakeResource( - info=copy.deepcopy(availability_zone), - loaded=True) - return availability_zone - - @staticmethod - def create_availability_zones(attrs=None, count=2): - """Create multiple fake AZs. - - :param Dictionary attrs: - A dictionary with all attributes - :param int count: - The number of AZs to fake - :return: - A list of FakeResource objects faking the AZs - """ - availability_zones = [] - for i in range(0, count): - availability_zone = \ - FakeAvailabilityZone.create_one_availability_zone(attrs) - availability_zones.append(availability_zone) - - return availability_zones - - -class FakeIPAvailability(object): - """Fake one or more network ip availabilities.""" - - @staticmethod - def create_one_ip_availability(): - """Create a fake list with ip availability stats of a network. - - :return: - A FakeResource object with network_name, network_id, etc. - """ - - # Set default attributes. - network_ip_availability = { - 'network_id': 'network-id-' + uuid.uuid4().hex, - 'network_name': 'network-name-' + uuid.uuid4().hex, - 'tenant_id': '', - 'subnet_ip_availability': [], - 'total_ips': 254, - 'used_ips': 6, - } - - network_ip_availability = fakes.FakeResource( - info=copy.deepcopy(network_ip_availability), - loaded=True) - return network_ip_availability - - @staticmethod - def create_ip_availability(count=2): - """Create fake list of ip availability stats of multiple networks. - - :param int count: - The number of networks to fake - :return: - A list of FakeResource objects faking network ip availability stats - """ - network_ip_availabilities = [] - for i in range(0, count): - network_ip_availability = \ - FakeIPAvailability.create_one_ip_availability() - network_ip_availabilities.append(network_ip_availability) - - return network_ip_availabilities - - -class FakeExtension(object): - """Fake one or more extension.""" - - @staticmethod - def create_one_extension(attrs=None): - """Create a fake extension. - - :param Dictionary attrs: - A dictionary with all attributes - :return: - A FakeResource object with name, namespace, etc. - """ - attrs = attrs or {} - - # Set default attributes. - extension_info = { - 'name': 'name-' + uuid.uuid4().hex, - 'namespace': 'http://docs.openstack.org/network/', - 'description': 'description-' + uuid.uuid4().hex, - 'updated': '2013-07-09T12:00:0-00:00', - 'alias': 'Dystopian', - 'links': '[{"href":''"https://github.com/os/network", "type"}]', - } - - # Overwrite default attributes. - extension_info.update(attrs) - - extension = fakes.FakeResource( - info=copy.deepcopy(extension_info), - loaded=True) - return extension - - -class FakeNetwork(object): - """Fake one or more networks.""" - - @staticmethod - def create_one_network(attrs=None): - """Create a fake network. - - :param Dictionary attrs: - A dictionary with all attributes - :return: - A FakeResource object, with id, name, etc. - """ - attrs = attrs or {} - - # Set default attributes. - network_attrs = { - 'id': 'network-id-' + uuid.uuid4().hex, - 'name': 'network-name-' + uuid.uuid4().hex, - 'status': 'ACTIVE', - 'tenant_id': 'project-id-' + uuid.uuid4().hex, - 'admin_state_up': True, - 'shared': False, - 'subnets': ['a', 'b'], - 'provider_network_type': 'vlan', - 'router:external': True, - 'availability_zones': [], - 'availability_zone_hints': [], - 'is_default': False, - 'port_security_enabled': True, - } - - # Overwrite default attributes. - network_attrs.update(attrs) - - network = fakes.FakeResource(info=copy.deepcopy(network_attrs), - loaded=True) - - # Set attributes with special mapping in OpenStack SDK. - network.project_id = network_attrs['tenant_id'] - network.is_router_external = network_attrs['router:external'] - network.is_port_security_enabled = \ - network_attrs['port_security_enabled'] - - return network - - @staticmethod - def create_networks(attrs=None, count=2): - """Create multiple fake networks. - - :param Dictionary attrs: - A dictionary with all attributes - :param int count: - The number of networks to fake - :return: - A list of FakeResource objects faking the networks - """ - networks = [] - for i in range(0, count): - networks.append(FakeNetwork.create_one_network(attrs)) - - return networks - - @staticmethod - def get_networks(networks=None, count=2): - """Get an iterable MagicMock object with a list of faked networks. - - If networks list is provided, then initialize the Mock object with the - list. Otherwise create one. - - :param List networks: - A list of FakeResource objects faking networks - :param int count: - The number of networks to fake - :return: - An iterable Mock object with side_effect set to a list of faked - networks - """ - if networks is None: - networks = FakeNetwork.create_networks(count) - return mock.MagicMock(side_effect=networks) - - -class FakeNetworkSegment(object): - """Fake one or more network segments.""" - - @staticmethod - def create_one_network_segment(attrs=None): - """Create a fake network segment. - - :param Dictionary attrs: - A dictionary with all attributes - :return: - A FakeResource object faking the network segment - """ - attrs = attrs or {} - - # Set default attributes. - network_segment_attrs = { - 'id': 'network-segment-id-' + uuid.uuid4().hex, - 'network_id': 'network-id-' + uuid.uuid4().hex, - 'network_type': 'vlan', - 'physical_network': 'physical-network-name-' + uuid.uuid4().hex, - 'segmentation_id': 1024, - } - - # Overwrite default attributes. - network_segment_attrs.update(attrs) - - network_segment = fakes.FakeResource( - info=copy.deepcopy(network_segment_attrs), - loaded=True - ) - - return network_segment - - @staticmethod - def create_network_segments(attrs=None, count=2): - """Create multiple fake network segments. - - :param Dictionary attrs: - A dictionary with all attributes - :param int count: - The number of network segments to fake - :return: - A list of FakeResource objects faking the network segments - """ - network_segments = [] - for i in range(0, count): - network_segments.append( - FakeNetworkSegment.create_one_network_segment(attrs) - ) - return network_segments - - -class FakePort(object): - """Fake one or more ports.""" - - @staticmethod - def create_one_port(attrs=None): - """Create a fake port. - - :param Dictionary attrs: - A dictionary with all attributes - :return: - A FakeResource object, with id, name, etc. - """ - attrs = attrs or {} - - # Set default attributes. - port_attrs = { - 'admin_state_up': True, - 'allowed_address_pairs': [{}], - 'binding:host_id': 'binding-host-id-' + uuid.uuid4().hex, - 'binding:profile': {}, - 'binding:vif_details': {}, - 'binding:vif_type': 'ovs', - 'binding:vnic_type': 'normal', - 'device_id': 'device-id-' + uuid.uuid4().hex, - 'device_owner': 'compute:nova', - 'dns_assignment': [{}], - 'dns_name': 'dns-name-' + uuid.uuid4().hex, - 'extra_dhcp_opts': [{}], - 'fixed_ips': [{}], - 'id': 'port-id-' + uuid.uuid4().hex, - 'mac_address': 'fa:16:3e:a9:4e:72', - 'name': 'port-name-' + uuid.uuid4().hex, - 'network_id': 'network-id-' + uuid.uuid4().hex, - 'port_security_enabled': True, - 'security_groups': [], - 'status': 'ACTIVE', - 'tenant_id': 'project-id-' + uuid.uuid4().hex, - } - - # Overwrite default attributes. - port_attrs.update(attrs) - - port = fakes.FakeResource(info=copy.deepcopy(port_attrs), - loaded=True) - - # Set attributes with special mappings in OpenStack SDK. - port.project_id = port_attrs['tenant_id'] - port.binding_host_id = port_attrs['binding:host_id'] - port.binding_profile = port_attrs['binding:profile'] - port.binding_vif_details = port_attrs['binding:vif_details'] - port.binding_vif_type = port_attrs['binding:vif_type'] - port.binding_vnic_type = port_attrs['binding:vnic_type'] - - return port - - @staticmethod - def create_ports(attrs=None, count=2): - """Create multiple fake ports. - - :param Dictionary attrs: - A dictionary with all attributes - :param int count: - The number of ports to fake - :return: - A list of FakeResource objects faking the ports - """ - ports = [] - for i in range(0, count): - ports.append(FakePort.create_one_port(attrs)) - - return ports - - @staticmethod - def get_ports(ports=None, count=2): - """Get an iterable MagicMock object with a list of faked ports. - - If ports list is provided, then initialize the Mock object with the - list. Otherwise create one. - - :param List ports: - A list of FakeResource objects faking ports - :param int count: - The number of ports to fake - :return: - An iterable Mock object with side_effect set to a list of faked - ports - """ - if ports is None: - ports = FakePort.create_ports(count) - return mock.MagicMock(side_effect=ports) - - -class FakeNetworkAgent(object): - """Fake one or more network agents.""" - - @staticmethod - def create_one_network_agent(attrs=None): - """Create a fake network agent - - :param Dictionary attrs: - A dictionary with all attributes - :return: - A FakeResource object, with id, agent_type, and so on. - """ - attrs = attrs or {} - - # Set default attributes - agent_attrs = { - 'id': 'agent-id-' + uuid.uuid4().hex, - 'agent_type': 'agent-type-' + uuid.uuid4().hex, - 'host': 'host-' + uuid.uuid4().hex, - 'availability_zone': 'zone-' + uuid.uuid4().hex, - 'alive': True, - 'admin_state_up': True, - 'binary': 'binary-' + uuid.uuid4().hex, - 'configurations': {'subnet': 2, 'networks': 1}, - } - agent_attrs.update(attrs) - agent = fakes.FakeResource(info=copy.deepcopy(agent_attrs), - loaded=True) - return agent - - @staticmethod - def create_network_agents(attrs=None, count=2): - """Create multiple fake network agents. - - :param Dictionary attrs: - A dictionary with all attributes - :param int count: - The number of network agents to fake - :return: - A list of FakeResource objects faking the network agents - """ - agents = [] - for i in range(0, count): - agents.append(FakeNetworkAgent.create_one_network_agent(attrs)) - - return agents - - @staticmethod - def get_network_agents(agents=None, count=2): - """Get an iterable MagicMock object with a list of faked network agents. - - If network agents list is provided, then initialize the Mock object - with the list. Otherwise create one. - - :param List agents: - A list of FakeResource objects faking network agents - :param int count: - The number of network agents to fake - :return: - An iterable Mock object with side_effect set to a list of faked - network agents - """ - if agents is None: - agents = FakeNetworkAgent.create_network_agents(count) - return mock.MagicMock(side_effect=agents) - - -class FakeNetworkRBAC(object): - """Fake one or more network rbac policies.""" - - @staticmethod - def create_one_network_rbac(attrs=None): - """Create a fake network rbac - - :param Dictionary attrs: - A dictionary with all attributes - :return: - A FakeResource object, with id, action, target_tenant, - tenant_id, type - """ - attrs = attrs or {} - - # Set default attributes - rbac_attrs = { - 'id': 'rbac-id-' + uuid.uuid4().hex, - 'object_type': 'network', - 'object_id': 'object-id-' + uuid.uuid4().hex, - 'action': 'access_as_shared', - 'target_tenant': 'target-tenant-' + uuid.uuid4().hex, - 'tenant_id': 'tenant-id-' + uuid.uuid4().hex, - } - rbac_attrs.update(attrs) - rbac = fakes.FakeResource(info=copy.deepcopy(rbac_attrs), - loaded=True) - # Set attributes with special mapping in OpenStack SDK. - rbac.project_id = rbac_attrs['tenant_id'] - rbac.target_project_id = rbac_attrs['target_tenant'] - return rbac - - @staticmethod - def create_network_rbacs(attrs=None, count=2): - """Create multiple fake network rbac policies. - - :param Dictionary attrs: - A dictionary with all attributes - :param int count: - The number of rbac policies to fake - :return: - A list of FakeResource objects faking the rbac policies - """ - rbac_policies = [] - for i in range(0, count): - rbac_policies.append(FakeNetworkRBAC. - create_one_network_rbac(attrs)) - - return rbac_policies - - @staticmethod - def get_network_rbacs(rbac_policies=None, count=2): - """Get an iterable MagicMock object with a list of faked rbac policies. - - If rbac policies list is provided, then initialize the Mock object - with the list. Otherwise create one. - - :param List rbac_policies: - A list of FakeResource objects faking rbac policies - :param int count: - The number of rbac policies to fake - :return: - An iterable Mock object with side_effect set to a list of faked - rbac policies - """ - if rbac_policies is None: - rbac_policies = FakeNetworkRBAC.create_network_rbacs(count) - return mock.MagicMock(side_effect=rbac_policies) - - -class FakeRouter(object): - """Fake one or more routers.""" - - @staticmethod - def create_one_router(attrs=None): - """Create a fake router. - - :param Dictionary attrs: - A dictionary with all attributes - :return: - A FakeResource object, with id, name, admin_state_up, - status, tenant_id - """ - attrs = attrs or {} - - # Set default attributes. - router_attrs = { - 'id': 'router-id-' + uuid.uuid4().hex, - 'name': 'router-name-' + uuid.uuid4().hex, - 'status': 'ACTIVE', - 'admin_state_up': True, - 'distributed': False, - 'ha': False, - 'tenant_id': 'project-id-' + uuid.uuid4().hex, - 'routes': [], - 'external_gateway_info': {}, - 'availability_zone_hints': [], - 'availability_zones': [], - } - - # Overwrite default attributes. - router_attrs.update(attrs) - - router = fakes.FakeResource(info=copy.deepcopy(router_attrs), - loaded=True) - - # Set attributes with special mapping in OpenStack SDK. - router.project_id = router_attrs['tenant_id'] - - return router - - @staticmethod - def create_routers(attrs=None, count=2): - """Create multiple fake routers. - - :param Dictionary attrs: - A dictionary with all attributes - :param int count: - The number of routers to fake - :return: - A list of FakeResource objects faking the routers - """ - routers = [] - for i in range(0, count): - routers.append(FakeRouter.create_one_router(attrs)) - - return routers - - @staticmethod - def get_routers(routers=None, count=2): - """Get an iterable MagicMock object with a list of faked routers. - - If routers list is provided, then initialize the Mock object with the - list. Otherwise create one. - - :param List routers: - A list of FakeResource objects faking routers - :param int count: - The number of routers to fake - :return: - An iterable Mock object with side_effect set to a list of faked - routers - """ - if routers is None: - routers = FakeRouter.create_routers(count) - return mock.MagicMock(side_effect=routers) - - -class FakeSecurityGroup(object): - """Fake one or more security groups.""" - - @staticmethod - def create_one_security_group(attrs=None): - """Create a fake security group. - - :param Dictionary attrs: - A dictionary with all attributes - :return: - A FakeResource object, with id, name, etc. - """ - attrs = attrs or {} - - # Set default attributes. - security_group_attrs = { - 'id': 'security-group-id-' + uuid.uuid4().hex, - 'name': 'security-group-name-' + uuid.uuid4().hex, - 'description': 'security-group-description-' + uuid.uuid4().hex, - 'tenant_id': 'project-id-' + uuid.uuid4().hex, - 'security_group_rules': [], - } - - # Overwrite default attributes. - security_group_attrs.update(attrs) - - security_group = fakes.FakeResource( - info=copy.deepcopy(security_group_attrs), - loaded=True) - - # Set attributes with special mapping in OpenStack SDK. - security_group.project_id = security_group_attrs['tenant_id'] - - return security_group - - @staticmethod - def create_security_groups(attrs=None, count=2): - """Create multiple fake security groups. - - :param Dictionary attrs: - A dictionary with all attributes - :param int count: - The number of security groups to fake - :return: - A list of FakeResource objects faking the security groups - """ - security_groups = [] - for i in range(0, count): - security_groups.append( - FakeSecurityGroup.create_one_security_group(attrs)) - - return security_groups - - @staticmethod - def get_security_groups(security_groups=None, count=2): - """Get an iterable MagicMock object with a list of faked security groups. - - If security groups list is provided, then initialize the Mock object - with the list. Otherwise create one. - - :param List security groups: - A list of FakeResource objects faking security groups - :param int count: - The number of security groups to fake - :return: - An iterable Mock object with side_effect set to a list of faked - security groups - """ - if security_groups is None: - security_groups = FakeSecurityGroup.create_security_groups(count) - return mock.MagicMock(side_effect=security_groups) - - -class FakeSecurityGroupRule(object): - """Fake one or more security group rules.""" - - @staticmethod - def create_one_security_group_rule(attrs=None): - """Create a fake security group rule. - - :param Dictionary attrs: - A dictionary with all attributes - :return: - A FakeResource object, with id, etc. - """ - attrs = attrs or {} - - # Set default attributes. - security_group_rule_attrs = { - 'direction': 'ingress', - 'ethertype': 'IPv4', - 'id': 'security-group-rule-id-' + uuid.uuid4().hex, - 'port_range_max': None, - 'port_range_min': None, - 'protocol': 'tcp', - 'remote_group_id': None, - 'remote_ip_prefix': '0.0.0.0/0', - 'security_group_id': 'security-group-id-' + uuid.uuid4().hex, - 'tenant_id': 'project-id-' + uuid.uuid4().hex, - } - - # Overwrite default attributes. - security_group_rule_attrs.update(attrs) - - security_group_rule = fakes.FakeResource( - info=copy.deepcopy(security_group_rule_attrs), - loaded=True) - - # Set attributes with special mapping in OpenStack SDK. - security_group_rule.project_id = security_group_rule_attrs['tenant_id'] - - return security_group_rule - - @staticmethod - def create_security_group_rules(attrs=None, count=2): - """Create multiple fake security group rules. - - :param Dictionary attrs: - A dictionary with all attributes - :param int count: - The number of security group rules to fake - :return: - A list of FakeResource objects faking the security group rules - """ - security_group_rules = [] - for i in range(0, count): - security_group_rules.append( - FakeSecurityGroupRule.create_one_security_group_rule(attrs)) - - return security_group_rules - - @staticmethod - def get_security_group_rules(security_group_rules=None, count=2): - """Get an iterable MagicMock object with a list of faked security group rules. - - If security group rules list is provided, then initialize the Mock - object with the list. Otherwise create one. - - :param List security group rules: - A list of FakeResource objects faking security group rules - :param int count: - The number of security group rules to fake - :return: - An iterable Mock object with side_effect set to a list of faked - security group rules - """ - if security_group_rules is None: - security_group_rules = ( - FakeSecurityGroupRule.create_security_group_rules(count)) - return mock.MagicMock(side_effect=security_group_rules) - - -class FakeSubnet(object): - """Fake one or more subnets.""" - - @staticmethod - def create_one_subnet(attrs=None): - """Create a fake subnet. - - :param Dictionary attrs: - A dictionary with all attributes - :return: - A FakeResource object faking the subnet - """ - attrs = attrs or {} - - # Set default attributes. - project_id = 'project-id-' + uuid.uuid4().hex - subnet_attrs = { - 'id': 'subnet-id-' + uuid.uuid4().hex, - 'name': 'subnet-name-' + uuid.uuid4().hex, - 'network_id': 'network-id-' + uuid.uuid4().hex, - 'cidr': '10.10.10.0/24', - 'tenant_id': project_id, - 'enable_dhcp': True, - 'dns_nameservers': [], - 'allocation_pools': [], - 'host_routes': [], - 'ip_version': 4, - 'gateway_ip': '10.10.10.1', - 'ipv6_address_mode': None, - 'ipv6_ra_mode': None, - 'segment_id': None, - 'service_types': [], - 'subnetpool_id': None, - } - - # Overwrite default attributes. - subnet_attrs.update(attrs) - - subnet = fakes.FakeResource(info=copy.deepcopy(subnet_attrs), - loaded=True) - - # Set attributes with special mappings in OpenStack SDK. - subnet.project_id = subnet_attrs['tenant_id'] - - return subnet - - @staticmethod - def create_subnets(attrs=None, count=2): - """Create multiple fake subnets. - - :param Dictionary attrs: - A dictionary with all attributes - :param int count: - The number of subnets to fake - :return: - A list of FakeResource objects faking the subnets - """ - subnets = [] - for i in range(0, count): - subnets.append(FakeSubnet.create_one_subnet(attrs)) - - return subnets - - @staticmethod - def get_subnets(subnets=None, count=2): - """Get an iterable MagicMock object with a list of faked subnets. - - If subnets list is provided, then initialize the Mock object - with the list. Otherwise create one. - - :param List subnets: - A list of FakeResource objects faking subnets - :param int count: - The number of subnets to fake - :return: - An iterable Mock object with side_effect set to a list of faked - subnets - """ - if subnets is None: - subnets = FakeSubnet.create_subnets(count) - return mock.MagicMock(side_effect=subnets) - - -class FakeFloatingIP(object): - """Fake one or more floating ip.""" - - @staticmethod - def create_one_floating_ip(attrs=None): - """Create a fake floating ip. - - :param Dictionary attrs: - A dictionary with all attributes - :return: - A FakeResource object, with id, ip, and so on - """ - attrs = attrs or {} - - # Set default attributes. - floating_ip_attrs = { - 'id': 'floating-ip-id-' + uuid.uuid4().hex, - 'floating_ip_address': '1.0.9.0', - 'fixed_ip_address': '2.0.9.0', - 'dns_domain': None, - 'dns_name': None, - 'status': 'DOWN', - 'floating_network_id': 'network-id-' + uuid.uuid4().hex, - 'router_id': 'router-id-' + uuid.uuid4().hex, - 'port_id': 'port-id-' + uuid.uuid4().hex, - 'tenant_id': 'project-id-' + uuid.uuid4().hex, - } - - # Overwrite default attributes. - floating_ip_attrs.update(attrs) - - floating_ip = fakes.FakeResource( - info=copy.deepcopy(floating_ip_attrs), - loaded=True - ) - - # Set attributes with special mappings in OpenStack SDK. - floating_ip.project_id = floating_ip_attrs['tenant_id'] - - return floating_ip - - @staticmethod - def create_floating_ips(attrs=None, count=2): - """Create multiple fake floating ips. - - :param Dictionary attrs: - A dictionary with all attributes - :param int count: - The number of floating ips to fake - :return: - A list of FakeResource objects faking the floating ips - """ - floating_ips = [] - for i in range(0, count): - floating_ips.append(FakeFloatingIP.create_one_floating_ip(attrs)) - return floating_ips - - @staticmethod - def get_floating_ips(floating_ips=None, count=2): - """Get an iterable MagicMock object with a list of faked floating ips. - - If floating_ips list is provided, then initialize the Mock object - with the list. Otherwise create one. - - :param List floating ips: - A list of FakeResource objects faking floating ips - :param int count: - The number of floating ips to fake - :return: - An iterable Mock object with side_effect set to a list of faked - floating ips - """ - if floating_ips is None: - floating_ips = FakeFloatingIP.create_floating_ips(count) - return mock.MagicMock(side_effect=floating_ips) - - -class FakeSubnetPool(object): - """Fake one or more subnet pools.""" - - @staticmethod - def create_one_subnet_pool(attrs=None): - """Create a fake subnet pool. - - :param Dictionary attrs: - A dictionary with all attributes - :return: - A FakeResource object faking the subnet pool - """ - attrs = attrs or {} - - # Set default attributes. - subnet_pool_attrs = { - 'id': 'subnet-pool-id-' + uuid.uuid4().hex, - 'name': 'subnet-pool-name-' + uuid.uuid4().hex, - 'prefixes': ['10.0.0.0/24', '10.1.0.0/24'], - 'default_prefixlen': '8', - 'address_scope_id': 'address-scope-id-' + uuid.uuid4().hex, - 'tenant_id': 'project-id-' + uuid.uuid4().hex, - 'is_default': False, - 'shared': False, - 'max_prefixlen': '32', - 'min_prefixlen': '8', - 'default_quota': None, - 'ip_version': '4', - } - - # Overwrite default attributes. - subnet_pool_attrs.update(attrs) - - subnet_pool = fakes.FakeResource( - info=copy.deepcopy(subnet_pool_attrs), - loaded=True - ) - - # Set attributes with special mapping in OpenStack SDK. - subnet_pool.project_id = subnet_pool_attrs['tenant_id'] - - return subnet_pool - - @staticmethod - def create_subnet_pools(attrs=None, count=2): - """Create multiple fake subnet pools. - - :param Dictionary attrs: - A dictionary with all attributes - :param int count: - The number of subnet pools to fake - :return: - A list of FakeResource objects faking the subnet pools - """ - subnet_pools = [] - for i in range(0, count): - subnet_pools.append( - FakeSubnetPool.create_one_subnet_pool(attrs) - ) - - return subnet_pools - - @staticmethod - def get_subnet_pools(subnet_pools=None, count=2): - """Get an iterable MagicMock object with a list of faked subnet pools. - - If subnet_pools list is provided, then initialize the Mock object - with the list. Otherwise create one. - - :param List subnet pools: - A list of FakeResource objects faking subnet pools - :param int count: - The number of subnet pools to fake - :return: - An iterable Mock object with side_effect set to a list of faked - subnet pools - """ - if subnet_pools is None: - subnet_pools = FakeSubnetPool.create_subnet_pools(count) - return mock.MagicMock(side_effect=subnet_pools) diff --git a/openstackclient/tests/network/v2/test_address_scope.py b/openstackclient/tests/network/v2/test_address_scope.py deleted file mode 100644 index 342cf49b..00000000 --- a/openstackclient/tests/network/v2/test_address_scope.py +++ /dev/null @@ -1,399 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# - -import mock -from mock import call - -from osc_lib import exceptions - -from openstackclient.network.v2 import address_scope -from openstackclient.tests.identity.v3 import fakes as identity_fakes_v3 -from openstackclient.tests.network.v2 import fakes as network_fakes -from openstackclient.tests import utils as tests_utils - - -class TestAddressScope(network_fakes.TestNetworkV2): - - def setUp(self): - super(TestAddressScope, self).setUp() - - # Get a shortcut to the network client - self.network = self.app.client_manager.network - # Get a shortcut to the ProjectManager Mock - self.projects_mock = self.app.client_manager.identity.projects - # Get a shortcut to the DomainManager Mock - self.domains_mock = self.app.client_manager.identity.domains - - -class TestCreateAddressScope(TestAddressScope): - - project = identity_fakes_v3.FakeProject.create_one_project() - domain = identity_fakes_v3.FakeDomain.create_one_domain() - # The new address scope created. - new_address_scope = ( - network_fakes.FakeAddressScope.create_one_address_scope( - attrs={ - 'tenant_id': project.id, - } - )) - columns = ( - 'id', - 'ip_version', - 'name', - 'project_id', - 'shared' - ) - data = ( - new_address_scope.id, - new_address_scope.ip_version, - new_address_scope.name, - new_address_scope.project_id, - new_address_scope.shared, - ) - - def setUp(self): - super(TestCreateAddressScope, self).setUp() - self.network.create_address_scope = mock.Mock( - return_value=self.new_address_scope) - - # Get the command object to test - self.cmd = address_scope.CreateAddressScope(self.app, self.namespace) - - self.projects_mock.get.return_value = self.project - self.domains_mock.get.return_value = self.domain - - def test_create_no_options(self): - arglist = [] - verifylist = [] - - # Missing required args should bail here - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_create_default_options(self): - arglist = [ - self.new_address_scope.name, - ] - verifylist = [ - ('project', None), - ('ip_version', self.new_address_scope.ip_version), - ('name', self.new_address_scope.name), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - columns, data = (self.cmd.take_action(parsed_args)) - - self.network.create_address_scope.assert_called_once_with(**{ - 'ip_version': self.new_address_scope.ip_version, - 'name': self.new_address_scope.name, - }) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - def test_create_all_options(self): - arglist = [ - '--ip-version', str(self.new_address_scope.ip_version), - '--share', - '--project', self.project.name, - '--project-domain', self.domain.name, - self.new_address_scope.name, - ] - verifylist = [ - ('ip_version', self.new_address_scope.ip_version), - ('share', True), - ('project', self.project.name), - ('project_domain', self.domain.name), - ('name', self.new_address_scope.name), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - columns, data = (self.cmd.take_action(parsed_args)) - - self.network.create_address_scope.assert_called_once_with(**{ - 'ip_version': self.new_address_scope.ip_version, - 'shared': True, - 'tenant_id': self.project.id, - 'name': self.new_address_scope.name, - }) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - def test_create_no_share(self): - arglist = [ - '--no-share', - self.new_address_scope.name, - ] - verifylist = [ - ('no_share', True), - ('name', self.new_address_scope.name), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - columns, data = self.cmd.take_action(parsed_args) - - self.network.create_address_scope.assert_called_once_with(**{ - 'ip_version': self.new_address_scope.ip_version, - 'shared': False, - 'name': self.new_address_scope.name, - }) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - -class TestDeleteAddressScope(TestAddressScope): - - # The address scope to delete. - _address_scopes = ( - network_fakes.FakeAddressScope.create_address_scopes(count=2)) - - def setUp(self): - super(TestDeleteAddressScope, self).setUp() - self.network.delete_address_scope = mock.Mock(return_value=None) - self.network.find_address_scope = ( - network_fakes.FakeAddressScope.get_address_scopes( - address_scopes=self._address_scopes) - ) - - # Get the command object to test - self.cmd = address_scope.DeleteAddressScope(self.app, self.namespace) - - def test_address_scope_delete(self): - arglist = [ - self._address_scopes[0].name, - ] - verifylist = [ - ('address_scope', [self._address_scopes[0].name]), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - self.network.find_address_scope.assert_called_once_with( - self._address_scopes[0].name, ignore_missing=False) - self.network.delete_address_scope.assert_called_once_with( - self._address_scopes[0]) - self.assertIsNone(result) - - def test_multi_address_scopes_delete(self): - arglist = [] - verifylist = [] - - for a in self._address_scopes: - arglist.append(a.name) - verifylist = [ - ('address_scope', arglist), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - calls = [] - for a in self._address_scopes: - calls.append(call(a)) - self.network.delete_address_scope.assert_has_calls(calls) - self.assertIsNone(result) - - def test_multi_address_scopes_delete_with_exception(self): - arglist = [ - self._address_scopes[0].name, - 'unexist_address_scope', - ] - verifylist = [ - ('address_scope', - [self._address_scopes[0].name, 'unexist_address_scope']), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - find_mock_result = [self._address_scopes[0], exceptions.CommandError] - self.network.find_address_scope = ( - mock.MagicMock(side_effect=find_mock_result) - ) - - try: - self.cmd.take_action(parsed_args) - self.fail('CommandError should be raised.') - except exceptions.CommandError as e: - self.assertEqual('1 of 2 address scopes failed to delete.', str(e)) - - self.network.find_address_scope.assert_any_call( - self._address_scopes[0].name, ignore_missing=False) - self.network.find_address_scope.assert_any_call( - 'unexist_address_scope', ignore_missing=False) - self.network.delete_address_scope.assert_called_once_with( - self._address_scopes[0] - ) - - -class TestListAddressScope(TestAddressScope): - - # The address scopes to list up. - address_scopes = ( - network_fakes.FakeAddressScope.create_address_scopes(count=3)) - columns = ( - 'ID', - 'Name', - 'IP Version', - 'Shared', - 'Project', - ) - data = [] - for scope in address_scopes: - data.append(( - scope.id, - scope.name, - scope.ip_version, - scope.shared, - scope.project_id, - )) - - def setUp(self): - super(TestListAddressScope, self).setUp() - self.network.address_scopes = mock.Mock( - return_value=self.address_scopes) - - # Get the command object to test - self.cmd = address_scope.ListAddressScope(self.app, self.namespace) - - def test_address_scope_list(self): - arglist = [] - verifylist = [] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - columns, data = self.cmd.take_action(parsed_args) - - self.network.address_scopes.assert_called_once_with(**{}) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - -class TestSetAddressScope(TestAddressScope): - - # The address scope to set. - _address_scope = network_fakes.FakeAddressScope.create_one_address_scope() - - def setUp(self): - super(TestSetAddressScope, self).setUp() - self.network.update_address_scope = mock.Mock(return_value=None) - self.network.find_address_scope = mock.Mock( - return_value=self._address_scope) - - # Get the command object to test - self.cmd = address_scope.SetAddressScope(self.app, self.namespace) - - def test_set_nothing(self): - arglist = [self._address_scope.name, ] - verifylist = [ - ('address_scope', self._address_scope.name), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - attrs = {} - self.network.update_address_scope.assert_called_with( - self._address_scope, **attrs) - self.assertIsNone(result) - - def test_set_name_and_share(self): - arglist = [ - '--name', 'new_address_scope', - '--share', - self._address_scope.name, - ] - verifylist = [ - ('name', 'new_address_scope'), - ('share', True), - ('address_scope', self._address_scope.name), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - attrs = { - 'name': "new_address_scope", - 'shared': True, - } - self.network.update_address_scope.assert_called_with( - self._address_scope, **attrs) - self.assertIsNone(result) - - def test_set_no_share(self): - arglist = [ - '--no-share', - self._address_scope.name, - ] - verifylist = [ - ('no_share', True), - ('address_scope', self._address_scope.name), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - attrs = { - 'shared': False, - } - self.network.update_address_scope.assert_called_with( - self._address_scope, **attrs) - self.assertIsNone(result) - - -class TestShowAddressScope(TestAddressScope): - - # The address scope to show. - _address_scope = ( - network_fakes.FakeAddressScope.create_one_address_scope()) - columns = ( - 'id', - 'ip_version', - 'name', - 'project_id', - 'shared', - ) - data = ( - _address_scope.id, - _address_scope.ip_version, - _address_scope.name, - _address_scope.project_id, - _address_scope.shared, - ) - - def setUp(self): - super(TestShowAddressScope, self).setUp() - self.network.find_address_scope = mock.Mock( - return_value=self._address_scope) - - # Get the command object to test - self.cmd = address_scope.ShowAddressScope(self.app, self.namespace) - - def test_show_no_options(self): - arglist = [] - verifylist = [] - - # Missing required args should bail here - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_show_all_options(self): - arglist = [ - self._address_scope.name, - ] - verifylist = [ - ('address_scope', self._address_scope.name), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - columns, data = self.cmd.take_action(parsed_args) - - self.network.find_address_scope.assert_called_once_with( - self._address_scope.name, ignore_missing=False) - self.assertEqual(self.columns, columns) - self.assertEqual(list(self.data), list(data)) diff --git a/openstackclient/tests/network/v2/test_floating_ip.py b/openstackclient/tests/network/v2/test_floating_ip.py deleted file mode 100644 index 234fe446..00000000 --- a/openstackclient/tests/network/v2/test_floating_ip.py +++ /dev/null @@ -1,565 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# - -import mock -from mock import call - -from osc_lib import exceptions - -from openstackclient.network.v2 import floating_ip -from openstackclient.tests.compute.v2 import fakes as compute_fakes -from openstackclient.tests.network.v2 import fakes as network_fakes -from openstackclient.tests import utils as tests_utils - - -# Tests for Neutron network -# -class TestFloatingIPNetwork(network_fakes.TestNetworkV2): - - def setUp(self): - super(TestFloatingIPNetwork, self).setUp() - - # Get a shortcut to the network client - self.network = self.app.client_manager.network - - -class TestCreateFloatingIPNetwork(TestFloatingIPNetwork): - - # Fake data for option tests. - floating_network = network_fakes.FakeNetwork.create_one_network() - subnet = network_fakes.FakeSubnet.create_one_subnet() - port = network_fakes.FakePort.create_one_port() - - # The floating ip to be deleted. - floating_ip = network_fakes.FakeFloatingIP.create_one_floating_ip( - attrs={ - 'floating_network_id': floating_network.id, - 'port_id': port.id, - } - ) - - columns = ( - 'dns_domain', - 'dns_name', - 'fixed_ip_address', - 'floating_ip_address', - 'floating_network_id', - 'id', - 'port_id', - 'project_id', - 'router_id', - 'status', - ) - - data = ( - floating_ip.dns_domain, - floating_ip.dns_name, - floating_ip.fixed_ip_address, - floating_ip.floating_ip_address, - floating_ip.floating_network_id, - floating_ip.id, - floating_ip.port_id, - floating_ip.project_id, - floating_ip.router_id, - floating_ip.status, - ) - - def setUp(self): - super(TestCreateFloatingIPNetwork, self).setUp() - - self.network.create_ip = mock.Mock(return_value=self.floating_ip) - - self.network.find_network = mock.Mock( - return_value=self.floating_network) - self.network.find_subnet = mock.Mock(return_value=self.subnet) - self.network.find_port = mock.Mock(return_value=self.port) - - # Get the command object to test - self.cmd = floating_ip.CreateFloatingIP(self.app, self.namespace) - - def test_create_no_options(self): - arglist = [] - verifylist = [] - - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_create_default_options(self): - arglist = [ - self.floating_ip.floating_network_id, - ] - verifylist = [ - ('network', self.floating_ip.floating_network_id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.create_ip.assert_called_once_with(**{ - 'floating_network_id': self.floating_ip.floating_network_id, - }) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - def test_create_all_options(self): - arglist = [ - '--subnet', self.subnet.id, - '--port', self.floating_ip.port_id, - '--floating-ip-address', self.floating_ip.floating_ip_address, - '--fixed-ip-address', self.floating_ip.fixed_ip_address, - self.floating_ip.floating_network_id, - ] - verifylist = [ - ('subnet', self.subnet.id), - ('port', self.floating_ip.port_id), - ('floating_ip_address', self.floating_ip.floating_ip_address), - ('fixed_ip_address', self.floating_ip.fixed_ip_address), - ('network', self.floating_ip.floating_network_id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.create_ip.assert_called_once_with(**{ - 'subnet_id': self.subnet.id, - 'port_id': self.floating_ip.port_id, - 'floating_ip_address': self.floating_ip.floating_ip_address, - 'fixed_ip_address': self.floating_ip.fixed_ip_address, - 'floating_network_id': self.floating_ip.floating_network_id, - }) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - -class TestDeleteFloatingIPNetwork(TestFloatingIPNetwork): - - # The floating ips to be deleted. - floating_ips = network_fakes.FakeFloatingIP.create_floating_ips(count=2) - - def setUp(self): - super(TestDeleteFloatingIPNetwork, self).setUp() - - self.network.delete_ip = mock.Mock(return_value=None) - self.network.find_ip = ( - network_fakes.FakeFloatingIP.get_floating_ips(self.floating_ips)) - - # Get the command object to test - self.cmd = floating_ip.DeleteFloatingIP(self.app, self.namespace) - - def test_floating_ip_delete(self): - arglist = [ - self.floating_ips[0].id, - ] - verifylist = [ - ('floating_ip', [self.floating_ips[0].id]), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - self.network.find_ip.assert_called_once_with( - self.floating_ips[0].id, ignore_missing=False) - self.network.delete_ip.assert_called_once_with(self.floating_ips[0]) - self.assertIsNone(result) - - def test_multi_floating_ips_delete(self): - arglist = [] - verifylist = [] - - for f in self.floating_ips: - arglist.append(f.id) - verifylist = [ - ('floating_ip', arglist), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - calls = [] - for f in self.floating_ips: - calls.append(call(f)) - self.network.delete_ip.assert_has_calls(calls) - self.assertIsNone(result) - - def test_multi_floating_ips_delete_with_exception(self): - arglist = [ - self.floating_ips[0].id, - 'unexist_floating_ip', - ] - verifylist = [ - ('floating_ip', - [self.floating_ips[0].id, 'unexist_floating_ip']), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - find_mock_result = [self.floating_ips[0], exceptions.CommandError] - self.network.find_ip = ( - mock.MagicMock(side_effect=find_mock_result) - ) - - try: - self.cmd.take_action(parsed_args) - self.fail('CommandError should be raised.') - except exceptions.CommandError as e: - self.assertEqual('1 of 2 floating_ips failed to delete.', str(e)) - - self.network.find_ip.assert_any_call( - self.floating_ips[0].id, ignore_missing=False) - self.network.find_ip.assert_any_call( - 'unexist_floating_ip', ignore_missing=False) - self.network.delete_ip.assert_called_once_with( - self.floating_ips[0] - ) - - -class TestListFloatingIPNetwork(TestFloatingIPNetwork): - - # The floating ips to list up - floating_ips = network_fakes.FakeFloatingIP.create_floating_ips(count=3) - - columns = ( - 'ID', - 'Floating IP Address', - 'Fixed IP Address', - 'Port', - ) - - data = [] - for ip in floating_ips: - data.append(( - ip.id, - ip.floating_ip_address, - ip.fixed_ip_address, - ip.port_id, - )) - - def setUp(self): - super(TestListFloatingIPNetwork, self).setUp() - - self.network.ips = mock.Mock(return_value=self.floating_ips) - - # Get the command object to test - self.cmd = floating_ip.ListFloatingIP(self.app, self.namespace) - - def test_floating_ip_list(self): - arglist = [] - verifylist = [] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.ips.assert_called_once_with(**{}) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - -class TestShowFloatingIPNetwork(TestFloatingIPNetwork): - - # The floating ip to display. - floating_ip = network_fakes.FakeFloatingIP.create_one_floating_ip() - - columns = ( - 'dns_domain', - 'dns_name', - 'fixed_ip_address', - 'floating_ip_address', - 'floating_network_id', - 'id', - 'port_id', - 'project_id', - 'router_id', - 'status', - ) - - data = ( - floating_ip.dns_domain, - floating_ip.dns_name, - floating_ip.fixed_ip_address, - floating_ip.floating_ip_address, - floating_ip.floating_network_id, - floating_ip.id, - floating_ip.port_id, - floating_ip.tenant_id, - floating_ip.router_id, - floating_ip.status, - ) - - def setUp(self): - super(TestShowFloatingIPNetwork, self).setUp() - - self.network.find_ip = mock.Mock(return_value=self.floating_ip) - - # Get the command object to test - self.cmd = floating_ip.ShowFloatingIP(self.app, self.namespace) - - def test_floating_ip_show(self): - arglist = [ - self.floating_ip.id, - ] - verifylist = [ - ('floating_ip', self.floating_ip.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.find_ip.assert_called_once_with( - self.floating_ip.id, - ignore_missing=False - ) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - -# Tests for Nova network -# -class TestFloatingIPCompute(compute_fakes.TestComputev2): - - def setUp(self): - super(TestFloatingIPCompute, self).setUp() - - # Get a shortcut to the compute client - self.compute = self.app.client_manager.compute - - -class TestCreateFloatingIPCompute(TestFloatingIPCompute): - - # The floating ip to be deleted. - floating_ip = compute_fakes.FakeFloatingIP.create_one_floating_ip() - - columns = ( - 'fixed_ip', - 'id', - 'instance_id', - 'ip', - 'pool', - ) - - data = ( - floating_ip.fixed_ip, - floating_ip.id, - floating_ip.instance_id, - floating_ip.ip, - floating_ip.pool, - ) - - def setUp(self): - super(TestCreateFloatingIPCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - - self.compute.floating_ips.create.return_value = self.floating_ip - - # Get the command object to test - self.cmd = floating_ip.CreateFloatingIP(self.app, None) - - def test_create_no_options(self): - arglist = [] - verifylist = [] - - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_create_default_options(self): - arglist = [ - self.floating_ip.pool, - ] - verifylist = [ - ('network', self.floating_ip.pool), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.compute.floating_ips.create.assert_called_once_with( - self.floating_ip.pool) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - -class TestDeleteFloatingIPCompute(TestFloatingIPCompute): - - # The floating ips to be deleted. - floating_ips = compute_fakes.FakeFloatingIP.create_floating_ips(count=2) - - def setUp(self): - super(TestDeleteFloatingIPCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - - self.compute.floating_ips.delete.return_value = None - - # Return value of utils.find_resource() - self.compute.floating_ips.get = ( - compute_fakes.FakeFloatingIP.get_floating_ips(self.floating_ips)) - - # Get the command object to test - self.cmd = floating_ip.DeleteFloatingIP(self.app, None) - - def test_floating_ip_delete(self): - arglist = [ - self.floating_ips[0].id, - ] - verifylist = [ - ('floating_ip', [self.floating_ips[0].id]), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - self.compute.floating_ips.delete.assert_called_once_with( - self.floating_ips[0].id - ) - self.assertIsNone(result) - - def test_multi_floating_ips_delete(self): - arglist = [] - verifylist = [] - - for f in self.floating_ips: - arglist.append(f.id) - verifylist = [ - ('floating_ip', arglist), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - calls = [] - for f in self.floating_ips: - calls.append(call(f.id)) - self.compute.floating_ips.delete.assert_has_calls(calls) - self.assertIsNone(result) - - def test_multi_floating_ips_delete_with_exception(self): - arglist = [ - self.floating_ips[0].id, - 'unexist_floating_ip', - ] - verifylist = [ - ('floating_ip', - [self.floating_ips[0].id, 'unexist_floating_ip']), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - find_mock_result = [self.floating_ips[0], exceptions.CommandError] - self.compute.floating_ips.get = ( - mock.MagicMock(side_effect=find_mock_result) - ) - self.compute.floating_ips.find.side_effect = exceptions.NotFound(None) - - try: - self.cmd.take_action(parsed_args) - self.fail('CommandError should be raised.') - except exceptions.CommandError as e: - self.assertEqual('1 of 2 floating_ips failed to delete.', str(e)) - - self.compute.floating_ips.get.assert_any_call( - self.floating_ips[0].id) - self.compute.floating_ips.get.assert_any_call( - 'unexist_floating_ip') - self.compute.floating_ips.delete.assert_called_once_with( - self.floating_ips[0].id - ) - - -class TestListFloatingIPCompute(TestFloatingIPCompute): - - # The floating ips to be list up - floating_ips = compute_fakes.FakeFloatingIP.create_floating_ips(count=3) - - columns = ( - 'ID', - 'Floating IP Address', - 'Fixed IP Address', - 'Server', - 'Pool', - ) - - data = [] - for ip in floating_ips: - data.append(( - ip.id, - ip.ip, - ip.fixed_ip, - ip.instance_id, - ip.pool, - )) - - def setUp(self): - super(TestListFloatingIPCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - - self.compute.floating_ips.list.return_value = self.floating_ips - - # Get the command object to test - self.cmd = floating_ip.ListFloatingIP(self.app, None) - - def test_floating_ip_list(self): - arglist = [] - verifylist = [] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.compute.floating_ips.list.assert_called_once_with() - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - -class TestShowFloatingIPCompute(TestFloatingIPCompute): - - # The floating ip to display. - floating_ip = compute_fakes.FakeFloatingIP.create_one_floating_ip() - - columns = ( - 'fixed_ip', - 'id', - 'instance_id', - 'ip', - 'pool', - ) - - data = ( - floating_ip.fixed_ip, - floating_ip.id, - floating_ip.instance_id, - floating_ip.ip, - floating_ip.pool, - ) - - def setUp(self): - super(TestShowFloatingIPCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - - # Return value of utils.find_resource() - self.compute.floating_ips.get.return_value = self.floating_ip - - # Get the command object to test - self.cmd = floating_ip.ShowFloatingIP(self.app, None) - - def test_floating_ip_show(self): - arglist = [ - self.floating_ip.id, - ] - verifylist = [ - ('floating_ip', self.floating_ip.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) diff --git a/openstackclient/tests/network/v2/test_floating_ip_pool.py b/openstackclient/tests/network/v2/test_floating_ip_pool.py deleted file mode 100644 index 22d20d20..00000000 --- a/openstackclient/tests/network/v2/test_floating_ip_pool.py +++ /dev/null @@ -1,97 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# - -from osc_lib import exceptions - -from openstackclient.network.v2 import floating_ip_pool -from openstackclient.tests.compute.v2 import fakes as compute_fakes -from openstackclient.tests.network.v2 import fakes as network_fakes - - -# Tests for Network API v2 -# -class TestFloatingIPPoolNetwork(network_fakes.TestNetworkV2): - - def setUp(self): - super(TestFloatingIPPoolNetwork, self).setUp() - - # Get a shortcut to the network client - self.network = self.app.client_manager.network - - -class TestListFloatingIPPoolNetwork(TestFloatingIPPoolNetwork): - - def setUp(self): - super(TestListFloatingIPPoolNetwork, self).setUp() - - # Get the command object to test - self.cmd = floating_ip_pool.ListFloatingIPPool(self.app, - self.namespace) - - def test_floating_ip_list(self): - arglist = [] - verifylist = [] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - self.assertRaises(exceptions.CommandError, self.cmd.take_action, - parsed_args) - - -# Tests for Compute network -# -class TestFloatingIPPoolCompute(compute_fakes.TestComputev2): - - def setUp(self): - super(TestFloatingIPPoolCompute, self).setUp() - - # Get a shortcut to the compute client - self.compute = self.app.client_manager.compute - - -class TestListFloatingIPPoolCompute(TestFloatingIPPoolCompute): - - # The floating ip pools to list up - floating_ip_pools = \ - compute_fakes.FakeFloatingIPPool.create_floating_ip_pools(count=3) - - columns = ( - 'Name', - ) - - data = [] - for pool in floating_ip_pools: - data.append(( - pool.name, - )) - - def setUp(self): - super(TestListFloatingIPPoolCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - - self.compute.floating_ip_pools.list.return_value = \ - self.floating_ip_pools - - # Get the command object to test - self.cmd = floating_ip_pool.ListFloatingIPPool(self.app, None) - - def test_floating_ip_list(self): - arglist = [] - verifylist = [] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.compute.floating_ip_pools.list.assert_called_once_with() - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) diff --git a/openstackclient/tests/network/v2/test_ip_availability.py b/openstackclient/tests/network/v2/test_ip_availability.py deleted file mode 100644 index f74bf8f7..00000000 --- a/openstackclient/tests/network/v2/test_ip_availability.py +++ /dev/null @@ -1,172 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# - -import mock - -from osc_lib import utils as common_utils - -from openstackclient.network.v2 import ip_availability -from openstackclient.tests.identity.v3 import fakes as identity_fakes -from openstackclient.tests.network.v2 import fakes as network_fakes -from openstackclient.tests import utils as tests_utils - - -class TestIPAvailability(network_fakes.TestNetworkV2): - - def setUp(self): - super(TestIPAvailability, self).setUp() - - # Get a shortcut to the network client - self.network = self.app.client_manager.network - - # Get a shortcut to the ProjectManager Mock - self.projects_mock = self.app.client_manager.identity.projects - - self.project = identity_fakes.FakeProject.create_one_project() - self.projects_mock.get.return_value = self.project - - -class TestListIPAvailability(TestIPAvailability): - - _ip_availability = \ - network_fakes.FakeIPAvailability.create_ip_availability(count=3) - columns = ( - 'Network ID', - 'Network Name', - 'Total IPs', - 'Used IPs', - ) - data = [] - for net in _ip_availability: - data.append(( - net.network_id, - net.network_name, - net.total_ips, - net.used_ips, - )) - - def setUp(self): - super(TestListIPAvailability, self).setUp() - - self.cmd = ip_availability.ListIPAvailability( - self.app, self.namespace) - self.network.network_ip_availabilities = mock.Mock( - return_value=self._ip_availability) - - def test_list_no_options(self): - arglist = [] - verifylist = [] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - filters = {'ip_version': 4} - - self.network.network_ip_availabilities.assert_called_once_with( - **filters) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - def test_list_ip_version(self): - arglist = [ - '--ip-version', str(4), - ] - verifylist = [ - ('ip_version', 4) - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - filters = {'ip_version': 4} - - self.network.network_ip_availabilities.assert_called_once_with( - **filters) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - def test_list_project(self): - arglist = [ - '--project', self.project.name - ] - verifylist = [ - ('project', self.project.name) - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - filters = {'tenant_id': self.project.id, - 'ip_version': 4} - - self.network.network_ip_availabilities.assert_called_once_with( - **filters) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - -class TestShowIPAvailability(TestIPAvailability): - - _ip_availability = \ - network_fakes.FakeIPAvailability.create_one_ip_availability() - - columns = ( - 'network_id', - 'network_name', - 'project_id', - 'subnet_ip_availability', - 'total_ips', - 'used_ips', - ) - data = ( - _ip_availability.network_id, - _ip_availability.network_name, - _ip_availability.tenant_id, - common_utils.format_list( - _ip_availability.subnet_ip_availability), - _ip_availability.total_ips, - _ip_availability.used_ips, - ) - - def setUp(self): - super(TestShowIPAvailability, self).setUp() - - self.network.find_network_ip_availability = mock.Mock( - return_value=self._ip_availability) - - # Get the command object to test - self.cmd = ip_availability.ShowIPAvailability( - self.app, self.namespace) - - def test_show_no_option(self): - arglist = [] - verifylist = [] - - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, arglist, verifylist) - - def test_show_all_options(self): - arglist = [ - self._ip_availability.network_name, - ] - verifylist = [ - ('network', self._ip_availability.network_name) - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - columns, data = self.cmd.take_action(parsed_args) - self.network.find_network_ip_availability.assert_called_once_with( - self._ip_availability.network_name, - ignore_missing=False) - - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) diff --git a/openstackclient/tests/network/v2/test_network.py b/openstackclient/tests/network/v2/test_network.py deleted file mode 100644 index bb606819..00000000 --- a/openstackclient/tests/network/v2/test_network.py +++ /dev/null @@ -1,1059 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# - -import mock -from mock import call - -from osc_lib import exceptions -from osc_lib import utils - -from openstackclient.network.v2 import network -from openstackclient.tests.compute.v2 import fakes as compute_fakes -from openstackclient.tests import fakes -from openstackclient.tests.identity.v2_0 import fakes as identity_fakes_v2 -from openstackclient.tests.identity.v3 import fakes as identity_fakes_v3 -from openstackclient.tests.network.v2 import fakes as network_fakes -from openstackclient.tests import utils as tests_utils - - -# Tests for Neutron network -# -class TestNetwork(network_fakes.TestNetworkV2): - - def setUp(self): - super(TestNetwork, self).setUp() - - # Get a shortcut to the network client - self.network = self.app.client_manager.network - # Get a shortcut to the ProjectManager Mock - self.projects_mock = self.app.client_manager.identity.projects - # Get a shortcut to the DomainManager Mock - self.domains_mock = self.app.client_manager.identity.domains - - -class TestCreateNetworkIdentityV3(TestNetwork): - - project = identity_fakes_v3.FakeProject.create_one_project() - domain = identity_fakes_v3.FakeDomain.create_one_domain() - # The new network created. - _network = network_fakes.FakeNetwork.create_one_network( - attrs={ - 'tenant_id': project.id, - 'availability_zone_hints': ["nova"], - } - ) - - columns = ( - 'admin_state_up', - 'availability_zone_hints', - 'availability_zones', - 'id', - 'is_default', - 'name', - 'port_security_enabled', - 'project_id', - 'provider_network_type', - 'router:external', - 'shared', - 'status', - 'subnets', - ) - - data = ( - network._format_admin_state(_network.admin_state_up), - utils.format_list(_network.availability_zone_hints), - utils.format_list(_network.availability_zones), - _network.id, - _network.is_default, - _network.name, - _network.is_port_security_enabled, - _network.project_id, - _network.provider_network_type, - network._format_router_external(_network.is_router_external), - _network.shared, - _network.status, - utils.format_list(_network.subnets), - ) - - def setUp(self): - super(TestCreateNetworkIdentityV3, self).setUp() - - self.network.create_network = mock.Mock(return_value=self._network) - - # Get the command object to test - self.cmd = network.CreateNetwork(self.app, self.namespace) - - self.projects_mock.get.return_value = self.project - self.domains_mock.get.return_value = self.domain - - def test_create_no_options(self): - arglist = [] - verifylist = [] - - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_create_default_options(self): - arglist = [ - self._network.name, - ] - verifylist = [ - ('name', self._network.name), - ('enable', True), - ('share', None), - ('project', None), - ('external', False), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - columns, data = self.cmd.take_action(parsed_args) - - self.network.create_network.assert_called_once_with(**{ - 'admin_state_up': True, - 'name': self._network.name, - }) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - def test_create_all_options(self): - arglist = [ - "--disable", - "--share", - "--project", self.project.name, - "--project-domain", self.domain.name, - "--availability-zone-hint", "nova", - "--external", "--default", - "--provider-network-type", "vlan", - "--provider-physical-network", "physnet1", - "--provider-segment", "400", - "--transparent-vlan", - "--enable-port-security", - self._network.name, - ] - verifylist = [ - ('disable', True), - ('share', True), - ('project', self.project.name), - ('project_domain', self.domain.name), - ('availability_zone_hints', ["nova"]), - ('external', True), - ('default', True), - ('provider_network_type', 'vlan'), - ('physical_network', 'physnet1'), - ('segmentation_id', '400'), - ('transparent_vlan', True), - ('enable_port_security', True), - ('name', self._network.name), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - columns, data = (self.cmd.take_action(parsed_args)) - - self.network.create_network.assert_called_once_with(**{ - 'admin_state_up': False, - 'availability_zone_hints': ["nova"], - 'name': self._network.name, - 'shared': True, - 'tenant_id': self.project.id, - 'is_default': True, - 'router:external': True, - 'provider:network_type': 'vlan', - 'provider:physical_network': 'physnet1', - 'provider:segmentation_id': '400', - 'vlan_transparent': True, - 'port_security_enabled': True, - }) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - def test_create_other_options(self): - arglist = [ - "--enable", - "--no-share", - "--disable-port-security", - self._network.name, - ] - verifylist = [ - ('enable', True), - ('no_share', True), - ('name', self._network.name), - ('external', False), - ('disable_port_security', True), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - columns, data = self.cmd.take_action(parsed_args) - - self.network.create_network.assert_called_once_with(**{ - 'admin_state_up': True, - 'name': self._network.name, - 'shared': False, - 'port_security_enabled': False, - }) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - -class TestCreateNetworkIdentityV2(TestNetwork): - - project = identity_fakes_v2.FakeProject.create_one_project() - # The new network created. - _network = network_fakes.FakeNetwork.create_one_network( - attrs={'tenant_id': project.id} - ) - - columns = ( - 'admin_state_up', - 'availability_zone_hints', - 'availability_zones', - 'id', - 'is_default', - 'name', - 'port_security_enabled', - 'project_id', - 'provider_network_type', - 'router:external', - 'shared', - 'status', - 'subnets', - ) - - data = ( - network._format_admin_state(_network.admin_state_up), - utils.format_list(_network.availability_zone_hints), - utils.format_list(_network.availability_zones), - _network.id, - _network.is_default, - _network.name, - _network.is_port_security_enabled, - _network.project_id, - _network.provider_network_type, - network._format_router_external(_network.is_router_external), - _network.shared, - _network.status, - utils.format_list(_network.subnets), - ) - - def setUp(self): - super(TestCreateNetworkIdentityV2, self).setUp() - - self.network.create_network = mock.Mock(return_value=self._network) - - # Get the command object to test - self.cmd = network.CreateNetwork(self.app, self.namespace) - - # Set identity client v2. And get a shortcut to Identity client. - identity_client = identity_fakes_v2.FakeIdentityv2Client( - endpoint=fakes.AUTH_URL, - token=fakes.AUTH_TOKEN, - ) - self.app.client_manager.identity = identity_client - self.identity = self.app.client_manager.identity - - # Get a shortcut to the ProjectManager Mock - self.projects_mock = self.identity.tenants - self.projects_mock.get.return_value = self.project - - # There is no DomainManager Mock in fake identity v2. - - def test_create_with_project_identityv2(self): - arglist = [ - "--project", self.project.name, - self._network.name, - ] - verifylist = [ - ('enable', True), - ('share', None), - ('name', self._network.name), - ('project', self.project.name), - ('external', False), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - columns, data = self.cmd.take_action(parsed_args) - - self.network.create_network.assert_called_once_with(**{ - 'admin_state_up': True, - 'name': self._network.name, - 'tenant_id': self.project.id, - }) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - def test_create_with_domain_identityv2(self): - arglist = [ - "--project", self.project.name, - "--project-domain", "domain-name", - self._network.name, - ] - verifylist = [ - ('enable', True), - ('share', None), - ('project', self.project.name), - ('project_domain', "domain-name"), - ('name', self._network.name), - ('external', False), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - self.assertRaises( - AttributeError, - self.cmd.take_action, - parsed_args, - ) - - -class TestDeleteNetwork(TestNetwork): - - def setUp(self): - super(TestDeleteNetwork, self).setUp() - - # The networks to delete - self._networks = network_fakes.FakeNetwork.create_networks(count=3) - - self.network.delete_network = mock.Mock(return_value=None) - - self.network.find_network = network_fakes.FakeNetwork.get_networks( - networks=self._networks) - - # Get the command object to test - self.cmd = network.DeleteNetwork(self.app, self.namespace) - - def test_delete_one_network(self): - arglist = [ - self._networks[0].name, - ] - verifylist = [ - ('network', [self._networks[0].name]), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - self.network.delete_network.assert_called_once_with(self._networks[0]) - self.assertIsNone(result) - - def test_delete_multiple_networks(self): - arglist = [] - for n in self._networks: - arglist.append(n.id) - verifylist = [ - ('network', arglist), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - calls = [] - for n in self._networks: - calls.append(call(n)) - self.network.delete_network.assert_has_calls(calls) - self.assertIsNone(result) - - def test_delete_multiple_networks_exception(self): - arglist = [ - self._networks[0].id, - 'xxxx-yyyy-zzzz', - self._networks[1].id, - ] - verifylist = [ - ('network', arglist), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - # Fake exception in find_network() - ret_find = [ - self._networks[0], - exceptions.NotFound('404'), - self._networks[1], - ] - self.network.find_network = mock.Mock(side_effect=ret_find) - - # Fake exception in delete_network() - ret_delete = [ - None, - exceptions.NotFound('404'), - ] - self.network.delete_network = mock.Mock(side_effect=ret_delete) - - self.assertRaises(exceptions.CommandError, self.cmd.take_action, - parsed_args) - - # The second call of find_network() should fail. So delete_network() - # was only called twice. - calls = [ - call(self._networks[0]), - call(self._networks[1]), - ] - self.network.delete_network.assert_has_calls(calls) - - -class TestListNetwork(TestNetwork): - - # The networks going to be listed up. - _network = network_fakes.FakeNetwork.create_networks(count=3) - - columns = ( - 'ID', - 'Name', - 'Subnets', - ) - columns_long = ( - 'ID', - 'Name', - 'Status', - 'Project', - 'State', - 'Shared', - 'Subnets', - 'Network Type', - 'Router Type', - 'Availability Zones', - ) - - data = [] - for net in _network: - data.append(( - net.id, - net.name, - utils.format_list(net.subnets), - )) - - data_long = [] - for net in _network: - data_long.append(( - net.id, - net.name, - net.status, - net.project_id, - network._format_admin_state(net.admin_state_up), - net.shared, - utils.format_list(net.subnets), - net.provider_network_type, - network._format_router_external(net.is_router_external), - utils.format_list(net.availability_zones), - )) - - def setUp(self): - super(TestListNetwork, self).setUp() - - # Get the command object to test - self.cmd = network.ListNetwork(self.app, self.namespace) - - self.network.networks = mock.Mock(return_value=self._network) - - def test_network_list_no_options(self): - arglist = [] - verifylist = [ - ('external', False), - ('long', False), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - # In base command class Lister in cliff, abstract method take_action() - # returns a tuple containing the column names and an iterable - # containing the data to be listed. - columns, data = self.cmd.take_action(parsed_args) - - self.network.networks.assert_called_once_with() - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - def test_list_external(self): - arglist = [ - '--external', - ] - verifylist = [ - ('external', True), - ('long', False), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - # In base command class Lister in cliff, abstract method take_action() - # returns a tuple containing the column names and an iterable - # containing the data to be listed. - columns, data = self.cmd.take_action(parsed_args) - - self.network.networks.assert_called_once_with( - **{'router:external': True} - ) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - def test_network_list_long(self): - arglist = [ - '--long', - ] - verifylist = [ - ('long', True), - ('external', False), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - # In base command class Lister in cliff, abstract method take_action() - # returns a tuple containing the column names and an iterable - # containing the data to be listed. - columns, data = self.cmd.take_action(parsed_args) - - self.network.networks.assert_called_once_with() - self.assertEqual(self.columns_long, columns) - self.assertEqual(self.data_long, list(data)) - - -class TestSetNetwork(TestNetwork): - - # The network to set. - _network = network_fakes.FakeNetwork.create_one_network() - - def setUp(self): - super(TestSetNetwork, self).setUp() - - self.network.update_network = mock.Mock(return_value=None) - - self.network.find_network = mock.Mock(return_value=self._network) - - # Get the command object to test - self.cmd = network.SetNetwork(self.app, self.namespace) - - def test_set_this(self): - arglist = [ - self._network.name, - '--enable', - '--name', 'noob', - '--share', - '--external', - '--default', - '--provider-network-type', 'vlan', - '--provider-physical-network', 'physnet1', - '--provider-segment', '400', - '--no-transparent-vlan', - '--enable-port-security', - ] - verifylist = [ - ('network', self._network.name), - ('enable', True), - ('name', 'noob'), - ('share', True), - ('external', True), - ('default', True), - ('provider_network_type', 'vlan'), - ('physical_network', 'physnet1'), - ('segmentation_id', '400'), - ('no_transparent_vlan', True), - ('enable_port_security', True), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - attrs = { - 'name': 'noob', - 'admin_state_up': True, - 'shared': True, - 'router:external': True, - 'is_default': True, - 'provider:network_type': 'vlan', - 'provider:physical_network': 'physnet1', - 'provider:segmentation_id': '400', - 'vlan_transparent': False, - 'port_security_enabled': True, - } - self.network.update_network.assert_called_once_with( - self._network, **attrs) - self.assertIsNone(result) - - def test_set_that(self): - arglist = [ - self._network.name, - '--disable', - '--no-share', - '--internal', - '--disable-port-security', - ] - verifylist = [ - ('network', self._network.name), - ('disable', True), - ('no_share', True), - ('internal', True), - ('disable_port_security', True), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - attrs = { - 'admin_state_up': False, - 'shared': False, - 'router:external': False, - 'port_security_enabled': False, - } - self.network.update_network.assert_called_once_with( - self._network, **attrs) - self.assertIsNone(result) - - def test_set_nothing(self): - arglist = [self._network.name, ] - verifylist = [('network', self._network.name), ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - attrs = {} - self.network.update_network.assert_called_once_with( - self._network, **attrs) - self.assertIsNone(result) - - -class TestShowNetwork(TestNetwork): - - # The network to show. - _network = network_fakes.FakeNetwork.create_one_network() - - columns = ( - 'admin_state_up', - 'availability_zone_hints', - 'availability_zones', - 'id', - 'is_default', - 'name', - 'port_security_enabled', - 'project_id', - 'provider_network_type', - 'router:external', - 'shared', - 'status', - 'subnets', - ) - - data = ( - network._format_admin_state(_network.admin_state_up), - utils.format_list(_network.availability_zone_hints), - utils.format_list(_network.availability_zones), - _network.id, - _network.is_default, - _network.name, - _network.is_port_security_enabled, - _network.project_id, - _network.provider_network_type, - network._format_router_external(_network.is_router_external), - _network.shared, - _network.status, - utils.format_list(_network.subnets), - ) - - def setUp(self): - super(TestShowNetwork, self).setUp() - - self.network.find_network = mock.Mock(return_value=self._network) - - # Get the command object to test - self.cmd = network.ShowNetwork(self.app, self.namespace) - - def test_show_no_options(self): - arglist = [] - verifylist = [] - - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_show_all_options(self): - arglist = [ - self._network.name, - ] - verifylist = [ - ('network', self._network.name), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - columns, data = self.cmd.take_action(parsed_args) - - self.network.find_network.assert_called_once_with( - self._network.name, ignore_missing=False) - - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - -# Tests for Nova network -# -class TestNetworkCompute(compute_fakes.TestComputev2): - - def setUp(self): - super(TestNetworkCompute, self).setUp() - - # Get a shortcut to the compute client - self.compute = self.app.client_manager.compute - - -class TestCreateNetworkCompute(TestNetworkCompute): - - # The network to create. - _network = compute_fakes.FakeNetwork.create_one_network() - - columns = ( - 'bridge', - 'bridge_interface', - 'broadcast', - 'cidr', - 'cidr_v6', - 'created_at', - 'deleted', - 'deleted_at', - 'dhcp_server', - 'dhcp_start', - 'dns1', - 'dns2', - 'enable_dhcp', - 'gateway', - 'gateway_v6', - 'host', - 'id', - 'injected', - 'label', - 'mtu', - 'multi_host', - 'netmask', - 'netmask_v6', - 'priority', - 'project_id', - 'rxtx_base', - 'share_address', - 'updated_at', - 'vlan', - 'vpn_private_address', - 'vpn_public_address', - 'vpn_public_port', - ) - - data = ( - _network.bridge, - _network.bridge_interface, - _network.broadcast, - _network.cidr, - _network.cidr_v6, - _network.created_at, - _network.deleted, - _network.deleted_at, - _network.dhcp_server, - _network.dhcp_start, - _network.dns1, - _network.dns2, - _network.enable_dhcp, - _network.gateway, - _network.gateway_v6, - _network.host, - _network.id, - _network.injected, - _network.label, - _network.mtu, - _network.multi_host, - _network.netmask, - _network.netmask_v6, - _network.priority, - _network.project_id, - _network.rxtx_base, - _network.share_address, - _network.updated_at, - _network.vlan, - _network.vpn_private_address, - _network.vpn_public_address, - _network.vpn_public_port, - ) - - def setUp(self): - super(TestCreateNetworkCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - - self.compute.networks.create.return_value = self._network - - # Get the command object to test - self.cmd = network.CreateNetwork(self.app, None) - - def test_create_no_options(self): - arglist = [] - verifylist = [] - - # Missing required args should raise exception here - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_create_default_options(self): - arglist = [ - "--subnet", self._network.cidr, - self._network.label, - ] - verifylist = [ - ('subnet', self._network.cidr), - ('name', self._network.label), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.compute.networks.create.assert_called_once_with(**{ - 'cidr': self._network.cidr, - 'label': self._network.label, - }) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - -class TestDeleteNetworkCompute(TestNetworkCompute): - - def setUp(self): - super(TestDeleteNetworkCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - - # The networks to delete - self._networks = compute_fakes.FakeNetwork.create_networks(count=3) - - self.compute.networks.delete.return_value = None - - # Return value of utils.find_resource() - self.compute.networks.get = \ - compute_fakes.FakeNetwork.get_networks(networks=self._networks) - - # Get the command object to test - self.cmd = network.DeleteNetwork(self.app, None) - - def test_delete_one_network(self): - arglist = [ - self._networks[0].label, - ] - verifylist = [ - ('network', [self._networks[0].label]), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - self.compute.networks.delete.assert_called_once_with( - self._networks[0].id) - self.assertIsNone(result) - - def test_delete_multiple_networks(self): - arglist = [] - for n in self._networks: - arglist.append(n.label) - verifylist = [ - ('network', arglist), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - calls = [] - for n in self._networks: - calls.append(call(n.id)) - self.compute.networks.delete.assert_has_calls(calls) - self.assertIsNone(result) - - def test_delete_multiple_networks_exception(self): - arglist = [ - self._networks[0].id, - 'xxxx-yyyy-zzzz', - self._networks[1].id, - ] - verifylist = [ - ('network', arglist), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - # Fake exception in utils.find_resource() - # In compute v2, we use utils.find_resource() to find a network. - # It calls get() several times, but find() only one time. So we - # choose to fake get() always raise exception, then pass through. - # And fake find() to find the real network or not. - self.compute.networks.get.side_effect = Exception() - ret_find = [ - self._networks[0], - Exception(), - self._networks[1], - ] - self.compute.networks.find.side_effect = ret_find - - # Fake exception in delete() - ret_delete = [ - None, - Exception(), - ] - self.compute.networks.delete = mock.Mock(side_effect=ret_delete) - - self.assertRaises(exceptions.CommandError, self.cmd.take_action, - parsed_args) - - # The second call of utils.find_resource() should fail. So delete() - # was only called twice. - calls = [ - call(self._networks[0].id), - call(self._networks[1].id), - ] - self.compute.networks.delete.assert_has_calls(calls) - - -class TestListNetworkCompute(TestNetworkCompute): - - # The networks going to be listed up. - _networks = compute_fakes.FakeNetwork.create_networks(count=3) - - columns = ( - 'ID', - 'Name', - 'Subnet', - ) - - data = [] - for net in _networks: - data.append(( - net.id, - net.label, - net.cidr, - )) - - def setUp(self): - super(TestListNetworkCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - - self.compute.networks.list.return_value = self._networks - - # Get the command object to test - self.cmd = network.ListNetwork(self.app, None) - - def test_network_list_no_options(self): - arglist = [] - verifylist = [ - ('external', False), - ('long', False), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - # In base command class Lister in cliff, abstract method take_action() - # returns a tuple containing the column names and an iterable - # containing the data to be listed. - columns, data = self.cmd.take_action(parsed_args) - - self.compute.networks.list.assert_called_once_with() - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - -class TestShowNetworkCompute(TestNetworkCompute): - - # The network to show. - _network = compute_fakes.FakeNetwork.create_one_network() - - columns = ( - 'bridge', - 'bridge_interface', - 'broadcast', - 'cidr', - 'cidr_v6', - 'created_at', - 'deleted', - 'deleted_at', - 'dhcp_server', - 'dhcp_start', - 'dns1', - 'dns2', - 'enable_dhcp', - 'gateway', - 'gateway_v6', - 'host', - 'id', - 'injected', - 'label', - 'mtu', - 'multi_host', - 'netmask', - 'netmask_v6', - 'priority', - 'project_id', - 'rxtx_base', - 'share_address', - 'updated_at', - 'vlan', - 'vpn_private_address', - 'vpn_public_address', - 'vpn_public_port', - ) - - data = ( - _network.bridge, - _network.bridge_interface, - _network.broadcast, - _network.cidr, - _network.cidr_v6, - _network.created_at, - _network.deleted, - _network.deleted_at, - _network.dhcp_server, - _network.dhcp_start, - _network.dns1, - _network.dns2, - _network.enable_dhcp, - _network.gateway, - _network.gateway_v6, - _network.host, - _network.id, - _network.injected, - _network.label, - _network.mtu, - _network.multi_host, - _network.netmask, - _network.netmask_v6, - _network.priority, - _network.project_id, - _network.rxtx_base, - _network.share_address, - _network.updated_at, - _network.vlan, - _network.vpn_private_address, - _network.vpn_public_address, - _network.vpn_public_port, - ) - - def setUp(self): - super(TestShowNetworkCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - - # Return value of utils.find_resource() - self.compute.networks.get.return_value = self._network - - # Get the command object to test - self.cmd = network.ShowNetwork(self.app, None) - - def test_show_no_options(self): - arglist = [] - verifylist = [] - - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_show_all_options(self): - arglist = [ - self._network.label, - ] - verifylist = [ - ('network', self._network.label), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - columns, data = self.cmd.take_action(parsed_args) - - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) diff --git a/openstackclient/tests/network/v2/test_network_agent.py b/openstackclient/tests/network/v2/test_network_agent.py deleted file mode 100644 index 269d4e1d..00000000 --- a/openstackclient/tests/network/v2/test_network_agent.py +++ /dev/null @@ -1,294 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# - -import mock -from mock import call - -from osc_lib import exceptions -from osc_lib import utils - -from openstackclient.network.v2 import network_agent -from openstackclient.tests.network.v2 import fakes as network_fakes -from openstackclient.tests import utils as tests_utils - - -class TestNetworkAgent(network_fakes.TestNetworkV2): - - def setUp(self): - super(TestNetworkAgent, self).setUp() - - # Get a shortcut to the network client - self.network = self.app.client_manager.network - - -class TestDeleteNetworkAgent(TestNetworkAgent): - - network_agents = ( - network_fakes.FakeNetworkAgent.create_network_agents(count=2)) - - def setUp(self): - super(TestDeleteNetworkAgent, self).setUp() - self.network.delete_agent = mock.Mock(return_value=None) - self.network.get_agent = ( - network_fakes.FakeNetworkAgent.get_network_agents( - agents=self.network_agents) - ) - - # Get the command object to test - self.cmd = network_agent.DeleteNetworkAgent(self.app, self.namespace) - - def test_network_agent_delete(self): - arglist = [ - self.network_agents[0].id, - ] - verifylist = [ - ('network_agent', [self.network_agents[0].id]), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - self.network.get_agent.assert_called_once_with( - self.network_agents[0].id, ignore_missing=False) - self.network.delete_agent.assert_called_once_with( - self.network_agents[0]) - self.assertIsNone(result) - - def test_multi_network_agents_delete(self): - arglist = [] - verifylist = [] - - for n in self.network_agents: - arglist.append(n.id) - verifylist = [ - ('network_agent', arglist), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - calls = [] - for n in self.network_agents: - calls.append(call(n)) - self.network.delete_agent.assert_has_calls(calls) - self.assertIsNone(result) - - def test_multi_network_agents_delete_with_exception(self): - arglist = [ - self.network_agents[0].id, - 'unexist_network_agent', - ] - verifylist = [ - ('network_agent', - [self.network_agents[0].id, 'unexist_network_agent']), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - find_mock_result = [self.network_agents[0], exceptions.CommandError] - self.network.get_agent = ( - mock.MagicMock(side_effect=find_mock_result) - ) - - try: - self.cmd.take_action(parsed_args) - self.fail('CommandError should be raised.') - except exceptions.CommandError as e: - self.assertEqual('1 of 2 network agents failed to delete.', str(e)) - - self.network.get_agent.assert_any_call( - self.network_agents[0].id, ignore_missing=False) - self.network.get_agent.assert_any_call( - 'unexist_network_agent', ignore_missing=False) - self.network.delete_agent.assert_called_once_with( - self.network_agents[0] - ) - - -class TestListNetworkAgent(TestNetworkAgent): - - network_agents = ( - network_fakes.FakeNetworkAgent.create_network_agents(count=3)) - - columns = ( - 'ID', - 'Agent Type', - 'Host', - 'Availability Zone', - 'Alive', - 'State', - 'Binary' - ) - data = [] - for agent in network_agents: - data.append(( - agent.id, - agent.agent_type, - agent.host, - agent.availability_zone, - agent.alive, - network_agent._format_admin_state(agent.admin_state_up), - agent.binary, - )) - - def setUp(self): - super(TestListNetworkAgent, self).setUp() - self.network.agents = mock.Mock( - return_value=self.network_agents) - - # Get the command object to test - self.cmd = network_agent.ListNetworkAgent(self.app, self.namespace) - - def test_network_agents_list(self): - arglist = [] - verifylist = [] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - columns, data = self.cmd.take_action(parsed_args) - - self.network.agents.assert_called_once_with(**{}) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - -class TestSetNetworkAgent(TestNetworkAgent): - - _network_agent = ( - network_fakes.FakeNetworkAgent.create_one_network_agent()) - - def setUp(self): - super(TestSetNetworkAgent, self).setUp() - self.network.update_agent = mock.Mock(return_value=None) - self.network.get_agent = mock.Mock(return_value=self._network_agent) - - # Get the command object to test - self.cmd = network_agent.SetNetworkAgent(self.app, self.namespace) - - def test_set_nothing(self): - arglist = [ - self._network_agent.id, - ] - verifylist = [ - ('network_agent', self._network_agent.id), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - attrs = {} - self.network.update_agent.assert_called_once_with( - self._network_agent, **attrs) - self.assertIsNone(result) - - def test_set_all(self): - arglist = [ - '--description', 'new_description', - '--enable', - self._network_agent.id, - ] - verifylist = [ - ('description', 'new_description'), - ('enable', True), - ('disable', False), - ('network_agent', self._network_agent.id), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - attrs = { - 'description': 'new_description', - 'admin_state_up': True, - } - self.network.update_agent.assert_called_once_with( - self._network_agent, **attrs) - self.assertIsNone(result) - - def test_set_with_disable(self): - arglist = [ - '--disable', - self._network_agent.id, - ] - verifylist = [ - ('enable', False), - ('disable', True), - ('network_agent', self._network_agent.id), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - attrs = { - 'admin_state_up': False, - } - self.network.update_agent.assert_called_once_with( - self._network_agent, **attrs) - self.assertIsNone(result) - - -class TestShowNetworkAgent(TestNetworkAgent): - - _network_agent = ( - network_fakes.FakeNetworkAgent.create_one_network_agent()) - - columns = ( - 'admin_state_up', - 'agent_type', - 'alive', - 'availability_zone', - 'binary', - 'configurations', - 'host', - 'id', - ) - data = ( - network_agent._format_admin_state(_network_agent.admin_state_up), - _network_agent.agent_type, - _network_agent.alive, - _network_agent.availability_zone, - _network_agent.binary, - utils.format_dict(_network_agent.configurations), - _network_agent.host, - _network_agent.id, - ) - - def setUp(self): - super(TestShowNetworkAgent, self).setUp() - self.network.get_agent = mock.Mock( - return_value=self._network_agent) - - # Get the command object to test - self.cmd = network_agent.ShowNetworkAgent(self.app, self.namespace) - - def test_show_no_options(self): - arglist = [] - verifylist = [] - - # Missing required args should bail here - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_show_all_options(self): - arglist = [ - self._network_agent.id, - ] - verifylist = [ - ('network_agent', self._network_agent.id), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - columns, data = self.cmd.take_action(parsed_args) - - self.network.get_agent.assert_called_once_with( - self._network_agent.id, ignore_missing=False) - self.assertEqual(self.columns, columns) - self.assertEqual(list(self.data), list(data)) diff --git a/openstackclient/tests/network/v2/test_network_rbac.py b/openstackclient/tests/network/v2/test_network_rbac.py deleted file mode 100644 index 9250e91b..00000000 --- a/openstackclient/tests/network/v2/test_network_rbac.py +++ /dev/null @@ -1,430 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# - -import mock -from mock import call - -from osc_lib import exceptions - -from openstackclient.network.v2 import network_rbac -from openstackclient.tests.identity.v3 import fakes as identity_fakes_v3 -from openstackclient.tests.network.v2 import fakes as network_fakes -from openstackclient.tests import utils as tests_utils - - -class TestNetworkRBAC(network_fakes.TestNetworkV2): - - def setUp(self): - super(TestNetworkRBAC, self).setUp() - - # Get a shortcut to the network client - self.network = self.app.client_manager.network - # Get a shortcut to the ProjectManager Mock - self.projects_mock = self.app.client_manager.identity.projects - - -class TestCreateNetworkRBAC(TestNetworkRBAC): - - network_object = network_fakes.FakeNetwork.create_one_network() - project = identity_fakes_v3.FakeProject.create_one_project() - rbac_policy = network_fakes.FakeNetworkRBAC.create_one_network_rbac( - attrs={'tenant_id': project.id, - 'target_tenant': project.id, - 'object_id': network_object.id} - ) - - columns = ( - 'action', - 'id', - 'object_id', - 'object_type', - 'project_id', - 'target_project_id', - ) - - data = [ - rbac_policy.action, - rbac_policy.id, - rbac_policy.object_id, - rbac_policy.object_type, - rbac_policy.tenant_id, - rbac_policy.target_tenant, - ] - - def setUp(self): - super(TestCreateNetworkRBAC, self).setUp() - - # Get the command object to test - self.cmd = network_rbac.CreateNetworkRBAC(self.app, self.namespace) - - self.network.create_rbac_policy = mock.Mock( - return_value=self.rbac_policy) - self.network.find_network = mock.Mock( - return_value=self.network_object) - self.projects_mock.get.return_value = self.project - - def test_network_rbac_create_no_type(self): - arglist = [ - '--action', self.rbac_policy.action, - self.rbac_policy.object_id, - ] - verifylist = [ - ('action', self.rbac_policy.action), - ('rbac_policy', self.rbac_policy.id), - ] - - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_network_rbac_create_no_action(self): - arglist = [ - '--type', self.rbac_policy.object_type, - self.rbac_policy.object_id, - ] - verifylist = [ - ('type', self.rbac_policy.object_type), - ('rbac_policy', self.rbac_policy.id), - ] - - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_network_rbac_create_invalid_type(self): - arglist = [ - '--action', self.rbac_policy.action, - '--type', 'invalid_type', - '--target-project', self.rbac_policy.target_tenant, - self.rbac_policy.object_id, - ] - verifylist = [ - ('action', self.rbac_policy.action), - ('type', 'invalid_type'), - ('target-project', self.rbac_policy.target_tenant), - ('rbac_policy', self.rbac_policy.id), - ] - - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_network_rbac_create_invalid_action(self): - arglist = [ - '--type', self.rbac_policy.object_type, - '--action', 'invalid_action', - '--target-project', self.rbac_policy.target_tenant, - self.rbac_policy.object_id, - ] - verifylist = [ - ('type', self.rbac_policy.object_type), - ('action', 'invalid_action'), - ('target-project', self.rbac_policy.target_tenant), - ('rbac_policy', self.rbac_policy.id), - ] - - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_network_rbac_create(self): - arglist = [ - '--type', self.rbac_policy.object_type, - '--action', self.rbac_policy.action, - '--target-project', self.rbac_policy.target_tenant, - self.rbac_policy.object_id, - ] - verifylist = [ - ('type', self.rbac_policy.object_type), - ('action', self.rbac_policy.action), - ('target_project', self.rbac_policy.target_tenant), - ('rbac_object', self.rbac_policy.object_id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - # DisplayCommandBase.take_action() returns two tuples - columns, data = self.cmd.take_action(parsed_args) - - self.network.create_rbac_policy.assert_called_with(**{ - 'object_id': self.rbac_policy.object_id, - 'object_type': self.rbac_policy.object_type, - 'action': self.rbac_policy.action, - 'target_tenant': self.rbac_policy.target_tenant, - }) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - def test_network_rbac_create_all_options(self): - arglist = [ - '--type', self.rbac_policy.object_type, - '--action', self.rbac_policy.action, - '--target-project', self.rbac_policy.target_tenant, - '--project', self.rbac_policy.tenant_id, - '--project-domain', self.project.domain_id, - '--target-project-domain', self.project.domain_id, - self.rbac_policy.object_id, - ] - verifylist = [ - ('type', self.rbac_policy.object_type), - ('action', self.rbac_policy.action), - ('target_project', self.rbac_policy.target_tenant), - ('project', self.rbac_policy.tenant_id), - ('project_domain', self.project.domain_id), - ('target_project_domain', self.project.domain_id), - ('rbac_object', self.rbac_policy.object_id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - # DisplayCommandBase.take_action() returns two tuples - columns, data = self.cmd.take_action(parsed_args) - - self.network.create_rbac_policy.assert_called_with(**{ - 'object_id': self.rbac_policy.object_id, - 'object_type': self.rbac_policy.object_type, - 'action': self.rbac_policy.action, - 'target_tenant': self.rbac_policy.target_tenant, - 'tenant_id': self.rbac_policy.tenant_id, - }) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - -class TestDeleteNetworkRBAC(TestNetworkRBAC): - - rbac_policies = network_fakes.FakeNetworkRBAC.create_network_rbacs(count=2) - - def setUp(self): - super(TestDeleteNetworkRBAC, self).setUp() - self.network.delete_rbac_policy = mock.Mock(return_value=None) - self.network.find_rbac_policy = ( - network_fakes.FakeNetworkRBAC.get_network_rbacs( - rbac_policies=self.rbac_policies) - ) - - # Get the command object to test - self.cmd = network_rbac.DeleteNetworkRBAC(self.app, self.namespace) - - def test_network_rbac_delete(self): - arglist = [ - self.rbac_policies[0].id, - ] - verifylist = [ - ('rbac_policy', [self.rbac_policies[0].id]), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - self.network.find_rbac_policy.assert_called_once_with( - self.rbac_policies[0].id, ignore_missing=False) - self.network.delete_rbac_policy.assert_called_once_with( - self.rbac_policies[0]) - self.assertIsNone(result) - - def test_multi_network_rbacs_delete(self): - arglist = [] - verifylist = [] - - for r in self.rbac_policies: - arglist.append(r.id) - verifylist = [ - ('rbac_policy', arglist), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - calls = [] - for r in self.rbac_policies: - calls.append(call(r)) - self.network.delete_rbac_policy.assert_has_calls(calls) - self.assertIsNone(result) - - def test_multi_network_policies_delete_with_exception(self): - arglist = [ - self.rbac_policies[0].id, - 'unexist_rbac_policy', - ] - verifylist = [ - ('rbac_policy', - [self.rbac_policies[0].id, 'unexist_rbac_policy']), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - find_mock_result = [self.rbac_policies[0], exceptions.CommandError] - self.network.find_rbac_policy = ( - mock.MagicMock(side_effect=find_mock_result) - ) - - try: - self.cmd.take_action(parsed_args) - self.fail('CommandError should be raised.') - except exceptions.CommandError as e: - self.assertEqual('1 of 2 RBAC policies failed to delete.', str(e)) - - self.network.find_rbac_policy.assert_any_call( - self.rbac_policies[0].id, ignore_missing=False) - self.network.find_rbac_policy.assert_any_call( - 'unexist_rbac_policy', ignore_missing=False) - self.network.delete_rbac_policy.assert_called_once_with( - self.rbac_policies[0] - ) - - -class TestListNetworkRABC(TestNetworkRBAC): - - # The network rbac policies going to be listed up. - rbac_policies = network_fakes.FakeNetworkRBAC.create_network_rbacs(count=3) - - columns = ( - 'ID', - 'Object Type', - 'Object ID', - ) - - data = [] - for r in rbac_policies: - data.append(( - r.id, - r.object_type, - r.object_id, - )) - - def setUp(self): - super(TestListNetworkRABC, self).setUp() - - # Get the command object to test - self.cmd = network_rbac.ListNetworkRBAC(self.app, self.namespace) - - self.network.rbac_policies = mock.Mock(return_value=self.rbac_policies) - - def test_network_rbac_list(self): - arglist = [] - verifylist = [] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - # DisplayCommandBase.take_action() returns two tuples - columns, data = self.cmd.take_action(parsed_args) - - self.network.rbac_policies.assert_called_with() - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - -class TestSetNetworkRBAC(TestNetworkRBAC): - - project = identity_fakes_v3.FakeProject.create_one_project() - rbac_policy = network_fakes.FakeNetworkRBAC.create_one_network_rbac( - attrs={'target_tenant': project.id}) - - def setUp(self): - super(TestSetNetworkRBAC, self).setUp() - - # Get the command object to test - self.cmd = network_rbac.SetNetworkRBAC(self.app, self.namespace) - - self.network.find_rbac_policy = mock.Mock( - return_value=self.rbac_policy) - self.network.update_rbac_policy = mock.Mock(return_value=None) - self.projects_mock.get.return_value = self.project - - def test_network_rbac_set_nothing(self): - arglist = [ - self.rbac_policy.id, - ] - verifylist = [ - ('rbac_policy', self.rbac_policy.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - self.network.find_rbac_policy.assert_called_once_with( - self.rbac_policy.id, ignore_missing=False - ) - attrs = {} - self.network.update_rbac_policy.assert_called_once_with( - self.rbac_policy, **attrs) - self.assertIsNone(result) - - def test_network_rbac_set(self): - arglist = [ - '--target-project', self.project.id, - self.rbac_policy.id, - ] - verifylist = [ - ('target_project', self.project.id), - ('rbac_policy', self.rbac_policy.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - self.network.find_rbac_policy.assert_called_once_with( - self.rbac_policy.id, ignore_missing=False - ) - attrs = {'target_tenant': self.project.id} - self.network.update_rbac_policy.assert_called_once_with( - self.rbac_policy, **attrs) - self.assertIsNone(result) - - -class TestShowNetworkRBAC(TestNetworkRBAC): - - rbac_policy = network_fakes.FakeNetworkRBAC.create_one_network_rbac() - - columns = ( - 'action', - 'id', - 'object_id', - 'object_type', - 'project_id', - 'target_project_id', - ) - - data = [ - rbac_policy.action, - rbac_policy.id, - rbac_policy.object_id, - rbac_policy.object_type, - rbac_policy.tenant_id, - rbac_policy.target_tenant, - ] - - def setUp(self): - super(TestShowNetworkRBAC, self).setUp() - - # Get the command object to test - self.cmd = network_rbac.ShowNetworkRBAC(self.app, self.namespace) - - self.network.find_rbac_policy = mock.Mock( - return_value=self.rbac_policy) - - def test_show_no_options(self): - arglist = [] - verifylist = [] - - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_network_rbac_show_all_options(self): - arglist = [ - self.rbac_policy.id, - ] - verifylist = [ - ('rbac_policy', self.rbac_policy.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - # DisplayCommandBase.take_action() returns two tuples - columns, data = self.cmd.take_action(parsed_args) - - self.network.find_rbac_policy.assert_called_with( - self.rbac_policy.id, ignore_missing=False - ) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) diff --git a/openstackclient/tests/network/v2/test_network_segment.py b/openstackclient/tests/network/v2/test_network_segment.py deleted file mode 100644 index a635d845..00000000 --- a/openstackclient/tests/network/v2/test_network_segment.py +++ /dev/null @@ -1,200 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# - -import mock - -from osc_lib import exceptions - -from openstackclient.network.v2 import network_segment -from openstackclient.tests.network.v2 import fakes as network_fakes -from openstackclient.tests import utils as tests_utils - - -class TestNetworkSegment(network_fakes.TestNetworkV2): - - def setUp(self): - super(TestNetworkSegment, self).setUp() - - # Enable beta commands. - self.app.options.os_beta_command = True - - # Get a shortcut to the network client - self.network = self.app.client_manager.network - - -class TestListNetworkSegment(TestNetworkSegment): - _network = network_fakes.FakeNetwork.create_one_network() - _network_segments = \ - network_fakes.FakeNetworkSegment.create_network_segments(count=3) - - columns = ( - 'ID', - 'Network', - 'Network Type', - 'Segment', - ) - columns_long = columns + ( - 'Physical Network', - ) - - data = [] - for _network_segment in _network_segments: - data.append(( - _network_segment.id, - _network_segment.network_id, - _network_segment.network_type, - _network_segment.segmentation_id, - )) - - data_long = [] - for _network_segment in _network_segments: - data_long.append(( - _network_segment.id, - _network_segment.network_id, - _network_segment.network_type, - _network_segment.segmentation_id, - _network_segment.physical_network, - )) - - def setUp(self): - super(TestListNetworkSegment, self).setUp() - - # Get the command object to test - self.cmd = network_segment.ListNetworkSegment(self.app, self.namespace) - - self.network.find_network = mock.Mock(return_value=self._network) - self.network.segments = mock.Mock(return_value=self._network_segments) - - def test_list_no_option(self): - arglist = [] - verifylist = [ - ('long', False), - ('network', None), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.segments.assert_called_once_with() - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - def test_list_no_beta_commands(self): - self.app.options.os_beta_command = False - parsed_args = self.check_parser(self.cmd, [], []) - self.assertRaises(exceptions.CommandError, self.cmd.take_action, - parsed_args) - - def test_list_long(self): - arglist = [ - '--long', - ] - verifylist = [ - ('long', True), - ('network', None), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.segments.assert_called_once_with() - self.assertEqual(self.columns_long, columns) - self.assertEqual(self.data_long, list(data)) - - def test_list_network(self): - arglist = [ - '--network', - self._network.id, - ] - verifylist = [ - ('long', False), - ('network', self._network.id) - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.segments.assert_called_once_with( - **{'network_id': self._network.id} - ) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - -class TestShowNetworkSegment(TestNetworkSegment): - - # The network segment to show. - _network_segment = \ - network_fakes.FakeNetworkSegment.create_one_network_segment() - - columns = ( - 'id', - 'network_id', - 'network_type', - 'physical_network', - 'segmentation_id', - ) - - data = ( - _network_segment.id, - _network_segment.network_id, - _network_segment.network_type, - _network_segment.physical_network, - _network_segment.segmentation_id, - ) - - def setUp(self): - super(TestShowNetworkSegment, self).setUp() - - self.network.find_segment = mock.Mock( - return_value=self._network_segment - ) - - # Get the command object to test - self.cmd = network_segment.ShowNetworkSegment(self.app, self.namespace) - - def test_show_no_options(self): - # Missing required args should bail here - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, [], []) - - def test_show_no_beta_commands(self): - arglist = [ - self._network_segment.id, - ] - verifylist = [ - ('network_segment', self._network_segment.id), - ] - self.app.options.os_beta_command = False - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - self.assertRaises(exceptions.CommandError, self.cmd.take_action, - parsed_args) - - def test_show_all_options(self): - arglist = [ - self._network_segment.id, - ] - verifylist = [ - ('network_segment', self._network_segment.id), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - columns, data = self.cmd.take_action(parsed_args) - - self.network.find_segment.assert_called_once_with( - self._network_segment.id, - ignore_missing=False - ) - - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) diff --git a/openstackclient/tests/network/v2/test_port.py b/openstackclient/tests/network/v2/test_port.py deleted file mode 100644 index a1cecec8..00000000 --- a/openstackclient/tests/network/v2/test_port.py +++ /dev/null @@ -1,696 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# - -import argparse -import mock - -from mock import call -from osc_lib import exceptions -from osc_lib import utils - -from openstackclient.network.v2 import port -from openstackclient.tests.network.v2 import fakes as network_fakes -from openstackclient.tests import utils as tests_utils - - -class TestPort(network_fakes.TestNetworkV2): - - def setUp(self): - super(TestPort, self).setUp() - - # Get a shortcut to the network client - self.network = self.app.client_manager.network - - def _get_common_cols_data(self, fake_port): - columns = ( - 'admin_state_up', - 'allowed_address_pairs', - 'binding_host_id', - 'binding_profile', - 'binding_vif_details', - 'binding_vif_type', - 'binding_vnic_type', - 'device_id', - 'device_owner', - 'dns_assignment', - 'dns_name', - 'extra_dhcp_opts', - 'fixed_ips', - 'id', - 'mac_address', - 'name', - 'network_id', - 'port_security_enabled', - 'project_id', - 'security_groups', - 'status', - ) - - data = ( - port._format_admin_state(fake_port.admin_state_up), - utils.format_list_of_dicts(fake_port.allowed_address_pairs), - fake_port.binding_host_id, - utils.format_dict(fake_port.binding_profile), - utils.format_dict(fake_port.binding_vif_details), - fake_port.binding_vif_type, - fake_port.binding_vnic_type, - fake_port.device_id, - fake_port.device_owner, - utils.format_list_of_dicts(fake_port.dns_assignment), - fake_port.dns_name, - utils.format_list_of_dicts(fake_port.extra_dhcp_opts), - utils.format_list_of_dicts(fake_port.fixed_ips), - fake_port.id, - fake_port.mac_address, - fake_port.name, - fake_port.network_id, - fake_port.port_security_enabled, - fake_port.project_id, - utils.format_list(fake_port.security_groups), - fake_port.status, - ) - - return columns, data - - -class TestCreatePort(TestPort): - - _port = network_fakes.FakePort.create_one_port() - - def setUp(self): - super(TestCreatePort, self).setUp() - - self.network.create_port = mock.Mock(return_value=self._port) - fake_net = network_fakes.FakeNetwork.create_one_network({ - 'id': self._port.network_id, - }) - self.network.find_network = mock.Mock(return_value=fake_net) - self.fake_subnet = network_fakes.FakeSubnet.create_one_subnet() - self.network.find_subnet = mock.Mock(return_value=self.fake_subnet) - # Get the command object to test - self.cmd = port.CreatePort(self.app, self.namespace) - - def test_create_default_options(self): - arglist = [ - '--network', self._port.network_id, - 'test-port', - ] - verifylist = [ - ('network', self._port.network_id,), - ('enable', True), - ('name', 'test-port'), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = (self.cmd.take_action(parsed_args)) - - self.network.create_port.assert_called_once_with(**{ - 'admin_state_up': True, - 'network_id': self._port.network_id, - 'name': 'test-port', - }) - - ref_columns, ref_data = self._get_common_cols_data(self._port) - self.assertEqual(ref_columns, columns) - self.assertEqual(ref_data, data) - - def test_create_full_options(self): - arglist = [ - '--mac-address', 'aa:aa:aa:aa:aa:aa', - '--fixed-ip', 'subnet=%s,ip-address=10.0.0.2' - % self.fake_subnet.id, - '--device', 'deviceid', - '--device-owner', 'fakeowner', - '--disable', - '--vnic-type', 'macvtap', - '--binding-profile', 'foo=bar', - '--binding-profile', 'foo2=bar2', - '--network', self._port.network_id, - 'test-port', - - ] - verifylist = [ - ('mac_address', 'aa:aa:aa:aa:aa:aa'), - ( - 'fixed_ip', - [{'subnet': self.fake_subnet.id, 'ip-address': '10.0.0.2'}] - ), - ('device', 'deviceid'), - ('device_owner', 'fakeowner'), - ('disable', True), - ('vnic_type', 'macvtap'), - ('binding_profile', {'foo': 'bar', 'foo2': 'bar2'}), - ('network', self._port.network_id), - ('name', 'test-port'), - - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = (self.cmd.take_action(parsed_args)) - - self.network.create_port.assert_called_once_with(**{ - 'mac_address': 'aa:aa:aa:aa:aa:aa', - 'fixed_ips': [{'subnet_id': self.fake_subnet.id, - 'ip_address': '10.0.0.2'}], - 'device_id': 'deviceid', - 'device_owner': 'fakeowner', - 'admin_state_up': False, - 'binding:vnic_type': 'macvtap', - 'binding:profile': {'foo': 'bar', 'foo2': 'bar2'}, - 'network_id': self._port.network_id, - 'name': 'test-port', - }) - - ref_columns, ref_data = self._get_common_cols_data(self._port) - self.assertEqual(ref_columns, columns) - self.assertEqual(ref_data, data) - - def test_create_invalid_json_binding_profile(self): - arglist = [ - '--network', self._port.network_id, - '--binding-profile', '{"parent_name":"fake_parent"', - 'test-port', - ] - self.assertRaises(argparse.ArgumentTypeError, - self.check_parser, - self.cmd, - arglist, - None) - - def test_create_invalid_key_value_binding_profile(self): - arglist = [ - '--network', self._port.network_id, - '--binding-profile', 'key', - 'test-port', - ] - self.assertRaises(argparse.ArgumentTypeError, - self.check_parser, - self.cmd, - arglist, - None) - - def test_create_json_binding_profile(self): - arglist = [ - '--network', self._port.network_id, - '--binding-profile', '{"parent_name":"fake_parent"}', - '--binding-profile', '{"tag":42}', - 'test-port', - ] - verifylist = [ - ('network', self._port.network_id,), - ('enable', True), - ('binding_profile', {'parent_name': 'fake_parent', 'tag': 42}), - ('name', 'test-port'), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = (self.cmd.take_action(parsed_args)) - - self.network.create_port.assert_called_once_with(**{ - 'admin_state_up': True, - 'network_id': self._port.network_id, - 'binding:profile': {'parent_name': 'fake_parent', 'tag': 42}, - 'name': 'test-port', - }) - - ref_columns, ref_data = self._get_common_cols_data(self._port) - self.assertEqual(ref_columns, columns) - self.assertEqual(ref_data, data) - - -class TestDeletePort(TestPort): - - # Ports to delete. - _ports = network_fakes.FakePort.create_ports(count=2) - - def setUp(self): - super(TestDeletePort, self).setUp() - - self.network.delete_port = mock.Mock(return_value=None) - self.network.find_port = network_fakes.FakePort.get_ports( - ports=self._ports) - # Get the command object to test - self.cmd = port.DeletePort(self.app, self.namespace) - - def test_port_delete(self): - arglist = [ - self._ports[0].name, - ] - verifylist = [ - ('port', [self._ports[0].name]), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - self.network.find_port.assert_called_once_with( - self._ports[0].name, ignore_missing=False) - self.network.delete_port.assert_called_once_with(self._ports[0]) - self.assertIsNone(result) - - def test_multi_ports_delete(self): - arglist = [] - verifylist = [] - - for p in self._ports: - arglist.append(p.name) - verifylist = [ - ('port', arglist), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - calls = [] - for p in self._ports: - calls.append(call(p)) - self.network.delete_port.assert_has_calls(calls) - self.assertIsNone(result) - - def test_multi_ports_delete_with_exception(self): - arglist = [ - self._ports[0].name, - 'unexist_port', - ] - verifylist = [ - ('port', - [self._ports[0].name, 'unexist_port']), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - find_mock_result = [self._ports[0], exceptions.CommandError] - self.network.find_port = ( - mock.MagicMock(side_effect=find_mock_result) - ) - - try: - self.cmd.take_action(parsed_args) - self.fail('CommandError should be raised.') - except exceptions.CommandError as e: - self.assertEqual('1 of 2 ports failed to delete.', str(e)) - - self.network.find_port.assert_any_call( - self._ports[0].name, ignore_missing=False) - self.network.find_port.assert_any_call( - 'unexist_port', ignore_missing=False) - self.network.delete_port.assert_called_once_with( - self._ports[0] - ) - - -class TestListPort(TestPort): - - _ports = network_fakes.FakePort.create_ports(count=3) - - columns = ( - 'ID', - 'Name', - 'MAC Address', - 'Fixed IP Addresses', - ) - - data = [] - for prt in _ports: - data.append(( - prt.id, - prt.name, - prt.mac_address, - utils.format_list_of_dicts(prt.fixed_ips), - )) - - def setUp(self): - super(TestListPort, self).setUp() - - # Get the command object to test - self.cmd = port.ListPort(self.app, self.namespace) - self.network.ports = mock.Mock(return_value=self._ports) - fake_router = network_fakes.FakeRouter.create_one_router({ - 'id': 'fake-router-id', - }) - self.network.find_router = mock.Mock(return_value=fake_router) - - def test_port_list_no_options(self): - arglist = [] - verifylist = [] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.ports.assert_called_once_with() - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - def test_port_list_router_opt(self): - arglist = [ - '--router', 'fake-router-name', - ] - - verifylist = [ - ('router', 'fake-router-name') - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.ports.assert_called_once_with(**{ - 'device_id': 'fake-router-id' - }) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - def test_port_list_device_owner_opt(self): - arglist = [ - '--device-owner', self._ports[0].device_owner, - ] - - verifylist = [ - ('device_owner', self._ports[0].device_owner) - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.ports.assert_called_once_with(**{ - 'device_owner': self._ports[0].device_owner - }) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - def test_port_list_all_opt(self): - arglist = [ - '--device-owner', self._ports[0].device_owner, - '--router', 'fake-router-name', - ] - - verifylist = [ - ('device_owner', self._ports[0].device_owner), - ('router', 'fake-router-name') - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.ports.assert_called_once_with(**{ - 'device_owner': self._ports[0].device_owner, - 'device_id': 'fake-router-id' - }) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - -class TestSetPort(TestPort): - - _port = network_fakes.FakePort.create_one_port() - - def setUp(self): - super(TestSetPort, self).setUp() - self.fake_subnet = network_fakes.FakeSubnet.create_one_subnet() - self.network.find_subnet = mock.Mock(return_value=self.fake_subnet) - self.network.find_port = mock.Mock(return_value=self._port) - self.network.update_port = mock.Mock(return_value=None) - - # Get the command object to test - self.cmd = port.SetPort(self.app, self.namespace) - - def test_set_fixed_ip(self): - arglist = [ - '--fixed-ip', 'ip-address=10.0.0.11', - self._port.name, - ] - verifylist = [ - ('fixed_ip', [{'ip-address': '10.0.0.11'}]), - ('port', self._port.name), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - attrs = { - 'fixed_ips': [{'ip_address': '10.0.0.11'}], - } - self.network.update_port.assert_called_once_with(self._port, **attrs) - self.assertIsNone(result) - - def test_append_fixed_ip(self): - _testport = network_fakes.FakePort.create_one_port( - {'fixed_ips': [{'ip_address': '0.0.0.1'}]}) - self.network.find_port = mock.Mock(return_value=_testport) - arglist = [ - '--fixed-ip', 'ip-address=10.0.0.12', - _testport.name, - ] - verifylist = [ - ('fixed_ip', [{'ip-address': '10.0.0.12'}]), - ('port', _testport.name), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - attrs = { - 'fixed_ips': [ - {'ip_address': '10.0.0.12'}, {'ip_address': '0.0.0.1'}], - } - self.network.update_port.assert_called_once_with(_testport, **attrs) - self.assertIsNone(result) - - def test_set_this(self): - arglist = [ - '--disable', - '--no-fixed-ip', - '--no-binding-profile', - self._port.name, - ] - verifylist = [ - ('disable', True), - ('no_binding_profile', True), - ('no_fixed_ip', True), - ('port', self._port.name), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - attrs = { - 'admin_state_up': False, - 'binding:profile': {}, - 'fixed_ips': [], - } - self.network.update_port.assert_called_once_with(self._port, **attrs) - self.assertIsNone(result) - - def test_set_that(self): - arglist = [ - '--enable', - '--vnic-type', 'macvtap', - '--binding-profile', 'foo=bar', - '--host', 'binding-host-id-xxxx', - '--name', 'newName', - self._port.name, - ] - verifylist = [ - ('enable', True), - ('vnic_type', 'macvtap'), - ('binding_profile', {'foo': 'bar'}), - ('host', 'binding-host-id-xxxx'), - ('name', 'newName'), - ('port', self._port.name), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - attrs = { - 'admin_state_up': True, - 'binding:vnic_type': 'macvtap', - 'binding:profile': {'foo': 'bar'}, - 'binding:host_id': 'binding-host-id-xxxx', - 'name': 'newName', - } - self.network.update_port.assert_called_once_with(self._port, **attrs) - self.assertIsNone(result) - - def test_set_nothing(self): - arglist = [ - self._port.name, - ] - verifylist = [ - ('port', self._port.name), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - attrs = {} - self.network.update_port.assert_called_once_with(self._port, **attrs) - self.assertIsNone(result) - - def test_set_invalid_json_binding_profile(self): - arglist = [ - '--binding-profile', '{"parent_name"}', - 'test-port', - ] - self.assertRaises(argparse.ArgumentTypeError, - self.check_parser, - self.cmd, - arglist, - None) - - def test_set_invalid_key_value_binding_profile(self): - arglist = [ - '--binding-profile', 'key', - 'test-port', - ] - self.assertRaises(argparse.ArgumentTypeError, - self.check_parser, - self.cmd, - arglist, - None) - - def test_set_mixed_binding_profile(self): - arglist = [ - '--binding-profile', 'foo=bar', - '--binding-profile', '{"foo2": "bar2"}', - self._port.name, - ] - verifylist = [ - ('binding_profile', {'foo': 'bar', 'foo2': 'bar2'}), - ('port', self._port.name), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - attrs = { - 'binding:profile': {'foo': 'bar', 'foo2': 'bar2'}, - } - self.network.update_port.assert_called_once_with(self._port, **attrs) - self.assertIsNone(result) - - -class TestShowPort(TestPort): - - # The port to show. - _port = network_fakes.FakePort.create_one_port() - - def setUp(self): - super(TestShowPort, self).setUp() - - self.network.find_port = mock.Mock(return_value=self._port) - - # Get the command object to test - self.cmd = port.ShowPort(self.app, self.namespace) - - def test_show_no_options(self): - arglist = [] - verifylist = [] - - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, arglist, verifylist) - - def test_show_all_options(self): - arglist = [ - self._port.name, - ] - verifylist = [ - ('port', self._port.name), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.find_port.assert_called_once_with( - self._port.name, ignore_missing=False) - - ref_columns, ref_data = self._get_common_cols_data(self._port) - self.assertEqual(ref_columns, columns) - self.assertEqual(ref_data, data) - - -class TestUnsetPort(TestPort): - - def setUp(self): - super(TestUnsetPort, self).setUp() - self._testport = network_fakes.FakePort.create_one_port( - {'fixed_ips': [{'subnet_id': '042eb10a-3a18-4658-ab-cf47c8d03152', - 'ip_address': '0.0.0.1'}, - {'subnet_id': '042eb10a-3a18-4658-ab-cf47c8d03152', - 'ip_address': '1.0.0.0'}], - 'binding:profile': {'batman': 'Joker', 'Superman': 'LexLuthor'}}) - self.fake_subnet = network_fakes.FakeSubnet.create_one_subnet( - {'id': '042eb10a-3a18-4658-ab-cf47c8d03152'}) - self.network.find_subnet = mock.Mock(return_value=self.fake_subnet) - self.network.find_port = mock.Mock(return_value=self._testport) - self.network.update_port = mock.Mock(return_value=None) - # Get the command object to test - self.cmd = port.UnsetPort(self.app, self.namespace) - - def test_unset_port_parameters(self): - arglist = [ - '--fixed-ip', - 'subnet=042eb10a-3a18-4658-ab-cf47c8d03152,ip-address=1.0.0.0', - '--binding-profile', 'Superman', - self._testport.name, - ] - verifylist = [ - ('fixed_ip', [{ - 'subnet': '042eb10a-3a18-4658-ab-cf47c8d03152', - 'ip-address': '1.0.0.0'}]), - ('binding_profile', ['Superman']), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - attrs = { - 'fixed_ips': [{ - 'subnet_id': '042eb10a-3a18-4658-ab-cf47c8d03152', - 'ip_address': '0.0.0.1'}], - 'binding:profile': {'batman': 'Joker'} - } - self.network.update_port.assert_called_once_with( - self._testport, **attrs) - self.assertIsNone(result) - - def test_unset_port_fixed_ip_not_existent(self): - arglist = [ - '--fixed-ip', 'ip-address=1.0.0.1', - '--binding-profile', 'Superman', - self._testport.name, - ] - verifylist = [ - ('fixed_ip', [{'ip-address': '1.0.0.1'}]), - ('binding_profile', ['Superman']), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - self.assertRaises(exceptions.CommandError, - self.cmd.take_action, - parsed_args) - - def test_unset_port_binding_profile_not_existent(self): - arglist = [ - '--fixed-ip', 'ip-address=1.0.0.0', - '--binding-profile', 'Neo', - self._testport.name, - ] - verifylist = [ - ('fixed_ip', [{'ip-address': '1.0.0.0'}]), - ('binding_profile', ['Neo']), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - self.assertRaises(exceptions.CommandError, - self.cmd.take_action, - parsed_args) diff --git a/openstackclient/tests/network/v2/test_router.py b/openstackclient/tests/network/v2/test_router.py deleted file mode 100644 index 1ef4707b..00000000 --- a/openstackclient/tests/network/v2/test_router.py +++ /dev/null @@ -1,751 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# - -import mock -from mock import call - -from osc_lib import exceptions -from osc_lib import utils as osc_utils - -from openstackclient.network.v2 import router -from openstackclient.tests.network.v2 import fakes as network_fakes -from openstackclient.tests import utils as tests_utils - - -class TestRouter(network_fakes.TestNetworkV2): - - def setUp(self): - super(TestRouter, self).setUp() - - # Get a shortcut to the network client - self.network = self.app.client_manager.network - - -class TestAddPortToRouter(TestRouter): - '''Add port to Router ''' - - _port = network_fakes.FakePort.create_one_port() - _router = network_fakes.FakeRouter.create_one_router( - attrs={'port': _port.id}) - - def setUp(self): - super(TestAddPortToRouter, self).setUp() - self.network.router_add_interface = mock.Mock() - self.cmd = router.AddPortToRouter(self.app, self.namespace) - self.network.find_router = mock.Mock(return_value=self._router) - self.network.find_port = mock.Mock(return_value=self._port) - - def test_add_port_no_option(self): - arglist = [] - verifylist = [] - - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_add_port_required_options(self): - arglist = [ - self._router.id, - self._router.port, - ] - verifylist = [ - ('router', self._router.id), - ('port', self._router.port), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - self.network.router_add_interface.assert_called_with(self._router, **{ - 'port_id': self._router.port, - }) - self.assertIsNone(result) - - -class TestAddSubnetToRouter(TestRouter): - '''Add subnet to Router ''' - - _subnet = network_fakes.FakeSubnet.create_one_subnet() - _router = network_fakes.FakeRouter.create_one_router( - attrs={'subnet': _subnet.id}) - - def setUp(self): - super(TestAddSubnetToRouter, self).setUp() - self.network.router_add_interface = mock.Mock() - self.cmd = router.AddSubnetToRouter(self.app, self.namespace) - self.network.find_router = mock.Mock(return_value=self._router) - self.network.find_subnet = mock.Mock(return_value=self._subnet) - - def test_add_subnet_no_option(self): - arglist = [] - verifylist = [] - - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_add_subnet_required_options(self): - arglist = [ - self._router.id, - self._router.subnet, - ] - verifylist = [ - ('router', self._router.id), - ('subnet', self._router.subnet), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - self.network.router_add_interface.assert_called_with( - self._router, **{'subnet_id': self._router.subnet}) - - self.assertIsNone(result) - - -class TestCreateRouter(TestRouter): - - # The new router created. - new_router = network_fakes.FakeRouter.create_one_router() - - columns = ( - 'admin_state_up', - 'availability_zone_hints', - 'availability_zones', - 'distributed', - 'external_gateway_info', - 'ha', - 'id', - 'name', - 'project_id', - 'routes', - 'status', - ) - data = ( - router._format_admin_state(new_router.admin_state_up), - osc_utils.format_list(new_router.availability_zone_hints), - osc_utils.format_list(new_router.availability_zones), - new_router.distributed, - router._format_external_gateway_info(new_router.external_gateway_info), - new_router.ha, - new_router.id, - new_router.name, - new_router.tenant_id, - router._format_routes(new_router.routes), - new_router.status, - ) - - def setUp(self): - super(TestCreateRouter, self).setUp() - - self.network.create_router = mock.Mock(return_value=self.new_router) - - # Get the command object to test - self.cmd = router.CreateRouter(self.app, self.namespace) - - def test_create_no_options(self): - arglist = [] - verifylist = [] - - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_create_default_options(self): - arglist = [ - self.new_router.name, - ] - verifylist = [ - ('name', self.new_router.name), - ('enable', True), - ('distributed', False), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = (self.cmd.take_action(parsed_args)) - - self.network.create_router.assert_called_once_with(**{ - 'admin_state_up': True, - 'name': self.new_router.name, - }) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - def test_create_with_AZ_hints(self): - arglist = [ - self.new_router.name, - '--availability-zone-hint', 'fake-az', - '--availability-zone-hint', 'fake-az2', - ] - verifylist = [ - ('name', self.new_router.name), - ('availability_zone_hints', ['fake-az', 'fake-az2']), - ('enable', True), - ('distributed', False), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = (self.cmd.take_action(parsed_args)) - self.network.create_router.assert_called_once_with(**{ - 'admin_state_up': True, - 'name': self.new_router.name, - 'availability_zone_hints': ['fake-az', 'fake-az2'], - }) - - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - -class TestDeleteRouter(TestRouter): - - # The routers to delete. - _routers = network_fakes.FakeRouter.create_routers(count=2) - - def setUp(self): - super(TestDeleteRouter, self).setUp() - - self.network.delete_router = mock.Mock(return_value=None) - - self.network.find_router = ( - network_fakes.FakeRouter.get_routers(self._routers)) - - # Get the command object to test - self.cmd = router.DeleteRouter(self.app, self.namespace) - - def test_router_delete(self): - arglist = [ - self._routers[0].name, - ] - verifylist = [ - ('router', [self._routers[0].name]), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - self.network.delete_router.assert_called_once_with(self._routers[0]) - self.assertIsNone(result) - - def test_multi_routers_delete(self): - arglist = [] - verifylist = [] - - for r in self._routers: - arglist.append(r.name) - verifylist = [ - ('router', arglist), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - calls = [] - for r in self._routers: - calls.append(call(r)) - self.network.delete_router.assert_has_calls(calls) - self.assertIsNone(result) - - def test_multi_routers_delete_with_exception(self): - arglist = [ - self._routers[0].name, - 'unexist_router', - ] - verifylist = [ - ('router', - [self._routers[0].name, 'unexist_router']), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - find_mock_result = [self._routers[0], exceptions.CommandError] - self.network.find_router = ( - mock.MagicMock(side_effect=find_mock_result) - ) - - try: - self.cmd.take_action(parsed_args) - self.fail('CommandError should be raised.') - except exceptions.CommandError as e: - self.assertEqual('1 of 2 routers failed to delete.', str(e)) - - self.network.find_router.assert_any_call( - self._routers[0].name, ignore_missing=False) - self.network.find_router.assert_any_call( - 'unexist_router', ignore_missing=False) - self.network.delete_router.assert_called_once_with( - self._routers[0] - ) - - -class TestListRouter(TestRouter): - - # The routers going to be listed up. - routers = network_fakes.FakeRouter.create_routers(count=3) - - columns = ( - 'ID', - 'Name', - 'Status', - 'State', - 'Distributed', - 'HA', - 'Project', - ) - columns_long = columns + ( - 'Routes', - 'External gateway info', - 'Availability zones' - ) - - data = [] - for r in routers: - data.append(( - r.id, - r.name, - r.status, - router._format_admin_state(r.admin_state_up), - r.distributed, - r.ha, - r.tenant_id, - )) - data_long = [] - for i in range(0, len(routers)): - r = routers[i] - data_long.append( - data[i] + ( - router._format_routes(r.routes), - router._format_external_gateway_info(r.external_gateway_info), - osc_utils.format_list(r.availability_zones), - ) - ) - - def setUp(self): - super(TestListRouter, self).setUp() - - # Get the command object to test - self.cmd = router.ListRouter(self.app, self.namespace) - - self.network.routers = mock.Mock(return_value=self.routers) - - def test_router_list_no_options(self): - arglist = [] - verifylist = [ - ('long', False), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - # In base command class Lister in cliff, abstract method take_action() - # returns a tuple containing the column names and an iterable - # containing the data to be listed. - columns, data = self.cmd.take_action(parsed_args) - - self.network.routers.assert_called_once_with() - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - def test_router_list_long(self): - arglist = [ - '--long', - ] - verifylist = [ - ('long', True), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - # In base command class Lister in cliff, abstract method take_action() - # returns a tuple containing the column names and an iterable - # containing the data to be listed. - columns, data = self.cmd.take_action(parsed_args) - - self.network.routers.assert_called_once_with() - self.assertEqual(self.columns_long, columns) - self.assertEqual(self.data_long, list(data)) - - -class TestRemovePortFromRouter(TestRouter): - '''Remove port from a Router ''' - - _port = network_fakes.FakePort.create_one_port() - _router = network_fakes.FakeRouter.create_one_router( - attrs={'port': _port.id}) - - def setUp(self): - super(TestRemovePortFromRouter, self).setUp() - self.network.router_remove_interface = mock.Mock() - self.cmd = router.RemovePortFromRouter(self.app, self.namespace) - self.network.find_router = mock.Mock(return_value=self._router) - self.network.find_port = mock.Mock(return_value=self._port) - - def test_remove_port_no_option(self): - arglist = [] - verifylist = [] - - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_remove_port_required_options(self): - arglist = [ - self._router.id, - self._router.port, - ] - verifylist = [ - ('router', self._router.id), - ('port', self._router.port), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - self.network.router_remove_interface.assert_called_with( - self._router, **{'port_id': self._router.port}) - self.assertIsNone(result) - - -class TestRemoveSubnetFromRouter(TestRouter): - '''Remove subnet from Router ''' - - _subnet = network_fakes.FakeSubnet.create_one_subnet() - _router = network_fakes.FakeRouter.create_one_router( - attrs={'subnet': _subnet.id}) - - def setUp(self): - super(TestRemoveSubnetFromRouter, self).setUp() - self.network.router_remove_interface = mock.Mock() - self.cmd = router.RemoveSubnetFromRouter(self.app, self.namespace) - self.network.find_router = mock.Mock(return_value=self._router) - self.network.find_subnet = mock.Mock(return_value=self._subnet) - - def test_remove_subnet_no_option(self): - arglist = [] - verifylist = [] - - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_remove_subnet_required_options(self): - arglist = [ - self._router.id, - self._router.subnet, - ] - verifylist = [ - ('subnet', self._router.subnet), - ('router', self._router.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - self.network.router_remove_interface.assert_called_with( - self._router, **{'subnet_id': self._router.subnet}) - self.assertIsNone(result) - - -class TestSetRouter(TestRouter): - - # The router to set. - _default_route = {'destination': '10.20.20.0/24', 'nexthop': '10.20.30.1'} - _router = network_fakes.FakeRouter.create_one_router( - attrs={'routes': [_default_route]} - ) - - def setUp(self): - super(TestSetRouter, self).setUp() - - self.network.update_router = mock.Mock(return_value=None) - - self.network.find_router = mock.Mock(return_value=self._router) - - # Get the command object to test - self.cmd = router.SetRouter(self.app, self.namespace) - - def test_set_this(self): - arglist = [ - self._router.name, - '--enable', - '--distributed', - '--name', 'noob', - ] - verifylist = [ - ('router', self._router.name), - ('enable', True), - ('distributed', True), - ('name', 'noob'), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - attrs = { - 'admin_state_up': True, - 'distributed': True, - 'name': 'noob', - } - self.network.update_router.assert_called_once_with( - self._router, **attrs) - self.assertIsNone(result) - - def test_set_that(self): - arglist = [ - self._router.name, - '--disable', - '--centralized', - ] - verifylist = [ - ('router', self._router.name), - ('disable', True), - ('centralized', True), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - attrs = { - 'admin_state_up': False, - 'distributed': False, - } - self.network.update_router.assert_called_once_with( - self._router, **attrs) - self.assertIsNone(result) - - def test_set_distributed_centralized(self): - arglist = [ - self._router.name, - '--distributed', - '--centralized', - ] - verifylist = [ - ('router', self._router.name), - ('distributed', True), - ('distributed', False), - ] - - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_set_route(self): - arglist = [ - self._router.name, - '--route', 'destination=10.20.30.0/24,gateway=10.20.30.1', - ] - verifylist = [ - ('router', self._router.name), - ('routes', [{'destination': '10.20.30.0/24', - 'gateway': '10.20.30.1'}]), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - attrs = { - 'routes': self._router.routes + [{'destination': '10.20.30.0/24', - 'nexthop': '10.20.30.1'}], - } - self.network.update_router.assert_called_once_with( - self._router, **attrs) - self.assertIsNone(result) - - def test_set_no_route(self): - arglist = [ - self._router.name, - '--no-route', - ] - verifylist = [ - ('router', self._router.name), - ('no_route', True), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - attrs = { - 'routes': [], - } - self.network.update_router.assert_called_once_with( - self._router, **attrs) - self.assertIsNone(result) - - def test_set_route_no_route(self): - arglist = [ - self._router.name, - '--route', 'destination=10.20.30.0/24,gateway=10.20.30.1', - '--no-route', - ] - verifylist = [ - ('router', self._router.name), - ('routes', [{'destination': '10.20.30.0/24', - 'gateway': '10.20.30.1'}]), - ('no_route', True), - ] - - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_set_clear_routes(self): - arglist = [ - self._router.name, - '--clear-routes', - ] - verifylist = [ - ('router', self._router.name), - ('clear_routes', True), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - attrs = { - 'routes': [], - } - self.network.update_router.assert_called_once_with( - self._router, **attrs) - self.assertIsNone(result) - - def test_set_route_clear_routes(self): - arglist = [ - self._router.name, - '--route', 'destination=10.20.30.0/24,gateway=10.20.30.1', - '--clear-routes', - ] - verifylist = [ - ('router', self._router.name), - ('routes', [{'destination': '10.20.30.0/24', - 'gateway': '10.20.30.1'}]), - ('clear_routes', True), - ] - - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_set_nothing(self): - arglist = [ - self._router.name, - ] - verifylist = [ - ('router', self._router.name), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - attrs = {} - self.network.update_router.assert_called_once_with( - self._router, **attrs) - self.assertIsNone(result) - - -class TestShowRouter(TestRouter): - - # The router to set. - _router = network_fakes.FakeRouter.create_one_router() - - columns = ( - 'admin_state_up', - 'availability_zone_hints', - 'availability_zones', - 'distributed', - 'external_gateway_info', - 'ha', - 'id', - 'name', - 'project_id', - 'routes', - 'status', - ) - data = ( - router._format_admin_state(_router.admin_state_up), - osc_utils.format_list(_router.availability_zone_hints), - osc_utils.format_list(_router.availability_zones), - _router.distributed, - router._format_external_gateway_info(_router.external_gateway_info), - _router.ha, - _router.id, - _router.name, - _router.tenant_id, - router._format_routes(_router.routes), - _router.status, - ) - - def setUp(self): - super(TestShowRouter, self).setUp() - - self.network.find_router = mock.Mock(return_value=self._router) - - # Get the command object to test - self.cmd = router.ShowRouter(self.app, self.namespace) - - def test_show_no_options(self): - arglist = [] - verifylist = [] - - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_show_all_options(self): - arglist = [ - self._router.name, - ] - verifylist = [ - ('router', self._router.name), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.find_router.assert_called_once_with( - self._router.name, ignore_missing=False) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - -class TestUnsetRouter(TestRouter): - - def setUp(self): - super(TestUnsetRouter, self).setUp() - self._testrouter = network_fakes.FakeRouter.create_one_router( - {'routes': [{"destination": "192.168.101.1/24", - "gateway": "172.24.4.3"}, - {"destination": "192.168.101.2/24", - "gateway": "172.24.4.3"}], }) - self.fake_subnet = network_fakes.FakeSubnet.create_one_subnet() - self.network.find_router = mock.Mock(return_value=self._testrouter) - self.network.update_router = mock.Mock(return_value=None) - # Get the command object to test - self.cmd = router.UnsetRouter(self.app, self.namespace) - - def test_unset_router_params(self): - arglist = [ - '--route', 'destination=192.168.101.1/24,gateway=172.24.4.3', - self._testrouter.name, - ] - verifylist = [ - ('routes', [ - {"destination": "192.168.101.1/24", "gateway": "172.24.4.3"}]), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - attrs = { - 'routes': [{"destination": "192.168.101.2/24", - "nexthop": "172.24.4.3"}], - } - self.network.update_router.assert_called_once_with( - self._testrouter, **attrs) - self.assertIsNone(result) - - def test_unset_router_wrong_routes(self): - arglist = [ - '--route', 'destination=192.168.101.1/24,gateway=172.24.4.2', - self._testrouter.name, - ] - verifylist = [ - ('routes', [ - {"destination": "192.168.101.1/24", "gateway": "172.24.4.2"}]), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - self.assertRaises(exceptions.CommandError, - self.cmd.take_action, parsed_args) diff --git a/openstackclient/tests/network/v2/test_security_group.py b/openstackclient/tests/network/v2/test_security_group.py deleted file mode 100644 index 15f4cffe..00000000 --- a/openstackclient/tests/network/v2/test_security_group.py +++ /dev/null @@ -1,770 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# - -import mock -from mock import call - -from osc_lib import exceptions - -from openstackclient.network.v2 import security_group -from openstackclient.tests.compute.v2 import fakes as compute_fakes -from openstackclient.tests.identity.v3 import fakes as identity_fakes -from openstackclient.tests.network.v2 import fakes as network_fakes -from openstackclient.tests import utils as tests_utils - - -class TestSecurityGroupNetwork(network_fakes.TestNetworkV2): - - def setUp(self): - super(TestSecurityGroupNetwork, self).setUp() - - # Get a shortcut to the network client - self.network = self.app.client_manager.network - # Get a shortcut to the ProjectManager Mock - self.projects_mock = self.app.client_manager.identity.projects - # Get a shortcut to the DomainManager Mock - self.domains_mock = self.app.client_manager.identity.domains - - -class TestSecurityGroupCompute(compute_fakes.TestComputev2): - - def setUp(self): - super(TestSecurityGroupCompute, self).setUp() - - # Get a shortcut to the compute client - self.compute = self.app.client_manager.compute - - -class TestCreateSecurityGroupNetwork(TestSecurityGroupNetwork): - - project = identity_fakes.FakeProject.create_one_project() - domain = identity_fakes.FakeDomain.create_one_domain() - # The security group to be created. - _security_group = \ - network_fakes.FakeSecurityGroup.create_one_security_group() - - columns = ( - 'description', - 'id', - 'name', - 'project_id', - 'rules', - ) - - data = ( - _security_group.description, - _security_group.id, - _security_group.name, - _security_group.project_id, - '', - ) - - def setUp(self): - super(TestCreateSecurityGroupNetwork, self).setUp() - - self.network.create_security_group = mock.Mock( - return_value=self._security_group) - - self.projects_mock.get.return_value = self.project - self.domains_mock.get.return_value = self.domain - - # Get the command object to test - self.cmd = security_group.CreateSecurityGroup(self.app, self.namespace) - - def test_create_no_options(self): - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, [], []) - - def test_create_min_options(self): - arglist = [ - self._security_group.name, - ] - verifylist = [ - ('name', self._security_group.name), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.create_security_group.assert_called_once_with(**{ - 'description': self._security_group.name, - 'name': self._security_group.name, - }) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - def test_create_all_options(self): - arglist = [ - '--description', self._security_group.description, - '--project', self.project.name, - '--project-domain', self.domain.name, - self._security_group.name, - ] - verifylist = [ - ('description', self._security_group.description), - ('name', self._security_group.name), - ('project', self.project.name), - ('project_domain', self.domain.name), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.create_security_group.assert_called_once_with(**{ - 'description': self._security_group.description, - 'name': self._security_group.name, - 'tenant_id': self.project.id, - }) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - -class TestCreateSecurityGroupCompute(TestSecurityGroupCompute): - - project = identity_fakes.FakeProject.create_one_project() - domain = identity_fakes.FakeDomain.create_one_domain() - # The security group to be shown. - _security_group = \ - compute_fakes.FakeSecurityGroup.create_one_security_group() - - columns = ( - 'description', - 'id', - 'name', - 'project_id', - 'rules', - ) - - data = ( - _security_group.description, - _security_group.id, - _security_group.name, - _security_group.tenant_id, - '', - ) - - def setUp(self): - super(TestCreateSecurityGroupCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - - self.compute.security_groups.create.return_value = self._security_group - - # Get the command object to test - self.cmd = security_group.CreateSecurityGroup(self.app, None) - - def test_create_no_options(self): - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, [], []) - - def test_create_network_options(self): - arglist = [ - '--project', self.project.name, - '--project-domain', self.domain.name, - self._security_group.name, - ] - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, arglist, []) - - def test_create_min_options(self): - arglist = [ - self._security_group.name, - ] - verifylist = [ - ('name', self._security_group.name), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.compute.security_groups.create.assert_called_once_with( - self._security_group.name, - self._security_group.name) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - def test_create_all_options(self): - arglist = [ - '--description', self._security_group.description, - self._security_group.name, - ] - verifylist = [ - ('description', self._security_group.description), - ('name', self._security_group.name), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.compute.security_groups.create.assert_called_once_with( - self._security_group.name, - self._security_group.description) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - -class TestDeleteSecurityGroupNetwork(TestSecurityGroupNetwork): - - # The security groups to be deleted. - _security_groups = \ - network_fakes.FakeSecurityGroup.create_security_groups() - - def setUp(self): - super(TestDeleteSecurityGroupNetwork, self).setUp() - - self.network.delete_security_group = mock.Mock(return_value=None) - - self.network.find_security_group = ( - network_fakes.FakeSecurityGroup.get_security_groups( - self._security_groups) - ) - - # Get the command object to test - self.cmd = security_group.DeleteSecurityGroup(self.app, self.namespace) - - def test_security_group_delete(self): - arglist = [ - self._security_groups[0].name, - ] - verifylist = [ - ('group', [self._security_groups[0].name]), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - self.network.delete_security_group.assert_called_once_with( - self._security_groups[0]) - self.assertIsNone(result) - - def test_multi_security_groups_delete(self): - arglist = [] - verifylist = [] - - for s in self._security_groups: - arglist.append(s.name) - verifylist = [ - ('group', arglist), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - calls = [] - for s in self._security_groups: - calls.append(call(s)) - self.network.delete_security_group.assert_has_calls(calls) - self.assertIsNone(result) - - def test_multi_security_groups_delete_with_exception(self): - arglist = [ - self._security_groups[0].name, - 'unexist_security_group', - ] - verifylist = [ - ('group', - [self._security_groups[0].name, 'unexist_security_group']), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - find_mock_result = [self._security_groups[0], exceptions.CommandError] - self.network.find_security_group = ( - mock.MagicMock(side_effect=find_mock_result) - ) - - try: - self.cmd.take_action(parsed_args) - self.fail('CommandError should be raised.') - except exceptions.CommandError as e: - self.assertEqual('1 of 2 groups failed to delete.', str(e)) - - self.network.find_security_group.assert_any_call( - self._security_groups[0].name, ignore_missing=False) - self.network.find_security_group.assert_any_call( - 'unexist_security_group', ignore_missing=False) - self.network.delete_security_group.assert_called_once_with( - self._security_groups[0] - ) - - -class TestDeleteSecurityGroupCompute(TestSecurityGroupCompute): - - # The security groups to be deleted. - _security_groups = \ - compute_fakes.FakeSecurityGroup.create_security_groups() - - def setUp(self): - super(TestDeleteSecurityGroupCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - - self.compute.security_groups.delete = mock.Mock(return_value=None) - - self.compute.security_groups.get = ( - compute_fakes.FakeSecurityGroup.get_security_groups( - self._security_groups) - ) - - # Get the command object to test - self.cmd = security_group.DeleteSecurityGroup(self.app, None) - - def test_security_group_delete(self): - arglist = [ - self._security_groups[0].id, - ] - verifylist = [ - ('group', [self._security_groups[0].id]), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - self.compute.security_groups.delete.assert_called_once_with( - self._security_groups[0].id) - self.assertIsNone(result) - - def test_multi_security_groups_delete(self): - arglist = [] - verifylist = [] - - for s in self._security_groups: - arglist.append(s.id) - verifylist = [ - ('group', arglist), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - calls = [] - for s in self._security_groups: - calls.append(call(s.id)) - self.compute.security_groups.delete.assert_has_calls(calls) - self.assertIsNone(result) - - def test_multi_security_groups_delete_with_exception(self): - arglist = [ - self._security_groups[0].id, - 'unexist_security_group', - ] - verifylist = [ - ('group', - [self._security_groups[0].id, 'unexist_security_group']), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - find_mock_result = [self._security_groups[0], exceptions.CommandError] - self.compute.security_groups.get = ( - mock.MagicMock(side_effect=find_mock_result) - ) - self.compute.security_groups.find.side_effect = ( - exceptions.NotFound(None)) - - try: - self.cmd.take_action(parsed_args) - self.fail('CommandError should be raised.') - except exceptions.CommandError as e: - self.assertEqual('1 of 2 groups failed to delete.', str(e)) - - self.compute.security_groups.get.assert_any_call( - self._security_groups[0].id) - self.compute.security_groups.get.assert_any_call( - 'unexist_security_group') - self.compute.security_groups.delete.assert_called_once_with( - self._security_groups[0].id - ) - - -class TestListSecurityGroupNetwork(TestSecurityGroupNetwork): - - # The security group to be listed. - _security_groups = \ - network_fakes.FakeSecurityGroup.create_security_groups(count=3) - - columns = ( - 'ID', - 'Name', - 'Description', - 'Project', - ) - - data = [] - for grp in _security_groups: - data.append(( - grp.id, - grp.name, - grp.description, - grp.tenant_id, - )) - - def setUp(self): - super(TestListSecurityGroupNetwork, self).setUp() - - self.network.security_groups = mock.Mock( - return_value=self._security_groups) - - # Get the command object to test - self.cmd = security_group.ListSecurityGroup(self.app, self.namespace) - - def test_security_group_list_no_options(self): - arglist = [] - verifylist = [ - ('all_projects', False), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.security_groups.assert_called_once_with() - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - def test_security_group_list_all_projects(self): - arglist = [ - '--all-projects', - ] - verifylist = [ - ('all_projects', True), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.security_groups.assert_called_once_with() - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - -class TestListSecurityGroupCompute(TestSecurityGroupCompute): - - # The security group to be listed. - _security_groups = \ - compute_fakes.FakeSecurityGroup.create_security_groups(count=3) - - columns = ( - 'ID', - 'Name', - 'Description', - ) - columns_all_projects = ( - 'ID', - 'Name', - 'Description', - 'Project', - ) - - data = [] - for grp in _security_groups: - data.append(( - grp.id, - grp.name, - grp.description, - )) - data_all_projects = [] - for grp in _security_groups: - data_all_projects.append(( - grp.id, - grp.name, - grp.description, - grp.tenant_id, - )) - - def setUp(self): - super(TestListSecurityGroupCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - self.compute.security_groups.list.return_value = self._security_groups - - # Get the command object to test - self.cmd = security_group.ListSecurityGroup(self.app, None) - - def test_security_group_list_no_options(self): - arglist = [] - verifylist = [ - ('all_projects', False), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - kwargs = {'search_opts': {'all_tenants': False}} - self.compute.security_groups.list.assert_called_once_with(**kwargs) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - def test_security_group_list_all_projects(self): - arglist = [ - '--all-projects', - ] - verifylist = [ - ('all_projects', True), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - kwargs = {'search_opts': {'all_tenants': True}} - self.compute.security_groups.list.assert_called_once_with(**kwargs) - self.assertEqual(self.columns_all_projects, columns) - self.assertEqual(self.data_all_projects, list(data)) - - -class TestSetSecurityGroupNetwork(TestSecurityGroupNetwork): - - # The security group to be set. - _security_group = \ - network_fakes.FakeSecurityGroup.create_one_security_group() - - def setUp(self): - super(TestSetSecurityGroupNetwork, self).setUp() - - self.network.update_security_group = mock.Mock(return_value=None) - - self.network.find_security_group = mock.Mock( - return_value=self._security_group) - - # Get the command object to test - self.cmd = security_group.SetSecurityGroup(self.app, self.namespace) - - def test_set_no_options(self): - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, [], []) - - def test_set_no_updates(self): - arglist = [ - self._security_group.name, - ] - verifylist = [ - ('group', self._security_group.name), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - self.network.update_security_group.assert_called_once_with( - self._security_group, - **{} - ) - self.assertIsNone(result) - - def test_set_all_options(self): - new_name = 'new-' + self._security_group.name - new_description = 'new-' + self._security_group.description - arglist = [ - '--name', new_name, - '--description', new_description, - self._security_group.name, - ] - verifylist = [ - ('description', new_description), - ('group', self._security_group.name), - ('name', new_name), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - attrs = { - 'description': new_description, - 'name': new_name, - } - self.network.update_security_group.assert_called_once_with( - self._security_group, - **attrs - ) - self.assertIsNone(result) - - -class TestSetSecurityGroupCompute(TestSecurityGroupCompute): - - # The security group to be set. - _security_group = \ - compute_fakes.FakeSecurityGroup.create_one_security_group() - - def setUp(self): - super(TestSetSecurityGroupCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - - self.compute.security_groups.update = mock.Mock(return_value=None) - - self.compute.security_groups.get = mock.Mock( - return_value=self._security_group) - - # Get the command object to test - self.cmd = security_group.SetSecurityGroup(self.app, None) - - def test_set_no_options(self): - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, [], []) - - def test_set_no_updates(self): - arglist = [ - self._security_group.name, - ] - verifylist = [ - ('group', self._security_group.name), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - self.compute.security_groups.update.assert_called_once_with( - self._security_group, - self._security_group.name, - self._security_group.description - ) - self.assertIsNone(result) - - def test_set_all_options(self): - new_name = 'new-' + self._security_group.name - new_description = 'new-' + self._security_group.description - arglist = [ - '--name', new_name, - '--description', new_description, - self._security_group.name, - ] - verifylist = [ - ('description', new_description), - ('group', self._security_group.name), - ('name', new_name), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - self.compute.security_groups.update.assert_called_once_with( - self._security_group, - new_name, - new_description - ) - self.assertIsNone(result) - - -class TestShowSecurityGroupNetwork(TestSecurityGroupNetwork): - - # The security group rule to be shown with the group. - _security_group_rule = \ - network_fakes.FakeSecurityGroupRule.create_one_security_group_rule() - - # The security group to be shown. - _security_group = \ - network_fakes.FakeSecurityGroup.create_one_security_group( - attrs={'security_group_rules': [_security_group_rule._info]} - ) - - columns = ( - 'description', - 'id', - 'name', - 'project_id', - 'rules', - ) - - data = ( - _security_group.description, - _security_group.id, - _security_group.name, - _security_group.project_id, - security_group._format_network_security_group_rules( - [_security_group_rule._info]), - ) - - def setUp(self): - super(TestShowSecurityGroupNetwork, self).setUp() - - self.network.find_security_group = mock.Mock( - return_value=self._security_group) - - # Get the command object to test - self.cmd = security_group.ShowSecurityGroup(self.app, self.namespace) - - def test_show_no_options(self): - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, [], []) - - def test_show_all_options(self): - arglist = [ - self._security_group.id, - ] - verifylist = [ - ('group', self._security_group.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.find_security_group.assert_called_once_with( - self._security_group.id, ignore_missing=False) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - -class TestShowSecurityGroupCompute(TestSecurityGroupCompute): - - # The security group rule to be shown with the group. - _security_group_rule = \ - compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule() - - # The security group to be shown. - _security_group = \ - compute_fakes.FakeSecurityGroup.create_one_security_group( - attrs={'rules': [_security_group_rule._info]} - ) - - columns = ( - 'description', - 'id', - 'name', - 'project_id', - 'rules', - ) - - data = ( - _security_group.description, - _security_group.id, - _security_group.name, - _security_group.tenant_id, - security_group._format_compute_security_group_rules( - [_security_group_rule._info]), - ) - - def setUp(self): - super(TestShowSecurityGroupCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - - self.compute.security_groups.get.return_value = self._security_group - - # Get the command object to test - self.cmd = security_group.ShowSecurityGroup(self.app, None) - - def test_show_no_options(self): - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, [], []) - - def test_show_all_options(self): - arglist = [ - self._security_group.id, - ] - verifylist = [ - ('group', self._security_group.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.compute.security_groups.get.assert_called_once_with( - self._security_group.id) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) diff --git a/openstackclient/tests/network/v2/test_security_group_rule.py b/openstackclient/tests/network/v2/test_security_group_rule.py deleted file mode 100644 index 170989bf..00000000 --- a/openstackclient/tests/network/v2/test_security_group_rule.py +++ /dev/null @@ -1,1177 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# - -import copy -import mock -from mock import call - -from osc_lib import exceptions - -from openstackclient.network import utils as network_utils -from openstackclient.network.v2 import security_group_rule -from openstackclient.tests.compute.v2 import fakes as compute_fakes -from openstackclient.tests import fakes -from openstackclient.tests.identity.v3 import fakes as identity_fakes -from openstackclient.tests.network.v2 import fakes as network_fakes -from openstackclient.tests import utils as tests_utils - - -class TestSecurityGroupRuleNetwork(network_fakes.TestNetworkV2): - - def setUp(self): - super(TestSecurityGroupRuleNetwork, self).setUp() - - # Get a shortcut to the network client - self.network = self.app.client_manager.network - # Get a shortcut to the ProjectManager Mock - self.projects_mock = self.app.client_manager.identity.projects - # Get a shortcut to the DomainManager Mock - self.domains_mock = self.app.client_manager.identity.domains - - -class TestSecurityGroupRuleCompute(compute_fakes.TestComputev2): - - def setUp(self): - super(TestSecurityGroupRuleCompute, self).setUp() - - # Get a shortcut to the network client - self.compute = self.app.client_manager.compute - - -class TestCreateSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork): - - project = identity_fakes.FakeProject.create_one_project() - domain = identity_fakes.FakeDomain.create_one_domain() - # The security group rule to be created. - _security_group_rule = None - - # The security group that will contain the rule created. - _security_group = \ - network_fakes.FakeSecurityGroup.create_one_security_group() - - expected_columns = ( - 'direction', - 'ethertype', - 'id', - 'port_range_max', - 'port_range_min', - 'project_id', - 'protocol', - 'remote_group_id', - 'remote_ip_prefix', - 'security_group_id', - ) - - expected_data = None - - def _setup_security_group_rule(self, attrs=None): - self._security_group_rule = \ - network_fakes.FakeSecurityGroupRule.create_one_security_group_rule( - attrs) - self.network.create_security_group_rule = mock.Mock( - return_value=self._security_group_rule) - self.expected_data = ( - self._security_group_rule.direction, - self._security_group_rule.ethertype, - self._security_group_rule.id, - self._security_group_rule.port_range_max, - self._security_group_rule.port_range_min, - self._security_group_rule.project_id, - self._security_group_rule.protocol, - self._security_group_rule.remote_group_id, - self._security_group_rule.remote_ip_prefix, - self._security_group_rule.security_group_id, - ) - - def setUp(self): - super(TestCreateSecurityGroupRuleNetwork, self).setUp() - - self.network.find_security_group = mock.Mock( - return_value=self._security_group) - - self.projects_mock.get.return_value = self.project - self.domains_mock.get.return_value = self.domain - - # Get the command object to test - self.cmd = security_group_rule.CreateSecurityGroupRule( - self.app, self.namespace) - - def test_create_no_options(self): - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, [], []) - - def test_create_all_source_options(self): - arglist = [ - '--src-ip', '10.10.0.0/24', - '--src-group', self._security_group.id, - self._security_group.id, - ] - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, arglist, []) - - def test_create_bad_ethertype(self): - arglist = [ - '--ethertype', 'foo', - self._security_group.id, - ] - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, arglist, []) - - def test_create_all_protocol_options(self): - arglist = [ - '--protocol', 'tcp', - '--proto', 'tcp', - self._security_group.id, - ] - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, arglist, []) - - def test_create_all_port_range_options(self): - arglist = [ - '--dst-port', '80:80', - '--icmp-type', '3', - '--icmp-code', '1', - self._security_group.id, - ] - verifylist = [ - ('dst_port', (80, 80)), - ('icmp_type', 3), - ('icmp_code', 1), - ('group', self._security_group.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - self.assertRaises(exceptions.CommandError, self.cmd.take_action, - parsed_args) - - def test_create_default_rule(self): - self._setup_security_group_rule({ - 'port_range_max': 443, - 'port_range_min': 443, - }) - arglist = [ - '--dst-port', str(self._security_group_rule.port_range_min), - self._security_group.id, - ] - verifylist = [ - ('dst_port', (self._security_group_rule.port_range_min, - self._security_group_rule.port_range_max)), - ('group', self._security_group.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.create_security_group_rule.assert_called_once_with(**{ - 'direction': self._security_group_rule.direction, - 'ethertype': self._security_group_rule.ethertype, - 'port_range_max': self._security_group_rule.port_range_max, - 'port_range_min': self._security_group_rule.port_range_min, - 'protocol': self._security_group_rule.protocol, - 'remote_ip_prefix': self._security_group_rule.remote_ip_prefix, - 'security_group_id': self._security_group.id, - }) - self.assertEqual(self.expected_columns, columns) - self.assertEqual(self.expected_data, data) - - def test_create_proto_option(self): - self._setup_security_group_rule({ - 'protocol': 'icmp', - 'remote_ip_prefix': '10.0.2.0/24', - }) - arglist = [ - '--proto', self._security_group_rule.protocol, - '--src-ip', self._security_group_rule.remote_ip_prefix, - self._security_group.id, - ] - verifylist = [ - ('proto', self._security_group_rule.protocol), - ('protocol', None), - ('src_ip', self._security_group_rule.remote_ip_prefix), - ('group', self._security_group.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.create_security_group_rule.assert_called_once_with(**{ - 'direction': self._security_group_rule.direction, - 'ethertype': self._security_group_rule.ethertype, - 'protocol': self._security_group_rule.protocol, - 'remote_ip_prefix': self._security_group_rule.remote_ip_prefix, - 'security_group_id': self._security_group.id, - }) - self.assertEqual(self.expected_columns, columns) - self.assertEqual(self.expected_data, data) - - def test_create_source_group(self): - self._setup_security_group_rule({ - 'port_range_max': 22, - 'port_range_min': 22, - 'remote_group_id': self._security_group.id, - }) - arglist = [ - '--dst-port', str(self._security_group_rule.port_range_min), - '--ingress', - '--src-group', self._security_group.name, - self._security_group.id, - ] - verifylist = [ - ('dst_port', (self._security_group_rule.port_range_min, - self._security_group_rule.port_range_max)), - ('ingress', True), - ('src_group', self._security_group.name), - ('group', self._security_group.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.create_security_group_rule.assert_called_once_with(**{ - 'direction': self._security_group_rule.direction, - 'ethertype': self._security_group_rule.ethertype, - 'port_range_max': self._security_group_rule.port_range_max, - 'port_range_min': self._security_group_rule.port_range_min, - 'protocol': self._security_group_rule.protocol, - 'remote_group_id': self._security_group_rule.remote_group_id, - 'security_group_id': self._security_group.id, - }) - self.assertEqual(self.expected_columns, columns) - self.assertEqual(self.expected_data, data) - - def test_create_source_ip(self): - self._setup_security_group_rule({ - 'protocol': 'icmp', - 'remote_ip_prefix': '10.0.2.0/24', - }) - arglist = [ - '--protocol', self._security_group_rule.protocol, - '--src-ip', self._security_group_rule.remote_ip_prefix, - self._security_group.id, - ] - verifylist = [ - ('protocol', self._security_group_rule.protocol), - ('src_ip', self._security_group_rule.remote_ip_prefix), - ('group', self._security_group.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.create_security_group_rule.assert_called_once_with(**{ - 'direction': self._security_group_rule.direction, - 'ethertype': self._security_group_rule.ethertype, - 'protocol': self._security_group_rule.protocol, - 'remote_ip_prefix': self._security_group_rule.remote_ip_prefix, - 'security_group_id': self._security_group.id, - }) - self.assertEqual(self.expected_columns, columns) - self.assertEqual(self.expected_data, data) - - def test_create_network_options(self): - self._setup_security_group_rule({ - 'direction': 'egress', - 'ethertype': 'IPv6', - 'port_range_max': 443, - 'port_range_min': 443, - 'protocol': '6', - 'remote_group_id': None, - 'remote_ip_prefix': None, - }) - arglist = [ - '--dst-port', str(self._security_group_rule.port_range_min), - '--egress', - '--ethertype', self._security_group_rule.ethertype, - '--project', self.project.name, - '--project-domain', self.domain.name, - '--protocol', self._security_group_rule.protocol, - self._security_group.id, - ] - verifylist = [ - ('dst_port', (self._security_group_rule.port_range_min, - self._security_group_rule.port_range_max)), - ('egress', True), - ('ethertype', self._security_group_rule.ethertype), - ('project', self.project.name), - ('project_domain', self.domain.name), - ('protocol', self._security_group_rule.protocol), - ('group', self._security_group.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.create_security_group_rule.assert_called_once_with(**{ - 'direction': self._security_group_rule.direction, - 'ethertype': self._security_group_rule.ethertype, - 'port_range_max': self._security_group_rule.port_range_max, - 'port_range_min': self._security_group_rule.port_range_min, - 'protocol': self._security_group_rule.protocol, - 'security_group_id': self._security_group.id, - 'tenant_id': self.project.id, - }) - self.assertEqual(self.expected_columns, columns) - self.assertEqual(self.expected_data, data) - - def test_create_tcp_with_icmp_type(self): - arglist = [ - '--protocol', 'tcp', - '--icmp-type', '15', - self._security_group.id, - ] - verifylist = [ - ('protocol', 'tcp'), - ('icmp_type', 15), - ('group', self._security_group.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - self.assertRaises(exceptions.CommandError, self.cmd.take_action, - parsed_args) - - def test_create_icmp_code(self): - arglist = [ - '--protocol', '1', - '--icmp-code', '1', - self._security_group.id, - ] - verifylist = [ - ('protocol', '1'), - ('icmp_code', 1), - ('group', self._security_group.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - self.assertRaises(exceptions.CommandError, self.cmd.take_action, - parsed_args) - - def test_create_icmp_type(self): - self._setup_security_group_rule({ - 'port_range_min': 15, - 'protocol': 'icmp', - 'remote_ip_prefix': '0.0.0.0/0', - }) - arglist = [ - '--icmp-type', str(self._security_group_rule.port_range_min), - '--protocol', self._security_group_rule.protocol, - self._security_group.id, - ] - verifylist = [ - ('dst_port', None), - ('icmp_type', self._security_group_rule.port_range_min), - ('icmp_code', None), - ('protocol', self._security_group_rule.protocol), - ('group', self._security_group.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.create_security_group_rule.assert_called_once_with(**{ - 'direction': self._security_group_rule.direction, - 'ethertype': self._security_group_rule.ethertype, - 'port_range_min': self._security_group_rule.port_range_min, - 'protocol': self._security_group_rule.protocol, - 'remote_ip_prefix': self._security_group_rule.remote_ip_prefix, - 'security_group_id': self._security_group.id, - }) - self.assertEqual(self.expected_columns, columns) - self.assertEqual(self.expected_data, data) - - def test_create_ipv6_icmp_type_code(self): - self._setup_security_group_rule({ - 'ethertype': 'IPv6', - 'port_range_min': 139, - 'port_range_max': 2, - 'protocol': 'ipv6-icmp', - }) - arglist = [ - '--icmp-type', str(self._security_group_rule.port_range_min), - '--icmp-code', str(self._security_group_rule.port_range_max), - '--protocol', self._security_group_rule.protocol, - self._security_group.id, - ] - verifylist = [ - ('dst_port', None), - ('icmp_type', self._security_group_rule.port_range_min), - ('icmp_code', self._security_group_rule.port_range_max), - ('protocol', self._security_group_rule.protocol), - ('group', self._security_group.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.create_security_group_rule.assert_called_once_with(**{ - 'direction': self._security_group_rule.direction, - 'ethertype': self._security_group_rule.ethertype, - 'port_range_min': self._security_group_rule.port_range_min, - 'port_range_max': self._security_group_rule.port_range_max, - 'protocol': self._security_group_rule.protocol, - 'security_group_id': self._security_group.id, - }) - self.assertEqual(self.expected_columns, columns) - self.assertEqual(self.expected_data, data) - - def test_create_icmpv6_type(self): - self._setup_security_group_rule({ - 'ethertype': 'IPv6', - 'port_range_min': 139, - 'protocol': 'icmpv6', - }) - arglist = [ - '--icmp-type', str(self._security_group_rule.port_range_min), - '--protocol', self._security_group_rule.protocol, - self._security_group.id, - ] - verifylist = [ - ('dst_port', None), - ('icmp_type', self._security_group_rule.port_range_min), - ('icmp_code', None), - ('protocol', self._security_group_rule.protocol), - ('group', self._security_group.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.create_security_group_rule.assert_called_once_with(**{ - 'direction': self._security_group_rule.direction, - 'ethertype': self._security_group_rule.ethertype, - 'port_range_min': self._security_group_rule.port_range_min, - 'protocol': self._security_group_rule.protocol, - 'security_group_id': self._security_group.id, - }) - self.assertEqual(self.expected_columns, columns) - self.assertEqual(self.expected_data, data) - - -class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): - - project = identity_fakes.FakeProject.create_one_project() - domain = identity_fakes.FakeDomain.create_one_domain() - # The security group rule to be created. - _security_group_rule = None - - # The security group that will contain the rule created. - _security_group = \ - compute_fakes.FakeSecurityGroup.create_one_security_group() - - def _setup_security_group_rule(self, attrs=None): - self._security_group_rule = \ - compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule( - attrs) - self.compute.security_group_rules.create.return_value = \ - self._security_group_rule - expected_columns, expected_data = \ - security_group_rule._format_security_group_rule_show( - self._security_group_rule._info) - return expected_columns, expected_data - - def setUp(self): - super(TestCreateSecurityGroupRuleCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - - self.compute.security_groups.get.return_value = self._security_group - - # Get the command object to test - self.cmd = security_group_rule.CreateSecurityGroupRule(self.app, None) - - def test_create_no_options(self): - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, [], []) - - def test_create_all_source_options(self): - arglist = [ - '--src-ip', '10.10.0.0/24', - '--src-group', self._security_group.id, - self._security_group.id, - ] - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, arglist, []) - - def test_create_bad_protocol(self): - arglist = [ - '--protocol', 'foo', - self._security_group.id, - ] - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, arglist, []) - - def test_create_all_protocol_options(self): - arglist = [ - '--protocol', 'tcp', - '--proto', 'tcp', - self._security_group.id, - ] - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, arglist, []) - - def test_create_network_options(self): - arglist = [ - '--ingress', - '--ethertype', 'IPv4', - '--icmp-type', '3', - '--icmp-code', '11', - '--project', self.project.name, - '--project-domain', self.domain.name, - self._security_group.id, - ] - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, arglist, []) - - def test_create_default_rule(self): - expected_columns, expected_data = self._setup_security_group_rule() - dst_port = str(self._security_group_rule.from_port) + ':' + \ - str(self._security_group_rule.to_port) - arglist = [ - '--dst-port', dst_port, - self._security_group.id, - ] - verifylist = [ - ('dst_port', (self._security_group_rule.from_port, - self._security_group_rule.to_port)), - ('group', self._security_group.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.compute.security_group_rules.create.assert_called_once_with( - self._security_group.id, - self._security_group_rule.ip_protocol, - self._security_group_rule.from_port, - self._security_group_rule.to_port, - self._security_group_rule.ip_range['cidr'], - None, - ) - self.assertEqual(expected_columns, columns) - self.assertEqual(expected_data, data) - - def test_create_source_group(self): - expected_columns, expected_data = self._setup_security_group_rule({ - 'from_port': 22, - 'to_port': 22, - 'group': {'name': self._security_group.name}, - }) - arglist = [ - '--dst-port', str(self._security_group_rule.from_port), - '--src-group', self._security_group.name, - self._security_group.id, - ] - verifylist = [ - ('dst_port', (self._security_group_rule.from_port, - self._security_group_rule.to_port)), - ('src_group', self._security_group.name), - ('group', self._security_group.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.compute.security_group_rules.create.assert_called_once_with( - self._security_group.id, - self._security_group_rule.ip_protocol, - self._security_group_rule.from_port, - self._security_group_rule.to_port, - self._security_group_rule.ip_range['cidr'], - self._security_group.id, - ) - self.assertEqual(expected_columns, columns) - self.assertEqual(expected_data, data) - - def test_create_source_ip(self): - expected_columns, expected_data = self._setup_security_group_rule({ - 'ip_protocol': 'icmp', - 'from_port': -1, - 'to_port': -1, - 'ip_range': {'cidr': '10.0.2.0/24'}, - }) - arglist = [ - '--protocol', self._security_group_rule.ip_protocol, - '--src-ip', self._security_group_rule.ip_range['cidr'], - self._security_group.id, - ] - verifylist = [ - ('protocol', self._security_group_rule.ip_protocol), - ('src_ip', self._security_group_rule.ip_range['cidr']), - ('group', self._security_group.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.compute.security_group_rules.create.assert_called_once_with( - self._security_group.id, - self._security_group_rule.ip_protocol, - self._security_group_rule.from_port, - self._security_group_rule.to_port, - self._security_group_rule.ip_range['cidr'], - None, - ) - self.assertEqual(expected_columns, columns) - self.assertEqual(expected_data, data) - - def test_create_proto_option(self): - expected_columns, expected_data = self._setup_security_group_rule({ - 'ip_protocol': 'icmp', - 'from_port': -1, - 'to_port': -1, - 'ip_range': {'cidr': '10.0.2.0/24'}, - }) - arglist = [ - '--proto', self._security_group_rule.ip_protocol, - '--src-ip', self._security_group_rule.ip_range['cidr'], - self._security_group.id, - ] - verifylist = [ - ('proto', self._security_group_rule.ip_protocol), - ('protocol', None), - ('src_ip', self._security_group_rule.ip_range['cidr']), - ('group', self._security_group.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.compute.security_group_rules.create.assert_called_once_with( - self._security_group.id, - self._security_group_rule.ip_protocol, - self._security_group_rule.from_port, - self._security_group_rule.to_port, - self._security_group_rule.ip_range['cidr'], - None, - ) - self.assertEqual(expected_columns, columns) - self.assertEqual(expected_data, data) - - -class TestDeleteSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork): - - # The security group rules to be deleted. - _security_group_rules = \ - network_fakes.FakeSecurityGroupRule.create_security_group_rules( - count=2) - - def setUp(self): - super(TestDeleteSecurityGroupRuleNetwork, self).setUp() - - self.network.delete_security_group_rule = mock.Mock(return_value=None) - - self.network.find_security_group_rule = ( - network_fakes.FakeSecurityGroupRule.get_security_group_rules( - self._security_group_rules) - ) - - # Get the command object to test - self.cmd = security_group_rule.DeleteSecurityGroupRule( - self.app, self.namespace) - - def test_security_group_rule_delete(self): - arglist = [ - self._security_group_rules[0].id, - ] - verifylist = [ - ('rule', [self._security_group_rules[0].id]), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - self.network.delete_security_group_rule.assert_called_once_with( - self._security_group_rules[0]) - self.assertIsNone(result) - - def test_multi_security_group_rules_delete(self): - arglist = [] - verifylist = [] - - for s in self._security_group_rules: - arglist.append(s.id) - verifylist = [ - ('rule', arglist), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - calls = [] - for s in self._security_group_rules: - calls.append(call(s)) - self.network.delete_security_group_rule.assert_has_calls(calls) - self.assertIsNone(result) - - def test_multi_security_group_rules_delete_with_exception(self): - arglist = [ - self._security_group_rules[0].id, - 'unexist_rule', - ] - verifylist = [ - ('rule', - [self._security_group_rules[0].id, 'unexist_rule']), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - find_mock_result = [ - self._security_group_rules[0], exceptions.CommandError] - self.network.find_security_group_rule = ( - mock.MagicMock(side_effect=find_mock_result) - ) - - try: - self.cmd.take_action(parsed_args) - self.fail('CommandError should be raised.') - except exceptions.CommandError as e: - self.assertEqual('1 of 2 rules failed to delete.', str(e)) - - self.network.find_security_group_rule.assert_any_call( - self._security_group_rules[0].id, ignore_missing=False) - self.network.find_security_group_rule.assert_any_call( - 'unexist_rule', ignore_missing=False) - self.network.delete_security_group_rule.assert_called_once_with( - self._security_group_rules[0] - ) - - -class TestDeleteSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): - - # The security group rule to be deleted. - _security_group_rules = \ - compute_fakes.FakeSecurityGroupRule.create_security_group_rules( - count=2) - - def setUp(self): - super(TestDeleteSecurityGroupRuleCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - - # Get the command object to test - self.cmd = security_group_rule.DeleteSecurityGroupRule(self.app, None) - - def test_security_group_rule_delete(self): - arglist = [ - self._security_group_rules[0].id, - ] - verifylist = [ - ('rule', [self._security_group_rules[0].id]), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - self.compute.security_group_rules.delete.assert_called_once_with( - self._security_group_rules[0].id) - self.assertIsNone(result) - - def test_multi_security_group_rules_delete(self): - arglist = [] - verifylist = [] - - for s in self._security_group_rules: - arglist.append(s.id) - verifylist = [ - ('rule', arglist), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - calls = [] - for s in self._security_group_rules: - calls.append(call(s.id)) - self.compute.security_group_rules.delete.assert_has_calls(calls) - self.assertIsNone(result) - - def test_multi_security_group_rules_delete_with_exception(self): - arglist = [ - self._security_group_rules[0].id, - 'unexist_rule', - ] - verifylist = [ - ('rule', - [self._security_group_rules[0].id, 'unexist_rule']), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - find_mock_result = [None, exceptions.CommandError] - self.compute.security_group_rules.delete = ( - mock.MagicMock(side_effect=find_mock_result) - ) - - try: - self.cmd.take_action(parsed_args) - self.fail('CommandError should be raised.') - except exceptions.CommandError as e: - self.assertEqual('1 of 2 rules failed to delete.', str(e)) - - self.compute.security_group_rules.delete.assert_any_call( - self._security_group_rules[0].id) - self.compute.security_group_rules.delete.assert_any_call( - 'unexist_rule') - - -class TestListSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork): - - # The security group to hold the rules. - _security_group = \ - network_fakes.FakeSecurityGroup.create_one_security_group() - - # The security group rule to be listed. - _security_group_rule_tcp = \ - network_fakes.FakeSecurityGroupRule.create_one_security_group_rule({ - 'protocol': 'tcp', - 'port_range_max': 80, - 'port_range_min': 80, - 'security_group_id': _security_group.id, - }) - _security_group_rule_icmp = \ - network_fakes.FakeSecurityGroupRule.create_one_security_group_rule({ - 'protocol': 'icmp', - 'remote_ip_prefix': '10.0.2.0/24', - 'security_group_id': _security_group.id, - }) - _security_group.security_group_rules = [_security_group_rule_tcp._info, - _security_group_rule_icmp._info] - _security_group_rules = [_security_group_rule_tcp, - _security_group_rule_icmp] - - expected_columns_with_group_and_long = ( - 'ID', - 'IP Protocol', - 'IP Range', - 'Port Range', - 'Direction', - 'Ethertype', - 'Remote Security Group', - ) - expected_columns_no_group = ( - 'ID', - 'IP Protocol', - 'IP Range', - 'Port Range', - 'Remote Security Group', - 'Security Group', - ) - - expected_data_with_group_and_long = [] - expected_data_no_group = [] - for _security_group_rule in _security_group_rules: - expected_data_with_group_and_long.append(( - _security_group_rule.id, - _security_group_rule.protocol, - _security_group_rule.remote_ip_prefix, - security_group_rule._format_network_port_range( - _security_group_rule), - _security_group_rule.direction, - _security_group_rule.ethertype, - _security_group_rule.remote_group_id, - )) - expected_data_no_group.append(( - _security_group_rule.id, - _security_group_rule.protocol, - _security_group_rule.remote_ip_prefix, - security_group_rule._format_network_port_range( - _security_group_rule), - _security_group_rule.remote_group_id, - _security_group_rule.security_group_id, - )) - - def setUp(self): - super(TestListSecurityGroupRuleNetwork, self).setUp() - - self.network.find_security_group = mock.Mock( - return_value=self._security_group) - self.network.security_group_rules = mock.Mock( - return_value=self._security_group_rules) - - # Get the command object to test - self.cmd = security_group_rule.ListSecurityGroupRule( - self.app, self.namespace) - - def test_list_default(self): - self._security_group_rule_tcp.port_range_min = 80 - parsed_args = self.check_parser(self.cmd, [], []) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.security_group_rules.assert_called_once_with(**{}) - self.assertEqual(self.expected_columns_no_group, columns) - self.assertEqual(self.expected_data_no_group, list(data)) - - def test_list_with_group_and_long(self): - self._security_group_rule_tcp.port_range_min = 80 - arglist = [ - '--long', - self._security_group.id, - ] - verifylist = [ - ('long', True), - ('group', self._security_group.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.security_group_rules.assert_called_once_with(**{ - 'security_group_id': self._security_group.id, - }) - self.assertEqual(self.expected_columns_with_group_and_long, columns) - self.assertEqual(self.expected_data_with_group_and_long, list(data)) - - def test_list_with_ignored_options(self): - self._security_group_rule_tcp.port_range_min = 80 - arglist = [ - '--all-projects', - ] - verifylist = [ - ('all_projects', True), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.security_group_rules.assert_called_once_with(**{}) - self.assertEqual(self.expected_columns_no_group, columns) - self.assertEqual(self.expected_data_no_group, list(data)) - - -class TestListSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): - - # The security group to hold the rules. - _security_group = \ - compute_fakes.FakeSecurityGroup.create_one_security_group() - - # The security group rule to be listed. - _security_group_rule_tcp = \ - compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule({ - 'ip_protocol': 'tcp', - 'from_port': 80, - 'to_port': 80, - 'group': {'name': _security_group.name}, - }) - _security_group_rule_icmp = \ - compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule({ - 'ip_protocol': 'icmp', - 'from_port': -1, - 'to_port': -1, - 'ip_range': {'cidr': '10.0.2.0/24'}, - 'group': {'name': _security_group.name}, - }) - _security_group.rules = [_security_group_rule_tcp._info, - _security_group_rule_icmp._info] - - expected_columns_with_group = ( - 'ID', - 'IP Protocol', - 'IP Range', - 'Port Range', - 'Remote Security Group', - ) - expected_columns_no_group = \ - expected_columns_with_group + ('Security Group',) - - expected_data_with_group = [] - expected_data_no_group = [] - for _security_group_rule in _security_group.rules: - rule = network_utils.transform_compute_security_group_rule( - _security_group_rule - ) - expected_rule_with_group = ( - rule['id'], - rule['ip_protocol'], - rule['ip_range'], - rule['port_range'], - rule['remote_security_group'], - ) - expected_rule_no_group = expected_rule_with_group + \ - (_security_group_rule['parent_group_id'],) - expected_data_with_group.append(expected_rule_with_group) - expected_data_no_group.append(expected_rule_no_group) - - def setUp(self): - super(TestListSecurityGroupRuleCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - - self.compute.security_groups.get.return_value = \ - self._security_group - self.compute.security_groups.list.return_value = \ - [self._security_group] - - # Get the command object to test - self.cmd = security_group_rule.ListSecurityGroupRule(self.app, None) - - def test_list_default(self): - parsed_args = self.check_parser(self.cmd, [], []) - - columns, data = self.cmd.take_action(parsed_args) - self.compute.security_groups.list.assert_called_once_with( - search_opts={'all_tenants': False} - ) - self.assertEqual(self.expected_columns_no_group, columns) - self.assertEqual(self.expected_data_no_group, list(data)) - - def test_list_with_group(self): - arglist = [ - self._security_group.id, - ] - verifylist = [ - ('group', self._security_group.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - self.compute.security_groups.get.assert_called_once_with( - self._security_group.id - ) - self.assertEqual(self.expected_columns_with_group, columns) - self.assertEqual(self.expected_data_with_group, list(data)) - - def test_list_all_projects(self): - arglist = [ - '--all-projects', - ] - verifylist = [ - ('all_projects', True), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - self.compute.security_groups.list.assert_called_once_with( - search_opts={'all_tenants': True} - ) - self.assertEqual(self.expected_columns_no_group, columns) - self.assertEqual(self.expected_data_no_group, list(data)) - - def test_list_with_ignored_options(self): - arglist = [ - '--long', - ] - verifylist = [ - ('long', False), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - self.compute.security_groups.list.assert_called_once_with( - search_opts={'all_tenants': False} - ) - self.assertEqual(self.expected_columns_no_group, columns) - self.assertEqual(self.expected_data_no_group, list(data)) - - -class TestShowSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork): - - # The security group rule to be shown. - _security_group_rule = \ - network_fakes.FakeSecurityGroupRule.create_one_security_group_rule() - - columns = ( - 'direction', - 'ethertype', - 'id', - 'port_range_max', - 'port_range_min', - 'project_id', - 'protocol', - 'remote_group_id', - 'remote_ip_prefix', - 'security_group_id', - ) - - data = ( - _security_group_rule.direction, - _security_group_rule.ethertype, - _security_group_rule.id, - _security_group_rule.port_range_max, - _security_group_rule.port_range_min, - _security_group_rule.project_id, - _security_group_rule.protocol, - _security_group_rule.remote_group_id, - _security_group_rule.remote_ip_prefix, - _security_group_rule.security_group_id, - ) - - def setUp(self): - super(TestShowSecurityGroupRuleNetwork, self).setUp() - - self.network.find_security_group_rule = mock.Mock( - return_value=self._security_group_rule) - - # Get the command object to test - self.cmd = security_group_rule.ShowSecurityGroupRule( - self.app, self.namespace) - - def test_show_no_options(self): - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, [], []) - - def test_show_all_options(self): - arglist = [ - self._security_group_rule.id, - ] - verifylist = [ - ('rule', self._security_group_rule.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.find_security_group_rule.assert_called_once_with( - self._security_group_rule.id, ignore_missing=False) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - -class TestShowSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): - - # The security group rule to be shown. - _security_group_rule = \ - compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule() - - columns, data = \ - security_group_rule._format_security_group_rule_show( - _security_group_rule._info) - - def setUp(self): - super(TestShowSecurityGroupRuleCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - - # Build a security group fake customized for this test. - security_group_rules = [self._security_group_rule._info] - security_group = fakes.FakeResource( - info=copy.deepcopy({'rules': security_group_rules}), - loaded=True) - security_group.rules = security_group_rules - self.compute.security_groups.list.return_value = [security_group] - - # Get the command object to test - self.cmd = security_group_rule.ShowSecurityGroupRule(self.app, None) - - def test_show_no_options(self): - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, [], []) - - def test_show_all_options(self): - arglist = [ - self._security_group_rule.id, - ] - verifylist = [ - ('rule', self._security_group_rule.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.compute.security_groups.list.assert_called_once_with() - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) diff --git a/openstackclient/tests/network/v2/test_subnet.py b/openstackclient/tests/network/v2/test_subnet.py deleted file mode 100644 index c117c6fd..00000000 --- a/openstackclient/tests/network/v2/test_subnet.py +++ /dev/null @@ -1,978 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# - -import mock -from mock import call - -from osc_lib import exceptions -from osc_lib import utils - -from openstackclient.network.v2 import subnet as subnet_v2 -from openstackclient.tests.identity.v3 import fakes as identity_fakes_v3 -from openstackclient.tests.network.v2 import fakes as network_fakes -from openstackclient.tests import utils as tests_utils - - -class TestSubnet(network_fakes.TestNetworkV2): - - def setUp(self): - super(TestSubnet, self).setUp() - - # Get a shortcut to the network client - self.network = self.app.client_manager.network - # Get a shortcut to the ProjectManager Mock - self.projects_mock = self.app.client_manager.identity.projects - # Get a shortcut to the DomainManager Mock - self.domains_mock = self.app.client_manager.identity.domains - - -class TestCreateSubnet(TestSubnet): - - project = identity_fakes_v3.FakeProject.create_one_project() - domain = identity_fakes_v3.FakeDomain.create_one_domain() - # An IPv4 subnet to be created with mostly default values - _subnet = network_fakes.FakeSubnet.create_one_subnet( - attrs={ - 'tenant_id': project.id, - } - ) - - # Subnet pool to be used to create a subnet from a pool - _subnet_pool = network_fakes.FakeSubnetPool.create_one_subnet_pool() - - # An IPv4 subnet to be created using a specific subnet pool - _subnet_from_pool = network_fakes.FakeSubnet.create_one_subnet( - attrs={ - 'tenant_id': project.id, - 'subnetpool_id': _subnet_pool.id, - 'dns_nameservers': ['8.8.8.8', - '8.8.4.4'], - 'host_routes': [{'destination': '10.20.20.0/24', - 'nexthop': '10.20.20.1'}, - {'destination': '10.30.30.0/24', - 'nexthop': '10.30.30.1'}], - 'service_types': ['network:router_gateway', - 'network:floatingip_agent_gateway'], - } - ) - - # An IPv6 subnet to be created with most options specified - _subnet_ipv6 = network_fakes.FakeSubnet.create_one_subnet( - attrs={ - 'tenant_id': project.id, - 'cidr': 'fe80:0:0:a00a::/64', - 'enable_dhcp': True, - 'dns_nameservers': ['fe80:27ff:a00a:f00f::ffff', - 'fe80:37ff:a00a:f00f::ffff'], - 'allocation_pools': [{'start': 'fe80::a00a:0:c0de:0:100', - 'end': 'fe80::a00a:0:c0de:0:f000'}, - {'start': 'fe80::a00a:0:c0de:1:100', - 'end': 'fe80::a00a:0:c0de:1:f000'}], - 'host_routes': [{'destination': 'fe80:27ff:a00a:f00f::/64', - 'nexthop': 'fe80:27ff:a00a:f00f::1'}, - {'destination': 'fe80:37ff:a00a:f00f::/64', - 'nexthop': 'fe80:37ff:a00a:f00f::1'}], - 'ip_version': 6, - 'gateway_ip': 'fe80::a00a:0:c0de:0:1', - 'ipv6_address_mode': 'slaac', - 'ipv6_ra_mode': 'slaac', - 'subnetpool_id': 'None', - 'service_types': ['network:router_gateway', - 'network:floatingip_agent_gateway'], - } - ) - - # The network to be returned from find_network - _network = network_fakes.FakeNetwork.create_one_network( - attrs={ - 'id': _subnet.network_id, - } - ) - - # The network segment to be returned from find_segment - _network_segment = \ - network_fakes.FakeNetworkSegment.create_one_network_segment( - attrs={ - 'network_id': _subnet.network_id, - } - ) - - columns = ( - 'allocation_pools', - 'cidr', - 'dns_nameservers', - 'enable_dhcp', - 'gateway_ip', - 'host_routes', - 'id', - 'ip_version', - 'ipv6_address_mode', - 'ipv6_ra_mode', - 'name', - 'network_id', - 'project_id', - 'segment_id', - 'service_types', - 'subnetpool_id', - ) - - data = ( - subnet_v2._format_allocation_pools(_subnet.allocation_pools), - _subnet.cidr, - utils.format_list(_subnet.dns_nameservers), - _subnet.enable_dhcp, - _subnet.gateway_ip, - subnet_v2._format_host_routes(_subnet.host_routes), - _subnet.id, - _subnet.ip_version, - _subnet.ipv6_address_mode, - _subnet.ipv6_ra_mode, - _subnet.name, - _subnet.network_id, - _subnet.project_id, - _subnet.segment_id, - utils.format_list(_subnet.service_types), - _subnet.subnetpool_id, - ) - - data_subnet_pool = ( - subnet_v2._format_allocation_pools(_subnet_from_pool.allocation_pools), - _subnet_from_pool.cidr, - utils.format_list(_subnet_from_pool.dns_nameservers), - _subnet_from_pool.enable_dhcp, - _subnet_from_pool.gateway_ip, - subnet_v2._format_host_routes(_subnet_from_pool.host_routes), - _subnet_from_pool.id, - _subnet_from_pool.ip_version, - _subnet_from_pool.ipv6_address_mode, - _subnet_from_pool.ipv6_ra_mode, - _subnet_from_pool.name, - _subnet_from_pool.network_id, - _subnet_from_pool.project_id, - _subnet_from_pool.segment_id, - utils.format_list(_subnet_from_pool.service_types), - _subnet_from_pool.subnetpool_id, - ) - - data_ipv6 = ( - subnet_v2._format_allocation_pools(_subnet_ipv6.allocation_pools), - _subnet_ipv6.cidr, - utils.format_list(_subnet_ipv6.dns_nameservers), - _subnet_ipv6.enable_dhcp, - _subnet_ipv6.gateway_ip, - subnet_v2._format_host_routes(_subnet_ipv6.host_routes), - _subnet_ipv6.id, - _subnet_ipv6.ip_version, - _subnet_ipv6.ipv6_address_mode, - _subnet_ipv6.ipv6_ra_mode, - _subnet_ipv6.name, - _subnet_ipv6.network_id, - _subnet_ipv6.project_id, - _subnet_ipv6.segment_id, - utils.format_list(_subnet_ipv6.service_types), - _subnet_ipv6.subnetpool_id, - ) - - def setUp(self): - super(TestCreateSubnet, self).setUp() - - # Get the command object to test - self.cmd = subnet_v2.CreateSubnet(self.app, self.namespace) - - self.projects_mock.get.return_value = self.project - self.domains_mock.get.return_value = self.domain - - # Mock SDK calls for all tests. - self.network.find_network = mock.Mock(return_value=self._network) - self.network.find_segment = mock.Mock( - return_value=self._network_segment - ) - self.network.find_subnet_pool = mock.Mock( - return_value=self._subnet_pool - ) - - def test_create_no_options(self): - arglist = [] - verifylist = [] - - # Testing that a call without the required argument will fail and - # throw a "ParserExecption" - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, arglist, verifylist) - - def test_create_default_options(self): - # Mock SDK calls for this test. - self.network.create_subnet = mock.Mock(return_value=self._subnet) - self._network.id = self._subnet.network_id - - arglist = [ - "--subnet-range", self._subnet.cidr, - "--network", self._subnet.network_id, - self._subnet.name, - ] - verifylist = [ - ('name', self._subnet.name), - ('subnet_range', self._subnet.cidr), - ('network', self._subnet.network_id), - ('ip_version', self._subnet.ip_version), - ('gateway', 'auto'), - - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - columns, data = self.cmd.take_action(parsed_args) - - self.network.create_subnet.assert_called_once_with(**{ - 'cidr': self._subnet.cidr, - 'ip_version': self._subnet.ip_version, - 'name': self._subnet.name, - 'network_id': self._subnet.network_id, - }) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - def test_create_from_subnet_pool_options(self): - # Mock SDK calls for this test. - self.network.create_subnet = \ - mock.Mock(return_value=self._subnet_from_pool) - self._network.id = self._subnet_from_pool.network_id - - arglist = [ - self._subnet_from_pool.name, - "--subnet-pool", self._subnet_from_pool.subnetpool_id, - "--prefix-length", '24', - "--network", self._subnet_from_pool.network_id, - "--ip-version", str(self._subnet_from_pool.ip_version), - "--gateway", self._subnet_from_pool.gateway_ip, - "--dhcp", - ] - - for dns_addr in self._subnet_from_pool.dns_nameservers: - arglist.append('--dns-nameserver') - arglist.append(dns_addr) - - for host_route in self._subnet_from_pool.host_routes: - arglist.append('--host-route') - value = 'gateway=' + host_route.get('nexthop', '') + \ - ',destination=' + host_route.get('destination', '') - arglist.append(value) - - for service_type in self._subnet_from_pool.service_types: - arglist.append('--service-type') - arglist.append(service_type) - - verifylist = [ - ('name', self._subnet_from_pool.name), - ('prefix_length', '24'), - ('network', self._subnet_from_pool.network_id), - ('ip_version', self._subnet_from_pool.ip_version), - ('gateway', self._subnet_from_pool.gateway_ip), - ('dns_nameservers', self._subnet_from_pool.dns_nameservers), - ('dhcp', self._subnet_from_pool.enable_dhcp), - ('host_routes', subnet_v2.convert_entries_to_gateway( - self._subnet_from_pool.host_routes)), - ('subnet_pool', self._subnet_from_pool.subnetpool_id), - ('service_types', self._subnet_from_pool.service_types), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - columns, data = self.cmd.take_action(parsed_args) - - self.network.create_subnet.assert_called_once_with(**{ - 'dns_nameservers': self._subnet_from_pool.dns_nameservers, - 'enable_dhcp': self._subnet_from_pool.enable_dhcp, - 'gateway_ip': self._subnet_from_pool.gateway_ip, - 'host_routes': self._subnet_from_pool.host_routes, - 'ip_version': self._subnet_from_pool.ip_version, - 'name': self._subnet_from_pool.name, - 'network_id': self._subnet_from_pool.network_id, - 'prefixlen': '24', - 'subnetpool_id': self._subnet_from_pool.subnetpool_id, - 'service_types': self._subnet_from_pool.service_types, - }) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data_subnet_pool, data) - - def test_create_options_subnet_range_ipv6(self): - # Mock SDK calls for this test. - self.network.create_subnet = mock.Mock(return_value=self._subnet_ipv6) - self._network.id = self._subnet_ipv6.network_id - - arglist = [ - self._subnet_ipv6.name, - "--subnet-range", self._subnet_ipv6.cidr, - "--network", self._subnet_ipv6.network_id, - "--ip-version", str(self._subnet_ipv6.ip_version), - "--ipv6-ra-mode", self._subnet_ipv6.ipv6_ra_mode, - "--ipv6-address-mode", self._subnet_ipv6.ipv6_address_mode, - "--gateway", self._subnet_ipv6.gateway_ip, - "--dhcp", - ] - - for dns_addr in self._subnet_ipv6.dns_nameservers: - arglist.append('--dns-nameserver') - arglist.append(dns_addr) - - for host_route in self._subnet_ipv6.host_routes: - arglist.append('--host-route') - value = 'gateway=' + host_route.get('nexthop', '') + \ - ',destination=' + host_route.get('destination', '') - arglist.append(value) - - for pool in self._subnet_ipv6.allocation_pools: - arglist.append('--allocation-pool') - value = 'start=' + pool.get('start', '') + \ - ',end=' + pool.get('end', '') - arglist.append(value) - - for service_type in self._subnet_ipv6.service_types: - arglist.append('--service-type') - arglist.append(service_type) - - verifylist = [ - ('name', self._subnet_ipv6.name), - ('subnet_range', self._subnet_ipv6.cidr), - ('network', self._subnet_ipv6.network_id), - ('ip_version', self._subnet_ipv6.ip_version), - ('ipv6_ra_mode', self._subnet_ipv6.ipv6_ra_mode), - ('ipv6_address_mode', self._subnet_ipv6.ipv6_address_mode), - ('gateway', self._subnet_ipv6.gateway_ip), - ('dns_nameservers', self._subnet_ipv6.dns_nameservers), - ('dhcp', self._subnet_ipv6.enable_dhcp), - ('host_routes', subnet_v2.convert_entries_to_gateway( - self._subnet_ipv6.host_routes)), - ('allocation_pools', self._subnet_ipv6.allocation_pools), - ('service_types', self._subnet_ipv6.service_types), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - columns, data = self.cmd.take_action(parsed_args) - - self.network.create_subnet.assert_called_once_with(**{ - 'cidr': self._subnet_ipv6.cidr, - 'dns_nameservers': self._subnet_ipv6.dns_nameservers, - 'enable_dhcp': self._subnet_ipv6.enable_dhcp, - 'gateway_ip': self._subnet_ipv6.gateway_ip, - 'host_routes': self._subnet_ipv6.host_routes, - 'ip_version': self._subnet_ipv6.ip_version, - 'ipv6_address_mode': self._subnet_ipv6.ipv6_address_mode, - 'ipv6_ra_mode': self._subnet_ipv6.ipv6_ra_mode, - 'name': self._subnet_ipv6.name, - 'network_id': self._subnet_ipv6.network_id, - 'allocation_pools': self._subnet_ipv6.allocation_pools, - 'service_types': self._subnet_ipv6.service_types, - }) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data_ipv6, data) - - def test_create_no_beta_command_options(self): - arglist = [ - "--subnet-range", self._subnet.cidr, - "--network-segment", self._network_segment.id, - "--network", self._subnet.network_id, - self._subnet.name, - ] - verifylist = [ - ('name', self._subnet.name), - ('subnet_range', self._subnet.cidr), - ('network-segment', self._network_segment.id), - ('network', self._subnet.network_id), - ] - self.app.options.os_beta_command = False - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, arglist, verifylist) - - def test_create_with_network_segment(self): - # Mock SDK calls for this test. - self.network.create_subnet = mock.Mock(return_value=self._subnet) - self._network.id = self._subnet.network_id - - arglist = [ - "--subnet-range", self._subnet.cidr, - "--network-segment", self._network_segment.id, - "--network", self._subnet.network_id, - self._subnet.name, - ] - verifylist = [ - ('name', self._subnet.name), - ('subnet_range', self._subnet.cidr), - ('network_segment', self._network_segment.id), - ('network', self._subnet.network_id), - ('ip_version', self._subnet.ip_version), - ('gateway', 'auto'), - - ] - - self.app.options.os_beta_command = True - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - columns, data = self.cmd.take_action(parsed_args) - - self.network.create_subnet.assert_called_once_with(**{ - 'cidr': self._subnet.cidr, - 'ip_version': self._subnet.ip_version, - 'name': self._subnet.name, - 'network_id': self._subnet.network_id, - 'segment_id': self._network_segment.id, - }) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - -class TestDeleteSubnet(TestSubnet): - - # The subnets to delete. - _subnets = network_fakes.FakeSubnet.create_subnets(count=2) - - def setUp(self): - super(TestDeleteSubnet, self).setUp() - - self.network.delete_subnet = mock.Mock(return_value=None) - - self.network.find_subnet = ( - network_fakes.FakeSubnet.get_subnets(self._subnets)) - - # Get the command object to test - self.cmd = subnet_v2.DeleteSubnet(self.app, self.namespace) - - def test_subnet_delete(self): - arglist = [ - self._subnets[0].name, - ] - verifylist = [ - ('subnet', [self._subnets[0].name]), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - self.network.delete_subnet.assert_called_once_with(self._subnets[0]) - self.assertIsNone(result) - - def test_multi_subnets_delete(self): - arglist = [] - verifylist = [] - - for s in self._subnets: - arglist.append(s.name) - verifylist = [ - ('subnet', arglist), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - calls = [] - for s in self._subnets: - calls.append(call(s)) - self.network.delete_subnet.assert_has_calls(calls) - self.assertIsNone(result) - - def test_multi_subnets_delete_with_exception(self): - arglist = [ - self._subnets[0].name, - 'unexist_subnet', - ] - verifylist = [ - ('subnet', - [self._subnets[0].name, 'unexist_subnet']), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - find_mock_result = [self._subnets[0], exceptions.CommandError] - self.network.find_subnet = ( - mock.MagicMock(side_effect=find_mock_result) - ) - - try: - self.cmd.take_action(parsed_args) - self.fail('CommandError should be raised.') - except exceptions.CommandError as e: - self.assertEqual('1 of 2 subnets failed to delete.', str(e)) - - self.network.find_subnet.assert_any_call( - self._subnets[0].name, ignore_missing=False) - self.network.find_subnet.assert_any_call( - 'unexist_subnet', ignore_missing=False) - self.network.delete_subnet.assert_called_once_with( - self._subnets[0] - ) - - -class TestListSubnet(TestSubnet): - # The subnets going to be listed up. - _subnet = network_fakes.FakeSubnet.create_subnets(count=3) - - columns = ( - 'ID', - 'Name', - 'Network', - 'Subnet', - ) - columns_long = columns + ( - 'Project', - 'DHCP', - 'Name Servers', - 'Allocation Pools', - 'Host Routes', - 'IP Version', - 'Gateway', - 'Service Types', - ) - - data = [] - for subnet in _subnet: - data.append(( - subnet.id, - subnet.name, - subnet.network_id, - subnet.cidr, - )) - - data_long = [] - for subnet in _subnet: - data_long.append(( - subnet.id, - subnet.name, - subnet.network_id, - subnet.cidr, - subnet.tenant_id, - subnet.enable_dhcp, - utils.format_list(subnet.dns_nameservers), - subnet_v2._format_allocation_pools(subnet.allocation_pools), - utils.format_list(subnet.host_routes), - subnet.ip_version, - subnet.gateway_ip, - utils.format_list(subnet.service_types), - )) - - def setUp(self): - super(TestListSubnet, self).setUp() - - # Get the command object to test - self.cmd = subnet_v2.ListSubnet(self.app, self.namespace) - - self.network.subnets = mock.Mock(return_value=self._subnet) - - def test_subnet_list_no_options(self): - arglist = [] - verifylist = [ - ('long', False), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.subnets.assert_called_once_with() - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - def test_subnet_list_long(self): - arglist = [ - '--long', - ] - verifylist = [ - ('long', True), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.subnets.assert_called_once_with() - self.assertEqual(self.columns_long, columns) - self.assertEqual(self.data_long, list(data)) - - def test_subnet_list_ip_version(self): - arglist = [ - '--ip-version', str(4), - ] - verifylist = [ - ('ip_version', 4), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - filters = {'ip_version': 4} - - self.network.subnets.assert_called_once_with(**filters) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - def test_subnet_list_dhcp(self): - arglist = [ - '--dhcp', - ] - verifylist = [ - ('dhcp', True), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - filters = {'enable_dhcp': True} - - self.network.subnets.assert_called_once_with(**filters) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - def test_subnet_list_no_dhcp(self): - arglist = [ - '--no-dhcp', - ] - verifylist = [ - ('no_dhcp', True), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - filters = {'enable_dhcp': False} - - self.network.subnets.assert_called_once_with(**filters) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - def test_subnet_list_service_type(self): - arglist = [ - '--service-type', 'network:router_gateway', - ] - verifylist = [ - ('service_types', ['network:router_gateway']), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - filters = {'service_types': ['network:router_gateway']} - - self.network.subnets.assert_called_once_with(**filters) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - def test_subnet_list_service_type_multiple(self): - arglist = [ - '--service-type', 'network:router_gateway', - '--service-type', 'network:floatingip_agent_gateway', - ] - verifylist = [ - ('service_types', ['network:router_gateway', - 'network:floatingip_agent_gateway']), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - filters = {'service_types': ['network:router_gateway', - 'network:floatingip_agent_gateway']} - - self.network.subnets.assert_called_once_with(**filters) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - -class TestSetSubnet(TestSubnet): - - _subnet = network_fakes.FakeSubnet.create_one_subnet() - - def setUp(self): - super(TestSetSubnet, self).setUp() - self.network.update_subnet = mock.Mock(return_value=None) - self.network.find_subnet = mock.Mock(return_value=self._subnet) - self.cmd = subnet_v2.SetSubnet(self.app, self.namespace) - - def test_set_this(self): - arglist = [ - "--name", "new_subnet", - "--dhcp", - "--gateway", self._subnet.gateway_ip, - self._subnet.name, - ] - verifylist = [ - ('name', "new_subnet"), - ('dhcp', True), - ('gateway', self._subnet.gateway_ip), - ('subnet', self._subnet.name), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - attrs = { - 'enable_dhcp': True, - 'gateway_ip': self._subnet.gateway_ip, - 'name': "new_subnet", - } - self.network.update_subnet.assert_called_with(self._subnet, **attrs) - self.assertIsNone(result) - - def test_set_that(self): - arglist = [ - "--name", "new_subnet", - "--no-dhcp", - "--gateway", "none", - self._subnet.name, - ] - verifylist = [ - ('name', "new_subnet"), - ('no_dhcp', True), - ('gateway', "none"), - ('subnet', self._subnet.name), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - attrs = { - 'enable_dhcp': False, - 'gateway_ip': None, - 'name': "new_subnet", - } - self.network.update_subnet.assert_called_with(self._subnet, **attrs) - self.assertIsNone(result) - - def test_set_nothing(self): - arglist = [self._subnet.name, ] - verifylist = [('subnet', self._subnet.name)] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - attrs = {} - self.network.update_subnet.assert_called_with(self._subnet, **attrs) - self.assertIsNone(result) - - def test_append_options(self): - _testsubnet = network_fakes.FakeSubnet.create_one_subnet( - {'dns_nameservers': ["10.0.0.1"], - 'service_types': ["network:router_gateway"]}) - self.network.find_subnet = mock.Mock(return_value=_testsubnet) - arglist = [ - '--dns-nameserver', '10.0.0.2', - '--service-type', 'network:floatingip_agent_gateway', - _testsubnet.name, - ] - verifylist = [ - ('dns_nameservers', ['10.0.0.2']), - ('service_types', ['network:floatingip_agent_gateway']), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - attrs = { - 'dns_nameservers': ['10.0.0.2', '10.0.0.1'], - 'service_types': ['network:floatingip_agent_gateway', - 'network:router_gateway'], - } - self.network.update_subnet.assert_called_once_with( - _testsubnet, **attrs) - self.assertIsNone(result) - - -class TestShowSubnet(TestSubnet): - # The subnets to be shown - _subnet = network_fakes.FakeSubnet.create_one_subnet() - - columns = ( - 'allocation_pools', - 'cidr', - 'dns_nameservers', - 'enable_dhcp', - 'gateway_ip', - 'host_routes', - 'id', - 'ip_version', - 'ipv6_address_mode', - 'ipv6_ra_mode', - 'name', - 'network_id', - 'project_id', - 'segment_id', - 'service_types', - 'subnetpool_id', - ) - - data = ( - subnet_v2._format_allocation_pools(_subnet.allocation_pools), - _subnet.cidr, - utils.format_list(_subnet.dns_nameservers), - _subnet.enable_dhcp, - _subnet.gateway_ip, - utils.format_list(_subnet.host_routes), - _subnet.id, - _subnet.ip_version, - _subnet.ipv6_address_mode, - _subnet.ipv6_ra_mode, - _subnet.name, - _subnet.network_id, - _subnet.tenant_id, - _subnet.segment_id, - utils.format_list(_subnet.service_types), - _subnet.subnetpool_id, - ) - - def setUp(self): - super(TestShowSubnet, self).setUp() - - # Get the command object to test - self.cmd = subnet_v2.ShowSubnet(self.app, self.namespace) - - self.network.find_subnet = mock.Mock(return_value=self._subnet) - - def test_show_no_options(self): - arglist = [] - verifylist = [] - - # Testing that a call without the required argument will fail and - # throw a "ParserExecption" - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, arglist, verifylist) - - def test_show_all_options(self): - arglist = [ - self._subnet.name, - ] - verifylist = [ - ('subnet', self._subnet.name), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - columns, data = self.cmd.take_action(parsed_args) - - self.network.find_subnet.assert_called_once_with( - self._subnet.name, ignore_missing=False) - - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - -class TestUnsetSubnet(TestSubnet): - - def setUp(self): - super(TestUnsetSubnet, self).setUp() - self._testsubnet = network_fakes.FakeSubnet.create_one_subnet( - {'dns_nameservers': ['8.8.8.8', - '8.8.8.4'], - 'host_routes': [{'destination': '10.20.20.0/24', - 'nexthop': '10.20.20.1'}, - {'destination': '10.30.30.30/24', - 'nexthop': '10.30.30.1'}], - 'allocation_pools': [{'start': '8.8.8.100', - 'end': '8.8.8.150'}, - {'start': '8.8.8.160', - 'end': '8.8.8.170'}], - 'service_types': ['network:router_gateway', - 'network:floatingip_agent_gateway'], }) - self.network.find_subnet = mock.Mock(return_value=self._testsubnet) - self.network.update_subnet = mock.Mock(return_value=None) - # Get the command object to test - self.cmd = subnet_v2.UnsetSubnet(self.app, self.namespace) - - def test_unset_subnet_params(self): - arglist = [ - '--dns-nameserver', '8.8.8.8', - '--host-route', 'destination=10.30.30.30/24,gateway=10.30.30.1', - '--allocation-pool', 'start=8.8.8.100,end=8.8.8.150', - '--service-type', 'network:router_gateway', - self._testsubnet.name, - ] - verifylist = [ - ('dns_nameservers', ['8.8.8.8']), - ('host_routes', [{ - "destination": "10.30.30.30/24", "gateway": "10.30.30.1"}]), - ('allocation_pools', [{ - 'start': '8.8.8.100', 'end': '8.8.8.150'}]), - ('service_types', ['network:router_gateway']), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - attrs = { - 'dns_nameservers': ['8.8.8.4'], - 'host_routes': [{ - "destination": "10.20.20.0/24", "nexthop": "10.20.20.1"}], - 'allocation_pools': [{'start': '8.8.8.160', 'end': '8.8.8.170'}], - 'service_types': ['network:floatingip_agent_gateway'], - } - self.network.update_subnet.assert_called_once_with( - self._testsubnet, **attrs) - self.assertIsNone(result) - - def test_unset_subnet_wrong_host_routes(self): - arglist = [ - '--dns-nameserver', '8.8.8.8', - '--host-route', 'destination=10.30.30.30/24,gateway=10.30.30.2', - '--allocation-pool', 'start=8.8.8.100,end=8.8.8.150', - self._testsubnet.name, - ] - verifylist = [ - ('dns_nameservers', ['8.8.8.8']), - ('host_routes', [{ - "destination": "10.30.30.30/24", "gateway": "10.30.30.2"}]), - ('allocation_pools', [{ - 'start': '8.8.8.100', 'end': '8.8.8.150'}]), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - self.assertRaises(exceptions.CommandError, - self.cmd.take_action, parsed_args) - - def test_unset_subnet_wrong_allocation_pool(self): - arglist = [ - '--dns-nameserver', '8.8.8.8', - '--host-route', 'destination=10.30.30.30/24,gateway=10.30.30.1', - '--allocation-pool', 'start=8.8.8.100,end=8.8.8.156', - self._testsubnet.name, - ] - verifylist = [ - ('dns_nameservers', ['8.8.8.8']), - ('host_routes', [{ - "destination": "10.30.30.30/24", "gateway": "10.30.30.1"}]), - ('allocation_pools', [{ - 'start': '8.8.8.100', 'end': '8.8.8.156'}]), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - self.assertRaises(exceptions.CommandError, - self.cmd.take_action, parsed_args) - - def test_unset_subnet_wrong_dns_nameservers(self): - arglist = [ - '--dns-nameserver', '8.8.8.1', - '--host-route', 'destination=10.30.30.30/24,gateway=10.30.30.1', - '--allocation-pool', 'start=8.8.8.100,end=8.8.8.150', - self._testsubnet.name, - ] - verifylist = [ - ('dns_nameservers', ['8.8.8.1']), - ('host_routes', [{ - "destination": "10.30.30.30/24", "gateway": "10.30.30.1"}]), - ('allocation_pools', [{ - 'start': '8.8.8.100', 'end': '8.8.8.150'}]), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - self.assertRaises(exceptions.CommandError, - self.cmd.take_action, parsed_args) - - def test_unset_subnet_wrong_service_type(self): - arglist = [ - '--dns-nameserver', '8.8.8.8', - '--host-route', 'destination=10.30.30.30/24,gateway=10.30.30.1', - '--allocation-pool', 'start=8.8.8.100,end=8.8.8.150', - '--service-type', 'network:dhcp', - self._testsubnet.name, - ] - verifylist = [ - ('dns_nameservers', ['8.8.8.8']), - ('host_routes', [{ - "destination": "10.30.30.30/24", "gateway": "10.30.30.1"}]), - ('allocation_pools', [{ - 'start': '8.8.8.100', 'end': '8.8.8.150'}]), - ('service_types', ['network:dhcp']), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - self.assertRaises(exceptions.CommandError, - self.cmd.take_action, parsed_args) diff --git a/openstackclient/tests/network/v2/test_subnet_pool.py b/openstackclient/tests/network/v2/test_subnet_pool.py deleted file mode 100644 index 4cfecef7..00000000 --- a/openstackclient/tests/network/v2/test_subnet_pool.py +++ /dev/null @@ -1,722 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# - -import argparse -import mock -from mock import call - -from osc_lib import exceptions -from osc_lib import utils - -from openstackclient.network.v2 import subnet_pool -from openstackclient.tests.identity.v3 import fakes as identity_fakes_v3 -from openstackclient.tests.network.v2 import fakes as network_fakes -from openstackclient.tests import utils as tests_utils - - -class TestSubnetPool(network_fakes.TestNetworkV2): - - def setUp(self): - super(TestSubnetPool, self).setUp() - - # Get a shortcut to the network client - self.network = self.app.client_manager.network - # Get a shortcut to the ProjectManager Mock - self.projects_mock = self.app.client_manager.identity.projects - # Get a shortcut to the DomainManager Mock - self.domains_mock = self.app.client_manager.identity.domains - - -class TestCreateSubnetPool(TestSubnetPool): - - project = identity_fakes_v3.FakeProject.create_one_project() - domain = identity_fakes_v3.FakeDomain.create_one_domain() - # The new subnet pool to create. - _subnet_pool = network_fakes.FakeSubnetPool.create_one_subnet_pool() - - _address_scope = network_fakes.FakeAddressScope.create_one_address_scope() - - columns = ( - 'address_scope_id', - 'default_prefixlen', - 'default_quota', - 'id', - 'ip_version', - 'is_default', - 'max_prefixlen', - 'min_prefixlen', - 'name', - 'prefixes', - 'project_id', - 'shared', - ) - data = ( - _subnet_pool.address_scope_id, - _subnet_pool.default_prefixlen, - _subnet_pool.default_quota, - _subnet_pool.id, - _subnet_pool.ip_version, - _subnet_pool.is_default, - _subnet_pool.max_prefixlen, - _subnet_pool.min_prefixlen, - _subnet_pool.name, - utils.format_list(_subnet_pool.prefixes), - _subnet_pool.project_id, - _subnet_pool.shared, - ) - - def setUp(self): - super(TestCreateSubnetPool, self).setUp() - - self.network.create_subnet_pool = mock.Mock( - return_value=self._subnet_pool) - - # Get the command object to test - self.cmd = subnet_pool.CreateSubnetPool(self.app, self.namespace) - - self.network.find_address_scope = mock.Mock( - return_value=self._address_scope) - - self.projects_mock.get.return_value = self.project - self.domains_mock.get.return_value = self.domain - - def test_create_no_options(self): - arglist = [] - verifylist = [] - - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_create_no_pool_prefix(self): - """Make sure --pool-prefix is a required argument""" - arglist = [ - self._subnet_pool.name, - ] - verifylist = [ - ('name', self._subnet_pool.name), - ] - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_create_default_options(self): - arglist = [ - '--pool-prefix', '10.0.10.0/24', - self._subnet_pool.name, - ] - verifylist = [ - ('prefixes', ['10.0.10.0/24']), - ('name', self._subnet_pool.name), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = (self.cmd.take_action(parsed_args)) - - self.network.create_subnet_pool.assert_called_once_with(**{ - 'prefixes': ['10.0.10.0/24'], - 'name': self._subnet_pool.name, - }) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - def test_create_prefixlen_options(self): - arglist = [ - '--default-prefix-length', self._subnet_pool.default_prefixlen, - '--max-prefix-length', self._subnet_pool.max_prefixlen, - '--min-prefix-length', self._subnet_pool.min_prefixlen, - '--pool-prefix', '10.0.10.0/24', - self._subnet_pool.name, - ] - verifylist = [ - ('default_prefix_length', - int(self._subnet_pool.default_prefixlen)), - ('max_prefix_length', int(self._subnet_pool.max_prefixlen)), - ('min_prefix_length', int(self._subnet_pool.min_prefixlen)), - ('name', self._subnet_pool.name), - ('prefixes', ['10.0.10.0/24']), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = (self.cmd.take_action(parsed_args)) - - self.network.create_subnet_pool.assert_called_once_with(**{ - 'default_prefixlen': int(self._subnet_pool.default_prefixlen), - 'max_prefixlen': int(self._subnet_pool.max_prefixlen), - 'min_prefixlen': int(self._subnet_pool.min_prefixlen), - 'prefixes': ['10.0.10.0/24'], - 'name': self._subnet_pool.name, - }) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - def test_create_len_negative(self): - arglist = [ - self._subnet_pool.name, - '--min-prefix-length', '-16', - ] - verifylist = [ - ('subnet_pool', self._subnet_pool.name), - ('min_prefix_length', '-16'), - ] - - self.assertRaises(argparse.ArgumentTypeError, self.check_parser, - self.cmd, arglist, verifylist) - - def test_create_project_domain(self): - arglist = [ - '--pool-prefix', '10.0.10.0/24', - "--project", self.project.name, - "--project-domain", self.domain.name, - self._subnet_pool.name, - ] - verifylist = [ - ('prefixes', ['10.0.10.0/24']), - ('project', self.project.name), - ('project_domain', self.domain.name), - ('name', self._subnet_pool.name), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = (self.cmd.take_action(parsed_args)) - - self.network.create_subnet_pool.assert_called_once_with(**{ - 'prefixes': ['10.0.10.0/24'], - 'tenant_id': self.project.id, - 'name': self._subnet_pool.name, - }) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - def test_create_address_scope_option(self): - arglist = [ - '--pool-prefix', '10.0.10.0/24', - '--address-scope', self._address_scope.id, - self._subnet_pool.name, - ] - verifylist = [ - ('prefixes', ['10.0.10.0/24']), - ('address_scope', self._address_scope.id), - ('name', self._subnet_pool.name), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = (self.cmd.take_action(parsed_args)) - - self.network.create_subnet_pool.assert_called_once_with(**{ - 'prefixes': ['10.0.10.0/24'], - 'address_scope_id': self._address_scope.id, - 'name': self._subnet_pool.name, - }) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - def test_create_default_and_shared_options(self): - arglist = [ - '--pool-prefix', '10.0.10.0/24', - '--default', - '--share', - self._subnet_pool.name, - ] - verifylist = [ - ('prefixes', ['10.0.10.0/24']), - ('default', True), - ('share', True), - ('name', self._subnet_pool.name), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = (self.cmd.take_action(parsed_args)) - - self.network.create_subnet_pool.assert_called_once_with(**{ - 'is_default': True, - 'name': self._subnet_pool.name, - 'prefixes': ['10.0.10.0/24'], - 'shared': True, - }) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - -class TestDeleteSubnetPool(TestSubnetPool): - - # The subnet pools to delete. - _subnet_pools = network_fakes.FakeSubnetPool.create_subnet_pools(count=2) - - def setUp(self): - super(TestDeleteSubnetPool, self).setUp() - - self.network.delete_subnet_pool = mock.Mock(return_value=None) - - self.network.find_subnet_pool = ( - network_fakes.FakeSubnetPool.get_subnet_pools(self._subnet_pools) - ) - - # Get the command object to test - self.cmd = subnet_pool.DeleteSubnetPool(self.app, self.namespace) - - def test_subnet_pool_delete(self): - arglist = [ - self._subnet_pools[0].name, - ] - verifylist = [ - ('subnet_pool', [self._subnet_pools[0].name]), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - self.network.delete_subnet_pool.assert_called_once_with( - self._subnet_pools[0]) - self.assertIsNone(result) - - def test_multi_subnet_pools_delete(self): - arglist = [] - verifylist = [] - - for s in self._subnet_pools: - arglist.append(s.name) - verifylist = [ - ('subnet_pool', arglist), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - calls = [] - for s in self._subnet_pools: - calls.append(call(s)) - self.network.delete_subnet_pool.assert_has_calls(calls) - self.assertIsNone(result) - - def test_multi_subnet_pools_delete_with_exception(self): - arglist = [ - self._subnet_pools[0].name, - 'unexist_subnet_pool', - ] - verifylist = [ - ('subnet_pool', - [self._subnet_pools[0].name, 'unexist_subnet_pool']), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - find_mock_result = [self._subnet_pools[0], exceptions.CommandError] - self.network.find_subnet_pool = ( - mock.MagicMock(side_effect=find_mock_result) - ) - - try: - self.cmd.take_action(parsed_args) - self.fail('CommandError should be raised.') - except exceptions.CommandError as e: - self.assertEqual('1 of 2 subnet pools failed to delete.', str(e)) - - self.network.find_subnet_pool.assert_any_call( - self._subnet_pools[0].name, ignore_missing=False) - self.network.find_subnet_pool.assert_any_call( - 'unexist_subnet_pool', ignore_missing=False) - self.network.delete_subnet_pool.assert_called_once_with( - self._subnet_pools[0] - ) - - -class TestListSubnetPool(TestSubnetPool): - # The subnet pools going to be listed up. - _subnet_pools = network_fakes.FakeSubnetPool.create_subnet_pools(count=3) - - columns = ( - 'ID', - 'Name', - 'Prefixes', - ) - columns_long = columns + ( - 'Default Prefix Length', - 'Address Scope', - 'Default Subnet Pool', - 'Shared', - ) - - data = [] - for pool in _subnet_pools: - data.append(( - pool.id, - pool.name, - utils.format_list(pool.prefixes), - )) - - data_long = [] - for pool in _subnet_pools: - data_long.append(( - pool.id, - pool.name, - utils.format_list(pool.prefixes), - pool.default_prefixlen, - pool.address_scope_id, - pool.is_default, - pool.shared, - )) - - def setUp(self): - super(TestListSubnetPool, self).setUp() - - # Get the command object to test - self.cmd = subnet_pool.ListSubnetPool(self.app, self.namespace) - - self.network.subnet_pools = mock.Mock(return_value=self._subnet_pools) - - def test_subnet_pool_list_no_option(self): - arglist = [] - verifylist = [ - ('long', False), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.subnet_pools.assert_called_once_with() - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - def test_subnet_pool_list_long(self): - arglist = [ - '--long', - ] - verifylist = [ - ('long', True), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.subnet_pools.assert_called_once_with() - self.assertEqual(self.columns_long, columns) - self.assertEqual(self.data_long, list(data)) - - -class TestSetSubnetPool(TestSubnetPool): - - # The subnet_pool to set. - _subnet_pool = network_fakes.FakeSubnetPool.create_one_subnet_pool() - - _address_scope = network_fakes.FakeAddressScope.create_one_address_scope() - - def setUp(self): - super(TestSetSubnetPool, self).setUp() - - self.network.update_subnet_pool = mock.Mock(return_value=None) - - self.network.find_subnet_pool = mock.Mock( - return_value=self._subnet_pool) - - self.network.find_address_scope = mock.Mock( - return_value=self._address_scope) - - # Get the command object to test - self.cmd = subnet_pool.SetSubnetPool(self.app, self.namespace) - - def test_set_this(self): - arglist = [ - '--name', 'noob', - '--default-prefix-length', '8', - '--min-prefix-length', '8', - self._subnet_pool.name, - ] - verifylist = [ - ('name', 'noob'), - ('default_prefix_length', 8), - ('min_prefix_length', 8), - ('subnet_pool', self._subnet_pool.name), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - attrs = { - 'name': 'noob', - 'default_prefixlen': 8, - 'min_prefixlen': 8, - } - self.network.update_subnet_pool.assert_called_once_with( - self._subnet_pool, **attrs) - self.assertIsNone(result) - - def test_set_that(self): - arglist = [ - '--pool-prefix', '10.0.1.0/24', - '--pool-prefix', '10.0.2.0/24', - '--max-prefix-length', '16', - self._subnet_pool.name, - ] - verifylist = [ - ('prefixes', ['10.0.1.0/24', '10.0.2.0/24']), - ('max_prefix_length', 16), - ('subnet_pool', self._subnet_pool.name), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - prefixes = ['10.0.1.0/24', '10.0.2.0/24'] - prefixes.extend(self._subnet_pool.prefixes) - attrs = { - 'prefixes': prefixes, - 'max_prefixlen': 16, - } - self.network.update_subnet_pool.assert_called_once_with( - self._subnet_pool, **attrs) - self.assertIsNone(result) - - def test_set_nothing(self): - arglist = [self._subnet_pool.name, ] - verifylist = [('subnet_pool', self._subnet_pool.name), ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - attrs = {} - self.network.update_subnet_pool.assert_called_once_with( - self._subnet_pool, **attrs) - self.assertIsNone(result) - - def test_set_len_negative(self): - arglist = [ - '--max-prefix-length', '-16', - self._subnet_pool.name, - ] - verifylist = [ - ('max_prefix_length', '-16'), - ('subnet_pool', self._subnet_pool.name), - ] - - self.assertRaises(argparse.ArgumentTypeError, self.check_parser, - self.cmd, arglist, verifylist) - - def test_set_address_scope(self): - arglist = [ - '--address-scope', self._address_scope.id, - self._subnet_pool.name, - ] - verifylist = [ - ('address_scope', self._address_scope.id), - ('subnet_pool', self._subnet_pool.name), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - attrs = { - 'address_scope_id': self._address_scope.id, - } - self.network.update_subnet_pool.assert_called_once_with( - self._subnet_pool, **attrs) - self.assertIsNone(result) - - def test_set_no_address_scope(self): - arglist = [ - '--no-address-scope', - self._subnet_pool.name, - ] - verifylist = [ - ('no_address_scope', True), - ('subnet_pool', self._subnet_pool.name), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - attrs = { - 'address_scope_id': None, - } - self.network.update_subnet_pool.assert_called_once_with( - self._subnet_pool, **attrs) - self.assertIsNone(result) - - def test_set_no_address_scope_conflict(self): - arglist = [ - '--address-scope', self._address_scope.id, - '--no-address-scope', - self._subnet_pool.name, - ] - verifylist = [ - ('address_scope', self._address_scope.id), - ('no_address_scope', True), - ('subnet_pool', self._subnet_pool.name), - ] - - # Exclusive arguments will conflict here. - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_set_default(self): - arglist = [ - '--default', - self._subnet_pool.name, - ] - verifylist = [ - ('default', True), - ('subnet_pool', self._subnet_pool.name), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - attrs = { - 'is_default': True - } - self.network.update_subnet_pool.assert_called_once_with( - self._subnet_pool, **attrs) - self.assertIsNone(result) - - def test_set_no_default(self): - arglist = [ - '--no-default', - self._subnet_pool.name, - ] - verifylist = [ - ('no_default', True), - ('subnet_pool', self._subnet_pool.name), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - attrs = { - 'is_default': False, - } - self.network.update_subnet_pool.assert_called_once_with( - self._subnet_pool, **attrs) - self.assertIsNone(result) - - def test_set_no_default_conflict(self): - arglist = [ - '--default', - '--no-default', - self._subnet_pool.name, - ] - verifylist = [ - ('default', True), - ('no_default', True), - ('subnet_pool', self._subnet_pool.name), - ] - - # Exclusive arguments will conflict here. - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - -class TestShowSubnetPool(TestSubnetPool): - - # The subnet_pool to set. - _subnet_pool = network_fakes.FakeSubnetPool.create_one_subnet_pool() - - columns = ( - 'address_scope_id', - 'default_prefixlen', - 'default_quota', - 'id', - 'ip_version', - 'is_default', - 'max_prefixlen', - 'min_prefixlen', - 'name', - 'prefixes', - 'project_id', - 'shared', - ) - - data = ( - _subnet_pool.address_scope_id, - _subnet_pool.default_prefixlen, - _subnet_pool.default_quota, - _subnet_pool.id, - _subnet_pool.ip_version, - _subnet_pool.is_default, - _subnet_pool.max_prefixlen, - _subnet_pool.min_prefixlen, - _subnet_pool.name, - utils.format_list(_subnet_pool.prefixes), - _subnet_pool.tenant_id, - _subnet_pool.shared, - ) - - def setUp(self): - super(TestShowSubnetPool, self).setUp() - - self.network.find_subnet_pool = mock.Mock( - return_value=self._subnet_pool - ) - - # Get the command object to test - self.cmd = subnet_pool.ShowSubnetPool(self.app, self.namespace) - - def test_show_no_options(self): - arglist = [] - verifylist = [] - - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_show_all_options(self): - arglist = [ - self._subnet_pool.name, - ] - verifylist = [ - ('subnet_pool', self._subnet_pool.name), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.find_subnet_pool.assert_called_once_with( - self._subnet_pool.name, - ignore_missing=False - ) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - -class TestUnsetSubnetPool(TestSubnetPool): - - def setUp(self): - super(TestUnsetSubnetPool, self).setUp() - self._subnetpool = network_fakes.FakeSubnetPool.create_one_subnet_pool( - {'prefixes': ['10.0.10.0/24', '10.1.10.0/24', - '10.2.10.0/24'], }) - self.network.find_subnet_pool = mock.Mock( - return_value=self._subnetpool) - self.network.update_subnet_pool = mock.Mock(return_value=None) - # Get the command object to test - self.cmd = subnet_pool.UnsetSubnetPool(self.app, self.namespace) - - def test_unset_subnet_pool(self): - arglist = [ - '--pool-prefix', '10.0.10.0/24', - '--pool-prefix', '10.1.10.0/24', - self._subnetpool.name, - ] - verifylist = [('prefixes', ['10.0.10.0/24', '10.1.10.0/24'])] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - attrs = {'prefixes': ['10.2.10.0/24']} - self.network.update_subnet_pool.assert_called_once_with( - self._subnetpool, **attrs) - self.assertIsNone(result) - - def test_unset_subnet_pool_prefix_not_existent(self): - arglist = [ - '--pool-prefix', '10.100.1.1/25', - self._subnetpool.name, - ] - verifylist = [('prefixes', ['10.100.1.1/25'])] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - self.assertRaises(exceptions.CommandError, - self.cmd.take_action, - parsed_args) |
