diff options
Diffstat (limited to 'openstackclient/tests/unit/network')
18 files changed, 9584 insertions, 0 deletions
diff --git a/openstackclient/tests/unit/network/__init__.py b/openstackclient/tests/unit/network/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/openstackclient/tests/unit/network/__init__.py diff --git a/openstackclient/tests/unit/network/test_common.py b/openstackclient/tests/unit/network/test_common.py new file mode 100644 index 00000000..325aad2a --- /dev/null +++ b/openstackclient/tests/unit/network/test_common.py @@ -0,0 +1,174 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import argparse +import mock + +from openstackclient.network import common +from openstackclient.tests.unit import utils + + +def _add_common_argument(parser): + parser.add_argument( + 'common', + metavar='<common>', + help='Common argument', + ) + return parser + + +def _add_network_argument(parser): + parser.add_argument( + 'network', + metavar='<network>', + help='Network argument', + ) + return parser + + +def _add_compute_argument(parser): + parser.add_argument( + 'compute', + metavar='<compute>', + help='Compute argument', + ) + return parser + + +class FakeNetworkAndComputeCommand(common.NetworkAndComputeCommand): + + def update_parser_common(self, parser): + return _add_common_argument(parser) + + def update_parser_network(self, parser): + return _add_network_argument(parser) + + def update_parser_compute(self, parser): + return _add_compute_argument(parser) + + def take_action_network(self, client, parsed_args): + return client.network_action(parsed_args) + + def take_action_compute(self, client, parsed_args): + return client.compute_action(parsed_args) + + +class FakeNetworkAndComputeLister(common.NetworkAndComputeLister): + + def update_parser_common(self, parser): + return _add_common_argument(parser) + + def update_parser_network(self, parser): + return _add_network_argument(parser) + + def update_parser_compute(self, parser): + return _add_compute_argument(parser) + + def take_action_network(self, client, parsed_args): + return client.network_action(parsed_args) + + def take_action_compute(self, client, parsed_args): + return client.compute_action(parsed_args) + + +class FakeNetworkAndComputeShowOne(common.NetworkAndComputeShowOne): + + def update_parser_common(self, parser): + return _add_common_argument(parser) + + def update_parser_network(self, parser): + return _add_network_argument(parser) + + def update_parser_compute(self, parser): + return _add_compute_argument(parser) + + def take_action_network(self, client, parsed_args): + return client.network_action(parsed_args) + + def take_action_compute(self, client, parsed_args): + return client.compute_action(parsed_args) + + +class TestNetworkAndCompute(utils.TestCommand): + + def setUp(self): + super(TestNetworkAndCompute, self).setUp() + + self.namespace = argparse.Namespace() + + # Create network client mocks. + self.app.client_manager.network = mock.Mock() + self.network = self.app.client_manager.network + self.network.network_action = mock.Mock( + return_value='take_action_network') + + # Create compute client mocks. + self.app.client_manager.compute = mock.Mock() + self.compute = self.app.client_manager.compute + self.compute.compute_action = mock.Mock( + return_value='take_action_compute') + + # Subclasses can override the command object to test. + self.cmd = FakeNetworkAndComputeCommand(self.app, self.namespace) + + def test_take_action_network(self): + arglist = [ + 'common', + 'network' + ] + verifylist = [ + ('common', 'common'), + ('network', 'network') + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + self.network.network_action.assert_called_with(parsed_args) + self.assertEqual('take_action_network', result) + + def test_take_action_compute(self): + arglist = [ + 'common', + 'compute' + ] + verifylist = [ + ('common', 'common'), + ('compute', 'compute') + ] + + self.app.client_manager.network_endpoint_enabled = False + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + self.compute.compute_action.assert_called_with(parsed_args) + self.assertEqual('take_action_compute', result) + + +class TestNetworkAndComputeCommand(TestNetworkAndCompute): + + def setUp(self): + super(TestNetworkAndComputeCommand, self).setUp() + self.cmd = FakeNetworkAndComputeCommand(self.app, self.namespace) + + +class TestNetworkAndComputeLister(TestNetworkAndCompute): + + def setUp(self): + super(TestNetworkAndComputeLister, self).setUp() + self.cmd = FakeNetworkAndComputeLister(self.app, self.namespace) + + +class TestNetworkAndComputeShowOne(TestNetworkAndCompute): + + def setUp(self): + super(TestNetworkAndComputeShowOne, self).setUp() + self.cmd = FakeNetworkAndComputeShowOne(self.app, self.namespace) diff --git a/openstackclient/tests/unit/network/v2/__init__.py b/openstackclient/tests/unit/network/v2/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/openstackclient/tests/unit/network/v2/__init__.py diff --git a/openstackclient/tests/unit/network/v2/fakes.py b/openstackclient/tests/unit/network/v2/fakes.py new file mode 100644 index 00000000..ed30bad3 --- /dev/null +++ b/openstackclient/tests/unit/network/v2/fakes.py @@ -0,0 +1,1100 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import argparse +import copy +import mock +import uuid + +from openstackclient.tests.unit import fakes +from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes_v3 +from openstackclient.tests.unit import utils + + +QUOTA = { + "subnet": 10, + "network": 10, + "floatingip": 50, + "subnetpool": -1, + "security_group_rule": 100, + "security_group": 10, + "router": 10, + "rbac_policy": -1, + "port": 50, + "vip": 10, + "member": 10, + "health_monitor": 10, +} + + +class FakeNetworkV2Client(object): + + def __init__(self, **kwargs): + self.extensions = mock.Mock() + self.extensions.resource_class = fakes.FakeResource(None, {}) + + +class TestNetworkV2(utils.TestCommand): + + def setUp(self): + super(TestNetworkV2, self).setUp() + + self.namespace = argparse.Namespace() + + self.app.client_manager.session = mock.Mock() + + self.app.client_manager.network = FakeNetworkV2Client( + endpoint=fakes.AUTH_URL, + token=fakes.AUTH_TOKEN, + ) + + self.app.client_manager.identity = ( + identity_fakes_v3.FakeIdentityv3Client( + endpoint=fakes.AUTH_URL, + token=fakes.AUTH_TOKEN, + ) + ) + + +class FakeAddressScope(object): + """Fake one or more address scopes.""" + + @staticmethod + def create_one_address_scope(attrs=None): + """Create a fake address scope. + + :param Dictionary attrs: + A dictionary with all attributes + :return: + A FakeResource object with name, id, etc. + """ + attrs = attrs or {} + + # Set default attributes. + address_scope_attrs = { + 'name': 'address-scope-name-' + uuid.uuid4().hex, + 'id': 'address-scope-id-' + uuid.uuid4().hex, + 'tenant_id': 'project-id-' + uuid.uuid4().hex, + 'shared': False, + 'ip_version': 4, + } + + # Overwrite default attributes. + address_scope_attrs.update(attrs) + + address_scope = fakes.FakeResource( + info=copy.deepcopy(address_scope_attrs), + loaded=True) + + # Set attributes with special mapping in OpenStack SDK. + address_scope.project_id = address_scope_attrs['tenant_id'] + + return address_scope + + @staticmethod + def create_address_scopes(attrs=None, count=2): + """Create multiple fake address scopes. + + :param Dictionary attrs: + A dictionary with all attributes + :param int count: + The number of address scopes to fake + :return: + A list of FakeResource objects faking the address scopes + """ + address_scopes = [] + for i in range(0, count): + address_scopes.append( + FakeAddressScope.create_one_address_scope(attrs)) + + return address_scopes + + @staticmethod + def get_address_scopes(address_scopes=None, count=2): + """Get an iterable MagicMock object with a list of faked address scopes. + + If address scopes list is provided, then initialize the Mock object + with the list. Otherwise create one. + + :param List address scopes: + A list of FakeResource objects faking address scopes + :param int count: + The number of address scopes to fake + :return: + An iterable Mock object with side_effect set to a list of faked + address scopes + """ + if address_scopes is None: + address_scopes = FakeAddressScope.create_address_scopes(count) + return mock.MagicMock(side_effect=address_scopes) + + +class FakeAvailabilityZone(object): + """Fake one or more network availability zones (AZs).""" + + @staticmethod + def create_one_availability_zone(attrs=None): + """Create a fake AZ. + + :param Dictionary attrs: + A dictionary with all attributes + :return: + A FakeResource object with name, state, etc. + """ + attrs = attrs or {} + + # Set default attributes. + availability_zone = { + 'name': uuid.uuid4().hex, + 'state': 'available', + 'resource': 'network', + } + + # Overwrite default attributes. + availability_zone.update(attrs) + + availability_zone = fakes.FakeResource( + info=copy.deepcopy(availability_zone), + loaded=True) + return availability_zone + + @staticmethod + def create_availability_zones(attrs=None, count=2): + """Create multiple fake AZs. + + :param Dictionary attrs: + A dictionary with all attributes + :param int count: + The number of AZs to fake + :return: + A list of FakeResource objects faking the AZs + """ + availability_zones = [] + for i in range(0, count): + availability_zone = \ + FakeAvailabilityZone.create_one_availability_zone(attrs) + availability_zones.append(availability_zone) + + return availability_zones + + +class FakeIPAvailability(object): + """Fake one or more network ip availabilities.""" + + @staticmethod + def create_one_ip_availability(): + """Create a fake list with ip availability stats of a network. + + :return: + A FakeResource object with network_name, network_id, etc. + """ + + # Set default attributes. + network_ip_availability = { + 'network_id': 'network-id-' + uuid.uuid4().hex, + 'network_name': 'network-name-' + uuid.uuid4().hex, + 'tenant_id': '', + 'subnet_ip_availability': [], + 'total_ips': 254, + 'used_ips': 6, + } + + network_ip_availability = fakes.FakeResource( + info=copy.deepcopy(network_ip_availability), + loaded=True) + return network_ip_availability + + @staticmethod + def create_ip_availability(count=2): + """Create fake list of ip availability stats of multiple networks. + + :param int count: + The number of networks to fake + :return: + A list of FakeResource objects faking network ip availability stats + """ + network_ip_availabilities = [] + for i in range(0, count): + network_ip_availability = \ + FakeIPAvailability.create_one_ip_availability() + network_ip_availabilities.append(network_ip_availability) + + return network_ip_availabilities + + +class FakeExtension(object): + """Fake one or more extension.""" + + @staticmethod + def create_one_extension(attrs=None): + """Create a fake extension. + + :param Dictionary attrs: + A dictionary with all attributes + :return: + A FakeResource object with name, namespace, etc. + """ + attrs = attrs or {} + + # Set default attributes. + extension_info = { + 'name': 'name-' + uuid.uuid4().hex, + 'namespace': 'http://docs.openstack.org/network/', + 'description': 'description-' + uuid.uuid4().hex, + 'updated': '2013-07-09T12:00:0-00:00', + 'alias': 'Dystopian', + 'links': '[{"href":''"https://github.com/os/network", "type"}]', + } + + # Overwrite default attributes. + extension_info.update(attrs) + + extension = fakes.FakeResource( + info=copy.deepcopy(extension_info), + loaded=True) + return extension + + +class FakeNetwork(object): + """Fake one or more networks.""" + + @staticmethod + def create_one_network(attrs=None): + """Create a fake network. + + :param Dictionary attrs: + A dictionary with all attributes + :return: + A FakeResource object, with id, name, etc. + """ + attrs = attrs or {} + + # Set default attributes. + network_attrs = { + 'id': 'network-id-' + uuid.uuid4().hex, + 'name': 'network-name-' + uuid.uuid4().hex, + 'status': 'ACTIVE', + 'tenant_id': 'project-id-' + uuid.uuid4().hex, + 'admin_state_up': True, + 'shared': False, + 'subnets': ['a', 'b'], + 'provider_network_type': 'vlan', + 'router:external': True, + 'availability_zones': [], + 'availability_zone_hints': [], + 'is_default': False, + 'port_security_enabled': True, + } + + # Overwrite default attributes. + network_attrs.update(attrs) + + network = fakes.FakeResource(info=copy.deepcopy(network_attrs), + loaded=True) + + # Set attributes with special mapping in OpenStack SDK. + network.project_id = network_attrs['tenant_id'] + network.is_router_external = network_attrs['router:external'] + network.is_port_security_enabled = \ + network_attrs['port_security_enabled'] + + return network + + @staticmethod + def create_networks(attrs=None, count=2): + """Create multiple fake networks. + + :param Dictionary attrs: + A dictionary with all attributes + :param int count: + The number of networks to fake + :return: + A list of FakeResource objects faking the networks + """ + networks = [] + for i in range(0, count): + networks.append(FakeNetwork.create_one_network(attrs)) + + return networks + + @staticmethod + def get_networks(networks=None, count=2): + """Get an iterable MagicMock object with a list of faked networks. + + If networks list is provided, then initialize the Mock object with the + list. Otherwise create one. + + :param List networks: + A list of FakeResource objects faking networks + :param int count: + The number of networks to fake + :return: + An iterable Mock object with side_effect set to a list of faked + networks + """ + if networks is None: + networks = FakeNetwork.create_networks(count) + return mock.MagicMock(side_effect=networks) + + +class FakeNetworkSegment(object): + """Fake one or more network segments.""" + + @staticmethod + def create_one_network_segment(attrs=None): + """Create a fake network segment. + + :param Dictionary attrs: + A dictionary with all attributes + :return: + A FakeResource object faking the network segment + """ + attrs = attrs or {} + + # Set default attributes. + network_segment_attrs = { + 'id': 'network-segment-id-' + uuid.uuid4().hex, + 'network_id': 'network-id-' + uuid.uuid4().hex, + 'network_type': 'vlan', + 'physical_network': 'physical-network-name-' + uuid.uuid4().hex, + 'segmentation_id': 1024, + } + + # Overwrite default attributes. + network_segment_attrs.update(attrs) + + network_segment = fakes.FakeResource( + info=copy.deepcopy(network_segment_attrs), + loaded=True + ) + + return network_segment + + @staticmethod + def create_network_segments(attrs=None, count=2): + """Create multiple fake network segments. + + :param Dictionary attrs: + A dictionary with all attributes + :param int count: + The number of network segments to fake + :return: + A list of FakeResource objects faking the network segments + """ + network_segments = [] + for i in range(0, count): + network_segments.append( + FakeNetworkSegment.create_one_network_segment(attrs) + ) + return network_segments + + +class FakePort(object): + """Fake one or more ports.""" + + @staticmethod + def create_one_port(attrs=None): + """Create a fake port. + + :param Dictionary attrs: + A dictionary with all attributes + :return: + A FakeResource object, with id, name, etc. + """ + attrs = attrs or {} + + # Set default attributes. + port_attrs = { + 'admin_state_up': True, + 'allowed_address_pairs': [{}], + 'binding:host_id': 'binding-host-id-' + uuid.uuid4().hex, + 'binding:profile': {}, + 'binding:vif_details': {}, + 'binding:vif_type': 'ovs', + 'binding:vnic_type': 'normal', + 'device_id': 'device-id-' + uuid.uuid4().hex, + 'device_owner': 'compute:nova', + 'dns_assignment': [{}], + 'dns_name': 'dns-name-' + uuid.uuid4().hex, + 'extra_dhcp_opts': [{}], + 'fixed_ips': [{}], + 'id': 'port-id-' + uuid.uuid4().hex, + 'mac_address': 'fa:16:3e:a9:4e:72', + 'name': 'port-name-' + uuid.uuid4().hex, + 'network_id': 'network-id-' + uuid.uuid4().hex, + 'port_security_enabled': True, + 'security_groups': [], + 'status': 'ACTIVE', + 'tenant_id': 'project-id-' + uuid.uuid4().hex, + } + + # Overwrite default attributes. + port_attrs.update(attrs) + + port = fakes.FakeResource(info=copy.deepcopy(port_attrs), + loaded=True) + + # Set attributes with special mappings in OpenStack SDK. + port.project_id = port_attrs['tenant_id'] + port.binding_host_id = port_attrs['binding:host_id'] + port.binding_profile = port_attrs['binding:profile'] + port.binding_vif_details = port_attrs['binding:vif_details'] + port.binding_vif_type = port_attrs['binding:vif_type'] + port.binding_vnic_type = port_attrs['binding:vnic_type'] + + return port + + @staticmethod + def create_ports(attrs=None, count=2): + """Create multiple fake ports. + + :param Dictionary attrs: + A dictionary with all attributes + :param int count: + The number of ports to fake + :return: + A list of FakeResource objects faking the ports + """ + ports = [] + for i in range(0, count): + ports.append(FakePort.create_one_port(attrs)) + + return ports + + @staticmethod + def get_ports(ports=None, count=2): + """Get an iterable MagicMock object with a list of faked ports. + + If ports list is provided, then initialize the Mock object with the + list. Otherwise create one. + + :param List ports: + A list of FakeResource objects faking ports + :param int count: + The number of ports to fake + :return: + An iterable Mock object with side_effect set to a list of faked + ports + """ + if ports is None: + ports = FakePort.create_ports(count) + return mock.MagicMock(side_effect=ports) + + +class FakeNetworkAgent(object): + """Fake one or more network agents.""" + + @staticmethod + def create_one_network_agent(attrs=None): + """Create a fake network agent + + :param Dictionary attrs: + A dictionary with all attributes + :return: + A FakeResource object, with id, agent_type, and so on. + """ + attrs = attrs or {} + + # Set default attributes + agent_attrs = { + 'id': 'agent-id-' + uuid.uuid4().hex, + 'agent_type': 'agent-type-' + uuid.uuid4().hex, + 'host': 'host-' + uuid.uuid4().hex, + 'availability_zone': 'zone-' + uuid.uuid4().hex, + 'alive': True, + 'admin_state_up': True, + 'binary': 'binary-' + uuid.uuid4().hex, + 'configurations': {'subnet': 2, 'networks': 1}, + } + agent_attrs.update(attrs) + agent = fakes.FakeResource(info=copy.deepcopy(agent_attrs), + loaded=True) + return agent + + @staticmethod + def create_network_agents(attrs=None, count=2): + """Create multiple fake network agents. + + :param Dictionary attrs: + A dictionary with all attributes + :param int count: + The number of network agents to fake + :return: + A list of FakeResource objects faking the network agents + """ + agents = [] + for i in range(0, count): + agents.append(FakeNetworkAgent.create_one_network_agent(attrs)) + + return agents + + @staticmethod + def get_network_agents(agents=None, count=2): + """Get an iterable MagicMock object with a list of faked network agents. + + If network agents list is provided, then initialize the Mock object + with the list. Otherwise create one. + + :param List agents: + A list of FakeResource objects faking network agents + :param int count: + The number of network agents to fake + :return: + An iterable Mock object with side_effect set to a list of faked + network agents + """ + if agents is None: + agents = FakeNetworkAgent.create_network_agents(count) + return mock.MagicMock(side_effect=agents) + + +class FakeNetworkRBAC(object): + """Fake one or more network rbac policies.""" + + @staticmethod + def create_one_network_rbac(attrs=None): + """Create a fake network rbac + + :param Dictionary attrs: + A dictionary with all attributes + :return: + A FakeResource object, with id, action, target_tenant, + tenant_id, type + """ + attrs = attrs or {} + + # Set default attributes + rbac_attrs = { + 'id': 'rbac-id-' + uuid.uuid4().hex, + 'object_type': 'network', + 'object_id': 'object-id-' + uuid.uuid4().hex, + 'action': 'access_as_shared', + 'target_tenant': 'target-tenant-' + uuid.uuid4().hex, + 'tenant_id': 'tenant-id-' + uuid.uuid4().hex, + } + rbac_attrs.update(attrs) + rbac = fakes.FakeResource(info=copy.deepcopy(rbac_attrs), + loaded=True) + # Set attributes with special mapping in OpenStack SDK. + rbac.project_id = rbac_attrs['tenant_id'] + rbac.target_project_id = rbac_attrs['target_tenant'] + return rbac + + @staticmethod + def create_network_rbacs(attrs=None, count=2): + """Create multiple fake network rbac policies. + + :param Dictionary attrs: + A dictionary with all attributes + :param int count: + The number of rbac policies to fake + :return: + A list of FakeResource objects faking the rbac policies + """ + rbac_policies = [] + for i in range(0, count): + rbac_policies.append(FakeNetworkRBAC. + create_one_network_rbac(attrs)) + + return rbac_policies + + @staticmethod + def get_network_rbacs(rbac_policies=None, count=2): + """Get an iterable MagicMock object with a list of faked rbac policies. + + If rbac policies list is provided, then initialize the Mock object + with the list. Otherwise create one. + + :param List rbac_policies: + A list of FakeResource objects faking rbac policies + :param int count: + The number of rbac policies to fake + :return: + An iterable Mock object with side_effect set to a list of faked + rbac policies + """ + if rbac_policies is None: + rbac_policies = FakeNetworkRBAC.create_network_rbacs(count) + return mock.MagicMock(side_effect=rbac_policies) + + +class FakeRouter(object): + """Fake one or more routers.""" + + @staticmethod + def create_one_router(attrs=None): + """Create a fake router. + + :param Dictionary attrs: + A dictionary with all attributes + :return: + A FakeResource object, with id, name, admin_state_up, + status, tenant_id + """ + attrs = attrs or {} + + # Set default attributes. + router_attrs = { + 'id': 'router-id-' + uuid.uuid4().hex, + 'name': 'router-name-' + uuid.uuid4().hex, + 'status': 'ACTIVE', + 'admin_state_up': True, + 'distributed': False, + 'ha': False, + 'tenant_id': 'project-id-' + uuid.uuid4().hex, + 'routes': [], + 'external_gateway_info': {}, + 'availability_zone_hints': [], + 'availability_zones': [], + } + + # Overwrite default attributes. + router_attrs.update(attrs) + + router = fakes.FakeResource(info=copy.deepcopy(router_attrs), + loaded=True) + + # Set attributes with special mapping in OpenStack SDK. + router.project_id = router_attrs['tenant_id'] + + return router + + @staticmethod + def create_routers(attrs=None, count=2): + """Create multiple fake routers. + + :param Dictionary attrs: + A dictionary with all attributes + :param int count: + The number of routers to fake + :return: + A list of FakeResource objects faking the routers + """ + routers = [] + for i in range(0, count): + routers.append(FakeRouter.create_one_router(attrs)) + + return routers + + @staticmethod + def get_routers(routers=None, count=2): + """Get an iterable MagicMock object with a list of faked routers. + + If routers list is provided, then initialize the Mock object with the + list. Otherwise create one. + + :param List routers: + A list of FakeResource objects faking routers + :param int count: + The number of routers to fake + :return: + An iterable Mock object with side_effect set to a list of faked + routers + """ + if routers is None: + routers = FakeRouter.create_routers(count) + return mock.MagicMock(side_effect=routers) + + +class FakeSecurityGroup(object): + """Fake one or more security groups.""" + + @staticmethod + def create_one_security_group(attrs=None): + """Create a fake security group. + + :param Dictionary attrs: + A dictionary with all attributes + :return: + A FakeResource object, with id, name, etc. + """ + attrs = attrs or {} + + # Set default attributes. + security_group_attrs = { + 'id': 'security-group-id-' + uuid.uuid4().hex, + 'name': 'security-group-name-' + uuid.uuid4().hex, + 'description': 'security-group-description-' + uuid.uuid4().hex, + 'tenant_id': 'project-id-' + uuid.uuid4().hex, + 'security_group_rules': [], + } + + # Overwrite default attributes. + security_group_attrs.update(attrs) + + security_group = fakes.FakeResource( + info=copy.deepcopy(security_group_attrs), + loaded=True) + + # Set attributes with special mapping in OpenStack SDK. + security_group.project_id = security_group_attrs['tenant_id'] + + return security_group + + @staticmethod + def create_security_groups(attrs=None, count=2): + """Create multiple fake security groups. + + :param Dictionary attrs: + A dictionary with all attributes + :param int count: + The number of security groups to fake + :return: + A list of FakeResource objects faking the security groups + """ + security_groups = [] + for i in range(0, count): + security_groups.append( + FakeSecurityGroup.create_one_security_group(attrs)) + + return security_groups + + @staticmethod + def get_security_groups(security_groups=None, count=2): + """Get an iterable MagicMock object with a list of faked security groups. + + If security groups list is provided, then initialize the Mock object + with the list. Otherwise create one. + + :param List security groups: + A list of FakeResource objects faking security groups + :param int count: + The number of security groups to fake + :return: + An iterable Mock object with side_effect set to a list of faked + security groups + """ + if security_groups is None: + security_groups = FakeSecurityGroup.create_security_groups(count) + return mock.MagicMock(side_effect=security_groups) + + +class FakeSecurityGroupRule(object): + """Fake one or more security group rules.""" + + @staticmethod + def create_one_security_group_rule(attrs=None): + """Create a fake security group rule. + + :param Dictionary attrs: + A dictionary with all attributes + :return: + A FakeResource object, with id, etc. + """ + attrs = attrs or {} + + # Set default attributes. + security_group_rule_attrs = { + 'direction': 'ingress', + 'ethertype': 'IPv4', + 'id': 'security-group-rule-id-' + uuid.uuid4().hex, + 'port_range_max': None, + 'port_range_min': None, + 'protocol': 'tcp', + 'remote_group_id': None, + 'remote_ip_prefix': '0.0.0.0/0', + 'security_group_id': 'security-group-id-' + uuid.uuid4().hex, + 'tenant_id': 'project-id-' + uuid.uuid4().hex, + } + + # Overwrite default attributes. + security_group_rule_attrs.update(attrs) + + security_group_rule = fakes.FakeResource( + info=copy.deepcopy(security_group_rule_attrs), + loaded=True) + + # Set attributes with special mapping in OpenStack SDK. + security_group_rule.project_id = security_group_rule_attrs['tenant_id'] + + return security_group_rule + + @staticmethod + def create_security_group_rules(attrs=None, count=2): + """Create multiple fake security group rules. + + :param Dictionary attrs: + A dictionary with all attributes + :param int count: + The number of security group rules to fake + :return: + A list of FakeResource objects faking the security group rules + """ + security_group_rules = [] + for i in range(0, count): + security_group_rules.append( + FakeSecurityGroupRule.create_one_security_group_rule(attrs)) + + return security_group_rules + + @staticmethod + def get_security_group_rules(security_group_rules=None, count=2): + """Get an iterable MagicMock object with a list of faked security group rules. + + If security group rules list is provided, then initialize the Mock + object with the list. Otherwise create one. + + :param List security group rules: + A list of FakeResource objects faking security group rules + :param int count: + The number of security group rules to fake + :return: + An iterable Mock object with side_effect set to a list of faked + security group rules + """ + if security_group_rules is None: + security_group_rules = ( + FakeSecurityGroupRule.create_security_group_rules(count)) + return mock.MagicMock(side_effect=security_group_rules) + + +class FakeSubnet(object): + """Fake one or more subnets.""" + + @staticmethod + def create_one_subnet(attrs=None): + """Create a fake subnet. + + :param Dictionary attrs: + A dictionary with all attributes + :return: + A FakeResource object faking the subnet + """ + attrs = attrs or {} + + # Set default attributes. + project_id = 'project-id-' + uuid.uuid4().hex + subnet_attrs = { + 'id': 'subnet-id-' + uuid.uuid4().hex, + 'name': 'subnet-name-' + uuid.uuid4().hex, + 'network_id': 'network-id-' + uuid.uuid4().hex, + 'cidr': '10.10.10.0/24', + 'tenant_id': project_id, + 'enable_dhcp': True, + 'dns_nameservers': [], + 'allocation_pools': [], + 'host_routes': [], + 'ip_version': 4, + 'gateway_ip': '10.10.10.1', + 'ipv6_address_mode': None, + 'ipv6_ra_mode': None, + 'segment_id': None, + 'service_types': [], + 'subnetpool_id': None, + } + + # Overwrite default attributes. + subnet_attrs.update(attrs) + + subnet = fakes.FakeResource(info=copy.deepcopy(subnet_attrs), + loaded=True) + + # Set attributes with special mappings in OpenStack SDK. + subnet.project_id = subnet_attrs['tenant_id'] + + return subnet + + @staticmethod + def create_subnets(attrs=None, count=2): + """Create multiple fake subnets. + + :param Dictionary attrs: + A dictionary with all attributes + :param int count: + The number of subnets to fake + :return: + A list of FakeResource objects faking the subnets + """ + subnets = [] + for i in range(0, count): + subnets.append(FakeSubnet.create_one_subnet(attrs)) + + return subnets + + @staticmethod + def get_subnets(subnets=None, count=2): + """Get an iterable MagicMock object with a list of faked subnets. + + If subnets list is provided, then initialize the Mock object + with the list. Otherwise create one. + + :param List subnets: + A list of FakeResource objects faking subnets + :param int count: + The number of subnets to fake + :return: + An iterable Mock object with side_effect set to a list of faked + subnets + """ + if subnets is None: + subnets = FakeSubnet.create_subnets(count) + return mock.MagicMock(side_effect=subnets) + + +class FakeFloatingIP(object): + """Fake one or more floating ip.""" + + @staticmethod + def create_one_floating_ip(attrs=None): + """Create a fake floating ip. + + :param Dictionary attrs: + A dictionary with all attributes + :return: + A FakeResource object, with id, ip, and so on + """ + attrs = attrs or {} + + # Set default attributes. + floating_ip_attrs = { + 'id': 'floating-ip-id-' + uuid.uuid4().hex, + 'floating_ip_address': '1.0.9.0', + 'fixed_ip_address': '2.0.9.0', + 'dns_domain': None, + 'dns_name': None, + 'status': 'DOWN', + 'floating_network_id': 'network-id-' + uuid.uuid4().hex, + 'router_id': 'router-id-' + uuid.uuid4().hex, + 'port_id': 'port-id-' + uuid.uuid4().hex, + 'tenant_id': 'project-id-' + uuid.uuid4().hex, + } + + # Overwrite default attributes. + floating_ip_attrs.update(attrs) + + floating_ip = fakes.FakeResource( + info=copy.deepcopy(floating_ip_attrs), + loaded=True + ) + + # Set attributes with special mappings in OpenStack SDK. + floating_ip.project_id = floating_ip_attrs['tenant_id'] + + return floating_ip + + @staticmethod + def create_floating_ips(attrs=None, count=2): + """Create multiple fake floating ips. + + :param Dictionary attrs: + A dictionary with all attributes + :param int count: + The number of floating ips to fake + :return: + A list of FakeResource objects faking the floating ips + """ + floating_ips = [] + for i in range(0, count): + floating_ips.append(FakeFloatingIP.create_one_floating_ip(attrs)) + return floating_ips + + @staticmethod + def get_floating_ips(floating_ips=None, count=2): + """Get an iterable MagicMock object with a list of faked floating ips. + + If floating_ips list is provided, then initialize the Mock object + with the list. Otherwise create one. + + :param List floating ips: + A list of FakeResource objects faking floating ips + :param int count: + The number of floating ips to fake + :return: + An iterable Mock object with side_effect set to a list of faked + floating ips + """ + if floating_ips is None: + floating_ips = FakeFloatingIP.create_floating_ips(count) + return mock.MagicMock(side_effect=floating_ips) + + +class FakeSubnetPool(object): + """Fake one or more subnet pools.""" + + @staticmethod + def create_one_subnet_pool(attrs=None): + """Create a fake subnet pool. + + :param Dictionary attrs: + A dictionary with all attributes + :return: + A FakeResource object faking the subnet pool + """ + attrs = attrs or {} + + # Set default attributes. + subnet_pool_attrs = { + 'id': 'subnet-pool-id-' + uuid.uuid4().hex, + 'name': 'subnet-pool-name-' + uuid.uuid4().hex, + 'prefixes': ['10.0.0.0/24', '10.1.0.0/24'], + 'default_prefixlen': '8', + 'address_scope_id': 'address-scope-id-' + uuid.uuid4().hex, + 'tenant_id': 'project-id-' + uuid.uuid4().hex, + 'is_default': False, + 'shared': False, + 'max_prefixlen': '32', + 'min_prefixlen': '8', + 'default_quota': None, + 'ip_version': '4', + } + + # Overwrite default attributes. + subnet_pool_attrs.update(attrs) + + subnet_pool = fakes.FakeResource( + info=copy.deepcopy(subnet_pool_attrs), + loaded=True + ) + + # Set attributes with special mapping in OpenStack SDK. + subnet_pool.project_id = subnet_pool_attrs['tenant_id'] + + return subnet_pool + + @staticmethod + def create_subnet_pools(attrs=None, count=2): + """Create multiple fake subnet pools. + + :param Dictionary attrs: + A dictionary with all attributes + :param int count: + The number of subnet pools to fake + :return: + A list of FakeResource objects faking the subnet pools + """ + subnet_pools = [] + for i in range(0, count): + subnet_pools.append( + FakeSubnetPool.create_one_subnet_pool(attrs) + ) + + return subnet_pools + + @staticmethod + def get_subnet_pools(subnet_pools=None, count=2): + """Get an iterable MagicMock object with a list of faked subnet pools. + + If subnet_pools list is provided, then initialize the Mock object + with the list. Otherwise create one. + + :param List subnet pools: + A list of FakeResource objects faking subnet pools + :param int count: + The number of subnet pools to fake + :return: + An iterable Mock object with side_effect set to a list of faked + subnet pools + """ + if subnet_pools is None: + subnet_pools = FakeSubnetPool.create_subnet_pools(count) + return mock.MagicMock(side_effect=subnet_pools) diff --git a/openstackclient/tests/unit/network/v2/test_address_scope.py b/openstackclient/tests/unit/network/v2/test_address_scope.py new file mode 100644 index 00000000..6d3f4011 --- /dev/null +++ b/openstackclient/tests/unit/network/v2/test_address_scope.py @@ -0,0 +1,399 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import mock +from mock import call + +from osc_lib import exceptions + +from openstackclient.network.v2 import address_scope +from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes_v3 +from openstackclient.tests.unit.network.v2 import fakes as network_fakes +from openstackclient.tests.unit import utils as tests_utils + + +class TestAddressScope(network_fakes.TestNetworkV2): + + def setUp(self): + super(TestAddressScope, self).setUp() + + # Get a shortcut to the network client + self.network = self.app.client_manager.network + # Get a shortcut to the ProjectManager Mock + self.projects_mock = self.app.client_manager.identity.projects + # Get a shortcut to the DomainManager Mock + self.domains_mock = self.app.client_manager.identity.domains + + +class TestCreateAddressScope(TestAddressScope): + + project = identity_fakes_v3.FakeProject.create_one_project() + domain = identity_fakes_v3.FakeDomain.create_one_domain() + # The new address scope created. + new_address_scope = ( + network_fakes.FakeAddressScope.create_one_address_scope( + attrs={ + 'tenant_id': project.id, + } + )) + columns = ( + 'id', + 'ip_version', + 'name', + 'project_id', + 'shared' + ) + data = ( + new_address_scope.id, + new_address_scope.ip_version, + new_address_scope.name, + new_address_scope.project_id, + new_address_scope.shared, + ) + + def setUp(self): + super(TestCreateAddressScope, self).setUp() + self.network.create_address_scope = mock.Mock( + return_value=self.new_address_scope) + + # Get the command object to test + self.cmd = address_scope.CreateAddressScope(self.app, self.namespace) + + self.projects_mock.get.return_value = self.project + self.domains_mock.get.return_value = self.domain + + def test_create_no_options(self): + arglist = [] + verifylist = [] + + # Missing required args should bail here + self.assertRaises(tests_utils.ParserException, self.check_parser, + self.cmd, arglist, verifylist) + + def test_create_default_options(self): + arglist = [ + self.new_address_scope.name, + ] + verifylist = [ + ('project', None), + ('ip_version', self.new_address_scope.ip_version), + ('name', self.new_address_scope.name), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns, data = (self.cmd.take_action(parsed_args)) + + self.network.create_address_scope.assert_called_once_with(**{ + 'ip_version': self.new_address_scope.ip_version, + 'name': self.new_address_scope.name, + }) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + def test_create_all_options(self): + arglist = [ + '--ip-version', str(self.new_address_scope.ip_version), + '--share', + '--project', self.project.name, + '--project-domain', self.domain.name, + self.new_address_scope.name, + ] + verifylist = [ + ('ip_version', self.new_address_scope.ip_version), + ('share', True), + ('project', self.project.name), + ('project_domain', self.domain.name), + ('name', self.new_address_scope.name), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns, data = (self.cmd.take_action(parsed_args)) + + self.network.create_address_scope.assert_called_once_with(**{ + 'ip_version': self.new_address_scope.ip_version, + 'shared': True, + 'tenant_id': self.project.id, + 'name': self.new_address_scope.name, + }) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + def test_create_no_share(self): + arglist = [ + '--no-share', + self.new_address_scope.name, + ] + verifylist = [ + ('no_share', True), + ('name', self.new_address_scope.name), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns, data = self.cmd.take_action(parsed_args) + + self.network.create_address_scope.assert_called_once_with(**{ + 'ip_version': self.new_address_scope.ip_version, + 'shared': False, + 'name': self.new_address_scope.name, + }) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + +class TestDeleteAddressScope(TestAddressScope): + + # The address scope to delete. + _address_scopes = ( + network_fakes.FakeAddressScope.create_address_scopes(count=2)) + + def setUp(self): + super(TestDeleteAddressScope, self).setUp() + self.network.delete_address_scope = mock.Mock(return_value=None) + self.network.find_address_scope = ( + network_fakes.FakeAddressScope.get_address_scopes( + address_scopes=self._address_scopes) + ) + + # Get the command object to test + self.cmd = address_scope.DeleteAddressScope(self.app, self.namespace) + + def test_address_scope_delete(self): + arglist = [ + self._address_scopes[0].name, + ] + verifylist = [ + ('address_scope', [self._address_scopes[0].name]), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + self.network.find_address_scope.assert_called_once_with( + self._address_scopes[0].name, ignore_missing=False) + self.network.delete_address_scope.assert_called_once_with( + self._address_scopes[0]) + self.assertIsNone(result) + + def test_multi_address_scopes_delete(self): + arglist = [] + verifylist = [] + + for a in self._address_scopes: + arglist.append(a.name) + verifylist = [ + ('address_scope', arglist), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + calls = [] + for a in self._address_scopes: + calls.append(call(a)) + self.network.delete_address_scope.assert_has_calls(calls) + self.assertIsNone(result) + + def test_multi_address_scopes_delete_with_exception(self): + arglist = [ + self._address_scopes[0].name, + 'unexist_address_scope', + ] + verifylist = [ + ('address_scope', + [self._address_scopes[0].name, 'unexist_address_scope']), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + find_mock_result = [self._address_scopes[0], exceptions.CommandError] + self.network.find_address_scope = ( + mock.MagicMock(side_effect=find_mock_result) + ) + + try: + self.cmd.take_action(parsed_args) + self.fail('CommandError should be raised.') + except exceptions.CommandError as e: + self.assertEqual('1 of 2 address scopes failed to delete.', str(e)) + + self.network.find_address_scope.assert_any_call( + self._address_scopes[0].name, ignore_missing=False) + self.network.find_address_scope.assert_any_call( + 'unexist_address_scope', ignore_missing=False) + self.network.delete_address_scope.assert_called_once_with( + self._address_scopes[0] + ) + + +class TestListAddressScope(TestAddressScope): + + # The address scopes to list up. + address_scopes = ( + network_fakes.FakeAddressScope.create_address_scopes(count=3)) + columns = ( + 'ID', + 'Name', + 'IP Version', + 'Shared', + 'Project', + ) + data = [] + for scope in address_scopes: + data.append(( + scope.id, + scope.name, + scope.ip_version, + scope.shared, + scope.project_id, + )) + + def setUp(self): + super(TestListAddressScope, self).setUp() + self.network.address_scopes = mock.Mock( + return_value=self.address_scopes) + + # Get the command object to test + self.cmd = address_scope.ListAddressScope(self.app, self.namespace) + + def test_address_scope_list(self): + arglist = [] + verifylist = [] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns, data = self.cmd.take_action(parsed_args) + + self.network.address_scopes.assert_called_once_with(**{}) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, list(data)) + + +class TestSetAddressScope(TestAddressScope): + + # The address scope to set. + _address_scope = network_fakes.FakeAddressScope.create_one_address_scope() + + def setUp(self): + super(TestSetAddressScope, self).setUp() + self.network.update_address_scope = mock.Mock(return_value=None) + self.network.find_address_scope = mock.Mock( + return_value=self._address_scope) + + # Get the command object to test + self.cmd = address_scope.SetAddressScope(self.app, self.namespace) + + def test_set_nothing(self): + arglist = [self._address_scope.name, ] + verifylist = [ + ('address_scope', self._address_scope.name), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + attrs = {} + self.network.update_address_scope.assert_called_with( + self._address_scope, **attrs) + self.assertIsNone(result) + + def test_set_name_and_share(self): + arglist = [ + '--name', 'new_address_scope', + '--share', + self._address_scope.name, + ] + verifylist = [ + ('name', 'new_address_scope'), + ('share', True), + ('address_scope', self._address_scope.name), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + attrs = { + 'name': "new_address_scope", + 'shared': True, + } + self.network.update_address_scope.assert_called_with( + self._address_scope, **attrs) + self.assertIsNone(result) + + def test_set_no_share(self): + arglist = [ + '--no-share', + self._address_scope.name, + ] + verifylist = [ + ('no_share', True), + ('address_scope', self._address_scope.name), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + attrs = { + 'shared': False, + } + self.network.update_address_scope.assert_called_with( + self._address_scope, **attrs) + self.assertIsNone(result) + + +class TestShowAddressScope(TestAddressScope): + + # The address scope to show. + _address_scope = ( + network_fakes.FakeAddressScope.create_one_address_scope()) + columns = ( + 'id', + 'ip_version', + 'name', + 'project_id', + 'shared', + ) + data = ( + _address_scope.id, + _address_scope.ip_version, + _address_scope.name, + _address_scope.project_id, + _address_scope.shared, + ) + + def setUp(self): + super(TestShowAddressScope, self).setUp() + self.network.find_address_scope = mock.Mock( + return_value=self._address_scope) + + # Get the command object to test + self.cmd = address_scope.ShowAddressScope(self.app, self.namespace) + + def test_show_no_options(self): + arglist = [] + verifylist = [] + + # Missing required args should bail here + self.assertRaises(tests_utils.ParserException, self.check_parser, + self.cmd, arglist, verifylist) + + def test_show_all_options(self): + arglist = [ + self._address_scope.name, + ] + verifylist = [ + ('address_scope', self._address_scope.name), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns, data = self.cmd.take_action(parsed_args) + + self.network.find_address_scope.assert_called_once_with( + self._address_scope.name, ignore_missing=False) + self.assertEqual(self.columns, columns) + self.assertEqual(list(self.data), list(data)) diff --git a/openstackclient/tests/unit/network/v2/test_floating_ip.py b/openstackclient/tests/unit/network/v2/test_floating_ip.py new file mode 100644 index 00000000..a40e48f4 --- /dev/null +++ b/openstackclient/tests/unit/network/v2/test_floating_ip.py @@ -0,0 +1,565 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import mock +from mock import call + +from osc_lib import exceptions + +from openstackclient.network.v2 import floating_ip +from openstackclient.tests.unit.compute.v2 import fakes as compute_fakes +from openstackclient.tests.unit.network.v2 import fakes as network_fakes +from openstackclient.tests.unit import utils as tests_utils + + +# Tests for Neutron network +# +class TestFloatingIPNetwork(network_fakes.TestNetworkV2): + + def setUp(self): + super(TestFloatingIPNetwork, self).setUp() + + # Get a shortcut to the network client + self.network = self.app.client_manager.network + + +class TestCreateFloatingIPNetwork(TestFloatingIPNetwork): + + # Fake data for option tests. + floating_network = network_fakes.FakeNetwork.create_one_network() + subnet = network_fakes.FakeSubnet.create_one_subnet() + port = network_fakes.FakePort.create_one_port() + + # The floating ip to be deleted. + floating_ip = network_fakes.FakeFloatingIP.create_one_floating_ip( + attrs={ + 'floating_network_id': floating_network.id, + 'port_id': port.id, + } + ) + + columns = ( + 'dns_domain', + 'dns_name', + 'fixed_ip_address', + 'floating_ip_address', + 'floating_network_id', + 'id', + 'port_id', + 'project_id', + 'router_id', + 'status', + ) + + data = ( + floating_ip.dns_domain, + floating_ip.dns_name, + floating_ip.fixed_ip_address, + floating_ip.floating_ip_address, + floating_ip.floating_network_id, + floating_ip.id, + floating_ip.port_id, + floating_ip.project_id, + floating_ip.router_id, + floating_ip.status, + ) + + def setUp(self): + super(TestCreateFloatingIPNetwork, self).setUp() + + self.network.create_ip = mock.Mock(return_value=self.floating_ip) + + self.network.find_network = mock.Mock( + return_value=self.floating_network) + self.network.find_subnet = mock.Mock(return_value=self.subnet) + self.network.find_port = mock.Mock(return_value=self.port) + + # Get the command object to test + self.cmd = floating_ip.CreateFloatingIP(self.app, self.namespace) + + def test_create_no_options(self): + arglist = [] + verifylist = [] + + self.assertRaises(tests_utils.ParserException, self.check_parser, + self.cmd, arglist, verifylist) + + def test_create_default_options(self): + arglist = [ + self.floating_ip.floating_network_id, + ] + verifylist = [ + ('network', self.floating_ip.floating_network_id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.network.create_ip.assert_called_once_with(**{ + 'floating_network_id': self.floating_ip.floating_network_id, + }) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + def test_create_all_options(self): + arglist = [ + '--subnet', self.subnet.id, + '--port', self.floating_ip.port_id, + '--floating-ip-address', self.floating_ip.floating_ip_address, + '--fixed-ip-address', self.floating_ip.fixed_ip_address, + self.floating_ip.floating_network_id, + ] + verifylist = [ + ('subnet', self.subnet.id), + ('port', self.floating_ip.port_id), + ('floating_ip_address', self.floating_ip.floating_ip_address), + ('fixed_ip_address', self.floating_ip.fixed_ip_address), + ('network', self.floating_ip.floating_network_id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.network.create_ip.assert_called_once_with(**{ + 'subnet_id': self.subnet.id, + 'port_id': self.floating_ip.port_id, + 'floating_ip_address': self.floating_ip.floating_ip_address, + 'fixed_ip_address': self.floating_ip.fixed_ip_address, + 'floating_network_id': self.floating_ip.floating_network_id, + }) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + +class TestDeleteFloatingIPNetwork(TestFloatingIPNetwork): + + # The floating ips to be deleted. + floating_ips = network_fakes.FakeFloatingIP.create_floating_ips(count=2) + + def setUp(self): + super(TestDeleteFloatingIPNetwork, self).setUp() + + self.network.delete_ip = mock.Mock(return_value=None) + self.network.find_ip = ( + network_fakes.FakeFloatingIP.get_floating_ips(self.floating_ips)) + + # Get the command object to test + self.cmd = floating_ip.DeleteFloatingIP(self.app, self.namespace) + + def test_floating_ip_delete(self): + arglist = [ + self.floating_ips[0].id, + ] + verifylist = [ + ('floating_ip', [self.floating_ips[0].id]), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + self.network.find_ip.assert_called_once_with( + self.floating_ips[0].id, ignore_missing=False) + self.network.delete_ip.assert_called_once_with(self.floating_ips[0]) + self.assertIsNone(result) + + def test_multi_floating_ips_delete(self): + arglist = [] + verifylist = [] + + for f in self.floating_ips: + arglist.append(f.id) + verifylist = [ + ('floating_ip', arglist), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + calls = [] + for f in self.floating_ips: + calls.append(call(f)) + self.network.delete_ip.assert_has_calls(calls) + self.assertIsNone(result) + + def test_multi_floating_ips_delete_with_exception(self): + arglist = [ + self.floating_ips[0].id, + 'unexist_floating_ip', + ] + verifylist = [ + ('floating_ip', + [self.floating_ips[0].id, 'unexist_floating_ip']), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + find_mock_result = [self.floating_ips[0], exceptions.CommandError] + self.network.find_ip = ( + mock.MagicMock(side_effect=find_mock_result) + ) + + try: + self.cmd.take_action(parsed_args) + self.fail('CommandError should be raised.') + except exceptions.CommandError as e: + self.assertEqual('1 of 2 floating_ips failed to delete.', str(e)) + + self.network.find_ip.assert_any_call( + self.floating_ips[0].id, ignore_missing=False) + self.network.find_ip.assert_any_call( + 'unexist_floating_ip', ignore_missing=False) + self.network.delete_ip.assert_called_once_with( + self.floating_ips[0] + ) + + +class TestListFloatingIPNetwork(TestFloatingIPNetwork): + + # The floating ips to list up + floating_ips = network_fakes.FakeFloatingIP.create_floating_ips(count=3) + + columns = ( + 'ID', + 'Floating IP Address', + 'Fixed IP Address', + 'Port', + ) + + data = [] + for ip in floating_ips: + data.append(( + ip.id, + ip.floating_ip_address, + ip.fixed_ip_address, + ip.port_id, + )) + + def setUp(self): + super(TestListFloatingIPNetwork, self).setUp() + + self.network.ips = mock.Mock(return_value=self.floating_ips) + + # Get the command object to test + self.cmd = floating_ip.ListFloatingIP(self.app, self.namespace) + + def test_floating_ip_list(self): + arglist = [] + verifylist = [] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.network.ips.assert_called_once_with(**{}) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, list(data)) + + +class TestShowFloatingIPNetwork(TestFloatingIPNetwork): + + # The floating ip to display. + floating_ip = network_fakes.FakeFloatingIP.create_one_floating_ip() + + columns = ( + 'dns_domain', + 'dns_name', + 'fixed_ip_address', + 'floating_ip_address', + 'floating_network_id', + 'id', + 'port_id', + 'project_id', + 'router_id', + 'status', + ) + + data = ( + floating_ip.dns_domain, + floating_ip.dns_name, + floating_ip.fixed_ip_address, + floating_ip.floating_ip_address, + floating_ip.floating_network_id, + floating_ip.id, + floating_ip.port_id, + floating_ip.tenant_id, + floating_ip.router_id, + floating_ip.status, + ) + + def setUp(self): + super(TestShowFloatingIPNetwork, self).setUp() + + self.network.find_ip = mock.Mock(return_value=self.floating_ip) + + # Get the command object to test + self.cmd = floating_ip.ShowFloatingIP(self.app, self.namespace) + + def test_floating_ip_show(self): + arglist = [ + self.floating_ip.id, + ] + verifylist = [ + ('floating_ip', self.floating_ip.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.network.find_ip.assert_called_once_with( + self.floating_ip.id, + ignore_missing=False + ) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + +# Tests for Nova network +# +class TestFloatingIPCompute(compute_fakes.TestComputev2): + + def setUp(self): + super(TestFloatingIPCompute, self).setUp() + + # Get a shortcut to the compute client + self.compute = self.app.client_manager.compute + + +class TestCreateFloatingIPCompute(TestFloatingIPCompute): + + # The floating ip to be deleted. + floating_ip = compute_fakes.FakeFloatingIP.create_one_floating_ip() + + columns = ( + 'fixed_ip', + 'id', + 'instance_id', + 'ip', + 'pool', + ) + + data = ( + floating_ip.fixed_ip, + floating_ip.id, + floating_ip.instance_id, + floating_ip.ip, + floating_ip.pool, + ) + + def setUp(self): + super(TestCreateFloatingIPCompute, self).setUp() + + self.app.client_manager.network_endpoint_enabled = False + + self.compute.floating_ips.create.return_value = self.floating_ip + + # Get the command object to test + self.cmd = floating_ip.CreateFloatingIP(self.app, None) + + def test_create_no_options(self): + arglist = [] + verifylist = [] + + self.assertRaises(tests_utils.ParserException, self.check_parser, + self.cmd, arglist, verifylist) + + def test_create_default_options(self): + arglist = [ + self.floating_ip.pool, + ] + verifylist = [ + ('network', self.floating_ip.pool), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.compute.floating_ips.create.assert_called_once_with( + self.floating_ip.pool) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + +class TestDeleteFloatingIPCompute(TestFloatingIPCompute): + + # The floating ips to be deleted. + floating_ips = compute_fakes.FakeFloatingIP.create_floating_ips(count=2) + + def setUp(self): + super(TestDeleteFloatingIPCompute, self).setUp() + + self.app.client_manager.network_endpoint_enabled = False + + self.compute.floating_ips.delete.return_value = None + + # Return value of utils.find_resource() + self.compute.floating_ips.get = ( + compute_fakes.FakeFloatingIP.get_floating_ips(self.floating_ips)) + + # Get the command object to test + self.cmd = floating_ip.DeleteFloatingIP(self.app, None) + + def test_floating_ip_delete(self): + arglist = [ + self.floating_ips[0].id, + ] + verifylist = [ + ('floating_ip', [self.floating_ips[0].id]), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + self.compute.floating_ips.delete.assert_called_once_with( + self.floating_ips[0].id + ) + self.assertIsNone(result) + + def test_multi_floating_ips_delete(self): + arglist = [] + verifylist = [] + + for f in self.floating_ips: + arglist.append(f.id) + verifylist = [ + ('floating_ip', arglist), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + calls = [] + for f in self.floating_ips: + calls.append(call(f.id)) + self.compute.floating_ips.delete.assert_has_calls(calls) + self.assertIsNone(result) + + def test_multi_floating_ips_delete_with_exception(self): + arglist = [ + self.floating_ips[0].id, + 'unexist_floating_ip', + ] + verifylist = [ + ('floating_ip', + [self.floating_ips[0].id, 'unexist_floating_ip']), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + find_mock_result = [self.floating_ips[0], exceptions.CommandError] + self.compute.floating_ips.get = ( + mock.MagicMock(side_effect=find_mock_result) + ) + self.compute.floating_ips.find.side_effect = exceptions.NotFound(None) + + try: + self.cmd.take_action(parsed_args) + self.fail('CommandError should be raised.') + except exceptions.CommandError as e: + self.assertEqual('1 of 2 floating_ips failed to delete.', str(e)) + + self.compute.floating_ips.get.assert_any_call( + self.floating_ips[0].id) + self.compute.floating_ips.get.assert_any_call( + 'unexist_floating_ip') + self.compute.floating_ips.delete.assert_called_once_with( + self.floating_ips[0].id + ) + + +class TestListFloatingIPCompute(TestFloatingIPCompute): + + # The floating ips to be list up + floating_ips = compute_fakes.FakeFloatingIP.create_floating_ips(count=3) + + columns = ( + 'ID', + 'Floating IP Address', + 'Fixed IP Address', + 'Server', + 'Pool', + ) + + data = [] + for ip in floating_ips: + data.append(( + ip.id, + ip.ip, + ip.fixed_ip, + ip.instance_id, + ip.pool, + )) + + def setUp(self): + super(TestListFloatingIPCompute, self).setUp() + + self.app.client_manager.network_endpoint_enabled = False + + self.compute.floating_ips.list.return_value = self.floating_ips + + # Get the command object to test + self.cmd = floating_ip.ListFloatingIP(self.app, None) + + def test_floating_ip_list(self): + arglist = [] + verifylist = [] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.compute.floating_ips.list.assert_called_once_with() + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, list(data)) + + +class TestShowFloatingIPCompute(TestFloatingIPCompute): + + # The floating ip to display. + floating_ip = compute_fakes.FakeFloatingIP.create_one_floating_ip() + + columns = ( + 'fixed_ip', + 'id', + 'instance_id', + 'ip', + 'pool', + ) + + data = ( + floating_ip.fixed_ip, + floating_ip.id, + floating_ip.instance_id, + floating_ip.ip, + floating_ip.pool, + ) + + def setUp(self): + super(TestShowFloatingIPCompute, self).setUp() + + self.app.client_manager.network_endpoint_enabled = False + + # Return value of utils.find_resource() + self.compute.floating_ips.get.return_value = self.floating_ip + + # Get the command object to test + self.cmd = floating_ip.ShowFloatingIP(self.app, None) + + def test_floating_ip_show(self): + arglist = [ + self.floating_ip.id, + ] + verifylist = [ + ('floating_ip', self.floating_ip.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) diff --git a/openstackclient/tests/unit/network/v2/test_floating_ip_pool.py b/openstackclient/tests/unit/network/v2/test_floating_ip_pool.py new file mode 100644 index 00000000..11d01d36 --- /dev/null +++ b/openstackclient/tests/unit/network/v2/test_floating_ip_pool.py @@ -0,0 +1,97 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +from osc_lib import exceptions + +from openstackclient.network.v2 import floating_ip_pool +from openstackclient.tests.unit.compute.v2 import fakes as compute_fakes +from openstackclient.tests.unit.network.v2 import fakes as network_fakes + + +# Tests for Network API v2 +# +class TestFloatingIPPoolNetwork(network_fakes.TestNetworkV2): + + def setUp(self): + super(TestFloatingIPPoolNetwork, self).setUp() + + # Get a shortcut to the network client + self.network = self.app.client_manager.network + + +class TestListFloatingIPPoolNetwork(TestFloatingIPPoolNetwork): + + def setUp(self): + super(TestListFloatingIPPoolNetwork, self).setUp() + + # Get the command object to test + self.cmd = floating_ip_pool.ListFloatingIPPool(self.app, + self.namespace) + + def test_floating_ip_list(self): + arglist = [] + verifylist = [] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + self.assertRaises(exceptions.CommandError, self.cmd.take_action, + parsed_args) + + +# Tests for Compute network +# +class TestFloatingIPPoolCompute(compute_fakes.TestComputev2): + + def setUp(self): + super(TestFloatingIPPoolCompute, self).setUp() + + # Get a shortcut to the compute client + self.compute = self.app.client_manager.compute + + +class TestListFloatingIPPoolCompute(TestFloatingIPPoolCompute): + + # The floating ip pools to list up + floating_ip_pools = \ + compute_fakes.FakeFloatingIPPool.create_floating_ip_pools(count=3) + + columns = ( + 'Name', + ) + + data = [] + for pool in floating_ip_pools: + data.append(( + pool.name, + )) + + def setUp(self): + super(TestListFloatingIPPoolCompute, self).setUp() + + self.app.client_manager.network_endpoint_enabled = False + + self.compute.floating_ip_pools.list.return_value = \ + self.floating_ip_pools + + # Get the command object to test + self.cmd = floating_ip_pool.ListFloatingIPPool(self.app, None) + + def test_floating_ip_list(self): + arglist = [] + verifylist = [] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.compute.floating_ip_pools.list.assert_called_once_with() + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, list(data)) diff --git a/openstackclient/tests/unit/network/v2/test_ip_availability.py b/openstackclient/tests/unit/network/v2/test_ip_availability.py new file mode 100644 index 00000000..c929ab82 --- /dev/null +++ b/openstackclient/tests/unit/network/v2/test_ip_availability.py @@ -0,0 +1,172 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import mock + +from osc_lib import utils as common_utils + +from openstackclient.network.v2 import ip_availability +from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes +from openstackclient.tests.unit.network.v2 import fakes as network_fakes +from openstackclient.tests.unit import utils as tests_utils + + +class TestIPAvailability(network_fakes.TestNetworkV2): + + def setUp(self): + super(TestIPAvailability, self).setUp() + + # Get a shortcut to the network client + self.network = self.app.client_manager.network + + # Get a shortcut to the ProjectManager Mock + self.projects_mock = self.app.client_manager.identity.projects + + self.project = identity_fakes.FakeProject.create_one_project() + self.projects_mock.get.return_value = self.project + + +class TestListIPAvailability(TestIPAvailability): + + _ip_availability = \ + network_fakes.FakeIPAvailability.create_ip_availability(count=3) + columns = ( + 'Network ID', + 'Network Name', + 'Total IPs', + 'Used IPs', + ) + data = [] + for net in _ip_availability: + data.append(( + net.network_id, + net.network_name, + net.total_ips, + net.used_ips, + )) + + def setUp(self): + super(TestListIPAvailability, self).setUp() + + self.cmd = ip_availability.ListIPAvailability( + self.app, self.namespace) + self.network.network_ip_availabilities = mock.Mock( + return_value=self._ip_availability) + + def test_list_no_options(self): + arglist = [] + verifylist = [] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + filters = {'ip_version': 4} + + self.network.network_ip_availabilities.assert_called_once_with( + **filters) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, list(data)) + + def test_list_ip_version(self): + arglist = [ + '--ip-version', str(4), + ] + verifylist = [ + ('ip_version', 4) + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + filters = {'ip_version': 4} + + self.network.network_ip_availabilities.assert_called_once_with( + **filters) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, list(data)) + + def test_list_project(self): + arglist = [ + '--project', self.project.name + ] + verifylist = [ + ('project', self.project.name) + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + filters = {'tenant_id': self.project.id, + 'ip_version': 4} + + self.network.network_ip_availabilities.assert_called_once_with( + **filters) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, list(data)) + + +class TestShowIPAvailability(TestIPAvailability): + + _ip_availability = \ + network_fakes.FakeIPAvailability.create_one_ip_availability() + + columns = ( + 'network_id', + 'network_name', + 'project_id', + 'subnet_ip_availability', + 'total_ips', + 'used_ips', + ) + data = ( + _ip_availability.network_id, + _ip_availability.network_name, + _ip_availability.tenant_id, + common_utils.format_list( + _ip_availability.subnet_ip_availability), + _ip_availability.total_ips, + _ip_availability.used_ips, + ) + + def setUp(self): + super(TestShowIPAvailability, self).setUp() + + self.network.find_network_ip_availability = mock.Mock( + return_value=self._ip_availability) + + # Get the command object to test + self.cmd = ip_availability.ShowIPAvailability( + self.app, self.namespace) + + def test_show_no_option(self): + arglist = [] + verifylist = [] + + self.assertRaises(tests_utils.ParserException, + self.check_parser, self.cmd, arglist, verifylist) + + def test_show_all_options(self): + arglist = [ + self._ip_availability.network_name, + ] + verifylist = [ + ('network', self._ip_availability.network_name) + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns, data = self.cmd.take_action(parsed_args) + self.network.find_network_ip_availability.assert_called_once_with( + self._ip_availability.network_name, + ignore_missing=False) + + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) diff --git a/openstackclient/tests/unit/network/v2/test_network.py b/openstackclient/tests/unit/network/v2/test_network.py new file mode 100644 index 00000000..84ead093 --- /dev/null +++ b/openstackclient/tests/unit/network/v2/test_network.py @@ -0,0 +1,1059 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import mock +from mock import call + +from osc_lib import exceptions +from osc_lib import utils + +from openstackclient.network.v2 import network +from openstackclient.tests.unit.compute.v2 import fakes as compute_fakes +from openstackclient.tests.unit import fakes +from openstackclient.tests.unit.identity.v2_0 import fakes as identity_fakes_v2 +from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes_v3 +from openstackclient.tests.unit.network.v2 import fakes as network_fakes +from openstackclient.tests.unit import utils as tests_utils + + +# Tests for Neutron network +# +class TestNetwork(network_fakes.TestNetworkV2): + + def setUp(self): + super(TestNetwork, self).setUp() + + # Get a shortcut to the network client + self.network = self.app.client_manager.network + # Get a shortcut to the ProjectManager Mock + self.projects_mock = self.app.client_manager.identity.projects + # Get a shortcut to the DomainManager Mock + self.domains_mock = self.app.client_manager.identity.domains + + +class TestCreateNetworkIdentityV3(TestNetwork): + + project = identity_fakes_v3.FakeProject.create_one_project() + domain = identity_fakes_v3.FakeDomain.create_one_domain() + # The new network created. + _network = network_fakes.FakeNetwork.create_one_network( + attrs={ + 'tenant_id': project.id, + 'availability_zone_hints': ["nova"], + } + ) + + columns = ( + 'admin_state_up', + 'availability_zone_hints', + 'availability_zones', + 'id', + 'is_default', + 'name', + 'port_security_enabled', + 'project_id', + 'provider_network_type', + 'router:external', + 'shared', + 'status', + 'subnets', + ) + + data = ( + network._format_admin_state(_network.admin_state_up), + utils.format_list(_network.availability_zone_hints), + utils.format_list(_network.availability_zones), + _network.id, + _network.is_default, + _network.name, + _network.is_port_security_enabled, + _network.project_id, + _network.provider_network_type, + network._format_router_external(_network.is_router_external), + _network.shared, + _network.status, + utils.format_list(_network.subnets), + ) + + def setUp(self): + super(TestCreateNetworkIdentityV3, self).setUp() + + self.network.create_network = mock.Mock(return_value=self._network) + + # Get the command object to test + self.cmd = network.CreateNetwork(self.app, self.namespace) + + self.projects_mock.get.return_value = self.project + self.domains_mock.get.return_value = self.domain + + def test_create_no_options(self): + arglist = [] + verifylist = [] + + self.assertRaises(tests_utils.ParserException, self.check_parser, + self.cmd, arglist, verifylist) + + def test_create_default_options(self): + arglist = [ + self._network.name, + ] + verifylist = [ + ('name', self._network.name), + ('enable', True), + ('share', None), + ('project', None), + ('external', False), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns, data = self.cmd.take_action(parsed_args) + + self.network.create_network.assert_called_once_with(**{ + 'admin_state_up': True, + 'name': self._network.name, + }) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + def test_create_all_options(self): + arglist = [ + "--disable", + "--share", + "--project", self.project.name, + "--project-domain", self.domain.name, + "--availability-zone-hint", "nova", + "--external", "--default", + "--provider-network-type", "vlan", + "--provider-physical-network", "physnet1", + "--provider-segment", "400", + "--transparent-vlan", + "--enable-port-security", + self._network.name, + ] + verifylist = [ + ('disable', True), + ('share', True), + ('project', self.project.name), + ('project_domain', self.domain.name), + ('availability_zone_hints', ["nova"]), + ('external', True), + ('default', True), + ('provider_network_type', 'vlan'), + ('physical_network', 'physnet1'), + ('segmentation_id', '400'), + ('transparent_vlan', True), + ('enable_port_security', True), + ('name', self._network.name), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns, data = (self.cmd.take_action(parsed_args)) + + self.network.create_network.assert_called_once_with(**{ + 'admin_state_up': False, + 'availability_zone_hints': ["nova"], + 'name': self._network.name, + 'shared': True, + 'tenant_id': self.project.id, + 'is_default': True, + 'router:external': True, + 'provider:network_type': 'vlan', + 'provider:physical_network': 'physnet1', + 'provider:segmentation_id': '400', + 'vlan_transparent': True, + 'port_security_enabled': True, + }) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + def test_create_other_options(self): + arglist = [ + "--enable", + "--no-share", + "--disable-port-security", + self._network.name, + ] + verifylist = [ + ('enable', True), + ('no_share', True), + ('name', self._network.name), + ('external', False), + ('disable_port_security', True), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns, data = self.cmd.take_action(parsed_args) + + self.network.create_network.assert_called_once_with(**{ + 'admin_state_up': True, + 'name': self._network.name, + 'shared': False, + 'port_security_enabled': False, + }) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + +class TestCreateNetworkIdentityV2(TestNetwork): + + project = identity_fakes_v2.FakeProject.create_one_project() + # The new network created. + _network = network_fakes.FakeNetwork.create_one_network( + attrs={'tenant_id': project.id} + ) + + columns = ( + 'admin_state_up', + 'availability_zone_hints', + 'availability_zones', + 'id', + 'is_default', + 'name', + 'port_security_enabled', + 'project_id', + 'provider_network_type', + 'router:external', + 'shared', + 'status', + 'subnets', + ) + + data = ( + network._format_admin_state(_network.admin_state_up), + utils.format_list(_network.availability_zone_hints), + utils.format_list(_network.availability_zones), + _network.id, + _network.is_default, + _network.name, + _network.is_port_security_enabled, + _network.project_id, + _network.provider_network_type, + network._format_router_external(_network.is_router_external), + _network.shared, + _network.status, + utils.format_list(_network.subnets), + ) + + def setUp(self): + super(TestCreateNetworkIdentityV2, self).setUp() + + self.network.create_network = mock.Mock(return_value=self._network) + + # Get the command object to test + self.cmd = network.CreateNetwork(self.app, self.namespace) + + # Set identity client v2. And get a shortcut to Identity client. + identity_client = identity_fakes_v2.FakeIdentityv2Client( + endpoint=fakes.AUTH_URL, + token=fakes.AUTH_TOKEN, + ) + self.app.client_manager.identity = identity_client + self.identity = self.app.client_manager.identity + + # Get a shortcut to the ProjectManager Mock + self.projects_mock = self.identity.tenants + self.projects_mock.get.return_value = self.project + + # There is no DomainManager Mock in fake identity v2. + + def test_create_with_project_identityv2(self): + arglist = [ + "--project", self.project.name, + self._network.name, + ] + verifylist = [ + ('enable', True), + ('share', None), + ('name', self._network.name), + ('project', self.project.name), + ('external', False), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns, data = self.cmd.take_action(parsed_args) + + self.network.create_network.assert_called_once_with(**{ + 'admin_state_up': True, + 'name': self._network.name, + 'tenant_id': self.project.id, + }) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + def test_create_with_domain_identityv2(self): + arglist = [ + "--project", self.project.name, + "--project-domain", "domain-name", + self._network.name, + ] + verifylist = [ + ('enable', True), + ('share', None), + ('project', self.project.name), + ('project_domain', "domain-name"), + ('name', self._network.name), + ('external', False), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + self.assertRaises( + AttributeError, + self.cmd.take_action, + parsed_args, + ) + + +class TestDeleteNetwork(TestNetwork): + + def setUp(self): + super(TestDeleteNetwork, self).setUp() + + # The networks to delete + self._networks = network_fakes.FakeNetwork.create_networks(count=3) + + self.network.delete_network = mock.Mock(return_value=None) + + self.network.find_network = network_fakes.FakeNetwork.get_networks( + networks=self._networks) + + # Get the command object to test + self.cmd = network.DeleteNetwork(self.app, self.namespace) + + def test_delete_one_network(self): + arglist = [ + self._networks[0].name, + ] + verifylist = [ + ('network', [self._networks[0].name]), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + self.network.delete_network.assert_called_once_with(self._networks[0]) + self.assertIsNone(result) + + def test_delete_multiple_networks(self): + arglist = [] + for n in self._networks: + arglist.append(n.id) + verifylist = [ + ('network', arglist), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + calls = [] + for n in self._networks: + calls.append(call(n)) + self.network.delete_network.assert_has_calls(calls) + self.assertIsNone(result) + + def test_delete_multiple_networks_exception(self): + arglist = [ + self._networks[0].id, + 'xxxx-yyyy-zzzz', + self._networks[1].id, + ] + verifylist = [ + ('network', arglist), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + # Fake exception in find_network() + ret_find = [ + self._networks[0], + exceptions.NotFound('404'), + self._networks[1], + ] + self.network.find_network = mock.Mock(side_effect=ret_find) + + # Fake exception in delete_network() + ret_delete = [ + None, + exceptions.NotFound('404'), + ] + self.network.delete_network = mock.Mock(side_effect=ret_delete) + + self.assertRaises(exceptions.CommandError, self.cmd.take_action, + parsed_args) + + # The second call of find_network() should fail. So delete_network() + # was only called twice. + calls = [ + call(self._networks[0]), + call(self._networks[1]), + ] + self.network.delete_network.assert_has_calls(calls) + + +class TestListNetwork(TestNetwork): + + # The networks going to be listed up. + _network = network_fakes.FakeNetwork.create_networks(count=3) + + columns = ( + 'ID', + 'Name', + 'Subnets', + ) + columns_long = ( + 'ID', + 'Name', + 'Status', + 'Project', + 'State', + 'Shared', + 'Subnets', + 'Network Type', + 'Router Type', + 'Availability Zones', + ) + + data = [] + for net in _network: + data.append(( + net.id, + net.name, + utils.format_list(net.subnets), + )) + + data_long = [] + for net in _network: + data_long.append(( + net.id, + net.name, + net.status, + net.project_id, + network._format_admin_state(net.admin_state_up), + net.shared, + utils.format_list(net.subnets), + net.provider_network_type, + network._format_router_external(net.is_router_external), + utils.format_list(net.availability_zones), + )) + + def setUp(self): + super(TestListNetwork, self).setUp() + + # Get the command object to test + self.cmd = network.ListNetwork(self.app, self.namespace) + + self.network.networks = mock.Mock(return_value=self._network) + + def test_network_list_no_options(self): + arglist = [] + verifylist = [ + ('external', False), + ('long', False), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + # In base command class Lister in cliff, abstract method take_action() + # returns a tuple containing the column names and an iterable + # containing the data to be listed. + columns, data = self.cmd.take_action(parsed_args) + + self.network.networks.assert_called_once_with() + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, list(data)) + + def test_list_external(self): + arglist = [ + '--external', + ] + verifylist = [ + ('external', True), + ('long', False), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + # In base command class Lister in cliff, abstract method take_action() + # returns a tuple containing the column names and an iterable + # containing the data to be listed. + columns, data = self.cmd.take_action(parsed_args) + + self.network.networks.assert_called_once_with( + **{'router:external': True} + ) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, list(data)) + + def test_network_list_long(self): + arglist = [ + '--long', + ] + verifylist = [ + ('long', True), + ('external', False), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + # In base command class Lister in cliff, abstract method take_action() + # returns a tuple containing the column names and an iterable + # containing the data to be listed. + columns, data = self.cmd.take_action(parsed_args) + + self.network.networks.assert_called_once_with() + self.assertEqual(self.columns_long, columns) + self.assertEqual(self.data_long, list(data)) + + +class TestSetNetwork(TestNetwork): + + # The network to set. + _network = network_fakes.FakeNetwork.create_one_network() + + def setUp(self): + super(TestSetNetwork, self).setUp() + + self.network.update_network = mock.Mock(return_value=None) + + self.network.find_network = mock.Mock(return_value=self._network) + + # Get the command object to test + self.cmd = network.SetNetwork(self.app, self.namespace) + + def test_set_this(self): + arglist = [ + self._network.name, + '--enable', + '--name', 'noob', + '--share', + '--external', + '--default', + '--provider-network-type', 'vlan', + '--provider-physical-network', 'physnet1', + '--provider-segment', '400', + '--no-transparent-vlan', + '--enable-port-security', + ] + verifylist = [ + ('network', self._network.name), + ('enable', True), + ('name', 'noob'), + ('share', True), + ('external', True), + ('default', True), + ('provider_network_type', 'vlan'), + ('physical_network', 'physnet1'), + ('segmentation_id', '400'), + ('no_transparent_vlan', True), + ('enable_port_security', True), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + attrs = { + 'name': 'noob', + 'admin_state_up': True, + 'shared': True, + 'router:external': True, + 'is_default': True, + 'provider:network_type': 'vlan', + 'provider:physical_network': 'physnet1', + 'provider:segmentation_id': '400', + 'vlan_transparent': False, + 'port_security_enabled': True, + } + self.network.update_network.assert_called_once_with( + self._network, **attrs) + self.assertIsNone(result) + + def test_set_that(self): + arglist = [ + self._network.name, + '--disable', + '--no-share', + '--internal', + '--disable-port-security', + ] + verifylist = [ + ('network', self._network.name), + ('disable', True), + ('no_share', True), + ('internal', True), + ('disable_port_security', True), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + attrs = { + 'admin_state_up': False, + 'shared': False, + 'router:external': False, + 'port_security_enabled': False, + } + self.network.update_network.assert_called_once_with( + self._network, **attrs) + self.assertIsNone(result) + + def test_set_nothing(self): + arglist = [self._network.name, ] + verifylist = [('network', self._network.name), ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + attrs = {} + self.network.update_network.assert_called_once_with( + self._network, **attrs) + self.assertIsNone(result) + + +class TestShowNetwork(TestNetwork): + + # The network to show. + _network = network_fakes.FakeNetwork.create_one_network() + + columns = ( + 'admin_state_up', + 'availability_zone_hints', + 'availability_zones', + 'id', + 'is_default', + 'name', + 'port_security_enabled', + 'project_id', + 'provider_network_type', + 'router:external', + 'shared', + 'status', + 'subnets', + ) + + data = ( + network._format_admin_state(_network.admin_state_up), + utils.format_list(_network.availability_zone_hints), + utils.format_list(_network.availability_zones), + _network.id, + _network.is_default, + _network.name, + _network.is_port_security_enabled, + _network.project_id, + _network.provider_network_type, + network._format_router_external(_network.is_router_external), + _network.shared, + _network.status, + utils.format_list(_network.subnets), + ) + + def setUp(self): + super(TestShowNetwork, self).setUp() + + self.network.find_network = mock.Mock(return_value=self._network) + + # Get the command object to test + self.cmd = network.ShowNetwork(self.app, self.namespace) + + def test_show_no_options(self): + arglist = [] + verifylist = [] + + self.assertRaises(tests_utils.ParserException, self.check_parser, + self.cmd, arglist, verifylist) + + def test_show_all_options(self): + arglist = [ + self._network.name, + ] + verifylist = [ + ('network', self._network.name), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns, data = self.cmd.take_action(parsed_args) + + self.network.find_network.assert_called_once_with( + self._network.name, ignore_missing=False) + + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + +# Tests for Nova network +# +class TestNetworkCompute(compute_fakes.TestComputev2): + + def setUp(self): + super(TestNetworkCompute, self).setUp() + + # Get a shortcut to the compute client + self.compute = self.app.client_manager.compute + + +class TestCreateNetworkCompute(TestNetworkCompute): + + # The network to create. + _network = compute_fakes.FakeNetwork.create_one_network() + + columns = ( + 'bridge', + 'bridge_interface', + 'broadcast', + 'cidr', + 'cidr_v6', + 'created_at', + 'deleted', + 'deleted_at', + 'dhcp_server', + 'dhcp_start', + 'dns1', + 'dns2', + 'enable_dhcp', + 'gateway', + 'gateway_v6', + 'host', + 'id', + 'injected', + 'label', + 'mtu', + 'multi_host', + 'netmask', + 'netmask_v6', + 'priority', + 'project_id', + 'rxtx_base', + 'share_address', + 'updated_at', + 'vlan', + 'vpn_private_address', + 'vpn_public_address', + 'vpn_public_port', + ) + + data = ( + _network.bridge, + _network.bridge_interface, + _network.broadcast, + _network.cidr, + _network.cidr_v6, + _network.created_at, + _network.deleted, + _network.deleted_at, + _network.dhcp_server, + _network.dhcp_start, + _network.dns1, + _network.dns2, + _network.enable_dhcp, + _network.gateway, + _network.gateway_v6, + _network.host, + _network.id, + _network.injected, + _network.label, + _network.mtu, + _network.multi_host, + _network.netmask, + _network.netmask_v6, + _network.priority, + _network.project_id, + _network.rxtx_base, + _network.share_address, + _network.updated_at, + _network.vlan, + _network.vpn_private_address, + _network.vpn_public_address, + _network.vpn_public_port, + ) + + def setUp(self): + super(TestCreateNetworkCompute, self).setUp() + + self.app.client_manager.network_endpoint_enabled = False + + self.compute.networks.create.return_value = self._network + + # Get the command object to test + self.cmd = network.CreateNetwork(self.app, None) + + def test_create_no_options(self): + arglist = [] + verifylist = [] + + # Missing required args should raise exception here + self.assertRaises(tests_utils.ParserException, self.check_parser, + self.cmd, arglist, verifylist) + + def test_create_default_options(self): + arglist = [ + "--subnet", self._network.cidr, + self._network.label, + ] + verifylist = [ + ('subnet', self._network.cidr), + ('name', self._network.label), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.compute.networks.create.assert_called_once_with(**{ + 'cidr': self._network.cidr, + 'label': self._network.label, + }) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + +class TestDeleteNetworkCompute(TestNetworkCompute): + + def setUp(self): + super(TestDeleteNetworkCompute, self).setUp() + + self.app.client_manager.network_endpoint_enabled = False + + # The networks to delete + self._networks = compute_fakes.FakeNetwork.create_networks(count=3) + + self.compute.networks.delete.return_value = None + + # Return value of utils.find_resource() + self.compute.networks.get = \ + compute_fakes.FakeNetwork.get_networks(networks=self._networks) + + # Get the command object to test + self.cmd = network.DeleteNetwork(self.app, None) + + def test_delete_one_network(self): + arglist = [ + self._networks[0].label, + ] + verifylist = [ + ('network', [self._networks[0].label]), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + self.compute.networks.delete.assert_called_once_with( + self._networks[0].id) + self.assertIsNone(result) + + def test_delete_multiple_networks(self): + arglist = [] + for n in self._networks: + arglist.append(n.label) + verifylist = [ + ('network', arglist), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + calls = [] + for n in self._networks: + calls.append(call(n.id)) + self.compute.networks.delete.assert_has_calls(calls) + self.assertIsNone(result) + + def test_delete_multiple_networks_exception(self): + arglist = [ + self._networks[0].id, + 'xxxx-yyyy-zzzz', + self._networks[1].id, + ] + verifylist = [ + ('network', arglist), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + # Fake exception in utils.find_resource() + # In compute v2, we use utils.find_resource() to find a network. + # It calls get() several times, but find() only one time. So we + # choose to fake get() always raise exception, then pass through. + # And fake find() to find the real network or not. + self.compute.networks.get.side_effect = Exception() + ret_find = [ + self._networks[0], + Exception(), + self._networks[1], + ] + self.compute.networks.find.side_effect = ret_find + + # Fake exception in delete() + ret_delete = [ + None, + Exception(), + ] + self.compute.networks.delete = mock.Mock(side_effect=ret_delete) + + self.assertRaises(exceptions.CommandError, self.cmd.take_action, + parsed_args) + + # The second call of utils.find_resource() should fail. So delete() + # was only called twice. + calls = [ + call(self._networks[0].id), + call(self._networks[1].id), + ] + self.compute.networks.delete.assert_has_calls(calls) + + +class TestListNetworkCompute(TestNetworkCompute): + + # The networks going to be listed up. + _networks = compute_fakes.FakeNetwork.create_networks(count=3) + + columns = ( + 'ID', + 'Name', + 'Subnet', + ) + + data = [] + for net in _networks: + data.append(( + net.id, + net.label, + net.cidr, + )) + + def setUp(self): + super(TestListNetworkCompute, self).setUp() + + self.app.client_manager.network_endpoint_enabled = False + + self.compute.networks.list.return_value = self._networks + + # Get the command object to test + self.cmd = network.ListNetwork(self.app, None) + + def test_network_list_no_options(self): + arglist = [] + verifylist = [ + ('external', False), + ('long', False), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + # In base command class Lister in cliff, abstract method take_action() + # returns a tuple containing the column names and an iterable + # containing the data to be listed. + columns, data = self.cmd.take_action(parsed_args) + + self.compute.networks.list.assert_called_once_with() + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, list(data)) + + +class TestShowNetworkCompute(TestNetworkCompute): + + # The network to show. + _network = compute_fakes.FakeNetwork.create_one_network() + + columns = ( + 'bridge', + 'bridge_interface', + 'broadcast', + 'cidr', + 'cidr_v6', + 'created_at', + 'deleted', + 'deleted_at', + 'dhcp_server', + 'dhcp_start', + 'dns1', + 'dns2', + 'enable_dhcp', + 'gateway', + 'gateway_v6', + 'host', + 'id', + 'injected', + 'label', + 'mtu', + 'multi_host', + 'netmask', + 'netmask_v6', + 'priority', + 'project_id', + 'rxtx_base', + 'share_address', + 'updated_at', + 'vlan', + 'vpn_private_address', + 'vpn_public_address', + 'vpn_public_port', + ) + + data = ( + _network.bridge, + _network.bridge_interface, + _network.broadcast, + _network.cidr, + _network.cidr_v6, + _network.created_at, + _network.deleted, + _network.deleted_at, + _network.dhcp_server, + _network.dhcp_start, + _network.dns1, + _network.dns2, + _network.enable_dhcp, + _network.gateway, + _network.gateway_v6, + _network.host, + _network.id, + _network.injected, + _network.label, + _network.mtu, + _network.multi_host, + _network.netmask, + _network.netmask_v6, + _network.priority, + _network.project_id, + _network.rxtx_base, + _network.share_address, + _network.updated_at, + _network.vlan, + _network.vpn_private_address, + _network.vpn_public_address, + _network.vpn_public_port, + ) + + def setUp(self): + super(TestShowNetworkCompute, self).setUp() + + self.app.client_manager.network_endpoint_enabled = False + + # Return value of utils.find_resource() + self.compute.networks.get.return_value = self._network + + # Get the command object to test + self.cmd = network.ShowNetwork(self.app, None) + + def test_show_no_options(self): + arglist = [] + verifylist = [] + + self.assertRaises(tests_utils.ParserException, self.check_parser, + self.cmd, arglist, verifylist) + + def test_show_all_options(self): + arglist = [ + self._network.label, + ] + verifylist = [ + ('network', self._network.label), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns, data = self.cmd.take_action(parsed_args) + + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) diff --git a/openstackclient/tests/unit/network/v2/test_network_agent.py b/openstackclient/tests/unit/network/v2/test_network_agent.py new file mode 100644 index 00000000..2f17f41b --- /dev/null +++ b/openstackclient/tests/unit/network/v2/test_network_agent.py @@ -0,0 +1,294 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import mock +from mock import call + +from osc_lib import exceptions +from osc_lib import utils + +from openstackclient.network.v2 import network_agent +from openstackclient.tests.unit.network.v2 import fakes as network_fakes +from openstackclient.tests.unit import utils as tests_utils + + +class TestNetworkAgent(network_fakes.TestNetworkV2): + + def setUp(self): + super(TestNetworkAgent, self).setUp() + + # Get a shortcut to the network client + self.network = self.app.client_manager.network + + +class TestDeleteNetworkAgent(TestNetworkAgent): + + network_agents = ( + network_fakes.FakeNetworkAgent.create_network_agents(count=2)) + + def setUp(self): + super(TestDeleteNetworkAgent, self).setUp() + self.network.delete_agent = mock.Mock(return_value=None) + self.network.get_agent = ( + network_fakes.FakeNetworkAgent.get_network_agents( + agents=self.network_agents) + ) + + # Get the command object to test + self.cmd = network_agent.DeleteNetworkAgent(self.app, self.namespace) + + def test_network_agent_delete(self): + arglist = [ + self.network_agents[0].id, + ] + verifylist = [ + ('network_agent', [self.network_agents[0].id]), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + self.network.get_agent.assert_called_once_with( + self.network_agents[0].id, ignore_missing=False) + self.network.delete_agent.assert_called_once_with( + self.network_agents[0]) + self.assertIsNone(result) + + def test_multi_network_agents_delete(self): + arglist = [] + verifylist = [] + + for n in self.network_agents: + arglist.append(n.id) + verifylist = [ + ('network_agent', arglist), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + calls = [] + for n in self.network_agents: + calls.append(call(n)) + self.network.delete_agent.assert_has_calls(calls) + self.assertIsNone(result) + + def test_multi_network_agents_delete_with_exception(self): + arglist = [ + self.network_agents[0].id, + 'unexist_network_agent', + ] + verifylist = [ + ('network_agent', + [self.network_agents[0].id, 'unexist_network_agent']), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + find_mock_result = [self.network_agents[0], exceptions.CommandError] + self.network.get_agent = ( + mock.MagicMock(side_effect=find_mock_result) + ) + + try: + self.cmd.take_action(parsed_args) + self.fail('CommandError should be raised.') + except exceptions.CommandError as e: + self.assertEqual('1 of 2 network agents failed to delete.', str(e)) + + self.network.get_agent.assert_any_call( + self.network_agents[0].id, ignore_missing=False) + self.network.get_agent.assert_any_call( + 'unexist_network_agent', ignore_missing=False) + self.network.delete_agent.assert_called_once_with( + self.network_agents[0] + ) + + +class TestListNetworkAgent(TestNetworkAgent): + + network_agents = ( + network_fakes.FakeNetworkAgent.create_network_agents(count=3)) + + columns = ( + 'ID', + 'Agent Type', + 'Host', + 'Availability Zone', + 'Alive', + 'State', + 'Binary' + ) + data = [] + for agent in network_agents: + data.append(( + agent.id, + agent.agent_type, + agent.host, + agent.availability_zone, + agent.alive, + network_agent._format_admin_state(agent.admin_state_up), + agent.binary, + )) + + def setUp(self): + super(TestListNetworkAgent, self).setUp() + self.network.agents = mock.Mock( + return_value=self.network_agents) + + # Get the command object to test + self.cmd = network_agent.ListNetworkAgent(self.app, self.namespace) + + def test_network_agents_list(self): + arglist = [] + verifylist = [] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns, data = self.cmd.take_action(parsed_args) + + self.network.agents.assert_called_once_with(**{}) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, list(data)) + + +class TestSetNetworkAgent(TestNetworkAgent): + + _network_agent = ( + network_fakes.FakeNetworkAgent.create_one_network_agent()) + + def setUp(self): + super(TestSetNetworkAgent, self).setUp() + self.network.update_agent = mock.Mock(return_value=None) + self.network.get_agent = mock.Mock(return_value=self._network_agent) + + # Get the command object to test + self.cmd = network_agent.SetNetworkAgent(self.app, self.namespace) + + def test_set_nothing(self): + arglist = [ + self._network_agent.id, + ] + verifylist = [ + ('network_agent', self._network_agent.id), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + attrs = {} + self.network.update_agent.assert_called_once_with( + self._network_agent, **attrs) + self.assertIsNone(result) + + def test_set_all(self): + arglist = [ + '--description', 'new_description', + '--enable', + self._network_agent.id, + ] + verifylist = [ + ('description', 'new_description'), + ('enable', True), + ('disable', False), + ('network_agent', self._network_agent.id), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + attrs = { + 'description': 'new_description', + 'admin_state_up': True, + } + self.network.update_agent.assert_called_once_with( + self._network_agent, **attrs) + self.assertIsNone(result) + + def test_set_with_disable(self): + arglist = [ + '--disable', + self._network_agent.id, + ] + verifylist = [ + ('enable', False), + ('disable', True), + ('network_agent', self._network_agent.id), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + attrs = { + 'admin_state_up': False, + } + self.network.update_agent.assert_called_once_with( + self._network_agent, **attrs) + self.assertIsNone(result) + + +class TestShowNetworkAgent(TestNetworkAgent): + + _network_agent = ( + network_fakes.FakeNetworkAgent.create_one_network_agent()) + + columns = ( + 'admin_state_up', + 'agent_type', + 'alive', + 'availability_zone', + 'binary', + 'configurations', + 'host', + 'id', + ) + data = ( + network_agent._format_admin_state(_network_agent.admin_state_up), + _network_agent.agent_type, + _network_agent.alive, + _network_agent.availability_zone, + _network_agent.binary, + utils.format_dict(_network_agent.configurations), + _network_agent.host, + _network_agent.id, + ) + + def setUp(self): + super(TestShowNetworkAgent, self).setUp() + self.network.get_agent = mock.Mock( + return_value=self._network_agent) + + # Get the command object to test + self.cmd = network_agent.ShowNetworkAgent(self.app, self.namespace) + + def test_show_no_options(self): + arglist = [] + verifylist = [] + + # Missing required args should bail here + self.assertRaises(tests_utils.ParserException, self.check_parser, + self.cmd, arglist, verifylist) + + def test_show_all_options(self): + arglist = [ + self._network_agent.id, + ] + verifylist = [ + ('network_agent', self._network_agent.id), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns, data = self.cmd.take_action(parsed_args) + + self.network.get_agent.assert_called_once_with( + self._network_agent.id, ignore_missing=False) + self.assertEqual(self.columns, columns) + self.assertEqual(list(self.data), list(data)) diff --git a/openstackclient/tests/unit/network/v2/test_network_rbac.py b/openstackclient/tests/unit/network/v2/test_network_rbac.py new file mode 100644 index 00000000..1cd18a09 --- /dev/null +++ b/openstackclient/tests/unit/network/v2/test_network_rbac.py @@ -0,0 +1,430 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import mock +from mock import call + +from osc_lib import exceptions + +from openstackclient.network.v2 import network_rbac +from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes_v3 +from openstackclient.tests.unit.network.v2 import fakes as network_fakes +from openstackclient.tests.unit import utils as tests_utils + + +class TestNetworkRBAC(network_fakes.TestNetworkV2): + + def setUp(self): + super(TestNetworkRBAC, self).setUp() + + # Get a shortcut to the network client + self.network = self.app.client_manager.network + # Get a shortcut to the ProjectManager Mock + self.projects_mock = self.app.client_manager.identity.projects + + +class TestCreateNetworkRBAC(TestNetworkRBAC): + + network_object = network_fakes.FakeNetwork.create_one_network() + project = identity_fakes_v3.FakeProject.create_one_project() + rbac_policy = network_fakes.FakeNetworkRBAC.create_one_network_rbac( + attrs={'tenant_id': project.id, + 'target_tenant': project.id, + 'object_id': network_object.id} + ) + + columns = ( + 'action', + 'id', + 'object_id', + 'object_type', + 'project_id', + 'target_project_id', + ) + + data = [ + rbac_policy.action, + rbac_policy.id, + rbac_policy.object_id, + rbac_policy.object_type, + rbac_policy.tenant_id, + rbac_policy.target_tenant, + ] + + def setUp(self): + super(TestCreateNetworkRBAC, self).setUp() + + # Get the command object to test + self.cmd = network_rbac.CreateNetworkRBAC(self.app, self.namespace) + + self.network.create_rbac_policy = mock.Mock( + return_value=self.rbac_policy) + self.network.find_network = mock.Mock( + return_value=self.network_object) + self.projects_mock.get.return_value = self.project + + def test_network_rbac_create_no_type(self): + arglist = [ + '--action', self.rbac_policy.action, + self.rbac_policy.object_id, + ] + verifylist = [ + ('action', self.rbac_policy.action), + ('rbac_policy', self.rbac_policy.id), + ] + + self.assertRaises(tests_utils.ParserException, self.check_parser, + self.cmd, arglist, verifylist) + + def test_network_rbac_create_no_action(self): + arglist = [ + '--type', self.rbac_policy.object_type, + self.rbac_policy.object_id, + ] + verifylist = [ + ('type', self.rbac_policy.object_type), + ('rbac_policy', self.rbac_policy.id), + ] + + self.assertRaises(tests_utils.ParserException, self.check_parser, + self.cmd, arglist, verifylist) + + def test_network_rbac_create_invalid_type(self): + arglist = [ + '--action', self.rbac_policy.action, + '--type', 'invalid_type', + '--target-project', self.rbac_policy.target_tenant, + self.rbac_policy.object_id, + ] + verifylist = [ + ('action', self.rbac_policy.action), + ('type', 'invalid_type'), + ('target-project', self.rbac_policy.target_tenant), + ('rbac_policy', self.rbac_policy.id), + ] + + self.assertRaises(tests_utils.ParserException, self.check_parser, + self.cmd, arglist, verifylist) + + def test_network_rbac_create_invalid_action(self): + arglist = [ + '--type', self.rbac_policy.object_type, + '--action', 'invalid_action', + '--target-project', self.rbac_policy.target_tenant, + self.rbac_policy.object_id, + ] + verifylist = [ + ('type', self.rbac_policy.object_type), + ('action', 'invalid_action'), + ('target-project', self.rbac_policy.target_tenant), + ('rbac_policy', self.rbac_policy.id), + ] + + self.assertRaises(tests_utils.ParserException, self.check_parser, + self.cmd, arglist, verifylist) + + def test_network_rbac_create(self): + arglist = [ + '--type', self.rbac_policy.object_type, + '--action', self.rbac_policy.action, + '--target-project', self.rbac_policy.target_tenant, + self.rbac_policy.object_id, + ] + verifylist = [ + ('type', self.rbac_policy.object_type), + ('action', self.rbac_policy.action), + ('target_project', self.rbac_policy.target_tenant), + ('rbac_object', self.rbac_policy.object_id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + # DisplayCommandBase.take_action() returns two tuples + columns, data = self.cmd.take_action(parsed_args) + + self.network.create_rbac_policy.assert_called_with(**{ + 'object_id': self.rbac_policy.object_id, + 'object_type': self.rbac_policy.object_type, + 'action': self.rbac_policy.action, + 'target_tenant': self.rbac_policy.target_tenant, + }) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, list(data)) + + def test_network_rbac_create_all_options(self): + arglist = [ + '--type', self.rbac_policy.object_type, + '--action', self.rbac_policy.action, + '--target-project', self.rbac_policy.target_tenant, + '--project', self.rbac_policy.tenant_id, + '--project-domain', self.project.domain_id, + '--target-project-domain', self.project.domain_id, + self.rbac_policy.object_id, + ] + verifylist = [ + ('type', self.rbac_policy.object_type), + ('action', self.rbac_policy.action), + ('target_project', self.rbac_policy.target_tenant), + ('project', self.rbac_policy.tenant_id), + ('project_domain', self.project.domain_id), + ('target_project_domain', self.project.domain_id), + ('rbac_object', self.rbac_policy.object_id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + # DisplayCommandBase.take_action() returns two tuples + columns, data = self.cmd.take_action(parsed_args) + + self.network.create_rbac_policy.assert_called_with(**{ + 'object_id': self.rbac_policy.object_id, + 'object_type': self.rbac_policy.object_type, + 'action': self.rbac_policy.action, + 'target_tenant': self.rbac_policy.target_tenant, + 'tenant_id': self.rbac_policy.tenant_id, + }) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, list(data)) + + +class TestDeleteNetworkRBAC(TestNetworkRBAC): + + rbac_policies = network_fakes.FakeNetworkRBAC.create_network_rbacs(count=2) + + def setUp(self): + super(TestDeleteNetworkRBAC, self).setUp() + self.network.delete_rbac_policy = mock.Mock(return_value=None) + self.network.find_rbac_policy = ( + network_fakes.FakeNetworkRBAC.get_network_rbacs( + rbac_policies=self.rbac_policies) + ) + + # Get the command object to test + self.cmd = network_rbac.DeleteNetworkRBAC(self.app, self.namespace) + + def test_network_rbac_delete(self): + arglist = [ + self.rbac_policies[0].id, + ] + verifylist = [ + ('rbac_policy', [self.rbac_policies[0].id]), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + self.network.find_rbac_policy.assert_called_once_with( + self.rbac_policies[0].id, ignore_missing=False) + self.network.delete_rbac_policy.assert_called_once_with( + self.rbac_policies[0]) + self.assertIsNone(result) + + def test_multi_network_rbacs_delete(self): + arglist = [] + verifylist = [] + + for r in self.rbac_policies: + arglist.append(r.id) + verifylist = [ + ('rbac_policy', arglist), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + calls = [] + for r in self.rbac_policies: + calls.append(call(r)) + self.network.delete_rbac_policy.assert_has_calls(calls) + self.assertIsNone(result) + + def test_multi_network_policies_delete_with_exception(self): + arglist = [ + self.rbac_policies[0].id, + 'unexist_rbac_policy', + ] + verifylist = [ + ('rbac_policy', + [self.rbac_policies[0].id, 'unexist_rbac_policy']), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + find_mock_result = [self.rbac_policies[0], exceptions.CommandError] + self.network.find_rbac_policy = ( + mock.MagicMock(side_effect=find_mock_result) + ) + + try: + self.cmd.take_action(parsed_args) + self.fail('CommandError should be raised.') + except exceptions.CommandError as e: + self.assertEqual('1 of 2 RBAC policies failed to delete.', str(e)) + + self.network.find_rbac_policy.assert_any_call( + self.rbac_policies[0].id, ignore_missing=False) + self.network.find_rbac_policy.assert_any_call( + 'unexist_rbac_policy', ignore_missing=False) + self.network.delete_rbac_policy.assert_called_once_with( + self.rbac_policies[0] + ) + + +class TestListNetworkRABC(TestNetworkRBAC): + + # The network rbac policies going to be listed up. + rbac_policies = network_fakes.FakeNetworkRBAC.create_network_rbacs(count=3) + + columns = ( + 'ID', + 'Object Type', + 'Object ID', + ) + + data = [] + for r in rbac_policies: + data.append(( + r.id, + r.object_type, + r.object_id, + )) + + def setUp(self): + super(TestListNetworkRABC, self).setUp() + + # Get the command object to test + self.cmd = network_rbac.ListNetworkRBAC(self.app, self.namespace) + + self.network.rbac_policies = mock.Mock(return_value=self.rbac_policies) + + def test_network_rbac_list(self): + arglist = [] + verifylist = [] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + # DisplayCommandBase.take_action() returns two tuples + columns, data = self.cmd.take_action(parsed_args) + + self.network.rbac_policies.assert_called_with() + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, list(data)) + + +class TestSetNetworkRBAC(TestNetworkRBAC): + + project = identity_fakes_v3.FakeProject.create_one_project() + rbac_policy = network_fakes.FakeNetworkRBAC.create_one_network_rbac( + attrs={'target_tenant': project.id}) + + def setUp(self): + super(TestSetNetworkRBAC, self).setUp() + + # Get the command object to test + self.cmd = network_rbac.SetNetworkRBAC(self.app, self.namespace) + + self.network.find_rbac_policy = mock.Mock( + return_value=self.rbac_policy) + self.network.update_rbac_policy = mock.Mock(return_value=None) + self.projects_mock.get.return_value = self.project + + def test_network_rbac_set_nothing(self): + arglist = [ + self.rbac_policy.id, + ] + verifylist = [ + ('rbac_policy', self.rbac_policy.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + self.network.find_rbac_policy.assert_called_once_with( + self.rbac_policy.id, ignore_missing=False + ) + attrs = {} + self.network.update_rbac_policy.assert_called_once_with( + self.rbac_policy, **attrs) + self.assertIsNone(result) + + def test_network_rbac_set(self): + arglist = [ + '--target-project', self.project.id, + self.rbac_policy.id, + ] + verifylist = [ + ('target_project', self.project.id), + ('rbac_policy', self.rbac_policy.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + self.network.find_rbac_policy.assert_called_once_with( + self.rbac_policy.id, ignore_missing=False + ) + attrs = {'target_tenant': self.project.id} + self.network.update_rbac_policy.assert_called_once_with( + self.rbac_policy, **attrs) + self.assertIsNone(result) + + +class TestShowNetworkRBAC(TestNetworkRBAC): + + rbac_policy = network_fakes.FakeNetworkRBAC.create_one_network_rbac() + + columns = ( + 'action', + 'id', + 'object_id', + 'object_type', + 'project_id', + 'target_project_id', + ) + + data = [ + rbac_policy.action, + rbac_policy.id, + rbac_policy.object_id, + rbac_policy.object_type, + rbac_policy.tenant_id, + rbac_policy.target_tenant, + ] + + def setUp(self): + super(TestShowNetworkRBAC, self).setUp() + + # Get the command object to test + self.cmd = network_rbac.ShowNetworkRBAC(self.app, self.namespace) + + self.network.find_rbac_policy = mock.Mock( + return_value=self.rbac_policy) + + def test_show_no_options(self): + arglist = [] + verifylist = [] + + self.assertRaises(tests_utils.ParserException, self.check_parser, + self.cmd, arglist, verifylist) + + def test_network_rbac_show_all_options(self): + arglist = [ + self.rbac_policy.id, + ] + verifylist = [ + ('rbac_policy', self.rbac_policy.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + # DisplayCommandBase.take_action() returns two tuples + columns, data = self.cmd.take_action(parsed_args) + + self.network.find_rbac_policy.assert_called_with( + self.rbac_policy.id, ignore_missing=False + ) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, list(data)) diff --git a/openstackclient/tests/unit/network/v2/test_network_segment.py b/openstackclient/tests/unit/network/v2/test_network_segment.py new file mode 100644 index 00000000..b9fce078 --- /dev/null +++ b/openstackclient/tests/unit/network/v2/test_network_segment.py @@ -0,0 +1,200 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import mock + +from osc_lib import exceptions + +from openstackclient.network.v2 import network_segment +from openstackclient.tests.unit.network.v2 import fakes as network_fakes +from openstackclient.tests.unit import utils as tests_utils + + +class TestNetworkSegment(network_fakes.TestNetworkV2): + + def setUp(self): + super(TestNetworkSegment, self).setUp() + + # Enable beta commands. + self.app.options.os_beta_command = True + + # Get a shortcut to the network client + self.network = self.app.client_manager.network + + +class TestListNetworkSegment(TestNetworkSegment): + _network = network_fakes.FakeNetwork.create_one_network() + _network_segments = \ + network_fakes.FakeNetworkSegment.create_network_segments(count=3) + + columns = ( + 'ID', + 'Network', + 'Network Type', + 'Segment', + ) + columns_long = columns + ( + 'Physical Network', + ) + + data = [] + for _network_segment in _network_segments: + data.append(( + _network_segment.id, + _network_segment.network_id, + _network_segment.network_type, + _network_segment.segmentation_id, + )) + + data_long = [] + for _network_segment in _network_segments: + data_long.append(( + _network_segment.id, + _network_segment.network_id, + _network_segment.network_type, + _network_segment.segmentation_id, + _network_segment.physical_network, + )) + + def setUp(self): + super(TestListNetworkSegment, self).setUp() + + # Get the command object to test + self.cmd = network_segment.ListNetworkSegment(self.app, self.namespace) + + self.network.find_network = mock.Mock(return_value=self._network) + self.network.segments = mock.Mock(return_value=self._network_segments) + + def test_list_no_option(self): + arglist = [] + verifylist = [ + ('long', False), + ('network', None), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.network.segments.assert_called_once_with() + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, list(data)) + + def test_list_no_beta_commands(self): + self.app.options.os_beta_command = False + parsed_args = self.check_parser(self.cmd, [], []) + self.assertRaises(exceptions.CommandError, self.cmd.take_action, + parsed_args) + + def test_list_long(self): + arglist = [ + '--long', + ] + verifylist = [ + ('long', True), + ('network', None), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.network.segments.assert_called_once_with() + self.assertEqual(self.columns_long, columns) + self.assertEqual(self.data_long, list(data)) + + def test_list_network(self): + arglist = [ + '--network', + self._network.id, + ] + verifylist = [ + ('long', False), + ('network', self._network.id) + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.network.segments.assert_called_once_with( + **{'network_id': self._network.id} + ) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, list(data)) + + +class TestShowNetworkSegment(TestNetworkSegment): + + # The network segment to show. + _network_segment = \ + network_fakes.FakeNetworkSegment.create_one_network_segment() + + columns = ( + 'id', + 'network_id', + 'network_type', + 'physical_network', + 'segmentation_id', + ) + + data = ( + _network_segment.id, + _network_segment.network_id, + _network_segment.network_type, + _network_segment.physical_network, + _network_segment.segmentation_id, + ) + + def setUp(self): + super(TestShowNetworkSegment, self).setUp() + + self.network.find_segment = mock.Mock( + return_value=self._network_segment + ) + + # Get the command object to test + self.cmd = network_segment.ShowNetworkSegment(self.app, self.namespace) + + def test_show_no_options(self): + # Missing required args should bail here + self.assertRaises(tests_utils.ParserException, self.check_parser, + self.cmd, [], []) + + def test_show_no_beta_commands(self): + arglist = [ + self._network_segment.id, + ] + verifylist = [ + ('network_segment', self._network_segment.id), + ] + self.app.options.os_beta_command = False + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + self.assertRaises(exceptions.CommandError, self.cmd.take_action, + parsed_args) + + def test_show_all_options(self): + arglist = [ + self._network_segment.id, + ] + verifylist = [ + ('network_segment', self._network_segment.id), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns, data = self.cmd.take_action(parsed_args) + + self.network.find_segment.assert_called_once_with( + self._network_segment.id, + ignore_missing=False + ) + + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) diff --git a/openstackclient/tests/unit/network/v2/test_port.py b/openstackclient/tests/unit/network/v2/test_port.py new file mode 100644 index 00000000..d5d7f330 --- /dev/null +++ b/openstackclient/tests/unit/network/v2/test_port.py @@ -0,0 +1,696 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import argparse +import mock + +from mock import call +from osc_lib import exceptions +from osc_lib import utils + +from openstackclient.network.v2 import port +from openstackclient.tests.unit.network.v2 import fakes as network_fakes +from openstackclient.tests.unit import utils as tests_utils + + +class TestPort(network_fakes.TestNetworkV2): + + def setUp(self): + super(TestPort, self).setUp() + + # Get a shortcut to the network client + self.network = self.app.client_manager.network + + def _get_common_cols_data(self, fake_port): + columns = ( + 'admin_state_up', + 'allowed_address_pairs', + 'binding_host_id', + 'binding_profile', + 'binding_vif_details', + 'binding_vif_type', + 'binding_vnic_type', + 'device_id', + 'device_owner', + 'dns_assignment', + 'dns_name', + 'extra_dhcp_opts', + 'fixed_ips', + 'id', + 'mac_address', + 'name', + 'network_id', + 'port_security_enabled', + 'project_id', + 'security_groups', + 'status', + ) + + data = ( + port._format_admin_state(fake_port.admin_state_up), + utils.format_list_of_dicts(fake_port.allowed_address_pairs), + fake_port.binding_host_id, + utils.format_dict(fake_port.binding_profile), + utils.format_dict(fake_port.binding_vif_details), + fake_port.binding_vif_type, + fake_port.binding_vnic_type, + fake_port.device_id, + fake_port.device_owner, + utils.format_list_of_dicts(fake_port.dns_assignment), + fake_port.dns_name, + utils.format_list_of_dicts(fake_port.extra_dhcp_opts), + utils.format_list_of_dicts(fake_port.fixed_ips), + fake_port.id, + fake_port.mac_address, + fake_port.name, + fake_port.network_id, + fake_port.port_security_enabled, + fake_port.project_id, + utils.format_list(fake_port.security_groups), + fake_port.status, + ) + + return columns, data + + +class TestCreatePort(TestPort): + + _port = network_fakes.FakePort.create_one_port() + + def setUp(self): + super(TestCreatePort, self).setUp() + + self.network.create_port = mock.Mock(return_value=self._port) + fake_net = network_fakes.FakeNetwork.create_one_network({ + 'id': self._port.network_id, + }) + self.network.find_network = mock.Mock(return_value=fake_net) + self.fake_subnet = network_fakes.FakeSubnet.create_one_subnet() + self.network.find_subnet = mock.Mock(return_value=self.fake_subnet) + # Get the command object to test + self.cmd = port.CreatePort(self.app, self.namespace) + + def test_create_default_options(self): + arglist = [ + '--network', self._port.network_id, + 'test-port', + ] + verifylist = [ + ('network', self._port.network_id,), + ('enable', True), + ('name', 'test-port'), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = (self.cmd.take_action(parsed_args)) + + self.network.create_port.assert_called_once_with(**{ + 'admin_state_up': True, + 'network_id': self._port.network_id, + 'name': 'test-port', + }) + + ref_columns, ref_data = self._get_common_cols_data(self._port) + self.assertEqual(ref_columns, columns) + self.assertEqual(ref_data, data) + + def test_create_full_options(self): + arglist = [ + '--mac-address', 'aa:aa:aa:aa:aa:aa', + '--fixed-ip', 'subnet=%s,ip-address=10.0.0.2' + % self.fake_subnet.id, + '--device', 'deviceid', + '--device-owner', 'fakeowner', + '--disable', + '--vnic-type', 'macvtap', + '--binding-profile', 'foo=bar', + '--binding-profile', 'foo2=bar2', + '--network', self._port.network_id, + 'test-port', + + ] + verifylist = [ + ('mac_address', 'aa:aa:aa:aa:aa:aa'), + ( + 'fixed_ip', + [{'subnet': self.fake_subnet.id, 'ip-address': '10.0.0.2'}] + ), + ('device', 'deviceid'), + ('device_owner', 'fakeowner'), + ('disable', True), + ('vnic_type', 'macvtap'), + ('binding_profile', {'foo': 'bar', 'foo2': 'bar2'}), + ('network', self._port.network_id), + ('name', 'test-port'), + + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = (self.cmd.take_action(parsed_args)) + + self.network.create_port.assert_called_once_with(**{ + 'mac_address': 'aa:aa:aa:aa:aa:aa', + 'fixed_ips': [{'subnet_id': self.fake_subnet.id, + 'ip_address': '10.0.0.2'}], + 'device_id': 'deviceid', + 'device_owner': 'fakeowner', + 'admin_state_up': False, + 'binding:vnic_type': 'macvtap', + 'binding:profile': {'foo': 'bar', 'foo2': 'bar2'}, + 'network_id': self._port.network_id, + 'name': 'test-port', + }) + + ref_columns, ref_data = self._get_common_cols_data(self._port) + self.assertEqual(ref_columns, columns) + self.assertEqual(ref_data, data) + + def test_create_invalid_json_binding_profile(self): + arglist = [ + '--network', self._port.network_id, + '--binding-profile', '{"parent_name":"fake_parent"', + 'test-port', + ] + self.assertRaises(argparse.ArgumentTypeError, + self.check_parser, + self.cmd, + arglist, + None) + + def test_create_invalid_key_value_binding_profile(self): + arglist = [ + '--network', self._port.network_id, + '--binding-profile', 'key', + 'test-port', + ] + self.assertRaises(argparse.ArgumentTypeError, + self.check_parser, + self.cmd, + arglist, + None) + + def test_create_json_binding_profile(self): + arglist = [ + '--network', self._port.network_id, + '--binding-profile', '{"parent_name":"fake_parent"}', + '--binding-profile', '{"tag":42}', + 'test-port', + ] + verifylist = [ + ('network', self._port.network_id,), + ('enable', True), + ('binding_profile', {'parent_name': 'fake_parent', 'tag': 42}), + ('name', 'test-port'), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = (self.cmd.take_action(parsed_args)) + + self.network.create_port.assert_called_once_with(**{ + 'admin_state_up': True, + 'network_id': self._port.network_id, + 'binding:profile': {'parent_name': 'fake_parent', 'tag': 42}, + 'name': 'test-port', + }) + + ref_columns, ref_data = self._get_common_cols_data(self._port) + self.assertEqual(ref_columns, columns) + self.assertEqual(ref_data, data) + + +class TestDeletePort(TestPort): + + # Ports to delete. + _ports = network_fakes.FakePort.create_ports(count=2) + + def setUp(self): + super(TestDeletePort, self).setUp() + + self.network.delete_port = mock.Mock(return_value=None) + self.network.find_port = network_fakes.FakePort.get_ports( + ports=self._ports) + # Get the command object to test + self.cmd = port.DeletePort(self.app, self.namespace) + + def test_port_delete(self): + arglist = [ + self._ports[0].name, + ] + verifylist = [ + ('port', [self._ports[0].name]), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + self.network.find_port.assert_called_once_with( + self._ports[0].name, ignore_missing=False) + self.network.delete_port.assert_called_once_with(self._ports[0]) + self.assertIsNone(result) + + def test_multi_ports_delete(self): + arglist = [] + verifylist = [] + + for p in self._ports: + arglist.append(p.name) + verifylist = [ + ('port', arglist), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + calls = [] + for p in self._ports: + calls.append(call(p)) + self.network.delete_port.assert_has_calls(calls) + self.assertIsNone(result) + + def test_multi_ports_delete_with_exception(self): + arglist = [ + self._ports[0].name, + 'unexist_port', + ] + verifylist = [ + ('port', + [self._ports[0].name, 'unexist_port']), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + find_mock_result = [self._ports[0], exceptions.CommandError] + self.network.find_port = ( + mock.MagicMock(side_effect=find_mock_result) + ) + + try: + self.cmd.take_action(parsed_args) + self.fail('CommandError should be raised.') + except exceptions.CommandError as e: + self.assertEqual('1 of 2 ports failed to delete.', str(e)) + + self.network.find_port.assert_any_call( + self._ports[0].name, ignore_missing=False) + self.network.find_port.assert_any_call( + 'unexist_port', ignore_missing=False) + self.network.delete_port.assert_called_once_with( + self._ports[0] + ) + + +class TestListPort(TestPort): + + _ports = network_fakes.FakePort.create_ports(count=3) + + columns = ( + 'ID', + 'Name', + 'MAC Address', + 'Fixed IP Addresses', + ) + + data = [] + for prt in _ports: + data.append(( + prt.id, + prt.name, + prt.mac_address, + utils.format_list_of_dicts(prt.fixed_ips), + )) + + def setUp(self): + super(TestListPort, self).setUp() + + # Get the command object to test + self.cmd = port.ListPort(self.app, self.namespace) + self.network.ports = mock.Mock(return_value=self._ports) + fake_router = network_fakes.FakeRouter.create_one_router({ + 'id': 'fake-router-id', + }) + self.network.find_router = mock.Mock(return_value=fake_router) + + def test_port_list_no_options(self): + arglist = [] + verifylist = [] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.network.ports.assert_called_once_with() + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, list(data)) + + def test_port_list_router_opt(self): + arglist = [ + '--router', 'fake-router-name', + ] + + verifylist = [ + ('router', 'fake-router-name') + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.network.ports.assert_called_once_with(**{ + 'device_id': 'fake-router-id' + }) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, list(data)) + + def test_port_list_device_owner_opt(self): + arglist = [ + '--device-owner', self._ports[0].device_owner, + ] + + verifylist = [ + ('device_owner', self._ports[0].device_owner) + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.network.ports.assert_called_once_with(**{ + 'device_owner': self._ports[0].device_owner + }) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, list(data)) + + def test_port_list_all_opt(self): + arglist = [ + '--device-owner', self._ports[0].device_owner, + '--router', 'fake-router-name', + ] + + verifylist = [ + ('device_owner', self._ports[0].device_owner), + ('router', 'fake-router-name') + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.network.ports.assert_called_once_with(**{ + 'device_owner': self._ports[0].device_owner, + 'device_id': 'fake-router-id' + }) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, list(data)) + + +class TestSetPort(TestPort): + + _port = network_fakes.FakePort.create_one_port() + + def setUp(self): + super(TestSetPort, self).setUp() + self.fake_subnet = network_fakes.FakeSubnet.create_one_subnet() + self.network.find_subnet = mock.Mock(return_value=self.fake_subnet) + self.network.find_port = mock.Mock(return_value=self._port) + self.network.update_port = mock.Mock(return_value=None) + + # Get the command object to test + self.cmd = port.SetPort(self.app, self.namespace) + + def test_set_fixed_ip(self): + arglist = [ + '--fixed-ip', 'ip-address=10.0.0.11', + self._port.name, + ] + verifylist = [ + ('fixed_ip', [{'ip-address': '10.0.0.11'}]), + ('port', self._port.name), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + attrs = { + 'fixed_ips': [{'ip_address': '10.0.0.11'}], + } + self.network.update_port.assert_called_once_with(self._port, **attrs) + self.assertIsNone(result) + + def test_append_fixed_ip(self): + _testport = network_fakes.FakePort.create_one_port( + {'fixed_ips': [{'ip_address': '0.0.0.1'}]}) + self.network.find_port = mock.Mock(return_value=_testport) + arglist = [ + '--fixed-ip', 'ip-address=10.0.0.12', + _testport.name, + ] + verifylist = [ + ('fixed_ip', [{'ip-address': '10.0.0.12'}]), + ('port', _testport.name), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + attrs = { + 'fixed_ips': [ + {'ip_address': '10.0.0.12'}, {'ip_address': '0.0.0.1'}], + } + self.network.update_port.assert_called_once_with(_testport, **attrs) + self.assertIsNone(result) + + def test_set_this(self): + arglist = [ + '--disable', + '--no-fixed-ip', + '--no-binding-profile', + self._port.name, + ] + verifylist = [ + ('disable', True), + ('no_binding_profile', True), + ('no_fixed_ip', True), + ('port', self._port.name), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + attrs = { + 'admin_state_up': False, + 'binding:profile': {}, + 'fixed_ips': [], + } + self.network.update_port.assert_called_once_with(self._port, **attrs) + self.assertIsNone(result) + + def test_set_that(self): + arglist = [ + '--enable', + '--vnic-type', 'macvtap', + '--binding-profile', 'foo=bar', + '--host', 'binding-host-id-xxxx', + '--name', 'newName', + self._port.name, + ] + verifylist = [ + ('enable', True), + ('vnic_type', 'macvtap'), + ('binding_profile', {'foo': 'bar'}), + ('host', 'binding-host-id-xxxx'), + ('name', 'newName'), + ('port', self._port.name), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + attrs = { + 'admin_state_up': True, + 'binding:vnic_type': 'macvtap', + 'binding:profile': {'foo': 'bar'}, + 'binding:host_id': 'binding-host-id-xxxx', + 'name': 'newName', + } + self.network.update_port.assert_called_once_with(self._port, **attrs) + self.assertIsNone(result) + + def test_set_nothing(self): + arglist = [ + self._port.name, + ] + verifylist = [ + ('port', self._port.name), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + attrs = {} + self.network.update_port.assert_called_once_with(self._port, **attrs) + self.assertIsNone(result) + + def test_set_invalid_json_binding_profile(self): + arglist = [ + '--binding-profile', '{"parent_name"}', + 'test-port', + ] + self.assertRaises(argparse.ArgumentTypeError, + self.check_parser, + self.cmd, + arglist, + None) + + def test_set_invalid_key_value_binding_profile(self): + arglist = [ + '--binding-profile', 'key', + 'test-port', + ] + self.assertRaises(argparse.ArgumentTypeError, + self.check_parser, + self.cmd, + arglist, + None) + + def test_set_mixed_binding_profile(self): + arglist = [ + '--binding-profile', 'foo=bar', + '--binding-profile', '{"foo2": "bar2"}', + self._port.name, + ] + verifylist = [ + ('binding_profile', {'foo': 'bar', 'foo2': 'bar2'}), + ('port', self._port.name), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + attrs = { + 'binding:profile': {'foo': 'bar', 'foo2': 'bar2'}, + } + self.network.update_port.assert_called_once_with(self._port, **attrs) + self.assertIsNone(result) + + +class TestShowPort(TestPort): + + # The port to show. + _port = network_fakes.FakePort.create_one_port() + + def setUp(self): + super(TestShowPort, self).setUp() + + self.network.find_port = mock.Mock(return_value=self._port) + + # Get the command object to test + self.cmd = port.ShowPort(self.app, self.namespace) + + def test_show_no_options(self): + arglist = [] + verifylist = [] + + self.assertRaises(tests_utils.ParserException, + self.check_parser, self.cmd, arglist, verifylist) + + def test_show_all_options(self): + arglist = [ + self._port.name, + ] + verifylist = [ + ('port', self._port.name), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.network.find_port.assert_called_once_with( + self._port.name, ignore_missing=False) + + ref_columns, ref_data = self._get_common_cols_data(self._port) + self.assertEqual(ref_columns, columns) + self.assertEqual(ref_data, data) + + +class TestUnsetPort(TestPort): + + def setUp(self): + super(TestUnsetPort, self).setUp() + self._testport = network_fakes.FakePort.create_one_port( + {'fixed_ips': [{'subnet_id': '042eb10a-3a18-4658-ab-cf47c8d03152', + 'ip_address': '0.0.0.1'}, + {'subnet_id': '042eb10a-3a18-4658-ab-cf47c8d03152', + 'ip_address': '1.0.0.0'}], + 'binding:profile': {'batman': 'Joker', 'Superman': 'LexLuthor'}}) + self.fake_subnet = network_fakes.FakeSubnet.create_one_subnet( + {'id': '042eb10a-3a18-4658-ab-cf47c8d03152'}) + self.network.find_subnet = mock.Mock(return_value=self.fake_subnet) + self.network.find_port = mock.Mock(return_value=self._testport) + self.network.update_port = mock.Mock(return_value=None) + # Get the command object to test + self.cmd = port.UnsetPort(self.app, self.namespace) + + def test_unset_port_parameters(self): + arglist = [ + '--fixed-ip', + 'subnet=042eb10a-3a18-4658-ab-cf47c8d03152,ip-address=1.0.0.0', + '--binding-profile', 'Superman', + self._testport.name, + ] + verifylist = [ + ('fixed_ip', [{ + 'subnet': '042eb10a-3a18-4658-ab-cf47c8d03152', + 'ip-address': '1.0.0.0'}]), + ('binding_profile', ['Superman']), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + attrs = { + 'fixed_ips': [{ + 'subnet_id': '042eb10a-3a18-4658-ab-cf47c8d03152', + 'ip_address': '0.0.0.1'}], + 'binding:profile': {'batman': 'Joker'} + } + self.network.update_port.assert_called_once_with( + self._testport, **attrs) + self.assertIsNone(result) + + def test_unset_port_fixed_ip_not_existent(self): + arglist = [ + '--fixed-ip', 'ip-address=1.0.0.1', + '--binding-profile', 'Superman', + self._testport.name, + ] + verifylist = [ + ('fixed_ip', [{'ip-address': '1.0.0.1'}]), + ('binding_profile', ['Superman']), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + self.assertRaises(exceptions.CommandError, + self.cmd.take_action, + parsed_args) + + def test_unset_port_binding_profile_not_existent(self): + arglist = [ + '--fixed-ip', 'ip-address=1.0.0.0', + '--binding-profile', 'Neo', + self._testport.name, + ] + verifylist = [ + ('fixed_ip', [{'ip-address': '1.0.0.0'}]), + ('binding_profile', ['Neo']), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + self.assertRaises(exceptions.CommandError, + self.cmd.take_action, + parsed_args) diff --git a/openstackclient/tests/unit/network/v2/test_router.py b/openstackclient/tests/unit/network/v2/test_router.py new file mode 100644 index 00000000..26fe655e --- /dev/null +++ b/openstackclient/tests/unit/network/v2/test_router.py @@ -0,0 +1,751 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import mock +from mock import call + +from osc_lib import exceptions +from osc_lib import utils as osc_utils + +from openstackclient.network.v2 import router +from openstackclient.tests.unit.network.v2 import fakes as network_fakes +from openstackclient.tests.unit import utils as tests_utils + + +class TestRouter(network_fakes.TestNetworkV2): + + def setUp(self): + super(TestRouter, self).setUp() + + # Get a shortcut to the network client + self.network = self.app.client_manager.network + + +class TestAddPortToRouter(TestRouter): + '''Add port to Router ''' + + _port = network_fakes.FakePort.create_one_port() + _router = network_fakes.FakeRouter.create_one_router( + attrs={'port': _port.id}) + + def setUp(self): + super(TestAddPortToRouter, self).setUp() + self.network.router_add_interface = mock.Mock() + self.cmd = router.AddPortToRouter(self.app, self.namespace) + self.network.find_router = mock.Mock(return_value=self._router) + self.network.find_port = mock.Mock(return_value=self._port) + + def test_add_port_no_option(self): + arglist = [] + verifylist = [] + + self.assertRaises(tests_utils.ParserException, self.check_parser, + self.cmd, arglist, verifylist) + + def test_add_port_required_options(self): + arglist = [ + self._router.id, + self._router.port, + ] + verifylist = [ + ('router', self._router.id), + ('port', self._router.port), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + self.network.router_add_interface.assert_called_with(self._router, **{ + 'port_id': self._router.port, + }) + self.assertIsNone(result) + + +class TestAddSubnetToRouter(TestRouter): + '''Add subnet to Router ''' + + _subnet = network_fakes.FakeSubnet.create_one_subnet() + _router = network_fakes.FakeRouter.create_one_router( + attrs={'subnet': _subnet.id}) + + def setUp(self): + super(TestAddSubnetToRouter, self).setUp() + self.network.router_add_interface = mock.Mock() + self.cmd = router.AddSubnetToRouter(self.app, self.namespace) + self.network.find_router = mock.Mock(return_value=self._router) + self.network.find_subnet = mock.Mock(return_value=self._subnet) + + def test_add_subnet_no_option(self): + arglist = [] + verifylist = [] + + self.assertRaises(tests_utils.ParserException, self.check_parser, + self.cmd, arglist, verifylist) + + def test_add_subnet_required_options(self): + arglist = [ + self._router.id, + self._router.subnet, + ] + verifylist = [ + ('router', self._router.id), + ('subnet', self._router.subnet), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + self.network.router_add_interface.assert_called_with( + self._router, **{'subnet_id': self._router.subnet}) + + self.assertIsNone(result) + + +class TestCreateRouter(TestRouter): + + # The new router created. + new_router = network_fakes.FakeRouter.create_one_router() + + columns = ( + 'admin_state_up', + 'availability_zone_hints', + 'availability_zones', + 'distributed', + 'external_gateway_info', + 'ha', + 'id', + 'name', + 'project_id', + 'routes', + 'status', + ) + data = ( + router._format_admin_state(new_router.admin_state_up), + osc_utils.format_list(new_router.availability_zone_hints), + osc_utils.format_list(new_router.availability_zones), + new_router.distributed, + router._format_external_gateway_info(new_router.external_gateway_info), + new_router.ha, + new_router.id, + new_router.name, + new_router.tenant_id, + router._format_routes(new_router.routes), + new_router.status, + ) + + def setUp(self): + super(TestCreateRouter, self).setUp() + + self.network.create_router = mock.Mock(return_value=self.new_router) + + # Get the command object to test + self.cmd = router.CreateRouter(self.app, self.namespace) + + def test_create_no_options(self): + arglist = [] + verifylist = [] + + self.assertRaises(tests_utils.ParserException, self.check_parser, + self.cmd, arglist, verifylist) + + def test_create_default_options(self): + arglist = [ + self.new_router.name, + ] + verifylist = [ + ('name', self.new_router.name), + ('enable', True), + ('distributed', False), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = (self.cmd.take_action(parsed_args)) + + self.network.create_router.assert_called_once_with(**{ + 'admin_state_up': True, + 'name': self.new_router.name, + }) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + def test_create_with_AZ_hints(self): + arglist = [ + self.new_router.name, + '--availability-zone-hint', 'fake-az', + '--availability-zone-hint', 'fake-az2', + ] + verifylist = [ + ('name', self.new_router.name), + ('availability_zone_hints', ['fake-az', 'fake-az2']), + ('enable', True), + ('distributed', False), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = (self.cmd.take_action(parsed_args)) + self.network.create_router.assert_called_once_with(**{ + 'admin_state_up': True, + 'name': self.new_router.name, + 'availability_zone_hints': ['fake-az', 'fake-az2'], + }) + + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + +class TestDeleteRouter(TestRouter): + + # The routers to delete. + _routers = network_fakes.FakeRouter.create_routers(count=2) + + def setUp(self): + super(TestDeleteRouter, self).setUp() + + self.network.delete_router = mock.Mock(return_value=None) + + self.network.find_router = ( + network_fakes.FakeRouter.get_routers(self._routers)) + + # Get the command object to test + self.cmd = router.DeleteRouter(self.app, self.namespace) + + def test_router_delete(self): + arglist = [ + self._routers[0].name, + ] + verifylist = [ + ('router', [self._routers[0].name]), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + self.network.delete_router.assert_called_once_with(self._routers[0]) + self.assertIsNone(result) + + def test_multi_routers_delete(self): + arglist = [] + verifylist = [] + + for r in self._routers: + arglist.append(r.name) + verifylist = [ + ('router', arglist), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + calls = [] + for r in self._routers: + calls.append(call(r)) + self.network.delete_router.assert_has_calls(calls) + self.assertIsNone(result) + + def test_multi_routers_delete_with_exception(self): + arglist = [ + self._routers[0].name, + 'unexist_router', + ] + verifylist = [ + ('router', + [self._routers[0].name, 'unexist_router']), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + find_mock_result = [self._routers[0], exceptions.CommandError] + self.network.find_router = ( + mock.MagicMock(side_effect=find_mock_result) + ) + + try: + self.cmd.take_action(parsed_args) + self.fail('CommandError should be raised.') + except exceptions.CommandError as e: + self.assertEqual('1 of 2 routers failed to delete.', str(e)) + + self.network.find_router.assert_any_call( + self._routers[0].name, ignore_missing=False) + self.network.find_router.assert_any_call( + 'unexist_router', ignore_missing=False) + self.network.delete_router.assert_called_once_with( + self._routers[0] + ) + + +class TestListRouter(TestRouter): + + # The routers going to be listed up. + routers = network_fakes.FakeRouter.create_routers(count=3) + + columns = ( + 'ID', + 'Name', + 'Status', + 'State', + 'Distributed', + 'HA', + 'Project', + ) + columns_long = columns + ( + 'Routes', + 'External gateway info', + 'Availability zones' + ) + + data = [] + for r in routers: + data.append(( + r.id, + r.name, + r.status, + router._format_admin_state(r.admin_state_up), + r.distributed, + r.ha, + r.tenant_id, + )) + data_long = [] + for i in range(0, len(routers)): + r = routers[i] + data_long.append( + data[i] + ( + router._format_routes(r.routes), + router._format_external_gateway_info(r.external_gateway_info), + osc_utils.format_list(r.availability_zones), + ) + ) + + def setUp(self): + super(TestListRouter, self).setUp() + + # Get the command object to test + self.cmd = router.ListRouter(self.app, self.namespace) + + self.network.routers = mock.Mock(return_value=self.routers) + + def test_router_list_no_options(self): + arglist = [] + verifylist = [ + ('long', False), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + # In base command class Lister in cliff, abstract method take_action() + # returns a tuple containing the column names and an iterable + # containing the data to be listed. + columns, data = self.cmd.take_action(parsed_args) + + self.network.routers.assert_called_once_with() + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, list(data)) + + def test_router_list_long(self): + arglist = [ + '--long', + ] + verifylist = [ + ('long', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + # In base command class Lister in cliff, abstract method take_action() + # returns a tuple containing the column names and an iterable + # containing the data to be listed. + columns, data = self.cmd.take_action(parsed_args) + + self.network.routers.assert_called_once_with() + self.assertEqual(self.columns_long, columns) + self.assertEqual(self.data_long, list(data)) + + +class TestRemovePortFromRouter(TestRouter): + '''Remove port from a Router ''' + + _port = network_fakes.FakePort.create_one_port() + _router = network_fakes.FakeRouter.create_one_router( + attrs={'port': _port.id}) + + def setUp(self): + super(TestRemovePortFromRouter, self).setUp() + self.network.router_remove_interface = mock.Mock() + self.cmd = router.RemovePortFromRouter(self.app, self.namespace) + self.network.find_router = mock.Mock(return_value=self._router) + self.network.find_port = mock.Mock(return_value=self._port) + + def test_remove_port_no_option(self): + arglist = [] + verifylist = [] + + self.assertRaises(tests_utils.ParserException, self.check_parser, + self.cmd, arglist, verifylist) + + def test_remove_port_required_options(self): + arglist = [ + self._router.id, + self._router.port, + ] + verifylist = [ + ('router', self._router.id), + ('port', self._router.port), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + self.network.router_remove_interface.assert_called_with( + self._router, **{'port_id': self._router.port}) + self.assertIsNone(result) + + +class TestRemoveSubnetFromRouter(TestRouter): + '''Remove subnet from Router ''' + + _subnet = network_fakes.FakeSubnet.create_one_subnet() + _router = network_fakes.FakeRouter.create_one_router( + attrs={'subnet': _subnet.id}) + + def setUp(self): + super(TestRemoveSubnetFromRouter, self).setUp() + self.network.router_remove_interface = mock.Mock() + self.cmd = router.RemoveSubnetFromRouter(self.app, self.namespace) + self.network.find_router = mock.Mock(return_value=self._router) + self.network.find_subnet = mock.Mock(return_value=self._subnet) + + def test_remove_subnet_no_option(self): + arglist = [] + verifylist = [] + + self.assertRaises(tests_utils.ParserException, self.check_parser, + self.cmd, arglist, verifylist) + + def test_remove_subnet_required_options(self): + arglist = [ + self._router.id, + self._router.subnet, + ] + verifylist = [ + ('subnet', self._router.subnet), + ('router', self._router.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + self.network.router_remove_interface.assert_called_with( + self._router, **{'subnet_id': self._router.subnet}) + self.assertIsNone(result) + + +class TestSetRouter(TestRouter): + + # The router to set. + _default_route = {'destination': '10.20.20.0/24', 'nexthop': '10.20.30.1'} + _router = network_fakes.FakeRouter.create_one_router( + attrs={'routes': [_default_route]} + ) + + def setUp(self): + super(TestSetRouter, self).setUp() + + self.network.update_router = mock.Mock(return_value=None) + + self.network.find_router = mock.Mock(return_value=self._router) + + # Get the command object to test + self.cmd = router.SetRouter(self.app, self.namespace) + + def test_set_this(self): + arglist = [ + self._router.name, + '--enable', + '--distributed', + '--name', 'noob', + ] + verifylist = [ + ('router', self._router.name), + ('enable', True), + ('distributed', True), + ('name', 'noob'), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + attrs = { + 'admin_state_up': True, + 'distributed': True, + 'name': 'noob', + } + self.network.update_router.assert_called_once_with( + self._router, **attrs) + self.assertIsNone(result) + + def test_set_that(self): + arglist = [ + self._router.name, + '--disable', + '--centralized', + ] + verifylist = [ + ('router', self._router.name), + ('disable', True), + ('centralized', True), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + attrs = { + 'admin_state_up': False, + 'distributed': False, + } + self.network.update_router.assert_called_once_with( + self._router, **attrs) + self.assertIsNone(result) + + def test_set_distributed_centralized(self): + arglist = [ + self._router.name, + '--distributed', + '--centralized', + ] + verifylist = [ + ('router', self._router.name), + ('distributed', True), + ('distributed', False), + ] + + self.assertRaises(tests_utils.ParserException, self.check_parser, + self.cmd, arglist, verifylist) + + def test_set_route(self): + arglist = [ + self._router.name, + '--route', 'destination=10.20.30.0/24,gateway=10.20.30.1', + ] + verifylist = [ + ('router', self._router.name), + ('routes', [{'destination': '10.20.30.0/24', + 'gateway': '10.20.30.1'}]), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + attrs = { + 'routes': self._router.routes + [{'destination': '10.20.30.0/24', + 'nexthop': '10.20.30.1'}], + } + self.network.update_router.assert_called_once_with( + self._router, **attrs) + self.assertIsNone(result) + + def test_set_no_route(self): + arglist = [ + self._router.name, + '--no-route', + ] + verifylist = [ + ('router', self._router.name), + ('no_route', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + attrs = { + 'routes': [], + } + self.network.update_router.assert_called_once_with( + self._router, **attrs) + self.assertIsNone(result) + + def test_set_route_no_route(self): + arglist = [ + self._router.name, + '--route', 'destination=10.20.30.0/24,gateway=10.20.30.1', + '--no-route', + ] + verifylist = [ + ('router', self._router.name), + ('routes', [{'destination': '10.20.30.0/24', + 'gateway': '10.20.30.1'}]), + ('no_route', True), + ] + + self.assertRaises(tests_utils.ParserException, self.check_parser, + self.cmd, arglist, verifylist) + + def test_set_clear_routes(self): + arglist = [ + self._router.name, + '--clear-routes', + ] + verifylist = [ + ('router', self._router.name), + ('clear_routes', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + attrs = { + 'routes': [], + } + self.network.update_router.assert_called_once_with( + self._router, **attrs) + self.assertIsNone(result) + + def test_set_route_clear_routes(self): + arglist = [ + self._router.name, + '--route', 'destination=10.20.30.0/24,gateway=10.20.30.1', + '--clear-routes', + ] + verifylist = [ + ('router', self._router.name), + ('routes', [{'destination': '10.20.30.0/24', + 'gateway': '10.20.30.1'}]), + ('clear_routes', True), + ] + + self.assertRaises(tests_utils.ParserException, self.check_parser, + self.cmd, arglist, verifylist) + + def test_set_nothing(self): + arglist = [ + self._router.name, + ] + verifylist = [ + ('router', self._router.name), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + attrs = {} + self.network.update_router.assert_called_once_with( + self._router, **attrs) + self.assertIsNone(result) + + +class TestShowRouter(TestRouter): + + # The router to set. + _router = network_fakes.FakeRouter.create_one_router() + + columns = ( + 'admin_state_up', + 'availability_zone_hints', + 'availability_zones', + 'distributed', + 'external_gateway_info', + 'ha', + 'id', + 'name', + 'project_id', + 'routes', + 'status', + ) + data = ( + router._format_admin_state(_router.admin_state_up), + osc_utils.format_list(_router.availability_zone_hints), + osc_utils.format_list(_router.availability_zones), + _router.distributed, + router._format_external_gateway_info(_router.external_gateway_info), + _router.ha, + _router.id, + _router.name, + _router.tenant_id, + router._format_routes(_router.routes), + _router.status, + ) + + def setUp(self): + super(TestShowRouter, self).setUp() + + self.network.find_router = mock.Mock(return_value=self._router) + + # Get the command object to test + self.cmd = router.ShowRouter(self.app, self.namespace) + + def test_show_no_options(self): + arglist = [] + verifylist = [] + + self.assertRaises(tests_utils.ParserException, self.check_parser, + self.cmd, arglist, verifylist) + + def test_show_all_options(self): + arglist = [ + self._router.name, + ] + verifylist = [ + ('router', self._router.name), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.network.find_router.assert_called_once_with( + self._router.name, ignore_missing=False) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + +class TestUnsetRouter(TestRouter): + + def setUp(self): + super(TestUnsetRouter, self).setUp() + self._testrouter = network_fakes.FakeRouter.create_one_router( + {'routes': [{"destination": "192.168.101.1/24", + "gateway": "172.24.4.3"}, + {"destination": "192.168.101.2/24", + "gateway": "172.24.4.3"}], }) + self.fake_subnet = network_fakes.FakeSubnet.create_one_subnet() + self.network.find_router = mock.Mock(return_value=self._testrouter) + self.network.update_router = mock.Mock(return_value=None) + # Get the command object to test + self.cmd = router.UnsetRouter(self.app, self.namespace) + + def test_unset_router_params(self): + arglist = [ + '--route', 'destination=192.168.101.1/24,gateway=172.24.4.3', + self._testrouter.name, + ] + verifylist = [ + ('routes', [ + {"destination": "192.168.101.1/24", "gateway": "172.24.4.3"}]), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + attrs = { + 'routes': [{"destination": "192.168.101.2/24", + "nexthop": "172.24.4.3"}], + } + self.network.update_router.assert_called_once_with( + self._testrouter, **attrs) + self.assertIsNone(result) + + def test_unset_router_wrong_routes(self): + arglist = [ + '--route', 'destination=192.168.101.1/24,gateway=172.24.4.2', + self._testrouter.name, + ] + verifylist = [ + ('routes', [ + {"destination": "192.168.101.1/24", "gateway": "172.24.4.2"}]), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + self.assertRaises(exceptions.CommandError, + self.cmd.take_action, parsed_args) diff --git a/openstackclient/tests/unit/network/v2/test_security_group.py b/openstackclient/tests/unit/network/v2/test_security_group.py new file mode 100644 index 00000000..4c5b2972 --- /dev/null +++ b/openstackclient/tests/unit/network/v2/test_security_group.py @@ -0,0 +1,770 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import mock +from mock import call + +from osc_lib import exceptions + +from openstackclient.network.v2 import security_group +from openstackclient.tests.unit.compute.v2 import fakes as compute_fakes +from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes +from openstackclient.tests.unit.network.v2 import fakes as network_fakes +from openstackclient.tests.unit import utils as tests_utils + + +class TestSecurityGroupNetwork(network_fakes.TestNetworkV2): + + def setUp(self): + super(TestSecurityGroupNetwork, self).setUp() + + # Get a shortcut to the network client + self.network = self.app.client_manager.network + # Get a shortcut to the ProjectManager Mock + self.projects_mock = self.app.client_manager.identity.projects + # Get a shortcut to the DomainManager Mock + self.domains_mock = self.app.client_manager.identity.domains + + +class TestSecurityGroupCompute(compute_fakes.TestComputev2): + + def setUp(self): + super(TestSecurityGroupCompute, self).setUp() + + # Get a shortcut to the compute client + self.compute = self.app.client_manager.compute + + +class TestCreateSecurityGroupNetwork(TestSecurityGroupNetwork): + + project = identity_fakes.FakeProject.create_one_project() + domain = identity_fakes.FakeDomain.create_one_domain() + # The security group to be created. + _security_group = \ + network_fakes.FakeSecurityGroup.create_one_security_group() + + columns = ( + 'description', + 'id', + 'name', + 'project_id', + 'rules', + ) + + data = ( + _security_group.description, + _security_group.id, + _security_group.name, + _security_group.project_id, + '', + ) + + def setUp(self): + super(TestCreateSecurityGroupNetwork, self).setUp() + + self.network.create_security_group = mock.Mock( + return_value=self._security_group) + + self.projects_mock.get.return_value = self.project + self.domains_mock.get.return_value = self.domain + + # Get the command object to test + self.cmd = security_group.CreateSecurityGroup(self.app, self.namespace) + + def test_create_no_options(self): + self.assertRaises(tests_utils.ParserException, + self.check_parser, self.cmd, [], []) + + def test_create_min_options(self): + arglist = [ + self._security_group.name, + ] + verifylist = [ + ('name', self._security_group.name), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.network.create_security_group.assert_called_once_with(**{ + 'description': self._security_group.name, + 'name': self._security_group.name, + }) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + def test_create_all_options(self): + arglist = [ + '--description', self._security_group.description, + '--project', self.project.name, + '--project-domain', self.domain.name, + self._security_group.name, + ] + verifylist = [ + ('description', self._security_group.description), + ('name', self._security_group.name), + ('project', self.project.name), + ('project_domain', self.domain.name), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.network.create_security_group.assert_called_once_with(**{ + 'description': self._security_group.description, + 'name': self._security_group.name, + 'tenant_id': self.project.id, + }) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + +class TestCreateSecurityGroupCompute(TestSecurityGroupCompute): + + project = identity_fakes.FakeProject.create_one_project() + domain = identity_fakes.FakeDomain.create_one_domain() + # The security group to be shown. + _security_group = \ + compute_fakes.FakeSecurityGroup.create_one_security_group() + + columns = ( + 'description', + 'id', + 'name', + 'project_id', + 'rules', + ) + + data = ( + _security_group.description, + _security_group.id, + _security_group.name, + _security_group.tenant_id, + '', + ) + + def setUp(self): + super(TestCreateSecurityGroupCompute, self).setUp() + + self.app.client_manager.network_endpoint_enabled = False + + self.compute.security_groups.create.return_value = self._security_group + + # Get the command object to test + self.cmd = security_group.CreateSecurityGroup(self.app, None) + + def test_create_no_options(self): + self.assertRaises(tests_utils.ParserException, + self.check_parser, self.cmd, [], []) + + def test_create_network_options(self): + arglist = [ + '--project', self.project.name, + '--project-domain', self.domain.name, + self._security_group.name, + ] + self.assertRaises(tests_utils.ParserException, + self.check_parser, self.cmd, arglist, []) + + def test_create_min_options(self): + arglist = [ + self._security_group.name, + ] + verifylist = [ + ('name', self._security_group.name), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.compute.security_groups.create.assert_called_once_with( + self._security_group.name, + self._security_group.name) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + def test_create_all_options(self): + arglist = [ + '--description', self._security_group.description, + self._security_group.name, + ] + verifylist = [ + ('description', self._security_group.description), + ('name', self._security_group.name), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.compute.security_groups.create.assert_called_once_with( + self._security_group.name, + self._security_group.description) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + +class TestDeleteSecurityGroupNetwork(TestSecurityGroupNetwork): + + # The security groups to be deleted. + _security_groups = \ + network_fakes.FakeSecurityGroup.create_security_groups() + + def setUp(self): + super(TestDeleteSecurityGroupNetwork, self).setUp() + + self.network.delete_security_group = mock.Mock(return_value=None) + + self.network.find_security_group = ( + network_fakes.FakeSecurityGroup.get_security_groups( + self._security_groups) + ) + + # Get the command object to test + self.cmd = security_group.DeleteSecurityGroup(self.app, self.namespace) + + def test_security_group_delete(self): + arglist = [ + self._security_groups[0].name, + ] + verifylist = [ + ('group', [self._security_groups[0].name]), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + self.network.delete_security_group.assert_called_once_with( + self._security_groups[0]) + self.assertIsNone(result) + + def test_multi_security_groups_delete(self): + arglist = [] + verifylist = [] + + for s in self._security_groups: + arglist.append(s.name) + verifylist = [ + ('group', arglist), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + calls = [] + for s in self._security_groups: + calls.append(call(s)) + self.network.delete_security_group.assert_has_calls(calls) + self.assertIsNone(result) + + def test_multi_security_groups_delete_with_exception(self): + arglist = [ + self._security_groups[0].name, + 'unexist_security_group', + ] + verifylist = [ + ('group', + [self._security_groups[0].name, 'unexist_security_group']), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + find_mock_result = [self._security_groups[0], exceptions.CommandError] + self.network.find_security_group = ( + mock.MagicMock(side_effect=find_mock_result) + ) + + try: + self.cmd.take_action(parsed_args) + self.fail('CommandError should be raised.') + except exceptions.CommandError as e: + self.assertEqual('1 of 2 groups failed to delete.', str(e)) + + self.network.find_security_group.assert_any_call( + self._security_groups[0].name, ignore_missing=False) + self.network.find_security_group.assert_any_call( + 'unexist_security_group', ignore_missing=False) + self.network.delete_security_group.assert_called_once_with( + self._security_groups[0] + ) + + +class TestDeleteSecurityGroupCompute(TestSecurityGroupCompute): + + # The security groups to be deleted. + _security_groups = \ + compute_fakes.FakeSecurityGroup.create_security_groups() + + def setUp(self): + super(TestDeleteSecurityGroupCompute, self).setUp() + + self.app.client_manager.network_endpoint_enabled = False + + self.compute.security_groups.delete = mock.Mock(return_value=None) + + self.compute.security_groups.get = ( + compute_fakes.FakeSecurityGroup.get_security_groups( + self._security_groups) + ) + + # Get the command object to test + self.cmd = security_group.DeleteSecurityGroup(self.app, None) + + def test_security_group_delete(self): + arglist = [ + self._security_groups[0].id, + ] + verifylist = [ + ('group', [self._security_groups[0].id]), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + self.compute.security_groups.delete.assert_called_once_with( + self._security_groups[0].id) + self.assertIsNone(result) + + def test_multi_security_groups_delete(self): + arglist = [] + verifylist = [] + + for s in self._security_groups: + arglist.append(s.id) + verifylist = [ + ('group', arglist), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + calls = [] + for s in self._security_groups: + calls.append(call(s.id)) + self.compute.security_groups.delete.assert_has_calls(calls) + self.assertIsNone(result) + + def test_multi_security_groups_delete_with_exception(self): + arglist = [ + self._security_groups[0].id, + 'unexist_security_group', + ] + verifylist = [ + ('group', + [self._security_groups[0].id, 'unexist_security_group']), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + find_mock_result = [self._security_groups[0], exceptions.CommandError] + self.compute.security_groups.get = ( + mock.MagicMock(side_effect=find_mock_result) + ) + self.compute.security_groups.find.side_effect = ( + exceptions.NotFound(None)) + + try: + self.cmd.take_action(parsed_args) + self.fail('CommandError should be raised.') + except exceptions.CommandError as e: + self.assertEqual('1 of 2 groups failed to delete.', str(e)) + + self.compute.security_groups.get.assert_any_call( + self._security_groups[0].id) + self.compute.security_groups.get.assert_any_call( + 'unexist_security_group') + self.compute.security_groups.delete.assert_called_once_with( + self._security_groups[0].id + ) + + +class TestListSecurityGroupNetwork(TestSecurityGroupNetwork): + + # The security group to be listed. + _security_groups = \ + network_fakes.FakeSecurityGroup.create_security_groups(count=3) + + columns = ( + 'ID', + 'Name', + 'Description', + 'Project', + ) + + data = [] + for grp in _security_groups: + data.append(( + grp.id, + grp.name, + grp.description, + grp.tenant_id, + )) + + def setUp(self): + super(TestListSecurityGroupNetwork, self).setUp() + + self.network.security_groups = mock.Mock( + return_value=self._security_groups) + + # Get the command object to test + self.cmd = security_group.ListSecurityGroup(self.app, self.namespace) + + def test_security_group_list_no_options(self): + arglist = [] + verifylist = [ + ('all_projects', False), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.network.security_groups.assert_called_once_with() + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, list(data)) + + def test_security_group_list_all_projects(self): + arglist = [ + '--all-projects', + ] + verifylist = [ + ('all_projects', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.network.security_groups.assert_called_once_with() + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, list(data)) + + +class TestListSecurityGroupCompute(TestSecurityGroupCompute): + + # The security group to be listed. + _security_groups = \ + compute_fakes.FakeSecurityGroup.create_security_groups(count=3) + + columns = ( + 'ID', + 'Name', + 'Description', + ) + columns_all_projects = ( + 'ID', + 'Name', + 'Description', + 'Project', + ) + + data = [] + for grp in _security_groups: + data.append(( + grp.id, + grp.name, + grp.description, + )) + data_all_projects = [] + for grp in _security_groups: + data_all_projects.append(( + grp.id, + grp.name, + grp.description, + grp.tenant_id, + )) + + def setUp(self): + super(TestListSecurityGroupCompute, self).setUp() + + self.app.client_manager.network_endpoint_enabled = False + self.compute.security_groups.list.return_value = self._security_groups + + # Get the command object to test + self.cmd = security_group.ListSecurityGroup(self.app, None) + + def test_security_group_list_no_options(self): + arglist = [] + verifylist = [ + ('all_projects', False), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + kwargs = {'search_opts': {'all_tenants': False}} + self.compute.security_groups.list.assert_called_once_with(**kwargs) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, list(data)) + + def test_security_group_list_all_projects(self): + arglist = [ + '--all-projects', + ] + verifylist = [ + ('all_projects', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + kwargs = {'search_opts': {'all_tenants': True}} + self.compute.security_groups.list.assert_called_once_with(**kwargs) + self.assertEqual(self.columns_all_projects, columns) + self.assertEqual(self.data_all_projects, list(data)) + + +class TestSetSecurityGroupNetwork(TestSecurityGroupNetwork): + + # The security group to be set. + _security_group = \ + network_fakes.FakeSecurityGroup.create_one_security_group() + + def setUp(self): + super(TestSetSecurityGroupNetwork, self).setUp() + + self.network.update_security_group = mock.Mock(return_value=None) + + self.network.find_security_group = mock.Mock( + return_value=self._security_group) + + # Get the command object to test + self.cmd = security_group.SetSecurityGroup(self.app, self.namespace) + + def test_set_no_options(self): + self.assertRaises(tests_utils.ParserException, + self.check_parser, self.cmd, [], []) + + def test_set_no_updates(self): + arglist = [ + self._security_group.name, + ] + verifylist = [ + ('group', self._security_group.name), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + self.network.update_security_group.assert_called_once_with( + self._security_group, + **{} + ) + self.assertIsNone(result) + + def test_set_all_options(self): + new_name = 'new-' + self._security_group.name + new_description = 'new-' + self._security_group.description + arglist = [ + '--name', new_name, + '--description', new_description, + self._security_group.name, + ] + verifylist = [ + ('description', new_description), + ('group', self._security_group.name), + ('name', new_name), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + attrs = { + 'description': new_description, + 'name': new_name, + } + self.network.update_security_group.assert_called_once_with( + self._security_group, + **attrs + ) + self.assertIsNone(result) + + +class TestSetSecurityGroupCompute(TestSecurityGroupCompute): + + # The security group to be set. + _security_group = \ + compute_fakes.FakeSecurityGroup.create_one_security_group() + + def setUp(self): + super(TestSetSecurityGroupCompute, self).setUp() + + self.app.client_manager.network_endpoint_enabled = False + + self.compute.security_groups.update = mock.Mock(return_value=None) + + self.compute.security_groups.get = mock.Mock( + return_value=self._security_group) + + # Get the command object to test + self.cmd = security_group.SetSecurityGroup(self.app, None) + + def test_set_no_options(self): + self.assertRaises(tests_utils.ParserException, + self.check_parser, self.cmd, [], []) + + def test_set_no_updates(self): + arglist = [ + self._security_group.name, + ] + verifylist = [ + ('group', self._security_group.name), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + self.compute.security_groups.update.assert_called_once_with( + self._security_group, + self._security_group.name, + self._security_group.description + ) + self.assertIsNone(result) + + def test_set_all_options(self): + new_name = 'new-' + self._security_group.name + new_description = 'new-' + self._security_group.description + arglist = [ + '--name', new_name, + '--description', new_description, + self._security_group.name, + ] + verifylist = [ + ('description', new_description), + ('group', self._security_group.name), + ('name', new_name), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + self.compute.security_groups.update.assert_called_once_with( + self._security_group, + new_name, + new_description + ) + self.assertIsNone(result) + + +class TestShowSecurityGroupNetwork(TestSecurityGroupNetwork): + + # The security group rule to be shown with the group. + _security_group_rule = \ + network_fakes.FakeSecurityGroupRule.create_one_security_group_rule() + + # The security group to be shown. + _security_group = \ + network_fakes.FakeSecurityGroup.create_one_security_group( + attrs={'security_group_rules': [_security_group_rule._info]} + ) + + columns = ( + 'description', + 'id', + 'name', + 'project_id', + 'rules', + ) + + data = ( + _security_group.description, + _security_group.id, + _security_group.name, + _security_group.project_id, + security_group._format_network_security_group_rules( + [_security_group_rule._info]), + ) + + def setUp(self): + super(TestShowSecurityGroupNetwork, self).setUp() + + self.network.find_security_group = mock.Mock( + return_value=self._security_group) + + # Get the command object to test + self.cmd = security_group.ShowSecurityGroup(self.app, self.namespace) + + def test_show_no_options(self): + self.assertRaises(tests_utils.ParserException, + self.check_parser, self.cmd, [], []) + + def test_show_all_options(self): + arglist = [ + self._security_group.id, + ] + verifylist = [ + ('group', self._security_group.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.network.find_security_group.assert_called_once_with( + self._security_group.id, ignore_missing=False) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + +class TestShowSecurityGroupCompute(TestSecurityGroupCompute): + + # The security group rule to be shown with the group. + _security_group_rule = \ + compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule() + + # The security group to be shown. + _security_group = \ + compute_fakes.FakeSecurityGroup.create_one_security_group( + attrs={'rules': [_security_group_rule._info]} + ) + + columns = ( + 'description', + 'id', + 'name', + 'project_id', + 'rules', + ) + + data = ( + _security_group.description, + _security_group.id, + _security_group.name, + _security_group.tenant_id, + security_group._format_compute_security_group_rules( + [_security_group_rule._info]), + ) + + def setUp(self): + super(TestShowSecurityGroupCompute, self).setUp() + + self.app.client_manager.network_endpoint_enabled = False + + self.compute.security_groups.get.return_value = self._security_group + + # Get the command object to test + self.cmd = security_group.ShowSecurityGroup(self.app, None) + + def test_show_no_options(self): + self.assertRaises(tests_utils.ParserException, + self.check_parser, self.cmd, [], []) + + def test_show_all_options(self): + arglist = [ + self._security_group.id, + ] + verifylist = [ + ('group', self._security_group.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.compute.security_groups.get.assert_called_once_with( + self._security_group.id) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) diff --git a/openstackclient/tests/unit/network/v2/test_security_group_rule.py b/openstackclient/tests/unit/network/v2/test_security_group_rule.py new file mode 100644 index 00000000..51e18a65 --- /dev/null +++ b/openstackclient/tests/unit/network/v2/test_security_group_rule.py @@ -0,0 +1,1177 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import copy +import mock +from mock import call + +from osc_lib import exceptions + +from openstackclient.network import utils as network_utils +from openstackclient.network.v2 import security_group_rule +from openstackclient.tests.unit.compute.v2 import fakes as compute_fakes +from openstackclient.tests.unit import fakes +from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes +from openstackclient.tests.unit.network.v2 import fakes as network_fakes +from openstackclient.tests.unit import utils as tests_utils + + +class TestSecurityGroupRuleNetwork(network_fakes.TestNetworkV2): + + def setUp(self): + super(TestSecurityGroupRuleNetwork, self).setUp() + + # Get a shortcut to the network client + self.network = self.app.client_manager.network + # Get a shortcut to the ProjectManager Mock + self.projects_mock = self.app.client_manager.identity.projects + # Get a shortcut to the DomainManager Mock + self.domains_mock = self.app.client_manager.identity.domains + + +class TestSecurityGroupRuleCompute(compute_fakes.TestComputev2): + + def setUp(self): + super(TestSecurityGroupRuleCompute, self).setUp() + + # Get a shortcut to the network client + self.compute = self.app.client_manager.compute + + +class TestCreateSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork): + + project = identity_fakes.FakeProject.create_one_project() + domain = identity_fakes.FakeDomain.create_one_domain() + # The security group rule to be created. + _security_group_rule = None + + # The security group that will contain the rule created. + _security_group = \ + network_fakes.FakeSecurityGroup.create_one_security_group() + + expected_columns = ( + 'direction', + 'ethertype', + 'id', + 'port_range_max', + 'port_range_min', + 'project_id', + 'protocol', + 'remote_group_id', + 'remote_ip_prefix', + 'security_group_id', + ) + + expected_data = None + + def _setup_security_group_rule(self, attrs=None): + self._security_group_rule = \ + network_fakes.FakeSecurityGroupRule.create_one_security_group_rule( + attrs) + self.network.create_security_group_rule = mock.Mock( + return_value=self._security_group_rule) + self.expected_data = ( + self._security_group_rule.direction, + self._security_group_rule.ethertype, + self._security_group_rule.id, + self._security_group_rule.port_range_max, + self._security_group_rule.port_range_min, + self._security_group_rule.project_id, + self._security_group_rule.protocol, + self._security_group_rule.remote_group_id, + self._security_group_rule.remote_ip_prefix, + self._security_group_rule.security_group_id, + ) + + def setUp(self): + super(TestCreateSecurityGroupRuleNetwork, self).setUp() + + self.network.find_security_group = mock.Mock( + return_value=self._security_group) + + self.projects_mock.get.return_value = self.project + self.domains_mock.get.return_value = self.domain + + # Get the command object to test + self.cmd = security_group_rule.CreateSecurityGroupRule( + self.app, self.namespace) + + def test_create_no_options(self): + self.assertRaises(tests_utils.ParserException, + self.check_parser, self.cmd, [], []) + + def test_create_all_source_options(self): + arglist = [ + '--src-ip', '10.10.0.0/24', + '--src-group', self._security_group.id, + self._security_group.id, + ] + self.assertRaises(tests_utils.ParserException, + self.check_parser, self.cmd, arglist, []) + + def test_create_bad_ethertype(self): + arglist = [ + '--ethertype', 'foo', + self._security_group.id, + ] + self.assertRaises(tests_utils.ParserException, + self.check_parser, self.cmd, arglist, []) + + def test_create_all_protocol_options(self): + arglist = [ + '--protocol', 'tcp', + '--proto', 'tcp', + self._security_group.id, + ] + self.assertRaises(tests_utils.ParserException, + self.check_parser, self.cmd, arglist, []) + + def test_create_all_port_range_options(self): + arglist = [ + '--dst-port', '80:80', + '--icmp-type', '3', + '--icmp-code', '1', + self._security_group.id, + ] + verifylist = [ + ('dst_port', (80, 80)), + ('icmp_type', 3), + ('icmp_code', 1), + ('group', self._security_group.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + self.assertRaises(exceptions.CommandError, self.cmd.take_action, + parsed_args) + + def test_create_default_rule(self): + self._setup_security_group_rule({ + 'port_range_max': 443, + 'port_range_min': 443, + }) + arglist = [ + '--dst-port', str(self._security_group_rule.port_range_min), + self._security_group.id, + ] + verifylist = [ + ('dst_port', (self._security_group_rule.port_range_min, + self._security_group_rule.port_range_max)), + ('group', self._security_group.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.network.create_security_group_rule.assert_called_once_with(**{ + 'direction': self._security_group_rule.direction, + 'ethertype': self._security_group_rule.ethertype, + 'port_range_max': self._security_group_rule.port_range_max, + 'port_range_min': self._security_group_rule.port_range_min, + 'protocol': self._security_group_rule.protocol, + 'remote_ip_prefix': self._security_group_rule.remote_ip_prefix, + 'security_group_id': self._security_group.id, + }) + self.assertEqual(self.expected_columns, columns) + self.assertEqual(self.expected_data, data) + + def test_create_proto_option(self): + self._setup_security_group_rule({ + 'protocol': 'icmp', + 'remote_ip_prefix': '10.0.2.0/24', + }) + arglist = [ + '--proto', self._security_group_rule.protocol, + '--src-ip', self._security_group_rule.remote_ip_prefix, + self._security_group.id, + ] + verifylist = [ + ('proto', self._security_group_rule.protocol), + ('protocol', None), + ('src_ip', self._security_group_rule.remote_ip_prefix), + ('group', self._security_group.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.network.create_security_group_rule.assert_called_once_with(**{ + 'direction': self._security_group_rule.direction, + 'ethertype': self._security_group_rule.ethertype, + 'protocol': self._security_group_rule.protocol, + 'remote_ip_prefix': self._security_group_rule.remote_ip_prefix, + 'security_group_id': self._security_group.id, + }) + self.assertEqual(self.expected_columns, columns) + self.assertEqual(self.expected_data, data) + + def test_create_source_group(self): + self._setup_security_group_rule({ + 'port_range_max': 22, + 'port_range_min': 22, + 'remote_group_id': self._security_group.id, + }) + arglist = [ + '--dst-port', str(self._security_group_rule.port_range_min), + '--ingress', + '--src-group', self._security_group.name, + self._security_group.id, + ] + verifylist = [ + ('dst_port', (self._security_group_rule.port_range_min, + self._security_group_rule.port_range_max)), + ('ingress', True), + ('src_group', self._security_group.name), + ('group', self._security_group.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.network.create_security_group_rule.assert_called_once_with(**{ + 'direction': self._security_group_rule.direction, + 'ethertype': self._security_group_rule.ethertype, + 'port_range_max': self._security_group_rule.port_range_max, + 'port_range_min': self._security_group_rule.port_range_min, + 'protocol': self._security_group_rule.protocol, + 'remote_group_id': self._security_group_rule.remote_group_id, + 'security_group_id': self._security_group.id, + }) + self.assertEqual(self.expected_columns, columns) + self.assertEqual(self.expected_data, data) + + def test_create_source_ip(self): + self._setup_security_group_rule({ + 'protocol': 'icmp', + 'remote_ip_prefix': '10.0.2.0/24', + }) + arglist = [ + '--protocol', self._security_group_rule.protocol, + '--src-ip', self._security_group_rule.remote_ip_prefix, + self._security_group.id, + ] + verifylist = [ + ('protocol', self._security_group_rule.protocol), + ('src_ip', self._security_group_rule.remote_ip_prefix), + ('group', self._security_group.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.network.create_security_group_rule.assert_called_once_with(**{ + 'direction': self._security_group_rule.direction, + 'ethertype': self._security_group_rule.ethertype, + 'protocol': self._security_group_rule.protocol, + 'remote_ip_prefix': self._security_group_rule.remote_ip_prefix, + 'security_group_id': self._security_group.id, + }) + self.assertEqual(self.expected_columns, columns) + self.assertEqual(self.expected_data, data) + + def test_create_network_options(self): + self._setup_security_group_rule({ + 'direction': 'egress', + 'ethertype': 'IPv6', + 'port_range_max': 443, + 'port_range_min': 443, + 'protocol': '6', + 'remote_group_id': None, + 'remote_ip_prefix': None, + }) + arglist = [ + '--dst-port', str(self._security_group_rule.port_range_min), + '--egress', + '--ethertype', self._security_group_rule.ethertype, + '--project', self.project.name, + '--project-domain', self.domain.name, + '--protocol', self._security_group_rule.protocol, + self._security_group.id, + ] + verifylist = [ + ('dst_port', (self._security_group_rule.port_range_min, + self._security_group_rule.port_range_max)), + ('egress', True), + ('ethertype', self._security_group_rule.ethertype), + ('project', self.project.name), + ('project_domain', self.domain.name), + ('protocol', self._security_group_rule.protocol), + ('group', self._security_group.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.network.create_security_group_rule.assert_called_once_with(**{ + 'direction': self._security_group_rule.direction, + 'ethertype': self._security_group_rule.ethertype, + 'port_range_max': self._security_group_rule.port_range_max, + 'port_range_min': self._security_group_rule.port_range_min, + 'protocol': self._security_group_rule.protocol, + 'security_group_id': self._security_group.id, + 'tenant_id': self.project.id, + }) + self.assertEqual(self.expected_columns, columns) + self.assertEqual(self.expected_data, data) + + def test_create_tcp_with_icmp_type(self): + arglist = [ + '--protocol', 'tcp', + '--icmp-type', '15', + self._security_group.id, + ] + verifylist = [ + ('protocol', 'tcp'), + ('icmp_type', 15), + ('group', self._security_group.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + self.assertRaises(exceptions.CommandError, self.cmd.take_action, + parsed_args) + + def test_create_icmp_code(self): + arglist = [ + '--protocol', '1', + '--icmp-code', '1', + self._security_group.id, + ] + verifylist = [ + ('protocol', '1'), + ('icmp_code', 1), + ('group', self._security_group.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + self.assertRaises(exceptions.CommandError, self.cmd.take_action, + parsed_args) + + def test_create_icmp_type(self): + self._setup_security_group_rule({ + 'port_range_min': 15, + 'protocol': 'icmp', + 'remote_ip_prefix': '0.0.0.0/0', + }) + arglist = [ + '--icmp-type', str(self._security_group_rule.port_range_min), + '--protocol', self._security_group_rule.protocol, + self._security_group.id, + ] + verifylist = [ + ('dst_port', None), + ('icmp_type', self._security_group_rule.port_range_min), + ('icmp_code', None), + ('protocol', self._security_group_rule.protocol), + ('group', self._security_group.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.network.create_security_group_rule.assert_called_once_with(**{ + 'direction': self._security_group_rule.direction, + 'ethertype': self._security_group_rule.ethertype, + 'port_range_min': self._security_group_rule.port_range_min, + 'protocol': self._security_group_rule.protocol, + 'remote_ip_prefix': self._security_group_rule.remote_ip_prefix, + 'security_group_id': self._security_group.id, + }) + self.assertEqual(self.expected_columns, columns) + self.assertEqual(self.expected_data, data) + + def test_create_ipv6_icmp_type_code(self): + self._setup_security_group_rule({ + 'ethertype': 'IPv6', + 'port_range_min': 139, + 'port_range_max': 2, + 'protocol': 'ipv6-icmp', + }) + arglist = [ + '--icmp-type', str(self._security_group_rule.port_range_min), + '--icmp-code', str(self._security_group_rule.port_range_max), + '--protocol', self._security_group_rule.protocol, + self._security_group.id, + ] + verifylist = [ + ('dst_port', None), + ('icmp_type', self._security_group_rule.port_range_min), + ('icmp_code', self._security_group_rule.port_range_max), + ('protocol', self._security_group_rule.protocol), + ('group', self._security_group.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.network.create_security_group_rule.assert_called_once_with(**{ + 'direction': self._security_group_rule.direction, + 'ethertype': self._security_group_rule.ethertype, + 'port_range_min': self._security_group_rule.port_range_min, + 'port_range_max': self._security_group_rule.port_range_max, + 'protocol': self._security_group_rule.protocol, + 'security_group_id': self._security_group.id, + }) + self.assertEqual(self.expected_columns, columns) + self.assertEqual(self.expected_data, data) + + def test_create_icmpv6_type(self): + self._setup_security_group_rule({ + 'ethertype': 'IPv6', + 'port_range_min': 139, + 'protocol': 'icmpv6', + }) + arglist = [ + '--icmp-type', str(self._security_group_rule.port_range_min), + '--protocol', self._security_group_rule.protocol, + self._security_group.id, + ] + verifylist = [ + ('dst_port', None), + ('icmp_type', self._security_group_rule.port_range_min), + ('icmp_code', None), + ('protocol', self._security_group_rule.protocol), + ('group', self._security_group.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.network.create_security_group_rule.assert_called_once_with(**{ + 'direction': self._security_group_rule.direction, + 'ethertype': self._security_group_rule.ethertype, + 'port_range_min': self._security_group_rule.port_range_min, + 'protocol': self._security_group_rule.protocol, + 'security_group_id': self._security_group.id, + }) + self.assertEqual(self.expected_columns, columns) + self.assertEqual(self.expected_data, data) + + +class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): + + project = identity_fakes.FakeProject.create_one_project() + domain = identity_fakes.FakeDomain.create_one_domain() + # The security group rule to be created. + _security_group_rule = None + + # The security group that will contain the rule created. + _security_group = \ + compute_fakes.FakeSecurityGroup.create_one_security_group() + + def _setup_security_group_rule(self, attrs=None): + self._security_group_rule = \ + compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule( + attrs) + self.compute.security_group_rules.create.return_value = \ + self._security_group_rule + expected_columns, expected_data = \ + security_group_rule._format_security_group_rule_show( + self._security_group_rule._info) + return expected_columns, expected_data + + def setUp(self): + super(TestCreateSecurityGroupRuleCompute, self).setUp() + + self.app.client_manager.network_endpoint_enabled = False + + self.compute.security_groups.get.return_value = self._security_group + + # Get the command object to test + self.cmd = security_group_rule.CreateSecurityGroupRule(self.app, None) + + def test_create_no_options(self): + self.assertRaises(tests_utils.ParserException, + self.check_parser, self.cmd, [], []) + + def test_create_all_source_options(self): + arglist = [ + '--src-ip', '10.10.0.0/24', + '--src-group', self._security_group.id, + self._security_group.id, + ] + self.assertRaises(tests_utils.ParserException, + self.check_parser, self.cmd, arglist, []) + + def test_create_bad_protocol(self): + arglist = [ + '--protocol', 'foo', + self._security_group.id, + ] + self.assertRaises(tests_utils.ParserException, + self.check_parser, self.cmd, arglist, []) + + def test_create_all_protocol_options(self): + arglist = [ + '--protocol', 'tcp', + '--proto', 'tcp', + self._security_group.id, + ] + self.assertRaises(tests_utils.ParserException, + self.check_parser, self.cmd, arglist, []) + + def test_create_network_options(self): + arglist = [ + '--ingress', + '--ethertype', 'IPv4', + '--icmp-type', '3', + '--icmp-code', '11', + '--project', self.project.name, + '--project-domain', self.domain.name, + self._security_group.id, + ] + self.assertRaises(tests_utils.ParserException, + self.check_parser, self.cmd, arglist, []) + + def test_create_default_rule(self): + expected_columns, expected_data = self._setup_security_group_rule() + dst_port = str(self._security_group_rule.from_port) + ':' + \ + str(self._security_group_rule.to_port) + arglist = [ + '--dst-port', dst_port, + self._security_group.id, + ] + verifylist = [ + ('dst_port', (self._security_group_rule.from_port, + self._security_group_rule.to_port)), + ('group', self._security_group.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.compute.security_group_rules.create.assert_called_once_with( + self._security_group.id, + self._security_group_rule.ip_protocol, + self._security_group_rule.from_port, + self._security_group_rule.to_port, + self._security_group_rule.ip_range['cidr'], + None, + ) + self.assertEqual(expected_columns, columns) + self.assertEqual(expected_data, data) + + def test_create_source_group(self): + expected_columns, expected_data = self._setup_security_group_rule({ + 'from_port': 22, + 'to_port': 22, + 'group': {'name': self._security_group.name}, + }) + arglist = [ + '--dst-port', str(self._security_group_rule.from_port), + '--src-group', self._security_group.name, + self._security_group.id, + ] + verifylist = [ + ('dst_port', (self._security_group_rule.from_port, + self._security_group_rule.to_port)), + ('src_group', self._security_group.name), + ('group', self._security_group.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.compute.security_group_rules.create.assert_called_once_with( + self._security_group.id, + self._security_group_rule.ip_protocol, + self._security_group_rule.from_port, + self._security_group_rule.to_port, + self._security_group_rule.ip_range['cidr'], + self._security_group.id, + ) + self.assertEqual(expected_columns, columns) + self.assertEqual(expected_data, data) + + def test_create_source_ip(self): + expected_columns, expected_data = self._setup_security_group_rule({ + 'ip_protocol': 'icmp', + 'from_port': -1, + 'to_port': -1, + 'ip_range': {'cidr': '10.0.2.0/24'}, + }) + arglist = [ + '--protocol', self._security_group_rule.ip_protocol, + '--src-ip', self._security_group_rule.ip_range['cidr'], + self._security_group.id, + ] + verifylist = [ + ('protocol', self._security_group_rule.ip_protocol), + ('src_ip', self._security_group_rule.ip_range['cidr']), + ('group', self._security_group.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.compute.security_group_rules.create.assert_called_once_with( + self._security_group.id, + self._security_group_rule.ip_protocol, + self._security_group_rule.from_port, + self._security_group_rule.to_port, + self._security_group_rule.ip_range['cidr'], + None, + ) + self.assertEqual(expected_columns, columns) + self.assertEqual(expected_data, data) + + def test_create_proto_option(self): + expected_columns, expected_data = self._setup_security_group_rule({ + 'ip_protocol': 'icmp', + 'from_port': -1, + 'to_port': -1, + 'ip_range': {'cidr': '10.0.2.0/24'}, + }) + arglist = [ + '--proto', self._security_group_rule.ip_protocol, + '--src-ip', self._security_group_rule.ip_range['cidr'], + self._security_group.id, + ] + verifylist = [ + ('proto', self._security_group_rule.ip_protocol), + ('protocol', None), + ('src_ip', self._security_group_rule.ip_range['cidr']), + ('group', self._security_group.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.compute.security_group_rules.create.assert_called_once_with( + self._security_group.id, + self._security_group_rule.ip_protocol, + self._security_group_rule.from_port, + self._security_group_rule.to_port, + self._security_group_rule.ip_range['cidr'], + None, + ) + self.assertEqual(expected_columns, columns) + self.assertEqual(expected_data, data) + + +class TestDeleteSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork): + + # The security group rules to be deleted. + _security_group_rules = \ + network_fakes.FakeSecurityGroupRule.create_security_group_rules( + count=2) + + def setUp(self): + super(TestDeleteSecurityGroupRuleNetwork, self).setUp() + + self.network.delete_security_group_rule = mock.Mock(return_value=None) + + self.network.find_security_group_rule = ( + network_fakes.FakeSecurityGroupRule.get_security_group_rules( + self._security_group_rules) + ) + + # Get the command object to test + self.cmd = security_group_rule.DeleteSecurityGroupRule( + self.app, self.namespace) + + def test_security_group_rule_delete(self): + arglist = [ + self._security_group_rules[0].id, + ] + verifylist = [ + ('rule', [self._security_group_rules[0].id]), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + self.network.delete_security_group_rule.assert_called_once_with( + self._security_group_rules[0]) + self.assertIsNone(result) + + def test_multi_security_group_rules_delete(self): + arglist = [] + verifylist = [] + + for s in self._security_group_rules: + arglist.append(s.id) + verifylist = [ + ('rule', arglist), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + calls = [] + for s in self._security_group_rules: + calls.append(call(s)) + self.network.delete_security_group_rule.assert_has_calls(calls) + self.assertIsNone(result) + + def test_multi_security_group_rules_delete_with_exception(self): + arglist = [ + self._security_group_rules[0].id, + 'unexist_rule', + ] + verifylist = [ + ('rule', + [self._security_group_rules[0].id, 'unexist_rule']), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + find_mock_result = [ + self._security_group_rules[0], exceptions.CommandError] + self.network.find_security_group_rule = ( + mock.MagicMock(side_effect=find_mock_result) + ) + + try: + self.cmd.take_action(parsed_args) + self.fail('CommandError should be raised.') + except exceptions.CommandError as e: + self.assertEqual('1 of 2 rules failed to delete.', str(e)) + + self.network.find_security_group_rule.assert_any_call( + self._security_group_rules[0].id, ignore_missing=False) + self.network.find_security_group_rule.assert_any_call( + 'unexist_rule', ignore_missing=False) + self.network.delete_security_group_rule.assert_called_once_with( + self._security_group_rules[0] + ) + + +class TestDeleteSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): + + # The security group rule to be deleted. + _security_group_rules = \ + compute_fakes.FakeSecurityGroupRule.create_security_group_rules( + count=2) + + def setUp(self): + super(TestDeleteSecurityGroupRuleCompute, self).setUp() + + self.app.client_manager.network_endpoint_enabled = False + + # Get the command object to test + self.cmd = security_group_rule.DeleteSecurityGroupRule(self.app, None) + + def test_security_group_rule_delete(self): + arglist = [ + self._security_group_rules[0].id, + ] + verifylist = [ + ('rule', [self._security_group_rules[0].id]), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + self.compute.security_group_rules.delete.assert_called_once_with( + self._security_group_rules[0].id) + self.assertIsNone(result) + + def test_multi_security_group_rules_delete(self): + arglist = [] + verifylist = [] + + for s in self._security_group_rules: + arglist.append(s.id) + verifylist = [ + ('rule', arglist), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + calls = [] + for s in self._security_group_rules: + calls.append(call(s.id)) + self.compute.security_group_rules.delete.assert_has_calls(calls) + self.assertIsNone(result) + + def test_multi_security_group_rules_delete_with_exception(self): + arglist = [ + self._security_group_rules[0].id, + 'unexist_rule', + ] + verifylist = [ + ('rule', + [self._security_group_rules[0].id, 'unexist_rule']), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + find_mock_result = [None, exceptions.CommandError] + self.compute.security_group_rules.delete = ( + mock.MagicMock(side_effect=find_mock_result) + ) + + try: + self.cmd.take_action(parsed_args) + self.fail('CommandError should be raised.') + except exceptions.CommandError as e: + self.assertEqual('1 of 2 rules failed to delete.', str(e)) + + self.compute.security_group_rules.delete.assert_any_call( + self._security_group_rules[0].id) + self.compute.security_group_rules.delete.assert_any_call( + 'unexist_rule') + + +class TestListSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork): + + # The security group to hold the rules. + _security_group = \ + network_fakes.FakeSecurityGroup.create_one_security_group() + + # The security group rule to be listed. + _security_group_rule_tcp = \ + network_fakes.FakeSecurityGroupRule.create_one_security_group_rule({ + 'protocol': 'tcp', + 'port_range_max': 80, + 'port_range_min': 80, + 'security_group_id': _security_group.id, + }) + _security_group_rule_icmp = \ + network_fakes.FakeSecurityGroupRule.create_one_security_group_rule({ + 'protocol': 'icmp', + 'remote_ip_prefix': '10.0.2.0/24', + 'security_group_id': _security_group.id, + }) + _security_group.security_group_rules = [_security_group_rule_tcp._info, + _security_group_rule_icmp._info] + _security_group_rules = [_security_group_rule_tcp, + _security_group_rule_icmp] + + expected_columns_with_group_and_long = ( + 'ID', + 'IP Protocol', + 'IP Range', + 'Port Range', + 'Direction', + 'Ethertype', + 'Remote Security Group', + ) + expected_columns_no_group = ( + 'ID', + 'IP Protocol', + 'IP Range', + 'Port Range', + 'Remote Security Group', + 'Security Group', + ) + + expected_data_with_group_and_long = [] + expected_data_no_group = [] + for _security_group_rule in _security_group_rules: + expected_data_with_group_and_long.append(( + _security_group_rule.id, + _security_group_rule.protocol, + _security_group_rule.remote_ip_prefix, + security_group_rule._format_network_port_range( + _security_group_rule), + _security_group_rule.direction, + _security_group_rule.ethertype, + _security_group_rule.remote_group_id, + )) + expected_data_no_group.append(( + _security_group_rule.id, + _security_group_rule.protocol, + _security_group_rule.remote_ip_prefix, + security_group_rule._format_network_port_range( + _security_group_rule), + _security_group_rule.remote_group_id, + _security_group_rule.security_group_id, + )) + + def setUp(self): + super(TestListSecurityGroupRuleNetwork, self).setUp() + + self.network.find_security_group = mock.Mock( + return_value=self._security_group) + self.network.security_group_rules = mock.Mock( + return_value=self._security_group_rules) + + # Get the command object to test + self.cmd = security_group_rule.ListSecurityGroupRule( + self.app, self.namespace) + + def test_list_default(self): + self._security_group_rule_tcp.port_range_min = 80 + parsed_args = self.check_parser(self.cmd, [], []) + + columns, data = self.cmd.take_action(parsed_args) + + self.network.security_group_rules.assert_called_once_with(**{}) + self.assertEqual(self.expected_columns_no_group, columns) + self.assertEqual(self.expected_data_no_group, list(data)) + + def test_list_with_group_and_long(self): + self._security_group_rule_tcp.port_range_min = 80 + arglist = [ + '--long', + self._security_group.id, + ] + verifylist = [ + ('long', True), + ('group', self._security_group.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.network.security_group_rules.assert_called_once_with(**{ + 'security_group_id': self._security_group.id, + }) + self.assertEqual(self.expected_columns_with_group_and_long, columns) + self.assertEqual(self.expected_data_with_group_and_long, list(data)) + + def test_list_with_ignored_options(self): + self._security_group_rule_tcp.port_range_min = 80 + arglist = [ + '--all-projects', + ] + verifylist = [ + ('all_projects', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.network.security_group_rules.assert_called_once_with(**{}) + self.assertEqual(self.expected_columns_no_group, columns) + self.assertEqual(self.expected_data_no_group, list(data)) + + +class TestListSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): + + # The security group to hold the rules. + _security_group = \ + compute_fakes.FakeSecurityGroup.create_one_security_group() + + # The security group rule to be listed. + _security_group_rule_tcp = \ + compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule({ + 'ip_protocol': 'tcp', + 'from_port': 80, + 'to_port': 80, + 'group': {'name': _security_group.name}, + }) + _security_group_rule_icmp = \ + compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule({ + 'ip_protocol': 'icmp', + 'from_port': -1, + 'to_port': -1, + 'ip_range': {'cidr': '10.0.2.0/24'}, + 'group': {'name': _security_group.name}, + }) + _security_group.rules = [_security_group_rule_tcp._info, + _security_group_rule_icmp._info] + + expected_columns_with_group = ( + 'ID', + 'IP Protocol', + 'IP Range', + 'Port Range', + 'Remote Security Group', + ) + expected_columns_no_group = \ + expected_columns_with_group + ('Security Group',) + + expected_data_with_group = [] + expected_data_no_group = [] + for _security_group_rule in _security_group.rules: + rule = network_utils.transform_compute_security_group_rule( + _security_group_rule + ) + expected_rule_with_group = ( + rule['id'], + rule['ip_protocol'], + rule['ip_range'], + rule['port_range'], + rule['remote_security_group'], + ) + expected_rule_no_group = expected_rule_with_group + \ + (_security_group_rule['parent_group_id'],) + expected_data_with_group.append(expected_rule_with_group) + expected_data_no_group.append(expected_rule_no_group) + + def setUp(self): + super(TestListSecurityGroupRuleCompute, self).setUp() + + self.app.client_manager.network_endpoint_enabled = False + + self.compute.security_groups.get.return_value = \ + self._security_group + self.compute.security_groups.list.return_value = \ + [self._security_group] + + # Get the command object to test + self.cmd = security_group_rule.ListSecurityGroupRule(self.app, None) + + def test_list_default(self): + parsed_args = self.check_parser(self.cmd, [], []) + + columns, data = self.cmd.take_action(parsed_args) + self.compute.security_groups.list.assert_called_once_with( + search_opts={'all_tenants': False} + ) + self.assertEqual(self.expected_columns_no_group, columns) + self.assertEqual(self.expected_data_no_group, list(data)) + + def test_list_with_group(self): + arglist = [ + self._security_group.id, + ] + verifylist = [ + ('group', self._security_group.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + self.compute.security_groups.get.assert_called_once_with( + self._security_group.id + ) + self.assertEqual(self.expected_columns_with_group, columns) + self.assertEqual(self.expected_data_with_group, list(data)) + + def test_list_all_projects(self): + arglist = [ + '--all-projects', + ] + verifylist = [ + ('all_projects', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + self.compute.security_groups.list.assert_called_once_with( + search_opts={'all_tenants': True} + ) + self.assertEqual(self.expected_columns_no_group, columns) + self.assertEqual(self.expected_data_no_group, list(data)) + + def test_list_with_ignored_options(self): + arglist = [ + '--long', + ] + verifylist = [ + ('long', False), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + self.compute.security_groups.list.assert_called_once_with( + search_opts={'all_tenants': False} + ) + self.assertEqual(self.expected_columns_no_group, columns) + self.assertEqual(self.expected_data_no_group, list(data)) + + +class TestShowSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork): + + # The security group rule to be shown. + _security_group_rule = \ + network_fakes.FakeSecurityGroupRule.create_one_security_group_rule() + + columns = ( + 'direction', + 'ethertype', + 'id', + 'port_range_max', + 'port_range_min', + 'project_id', + 'protocol', + 'remote_group_id', + 'remote_ip_prefix', + 'security_group_id', + ) + + data = ( + _security_group_rule.direction, + _security_group_rule.ethertype, + _security_group_rule.id, + _security_group_rule.port_range_max, + _security_group_rule.port_range_min, + _security_group_rule.project_id, + _security_group_rule.protocol, + _security_group_rule.remote_group_id, + _security_group_rule.remote_ip_prefix, + _security_group_rule.security_group_id, + ) + + def setUp(self): + super(TestShowSecurityGroupRuleNetwork, self).setUp() + + self.network.find_security_group_rule = mock.Mock( + return_value=self._security_group_rule) + + # Get the command object to test + self.cmd = security_group_rule.ShowSecurityGroupRule( + self.app, self.namespace) + + def test_show_no_options(self): + self.assertRaises(tests_utils.ParserException, + self.check_parser, self.cmd, [], []) + + def test_show_all_options(self): + arglist = [ + self._security_group_rule.id, + ] + verifylist = [ + ('rule', self._security_group_rule.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.network.find_security_group_rule.assert_called_once_with( + self._security_group_rule.id, ignore_missing=False) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + +class TestShowSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): + + # The security group rule to be shown. + _security_group_rule = \ + compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule() + + columns, data = \ + security_group_rule._format_security_group_rule_show( + _security_group_rule._info) + + def setUp(self): + super(TestShowSecurityGroupRuleCompute, self).setUp() + + self.app.client_manager.network_endpoint_enabled = False + + # Build a security group fake customized for this test. + security_group_rules = [self._security_group_rule._info] + security_group = fakes.FakeResource( + info=copy.deepcopy({'rules': security_group_rules}), + loaded=True) + security_group.rules = security_group_rules + self.compute.security_groups.list.return_value = [security_group] + + # Get the command object to test + self.cmd = security_group_rule.ShowSecurityGroupRule(self.app, None) + + def test_show_no_options(self): + self.assertRaises(tests_utils.ParserException, + self.check_parser, self.cmd, [], []) + + def test_show_all_options(self): + arglist = [ + self._security_group_rule.id, + ] + verifylist = [ + ('rule', self._security_group_rule.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.compute.security_groups.list.assert_called_once_with() + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) diff --git a/openstackclient/tests/unit/network/v2/test_subnet.py b/openstackclient/tests/unit/network/v2/test_subnet.py new file mode 100644 index 00000000..e31db469 --- /dev/null +++ b/openstackclient/tests/unit/network/v2/test_subnet.py @@ -0,0 +1,978 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import mock +from mock import call + +from osc_lib import exceptions +from osc_lib import utils + +from openstackclient.network.v2 import subnet as subnet_v2 +from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes_v3 +from openstackclient.tests.unit.network.v2 import fakes as network_fakes +from openstackclient.tests.unit import utils as tests_utils + + +class TestSubnet(network_fakes.TestNetworkV2): + + def setUp(self): + super(TestSubnet, self).setUp() + + # Get a shortcut to the network client + self.network = self.app.client_manager.network + # Get a shortcut to the ProjectManager Mock + self.projects_mock = self.app.client_manager.identity.projects + # Get a shortcut to the DomainManager Mock + self.domains_mock = self.app.client_manager.identity.domains + + +class TestCreateSubnet(TestSubnet): + + project = identity_fakes_v3.FakeProject.create_one_project() + domain = identity_fakes_v3.FakeDomain.create_one_domain() + # An IPv4 subnet to be created with mostly default values + _subnet = network_fakes.FakeSubnet.create_one_subnet( + attrs={ + 'tenant_id': project.id, + } + ) + + # Subnet pool to be used to create a subnet from a pool + _subnet_pool = network_fakes.FakeSubnetPool.create_one_subnet_pool() + + # An IPv4 subnet to be created using a specific subnet pool + _subnet_from_pool = network_fakes.FakeSubnet.create_one_subnet( + attrs={ + 'tenant_id': project.id, + 'subnetpool_id': _subnet_pool.id, + 'dns_nameservers': ['8.8.8.8', + '8.8.4.4'], + 'host_routes': [{'destination': '10.20.20.0/24', + 'nexthop': '10.20.20.1'}, + {'destination': '10.30.30.0/24', + 'nexthop': '10.30.30.1'}], + 'service_types': ['network:router_gateway', + 'network:floatingip_agent_gateway'], + } + ) + + # An IPv6 subnet to be created with most options specified + _subnet_ipv6 = network_fakes.FakeSubnet.create_one_subnet( + attrs={ + 'tenant_id': project.id, + 'cidr': 'fe80:0:0:a00a::/64', + 'enable_dhcp': True, + 'dns_nameservers': ['fe80:27ff:a00a:f00f::ffff', + 'fe80:37ff:a00a:f00f::ffff'], + 'allocation_pools': [{'start': 'fe80::a00a:0:c0de:0:100', + 'end': 'fe80::a00a:0:c0de:0:f000'}, + {'start': 'fe80::a00a:0:c0de:1:100', + 'end': 'fe80::a00a:0:c0de:1:f000'}], + 'host_routes': [{'destination': 'fe80:27ff:a00a:f00f::/64', + 'nexthop': 'fe80:27ff:a00a:f00f::1'}, + {'destination': 'fe80:37ff:a00a:f00f::/64', + 'nexthop': 'fe80:37ff:a00a:f00f::1'}], + 'ip_version': 6, + 'gateway_ip': 'fe80::a00a:0:c0de:0:1', + 'ipv6_address_mode': 'slaac', + 'ipv6_ra_mode': 'slaac', + 'subnetpool_id': 'None', + 'service_types': ['network:router_gateway', + 'network:floatingip_agent_gateway'], + } + ) + + # The network to be returned from find_network + _network = network_fakes.FakeNetwork.create_one_network( + attrs={ + 'id': _subnet.network_id, + } + ) + + # The network segment to be returned from find_segment + _network_segment = \ + network_fakes.FakeNetworkSegment.create_one_network_segment( + attrs={ + 'network_id': _subnet.network_id, + } + ) + + columns = ( + 'allocation_pools', + 'cidr', + 'dns_nameservers', + 'enable_dhcp', + 'gateway_ip', + 'host_routes', + 'id', + 'ip_version', + 'ipv6_address_mode', + 'ipv6_ra_mode', + 'name', + 'network_id', + 'project_id', + 'segment_id', + 'service_types', + 'subnetpool_id', + ) + + data = ( + subnet_v2._format_allocation_pools(_subnet.allocation_pools), + _subnet.cidr, + utils.format_list(_subnet.dns_nameservers), + _subnet.enable_dhcp, + _subnet.gateway_ip, + subnet_v2._format_host_routes(_subnet.host_routes), + _subnet.id, + _subnet.ip_version, + _subnet.ipv6_address_mode, + _subnet.ipv6_ra_mode, + _subnet.name, + _subnet.network_id, + _subnet.project_id, + _subnet.segment_id, + utils.format_list(_subnet.service_types), + _subnet.subnetpool_id, + ) + + data_subnet_pool = ( + subnet_v2._format_allocation_pools(_subnet_from_pool.allocation_pools), + _subnet_from_pool.cidr, + utils.format_list(_subnet_from_pool.dns_nameservers), + _subnet_from_pool.enable_dhcp, + _subnet_from_pool.gateway_ip, + subnet_v2._format_host_routes(_subnet_from_pool.host_routes), + _subnet_from_pool.id, + _subnet_from_pool.ip_version, + _subnet_from_pool.ipv6_address_mode, + _subnet_from_pool.ipv6_ra_mode, + _subnet_from_pool.name, + _subnet_from_pool.network_id, + _subnet_from_pool.project_id, + _subnet_from_pool.segment_id, + utils.format_list(_subnet_from_pool.service_types), + _subnet_from_pool.subnetpool_id, + ) + + data_ipv6 = ( + subnet_v2._format_allocation_pools(_subnet_ipv6.allocation_pools), + _subnet_ipv6.cidr, + utils.format_list(_subnet_ipv6.dns_nameservers), + _subnet_ipv6.enable_dhcp, + _subnet_ipv6.gateway_ip, + subnet_v2._format_host_routes(_subnet_ipv6.host_routes), + _subnet_ipv6.id, + _subnet_ipv6.ip_version, + _subnet_ipv6.ipv6_address_mode, + _subnet_ipv6.ipv6_ra_mode, + _subnet_ipv6.name, + _subnet_ipv6.network_id, + _subnet_ipv6.project_id, + _subnet_ipv6.segment_id, + utils.format_list(_subnet_ipv6.service_types), + _subnet_ipv6.subnetpool_id, + ) + + def setUp(self): + super(TestCreateSubnet, self).setUp() + + # Get the command object to test + self.cmd = subnet_v2.CreateSubnet(self.app, self.namespace) + + self.projects_mock.get.return_value = self.project + self.domains_mock.get.return_value = self.domain + + # Mock SDK calls for all tests. + self.network.find_network = mock.Mock(return_value=self._network) + self.network.find_segment = mock.Mock( + return_value=self._network_segment + ) + self.network.find_subnet_pool = mock.Mock( + return_value=self._subnet_pool + ) + + def test_create_no_options(self): + arglist = [] + verifylist = [] + + # Testing that a call without the required argument will fail and + # throw a "ParserExecption" + self.assertRaises(tests_utils.ParserException, + self.check_parser, self.cmd, arglist, verifylist) + + def test_create_default_options(self): + # Mock SDK calls for this test. + self.network.create_subnet = mock.Mock(return_value=self._subnet) + self._network.id = self._subnet.network_id + + arglist = [ + "--subnet-range", self._subnet.cidr, + "--network", self._subnet.network_id, + self._subnet.name, + ] + verifylist = [ + ('name', self._subnet.name), + ('subnet_range', self._subnet.cidr), + ('network', self._subnet.network_id), + ('ip_version', self._subnet.ip_version), + ('gateway', 'auto'), + + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns, data = self.cmd.take_action(parsed_args) + + self.network.create_subnet.assert_called_once_with(**{ + 'cidr': self._subnet.cidr, + 'ip_version': self._subnet.ip_version, + 'name': self._subnet.name, + 'network_id': self._subnet.network_id, + }) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + def test_create_from_subnet_pool_options(self): + # Mock SDK calls for this test. + self.network.create_subnet = \ + mock.Mock(return_value=self._subnet_from_pool) + self._network.id = self._subnet_from_pool.network_id + + arglist = [ + self._subnet_from_pool.name, + "--subnet-pool", self._subnet_from_pool.subnetpool_id, + "--prefix-length", '24', + "--network", self._subnet_from_pool.network_id, + "--ip-version", str(self._subnet_from_pool.ip_version), + "--gateway", self._subnet_from_pool.gateway_ip, + "--dhcp", + ] + + for dns_addr in self._subnet_from_pool.dns_nameservers: + arglist.append('--dns-nameserver') + arglist.append(dns_addr) + + for host_route in self._subnet_from_pool.host_routes: + arglist.append('--host-route') + value = 'gateway=' + host_route.get('nexthop', '') + \ + ',destination=' + host_route.get('destination', '') + arglist.append(value) + + for service_type in self._subnet_from_pool.service_types: + arglist.append('--service-type') + arglist.append(service_type) + + verifylist = [ + ('name', self._subnet_from_pool.name), + ('prefix_length', '24'), + ('network', self._subnet_from_pool.network_id), + ('ip_version', self._subnet_from_pool.ip_version), + ('gateway', self._subnet_from_pool.gateway_ip), + ('dns_nameservers', self._subnet_from_pool.dns_nameservers), + ('dhcp', self._subnet_from_pool.enable_dhcp), + ('host_routes', subnet_v2.convert_entries_to_gateway( + self._subnet_from_pool.host_routes)), + ('subnet_pool', self._subnet_from_pool.subnetpool_id), + ('service_types', self._subnet_from_pool.service_types), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns, data = self.cmd.take_action(parsed_args) + + self.network.create_subnet.assert_called_once_with(**{ + 'dns_nameservers': self._subnet_from_pool.dns_nameservers, + 'enable_dhcp': self._subnet_from_pool.enable_dhcp, + 'gateway_ip': self._subnet_from_pool.gateway_ip, + 'host_routes': self._subnet_from_pool.host_routes, + 'ip_version': self._subnet_from_pool.ip_version, + 'name': self._subnet_from_pool.name, + 'network_id': self._subnet_from_pool.network_id, + 'prefixlen': '24', + 'subnetpool_id': self._subnet_from_pool.subnetpool_id, + 'service_types': self._subnet_from_pool.service_types, + }) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data_subnet_pool, data) + + def test_create_options_subnet_range_ipv6(self): + # Mock SDK calls for this test. + self.network.create_subnet = mock.Mock(return_value=self._subnet_ipv6) + self._network.id = self._subnet_ipv6.network_id + + arglist = [ + self._subnet_ipv6.name, + "--subnet-range", self._subnet_ipv6.cidr, + "--network", self._subnet_ipv6.network_id, + "--ip-version", str(self._subnet_ipv6.ip_version), + "--ipv6-ra-mode", self._subnet_ipv6.ipv6_ra_mode, + "--ipv6-address-mode", self._subnet_ipv6.ipv6_address_mode, + "--gateway", self._subnet_ipv6.gateway_ip, + "--dhcp", + ] + + for dns_addr in self._subnet_ipv6.dns_nameservers: + arglist.append('--dns-nameserver') + arglist.append(dns_addr) + + for host_route in self._subnet_ipv6.host_routes: + arglist.append('--host-route') + value = 'gateway=' + host_route.get('nexthop', '') + \ + ',destination=' + host_route.get('destination', '') + arglist.append(value) + + for pool in self._subnet_ipv6.allocation_pools: + arglist.append('--allocation-pool') + value = 'start=' + pool.get('start', '') + \ + ',end=' + pool.get('end', '') + arglist.append(value) + + for service_type in self._subnet_ipv6.service_types: + arglist.append('--service-type') + arglist.append(service_type) + + verifylist = [ + ('name', self._subnet_ipv6.name), + ('subnet_range', self._subnet_ipv6.cidr), + ('network', self._subnet_ipv6.network_id), + ('ip_version', self._subnet_ipv6.ip_version), + ('ipv6_ra_mode', self._subnet_ipv6.ipv6_ra_mode), + ('ipv6_address_mode', self._subnet_ipv6.ipv6_address_mode), + ('gateway', self._subnet_ipv6.gateway_ip), + ('dns_nameservers', self._subnet_ipv6.dns_nameservers), + ('dhcp', self._subnet_ipv6.enable_dhcp), + ('host_routes', subnet_v2.convert_entries_to_gateway( + self._subnet_ipv6.host_routes)), + ('allocation_pools', self._subnet_ipv6.allocation_pools), + ('service_types', self._subnet_ipv6.service_types), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns, data = self.cmd.take_action(parsed_args) + + self.network.create_subnet.assert_called_once_with(**{ + 'cidr': self._subnet_ipv6.cidr, + 'dns_nameservers': self._subnet_ipv6.dns_nameservers, + 'enable_dhcp': self._subnet_ipv6.enable_dhcp, + 'gateway_ip': self._subnet_ipv6.gateway_ip, + 'host_routes': self._subnet_ipv6.host_routes, + 'ip_version': self._subnet_ipv6.ip_version, + 'ipv6_address_mode': self._subnet_ipv6.ipv6_address_mode, + 'ipv6_ra_mode': self._subnet_ipv6.ipv6_ra_mode, + 'name': self._subnet_ipv6.name, + 'network_id': self._subnet_ipv6.network_id, + 'allocation_pools': self._subnet_ipv6.allocation_pools, + 'service_types': self._subnet_ipv6.service_types, + }) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data_ipv6, data) + + def test_create_no_beta_command_options(self): + arglist = [ + "--subnet-range", self._subnet.cidr, + "--network-segment", self._network_segment.id, + "--network", self._subnet.network_id, + self._subnet.name, + ] + verifylist = [ + ('name', self._subnet.name), + ('subnet_range', self._subnet.cidr), + ('network-segment', self._network_segment.id), + ('network', self._subnet.network_id), + ] + self.app.options.os_beta_command = False + self.assertRaises(tests_utils.ParserException, + self.check_parser, self.cmd, arglist, verifylist) + + def test_create_with_network_segment(self): + # Mock SDK calls for this test. + self.network.create_subnet = mock.Mock(return_value=self._subnet) + self._network.id = self._subnet.network_id + + arglist = [ + "--subnet-range", self._subnet.cidr, + "--network-segment", self._network_segment.id, + "--network", self._subnet.network_id, + self._subnet.name, + ] + verifylist = [ + ('name', self._subnet.name), + ('subnet_range', self._subnet.cidr), + ('network_segment', self._network_segment.id), + ('network', self._subnet.network_id), + ('ip_version', self._subnet.ip_version), + ('gateway', 'auto'), + + ] + + self.app.options.os_beta_command = True + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns, data = self.cmd.take_action(parsed_args) + + self.network.create_subnet.assert_called_once_with(**{ + 'cidr': self._subnet.cidr, + 'ip_version': self._subnet.ip_version, + 'name': self._subnet.name, + 'network_id': self._subnet.network_id, + 'segment_id': self._network_segment.id, + }) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + +class TestDeleteSubnet(TestSubnet): + + # The subnets to delete. + _subnets = network_fakes.FakeSubnet.create_subnets(count=2) + + def setUp(self): + super(TestDeleteSubnet, self).setUp() + + self.network.delete_subnet = mock.Mock(return_value=None) + + self.network.find_subnet = ( + network_fakes.FakeSubnet.get_subnets(self._subnets)) + + # Get the command object to test + self.cmd = subnet_v2.DeleteSubnet(self.app, self.namespace) + + def test_subnet_delete(self): + arglist = [ + self._subnets[0].name, + ] + verifylist = [ + ('subnet', [self._subnets[0].name]), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + self.network.delete_subnet.assert_called_once_with(self._subnets[0]) + self.assertIsNone(result) + + def test_multi_subnets_delete(self): + arglist = [] + verifylist = [] + + for s in self._subnets: + arglist.append(s.name) + verifylist = [ + ('subnet', arglist), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + calls = [] + for s in self._subnets: + calls.append(call(s)) + self.network.delete_subnet.assert_has_calls(calls) + self.assertIsNone(result) + + def test_multi_subnets_delete_with_exception(self): + arglist = [ + self._subnets[0].name, + 'unexist_subnet', + ] + verifylist = [ + ('subnet', + [self._subnets[0].name, 'unexist_subnet']), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + find_mock_result = [self._subnets[0], exceptions.CommandError] + self.network.find_subnet = ( + mock.MagicMock(side_effect=find_mock_result) + ) + + try: + self.cmd.take_action(parsed_args) + self.fail('CommandError should be raised.') + except exceptions.CommandError as e: + self.assertEqual('1 of 2 subnets failed to delete.', str(e)) + + self.network.find_subnet.assert_any_call( + self._subnets[0].name, ignore_missing=False) + self.network.find_subnet.assert_any_call( + 'unexist_subnet', ignore_missing=False) + self.network.delete_subnet.assert_called_once_with( + self._subnets[0] + ) + + +class TestListSubnet(TestSubnet): + # The subnets going to be listed up. + _subnet = network_fakes.FakeSubnet.create_subnets(count=3) + + columns = ( + 'ID', + 'Name', + 'Network', + 'Subnet', + ) + columns_long = columns + ( + 'Project', + 'DHCP', + 'Name Servers', + 'Allocation Pools', + 'Host Routes', + 'IP Version', + 'Gateway', + 'Service Types', + ) + + data = [] + for subnet in _subnet: + data.append(( + subnet.id, + subnet.name, + subnet.network_id, + subnet.cidr, + )) + + data_long = [] + for subnet in _subnet: + data_long.append(( + subnet.id, + subnet.name, + subnet.network_id, + subnet.cidr, + subnet.tenant_id, + subnet.enable_dhcp, + utils.format_list(subnet.dns_nameservers), + subnet_v2._format_allocation_pools(subnet.allocation_pools), + utils.format_list(subnet.host_routes), + subnet.ip_version, + subnet.gateway_ip, + utils.format_list(subnet.service_types), + )) + + def setUp(self): + super(TestListSubnet, self).setUp() + + # Get the command object to test + self.cmd = subnet_v2.ListSubnet(self.app, self.namespace) + + self.network.subnets = mock.Mock(return_value=self._subnet) + + def test_subnet_list_no_options(self): + arglist = [] + verifylist = [ + ('long', False), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.network.subnets.assert_called_once_with() + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, list(data)) + + def test_subnet_list_long(self): + arglist = [ + '--long', + ] + verifylist = [ + ('long', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.network.subnets.assert_called_once_with() + self.assertEqual(self.columns_long, columns) + self.assertEqual(self.data_long, list(data)) + + def test_subnet_list_ip_version(self): + arglist = [ + '--ip-version', str(4), + ] + verifylist = [ + ('ip_version', 4), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + filters = {'ip_version': 4} + + self.network.subnets.assert_called_once_with(**filters) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, list(data)) + + def test_subnet_list_dhcp(self): + arglist = [ + '--dhcp', + ] + verifylist = [ + ('dhcp', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + filters = {'enable_dhcp': True} + + self.network.subnets.assert_called_once_with(**filters) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, list(data)) + + def test_subnet_list_no_dhcp(self): + arglist = [ + '--no-dhcp', + ] + verifylist = [ + ('no_dhcp', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + filters = {'enable_dhcp': False} + + self.network.subnets.assert_called_once_with(**filters) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, list(data)) + + def test_subnet_list_service_type(self): + arglist = [ + '--service-type', 'network:router_gateway', + ] + verifylist = [ + ('service_types', ['network:router_gateway']), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + filters = {'service_types': ['network:router_gateway']} + + self.network.subnets.assert_called_once_with(**filters) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, list(data)) + + def test_subnet_list_service_type_multiple(self): + arglist = [ + '--service-type', 'network:router_gateway', + '--service-type', 'network:floatingip_agent_gateway', + ] + verifylist = [ + ('service_types', ['network:router_gateway', + 'network:floatingip_agent_gateway']), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + filters = {'service_types': ['network:router_gateway', + 'network:floatingip_agent_gateway']} + + self.network.subnets.assert_called_once_with(**filters) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, list(data)) + + +class TestSetSubnet(TestSubnet): + + _subnet = network_fakes.FakeSubnet.create_one_subnet() + + def setUp(self): + super(TestSetSubnet, self).setUp() + self.network.update_subnet = mock.Mock(return_value=None) + self.network.find_subnet = mock.Mock(return_value=self._subnet) + self.cmd = subnet_v2.SetSubnet(self.app, self.namespace) + + def test_set_this(self): + arglist = [ + "--name", "new_subnet", + "--dhcp", + "--gateway", self._subnet.gateway_ip, + self._subnet.name, + ] + verifylist = [ + ('name', "new_subnet"), + ('dhcp', True), + ('gateway', self._subnet.gateway_ip), + ('subnet', self._subnet.name), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + attrs = { + 'enable_dhcp': True, + 'gateway_ip': self._subnet.gateway_ip, + 'name': "new_subnet", + } + self.network.update_subnet.assert_called_with(self._subnet, **attrs) + self.assertIsNone(result) + + def test_set_that(self): + arglist = [ + "--name", "new_subnet", + "--no-dhcp", + "--gateway", "none", + self._subnet.name, + ] + verifylist = [ + ('name', "new_subnet"), + ('no_dhcp', True), + ('gateway', "none"), + ('subnet', self._subnet.name), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + attrs = { + 'enable_dhcp': False, + 'gateway_ip': None, + 'name': "new_subnet", + } + self.network.update_subnet.assert_called_with(self._subnet, **attrs) + self.assertIsNone(result) + + def test_set_nothing(self): + arglist = [self._subnet.name, ] + verifylist = [('subnet', self._subnet.name)] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + attrs = {} + self.network.update_subnet.assert_called_with(self._subnet, **attrs) + self.assertIsNone(result) + + def test_append_options(self): + _testsubnet = network_fakes.FakeSubnet.create_one_subnet( + {'dns_nameservers': ["10.0.0.1"], + 'service_types': ["network:router_gateway"]}) + self.network.find_subnet = mock.Mock(return_value=_testsubnet) + arglist = [ + '--dns-nameserver', '10.0.0.2', + '--service-type', 'network:floatingip_agent_gateway', + _testsubnet.name, + ] + verifylist = [ + ('dns_nameservers', ['10.0.0.2']), + ('service_types', ['network:floatingip_agent_gateway']), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + attrs = { + 'dns_nameservers': ['10.0.0.2', '10.0.0.1'], + 'service_types': ['network:floatingip_agent_gateway', + 'network:router_gateway'], + } + self.network.update_subnet.assert_called_once_with( + _testsubnet, **attrs) + self.assertIsNone(result) + + +class TestShowSubnet(TestSubnet): + # The subnets to be shown + _subnet = network_fakes.FakeSubnet.create_one_subnet() + + columns = ( + 'allocation_pools', + 'cidr', + 'dns_nameservers', + 'enable_dhcp', + 'gateway_ip', + 'host_routes', + 'id', + 'ip_version', + 'ipv6_address_mode', + 'ipv6_ra_mode', + 'name', + 'network_id', + 'project_id', + 'segment_id', + 'service_types', + 'subnetpool_id', + ) + + data = ( + subnet_v2._format_allocation_pools(_subnet.allocation_pools), + _subnet.cidr, + utils.format_list(_subnet.dns_nameservers), + _subnet.enable_dhcp, + _subnet.gateway_ip, + utils.format_list(_subnet.host_routes), + _subnet.id, + _subnet.ip_version, + _subnet.ipv6_address_mode, + _subnet.ipv6_ra_mode, + _subnet.name, + _subnet.network_id, + _subnet.tenant_id, + _subnet.segment_id, + utils.format_list(_subnet.service_types), + _subnet.subnetpool_id, + ) + + def setUp(self): + super(TestShowSubnet, self).setUp() + + # Get the command object to test + self.cmd = subnet_v2.ShowSubnet(self.app, self.namespace) + + self.network.find_subnet = mock.Mock(return_value=self._subnet) + + def test_show_no_options(self): + arglist = [] + verifylist = [] + + # Testing that a call without the required argument will fail and + # throw a "ParserExecption" + self.assertRaises(tests_utils.ParserException, + self.check_parser, self.cmd, arglist, verifylist) + + def test_show_all_options(self): + arglist = [ + self._subnet.name, + ] + verifylist = [ + ('subnet', self._subnet.name), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns, data = self.cmd.take_action(parsed_args) + + self.network.find_subnet.assert_called_once_with( + self._subnet.name, ignore_missing=False) + + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + +class TestUnsetSubnet(TestSubnet): + + def setUp(self): + super(TestUnsetSubnet, self).setUp() + self._testsubnet = network_fakes.FakeSubnet.create_one_subnet( + {'dns_nameservers': ['8.8.8.8', + '8.8.8.4'], + 'host_routes': [{'destination': '10.20.20.0/24', + 'nexthop': '10.20.20.1'}, + {'destination': '10.30.30.30/24', + 'nexthop': '10.30.30.1'}], + 'allocation_pools': [{'start': '8.8.8.100', + 'end': '8.8.8.150'}, + {'start': '8.8.8.160', + 'end': '8.8.8.170'}], + 'service_types': ['network:router_gateway', + 'network:floatingip_agent_gateway'], }) + self.network.find_subnet = mock.Mock(return_value=self._testsubnet) + self.network.update_subnet = mock.Mock(return_value=None) + # Get the command object to test + self.cmd = subnet_v2.UnsetSubnet(self.app, self.namespace) + + def test_unset_subnet_params(self): + arglist = [ + '--dns-nameserver', '8.8.8.8', + '--host-route', 'destination=10.30.30.30/24,gateway=10.30.30.1', + '--allocation-pool', 'start=8.8.8.100,end=8.8.8.150', + '--service-type', 'network:router_gateway', + self._testsubnet.name, + ] + verifylist = [ + ('dns_nameservers', ['8.8.8.8']), + ('host_routes', [{ + "destination": "10.30.30.30/24", "gateway": "10.30.30.1"}]), + ('allocation_pools', [{ + 'start': '8.8.8.100', 'end': '8.8.8.150'}]), + ('service_types', ['network:router_gateway']), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + attrs = { + 'dns_nameservers': ['8.8.8.4'], + 'host_routes': [{ + "destination": "10.20.20.0/24", "nexthop": "10.20.20.1"}], + 'allocation_pools': [{'start': '8.8.8.160', 'end': '8.8.8.170'}], + 'service_types': ['network:floatingip_agent_gateway'], + } + self.network.update_subnet.assert_called_once_with( + self._testsubnet, **attrs) + self.assertIsNone(result) + + def test_unset_subnet_wrong_host_routes(self): + arglist = [ + '--dns-nameserver', '8.8.8.8', + '--host-route', 'destination=10.30.30.30/24,gateway=10.30.30.2', + '--allocation-pool', 'start=8.8.8.100,end=8.8.8.150', + self._testsubnet.name, + ] + verifylist = [ + ('dns_nameservers', ['8.8.8.8']), + ('host_routes', [{ + "destination": "10.30.30.30/24", "gateway": "10.30.30.2"}]), + ('allocation_pools', [{ + 'start': '8.8.8.100', 'end': '8.8.8.150'}]), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + self.assertRaises(exceptions.CommandError, + self.cmd.take_action, parsed_args) + + def test_unset_subnet_wrong_allocation_pool(self): + arglist = [ + '--dns-nameserver', '8.8.8.8', + '--host-route', 'destination=10.30.30.30/24,gateway=10.30.30.1', + '--allocation-pool', 'start=8.8.8.100,end=8.8.8.156', + self._testsubnet.name, + ] + verifylist = [ + ('dns_nameservers', ['8.8.8.8']), + ('host_routes', [{ + "destination": "10.30.30.30/24", "gateway": "10.30.30.1"}]), + ('allocation_pools', [{ + 'start': '8.8.8.100', 'end': '8.8.8.156'}]), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + self.assertRaises(exceptions.CommandError, + self.cmd.take_action, parsed_args) + + def test_unset_subnet_wrong_dns_nameservers(self): + arglist = [ + '--dns-nameserver', '8.8.8.1', + '--host-route', 'destination=10.30.30.30/24,gateway=10.30.30.1', + '--allocation-pool', 'start=8.8.8.100,end=8.8.8.150', + self._testsubnet.name, + ] + verifylist = [ + ('dns_nameservers', ['8.8.8.1']), + ('host_routes', [{ + "destination": "10.30.30.30/24", "gateway": "10.30.30.1"}]), + ('allocation_pools', [{ + 'start': '8.8.8.100', 'end': '8.8.8.150'}]), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + self.assertRaises(exceptions.CommandError, + self.cmd.take_action, parsed_args) + + def test_unset_subnet_wrong_service_type(self): + arglist = [ + '--dns-nameserver', '8.8.8.8', + '--host-route', 'destination=10.30.30.30/24,gateway=10.30.30.1', + '--allocation-pool', 'start=8.8.8.100,end=8.8.8.150', + '--service-type', 'network:dhcp', + self._testsubnet.name, + ] + verifylist = [ + ('dns_nameservers', ['8.8.8.8']), + ('host_routes', [{ + "destination": "10.30.30.30/24", "gateway": "10.30.30.1"}]), + ('allocation_pools', [{ + 'start': '8.8.8.100', 'end': '8.8.8.150'}]), + ('service_types', ['network:dhcp']), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + self.assertRaises(exceptions.CommandError, + self.cmd.take_action, parsed_args) diff --git a/openstackclient/tests/unit/network/v2/test_subnet_pool.py b/openstackclient/tests/unit/network/v2/test_subnet_pool.py new file mode 100644 index 00000000..0d9494c0 --- /dev/null +++ b/openstackclient/tests/unit/network/v2/test_subnet_pool.py @@ -0,0 +1,722 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import argparse +import mock +from mock import call + +from osc_lib import exceptions +from osc_lib import utils + +from openstackclient.network.v2 import subnet_pool +from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes_v3 +from openstackclient.tests.unit.network.v2 import fakes as network_fakes +from openstackclient.tests.unit import utils as tests_utils + + +class TestSubnetPool(network_fakes.TestNetworkV2): + + def setUp(self): + super(TestSubnetPool, self).setUp() + + # Get a shortcut to the network client + self.network = self.app.client_manager.network + # Get a shortcut to the ProjectManager Mock + self.projects_mock = self.app.client_manager.identity.projects + # Get a shortcut to the DomainManager Mock + self.domains_mock = self.app.client_manager.identity.domains + + +class TestCreateSubnetPool(TestSubnetPool): + + project = identity_fakes_v3.FakeProject.create_one_project() + domain = identity_fakes_v3.FakeDomain.create_one_domain() + # The new subnet pool to create. + _subnet_pool = network_fakes.FakeSubnetPool.create_one_subnet_pool() + + _address_scope = network_fakes.FakeAddressScope.create_one_address_scope() + + columns = ( + 'address_scope_id', + 'default_prefixlen', + 'default_quota', + 'id', + 'ip_version', + 'is_default', + 'max_prefixlen', + 'min_prefixlen', + 'name', + 'prefixes', + 'project_id', + 'shared', + ) + data = ( + _subnet_pool.address_scope_id, + _subnet_pool.default_prefixlen, + _subnet_pool.default_quota, + _subnet_pool.id, + _subnet_pool.ip_version, + _subnet_pool.is_default, + _subnet_pool.max_prefixlen, + _subnet_pool.min_prefixlen, + _subnet_pool.name, + utils.format_list(_subnet_pool.prefixes), + _subnet_pool.project_id, + _subnet_pool.shared, + ) + + def setUp(self): + super(TestCreateSubnetPool, self).setUp() + + self.network.create_subnet_pool = mock.Mock( + return_value=self._subnet_pool) + + # Get the command object to test + self.cmd = subnet_pool.CreateSubnetPool(self.app, self.namespace) + + self.network.find_address_scope = mock.Mock( + return_value=self._address_scope) + + self.projects_mock.get.return_value = self.project + self.domains_mock.get.return_value = self.domain + + def test_create_no_options(self): + arglist = [] + verifylist = [] + + self.assertRaises(tests_utils.ParserException, self.check_parser, + self.cmd, arglist, verifylist) + + def test_create_no_pool_prefix(self): + """Make sure --pool-prefix is a required argument""" + arglist = [ + self._subnet_pool.name, + ] + verifylist = [ + ('name', self._subnet_pool.name), + ] + self.assertRaises(tests_utils.ParserException, self.check_parser, + self.cmd, arglist, verifylist) + + def test_create_default_options(self): + arglist = [ + '--pool-prefix', '10.0.10.0/24', + self._subnet_pool.name, + ] + verifylist = [ + ('prefixes', ['10.0.10.0/24']), + ('name', self._subnet_pool.name), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = (self.cmd.take_action(parsed_args)) + + self.network.create_subnet_pool.assert_called_once_with(**{ + 'prefixes': ['10.0.10.0/24'], + 'name': self._subnet_pool.name, + }) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + def test_create_prefixlen_options(self): + arglist = [ + '--default-prefix-length', self._subnet_pool.default_prefixlen, + '--max-prefix-length', self._subnet_pool.max_prefixlen, + '--min-prefix-length', self._subnet_pool.min_prefixlen, + '--pool-prefix', '10.0.10.0/24', + self._subnet_pool.name, + ] + verifylist = [ + ('default_prefix_length', + int(self._subnet_pool.default_prefixlen)), + ('max_prefix_length', int(self._subnet_pool.max_prefixlen)), + ('min_prefix_length', int(self._subnet_pool.min_prefixlen)), + ('name', self._subnet_pool.name), + ('prefixes', ['10.0.10.0/24']), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = (self.cmd.take_action(parsed_args)) + + self.network.create_subnet_pool.assert_called_once_with(**{ + 'default_prefixlen': int(self._subnet_pool.default_prefixlen), + 'max_prefixlen': int(self._subnet_pool.max_prefixlen), + 'min_prefixlen': int(self._subnet_pool.min_prefixlen), + 'prefixes': ['10.0.10.0/24'], + 'name': self._subnet_pool.name, + }) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + def test_create_len_negative(self): + arglist = [ + self._subnet_pool.name, + '--min-prefix-length', '-16', + ] + verifylist = [ + ('subnet_pool', self._subnet_pool.name), + ('min_prefix_length', '-16'), + ] + + self.assertRaises(argparse.ArgumentTypeError, self.check_parser, + self.cmd, arglist, verifylist) + + def test_create_project_domain(self): + arglist = [ + '--pool-prefix', '10.0.10.0/24', + "--project", self.project.name, + "--project-domain", self.domain.name, + self._subnet_pool.name, + ] + verifylist = [ + ('prefixes', ['10.0.10.0/24']), + ('project', self.project.name), + ('project_domain', self.domain.name), + ('name', self._subnet_pool.name), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = (self.cmd.take_action(parsed_args)) + + self.network.create_subnet_pool.assert_called_once_with(**{ + 'prefixes': ['10.0.10.0/24'], + 'tenant_id': self.project.id, + 'name': self._subnet_pool.name, + }) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + def test_create_address_scope_option(self): + arglist = [ + '--pool-prefix', '10.0.10.0/24', + '--address-scope', self._address_scope.id, + self._subnet_pool.name, + ] + verifylist = [ + ('prefixes', ['10.0.10.0/24']), + ('address_scope', self._address_scope.id), + ('name', self._subnet_pool.name), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = (self.cmd.take_action(parsed_args)) + + self.network.create_subnet_pool.assert_called_once_with(**{ + 'prefixes': ['10.0.10.0/24'], + 'address_scope_id': self._address_scope.id, + 'name': self._subnet_pool.name, + }) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + def test_create_default_and_shared_options(self): + arglist = [ + '--pool-prefix', '10.0.10.0/24', + '--default', + '--share', + self._subnet_pool.name, + ] + verifylist = [ + ('prefixes', ['10.0.10.0/24']), + ('default', True), + ('share', True), + ('name', self._subnet_pool.name), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = (self.cmd.take_action(parsed_args)) + + self.network.create_subnet_pool.assert_called_once_with(**{ + 'is_default': True, + 'name': self._subnet_pool.name, + 'prefixes': ['10.0.10.0/24'], + 'shared': True, + }) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + +class TestDeleteSubnetPool(TestSubnetPool): + + # The subnet pools to delete. + _subnet_pools = network_fakes.FakeSubnetPool.create_subnet_pools(count=2) + + def setUp(self): + super(TestDeleteSubnetPool, self).setUp() + + self.network.delete_subnet_pool = mock.Mock(return_value=None) + + self.network.find_subnet_pool = ( + network_fakes.FakeSubnetPool.get_subnet_pools(self._subnet_pools) + ) + + # Get the command object to test + self.cmd = subnet_pool.DeleteSubnetPool(self.app, self.namespace) + + def test_subnet_pool_delete(self): + arglist = [ + self._subnet_pools[0].name, + ] + verifylist = [ + ('subnet_pool', [self._subnet_pools[0].name]), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + self.network.delete_subnet_pool.assert_called_once_with( + self._subnet_pools[0]) + self.assertIsNone(result) + + def test_multi_subnet_pools_delete(self): + arglist = [] + verifylist = [] + + for s in self._subnet_pools: + arglist.append(s.name) + verifylist = [ + ('subnet_pool', arglist), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + calls = [] + for s in self._subnet_pools: + calls.append(call(s)) + self.network.delete_subnet_pool.assert_has_calls(calls) + self.assertIsNone(result) + + def test_multi_subnet_pools_delete_with_exception(self): + arglist = [ + self._subnet_pools[0].name, + 'unexist_subnet_pool', + ] + verifylist = [ + ('subnet_pool', + [self._subnet_pools[0].name, 'unexist_subnet_pool']), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + find_mock_result = [self._subnet_pools[0], exceptions.CommandError] + self.network.find_subnet_pool = ( + mock.MagicMock(side_effect=find_mock_result) + ) + + try: + self.cmd.take_action(parsed_args) + self.fail('CommandError should be raised.') + except exceptions.CommandError as e: + self.assertEqual('1 of 2 subnet pools failed to delete.', str(e)) + + self.network.find_subnet_pool.assert_any_call( + self._subnet_pools[0].name, ignore_missing=False) + self.network.find_subnet_pool.assert_any_call( + 'unexist_subnet_pool', ignore_missing=False) + self.network.delete_subnet_pool.assert_called_once_with( + self._subnet_pools[0] + ) + + +class TestListSubnetPool(TestSubnetPool): + # The subnet pools going to be listed up. + _subnet_pools = network_fakes.FakeSubnetPool.create_subnet_pools(count=3) + + columns = ( + 'ID', + 'Name', + 'Prefixes', + ) + columns_long = columns + ( + 'Default Prefix Length', + 'Address Scope', + 'Default Subnet Pool', + 'Shared', + ) + + data = [] + for pool in _subnet_pools: + data.append(( + pool.id, + pool.name, + utils.format_list(pool.prefixes), + )) + + data_long = [] + for pool in _subnet_pools: + data_long.append(( + pool.id, + pool.name, + utils.format_list(pool.prefixes), + pool.default_prefixlen, + pool.address_scope_id, + pool.is_default, + pool.shared, + )) + + def setUp(self): + super(TestListSubnetPool, self).setUp() + + # Get the command object to test + self.cmd = subnet_pool.ListSubnetPool(self.app, self.namespace) + + self.network.subnet_pools = mock.Mock(return_value=self._subnet_pools) + + def test_subnet_pool_list_no_option(self): + arglist = [] + verifylist = [ + ('long', False), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.network.subnet_pools.assert_called_once_with() + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, list(data)) + + def test_subnet_pool_list_long(self): + arglist = [ + '--long', + ] + verifylist = [ + ('long', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.network.subnet_pools.assert_called_once_with() + self.assertEqual(self.columns_long, columns) + self.assertEqual(self.data_long, list(data)) + + +class TestSetSubnetPool(TestSubnetPool): + + # The subnet_pool to set. + _subnet_pool = network_fakes.FakeSubnetPool.create_one_subnet_pool() + + _address_scope = network_fakes.FakeAddressScope.create_one_address_scope() + + def setUp(self): + super(TestSetSubnetPool, self).setUp() + + self.network.update_subnet_pool = mock.Mock(return_value=None) + + self.network.find_subnet_pool = mock.Mock( + return_value=self._subnet_pool) + + self.network.find_address_scope = mock.Mock( + return_value=self._address_scope) + + # Get the command object to test + self.cmd = subnet_pool.SetSubnetPool(self.app, self.namespace) + + def test_set_this(self): + arglist = [ + '--name', 'noob', + '--default-prefix-length', '8', + '--min-prefix-length', '8', + self._subnet_pool.name, + ] + verifylist = [ + ('name', 'noob'), + ('default_prefix_length', 8), + ('min_prefix_length', 8), + ('subnet_pool', self._subnet_pool.name), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + attrs = { + 'name': 'noob', + 'default_prefixlen': 8, + 'min_prefixlen': 8, + } + self.network.update_subnet_pool.assert_called_once_with( + self._subnet_pool, **attrs) + self.assertIsNone(result) + + def test_set_that(self): + arglist = [ + '--pool-prefix', '10.0.1.0/24', + '--pool-prefix', '10.0.2.0/24', + '--max-prefix-length', '16', + self._subnet_pool.name, + ] + verifylist = [ + ('prefixes', ['10.0.1.0/24', '10.0.2.0/24']), + ('max_prefix_length', 16), + ('subnet_pool', self._subnet_pool.name), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + prefixes = ['10.0.1.0/24', '10.0.2.0/24'] + prefixes.extend(self._subnet_pool.prefixes) + attrs = { + 'prefixes': prefixes, + 'max_prefixlen': 16, + } + self.network.update_subnet_pool.assert_called_once_with( + self._subnet_pool, **attrs) + self.assertIsNone(result) + + def test_set_nothing(self): + arglist = [self._subnet_pool.name, ] + verifylist = [('subnet_pool', self._subnet_pool.name), ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + attrs = {} + self.network.update_subnet_pool.assert_called_once_with( + self._subnet_pool, **attrs) + self.assertIsNone(result) + + def test_set_len_negative(self): + arglist = [ + '--max-prefix-length', '-16', + self._subnet_pool.name, + ] + verifylist = [ + ('max_prefix_length', '-16'), + ('subnet_pool', self._subnet_pool.name), + ] + + self.assertRaises(argparse.ArgumentTypeError, self.check_parser, + self.cmd, arglist, verifylist) + + def test_set_address_scope(self): + arglist = [ + '--address-scope', self._address_scope.id, + self._subnet_pool.name, + ] + verifylist = [ + ('address_scope', self._address_scope.id), + ('subnet_pool', self._subnet_pool.name), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + attrs = { + 'address_scope_id': self._address_scope.id, + } + self.network.update_subnet_pool.assert_called_once_with( + self._subnet_pool, **attrs) + self.assertIsNone(result) + + def test_set_no_address_scope(self): + arglist = [ + '--no-address-scope', + self._subnet_pool.name, + ] + verifylist = [ + ('no_address_scope', True), + ('subnet_pool', self._subnet_pool.name), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + attrs = { + 'address_scope_id': None, + } + self.network.update_subnet_pool.assert_called_once_with( + self._subnet_pool, **attrs) + self.assertIsNone(result) + + def test_set_no_address_scope_conflict(self): + arglist = [ + '--address-scope', self._address_scope.id, + '--no-address-scope', + self._subnet_pool.name, + ] + verifylist = [ + ('address_scope', self._address_scope.id), + ('no_address_scope', True), + ('subnet_pool', self._subnet_pool.name), + ] + + # Exclusive arguments will conflict here. + self.assertRaises(tests_utils.ParserException, self.check_parser, + self.cmd, arglist, verifylist) + + def test_set_default(self): + arglist = [ + '--default', + self._subnet_pool.name, + ] + verifylist = [ + ('default', True), + ('subnet_pool', self._subnet_pool.name), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + attrs = { + 'is_default': True + } + self.network.update_subnet_pool.assert_called_once_with( + self._subnet_pool, **attrs) + self.assertIsNone(result) + + def test_set_no_default(self): + arglist = [ + '--no-default', + self._subnet_pool.name, + ] + verifylist = [ + ('no_default', True), + ('subnet_pool', self._subnet_pool.name), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + attrs = { + 'is_default': False, + } + self.network.update_subnet_pool.assert_called_once_with( + self._subnet_pool, **attrs) + self.assertIsNone(result) + + def test_set_no_default_conflict(self): + arglist = [ + '--default', + '--no-default', + self._subnet_pool.name, + ] + verifylist = [ + ('default', True), + ('no_default', True), + ('subnet_pool', self._subnet_pool.name), + ] + + # Exclusive arguments will conflict here. + self.assertRaises(tests_utils.ParserException, self.check_parser, + self.cmd, arglist, verifylist) + + +class TestShowSubnetPool(TestSubnetPool): + + # The subnet_pool to set. + _subnet_pool = network_fakes.FakeSubnetPool.create_one_subnet_pool() + + columns = ( + 'address_scope_id', + 'default_prefixlen', + 'default_quota', + 'id', + 'ip_version', + 'is_default', + 'max_prefixlen', + 'min_prefixlen', + 'name', + 'prefixes', + 'project_id', + 'shared', + ) + + data = ( + _subnet_pool.address_scope_id, + _subnet_pool.default_prefixlen, + _subnet_pool.default_quota, + _subnet_pool.id, + _subnet_pool.ip_version, + _subnet_pool.is_default, + _subnet_pool.max_prefixlen, + _subnet_pool.min_prefixlen, + _subnet_pool.name, + utils.format_list(_subnet_pool.prefixes), + _subnet_pool.tenant_id, + _subnet_pool.shared, + ) + + def setUp(self): + super(TestShowSubnetPool, self).setUp() + + self.network.find_subnet_pool = mock.Mock( + return_value=self._subnet_pool + ) + + # Get the command object to test + self.cmd = subnet_pool.ShowSubnetPool(self.app, self.namespace) + + def test_show_no_options(self): + arglist = [] + verifylist = [] + + self.assertRaises(tests_utils.ParserException, self.check_parser, + self.cmd, arglist, verifylist) + + def test_show_all_options(self): + arglist = [ + self._subnet_pool.name, + ] + verifylist = [ + ('subnet_pool', self._subnet_pool.name), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.network.find_subnet_pool.assert_called_once_with( + self._subnet_pool.name, + ignore_missing=False + ) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + +class TestUnsetSubnetPool(TestSubnetPool): + + def setUp(self): + super(TestUnsetSubnetPool, self).setUp() + self._subnetpool = network_fakes.FakeSubnetPool.create_one_subnet_pool( + {'prefixes': ['10.0.10.0/24', '10.1.10.0/24', + '10.2.10.0/24'], }) + self.network.find_subnet_pool = mock.Mock( + return_value=self._subnetpool) + self.network.update_subnet_pool = mock.Mock(return_value=None) + # Get the command object to test + self.cmd = subnet_pool.UnsetSubnetPool(self.app, self.namespace) + + def test_unset_subnet_pool(self): + arglist = [ + '--pool-prefix', '10.0.10.0/24', + '--pool-prefix', '10.1.10.0/24', + self._subnetpool.name, + ] + verifylist = [('prefixes', ['10.0.10.0/24', '10.1.10.0/24'])] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + attrs = {'prefixes': ['10.2.10.0/24']} + self.network.update_subnet_pool.assert_called_once_with( + self._subnetpool, **attrs) + self.assertIsNone(result) + + def test_unset_subnet_pool_prefix_not_existent(self): + arglist = [ + '--pool-prefix', '10.100.1.1/25', + self._subnetpool.name, + ] + verifylist = [('prefixes', ['10.100.1.1/25'])] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + self.assertRaises(exceptions.CommandError, + self.cmd.take_action, + parsed_args) |
