diff options
Diffstat (limited to 'openstackclient/tests/network')
18 files changed, 0 insertions, 9584 deletions
diff --git a/openstackclient/tests/network/__init__.py b/openstackclient/tests/network/__init__.py deleted file mode 100644 index e69de29b..00000000 --- a/openstackclient/tests/network/__init__.py +++ /dev/null diff --git a/openstackclient/tests/network/test_common.py b/openstackclient/tests/network/test_common.py deleted file mode 100644 index 48608734..00000000 --- a/openstackclient/tests/network/test_common.py +++ /dev/null @@ -1,174 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# - -import argparse -import mock - -from openstackclient.network import common -from openstackclient.tests import utils - - -def _add_common_argument(parser): - parser.add_argument( - 'common', - metavar='<common>', - help='Common argument', - ) - return parser - - -def _add_network_argument(parser): - parser.add_argument( - 'network', - metavar='<network>', - help='Network argument', - ) - return parser - - -def _add_compute_argument(parser): - parser.add_argument( - 'compute', - metavar='<compute>', - help='Compute argument', - ) - return parser - - -class FakeNetworkAndComputeCommand(common.NetworkAndComputeCommand): - - def update_parser_common(self, parser): - return _add_common_argument(parser) - - def update_parser_network(self, parser): - return _add_network_argument(parser) - - def update_parser_compute(self, parser): - return _add_compute_argument(parser) - - def take_action_network(self, client, parsed_args): - return client.network_action(parsed_args) - - def take_action_compute(self, client, parsed_args): - return client.compute_action(parsed_args) - - -class FakeNetworkAndComputeLister(common.NetworkAndComputeLister): - - def update_parser_common(self, parser): - return _add_common_argument(parser) - - def update_parser_network(self, parser): - return _add_network_argument(parser) - - def update_parser_compute(self, parser): - return _add_compute_argument(parser) - - def take_action_network(self, client, parsed_args): - return client.network_action(parsed_args) - - def take_action_compute(self, client, parsed_args): - return client.compute_action(parsed_args) - - -class FakeNetworkAndComputeShowOne(common.NetworkAndComputeShowOne): - - def update_parser_common(self, parser): - return _add_common_argument(parser) - - def update_parser_network(self, parser): - return _add_network_argument(parser) - - def update_parser_compute(self, parser): - return _add_compute_argument(parser) - - def take_action_network(self, client, parsed_args): - return client.network_action(parsed_args) - - def take_action_compute(self, client, parsed_args): - return client.compute_action(parsed_args) - - -class TestNetworkAndCompute(utils.TestCommand): - - def setUp(self): - super(TestNetworkAndCompute, self).setUp() - - self.namespace = argparse.Namespace() - - # Create network client mocks. - self.app.client_manager.network = mock.Mock() - self.network = self.app.client_manager.network - self.network.network_action = mock.Mock( - return_value='take_action_network') - - # Create compute client mocks. - self.app.client_manager.compute = mock.Mock() - self.compute = self.app.client_manager.compute - self.compute.compute_action = mock.Mock( - return_value='take_action_compute') - - # Subclasses can override the command object to test. - self.cmd = FakeNetworkAndComputeCommand(self.app, self.namespace) - - def test_take_action_network(self): - arglist = [ - 'common', - 'network' - ] - verifylist = [ - ('common', 'common'), - ('network', 'network') - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - self.network.network_action.assert_called_with(parsed_args) - self.assertEqual('take_action_network', result) - - def test_take_action_compute(self): - arglist = [ - 'common', - 'compute' - ] - verifylist = [ - ('common', 'common'), - ('compute', 'compute') - ] - - self.app.client_manager.network_endpoint_enabled = False - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - self.compute.compute_action.assert_called_with(parsed_args) - self.assertEqual('take_action_compute', result) - - -class TestNetworkAndComputeCommand(TestNetworkAndCompute): - - def setUp(self): - super(TestNetworkAndComputeCommand, self).setUp() - self.cmd = FakeNetworkAndComputeCommand(self.app, self.namespace) - - -class TestNetworkAndComputeLister(TestNetworkAndCompute): - - def setUp(self): - super(TestNetworkAndComputeLister, self).setUp() - self.cmd = FakeNetworkAndComputeLister(self.app, self.namespace) - - -class TestNetworkAndComputeShowOne(TestNetworkAndCompute): - - def setUp(self): - super(TestNetworkAndComputeShowOne, self).setUp() - self.cmd = FakeNetworkAndComputeShowOne(self.app, self.namespace) diff --git a/openstackclient/tests/network/v2/__init__.py b/openstackclient/tests/network/v2/__init__.py deleted file mode 100644 index e69de29b..00000000 --- a/openstackclient/tests/network/v2/__init__.py +++ /dev/null diff --git a/openstackclient/tests/network/v2/fakes.py b/openstackclient/tests/network/v2/fakes.py deleted file mode 100644 index 33bc4017..00000000 --- a/openstackclient/tests/network/v2/fakes.py +++ /dev/null @@ -1,1100 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# - -import argparse -import copy -import mock -import uuid - -from openstackclient.tests import fakes -from openstackclient.tests.identity.v3 import fakes as identity_fakes_v3 -from openstackclient.tests import utils - - -QUOTA = { - "subnet": 10, - "network": 10, - "floatingip": 50, - "subnetpool": -1, - "security_group_rule": 100, - "security_group": 10, - "router": 10, - "rbac_policy": -1, - "port": 50, - "vip": 10, - "member": 10, - "health_monitor": 10, -} - - -class FakeNetworkV2Client(object): - - def __init__(self, **kwargs): - self.extensions = mock.Mock() - self.extensions.resource_class = fakes.FakeResource(None, {}) - - -class TestNetworkV2(utils.TestCommand): - - def setUp(self): - super(TestNetworkV2, self).setUp() - - self.namespace = argparse.Namespace() - - self.app.client_manager.session = mock.Mock() - - self.app.client_manager.network = FakeNetworkV2Client( - endpoint=fakes.AUTH_URL, - token=fakes.AUTH_TOKEN, - ) - - self.app.client_manager.identity = ( - identity_fakes_v3.FakeIdentityv3Client( - endpoint=fakes.AUTH_URL, - token=fakes.AUTH_TOKEN, - ) - ) - - -class FakeAddressScope(object): - """Fake one or more address scopes.""" - - @staticmethod - def create_one_address_scope(attrs=None): - """Create a fake address scope. - - :param Dictionary attrs: - A dictionary with all attributes - :return: - A FakeResource object with name, id, etc. - """ - attrs = attrs or {} - - # Set default attributes. - address_scope_attrs = { - 'name': 'address-scope-name-' + uuid.uuid4().hex, - 'id': 'address-scope-id-' + uuid.uuid4().hex, - 'tenant_id': 'project-id-' + uuid.uuid4().hex, - 'shared': False, - 'ip_version': 4, - } - - # Overwrite default attributes. - address_scope_attrs.update(attrs) - - address_scope = fakes.FakeResource( - info=copy.deepcopy(address_scope_attrs), - loaded=True) - - # Set attributes with special mapping in OpenStack SDK. - address_scope.project_id = address_scope_attrs['tenant_id'] - - return address_scope - - @staticmethod - def create_address_scopes(attrs=None, count=2): - """Create multiple fake address scopes. - - :param Dictionary attrs: - A dictionary with all attributes - :param int count: - The number of address scopes to fake - :return: - A list of FakeResource objects faking the address scopes - """ - address_scopes = [] - for i in range(0, count): - address_scopes.append( - FakeAddressScope.create_one_address_scope(attrs)) - - return address_scopes - - @staticmethod - def get_address_scopes(address_scopes=None, count=2): - """Get an iterable MagicMock object with a list of faked address scopes. - - If address scopes list is provided, then initialize the Mock object - with the list. Otherwise create one. - - :param List address scopes: - A list of FakeResource objects faking address scopes - :param int count: - The number of address scopes to fake - :return: - An iterable Mock object with side_effect set to a list of faked - address scopes - """ - if address_scopes is None: - address_scopes = FakeAddressScope.create_address_scopes(count) - return mock.MagicMock(side_effect=address_scopes) - - -class FakeAvailabilityZone(object): - """Fake one or more network availability zones (AZs).""" - - @staticmethod - def create_one_availability_zone(attrs=None): - """Create a fake AZ. - - :param Dictionary attrs: - A dictionary with all attributes - :return: - A FakeResource object with name, state, etc. - """ - attrs = attrs or {} - - # Set default attributes. - availability_zone = { - 'name': uuid.uuid4().hex, - 'state': 'available', - 'resource': 'network', - } - - # Overwrite default attributes. - availability_zone.update(attrs) - - availability_zone = fakes.FakeResource( - info=copy.deepcopy(availability_zone), - loaded=True) - return availability_zone - - @staticmethod - def create_availability_zones(attrs=None, count=2): - """Create multiple fake AZs. - - :param Dictionary attrs: - A dictionary with all attributes - :param int count: - The number of AZs to fake - :return: - A list of FakeResource objects faking the AZs - """ - availability_zones = [] - for i in range(0, count): - availability_zone = \ - FakeAvailabilityZone.create_one_availability_zone(attrs) - availability_zones.append(availability_zone) - - return availability_zones - - -class FakeIPAvailability(object): - """Fake one or more network ip availabilities.""" - - @staticmethod - def create_one_ip_availability(): - """Create a fake list with ip availability stats of a network. - - :return: - A FakeResource object with network_name, network_id, etc. - """ - - # Set default attributes. - network_ip_availability = { - 'network_id': 'network-id-' + uuid.uuid4().hex, - 'network_name': 'network-name-' + uuid.uuid4().hex, - 'tenant_id': '', - 'subnet_ip_availability': [], - 'total_ips': 254, - 'used_ips': 6, - } - - network_ip_availability = fakes.FakeResource( - info=copy.deepcopy(network_ip_availability), - loaded=True) - return network_ip_availability - - @staticmethod - def create_ip_availability(count=2): - """Create fake list of ip availability stats of multiple networks. - - :param int count: - The number of networks to fake - :return: - A list of FakeResource objects faking network ip availability stats - """ - network_ip_availabilities = [] - for i in range(0, count): - network_ip_availability = \ - FakeIPAvailability.create_one_ip_availability() - network_ip_availabilities.append(network_ip_availability) - - return network_ip_availabilities - - -class FakeExtension(object): - """Fake one or more extension.""" - - @staticmethod - def create_one_extension(attrs=None): - """Create a fake extension. - - :param Dictionary attrs: - A dictionary with all attributes - :return: - A FakeResource object with name, namespace, etc. - """ - attrs = attrs or {} - - # Set default attributes. - extension_info = { - 'name': 'name-' + uuid.uuid4().hex, - 'namespace': 'http://docs.openstack.org/network/', - 'description': 'description-' + uuid.uuid4().hex, - 'updated': '2013-07-09T12:00:0-00:00', - 'alias': 'Dystopian', - 'links': '[{"href":''"https://github.com/os/network", "type"}]', - } - - # Overwrite default attributes. - extension_info.update(attrs) - - extension = fakes.FakeResource( - info=copy.deepcopy(extension_info), - loaded=True) - return extension - - -class FakeNetwork(object): - """Fake one or more networks.""" - - @staticmethod - def create_one_network(attrs=None): - """Create a fake network. - - :param Dictionary attrs: - A dictionary with all attributes - :return: - A FakeResource object, with id, name, etc. - """ - attrs = attrs or {} - - # Set default attributes. - network_attrs = { - 'id': 'network-id-' + uuid.uuid4().hex, - 'name': 'network-name-' + uuid.uuid4().hex, - 'status': 'ACTIVE', - 'tenant_id': 'project-id-' + uuid.uuid4().hex, - 'admin_state_up': True, - 'shared': False, - 'subnets': ['a', 'b'], - 'provider_network_type': 'vlan', - 'router:external': True, - 'availability_zones': [], - 'availability_zone_hints': [], - 'is_default': False, - 'port_security_enabled': True, - } - - # Overwrite default attributes. - network_attrs.update(attrs) - - network = fakes.FakeResource(info=copy.deepcopy(network_attrs), - loaded=True) - - # Set attributes with special mapping in OpenStack SDK. - network.project_id = network_attrs['tenant_id'] - network.is_router_external = network_attrs['router:external'] - network.is_port_security_enabled = \ - network_attrs['port_security_enabled'] - - return network - - @staticmethod - def create_networks(attrs=None, count=2): - """Create multiple fake networks. - - :param Dictionary attrs: - A dictionary with all attributes - :param int count: - The number of networks to fake - :return: - A list of FakeResource objects faking the networks - """ - networks = [] - for i in range(0, count): - networks.append(FakeNetwork.create_one_network(attrs)) - - return networks - - @staticmethod - def get_networks(networks=None, count=2): - """Get an iterable MagicMock object with a list of faked networks. - - If networks list is provided, then initialize the Mock object with the - list. Otherwise create one. - - :param List networks: - A list of FakeResource objects faking networks - :param int count: - The number of networks to fake - :return: - An iterable Mock object with side_effect set to a list of faked - networks - """ - if networks is None: - networks = FakeNetwork.create_networks(count) - return mock.MagicMock(side_effect=networks) - - -class FakeNetworkSegment(object): - """Fake one or more network segments.""" - - @staticmethod - def create_one_network_segment(attrs=None): - """Create a fake network segment. - - :param Dictionary attrs: - A dictionary with all attributes - :return: - A FakeResource object faking the network segment - """ - attrs = attrs or {} - - # Set default attributes. - network_segment_attrs = { - 'id': 'network-segment-id-' + uuid.uuid4().hex, - 'network_id': 'network-id-' + uuid.uuid4().hex, - 'network_type': 'vlan', - 'physical_network': 'physical-network-name-' + uuid.uuid4().hex, - 'segmentation_id': 1024, - } - - # Overwrite default attributes. - network_segment_attrs.update(attrs) - - network_segment = fakes.FakeResource( - info=copy.deepcopy(network_segment_attrs), - loaded=True - ) - - return network_segment - - @staticmethod - def create_network_segments(attrs=None, count=2): - """Create multiple fake network segments. - - :param Dictionary attrs: - A dictionary with all attributes - :param int count: - The number of network segments to fake - :return: - A list of FakeResource objects faking the network segments - """ - network_segments = [] - for i in range(0, count): - network_segments.append( - FakeNetworkSegment.create_one_network_segment(attrs) - ) - return network_segments - - -class FakePort(object): - """Fake one or more ports.""" - - @staticmethod - def create_one_port(attrs=None): - """Create a fake port. - - :param Dictionary attrs: - A dictionary with all attributes - :return: - A FakeResource object, with id, name, etc. - """ - attrs = attrs or {} - - # Set default attributes. - port_attrs = { - 'admin_state_up': True, - 'allowed_address_pairs': [{}], - 'binding:host_id': 'binding-host-id-' + uuid.uuid4().hex, - 'binding:profile': {}, - 'binding:vif_details': {}, - 'binding:vif_type': 'ovs', - 'binding:vnic_type': 'normal', - 'device_id': 'device-id-' + uuid.uuid4().hex, - 'device_owner': 'compute:nova', - 'dns_assignment': [{}], - 'dns_name': 'dns-name-' + uuid.uuid4().hex, - 'extra_dhcp_opts': [{}], - 'fixed_ips': [{}], - 'id': 'port-id-' + uuid.uuid4().hex, - 'mac_address': 'fa:16:3e:a9:4e:72', - 'name': 'port-name-' + uuid.uuid4().hex, - 'network_id': 'network-id-' + uuid.uuid4().hex, - 'port_security_enabled': True, - 'security_groups': [], - 'status': 'ACTIVE', - 'tenant_id': 'project-id-' + uuid.uuid4().hex, - } - - # Overwrite default attributes. - port_attrs.update(attrs) - - port = fakes.FakeResource(info=copy.deepcopy(port_attrs), - loaded=True) - - # Set attributes with special mappings in OpenStack SDK. - port.project_id = port_attrs['tenant_id'] - port.binding_host_id = port_attrs['binding:host_id'] - port.binding_profile = port_attrs['binding:profile'] - port.binding_vif_details = port_attrs['binding:vif_details'] - port.binding_vif_type = port_attrs['binding:vif_type'] - port.binding_vnic_type = port_attrs['binding:vnic_type'] - - return port - - @staticmethod - def create_ports(attrs=None, count=2): - """Create multiple fake ports. - - :param Dictionary attrs: - A dictionary with all attributes - :param int count: - The number of ports to fake - :return: - A list of FakeResource objects faking the ports - """ - ports = [] - for i in range(0, count): - ports.append(FakePort.create_one_port(attrs)) - - return ports - - @staticmethod - def get_ports(ports=None, count=2): - """Get an iterable MagicMock object with a list of faked ports. - - If ports list is provided, then initialize the Mock object with the - list. Otherwise create one. - - :param List ports: - A list of FakeResource objects faking ports - :param int count: - The number of ports to fake - :return: - An iterable Mock object with side_effect set to a list of faked - ports - """ - if ports is None: - ports = FakePort.create_ports(count) - return mock.MagicMock(side_effect=ports) - - -class FakeNetworkAgent(object): - """Fake one or more network agents.""" - - @staticmethod - def create_one_network_agent(attrs=None): - """Create a fake network agent - - :param Dictionary attrs: - A dictionary with all attributes - :return: - A FakeResource object, with id, agent_type, and so on. - """ - attrs = attrs or {} - - # Set default attributes - agent_attrs = { - 'id': 'agent-id-' + uuid.uuid4().hex, - 'agent_type': 'agent-type-' + uuid.uuid4().hex, - 'host': 'host-' + uuid.uuid4().hex, - 'availability_zone': 'zone-' + uuid.uuid4().hex, - 'alive': True, - 'admin_state_up': True, - 'binary': 'binary-' + uuid.uuid4().hex, - 'configurations': {'subnet': 2, 'networks': 1}, - } - agent_attrs.update(attrs) - agent = fakes.FakeResource(info=copy.deepcopy(agent_attrs), - loaded=True) - return agent - - @staticmethod - def create_network_agents(attrs=None, count=2): - """Create multiple fake network agents. - - :param Dictionary attrs: - A dictionary with all attributes - :param int count: - The number of network agents to fake - :return: - A list of FakeResource objects faking the network agents - """ - agents = [] - for i in range(0, count): - agents.append(FakeNetworkAgent.create_one_network_agent(attrs)) - - return agents - - @staticmethod - def get_network_agents(agents=None, count=2): - """Get an iterable MagicMock object with a list of faked network agents. - - If network agents list is provided, then initialize the Mock object - with the list. Otherwise create one. - - :param List agents: - A list of FakeResource objects faking network agents - :param int count: - The number of network agents to fake - :return: - An iterable Mock object with side_effect set to a list of faked - network agents - """ - if agents is None: - agents = FakeNetworkAgent.create_network_agents(count) - return mock.MagicMock(side_effect=agents) - - -class FakeNetworkRBAC(object): - """Fake one or more network rbac policies.""" - - @staticmethod - def create_one_network_rbac(attrs=None): - """Create a fake network rbac - - :param Dictionary attrs: - A dictionary with all attributes - :return: - A FakeResource object, with id, action, target_tenant, - tenant_id, type - """ - attrs = attrs or {} - - # Set default attributes - rbac_attrs = { - 'id': 'rbac-id-' + uuid.uuid4().hex, - 'object_type': 'network', - 'object_id': 'object-id-' + uuid.uuid4().hex, - 'action': 'access_as_shared', - 'target_tenant': 'target-tenant-' + uuid.uuid4().hex, - 'tenant_id': 'tenant-id-' + uuid.uuid4().hex, - } - rbac_attrs.update(attrs) - rbac = fakes.FakeResource(info=copy.deepcopy(rbac_attrs), - loaded=True) - # Set attributes with special mapping in OpenStack SDK. - rbac.project_id = rbac_attrs['tenant_id'] - rbac.target_project_id = rbac_attrs['target_tenant'] - return rbac - - @staticmethod - def create_network_rbacs(attrs=None, count=2): - """Create multiple fake network rbac policies. - - :param Dictionary attrs: - A dictionary with all attributes - :param int count: - The number of rbac policies to fake - :return: - A list of FakeResource objects faking the rbac policies - """ - rbac_policies = [] - for i in range(0, count): - rbac_policies.append(FakeNetworkRBAC. - create_one_network_rbac(attrs)) - - return rbac_policies - - @staticmethod - def get_network_rbacs(rbac_policies=None, count=2): - """Get an iterable MagicMock object with a list of faked rbac policies. - - If rbac policies list is provided, then initialize the Mock object - with the list. Otherwise create one. - - :param List rbac_policies: - A list of FakeResource objects faking rbac policies - :param int count: - The number of rbac policies to fake - :return: - An iterable Mock object with side_effect set to a list of faked - rbac policies - """ - if rbac_policies is None: - rbac_policies = FakeNetworkRBAC.create_network_rbacs(count) - return mock.MagicMock(side_effect=rbac_policies) - - -class FakeRouter(object): - """Fake one or more routers.""" - - @staticmethod - def create_one_router(attrs=None): - """Create a fake router. - - :param Dictionary attrs: - A dictionary with all attributes - :return: - A FakeResource object, with id, name, admin_state_up, - status, tenant_id - """ - attrs = attrs or {} - - # Set default attributes. - router_attrs = { - 'id': 'router-id-' + uuid.uuid4().hex, - 'name': 'router-name-' + uuid.uuid4().hex, - 'status': 'ACTIVE', - 'admin_state_up': True, - 'distributed': False, - 'ha': False, - 'tenant_id': 'project-id-' + uuid.uuid4().hex, - 'routes': [], - 'external_gateway_info': {}, - 'availability_zone_hints': [], - 'availability_zones': [], - } - - # Overwrite default attributes. - router_attrs.update(attrs) - - router = fakes.FakeResource(info=copy.deepcopy(router_attrs), - loaded=True) - - # Set attributes with special mapping in OpenStack SDK. - router.project_id = router_attrs['tenant_id'] - - return router - - @staticmethod - def create_routers(attrs=None, count=2): - """Create multiple fake routers. - - :param Dictionary attrs: - A dictionary with all attributes - :param int count: - The number of routers to fake - :return: - A list of FakeResource objects faking the routers - """ - routers = [] - for i in range(0, count): - routers.append(FakeRouter.create_one_router(attrs)) - - return routers - - @staticmethod - def get_routers(routers=None, count=2): - """Get an iterable MagicMock object with a list of faked routers. - - If routers list is provided, then initialize the Mock object with the - list. Otherwise create one. - - :param List routers: - A list of FakeResource objects faking routers - :param int count: - The number of routers to fake - :return: - An iterable Mock object with side_effect set to a list of faked - routers - """ - if routers is None: - routers = FakeRouter.create_routers(count) - return mock.MagicMock(side_effect=routers) - - -class FakeSecurityGroup(object): - """Fake one or more security groups.""" - - @staticmethod - def create_one_security_group(attrs=None): - """Create a fake security group. - - :param Dictionary attrs: - A dictionary with all attributes - :return: - A FakeResource object, with id, name, etc. - """ - attrs = attrs or {} - - # Set default attributes. - security_group_attrs = { - 'id': 'security-group-id-' + uuid.uuid4().hex, - 'name': 'security-group-name-' + uuid.uuid4().hex, - 'description': 'security-group-description-' + uuid.uuid4().hex, - 'tenant_id': 'project-id-' + uuid.uuid4().hex, - 'security_group_rules': [], - } - - # Overwrite default attributes. - security_group_attrs.update(attrs) - - security_group = fakes.FakeResource( - info=copy.deepcopy(security_group_attrs), - loaded=True) - - # Set attributes with special mapping in OpenStack SDK. - security_group.project_id = security_group_attrs['tenant_id'] - - return security_group - - @staticmethod - def create_security_groups(attrs=None, count=2): - """Create multiple fake security groups. - - :param Dictionary attrs: - A dictionary with all attributes - :param int count: - The number of security groups to fake - :return: - A list of FakeResource objects faking the security groups - """ - security_groups = [] - for i in range(0, count): - security_groups.append( - FakeSecurityGroup.create_one_security_group(attrs)) - - return security_groups - - @staticmethod - def get_security_groups(security_groups=None, count=2): - """Get an iterable MagicMock object with a list of faked security groups. - - If security groups list is provided, then initialize the Mock object - with the list. Otherwise create one. - - :param List security groups: - A list of FakeResource objects faking security groups - :param int count: - The number of security groups to fake - :return: - An iterable Mock object with side_effect set to a list of faked - security groups - """ - if security_groups is None: - security_groups = FakeSecurityGroup.create_security_groups(count) - return mock.MagicMock(side_effect=security_groups) - - -class FakeSecurityGroupRule(object): - """Fake one or more security group rules.""" - - @staticmethod - def create_one_security_group_rule(attrs=None): - """Create a fake security group rule. - - :param Dictionary attrs: - A dictionary with all attributes - :return: - A FakeResource object, with id, etc. - """ - attrs = attrs or {} - - # Set default attributes. - security_group_rule_attrs = { - 'direction': 'ingress', - 'ethertype': 'IPv4', - 'id': 'security-group-rule-id-' + uuid.uuid4().hex, - 'port_range_max': None, - 'port_range_min': None, - 'protocol': 'tcp', - 'remote_group_id': None, - 'remote_ip_prefix': '0.0.0.0/0', - 'security_group_id': 'security-group-id-' + uuid.uuid4().hex, - 'tenant_id': 'project-id-' + uuid.uuid4().hex, - } - - # Overwrite default attributes. - security_group_rule_attrs.update(attrs) - - security_group_rule = fakes.FakeResource( - info=copy.deepcopy(security_group_rule_attrs), - loaded=True) - - # Set attributes with special mapping in OpenStack SDK. - security_group_rule.project_id = security_group_rule_attrs['tenant_id'] - - return security_group_rule - - @staticmethod - def create_security_group_rules(attrs=None, count=2): - """Create multiple fake security group rules. - - :param Dictionary attrs: - A dictionary with all attributes - :param int count: - The number of security group rules to fake - :return: - A list of FakeResource objects faking the security group rules - """ - security_group_rules = [] - for i in range(0, count): - security_group_rules.append( - FakeSecurityGroupRule.create_one_security_group_rule(attrs)) - - return security_group_rules - - @staticmethod - def get_security_group_rules(security_group_rules=None, count=2): - """Get an iterable MagicMock object with a list of faked security group rules. - - If security group rules list is provided, then initialize the Mock - object with the list. Otherwise create one. - - :param List security group rules: - A list of FakeResource objects faking security group rules - :param int count: - The number of security group rules to fake - :return: - An iterable Mock object with side_effect set to a list of faked - security group rules - """ - if security_group_rules is None: - security_group_rules = ( - FakeSecurityGroupRule.create_security_group_rules(count)) - return mock.MagicMock(side_effect=security_group_rules) - - -class FakeSubnet(object): - """Fake one or more subnets.""" - - @staticmethod - def create_one_subnet(attrs=None): - """Create a fake subnet. - - :param Dictionary attrs: - A dictionary with all attributes - :return: - A FakeResource object faking the subnet - """ - attrs = attrs or {} - - # Set default attributes. - project_id = 'project-id-' + uuid.uuid4().hex - subnet_attrs = { - 'id': 'subnet-id-' + uuid.uuid4().hex, - 'name': 'subnet-name-' + uuid.uuid4().hex, - 'network_id': 'network-id-' + uuid.uuid4().hex, - 'cidr': '10.10.10.0/24', - 'tenant_id': project_id, - 'enable_dhcp': True, - 'dns_nameservers': [], - 'allocation_pools': [], - 'host_routes': [], - 'ip_version': 4, - 'gateway_ip': '10.10.10.1', - 'ipv6_address_mode': None, - 'ipv6_ra_mode': None, - 'segment_id': None, - 'service_types': [], - 'subnetpool_id': None, - } - - # Overwrite default attributes. - subnet_attrs.update(attrs) - - subnet = fakes.FakeResource(info=copy.deepcopy(subnet_attrs), - loaded=True) - - # Set attributes with special mappings in OpenStack SDK. - subnet.project_id = subnet_attrs['tenant_id'] - - return subnet - - @staticmethod - def create_subnets(attrs=None, count=2): - """Create multiple fake subnets. - - :param Dictionary attrs: - A dictionary with all attributes - :param int count: - The number of subnets to fake - :return: - A list of FakeResource objects faking the subnets - """ - subnets = [] - for i in range(0, count): - subnets.append(FakeSubnet.create_one_subnet(attrs)) - - return subnets - - @staticmethod - def get_subnets(subnets=None, count=2): - """Get an iterable MagicMock object with a list of faked subnets. - - If subnets list is provided, then initialize the Mock object - with the list. Otherwise create one. - - :param List subnets: - A list of FakeResource objects faking subnets - :param int count: - The number of subnets to fake - :return: - An iterable Mock object with side_effect set to a list of faked - subnets - """ - if subnets is None: - subnets = FakeSubnet.create_subnets(count) - return mock.MagicMock(side_effect=subnets) - - -class FakeFloatingIP(object): - """Fake one or more floating ip.""" - - @staticmethod - def create_one_floating_ip(attrs=None): - """Create a fake floating ip. - - :param Dictionary attrs: - A dictionary with all attributes - :return: - A FakeResource object, with id, ip, and so on - """ - attrs = attrs or {} - - # Set default attributes. - floating_ip_attrs = { - 'id': 'floating-ip-id-' + uuid.uuid4().hex, - 'floating_ip_address': '1.0.9.0', - 'fixed_ip_address': '2.0.9.0', - 'dns_domain': None, - 'dns_name': None, - 'status': 'DOWN', - 'floating_network_id': 'network-id-' + uuid.uuid4().hex, - 'router_id': 'router-id-' + uuid.uuid4().hex, - 'port_id': 'port-id-' + uuid.uuid4().hex, - 'tenant_id': 'project-id-' + uuid.uuid4().hex, - } - - # Overwrite default attributes. - floating_ip_attrs.update(attrs) - - floating_ip = fakes.FakeResource( - info=copy.deepcopy(floating_ip_attrs), - loaded=True - ) - - # Set attributes with special mappings in OpenStack SDK. - floating_ip.project_id = floating_ip_attrs['tenant_id'] - - return floating_ip - - @staticmethod - def create_floating_ips(attrs=None, count=2): - """Create multiple fake floating ips. - - :param Dictionary attrs: - A dictionary with all attributes - :param int count: - The number of floating ips to fake - :return: - A list of FakeResource objects faking the floating ips - """ - floating_ips = [] - for i in range(0, count): - floating_ips.append(FakeFloatingIP.create_one_floating_ip(attrs)) - return floating_ips - - @staticmethod - def get_floating_ips(floating_ips=None, count=2): - """Get an iterable MagicMock object with a list of faked floating ips. - - If floating_ips list is provided, then initialize the Mock object - with the list. Otherwise create one. - - :param List floating ips: - A list of FakeResource objects faking floating ips - :param int count: - The number of floating ips to fake - :return: - An iterable Mock object with side_effect set to a list of faked - floating ips - """ - if floating_ips is None: - floating_ips = FakeFloatingIP.create_floating_ips(count) - return mock.MagicMock(side_effect=floating_ips) - - -class FakeSubnetPool(object): - """Fake one or more subnet pools.""" - - @staticmethod - def create_one_subnet_pool(attrs=None): - """Create a fake subnet pool. - - :param Dictionary attrs: - A dictionary with all attributes - :return: - A FakeResource object faking the subnet pool - """ - attrs = attrs or {} - - # Set default attributes. - subnet_pool_attrs = { - 'id': 'subnet-pool-id-' + uuid.uuid4().hex, - 'name': 'subnet-pool-name-' + uuid.uuid4().hex, - 'prefixes': ['10.0.0.0/24', '10.1.0.0/24'], - 'default_prefixlen': '8', - 'address_scope_id': 'address-scope-id-' + uuid.uuid4().hex, - 'tenant_id': 'project-id-' + uuid.uuid4().hex, - 'is_default': False, - 'shared': False, - 'max_prefixlen': '32', - 'min_prefixlen': '8', - 'default_quota': None, - 'ip_version': '4', - } - - # Overwrite default attributes. - subnet_pool_attrs.update(attrs) - - subnet_pool = fakes.FakeResource( - info=copy.deepcopy(subnet_pool_attrs), - loaded=True - ) - - # Set attributes with special mapping in OpenStack SDK. - subnet_pool.project_id = subnet_pool_attrs['tenant_id'] - - return subnet_pool - - @staticmethod - def create_subnet_pools(attrs=None, count=2): - """Create multiple fake subnet pools. - - :param Dictionary attrs: - A dictionary with all attributes - :param int count: - The number of subnet pools to fake - :return: - A list of FakeResource objects faking the subnet pools - """ - subnet_pools = [] - for i in range(0, count): - subnet_pools.append( - FakeSubnetPool.create_one_subnet_pool(attrs) - ) - - return subnet_pools - - @staticmethod - def get_subnet_pools(subnet_pools=None, count=2): - """Get an iterable MagicMock object with a list of faked subnet pools. - - If subnet_pools list is provided, then initialize the Mock object - with the list. Otherwise create one. - - :param List subnet pools: - A list of FakeResource objects faking subnet pools - :param int count: - The number of subnet pools to fake - :return: - An iterable Mock object with side_effect set to a list of faked - subnet pools - """ - if subnet_pools is None: - subnet_pools = FakeSubnetPool.create_subnet_pools(count) - return mock.MagicMock(side_effect=subnet_pools) diff --git a/openstackclient/tests/network/v2/test_address_scope.py b/openstackclient/tests/network/v2/test_address_scope.py deleted file mode 100644 index 342cf49b..00000000 --- a/openstackclient/tests/network/v2/test_address_scope.py +++ /dev/null @@ -1,399 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# - -import mock -from mock import call - -from osc_lib import exceptions - -from openstackclient.network.v2 import address_scope -from openstackclient.tests.identity.v3 import fakes as identity_fakes_v3 -from openstackclient.tests.network.v2 import fakes as network_fakes -from openstackclient.tests import utils as tests_utils - - -class TestAddressScope(network_fakes.TestNetworkV2): - - def setUp(self): - super(TestAddressScope, self).setUp() - - # Get a shortcut to the network client - self.network = self.app.client_manager.network - # Get a shortcut to the ProjectManager Mock - self.projects_mock = self.app.client_manager.identity.projects - # Get a shortcut to the DomainManager Mock - self.domains_mock = self.app.client_manager.identity.domains - - -class TestCreateAddressScope(TestAddressScope): - - project = identity_fakes_v3.FakeProject.create_one_project() - domain = identity_fakes_v3.FakeDomain.create_one_domain() - # The new address scope created. - new_address_scope = ( - network_fakes.FakeAddressScope.create_one_address_scope( - attrs={ - 'tenant_id': project.id, - } - )) - columns = ( - 'id', - 'ip_version', - 'name', - 'project_id', - 'shared' - ) - data = ( - new_address_scope.id, - new_address_scope.ip_version, - new_address_scope.name, - new_address_scope.project_id, - new_address_scope.shared, - ) - - def setUp(self): - super(TestCreateAddressScope, self).setUp() - self.network.create_address_scope = mock.Mock( - return_value=self.new_address_scope) - - # Get the command object to test - self.cmd = address_scope.CreateAddressScope(self.app, self.namespace) - - self.projects_mock.get.return_value = self.project - self.domains_mock.get.return_value = self.domain - - def test_create_no_options(self): - arglist = [] - verifylist = [] - - # Missing required args should bail here - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_create_default_options(self): - arglist = [ - self.new_address_scope.name, - ] - verifylist = [ - ('project', None), - ('ip_version', self.new_address_scope.ip_version), - ('name', self.new_address_scope.name), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - columns, data = (self.cmd.take_action(parsed_args)) - - self.network.create_address_scope.assert_called_once_with(**{ - 'ip_version': self.new_address_scope.ip_version, - 'name': self.new_address_scope.name, - }) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - def test_create_all_options(self): - arglist = [ - '--ip-version', str(self.new_address_scope.ip_version), - '--share', - '--project', self.project.name, - '--project-domain', self.domain.name, - self.new_address_scope.name, - ] - verifylist = [ - ('ip_version', self.new_address_scope.ip_version), - ('share', True), - ('project', self.project.name), - ('project_domain', self.domain.name), - ('name', self.new_address_scope.name), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - columns, data = (self.cmd.take_action(parsed_args)) - - self.network.create_address_scope.assert_called_once_with(**{ - 'ip_version': self.new_address_scope.ip_version, - 'shared': True, - 'tenant_id': self.project.id, - 'name': self.new_address_scope.name, - }) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - def test_create_no_share(self): - arglist = [ - '--no-share', - self.new_address_scope.name, - ] - verifylist = [ - ('no_share', True), - ('name', self.new_address_scope.name), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - columns, data = self.cmd.take_action(parsed_args) - - self.network.create_address_scope.assert_called_once_with(**{ - 'ip_version': self.new_address_scope.ip_version, - 'shared': False, - 'name': self.new_address_scope.name, - }) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - -class TestDeleteAddressScope(TestAddressScope): - - # The address scope to delete. - _address_scopes = ( - network_fakes.FakeAddressScope.create_address_scopes(count=2)) - - def setUp(self): - super(TestDeleteAddressScope, self).setUp() - self.network.delete_address_scope = mock.Mock(return_value=None) - self.network.find_address_scope = ( - network_fakes.FakeAddressScope.get_address_scopes( - address_scopes=self._address_scopes) - ) - - # Get the command object to test - self.cmd = address_scope.DeleteAddressScope(self.app, self.namespace) - - def test_address_scope_delete(self): - arglist = [ - self._address_scopes[0].name, - ] - verifylist = [ - ('address_scope', [self._address_scopes[0].name]), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - self.network.find_address_scope.assert_called_once_with( - self._address_scopes[0].name, ignore_missing=False) - self.network.delete_address_scope.assert_called_once_with( - self._address_scopes[0]) - self.assertIsNone(result) - - def test_multi_address_scopes_delete(self): - arglist = [] - verifylist = [] - - for a in self._address_scopes: - arglist.append(a.name) - verifylist = [ - ('address_scope', arglist), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - calls = [] - for a in self._address_scopes: - calls.append(call(a)) - self.network.delete_address_scope.assert_has_calls(calls) - self.assertIsNone(result) - - def test_multi_address_scopes_delete_with_exception(self): - arglist = [ - self._address_scopes[0].name, - 'unexist_address_scope', - ] - verifylist = [ - ('address_scope', - [self._address_scopes[0].name, 'unexist_address_scope']), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - find_mock_result = [self._address_scopes[0], exceptions.CommandError] - self.network.find_address_scope = ( - mock.MagicMock(side_effect=find_mock_result) - ) - - try: - self.cmd.take_action(parsed_args) - self.fail('CommandError should be raised.') - except exceptions.CommandError as e: - self.assertEqual('1 of 2 address scopes failed to delete.', str(e)) - - self.network.find_address_scope.assert_any_call( - self._address_scopes[0].name, ignore_missing=False) - self.network.find_address_scope.assert_any_call( - 'unexist_address_scope', ignore_missing=False) - self.network.delete_address_scope.assert_called_once_with( - self._address_scopes[0] - ) - - -class TestListAddressScope(TestAddressScope): - - # The address scopes to list up. - address_scopes = ( - network_fakes.FakeAddressScope.create_address_scopes(count=3)) - columns = ( - 'ID', - 'Name', - 'IP Version', - 'Shared', - 'Project', - ) - data = [] - for scope in address_scopes: - data.append(( - scope.id, - scope.name, - scope.ip_version, - scope.shared, - scope.project_id, - )) - - def setUp(self): - super(TestListAddressScope, self).setUp() - self.network.address_scopes = mock.Mock( - return_value=self.address_scopes) - - # Get the command object to test - self.cmd = address_scope.ListAddressScope(self.app, self.namespace) - - def test_address_scope_list(self): - arglist = [] - verifylist = [] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - columns, data = self.cmd.take_action(parsed_args) - - self.network.address_scopes.assert_called_once_with(**{}) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - -class TestSetAddressScope(TestAddressScope): - - # The address scope to set. - _address_scope = network_fakes.FakeAddressScope.create_one_address_scope() - - def setUp(self): - super(TestSetAddressScope, self).setUp() - self.network.update_address_scope = mock.Mock(return_value=None) - self.network.find_address_scope = mock.Mock( - return_value=self._address_scope) - - # Get the command object to test - self.cmd = address_scope.SetAddressScope(self.app, self.namespace) - - def test_set_nothing(self): - arglist = [self._address_scope.name, ] - verifylist = [ - ('address_scope', self._address_scope.name), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - attrs = {} - self.network.update_address_scope.assert_called_with( - self._address_scope, **attrs) - self.assertIsNone(result) - - def test_set_name_and_share(self): - arglist = [ - '--name', 'new_address_scope', - '--share', - self._address_scope.name, - ] - verifylist = [ - ('name', 'new_address_scope'), - ('share', True), - ('address_scope', self._address_scope.name), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - attrs = { - 'name': "new_address_scope", - 'shared': True, - } - self.network.update_address_scope.assert_called_with( - self._address_scope, **attrs) - self.assertIsNone(result) - - def test_set_no_share(self): - arglist = [ - '--no-share', - self._address_scope.name, - ] - verifylist = [ - ('no_share', True), - ('address_scope', self._address_scope.name), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - attrs = { - 'shared': False, - } - self.network.update_address_scope.assert_called_with( - self._address_scope, **attrs) - self.assertIsNone(result) - - -class TestShowAddressScope(TestAddressScope): - - # The address scope to show. - _address_scope = ( - network_fakes.FakeAddressScope.create_one_address_scope()) - columns = ( - 'id', - 'ip_version', - 'name', - 'project_id', - 'shared', - ) - data = ( - _address_scope.id, - _address_scope.ip_version, - _address_scope.name, - _address_scope.project_id, - _address_scope.shared, - ) - - def setUp(self): - super(TestShowAddressScope, self).setUp() - self.network.find_address_scope = mock.Mock( - return_value=self._address_scope) - - # Get the command object to test - self.cmd = address_scope.ShowAddressScope(self.app, self.namespace) - - def test_show_no_options(self): - arglist = [] - verifylist = [] - - # Missing required args should bail here - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_show_all_options(self): - arglist = [ - self._address_scope.name, - ] - verifylist = [ - ('address_scope', self._address_scope.name), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - columns, data = self.cmd.take_action(parsed_args) - - self.network.find_address_scope.assert_called_once_with( - self._address_scope.name, ignore_missing=False) - self.assertEqual(self.columns, columns) - self.assertEqual(list(self.data), list(data)) diff --git a/openstackclient/tests/network/v2/test_floating_ip.py b/openstackclient/tests/network/v2/test_floating_ip.py deleted file mode 100644 index 234fe446..00000000 --- a/openstackclient/tests/network/v2/test_floating_ip.py +++ /dev/null @@ -1,565 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# - -import mock -from mock import call - -from osc_lib import exceptions - -from openstackclient.network.v2 import floating_ip -from openstackclient.tests.compute.v2 import fakes as compute_fakes -from openstackclient.tests.network.v2 import fakes as network_fakes -from openstackclient.tests import utils as tests_utils - - -# Tests for Neutron network -# -class TestFloatingIPNetwork(network_fakes.TestNetworkV2): - - def setUp(self): - super(TestFloatingIPNetwork, self).setUp() - - # Get a shortcut to the network client - self.network = self.app.client_manager.network - - -class TestCreateFloatingIPNetwork(TestFloatingIPNetwork): - - # Fake data for option tests. - floating_network = network_fakes.FakeNetwork.create_one_network() - subnet = network_fakes.FakeSubnet.create_one_subnet() - port = network_fakes.FakePort.create_one_port() - - # The floating ip to be deleted. - floating_ip = network_fakes.FakeFloatingIP.create_one_floating_ip( - attrs={ - 'floating_network_id': floating_network.id, - 'port_id': port.id, - } - ) - - columns = ( - 'dns_domain', - 'dns_name', - 'fixed_ip_address', - 'floating_ip_address', - 'floating_network_id', - 'id', - 'port_id', - 'project_id', - 'router_id', - 'status', - ) - - data = ( - floating_ip.dns_domain, - floating_ip.dns_name, - floating_ip.fixed_ip_address, - floating_ip.floating_ip_address, - floating_ip.floating_network_id, - floating_ip.id, - floating_ip.port_id, - floating_ip.project_id, - floating_ip.router_id, - floating_ip.status, - ) - - def setUp(self): - super(TestCreateFloatingIPNetwork, self).setUp() - - self.network.create_ip = mock.Mock(return_value=self.floating_ip) - - self.network.find_network = mock.Mock( - return_value=self.floating_network) - self.network.find_subnet = mock.Mock(return_value=self.subnet) - self.network.find_port = mock.Mock(return_value=self.port) - - # Get the command object to test - self.cmd = floating_ip.CreateFloatingIP(self.app, self.namespace) - - def test_create_no_options(self): - arglist = [] - verifylist = [] - - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_create_default_options(self): - arglist = [ - self.floating_ip.floating_network_id, - ] - verifylist = [ - ('network', self.floating_ip.floating_network_id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.create_ip.assert_called_once_with(**{ - 'floating_network_id': self.floating_ip.floating_network_id, - }) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - def test_create_all_options(self): - arglist = [ - '--subnet', self.subnet.id, - '--port', self.floating_ip.port_id, - '--floating-ip-address', self.floating_ip.floating_ip_address, - '--fixed-ip-address', self.floating_ip.fixed_ip_address, - self.floating_ip.floating_network_id, - ] - verifylist = [ - ('subnet', self.subnet.id), - ('port', self.floating_ip.port_id), - ('floating_ip_address', self.floating_ip.floating_ip_address), - ('fixed_ip_address', self.floating_ip.fixed_ip_address), - ('network', self.floating_ip.floating_network_id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.create_ip.assert_called_once_with(**{ - 'subnet_id': self.subnet.id, - 'port_id': self.floating_ip.port_id, - 'floating_ip_address': self.floating_ip.floating_ip_address, - 'fixed_ip_address': self.floating_ip.fixed_ip_address, - 'floating_network_id': self.floating_ip.floating_network_id, - }) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - -class TestDeleteFloatingIPNetwork(TestFloatingIPNetwork): - - # The floating ips to be deleted. - floating_ips = network_fakes.FakeFloatingIP.create_floating_ips(count=2) - - def setUp(self): - super(TestDeleteFloatingIPNetwork, self).setUp() - - self.network.delete_ip = mock.Mock(return_value=None) - self.network.find_ip = ( - network_fakes.FakeFloatingIP.get_floating_ips(self.floating_ips)) - - # Get the command object to test - self.cmd = floating_ip.DeleteFloatingIP(self.app, self.namespace) - - def test_floating_ip_delete(self): - arglist = [ - self.floating_ips[0].id, - ] - verifylist = [ - ('floating_ip', [self.floating_ips[0].id]), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - self.network.find_ip.assert_called_once_with( - self.floating_ips[0].id, ignore_missing=False) - self.network.delete_ip.assert_called_once_with(self.floating_ips[0]) - self.assertIsNone(result) - - def test_multi_floating_ips_delete(self): - arglist = [] - verifylist = [] - - for f in self.floating_ips: - arglist.append(f.id) - verifylist = [ - ('floating_ip', arglist), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - calls = [] - for f in self.floating_ips: - calls.append(call(f)) - self.network.delete_ip.assert_has_calls(calls) - self.assertIsNone(result) - - def test_multi_floating_ips_delete_with_exception(self): - arglist = [ - self.floating_ips[0].id, - 'unexist_floating_ip', - ] - verifylist = [ - ('floating_ip', - [self.floating_ips[0].id, 'unexist_floating_ip']), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - find_mock_result = [self.floating_ips[0], exceptions.CommandError] - self.network.find_ip = ( - mock.MagicMock(side_effect=find_mock_result) - ) - - try: - self.cmd.take_action(parsed_args) - self.fail('CommandError should be raised.') - except exceptions.CommandError as e: - self.assertEqual('1 of 2 floating_ips failed to delete.', str(e)) - - self.network.find_ip.assert_any_call( - self.floating_ips[0].id, ignore_missing=False) - self.network.find_ip.assert_any_call( - 'unexist_floating_ip', ignore_missing=False) - self.network.delete_ip.assert_called_once_with( - self.floating_ips[0] - ) - - -class TestListFloatingIPNetwork(TestFloatingIPNetwork): - - # The floating ips to list up - floating_ips = network_fakes.FakeFloatingIP.create_floating_ips(count=3) - - columns = ( - 'ID', - 'Floating IP Address', - 'Fixed IP Address', - 'Port', - ) - - data = [] - for ip in floating_ips: - data.append(( - ip.id, - ip.floating_ip_address, - ip.fixed_ip_address, - ip.port_id, - )) - - def setUp(self): - super(TestListFloatingIPNetwork, self).setUp() - - self.network.ips = mock.Mock(return_value=self.floating_ips) - - # Get the command object to test - self.cmd = floating_ip.ListFloatingIP(self.app, self.namespace) - - def test_floating_ip_list(self): - arglist = [] - verifylist = [] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.ips.assert_called_once_with(**{}) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - -class TestShowFloatingIPNetwork(TestFloatingIPNetwork): - - # The floating ip to display. - floating_ip = network_fakes.FakeFloatingIP.create_one_floating_ip() - - columns = ( - 'dns_domain', - 'dns_name', - 'fixed_ip_address', - 'floating_ip_address', - 'floating_network_id', - 'id', - 'port_id', - 'project_id', - 'router_id', - 'status', - ) - - data = ( - floating_ip.dns_domain, - floating_ip.dns_name, - floating_ip.fixed_ip_address, - floating_ip.floating_ip_address, - floating_ip.floating_network_id, - floating_ip.id, - floating_ip.port_id, - floating_ip.tenant_id, - floating_ip.router_id, - floating_ip.status, - ) - - def setUp(self): - super(TestShowFloatingIPNetwork, self).setUp() - - self.network.find_ip = mock.Mock(return_value=self.floating_ip) - - # Get the command object to test - self.cmd = floating_ip.ShowFloatingIP(self.app, self.namespace) - - def test_floating_ip_show(self): - arglist = [ - self.floating_ip.id, - ] - verifylist = [ - ('floating_ip', self.floating_ip.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.find_ip.assert_called_once_with( - self.floating_ip.id, - ignore_missing=False - ) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - -# Tests for Nova network -# -class TestFloatingIPCompute(compute_fakes.TestComputev2): - - def setUp(self): - super(TestFloatingIPCompute, self).setUp() - - # Get a shortcut to the compute client - self.compute = self.app.client_manager.compute - - -class TestCreateFloatingIPCompute(TestFloatingIPCompute): - - # The floating ip to be deleted. - floating_ip = compute_fakes.FakeFloatingIP.create_one_floating_ip() - - columns = ( - 'fixed_ip', - 'id', - 'instance_id', - 'ip', - 'pool', - ) - - data = ( - floating_ip.fixed_ip, - floating_ip.id, - floating_ip.instance_id, - floating_ip.ip, - floating_ip.pool, - ) - - def setUp(self): - super(TestCreateFloatingIPCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - - self.compute.floating_ips.create.return_value = self.floating_ip - - # Get the command object to test - self.cmd = floating_ip.CreateFloatingIP(self.app, None) - - def test_create_no_options(self): - arglist = [] - verifylist = [] - - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_create_default_options(self): - arglist = [ - self.floating_ip.pool, - ] - verifylist = [ - ('network', self.floating_ip.pool), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.compute.floating_ips.create.assert_called_once_with( - self.floating_ip.pool) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - -class TestDeleteFloatingIPCompute(TestFloatingIPCompute): - - # The floating ips to be deleted. - floating_ips = compute_fakes.FakeFloatingIP.create_floating_ips(count=2) - - def setUp(self): - super(TestDeleteFloatingIPCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - - self.compute.floating_ips.delete.return_value = None - - # Return value of utils.find_resource() - self.compute.floating_ips.get = ( - compute_fakes.FakeFloatingIP.get_floating_ips(self.floating_ips)) - - # Get the command object to test - self.cmd = floating_ip.DeleteFloatingIP(self.app, None) - - def test_floating_ip_delete(self): - arglist = [ - self.floating_ips[0].id, - ] - verifylist = [ - ('floating_ip', [self.floating_ips[0].id]), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - self.compute.floating_ips.delete.assert_called_once_with( - self.floating_ips[0].id - ) - self.assertIsNone(result) - - def test_multi_floating_ips_delete(self): - arglist = [] - verifylist = [] - - for f in self.floating_ips: - arglist.append(f.id) - verifylist = [ - ('floating_ip', arglist), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - calls = [] - for f in self.floating_ips: - calls.append(call(f.id)) - self.compute.floating_ips.delete.assert_has_calls(calls) - self.assertIsNone(result) - - def test_multi_floating_ips_delete_with_exception(self): - arglist = [ - self.floating_ips[0].id, - 'unexist_floating_ip', - ] - verifylist = [ - ('floating_ip', - [self.floating_ips[0].id, 'unexist_floating_ip']), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - find_mock_result = [self.floating_ips[0], exceptions.CommandError] - self.compute.floating_ips.get = ( - mock.MagicMock(side_effect=find_mock_result) - ) - self.compute.floating_ips.find.side_effect = exceptions.NotFound(None) - - try: - self.cmd.take_action(parsed_args) - self.fail('CommandError should be raised.') - except exceptions.CommandError as e: - self.assertEqual('1 of 2 floating_ips failed to delete.', str(e)) - - self.compute.floating_ips.get.assert_any_call( - self.floating_ips[0].id) - self.compute.floating_ips.get.assert_any_call( - 'unexist_floating_ip') - self.compute.floating_ips.delete.assert_called_once_with( - self.floating_ips[0].id - ) - - -class TestListFloatingIPCompute(TestFloatingIPCompute): - - # The floating ips to be list up - floating_ips = compute_fakes.FakeFloatingIP.create_floating_ips(count=3) - - columns = ( - 'ID', - 'Floating IP Address', - 'Fixed IP Address', - 'Server', - 'Pool', - ) - - data = [] - for ip in floating_ips: - data.append(( - ip.id, - ip.ip, - ip.fixed_ip, - ip.instance_id, - ip.pool, - )) - - def setUp(self): - super(TestListFloatingIPCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - - self.compute.floating_ips.list.return_value = self.floating_ips - - # Get the command object to test - self.cmd = floating_ip.ListFloatingIP(self.app, None) - - def test_floating_ip_list(self): - arglist = [] - verifylist = [] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.compute.floating_ips.list.assert_called_once_with() - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - -class TestShowFloatingIPCompute(TestFloatingIPCompute): - - # The floating ip to display. - floating_ip = compute_fakes.FakeFloatingIP.create_one_floating_ip() - - columns = ( - 'fixed_ip', - 'id', - 'instance_id', - 'ip', - 'pool', - ) - - data = ( - floating_ip.fixed_ip, - floating_ip.id, - floating_ip.instance_id, - floating_ip.ip, - floating_ip.pool, - ) - - def setUp(self): - super(TestShowFloatingIPCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - - # Return value of utils.find_resource() - self.compute.floating_ips.get.return_value = self.floating_ip - - # Get the command object to test - self.cmd = floating_ip.ShowFloatingIP(self.app, None) - - def test_floating_ip_show(self): - arglist = [ - self.floating_ip.id, - ] - verifylist = [ - ('floating_ip', self.floating_ip.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) diff --git a/openstackclient/tests/network/v2/test_floating_ip_pool.py b/openstackclient/tests/network/v2/test_floating_ip_pool.py deleted file mode 100644 index 22d20d20..00000000 --- a/openstackclient/tests/network/v2/test_floating_ip_pool.py +++ /dev/null @@ -1,97 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# - -from osc_lib import exceptions - -from openstackclient.network.v2 import floating_ip_pool -from openstackclient.tests.compute.v2 import fakes as compute_fakes -from openstackclient.tests.network.v2 import fakes as network_fakes - - -# Tests for Network API v2 -# -class TestFloatingIPPoolNetwork(network_fakes.TestNetworkV2): - - def setUp(self): - super(TestFloatingIPPoolNetwork, self).setUp() - - # Get a shortcut to the network client - self.network = self.app.client_manager.network - - -class TestListFloatingIPPoolNetwork(TestFloatingIPPoolNetwork): - - def setUp(self): - super(TestListFloatingIPPoolNetwork, self).setUp() - - # Get the command object to test - self.cmd = floating_ip_pool.ListFloatingIPPool(self.app, - self.namespace) - - def test_floating_ip_list(self): - arglist = [] - verifylist = [] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - self.assertRaises(exceptions.CommandError, self.cmd.take_action, - parsed_args) - - -# Tests for Compute network -# -class TestFloatingIPPoolCompute(compute_fakes.TestComputev2): - - def setUp(self): - super(TestFloatingIPPoolCompute, self).setUp() - - # Get a shortcut to the compute client - self.compute = self.app.client_manager.compute - - -class TestListFloatingIPPoolCompute(TestFloatingIPPoolCompute): - - # The floating ip pools to list up - floating_ip_pools = \ - compute_fakes.FakeFloatingIPPool.create_floating_ip_pools(count=3) - - columns = ( - 'Name', - ) - - data = [] - for pool in floating_ip_pools: - data.append(( - pool.name, - )) - - def setUp(self): - super(TestListFloatingIPPoolCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - - self.compute.floating_ip_pools.list.return_value = \ - self.floating_ip_pools - - # Get the command object to test - self.cmd = floating_ip_pool.ListFloatingIPPool(self.app, None) - - def test_floating_ip_list(self): - arglist = [] - verifylist = [] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.compute.floating_ip_pools.list.assert_called_once_with() - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) diff --git a/openstackclient/tests/network/v2/test_ip_availability.py b/openstackclient/tests/network/v2/test_ip_availability.py deleted file mode 100644 index f74bf8f7..00000000 --- a/openstackclient/tests/network/v2/test_ip_availability.py +++ /dev/null @@ -1,172 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# - -import mock - -from osc_lib import utils as common_utils - -from openstackclient.network.v2 import ip_availability -from openstackclient.tests.identity.v3 import fakes as identity_fakes -from openstackclient.tests.network.v2 import fakes as network_fakes -from openstackclient.tests import utils as tests_utils - - -class TestIPAvailability(network_fakes.TestNetworkV2): - - def setUp(self): - super(TestIPAvailability, self).setUp() - - # Get a shortcut to the network client - self.network = self.app.client_manager.network - - # Get a shortcut to the ProjectManager Mock - self.projects_mock = self.app.client_manager.identity.projects - - self.project = identity_fakes.FakeProject.create_one_project() - self.projects_mock.get.return_value = self.project - - -class TestListIPAvailability(TestIPAvailability): - - _ip_availability = \ - network_fakes.FakeIPAvailability.create_ip_availability(count=3) - columns = ( - 'Network ID', - 'Network Name', - 'Total IPs', - 'Used IPs', - ) - data = [] - for net in _ip_availability: - data.append(( - net.network_id, - net.network_name, - net.total_ips, - net.used_ips, - )) - - def setUp(self): - super(TestListIPAvailability, self).setUp() - - self.cmd = ip_availability.ListIPAvailability( - self.app, self.namespace) - self.network.network_ip_availabilities = mock.Mock( - return_value=self._ip_availability) - - def test_list_no_options(self): - arglist = [] - verifylist = [] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - filters = {'ip_version': 4} - - self.network.network_ip_availabilities.assert_called_once_with( - **filters) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - def test_list_ip_version(self): - arglist = [ - '--ip-version', str(4), - ] - verifylist = [ - ('ip_version', 4) - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - filters = {'ip_version': 4} - - self.network.network_ip_availabilities.assert_called_once_with( - **filters) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - def test_list_project(self): - arglist = [ - '--project', self.project.name - ] - verifylist = [ - ('project', self.project.name) - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - filters = {'tenant_id': self.project.id, - 'ip_version': 4} - - self.network.network_ip_availabilities.assert_called_once_with( - **filters) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - -class TestShowIPAvailability(TestIPAvailability): - - _ip_availability = \ - network_fakes.FakeIPAvailability.create_one_ip_availability() - - columns = ( - 'network_id', - 'network_name', - 'project_id', - 'subnet_ip_availability', - 'total_ips', - 'used_ips', - ) - data = ( - _ip_availability.network_id, - _ip_availability.network_name, - _ip_availability.tenant_id, - common_utils.format_list( - _ip_availability.subnet_ip_availability), - _ip_availability.total_ips, - _ip_availability.used_ips, - ) - - def setUp(self): - super(TestShowIPAvailability, self).setUp() - - self.network.find_network_ip_availability = mock.Mock( - return_value=self._ip_availability) - - # Get the command object to test - self.cmd = ip_availability.ShowIPAvailability( - self.app, self.namespace) - - def test_show_no_option(self): - arglist = [] - verifylist = [] - - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, arglist, verifylist) - - def test_show_all_options(self): - arglist = [ - self._ip_availability.network_name, - ] - verifylist = [ - ('network', self._ip_availability.network_name) - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - columns, data = self.cmd.take_action(parsed_args) - self.network.find_network_ip_availability.assert_called_once_with( - self._ip_availability.network_name, - ignore_missing=False) - - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) diff --git a/openstackclient/tests/network/v2/test_network.py b/openstackclient/tests/network/v2/test_network.py deleted file mode 100644 index bb606819..00000000 --- a/openstackclient/tests/network/v2/test_network.py +++ /dev/null @@ -1,1059 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# - -import mock -from mock import call - -from osc_lib import exceptions -from osc_lib import utils - -from openstackclient.network.v2 import network -from openstackclient.tests.compute.v2 import fakes as compute_fakes -from openstackclient.tests import fakes -from openstackclient.tests.identity.v2_0 import fakes as identity_fakes_v2 -from openstackclient.tests.identity.v3 import fakes as identity_fakes_v3 -from openstackclient.tests.network.v2 import fakes as network_fakes -from openstackclient.tests import utils as tests_utils - - -# Tests for Neutron network -# -class TestNetwork(network_fakes.TestNetworkV2): - - def setUp(self): - super(TestNetwork, self).setUp() - - # Get a shortcut to the network client - self.network = self.app.client_manager.network - # Get a shortcut to the ProjectManager Mock - self.projects_mock = self.app.client_manager.identity.projects - # Get a shortcut to the DomainManager Mock - self.domains_mock = self.app.client_manager.identity.domains - - -class TestCreateNetworkIdentityV3(TestNetwork): - - project = identity_fakes_v3.FakeProject.create_one_project() - domain = identity_fakes_v3.FakeDomain.create_one_domain() - # The new network created. - _network = network_fakes.FakeNetwork.create_one_network( - attrs={ - 'tenant_id': project.id, - 'availability_zone_hints': ["nova"], - } - ) - - columns = ( - 'admin_state_up', - 'availability_zone_hints', - 'availability_zones', - 'id', - 'is_default', - 'name', - 'port_security_enabled', - 'project_id', - 'provider_network_type', - 'router:external', - 'shared', - 'status', - 'subnets', - ) - - data = ( - network._format_admin_state(_network.admin_state_up), - utils.format_list(_network.availability_zone_hints), - utils.format_list(_network.availability_zones), - _network.id, - _network.is_default, - _network.name, - _network.is_port_security_enabled, - _network.project_id, - _network.provider_network_type, - network._format_router_external(_network.is_router_external), - _network.shared, - _network.status, - utils.format_list(_network.subnets), - ) - - def setUp(self): - super(TestCreateNetworkIdentityV3, self).setUp() - - self.network.create_network = mock.Mock(return_value=self._network) - - # Get the command object to test - self.cmd = network.CreateNetwork(self.app, self.namespace) - - self.projects_mock.get.return_value = self.project - self.domains_mock.get.return_value = self.domain - - def test_create_no_options(self): - arglist = [] - verifylist = [] - - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_create_default_options(self): - arglist = [ - self._network.name, - ] - verifylist = [ - ('name', self._network.name), - ('enable', True), - ('share', None), - ('project', None), - ('external', False), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - columns, data = self.cmd.take_action(parsed_args) - - self.network.create_network.assert_called_once_with(**{ - 'admin_state_up': True, - 'name': self._network.name, - }) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - def test_create_all_options(self): - arglist = [ - "--disable", - "--share", - "--project", self.project.name, - "--project-domain", self.domain.name, - "--availability-zone-hint", "nova", - "--external", "--default", - "--provider-network-type", "vlan", - "--provider-physical-network", "physnet1", - "--provider-segment", "400", - "--transparent-vlan", - "--enable-port-security", - self._network.name, - ] - verifylist = [ - ('disable', True), - ('share', True), - ('project', self.project.name), - ('project_domain', self.domain.name), - ('availability_zone_hints', ["nova"]), - ('external', True), - ('default', True), - ('provider_network_type', 'vlan'), - ('physical_network', 'physnet1'), - ('segmentation_id', '400'), - ('transparent_vlan', True), - ('enable_port_security', True), - ('name', self._network.name), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - columns, data = (self.cmd.take_action(parsed_args)) - - self.network.create_network.assert_called_once_with(**{ - 'admin_state_up': False, - 'availability_zone_hints': ["nova"], - 'name': self._network.name, - 'shared': True, - 'tenant_id': self.project.id, - 'is_default': True, - 'router:external': True, - 'provider:network_type': 'vlan', - 'provider:physical_network': 'physnet1', - 'provider:segmentation_id': '400', - 'vlan_transparent': True, - 'port_security_enabled': True, - }) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - def test_create_other_options(self): - arglist = [ - "--enable", - "--no-share", - "--disable-port-security", - self._network.name, - ] - verifylist = [ - ('enable', True), - ('no_share', True), - ('name', self._network.name), - ('external', False), - ('disable_port_security', True), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - columns, data = self.cmd.take_action(parsed_args) - - self.network.create_network.assert_called_once_with(**{ - 'admin_state_up': True, - 'name': self._network.name, - 'shared': False, - 'port_security_enabled': False, - }) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - -class TestCreateNetworkIdentityV2(TestNetwork): - - project = identity_fakes_v2.FakeProject.create_one_project() - # The new network created. - _network = network_fakes.FakeNetwork.create_one_network( - attrs={'tenant_id': project.id} - ) - - columns = ( - 'admin_state_up', - 'availability_zone_hints', - 'availability_zones', - 'id', - 'is_default', - 'name', - 'port_security_enabled', - 'project_id', - 'provider_network_type', - 'router:external', - 'shared', - 'status', - 'subnets', - ) - - data = ( - network._format_admin_state(_network.admin_state_up), - utils.format_list(_network.availability_zone_hints), - utils.format_list(_network.availability_zones), - _network.id, - _network.is_default, - _network.name, - _network.is_port_security_enabled, - _network.project_id, - _network.provider_network_type, - network._format_router_external(_network.is_router_external), - _network.shared, - _network.status, - utils.format_list(_network.subnets), - ) - - def setUp(self): - super(TestCreateNetworkIdentityV2, self).setUp() - - self.network.create_network = mock.Mock(return_value=self._network) - - # Get the command object to test - self.cmd = network.CreateNetwork(self.app, self.namespace) - - # Set identity client v2. And get a shortcut to Identity client. - identity_client = identity_fakes_v2.FakeIdentityv2Client( - endpoint=fakes.AUTH_URL, - token=fakes.AUTH_TOKEN, - ) - self.app.client_manager.identity = identity_client - self.identity = self.app.client_manager.identity - - # Get a shortcut to the ProjectManager Mock - self.projects_mock = self.identity.tenants - self.projects_mock.get.return_value = self.project - - # There is no DomainManager Mock in fake identity v2. - - def test_create_with_project_identityv2(self): - arglist = [ - "--project", self.project.name, - self._network.name, - ] - verifylist = [ - ('enable', True), - ('share', None), - ('name', self._network.name), - ('project', self.project.name), - ('external', False), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - columns, data = self.cmd.take_action(parsed_args) - - self.network.create_network.assert_called_once_with(**{ - 'admin_state_up': True, - 'name': self._network.name, - 'tenant_id': self.project.id, - }) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - def test_create_with_domain_identityv2(self): - arglist = [ - "--project", self.project.name, - "--project-domain", "domain-name", - self._network.name, - ] - verifylist = [ - ('enable', True), - ('share', None), - ('project', self.project.name), - ('project_domain', "domain-name"), - ('name', self._network.name), - ('external', False), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - self.assertRaises( - AttributeError, - self.cmd.take_action, - parsed_args, - ) - - -class TestDeleteNetwork(TestNetwork): - - def setUp(self): - super(TestDeleteNetwork, self).setUp() - - # The networks to delete - self._networks = network_fakes.FakeNetwork.create_networks(count=3) - - self.network.delete_network = mock.Mock(return_value=None) - - self.network.find_network = network_fakes.FakeNetwork.get_networks( - networks=self._networks) - - # Get the command object to test - self.cmd = network.DeleteNetwork(self.app, self.namespace) - - def test_delete_one_network(self): - arglist = [ - self._networks[0].name, - ] - verifylist = [ - ('network', [self._networks[0].name]), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - self.network.delete_network.assert_called_once_with(self._networks[0]) - self.assertIsNone(result) - - def test_delete_multiple_networks(self): - arglist = [] - for n in self._networks: - arglist.append(n.id) - verifylist = [ - ('network', arglist), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - calls = [] - for n in self._networks: - calls.append(call(n)) - self.network.delete_network.assert_has_calls(calls) - self.assertIsNone(result) - - def test_delete_multiple_networks_exception(self): - arglist = [ - self._networks[0].id, - 'xxxx-yyyy-zzzz', - self._networks[1].id, - ] - verifylist = [ - ('network', arglist), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - # Fake exception in find_network() - ret_find = [ - self._networks[0], - exceptions.NotFound('404'), - self._networks[1], - ] - self.network.find_network = mock.Mock(side_effect=ret_find) - - # Fake exception in delete_network() - ret_delete = [ - None, - exceptions.NotFound('404'), - ] - self.network.delete_network = mock.Mock(side_effect=ret_delete) - - self.assertRaises(exceptions.CommandError, self.cmd.take_action, - parsed_args) - - # The second call of find_network() should fail. So delete_network() - # was only called twice. - calls = [ - call(self._networks[0]), - call(self._networks[1]), - ] - self.network.delete_network.assert_has_calls(calls) - - -class TestListNetwork(TestNetwork): - - # The networks going to be listed up. - _network = network_fakes.FakeNetwork.create_networks(count=3) - - columns = ( - 'ID', - 'Name', - 'Subnets', - ) - columns_long = ( - 'ID', - 'Name', - 'Status', - 'Project', - 'State', - 'Shared', - 'Subnets', - 'Network Type', - 'Router Type', - 'Availability Zones', - ) - - data = [] - for net in _network: - data.append(( - net.id, - net.name, - utils.format_list(net.subnets), - )) - - data_long = [] - for net in _network: - data_long.append(( - net.id, - net.name, - net.status, - net.project_id, - network._format_admin_state(net.admin_state_up), - net.shared, - utils.format_list(net.subnets), - net.provider_network_type, - network._format_router_external(net.is_router_external), - utils.format_list(net.availability_zones), - )) - - def setUp(self): - super(TestListNetwork, self).setUp() - - # Get the command object to test - self.cmd = network.ListNetwork(self.app, self.namespace) - - self.network.networks = mock.Mock(return_value=self._network) - - def test_network_list_no_options(self): - arglist = [] - verifylist = [ - ('external', False), - ('long', False), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - # In base command class Lister in cliff, abstract method take_action() - # returns a tuple containing the column names and an iterable - # containing the data to be listed. - columns, data = self.cmd.take_action(parsed_args) - - self.network.networks.assert_called_once_with() - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - def test_list_external(self): - arglist = [ - '--external', - ] - verifylist = [ - ('external', True), - ('long', False), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - # In base command class Lister in cliff, abstract method take_action() - # returns a tuple containing the column names and an iterable - # containing the data to be listed. - columns, data = self.cmd.take_action(parsed_args) - - self.network.networks.assert_called_once_with( - **{'router:external': True} - ) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - def test_network_list_long(self): - arglist = [ - '--long', - ] - verifylist = [ - ('long', True), - ('external', False), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - # In base command class Lister in cliff, abstract method take_action() - # returns a tuple containing the column names and an iterable - # containing the data to be listed. - columns, data = self.cmd.take_action(parsed_args) - - self.network.networks.assert_called_once_with() - self.assertEqual(self.columns_long, columns) - self.assertEqual(self.data_long, list(data)) - - -class TestSetNetwork(TestNetwork): - - # The network to set. - _network = network_fakes.FakeNetwork.create_one_network() - - def setUp(self): - super(TestSetNetwork, self).setUp() - - self.network.update_network = mock.Mock(return_value=None) - - self.network.find_network = mock.Mock(return_value=self._network) - - # Get the command object to test - self.cmd = network.SetNetwork(self.app, self.namespace) - - def test_set_this(self): - arglist = [ - self._network.name, - '--enable', - '--name', 'noob', - '--share', - '--external', - '--default', - '--provider-network-type', 'vlan', - '--provider-physical-network', 'physnet1', - '--provider-segment', '400', - '--no-transparent-vlan', - '--enable-port-security', - ] - verifylist = [ - ('network', self._network.name), - ('enable', True), - ('name', 'noob'), - ('share', True), - ('external', True), - ('default', True), - ('provider_network_type', 'vlan'), - ('physical_network', 'physnet1'), - ('segmentation_id', '400'), - ('no_transparent_vlan', True), - ('enable_port_security', True), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - attrs = { - 'name': 'noob', - 'admin_state_up': True, - 'shared': True, - 'router:external': True, - 'is_default': True, - 'provider:network_type': 'vlan', - 'provider:physical_network': 'physnet1', - 'provider:segmentation_id': '400', - 'vlan_transparent': False, - 'port_security_enabled': True, - } - self.network.update_network.assert_called_once_with( - self._network, **attrs) - self.assertIsNone(result) - - def test_set_that(self): - arglist = [ - self._network.name, - '--disable', - '--no-share', - '--internal', - '--disable-port-security', - ] - verifylist = [ - ('network', self._network.name), - ('disable', True), - ('no_share', True), - ('internal', True), - ('disable_port_security', True), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - attrs = { - 'admin_state_up': False, - 'shared': False, - 'router:external': False, - 'port_security_enabled': False, - } - self.network.update_network.assert_called_once_with( - self._network, **attrs) - self.assertIsNone(result) - - def test_set_nothing(self): - arglist = [self._network.name, ] - verifylist = [('network', self._network.name), ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - attrs = {} - self.network.update_network.assert_called_once_with( - self._network, **attrs) - self.assertIsNone(result) - - -class TestShowNetwork(TestNetwork): - - # The network to show. - _network = network_fakes.FakeNetwork.create_one_network() - - columns = ( - 'admin_state_up', - 'availability_zone_hints', - 'availability_zones', - 'id', - 'is_default', - 'name', - 'port_security_enabled', - 'project_id', - 'provider_network_type', - 'router:external', - 'shared', - 'status', - 'subnets', - ) - - data = ( - network._format_admin_state(_network.admin_state_up), - utils.format_list(_network.availability_zone_hints), - utils.format_list(_network.availability_zones), - _network.id, - _network.is_default, - _network.name, - _network.is_port_security_enabled, - _network.project_id, - _network.provider_network_type, - network._format_router_external(_network.is_router_external), - _network.shared, - _network.status, - utils.format_list(_network.subnets), - ) - - def setUp(self): - super(TestShowNetwork, self).setUp() - - self.network.find_network = mock.Mock(return_value=self._network) - - # Get the command object to test - self.cmd = network.ShowNetwork(self.app, self.namespace) - - def test_show_no_options(self): - arglist = [] - verifylist = [] - - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_show_all_options(self): - arglist = [ - self._network.name, - ] - verifylist = [ - ('network', self._network.name), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - columns, data = self.cmd.take_action(parsed_args) - - self.network.find_network.assert_called_once_with( - self._network.name, ignore_missing=False) - - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - -# Tests for Nova network -# -class TestNetworkCompute(compute_fakes.TestComputev2): - - def setUp(self): - super(TestNetworkCompute, self).setUp() - - # Get a shortcut to the compute client - self.compute = self.app.client_manager.compute - - -class TestCreateNetworkCompute(TestNetworkCompute): - - # The network to create. - _network = compute_fakes.FakeNetwork.create_one_network() - - columns = ( - 'bridge', - 'bridge_interface', - 'broadcast', - 'cidr', - 'cidr_v6', - 'created_at', - 'deleted', - 'deleted_at', - 'dhcp_server', - 'dhcp_start', - 'dns1', - 'dns2', - 'enable_dhcp', - 'gateway', - 'gateway_v6', - 'host', - 'id', - 'injected', - 'label', - 'mtu', - 'multi_host', - 'netmask', - 'netmask_v6', - 'priority', - 'project_id', - 'rxtx_base', - 'share_address', - 'updated_at', - 'vlan', - 'vpn_private_address', - 'vpn_public_address', - 'vpn_public_port', - ) - - data = ( - _network.bridge, - _network.bridge_interface, - _network.broadcast, - _network.cidr, - _network.cidr_v6, - _network.created_at, - _network.deleted, - _network.deleted_at, - _network.dhcp_server, - _network.dhcp_start, - _network.dns1, - _network.dns2, - _network.enable_dhcp, - _network.gateway, - _network.gateway_v6, - _network.host, - _network.id, - _network.injected, - _network.label, - _network.mtu, - _network.multi_host, - _network.netmask, - _network.netmask_v6, - _network.priority, - _network.project_id, - _network.rxtx_base, - _network.share_address, - _network.updated_at, - _network.vlan, - _network.vpn_private_address, - _network.vpn_public_address, - _network.vpn_public_port, - ) - - def setUp(self): - super(TestCreateNetworkCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - - self.compute.networks.create.return_value = self._network - - # Get the command object to test - self.cmd = network.CreateNetwork(self.app, None) - - def test_create_no_options(self): - arglist = [] - verifylist = [] - - # Missing required args should raise exception here - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_create_default_options(self): - arglist = [ - "--subnet", self._network.cidr, - self._network.label, - ] - verifylist = [ - ('subnet', self._network.cidr), - ('name', self._network.label), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.compute.networks.create.assert_called_once_with(**{ - 'cidr': self._network.cidr, - 'label': self._network.label, - }) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - -class TestDeleteNetworkCompute(TestNetworkCompute): - - def setUp(self): - super(TestDeleteNetworkCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - - # The networks to delete - self._networks = compute_fakes.FakeNetwork.create_networks(count=3) - - self.compute.networks.delete.return_value = None - - # Return value of utils.find_resource() - self.compute.networks.get = \ - compute_fakes.FakeNetwork.get_networks(networks=self._networks) - - # Get the command object to test - self.cmd = network.DeleteNetwork(self.app, None) - - def test_delete_one_network(self): - arglist = [ - self._networks[0].label, - ] - verifylist = [ - ('network', [self._networks[0].label]), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - self.compute.networks.delete.assert_called_once_with( - self._networks[0].id) - self.assertIsNone(result) - - def test_delete_multiple_networks(self): - arglist = [] - for n in self._networks: - arglist.append(n.label) - verifylist = [ - ('network', arglist), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - calls = [] - for n in self._networks: - calls.append(call(n.id)) - self.compute.networks.delete.assert_has_calls(calls) - self.assertIsNone(result) - - def test_delete_multiple_networks_exception(self): - arglist = [ - self._networks[0].id, - 'xxxx-yyyy-zzzz', - self._networks[1].id, - ] - verifylist = [ - ('network', arglist), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - # Fake exception in utils.find_resource() - # In compute v2, we use utils.find_resource() to find a network. - # It calls get() several times, but find() only one time. So we - # choose to fake get() always raise exception, then pass through. - # And fake find() to find the real network or not. - self.compute.networks.get.side_effect = Exception() - ret_find = [ - self._networks[0], - Exception(), - self._networks[1], - ] - self.compute.networks.find.side_effect = ret_find - - # Fake exception in delete() - ret_delete = [ - None, - Exception(), - ] - self.compute.networks.delete = mock.Mock(side_effect=ret_delete) - - self.assertRaises(exceptions.CommandError, self.cmd.take_action, - parsed_args) - - # The second call of utils.find_resource() should fail. So delete() - # was only called twice. - calls = [ - call(self._networks[0].id), - call(self._networks[1].id), - ] - self.compute.networks.delete.assert_has_calls(calls) - - -class TestListNetworkCompute(TestNetworkCompute): - - # The networks going to be listed up. - _networks = compute_fakes.FakeNetwork.create_networks(count=3) - - columns = ( - 'ID', - 'Name', - 'Subnet', - ) - - data = [] - for net in _networks: - data.append(( - net.id, - net.label, - net.cidr, - )) - - def setUp(self): - super(TestListNetworkCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - - self.compute.networks.list.return_value = self._networks - - # Get the command object to test - self.cmd = network.ListNetwork(self.app, None) - - def test_network_list_no_options(self): - arglist = [] - verifylist = [ - ('external', False), - ('long', False), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - # In base command class Lister in cliff, abstract method take_action() - # returns a tuple containing the column names and an iterable - # containing the data to be listed. - columns, data = self.cmd.take_action(parsed_args) - - self.compute.networks.list.assert_called_once_with() - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - -class TestShowNetworkCompute(TestNetworkCompute): - - # The network to show. - _network = compute_fakes.FakeNetwork.create_one_network() - - columns = ( - 'bridge', - 'bridge_interface', - 'broadcast', - 'cidr', - 'cidr_v6', - 'created_at', - 'deleted', - 'deleted_at', - 'dhcp_server', - 'dhcp_start', - 'dns1', - 'dns2', - 'enable_dhcp', - 'gateway', - 'gateway_v6', - 'host', - 'id', - 'injected', - 'label', - 'mtu', - 'multi_host', - 'netmask', - 'netmask_v6', - 'priority', - 'project_id', - 'rxtx_base', - 'share_address', - 'updated_at', - 'vlan', - 'vpn_private_address', - 'vpn_public_address', - 'vpn_public_port', - ) - - data = ( - _network.bridge, - _network.bridge_interface, - _network.broadcast, - _network.cidr, - _network.cidr_v6, - _network.created_at, - _network.deleted, - _network.deleted_at, - _network.dhcp_server, - _network.dhcp_start, - _network.dns1, - _network.dns2, - _network.enable_dhcp, - _network.gateway, - _network.gateway_v6, - _network.host, - _network.id, - _network.injected, - _network.label, - _network.mtu, - _network.multi_host, - _network.netmask, - _network.netmask_v6, - _network.priority, - _network.project_id, - _network.rxtx_base, - _network.share_address, - _network.updated_at, - _network.vlan, - _network.vpn_private_address, - _network.vpn_public_address, - _network.vpn_public_port, - ) - - def setUp(self): - super(TestShowNetworkCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - - # Return value of utils.find_resource() - self.compute.networks.get.return_value = self._network - - # Get the command object to test - self.cmd = network.ShowNetwork(self.app, None) - - def test_show_no_options(self): - arglist = [] - verifylist = [] - - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_show_all_options(self): - arglist = [ - self._network.label, - ] - verifylist = [ - ('network', self._network.label), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - columns, data = self.cmd.take_action(parsed_args) - - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) diff --git a/openstackclient/tests/network/v2/test_network_agent.py b/openstackclient/tests/network/v2/test_network_agent.py deleted file mode 100644 index 269d4e1d..00000000 --- a/openstackclient/tests/network/v2/test_network_agent.py +++ /dev/null @@ -1,294 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# - -import mock -from mock import call - -from osc_lib import exceptions -from osc_lib import utils - -from openstackclient.network.v2 import network_agent -from openstackclient.tests.network.v2 import fakes as network_fakes -from openstackclient.tests import utils as tests_utils - - -class TestNetworkAgent(network_fakes.TestNetworkV2): - - def setUp(self): - super(TestNetworkAgent, self).setUp() - - # Get a shortcut to the network client - self.network = self.app.client_manager.network - - -class TestDeleteNetworkAgent(TestNetworkAgent): - - network_agents = ( - network_fakes.FakeNetworkAgent.create_network_agents(count=2)) - - def setUp(self): - super(TestDeleteNetworkAgent, self).setUp() - self.network.delete_agent = mock.Mock(return_value=None) - self.network.get_agent = ( - network_fakes.FakeNetworkAgent.get_network_agents( - agents=self.network_agents) - ) - - # Get the command object to test - self.cmd = network_agent.DeleteNetworkAgent(self.app, self.namespace) - - def test_network_agent_delete(self): - arglist = [ - self.network_agents[0].id, - ] - verifylist = [ - ('network_agent', [self.network_agents[0].id]), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - self.network.get_agent.assert_called_once_with( - self.network_agents[0].id, ignore_missing=False) - self.network.delete_agent.assert_called_once_with( - self.network_agents[0]) - self.assertIsNone(result) - - def test_multi_network_agents_delete(self): - arglist = [] - verifylist = [] - - for n in self.network_agents: - arglist.append(n.id) - verifylist = [ - ('network_agent', arglist), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - calls = [] - for n in self.network_agents: - calls.append(call(n)) - self.network.delete_agent.assert_has_calls(calls) - self.assertIsNone(result) - - def test_multi_network_agents_delete_with_exception(self): - arglist = [ - self.network_agents[0].id, - 'unexist_network_agent', - ] - verifylist = [ - ('network_agent', - [self.network_agents[0].id, 'unexist_network_agent']), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - find_mock_result = [self.network_agents[0], exceptions.CommandError] - self.network.get_agent = ( - mock.MagicMock(side_effect=find_mock_result) - ) - - try: - self.cmd.take_action(parsed_args) - self.fail('CommandError should be raised.') - except exceptions.CommandError as e: - self.assertEqual('1 of 2 network agents failed to delete.', str(e)) - - self.network.get_agent.assert_any_call( - self.network_agents[0].id, ignore_missing=False) - self.network.get_agent.assert_any_call( - 'unexist_network_agent', ignore_missing=False) - self.network.delete_agent.assert_called_once_with( - self.network_agents[0] - ) - - -class TestListNetworkAgent(TestNetworkAgent): - - network_agents = ( - network_fakes.FakeNetworkAgent.create_network_agents(count=3)) - - columns = ( - 'ID', - 'Agent Type', - 'Host', - 'Availability Zone', - 'Alive', - 'State', - 'Binary' - ) - data = [] - for agent in network_agents: - data.append(( - agent.id, - agent.agent_type, - agent.host, - agent.availability_zone, - agent.alive, - network_agent._format_admin_state(agent.admin_state_up), - agent.binary, - )) - - def setUp(self): - super(TestListNetworkAgent, self).setUp() - self.network.agents = mock.Mock( - return_value=self.network_agents) - - # Get the command object to test - self.cmd = network_agent.ListNetworkAgent(self.app, self.namespace) - - def test_network_agents_list(self): - arglist = [] - verifylist = [] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - columns, data = self.cmd.take_action(parsed_args) - - self.network.agents.assert_called_once_with(**{}) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - -class TestSetNetworkAgent(TestNetworkAgent): - - _network_agent = ( - network_fakes.FakeNetworkAgent.create_one_network_agent()) - - def setUp(self): - super(TestSetNetworkAgent, self).setUp() - self.network.update_agent = mock.Mock(return_value=None) - self.network.get_agent = mock.Mock(return_value=self._network_agent) - - # Get the command object to test - self.cmd = network_agent.SetNetworkAgent(self.app, self.namespace) - - def test_set_nothing(self): - arglist = [ - self._network_agent.id, - ] - verifylist = [ - ('network_agent', self._network_agent.id), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - attrs = {} - self.network.update_agent.assert_called_once_with( - self._network_agent, **attrs) - self.assertIsNone(result) - - def test_set_all(self): - arglist = [ - '--description', 'new_description', - '--enable', - self._network_agent.id, - ] - verifylist = [ - ('description', 'new_description'), - ('enable', True), - ('disable', False), - ('network_agent', self._network_agent.id), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - attrs = { - 'description': 'new_description', - 'admin_state_up': True, - } - self.network.update_agent.assert_called_once_with( - self._network_agent, **attrs) - self.assertIsNone(result) - - def test_set_with_disable(self): - arglist = [ - '--disable', - self._network_agent.id, - ] - verifylist = [ - ('enable', False), - ('disable', True), - ('network_agent', self._network_agent.id), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - attrs = { - 'admin_state_up': False, - } - self.network.update_agent.assert_called_once_with( - self._network_agent, **attrs) - self.assertIsNone(result) - - -class TestShowNetworkAgent(TestNetworkAgent): - - _network_agent = ( - network_fakes.FakeNetworkAgent.create_one_network_agent()) - - columns = ( - 'admin_state_up', - 'agent_type', - 'alive', - 'availability_zone', - 'binary', - 'configurations', - 'host', - 'id', - ) - data = ( - network_agent._format_admin_state(_network_agent.admin_state_up), - _network_agent.agent_type, - _network_agent.alive, - _network_agent.availability_zone, - _network_agent.binary, - utils.format_dict(_network_agent.configurations), - _network_agent.host, - _network_agent.id, - ) - - def setUp(self): - super(TestShowNetworkAgent, self).setUp() - self.network.get_agent = mock.Mock( - return_value=self._network_agent) - - # Get the command object to test - self.cmd = network_agent.ShowNetworkAgent(self.app, self.namespace) - - def test_show_no_options(self): - arglist = [] - verifylist = [] - - # Missing required args should bail here - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_show_all_options(self): - arglist = [ - self._network_agent.id, - ] - verifylist = [ - ('network_agent', self._network_agent.id), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - columns, data = self.cmd.take_action(parsed_args) - - self.network.get_agent.assert_called_once_with( - self._network_agent.id, ignore_missing=False) - self.assertEqual(self.columns, columns) - self.assertEqual(list(self.data), list(data)) diff --git a/openstackclient/tests/network/v2/test_network_rbac.py b/openstackclient/tests/network/v2/test_network_rbac.py deleted file mode 100644 index 9250e91b..00000000 --- a/openstackclient/tests/network/v2/test_network_rbac.py +++ /dev/null @@ -1,430 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# - -import mock -from mock import call - -from osc_lib import exceptions - -from openstackclient.network.v2 import network_rbac -from openstackclient.tests.identity.v3 import fakes as identity_fakes_v3 -from openstackclient.tests.network.v2 import fakes as network_fakes -from openstackclient.tests import utils as tests_utils - - -class TestNetworkRBAC(network_fakes.TestNetworkV2): - - def setUp(self): - super(TestNetworkRBAC, self).setUp() - - # Get a shortcut to the network client - self.network = self.app.client_manager.network - # Get a shortcut to the ProjectManager Mock - self.projects_mock = self.app.client_manager.identity.projects - - -class TestCreateNetworkRBAC(TestNetworkRBAC): - - network_object = network_fakes.FakeNetwork.create_one_network() - project = identity_fakes_v3.FakeProject.create_one_project() - rbac_policy = network_fakes.FakeNetworkRBAC.create_one_network_rbac( - attrs={'tenant_id': project.id, - 'target_tenant': project.id, - 'object_id': network_object.id} - ) - - columns = ( - 'action', - 'id', - 'object_id', - 'object_type', - 'project_id', - 'target_project_id', - ) - - data = [ - rbac_policy.action, - rbac_policy.id, - rbac_policy.object_id, - rbac_policy.object_type, - rbac_policy.tenant_id, - rbac_policy.target_tenant, - ] - - def setUp(self): - super(TestCreateNetworkRBAC, self).setUp() - - # Get the command object to test - self.cmd = network_rbac.CreateNetworkRBAC(self.app, self.namespace) - - self.network.create_rbac_policy = mock.Mock( - return_value=self.rbac_policy) - self.network.find_network = mock.Mock( - return_value=self.network_object) - self.projects_mock.get.return_value = self.project - - def test_network_rbac_create_no_type(self): - arglist = [ - '--action', self.rbac_policy.action, - self.rbac_policy.object_id, - ] - verifylist = [ - ('action', self.rbac_policy.action), - ('rbac_policy', self.rbac_policy.id), - ] - - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_network_rbac_create_no_action(self): - arglist = [ - '--type', self.rbac_policy.object_type, - self.rbac_policy.object_id, - ] - verifylist = [ - ('type', self.rbac_policy.object_type), - ('rbac_policy', self.rbac_policy.id), - ] - - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_network_rbac_create_invalid_type(self): - arglist = [ - '--action', self.rbac_policy.action, - '--type', 'invalid_type', - '--target-project', self.rbac_policy.target_tenant, - self.rbac_policy.object_id, - ] - verifylist = [ - ('action', self.rbac_policy.action), - ('type', 'invalid_type'), - ('target-project', self.rbac_policy.target_tenant), - ('rbac_policy', self.rbac_policy.id), - ] - - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_network_rbac_create_invalid_action(self): - arglist = [ - '--type', self.rbac_policy.object_type, - '--action', 'invalid_action', - '--target-project', self.rbac_policy.target_tenant, - self.rbac_policy.object_id, - ] - verifylist = [ - ('type', self.rbac_policy.object_type), - ('action', 'invalid_action'), - ('target-project', self.rbac_policy.target_tenant), - ('rbac_policy', self.rbac_policy.id), - ] - - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_network_rbac_create(self): - arglist = [ - '--type', self.rbac_policy.object_type, - '--action', self.rbac_policy.action, - '--target-project', self.rbac_policy.target_tenant, - self.rbac_policy.object_id, - ] - verifylist = [ - ('type', self.rbac_policy.object_type), - ('action', self.rbac_policy.action), - ('target_project', self.rbac_policy.target_tenant), - ('rbac_object', self.rbac_policy.object_id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - # DisplayCommandBase.take_action() returns two tuples - columns, data = self.cmd.take_action(parsed_args) - - self.network.create_rbac_policy.assert_called_with(**{ - 'object_id': self.rbac_policy.object_id, - 'object_type': self.rbac_policy.object_type, - 'action': self.rbac_policy.action, - 'target_tenant': self.rbac_policy.target_tenant, - }) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - def test_network_rbac_create_all_options(self): - arglist = [ - '--type', self.rbac_policy.object_type, - '--action', self.rbac_policy.action, - '--target-project', self.rbac_policy.target_tenant, - '--project', self.rbac_policy.tenant_id, - '--project-domain', self.project.domain_id, - '--target-project-domain', self.project.domain_id, - self.rbac_policy.object_id, - ] - verifylist = [ - ('type', self.rbac_policy.object_type), - ('action', self.rbac_policy.action), - ('target_project', self.rbac_policy.target_tenant), - ('project', self.rbac_policy.tenant_id), - ('project_domain', self.project.domain_id), - ('target_project_domain', self.project.domain_id), - ('rbac_object', self.rbac_policy.object_id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - # DisplayCommandBase.take_action() returns two tuples - columns, data = self.cmd.take_action(parsed_args) - - self.network.create_rbac_policy.assert_called_with(**{ - 'object_id': self.rbac_policy.object_id, - 'object_type': self.rbac_policy.object_type, - 'action': self.rbac_policy.action, - 'target_tenant': self.rbac_policy.target_tenant, - 'tenant_id': self.rbac_policy.tenant_id, - }) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - -class TestDeleteNetworkRBAC(TestNetworkRBAC): - - rbac_policies = network_fakes.FakeNetworkRBAC.create_network_rbacs(count=2) - - def setUp(self): - super(TestDeleteNetworkRBAC, self).setUp() - self.network.delete_rbac_policy = mock.Mock(return_value=None) - self.network.find_rbac_policy = ( - network_fakes.FakeNetworkRBAC.get_network_rbacs( - rbac_policies=self.rbac_policies) - ) - - # Get the command object to test - self.cmd = network_rbac.DeleteNetworkRBAC(self.app, self.namespace) - - def test_network_rbac_delete(self): - arglist = [ - self.rbac_policies[0].id, - ] - verifylist = [ - ('rbac_policy', [self.rbac_policies[0].id]), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - self.network.find_rbac_policy.assert_called_once_with( - self.rbac_policies[0].id, ignore_missing=False) - self.network.delete_rbac_policy.assert_called_once_with( - self.rbac_policies[0]) - self.assertIsNone(result) - - def test_multi_network_rbacs_delete(self): - arglist = [] - verifylist = [] - - for r in self.rbac_policies: - arglist.append(r.id) - verifylist = [ - ('rbac_policy', arglist), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - calls = [] - for r in self.rbac_policies: - calls.append(call(r)) - self.network.delete_rbac_policy.assert_has_calls(calls) - self.assertIsNone(result) - - def test_multi_network_policies_delete_with_exception(self): - arglist = [ - self.rbac_policies[0].id, - 'unexist_rbac_policy', - ] - verifylist = [ - ('rbac_policy', - [self.rbac_policies[0].id, 'unexist_rbac_policy']), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - find_mock_result = [self.rbac_policies[0], exceptions.CommandError] - self.network.find_rbac_policy = ( - mock.MagicMock(side_effect=find_mock_result) - ) - - try: - self.cmd.take_action(parsed_args) - self.fail('CommandError should be raised.') - except exceptions.CommandError as e: - self.assertEqual('1 of 2 RBAC policies failed to delete.', str(e)) - - self.network.find_rbac_policy.assert_any_call( - self.rbac_policies[0].id, ignore_missing=False) - self.network.find_rbac_policy.assert_any_call( - 'unexist_rbac_policy', ignore_missing=False) - self.network.delete_rbac_policy.assert_called_once_with( - self.rbac_policies[0] - ) - - -class TestListNetworkRABC(TestNetworkRBAC): - - # The network rbac policies going to be listed up. - rbac_policies = network_fakes.FakeNetworkRBAC.create_network_rbacs(count=3) - - columns = ( - 'ID', - 'Object Type', - 'Object ID', - ) - - data = [] - for r in rbac_policies: - data.append(( - r.id, - r.object_type, - r.object_id, - )) - - def setUp(self): - super(TestListNetworkRABC, self).setUp() - - # Get the command object to test - self.cmd = network_rbac.ListNetworkRBAC(self.app, self.namespace) - - self.network.rbac_policies = mock.Mock(return_value=self.rbac_policies) - - def test_network_rbac_list(self): - arglist = [] - verifylist = [] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - # DisplayCommandBase.take_action() returns two tuples - columns, data = self.cmd.take_action(parsed_args) - - self.network.rbac_policies.assert_called_with() - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - -class TestSetNetworkRBAC(TestNetworkRBAC): - - project = identity_fakes_v3.FakeProject.create_one_project() - rbac_policy = network_fakes.FakeNetworkRBAC.create_one_network_rbac( - attrs={'target_tenant': project.id}) - - def setUp(self): - super(TestSetNetworkRBAC, self).setUp() - - # Get the command object to test - self.cmd = network_rbac.SetNetworkRBAC(self.app, self.namespace) - - self.network.find_rbac_policy = mock.Mock( - return_value=self.rbac_policy) - self.network.update_rbac_policy = mock.Mock(return_value=None) - self.projects_mock.get.return_value = self.project - - def test_network_rbac_set_nothing(self): - arglist = [ - self.rbac_policy.id, - ] - verifylist = [ - ('rbac_policy', self.rbac_policy.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - self.network.find_rbac_policy.assert_called_once_with( - self.rbac_policy.id, ignore_missing=False - ) - attrs = {} - self.network.update_rbac_policy.assert_called_once_with( - self.rbac_policy, **attrs) - self.assertIsNone(result) - - def test_network_rbac_set(self): - arglist = [ - '--target-project', self.project.id, - self.rbac_policy.id, - ] - verifylist = [ - ('target_project', self.project.id), - ('rbac_policy', self.rbac_policy.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - self.network.find_rbac_policy.assert_called_once_with( - self.rbac_policy.id, ignore_missing=False - ) - attrs = {'target_tenant': self.project.id} - self.network.update_rbac_policy.assert_called_once_with( - self.rbac_policy, **attrs) - self.assertIsNone(result) - - -class TestShowNetworkRBAC(TestNetworkRBAC): - - rbac_policy = network_fakes.FakeNetworkRBAC.create_one_network_rbac() - - columns = ( - 'action', - 'id', - 'object_id', - 'object_type', - 'project_id', - 'target_project_id', - ) - - data = [ - rbac_policy.action, - rbac_policy.id, - rbac_policy.object_id, - rbac_policy.object_type, - rbac_policy.tenant_id, - rbac_policy.target_tenant, - ] - - def setUp(self): - super(TestShowNetworkRBAC, self).setUp() - - # Get the command object to test - self.cmd = network_rbac.ShowNetworkRBAC(self.app, self.namespace) - - self.network.find_rbac_policy = mock.Mock( - return_value=self.rbac_policy) - - def test_show_no_options(self): - arglist = [] - verifylist = [] - - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_network_rbac_show_all_options(self): - arglist = [ - self.rbac_policy.id, - ] - verifylist = [ - ('rbac_policy', self.rbac_policy.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - # DisplayCommandBase.take_action() returns two tuples - columns, data = self.cmd.take_action(parsed_args) - - self.network.find_rbac_policy.assert_called_with( - self.rbac_policy.id, ignore_missing=False - ) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) diff --git a/openstackclient/tests/network/v2/test_network_segment.py b/openstackclient/tests/network/v2/test_network_segment.py deleted file mode 100644 index a635d845..00000000 --- a/openstackclient/tests/network/v2/test_network_segment.py +++ /dev/null @@ -1,200 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# - -import mock - -from osc_lib import exceptions - -from openstackclient.network.v2 import network_segment -from openstackclient.tests.network.v2 import fakes as network_fakes -from openstackclient.tests import utils as tests_utils - - -class TestNetworkSegment(network_fakes.TestNetworkV2): - - def setUp(self): - super(TestNetworkSegment, self).setUp() - - # Enable beta commands. - self.app.options.os_beta_command = True - - # Get a shortcut to the network client - self.network = self.app.client_manager.network - - -class TestListNetworkSegment(TestNetworkSegment): - _network = network_fakes.FakeNetwork.create_one_network() - _network_segments = \ - network_fakes.FakeNetworkSegment.create_network_segments(count=3) - - columns = ( - 'ID', - 'Network', - 'Network Type', - 'Segment', - ) - columns_long = columns + ( - 'Physical Network', - ) - - data = [] - for _network_segment in _network_segments: - data.append(( - _network_segment.id, - _network_segment.network_id, - _network_segment.network_type, - _network_segment.segmentation_id, - )) - - data_long = [] - for _network_segment in _network_segments: - data_long.append(( - _network_segment.id, - _network_segment.network_id, - _network_segment.network_type, - _network_segment.segmentation_id, - _network_segment.physical_network, - )) - - def setUp(self): - super(TestListNetworkSegment, self).setUp() - - # Get the command object to test - self.cmd = network_segment.ListNetworkSegment(self.app, self.namespace) - - self.network.find_network = mock.Mock(return_value=self._network) - self.network.segments = mock.Mock(return_value=self._network_segments) - - def test_list_no_option(self): - arglist = [] - verifylist = [ - ('long', False), - ('network', None), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.segments.assert_called_once_with() - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - def test_list_no_beta_commands(self): - self.app.options.os_beta_command = False - parsed_args = self.check_parser(self.cmd, [], []) - self.assertRaises(exceptions.CommandError, self.cmd.take_action, - parsed_args) - - def test_list_long(self): - arglist = [ - '--long', - ] - verifylist = [ - ('long', True), - ('network', None), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.segments.assert_called_once_with() - self.assertEqual(self.columns_long, columns) - self.assertEqual(self.data_long, list(data)) - - def test_list_network(self): - arglist = [ - '--network', - self._network.id, - ] - verifylist = [ - ('long', False), - ('network', self._network.id) - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.segments.assert_called_once_with( - **{'network_id': self._network.id} - ) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - -class TestShowNetworkSegment(TestNetworkSegment): - - # The network segment to show. - _network_segment = \ - network_fakes.FakeNetworkSegment.create_one_network_segment() - - columns = ( - 'id', - 'network_id', - 'network_type', - 'physical_network', - 'segmentation_id', - ) - - data = ( - _network_segment.id, - _network_segment.network_id, - _network_segment.network_type, - _network_segment.physical_network, - _network_segment.segmentation_id, - ) - - def setUp(self): - super(TestShowNetworkSegment, self).setUp() - - self.network.find_segment = mock.Mock( - return_value=self._network_segment - ) - - # Get the command object to test - self.cmd = network_segment.ShowNetworkSegment(self.app, self.namespace) - - def test_show_no_options(self): - # Missing required args should bail here - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, [], []) - - def test_show_no_beta_commands(self): - arglist = [ - self._network_segment.id, - ] - verifylist = [ - ('network_segment', self._network_segment.id), - ] - self.app.options.os_beta_command = False - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - self.assertRaises(exceptions.CommandError, self.cmd.take_action, - parsed_args) - - def test_show_all_options(self): - arglist = [ - self._network_segment.id, - ] - verifylist = [ - ('network_segment', self._network_segment.id), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - columns, data = self.cmd.take_action(parsed_args) - - self.network.find_segment.assert_called_once_with( - self._network_segment.id, - ignore_missing=False - ) - - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) diff --git a/openstackclient/tests/network/v2/test_port.py b/openstackclient/tests/network/v2/test_port.py deleted file mode 100644 index a1cecec8..00000000 --- a/openstackclient/tests/network/v2/test_port.py +++ /dev/null @@ -1,696 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# - -import argparse -import mock - -from mock import call -from osc_lib import exceptions -from osc_lib import utils - -from openstackclient.network.v2 import port -from openstackclient.tests.network.v2 import fakes as network_fakes -from openstackclient.tests import utils as tests_utils - - -class TestPort(network_fakes.TestNetworkV2): - - def setUp(self): - super(TestPort, self).setUp() - - # Get a shortcut to the network client - self.network = self.app.client_manager.network - - def _get_common_cols_data(self, fake_port): - columns = ( - 'admin_state_up', - 'allowed_address_pairs', - 'binding_host_id', - 'binding_profile', - 'binding_vif_details', - 'binding_vif_type', - 'binding_vnic_type', - 'device_id', - 'device_owner', - 'dns_assignment', - 'dns_name', - 'extra_dhcp_opts', - 'fixed_ips', - 'id', - 'mac_address', - 'name', - 'network_id', - 'port_security_enabled', - 'project_id', - 'security_groups', - 'status', - ) - - data = ( - port._format_admin_state(fake_port.admin_state_up), - utils.format_list_of_dicts(fake_port.allowed_address_pairs), - fake_port.binding_host_id, - utils.format_dict(fake_port.binding_profile), - utils.format_dict(fake_port.binding_vif_details), - fake_port.binding_vif_type, - fake_port.binding_vnic_type, - fake_port.device_id, - fake_port.device_owner, - utils.format_list_of_dicts(fake_port.dns_assignment), - fake_port.dns_name, - utils.format_list_of_dicts(fake_port.extra_dhcp_opts), - utils.format_list_of_dicts(fake_port.fixed_ips), - fake_port.id, - fake_port.mac_address, - fake_port.name, - fake_port.network_id, - fake_port.port_security_enabled, - fake_port.project_id, - utils.format_list(fake_port.security_groups), - fake_port.status, - ) - - return columns, data - - -class TestCreatePort(TestPort): - - _port = network_fakes.FakePort.create_one_port() - - def setUp(self): - super(TestCreatePort, self).setUp() - - self.network.create_port = mock.Mock(return_value=self._port) - fake_net = network_fakes.FakeNetwork.create_one_network({ - 'id': self._port.network_id, - }) - self.network.find_network = mock.Mock(return_value=fake_net) - self.fake_subnet = network_fakes.FakeSubnet.create_one_subnet() - self.network.find_subnet = mock.Mock(return_value=self.fake_subnet) - # Get the command object to test - self.cmd = port.CreatePort(self.app, self.namespace) - - def test_create_default_options(self): - arglist = [ - '--network', self._port.network_id, - 'test-port', - ] - verifylist = [ - ('network', self._port.network_id,), - ('enable', True), - ('name', 'test-port'), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = (self.cmd.take_action(parsed_args)) - - self.network.create_port.assert_called_once_with(**{ - 'admin_state_up': True, - 'network_id': self._port.network_id, - 'name': 'test-port', - }) - - ref_columns, ref_data = self._get_common_cols_data(self._port) - self.assertEqual(ref_columns, columns) - self.assertEqual(ref_data, data) - - def test_create_full_options(self): - arglist = [ - '--mac-address', 'aa:aa:aa:aa:aa:aa', - '--fixed-ip', 'subnet=%s,ip-address=10.0.0.2' - % self.fake_subnet.id, - '--device', 'deviceid', - '--device-owner', 'fakeowner', - '--disable', - '--vnic-type', 'macvtap', - '--binding-profile', 'foo=bar', - '--binding-profile', 'foo2=bar2', - '--network', self._port.network_id, - 'test-port', - - ] - verifylist = [ - ('mac_address', 'aa:aa:aa:aa:aa:aa'), - ( - 'fixed_ip', - [{'subnet': self.fake_subnet.id, 'ip-address': '10.0.0.2'}] - ), - ('device', 'deviceid'), - ('device_owner', 'fakeowner'), - ('disable', True), - ('vnic_type', 'macvtap'), - ('binding_profile', {'foo': 'bar', 'foo2': 'bar2'}), - ('network', self._port.network_id), - ('name', 'test-port'), - - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = (self.cmd.take_action(parsed_args)) - - self.network.create_port.assert_called_once_with(**{ - 'mac_address': 'aa:aa:aa:aa:aa:aa', - 'fixed_ips': [{'subnet_id': self.fake_subnet.id, - 'ip_address': '10.0.0.2'}], - 'device_id': 'deviceid', - 'device_owner': 'fakeowner', - 'admin_state_up': False, - 'binding:vnic_type': 'macvtap', - 'binding:profile': {'foo': 'bar', 'foo2': 'bar2'}, - 'network_id': self._port.network_id, - 'name': 'test-port', - }) - - ref_columns, ref_data = self._get_common_cols_data(self._port) - self.assertEqual(ref_columns, columns) - self.assertEqual(ref_data, data) - - def test_create_invalid_json_binding_profile(self): - arglist = [ - '--network', self._port.network_id, - '--binding-profile', '{"parent_name":"fake_parent"', - 'test-port', - ] - self.assertRaises(argparse.ArgumentTypeError, - self.check_parser, - self.cmd, - arglist, - None) - - def test_create_invalid_key_value_binding_profile(self): - arglist = [ - '--network', self._port.network_id, - '--binding-profile', 'key', - 'test-port', - ] - self.assertRaises(argparse.ArgumentTypeError, - self.check_parser, - self.cmd, - arglist, - None) - - def test_create_json_binding_profile(self): - arglist = [ - '--network', self._port.network_id, - '--binding-profile', '{"parent_name":"fake_parent"}', - '--binding-profile', '{"tag":42}', - 'test-port', - ] - verifylist = [ - ('network', self._port.network_id,), - ('enable', True), - ('binding_profile', {'parent_name': 'fake_parent', 'tag': 42}), - ('name', 'test-port'), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = (self.cmd.take_action(parsed_args)) - - self.network.create_port.assert_called_once_with(**{ - 'admin_state_up': True, - 'network_id': self._port.network_id, - 'binding:profile': {'parent_name': 'fake_parent', 'tag': 42}, - 'name': 'test-port', - }) - - ref_columns, ref_data = self._get_common_cols_data(self._port) - self.assertEqual(ref_columns, columns) - self.assertEqual(ref_data, data) - - -class TestDeletePort(TestPort): - - # Ports to delete. - _ports = network_fakes.FakePort.create_ports(count=2) - - def setUp(self): - super(TestDeletePort, self).setUp() - - self.network.delete_port = mock.Mock(return_value=None) - self.network.find_port = network_fakes.FakePort.get_ports( - ports=self._ports) - # Get the command object to test - self.cmd = port.DeletePort(self.app, self.namespace) - - def test_port_delete(self): - arglist = [ - self._ports[0].name, - ] - verifylist = [ - ('port', [self._ports[0].name]), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - self.network.find_port.assert_called_once_with( - self._ports[0].name, ignore_missing=False) - self.network.delete_port.assert_called_once_with(self._ports[0]) - self.assertIsNone(result) - - def test_multi_ports_delete(self): - arglist = [] - verifylist = [] - - for p in self._ports: - arglist.append(p.name) - verifylist = [ - ('port', arglist), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - calls = [] - for p in self._ports: - calls.append(call(p)) - self.network.delete_port.assert_has_calls(calls) - self.assertIsNone(result) - - def test_multi_ports_delete_with_exception(self): - arglist = [ - self._ports[0].name, - 'unexist_port', - ] - verifylist = [ - ('port', - [self._ports[0].name, 'unexist_port']), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - find_mock_result = [self._ports[0], exceptions.CommandError] - self.network.find_port = ( - mock.MagicMock(side_effect=find_mock_result) - ) - - try: - self.cmd.take_action(parsed_args) - self.fail('CommandError should be raised.') - except exceptions.CommandError as e: - self.assertEqual('1 of 2 ports failed to delete.', str(e)) - - self.network.find_port.assert_any_call( - self._ports[0].name, ignore_missing=False) - self.network.find_port.assert_any_call( - 'unexist_port', ignore_missing=False) - self.network.delete_port.assert_called_once_with( - self._ports[0] - ) - - -class TestListPort(TestPort): - - _ports = network_fakes.FakePort.create_ports(count=3) - - columns = ( - 'ID', - 'Name', - 'MAC Address', - 'Fixed IP Addresses', - ) - - data = [] - for prt in _ports: - data.append(( - prt.id, - prt.name, - prt.mac_address, - utils.format_list_of_dicts(prt.fixed_ips), - )) - - def setUp(self): - super(TestListPort, self).setUp() - - # Get the command object to test - self.cmd = port.ListPort(self.app, self.namespace) - self.network.ports = mock.Mock(return_value=self._ports) - fake_router = network_fakes.FakeRouter.create_one_router({ - 'id': 'fake-router-id', - }) - self.network.find_router = mock.Mock(return_value=fake_router) - - def test_port_list_no_options(self): - arglist = [] - verifylist = [] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.ports.assert_called_once_with() - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - def test_port_list_router_opt(self): - arglist = [ - '--router', 'fake-router-name', - ] - - verifylist = [ - ('router', 'fake-router-name') - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.ports.assert_called_once_with(**{ - 'device_id': 'fake-router-id' - }) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - def test_port_list_device_owner_opt(self): - arglist = [ - '--device-owner', self._ports[0].device_owner, - ] - - verifylist = [ - ('device_owner', self._ports[0].device_owner) - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.ports.assert_called_once_with(**{ - 'device_owner': self._ports[0].device_owner - }) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - def test_port_list_all_opt(self): - arglist = [ - '--device-owner', self._ports[0].device_owner, - '--router', 'fake-router-name', - ] - - verifylist = [ - ('device_owner', self._ports[0].device_owner), - ('router', 'fake-router-name') - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.ports.assert_called_once_with(**{ - 'device_owner': self._ports[0].device_owner, - 'device_id': 'fake-router-id' - }) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - -class TestSetPort(TestPort): - - _port = network_fakes.FakePort.create_one_port() - - def setUp(self): - super(TestSetPort, self).setUp() - self.fake_subnet = network_fakes.FakeSubnet.create_one_subnet() - self.network.find_subnet = mock.Mock(return_value=self.fake_subnet) - self.network.find_port = mock.Mock(return_value=self._port) - self.network.update_port = mock.Mock(return_value=None) - - # Get the command object to test - self.cmd = port.SetPort(self.app, self.namespace) - - def test_set_fixed_ip(self): - arglist = [ - '--fixed-ip', 'ip-address=10.0.0.11', - self._port.name, - ] - verifylist = [ - ('fixed_ip', [{'ip-address': '10.0.0.11'}]), - ('port', self._port.name), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - attrs = { - 'fixed_ips': [{'ip_address': '10.0.0.11'}], - } - self.network.update_port.assert_called_once_with(self._port, **attrs) - self.assertIsNone(result) - - def test_append_fixed_ip(self): - _testport = network_fakes.FakePort.create_one_port( - {'fixed_ips': [{'ip_address': '0.0.0.1'}]}) - self.network.find_port = mock.Mock(return_value=_testport) - arglist = [ - '--fixed-ip', 'ip-address=10.0.0.12', - _testport.name, - ] - verifylist = [ - ('fixed_ip', [{'ip-address': '10.0.0.12'}]), - ('port', _testport.name), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - attrs = { - 'fixed_ips': [ - {'ip_address': '10.0.0.12'}, {'ip_address': '0.0.0.1'}], - } - self.network.update_port.assert_called_once_with(_testport, **attrs) - self.assertIsNone(result) - - def test_set_this(self): - arglist = [ - '--disable', - '--no-fixed-ip', - '--no-binding-profile', - self._port.name, - ] - verifylist = [ - ('disable', True), - ('no_binding_profile', True), - ('no_fixed_ip', True), - ('port', self._port.name), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - attrs = { - 'admin_state_up': False, - 'binding:profile': {}, - 'fixed_ips': [], - } - self.network.update_port.assert_called_once_with(self._port, **attrs) - self.assertIsNone(result) - - def test_set_that(self): - arglist = [ - '--enable', - '--vnic-type', 'macvtap', - '--binding-profile', 'foo=bar', - '--host', 'binding-host-id-xxxx', - '--name', 'newName', - self._port.name, - ] - verifylist = [ - ('enable', True), - ('vnic_type', 'macvtap'), - ('binding_profile', {'foo': 'bar'}), - ('host', 'binding-host-id-xxxx'), - ('name', 'newName'), - ('port', self._port.name), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - attrs = { - 'admin_state_up': True, - 'binding:vnic_type': 'macvtap', - 'binding:profile': {'foo': 'bar'}, - 'binding:host_id': 'binding-host-id-xxxx', - 'name': 'newName', - } - self.network.update_port.assert_called_once_with(self._port, **attrs) - self.assertIsNone(result) - - def test_set_nothing(self): - arglist = [ - self._port.name, - ] - verifylist = [ - ('port', self._port.name), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - attrs = {} - self.network.update_port.assert_called_once_with(self._port, **attrs) - self.assertIsNone(result) - - def test_set_invalid_json_binding_profile(self): - arglist = [ - '--binding-profile', '{"parent_name"}', - 'test-port', - ] - self.assertRaises(argparse.ArgumentTypeError, - self.check_parser, - self.cmd, - arglist, - None) - - def test_set_invalid_key_value_binding_profile(self): - arglist = [ - '--binding-profile', 'key', - 'test-port', - ] - self.assertRaises(argparse.ArgumentTypeError, - self.check_parser, - self.cmd, - arglist, - None) - - def test_set_mixed_binding_profile(self): - arglist = [ - '--binding-profile', 'foo=bar', - '--binding-profile', '{"foo2": "bar2"}', - self._port.name, - ] - verifylist = [ - ('binding_profile', {'foo': 'bar', 'foo2': 'bar2'}), - ('port', self._port.name), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - attrs = { - 'binding:profile': {'foo': 'bar', 'foo2': 'bar2'}, - } - self.network.update_port.assert_called_once_with(self._port, **attrs) - self.assertIsNone(result) - - -class TestShowPort(TestPort): - - # The port to show. - _port = network_fakes.FakePort.create_one_port() - - def setUp(self): - super(TestShowPort, self).setUp() - - self.network.find_port = mock.Mock(return_value=self._port) - - # Get the command object to test - self.cmd = port.ShowPort(self.app, self.namespace) - - def test_show_no_options(self): - arglist = [] - verifylist = [] - - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, arglist, verifylist) - - def test_show_all_options(self): - arglist = [ - self._port.name, - ] - verifylist = [ - ('port', self._port.name), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.find_port.assert_called_once_with( - self._port.name, ignore_missing=False) - - ref_columns, ref_data = self._get_common_cols_data(self._port) - self.assertEqual(ref_columns, columns) - self.assertEqual(ref_data, data) - - -class TestUnsetPort(TestPort): - - def setUp(self): - super(TestUnsetPort, self).setUp() - self._testport = network_fakes.FakePort.create_one_port( - {'fixed_ips': [{'subnet_id': '042eb10a-3a18-4658-ab-cf47c8d03152', - 'ip_address': '0.0.0.1'}, - {'subnet_id': '042eb10a-3a18-4658-ab-cf47c8d03152', - 'ip_address': '1.0.0.0'}], - 'binding:profile': {'batman': 'Joker', 'Superman': 'LexLuthor'}}) - self.fake_subnet = network_fakes.FakeSubnet.create_one_subnet( - {'id': '042eb10a-3a18-4658-ab-cf47c8d03152'}) - self.network.find_subnet = mock.Mock(return_value=self.fake_subnet) - self.network.find_port = mock.Mock(return_value=self._testport) - self.network.update_port = mock.Mock(return_value=None) - # Get the command object to test - self.cmd = port.UnsetPort(self.app, self.namespace) - - def test_unset_port_parameters(self): - arglist = [ - '--fixed-ip', - 'subnet=042eb10a-3a18-4658-ab-cf47c8d03152,ip-address=1.0.0.0', - '--binding-profile', 'Superman', - self._testport.name, - ] - verifylist = [ - ('fixed_ip', [{ - 'subnet': '042eb10a-3a18-4658-ab-cf47c8d03152', - 'ip-address': '1.0.0.0'}]), - ('binding_profile', ['Superman']), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - attrs = { - 'fixed_ips': [{ - 'subnet_id': '042eb10a-3a18-4658-ab-cf47c8d03152', - 'ip_address': '0.0.0.1'}], - 'binding:profile': {'batman': 'Joker'} - } - self.network.update_port.assert_called_once_with( - self._testport, **attrs) - self.assertIsNone(result) - - def test_unset_port_fixed_ip_not_existent(self): - arglist = [ - '--fixed-ip', 'ip-address=1.0.0.1', - '--binding-profile', 'Superman', - self._testport.name, - ] - verifylist = [ - ('fixed_ip', [{'ip-address': '1.0.0.1'}]), - ('binding_profile', ['Superman']), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - self.assertRaises(exceptions.CommandError, - self.cmd.take_action, - parsed_args) - - def test_unset_port_binding_profile_not_existent(self): - arglist = [ - '--fixed-ip', 'ip-address=1.0.0.0', - '--binding-profile', 'Neo', - self._testport.name, - ] - verifylist = [ - ('fixed_ip', [{'ip-address': '1.0.0.0'}]), - ('binding_profile', ['Neo']), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - self.assertRaises(exceptions.CommandError, - self.cmd.take_action, - parsed_args) diff --git a/openstackclient/tests/network/v2/test_router.py b/openstackclient/tests/network/v2/test_router.py deleted file mode 100644 index 1ef4707b..00000000 --- a/openstackclient/tests/network/v2/test_router.py +++ /dev/null @@ -1,751 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# - -import mock -from mock import call - -from osc_lib import exceptions -from osc_lib import utils as osc_utils - -from openstackclient.network.v2 import router -from openstackclient.tests.network.v2 import fakes as network_fakes -from openstackclient.tests import utils as tests_utils - - -class TestRouter(network_fakes.TestNetworkV2): - - def setUp(self): - super(TestRouter, self).setUp() - - # Get a shortcut to the network client - self.network = self.app.client_manager.network - - -class TestAddPortToRouter(TestRouter): - '''Add port to Router ''' - - _port = network_fakes.FakePort.create_one_port() - _router = network_fakes.FakeRouter.create_one_router( - attrs={'port': _port.id}) - - def setUp(self): - super(TestAddPortToRouter, self).setUp() - self.network.router_add_interface = mock.Mock() - self.cmd = router.AddPortToRouter(self.app, self.namespace) - self.network.find_router = mock.Mock(return_value=self._router) - self.network.find_port = mock.Mock(return_value=self._port) - - def test_add_port_no_option(self): - arglist = [] - verifylist = [] - - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_add_port_required_options(self): - arglist = [ - self._router.id, - self._router.port, - ] - verifylist = [ - ('router', self._router.id), - ('port', self._router.port), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - self.network.router_add_interface.assert_called_with(self._router, **{ - 'port_id': self._router.port, - }) - self.assertIsNone(result) - - -class TestAddSubnetToRouter(TestRouter): - '''Add subnet to Router ''' - - _subnet = network_fakes.FakeSubnet.create_one_subnet() - _router = network_fakes.FakeRouter.create_one_router( - attrs={'subnet': _subnet.id}) - - def setUp(self): - super(TestAddSubnetToRouter, self).setUp() - self.network.router_add_interface = mock.Mock() - self.cmd = router.AddSubnetToRouter(self.app, self.namespace) - self.network.find_router = mock.Mock(return_value=self._router) - self.network.find_subnet = mock.Mock(return_value=self._subnet) - - def test_add_subnet_no_option(self): - arglist = [] - verifylist = [] - - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_add_subnet_required_options(self): - arglist = [ - self._router.id, - self._router.subnet, - ] - verifylist = [ - ('router', self._router.id), - ('subnet', self._router.subnet), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - self.network.router_add_interface.assert_called_with( - self._router, **{'subnet_id': self._router.subnet}) - - self.assertIsNone(result) - - -class TestCreateRouter(TestRouter): - - # The new router created. - new_router = network_fakes.FakeRouter.create_one_router() - - columns = ( - 'admin_state_up', - 'availability_zone_hints', - 'availability_zones', - 'distributed', - 'external_gateway_info', - 'ha', - 'id', - 'name', - 'project_id', - 'routes', - 'status', - ) - data = ( - router._format_admin_state(new_router.admin_state_up), - osc_utils.format_list(new_router.availability_zone_hints), - osc_utils.format_list(new_router.availability_zones), - new_router.distributed, - router._format_external_gateway_info(new_router.external_gateway_info), - new_router.ha, - new_router.id, - new_router.name, - new_router.tenant_id, - router._format_routes(new_router.routes), - new_router.status, - ) - - def setUp(self): - super(TestCreateRouter, self).setUp() - - self.network.create_router = mock.Mock(return_value=self.new_router) - - # Get the command object to test - self.cmd = router.CreateRouter(self.app, self.namespace) - - def test_create_no_options(self): - arglist = [] - verifylist = [] - - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_create_default_options(self): - arglist = [ - self.new_router.name, - ] - verifylist = [ - ('name', self.new_router.name), - ('enable', True), - ('distributed', False), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = (self.cmd.take_action(parsed_args)) - - self.network.create_router.assert_called_once_with(**{ - 'admin_state_up': True, - 'name': self.new_router.name, - }) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - def test_create_with_AZ_hints(self): - arglist = [ - self.new_router.name, - '--availability-zone-hint', 'fake-az', - '--availability-zone-hint', 'fake-az2', - ] - verifylist = [ - ('name', self.new_router.name), - ('availability_zone_hints', ['fake-az', 'fake-az2']), - ('enable', True), - ('distributed', False), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = (self.cmd.take_action(parsed_args)) - self.network.create_router.assert_called_once_with(**{ - 'admin_state_up': True, - 'name': self.new_router.name, - 'availability_zone_hints': ['fake-az', 'fake-az2'], - }) - - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - -class TestDeleteRouter(TestRouter): - - # The routers to delete. - _routers = network_fakes.FakeRouter.create_routers(count=2) - - def setUp(self): - super(TestDeleteRouter, self).setUp() - - self.network.delete_router = mock.Mock(return_value=None) - - self.network.find_router = ( - network_fakes.FakeRouter.get_routers(self._routers)) - - # Get the command object to test - self.cmd = router.DeleteRouter(self.app, self.namespace) - - def test_router_delete(self): - arglist = [ - self._routers[0].name, - ] - verifylist = [ - ('router', [self._routers[0].name]), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - self.network.delete_router.assert_called_once_with(self._routers[0]) - self.assertIsNone(result) - - def test_multi_routers_delete(self): - arglist = [] - verifylist = [] - - for r in self._routers: - arglist.append(r.name) - verifylist = [ - ('router', arglist), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - calls = [] - for r in self._routers: - calls.append(call(r)) - self.network.delete_router.assert_has_calls(calls) - self.assertIsNone(result) - - def test_multi_routers_delete_with_exception(self): - arglist = [ - self._routers[0].name, - 'unexist_router', - ] - verifylist = [ - ('router', - [self._routers[0].name, 'unexist_router']), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - find_mock_result = [self._routers[0], exceptions.CommandError] - self.network.find_router = ( - mock.MagicMock(side_effect=find_mock_result) - ) - - try: - self.cmd.take_action(parsed_args) - self.fail('CommandError should be raised.') - except exceptions.CommandError as e: - self.assertEqual('1 of 2 routers failed to delete.', str(e)) - - self.network.find_router.assert_any_call( - self._routers[0].name, ignore_missing=False) - self.network.find_router.assert_any_call( - 'unexist_router', ignore_missing=False) - self.network.delete_router.assert_called_once_with( - self._routers[0] - ) - - -class TestListRouter(TestRouter): - - # The routers going to be listed up. - routers = network_fakes.FakeRouter.create_routers(count=3) - - columns = ( - 'ID', - 'Name', - 'Status', - 'State', - 'Distributed', - 'HA', - 'Project', - ) - columns_long = columns + ( - 'Routes', - 'External gateway info', - 'Availability zones' - ) - - data = [] - for r in routers: - data.append(( - r.id, - r.name, - r.status, - router._format_admin_state(r.admin_state_up), - r.distributed, - r.ha, - r.tenant_id, - )) - data_long = [] - for i in range(0, len(routers)): - r = routers[i] - data_long.append( - data[i] + ( - router._format_routes(r.routes), - router._format_external_gateway_info(r.external_gateway_info), - osc_utils.format_list(r.availability_zones), - ) - ) - - def setUp(self): - super(TestListRouter, self).setUp() - - # Get the command object to test - self.cmd = router.ListRouter(self.app, self.namespace) - - self.network.routers = mock.Mock(return_value=self.routers) - - def test_router_list_no_options(self): - arglist = [] - verifylist = [ - ('long', False), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - # In base command class Lister in cliff, abstract method take_action() - # returns a tuple containing the column names and an iterable - # containing the data to be listed. - columns, data = self.cmd.take_action(parsed_args) - - self.network.routers.assert_called_once_with() - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - def test_router_list_long(self): - arglist = [ - '--long', - ] - verifylist = [ - ('long', True), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - # In base command class Lister in cliff, abstract method take_action() - # returns a tuple containing the column names and an iterable - # containing the data to be listed. - columns, data = self.cmd.take_action(parsed_args) - - self.network.routers.assert_called_once_with() - self.assertEqual(self.columns_long, columns) - self.assertEqual(self.data_long, list(data)) - - -class TestRemovePortFromRouter(TestRouter): - '''Remove port from a Router ''' - - _port = network_fakes.FakePort.create_one_port() - _router = network_fakes.FakeRouter.create_one_router( - attrs={'port': _port.id}) - - def setUp(self): - super(TestRemovePortFromRouter, self).setUp() - self.network.router_remove_interface = mock.Mock() - self.cmd = router.RemovePortFromRouter(self.app, self.namespace) - self.network.find_router = mock.Mock(return_value=self._router) - self.network.find_port = mock.Mock(return_value=self._port) - - def test_remove_port_no_option(self): - arglist = [] - verifylist = [] - - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_remove_port_required_options(self): - arglist = [ - self._router.id, - self._router.port, - ] - verifylist = [ - ('router', self._router.id), - ('port', self._router.port), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - self.network.router_remove_interface.assert_called_with( - self._router, **{'port_id': self._router.port}) - self.assertIsNone(result) - - -class TestRemoveSubnetFromRouter(TestRouter): - '''Remove subnet from Router ''' - - _subnet = network_fakes.FakeSubnet.create_one_subnet() - _router = network_fakes.FakeRouter.create_one_router( - attrs={'subnet': _subnet.id}) - - def setUp(self): - super(TestRemoveSubnetFromRouter, self).setUp() - self.network.router_remove_interface = mock.Mock() - self.cmd = router.RemoveSubnetFromRouter(self.app, self.namespace) - self.network.find_router = mock.Mock(return_value=self._router) - self.network.find_subnet = mock.Mock(return_value=self._subnet) - - def test_remove_subnet_no_option(self): - arglist = [] - verifylist = [] - - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_remove_subnet_required_options(self): - arglist = [ - self._router.id, - self._router.subnet, - ] - verifylist = [ - ('subnet', self._router.subnet), - ('router', self._router.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - self.network.router_remove_interface.assert_called_with( - self._router, **{'subnet_id': self._router.subnet}) - self.assertIsNone(result) - - -class TestSetRouter(TestRouter): - - # The router to set. - _default_route = {'destination': '10.20.20.0/24', 'nexthop': '10.20.30.1'} - _router = network_fakes.FakeRouter.create_one_router( - attrs={'routes': [_default_route]} - ) - - def setUp(self): - super(TestSetRouter, self).setUp() - - self.network.update_router = mock.Mock(return_value=None) - - self.network.find_router = mock.Mock(return_value=self._router) - - # Get the command object to test - self.cmd = router.SetRouter(self.app, self.namespace) - - def test_set_this(self): - arglist = [ - self._router.name, - '--enable', - '--distributed', - '--name', 'noob', - ] - verifylist = [ - ('router', self._router.name), - ('enable', True), - ('distributed', True), - ('name', 'noob'), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - attrs = { - 'admin_state_up': True, - 'distributed': True, - 'name': 'noob', - } - self.network.update_router.assert_called_once_with( - self._router, **attrs) - self.assertIsNone(result) - - def test_set_that(self): - arglist = [ - self._router.name, - '--disable', - '--centralized', - ] - verifylist = [ - ('router', self._router.name), - ('disable', True), - ('centralized', True), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - attrs = { - 'admin_state_up': False, - 'distributed': False, - } - self.network.update_router.assert_called_once_with( - self._router, **attrs) - self.assertIsNone(result) - - def test_set_distributed_centralized(self): - arglist = [ - self._router.name, - '--distributed', - '--centralized', - ] - verifylist = [ - ('router', self._router.name), - ('distributed', True), - ('distributed', False), - ] - - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_set_route(self): - arglist = [ - self._router.name, - '--route', 'destination=10.20.30.0/24,gateway=10.20.30.1', - ] - verifylist = [ - ('router', self._router.name), - ('routes', [{'destination': '10.20.30.0/24', - 'gateway': '10.20.30.1'}]), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - attrs = { - 'routes': self._router.routes + [{'destination': '10.20.30.0/24', - 'nexthop': '10.20.30.1'}], - } - self.network.update_router.assert_called_once_with( - self._router, **attrs) - self.assertIsNone(result) - - def test_set_no_route(self): - arglist = [ - self._router.name, - '--no-route', - ] - verifylist = [ - ('router', self._router.name), - ('no_route', True), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - attrs = { - 'routes': [], - } - self.network.update_router.assert_called_once_with( - self._router, **attrs) - self.assertIsNone(result) - - def test_set_route_no_route(self): - arglist = [ - self._router.name, - '--route', 'destination=10.20.30.0/24,gateway=10.20.30.1', - '--no-route', - ] - verifylist = [ - ('router', self._router.name), - ('routes', [{'destination': '10.20.30.0/24', - 'gateway': '10.20.30.1'}]), - ('no_route', True), - ] - - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_set_clear_routes(self): - arglist = [ - self._router.name, - '--clear-routes', - ] - verifylist = [ - ('router', self._router.name), - ('clear_routes', True), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - attrs = { - 'routes': [], - } - self.network.update_router.assert_called_once_with( - self._router, **attrs) - self.assertIsNone(result) - - def test_set_route_clear_routes(self): - arglist = [ - self._router.name, - '--route', 'destination=10.20.30.0/24,gateway=10.20.30.1', - '--clear-routes', - ] - verifylist = [ - ('router', self._router.name), - ('routes', [{'destination': '10.20.30.0/24', - 'gateway': '10.20.30.1'}]), - ('clear_routes', True), - ] - - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_set_nothing(self): - arglist = [ - self._router.name, - ] - verifylist = [ - ('router', self._router.name), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - attrs = {} - self.network.update_router.assert_called_once_with( - self._router, **attrs) - self.assertIsNone(result) - - -class TestShowRouter(TestRouter): - - # The router to set. - _router = network_fakes.FakeRouter.create_one_router() - - columns = ( - 'admin_state_up', - 'availability_zone_hints', - 'availability_zones', - 'distributed', - 'external_gateway_info', - 'ha', - 'id', - 'name', - 'project_id', - 'routes', - 'status', - ) - data = ( - router._format_admin_state(_router.admin_state_up), - osc_utils.format_list(_router.availability_zone_hints), - osc_utils.format_list(_router.availability_zones), - _router.distributed, - router._format_external_gateway_info(_router.external_gateway_info), - _router.ha, - _router.id, - _router.name, - _router.tenant_id, - router._format_routes(_router.routes), - _router.status, - ) - - def setUp(self): - super(TestShowRouter, self).setUp() - - self.network.find_router = mock.Mock(return_value=self._router) - - # Get the command object to test - self.cmd = router.ShowRouter(self.app, self.namespace) - - def test_show_no_options(self): - arglist = [] - verifylist = [] - - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_show_all_options(self): - arglist = [ - self._router.name, - ] - verifylist = [ - ('router', self._router.name), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.find_router.assert_called_once_with( - self._router.name, ignore_missing=False) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - -class TestUnsetRouter(TestRouter): - - def setUp(self): - super(TestUnsetRouter, self).setUp() - self._testrouter = network_fakes.FakeRouter.create_one_router( - {'routes': [{"destination": "192.168.101.1/24", - "gateway": "172.24.4.3"}, - {"destination": "192.168.101.2/24", - "gateway": "172.24.4.3"}], }) - self.fake_subnet = network_fakes.FakeSubnet.create_one_subnet() - self.network.find_router = mock.Mock(return_value=self._testrouter) - self.network.update_router = mock.Mock(return_value=None) - # Get the command object to test - self.cmd = router.UnsetRouter(self.app, self.namespace) - - def test_unset_router_params(self): - arglist = [ - '--route', 'destination=192.168.101.1/24,gateway=172.24.4.3', - self._testrouter.name, - ] - verifylist = [ - ('routes', [ - {"destination": "192.168.101.1/24", "gateway": "172.24.4.3"}]), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - attrs = { - 'routes': [{"destination": "192.168.101.2/24", - "nexthop": "172.24.4.3"}], - } - self.network.update_router.assert_called_once_with( - self._testrouter, **attrs) - self.assertIsNone(result) - - def test_unset_router_wrong_routes(self): - arglist = [ - '--route', 'destination=192.168.101.1/24,gateway=172.24.4.2', - self._testrouter.name, - ] - verifylist = [ - ('routes', [ - {"destination": "192.168.101.1/24", "gateway": "172.24.4.2"}]), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - self.assertRaises(exceptions.CommandError, - self.cmd.take_action, parsed_args) diff --git a/openstackclient/tests/network/v2/test_security_group.py b/openstackclient/tests/network/v2/test_security_group.py deleted file mode 100644 index 15f4cffe..00000000 --- a/openstackclient/tests/network/v2/test_security_group.py +++ /dev/null @@ -1,770 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# - -import mock -from mock import call - -from osc_lib import exceptions - -from openstackclient.network.v2 import security_group -from openstackclient.tests.compute.v2 import fakes as compute_fakes -from openstackclient.tests.identity.v3 import fakes as identity_fakes -from openstackclient.tests.network.v2 import fakes as network_fakes -from openstackclient.tests import utils as tests_utils - - -class TestSecurityGroupNetwork(network_fakes.TestNetworkV2): - - def setUp(self): - super(TestSecurityGroupNetwork, self).setUp() - - # Get a shortcut to the network client - self.network = self.app.client_manager.network - # Get a shortcut to the ProjectManager Mock - self.projects_mock = self.app.client_manager.identity.projects - # Get a shortcut to the DomainManager Mock - self.domains_mock = self.app.client_manager.identity.domains - - -class TestSecurityGroupCompute(compute_fakes.TestComputev2): - - def setUp(self): - super(TestSecurityGroupCompute, self).setUp() - - # Get a shortcut to the compute client - self.compute = self.app.client_manager.compute - - -class TestCreateSecurityGroupNetwork(TestSecurityGroupNetwork): - - project = identity_fakes.FakeProject.create_one_project() - domain = identity_fakes.FakeDomain.create_one_domain() - # The security group to be created. - _security_group = \ - network_fakes.FakeSecurityGroup.create_one_security_group() - - columns = ( - 'description', - 'id', - 'name', - 'project_id', - 'rules', - ) - - data = ( - _security_group.description, - _security_group.id, - _security_group.name, - _security_group.project_id, - '', - ) - - def setUp(self): - super(TestCreateSecurityGroupNetwork, self).setUp() - - self.network.create_security_group = mock.Mock( - return_value=self._security_group) - - self.projects_mock.get.return_value = self.project - self.domains_mock.get.return_value = self.domain - - # Get the command object to test - self.cmd = security_group.CreateSecurityGroup(self.app, self.namespace) - - def test_create_no_options(self): - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, [], []) - - def test_create_min_options(self): - arglist = [ - self._security_group.name, - ] - verifylist = [ - ('name', self._security_group.name), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.create_security_group.assert_called_once_with(**{ - 'description': self._security_group.name, - 'name': self._security_group.name, - }) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - def test_create_all_options(self): - arglist = [ - '--description', self._security_group.description, - '--project', self.project.name, - '--project-domain', self.domain.name, - self._security_group.name, - ] - verifylist = [ - ('description', self._security_group.description), - ('name', self._security_group.name), - ('project', self.project.name), - ('project_domain', self.domain.name), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.create_security_group.assert_called_once_with(**{ - 'description': self._security_group.description, - 'name': self._security_group.name, - 'tenant_id': self.project.id, - }) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - -class TestCreateSecurityGroupCompute(TestSecurityGroupCompute): - - project = identity_fakes.FakeProject.create_one_project() - domain = identity_fakes.FakeDomain.create_one_domain() - # The security group to be shown. - _security_group = \ - compute_fakes.FakeSecurityGroup.create_one_security_group() - - columns = ( - 'description', - 'id', - 'name', - 'project_id', - 'rules', - ) - - data = ( - _security_group.description, - _security_group.id, - _security_group.name, - _security_group.tenant_id, - '', - ) - - def setUp(self): - super(TestCreateSecurityGroupCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - - self.compute.security_groups.create.return_value = self._security_group - - # Get the command object to test - self.cmd = security_group.CreateSecurityGroup(self.app, None) - - def test_create_no_options(self): - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, [], []) - - def test_create_network_options(self): - arglist = [ - '--project', self.project.name, - '--project-domain', self.domain.name, - self._security_group.name, - ] - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, arglist, []) - - def test_create_min_options(self): - arglist = [ - self._security_group.name, - ] - verifylist = [ - ('name', self._security_group.name), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.compute.security_groups.create.assert_called_once_with( - self._security_group.name, - self._security_group.name) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - def test_create_all_options(self): - arglist = [ - '--description', self._security_group.description, - self._security_group.name, - ] - verifylist = [ - ('description', self._security_group.description), - ('name', self._security_group.name), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.compute.security_groups.create.assert_called_once_with( - self._security_group.name, - self._security_group.description) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - -class TestDeleteSecurityGroupNetwork(TestSecurityGroupNetwork): - - # The security groups to be deleted. - _security_groups = \ - network_fakes.FakeSecurityGroup.create_security_groups() - - def setUp(self): - super(TestDeleteSecurityGroupNetwork, self).setUp() - - self.network.delete_security_group = mock.Mock(return_value=None) - - self.network.find_security_group = ( - network_fakes.FakeSecurityGroup.get_security_groups( - self._security_groups) - ) - - # Get the command object to test - self.cmd = security_group.DeleteSecurityGroup(self.app, self.namespace) - - def test_security_group_delete(self): - arglist = [ - self._security_groups[0].name, - ] - verifylist = [ - ('group', [self._security_groups[0].name]), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - self.network.delete_security_group.assert_called_once_with( - self._security_groups[0]) - self.assertIsNone(result) - - def test_multi_security_groups_delete(self): - arglist = [] - verifylist = [] - - for s in self._security_groups: - arglist.append(s.name) - verifylist = [ - ('group', arglist), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - calls = [] - for s in self._security_groups: - calls.append(call(s)) - self.network.delete_security_group.assert_has_calls(calls) - self.assertIsNone(result) - - def test_multi_security_groups_delete_with_exception(self): - arglist = [ - self._security_groups[0].name, - 'unexist_security_group', - ] - verifylist = [ - ('group', - [self._security_groups[0].name, 'unexist_security_group']), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - find_mock_result = [self._security_groups[0], exceptions.CommandError] - self.network.find_security_group = ( - mock.MagicMock(side_effect=find_mock_result) - ) - - try: - self.cmd.take_action(parsed_args) - self.fail('CommandError should be raised.') - except exceptions.CommandError as e: - self.assertEqual('1 of 2 groups failed to delete.', str(e)) - - self.network.find_security_group.assert_any_call( - self._security_groups[0].name, ignore_missing=False) - self.network.find_security_group.assert_any_call( - 'unexist_security_group', ignore_missing=False) - self.network.delete_security_group.assert_called_once_with( - self._security_groups[0] - ) - - -class TestDeleteSecurityGroupCompute(TestSecurityGroupCompute): - - # The security groups to be deleted. - _security_groups = \ - compute_fakes.FakeSecurityGroup.create_security_groups() - - def setUp(self): - super(TestDeleteSecurityGroupCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - - self.compute.security_groups.delete = mock.Mock(return_value=None) - - self.compute.security_groups.get = ( - compute_fakes.FakeSecurityGroup.get_security_groups( - self._security_groups) - ) - - # Get the command object to test - self.cmd = security_group.DeleteSecurityGroup(self.app, None) - - def test_security_group_delete(self): - arglist = [ - self._security_groups[0].id, - ] - verifylist = [ - ('group', [self._security_groups[0].id]), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - self.compute.security_groups.delete.assert_called_once_with( - self._security_groups[0].id) - self.assertIsNone(result) - - def test_multi_security_groups_delete(self): - arglist = [] - verifylist = [] - - for s in self._security_groups: - arglist.append(s.id) - verifylist = [ - ('group', arglist), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - calls = [] - for s in self._security_groups: - calls.append(call(s.id)) - self.compute.security_groups.delete.assert_has_calls(calls) - self.assertIsNone(result) - - def test_multi_security_groups_delete_with_exception(self): - arglist = [ - self._security_groups[0].id, - 'unexist_security_group', - ] - verifylist = [ - ('group', - [self._security_groups[0].id, 'unexist_security_group']), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - find_mock_result = [self._security_groups[0], exceptions.CommandError] - self.compute.security_groups.get = ( - mock.MagicMock(side_effect=find_mock_result) - ) - self.compute.security_groups.find.side_effect = ( - exceptions.NotFound(None)) - - try: - self.cmd.take_action(parsed_args) - self.fail('CommandError should be raised.') - except exceptions.CommandError as e: - self.assertEqual('1 of 2 groups failed to delete.', str(e)) - - self.compute.security_groups.get.assert_any_call( - self._security_groups[0].id) - self.compute.security_groups.get.assert_any_call( - 'unexist_security_group') - self.compute.security_groups.delete.assert_called_once_with( - self._security_groups[0].id - ) - - -class TestListSecurityGroupNetwork(TestSecurityGroupNetwork): - - # The security group to be listed. - _security_groups = \ - network_fakes.FakeSecurityGroup.create_security_groups(count=3) - - columns = ( - 'ID', - 'Name', - 'Description', - 'Project', - ) - - data = [] - for grp in _security_groups: - data.append(( - grp.id, - grp.name, - grp.description, - grp.tenant_id, - )) - - def setUp(self): - super(TestListSecurityGroupNetwork, self).setUp() - - self.network.security_groups = mock.Mock( - return_value=self._security_groups) - - # Get the command object to test - self.cmd = security_group.ListSecurityGroup(self.app, self.namespace) - - def test_security_group_list_no_options(self): - arglist = [] - verifylist = [ - ('all_projects', False), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.security_groups.assert_called_once_with() - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - def test_security_group_list_all_projects(self): - arglist = [ - '--all-projects', - ] - verifylist = [ - ('all_projects', True), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.security_groups.assert_called_once_with() - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - -class TestListSecurityGroupCompute(TestSecurityGroupCompute): - - # The security group to be listed. - _security_groups = \ - compute_fakes.FakeSecurityGroup.create_security_groups(count=3) - - columns = ( - 'ID', - 'Name', - 'Description', - ) - columns_all_projects = ( - 'ID', - 'Name', - 'Description', - 'Project', - ) - - data = [] - for grp in _security_groups: - data.append(( - grp.id, - grp.name, - grp.description, - )) - data_all_projects = [] - for grp in _security_groups: - data_all_projects.append(( - grp.id, - grp.name, - grp.description, - grp.tenant_id, - )) - - def setUp(self): - super(TestListSecurityGroupCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - self.compute.security_groups.list.return_value = self._security_groups - - # Get the command object to test - self.cmd = security_group.ListSecurityGroup(self.app, None) - - def test_security_group_list_no_options(self): - arglist = [] - verifylist = [ - ('all_projects', False), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - kwargs = {'search_opts': {'all_tenants': False}} - self.compute.security_groups.list.assert_called_once_with(**kwargs) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - def test_security_group_list_all_projects(self): - arglist = [ - '--all-projects', - ] - verifylist = [ - ('all_projects', True), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - kwargs = {'search_opts': {'all_tenants': True}} - self.compute.security_groups.list.assert_called_once_with(**kwargs) - self.assertEqual(self.columns_all_projects, columns) - self.assertEqual(self.data_all_projects, list(data)) - - -class TestSetSecurityGroupNetwork(TestSecurityGroupNetwork): - - # The security group to be set. - _security_group = \ - network_fakes.FakeSecurityGroup.create_one_security_group() - - def setUp(self): - super(TestSetSecurityGroupNetwork, self).setUp() - - self.network.update_security_group = mock.Mock(return_value=None) - - self.network.find_security_group = mock.Mock( - return_value=self._security_group) - - # Get the command object to test - self.cmd = security_group.SetSecurityGroup(self.app, self.namespace) - - def test_set_no_options(self): - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, [], []) - - def test_set_no_updates(self): - arglist = [ - self._security_group.name, - ] - verifylist = [ - ('group', self._security_group.name), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - self.network.update_security_group.assert_called_once_with( - self._security_group, - **{} - ) - self.assertIsNone(result) - - def test_set_all_options(self): - new_name = 'new-' + self._security_group.name - new_description = 'new-' + self._security_group.description - arglist = [ - '--name', new_name, - '--description', new_description, - self._security_group.name, - ] - verifylist = [ - ('description', new_description), - ('group', self._security_group.name), - ('name', new_name), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - attrs = { - 'description': new_description, - 'name': new_name, - } - self.network.update_security_group.assert_called_once_with( - self._security_group, - **attrs - ) - self.assertIsNone(result) - - -class TestSetSecurityGroupCompute(TestSecurityGroupCompute): - - # The security group to be set. - _security_group = \ - compute_fakes.FakeSecurityGroup.create_one_security_group() - - def setUp(self): - super(TestSetSecurityGroupCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - - self.compute.security_groups.update = mock.Mock(return_value=None) - - self.compute.security_groups.get = mock.Mock( - return_value=self._security_group) - - # Get the command object to test - self.cmd = security_group.SetSecurityGroup(self.app, None) - - def test_set_no_options(self): - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, [], []) - - def test_set_no_updates(self): - arglist = [ - self._security_group.name, - ] - verifylist = [ - ('group', self._security_group.name), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - self.compute.security_groups.update.assert_called_once_with( - self._security_group, - self._security_group.name, - self._security_group.description - ) - self.assertIsNone(result) - - def test_set_all_options(self): - new_name = 'new-' + self._security_group.name - new_description = 'new-' + self._security_group.description - arglist = [ - '--name', new_name, - '--description', new_description, - self._security_group.name, - ] - verifylist = [ - ('description', new_description), - ('group', self._security_group.name), - ('name', new_name), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - self.compute.security_groups.update.assert_called_once_with( - self._security_group, - new_name, - new_description - ) - self.assertIsNone(result) - - -class TestShowSecurityGroupNetwork(TestSecurityGroupNetwork): - - # The security group rule to be shown with the group. - _security_group_rule = \ - network_fakes.FakeSecurityGroupRule.create_one_security_group_rule() - - # The security group to be shown. - _security_group = \ - network_fakes.FakeSecurityGroup.create_one_security_group( - attrs={'security_group_rules': [_security_group_rule._info]} - ) - - columns = ( - 'description', - 'id', - 'name', - 'project_id', - 'rules', - ) - - data = ( - _security_group.description, - _security_group.id, - _security_group.name, - _security_group.project_id, - security_group._format_network_security_group_rules( - [_security_group_rule._info]), - ) - - def setUp(self): - super(TestShowSecurityGroupNetwork, self).setUp() - - self.network.find_security_group = mock.Mock( - return_value=self._security_group) - - # Get the command object to test - self.cmd = security_group.ShowSecurityGroup(self.app, self.namespace) - - def test_show_no_options(self): - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, [], []) - - def test_show_all_options(self): - arglist = [ - self._security_group.id, - ] - verifylist = [ - ('group', self._security_group.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.find_security_group.assert_called_once_with( - self._security_group.id, ignore_missing=False) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - -class TestShowSecurityGroupCompute(TestSecurityGroupCompute): - - # The security group rule to be shown with the group. - _security_group_rule = \ - compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule() - - # The security group to be shown. - _security_group = \ - compute_fakes.FakeSecurityGroup.create_one_security_group( - attrs={'rules': [_security_group_rule._info]} - ) - - columns = ( - 'description', - 'id', - 'name', - 'project_id', - 'rules', - ) - - data = ( - _security_group.description, - _security_group.id, - _security_group.name, - _security_group.tenant_id, - security_group._format_compute_security_group_rules( - [_security_group_rule._info]), - ) - - def setUp(self): - super(TestShowSecurityGroupCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - - self.compute.security_groups.get.return_value = self._security_group - - # Get the command object to test - self.cmd = security_group.ShowSecurityGroup(self.app, None) - - def test_show_no_options(self): - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, [], []) - - def test_show_all_options(self): - arglist = [ - self._security_group.id, - ] - verifylist = [ - ('group', self._security_group.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.compute.security_groups.get.assert_called_once_with( - self._security_group.id) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) diff --git a/openstackclient/tests/network/v2/test_security_group_rule.py b/openstackclient/tests/network/v2/test_security_group_rule.py deleted file mode 100644 index 170989bf..00000000 --- a/openstackclient/tests/network/v2/test_security_group_rule.py +++ /dev/null @@ -1,1177 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# - -import copy -import mock -from mock import call - -from osc_lib import exceptions - -from openstackclient.network import utils as network_utils -from openstackclient.network.v2 import security_group_rule -from openstackclient.tests.compute.v2 import fakes as compute_fakes -from openstackclient.tests import fakes -from openstackclient.tests.identity.v3 import fakes as identity_fakes -from openstackclient.tests.network.v2 import fakes as network_fakes -from openstackclient.tests import utils as tests_utils - - -class TestSecurityGroupRuleNetwork(network_fakes.TestNetworkV2): - - def setUp(self): - super(TestSecurityGroupRuleNetwork, self).setUp() - - # Get a shortcut to the network client - self.network = self.app.client_manager.network - # Get a shortcut to the ProjectManager Mock - self.projects_mock = self.app.client_manager.identity.projects - # Get a shortcut to the DomainManager Mock - self.domains_mock = self.app.client_manager.identity.domains - - -class TestSecurityGroupRuleCompute(compute_fakes.TestComputev2): - - def setUp(self): - super(TestSecurityGroupRuleCompute, self).setUp() - - # Get a shortcut to the network client - self.compute = self.app.client_manager.compute - - -class TestCreateSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork): - - project = identity_fakes.FakeProject.create_one_project() - domain = identity_fakes.FakeDomain.create_one_domain() - # The security group rule to be created. - _security_group_rule = None - - # The security group that will contain the rule created. - _security_group = \ - network_fakes.FakeSecurityGroup.create_one_security_group() - - expected_columns = ( - 'direction', - 'ethertype', - 'id', - 'port_range_max', - 'port_range_min', - 'project_id', - 'protocol', - 'remote_group_id', - 'remote_ip_prefix', - 'security_group_id', - ) - - expected_data = None - - def _setup_security_group_rule(self, attrs=None): - self._security_group_rule = \ - network_fakes.FakeSecurityGroupRule.create_one_security_group_rule( - attrs) - self.network.create_security_group_rule = mock.Mock( - return_value=self._security_group_rule) - self.expected_data = ( - self._security_group_rule.direction, - self._security_group_rule.ethertype, - self._security_group_rule.id, - self._security_group_rule.port_range_max, - self._security_group_rule.port_range_min, - self._security_group_rule.project_id, - self._security_group_rule.protocol, - self._security_group_rule.remote_group_id, - self._security_group_rule.remote_ip_prefix, - self._security_group_rule.security_group_id, - ) - - def setUp(self): - super(TestCreateSecurityGroupRuleNetwork, self).setUp() - - self.network.find_security_group = mock.Mock( - return_value=self._security_group) - - self.projects_mock.get.return_value = self.project - self.domains_mock.get.return_value = self.domain - - # Get the command object to test - self.cmd = security_group_rule.CreateSecurityGroupRule( - self.app, self.namespace) - - def test_create_no_options(self): - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, [], []) - - def test_create_all_source_options(self): - arglist = [ - '--src-ip', '10.10.0.0/24', - '--src-group', self._security_group.id, - self._security_group.id, - ] - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, arglist, []) - - def test_create_bad_ethertype(self): - arglist = [ - '--ethertype', 'foo', - self._security_group.id, - ] - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, arglist, []) - - def test_create_all_protocol_options(self): - arglist = [ - '--protocol', 'tcp', - '--proto', 'tcp', - self._security_group.id, - ] - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, arglist, []) - - def test_create_all_port_range_options(self): - arglist = [ - '--dst-port', '80:80', - '--icmp-type', '3', - '--icmp-code', '1', - self._security_group.id, - ] - verifylist = [ - ('dst_port', (80, 80)), - ('icmp_type', 3), - ('icmp_code', 1), - ('group', self._security_group.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - self.assertRaises(exceptions.CommandError, self.cmd.take_action, - parsed_args) - - def test_create_default_rule(self): - self._setup_security_group_rule({ - 'port_range_max': 443, - 'port_range_min': 443, - }) - arglist = [ - '--dst-port', str(self._security_group_rule.port_range_min), - self._security_group.id, - ] - verifylist = [ - ('dst_port', (self._security_group_rule.port_range_min, - self._security_group_rule.port_range_max)), - ('group', self._security_group.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.create_security_group_rule.assert_called_once_with(**{ - 'direction': self._security_group_rule.direction, - 'ethertype': self._security_group_rule.ethertype, - 'port_range_max': self._security_group_rule.port_range_max, - 'port_range_min': self._security_group_rule.port_range_min, - 'protocol': self._security_group_rule.protocol, - 'remote_ip_prefix': self._security_group_rule.remote_ip_prefix, - 'security_group_id': self._security_group.id, - }) - self.assertEqual(self.expected_columns, columns) - self.assertEqual(self.expected_data, data) - - def test_create_proto_option(self): - self._setup_security_group_rule({ - 'protocol': 'icmp', - 'remote_ip_prefix': '10.0.2.0/24', - }) - arglist = [ - '--proto', self._security_group_rule.protocol, - '--src-ip', self._security_group_rule.remote_ip_prefix, - self._security_group.id, - ] - verifylist = [ - ('proto', self._security_group_rule.protocol), - ('protocol', None), - ('src_ip', self._security_group_rule.remote_ip_prefix), - ('group', self._security_group.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.create_security_group_rule.assert_called_once_with(**{ - 'direction': self._security_group_rule.direction, - 'ethertype': self._security_group_rule.ethertype, - 'protocol': self._security_group_rule.protocol, - 'remote_ip_prefix': self._security_group_rule.remote_ip_prefix, - 'security_group_id': self._security_group.id, - }) - self.assertEqual(self.expected_columns, columns) - self.assertEqual(self.expected_data, data) - - def test_create_source_group(self): - self._setup_security_group_rule({ - 'port_range_max': 22, - 'port_range_min': 22, - 'remote_group_id': self._security_group.id, - }) - arglist = [ - '--dst-port', str(self._security_group_rule.port_range_min), - '--ingress', - '--src-group', self._security_group.name, - self._security_group.id, - ] - verifylist = [ - ('dst_port', (self._security_group_rule.port_range_min, - self._security_group_rule.port_range_max)), - ('ingress', True), - ('src_group', self._security_group.name), - ('group', self._security_group.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.create_security_group_rule.assert_called_once_with(**{ - 'direction': self._security_group_rule.direction, - 'ethertype': self._security_group_rule.ethertype, - 'port_range_max': self._security_group_rule.port_range_max, - 'port_range_min': self._security_group_rule.port_range_min, - 'protocol': self._security_group_rule.protocol, - 'remote_group_id': self._security_group_rule.remote_group_id, - 'security_group_id': self._security_group.id, - }) - self.assertEqual(self.expected_columns, columns) - self.assertEqual(self.expected_data, data) - - def test_create_source_ip(self): - self._setup_security_group_rule({ - 'protocol': 'icmp', - 'remote_ip_prefix': '10.0.2.0/24', - }) - arglist = [ - '--protocol', self._security_group_rule.protocol, - '--src-ip', self._security_group_rule.remote_ip_prefix, - self._security_group.id, - ] - verifylist = [ - ('protocol', self._security_group_rule.protocol), - ('src_ip', self._security_group_rule.remote_ip_prefix), - ('group', self._security_group.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.create_security_group_rule.assert_called_once_with(**{ - 'direction': self._security_group_rule.direction, - 'ethertype': self._security_group_rule.ethertype, - 'protocol': self._security_group_rule.protocol, - 'remote_ip_prefix': self._security_group_rule.remote_ip_prefix, - 'security_group_id': self._security_group.id, - }) - self.assertEqual(self.expected_columns, columns) - self.assertEqual(self.expected_data, data) - - def test_create_network_options(self): - self._setup_security_group_rule({ - 'direction': 'egress', - 'ethertype': 'IPv6', - 'port_range_max': 443, - 'port_range_min': 443, - 'protocol': '6', - 'remote_group_id': None, - 'remote_ip_prefix': None, - }) - arglist = [ - '--dst-port', str(self._security_group_rule.port_range_min), - '--egress', - '--ethertype', self._security_group_rule.ethertype, - '--project', self.project.name, - '--project-domain', self.domain.name, - '--protocol', self._security_group_rule.protocol, - self._security_group.id, - ] - verifylist = [ - ('dst_port', (self._security_group_rule.port_range_min, - self._security_group_rule.port_range_max)), - ('egress', True), - ('ethertype', self._security_group_rule.ethertype), - ('project', self.project.name), - ('project_domain', self.domain.name), - ('protocol', self._security_group_rule.protocol), - ('group', self._security_group.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.create_security_group_rule.assert_called_once_with(**{ - 'direction': self._security_group_rule.direction, - 'ethertype': self._security_group_rule.ethertype, - 'port_range_max': self._security_group_rule.port_range_max, - 'port_range_min': self._security_group_rule.port_range_min, - 'protocol': self._security_group_rule.protocol, - 'security_group_id': self._security_group.id, - 'tenant_id': self.project.id, - }) - self.assertEqual(self.expected_columns, columns) - self.assertEqual(self.expected_data, data) - - def test_create_tcp_with_icmp_type(self): - arglist = [ - '--protocol', 'tcp', - '--icmp-type', '15', - self._security_group.id, - ] - verifylist = [ - ('protocol', 'tcp'), - ('icmp_type', 15), - ('group', self._security_group.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - self.assertRaises(exceptions.CommandError, self.cmd.take_action, - parsed_args) - - def test_create_icmp_code(self): - arglist = [ - '--protocol', '1', - '--icmp-code', '1', - self._security_group.id, - ] - verifylist = [ - ('protocol', '1'), - ('icmp_code', 1), - ('group', self._security_group.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - self.assertRaises(exceptions.CommandError, self.cmd.take_action, - parsed_args) - - def test_create_icmp_type(self): - self._setup_security_group_rule({ - 'port_range_min': 15, - 'protocol': 'icmp', - 'remote_ip_prefix': '0.0.0.0/0', - }) - arglist = [ - '--icmp-type', str(self._security_group_rule.port_range_min), - '--protocol', self._security_group_rule.protocol, - self._security_group.id, - ] - verifylist = [ - ('dst_port', None), - ('icmp_type', self._security_group_rule.port_range_min), - ('icmp_code', None), - ('protocol', self._security_group_rule.protocol), - ('group', self._security_group.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.create_security_group_rule.assert_called_once_with(**{ - 'direction': self._security_group_rule.direction, - 'ethertype': self._security_group_rule.ethertype, - 'port_range_min': self._security_group_rule.port_range_min, - 'protocol': self._security_group_rule.protocol, - 'remote_ip_prefix': self._security_group_rule.remote_ip_prefix, - 'security_group_id': self._security_group.id, - }) - self.assertEqual(self.expected_columns, columns) - self.assertEqual(self.expected_data, data) - - def test_create_ipv6_icmp_type_code(self): - self._setup_security_group_rule({ - 'ethertype': 'IPv6', - 'port_range_min': 139, - 'port_range_max': 2, - 'protocol': 'ipv6-icmp', - }) - arglist = [ - '--icmp-type', str(self._security_group_rule.port_range_min), - '--icmp-code', str(self._security_group_rule.port_range_max), - '--protocol', self._security_group_rule.protocol, - self._security_group.id, - ] - verifylist = [ - ('dst_port', None), - ('icmp_type', self._security_group_rule.port_range_min), - ('icmp_code', self._security_group_rule.port_range_max), - ('protocol', self._security_group_rule.protocol), - ('group', self._security_group.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.create_security_group_rule.assert_called_once_with(**{ - 'direction': self._security_group_rule.direction, - 'ethertype': self._security_group_rule.ethertype, - 'port_range_min': self._security_group_rule.port_range_min, - 'port_range_max': self._security_group_rule.port_range_max, - 'protocol': self._security_group_rule.protocol, - 'security_group_id': self._security_group.id, - }) - self.assertEqual(self.expected_columns, columns) - self.assertEqual(self.expected_data, data) - - def test_create_icmpv6_type(self): - self._setup_security_group_rule({ - 'ethertype': 'IPv6', - 'port_range_min': 139, - 'protocol': 'icmpv6', - }) - arglist = [ - '--icmp-type', str(self._security_group_rule.port_range_min), - '--protocol', self._security_group_rule.protocol, - self._security_group.id, - ] - verifylist = [ - ('dst_port', None), - ('icmp_type', self._security_group_rule.port_range_min), - ('icmp_code', None), - ('protocol', self._security_group_rule.protocol), - ('group', self._security_group.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.create_security_group_rule.assert_called_once_with(**{ - 'direction': self._security_group_rule.direction, - 'ethertype': self._security_group_rule.ethertype, - 'port_range_min': self._security_group_rule.port_range_min, - 'protocol': self._security_group_rule.protocol, - 'security_group_id': self._security_group.id, - }) - self.assertEqual(self.expected_columns, columns) - self.assertEqual(self.expected_data, data) - - -class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): - - project = identity_fakes.FakeProject.create_one_project() - domain = identity_fakes.FakeDomain.create_one_domain() - # The security group rule to be created. - _security_group_rule = None - - # The security group that will contain the rule created. - _security_group = \ - compute_fakes.FakeSecurityGroup.create_one_security_group() - - def _setup_security_group_rule(self, attrs=None): - self._security_group_rule = \ - compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule( - attrs) - self.compute.security_group_rules.create.return_value = \ - self._security_group_rule - expected_columns, expected_data = \ - security_group_rule._format_security_group_rule_show( - self._security_group_rule._info) - return expected_columns, expected_data - - def setUp(self): - super(TestCreateSecurityGroupRuleCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - - self.compute.security_groups.get.return_value = self._security_group - - # Get the command object to test - self.cmd = security_group_rule.CreateSecurityGroupRule(self.app, None) - - def test_create_no_options(self): - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, [], []) - - def test_create_all_source_options(self): - arglist = [ - '--src-ip', '10.10.0.0/24', - '--src-group', self._security_group.id, - self._security_group.id, - ] - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, arglist, []) - - def test_create_bad_protocol(self): - arglist = [ - '--protocol', 'foo', - self._security_group.id, - ] - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, arglist, []) - - def test_create_all_protocol_options(self): - arglist = [ - '--protocol', 'tcp', - '--proto', 'tcp', - self._security_group.id, - ] - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, arglist, []) - - def test_create_network_options(self): - arglist = [ - '--ingress', - '--ethertype', 'IPv4', - '--icmp-type', '3', - '--icmp-code', '11', - '--project', self.project.name, - '--project-domain', self.domain.name, - self._security_group.id, - ] - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, arglist, []) - - def test_create_default_rule(self): - expected_columns, expected_data = self._setup_security_group_rule() - dst_port = str(self._security_group_rule.from_port) + ':' + \ - str(self._security_group_rule.to_port) - arglist = [ - '--dst-port', dst_port, - self._security_group.id, - ] - verifylist = [ - ('dst_port', (self._security_group_rule.from_port, - self._security_group_rule.to_port)), - ('group', self._security_group.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.compute.security_group_rules.create.assert_called_once_with( - self._security_group.id, - self._security_group_rule.ip_protocol, - self._security_group_rule.from_port, - self._security_group_rule.to_port, - self._security_group_rule.ip_range['cidr'], - None, - ) - self.assertEqual(expected_columns, columns) - self.assertEqual(expected_data, data) - - def test_create_source_group(self): - expected_columns, expected_data = self._setup_security_group_rule({ - 'from_port': 22, - 'to_port': 22, - 'group': {'name': self._security_group.name}, - }) - arglist = [ - '--dst-port', str(self._security_group_rule.from_port), - '--src-group', self._security_group.name, - self._security_group.id, - ] - verifylist = [ - ('dst_port', (self._security_group_rule.from_port, - self._security_group_rule.to_port)), - ('src_group', self._security_group.name), - ('group', self._security_group.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.compute.security_group_rules.create.assert_called_once_with( - self._security_group.id, - self._security_group_rule.ip_protocol, - self._security_group_rule.from_port, - self._security_group_rule.to_port, - self._security_group_rule.ip_range['cidr'], - self._security_group.id, - ) - self.assertEqual(expected_columns, columns) - self.assertEqual(expected_data, data) - - def test_create_source_ip(self): - expected_columns, expected_data = self._setup_security_group_rule({ - 'ip_protocol': 'icmp', - 'from_port': -1, - 'to_port': -1, - 'ip_range': {'cidr': '10.0.2.0/24'}, - }) - arglist = [ - '--protocol', self._security_group_rule.ip_protocol, - '--src-ip', self._security_group_rule.ip_range['cidr'], - self._security_group.id, - ] - verifylist = [ - ('protocol', self._security_group_rule.ip_protocol), - ('src_ip', self._security_group_rule.ip_range['cidr']), - ('group', self._security_group.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.compute.security_group_rules.create.assert_called_once_with( - self._security_group.id, - self._security_group_rule.ip_protocol, - self._security_group_rule.from_port, - self._security_group_rule.to_port, - self._security_group_rule.ip_range['cidr'], - None, - ) - self.assertEqual(expected_columns, columns) - self.assertEqual(expected_data, data) - - def test_create_proto_option(self): - expected_columns, expected_data = self._setup_security_group_rule({ - 'ip_protocol': 'icmp', - 'from_port': -1, - 'to_port': -1, - 'ip_range': {'cidr': '10.0.2.0/24'}, - }) - arglist = [ - '--proto', self._security_group_rule.ip_protocol, - '--src-ip', self._security_group_rule.ip_range['cidr'], - self._security_group.id, - ] - verifylist = [ - ('proto', self._security_group_rule.ip_protocol), - ('protocol', None), - ('src_ip', self._security_group_rule.ip_range['cidr']), - ('group', self._security_group.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.compute.security_group_rules.create.assert_called_once_with( - self._security_group.id, - self._security_group_rule.ip_protocol, - self._security_group_rule.from_port, - self._security_group_rule.to_port, - self._security_group_rule.ip_range['cidr'], - None, - ) - self.assertEqual(expected_columns, columns) - self.assertEqual(expected_data, data) - - -class TestDeleteSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork): - - # The security group rules to be deleted. - _security_group_rules = \ - network_fakes.FakeSecurityGroupRule.create_security_group_rules( - count=2) - - def setUp(self): - super(TestDeleteSecurityGroupRuleNetwork, self).setUp() - - self.network.delete_security_group_rule = mock.Mock(return_value=None) - - self.network.find_security_group_rule = ( - network_fakes.FakeSecurityGroupRule.get_security_group_rules( - self._security_group_rules) - ) - - # Get the command object to test - self.cmd = security_group_rule.DeleteSecurityGroupRule( - self.app, self.namespace) - - def test_security_group_rule_delete(self): - arglist = [ - self._security_group_rules[0].id, - ] - verifylist = [ - ('rule', [self._security_group_rules[0].id]), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - self.network.delete_security_group_rule.assert_called_once_with( - self._security_group_rules[0]) - self.assertIsNone(result) - - def test_multi_security_group_rules_delete(self): - arglist = [] - verifylist = [] - - for s in self._security_group_rules: - arglist.append(s.id) - verifylist = [ - ('rule', arglist), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - calls = [] - for s in self._security_group_rules: - calls.append(call(s)) - self.network.delete_security_group_rule.assert_has_calls(calls) - self.assertIsNone(result) - - def test_multi_security_group_rules_delete_with_exception(self): - arglist = [ - self._security_group_rules[0].id, - 'unexist_rule', - ] - verifylist = [ - ('rule', - [self._security_group_rules[0].id, 'unexist_rule']), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - find_mock_result = [ - self._security_group_rules[0], exceptions.CommandError] - self.network.find_security_group_rule = ( - mock.MagicMock(side_effect=find_mock_result) - ) - - try: - self.cmd.take_action(parsed_args) - self.fail('CommandError should be raised.') - except exceptions.CommandError as e: - self.assertEqual('1 of 2 rules failed to delete.', str(e)) - - self.network.find_security_group_rule.assert_any_call( - self._security_group_rules[0].id, ignore_missing=False) - self.network.find_security_group_rule.assert_any_call( - 'unexist_rule', ignore_missing=False) - self.network.delete_security_group_rule.assert_called_once_with( - self._security_group_rules[0] - ) - - -class TestDeleteSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): - - # The security group rule to be deleted. - _security_group_rules = \ - compute_fakes.FakeSecurityGroupRule.create_security_group_rules( - count=2) - - def setUp(self): - super(TestDeleteSecurityGroupRuleCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - - # Get the command object to test - self.cmd = security_group_rule.DeleteSecurityGroupRule(self.app, None) - - def test_security_group_rule_delete(self): - arglist = [ - self._security_group_rules[0].id, - ] - verifylist = [ - ('rule', [self._security_group_rules[0].id]), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - self.compute.security_group_rules.delete.assert_called_once_with( - self._security_group_rules[0].id) - self.assertIsNone(result) - - def test_multi_security_group_rules_delete(self): - arglist = [] - verifylist = [] - - for s in self._security_group_rules: - arglist.append(s.id) - verifylist = [ - ('rule', arglist), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - calls = [] - for s in self._security_group_rules: - calls.append(call(s.id)) - self.compute.security_group_rules.delete.assert_has_calls(calls) - self.assertIsNone(result) - - def test_multi_security_group_rules_delete_with_exception(self): - arglist = [ - self._security_group_rules[0].id, - 'unexist_rule', - ] - verifylist = [ - ('rule', - [self._security_group_rules[0].id, 'unexist_rule']), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - find_mock_result = [None, exceptions.CommandError] - self.compute.security_group_rules.delete = ( - mock.MagicMock(side_effect=find_mock_result) - ) - - try: - self.cmd.take_action(parsed_args) - self.fail('CommandError should be raised.') - except exceptions.CommandError as e: - self.assertEqual('1 of 2 rules failed to delete.', str(e)) - - self.compute.security_group_rules.delete.assert_any_call( - self._security_group_rules[0].id) - self.compute.security_group_rules.delete.assert_any_call( - 'unexist_rule') - - -class TestListSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork): - - # The security group to hold the rules. - _security_group = \ - network_fakes.FakeSecurityGroup.create_one_security_group() - - # The security group rule to be listed. - _security_group_rule_tcp = \ - network_fakes.FakeSecurityGroupRule.create_one_security_group_rule({ - 'protocol': 'tcp', - 'port_range_max': 80, - 'port_range_min': 80, - 'security_group_id': _security_group.id, - }) - _security_group_rule_icmp = \ - network_fakes.FakeSecurityGroupRule.create_one_security_group_rule({ - 'protocol': 'icmp', - 'remote_ip_prefix': '10.0.2.0/24', - 'security_group_id': _security_group.id, - }) - _security_group.security_group_rules = [_security_group_rule_tcp._info, - _security_group_rule_icmp._info] - _security_group_rules = [_security_group_rule_tcp, - _security_group_rule_icmp] - - expected_columns_with_group_and_long = ( - 'ID', - 'IP Protocol', - 'IP Range', - 'Port Range', - 'Direction', - 'Ethertype', - 'Remote Security Group', - ) - expected_columns_no_group = ( - 'ID', - 'IP Protocol', - 'IP Range', - 'Port Range', - 'Remote Security Group', - 'Security Group', - ) - - expected_data_with_group_and_long = [] - expected_data_no_group = [] - for _security_group_rule in _security_group_rules: - expected_data_with_group_and_long.append(( - _security_group_rule.id, - _security_group_rule.protocol, - _security_group_rule.remote_ip_prefix, - security_group_rule._format_network_port_range( - _security_group_rule), - _security_group_rule.direction, - _security_group_rule.ethertype, - _security_group_rule.remote_group_id, - )) - expected_data_no_group.append(( - _security_group_rule.id, - _security_group_rule.protocol, - _security_group_rule.remote_ip_prefix, - security_group_rule._format_network_port_range( - _security_group_rule), - _security_group_rule.remote_group_id, - _security_group_rule.security_group_id, - )) - - def setUp(self): - super(TestListSecurityGroupRuleNetwork, self).setUp() - - self.network.find_security_group = mock.Mock( - return_value=self._security_group) - self.network.security_group_rules = mock.Mock( - return_value=self._security_group_rules) - - # Get the command object to test - self.cmd = security_group_rule.ListSecurityGroupRule( - self.app, self.namespace) - - def test_list_default(self): - self._security_group_rule_tcp.port_range_min = 80 - parsed_args = self.check_parser(self.cmd, [], []) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.security_group_rules.assert_called_once_with(**{}) - self.assertEqual(self.expected_columns_no_group, columns) - self.assertEqual(self.expected_data_no_group, list(data)) - - def test_list_with_group_and_long(self): - self._security_group_rule_tcp.port_range_min = 80 - arglist = [ - '--long', - self._security_group.id, - ] - verifylist = [ - ('long', True), - ('group', self._security_group.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.security_group_rules.assert_called_once_with(**{ - 'security_group_id': self._security_group.id, - }) - self.assertEqual(self.expected_columns_with_group_and_long, columns) - self.assertEqual(self.expected_data_with_group_and_long, list(data)) - - def test_list_with_ignored_options(self): - self._security_group_rule_tcp.port_range_min = 80 - arglist = [ - '--all-projects', - ] - verifylist = [ - ('all_projects', True), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.security_group_rules.assert_called_once_with(**{}) - self.assertEqual(self.expected_columns_no_group, columns) - self.assertEqual(self.expected_data_no_group, list(data)) - - -class TestListSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): - - # The security group to hold the rules. - _security_group = \ - compute_fakes.FakeSecurityGroup.create_one_security_group() - - # The security group rule to be listed. - _security_group_rule_tcp = \ - compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule({ - 'ip_protocol': 'tcp', - 'from_port': 80, - 'to_port': 80, - 'group': {'name': _security_group.name}, - }) - _security_group_rule_icmp = \ - compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule({ - 'ip_protocol': 'icmp', - 'from_port': -1, - 'to_port': -1, - 'ip_range': {'cidr': '10.0.2.0/24'}, - 'group': {'name': _security_group.name}, - }) - _security_group.rules = [_security_group_rule_tcp._info, - _security_group_rule_icmp._info] - - expected_columns_with_group = ( - 'ID', - 'IP Protocol', - 'IP Range', - 'Port Range', - 'Remote Security Group', - ) - expected_columns_no_group = \ - expected_columns_with_group + ('Security Group',) - - expected_data_with_group = [] - expected_data_no_group = [] - for _security_group_rule in _security_group.rules: - rule = network_utils.transform_compute_security_group_rule( - _security_group_rule - ) - expected_rule_with_group = ( - rule['id'], - rule['ip_protocol'], - rule['ip_range'], - rule['port_range'], - rule['remote_security_group'], - ) - expected_rule_no_group = expected_rule_with_group + \ - (_security_group_rule['parent_group_id'],) - expected_data_with_group.append(expected_rule_with_group) - expected_data_no_group.append(expected_rule_no_group) - - def setUp(self): - super(TestListSecurityGroupRuleCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - - self.compute.security_groups.get.return_value = \ - self._security_group - self.compute.security_groups.list.return_value = \ - [self._security_group] - - # Get the command object to test - self.cmd = security_group_rule.ListSecurityGroupRule(self.app, None) - - def test_list_default(self): - parsed_args = self.check_parser(self.cmd, [], []) - - columns, data = self.cmd.take_action(parsed_args) - self.compute.security_groups.list.assert_called_once_with( - search_opts={'all_tenants': False} - ) - self.assertEqual(self.expected_columns_no_group, columns) - self.assertEqual(self.expected_data_no_group, list(data)) - - def test_list_with_group(self): - arglist = [ - self._security_group.id, - ] - verifylist = [ - ('group', self._security_group.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - self.compute.security_groups.get.assert_called_once_with( - self._security_group.id - ) - self.assertEqual(self.expected_columns_with_group, columns) - self.assertEqual(self.expected_data_with_group, list(data)) - - def test_list_all_projects(self): - arglist = [ - '--all-projects', - ] - verifylist = [ - ('all_projects', True), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - self.compute.security_groups.list.assert_called_once_with( - search_opts={'all_tenants': True} - ) - self.assertEqual(self.expected_columns_no_group, columns) - self.assertEqual(self.expected_data_no_group, list(data)) - - def test_list_with_ignored_options(self): - arglist = [ - '--long', - ] - verifylist = [ - ('long', False), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - self.compute.security_groups.list.assert_called_once_with( - search_opts={'all_tenants': False} - ) - self.assertEqual(self.expected_columns_no_group, columns) - self.assertEqual(self.expected_data_no_group, list(data)) - - -class TestShowSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork): - - # The security group rule to be shown. - _security_group_rule = \ - network_fakes.FakeSecurityGroupRule.create_one_security_group_rule() - - columns = ( - 'direction', - 'ethertype', - 'id', - 'port_range_max', - 'port_range_min', - 'project_id', - 'protocol', - 'remote_group_id', - 'remote_ip_prefix', - 'security_group_id', - ) - - data = ( - _security_group_rule.direction, - _security_group_rule.ethertype, - _security_group_rule.id, - _security_group_rule.port_range_max, - _security_group_rule.port_range_min, - _security_group_rule.project_id, - _security_group_rule.protocol, - _security_group_rule.remote_group_id, - _security_group_rule.remote_ip_prefix, - _security_group_rule.security_group_id, - ) - - def setUp(self): - super(TestShowSecurityGroupRuleNetwork, self).setUp() - - self.network.find_security_group_rule = mock.Mock( - return_value=self._security_group_rule) - - # Get the command object to test - self.cmd = security_group_rule.ShowSecurityGroupRule( - self.app, self.namespace) - - def test_show_no_options(self): - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, [], []) - - def test_show_all_options(self): - arglist = [ - self._security_group_rule.id, - ] - verifylist = [ - ('rule', self._security_group_rule.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.find_security_group_rule.assert_called_once_with( - self._security_group_rule.id, ignore_missing=False) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - -class TestShowSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): - - # The security group rule to be shown. - _security_group_rule = \ - compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule() - - columns, data = \ - security_group_rule._format_security_group_rule_show( - _security_group_rule._info) - - def setUp(self): - super(TestShowSecurityGroupRuleCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - - # Build a security group fake customized for this test. - security_group_rules = [self._security_group_rule._info] - security_group = fakes.FakeResource( - info=copy.deepcopy({'rules': security_group_rules}), - loaded=True) - security_group.rules = security_group_rules - self.compute.security_groups.list.return_value = [security_group] - - # Get the command object to test - self.cmd = security_group_rule.ShowSecurityGroupRule(self.app, None) - - def test_show_no_options(self): - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, [], []) - - def test_show_all_options(self): - arglist = [ - self._security_group_rule.id, - ] - verifylist = [ - ('rule', self._security_group_rule.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.compute.security_groups.list.assert_called_once_with() - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) diff --git a/openstackclient/tests/network/v2/test_subnet.py b/openstackclient/tests/network/v2/test_subnet.py deleted file mode 100644 index c117c6fd..00000000 --- a/openstackclient/tests/network/v2/test_subnet.py +++ /dev/null @@ -1,978 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# - -import mock -from mock import call - -from osc_lib import exceptions -from osc_lib import utils - -from openstackclient.network.v2 import subnet as subnet_v2 -from openstackclient.tests.identity.v3 import fakes as identity_fakes_v3 -from openstackclient.tests.network.v2 import fakes as network_fakes -from openstackclient.tests import utils as tests_utils - - -class TestSubnet(network_fakes.TestNetworkV2): - - def setUp(self): - super(TestSubnet, self).setUp() - - # Get a shortcut to the network client - self.network = self.app.client_manager.network - # Get a shortcut to the ProjectManager Mock - self.projects_mock = self.app.client_manager.identity.projects - # Get a shortcut to the DomainManager Mock - self.domains_mock = self.app.client_manager.identity.domains - - -class TestCreateSubnet(TestSubnet): - - project = identity_fakes_v3.FakeProject.create_one_project() - domain = identity_fakes_v3.FakeDomain.create_one_domain() - # An IPv4 subnet to be created with mostly default values - _subnet = network_fakes.FakeSubnet.create_one_subnet( - attrs={ - 'tenant_id': project.id, - } - ) - - # Subnet pool to be used to create a subnet from a pool - _subnet_pool = network_fakes.FakeSubnetPool.create_one_subnet_pool() - - # An IPv4 subnet to be created using a specific subnet pool - _subnet_from_pool = network_fakes.FakeSubnet.create_one_subnet( - attrs={ - 'tenant_id': project.id, - 'subnetpool_id': _subnet_pool.id, - 'dns_nameservers': ['8.8.8.8', - '8.8.4.4'], - 'host_routes': [{'destination': '10.20.20.0/24', - 'nexthop': '10.20.20.1'}, - {'destination': '10.30.30.0/24', - 'nexthop': '10.30.30.1'}], - 'service_types': ['network:router_gateway', - 'network:floatingip_agent_gateway'], - } - ) - - # An IPv6 subnet to be created with most options specified - _subnet_ipv6 = network_fakes.FakeSubnet.create_one_subnet( - attrs={ - 'tenant_id': project.id, - 'cidr': 'fe80:0:0:a00a::/64', - 'enable_dhcp': True, - 'dns_nameservers': ['fe80:27ff:a00a:f00f::ffff', - 'fe80:37ff:a00a:f00f::ffff'], - 'allocation_pools': [{'start': 'fe80::a00a:0:c0de:0:100', - 'end': 'fe80::a00a:0:c0de:0:f000'}, - {'start': 'fe80::a00a:0:c0de:1:100', - 'end': 'fe80::a00a:0:c0de:1:f000'}], - 'host_routes': [{'destination': 'fe80:27ff:a00a:f00f::/64', - 'nexthop': 'fe80:27ff:a00a:f00f::1'}, - {'destination': 'fe80:37ff:a00a:f00f::/64', - 'nexthop': 'fe80:37ff:a00a:f00f::1'}], - 'ip_version': 6, - 'gateway_ip': 'fe80::a00a:0:c0de:0:1', - 'ipv6_address_mode': 'slaac', - 'ipv6_ra_mode': 'slaac', - 'subnetpool_id': 'None', - 'service_types': ['network:router_gateway', - 'network:floatingip_agent_gateway'], - } - ) - - # The network to be returned from find_network - _network = network_fakes.FakeNetwork.create_one_network( - attrs={ - 'id': _subnet.network_id, - } - ) - - # The network segment to be returned from find_segment - _network_segment = \ - network_fakes.FakeNetworkSegment.create_one_network_segment( - attrs={ - 'network_id': _subnet.network_id, - } - ) - - columns = ( - 'allocation_pools', - 'cidr', - 'dns_nameservers', - 'enable_dhcp', - 'gateway_ip', - 'host_routes', - 'id', - 'ip_version', - 'ipv6_address_mode', - 'ipv6_ra_mode', - 'name', - 'network_id', - 'project_id', - 'segment_id', - 'service_types', - 'subnetpool_id', - ) - - data = ( - subnet_v2._format_allocation_pools(_subnet.allocation_pools), - _subnet.cidr, - utils.format_list(_subnet.dns_nameservers), - _subnet.enable_dhcp, - _subnet.gateway_ip, - subnet_v2._format_host_routes(_subnet.host_routes), - _subnet.id, - _subnet.ip_version, - _subnet.ipv6_address_mode, - _subnet.ipv6_ra_mode, - _subnet.name, - _subnet.network_id, - _subnet.project_id, - _subnet.segment_id, - utils.format_list(_subnet.service_types), - _subnet.subnetpool_id, - ) - - data_subnet_pool = ( - subnet_v2._format_allocation_pools(_subnet_from_pool.allocation_pools), - _subnet_from_pool.cidr, - utils.format_list(_subnet_from_pool.dns_nameservers), - _subnet_from_pool.enable_dhcp, - _subnet_from_pool.gateway_ip, - subnet_v2._format_host_routes(_subnet_from_pool.host_routes), - _subnet_from_pool.id, - _subnet_from_pool.ip_version, - _subnet_from_pool.ipv6_address_mode, - _subnet_from_pool.ipv6_ra_mode, - _subnet_from_pool.name, - _subnet_from_pool.network_id, - _subnet_from_pool.project_id, - _subnet_from_pool.segment_id, - utils.format_list(_subnet_from_pool.service_types), - _subnet_from_pool.subnetpool_id, - ) - - data_ipv6 = ( - subnet_v2._format_allocation_pools(_subnet_ipv6.allocation_pools), - _subnet_ipv6.cidr, - utils.format_list(_subnet_ipv6.dns_nameservers), - _subnet_ipv6.enable_dhcp, - _subnet_ipv6.gateway_ip, - subnet_v2._format_host_routes(_subnet_ipv6.host_routes), - _subnet_ipv6.id, - _subnet_ipv6.ip_version, - _subnet_ipv6.ipv6_address_mode, - _subnet_ipv6.ipv6_ra_mode, - _subnet_ipv6.name, - _subnet_ipv6.network_id, - _subnet_ipv6.project_id, - _subnet_ipv6.segment_id, - utils.format_list(_subnet_ipv6.service_types), - _subnet_ipv6.subnetpool_id, - ) - - def setUp(self): - super(TestCreateSubnet, self).setUp() - - # Get the command object to test - self.cmd = subnet_v2.CreateSubnet(self.app, self.namespace) - - self.projects_mock.get.return_value = self.project - self.domains_mock.get.return_value = self.domain - - # Mock SDK calls for all tests. - self.network.find_network = mock.Mock(return_value=self._network) - self.network.find_segment = mock.Mock( - return_value=self._network_segment - ) - self.network.find_subnet_pool = mock.Mock( - return_value=self._subnet_pool - ) - - def test_create_no_options(self): - arglist = [] - verifylist = [] - - # Testing that a call without the required argument will fail and - # throw a "ParserExecption" - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, arglist, verifylist) - - def test_create_default_options(self): - # Mock SDK calls for this test. - self.network.create_subnet = mock.Mock(return_value=self._subnet) - self._network.id = self._subnet.network_id - - arglist = [ - "--subnet-range", self._subnet.cidr, - "--network", self._subnet.network_id, - self._subnet.name, - ] - verifylist = [ - ('name', self._subnet.name), - ('subnet_range', self._subnet.cidr), - ('network', self._subnet.network_id), - ('ip_version', self._subnet.ip_version), - ('gateway', 'auto'), - - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - columns, data = self.cmd.take_action(parsed_args) - - self.network.create_subnet.assert_called_once_with(**{ - 'cidr': self._subnet.cidr, - 'ip_version': self._subnet.ip_version, - 'name': self._subnet.name, - 'network_id': self._subnet.network_id, - }) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - def test_create_from_subnet_pool_options(self): - # Mock SDK calls for this test. - self.network.create_subnet = \ - mock.Mock(return_value=self._subnet_from_pool) - self._network.id = self._subnet_from_pool.network_id - - arglist = [ - self._subnet_from_pool.name, - "--subnet-pool", self._subnet_from_pool.subnetpool_id, - "--prefix-length", '24', - "--network", self._subnet_from_pool.network_id, - "--ip-version", str(self._subnet_from_pool.ip_version), - "--gateway", self._subnet_from_pool.gateway_ip, - "--dhcp", - ] - - for dns_addr in self._subnet_from_pool.dns_nameservers: - arglist.append('--dns-nameserver') - arglist.append(dns_addr) - - for host_route in self._subnet_from_pool.host_routes: - arglist.append('--host-route') - value = 'gateway=' + host_route.get('nexthop', '') + \ - ',destination=' + host_route.get('destination', '') - arglist.append(value) - - for service_type in self._subnet_from_pool.service_types: - arglist.append('--service-type') - arglist.append(service_type) - - verifylist = [ - ('name', self._subnet_from_pool.name), - ('prefix_length', '24'), - ('network', self._subnet_from_pool.network_id), - ('ip_version', self._subnet_from_pool.ip_version), - ('gateway', self._subnet_from_pool.gateway_ip), - ('dns_nameservers', self._subnet_from_pool.dns_nameservers), - ('dhcp', self._subnet_from_pool.enable_dhcp), - ('host_routes', subnet_v2.convert_entries_to_gateway( - self._subnet_from_pool.host_routes)), - ('subnet_pool', self._subnet_from_pool.subnetpool_id), - ('service_types', self._subnet_from_pool.service_types), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - columns, data = self.cmd.take_action(parsed_args) - - self.network.create_subnet.assert_called_once_with(**{ - 'dns_nameservers': self._subnet_from_pool.dns_nameservers, - 'enable_dhcp': self._subnet_from_pool.enable_dhcp, - 'gateway_ip': self._subnet_from_pool.gateway_ip, - 'host_routes': self._subnet_from_pool.host_routes, - 'ip_version': self._subnet_from_pool.ip_version, - 'name': self._subnet_from_pool.name, - 'network_id': self._subnet_from_pool.network_id, - 'prefixlen': '24', - 'subnetpool_id': self._subnet_from_pool.subnetpool_id, - 'service_types': self._subnet_from_pool.service_types, - }) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data_subnet_pool, data) - - def test_create_options_subnet_range_ipv6(self): - # Mock SDK calls for this test. - self.network.create_subnet = mock.Mock(return_value=self._subnet_ipv6) - self._network.id = self._subnet_ipv6.network_id - - arglist = [ - self._subnet_ipv6.name, - "--subnet-range", self._subnet_ipv6.cidr, - "--network", self._subnet_ipv6.network_id, - "--ip-version", str(self._subnet_ipv6.ip_version), - "--ipv6-ra-mode", self._subnet_ipv6.ipv6_ra_mode, - "--ipv6-address-mode", self._subnet_ipv6.ipv6_address_mode, - "--gateway", self._subnet_ipv6.gateway_ip, - "--dhcp", - ] - - for dns_addr in self._subnet_ipv6.dns_nameservers: - arglist.append('--dns-nameserver') - arglist.append(dns_addr) - - for host_route in self._subnet_ipv6.host_routes: - arglist.append('--host-route') - value = 'gateway=' + host_route.get('nexthop', '') + \ - ',destination=' + host_route.get('destination', '') - arglist.append(value) - - for pool in self._subnet_ipv6.allocation_pools: - arglist.append('--allocation-pool') - value = 'start=' + pool.get('start', '') + \ - ',end=' + pool.get('end', '') - arglist.append(value) - - for service_type in self._subnet_ipv6.service_types: - arglist.append('--service-type') - arglist.append(service_type) - - verifylist = [ - ('name', self._subnet_ipv6.name), - ('subnet_range', self._subnet_ipv6.cidr), - ('network', self._subnet_ipv6.network_id), - ('ip_version', self._subnet_ipv6.ip_version), - ('ipv6_ra_mode', self._subnet_ipv6.ipv6_ra_mode), - ('ipv6_address_mode', self._subnet_ipv6.ipv6_address_mode), - ('gateway', self._subnet_ipv6.gateway_ip), - ('dns_nameservers', self._subnet_ipv6.dns_nameservers), - ('dhcp', self._subnet_ipv6.enable_dhcp), - ('host_routes', subnet_v2.convert_entries_to_gateway( - self._subnet_ipv6.host_routes)), - ('allocation_pools', self._subnet_ipv6.allocation_pools), - ('service_types', self._subnet_ipv6.service_types), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - columns, data = self.cmd.take_action(parsed_args) - - self.network.create_subnet.assert_called_once_with(**{ - 'cidr': self._subnet_ipv6.cidr, - 'dns_nameservers': self._subnet_ipv6.dns_nameservers, - 'enable_dhcp': self._subnet_ipv6.enable_dhcp, - 'gateway_ip': self._subnet_ipv6.gateway_ip, - 'host_routes': self._subnet_ipv6.host_routes, - 'ip_version': self._subnet_ipv6.ip_version, - 'ipv6_address_mode': self._subnet_ipv6.ipv6_address_mode, - 'ipv6_ra_mode': self._subnet_ipv6.ipv6_ra_mode, - 'name': self._subnet_ipv6.name, - 'network_id': self._subnet_ipv6.network_id, - 'allocation_pools': self._subnet_ipv6.allocation_pools, - 'service_types': self._subnet_ipv6.service_types, - }) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data_ipv6, data) - - def test_create_no_beta_command_options(self): - arglist = [ - "--subnet-range", self._subnet.cidr, - "--network-segment", self._network_segment.id, - "--network", self._subnet.network_id, - self._subnet.name, - ] - verifylist = [ - ('name', self._subnet.name), - ('subnet_range', self._subnet.cidr), - ('network-segment', self._network_segment.id), - ('network', self._subnet.network_id), - ] - self.app.options.os_beta_command = False - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, arglist, verifylist) - - def test_create_with_network_segment(self): - # Mock SDK calls for this test. - self.network.create_subnet = mock.Mock(return_value=self._subnet) - self._network.id = self._subnet.network_id - - arglist = [ - "--subnet-range", self._subnet.cidr, - "--network-segment", self._network_segment.id, - "--network", self._subnet.network_id, - self._subnet.name, - ] - verifylist = [ - ('name', self._subnet.name), - ('subnet_range', self._subnet.cidr), - ('network_segment', self._network_segment.id), - ('network', self._subnet.network_id), - ('ip_version', self._subnet.ip_version), - ('gateway', 'auto'), - - ] - - self.app.options.os_beta_command = True - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - columns, data = self.cmd.take_action(parsed_args) - - self.network.create_subnet.assert_called_once_with(**{ - 'cidr': self._subnet.cidr, - 'ip_version': self._subnet.ip_version, - 'name': self._subnet.name, - 'network_id': self._subnet.network_id, - 'segment_id': self._network_segment.id, - }) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - -class TestDeleteSubnet(TestSubnet): - - # The subnets to delete. - _subnets = network_fakes.FakeSubnet.create_subnets(count=2) - - def setUp(self): - super(TestDeleteSubnet, self).setUp() - - self.network.delete_subnet = mock.Mock(return_value=None) - - self.network.find_subnet = ( - network_fakes.FakeSubnet.get_subnets(self._subnets)) - - # Get the command object to test - self.cmd = subnet_v2.DeleteSubnet(self.app, self.namespace) - - def test_subnet_delete(self): - arglist = [ - self._subnets[0].name, - ] - verifylist = [ - ('subnet', [self._subnets[0].name]), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - self.network.delete_subnet.assert_called_once_with(self._subnets[0]) - self.assertIsNone(result) - - def test_multi_subnets_delete(self): - arglist = [] - verifylist = [] - - for s in self._subnets: - arglist.append(s.name) - verifylist = [ - ('subnet', arglist), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - calls = [] - for s in self._subnets: - calls.append(call(s)) - self.network.delete_subnet.assert_has_calls(calls) - self.assertIsNone(result) - - def test_multi_subnets_delete_with_exception(self): - arglist = [ - self._subnets[0].name, - 'unexist_subnet', - ] - verifylist = [ - ('subnet', - [self._subnets[0].name, 'unexist_subnet']), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - find_mock_result = [self._subnets[0], exceptions.CommandError] - self.network.find_subnet = ( - mock.MagicMock(side_effect=find_mock_result) - ) - - try: - self.cmd.take_action(parsed_args) - self.fail('CommandError should be raised.') - except exceptions.CommandError as e: - self.assertEqual('1 of 2 subnets failed to delete.', str(e)) - - self.network.find_subnet.assert_any_call( - self._subnets[0].name, ignore_missing=False) - self.network.find_subnet.assert_any_call( - 'unexist_subnet', ignore_missing=False) - self.network.delete_subnet.assert_called_once_with( - self._subnets[0] - ) - - -class TestListSubnet(TestSubnet): - # The subnets going to be listed up. - _subnet = network_fakes.FakeSubnet.create_subnets(count=3) - - columns = ( - 'ID', - 'Name', - 'Network', - 'Subnet', - ) - columns_long = columns + ( - 'Project', - 'DHCP', - 'Name Servers', - 'Allocation Pools', - 'Host Routes', - 'IP Version', - 'Gateway', - 'Service Types', - ) - - data = [] - for subnet in _subnet: - data.append(( - subnet.id, - subnet.name, - subnet.network_id, - subnet.cidr, - )) - - data_long = [] - for subnet in _subnet: - data_long.append(( - subnet.id, - subnet.name, - subnet.network_id, - subnet.cidr, - subnet.tenant_id, - subnet.enable_dhcp, - utils.format_list(subnet.dns_nameservers), - subnet_v2._format_allocation_pools(subnet.allocation_pools), - utils.format_list(subnet.host_routes), - subnet.ip_version, - subnet.gateway_ip, - utils.format_list(subnet.service_types), - )) - - def setUp(self): - super(TestListSubnet, self).setUp() - - # Get the command object to test - self.cmd = subnet_v2.ListSubnet(self.app, self.namespace) - - self.network.subnets = mock.Mock(return_value=self._subnet) - - def test_subnet_list_no_options(self): - arglist = [] - verifylist = [ - ('long', False), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.subnets.assert_called_once_with() - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - def test_subnet_list_long(self): - arglist = [ - '--long', - ] - verifylist = [ - ('long', True), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.subnets.assert_called_once_with() - self.assertEqual(self.columns_long, columns) - self.assertEqual(self.data_long, list(data)) - - def test_subnet_list_ip_version(self): - arglist = [ - '--ip-version', str(4), - ] - verifylist = [ - ('ip_version', 4), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - filters = {'ip_version': 4} - - self.network.subnets.assert_called_once_with(**filters) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - def test_subnet_list_dhcp(self): - arglist = [ - '--dhcp', - ] - verifylist = [ - ('dhcp', True), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - filters = {'enable_dhcp': True} - - self.network.subnets.assert_called_once_with(**filters) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - def test_subnet_list_no_dhcp(self): - arglist = [ - '--no-dhcp', - ] - verifylist = [ - ('no_dhcp', True), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - filters = {'enable_dhcp': False} - - self.network.subnets.assert_called_once_with(**filters) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - def test_subnet_list_service_type(self): - arglist = [ - '--service-type', 'network:router_gateway', - ] - verifylist = [ - ('service_types', ['network:router_gateway']), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - filters = {'service_types': ['network:router_gateway']} - - self.network.subnets.assert_called_once_with(**filters) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - def test_subnet_list_service_type_multiple(self): - arglist = [ - '--service-type', 'network:router_gateway', - '--service-type', 'network:floatingip_agent_gateway', - ] - verifylist = [ - ('service_types', ['network:router_gateway', - 'network:floatingip_agent_gateway']), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - filters = {'service_types': ['network:router_gateway', - 'network:floatingip_agent_gateway']} - - self.network.subnets.assert_called_once_with(**filters) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - -class TestSetSubnet(TestSubnet): - - _subnet = network_fakes.FakeSubnet.create_one_subnet() - - def setUp(self): - super(TestSetSubnet, self).setUp() - self.network.update_subnet = mock.Mock(return_value=None) - self.network.find_subnet = mock.Mock(return_value=self._subnet) - self.cmd = subnet_v2.SetSubnet(self.app, self.namespace) - - def test_set_this(self): - arglist = [ - "--name", "new_subnet", - "--dhcp", - "--gateway", self._subnet.gateway_ip, - self._subnet.name, - ] - verifylist = [ - ('name', "new_subnet"), - ('dhcp', True), - ('gateway', self._subnet.gateway_ip), - ('subnet', self._subnet.name), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - attrs = { - 'enable_dhcp': True, - 'gateway_ip': self._subnet.gateway_ip, - 'name': "new_subnet", - } - self.network.update_subnet.assert_called_with(self._subnet, **attrs) - self.assertIsNone(result) - - def test_set_that(self): - arglist = [ - "--name", "new_subnet", - "--no-dhcp", - "--gateway", "none", - self._subnet.name, - ] - verifylist = [ - ('name', "new_subnet"), - ('no_dhcp', True), - ('gateway', "none"), - ('subnet', self._subnet.name), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - attrs = { - 'enable_dhcp': False, - 'gateway_ip': None, - 'name': "new_subnet", - } - self.network.update_subnet.assert_called_with(self._subnet, **attrs) - self.assertIsNone(result) - - def test_set_nothing(self): - arglist = [self._subnet.name, ] - verifylist = [('subnet', self._subnet.name)] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - attrs = {} - self.network.update_subnet.assert_called_with(self._subnet, **attrs) - self.assertIsNone(result) - - def test_append_options(self): - _testsubnet = network_fakes.FakeSubnet.create_one_subnet( - {'dns_nameservers': ["10.0.0.1"], - 'service_types': ["network:router_gateway"]}) - self.network.find_subnet = mock.Mock(return_value=_testsubnet) - arglist = [ - '--dns-nameserver', '10.0.0.2', - '--service-type', 'network:floatingip_agent_gateway', - _testsubnet.name, - ] - verifylist = [ - ('dns_nameservers', ['10.0.0.2']), - ('service_types', ['network:floatingip_agent_gateway']), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - attrs = { - 'dns_nameservers': ['10.0.0.2', '10.0.0.1'], - 'service_types': ['network:floatingip_agent_gateway', - 'network:router_gateway'], - } - self.network.update_subnet.assert_called_once_with( - _testsubnet, **attrs) - self.assertIsNone(result) - - -class TestShowSubnet(TestSubnet): - # The subnets to be shown - _subnet = network_fakes.FakeSubnet.create_one_subnet() - - columns = ( - 'allocation_pools', - 'cidr', - 'dns_nameservers', - 'enable_dhcp', - 'gateway_ip', - 'host_routes', - 'id', - 'ip_version', - 'ipv6_address_mode', - 'ipv6_ra_mode', - 'name', - 'network_id', - 'project_id', - 'segment_id', - 'service_types', - 'subnetpool_id', - ) - - data = ( - subnet_v2._format_allocation_pools(_subnet.allocation_pools), - _subnet.cidr, - utils.format_list(_subnet.dns_nameservers), - _subnet.enable_dhcp, - _subnet.gateway_ip, - utils.format_list(_subnet.host_routes), - _subnet.id, - _subnet.ip_version, - _subnet.ipv6_address_mode, - _subnet.ipv6_ra_mode, - _subnet.name, - _subnet.network_id, - _subnet.tenant_id, - _subnet.segment_id, - utils.format_list(_subnet.service_types), - _subnet.subnetpool_id, - ) - - def setUp(self): - super(TestShowSubnet, self).setUp() - - # Get the command object to test - self.cmd = subnet_v2.ShowSubnet(self.app, self.namespace) - - self.network.find_subnet = mock.Mock(return_value=self._subnet) - - def test_show_no_options(self): - arglist = [] - verifylist = [] - - # Testing that a call without the required argument will fail and - # throw a "ParserExecption" - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, arglist, verifylist) - - def test_show_all_options(self): - arglist = [ - self._subnet.name, - ] - verifylist = [ - ('subnet', self._subnet.name), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - columns, data = self.cmd.take_action(parsed_args) - - self.network.find_subnet.assert_called_once_with( - self._subnet.name, ignore_missing=False) - - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - -class TestUnsetSubnet(TestSubnet): - - def setUp(self): - super(TestUnsetSubnet, self).setUp() - self._testsubnet = network_fakes.FakeSubnet.create_one_subnet( - {'dns_nameservers': ['8.8.8.8', - '8.8.8.4'], - 'host_routes': [{'destination': '10.20.20.0/24', - 'nexthop': '10.20.20.1'}, - {'destination': '10.30.30.30/24', - 'nexthop': '10.30.30.1'}], - 'allocation_pools': [{'start': '8.8.8.100', - 'end': '8.8.8.150'}, - {'start': '8.8.8.160', - 'end': '8.8.8.170'}], - 'service_types': ['network:router_gateway', - 'network:floatingip_agent_gateway'], }) - self.network.find_subnet = mock.Mock(return_value=self._testsubnet) - self.network.update_subnet = mock.Mock(return_value=None) - # Get the command object to test - self.cmd = subnet_v2.UnsetSubnet(self.app, self.namespace) - - def test_unset_subnet_params(self): - arglist = [ - '--dns-nameserver', '8.8.8.8', - '--host-route', 'destination=10.30.30.30/24,gateway=10.30.30.1', - '--allocation-pool', 'start=8.8.8.100,end=8.8.8.150', - '--service-type', 'network:router_gateway', - self._testsubnet.name, - ] - verifylist = [ - ('dns_nameservers', ['8.8.8.8']), - ('host_routes', [{ - "destination": "10.30.30.30/24", "gateway": "10.30.30.1"}]), - ('allocation_pools', [{ - 'start': '8.8.8.100', 'end': '8.8.8.150'}]), - ('service_types', ['network:router_gateway']), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - attrs = { - 'dns_nameservers': ['8.8.8.4'], - 'host_routes': [{ - "destination": "10.20.20.0/24", "nexthop": "10.20.20.1"}], - 'allocation_pools': [{'start': '8.8.8.160', 'end': '8.8.8.170'}], - 'service_types': ['network:floatingip_agent_gateway'], - } - self.network.update_subnet.assert_called_once_with( - self._testsubnet, **attrs) - self.assertIsNone(result) - - def test_unset_subnet_wrong_host_routes(self): - arglist = [ - '--dns-nameserver', '8.8.8.8', - '--host-route', 'destination=10.30.30.30/24,gateway=10.30.30.2', - '--allocation-pool', 'start=8.8.8.100,end=8.8.8.150', - self._testsubnet.name, - ] - verifylist = [ - ('dns_nameservers', ['8.8.8.8']), - ('host_routes', [{ - "destination": "10.30.30.30/24", "gateway": "10.30.30.2"}]), - ('allocation_pools', [{ - 'start': '8.8.8.100', 'end': '8.8.8.150'}]), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - self.assertRaises(exceptions.CommandError, - self.cmd.take_action, parsed_args) - - def test_unset_subnet_wrong_allocation_pool(self): - arglist = [ - '--dns-nameserver', '8.8.8.8', - '--host-route', 'destination=10.30.30.30/24,gateway=10.30.30.1', - '--allocation-pool', 'start=8.8.8.100,end=8.8.8.156', - self._testsubnet.name, - ] - verifylist = [ - ('dns_nameservers', ['8.8.8.8']), - ('host_routes', [{ - "destination": "10.30.30.30/24", "gateway": "10.30.30.1"}]), - ('allocation_pools', [{ - 'start': '8.8.8.100', 'end': '8.8.8.156'}]), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - self.assertRaises(exceptions.CommandError, - self.cmd.take_action, parsed_args) - - def test_unset_subnet_wrong_dns_nameservers(self): - arglist = [ - '--dns-nameserver', '8.8.8.1', - '--host-route', 'destination=10.30.30.30/24,gateway=10.30.30.1', - '--allocation-pool', 'start=8.8.8.100,end=8.8.8.150', - self._testsubnet.name, - ] - verifylist = [ - ('dns_nameservers', ['8.8.8.1']), - ('host_routes', [{ - "destination": "10.30.30.30/24", "gateway": "10.30.30.1"}]), - ('allocation_pools', [{ - 'start': '8.8.8.100', 'end': '8.8.8.150'}]), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - self.assertRaises(exceptions.CommandError, - self.cmd.take_action, parsed_args) - - def test_unset_subnet_wrong_service_type(self): - arglist = [ - '--dns-nameserver', '8.8.8.8', - '--host-route', 'destination=10.30.30.30/24,gateway=10.30.30.1', - '--allocation-pool', 'start=8.8.8.100,end=8.8.8.150', - '--service-type', 'network:dhcp', - self._testsubnet.name, - ] - verifylist = [ - ('dns_nameservers', ['8.8.8.8']), - ('host_routes', [{ - "destination": "10.30.30.30/24", "gateway": "10.30.30.1"}]), - ('allocation_pools', [{ - 'start': '8.8.8.100', 'end': '8.8.8.150'}]), - ('service_types', ['network:dhcp']), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - self.assertRaises(exceptions.CommandError, - self.cmd.take_action, parsed_args) diff --git a/openstackclient/tests/network/v2/test_subnet_pool.py b/openstackclient/tests/network/v2/test_subnet_pool.py deleted file mode 100644 index 4cfecef7..00000000 --- a/openstackclient/tests/network/v2/test_subnet_pool.py +++ /dev/null @@ -1,722 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# - -import argparse -import mock -from mock import call - -from osc_lib import exceptions -from osc_lib import utils - -from openstackclient.network.v2 import subnet_pool -from openstackclient.tests.identity.v3 import fakes as identity_fakes_v3 -from openstackclient.tests.network.v2 import fakes as network_fakes -from openstackclient.tests import utils as tests_utils - - -class TestSubnetPool(network_fakes.TestNetworkV2): - - def setUp(self): - super(TestSubnetPool, self).setUp() - - # Get a shortcut to the network client - self.network = self.app.client_manager.network - # Get a shortcut to the ProjectManager Mock - self.projects_mock = self.app.client_manager.identity.projects - # Get a shortcut to the DomainManager Mock - self.domains_mock = self.app.client_manager.identity.domains - - -class TestCreateSubnetPool(TestSubnetPool): - - project = identity_fakes_v3.FakeProject.create_one_project() - domain = identity_fakes_v3.FakeDomain.create_one_domain() - # The new subnet pool to create. - _subnet_pool = network_fakes.FakeSubnetPool.create_one_subnet_pool() - - _address_scope = network_fakes.FakeAddressScope.create_one_address_scope() - - columns = ( - 'address_scope_id', - 'default_prefixlen', - 'default_quota', - 'id', - 'ip_version', - 'is_default', - 'max_prefixlen', - 'min_prefixlen', - 'name', - 'prefixes', - 'project_id', - 'shared', - ) - data = ( - _subnet_pool.address_scope_id, - _subnet_pool.default_prefixlen, - _subnet_pool.default_quota, - _subnet_pool.id, - _subnet_pool.ip_version, - _subnet_pool.is_default, - _subnet_pool.max_prefixlen, - _subnet_pool.min_prefixlen, - _subnet_pool.name, - utils.format_list(_subnet_pool.prefixes), - _subnet_pool.project_id, - _subnet_pool.shared, - ) - - def setUp(self): - super(TestCreateSubnetPool, self).setUp() - - self.network.create_subnet_pool = mock.Mock( - return_value=self._subnet_pool) - - # Get the command object to test - self.cmd = subnet_pool.CreateSubnetPool(self.app, self.namespace) - - self.network.find_address_scope = mock.Mock( - return_value=self._address_scope) - - self.projects_mock.get.return_value = self.project - self.domains_mock.get.return_value = self.domain - - def test_create_no_options(self): - arglist = [] - verifylist = [] - - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_create_no_pool_prefix(self): - """Make sure --pool-prefix is a required argument""" - arglist = [ - self._subnet_pool.name, - ] - verifylist = [ - ('name', self._subnet_pool.name), - ] - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_create_default_options(self): - arglist = [ - '--pool-prefix', '10.0.10.0/24', - self._subnet_pool.name, - ] - verifylist = [ - ('prefixes', ['10.0.10.0/24']), - ('name', self._subnet_pool.name), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = (self.cmd.take_action(parsed_args)) - - self.network.create_subnet_pool.assert_called_once_with(**{ - 'prefixes': ['10.0.10.0/24'], - 'name': self._subnet_pool.name, - }) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - def test_create_prefixlen_options(self): - arglist = [ - '--default-prefix-length', self._subnet_pool.default_prefixlen, - '--max-prefix-length', self._subnet_pool.max_prefixlen, - '--min-prefix-length', self._subnet_pool.min_prefixlen, - '--pool-prefix', '10.0.10.0/24', - self._subnet_pool.name, - ] - verifylist = [ - ('default_prefix_length', - int(self._subnet_pool.default_prefixlen)), - ('max_prefix_length', int(self._subnet_pool.max_prefixlen)), - ('min_prefix_length', int(self._subnet_pool.min_prefixlen)), - ('name', self._subnet_pool.name), - ('prefixes', ['10.0.10.0/24']), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = (self.cmd.take_action(parsed_args)) - - self.network.create_subnet_pool.assert_called_once_with(**{ - 'default_prefixlen': int(self._subnet_pool.default_prefixlen), - 'max_prefixlen': int(self._subnet_pool.max_prefixlen), - 'min_prefixlen': int(self._subnet_pool.min_prefixlen), - 'prefixes': ['10.0.10.0/24'], - 'name': self._subnet_pool.name, - }) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - def test_create_len_negative(self): - arglist = [ - self._subnet_pool.name, - '--min-prefix-length', '-16', - ] - verifylist = [ - ('subnet_pool', self._subnet_pool.name), - ('min_prefix_length', '-16'), - ] - - self.assertRaises(argparse.ArgumentTypeError, self.check_parser, - self.cmd, arglist, verifylist) - - def test_create_project_domain(self): - arglist = [ - '--pool-prefix', '10.0.10.0/24', - "--project", self.project.name, - "--project-domain", self.domain.name, - self._subnet_pool.name, - ] - verifylist = [ - ('prefixes', ['10.0.10.0/24']), - ('project', self.project.name), - ('project_domain', self.domain.name), - ('name', self._subnet_pool.name), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = (self.cmd.take_action(parsed_args)) - - self.network.create_subnet_pool.assert_called_once_with(**{ - 'prefixes': ['10.0.10.0/24'], - 'tenant_id': self.project.id, - 'name': self._subnet_pool.name, - }) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - def test_create_address_scope_option(self): - arglist = [ - '--pool-prefix', '10.0.10.0/24', - '--address-scope', self._address_scope.id, - self._subnet_pool.name, - ] - verifylist = [ - ('prefixes', ['10.0.10.0/24']), - ('address_scope', self._address_scope.id), - ('name', self._subnet_pool.name), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = (self.cmd.take_action(parsed_args)) - - self.network.create_subnet_pool.assert_called_once_with(**{ - 'prefixes': ['10.0.10.0/24'], - 'address_scope_id': self._address_scope.id, - 'name': self._subnet_pool.name, - }) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - def test_create_default_and_shared_options(self): - arglist = [ - '--pool-prefix', '10.0.10.0/24', - '--default', - '--share', - self._subnet_pool.name, - ] - verifylist = [ - ('prefixes', ['10.0.10.0/24']), - ('default', True), - ('share', True), - ('name', self._subnet_pool.name), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = (self.cmd.take_action(parsed_args)) - - self.network.create_subnet_pool.assert_called_once_with(**{ - 'is_default': True, - 'name': self._subnet_pool.name, - 'prefixes': ['10.0.10.0/24'], - 'shared': True, - }) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - -class TestDeleteSubnetPool(TestSubnetPool): - - # The subnet pools to delete. - _subnet_pools = network_fakes.FakeSubnetPool.create_subnet_pools(count=2) - - def setUp(self): - super(TestDeleteSubnetPool, self).setUp() - - self.network.delete_subnet_pool = mock.Mock(return_value=None) - - self.network.find_subnet_pool = ( - network_fakes.FakeSubnetPool.get_subnet_pools(self._subnet_pools) - ) - - # Get the command object to test - self.cmd = subnet_pool.DeleteSubnetPool(self.app, self.namespace) - - def test_subnet_pool_delete(self): - arglist = [ - self._subnet_pools[0].name, - ] - verifylist = [ - ('subnet_pool', [self._subnet_pools[0].name]), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - self.network.delete_subnet_pool.assert_called_once_with( - self._subnet_pools[0]) - self.assertIsNone(result) - - def test_multi_subnet_pools_delete(self): - arglist = [] - verifylist = [] - - for s in self._subnet_pools: - arglist.append(s.name) - verifylist = [ - ('subnet_pool', arglist), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - calls = [] - for s in self._subnet_pools: - calls.append(call(s)) - self.network.delete_subnet_pool.assert_has_calls(calls) - self.assertIsNone(result) - - def test_multi_subnet_pools_delete_with_exception(self): - arglist = [ - self._subnet_pools[0].name, - 'unexist_subnet_pool', - ] - verifylist = [ - ('subnet_pool', - [self._subnet_pools[0].name, 'unexist_subnet_pool']), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - find_mock_result = [self._subnet_pools[0], exceptions.CommandError] - self.network.find_subnet_pool = ( - mock.MagicMock(side_effect=find_mock_result) - ) - - try: - self.cmd.take_action(parsed_args) - self.fail('CommandError should be raised.') - except exceptions.CommandError as e: - self.assertEqual('1 of 2 subnet pools failed to delete.', str(e)) - - self.network.find_subnet_pool.assert_any_call( - self._subnet_pools[0].name, ignore_missing=False) - self.network.find_subnet_pool.assert_any_call( - 'unexist_subnet_pool', ignore_missing=False) - self.network.delete_subnet_pool.assert_called_once_with( - self._subnet_pools[0] - ) - - -class TestListSubnetPool(TestSubnetPool): - # The subnet pools going to be listed up. - _subnet_pools = network_fakes.FakeSubnetPool.create_subnet_pools(count=3) - - columns = ( - 'ID', - 'Name', - 'Prefixes', - ) - columns_long = columns + ( - 'Default Prefix Length', - 'Address Scope', - 'Default Subnet Pool', - 'Shared', - ) - - data = [] - for pool in _subnet_pools: - data.append(( - pool.id, - pool.name, - utils.format_list(pool.prefixes), - )) - - data_long = [] - for pool in _subnet_pools: - data_long.append(( - pool.id, - pool.name, - utils.format_list(pool.prefixes), - pool.default_prefixlen, - pool.address_scope_id, - pool.is_default, - pool.shared, - )) - - def setUp(self): - super(TestListSubnetPool, self).setUp() - - # Get the command object to test - self.cmd = subnet_pool.ListSubnetPool(self.app, self.namespace) - - self.network.subnet_pools = mock.Mock(return_value=self._subnet_pools) - - def test_subnet_pool_list_no_option(self): - arglist = [] - verifylist = [ - ('long', False), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.subnet_pools.assert_called_once_with() - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - def test_subnet_pool_list_long(self): - arglist = [ - '--long', - ] - verifylist = [ - ('long', True), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.subnet_pools.assert_called_once_with() - self.assertEqual(self.columns_long, columns) - self.assertEqual(self.data_long, list(data)) - - -class TestSetSubnetPool(TestSubnetPool): - - # The subnet_pool to set. - _subnet_pool = network_fakes.FakeSubnetPool.create_one_subnet_pool() - - _address_scope = network_fakes.FakeAddressScope.create_one_address_scope() - - def setUp(self): - super(TestSetSubnetPool, self).setUp() - - self.network.update_subnet_pool = mock.Mock(return_value=None) - - self.network.find_subnet_pool = mock.Mock( - return_value=self._subnet_pool) - - self.network.find_address_scope = mock.Mock( - return_value=self._address_scope) - - # Get the command object to test - self.cmd = subnet_pool.SetSubnetPool(self.app, self.namespace) - - def test_set_this(self): - arglist = [ - '--name', 'noob', - '--default-prefix-length', '8', - '--min-prefix-length', '8', - self._subnet_pool.name, - ] - verifylist = [ - ('name', 'noob'), - ('default_prefix_length', 8), - ('min_prefix_length', 8), - ('subnet_pool', self._subnet_pool.name), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - attrs = { - 'name': 'noob', - 'default_prefixlen': 8, - 'min_prefixlen': 8, - } - self.network.update_subnet_pool.assert_called_once_with( - self._subnet_pool, **attrs) - self.assertIsNone(result) - - def test_set_that(self): - arglist = [ - '--pool-prefix', '10.0.1.0/24', - '--pool-prefix', '10.0.2.0/24', - '--max-prefix-length', '16', - self._subnet_pool.name, - ] - verifylist = [ - ('prefixes', ['10.0.1.0/24', '10.0.2.0/24']), - ('max_prefix_length', 16), - ('subnet_pool', self._subnet_pool.name), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - prefixes = ['10.0.1.0/24', '10.0.2.0/24'] - prefixes.extend(self._subnet_pool.prefixes) - attrs = { - 'prefixes': prefixes, - 'max_prefixlen': 16, - } - self.network.update_subnet_pool.assert_called_once_with( - self._subnet_pool, **attrs) - self.assertIsNone(result) - - def test_set_nothing(self): - arglist = [self._subnet_pool.name, ] - verifylist = [('subnet_pool', self._subnet_pool.name), ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - attrs = {} - self.network.update_subnet_pool.assert_called_once_with( - self._subnet_pool, **attrs) - self.assertIsNone(result) - - def test_set_len_negative(self): - arglist = [ - '--max-prefix-length', '-16', - self._subnet_pool.name, - ] - verifylist = [ - ('max_prefix_length', '-16'), - ('subnet_pool', self._subnet_pool.name), - ] - - self.assertRaises(argparse.ArgumentTypeError, self.check_parser, - self.cmd, arglist, verifylist) - - def test_set_address_scope(self): - arglist = [ - '--address-scope', self._address_scope.id, - self._subnet_pool.name, - ] - verifylist = [ - ('address_scope', self._address_scope.id), - ('subnet_pool', self._subnet_pool.name), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - attrs = { - 'address_scope_id': self._address_scope.id, - } - self.network.update_subnet_pool.assert_called_once_with( - self._subnet_pool, **attrs) - self.assertIsNone(result) - - def test_set_no_address_scope(self): - arglist = [ - '--no-address-scope', - self._subnet_pool.name, - ] - verifylist = [ - ('no_address_scope', True), - ('subnet_pool', self._subnet_pool.name), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - attrs = { - 'address_scope_id': None, - } - self.network.update_subnet_pool.assert_called_once_with( - self._subnet_pool, **attrs) - self.assertIsNone(result) - - def test_set_no_address_scope_conflict(self): - arglist = [ - '--address-scope', self._address_scope.id, - '--no-address-scope', - self._subnet_pool.name, - ] - verifylist = [ - ('address_scope', self._address_scope.id), - ('no_address_scope', True), - ('subnet_pool', self._subnet_pool.name), - ] - - # Exclusive arguments will conflict here. - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_set_default(self): - arglist = [ - '--default', - self._subnet_pool.name, - ] - verifylist = [ - ('default', True), - ('subnet_pool', self._subnet_pool.name), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - attrs = { - 'is_default': True - } - self.network.update_subnet_pool.assert_called_once_with( - self._subnet_pool, **attrs) - self.assertIsNone(result) - - def test_set_no_default(self): - arglist = [ - '--no-default', - self._subnet_pool.name, - ] - verifylist = [ - ('no_default', True), - ('subnet_pool', self._subnet_pool.name), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - attrs = { - 'is_default': False, - } - self.network.update_subnet_pool.assert_called_once_with( - self._subnet_pool, **attrs) - self.assertIsNone(result) - - def test_set_no_default_conflict(self): - arglist = [ - '--default', - '--no-default', - self._subnet_pool.name, - ] - verifylist = [ - ('default', True), - ('no_default', True), - ('subnet_pool', self._subnet_pool.name), - ] - - # Exclusive arguments will conflict here. - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - -class TestShowSubnetPool(TestSubnetPool): - - # The subnet_pool to set. - _subnet_pool = network_fakes.FakeSubnetPool.create_one_subnet_pool() - - columns = ( - 'address_scope_id', - 'default_prefixlen', - 'default_quota', - 'id', - 'ip_version', - 'is_default', - 'max_prefixlen', - 'min_prefixlen', - 'name', - 'prefixes', - 'project_id', - 'shared', - ) - - data = ( - _subnet_pool.address_scope_id, - _subnet_pool.default_prefixlen, - _subnet_pool.default_quota, - _subnet_pool.id, - _subnet_pool.ip_version, - _subnet_pool.is_default, - _subnet_pool.max_prefixlen, - _subnet_pool.min_prefixlen, - _subnet_pool.name, - utils.format_list(_subnet_pool.prefixes), - _subnet_pool.tenant_id, - _subnet_pool.shared, - ) - - def setUp(self): - super(TestShowSubnetPool, self).setUp() - - self.network.find_subnet_pool = mock.Mock( - return_value=self._subnet_pool - ) - - # Get the command object to test - self.cmd = subnet_pool.ShowSubnetPool(self.app, self.namespace) - - def test_show_no_options(self): - arglist = [] - verifylist = [] - - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_show_all_options(self): - arglist = [ - self._subnet_pool.name, - ] - verifylist = [ - ('subnet_pool', self._subnet_pool.name), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.network.find_subnet_pool.assert_called_once_with( - self._subnet_pool.name, - ignore_missing=False - ) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - -class TestUnsetSubnetPool(TestSubnetPool): - - def setUp(self): - super(TestUnsetSubnetPool, self).setUp() - self._subnetpool = network_fakes.FakeSubnetPool.create_one_subnet_pool( - {'prefixes': ['10.0.10.0/24', '10.1.10.0/24', - '10.2.10.0/24'], }) - self.network.find_subnet_pool = mock.Mock( - return_value=self._subnetpool) - self.network.update_subnet_pool = mock.Mock(return_value=None) - # Get the command object to test - self.cmd = subnet_pool.UnsetSubnetPool(self.app, self.namespace) - - def test_unset_subnet_pool(self): - arglist = [ - '--pool-prefix', '10.0.10.0/24', - '--pool-prefix', '10.1.10.0/24', - self._subnetpool.name, - ] - verifylist = [('prefixes', ['10.0.10.0/24', '10.1.10.0/24'])] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - attrs = {'prefixes': ['10.2.10.0/24']} - self.network.update_subnet_pool.assert_called_once_with( - self._subnetpool, **attrs) - self.assertIsNone(result) - - def test_unset_subnet_pool_prefix_not_existent(self): - arglist = [ - '--pool-prefix', '10.100.1.1/25', - self._subnetpool.name, - ] - verifylist = [('prefixes', ['10.100.1.1/25'])] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - self.assertRaises(exceptions.CommandError, - self.cmd.take_action, - parsed_args) |
